1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_protocol.h"
48 #include "drbd_req.h"
49 #include "drbd_vli.h"
50 
51 #define PRO_FEATURES (FF_TRIM)
52 
53 struct packet_info {
54 	enum drbd_packet cmd;
55 	unsigned int size;
56 	unsigned int vnr;
57 	void *data;
58 };
59 
60 enum finish_epoch {
61 	FE_STILL_LIVE,
62 	FE_DESTROYED,
63 	FE_RECYCLED,
64 };
65 
66 static int drbd_do_features(struct drbd_connection *connection);
67 static int drbd_do_auth(struct drbd_connection *connection);
68 static int drbd_disconnected(struct drbd_peer_device *);
69 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
70 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
71 static int e_end_block(struct drbd_work *, int);
72 
73 
74 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
75 
/*
 * some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */
80 
81 /* If at least n pages are linked at head, get n pages off.
82  * Otherwise, don't modify head, and return NULL.
83  * Locking is the responsibility of the caller.
84  */
85 static struct page *page_chain_del(struct page **head, int n)
86 {
87 	struct page *page;
88 	struct page *tmp;
89 
90 	BUG_ON(!n);
91 	BUG_ON(!head);
92 
93 	page = *head;
94 
95 	if (!page)
96 		return NULL;
97 
98 	while (page) {
99 		tmp = page_chain_next(page);
100 		if (--n == 0)
101 			break; /* found sufficient pages */
102 		if (tmp == NULL)
103 			/* insufficient pages, don't use any of them. */
104 			return NULL;
105 		page = tmp;
106 	}
107 
108 	/* add end of list marker for the returned list */
109 	set_page_private(page, 0);
110 	/* actual return value, and adjustment of head */
111 	page = *head;
112 	*head = tmp;
113 	return page;
114 }
115 
116 /* may be used outside of locks to find the tail of a (usually short)
117  * "private" page chain, before adding it back to a global chain head
118  * with page_chain_add() under a spinlock. */
119 static struct page *page_chain_tail(struct page *page, int *len)
120 {
121 	struct page *tmp;
122 	int i = 1;
123 	while ((tmp = page_chain_next(page)))
124 		++i, page = tmp;
125 	if (len)
126 		*len = i;
127 	return page;
128 }
129 
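/* Put every page of the chain back to the allocator.
 * Returns the number of pages freed. */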
130 static int page_chain_free(struct page *page)
131 {
132 	struct page *tmp;
133 	int i = 0;
134 	page_chain_for_each_safe(page, tmp) {
135 		put_page(page);
136 		++i;
137 	}
138 	return i;
139 }
140 
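/* Prepend the chain chain_first..chain_last to *head.
 * The caller must hold the lock that protects *head
 * (drbd_pp_lock for the global pool). */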
141 static void page_chain_add(struct page **head,
142 		struct page *chain_first, struct page *chain_last)
143 {
144 #if 1
145 	struct page *tmp;
146 	tmp = page_chain_tail(chain_first, NULL);
147 	BUG_ON(tmp != chain_last);
148 #endif
149 
150 	/* add chain to head */
151 	set_page_private(chain_last, (unsigned long)*head);
152 	*head = chain_first;
153 }
154 
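/* Try to get @number pages, first from the global drbd_pp_pool, then by
 * falling back to alloc_page(GFP_TRY).  If not all pages can be obtained,
 * return what we got to the pool and return NULL; drbd_alloc_pages()
 * will retry. */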
155 static struct page *__drbd_alloc_pages(struct drbd_device *device,
156 				       unsigned int number)
157 {
158 	struct page *page = NULL;
159 	struct page *tmp = NULL;
160 	unsigned int i = 0;
161 
162 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
163 	 * So what. It saves a spin_lock. */
164 	if (drbd_pp_vacant >= number) {
165 		spin_lock(&drbd_pp_lock);
166 		page = page_chain_del(&drbd_pp_pool, number);
167 		if (page)
168 			drbd_pp_vacant -= number;
169 		spin_unlock(&drbd_pp_lock);
170 		if (page)
171 			return page;
172 	}
173 
174 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
175 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
176 	 * which in turn might block on the other node at this very place.  */
177 	for (i = 0; i < number; i++) {
178 		tmp = alloc_page(GFP_TRY);
179 		if (!tmp)
180 			break;
181 		set_page_private(tmp, (unsigned long)page);
182 		page = tmp;
183 	}
184 
185 	if (i == number)
186 		return page;
187 
188 	/* Not enough pages immediately available this time.
189 	 * No need to jump around here, drbd_alloc_pages will retry this
190 	 * function "soon". */
191 	if (page) {
192 		tmp = page_chain_tail(page, NULL);
193 		spin_lock(&drbd_pp_lock);
194 		page_chain_add(&drbd_pp_pool, page, tmp);
195 		drbd_pp_vacant += i;
196 		spin_unlock(&drbd_pp_lock);
197 	}
198 	return NULL;
199 }
200 
201 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
202 					   struct list_head *to_be_freed)
203 {
204 	struct drbd_peer_request *peer_req, *tmp;
205 
	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first unfinished one we can
	   stop examining the list... */
210 
211 	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
212 		if (drbd_peer_req_has_active_page(peer_req))
213 			break;
214 		list_move(&peer_req->w.list, to_be_freed);
215 	}
216 }
217 
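/* Move finished net_ee entries off the device list while holding req_lock,
 * then free them outside of the lock. */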
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
219 {
220 	LIST_HEAD(reclaimed);
221 	struct drbd_peer_request *peer_req, *t;
222 
223 	spin_lock_irq(&device->resource->req_lock);
224 	reclaim_finished_net_peer_reqs(device, &reclaimed);
225 	spin_unlock_irq(&device->resource->req_lock);
226 
227 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228 		drbd_free_net_peer_req(device, peer_req);
229 }
230 
/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @peer_device: DRBD peer device; allocated pages are accounted to its device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate @number pages, first from our own page pool, then from
 * the kernel.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * If this allocation would exceed the max_buffers setting, we throttle
 * allocation (schedule_timeout) to give the system some room to breathe.
 *
 * We do not use max-buffers as a hard limit, because it could lead to
 * congestion and further to a distributed deadlock during online-verify or
 * (checksum based) resync, if the max-buffers, socket buffer sizes and
 * resync-rate settings are mis-configured.
 *
 * Returns a page chain linked via page->private.
 */
251 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
252 			      bool retry)
253 {
254 	struct drbd_device *device = peer_device->device;
255 	struct page *page = NULL;
256 	struct net_conf *nc;
257 	DEFINE_WAIT(wait);
258 	unsigned int mxb;
259 
260 	rcu_read_lock();
261 	nc = rcu_dereference(peer_device->connection->net_conf);
262 	mxb = nc ? nc->max_buffers : 1000000;
263 	rcu_read_unlock();
264 
265 	if (atomic_read(&device->pp_in_use) < mxb)
266 		page = __drbd_alloc_pages(device, number);
267 
268 	while (page == NULL) {
269 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
270 
271 		drbd_kick_lo_and_reclaim_net(device);
272 
273 		if (atomic_read(&device->pp_in_use) < mxb) {
274 			page = __drbd_alloc_pages(device, number);
275 			if (page)
276 				break;
277 		}
278 
279 		if (!retry)
280 			break;
281 
282 		if (signal_pending(current)) {
283 			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
284 			break;
285 		}
286 
287 		if (schedule_timeout(HZ/10) == 0)
288 			mxb = UINT_MAX;
289 	}
290 	finish_wait(&drbd_pp_wait, &wait);
291 
292 	if (page)
293 		atomic_add(number, &device->pp_in_use);
294 	return page;
295 }
296 
/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside another spin_lock_irq(&resource->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
301 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
302 {
303 	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
304 	int i;
305 
306 	if (page == NULL)
307 		return;
308 
309 	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
310 		i = page_chain_free(page);
311 	else {
312 		struct page *tmp;
313 		tmp = page_chain_tail(page, &i);
314 		spin_lock(&drbd_pp_lock);
315 		page_chain_add(&drbd_pp_pool, page, tmp);
316 		drbd_pp_vacant += i;
317 		spin_unlock(&drbd_pp_lock);
318 	}
319 	i = atomic_sub_return(i, a);
320 	if (i < 0)
321 		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
322 			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
323 	wake_up(&drbd_pp_wait);
324 }
325 
326 /*
327 You need to hold the req_lock:
328  _drbd_wait_ee_list_empty()
329 
330 You must not have the req_lock:
331  drbd_free_peer_req()
332  drbd_alloc_peer_req()
333  drbd_free_peer_reqs()
334  drbd_ee_fix_bhs()
335  drbd_finish_peer_reqs()
336  drbd_clear_done_ee()
337  drbd_wait_ee_list_empty()
338 */
339 
340 struct drbd_peer_request *
341 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
342 		    unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
343 {
344 	struct drbd_device *device = peer_device->device;
345 	struct drbd_peer_request *peer_req;
346 	struct page *page = NULL;
347 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
348 
349 	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
350 		return NULL;
351 
352 	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
353 	if (!peer_req) {
354 		if (!(gfp_mask & __GFP_NOWARN))
355 			drbd_err(device, "%s: allocation failed\n", __func__);
356 		return NULL;
357 	}
358 
359 	if (has_payload && data_size) {
360 		page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT));
361 		if (!page)
362 			goto fail;
363 	}
364 
365 	drbd_clear_interval(&peer_req->i);
366 	peer_req->i.size = data_size;
367 	peer_req->i.sector = sector;
368 	peer_req->i.local = false;
369 	peer_req->i.waiting = false;
370 
371 	peer_req->epoch = NULL;
372 	peer_req->peer_device = peer_device;
373 	peer_req->pages = page;
374 	atomic_set(&peer_req->pending_bios, 0);
375 	peer_req->flags = 0;
376 	/*
377 	 * The block_id is opaque to the receiver.  It is not endianness
378 	 * converted, and sent back to the sender unchanged.
379 	 */
380 	peer_req->block_id = id;
381 
382 	return peer_req;
383 
384  fail:
385 	mempool_free(peer_req, drbd_ee_mempool);
386 	return NULL;
387 }
388 
389 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
390 		       int is_net)
391 {
392 	if (peer_req->flags & EE_HAS_DIGEST)
393 		kfree(peer_req->digest);
394 	drbd_free_pages(device, peer_req->pages, is_net);
395 	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
396 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
397 	mempool_free(peer_req, drbd_ee_mempool);
398 }
399 
400 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
401 {
402 	LIST_HEAD(work_list);
403 	struct drbd_peer_request *peer_req, *t;
404 	int count = 0;
405 	int is_net = list == &device->net_ee;
406 
407 	spin_lock_irq(&device->resource->req_lock);
408 	list_splice_init(list, &work_list);
409 	spin_unlock_irq(&device->resource->req_lock);
410 
411 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
412 		__drbd_free_peer_req(device, peer_req, is_net);
413 		count++;
414 	}
415 	return count;
416 }
417 
418 /*
419  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
420  */
421 static int drbd_finish_peer_reqs(struct drbd_device *device)
422 {
423 	LIST_HEAD(work_list);
424 	LIST_HEAD(reclaimed);
425 	struct drbd_peer_request *peer_req, *t;
426 	int err = 0;
427 
428 	spin_lock_irq(&device->resource->req_lock);
429 	reclaim_finished_net_peer_reqs(device, &reclaimed);
430 	list_splice_init(&device->done_ee, &work_list);
431 	spin_unlock_irq(&device->resource->req_lock);
432 
433 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
434 		drbd_free_net_peer_req(device, peer_req);
435 
	/* possible callbacks here:
	 * e_end_block, e_end_resync_block, and e_send_superseded.
	 * all ignore the last argument.
	 */
440 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
441 		int err2;
442 
443 		/* list_del not necessary, next/prev members not touched */
444 		err2 = peer_req->w.cb(&peer_req->w, !!err);
445 		if (!err)
446 			err = err2;
447 		drbd_free_peer_req(device, peer_req);
448 	}
449 	wake_up(&device->ee_wait);
450 
451 	return err;
452 }
453 
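/* Caller must hold req_lock; it is dropped and re-acquired around each
 * io_schedule() while waiting for @head to become empty. */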
454 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
455 				     struct list_head *head)
456 {
457 	DEFINE_WAIT(wait);
458 
459 	/* avoids spin_lock/unlock
460 	 * and calling prepare_to_wait in the fast path */
461 	while (!list_empty(head)) {
462 		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
463 		spin_unlock_irq(&device->resource->req_lock);
464 		io_schedule();
465 		finish_wait(&device->ee_wait, &wait);
466 		spin_lock_irq(&device->resource->req_lock);
467 	}
468 }
469 
470 static void drbd_wait_ee_list_empty(struct drbd_device *device,
471 				    struct list_head *head)
472 {
473 	spin_lock_irq(&device->resource->req_lock);
474 	_drbd_wait_ee_list_empty(device, head);
475 	spin_unlock_irq(&device->resource->req_lock);
476 }
477 
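/* Thin wrapper around kernel_recvmsg().  With flags == 0 we wait for the
 * full @size (MSG_WAITALL | MSG_NOSIGNAL).  Returns the number of bytes
 * received, 0 on EOF, or a negative error code. */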
478 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
479 {
480 	struct kvec iov = {
481 		.iov_base = buf,
482 		.iov_len = size,
483 	};
484 	struct msghdr msg = {
485 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
486 	};
487 	return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
488 }
489 
490 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
491 {
492 	int rv;
493 
494 	rv = drbd_recv_short(connection->data.socket, buf, size, 0);
495 
496 	if (rv < 0) {
497 		if (rv == -ECONNRESET)
498 			drbd_info(connection, "sock was reset by peer\n");
499 		else if (rv != -ERESTARTSYS)
500 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
501 	} else if (rv == 0) {
502 		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
503 			long t;
504 			rcu_read_lock();
505 			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
506 			rcu_read_unlock();
507 
508 			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
509 
510 			if (t)
511 				goto out;
512 		}
513 		drbd_info(connection, "sock was shut down by peer\n");
514 	}
515 
516 	if (rv != size)
517 		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
518 
519 out:
520 	return rv;
521 }
522 
523 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
524 {
525 	int err;
526 
527 	err = drbd_recv(connection, buf, size);
528 	if (err != size) {
529 		if (err >= 0)
530 			err = -EIO;
531 	} else
532 		err = 0;
533 	return err;
534 }
535 
536 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
537 {
538 	int err;
539 
540 	err = drbd_recv_all(connection, buf, size);
541 	if (err && !signal_pending(current))
542 		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
543 	return err;
544 }
545 
546 /* quoting tcp(7):
547  *   On individual connections, the socket buffer size must be set prior to the
548  *   listen(2) or connect(2) calls in order to have it take effect.
549  * This is our wrapper to do so.
550  */
551 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
552 		unsigned int rcv)
553 {
554 	/* open coded SO_SNDBUF, SO_RCVBUF */
555 	if (snd) {
556 		sock->sk->sk_sndbuf = snd;
557 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
558 	}
559 	if (rcv) {
560 		sock->sk->sk_rcvbuf = rcv;
561 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
562 	}
563 }
564 
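/* Actively try to establish one outgoing TCP connection to the peer.
 * Returns the connected socket, or NULL on failure; "expected" failures
 * (peer not reachable yet, timeout, signal) do not trigger a state change. */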
565 static struct socket *drbd_try_connect(struct drbd_connection *connection)
566 {
567 	const char *what;
568 	struct socket *sock;
569 	struct sockaddr_in6 src_in6;
570 	struct sockaddr_in6 peer_in6;
571 	struct net_conf *nc;
572 	int err, peer_addr_len, my_addr_len;
573 	int sndbuf_size, rcvbuf_size, connect_int;
574 	int disconnect_on_error = 1;
575 
576 	rcu_read_lock();
577 	nc = rcu_dereference(connection->net_conf);
578 	if (!nc) {
579 		rcu_read_unlock();
580 		return NULL;
581 	}
582 	sndbuf_size = nc->sndbuf_size;
583 	rcvbuf_size = nc->rcvbuf_size;
584 	connect_int = nc->connect_int;
585 	rcu_read_unlock();
586 
587 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
588 	memcpy(&src_in6, &connection->my_addr, my_addr_len);
589 
590 	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
591 		src_in6.sin6_port = 0;
592 	else
593 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
594 
595 	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
596 	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
597 
598 	what = "sock_create_kern";
599 	err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
600 			       SOCK_STREAM, IPPROTO_TCP, &sock);
601 	if (err < 0) {
602 		sock = NULL;
603 		goto out;
604 	}
605 
606 	sock->sk->sk_rcvtimeo =
607 	sock->sk->sk_sndtimeo = connect_int * HZ;
608 	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
609 
	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
617 	what = "bind before connect";
618 	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
619 	if (err < 0)
620 		goto out;
621 
622 	/* connect may fail, peer not yet available.
623 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
624 	disconnect_on_error = 0;
625 	what = "connect";
626 	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
627 
628 out:
629 	if (err < 0) {
630 		if (sock) {
631 			sock_release(sock);
632 			sock = NULL;
633 		}
634 		switch (-err) {
635 			/* timeout, busy, signal pending */
636 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
637 		case EINTR: case ERESTARTSYS:
638 			/* peer not (yet) available, network problem */
639 		case ECONNREFUSED: case ENETUNREACH:
640 		case EHOSTDOWN:    case EHOSTUNREACH:
641 			disconnect_on_error = 0;
642 			break;
643 		default:
644 			drbd_err(connection, "%s failed, err = %d\n", what, err);
645 		}
646 		if (disconnect_on_error)
647 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
648 	}
649 
650 	return sock;
651 }
652 
653 struct accept_wait_data {
654 	struct drbd_connection *connection;
655 	struct socket *s_listen;
656 	struct completion door_bell;
657 	void (*original_sk_state_change)(struct sock *sk);
658 
659 };
660 
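/* Replacement sk_state_change callback for the listen socket: signal the
 * door_bell completion once a connection is established, then chain to the
 * original callback. */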
661 static void drbd_incoming_connection(struct sock *sk)
662 {
663 	struct accept_wait_data *ad = sk->sk_user_data;
664 	void (*state_change)(struct sock *sk);
665 
666 	state_change = ad->original_sk_state_change;
667 	if (sk->sk_state == TCP_ESTABLISHED)
668 		complete(&ad->door_bell);
669 	state_change(sk);
670 }
671 
672 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
673 {
674 	int err, sndbuf_size, rcvbuf_size, my_addr_len;
675 	struct sockaddr_in6 my_addr;
676 	struct socket *s_listen;
677 	struct net_conf *nc;
678 	const char *what;
679 
680 	rcu_read_lock();
681 	nc = rcu_dereference(connection->net_conf);
682 	if (!nc) {
683 		rcu_read_unlock();
684 		return -EIO;
685 	}
686 	sndbuf_size = nc->sndbuf_size;
687 	rcvbuf_size = nc->rcvbuf_size;
688 	rcu_read_unlock();
689 
690 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
691 	memcpy(&my_addr, &connection->my_addr, my_addr_len);
692 
693 	what = "sock_create_kern";
694 	err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
695 			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
696 	if (err) {
697 		s_listen = NULL;
698 		goto out;
699 	}
700 
701 	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
702 	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
703 
704 	what = "bind before listen";
705 	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
706 	if (err < 0)
707 		goto out;
708 
709 	ad->s_listen = s_listen;
710 	write_lock_bh(&s_listen->sk->sk_callback_lock);
711 	ad->original_sk_state_change = s_listen->sk->sk_state_change;
712 	s_listen->sk->sk_state_change = drbd_incoming_connection;
713 	s_listen->sk->sk_user_data = ad;
714 	write_unlock_bh(&s_listen->sk->sk_callback_lock);
715 
716 	what = "listen";
717 	err = s_listen->ops->listen(s_listen, 5);
718 	if (err < 0)
719 		goto out;
720 
721 	return 0;
722 out:
723 	if (s_listen)
724 		sock_release(s_listen);
725 	if (err < 0) {
726 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
727 			drbd_err(connection, "%s failed, err = %d\n", what, err);
728 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
729 		}
730 	}
731 
732 	return -EIO;
733 }
734 
735 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
736 {
737 	write_lock_bh(&sk->sk_callback_lock);
738 	sk->sk_state_change = ad->original_sk_state_change;
739 	sk->sk_user_data = NULL;
740 	write_unlock_bh(&sk->sk_callback_lock);
741 }
742 
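/* Wait (with some random jitter on the timeout) for an incoming connection
 * on the prepared listen socket, then accept it.  Returns the accepted
 * socket, or NULL on timeout, signal or error. */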
743 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
744 {
745 	int timeo, connect_int, err = 0;
746 	struct socket *s_estab = NULL;
747 	struct net_conf *nc;
748 
749 	rcu_read_lock();
750 	nc = rcu_dereference(connection->net_conf);
751 	if (!nc) {
752 		rcu_read_unlock();
753 		return NULL;
754 	}
755 	connect_int = nc->connect_int;
756 	rcu_read_unlock();
757 
758 	timeo = connect_int * HZ;
759 	/* 28.5% random jitter */
760 	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
761 
762 	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
763 	if (err <= 0)
764 		return NULL;
765 
766 	err = kernel_accept(ad->s_listen, &s_estab, 0);
767 	if (err < 0) {
768 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
769 			drbd_err(connection, "accept failed, err = %d\n", err);
770 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
771 		}
772 	}
773 
774 	if (s_estab)
775 		unregister_state_change(s_estab->sk, ad);
776 
777 	return s_estab;
778 }
779 
780 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
781 
782 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
783 			     enum drbd_packet cmd)
784 {
785 	if (!conn_prepare_command(connection, sock))
786 		return -EIO;
787 	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
788 }
789 
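/* Read and decode exactly one packet header from a freshly accepted socket.
 * Returns the decoded command (used to distinguish P_INITIAL_DATA from
 * P_INITIAL_META) or a negative error code. */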
790 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
791 {
792 	unsigned int header_size = drbd_header_size(connection);
793 	struct packet_info pi;
794 	int err;
795 
796 	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
797 	if (err != header_size) {
798 		if (err >= 0)
799 			err = -EIO;
800 		return err;
801 	}
802 	err = decode_header(connection, connection->data.rbuf, &pi);
803 	if (err)
804 		return err;
805 	return pi.cmd;
806 }
807 
808 /**
809  * drbd_socket_okay() - Free the socket if its connection is not okay
810  * @sock:	pointer to the pointer to the socket.
811  */
812 static int drbd_socket_okay(struct socket **sock)
813 {
814 	int rr;
815 	char tb[4];
816 
817 	if (!*sock)
818 		return false;
819 
820 	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
821 
822 	if (rr > 0 || rr == -EAGAIN) {
823 		return true;
824 	} else {
825 		sock_release(*sock);
826 		*sock = NULL;
827 		return false;
828 	}
829 }
/* Called when a connection is established, or when a new minor gets created
   in an existing connection */
832 int drbd_connected(struct drbd_peer_device *peer_device)
833 {
834 	struct drbd_device *device = peer_device->device;
835 	int err;
836 
837 	atomic_set(&device->packet_seq, 0);
838 	device->peer_seq = 0;
839 
840 	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
841 		&peer_device->connection->cstate_mutex :
842 		&device->own_state_mutex;
843 
844 	err = drbd_send_sync_param(peer_device);
845 	if (!err)
846 		err = drbd_send_sizes(peer_device, 0, 0);
847 	if (!err)
848 		err = drbd_send_uuids(peer_device);
849 	if (!err)
850 		err = drbd_send_current_state(peer_device);
851 	clear_bit(USE_DEGR_WFC_T, &device->flags);
852 	clear_bit(RESIZE_PENDING, &device->flags);
853 	atomic_set(&device->ap_in_flight, 0);
854 	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
855 	return err;
856 }
857 
858 /*
859  * return values:
860  *   1 yes, we have a valid connection
861  *   0 oops, did not work out, please try again
862  *  -1 peer talks different language,
863  *     no point in trying again, please go standalone.
864  *  -2 We do not have a network config...
865  */
866 static int conn_connect(struct drbd_connection *connection)
867 {
868 	struct drbd_socket sock, msock;
869 	struct drbd_peer_device *peer_device;
870 	struct net_conf *nc;
871 	int vnr, timeout, h, ok;
872 	bool discard_my_data;
873 	enum drbd_state_rv rv;
874 	struct accept_wait_data ad = {
875 		.connection = connection,
876 		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
877 	};
878 
879 	clear_bit(DISCONNECT_SENT, &connection->flags);
880 	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
881 		return -2;
882 
883 	mutex_init(&sock.mutex);
884 	sock.sbuf = connection->data.sbuf;
885 	sock.rbuf = connection->data.rbuf;
886 	sock.socket = NULL;
887 	mutex_init(&msock.mutex);
888 	msock.sbuf = connection->meta.sbuf;
889 	msock.rbuf = connection->meta.rbuf;
890 	msock.socket = NULL;
891 
892 	/* Assume that the peer only understands protocol 80 until we know better.  */
893 	connection->agreed_pro_version = 80;
894 
895 	if (prepare_listen_socket(connection, &ad))
896 		return 0;
897 
898 	do {
899 		struct socket *s;
900 
901 		s = drbd_try_connect(connection);
902 		if (s) {
903 			if (!sock.socket) {
904 				sock.socket = s;
905 				send_first_packet(connection, &sock, P_INITIAL_DATA);
906 			} else if (!msock.socket) {
907 				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
908 				msock.socket = s;
909 				send_first_packet(connection, &msock, P_INITIAL_META);
910 			} else {
911 				drbd_err(connection, "Logic error in conn_connect()\n");
912 				goto out_release_sockets;
913 			}
914 		}
915 
916 		if (sock.socket && msock.socket) {
917 			rcu_read_lock();
918 			nc = rcu_dereference(connection->net_conf);
919 			timeout = nc->ping_timeo * HZ / 10;
920 			rcu_read_unlock();
921 			schedule_timeout_interruptible(timeout);
922 			ok = drbd_socket_okay(&sock.socket);
923 			ok = drbd_socket_okay(&msock.socket) && ok;
924 			if (ok)
925 				break;
926 		}
927 
928 retry:
929 		s = drbd_wait_for_connect(connection, &ad);
930 		if (s) {
931 			int fp = receive_first_packet(connection, s);
932 			drbd_socket_okay(&sock.socket);
933 			drbd_socket_okay(&msock.socket);
934 			switch (fp) {
935 			case P_INITIAL_DATA:
936 				if (sock.socket) {
937 					drbd_warn(connection, "initial packet S crossed\n");
938 					sock_release(sock.socket);
939 					sock.socket = s;
940 					goto randomize;
941 				}
942 				sock.socket = s;
943 				break;
944 			case P_INITIAL_META:
945 				set_bit(RESOLVE_CONFLICTS, &connection->flags);
946 				if (msock.socket) {
947 					drbd_warn(connection, "initial packet M crossed\n");
948 					sock_release(msock.socket);
949 					msock.socket = s;
950 					goto randomize;
951 				}
952 				msock.socket = s;
953 				break;
954 			default:
955 				drbd_warn(connection, "Error receiving initial packet\n");
956 				sock_release(s);
957 randomize:
958 				if (prandom_u32() & 1)
959 					goto retry;
960 			}
961 		}
962 
963 		if (connection->cstate <= C_DISCONNECTING)
964 			goto out_release_sockets;
965 		if (signal_pending(current)) {
966 			flush_signals(current);
967 			smp_rmb();
968 			if (get_t_state(&connection->receiver) == EXITING)
969 				goto out_release_sockets;
970 		}
971 
972 		ok = drbd_socket_okay(&sock.socket);
973 		ok = drbd_socket_okay(&msock.socket) && ok;
974 	} while (!ok);
975 
976 	if (ad.s_listen)
977 		sock_release(ad.s_listen);
978 
979 	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
980 	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
981 
982 	sock.socket->sk->sk_allocation = GFP_NOIO;
983 	msock.socket->sk->sk_allocation = GFP_NOIO;
984 
985 	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
986 	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
987 
988 	/* NOT YET ...
989 	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
990 	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
991 	 * first set it to the P_CONNECTION_FEATURES timeout,
992 	 * which we set to 4x the configured ping_timeout. */
993 	rcu_read_lock();
994 	nc = rcu_dereference(connection->net_conf);
995 
996 	sock.socket->sk->sk_sndtimeo =
997 	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
998 
999 	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1000 	timeout = nc->timeout * HZ / 10;
1001 	discard_my_data = nc->discard_my_data;
1002 	rcu_read_unlock();
1003 
1004 	msock.socket->sk->sk_sndtimeo = timeout;
1005 
1006 	/* we don't want delays.
1007 	 * we use TCP_CORK where appropriate, though */
1008 	drbd_tcp_nodelay(sock.socket);
1009 	drbd_tcp_nodelay(msock.socket);
1010 
1011 	connection->data.socket = sock.socket;
1012 	connection->meta.socket = msock.socket;
1013 	connection->last_received = jiffies;
1014 
1015 	h = drbd_do_features(connection);
1016 	if (h <= 0)
1017 		return h;
1018 
1019 	if (connection->cram_hmac_tfm) {
1020 		/* drbd_request_state(device, NS(conn, WFAuth)); */
1021 		switch (drbd_do_auth(connection)) {
1022 		case -1:
1023 			drbd_err(connection, "Authentication of peer failed\n");
1024 			return -1;
1025 		case 0:
1026 			drbd_err(connection, "Authentication of peer failed, trying again.\n");
1027 			return 0;
1028 		}
1029 	}
1030 
1031 	connection->data.socket->sk->sk_sndtimeo = timeout;
1032 	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1033 
1034 	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1035 		return -1;
1036 
1037 	/* Prevent a race between resync-handshake and
1038 	 * being promoted to Primary.
1039 	 *
1040 	 * Grab and release the state mutex, so we know that any current
1041 	 * drbd_set_role() is finished, and any incoming drbd_set_role
1042 	 * will see the STATE_SENT flag, and wait for it to be cleared.
1043 	 */
1044 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1045 		mutex_lock(peer_device->device->state_mutex);
1046 
1047 	set_bit(STATE_SENT, &connection->flags);
1048 
1049 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1050 		mutex_unlock(peer_device->device->state_mutex);
1051 
1052 	rcu_read_lock();
1053 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1054 		struct drbd_device *device = peer_device->device;
1055 		kref_get(&device->kref);
1056 		rcu_read_unlock();
1057 
1058 		if (discard_my_data)
1059 			set_bit(DISCARD_MY_DATA, &device->flags);
1060 		else
1061 			clear_bit(DISCARD_MY_DATA, &device->flags);
1062 
1063 		drbd_connected(peer_device);
1064 		kref_put(&device->kref, drbd_destroy_device);
1065 		rcu_read_lock();
1066 	}
1067 	rcu_read_unlock();
1068 
1069 	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1070 	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1071 		clear_bit(STATE_SENT, &connection->flags);
1072 		return 0;
1073 	}
1074 
1075 	drbd_thread_start(&connection->asender);
1076 
1077 	mutex_lock(&connection->resource->conf_update);
1078 	/* The discard_my_data flag is a single-shot modifier to the next
1079 	 * connection attempt, the handshake of which is now well underway.
1080 	 * No need for rcu style copying of the whole struct
1081 	 * just to clear a single value. */
1082 	connection->net_conf->discard_my_data = 0;
1083 	mutex_unlock(&connection->resource->conf_update);
1084 
1085 	return h;
1086 
1087 out_release_sockets:
1088 	if (ad.s_listen)
1089 		sock_release(ad.s_listen);
1090 	if (sock.socket)
1091 		sock_release(sock.socket);
1092 	if (msock.socket)
1093 		sock_release(msock.socket);
1094 	return -1;
1095 }
1096 
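/* Parse one on-the-wire packet header.  Depending on the agreed protocol
 * version this is a struct p_header100, p_header95 or p_header80; fill in
 * pi->cmd, pi->size and pi->vnr, and point pi->data just past the header. */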
1097 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1098 {
1099 	unsigned int header_size = drbd_header_size(connection);
1100 
1101 	if (header_size == sizeof(struct p_header100) &&
1102 	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1103 		struct p_header100 *h = header;
1104 		if (h->pad != 0) {
1105 			drbd_err(connection, "Header padding is not zero\n");
1106 			return -EINVAL;
1107 		}
1108 		pi->vnr = be16_to_cpu(h->volume);
1109 		pi->cmd = be16_to_cpu(h->command);
1110 		pi->size = be32_to_cpu(h->length);
1111 	} else if (header_size == sizeof(struct p_header95) &&
1112 		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1113 		struct p_header95 *h = header;
1114 		pi->cmd = be16_to_cpu(h->command);
1115 		pi->size = be32_to_cpu(h->length);
1116 		pi->vnr = 0;
1117 	} else if (header_size == sizeof(struct p_header80) &&
1118 		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1119 		struct p_header80 *h = header;
1120 		pi->cmd = be16_to_cpu(h->command);
1121 		pi->size = be16_to_cpu(h->length);
1122 		pi->vnr = 0;
1123 	} else {
1124 		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1125 			 be32_to_cpu(*(__be32 *)header),
1126 			 connection->agreed_pro_version);
1127 		return -EINVAL;
1128 	}
1129 	pi->data = header + header_size;
1130 	return 0;
1131 }
1132 
1133 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1134 {
1135 	void *buffer = connection->data.rbuf;
1136 	int err;
1137 
1138 	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1139 	if (err)
1140 		return err;
1141 
1142 	err = decode_header(connection, buffer, pi);
1143 	connection->last_received = jiffies;
1144 
1145 	return err;
1146 }
1147 
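/* If the write ordering method requires it, issue a flush to the backing
 * device of every attached volume; on failure fall back to drain based
 * write ordering. */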
1148 static void drbd_flush(struct drbd_connection *connection)
1149 {
1150 	int rv;
1151 	struct drbd_peer_device *peer_device;
1152 	int vnr;
1153 
1154 	if (connection->write_ordering >= WO_bdev_flush) {
1155 		rcu_read_lock();
1156 		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1157 			struct drbd_device *device = peer_device->device;
1158 
1159 			if (!get_ldev(device))
1160 				continue;
1161 			kref_get(&device->kref);
1162 			rcu_read_unlock();
1163 
1164 			rv = blkdev_issue_flush(device->ldev->backing_bdev,
1165 					GFP_NOIO, NULL);
1166 			if (rv) {
1167 				drbd_info(device, "local disk flush failed with status %d\n", rv);
1168 				/* would rather check on EOPNOTSUPP, but that is not reliable.
1169 				 * don't try again for ANY return value != 0
1170 				 * if (rv == -EOPNOTSUPP) */
1171 				drbd_bump_write_ordering(connection, WO_drain_io);
1172 			}
1173 			put_ldev(device);
1174 			kref_put(&device->kref, drbd_destroy_device);
1175 
1176 			rcu_read_lock();
1177 			if (rv)
1178 				break;
1179 		}
1180 		rcu_read_unlock();
1181 	}
1182 }
1183 
/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @connection:	DRBD connection.
 * @epoch:	Epoch object.
 * @ev:		Epoch event.
 */
1190 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1191 					       struct drbd_epoch *epoch,
1192 					       enum epoch_event ev)
1193 {
1194 	int epoch_size;
1195 	struct drbd_epoch *next_epoch;
1196 	enum finish_epoch rv = FE_STILL_LIVE;
1197 
1198 	spin_lock(&connection->epoch_lock);
1199 	do {
1200 		next_epoch = NULL;
1201 
1202 		epoch_size = atomic_read(&epoch->epoch_size);
1203 
1204 		switch (ev & ~EV_CLEANUP) {
1205 		case EV_PUT:
1206 			atomic_dec(&epoch->active);
1207 			break;
1208 		case EV_GOT_BARRIER_NR:
1209 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1210 			break;
1211 		case EV_BECAME_LAST:
1212 			/* nothing to do*/
1213 			break;
1214 		}
1215 
1216 		if (epoch_size != 0 &&
1217 		    atomic_read(&epoch->active) == 0 &&
1218 		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1219 			if (!(ev & EV_CLEANUP)) {
1220 				spin_unlock(&connection->epoch_lock);
1221 				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1222 				spin_lock(&connection->epoch_lock);
1223 			}
1224 #if 0
1225 			/* FIXME: dec unacked on connection, once we have
1226 			 * something to count pending connection packets in. */
1227 			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1228 				dec_unacked(epoch->connection);
1229 #endif
1230 
1231 			if (connection->current_epoch != epoch) {
1232 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1233 				list_del(&epoch->list);
1234 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1235 				connection->epochs--;
1236 				kfree(epoch);
1237 
1238 				if (rv == FE_STILL_LIVE)
1239 					rv = FE_DESTROYED;
1240 			} else {
1241 				epoch->flags = 0;
1242 				atomic_set(&epoch->epoch_size, 0);
1243 				/* atomic_set(&epoch->active, 0); is already zero */
1244 				if (rv == FE_STILL_LIVE)
1245 					rv = FE_RECYCLED;
1246 			}
1247 		}
1248 
1249 		if (!next_epoch)
1250 			break;
1251 
1252 		epoch = next_epoch;
1253 	} while (1);
1254 
1255 	spin_unlock(&connection->epoch_lock);
1256 
1257 	return rv;
1258 }
1259 
/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @connection:	DRBD connection.
 * @wo:		Write ordering method to try.
 */
1265 void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo)
1266 {
1267 	struct disk_conf *dc;
1268 	struct drbd_peer_device *peer_device;
1269 	enum write_ordering_e pwo;
1270 	int vnr;
1271 	static char *write_ordering_str[] = {
1272 		[WO_none] = "none",
1273 		[WO_drain_io] = "drain",
1274 		[WO_bdev_flush] = "flush",
1275 	};
1276 
1277 	pwo = connection->write_ordering;
1278 	wo = min(pwo, wo);
1279 	rcu_read_lock();
1280 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1281 		struct drbd_device *device = peer_device->device;
1282 
1283 		if (!get_ldev_if_state(device, D_ATTACHING))
1284 			continue;
1285 		dc = rcu_dereference(device->ldev->disk_conf);
1286 
1287 		if (wo == WO_bdev_flush && !dc->disk_flushes)
1288 			wo = WO_drain_io;
1289 		if (wo == WO_drain_io && !dc->disk_drain)
1290 			wo = WO_none;
1291 		put_ldev(device);
1292 	}
1293 	rcu_read_unlock();
1294 	connection->write_ordering = wo;
1295 	if (pwo != connection->write_ordering || wo == WO_bdev_flush)
1296 		drbd_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]);
1297 }
1298 
/**
 * drbd_submit_peer_request() - submit a peer request to the local backing device
 * @device:	DRBD device.
 * @peer_req:	peer request
 * @rw:		flag field, see bio->bi_rw
 * @fault_type:	DRBD fault injection class for the submitted bios
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
1315 /* TODO allocate from our own bio_set. */
1316 int drbd_submit_peer_request(struct drbd_device *device,
1317 			     struct drbd_peer_request *peer_req,
1318 			     const unsigned rw, const int fault_type)
1319 {
1320 	struct bio *bios = NULL;
1321 	struct bio *bio;
1322 	struct page *page = peer_req->pages;
1323 	sector_t sector = peer_req->i.sector;
1324 	unsigned ds = peer_req->i.size;
1325 	unsigned n_bios = 0;
1326 	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1327 	int err = -ENOMEM;
1328 
1329 	if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
1330 		/* wait for all pending IO completions, before we start
1331 		 * zeroing things out. */
1332 		conn_wait_active_ee_empty(first_peer_device(device)->connection);
1333 		if (blkdev_issue_zeroout(device->ldev->backing_bdev,
1334 			sector, ds >> 9, GFP_NOIO))
1335 			peer_req->flags |= EE_WAS_ERROR;
1336 		drbd_endio_write_sec_final(peer_req);
1337 		return 0;
1338 	}
1339 
1340 	/* Discards don't have any payload.
1341 	 * But the scsi layer still expects a bio_vec it can use internally,
1342 	 * see sd_setup_discard_cmnd() and blk_add_request_payload(). */
1343 	if (peer_req->flags & EE_IS_TRIM)
1344 		nr_pages = 1;
1345 
1346 	/* In most cases, we will only need one bio.  But in case the lower
1347 	 * level restrictions happen to be different at this offset on this
1348 	 * side than those of the sending peer, we may need to submit the
1349 	 * request in more than one bio.
1350 	 *
1351 	 * Plain bio_alloc is good enough here, this is no DRBD internally
1352 	 * generated bio, but a bio allocated on behalf of the peer.
1353 	 */
1354 next_bio:
1355 	bio = bio_alloc(GFP_NOIO, nr_pages);
1356 	if (!bio) {
1357 		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1358 		goto fail;
1359 	}
1360 	/* > peer_req->i.sector, unless this is the first bio */
1361 	bio->bi_iter.bi_sector = sector;
1362 	bio->bi_bdev = device->ldev->backing_bdev;
1363 	bio->bi_rw = rw;
1364 	bio->bi_private = peer_req;
1365 	bio->bi_end_io = drbd_peer_request_endio;
1366 
1367 	bio->bi_next = bios;
1368 	bios = bio;
1369 	++n_bios;
1370 
1371 	if (rw & REQ_DISCARD) {
1372 		bio->bi_iter.bi_size = ds;
1373 		goto submit;
1374 	}
1375 
1376 	page_chain_for_each(page) {
1377 		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1378 		if (!bio_add_page(bio, page, len, 0)) {
1379 			/* A single page must always be possible!
1380 			 * But in case it fails anyways,
1381 			 * we deal with it, and complain (below). */
1382 			if (bio->bi_vcnt == 0) {
1383 				drbd_err(device,
1384 					"bio_add_page failed for len=%u, "
1385 					"bi_vcnt=0 (bi_sector=%llu)\n",
1386 					len, (uint64_t)bio->bi_iter.bi_sector);
1387 				err = -ENOSPC;
1388 				goto fail;
1389 			}
1390 			goto next_bio;
1391 		}
1392 		ds -= len;
1393 		sector += len >> 9;
1394 		--nr_pages;
1395 	}
1396 	D_ASSERT(device, ds == 0);
1397 submit:
1398 	D_ASSERT(device, page == NULL);
1399 
1400 	atomic_set(&peer_req->pending_bios, n_bios);
1401 	do {
1402 		bio = bios;
1403 		bios = bios->bi_next;
1404 		bio->bi_next = NULL;
1405 
1406 		drbd_generic_make_request(device, fault_type, bio);
1407 	} while (bios);
1408 	return 0;
1409 
1410 fail:
1411 	while (bios) {
1412 		bio = bios;
1413 		bios = bios->bi_next;
1414 		bio_put(bio);
1415 	}
1416 	return err;
1417 }
1418 
1419 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1420 					     struct drbd_peer_request *peer_req)
1421 {
1422 	struct drbd_interval *i = &peer_req->i;
1423 
1424 	drbd_remove_interval(&device->write_requests, i);
1425 	drbd_clear_interval(i);
1426 
1427 	/* Wake up any processes waiting for this peer request to complete.  */
1428 	if (i->waiting)
1429 		wake_up(&device->misc_wait);
1430 }
1431 
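/* Wait until the active_ee list of every volume of this connection is empty. */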
1432 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1433 {
1434 	struct drbd_peer_device *peer_device;
1435 	int vnr;
1436 
1437 	rcu_read_lock();
1438 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1439 		struct drbd_device *device = peer_device->device;
1440 
1441 		kref_get(&device->kref);
1442 		rcu_read_unlock();
1443 		drbd_wait_ee_list_empty(device, &device->active_ee);
1444 		kref_put(&device->kref, drbd_destroy_device);
1445 		rcu_read_lock();
1446 	}
1447 	rcu_read_unlock();
1448 }
1449 
1450 static struct drbd_peer_device *
1451 conn_peer_device(struct drbd_connection *connection, int volume_number)
1452 {
1453 	return idr_find(&connection->peer_devices, volume_number);
1454 }
1455 
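/* Handle P_BARRIER from the peer: record the barrier number for the current
 * epoch and, depending on the write ordering method, drain and/or flush
 * pending writes before starting a new epoch.  The P_BARRIER_ACK itself is
 * sent once the epoch's requests have completed, see drbd_may_finish_epoch(). */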
1456 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1457 {
1458 	int rv;
1459 	struct p_barrier *p = pi->data;
1460 	struct drbd_epoch *epoch;
1461 
1462 	/* FIXME these are unacked on connection,
1463 	 * not a specific (peer)device.
1464 	 */
1465 	connection->current_epoch->barrier_nr = p->barrier;
1466 	connection->current_epoch->connection = connection;
1467 	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1468 
1469 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1470 	 * the activity log, which means it would not be resynced in case the
1471 	 * R_PRIMARY crashes now.
1472 	 * Therefore we must send the barrier_ack after the barrier request was
1473 	 * completed. */
1474 	switch (connection->write_ordering) {
1475 	case WO_none:
1476 		if (rv == FE_RECYCLED)
1477 			return 0;
1478 
1479 		/* receiver context, in the writeout path of the other node.
1480 		 * avoid potential distributed deadlock */
1481 		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1482 		if (epoch)
1483 			break;
1484 		else
1485 			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1486 			/* Fall through */
1487 
1488 	case WO_bdev_flush:
1489 	case WO_drain_io:
1490 		conn_wait_active_ee_empty(connection);
1491 		drbd_flush(connection);
1492 
1493 		if (atomic_read(&connection->current_epoch->epoch_size)) {
1494 			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1495 			if (epoch)
1496 				break;
1497 		}
1498 
1499 		return 0;
1500 	default:
1501 		drbd_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering);
1502 		return -EIO;
1503 	}
1504 
1505 	epoch->flags = 0;
1506 	atomic_set(&epoch->epoch_size, 0);
1507 	atomic_set(&epoch->active, 0);
1508 
1509 	spin_lock(&connection->epoch_lock);
1510 	if (atomic_read(&connection->current_epoch->epoch_size)) {
1511 		list_add(&epoch->list, &connection->current_epoch->list);
1512 		connection->current_epoch = epoch;
1513 		connection->epochs++;
1514 	} else {
1515 		/* The current_epoch got recycled while we allocated this one... */
1516 		kfree(epoch);
1517 	}
1518 	spin_unlock(&connection->epoch_lock);
1519 
1520 	return 0;
1521 }
1522 
1523 /* used from receive_RSDataReply (recv_resync_read)
1524  * and from receive_Data */
1525 static struct drbd_peer_request *
1526 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1527 	      struct packet_info *pi) __must_hold(local)
1528 {
1529 	struct drbd_device *device = peer_device->device;
1530 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
1531 	struct drbd_peer_request *peer_req;
1532 	struct page *page;
1533 	int dgs, ds, err;
1534 	int data_size = pi->size;
1535 	void *dig_in = peer_device->connection->int_dig_in;
1536 	void *dig_vv = peer_device->connection->int_dig_vv;
1537 	unsigned long *data;
1538 	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1539 
1540 	dgs = 0;
1541 	if (!trim && peer_device->connection->peer_integrity_tfm) {
1542 		dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1543 		/*
1544 		 * FIXME: Receive the incoming digest into the receive buffer
1545 		 *	  here, together with its struct p_data?
1546 		 */
1547 		err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1548 		if (err)
1549 			return NULL;
1550 		data_size -= dgs;
1551 	}
1552 
1553 	if (trim) {
1554 		D_ASSERT(peer_device, data_size == 0);
1555 		data_size = be32_to_cpu(trim->size);
1556 	}
1557 
1558 	if (!expect(IS_ALIGNED(data_size, 512)))
1559 		return NULL;
1560 	/* prepare for larger trim requests. */
1561 	if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
1562 		return NULL;
1563 
	/* even though we trust our peer,
	 * we sometimes have to double check. */
1566 	if (sector + (data_size>>9) > capacity) {
1567 		drbd_err(device, "request from peer beyond end of local disk: "
1568 			"capacity: %llus < sector: %llus + size: %u\n",
1569 			(unsigned long long)capacity,
1570 			(unsigned long long)sector, data_size);
1571 		return NULL;
1572 	}
1573 
1574 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1575 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1576 	 * which in turn might block on the other node at this very place.  */
1577 	peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
1578 	if (!peer_req)
1579 		return NULL;
1580 
1581 	if (trim)
1582 		return peer_req;
1583 
1584 	ds = data_size;
1585 	page = peer_req->pages;
1586 	page_chain_for_each(page) {
1587 		unsigned len = min_t(int, ds, PAGE_SIZE);
1588 		data = kmap(page);
1589 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1590 		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1591 			drbd_err(device, "Fault injection: Corrupting data on receive\n");
1592 			data[0] = data[0] ^ (unsigned long)-1;
1593 		}
1594 		kunmap(page);
1595 		if (err) {
1596 			drbd_free_peer_req(device, peer_req);
1597 			return NULL;
1598 		}
1599 		ds -= len;
1600 	}
1601 
1602 	if (dgs) {
1603 		drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1604 		if (memcmp(dig_in, dig_vv, dgs)) {
1605 			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1606 				(unsigned long long)sector, data_size);
1607 			drbd_free_peer_req(device, peer_req);
1608 			return NULL;
1609 		}
1610 	}
1611 	device->recv_cnt += data_size>>9;
1612 	return peer_req;
1613 }
1614 
1615 /* drbd_drain_block() just takes a data block
1616  * out of the socket input buffer, and discards it.
1617  */
1618 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1619 {
1620 	struct page *page;
1621 	int err = 0;
1622 	void *data;
1623 
1624 	if (!data_size)
1625 		return 0;
1626 
1627 	page = drbd_alloc_pages(peer_device, 1, 1);
1628 
1629 	data = kmap(page);
1630 	while (data_size) {
1631 		unsigned int len = min_t(int, data_size, PAGE_SIZE);
1632 
1633 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1634 		if (err)
1635 			break;
1636 		data_size -= len;
1637 	}
1638 	kunmap(page);
1639 	drbd_free_pages(peer_device->device, page, 0);
1640 	return err;
1641 }
1642 
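/* Read the payload of a data reply directly into the pages of the original
 * read request's master bio, verifying the integrity digest if one is in use. */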
1643 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1644 			   sector_t sector, int data_size)
1645 {
1646 	struct bio_vec bvec;
1647 	struct bvec_iter iter;
1648 	struct bio *bio;
1649 	int dgs, err, expect;
1650 	void *dig_in = peer_device->connection->int_dig_in;
1651 	void *dig_vv = peer_device->connection->int_dig_vv;
1652 
1653 	dgs = 0;
1654 	if (peer_device->connection->peer_integrity_tfm) {
1655 		dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1656 		err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1657 		if (err)
1658 			return err;
1659 		data_size -= dgs;
1660 	}
1661 
1662 	/* optimistically update recv_cnt.  if receiving fails below,
1663 	 * we disconnect anyways, and counters will be reset. */
1664 	peer_device->device->recv_cnt += data_size>>9;
1665 
1666 	bio = req->master_bio;
1667 	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1668 
1669 	bio_for_each_segment(bvec, bio, iter) {
1670 		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1671 		expect = min_t(int, data_size, bvec.bv_len);
1672 		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1673 		kunmap(bvec.bv_page);
1674 		if (err)
1675 			return err;
1676 		data_size -= expect;
1677 	}
1678 
1679 	if (dgs) {
1680 		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1681 		if (memcmp(dig_in, dig_vv, dgs)) {
1682 			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1683 			return -EINVAL;
1684 		}
1685 	}
1686 
1687 	D_ASSERT(peer_device->device, data_size == 0);
1688 	return 0;
1689 }
1690 
1691 /*
1692  * e_end_resync_block() is called in asender context via
1693  * drbd_finish_peer_reqs().
1694  */
1695 static int e_end_resync_block(struct drbd_work *w, int unused)
1696 {
1697 	struct drbd_peer_request *peer_req =
1698 		container_of(w, struct drbd_peer_request, w);
1699 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1700 	struct drbd_device *device = peer_device->device;
1701 	sector_t sector = peer_req->i.sector;
1702 	int err;
1703 
1704 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1705 
1706 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1707 		drbd_set_in_sync(device, sector, peer_req->i.size);
1708 		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1709 	} else {
1710 		/* Record failure to sync */
1711 		drbd_rs_failed_io(device, sector, peer_req->i.size);
1712 
1713 		err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1714 	}
1715 	dec_unacked(device);
1716 
1717 	return err;
1718 }
1719 
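/* Read one resync data block into a fresh peer request, queue it on sync_ee
 * and submit the write to the local disk; the ack is sent from
 * e_end_resync_block() once the write has completed. */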
1720 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1721 			    struct packet_info *pi) __releases(local)
1722 {
1723 	struct drbd_device *device = peer_device->device;
1724 	struct drbd_peer_request *peer_req;
1725 
1726 	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1727 	if (!peer_req)
1728 		goto fail;
1729 
1730 	dec_rs_pending(device);
1731 
1732 	inc_unacked(device);
1733 	/* corresponding dec_unacked() in e_end_resync_block()
1734 	 * respective _drbd_clear_done_ee */
1735 
1736 	peer_req->w.cb = e_end_resync_block;
1737 
1738 	spin_lock_irq(&device->resource->req_lock);
1739 	list_add(&peer_req->w.list, &device->sync_ee);
1740 	spin_unlock_irq(&device->resource->req_lock);
1741 
1742 	atomic_add(pi->size >> 9, &device->rs_sect_ev);
1743 	if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1744 		return 0;
1745 
1746 	/* don't care for the reason here */
1747 	drbd_err(device, "submit failed, triggering re-connect\n");
1748 	spin_lock_irq(&device->resource->req_lock);
1749 	list_del(&peer_req->w.list);
1750 	spin_unlock_irq(&device->resource->req_lock);
1751 
1752 	drbd_free_peer_req(device, peer_req);
1753 fail:
1754 	put_ldev(device);
1755 	return -EIO;
1756 }
1757 
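/* Look up the drbd_request a peer ack or reply refers to.  The block_id we
 * sent out is the request pointer itself; validate it by checking that the
 * request really covers the given sector in @root. */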
1758 static struct drbd_request *
1759 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1760 	     sector_t sector, bool missing_ok, const char *func)
1761 {
1762 	struct drbd_request *req;
1763 
1764 	/* Request object according to our peer */
1765 	req = (struct drbd_request *)(unsigned long)id;
1766 	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1767 		return req;
1768 	if (!missing_ok) {
1769 		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1770 			(unsigned long)id, (unsigned long long)sector);
1771 	}
1772 	return NULL;
1773 }
1774 
1775 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1776 {
1777 	struct drbd_peer_device *peer_device;
1778 	struct drbd_device *device;
1779 	struct drbd_request *req;
1780 	sector_t sector;
1781 	int err;
1782 	struct p_data *p = pi->data;
1783 
1784 	peer_device = conn_peer_device(connection, pi->vnr);
1785 	if (!peer_device)
1786 		return -EIO;
1787 	device = peer_device->device;
1788 
1789 	sector = be64_to_cpu(p->sector);
1790 
1791 	spin_lock_irq(&device->resource->req_lock);
1792 	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1793 	spin_unlock_irq(&device->resource->req_lock);
1794 	if (unlikely(!req))
1795 		return -EIO;
1796 
1797 	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1798 	 * special casing it there for the various failure cases.
1799 	 * still no race with drbd_fail_pending_reads */
1800 	err = recv_dless_read(peer_device, req, sector, pi->size);
1801 	if (!err)
1802 		req_mod(req, DATA_RECEIVED);
1803 	/* else: nothing. handled from drbd_disconnect...
1804 	 * I don't think we may complete this just yet
1805 	 * in case we are "on-disconnect: freeze" */
1806 
1807 	return err;
1808 }
1809 
1810 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1811 {
1812 	struct drbd_peer_device *peer_device;
1813 	struct drbd_device *device;
1814 	sector_t sector;
1815 	int err;
1816 	struct p_data *p = pi->data;
1817 
1818 	peer_device = conn_peer_device(connection, pi->vnr);
1819 	if (!peer_device)
1820 		return -EIO;
1821 	device = peer_device->device;
1822 
1823 	sector = be64_to_cpu(p->sector);
1824 	D_ASSERT(device, p->block_id == ID_SYNCER);
1825 
1826 	if (get_ldev(device)) {
1827 		/* data is submitted to disk within recv_resync_read.
1828 		 * corresponding put_ldev done below on error,
1829 		 * or in drbd_peer_request_endio. */
1830 		err = recv_resync_read(peer_device, sector, pi);
1831 	} else {
1832 		if (__ratelimit(&drbd_ratelimit_state))
1833 			drbd_err(device, "Can not write resync data to local disk.\n");
1834 
1835 		err = drbd_drain_block(peer_device, pi->size);
1836 
1837 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1838 	}
1839 
1840 	atomic_add(pi->size >> 9, &device->rs_sect_in);
1841 
1842 	return err;
1843 }
1844 
1845 static void restart_conflicting_writes(struct drbd_device *device,
1846 				       sector_t sector, int size)
1847 {
1848 	struct drbd_interval *i;
1849 	struct drbd_request *req;
1850 
1851 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1852 		if (!i->local)
1853 			continue;
1854 		req = container_of(i, struct drbd_request, i);
1855 		if (req->rq_state & RQ_LOCAL_PENDING ||
1856 		    !(req->rq_state & RQ_POSTPONED))
1857 			continue;
1858 		/* as it is RQ_POSTPONED, this will cause it to
1859 		 * be queued on the retry workqueue. */
1860 		__req_mod(req, CONFLICT_RESOLVED, NULL);
1861 	}
1862 }
1863 
1864 /*
1865  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1866  */
1867 static int e_end_block(struct drbd_work *w, int cancel)
1868 {
1869 	struct drbd_peer_request *peer_req =
1870 		container_of(w, struct drbd_peer_request, w);
1871 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1872 	struct drbd_device *device = peer_device->device;
1873 	sector_t sector = peer_req->i.sector;
1874 	int err = 0, pcmd;
1875 
1876 	if (peer_req->flags & EE_SEND_WRITE_ACK) {
1877 		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1878 			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1879 				device->state.conn <= C_PAUSED_SYNC_T &&
1880 				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1881 				P_RS_WRITE_ACK : P_WRITE_ACK;
1882 			err = drbd_send_ack(peer_device, pcmd, peer_req);
1883 			if (pcmd == P_RS_WRITE_ACK)
1884 				drbd_set_in_sync(device, sector, peer_req->i.size);
1885 		} else {
1886 			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1887 			/* we expect it to be marked out of sync anyways...
1888 			 * maybe assert this?  */
1889 		}
1890 		dec_unacked(device);
1891 	}
1892 	/* we delete from the conflict detection hash _after_ we sent out the
1893 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1894 	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1895 		spin_lock_irq(&device->resource->req_lock);
1896 		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1897 		drbd_remove_epoch_entry_interval(device, peer_req);
1898 		if (peer_req->flags & EE_RESTART_REQUESTS)
1899 			restart_conflicting_writes(device, sector, peer_req->i.size);
1900 		spin_unlock_irq(&device->resource->req_lock);
1901 	} else
1902 		D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1903 
1904 	drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1905 
1906 	return err;
1907 }
1908 
1909 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1910 {
1911 	struct drbd_peer_request *peer_req =
1912 		container_of(w, struct drbd_peer_request, w);
1913 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1914 	int err;
1915 
1916 	err = drbd_send_ack(peer_device, ack, peer_req);
1917 	dec_unacked(peer_device->device);
1918 
1919 	return err;
1920 }
1921 
1922 static int e_send_superseded(struct drbd_work *w, int unused)
1923 {
1924 	return e_send_ack(w, P_SUPERSEDED);
1925 }
1926 
1927 static int e_send_retry_write(struct drbd_work *w, int unused)
1928 {
1929 	struct drbd_peer_request *peer_req =
1930 		container_of(w, struct drbd_peer_request, w);
1931 	struct drbd_connection *connection = peer_req->peer_device->connection;
1932 
1933 	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
1934 			     P_RETRY_WRITE : P_SUPERSEDED);
1935 }
1936 
1937 static bool seq_greater(u32 a, u32 b)
1938 {
1939 	/*
1940 	 * We assume 32-bit wrap-around here.
1941 	 * For 24-bit wrap-around, we would have to shift:
1942 	 *  a <<= 8; b <<= 8;
1943 	 */
1944 	return (s32)a - (s32)b > 0;
1945 }
1946 
1947 static u32 seq_max(u32 a, u32 b)
1948 {
1949 	return seq_greater(a, b) ? a : b;
1950 }
1951 
1952 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
1953 {
1954 	struct drbd_device *device = peer_device->device;
1955 	unsigned int newest_peer_seq;
1956 
1957 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
1958 		spin_lock(&device->peer_seq_lock);
1959 		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
1960 		device->peer_seq = newest_peer_seq;
1961 		spin_unlock(&device->peer_seq_lock);
1962 		/* wake up only if we actually changed device->peer_seq */
1963 		if (peer_seq == newest_peer_seq)
1964 			wake_up(&device->seq_wait);
1965 	}
1966 }
1967 
1968 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1969 {
1970 	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1971 }
1972 
1973 /* maybe change sync_ee into interval trees as well? */
1974 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
1975 {
1976 	struct drbd_peer_request *rs_req;
	bool rv = false;
1978 
1979 	spin_lock_irq(&device->resource->req_lock);
1980 	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
1981 		if (overlaps(peer_req->i.sector, peer_req->i.size,
1982 			     rs_req->i.sector, rs_req->i.size)) {
			rv = true;
1984 			break;
1985 		}
1986 	}
1987 	spin_unlock_irq(&device->resource->req_lock);
1988 
1989 	return rv;
1990 }
1991 
1992 /* Called from receive_Data.
1993  * Synchronize packets on sock with packets on msock.
1994  *
1995  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1996  * packet traveling on msock, they are still processed in the order they have
1997  * been sent.
1998  *
1999  * Note: we don't care for Ack packets overtaking P_DATA packets.
2000  *
 * In case peer_seq is larger than device->peer_seq, there are
2002  * outstanding packets on the msock. We wait for them to arrive.
2003  * In case we are the logically next packet, we update device->peer_seq
2004  * ourselves. Correctly handles 32bit wrap around.
2005  *
2006  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2007  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 * 1<<11 == 2048 seconds, aka ages, for the 32bit wrap around...
2010  *
2011  * returns 0 if we may process the packet,
2012  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2013 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2014 {
2015 	struct drbd_device *device = peer_device->device;
2016 	DEFINE_WAIT(wait);
2017 	long timeout;
2018 	int ret = 0, tp;
2019 
2020 	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2021 		return 0;
2022 
2023 	spin_lock(&device->peer_seq_lock);
2024 	for (;;) {
2025 		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2026 			device->peer_seq = seq_max(device->peer_seq, peer_seq);
2027 			break;
2028 		}
2029 
2030 		if (signal_pending(current)) {
2031 			ret = -ERESTARTSYS;
2032 			break;
2033 		}
2034 
2035 		rcu_read_lock();
2036 		tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
2037 		rcu_read_unlock();
2038 
2039 		if (!tp)
2040 			break;
2041 
2042 		/* Only need to wait if two_primaries is enabled */
2043 		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2044 		spin_unlock(&device->peer_seq_lock);
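		/* ping_timeo is configured in tenths of a second; the *HZ/10
		 * below converts it to jiffies. */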
2045 		rcu_read_lock();
2046 		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2047 		rcu_read_unlock();
2048 		timeout = schedule_timeout(timeout);
2049 		spin_lock(&device->peer_seq_lock);
2050 		if (!timeout) {
2051 			ret = -ETIMEDOUT;
2052 			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2053 			break;
2054 		}
2055 	}
2056 	spin_unlock(&device->peer_seq_lock);
2057 	finish_wait(&device->seq_wait, &wait);
2058 	return ret;
2059 }
2060 
2061 /* see also bio_flags_to_wire()
2062  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2063  * flags and back. We may replicate to other kernel versions. */
2064 static unsigned long wire_flags_to_bio(u32 dpf)
2065 {
2066 	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2067 		(dpf & DP_FUA ? REQ_FUA : 0) |
2068 		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2069 		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
2070 }
2071 
2072 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2073 				    unsigned int size)
2074 {
2075 	struct drbd_interval *i;
2076 
2077     repeat:
2078 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2079 		struct drbd_request *req;
2080 		struct bio_and_error m;
2081 
2082 		if (!i->local)
2083 			continue;
2084 		req = container_of(i, struct drbd_request, i);
2085 		if (!(req->rq_state & RQ_POSTPONED))
2086 			continue;
2087 		req->rq_state &= ~RQ_POSTPONED;
2088 		__req_mod(req, NEG_ACKED, &m);
2089 		spin_unlock_irq(&device->resource->req_lock);
2090 		if (m.bio)
2091 			complete_master_bio(device, &m);
2092 		spin_lock_irq(&device->resource->req_lock);
2093 		goto repeat;
2094 	}
2095 }
2096 
2097 static int handle_write_conflicts(struct drbd_device *device,
2098 				  struct drbd_peer_request *peer_req)
2099 {
2100 	struct drbd_connection *connection = peer_req->peer_device->connection;
2101 	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2102 	sector_t sector = peer_req->i.sector;
2103 	const unsigned int size = peer_req->i.size;
2104 	struct drbd_interval *i;
2105 	bool equal;
2106 	int err;
2107 
2108 	/*
2109 	 * Inserting the peer request into the write_requests tree will prevent
2110 	 * new conflicting local requests from being added.
2111 	 */
2112 	drbd_insert_interval(&device->write_requests, &peer_req->i);
2113 
2114     repeat:
2115 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2116 		if (i == &peer_req->i)
2117 			continue;
2118 
2119 		if (!i->local) {
2120 			/*
2121 			 * Our peer has sent a conflicting remote request; this
2122 			 * should not happen in a two-node setup.  Wait for the
2123 			 * earlier peer request to complete.
2124 			 */
2125 			err = drbd_wait_misc(device, i);
2126 			if (err)
2127 				goto out;
2128 			goto repeat;
2129 		}
2130 
2131 		equal = i->sector == sector && i->size == size;
2132 		if (resolve_conflicts) {
2133 			/*
2134 			 * If the peer request is fully contained within the
2135 			 * overlapping request, it can be considered overwritten
2136 			 * and thus superseded; otherwise, it will be retried
2137 			 * once all overlapping requests have completed.
2138 			 */
2139 			bool superseded = i->sector <= sector && i->sector +
2140 				       (i->size >> 9) >= sector + (size >> 9);
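			/* Containment example (illustration only): a local request at
			 * i->sector == 0 with i->size == 8192 (16 sectors) fully covers
			 * a peer request at sector == 4 with size == 2048 (4 sectors),
			 * since 0 <= 4 and 0 + 16 >= 4 + 4; the peer request is then
			 * considered superseded. */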
2141 
2142 			if (!equal)
2143 				drbd_alert(device, "Concurrent writes detected: "
2144 					       "local=%llus +%u, remote=%llus +%u, "
2145 					       "assuming %s came first\n",
2146 					  (unsigned long long)i->sector, i->size,
2147 					  (unsigned long long)sector, size,
2148 					  superseded ? "local" : "remote");
2149 
2150 			inc_unacked(device);
2151 			peer_req->w.cb = superseded ? e_send_superseded :
2152 						   e_send_retry_write;
2153 			list_add_tail(&peer_req->w.list, &device->done_ee);
2154 			wake_asender(connection);
2155 
2156 			err = -ENOENT;
2157 			goto out;
2158 		} else {
2159 			struct drbd_request *req =
2160 				container_of(i, struct drbd_request, i);
2161 
2162 			if (!equal)
2163 				drbd_alert(device, "Concurrent writes detected: "
2164 					       "local=%llus +%u, remote=%llus +%u\n",
2165 					  (unsigned long long)i->sector, i->size,
2166 					  (unsigned long long)sector, size);
2167 
2168 			if (req->rq_state & RQ_LOCAL_PENDING ||
2169 			    !(req->rq_state & RQ_POSTPONED)) {
2170 				/*
2171 				 * Wait for the node with the discard flag to
2172 				 * decide if this request has been superseded
2173 				 * or needs to be retried.
2174 				 * Requests that have been superseded will
2175 				 * disappear from the write_requests tree.
2176 				 *
2177 				 * In addition, wait for the conflicting
2178 				 * request to finish locally before submitting
2179 				 * the conflicting peer request.
2180 				 */
2181 				err = drbd_wait_misc(device, &req->i);
2182 				if (err) {
2183 					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2184 					fail_postponed_requests(device, sector, size);
2185 					goto out;
2186 				}
2187 				goto repeat;
2188 			}
2189 			/*
2190 			 * Remember to restart the conflicting requests after
2191 			 * the new peer request has completed.
2192 			 */
2193 			peer_req->flags |= EE_RESTART_REQUESTS;
2194 		}
2195 	}
2196 	err = 0;
2197 
2198     out:
2199 	if (err)
2200 		drbd_remove_epoch_entry_interval(device, peer_req);
2201 	return err;
2202 }
2203 
2204 /* mirrored write */
2205 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2206 {
2207 	struct drbd_peer_device *peer_device;
2208 	struct drbd_device *device;
2209 	sector_t sector;
2210 	struct drbd_peer_request *peer_req;
2211 	struct p_data *p = pi->data;
2212 	u32 peer_seq = be32_to_cpu(p->seq_num);
2213 	int rw = WRITE;
2214 	u32 dp_flags;
2215 	int err, tp;
2216 
2217 	peer_device = conn_peer_device(connection, pi->vnr);
2218 	if (!peer_device)
2219 		return -EIO;
2220 	device = peer_device->device;
2221 
2222 	if (!get_ldev(device)) {
2223 		int err2;
2224 
2225 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2226 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2227 		atomic_inc(&connection->current_epoch->epoch_size);
2228 		err2 = drbd_drain_block(peer_device, pi->size);
2229 		if (!err)
2230 			err = err2;
2231 		return err;
2232 	}
2233 
2234 	/*
2235 	 * Corresponding put_ldev done either below (on various errors), or in
2236 	 * drbd_peer_request_endio, if we successfully submit the data at the
2237 	 * end of this function.
2238 	 */
2239 
2240 	sector = be64_to_cpu(p->sector);
2241 	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2242 	if (!peer_req) {
2243 		put_ldev(device);
2244 		return -EIO;
2245 	}
2246 
2247 	peer_req->w.cb = e_end_block;
2248 
2249 	dp_flags = be32_to_cpu(p->dp_flags);
2250 	rw |= wire_flags_to_bio(dp_flags);
2251 	if (pi->cmd == P_TRIM) {
2252 		struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
2253 		peer_req->flags |= EE_IS_TRIM;
2254 		if (!blk_queue_discard(q))
2255 			peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
2256 		D_ASSERT(peer_device, peer_req->i.size > 0);
2257 		D_ASSERT(peer_device, rw & REQ_DISCARD);
2258 		D_ASSERT(peer_device, peer_req->pages == NULL);
2259 	} else if (peer_req->pages == NULL) {
2260 		D_ASSERT(device, peer_req->i.size == 0);
2261 		D_ASSERT(device, dp_flags & DP_FLUSH);
2262 	}
2263 
2264 	if (dp_flags & DP_MAY_SET_IN_SYNC)
2265 		peer_req->flags |= EE_MAY_SET_IN_SYNC;
2266 
2267 	spin_lock(&connection->epoch_lock);
2268 	peer_req->epoch = connection->current_epoch;
2269 	atomic_inc(&peer_req->epoch->epoch_size);
2270 	atomic_inc(&peer_req->epoch->active);
2271 	spin_unlock(&connection->epoch_lock);
2272 
2273 	rcu_read_lock();
2274 	tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2275 	rcu_read_unlock();
2276 	if (tp) {
2277 		peer_req->flags |= EE_IN_INTERVAL_TREE;
2278 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2279 		if (err)
2280 			goto out_interrupted;
2281 		spin_lock_irq(&device->resource->req_lock);
2282 		err = handle_write_conflicts(device, peer_req);
2283 		if (err) {
2284 			spin_unlock_irq(&device->resource->req_lock);
2285 			if (err == -ENOENT) {
2286 				put_ldev(device);
2287 				return 0;
2288 			}
2289 			goto out_interrupted;
2290 		}
2291 	} else {
2292 		update_peer_seq(peer_device, peer_seq);
2293 		spin_lock_irq(&device->resource->req_lock);
2294 	}
	/* if we use the zeroout fallback code, we process synchronously
	 * and wait for all pending requests, i.e. for active_ee to become
	 * empty, in drbd_submit_peer_request();
	 * better not add ourselves here. */
2299 	if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
2300 		list_add(&peer_req->w.list, &device->active_ee);
2301 	spin_unlock_irq(&device->resource->req_lock);
2302 
2303 	if (device->state.conn == C_SYNC_TARGET)
2304 		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2305 
2306 	if (peer_device->connection->agreed_pro_version < 100) {
2307 		rcu_read_lock();
2308 		switch (rcu_dereference(peer_device->connection->net_conf)->wire_protocol) {
2309 		case DRBD_PROT_C:
2310 			dp_flags |= DP_SEND_WRITE_ACK;
2311 			break;
2312 		case DRBD_PROT_B:
2313 			dp_flags |= DP_SEND_RECEIVE_ACK;
2314 			break;
2315 		}
2316 		rcu_read_unlock();
2317 	}
2318 
2319 	if (dp_flags & DP_SEND_WRITE_ACK) {
2320 		peer_req->flags |= EE_SEND_WRITE_ACK;
2321 		inc_unacked(device);
		/* corresponding dec_unacked() in e_end_block(),
		 * or in _drbd_clear_done_ee() */
2324 	}
2325 
2326 	if (dp_flags & DP_SEND_RECEIVE_ACK) {
2327 		/* I really don't like it that the receiver thread
2328 		 * sends on the msock, but anyways */
2329 		drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
2330 	}
2331 
2332 	if (device->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, the peer has no
		 * usable disk: mark this range out of sync, so it gets resynced
		 * once the peer's disk comes back. */
2334 		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2335 		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2336 		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2337 		drbd_al_begin_io(device, &peer_req->i, true);
2338 	}
2339 
2340 	err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2341 	if (!err)
2342 		return 0;
2343 
2344 	/* don't care for the reason here */
2345 	drbd_err(device, "submit failed, triggering re-connect\n");
2346 	spin_lock_irq(&device->resource->req_lock);
2347 	list_del(&peer_req->w.list);
2348 	drbd_remove_epoch_entry_interval(device, peer_req);
2349 	spin_unlock_irq(&device->resource->req_lock);
2350 	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2351 		drbd_al_complete_io(device, &peer_req->i);
2352 
2353 out_interrupted:
2354 	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2355 	put_ldev(device);
2356 	drbd_free_peer_req(device, peer_req);
2357 	return err;
2358 }
2359 
2360 /* We may throttle resync, if the lower device seems to be busy,
2361  * and current sync rate is above c_min_rate.
2362  *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID's is_mddev_idle(): if the partition stats reveal a "significant"
 * amount (more than 64 sectors) of activity that we cannot account for with
 * our own resync activity, it obviously is "busy".
2367  *
2368  * The current sync rate used here uses only the most recent two step marks,
2369  * to have a short time average so we can react faster.
2370  */
2371 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
2372 {
2373 	struct lc_element *tmp;
2374 	bool throttle = true;
2375 
2376 	if (!drbd_rs_c_min_rate_throttle(device))
2377 		return false;
2378 
2379 	spin_lock_irq(&device->al_lock);
2380 	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2381 	if (tmp) {
2382 		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2383 		if (test_bit(BME_PRIORITY, &bm_ext->flags))
2384 			throttle = false;
2385 		/* Do not slow down if app IO is already waiting for this extent */
2386 	}
2387 	spin_unlock_irq(&device->al_lock);
2388 
2389 	return throttle;
2390 }
2391 
2392 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2393 {
2394 	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2395 	unsigned long db, dt, dbdt;
2396 	unsigned int c_min_rate;
2397 	int curr_events;
2398 
2399 	rcu_read_lock();
2400 	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2401 	rcu_read_unlock();
2402 
2403 	/* feature disabled? */
2404 	if (c_min_rate == 0)
2405 		return false;
2406 
2407 	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2408 		      (int)part_stat_read(&disk->part0, sectors[1]) -
2409 			atomic_read(&device->rs_sect_ev);
2410 	if (!device->rs_last_events || curr_events - device->rs_last_events > 64) {
2411 		unsigned long rs_left;
2412 		int i;
2413 
2414 		device->rs_last_events = curr_events;
2415 
2416 		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2417 		 * approx. */
2418 		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2419 
2420 		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2421 			rs_left = device->ov_left;
2422 		else
2423 			rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2424 
2425 		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2426 		if (!dt)
2427 			dt++;
2428 		db = device->rs_mark_left[i] - rs_left;
2429 		dbdt = Bit2KB(db/dt);
2430 
2431 		if (dbdt > c_min_rate)
2432 			return true;
2433 	}
2434 	return false;
2435 }
2436 
2437 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2438 {
2439 	struct drbd_peer_device *peer_device;
2440 	struct drbd_device *device;
2441 	sector_t sector;
2442 	sector_t capacity;
2443 	struct drbd_peer_request *peer_req;
2444 	struct digest_info *di = NULL;
2445 	int size, verb;
2446 	unsigned int fault_type;
2447 	struct p_block_req *p =	pi->data;
2448 
2449 	peer_device = conn_peer_device(connection, pi->vnr);
2450 	if (!peer_device)
2451 		return -EIO;
2452 	device = peer_device->device;
2453 	capacity = drbd_get_capacity(device->this_bdev);
2454 
2455 	sector = be64_to_cpu(p->sector);
2456 	size   = be32_to_cpu(p->blksize);
2457 
2458 	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2459 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2460 				(unsigned long long)sector, size);
2461 		return -EINVAL;
2462 	}
2463 	if (sector + (size>>9) > capacity) {
2464 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2465 				(unsigned long long)sector, size);
2466 		return -EINVAL;
2467 	}
2468 
2469 	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2470 		verb = 1;
2471 		switch (pi->cmd) {
2472 		case P_DATA_REQUEST:
2473 			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2474 			break;
2475 		case P_RS_DATA_REQUEST:
2476 		case P_CSUM_RS_REQUEST:
2477 		case P_OV_REQUEST:
			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY, p);
2479 			break;
2480 		case P_OV_REPLY:
2481 			verb = 0;
2482 			dec_rs_pending(device);
2483 			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2484 			break;
2485 		default:
2486 			BUG();
2487 		}
2488 		if (verb && __ratelimit(&drbd_ratelimit_state))
2489 			drbd_err(device, "Can not satisfy peer's read request, "
2490 			    "no local data.\n");
2491 
		/* drain possible payload */
2493 		return drbd_drain_block(peer_device, pi->size);
2494 	}
2495 
2496 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2497 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2498 	 * which in turn might block on the other node at this very place.  */
2499 	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2500 			true /* has real payload */, GFP_NOIO);
2501 	if (!peer_req) {
2502 		put_ldev(device);
2503 		return -ENOMEM;
2504 	}
2505 
2506 	switch (pi->cmd) {
2507 	case P_DATA_REQUEST:
2508 		peer_req->w.cb = w_e_end_data_req;
2509 		fault_type = DRBD_FAULT_DT_RD;
2510 		/* application IO, don't drbd_rs_begin_io */
2511 		goto submit;
2512 
2513 	case P_RS_DATA_REQUEST:
2514 		peer_req->w.cb = w_e_end_rsdata_req;
2515 		fault_type = DRBD_FAULT_RS_RD;
2516 		/* used in the sector offset progress display */
2517 		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2518 		break;
2519 
2520 	case P_OV_REPLY:
2521 	case P_CSUM_RS_REQUEST:
2522 		fault_type = DRBD_FAULT_RS_RD;
2523 		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2524 		if (!di)
2525 			goto out_free_e;
2526 
2527 		di->digest_size = pi->size;
2528 		di->digest = (((char *)di)+sizeof(struct digest_info));
2529 
2530 		peer_req->digest = di;
2531 		peer_req->flags |= EE_HAS_DIGEST;
2532 
2533 		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2534 			goto out_free_e;
2535 
2536 		if (pi->cmd == P_CSUM_RS_REQUEST) {
2537 			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2538 			peer_req->w.cb = w_e_end_csum_rs_req;
2539 			/* used in the sector offset progress display */
2540 			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2541 		} else if (pi->cmd == P_OV_REPLY) {
2542 			/* track progress, we may need to throttle */
2543 			atomic_add(size >> 9, &device->rs_sect_in);
2544 			peer_req->w.cb = w_e_end_ov_reply;
2545 			dec_rs_pending(device);
2546 			/* drbd_rs_begin_io done when we sent this request,
2547 			 * but accounting still needs to be done. */
2548 			goto submit_for_resync;
2549 		}
2550 		break;
2551 
2552 	case P_OV_REQUEST:
2553 		if (device->ov_start_sector == ~(sector_t)0 &&
2554 		    peer_device->connection->agreed_pro_version >= 90) {
2555 			unsigned long now = jiffies;
2556 			int i;
2557 			device->ov_start_sector = sector;
2558 			device->ov_position = sector;
2559 			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2560 			device->rs_total = device->ov_left;
2561 			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2562 				device->rs_mark_left[i] = device->ov_left;
2563 				device->rs_mark_time[i] = now;
2564 			}
2565 			drbd_info(device, "Online Verify start sector: %llu\n",
2566 					(unsigned long long)sector);
2567 		}
2568 		peer_req->w.cb = w_e_end_ov_req;
2569 		fault_type = DRBD_FAULT_RS_RD;
2570 		break;
2571 
2572 	default:
2573 		BUG();
2574 	}
2575 
2576 	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2577 	 * wrt the receiver, but it is not as straightforward as it may seem.
2578 	 * Various places in the resync start and stop logic assume resync
2579 	 * requests are processed in order, requeuing this on the worker thread
2580 	 * introduces a bunch of new code for synchronization between threads.
2581 	 *
2582 	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2583 	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2584 	 * for application writes for the same time.  For now, just throttle
2585 	 * here, where the rest of the code expects the receiver to sleep for
2586 	 * a while, anyways.
2587 	 */
2588 
2589 	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2590 	 * this defers syncer requests for some time, before letting at least
	 * one request through.  The resync controller on the receiving side
2592 	 * will adapt to the incoming rate accordingly.
2593 	 *
2594 	 * We cannot throttle here if remote is Primary/SyncTarget:
2595 	 * we would also throttle its application reads.
2596 	 * In that case, throttling is done on the SyncTarget only.
2597 	 */
2598 	if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector))
2599 		schedule_timeout_uninterruptible(HZ/10);
2600 	if (drbd_rs_begin_io(device, sector))
2601 		goto out_free_e;
2602 
2603 submit_for_resync:
2604 	atomic_add(size >> 9, &device->rs_sect_ev);
2605 
2606 submit:
2607 	inc_unacked(device);
2608 	spin_lock_irq(&device->resource->req_lock);
2609 	list_add_tail(&peer_req->w.list, &device->read_ee);
2610 	spin_unlock_irq(&device->resource->req_lock);
2611 
2612 	if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2613 		return 0;
2614 
2615 	/* don't care for the reason here */
2616 	drbd_err(device, "submit failed, triggering re-connect\n");
2617 	spin_lock_irq(&device->resource->req_lock);
2618 	list_del(&peer_req->w.list);
2619 	spin_unlock_irq(&device->resource->req_lock);
2620 	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
2621 
2622 out_free_e:
2623 	put_ldev(device);
2624 	drbd_free_peer_req(device, peer_req);
2625 	return -EIO;
2626 }
2627 
2628 /**
2629  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2630  */
2631 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2632 {
2633 	struct drbd_device *device = peer_device->device;
2634 	int self, peer, rv = -100;
2635 	unsigned long ch_self, ch_peer;
2636 	enum drbd_after_sb_p after_sb_0p;
2637 
2638 	self = device->ldev->md.uuid[UI_BITMAP] & 1;
2639 	peer = device->p_uuid[UI_BITMAP] & 1;
2640 
2641 	ch_peer = device->p_uuid[UI_SIZE];
2642 	ch_self = device->comm_bm_set;
2643 
2644 	rcu_read_lock();
2645 	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2646 	rcu_read_unlock();
2647 	switch (after_sb_0p) {
2648 	case ASB_CONSENSUS:
2649 	case ASB_DISCARD_SECONDARY:
2650 	case ASB_CALL_HELPER:
2651 	case ASB_VIOLENTLY:
2652 		drbd_err(device, "Configuration error.\n");
2653 		break;
2654 	case ASB_DISCONNECT:
2655 		break;
2656 	case ASB_DISCARD_YOUNGER_PRI:
2657 		if (self == 0 && peer == 1) {
2658 			rv = -1;
2659 			break;
2660 		}
2661 		if (self == 1 && peer == 0) {
2662 			rv =  1;
2663 			break;
2664 		}
2665 		/* Else fall through to one of the other strategies... */
2666 	case ASB_DISCARD_OLDER_PRI:
2667 		if (self == 0 && peer == 1) {
2668 			rv = 1;
2669 			break;
2670 		}
2671 		if (self == 1 && peer == 0) {
2672 			rv = -1;
2673 			break;
2674 		}
2675 		/* Else fall through to one of the other strategies... */
2676 		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2677 		     "Using discard-least-changes instead\n");
2678 	case ASB_DISCARD_ZERO_CHG:
2679 		if (ch_peer == 0 && ch_self == 0) {
2680 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2681 				? -1 : 1;
2682 			break;
2683 		} else {
2684 			if (ch_peer == 0) { rv =  1; break; }
2685 			if (ch_self == 0) { rv = -1; break; }
2686 		}
2687 		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2688 			break;
2689 	case ASB_DISCARD_LEAST_CHG:
2690 		if	(ch_self < ch_peer)
2691 			rv = -1;
2692 		else if (ch_self > ch_peer)
2693 			rv =  1;
2694 		else /* ( ch_self == ch_peer ) */
2695 		     /* Well, then use something else. */
2696 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2697 				? -1 : 1;
2698 		break;
2699 	case ASB_DISCARD_LOCAL:
2700 		rv = -1;
2701 		break;
2702 	case ASB_DISCARD_REMOTE:
2703 		rv =  1;
2704 	}
2705 
2706 	return rv;
2707 }
2708 
2709 /**
2710  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
2711  */
2712 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2713 {
2714 	struct drbd_device *device = peer_device->device;
2715 	int hg, rv = -100;
2716 	enum drbd_after_sb_p after_sb_1p;
2717 
2718 	rcu_read_lock();
2719 	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2720 	rcu_read_unlock();
2721 	switch (after_sb_1p) {
2722 	case ASB_DISCARD_YOUNGER_PRI:
2723 	case ASB_DISCARD_OLDER_PRI:
2724 	case ASB_DISCARD_LEAST_CHG:
2725 	case ASB_DISCARD_LOCAL:
2726 	case ASB_DISCARD_REMOTE:
2727 	case ASB_DISCARD_ZERO_CHG:
2728 		drbd_err(device, "Configuration error.\n");
2729 		break;
2730 	case ASB_DISCONNECT:
2731 		break;
2732 	case ASB_CONSENSUS:
2733 		hg = drbd_asb_recover_0p(peer_device);
2734 		if (hg == -1 && device->state.role == R_SECONDARY)
2735 			rv = hg;
2736 		if (hg == 1  && device->state.role == R_PRIMARY)
2737 			rv = hg;
2738 		break;
2739 	case ASB_VIOLENTLY:
2740 		rv = drbd_asb_recover_0p(peer_device);
2741 		break;
2742 	case ASB_DISCARD_SECONDARY:
2743 		return device->state.role == R_PRIMARY ? 1 : -1;
2744 	case ASB_CALL_HELPER:
2745 		hg = drbd_asb_recover_0p(peer_device);
2746 		if (hg == -1 && device->state.role == R_PRIMARY) {
2747 			enum drbd_state_rv rv2;
2748 
2749 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2750 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2751 			  * we do not need to wait for the after state change work either. */
2752 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2753 			if (rv2 != SS_SUCCESS) {
2754 				drbd_khelper(device, "pri-lost-after-sb");
2755 			} else {
2756 				drbd_warn(device, "Successfully gave up primary role.\n");
2757 				rv = hg;
2758 			}
2759 		} else
2760 			rv = hg;
2761 	}
2762 
2763 	return rv;
2764 }
2765 
2766 /**
2767  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
2768  */
2769 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2770 {
2771 	struct drbd_device *device = peer_device->device;
2772 	int hg, rv = -100;
2773 	enum drbd_after_sb_p after_sb_2p;
2774 
2775 	rcu_read_lock();
2776 	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2777 	rcu_read_unlock();
2778 	switch (after_sb_2p) {
2779 	case ASB_DISCARD_YOUNGER_PRI:
2780 	case ASB_DISCARD_OLDER_PRI:
2781 	case ASB_DISCARD_LEAST_CHG:
2782 	case ASB_DISCARD_LOCAL:
2783 	case ASB_DISCARD_REMOTE:
2784 	case ASB_CONSENSUS:
2785 	case ASB_DISCARD_SECONDARY:
2786 	case ASB_DISCARD_ZERO_CHG:
2787 		drbd_err(device, "Configuration error.\n");
2788 		break;
2789 	case ASB_VIOLENTLY:
2790 		rv = drbd_asb_recover_0p(peer_device);
2791 		break;
2792 	case ASB_DISCONNECT:
2793 		break;
2794 	case ASB_CALL_HELPER:
2795 		hg = drbd_asb_recover_0p(peer_device);
2796 		if (hg == -1) {
2797 			enum drbd_state_rv rv2;
2798 
2799 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2800 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2801 			  * we do not need to wait for the after state change work either. */
2802 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2803 			if (rv2 != SS_SUCCESS) {
2804 				drbd_khelper(device, "pri-lost-after-sb");
2805 			} else {
2806 				drbd_warn(device, "Successfully gave up primary role.\n");
2807 				rv = hg;
2808 			}
2809 		} else
2810 			rv = hg;
2811 	}
2812 
2813 	return rv;
2814 }
2815 
2816 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2817 			   u64 bits, u64 flags)
2818 {
2819 	if (!uuid) {
2820 		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2821 		return;
2822 	}
2823 	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2824 	     text,
2825 	     (unsigned long long)uuid[UI_CURRENT],
2826 	     (unsigned long long)uuid[UI_BITMAP],
2827 	     (unsigned long long)uuid[UI_HISTORY_START],
2828 	     (unsigned long long)uuid[UI_HISTORY_END],
2829 	     (unsigned long long)bits,
2830 	     (unsigned long long)flags);
2831 }
2832 
2833 /*
2834   100	after split brain try auto recover
2835     2	C_SYNC_SOURCE set BitMap
2836     1	C_SYNC_SOURCE use BitMap
2837     0	no Sync
2838    -1	C_SYNC_TARGET use BitMap
2839    -2	C_SYNC_TARGET set BitMap
2840  -100	after split brain, disconnect
2841 -1000	unrelated data
2842 -1091   requires proto 91
2843 -1096   requires proto 96
2844  */
2845 static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local)
2846 {
2847 	u64 self, peer;
2848 	int i, j;
2849 
2850 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2851 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2852 
2853 	*rule_nr = 10;
2854 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2855 		return 0;
2856 
2857 	*rule_nr = 20;
2858 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2859 	     peer != UUID_JUST_CREATED)
2860 		return -2;
2861 
2862 	*rule_nr = 30;
2863 	if (self != UUID_JUST_CREATED &&
2864 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
2865 		return 2;
2866 
2867 	if (self == peer) {
2868 		int rct, dc; /* roles at crash time */
2869 
2870 		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2871 
2872 			if (first_peer_device(device)->connection->agreed_pro_version < 91)
2873 				return -1091;
2874 
2875 			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2876 			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2877 				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2878 				drbd_uuid_move_history(device);
2879 				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2880 				device->ldev->md.uuid[UI_BITMAP] = 0;
2881 
2882 				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2883 					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2884 				*rule_nr = 34;
2885 			} else {
2886 				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
2887 				*rule_nr = 36;
2888 			}
2889 
2890 			return 1;
2891 		}
2892 
2893 		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
2894 
2895 			if (first_peer_device(device)->connection->agreed_pro_version < 91)
2896 				return -1091;
2897 
2898 			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2899 			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2900 				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2901 
2902 				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
2903 				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
2904 				device->p_uuid[UI_BITMAP] = 0UL;
2905 
2906 				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2907 				*rule_nr = 35;
2908 			} else {
2909 				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
2910 				*rule_nr = 37;
2911 			}
2912 
2913 			return -1;
2914 		}
2915 
2916 		/* Common power [off|failure] */
2917 		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
2918 			(device->p_uuid[UI_FLAGS] & 2);
2919 		/* lowest bit is set when we were primary,
2920 		 * next bit (weight 2) is set when peer was primary */
2921 		*rule_nr = 40;
2922 
2923 		switch (rct) {
2924 		case 0: /* !self_pri && !peer_pri */ return 0;
2925 		case 1: /*  self_pri && !peer_pri */ return 1;
2926 		case 2: /* !self_pri &&  peer_pri */ return -1;
2927 		case 3: /*  self_pri &&  peer_pri */
2928 			dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags);
2929 			return dc ? -1 : 1;
2930 		}
2931 	}
2932 
2933 	*rule_nr = 50;
2934 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
2935 	if (self == peer)
2936 		return -1;
2937 
2938 	*rule_nr = 51;
2939 	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
2940 	if (self == peer) {
2941 		if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2942 		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2943 		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2944 		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the peer's UUID
			   modifications from the last start of resync as sync source. */
2947 
2948 			if (first_peer_device(device)->connection->agreed_pro_version < 91)
2949 				return -1091;
2950 
2951 			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
2952 			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
2953 
2954 			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
2955 			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2956 
2957 			return -1;
2958 		}
2959 	}
2960 
2961 	*rule_nr = 60;
2962 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2963 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2964 		peer = device->p_uuid[i] & ~((u64)1);
2965 		if (self == peer)
2966 			return -2;
2967 	}
2968 
2969 	*rule_nr = 70;
2970 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2971 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2972 	if (self == peer)
2973 		return 1;
2974 
2975 	*rule_nr = 71;
2976 	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2977 	if (self == peer) {
2978 		if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2979 		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2980 		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2981 		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo our own UUID
			   modifications from the last start of resync as sync source. */
2984 
2985 			if (first_peer_device(device)->connection->agreed_pro_version < 91)
2986 				return -1091;
2987 
2988 			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
2989 			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
2990 
2991 			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
2992 			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2993 				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2994 
2995 			return 1;
2996 		}
2997 	}
2998 
2999 
3000 	*rule_nr = 80;
3001 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3002 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3003 		self = device->ldev->md.uuid[i] & ~((u64)1);
3004 		if (self == peer)
3005 			return 2;
3006 	}
3007 
3008 	*rule_nr = 90;
3009 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3010 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3011 	if (self == peer && self != ((u64)0))
3012 		return 100;
3013 
3014 	*rule_nr = 100;
3015 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3016 		self = device->ldev->md.uuid[i] & ~((u64)1);
3017 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3018 			peer = device->p_uuid[j] & ~((u64)1);
3019 			if (self == peer)
3020 				return -100;
3021 		}
3022 	}
3023 
3024 	return -1000;
3025 }
3026 
3027 /* drbd_sync_handshake() returns the new conn state on success, or
   C_MASK (-1) on failure.
3029  */
3030 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3031 					   enum drbd_role peer_role,
3032 					   enum drbd_disk_state peer_disk) __must_hold(local)
3033 {
3034 	struct drbd_device *device = peer_device->device;
3035 	enum drbd_conns rv = C_MASK;
3036 	enum drbd_disk_state mydisk;
3037 	struct net_conf *nc;
3038 	int hg, rule_nr, rr_conflict, tentative;
3039 
3040 	mydisk = device->state.disk;
3041 	if (mydisk == D_NEGOTIATING)
3042 		mydisk = device->new_state_tmp.disk;
3043 
3044 	drbd_info(device, "drbd_sync_handshake:\n");
3045 
3046 	spin_lock_irq(&device->ldev->md.uuid_lock);
3047 	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3048 	drbd_uuid_dump(device, "peer", device->p_uuid,
3049 		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3050 
3051 	hg = drbd_uuid_compare(device, &rule_nr);
3052 	spin_unlock_irq(&device->ldev->md.uuid_lock);
3053 
3054 	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3055 
3056 	if (hg == -1000) {
3057 		drbd_alert(device, "Unrelated data, aborting!\n");
3058 		return C_MASK;
3059 	}
3060 	if (hg < -1000) {
3061 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3062 		return C_MASK;
3063 	}
3064 
3065 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3066 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3067 		int f = (hg == -100) || abs(hg) == 2;
3068 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
3069 		if (f)
3070 			hg = hg*2;
3071 		drbd_info(device, "Becoming sync %s due to disk states.\n",
3072 		     hg > 0 ? "source" : "target");
3073 	}
3074 
3075 	if (abs(hg) == 100)
3076 		drbd_khelper(device, "initial-split-brain");
3077 
3078 	rcu_read_lock();
3079 	nc = rcu_dereference(peer_device->connection->net_conf);
3080 
3081 	if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3082 		int pcount = (device->state.role == R_PRIMARY)
3083 			   + (peer_role == R_PRIMARY);
3084 		int forced = (hg == -100);
3085 
3086 		switch (pcount) {
3087 		case 0:
3088 			hg = drbd_asb_recover_0p(peer_device);
3089 			break;
3090 		case 1:
3091 			hg = drbd_asb_recover_1p(peer_device);
3092 			break;
3093 		case 2:
3094 			hg = drbd_asb_recover_2p(peer_device);
3095 			break;
3096 		}
3097 		if (abs(hg) < 100) {
3098 			drbd_warn(device, "Split-Brain detected, %d primaries, "
3099 			     "automatically solved. Sync from %s node\n",
3100 			     pcount, (hg < 0) ? "peer" : "this");
3101 			if (forced) {
3102 				drbd_warn(device, "Doing a full sync, since"
3103 				     " UUIDs where ambiguous.\n");
3104 				hg = hg*2;
3105 			}
3106 		}
3107 	}
3108 
3109 	if (hg == -100) {
3110 		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3111 			hg = -1;
3112 		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3113 			hg = 1;
3114 
3115 		if (abs(hg) < 100)
3116 			drbd_warn(device, "Split-Brain detected, manually solved. "
3117 			     "Sync from %s node\n",
3118 			     (hg < 0) ? "peer" : "this");
3119 	}
3120 	rr_conflict = nc->rr_conflict;
3121 	tentative = nc->tentative;
3122 	rcu_read_unlock();
3123 
3124 	if (hg == -100) {
3125 		/* FIXME this log message is not correct if we end up here
3126 		 * after an attempted attach on a diskless node.
3127 		 * We just refuse to attach -- well, we drop the "connection"
3128 		 * to that disk, in a way... */
3129 		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3130 		drbd_khelper(device, "split-brain");
3131 		return C_MASK;
3132 	}
3133 
3134 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
3135 		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3136 		return C_MASK;
3137 	}
3138 
3139 	if (hg < 0 && /* by intention we do not use mydisk here. */
3140 	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3141 		switch (rr_conflict) {
3142 		case ASB_CALL_HELPER:
3143 			drbd_khelper(device, "pri-lost");
3144 			/* fall through */
3145 		case ASB_DISCONNECT:
3146 			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3147 			return C_MASK;
3148 		case ASB_VIOLENTLY:
3149 			drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3150 			     "assumption\n");
3151 		}
3152 	}
3153 
3154 	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3155 		if (hg == 0)
3156 			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3157 		else
3158 			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3159 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3160 				 abs(hg) >= 2 ? "full" : "bit-map based");
3161 		return C_MASK;
3162 	}
3163 
3164 	if (abs(hg) >= 2) {
3165 		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3166 		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3167 					BM_LOCKED_SET_ALLOWED))
3168 			return C_MASK;
3169 	}
3170 
3171 	if (hg > 0) { /* become sync source. */
3172 		rv = C_WF_BITMAP_S;
3173 	} else if (hg < 0) { /* become sync target */
3174 		rv = C_WF_BITMAP_T;
3175 	} else {
3176 		rv = C_CONNECTED;
3177 		if (drbd_bm_total_weight(device)) {
3178 			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3179 			     drbd_bm_total_weight(device));
3180 		}
3181 	}
3182 
3183 	return rv;
3184 }
3185 
3186 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3187 {
	/* A peer setting of ASB_DISCARD_REMOTE paired with our ASB_DISCARD_LOCAL is valid */
3189 	if (peer == ASB_DISCARD_REMOTE)
3190 		return ASB_DISCARD_LOCAL;
3191 
	/* any other combination involving ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL is invalid */
3193 	if (peer == ASB_DISCARD_LOCAL)
3194 		return ASB_DISCARD_REMOTE;
3195 
3196 	/* everything else is valid if they are equal on both sides. */
3197 	return peer;
3198 }
3199 
3200 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3201 {
3202 	struct p_protocol *p = pi->data;
3203 	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3204 	int p_proto, p_discard_my_data, p_two_primaries, cf;
3205 	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3206 	char integrity_alg[SHARED_SECRET_MAX] = "";
3207 	struct crypto_hash *peer_integrity_tfm = NULL;
3208 	void *int_dig_in = NULL, *int_dig_vv = NULL;
3209 
3210 	p_proto		= be32_to_cpu(p->protocol);
3211 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
3212 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
3213 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
3214 	p_two_primaries = be32_to_cpu(p->two_primaries);
3215 	cf		= be32_to_cpu(p->conn_flags);
3216 	p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3217 
3218 	if (connection->agreed_pro_version >= 87) {
3219 		int err;
3220 
3221 		if (pi->size > sizeof(integrity_alg))
3222 			return -EIO;
3223 		err = drbd_recv_all(connection, integrity_alg, pi->size);
3224 		if (err)
3225 			return err;
3226 		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3227 	}
3228 
3229 	if (pi->cmd != P_PROTOCOL_UPDATE) {
3230 		clear_bit(CONN_DRY_RUN, &connection->flags);
3231 
3232 		if (cf & CF_DRY_RUN)
3233 			set_bit(CONN_DRY_RUN, &connection->flags);
3234 
3235 		rcu_read_lock();
3236 		nc = rcu_dereference(connection->net_conf);
3237 
3238 		if (p_proto != nc->wire_protocol) {
3239 			drbd_err(connection, "incompatible %s settings\n", "protocol");
3240 			goto disconnect_rcu_unlock;
3241 		}
3242 
3243 		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3244 			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3245 			goto disconnect_rcu_unlock;
3246 		}
3247 
3248 		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3249 			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3250 			goto disconnect_rcu_unlock;
3251 		}
3252 
3253 		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3254 			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3255 			goto disconnect_rcu_unlock;
3256 		}
3257 
3258 		if (p_discard_my_data && nc->discard_my_data) {
3259 			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3260 			goto disconnect_rcu_unlock;
3261 		}
3262 
3263 		if (p_two_primaries != nc->two_primaries) {
3264 			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3265 			goto disconnect_rcu_unlock;
3266 		}
3267 
3268 		if (strcmp(integrity_alg, nc->integrity_alg)) {
3269 			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3270 			goto disconnect_rcu_unlock;
3271 		}
3272 
3273 		rcu_read_unlock();
3274 	}
3275 
3276 	if (integrity_alg[0]) {
3277 		int hash_size;
3278 
3279 		/*
3280 		 * We can only change the peer data integrity algorithm
3281 		 * here.  Changing our own data integrity algorithm
3282 		 * requires that we send a P_PROTOCOL_UPDATE packet at
3283 		 * the same time; otherwise, the peer has no way to
3284 		 * tell between which packets the algorithm should
3285 		 * change.
3286 		 */
3287 
3288 		peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3289 		if (!peer_integrity_tfm) {
3290 			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3291 				 integrity_alg);
3292 			goto disconnect;
3293 		}
3294 
3295 		hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3296 		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3297 		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3298 		if (!(int_dig_in && int_dig_vv)) {
3299 			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3300 			goto disconnect;
3301 		}
3302 	}
3303 
3304 	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3305 	if (!new_net_conf) {
3306 		drbd_err(connection, "Allocation of new net_conf failed\n");
3307 		goto disconnect;
3308 	}
3309 
3310 	mutex_lock(&connection->data.mutex);
3311 	mutex_lock(&connection->resource->conf_update);
3312 	old_net_conf = connection->net_conf;
3313 	*new_net_conf = *old_net_conf;
3314 
3315 	new_net_conf->wire_protocol = p_proto;
3316 	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3317 	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3318 	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3319 	new_net_conf->two_primaries = p_two_primaries;
3320 
3321 	rcu_assign_pointer(connection->net_conf, new_net_conf);
3322 	mutex_unlock(&connection->resource->conf_update);
3323 	mutex_unlock(&connection->data.mutex);
3324 
3325 	crypto_free_hash(connection->peer_integrity_tfm);
3326 	kfree(connection->int_dig_in);
3327 	kfree(connection->int_dig_vv);
3328 	connection->peer_integrity_tfm = peer_integrity_tfm;
3329 	connection->int_dig_in = int_dig_in;
3330 	connection->int_dig_vv = int_dig_vv;
3331 
3332 	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3333 		drbd_info(connection, "peer data-integrity-alg: %s\n",
3334 			  integrity_alg[0] ? integrity_alg : "(none)");
3335 
3336 	synchronize_rcu();
3337 	kfree(old_net_conf);
3338 	return 0;
3339 
3340 disconnect_rcu_unlock:
3341 	rcu_read_unlock();
3342 disconnect:
3343 	crypto_free_hash(peer_integrity_tfm);
3344 	kfree(int_dig_in);
3345 	kfree(int_dig_vv);
3346 	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3347 	return -EIO;
3348 }
3349 
3350 /* helper function
3351  * input: alg name, feature name
3352  * return: NULL (alg name was "")
3353  *         ERR_PTR(error) if something goes wrong
3354  *         or the crypto hash ptr, if it worked out ok. */
3355 static
3356 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3357 		const char *alg, const char *name)
3358 {
3359 	struct crypto_hash *tfm;
3360 
3361 	if (!alg[0])
3362 		return NULL;
3363 
3364 	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3365 	if (IS_ERR(tfm)) {
3366 		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3367 			alg, name, PTR_ERR(tfm));
3368 		return tfm;
3369 	}
3370 	return tfm;
3371 }
3372 
3373 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3374 {
3375 	void *buffer = connection->data.rbuf;
3376 	int size = pi->size;
3377 
3378 	while (size) {
3379 		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3380 		s = drbd_recv(connection, buffer, s);
3381 		if (s <= 0) {
3382 			if (s < 0)
3383 				return s;
3384 			break;
3385 		}
3386 		size -= s;
3387 	}
3388 	if (size)
3389 		return -EIO;
3390 	return 0;
3391 }
3392 
3393 /*
3394  * config_unknown_volume  -  device configuration command for unknown volume
3395  *
3396  * When a device is added to an existing connection, the node on which the
3397  * device is added first will send configuration commands to its peer but the
3398  * peer will not know about the device yet.  It will warn and ignore these
3399  * commands.  Once the device is added on the second node, the second node will
3400  * send the same device configuration commands, but in the other direction.
3401  *
3402  * (We can also end up here if drbd is misconfigured.)
3403  */
3404 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3405 {
3406 	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3407 		  cmdname(pi->cmd), pi->vnr);
3408 	return ignore_remaining_packet(connection, pi);
3409 }
3410 
3411 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3412 {
3413 	struct drbd_peer_device *peer_device;
3414 	struct drbd_device *device;
3415 	struct p_rs_param_95 *p;
3416 	unsigned int header_size, data_size, exp_max_sz;
3417 	struct crypto_hash *verify_tfm = NULL;
3418 	struct crypto_hash *csums_tfm = NULL;
3419 	struct net_conf *old_net_conf, *new_net_conf = NULL;
3420 	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3421 	const int apv = connection->agreed_pro_version;
3422 	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3423 	int fifo_size = 0;
3424 	int err;
3425 
3426 	peer_device = conn_peer_device(connection, pi->vnr);
3427 	if (!peer_device)
3428 		return config_unknown_volume(connection, pi);
3429 	device = peer_device->device;
3430 
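	/* The SyncParam packet layout grew with the protocol version: up to
	 * apv 87 it only carries the resync rate, apv 88 appends the verify-alg
	 * name as payload, apv 89..94 use fixed-size algorithm name fields, and
	 * apv 95+ additionally carries the dynamic resync controller settings
	 * (c_plan_ahead, c_delay_target, c_fill_target, c_max_rate). */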
3431 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3432 		    : apv == 88 ? sizeof(struct p_rs_param)
3433 					+ SHARED_SECRET_MAX
3434 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
3435 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3436 
3437 	if (pi->size > exp_max_sz) {
3438 		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3439 		    pi->size, exp_max_sz);
3440 		return -EIO;
3441 	}
3442 
3443 	if (apv <= 88) {
3444 		header_size = sizeof(struct p_rs_param);
3445 		data_size = pi->size - header_size;
3446 	} else if (apv <= 94) {
3447 		header_size = sizeof(struct p_rs_param_89);
3448 		data_size = pi->size - header_size;
3449 		D_ASSERT(device, data_size == 0);
3450 	} else {
3451 		header_size = sizeof(struct p_rs_param_95);
3452 		data_size = pi->size - header_size;
3453 		D_ASSERT(device, data_size == 0);
3454 	}
3455 
3456 	/* initialize verify_alg and csums_alg */
3457 	p = pi->data;
3458 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3459 
3460 	err = drbd_recv_all(peer_device->connection, p, header_size);
3461 	if (err)
3462 		return err;
3463 
3464 	mutex_lock(&connection->resource->conf_update);
3465 	old_net_conf = peer_device->connection->net_conf;
3466 	if (get_ldev(device)) {
3467 		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3468 		if (!new_disk_conf) {
3469 			put_ldev(device);
3470 			mutex_unlock(&connection->resource->conf_update);
3471 			drbd_err(device, "Allocation of new disk_conf failed\n");
3472 			return -ENOMEM;
3473 		}
3474 
3475 		old_disk_conf = device->ldev->disk_conf;
3476 		*new_disk_conf = *old_disk_conf;
3477 
3478 		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3479 	}
3480 
3481 	if (apv >= 88) {
3482 		if (apv == 88) {
3483 			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
				drbd_err(device, "verify-alg of wrong size, "
					"peer wants %u, accepting only up to %u bytes\n",
					data_size, SHARED_SECRET_MAX);
3487 				err = -EIO;
3488 				goto reconnect;
3489 			}
3490 
3491 			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3492 			if (err)
3493 				goto reconnect;
3494 			/* we expect NUL terminated string */
3495 			/* but just in case someone tries to be evil */
3496 			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3497 			p->verify_alg[data_size-1] = 0;
3498 
3499 		} else /* apv >= 89 */ {
3500 			/* we still expect NUL terminated strings */
3501 			/* but just in case someone tries to be evil */
3502 			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3503 			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3504 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3505 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3506 		}
3507 
3508 		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3509 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3510 				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3511 				    old_net_conf->verify_alg, p->verify_alg);
3512 				goto disconnect;
3513 			}
3514 			verify_tfm = drbd_crypto_alloc_digest_safe(device,
3515 					p->verify_alg, "verify-alg");
3516 			if (IS_ERR(verify_tfm)) {
3517 				verify_tfm = NULL;
3518 				goto disconnect;
3519 			}
3520 		}
3521 
3522 		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3523 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3524 				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3525 				    old_net_conf->csums_alg, p->csums_alg);
3526 				goto disconnect;
3527 			}
3528 			csums_tfm = drbd_crypto_alloc_digest_safe(device,
3529 					p->csums_alg, "csums-alg");
3530 			if (IS_ERR(csums_tfm)) {
3531 				csums_tfm = NULL;
3532 				goto disconnect;
3533 			}
3534 		}
3535 
3536 		if (apv > 94 && new_disk_conf) {
3537 			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3538 			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3539 			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3540 			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3541 
3542 			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3543 			if (fifo_size != device->rs_plan_s->size) {
3544 				new_plan = fifo_alloc(fifo_size);
3545 				if (!new_plan) {
					drbd_err(device, "kmalloc of fifo_buffer failed\n");
3547 					put_ldev(device);
3548 					goto disconnect;
3549 				}
3550 			}
3551 		}
3552 
3553 		if (verify_tfm || csums_tfm) {
3554 			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3555 			if (!new_net_conf) {
3556 				drbd_err(device, "Allocation of new net_conf failed\n");
3557 				goto disconnect;
3558 			}
3559 
3560 			*new_net_conf = *old_net_conf;
3561 
3562 			if (verify_tfm) {
3563 				strcpy(new_net_conf->verify_alg, p->verify_alg);
3564 				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3565 				crypto_free_hash(peer_device->connection->verify_tfm);
3566 				peer_device->connection->verify_tfm = verify_tfm;
3567 				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3568 			}
3569 			if (csums_tfm) {
3570 				strcpy(new_net_conf->csums_alg, p->csums_alg);
3571 				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3572 				crypto_free_hash(peer_device->connection->csums_tfm);
3573 				peer_device->connection->csums_tfm = csums_tfm;
3574 				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3575 			}
3576 			rcu_assign_pointer(connection->net_conf, new_net_conf);
3577 		}
3578 	}
3579 
3580 	if (new_disk_conf) {
3581 		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3582 		put_ldev(device);
3583 	}
3584 
3585 	if (new_plan) {
3586 		old_plan = device->rs_plan_s;
3587 		rcu_assign_pointer(device->rs_plan_s, new_plan);
3588 	}
3589 
3590 	mutex_unlock(&connection->resource->conf_update);
3591 	synchronize_rcu();
3592 	if (new_net_conf)
3593 		kfree(old_net_conf);
3594 	kfree(old_disk_conf);
3595 	kfree(old_plan);
3596 
3597 	return 0;
3598 
3599 reconnect:
3600 	if (new_disk_conf) {
3601 		put_ldev(device);
3602 		kfree(new_disk_conf);
3603 	}
3604 	mutex_unlock(&connection->resource->conf_update);
3605 	return -EIO;
3606 
3607 disconnect:
3608 	kfree(new_plan);
3609 	if (new_disk_conf) {
3610 		put_ldev(device);
3611 		kfree(new_disk_conf);
3612 	}
3613 	mutex_unlock(&connection->resource->conf_update);
	/* This path can be reached with csums_tfm and/or verify_tfm already
	 * allocated (e.g. when a later allocation failed), or still NULL;
	 * crypto_free_hash() copes with both, so free them here. */
	crypto_free_hash(csums_tfm);
	crypto_free_hash(verify_tfm);
3619 	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3620 	return -EIO;
3621 }
3622 
3623 /* warn if the arguments differ by more than 12.5% */
3624 static void warn_if_differ_considerably(struct drbd_device *device,
3625 	const char *s, sector_t a, sector_t b)
3626 {
3627 	sector_t d;
3628 	if (a == 0 || b == 0)
3629 		return;
3630 	d = (a > b) ? (a - b) : (b - a);
3631 	if (d > (a>>3) || d > (b>>3))
3632 		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3633 		     (unsigned long long)a, (unsigned long long)b);
3634 }
3635 
3636 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3637 {
3638 	struct drbd_peer_device *peer_device;
3639 	struct drbd_device *device;
3640 	struct p_sizes *p = pi->data;
3641 	enum determine_dev_size dd = DS_UNCHANGED;
3642 	sector_t p_size, p_usize, my_usize;
3643 	int ldsc = 0; /* local disk size changed */
3644 	enum dds_flags ddsf;
3645 
3646 	peer_device = conn_peer_device(connection, pi->vnr);
3647 	if (!peer_device)
3648 		return config_unknown_volume(connection, pi);
3649 	device = peer_device->device;
3650 
3651 	p_size = be64_to_cpu(p->d_size);
3652 	p_usize = be64_to_cpu(p->u_size);
3653 
3654 	/* just store the peer's disk size for now.
3655 	 * we still need to figure out whether we accept that. */
3656 	device->p_size = p_size;
3657 
3658 	if (get_ldev(device)) {
3659 		rcu_read_lock();
3660 		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3661 		rcu_read_unlock();
3662 
3663 		warn_if_differ_considerably(device, "lower level device sizes",
3664 			   p_size, drbd_get_max_capacity(device->ldev));
3665 		warn_if_differ_considerably(device, "user requested size",
3666 					    p_usize, my_usize);
3667 
3668 		/* if this is the first connect, or an otherwise expected
3669 		 * param exchange, choose the minimum */
3670 		if (device->state.conn == C_WF_REPORT_PARAMS)
3671 			p_usize = min_not_zero(my_usize, p_usize);
3672 
3673 		/* Never shrink a device with usable data during connect.
3674 		   But allow online shrinking if we are connected. */
3675 		if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3676 		    drbd_get_capacity(device->this_bdev) &&
3677 		    device->state.disk >= D_OUTDATED &&
3678 		    device->state.conn < C_CONNECTED) {
3679 			drbd_err(device, "The peer's disk size is too small!\n");
3680 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3681 			put_ldev(device);
3682 			return -EIO;
3683 		}
3684 
3685 		if (my_usize != p_usize) {
3686 			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3687 
3688 			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3689 			if (!new_disk_conf) {
3690 				drbd_err(device, "Allocation of new disk_conf failed\n");
3691 				put_ldev(device);
3692 				return -ENOMEM;
3693 			}
3694 
3695 			mutex_lock(&connection->resource->conf_update);
3696 			old_disk_conf = device->ldev->disk_conf;
3697 			*new_disk_conf = *old_disk_conf;
3698 			new_disk_conf->disk_size = p_usize;
3699 
3700 			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3701 			mutex_unlock(&connection->resource->conf_update);
3702 			synchronize_rcu();
3703 			kfree(old_disk_conf);
3704 
			drbd_info(device, "Peer sets u_size to %lu sectors (old: %lu)\n",
				 (unsigned long)p_usize, (unsigned long)my_usize);
3707 		}
3708 
3709 		put_ldev(device);
3710 	}
3711 
3712 	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3713 	drbd_reconsider_max_bio_size(device);
3714 	/* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
3715 	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3716 	   drbd_reconsider_max_bio_size(), we can be sure that after
3717 	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3718 
3719 	ddsf = be16_to_cpu(p->dds_flags);
3720 	if (get_ldev(device)) {
3721 		dd = drbd_determine_dev_size(device, ddsf, NULL);
3722 		put_ldev(device);
3723 		if (dd == DS_ERROR)
3724 			return -EIO;
3725 		drbd_md_sync(device);
3726 	} else {
3727 		/* I am diskless, need to accept the peer's size. */
3728 		drbd_set_my_capacity(device, p_size);
3729 	}
3730 
3731 	if (get_ldev(device)) {
3732 		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3733 			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3734 			ldsc = 1;
3735 		}
3736 
3737 		put_ldev(device);
3738 	}
3739 
3740 	if (device->state.conn > C_WF_REPORT_PARAMS) {
3741 		if (be64_to_cpu(p->c_size) !=
3742 		    drbd_get_capacity(device->this_bdev) || ldsc) {
3743 			/* we have different sizes, probably peer
3744 			 * needs to know my new size... */
3745 			drbd_send_sizes(peer_device, 0, ddsf);
3746 		}
3747 		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3748 		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3749 			if (device->state.pdsk >= D_INCONSISTENT &&
3750 			    device->state.disk >= D_INCONSISTENT) {
3751 				if (ddsf & DDSF_NO_RESYNC)
3752 					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3753 				else
3754 					resync_after_online_grow(device);
3755 			} else
3756 				set_bit(RESYNC_AFTER_NEG, &device->flags);
3757 		}
3758 	}
3759 
3760 	return 0;
3761 }
3762 
3763 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3764 {
3765 	struct drbd_peer_device *peer_device;
3766 	struct drbd_device *device;
3767 	struct p_uuids *p = pi->data;
3768 	u64 *p_uuid;
3769 	int i, updated_uuids = 0;
3770 
3771 	peer_device = conn_peer_device(connection, pi->vnr);
3772 	if (!peer_device)
3773 		return config_unknown_volume(connection, pi);
3774 	device = peer_device->device;
3775 
3776 	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3777 	if (!p_uuid) {
3778 		drbd_err(device, "kmalloc of p_uuid failed\n");
		return -ENOMEM;
3780 	}
3781 
3782 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3783 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
3784 
3785 	kfree(device->p_uuid);
3786 	device->p_uuid = p_uuid;
3787 
3788 	if (device->state.conn < C_CONNECTED &&
3789 	    device->state.disk < D_INCONSISTENT &&
3790 	    device->state.role == R_PRIMARY &&
3791 	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3792 		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3793 		    (unsigned long long)device->ed_uuid);
3794 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3795 		return -EIO;
3796 	}
3797 
3798 	if (get_ldev(device)) {
3799 		int skip_initial_sync =
3800 			device->state.conn == C_CONNECTED &&
3801 			peer_device->connection->agreed_pro_version >= 90 &&
3802 			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3803 			(p_uuid[UI_FLAGS] & 8);
3804 		if (skip_initial_sync) {
3805 			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3806 			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3807 					"clear_n_write from receive_uuids",
3808 					BM_LOCKED_TEST_ALLOWED);
3809 			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3810 			_drbd_uuid_set(device, UI_BITMAP, 0);
3811 			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3812 					CS_VERBOSE, NULL);
3813 			drbd_md_sync(device);
3814 			updated_uuids = 1;
3815 		}
3816 		put_ldev(device);
3817 	} else if (device->state.disk < D_INCONSISTENT &&
3818 		   device->state.role == R_PRIMARY) {
3819 		/* I am a diskless primary, the peer just created a new current UUID
3820 		   for me. */
3821 		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3822 	}
3823 
	/* Before we test for the disk state, we should wait until any possibly
	   ongoing cluster wide state change has finished. That is important if
	   we are primary and are detaching from our disk: we need to see the
	   new disk state... */
3828 	mutex_lock(device->state_mutex);
3829 	mutex_unlock(device->state_mutex);
3830 	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3831 		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3832 
3833 	if (updated_uuids)
3834 		drbd_print_uuids(device, "receiver updated UUIDs to");
3835 
3836 	return 0;
3837 }
3838 
3839 /**
3840  * convert_state() - Converts the peer's view of the cluster state to our point of view
3841  * @ps:		The state as seen by the peer.
3842  */
3843 static union drbd_state convert_state(union drbd_state ps)
3844 {
3845 	union drbd_state ms;
3846 
3847 	static enum drbd_conns c_tab[] = {
3848 		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3849 		[C_CONNECTED] = C_CONNECTED,
3850 
3851 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3852 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3853 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3854 		[C_VERIFY_S]       = C_VERIFY_T,
3855 		[C_MASK]   = C_MASK,
3856 	};
3857 
3858 	ms.i = ps.i;
3859 
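	/* mirror the peer's view: its role/disk are our peer/pdsk, and vice versa */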
3860 	ms.conn = c_tab[ps.conn];
3861 	ms.peer = ps.role;
3862 	ms.role = ps.peer;
3863 	ms.pdsk = ps.disk;
3864 	ms.disk = ps.pdsk;
3865 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3866 
3867 	return ms;
3868 }
3869 
3870 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
3871 {
3872 	struct drbd_peer_device *peer_device;
3873 	struct drbd_device *device;
3874 	struct p_req_state *p = pi->data;
3875 	union drbd_state mask, val;
3876 	enum drbd_state_rv rv;
3877 
3878 	peer_device = conn_peer_device(connection, pi->vnr);
3879 	if (!peer_device)
3880 		return -EIO;
3881 	device = peer_device->device;
3882 
3883 	mask.i = be32_to_cpu(p->mask);
3884 	val.i = be32_to_cpu(p->val);
3885 
3886 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
3887 	    mutex_is_locked(device->state_mutex)) {
3888 		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
3889 		return 0;
3890 	}
3891 
3892 	mask = convert_state(mask);
3893 	val = convert_state(val);
3894 
3895 	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
3896 	drbd_send_sr_reply(peer_device, rv);
3897 
3898 	drbd_md_sync(device);
3899 
3900 	return 0;
3901 }
3902 
3903 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
3904 {
3905 	struct p_req_state *p = pi->data;
3906 	union drbd_state mask, val;
3907 	enum drbd_state_rv rv;
3908 
3909 	mask.i = be32_to_cpu(p->mask);
3910 	val.i = be32_to_cpu(p->val);
3911 
3912 	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
3913 	    mutex_is_locked(&connection->cstate_mutex)) {
3914 		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
3915 		return 0;
3916 	}
3917 
3918 	mask = convert_state(mask);
3919 	val = convert_state(val);
3920 
3921 	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3922 	conn_send_sr_reply(connection, rv);
3923 
3924 	return 0;
3925 }
3926 
3927 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
3928 {
3929 	struct drbd_peer_device *peer_device;
3930 	struct drbd_device *device;
3931 	struct p_state *p = pi->data;
3932 	union drbd_state os, ns, peer_state;
3933 	enum drbd_disk_state real_peer_disk;
3934 	enum chg_state_flags cs_flags;
3935 	int rv;
3936 
3937 	peer_device = conn_peer_device(connection, pi->vnr);
3938 	if (!peer_device)
3939 		return config_unknown_volume(connection, pi);
3940 	device = peer_device->device;
3941 
3942 	peer_state.i = be32_to_cpu(p->state);
3943 
3944 	real_peer_disk = peer_state.disk;
3945 	if (peer_state.disk == D_NEGOTIATING) {
3946 		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3947 		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3948 	}
3949 
3950 	spin_lock_irq(&device->resource->req_lock);
3951  retry:
3952 	os = ns = drbd_read_state(device);
3953 	spin_unlock_irq(&device->resource->req_lock);
3954 
3955 	/* If some other part of the code (asender thread, timeout)
3956 	 * already decided to close the connection again,
3957 	 * we must not "re-establish" it here. */
3958 	if (os.conn <= C_TEAR_DOWN)
3959 		return -ECONNRESET;
3960 
	/* If this is the "end of sync" confirmation, usually the peer disk
	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For an empty
	 * (0 bits set) resync started in PausedSyncT, or if the timing of
	 * pause-/unpause-sync events has been "just right", the peer disk may
	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
	 */
3967 	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3968 	    real_peer_disk == D_UP_TO_DATE &&
3969 	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3970 		/* If we are (becoming) SyncSource, but peer is still in sync
3971 		 * preparation, ignore its uptodate-ness to avoid flapping, it
3972 		 * will change to inconsistent once the peer reaches active
3973 		 * syncing states.
3974 		 * It may have changed syncer-paused flags, however, so we
3975 		 * cannot ignore this completely. */
3976 		if (peer_state.conn > C_CONNECTED &&
3977 		    peer_state.conn < C_SYNC_SOURCE)
3978 			real_peer_disk = D_INCONSISTENT;
3979 
3980 		/* if peer_state changes to connected at the same time,
3981 		 * it explicitly notifies us that it finished resync.
3982 		 * Maybe we should finish it up, too? */
3983 		else if (os.conn >= C_SYNC_SOURCE &&
3984 			 peer_state.conn == C_CONNECTED) {
3985 			if (drbd_bm_total_weight(device) <= device->rs_failed)
3986 				drbd_resync_finished(device);
3987 			return 0;
3988 		}
3989 	}
3990 
3991 	/* explicit verify finished notification, stop sector reached. */
3992 	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3993 	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3994 		ov_out_of_sync_print(device);
3995 		drbd_resync_finished(device);
3996 		return 0;
3997 	}
3998 
3999 	/* peer says his disk is inconsistent, while we think it is uptodate,
4000 	 * and this happens while the peer still thinks we have a sync going on,
4001 	 * but we think we are already done with the sync.
4002 	 * We ignore this to avoid flapping pdsk.
4003 	 * This should not happen, if the peer is a recent version of drbd. */
4004 	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4005 	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4006 		real_peer_disk = D_UP_TO_DATE;
4007 
4008 	if (ns.conn == C_WF_REPORT_PARAMS)
4009 		ns.conn = C_CONNECTED;
4010 
4011 	if (peer_state.conn == C_AHEAD)
4012 		ns.conn = C_BEHIND;
4013 
4014 	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4015 	    get_ldev_if_state(device, D_NEGOTIATING)) {
4016 		int cr; /* consider resync */
4017 
4018 		/* if we established a new connection */
4019 		cr  = (os.conn < C_CONNECTED);
4020 		/* if we had an established connection
4021 		 * and one of the nodes newly attaches a disk */
4022 		cr |= (os.conn == C_CONNECTED &&
4023 		       (peer_state.disk == D_NEGOTIATING ||
4024 			os.disk == D_NEGOTIATING));
4025 		/* if we have both been inconsistent, and the peer has been
4026 		 * forced to be UpToDate with --overwrite-data */
4027 		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4028 		/* if we had been plain connected, and the admin requested to
4029 		 * start a sync by "invalidate" or "invalidate-remote" */
4030 		cr |= (os.conn == C_CONNECTED &&
4031 				(peer_state.conn >= C_STARTING_SYNC_S &&
4032 				 peer_state.conn <= C_WF_BITMAP_T));
4033 
4034 		if (cr)
4035 			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4036 
4037 		put_ldev(device);
4038 		if (ns.conn == C_MASK) {
4039 			ns.conn = C_CONNECTED;
4040 			if (device->state.disk == D_NEGOTIATING) {
4041 				drbd_force_state(device, NS(disk, D_FAILED));
4042 			} else if (peer_state.disk == D_NEGOTIATING) {
4043 				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4044 				peer_state.disk = D_DISKLESS;
4045 				real_peer_disk = D_DISKLESS;
4046 			} else {
4047 				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4048 					return -EIO;
4049 				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4050 				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4051 				return -EIO;
4052 			}
4053 		}
4054 	}
4055 
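	/* Re-check under the request lock: if the local state changed while we
	 * evaluated it without the lock, start over from a fresh snapshot. */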
4056 	spin_lock_irq(&device->resource->req_lock);
4057 	if (os.i != drbd_read_state(device).i)
4058 		goto retry;
4059 	clear_bit(CONSIDER_RESYNC, &device->flags);
4060 	ns.peer = peer_state.role;
4061 	ns.pdsk = real_peer_disk;
4062 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4063 	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4064 		ns.disk = device->new_state_tmp.disk;
4065 	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4066 	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4067 	    test_bit(NEW_CUR_UUID, &device->flags)) {
		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
		   for temporary network outages! */
4070 		spin_unlock_irq(&device->resource->req_lock);
4071 		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4072 		tl_clear(peer_device->connection);
4073 		drbd_uuid_new_current(device);
4074 		clear_bit(NEW_CUR_UUID, &device->flags);
4075 		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4076 		return -EIO;
4077 	}
4078 	rv = _drbd_set_state(device, ns, cs_flags, NULL);
4079 	ns = drbd_read_state(device);
4080 	spin_unlock_irq(&device->resource->req_lock);
4081 
4082 	if (rv < SS_SUCCESS) {
4083 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4084 		return -EIO;
4085 	}
4086 
4087 	if (os.conn > C_WF_REPORT_PARAMS) {
4088 		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4089 		    peer_state.disk != D_NEGOTIATING ) {
4090 			/* we want resync, peer has not yet decided to sync... */
4091 			/* Nowadays only used when forcing a node into primary role and
4092 			   setting its disk to UpToDate with that */
4093 			drbd_send_uuids(peer_device);
4094 			drbd_send_current_state(peer_device);
4095 		}
4096 	}
4097 
4098 	clear_bit(DISCARD_MY_DATA, &device->flags);
4099 
4100 	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4101 
4102 	return 0;
4103 }
4104 
4105 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4106 {
4107 	struct drbd_peer_device *peer_device;
4108 	struct drbd_device *device;
4109 	struct p_rs_uuid *p = pi->data;
4110 
4111 	peer_device = conn_peer_device(connection, pi->vnr);
4112 	if (!peer_device)
4113 		return -EIO;
4114 	device = peer_device->device;
4115 
4116 	wait_event(device->misc_wait,
4117 		   device->state.conn == C_WF_SYNC_UUID ||
4118 		   device->state.conn == C_BEHIND ||
4119 		   device->state.conn < C_CONNECTED ||
4120 		   device->state.disk < D_NEGOTIATING);
4121 
4122 	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4123 
4124 	/* Here the _drbd_uuid_ functions are right, current should
4125 	   _not_ be rotated into the history */
4126 	if (get_ldev_if_state(device, D_NEGOTIATING)) {
4127 		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4128 		_drbd_uuid_set(device, UI_BITMAP, 0UL);
4129 
4130 		drbd_print_uuids(device, "updated sync uuid");
4131 		drbd_start_resync(device, C_SYNC_TARGET);
4132 
4133 		put_ldev(device);
4134 	} else
4135 		drbd_err(device, "Ignoring SyncUUID packet!\n");
4136 
4137 	return 0;
4138 }
4139 
4140 /**
4141  * receive_bitmap_plain
4142  *
4143  * Return 0 when done, 1 when another iteration is needed, and a negative error
4144  * code upon failure.
4145  */
4146 static int
4147 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4148 		     unsigned long *p, struct bm_xfer_ctx *c)
4149 {
4150 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4151 				 drbd_header_size(peer_device->connection);
4152 	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4153 				       c->bm_words - c->word_offset);
4154 	unsigned int want = num_words * sizeof(*p);
4155 	int err;
4156 
4157 	if (want != size) {
4158 		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4159 		return -EIO;
4160 	}
4161 	if (want == 0)
4162 		return 0;
4163 	err = drbd_recv_all(peer_device->connection, p, want);
4164 	if (err)
4165 		return err;
4166 
4167 	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4168 
4169 	c->word_offset += num_words;
4170 	c->bit_offset = c->word_offset * BITS_PER_LONG;
4171 	if (c->bit_offset > c->bm_bits)
4172 		c->bit_offset = c->bm_bits;
4173 
4174 	return 1;
4175 }
4176 
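/* The p_compressed_bm encoding byte packs the bitmap code into its low nibble,
 * the number of pad bits into bits 4..6, and the initial toggle value into bit 7. */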
4177 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4178 {
4179 	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4180 }
4181 
4182 static int dcbp_get_start(struct p_compressed_bm *p)
4183 {
4184 	return (p->encoding & 0x80) != 0;
4185 }
4186 
4187 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4188 {
4189 	return (p->encoding >> 4) & 0x7;
4190 }
4191 
4192 /**
4193  * recv_bm_rle_bits
4194  *
4195  * Return 0 when done, 1 when another iteration is needed, and a negative error
4196  * code upon failure.
4197  */
4198 static int
4199 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4200 		struct p_compressed_bm *p,
4201 		 struct bm_xfer_ctx *c,
4202 		 unsigned int len)
4203 {
4204 	struct bitstream bs;
4205 	u64 look_ahead;
4206 	u64 rl;
4207 	u64 tmp;
4208 	unsigned long s = c->bit_offset;
4209 	unsigned long e;
4210 	int toggle = dcbp_get_start(p);
4211 	int have;
4212 	int bits;
4213 
4214 	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4215 
4216 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
4217 	if (bits < 0)
4218 		return -EIO;
4219 
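	/* Decode alternating run lengths: each VLI-decoded value rl describes a
	 * run of "toggle" bits starting at bit offset s.  Only runs of set bits
	 * are applied to the bitmap; runs of clear bits are merely skipped. */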
4220 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
4221 		bits = vli_decode_bits(&rl, look_ahead);
4222 		if (bits <= 0)
4223 			return -EIO;
4224 
4225 		if (toggle) {
4226 			e = s + rl -1;
4227 			if (e >= c->bm_bits) {
4228 				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4229 				return -EIO;
4230 			}
4231 			_drbd_bm_set_bits(peer_device->device, s, e);
4232 		}
4233 
4234 		if (have < bits) {
4235 			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4236 				have, bits, look_ahead,
4237 				(unsigned int)(bs.cur.b - p->code),
4238 				(unsigned int)bs.buf_len);
4239 			return -EIO;
4240 		}
4241 		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4242 		if (likely(bits < 64))
4243 			look_ahead >>= bits;
4244 		else
4245 			look_ahead = 0;
4246 		have -= bits;
4247 
4248 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4249 		if (bits < 0)
4250 			return -EIO;
4251 		look_ahead |= tmp << have;
4252 		have += bits;
4253 	}
4254 
4255 	c->bit_offset = s;
4256 	bm_xfer_ctx_bit_to_word_offset(c);
4257 
4258 	return (s != c->bm_bits);
4259 }
4260 
4261 /**
4262  * decode_bitmap_c
4263  *
4264  * Return 0 when done, 1 when another iteration is needed, and a negative error
4265  * code upon failure.
4266  */
4267 static int
4268 decode_bitmap_c(struct drbd_peer_device *peer_device,
4269 		struct p_compressed_bm *p,
4270 		struct bm_xfer_ctx *c,
4271 		unsigned int len)
4272 {
4273 	if (dcbp_get_code(p) == RLE_VLI_Bits)
4274 		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4275 
4276 	/* other variants had been implemented for evaluation,
4277 	 * but have been dropped as this one turned out to be "best"
4278 	 * during all our tests. */
4279 
4280 	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4281 	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4282 	return -EIO;
4283 }
4284 
4285 void INFO_bm_xfer_stats(struct drbd_device *device,
4286 		const char *direction, struct bm_xfer_ctx *c)
4287 {
4288 	/* what would it take to transfer it "plaintext" */
4289 	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4290 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4291 	unsigned int plain =
4292 		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4293 		c->bm_words * sizeof(unsigned long);
4294 	unsigned int total = c->bytes[0] + c->bytes[1];
4295 	unsigned int r;
4296 
	/* total cannot be zero, but just in case: */
4298 	if (total == 0)
4299 		return;
4300 
4301 	/* don't report if not compressed */
4302 	if (total >= plain)
4303 		return;
4304 
4305 	/* total < plain. check for overflow, still */
4306 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4307 		                    : (1000 * total / plain);
4308 
4309 	if (r > 1000)
4310 		r = 1000;
4311 
4312 	r = 1000 - r;
4313 	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4314 	     "total %u; compression: %u.%u%%\n",
4315 			direction,
4316 			c->bytes[1], c->packets[1],
4317 			c->bytes[0], c->packets[0],
4318 			total, r/10, r % 10);
4319 }
4320 
/* Since we are processing the bitfield from lower addresses to higher,
   it does not matter whether we process it in 32 bit or 64 bit chunks,
   as long as it is little endian. (Understand it as a byte stream,
   beginning with the lowest byte...) If we used big endian
   we would need to process it from the highest address to the lowest,
   in order to be agnostic to the 32 vs 64 bit issue.

   Returns 0 on success, a negative error code otherwise. */
4329 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4330 {
4331 	struct drbd_peer_device *peer_device;
4332 	struct drbd_device *device;
4333 	struct bm_xfer_ctx c;
4334 	int err;
4335 
4336 	peer_device = conn_peer_device(connection, pi->vnr);
4337 	if (!peer_device)
4338 		return -EIO;
4339 	device = peer_device->device;
4340 
4341 	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4342 	/* you are supposed to send additional out-of-sync information
4343 	 * if you actually set bits during this phase */
4344 
4345 	c = (struct bm_xfer_ctx) {
4346 		.bm_bits = drbd_bm_bits(device),
4347 		.bm_words = drbd_bm_words(device),
4348 	};
4349 
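	/* Consume consecutive P_BITMAP / P_COMPRESSED_BITMAP packets until the
	 * plain or RLE decoder signals completion (0) or an error (< 0). */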
4350 	for(;;) {
4351 		if (pi->cmd == P_BITMAP)
4352 			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4353 		else if (pi->cmd == P_COMPRESSED_BITMAP) {
4354 			/* MAYBE: sanity check that we speak proto >= 90,
4355 			 * and the feature is enabled! */
4356 			struct p_compressed_bm *p = pi->data;
4357 
4358 			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4359 				drbd_err(device, "ReportCBitmap packet too large\n");
4360 				err = -EIO;
4361 				goto out;
4362 			}
4363 			if (pi->size <= sizeof(*p)) {
4364 				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4365 				err = -EIO;
4366 				goto out;
4367 			}
4368 			err = drbd_recv_all(peer_device->connection, p, pi->size);
4369 			if (err)
4370 			       goto out;
4371 			err = decode_bitmap_c(peer_device, p, &c, pi->size);
4372 		} else {
			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4374 			err = -EIO;
4375 			goto out;
4376 		}
4377 
4378 		c.packets[pi->cmd == P_BITMAP]++;
4379 		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4380 
4381 		if (err <= 0) {
4382 			if (err < 0)
4383 				goto out;
4384 			break;
4385 		}
4386 		err = drbd_recv_header(peer_device->connection, pi);
4387 		if (err)
4388 			goto out;
4389 	}
4390 
4391 	INFO_bm_xfer_stats(device, "receive", &c);
4392 
4393 	if (device->state.conn == C_WF_BITMAP_T) {
4394 		enum drbd_state_rv rv;
4395 
4396 		err = drbd_send_bitmap(device);
4397 		if (err)
4398 			goto out;
4399 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4400 		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4401 		D_ASSERT(device, rv == SS_SUCCESS);
4402 	} else if (device->state.conn != C_WF_BITMAP_S) {
4403 		/* admin may have requested C_DISCONNECTING,
4404 		 * other threads may have noticed network errors */
4405 		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4406 		    drbd_conn_str(device->state.conn));
4407 	}
4408 	err = 0;
4409 
4410  out:
4411 	drbd_bm_unlock(device);
4412 	if (!err && device->state.conn == C_WF_BITMAP_S)
4413 		drbd_start_resync(device, C_SYNC_SOURCE);
4414 	return err;
4415 }
4416 
4417 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4418 {
4419 	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4420 		 pi->cmd, pi->size);
4421 
4422 	return ignore_remaining_packet(connection, pi);
4423 }
4424 
4425 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4426 {
4427 	/* Make sure we've acked all the TCP data associated
4428 	 * with the data requests being unplugged */
4429 	drbd_tcp_quickack(connection->data.socket);
4430 
4431 	return 0;
4432 }
4433 
4434 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4435 {
4436 	struct drbd_peer_device *peer_device;
4437 	struct drbd_device *device;
4438 	struct p_block_desc *p = pi->data;
4439 
4440 	peer_device = conn_peer_device(connection, pi->vnr);
4441 	if (!peer_device)
4442 		return -EIO;
4443 	device = peer_device->device;
4444 
4445 	switch (device->state.conn) {
4446 	case C_WF_SYNC_UUID:
4447 	case C_WF_BITMAP_T:
4448 	case C_BEHIND:
4449 			break;
4450 	default:
4451 		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4452 				drbd_conn_str(device->state.conn));
4453 	}
4454 
4455 	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4456 
4457 	return 0;
4458 }
4459 
4460 struct data_cmd {
4461 	int expect_payload;
4462 	size_t pkt_size;
4463 	int (*fn)(struct drbd_connection *, struct packet_info *);
4464 };
4465 
4466 static struct data_cmd drbd_cmd_handler[] = {
4467 	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
4468 	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
4469 	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4470 	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4471 	[P_BITMAP]	    = { 1, 0, receive_bitmap } ,
4472 	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4473 	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4474 	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4475 	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4476 	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
4477 	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4478 	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4479 	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
4480 	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
4481 	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
4482 	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4483 	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4484 	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4485 	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4486 	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4487 	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4488 	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4489 	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4490 	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4491 	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
4492 };
4493 
4494 static void drbdd(struct drbd_connection *connection)
4495 {
4496 	struct packet_info pi;
4497 	size_t shs; /* sub header size */
4498 	int err;
4499 
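	/* Main receive loop: read a packet header, look up the handler in
	 * drbd_cmd_handler[], pull in the fixed-size sub-header, then let the
	 * handler consume whatever payload remains. */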
4500 	while (get_t_state(&connection->receiver) == RUNNING) {
4501 		struct data_cmd *cmd;
4502 
4503 		drbd_thread_current_set_cpu(&connection->receiver);
4504 		if (drbd_recv_header(connection, &pi))
4505 			goto err_out;
4506 
4507 		cmd = &drbd_cmd_handler[pi.cmd];
4508 		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
			drbd_err(connection, "Unexpected data packet %s (0x%04x)\n",
				 cmdname(pi.cmd), pi.cmd);
4511 			goto err_out;
4512 		}
4513 
4514 		shs = cmd->pkt_size;
4515 		if (pi.size > shs && !cmd->expect_payload) {
4516 			drbd_err(connection, "No payload expected %s l:%d\n",
4517 				 cmdname(pi.cmd), pi.size);
4518 			goto err_out;
4519 		}
4520 
4521 		if (shs) {
4522 			err = drbd_recv_all_warn(connection, pi.data, shs);
4523 			if (err)
4524 				goto err_out;
4525 			pi.size -= shs;
4526 		}
4527 
4528 		err = cmd->fn(connection, &pi);
4529 		if (err) {
4530 			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4531 				 cmdname(pi.cmd), err, pi.size);
4532 			goto err_out;
4533 		}
4534 	}
4535 	return;
4536 
4537     err_out:
4538 	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4539 }
4540 
4541 static void conn_disconnect(struct drbd_connection *connection)
4542 {
4543 	struct drbd_peer_device *peer_device;
4544 	enum drbd_conns oc;
4545 	int vnr;
4546 
4547 	if (connection->cstate == C_STANDALONE)
4548 		return;
4549 
4550 	/* We are about to start the cleanup after connection loss.
4551 	 * Make sure drbd_make_request knows about that.
4552 	 * Usually we should be in some network failure state already,
4553 	 * but just in case we are not, we fix it up here.
4554 	 */
4555 	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4556 
4557 	/* asender does not clean up anything. it must not interfere, either */
4558 	drbd_thread_stop(&connection->asender);
4559 	drbd_free_sock(connection);
4560 
4561 	rcu_read_lock();
4562 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4563 		struct drbd_device *device = peer_device->device;
4564 		kref_get(&device->kref);
4565 		rcu_read_unlock();
4566 		drbd_disconnected(peer_device);
4567 		kref_put(&device->kref, drbd_destroy_device);
4568 		rcu_read_lock();
4569 	}
4570 	rcu_read_unlock();
4571 
4572 	if (!list_empty(&connection->current_epoch->list))
4573 		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4574 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4575 	atomic_set(&connection->current_epoch->epoch_size, 0);
4576 	connection->send.seen_any_write_yet = false;
4577 
4578 	drbd_info(connection, "Connection closed\n");
4579 
4580 	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4581 		conn_try_outdate_peer_async(connection);
4582 
4583 	spin_lock_irq(&connection->resource->req_lock);
4584 	oc = connection->cstate;
4585 	if (oc >= C_UNCONNECTED)
4586 		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4587 
4588 	spin_unlock_irq(&connection->resource->req_lock);
4589 
4590 	if (oc == C_DISCONNECTING)
4591 		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4592 }
4593 
4594 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4595 {
4596 	struct drbd_device *device = peer_device->device;
4597 	unsigned int i;
4598 
4599 	/* wait for current activity to cease. */
4600 	spin_lock_irq(&device->resource->req_lock);
4601 	_drbd_wait_ee_list_empty(device, &device->active_ee);
4602 	_drbd_wait_ee_list_empty(device, &device->sync_ee);
4603 	_drbd_wait_ee_list_empty(device, &device->read_ee);
4604 	spin_unlock_irq(&device->resource->req_lock);
4605 
4606 	/* We do not have data structures that would allow us to
4607 	 * get the rs_pending_cnt down to 0 again.
4608 	 *  * On C_SYNC_TARGET we do not have any data structures describing
4609 	 *    the pending RSDataRequest's we have sent.
4610 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
4611 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4612 	 *  And no, it is not the sum of the reference counts in the
4613 	 *  resync_LRU. The resync_LRU tracks the whole operation including
4614 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4615 	 *  on the fly. */
4616 	drbd_rs_cancel_all(device);
4617 	device->rs_total = 0;
4618 	device->rs_failed = 0;
4619 	atomic_set(&device->rs_pending_cnt, 0);
4620 	wake_up(&device->misc_wait);
4621 
4622 	del_timer_sync(&device->resync_timer);
4623 	resync_timer_fn((unsigned long)device);
4624 
4625 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4626 	 * w_make_resync_request etc. which may still be on the worker queue
4627 	 * to be "canceled" */
4628 	drbd_flush_workqueue(&peer_device->connection->sender_work);
4629 
4630 	drbd_finish_peer_reqs(device);
4631 
4632 	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4633 	   might have issued a work again. The one before drbd_finish_peer_reqs() is
	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4635 	drbd_flush_workqueue(&peer_device->connection->sender_work);
4636 
4637 	/* need to do it again, drbd_finish_peer_reqs() may have populated it
4638 	 * again via drbd_try_clear_on_disk_bm(). */
4639 	drbd_rs_cancel_all(device);
4640 
4641 	kfree(device->p_uuid);
4642 	device->p_uuid = NULL;
4643 
4644 	if (!drbd_suspended(device))
4645 		tl_clear(peer_device->connection);
4646 
4647 	drbd_md_sync(device);
4648 
4649 	/* serialize with bitmap writeout triggered by the state change,
4650 	 * if any. */
4651 	wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4652 
4653 	/* tcp_close and release of sendpage pages can be deferred.  I don't
4654 	 * want to use SO_LINGER, because apparently it can be deferred for
4655 	 * more than 20 seconds (longest time I checked).
4656 	 *
4657 	 * Actually we don't care for exactly when the network stack does its
4658 	 * put_page(), but release our reference on these pages right here.
4659 	 */
4660 	i = drbd_free_peer_reqs(device, &device->net_ee);
4661 	if (i)
4662 		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4663 	i = atomic_read(&device->pp_in_use_by_net);
4664 	if (i)
4665 		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4666 	i = atomic_read(&device->pp_in_use);
4667 	if (i)
4668 		drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4669 
4670 	D_ASSERT(device, list_empty(&device->read_ee));
4671 	D_ASSERT(device, list_empty(&device->active_ee));
4672 	D_ASSERT(device, list_empty(&device->sync_ee));
4673 	D_ASSERT(device, list_empty(&device->done_ee));
4674 
4675 	return 0;
4676 }
4677 
4678 /*
4679  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4680  * we can agree on is stored in agreed_pro_version.
4681  *
4682  * feature flags and the reserved array should be enough room for future
4683  * enhancements of the handshake protocol, and possible plugins...
4684  *
4685  * for now, they are expected to be zero, but ignored.
4686  */
4687 static int drbd_send_features(struct drbd_connection *connection)
4688 {
4689 	struct drbd_socket *sock;
4690 	struct p_connection_features *p;
4691 
4692 	sock = &connection->data;
4693 	p = conn_prepare_command(connection, sock);
4694 	if (!p)
4695 		return -EIO;
4696 	memset(p, 0, sizeof(*p));
4697 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4698 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4699 	p->feature_flags = cpu_to_be32(PRO_FEATURES);
4700 	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4701 }
4702 
4703 /*
4704  * return values:
4705  *   1 yes, we have a valid connection
4706  *   0 oops, did not work out, please try again
4707  *  -1 peer talks different language,
4708  *     no point in trying again, please go standalone.
4709  */
4710 static int drbd_do_features(struct drbd_connection *connection)
4711 {
4712 	/* ASSERT current == connection->receiver ... */
4713 	struct p_connection_features *p;
4714 	const int expect = sizeof(struct p_connection_features);
4715 	struct packet_info pi;
4716 	int err;
4717 
4718 	err = drbd_send_features(connection);
4719 	if (err)
4720 		return 0;
4721 
4722 	err = drbd_recv_header(connection, &pi);
4723 	if (err)
4724 		return 0;
4725 
4726 	if (pi.cmd != P_CONNECTION_FEATURES) {
4727 		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4728 			 cmdname(pi.cmd), pi.cmd);
4729 		return -1;
4730 	}
4731 
4732 	if (pi.size != expect) {
4733 		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4734 		     expect, pi.size);
4735 		return -1;
4736 	}
4737 
4738 	p = pi.data;
4739 	err = drbd_recv_all_warn(connection, p, expect);
4740 	if (err)
4741 		return 0;
4742 
4743 	p->protocol_min = be32_to_cpu(p->protocol_min);
4744 	p->protocol_max = be32_to_cpu(p->protocol_max);
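	/* presumably an old peer that did not fill in protocol_max;
	 * treat 0 as "only protocol_min is supported" */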
4745 	if (p->protocol_max == 0)
4746 		p->protocol_max = p->protocol_min;
4747 
4748 	if (PRO_VERSION_MAX < p->protocol_min ||
4749 	    PRO_VERSION_MIN > p->protocol_max)
4750 		goto incompat;
4751 
4752 	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4753 	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4754 
4755 	drbd_info(connection, "Handshake successful: "
4756 	     "Agreed network protocol version %d\n", connection->agreed_pro_version);
4757 
4758 	drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4759 		  connection->agreed_features & FF_TRIM ? " " : " not ");
4760 
4761 	return 1;
4762 
4763  incompat:
4764 	drbd_err(connection, "incompatible DRBD dialects: "
4765 	    "I support %d-%d, peer supports %d-%d\n",
4766 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
4767 	    p->protocol_min, p->protocol_max);
4768 	return -1;
4769 }
4770 
4771 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4772 static int drbd_do_auth(struct drbd_connection *connection)
4773 {
	drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4775 	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4776 	return -1;
4777 }
4778 #else
4779 #define CHALLENGE_LEN 64
4780 
4781 /* Return value:
4782 	1 - auth succeeded,
4783 	0 - failed, try again (network error),
4784 	-1 - auth failed, don't try again.
4785 */
4786 
4787 static int drbd_do_auth(struct drbd_connection *connection)
4788 {
4789 	struct drbd_socket *sock;
4790 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4791 	struct scatterlist sg;
4792 	char *response = NULL;
4793 	char *right_response = NULL;
4794 	char *peers_ch = NULL;
4795 	unsigned int key_len;
4796 	char secret[SHARED_SECRET_MAX]; /* 64 byte */
4797 	unsigned int resp_size;
4798 	struct hash_desc desc;
4799 	struct packet_info pi;
4800 	struct net_conf *nc;
4801 	int err, rv;
4802 
4803 	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4804 
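	/* Challenge-response authentication: send our random challenge, receive
	 * the peer's challenge, answer it with HMAC(shared_secret, peers_ch),
	 * then verify the peer's response against HMAC(shared_secret, my_challenge). */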
4805 	rcu_read_lock();
4806 	nc = rcu_dereference(connection->net_conf);
4807 	key_len = strlen(nc->shared_secret);
4808 	memcpy(secret, nc->shared_secret, key_len);
4809 	rcu_read_unlock();
4810 
4811 	desc.tfm = connection->cram_hmac_tfm;
4812 	desc.flags = 0;
4813 
4814 	rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4815 	if (rv) {
4816 		drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
4817 		rv = -1;
4818 		goto fail;
4819 	}
4820 
4821 	get_random_bytes(my_challenge, CHALLENGE_LEN);
4822 
4823 	sock = &connection->data;
4824 	if (!conn_prepare_command(connection, sock)) {
4825 		rv = 0;
4826 		goto fail;
4827 	}
4828 	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4829 				my_challenge, CHALLENGE_LEN);
4830 	if (!rv)
4831 		goto fail;
4832 
4833 	err = drbd_recv_header(connection, &pi);
4834 	if (err) {
4835 		rv = 0;
4836 		goto fail;
4837 	}
4838 
4839 	if (pi.cmd != P_AUTH_CHALLENGE) {
4840 		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4841 			 cmdname(pi.cmd), pi.cmd);
4842 		rv = 0;
4843 		goto fail;
4844 	}
4845 
4846 	if (pi.size > CHALLENGE_LEN * 2) {
		drbd_err(connection, "AuthChallenge payload too big.\n");
4848 		rv = -1;
4849 		goto fail;
4850 	}
4851 
4852 	if (pi.size < CHALLENGE_LEN) {
4853 		drbd_err(connection, "AuthChallenge payload too small.\n");
4854 		rv = -1;
4855 		goto fail;
4856 	}
4857 
4858 	peers_ch = kmalloc(pi.size, GFP_NOIO);
4859 	if (peers_ch == NULL) {
4860 		drbd_err(connection, "kmalloc of peers_ch failed\n");
4861 		rv = -1;
4862 		goto fail;
4863 	}
4864 
4865 	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
4866 	if (err) {
4867 		rv = 0;
4868 		goto fail;
4869 	}
4870 
4871 	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
4872 		drbd_err(connection, "Peer presented the same challenge!\n");
4873 		rv = -1;
4874 		goto fail;
4875 	}
4876 
4877 	resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
4878 	response = kmalloc(resp_size, GFP_NOIO);
4879 	if (response == NULL) {
4880 		drbd_err(connection, "kmalloc of response failed\n");
4881 		rv = -1;
4882 		goto fail;
4883 	}
4884 
4885 	sg_init_table(&sg, 1);
4886 	sg_set_buf(&sg, peers_ch, pi.size);
4887 
4888 	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4889 	if (rv) {
4890 		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4891 		rv = -1;
4892 		goto fail;
4893 	}
4894 
4895 	if (!conn_prepare_command(connection, sock)) {
4896 		rv = 0;
4897 		goto fail;
4898 	}
4899 	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
4900 				response, resp_size);
4901 	if (!rv)
4902 		goto fail;
4903 
4904 	err = drbd_recv_header(connection, &pi);
4905 	if (err) {
4906 		rv = 0;
4907 		goto fail;
4908 	}
4909 
4910 	if (pi.cmd != P_AUTH_RESPONSE) {
4911 		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
4912 			 cmdname(pi.cmd), pi.cmd);
4913 		rv = 0;
4914 		goto fail;
4915 	}
4916 
4917 	if (pi.size != resp_size) {
		drbd_err(connection, "AuthResponse payload of wrong size\n");
4919 		rv = 0;
4920 		goto fail;
4921 	}
4922 
	err = drbd_recv_all_warn(connection, response, resp_size);
4924 	if (err) {
4925 		rv = 0;
4926 		goto fail;
4927 	}
4928 
4929 	right_response = kmalloc(resp_size, GFP_NOIO);
4930 	if (right_response == NULL) {
4931 		drbd_err(connection, "kmalloc of right_response failed\n");
4932 		rv = -1;
4933 		goto fail;
4934 	}
4935 
4936 	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4937 
4938 	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4939 	if (rv) {
4940 		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4941 		rv = -1;
4942 		goto fail;
4943 	}
4944 
4945 	rv = !memcmp(response, right_response, resp_size);
4946 
4947 	if (rv)
4948 		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
4949 		     resp_size);
4950 	else
4951 		rv = -1;
4952 
4953  fail:
4954 	kfree(peers_ch);
4955 	kfree(response);
4956 	kfree(right_response);
4957 
4958 	return rv;
4959 }
4960 #endif
4961 
4962 int drbd_receiver(struct drbd_thread *thi)
4963 {
4964 	struct drbd_connection *connection = thi->connection;
4965 	int h;
4966 
4967 	drbd_info(connection, "receiver (re)started\n");
4968 
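	/* conn_connect(): > 0 means connected, 0 means tear down and retry after
	 * ~1 second, -1 means the peer speaks an incompatible dialect, so give up
	 * (cf. the return convention documented above drbd_do_features()). */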
4969 	do {
4970 		h = conn_connect(connection);
4971 		if (h == 0) {
4972 			conn_disconnect(connection);
4973 			schedule_timeout_interruptible(HZ);
4974 		}
4975 		if (h == -1) {
4976 			drbd_warn(connection, "Discarding network configuration.\n");
4977 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
4978 		}
4979 	} while (h == 0);
4980 
4981 	if (h > 0)
4982 		drbdd(connection);
4983 
4984 	conn_disconnect(connection);
4985 
4986 	drbd_info(connection, "receiver terminated\n");
4987 	return 0;
4988 }
4989 
4990 /* ********* acknowledge sender ******** */
4991 
4992 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
4993 {
4994 	struct p_req_state_reply *p = pi->data;
4995 	int retcode = be32_to_cpu(p->retcode);
4996 
4997 	if (retcode >= SS_SUCCESS) {
4998 		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
4999 	} else {
5000 		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5001 		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5002 			 drbd_set_st_err_str(retcode), retcode);
5003 	}
5004 	wake_up(&connection->ping_wait);
5005 
5006 	return 0;
5007 }
5008 
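/* Peer's reply to a per-device state change request.  If a connection-wide
 * change is pending (peers with agreed protocol < 100 reply this way),
 * delegate to got_conn_RqSReply(); otherwise record the outcome in the
 * device flags and wake up the waiter on state_wait.
 */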
5009 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5010 {
5011 	struct drbd_peer_device *peer_device;
5012 	struct drbd_device *device;
5013 	struct p_req_state_reply *p = pi->data;
5014 	int retcode = be32_to_cpu(p->retcode);
5015 
5016 	peer_device = conn_peer_device(connection, pi->vnr);
5017 	if (!peer_device)
5018 		return -EIO;
5019 	device = peer_device->device;
5020 
5021 	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5022 		D_ASSERT(device, connection->agreed_pro_version < 100);
5023 		return got_conn_RqSReply(connection, pi);
5024 	}
5025 
5026 	if (retcode >= SS_SUCCESS) {
5027 		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5028 	} else {
5029 		set_bit(CL_ST_CHG_FAIL, &device->flags);
5030 		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5031 			drbd_set_st_err_str(retcode), retcode);
5032 	}
5033 	wake_up(&device->state_wait);
5034 
5035 	return 0;
5036 }
5037 
5038 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5039 {
	return drbd_send_ping_ack(connection);
5042 }
5043 
5044 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5045 {
5046 	/* restore idle timeout */
	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int * HZ;
5048 	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5049 		wake_up(&connection->ping_wait);
5050 
5051 	return 0;
5052 }
5053 
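/* P_RS_IS_IN_SYNC: checksum-based resync found this block to be identical
 * on both nodes.  Mark it in sync, account it in rs_same_csum, and credit
 * the sectors towards rs_sect_in.
 */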
5054 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5055 {
5056 	struct drbd_peer_device *peer_device;
5057 	struct drbd_device *device;
5058 	struct p_block_ack *p = pi->data;
5059 	sector_t sector = be64_to_cpu(p->sector);
5060 	int blksize = be32_to_cpu(p->blksize);
5061 
5062 	peer_device = conn_peer_device(connection, pi->vnr);
5063 	if (!peer_device)
5064 		return -EIO;
5065 	device = peer_device->device;
5066 
5067 	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5068 
5069 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5070 
5071 	if (get_ldev(device)) {
5072 		drbd_rs_complete_io(device, sector);
5073 		drbd_set_in_sync(device, sector, blksize);
5074 		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5075 		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5076 		put_ldev(device);
5077 	}
5078 	dec_rs_pending(device);
5079 	atomic_add(blksize >> 9, &device->rs_sect_in);
5080 
5081 	return 0;
5082 }
5083 
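/* Look up the request identified by (id, sector) in the given tree and
 * apply the request state transition "what" under req_lock.  If that
 * completes the master bio, complete it after dropping the lock.
 * Returns -EIO if the request cannot be found.
 */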
5084 static int
5085 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5086 			      struct rb_root *root, const char *func,
5087 			      enum drbd_req_event what, bool missing_ok)
5088 {
5089 	struct drbd_request *req;
5090 	struct bio_and_error m;
5091 
5092 	spin_lock_irq(&device->resource->req_lock);
5093 	req = find_request(device, root, id, sector, missing_ok, func);
5094 	if (unlikely(!req)) {
5095 		spin_unlock_irq(&device->resource->req_lock);
5096 		return -EIO;
5097 	}
5098 	__req_mod(req, what, &m);
5099 	spin_unlock_irq(&device->resource->req_lock);
5100 
5101 	if (m.bio)
5102 		complete_master_bio(device, &m);
5103 	return 0;
5104 }
5105 
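/* Positive acknowledgement for a (resync) write.  ID_SYNCER marks a
 * resync request: simply set the area in sync.  Everything else is an
 * application write; translate the packet type into the matching request
 * event and apply it to the request in write_requests.
 */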
5106 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5107 {
5108 	struct drbd_peer_device *peer_device;
5109 	struct drbd_device *device;
5110 	struct p_block_ack *p = pi->data;
5111 	sector_t sector = be64_to_cpu(p->sector);
5112 	int blksize = be32_to_cpu(p->blksize);
5113 	enum drbd_req_event what;
5114 
5115 	peer_device = conn_peer_device(connection, pi->vnr);
5116 	if (!peer_device)
5117 		return -EIO;
5118 	device = peer_device->device;
5119 
5120 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5121 
5122 	if (p->block_id == ID_SYNCER) {
5123 		drbd_set_in_sync(device, sector, blksize);
5124 		dec_rs_pending(device);
5125 		return 0;
5126 	}
5127 	switch (pi->cmd) {
5128 	case P_RS_WRITE_ACK:
5129 		what = WRITE_ACKED_BY_PEER_AND_SIS;
5130 		break;
5131 	case P_WRITE_ACK:
5132 		what = WRITE_ACKED_BY_PEER;
5133 		break;
5134 	case P_RECV_ACK:
5135 		what = RECV_ACKED_BY_PEER;
5136 		break;
5137 	case P_SUPERSEDED:
5138 		what = CONFLICT_RESOLVED;
5139 		break;
5140 	case P_RETRY_WRITE:
5141 		what = POSTPONE_WRITE;
5142 		break;
5143 	default:
5144 		BUG();
5145 	}
5146 
5147 	return validate_req_change_req_state(device, p->block_id, sector,
5148 					     &device->write_requests, __func__,
5149 					     what, false);
5150 }
5151 
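/* Negative acknowledgement: the peer could not complete the write.
 * For resync requests (ID_SYNCER) account a failed resync IO.  For
 * application writes, mark the request NEG_ACKED; a missing request is
 * tolerated (see the protocol A/B comment below), but the area is then
 * marked out of sync.
 */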
5152 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5153 {
5154 	struct drbd_peer_device *peer_device;
5155 	struct drbd_device *device;
5156 	struct p_block_ack *p = pi->data;
5157 	sector_t sector = be64_to_cpu(p->sector);
5158 	int size = be32_to_cpu(p->blksize);
5159 	int err;
5160 
5161 	peer_device = conn_peer_device(connection, pi->vnr);
5162 	if (!peer_device)
5163 		return -EIO;
5164 	device = peer_device->device;
5165 
5166 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5167 
5168 	if (p->block_id == ID_SYNCER) {
5169 		dec_rs_pending(device);
5170 		drbd_rs_failed_io(device, sector, size);
5171 		return 0;
5172 	}
5173 
5174 	err = validate_req_change_req_state(device, p->block_id, sector,
5175 					    &device->write_requests, __func__,
5176 					    NEG_ACKED, true);
5177 	if (err) {
		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
		 * The master bio might already be completed, therefore the
		 * request is no longer in the collision hash.
		 * In Protocol B we might already have got a P_RECV_ACK
		 * but then get a P_NEG_ACK afterwards. */
5183 		drbd_set_out_of_sync(device, sector, size);
5184 	}
5185 	return 0;
5186 }
5187 
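/* Negative acknowledgement for a data read request: the peer could not
 * read the block.  Log it and fail the request in read_requests.
 */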
5188 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5189 {
5190 	struct drbd_peer_device *peer_device;
5191 	struct drbd_device *device;
5192 	struct p_block_ack *p = pi->data;
5193 	sector_t sector = be64_to_cpu(p->sector);
5194 
5195 	peer_device = conn_peer_device(connection, pi->vnr);
5196 	if (!peer_device)
5197 		return -EIO;
5198 	device = peer_device->device;
5199 
5200 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5201 
5202 	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5203 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
5204 
5205 	return validate_req_change_req_state(device, p->block_id, sector,
5206 					     &device->read_requests, __func__,
5207 					     NEG_ACKED, false);
5208 }
5209 
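/* The peer either could not serve a resync read (P_NEG_RS_DREPLY) or
 * cancelled it (P_RS_CANCEL).  Complete the resync IO; only a real
 * failure is accounted via drbd_rs_failed_io().
 */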
5210 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5211 {
5212 	struct drbd_peer_device *peer_device;
5213 	struct drbd_device *device;
5214 	sector_t sector;
5215 	int size;
5216 	struct p_block_ack *p = pi->data;
5217 
5218 	peer_device = conn_peer_device(connection, pi->vnr);
5219 	if (!peer_device)
5220 		return -EIO;
5221 	device = peer_device->device;
5222 
5223 	sector = be64_to_cpu(p->sector);
5224 	size = be32_to_cpu(p->blksize);
5225 
5226 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5227 
5228 	dec_rs_pending(device);
5229 
5230 	if (get_ldev_if_state(device, D_FAILED)) {
5231 		drbd_rs_complete_io(device, sector);
5232 		switch (pi->cmd) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(device, sector, size);
			/* fall through */
		case P_RS_CANCEL:
			break;
5237 		default:
5238 			BUG();
5239 		}
5240 		put_ldev(device);
5241 	}
5242 
5243 	return 0;
5244 }
5245 
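/* The peer confirmed a write barrier: release the corresponding transfer
 * log epoch.  For devices in Ahead mode with no application IO in flight,
 * arm the timer that transitions them back towards SyncSource.
 */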
5246 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5247 {
5248 	struct p_barrier_ack *p = pi->data;
5249 	struct drbd_peer_device *peer_device;
5250 	int vnr;
5251 
5252 	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5253 
5254 	rcu_read_lock();
5255 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5256 		struct drbd_device *device = peer_device->device;
5257 
5258 		if (device->state.conn == C_AHEAD &&
5259 		    atomic_read(&device->ap_in_flight) == 0 &&
5260 		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5261 			device->start_resync_timer.expires = jiffies + HZ;
5262 			add_timer(&device->start_resync_timer);
5263 		}
5264 	}
5265 	rcu_read_unlock();
5266 
5267 	return 0;
5268 }
5269 
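/* Result of an online verify request.  Record blocks found out of sync,
 * update the progress marks, and once the last reply has arrived queue
 * w_ov_finished (or finish synchronously if the allocation fails).
 */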
5270 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5271 {
5272 	struct drbd_peer_device *peer_device;
5273 	struct drbd_device *device;
5274 	struct p_block_ack *p = pi->data;
5275 	struct drbd_device_work *dw;
5276 	sector_t sector;
5277 	int size;
5278 
5279 	peer_device = conn_peer_device(connection, pi->vnr);
5280 	if (!peer_device)
5281 		return -EIO;
5282 	device = peer_device->device;
5283 
5284 	sector = be64_to_cpu(p->sector);
5285 	size = be32_to_cpu(p->blksize);
5286 
5287 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5288 
5289 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5290 		drbd_ov_out_of_sync_found(device, sector, size);
5291 	else
5292 		ov_out_of_sync_print(device);
5293 
5294 	if (!get_ldev(device))
5295 		return 0;
5296 
5297 	drbd_rs_complete_io(device, sector);
5298 	dec_rs_pending(device);
5299 
5300 	--device->ov_left;
5301 
5302 	/* let's advance progress step marks only for every other megabyte */
5303 	if ((device->ov_left & 0x200) == 0x200)
5304 		drbd_advance_rs_marks(device, device->ov_left);
5305 
5306 	if (device->ov_left == 0) {
5307 		dw = kmalloc(sizeof(*dw), GFP_NOIO);
5308 		if (dw) {
5309 			dw->w.cb = w_ov_finished;
5310 			dw->device = device;
5311 			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5312 		} else {
5313 			drbd_err(device, "kmalloc(dw) failed.");
5314 			ov_out_of_sync_print(device);
5315 			drbd_resync_finished(device);
5316 		}
5317 	}
5318 	put_ldev(device);
5319 	return 0;
5320 }
5321 
5322 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5323 {
5324 	return 0;
5325 }
5326 
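/* Process the done_ee lists of all peer devices, sending the outstanding
 * acks.  Loop until every list has been observed empty; returns non-zero
 * if drbd_finish_peer_reqs() failed for any device.
 */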
5327 static int connection_finish_peer_reqs(struct drbd_connection *connection)
5328 {
5329 	struct drbd_peer_device *peer_device;
5330 	int vnr, not_empty = 0;
5331 
5332 	do {
5333 		clear_bit(SIGNAL_ASENDER, &connection->flags);
5334 		flush_signals(current);
5335 
5336 		rcu_read_lock();
5337 		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5338 			struct drbd_device *device = peer_device->device;
5339 			kref_get(&device->kref);
5340 			rcu_read_unlock();
5341 			if (drbd_finish_peer_reqs(device)) {
5342 				kref_put(&device->kref, drbd_destroy_device);
5343 				return 1;
5344 			}
5345 			kref_put(&device->kref, drbd_destroy_device);
5346 			rcu_read_lock();
5347 		}
5348 		set_bit(SIGNAL_ASENDER, &connection->flags);
5349 
5350 		spin_lock_irq(&connection->resource->req_lock);
5351 		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5352 			struct drbd_device *device = peer_device->device;
5353 			not_empty = !list_empty(&device->done_ee);
5354 			if (not_empty)
5355 				break;
5356 		}
5357 		spin_unlock_irq(&connection->resource->req_lock);
5358 		rcu_read_unlock();
5359 	} while (not_empty);
5360 
5361 	return 0;
5362 }
5363 
5364 struct asender_cmd {
5365 	size_t pkt_size;
5366 	int (*fn)(struct drbd_connection *connection, struct packet_info *);
5367 };
5368 
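/* Dispatch table for packets arriving on the meta-data socket.  pkt_size
 * is the payload size expected after the header; zero means a header-only
 * packet such as P_PING.
 */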
5369 static struct asender_cmd asender_tbl[] = {
5370 	[P_PING]	    = { 0, got_Ping },
5371 	[P_PING_ACK]	    = { 0, got_PingAck },
5372 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5373 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5374 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_SUPERSEDED]	    = { sizeof(struct p_block_ack), got_BlockAck },
5376 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
5377 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
5378 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5379 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
5380 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
5381 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5382 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5383 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5384 	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5386 	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
5387 };
5388 
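/* The "acknowledge sender" thread: runs at elevated (round-robin)
 * priority, keeps the peer alive with pings, flushes pending acks for
 * completed peer requests, and dispatches packets received on the
 * meta-data socket through asender_tbl.  Receive state: "received" bytes
 * of the currently "expect"ed packet have been read into buf; once a full
 * header is in, the payload size is validated against the table entry.
 */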
5389 int drbd_asender(struct drbd_thread *thi)
5390 {
5391 	struct drbd_connection *connection = thi->connection;
5392 	struct asender_cmd *cmd = NULL;
5393 	struct packet_info pi;
5394 	int rv;
5395 	void *buf    = connection->meta.rbuf;
5396 	int received = 0;
5397 	unsigned int header_size = drbd_header_size(connection);
5398 	int expect   = header_size;
5399 	bool ping_timeout_active = false;
5400 	struct net_conf *nc;
5401 	int ping_timeo, tcp_cork, ping_int;
5402 	struct sched_param param = { .sched_priority = 2 };
5403 
5404 	rv = sched_setscheduler(current, SCHED_RR, &param);
5405 	if (rv < 0)
5406 		drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);
5407 
5408 	while (get_t_state(thi) == RUNNING) {
5409 		drbd_thread_current_set_cpu(thi);
5410 
5411 		rcu_read_lock();
5412 		nc = rcu_dereference(connection->net_conf);
5413 		ping_timeo = nc->ping_timeo;
5414 		tcp_cork = nc->tcp_cork;
5415 		ping_int = nc->ping_int;
5416 		rcu_read_unlock();
5417 
5418 		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5419 			if (drbd_send_ping(connection)) {
5420 				drbd_err(connection, "drbd_send_ping has failed\n");
5421 				goto reconnect;
5422 			}
5423 			connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5424 			ping_timeout_active = true;
5425 		}
5426 
5427 		/* TODO: conditionally cork; it may hurt latency if we cork without
5428 		   much to send */
5429 		if (tcp_cork)
5430 			drbd_tcp_cork(connection->meta.socket);
5431 		if (connection_finish_peer_reqs(connection)) {
5432 			drbd_err(connection, "connection_finish_peer_reqs() failed\n");
5433 			goto reconnect;
5434 		}
5435 		/* but unconditionally uncork unless disabled */
5436 		if (tcp_cork)
5437 			drbd_tcp_uncork(connection->meta.socket);
5438 
5439 		/* short circuit, recv_msg would return EINTR anyways. */
5440 		if (signal_pending(current))
5441 			continue;
5442 
		rv = drbd_recv_short(connection->meta.socket, buf, expect - received, 0);
5444 		clear_bit(SIGNAL_ASENDER, &connection->flags);
5445 
5446 		flush_signals(current);
5447 
5448 		/* Note:
5449 		 * -EINTR	 (on meta) we got a signal
5450 		 * -EAGAIN	 (on meta) rcvtimeo expired
5451 		 * -ECONNRESET	 other side closed the connection
5452 		 * -ERESTARTSYS  (on data) we got a signal
5453 		 * rv <  0	 other than above: unexpected error!
5454 		 * rv == expected: full header or command
5455 		 * rv <  expected: "woken" by signal during receive
5456 		 * rv == 0	 : "connection shut down by peer"
5457 		 */
5458 		if (likely(rv > 0)) {
5459 			received += rv;
5460 			buf	 += rv;
5461 		} else if (rv == 0) {
5462 			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5463 				long t;
5464 				rcu_read_lock();
5465 				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5466 				rcu_read_unlock();
5467 
5468 				t = wait_event_timeout(connection->ping_wait,
5469 						       connection->cstate < C_WF_REPORT_PARAMS,
5470 						       t);
5471 				if (t)
5472 					break;
5473 			}
5474 			drbd_err(connection, "meta connection shut down by peer.\n");
5475 			goto reconnect;
5476 		} else if (rv == -EAGAIN) {
5477 			/* If the data socket received something meanwhile,
5478 			 * that is good enough: peer is still alive. */
5479 			if (time_after(connection->last_received,
5480 				jiffies - connection->meta.socket->sk->sk_rcvtimeo))
5481 				continue;
5482 			if (ping_timeout_active) {
5483 				drbd_err(connection, "PingAck did not arrive in time.\n");
5484 				goto reconnect;
5485 			}
5486 			set_bit(SEND_PING, &connection->flags);
5487 			continue;
5488 		} else if (rv == -EINTR) {
5489 			continue;
5490 		} else {
5491 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5492 			goto reconnect;
5493 		}
5494 
5495 		if (received == expect && cmd == NULL) {
5496 			if (decode_header(connection, connection->meta.rbuf, &pi))
5497 				goto reconnect;
5498 			cmd = &asender_tbl[pi.cmd];
5499 			if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5500 				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5501 					 cmdname(pi.cmd), pi.cmd);
5502 				goto disconnect;
5503 			}
5504 			expect = header_size + cmd->pkt_size;
5505 			if (pi.size != expect - header_size) {
5506 				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5507 					pi.cmd, pi.size);
5508 				goto reconnect;
5509 			}
5510 		}
5511 		if (received == expect) {
5512 			bool err;
5513 
5514 			err = cmd->fn(connection, &pi);
5515 			if (err) {
5516 				drbd_err(connection, "%pf failed\n", cmd->fn);
5517 				goto reconnect;
5518 			}
5519 
5520 			connection->last_received = jiffies;
5521 
5522 			if (cmd == &asender_tbl[P_PING_ACK]) {
5523 				/* restore idle timeout */
5524 				connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5525 				ping_timeout_active = false;
5526 			}
5527 
5528 			buf	 = connection->meta.rbuf;
5529 			received = 0;
5530 			expect	 = header_size;
5531 			cmd	 = NULL;
5532 		}
5533 	}
5534 
5535 	if (0) {
5536 reconnect:
5537 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5538 		conn_md_sync(connection);
5539 	}
5540 	if (0) {
5541 disconnect:
5542 		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5543 	}
5544 	clear_bit(SIGNAL_ASENDER, &connection->flags);
5545 
5546 	drbd_info(connection, "asender terminated\n");
5547 
5548 	return 0;
5549 }
5550