1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_protocol.h"
48 #include "drbd_req.h"
49 #include "drbd_vli.h"
50 
51 #define PRO_FEATURES (FF_TRIM)
52 
53 struct packet_info {
54 	enum drbd_packet cmd;
55 	unsigned int size;
56 	unsigned int vnr;
57 	void *data;
58 };
59 
60 enum finish_epoch {
61 	FE_STILL_LIVE,
62 	FE_DESTROYED,
63 	FE_RECYCLED,
64 };
65 
66 static int drbd_do_features(struct drbd_connection *connection);
67 static int drbd_do_auth(struct drbd_connection *connection);
68 static int drbd_disconnected(struct drbd_peer_device *);
69 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
70 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
71 static int e_end_block(struct drbd_work *, int);
72 
73 
74 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
75 
/*
 * some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */
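
/*
 * Illustrative sketch (not driver code): with page->private as the "next"
 * pointer, a chain p1 -> p2 -> p3 is built and torn down roughly like this,
 * assuming p1, p2 and p3 were obtained from alloc_page():
 *
 *	set_page_private(p1, (unsigned long)p2);
 *	set_page_private(p2, (unsigned long)p3);
 *	set_page_private(p3, 0);		(end-of-chain marker)
 *
 *	struct page *cur = p1, *nxt;
 *	page_chain_for_each_safe(cur, nxt)
 *		put_page(cur);
 */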
80 
81 /* If at least n pages are linked at head, get n pages off.
82  * Otherwise, don't modify head, and return NULL.
83  * Locking is the responsibility of the caller.
84  */
85 static struct page *page_chain_del(struct page **head, int n)
86 {
87 	struct page *page;
88 	struct page *tmp;
89 
90 	BUG_ON(!n);
91 	BUG_ON(!head);
92 
93 	page = *head;
94 
95 	if (!page)
96 		return NULL;
97 
98 	while (page) {
99 		tmp = page_chain_next(page);
100 		if (--n == 0)
101 			break; /* found sufficient pages */
102 		if (tmp == NULL)
103 			/* insufficient pages, don't use any of them. */
104 			return NULL;
105 		page = tmp;
106 	}
107 
108 	/* add end of list marker for the returned list */
109 	set_page_private(page, 0);
110 	/* actual return value, and adjustment of head */
111 	page = *head;
112 	*head = tmp;
113 	return page;
114 }
115 
116 /* may be used outside of locks to find the tail of a (usually short)
117  * "private" page chain, before adding it back to a global chain head
118  * with page_chain_add() under a spinlock. */
119 static struct page *page_chain_tail(struct page *page, int *len)
120 {
121 	struct page *tmp;
122 	int i = 1;
123 	while ((tmp = page_chain_next(page)))
124 		++i, page = tmp;
125 	if (len)
126 		*len = i;
127 	return page;
128 }
129 
130 static int page_chain_free(struct page *page)
131 {
132 	struct page *tmp;
133 	int i = 0;
134 	page_chain_for_each_safe(page, tmp) {
135 		put_page(page);
136 		++i;
137 	}
138 	return i;
139 }
140 
141 static void page_chain_add(struct page **head,
142 		struct page *chain_first, struct page *chain_last)
143 {
144 #if 1
145 	struct page *tmp;
146 	tmp = page_chain_tail(chain_first, NULL);
147 	BUG_ON(tmp != chain_last);
148 #endif
149 
150 	/* add chain to head */
151 	set_page_private(chain_last, (unsigned long)*head);
152 	*head = chain_first;
153 }
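
/*
 * Illustrative usage sketch (assumed typical pattern, not a new API): the
 * tail of a short private chain is looked up outside of any lock, and only
 * the actual splice onto the global head happens under drbd_pp_lock:
 *
 *	struct page *tail = page_chain_tail(chain, NULL);
 *	spin_lock(&drbd_pp_lock);
 *	page_chain_add(&drbd_pp_pool, chain, tail);
 *	spin_unlock(&drbd_pp_lock);
 *
 * __drbd_alloc_pages() and drbd_free_pages() below follow this pattern.
 */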
154 
155 static struct page *__drbd_alloc_pages(struct drbd_device *device,
156 				       unsigned int number)
157 {
158 	struct page *page = NULL;
159 	struct page *tmp = NULL;
160 	unsigned int i = 0;
161 
162 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
163 	 * So what. It saves a spin_lock. */
164 	if (drbd_pp_vacant >= number) {
165 		spin_lock(&drbd_pp_lock);
166 		page = page_chain_del(&drbd_pp_pool, number);
167 		if (page)
168 			drbd_pp_vacant -= number;
169 		spin_unlock(&drbd_pp_lock);
170 		if (page)
171 			return page;
172 	}
173 
174 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
175 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
176 	 * which in turn might block on the other node at this very place.  */
177 	for (i = 0; i < number; i++) {
178 		tmp = alloc_page(GFP_TRY);
179 		if (!tmp)
180 			break;
181 		set_page_private(tmp, (unsigned long)page);
182 		page = tmp;
183 	}
184 
185 	if (i == number)
186 		return page;
187 
188 	/* Not enough pages immediately available this time.
189 	 * No need to jump around here, drbd_alloc_pages will retry this
190 	 * function "soon". */
191 	if (page) {
192 		tmp = page_chain_tail(page, NULL);
193 		spin_lock(&drbd_pp_lock);
194 		page_chain_add(&drbd_pp_pool, page, tmp);
195 		drbd_pp_vacant += i;
196 		spin_unlock(&drbd_pp_lock);
197 	}
198 	return NULL;
199 }
200 
201 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
202 					   struct list_head *to_be_freed)
203 {
204 	struct drbd_peer_request *peer_req, *tmp;
205 
	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first one that is not finished,
	   we can stop examining the list... */
210 
211 	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
212 		if (drbd_peer_req_has_active_page(peer_req))
213 			break;
214 		list_move(&peer_req->w.list, to_be_freed);
215 	}
216 }
217 
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
219 {
220 	LIST_HEAD(reclaimed);
221 	struct drbd_peer_request *peer_req, *t;
222 
223 	spin_lock_irq(&device->resource->req_lock);
224 	reclaim_finished_net_peer_reqs(device, &reclaimed);
225 	spin_unlock_irq(&device->resource->req_lock);
226 
227 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228 		drbd_free_net_peer_req(device, peer_req);
229 }
230 
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @peer_device:	DRBD peer device.
234  * @number:	number of pages requested
235  * @retry:	whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate number pages, first from our own page pool, then from
238  * the kernel.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * If this allocation would exceed the max_buffers setting, we throttle
242  * allocation (schedule_timeout) to give the system some room to breathe.
243  *
 * We do not use max-buffers as a hard limit, because it could lead to
245  * congestion and further to a distributed deadlock during online-verify or
246  * (checksum based) resync, if the max-buffers, socket buffer sizes and
247  * resync-rate settings are mis-configured.
248  *
249  * Returns a page chain linked via page->private.
250  */
251 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
252 			      bool retry)
253 {
254 	struct drbd_device *device = peer_device->device;
255 	struct page *page = NULL;
256 	struct net_conf *nc;
257 	DEFINE_WAIT(wait);
258 	unsigned int mxb;
259 
260 	rcu_read_lock();
261 	nc = rcu_dereference(peer_device->connection->net_conf);
262 	mxb = nc ? nc->max_buffers : 1000000;
263 	rcu_read_unlock();
264 
265 	if (atomic_read(&device->pp_in_use) < mxb)
266 		page = __drbd_alloc_pages(device, number);
267 
268 	while (page == NULL) {
269 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
270 
271 		drbd_kick_lo_and_reclaim_net(device);
272 
273 		if (atomic_read(&device->pp_in_use) < mxb) {
274 			page = __drbd_alloc_pages(device, number);
275 			if (page)
276 				break;
277 		}
278 
279 		if (!retry)
280 			break;
281 
282 		if (signal_pending(current)) {
283 			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
284 			break;
285 		}
286 
287 		if (schedule_timeout(HZ/10) == 0)
288 			mxb = UINT_MAX;
289 	}
290 	finish_wait(&drbd_pp_wait, &wait);
291 
292 	if (page)
293 		atomic_add(number, &device->pp_in_use);
294 	return page;
295 }
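
/*
 * Illustrative sketch of a hypothetical caller (names assumed, for
 * orientation only): request a chain sized to the payload, fill it page by
 * page, and let the later free path return it via drbd_free_pages():
 *
 *	unsigned int nr = (size + PAGE_SIZE - 1) >> PAGE_SHIFT;
 *	struct page *chain = drbd_alloc_pages(peer_device, nr, true);
 *	struct page *p;
 *
 *	if (!chain)
 *		(bail out; NULL is possible if interrupted, or with retry == false)
 *	p = chain;
 *	page_chain_for_each(p) {
 *		void *d = kmap(p);
 *		... copy up to PAGE_SIZE bytes into d ...
 *		kunmap(p);
 *	}
 *
 * drbd_alloc_peer_req() and drbd_drain_block() below are the real callers.
 */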
296 
297 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * It is also used from inside another spin_lock_irq(&resource->req_lock);
299  * Either links the page chain back to the global pool,
300  * or returns all pages to the system. */
301 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
302 {
303 	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
304 	int i;
305 
306 	if (page == NULL)
307 		return;
308 
309 	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
310 		i = page_chain_free(page);
311 	else {
312 		struct page *tmp;
313 		tmp = page_chain_tail(page, &i);
314 		spin_lock(&drbd_pp_lock);
315 		page_chain_add(&drbd_pp_pool, page, tmp);
316 		drbd_pp_vacant += i;
317 		spin_unlock(&drbd_pp_lock);
318 	}
319 	i = atomic_sub_return(i, a);
320 	if (i < 0)
321 		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
322 			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
323 	wake_up(&drbd_pp_wait);
324 }
325 
326 /*
327 You need to hold the req_lock:
328  _drbd_wait_ee_list_empty()
329 
330 You must not have the req_lock:
331  drbd_free_peer_req()
332  drbd_alloc_peer_req()
333  drbd_free_peer_reqs()
334  drbd_ee_fix_bhs()
335  drbd_finish_peer_reqs()
336  drbd_clear_done_ee()
337  drbd_wait_ee_list_empty()
338 */
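
/*
 * Illustrative sketch of the locking rule above (not new driver code):
 * _drbd_wait_ee_list_empty() expects req_lock to be held on entry (and may
 * drop and re-acquire it while sleeping), while drbd_wait_ee_list_empty()
 * takes the lock itself.  A caller that already holds the lock therefore
 * uses the underscore variant:
 *
 *	spin_lock_irq(&device->resource->req_lock);
 *	_drbd_wait_ee_list_empty(device, &device->active_ee);
 *	... more work under req_lock ...
 *	spin_unlock_irq(&device->resource->req_lock);
 *
 * while lock-free contexts simply call
 *
 *	drbd_wait_ee_list_empty(device, &device->active_ee);
 */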
339 
340 struct drbd_peer_request *
341 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
342 		    unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
343 {
344 	struct drbd_device *device = peer_device->device;
345 	struct drbd_peer_request *peer_req;
346 	struct page *page = NULL;
347 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
348 
349 	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
350 		return NULL;
351 
352 	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
353 	if (!peer_req) {
354 		if (!(gfp_mask & __GFP_NOWARN))
355 			drbd_err(device, "%s: allocation failed\n", __func__);
356 		return NULL;
357 	}
358 
359 	if (has_payload && data_size) {
360 		page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT));
361 		if (!page)
362 			goto fail;
363 	}
364 
365 	drbd_clear_interval(&peer_req->i);
366 	peer_req->i.size = data_size;
367 	peer_req->i.sector = sector;
368 	peer_req->i.local = false;
369 	peer_req->i.waiting = false;
370 
371 	peer_req->epoch = NULL;
372 	peer_req->peer_device = peer_device;
373 	peer_req->pages = page;
374 	atomic_set(&peer_req->pending_bios, 0);
375 	peer_req->flags = 0;
376 	/*
377 	 * The block_id is opaque to the receiver.  It is not endianness
378 	 * converted, and sent back to the sender unchanged.
379 	 */
380 	peer_req->block_id = id;
381 
382 	return peer_req;
383 
384  fail:
385 	mempool_free(peer_req, drbd_ee_mempool);
386 	return NULL;
387 }
388 
389 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
390 		       int is_net)
391 {
392 	if (peer_req->flags & EE_HAS_DIGEST)
393 		kfree(peer_req->digest);
394 	drbd_free_pages(device, peer_req->pages, is_net);
395 	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
396 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
397 	mempool_free(peer_req, drbd_ee_mempool);
398 }
399 
400 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
401 {
402 	LIST_HEAD(work_list);
403 	struct drbd_peer_request *peer_req, *t;
404 	int count = 0;
405 	int is_net = list == &device->net_ee;
406 
407 	spin_lock_irq(&device->resource->req_lock);
408 	list_splice_init(list, &work_list);
409 	spin_unlock_irq(&device->resource->req_lock);
410 
411 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
412 		__drbd_free_peer_req(device, peer_req, is_net);
413 		count++;
414 	}
415 	return count;
416 }
417 
418 /*
419  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
420  */
421 static int drbd_finish_peer_reqs(struct drbd_device *device)
422 {
423 	LIST_HEAD(work_list);
424 	LIST_HEAD(reclaimed);
425 	struct drbd_peer_request *peer_req, *t;
426 	int err = 0;
427 
428 	spin_lock_irq(&device->resource->req_lock);
429 	reclaim_finished_net_peer_reqs(device, &reclaimed);
430 	list_splice_init(&device->done_ee, &work_list);
431 	spin_unlock_irq(&device->resource->req_lock);
432 
433 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
434 		drbd_free_net_peer_req(device, peer_req);
435 
	/* Possible callbacks here:
	 * e_end_block, e_end_resync_block, and e_send_superseded;
	 * all ignore the last argument.
	 */
440 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
441 		int err2;
442 
443 		/* list_del not necessary, next/prev members not touched */
444 		err2 = peer_req->w.cb(&peer_req->w, !!err);
445 		if (!err)
446 			err = err2;
447 		drbd_free_peer_req(device, peer_req);
448 	}
449 	wake_up(&device->ee_wait);
450 
451 	return err;
452 }
453 
454 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
455 				     struct list_head *head)
456 {
457 	DEFINE_WAIT(wait);
458 
459 	/* avoids spin_lock/unlock
460 	 * and calling prepare_to_wait in the fast path */
461 	while (!list_empty(head)) {
462 		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
463 		spin_unlock_irq(&device->resource->req_lock);
464 		io_schedule();
465 		finish_wait(&device->ee_wait, &wait);
466 		spin_lock_irq(&device->resource->req_lock);
467 	}
468 }
469 
470 static void drbd_wait_ee_list_empty(struct drbd_device *device,
471 				    struct list_head *head)
472 {
473 	spin_lock_irq(&device->resource->req_lock);
474 	_drbd_wait_ee_list_empty(device, head);
475 	spin_unlock_irq(&device->resource->req_lock);
476 }
477 
478 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
479 {
480 	struct kvec iov = {
481 		.iov_base = buf,
482 		.iov_len = size,
483 	};
484 	struct msghdr msg = {
485 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
486 	};
487 	return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
488 }
489 
490 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
491 {
492 	int rv;
493 
494 	rv = drbd_recv_short(connection->data.socket, buf, size, 0);
495 
496 	if (rv < 0) {
497 		if (rv == -ECONNRESET)
498 			drbd_info(connection, "sock was reset by peer\n");
499 		else if (rv != -ERESTARTSYS)
500 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
501 	} else if (rv == 0) {
502 		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
503 			long t;
504 			rcu_read_lock();
505 			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
506 			rcu_read_unlock();
507 
508 			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
509 
510 			if (t)
511 				goto out;
512 		}
513 		drbd_info(connection, "sock was shut down by peer\n");
514 	}
515 
516 	if (rv != size)
517 		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
518 
519 out:
520 	return rv;
521 }
522 
523 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
524 {
525 	int err;
526 
527 	err = drbd_recv(connection, buf, size);
528 	if (err != size) {
529 		if (err >= 0)
530 			err = -EIO;
531 	} else
532 		err = 0;
533 	return err;
534 }
535 
536 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
537 {
538 	int err;
539 
540 	err = drbd_recv_all(connection, buf, size);
541 	if (err && !signal_pending(current))
542 		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
543 	return err;
544 }
545 
546 /* quoting tcp(7):
547  *   On individual connections, the socket buffer size must be set prior to the
548  *   listen(2) or connect(2) calls in order to have it take effect.
549  * This is our wrapper to do so.
550  */
551 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
552 		unsigned int rcv)
553 {
554 	/* open coded SO_SNDBUF, SO_RCVBUF */
555 	if (snd) {
556 		sock->sk->sk_sndbuf = snd;
557 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
558 	}
559 	if (rcv) {
560 		sock->sk->sk_rcvbuf = rcv;
561 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
562 	}
563 }
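
/*
 * Hedged usage note: per the tcp(7) quote above, drbd_setbufsize() only has
 * an effect when it runs after sock_create_kern() but before the subsequent
 * bind()/listen() or connect().  Both call sites below keep that order,
 * roughly:
 *
 *	err = sock_create_kern(family, SOCK_STREAM, IPPROTO_TCP, &sock);
 *	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
 *	err = sock->ops->bind(sock, (struct sockaddr *)&addr, addr_len);
 *	err = sock->ops->connect(sock, (struct sockaddr *)&peer, peer_len, 0);
 */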
564 
565 static struct socket *drbd_try_connect(struct drbd_connection *connection)
566 {
567 	const char *what;
568 	struct socket *sock;
569 	struct sockaddr_in6 src_in6;
570 	struct sockaddr_in6 peer_in6;
571 	struct net_conf *nc;
572 	int err, peer_addr_len, my_addr_len;
573 	int sndbuf_size, rcvbuf_size, connect_int;
574 	int disconnect_on_error = 1;
575 
576 	rcu_read_lock();
577 	nc = rcu_dereference(connection->net_conf);
578 	if (!nc) {
579 		rcu_read_unlock();
580 		return NULL;
581 	}
582 	sndbuf_size = nc->sndbuf_size;
583 	rcvbuf_size = nc->rcvbuf_size;
584 	connect_int = nc->connect_int;
585 	rcu_read_unlock();
586 
587 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
588 	memcpy(&src_in6, &connection->my_addr, my_addr_len);
589 
590 	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
591 		src_in6.sin6_port = 0;
592 	else
593 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
594 
595 	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
596 	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
597 
598 	what = "sock_create_kern";
599 	err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
600 			       SOCK_STREAM, IPPROTO_TCP, &sock);
601 	if (err < 0) {
602 		sock = NULL;
603 		goto out;
604 	}
605 
606 	sock->sk->sk_rcvtimeo =
607 	sock->sk->sk_sndtimeo = connect_int * HZ;
608 	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
609 
	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
617 	what = "bind before connect";
618 	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
619 	if (err < 0)
620 		goto out;
621 
622 	/* connect may fail, peer not yet available.
623 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
624 	disconnect_on_error = 0;
625 	what = "connect";
626 	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
627 
628 out:
629 	if (err < 0) {
630 		if (sock) {
631 			sock_release(sock);
632 			sock = NULL;
633 		}
634 		switch (-err) {
635 			/* timeout, busy, signal pending */
636 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
637 		case EINTR: case ERESTARTSYS:
638 			/* peer not (yet) available, network problem */
639 		case ECONNREFUSED: case ENETUNREACH:
640 		case EHOSTDOWN:    case EHOSTUNREACH:
641 			disconnect_on_error = 0;
642 			break;
643 		default:
644 			drbd_err(connection, "%s failed, err = %d\n", what, err);
645 		}
646 		if (disconnect_on_error)
647 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
648 	}
649 
650 	return sock;
651 }
652 
653 struct accept_wait_data {
654 	struct drbd_connection *connection;
655 	struct socket *s_listen;
656 	struct completion door_bell;
657 	void (*original_sk_state_change)(struct sock *sk);
658 
659 };
660 
661 static void drbd_incoming_connection(struct sock *sk)
662 {
663 	struct accept_wait_data *ad = sk->sk_user_data;
664 	void (*state_change)(struct sock *sk);
665 
666 	state_change = ad->original_sk_state_change;
667 	if (sk->sk_state == TCP_ESTABLISHED)
668 		complete(&ad->door_bell);
669 	state_change(sk);
670 }
671 
672 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
673 {
674 	int err, sndbuf_size, rcvbuf_size, my_addr_len;
675 	struct sockaddr_in6 my_addr;
676 	struct socket *s_listen;
677 	struct net_conf *nc;
678 	const char *what;
679 
680 	rcu_read_lock();
681 	nc = rcu_dereference(connection->net_conf);
682 	if (!nc) {
683 		rcu_read_unlock();
684 		return -EIO;
685 	}
686 	sndbuf_size = nc->sndbuf_size;
687 	rcvbuf_size = nc->rcvbuf_size;
688 	rcu_read_unlock();
689 
690 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
691 	memcpy(&my_addr, &connection->my_addr, my_addr_len);
692 
693 	what = "sock_create_kern";
694 	err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
695 			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
696 	if (err) {
697 		s_listen = NULL;
698 		goto out;
699 	}
700 
701 	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
702 	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
703 
704 	what = "bind before listen";
705 	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
706 	if (err < 0)
707 		goto out;
708 
709 	ad->s_listen = s_listen;
710 	write_lock_bh(&s_listen->sk->sk_callback_lock);
711 	ad->original_sk_state_change = s_listen->sk->sk_state_change;
712 	s_listen->sk->sk_state_change = drbd_incoming_connection;
713 	s_listen->sk->sk_user_data = ad;
714 	write_unlock_bh(&s_listen->sk->sk_callback_lock);
715 
716 	what = "listen";
717 	err = s_listen->ops->listen(s_listen, 5);
718 	if (err < 0)
719 		goto out;
720 
721 	return 0;
722 out:
723 	if (s_listen)
724 		sock_release(s_listen);
725 	if (err < 0) {
726 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
727 			drbd_err(connection, "%s failed, err = %d\n", what, err);
728 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
729 		}
730 	}
731 
732 	return -EIO;
733 }
734 
735 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
736 {
737 	write_lock_bh(&sk->sk_callback_lock);
738 	sk->sk_state_change = ad->original_sk_state_change;
739 	sk->sk_user_data = NULL;
740 	write_unlock_bh(&sk->sk_callback_lock);
741 }
742 
743 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
744 {
745 	int timeo, connect_int, err = 0;
746 	struct socket *s_estab = NULL;
747 	struct net_conf *nc;
748 
749 	rcu_read_lock();
750 	nc = rcu_dereference(connection->net_conf);
751 	if (!nc) {
752 		rcu_read_unlock();
753 		return NULL;
754 	}
755 	connect_int = nc->connect_int;
756 	rcu_read_unlock();
757 
758 	timeo = connect_int * HZ;
759 	/* 28.5% random jitter */
760 	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
761 
762 	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
763 	if (err <= 0)
764 		return NULL;
765 
766 	err = kernel_accept(ad->s_listen, &s_estab, 0);
767 	if (err < 0) {
768 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
769 			drbd_err(connection, "accept failed, err = %d\n", err);
770 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
771 		}
772 	}
773 
774 	if (s_estab)
775 		unregister_state_change(s_estab->sk, ad);
776 
777 	return s_estab;
778 }
779 
780 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
781 
782 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
783 			     enum drbd_packet cmd)
784 {
785 	if (!conn_prepare_command(connection, sock))
786 		return -EIO;
787 	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
788 }
789 
790 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
791 {
792 	unsigned int header_size = drbd_header_size(connection);
793 	struct packet_info pi;
794 	int err;
795 
796 	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
797 	if (err != header_size) {
798 		if (err >= 0)
799 			err = -EIO;
800 		return err;
801 	}
802 	err = decode_header(connection, connection->data.rbuf, &pi);
803 	if (err)
804 		return err;
805 	return pi.cmd;
806 }
807 
808 /**
809  * drbd_socket_okay() - Free the socket if its connection is not okay
810  * @sock:	pointer to the pointer to the socket.
811  */
812 static int drbd_socket_okay(struct socket **sock)
813 {
814 	int rr;
815 	char tb[4];
816 
817 	if (!*sock)
818 		return false;
819 
820 	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
821 
822 	if (rr > 0 || rr == -EAGAIN) {
823 		return true;
824 	} else {
825 		sock_release(*sock);
826 		*sock = NULL;
827 		return false;
828 	}
829 }
830 /* Gets called if a connection is established, or if a new minor gets created
831    in a connection */
832 int drbd_connected(struct drbd_peer_device *peer_device)
833 {
834 	struct drbd_device *device = peer_device->device;
835 	int err;
836 
837 	atomic_set(&device->packet_seq, 0);
838 	device->peer_seq = 0;
839 
840 	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
841 		&peer_device->connection->cstate_mutex :
842 		&device->own_state_mutex;
843 
844 	err = drbd_send_sync_param(peer_device);
845 	if (!err)
846 		err = drbd_send_sizes(peer_device, 0, 0);
847 	if (!err)
848 		err = drbd_send_uuids(peer_device);
849 	if (!err)
850 		err = drbd_send_current_state(peer_device);
851 	clear_bit(USE_DEGR_WFC_T, &device->flags);
852 	clear_bit(RESIZE_PENDING, &device->flags);
853 	atomic_set(&device->ap_in_flight, 0);
854 	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
855 	return err;
856 }
857 
858 /*
859  * return values:
860  *   1 yes, we have a valid connection
861  *   0 oops, did not work out, please try again
862  *  -1 peer talks different language,
863  *     no point in trying again, please go standalone.
864  *  -2 We do not have a network config...
865  */
866 static int conn_connect(struct drbd_connection *connection)
867 {
868 	struct drbd_socket sock, msock;
869 	struct drbd_peer_device *peer_device;
870 	struct net_conf *nc;
871 	int vnr, timeout, h, ok;
872 	bool discard_my_data;
873 	enum drbd_state_rv rv;
874 	struct accept_wait_data ad = {
875 		.connection = connection,
876 		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
877 	};
878 
879 	clear_bit(DISCONNECT_SENT, &connection->flags);
880 	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
881 		return -2;
882 
883 	mutex_init(&sock.mutex);
884 	sock.sbuf = connection->data.sbuf;
885 	sock.rbuf = connection->data.rbuf;
886 	sock.socket = NULL;
887 	mutex_init(&msock.mutex);
888 	msock.sbuf = connection->meta.sbuf;
889 	msock.rbuf = connection->meta.rbuf;
890 	msock.socket = NULL;
891 
892 	/* Assume that the peer only understands protocol 80 until we know better.  */
893 	connection->agreed_pro_version = 80;
894 
895 	if (prepare_listen_socket(connection, &ad))
896 		return 0;
897 
898 	do {
899 		struct socket *s;
900 
901 		s = drbd_try_connect(connection);
902 		if (s) {
903 			if (!sock.socket) {
904 				sock.socket = s;
905 				send_first_packet(connection, &sock, P_INITIAL_DATA);
906 			} else if (!msock.socket) {
907 				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
908 				msock.socket = s;
909 				send_first_packet(connection, &msock, P_INITIAL_META);
910 			} else {
911 				drbd_err(connection, "Logic error in conn_connect()\n");
912 				goto out_release_sockets;
913 			}
914 		}
915 
916 		if (sock.socket && msock.socket) {
917 			rcu_read_lock();
918 			nc = rcu_dereference(connection->net_conf);
919 			timeout = nc->ping_timeo * HZ / 10;
920 			rcu_read_unlock();
921 			schedule_timeout_interruptible(timeout);
922 			ok = drbd_socket_okay(&sock.socket);
923 			ok = drbd_socket_okay(&msock.socket) && ok;
924 			if (ok)
925 				break;
926 		}
927 
928 retry:
929 		s = drbd_wait_for_connect(connection, &ad);
930 		if (s) {
931 			int fp = receive_first_packet(connection, s);
932 			drbd_socket_okay(&sock.socket);
933 			drbd_socket_okay(&msock.socket);
934 			switch (fp) {
935 			case P_INITIAL_DATA:
936 				if (sock.socket) {
937 					drbd_warn(connection, "initial packet S crossed\n");
938 					sock_release(sock.socket);
939 					sock.socket = s;
940 					goto randomize;
941 				}
942 				sock.socket = s;
943 				break;
944 			case P_INITIAL_META:
945 				set_bit(RESOLVE_CONFLICTS, &connection->flags);
946 				if (msock.socket) {
947 					drbd_warn(connection, "initial packet M crossed\n");
948 					sock_release(msock.socket);
949 					msock.socket = s;
950 					goto randomize;
951 				}
952 				msock.socket = s;
953 				break;
954 			default:
955 				drbd_warn(connection, "Error receiving initial packet\n");
956 				sock_release(s);
957 randomize:
958 				if (prandom_u32() & 1)
959 					goto retry;
960 			}
961 		}
962 
963 		if (connection->cstate <= C_DISCONNECTING)
964 			goto out_release_sockets;
965 		if (signal_pending(current)) {
966 			flush_signals(current);
967 			smp_rmb();
968 			if (get_t_state(&connection->receiver) == EXITING)
969 				goto out_release_sockets;
970 		}
971 
972 		ok = drbd_socket_okay(&sock.socket);
973 		ok = drbd_socket_okay(&msock.socket) && ok;
974 	} while (!ok);
975 
976 	if (ad.s_listen)
977 		sock_release(ad.s_listen);
978 
979 	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
980 	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
981 
982 	sock.socket->sk->sk_allocation = GFP_NOIO;
983 	msock.socket->sk->sk_allocation = GFP_NOIO;
984 
985 	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
986 	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
987 
988 	/* NOT YET ...
989 	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
990 	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
991 	 * first set it to the P_CONNECTION_FEATURES timeout,
992 	 * which we set to 4x the configured ping_timeout. */
993 	rcu_read_lock();
994 	nc = rcu_dereference(connection->net_conf);
995 
996 	sock.socket->sk->sk_sndtimeo =
997 	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
998 
999 	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1000 	timeout = nc->timeout * HZ / 10;
1001 	discard_my_data = nc->discard_my_data;
1002 	rcu_read_unlock();
1003 
1004 	msock.socket->sk->sk_sndtimeo = timeout;
1005 
1006 	/* we don't want delays.
1007 	 * we use TCP_CORK where appropriate, though */
1008 	drbd_tcp_nodelay(sock.socket);
1009 	drbd_tcp_nodelay(msock.socket);
1010 
1011 	connection->data.socket = sock.socket;
1012 	connection->meta.socket = msock.socket;
1013 	connection->last_received = jiffies;
1014 
1015 	h = drbd_do_features(connection);
1016 	if (h <= 0)
1017 		return h;
1018 
1019 	if (connection->cram_hmac_tfm) {
1020 		/* drbd_request_state(device, NS(conn, WFAuth)); */
1021 		switch (drbd_do_auth(connection)) {
1022 		case -1:
1023 			drbd_err(connection, "Authentication of peer failed\n");
1024 			return -1;
1025 		case 0:
1026 			drbd_err(connection, "Authentication of peer failed, trying again.\n");
1027 			return 0;
1028 		}
1029 	}
1030 
1031 	connection->data.socket->sk->sk_sndtimeo = timeout;
1032 	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1033 
1034 	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1035 		return -1;
1036 
1037 	/* Prevent a race between resync-handshake and
1038 	 * being promoted to Primary.
1039 	 *
1040 	 * Grab and release the state mutex, so we know that any current
1041 	 * drbd_set_role() is finished, and any incoming drbd_set_role
1042 	 * will see the STATE_SENT flag, and wait for it to be cleared.
1043 	 */
1044 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1045 		mutex_lock(peer_device->device->state_mutex);
1046 
1047 	set_bit(STATE_SENT, &connection->flags);
1048 
1049 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1050 		mutex_unlock(peer_device->device->state_mutex);
1051 
1052 	rcu_read_lock();
1053 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1054 		struct drbd_device *device = peer_device->device;
1055 		kref_get(&device->kref);
1056 		rcu_read_unlock();
1057 
1058 		if (discard_my_data)
1059 			set_bit(DISCARD_MY_DATA, &device->flags);
1060 		else
1061 			clear_bit(DISCARD_MY_DATA, &device->flags);
1062 
1063 		drbd_connected(peer_device);
1064 		kref_put(&device->kref, drbd_destroy_device);
1065 		rcu_read_lock();
1066 	}
1067 	rcu_read_unlock();
1068 
1069 	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1070 	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1071 		clear_bit(STATE_SENT, &connection->flags);
1072 		return 0;
1073 	}
1074 
1075 	drbd_thread_start(&connection->asender);
1076 
1077 	mutex_lock(&connection->resource->conf_update);
1078 	/* The discard_my_data flag is a single-shot modifier to the next
1079 	 * connection attempt, the handshake of which is now well underway.
1080 	 * No need for rcu style copying of the whole struct
1081 	 * just to clear a single value. */
1082 	connection->net_conf->discard_my_data = 0;
1083 	mutex_unlock(&connection->resource->conf_update);
1084 
1085 	return h;
1086 
1087 out_release_sockets:
1088 	if (ad.s_listen)
1089 		sock_release(ad.s_listen);
1090 	if (sock.socket)
1091 		sock_release(sock.socket);
1092 	if (msock.socket)
1093 		sock_release(msock.socket);
1094 	return -1;
1095 }
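
/*
 * Illustrative sketch (hypothetical caller, for orientation only): the
 * return value of conn_connect() asks for three different reactions, along
 * the lines of:
 *
 *	int h = conn_connect(connection);
 *
 *	if (h == 0)
 *		(did not work out, try the handshake again)
 *	else if (h < 0)
 *		(-1: incompatible peer, -2: no net config; give up,
 *		 go standalone resp. stay disconnected)
 *	else
 *		(connected; start decoding packets with drbd_recv_header())
 */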
1096 
1097 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1098 {
1099 	unsigned int header_size = drbd_header_size(connection);
1100 
1101 	if (header_size == sizeof(struct p_header100) &&
1102 	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1103 		struct p_header100 *h = header;
1104 		if (h->pad != 0) {
1105 			drbd_err(connection, "Header padding is not zero\n");
1106 			return -EINVAL;
1107 		}
1108 		pi->vnr = be16_to_cpu(h->volume);
1109 		pi->cmd = be16_to_cpu(h->command);
1110 		pi->size = be32_to_cpu(h->length);
1111 	} else if (header_size == sizeof(struct p_header95) &&
1112 		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1113 		struct p_header95 *h = header;
1114 		pi->cmd = be16_to_cpu(h->command);
1115 		pi->size = be32_to_cpu(h->length);
1116 		pi->vnr = 0;
1117 	} else if (header_size == sizeof(struct p_header80) &&
1118 		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1119 		struct p_header80 *h = header;
1120 		pi->cmd = be16_to_cpu(h->command);
1121 		pi->size = be16_to_cpu(h->length);
1122 		pi->vnr = 0;
1123 	} else {
1124 		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1125 			 be32_to_cpu(*(__be32 *)header),
1126 			 connection->agreed_pro_version);
1127 		return -EINVAL;
1128 	}
1129 	pi->data = header + header_size;
1130 	return 0;
1131 }
1132 
1133 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1134 {
1135 	void *buffer = connection->data.rbuf;
1136 	int err;
1137 
1138 	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1139 	if (err)
1140 		return err;
1141 
1142 	err = decode_header(connection, buffer, pi);
1143 	connection->last_received = jiffies;
1144 
1145 	return err;
1146 }
1147 
1148 static void drbd_flush(struct drbd_connection *connection)
1149 {
1150 	int rv;
1151 	struct drbd_peer_device *peer_device;
1152 	int vnr;
1153 
1154 	if (connection->write_ordering >= WO_bdev_flush) {
1155 		rcu_read_lock();
1156 		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1157 			struct drbd_device *device = peer_device->device;
1158 
1159 			if (!get_ldev(device))
1160 				continue;
1161 			kref_get(&device->kref);
1162 			rcu_read_unlock();
1163 
1164 			rv = blkdev_issue_flush(device->ldev->backing_bdev,
1165 					GFP_NOIO, NULL);
1166 			if (rv) {
1167 				drbd_info(device, "local disk flush failed with status %d\n", rv);
				/* We would rather check for EOPNOTSUPP, but that is not
				 * reliable; don't try again for ANY return value != 0,
				 * not only for rv == -EOPNOTSUPP. */
1171 				drbd_bump_write_ordering(connection, WO_drain_io);
1172 			}
1173 			put_ldev(device);
1174 			kref_put(&device->kref, drbd_destroy_device);
1175 
1176 			rcu_read_lock();
1177 			if (rv)
1178 				break;
1179 		}
1180 		rcu_read_unlock();
1181 	}
1182 }
1183 
1184 /**
1185  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @connection:	DRBD connection.
1187  * @epoch:	Epoch object.
1188  * @ev:		Epoch event.
1189  */
1190 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1191 					       struct drbd_epoch *epoch,
1192 					       enum epoch_event ev)
1193 {
1194 	int epoch_size;
1195 	struct drbd_epoch *next_epoch;
1196 	enum finish_epoch rv = FE_STILL_LIVE;
1197 
1198 	spin_lock(&connection->epoch_lock);
1199 	do {
1200 		next_epoch = NULL;
1201 
1202 		epoch_size = atomic_read(&epoch->epoch_size);
1203 
1204 		switch (ev & ~EV_CLEANUP) {
1205 		case EV_PUT:
1206 			atomic_dec(&epoch->active);
1207 			break;
1208 		case EV_GOT_BARRIER_NR:
1209 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1210 			break;
1211 		case EV_BECAME_LAST:
			/* nothing to do */
1213 			break;
1214 		}
1215 
1216 		if (epoch_size != 0 &&
1217 		    atomic_read(&epoch->active) == 0 &&
1218 		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1219 			if (!(ev & EV_CLEANUP)) {
1220 				spin_unlock(&connection->epoch_lock);
1221 				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1222 				spin_lock(&connection->epoch_lock);
1223 			}
1224 #if 0
1225 			/* FIXME: dec unacked on connection, once we have
1226 			 * something to count pending connection packets in. */
1227 			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1228 				dec_unacked(epoch->connection);
1229 #endif
1230 
1231 			if (connection->current_epoch != epoch) {
1232 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1233 				list_del(&epoch->list);
1234 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1235 				connection->epochs--;
1236 				kfree(epoch);
1237 
1238 				if (rv == FE_STILL_LIVE)
1239 					rv = FE_DESTROYED;
1240 			} else {
1241 				epoch->flags = 0;
1242 				atomic_set(&epoch->epoch_size, 0);
1243 				/* atomic_set(&epoch->active, 0); is already zero */
1244 				if (rv == FE_STILL_LIVE)
1245 					rv = FE_RECYCLED;
1246 			}
1247 		}
1248 
1249 		if (!next_epoch)
1250 			break;
1251 
1252 		epoch = next_epoch;
1253 	} while (1);
1254 
1255 	spin_unlock(&connection->epoch_lock);
1256 
1257 	return rv;
1258 }
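
/*
 * Illustrative walk-through (assumed typical sequence, for orientation
 * only): each write received for an epoch bumps epoch_size/active, and
 * signals EV_PUT once its local I/O has completed; the following
 * P_BARRIER signals EV_GOT_BARRIER_NR.  When the last EV_PUT arrives for
 * an epoch that already has its barrier number, the code above sends
 * P_BARRIER_ACK and either frees or recycles the epoch object:
 *
 *	drbd_may_finish_epoch(connection, epoch, EV_GOT_BARRIER_NR);
 *	...
 *	drbd_may_finish_epoch(connection, epoch, EV_PUT);
 */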
1259 
1260 /**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
1262  * @connection:	DRBD connection.
1263  * @wo:		Write ordering method to try.
1264  */
1265 void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo)
1266 {
1267 	struct disk_conf *dc;
1268 	struct drbd_peer_device *peer_device;
1269 	enum write_ordering_e pwo;
1270 	int vnr;
1271 	static char *write_ordering_str[] = {
1272 		[WO_none] = "none",
1273 		[WO_drain_io] = "drain",
1274 		[WO_bdev_flush] = "flush",
1275 	};
1276 
1277 	pwo = connection->write_ordering;
1278 	wo = min(pwo, wo);
1279 	rcu_read_lock();
1280 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1281 		struct drbd_device *device = peer_device->device;
1282 
1283 		if (!get_ldev_if_state(device, D_ATTACHING))
1284 			continue;
1285 		dc = rcu_dereference(device->ldev->disk_conf);
1286 
1287 		if (wo == WO_bdev_flush && !dc->disk_flushes)
1288 			wo = WO_drain_io;
1289 		if (wo == WO_drain_io && !dc->disk_drain)
1290 			wo = WO_none;
1291 		put_ldev(device);
1292 	}
1293 	rcu_read_unlock();
1294 	connection->write_ordering = wo;
1295 	if (pwo != connection->write_ordering || wo == WO_bdev_flush)
1296 		drbd_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]);
1297 }
1298 
1299 /**
 * drbd_submit_peer_request() - Submit a peer request's pages as one or more bios
 * @device:	DRBD device.
 * @peer_req:	peer request
 * @rw:		flag field, see bio->bi_rw
 * @fault_type:	fault injection type, passed on to drbd_generic_make_request()
1304  *
1305  * May spread the pages to multiple bios,
1306  * depending on bio_add_page restrictions.
1307  *
1308  * Returns 0 if all bios have been submitted,
1309  * -ENOMEM if we could not allocate enough bios,
1310  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1311  *  single page to an empty bio (which should never happen and likely indicates
1312  *  that the lower level IO stack is in some way broken). This has been observed
1313  *  on certain Xen deployments.
1314  */
1315 /* TODO allocate from our own bio_set. */
1316 int drbd_submit_peer_request(struct drbd_device *device,
1317 			     struct drbd_peer_request *peer_req,
1318 			     const unsigned rw, const int fault_type)
1319 {
1320 	struct bio *bios = NULL;
1321 	struct bio *bio;
1322 	struct page *page = peer_req->pages;
1323 	sector_t sector = peer_req->i.sector;
1324 	unsigned ds = peer_req->i.size;
1325 	unsigned n_bios = 0;
1326 	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1327 	int err = -ENOMEM;
1328 
1329 	if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
1330 		/* wait for all pending IO completions, before we start
1331 		 * zeroing things out. */
1332 		conn_wait_active_ee_empty(first_peer_device(device)->connection);
1333 		if (blkdev_issue_zeroout(device->ldev->backing_bdev,
1334 			sector, ds >> 9, GFP_NOIO))
1335 			peer_req->flags |= EE_WAS_ERROR;
1336 		drbd_endio_write_sec_final(peer_req);
1337 		return 0;
1338 	}
1339 
1340 	if (peer_req->flags & EE_IS_TRIM)
1341 		nr_pages = 0; /* discards don't have any payload. */
1342 
1343 	/* In most cases, we will only need one bio.  But in case the lower
1344 	 * level restrictions happen to be different at this offset on this
1345 	 * side than those of the sending peer, we may need to submit the
1346 	 * request in more than one bio.
1347 	 *
1348 	 * Plain bio_alloc is good enough here, this is no DRBD internally
1349 	 * generated bio, but a bio allocated on behalf of the peer.
1350 	 */
1351 next_bio:
1352 	bio = bio_alloc(GFP_NOIO, nr_pages);
1353 	if (!bio) {
1354 		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1355 		goto fail;
1356 	}
1357 	/* > peer_req->i.sector, unless this is the first bio */
1358 	bio->bi_iter.bi_sector = sector;
1359 	bio->bi_bdev = device->ldev->backing_bdev;
1360 	bio->bi_rw = rw;
1361 	bio->bi_private = peer_req;
1362 	bio->bi_end_io = drbd_peer_request_endio;
1363 
1364 	bio->bi_next = bios;
1365 	bios = bio;
1366 	++n_bios;
1367 
1368 	if (rw & REQ_DISCARD) {
1369 		bio->bi_iter.bi_size = ds;
1370 		goto submit;
1371 	}
1372 
1373 	page_chain_for_each(page) {
1374 		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1375 		if (!bio_add_page(bio, page, len, 0)) {
1376 			/* A single page must always be possible!
1377 			 * But in case it fails anyways,
1378 			 * we deal with it, and complain (below). */
1379 			if (bio->bi_vcnt == 0) {
1380 				drbd_err(device,
1381 					"bio_add_page failed for len=%u, "
1382 					"bi_vcnt=0 (bi_sector=%llu)\n",
1383 					len, (uint64_t)bio->bi_iter.bi_sector);
1384 				err = -ENOSPC;
1385 				goto fail;
1386 			}
1387 			goto next_bio;
1388 		}
1389 		ds -= len;
1390 		sector += len >> 9;
1391 		--nr_pages;
1392 	}
1393 	D_ASSERT(device, ds == 0);
1394 submit:
1395 	D_ASSERT(device, page == NULL);
1396 
1397 	atomic_set(&peer_req->pending_bios, n_bios);
1398 	do {
1399 		bio = bios;
1400 		bios = bios->bi_next;
1401 		bio->bi_next = NULL;
1402 
1403 		drbd_generic_make_request(device, fault_type, bio);
1404 	} while (bios);
1405 	return 0;
1406 
1407 fail:
1408 	while (bios) {
1409 		bio = bios;
1410 		bios = bios->bi_next;
1411 		bio_put(bio);
1412 	}
1413 	return err;
1414 }
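
/*
 * Illustrative error handling sketch (hypothetical caller): a peer request
 * that cannot be submitted locally can only be repaired by the peer
 * resending it, so the error codes above are typically turned into a
 * re-connect:
 *
 *	if (drbd_submit_peer_request(device, peer_req, WRITE, fault_type) == 0)
 *		return 0;
 *	(otherwise: undo the list insertion, drbd_free_peer_req(), and
 *	 return -EIO so the connection is torn down and re-established)
 *
 * recv_resync_read() below follows this pattern.
 */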
1415 
1416 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1417 					     struct drbd_peer_request *peer_req)
1418 {
1419 	struct drbd_interval *i = &peer_req->i;
1420 
1421 	drbd_remove_interval(&device->write_requests, i);
1422 	drbd_clear_interval(i);
1423 
1424 	/* Wake up any processes waiting for this peer request to complete.  */
1425 	if (i->waiting)
1426 		wake_up(&device->misc_wait);
1427 }
1428 
1429 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1430 {
1431 	struct drbd_peer_device *peer_device;
1432 	int vnr;
1433 
1434 	rcu_read_lock();
1435 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1436 		struct drbd_device *device = peer_device->device;
1437 
1438 		kref_get(&device->kref);
1439 		rcu_read_unlock();
1440 		drbd_wait_ee_list_empty(device, &device->active_ee);
1441 		kref_put(&device->kref, drbd_destroy_device);
1442 		rcu_read_lock();
1443 	}
1444 	rcu_read_unlock();
1445 }
1446 
1447 static struct drbd_peer_device *
1448 conn_peer_device(struct drbd_connection *connection, int volume_number)
1449 {
1450 	return idr_find(&connection->peer_devices, volume_number);
1451 }
1452 
1453 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1454 {
1455 	int rv;
1456 	struct p_barrier *p = pi->data;
1457 	struct drbd_epoch *epoch;
1458 
1459 	/* FIXME these are unacked on connection,
1460 	 * not a specific (peer)device.
1461 	 */
1462 	connection->current_epoch->barrier_nr = p->barrier;
1463 	connection->current_epoch->connection = connection;
1464 	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1465 
1466 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1467 	 * the activity log, which means it would not be resynced in case the
1468 	 * R_PRIMARY crashes now.
1469 	 * Therefore we must send the barrier_ack after the barrier request was
1470 	 * completed. */
1471 	switch (connection->write_ordering) {
1472 	case WO_none:
1473 		if (rv == FE_RECYCLED)
1474 			return 0;
1475 
1476 		/* receiver context, in the writeout path of the other node.
1477 		 * avoid potential distributed deadlock */
1478 		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1479 		if (epoch)
1480 			break;
1481 		else
1482 			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1483 			/* Fall through */
1484 
1485 	case WO_bdev_flush:
1486 	case WO_drain_io:
1487 		conn_wait_active_ee_empty(connection);
1488 		drbd_flush(connection);
1489 
1490 		if (atomic_read(&connection->current_epoch->epoch_size)) {
1491 			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1492 			if (epoch)
1493 				break;
1494 		}
1495 
1496 		return 0;
1497 	default:
1498 		drbd_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering);
1499 		return -EIO;
1500 	}
1501 
1502 	epoch->flags = 0;
1503 	atomic_set(&epoch->epoch_size, 0);
1504 	atomic_set(&epoch->active, 0);
1505 
1506 	spin_lock(&connection->epoch_lock);
1507 	if (atomic_read(&connection->current_epoch->epoch_size)) {
1508 		list_add(&epoch->list, &connection->current_epoch->list);
1509 		connection->current_epoch = epoch;
1510 		connection->epochs++;
1511 	} else {
1512 		/* The current_epoch got recycled while we allocated this one... */
1513 		kfree(epoch);
1514 	}
1515 	spin_unlock(&connection->epoch_lock);
1516 
1517 	return 0;
1518 }
1519 
1520 /* used from receive_RSDataReply (recv_resync_read)
1521  * and from receive_Data */
1522 static struct drbd_peer_request *
1523 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1524 	      struct packet_info *pi) __must_hold(local)
1525 {
1526 	struct drbd_device *device = peer_device->device;
1527 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
1528 	struct drbd_peer_request *peer_req;
1529 	struct page *page;
1530 	int dgs, ds, err;
1531 	int data_size = pi->size;
1532 	void *dig_in = peer_device->connection->int_dig_in;
1533 	void *dig_vv = peer_device->connection->int_dig_vv;
1534 	unsigned long *data;
1535 	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1536 
1537 	dgs = 0;
1538 	if (!trim && peer_device->connection->peer_integrity_tfm) {
1539 		dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1540 		/*
1541 		 * FIXME: Receive the incoming digest into the receive buffer
1542 		 *	  here, together with its struct p_data?
1543 		 */
1544 		err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1545 		if (err)
1546 			return NULL;
1547 		data_size -= dgs;
1548 	}
1549 
1550 	if (trim) {
1551 		D_ASSERT(peer_device, data_size == 0);
1552 		data_size = be32_to_cpu(trim->size);
1553 	}
1554 
1555 	if (!expect(IS_ALIGNED(data_size, 512)))
1556 		return NULL;
1557 	/* prepare for larger trim requests. */
1558 	if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
1559 		return NULL;
1560 
	/* even though we trust our peer,
1562 	 * we sometimes have to double check. */
1563 	if (sector + (data_size>>9) > capacity) {
1564 		drbd_err(device, "request from peer beyond end of local disk: "
1565 			"capacity: %llus < sector: %llus + size: %u\n",
1566 			(unsigned long long)capacity,
1567 			(unsigned long long)sector, data_size);
1568 		return NULL;
1569 	}
1570 
1571 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1572 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1573 	 * which in turn might block on the other node at this very place.  */
1574 	peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
1575 	if (!peer_req)
1576 		return NULL;
1577 
1578 	if (trim)
1579 		return peer_req;
1580 
1581 	ds = data_size;
1582 	page = peer_req->pages;
1583 	page_chain_for_each(page) {
1584 		unsigned len = min_t(int, ds, PAGE_SIZE);
1585 		data = kmap(page);
1586 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1587 		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1588 			drbd_err(device, "Fault injection: Corrupting data on receive\n");
1589 			data[0] = data[0] ^ (unsigned long)-1;
1590 		}
1591 		kunmap(page);
1592 		if (err) {
1593 			drbd_free_peer_req(device, peer_req);
1594 			return NULL;
1595 		}
1596 		ds -= len;
1597 	}
1598 
1599 	if (dgs) {
1600 		drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1601 		if (memcmp(dig_in, dig_vv, dgs)) {
1602 			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1603 				(unsigned long long)sector, data_size);
1604 			drbd_free_peer_req(device, peer_req);
1605 			return NULL;
1606 		}
1607 	}
1608 	device->recv_cnt += data_size>>9;
1609 	return peer_req;
1610 }
1611 
1612 /* drbd_drain_block() just takes a data block
1613  * out of the socket input buffer, and discards it.
1614  */
1615 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1616 {
1617 	struct page *page;
1618 	int err = 0;
1619 	void *data;
1620 
1621 	if (!data_size)
1622 		return 0;
1623 
1624 	page = drbd_alloc_pages(peer_device, 1, 1);
1625 
1626 	data = kmap(page);
1627 	while (data_size) {
1628 		unsigned int len = min_t(int, data_size, PAGE_SIZE);
1629 
1630 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1631 		if (err)
1632 			break;
1633 		data_size -= len;
1634 	}
1635 	kunmap(page);
1636 	drbd_free_pages(peer_device->device, page, 0);
1637 	return err;
1638 }
1639 
1640 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1641 			   sector_t sector, int data_size)
1642 {
1643 	struct bio_vec bvec;
1644 	struct bvec_iter iter;
1645 	struct bio *bio;
1646 	int dgs, err, expect;
1647 	void *dig_in = peer_device->connection->int_dig_in;
1648 	void *dig_vv = peer_device->connection->int_dig_vv;
1649 
1650 	dgs = 0;
1651 	if (peer_device->connection->peer_integrity_tfm) {
1652 		dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1653 		err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1654 		if (err)
1655 			return err;
1656 		data_size -= dgs;
1657 	}
1658 
1659 	/* optimistically update recv_cnt.  if receiving fails below,
1660 	 * we disconnect anyways, and counters will be reset. */
1661 	peer_device->device->recv_cnt += data_size>>9;
1662 
1663 	bio = req->master_bio;
1664 	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1665 
1666 	bio_for_each_segment(bvec, bio, iter) {
1667 		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1668 		expect = min_t(int, data_size, bvec.bv_len);
1669 		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1670 		kunmap(bvec.bv_page);
1671 		if (err)
1672 			return err;
1673 		data_size -= expect;
1674 	}
1675 
1676 	if (dgs) {
1677 		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1678 		if (memcmp(dig_in, dig_vv, dgs)) {
1679 			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1680 			return -EINVAL;
1681 		}
1682 	}
1683 
1684 	D_ASSERT(peer_device->device, data_size == 0);
1685 	return 0;
1686 }
1687 
1688 /*
1689  * e_end_resync_block() is called in asender context via
1690  * drbd_finish_peer_reqs().
1691  */
1692 static int e_end_resync_block(struct drbd_work *w, int unused)
1693 {
1694 	struct drbd_peer_request *peer_req =
1695 		container_of(w, struct drbd_peer_request, w);
1696 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1697 	struct drbd_device *device = peer_device->device;
1698 	sector_t sector = peer_req->i.sector;
1699 	int err;
1700 
1701 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1702 
1703 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1704 		drbd_set_in_sync(device, sector, peer_req->i.size);
1705 		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1706 	} else {
1707 		/* Record failure to sync */
1708 		drbd_rs_failed_io(device, sector, peer_req->i.size);
1709 
1710 		err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1711 	}
1712 	dec_unacked(device);
1713 
1714 	return err;
1715 }
1716 
1717 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1718 			    struct packet_info *pi) __releases(local)
1719 {
1720 	struct drbd_device *device = peer_device->device;
1721 	struct drbd_peer_request *peer_req;
1722 
1723 	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1724 	if (!peer_req)
1725 		goto fail;
1726 
1727 	dec_rs_pending(device);
1728 
1729 	inc_unacked(device);
1730 	/* corresponding dec_unacked() in e_end_resync_block()
1731 	 * respective _drbd_clear_done_ee */
1732 
1733 	peer_req->w.cb = e_end_resync_block;
1734 
1735 	spin_lock_irq(&device->resource->req_lock);
1736 	list_add(&peer_req->w.list, &device->sync_ee);
1737 	spin_unlock_irq(&device->resource->req_lock);
1738 
1739 	atomic_add(pi->size >> 9, &device->rs_sect_ev);
1740 	if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1741 		return 0;
1742 
1743 	/* don't care for the reason here */
1744 	drbd_err(device, "submit failed, triggering re-connect\n");
1745 	spin_lock_irq(&device->resource->req_lock);
1746 	list_del(&peer_req->w.list);
1747 	spin_unlock_irq(&device->resource->req_lock);
1748 
1749 	drbd_free_peer_req(device, peer_req);
1750 fail:
1751 	put_ldev(device);
1752 	return -EIO;
1753 }
1754 
1755 static struct drbd_request *
1756 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1757 	     sector_t sector, bool missing_ok, const char *func)
1758 {
1759 	struct drbd_request *req;
1760 
1761 	/* Request object according to our peer */
1762 	req = (struct drbd_request *)(unsigned long)id;
1763 	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1764 		return req;
1765 	if (!missing_ok) {
1766 		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1767 			(unsigned long)id, (unsigned long long)sector);
1768 	}
1769 	return NULL;
1770 }
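
/*
 * Background sketch (conceptual, not new code): the block_id is opaque to
 * the peer and echoed back unchanged (see the note in drbd_alloc_peer_req());
 * on this side it is simply the pointer of the original struct drbd_request.
 * find_request() therefore only trusts it after checking that the interval
 * it points to is actually registered in the tree for this sector:
 *
 *	req = (struct drbd_request *)(unsigned long)id;
 *	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
 *		(valid: the peer echoed one of our own request ids)
 */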
1771 
1772 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1773 {
1774 	struct drbd_peer_device *peer_device;
1775 	struct drbd_device *device;
1776 	struct drbd_request *req;
1777 	sector_t sector;
1778 	int err;
1779 	struct p_data *p = pi->data;
1780 
1781 	peer_device = conn_peer_device(connection, pi->vnr);
1782 	if (!peer_device)
1783 		return -EIO;
1784 	device = peer_device->device;
1785 
1786 	sector = be64_to_cpu(p->sector);
1787 
1788 	spin_lock_irq(&device->resource->req_lock);
1789 	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1790 	spin_unlock_irq(&device->resource->req_lock);
1791 	if (unlikely(!req))
1792 		return -EIO;
1793 
1794 	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1795 	 * special casing it there for the various failure cases.
1796 	 * still no race with drbd_fail_pending_reads */
1797 	err = recv_dless_read(peer_device, req, sector, pi->size);
1798 	if (!err)
1799 		req_mod(req, DATA_RECEIVED);
1800 	/* else: nothing. handled from drbd_disconnect...
1801 	 * I don't think we may complete this just yet
1802 	 * in case we are "on-disconnect: freeze" */
1803 
1804 	return err;
1805 }
1806 
1807 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1808 {
1809 	struct drbd_peer_device *peer_device;
1810 	struct drbd_device *device;
1811 	sector_t sector;
1812 	int err;
1813 	struct p_data *p = pi->data;
1814 
1815 	peer_device = conn_peer_device(connection, pi->vnr);
1816 	if (!peer_device)
1817 		return -EIO;
1818 	device = peer_device->device;
1819 
1820 	sector = be64_to_cpu(p->sector);
1821 	D_ASSERT(device, p->block_id == ID_SYNCER);
1822 
1823 	if (get_ldev(device)) {
1824 		/* data is submitted to disk within recv_resync_read.
1825 		 * corresponding put_ldev done below on error,
1826 		 * or in drbd_peer_request_endio. */
1827 		err = recv_resync_read(peer_device, sector, pi);
1828 	} else {
1829 		if (__ratelimit(&drbd_ratelimit_state))
1830 			drbd_err(device, "Can not write resync data to local disk.\n");
1831 
1832 		err = drbd_drain_block(peer_device, pi->size);
1833 
1834 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1835 	}
1836 
1837 	atomic_add(pi->size >> 9, &device->rs_sect_in);
1838 
1839 	return err;
1840 }
1841 
1842 static void restart_conflicting_writes(struct drbd_device *device,
1843 				       sector_t sector, int size)
1844 {
1845 	struct drbd_interval *i;
1846 	struct drbd_request *req;
1847 
1848 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1849 		if (!i->local)
1850 			continue;
1851 		req = container_of(i, struct drbd_request, i);
1852 		if (req->rq_state & RQ_LOCAL_PENDING ||
1853 		    !(req->rq_state & RQ_POSTPONED))
1854 			continue;
1855 		/* as it is RQ_POSTPONED, this will cause it to
1856 		 * be queued on the retry workqueue. */
1857 		__req_mod(req, CONFLICT_RESOLVED, NULL);
1858 	}
1859 }
1860 
1861 /*
1862  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1863  */
1864 static int e_end_block(struct drbd_work *w, int cancel)
1865 {
1866 	struct drbd_peer_request *peer_req =
1867 		container_of(w, struct drbd_peer_request, w);
1868 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1869 	struct drbd_device *device = peer_device->device;
1870 	sector_t sector = peer_req->i.sector;
1871 	int err = 0, pcmd;
1872 
1873 	if (peer_req->flags & EE_SEND_WRITE_ACK) {
1874 		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1875 			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1876 				device->state.conn <= C_PAUSED_SYNC_T &&
1877 				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1878 				P_RS_WRITE_ACK : P_WRITE_ACK;
1879 			err = drbd_send_ack(peer_device, pcmd, peer_req);
1880 			if (pcmd == P_RS_WRITE_ACK)
1881 				drbd_set_in_sync(device, sector, peer_req->i.size);
1882 		} else {
1883 			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1884 			/* we expect it to be marked out of sync anyways...
1885 			 * maybe assert this?  */
1886 		}
1887 		dec_unacked(device);
1888 	}
1889 	/* we delete from the conflict detection hash _after_ we sent out the
1890 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1891 	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1892 		spin_lock_irq(&device->resource->req_lock);
1893 		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1894 		drbd_remove_epoch_entry_interval(device, peer_req);
1895 		if (peer_req->flags & EE_RESTART_REQUESTS)
1896 			restart_conflicting_writes(device, sector, peer_req->i.size);
1897 		spin_unlock_irq(&device->resource->req_lock);
1898 	} else
1899 		D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1900 
1901 	drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1902 
1903 	return err;
1904 }
1905 
1906 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1907 {
1908 	struct drbd_peer_request *peer_req =
1909 		container_of(w, struct drbd_peer_request, w);
1910 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1911 	int err;
1912 
1913 	err = drbd_send_ack(peer_device, ack, peer_req);
1914 	dec_unacked(peer_device->device);
1915 
1916 	return err;
1917 }
1918 
1919 static int e_send_superseded(struct drbd_work *w, int unused)
1920 {
1921 	return e_send_ack(w, P_SUPERSEDED);
1922 }
1923 
1924 static int e_send_retry_write(struct drbd_work *w, int unused)
1925 {
1926 	struct drbd_peer_request *peer_req =
1927 		container_of(w, struct drbd_peer_request, w);
1928 	struct drbd_connection *connection = peer_req->peer_device->connection;
1929 
1930 	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
1931 			     P_RETRY_WRITE : P_SUPERSEDED);
1932 }
1933 
1934 static bool seq_greater(u32 a, u32 b)
1935 {
1936 	/*
1937 	 * We assume 32-bit wrap-around here.
1938 	 * For 24-bit wrap-around, we would have to shift:
1939 	 *  a <<= 8; b <<= 8;
1940 	 */
1941 	return (s32)a - (s32)b > 0;
1942 }
1943 
1944 static u32 seq_max(u32 a, u32 b)
1945 {
1946 	return seq_greater(a, b) ? a : b;
1947 }
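
/*
 * Illustration (comment only, not compiled): because the difference is
 * evaluated in signed 32-bit arithmetic, the comparison stays correct
 * across the wrap:
 *
 *   seq_greater(5, 0xfffffffa): (s32)5 - (s32)0xfffffffa == 11 > 0  -> true
 *   seq_greater(0xfffffffa, 5): (s32)0xfffffffa - (s32)5 == -11     -> false
 *
 * hence seq_max(5, 0xfffffffa) yields 5, the logically newer value.
 */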
1948 
1949 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
1950 {
1951 	struct drbd_device *device = peer_device->device;
1952 	unsigned int newest_peer_seq;
1953 
1954 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
1955 		spin_lock(&device->peer_seq_lock);
1956 		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
1957 		device->peer_seq = newest_peer_seq;
1958 		spin_unlock(&device->peer_seq_lock);
1959 		/* wake up only if we actually changed device->peer_seq */
1960 		if (peer_seq == newest_peer_seq)
1961 			wake_up(&device->seq_wait);
1962 	}
1963 }
1964 
1965 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1966 {
1967 	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1968 }
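
/*
 * Worked example for overlaps() (lengths l1/l2 are in bytes, one sector
 * is 512 bytes):
 *   overlaps(0, 4096, 8, 4096): sectors [0, 8) vs [8, 16) -> no overlap (0)
 *   overlaps(0, 4096, 7, 4096): sectors [0, 8) vs [7, 15) -> overlap    (1)
 */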
1969 
1970 /* maybe change sync_ee into interval trees as well? */
1971 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
1972 {
1973 	struct drbd_peer_request *rs_req;
	bool rv = false;
1975 
1976 	spin_lock_irq(&device->resource->req_lock);
1977 	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
1978 		if (overlaps(peer_req->i.sector, peer_req->i.size,
1979 			     rs_req->i.sector, rs_req->i.size)) {
			rv = true;
1981 			break;
1982 		}
1983 	}
1984 	spin_unlock_irq(&device->resource->req_lock);
1985 
1986 	return rv;
1987 }
1988 
1989 /* Called from receive_Data.
1990  * Synchronize packets on sock with packets on msock.
1991  *
1992  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1993  * packet traveling on msock, they are still processed in the order they have
1994  * been sent.
1995  *
1996  * Note: we don't care for Ack packets overtaking P_DATA packets.
1997  *
 * If peer_seq is larger than device->peer_seq, there are still
 * outstanding packets on the msock; we wait for them to arrive.
 * If this is the logically next packet, we update device->peer_seq
 * ourselves. Correctly handles 32bit wrap around.
2002  *
2003  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2004  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2005  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2006  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2007  *
2008  * returns 0 if we may process the packet,
2009  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2010 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2011 {
2012 	struct drbd_device *device = peer_device->device;
2013 	DEFINE_WAIT(wait);
2014 	long timeout;
2015 	int ret = 0, tp;
2016 
2017 	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2018 		return 0;
2019 
2020 	spin_lock(&device->peer_seq_lock);
2021 	for (;;) {
2022 		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2023 			device->peer_seq = seq_max(device->peer_seq, peer_seq);
2024 			break;
2025 		}
2026 
2027 		if (signal_pending(current)) {
2028 			ret = -ERESTARTSYS;
2029 			break;
2030 		}
2031 
2032 		rcu_read_lock();
2033 		tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
2034 		rcu_read_unlock();
2035 
2036 		if (!tp)
2037 			break;
2038 
2039 		/* Only need to wait if two_primaries is enabled */
2040 		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2041 		spin_unlock(&device->peer_seq_lock);
2042 		rcu_read_lock();
2043 		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2044 		rcu_read_unlock();
2045 		timeout = schedule_timeout(timeout);
2046 		spin_lock(&device->peer_seq_lock);
2047 		if (!timeout) {
2048 			ret = -ETIMEDOUT;
2049 			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2050 			break;
2051 		}
2052 	}
2053 	spin_unlock(&device->peer_seq_lock);
2054 	finish_wait(&device->seq_wait, &wait);
2055 	return ret;
2056 }
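
/*
 * Worked example for the wait loop above (with allow-two-primaries):
 * assume device->peer_seq is 7 and a P_DATA packet with seq_num 9 arrives
 * on the data socket while the msock packet carrying seq 8 is still in
 * flight.  seq_greater(9 - 1, 7) is true, so we sleep on seq_wait; once
 * the seq 8 packet has been processed and device->peer_seq is 8,
 * !seq_greater(8, 8) holds, we set device->peer_seq = seq_max(8, 9) = 9
 * and may process the write.
 */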
2057 
/* See also bio_flags_to_wire().
 * We need to map DRBD_REQ_* semantically to data packet flags and back,
 * because we may replicate to peers running other kernel versions. */
2061 static unsigned long wire_flags_to_bio(u32 dpf)
2062 {
2063 	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2064 		(dpf & DP_FUA ? REQ_FUA : 0) |
2065 		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2066 		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
2067 }
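
/*
 * For illustration: a peer write sent with DP_RW_SYNC | DP_FUA results in
 * a local bio with REQ_SYNC | REQ_FUA set; likewise DP_DISCARD turns the
 * replicated request into a REQ_DISCARD bio on this node.
 */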
2068 
2069 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2070 				    unsigned int size)
2071 {
2072 	struct drbd_interval *i;
2073 
2074     repeat:
2075 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2076 		struct drbd_request *req;
2077 		struct bio_and_error m;
2078 
2079 		if (!i->local)
2080 			continue;
2081 		req = container_of(i, struct drbd_request, i);
2082 		if (!(req->rq_state & RQ_POSTPONED))
2083 			continue;
2084 		req->rq_state &= ~RQ_POSTPONED;
2085 		__req_mod(req, NEG_ACKED, &m);
2086 		spin_unlock_irq(&device->resource->req_lock);
2087 		if (m.bio)
2088 			complete_master_bio(device, &m);
2089 		spin_lock_irq(&device->resource->req_lock);
2090 		goto repeat;
2091 	}
2092 }
2093 
2094 static int handle_write_conflicts(struct drbd_device *device,
2095 				  struct drbd_peer_request *peer_req)
2096 {
2097 	struct drbd_connection *connection = peer_req->peer_device->connection;
2098 	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2099 	sector_t sector = peer_req->i.sector;
2100 	const unsigned int size = peer_req->i.size;
2101 	struct drbd_interval *i;
2102 	bool equal;
2103 	int err;
2104 
2105 	/*
2106 	 * Inserting the peer request into the write_requests tree will prevent
2107 	 * new conflicting local requests from being added.
2108 	 */
2109 	drbd_insert_interval(&device->write_requests, &peer_req->i);
2110 
2111     repeat:
2112 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2113 		if (i == &peer_req->i)
2114 			continue;
2115 
2116 		if (!i->local) {
2117 			/*
2118 			 * Our peer has sent a conflicting remote request; this
2119 			 * should not happen in a two-node setup.  Wait for the
2120 			 * earlier peer request to complete.
2121 			 */
2122 			err = drbd_wait_misc(device, i);
2123 			if (err)
2124 				goto out;
2125 			goto repeat;
2126 		}
2127 
2128 		equal = i->sector == sector && i->size == size;
2129 		if (resolve_conflicts) {
2130 			/*
2131 			 * If the peer request is fully contained within the
2132 			 * overlapping request, it can be considered overwritten
2133 			 * and thus superseded; otherwise, it will be retried
2134 			 * once all overlapping requests have completed.
2135 			 */
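			/*
			 * Worked example (one sector is 512 bytes): a local
			 * write of 8192 bytes at sector 0 covers [0, 16); a
			 * peer write of 4096 bytes at sector 4 covers [4, 12)
			 * and is fully contained, hence superseded.  A peer
			 * write of 4096 bytes at sector 12 ([12, 20)) sticks
			 * out past sector 16 and is retried instead.
			 */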
2136 			bool superseded = i->sector <= sector && i->sector +
2137 				       (i->size >> 9) >= sector + (size >> 9);
2138 
2139 			if (!equal)
2140 				drbd_alert(device, "Concurrent writes detected: "
2141 					       "local=%llus +%u, remote=%llus +%u, "
2142 					       "assuming %s came first\n",
2143 					  (unsigned long long)i->sector, i->size,
2144 					  (unsigned long long)sector, size,
2145 					  superseded ? "local" : "remote");
2146 
2147 			inc_unacked(device);
2148 			peer_req->w.cb = superseded ? e_send_superseded :
2149 						   e_send_retry_write;
2150 			list_add_tail(&peer_req->w.list, &device->done_ee);
2151 			wake_asender(connection);
2152 
2153 			err = -ENOENT;
2154 			goto out;
2155 		} else {
2156 			struct drbd_request *req =
2157 				container_of(i, struct drbd_request, i);
2158 
2159 			if (!equal)
2160 				drbd_alert(device, "Concurrent writes detected: "
2161 					       "local=%llus +%u, remote=%llus +%u\n",
2162 					  (unsigned long long)i->sector, i->size,
2163 					  (unsigned long long)sector, size);
2164 
2165 			if (req->rq_state & RQ_LOCAL_PENDING ||
2166 			    !(req->rq_state & RQ_POSTPONED)) {
2167 				/*
2168 				 * Wait for the node with the discard flag to
2169 				 * decide if this request has been superseded
2170 				 * or needs to be retried.
2171 				 * Requests that have been superseded will
2172 				 * disappear from the write_requests tree.
2173 				 *
2174 				 * In addition, wait for the conflicting
2175 				 * request to finish locally before submitting
2176 				 * the conflicting peer request.
2177 				 */
2178 				err = drbd_wait_misc(device, &req->i);
2179 				if (err) {
2180 					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2181 					fail_postponed_requests(device, sector, size);
2182 					goto out;
2183 				}
2184 				goto repeat;
2185 			}
2186 			/*
2187 			 * Remember to restart the conflicting requests after
2188 			 * the new peer request has completed.
2189 			 */
2190 			peer_req->flags |= EE_RESTART_REQUESTS;
2191 		}
2192 	}
2193 	err = 0;
2194 
2195     out:
2196 	if (err)
2197 		drbd_remove_epoch_entry_interval(device, peer_req);
2198 	return err;
2199 }
2200 
2201 /* mirrored write */
2202 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2203 {
2204 	struct drbd_peer_device *peer_device;
2205 	struct drbd_device *device;
2206 	sector_t sector;
2207 	struct drbd_peer_request *peer_req;
2208 	struct p_data *p = pi->data;
2209 	u32 peer_seq = be32_to_cpu(p->seq_num);
2210 	int rw = WRITE;
2211 	u32 dp_flags;
2212 	int err, tp;
2213 
2214 	peer_device = conn_peer_device(connection, pi->vnr);
2215 	if (!peer_device)
2216 		return -EIO;
2217 	device = peer_device->device;
2218 
2219 	if (!get_ldev(device)) {
2220 		int err2;
2221 
2222 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2223 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2224 		atomic_inc(&connection->current_epoch->epoch_size);
2225 		err2 = drbd_drain_block(peer_device, pi->size);
2226 		if (!err)
2227 			err = err2;
2228 		return err;
2229 	}
2230 
2231 	/*
2232 	 * Corresponding put_ldev done either below (on various errors), or in
2233 	 * drbd_peer_request_endio, if we successfully submit the data at the
2234 	 * end of this function.
2235 	 */
2236 
2237 	sector = be64_to_cpu(p->sector);
2238 	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2239 	if (!peer_req) {
2240 		put_ldev(device);
2241 		return -EIO;
2242 	}
2243 
2244 	peer_req->w.cb = e_end_block;
2245 
2246 	dp_flags = be32_to_cpu(p->dp_flags);
2247 	rw |= wire_flags_to_bio(dp_flags);
2248 	if (pi->cmd == P_TRIM) {
2249 		struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
2250 		peer_req->flags |= EE_IS_TRIM;
2251 		if (!blk_queue_discard(q))
2252 			peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
2253 		D_ASSERT(peer_device, peer_req->i.size > 0);
2254 		D_ASSERT(peer_device, rw & REQ_DISCARD);
2255 		D_ASSERT(peer_device, peer_req->pages == NULL);
2256 	} else if (peer_req->pages == NULL) {
2257 		D_ASSERT(device, peer_req->i.size == 0);
2258 		D_ASSERT(device, dp_flags & DP_FLUSH);
2259 	}
2260 
2261 	if (dp_flags & DP_MAY_SET_IN_SYNC)
2262 		peer_req->flags |= EE_MAY_SET_IN_SYNC;
2263 
2264 	spin_lock(&connection->epoch_lock);
2265 	peer_req->epoch = connection->current_epoch;
2266 	atomic_inc(&peer_req->epoch->epoch_size);
2267 	atomic_inc(&peer_req->epoch->active);
2268 	spin_unlock(&connection->epoch_lock);
2269 
2270 	rcu_read_lock();
2271 	tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2272 	rcu_read_unlock();
2273 	if (tp) {
2274 		peer_req->flags |= EE_IN_INTERVAL_TREE;
2275 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2276 		if (err)
2277 			goto out_interrupted;
2278 		spin_lock_irq(&device->resource->req_lock);
2279 		err = handle_write_conflicts(device, peer_req);
2280 		if (err) {
2281 			spin_unlock_irq(&device->resource->req_lock);
2282 			if (err == -ENOENT) {
2283 				put_ldev(device);
2284 				return 0;
2285 			}
2286 			goto out_interrupted;
2287 		}
2288 	} else {
2289 		update_peer_seq(peer_device, peer_seq);
2290 		spin_lock_irq(&device->resource->req_lock);
2291 	}
	/* if we use the zeroout fallback code, we process synchronously
	 * and wait for all pending requests, i.e. wait for active_ee to
	 * become empty in drbd_submit_peer_request();
	 * better not add ourselves here. */
2296 	if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
2297 		list_add(&peer_req->w.list, &device->active_ee);
2298 	spin_unlock_irq(&device->resource->req_lock);
2299 
2300 	if (device->state.conn == C_SYNC_TARGET)
2301 		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2302 
2303 	if (peer_device->connection->agreed_pro_version < 100) {
2304 		rcu_read_lock();
2305 		switch (rcu_dereference(peer_device->connection->net_conf)->wire_protocol) {
2306 		case DRBD_PROT_C:
2307 			dp_flags |= DP_SEND_WRITE_ACK;
2308 			break;
2309 		case DRBD_PROT_B:
2310 			dp_flags |= DP_SEND_RECEIVE_ACK;
2311 			break;
2312 		}
2313 		rcu_read_unlock();
2314 	}
2315 
2316 	if (dp_flags & DP_SEND_WRITE_ACK) {
2317 		peer_req->flags |= EE_SEND_WRITE_ACK;
2318 		inc_unacked(device);
		/* corresponding dec_unacked() in e_end_block(),
		 * or in _drbd_clear_done_ee() */
2321 	}
2322 
2323 	if (dp_flags & DP_SEND_RECEIVE_ACK) {
2324 		/* I really don't like it that the receiver thread
2325 		 * sends on the msock, but anyways */
2326 		drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
2327 	}
2328 
2329 	if (device->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster: mark the
		 * block as out of sync for the peer and make sure the
		 * activity log covers it. */
2331 		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2332 		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2333 		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2334 		drbd_al_begin_io(device, &peer_req->i, true);
2335 	}
2336 
2337 	err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2338 	if (!err)
2339 		return 0;
2340 
2341 	/* don't care for the reason here */
2342 	drbd_err(device, "submit failed, triggering re-connect\n");
2343 	spin_lock_irq(&device->resource->req_lock);
2344 	list_del(&peer_req->w.list);
2345 	drbd_remove_epoch_entry_interval(device, peer_req);
2346 	spin_unlock_irq(&device->resource->req_lock);
2347 	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2348 		drbd_al_complete_io(device, &peer_req->i);
2349 
2350 out_interrupted:
2351 	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2352 	put_ldev(device);
2353 	drbd_free_peer_req(device, peer_req);
2354 	return err;
2355 }
2356 
2357 /* We may throttle resync, if the lower device seems to be busy,
2358  * and current sync rate is above c_min_rate.
2359  *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID's is_mddev_idle(): if the partition stats show a "significant"
 * amount (more than 64 sectors) of activity that we cannot account for with
 * our own resync activity, it obviously is "busy".
 *
 * The current sync rate used here is based only on the most recent two step
 * marks, to get a short-time average so we can react faster.
2367  */
2368 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
2369 {
2370 	struct lc_element *tmp;
2371 	bool throttle = true;
2372 
2373 	if (!drbd_rs_c_min_rate_throttle(device))
2374 		return false;
2375 
2376 	spin_lock_irq(&device->al_lock);
2377 	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2378 	if (tmp) {
2379 		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2380 		if (test_bit(BME_PRIORITY, &bm_ext->flags))
2381 			throttle = false;
2382 		/* Do not slow down if app IO is already waiting for this extent */
2383 	}
2384 	spin_unlock_irq(&device->al_lock);
2385 
2386 	return throttle;
2387 }
2388 
2389 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2390 {
2391 	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2392 	unsigned long db, dt, dbdt;
2393 	unsigned int c_min_rate;
2394 	int curr_events;
2395 
2396 	rcu_read_lock();
2397 	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2398 	rcu_read_unlock();
2399 
2400 	/* feature disabled? */
2401 	if (c_min_rate == 0)
2402 		return false;
2403 
2404 	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2405 		      (int)part_stat_read(&disk->part0, sectors[1]) -
2406 			atomic_read(&device->rs_sect_ev);
2407 	if (!device->rs_last_events || curr_events - device->rs_last_events > 64) {
2408 		unsigned long rs_left;
2409 		int i;
2410 
2411 		device->rs_last_events = curr_events;
2412 
2413 		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2414 		 * approx. */
2415 		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2416 
2417 		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2418 			rs_left = device->ov_left;
2419 		else
2420 			rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2421 
2422 		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2423 		if (!dt)
2424 			dt++;
2425 		db = device->rs_mark_left[i] - rs_left;
2426 		dbdt = Bit2KB(db/dt);
2427 
2428 		if (dbdt > c_min_rate)
2429 			return true;
2430 	}
2431 	return false;
2432 }
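
/*
 * Rough worked example for the check above (one bitmap bit covers
 * BM_BLOCK_SIZE, i.e. 4 KiB): if the selected sync mark is dt = 2 seconds
 * old and db = 25600 bits have been resolved since then, then
 * dbdt = Bit2KB(25600 / 2) = 51200 KiB/s.  With c_min_rate configured to
 * e.g. 4096 KiB/s this returns true, i.e. the resync is currently fast
 * enough that it may be throttled in favour of the detected application IO.
 */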
2433 
2434 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2435 {
2436 	struct drbd_peer_device *peer_device;
2437 	struct drbd_device *device;
2438 	sector_t sector;
2439 	sector_t capacity;
2440 	struct drbd_peer_request *peer_req;
2441 	struct digest_info *di = NULL;
2442 	int size, verb;
2443 	unsigned int fault_type;
2444 	struct p_block_req *p =	pi->data;
2445 
2446 	peer_device = conn_peer_device(connection, pi->vnr);
2447 	if (!peer_device)
2448 		return -EIO;
2449 	device = peer_device->device;
2450 	capacity = drbd_get_capacity(device->this_bdev);
2451 
2452 	sector = be64_to_cpu(p->sector);
2453 	size   = be32_to_cpu(p->blksize);
2454 
2455 	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2456 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2457 				(unsigned long long)sector, size);
2458 		return -EINVAL;
2459 	}
2460 	if (sector + (size>>9) > capacity) {
2461 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2462 				(unsigned long long)sector, size);
2463 		return -EINVAL;
2464 	}
2465 
2466 	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2467 		verb = 1;
2468 		switch (pi->cmd) {
2469 		case P_DATA_REQUEST:
2470 			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2471 			break;
2472 		case P_RS_DATA_REQUEST:
2473 		case P_CSUM_RS_REQUEST:
2474 		case P_OV_REQUEST:
2475 			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2476 			break;
2477 		case P_OV_REPLY:
2478 			verb = 0;
2479 			dec_rs_pending(device);
2480 			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2481 			break;
2482 		default:
2483 			BUG();
2484 		}
2485 		if (verb && __ratelimit(&drbd_ratelimit_state))
2486 			drbd_err(device, "Can not satisfy peer's read request, "
2487 			    "no local data.\n");
2488 
		/* drain possible payload */
2490 		return drbd_drain_block(peer_device, pi->size);
2491 	}
2492 
2493 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2494 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2495 	 * which in turn might block on the other node at this very place.  */
2496 	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2497 			true /* has real payload */, GFP_NOIO);
2498 	if (!peer_req) {
2499 		put_ldev(device);
2500 		return -ENOMEM;
2501 	}
2502 
2503 	switch (pi->cmd) {
2504 	case P_DATA_REQUEST:
2505 		peer_req->w.cb = w_e_end_data_req;
2506 		fault_type = DRBD_FAULT_DT_RD;
2507 		/* application IO, don't drbd_rs_begin_io */
2508 		goto submit;
2509 
2510 	case P_RS_DATA_REQUEST:
2511 		peer_req->w.cb = w_e_end_rsdata_req;
2512 		fault_type = DRBD_FAULT_RS_RD;
2513 		/* used in the sector offset progress display */
2514 		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2515 		break;
2516 
2517 	case P_OV_REPLY:
2518 	case P_CSUM_RS_REQUEST:
2519 		fault_type = DRBD_FAULT_RS_RD;
2520 		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2521 		if (!di)
2522 			goto out_free_e;
2523 
2524 		di->digest_size = pi->size;
2525 		di->digest = (((char *)di)+sizeof(struct digest_info));
2526 
2527 		peer_req->digest = di;
2528 		peer_req->flags |= EE_HAS_DIGEST;
2529 
2530 		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2531 			goto out_free_e;
2532 
2533 		if (pi->cmd == P_CSUM_RS_REQUEST) {
2534 			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2535 			peer_req->w.cb = w_e_end_csum_rs_req;
2536 			/* used in the sector offset progress display */
2537 			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2538 		} else if (pi->cmd == P_OV_REPLY) {
2539 			/* track progress, we may need to throttle */
2540 			atomic_add(size >> 9, &device->rs_sect_in);
2541 			peer_req->w.cb = w_e_end_ov_reply;
2542 			dec_rs_pending(device);
2543 			/* drbd_rs_begin_io done when we sent this request,
2544 			 * but accounting still needs to be done. */
2545 			goto submit_for_resync;
2546 		}
2547 		break;
2548 
2549 	case P_OV_REQUEST:
2550 		if (device->ov_start_sector == ~(sector_t)0 &&
2551 		    peer_device->connection->agreed_pro_version >= 90) {
2552 			unsigned long now = jiffies;
2553 			int i;
2554 			device->ov_start_sector = sector;
2555 			device->ov_position = sector;
2556 			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2557 			device->rs_total = device->ov_left;
2558 			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2559 				device->rs_mark_left[i] = device->ov_left;
2560 				device->rs_mark_time[i] = now;
2561 			}
2562 			drbd_info(device, "Online Verify start sector: %llu\n",
2563 					(unsigned long long)sector);
2564 		}
2565 		peer_req->w.cb = w_e_end_ov_req;
2566 		fault_type = DRBD_FAULT_RS_RD;
2567 		break;
2568 
2569 	default:
2570 		BUG();
2571 	}
2572 
	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
	 * wrt the receiver, but it is not as straightforward as it may seem.
	 * Various places in the resync start and stop logic assume resync
	 * requests are processed in order, so requeuing this on the worker
	 * thread would introduce a bunch of new code for synchronization
	 * between threads.
2578 	 *
2579 	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2580 	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2581 	 * for application writes for the same time.  For now, just throttle
2582 	 * here, where the rest of the code expects the receiver to sleep for
2583 	 * a while, anyways.
2584 	 */
2585 
2586 	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2587 	 * this defers syncer requests for some time, before letting at least
	 * one request through.  The resync controller on the receiving side
2589 	 * will adapt to the incoming rate accordingly.
2590 	 *
2591 	 * We cannot throttle here if remote is Primary/SyncTarget:
2592 	 * we would also throttle its application reads.
2593 	 * In that case, throttling is done on the SyncTarget only.
2594 	 */
2595 	if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector))
2596 		schedule_timeout_uninterruptible(HZ/10);
2597 	if (drbd_rs_begin_io(device, sector))
2598 		goto out_free_e;
2599 
2600 submit_for_resync:
2601 	atomic_add(size >> 9, &device->rs_sect_ev);
2602 
2603 submit:
2604 	inc_unacked(device);
2605 	spin_lock_irq(&device->resource->req_lock);
2606 	list_add_tail(&peer_req->w.list, &device->read_ee);
2607 	spin_unlock_irq(&device->resource->req_lock);
2608 
2609 	if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2610 		return 0;
2611 
2612 	/* don't care for the reason here */
2613 	drbd_err(device, "submit failed, triggering re-connect\n");
2614 	spin_lock_irq(&device->resource->req_lock);
2615 	list_del(&peer_req->w.list);
2616 	spin_unlock_irq(&device->resource->req_lock);
2617 	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
2618 
2619 out_free_e:
2620 	put_ldev(device);
2621 	drbd_free_peer_req(device, peer_req);
2622 	return -EIO;
2623 }
2624 
2625 /**
2626  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2627  */
2628 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2629 {
2630 	struct drbd_device *device = peer_device->device;
2631 	int self, peer, rv = -100;
2632 	unsigned long ch_self, ch_peer;
2633 	enum drbd_after_sb_p after_sb_0p;
2634 
2635 	self = device->ldev->md.uuid[UI_BITMAP] & 1;
2636 	peer = device->p_uuid[UI_BITMAP] & 1;
2637 
2638 	ch_peer = device->p_uuid[UI_SIZE];
2639 	ch_self = device->comm_bm_set;
2640 
2641 	rcu_read_lock();
2642 	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2643 	rcu_read_unlock();
2644 	switch (after_sb_0p) {
2645 	case ASB_CONSENSUS:
2646 	case ASB_DISCARD_SECONDARY:
2647 	case ASB_CALL_HELPER:
2648 	case ASB_VIOLENTLY:
2649 		drbd_err(device, "Configuration error.\n");
2650 		break;
2651 	case ASB_DISCONNECT:
2652 		break;
2653 	case ASB_DISCARD_YOUNGER_PRI:
2654 		if (self == 0 && peer == 1) {
2655 			rv = -1;
2656 			break;
2657 		}
2658 		if (self == 1 && peer == 0) {
2659 			rv =  1;
2660 			break;
2661 		}
2662 		/* Else fall through to one of the other strategies... */
2663 	case ASB_DISCARD_OLDER_PRI:
2664 		if (self == 0 && peer == 1) {
2665 			rv = 1;
2666 			break;
2667 		}
2668 		if (self == 1 && peer == 0) {
2669 			rv = -1;
2670 			break;
2671 		}
2672 		/* Else fall through to one of the other strategies... */
2673 		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2674 		     "Using discard-least-changes instead\n");
2675 	case ASB_DISCARD_ZERO_CHG:
2676 		if (ch_peer == 0 && ch_self == 0) {
2677 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2678 				? -1 : 1;
2679 			break;
2680 		} else {
2681 			if (ch_peer == 0) { rv =  1; break; }
2682 			if (ch_self == 0) { rv = -1; break; }
2683 		}
2684 		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2685 			break;
2686 	case ASB_DISCARD_LEAST_CHG:
2687 		if	(ch_self < ch_peer)
2688 			rv = -1;
2689 		else if (ch_self > ch_peer)
2690 			rv =  1;
2691 		else /* ( ch_self == ch_peer ) */
2692 		     /* Well, then use something else. */
2693 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2694 				? -1 : 1;
2695 		break;
2696 	case ASB_DISCARD_LOCAL:
2697 		rv = -1;
2698 		break;
2699 	case ASB_DISCARD_REMOTE:
2700 		rv =  1;
2701 	}
2702 
2703 	return rv;
2704 }
2705 
2706 /**
2707  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
2708  */
2709 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2710 {
2711 	struct drbd_device *device = peer_device->device;
2712 	int hg, rv = -100;
2713 	enum drbd_after_sb_p after_sb_1p;
2714 
2715 	rcu_read_lock();
2716 	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2717 	rcu_read_unlock();
2718 	switch (after_sb_1p) {
2719 	case ASB_DISCARD_YOUNGER_PRI:
2720 	case ASB_DISCARD_OLDER_PRI:
2721 	case ASB_DISCARD_LEAST_CHG:
2722 	case ASB_DISCARD_LOCAL:
2723 	case ASB_DISCARD_REMOTE:
2724 	case ASB_DISCARD_ZERO_CHG:
2725 		drbd_err(device, "Configuration error.\n");
2726 		break;
2727 	case ASB_DISCONNECT:
2728 		break;
2729 	case ASB_CONSENSUS:
2730 		hg = drbd_asb_recover_0p(peer_device);
2731 		if (hg == -1 && device->state.role == R_SECONDARY)
2732 			rv = hg;
2733 		if (hg == 1  && device->state.role == R_PRIMARY)
2734 			rv = hg;
2735 		break;
2736 	case ASB_VIOLENTLY:
2737 		rv = drbd_asb_recover_0p(peer_device);
2738 		break;
2739 	case ASB_DISCARD_SECONDARY:
2740 		return device->state.role == R_PRIMARY ? 1 : -1;
2741 	case ASB_CALL_HELPER:
2742 		hg = drbd_asb_recover_0p(peer_device);
2743 		if (hg == -1 && device->state.role == R_PRIMARY) {
2744 			enum drbd_state_rv rv2;
2745 
2746 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2747 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2748 			  * we do not need to wait for the after state change work either. */
2749 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2750 			if (rv2 != SS_SUCCESS) {
2751 				drbd_khelper(device, "pri-lost-after-sb");
2752 			} else {
2753 				drbd_warn(device, "Successfully gave up primary role.\n");
2754 				rv = hg;
2755 			}
2756 		} else
2757 			rv = hg;
2758 	}
2759 
2760 	return rv;
2761 }
2762 
2763 /**
2764  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
2765  */
2766 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2767 {
2768 	struct drbd_device *device = peer_device->device;
2769 	int hg, rv = -100;
2770 	enum drbd_after_sb_p after_sb_2p;
2771 
2772 	rcu_read_lock();
2773 	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2774 	rcu_read_unlock();
2775 	switch (after_sb_2p) {
2776 	case ASB_DISCARD_YOUNGER_PRI:
2777 	case ASB_DISCARD_OLDER_PRI:
2778 	case ASB_DISCARD_LEAST_CHG:
2779 	case ASB_DISCARD_LOCAL:
2780 	case ASB_DISCARD_REMOTE:
2781 	case ASB_CONSENSUS:
2782 	case ASB_DISCARD_SECONDARY:
2783 	case ASB_DISCARD_ZERO_CHG:
2784 		drbd_err(device, "Configuration error.\n");
2785 		break;
2786 	case ASB_VIOLENTLY:
2787 		rv = drbd_asb_recover_0p(peer_device);
2788 		break;
2789 	case ASB_DISCONNECT:
2790 		break;
2791 	case ASB_CALL_HELPER:
2792 		hg = drbd_asb_recover_0p(peer_device);
2793 		if (hg == -1) {
2794 			enum drbd_state_rv rv2;
2795 
2796 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2797 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2798 			  * we do not need to wait for the after state change work either. */
2799 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2800 			if (rv2 != SS_SUCCESS) {
2801 				drbd_khelper(device, "pri-lost-after-sb");
2802 			} else {
2803 				drbd_warn(device, "Successfully gave up primary role.\n");
2804 				rv = hg;
2805 			}
2806 		} else
2807 			rv = hg;
2808 	}
2809 
2810 	return rv;
2811 }
2812 
2813 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2814 			   u64 bits, u64 flags)
2815 {
2816 	if (!uuid) {
2817 		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2818 		return;
2819 	}
2820 	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2821 	     text,
2822 	     (unsigned long long)uuid[UI_CURRENT],
2823 	     (unsigned long long)uuid[UI_BITMAP],
2824 	     (unsigned long long)uuid[UI_HISTORY_START],
2825 	     (unsigned long long)uuid[UI_HISTORY_END],
2826 	     (unsigned long long)bits,
2827 	     (unsigned long long)flags);
2828 }
2829 
2830 /*
2831   100	after split brain try auto recover
2832     2	C_SYNC_SOURCE set BitMap
2833     1	C_SYNC_SOURCE use BitMap
2834     0	no Sync
2835    -1	C_SYNC_TARGET use BitMap
2836    -2	C_SYNC_TARGET set BitMap
2837  -100	after split brain, disconnect
2838 -1000	unrelated data
2839 -1091   requires proto 91
2840 -1096   requires proto 96
2841  */
2842 static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local)
2843 {
2844 	u64 self, peer;
2845 	int i, j;
2846 
2847 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2848 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2849 
2850 	*rule_nr = 10;
2851 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2852 		return 0;
2853 
2854 	*rule_nr = 20;
2855 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2856 	     peer != UUID_JUST_CREATED)
2857 		return -2;
2858 
2859 	*rule_nr = 30;
2860 	if (self != UUID_JUST_CREATED &&
2861 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
2862 		return 2;
2863 
2864 	if (self == peer) {
2865 		int rct, dc; /* roles at crash time */
2866 
2867 		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2868 
2869 			if (first_peer_device(device)->connection->agreed_pro_version < 91)
2870 				return -1091;
2871 
2872 			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2873 			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2874 				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2875 				drbd_uuid_move_history(device);
2876 				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2877 				device->ldev->md.uuid[UI_BITMAP] = 0;
2878 
2879 				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2880 					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2881 				*rule_nr = 34;
2882 			} else {
2883 				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
2884 				*rule_nr = 36;
2885 			}
2886 
2887 			return 1;
2888 		}
2889 
2890 		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
2891 
2892 			if (first_peer_device(device)->connection->agreed_pro_version < 91)
2893 				return -1091;
2894 
2895 			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2896 			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2897 				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2898 
2899 				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
2900 				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
2901 				device->p_uuid[UI_BITMAP] = 0UL;
2902 
2903 				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2904 				*rule_nr = 35;
2905 			} else {
2906 				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
2907 				*rule_nr = 37;
2908 			}
2909 
2910 			return -1;
2911 		}
2912 
2913 		/* Common power [off|failure] */
2914 		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
2915 			(device->p_uuid[UI_FLAGS] & 2);
2916 		/* lowest bit is set when we were primary,
2917 		 * next bit (weight 2) is set when peer was primary */
2918 		*rule_nr = 40;
2919 
2920 		switch (rct) {
2921 		case 0: /* !self_pri && !peer_pri */ return 0;
2922 		case 1: /*  self_pri && !peer_pri */ return 1;
2923 		case 2: /* !self_pri &&  peer_pri */ return -1;
2924 		case 3: /*  self_pri &&  peer_pri */
2925 			dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags);
2926 			return dc ? -1 : 1;
2927 		}
2928 	}
2929 
2930 	*rule_nr = 50;
2931 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
2932 	if (self == peer)
2933 		return -1;
2934 
2935 	*rule_nr = 51;
2936 	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
2937 	if (self == peer) {
2938 		if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2939 		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2940 		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2941 		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the
			   modifications the peer made to its UUIDs when it
			   last started a resync as sync source. */
2944 
2945 			if (first_peer_device(device)->connection->agreed_pro_version < 91)
2946 				return -1091;
2947 
2948 			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
2949 			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
2950 
2951 			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
2952 			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2953 
2954 			return -1;
2955 		}
2956 	}
2957 
2958 	*rule_nr = 60;
2959 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2960 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2961 		peer = device->p_uuid[i] & ~((u64)1);
2962 		if (self == peer)
2963 			return -2;
2964 	}
2965 
2966 	*rule_nr = 70;
2967 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2968 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2969 	if (self == peer)
2970 		return 1;
2971 
2972 	*rule_nr = 71;
2973 	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2974 	if (self == peer) {
2975 		if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2976 		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2977 		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2978 		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the
			   modifications we made to our UUIDs when we last
			   started a resync as sync source. */
2981 
2982 			if (first_peer_device(device)->connection->agreed_pro_version < 91)
2983 				return -1091;
2984 
2985 			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
2986 			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
2987 
2988 			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
2989 			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2990 				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2991 
2992 			return 1;
2993 		}
2994 	}
2995 
2996 
2997 	*rule_nr = 80;
2998 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2999 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3000 		self = device->ldev->md.uuid[i] & ~((u64)1);
3001 		if (self == peer)
3002 			return 2;
3003 	}
3004 
3005 	*rule_nr = 90;
3006 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3007 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3008 	if (self == peer && self != ((u64)0))
3009 		return 100;
3010 
3011 	*rule_nr = 100;
3012 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3013 		self = device->ldev->md.uuid[i] & ~((u64)1);
3014 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3015 			peer = device->p_uuid[j] & ~((u64)1);
3016 			if (self == peer)
3017 				return -100;
3018 		}
3019 	}
3020 
3021 	return -1000;
3022 }
3023 
3024 /* drbd_sync_handshake() returns the new conn state on success, or
   C_MASK (-1) on failure.
3026  */
3027 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3028 					   enum drbd_role peer_role,
3029 					   enum drbd_disk_state peer_disk) __must_hold(local)
3030 {
3031 	struct drbd_device *device = peer_device->device;
3032 	enum drbd_conns rv = C_MASK;
3033 	enum drbd_disk_state mydisk;
3034 	struct net_conf *nc;
3035 	int hg, rule_nr, rr_conflict, tentative;
3036 
3037 	mydisk = device->state.disk;
3038 	if (mydisk == D_NEGOTIATING)
3039 		mydisk = device->new_state_tmp.disk;
3040 
3041 	drbd_info(device, "drbd_sync_handshake:\n");
3042 
3043 	spin_lock_irq(&device->ldev->md.uuid_lock);
3044 	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3045 	drbd_uuid_dump(device, "peer", device->p_uuid,
3046 		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3047 
3048 	hg = drbd_uuid_compare(device, &rule_nr);
3049 	spin_unlock_irq(&device->ldev->md.uuid_lock);
3050 
3051 	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3052 
3053 	if (hg == -1000) {
3054 		drbd_alert(device, "Unrelated data, aborting!\n");
3055 		return C_MASK;
3056 	}
3057 	if (hg < -1000) {
3058 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3059 		return C_MASK;
3060 	}
3061 
3062 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3063 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3064 		int f = (hg == -100) || abs(hg) == 2;
3065 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
3066 		if (f)
3067 			hg = hg*2;
3068 		drbd_info(device, "Becoming sync %s due to disk states.\n",
3069 		     hg > 0 ? "source" : "target");
3070 	}
3071 
3072 	if (abs(hg) == 100)
3073 		drbd_khelper(device, "initial-split-brain");
3074 
3075 	rcu_read_lock();
3076 	nc = rcu_dereference(peer_device->connection->net_conf);
3077 
3078 	if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3079 		int pcount = (device->state.role == R_PRIMARY)
3080 			   + (peer_role == R_PRIMARY);
3081 		int forced = (hg == -100);
3082 
3083 		switch (pcount) {
3084 		case 0:
3085 			hg = drbd_asb_recover_0p(peer_device);
3086 			break;
3087 		case 1:
3088 			hg = drbd_asb_recover_1p(peer_device);
3089 			break;
3090 		case 2:
3091 			hg = drbd_asb_recover_2p(peer_device);
3092 			break;
3093 		}
3094 		if (abs(hg) < 100) {
3095 			drbd_warn(device, "Split-Brain detected, %d primaries, "
3096 			     "automatically solved. Sync from %s node\n",
3097 			     pcount, (hg < 0) ? "peer" : "this");
3098 			if (forced) {
3099 				drbd_warn(device, "Doing a full sync, since"
				     " UUIDs were ambiguous.\n");
3101 				hg = hg*2;
3102 			}
3103 		}
3104 	}
3105 
3106 	if (hg == -100) {
3107 		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3108 			hg = -1;
3109 		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3110 			hg = 1;
3111 
3112 		if (abs(hg) < 100)
3113 			drbd_warn(device, "Split-Brain detected, manually solved. "
3114 			     "Sync from %s node\n",
3115 			     (hg < 0) ? "peer" : "this");
3116 	}
3117 	rr_conflict = nc->rr_conflict;
3118 	tentative = nc->tentative;
3119 	rcu_read_unlock();
3120 
3121 	if (hg == -100) {
3122 		/* FIXME this log message is not correct if we end up here
3123 		 * after an attempted attach on a diskless node.
3124 		 * We just refuse to attach -- well, we drop the "connection"
3125 		 * to that disk, in a way... */
3126 		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3127 		drbd_khelper(device, "split-brain");
3128 		return C_MASK;
3129 	}
3130 
3131 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
3132 		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3133 		return C_MASK;
3134 	}
3135 
3136 	if (hg < 0 && /* by intention we do not use mydisk here. */
3137 	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3138 		switch (rr_conflict) {
3139 		case ASB_CALL_HELPER:
3140 			drbd_khelper(device, "pri-lost");
3141 			/* fall through */
3142 		case ASB_DISCONNECT:
3143 			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3144 			return C_MASK;
3145 		case ASB_VIOLENTLY:
3146 			drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
			     " assumption\n");
3148 		}
3149 	}
3150 
3151 	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3152 		if (hg == 0)
3153 			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3154 		else
			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.\n",
3156 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3157 				 abs(hg) >= 2 ? "full" : "bit-map based");
3158 		return C_MASK;
3159 	}
3160 
3161 	if (abs(hg) >= 2) {
3162 		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3163 		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3164 					BM_LOCKED_SET_ALLOWED))
3165 			return C_MASK;
3166 	}
3167 
3168 	if (hg > 0) { /* become sync source. */
3169 		rv = C_WF_BITMAP_S;
3170 	} else if (hg < 0) { /* become sync target */
3171 		rv = C_WF_BITMAP_T;
3172 	} else {
3173 		rv = C_CONNECTED;
3174 		if (drbd_bm_total_weight(device)) {
3175 			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3176 			     drbd_bm_total_weight(device));
3177 		}
3178 	}
3179 
3180 	return rv;
3181 }
3182 
3183 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3184 {
	/* the peer's ASB_DISCARD_REMOTE paired with our ASB_DISCARD_LOCAL is valid */
3186 	if (peer == ASB_DISCARD_REMOTE)
3187 		return ASB_DISCARD_LOCAL;
3188 
	/* any other combination involving ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL is invalid */
3190 	if (peer == ASB_DISCARD_LOCAL)
3191 		return ASB_DISCARD_REMOTE;
3192 
3193 	/* everything else is valid if they are equal on both sides. */
3194 	return peer;
3195 }
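
/*
 * Example: if our after-sb-0pri policy is "discard-local" and the peer
 * reports "discard-remote", convert_after_sb() maps the peer's value to
 * ASB_DISCARD_LOCAL, so the comparison in receive_protocol() below treats
 * the two mirrored settings as compatible.
 */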
3196 
3197 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3198 {
3199 	struct p_protocol *p = pi->data;
3200 	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3201 	int p_proto, p_discard_my_data, p_two_primaries, cf;
3202 	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3203 	char integrity_alg[SHARED_SECRET_MAX] = "";
3204 	struct crypto_hash *peer_integrity_tfm = NULL;
3205 	void *int_dig_in = NULL, *int_dig_vv = NULL;
3206 
3207 	p_proto		= be32_to_cpu(p->protocol);
3208 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
3209 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
3210 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
3211 	p_two_primaries = be32_to_cpu(p->two_primaries);
3212 	cf		= be32_to_cpu(p->conn_flags);
3213 	p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3214 
3215 	if (connection->agreed_pro_version >= 87) {
3216 		int err;
3217 
3218 		if (pi->size > sizeof(integrity_alg))
3219 			return -EIO;
3220 		err = drbd_recv_all(connection, integrity_alg, pi->size);
3221 		if (err)
3222 			return err;
3223 		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3224 	}
3225 
3226 	if (pi->cmd != P_PROTOCOL_UPDATE) {
3227 		clear_bit(CONN_DRY_RUN, &connection->flags);
3228 
3229 		if (cf & CF_DRY_RUN)
3230 			set_bit(CONN_DRY_RUN, &connection->flags);
3231 
3232 		rcu_read_lock();
3233 		nc = rcu_dereference(connection->net_conf);
3234 
3235 		if (p_proto != nc->wire_protocol) {
3236 			drbd_err(connection, "incompatible %s settings\n", "protocol");
3237 			goto disconnect_rcu_unlock;
3238 		}
3239 
3240 		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3241 			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3242 			goto disconnect_rcu_unlock;
3243 		}
3244 
3245 		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3246 			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3247 			goto disconnect_rcu_unlock;
3248 		}
3249 
3250 		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3251 			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3252 			goto disconnect_rcu_unlock;
3253 		}
3254 
3255 		if (p_discard_my_data && nc->discard_my_data) {
3256 			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3257 			goto disconnect_rcu_unlock;
3258 		}
3259 
3260 		if (p_two_primaries != nc->two_primaries) {
3261 			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3262 			goto disconnect_rcu_unlock;
3263 		}
3264 
3265 		if (strcmp(integrity_alg, nc->integrity_alg)) {
3266 			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3267 			goto disconnect_rcu_unlock;
3268 		}
3269 
3270 		rcu_read_unlock();
3271 	}
3272 
3273 	if (integrity_alg[0]) {
3274 		int hash_size;
3275 
3276 		/*
3277 		 * We can only change the peer data integrity algorithm
3278 		 * here.  Changing our own data integrity algorithm
3279 		 * requires that we send a P_PROTOCOL_UPDATE packet at
3280 		 * the same time; otherwise, the peer has no way to
3281 		 * tell between which packets the algorithm should
3282 		 * change.
3283 		 */
3284 
3285 		peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3286 		if (!peer_integrity_tfm) {
3287 			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3288 				 integrity_alg);
3289 			goto disconnect;
3290 		}
3291 
3292 		hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3293 		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3294 		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3295 		if (!(int_dig_in && int_dig_vv)) {
3296 			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3297 			goto disconnect;
3298 		}
3299 	}
3300 
3301 	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3302 	if (!new_net_conf) {
3303 		drbd_err(connection, "Allocation of new net_conf failed\n");
3304 		goto disconnect;
3305 	}
3306 
3307 	mutex_lock(&connection->data.mutex);
3308 	mutex_lock(&connection->resource->conf_update);
3309 	old_net_conf = connection->net_conf;
3310 	*new_net_conf = *old_net_conf;
3311 
3312 	new_net_conf->wire_protocol = p_proto;
3313 	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3314 	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3315 	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3316 	new_net_conf->two_primaries = p_two_primaries;
3317 
3318 	rcu_assign_pointer(connection->net_conf, new_net_conf);
3319 	mutex_unlock(&connection->resource->conf_update);
3320 	mutex_unlock(&connection->data.mutex);
3321 
3322 	crypto_free_hash(connection->peer_integrity_tfm);
3323 	kfree(connection->int_dig_in);
3324 	kfree(connection->int_dig_vv);
3325 	connection->peer_integrity_tfm = peer_integrity_tfm;
3326 	connection->int_dig_in = int_dig_in;
3327 	connection->int_dig_vv = int_dig_vv;
3328 
3329 	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3330 		drbd_info(connection, "peer data-integrity-alg: %s\n",
3331 			  integrity_alg[0] ? integrity_alg : "(none)");
3332 
3333 	synchronize_rcu();
3334 	kfree(old_net_conf);
3335 	return 0;
3336 
3337 disconnect_rcu_unlock:
3338 	rcu_read_unlock();
3339 disconnect:
3340 	crypto_free_hash(peer_integrity_tfm);
3341 	kfree(int_dig_in);
3342 	kfree(int_dig_vv);
3343 	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3344 	return -EIO;
3345 }
3346 
3347 /* helper function
3348  * input: alg name, feature name
 * return: NULL if the alg name was ""
 *         ERR_PTR(error) if something went wrong
 *         or the crypto hash ptr if it worked out ok. */
3352 static
3353 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3354 		const char *alg, const char *name)
3355 {
3356 	struct crypto_hash *tfm;
3357 
3358 	if (!alg[0])
3359 		return NULL;
3360 
3361 	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3362 	if (IS_ERR(tfm)) {
3363 		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3364 			alg, name, PTR_ERR(tfm));
3365 		return tfm;
3366 	}
3367 	return tfm;
3368 }
3369 
3370 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3371 {
3372 	void *buffer = connection->data.rbuf;
3373 	int size = pi->size;
3374 
3375 	while (size) {
3376 		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3377 		s = drbd_recv(connection, buffer, s);
3378 		if (s <= 0) {
3379 			if (s < 0)
3380 				return s;
3381 			break;
3382 		}
3383 		size -= s;
3384 	}
3385 	if (size)
3386 		return -EIO;
3387 	return 0;
3388 }
3389 
3390 /*
3391  * config_unknown_volume  -  device configuration command for unknown volume
3392  *
3393  * When a device is added to an existing connection, the node on which the
3394  * device is added first will send configuration commands to its peer but the
3395  * peer will not know about the device yet.  It will warn and ignore these
3396  * commands.  Once the device is added on the second node, the second node will
3397  * send the same device configuration commands, but in the other direction.
3398  *
3399  * (We can also end up here if drbd is misconfigured.)
3400  */
3401 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3402 {
3403 	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3404 		  cmdname(pi->cmd), pi->vnr);
3405 	return ignore_remaining_packet(connection, pi);
3406 }
3407 
3408 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3409 {
3410 	struct drbd_peer_device *peer_device;
3411 	struct drbd_device *device;
3412 	struct p_rs_param_95 *p;
3413 	unsigned int header_size, data_size, exp_max_sz;
3414 	struct crypto_hash *verify_tfm = NULL;
3415 	struct crypto_hash *csums_tfm = NULL;
3416 	struct net_conf *old_net_conf, *new_net_conf = NULL;
3417 	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3418 	const int apv = connection->agreed_pro_version;
3419 	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3420 	int fifo_size = 0;
3421 	int err;
3422 
3423 	peer_device = conn_peer_device(connection, pi->vnr);
3424 	if (!peer_device)
3425 		return config_unknown_volume(connection, pi);
3426 	device = peer_device->device;
3427 
3428 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3429 		    : apv == 88 ? sizeof(struct p_rs_param)
3430 					+ SHARED_SECRET_MAX
3431 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
3432 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3433 
3434 	if (pi->size > exp_max_sz) {
3435 		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3436 		    pi->size, exp_max_sz);
3437 		return -EIO;
3438 	}
3439 
3440 	if (apv <= 88) {
3441 		header_size = sizeof(struct p_rs_param);
3442 		data_size = pi->size - header_size;
3443 	} else if (apv <= 94) {
3444 		header_size = sizeof(struct p_rs_param_89);
3445 		data_size = pi->size - header_size;
3446 		D_ASSERT(device, data_size == 0);
3447 	} else {
3448 		header_size = sizeof(struct p_rs_param_95);
3449 		data_size = pi->size - header_size;
3450 		D_ASSERT(device, data_size == 0);
3451 	}
3452 
3453 	/* initialize verify_alg and csums_alg */
3454 	p = pi->data;
3455 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3456 
3457 	err = drbd_recv_all(peer_device->connection, p, header_size);
3458 	if (err)
3459 		return err;
3460 
3461 	mutex_lock(&connection->resource->conf_update);
3462 	old_net_conf = peer_device->connection->net_conf;
3463 	if (get_ldev(device)) {
3464 		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3465 		if (!new_disk_conf) {
3466 			put_ldev(device);
3467 			mutex_unlock(&connection->resource->conf_update);
3468 			drbd_err(device, "Allocation of new disk_conf failed\n");
3469 			return -ENOMEM;
3470 		}
3471 
3472 		old_disk_conf = device->ldev->disk_conf;
3473 		*new_disk_conf = *old_disk_conf;
3474 
3475 		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3476 	}
3477 
3478 	if (apv >= 88) {
3479 		if (apv == 88) {
3480 			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3481 				drbd_err(device, "verify-alg of wrong size, "
3482 					"peer wants %u, accepting only up to %u byte\n",
3483 					data_size, SHARED_SECRET_MAX);
3484 				err = -EIO;
3485 				goto reconnect;
3486 			}
3487 
3488 			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3489 			if (err)
3490 				goto reconnect;
3491 			/* we expect NUL terminated string */
3492 			/* but just in case someone tries to be evil */
3493 			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3494 			p->verify_alg[data_size-1] = 0;
3495 
3496 		} else /* apv >= 89 */ {
3497 			/* we still expect NUL terminated strings */
3498 			/* but just in case someone tries to be evil */
3499 			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3500 			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3501 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3502 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3503 		}
3504 
3505 		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3506 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3507 				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3508 				    old_net_conf->verify_alg, p->verify_alg);
3509 				goto disconnect;
3510 			}
3511 			verify_tfm = drbd_crypto_alloc_digest_safe(device,
3512 					p->verify_alg, "verify-alg");
3513 			if (IS_ERR(verify_tfm)) {
3514 				verify_tfm = NULL;
3515 				goto disconnect;
3516 			}
3517 		}
3518 
3519 		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3520 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3521 				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3522 				    old_net_conf->csums_alg, p->csums_alg);
3523 				goto disconnect;
3524 			}
3525 			csums_tfm = drbd_crypto_alloc_digest_safe(device,
3526 					p->csums_alg, "csums-alg");
3527 			if (IS_ERR(csums_tfm)) {
3528 				csums_tfm = NULL;
3529 				goto disconnect;
3530 			}
3531 		}
3532 
3533 		if (apv > 94 && new_disk_conf) {
3534 			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3535 			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3536 			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3537 			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3538 
3539 			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3540 			if (fifo_size != device->rs_plan_s->size) {
3541 				new_plan = fifo_alloc(fifo_size);
3542 				if (!new_plan) {
3543 					drbd_err(device, "kmalloc of fifo_buffer failed");
3544 					put_ldev(device);
3545 					goto disconnect;
3546 				}
3547 			}
3548 		}
3549 
3550 		if (verify_tfm || csums_tfm) {
3551 			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3552 			if (!new_net_conf) {
3553 				drbd_err(device, "Allocation of new net_conf failed\n");
3554 				goto disconnect;
3555 			}
3556 
3557 			*new_net_conf = *old_net_conf;
3558 
3559 			if (verify_tfm) {
3560 				strcpy(new_net_conf->verify_alg, p->verify_alg);
3561 				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3562 				crypto_free_hash(peer_device->connection->verify_tfm);
3563 				peer_device->connection->verify_tfm = verify_tfm;
3564 				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3565 			}
3566 			if (csums_tfm) {
3567 				strcpy(new_net_conf->csums_alg, p->csums_alg);
3568 				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3569 				crypto_free_hash(peer_device->connection->csums_tfm);
3570 				peer_device->connection->csums_tfm = csums_tfm;
3571 				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3572 			}
3573 			rcu_assign_pointer(connection->net_conf, new_net_conf);
3574 		}
3575 	}
3576 
3577 	if (new_disk_conf) {
3578 		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3579 		put_ldev(device);
3580 	}
3581 
3582 	if (new_plan) {
3583 		old_plan = device->rs_plan_s;
3584 		rcu_assign_pointer(device->rs_plan_s, new_plan);
3585 	}
3586 
3587 	mutex_unlock(&connection->resource->conf_update);
3588 	synchronize_rcu();
3589 	if (new_net_conf)
3590 		kfree(old_net_conf);
3591 	kfree(old_disk_conf);
3592 	kfree(old_plan);
3593 
3594 	return 0;
3595 
3596 reconnect:
3597 	if (new_disk_conf) {
3598 		put_ldev(device);
3599 		kfree(new_disk_conf);
3600 	}
3601 	mutex_unlock(&connection->resource->conf_update);
3602 	return -EIO;
3603 
3604 disconnect:
3605 	kfree(new_plan);
3606 	if (new_disk_conf) {
3607 		put_ldev(device);
3608 		kfree(new_disk_conf);
3609 	}
3610 	mutex_unlock(&connection->resource->conf_update);
3611 	/* just for completeness: actually not needed,
3612 	 * as this is not reached if csums_tfm was ok. */
3613 	crypto_free_hash(csums_tfm);
3614 	/* but free the verify_tfm again, if csums_tfm did not work out */
3615 	crypto_free_hash(verify_tfm);
3616 	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3617 	return -EIO;
3618 }
3619 
3620 /* warn if the arguments differ by more than 12.5% */
3621 static void warn_if_differ_considerably(struct drbd_device *device,
3622 	const char *s, sector_t a, sector_t b)
3623 {
3624 	sector_t d;
3625 	if (a == 0 || b == 0)
3626 		return;
3627 	d = (a > b) ? (a - b) : (b - a);
3628 	if (d > (a>>3) || d > (b>>3))
3629 		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3630 		     (unsigned long long)a, (unsigned long long)b);
3631 }
3632 
3633 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3634 {
3635 	struct drbd_peer_device *peer_device;
3636 	struct drbd_device *device;
3637 	struct p_sizes *p = pi->data;
3638 	enum determine_dev_size dd = DS_UNCHANGED;
3639 	sector_t p_size, p_usize, my_usize;
3640 	int ldsc = 0; /* local disk size changed */
3641 	enum dds_flags ddsf;
3642 
3643 	peer_device = conn_peer_device(connection, pi->vnr);
3644 	if (!peer_device)
3645 		return config_unknown_volume(connection, pi);
3646 	device = peer_device->device;
3647 
3648 	p_size = be64_to_cpu(p->d_size);
3649 	p_usize = be64_to_cpu(p->u_size);
3650 
3651 	/* just store the peer's disk size for now.
3652 	 * we still need to figure out whether we accept that. */
3653 	device->p_size = p_size;
3654 
3655 	if (get_ldev(device)) {
3656 		rcu_read_lock();
3657 		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3658 		rcu_read_unlock();
3659 
3660 		warn_if_differ_considerably(device, "lower level device sizes",
3661 			   p_size, drbd_get_max_capacity(device->ldev));
3662 		warn_if_differ_considerably(device, "user requested size",
3663 					    p_usize, my_usize);
3664 
3665 		/* if this is the first connect, or an otherwise expected
3666 		 * param exchange, choose the minimum */
3667 		if (device->state.conn == C_WF_REPORT_PARAMS)
3668 			p_usize = min_not_zero(my_usize, p_usize);
3669 
3670 		/* Never shrink a device with usable data during connect.
3671 		   But allow online shrinking if we are connected. */
3672 		if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3673 		    drbd_get_capacity(device->this_bdev) &&
3674 		    device->state.disk >= D_OUTDATED &&
3675 		    device->state.conn < C_CONNECTED) {
3676 			drbd_err(device, "The peer's disk size is too small!\n");
3677 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3678 			put_ldev(device);
3679 			return -EIO;
3680 		}
3681 
3682 		if (my_usize != p_usize) {
3683 			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3684 
3685 			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3686 			if (!new_disk_conf) {
3687 				drbd_err(device, "Allocation of new disk_conf failed\n");
3688 				put_ldev(device);
3689 				return -ENOMEM;
3690 			}
3691 
3692 			mutex_lock(&connection->resource->conf_update);
3693 			old_disk_conf = device->ldev->disk_conf;
3694 			*new_disk_conf = *old_disk_conf;
3695 			new_disk_conf->disk_size = p_usize;
3696 
3697 			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3698 			mutex_unlock(&connection->resource->conf_update);
3699 			synchronize_rcu();
3700 			kfree(old_disk_conf);
3701 
3702 			drbd_info(device, "Peer sets u_size to %lu sectors\n",
3703 				 (unsigned long)my_usize);
3704 		}
3705 
3706 		put_ldev(device);
3707 	}
3708 
3709 	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3710 	drbd_reconsider_max_bio_size(device);
3711 	/* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
3712 	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3713 	   drbd_reconsider_max_bio_size(), we can be sure that after
3714 	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3715 
3716 	ddsf = be16_to_cpu(p->dds_flags);
3717 	if (get_ldev(device)) {
3718 		dd = drbd_determine_dev_size(device, ddsf, NULL);
3719 		put_ldev(device);
3720 		if (dd == DS_ERROR)
3721 			return -EIO;
3722 		drbd_md_sync(device);
3723 	} else {
3724 		/* I am diskless, need to accept the peer's size. */
3725 		drbd_set_my_capacity(device, p_size);
3726 	}
3727 
3728 	if (get_ldev(device)) {
3729 		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3730 			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3731 			ldsc = 1;
3732 		}
3733 
3734 		put_ldev(device);
3735 	}
3736 
3737 	if (device->state.conn > C_WF_REPORT_PARAMS) {
3738 		if (be64_to_cpu(p->c_size) !=
3739 		    drbd_get_capacity(device->this_bdev) || ldsc) {
3740 			/* we have different sizes, probably peer
3741 			 * needs to know my new size... */
3742 			drbd_send_sizes(peer_device, 0, ddsf);
3743 		}
3744 		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3745 		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3746 			if (device->state.pdsk >= D_INCONSISTENT &&
3747 			    device->state.disk >= D_INCONSISTENT) {
3748 				if (ddsf & DDSF_NO_RESYNC)
3749 					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3750 				else
3751 					resync_after_online_grow(device);
3752 			} else
3753 				set_bit(RESYNC_AFTER_NEG, &device->flags);
3754 		}
3755 	}
3756 
3757 	return 0;
3758 }
3759 
3760 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3761 {
3762 	struct drbd_peer_device *peer_device;
3763 	struct drbd_device *device;
3764 	struct p_uuids *p = pi->data;
3765 	u64 *p_uuid;
3766 	int i, updated_uuids = 0;
3767 
3768 	peer_device = conn_peer_device(connection, pi->vnr);
3769 	if (!peer_device)
3770 		return config_unknown_volume(connection, pi);
3771 	device = peer_device->device;
3772 
3773 	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3774 	if (!p_uuid) {
3775 		drbd_err(device, "kmalloc of p_uuid failed\n");
		return -ENOMEM;
3777 	}
3778 
3779 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3780 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
3781 
3782 	kfree(device->p_uuid);
3783 	device->p_uuid = p_uuid;
3784 
3785 	if (device->state.conn < C_CONNECTED &&
3786 	    device->state.disk < D_INCONSISTENT &&
3787 	    device->state.role == R_PRIMARY &&
3788 	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3789 		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3790 		    (unsigned long long)device->ed_uuid);
3791 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3792 		return -EIO;
3793 	}
3794 
3795 	if (get_ldev(device)) {
3796 		int skip_initial_sync =
3797 			device->state.conn == C_CONNECTED &&
3798 			peer_device->connection->agreed_pro_version >= 90 &&
3799 			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3800 			(p_uuid[UI_FLAGS] & 8);
3801 		if (skip_initial_sync) {
3802 			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3803 			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3804 					"clear_n_write from receive_uuids",
3805 					BM_LOCKED_TEST_ALLOWED);
3806 			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3807 			_drbd_uuid_set(device, UI_BITMAP, 0);
3808 			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3809 					CS_VERBOSE, NULL);
3810 			drbd_md_sync(device);
3811 			updated_uuids = 1;
3812 		}
3813 		put_ldev(device);
3814 	} else if (device->state.disk < D_INCONSISTENT &&
3815 		   device->state.role == R_PRIMARY) {
3816 		/* I am a diskless primary, the peer just created a new current UUID
3817 		   for me. */
3818 		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3819 	}
3820 
	/* Before we test the disk state, we should wait until any ongoing
	   cluster-wide state change has finished. That is important if we are
	   primary and are detaching from our disk: we need to see the new
	   disk state... */
3825 	mutex_lock(device->state_mutex);
3826 	mutex_unlock(device->state_mutex);
3827 	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3828 		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3829 
3830 	if (updated_uuids)
3831 		drbd_print_uuids(device, "receiver updated UUIDs to");
3832 
3833 	return 0;
3834 }
3835 
3836 /**
3837  * convert_state() - Converts the peer's view of the cluster state to our point of view
3838  * @ps:		The state as seen by the peer.
3839  */
3840 static union drbd_state convert_state(union drbd_state ps)
3841 {
3842 	union drbd_state ms;
3843 
3844 	static enum drbd_conns c_tab[] = {
3845 		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3846 		[C_CONNECTED] = C_CONNECTED,
3847 
3848 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3849 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3850 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3851 		[C_VERIFY_S]       = C_VERIFY_T,
3852 		[C_MASK]   = C_MASK,
3853 	};
3854 
3855 	ms.i = ps.i;
3856 
3857 	ms.conn = c_tab[ps.conn];
3858 	ms.peer = ps.role;
3859 	ms.role = ps.peer;
3860 	ms.pdsk = ps.disk;
3861 	ms.disk = ps.pdsk;
3862 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3863 
3864 	return ms;
3865 }
3866 
3867 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
3868 {
3869 	struct drbd_peer_device *peer_device;
3870 	struct drbd_device *device;
3871 	struct p_req_state *p = pi->data;
3872 	union drbd_state mask, val;
3873 	enum drbd_state_rv rv;
3874 
3875 	peer_device = conn_peer_device(connection, pi->vnr);
3876 	if (!peer_device)
3877 		return -EIO;
3878 	device = peer_device->device;
3879 
3880 	mask.i = be32_to_cpu(p->mask);
3881 	val.i = be32_to_cpu(p->val);
3882 
3883 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
3884 	    mutex_is_locked(device->state_mutex)) {
3885 		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
3886 		return 0;
3887 	}
3888 
3889 	mask = convert_state(mask);
3890 	val = convert_state(val);
3891 
3892 	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
3893 	drbd_send_sr_reply(peer_device, rv);
3894 
3895 	drbd_md_sync(device);
3896 
3897 	return 0;
3898 }
3899 
3900 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
3901 {
3902 	struct p_req_state *p = pi->data;
3903 	union drbd_state mask, val;
3904 	enum drbd_state_rv rv;
3905 
3906 	mask.i = be32_to_cpu(p->mask);
3907 	val.i = be32_to_cpu(p->val);
3908 
3909 	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
3910 	    mutex_is_locked(&connection->cstate_mutex)) {
3911 		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
3912 		return 0;
3913 	}
3914 
3915 	mask = convert_state(mask);
3916 	val = convert_state(val);
3917 
3918 	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3919 	conn_send_sr_reply(connection, rv);
3920 
3921 	return 0;
3922 }
3923 
3924 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
3925 {
3926 	struct drbd_peer_device *peer_device;
3927 	struct drbd_device *device;
3928 	struct p_state *p = pi->data;
3929 	union drbd_state os, ns, peer_state;
3930 	enum drbd_disk_state real_peer_disk;
3931 	enum chg_state_flags cs_flags;
3932 	int rv;
3933 
3934 	peer_device = conn_peer_device(connection, pi->vnr);
3935 	if (!peer_device)
3936 		return config_unknown_volume(connection, pi);
3937 	device = peer_device->device;
3938 
3939 	peer_state.i = be32_to_cpu(p->state);
3940 
3941 	real_peer_disk = peer_state.disk;
3942 	if (peer_state.disk == D_NEGOTIATING) {
3943 		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3944 		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3945 	}
3946 
3947 	spin_lock_irq(&device->resource->req_lock);
3948  retry:
3949 	os = ns = drbd_read_state(device);
3950 	spin_unlock_irq(&device->resource->req_lock);
3951 
3952 	/* If some other part of the code (asender thread, timeout)
3953 	 * already decided to close the connection again,
3954 	 * we must not "re-establish" it here. */
3955 	if (os.conn <= C_TEAR_DOWN)
3956 		return -ECONNRESET;
3957 
3958 	/* If this is the "end of sync" confirmation, usually the peer disk
3959 	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3960 	 * set) resync started in PausedSyncT, or if the timing of pause-/
3961 	 * unpause-sync events has been "just right", the peer disk may
3962 	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3963 	 */
3964 	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3965 	    real_peer_disk == D_UP_TO_DATE &&
3966 	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3967 		/* If we are (becoming) SyncSource, but peer is still in sync
3968 		 * preparation, ignore its uptodate-ness to avoid flapping, it
3969 		 * will change to inconsistent once the peer reaches active
3970 		 * syncing states.
3971 		 * It may have changed syncer-paused flags, however, so we
3972 		 * cannot ignore this completely. */
3973 		if (peer_state.conn > C_CONNECTED &&
3974 		    peer_state.conn < C_SYNC_SOURCE)
3975 			real_peer_disk = D_INCONSISTENT;
3976 
3977 		/* if peer_state changes to connected at the same time,
3978 		 * it explicitly notifies us that it finished resync.
3979 		 * Maybe we should finish it up, too? */
3980 		else if (os.conn >= C_SYNC_SOURCE &&
3981 			 peer_state.conn == C_CONNECTED) {
3982 			if (drbd_bm_total_weight(device) <= device->rs_failed)
3983 				drbd_resync_finished(device);
3984 			return 0;
3985 		}
3986 	}
3987 
3988 	/* explicit verify finished notification, stop sector reached. */
3989 	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3990 	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3991 		ov_out_of_sync_print(device);
3992 		drbd_resync_finished(device);
3993 		return 0;
3994 	}
3995 
3996 	/* peer says his disk is inconsistent, while we think it is uptodate,
3997 	 * and this happens while the peer still thinks we have a sync going on,
3998 	 * but we think we are already done with the sync.
3999 	 * We ignore this to avoid flapping pdsk.
4000 	 * This should not happen, if the peer is a recent version of drbd. */
4001 	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4002 	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4003 		real_peer_disk = D_UP_TO_DATE;
4004 
4005 	if (ns.conn == C_WF_REPORT_PARAMS)
4006 		ns.conn = C_CONNECTED;
4007 
4008 	if (peer_state.conn == C_AHEAD)
4009 		ns.conn = C_BEHIND;
4010 
4011 	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4012 	    get_ldev_if_state(device, D_NEGOTIATING)) {
4013 		int cr; /* consider resync */
4014 
4015 		/* if we established a new connection */
4016 		cr  = (os.conn < C_CONNECTED);
4017 		/* if we had an established connection
4018 		 * and one of the nodes newly attaches a disk */
4019 		cr |= (os.conn == C_CONNECTED &&
4020 		       (peer_state.disk == D_NEGOTIATING ||
4021 			os.disk == D_NEGOTIATING));
4022 		/* if we have both been inconsistent, and the peer has been
4023 		 * forced to be UpToDate with --overwrite-data */
4024 		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4025 		/* if we had been plain connected, and the admin requested to
4026 		 * start a sync by "invalidate" or "invalidate-remote" */
4027 		cr |= (os.conn == C_CONNECTED &&
4028 				(peer_state.conn >= C_STARTING_SYNC_S &&
4029 				 peer_state.conn <= C_WF_BITMAP_T));
4030 
4031 		if (cr)
4032 			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4033 
4034 		put_ldev(device);
4035 		if (ns.conn == C_MASK) {
4036 			ns.conn = C_CONNECTED;
4037 			if (device->state.disk == D_NEGOTIATING) {
4038 				drbd_force_state(device, NS(disk, D_FAILED));
4039 			} else if (peer_state.disk == D_NEGOTIATING) {
4040 				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4041 				peer_state.disk = D_DISKLESS;
4042 				real_peer_disk = D_DISKLESS;
4043 			} else {
4044 				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4045 					return -EIO;
4046 				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4047 				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4048 				return -EIO;
4049 			}
4050 		}
4051 	}
4052 
4053 	spin_lock_irq(&device->resource->req_lock);
4054 	if (os.i != drbd_read_state(device).i)
4055 		goto retry;
4056 	clear_bit(CONSIDER_RESYNC, &device->flags);
4057 	ns.peer = peer_state.role;
4058 	ns.pdsk = real_peer_disk;
4059 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4060 	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4061 		ns.disk = device->new_state_tmp.disk;
4062 	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4063 	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4064 	    test_bit(NEW_CUR_UUID, &device->flags)) {
		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
		   for temporary network outages! */
4067 		spin_unlock_irq(&device->resource->req_lock);
4068 		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4069 		tl_clear(peer_device->connection);
4070 		drbd_uuid_new_current(device);
4071 		clear_bit(NEW_CUR_UUID, &device->flags);
4072 		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4073 		return -EIO;
4074 	}
4075 	rv = _drbd_set_state(device, ns, cs_flags, NULL);
4076 	ns = drbd_read_state(device);
4077 	spin_unlock_irq(&device->resource->req_lock);
4078 
4079 	if (rv < SS_SUCCESS) {
4080 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4081 		return -EIO;
4082 	}
4083 
4084 	if (os.conn > C_WF_REPORT_PARAMS) {
4085 		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
		    peer_state.disk != D_NEGOTIATING) {
4087 			/* we want resync, peer has not yet decided to sync... */
4088 			/* Nowadays only used when forcing a node into primary role and
4089 			   setting its disk to UpToDate with that */
4090 			drbd_send_uuids(peer_device);
4091 			drbd_send_current_state(peer_device);
4092 		}
4093 	}
4094 
4095 	clear_bit(DISCARD_MY_DATA, &device->flags);
4096 
4097 	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4098 
4099 	return 0;
4100 }
4101 
4102 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4103 {
4104 	struct drbd_peer_device *peer_device;
4105 	struct drbd_device *device;
4106 	struct p_rs_uuid *p = pi->data;
4107 
4108 	peer_device = conn_peer_device(connection, pi->vnr);
4109 	if (!peer_device)
4110 		return -EIO;
4111 	device = peer_device->device;
4112 
4113 	wait_event(device->misc_wait,
4114 		   device->state.conn == C_WF_SYNC_UUID ||
4115 		   device->state.conn == C_BEHIND ||
4116 		   device->state.conn < C_CONNECTED ||
4117 		   device->state.disk < D_NEGOTIATING);
4118 
4119 	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4120 
4121 	/* Here the _drbd_uuid_ functions are right, current should
4122 	   _not_ be rotated into the history */
4123 	if (get_ldev_if_state(device, D_NEGOTIATING)) {
4124 		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4125 		_drbd_uuid_set(device, UI_BITMAP, 0UL);
4126 
4127 		drbd_print_uuids(device, "updated sync uuid");
4128 		drbd_start_resync(device, C_SYNC_TARGET);
4129 
4130 		put_ldev(device);
4131 	} else
4132 		drbd_err(device, "Ignoring SyncUUID packet!\n");
4133 
4134 	return 0;
4135 }
4136 
4137 /**
4138  * receive_bitmap_plain
4139  *
4140  * Return 0 when done, 1 when another iteration is needed, and a negative error
4141  * code upon failure.
4142  */
4143 static int
4144 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4145 		     unsigned long *p, struct bm_xfer_ctx *c)
4146 {
4147 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4148 				 drbd_header_size(peer_device->connection);
4149 	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4150 				       c->bm_words - c->word_offset);
4151 	unsigned int want = num_words * sizeof(*p);
4152 	int err;
4153 
4154 	if (want != size) {
4155 		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4156 		return -EIO;
4157 	}
4158 	if (want == 0)
4159 		return 0;
4160 	err = drbd_recv_all(peer_device->connection, p, want);
4161 	if (err)
4162 		return err;
4163 
4164 	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4165 
4166 	c->word_offset += num_words;
4167 	c->bit_offset = c->word_offset * BITS_PER_LONG;
4168 	if (c->bit_offset > c->bm_bits)
4169 		c->bit_offset = c->bm_bits;
4170 
4171 	return 1;
4172 }
4173 
4174 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4175 {
4176 	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4177 }
4178 
4179 static int dcbp_get_start(struct p_compressed_bm *p)
4180 {
4181 	return (p->encoding & 0x80) != 0;
4182 }
4183 
4184 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4185 {
4186 	return (p->encoding >> 4) & 0x7;
4187 }
4188 
4189 /**
4190  * recv_bm_rle_bits
4191  *
4192  * Return 0 when done, 1 when another iteration is needed, and a negative error
4193  * code upon failure.
4194  */
4195 static int
4196 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4197 		struct p_compressed_bm *p,
4198 		 struct bm_xfer_ctx *c,
4199 		 unsigned int len)
4200 {
4201 	struct bitstream bs;
4202 	u64 look_ahead;
4203 	u64 rl;
4204 	u64 tmp;
4205 	unsigned long s = c->bit_offset;
4206 	unsigned long e;
4207 	int toggle = dcbp_get_start(p);
4208 	int have;
4209 	int bits;
4210 
4211 	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4212 
4213 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
4214 	if (bits < 0)
4215 		return -EIO;
4216 
4217 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
4218 		bits = vli_decode_bits(&rl, look_ahead);
4219 		if (bits <= 0)
4220 			return -EIO;
4221 
4222 		if (toggle) {
4223 			e = s + rl -1;
4224 			if (e >= c->bm_bits) {
4225 				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4226 				return -EIO;
4227 			}
4228 			_drbd_bm_set_bits(peer_device->device, s, e);
4229 		}
4230 
4231 		if (have < bits) {
4232 			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4233 				have, bits, look_ahead,
4234 				(unsigned int)(bs.cur.b - p->code),
4235 				(unsigned int)bs.buf_len);
4236 			return -EIO;
4237 		}
4238 		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4239 		if (likely(bits < 64))
4240 			look_ahead >>= bits;
4241 		else
4242 			look_ahead = 0;
4243 		have -= bits;
4244 
4245 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4246 		if (bits < 0)
4247 			return -EIO;
4248 		look_ahead |= tmp << have;
4249 		have += bits;
4250 	}
4251 
4252 	c->bit_offset = s;
4253 	bm_xfer_ctx_bit_to_word_offset(c);
4254 
4255 	return (s != c->bm_bits);
4256 }
4257 
4258 /**
4259  * decode_bitmap_c
4260  *
4261  * Return 0 when done, 1 when another iteration is needed, and a negative error
4262  * code upon failure.
4263  */
4264 static int
4265 decode_bitmap_c(struct drbd_peer_device *peer_device,
4266 		struct p_compressed_bm *p,
4267 		struct bm_xfer_ctx *c,
4268 		unsigned int len)
4269 {
4270 	if (dcbp_get_code(p) == RLE_VLI_Bits)
4271 		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4272 
4273 	/* other variants had been implemented for evaluation,
4274 	 * but have been dropped as this one turned out to be "best"
4275 	 * during all our tests. */
4276 
4277 	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4278 	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4279 	return -EIO;
4280 }
4281 
4282 void INFO_bm_xfer_stats(struct drbd_device *device,
4283 		const char *direction, struct bm_xfer_ctx *c)
4284 {
4285 	/* what would it take to transfer it "plaintext" */
4286 	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4287 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4288 	unsigned int plain =
4289 		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4290 		c->bm_words * sizeof(unsigned long);
4291 	unsigned int total = c->bytes[0] + c->bytes[1];
4292 	unsigned int r;
4293 
	/* total cannot be zero, but just in case: */
4295 	if (total == 0)
4296 		return;
4297 
4298 	/* don't report if not compressed */
4299 	if (total >= plain)
4300 		return;
4301 
4302 	/* total < plain. check for overflow, still */
4303 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4304 		                    : (1000 * total / plain);
4305 
4306 	if (r > 1000)
4307 		r = 1000;
4308 
4309 	r = 1000 - r;
4310 	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4311 	     "total %u; compression: %u.%u%%\n",
4312 			direction,
4313 			c->bytes[1], c->packets[1],
4314 			c->bytes[0], c->packets[0],
4315 			total, r/10, r % 10);
4316 }
4317 
/* Since we are processing the bitfield from lower addresses to higher,
   it does not matter whether we process it in 32 bit or 64 bit chunks,
   as long as it is little endian. (Understand it as a byte stream,
   beginning with the lowest byte...) If we used big endian, we would need
   to process it from the highest address to the lowest, in order to be
   agnostic to the 32 vs 64 bit issue.

   Returns 0 on success, or a negative error code on failure. */
4326 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4327 {
4328 	struct drbd_peer_device *peer_device;
4329 	struct drbd_device *device;
4330 	struct bm_xfer_ctx c;
4331 	int err;
4332 
4333 	peer_device = conn_peer_device(connection, pi->vnr);
4334 	if (!peer_device)
4335 		return -EIO;
4336 	device = peer_device->device;
4337 
4338 	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4339 	/* you are supposed to send additional out-of-sync information
4340 	 * if you actually set bits during this phase */
4341 
4342 	c = (struct bm_xfer_ctx) {
4343 		.bm_bits = drbd_bm_bits(device),
4344 		.bm_words = drbd_bm_words(device),
4345 	};
4346 
4347 	for(;;) {
4348 		if (pi->cmd == P_BITMAP)
4349 			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4350 		else if (pi->cmd == P_COMPRESSED_BITMAP) {
4351 			/* MAYBE: sanity check that we speak proto >= 90,
4352 			 * and the feature is enabled! */
4353 			struct p_compressed_bm *p = pi->data;
4354 
4355 			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4356 				drbd_err(device, "ReportCBitmap packet too large\n");
4357 				err = -EIO;
4358 				goto out;
4359 			}
4360 			if (pi->size <= sizeof(*p)) {
4361 				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4362 				err = -EIO;
4363 				goto out;
4364 			}
4365 			err = drbd_recv_all(peer_device->connection, p, pi->size);
4366 			if (err)
				goto out;
4368 			err = decode_bitmap_c(peer_device, p, &c, pi->size);
4369 		} else {
4370 			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4371 			err = -EIO;
4372 			goto out;
4373 		}
4374 
4375 		c.packets[pi->cmd == P_BITMAP]++;
4376 		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4377 
4378 		if (err <= 0) {
4379 			if (err < 0)
4380 				goto out;
4381 			break;
4382 		}
4383 		err = drbd_recv_header(peer_device->connection, pi);
4384 		if (err)
4385 			goto out;
4386 	}
4387 
4388 	INFO_bm_xfer_stats(device, "receive", &c);
4389 
4390 	if (device->state.conn == C_WF_BITMAP_T) {
4391 		enum drbd_state_rv rv;
4392 
4393 		err = drbd_send_bitmap(device);
4394 		if (err)
4395 			goto out;
4396 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4397 		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4398 		D_ASSERT(device, rv == SS_SUCCESS);
4399 	} else if (device->state.conn != C_WF_BITMAP_S) {
4400 		/* admin may have requested C_DISCONNECTING,
4401 		 * other threads may have noticed network errors */
4402 		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4403 		    drbd_conn_str(device->state.conn));
4404 	}
4405 	err = 0;
4406 
4407  out:
4408 	drbd_bm_unlock(device);
4409 	if (!err && device->state.conn == C_WF_BITMAP_S)
4410 		drbd_start_resync(device, C_SYNC_SOURCE);
4411 	return err;
4412 }
4413 
4414 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4415 {
4416 	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4417 		 pi->cmd, pi->size);
4418 
4419 	return ignore_remaining_packet(connection, pi);
4420 }
4421 
4422 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4423 {
4424 	/* Make sure we've acked all the TCP data associated
4425 	 * with the data requests being unplugged */
4426 	drbd_tcp_quickack(connection->data.socket);
4427 
4428 	return 0;
4429 }
4430 
4431 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4432 {
4433 	struct drbd_peer_device *peer_device;
4434 	struct drbd_device *device;
4435 	struct p_block_desc *p = pi->data;
4436 
4437 	peer_device = conn_peer_device(connection, pi->vnr);
4438 	if (!peer_device)
4439 		return -EIO;
4440 	device = peer_device->device;
4441 
4442 	switch (device->state.conn) {
4443 	case C_WF_SYNC_UUID:
4444 	case C_WF_BITMAP_T:
4445 	case C_BEHIND:
4446 			break;
4447 	default:
4448 		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4449 				drbd_conn_str(device->state.conn));
4450 	}
4451 
4452 	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4453 
4454 	return 0;
4455 }
4456 
4457 struct data_cmd {
4458 	int expect_payload;
4459 	size_t pkt_size;
4460 	int (*fn)(struct drbd_connection *, struct packet_info *);
4461 };
4462 
4463 static struct data_cmd drbd_cmd_handler[] = {
4464 	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
4465 	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier },
	[P_BITMAP]	    = { 1, 0, receive_bitmap },
	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
4470 	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4471 	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4472 	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4473 	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
4474 	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4475 	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4476 	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
4477 	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
4478 	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
4479 	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4480 	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4481 	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4482 	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4483 	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4484 	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4485 	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4486 	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4487 	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4488 	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
4489 };
4490 
4491 static void drbdd(struct drbd_connection *connection)
4492 {
4493 	struct packet_info pi;
4494 	size_t shs; /* sub header size */
4495 	int err;
4496 
4497 	while (get_t_state(&connection->receiver) == RUNNING) {
4498 		struct data_cmd *cmd;
4499 
4500 		drbd_thread_current_set_cpu(&connection->receiver);
4501 		if (drbd_recv_header(connection, &pi))
4502 			goto err_out;
4503 
4504 		cmd = &drbd_cmd_handler[pi.cmd];
4505 		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4506 			drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4507 				 cmdname(pi.cmd), pi.cmd);
4508 			goto err_out;
4509 		}
4510 
4511 		shs = cmd->pkt_size;
4512 		if (pi.size > shs && !cmd->expect_payload) {
4513 			drbd_err(connection, "No payload expected %s l:%d\n",
4514 				 cmdname(pi.cmd), pi.size);
4515 			goto err_out;
4516 		}
4517 
4518 		if (shs) {
4519 			err = drbd_recv_all_warn(connection, pi.data, shs);
4520 			if (err)
4521 				goto err_out;
4522 			pi.size -= shs;
4523 		}
4524 
4525 		err = cmd->fn(connection, &pi);
4526 		if (err) {
4527 			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4528 				 cmdname(pi.cmd), err, pi.size);
4529 			goto err_out;
4530 		}
4531 	}
4532 	return;
4533 
4534     err_out:
4535 	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4536 }
4537 
4538 static void conn_disconnect(struct drbd_connection *connection)
4539 {
4540 	struct drbd_peer_device *peer_device;
4541 	enum drbd_conns oc;
4542 	int vnr;
4543 
4544 	if (connection->cstate == C_STANDALONE)
4545 		return;
4546 
4547 	/* We are about to start the cleanup after connection loss.
4548 	 * Make sure drbd_make_request knows about that.
4549 	 * Usually we should be in some network failure state already,
4550 	 * but just in case we are not, we fix it up here.
4551 	 */
4552 	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4553 
	/* The asender does not clean up anything. It must not interfere, either. */
4555 	drbd_thread_stop(&connection->asender);
4556 	drbd_free_sock(connection);
4557 
4558 	rcu_read_lock();
4559 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4560 		struct drbd_device *device = peer_device->device;
4561 		kref_get(&device->kref);
4562 		rcu_read_unlock();
4563 		drbd_disconnected(peer_device);
4564 		kref_put(&device->kref, drbd_destroy_device);
4565 		rcu_read_lock();
4566 	}
4567 	rcu_read_unlock();
4568 
4569 	if (!list_empty(&connection->current_epoch->list))
4570 		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4571 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4572 	atomic_set(&connection->current_epoch->epoch_size, 0);
4573 	connection->send.seen_any_write_yet = false;
4574 
4575 	drbd_info(connection, "Connection closed\n");
4576 
4577 	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4578 		conn_try_outdate_peer_async(connection);
4579 
4580 	spin_lock_irq(&connection->resource->req_lock);
4581 	oc = connection->cstate;
4582 	if (oc >= C_UNCONNECTED)
4583 		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4584 
4585 	spin_unlock_irq(&connection->resource->req_lock);
4586 
4587 	if (oc == C_DISCONNECTING)
4588 		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4589 }
4590 
4591 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4592 {
4593 	struct drbd_device *device = peer_device->device;
4594 	unsigned int i;
4595 
4596 	/* wait for current activity to cease. */
4597 	spin_lock_irq(&device->resource->req_lock);
4598 	_drbd_wait_ee_list_empty(device, &device->active_ee);
4599 	_drbd_wait_ee_list_empty(device, &device->sync_ee);
4600 	_drbd_wait_ee_list_empty(device, &device->read_ee);
4601 	spin_unlock_irq(&device->resource->req_lock);
4602 
4603 	/* We do not have data structures that would allow us to
4604 	 * get the rs_pending_cnt down to 0 again.
4605 	 *  * On C_SYNC_TARGET we do not have any data structures describing
4606 	 *    the pending RSDataRequest's we have sent.
4607 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
4608 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4609 	 *  And no, it is not the sum of the reference counts in the
4610 	 *  resync_LRU. The resync_LRU tracks the whole operation including
4611 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4612 	 *  on the fly. */
4613 	drbd_rs_cancel_all(device);
4614 	device->rs_total = 0;
4615 	device->rs_failed = 0;
4616 	atomic_set(&device->rs_pending_cnt, 0);
4617 	wake_up(&device->misc_wait);
4618 
4619 	del_timer_sync(&device->resync_timer);
4620 	resync_timer_fn((unsigned long)device);
4621 
4622 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4623 	 * w_make_resync_request etc. which may still be on the worker queue
4624 	 * to be "canceled" */
4625 	drbd_flush_workqueue(&peer_device->connection->sender_work);
4626 
4627 	drbd_finish_peer_reqs(device);
4628 
	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
	   might have queued more work. The flush before drbd_finish_peer_reqs() is
	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4632 	drbd_flush_workqueue(&peer_device->connection->sender_work);
4633 
4634 	/* need to do it again, drbd_finish_peer_reqs() may have populated it
4635 	 * again via drbd_try_clear_on_disk_bm(). */
4636 	drbd_rs_cancel_all(device);
4637 
4638 	kfree(device->p_uuid);
4639 	device->p_uuid = NULL;
4640 
4641 	if (!drbd_suspended(device))
4642 		tl_clear(peer_device->connection);
4643 
4644 	drbd_md_sync(device);
4645 
4646 	/* serialize with bitmap writeout triggered by the state change,
4647 	 * if any. */
4648 	wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4649 
4650 	/* tcp_close and release of sendpage pages can be deferred.  I don't
4651 	 * want to use SO_LINGER, because apparently it can be deferred for
4652 	 * more than 20 seconds (longest time I checked).
4653 	 *
4654 	 * Actually we don't care for exactly when the network stack does its
4655 	 * put_page(), but release our reference on these pages right here.
4656 	 */
4657 	i = drbd_free_peer_reqs(device, &device->net_ee);
4658 	if (i)
4659 		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4660 	i = atomic_read(&device->pp_in_use_by_net);
4661 	if (i)
4662 		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4663 	i = atomic_read(&device->pp_in_use);
4664 	if (i)
4665 		drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4666 
4667 	D_ASSERT(device, list_empty(&device->read_ee));
4668 	D_ASSERT(device, list_empty(&device->active_ee));
4669 	D_ASSERT(device, list_empty(&device->sync_ee));
4670 	D_ASSERT(device, list_empty(&device->done_ee));
4671 
4672 	return 0;
4673 }
4674 
4675 /*
4676  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4677  * we can agree on is stored in agreed_pro_version.
4678  *
4679  * feature flags and the reserved array should be enough room for future
4680  * enhancements of the handshake protocol, and possible plugins...
4681  *
4682  * for now, they are expected to be zero, but ignored.
4683  */
4684 static int drbd_send_features(struct drbd_connection *connection)
4685 {
4686 	struct drbd_socket *sock;
4687 	struct p_connection_features *p;
4688 
4689 	sock = &connection->data;
4690 	p = conn_prepare_command(connection, sock);
4691 	if (!p)
4692 		return -EIO;
4693 	memset(p, 0, sizeof(*p));
4694 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4695 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4696 	p->feature_flags = cpu_to_be32(PRO_FEATURES);
4697 	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4698 }
4699 
4700 /*
4701  * return values:
4702  *   1 yes, we have a valid connection
4703  *   0 oops, did not work out, please try again
4704  *  -1 peer talks different language,
4705  *     no point in trying again, please go standalone.
4706  */
4707 static int drbd_do_features(struct drbd_connection *connection)
4708 {
4709 	/* ASSERT current == connection->receiver ... */
4710 	struct p_connection_features *p;
4711 	const int expect = sizeof(struct p_connection_features);
4712 	struct packet_info pi;
4713 	int err;
4714 
4715 	err = drbd_send_features(connection);
4716 	if (err)
4717 		return 0;
4718 
4719 	err = drbd_recv_header(connection, &pi);
4720 	if (err)
4721 		return 0;
4722 
4723 	if (pi.cmd != P_CONNECTION_FEATURES) {
4724 		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4725 			 cmdname(pi.cmd), pi.cmd);
4726 		return -1;
4727 	}
4728 
4729 	if (pi.size != expect) {
4730 		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4731 		     expect, pi.size);
4732 		return -1;
4733 	}
4734 
4735 	p = pi.data;
4736 	err = drbd_recv_all_warn(connection, p, expect);
4737 	if (err)
4738 		return 0;
4739 
4740 	p->protocol_min = be32_to_cpu(p->protocol_min);
4741 	p->protocol_max = be32_to_cpu(p->protocol_max);
4742 	if (p->protocol_max == 0)
4743 		p->protocol_max = p->protocol_min;
4744 
4745 	if (PRO_VERSION_MAX < p->protocol_min ||
4746 	    PRO_VERSION_MIN > p->protocol_max)
4747 		goto incompat;
4748 
4749 	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4750 	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4751 
4752 	drbd_info(connection, "Handshake successful: "
4753 	     "Agreed network protocol version %d\n", connection->agreed_pro_version);
4754 
4755 	drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4756 		  connection->agreed_features & FF_TRIM ? " " : " not ");
4757 
4758 	return 1;
4759 
4760  incompat:
4761 	drbd_err(connection, "incompatible DRBD dialects: "
4762 	    "I support %d-%d, peer supports %d-%d\n",
4763 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
4764 	    p->protocol_min, p->protocol_max);
4765 	return -1;
4766 }
4767 
4768 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4769 static int drbd_do_auth(struct drbd_connection *connection)
4770 {
4771 	drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4772 	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4773 	return -1;
4774 }
4775 #else
4776 #define CHALLENGE_LEN 64
4777 
4778 /* Return value:
4779 	1 - auth succeeded,
4780 	0 - failed, try again (network error),
4781 	-1 - auth failed, don't try again.
4782 */
4783 
4784 static int drbd_do_auth(struct drbd_connection *connection)
4785 {
4786 	struct drbd_socket *sock;
4787 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4788 	struct scatterlist sg;
4789 	char *response = NULL;
4790 	char *right_response = NULL;
4791 	char *peers_ch = NULL;
4792 	unsigned int key_len;
4793 	char secret[SHARED_SECRET_MAX]; /* 64 byte */
4794 	unsigned int resp_size;
4795 	struct hash_desc desc;
4796 	struct packet_info pi;
4797 	struct net_conf *nc;
4798 	int err, rv;
4799 
4800 	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4801 
4802 	rcu_read_lock();
4803 	nc = rcu_dereference(connection->net_conf);
4804 	key_len = strlen(nc->shared_secret);
4805 	memcpy(secret, nc->shared_secret, key_len);
4806 	rcu_read_unlock();
4807 
4808 	desc.tfm = connection->cram_hmac_tfm;
4809 	desc.flags = 0;
4810 
4811 	rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4812 	if (rv) {
4813 		drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
4814 		rv = -1;
4815 		goto fail;
4816 	}
4817 
4818 	get_random_bytes(my_challenge, CHALLENGE_LEN);
4819 
4820 	sock = &connection->data;
4821 	if (!conn_prepare_command(connection, sock)) {
4822 		rv = 0;
4823 		goto fail;
4824 	}
4825 	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4826 				my_challenge, CHALLENGE_LEN);
4827 	if (!rv)
4828 		goto fail;
4829 
4830 	err = drbd_recv_header(connection, &pi);
4831 	if (err) {
4832 		rv = 0;
4833 		goto fail;
4834 	}
4835 
4836 	if (pi.cmd != P_AUTH_CHALLENGE) {
4837 		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4838 			 cmdname(pi.cmd), pi.cmd);
4839 		rv = 0;
4840 		goto fail;
4841 	}
4842 
4843 	if (pi.size > CHALLENGE_LEN * 2) {
4844 		drbd_err(connection, "expected AuthChallenge payload too big.\n");
4845 		rv = -1;
4846 		goto fail;
4847 	}
4848 
4849 	if (pi.size < CHALLENGE_LEN) {
4850 		drbd_err(connection, "AuthChallenge payload too small.\n");
4851 		rv = -1;
4852 		goto fail;
4853 	}
4854 
4855 	peers_ch = kmalloc(pi.size, GFP_NOIO);
4856 	if (peers_ch == NULL) {
4857 		drbd_err(connection, "kmalloc of peers_ch failed\n");
4858 		rv = -1;
4859 		goto fail;
4860 	}
4861 
4862 	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
4863 	if (err) {
4864 		rv = 0;
4865 		goto fail;
4866 	}
4867 
4868 	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
4869 		drbd_err(connection, "Peer presented the same challenge!\n");
4870 		rv = -1;
4871 		goto fail;
4872 	}
4873 
4874 	resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
4875 	response = kmalloc(resp_size, GFP_NOIO);
4876 	if (response == NULL) {
4877 		drbd_err(connection, "kmalloc of response failed\n");
4878 		rv = -1;
4879 		goto fail;
4880 	}
4881 
4882 	sg_init_table(&sg, 1);
4883 	sg_set_buf(&sg, peers_ch, pi.size);
4884 
4885 	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4886 	if (rv) {
4887 		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4888 		rv = -1;
4889 		goto fail;
4890 	}
4891 
4892 	if (!conn_prepare_command(connection, sock)) {
4893 		rv = 0;
4894 		goto fail;
4895 	}
4896 	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
4897 				response, resp_size);
4898 	if (!rv)
4899 		goto fail;
4900 
4901 	err = drbd_recv_header(connection, &pi);
4902 	if (err) {
4903 		rv = 0;
4904 		goto fail;
4905 	}
4906 
4907 	if (pi.cmd != P_AUTH_RESPONSE) {
4908 		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
4909 			 cmdname(pi.cmd), pi.cmd);
4910 		rv = 0;
4911 		goto fail;
4912 	}
4913 
4914 	if (pi.size != resp_size) {
4915 		drbd_err(connection, "expected AuthResponse payload of wrong size\n");
4916 		rv = 0;
4917 		goto fail;
4918 	}
4919 
	err = drbd_recv_all_warn(connection, response, resp_size);
4921 	if (err) {
4922 		rv = 0;
4923 		goto fail;
4924 	}
4925 
4926 	right_response = kmalloc(resp_size, GFP_NOIO);
4927 	if (right_response == NULL) {
4928 		drbd_err(connection, "kmalloc of right_response failed\n");
4929 		rv = -1;
4930 		goto fail;
4931 	}
4932 
4933 	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4934 
4935 	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4936 	if (rv) {
4937 		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4938 		rv = -1;
4939 		goto fail;
4940 	}
4941 
4942 	rv = !memcmp(response, right_response, resp_size);
4943 
4944 	if (rv)
4945 		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
4946 		     resp_size);
4947 	else
4948 		rv = -1;
4949 
4950  fail:
4951 	kfree(peers_ch);
4952 	kfree(response);
4953 	kfree(right_response);
4954 
4955 	return rv;
4956 }
4957 #endif
4958 
4959 int drbd_receiver(struct drbd_thread *thi)
4960 {
4961 	struct drbd_connection *connection = thi->connection;
4962 	int h;
4963 
4964 	drbd_info(connection, "receiver (re)started\n");
4965 
4966 	do {
4967 		h = conn_connect(connection);
4968 		if (h == 0) {
4969 			conn_disconnect(connection);
4970 			schedule_timeout_interruptible(HZ);
4971 		}
4972 		if (h == -1) {
4973 			drbd_warn(connection, "Discarding network configuration.\n");
4974 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
4975 		}
4976 	} while (h == 0);
4977 
4978 	if (h > 0)
4979 		drbdd(connection);
4980 
4981 	conn_disconnect(connection);
4982 
4983 	drbd_info(connection, "receiver terminated\n");
4984 	return 0;
4985 }
4986 
4987 /* ********* acknowledge sender ******** */
4988 
4989 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
4990 {
4991 	struct p_req_state_reply *p = pi->data;
4992 	int retcode = be32_to_cpu(p->retcode);
4993 
4994 	if (retcode >= SS_SUCCESS) {
4995 		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
4996 	} else {
4997 		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
4998 		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
4999 			 drbd_set_st_err_str(retcode), retcode);
5000 	}
5001 	wake_up(&connection->ping_wait);
5002 
5003 	return 0;
5004 }
5005 
5006 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5007 {
5008 	struct drbd_peer_device *peer_device;
5009 	struct drbd_device *device;
5010 	struct p_req_state_reply *p = pi->data;
5011 	int retcode = be32_to_cpu(p->retcode);
5012 
5013 	peer_device = conn_peer_device(connection, pi->vnr);
5014 	if (!peer_device)
5015 		return -EIO;
5016 	device = peer_device->device;
5017 
5018 	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5019 		D_ASSERT(device, connection->agreed_pro_version < 100);
5020 		return got_conn_RqSReply(connection, pi);
5021 	}
5022 
5023 	if (retcode >= SS_SUCCESS) {
5024 		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5025 	} else {
5026 		set_bit(CL_ST_CHG_FAIL, &device->flags);
5027 		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5028 			drbd_set_st_err_str(retcode), retcode);
5029 	}
5030 	wake_up(&device->state_wait);
5031 
5032 	return 0;
5033 }
5034 
5035 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5036 {
5037 	return drbd_send_ping_ack(connection);
}
5040 
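/* The peer answered our ping: restore the idle receive timeout and
 * wake up anyone waiting for the round trip to complete. */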
5041 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5042 {
5043 	/* restore idle timeout */
5044 	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5045 	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5046 		wake_up(&connection->ping_wait);
5047 
5048 	return 0;
5049 }
5050 
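/* During a checksum based resync the peer found the block to be
 * identical: mark it in sync locally and account it as a block with
 * equal checksums. */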
5051 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5052 {
5053 	struct drbd_peer_device *peer_device;
5054 	struct drbd_device *device;
5055 	struct p_block_ack *p = pi->data;
5056 	sector_t sector = be64_to_cpu(p->sector);
5057 	int blksize = be32_to_cpu(p->blksize);
5058 
5059 	peer_device = conn_peer_device(connection, pi->vnr);
5060 	if (!peer_device)
5061 		return -EIO;
5062 	device = peer_device->device;
5063 
5064 	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5065 
5066 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5067 
5068 	if (get_ldev(device)) {
5069 		drbd_rs_complete_io(device, sector);
5070 		drbd_set_in_sync(device, sector, blksize);
		/* rs_same_csum is supposed to count in units of BM_BLOCK_SIZE */
5072 		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5073 		put_ldev(device);
5074 	}
5075 	dec_rs_pending(device);
5076 	atomic_add(blksize >> 9, &device->rs_sect_in);
5077 
5078 	return 0;
5079 }
5080 
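/* Look up the request identified by (id, sector) in the given tree
 * and apply the state transition "what" to it; complete the master
 * bio if that transition finishes the request. */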
5081 static int
5082 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5083 			      struct rb_root *root, const char *func,
5084 			      enum drbd_req_event what, bool missing_ok)
5085 {
5086 	struct drbd_request *req;
5087 	struct bio_and_error m;
5088 
5089 	spin_lock_irq(&device->resource->req_lock);
5090 	req = find_request(device, root, id, sector, missing_ok, func);
5091 	if (unlikely(!req)) {
5092 		spin_unlock_irq(&device->resource->req_lock);
5093 		return -EIO;
5094 	}
5095 	__req_mod(req, what, &m);
5096 	spin_unlock_irq(&device->resource->req_lock);
5097 
5098 	if (m.bio)
5099 		complete_master_bio(device, &m);
5100 	return 0;
5101 }
5102 
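/* Positive write acknowledgement.  Resync writes are identified by
 * ID_SYNCER and simply marked in sync; application writes are mapped
 * to the request state transition matching the ack type. */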
5103 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5104 {
5105 	struct drbd_peer_device *peer_device;
5106 	struct drbd_device *device;
5107 	struct p_block_ack *p = pi->data;
5108 	sector_t sector = be64_to_cpu(p->sector);
5109 	int blksize = be32_to_cpu(p->blksize);
5110 	enum drbd_req_event what;
5111 
5112 	peer_device = conn_peer_device(connection, pi->vnr);
5113 	if (!peer_device)
5114 		return -EIO;
5115 	device = peer_device->device;
5116 
5117 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5118 
5119 	if (p->block_id == ID_SYNCER) {
5120 		drbd_set_in_sync(device, sector, blksize);
5121 		dec_rs_pending(device);
5122 		return 0;
5123 	}
5124 	switch (pi->cmd) {
5125 	case P_RS_WRITE_ACK:
5126 		what = WRITE_ACKED_BY_PEER_AND_SIS;
5127 		break;
5128 	case P_WRITE_ACK:
5129 		what = WRITE_ACKED_BY_PEER;
5130 		break;
5131 	case P_RECV_ACK:
5132 		what = RECV_ACKED_BY_PEER;
5133 		break;
5134 	case P_SUPERSEDED:
5135 		what = CONFLICT_RESOLVED;
5136 		break;
5137 	case P_RETRY_WRITE:
5138 		what = POSTPONE_WRITE;
5139 		break;
5140 	default:
5141 		BUG();
5142 	}
5143 
5144 	return validate_req_change_req_state(device, p->block_id, sector,
5145 					     &device->write_requests, __func__,
5146 					     what, false);
5147 }
5148 
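/* Negative acknowledgement: the peer failed to write the block.
 * Mark the affected area out of sync so it gets resynchronized. */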
5149 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5150 {
5151 	struct drbd_peer_device *peer_device;
5152 	struct drbd_device *device;
5153 	struct p_block_ack *p = pi->data;
5154 	sector_t sector = be64_to_cpu(p->sector);
5155 	int size = be32_to_cpu(p->blksize);
5156 	int err;
5157 
5158 	peer_device = conn_peer_device(connection, pi->vnr);
5159 	if (!peer_device)
5160 		return -EIO;
5161 	device = peer_device->device;
5162 
5163 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5164 
5165 	if (p->block_id == ID_SYNCER) {
5166 		dec_rs_pending(device);
5167 		drbd_rs_failed_io(device, sector, size);
5168 		return 0;
5169 	}
5170 
5171 	err = validate_req_change_req_state(device, p->block_id, sector,
5172 					    &device->write_requests, __func__,
5173 					    NEG_ACKED, true);
5174 	if (err) {
5175 		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5176 		   The master bio might already be completed, therefore the
5177 		   request is no longer in the collision hash. */
5178 		/* In Protocol B we might already have got a P_RECV_ACK
5179 		   but then get a P_NEG_ACK afterwards. */
5180 		drbd_set_out_of_sync(device, sector, size);
5181 	}
5182 	return 0;
5183 }
5184 
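/* The peer could not serve one of our read requests. */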
5185 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5186 {
5187 	struct drbd_peer_device *peer_device;
5188 	struct drbd_device *device;
5189 	struct p_block_ack *p = pi->data;
5190 	sector_t sector = be64_to_cpu(p->sector);
5191 
5192 	peer_device = conn_peer_device(connection, pi->vnr);
5193 	if (!peer_device)
5194 		return -EIO;
5195 	device = peer_device->device;
5196 
5197 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5198 
5199 	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5200 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
5201 
5202 	return validate_req_change_req_state(device, p->block_id, sector,
5203 					     &device->read_requests, __func__,
5204 					     NEG_ACKED, false);
5205 }
5206 
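/* The peer could not serve a resync read request (P_NEG_RS_DREPLY)
 * or cancelled it (P_RS_CANCEL). */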
5207 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5208 {
5209 	struct drbd_peer_device *peer_device;
5210 	struct drbd_device *device;
5211 	sector_t sector;
5212 	int size;
5213 	struct p_block_ack *p = pi->data;
5214 
5215 	peer_device = conn_peer_device(connection, pi->vnr);
5216 	if (!peer_device)
5217 		return -EIO;
5218 	device = peer_device->device;
5219 
5220 	sector = be64_to_cpu(p->sector);
5221 	size = be32_to_cpu(p->blksize);
5222 
5223 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5224 
5225 	dec_rs_pending(device);
5226 
5227 	if (get_ldev_if_state(device, D_FAILED)) {
5228 		drbd_rs_complete_io(device, sector);
5229 		switch (pi->cmd) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(device, sector, size);
			/* fall through */
5232 		case P_RS_CANCEL:
5233 			break;
5234 		default:
5235 			BUG();
5236 		}
5237 		put_ldev(device);
5238 	}
5239 
5240 	return 0;
5241 }
5242 
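/* The peer has processed all writes up to the given barrier: release
 * that transfer log epoch.  Devices in Ahead mode with no application
 * I/O in flight get their switch back to SyncSource scheduled. */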
5243 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5244 {
5245 	struct p_barrier_ack *p = pi->data;
5246 	struct drbd_peer_device *peer_device;
5247 	int vnr;
5248 
5249 	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5250 
5251 	rcu_read_lock();
5252 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5253 		struct drbd_device *device = peer_device->device;
5254 
5255 		if (device->state.conn == C_AHEAD &&
5256 		    atomic_read(&device->ap_in_flight) == 0 &&
5257 		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5258 			device->start_resync_timer.expires = jiffies + HZ;
5259 			add_timer(&device->start_resync_timer);
5260 		}
5261 	}
5262 	rcu_read_unlock();
5263 
5264 	return 0;
5265 }
5266 
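/* Result of one online verify request: record an out-of-sync block if
 * the peer reported a mismatch, update the progress marks, and queue
 * the "verify finished" work once the last reply has arrived. */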
5267 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5268 {
5269 	struct drbd_peer_device *peer_device;
5270 	struct drbd_device *device;
5271 	struct p_block_ack *p = pi->data;
5272 	struct drbd_device_work *dw;
5273 	sector_t sector;
5274 	int size;
5275 
5276 	peer_device = conn_peer_device(connection, pi->vnr);
5277 	if (!peer_device)
5278 		return -EIO;
5279 	device = peer_device->device;
5280 
5281 	sector = be64_to_cpu(p->sector);
5282 	size = be32_to_cpu(p->blksize);
5283 
5284 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5285 
5286 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5287 		drbd_ov_out_of_sync_found(device, sector, size);
5288 	else
5289 		ov_out_of_sync_print(device);
5290 
5291 	if (!get_ldev(device))
5292 		return 0;
5293 
5294 	drbd_rs_complete_io(device, sector);
5295 	dec_rs_pending(device);
5296 
5297 	--device->ov_left;
5298 
5299 	/* let's advance progress step marks only for every other megabyte */
5300 	if ((device->ov_left & 0x200) == 0x200)
5301 		drbd_advance_rs_marks(device, device->ov_left);
5302 
5303 	if (device->ov_left == 0) {
5304 		dw = kmalloc(sizeof(*dw), GFP_NOIO);
5305 		if (dw) {
5306 			dw->w.cb = w_ov_finished;
5307 			dw->device = device;
5308 			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5309 		} else {
			drbd_err(device, "kmalloc(dw) failed.\n");
5311 			ov_out_of_sync_print(device);
5312 			drbd_resync_finished(device);
5313 		}
5314 	}
5315 	put_ldev(device);
5316 	return 0;
5317 }
5318 
5319 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5320 {
5321 	return 0;
5322 }
5323 
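/* Drain the done_ee lists of all devices on this connection, sending
 * the acks that are still pending.  Returns nonzero if that fails for
 * any device. */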
5324 static int connection_finish_peer_reqs(struct drbd_connection *connection)
5325 {
5326 	struct drbd_peer_device *peer_device;
5327 	int vnr, not_empty = 0;
5328 
5329 	do {
5330 		clear_bit(SIGNAL_ASENDER, &connection->flags);
5331 		flush_signals(current);
5332 
5333 		rcu_read_lock();
5334 		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5335 			struct drbd_device *device = peer_device->device;
5336 			kref_get(&device->kref);
5337 			rcu_read_unlock();
5338 			if (drbd_finish_peer_reqs(device)) {
5339 				kref_put(&device->kref, drbd_destroy_device);
5340 				return 1;
5341 			}
5342 			kref_put(&device->kref, drbd_destroy_device);
5343 			rcu_read_lock();
5344 		}
5345 		set_bit(SIGNAL_ASENDER, &connection->flags);
5346 
5347 		spin_lock_irq(&connection->resource->req_lock);
5348 		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5349 			struct drbd_device *device = peer_device->device;
5350 			not_empty = !list_empty(&device->done_ee);
5351 			if (not_empty)
5352 				break;
5353 		}
5354 		spin_unlock_irq(&connection->resource->req_lock);
5355 		rcu_read_unlock();
5356 	} while (not_empty);
5357 
5358 	return 0;
5359 }
5360 
5361 struct asender_cmd {
5362 	size_t pkt_size;
5363 	int (*fn)(struct drbd_connection *connection, struct packet_info *);
5364 };
5365 
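/* Dispatch table for packets received on the meta data socket,
 * indexed by packet command. */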
5366 static struct asender_cmd asender_tbl[] = {
5367 	[P_PING]	    = { 0, got_Ping },
5368 	[P_PING_ACK]	    = { 0, got_PingAck },
5369 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5370 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5371 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5372 	[P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5373 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
5374 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
5375 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5376 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
5377 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
5378 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5379 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5380 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5381 	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5383 	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
5384 };
5385 
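/* Entry point of the asender thread: send pings and pending acks on
 * the meta data socket, receive and dispatch the peer's replies, and
 * watch for ping timeouts. */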
5386 int drbd_asender(struct drbd_thread *thi)
5387 {
5388 	struct drbd_connection *connection = thi->connection;
5389 	struct asender_cmd *cmd = NULL;
5390 	struct packet_info pi;
5391 	int rv;
5392 	void *buf    = connection->meta.rbuf;
5393 	int received = 0;
5394 	unsigned int header_size = drbd_header_size(connection);
5395 	int expect   = header_size;
5396 	bool ping_timeout_active = false;
5397 	struct net_conf *nc;
5398 	int ping_timeo, tcp_cork, ping_int;
5399 	struct sched_param param = { .sched_priority = 2 };
5400 
5401 	rv = sched_setscheduler(current, SCHED_RR, &param);
5402 	if (rv < 0)
5403 		drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);
5404 
5405 	while (get_t_state(thi) == RUNNING) {
5406 		drbd_thread_current_set_cpu(thi);
5407 
5408 		rcu_read_lock();
5409 		nc = rcu_dereference(connection->net_conf);
5410 		ping_timeo = nc->ping_timeo;
5411 		tcp_cork = nc->tcp_cork;
5412 		ping_int = nc->ping_int;
5413 		rcu_read_unlock();
5414 
5415 		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5416 			if (drbd_send_ping(connection)) {
5417 				drbd_err(connection, "drbd_send_ping has failed\n");
5418 				goto reconnect;
5419 			}
5420 			connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5421 			ping_timeout_active = true;
5422 		}
5423 
5424 		/* TODO: conditionally cork; it may hurt latency if we cork without
5425 		   much to send */
5426 		if (tcp_cork)
5427 			drbd_tcp_cork(connection->meta.socket);
5428 		if (connection_finish_peer_reqs(connection)) {
5429 			drbd_err(connection, "connection_finish_peer_reqs() failed\n");
5430 			goto reconnect;
5431 		}
		/* uncork afterwards, unless corking is disabled altogether */
5433 		if (tcp_cork)
5434 			drbd_tcp_uncork(connection->meta.socket);
5435 
5436 		/* short circuit, recv_msg would return EINTR anyways. */
5437 		if (signal_pending(current))
5438 			continue;
5439 
5440 		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5441 		clear_bit(SIGNAL_ASENDER, &connection->flags);
5442 
5443 		flush_signals(current);
5444 
5445 		/* Note:
5446 		 * -EINTR	 (on meta) we got a signal
5447 		 * -EAGAIN	 (on meta) rcvtimeo expired
5448 		 * -ECONNRESET	 other side closed the connection
5449 		 * -ERESTARTSYS  (on data) we got a signal
5450 		 * rv <  0	 other than above: unexpected error!
5451 		 * rv == expected: full header or command
5452 		 * rv <  expected: "woken" by signal during receive
5453 		 * rv == 0	 : "connection shut down by peer"
5454 		 */
5455 		if (likely(rv > 0)) {
5456 			received += rv;
5457 			buf	 += rv;
5458 		} else if (rv == 0) {
5459 			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5460 				long t;
5461 				rcu_read_lock();
5462 				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5463 				rcu_read_unlock();
5464 
5465 				t = wait_event_timeout(connection->ping_wait,
5466 						       connection->cstate < C_WF_REPORT_PARAMS,
5467 						       t);
5468 				if (t)
5469 					break;
5470 			}
5471 			drbd_err(connection, "meta connection shut down by peer.\n");
5472 			goto reconnect;
5473 		} else if (rv == -EAGAIN) {
5474 			/* If the data socket received something meanwhile,
5475 			 * that is good enough: peer is still alive. */
5476 			if (time_after(connection->last_received,
5477 				jiffies - connection->meta.socket->sk->sk_rcvtimeo))
5478 				continue;
5479 			if (ping_timeout_active) {
5480 				drbd_err(connection, "PingAck did not arrive in time.\n");
5481 				goto reconnect;
5482 			}
5483 			set_bit(SEND_PING, &connection->flags);
5484 			continue;
5485 		} else if (rv == -EINTR) {
5486 			continue;
5487 		} else {
5488 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5489 			goto reconnect;
5490 		}
5491 
5492 		if (received == expect && cmd == NULL) {
5493 			if (decode_header(connection, connection->meta.rbuf, &pi))
5494 				goto reconnect;
			/* validate the command before indexing the table */
			if (pi.cmd >= ARRAY_SIZE(asender_tbl) ||
			    !asender_tbl[pi.cmd].fn) {
				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
					 cmdname(pi.cmd), pi.cmd);
				goto disconnect;
			}
			cmd = &asender_tbl[pi.cmd];
5501 			expect = header_size + cmd->pkt_size;
5502 			if (pi.size != expect - header_size) {
5503 				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5504 					pi.cmd, pi.size);
5505 				goto reconnect;
5506 			}
5507 		}
5508 		if (received == expect) {
5509 			bool err;
5510 
5511 			err = cmd->fn(connection, &pi);
5512 			if (err) {
5513 				drbd_err(connection, "%pf failed\n", cmd->fn);
5514 				goto reconnect;
5515 			}
5516 
5517 			connection->last_received = jiffies;
5518 
5519 			if (cmd == &asender_tbl[P_PING_ACK]) {
5520 				/* restore idle timeout */
5521 				connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5522 				ping_timeout_active = false;
5523 			}
5524 
5525 			buf	 = connection->meta.rbuf;
5526 			received = 0;
5527 			expect	 = header_size;
5528 			cmd	 = NULL;
5529 		}
5530 	}
5531 
5532 	if (0) {
5533 reconnect:
5534 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5535 		conn_md_sync(connection);
5536 	}
5537 	if (0) {
5538 disconnect:
5539 		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5540 	}
5541 	clear_bit(SIGNAL_ASENDER, &connection->flags);
5542 
5543 	drbd_info(connection, "asender terminated\n");
5544 
5545 	return 0;
5546 }
5547