1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48 
49 #include "drbd_vli.h"
50 
51 enum finish_epoch {
52 	FE_STILL_LIVE,
53 	FE_DESTROYED,
54 	FE_RECYCLED,
55 };
56 
57 static int drbd_do_handshake(struct drbd_conf *mdev);
58 static int drbd_do_auth(struct drbd_conf *mdev);
59 
60 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
61 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
62 
63 
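/* Opportunistic page allocation: __GFP_HIGHMEM allows highmem pages,
 * __GFP_NOWARN suppresses allocation failure warnings, and the absence of
 * __GFP_WAIT means we never sleep or trigger writeback/reclaim here;
 * callers rely on drbd_pp_alloc()'s own retry loop instead. */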
64 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
65 
66 /*
 * some helper functions to deal with singly linked page lists,
68  * page->private being our "next" pointer.
69  */
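/*
 * Illustration: a chain of three pages a -> b -> c hanging off *head looks
 * like
 *   *head == a, page_private(a) == (unsigned long)b,
 *   page_private(b) == (unsigned long)c, page_private(c) == 0,
 * i.e. a page_private() of 0 marks the end of the chain.
 */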
70 
71 /* If at least n pages are linked at head, get n pages off.
72  * Otherwise, don't modify head, and return NULL.
73  * Locking is the responsibility of the caller.
74  */
75 static struct page *page_chain_del(struct page **head, int n)
76 {
77 	struct page *page;
78 	struct page *tmp;
79 
80 	BUG_ON(!n);
81 	BUG_ON(!head);
82 
83 	page = *head;
84 
85 	if (!page)
86 		return NULL;
87 
88 	while (page) {
89 		tmp = page_chain_next(page);
90 		if (--n == 0)
91 			break; /* found sufficient pages */
92 		if (tmp == NULL)
93 			/* insufficient pages, don't use any of them. */
94 			return NULL;
95 		page = tmp;
96 	}
97 
98 	/* add end of list marker for the returned list */
99 	set_page_private(page, 0);
100 	/* actual return value, and adjustment of head */
101 	page = *head;
102 	*head = tmp;
103 	return page;
104 }
105 
106 /* may be used outside of locks to find the tail of a (usually short)
107  * "private" page chain, before adding it back to a global chain head
108  * with page_chain_add() under a spinlock. */
109 static struct page *page_chain_tail(struct page *page, int *len)
110 {
111 	struct page *tmp;
112 	int i = 1;
113 	while ((tmp = page_chain_next(page)))
114 		++i, page = tmp;
115 	if (len)
116 		*len = i;
117 	return page;
118 }
119 
120 static int page_chain_free(struct page *page)
121 {
122 	struct page *tmp;
123 	int i = 0;
124 	page_chain_for_each_safe(page, tmp) {
125 		put_page(page);
126 		++i;
127 	}
128 	return i;
129 }
130 
131 static void page_chain_add(struct page **head,
132 		struct page *chain_first, struct page *chain_last)
133 {
134 #if 1
135 	struct page *tmp;
136 	tmp = page_chain_tail(chain_first, NULL);
137 	BUG_ON(tmp != chain_last);
138 #endif
139 
140 	/* add chain to head */
141 	set_page_private(chain_last, (unsigned long)*head);
142 	*head = chain_first;
143 }
144 
145 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
146 {
147 	struct page *page = NULL;
148 	struct page *tmp = NULL;
149 	int i = 0;
150 
151 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
152 	 * So what. It saves a spin_lock. */
153 	if (drbd_pp_vacant >= number) {
154 		spin_lock(&drbd_pp_lock);
155 		page = page_chain_del(&drbd_pp_pool, number);
156 		if (page)
157 			drbd_pp_vacant -= number;
158 		spin_unlock(&drbd_pp_lock);
159 		if (page)
160 			return page;
161 	}
162 
163 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
164 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
165 	 * which in turn might block on the other node at this very place.  */
166 	for (i = 0; i < number; i++) {
167 		tmp = alloc_page(GFP_TRY);
168 		if (!tmp)
169 			break;
170 		set_page_private(tmp, (unsigned long)page);
171 		page = tmp;
172 	}
173 
174 	if (i == number)
175 		return page;
176 
177 	/* Not enough pages immediately available this time.
178 	 * No need to jump around here, drbd_pp_alloc will retry this
179 	 * function "soon". */
180 	if (page) {
181 		tmp = page_chain_tail(page, NULL);
182 		spin_lock(&drbd_pp_lock);
183 		page_chain_add(&drbd_pp_pool, page, tmp);
184 		drbd_pp_vacant += i;
185 		spin_unlock(&drbd_pp_lock);
186 	}
187 	return NULL;
188 }
189 
190 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
191 {
192 	struct drbd_epoch_entry *e;
193 	struct list_head *le, *tle;
194 
195 	/* The EEs are always appended to the end of the list. Since
196 	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first unfinished one, we can
	   stop examining the list... */
199 
200 	list_for_each_safe(le, tle, &mdev->net_ee) {
201 		e = list_entry(le, struct drbd_epoch_entry, w.list);
202 		if (drbd_ee_has_active_page(e))
203 			break;
204 		list_move(le, to_be_freed);
205 	}
206 }
207 
208 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
209 {
210 	LIST_HEAD(reclaimed);
211 	struct drbd_epoch_entry *e, *t;
212 
213 	spin_lock_irq(&mdev->req_lock);
214 	reclaim_net_ee(mdev, &reclaimed);
215 	spin_unlock_irq(&mdev->req_lock);
216 
217 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
218 		drbd_free_net_ee(mdev, e);
219 }
220 
221 /**
 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled) if @retry is set
223  * @mdev:	DRBD device.
224  * @number:	number of pages requested
225  * @retry:	whether to retry, if not enough pages are available right now
226  *
 * Tries to allocate @number pages, first from our own page pool, then from
228  * the kernel, unless this allocation would exceed the max_buffers setting.
229  * Possibly retry until DRBD frees sufficient pages somewhere else.
230  *
231  * Returns a page chain linked via page->private.
232  */
233 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
234 {
235 	struct page *page = NULL;
236 	DEFINE_WAIT(wait);
237 
238 	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyway. */
240 	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
241 		page = drbd_pp_first_pages_or_try_alloc(mdev, number);
242 
243 	while (page == NULL) {
244 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
245 
246 		drbd_kick_lo_and_reclaim_net(mdev);
247 
248 		if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
249 			page = drbd_pp_first_pages_or_try_alloc(mdev, number);
250 			if (page)
251 				break;
252 		}
253 
254 		if (!retry)
255 			break;
256 
257 		if (signal_pending(current)) {
258 			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
259 			break;
260 		}
261 
262 		schedule();
263 	}
264 	finish_wait(&drbd_pp_wait, &wait);
265 
266 	if (page)
267 		atomic_add(number, &mdev->pp_in_use);
268 	return page;
269 }
270 
271 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 * It is also used from inside another spin_lock_irq(&mdev->req_lock);
273  * Either links the page chain back to the global pool,
274  * or returns all pages to the system. */
275 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
276 {
277 	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
278 	int i;
279 
280 	if (page == NULL)
281 		return;
282 
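	/* Keep at most roughly one maximal bio worth of pages per configured
	 * minor cached in the global pool; above that threshold, give the
	 * pages back to the system instead of re-adding them. */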
283 	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
284 		i = page_chain_free(page);
285 	else {
286 		struct page *tmp;
287 		tmp = page_chain_tail(page, &i);
288 		spin_lock(&drbd_pp_lock);
289 		page_chain_add(&drbd_pp_pool, page, tmp);
290 		drbd_pp_vacant += i;
291 		spin_unlock(&drbd_pp_lock);
292 	}
293 	i = atomic_sub_return(i, a);
294 	if (i < 0)
295 		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
296 			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
297 	wake_up(&drbd_pp_wait);
298 }
299 
300 /*
301 You need to hold the req_lock:
302  _drbd_wait_ee_list_empty()
303 
304 You must not have the req_lock:
305  drbd_free_ee()
306  drbd_alloc_ee()
307  drbd_init_ee()
308  drbd_release_ee()
309  drbd_ee_fix_bhs()
310  drbd_process_done_ee()
311  drbd_clear_done_ee()
312  drbd_wait_ee_list_empty()
313 */
314 
315 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
316 				     u64 id,
317 				     sector_t sector,
318 				     unsigned int data_size,
319 				     gfp_t gfp_mask) __must_hold(local)
320 {
321 	struct drbd_epoch_entry *e;
322 	struct page *page = NULL;
323 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
324 
325 	if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
326 		return NULL;
327 
328 	e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
329 	if (!e) {
330 		if (!(gfp_mask & __GFP_NOWARN))
331 			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
332 		return NULL;
333 	}
334 
335 	if (data_size) {
336 		page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
337 		if (!page)
338 			goto fail;
339 	}
340 
341 	INIT_HLIST_NODE(&e->collision);
342 	e->epoch = NULL;
343 	e->mdev = mdev;
344 	e->pages = page;
345 	atomic_set(&e->pending_bios, 0);
346 	e->size = data_size;
347 	e->flags = 0;
348 	e->sector = sector;
349 	e->block_id = id;
350 
351 	return e;
352 
353  fail:
354 	mempool_free(e, drbd_ee_mempool);
355 	return NULL;
356 }
357 
358 void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
359 {
360 	if (e->flags & EE_HAS_DIGEST)
361 		kfree(e->digest);
362 	drbd_pp_free(mdev, e->pages, is_net);
363 	D_ASSERT(atomic_read(&e->pending_bios) == 0);
364 	D_ASSERT(hlist_unhashed(&e->collision));
365 	mempool_free(e, drbd_ee_mempool);
366 }
367 
368 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
369 {
370 	LIST_HEAD(work_list);
371 	struct drbd_epoch_entry *e, *t;
372 	int count = 0;
373 	int is_net = list == &mdev->net_ee;
374 
375 	spin_lock_irq(&mdev->req_lock);
376 	list_splice_init(list, &work_list);
377 	spin_unlock_irq(&mdev->req_lock);
378 
379 	list_for_each_entry_safe(e, t, &work_list, w.list) {
380 		drbd_free_some_ee(mdev, e, is_net);
381 		count++;
382 	}
383 	return count;
384 }
385 
386 
387 /*
388  * This function is called from _asender only_
389  * but see also comments in _req_mod(,barrier_acked)
390  * and receive_Barrier.
391  *
392  * Move entries from net_ee to done_ee, if ready.
393  * Grab done_ee, call all callbacks, free the entries.
394  * The callbacks typically send out ACKs.
395  */
396 static int drbd_process_done_ee(struct drbd_conf *mdev)
397 {
398 	LIST_HEAD(work_list);
399 	LIST_HEAD(reclaimed);
400 	struct drbd_epoch_entry *e, *t;
401 	int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
402 
403 	spin_lock_irq(&mdev->req_lock);
404 	reclaim_net_ee(mdev, &reclaimed);
405 	list_splice_init(&mdev->done_ee, &work_list);
406 	spin_unlock_irq(&mdev->req_lock);
407 
408 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
409 		drbd_free_net_ee(mdev, e);
410 
411 	/* possible callbacks here:
412 	 * e_end_block, and e_end_resync_block, e_send_discard_ack.
413 	 * all ignore the last argument.
414 	 */
415 	list_for_each_entry_safe(e, t, &work_list, w.list) {
416 		/* list_del not necessary, next/prev members not touched */
417 		ok = e->w.cb(mdev, &e->w, !ok) && ok;
418 		drbd_free_ee(mdev, e);
419 	}
420 	wake_up(&mdev->ee_wait);
421 
422 	return ok;
423 }
424 
425 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
426 {
427 	DEFINE_WAIT(wait);
428 
429 	/* avoids spin_lock/unlock
430 	 * and calling prepare_to_wait in the fast path */
431 	while (!list_empty(head)) {
432 		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
433 		spin_unlock_irq(&mdev->req_lock);
434 		io_schedule();
435 		finish_wait(&mdev->ee_wait, &wait);
436 		spin_lock_irq(&mdev->req_lock);
437 	}
438 }
439 
440 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
441 {
442 	spin_lock_irq(&mdev->req_lock);
443 	_drbd_wait_ee_list_empty(mdev, head);
444 	spin_unlock_irq(&mdev->req_lock);
445 }
446 
/* See also kernel_accept(), which is only present since 2.6.18.
 * We also want to log exactly which part of it failed. */
449 static int drbd_accept(struct drbd_conf *mdev, const char **what,
450 		struct socket *sock, struct socket **newsock)
451 {
452 	struct sock *sk = sock->sk;
453 	int err = 0;
454 
455 	*what = "listen";
456 	err = sock->ops->listen(sock, 5);
457 	if (err < 0)
458 		goto out;
459 
460 	*what = "sock_create_lite";
461 	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
462 			       newsock);
463 	if (err < 0)
464 		goto out;
465 
466 	*what = "accept";
467 	err = sock->ops->accept(sock, *newsock, 0);
468 	if (err < 0) {
469 		sock_release(*newsock);
470 		*newsock = NULL;
471 		goto out;
472 	}
473 	(*newsock)->ops  = sock->ops;
474 	__module_get((*newsock)->ops->owner);
475 
476 out:
477 	return err;
478 }
479 
480 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
481 		    void *buf, size_t size, int flags)
482 {
483 	mm_segment_t oldfs;
484 	struct kvec iov = {
485 		.iov_base = buf,
486 		.iov_len = size,
487 	};
488 	struct msghdr msg = {
489 		.msg_iovlen = 1,
490 		.msg_iov = (struct iovec *)&iov,
491 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
492 	};
493 	int rv;
494 
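	/* sock_recvmsg() checks the iovec against the current address space
	 * limit; temporarily switch to KERNEL_DS so that our kernel buffer
	 * is accepted. */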
495 	oldfs = get_fs();
496 	set_fs(KERNEL_DS);
497 	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
498 	set_fs(oldfs);
499 
500 	return rv;
501 }
502 
503 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
504 {
505 	mm_segment_t oldfs;
506 	struct kvec iov = {
507 		.iov_base = buf,
508 		.iov_len = size,
509 	};
510 	struct msghdr msg = {
511 		.msg_iovlen = 1,
512 		.msg_iov = (struct iovec *)&iov,
513 		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
514 	};
515 	int rv;
516 
517 	oldfs = get_fs();
518 	set_fs(KERNEL_DS);
519 
520 	for (;;) {
521 		rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
522 		if (rv == size)
523 			break;
524 
525 		/* Note:
526 		 * ECONNRESET	other side closed the connection
527 		 * ERESTARTSYS	(on  sock) we got a signal
528 		 */
529 
530 		if (rv < 0) {
531 			if (rv == -ECONNRESET)
532 				dev_info(DEV, "sock was reset by peer\n");
533 			else if (rv != -ERESTARTSYS)
534 				dev_err(DEV, "sock_recvmsg returned %d\n", rv);
535 			break;
536 		} else if (rv == 0) {
537 			dev_info(DEV, "sock was shut down by peer\n");
538 			break;
539 		} else	{
540 			/* signal came in, or peer/link went down,
541 			 * after we read a partial message
542 			 */
543 			/* D_ASSERT(signal_pending(current)); */
544 			break;
545 		}
	}
547 
548 	set_fs(oldfs);
549 
550 	if (rv != size)
551 		drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
552 
553 	return rv;
554 }
555 
556 /* quoting tcp(7):
557  *   On individual connections, the socket buffer size must be set prior to the
558  *   listen(2) or connect(2) calls in order to have it take effect.
559  * This is our wrapper to do so.
560  */
561 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
562 		unsigned int rcv)
563 {
564 	/* open coded SO_SNDBUF, SO_RCVBUF */
565 	if (snd) {
566 		sock->sk->sk_sndbuf = snd;
567 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
568 	}
569 	if (rcv) {
570 		sock->sk->sk_rcvbuf = rcv;
571 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
572 	}
573 }
574 
575 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
576 {
577 	const char *what;
578 	struct socket *sock;
579 	struct sockaddr_in6 src_in6;
580 	int err;
581 	int disconnect_on_error = 1;
582 
583 	if (!get_net_conf(mdev))
584 		return NULL;
585 
586 	what = "sock_create_kern";
587 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
588 		SOCK_STREAM, IPPROTO_TCP, &sock);
589 	if (err < 0) {
590 		sock = NULL;
591 		goto out;
592 	}
593 
594 	sock->sk->sk_rcvtimeo =
595 	sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
596 	drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
597 			mdev->net_conf->rcvbuf_size);
598 
	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
606 	memcpy(&src_in6, mdev->net_conf->my_addr,
607 	       min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
608 	if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
609 		src_in6.sin6_port = 0;
610 	else
611 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
612 
613 	what = "bind before connect";
614 	err = sock->ops->bind(sock,
615 			      (struct sockaddr *) &src_in6,
616 			      mdev->net_conf->my_addr_len);
617 	if (err < 0)
618 		goto out;
619 
620 	/* connect may fail, peer not yet available.
621 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
622 	disconnect_on_error = 0;
623 	what = "connect";
624 	err = sock->ops->connect(sock,
625 				 (struct sockaddr *)mdev->net_conf->peer_addr,
626 				 mdev->net_conf->peer_addr_len, 0);
627 
628 out:
629 	if (err < 0) {
630 		if (sock) {
631 			sock_release(sock);
632 			sock = NULL;
633 		}
634 		switch (-err) {
635 			/* timeout, busy, signal pending */
636 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
637 		case EINTR: case ERESTARTSYS:
638 			/* peer not (yet) available, network problem */
639 		case ECONNREFUSED: case ENETUNREACH:
640 		case EHOSTDOWN:    case EHOSTUNREACH:
641 			disconnect_on_error = 0;
642 			break;
643 		default:
644 			dev_err(DEV, "%s failed, err = %d\n", what, err);
645 		}
646 		if (disconnect_on_error)
647 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
648 	}
649 	put_net_conf(mdev);
650 	return sock;
651 }
652 
653 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
654 {
655 	int timeo, err;
656 	struct socket *s_estab = NULL, *s_listen;
657 	const char *what;
658 
659 	if (!get_net_conf(mdev))
660 		return NULL;
661 
662 	what = "sock_create_kern";
663 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
664 		SOCK_STREAM, IPPROTO_TCP, &s_listen);
665 	if (err) {
666 		s_listen = NULL;
667 		goto out;
668 	}
669 
670 	timeo = mdev->net_conf->try_connect_int * HZ;
671 	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
672 
673 	s_listen->sk->sk_reuse    = SK_CAN_REUSE; /* SO_REUSEADDR */
674 	s_listen->sk->sk_rcvtimeo = timeo;
675 	s_listen->sk->sk_sndtimeo = timeo;
676 	drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
677 			mdev->net_conf->rcvbuf_size);
678 
679 	what = "bind before listen";
680 	err = s_listen->ops->bind(s_listen,
681 			      (struct sockaddr *) mdev->net_conf->my_addr,
682 			      mdev->net_conf->my_addr_len);
683 	if (err < 0)
684 		goto out;
685 
686 	err = drbd_accept(mdev, &what, s_listen, &s_estab);
687 
688 out:
689 	if (s_listen)
690 		sock_release(s_listen);
691 	if (err < 0) {
692 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
693 			dev_err(DEV, "%s failed, err = %d\n", what, err);
694 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
695 		}
696 	}
697 	put_net_conf(mdev);
698 
699 	return s_estab;
700 }
701 
702 static int drbd_send_fp(struct drbd_conf *mdev,
703 	struct socket *sock, enum drbd_packets cmd)
704 {
705 	struct p_header80 *h = &mdev->data.sbuf.header.h80;
706 
707 	return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
708 }
709 
710 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
711 {
712 	struct p_header80 *h = &mdev->data.rbuf.header.h80;
713 	int rr;
714 
715 	rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
716 
717 	if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
718 		return be16_to_cpu(h->command);
719 
720 	return 0xffff;
721 }
722 
723 /**
724  * drbd_socket_okay() - Free the socket if its connection is not okay
725  * @mdev:	DRBD device.
726  * @sock:	pointer to the pointer to the socket.
727  */
728 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
729 {
730 	int rr;
731 	char tb[4];
732 
733 	if (!*sock)
734 		return false;
735 
736 	rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
737 
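	/* A non-blocking peek: -EAGAIN ("no data right now") or pending data
	 * both mean the connection is still alive; 0 (orderly shutdown by the
	 * peer) or any other error means the socket is dead and is released
	 * below. */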
738 	if (rr > 0 || rr == -EAGAIN) {
739 		return true;
740 	} else {
741 		sock_release(*sock);
742 		*sock = NULL;
743 		return false;
744 	}
745 }
746 
747 /*
748  * return values:
749  *   1 yes, we have a valid connection
750  *   0 oops, did not work out, please try again
751  *  -1 peer talks different language,
752  *     no point in trying again, please go standalone.
753  *  -2 We do not have a network config...
754  */
755 static int drbd_connect(struct drbd_conf *mdev)
756 {
757 	struct socket *s, *sock, *msock;
758 	int try, h, ok;
759 	enum drbd_state_rv rv;
760 
761 	D_ASSERT(!mdev->data.socket);
762 
763 	if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
764 		return -2;
765 
766 	clear_bit(DISCARD_CONCURRENT, &mdev->flags);
767 
768 	sock  = NULL;
769 	msock = NULL;
770 
771 	do {
772 		for (try = 0;;) {
773 			/* 3 tries, this should take less than a second! */
774 			s = drbd_try_connect(mdev);
775 			if (s || ++try >= 3)
776 				break;
777 			/* give the other side time to call bind() & listen() */
778 			schedule_timeout_interruptible(HZ / 10);
779 		}
780 
781 		if (s) {
782 			if (!sock) {
783 				drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
784 				sock = s;
785 				s = NULL;
786 			} else if (!msock) {
787 				drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
788 				msock = s;
789 				s = NULL;
790 			} else {
791 				dev_err(DEV, "Logic error in drbd_connect()\n");
792 				goto out_release_sockets;
793 			}
794 		}
795 
796 		if (sock && msock) {
797 			schedule_timeout_interruptible(mdev->net_conf->ping_timeo*HZ/10);
798 			ok = drbd_socket_okay(mdev, &sock);
799 			ok = drbd_socket_okay(mdev, &msock) && ok;
800 			if (ok)
801 				break;
802 		}
803 
804 retry:
805 		s = drbd_wait_for_connect(mdev);
806 		if (s) {
807 			try = drbd_recv_fp(mdev, s);
808 			drbd_socket_okay(mdev, &sock);
809 			drbd_socket_okay(mdev, &msock);
810 			switch (try) {
811 			case P_HAND_SHAKE_S:
812 				if (sock) {
813 					dev_warn(DEV, "initial packet S crossed\n");
814 					sock_release(sock);
815 				}
816 				sock = s;
817 				break;
818 			case P_HAND_SHAKE_M:
819 				if (msock) {
820 					dev_warn(DEV, "initial packet M crossed\n");
821 					sock_release(msock);
822 				}
823 				msock = s;
824 				set_bit(DISCARD_CONCURRENT, &mdev->flags);
825 				break;
826 			default:
827 				dev_warn(DEV, "Error receiving initial packet\n");
828 				sock_release(s);
829 				if (random32() & 1)
830 					goto retry;
831 			}
832 		}
833 
834 		if (mdev->state.conn <= C_DISCONNECTING)
835 			goto out_release_sockets;
836 		if (signal_pending(current)) {
837 			flush_signals(current);
838 			smp_rmb();
839 			if (get_t_state(&mdev->receiver) == Exiting)
840 				goto out_release_sockets;
841 		}
842 
843 		if (sock && msock) {
844 			ok = drbd_socket_okay(mdev, &sock);
845 			ok = drbd_socket_okay(mdev, &msock) && ok;
846 			if (ok)
847 				break;
848 		}
849 	} while (1);
850 
851 	msock->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
852 	sock->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
853 
854 	sock->sk->sk_allocation = GFP_NOIO;
855 	msock->sk->sk_allocation = GFP_NOIO;
856 
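	/* The data socket carries the bulk replication traffic, the meta
	 * socket the small, latency sensitive ACKs and pings; give the meta
	 * socket the higher priority. */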
857 	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
858 	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
859 
860 	/* NOT YET ...
861 	 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
862 	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
863 	 * first set it to the P_HAND_SHAKE timeout,
864 	 * which we set to 4x the configured ping_timeout. */
865 	sock->sk->sk_sndtimeo =
866 	sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
867 
868 	msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
869 	msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
870 
871 	/* we don't want delays.
872 	 * we use TCP_CORK where appropriate, though */
873 	drbd_tcp_nodelay(sock);
874 	drbd_tcp_nodelay(msock);
875 
876 	mdev->data.socket = sock;
877 	mdev->meta.socket = msock;
878 	mdev->last_received = jiffies;
879 
880 	D_ASSERT(mdev->asender.task == NULL);
881 
882 	h = drbd_do_handshake(mdev);
883 	if (h <= 0)
884 		return h;
885 
886 	if (mdev->cram_hmac_tfm) {
887 		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
888 		switch (drbd_do_auth(mdev)) {
889 		case -1:
890 			dev_err(DEV, "Authentication of peer failed\n");
891 			return -1;
892 		case 0:
893 			dev_err(DEV, "Authentication of peer failed, trying again.\n");
894 			return 0;
895 		}
896 	}
897 
898 	sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
899 	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
900 
901 	atomic_set(&mdev->packet_seq, 0);
902 	mdev->peer_seq = 0;
903 
904 	if (drbd_send_protocol(mdev) == -1)
905 		return -1;
906 	set_bit(STATE_SENT, &mdev->flags);
907 	drbd_send_sync_param(mdev, &mdev->sync_conf);
908 	drbd_send_sizes(mdev, 0, 0);
909 	drbd_send_uuids(mdev);
910 	drbd_send_current_state(mdev);
911 	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
912 	clear_bit(RESIZE_PENDING, &mdev->flags);
913 
914 	spin_lock_irq(&mdev->req_lock);
915 	rv = _drbd_set_state(_NS(mdev, conn, C_WF_REPORT_PARAMS), CS_VERBOSE, NULL);
916 	if (mdev->state.conn != C_WF_REPORT_PARAMS)
917 		clear_bit(STATE_SENT, &mdev->flags);
918 	spin_unlock_irq(&mdev->req_lock);
919 
920 	if (rv < SS_SUCCESS)
921 		return 0;
922 
923 	drbd_thread_start(&mdev->asender);
924 	mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
925 
926 	return 1;
927 
928 out_release_sockets:
929 	if (sock)
930 		sock_release(sock);
931 	if (msock)
932 		sock_release(msock);
933 	return -1;
934 }
935 
936 static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
937 {
938 	union p_header *h = &mdev->data.rbuf.header;
939 	int r;
940 
941 	r = drbd_recv(mdev, h, sizeof(*h));
942 	if (unlikely(r != sizeof(*h))) {
943 		if (!signal_pending(current))
944 			dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);
945 		return false;
946 	}
947 
948 	if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
949 		*cmd = be16_to_cpu(h->h80.command);
950 		*packet_size = be16_to_cpu(h->h80.length);
951 	} else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
952 		*cmd = be16_to_cpu(h->h95.command);
953 		*packet_size = be32_to_cpu(h->h95.length);
954 	} else {
955 		dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
956 		    be32_to_cpu(h->h80.magic),
957 		    be16_to_cpu(h->h80.command),
958 		    be16_to_cpu(h->h80.length));
959 		return false;
960 	}
961 	mdev->last_received = jiffies;
962 
963 	return true;
964 }
965 
966 static void drbd_flush(struct drbd_conf *mdev)
967 {
968 	int rv;
969 
970 	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
971 		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
972 					NULL);
973 		if (rv) {
974 			dev_info(DEV, "local disk flush failed with status %d\n", rv);
975 			/* would rather check on EOPNOTSUPP, but that is not reliable.
976 			 * don't try again for ANY return value != 0
977 			 * if (rv == -EOPNOTSUPP) */
978 			drbd_bump_write_ordering(mdev, WO_drain_io);
979 		}
980 		put_ldev(mdev);
981 	}
982 }
983 
984 /**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
986  * @mdev:	DRBD device.
987  * @epoch:	Epoch object.
988  * @ev:		Epoch event.
989  */
990 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
991 					       struct drbd_epoch *epoch,
992 					       enum epoch_event ev)
993 {
994 	int epoch_size;
995 	struct drbd_epoch *next_epoch;
996 	enum finish_epoch rv = FE_STILL_LIVE;
997 
998 	spin_lock(&mdev->epoch_lock);
999 	do {
1000 		next_epoch = NULL;
1001 
1002 		epoch_size = atomic_read(&epoch->epoch_size);
1003 
1004 		switch (ev & ~EV_CLEANUP) {
1005 		case EV_PUT:
1006 			atomic_dec(&epoch->active);
1007 			break;
1008 		case EV_GOT_BARRIER_NR:
1009 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1010 			break;
1011 		case EV_BECAME_LAST:
			/* nothing to do */
1013 			break;
1014 		}
1015 
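		/* An epoch can only be finished once it has seen at least one
		 * write (epoch_size != 0), all of its writes have completed
		 * locally (active == 0), and we either know its barrier number
		 * or are cleaning up anyway. */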
1016 		if (epoch_size != 0 &&
1017 		    atomic_read(&epoch->active) == 0 &&
1018 		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1019 			if (!(ev & EV_CLEANUP)) {
1020 				spin_unlock(&mdev->epoch_lock);
1021 				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1022 				spin_lock(&mdev->epoch_lock);
1023 			}
1024 			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1025 				dec_unacked(mdev);
1026 
1027 			if (mdev->current_epoch != epoch) {
1028 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1029 				list_del(&epoch->list);
1030 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1031 				mdev->epochs--;
1032 				kfree(epoch);
1033 
1034 				if (rv == FE_STILL_LIVE)
1035 					rv = FE_DESTROYED;
1036 			} else {
1037 				epoch->flags = 0;
1038 				atomic_set(&epoch->epoch_size, 0);
1039 				/* atomic_set(&epoch->active, 0); is already zero */
1040 				if (rv == FE_STILL_LIVE)
1041 					rv = FE_RECYCLED;
1042 				wake_up(&mdev->ee_wait);
1043 			}
1044 		}
1045 
1046 		if (!next_epoch)
1047 			break;
1048 
1049 		epoch = next_epoch;
1050 	} while (1);
1051 
1052 	spin_unlock(&mdev->epoch_lock);
1053 
1054 	return rv;
1055 }
1056 
1057 /**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
1059  * @mdev:	DRBD device.
1060  * @wo:		Write ordering method to try.
1061  */
1062 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1063 {
1064 	enum write_ordering_e pwo;
1065 	static char *write_ordering_str[] = {
1066 		[WO_none] = "none",
1067 		[WO_drain_io] = "drain",
1068 		[WO_bdev_flush] = "flush",
1069 	};
1070 
1071 	pwo = mdev->write_ordering;
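	/* Write ordering can only be degraded here, never upgraded: e.g. if
	 * the current method is already WO_drain_io, a request for
	 * WO_bdev_flush is ignored by the min() below. */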
1072 	wo = min(pwo, wo);
1073 	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1074 		wo = WO_drain_io;
1075 	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1076 		wo = WO_none;
1077 	mdev->write_ordering = wo;
1078 	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1079 		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1080 }
1081 
1082 /**
 * drbd_submit_ee() - Submit the pages of an epoch entry as one or more bios
1084  * @mdev:	DRBD device.
1085  * @e:		epoch entry
1086  * @rw:		flag field, see bio->bi_rw
1087  *
1088  * May spread the pages to multiple bios,
1089  * depending on bio_add_page restrictions.
1090  *
1091  * Returns 0 if all bios have been submitted,
1092  * -ENOMEM if we could not allocate enough bios,
1093  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1094  *  single page to an empty bio (which should never happen and likely indicates
1095  *  that the lower level IO stack is in some way broken). This has been observed
1096  *  on certain Xen deployments.
1097  */
1098 /* TODO allocate from our own bio_set. */
1099 int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1100 		const unsigned rw, const int fault_type)
1101 {
1102 	struct bio *bios = NULL;
1103 	struct bio *bio;
1104 	struct page *page = e->pages;
1105 	sector_t sector = e->sector;
1106 	unsigned ds = e->size;
1107 	unsigned n_bios = 0;
1108 	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1109 	int err = -ENOMEM;
1110 
1111 	/* In most cases, we will only need one bio.  But in case the lower
1112 	 * level restrictions happen to be different at this offset on this
1113 	 * side than those of the sending peer, we may need to submit the
1114 	 * request in more than one bio.
1115 	 *
1116 	 * Plain bio_alloc is good enough here, this is no DRBD internally
1117 	 * generated bio, but a bio allocated on behalf of the peer.
1118 	 */
1119 next_bio:
1120 	bio = bio_alloc(GFP_NOIO, nr_pages);
1121 	if (!bio) {
1122 		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1123 		goto fail;
1124 	}
1125 	/* > e->sector, unless this is the first bio */
1126 	bio->bi_sector = sector;
1127 	bio->bi_bdev = mdev->ldev->backing_bdev;
1128 	bio->bi_rw = rw;
1129 	bio->bi_private = e;
1130 	bio->bi_end_io = drbd_endio_sec;
1131 
1132 	bio->bi_next = bios;
1133 	bios = bio;
1134 	++n_bios;
1135 
1136 	page_chain_for_each(page) {
1137 		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1138 		if (!bio_add_page(bio, page, len, 0)) {
1139 			/* A single page must always be possible!
			 * But in case it fails anyway,
1141 			 * we deal with it, and complain (below). */
1142 			if (bio->bi_vcnt == 0) {
1143 				dev_err(DEV,
1144 					"bio_add_page failed for len=%u, "
1145 					"bi_vcnt=0 (bi_sector=%llu)\n",
1146 					len, (unsigned long long)bio->bi_sector);
1147 				err = -ENOSPC;
1148 				goto fail;
1149 			}
1150 			goto next_bio;
1151 		}
1152 		ds -= len;
1153 		sector += len >> 9;
1154 		--nr_pages;
1155 	}
1156 	D_ASSERT(page == NULL);
1157 	D_ASSERT(ds == 0);
1158 
1159 	atomic_set(&e->pending_bios, n_bios);
1160 	do {
1161 		bio = bios;
1162 		bios = bios->bi_next;
1163 		bio->bi_next = NULL;
1164 
1165 		drbd_generic_make_request(mdev, fault_type, bio);
1166 	} while (bios);
1167 	return 0;
1168 
1169 fail:
1170 	while (bios) {
1171 		bio = bios;
1172 		bios = bios->bi_next;
1173 		bio_put(bio);
1174 	}
1175 	return err;
1176 }
1177 
1178 static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1179 {
1180 	int rv;
1181 	struct p_barrier *p = &mdev->data.rbuf.barrier;
1182 	struct drbd_epoch *epoch;
1183 
1184 	inc_unacked(mdev);
1185 
1186 	mdev->current_epoch->barrier_nr = p->barrier;
1187 	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1188 
1189 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1190 	 * the activity log, which means it would not be resynced in case the
1191 	 * R_PRIMARY crashes now.
1192 	 * Therefore we must send the barrier_ack after the barrier request was
1193 	 * completed. */
1194 	switch (mdev->write_ordering) {
1195 	case WO_none:
1196 		if (rv == FE_RECYCLED)
1197 			return true;
1198 
1199 		/* receiver context, in the writeout path of the other node.
1200 		 * avoid potential distributed deadlock */
1201 		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1202 		if (epoch)
1203 			break;
1204 		else
1205 			dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1206 			/* Fall through */
1207 
1208 	case WO_bdev_flush:
1209 	case WO_drain_io:
1210 		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1211 		drbd_flush(mdev);
1212 
1213 		if (atomic_read(&mdev->current_epoch->epoch_size)) {
1214 			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1215 			if (epoch)
1216 				break;
1217 		}
1218 
1219 		epoch = mdev->current_epoch;
1220 		wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1221 
1222 		D_ASSERT(atomic_read(&epoch->active) == 0);
1223 		D_ASSERT(epoch->flags == 0);
1224 
1225 		return true;
1226 	default:
1227 		dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1228 		return false;
1229 	}
1230 
1231 	epoch->flags = 0;
1232 	atomic_set(&epoch->epoch_size, 0);
1233 	atomic_set(&epoch->active, 0);
1234 
1235 	spin_lock(&mdev->epoch_lock);
1236 	if (atomic_read(&mdev->current_epoch->epoch_size)) {
1237 		list_add(&epoch->list, &mdev->current_epoch->list);
1238 		mdev->current_epoch = epoch;
1239 		mdev->epochs++;
1240 	} else {
1241 		/* The current_epoch got recycled while we allocated this one... */
1242 		kfree(epoch);
1243 	}
1244 	spin_unlock(&mdev->epoch_lock);
1245 
1246 	return true;
1247 }
1248 
1249 /* used from receive_RSDataReply (recv_resync_read)
1250  * and from receive_Data */
1251 static struct drbd_epoch_entry *
1252 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1253 {
1254 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1255 	struct drbd_epoch_entry *e;
1256 	struct page *page;
1257 	int dgs, ds, rr;
1258 	void *dig_in = mdev->int_dig_in;
1259 	void *dig_vv = mdev->int_dig_vv;
1260 	unsigned long *data;
1261 
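	/* If a data integrity algorithm is in use (protocol version >= 87),
	 * the peer prepends a digest of dgs bytes to the payload; data_size
	 * as taken from the packet header still includes those bytes. */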
1262 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1263 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1264 
1265 	if (dgs) {
1266 		rr = drbd_recv(mdev, dig_in, dgs);
1267 		if (rr != dgs) {
1268 			if (!signal_pending(current))
1269 				dev_warn(DEV,
1270 					"short read receiving data digest: read %d expected %d\n",
1271 					rr, dgs);
1272 			return NULL;
1273 		}
1274 	}
1275 
1276 	data_size -= dgs;
1277 
1278 	ERR_IF(data_size &  0x1ff) return NULL;
1279 	ERR_IF(data_size >  DRBD_MAX_BIO_SIZE) return NULL;
1280 
	/* even though we trust our peer,
1282 	 * we sometimes have to double check. */
1283 	if (sector + (data_size>>9) > capacity) {
1284 		dev_err(DEV, "request from peer beyond end of local disk: "
1285 			"capacity: %llus < sector: %llus + size: %u\n",
1286 			(unsigned long long)capacity,
1287 			(unsigned long long)sector, data_size);
1288 		return NULL;
1289 	}
1290 
1291 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1292 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1293 	 * which in turn might block on the other node at this very place.  */
1294 	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1295 	if (!e)
1296 		return NULL;
1297 
1298 	if (!data_size)
1299 		return e;
1300 
1301 	ds = data_size;
1302 	page = e->pages;
1303 	page_chain_for_each(page) {
1304 		unsigned len = min_t(int, ds, PAGE_SIZE);
1305 		data = kmap(page);
1306 		rr = drbd_recv(mdev, data, len);
1307 		if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1308 			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1309 			data[0] = data[0] ^ (unsigned long)-1;
1310 		}
1311 		kunmap(page);
1312 		if (rr != len) {
1313 			drbd_free_ee(mdev, e);
1314 			if (!signal_pending(current))
1315 				dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1316 				rr, len);
1317 			return NULL;
1318 		}
1319 		ds -= rr;
1320 	}
1321 
1322 	if (dgs) {
1323 		drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1324 		if (memcmp(dig_in, dig_vv, dgs)) {
1325 			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1326 				(unsigned long long)sector, data_size);
1327 			drbd_bcast_ee(mdev, "digest failed",
1328 					dgs, dig_in, dig_vv, e);
1329 			drbd_free_ee(mdev, e);
1330 			return NULL;
1331 		}
1332 	}
1333 	mdev->recv_cnt += data_size>>9;
1334 	return e;
1335 }
1336 
1337 /* drbd_drain_block() just takes a data block
1338  * out of the socket input buffer, and discards it.
1339  */
1340 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1341 {
1342 	struct page *page;
1343 	int rr, rv = 1;
1344 	void *data;
1345 
1346 	if (!data_size)
1347 		return true;
1348 
1349 	page = drbd_pp_alloc(mdev, 1, 1);
1350 
1351 	data = kmap(page);
1352 	while (data_size) {
1353 		rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1354 		if (rr != min_t(int, data_size, PAGE_SIZE)) {
1355 			rv = 0;
1356 			if (!signal_pending(current))
1357 				dev_warn(DEV,
1358 					"short read receiving data: read %d expected %d\n",
1359 					rr, min_t(int, data_size, PAGE_SIZE));
1360 			break;
1361 		}
1362 		data_size -= rr;
1363 	}
1364 	kunmap(page);
1365 	drbd_pp_free(mdev, page, 0);
1366 	return rv;
1367 }
1368 
1369 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1370 			   sector_t sector, int data_size)
1371 {
1372 	struct bio_vec *bvec;
1373 	struct bio *bio;
1374 	int dgs, rr, i, expect;
1375 	void *dig_in = mdev->int_dig_in;
1376 	void *dig_vv = mdev->int_dig_vv;
1377 
1378 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1379 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1380 
1381 	if (dgs) {
1382 		rr = drbd_recv(mdev, dig_in, dgs);
1383 		if (rr != dgs) {
1384 			if (!signal_pending(current))
1385 				dev_warn(DEV,
1386 					"short read receiving data reply digest: read %d expected %d\n",
1387 					rr, dgs);
1388 			return 0;
1389 		}
1390 	}
1391 
1392 	data_size -= dgs;
1393 
1394 	/* optimistically update recv_cnt.  if receiving fails below,
	 * we disconnect anyway, and counters will be reset. */
1396 	mdev->recv_cnt += data_size>>9;
1397 
1398 	bio = req->master_bio;
1399 	D_ASSERT(sector == bio->bi_sector);
1400 
1401 	bio_for_each_segment(bvec, bio, i) {
1402 		expect = min_t(int, data_size, bvec->bv_len);
1403 		rr = drbd_recv(mdev,
1404 			     kmap(bvec->bv_page)+bvec->bv_offset,
1405 			     expect);
1406 		kunmap(bvec->bv_page);
1407 		if (rr != expect) {
1408 			if (!signal_pending(current))
1409 				dev_warn(DEV, "short read receiving data reply: "
1410 					"read %d expected %d\n",
1411 					rr, expect);
1412 			return 0;
1413 		}
1414 		data_size -= rr;
1415 	}
1416 
1417 	if (dgs) {
1418 		drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1419 		if (memcmp(dig_in, dig_vv, dgs)) {
1420 			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1421 			return 0;
1422 		}
1423 	}
1424 
1425 	D_ASSERT(data_size == 0);
1426 	return 1;
1427 }
1428 
1429 /* e_end_resync_block() is called via
1430  * drbd_process_done_ee() by asender only */
1431 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1432 {
1433 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1434 	sector_t sector = e->sector;
1435 	int ok;
1436 
1437 	D_ASSERT(hlist_unhashed(&e->collision));
1438 
1439 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1440 		drbd_set_in_sync(mdev, sector, e->size);
1441 		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1442 	} else {
1443 		/* Record failure to sync */
1444 		drbd_rs_failed_io(mdev, sector, e->size);
1445 
1446 		ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1447 	}
1448 	dec_unacked(mdev);
1449 
1450 	return ok;
1451 }
1452 
1453 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1454 {
1455 	struct drbd_epoch_entry *e;
1456 
1457 	e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1458 	if (!e)
1459 		goto fail;
1460 
1461 	dec_rs_pending(mdev);
1462 
1463 	inc_unacked(mdev);
1464 	/* corresponding dec_unacked() in e_end_resync_block()
1465 	 * respective _drbd_clear_done_ee */
1466 
1467 	e->w.cb = e_end_resync_block;
1468 
1469 	spin_lock_irq(&mdev->req_lock);
1470 	list_add(&e->w.list, &mdev->sync_ee);
1471 	spin_unlock_irq(&mdev->req_lock);
1472 
1473 	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1474 	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1475 		return true;
1476 
1477 	/* don't care for the reason here */
1478 	dev_err(DEV, "submit failed, triggering re-connect\n");
1479 	spin_lock_irq(&mdev->req_lock);
1480 	list_del(&e->w.list);
1481 	spin_unlock_irq(&mdev->req_lock);
1482 
1483 	drbd_free_ee(mdev, e);
1484 fail:
1485 	put_ldev(mdev);
1486 	return false;
1487 }
1488 
1489 static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1490 {
1491 	struct drbd_request *req;
1492 	sector_t sector;
1493 	int ok;
1494 	struct p_data *p = &mdev->data.rbuf.data;
1495 
1496 	sector = be64_to_cpu(p->sector);
1497 
1498 	spin_lock_irq(&mdev->req_lock);
1499 	req = _ar_id_to_req(mdev, p->block_id, sector);
1500 	spin_unlock_irq(&mdev->req_lock);
1501 	if (unlikely(!req)) {
1502 		dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1503 		return false;
1504 	}
1505 
1506 	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1507 	 * special casing it there for the various failure cases.
1508 	 * still no race with drbd_fail_pending_reads */
1509 	ok = recv_dless_read(mdev, req, sector, data_size);
1510 
1511 	if (ok)
1512 		req_mod(req, data_received);
1513 	/* else: nothing. handled from drbd_disconnect...
1514 	 * I don't think we may complete this just yet
1515 	 * in case we are "on-disconnect: freeze" */
1516 
1517 	return ok;
1518 }
1519 
1520 static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1521 {
1522 	sector_t sector;
1523 	int ok;
1524 	struct p_data *p = &mdev->data.rbuf.data;
1525 
1526 	sector = be64_to_cpu(p->sector);
1527 	D_ASSERT(p->block_id == ID_SYNCER);
1528 
1529 	if (get_ldev(mdev)) {
1530 		/* data is submitted to disk within recv_resync_read.
1531 		 * corresponding put_ldev done below on error,
1532 		 * or in drbd_endio_write_sec. */
1533 		ok = recv_resync_read(mdev, sector, data_size);
1534 	} else {
1535 		if (__ratelimit(&drbd_ratelimit_state))
1536 			dev_err(DEV, "Can not write resync data to local disk.\n");
1537 
1538 		ok = drbd_drain_block(mdev, data_size);
1539 
1540 		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1541 	}
1542 
1543 	atomic_add(data_size >> 9, &mdev->rs_sect_in);
1544 
1545 	return ok;
1546 }
1547 
1548 /* e_end_block() is called via drbd_process_done_ee().
1549  * this means this function only runs in the asender thread
1550  */
1551 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1552 {
1553 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1554 	sector_t sector = e->sector;
1555 	int ok = 1, pcmd;
1556 
1557 	if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1558 		if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1559 			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1560 				mdev->state.conn <= C_PAUSED_SYNC_T &&
1561 				e->flags & EE_MAY_SET_IN_SYNC) ?
1562 				P_RS_WRITE_ACK : P_WRITE_ACK;
1563 			ok &= drbd_send_ack(mdev, pcmd, e);
1564 			if (pcmd == P_RS_WRITE_ACK)
1565 				drbd_set_in_sync(mdev, sector, e->size);
1566 		} else {
1567 			ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
			/* we expect it to be marked out of sync anyway...
1569 			 * maybe assert this?  */
1570 		}
1571 		dec_unacked(mdev);
1572 	}
1573 	/* we delete from the conflict detection hash _after_ we sent out the
1574 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1575 	if (mdev->net_conf->two_primaries) {
1576 		spin_lock_irq(&mdev->req_lock);
1577 		D_ASSERT(!hlist_unhashed(&e->collision));
1578 		hlist_del_init(&e->collision);
1579 		spin_unlock_irq(&mdev->req_lock);
1580 	} else {
1581 		D_ASSERT(hlist_unhashed(&e->collision));
1582 	}
1583 
1584 	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1585 
1586 	return ok;
1587 }
1588 
1589 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1590 {
1591 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1592 	int ok = 1;
1593 
1594 	D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1595 	ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1596 
1597 	spin_lock_irq(&mdev->req_lock);
1598 	D_ASSERT(!hlist_unhashed(&e->collision));
1599 	hlist_del_init(&e->collision);
1600 	spin_unlock_irq(&mdev->req_lock);
1601 
1602 	dec_unacked(mdev);
1603 
1604 	return ok;
1605 }
1606 
1607 static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_epoch_entry *data_e)
1608 {
1609 
1610 	struct drbd_epoch_entry *rs_e;
1611 	bool rv = 0;
1612 
1613 	spin_lock_irq(&mdev->req_lock);
1614 	list_for_each_entry(rs_e, &mdev->sync_ee, w.list) {
1615 		if (overlaps(data_e->sector, data_e->size, rs_e->sector, rs_e->size)) {
1616 			rv = 1;
1617 			break;
1618 		}
1619 	}
1620 	spin_unlock_irq(&mdev->req_lock);
1621 
1622 	return rv;
1623 }
1624 
1625 /* Called from receive_Data.
1626  * Synchronize packets on sock with packets on msock.
1627  *
1628  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1629  * packet traveling on msock, they are still processed in the order they have
1630  * been sent.
1631  *
1632  * Note: we don't care for Ack packets overtaking P_DATA packets.
1633  *
 * In case packet_seq is larger than mdev->peer_seq, there are
1635  * outstanding packets on the msock. We wait for them to arrive.
1636  * In case we are the logically next packet, we update mdev->peer_seq
1637  * ourselves. Correctly handles 32bit wrap around.
1638  *
 * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
 * or about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and
 * 1<<11 == 2048 seconds, aka ages, for the 32bit wrap around...
1643  *
1644  * returns 0 if we may process the packet,
1645  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1646 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1647 {
1648 	DEFINE_WAIT(wait);
1649 	unsigned int p_seq;
1650 	long timeout;
1651 	int ret = 0;
1652 	spin_lock(&mdev->peer_seq_lock);
1653 	for (;;) {
1654 		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
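		/* Illustrative wrap-around case: with peer_seq == 0xffffffff
		 * the logically next packet has packet_seq == 0; peer_seq + 1
		 * wraps to 0 as well, so (assuming seq_le() is a wrap-safe
		 * "<=") the check below lets it through without waiting. */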
1655 		if (seq_le(packet_seq, mdev->peer_seq+1))
1656 			break;
1657 		if (signal_pending(current)) {
1658 			ret = -ERESTARTSYS;
1659 			break;
1660 		}
1661 		p_seq = mdev->peer_seq;
1662 		spin_unlock(&mdev->peer_seq_lock);
1663 		timeout = schedule_timeout(30*HZ);
1664 		spin_lock(&mdev->peer_seq_lock);
1665 		if (timeout == 0 && p_seq == mdev->peer_seq) {
1666 			ret = -ETIMEDOUT;
1667 			dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1668 			break;
1669 		}
1670 	}
1671 	finish_wait(&mdev->seq_wait, &wait);
1672 	if (mdev->peer_seq+1 == packet_seq)
1673 		mdev->peer_seq++;
1674 	spin_unlock(&mdev->peer_seq_lock);
1675 	return ret;
1676 }
1677 
1678 /* see also bio_flags_to_wire()
1679  * DRBD_REQ_*, because we need to semantically map the flags to data packet
1680  * flags and back. We may replicate to other kernel versions. */
1681 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1682 {
1683 	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1684 		(dpf & DP_FUA ? REQ_FUA : 0) |
1685 		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1686 		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
1687 }
1688 
1689 /* mirrored write */
1690 static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1691 {
1692 	sector_t sector;
1693 	struct drbd_epoch_entry *e;
1694 	struct p_data *p = &mdev->data.rbuf.data;
1695 	int rw = WRITE;
1696 	u32 dp_flags;
1697 
1698 	if (!get_ldev(mdev)) {
1699 		spin_lock(&mdev->peer_seq_lock);
1700 		if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1701 			mdev->peer_seq++;
1702 		spin_unlock(&mdev->peer_seq_lock);
1703 
1704 		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1705 		atomic_inc(&mdev->current_epoch->epoch_size);
1706 		return drbd_drain_block(mdev, data_size);
1707 	}
1708 
1709 	/* get_ldev(mdev) successful.
1710 	 * Corresponding put_ldev done either below (on various errors),
1711 	 * or in drbd_endio_write_sec, if we successfully submit the data at
1712 	 * the end of this function. */
1713 
1714 	sector = be64_to_cpu(p->sector);
1715 	e = read_in_block(mdev, p->block_id, sector, data_size);
1716 	if (!e) {
1717 		put_ldev(mdev);
1718 		return false;
1719 	}
1720 
1721 	e->w.cb = e_end_block;
1722 
1723 	dp_flags = be32_to_cpu(p->dp_flags);
1724 	rw |= wire_flags_to_bio(mdev, dp_flags);
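	/* A data packet without payload is how the peer transports a pure
	 * flush: no pages are attached, and DP_FLUSH (mapped to REQ_FLUSH
	 * above) is expected to be set. */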
1725 	if (e->pages == NULL) {
1726 		D_ASSERT(e->size == 0);
1727 		D_ASSERT(dp_flags & DP_FLUSH);
1728 	}
1729 
1730 	if (dp_flags & DP_MAY_SET_IN_SYNC)
1731 		e->flags |= EE_MAY_SET_IN_SYNC;
1732 
1733 	spin_lock(&mdev->epoch_lock);
1734 	e->epoch = mdev->current_epoch;
1735 	atomic_inc(&e->epoch->epoch_size);
1736 	atomic_inc(&e->epoch->active);
1737 	spin_unlock(&mdev->epoch_lock);
1738 
1739 	/* I'm the receiver, I do hold a net_cnt reference. */
1740 	if (!mdev->net_conf->two_primaries) {
1741 		spin_lock_irq(&mdev->req_lock);
1742 	} else {
1743 		/* don't get the req_lock yet,
1744 		 * we may sleep in drbd_wait_peer_seq */
1745 		const int size = e->size;
1746 		const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1747 		DEFINE_WAIT(wait);
1748 		struct drbd_request *i;
1749 		struct hlist_node *n;
1750 		struct hlist_head *slot;
1751 		int first;
1752 
1753 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1754 		BUG_ON(mdev->ee_hash == NULL);
1755 		BUG_ON(mdev->tl_hash == NULL);
1756 
1757 		/* conflict detection and handling:
1758 		 * 1. wait on the sequence number,
1759 		 *    in case this data packet overtook ACK packets.
1760 		 * 2. check our hash tables for conflicting requests.
		 *    we only need to walk the tl_hash, since an ee cannot
		 *    have a conflict with another ee: on the submitting
1763 		 *    node, the corresponding req had already been conflicting,
1764 		 *    and a conflicting req is never sent.
1765 		 *
1766 		 * Note: for two_primaries, we are protocol C,
1767 		 * so there cannot be any request that is DONE
1768 		 * but still on the transfer log.
1769 		 *
1770 		 * unconditionally add to the ee_hash.
1771 		 *
1772 		 * if no conflicting request is found:
1773 		 *    submit.
1774 		 *
1775 		 * if any conflicting request is found
1776 		 * that has not yet been acked,
1777 		 * AND I have the "discard concurrent writes" flag:
1778 		 *	 queue (via done_ee) the P_DISCARD_ACK; OUT.
1779 		 *
1780 		 * if any conflicting request is found:
1781 		 *	 block the receiver, waiting on misc_wait
1782 		 *	 until no more conflicting requests are there,
1783 		 *	 or we get interrupted (disconnect).
1784 		 *
1785 		 *	 we do not just write after local io completion of those
1786 		 *	 requests, but only after req is done completely, i.e.
1787 		 *	 we wait for the P_DISCARD_ACK to arrive!
1788 		 *
1789 		 *	 then proceed normally, i.e. submit.
1790 		 */
1791 		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1792 			goto out_interrupted;
1793 
1794 		spin_lock_irq(&mdev->req_lock);
1795 
1796 		hlist_add_head(&e->collision, ee_hash_slot(mdev, sector));
1797 
1798 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1799 		slot = tl_hash_slot(mdev, sector);
1800 		first = 1;
1801 		for (;;) {
1802 			int have_unacked = 0;
1803 			int have_conflict = 0;
1804 			prepare_to_wait(&mdev->misc_wait, &wait,
1805 				TASK_INTERRUPTIBLE);
1806 			hlist_for_each_entry(i, n, slot, collision) {
1807 				if (OVERLAPS) {
1808 					/* only ALERT on first iteration,
1809 					 * we may be woken up early... */
1810 					if (first)
1811 						dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1812 						      "	new: %llus +%u; pending: %llus +%u\n",
1813 						      current->comm, current->pid,
1814 						      (unsigned long long)sector, size,
1815 						      (unsigned long long)i->sector, i->size);
1816 					if (i->rq_state & RQ_NET_PENDING)
1817 						++have_unacked;
1818 					++have_conflict;
1819 				}
1820 			}
1821 #undef OVERLAPS
1822 			if (!have_conflict)
1823 				break;
1824 
1825 			/* Discard Ack only for the _first_ iteration */
1826 			if (first && discard && have_unacked) {
1827 				dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1828 				     (unsigned long long)sector);
1829 				inc_unacked(mdev);
1830 				e->w.cb = e_send_discard_ack;
1831 				list_add_tail(&e->w.list, &mdev->done_ee);
1832 
1833 				spin_unlock_irq(&mdev->req_lock);
1834 
1835 				/* we could probably send that P_DISCARD_ACK ourselves,
1836 				 * but I don't like the receiver using the msock */
1837 
1838 				put_ldev(mdev);
1839 				wake_asender(mdev);
1840 				finish_wait(&mdev->misc_wait, &wait);
1841 				return true;
1842 			}
1843 
1844 			if (signal_pending(current)) {
1845 				hlist_del_init(&e->collision);
1846 
1847 				spin_unlock_irq(&mdev->req_lock);
1848 
1849 				finish_wait(&mdev->misc_wait, &wait);
1850 				goto out_interrupted;
1851 			}
1852 
1853 			spin_unlock_irq(&mdev->req_lock);
1854 			if (first) {
1855 				first = 0;
1856 				dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1857 				     "sec=%llus\n", (unsigned long long)sector);
1858 			} else if (discard) {
1859 				/* we had none on the first iteration.
1860 				 * there must be none now. */
1861 				D_ASSERT(have_unacked == 0);
1862 			}
1863 			schedule();
1864 			spin_lock_irq(&mdev->req_lock);
1865 		}
1866 		finish_wait(&mdev->misc_wait, &wait);
1867 	}
1868 
1869 	list_add(&e->w.list, &mdev->active_ee);
1870 	spin_unlock_irq(&mdev->req_lock);
1871 
1872 	if (mdev->state.conn == C_SYNC_TARGET)
1873 		wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, e));
1874 
1875 	switch (mdev->net_conf->wire_protocol) {
1876 	case DRBD_PROT_C:
1877 		inc_unacked(mdev);
1878 		/* corresponding dec_unacked() in e_end_block()
1879 		 * respective _drbd_clear_done_ee */
1880 		break;
1881 	case DRBD_PROT_B:
1882 		/* I really don't like it that the receiver thread
1883 		 * sends on the msock, but anyways */
1884 		drbd_send_ack(mdev, P_RECV_ACK, e);
1885 		break;
1886 	case DRBD_PROT_A:
1887 		/* nothing to do */
1888 		break;
1889 	}
1890 
1891 	if (mdev->state.pdsk < D_INCONSISTENT) {
1892 		/* In case we have the only usable disk of the cluster,
		 * mark the area out of sync and cover it with the activity log. */
1893 		drbd_set_out_of_sync(mdev, e->sector, e->size);
1894 		e->flags |= EE_CALL_AL_COMPLETE_IO;
1895 		e->flags &= ~EE_MAY_SET_IN_SYNC;
1896 		drbd_al_begin_io(mdev, e->sector);
1897 	}
1898 
1899 	if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
1900 		return true;
1901 
1902 	/* don't care for the reason here */
1903 	dev_err(DEV, "submit failed, triggering re-connect\n");
1904 	spin_lock_irq(&mdev->req_lock);
1905 	list_del(&e->w.list);
1906 	hlist_del_init(&e->collision);
1907 	spin_unlock_irq(&mdev->req_lock);
1908 	if (e->flags & EE_CALL_AL_COMPLETE_IO)
1909 		drbd_al_complete_io(mdev, e->sector);
1910 
1911 out_interrupted:
1912 	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);
1913 	put_ldev(mdev);
1914 	drbd_free_ee(mdev, e);
1915 	return false;
1916 }
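
/*
 * Illustration only, not called by the driver: a minimal sketch of the
 * concurrent-write policy described in the comment block in receive_Data()
 * above.  The helper name and the three-way result are hypothetical; the
 * real code additionally restricts the discard to the first pass over the
 * hash slot and queues the P_DISCARD_ACK via done_ee.
 */
enum example_cw_action {
	EXAMPLE_CW_SUBMIT,	/* no conflict: submit right away */
	EXAMPLE_CW_DISCARD,	/* answer with P_DISCARD_ACK instead of writing */
	EXAMPLE_CW_WAIT,	/* sleep on misc_wait until the conflict is gone */
};

static inline enum example_cw_action
example_concurrent_write_action(int have_conflict, int have_unacked,
				int have_discard_flag)
{
	if (!have_conflict)
		return EXAMPLE_CW_SUBMIT;
	if (have_discard_flag && have_unacked)
		return EXAMPLE_CW_DISCARD;
	return EXAMPLE_CW_WAIT;
}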
1917 
1918 /* We may throttle resync, if the lower device seems to be busy,
1919  * and current sync rate is above c_min_rate.
1920  *
1921  * To decide whether or not the lower device is busy, we use a scheme similar
1922  * to MD RAID's is_mddev_idle(): if the partition stats reveal a "significant"
1923  * amount (more than 64 sectors) of activity that we cannot account for with
1924  * our own resync activity, it obviously is "busy".
1925  *
1926  * The current sync rate used here is based only on the most recent two step
1927  * marks, to get a short-term average so we can react faster.
1928  */
1929 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
1930 {
1931 	struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1932 	unsigned long db, dt, dbdt;
1933 	struct lc_element *tmp;
1934 	int curr_events;
1935 	int throttle = 0;
1936 
1937 	/* feature disabled? */
1938 	if (mdev->sync_conf.c_min_rate == 0)
1939 		return 0;
1940 
1941 	spin_lock_irq(&mdev->al_lock);
1942 	tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
1943 	if (tmp) {
1944 		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
1945 		if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
1946 			spin_unlock_irq(&mdev->al_lock);
1947 			return 0;
1948 		}
1949 		/* Do not slow down if app IO is already waiting for this extent */
1950 	}
1951 	spin_unlock_irq(&mdev->al_lock);
1952 
1953 	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
1954 		      (int)part_stat_read(&disk->part0, sectors[1]) -
1955 			atomic_read(&mdev->rs_sect_ev);
1956 
1957 	if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
1958 		unsigned long rs_left;
1959 		int i;
1960 
1961 		mdev->rs_last_events = curr_events;
1962 
1963 		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
1964 		 * approx. */
1965 		i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
1966 
1967 		if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
1968 			rs_left = mdev->ov_left;
1969 		else
1970 			rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
1971 
1972 		dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
1973 		if (!dt)
1974 			dt++;
1975 		db = mdev->rs_mark_left[i] - rs_left;
1976 		dbdt = Bit2KB(db/dt);
1977 
1978 		if (dbdt > mdev->sync_conf.c_min_rate)
1979 			throttle = 1;
1980 	}
1981 	return throttle;
1982 }
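
/*
 * Illustration only, not called by the driver: the throttle decision above
 * boils down to a short-term rate estimate over the most recent sync mark.
 * This standalone sketch (hypothetical names) shows the arithmetic: bitmap
 * bits cleared since the mark, divided by the elapsed seconds, converted to
 * KiB/s (each bitmap bit covers a 4 KiB block) and compared against the
 * configured c_min_rate.
 */
static inline int example_should_throttle(unsigned long bits_left_at_mark,
					  unsigned long bits_left_now,
					  unsigned long seconds_since_mark,
					  unsigned long c_min_rate_kb)
{
	unsigned long db, dbdt;

	if (c_min_rate_kb == 0)		/* feature disabled */
		return 0;
	if (seconds_since_mark == 0)	/* avoid division by zero, as above */
		seconds_since_mark = 1;

	db = bits_left_at_mark - bits_left_now;	/* bits resynced meanwhile */
	dbdt = db * 4 / seconds_since_mark;	/* 4 KiB per bitmap bit */

	return dbdt > c_min_rate_kb;
}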
1983 
1984 
1985 static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
1986 {
1987 	sector_t sector;
1988 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1989 	struct drbd_epoch_entry *e;
1990 	struct digest_info *di = NULL;
1991 	int size, verb;
1992 	unsigned int fault_type;
1993 	struct p_block_req *p =	&mdev->data.rbuf.block_req;
1994 
1995 	sector = be64_to_cpu(p->sector);
1996 	size   = be32_to_cpu(p->blksize);
1997 
1998 	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
1999 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2000 				(unsigned long long)sector, size);
2001 		return false;
2002 	}
2003 	if (sector + (size>>9) > capacity) {
2004 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2005 				(unsigned long long)sector, size);
2006 		return false;
2007 	}
2008 
2009 	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2010 		verb = 1;
2011 		switch (cmd) {
2012 		case P_DATA_REQUEST:
2013 			drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2014 			break;
2015 		case P_RS_DATA_REQUEST:
2016 		case P_CSUM_RS_REQUEST:
2017 		case P_OV_REQUEST:
2018 			drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
2019 			break;
2020 		case P_OV_REPLY:
2021 			verb = 0;
2022 			dec_rs_pending(mdev);
2023 			drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2024 			break;
2025 		default:
2026 			dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2027 				cmdname(cmd));
2028 		}
2029 		if (verb && __ratelimit(&drbd_ratelimit_state))
2030 			dev_err(DEV, "Can not satisfy peer's read request, "
2031 			    "no local data.\n");
2032 
2033 		/* drain the payload, if any */
2034 		return drbd_drain_block(mdev, digest_size);
2035 	}
2036 
2037 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2038 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2039 	 * which in turn might block on the other node at this very place.  */
2040 	e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2041 	if (!e) {
2042 		put_ldev(mdev);
2043 		return false;
2044 	}
2045 
2046 	switch (cmd) {
2047 	case P_DATA_REQUEST:
2048 		e->w.cb = w_e_end_data_req;
2049 		fault_type = DRBD_FAULT_DT_RD;
2050 		/* application IO, don't drbd_rs_begin_io */
2051 		goto submit;
2052 
2053 	case P_RS_DATA_REQUEST:
2054 		e->w.cb = w_e_end_rsdata_req;
2055 		fault_type = DRBD_FAULT_RS_RD;
2056 		/* used in the sector offset progress display */
2057 		mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2058 		break;
2059 
2060 	case P_OV_REPLY:
2061 	case P_CSUM_RS_REQUEST:
2062 		fault_type = DRBD_FAULT_RS_RD;
2063 		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2064 		if (!di)
2065 			goto out_free_e;
2066 
2067 		di->digest_size = digest_size;
2068 		di->digest = (((char *)di)+sizeof(struct digest_info));
2069 
2070 		e->digest = di;
2071 		e->flags |= EE_HAS_DIGEST;
2072 
2073 		if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2074 			goto out_free_e;
2075 
2076 		if (cmd == P_CSUM_RS_REQUEST) {
2077 			D_ASSERT(mdev->agreed_pro_version >= 89);
2078 			e->w.cb = w_e_end_csum_rs_req;
2079 			/* used in the sector offset progress display */
2080 			mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2081 		} else if (cmd == P_OV_REPLY) {
2082 			/* track progress, we may need to throttle */
2083 			atomic_add(size >> 9, &mdev->rs_sect_in);
2084 			e->w.cb = w_e_end_ov_reply;
2085 			dec_rs_pending(mdev);
2086 			/* drbd_rs_begin_io done when we sent this request,
2087 			 * but accounting still needs to be done. */
2088 			goto submit_for_resync;
2089 		}
2090 		break;
2091 
2092 	case P_OV_REQUEST:
2093 		if (mdev->ov_start_sector == ~(sector_t)0 &&
2094 		    mdev->agreed_pro_version >= 90) {
2095 			unsigned long now = jiffies;
2096 			int i;
2097 			mdev->ov_start_sector = sector;
2098 			mdev->ov_position = sector;
2099 			mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2100 			mdev->rs_total = mdev->ov_left;
2101 			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2102 				mdev->rs_mark_left[i] = mdev->ov_left;
2103 				mdev->rs_mark_time[i] = now;
2104 			}
2105 			dev_info(DEV, "Online Verify start sector: %llu\n",
2106 					(unsigned long long)sector);
2107 		}
2108 		e->w.cb = w_e_end_ov_req;
2109 		fault_type = DRBD_FAULT_RS_RD;
2110 		break;
2111 
2112 	default:
2113 		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2114 		    cmdname(cmd));
2115 		fault_type = DRBD_FAULT_MAX;
2116 		goto out_free_e;
2117 	}
2118 
2119 	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2120 	 * wrt the receiver, but it is not as straightforward as it may seem.
2121 	 * Various places in the resync start and stop logic assume resync
2122 	 * requests are processed in order, requeuing this on the worker thread
2123 	 * introduces a bunch of new code for synchronization between threads.
2124 	 *
2125 	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2126 	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2127 	 * for application writes for the same time.  For now, just throttle
2128 	 * here, where the rest of the code expects the receiver to sleep for
2129 	 * a while, anyways.
2130 	 */
2131 
2132 	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2133 	 * this defers syncer requests for some time, before letting at least
2134 	 * one request through.  The resync controller on the receiving side
2135 	 * will adapt to the incoming rate accordingly.
2136 	 *
2137 	 * We cannot throttle here if remote is Primary/SyncTarget:
2138 	 * we would also throttle its application reads.
2139 	 * In that case, throttling is done on the SyncTarget only.
2140 	 */
2141 	if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2142 		schedule_timeout_uninterruptible(HZ/10);
2143 	if (drbd_rs_begin_io(mdev, sector))
2144 		goto out_free_e;
2145 
2146 submit_for_resync:
2147 	atomic_add(size >> 9, &mdev->rs_sect_ev);
2148 
2149 submit:
2150 	inc_unacked(mdev);
2151 	spin_lock_irq(&mdev->req_lock);
2152 	list_add_tail(&e->w.list, &mdev->read_ee);
2153 	spin_unlock_irq(&mdev->req_lock);
2154 
2155 	if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2156 		return true;
2157 
2158 	/* don't care for the reason here */
2159 	dev_err(DEV, "submit failed, triggering re-connect\n");
2160 	spin_lock_irq(&mdev->req_lock);
2161 	list_del(&e->w.list);
2162 	spin_unlock_irq(&mdev->req_lock);
2163 	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
2164 
2165 out_free_e:
2166 	put_ldev(mdev);
2167 	drbd_free_ee(mdev, e);
2168 	return false;
2169 }
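
/*
 * Illustration only, not called by the driver: the sanity checks at the top
 * of receive_DataRequest() above reject requests that are empty, not
 * 512-byte aligned, larger than DRBD_MAX_BIO_SIZE, or reaching past the end
 * of the device.  A hypothetical predicate restating them:
 */
static inline int example_request_is_valid(sector_t sector, int size,
					   sector_t capacity)
{
	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE)
		return 0;	/* bogus size */
	if (sector + (size >> 9) > capacity)
		return 0;	/* beyond the end of the device */
	return 1;
}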
2170 
2171 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2172 {
2173 	int self, peer, rv = -100;
2174 	unsigned long ch_self, ch_peer;
2175 
2176 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2177 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2178 
2179 	ch_peer = mdev->p_uuid[UI_SIZE];
2180 	ch_self = mdev->comm_bm_set;
2181 
2182 	switch (mdev->net_conf->after_sb_0p) {
2183 	case ASB_CONSENSUS:
2184 	case ASB_DISCARD_SECONDARY:
2185 	case ASB_CALL_HELPER:
2186 		dev_err(DEV, "Configuration error.\n");
2187 		break;
2188 	case ASB_DISCONNECT:
2189 		break;
2190 	case ASB_DISCARD_YOUNGER_PRI:
2191 		if (self == 0 && peer == 1) {
2192 			rv = -1;
2193 			break;
2194 		}
2195 		if (self == 1 && peer == 0) {
2196 			rv =  1;
2197 			break;
2198 		}
2199 		/* Else fall through to one of the other strategies... */
2200 	case ASB_DISCARD_OLDER_PRI:
2201 		if (self == 0 && peer == 1) {
2202 			rv = 1;
2203 			break;
2204 		}
2205 		if (self == 1 && peer == 0) {
2206 			rv = -1;
2207 			break;
2208 		}
2209 		/* Else fall through to one of the other strategies... */
2210 		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2211 		     "Using discard-least-changes instead\n");
2212 	case ASB_DISCARD_ZERO_CHG:
2213 		if (ch_peer == 0 && ch_self == 0) {
2214 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2215 				? -1 : 1;
2216 			break;
2217 		} else {
2218 			if (ch_peer == 0) { rv =  1; break; }
2219 			if (ch_self == 0) { rv = -1; break; }
2220 		}
2221 		if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2222 			break;
2223 	case ASB_DISCARD_LEAST_CHG:
2224 		if	(ch_self < ch_peer)
2225 			rv = -1;
2226 		else if (ch_self > ch_peer)
2227 			rv =  1;
2228 		else /* ( ch_self == ch_peer ) */
2229 		     /* Well, then use something else. */
2230 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2231 				? -1 : 1;
2232 		break;
2233 	case ASB_DISCARD_LOCAL:
2234 		rv = -1;
2235 		break;
2236 	case ASB_DISCARD_REMOTE:
2237 		rv =  1;
2238 	}
2239 
2240 	return rv;
2241 }
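
/*
 * Illustration only, not called by the driver: the "discard least changes"
 * strategy above, restated as a standalone helper with hypothetical names.
 * The return convention matches the recover functions: -1 means "I discard
 * my changes and become sync target", 1 means "the peer discards".  The
 * DISCARD_CONCURRENT flag, set on one of the two nodes during the
 * connection handshake, breaks exact ties so both nodes reach opposite
 * decisions.
 */
static inline int example_discard_least_changes(unsigned long ch_self,
						unsigned long ch_peer,
						int i_have_discard_flag)
{
	if (ch_self < ch_peer)
		return -1;	/* I changed less: sync from the peer */
	if (ch_self > ch_peer)
		return 1;	/* the peer changed less: it syncs from me */
	/* equal amount of changes: let the handshake flag decide */
	return i_have_discard_flag ? -1 : 1;
}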
2242 
2243 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2244 {
2245 	int hg, rv = -100;
2246 
2247 	switch (mdev->net_conf->after_sb_1p) {
2248 	case ASB_DISCARD_YOUNGER_PRI:
2249 	case ASB_DISCARD_OLDER_PRI:
2250 	case ASB_DISCARD_LEAST_CHG:
2251 	case ASB_DISCARD_LOCAL:
2252 	case ASB_DISCARD_REMOTE:
2253 		dev_err(DEV, "Configuration error.\n");
2254 		break;
2255 	case ASB_DISCONNECT:
2256 		break;
2257 	case ASB_CONSENSUS:
2258 		hg = drbd_asb_recover_0p(mdev);
2259 		if (hg == -1 && mdev->state.role == R_SECONDARY)
2260 			rv = hg;
2261 		if (hg == 1  && mdev->state.role == R_PRIMARY)
2262 			rv = hg;
2263 		break;
2264 	case ASB_VIOLENTLY:
2265 		rv = drbd_asb_recover_0p(mdev);
2266 		break;
2267 	case ASB_DISCARD_SECONDARY:
2268 		return mdev->state.role == R_PRIMARY ? 1 : -1;
2269 	case ASB_CALL_HELPER:
2270 		hg = drbd_asb_recover_0p(mdev);
2271 		if (hg == -1 && mdev->state.role == R_PRIMARY) {
2272 			enum drbd_state_rv rv2;
2273 
2274 			drbd_set_role(mdev, R_SECONDARY, 0);
2275 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2276 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2277 			  * we do not need to wait for the after state change work either. */
2278 			rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2279 			if (rv2 != SS_SUCCESS) {
2280 				drbd_khelper(mdev, "pri-lost-after-sb");
2281 			} else {
2282 				dev_warn(DEV, "Successfully gave up primary role.\n");
2283 				rv = hg;
2284 			}
2285 		} else
2286 			rv = hg;
2287 	}
2288 
2289 	return rv;
2290 }
2291 
2292 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2293 {
2294 	int hg, rv = -100;
2295 
2296 	switch (mdev->net_conf->after_sb_2p) {
2297 	case ASB_DISCARD_YOUNGER_PRI:
2298 	case ASB_DISCARD_OLDER_PRI:
2299 	case ASB_DISCARD_LEAST_CHG:
2300 	case ASB_DISCARD_LOCAL:
2301 	case ASB_DISCARD_REMOTE:
2302 	case ASB_CONSENSUS:
2303 	case ASB_DISCARD_SECONDARY:
2304 		dev_err(DEV, "Configuration error.\n");
2305 		break;
2306 	case ASB_VIOLENTLY:
2307 		rv = drbd_asb_recover_0p(mdev);
2308 		break;
2309 	case ASB_DISCONNECT:
2310 		break;
2311 	case ASB_CALL_HELPER:
2312 		hg = drbd_asb_recover_0p(mdev);
2313 		if (hg == -1) {
2314 			enum drbd_state_rv rv2;
2315 
2316 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2317 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2318 			  * we do not need to wait for the after state change work either. */
2319 			rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2320 			if (rv2 != SS_SUCCESS) {
2321 				drbd_khelper(mdev, "pri-lost-after-sb");
2322 			} else {
2323 				dev_warn(DEV, "Successfully gave up primary role.\n");
2324 				rv = hg;
2325 			}
2326 		} else
2327 			rv = hg;
2328 	}
2329 
2330 	return rv;
2331 }
2332 
2333 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2334 			   u64 bits, u64 flags)
2335 {
2336 	if (!uuid) {
2337 		dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2338 		return;
2339 	}
2340 	dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2341 	     text,
2342 	     (unsigned long long)uuid[UI_CURRENT],
2343 	     (unsigned long long)uuid[UI_BITMAP],
2344 	     (unsigned long long)uuid[UI_HISTORY_START],
2345 	     (unsigned long long)uuid[UI_HISTORY_END],
2346 	     (unsigned long long)bits,
2347 	     (unsigned long long)flags);
2348 }
2349 
2350 /*
2351   100	after split brain try auto recover
2352     2	C_SYNC_SOURCE set BitMap
2353     1	C_SYNC_SOURCE use BitMap
2354     0	no Sync
2355    -1	C_SYNC_TARGET use BitMap
2356    -2	C_SYNC_TARGET set BitMap
2357  -100	after split brain, disconnect
2358 -1000	unrelated data
2359 -1091   requires proto 91
2360 -1096   requires proto 96
2361  */
2362 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2363 {
2364 	u64 self, peer;
2365 	int i, j;
2366 
2367 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2368 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2369 
2370 	*rule_nr = 10;
2371 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2372 		return 0;
2373 
2374 	*rule_nr = 20;
2375 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2376 	     peer != UUID_JUST_CREATED)
2377 		return -2;
2378 
2379 	*rule_nr = 30;
2380 	if (self != UUID_JUST_CREATED &&
2381 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
2382 		return 2;
2383 
2384 	if (self == peer) {
2385 		int rct, dc; /* roles at crash time */
2386 
2387 		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2388 
2389 			if (mdev->agreed_pro_version < 91)
2390 				return -1091;
2391 
2392 			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2393 			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2394 				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2395 				drbd_uuid_set_bm(mdev, 0UL);
2396 
2397 				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2398 					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2399 				*rule_nr = 34;
2400 			} else {
2401 				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2402 				*rule_nr = 36;
2403 			}
2404 
2405 			return 1;
2406 		}
2407 
2408 		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2409 
2410 			if (mdev->agreed_pro_version < 91)
2411 				return -1091;
2412 
2413 			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2414 			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2415 				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2416 
2417 				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2418 				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2419 				mdev->p_uuid[UI_BITMAP] = 0UL;
2420 
2421 				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2422 				*rule_nr = 35;
2423 			} else {
2424 				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2425 				*rule_nr = 37;
2426 			}
2427 
2428 			return -1;
2429 		}
2430 
2431 		/* Common power [off|failure] */
2432 		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2433 			(mdev->p_uuid[UI_FLAGS] & 2);
2434 		/* lowest bit is set when we were primary,
2435 		 * next bit (weight 2) is set when peer was primary */
2436 		*rule_nr = 40;
2437 
2438 		switch (rct) {
2439 		case 0: /* !self_pri && !peer_pri */ return 0;
2440 		case 1: /*  self_pri && !peer_pri */ return 1;
2441 		case 2: /* !self_pri &&  peer_pri */ return -1;
2442 		case 3: /*  self_pri &&  peer_pri */
2443 			dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2444 			return dc ? -1 : 1;
2445 		}
2446 	}
2447 
2448 	*rule_nr = 50;
2449 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2450 	if (self == peer)
2451 		return -1;
2452 
2453 	*rule_nr = 51;
2454 	peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2455 	if (self == peer) {
2456 		if (mdev->agreed_pro_version < 96 ?
2457 		    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2458 		    (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2459 		    peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2460 			/* The last P_SYNC_UUID did not get through. Undo the modifications
2461 			   the peer made to its UUIDs when it last started a resync as sync source. */
2462 
2463 			if (mdev->agreed_pro_version < 91)
2464 				return -1091;
2465 
2466 			mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2467 			mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2468 
2469 			dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
2470 			drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2471 
2472 			return -1;
2473 		}
2474 	}
2475 
2476 	*rule_nr = 60;
2477 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2478 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2479 		peer = mdev->p_uuid[i] & ~((u64)1);
2480 		if (self == peer)
2481 			return -2;
2482 	}
2483 
2484 	*rule_nr = 70;
2485 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2486 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2487 	if (self == peer)
2488 		return 1;
2489 
2490 	*rule_nr = 71;
2491 	self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2492 	if (self == peer) {
2493 		if (mdev->agreed_pro_version < 96 ?
2494 		    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2495 		    (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2496 		    self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2497 			/* The last P_SYNC_UUID did not get through. Undo the modifications
2498 			   we made to our UUIDs when we last started a resync as sync source. */
2499 
2500 			if (mdev->agreed_pro_version < 91)
2501 				return -1091;
2502 
2503 			_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2504 			_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2505 
2506 			dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2507 			drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2508 				       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2509 
2510 			return 1;
2511 		}
2512 	}
2513 
2514 
2515 	*rule_nr = 80;
2516 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2517 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2518 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2519 		if (self == peer)
2520 			return 2;
2521 	}
2522 
2523 	*rule_nr = 90;
2524 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2525 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2526 	if (self == peer && self != ((u64)0))
2527 		return 100;
2528 
2529 	*rule_nr = 100;
2530 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2531 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2532 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2533 			peer = mdev->p_uuid[j] & ~((u64)1);
2534 			if (self == peer)
2535 				return -100;
2536 		}
2537 	}
2538 
2539 	return -1000;
2540 }
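
/*
 * Illustration only, not called by the driver: rule 40 above packs "who was
 * primary at crash time" into two bits (bit 0: this node, bit 1: the peer)
 * and maps the four combinations onto a sync decision.  Hypothetical helper,
 * same return convention as drbd_uuid_compare(): 0 no sync, 1 sync source,
 * -1 sync target; a tie is again broken by the DISCARD_CONCURRENT flag.
 */
static inline int example_crashed_primary_decision(int self_was_primary,
						   int peer_was_primary,
						   int i_have_discard_flag)
{
	int rct = (self_was_primary ? 1 : 0) | (peer_was_primary ? 2 : 0);

	switch (rct) {
	case 0: return 0;	/* neither was primary: no sync needed */
	case 1: return 1;	/* only I was primary: I become sync source */
	case 2: return -1;	/* only the peer was primary: I become target */
	default: return i_have_discard_flag ? -1 : 1;	/* both were primary */
	}
}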
2541 
2542 /* drbd_sync_handshake() returns the new conn state on success, or
2543    CONN_MASK (-1) on failure.
2544  */
2545 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2546 					   enum drbd_disk_state peer_disk) __must_hold(local)
2547 {
2548 	int hg, rule_nr;
2549 	enum drbd_conns rv = C_MASK;
2550 	enum drbd_disk_state mydisk;
2551 
2552 	mydisk = mdev->state.disk;
2553 	if (mydisk == D_NEGOTIATING)
2554 		mydisk = mdev->new_state_tmp.disk;
2555 
2556 	dev_info(DEV, "drbd_sync_handshake:\n");
2557 	drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2558 	drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2559 		       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2560 
2561 	hg = drbd_uuid_compare(mdev, &rule_nr);
2562 
2563 	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2564 
2565 	if (hg == -1000) {
2566 		dev_alert(DEV, "Unrelated data, aborting!\n");
2567 		return C_MASK;
2568 	}
2569 	if (hg < -1000) {
2570 		dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2571 		return C_MASK;
2572 	}
2573 
2574 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2575 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2576 		int f = (hg == -100) || abs(hg) == 2;
2577 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
2578 		if (f)
2579 			hg = hg*2;
2580 		dev_info(DEV, "Becoming sync %s due to disk states.\n",
2581 		     hg > 0 ? "source" : "target");
2582 	}
2583 
2584 	if (abs(hg) == 100)
2585 		drbd_khelper(mdev, "initial-split-brain");
2586 
2587 	if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2588 		int pcount = (mdev->state.role == R_PRIMARY)
2589 			   + (peer_role == R_PRIMARY);
2590 		int forced = (hg == -100);
2591 
2592 		switch (pcount) {
2593 		case 0:
2594 			hg = drbd_asb_recover_0p(mdev);
2595 			break;
2596 		case 1:
2597 			hg = drbd_asb_recover_1p(mdev);
2598 			break;
2599 		case 2:
2600 			hg = drbd_asb_recover_2p(mdev);
2601 			break;
2602 		}
2603 		if (abs(hg) < 100) {
2604 			dev_warn(DEV, "Split-Brain detected, %d primaries, "
2605 			     "automatically solved. Sync from %s node\n",
2606 			     pcount, (hg < 0) ? "peer" : "this");
2607 			if (forced) {
2608 				dev_warn(DEV, "Doing a full sync, since"
2609 				     " UUIDs were ambiguous.\n");
2610 				hg = hg*2;
2611 			}
2612 		}
2613 	}
2614 
2615 	if (hg == -100) {
2616 		if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2617 			hg = -1;
2618 		if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2619 			hg = 1;
2620 
2621 		if (abs(hg) < 100)
2622 			dev_warn(DEV, "Split-Brain detected, manually solved. "
2623 			     "Sync from %s node\n",
2624 			     (hg < 0) ? "peer" : "this");
2625 	}
2626 
2627 	if (hg == -100) {
2628 		/* FIXME this log message is not correct if we end up here
2629 		 * after an attempted attach on a diskless node.
2630 		 * We just refuse to attach -- well, we drop the "connection"
2631 		 * to that disk, in a way... */
2632 		dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2633 		drbd_khelper(mdev, "split-brain");
2634 		return C_MASK;
2635 	}
2636 
2637 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
2638 		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2639 		return C_MASK;
2640 	}
2641 
2642 	if (hg < 0 && /* by intention we do not use mydisk here. */
2643 	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2644 		switch (mdev->net_conf->rr_conflict) {
2645 		case ASB_CALL_HELPER:
2646 			drbd_khelper(mdev, "pri-lost");
2647 			/* fall through */
2648 		case ASB_DISCONNECT:
2649 			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2650 			return C_MASK;
2651 		case ASB_VIOLENTLY:
2652 			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data "
2653 			     "assumption\n");
2654 		}
2655 	}
2656 
2657 	if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2658 		if (hg == 0)
2659 			dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2660 		else
2661 			dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2662 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2663 				 abs(hg) >= 2 ? "full" : "bit-map based");
2664 		return C_MASK;
2665 	}
2666 
2667 	if (abs(hg) >= 2) {
2668 		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2669 		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2670 					BM_LOCKED_SET_ALLOWED))
2671 			return C_MASK;
2672 	}
2673 
2674 	if (hg > 0) { /* become sync source. */
2675 		rv = C_WF_BITMAP_S;
2676 	} else if (hg < 0) { /* become sync target */
2677 		rv = C_WF_BITMAP_T;
2678 	} else {
2679 		rv = C_CONNECTED;
2680 		if (drbd_bm_total_weight(mdev)) {
2681 			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2682 			     drbd_bm_total_weight(mdev));
2683 		}
2684 	}
2685 
2686 	return rv;
2687 }
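
/*
 * Illustration only, not called by the driver: the handshake result "hg"
 * used above encodes the direction in its sign (positive: this node becomes
 * sync source, negative: sync target, zero: no resync) and "full sync
 * required" in a magnitude of 2 or more.  A hypothetical helper restating
 * the final mapping onto a connection state:
 */
static inline enum drbd_conns example_hg_to_conn(int hg)
{
	if (hg > 0)
		return C_WF_BITMAP_S;	/* we become sync source */
	if (hg < 0)
		return C_WF_BITMAP_T;	/* we become sync target */
	return C_CONNECTED;		/* UUIDs agree: no resync */
}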
2688 
2689 /* returns 1 if invalid */
2690 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2691 {
2692 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2693 	if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2694 	    (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2695 		return 0;
2696 
2697 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2698 	if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2699 	    self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2700 		return 1;
2701 
2702 	/* everything else is valid if they are equal on both sides. */
2703 	if (peer == self)
2704 		return 0;
2705 
2706 	/* everything else is invalid. */
2707 	return 1;
2708 }
2709 
2710 static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2711 {
2712 	struct p_protocol *p = &mdev->data.rbuf.protocol;
2713 	int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2714 	int p_want_lose, p_two_primaries, cf;
2715 	char p_integrity_alg[SHARED_SECRET_MAX] = "";
2716 
2717 	p_proto		= be32_to_cpu(p->protocol);
2718 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
2719 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
2720 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
2721 	p_two_primaries = be32_to_cpu(p->two_primaries);
2722 	cf		= be32_to_cpu(p->conn_flags);
2723 	p_want_lose = cf & CF_WANT_LOSE;
2724 
2725 	clear_bit(CONN_DRY_RUN, &mdev->flags);
2726 
2727 	if (cf & CF_DRY_RUN)
2728 		set_bit(CONN_DRY_RUN, &mdev->flags);
2729 
2730 	if (p_proto != mdev->net_conf->wire_protocol) {
2731 		dev_err(DEV, "incompatible communication protocols\n");
2732 		goto disconnect;
2733 	}
2734 
2735 	if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2736 		dev_err(DEV, "incompatible after-sb-0pri settings\n");
2737 		goto disconnect;
2738 	}
2739 
2740 	if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2741 		dev_err(DEV, "incompatible after-sb-1pri settings\n");
2742 		goto disconnect;
2743 	}
2744 
2745 	if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2746 		dev_err(DEV, "incompatible after-sb-2pri settings\n");
2747 		goto disconnect;
2748 	}
2749 
2750 	if (p_want_lose && mdev->net_conf->want_lose) {
2751 		dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2752 		goto disconnect;
2753 	}
2754 
2755 	if (p_two_primaries != mdev->net_conf->two_primaries) {
2756 		dev_err(DEV, "incompatible setting of the two-primaries options\n");
2757 		goto disconnect;
2758 	}
2759 
2760 	if (mdev->agreed_pro_version >= 87) {
2761 		unsigned char *my_alg = mdev->net_conf->integrity_alg;
2762 
2763 		if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2764 			return false;
2765 
2766 		p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2767 		if (strcmp(p_integrity_alg, my_alg)) {
2768 			dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2769 			goto disconnect;
2770 		}
2771 		dev_info(DEV, "data-integrity-alg: %s\n",
2772 		     my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2773 	}
2774 
2775 	return true;
2776 
2777 disconnect:
2778 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2779 	return false;
2780 }
2781 
2782 /* helper function
2783  * input: alg name, feature name
2784  * return: NULL (alg name was "")
2785  *         ERR_PTR(error) if something goes wrong
2786  *         or the crypto hash ptr, if it worked out ok. */
2787 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2788 		const char *alg, const char *name)
2789 {
2790 	struct crypto_hash *tfm;
2791 
2792 	if (!alg[0])
2793 		return NULL;
2794 
2795 	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2796 	if (IS_ERR(tfm)) {
2797 		dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2798 			alg, name, PTR_ERR(tfm));
2799 		return tfm;
2800 	}
2801 	if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2802 		crypto_free_hash(tfm);
2803 		dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2804 		return ERR_PTR(-EINVAL);
2805 	}
2806 	return tfm;
2807 }
2808 
2809 static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
2810 {
2811 	int ok = true;
2812 	struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
2813 	unsigned int header_size, data_size, exp_max_sz;
2814 	struct crypto_hash *verify_tfm = NULL;
2815 	struct crypto_hash *csums_tfm = NULL;
2816 	const int apv = mdev->agreed_pro_version;
2817 	int *rs_plan_s = NULL;
2818 	int fifo_size = 0;
2819 
2820 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2821 		    : apv == 88 ? sizeof(struct p_rs_param)
2822 					+ SHARED_SECRET_MAX
2823 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
2824 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
2825 
2826 	if (packet_size > exp_max_sz) {
2827 		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2828 		    packet_size, exp_max_sz);
2829 		return false;
2830 	}
2831 
2832 	if (apv <= 88) {
2833 		header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
2834 		data_size   = packet_size  - header_size;
2835 	} else if (apv <= 94) {
2836 		header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
2837 		data_size   = packet_size  - header_size;
2838 		D_ASSERT(data_size == 0);
2839 	} else {
2840 		header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
2841 		data_size   = packet_size  - header_size;
2842 		D_ASSERT(data_size == 0);
2843 	}
2844 
2845 	/* initialize verify_alg and csums_alg */
2846 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2847 
2848 	if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
2849 		return false;
2850 
2851 	mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
2852 
2853 	if (apv >= 88) {
2854 		if (apv == 88) {
2855 			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
2856 				dev_err(DEV, "verify-alg of wrong size, "
2857 					"peer wants %u, accepting only up to %u bytes\n",
2858 					data_size, SHARED_SECRET_MAX);
2859 				return false;
2860 			}
2861 
2862 			if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2863 				return false;
2864 
2865 			/* we expect NUL terminated string */
2866 			/* but just in case someone tries to be evil */
2867 			D_ASSERT(p->verify_alg[data_size-1] == 0);
2868 			p->verify_alg[data_size-1] = 0;
2869 
2870 		} else /* apv >= 89 */ {
2871 			/* we still expect NUL terminated strings */
2872 			/* but just in case someone tries to be evil */
2873 			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2874 			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2875 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2876 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2877 		}
2878 
2879 		if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2880 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2881 				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2882 				    mdev->sync_conf.verify_alg, p->verify_alg);
2883 				goto disconnect;
2884 			}
2885 			verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2886 					p->verify_alg, "verify-alg");
2887 			if (IS_ERR(verify_tfm)) {
2888 				verify_tfm = NULL;
2889 				goto disconnect;
2890 			}
2891 		}
2892 
2893 		if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2894 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2895 				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2896 				    mdev->sync_conf.csums_alg, p->csums_alg);
2897 				goto disconnect;
2898 			}
2899 			csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2900 					p->csums_alg, "csums-alg");
2901 			if (IS_ERR(csums_tfm)) {
2902 				csums_tfm = NULL;
2903 				goto disconnect;
2904 			}
2905 		}
2906 
2907 		if (apv > 94) {
2908 			mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
2909 			mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2910 			mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2911 			mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2912 			mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2913 
2914 			fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2915 			if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2916 				rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2917 				if (!rs_plan_s) {
2918 					dev_err(DEV, "kzalloc of fifo_buffer failed\n");
2919 					goto disconnect;
2920 				}
2921 			}
2922 		}
2923 
2924 		spin_lock(&mdev->peer_seq_lock);
2925 		/* lock against drbd_nl_syncer_conf() */
2926 		if (verify_tfm) {
2927 			strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2928 			mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2929 			crypto_free_hash(mdev->verify_tfm);
2930 			mdev->verify_tfm = verify_tfm;
2931 			dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2932 		}
2933 		if (csums_tfm) {
2934 			strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2935 			mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2936 			crypto_free_hash(mdev->csums_tfm);
2937 			mdev->csums_tfm = csums_tfm;
2938 			dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2939 		}
2940 		if (fifo_size != mdev->rs_plan_s.size) {
2941 			kfree(mdev->rs_plan_s.values);
2942 			mdev->rs_plan_s.values = rs_plan_s;
2943 			mdev->rs_plan_s.size   = fifo_size;
2944 			mdev->rs_planed = 0;
2945 		}
2946 		spin_unlock(&mdev->peer_seq_lock);
2947 	}
2948 
2949 	return ok;
2950 disconnect:
2951 	/* just for completeness: actually not needed,
2952 	 * as this is not reached if csums_tfm was ok. */
2953 	crypto_free_hash(csums_tfm);
2954 	/* but free the verify_tfm again, if csums_tfm did not work out */
2955 	crypto_free_hash(verify_tfm);
2956 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2957 	return false;
2958 }
2959 
2960 /* warn if the arguments differ by more than 12.5% */
2961 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2962 	const char *s, sector_t a, sector_t b)
2963 {
2964 	sector_t d;
2965 	if (a == 0 || b == 0)
2966 		return;
2967 	d = (a > b) ? (a - b) : (b - a);
2968 	if (d > (a>>3) || d > (b>>3))
2969 		dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2970 		     (unsigned long long)a, (unsigned long long)b);
2971 }
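
/*
 * Illustration only, not called by the driver: the 12.5% threshold above is
 * implemented with a shift, since a>>3 is a/8.  A hypothetical predicate
 * that is true when two sizes differ "considerably" in that sense:
 */
static inline int example_differ_considerably(sector_t a, sector_t b)
{
	sector_t d = (a > b) ? (a - b) : (b - a);

	return a != 0 && b != 0 && (d > (a >> 3) || d > (b >> 3));
}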
2972 
2973 static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2974 {
2975 	struct p_sizes *p = &mdev->data.rbuf.sizes;
2976 	enum determine_dev_size dd = unchanged;
2977 	sector_t p_size, p_usize, my_usize;
2978 	int ldsc = 0; /* local disk size changed */
2979 	enum dds_flags ddsf;
2980 
2981 	p_size = be64_to_cpu(p->d_size);
2982 	p_usize = be64_to_cpu(p->u_size);
2983 
2984 	if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2985 		dev_err(DEV, "some backing storage is needed\n");
2986 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2987 		return false;
2988 	}
2989 
2990 	/* just store the peer's disk size for now.
2991 	 * we still need to figure out whether we accept that. */
2992 	mdev->p_size = p_size;
2993 
2994 	if (get_ldev(mdev)) {
2995 		warn_if_differ_considerably(mdev, "lower level device sizes",
2996 			   p_size, drbd_get_max_capacity(mdev->ldev));
2997 		warn_if_differ_considerably(mdev, "user requested size",
2998 					    p_usize, mdev->ldev->dc.disk_size);
2999 
3000 		/* if this is the first connect, or an otherwise expected
3001 		 * param exchange, choose the minimum */
3002 		if (mdev->state.conn == C_WF_REPORT_PARAMS)
3003 			p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3004 					     p_usize);
3005 
3006 		my_usize = mdev->ldev->dc.disk_size;
3007 
3008 		if (mdev->ldev->dc.disk_size != p_usize) {
3009 			mdev->ldev->dc.disk_size = p_usize;
3010 			dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3011 			     (unsigned long)mdev->ldev->dc.disk_size);
3012 		}
3013 
3014 		/* Never shrink a device with usable data during connect.
3015 		   But allow online shrinking if we are connected. */
3016 		if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3017 		   drbd_get_capacity(mdev->this_bdev) &&
3018 		   mdev->state.disk >= D_OUTDATED &&
3019 		   mdev->state.conn < C_CONNECTED) {
3020 			dev_err(DEV, "The peer's disk size is too small!\n");
3021 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3022 			mdev->ldev->dc.disk_size = my_usize;
3023 			put_ldev(mdev);
3024 			return false;
3025 		}
3026 		put_ldev(mdev);
3027 	}
3028 
3029 	ddsf = be16_to_cpu(p->dds_flags);
3030 	if (get_ldev(mdev)) {
3031 		dd = drbd_determine_dev_size(mdev, ddsf);
3032 		put_ldev(mdev);
3033 		if (dd == dev_size_error)
3034 			return false;
3035 		drbd_md_sync(mdev);
3036 	} else {
3037 		/* I am diskless, need to accept the peer's size. */
3038 		drbd_set_my_capacity(mdev, p_size);
3039 	}
3040 
3041 	mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3042 	drbd_reconsider_max_bio_size(mdev);
3043 
3044 	if (get_ldev(mdev)) {
3045 		if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3046 			mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3047 			ldsc = 1;
3048 		}
3049 
3050 		put_ldev(mdev);
3051 	}
3052 
3053 	if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3054 		if (be64_to_cpu(p->c_size) !=
3055 		    drbd_get_capacity(mdev->this_bdev) || ldsc) {
3056 			/* we have different sizes, probably peer
3057 			 * needs to know my new size... */
3058 			drbd_send_sizes(mdev, 0, ddsf);
3059 		}
3060 		if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3061 		    (dd == grew && mdev->state.conn == C_CONNECTED)) {
3062 			if (mdev->state.pdsk >= D_INCONSISTENT &&
3063 			    mdev->state.disk >= D_INCONSISTENT) {
3064 				if (ddsf & DDSF_NO_RESYNC)
3065 					dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3066 				else
3067 					resync_after_online_grow(mdev);
3068 			} else
3069 				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3070 		}
3071 	}
3072 
3073 	return true;
3074 }
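
/*
 * Illustration only, not called by the driver: the size negotiation above
 * never shrinks a device that still holds usable data while we are merely
 * (re)connecting; shrinking is only accepted once fully connected (online
 * resize).  A hypothetical predicate restating that guard, with the flag
 * arguments standing in for the disk and connection state checks:
 */
static inline int example_refuse_new_size(sector_t new_size, sector_t cur_size,
					  int disk_has_usable_data,
					  int fully_connected)
{
	return new_size < cur_size && disk_has_usable_data && !fully_connected;
}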
3075 
3076 static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3077 {
3078 	struct p_uuids *p = &mdev->data.rbuf.uuids;
3079 	u64 *p_uuid;
3080 	int i, updated_uuids = 0;
3081 
3082 	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
	if (!p_uuid) {
		dev_err(DEV, "kmalloc of p_uuid failed\n");
		return false;
	}
3083 
3084 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3085 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
3086 
3087 	kfree(mdev->p_uuid);
3088 	mdev->p_uuid = p_uuid;
3089 
3090 	if (mdev->state.conn < C_CONNECTED &&
3091 	    mdev->state.disk < D_INCONSISTENT &&
3092 	    mdev->state.role == R_PRIMARY &&
3093 	    (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3094 		dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3095 		    (unsigned long long)mdev->ed_uuid);
3096 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3097 		return false;
3098 	}
3099 
3100 	if (get_ldev(mdev)) {
3101 		int skip_initial_sync =
3102 			mdev->state.conn == C_CONNECTED &&
3103 			mdev->agreed_pro_version >= 90 &&
3104 			mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3105 			(p_uuid[UI_FLAGS] & 8);
3106 		if (skip_initial_sync) {
3107 			dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3108 			drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3109 					"clear_n_write from receive_uuids",
3110 					BM_LOCKED_TEST_ALLOWED);
3111 			_drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3112 			_drbd_uuid_set(mdev, UI_BITMAP, 0);
3113 			_drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3114 					CS_VERBOSE, NULL);
3115 			drbd_md_sync(mdev);
3116 			updated_uuids = 1;
3117 		}
3118 		put_ldev(mdev);
3119 	} else if (mdev->state.disk < D_INCONSISTENT &&
3120 		   mdev->state.role == R_PRIMARY) {
3121 		/* I am a diskless primary, the peer just created a new current UUID
3122 		   for me. */
3123 		updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3124 	}
3125 
3126 	/* Before we test for the disk state, we should wait until a possibly
3127 	   ongoing cluster wide state change has finished. That is important if
3128 	   we are primary and are detaching from our disk. We need to see the
3129 	   new disk state... */
3130 	wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3131 	if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3132 		updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3133 
3134 	if (updated_uuids)
3135 		drbd_print_uuids(mdev, "receiver updated UUIDs to");
3136 
3137 	return true;
3138 }
3139 
3140 /**
3141  * convert_state() - Converts the peer's view of the cluster state to our point of view
3142  * @ps:		The state as seen by the peer.
3143  */
3144 static union drbd_state convert_state(union drbd_state ps)
3145 {
3146 	union drbd_state ms;
3147 
3148 	static enum drbd_conns c_tab[] = {
3149 		[C_CONNECTED] = C_CONNECTED,
3150 
3151 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3152 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3153 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3154 		[C_VERIFY_S]       = C_VERIFY_T,
3155 		[C_MASK]   = C_MASK,
3156 	};
3157 
3158 	ms.i = ps.i;
3159 
3160 	ms.conn = c_tab[ps.conn];
3161 	ms.peer = ps.role;
3162 	ms.role = ps.peer;
3163 	ms.pdsk = ps.disk;
3164 	ms.disk = ps.pdsk;
3165 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3166 
3167 	return ms;
3168 }
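
/*
 * Illustration only, not called by the driver: each field of the state
 * packet is named from the sender's point of view, which is why
 * convert_state() mirrors it.  A hypothetical check that a converted state
 * really has the role/peer and disk/pdsk pairs swapped (the connection
 * state goes through c_tab and is handled separately):
 * example_state_is_mirrored(p, convert_state(p)) holds for any p.
 */
static inline int example_state_is_mirrored(union drbd_state peer_view,
					    union drbd_state my_view)
{
	return my_view.role == peer_view.peer &&
	       my_view.peer == peer_view.role &&
	       my_view.disk == peer_view.pdsk &&
	       my_view.pdsk == peer_view.disk;
}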
3169 
3170 static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3171 {
3172 	struct p_req_state *p = &mdev->data.rbuf.req_state;
3173 	union drbd_state mask, val;
3174 	enum drbd_state_rv rv;
3175 
3176 	mask.i = be32_to_cpu(p->mask);
3177 	val.i = be32_to_cpu(p->val);
3178 
3179 	if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3180 	    test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3181 		drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3182 		return true;
3183 	}
3184 
3185 	mask = convert_state(mask);
3186 	val = convert_state(val);
3187 
3188 	rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3189 
3190 	drbd_send_sr_reply(mdev, rv);
3191 	drbd_md_sync(mdev);
3192 
3193 	return true;
3194 }
3195 
3196 static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3197 {
3198 	struct p_state *p = &mdev->data.rbuf.state;
3199 	union drbd_state os, ns, peer_state;
3200 	enum drbd_disk_state real_peer_disk;
3201 	enum chg_state_flags cs_flags;
3202 	int rv;
3203 
3204 	peer_state.i = be32_to_cpu(p->state);
3205 
3206 	real_peer_disk = peer_state.disk;
3207 	if (peer_state.disk == D_NEGOTIATING) {
3208 		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3209 		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3210 	}
3211 
3212 	spin_lock_irq(&mdev->req_lock);
3213  retry:
3214 	os = ns = mdev->state;
3215 	spin_unlock_irq(&mdev->req_lock);
3216 
3217 	/* If some other part of the code (asender thread, timeout)
3218 	 * already decided to close the connection again,
3219 	 * we must not "re-establish" it here. */
3220 	if (os.conn <= C_TEAR_DOWN)
3221 		return false;
3222 
3223 	/* If this is the "end of sync" confirmation, usually the peer disk
3224 	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3225 	 * set) resync started in PausedSyncT, or if the timing of pause-/
3226 	 * unpause-sync events has been "just right", the peer disk may
3227 	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3228 	 */
3229 	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3230 	    real_peer_disk == D_UP_TO_DATE &&
3231 	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3232 		/* If we are (becoming) SyncSource, but peer is still in sync
3233 		 * preparation, ignore its uptodate-ness to avoid flapping, it
3234 		 * will change to inconsistent once the peer reaches active
3235 		 * syncing states.
3236 		 * It may have changed syncer-paused flags, however, so we
3237 		 * cannot ignore this completely. */
3238 		if (peer_state.conn > C_CONNECTED &&
3239 		    peer_state.conn < C_SYNC_SOURCE)
3240 			real_peer_disk = D_INCONSISTENT;
3241 
3242 		/* if peer_state changes to connected at the same time,
3243 		 * it explicitly notifies us that it finished resync.
3244 		 * Maybe we should finish it up, too? */
3245 		else if (os.conn >= C_SYNC_SOURCE &&
3246 			 peer_state.conn == C_CONNECTED) {
3247 			if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3248 				drbd_resync_finished(mdev);
3249 			return true;
3250 		}
3251 	}
3252 
3253 	/* peer says his disk is inconsistent, while we think it is uptodate,
3254 	 * and this happens while the peer still thinks we have a sync going on,
3255 	 * but we think we are already done with the sync.
3256 	 * We ignore this to avoid flapping pdsk.
3257 	 * This should not happen, if the peer is a recent version of drbd. */
3258 	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3259 	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3260 		real_peer_disk = D_UP_TO_DATE;
3261 
3262 	if (ns.conn == C_WF_REPORT_PARAMS)
3263 		ns.conn = C_CONNECTED;
3264 
3265 	if (peer_state.conn == C_AHEAD)
3266 		ns.conn = C_BEHIND;
3267 
3268 	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3269 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
3270 		int cr; /* consider resync */
3271 
3272 		/* if we established a new connection */
3273 		cr  = (os.conn < C_CONNECTED);
3274 		/* if we had an established connection
3275 		 * and one of the nodes newly attaches a disk */
3276 		cr |= (os.conn == C_CONNECTED &&
3277 		       (peer_state.disk == D_NEGOTIATING ||
3278 			os.disk == D_NEGOTIATING));
3279 		/* if we have both been inconsistent, and the peer has been
3280 		 * forced to be UpToDate with --overwrite-data */
3281 		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3282 		/* if we had been plain connected, and the admin requested to
3283 		 * start a sync by "invalidate" or "invalidate-remote" */
3284 		cr |= (os.conn == C_CONNECTED &&
3285 				(peer_state.conn >= C_STARTING_SYNC_S &&
3286 				 peer_state.conn <= C_WF_BITMAP_T));
3287 
3288 		if (cr)
3289 			ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3290 
3291 		put_ldev(mdev);
3292 		if (ns.conn == C_MASK) {
3293 			ns.conn = C_CONNECTED;
3294 			if (mdev->state.disk == D_NEGOTIATING) {
3295 				drbd_force_state(mdev, NS(disk, D_FAILED));
3296 			} else if (peer_state.disk == D_NEGOTIATING) {
3297 				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3298 				peer_state.disk = D_DISKLESS;
3299 				real_peer_disk = D_DISKLESS;
3300 			} else {
3301 				if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3302 					return false;
3303 				D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3304 				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3305 				return false;
3306 			}
3307 		}
3308 	}
3309 
3310 	spin_lock_irq(&mdev->req_lock);
3311 	if (mdev->state.i != os.i)
3312 		goto retry;
3313 	clear_bit(CONSIDER_RESYNC, &mdev->flags);
3314 	ns.peer = peer_state.role;
3315 	ns.pdsk = real_peer_disk;
3316 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3317 	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3318 		ns.disk = mdev->new_state_tmp.disk;
3319 	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3320 	if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3321 	    test_bit(NEW_CUR_UUID, &mdev->flags)) {
3322 		/* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
3323 		   for temporary network outages! */
3324 		spin_unlock_irq(&mdev->req_lock);
3325 		dev_err(DEV, "Aborting Connect, can not thaw IO with a peer that is only Consistent\n");
3326 		tl_clear(mdev);
3327 		drbd_uuid_new_current(mdev);
3328 		clear_bit(NEW_CUR_UUID, &mdev->flags);
3329 		drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3330 		return false;
3331 	}
3332 	rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3333 	ns = mdev->state;
3334 	spin_unlock_irq(&mdev->req_lock);
3335 
3336 	if (rv < SS_SUCCESS) {
3337 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3338 		return false;
3339 	}
3340 
3341 	if (os.conn > C_WF_REPORT_PARAMS) {
3342 		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3343 		    peer_state.disk != D_NEGOTIATING) {
3344 			/* we want resync, peer has not yet decided to sync... */
3345 			/* Nowadays only used when forcing a node into primary role and
3346 			   setting its disk to UpToDate with that */
3347 			drbd_send_uuids(mdev);
3348 			drbd_send_current_state(mdev);
3349 		}
3350 	}
3351 
3352 	mdev->net_conf->want_lose = 0;
3353 
3354 	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3355 
3356 	return true;
3357 }
3358 
3359 static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3360 {
3361 	struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
3362 
3363 	wait_event(mdev->misc_wait,
3364 		   mdev->state.conn == C_WF_SYNC_UUID ||
3365 		   mdev->state.conn == C_BEHIND ||
3366 		   mdev->state.conn < C_CONNECTED ||
3367 		   mdev->state.disk < D_NEGOTIATING);
3368 
3369 	/* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3370 
3371 	/* Here the _drbd_uuid_ functions are right, current should
3372 	   _not_ be rotated into the history */
3373 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3374 		_drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3375 		_drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3376 
3377 		drbd_print_uuids(mdev, "updated sync uuid");
3378 		drbd_start_resync(mdev, C_SYNC_TARGET);
3379 
3380 		put_ldev(mdev);
3381 	} else
3382 		dev_err(DEV, "Ignoring SyncUUID packet!\n");
3383 
3384 	return true;
3385 }
3386 
3387 /**
3388  * receive_bitmap_plain
3389  *
3390  * Return 0 when done, 1 when another iteration is needed, and a negative error
3391  * code upon failure.
3392  */
3393 static int
3394 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3395 		     unsigned long *buffer, struct bm_xfer_ctx *c)
3396 {
3397 	unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3398 	unsigned want = num_words * sizeof(long);
3399 	int err;
3400 
3401 	if (want != data_size) {
3402 		dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3403 		return -EIO;
3404 	}
3405 	if (want == 0)
3406 		return 0;
3407 	err = drbd_recv(mdev, buffer, want);
3408 	if (err != want) {
3409 		if (err >= 0)
3410 			err = -EIO;
3411 		return err;
3412 	}
3413 
3414 	drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3415 
3416 	c->word_offset += num_words;
3417 	c->bit_offset = c->word_offset * BITS_PER_LONG;
3418 	if (c->bit_offset > c->bm_bits)
3419 		c->bit_offset = c->bm_bits;
3420 
3421 	return 1;
3422 }
3423 
3424 /**
3425  * recv_bm_rle_bits
3426  *
3427  * Return 0 when done, 1 when another iteration is needed, and a negative error
3428  * code upon failure.
3429  */
3430 static int
3431 recv_bm_rle_bits(struct drbd_conf *mdev,
3432 		struct p_compressed_bm *p,
3433 		struct bm_xfer_ctx *c)
3434 {
3435 	struct bitstream bs;
3436 	u64 look_ahead;
3437 	u64 rl;
3438 	u64 tmp;
3439 	unsigned long s = c->bit_offset;
3440 	unsigned long e;
3441 	int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
3442 	int toggle = DCBP_get_start(p);
3443 	int have;
3444 	int bits;
3445 
3446 	bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3447 
3448 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
3449 	if (bits < 0)
3450 		return -EIO;
3451 
3452 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
3453 		bits = vli_decode_bits(&rl, look_ahead);
3454 		if (bits <= 0)
3455 			return -EIO;
3456 
3457 		if (toggle) {
3458 			e = s + rl -1;
3459 			if (e >= c->bm_bits) {
3460 				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3461 				return -EIO;
3462 			}
3463 			_drbd_bm_set_bits(mdev, s, e);
3464 		}
3465 
3466 		if (have < bits) {
3467 			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3468 				have, bits, look_ahead,
3469 				(unsigned int)(bs.cur.b - p->code),
3470 				(unsigned int)bs.buf_len);
3471 			return -EIO;
3472 		}
3473 		look_ahead >>= bits;
3474 		have -= bits;
3475 
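		/* refill the 64 bit look-ahead: fetch up to (64 - have) fresh
		 * bits from the stream and append them above the bits still
		 * pending in look_ahead */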
3476 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3477 		if (bits < 0)
3478 			return -EIO;
3479 		look_ahead |= tmp << have;
3480 		have += bits;
3481 	}
3482 
3483 	c->bit_offset = s;
3484 	bm_xfer_ctx_bit_to_word_offset(c);
3485 
3486 	return (s != c->bm_bits);
3487 }
3488 
3489 /**
3490  * decode_bitmap_c
3491  *
3492  * Return 0 when done, 1 when another iteration is needed, and a negative error
3493  * code upon failure.
3494  */
3495 static int
3496 decode_bitmap_c(struct drbd_conf *mdev,
3497 		struct p_compressed_bm *p,
3498 		struct bm_xfer_ctx *c)
3499 {
3500 	if (DCBP_get_code(p) == RLE_VLI_Bits)
3501 		return recv_bm_rle_bits(mdev, p, c);
3502 
3503 	/* other variants had been implemented for evaluation,
3504 	 * but have been dropped as this one turned out to be "best"
3505 	 * during all our tests. */
3506 
3507 	dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3508 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3509 	return -EIO;
3510 }
3511 
3512 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3513 		const char *direction, struct bm_xfer_ctx *c)
3514 {
3515 	/* what would it take to transfer it "plaintext" */
3516 	unsigned plain = sizeof(struct p_header80) *
3517 		((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3518 		+ c->bm_words * sizeof(long);
3519 	unsigned total = c->bytes[0] + c->bytes[1];
3520 	unsigned r;
3521 
3522 	/* total cannot be zero, but just in case: */
3523 	if (total == 0)
3524 		return;
3525 
3526 	/* don't report if not compressed */
3527 	if (total >= plain)
3528 		return;
3529 
3530 	/* total < plain. check for overflow, still */
3531 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3532 		                    : (1000 * total / plain);
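	/* r is now the transferred size in per mille of the plain size;
	 * e.g. total = 512, plain = 4096  =>  r = 125, reported below as
	 * a compression saving of 87.5% */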
3533 
3534 	if (r > 1000)
3535 		r = 1000;
3536 
3537 	r = 1000 - r;
3538 	dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3539 	     "total %u; compression: %u.%u%%\n",
3540 			direction,
3541 			c->bytes[1], c->packets[1],
3542 			c->bytes[0], c->packets[0],
3543 			total, r/10, r % 10);
3544 }
3545 
3546 /* Since we are processing the bitfield from lower addresses to higher,
3547    it does not matter whether we process it in 32 bit or 64 bit chunks,
3548    as long as it is little endian. (Understand it as a byte stream,
3549    beginning with the lowest byte...) If we used big endian, we would
3550    need to process it from the highest address to the lowest in order
3551    to be agnostic to the 32 vs 64 bit issue.
3552 
3553    returns 0 on failure, 1 if we successfully received it. */
3554 static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3555 {
3556 	struct bm_xfer_ctx c;
3557 	void *buffer;
3558 	int err;
3559 	int ok = false;
3560 	struct p_header80 *h = &mdev->data.rbuf.header.h80;
3561 
3562 	drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3563 	/* you are supposed to send additional out-of-sync information
3564 	 * if you actually set bits during this phase */
3565 
3566 	/* maybe we should use some per thread scratch page,
3567 	 * and allocate that during initial device creation? */
3568 	buffer	 = (unsigned long *) __get_free_page(GFP_NOIO);
3569 	if (!buffer) {
3570 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3571 		goto out;
3572 	}
3573 
3574 	c = (struct bm_xfer_ctx) {
3575 		.bm_bits = drbd_bm_bits(mdev),
3576 		.bm_words = drbd_bm_words(mdev),
3577 	};
3578 
3579 	for(;;) {
3580 		if (cmd == P_BITMAP) {
3581 			err = receive_bitmap_plain(mdev, data_size, buffer, &c);
3582 		} else if (cmd == P_COMPRESSED_BITMAP) {
3583 			/* MAYBE: sanity check that we speak proto >= 90,
3584 			 * and the feature is enabled! */
3585 			struct p_compressed_bm *p;
3586 
3587 			if (data_size > BM_PACKET_PAYLOAD_BYTES) {
3588 				dev_err(DEV, "ReportCBitmap packet too large\n");
3589 				goto out;
3590 			}
3591 			/* use the page buffer */
3592 			p = buffer;
3593 			memcpy(p, h, sizeof(*h));
3594 			if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
3595 				goto out;
3596 			if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3597 				dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
3598 				goto out;
3599 			}
3600 			err = decode_bitmap_c(mdev, p, &c);
3601 		} else {
3602 			dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
3603 			goto out;
3604 		}
3605 
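		/* stats: index 0 counts compressed, index 1 plain bitmap packets */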
3606 		c.packets[cmd == P_BITMAP]++;
3607 		c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
3608 
3609 		if (err <= 0) {
3610 			if (err < 0)
3611 				goto out;
3612 			break;
3613 		}
3614 		if (!drbd_recv_header(mdev, &cmd, &data_size))
3615 			goto out;
3616 	}
3617 
3618 	INFO_bm_xfer_stats(mdev, "receive", &c);
3619 
3620 	if (mdev->state.conn == C_WF_BITMAP_T) {
3621 		enum drbd_state_rv rv;
3622 
3623 		ok = !drbd_send_bitmap(mdev);
3624 		if (!ok)
3625 			goto out;
3626 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3627 		rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3628 		D_ASSERT(rv == SS_SUCCESS);
3629 	} else if (mdev->state.conn != C_WF_BITMAP_S) {
3630 		/* admin may have requested C_DISCONNECTING,
3631 		 * other threads may have noticed network errors */
3632 		dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3633 		    drbd_conn_str(mdev->state.conn));
3634 	}
3635 
3636 	ok = true;
3637  out:
3638 	drbd_bm_unlock(mdev);
3639 	if (ok && mdev->state.conn == C_WF_BITMAP_S)
3640 		drbd_start_resync(mdev, C_SYNC_SOURCE);
3641 	free_page((unsigned long) buffer);
3642 	return ok;
3643 }
3644 
3645 static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3646 {
3647 	/* TODO zero copy sink :) */
3648 	static char sink[128];
3649 	int size, want, r;
3650 
3651 	dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3652 		 cmd, data_size);
3653 
3654 	size = data_size;
3655 	while (size > 0) {
3656 		want = min_t(int, size, sizeof(sink));
3657 		r = drbd_recv(mdev, sink, want);
3658 		ERR_IF(r <= 0) break;
3659 		size -= r;
3660 	}
3661 	return size == 0;
3662 }
3663 
3664 static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3665 {
3666 	/* Make sure we've acked all the TCP data associated
3667 	 * with the data requests being unplugged */
3668 	drbd_tcp_quickack(mdev->data.socket);
3669 
3670 	return true;
3671 }
3672 
3673 static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3674 {
3675 	struct p_block_desc *p = &mdev->data.rbuf.block_desc;
3676 
3677 	switch (mdev->state.conn) {
3678 	case C_WF_SYNC_UUID:
3679 	case C_WF_BITMAP_T:
3680 	case C_BEHIND:
3681 		break;
3682 	default:
3683 		dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3684 				drbd_conn_str(mdev->state.conn));
3685 	}
3686 
3687 	drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3688 
3689 	return true;
3690 }
3691 
3692 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
3693 
3694 struct data_cmd {
3695 	int expect_payload;
3696 	size_t pkt_size;
3697 	drbd_cmd_handler_f function;
3698 };
3699 
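/* Map each packet type received on the data socket to its fixed packet size
 * and handler.  expect_payload marks packets that may carry additional
 * payload beyond their fixed sub-header. */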
3700 static struct data_cmd drbd_cmd_handler[] = {
3701 	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
3702 	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
3703 	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
3704 	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier },
3705 	[P_BITMAP]	    = { 1, sizeof(struct p_header80), receive_bitmap },
3706 	[P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap },
3707 	[P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header80), receive_UnplugRemote },
3708 	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
3709 	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3710 	[P_SYNC_PARAM]	    = { 1, sizeof(struct p_header80), receive_SyncParam },
3711 	[P_SYNC_PARAM89]    = { 1, sizeof(struct p_header80), receive_SyncParam },
3712 	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
3713 	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
3714 	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
3715 	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
3716 	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
3717 	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3718 	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
3719 	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
3720 	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3721 	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
3722 	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3723 	/* anything missing from this table is in
3724 	 * the asender_tbl, see get_asender_cmd */
3725 	[P_MAX_CMD]	    = { 0, 0, NULL },
3726 };
3727 
3728 /* All handler functions that expect a sub-header get that sub-header in
3729    mdev->data.rbuf.header.head.payload.
3730 
3731    Usually the callback can find the usual p_header in
3732    mdev->data.rbuf.header.head, but it must not rely on that,
3733    since there is also p_header95. */
3734 
3735 static void drbdd(struct drbd_conf *mdev)
3736 {
3737 	union p_header *header = &mdev->data.rbuf.header;
3738 	unsigned int packet_size;
3739 	enum drbd_packets cmd;
3740 	size_t shs; /* sub header size */
3741 	int rv;
3742 
3743 	while (get_t_state(&mdev->receiver) == Running) {
3744 		drbd_thread_current_set_cpu(mdev);
3745 		if (!drbd_recv_header(mdev, &cmd, &packet_size))
3746 			goto err_out;
3747 
3748 		if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3749 			dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3750 			goto err_out;
3751 		}
3752 
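		/* shs: size of the command specific sub header; the generic
		 * header itself has already been consumed by drbd_recv_header() */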
3753 		shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
3754 		if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3755 			dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3756 			goto err_out;
3757 		}
3758 
3759 		if (shs) {
3760 			rv = drbd_recv(mdev, &header->h80.payload, shs);
3761 			if (unlikely(rv != shs)) {
3762 				if (!signal_pending(current))
3763 					dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv);
3764 				goto err_out;
3765 			}
3766 		}
3767 
3768 		rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
3769 
3770 		if (unlikely(!rv)) {
3771 			dev_err(DEV, "error receiving %s, l: %d!\n",
3772 			    cmdname(cmd), packet_size);
3773 			goto err_out;
3774 		}
3775 	}
3776 
3777 	if (0) {
3778 	err_out:
3779 		drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3780 	}
3781 	/* If we leave here, we probably want to update at least the
3782 	 * "Connected" indicator on stable storage. Do so explicitly here. */
3783 	drbd_md_sync(mdev);
3784 }
3785 
3786 void drbd_flush_workqueue(struct drbd_conf *mdev)
3787 {
3788 	struct drbd_wq_barrier barr;
3789 
3790 	barr.w.cb = w_prev_work_done;
3791 	init_completion(&barr.done);
3792 	drbd_queue_work(&mdev->data.work, &barr.w);
3793 	wait_for_completion(&barr.done);
3794 }
3795 
3796 void drbd_free_tl_hash(struct drbd_conf *mdev)
3797 {
3798 	struct hlist_head *h;
3799 
3800 	spin_lock_irq(&mdev->req_lock);
3801 
3802 	if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
3803 		spin_unlock_irq(&mdev->req_lock);
3804 		return;
3805 	}
3806 	/* paranoia code */
3807 	for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3808 		if (h->first)
3809 			dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3810 				(int)(h - mdev->ee_hash), h->first);
3811 	kfree(mdev->ee_hash);
3812 	mdev->ee_hash = NULL;
3813 	mdev->ee_hash_s = 0;
3814 
3815 	/* We may not have had the chance to wait for all locally pending
3816 	 * application requests. The hlist_add_fake() prevents access after
3817 	 * free on master bio completion. */
3818 	for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++) {
3819 		struct drbd_request *req;
3820 		struct hlist_node *pos, *n;
3821 		hlist_for_each_entry_safe(req, pos, n, h, collision) {
3822 			hlist_del_init(&req->collision);
3823 			hlist_add_fake(&req->collision);
3824 		}
3825 	}
3826 
3827 	kfree(mdev->tl_hash);
3828 	mdev->tl_hash = NULL;
3829 	mdev->tl_hash_s = 0;
3830 	spin_unlock_irq(&mdev->req_lock);
3831 }
3832 
3833 static void drbd_disconnect(struct drbd_conf *mdev)
3834 {
3835 	enum drbd_fencing_p fp;
3836 	union drbd_state os, ns;
3837 	int rv = SS_UNKNOWN_ERROR;
3838 	unsigned int i;
3839 
3840 	if (mdev->state.conn == C_STANDALONE)
3841 		return;
3842 
3843 	/* We are about to start the cleanup after connection loss.
3844 	 * Make sure drbd_make_request knows about that.
3845 	 * Usually we should be in some network failure state already,
3846 	 * but just in case we are not, we fix it up here.
3847 	 */
3848 	drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
3849 
3850 	/* the asender does not clean up anything; it must not interfere, either */
3851 	drbd_thread_stop(&mdev->asender);
3852 	drbd_free_sock(mdev);
3853 
3854 	/* wait for current activity to cease. */
3855 	spin_lock_irq(&mdev->req_lock);
3856 	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3857 	_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3858 	_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3859 	spin_unlock_irq(&mdev->req_lock);
3860 
3861 	/* We do not have data structures that would allow us to
3862 	 * get the rs_pending_cnt down to 0 again.
3863 	 *  * On C_SYNC_TARGET we do not have any data structures describing
3864 	 *    the pending RSDataRequest's we have sent.
3865 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
3866 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3867 	 *  And no, it is not the sum of the reference counts in the
3868 	 *  resync_LRU. The resync_LRU tracks the whole operation including
3869 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3870 	 *  on the fly. */
3871 	drbd_rs_cancel_all(mdev);
3872 	mdev->rs_total = 0;
3873 	mdev->rs_failed = 0;
3874 	atomic_set(&mdev->rs_pending_cnt, 0);
3875 	wake_up(&mdev->misc_wait);
3876 
3877 	/* make sure syncer is stopped and w_resume_next_sg queued */
3878 	del_timer_sync(&mdev->resync_timer);
3879 	resync_timer_fn((unsigned long)mdev);
3880 
3881 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3882 	 * w_make_resync_request etc. which may still be on the worker queue
3883 	 * to be "canceled" */
3884 	drbd_flush_workqueue(mdev);
3885 
3886 	/* This also does reclaim_net_ee().  If we do this too early, we might
3887 	 * miss some resync ee and pages. */
3888 	drbd_process_done_ee(mdev);
3889 
3890 	kfree(mdev->p_uuid);
3891 	mdev->p_uuid = NULL;
3892 
3893 	if (!is_susp(mdev->state))
3894 		tl_clear(mdev);
3895 
3896 	dev_info(DEV, "Connection closed\n");
3897 
3898 	drbd_md_sync(mdev);
3899 
3900 	fp = FP_DONT_CARE;
3901 	if (get_ldev(mdev)) {
3902 		fp = mdev->ldev->dc.fencing;
3903 		put_ldev(mdev);
3904 	}
3905 
3906 	if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3907 		drbd_try_outdate_peer_async(mdev);
3908 
3909 	spin_lock_irq(&mdev->req_lock);
3910 	os = mdev->state;
3911 	if (os.conn >= C_UNCONNECTED) {
3912 		/* Do not restart in case we are C_DISCONNECTING */
3913 		ns = os;
3914 		ns.conn = C_UNCONNECTED;
3915 		rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3916 	}
3917 	spin_unlock_irq(&mdev->req_lock);
3918 
3919 	if (os.conn == C_DISCONNECTING) {
3920 		wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
3921 
3922 		crypto_free_hash(mdev->cram_hmac_tfm);
3923 		mdev->cram_hmac_tfm = NULL;
3924 
3925 		kfree(mdev->net_conf);
3926 		mdev->net_conf = NULL;
3927 		drbd_request_state(mdev, NS(conn, C_STANDALONE));
3928 	}
3929 
3930 	/* serialize with bitmap writeout triggered by the state change,
3931 	 * if any. */
3932 	wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3933 
3934 	/* tcp_close and release of sendpage pages can be deferred.  I don't
3935 	 * want to use SO_LINGER, because apparently it can be deferred for
3936 	 * more than 20 seconds (longest time I checked).
3937 	 *
3938 	 * Actually we don't care exactly when the network stack does its
3939 	 * put_page(); we just release our reference on these pages right here.
3940 	 */
3941 	i = drbd_release_ee(mdev, &mdev->net_ee);
3942 	if (i)
3943 		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3944 	i = atomic_read(&mdev->pp_in_use_by_net);
3945 	if (i)
3946 		dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
3947 	i = atomic_read(&mdev->pp_in_use);
3948 	if (i)
3949 		dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3950 
3951 	D_ASSERT(list_empty(&mdev->read_ee));
3952 	D_ASSERT(list_empty(&mdev->active_ee));
3953 	D_ASSERT(list_empty(&mdev->sync_ee));
3954 	D_ASSERT(list_empty(&mdev->done_ee));
3955 
3956 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3957 	atomic_set(&mdev->current_epoch->epoch_size, 0);
3958 	D_ASSERT(list_empty(&mdev->current_epoch->list));
3959 }
3960 
3961 /*
3962  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3963  * we can agree on is stored in agreed_pro_version.
3964  *
3965  * The feature flags and the reserved array should provide enough room for
3966  * future enhancements of the handshake protocol, and possible plugins...
3967  *
3968  * For now, they are expected to be zero, but are ignored.
3969  */
3970 static int drbd_send_handshake(struct drbd_conf *mdev)
3971 {
3972 	/* ASSERT current == mdev->receiver ... */
3973 	struct p_handshake *p = &mdev->data.sbuf.handshake;
3974 	int ok;
3975 
3976 	if (mutex_lock_interruptible(&mdev->data.mutex)) {
3977 		dev_err(DEV, "interrupted during initial handshake\n");
3978 		return 0; /* interrupted. not ok. */
3979 	}
3980 
3981 	if (mdev->data.socket == NULL) {
3982 		mutex_unlock(&mdev->data.mutex);
3983 		return 0;
3984 	}
3985 
3986 	memset(p, 0, sizeof(*p));
3987 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3988 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3989 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_HAND_SHAKE,
3990 			    (struct p_header80 *)p, sizeof(*p), 0);
3991 	mutex_unlock(&mdev->data.mutex);
3992 	return ok;
3993 }
3994 
3995 /*
3996  * return values:
3997  *   1 yes, we have a valid connection
3998  *   0 oops, did not work out, please try again
3999  *  -1 peer talks different language,
4000  *     no point in trying again, please go standalone.
4001  */
4002 static int drbd_do_handshake(struct drbd_conf *mdev)
4003 {
4004 	/* ASSERT current == mdev->receiver ... */
4005 	struct p_handshake *p = &mdev->data.rbuf.handshake;
4006 	const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
4007 	unsigned int length;
4008 	enum drbd_packets cmd;
4009 	int rv;
4010 
4011 	rv = drbd_send_handshake(mdev);
4012 	if (!rv)
4013 		return 0;
4014 
4015 	rv = drbd_recv_header(mdev, &cmd, &length);
4016 	if (!rv)
4017 		return 0;
4018 
4019 	if (cmd != P_HAND_SHAKE) {
4020 		dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
4021 		     cmdname(cmd), cmd);
4022 		return -1;
4023 	}
4024 
4025 	if (length != expect) {
4026 		dev_err(DEV, "expected HandShake length: %u, received: %u\n",
4027 		     expect, length);
4028 		return -1;
4029 	}
4030 
4031 	rv = drbd_recv(mdev, &p->head.payload, expect);
4032 
4033 	if (rv != expect) {
4034 		if (!signal_pending(current))
4035 			dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
4036 		return 0;
4037 	}
4038 
4039 	p->protocol_min = be32_to_cpu(p->protocol_min);
4040 	p->protocol_max = be32_to_cpu(p->protocol_max);
4041 	if (p->protocol_max == 0)
4042 		p->protocol_max = p->protocol_min;
4043 
4044 	if (PRO_VERSION_MAX < p->protocol_min ||
4045 	    PRO_VERSION_MIN > p->protocol_max)
4046 		goto incompat;
4047 
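	/* the version ranges overlap; both peers independently compute the
	 * same agreed version, e.g. local 86..96 and peer 90..100 agree on 96 */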
4048 	mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4049 
4050 	dev_info(DEV, "Handshake successful: "
4051 	     "Agreed network protocol version %d\n", mdev->agreed_pro_version);
4052 
4053 	return 1;
4054 
4055  incompat:
4056 	dev_err(DEV, "incompatible DRBD dialects: "
4057 	    "I support %d-%d, peer supports %d-%d\n",
4058 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
4059 	    p->protocol_min, p->protocol_max);
4060 	return -1;
4061 }
4062 
4063 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4064 static int drbd_do_auth(struct drbd_conf *mdev)
4065 {
4066 	dev_err(DEV, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4067 	dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4068 	return -1;
4069 }
4070 #else
4071 #define CHALLENGE_LEN 64
4072 
4073 /* Return value:
4074 	1 - auth succeeded,
4075 	0 - failed, try again (network error),
4076 	-1 - auth failed, don't try again.
4077 */
4078 
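/* Challenge-response authentication: we send our random challenge, receive
 * the peer's challenge, answer with HMAC(shared_secret, peer's challenge),
 * and finally compare the peer's response against
 * HMAC(shared_secret, my_challenge). */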
4079 static int drbd_do_auth(struct drbd_conf *mdev)
4080 {
4081 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4082 	struct scatterlist sg;
4083 	char *response = NULL;
4084 	char *right_response = NULL;
4085 	char *peers_ch = NULL;
4086 	unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4087 	unsigned int resp_size;
4088 	struct hash_desc desc;
4089 	enum drbd_packets cmd;
4090 	unsigned int length;
4091 	int rv;
4092 
4093 	desc.tfm = mdev->cram_hmac_tfm;
4094 	desc.flags = 0;
4095 
4096 	rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4097 				(u8 *)mdev->net_conf->shared_secret, key_len);
4098 	if (rv) {
4099 		dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4100 		rv = -1;
4101 		goto fail;
4102 	}
4103 
4104 	get_random_bytes(my_challenge, CHALLENGE_LEN);
4105 
4106 	rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4107 	if (!rv)
4108 		goto fail;
4109 
4110 	rv = drbd_recv_header(mdev, &cmd, &length);
4111 	if (!rv)
4112 		goto fail;
4113 
4114 	if (cmd != P_AUTH_CHALLENGE) {
4115 		dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4116 		    cmdname(cmd), cmd);
4117 		rv = 0;
4118 		goto fail;
4119 	}
4120 
4121 	if (length > CHALLENGE_LEN * 2) {
4122 		dev_err(DEV, "expected AuthChallenge payload too big.\n");
4123 		rv = -1;
4124 		goto fail;
4125 	}
4126 
4127 	peers_ch = kmalloc(length, GFP_NOIO);
4128 	if (peers_ch == NULL) {
4129 		dev_err(DEV, "kmalloc of peers_ch failed\n");
4130 		rv = -1;
4131 		goto fail;
4132 	}
4133 
4134 	rv = drbd_recv(mdev, peers_ch, length);
4135 
4136 	if (rv != length) {
4137 		if (!signal_pending(current))
4138 			dev_warn(DEV, "short read AuthChallenge: l=%u\n", rv);
4139 		rv = 0;
4140 		goto fail;
4141 	}
4142 
4143 	resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4144 	response = kmalloc(resp_size, GFP_NOIO);
4145 	if (response == NULL) {
4146 		dev_err(DEV, "kmalloc of response failed\n");
4147 		rv = -1;
4148 		goto fail;
4149 	}
4150 
4151 	sg_init_table(&sg, 1);
4152 	sg_set_buf(&sg, peers_ch, length);
4153 
4154 	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4155 	if (rv) {
4156 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4157 		rv = -1;
4158 		goto fail;
4159 	}
4160 
4161 	rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4162 	if (!rv)
4163 		goto fail;
4164 
4165 	rv = drbd_recv_header(mdev, &cmd, &length);
4166 	if (!rv)
4167 		goto fail;
4168 
4169 	if (cmd != P_AUTH_RESPONSE) {
4170 		dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4171 			cmdname(cmd), cmd);
4172 		rv = 0;
4173 		goto fail;
4174 	}
4175 
4176 	if (length != resp_size) {
4177 		dev_err(DEV, "expected AuthResponse payload of wrong size\n");
4178 		rv = 0;
4179 		goto fail;
4180 	}
4181 
4182 	rv = drbd_recv(mdev, response, resp_size);
4183 
4184 	if (rv != resp_size) {
4185 		if (!signal_pending(current))
4186 			dev_warn(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4187 		rv = 0;
4188 		goto fail;
4189 	}
4190 
4191 	right_response = kmalloc(resp_size, GFP_NOIO);
4192 	if (right_response == NULL) {
4193 		dev_err(DEV, "kmalloc of right_response failed\n");
4194 		rv = -1;
4195 		goto fail;
4196 	}
4197 
4198 	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4199 
4200 	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4201 	if (rv) {
4202 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4203 		rv = -1;
4204 		goto fail;
4205 	}
4206 
4207 	rv = !memcmp(response, right_response, resp_size);
4208 
4209 	if (rv)
4210 		dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4211 		     resp_size, mdev->net_conf->cram_hmac_alg);
4212 	else
4213 		rv = -1;
4214 
4215  fail:
4216 	kfree(peers_ch);
4217 	kfree(response);
4218 	kfree(right_response);
4219 
4220 	return rv;
4221 }
4222 #endif
4223 
4224 int drbdd_init(struct drbd_thread *thi)
4225 {
4226 	struct drbd_conf *mdev = thi->mdev;
4227 	unsigned int minor = mdev_to_minor(mdev);
4228 	int h;
4229 
4230 	sprintf(current->comm, "drbd%d_receiver", minor);
4231 
4232 	dev_info(DEV, "receiver (re)started\n");
4233 
4234 	do {
4235 		h = drbd_connect(mdev);
4236 		if (h == 0) {
4237 			drbd_disconnect(mdev);
4238 			schedule_timeout_interruptible(HZ);
4239 		}
4240 		if (h == -1) {
4241 			dev_warn(DEV, "Discarding network configuration.\n");
4242 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4243 		}
4244 	} while (h == 0);
4245 
4246 	if (h > 0) {
4247 		if (get_net_conf(mdev)) {
4248 			drbdd(mdev);
4249 			put_net_conf(mdev);
4250 		}
4251 	}
4252 
4253 	drbd_disconnect(mdev);
4254 
4255 	dev_info(DEV, "receiver terminated\n");
4256 	return 0;
4257 }
4258 
4259 /* ********* acknowledge sender ******** */
4260 
4261 static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
4262 {
4263 	struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4264 
4265 	int retcode = be32_to_cpu(p->retcode);
4266 
4267 	if (retcode >= SS_SUCCESS) {
4268 		set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4269 	} else {
4270 		set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4271 		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4272 		    drbd_set_st_err_str(retcode), retcode);
4273 	}
4274 	wake_up(&mdev->state_wait);
4275 
4276 	return true;
4277 }
4278 
4279 static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
4280 {
4281 	return drbd_send_ping_ack(mdev);
4283 }
4284 
4285 static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
4286 {
4287 	/* restore idle timeout */
4288 	mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4289 	if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4290 		wake_up(&mdev->misc_wait);
4291 
4292 	return true;
4293 }
4294 
4295 static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
4296 {
4297 	struct p_block_ack *p = (struct p_block_ack *)h;
4298 	sector_t sector = be64_to_cpu(p->sector);
4299 	int blksize = be32_to_cpu(p->blksize);
4300 
4301 	D_ASSERT(mdev->agreed_pro_version >= 89);
4302 
4303 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4304 
4305 	if (get_ldev(mdev)) {
4306 		drbd_rs_complete_io(mdev, sector);
4307 		drbd_set_in_sync(mdev, sector, blksize);
4308 		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4309 		mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4310 		put_ldev(mdev);
4311 	}
4312 	dec_rs_pending(mdev);
4313 	atomic_add(blksize >> 9, &mdev->rs_sect_in);
4314 
4315 	return true;
4316 }
4317 
4318 /* when we receive the ACK for a write request,
4319  * verify that we actually know about it */
4320 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4321 	u64 id, sector_t sector)
4322 {
4323 	struct hlist_head *slot = tl_hash_slot(mdev, sector);
4324 	struct hlist_node *n;
4325 	struct drbd_request *req;
4326 
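	/* the peer echoes the sender's struct drbd_request pointer as
	 * block_id, so the id can be compared against the request itself */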
4327 	hlist_for_each_entry(req, n, slot, collision) {
4328 		if ((unsigned long)req == (unsigned long)id) {
4329 			if (req->sector != sector) {
4330 				dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4331 				    "wrong sector (%llus versus %llus)\n", req,
4332 				    (unsigned long long)req->sector,
4333 				    (unsigned long long)sector);
4334 				break;
4335 			}
4336 			return req;
4337 		}
4338 	}
4339 	return NULL;
4340 }
4341 
4342 typedef struct drbd_request *(req_validator_fn)
4343 	(struct drbd_conf *mdev, u64 id, sector_t sector);
4344 
4345 static int validate_req_change_req_state(struct drbd_conf *mdev,
4346 	u64 id, sector_t sector, req_validator_fn validator,
4347 	const char *func, enum drbd_req_event what)
4348 {
4349 	struct drbd_request *req;
4350 	struct bio_and_error m;
4351 
4352 	spin_lock_irq(&mdev->req_lock);
4353 	req = validator(mdev, id, sector);
4354 	if (unlikely(!req)) {
4355 		spin_unlock_irq(&mdev->req_lock);
4356 
4357 		dev_err(DEV, "%s: failed to find req %p, sector %llus\n", func,
4358 			(void *)(unsigned long)id, (unsigned long long)sector);
4359 		return false;
4360 	}
4361 	__req_mod(req, what, &m);
4362 	spin_unlock_irq(&mdev->req_lock);
4363 
4364 	if (m.bio)
4365 		complete_master_bio(mdev, &m);
4366 	return true;
4367 }
4368 
4369 static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
4370 {
4371 	struct p_block_ack *p = (struct p_block_ack *)h;
4372 	sector_t sector = be64_to_cpu(p->sector);
4373 	int blksize = be32_to_cpu(p->blksize);
4374 	enum drbd_req_event what;
4375 
4376 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4377 
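	/* resync and verify requests carry the magic syncer block_id and are
	 * not tracked in the transfer log, so handle them here and return */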
4378 	if (is_syncer_block_id(p->block_id)) {
4379 		drbd_set_in_sync(mdev, sector, blksize);
4380 		dec_rs_pending(mdev);
4381 		return true;
4382 	}
4383 	switch (be16_to_cpu(h->command)) {
4384 	case P_RS_WRITE_ACK:
4385 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4386 		what = write_acked_by_peer_and_sis;
4387 		break;
4388 	case P_WRITE_ACK:
4389 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4390 		what = write_acked_by_peer;
4391 		break;
4392 	case P_RECV_ACK:
4393 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4394 		what = recv_acked_by_peer;
4395 		break;
4396 	case P_DISCARD_ACK:
4397 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4398 		what = conflict_discarded_by_peer;
4399 		break;
4400 	default:
4401 		D_ASSERT(0);
4402 		return false;
4403 	}
4404 
4405 	return validate_req_change_req_state(mdev, p->block_id, sector,
4406 		_ack_id_to_req, __func__ , what);
4407 }
4408 
4409 static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
4410 {
4411 	struct p_block_ack *p = (struct p_block_ack *)h;
4412 	sector_t sector = be64_to_cpu(p->sector);
4413 	int size = be32_to_cpu(p->blksize);
4414 	struct drbd_request *req;
4415 	struct bio_and_error m;
4416 
4417 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4418 
4419 	if (is_syncer_block_id(p->block_id)) {
4420 		dec_rs_pending(mdev);
4421 		drbd_rs_failed_io(mdev, sector, size);
4422 		return true;
4423 	}
4424 
4425 	spin_lock_irq(&mdev->req_lock);
4426 	req = _ack_id_to_req(mdev, p->block_id, sector);
4427 	if (!req) {
4428 		spin_unlock_irq(&mdev->req_lock);
4429 		if (mdev->net_conf->wire_protocol == DRBD_PROT_A ||
4430 		    mdev->net_conf->wire_protocol == DRBD_PROT_B) {
4431 			/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4432 			   The master bio might already be completed, therefore the
4433 			   request is no longer in the collision hash.
4434 			   => Do not try to validate block_id as request. */
4435 			/* In Protocol B we might already have got a P_RECV_ACK
4436 			   but then get a P_NEG_ACK afterwards. */
4437 			drbd_set_out_of_sync(mdev, sector, size);
4438 			return true;
4439 		} else {
4440 			dev_err(DEV, "%s: failed to find req %p, sector %llus\n", __func__,
4441 				(void *)(unsigned long)p->block_id, (unsigned long long)sector);
4442 			return false;
4443 		}
4444 	}
4445 	__req_mod(req, neg_acked, &m);
4446 	spin_unlock_irq(&mdev->req_lock);
4447 
4448 	if (m.bio)
4449 		complete_master_bio(mdev, &m);
4450 	return true;
4451 }
4452 
4453 static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
4454 {
4455 	struct p_block_ack *p = (struct p_block_ack *)h;
4456 	sector_t sector = be64_to_cpu(p->sector);
4457 
4458 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4459 	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4460 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
4461 
4462 	return validate_req_change_req_state(mdev, p->block_id, sector,
4463 		_ar_id_to_req, __func__ , neg_acked);
4464 }
4465 
4466 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
4467 {
4468 	sector_t sector;
4469 	int size;
4470 	struct p_block_ack *p = (struct p_block_ack *)h;
4471 
4472 	sector = be64_to_cpu(p->sector);
4473 	size = be32_to_cpu(p->blksize);
4474 
4475 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4476 
4477 	dec_rs_pending(mdev);
4478 
4479 	if (get_ldev_if_state(mdev, D_FAILED)) {
4480 		drbd_rs_complete_io(mdev, sector);
4481 		switch (be16_to_cpu(h->command)) {
4482 		case P_NEG_RS_DREPLY:
4483 			drbd_rs_failed_io(mdev, sector, size);
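			/* fall through */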
4484 		case P_RS_CANCEL:
4485 			break;
4486 		default:
4487 			D_ASSERT(0);
4488 			put_ldev(mdev);
4489 			return false;
4490 		}
4491 		put_ldev(mdev);
4492 	}
4493 
4494 	return true;
4495 }
4496 
4497 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
4498 {
4499 	struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4500 
4501 	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4502 
4503 	if (mdev->state.conn == C_AHEAD &&
4504 	    atomic_read(&mdev->ap_in_flight) == 0 &&
4505 	    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
4506 		mdev->start_resync_timer.expires = jiffies + HZ;
4507 		add_timer(&mdev->start_resync_timer);
4508 	}
4509 
4510 	return true;
4511 }
4512 
4513 static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
4514 {
4515 	struct p_block_ack *p = (struct p_block_ack *)h;
4516 	struct drbd_work *w;
4517 	sector_t sector;
4518 	int size;
4519 
4520 	sector = be64_to_cpu(p->sector);
4521 	size = be32_to_cpu(p->blksize);
4522 
4523 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4524 
4525 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4526 		drbd_ov_oos_found(mdev, sector, size);
4527 	else
4528 		ov_oos_print(mdev);
4529 
4530 	if (!get_ldev(mdev))
4531 		return true;
4532 
4533 	drbd_rs_complete_io(mdev, sector);
4534 	dec_rs_pending(mdev);
4535 
4536 	--mdev->ov_left;
4537 
4538 	/* let's advance progress step marks only for every other megabyte */
4539 	if ((mdev->ov_left & 0x200) == 0x200)
4540 		drbd_advance_rs_marks(mdev, mdev->ov_left);
4541 
4542 	if (mdev->ov_left == 0) {
4543 		w = kmalloc(sizeof(*w), GFP_NOIO);
4544 		if (w) {
4545 			w->cb = w_ov_finished;
4546 			drbd_queue_work_front(&mdev->data.work, w);
4547 		} else {
4548 			dev_err(DEV, "kmalloc(w) failed.");
4549 			ov_oos_print(mdev);
4550 			drbd_resync_finished(mdev);
4551 		}
4552 	}
4553 	put_ldev(mdev);
4554 	return true;
4555 }
4556 
4557 static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
4558 {
4559 	return true;
4560 }
4561 
4562 struct asender_cmd {
4563 	size_t pkt_size;
4564 	int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
4565 };
4566 
4567 static struct asender_cmd *get_asender_cmd(int cmd)
4568 {
4569 	static struct asender_cmd asender_tbl[] = {
4570 		/* anything missing from this table is in
4571 		 * the drbd_cmd_handler (drbd_default_handler) table,
4572 		 * see the beginning of drbdd() */
4573 	[P_PING]	    = { sizeof(struct p_header80), got_Ping },
4574 	[P_PING_ACK]	    = { sizeof(struct p_header80), got_PingAck },
4575 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4576 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4577 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4578 	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4579 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
4580 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
4581 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4582 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
4583 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
4584 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4585 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4586 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4587 	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply},
4588 	[P_MAX_CMD]	    = { 0, NULL },
4589 	};
4590 	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4591 		return NULL;
4592 	return &asender_tbl[cmd];
4593 }
4594 
4595 int drbd_asender(struct drbd_thread *thi)
4596 {
4597 	struct drbd_conf *mdev = thi->mdev;
4598 	struct p_header80 *h = &mdev->meta.rbuf.header.h80;
4599 	struct asender_cmd *cmd = NULL;
4600 
4601 	int rv, len;
4602 	void *buf    = h;
4603 	int received = 0;
4604 	int expect   = sizeof(struct p_header80);
4605 	int empty;
4606 	int ping_timeout_active = 0;
4607 
4608 	sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4609 
4610 	current->policy = SCHED_RR;  /* Make this a realtime task! */
4611 	current->rt_priority = 2;    /* more important than all other tasks */
4612 
4613 	while (get_t_state(thi) == Running) {
4614 		drbd_thread_current_set_cpu(mdev);
4615 		if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4616 			ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4617 			mdev->meta.socket->sk->sk_rcvtimeo =
4618 				mdev->net_conf->ping_timeo*HZ/10;
4619 			ping_timeout_active = 1;
4620 		}
4621 
4622 		/* conditionally cork;
4623 		 * it may hurt latency if we cork without much to send */
4624 		if (!mdev->net_conf->no_cork &&
4625 			3 < atomic_read(&mdev->unacked_cnt))
4626 			drbd_tcp_cork(mdev->meta.socket);
4627 		while (1) {
4628 			clear_bit(SIGNAL_ASENDER, &mdev->flags);
4629 			flush_signals(current);
4630 			if (!drbd_process_done_ee(mdev))
4631 				goto reconnect;
4632 			/* to avoid race with newly queued ACKs */
4633 			set_bit(SIGNAL_ASENDER, &mdev->flags);
4634 			spin_lock_irq(&mdev->req_lock);
4635 			empty = list_empty(&mdev->done_ee);
4636 			spin_unlock_irq(&mdev->req_lock);
4637 			/* new ack may have been queued right here,
4638 			 * but then there is also a signal pending,
4639 			 * and we start over... */
4640 			if (empty)
4641 				break;
4642 		}
4643 		/* but unconditionally uncork unless disabled */
4644 		if (!mdev->net_conf->no_cork)
4645 			drbd_tcp_uncork(mdev->meta.socket);
4646 
4647 		/* short circuit, recv_msg would return EINTR anyways. */
4648 		if (signal_pending(current))
4649 			continue;
4650 
4651 		rv = drbd_recv_short(mdev, mdev->meta.socket,
4652 				     buf, expect-received, 0);
4653 		clear_bit(SIGNAL_ASENDER, &mdev->flags);
4654 
4655 		flush_signals(current);
4656 
4657 		/* Note:
4658 		 * -EINTR	 (on meta) we got a signal
4659 		 * -EAGAIN	 (on meta) rcvtimeo expired
4660 		 * -ECONNRESET	 other side closed the connection
4661 		 * -ERESTARTSYS  (on data) we got a signal
4662 		 * rv <  0	 other than above: unexpected error!
4663 		 * rv == expected: full header or command
4664 		 * rv <  expected: "woken" by signal during receive
4665 		 * rv == 0	 : "connection shut down by peer"
4666 		 */
4667 		if (likely(rv > 0)) {
4668 			received += rv;
4669 			buf	 += rv;
4670 		} else if (rv == 0) {
4671 			dev_err(DEV, "meta connection shut down by peer.\n");
4672 			goto reconnect;
4673 		} else if (rv == -EAGAIN) {
4674 			/* If the data socket received something meanwhile,
4675 			 * that is good enough: peer is still alive. */
4676 			if (time_after(mdev->last_received,
4677 				jiffies - mdev->meta.socket->sk->sk_rcvtimeo))
4678 				continue;
4679 			if (ping_timeout_active) {
4680 				dev_err(DEV, "PingAck did not arrive in time.\n");
4681 				goto reconnect;
4682 			}
4683 			set_bit(SEND_PING, &mdev->flags);
4684 			continue;
4685 		} else if (rv == -EINTR) {
4686 			continue;
4687 		} else {
4688 			dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4689 			goto reconnect;
4690 		}
4691 
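		/* reassemble: first collect a full p_header80, look up the
		 * command, then keep receiving until the full packet arrived */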
4692 		if (received == expect && cmd == NULL) {
4693 			if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4694 				dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
4695 				    be32_to_cpu(h->magic),
4696 				    be16_to_cpu(h->command),
4697 				    be16_to_cpu(h->length));
4698 				goto reconnect;
4699 			}
4700 			cmd = get_asender_cmd(be16_to_cpu(h->command));
4701 			len = be16_to_cpu(h->length);
4702 			if (unlikely(cmd == NULL)) {
4703 				dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
4704 				    be32_to_cpu(h->magic),
4705 				    be16_to_cpu(h->command),
4706 				    be16_to_cpu(h->length));
4707 				goto disconnect;
4708 			}
4709 			expect = cmd->pkt_size;
4710 			ERR_IF(len != expect-sizeof(struct p_header80))
4711 				goto reconnect;
4712 		}
4713 		if (received == expect) {
4714 			mdev->last_received = jiffies;
4715 			D_ASSERT(cmd != NULL);
4716 			if (!cmd->process(mdev, h))
4717 				goto reconnect;
4718 
4719 			/* the idle_timeout (ping-int)
4720 			 * has been restored in got_PingAck() */
4721 			if (cmd == get_asender_cmd(P_PING_ACK))
4722 				ping_timeout_active = 0;
4723 
4724 			buf	 = h;
4725 			received = 0;
4726 			expect	 = sizeof(struct p_header80);
4727 			cmd	 = NULL;
4728 		}
4729 	}
4730 
4731 	if (0) {
4732 reconnect:
4733 		drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4734 		drbd_md_sync(mdev);
4735 	}
4736 	if (0) {
4737 disconnect:
4738 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4739 		drbd_md_sync(mdev);
4740 	}
4741 	clear_bit(SIGNAL_ASENDER, &mdev->flags);
4742 
4743 	D_ASSERT(mdev->state.conn < C_CONNECTED);
4744 	dev_info(DEV, "asender terminated\n");
4745 
4746 	return 0;
4747 }
4748