xref: /openbmc/linux/drivers/block/drbd/drbd_receiver.c (revision baa7eb025ab14f3cba2e35c0a8648f9c9f01d24f)
1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48 
49 #include "drbd_vli.h"
50 
51 enum finish_epoch {
52 	FE_STILL_LIVE,
53 	FE_DESTROYED,
54 	FE_RECYCLED,
55 };
56 
57 static int drbd_do_handshake(struct drbd_conf *mdev);
58 static int drbd_do_auth(struct drbd_conf *mdev);
59 
60 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
61 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
62 
63 
64 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
65 
66 /*
67  * some helper functions to deal with singly linked page lists,
68  * page->private being our "next" pointer.
69  */
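
/*
 * Illustrative sketch (not part of the driver logic): a chain of three pages
 * linked by these helpers looks like
 *
 *	*head --> page A --> page B --> page C --> (0)
 *
 * where each arrow is page->private interpreted as a struct page pointer
 * (see page_chain_next()), and a page_private() value of 0 marks the end of
 * the chain, as set via set_page_private(page, 0) below.
 */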
70 
71 /* If at least n pages are linked at head, get n pages off.
72  * Otherwise, don't modify head, and return NULL.
73  * Locking is the responsibility of the caller.
74  */
75 static struct page *page_chain_del(struct page **head, int n)
76 {
77 	struct page *page;
78 	struct page *tmp;
79 
80 	BUG_ON(!n);
81 	BUG_ON(!head);
82 
83 	page = *head;
84 
85 	if (!page)
86 		return NULL;
87 
88 	while (page) {
89 		tmp = page_chain_next(page);
90 		if (--n == 0)
91 			break; /* found sufficient pages */
92 		if (tmp == NULL)
93 			/* insufficient pages, don't use any of them. */
94 			return NULL;
95 		page = tmp;
96 	}
97 
98 	/* add end of list marker for the returned list */
99 	set_page_private(page, 0);
100 	/* actual return value, and adjustment of head */
101 	page = *head;
102 	*head = tmp;
103 	return page;
104 }
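
/* Example (illustrative): if *head points at a chain of five pages, then
 *	first_three = page_chain_del(&head, 3);
 * returns the first three pages as a 0-terminated chain and leaves *head
 * pointing at the remaining two.  If fewer than three pages are linked,
 * it returns NULL and *head is left untouched. */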
105 
106 /* may be used outside of locks to find the tail of a (usually short)
107  * "private" page chain, before adding it back to a global chain head
108  * with page_chain_add() under a spinlock. */
109 static struct page *page_chain_tail(struct page *page, int *len)
110 {
111 	struct page *tmp;
112 	int i = 1;
113 	while ((tmp = page_chain_next(page)))
114 		++i, page = tmp;
115 	if (len)
116 		*len = i;
117 	return page;
118 }
119 
120 static int page_chain_free(struct page *page)
121 {
122 	struct page *tmp;
123 	int i = 0;
124 	page_chain_for_each_safe(page, tmp) {
125 		put_page(page);
126 		++i;
127 	}
128 	return i;
129 }
130 
131 static void page_chain_add(struct page **head,
132 		struct page *chain_first, struct page *chain_last)
133 {
134 #if 1
135 	struct page *tmp;
136 	tmp = page_chain_tail(chain_first, NULL);
137 	BUG_ON(tmp != chain_last);
138 #endif
139 
140 	/* add chain to head */
141 	set_page_private(chain_last, (unsigned long)*head);
142 	*head = chain_first;
143 }
144 
145 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
146 {
147 	struct page *page = NULL;
148 	struct page *tmp = NULL;
149 	int i = 0;
150 
151 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
152 	 * So what. It saves a spin_lock. */
153 	if (drbd_pp_vacant >= number) {
154 		spin_lock(&drbd_pp_lock);
155 		page = page_chain_del(&drbd_pp_pool, number);
156 		if (page)
157 			drbd_pp_vacant -= number;
158 		spin_unlock(&drbd_pp_lock);
159 		if (page)
160 			return page;
161 	}
162 
163 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
164 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
165 	 * which in turn might block on the other node at this very place.  */
166 	for (i = 0; i < number; i++) {
167 		tmp = alloc_page(GFP_TRY);
168 		if (!tmp)
169 			break;
170 		set_page_private(tmp, (unsigned long)page);
171 		page = tmp;
172 	}
173 
174 	if (i == number)
175 		return page;
176 
177 	/* Not enough pages immediately available this time.
178 	 * No need to jump around here, drbd_pp_alloc will retry this
179 	 * function "soon". */
180 	if (page) {
181 		tmp = page_chain_tail(page, NULL);
182 		spin_lock(&drbd_pp_lock);
183 		page_chain_add(&drbd_pp_pool, page, tmp);
184 		drbd_pp_vacant += i;
185 		spin_unlock(&drbd_pp_lock);
186 	}
187 	return NULL;
188 }
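
/* Note that the allocation above is all-or-nothing: either a complete chain
 * of @number pages is returned, or every page grabbed so far is handed back
 * to drbd_pp_pool and NULL is returned, so callers never see a short chain. */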
189 
190 /* kick lower level device, if we have more than (arbitrary number)
191  * reference counts on it, which typically are locally submitted io
192  * requests.  don't use unacked_cnt, so we speed up proto A and B, too. */
193 static void maybe_kick_lo(struct drbd_conf *mdev)
194 {
195 	if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
196 		drbd_kick_lo(mdev);
197 }
198 
199 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
200 {
201 	struct drbd_epoch_entry *e;
202 	struct list_head *le, *tle;
203 
204 	/* The EEs are always appended to the end of the list. Since
205 	   they are sent in order over the wire, they have to finish
206 	   in order. As soon as we see the first one that has not finished,
207 	   we can stop examining the list... */
208 
209 	list_for_each_safe(le, tle, &mdev->net_ee) {
210 		e = list_entry(le, struct drbd_epoch_entry, w.list);
211 		if (drbd_ee_has_active_page(e))
212 			break;
213 		list_move(le, to_be_freed);
214 	}
215 }
216 
217 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
218 {
219 	LIST_HEAD(reclaimed);
220 	struct drbd_epoch_entry *e, *t;
221 
222 	maybe_kick_lo(mdev);
223 	spin_lock_irq(&mdev->req_lock);
224 	reclaim_net_ee(mdev, &reclaimed);
225 	spin_unlock_irq(&mdev->req_lock);
226 
227 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
228 		drbd_free_net_ee(mdev, e);
229 }
230 
231 /**
232  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:	DRBD device.
234  * @number:	number of pages requested
235  * @retry:	whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate @number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
244 {
245 	struct page *page = NULL;
246 	DEFINE_WAIT(wait);
247 
248 	/* Yes, we may run up to @number over max_buffers. If we
249 	 * follow it strictly, the admin will get it wrong anyways. */
250 	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
251 		page = drbd_pp_first_pages_or_try_alloc(mdev, number);
252 
253 	while (page == NULL) {
254 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
255 
256 		drbd_kick_lo_and_reclaim_net(mdev);
257 
258 		if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
259 			page = drbd_pp_first_pages_or_try_alloc(mdev, number);
260 			if (page)
261 				break;
262 		}
263 
264 		if (!retry)
265 			break;
266 
267 		if (signal_pending(current)) {
268 			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
269 			break;
270 		}
271 
272 		schedule();
273 	}
274 	finish_wait(&drbd_pp_wait, &wait);
275 
276 	if (page)
277 		atomic_add(number, &mdev->pp_in_use);
278 	return page;
279 }
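
/* Typical usage, as a sketch (the real callers are drbd_alloc_ee() and
 * drbd_drain_block() below):
 *
 *	struct page *page = drbd_pp_alloc(mdev, nr_pages, true);
 *	if (page) {
 *		... fill and submit the pages ...
 *		drbd_pp_free(mdev, page, 0);
 *	}
 *
 * Every successful allocation adds @number to pp_in_use and must eventually
 * be balanced by one drbd_pp_free() (directly, or via drbd_free_some_ee()),
 * otherwise waiters on drbd_pp_wait may stall. */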
280 
281 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
282  * It is also used from inside another spin_lock_irq(&mdev->req_lock);
283  * Either links the page chain back to the global pool,
284  * or returns all pages to the system. */
285 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
286 {
287 	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
288 	int i;
289 
290 	if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
291 		i = page_chain_free(page);
292 	else {
293 		struct page *tmp;
294 		tmp = page_chain_tail(page, &i);
295 		spin_lock(&drbd_pp_lock);
296 		page_chain_add(&drbd_pp_pool, page, tmp);
297 		drbd_pp_vacant += i;
298 		spin_unlock(&drbd_pp_lock);
299 	}
300 	i = atomic_sub_return(i, a);
301 	if (i < 0)
302 		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
303 			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
304 	wake_up(&drbd_pp_wait);
305 }
306 
307 /*
308 You need to hold the req_lock:
309  _drbd_wait_ee_list_empty()
310 
311 You must not have the req_lock:
312  drbd_free_ee()
313  drbd_alloc_ee()
314  drbd_init_ee()
315  drbd_release_ee()
316  drbd_ee_fix_bhs()
317  drbd_process_done_ee()
318  drbd_clear_done_ee()
319  drbd_wait_ee_list_empty()
320 */
321 
322 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
323 				     u64 id,
324 				     sector_t sector,
325 				     unsigned int data_size,
326 				     gfp_t gfp_mask) __must_hold(local)
327 {
328 	struct drbd_epoch_entry *e;
329 	struct page *page;
330 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
331 
332 	if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
333 		return NULL;
334 
335 	e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
336 	if (!e) {
337 		if (!(gfp_mask & __GFP_NOWARN))
338 			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
339 		return NULL;
340 	}
341 
342 	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
343 	if (!page)
344 		goto fail;
345 
346 	INIT_HLIST_NODE(&e->colision);
347 	e->epoch = NULL;
348 	e->mdev = mdev;
349 	e->pages = page;
350 	atomic_set(&e->pending_bios, 0);
351 	e->size = data_size;
352 	e->flags = 0;
353 	e->sector = sector;
354 	e->block_id = id;
355 
356 	return e;
357 
358  fail:
359 	mempool_free(e, drbd_ee_mempool);
360 	return NULL;
361 }
362 
363 void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
364 {
365 	if (e->flags & EE_HAS_DIGEST)
366 		kfree(e->digest);
367 	drbd_pp_free(mdev, e->pages, is_net);
368 	D_ASSERT(atomic_read(&e->pending_bios) == 0);
369 	D_ASSERT(hlist_unhashed(&e->colision));
370 	mempool_free(e, drbd_ee_mempool);
371 }
372 
373 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
374 {
375 	LIST_HEAD(work_list);
376 	struct drbd_epoch_entry *e, *t;
377 	int count = 0;
378 	int is_net = list == &mdev->net_ee;
379 
380 	spin_lock_irq(&mdev->req_lock);
381 	list_splice_init(list, &work_list);
382 	spin_unlock_irq(&mdev->req_lock);
383 
384 	list_for_each_entry_safe(e, t, &work_list, w.list) {
385 		drbd_free_some_ee(mdev, e, is_net);
386 		count++;
387 	}
388 	return count;
389 }
390 
391 
392 /*
393  * This function is called from _asender only_
394  * but see also comments in _req_mod(,barrier_acked)
395  * and receive_Barrier.
396  *
397  * Move entries from net_ee to done_ee, if ready.
398  * Grab done_ee, call all callbacks, free the entries.
399  * The callbacks typically send out ACKs.
400  */
401 static int drbd_process_done_ee(struct drbd_conf *mdev)
402 {
403 	LIST_HEAD(work_list);
404 	LIST_HEAD(reclaimed);
405 	struct drbd_epoch_entry *e, *t;
406 	int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
407 
408 	spin_lock_irq(&mdev->req_lock);
409 	reclaim_net_ee(mdev, &reclaimed);
410 	list_splice_init(&mdev->done_ee, &work_list);
411 	spin_unlock_irq(&mdev->req_lock);
412 
413 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
414 		drbd_free_net_ee(mdev, e);
415 
416 	/* possible callbacks here:
417 	 * e_end_block, e_end_resync_block, and e_send_discard_ack.
418 	 * All of them ignore the last argument.
419 	 */
420 	list_for_each_entry_safe(e, t, &work_list, w.list) {
421 		/* list_del not necessary, next/prev members not touched */
422 		ok = e->w.cb(mdev, &e->w, !ok) && ok;
423 		drbd_free_ee(mdev, e);
424 	}
425 	wake_up(&mdev->ee_wait);
426 
427 	return ok;
428 }
429 
430 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
431 {
432 	DEFINE_WAIT(wait);
433 
434 	/* avoids spin_lock/unlock
435 	 * and calling prepare_to_wait in the fast path */
436 	while (!list_empty(head)) {
437 		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
438 		spin_unlock_irq(&mdev->req_lock);
439 		drbd_kick_lo(mdev);
440 		schedule();
441 		finish_wait(&mdev->ee_wait, &wait);
442 		spin_lock_irq(&mdev->req_lock);
443 	}
444 }
445 
446 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
447 {
448 	spin_lock_irq(&mdev->req_lock);
449 	_drbd_wait_ee_list_empty(mdev, head);
450 	spin_unlock_irq(&mdev->req_lock);
451 }
452 
453 /* see also kernel_accept(), which is only present since 2.6.18.
454  * We also want to log exactly which part of it failed. */
455 static int drbd_accept(struct drbd_conf *mdev, const char **what,
456 		struct socket *sock, struct socket **newsock)
457 {
458 	struct sock *sk = sock->sk;
459 	int err = 0;
460 
461 	*what = "listen";
462 	err = sock->ops->listen(sock, 5);
463 	if (err < 0)
464 		goto out;
465 
466 	*what = "sock_create_lite";
467 	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
468 			       newsock);
469 	if (err < 0)
470 		goto out;
471 
472 	*what = "accept";
473 	err = sock->ops->accept(sock, *newsock, 0);
474 	if (err < 0) {
475 		sock_release(*newsock);
476 		*newsock = NULL;
477 		goto out;
478 	}
479 	(*newsock)->ops  = sock->ops;
480 
481 out:
482 	return err;
483 }
484 
485 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
486 		    void *buf, size_t size, int flags)
487 {
488 	mm_segment_t oldfs;
489 	struct kvec iov = {
490 		.iov_base = buf,
491 		.iov_len = size,
492 	};
493 	struct msghdr msg = {
494 		.msg_iovlen = 1,
495 		.msg_iov = (struct iovec *)&iov,
496 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
497 	};
498 	int rv;
499 
500 	oldfs = get_fs();
501 	set_fs(KERNEL_DS);
502 	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
503 	set_fs(oldfs);
504 
505 	return rv;
506 }
507 
508 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
509 {
510 	mm_segment_t oldfs;
511 	struct kvec iov = {
512 		.iov_base = buf,
513 		.iov_len = size,
514 	};
515 	struct msghdr msg = {
516 		.msg_iovlen = 1,
517 		.msg_iov = (struct iovec *)&iov,
518 		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
519 	};
520 	int rv;
521 
522 	oldfs = get_fs();
523 	set_fs(KERNEL_DS);
524 
525 	for (;;) {
526 		rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
527 		if (rv == size)
528 			break;
529 
530 		/* Note:
531 		 * ECONNRESET	other side closed the connection
532 		 * ERESTARTSYS	(on  sock) we got a signal
533 		 */
534 
535 		if (rv < 0) {
536 			if (rv == -ECONNRESET)
537 				dev_info(DEV, "sock was reset by peer\n");
538 			else if (rv != -ERESTARTSYS)
539 				dev_err(DEV, "sock_recvmsg returned %d\n", rv);
540 			break;
541 		} else if (rv == 0) {
542 			dev_info(DEV, "sock was shut down by peer\n");
543 			break;
544 		} else	{
545 			/* signal came in, or peer/link went down,
546 			 * after we read a partial message
547 			 */
548 			/* D_ASSERT(signal_pending(current)); */
549 			break;
550 		}
551 	};
552 
553 	set_fs(oldfs);
554 
555 	if (rv != size)
556 		drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
557 
558 	return rv;
559 }
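
/* Return value convention: drbd_recv() either returns @size (all data
 * received) or something smaller or negative, in which case the connection
 * has already been forced to C_BROKEN_PIPE above.  Callers therefore only
 * compare the return value against the expected length, as drbd_recv_header()
 * and read_in_block() do below. */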
560 
561 /* quoting tcp(7):
562  *   On individual connections, the socket buffer size must be set prior to the
563  *   listen(2) or connect(2) calls in order to have it take effect.
564  * This is our wrapper to do so.
565  */
566 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
567 		unsigned int rcv)
568 {
569 	/* open coded SO_SNDBUF, SO_RCVBUF */
570 	if (snd) {
571 		sock->sk->sk_sndbuf = snd;
572 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
573 	}
574 	if (rcv) {
575 		sock->sk->sk_rcvbuf = rcv;
576 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
577 	}
578 }
579 
580 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
581 {
582 	const char *what;
583 	struct socket *sock;
584 	struct sockaddr_in6 src_in6;
585 	int err;
586 	int disconnect_on_error = 1;
587 
588 	if (!get_net_conf(mdev))
589 		return NULL;
590 
591 	what = "sock_create_kern";
592 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
593 		SOCK_STREAM, IPPROTO_TCP, &sock);
594 	if (err < 0) {
595 		sock = NULL;
596 		goto out;
597 	}
598 
599 	sock->sk->sk_rcvtimeo =
600 	sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
601 	drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
602 			mdev->net_conf->rcvbuf_size);
603 
604 	/* Explicitly bind to the configured IP as the source IP
605 	 * for outgoing connections.
606 	 * This is needed for multihomed hosts and to be
607 	 * able to use lo: interfaces for drbd.
608 	 * Make sure to use 0 as the port number, so Linux selects
609 	 * a free one dynamically.
610 	 */
611 	memcpy(&src_in6, mdev->net_conf->my_addr,
612 	       min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
613 	if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
614 		src_in6.sin6_port = 0;
615 	else
616 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
617 
618 	what = "bind before connect";
619 	err = sock->ops->bind(sock,
620 			      (struct sockaddr *) &src_in6,
621 			      mdev->net_conf->my_addr_len);
622 	if (err < 0)
623 		goto out;
624 
625 	/* connect may fail, peer not yet available.
626 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
627 	disconnect_on_error = 0;
628 	what = "connect";
629 	err = sock->ops->connect(sock,
630 				 (struct sockaddr *)mdev->net_conf->peer_addr,
631 				 mdev->net_conf->peer_addr_len, 0);
632 
633 out:
634 	if (err < 0) {
635 		if (sock) {
636 			sock_release(sock);
637 			sock = NULL;
638 		}
639 		switch (-err) {
640 			/* timeout, busy, signal pending */
641 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
642 		case EINTR: case ERESTARTSYS:
643 			/* peer not (yet) available, network problem */
644 		case ECONNREFUSED: case ENETUNREACH:
645 		case EHOSTDOWN:    case EHOSTUNREACH:
646 			disconnect_on_error = 0;
647 			break;
648 		default:
649 			dev_err(DEV, "%s failed, err = %d\n", what, err);
650 		}
651 		if (disconnect_on_error)
652 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
653 	}
654 	put_net_conf(mdev);
655 	return sock;
656 }
657 
658 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
659 {
660 	int timeo, err;
661 	struct socket *s_estab = NULL, *s_listen;
662 	const char *what;
663 
664 	if (!get_net_conf(mdev))
665 		return NULL;
666 
667 	what = "sock_create_kern";
668 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
669 		SOCK_STREAM, IPPROTO_TCP, &s_listen);
670 	if (err) {
671 		s_listen = NULL;
672 		goto out;
673 	}
674 
675 	timeo = mdev->net_conf->try_connect_int * HZ;
676 	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* +/- timeo/7, i.e. 28.5% total random jitter */
677 
678 	s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
679 	s_listen->sk->sk_rcvtimeo = timeo;
680 	s_listen->sk->sk_sndtimeo = timeo;
681 	drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
682 			mdev->net_conf->rcvbuf_size);
683 
684 	what = "bind before listen";
685 	err = s_listen->ops->bind(s_listen,
686 			      (struct sockaddr *) mdev->net_conf->my_addr,
687 			      mdev->net_conf->my_addr_len);
688 	if (err < 0)
689 		goto out;
690 
691 	err = drbd_accept(mdev, &what, s_listen, &s_estab);
692 
693 out:
694 	if (s_listen)
695 		sock_release(s_listen);
696 	if (err < 0) {
697 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
698 			dev_err(DEV, "%s failed, err = %d\n", what, err);
699 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
700 		}
701 	}
702 	put_net_conf(mdev);
703 
704 	return s_estab;
705 }
706 
707 static int drbd_send_fp(struct drbd_conf *mdev,
708 	struct socket *sock, enum drbd_packets cmd)
709 {
710 	struct p_header80 *h = &mdev->data.sbuf.header.h80;
711 
712 	return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
713 }
714 
715 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
716 {
717 	struct p_header80 *h = &mdev->data.rbuf.header.h80;
718 	int rr;
719 
720 	rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
721 
722 	if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
723 		return be16_to_cpu(h->command);
724 
725 	return 0xffff;
726 }
727 
728 /**
729  * drbd_socket_okay() - Free the socket if its connection is not okay
730  * @mdev:	DRBD device.
731  * @sock:	pointer to the pointer to the socket.
732  */
733 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
734 {
735 	int rr;
736 	char tb[4];
737 
738 	if (!*sock)
739 		return FALSE;
740 
741 	rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
742 
743 	if (rr > 0 || rr == -EAGAIN) {
744 		return TRUE;
745 	} else {
746 		sock_release(*sock);
747 		*sock = NULL;
748 		return FALSE;
749 	}
750 }
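
/* The MSG_DONTWAIT | MSG_PEEK probe above distinguishes three cases: data is
 * pending (rr > 0) or the socket is simply idle (-EAGAIN), both of which
 * count as okay; anything else (0 means an orderly shutdown by the peer, or
 * some other error) means the connection is dead and the socket is released. */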
751 
752 /*
753  * return values:
754  *   1 yes, we have a valid connection
755  *   0 oops, did not work out, please try again
756  *  -1 peer talks a different language,
757  *     no point in trying again, please go standalone.
758  *  -2 We do not have a network config...
759  */
760 static int drbd_connect(struct drbd_conf *mdev)
761 {
762 	struct socket *s, *sock, *msock;
763 	int try, h, ok;
764 
765 	D_ASSERT(!mdev->data.socket);
766 
767 	if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
768 		return -2;
769 
770 	clear_bit(DISCARD_CONCURRENT, &mdev->flags);
771 
772 	sock  = NULL;
773 	msock = NULL;
774 
775 	do {
776 		for (try = 0;;) {
777 			/* 3 tries, this should take less than a second! */
778 			s = drbd_try_connect(mdev);
779 			if (s || ++try >= 3)
780 				break;
781 			/* give the other side time to call bind() & listen() */
782 			__set_current_state(TASK_INTERRUPTIBLE);
783 			schedule_timeout(HZ / 10);
784 		}
785 
786 		if (s) {
787 			if (!sock) {
788 				drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
789 				sock = s;
790 				s = NULL;
791 			} else if (!msock) {
792 				drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
793 				msock = s;
794 				s = NULL;
795 			} else {
796 				dev_err(DEV, "Logic error in drbd_connect()\n");
797 				goto out_release_sockets;
798 			}
799 		}
800 
801 		if (sock && msock) {
802 			__set_current_state(TASK_INTERRUPTIBLE);
803 			schedule_timeout(HZ / 10);
804 			ok = drbd_socket_okay(mdev, &sock);
805 			ok = drbd_socket_okay(mdev, &msock) && ok;
806 			if (ok)
807 				break;
808 		}
809 
810 retry:
811 		s = drbd_wait_for_connect(mdev);
812 		if (s) {
813 			try = drbd_recv_fp(mdev, s);
814 			drbd_socket_okay(mdev, &sock);
815 			drbd_socket_okay(mdev, &msock);
816 			switch (try) {
817 			case P_HAND_SHAKE_S:
818 				if (sock) {
819 					dev_warn(DEV, "initial packet S crossed\n");
820 					sock_release(sock);
821 				}
822 				sock = s;
823 				break;
824 			case P_HAND_SHAKE_M:
825 				if (msock) {
826 					dev_warn(DEV, "initial packet M crossed\n");
827 					sock_release(msock);
828 				}
829 				msock = s;
830 				set_bit(DISCARD_CONCURRENT, &mdev->flags);
831 				break;
832 			default:
833 				dev_warn(DEV, "Error receiving initial packet\n");
834 				sock_release(s);
835 				if (random32() & 1)
836 					goto retry;
837 			}
838 		}
839 
840 		if (mdev->state.conn <= C_DISCONNECTING)
841 			goto out_release_sockets;
842 		if (signal_pending(current)) {
843 			flush_signals(current);
844 			smp_rmb();
845 			if (get_t_state(&mdev->receiver) == Exiting)
846 				goto out_release_sockets;
847 		}
848 
849 		if (sock && msock) {
850 			ok = drbd_socket_okay(mdev, &sock);
851 			ok = drbd_socket_okay(mdev, &msock) && ok;
852 			if (ok)
853 				break;
854 		}
855 	} while (1);
856 
857 	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
858 	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
859 
860 	sock->sk->sk_allocation = GFP_NOIO;
861 	msock->sk->sk_allocation = GFP_NOIO;
862 
863 	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
864 	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
865 
866 	/* NOT YET ...
867 	 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
868 	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
869 	 * first set it to the P_HAND_SHAKE timeout,
870 	 * which we set to 4x the configured ping_timeout. */
871 	sock->sk->sk_sndtimeo =
872 	sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
873 
874 	msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
875 	msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
876 
877 	/* we don't want delays.
878 	 * we use TCP_CORK where appropriate, though */
879 	drbd_tcp_nodelay(sock);
880 	drbd_tcp_nodelay(msock);
881 
882 	mdev->data.socket = sock;
883 	mdev->meta.socket = msock;
884 	mdev->last_received = jiffies;
885 
886 	D_ASSERT(mdev->asender.task == NULL);
887 
888 	h = drbd_do_handshake(mdev);
889 	if (h <= 0)
890 		return h;
891 
892 	if (mdev->cram_hmac_tfm) {
893 		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
894 		switch (drbd_do_auth(mdev)) {
895 		case -1:
896 			dev_err(DEV, "Authentication of peer failed\n");
897 			return -1;
898 		case 0:
899 			dev_err(DEV, "Authentication of peer failed, trying again.\n");
900 			return 0;
901 		}
902 	}
903 
904 	if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
905 		return 0;
906 
907 	sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
908 	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
909 
910 	atomic_set(&mdev->packet_seq, 0);
911 	mdev->peer_seq = 0;
912 
913 	drbd_thread_start(&mdev->asender);
914 
915 	if (mdev->agreed_pro_version < 95 && get_ldev(mdev)) {
916 		drbd_setup_queue_param(mdev, DRBD_MAX_SIZE_H80_PACKET);
917 		put_ldev(mdev);
918 	}
919 
920 	if (!drbd_send_protocol(mdev))
921 		return -1;
922 	drbd_send_sync_param(mdev, &mdev->sync_conf);
923 	drbd_send_sizes(mdev, 0, 0);
924 	drbd_send_uuids(mdev);
925 	drbd_send_state(mdev);
926 	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
927 	clear_bit(RESIZE_PENDING, &mdev->flags);
928 
929 	return 1;
930 
931 out_release_sockets:
932 	if (sock)
933 		sock_release(sock);
934 	if (msock)
935 		sock_release(msock);
936 	return -1;
937 }
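
/* To summarize the connect dance above: both nodes connect() and
 * listen()/accept() simultaneously until each side ends up with two TCP
 * connections: "sock" (bulk data, announced with P_HAND_SHAKE_S) and "msock"
 * (meta data and ACKs, announced with P_HAND_SHAKE_M).  Crossed initial
 * packets are resolved by dropping the older socket, and DISCARD_CONCURRENT
 * is set on the node whose msock arrived via accept(), which later breaks
 * ties for concurrent writes (see the "discard" handling in receive_Data()). */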
938 
939 static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
940 {
941 	union p_header *h = &mdev->data.rbuf.header;
942 	int r;
943 
944 	r = drbd_recv(mdev, h, sizeof(*h));
945 	if (unlikely(r != sizeof(*h))) {
946 		dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
947 		return FALSE;
948 	}
949 
950 	if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
951 		*cmd = be16_to_cpu(h->h80.command);
952 		*packet_size = be16_to_cpu(h->h80.length);
953 	} else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
954 		*cmd = be16_to_cpu(h->h95.command);
955 		*packet_size = be32_to_cpu(h->h95.length);
956 	} else {
957 		dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
958 		    be32_to_cpu(h->h80.magic),
959 		    be16_to_cpu(h->h80.command),
960 		    be16_to_cpu(h->h80.length));
961 		return FALSE;
962 	}
963 	mdev->last_received = jiffies;
964 
965 	return TRUE;
966 }
967 
968 static void drbd_flush(struct drbd_conf *mdev)
969 {
970 	int rv;
971 
972 	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
973 		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
974 					NULL);
975 		if (rv) {
976 			dev_err(DEV, "local disk flush failed with status %d\n", rv);
977 			/* would rather check on EOPNOTSUPP, but that is not reliable.
978 			 * don't try again for ANY return value != 0
979 			 * if (rv == -EOPNOTSUPP) */
980 			drbd_bump_write_ordering(mdev, WO_drain_io);
981 		}
982 		put_ldev(mdev);
983 	}
984 }
985 
986 /**
987  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, and possibly finishes it.
988  * @mdev:	DRBD device.
989  * @epoch:	Epoch object.
990  * @ev:		Epoch event.
991  */
992 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
993 					       struct drbd_epoch *epoch,
994 					       enum epoch_event ev)
995 {
996 	int epoch_size;
997 	struct drbd_epoch *next_epoch;
998 	enum finish_epoch rv = FE_STILL_LIVE;
999 
1000 	spin_lock(&mdev->epoch_lock);
1001 	do {
1002 		next_epoch = NULL;
1003 
1004 		epoch_size = atomic_read(&epoch->epoch_size);
1005 
1006 		switch (ev & ~EV_CLEANUP) {
1007 		case EV_PUT:
1008 			atomic_dec(&epoch->active);
1009 			break;
1010 		case EV_GOT_BARRIER_NR:
1011 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1012 			break;
1013 		case EV_BECAME_LAST:
1014 			/* nothing to do*/
1015 			break;
1016 		}
1017 
1018 		if (epoch_size != 0 &&
1019 		    atomic_read(&epoch->active) == 0 &&
1020 		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1021 			if (!(ev & EV_CLEANUP)) {
1022 				spin_unlock(&mdev->epoch_lock);
1023 				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1024 				spin_lock(&mdev->epoch_lock);
1025 			}
1026 			dec_unacked(mdev);
1027 
1028 			if (mdev->current_epoch != epoch) {
1029 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1030 				list_del(&epoch->list);
1031 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1032 				mdev->epochs--;
1033 				kfree(epoch);
1034 
1035 				if (rv == FE_STILL_LIVE)
1036 					rv = FE_DESTROYED;
1037 			} else {
1038 				epoch->flags = 0;
1039 				atomic_set(&epoch->epoch_size, 0);
1040 				/* atomic_set(&epoch->active, 0); is already zero */
1041 				if (rv == FE_STILL_LIVE)
1042 					rv = FE_RECYCLED;
1043 				wake_up(&mdev->ee_wait);
1044 			}
1045 		}
1046 
1047 		if (!next_epoch)
1048 			break;
1049 
1050 		epoch = next_epoch;
1051 	} while (1);
1052 
1053 	spin_unlock(&mdev->epoch_lock);
1054 
1055 	return rv;
1056 }
1057 
1058 /**
1059  * drbd_bump_write_ordering() - Fall back to another write ordering method
1060  * @mdev:	DRBD device.
1061  * @wo:		Write ordering method to try.
1062  */
1063 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1064 {
1065 	enum write_ordering_e pwo;
1066 	static char *write_ordering_str[] = {
1067 		[WO_none] = "none",
1068 		[WO_drain_io] = "drain",
1069 		[WO_bdev_flush] = "flush",
1070 	};
1071 
1072 	pwo = mdev->write_ordering;
1073 	wo = min(pwo, wo);
1074 	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1075 		wo = WO_drain_io;
1076 	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1077 		wo = WO_none;
1078 	mdev->write_ordering = wo;
1079 	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1080 		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1081 }
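
/* Note: because of the min(pwo, wo) above, this function can only ever lower
 * the write ordering method (flush -> drain -> none), never raise it. */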
1082 
1083 /**
1084  * drbd_submit_ee() - Submit an epoch entry to the local backing device as one or more bios
1085  * @mdev:	DRBD device.
1086  * @e:		epoch entry
1087  * @rw:		flag field, see bio->bi_rw
1088  */
1089 /* TODO allocate from our own bio_set. */
1090 int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1091 		const unsigned rw, const int fault_type)
1092 {
1093 	struct bio *bios = NULL;
1094 	struct bio *bio;
1095 	struct page *page = e->pages;
1096 	sector_t sector = e->sector;
1097 	unsigned ds = e->size;
1098 	unsigned n_bios = 0;
1099 	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1100 
1101 	/* In most cases, we will only need one bio.  But in case the lower
1102 	 * level restrictions happen to be different at this offset on this
1103 	 * side than those of the sending peer, we may need to submit the
1104 	 * request in more than one bio. */
1105 next_bio:
1106 	bio = bio_alloc(GFP_NOIO, nr_pages);
1107 	if (!bio) {
1108 		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1109 		goto fail;
1110 	}
1111 	/* > e->sector, unless this is the first bio */
1112 	bio->bi_sector = sector;
1113 	bio->bi_bdev = mdev->ldev->backing_bdev;
1114 	/* we special case some flags in the multi-bio case, see below
1115 	 * (REQ_UNPLUG) */
1116 	bio->bi_rw = rw;
1117 	bio->bi_private = e;
1118 	bio->bi_end_io = drbd_endio_sec;
1119 
1120 	bio->bi_next = bios;
1121 	bios = bio;
1122 	++n_bios;
1123 
1124 	page_chain_for_each(page) {
1125 		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1126 		if (!bio_add_page(bio, page, len, 0)) {
1127 			/* a single page must always be possible! */
1128 			BUG_ON(bio->bi_vcnt == 0);
1129 			goto next_bio;
1130 		}
1131 		ds -= len;
1132 		sector += len >> 9;
1133 		--nr_pages;
1134 	}
1135 	D_ASSERT(page == NULL);
1136 	D_ASSERT(ds == 0);
1137 
1138 	atomic_set(&e->pending_bios, n_bios);
1139 	do {
1140 		bio = bios;
1141 		bios = bios->bi_next;
1142 		bio->bi_next = NULL;
1143 
1144 		/* strip off REQ_UNPLUG unless it is the last bio */
1145 		if (bios)
1146 			bio->bi_rw &= ~REQ_UNPLUG;
1147 
1148 		drbd_generic_make_request(mdev, fault_type, bio);
1149 	} while (bios);
1150 	maybe_kick_lo(mdev);
1151 	return 0;
1152 
1153 fail:
1154 	while (bios) {
1155 		bio = bios;
1156 		bios = bios->bi_next;
1157 		bio_put(bio);
1158 	}
1159 	return -ENOMEM;
1160 }
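
/* Note on the accounting above: e->pending_bios is set to the total number
 * of bios *before* any of them is submitted, so the completion handler
 * (drbd_endio_sec, set as bi_end_io above) can reliably recognize the last
 * completing bio of this epoch entry, even if completions race with the
 * submission loop. */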
1161 
1162 static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1163 {
1164 	int rv;
1165 	struct p_barrier *p = &mdev->data.rbuf.barrier;
1166 	struct drbd_epoch *epoch;
1167 
1168 	inc_unacked(mdev);
1169 
1170 	if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1171 		drbd_kick_lo(mdev);
1172 
1173 	mdev->current_epoch->barrier_nr = p->barrier;
1174 	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1175 
1176 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1177 	 * the activity log, which means it would not be resynced in case the
1178 	 * R_PRIMARY crashes now.
1179 	 * Therefore we must send the barrier_ack after the barrier request was
1180 	 * completed. */
1181 	switch (mdev->write_ordering) {
1182 	case WO_none:
1183 		if (rv == FE_RECYCLED)
1184 			return TRUE;
1185 
1186 		/* receiver context, in the writeout path of the other node.
1187 		 * avoid potential distributed deadlock */
1188 		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1189 		if (epoch)
1190 			break;
1191 		else
1192 			dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1193 			/* Fall through */
1194 
1195 	case WO_bdev_flush:
1196 	case WO_drain_io:
1197 		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1198 		drbd_flush(mdev);
1199 
1200 		if (atomic_read(&mdev->current_epoch->epoch_size)) {
1201 			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1202 			if (epoch)
1203 				break;
1204 		}
1205 
1206 		epoch = mdev->current_epoch;
1207 		wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1208 
1209 		D_ASSERT(atomic_read(&epoch->active) == 0);
1210 		D_ASSERT(epoch->flags == 0);
1211 
1212 		return TRUE;
1213 	default:
1214 		dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1215 		return FALSE;
1216 	}
1217 
1218 	epoch->flags = 0;
1219 	atomic_set(&epoch->epoch_size, 0);
1220 	atomic_set(&epoch->active, 0);
1221 
1222 	spin_lock(&mdev->epoch_lock);
1223 	if (atomic_read(&mdev->current_epoch->epoch_size)) {
1224 		list_add(&epoch->list, &mdev->current_epoch->list);
1225 		mdev->current_epoch = epoch;
1226 		mdev->epochs++;
1227 	} else {
1228 		/* The current_epoch got recycled while we allocated this one... */
1229 		kfree(epoch);
1230 	}
1231 	spin_unlock(&mdev->epoch_lock);
1232 
1233 	return TRUE;
1234 }
1235 
1236 /* used from receive_RSDataReply (recv_resync_read)
1237  * and from receive_Data */
1238 static struct drbd_epoch_entry *
1239 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1240 {
1241 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1242 	struct drbd_epoch_entry *e;
1243 	struct page *page;
1244 	int dgs, ds, rr;
1245 	void *dig_in = mdev->int_dig_in;
1246 	void *dig_vv = mdev->int_dig_vv;
1247 	unsigned long *data;
1248 
1249 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1250 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1251 
1252 	if (dgs) {
1253 		rr = drbd_recv(mdev, dig_in, dgs);
1254 		if (rr != dgs) {
1255 			dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1256 			     rr, dgs);
1257 			return NULL;
1258 		}
1259 	}
1260 
1261 	data_size -= dgs;
1262 
1263 	ERR_IF(data_size &  0x1ff) return NULL;
1264 	ERR_IF(data_size >  DRBD_MAX_SEGMENT_SIZE) return NULL;
1265 
1266 	/* even though we trust our peer,
1267 	 * we sometimes have to double check. */
1268 	if (sector + (data_size>>9) > capacity) {
1269 		dev_err(DEV, "capacity: %llus < sector: %llus + size: %u\n",
1270 			(unsigned long long)capacity,
1271 			(unsigned long long)sector, data_size);
1272 		return NULL;
1273 	}
1274 
1275 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1276 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1277 	 * which in turn might block on the other node at this very place.  */
1278 	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1279 	if (!e)
1280 		return NULL;
1281 
1282 	ds = data_size;
1283 	page = e->pages;
1284 	page_chain_for_each(page) {
1285 		unsigned len = min_t(int, ds, PAGE_SIZE);
1286 		data = kmap(page);
1287 		rr = drbd_recv(mdev, data, len);
1288 		if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
1289 			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1290 			data[0] = data[0] ^ (unsigned long)-1;
1291 		}
1292 		kunmap(page);
1293 		if (rr != len) {
1294 			drbd_free_ee(mdev, e);
1295 			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1296 			     rr, len);
1297 			return NULL;
1298 		}
1299 		ds -= rr;
1300 	}
1301 
1302 	if (dgs) {
1303 		drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1304 		if (memcmp(dig_in, dig_vv, dgs)) {
1305 			dev_err(DEV, "Digest integrity check FAILED.\n");
1306 			drbd_bcast_ee(mdev, "digest failed",
1307 					dgs, dig_in, dig_vv, e);
1308 			drbd_free_ee(mdev, e);
1309 			return NULL;
1310 		}
1311 	}
1312 	mdev->recv_cnt += data_size>>9;
1313 	return e;
1314 }
1315 
1316 /* drbd_drain_block() just takes a data block
1317  * out of the socket input buffer, and discards it.
1318  */
1319 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1320 {
1321 	struct page *page;
1322 	int rr, rv = 1;
1323 	void *data;
1324 
1325 	if (!data_size)
1326 		return TRUE;
1327 
1328 	page = drbd_pp_alloc(mdev, 1, 1);
1329 
1330 	data = kmap(page);
1331 	while (data_size) {
1332 		rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1333 		if (rr != min_t(int, data_size, PAGE_SIZE)) {
1334 			rv = 0;
1335 			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1336 			     rr, min_t(int, data_size, PAGE_SIZE));
1337 			break;
1338 		}
1339 		data_size -= rr;
1340 	}
1341 	kunmap(page);
1342 	drbd_pp_free(mdev, page, 0);
1343 	return rv;
1344 }
1345 
1346 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1347 			   sector_t sector, int data_size)
1348 {
1349 	struct bio_vec *bvec;
1350 	struct bio *bio;
1351 	int dgs, rr, i, expect;
1352 	void *dig_in = mdev->int_dig_in;
1353 	void *dig_vv = mdev->int_dig_vv;
1354 
1355 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1356 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1357 
1358 	if (dgs) {
1359 		rr = drbd_recv(mdev, dig_in, dgs);
1360 		if (rr != dgs) {
1361 			dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1362 			     rr, dgs);
1363 			return 0;
1364 		}
1365 	}
1366 
1367 	data_size -= dgs;
1368 
1369 	/* optimistically update recv_cnt.  if receiving fails below,
1370 	 * we disconnect anyways, and counters will be reset. */
1371 	mdev->recv_cnt += data_size>>9;
1372 
1373 	bio = req->master_bio;
1374 	D_ASSERT(sector == bio->bi_sector);
1375 
1376 	bio_for_each_segment(bvec, bio, i) {
1377 		expect = min_t(int, data_size, bvec->bv_len);
1378 		rr = drbd_recv(mdev,
1379 			     kmap(bvec->bv_page)+bvec->bv_offset,
1380 			     expect);
1381 		kunmap(bvec->bv_page);
1382 		if (rr != expect) {
1383 			dev_warn(DEV, "short read receiving data reply: "
1384 			     "read %d expected %d\n",
1385 			     rr, expect);
1386 			return 0;
1387 		}
1388 		data_size -= rr;
1389 	}
1390 
1391 	if (dgs) {
1392 		drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1393 		if (memcmp(dig_in, dig_vv, dgs)) {
1394 			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1395 			return 0;
1396 		}
1397 	}
1398 
1399 	D_ASSERT(data_size == 0);
1400 	return 1;
1401 }
1402 
1403 /* e_end_resync_block() is called via
1404  * drbd_process_done_ee() by asender only */
1405 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1406 {
1407 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1408 	sector_t sector = e->sector;
1409 	int ok;
1410 
1411 	D_ASSERT(hlist_unhashed(&e->colision));
1412 
1413 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1414 		drbd_set_in_sync(mdev, sector, e->size);
1415 		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1416 	} else {
1417 		/* Record failure to sync */
1418 		drbd_rs_failed_io(mdev, sector, e->size);
1419 
1420 		ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1421 	}
1422 	dec_unacked(mdev);
1423 
1424 	return ok;
1425 }
1426 
1427 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1428 {
1429 	struct drbd_epoch_entry *e;
1430 
1431 	e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1432 	if (!e)
1433 		goto fail;
1434 
1435 	dec_rs_pending(mdev);
1436 
1437 	inc_unacked(mdev);
1438 	/* corresponding dec_unacked() in e_end_resync_block(),
1439 	 * respectively in _drbd_clear_done_ee */
1440 
1441 	e->w.cb = e_end_resync_block;
1442 
1443 	spin_lock_irq(&mdev->req_lock);
1444 	list_add(&e->w.list, &mdev->sync_ee);
1445 	spin_unlock_irq(&mdev->req_lock);
1446 
1447 	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1448 	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1449 		return TRUE;
1450 
1451 	/* drbd_submit_ee currently fails for one reason only:
1452 	 * not being able to allocate enough bios.
1453 	 * Is dropping the connection going to help? */
1454 	spin_lock_irq(&mdev->req_lock);
1455 	list_del(&e->w.list);
1456 	spin_unlock_irq(&mdev->req_lock);
1457 
1458 	drbd_free_ee(mdev, e);
1459 fail:
1460 	put_ldev(mdev);
1461 	return FALSE;
1462 }
1463 
1464 static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1465 {
1466 	struct drbd_request *req;
1467 	sector_t sector;
1468 	int ok;
1469 	struct p_data *p = &mdev->data.rbuf.data;
1470 
1471 	sector = be64_to_cpu(p->sector);
1472 
1473 	spin_lock_irq(&mdev->req_lock);
1474 	req = _ar_id_to_req(mdev, p->block_id, sector);
1475 	spin_unlock_irq(&mdev->req_lock);
1476 	if (unlikely(!req)) {
1477 		dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1478 		return FALSE;
1479 	}
1480 
1481 	/* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1482 	 * special casing it there for the various failure cases.
1483 	 * still no race with drbd_fail_pending_reads */
1484 	ok = recv_dless_read(mdev, req, sector, data_size);
1485 
1486 	if (ok)
1487 		req_mod(req, data_received);
1488 	/* else: nothing. handled from drbd_disconnect...
1489 	 * I don't think we may complete this just yet
1490 	 * in case we are "on-disconnect: freeze" */
1491 
1492 	return ok;
1493 }
1494 
1495 static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1496 {
1497 	sector_t sector;
1498 	int ok;
1499 	struct p_data *p = &mdev->data.rbuf.data;
1500 
1501 	sector = be64_to_cpu(p->sector);
1502 	D_ASSERT(p->block_id == ID_SYNCER);
1503 
1504 	if (get_ldev(mdev)) {
1505 		/* data is submitted to disk within recv_resync_read.
1506 		 * corresponding put_ldev done below on error,
1507 		 * or in drbd_endio_write_sec. */
1508 		ok = recv_resync_read(mdev, sector, data_size);
1509 	} else {
1510 		if (__ratelimit(&drbd_ratelimit_state))
1511 			dev_err(DEV, "Can not write resync data to local disk.\n");
1512 
1513 		ok = drbd_drain_block(mdev, data_size);
1514 
1515 		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1516 	}
1517 
1518 	atomic_add(data_size >> 9, &mdev->rs_sect_in);
1519 
1520 	return ok;
1521 }
1522 
1523 /* e_end_block() is called via drbd_process_done_ee().
1524  * this means this function only runs in the asender thread
1525  */
1526 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1527 {
1528 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1529 	sector_t sector = e->sector;
1530 	int ok = 1, pcmd;
1531 
1532 	if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1533 		if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1534 			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1535 				mdev->state.conn <= C_PAUSED_SYNC_T &&
1536 				e->flags & EE_MAY_SET_IN_SYNC) ?
1537 				P_RS_WRITE_ACK : P_WRITE_ACK;
1538 			ok &= drbd_send_ack(mdev, pcmd, e);
1539 			if (pcmd == P_RS_WRITE_ACK)
1540 				drbd_set_in_sync(mdev, sector, e->size);
1541 		} else {
1542 			ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1543 			/* we expect it to be marked out of sync anyways...
1544 			 * maybe assert this?  */
1545 		}
1546 		dec_unacked(mdev);
1547 	}
1548 	/* we delete from the conflict detection hash _after_ we sent out the
1549 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1550 	if (mdev->net_conf->two_primaries) {
1551 		spin_lock_irq(&mdev->req_lock);
1552 		D_ASSERT(!hlist_unhashed(&e->colision));
1553 		hlist_del_init(&e->colision);
1554 		spin_unlock_irq(&mdev->req_lock);
1555 	} else {
1556 		D_ASSERT(hlist_unhashed(&e->colision));
1557 	}
1558 
1559 	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1560 
1561 	return ok;
1562 }
1563 
1564 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1565 {
1566 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1567 	int ok = 1;
1568 
1569 	D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1570 	ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1571 
1572 	spin_lock_irq(&mdev->req_lock);
1573 	D_ASSERT(!hlist_unhashed(&e->colision));
1574 	hlist_del_init(&e->colision);
1575 	spin_unlock_irq(&mdev->req_lock);
1576 
1577 	dec_unacked(mdev);
1578 
1579 	return ok;
1580 }
1581 
1582 /* Called from receive_Data.
1583  * Synchronize packets on sock with packets on msock.
1584  *
1585  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1586  * packet traveling on msock, they are still processed in the order they have
1587  * been sent.
1588  *
1589  * Note: we don't care for Ack packets overtaking P_DATA packets.
1590  *
1591  * In case packet_seq is larger than mdev->peer_seq, there are
1592  * outstanding packets on the msock. We wait for them to arrive.
1593  * In case this is the logically next packet, we update mdev->peer_seq
1594  * ourselves. Correctly handles 32bit wrap around.
1595  *
1596  * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
1597  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1598  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1599  * 1<<11 == 2048 seconds, aka ages, for the 32bit wrap around...
1600  *
1601  * returns 0 if we may process the packet,
1602  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1603 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1604 {
1605 	DEFINE_WAIT(wait);
1606 	unsigned int p_seq;
1607 	long timeout;
1608 	int ret = 0;
1609 	spin_lock(&mdev->peer_seq_lock);
1610 	for (;;) {
1611 		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1612 		if (seq_le(packet_seq, mdev->peer_seq+1))
1613 			break;
1614 		if (signal_pending(current)) {
1615 			ret = -ERESTARTSYS;
1616 			break;
1617 		}
1618 		p_seq = mdev->peer_seq;
1619 		spin_unlock(&mdev->peer_seq_lock);
1620 		timeout = schedule_timeout(30*HZ);
1621 		spin_lock(&mdev->peer_seq_lock);
1622 		if (timeout == 0 && p_seq == mdev->peer_seq) {
1623 			ret = -ETIMEDOUT;
1624 			dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1625 			break;
1626 		}
1627 	}
1628 	finish_wait(&mdev->seq_wait, &wait);
1629 	if (mdev->peer_seq+1 == packet_seq)
1630 		mdev->peer_seq++;
1631 	spin_unlock(&mdev->peer_seq_lock);
1632 	return ret;
1633 }
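
/* Worked example for the wait above: if mdev->peer_seq is 7 and a data packet
 * with seq_num 9 arrives, seq_le(9, 8) is false, so we sleep until the packet
 * with seq 8 has been processed (which advances peer_seq to 8); then
 * seq_le(9, 9) holds, we return 0 and advance peer_seq to 9 ourselves.
 * seq_le() is assumed to compare sequence numbers modulo 2^32, which is what
 * makes this work across the wrap-around discussed above. */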
1634 
1635 static unsigned long write_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1636 {
1637 	if (mdev->agreed_pro_version >= 95)
1638 		return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1639 			(dpf & DP_UNPLUG ? REQ_UNPLUG : 0) |
1640 			(dpf & DP_FUA ? REQ_FUA : 0) |
1641 			(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1642 			(dpf & DP_DISCARD ? REQ_DISCARD : 0);
1643 	else
1644 		return dpf & DP_RW_SYNC ? (REQ_SYNC | REQ_UNPLUG) : 0;
1645 }
1646 
1647 /* mirrored write */
1648 static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1649 {
1650 	sector_t sector;
1651 	struct drbd_epoch_entry *e;
1652 	struct p_data *p = &mdev->data.rbuf.data;
1653 	int rw = WRITE;
1654 	u32 dp_flags;
1655 
1656 	if (!get_ldev(mdev)) {
1657 		if (__ratelimit(&drbd_ratelimit_state))
1658 			dev_err(DEV, "Can not write mirrored data block "
1659 			    "to local disk.\n");
1660 		spin_lock(&mdev->peer_seq_lock);
1661 		if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1662 			mdev->peer_seq++;
1663 		spin_unlock(&mdev->peer_seq_lock);
1664 
1665 		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1666 		atomic_inc(&mdev->current_epoch->epoch_size);
1667 		return drbd_drain_block(mdev, data_size);
1668 	}
1669 
1670 	/* get_ldev(mdev) successful.
1671 	 * Corresponding put_ldev done either below (on various errors),
1672 	 * or in drbd_endio_write_sec, if we successfully submit the data at
1673 	 * the end of this function. */
1674 
1675 	sector = be64_to_cpu(p->sector);
1676 	e = read_in_block(mdev, p->block_id, sector, data_size);
1677 	if (!e) {
1678 		put_ldev(mdev);
1679 		return FALSE;
1680 	}
1681 
1682 	e->w.cb = e_end_block;
1683 
1684 	spin_lock(&mdev->epoch_lock);
1685 	e->epoch = mdev->current_epoch;
1686 	atomic_inc(&e->epoch->epoch_size);
1687 	atomic_inc(&e->epoch->active);
1688 	spin_unlock(&mdev->epoch_lock);
1689 
1690 	dp_flags = be32_to_cpu(p->dp_flags);
1691 	rw |= write_flags_to_bio(mdev, dp_flags);
1692 
1693 	if (dp_flags & DP_MAY_SET_IN_SYNC)
1694 		e->flags |= EE_MAY_SET_IN_SYNC;
1695 
1696 	/* I'm the receiver, I do hold a net_cnt reference. */
1697 	if (!mdev->net_conf->two_primaries) {
1698 		spin_lock_irq(&mdev->req_lock);
1699 	} else {
1700 		/* don't get the req_lock yet,
1701 		 * we may sleep in drbd_wait_peer_seq */
1702 		const int size = e->size;
1703 		const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1704 		DEFINE_WAIT(wait);
1705 		struct drbd_request *i;
1706 		struct hlist_node *n;
1707 		struct hlist_head *slot;
1708 		int first;
1709 
1710 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1711 		BUG_ON(mdev->ee_hash == NULL);
1712 		BUG_ON(mdev->tl_hash == NULL);
1713 
1714 		/* conflict detection and handling:
1715 		 * 1. wait on the sequence number,
1716 		 *    in case this data packet overtook ACK packets.
1717 		 * 2. check our hash tables for conflicting requests.
1718 		 *    we only need to walk the tl_hash, since an ee cannot
1719 		 *    have a conflict with another ee: on the submitting
1720 		 *    node, the corresponding req had already been conflicting,
1721 		 *    and a conflicting req is never sent.
1722 		 *
1723 		 * Note: for two_primaries, we are protocol C,
1724 		 * so there cannot be any request that is DONE
1725 		 * but still on the transfer log.
1726 		 *
1727 		 * unconditionally add to the ee_hash.
1728 		 *
1729 		 * if no conflicting request is found:
1730 		 *    submit.
1731 		 *
1732 		 * if any conflicting request is found
1733 		 * that has not yet been acked,
1734 		 * AND I have the "discard concurrent writes" flag:
1735 		 *	 queue (via done_ee) the P_DISCARD_ACK; OUT.
1736 		 *
1737 		 * if any conflicting request is found:
1738 		 *	 block the receiver, waiting on misc_wait
1739 		 *	 until no more conflicting requests are there,
1740 		 *	 or we get interrupted (disconnect).
1741 		 *
1742 		 *	 we do not just write after local io completion of those
1743 		 *	 requests, but only after req is done completely, i.e.
1744 		 *	 we wait for the P_DISCARD_ACK to arrive!
1745 		 *
1746 		 *	 then proceed normally, i.e. submit.
1747 		 */
1748 		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1749 			goto out_interrupted;
1750 
1751 		spin_lock_irq(&mdev->req_lock);
1752 
1753 		hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1754 
1755 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1756 		slot = tl_hash_slot(mdev, sector);
1757 		first = 1;
1758 		for (;;) {
1759 			int have_unacked = 0;
1760 			int have_conflict = 0;
1761 			prepare_to_wait(&mdev->misc_wait, &wait,
1762 				TASK_INTERRUPTIBLE);
1763 			hlist_for_each_entry(i, n, slot, colision) {
1764 				if (OVERLAPS) {
1765 					/* only ALERT on first iteration,
1766 					 * we may be woken up early... */
1767 					if (first)
1768 						dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1769 						      "	new: %llus +%u; pending: %llus +%u\n",
1770 						      current->comm, current->pid,
1771 						      (unsigned long long)sector, size,
1772 						      (unsigned long long)i->sector, i->size);
1773 					if (i->rq_state & RQ_NET_PENDING)
1774 						++have_unacked;
1775 					++have_conflict;
1776 				}
1777 			}
1778 #undef OVERLAPS
1779 			if (!have_conflict)
1780 				break;
1781 
1782 			/* Discard Ack only for the _first_ iteration */
1783 			if (first && discard && have_unacked) {
1784 				dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1785 				     (unsigned long long)sector);
1786 				inc_unacked(mdev);
1787 				e->w.cb = e_send_discard_ack;
1788 				list_add_tail(&e->w.list, &mdev->done_ee);
1789 
1790 				spin_unlock_irq(&mdev->req_lock);
1791 
1792 				/* we could probably send that P_DISCARD_ACK ourselves,
1793 				 * but I don't like the receiver using the msock */
1794 
1795 				put_ldev(mdev);
1796 				wake_asender(mdev);
1797 				finish_wait(&mdev->misc_wait, &wait);
1798 				return TRUE;
1799 			}
1800 
1801 			if (signal_pending(current)) {
1802 				hlist_del_init(&e->colision);
1803 
1804 				spin_unlock_irq(&mdev->req_lock);
1805 
1806 				finish_wait(&mdev->misc_wait, &wait);
1807 				goto out_interrupted;
1808 			}
1809 
1810 			spin_unlock_irq(&mdev->req_lock);
1811 			if (first) {
1812 				first = 0;
1813 				dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1814 				     "sec=%llus\n", (unsigned long long)sector);
1815 			} else if (discard) {
1816 				/* we had none on the first iteration.
1817 				 * there must be none now. */
1818 				D_ASSERT(have_unacked == 0);
1819 			}
1820 			schedule();
1821 			spin_lock_irq(&mdev->req_lock);
1822 		}
1823 		finish_wait(&mdev->misc_wait, &wait);
1824 	}
1825 
1826 	list_add(&e->w.list, &mdev->active_ee);
1827 	spin_unlock_irq(&mdev->req_lock);
1828 
1829 	switch (mdev->net_conf->wire_protocol) {
1830 	case DRBD_PROT_C:
1831 		inc_unacked(mdev);
1832 		/* corresponding dec_unacked() in e_end_block(),
1833 		 * respectively in _drbd_clear_done_ee */
1834 		break;
1835 	case DRBD_PROT_B:
1836 		/* I really don't like it that the receiver thread
1837 		 * sends on the msock, but anyways */
1838 		drbd_send_ack(mdev, P_RECV_ACK, e);
1839 		break;
1840 	case DRBD_PROT_A:
1841 		/* nothing to do */
1842 		break;
1843 	}
1844 
1845 	if (mdev->state.pdsk < D_INCONSISTENT) {
1846 		/* In case we have the only disk of the cluster, */
1847 		drbd_set_out_of_sync(mdev, e->sector, e->size);
1848 		e->flags |= EE_CALL_AL_COMPLETE_IO;
1849 		e->flags &= ~EE_MAY_SET_IN_SYNC;
1850 		drbd_al_begin_io(mdev, e->sector);
1851 	}
1852 
1853 	if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
1854 		return TRUE;
1855 
1856 	/* drbd_submit_ee currently fails for one reason only:
1857 	 * not being able to allocate enough bios.
1858 	 * Is dropping the connection going to help? */
1859 	spin_lock_irq(&mdev->req_lock);
1860 	list_del(&e->w.list);
1861 	hlist_del_init(&e->colision);
1862 	spin_unlock_irq(&mdev->req_lock);
1863 	if (e->flags & EE_CALL_AL_COMPLETE_IO)
1864 		drbd_al_complete_io(mdev, e->sector);
1865 
1866 out_interrupted:
1867 	/* yes, the epoch_size now is imbalanced.
1868 	 * but we drop the connection anyways, so we don't have a chance to
1869 	 * receive a barrier... atomic_inc(&mdev->epoch_size); */
1870 	put_ldev(mdev);
1871 	drbd_free_ee(mdev, e);
1872 	return FALSE;
1873 }
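
/* Editor's note: an illustrative, standalone sketch (not driver code) of the
 * ack policy chosen by the wire_protocol switch in receive_Data() above:
 * protocol A sends no ack from the receiver, protocol B acks on receipt
 * (P_RECV_ACK), protocol C acks only once the write has completed locally
 * (that ack is sent later, from e_end_block()).  All names below are
 * illustrative stand-ins. */
enum example_wire_proto { EX_PROT_A, EX_PROT_B, EX_PROT_C };
enum example_ack_policy { EX_ACK_NONE, EX_ACK_ON_RECEIPT, EX_ACK_ON_WRITE_COMPLETION };

static enum example_ack_policy example_ack_policy_for(enum example_wire_proto p)
{
	switch (p) {
	case EX_PROT_A:	return EX_ACK_NONE;			/* fire and forget */
	case EX_PROT_B:	return EX_ACK_ON_RECEIPT;		/* ack when the data arrived */
	case EX_PROT_C:	return EX_ACK_ON_WRITE_COMPLETION;	/* ack when it hit the disk */
	}
	return EX_ACK_NONE;
}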
1874 
1875 /* We may throttle resync, if the lower device seems to be busy,
1876  * and current sync rate is above c_min_rate.
1877  *
1878  * To decide whether or not the lower device is busy, we use a scheme similar
1879  * to MD RAID's is_mddev_idle(): if the partition stats reveal a "significant"
1880  * amount (more than 64 sectors) of activity that we cannot account for with
1881  * our own resync activity, the device obviously is "busy".
1882  *
1883  * The current sync rate used here is computed from only the most recent two
1884  * step marks, to get a short-time average so we can react faster.
1885  */
1886 int drbd_rs_should_slow_down(struct drbd_conf *mdev)
1887 {
1888 	struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1889 	unsigned long db, dt, dbdt;
1890 	int curr_events;
1891 	int throttle = 0;
1892 
1893 	/* feature disabled? */
1894 	if (mdev->sync_conf.c_min_rate == 0)
1895 		return 0;
1896 
1897 	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
1898 		      (int)part_stat_read(&disk->part0, sectors[1]) -
1899 			atomic_read(&mdev->rs_sect_ev);
1900 	if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
1901 		unsigned long rs_left;
1902 		int i;
1903 
1904 		mdev->rs_last_events = curr_events;
1905 
1906 		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
1907 		 * approx. */
1908 		i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-2) % DRBD_SYNC_MARKS;
1909 		rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
1910 
1911 		dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
1912 		if (!dt)
1913 			dt++;
1914 		db = mdev->rs_mark_left[i] - rs_left;
1915 		dbdt = Bit2KB(db/dt);
1916 
1917 		if (dbdt > mdev->sync_conf.c_min_rate)
1918 			throttle = 1;
1919 	}
1920 	return throttle;
1921 }
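
/* Editor's note: a standalone sketch of the rate check performed above,
 * assuming the usual 4 KiB of data per bitmap bit (so Bit2KB(x) == x * 4).
 * If the bits cleared since the sync mark taken roughly two steps ago,
 * divided by the elapsed seconds, exceed c_min_rate (kB/s), the resync is
 * "fast enough" to be throttled while the backing device looks busy.
 * Illustrative only, not driver code. */
static int example_should_throttle(unsigned long bits_left_at_mark,
				   unsigned long bits_left_now,
				   unsigned long seconds_since_mark,
				   unsigned long c_min_rate_kb)
{
	unsigned long db, dt, dbdt;

	dt = seconds_since_mark ? seconds_since_mark : 1;	/* avoid div by zero */
	db = bits_left_at_mark - bits_left_now;			/* bits resynced since the mark */
	dbdt = (db / dt) * 4;					/* bits/s -> kB/s, 4 KiB per bit */
	return dbdt > c_min_rate_kb;
}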
1922 
1923 
1924 static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
1925 {
1926 	sector_t sector;
1927 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1928 	struct drbd_epoch_entry *e;
1929 	struct digest_info *di = NULL;
1930 	int size, verb;
1931 	unsigned int fault_type;
1932 	struct p_block_req *p =	&mdev->data.rbuf.block_req;
1933 
1934 	sector = be64_to_cpu(p->sector);
1935 	size   = be32_to_cpu(p->blksize);
1936 
1937 	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
1938 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1939 				(unsigned long long)sector, size);
1940 		return FALSE;
1941 	}
1942 	if (sector + (size>>9) > capacity) {
1943 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1944 				(unsigned long long)sector, size);
1945 		return FALSE;
1946 	}
1947 
1948 	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1949 		verb = 1;
1950 		switch (cmd) {
1951 		case P_DATA_REQUEST:
1952 			drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
1953 			break;
1954 		case P_RS_DATA_REQUEST:
1955 		case P_CSUM_RS_REQUEST:
1956 		case P_OV_REQUEST:
1957 			drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
1958 			break;
1959 		case P_OV_REPLY:
1960 			verb = 0;
1961 			dec_rs_pending(mdev);
1962 			drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
1963 			break;
1964 		default:
1965 			dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
1966 				cmdname(cmd));
1967 		}
1968 		if (verb && __ratelimit(&drbd_ratelimit_state))
1969 			dev_err(DEV, "Can not satisfy peer's read request, "
1970 			    "no local data.\n");
1971 
1972 		/* drain the payload, if there is any */
1973 		return drbd_drain_block(mdev, digest_size);
1974 	}
1975 
1976 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1977 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1978 	 * which in turn might block on the other node at this very place.  */
1979 	e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
1980 	if (!e) {
1981 		put_ldev(mdev);
1982 		return FALSE;
1983 	}
1984 
1985 	switch (cmd) {
1986 	case P_DATA_REQUEST:
1987 		e->w.cb = w_e_end_data_req;
1988 		fault_type = DRBD_FAULT_DT_RD;
1989 		/* application IO, don't drbd_rs_begin_io */
1990 		goto submit;
1991 
1992 	case P_RS_DATA_REQUEST:
1993 		e->w.cb = w_e_end_rsdata_req;
1994 		fault_type = DRBD_FAULT_RS_RD;
1995 		break;
1996 
1997 	case P_OV_REPLY:
1998 	case P_CSUM_RS_REQUEST:
1999 		fault_type = DRBD_FAULT_RS_RD;
2000 		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2001 		if (!di)
2002 			goto out_free_e;
2003 
2004 		di->digest_size = digest_size;
2005 		di->digest = (((char *)di)+sizeof(struct digest_info));
2006 
2007 		e->digest = di;
2008 		e->flags |= EE_HAS_DIGEST;
2009 
2010 		if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2011 			goto out_free_e;
2012 
2013 		if (cmd == P_CSUM_RS_REQUEST) {
2014 			D_ASSERT(mdev->agreed_pro_version >= 89);
2015 			e->w.cb = w_e_end_csum_rs_req;
2016 		} else if (cmd == P_OV_REPLY) {
2017 			e->w.cb = w_e_end_ov_reply;
2018 			dec_rs_pending(mdev);
2019 			/* drbd_rs_begin_io done when we sent this request,
2020 			 * but accounting still needs to be done. */
2021 			goto submit_for_resync;
2022 		}
2023 		break;
2024 
2025 	case P_OV_REQUEST:
2026 		if (mdev->ov_start_sector == ~(sector_t)0 &&
2027 		    mdev->agreed_pro_version >= 90) {
2028 			mdev->ov_start_sector = sector;
2029 			mdev->ov_position = sector;
2030 			mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2031 			dev_info(DEV, "Online Verify start sector: %llu\n",
2032 					(unsigned long long)sector);
2033 		}
2034 		e->w.cb = w_e_end_ov_req;
2035 		fault_type = DRBD_FAULT_RS_RD;
2036 		break;
2037 
2038 	default:
2039 		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2040 		    cmdname(cmd));
2041 		fault_type = DRBD_FAULT_MAX;
2042 		goto out_free_e;
2043 	}
2044 
2045 	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2046 	 * wrt the receiver, but it is not as straightforward as it may seem.
2047 	 * Various places in the resync start and stop logic assume resync
2048 	 * requests are processed in order, requeuing this on the worker thread
2049 	 * introduces a bunch of new code for synchronization between threads.
2050 	 *
2051 	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2052 	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2053 	 * for application writes for the same time.  For now, just throttle
2054 	 * here, where the rest of the code expects the receiver to sleep for
2055 	 * a while, anyways.
2056 	 */
2057 
2058 	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2059 	 * this defers syncer requests for some time, before letting at least
2060 	 * one request through.  The resync controller on the receiving side
2061 	 * will adapt to the incoming rate accordingly.
2062 	 *
2063 	 * We cannot throttle here if remote is Primary/SyncTarget:
2064 	 * we would also throttle its application reads.
2065 	 * In that case, throttling is done on the SyncTarget only.
2066 	 */
2067 	if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev))
2068 		msleep(100);
2069 	if (drbd_rs_begin_io(mdev, e->sector))
2070 		goto out_free_e;
2071 
2072 submit_for_resync:
2073 	atomic_add(size >> 9, &mdev->rs_sect_ev);
2074 
2075 submit:
2076 	inc_unacked(mdev);
2077 	spin_lock_irq(&mdev->req_lock);
2078 	list_add_tail(&e->w.list, &mdev->read_ee);
2079 	spin_unlock_irq(&mdev->req_lock);
2080 
2081 	if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2082 		return TRUE;
2083 
2084 	/* drbd_submit_ee currently fails for one reason only:
2085 	 * not being able to allocate enough bios.
2086 	 * Is dropping the connection going to help? */
2087 	spin_lock_irq(&mdev->req_lock);
2088 	list_del(&e->w.list);
2089 	spin_unlock_irq(&mdev->req_lock);
2090 	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
2091 
2092 out_free_e:
2093 	put_ldev(mdev);
2094 	drbd_free_ee(mdev, e);
2095 	return FALSE;
2096 }
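
/* Editor's note: a small standalone sketch of the single-allocation pattern
 * used for struct digest_info above: header and digest payload come from one
 * allocation, and the payload pointer is simply the first byte after the
 * header.  Names are illustrative only; this is a sketch, not driver code. */
struct example_digest_buf {
	unsigned int digest_size;
	void *digest;				/* points just past this struct */
};

static struct example_digest_buf *example_digest_buf_alloc(unsigned int digest_size)
{
	struct example_digest_buf *d = kmalloc(sizeof(*d) + digest_size, GFP_NOIO);

	if (!d)
		return NULL;
	d->digest_size = digest_size;
	d->digest = (char *)d + sizeof(*d);	/* trailing payload */
	return d;
}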
2097 
2098 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2099 {
2100 	int self, peer, rv = -100;
2101 	unsigned long ch_self, ch_peer;
2102 
2103 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2104 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2105 
2106 	ch_peer = mdev->p_uuid[UI_SIZE];
2107 	ch_self = mdev->comm_bm_set;
2108 
2109 	switch (mdev->net_conf->after_sb_0p) {
2110 	case ASB_CONSENSUS:
2111 	case ASB_DISCARD_SECONDARY:
2112 	case ASB_CALL_HELPER:
2113 		dev_err(DEV, "Configuration error.\n");
2114 		break;
2115 	case ASB_DISCONNECT:
2116 		break;
2117 	case ASB_DISCARD_YOUNGER_PRI:
2118 		if (self == 0 && peer == 1) {
2119 			rv = -1;
2120 			break;
2121 		}
2122 		if (self == 1 && peer == 0) {
2123 			rv =  1;
2124 			break;
2125 		}
2126 		/* Else fall through to one of the other strategies... */
2127 	case ASB_DISCARD_OLDER_PRI:
2128 		if (self == 0 && peer == 1) {
2129 			rv = 1;
2130 			break;
2131 		}
2132 		if (self == 1 && peer == 0) {
2133 			rv = -1;
2134 			break;
2135 		}
2136 		/* Else fall through to one of the other strategies... */
2137 		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2138 		     "Using discard-least-changes instead\n");
2139 	case ASB_DISCARD_ZERO_CHG:
2140 		if (ch_peer == 0 && ch_self == 0) {
2141 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2142 				? -1 : 1;
2143 			break;
2144 		} else {
2145 			if (ch_peer == 0) { rv =  1; break; }
2146 			if (ch_self == 0) { rv = -1; break; }
2147 		}
2148 		if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2149 			break;
2150 	case ASB_DISCARD_LEAST_CHG:
2151 		if	(ch_self < ch_peer)
2152 			rv = -1;
2153 		else if (ch_self > ch_peer)
2154 			rv =  1;
2155 		else /* ( ch_self == ch_peer ) */
2156 		     /* Well, then use something else. */
2157 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2158 				? -1 : 1;
2159 		break;
2160 	case ASB_DISCARD_LOCAL:
2161 		rv = -1;
2162 		break;
2163 	case ASB_DISCARD_REMOTE:
2164 		rv =  1;
2165 	}
2166 
2167 	return rv;
2168 }
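
/* Editor's note: an illustrative standalone sketch of the
 * discard-least-changes arm above: the node that changed fewer blocks loses
 * its data (return -1 means "discard local data, sync from the peer"); a tie
 * is broken by the arbitrary but mutually agreed DISCARD_CONCURRENT flag.
 * Names are illustrative stand-ins. */
static int example_discard_least_changes(unsigned long ch_self,
					 unsigned long ch_peer,
					 int peer_wins_ties)
{
	if (ch_self < ch_peer)
		return -1;		/* we changed less: discard our data */
	if (ch_self > ch_peer)
		return 1;		/* peer changed less: discard its data */
	return peer_wins_ties ? -1 : 1;	/* equal: tie breaker decides */
}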
2169 
2170 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2171 {
2172 	int self, peer, hg, rv = -100;
2173 
2174 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2175 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2176 
2177 	switch (mdev->net_conf->after_sb_1p) {
2178 	case ASB_DISCARD_YOUNGER_PRI:
2179 	case ASB_DISCARD_OLDER_PRI:
2180 	case ASB_DISCARD_LEAST_CHG:
2181 	case ASB_DISCARD_LOCAL:
2182 	case ASB_DISCARD_REMOTE:
2183 		dev_err(DEV, "Configuration error.\n");
2184 		break;
2185 	case ASB_DISCONNECT:
2186 		break;
2187 	case ASB_CONSENSUS:
2188 		hg = drbd_asb_recover_0p(mdev);
2189 		if (hg == -1 && mdev->state.role == R_SECONDARY)
2190 			rv = hg;
2191 		if (hg == 1  && mdev->state.role == R_PRIMARY)
2192 			rv = hg;
2193 		break;
2194 	case ASB_VIOLENTLY:
2195 		rv = drbd_asb_recover_0p(mdev);
2196 		break;
2197 	case ASB_DISCARD_SECONDARY:
2198 		return mdev->state.role == R_PRIMARY ? 1 : -1;
2199 	case ASB_CALL_HELPER:
2200 		hg = drbd_asb_recover_0p(mdev);
2201 		if (hg == -1 && mdev->state.role == R_PRIMARY) {
2202 			self = drbd_set_role(mdev, R_SECONDARY, 0);
2203 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2204 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2205 			  * we do not need to wait for the after state change work either. */
2206 			self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2207 			if (self != SS_SUCCESS) {
2208 				drbd_khelper(mdev, "pri-lost-after-sb");
2209 			} else {
2210 				dev_warn(DEV, "Successfully gave up primary role.\n");
2211 				rv = hg;
2212 			}
2213 		} else
2214 			rv = hg;
2215 	}
2216 
2217 	return rv;
2218 }
2219 
2220 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2221 {
2222 	int self, peer, hg, rv = -100;
2223 
2224 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2225 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2226 
2227 	switch (mdev->net_conf->after_sb_2p) {
2228 	case ASB_DISCARD_YOUNGER_PRI:
2229 	case ASB_DISCARD_OLDER_PRI:
2230 	case ASB_DISCARD_LEAST_CHG:
2231 	case ASB_DISCARD_LOCAL:
2232 	case ASB_DISCARD_REMOTE:
2233 	case ASB_CONSENSUS:
2234 	case ASB_DISCARD_SECONDARY:
2235 		dev_err(DEV, "Configuration error.\n");
2236 		break;
2237 	case ASB_VIOLENTLY:
2238 		rv = drbd_asb_recover_0p(mdev);
2239 		break;
2240 	case ASB_DISCONNECT:
2241 		break;
2242 	case ASB_CALL_HELPER:
2243 		hg = drbd_asb_recover_0p(mdev);
2244 		if (hg == -1) {
2245 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2246 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2247 			  * we do not need to wait for the after state change work either. */
2248 			self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2249 			if (self != SS_SUCCESS) {
2250 				drbd_khelper(mdev, "pri-lost-after-sb");
2251 			} else {
2252 				dev_warn(DEV, "Successfully gave up primary role.\n");
2253 				rv = hg;
2254 			}
2255 		} else
2256 			rv = hg;
2257 	}
2258 
2259 	return rv;
2260 }
2261 
2262 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2263 			   u64 bits, u64 flags)
2264 {
2265 	if (!uuid) {
2266 		dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2267 		return;
2268 	}
2269 	dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2270 	     text,
2271 	     (unsigned long long)uuid[UI_CURRENT],
2272 	     (unsigned long long)uuid[UI_BITMAP],
2273 	     (unsigned long long)uuid[UI_HISTORY_START],
2274 	     (unsigned long long)uuid[UI_HISTORY_END],
2275 	     (unsigned long long)bits,
2276 	     (unsigned long long)flags);
2277 }
2278 
2279 /*
2280   100	after split brain try auto recover
2281     2	C_SYNC_SOURCE set BitMap
2282     1	C_SYNC_SOURCE use BitMap
2283     0	no Sync
2284    -1	C_SYNC_TARGET use BitMap
2285    -2	C_SYNC_TARGET set BitMap
2286  -100	after split brain, disconnect
2287 -1000	unrelated data
2288  */
2289 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2290 {
2291 	u64 self, peer;
2292 	int i, j;
2293 
2294 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2295 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2296 
2297 	*rule_nr = 10;
2298 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2299 		return 0;
2300 
2301 	*rule_nr = 20;
2302 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2303 	     peer != UUID_JUST_CREATED)
2304 		return -2;
2305 
2306 	*rule_nr = 30;
2307 	if (self != UUID_JUST_CREATED &&
2308 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
2309 		return 2;
2310 
2311 	if (self == peer) {
2312 		int rct, dc; /* roles at crash time */
2313 
2314 		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2315 
2316 			if (mdev->agreed_pro_version < 91)
2317 				return -1001;
2318 
2319 			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2320 			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2321 				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2322 				drbd_uuid_set_bm(mdev, 0UL);
2323 
2324 				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2325 					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2326 				*rule_nr = 34;
2327 			} else {
2328 				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2329 				*rule_nr = 36;
2330 			}
2331 
2332 			return 1;
2333 		}
2334 
2335 		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2336 
2337 			if (mdev->agreed_pro_version < 91)
2338 				return -1001;
2339 
2340 			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2341 			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2342 				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2343 
2344 				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2345 				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2346 				mdev->p_uuid[UI_BITMAP] = 0UL;
2347 
2348 				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2349 				*rule_nr = 35;
2350 			} else {
2351 				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2352 				*rule_nr = 37;
2353 			}
2354 
2355 			return -1;
2356 		}
2357 
2358 		/* Common power [off|failure] */
2359 		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2360 			(mdev->p_uuid[UI_FLAGS] & 2);
2361 		/* lowest bit is set when we were primary,
2362 		 * next bit (weight 2) is set when peer was primary */
2363 		*rule_nr = 40;
2364 
2365 		switch (rct) {
2366 		case 0: /* !self_pri && !peer_pri */ return 0;
2367 		case 1: /*  self_pri && !peer_pri */ return 1;
2368 		case 2: /* !self_pri &&  peer_pri */ return -1;
2369 		case 3: /*  self_pri &&  peer_pri */
2370 			dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2371 			return dc ? -1 : 1;
2372 		}
2373 	}
2374 
2375 	*rule_nr = 50;
2376 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2377 	if (self == peer)
2378 		return -1;
2379 
2380 	*rule_nr = 51;
2381 	peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2382 	if (self == peer) {
2383 		self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2384 		peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2385 		if (self == peer) {
2386 			/* The last P_SYNC_UUID did not get through. Undo the modifications
2387 			   made to the peer's UUIDs at its last start of resync as sync source. */
2388 
2389 			if (mdev->agreed_pro_version < 91)
2390 				return -1001;
2391 
2392 			mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2393 			mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2394 			return -1;
2395 		}
2396 	}
2397 
2398 	*rule_nr = 60;
2399 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2400 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2401 		peer = mdev->p_uuid[i] & ~((u64)1);
2402 		if (self == peer)
2403 			return -2;
2404 	}
2405 
2406 	*rule_nr = 70;
2407 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2408 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2409 	if (self == peer)
2410 		return 1;
2411 
2412 	*rule_nr = 71;
2413 	self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2414 	if (self == peer) {
2415 		self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2416 		peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2417 		if (self == peer) {
2418 			/* The last P_SYNC_UUID did not get through. Undo the modifications
2419 			   made to our UUIDs at our last start of resync as sync source. */
2420 
2421 			if (mdev->agreed_pro_version < 91)
2422 				return -1001;
2423 
2424 			_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2425 			_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2426 
2427 			dev_info(DEV, "Undid last start of resync:\n");
2428 
2429 			drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2430 				       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2431 
2432 			return 1;
2433 		}
2434 	}
2435 
2436 
2437 	*rule_nr = 80;
2438 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2439 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2440 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2441 		if (self == peer)
2442 			return 2;
2443 	}
2444 
2445 	*rule_nr = 90;
2446 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2447 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2448 	if (self == peer && self != ((u64)0))
2449 		return 100;
2450 
2451 	*rule_nr = 100;
2452 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2453 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2454 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2455 			peer = mdev->p_uuid[j] & ~((u64)1);
2456 			if (self == peer)
2457 				return -100;
2458 		}
2459 	}
2460 
2461 	return -1000;
2462 }
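
/* Editor's note: two illustrative helpers (not driver code) for the
 * conventions used by drbd_uuid_compare() above.  Bit 0 of every UUID is a
 * role flag, so equality checks mask it off; and rule 40 ("common power
 * failure") decides the sync direction from who was primary at crash time,
 * with DISCARD_CONCURRENT as the tie breaker. */
static int example_uuids_match(unsigned long long a, unsigned long long b)
{
	return (a & ~1ULL) == (b & ~1ULL);	/* ignore the "was primary" bit */
}

static int example_crashed_primaries_verdict(int self_was_primary,
					     int peer_was_primary,
					     int peer_wins_ties)
{
	int rct = (self_was_primary ? 1 : 0) + (peer_was_primary ? 2 : 0);

	switch (rct) {
	case 0:	return 0;	/* neither was primary: no resync */
	case 1:	return 1;	/* only we were primary: become sync source */
	case 2:	return -1;	/* only the peer was primary: become sync target */
	default: return peer_wins_ties ? -1 : 1;	/* both: tie breaker */
	}
}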
2463 
2464 /* drbd_sync_handshake() returns the new conn state on success, or
2465    CONN_MASK (-1) on failure.
2466  */
2467 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2468 					   enum drbd_disk_state peer_disk) __must_hold(local)
2469 {
2470 	int hg, rule_nr;
2471 	enum drbd_conns rv = C_MASK;
2472 	enum drbd_disk_state mydisk;
2473 
2474 	mydisk = mdev->state.disk;
2475 	if (mydisk == D_NEGOTIATING)
2476 		mydisk = mdev->new_state_tmp.disk;
2477 
2478 	dev_info(DEV, "drbd_sync_handshake:\n");
2479 	drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2480 	drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2481 		       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2482 
2483 	hg = drbd_uuid_compare(mdev, &rule_nr);
2484 
2485 	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2486 
2487 	if (hg == -1000) {
2488 		dev_alert(DEV, "Unrelated data, aborting!\n");
2489 		return C_MASK;
2490 	}
2491 	if (hg == -1001) {
2492 		dev_alert(DEV, "To resolve this both sides have to support at least protocol 91\n");
2493 		return C_MASK;
2494 	}
2495 
2496 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2497 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2498 		int f = (hg == -100) || abs(hg) == 2;
2499 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
2500 		if (f)
2501 			hg = hg*2;
2502 		dev_info(DEV, "Becoming sync %s due to disk states.\n",
2503 		     hg > 0 ? "source" : "target");
2504 	}
2505 
2506 	if (abs(hg) == 100)
2507 		drbd_khelper(mdev, "initial-split-brain");
2508 
2509 	if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2510 		int pcount = (mdev->state.role == R_PRIMARY)
2511 			   + (peer_role == R_PRIMARY);
2512 		int forced = (hg == -100);
2513 
2514 		switch (pcount) {
2515 		case 0:
2516 			hg = drbd_asb_recover_0p(mdev);
2517 			break;
2518 		case 1:
2519 			hg = drbd_asb_recover_1p(mdev);
2520 			break;
2521 		case 2:
2522 			hg = drbd_asb_recover_2p(mdev);
2523 			break;
2524 		}
2525 		if (abs(hg) < 100) {
2526 			dev_warn(DEV, "Split-Brain detected, %d primaries, "
2527 			     "automatically solved. Sync from %s node\n",
2528 			     pcount, (hg < 0) ? "peer" : "this");
2529 			if (forced) {
2530 				dev_warn(DEV, "Doing a full sync, since"
2531 				     " UUIDs were ambiguous.\n");
2532 				hg = hg*2;
2533 			}
2534 		}
2535 	}
2536 
2537 	if (hg == -100) {
2538 		if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2539 			hg = -1;
2540 		if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2541 			hg = 1;
2542 
2543 		if (abs(hg) < 100)
2544 			dev_warn(DEV, "Split-Brain detected, manually solved. "
2545 			     "Sync from %s node\n",
2546 			     (hg < 0) ? "peer" : "this");
2547 	}
2548 
2549 	if (hg == -100) {
2550 		/* FIXME this log message is not correct if we end up here
2551 		 * after an attempted attach on a diskless node.
2552 		 * We just refuse to attach -- well, we drop the "connection"
2553 		 * to that disk, in a way... */
2554 		dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2555 		drbd_khelper(mdev, "split-brain");
2556 		return C_MASK;
2557 	}
2558 
2559 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
2560 		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2561 		return C_MASK;
2562 	}
2563 
2564 	if (hg < 0 && /* by intention we do not use mydisk here. */
2565 	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2566 		switch (mdev->net_conf->rr_conflict) {
2567 		case ASB_CALL_HELPER:
2568 			drbd_khelper(mdev, "pri-lost");
2569 			/* fall through */
2570 		case ASB_DISCONNECT:
2571 			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2572 			return C_MASK;
2573 		case ASB_VIOLENTLY:
2574 			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2575 			     " assumption\n");
2576 		}
2577 	}
2578 
2579 	if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2580 		if (hg == 0)
2581 			dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2582 		else
2583 			dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2584 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2585 				 abs(hg) >= 2 ? "full" : "bit-map based");
2586 		return C_MASK;
2587 	}
2588 
2589 	if (abs(hg) >= 2) {
2590 		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2591 		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2592 			return C_MASK;
2593 	}
2594 
2595 	if (hg > 0) { /* become sync source. */
2596 		rv = C_WF_BITMAP_S;
2597 	} else if (hg < 0) { /* become sync target */
2598 		rv = C_WF_BITMAP_T;
2599 	} else {
2600 		rv = C_CONNECTED;
2601 		if (drbd_bm_total_weight(mdev)) {
2602 			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2603 			     drbd_bm_total_weight(mdev));
2604 		}
2605 	}
2606 
2607 	return rv;
2608 }
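
/* Editor's note: an illustrative decoder (not driver code) for the handshake
 * result hg used above, matching the table in front of drbd_uuid_compare():
 * the sign picks the sync direction, a magnitude of at least 2 forces a full
 * sync (whole bitmap set first), and +/-100 are split-brain outcomes. */
static const char *example_describe_hg(int hg)
{
	if (hg == -1000)
		return "unrelated data: abort";
	if (hg == 100)
		return "split brain: try automatic recovery";
	if (hg == -100)
		return "split brain: disconnect";
	if (hg == 0)
		return "connected, no resync";
	if (hg > 0)
		return hg >= 2 ? "sync source, full sync"
			       : "sync source, bitmap based";
	return hg <= -2 ? "sync target, full sync"
			: "sync target, bitmap based";
}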
2609 
2610 /* returns 1 if invalid */
2611 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2612 {
2613 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2614 	if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2615 	    (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2616 		return 0;
2617 
2618 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2619 	if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2620 	    self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2621 		return 1;
2622 
2623 	/* everything else is valid if they are equal on both sides. */
2624 	if (peer == self)
2625 		return 0;
2626 
2627 	/* everything else is invalid. */
2628 	return 1;
2629 }
2630 
2631 static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2632 {
2633 	struct p_protocol *p = &mdev->data.rbuf.protocol;
2634 	int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2635 	int p_want_lose, p_two_primaries, cf;
2636 	char p_integrity_alg[SHARED_SECRET_MAX] = "";
2637 
2638 	p_proto		= be32_to_cpu(p->protocol);
2639 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
2640 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
2641 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
2642 	p_two_primaries = be32_to_cpu(p->two_primaries);
2643 	cf		= be32_to_cpu(p->conn_flags);
2644 	p_want_lose = cf & CF_WANT_LOSE;
2645 
2646 	clear_bit(CONN_DRY_RUN, &mdev->flags);
2647 
2648 	if (cf & CF_DRY_RUN)
2649 		set_bit(CONN_DRY_RUN, &mdev->flags);
2650 
2651 	if (p_proto != mdev->net_conf->wire_protocol) {
2652 		dev_err(DEV, "incompatible communication protocols\n");
2653 		goto disconnect;
2654 	}
2655 
2656 	if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2657 		dev_err(DEV, "incompatible after-sb-0pri settings\n");
2658 		goto disconnect;
2659 	}
2660 
2661 	if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2662 		dev_err(DEV, "incompatible after-sb-1pri settings\n");
2663 		goto disconnect;
2664 	}
2665 
2666 	if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2667 		dev_err(DEV, "incompatible after-sb-2pri settings\n");
2668 		goto disconnect;
2669 	}
2670 
2671 	if (p_want_lose && mdev->net_conf->want_lose) {
2672 		dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2673 		goto disconnect;
2674 	}
2675 
2676 	if (p_two_primaries != mdev->net_conf->two_primaries) {
2677 		dev_err(DEV, "incompatible setting of the two-primaries options\n");
2678 		goto disconnect;
2679 	}
2680 
2681 	if (mdev->agreed_pro_version >= 87) {
2682 		unsigned char *my_alg = mdev->net_conf->integrity_alg;
2683 
2684 		if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2685 			return FALSE;
2686 
2687 		p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2688 		if (strcmp(p_integrity_alg, my_alg)) {
2689 			dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2690 			goto disconnect;
2691 		}
2692 		dev_info(DEV, "data-integrity-alg: %s\n",
2693 		     my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2694 	}
2695 
2696 	return TRUE;
2697 
2698 disconnect:
2699 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2700 	return FALSE;
2701 }
2702 
2703 /* helper function
2704  * input: alg name, feature name
2705  * return: NULL (alg name was "")
2706  *         ERR_PTR(error) if something goes wrong
2707  *         or the crypto hash ptr, if it worked out ok. */
2708 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2709 		const char *alg, const char *name)
2710 {
2711 	struct crypto_hash *tfm;
2712 
2713 	if (!alg[0])
2714 		return NULL;
2715 
2716 	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2717 	if (IS_ERR(tfm)) {
2718 		dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2719 			alg, name, PTR_ERR(tfm));
2720 		return tfm;
2721 	}
2722 	if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2723 		crypto_free_hash(tfm);
2724 		dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2725 		return ERR_PTR(-EINVAL);
2726 	}
2727 	return tfm;
2728 }
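
/* Editor's note: a hedged usage sketch of the helper above, mirroring the
 * pattern receive_SyncParam() uses below: NULL means "no algorithm
 * configured", IS_ERR() means the allocation or type check failed, anything
 * else is a usable tfm that replaces the old one.  The function name is an
 * illustrative stand-in, not part of the driver. */
static int example_pick_verify_alg(struct drbd_conf *mdev, const char *alg)
{
	struct crypto_hash *tfm;

	tfm = drbd_crypto_alloc_digest_safe(mdev, alg, "verify-alg");
	if (IS_ERR(tfm))
		return -EINVAL;		/* allocation or digest check failed */
	if (!tfm)
		return 0;		/* alg was "": feature stays disabled */

	/* the real code swaps this in under mdev->peer_seq_lock */
	crypto_free_hash(mdev->verify_tfm);	/* release any previous digest */
	mdev->verify_tfm = tfm;
	return 0;
}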
2729 
2730 static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
2731 {
2732 	int ok = TRUE;
2733 	struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
2734 	unsigned int header_size, data_size, exp_max_sz;
2735 	struct crypto_hash *verify_tfm = NULL;
2736 	struct crypto_hash *csums_tfm = NULL;
2737 	const int apv = mdev->agreed_pro_version;
2738 	int *rs_plan_s = NULL;
2739 	int fifo_size = 0;
2740 
2741 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2742 		    : apv == 88 ? sizeof(struct p_rs_param)
2743 					+ SHARED_SECRET_MAX
2744 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
2745 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
2746 
2747 	if (packet_size > exp_max_sz) {
2748 		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2749 		    packet_size, exp_max_sz);
2750 		return FALSE;
2751 	}
2752 
2753 	if (apv <= 88) {
2754 		header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
2755 		data_size   = packet_size  - header_size;
2756 	} else if (apv <= 94) {
2757 		header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
2758 		data_size   = packet_size  - header_size;
2759 		D_ASSERT(data_size == 0);
2760 	} else {
2761 		header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
2762 		data_size   = packet_size  - header_size;
2763 		D_ASSERT(data_size == 0);
2764 	}
2765 
2766 	/* initialize verify_alg and csums_alg */
2767 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2768 
2769 	if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
2770 		return FALSE;
2771 
2772 	mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
2773 
2774 	if (apv >= 88) {
2775 		if (apv == 88) {
2776 			if (data_size > SHARED_SECRET_MAX) {
2777 				dev_err(DEV, "verify-alg too long, "
2778 				    "peer wants %u, accepting only %u byte\n",
2779 						data_size, SHARED_SECRET_MAX);
2780 				return FALSE;
2781 			}
2782 
2783 			if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2784 				return FALSE;
2785 
2786 			/* we expect NUL terminated string */
2787 			/* but just in case someone tries to be evil */
2788 			D_ASSERT(p->verify_alg[data_size-1] == 0);
2789 			p->verify_alg[data_size-1] = 0;
2790 
2791 		} else /* apv >= 89 */ {
2792 			/* we still expect NUL terminated strings */
2793 			/* but just in case someone tries to be evil */
2794 			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2795 			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2796 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2797 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2798 		}
2799 
2800 		if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2801 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2802 				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2803 				    mdev->sync_conf.verify_alg, p->verify_alg);
2804 				goto disconnect;
2805 			}
2806 			verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2807 					p->verify_alg, "verify-alg");
2808 			if (IS_ERR(verify_tfm)) {
2809 				verify_tfm = NULL;
2810 				goto disconnect;
2811 			}
2812 		}
2813 
2814 		if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2815 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2816 				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2817 				    mdev->sync_conf.csums_alg, p->csums_alg);
2818 				goto disconnect;
2819 			}
2820 			csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2821 					p->csums_alg, "csums-alg");
2822 			if (IS_ERR(csums_tfm)) {
2823 				csums_tfm = NULL;
2824 				goto disconnect;
2825 			}
2826 		}
2827 
2828 		if (apv > 94) {
2829 			mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
2830 			mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2831 			mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2832 			mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2833 			mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2834 
2835 			fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2836 			if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2837 				rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2838 				if (!rs_plan_s) {
2839 					dev_err(DEV, "kzalloc of fifo_buffer failed\n");
2840 					goto disconnect;
2841 				}
2842 			}
2843 		}
2844 
2845 		spin_lock(&mdev->peer_seq_lock);
2846 		/* lock against drbd_nl_syncer_conf() */
2847 		if (verify_tfm) {
2848 			strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2849 			mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2850 			crypto_free_hash(mdev->verify_tfm);
2851 			mdev->verify_tfm = verify_tfm;
2852 			dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2853 		}
2854 		if (csums_tfm) {
2855 			strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2856 			mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2857 			crypto_free_hash(mdev->csums_tfm);
2858 			mdev->csums_tfm = csums_tfm;
2859 			dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2860 		}
2861 		if (fifo_size != mdev->rs_plan_s.size) {
2862 			kfree(mdev->rs_plan_s.values);
2863 			mdev->rs_plan_s.values = rs_plan_s;
2864 			mdev->rs_plan_s.size   = fifo_size;
2865 			mdev->rs_planed = 0;
2866 		}
2867 		spin_unlock(&mdev->peer_seq_lock);
2868 	}
2869 
2870 	return ok;
2871 disconnect:
2872 	/* just for completeness: actually not needed,
2873 	 * as this is not reached if csums_tfm was ok. */
2874 	crypto_free_hash(csums_tfm);
2875 	/* but free the verify_tfm again, if csums_tfm did not work out */
2876 	crypto_free_hash(verify_tfm);
2877 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2878 	return FALSE;
2879 }
2880 
2881 static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2882 {
2883 	/* sorry, we currently have no working implementation
2884 	 * of distributed TCQ */
2885 }
2886 
2887 /* warn if the arguments differ by more than 12.5% */
2888 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2889 	const char *s, sector_t a, sector_t b)
2890 {
2891 	sector_t d;
2892 	if (a == 0 || b == 0)
2893 		return;
2894 	d = (a > b) ? (a - b) : (b - a);
2895 	if (d > (a>>3) || d > (b>>3))
2896 		dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2897 		     (unsigned long long)a, (unsigned long long)b);
2898 }
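
/* Editor's note: a tiny standalone illustration of the 12.5% test above:
 * a >> 3 is one eighth of a, so the warning fires when the difference
 * exceeds 12.5% of either value.  Illustrative only, not driver code. */
static int example_differs_considerably(unsigned long long a, unsigned long long b)
{
	unsigned long long d;

	if (a == 0 || b == 0)
		return 0;
	d = (a > b) ? (a - b) : (b - a);
	return d > (a >> 3) || d > (b >> 3);	/* more than 1/8 of either side */
}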
2899 
2900 static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2901 {
2902 	struct p_sizes *p = &mdev->data.rbuf.sizes;
2903 	enum determine_dev_size dd = unchanged;
2904 	unsigned int max_seg_s;
2905 	sector_t p_size, p_usize, my_usize;
2906 	int ldsc = 0; /* local disk size changed */
2907 	enum dds_flags ddsf;
2908 
2909 	p_size = be64_to_cpu(p->d_size);
2910 	p_usize = be64_to_cpu(p->u_size);
2911 
2912 	if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2913 		dev_err(DEV, "some backing storage is needed\n");
2914 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2915 		return FALSE;
2916 	}
2917 
2918 	/* just store the peer's disk size for now.
2919 	 * we still need to figure out whether we accept that. */
2920 	mdev->p_size = p_size;
2921 
2922 	if (get_ldev(mdev)) {
2923 		warn_if_differ_considerably(mdev, "lower level device sizes",
2924 			   p_size, drbd_get_max_capacity(mdev->ldev));
2925 		warn_if_differ_considerably(mdev, "user requested size",
2926 					    p_usize, mdev->ldev->dc.disk_size);
2927 
2928 		/* if this is the first connect, or an otherwise expected
2929 		 * param exchange, choose the minimum */
2930 		if (mdev->state.conn == C_WF_REPORT_PARAMS)
2931 			p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2932 					     p_usize);
2933 
2934 		my_usize = mdev->ldev->dc.disk_size;
2935 
2936 		if (mdev->ldev->dc.disk_size != p_usize) {
2937 			mdev->ldev->dc.disk_size = p_usize;
2938 			dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2939 			     (unsigned long)mdev->ldev->dc.disk_size);
2940 		}
2941 
2942 		/* Never shrink a device with usable data during connect.
2943 		   But allow online shrinking if we are connected. */
2944 		if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2945 		   drbd_get_capacity(mdev->this_bdev) &&
2946 		   mdev->state.disk >= D_OUTDATED &&
2947 		   mdev->state.conn < C_CONNECTED) {
2948 			dev_err(DEV, "The peer's disk size is too small!\n");
2949 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2950 			mdev->ldev->dc.disk_size = my_usize;
2951 			put_ldev(mdev);
2952 			return FALSE;
2953 		}
2954 		put_ldev(mdev);
2955 	}
2956 #undef min_not_zero
2957 
2958 	ddsf = be16_to_cpu(p->dds_flags);
2959 	if (get_ldev(mdev)) {
2960 		dd = drbd_determin_dev_size(mdev, ddsf);
2961 		put_ldev(mdev);
2962 		if (dd == dev_size_error)
2963 			return FALSE;
2964 		drbd_md_sync(mdev);
2965 	} else {
2966 		/* I am diskless, need to accept the peer's size. */
2967 		drbd_set_my_capacity(mdev, p_size);
2968 	}
2969 
2970 	if (get_ldev(mdev)) {
2971 		if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
2972 			mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
2973 			ldsc = 1;
2974 		}
2975 
2976 		if (mdev->agreed_pro_version < 94)
2977 			max_seg_s = be32_to_cpu(p->max_segment_size);
2978 		else if (mdev->agreed_pro_version == 94)
2979 			max_seg_s = DRBD_MAX_SIZE_H80_PACKET;
2980 		else /* drbd 8.3.8 onwards */
2981 			max_seg_s = DRBD_MAX_SEGMENT_SIZE;
2982 
2983 		if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
2984 			drbd_setup_queue_param(mdev, max_seg_s);
2985 
2986 		drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type));
2987 		put_ldev(mdev);
2988 	}
2989 
2990 	if (mdev->state.conn > C_WF_REPORT_PARAMS) {
2991 		if (be64_to_cpu(p->c_size) !=
2992 		    drbd_get_capacity(mdev->this_bdev) || ldsc) {
2993 			/* we have different sizes, probably peer
2994 			 * needs to know my new size... */
2995 			drbd_send_sizes(mdev, 0, ddsf);
2996 		}
2997 		if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
2998 		    (dd == grew && mdev->state.conn == C_CONNECTED)) {
2999 			if (mdev->state.pdsk >= D_INCONSISTENT &&
3000 			    mdev->state.disk >= D_INCONSISTENT) {
3001 				if (ddsf & DDSF_NO_RESYNC)
3002 					dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3003 				else
3004 					resync_after_online_grow(mdev);
3005 			} else
3006 				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3007 		}
3008 	}
3009 
3010 	return TRUE;
3011 }
3012 
3013 static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3014 {
3015 	struct p_uuids *p = &mdev->data.rbuf.uuids;
3016 	u64 *p_uuid;
3017 	int i;
3018 
3019 	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
	if (!p_uuid) {
		dev_err(DEV, "kmalloc of p_uuid failed\n");
		return FALSE;
	}
3020 
3021 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3022 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
3023 
3024 	kfree(mdev->p_uuid);
3025 	mdev->p_uuid = p_uuid;
3026 
3027 	if (mdev->state.conn < C_CONNECTED &&
3028 	    mdev->state.disk < D_INCONSISTENT &&
3029 	    mdev->state.role == R_PRIMARY &&
3030 	    (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3031 		dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3032 		    (unsigned long long)mdev->ed_uuid);
3033 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3034 		return FALSE;
3035 	}
3036 
3037 	if (get_ldev(mdev)) {
3038 		int skip_initial_sync =
3039 			mdev->state.conn == C_CONNECTED &&
3040 			mdev->agreed_pro_version >= 90 &&
3041 			mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3042 			(p_uuid[UI_FLAGS] & 8);
3043 		if (skip_initial_sync) {
3044 			dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3045 			drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3046 					"clear_n_write from receive_uuids");
3047 			_drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3048 			_drbd_uuid_set(mdev, UI_BITMAP, 0);
3049 			_drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3050 					CS_VERBOSE, NULL);
3051 			drbd_md_sync(mdev);
3052 		}
3053 		put_ldev(mdev);
3054 	} else if (mdev->state.disk < D_INCONSISTENT &&
3055 		   mdev->state.role == R_PRIMARY) {
3056 		/* I am a diskless primary, the peer just created a new current UUID
3057 		   for me. */
3058 		drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3059 	}
3060 
3061 	/* Before we test for the disk state, we should wait until a possibly
3062 	   ongoing cluster wide state change is finished. That is important if
3063 	   we are primary and are detaching from our disk. We need to see the
3064 	   new disk state... */
3065 	wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3066 	if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3067 		drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3068 
3069 	return TRUE;
3070 }
3071 
3072 /**
3073  * convert_state() - Converts the peer's view of the cluster state to our point of view
3074  * @ps:		The state as seen by the peer.
3075  */
3076 static union drbd_state convert_state(union drbd_state ps)
3077 {
3078 	union drbd_state ms;
3079 
3080 	static enum drbd_conns c_tab[] = {
3081 		[C_CONNECTED] = C_CONNECTED,
3082 
3083 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3084 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3085 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3086 		[C_VERIFY_S]       = C_VERIFY_T,
3087 		[C_MASK]   = C_MASK,
3088 	};
3089 
3090 	ms.i = ps.i;
3091 
3092 	ms.conn = c_tab[ps.conn];
3093 	ms.peer = ps.role;
3094 	ms.role = ps.peer;
3095 	ms.pdsk = ps.disk;
3096 	ms.disk = ps.pdsk;
3097 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3098 
3099 	return ms;
3100 }
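
/* Editor's note: an illustrative sketch (not driver code) of the mirroring
 * convert_state() performs: the peer's "role" becomes our "peer", its view
 * of our disk ("pdsk") becomes our "disk" and vice versa, and directional
 * connection states are flipped via the table above (e.g. StartingSyncS
 * <-> StartingSyncT).  The struct below is a stand-in, not drbd_state. */
struct example_mini_state { int role, peer, disk, pdsk; };

static struct example_mini_state example_mirror_state(struct example_mini_state peers_view)
{
	struct example_mini_state ours;

	ours.role = peers_view.peer;	/* the peer's "peer" is us */
	ours.peer = peers_view.role;
	ours.disk = peers_view.pdsk;	/* the peer's view of our disk */
	ours.pdsk = peers_view.disk;
	return ours;
}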
3101 
3102 static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3103 {
3104 	struct p_req_state *p = &mdev->data.rbuf.req_state;
3105 	union drbd_state mask, val;
3106 	int rv;
3107 
3108 	mask.i = be32_to_cpu(p->mask);
3109 	val.i = be32_to_cpu(p->val);
3110 
3111 	if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3112 	    test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3113 		drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3114 		return TRUE;
3115 	}
3116 
3117 	mask = convert_state(mask);
3118 	val = convert_state(val);
3119 
3120 	rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3121 
3122 	drbd_send_sr_reply(mdev, rv);
3123 	drbd_md_sync(mdev);
3124 
3125 	return TRUE;
3126 }
3127 
3128 static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3129 {
3130 	struct p_state *p = &mdev->data.rbuf.state;
3131 	union drbd_state os, ns, peer_state;
3132 	enum drbd_disk_state real_peer_disk;
3133 	enum chg_state_flags cs_flags;
3134 	int rv;
3135 
3136 	peer_state.i = be32_to_cpu(p->state);
3137 
3138 	real_peer_disk = peer_state.disk;
3139 	if (peer_state.disk == D_NEGOTIATING) {
3140 		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3141 		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3142 	}
3143 
3144 	spin_lock_irq(&mdev->req_lock);
3145  retry:
3146 	os = ns = mdev->state;
3147 	spin_unlock_irq(&mdev->req_lock);
3148 
3149 	/* peer says his disk is uptodate, while we think it is inconsistent,
3150 	 * and this happens while we think we have a sync going on. */
3151 	if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3152 	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3153 		/* If we are (becoming) SyncSource, but peer is still in sync
3154 		 * preparation, ignore its uptodate-ness to avoid flapping, it
3155 		 * will change to inconsistent once the peer reaches active
3156 		 * syncing states.
3157 		 * It may have changed syncer-paused flags, however, so we
3158 		 * cannot ignore this completely. */
3159 		if (peer_state.conn > C_CONNECTED &&
3160 		    peer_state.conn < C_SYNC_SOURCE)
3161 			real_peer_disk = D_INCONSISTENT;
3162 
3163 		/* if peer_state changes to connected at the same time,
3164 		 * it explicitly notifies us that it finished resync.
3165 		 * Maybe we should finish it up, too? */
3166 		else if (os.conn >= C_SYNC_SOURCE &&
3167 			 peer_state.conn == C_CONNECTED) {
3168 			if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3169 				drbd_resync_finished(mdev);
3170 			return TRUE;
3171 		}
3172 	}
3173 
3174 	/* peer says his disk is inconsistent, while we think it is uptodate,
3175 	 * and this happens while the peer still thinks we have a sync going on,
3176 	 * but we think we are already done with the sync.
3177 	 * We ignore this to avoid flapping pdsk.
3178 	 * This should not happen, if the peer is a recent version of drbd. */
3179 	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3180 	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3181 		real_peer_disk = D_UP_TO_DATE;
3182 
3183 	if (ns.conn == C_WF_REPORT_PARAMS)
3184 		ns.conn = C_CONNECTED;
3185 
3186 	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3187 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
3188 		int cr; /* consider resync */
3189 
3190 		/* if we established a new connection */
3191 		cr  = (os.conn < C_CONNECTED);
3192 		/* if we had an established connection
3193 		 * and one of the nodes newly attaches a disk */
3194 		cr |= (os.conn == C_CONNECTED &&
3195 		       (peer_state.disk == D_NEGOTIATING ||
3196 			os.disk == D_NEGOTIATING));
3197 		/* if we have both been inconsistent, and the peer has been
3198 		 * forced to be UpToDate with --overwrite-data */
3199 		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3200 		/* if we had been plain connected, and the admin requested to
3201 		 * start a sync by "invalidate" or "invalidate-remote" */
3202 		cr |= (os.conn == C_CONNECTED &&
3203 				(peer_state.conn >= C_STARTING_SYNC_S &&
3204 				 peer_state.conn <= C_WF_BITMAP_T));
3205 
3206 		if (cr)
3207 			ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3208 
3209 		put_ldev(mdev);
3210 		if (ns.conn == C_MASK) {
3211 			ns.conn = C_CONNECTED;
3212 			if (mdev->state.disk == D_NEGOTIATING) {
3213 				drbd_force_state(mdev, NS(disk, D_FAILED));
3214 			} else if (peer_state.disk == D_NEGOTIATING) {
3215 				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3216 				peer_state.disk = D_DISKLESS;
3217 				real_peer_disk = D_DISKLESS;
3218 			} else {
3219 				if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3220 					return FALSE;
3221 				D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3222 				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3223 				return FALSE;
3224 			}
3225 		}
3226 	}
3227 
3228 	spin_lock_irq(&mdev->req_lock);
3229 	if (mdev->state.i != os.i)
3230 		goto retry;
3231 	clear_bit(CONSIDER_RESYNC, &mdev->flags);
3232 	ns.peer = peer_state.role;
3233 	ns.pdsk = real_peer_disk;
3234 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3235 	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3236 		ns.disk = mdev->new_state_tmp.disk;
3237 	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3238 	if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3239 	    test_bit(NEW_CUR_UUID, &mdev->flags)) {
3240 		/* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
3241 		   for temporary network outages! */
3242 		spin_unlock_irq(&mdev->req_lock);
3243 		dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3244 		tl_clear(mdev);
3245 		drbd_uuid_new_current(mdev);
3246 		clear_bit(NEW_CUR_UUID, &mdev->flags);
3247 		drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3248 		return FALSE;
3249 	}
3250 	rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3251 	ns = mdev->state;
3252 	spin_unlock_irq(&mdev->req_lock);
3253 
3254 	if (rv < SS_SUCCESS) {
3255 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3256 		return FALSE;
3257 	}
3258 
3259 	if (os.conn > C_WF_REPORT_PARAMS) {
3260 		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3261 		    peer_state.disk != D_NEGOTIATING ) {
3262 			/* we want resync, peer has not yet decided to sync... */
3263 			/* Nowadays only used when forcing a node into primary role and
3264 			   setting its disk to UpToDate with that */
3265 			drbd_send_uuids(mdev);
3266 			drbd_send_state(mdev);
3267 		}
3268 	}
3269 
3270 	mdev->net_conf->want_lose = 0;
3271 
3272 	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3273 
3274 	return TRUE;
3275 }
3276 
3277 static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3278 {
3279 	struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
3280 
3281 	wait_event(mdev->misc_wait,
3282 		   mdev->state.conn == C_WF_SYNC_UUID ||
3283 		   mdev->state.conn < C_CONNECTED ||
3284 		   mdev->state.disk < D_NEGOTIATING);
3285 
3286 	/* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3287 
3288 	/* Here the _drbd_uuid_ functions are right, current should
3289 	   _not_ be rotated into the history */
3290 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3291 		_drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3292 		_drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3293 
3294 		drbd_start_resync(mdev, C_SYNC_TARGET);
3295 
3296 		put_ldev(mdev);
3297 	} else
3298 		dev_err(DEV, "Ignoring SyncUUID packet!\n");
3299 
3300 	return TRUE;
3301 }
3302 
3303 enum receive_bitmap_ret { OK, DONE, FAILED };
3304 
3305 static enum receive_bitmap_ret
3306 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3307 		     unsigned long *buffer, struct bm_xfer_ctx *c)
3308 {
3309 	unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3310 	unsigned want = num_words * sizeof(long);
3311 
3312 	if (want != data_size) {
3313 		dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3314 		return FAILED;
3315 	}
3316 	if (want == 0)
3317 		return DONE;
3318 	if (drbd_recv(mdev, buffer, want) != want)
3319 		return FAILED;
3320 
3321 	drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3322 
3323 	c->word_offset += num_words;
3324 	c->bit_offset = c->word_offset * BITS_PER_LONG;
3325 	if (c->bit_offset > c->bm_bits)
3326 		c->bit_offset = c->bm_bits;
3327 
3328 	return OK;
3329 }
3330 
3331 static enum receive_bitmap_ret
3332 recv_bm_rle_bits(struct drbd_conf *mdev,
3333 		struct p_compressed_bm *p,
3334 		struct bm_xfer_ctx *c)
3335 {
3336 	struct bitstream bs;
3337 	u64 look_ahead;
3338 	u64 rl;
3339 	u64 tmp;
3340 	unsigned long s = c->bit_offset;
3341 	unsigned long e;
3342 	int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
3343 	int toggle = DCBP_get_start(p);
3344 	int have;
3345 	int bits;
3346 
3347 	bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3348 
3349 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
3350 	if (bits < 0)
3351 		return FAILED;
3352 
3353 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
3354 		bits = vli_decode_bits(&rl, look_ahead);
3355 		if (bits <= 0)
3356 			return FAILED;
3357 
3358 		if (toggle) {
3359 			e = s + rl -1;
3360 			if (e >= c->bm_bits) {
3361 				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3362 				return FAILED;
3363 			}
3364 			_drbd_bm_set_bits(mdev, s, e);
3365 		}
3366 
3367 		if (have < bits) {
3368 			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3369 				have, bits, look_ahead,
3370 				(unsigned int)(bs.cur.b - p->code),
3371 				(unsigned int)bs.buf_len);
3372 			return FAILED;
3373 		}
3374 		look_ahead >>= bits;
3375 		have -= bits;
3376 
3377 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3378 		if (bits < 0)
3379 			return FAILED;
3380 		look_ahead |= tmp << have;
3381 		have += bits;
3382 	}
3383 
3384 	c->bit_offset = s;
3385 	bm_xfer_ctx_bit_to_word_offset(c);
3386 
3387 	return (s == c->bm_bits) ? DONE : OK;
3388 }
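
/* Editor's note: a standalone sketch of the toggled run-length decoding
 * above, with the VLI bitstream handling stripped away: runs alternate
 * between "clear" and "set", starting with the toggle bit carried in the
 * packet header, and only the "set" runs are applied to the bitmap.
 * Illustrative only, operating on a plain byte-addressed bit array. */
static int example_apply_rle_runs(unsigned char *bitmap, unsigned long nbits,
				  const unsigned long *runs, int nruns,
				  int start_set)
{
	unsigned long s = 0;
	int toggle = start_set;
	int i;

	for (i = 0; i < nruns; i++, toggle = !toggle) {
		unsigned long rl = runs[i];
		unsigned long e = s + rl - 1;

		if (rl == 0 || e >= nbits)
			return -1;		/* corrupt encoding */
		if (toggle) {			/* only "set" runs touch the bitmap */
			unsigned long bit;

			for (bit = s; bit <= e; bit++)
				bitmap[bit / 8] |= 1u << (bit % 8);
		}
		s += rl;
	}
	return 0;
}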
3389 
3390 static enum receive_bitmap_ret
3391 decode_bitmap_c(struct drbd_conf *mdev,
3392 		struct p_compressed_bm *p,
3393 		struct bm_xfer_ctx *c)
3394 {
3395 	if (DCBP_get_code(p) == RLE_VLI_Bits)
3396 		return recv_bm_rle_bits(mdev, p, c);
3397 
3398 	/* other variants had been implemented for evaluation,
3399 	 * but have been dropped as this one turned out to be "best"
3400 	 * during all our tests. */
3401 
3402 	dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3403 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3404 	return FAILED;
3405 }
3406 
3407 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3408 		const char *direction, struct bm_xfer_ctx *c)
3409 {
3410 	/* what would it take to transfer it "plaintext" */
3411 	unsigned plain = sizeof(struct p_header80) *
3412 		((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3413 		+ c->bm_words * sizeof(long);
3414 	unsigned total = c->bytes[0] + c->bytes[1];
3415 	unsigned r;
3416 
3417 	/* total can not be zero. but just in case: */
3418 	if (total == 0)
3419 		return;
3420 
3421 	/* don't report if not compressed */
3422 	if (total >= plain)
3423 		return;
3424 
3425 	/* total < plain. check for overflow, still */
3426 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3427 		                    : (1000 * total / plain);
3428 
3429 	if (r > 1000)
3430 		r = 1000;
3431 
3432 	r = 1000 - r;
3433 	dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3434 	     "total %u; compression: %u.%u%%\n",
3435 			direction,
3436 			c->bytes[1], c->packets[1],
3437 			c->bytes[0], c->packets[0],
3438 			total, r/10, r % 10);
3439 }
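
/* Editor's note: a worked standalone example of the compression figure
 * printed above.  r is the saving in per-mille and is printed as a
 * percentage with one decimal: plain = 131072 bytes and total = 4096 bytes
 * give r = 1000 - 1000*4096/131072 = 969, reported as "96.9%".
 * Illustrative only, not driver code. */
static unsigned example_compression_permille(unsigned total, unsigned plain)
{
	unsigned r;

	if (total == 0 || total >= plain)
		return 0;	/* nothing transferred, or nothing saved */
	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
				    : (1000 * total / plain);
	if (r > 1000)
		r = 1000;
	return 1000 - r;	/* 969 -> printed as "96.9%" */
}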
3440 
3441 /* Since we are processing the bitfield from lower addresses to higher,
3442    it does not matter whether we process it in 32 bit or 64 bit chunks,
3443    as long as it is little endian. (Understand it as a byte stream,
3444    beginning with the lowest byte...) If we used big endian, we would
3445    need to process it from the highest address to the lowest, in order
3446    to be agnostic to the 32 vs 64 bit issue.
3447 
3448    returns 0 on failure, 1 if we successfully received it. */
3449 static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3450 {
3451 	struct bm_xfer_ctx c;
3452 	void *buffer;
3453 	enum receive_bitmap_ret ret;
3454 	int ok = FALSE;
3455 	struct p_header80 *h = &mdev->data.rbuf.header.h80;
3456 
3457 	wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3458 
3459 	drbd_bm_lock(mdev, "receive bitmap");
3460 
3461 	/* maybe we should use some per thread scratch page,
3462 	 * and allocate that during initial device creation? */
3463 	buffer	 = (unsigned long *) __get_free_page(GFP_NOIO);
3464 	if (!buffer) {
3465 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3466 		goto out;
3467 	}
3468 
3469 	c = (struct bm_xfer_ctx) {
3470 		.bm_bits = drbd_bm_bits(mdev),
3471 		.bm_words = drbd_bm_words(mdev),
3472 	};
3473 
3474 	do {
3475 		if (cmd == P_BITMAP) {
3476 			ret = receive_bitmap_plain(mdev, data_size, buffer, &c);
3477 		} else if (cmd == P_COMPRESSED_BITMAP) {
3478 			/* MAYBE: sanity check that we speak proto >= 90,
3479 			 * and the feature is enabled! */
3480 			struct p_compressed_bm *p;
3481 
3482 			if (data_size > BM_PACKET_PAYLOAD_BYTES) {
3483 				dev_err(DEV, "ReportCBitmap packet too large\n");
3484 				goto out;
3485 			}
3486 			/* use the page buffer */
3487 			p = buffer;
3488 			memcpy(p, h, sizeof(*h));
3489 			if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
3490 				goto out;
3491 			if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3492 				dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
3493 				goto out;
3494 			}
3495 			ret = decode_bitmap_c(mdev, p, &c);
3496 		} else {
3497 			dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", cmd);
3498 			goto out;
3499 		}
3500 
3501 		c.packets[cmd == P_BITMAP]++;
3502 		c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
3503 
3504 		if (ret != OK)
3505 			break;
3506 
3507 		if (!drbd_recv_header(mdev, &cmd, &data_size))
3508 			goto out;
3509 	} while (ret == OK);
3510 	if (ret == FAILED)
3511 		goto out;
3512 
3513 	INFO_bm_xfer_stats(mdev, "receive", &c);
3514 
3515 	if (mdev->state.conn == C_WF_BITMAP_T) {
3516 		ok = !drbd_send_bitmap(mdev);
3517 		if (!ok)
3518 			goto out;
3519 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3520 		ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3521 		D_ASSERT(ok == SS_SUCCESS);
3522 	} else if (mdev->state.conn != C_WF_BITMAP_S) {
3523 		/* admin may have requested C_DISCONNECTING,
3524 		 * other threads may have noticed network errors */
3525 		dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3526 		    drbd_conn_str(mdev->state.conn));
3527 	}
3528 
3529 	ok = TRUE;
3530  out:
3531 	drbd_bm_unlock(mdev);
3532 	if (ok && mdev->state.conn == C_WF_BITMAP_S)
3533 		drbd_start_resync(mdev, C_SYNC_SOURCE);
3534 	free_page((unsigned long) buffer);
3535 	return ok;
3536 }
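
/*
 * Illustrative sketch only (a hypothetical helper, not used by the driver):
 * as the comment above receive_bitmap() explains, a little endian bit
 * stream may be consumed in 32 bit or 64 bit chunks without changing
 * which bit ends up where; bit k always lives in byte k/8, at bit
 * position k%8.
 */
static bool __maybe_unused le_stream_test_bit(const u8 *stream, unsigned long bitnr)
{
	return (stream[bitnr >> 3] >> (bitnr & 7)) & 1;
}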
3537 
3538 static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3539 {
3540 	/* TODO zero copy sink :) */
3541 	static char sink[128];
3542 	int size, want, r;
3543 
3544 	dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3545 		 cmd, data_size);
3546 
3547 	size = data_size;
3548 	while (size > 0) {
3549 		want = min_t(int, size, sizeof(sink));
3550 		r = drbd_recv(mdev, sink, want);
3551 		ERR_IF(r <= 0) break;
3552 		size -= r;
3553 	}
3554 	return size == 0;
3555 }
3556 
3557 static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3558 {
3559 	if (mdev->state.disk >= D_INCONSISTENT)
3560 		drbd_kick_lo(mdev);
3561 
3562 	/* Make sure we've acked all the TCP data associated
3563 	 * with the data requests being unplugged */
3564 	drbd_tcp_quickack(mdev->data.socket);
3565 
3566 	return TRUE;
3567 }
3568 
3569 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
3570 
3571 struct data_cmd {
3572 	int expect_payload;
3573 	size_t pkt_size;
3574 	drbd_cmd_handler_f function;
3575 };
3576 
3577 static struct data_cmd drbd_cmd_handler[] = {
3578 	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
3579 	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
3580 	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3581 	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3582 	[P_BITMAP]	    = { 1, sizeof(struct p_header80), receive_bitmap } ,
3583 	[P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3584 	[P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header80), receive_UnplugRemote },
3585 	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
3586 	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3587 	[P_SYNC_PARAM]	    = { 1, sizeof(struct p_header80), receive_SyncParam },
3588 	[P_SYNC_PARAM89]    = { 1, sizeof(struct p_header80), receive_SyncParam },
3589 	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
3590 	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
3591 	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
3592 	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
3593 	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
3594 	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3595 	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
3596 	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
3597 	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3598 	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
3599 	/* anything missing from this table is in
3600 	 * the asender_tbl, see get_asender_cmd */
3601 	[P_MAX_CMD]	    = { 0, 0, NULL },
3602 };
3603 
3604 /* All handler functions that expect a sub-header get that sub-header in
3605    mdev->data.rbuf.header.head.payload.
3606 
3607    Usually the callback can find the usual p_header in
3608    mdev->data.rbuf.header.head, but it must not rely on that,
3609    since there is also p_header95. */
3610 
3611 static void drbdd(struct drbd_conf *mdev)
3612 {
3613 	union p_header *header = &mdev->data.rbuf.header;
3614 	unsigned int packet_size;
3615 	enum drbd_packets cmd;
3616 	size_t shs; /* sub header size */
3617 	int rv;
3618 
3619 	while (get_t_state(&mdev->receiver) == Running) {
3620 		drbd_thread_current_set_cpu(mdev);
3621 		if (!drbd_recv_header(mdev, &cmd, &packet_size))
3622 			goto err_out;
3623 
3624 		if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3625 			dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3626 			goto err_out;
3627 		}
3628 
3629 		shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
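		/* shs: the fixed sub header of this packet type beyond the
		 * common header; anything the peer announced beyond that is
		 * variable payload for the handler below */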
3630 		if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3631 			dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3632 			goto err_out;
3633 		}
3634 
3635 		if (shs) {
3636 			rv = drbd_recv(mdev, &header->h80.payload, shs);
3637 			if (unlikely(rv != shs)) {
3638 				dev_err(DEV, "short read while reading sub header: rv=%d\n", rv);
3639 				goto err_out;
3640 			}
3641 		}
3642 
3643 		rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
3644 
3645 		if (unlikely(!rv)) {
3646 			dev_err(DEV, "error receiving %s, l: %d!\n",
3647 			    cmdname(cmd), packet_size);
3648 			goto err_out;
3649 		}
3650 	}
3651 
3652 	if (0) {
3653 	err_out:
3654 		drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3655 	}
3656 	/* If we leave here, we probably want to update at least the
3657 	 * "Connected" indicator on stable storage. Do so explicitly here. */
3658 	drbd_md_sync(mdev);
3659 }
3660 
3661 void drbd_flush_workqueue(struct drbd_conf *mdev)
3662 {
3663 	struct drbd_wq_barrier barr;
3664 
3665 	barr.w.cb = w_prev_work_done;
3666 	init_completion(&barr.done);
3667 	drbd_queue_work(&mdev->data.work, &barr.w);
3668 	wait_for_completion(&barr.done);
3669 }
3670 
3671 void drbd_free_tl_hash(struct drbd_conf *mdev)
3672 {
3673 	struct hlist_head *h;
3674 
3675 	spin_lock_irq(&mdev->req_lock);
3676 
3677 	if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
3678 		spin_unlock_irq(&mdev->req_lock);
3679 		return;
3680 	}
3681 	/* paranoia code */
3682 	for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3683 		if (h->first)
3684 			dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3685 				(int)(h - mdev->ee_hash), h->first);
3686 	kfree(mdev->ee_hash);
3687 	mdev->ee_hash = NULL;
3688 	mdev->ee_hash_s = 0;
3689 
3690 	/* paranoia code */
3691 	for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3692 		if (h->first)
3693 			dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3694 				(int)(h - mdev->tl_hash), h->first);
3695 	kfree(mdev->tl_hash);
3696 	mdev->tl_hash = NULL;
3697 	mdev->tl_hash_s = 0;
3698 	spin_unlock_irq(&mdev->req_lock);
3699 }
3700 
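/* Tear down the connection: stop the asender and close the sockets,
 * wait for pending ee activity to drain, cancel any resync, flush the
 * worker queue, clear the transfer log, possibly fence the peer, and
 * finally move to C_UNCONNECTED (or on to C_STANDALONE if the admin
 * asked for C_DISCONNECTING). */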
3701 static void drbd_disconnect(struct drbd_conf *mdev)
3702 {
3703 	enum drbd_fencing_p fp;
3704 	union drbd_state os, ns;
3705 	int rv = SS_UNKNOWN_ERROR;
3706 	unsigned int i;
3707 
3708 	if (mdev->state.conn == C_STANDALONE)
3709 		return;
3710 	if (mdev->state.conn >= C_WF_CONNECTION)
3711 		dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3712 				drbd_conn_str(mdev->state.conn));
3713 
3714 	/* asender does not clean up anything. it must not interfere, either */
3715 	drbd_thread_stop(&mdev->asender);
3716 	drbd_free_sock(mdev);
3717 
3718 	/* wait for current activity to cease. */
3719 	spin_lock_irq(&mdev->req_lock);
3720 	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3721 	_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3722 	_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3723 	spin_unlock_irq(&mdev->req_lock);
3724 
3725 	/* We do not have data structures that would allow us to
3726 	 * get the rs_pending_cnt down to 0 again.
3727 	 *  * On C_SYNC_TARGET we do not have any data structures describing
3728 	 *    the pending RSDataRequest's we have sent.
3729 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
3730 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3731 	 *  And no, it is not the sum of the reference counts in the
3732 	 *  resync_LRU. The resync_LRU tracks the whole operation including
3733 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3734 	 *  on the fly. */
3735 	drbd_rs_cancel_all(mdev);
3736 	mdev->rs_total = 0;
3737 	mdev->rs_failed = 0;
3738 	atomic_set(&mdev->rs_pending_cnt, 0);
3739 	wake_up(&mdev->misc_wait);
3740 
3741 	/* make sure syncer is stopped and w_resume_next_sg queued */
3742 	del_timer_sync(&mdev->resync_timer);
3743 	resync_timer_fn((unsigned long)mdev);
3744 
3745 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3746 	 * w_make_resync_request etc. which may still be on the worker queue
3747 	 * to be "canceled" */
3748 	drbd_flush_workqueue(mdev);
3749 
3750 	/* This also does reclaim_net_ee().  If we do this too early, we might
3751 	 * miss some resync ee and pages. */
3752 	drbd_process_done_ee(mdev);
3753 
3754 	kfree(mdev->p_uuid);
3755 	mdev->p_uuid = NULL;
3756 
3757 	if (!is_susp(mdev->state))
3758 		tl_clear(mdev);
3759 
3760 	dev_info(DEV, "Connection closed\n");
3761 
3762 	drbd_md_sync(mdev);
3763 
3764 	fp = FP_DONT_CARE;
3765 	if (get_ldev(mdev)) {
3766 		fp = mdev->ldev->dc.fencing;
3767 		put_ldev(mdev);
3768 	}
3769 
3770 	if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3771 		drbd_try_outdate_peer_async(mdev);
3772 
3773 	spin_lock_irq(&mdev->req_lock);
3774 	os = mdev->state;
3775 	if (os.conn >= C_UNCONNECTED) {
3776 		/* Do not restart in case we are C_DISCONNECTING */
3777 		ns = os;
3778 		ns.conn = C_UNCONNECTED;
3779 		rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3780 	}
3781 	spin_unlock_irq(&mdev->req_lock);
3782 
3783 	if (os.conn == C_DISCONNECTING) {
3784 		wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
3785 
3786 		if (!is_susp(mdev->state)) {
3787 			/* we must not free the tl_hash
3788 			 * while application io is still on the fly */
3789 			wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3790 			drbd_free_tl_hash(mdev);
3791 		}
3792 
3793 		crypto_free_hash(mdev->cram_hmac_tfm);
3794 		mdev->cram_hmac_tfm = NULL;
3795 
3796 		kfree(mdev->net_conf);
3797 		mdev->net_conf = NULL;
3798 		drbd_request_state(mdev, NS(conn, C_STANDALONE));
3799 	}
3800 
3801 	/* tcp_close and release of sendpage pages can be deferred.  I don't
3802 	 * want to use SO_LINGER, because apparently it can be deferred for
3803 	 * more than 20 seconds (longest time I checked).
3804 	 *
3805 	 * Actually we don't care for exactly when the network stack does its
3806 	 * put_page(), but release our reference on these pages right here.
3807 	 */
3808 	i = drbd_release_ee(mdev, &mdev->net_ee);
3809 	if (i)
3810 		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3811 	i = atomic_read(&mdev->pp_in_use_by_net);
3812 	if (i)
3813 		dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
3814 	i = atomic_read(&mdev->pp_in_use);
3815 	if (i)
3816 		dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3817 
3818 	D_ASSERT(list_empty(&mdev->read_ee));
3819 	D_ASSERT(list_empty(&mdev->active_ee));
3820 	D_ASSERT(list_empty(&mdev->sync_ee));
3821 	D_ASSERT(list_empty(&mdev->done_ee));
3822 
3823 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3824 	atomic_set(&mdev->current_epoch->epoch_size, 0);
3825 	D_ASSERT(list_empty(&mdev->current_epoch->list));
3826 }
3827 
3828 /*
3829  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3830  * we can agree on is stored in agreed_pro_version.
3831  *
3832  * feature flags and the reserved array should be enough room for future
3833  * enhancements of the handshake protocol, and possible plugins...
3834  *
3835  * for now, they are expected to be zero, but ignored.
3836  */
3837 static int drbd_send_handshake(struct drbd_conf *mdev)
3838 {
3839 	/* ASSERT current == mdev->receiver ... */
3840 	struct p_handshake *p = &mdev->data.sbuf.handshake;
3841 	int ok;
3842 
3843 	if (mutex_lock_interruptible(&mdev->data.mutex)) {
3844 		dev_err(DEV, "interrupted during initial handshake\n");
3845 		return 0; /* interrupted. not ok. */
3846 	}
3847 
3848 	if (mdev->data.socket == NULL) {
3849 		mutex_unlock(&mdev->data.mutex);
3850 		return 0;
3851 	}
3852 
3853 	memset(p, 0, sizeof(*p));
3854 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3855 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3856 	ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3857 			     (struct p_header80 *)p, sizeof(*p), 0 );
3858 	mutex_unlock(&mdev->data.mutex);
3859 	return ok;
3860 }
3861 
3862 /*
3863  * return values:
3864  *   1 yes, we have a valid connection
3865  *   0 oops, did not work out, please try again
3866  *  -1 peer talks different language,
3867  *     no point in trying again, please go standalone.
3868  */
3869 static int drbd_do_handshake(struct drbd_conf *mdev)
3870 {
3871 	/* ASSERT current == mdev->receiver ... */
3872 	struct p_handshake *p = &mdev->data.rbuf.handshake;
3873 	const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
3874 	unsigned int length;
3875 	enum drbd_packets cmd;
3876 	int rv;
3877 
3878 	rv = drbd_send_handshake(mdev);
3879 	if (!rv)
3880 		return 0;
3881 
3882 	rv = drbd_recv_header(mdev, &cmd, &length);
3883 	if (!rv)
3884 		return 0;
3885 
3886 	if (cmd != P_HAND_SHAKE) {
3887 		dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3888 		     cmdname(cmd), cmd);
3889 		return -1;
3890 	}
3891 
3892 	if (length != expect) {
3893 		dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3894 		     expect, length);
3895 		return -1;
3896 	}
3897 
3898 	rv = drbd_recv(mdev, &p->head.payload, expect);
3899 
3900 	if (rv != expect) {
3901 		dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
3902 		return 0;
3903 	}
3904 
3905 	p->protocol_min = be32_to_cpu(p->protocol_min);
3906 	p->protocol_max = be32_to_cpu(p->protocol_max);
3907 	if (p->protocol_max == 0)
3908 		p->protocol_max = p->protocol_min;
3909 
3910 	if (PRO_VERSION_MAX < p->protocol_min ||
3911 	    PRO_VERSION_MIN > p->protocol_max)
3912 		goto incompat;
3913 
3914 	mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3915 
3916 	dev_info(DEV, "Handshake successful: "
3917 	     "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3918 
3919 	return 1;
3920 
3921  incompat:
3922 	dev_err(DEV, "incompatible DRBD dialects: "
3923 	    "I support %d-%d, peer supports %d-%d\n",
3924 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
3925 	    p->protocol_min, p->protocol_max);
3926 	return -1;
3927 }
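
/*
 * Illustrative sketch only (a hypothetical helper, not used by the driver):
 * the version negotiation above amounts to "the two ranges must overlap,
 * then agree on the smaller of the two maxima".
 */
static int __maybe_unused pro_version_agree(int my_min, int my_max,
					    int peer_min, int peer_max)
{
	if (my_max < peer_min || my_min > peer_max)
		return -1;			/* incompatible dialects */
	return min_t(int, my_max, peer_max);	/* agreed protocol version */
}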
3928 
3929 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3930 static int drbd_do_auth(struct drbd_conf *mdev)
3931 {
3932 	dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
3933 	dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
3934 	return -1;
3935 }
3936 #else
3937 #define CHALLENGE_LEN 64
3938 
3939 /* Return value:
3940 	1 - auth succeeded,
3941 	0 - failed, try again (network error),
3942 	-1 - auth failed, don't try again.
3943 */
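/* The exchange below is a CRAM style challenge/response in both directions:
 *   1. send our random challenge                  (P_AUTH_CHALLENGE)
 *   2. receive the peer's challenge               (P_AUTH_CHALLENGE)
 *   3. send HMAC(shared secret, peer's challenge) (P_AUTH_RESPONSE)
 *   4. receive the peer's response and compare it against
 *      HMAC(shared secret, our own challenge)
 */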
3944 
3945 static int drbd_do_auth(struct drbd_conf *mdev)
3946 {
3947 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
3948 	struct scatterlist sg;
3949 	char *response = NULL;
3950 	char *right_response = NULL;
3951 	char *peers_ch = NULL;
3952 	unsigned int key_len = strlen(mdev->net_conf->shared_secret);
3953 	unsigned int resp_size;
3954 	struct hash_desc desc;
3955 	enum drbd_packets cmd;
3956 	unsigned int length;
3957 	int rv;
3958 
3959 	desc.tfm = mdev->cram_hmac_tfm;
3960 	desc.flags = 0;
3961 
3962 	rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
3963 				(u8 *)mdev->net_conf->shared_secret, key_len);
3964 	if (rv) {
3965 		dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
3966 		rv = -1;
3967 		goto fail;
3968 	}
3969 
3970 	get_random_bytes(my_challenge, CHALLENGE_LEN);
3971 
3972 	rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
3973 	if (!rv)
3974 		goto fail;
3975 
3976 	rv = drbd_recv_header(mdev, &cmd, &length);
3977 	if (!rv)
3978 		goto fail;
3979 
3980 	if (cmd != P_AUTH_CHALLENGE) {
3981 		dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
3982 		    cmdname(cmd), cmd);
3983 		rv = 0;
3984 		goto fail;
3985 	}
3986 
3987 	if (length > CHALLENGE_LEN * 2) {
3988 		dev_err(DEV, "AuthChallenge payload too big.\n");
3989 		rv = -1;
3990 		goto fail;
3991 	}
3992 
3993 	peers_ch = kmalloc(length, GFP_NOIO);
3994 	if (peers_ch == NULL) {
3995 		dev_err(DEV, "kmalloc of peers_ch failed\n");
3996 		rv = -1;
3997 		goto fail;
3998 	}
3999 
4000 	rv = drbd_recv(mdev, peers_ch, length);
4001 
4002 	if (rv != length) {
4003 		dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
4004 		rv = 0;
4005 		goto fail;
4006 	}
4007 
4008 	resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4009 	response = kmalloc(resp_size, GFP_NOIO);
4010 	if (response == NULL) {
4011 		dev_err(DEV, "kmalloc of response failed\n");
4012 		rv = -1;
4013 		goto fail;
4014 	}
4015 
4016 	sg_init_table(&sg, 1);
4017 	sg_set_buf(&sg, peers_ch, length);
4018 
4019 	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4020 	if (rv) {
4021 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4022 		rv = -1;
4023 		goto fail;
4024 	}
4025 
4026 	rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4027 	if (!rv)
4028 		goto fail;
4029 
4030 	rv = drbd_recv_header(mdev, &cmd, &length);
4031 	if (!rv)
4032 		goto fail;
4033 
4034 	if (cmd != P_AUTH_RESPONSE) {
4035 		dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4036 			cmdname(cmd), cmd);
4037 		rv = 0;
4038 		goto fail;
4039 	}
4040 
4041 	if (length != resp_size) {
4042 		dev_err(DEV, "AuthResponse payload of wrong size\n");
4043 		rv = 0;
4044 		goto fail;
4045 	}
4046 
4047 	rv = drbd_recv(mdev, response, resp_size);
4048 
4049 	if (rv != resp_size) {
4050 		dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4051 		rv = 0;
4052 		goto fail;
4053 	}
4054 
4055 	right_response = kmalloc(resp_size, GFP_NOIO);
4056 	if (right_response == NULL) {
4057 		dev_err(DEV, "kmalloc of right_response failed\n");
4058 		rv = -1;
4059 		goto fail;
4060 	}
4061 
4062 	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4063 
4064 	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4065 	if (rv) {
4066 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4067 		rv = -1;
4068 		goto fail;
4069 	}
4070 
4071 	rv = !memcmp(response, right_response, resp_size);
4072 
4073 	if (rv)
4074 		dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4075 		     resp_size, mdev->net_conf->cram_hmac_alg);
4076 	else
4077 		rv = -1;
4078 
4079  fail:
4080 	kfree(peers_ch);
4081 	kfree(response);
4082 	kfree(right_response);
4083 
4084 	return rv;
4085 }
4086 #endif
4087 
4088 int drbdd_init(struct drbd_thread *thi)
4089 {
4090 	struct drbd_conf *mdev = thi->mdev;
4091 	unsigned int minor = mdev_to_minor(mdev);
4092 	int h;
4093 
4094 	sprintf(current->comm, "drbd%d_receiver", minor);
4095 
4096 	dev_info(DEV, "receiver (re)started\n");
4097 
4098 	do {
4099 		h = drbd_connect(mdev);
4100 		if (h == 0) {
4101 			drbd_disconnect(mdev);
4102 			__set_current_state(TASK_INTERRUPTIBLE);
4103 			schedule_timeout(HZ);
4104 		}
4105 		if (h == -1) {
4106 			dev_warn(DEV, "Discarding network configuration.\n");
4107 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4108 		}
4109 	} while (h == 0);
4110 
4111 	if (h > 0) {
4112 		if (get_net_conf(mdev)) {
4113 			drbdd(mdev);
4114 			put_net_conf(mdev);
4115 		}
4116 	}
4117 
4118 	drbd_disconnect(mdev);
4119 
4120 	dev_info(DEV, "receiver terminated\n");
4121 	return 0;
4122 }
4123 
4124 /* ********* acknowledge sender ******** */
4125 
4126 static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
4127 {
4128 	struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4129 
4130 	int retcode = be32_to_cpu(p->retcode);
4131 
4132 	if (retcode >= SS_SUCCESS) {
4133 		set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4134 	} else {
4135 		set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4136 		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4137 		    drbd_set_st_err_str(retcode), retcode);
4138 	}
4139 	wake_up(&mdev->state_wait);
4140 
4141 	return TRUE;
4142 }
4143 
4144 static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
4145 {
4146 	return drbd_send_ping_ack(mdev);
4147 
4148 }
4149 
4150 static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
4151 {
4152 	/* restore idle timeout */
4153 	mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4154 	if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4155 		wake_up(&mdev->misc_wait);
4156 
4157 	return TRUE;
4158 }
4159 
4160 static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
4161 {
4162 	struct p_block_ack *p = (struct p_block_ack *)h;
4163 	sector_t sector = be64_to_cpu(p->sector);
4164 	int blksize = be32_to_cpu(p->blksize);
4165 
4166 	D_ASSERT(mdev->agreed_pro_version >= 89);
4167 
4168 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4169 
4170 	if (get_ldev(mdev)) {
4171 		drbd_rs_complete_io(mdev, sector);
4172 		drbd_set_in_sync(mdev, sector, blksize);
4173 		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4174 		mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4175 		put_ldev(mdev);
4176 	}
4177 	dec_rs_pending(mdev);
4178 	atomic_add(blksize >> 9, &mdev->rs_sect_in);
4179 
4180 	return TRUE;
4181 }
4182 
4183 /* when we receive the ACK for a write request,
4184  * verify that we actually know about it */
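/* (the block_id on the wire is simply the kernel address of our
 *  struct drbd_request, so hashing on the sector and comparing both
 *  the pointer value and the sector validates it) */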
4185 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4186 	u64 id, sector_t sector)
4187 {
4188 	struct hlist_head *slot = tl_hash_slot(mdev, sector);
4189 	struct hlist_node *n;
4190 	struct drbd_request *req;
4191 
4192 	hlist_for_each_entry(req, n, slot, colision) {
4193 		if ((unsigned long)req == (unsigned long)id) {
4194 			if (req->sector != sector) {
4195 				dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4196 				    "wrong sector (%llus versus %llus)\n", req,
4197 				    (unsigned long long)req->sector,
4198 				    (unsigned long long)sector);
4199 				break;
4200 			}
4201 			return req;
4202 		}
4203 	}
4204 	dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4205 		(void *)(unsigned long)id, (unsigned long long)sector);
4206 	return NULL;
4207 }
4208 
4209 typedef struct drbd_request *(req_validator_fn)
4210 	(struct drbd_conf *mdev, u64 id, sector_t sector);
4211 
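/* Common helper for the ACK/NACK handlers below: look up the request
 * the peer referred to, apply the request state transition 'what'
 * under req_lock, and complete the master bio outside the lock if
 * that transition finished it. */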
4212 static int validate_req_change_req_state(struct drbd_conf *mdev,
4213 	u64 id, sector_t sector, req_validator_fn validator,
4214 	const char *func, enum drbd_req_event what)
4215 {
4216 	struct drbd_request *req;
4217 	struct bio_and_error m;
4218 
4219 	spin_lock_irq(&mdev->req_lock);
4220 	req = validator(mdev, id, sector);
4221 	if (unlikely(!req)) {
4222 		spin_unlock_irq(&mdev->req_lock);
4223 		dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4224 		return FALSE;
4225 	}
4226 	__req_mod(req, what, &m);
4227 	spin_unlock_irq(&mdev->req_lock);
4228 
4229 	if (m.bio)
4230 		complete_master_bio(mdev, &m);
4231 	return TRUE;
4232 }
4233 
4234 static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
4235 {
4236 	struct p_block_ack *p = (struct p_block_ack *)h;
4237 	sector_t sector = be64_to_cpu(p->sector);
4238 	int blksize = be32_to_cpu(p->blksize);
4239 	enum drbd_req_event what;
4240 
4241 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4242 
4243 	if (is_syncer_block_id(p->block_id)) {
4244 		drbd_set_in_sync(mdev, sector, blksize);
4245 		dec_rs_pending(mdev);
4246 		return TRUE;
4247 	}
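	/* Not a resync block: map the wire level ACK type to the
	 * corresponding request event; which ACK the peer sends depends
	 * on the wire protocol (A/B/C) in use. */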
4248 	switch (be16_to_cpu(h->command)) {
4249 	case P_RS_WRITE_ACK:
4250 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4251 		what = write_acked_by_peer_and_sis;
4252 		break;
4253 	case P_WRITE_ACK:
4254 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4255 		what = write_acked_by_peer;
4256 		break;
4257 	case P_RECV_ACK:
4258 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4259 		what = recv_acked_by_peer;
4260 		break;
4261 	case P_DISCARD_ACK:
4262 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4263 		what = conflict_discarded_by_peer;
4264 		break;
4265 	default:
4266 		D_ASSERT(0);
4267 		return FALSE;
4268 	}
4269 
4270 	return validate_req_change_req_state(mdev, p->block_id, sector,
4271 		_ack_id_to_req, __func__ , what);
4272 }
4273 
4274 static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
4275 {
4276 	struct p_block_ack *p = (struct p_block_ack *)h;
4277 	sector_t sector = be64_to_cpu(p->sector);
4278 
4279 	if (__ratelimit(&drbd_ratelimit_state))
4280 		dev_warn(DEV, "Got NegAck packet. Peer is in trouble?\n");
4281 
4282 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4283 
4284 	if (is_syncer_block_id(p->block_id)) {
4285 		int size = be32_to_cpu(p->blksize);
4286 		dec_rs_pending(mdev);
4287 		drbd_rs_failed_io(mdev, sector, size);
4288 		return TRUE;
4289 	}
4290 	return validate_req_change_req_state(mdev, p->block_id, sector,
4291 		_ack_id_to_req, __func__ , neg_acked);
4292 }
4293 
4294 static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
4295 {
4296 	struct p_block_ack *p = (struct p_block_ack *)h;
4297 	sector_t sector = be64_to_cpu(p->sector);
4298 
4299 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4300 	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4301 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
4302 
4303 	return validate_req_change_req_state(mdev, p->block_id, sector,
4304 		_ar_id_to_req, __func__ , neg_acked);
4305 }
4306 
4307 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
4308 {
4309 	sector_t sector;
4310 	int size;
4311 	struct p_block_ack *p = (struct p_block_ack *)h;
4312 
4313 	sector = be64_to_cpu(p->sector);
4314 	size = be32_to_cpu(p->blksize);
4315 
4316 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4317 
4318 	dec_rs_pending(mdev);
4319 
4320 	if (get_ldev_if_state(mdev, D_FAILED)) {
4321 		drbd_rs_complete_io(mdev, sector);
4322 		drbd_rs_failed_io(mdev, sector, size);
4323 		put_ldev(mdev);
4324 	}
4325 
4326 	return TRUE;
4327 }
4328 
4329 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
4330 {
4331 	struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4332 
4333 	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4334 
4335 	return TRUE;
4336 }
4337 
4338 static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
4339 {
4340 	struct p_block_ack *p = (struct p_block_ack *)h;
4341 	struct drbd_work *w;
4342 	sector_t sector;
4343 	int size;
4344 
4345 	sector = be64_to_cpu(p->sector);
4346 	size = be32_to_cpu(p->blksize);
4347 
4348 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4349 
4350 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4351 		drbd_ov_oos_found(mdev, sector, size);
4352 	else
4353 		ov_oos_print(mdev);
4354 
4355 	if (!get_ldev(mdev))
4356 		return TRUE;
4357 
4358 	drbd_rs_complete_io(mdev, sector);
4359 	dec_rs_pending(mdev);
4360 
4361 	if (--mdev->ov_left == 0) {
4362 		w = kmalloc(sizeof(*w), GFP_NOIO);
4363 		if (w) {
4364 			w->cb = w_ov_finished;
4365 			drbd_queue_work_front(&mdev->data.work, w);
4366 		} else {
4367 			dev_err(DEV, "kmalloc(w) failed.\n");
4368 			ov_oos_print(mdev);
4369 			drbd_resync_finished(mdev);
4370 		}
4371 	}
4372 	put_ldev(mdev);
4373 	return TRUE;
4374 }
4375 
4376 static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
4377 {
4378 	return TRUE;
4379 }
4380 
4381 struct asender_cmd {
4382 	size_t pkt_size;
4383 	int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
4384 };
4385 
4386 static struct asender_cmd *get_asender_cmd(int cmd)
4387 {
4388 	static struct asender_cmd asender_tbl[] = {
4389 		/* anything missing from this table is in
4390 		 * the drbd_cmd_handler (drbd_default_handler) table,
4391 		 * see the beginning of drbdd() */
4392 	[P_PING]	    = { sizeof(struct p_header80), got_Ping },
4393 	[P_PING_ACK]	    = { sizeof(struct p_header80), got_PingAck },
4394 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4395 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4396 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4397 	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4398 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
4399 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
4400 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4401 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
4402 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
4403 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4404 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4405 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4406 	[P_MAX_CMD]	    = { 0, NULL },
4407 	};
4408 	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4409 		return NULL;
4410 	return &asender_tbl[cmd];
4411 }
4412 
4413 int drbd_asender(struct drbd_thread *thi)
4414 {
4415 	struct drbd_conf *mdev = thi->mdev;
4416 	struct p_header80 *h = &mdev->meta.rbuf.header.h80;
4417 	struct asender_cmd *cmd = NULL;
4418 
4419 	int rv, len;
4420 	void *buf    = h;
4421 	int received = 0;
4422 	int expect   = sizeof(struct p_header80);
4423 	int empty;
4424 
4425 	sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4426 
4427 	current->policy = SCHED_RR;  /* Make this a realtime task! */
4428 	current->rt_priority = 2;    /* more important than all other tasks */
4429 
4430 	while (get_t_state(thi) == Running) {
4431 		drbd_thread_current_set_cpu(mdev);
4432 		if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4433 			ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4434 			mdev->meta.socket->sk->sk_rcvtimeo =
4435 				mdev->net_conf->ping_timeo*HZ/10;
4436 		}
4437 
4438 		/* conditionally cork;
4439 		 * it may hurt latency if we cork without much to send */
4440 		if (!mdev->net_conf->no_cork &&
4441 			3 < atomic_read(&mdev->unacked_cnt))
4442 			drbd_tcp_cork(mdev->meta.socket);
4443 		while (1) {
4444 			clear_bit(SIGNAL_ASENDER, &mdev->flags);
4445 			flush_signals(current);
4446 			if (!drbd_process_done_ee(mdev))
4447 				goto reconnect;
4448 			/* to avoid race with newly queued ACKs */
4449 			set_bit(SIGNAL_ASENDER, &mdev->flags);
4450 			spin_lock_irq(&mdev->req_lock);
4451 			empty = list_empty(&mdev->done_ee);
4452 			spin_unlock_irq(&mdev->req_lock);
4453 			/* new ack may have been queued right here,
4454 			 * but then there is also a signal pending,
4455 			 * and we start over... */
4456 			if (empty)
4457 				break;
4458 		}
4459 		/* but unconditionally uncork unless disabled */
4460 		if (!mdev->net_conf->no_cork)
4461 			drbd_tcp_uncork(mdev->meta.socket);
4462 
4463 		/* short circuit, recv_msg would return EINTR anyways. */
4464 		if (signal_pending(current))
4465 			continue;
4466 
4467 		rv = drbd_recv_short(mdev, mdev->meta.socket,
4468 				     buf, expect-received, 0);
4469 		clear_bit(SIGNAL_ASENDER, &mdev->flags);
4470 
4471 		flush_signals(current);
4472 
4473 		/* Note:
4474 		 * -EINTR	 (on meta) we got a signal
4475 		 * -EAGAIN	 (on meta) rcvtimeo expired
4476 		 * -ECONNRESET	 other side closed the connection
4477 		 * -ERESTARTSYS  (on data) we got a signal
4478 		 * rv <  0	 other than above: unexpected error!
4479 		 * rv == expected: full header or command
4480 		 * rv <  expected: "woken" by signal during receive
4481 		 * rv == 0	 : "connection shut down by peer"
4482 		 */
4483 		if (likely(rv > 0)) {
4484 			received += rv;
4485 			buf	 += rv;
4486 		} else if (rv == 0) {
4487 			dev_err(DEV, "meta connection shut down by peer.\n");
4488 			goto reconnect;
4489 		} else if (rv == -EAGAIN) {
4490 			if (mdev->meta.socket->sk->sk_rcvtimeo ==
4491 			    mdev->net_conf->ping_timeo*HZ/10) {
4492 				dev_err(DEV, "PingAck did not arrive in time.\n");
4493 				goto reconnect;
4494 			}
4495 			set_bit(SEND_PING, &mdev->flags);
4496 			continue;
4497 		} else if (rv == -EINTR) {
4498 			continue;
4499 		} else {
4500 			dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4501 			goto reconnect;
4502 		}
4503 
4504 		if (received == expect && cmd == NULL) {
4505 			if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4506 				dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
4507 				    be32_to_cpu(h->magic),
4508 				    be16_to_cpu(h->command),
4509 				    be16_to_cpu(h->length));
4510 				goto reconnect;
4511 			}
4512 			cmd = get_asender_cmd(be16_to_cpu(h->command));
4513 			len = be16_to_cpu(h->length);
4514 			if (unlikely(cmd == NULL)) {
4515 				dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
4516 				    be32_to_cpu(h->magic),
4517 				    be16_to_cpu(h->command),
4518 				    be16_to_cpu(h->length));
4519 				goto disconnect;
4520 			}
4521 			expect = cmd->pkt_size;
4522 			ERR_IF(len != expect-sizeof(struct p_header80))
4523 				goto reconnect;
4524 		}
4525 		if (received == expect) {
4526 			D_ASSERT(cmd != NULL);
4527 			if (!cmd->process(mdev, h))
4528 				goto reconnect;
4529 
4530 			buf	 = h;
4531 			received = 0;
4532 			expect	 = sizeof(struct p_header80);
4533 			cmd	 = NULL;
4534 		}
4535 	}
4536 
4537 	if (0) {
4538 reconnect:
4539 		drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4540 		drbd_md_sync(mdev);
4541 	}
4542 	if (0) {
4543 disconnect:
4544 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4545 		drbd_md_sync(mdev);
4546 	}
4547 	clear_bit(SIGNAL_ASENDER, &mdev->flags);
4548 
4549 	D_ASSERT(mdev->state.conn < C_CONNECTED);
4550 	dev_info(DEV, "asender terminated\n");
4551 
4552 	return 0;
4553 }
4554