1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/smp_lock.h>
40 #include <linux/pkt_sched.h>
41 #define __KERNEL_SYSCALLS__
42 #include <linux/unistd.h>
43 #include <linux/vmalloc.h>
44 #include <linux/random.h>
45 #include <linux/mm.h>
46 #include <linux/string.h>
47 #include <linux/scatterlist.h>
48 #include "drbd_int.h"
49 #include "drbd_req.h"
50 
51 #include "drbd_vli.h"
52 
53 struct flush_work {
54 	struct drbd_work w;
55 	struct drbd_epoch *epoch;
56 };
57 
58 enum finish_epoch {
59 	FE_STILL_LIVE,
60 	FE_DESTROYED,
61 	FE_RECYCLED,
62 };
63 
64 static int drbd_do_handshake(struct drbd_conf *mdev);
65 static int drbd_do_auth(struct drbd_conf *mdev);
66 
67 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
68 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
69 
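/* Return the epoch preceding @epoch in mdev's epoch list, or NULL if there
 * is none (the list contains only @epoch, or its predecessor is the current
 * epoch).  Takes the epoch_lock for the list access. */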
70 static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
71 {
72 	struct drbd_epoch *prev;
73 	spin_lock(&mdev->epoch_lock);
74 	prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
75 	if (prev == epoch || prev == mdev->current_epoch)
76 		prev = NULL;
77 	spin_unlock(&mdev->epoch_lock);
78 	return prev;
79 }
80 
81 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
82 
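/* Grab a page from the preallocated drbd_pp_pool if one is vacant; otherwise
 * fall back to alloc_page(GFP_TRY).  On success the per-device pp_in_use
 * counter is increased.  May return NULL. */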
83 static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev)
84 {
85 	struct page *page = NULL;
86 
87 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
88 	 * So what. It saves a spin_lock. */
89 	if (drbd_pp_vacant > 0) {
90 		spin_lock(&drbd_pp_lock);
91 		page = drbd_pp_pool;
92 		if (page) {
93 			drbd_pp_pool = (struct page *)page_private(page);
94 			set_page_private(page, 0); /* just to be polite */
95 			drbd_pp_vacant--;
96 		}
97 		spin_unlock(&drbd_pp_lock);
98 	}
99 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
100 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
101 	 * which in turn might block on the other node at this very place.  */
102 	if (!page)
103 		page = alloc_page(GFP_TRY);
104 	if (page)
105 		atomic_inc(&mdev->pp_in_use);
106 	return page;
107 }
108 
109 /* kick lower level device, if we have more than (arbitrary number)
110  * reference counts on it, which typically are locally submitted io
111  * requests.  don't use unacked_cnt, so we speed up proto A and B, too. */
112 static void maybe_kick_lo(struct drbd_conf *mdev)
113 {
114 	if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
115 		drbd_kick_lo(mdev);
116 }
117 
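/* Move finished entries from mdev->net_ee over to @to_be_freed.  An entry is
 * finished once the network layer no longer holds references on its bio
 * pages.  Called with req_lock held. */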
118 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
119 {
120 	struct drbd_epoch_entry *e;
121 	struct list_head *le, *tle;
122 
	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first one that has not
	   finished, we can stop examining the list... */
127 
128 	list_for_each_safe(le, tle, &mdev->net_ee) {
129 		e = list_entry(le, struct drbd_epoch_entry, w.list);
130 		if (drbd_bio_has_active_page(e->private_bio))
131 			break;
132 		list_move(le, to_be_freed);
133 	}
134 }
135 
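/* Prod the lower level device if needed and free all net_ee entries whose
 * pages the network layer has already released. */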
136 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
137 {
138 	LIST_HEAD(reclaimed);
139 	struct drbd_epoch_entry *e, *t;
140 
141 	maybe_kick_lo(mdev);
142 	spin_lock_irq(&mdev->req_lock);
143 	reclaim_net_ee(mdev, &reclaimed);
144 	spin_unlock_irq(&mdev->req_lock);
145 
146 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
147 		drbd_free_ee(mdev, e);
148 }
149 
150 /**
151  * drbd_pp_alloc() - Returns a page, fails only if a signal comes in
152  * @mdev:	DRBD device.
153  * @retry:	whether or not to retry allocation forever (or until signalled)
154  *
155  * Tries to allocate a page, first from our own page pool, then from the
156  * kernel, unless this allocation would exceed the max_buffers setting.
157  * If @retry is non-zero, retry until DRBD frees a page somewhere else.
158  */
159 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
160 {
161 	struct page *page = NULL;
162 	DEFINE_WAIT(wait);
163 
164 	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
165 		page = drbd_pp_first_page_or_try_alloc(mdev);
166 		if (page)
167 			return page;
168 	}
169 
170 	for (;;) {
171 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
172 
173 		drbd_kick_lo_and_reclaim_net(mdev);
174 
175 		if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
176 			page = drbd_pp_first_page_or_try_alloc(mdev);
177 			if (page)
178 				break;
179 		}
180 
181 		if (!retry)
182 			break;
183 
184 		if (signal_pending(current)) {
185 			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
186 			break;
187 		}
188 
189 		schedule();
190 	}
191 	finish_wait(&drbd_pp_wait, &wait);
192 
193 	return page;
194 }
195 
/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 * It is also used from inside another spin_lock_irq(&mdev->req_lock)
 * critical section. */
198 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
199 {
200 	int free_it;
201 
202 	spin_lock(&drbd_pp_lock);
203 	if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
204 		free_it = 1;
205 	} else {
206 		set_page_private(page, (unsigned long)drbd_pp_pool);
207 		drbd_pp_pool = page;
208 		drbd_pp_vacant++;
209 		free_it = 0;
210 	}
211 	spin_unlock(&drbd_pp_lock);
212 
213 	atomic_dec(&mdev->pp_in_use);
214 
215 	if (free_it)
216 		__free_page(page);
217 
218 	wake_up(&drbd_pp_wait);
219 }
220 
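/* Return all pages of @bio to the drbd page pool, or free them outright once
 * the pool holds enough vacant pages; adjusts pp_in_use and wakes up waiters
 * in drbd_pp_alloc(). */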
221 static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
222 {
223 	struct page *p_to_be_freed = NULL;
224 	struct page *page;
225 	struct bio_vec *bvec;
226 	int i;
227 
228 	spin_lock(&drbd_pp_lock);
229 	__bio_for_each_segment(bvec, bio, i, 0) {
230 		if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
231 			set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed);
232 			p_to_be_freed = bvec->bv_page;
233 		} else {
234 			set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool);
235 			drbd_pp_pool = bvec->bv_page;
236 			drbd_pp_vacant++;
237 		}
238 	}
239 	spin_unlock(&drbd_pp_lock);
240 	atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);
241 
242 	while (p_to_be_freed) {
243 		page = p_to_be_freed;
244 		p_to_be_freed = (struct page *)page_private(page);
245 		set_page_private(page, 0); /* just to be polite */
246 		put_page(page);
247 	}
248 
249 	wake_up(&drbd_pp_wait);
250 }
251 
252 /*
253 You need to hold the req_lock:
254  _drbd_wait_ee_list_empty()
255 
256 You must not have the req_lock:
257  drbd_free_ee()
258  drbd_alloc_ee()
259  drbd_init_ee()
260  drbd_release_ee()
261  drbd_ee_fix_bhs()
262  drbd_process_done_ee()
263  drbd_clear_done_ee()
264  drbd_wait_ee_list_empty()
265 */
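/* A minimal usage sketch for the locking rules above (illustration only,
 * not code that is called from here verbatim):
 *
 *	spin_lock_irq(&mdev->req_lock);
 *	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
 *	spin_unlock_irq(&mdev->req_lock);
 *
 * or, equivalently, drbd_wait_ee_list_empty(mdev, &mdev->active_ee), which
 * takes and releases the req_lock itself. */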
266 
267 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
268 				     u64 id,
269 				     sector_t sector,
270 				     unsigned int data_size,
271 				     gfp_t gfp_mask) __must_hold(local)
272 {
273 	struct request_queue *q;
274 	struct drbd_epoch_entry *e;
275 	struct page *page;
276 	struct bio *bio;
277 	unsigned int ds;
278 
279 	if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
280 		return NULL;
281 
282 	e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
283 	if (!e) {
284 		if (!(gfp_mask & __GFP_NOWARN))
285 			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
286 		return NULL;
287 	}
288 
289 	bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE));
290 	if (!bio) {
291 		if (!(gfp_mask & __GFP_NOWARN))
292 			dev_err(DEV, "alloc_ee: Allocation of a bio failed\n");
293 		goto fail1;
294 	}
295 
296 	bio->bi_bdev = mdev->ldev->backing_bdev;
297 	bio->bi_sector = sector;
298 
299 	ds = data_size;
300 	while (ds) {
301 		page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT));
302 		if (!page) {
303 			if (!(gfp_mask & __GFP_NOWARN))
304 				dev_err(DEV, "alloc_ee: Allocation of a page failed\n");
305 			goto fail2;
306 		}
307 		if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
308 			drbd_pp_free(mdev, page);
309 			dev_err(DEV, "alloc_ee: bio_add_page(s=%llu,"
310 			    "data_size=%u,ds=%u) failed\n",
311 			    (unsigned long long)sector, data_size, ds);
312 
313 			q = bdev_get_queue(bio->bi_bdev);
314 			if (q->merge_bvec_fn) {
315 				struct bvec_merge_data bvm = {
316 					.bi_bdev = bio->bi_bdev,
317 					.bi_sector = bio->bi_sector,
318 					.bi_size = bio->bi_size,
319 					.bi_rw = bio->bi_rw,
320 				};
321 				int l = q->merge_bvec_fn(q, &bvm,
322 						&bio->bi_io_vec[bio->bi_vcnt]);
323 				dev_err(DEV, "merge_bvec_fn() = %d\n", l);
324 			}
325 
326 			/* dump more of the bio. */
327 			dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs);
328 			dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt);
329 			dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size);
330 			dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments);
331 
			goto fail2;
334 		}
335 		ds -= min_t(int, ds, PAGE_SIZE);
336 	}
337 
338 	D_ASSERT(data_size == bio->bi_size);
339 
340 	bio->bi_private = e;
341 	e->mdev = mdev;
342 	e->sector = sector;
343 	e->size = bio->bi_size;
344 
345 	e->private_bio = bio;
346 	e->block_id = id;
347 	INIT_HLIST_NODE(&e->colision);
348 	e->epoch = NULL;
349 	e->flags = 0;
350 
351 	return e;
352 
353  fail2:
354 	drbd_pp_free_bio_pages(mdev, bio);
355 	bio_put(bio);
356  fail1:
357 	mempool_free(e, drbd_ee_mempool);
358 
359 	return NULL;
360 }
361 
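/* Release an epoch entry: give its bio pages back to the page pool, drop the
 * bio reference, and return the entry itself to the ee mempool. */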
362 void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
363 {
364 	struct bio *bio = e->private_bio;
365 	drbd_pp_free_bio_pages(mdev, bio);
366 	bio_put(bio);
367 	D_ASSERT(hlist_unhashed(&e->colision));
368 	mempool_free(e, drbd_ee_mempool);
369 }
370 
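/* Detach all entries from @list under the req_lock, then free them outside
 * of it.  Returns the number of entries freed. */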
371 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
372 {
373 	LIST_HEAD(work_list);
374 	struct drbd_epoch_entry *e, *t;
375 	int count = 0;
376 
377 	spin_lock_irq(&mdev->req_lock);
378 	list_splice_init(list, &work_list);
379 	spin_unlock_irq(&mdev->req_lock);
380 
381 	list_for_each_entry_safe(e, t, &work_list, w.list) {
382 		drbd_free_ee(mdev, e);
383 		count++;
384 	}
385 	return count;
386 }
387 
388 
389 /*
390  * This function is called from _asender only_
391  * but see also comments in _req_mod(,barrier_acked)
392  * and receive_Barrier.
393  *
394  * Move entries from net_ee to done_ee, if ready.
395  * Grab done_ee, call all callbacks, free the entries.
396  * The callbacks typically send out ACKs.
397  */
398 static int drbd_process_done_ee(struct drbd_conf *mdev)
399 {
400 	LIST_HEAD(work_list);
401 	LIST_HEAD(reclaimed);
402 	struct drbd_epoch_entry *e, *t;
403 	int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
404 
405 	spin_lock_irq(&mdev->req_lock);
406 	reclaim_net_ee(mdev, &reclaimed);
407 	list_splice_init(&mdev->done_ee, &work_list);
408 	spin_unlock_irq(&mdev->req_lock);
409 
410 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
411 		drbd_free_ee(mdev, e);
412 
413 	/* possible callbacks here:
	 * e_end_block, e_end_resync_block, and e_send_discard_ack.
415 	 * all ignore the last argument.
416 	 */
417 	list_for_each_entry_safe(e, t, &work_list, w.list) {
418 		/* list_del not necessary, next/prev members not touched */
419 		ok = e->w.cb(mdev, &e->w, !ok) && ok;
420 		drbd_free_ee(mdev, e);
421 	}
422 	wake_up(&mdev->ee_wait);
423 
424 	return ok;
425 }
426 
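/* Wait until @head becomes empty.  Caller holds the req_lock; it is dropped
 * around the schedule() and re-acquired before returning. */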
427 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
428 {
429 	DEFINE_WAIT(wait);
430 
431 	/* avoids spin_lock/unlock
432 	 * and calling prepare_to_wait in the fast path */
433 	while (!list_empty(head)) {
434 		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
435 		spin_unlock_irq(&mdev->req_lock);
436 		drbd_kick_lo(mdev);
437 		schedule();
438 		finish_wait(&mdev->ee_wait, &wait);
439 		spin_lock_irq(&mdev->req_lock);
440 	}
441 }
442 
443 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
444 {
445 	spin_lock_irq(&mdev->req_lock);
446 	_drbd_wait_ee_list_empty(mdev, head);
447 	spin_unlock_irq(&mdev->req_lock);
448 }
449 
/* see also kernel_accept(), which is only present since 2.6.18;
 * we also want to log exactly which part of it failed */
452 static int drbd_accept(struct drbd_conf *mdev, const char **what,
453 		struct socket *sock, struct socket **newsock)
454 {
455 	struct sock *sk = sock->sk;
456 	int err = 0;
457 
458 	*what = "listen";
459 	err = sock->ops->listen(sock, 5);
460 	if (err < 0)
461 		goto out;
462 
463 	*what = "sock_create_lite";
464 	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
465 			       newsock);
466 	if (err < 0)
467 		goto out;
468 
469 	*what = "accept";
470 	err = sock->ops->accept(sock, *newsock, 0);
471 	if (err < 0) {
472 		sock_release(*newsock);
473 		*newsock = NULL;
474 		goto out;
475 	}
476 	(*newsock)->ops  = sock->ops;
477 
478 out:
479 	return err;
480 }
481 
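/* One sock_recvmsg() call on @sock into @buf; returns its result.  Unlike
 * drbd_recv(), a short read does not force the connection into
 * C_BROKEN_PIPE. */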
482 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
483 		    void *buf, size_t size, int flags)
484 {
485 	mm_segment_t oldfs;
486 	struct kvec iov = {
487 		.iov_base = buf,
488 		.iov_len = size,
489 	};
490 	struct msghdr msg = {
491 		.msg_iovlen = 1,
492 		.msg_iov = (struct iovec *)&iov,
493 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
494 	};
495 	int rv;
496 
497 	oldfs = get_fs();
498 	set_fs(KERNEL_DS);
499 	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
500 	set_fs(oldfs);
501 
502 	return rv;
503 }
504 
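/* Receive exactly @size bytes from the data socket (MSG_WAITALL).  Anything
 * short of that (error, signal, peer close) forces the connection into
 * C_BROKEN_PIPE.  Returns what sock_recvmsg() returned. */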
505 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
506 {
507 	mm_segment_t oldfs;
508 	struct kvec iov = {
509 		.iov_base = buf,
510 		.iov_len = size,
511 	};
512 	struct msghdr msg = {
513 		.msg_iovlen = 1,
514 		.msg_iov = (struct iovec *)&iov,
515 		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
516 	};
517 	int rv;
518 
519 	oldfs = get_fs();
520 	set_fs(KERNEL_DS);
521 
522 	for (;;) {
523 		rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
524 		if (rv == size)
525 			break;
526 
527 		/* Note:
528 		 * ECONNRESET	other side closed the connection
529 		 * ERESTARTSYS	(on  sock) we got a signal
530 		 */
531 
532 		if (rv < 0) {
533 			if (rv == -ECONNRESET)
534 				dev_info(DEV, "sock was reset by peer\n");
535 			else if (rv != -ERESTARTSYS)
536 				dev_err(DEV, "sock_recvmsg returned %d\n", rv);
537 			break;
538 		} else if (rv == 0) {
539 			dev_info(DEV, "sock was shut down by peer\n");
540 			break;
541 		} else	{
542 			/* signal came in, or peer/link went down,
543 			 * after we read a partial message
544 			 */
545 			/* D_ASSERT(signal_pending(current)); */
546 			break;
547 		}
548 	};
549 
550 	set_fs(oldfs);
551 
552 	if (rv != size)
553 		drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
554 
555 	return rv;
556 }
557 
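/* Try to establish one outgoing connection to the peer, bound to our
 * configured source address.  Returns the connected socket, or NULL if the
 * attempt failed; only "hard" errors force C_DISCONNECTING. */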
558 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
559 {
560 	const char *what;
561 	struct socket *sock;
562 	struct sockaddr_in6 src_in6;
563 	int err;
564 	int disconnect_on_error = 1;
565 
566 	if (!get_net_conf(mdev))
567 		return NULL;
568 
569 	what = "sock_create_kern";
570 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
571 		SOCK_STREAM, IPPROTO_TCP, &sock);
572 	if (err < 0) {
573 		sock = NULL;
574 		goto out;
575 	}
576 
577 	sock->sk->sk_rcvtimeo =
578 	sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
579 
580        /* explicitly bind to the configured IP as source IP
581 	*  for the outgoing connections.
582 	*  This is needed for multihomed hosts and to be
583 	*  able to use lo: interfaces for drbd.
584 	* Make sure to use 0 as port number, so linux selects
585 	*  a free one dynamically.
586 	*/
587 	memcpy(&src_in6, mdev->net_conf->my_addr,
588 	       min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
589 	if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
590 		src_in6.sin6_port = 0;
591 	else
592 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
593 
594 	what = "bind before connect";
595 	err = sock->ops->bind(sock,
596 			      (struct sockaddr *) &src_in6,
597 			      mdev->net_conf->my_addr_len);
598 	if (err < 0)
599 		goto out;
600 
601 	/* connect may fail, peer not yet available.
602 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
603 	disconnect_on_error = 0;
604 	what = "connect";
605 	err = sock->ops->connect(sock,
606 				 (struct sockaddr *)mdev->net_conf->peer_addr,
607 				 mdev->net_conf->peer_addr_len, 0);
608 
609 out:
610 	if (err < 0) {
611 		if (sock) {
612 			sock_release(sock);
613 			sock = NULL;
614 		}
615 		switch (-err) {
616 			/* timeout, busy, signal pending */
617 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
618 		case EINTR: case ERESTARTSYS:
619 			/* peer not (yet) available, network problem */
620 		case ECONNREFUSED: case ENETUNREACH:
621 		case EHOSTDOWN:    case EHOSTUNREACH:
622 			disconnect_on_error = 0;
623 			break;
624 		default:
625 			dev_err(DEV, "%s failed, err = %d\n", what, err);
626 		}
627 		if (disconnect_on_error)
628 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
629 	}
630 	put_net_conf(mdev);
631 	return sock;
632 }
633 
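/* Bind and listen on our configured address and wait (with a jittered
 * timeout) for the peer to connect to us.  Returns the accepted socket
 * or NULL. */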
634 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
635 {
636 	int timeo, err;
637 	struct socket *s_estab = NULL, *s_listen;
638 	const char *what;
639 
640 	if (!get_net_conf(mdev))
641 		return NULL;
642 
643 	what = "sock_create_kern";
644 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
645 		SOCK_STREAM, IPPROTO_TCP, &s_listen);
646 	if (err) {
647 		s_listen = NULL;
648 		goto out;
649 	}
650 
651 	timeo = mdev->net_conf->try_connect_int * HZ;
652 	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
653 
654 	s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
655 	s_listen->sk->sk_rcvtimeo = timeo;
656 	s_listen->sk->sk_sndtimeo = timeo;
657 
658 	what = "bind before listen";
659 	err = s_listen->ops->bind(s_listen,
660 			      (struct sockaddr *) mdev->net_conf->my_addr,
661 			      mdev->net_conf->my_addr_len);
662 	if (err < 0)
663 		goto out;
664 
665 	err = drbd_accept(mdev, &what, s_listen, &s_estab);
666 
667 out:
668 	if (s_listen)
669 		sock_release(s_listen);
670 	if (err < 0) {
671 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
672 			dev_err(DEV, "%s failed, err = %d\n", what, err);
673 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
674 		}
675 	}
676 	put_net_conf(mdev);
677 
678 	return s_estab;
679 }
680 
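/* The "first packet" sent over a freshly connected socket tells the peer
 * whether this connection is meant to become the data socket
 * (P_HAND_SHAKE_S) or the meta-data socket (P_HAND_SHAKE_M). */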
681 static int drbd_send_fp(struct drbd_conf *mdev,
682 	struct socket *sock, enum drbd_packets cmd)
683 {
684 	struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
685 
686 	return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
687 }
688 
689 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
690 {
691 	struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
692 	int rr;
693 
694 	rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
695 
696 	if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
697 		return be16_to_cpu(h->command);
698 
699 	return 0xffff;
700 }
701 
702 /**
703  * drbd_socket_okay() - Free the socket if its connection is not okay
704  * @mdev:	DRBD device.
705  * @sock:	pointer to the pointer to the socket.
706  */
707 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
708 {
709 	int rr;
710 	char tb[4];
711 
712 	if (!*sock)
713 		return FALSE;
714 
715 	rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
716 
717 	if (rr > 0 || rr == -EAGAIN) {
718 		return TRUE;
719 	} else {
720 		sock_release(*sock);
721 		*sock = NULL;
722 		return FALSE;
723 	}
724 }
725 
726 /*
727  * return values:
728  *   1 yes, we have a valid connection
729  *   0 oops, did not work out, please try again
730  *  -1 peer talks different language,
731  *     no point in trying again, please go standalone.
732  *  -2 We do not have a network config...
733  */
734 static int drbd_connect(struct drbd_conf *mdev)
735 {
736 	struct socket *s, *sock, *msock;
737 	int try, h, ok;
738 
739 	D_ASSERT(!mdev->data.socket);
740 
741 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
742 		dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
743 
744 	if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
745 		return -2;
746 
747 	clear_bit(DISCARD_CONCURRENT, &mdev->flags);
748 
749 	sock  = NULL;
750 	msock = NULL;
751 
752 	do {
753 		for (try = 0;;) {
754 			/* 3 tries, this should take less than a second! */
755 			s = drbd_try_connect(mdev);
756 			if (s || ++try >= 3)
757 				break;
758 			/* give the other side time to call bind() & listen() */
759 			__set_current_state(TASK_INTERRUPTIBLE);
760 			schedule_timeout(HZ / 10);
761 		}
762 
763 		if (s) {
764 			if (!sock) {
765 				drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
766 				sock = s;
767 				s = NULL;
768 			} else if (!msock) {
769 				drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
770 				msock = s;
771 				s = NULL;
772 			} else {
773 				dev_err(DEV, "Logic error in drbd_connect()\n");
774 				goto out_release_sockets;
775 			}
776 		}
777 
778 		if (sock && msock) {
779 			__set_current_state(TASK_INTERRUPTIBLE);
780 			schedule_timeout(HZ / 10);
781 			ok = drbd_socket_okay(mdev, &sock);
782 			ok = drbd_socket_okay(mdev, &msock) && ok;
783 			if (ok)
784 				break;
785 		}
786 
787 retry:
788 		s = drbd_wait_for_connect(mdev);
789 		if (s) {
790 			try = drbd_recv_fp(mdev, s);
791 			drbd_socket_okay(mdev, &sock);
792 			drbd_socket_okay(mdev, &msock);
793 			switch (try) {
794 			case P_HAND_SHAKE_S:
795 				if (sock) {
796 					dev_warn(DEV, "initial packet S crossed\n");
797 					sock_release(sock);
798 				}
799 				sock = s;
800 				break;
801 			case P_HAND_SHAKE_M:
802 				if (msock) {
803 					dev_warn(DEV, "initial packet M crossed\n");
804 					sock_release(msock);
805 				}
806 				msock = s;
807 				set_bit(DISCARD_CONCURRENT, &mdev->flags);
808 				break;
809 			default:
810 				dev_warn(DEV, "Error receiving initial packet\n");
811 				sock_release(s);
812 				if (random32() & 1)
813 					goto retry;
814 			}
815 		}
816 
817 		if (mdev->state.conn <= C_DISCONNECTING)
818 			goto out_release_sockets;
819 		if (signal_pending(current)) {
820 			flush_signals(current);
821 			smp_rmb();
822 			if (get_t_state(&mdev->receiver) == Exiting)
823 				goto out_release_sockets;
824 		}
825 
826 		if (sock && msock) {
827 			ok = drbd_socket_okay(mdev, &sock);
828 			ok = drbd_socket_okay(mdev, &msock) && ok;
829 			if (ok)
830 				break;
831 		}
832 	} while (1);
833 
834 	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
835 	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
836 
837 	sock->sk->sk_allocation = GFP_NOIO;
838 	msock->sk->sk_allocation = GFP_NOIO;
839 
840 	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
841 	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
842 
843 	if (mdev->net_conf->sndbuf_size) {
844 		sock->sk->sk_sndbuf = mdev->net_conf->sndbuf_size;
845 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
846 	}
847 
848 	if (mdev->net_conf->rcvbuf_size) {
849 		sock->sk->sk_rcvbuf = mdev->net_conf->rcvbuf_size;
850 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
851 	}
852 
853 	/* NOT YET ...
854 	 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
855 	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
856 	 * first set it to the P_HAND_SHAKE timeout,
857 	 * which we set to 4x the configured ping_timeout. */
858 	sock->sk->sk_sndtimeo =
859 	sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
860 
861 	msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
862 	msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
863 
	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
866 	drbd_tcp_nodelay(sock);
867 	drbd_tcp_nodelay(msock);
868 
869 	mdev->data.socket = sock;
870 	mdev->meta.socket = msock;
871 	mdev->last_received = jiffies;
872 
873 	D_ASSERT(mdev->asender.task == NULL);
874 
875 	h = drbd_do_handshake(mdev);
876 	if (h <= 0)
877 		return h;
878 
879 	if (mdev->cram_hmac_tfm) {
880 		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
881 		if (!drbd_do_auth(mdev)) {
882 			dev_err(DEV, "Authentication of peer failed\n");
883 			return -1;
884 		}
885 	}
886 
887 	if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
888 		return 0;
889 
890 	sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
891 	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
892 
893 	atomic_set(&mdev->packet_seq, 0);
894 	mdev->peer_seq = 0;
895 
896 	drbd_thread_start(&mdev->asender);
897 
898 	drbd_send_protocol(mdev);
899 	drbd_send_sync_param(mdev, &mdev->sync_conf);
900 	drbd_send_sizes(mdev, 0);
901 	drbd_send_uuids(mdev);
902 	drbd_send_state(mdev);
903 	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
904 	clear_bit(RESIZE_PENDING, &mdev->flags);
905 
906 	return 1;
907 
908 out_release_sockets:
909 	if (sock)
910 		sock_release(sock);
911 	if (msock)
912 		sock_release(msock);
913 	return -1;
914 }
915 
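/* Receive a packet header from the data socket, convert command and length
 * to host byte order, and validate the magic.  Returns TRUE on success. */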
916 static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
917 {
918 	int r;
919 
920 	r = drbd_recv(mdev, h, sizeof(*h));
921 
922 	if (unlikely(r != sizeof(*h))) {
923 		dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
924 		return FALSE;
925 	};
926 	h->command = be16_to_cpu(h->command);
927 	h->length  = be16_to_cpu(h->length);
928 	if (unlikely(h->magic != BE_DRBD_MAGIC)) {
929 		dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
930 		    (long)be32_to_cpu(h->magic),
931 		    h->command, h->length);
932 		return FALSE;
933 	}
934 	mdev->last_received = jiffies;
935 
936 	return TRUE;
937 }
938 
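/* Flush the backing device if the current write ordering policy asks for it,
 * falling back to WO_drain_io when the flush fails, then tell
 * drbd_may_finish_epoch() that the barrier is done. */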
939 static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
940 {
941 	int rv;
942 
943 	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
944 		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, NULL);
945 		if (rv) {
946 			dev_err(DEV, "local disk flush failed with status %d\n", rv);
947 			/* would rather check on EOPNOTSUPP, but that is not reliable.
948 			 * don't try again for ANY return value != 0
949 			 * if (rv == -EOPNOTSUPP) */
950 			drbd_bump_write_ordering(mdev, WO_drain_io);
951 		}
952 		put_ldev(mdev);
953 	}
954 
955 	return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
956 }
957 
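/* Worker callback queued by drbd_may_finish_epoch(): issue the flush for
 * @epoch once and drop the extra epoch reference that was taken when this
 * work was scheduled. */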
958 static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
959 {
960 	struct flush_work *fw = (struct flush_work *)w;
961 	struct drbd_epoch *epoch = fw->epoch;
962 
963 	kfree(w);
964 
965 	if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
966 		drbd_flush_after_epoch(mdev, epoch);
967 
968 	drbd_may_finish_epoch(mdev, epoch, EV_PUT |
969 			      (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
970 
971 	return 1;
972 }
973 
974 /**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishing it.
976  * @mdev:	DRBD device.
977  * @epoch:	Epoch object.
978  * @ev:		Epoch event.
979  */
980 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
981 					       struct drbd_epoch *epoch,
982 					       enum epoch_event ev)
983 {
984 	int finish, epoch_size;
985 	struct drbd_epoch *next_epoch;
986 	int schedule_flush = 0;
987 	enum finish_epoch rv = FE_STILL_LIVE;
988 
989 	spin_lock(&mdev->epoch_lock);
990 	do {
991 		next_epoch = NULL;
992 		finish = 0;
993 
994 		epoch_size = atomic_read(&epoch->epoch_size);
995 
996 		switch (ev & ~EV_CLEANUP) {
997 		case EV_PUT:
998 			atomic_dec(&epoch->active);
999 			break;
1000 		case EV_GOT_BARRIER_NR:
1001 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1002 
1003 			/* Special case: If we just switched from WO_bio_barrier to
1004 			   WO_bdev_flush we should not finish the current epoch */
1005 			if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1006 			    mdev->write_ordering != WO_bio_barrier &&
1007 			    epoch == mdev->current_epoch)
1008 				clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1009 			break;
1010 		case EV_BARRIER_DONE:
1011 			set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1012 			break;
1013 		case EV_BECAME_LAST:
1014 			/* nothing to do*/
1015 			break;
1016 		}
1017 
1018 		if (epoch_size != 0 &&
1019 		    atomic_read(&epoch->active) == 0 &&
1020 		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1021 		    epoch->list.prev == &mdev->current_epoch->list &&
1022 		    !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1023 			/* Nearly all conditions are met to finish that epoch... */
1024 			if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1025 			    mdev->write_ordering == WO_none ||
1026 			    (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1027 			    ev & EV_CLEANUP) {
1028 				finish = 1;
1029 				set_bit(DE_IS_FINISHING, &epoch->flags);
1030 			} else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1031 				 mdev->write_ordering == WO_bio_barrier) {
1032 				atomic_inc(&epoch->active);
1033 				schedule_flush = 1;
1034 			}
1035 		}
1036 		if (finish) {
1037 			if (!(ev & EV_CLEANUP)) {
1038 				spin_unlock(&mdev->epoch_lock);
1039 				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1040 				spin_lock(&mdev->epoch_lock);
1041 			}
1042 			dec_unacked(mdev);
1043 
1044 			if (mdev->current_epoch != epoch) {
1045 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1046 				list_del(&epoch->list);
1047 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1048 				mdev->epochs--;
1049 				kfree(epoch);
1050 
1051 				if (rv == FE_STILL_LIVE)
1052 					rv = FE_DESTROYED;
1053 			} else {
1054 				epoch->flags = 0;
1055 				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
1057 				if (rv == FE_STILL_LIVE)
1058 					rv = FE_RECYCLED;
1059 			}
1060 		}
1061 
1062 		if (!next_epoch)
1063 			break;
1064 
1065 		epoch = next_epoch;
1066 	} while (1);
1067 
1068 	spin_unlock(&mdev->epoch_lock);
1069 
1070 	if (schedule_flush) {
1071 		struct flush_work *fw;
1072 		fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1073 		if (fw) {
1074 			fw->w.cb = w_flush;
1075 			fw->epoch = epoch;
1076 			drbd_queue_work(&mdev->data.work, &fw->w);
1077 		} else {
1078 			dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1079 			set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1080 			/* That is not a recursion, only one level */
1081 			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1082 			drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1083 		}
1084 	}
1085 
1086 	return rv;
1087 }
1088 
1089 /**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
1091  * @mdev:	DRBD device.
1092  * @wo:		Write ordering method to try.
1093  */
1094 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1095 {
1096 	enum write_ordering_e pwo;
1097 	static char *write_ordering_str[] = {
1098 		[WO_none] = "none",
1099 		[WO_drain_io] = "drain",
1100 		[WO_bdev_flush] = "flush",
1101 		[WO_bio_barrier] = "barrier",
1102 	};
1103 
1104 	pwo = mdev->write_ordering;
1105 	wo = min(pwo, wo);
1106 	if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1107 		wo = WO_bdev_flush;
1108 	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1109 		wo = WO_drain_io;
1110 	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1111 		wo = WO_none;
1112 	mdev->write_ordering = wo;
1113 	if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1114 		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1115 }
1116 
1117 /**
1118  * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
1119  * @mdev:	DRBD device.
1120  * @w:		work object.
1121  * @cancel:	The connection will be closed anyways (unused in this callback)
1122  */
1123 int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1124 {
1125 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1126 	struct bio *bio = e->private_bio;
1127 
1128 	/* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1129 	   (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1130 	   so that we can finish that epoch in drbd_may_finish_epoch().
1131 	   That is necessary if we already have a long chain of Epochs, before
1132 	   we realize that BIO_RW_BARRIER is actually not supported */
1133 
1134 	/* As long as the -ENOTSUPP on the barrier is reported immediately
1135 	   that will never trigger. If it is reported late, we will just
1136 	   print that warning and continue correctly for all future requests
1137 	   with WO_bdev_flush */
1138 	if (previous_epoch(mdev, e->epoch))
1139 		dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1140 
1141 	/* prepare bio for re-submit,
1142 	 * re-init volatile members */
1143 	/* we still have a local reference,
1144 	 * get_ldev was done in receive_Data. */
1145 	bio->bi_bdev = mdev->ldev->backing_bdev;
1146 	bio->bi_sector = e->sector;
1147 	bio->bi_size = e->size;
1148 	bio->bi_idx = 0;
1149 
1150 	bio->bi_flags &= ~(BIO_POOL_MASK - 1);
1151 	bio->bi_flags |= 1 << BIO_UPTODATE;
1152 
1153 	/* don't know whether this is necessary: */
1154 	bio->bi_phys_segments = 0;
1155 	bio->bi_next = NULL;
1156 
1157 	/* these should be unchanged: */
1158 	/* bio->bi_end_io = drbd_endio_write_sec; */
1159 	/* bio->bi_vcnt = whatever; */
1160 
1161 	e->w.cb = e_end_block;
1162 
1163 	/* This is no longer a barrier request. */
1164 	bio->bi_rw &= ~(1UL << BIO_RW_BARRIER);
1165 
1166 	drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio);
1167 
1168 	return 1;
1169 }
1170 
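/* Handle a P_BARRIER packet: record the barrier number of the current epoch
 * and, depending on the write ordering policy, either recycle the epoch,
 * drain/flush the outstanding writes, or install a new epoch object. */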
1171 static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1172 {
1173 	int rv, issue_flush;
1174 	struct p_barrier *p = (struct p_barrier *)h;
1175 	struct drbd_epoch *epoch;
1176 
1177 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1178 
1179 	rv = drbd_recv(mdev, h->payload, h->length);
1180 	ERR_IF(rv != h->length) return FALSE;
1181 
1182 	inc_unacked(mdev);
1183 
1184 	if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1185 		drbd_kick_lo(mdev);
1186 
1187 	mdev->current_epoch->barrier_nr = p->barrier;
1188 	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1189 
1190 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1191 	 * the activity log, which means it would not be resynced in case the
1192 	 * R_PRIMARY crashes now.
1193 	 * Therefore we must send the barrier_ack after the barrier request was
1194 	 * completed. */
1195 	switch (mdev->write_ordering) {
1196 	case WO_bio_barrier:
1197 	case WO_none:
1198 		if (rv == FE_RECYCLED)
1199 			return TRUE;
1200 		break;
1201 
1202 	case WO_bdev_flush:
1203 	case WO_drain_io:
1204 		D_ASSERT(rv == FE_STILL_LIVE);
1205 		set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1206 		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1207 		rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1208 		if (rv == FE_RECYCLED)
1209 			return TRUE;
1210 
1211 		/* The asender will send all the ACKs and barrier ACKs out, since
1212 		   all EEs moved from the active_ee to the done_ee. We need to
1213 		   provide a new epoch object for the EEs that come in soon */
1214 		break;
1215 	}
1216 
1217 	/* receiver context, in the writeout path of the other node.
1218 	 * avoid potential distributed deadlock */
1219 	epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1220 	if (!epoch) {
1221 		dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
		issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1223 		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1224 		if (issue_flush) {
1225 			rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1226 			if (rv == FE_RECYCLED)
1227 				return TRUE;
1228 		}
1229 
1230 		drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1231 
1232 		return TRUE;
1233 	}
1234 
1235 	epoch->flags = 0;
1236 	atomic_set(&epoch->epoch_size, 0);
1237 	atomic_set(&epoch->active, 0);
1238 
1239 	spin_lock(&mdev->epoch_lock);
1240 	if (atomic_read(&mdev->current_epoch->epoch_size)) {
1241 		list_add(&epoch->list, &mdev->current_epoch->list);
1242 		mdev->current_epoch = epoch;
1243 		mdev->epochs++;
1244 	} else {
1245 		/* The current_epoch got recycled while we allocated this one... */
1246 		kfree(epoch);
1247 	}
1248 	spin_unlock(&mdev->epoch_lock);
1249 
1250 	return TRUE;
1251 }
1252 
1253 /* used from receive_RSDataReply (recv_resync_read)
1254  * and from receive_Data */
1255 static struct drbd_epoch_entry *
1256 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1257 {
1258 	struct drbd_epoch_entry *e;
1259 	struct bio_vec *bvec;
1260 	struct page *page;
1261 	struct bio *bio;
1262 	int dgs, ds, i, rr;
1263 	void *dig_in = mdev->int_dig_in;
1264 	void *dig_vv = mdev->int_dig_vv;
1265 
1266 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1267 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1268 
1269 	if (dgs) {
1270 		rr = drbd_recv(mdev, dig_in, dgs);
1271 		if (rr != dgs) {
1272 			dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1273 			     rr, dgs);
1274 			return NULL;
1275 		}
1276 	}
1277 
1278 	data_size -= dgs;
1279 
1280 	ERR_IF(data_size &  0x1ff) return NULL;
1281 	ERR_IF(data_size >  DRBD_MAX_SEGMENT_SIZE) return NULL;
1282 
1283 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1284 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1285 	 * which in turn might block on the other node at this very place.  */
1286 	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1287 	if (!e)
1288 		return NULL;
1289 	bio = e->private_bio;
1290 	ds = data_size;
1291 	bio_for_each_segment(bvec, bio, i) {
1292 		page = bvec->bv_page;
1293 		rr = drbd_recv(mdev, kmap(page), min_t(int, ds, PAGE_SIZE));
1294 		kunmap(page);
1295 		if (rr != min_t(int, ds, PAGE_SIZE)) {
1296 			drbd_free_ee(mdev, e);
1297 			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1298 			     rr, min_t(int, ds, PAGE_SIZE));
1299 			return NULL;
1300 		}
1301 		ds -= rr;
1302 	}
1303 
1304 	if (dgs) {
1305 		drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1306 		if (memcmp(dig_in, dig_vv, dgs)) {
1307 			dev_err(DEV, "Digest integrity check FAILED.\n");
1308 			drbd_bcast_ee(mdev, "digest failed",
1309 					dgs, dig_in, dig_vv, e);
1310 			drbd_free_ee(mdev, e);
1311 			return NULL;
1312 		}
1313 	}
1314 	mdev->recv_cnt += data_size>>9;
1315 	return e;
1316 }
1317 
1318 /* drbd_drain_block() just takes a data block
1319  * out of the socket input buffer, and discards it.
1320  */
1321 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1322 {
1323 	struct page *page;
1324 	int rr, rv = 1;
1325 	void *data;
1326 
1327 	page = drbd_pp_alloc(mdev, 1);
1328 
1329 	data = kmap(page);
1330 	while (data_size) {
1331 		rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1332 		if (rr != min_t(int, data_size, PAGE_SIZE)) {
1333 			rv = 0;
1334 			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1335 			     rr, min_t(int, data_size, PAGE_SIZE));
1336 			break;
1337 		}
1338 		data_size -= rr;
1339 	}
1340 	kunmap(page);
1341 	drbd_pp_free(mdev, page);
1342 	return rv;
1343 }
1344 
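/* Read a P_DATA_REPLY for a diskless (or failed local) read directly into
 * the pages of the request's master bio, verifying the data digest if one
 * is configured. */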
1345 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1346 			   sector_t sector, int data_size)
1347 {
1348 	struct bio_vec *bvec;
1349 	struct bio *bio;
1350 	int dgs, rr, i, expect;
1351 	void *dig_in = mdev->int_dig_in;
1352 	void *dig_vv = mdev->int_dig_vv;
1353 
1354 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1355 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1356 
1357 	if (dgs) {
1358 		rr = drbd_recv(mdev, dig_in, dgs);
1359 		if (rr != dgs) {
1360 			dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1361 			     rr, dgs);
1362 			return 0;
1363 		}
1364 	}
1365 
1366 	data_size -= dgs;
1367 
1368 	/* optimistically update recv_cnt.  if receiving fails below,
1369 	 * we disconnect anyways, and counters will be reset. */
1370 	mdev->recv_cnt += data_size>>9;
1371 
1372 	bio = req->master_bio;
1373 	D_ASSERT(sector == bio->bi_sector);
1374 
1375 	bio_for_each_segment(bvec, bio, i) {
1376 		expect = min_t(int, data_size, bvec->bv_len);
1377 		rr = drbd_recv(mdev,
1378 			     kmap(bvec->bv_page)+bvec->bv_offset,
1379 			     expect);
1380 		kunmap(bvec->bv_page);
1381 		if (rr != expect) {
1382 			dev_warn(DEV, "short read receiving data reply: "
1383 			     "read %d expected %d\n",
1384 			     rr, expect);
1385 			return 0;
1386 		}
1387 		data_size -= rr;
1388 	}
1389 
1390 	if (dgs) {
1391 		drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1392 		if (memcmp(dig_in, dig_vv, dgs)) {
1393 			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1394 			return 0;
1395 		}
1396 	}
1397 
1398 	D_ASSERT(data_size == 0);
1399 	return 1;
1400 }
1401 
1402 /* e_end_resync_block() is called via
1403  * drbd_process_done_ee() by asender only */
1404 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1405 {
1406 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1407 	sector_t sector = e->sector;
1408 	int ok;
1409 
1410 	D_ASSERT(hlist_unhashed(&e->colision));
1411 
1412 	if (likely(drbd_bio_uptodate(e->private_bio))) {
1413 		drbd_set_in_sync(mdev, sector, e->size);
1414 		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1415 	} else {
1416 		/* Record failure to sync */
1417 		drbd_rs_failed_io(mdev, sector, e->size);
1418 
1419 		ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1420 	}
1421 	dec_unacked(mdev);
1422 
1423 	return ok;
1424 }
1425 
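/* Receive one resync data block into a fresh epoch entry and submit it as a
 * local write; the block is marked in sync and acked from the completion
 * path (e_end_resync_block). */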
1426 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1427 {
1428 	struct drbd_epoch_entry *e;
1429 
1430 	e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1431 	if (!e) {
1432 		put_ldev(mdev);
1433 		return FALSE;
1434 	}
1435 
1436 	dec_rs_pending(mdev);
1437 
1438 	e->private_bio->bi_end_io = drbd_endio_write_sec;
1439 	e->private_bio->bi_rw = WRITE;
1440 	e->w.cb = e_end_resync_block;
1441 
1442 	inc_unacked(mdev);
	/* the corresponding dec_unacked() happens in e_end_resync_block()
	 * or in _drbd_clear_done_ee */
1445 
1446 	spin_lock_irq(&mdev->req_lock);
1447 	list_add(&e->w.list, &mdev->sync_ee);
1448 	spin_unlock_irq(&mdev->req_lock);
1449 
1450 	drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio);
1451 	/* accounting done in endio */
1452 
1453 	maybe_kick_lo(mdev);
1454 	return TRUE;
1455 }
1456 
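/* P_DATA_REPLY: the peer answers one of our read requests; copy the payload
 * into the original request's bio and complete the request. */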
1457 static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1458 {
1459 	struct drbd_request *req;
1460 	sector_t sector;
1461 	unsigned int header_size, data_size;
1462 	int ok;
1463 	struct p_data *p = (struct p_data *)h;
1464 
1465 	header_size = sizeof(*p) - sizeof(*h);
1466 	data_size   = h->length  - header_size;
1467 
1468 	ERR_IF(data_size == 0) return FALSE;
1469 
1470 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1471 		return FALSE;
1472 
1473 	sector = be64_to_cpu(p->sector);
1474 
1475 	spin_lock_irq(&mdev->req_lock);
1476 	req = _ar_id_to_req(mdev, p->block_id, sector);
1477 	spin_unlock_irq(&mdev->req_lock);
1478 	if (unlikely(!req)) {
1479 		dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1480 		return FALSE;
1481 	}
1482 
1483 	/* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1484 	 * special casing it there for the various failure cases.
1485 	 * still no race with drbd_fail_pending_reads */
1486 	ok = recv_dless_read(mdev, req, sector, data_size);
1487 
1488 	if (ok)
1489 		req_mod(req, data_received);
1490 	/* else: nothing. handled from drbd_disconnect...
1491 	 * I don't think we may complete this just yet
1492 	 * in case we are "on-disconnect: freeze" */
1493 
1494 	return ok;
1495 }
1496 
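/* P_RS_DATA_REPLY: resync data we requested arrives; write it to the local
 * disk via recv_resync_read(), or drain and NEG_ACK it if we have no disk. */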
1497 static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1498 {
1499 	sector_t sector;
1500 	unsigned int header_size, data_size;
1501 	int ok;
1502 	struct p_data *p = (struct p_data *)h;
1503 
1504 	header_size = sizeof(*p) - sizeof(*h);
1505 	data_size   = h->length  - header_size;
1506 
1507 	ERR_IF(data_size == 0) return FALSE;
1508 
1509 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1510 		return FALSE;
1511 
1512 	sector = be64_to_cpu(p->sector);
1513 	D_ASSERT(p->block_id == ID_SYNCER);
1514 
1515 	if (get_ldev(mdev)) {
1516 		/* data is submitted to disk within recv_resync_read.
1517 		 * corresponding put_ldev done below on error,
1518 		 * or in drbd_endio_write_sec. */
1519 		ok = recv_resync_read(mdev, sector, data_size);
1520 	} else {
1521 		if (__ratelimit(&drbd_ratelimit_state))
1522 			dev_err(DEV, "Can not write resync data to local disk.\n");
1523 
1524 		ok = drbd_drain_block(mdev, data_size);
1525 
1526 		drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1527 	}
1528 
1529 	return ok;
1530 }
1531 
1532 /* e_end_block() is called via drbd_process_done_ee().
1533  * this means this function only runs in the asender thread
1534  */
1535 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1536 {
1537 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1538 	sector_t sector = e->sector;
1539 	struct drbd_epoch *epoch;
1540 	int ok = 1, pcmd;
1541 
1542 	if (e->flags & EE_IS_BARRIER) {
1543 		epoch = previous_epoch(mdev, e->epoch);
1544 		if (epoch)
1545 			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1546 	}
1547 
1548 	if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1549 		if (likely(drbd_bio_uptodate(e->private_bio))) {
1550 			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1551 				mdev->state.conn <= C_PAUSED_SYNC_T &&
1552 				e->flags & EE_MAY_SET_IN_SYNC) ?
1553 				P_RS_WRITE_ACK : P_WRITE_ACK;
1554 			ok &= drbd_send_ack(mdev, pcmd, e);
1555 			if (pcmd == P_RS_WRITE_ACK)
1556 				drbd_set_in_sync(mdev, sector, e->size);
1557 		} else {
1558 			ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1559 			/* we expect it to be marked out of sync anyways...
1560 			 * maybe assert this?  */
1561 		}
1562 		dec_unacked(mdev);
1563 	}
1564 	/* we delete from the conflict detection hash _after_ we sent out the
1565 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1566 	if (mdev->net_conf->two_primaries) {
1567 		spin_lock_irq(&mdev->req_lock);
1568 		D_ASSERT(!hlist_unhashed(&e->colision));
1569 		hlist_del_init(&e->colision);
1570 		spin_unlock_irq(&mdev->req_lock);
1571 	} else {
1572 		D_ASSERT(hlist_unhashed(&e->colision));
1573 	}
1574 
1575 	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1576 
1577 	return ok;
1578 }
1579 
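/* Called via drbd_process_done_ee() for a write that lost a concurrent-write
 * conflict: send P_DISCARD_ACK, unhash the entry, and drop the unacked
 * reference. */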
1580 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1581 {
1582 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1583 	int ok = 1;
1584 
1585 	D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1586 	ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1587 
1588 	spin_lock_irq(&mdev->req_lock);
1589 	D_ASSERT(!hlist_unhashed(&e->colision));
1590 	hlist_del_init(&e->colision);
1591 	spin_unlock_irq(&mdev->req_lock);
1592 
1593 	dec_unacked(mdev);
1594 
1595 	return ok;
1596 }
1597 
1598 /* Called from receive_Data.
1599  * Synchronize packets on sock with packets on msock.
1600  *
1601  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1602  * packet traveling on msock, they are still processed in the order they have
1603  * been sent.
1604  *
1605  * Note: we don't care for Ack packets overtaking P_DATA packets.
1606  *
1607  * In case packet_seq is larger than mdev->peer_seq number, there are
1608  * outstanding packets on the msock. We wait for them to arrive.
1609  * In case we are the logically next packet, we update mdev->peer_seq
1610  * ourselves. Correctly handles 32bit wrap around.
1611  *
1612  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1613  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1614  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 * 1<<11 == 2048 seconds, aka ages, for the 32bit wrap around...
1616  *
1617  * returns 0 if we may process the packet,
1618  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1619 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1620 {
1621 	DEFINE_WAIT(wait);
1622 	unsigned int p_seq;
1623 	long timeout;
1624 	int ret = 0;
1625 	spin_lock(&mdev->peer_seq_lock);
1626 	for (;;) {
1627 		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1628 		if (seq_le(packet_seq, mdev->peer_seq+1))
1629 			break;
1630 		if (signal_pending(current)) {
1631 			ret = -ERESTARTSYS;
1632 			break;
1633 		}
1634 		p_seq = mdev->peer_seq;
1635 		spin_unlock(&mdev->peer_seq_lock);
1636 		timeout = schedule_timeout(30*HZ);
1637 		spin_lock(&mdev->peer_seq_lock);
1638 		if (timeout == 0 && p_seq == mdev->peer_seq) {
1639 			ret = -ETIMEDOUT;
1640 			dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1641 			break;
1642 		}
1643 	}
1644 	finish_wait(&mdev->seq_wait, &wait);
1645 	if (mdev->peer_seq+1 == packet_seq)
1646 		mdev->peer_seq++;
1647 	spin_unlock(&mdev->peer_seq_lock);
1648 	return ret;
1649 }
1650 
1651 /* mirrored write */
1652 static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1653 {
1654 	sector_t sector;
1655 	struct drbd_epoch_entry *e;
1656 	struct p_data *p = (struct p_data *)h;
1657 	int header_size, data_size;
1658 	int rw = WRITE;
1659 	u32 dp_flags;
1660 
1661 	header_size = sizeof(*p) - sizeof(*h);
1662 	data_size   = h->length  - header_size;
1663 
1664 	ERR_IF(data_size == 0) return FALSE;
1665 
1666 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1667 		return FALSE;
1668 
1669 	if (!get_ldev(mdev)) {
1670 		if (__ratelimit(&drbd_ratelimit_state))
1671 			dev_err(DEV, "Can not write mirrored data block "
1672 			    "to local disk.\n");
1673 		spin_lock(&mdev->peer_seq_lock);
1674 		if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1675 			mdev->peer_seq++;
1676 		spin_unlock(&mdev->peer_seq_lock);
1677 
1678 		drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1679 		atomic_inc(&mdev->current_epoch->epoch_size);
1680 		return drbd_drain_block(mdev, data_size);
1681 	}
1682 
1683 	/* get_ldev(mdev) successful.
1684 	 * Corresponding put_ldev done either below (on various errors),
1685 	 * or in drbd_endio_write_sec, if we successfully submit the data at
1686 	 * the end of this function. */
1687 
1688 	sector = be64_to_cpu(p->sector);
1689 	e = read_in_block(mdev, p->block_id, sector, data_size);
1690 	if (!e) {
1691 		put_ldev(mdev);
1692 		return FALSE;
1693 	}
1694 
1695 	e->private_bio->bi_end_io = drbd_endio_write_sec;
1696 	e->w.cb = e_end_block;
1697 
1698 	spin_lock(&mdev->epoch_lock);
1699 	e->epoch = mdev->current_epoch;
1700 	atomic_inc(&e->epoch->epoch_size);
1701 	atomic_inc(&e->epoch->active);
1702 
1703 	if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1704 		struct drbd_epoch *epoch;
		/* Issue a barrier if we start a new epoch, and the previous epoch
		   was not an epoch containing a single request which already was
		   a Barrier. */
1708 		epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1709 		if (epoch == e->epoch) {
1710 			set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1711 			rw |= (1<<BIO_RW_BARRIER);
1712 			e->flags |= EE_IS_BARRIER;
1713 		} else {
1714 			if (atomic_read(&epoch->epoch_size) > 1 ||
1715 			    !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1716 				set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1717 				set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1718 				rw |= (1<<BIO_RW_BARRIER);
1719 				e->flags |= EE_IS_BARRIER;
1720 			}
1721 		}
1722 	}
1723 	spin_unlock(&mdev->epoch_lock);
1724 
1725 	dp_flags = be32_to_cpu(p->dp_flags);
1726 	if (dp_flags & DP_HARDBARRIER) {
1727 		dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1728 		/* rw |= (1<<BIO_RW_BARRIER); */
1729 	}
1730 	if (dp_flags & DP_RW_SYNC)
1731 		rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG);
1732 	if (dp_flags & DP_MAY_SET_IN_SYNC)
1733 		e->flags |= EE_MAY_SET_IN_SYNC;
1734 
1735 	/* I'm the receiver, I do hold a net_cnt reference. */
1736 	if (!mdev->net_conf->two_primaries) {
1737 		spin_lock_irq(&mdev->req_lock);
1738 	} else {
1739 		/* don't get the req_lock yet,
1740 		 * we may sleep in drbd_wait_peer_seq */
1741 		const int size = e->size;
1742 		const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1743 		DEFINE_WAIT(wait);
1744 		struct drbd_request *i;
1745 		struct hlist_node *n;
1746 		struct hlist_head *slot;
1747 		int first;
1748 
1749 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1750 		BUG_ON(mdev->ee_hash == NULL);
1751 		BUG_ON(mdev->tl_hash == NULL);
1752 
1753 		/* conflict detection and handling:
1754 		 * 1. wait on the sequence number,
1755 		 *    in case this data packet overtook ACK packets.
1756 		 * 2. check our hash tables for conflicting requests.
1757 		 *    we only need to walk the tl_hash, since an ee can not
		 *    we only need to walk the tl_hash, since an ee cannot
		 *    have a conflict with another ee: on the submitting
1760 		 *    and a conflicting req is never sent.
1761 		 *
1762 		 * Note: for two_primaries, we are protocol C,
1763 		 * so there cannot be any request that is DONE
1764 		 * but still on the transfer log.
1765 		 *
1766 		 * unconditionally add to the ee_hash.
1767 		 *
1768 		 * if no conflicting request is found:
1769 		 *    submit.
1770 		 *
1771 		 * if any conflicting request is found
1772 		 * that has not yet been acked,
1773 		 * AND I have the "discard concurrent writes" flag:
1774 		 *	 queue (via done_ee) the P_DISCARD_ACK; OUT.
1775 		 *
1776 		 * if any conflicting request is found:
1777 		 *	 block the receiver, waiting on misc_wait
1778 		 *	 until no more conflicting requests are there,
1779 		 *	 or we get interrupted (disconnect).
1780 		 *
1781 		 *	 we do not just write after local io completion of those
1782 		 *	 requests, but only after req is done completely, i.e.
1783 		 *	 we wait for the P_DISCARD_ACK to arrive!
1784 		 *
1785 		 *	 then proceed normally, i.e. submit.
1786 		 */
1787 		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1788 			goto out_interrupted;
1789 
1790 		spin_lock_irq(&mdev->req_lock);
1791 
1792 		hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1793 
1794 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1795 		slot = tl_hash_slot(mdev, sector);
1796 		first = 1;
1797 		for (;;) {
1798 			int have_unacked = 0;
1799 			int have_conflict = 0;
1800 			prepare_to_wait(&mdev->misc_wait, &wait,
1801 				TASK_INTERRUPTIBLE);
1802 			hlist_for_each_entry(i, n, slot, colision) {
1803 				if (OVERLAPS) {
1804 					/* only ALERT on first iteration,
1805 					 * we may be woken up early... */
1806 					if (first)
1807 						dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1808 						      "	new: %llus +%u; pending: %llus +%u\n",
1809 						      current->comm, current->pid,
1810 						      (unsigned long long)sector, size,
1811 						      (unsigned long long)i->sector, i->size);
1812 					if (i->rq_state & RQ_NET_PENDING)
1813 						++have_unacked;
1814 					++have_conflict;
1815 				}
1816 			}
1817 #undef OVERLAPS
1818 			if (!have_conflict)
1819 				break;
1820 
1821 			/* Discard Ack only for the _first_ iteration */
1822 			if (first && discard && have_unacked) {
1823 				dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1824 				     (unsigned long long)sector);
1825 				inc_unacked(mdev);
1826 				e->w.cb = e_send_discard_ack;
1827 				list_add_tail(&e->w.list, &mdev->done_ee);
1828 
1829 				spin_unlock_irq(&mdev->req_lock);
1830 
1831 				/* we could probably send that P_DISCARD_ACK ourselves,
1832 				 * but I don't like the receiver using the msock */
1833 
1834 				put_ldev(mdev);
1835 				wake_asender(mdev);
1836 				finish_wait(&mdev->misc_wait, &wait);
1837 				return TRUE;
1838 			}
1839 
1840 			if (signal_pending(current)) {
1841 				hlist_del_init(&e->colision);
1842 
1843 				spin_unlock_irq(&mdev->req_lock);
1844 
1845 				finish_wait(&mdev->misc_wait, &wait);
1846 				goto out_interrupted;
1847 			}
1848 
1849 			spin_unlock_irq(&mdev->req_lock);
1850 			if (first) {
1851 				first = 0;
1852 				dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1853 				     "sec=%llus\n", (unsigned long long)sector);
1854 			} else if (discard) {
1855 				/* we had none on the first iteration.
1856 				 * there must be none now. */
1857 				D_ASSERT(have_unacked == 0);
1858 			}
1859 			schedule();
1860 			spin_lock_irq(&mdev->req_lock);
1861 		}
1862 		finish_wait(&mdev->misc_wait, &wait);
1863 	}
1864 
1865 	list_add(&e->w.list, &mdev->active_ee);
1866 	spin_unlock_irq(&mdev->req_lock);
1867 
1868 	switch (mdev->net_conf->wire_protocol) {
1869 	case DRBD_PROT_C:
1870 		inc_unacked(mdev);
1871 		/* corresponding dec_unacked() in e_end_block()
1872 		 * respective _drbd_clear_done_ee */
1873 		break;
1874 	case DRBD_PROT_B:
1875 		/* I really don't like it that the receiver thread
1876 		 * sends on the msock, but anyways */
1877 		drbd_send_ack(mdev, P_RECV_ACK, e);
1878 		break;
1879 	case DRBD_PROT_A:
1880 		/* nothing to do */
1881 		break;
1882 	}
1883 
1884 	if (mdev->state.pdsk == D_DISKLESS) {
		/* In case we have the only disk of the cluster (the peer is
		 * diskless): mark the block out of sync and cover it in the AL. */
1886 		drbd_set_out_of_sync(mdev, e->sector, e->size);
1887 		e->flags |= EE_CALL_AL_COMPLETE_IO;
1888 		drbd_al_begin_io(mdev, e->sector);
1889 	}
1890 
1891 	e->private_bio->bi_rw = rw;
1892 	drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio);
1893 	/* accounting done in endio */
1894 
1895 	maybe_kick_lo(mdev);
1896 	return TRUE;
1897 
1898 out_interrupted:
1899 	/* yes, the epoch_size now is imbalanced.
1900 	 * but we drop the connection anyways, so we don't have a chance to
1901 	 * receive a barrier... atomic_inc(&mdev->epoch_size); */
1902 	put_ldev(mdev);
1903 	drbd_free_ee(mdev, e);
1904 	return FALSE;
1905 }
1906 
1907 static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
1908 {
1909 	sector_t sector;
1910 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1911 	struct drbd_epoch_entry *e;
1912 	struct digest_info *di = NULL;
1913 	int size, digest_size;
1914 	unsigned int fault_type;
1915 	struct p_block_req *p =
1916 		(struct p_block_req *)h;
1917 	const int brps = sizeof(*p)-sizeof(*h);
1918 
1919 	if (drbd_recv(mdev, h->payload, brps) != brps)
1920 		return FALSE;
1921 
1922 	sector = be64_to_cpu(p->sector);
1923 	size   = be32_to_cpu(p->blksize);
1924 
1925 	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
1926 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1927 				(unsigned long long)sector, size);
1928 		return FALSE;
1929 	}
1930 	if (sector + (size>>9) > capacity) {
1931 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1932 				(unsigned long long)sector, size);
1933 		return FALSE;
1934 	}
1935 
1936 	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1937 		if (__ratelimit(&drbd_ratelimit_state))
1938 			dev_err(DEV, "Can not satisfy peer's read request, "
1939 			    "no local data.\n");
		drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
				 P_NEG_RS_DREPLY, p);
1942 		return TRUE;
1943 	}
1944 
1945 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1946 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1947 	 * which in turn might block on the other node at this very place.  */
1948 	e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
1949 	if (!e) {
1950 		put_ldev(mdev);
1951 		return FALSE;
1952 	}
1953 
1954 	e->private_bio->bi_rw = READ;
1955 	e->private_bio->bi_end_io = drbd_endio_read_sec;
1956 
1957 	switch (h->command) {
1958 	case P_DATA_REQUEST:
1959 		e->w.cb = w_e_end_data_req;
1960 		fault_type = DRBD_FAULT_DT_RD;
1961 		break;
1962 	case P_RS_DATA_REQUEST:
1963 		e->w.cb = w_e_end_rsdata_req;
1964 		fault_type = DRBD_FAULT_RS_RD;
		/* Eventually this should become asynchronous. Currently it
1966 		 * blocks the whole receiver just to delay the reading of a
1967 		 * resync data block.
1968 		 * the drbd_work_queue mechanism is made for this...
1969 		 */
1970 		if (!drbd_rs_begin_io(mdev, sector)) {
1971 			/* we have been interrupted,
1972 			 * probably connection lost! */
1973 			D_ASSERT(signal_pending(current));
1974 			goto out_free_e;
1975 		}
1976 		break;
1977 
1978 	case P_OV_REPLY:
1979 	case P_CSUM_RS_REQUEST:
1980 		fault_type = DRBD_FAULT_RS_RD;
		digest_size = h->length - brps;
1982 		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
1983 		if (!di)
1984 			goto out_free_e;
1985 
1986 		di->digest_size = digest_size;
1987 		di->digest = (((char *)di)+sizeof(struct digest_info));
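		/* the digest payload lives directly behind the digest_info
		 * struct, in the same kmalloc()ed block allocated above */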
1988 
1989 		if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
1990 			goto out_free_e;
1991 
1992 		e->block_id = (u64)(unsigned long)di;
1993 		if (h->command == P_CSUM_RS_REQUEST) {
1994 			D_ASSERT(mdev->agreed_pro_version >= 89);
1995 			e->w.cb = w_e_end_csum_rs_req;
1996 		} else if (h->command == P_OV_REPLY) {
1997 			e->w.cb = w_e_end_ov_reply;
1998 			dec_rs_pending(mdev);
1999 			break;
2000 		}
2001 
2002 		if (!drbd_rs_begin_io(mdev, sector)) {
2003 			/* we have been interrupted, probably connection lost! */
2004 			D_ASSERT(signal_pending(current));
2005 			goto out_free_e;
2006 		}
2007 		break;
2008 
2009 	case P_OV_REQUEST:
2010 		if (mdev->state.conn >= C_CONNECTED &&
2011 		    mdev->state.conn != C_VERIFY_T)
2012 			dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2013 				drbd_conn_str(mdev->state.conn));
2014 		if (mdev->ov_start_sector == ~(sector_t)0 &&
2015 		    mdev->agreed_pro_version >= 90) {
2016 			mdev->ov_start_sector = sector;
2017 			mdev->ov_position = sector;
2018 			mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2019 			dev_info(DEV, "Online Verify start sector: %llu\n",
2020 					(unsigned long long)sector);
2021 		}
2022 		e->w.cb = w_e_end_ov_req;
2023 		fault_type = DRBD_FAULT_RS_RD;
2024 		/* Eventually this should become asynchronous. Currently it
2025 		 * blocks the whole receiver just to delay the reading of a
2026 		 * resync data block.
2027 		 * the drbd_work_queue mechanism is made for this...
2028 		 */
2029 		if (!drbd_rs_begin_io(mdev, sector)) {
2030 			/* we have been interrupted,
2031 			 * probably connection lost! */
2032 			D_ASSERT(signal_pending(current));
2033 			goto out_free_e;
2034 		}
2035 		break;
2036 
2037 
2038 	default:
2039 		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2040 		    cmdname(h->command));
2041 		fault_type = DRBD_FAULT_MAX;
2042 	}
2043 
2044 	spin_lock_irq(&mdev->req_lock);
2045 	list_add(&e->w.list, &mdev->read_ee);
2046 	spin_unlock_irq(&mdev->req_lock);
2047 
2048 	inc_unacked(mdev);
2049 
2050 	drbd_generic_make_request(mdev, fault_type, e->private_bio);
2051 	maybe_kick_lo(mdev);
2052 
2053 	return TRUE;
2054 
2055 out_free_e:
2056 	kfree(di);
2057 	put_ldev(mdev);
2058 	drbd_free_ee(mdev, e);
2059 	return FALSE;
2060 }
2061 
2062 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2063 {
2064 	int self, peer, rv = -100;
2065 	unsigned long ch_self, ch_peer;
2066 
2067 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2068 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2069 
2070 	ch_peer = mdev->p_uuid[UI_SIZE];
2071 	ch_self = mdev->comm_bm_set;
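	/* ch_self/ch_peer are used below as a rough measure of how many blocks
	 * each side changed while disconnected: our own out-of-sync bit count
	 * sampled at connect time, and (apparently) the peer's count
	 * piggy-backed in the UI_SIZE slot of its uuid array. */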
2072 
2073 	switch (mdev->net_conf->after_sb_0p) {
2074 	case ASB_CONSENSUS:
2075 	case ASB_DISCARD_SECONDARY:
2076 	case ASB_CALL_HELPER:
2077 		dev_err(DEV, "Configuration error.\n");
2078 		break;
2079 	case ASB_DISCONNECT:
2080 		break;
2081 	case ASB_DISCARD_YOUNGER_PRI:
2082 		if (self == 0 && peer == 1) {
2083 			rv = -1;
2084 			break;
2085 		}
2086 		if (self == 1 && peer == 0) {
2087 			rv =  1;
2088 			break;
2089 		}
2090 		/* Else fall through to one of the other strategies... */
2091 	case ASB_DISCARD_OLDER_PRI:
2092 		if (self == 0 && peer == 1) {
2093 			rv = 1;
2094 			break;
2095 		}
2096 		if (self == 1 && peer == 0) {
2097 			rv = -1;
2098 			break;
2099 		}
2100 		/* Else fall through to one of the other strategies... */
2101 		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2102 		     "Using discard-least-changes instead\n");
2103 	case ASB_DISCARD_ZERO_CHG:
2104 		if (ch_peer == 0 && ch_self == 0) {
2105 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2106 				? -1 : 1;
2107 			break;
2108 		} else {
2109 			if (ch_peer == 0) { rv =  1; break; }
2110 			if (ch_self == 0) { rv = -1; break; }
2111 		}
2112 		if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2113 			break;
2114 	case ASB_DISCARD_LEAST_CHG:
2115 		if	(ch_self < ch_peer)
2116 			rv = -1;
2117 		else if (ch_self > ch_peer)
2118 			rv =  1;
2119 		else /* ( ch_self == ch_peer ) */
2120 		     /* Well, then use something else. */
2121 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2122 				? -1 : 1;
2123 		break;
2124 	case ASB_DISCARD_LOCAL:
2125 		rv = -1;
2126 		break;
2127 	case ASB_DISCARD_REMOTE:
2128 		rv =  1;
2129 	}
2130 
2131 	return rv;
2132 }
2133 
2134 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2135 {
2136 	int self, peer, hg, rv = -100;
2137 
2138 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2139 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2140 
2141 	switch (mdev->net_conf->after_sb_1p) {
2142 	case ASB_DISCARD_YOUNGER_PRI:
2143 	case ASB_DISCARD_OLDER_PRI:
2144 	case ASB_DISCARD_LEAST_CHG:
2145 	case ASB_DISCARD_LOCAL:
2146 	case ASB_DISCARD_REMOTE:
2147 		dev_err(DEV, "Configuration error.\n");
2148 		break;
2149 	case ASB_DISCONNECT:
2150 		break;
2151 	case ASB_CONSENSUS:
2152 		hg = drbd_asb_recover_0p(mdev);
2153 		if (hg == -1 && mdev->state.role == R_SECONDARY)
2154 			rv = hg;
2155 		if (hg == 1  && mdev->state.role == R_PRIMARY)
2156 			rv = hg;
2157 		break;
2158 	case ASB_VIOLENTLY:
2159 		rv = drbd_asb_recover_0p(mdev);
2160 		break;
2161 	case ASB_DISCARD_SECONDARY:
2162 		return mdev->state.role == R_PRIMARY ? 1 : -1;
2163 	case ASB_CALL_HELPER:
2164 		hg = drbd_asb_recover_0p(mdev);
2165 		if (hg == -1 && mdev->state.role == R_PRIMARY) {
2166 			self = drbd_set_role(mdev, R_SECONDARY, 0);
2167 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2168 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2169 			  * we do not need to wait for the after state change work either. */
2170 			self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2171 			if (self != SS_SUCCESS) {
2172 				drbd_khelper(mdev, "pri-lost-after-sb");
2173 			} else {
2174 				dev_warn(DEV, "Successfully gave up primary role.\n");
2175 				rv = hg;
2176 			}
2177 		} else
2178 			rv = hg;
2179 	}
2180 
2181 	return rv;
2182 }
2183 
2184 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2185 {
2186 	int self, peer, hg, rv = -100;
2187 
2188 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2189 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2190 
2191 	switch (mdev->net_conf->after_sb_2p) {
2192 	case ASB_DISCARD_YOUNGER_PRI:
2193 	case ASB_DISCARD_OLDER_PRI:
2194 	case ASB_DISCARD_LEAST_CHG:
2195 	case ASB_DISCARD_LOCAL:
2196 	case ASB_DISCARD_REMOTE:
2197 	case ASB_CONSENSUS:
2198 	case ASB_DISCARD_SECONDARY:
2199 		dev_err(DEV, "Configuration error.\n");
2200 		break;
2201 	case ASB_VIOLENTLY:
2202 		rv = drbd_asb_recover_0p(mdev);
2203 		break;
2204 	case ASB_DISCONNECT:
2205 		break;
2206 	case ASB_CALL_HELPER:
2207 		hg = drbd_asb_recover_0p(mdev);
2208 		if (hg == -1) {
2209 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2210 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2211 			  * we do not need to wait for the after state change work either. */
2212 			self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2213 			if (self != SS_SUCCESS) {
2214 				drbd_khelper(mdev, "pri-lost-after-sb");
2215 			} else {
2216 				dev_warn(DEV, "Successfully gave up primary role.\n");
2217 				rv = hg;
2218 			}
2219 		} else
2220 			rv = hg;
2221 	}
2222 
2223 	return rv;
2224 }
2225 
2226 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2227 			   u64 bits, u64 flags)
2228 {
2229 	if (!uuid) {
2230 		dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2231 		return;
2232 	}
2233 	dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2234 	     text,
2235 	     (unsigned long long)uuid[UI_CURRENT],
2236 	     (unsigned long long)uuid[UI_BITMAP],
2237 	     (unsigned long long)uuid[UI_HISTORY_START],
2238 	     (unsigned long long)uuid[UI_HISTORY_END],
2239 	     (unsigned long long)bits,
2240 	     (unsigned long long)flags);
2241 }
2242 
2243 /*
2244   100	after split brain try auto recover
2245     2	C_SYNC_SOURCE set BitMap
2246     1	C_SYNC_SOURCE use BitMap
2247     0	no Sync
2248    -1	C_SYNC_TARGET use BitMap
2249    -2	C_SYNC_TARGET set BitMap
2250  -100	after split brain, disconnect
2251 -1000	unrelated data
2252  */
2253 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2254 {
2255 	u64 self, peer;
2256 	int i, j;
2257 
2258 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2259 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2260 
2261 	*rule_nr = 10;
2262 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2263 		return 0;
2264 
2265 	*rule_nr = 20;
2266 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2267 	     peer != UUID_JUST_CREATED)
2268 		return -2;
2269 
2270 	*rule_nr = 30;
2271 	if (self != UUID_JUST_CREATED &&
2272 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
2273 		return 2;
2274 
2275 	if (self == peer) {
2276 		int rct, dc; /* roles at crash time */
2277 
2278 		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2279 
2280 			if (mdev->agreed_pro_version < 91)
2281 				return -1001;
2282 
2283 			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2284 			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2285 				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2286 				drbd_uuid_set_bm(mdev, 0UL);
2287 
2288 				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2289 					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2290 				*rule_nr = 34;
2291 			} else {
2292 				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2293 				*rule_nr = 36;
2294 			}
2295 
2296 			return 1;
2297 		}
2298 
2299 		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2300 
2301 			if (mdev->agreed_pro_version < 91)
2302 				return -1001;
2303 
2304 			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2305 			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2306 				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2307 
2308 				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2309 				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2310 				mdev->p_uuid[UI_BITMAP] = 0UL;
2311 
2312 				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2313 				*rule_nr = 35;
2314 			} else {
2315 				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2316 				*rule_nr = 37;
2317 			}
2318 
2319 			return -1;
2320 		}
2321 
2322 		/* Common power [off|failure] */
2323 		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2324 			(mdev->p_uuid[UI_FLAGS] & 2);
2325 		/* lowest bit is set when we were primary,
2326 		 * next bit (weight 2) is set when peer was primary */
2327 		*rule_nr = 40;
2328 
2329 		switch (rct) {
2330 		case 0: /* !self_pri && !peer_pri */ return 0;
2331 		case 1: /*  self_pri && !peer_pri */ return 1;
2332 		case 2: /* !self_pri &&  peer_pri */ return -1;
2333 		case 3: /*  self_pri &&  peer_pri */
2334 			dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2335 			return dc ? -1 : 1;
2336 		}
2337 	}
2338 
2339 	*rule_nr = 50;
2340 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2341 	if (self == peer)
2342 		return -1;
2343 
2344 	*rule_nr = 51;
2345 	peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2346 	if (self == peer) {
2347 		self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2348 		peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2349 		if (self == peer) {
			/* The last P_SYNC_UUID did not get through. Undo the last
			   start-of-resync (as sync source) modifications of the peer's UUIDs. */
2352 
2353 			if (mdev->agreed_pro_version < 91)
2354 				return -1001;
2355 
2356 			mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2357 			mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2358 			return -1;
2359 		}
2360 	}
2361 
2362 	*rule_nr = 60;
2363 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2364 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2365 		peer = mdev->p_uuid[i] & ~((u64)1);
2366 		if (self == peer)
2367 			return -2;
2368 	}
2369 
2370 	*rule_nr = 70;
2371 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2372 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2373 	if (self == peer)
2374 		return 1;
2375 
2376 	*rule_nr = 71;
2377 	self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2378 	if (self == peer) {
2379 		self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2380 		peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2381 		if (self == peer) {
			/* The last P_SYNC_UUID did not get through. Undo the last
			   start-of-resync (as sync source) modifications of our UUIDs. */
2384 
2385 			if (mdev->agreed_pro_version < 91)
2386 				return -1001;
2387 
2388 			_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2389 			_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2390 
2391 			dev_info(DEV, "Undid last start of resync:\n");
2392 
2393 			drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2394 				       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2395 
2396 			return 1;
2397 		}
2398 	}
2399 
2400 
2401 	*rule_nr = 80;
2402 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2403 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2404 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2405 		if (self == peer)
2406 			return 2;
2407 	}
2408 
2409 	*rule_nr = 90;
2410 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2411 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2412 	if (self == peer && self != ((u64)0))
2413 		return 100;
2414 
2415 	*rule_nr = 100;
2416 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2417 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2418 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2419 			peer = mdev->p_uuid[j] & ~((u64)1);
2420 			if (self == peer)
2421 				return -100;
2422 		}
2423 	}
2424 
2425 	return -1000;
2426 }
2427 
/* drbd_sync_handshake() returns the new conn state on success, or
   C_MASK on failure.
 */
2431 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2432 					   enum drbd_disk_state peer_disk) __must_hold(local)
2433 {
2434 	int hg, rule_nr;
2435 	enum drbd_conns rv = C_MASK;
2436 	enum drbd_disk_state mydisk;
2437 
2438 	mydisk = mdev->state.disk;
2439 	if (mydisk == D_NEGOTIATING)
2440 		mydisk = mdev->new_state_tmp.disk;
2441 
2442 	dev_info(DEV, "drbd_sync_handshake:\n");
2443 	drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2444 	drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2445 		       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2446 
2447 	hg = drbd_uuid_compare(mdev, &rule_nr);
2448 
2449 	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2450 
2451 	if (hg == -1000) {
2452 		dev_alert(DEV, "Unrelated data, aborting!\n");
2453 		return C_MASK;
2454 	}
2455 	if (hg == -1001) {
2456 		dev_alert(DEV, "To resolve this both sides have to support at least protocol\n");
2457 		return C_MASK;
2458 	}
2459 
2460 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2461 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2462 		int f = (hg == -100) || abs(hg) == 2;
2463 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
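		/* f remembers whether the uuid comparison already asked for a
		 * full sync (|hg| == 2) or detected split brain (-100); in that
		 * case the disk-state based decision is escalated below from
		 * "use bitmap" (+-1) to "set whole bitmap, full sync" (+-2),
		 * cf. the return value table of drbd_uuid_compare() above. */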
2464 		if (f)
2465 			hg = hg*2;
2466 		dev_info(DEV, "Becoming sync %s due to disk states.\n",
2467 		     hg > 0 ? "source" : "target");
2468 	}
2469 
2470 	if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2471 		int pcount = (mdev->state.role == R_PRIMARY)
2472 			   + (peer_role == R_PRIMARY);
2473 		int forced = (hg == -100);
2474 
2475 		switch (pcount) {
2476 		case 0:
2477 			hg = drbd_asb_recover_0p(mdev);
2478 			break;
2479 		case 1:
2480 			hg = drbd_asb_recover_1p(mdev);
2481 			break;
2482 		case 2:
2483 			hg = drbd_asb_recover_2p(mdev);
2484 			break;
2485 		}
2486 		if (abs(hg) < 100) {
2487 			dev_warn(DEV, "Split-Brain detected, %d primaries, "
2488 			     "automatically solved. Sync from %s node\n",
2489 			     pcount, (hg < 0) ? "peer" : "this");
2490 			if (forced) {
2491 				dev_warn(DEV, "Doing a full sync, since"
2492 				     " UUIDs where ambiguous.\n");
2493 				hg = hg*2;
2494 			}
2495 		}
2496 	}
2497 
2498 	if (hg == -100) {
2499 		if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2500 			hg = -1;
2501 		if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2502 			hg = 1;
2503 
2504 		if (abs(hg) < 100)
2505 			dev_warn(DEV, "Split-Brain detected, manually solved. "
2506 			     "Sync from %s node\n",
2507 			     (hg < 0) ? "peer" : "this");
2508 	}
2509 
2510 	if (hg == -100) {
2511 		dev_alert(DEV, "Split-Brain detected, dropping connection!\n");
2512 		drbd_khelper(mdev, "split-brain");
2513 		return C_MASK;
2514 	}
2515 
2516 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
2517 		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2518 		return C_MASK;
2519 	}
2520 
2521 	if (hg < 0 && /* by intention we do not use mydisk here. */
2522 	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2523 		switch (mdev->net_conf->rr_conflict) {
2524 		case ASB_CALL_HELPER:
2525 			drbd_khelper(mdev, "pri-lost");
2526 			/* fall through */
2527 		case ASB_DISCONNECT:
2528 			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2529 			return C_MASK;
2530 		case ASB_VIOLENTLY:
2531 			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2532 			     "assumption\n");
2533 		}
2534 	}
2535 
2536 	if (abs(hg) >= 2) {
2537 		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2538 		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2539 			return C_MASK;
2540 	}
2541 
2542 	if (hg > 0) { /* become sync source. */
2543 		rv = C_WF_BITMAP_S;
2544 	} else if (hg < 0) { /* become sync target */
2545 		rv = C_WF_BITMAP_T;
2546 	} else {
2547 		rv = C_CONNECTED;
2548 		if (drbd_bm_total_weight(mdev)) {
2549 			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2550 			     drbd_bm_total_weight(mdev));
2551 		}
2552 	}
2553 
2554 	return rv;
2555 }
2556 
2557 /* returns 1 if invalid */
2558 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2559 {
2560 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2561 	if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2562 	    (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2563 		return 0;
2564 
2565 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2566 	if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2567 	    self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2568 		return 1;
2569 
2570 	/* everything else is valid if they are equal on both sides. */
2571 	if (peer == self)
2572 		return 0;
2573 
	/* everything else is invalid. */
2575 	return 1;
2576 }
2577 
2578 static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2579 {
2580 	struct p_protocol *p = (struct p_protocol *)h;
2581 	int header_size, data_size;
2582 	int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2583 	int p_want_lose, p_two_primaries;
2584 	char p_integrity_alg[SHARED_SECRET_MAX] = "";
2585 
2586 	header_size = sizeof(*p) - sizeof(*h);
2587 	data_size   = h->length  - header_size;
2588 
2589 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
2590 		return FALSE;
2591 
2592 	p_proto		= be32_to_cpu(p->protocol);
2593 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
2594 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
2595 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
2596 	p_want_lose	= be32_to_cpu(p->want_lose);
2597 	p_two_primaries = be32_to_cpu(p->two_primaries);
2598 
2599 	if (p_proto != mdev->net_conf->wire_protocol) {
2600 		dev_err(DEV, "incompatible communication protocols\n");
2601 		goto disconnect;
2602 	}
2603 
2604 	if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2605 		dev_err(DEV, "incompatible after-sb-0pri settings\n");
2606 		goto disconnect;
2607 	}
2608 
2609 	if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2610 		dev_err(DEV, "incompatible after-sb-1pri settings\n");
2611 		goto disconnect;
2612 	}
2613 
2614 	if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2615 		dev_err(DEV, "incompatible after-sb-2pri settings\n");
2616 		goto disconnect;
2617 	}
2618 
2619 	if (p_want_lose && mdev->net_conf->want_lose) {
2620 		dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2621 		goto disconnect;
2622 	}
2623 
2624 	if (p_two_primaries != mdev->net_conf->two_primaries) {
2625 		dev_err(DEV, "incompatible setting of the two-primaries options\n");
2626 		goto disconnect;
2627 	}
2628 
2629 	if (mdev->agreed_pro_version >= 87) {
2630 		unsigned char *my_alg = mdev->net_conf->integrity_alg;
2631 
2632 		if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2633 			return FALSE;
2634 
2635 		p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2636 		if (strcmp(p_integrity_alg, my_alg)) {
2637 			dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2638 			goto disconnect;
2639 		}
2640 		dev_info(DEV, "data-integrity-alg: %s\n",
2641 		     my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2642 	}
2643 
2644 	return TRUE;
2645 
2646 disconnect:
2647 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2648 	return FALSE;
2649 }
2650 
2651 /* helper function
2652  * input: alg name, feature name
2653  * return: NULL (alg name was "")
2654  *         ERR_PTR(error) if something goes wrong
2655  *         or the crypto hash ptr, if it worked out ok. */
2656 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2657 		const char *alg, const char *name)
2658 {
2659 	struct crypto_hash *tfm;
2660 
2661 	if (!alg[0])
2662 		return NULL;
2663 
2664 	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2665 	if (IS_ERR(tfm)) {
2666 		dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2667 			alg, name, PTR_ERR(tfm));
2668 		return tfm;
2669 	}
2670 	if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2671 		crypto_free_hash(tfm);
2672 		dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2673 		return ERR_PTR(-EINVAL);
2674 	}
2675 	return tfm;
2676 }
2677 
2678 static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2679 {
2680 	int ok = TRUE;
2681 	struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2682 	unsigned int header_size, data_size, exp_max_sz;
2683 	struct crypto_hash *verify_tfm = NULL;
2684 	struct crypto_hash *csums_tfm = NULL;
2685 	const int apv = mdev->agreed_pro_version;
2686 
2687 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2688 		    : apv == 88 ? sizeof(struct p_rs_param)
2689 					+ SHARED_SECRET_MAX
2690 		    : /* 89 */    sizeof(struct p_rs_param_89);
2691 
2692 	if (h->length > exp_max_sz) {
2693 		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2694 		    h->length, exp_max_sz);
2695 		return FALSE;
2696 	}
2697 
2698 	if (apv <= 88) {
2699 		header_size = sizeof(struct p_rs_param) - sizeof(*h);
2700 		data_size   = h->length  - header_size;
2701 	} else /* apv >= 89 */ {
2702 		header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2703 		data_size   = h->length  - header_size;
2704 		D_ASSERT(data_size == 0);
2705 	}
2706 
2707 	/* initialize verify_alg and csums_alg */
2708 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2709 
2710 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
2711 		return FALSE;
2712 
2713 	mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
2714 
2715 	if (apv >= 88) {
2716 		if (apv == 88) {
2717 			if (data_size > SHARED_SECRET_MAX) {
2718 				dev_err(DEV, "verify-alg too long, "
2719 				    "peer wants %u, accepting only %u byte\n",
2720 						data_size, SHARED_SECRET_MAX);
2721 				return FALSE;
2722 			}
2723 
2724 			if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2725 				return FALSE;
2726 
2727 			/* we expect NUL terminated string */
2728 			/* but just in case someone tries to be evil */
2729 			D_ASSERT(p->verify_alg[data_size-1] == 0);
2730 			p->verify_alg[data_size-1] = 0;
2731 
2732 		} else /* apv >= 89 */ {
2733 			/* we still expect NUL terminated strings */
2734 			/* but just in case someone tries to be evil */
2735 			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2736 			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2737 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2738 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2739 		}
2740 
2741 		if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2742 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2743 				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2744 				    mdev->sync_conf.verify_alg, p->verify_alg);
2745 				goto disconnect;
2746 			}
2747 			verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2748 					p->verify_alg, "verify-alg");
2749 			if (IS_ERR(verify_tfm)) {
2750 				verify_tfm = NULL;
2751 				goto disconnect;
2752 			}
2753 		}
2754 
2755 		if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2756 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2757 				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2758 				    mdev->sync_conf.csums_alg, p->csums_alg);
2759 				goto disconnect;
2760 			}
2761 			csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2762 					p->csums_alg, "csums-alg");
2763 			if (IS_ERR(csums_tfm)) {
2764 				csums_tfm = NULL;
2765 				goto disconnect;
2766 			}
2767 		}
2768 
2769 
2770 		spin_lock(&mdev->peer_seq_lock);
2771 		/* lock against drbd_nl_syncer_conf() */
2772 		if (verify_tfm) {
2773 			strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2774 			mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2775 			crypto_free_hash(mdev->verify_tfm);
2776 			mdev->verify_tfm = verify_tfm;
2777 			dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2778 		}
2779 		if (csums_tfm) {
2780 			strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2781 			mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2782 			crypto_free_hash(mdev->csums_tfm);
2783 			mdev->csums_tfm = csums_tfm;
2784 			dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2785 		}
2786 		spin_unlock(&mdev->peer_seq_lock);
2787 	}
2788 
2789 	return ok;
2790 disconnect:
2791 	/* just for completeness: actually not needed,
2792 	 * as this is not reached if csums_tfm was ok. */
2793 	crypto_free_hash(csums_tfm);
2794 	/* but free the verify_tfm again, if csums_tfm did not work out */
2795 	crypto_free_hash(verify_tfm);
2796 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2797 	return FALSE;
2798 }
2799 
2800 static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2801 {
2802 	/* sorry, we currently have no working implementation
2803 	 * of distributed TCQ */
2804 }
2805 
2806 /* warn if the arguments differ by more than 12.5% */
2807 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2808 	const char *s, sector_t a, sector_t b)
2809 {
2810 	sector_t d;
2811 	if (a == 0 || b == 0)
2812 		return;
2813 	d = (a > b) ? (a - b) : (b - a);
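	/* d > a>>3 means the difference exceeds 12.5%; e.g. a = 1000 and
	 * b = 1200 give d = 200 > 1000>>3 = 125, so we warn. */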
2814 	if (d > (a>>3) || d > (b>>3))
2815 		dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2816 		     (unsigned long long)a, (unsigned long long)b);
2817 }
2818 
2819 static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2820 {
2821 	struct p_sizes *p = (struct p_sizes *)h;
2822 	enum determine_dev_size dd = unchanged;
2823 	unsigned int max_seg_s;
2824 	sector_t p_size, p_usize, my_usize;
2825 	int ldsc = 0; /* local disk size changed */
2826 	enum drbd_conns nconn;
2827 
2828 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2829 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
2830 		return FALSE;
2831 
2832 	p_size = be64_to_cpu(p->d_size);
2833 	p_usize = be64_to_cpu(p->u_size);
2834 
2835 	if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2836 		dev_err(DEV, "some backing storage is needed\n");
2837 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2838 		return FALSE;
2839 	}
2840 
2841 	/* just store the peer's disk size for now.
2842 	 * we still need to figure out whether we accept that. */
2843 	mdev->p_size = p_size;
2844 
#define min_not_zero(l, r) ((l) == 0 ? (r) : ((r) == 0 ? (l) : min((l), (r))))
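	/* a user requested size (dc.disk_size) of 0 apparently means "not
	 * limited"; min_not_zero() makes sure such a 0 does not win when we
	 * take the minimum of our and the peer's value below. */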
2846 	if (get_ldev(mdev)) {
2847 		warn_if_differ_considerably(mdev, "lower level device sizes",
2848 			   p_size, drbd_get_max_capacity(mdev->ldev));
2849 		warn_if_differ_considerably(mdev, "user requested size",
2850 					    p_usize, mdev->ldev->dc.disk_size);
2851 
2852 		/* if this is the first connect, or an otherwise expected
2853 		 * param exchange, choose the minimum */
2854 		if (mdev->state.conn == C_WF_REPORT_PARAMS)
2855 			p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2856 					     p_usize);
2857 
2858 		my_usize = mdev->ldev->dc.disk_size;
2859 
2860 		if (mdev->ldev->dc.disk_size != p_usize) {
2861 			mdev->ldev->dc.disk_size = p_usize;
2862 			dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2863 			     (unsigned long)mdev->ldev->dc.disk_size);
2864 		}
2865 
2866 		/* Never shrink a device with usable data during connect.
2867 		   But allow online shrinking if we are connected. */
2868 		if (drbd_new_dev_size(mdev, mdev->ldev) <
2869 		   drbd_get_capacity(mdev->this_bdev) &&
2870 		   mdev->state.disk >= D_OUTDATED &&
2871 		   mdev->state.conn < C_CONNECTED) {
2872 			dev_err(DEV, "The peer's disk size is too small!\n");
2873 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2874 			mdev->ldev->dc.disk_size = my_usize;
2875 			put_ldev(mdev);
2876 			return FALSE;
2877 		}
2878 		put_ldev(mdev);
2879 	}
2880 #undef min_not_zero
2881 
2882 	if (get_ldev(mdev)) {
2883 		dd = drbd_determin_dev_size(mdev);
2884 		put_ldev(mdev);
2885 		if (dd == dev_size_error)
2886 			return FALSE;
2887 		drbd_md_sync(mdev);
2888 	} else {
2889 		/* I am diskless, need to accept the peer's size. */
2890 		drbd_set_my_capacity(mdev, p_size);
2891 	}
2892 
2893 	if (mdev->p_uuid && mdev->state.conn <= C_CONNECTED && get_ldev(mdev)) {
2894 		nconn = drbd_sync_handshake(mdev,
2895 				mdev->state.peer, mdev->state.pdsk);
2896 		put_ldev(mdev);
2897 
2898 		if (nconn == C_MASK) {
2899 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2900 			return FALSE;
2901 		}
2902 
2903 		if (drbd_request_state(mdev, NS(conn, nconn)) < SS_SUCCESS) {
2904 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2905 			return FALSE;
2906 		}
2907 	}
2908 
2909 	if (get_ldev(mdev)) {
2910 		if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
2911 			mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
2912 			ldsc = 1;
2913 		}
2914 
2915 		max_seg_s = be32_to_cpu(p->max_segment_size);
2916 		if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
2917 			drbd_setup_queue_param(mdev, max_seg_s);
2918 
2919 		drbd_setup_order_type(mdev, be32_to_cpu(p->queue_order_type));
2920 		put_ldev(mdev);
2921 	}
2922 
2923 	if (mdev->state.conn > C_WF_REPORT_PARAMS) {
2924 		if (be64_to_cpu(p->c_size) !=
2925 		    drbd_get_capacity(mdev->this_bdev) || ldsc) {
2926 			/* we have different sizes, probably peer
2927 			 * needs to know my new size... */
2928 			drbd_send_sizes(mdev, 0);
2929 		}
2930 		if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
2931 		    (dd == grew && mdev->state.conn == C_CONNECTED)) {
2932 			if (mdev->state.pdsk >= D_INCONSISTENT &&
2933 			    mdev->state.disk >= D_INCONSISTENT)
2934 				resync_after_online_grow(mdev);
2935 			else
2936 				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
2937 		}
2938 	}
2939 
2940 	return TRUE;
2941 }
2942 
2943 static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
2944 {
2945 	struct p_uuids *p = (struct p_uuids *)h;
2946 	u64 *p_uuid;
2947 	int i;
2948 
2949 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2950 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
2951 		return FALSE;
2952 
	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
	if (!p_uuid)
		return FALSE;
2954 
2955 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
2956 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
2957 
2958 	kfree(mdev->p_uuid);
2959 	mdev->p_uuid = p_uuid;
2960 
2961 	if (mdev->state.conn < C_CONNECTED &&
2962 	    mdev->state.disk < D_INCONSISTENT &&
2963 	    mdev->state.role == R_PRIMARY &&
2964 	    (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
2965 		dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
2966 		    (unsigned long long)mdev->ed_uuid);
2967 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2968 		return FALSE;
2969 	}
2970 
2971 	if (get_ldev(mdev)) {
2972 		int skip_initial_sync =
2973 			mdev->state.conn == C_CONNECTED &&
2974 			mdev->agreed_pro_version >= 90 &&
2975 			mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
2976 			(p_uuid[UI_FLAGS] & 8);
2977 		if (skip_initial_sync) {
2978 			dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
2979 			drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
2980 					"clear_n_write from receive_uuids");
2981 			_drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
2982 			_drbd_uuid_set(mdev, UI_BITMAP, 0);
2983 			_drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
2984 					CS_VERBOSE, NULL);
2985 			drbd_md_sync(mdev);
2986 		}
2987 		put_ldev(mdev);
2988 	}
2989 
	/* Before we test for the disk state, we should wait until any possibly
	   ongoing cluster-wide state change has finished. That is important if
	   we are primary and are detaching from our disk: we need to see the
	   new disk state... */
2994 	wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
2995 	if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
2996 		drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
2997 
2998 	return TRUE;
2999 }
3000 
3001 /**
3002  * convert_state() - Converts the peer's view of the cluster state to our point of view
3003  * @ps:		The state as seen by the peer.
3004  */
3005 static union drbd_state convert_state(union drbd_state ps)
3006 {
3007 	union drbd_state ms;
3008 
3009 	static enum drbd_conns c_tab[] = {
3010 		[C_CONNECTED] = C_CONNECTED,
3011 
3012 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3013 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3014 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3015 		[C_VERIFY_S]       = C_VERIFY_T,
3016 		[C_MASK]   = C_MASK,
3017 	};
3018 
3019 	ms.i = ps.i;
3020 
3021 	ms.conn = c_tab[ps.conn];
3022 	ms.peer = ps.role;
3023 	ms.role = ps.peer;
3024 	ms.pdsk = ps.disk;
3025 	ms.disk = ps.pdsk;
3026 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
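	/* Example: a peer that reports role=Primary, peer=Secondary,
	 * disk=UpToDate, pdsk=Inconsistent is seen by us as
	 * role=Secondary, peer=Primary, disk=Inconsistent, pdsk=UpToDate. */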
3027 
3028 	return ms;
3029 }
3030 
3031 static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3032 {
3033 	struct p_req_state *p = (struct p_req_state *)h;
3034 	union drbd_state mask, val;
3035 	int rv;
3036 
3037 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3038 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3039 		return FALSE;
3040 
3041 	mask.i = be32_to_cpu(p->mask);
3042 	val.i = be32_to_cpu(p->val);
3043 
3044 	if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3045 	    test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3046 		drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3047 		return TRUE;
3048 	}
3049 
3050 	mask = convert_state(mask);
3051 	val = convert_state(val);
3052 
3053 	rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3054 
3055 	drbd_send_sr_reply(mdev, rv);
3056 	drbd_md_sync(mdev);
3057 
3058 	return TRUE;
3059 }
3060 
3061 static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3062 {
3063 	struct p_state *p = (struct p_state *)h;
3064 	enum drbd_conns nconn, oconn;
3065 	union drbd_state ns, peer_state;
3066 	enum drbd_disk_state real_peer_disk;
3067 	int rv;
3068 
3069 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3070 		return FALSE;
3071 
3072 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3073 		return FALSE;
3074 
3075 	peer_state.i = be32_to_cpu(p->state);
3076 
3077 	real_peer_disk = peer_state.disk;
3078 	if (peer_state.disk == D_NEGOTIATING) {
3079 		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3080 		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3081 	}
3082 
3083 	spin_lock_irq(&mdev->req_lock);
3084  retry:
3085 	oconn = nconn = mdev->state.conn;
3086 	spin_unlock_irq(&mdev->req_lock);
3087 
3088 	if (nconn == C_WF_REPORT_PARAMS)
3089 		nconn = C_CONNECTED;
3090 
3091 	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3092 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
3093 		int cr; /* consider resync */
3094 
3095 		/* if we established a new connection */
3096 		cr  = (oconn < C_CONNECTED);
3097 		/* if we had an established connection
3098 		 * and one of the nodes newly attaches a disk */
3099 		cr |= (oconn == C_CONNECTED &&
3100 		       (peer_state.disk == D_NEGOTIATING ||
3101 			mdev->state.disk == D_NEGOTIATING));
3102 		/* if we have both been inconsistent, and the peer has been
3103 		 * forced to be UpToDate with --overwrite-data */
3104 		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3105 		/* if we had been plain connected, and the admin requested to
3106 		 * start a sync by "invalidate" or "invalidate-remote" */
3107 		cr |= (oconn == C_CONNECTED &&
3108 				(peer_state.conn >= C_STARTING_SYNC_S &&
3109 				 peer_state.conn <= C_WF_BITMAP_T));
3110 
3111 		if (cr)
3112 			nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3113 
3114 		put_ldev(mdev);
3115 		if (nconn == C_MASK) {
3116 			if (mdev->state.disk == D_NEGOTIATING) {
3117 				drbd_force_state(mdev, NS(disk, D_DISKLESS));
3118 				nconn = C_CONNECTED;
3119 			} else if (peer_state.disk == D_NEGOTIATING) {
3120 				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3121 				peer_state.disk = D_DISKLESS;
3122 			} else {
3123 				D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3124 				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3125 				return FALSE;
3126 			}
3127 		}
3128 	}
3129 
3130 	spin_lock_irq(&mdev->req_lock);
3131 	if (mdev->state.conn != oconn)
3132 		goto retry;
3133 	clear_bit(CONSIDER_RESYNC, &mdev->flags);
3134 	ns.i = mdev->state.i;
3135 	ns.conn = nconn;
3136 	ns.peer = peer_state.role;
3137 	ns.pdsk = real_peer_disk;
3138 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3139 	if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3140 		ns.disk = mdev->new_state_tmp.disk;
3141 
3142 	rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
3143 	ns = mdev->state;
3144 	spin_unlock_irq(&mdev->req_lock);
3145 
3146 	if (rv < SS_SUCCESS) {
3147 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3148 		return FALSE;
3149 	}
3150 
3151 	if (oconn > C_WF_REPORT_PARAMS) {
3152 		if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
		    peer_state.disk != D_NEGOTIATING) {
3154 			/* we want resync, peer has not yet decided to sync... */
3155 			/* Nowadays only used when forcing a node into primary role and
3156 			   setting its disk to UpToDate with that */
3157 			drbd_send_uuids(mdev);
3158 			drbd_send_state(mdev);
3159 		}
3160 	}
3161 
3162 	mdev->net_conf->want_lose = 0;
3163 
3164 	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3165 
3166 	return TRUE;
3167 }
3168 
3169 static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3170 {
3171 	struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3172 
3173 	wait_event(mdev->misc_wait,
3174 		   mdev->state.conn == C_WF_SYNC_UUID ||
3175 		   mdev->state.conn < C_CONNECTED ||
3176 		   mdev->state.disk < D_NEGOTIATING);
3177 
3178 	/* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3179 
3180 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3181 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3182 		return FALSE;
3183 
3184 	/* Here the _drbd_uuid_ functions are right, current should
3185 	   _not_ be rotated into the history */
3186 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3187 		_drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3188 		_drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3189 
3190 		drbd_start_resync(mdev, C_SYNC_TARGET);
3191 
3192 		put_ldev(mdev);
3193 	} else
3194 		dev_err(DEV, "Ignoring SyncUUID packet!\n");
3195 
3196 	return TRUE;
3197 }
3198 
3199 enum receive_bitmap_ret { OK, DONE, FAILED };
3200 
3201 static enum receive_bitmap_ret
3202 receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3203 	unsigned long *buffer, struct bm_xfer_ctx *c)
3204 {
3205 	unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3206 	unsigned want = num_words * sizeof(long);
3207 
3208 	if (want != h->length) {
3209 		dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3210 		return FAILED;
3211 	}
3212 	if (want == 0)
3213 		return DONE;
3214 	if (drbd_recv(mdev, buffer, want) != want)
3215 		return FAILED;
3216 
3217 	drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3218 
3219 	c->word_offset += num_words;
3220 	c->bit_offset = c->word_offset * BITS_PER_LONG;
3221 	if (c->bit_offset > c->bm_bits)
3222 		c->bit_offset = c->bm_bits;
3223 
3224 	return OK;
3225 }
3226 
3227 static enum receive_bitmap_ret
3228 recv_bm_rle_bits(struct drbd_conf *mdev,
3229 		struct p_compressed_bm *p,
3230 		struct bm_xfer_ctx *c)
3231 {
3232 	struct bitstream bs;
3233 	u64 look_ahead;
3234 	u64 rl;
3235 	u64 tmp;
3236 	unsigned long s = c->bit_offset;
3237 	unsigned long e;
3238 	int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3239 	int toggle = DCBP_get_start(p);
3240 	int have;
3241 	int bits;
3242 
3243 	bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3244 
3245 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
3246 	if (bits < 0)
3247 		return FAILED;
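	/* The stream is a sequence of vli encoded run lengths, alternating
	 * between runs of clear and runs of set bits; DCBP_get_start() told us
	 * whether the first run is a "set" run.  E.g. with start=0 and decoded
	 * runs 5, 3, 7, the bits 5..7 get set and everything else within the
	 * first 15 bits stays clear (only an illustration of the decoding
	 * loop below). */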
3248 
3249 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
3250 		bits = vli_decode_bits(&rl, look_ahead);
3251 		if (bits <= 0)
3252 			return FAILED;
3253 
3254 		if (toggle) {
3255 			e = s + rl -1;
3256 			if (e >= c->bm_bits) {
3257 				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3258 				return FAILED;
3259 			}
3260 			_drbd_bm_set_bits(mdev, s, e);
3261 		}
3262 
3263 		if (have < bits) {
3264 			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3265 				have, bits, look_ahead,
3266 				(unsigned int)(bs.cur.b - p->code),
3267 				(unsigned int)bs.buf_len);
3268 			return FAILED;
3269 		}
3270 		look_ahead >>= bits;
3271 		have -= bits;
3272 
3273 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3274 		if (bits < 0)
3275 			return FAILED;
3276 		look_ahead |= tmp << have;
3277 		have += bits;
3278 	}
3279 
3280 	c->bit_offset = s;
3281 	bm_xfer_ctx_bit_to_word_offset(c);
3282 
3283 	return (s == c->bm_bits) ? DONE : OK;
3284 }
3285 
3286 static enum receive_bitmap_ret
3287 decode_bitmap_c(struct drbd_conf *mdev,
3288 		struct p_compressed_bm *p,
3289 		struct bm_xfer_ctx *c)
3290 {
3291 	if (DCBP_get_code(p) == RLE_VLI_Bits)
3292 		return recv_bm_rle_bits(mdev, p, c);
3293 
3294 	/* other variants had been implemented for evaluation,
3295 	 * but have been dropped as this one turned out to be "best"
3296 	 * during all our tests. */
3297 
3298 	dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3299 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3300 	return FAILED;
3301 }
3302 
3303 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3304 		const char *direction, struct bm_xfer_ctx *c)
3305 {
3306 	/* what would it take to transfer it "plaintext" */
3307 	unsigned plain = sizeof(struct p_header) *
3308 		((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3309 		+ c->bm_words * sizeof(long);
3310 	unsigned total = c->bytes[0] + c->bytes[1];
3311 	unsigned r;
3312 
3313 	/* total can not be zero. but just in case: */
3314 	if (total == 0)
3315 		return;
3316 
3317 	/* don't report if not compressed */
3318 	if (total >= plain)
3319 		return;
3320 
3321 	/* total < plain. check for overflow, still */
3322 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3323 		                    : (1000 * total / plain);
3324 
3325 	if (r > 1000)
3326 		r = 1000;
3327 
3328 	r = 1000 - r;
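	/* r is now the saving in per mille; e.g. plain = 4096 and total = 512
	 * give 1000*512/4096 = 125, reported as 87.5% compression below. */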
3329 	dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3330 	     "total %u; compression: %u.%u%%\n",
3331 			direction,
3332 			c->bytes[1], c->packets[1],
3333 			c->bytes[0], c->packets[0],
3334 			total, r/10, r % 10);
3335 }
3336 
/* Since we are processing the bitfield from lower addresses to higher,
   it does not matter whether we process it in 32 bit or 64 bit chunks,
   as long as it is little endian. (Understand it as a byte stream,
   beginning with the lowest byte...) If we used big endian
   we would need to process it from the highest address to the lowest,
   in order to be agnostic to the 32 vs 64 bit issue.

   returns 0 on failure, 1 if we successfully received it. */
3345 static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3346 {
3347 	struct bm_xfer_ctx c;
3348 	void *buffer;
3349 	enum receive_bitmap_ret ret;
3350 	int ok = FALSE;
3351 
3352 	wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3353 
3354 	drbd_bm_lock(mdev, "receive bitmap");
3355 
3356 	/* maybe we should use some per thread scratch page,
3357 	 * and allocate that during initial device creation? */
3358 	buffer	 = (unsigned long *) __get_free_page(GFP_NOIO);
3359 	if (!buffer) {
3360 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3361 		goto out;
3362 	}
3363 
3364 	c = (struct bm_xfer_ctx) {
3365 		.bm_bits = drbd_bm_bits(mdev),
3366 		.bm_words = drbd_bm_words(mdev),
3367 	};
3368 
3369 	do {
3370 		if (h->command == P_BITMAP) {
3371 			ret = receive_bitmap_plain(mdev, h, buffer, &c);
3372 		} else if (h->command == P_COMPRESSED_BITMAP) {
3373 			/* MAYBE: sanity check that we speak proto >= 90,
3374 			 * and the feature is enabled! */
3375 			struct p_compressed_bm *p;
3376 
3377 			if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3378 				dev_err(DEV, "ReportCBitmap packet too large\n");
3379 				goto out;
3380 			}
3381 			/* use the page buff */
3382 			p = buffer;
3383 			memcpy(p, h, sizeof(*h));
3384 			if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3385 				goto out;
			if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
				dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
				/* returning FAILED here would be mistaken for
				 * success by the caller and would leak the page */
				goto out;
			}
3390 			ret = decode_bitmap_c(mdev, p, &c);
3391 		} else {
3392 			dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)", h->command);
3393 			goto out;
3394 		}
3395 
3396 		c.packets[h->command == P_BITMAP]++;
3397 		c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
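		/* index 1 accumulates plain P_BITMAP packets, index 0 the
		 * compressed ones; INFO_bm_xfer_stats() above reports them as
		 * "plain" and "RLE" respectively. */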
3398 
3399 		if (ret != OK)
3400 			break;
3401 
3402 		if (!drbd_recv_header(mdev, h))
3403 			goto out;
3404 	} while (ret == OK);
3405 	if (ret == FAILED)
3406 		goto out;
3407 
3408 	INFO_bm_xfer_stats(mdev, "receive", &c);
3409 
3410 	if (mdev->state.conn == C_WF_BITMAP_T) {
3411 		ok = !drbd_send_bitmap(mdev);
3412 		if (!ok)
3413 			goto out;
3414 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3415 		ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3416 		D_ASSERT(ok == SS_SUCCESS);
3417 	} else if (mdev->state.conn != C_WF_BITMAP_S) {
3418 		/* admin may have requested C_DISCONNECTING,
3419 		 * other threads may have noticed network errors */
3420 		dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3421 		    drbd_conn_str(mdev->state.conn));
3422 	}
3423 
3424 	ok = TRUE;
3425  out:
3426 	drbd_bm_unlock(mdev);
3427 	if (ok && mdev->state.conn == C_WF_BITMAP_S)
3428 		drbd_start_resync(mdev, C_SYNC_SOURCE);
3429 	free_page((unsigned long) buffer);
3430 	return ok;
3431 }
3432 
3433 static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3434 {
3435 	/* TODO zero copy sink :) */
3436 	static char sink[128];
3437 	int size, want, r;
3438 
3439 	dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3440 	     h->command, h->length);
3441 
3442 	size = h->length;
3443 	while (size > 0) {
3444 		want = min_t(int, size, sizeof(sink));
3445 		r = drbd_recv(mdev, sink, want);
3446 		ERR_IF(r <= 0) break;
3447 		size -= r;
3448 	}
3449 	return size == 0;
3450 }
3451 
3452 static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3453 {
3454 	if (mdev->state.disk >= D_INCONSISTENT)
3455 		drbd_kick_lo(mdev);
3456 
3457 	/* Make sure we've acked all the TCP data associated
3458 	 * with the data requests being unplugged */
3459 	drbd_tcp_quickack(mdev->data.socket);
3460 
3461 	return TRUE;
3462 }
3463 
3464 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3465 
3466 static drbd_cmd_handler_f drbd_default_handler[] = {
3467 	[P_DATA]	    = receive_Data,
3468 	[P_DATA_REPLY]	    = receive_DataReply,
3469 	[P_RS_DATA_REPLY]   = receive_RSDataReply,
3470 	[P_BARRIER]	    = receive_Barrier,
3471 	[P_BITMAP]	    = receive_bitmap,
3472 	[P_COMPRESSED_BITMAP]    = receive_bitmap,
3473 	[P_UNPLUG_REMOTE]   = receive_UnplugRemote,
3474 	[P_DATA_REQUEST]    = receive_DataRequest,
3475 	[P_RS_DATA_REQUEST] = receive_DataRequest,
3476 	[P_SYNC_PARAM]	    = receive_SyncParam,
3477 	[P_SYNC_PARAM89]	   = receive_SyncParam,
3478 	[P_PROTOCOL]        = receive_protocol,
3479 	[P_UUIDS]	    = receive_uuids,
3480 	[P_SIZES]	    = receive_sizes,
3481 	[P_STATE]	    = receive_state,
3482 	[P_STATE_CHG_REQ]   = receive_req_state,
3483 	[P_SYNC_UUID]       = receive_sync_uuid,
3484 	[P_OV_REQUEST]      = receive_DataRequest,
3485 	[P_OV_REPLY]        = receive_DataRequest,
3486 	[P_CSUM_RS_REQUEST]    = receive_DataRequest,
3487 	/* anything missing from this table is in
3488 	 * the asender_tbl, see get_asender_cmd */
3489 	[P_MAX_CMD]	    = NULL,
3490 };
3491 
3492 static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3493 static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3494 
3495 static void drbdd(struct drbd_conf *mdev)
3496 {
3497 	drbd_cmd_handler_f handler;
3498 	struct p_header *header = &mdev->data.rbuf.header;
3499 
3500 	while (get_t_state(&mdev->receiver) == Running) {
3501 		drbd_thread_current_set_cpu(mdev);
3502 		if (!drbd_recv_header(mdev, header)) {
3503 			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3504 			break;
3505 		}
3506 
3507 		if (header->command < P_MAX_CMD)
3508 			handler = drbd_cmd_handler[header->command];
		else if (P_MAY_IGNORE < header->command
		     && header->command < P_MAX_OPT_CMD)
			/* the optional command table may not be registered */
			handler = drbd_opt_cmd_handler
				? drbd_opt_cmd_handler[header->command - P_MAY_IGNORE]
				: NULL;
3512 		else if (header->command > P_MAX_OPT_CMD)
3513 			handler = receive_skip;
3514 		else
3515 			handler = NULL;
3516 
3517 		if (unlikely(!handler)) {
3518 			dev_err(DEV, "unknown packet type %d, l: %d!\n",
3519 			    header->command, header->length);
3520 			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3521 			break;
3522 		}
3523 		if (unlikely(!handler(mdev, header))) {
3524 			dev_err(DEV, "error receiving %s, l: %d!\n",
3525 			    cmdname(header->command), header->length);
3526 			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3527 			break;
3528 		}
3529 	}
3530 }
3531 
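/* Fail all application READ requests that are still waiting for an answer
 * from the peer; called from drbd_disconnect() once the connection is gone. */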
3532 static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3533 {
3534 	struct hlist_head *slot;
3535 	struct hlist_node *pos;
3536 	struct hlist_node *tmp;
3537 	struct drbd_request *req;
3538 	int i;
3539 
3540 	/*
3541 	 * Application READ requests
3542 	 */
3543 	spin_lock_irq(&mdev->req_lock);
3544 	for (i = 0; i < APP_R_HSIZE; i++) {
3545 		slot = mdev->app_reads_hash+i;
3546 		hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3547 			/* it may (but should not any longer!)
3548 			 * be on the work queue; if that assert triggers,
3549 			 * we need to also grab the
3550 			 * spin_lock_irq(&mdev->data.work.q_lock);
3551 			 * and list_del_init here. */
3552 			D_ASSERT(list_empty(&req->w.list));
3553 			/* It would be nice to complete outside of spinlock.
3554 			 * But this is easier for now. */
3555 			_req_mod(req, connection_lost_while_pending);
3556 		}
3557 	}
3558 	for (i = 0; i < APP_R_HSIZE; i++)
3559 		if (!hlist_empty(mdev->app_reads_hash+i))
3560 			dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3561 				"%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3562 
3563 	memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3564 	spin_unlock_irq(&mdev->req_lock);
3565 }
3566 
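/* Queue a barrier work item and wait for its completion, which guarantees
 * that every work item queued before this call has been processed. */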
3567 void drbd_flush_workqueue(struct drbd_conf *mdev)
3568 {
3569 	struct drbd_wq_barrier barr;
3570 
3571 	barr.w.cb = w_prev_work_done;
3572 	init_completion(&barr.done);
3573 	drbd_queue_work(&mdev->data.work, &barr.w);
3574 	wait_for_completion(&barr.done);
3575 }
3576 
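/* Tear down an established connection: stop the asender, close the sockets,
 * wait for pending epoch entries, cancel resync activity, flush the worker
 * queue, possibly fence the peer, and finally move towards C_UNCONNECTED
 * (or C_STANDALONE, if the admin requested a disconnect). */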
3577 static void drbd_disconnect(struct drbd_conf *mdev)
3578 {
3579 	enum drbd_fencing_p fp;
3580 	union drbd_state os, ns;
3581 	int rv = SS_UNKNOWN_ERROR;
3582 	unsigned int i;
3583 
3584 	if (mdev->state.conn == C_STANDALONE)
3585 		return;
3586 	if (mdev->state.conn >= C_WF_CONNECTION)
3587 		dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3588 				drbd_conn_str(mdev->state.conn));
3589 
3590 	/* asender does not clean up anything. it must not interfere, either */
3591 	drbd_thread_stop(&mdev->asender);
3592 
3593 	mutex_lock(&mdev->data.mutex);
3594 	drbd_free_sock(mdev);
3595 	mutex_unlock(&mdev->data.mutex);
3596 
3597 	spin_lock_irq(&mdev->req_lock);
3598 	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3599 	_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3600 	_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3601 	spin_unlock_irq(&mdev->req_lock);
3602 
3603 	/* We do not have data structures that would allow us to
3604 	 * get the rs_pending_cnt down to 0 again.
3605 	 *  * On C_SYNC_TARGET we do not have any data structures describing
3606 	 *    the pending RSDataRequest's we have sent.
3607 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
3608 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3609 	 *  And no, it is not the sum of the reference counts in the
3610 	 *  resync_LRU. The resync_LRU tracks the whole operation including
3611 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3612 	 *  on the fly. */
3613 	drbd_rs_cancel_all(mdev);
3614 	mdev->rs_total = 0;
3615 	mdev->rs_failed = 0;
3616 	atomic_set(&mdev->rs_pending_cnt, 0);
3617 	wake_up(&mdev->misc_wait);
3618 
3619 	/* make sure syncer is stopped and w_resume_next_sg queued */
3620 	del_timer_sync(&mdev->resync_timer);
3621 	set_bit(STOP_SYNC_TIMER, &mdev->flags);
3622 	resync_timer_fn((unsigned long)mdev);
3623 
3624 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3625 	 * w_make_resync_request etc. which may still be on the worker queue
3626 	 * to be "canceled" */
3627 	drbd_flush_workqueue(mdev);
3628 
3629 	/* This also does reclaim_net_ee().  If we do this too early, we might
3630 	 * miss some resync ee and pages.*/
3631 	drbd_process_done_ee(mdev);
3632 
3633 	kfree(mdev->p_uuid);
3634 	mdev->p_uuid = NULL;
3635 
3636 	if (!mdev->state.susp)
3637 		tl_clear(mdev);
3638 
3639 	drbd_fail_pending_reads(mdev);
3640 
3641 	dev_info(DEV, "Connection closed\n");
3642 
3643 	drbd_md_sync(mdev);
3644 
3645 	fp = FP_DONT_CARE;
3646 	if (get_ldev(mdev)) {
3647 		fp = mdev->ldev->dc.fencing;
3648 		put_ldev(mdev);
3649 	}
3650 
3651 	if (mdev->state.role == R_PRIMARY) {
3652 		if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
3653 			enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
3654 			drbd_request_state(mdev, NS(pdsk, nps));
3655 		}
3656 	}
3657 
3658 	spin_lock_irq(&mdev->req_lock);
3659 	os = mdev->state;
3660 	if (os.conn >= C_UNCONNECTED) {
3661 		/* Do not restart in case we are C_DISCONNECTING */
3662 		ns = os;
3663 		ns.conn = C_UNCONNECTED;
3664 		rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3665 	}
3666 	spin_unlock_irq(&mdev->req_lock);
3667 
3668 	if (os.conn == C_DISCONNECTING) {
3669 		struct hlist_head *h;
3670 		wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
3671 
3672 		/* we must not free the tl_hash
3673 		 * while application io is still on the fly */
3674 		wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
3675 
3676 		spin_lock_irq(&mdev->req_lock);
3677 		/* paranoia code */
3678 		for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3679 			if (h->first)
3680 				dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3681 						(int)(h - mdev->ee_hash), h->first);
3682 		kfree(mdev->ee_hash);
3683 		mdev->ee_hash = NULL;
3684 		mdev->ee_hash_s = 0;
3685 
3686 		/* paranoia code */
3687 		for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3688 			if (h->first)
3689 				dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3690 						(int)(h - mdev->tl_hash), h->first);
3691 		kfree(mdev->tl_hash);
3692 		mdev->tl_hash = NULL;
3693 		mdev->tl_hash_s = 0;
3694 		spin_unlock_irq(&mdev->req_lock);
3695 
3696 		crypto_free_hash(mdev->cram_hmac_tfm);
3697 		mdev->cram_hmac_tfm = NULL;
3698 
3699 		kfree(mdev->net_conf);
3700 		mdev->net_conf = NULL;
3701 		drbd_request_state(mdev, NS(conn, C_STANDALONE));
3702 	}
3703 
3704 	/* tcp_close and release of sendpage pages can be deferred.  I don't
3705 	 * want to use SO_LINGER, because apparently it can be deferred for
3706 	 * more than 20 seconds (longest time I checked).
3707 	 *
	 * We don't actually care exactly when the network stack does its
	 * put_page(); we simply release our own reference on these pages here.
	 */
3711 	i = drbd_release_ee(mdev, &mdev->net_ee);
3712 	if (i)
3713 		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3714 	i = atomic_read(&mdev->pp_in_use);
3715 	if (i)
3716 		dev_info(DEV, "pp_in_use = %u, expected 0\n", i);
3717 
3718 	D_ASSERT(list_empty(&mdev->read_ee));
3719 	D_ASSERT(list_empty(&mdev->active_ee));
3720 	D_ASSERT(list_empty(&mdev->sync_ee));
3721 	D_ASSERT(list_empty(&mdev->done_ee));
3722 
3723 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3724 	atomic_set(&mdev->current_epoch->epoch_size, 0);
3725 	D_ASSERT(list_empty(&mdev->current_epoch->list));
3726 }
3727 
3728 /*
3729  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3730  * we can agree on is stored in agreed_pro_version.
3731  *
 * The feature flags and the reserved array should leave enough room for
 * future enhancements of the handshake protocol, and for possible plugins...
 *
 * For now they are expected to be zero, but their contents are ignored.
3736  */
3737 static int drbd_send_handshake(struct drbd_conf *mdev)
3738 {
3739 	/* ASSERT current == mdev->receiver ... */
3740 	struct p_handshake *p = &mdev->data.sbuf.handshake;
3741 	int ok;
3742 
3743 	if (mutex_lock_interruptible(&mdev->data.mutex)) {
3744 		dev_err(DEV, "interrupted during initial handshake\n");
3745 		return 0; /* interrupted. not ok. */
3746 	}
3747 
3748 	if (mdev->data.socket == NULL) {
3749 		mutex_unlock(&mdev->data.mutex);
3750 		return 0;
3751 	}
3752 
3753 	memset(p, 0, sizeof(*p));
3754 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3755 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_HAND_SHAKE,
			    (struct p_header *)p, sizeof(*p), 0);
3758 	mutex_unlock(&mdev->data.mutex);
3759 	return ok;
3760 }
3761 
3762 /*
3763  * return values:
3764  *   1 yes, we have a valid connection
3765  *   0 oops, did not work out, please try again
3766  *  -1 peer talks different language,
3767  *     no point in trying again, please go standalone.
3768  */
3769 static int drbd_do_handshake(struct drbd_conf *mdev)
3770 {
3771 	/* ASSERT current == mdev->receiver ... */
3772 	struct p_handshake *p = &mdev->data.rbuf.handshake;
3773 	const int expect = sizeof(struct p_handshake)
3774 			  -sizeof(struct p_header);
3775 	int rv;
3776 
3777 	rv = drbd_send_handshake(mdev);
3778 	if (!rv)
3779 		return 0;
3780 
3781 	rv = drbd_recv_header(mdev, &p->head);
3782 	if (!rv)
3783 		return 0;
3784 
3785 	if (p->head.command != P_HAND_SHAKE) {
3786 		dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3787 		     cmdname(p->head.command), p->head.command);
3788 		return -1;
3789 	}
3790 
3791 	if (p->head.length != expect) {
3792 		dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3793 		     expect, p->head.length);
3794 		return -1;
3795 	}
3796 
3797 	rv = drbd_recv(mdev, &p->head.payload, expect);
3798 
3799 	if (rv != expect) {
3800 		dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
3801 		return 0;
3802 	}
3803 
3804 	p->protocol_min = be32_to_cpu(p->protocol_min);
3805 	p->protocol_max = be32_to_cpu(p->protocol_max);
3806 	if (p->protocol_max == 0)
3807 		p->protocol_max = p->protocol_min;
3808 
3809 	if (PRO_VERSION_MAX < p->protocol_min ||
3810 	    PRO_VERSION_MIN > p->protocol_max)
3811 		goto incompat;
3812 
3813 	mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3814 
3815 	dev_info(DEV, "Handshake successful: "
3816 	     "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3817 
3818 	return 1;
3819 
3820  incompat:
3821 	dev_err(DEV, "incompatible DRBD dialects: "
3822 	    "I support %d-%d, peer supports %d-%d\n",
3823 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
3824 	    p->protocol_min, p->protocol_max);
3825 	return -1;
3826 }
3827 
3828 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3829 static int drbd_do_auth(struct drbd_conf *mdev)
3830 {
	dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
3832 	dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
3833 	return 0;
3834 }
3835 #else
3836 #define CHALLENGE_LEN 64
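/* Challenge-response authentication using the configured cram-hmac-alg:
 * send our random challenge, answer the peer's challenge with an HMAC
 * keyed by the shared secret, and verify the peer's answer against the
 * HMAC of our own challenge.
 * Returns 1 if the peer authenticated successfully, 0 otherwise. */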
3837 static int drbd_do_auth(struct drbd_conf *mdev)
3838 {
3839 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
3840 	struct scatterlist sg;
3841 	char *response = NULL;
3842 	char *right_response = NULL;
3843 	char *peers_ch = NULL;
3844 	struct p_header p;
3845 	unsigned int key_len = strlen(mdev->net_conf->shared_secret);
3846 	unsigned int resp_size;
3847 	struct hash_desc desc;
3848 	int rv;
3849 
3850 	desc.tfm = mdev->cram_hmac_tfm;
3851 	desc.flags = 0;
3852 
3853 	rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
3854 				(u8 *)mdev->net_conf->shared_secret, key_len);
3855 	if (rv) {
3856 		dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
3857 		rv = 0;
3858 		goto fail;
3859 	}
3860 
3861 	get_random_bytes(my_challenge, CHALLENGE_LEN);
3862 
3863 	rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
3864 	if (!rv)
3865 		goto fail;
3866 
3867 	rv = drbd_recv_header(mdev, &p);
3868 	if (!rv)
3869 		goto fail;
3870 
3871 	if (p.command != P_AUTH_CHALLENGE) {
3872 		dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
3873 		    cmdname(p.command), p.command);
3874 		rv = 0;
3875 		goto fail;
3876 	}
3877 
3878 	if (p.length > CHALLENGE_LEN*2) {
		dev_err(DEV, "AuthChallenge payload too big.\n");
3880 		rv = 0;
3881 		goto fail;
3882 	}
3883 
3884 	peers_ch = kmalloc(p.length, GFP_NOIO);
3885 	if (peers_ch == NULL) {
3886 		dev_err(DEV, "kmalloc of peers_ch failed\n");
3887 		rv = 0;
3888 		goto fail;
3889 	}
3890 
3891 	rv = drbd_recv(mdev, peers_ch, p.length);
3892 
3893 	if (rv != p.length) {
3894 		dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
3895 		rv = 0;
3896 		goto fail;
3897 	}
3898 
3899 	resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
3900 	response = kmalloc(resp_size, GFP_NOIO);
3901 	if (response == NULL) {
3902 		dev_err(DEV, "kmalloc of response failed\n");
3903 		rv = 0;
3904 		goto fail;
3905 	}
3906 
3907 	sg_init_table(&sg, 1);
3908 	sg_set_buf(&sg, peers_ch, p.length);
3909 
3910 	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
3911 	if (rv) {
3912 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
3913 		rv = 0;
3914 		goto fail;
3915 	}
3916 
3917 	rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
3918 	if (!rv)
3919 		goto fail;
3920 
3921 	rv = drbd_recv_header(mdev, &p);
3922 	if (!rv)
3923 		goto fail;
3924 
3925 	if (p.command != P_AUTH_RESPONSE) {
3926 		dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
3927 		    cmdname(p.command), p.command);
3928 		rv = 0;
3929 		goto fail;
3930 	}
3931 
3932 	if (p.length != resp_size) {
		dev_err(DEV, "AuthResponse payload has the wrong size\n");
3934 		rv = 0;
3935 		goto fail;
3936 	}
3937 
	rv = drbd_recv(mdev, response, resp_size);
3939 
3940 	if (rv != resp_size) {
3941 		dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
3942 		rv = 0;
3943 		goto fail;
3944 	}
3945 
3946 	right_response = kmalloc(resp_size, GFP_NOIO);
	if (right_response == NULL) {
3948 		dev_err(DEV, "kmalloc of right_response failed\n");
3949 		rv = 0;
3950 		goto fail;
3951 	}
3952 
3953 	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
3954 
3955 	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
3956 	if (rv) {
3957 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
3958 		rv = 0;
3959 		goto fail;
3960 	}
3961 
3962 	rv = !memcmp(response, right_response, resp_size);
3963 
3964 	if (rv)
3965 		dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
3966 		     resp_size, mdev->net_conf->cram_hmac_alg);
3967 
3968  fail:
3969 	kfree(peers_ch);
3970 	kfree(response);
3971 	kfree(right_response);
3972 
3973 	return rv;
3974 }
3975 #endif
3976 
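/* Main function of the receiver thread: (re)try to establish a connection,
 * run the packet loop in drbdd() while it lasts, and clean up afterwards. */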
3977 int drbdd_init(struct drbd_thread *thi)
3978 {
3979 	struct drbd_conf *mdev = thi->mdev;
3980 	unsigned int minor = mdev_to_minor(mdev);
3981 	int h;
3982 
3983 	sprintf(current->comm, "drbd%d_receiver", minor);
3984 
3985 	dev_info(DEV, "receiver (re)started\n");
3986 
3987 	do {
3988 		h = drbd_connect(mdev);
3989 		if (h == 0) {
3990 			drbd_disconnect(mdev);
3991 			__set_current_state(TASK_INTERRUPTIBLE);
3992 			schedule_timeout(HZ);
3993 		}
3994 		if (h == -1) {
3995 			dev_warn(DEV, "Discarding network configuration.\n");
3996 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3997 		}
3998 	} while (h == 0);
3999 
4000 	if (h > 0) {
4001 		if (get_net_conf(mdev)) {
4002 			drbdd(mdev);
4003 			put_net_conf(mdev);
4004 		}
4005 	}
4006 
4007 	drbd_disconnect(mdev);
4008 
4009 	dev_info(DEV, "receiver terminated\n");
4010 	return 0;
4011 }
4012 
4013 /* ********* acknowledge sender ******** */
4014 
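/* Reply to a state change request we sent to the peer: record success or
 * failure in the device flags and wake up whoever is waiting on state_wait. */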
4015 static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4016 {
4017 	struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4018 
4019 	int retcode = be32_to_cpu(p->retcode);
4020 
4021 	if (retcode >= SS_SUCCESS) {
4022 		set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4023 	} else {
4024 		set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4025 		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4026 		    drbd_set_st_err_str(retcode), retcode);
4027 	}
4028 	wake_up(&mdev->state_wait);
4029 
4030 	return TRUE;
4031 }
4032 
4033 static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4034 {
	return drbd_send_ping_ack(mdev);
}
4038 
4039 static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4040 {
4041 	/* restore idle timeout */
4042 	mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4043 
4044 	return TRUE;
4045 }
4046 
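/* Checksum based resync (protocol 89 and up): the peer found the block to
 * be identical, so mark it in sync without transferring any data. */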
4047 static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4048 {
4049 	struct p_block_ack *p = (struct p_block_ack *)h;
4050 	sector_t sector = be64_to_cpu(p->sector);
4051 	int blksize = be32_to_cpu(p->blksize);
4052 
4053 	D_ASSERT(mdev->agreed_pro_version >= 89);
4054 
4055 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4056 
4057 	drbd_rs_complete_io(mdev, sector);
4058 	drbd_set_in_sync(mdev, sector, blksize);
4059 	/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4060 	mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4061 	dec_rs_pending(mdev);
4062 
4063 	return TRUE;
4064 }
4065 
4066 /* when we receive the ACK for a write request,
4067  * verify that we actually know about it */
4068 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4069 	u64 id, sector_t sector)
4070 {
4071 	struct hlist_head *slot = tl_hash_slot(mdev, sector);
4072 	struct hlist_node *n;
4073 	struct drbd_request *req;
4074 
4075 	hlist_for_each_entry(req, n, slot, colision) {
4076 		if ((unsigned long)req == (unsigned long)id) {
4077 			if (req->sector != sector) {
4078 				dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4079 				    "wrong sector (%llus versus %llus)\n", req,
4080 				    (unsigned long long)req->sector,
4081 				    (unsigned long long)sector);
4082 				break;
4083 			}
4084 			return req;
4085 		}
4086 	}
4087 	dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4088 		(void *)(unsigned long)id, (unsigned long long)sector);
4089 	return NULL;
4090 }
4091 
4092 typedef struct drbd_request *(req_validator_fn)
4093 	(struct drbd_conf *mdev, u64 id, sector_t sector);
4094 
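/* Look up the request referenced by (id, sector) under req_lock, feed the
 * event 'what' into its state machine, and complete the master bio outside
 * the lock if that finished the request. */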
4095 static int validate_req_change_req_state(struct drbd_conf *mdev,
4096 	u64 id, sector_t sector, req_validator_fn validator,
4097 	const char *func, enum drbd_req_event what)
4098 {
4099 	struct drbd_request *req;
4100 	struct bio_and_error m;
4101 
4102 	spin_lock_irq(&mdev->req_lock);
4103 	req = validator(mdev, id, sector);
4104 	if (unlikely(!req)) {
4105 		spin_unlock_irq(&mdev->req_lock);
4106 		dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4107 		return FALSE;
4108 	}
4109 	__req_mod(req, what, &m);
4110 	spin_unlock_irq(&mdev->req_lock);
4111 
4112 	if (m.bio)
4113 		complete_master_bio(mdev, &m);
4114 	return TRUE;
4115 }
4116 
4117 static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4118 {
4119 	struct p_block_ack *p = (struct p_block_ack *)h;
4120 	sector_t sector = be64_to_cpu(p->sector);
4121 	int blksize = be32_to_cpu(p->blksize);
4122 	enum drbd_req_event what;
4123 
4124 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4125 
4126 	if (is_syncer_block_id(p->block_id)) {
4127 		drbd_set_in_sync(mdev, sector, blksize);
4128 		dec_rs_pending(mdev);
4129 		return TRUE;
4130 	}
4131 	switch (be16_to_cpu(h->command)) {
4132 	case P_RS_WRITE_ACK:
4133 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4134 		what = write_acked_by_peer_and_sis;
4135 		break;
4136 	case P_WRITE_ACK:
4137 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4138 		what = write_acked_by_peer;
4139 		break;
4140 	case P_RECV_ACK:
4141 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4142 		what = recv_acked_by_peer;
4143 		break;
4144 	case P_DISCARD_ACK:
4145 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4146 		what = conflict_discarded_by_peer;
4147 		break;
4148 	default:
4149 		D_ASSERT(0);
4150 		return FALSE;
4151 	}
4152 
4153 	return validate_req_change_req_state(mdev, p->block_id, sector,
4154 		_ack_id_to_req, __func__ , what);
4155 }
4156 
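/* The peer failed to write the block we sent: for resync traffic account
 * it as failed resync I/O, for application writes feed neg_acked into the
 * request state machine. */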
4157 static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4158 {
4159 	struct p_block_ack *p = (struct p_block_ack *)h;
4160 	sector_t sector = be64_to_cpu(p->sector);
4161 
4162 	if (__ratelimit(&drbd_ratelimit_state))
		dev_warn(DEV, "Got NegAck packet. Peer is in trouble?\n");
4164 
4165 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4166 
4167 	if (is_syncer_block_id(p->block_id)) {
4168 		int size = be32_to_cpu(p->blksize);
4169 		dec_rs_pending(mdev);
4170 		drbd_rs_failed_io(mdev, sector, size);
4171 		return TRUE;
4172 	}
4173 	return validate_req_change_req_state(mdev, p->block_id, sector,
4174 		_ack_id_to_req, __func__ , neg_acked);
4175 }
4176 
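/* The peer could not serve our data request: fail the original
 * application read. */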
4177 static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4178 {
4179 	struct p_block_ack *p = (struct p_block_ack *)h;
4180 	sector_t sector = be64_to_cpu(p->sector);
4181 
4182 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4183 	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4184 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
4185 
4186 	return validate_req_change_req_state(mdev, p->block_id, sector,
4187 		_ar_id_to_req, __func__ , neg_acked);
4188 }
4189 
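/* The peer could not provide the requested resync block: account the area
 * as failed resync I/O. */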
4190 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4191 {
4192 	sector_t sector;
4193 	int size;
4194 	struct p_block_ack *p = (struct p_block_ack *)h;
4195 
4196 	sector = be64_to_cpu(p->sector);
4197 	size = be32_to_cpu(p->blksize);
4198 	D_ASSERT(p->block_id == ID_SYNCER);
4199 
4200 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4201 
4202 	dec_rs_pending(mdev);
4203 
4204 	if (get_ldev_if_state(mdev, D_FAILED)) {
4205 		drbd_rs_complete_io(mdev, sector);
4206 		drbd_rs_failed_io(mdev, sector, size);
4207 		put_ldev(mdev);
4208 	}
4209 
4210 	return TRUE;
4211 }
4212 
4213 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4214 {
4215 	struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4216 
4217 	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4218 
4219 	return TRUE;
4220 }
4221 
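/* Result of an online verify request: record blocks reported out of sync,
 * and once the last block has been checked, queue w_ov_finished (or finish
 * the resync directly if the allocation fails). */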
4222 static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4223 {
4224 	struct p_block_ack *p = (struct p_block_ack *)h;
4225 	struct drbd_work *w;
4226 	sector_t sector;
4227 	int size;
4228 
4229 	sector = be64_to_cpu(p->sector);
4230 	size = be32_to_cpu(p->blksize);
4231 
4232 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4233 
4234 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4235 		drbd_ov_oos_found(mdev, sector, size);
4236 	else
4237 		ov_oos_print(mdev);
4238 
4239 	drbd_rs_complete_io(mdev, sector);
4240 	dec_rs_pending(mdev);
4241 
4242 	if (--mdev->ov_left == 0) {
4243 		w = kmalloc(sizeof(*w), GFP_NOIO);
4244 		if (w) {
4245 			w->cb = w_ov_finished;
4246 			drbd_queue_work_front(&mdev->data.work, w);
4247 		} else {
			dev_err(DEV, "kmalloc(w) failed.\n");
4249 			ov_oos_print(mdev);
4250 			drbd_resync_finished(mdev);
4251 		}
4252 	}
4253 	return TRUE;
4254 }
4255 
4256 struct asender_cmd {
4257 	size_t pkt_size;
4258 	int (*process)(struct drbd_conf *mdev, struct p_header *h);
4259 };
4260 
4261 static struct asender_cmd *get_asender_cmd(int cmd)
4262 {
4263 	static struct asender_cmd asender_tbl[] = {
4264 		/* anything missing from this table is in
4265 		 * the drbd_cmd_handler (drbd_default_handler) table,
4266 		 * see the beginning of drbdd() */
4267 	[P_PING]	    = { sizeof(struct p_header), got_Ping },
4268 	[P_PING_ACK]	    = { sizeof(struct p_header), got_PingAck },
4269 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4270 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4271 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4272 	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4273 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
4274 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
4275 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4276 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
4277 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
4278 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4279 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4280 	[P_MAX_CMD]	    = { 0, NULL },
4281 	};
4282 	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4283 		return NULL;
4284 	return &asender_tbl[cmd];
4285 }
4286 
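/* Main function of the asender thread: send pings and the ACKs for
 * completed epoch entries (done_ee), and receive the small packets on the
 * meta data socket, dispatching them via get_asender_cmd(). */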
4287 int drbd_asender(struct drbd_thread *thi)
4288 {
4289 	struct drbd_conf *mdev = thi->mdev;
4290 	struct p_header *h = &mdev->meta.rbuf.header;
4291 	struct asender_cmd *cmd = NULL;
4292 
4293 	int rv, len;
4294 	void *buf    = h;
4295 	int received = 0;
4296 	int expect   = sizeof(struct p_header);
	int empty;
	struct sched_param param = { .sched_priority = 2 };

	sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));

	/* Make this a realtime task, more important than all other tasks.
	 * Writing current->policy directly does not inform the scheduler;
	 * go through sched_setscheduler() instead. */
	if (sched_setscheduler(current, SCHED_RR, &param) < 0)
		dev_warn(DEV, "unable to set SCHED_RR priority\n");
4303 
4304 	while (get_t_state(thi) == Running) {
4305 		drbd_thread_current_set_cpu(mdev);
4306 		if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4307 			ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4308 			mdev->meta.socket->sk->sk_rcvtimeo =
4309 				mdev->net_conf->ping_timeo*HZ/10;
4310 		}
4311 
4312 		/* conditionally cork;
4313 		 * it may hurt latency if we cork without much to send */
4314 		if (!mdev->net_conf->no_cork &&
4315 			3 < atomic_read(&mdev->unacked_cnt))
4316 			drbd_tcp_cork(mdev->meta.socket);
4317 		while (1) {
4318 			clear_bit(SIGNAL_ASENDER, &mdev->flags);
4319 			flush_signals(current);
4320 			if (!drbd_process_done_ee(mdev)) {
4321 				dev_err(DEV, "process_done_ee() = NOT_OK\n");
4322 				goto reconnect;
4323 			}
4324 			/* to avoid race with newly queued ACKs */
4325 			set_bit(SIGNAL_ASENDER, &mdev->flags);
4326 			spin_lock_irq(&mdev->req_lock);
4327 			empty = list_empty(&mdev->done_ee);
4328 			spin_unlock_irq(&mdev->req_lock);
4329 			/* new ack may have been queued right here,
4330 			 * but then there is also a signal pending,
4331 			 * and we start over... */
4332 			if (empty)
4333 				break;
4334 		}
4335 		/* but unconditionally uncork unless disabled */
4336 		if (!mdev->net_conf->no_cork)
4337 			drbd_tcp_uncork(mdev->meta.socket);
4338 
4339 		/* short circuit, recv_msg would return EINTR anyways. */
4340 		if (signal_pending(current))
4341 			continue;
4342 
4343 		rv = drbd_recv_short(mdev, mdev->meta.socket,
4344 				     buf, expect-received, 0);
4345 		clear_bit(SIGNAL_ASENDER, &mdev->flags);
4346 
4347 		flush_signals(current);
4348 
4349 		/* Note:
4350 		 * -EINTR	 (on meta) we got a signal
4351 		 * -EAGAIN	 (on meta) rcvtimeo expired
4352 		 * -ECONNRESET	 other side closed the connection
4353 		 * -ERESTARTSYS  (on data) we got a signal
4354 		 * rv <  0	 other than above: unexpected error!
4355 		 * rv == expected: full header or command
4356 		 * rv <  expected: "woken" by signal during receive
4357 		 * rv == 0	 : "connection shut down by peer"
4358 		 */
4359 		if (likely(rv > 0)) {
4360 			received += rv;
4361 			buf	 += rv;
4362 		} else if (rv == 0) {
4363 			dev_err(DEV, "meta connection shut down by peer.\n");
4364 			goto reconnect;
4365 		} else if (rv == -EAGAIN) {
4366 			if (mdev->meta.socket->sk->sk_rcvtimeo ==
4367 			    mdev->net_conf->ping_timeo*HZ/10) {
4368 				dev_err(DEV, "PingAck did not arrive in time.\n");
4369 				goto reconnect;
4370 			}
4371 			set_bit(SEND_PING, &mdev->flags);
4372 			continue;
4373 		} else if (rv == -EINTR) {
4374 			continue;
4375 		} else {
4376 			dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4377 			goto reconnect;
4378 		}
4379 
4380 		if (received == expect && cmd == NULL) {
4381 			if (unlikely(h->magic != BE_DRBD_MAGIC)) {
				dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
				    (long)be32_to_cpu(h->magic),
				    be16_to_cpu(h->command), be16_to_cpu(h->length));
4385 				goto reconnect;
4386 			}
4387 			cmd = get_asender_cmd(be16_to_cpu(h->command));
4388 			len = be16_to_cpu(h->length);
4389 			if (unlikely(cmd == NULL)) {
				dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
				    (long)be32_to_cpu(h->magic),
				    be16_to_cpu(h->command), be16_to_cpu(h->length));
4393 				goto disconnect;
4394 			}
4395 			expect = cmd->pkt_size;
4396 			ERR_IF(len != expect-sizeof(struct p_header))
4397 				goto reconnect;
4398 		}
4399 		if (received == expect) {
4400 			D_ASSERT(cmd != NULL);
4401 			if (!cmd->process(mdev, h))
4402 				goto reconnect;
4403 
4404 			buf	 = h;
4405 			received = 0;
4406 			expect	 = sizeof(struct p_header);
4407 			cmd	 = NULL;
4408 		}
4409 	}
4410 
4411 	if (0) {
4412 reconnect:
4413 		drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4414 	}
4415 	if (0) {
4416 disconnect:
4417 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4418 	}
4419 	clear_bit(SIGNAL_ASENDER, &mdev->flags);
4420 
4421 	D_ASSERT(mdev->state.conn < C_CONNECTED);
4422 	dev_info(DEV, "asender terminated\n");
4423 
4424 	return 0;
4425 }
4426