xref: /openbmc/linux/drivers/block/drbd/drbd_main.c (revision 05bcf503)
1 /*
2    drbd.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12 
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17 
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22 
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 
27  */
28 
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48 
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52 
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56 
57 #include "drbd_vli.h"
58 
59 struct after_state_chg_work {
60 	struct drbd_work w;
61 	union drbd_state os;
62 	union drbd_state ns;
63 	enum chg_state_flags flags;
64 	struct completion *done;
65 };
66 
67 static DEFINE_MUTEX(drbd_main_mutex);
68 int drbdd_init(struct drbd_thread *);
69 int drbd_worker(struct drbd_thread *);
70 int drbd_asender(struct drbd_thread *);
71 
72 int drbd_init(void);
73 static int drbd_open(struct block_device *bdev, fmode_t mode);
74 static int drbd_release(struct gendisk *gd, fmode_t mode);
75 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
76 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
77 			   union drbd_state ns, enum chg_state_flags flags);
78 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
79 static void md_sync_timer_fn(unsigned long data);
80 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
81 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused);
82 static void _tl_clear(struct drbd_conf *mdev);
83 
84 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
85 	      "Lars Ellenberg <lars@linbit.com>");
86 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
87 MODULE_VERSION(REL_VERSION);
88 MODULE_LICENSE("GPL");
89 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices ("
90 		 __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
91 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
92 
93 #include <linux/moduleparam.h>
94 /* allow_open_on_secondary */
95 MODULE_PARM_DESC(allow_oos, "DONT USE!");
96 /* thanks to these macros, if compiled into the kernel (not-module),
97  * this becomes the boot parameter drbd.minor_count */
98 module_param(minor_count, uint, 0444);
99 module_param(disable_sendpage, bool, 0644);
100 module_param(allow_oos, bool, 0);
101 module_param(cn_idx, uint, 0444);
102 module_param(proc_details, int, 0644);
103 
104 #ifdef CONFIG_DRBD_FAULT_INJECTION
105 int enable_faults;
106 int fault_rate;
107 static int fault_count;
108 int fault_devs;
109 /* bitmap of enabled faults */
110 module_param(enable_faults, int, 0664);
111 /* fault rate % value - applies to all enabled faults */
112 module_param(fault_rate, int, 0664);
113 /* count of faults inserted */
114 module_param(fault_count, int, 0664);
115 /* bitmap of devices to insert faults on */
116 module_param(fault_devs, int, 0644);
117 #endif
118 
119 /* module parameter, defined */
120 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
121 bool disable_sendpage;
122 bool allow_oos;
123 unsigned int cn_idx = CN_IDX_DRBD;
124 int proc_details;       /* Detail level in proc drbd */
125 
126 /* Module parameter for setting the user mode helper program
127  * to run. Default is /sbin/drbdadm */
128 char usermode_helper[80] = "/sbin/drbdadm";
129 
130 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
131 
132 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
133  * as member "struct gendisk *vdisk;"
134  */
135 struct drbd_conf **minor_table;
136 
137 struct kmem_cache *drbd_request_cache;
138 struct kmem_cache *drbd_ee_cache;	/* epoch entries */
139 struct kmem_cache *drbd_bm_ext_cache;	/* bitmap extents */
140 struct kmem_cache *drbd_al_ext_cache;	/* activity log extents */
141 mempool_t *drbd_request_mempool;
142 mempool_t *drbd_ee_mempool;
143 mempool_t *drbd_md_io_page_pool;
144 struct bio_set *drbd_md_io_bio_set;
145 
146 /* I do not use a standard mempool, because:
147    1) I want to hand out the pre-allocated objects first.
148    2) I want to be able to interrupt sleeping allocation with a signal.
149    Note: This is a singly linked list, the next pointer is the private
150 	 member of struct page.
151  */
152 struct page *drbd_pp_pool;
153 spinlock_t   drbd_pp_lock;
154 int          drbd_pp_vacant;
155 wait_queue_head_t drbd_pp_wait;
156 
157 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
158 
159 static const struct block_device_operations drbd_ops = {
160 	.owner =   THIS_MODULE,
161 	.open =    drbd_open,
162 	.release = drbd_release,
163 };
164 
165 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
166 {
167 	if (!drbd_md_io_bio_set)
168 		return bio_alloc(gfp_mask, 1);
169 
170 	return bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
171 }
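/*
 * Illustrative sketch only (the callers live outside this file): meta-data
 * IO paths are expected to allocate their single-segment bios through this
 * helper, e.g.
 *
 *	struct bio *bio = bio_alloc_drbd(GFP_NOIO);
 *
 * so they draw from the dedicated drbd_md_io_bio_set once it exists, and
 * only fall back to the generic bio pool before module init has set it up.
 */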
172 
173 #ifdef __CHECKER__
174 /* When checking with sparse, and this is an inline function, sparse will
175    give tons of false positives. When this is a real function, sparse works.
176  */
177 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
178 {
179 	int io_allowed;
180 
181 	atomic_inc(&mdev->local_cnt);
182 	io_allowed = (mdev->state.disk >= mins);
183 	if (!io_allowed) {
184 		if (atomic_dec_and_test(&mdev->local_cnt))
185 			wake_up(&mdev->misc_wait);
186 	}
187 	return io_allowed;
188 }
189 
190 #endif
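/*
 * A minimal sketch of the reference pattern this helper backs (the same
 * pattern is used by is_valid_state() and sanitize_state() below): every
 * successful get_ldev()/get_ldev_if_state() must be balanced by put_ldev(),
 * which wakes misc_wait once local_cnt drops back to zero.
 *
 *	if (get_ldev(mdev)) {
 *		fp = mdev->ldev->dc.fencing;
 *		put_ldev(mdev);
 *	}
 */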
191 
192 /**
193  * DOC: The transfer log
194  *
195  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
196  * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
197  * of the list. There is always at least one &struct drbd_tl_epoch object.
198  *
199  * Each &struct drbd_tl_epoch has a circular, doubly linked list of requests
200  * attached.
201  */
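/*
 * A rough picture (illustration only) of the structure described above:
 *
 *	mdev->oldest_tle -> epoch -> epoch -> epoch  <- mdev->newest_tle
 *	                      |        |        |
 *	                  requests requests requests   (circular lists)
 *
 * tl_init() below allocates the initial, single epoch object.
 */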
202 static int tl_init(struct drbd_conf *mdev)
203 {
204 	struct drbd_tl_epoch *b;
205 
206 	/* during device minor initialization, we may well use GFP_KERNEL */
207 	b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
208 	if (!b)
209 		return 0;
210 	INIT_LIST_HEAD(&b->requests);
211 	INIT_LIST_HEAD(&b->w.list);
212 	b->next = NULL;
213 	b->br_number = 4711;
214 	b->n_writes = 0;
215 	b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
216 
217 	mdev->oldest_tle = b;
218 	mdev->newest_tle = b;
219 	INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
220 	INIT_LIST_HEAD(&mdev->barrier_acked_requests);
221 
222 	mdev->tl_hash = NULL;
223 	mdev->tl_hash_s = 0;
224 
225 	return 1;
226 }
227 
228 static void tl_cleanup(struct drbd_conf *mdev)
229 {
230 	D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
231 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
232 	kfree(mdev->oldest_tle);
233 	mdev->oldest_tle = NULL;
234 	kfree(mdev->unused_spare_tle);
235 	mdev->unused_spare_tle = NULL;
236 	kfree(mdev->tl_hash);
237 	mdev->tl_hash = NULL;
238 	mdev->tl_hash_s = 0;
239 }
240 
241 /**
242  * _tl_add_barrier() - Adds a barrier to the transfer log
243  * @mdev:	DRBD device.
244  * @new:	Barrier to be added before the current head of the TL.
245  *
246  * The caller must hold the req_lock.
247  */
248 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
249 {
250 	struct drbd_tl_epoch *newest_before;
251 
252 	INIT_LIST_HEAD(&new->requests);
253 	INIT_LIST_HEAD(&new->w.list);
254 	new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
255 	new->next = NULL;
256 	new->n_writes = 0;
257 
258 	newest_before = mdev->newest_tle;
259 	new->br_number = newest_before->br_number+1;
260 	if (mdev->newest_tle != new) {
261 		mdev->newest_tle->next = new;
262 		mdev->newest_tle = new;
263 	}
264 }
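/*
 * Illustration only: tl_release() above recycles its spent epoch object
 * through this function, with req_lock already held:
 *
 *	spin_lock_irq(&mdev->req_lock);
 *	...
 *	_tl_add_barrier(mdev, b);
 *	...
 *	spin_unlock_irq(&mdev->req_lock);
 *
 * after which b is again the newest epoch and collects new write requests.
 */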
265 
266 /**
267  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
268  * @mdev:	DRBD device.
269  * @barrier_nr:	Expected identifier of the DRBD write barrier packet.
270  * @set_size:	Expected number of requests before that barrier.
271  *
272  * In case the passed barrier_nr or set_size does not match the oldest
273  * &struct drbd_tl_epoch object, this function will cause a termination
274  * of the connection.
275  */
276 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
277 		       unsigned int set_size)
278 {
279 	struct drbd_tl_epoch *b, *nob; /* next old barrier */
280 	struct list_head *le, *tle;
281 	struct drbd_request *r;
282 
283 	spin_lock_irq(&mdev->req_lock);
284 
285 	b = mdev->oldest_tle;
286 
287 	/* first some paranoia code */
288 	if (b == NULL) {
289 		dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
290 			barrier_nr);
291 		goto bail;
292 	}
293 	if (b->br_number != barrier_nr) {
294 		dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
295 			barrier_nr, b->br_number);
296 		goto bail;
297 	}
298 	if (b->n_writes != set_size) {
299 		dev_err(DEV, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
300 			barrier_nr, set_size, b->n_writes);
301 		goto bail;
302 	}
303 
304 	/* Clean up list of requests processed during current epoch */
305 	list_for_each_safe(le, tle, &b->requests) {
306 		r = list_entry(le, struct drbd_request, tl_requests);
307 		_req_mod(r, barrier_acked);
308 	}
309 	/* There could be requests on the list waiting for completion
310 	   of the write to the local disk. To avoid corruption of the
311 	   slab's data structures we have to remove the list's head.
312 
313 	   Also there could have been a barrier ack out of sequence, overtaking
314 	   the write acks - which would be a bug and violating write ordering.
315 	   To not deadlock in case we lose connection while such requests are
316 	   still pending, we need some way to find them for the
317 	   _req_mod(connection_lost_while_pending).
318 
319 	   These have been list_move'd to the out_of_sequence_requests list in
320 	   _req_mod(, barrier_acked) above.
321 	   */
322 	list_splice_init(&b->requests, &mdev->barrier_acked_requests);
323 
324 	nob = b->next;
325 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
326 		_tl_add_barrier(mdev, b);
327 		if (nob)
328 			mdev->oldest_tle = nob;
329 		/* if nob == NULL b was the only barrier, and becomes the new
330 		   barrier. Therefore mdev->oldest_tle already points to b */
331 	} else {
332 		D_ASSERT(nob != NULL);
333 		mdev->oldest_tle = nob;
334 		kfree(b);
335 	}
336 
337 	spin_unlock_irq(&mdev->req_lock);
338 	dec_ap_pending(mdev);
339 
340 	return;
341 
342 bail:
343 	spin_unlock_irq(&mdev->req_lock);
344 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
345 }
346 
347 
348 /**
349  * _tl_restart() - Walks the transfer log, and applies an action to all requests
350  * @mdev:	DRBD device.
351  * @what:       The action/event to perform with all request objects
352  *
353  * @what might be one of connection_lost_while_pending, resend, fail_frozen_disk_io,
354  * restart_frozen_disk_io.
355  */
356 static void _tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
357 {
358 	struct drbd_tl_epoch *b, *tmp, **pn;
359 	struct list_head *le, *tle, carry_reads;
360 	struct drbd_request *req;
361 	int rv, n_writes, n_reads;
362 
363 	b = mdev->oldest_tle;
364 	pn = &mdev->oldest_tle;
365 	while (b) {
366 		n_writes = 0;
367 		n_reads = 0;
368 		INIT_LIST_HEAD(&carry_reads);
369 		list_for_each_safe(le, tle, &b->requests) {
370 			req = list_entry(le, struct drbd_request, tl_requests);
371 			rv = _req_mod(req, what);
372 
373 			n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
374 			n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
375 		}
376 		tmp = b->next;
377 
378 		if (n_writes) {
379 			if (what == resend) {
380 				b->n_writes = n_writes;
381 				if (b->w.cb == NULL) {
382 					b->w.cb = w_send_barrier;
383 					inc_ap_pending(mdev);
384 					set_bit(CREATE_BARRIER, &mdev->flags);
385 				}
386 
387 				drbd_queue_work(&mdev->data.work, &b->w);
388 			}
389 			pn = &b->next;
390 		} else {
391 			if (n_reads)
392 				list_add(&carry_reads, &b->requests);
393 			/* there could still be requests on that ring list,
394 			 * in case local io is still pending */
395 			list_del(&b->requests);
396 
397 			/* dec_ap_pending corresponding to queue_barrier.
398 			 * the newest barrier may not have been queued yet,
399 			 * in which case w.cb is still NULL. */
400 			if (b->w.cb != NULL)
401 				dec_ap_pending(mdev);
402 
403 			if (b == mdev->newest_tle) {
404 				/* recycle, but reinit! */
405 				D_ASSERT(tmp == NULL);
406 				INIT_LIST_HEAD(&b->requests);
407 				list_splice(&carry_reads, &b->requests);
408 				INIT_LIST_HEAD(&b->w.list);
409 				b->w.cb = NULL;
410 				b->br_number = net_random();
411 				b->n_writes = 0;
412 
413 				*pn = b;
414 				break;
415 			}
416 			*pn = tmp;
417 			kfree(b);
418 		}
419 		b = tmp;
420 		list_splice(&carry_reads, &b->requests);
421 	}
422 
423 	/* Actions operating on the disk state, also want to work on
424 	   requests that got barrier acked. */
425 
426 	list_for_each_safe(le, tle, &mdev->barrier_acked_requests) {
427 		req = list_entry(le, struct drbd_request, tl_requests);
428 		_req_mod(req, what);
429 	}
430 }
431 
432 
433 /**
434  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
435  * @mdev:	DRBD device.
436  *
437  * This is called after the connection to the peer was lost. The storage covered
438  * by the requests on the transfer log gets marked as out of sync. Called from the
439  * receiver thread and the worker thread.
440  */
441 void tl_clear(struct drbd_conf *mdev)
442 {
443 	spin_lock_irq(&mdev->req_lock);
444 	_tl_clear(mdev);
445 	spin_unlock_irq(&mdev->req_lock);
446 }
447 
448 static void _tl_clear(struct drbd_conf *mdev)
449 {
450 	struct list_head *le, *tle;
451 	struct drbd_request *r;
452 
453 	_tl_restart(mdev, connection_lost_while_pending);
454 
455 	/* we expect this list to be empty. */
456 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
457 
458 	/* but just in case, clean it up anyways! */
459 	list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
460 		r = list_entry(le, struct drbd_request, tl_requests);
461 		/* It would be nice to complete outside of spinlock.
462 		 * But this is easier for now. */
463 		_req_mod(r, connection_lost_while_pending);
464 	}
465 
466 	/* ensure bit indicating barrier is required is clear */
467 	clear_bit(CREATE_BARRIER, &mdev->flags);
468 
469 	memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
470 
471 }
472 
473 void tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
474 {
475 	spin_lock_irq(&mdev->req_lock);
476 	_tl_restart(mdev, what);
477 	spin_unlock_irq(&mdev->req_lock);
478 }
479 
480 /**
481  * tl_abort_disk_io() - Abort disk I/O for all requests for a certain mdev in the TL
482  * @mdev:	DRBD device.
483  */
484 void tl_abort_disk_io(struct drbd_conf *mdev)
485 {
486 	struct drbd_tl_epoch *b;
487 	struct list_head *le, *tle;
488 	struct drbd_request *req;
489 
490 	spin_lock_irq(&mdev->req_lock);
491 	b = mdev->oldest_tle;
492 	while (b) {
493 		list_for_each_safe(le, tle, &b->requests) {
494 			req = list_entry(le, struct drbd_request, tl_requests);
495 			if (!(req->rq_state & RQ_LOCAL_PENDING))
496 				continue;
497 			_req_mod(req, abort_disk_io);
498 		}
499 		b = b->next;
500 	}
501 
502 	list_for_each_safe(le, tle, &mdev->barrier_acked_requests) {
503 		req = list_entry(le, struct drbd_request, tl_requests);
504 		if (!(req->rq_state & RQ_LOCAL_PENDING))
505 			continue;
506 		_req_mod(req, abort_disk_io);
507 	}
508 
509 	spin_unlock_irq(&mdev->req_lock);
510 }
511 
512 /**
513  * cl_wide_st_chg() - true if the state change is a cluster wide one
514  * @mdev:	DRBD device.
515  * @os:		old (current) state.
516  * @ns:		new (wanted) state.
517  */
518 static int cl_wide_st_chg(struct drbd_conf *mdev,
519 			  union drbd_state os, union drbd_state ns)
520 {
521 	return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
522 		 ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
523 		  (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
524 		  (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
525 		  (os.disk != D_FAILED && ns.disk == D_FAILED))) ||
526 		(os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
527 		(os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
528 }
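/*
 * Reading the predicate above, the transitions treated as cluster wide are,
 * for example (illustration only):
 *
 *  - promotion while connected:   role R_SECONDARY -> R_PRIMARY
 *  - starting a sync handshake:   conn -> C_STARTING_SYNC_S / C_STARTING_SYNC_T
 *  - a local disk failing:        disk -> D_FAILED while connected
 *  - a graceful disconnect:       conn -> C_DISCONNECTING
 *  - starting online verify:      conn C_CONNECTED -> C_VERIFY_S
 *
 * Such changes are negotiated with the peer (see drbd_req_state() below)
 * instead of being committed locally right away.
 */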
529 
530 enum drbd_state_rv
531 drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
532 		  union drbd_state mask, union drbd_state val)
533 {
534 	unsigned long flags;
535 	union drbd_state os, ns;
536 	enum drbd_state_rv rv;
537 
538 	spin_lock_irqsave(&mdev->req_lock, flags);
539 	os = mdev->state;
540 	ns.i = (os.i & ~mask.i) | val.i;
541 	rv = _drbd_set_state(mdev, ns, f, NULL);
542 	ns = mdev->state;
543 	spin_unlock_irqrestore(&mdev->req_lock, flags);
544 
545 	return rv;
546 }
547 
548 /**
549  * drbd_force_state() - Impose a change which happens outside our control on our state
550  * @mdev:	DRBD device.
551  * @mask:	mask of state bits to change.
552  * @val:	value of new state bits.
553  */
554 void drbd_force_state(struct drbd_conf *mdev,
555 	union drbd_state mask, union drbd_state val)
556 {
557 	drbd_change_state(mdev, CS_HARD, mask, val);
558 }
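/*
 * Usage sketch, taken from tl_release() above: when the peer sends an
 * inconsistent barrier ack, the connection is torn down unconditionally:
 *
 *	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
 *
 * NS() builds the (mask, val) argument pair for a single state field, and
 * CS_HARD makes __drbd_set_state() skip its usual validity checks.
 */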
559 
560 static enum drbd_state_rv is_valid_state(struct drbd_conf *, union drbd_state);
561 static enum drbd_state_rv is_valid_state_transition(struct drbd_conf *,
562 						    union drbd_state,
563 						    union drbd_state);
564 enum sanitize_state_warnings {
565 	NO_WARNING,
566 	ABORTED_ONLINE_VERIFY,
567 	ABORTED_RESYNC,
568 	CONNECTION_LOST_NEGOTIATING,
569 	IMPLICITLY_UPGRADED_DISK,
570 	IMPLICITLY_UPGRADED_PDSK,
571 };
572 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
573 				       union drbd_state ns, enum sanitize_state_warnings *warn);
574 int drbd_send_state_req(struct drbd_conf *,
575 			union drbd_state, union drbd_state);
576 
577 static enum drbd_state_rv
578 _req_st_cond(struct drbd_conf *mdev, union drbd_state mask,
579 	     union drbd_state val)
580 {
581 	union drbd_state os, ns;
582 	unsigned long flags;
583 	enum drbd_state_rv rv;
584 
585 	if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
586 		return SS_CW_SUCCESS;
587 
588 	if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
589 		return SS_CW_FAILED_BY_PEER;
590 
591 	rv = 0;
592 	spin_lock_irqsave(&mdev->req_lock, flags);
593 	os = mdev->state;
594 	ns.i = (os.i & ~mask.i) | val.i;
595 	ns = sanitize_state(mdev, os, ns, NULL);
596 
597 	if (!cl_wide_st_chg(mdev, os, ns))
598 		rv = SS_CW_NO_NEED;
599 	if (!rv) {
600 		rv = is_valid_state(mdev, ns);
601 		if (rv == SS_SUCCESS) {
602 			rv = is_valid_state_transition(mdev, ns, os);
603 			if (rv == SS_SUCCESS)
604 				rv = SS_UNKNOWN_ERROR; /* cont waiting, otherwise fail. */
605 		}
606 	}
607 	spin_unlock_irqrestore(&mdev->req_lock, flags);
608 
609 	return rv;
610 }
611 
612 /**
613  * drbd_req_state() - Perform a possibly cluster-wide state change
614  * @mdev:	DRBD device.
615  * @mask:	mask of state bits to change.
616  * @val:	value of new state bits.
617  * @f:		flags
618  *
619  * Should not be called directly, use drbd_request_state() or
620  * _drbd_request_state().
621  */
622 static enum drbd_state_rv
623 drbd_req_state(struct drbd_conf *mdev, union drbd_state mask,
624 	       union drbd_state val, enum chg_state_flags f)
625 {
626 	struct completion done;
627 	unsigned long flags;
628 	union drbd_state os, ns;
629 	enum drbd_state_rv rv;
630 
631 	init_completion(&done);
632 
633 	if (f & CS_SERIALIZE)
634 		mutex_lock(&mdev->state_mutex);
635 
636 	spin_lock_irqsave(&mdev->req_lock, flags);
637 	os = mdev->state;
638 	ns.i = (os.i & ~mask.i) | val.i;
639 	ns = sanitize_state(mdev, os, ns, NULL);
640 
641 	if (cl_wide_st_chg(mdev, os, ns)) {
642 		rv = is_valid_state(mdev, ns);
643 		if (rv == SS_SUCCESS)
644 			rv = is_valid_state_transition(mdev, ns, os);
645 		spin_unlock_irqrestore(&mdev->req_lock, flags);
646 
647 		if (rv < SS_SUCCESS) {
648 			if (f & CS_VERBOSE)
649 				print_st_err(mdev, os, ns, rv);
650 			goto abort;
651 		}
652 
653 		drbd_state_lock(mdev);
654 		if (!drbd_send_state_req(mdev, mask, val)) {
655 			drbd_state_unlock(mdev);
656 			rv = SS_CW_FAILED_BY_PEER;
657 			if (f & CS_VERBOSE)
658 				print_st_err(mdev, os, ns, rv);
659 			goto abort;
660 		}
661 
662 		wait_event(mdev->state_wait,
663 			(rv = _req_st_cond(mdev, mask, val)));
664 
665 		if (rv < SS_SUCCESS) {
666 			drbd_state_unlock(mdev);
667 			if (f & CS_VERBOSE)
668 				print_st_err(mdev, os, ns, rv);
669 			goto abort;
670 		}
671 		spin_lock_irqsave(&mdev->req_lock, flags);
672 		os = mdev->state;
673 		ns.i = (os.i & ~mask.i) | val.i;
674 		rv = _drbd_set_state(mdev, ns, f, &done);
675 		drbd_state_unlock(mdev);
676 	} else {
677 		rv = _drbd_set_state(mdev, ns, f, &done);
678 	}
679 
680 	spin_unlock_irqrestore(&mdev->req_lock, flags);
681 
682 	if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
683 		D_ASSERT(current != mdev->worker.task);
684 		wait_for_completion(&done);
685 	}
686 
687 abort:
688 	if (f & CS_SERIALIZE)
689 		mutex_unlock(&mdev->state_mutex);
690 
691 	return rv;
692 }
693 
694 /**
695  * _drbd_request_state() - Request a state change (with flags)
696  * @mdev:	DRBD device.
697  * @mask:	mask of state bits to change.
698  * @val:	value of new state bits.
699  * @f:		flags
700  *
701  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
702  * flag, or when logging of failed state change requests is not desired.
703  */
704 enum drbd_state_rv
705 _drbd_request_state(struct drbd_conf *mdev, union drbd_state mask,
706 		    union drbd_state val, enum chg_state_flags f)
707 {
708 	enum drbd_state_rv rv;
709 
710 	wait_event(mdev->state_wait,
711 		   (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
712 
713 	return rv;
714 }
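/*
 * Usage sketch (as in abw_start_sync() further down): request a verified,
 * possibly cluster-wide transition, retrying while the state machine is in
 * a transient state:
 *
 *	_drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
 *
 * The drbd_request_state() wrapper mentioned above is the usual entry
 * point; this variant is for callers that want to pick the flags themselves.
 */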
715 
716 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
717 {
718 	dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
719 	    name,
720 	    drbd_conn_str(ns.conn),
721 	    drbd_role_str(ns.role),
722 	    drbd_role_str(ns.peer),
723 	    drbd_disk_str(ns.disk),
724 	    drbd_disk_str(ns.pdsk),
725 	    is_susp(ns) ? 's' : 'r',
726 	    ns.aftr_isp ? 'a' : '-',
727 	    ns.peer_isp ? 'p' : '-',
728 	    ns.user_isp ? 'u' : '-'
729 	    );
730 }
731 
732 void print_st_err(struct drbd_conf *mdev, union drbd_state os,
733 	          union drbd_state ns, enum drbd_state_rv err)
734 {
735 	if (err == SS_IN_TRANSIENT_STATE)
736 		return;
737 	dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
738 	print_st(mdev, " state", os);
739 	print_st(mdev, "wanted", ns);
740 }
741 
742 
743 /**
744  * is_valid_state() - Returns an SS_ error code if ns is not valid
745  * @mdev:	DRBD device.
746  * @ns:		State to consider.
747  */
748 static enum drbd_state_rv
749 is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
750 {
751 	/* See drbd_state_sw_errors in drbd_strings.c */
752 
753 	enum drbd_fencing_p fp;
754 	enum drbd_state_rv rv = SS_SUCCESS;
755 
756 	fp = FP_DONT_CARE;
757 	if (get_ldev(mdev)) {
758 		fp = mdev->ldev->dc.fencing;
759 		put_ldev(mdev);
760 	}
761 
762 	if (get_net_conf(mdev)) {
763 		if (!mdev->net_conf->two_primaries &&
764 		    ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
765 			rv = SS_TWO_PRIMARIES;
766 		put_net_conf(mdev);
767 	}
768 
769 	if (rv <= 0)
770 		/* already found a reason to abort */;
771 	else if (ns.role == R_SECONDARY && mdev->open_cnt)
772 		rv = SS_DEVICE_IN_USE;
773 
774 	else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
775 		rv = SS_NO_UP_TO_DATE_DISK;
776 
777 	else if (fp >= FP_RESOURCE &&
778 		 ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
779 		rv = SS_PRIMARY_NOP;
780 
781 	else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
782 		rv = SS_NO_UP_TO_DATE_DISK;
783 
784 	else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
785 		rv = SS_NO_LOCAL_DISK;
786 
787 	else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
788 		rv = SS_NO_REMOTE_DISK;
789 
790 	else if (ns.conn > C_CONNECTED && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE)
791 		rv = SS_NO_UP_TO_DATE_DISK;
792 
793 	else if ((ns.conn == C_CONNECTED ||
794 		  ns.conn == C_WF_BITMAP_S ||
795 		  ns.conn == C_SYNC_SOURCE ||
796 		  ns.conn == C_PAUSED_SYNC_S) &&
797 		  ns.disk == D_OUTDATED)
798 		rv = SS_CONNECTED_OUTDATES;
799 
800 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
801 		 (mdev->sync_conf.verify_alg[0] == 0))
802 		rv = SS_NO_VERIFY_ALG;
803 
804 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
805 		  mdev->agreed_pro_version < 88)
806 		rv = SS_NOT_SUPPORTED;
807 
808 	else if (ns.conn >= C_CONNECTED && ns.pdsk == D_UNKNOWN)
809 		rv = SS_CONNECTED_OUTDATES;
810 
811 	return rv;
812 }
813 
814 /**
815  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
816  * @mdev:	DRBD device.
817  * @ns:		new state.
818  * @os:		old state.
819  */
820 static enum drbd_state_rv
821 is_valid_state_transition(struct drbd_conf *mdev, union drbd_state ns,
822 			  union drbd_state os)
823 {
824 	enum drbd_state_rv rv = SS_SUCCESS;
825 
826 	if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
827 	    os.conn > C_CONNECTED)
828 		rv = SS_RESYNC_RUNNING;
829 
830 	if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
831 		rv = SS_ALREADY_STANDALONE;
832 
833 	if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
834 		rv = SS_IS_DISKLESS;
835 
836 	if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
837 		rv = SS_NO_NET_CONFIG;
838 
839 	if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
840 		rv = SS_LOWER_THAN_OUTDATED;
841 
842 	if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
843 		rv = SS_IN_TRANSIENT_STATE;
844 
845 	if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
846 		rv = SS_IN_TRANSIENT_STATE;
847 
848 	/* While establishing a connection, only allow cstate to change.
849 	   Delay/refuse role changes, detach/attach etc... */
850 	if (test_bit(STATE_SENT, &mdev->flags) &&
851 	    !(os.conn == C_WF_REPORT_PARAMS ||
852 	      (ns.conn == C_WF_REPORT_PARAMS && os.conn == C_WF_CONNECTION)))
853 		rv = SS_IN_TRANSIENT_STATE;
854 
855 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
856 		rv = SS_NEED_CONNECTION;
857 
858 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
859 	    ns.conn != os.conn && os.conn > C_CONNECTED)
860 		rv = SS_RESYNC_RUNNING;
861 
862 	if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
863 	    os.conn < C_CONNECTED)
864 		rv = SS_NEED_CONNECTION;
865 
866 	if ((ns.conn == C_SYNC_TARGET || ns.conn == C_SYNC_SOURCE)
867 	    && os.conn < C_WF_REPORT_PARAMS)
868 		rv = SS_NEED_CONNECTION; /* No NetworkFailure -> SyncTarget etc... */
869 
870 	return rv;
871 }
872 
873 static void print_sanitize_warnings(struct drbd_conf *mdev, enum sanitize_state_warnings warn)
874 {
875 	static const char *msg_table[] = {
876 		[NO_WARNING] = "",
877 		[ABORTED_ONLINE_VERIFY] = "Online-verify aborted.",
878 		[ABORTED_RESYNC] = "Resync aborted.",
879 		[CONNECTION_LOST_NEGOTIATING] = "Connection lost while negotiating, no data!",
880 		[IMPLICITLY_UPGRADED_DISK] = "Implicitly upgraded disk",
881 		[IMPLICITLY_UPGRADED_PDSK] = "Implicitly upgraded pdsk",
882 	};
883 
884 	if (warn != NO_WARNING)
885 		dev_warn(DEV, "%s\n", msg_table[warn]);
886 }
887 
888 /**
889  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
890  * @mdev:	DRBD device.
891  * @os:		old state.
892  * @ns:		new state.
893  * @warn:	pointer used to return an optional warning; may be NULL
894  *
895  * When we lose the connection, we have to set the state of the peer's disk (pdsk)
896  * to D_UNKNOWN. This rule and many more along those lines are in this function.
897  */
898 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
899 				       union drbd_state ns, enum sanitize_state_warnings *warn)
900 {
901 	enum drbd_fencing_p fp;
902 	enum drbd_disk_state disk_min, disk_max, pdsk_min, pdsk_max;
903 
904 	if (warn)
905 		*warn = NO_WARNING;
906 
907 	fp = FP_DONT_CARE;
908 	if (get_ldev(mdev)) {
909 		fp = mdev->ldev->dc.fencing;
910 		put_ldev(mdev);
911 	}
912 
913 	/* Disallow Network errors to configure a device's network part */
914 	if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
915 	    os.conn <= C_DISCONNECTING)
916 		ns.conn = os.conn;
917 
918 	/* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow.
919 	 * If you try to go into some Sync* state, that shall fail (elsewhere). */
920 	if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
921 	    ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING && ns.conn <= C_CONNECTED)
922 		ns.conn = os.conn;
923 
924 	/* we cannot fail (again) if we already detached */
925 	if (ns.disk == D_FAILED && os.disk == D_DISKLESS)
926 		ns.disk = D_DISKLESS;
927 
928 	/* After C_DISCONNECTING only C_STANDALONE may follow */
929 	if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
930 		ns.conn = os.conn;
931 
932 	if (ns.conn < C_CONNECTED) {
933 		ns.peer_isp = 0;
934 		ns.peer = R_UNKNOWN;
935 		if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
936 			ns.pdsk = D_UNKNOWN;
937 	}
938 
939 	/* Clear the aftr_isp when becoming unconfigured */
940 	if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
941 		ns.aftr_isp = 0;
942 
943 	/* Abort resync if a disk fails/detaches */
944 	if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
945 	    (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
946 		if (warn)
947 			*warn =	os.conn == C_VERIFY_S || os.conn == C_VERIFY_T ?
948 				ABORTED_ONLINE_VERIFY : ABORTED_RESYNC;
949 		ns.conn = C_CONNECTED;
950 	}
951 
952 	/* Connection breaks down before we finished "Negotiating" */
953 	if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
954 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
955 		if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
956 			ns.disk = mdev->new_state_tmp.disk;
957 			ns.pdsk = mdev->new_state_tmp.pdsk;
958 		} else {
959 			if (warn)
960 				*warn = CONNECTION_LOST_NEGOTIATING;
961 			ns.disk = D_DISKLESS;
962 			ns.pdsk = D_UNKNOWN;
963 		}
964 		put_ldev(mdev);
965 	}
966 
967 	/* D_CONSISTENT and D_OUTDATED vanish when we get connected */
968 	if (ns.conn >= C_CONNECTED && ns.conn < C_AHEAD) {
969 		if (ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED)
970 			ns.disk = D_UP_TO_DATE;
971 		if (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)
972 			ns.pdsk = D_UP_TO_DATE;
973 	}
974 
975 	/* Implications of the connection state on the disk states */
976 	disk_min = D_DISKLESS;
977 	disk_max = D_UP_TO_DATE;
978 	pdsk_min = D_INCONSISTENT;
979 	pdsk_max = D_UNKNOWN;
980 	switch ((enum drbd_conns)ns.conn) {
981 	case C_WF_BITMAP_T:
982 	case C_PAUSED_SYNC_T:
983 	case C_STARTING_SYNC_T:
984 	case C_WF_SYNC_UUID:
985 	case C_BEHIND:
986 		disk_min = D_INCONSISTENT;
987 		disk_max = D_OUTDATED;
988 		pdsk_min = D_UP_TO_DATE;
989 		pdsk_max = D_UP_TO_DATE;
990 		break;
991 	case C_VERIFY_S:
992 	case C_VERIFY_T:
993 		disk_min = D_UP_TO_DATE;
994 		disk_max = D_UP_TO_DATE;
995 		pdsk_min = D_UP_TO_DATE;
996 		pdsk_max = D_UP_TO_DATE;
997 		break;
998 	case C_CONNECTED:
999 		disk_min = D_DISKLESS;
1000 		disk_max = D_UP_TO_DATE;
1001 		pdsk_min = D_DISKLESS;
1002 		pdsk_max = D_UP_TO_DATE;
1003 		break;
1004 	case C_WF_BITMAP_S:
1005 	case C_PAUSED_SYNC_S:
1006 	case C_STARTING_SYNC_S:
1007 	case C_AHEAD:
1008 		disk_min = D_UP_TO_DATE;
1009 		disk_max = D_UP_TO_DATE;
1010 		pdsk_min = D_INCONSISTENT;
1011 		pdsk_max = D_CONSISTENT; /* D_OUTDATED would be nice. But explicit outdate necessary */
1012 		break;
1013 	case C_SYNC_TARGET:
1014 		disk_min = D_INCONSISTENT;
1015 		disk_max = D_INCONSISTENT;
1016 		pdsk_min = D_UP_TO_DATE;
1017 		pdsk_max = D_UP_TO_DATE;
1018 		break;
1019 	case C_SYNC_SOURCE:
1020 		disk_min = D_UP_TO_DATE;
1021 		disk_max = D_UP_TO_DATE;
1022 		pdsk_min = D_INCONSISTENT;
1023 		pdsk_max = D_INCONSISTENT;
1024 		break;
1025 	case C_STANDALONE:
1026 	case C_DISCONNECTING:
1027 	case C_UNCONNECTED:
1028 	case C_TIMEOUT:
1029 	case C_BROKEN_PIPE:
1030 	case C_NETWORK_FAILURE:
1031 	case C_PROTOCOL_ERROR:
1032 	case C_TEAR_DOWN:
1033 	case C_WF_CONNECTION:
1034 	case C_WF_REPORT_PARAMS:
1035 	case C_MASK:
1036 		break;
1037 	}
1038 	if (ns.disk > disk_max)
1039 		ns.disk = disk_max;
1040 
1041 	if (ns.disk < disk_min) {
1042 		if (warn)
1043 			*warn = IMPLICITLY_UPGRADED_DISK;
1044 		ns.disk = disk_min;
1045 	}
1046 	if (ns.pdsk > pdsk_max)
1047 		ns.pdsk = pdsk_max;
1048 
1049 	if (ns.pdsk < pdsk_min) {
1050 		if (warn)
1051 			*warn = IMPLICITLY_UPGRADED_PDSK;
1052 		ns.pdsk = pdsk_min;
1053 	}
1054 
1055 	if (fp == FP_STONITH &&
1056 	    (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
1057 	    !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
1058 		ns.susp_fen = 1; /* Suspend IO while fence-peer handler runs (peer lost) */
1059 
1060 	if (mdev->sync_conf.on_no_data == OND_SUSPEND_IO &&
1061 	    (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE) &&
1062 	    !(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE))
1063 		ns.susp_nod = 1; /* Suspend IO while no accessible data is available */
1064 
1065 	if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
1066 		if (ns.conn == C_SYNC_SOURCE)
1067 			ns.conn = C_PAUSED_SYNC_S;
1068 		if (ns.conn == C_SYNC_TARGET)
1069 			ns.conn = C_PAUSED_SYNC_T;
1070 	} else {
1071 		if (ns.conn == C_PAUSED_SYNC_S)
1072 			ns.conn = C_SYNC_SOURCE;
1073 		if (ns.conn == C_PAUSED_SYNC_T)
1074 			ns.conn = C_SYNC_TARGET;
1075 	}
1076 
1077 	return ns;
1078 }
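/*
 * Worked example (illustration only) of the implicit corrections above:
 * if the requested state drops the connection below C_CONNECTED, say
 *
 *	ns.conn = C_TIMEOUT;
 *
 * then sanitize_state() also clears peer_isp, forces ns.peer = R_UNKNOWN,
 * and pins ns.pdsk to D_UNKNOWN unless it already lies between
 * D_INCONSISTENT and D_UNKNOWN, so callers never see a stale peer role or
 * peer disk state after a connection loss.
 */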
1079 
1080 /* helper for __drbd_set_state */
1081 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
1082 {
1083 	if (mdev->agreed_pro_version < 90)
1084 		mdev->ov_start_sector = 0;
1085 	mdev->rs_total = drbd_bm_bits(mdev);
1086 	mdev->ov_position = 0;
1087 	if (cs == C_VERIFY_T) {
1088 		/* starting online verify from an arbitrary position
1089 		 * does not fit well into the existing protocol.
1090 		 * on C_VERIFY_T, we initialize ov_left and friends
1091 		 * implicitly in receive_DataRequest once the
1092 		 * first P_OV_REQUEST is received */
1093 		mdev->ov_start_sector = ~(sector_t)0;
1094 	} else {
1095 		unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
1096 		if (bit >= mdev->rs_total) {
1097 			mdev->ov_start_sector =
1098 				BM_BIT_TO_SECT(mdev->rs_total - 1);
1099 			mdev->rs_total = 1;
1100 		} else
1101 			mdev->rs_total -= bit;
1102 		mdev->ov_position = mdev->ov_start_sector;
1103 	}
1104 	mdev->ov_left = mdev->rs_total;
1105 }
1106 
1107 static void drbd_resume_al(struct drbd_conf *mdev)
1108 {
1109 	if (test_and_clear_bit(AL_SUSPENDED, &mdev->flags))
1110 		dev_info(DEV, "Resumed AL updates\n");
1111 }
1112 
1113 /**
1114  * __drbd_set_state() - Set a new DRBD state
1115  * @mdev:	DRBD device.
1116  * @ns:		new state.
1117  * @flags:	Flags
1118  * @done:	Optional completion that will be completed after after_state_ch() has finished
1119  *
1120  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
1121  */
1122 enum drbd_state_rv
1123 __drbd_set_state(struct drbd_conf *mdev, union drbd_state ns,
1124 	         enum chg_state_flags flags, struct completion *done)
1125 {
1126 	union drbd_state os;
1127 	enum drbd_state_rv rv = SS_SUCCESS;
1128 	enum sanitize_state_warnings ssw;
1129 	struct after_state_chg_work *ascw;
1130 
1131 	os = mdev->state;
1132 
1133 	ns = sanitize_state(mdev, os, ns, &ssw);
1134 
1135 	if (ns.i == os.i)
1136 		return SS_NOTHING_TO_DO;
1137 
1138 	if (!(flags & CS_HARD)) {
1139 		/*  pre-state-change checks ; only look at ns  */
1140 		/* See drbd_state_sw_errors in drbd_strings.c */
1141 
1142 		rv = is_valid_state(mdev, ns);
1143 		if (rv < SS_SUCCESS) {
1144 			/* If the old state was illegal as well, then let
1145 			   this happen...*/
1146 
1147 			if (is_valid_state(mdev, os) == rv)
1148 				rv = is_valid_state_transition(mdev, ns, os);
1149 		} else
1150 			rv = is_valid_state_transition(mdev, ns, os);
1151 	}
1152 
1153 	if (rv < SS_SUCCESS) {
1154 		if (flags & CS_VERBOSE)
1155 			print_st_err(mdev, os, ns, rv);
1156 		return rv;
1157 	}
1158 
1159 	print_sanitize_warnings(mdev, ssw);
1160 
1161 	{
1162 	char *pbp, pb[300];
1163 	pbp = pb;
1164 	*pbp = 0;
1165 	if (ns.role != os.role)
1166 		pbp += sprintf(pbp, "role( %s -> %s ) ",
1167 			       drbd_role_str(os.role),
1168 			       drbd_role_str(ns.role));
1169 	if (ns.peer != os.peer)
1170 		pbp += sprintf(pbp, "peer( %s -> %s ) ",
1171 			       drbd_role_str(os.peer),
1172 			       drbd_role_str(ns.peer));
1173 	if (ns.conn != os.conn)
1174 		pbp += sprintf(pbp, "conn( %s -> %s ) ",
1175 			       drbd_conn_str(os.conn),
1176 			       drbd_conn_str(ns.conn));
1177 	if (ns.disk != os.disk)
1178 		pbp += sprintf(pbp, "disk( %s -> %s ) ",
1179 			       drbd_disk_str(os.disk),
1180 			       drbd_disk_str(ns.disk));
1181 	if (ns.pdsk != os.pdsk)
1182 		pbp += sprintf(pbp, "pdsk( %s -> %s ) ",
1183 			       drbd_disk_str(os.pdsk),
1184 			       drbd_disk_str(ns.pdsk));
1185 	if (is_susp(ns) != is_susp(os))
1186 		pbp += sprintf(pbp, "susp( %d -> %d ) ",
1187 			       is_susp(os),
1188 			       is_susp(ns));
1189 	if (ns.aftr_isp != os.aftr_isp)
1190 		pbp += sprintf(pbp, "aftr_isp( %d -> %d ) ",
1191 			       os.aftr_isp,
1192 			       ns.aftr_isp);
1193 	if (ns.peer_isp != os.peer_isp)
1194 		pbp += sprintf(pbp, "peer_isp( %d -> %d ) ",
1195 			       os.peer_isp,
1196 			       ns.peer_isp);
1197 	if (ns.user_isp != os.user_isp)
1198 		pbp += sprintf(pbp, "user_isp( %d -> %d ) ",
1199 			       os.user_isp,
1200 			       ns.user_isp);
1201 	dev_info(DEV, "%s\n", pb);
1202 	}
1203 
1204 	/* solve the race between becoming unconfigured,
1205 	 * worker doing the cleanup, and
1206 	 * admin reconfiguring us:
1207 	 * on (re)configure, first set CONFIG_PENDING,
1208 	 * then wait for a potentially exiting worker,
1209 	 * start the worker, and schedule one no_op.
1210 	 * then proceed with configuration.
1211 	 */
1212 	if (ns.disk == D_DISKLESS &&
1213 	    ns.conn == C_STANDALONE &&
1214 	    ns.role == R_SECONDARY &&
1215 	    !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
1216 		set_bit(DEVICE_DYING, &mdev->flags);
1217 
1218 	/* if we are going -> D_FAILED or D_DISKLESS, grab one extra reference
1219 	 * on the ldev here, to be sure the transition -> D_DISKLESS resp.
1220 	 * drbd_ldev_destroy() won't happen before our corresponding
1221 	 * after_state_ch work runs, where we put_ldev again. */
1222 	if ((os.disk != D_FAILED && ns.disk == D_FAILED) ||
1223 	    (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))
1224 		atomic_inc(&mdev->local_cnt);
1225 
1226 	mdev->state = ns;
1227 
1228 	if (os.disk == D_ATTACHING && ns.disk >= D_NEGOTIATING)
1229 		drbd_print_uuids(mdev, "attached to UUIDs");
1230 
1231 	wake_up(&mdev->misc_wait);
1232 	wake_up(&mdev->state_wait);
1233 
1234 	/* aborted verify run. log the last position */
1235 	if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1236 	    ns.conn < C_CONNECTED) {
1237 		mdev->ov_start_sector =
1238 			BM_BIT_TO_SECT(drbd_bm_bits(mdev) - mdev->ov_left);
1239 		dev_info(DEV, "Online Verify reached sector %llu\n",
1240 			(unsigned long long)mdev->ov_start_sector);
1241 	}
1242 
1243 	if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1244 	    (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1245 		dev_info(DEV, "Syncer continues.\n");
1246 		mdev->rs_paused += (long)jiffies
1247 				  -(long)mdev->rs_mark_time[mdev->rs_last_mark];
1248 		if (ns.conn == C_SYNC_TARGET)
1249 			mod_timer(&mdev->resync_timer, jiffies);
1250 	}
1251 
1252 	if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1253 	    (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1254 		dev_info(DEV, "Resync suspended\n");
1255 		mdev->rs_mark_time[mdev->rs_last_mark] = jiffies;
1256 	}
1257 
1258 	if (os.conn == C_CONNECTED &&
1259 	    (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1260 		unsigned long now = jiffies;
1261 		int i;
1262 
1263 		set_ov_position(mdev, ns.conn);
1264 		mdev->rs_start = now;
1265 		mdev->rs_last_events = 0;
1266 		mdev->rs_last_sect_ev = 0;
1267 		mdev->ov_last_oos_size = 0;
1268 		mdev->ov_last_oos_start = 0;
1269 
1270 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1271 			mdev->rs_mark_left[i] = mdev->ov_left;
1272 			mdev->rs_mark_time[i] = now;
1273 		}
1274 
1275 		drbd_rs_controller_reset(mdev);
1276 
1277 		if (ns.conn == C_VERIFY_S) {
1278 			dev_info(DEV, "Starting Online Verify from sector %llu\n",
1279 					(unsigned long long)mdev->ov_position);
1280 			mod_timer(&mdev->resync_timer, jiffies);
1281 		}
1282 	}
1283 
1284 	if (get_ldev(mdev)) {
1285 		u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1286 						 MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1287 						 MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1288 
1289 		if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1290 			mdf |= MDF_CRASHED_PRIMARY;
1291 		if (mdev->state.role == R_PRIMARY ||
1292 		    (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1293 			mdf |= MDF_PRIMARY_IND;
1294 		if (mdev->state.conn > C_WF_REPORT_PARAMS)
1295 			mdf |= MDF_CONNECTED_IND;
1296 		if (mdev->state.disk > D_INCONSISTENT)
1297 			mdf |= MDF_CONSISTENT;
1298 		if (mdev->state.disk > D_OUTDATED)
1299 			mdf |= MDF_WAS_UP_TO_DATE;
1300 		if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1301 			mdf |= MDF_PEER_OUT_DATED;
1302 		if (mdf != mdev->ldev->md.flags) {
1303 			mdev->ldev->md.flags = mdf;
1304 			drbd_md_mark_dirty(mdev);
1305 		}
1306 		if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1307 			drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1308 		put_ldev(mdev);
1309 	}
1310 
1311 	/* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider to resync */
1312 	if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1313 	    os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1314 		set_bit(CONSIDER_RESYNC, &mdev->flags);
1315 
1316 	/* Receiver should clean up itself */
1317 	if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1318 		drbd_thread_stop_nowait(&mdev->receiver);
1319 
1320 	/* Now the receiver finished cleaning up itself, it should die */
1321 	if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1322 		drbd_thread_stop_nowait(&mdev->receiver);
1323 
1324 	/* Upon network failure, we need to restart the receiver. */
1325 	if (os.conn > C_WF_CONNECTION &&
1326 	    ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1327 		drbd_thread_restart_nowait(&mdev->receiver);
1328 
1329 	/* Resume AL writing if we get a connection */
1330 	if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
1331 		drbd_resume_al(mdev);
1332 
1333 	/* remember last connect and attach times so request_timer_fn() won't
1334 	 * kill newly established sessions while we are still trying to thaw
1335 	 * previously frozen IO */
1336 	if (os.conn != C_WF_REPORT_PARAMS && ns.conn == C_WF_REPORT_PARAMS)
1337 		mdev->last_reconnect_jif = jiffies;
1338 	if ((os.disk == D_ATTACHING || os.disk == D_NEGOTIATING) &&
1339 	    ns.disk > D_NEGOTIATING)
1340 		mdev->last_reattach_jif = jiffies;
1341 
1342 	ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1343 	if (ascw) {
1344 		ascw->os = os;
1345 		ascw->ns = ns;
1346 		ascw->flags = flags;
1347 		ascw->w.cb = w_after_state_ch;
1348 		ascw->done = done;
1349 		drbd_queue_work(&mdev->data.work, &ascw->w);
1350 	} else {
1351 		dev_warn(DEV, "Could not kmalloc an ascw\n");
1352 	}
1353 
1354 	return rv;
1355 }
1356 
1357 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1358 {
1359 	struct after_state_chg_work *ascw =
1360 		container_of(w, struct after_state_chg_work, w);
1361 	after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1362 	if (ascw->flags & CS_WAIT_COMPLETE) {
1363 		D_ASSERT(ascw->done != NULL);
1364 		complete(ascw->done);
1365 	}
1366 	kfree(ascw);
1367 
1368 	return 1;
1369 }
1370 
1371 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1372 {
1373 	if (rv) {
1374 		dev_err(DEV, "Writing the bitmap failed, not starting resync.\n");
1375 		_drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1376 		return;
1377 	}
1378 
1379 	switch (mdev->state.conn) {
1380 	case C_STARTING_SYNC_T:
1381 		_drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1382 		break;
1383 	case C_STARTING_SYNC_S:
1384 		drbd_start_resync(mdev, C_SYNC_SOURCE);
1385 		break;
1386 	}
1387 }
1388 
1389 int drbd_bitmap_io_from_worker(struct drbd_conf *mdev,
1390 		int (*io_fn)(struct drbd_conf *),
1391 		char *why, enum bm_flag flags)
1392 {
1393 	int rv;
1394 
1395 	D_ASSERT(current == mdev->worker.task);
1396 
1397 	/* open coded non-blocking drbd_suspend_io(mdev); */
1398 	set_bit(SUSPEND_IO, &mdev->flags);
1399 
1400 	drbd_bm_lock(mdev, why, flags);
1401 	rv = io_fn(mdev);
1402 	drbd_bm_unlock(mdev);
1403 
1404 	drbd_resume_io(mdev);
1405 
1406 	return rv;
1407 }
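/*
 * Usage sketch, matching the demote handling in after_state_ch() below:
 *
 *	drbd_bitmap_io_from_worker(mdev, &drbd_bm_write,
 *			"demote", BM_LOCKED_TEST_ALLOWED);
 *
 * Since it suspends application IO and takes the bitmap lock itself, this
 * must only be called from the worker thread, which the D_ASSERT at the
 * top of the function enforces.
 */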
1408 
1409 /**
1410  * after_state_ch() - Perform after state change actions that may sleep
1411  * @mdev:	DRBD device.
1412  * @os:		old state.
1413  * @ns:		new state.
1414  * @flags:	Flags
1415  */
1416 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1417 			   union drbd_state ns, enum chg_state_flags flags)
1418 {
1419 	enum drbd_fencing_p fp;
1420 	enum drbd_req_event what = nothing;
1421 	union drbd_state nsm = (union drbd_state){ .i = -1 };
1422 
1423 	if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1424 		clear_bit(CRASHED_PRIMARY, &mdev->flags);
1425 		if (mdev->p_uuid)
1426 			mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1427 	}
1428 
1429 	fp = FP_DONT_CARE;
1430 	if (get_ldev(mdev)) {
1431 		fp = mdev->ldev->dc.fencing;
1432 		put_ldev(mdev);
1433 	}
1434 
1435 	/* Inform userspace about the change... */
1436 	drbd_bcast_state(mdev, ns);
1437 
1438 	if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1439 	    (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1440 		drbd_khelper(mdev, "pri-on-incon-degr");
1441 
1442 	/* Here we have the actions that are performed after a
1443 	   state change. This function might sleep */
1444 
1445 	if (os.disk <= D_NEGOTIATING && ns.disk > D_NEGOTIATING)
1446 		mod_timer(&mdev->request_timer, jiffies + HZ);
1447 
1448 	nsm.i = -1;
1449 	if (ns.susp_nod) {
1450 		if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
1451 			what = resend;
1452 
1453 		if ((os.disk == D_ATTACHING || os.disk == D_NEGOTIATING) &&
1454 		    ns.disk > D_NEGOTIATING)
1455 			what = restart_frozen_disk_io;
1456 
1457 		if (what != nothing)
1458 			nsm.susp_nod = 0;
1459 	}
1460 
1461 	if (ns.susp_fen) {
1462 		/* case 1: The outdate-peer handler was successful: */
1463 		if (os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) {
1464 			if (test_bit(NEW_CUR_UUID, &mdev->flags)) {
1465 				drbd_uuid_new_current(mdev);
1466 				clear_bit(NEW_CUR_UUID, &mdev->flags);
1467 			}
1468 			spin_lock_irq(&mdev->req_lock);
1469 			_tl_clear(mdev);
1470 			_drbd_set_state(_NS(mdev, susp_fen, 0), CS_VERBOSE, NULL);
1471 			spin_unlock_irq(&mdev->req_lock);
1472 		}
1473 		/* case 2: The connection was established again: */
1474 		if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
1475 			clear_bit(NEW_CUR_UUID, &mdev->flags);
1476 			what = resend;
1477 			nsm.susp_fen = 0;
1478 		}
1479 	}
1480 
1481 	if (what != nothing) {
1482 		spin_lock_irq(&mdev->req_lock);
1483 		_tl_restart(mdev, what);
1484 		nsm.i &= mdev->state.i;
1485 		_drbd_set_state(mdev, nsm, CS_VERBOSE, NULL);
1486 		spin_unlock_irq(&mdev->req_lock);
1487 	}
1488 
1489 	/* Became sync source.  With protocol >= 96, we still need to send out
1490 	 * the sync uuid now. Need to do that before any drbd_send_state, or
1491 	 * the other side may go "paused sync" before receiving the sync uuids,
1492 	 * which is unexpected. */
1493 	if ((os.conn != C_SYNC_SOURCE && os.conn != C_PAUSED_SYNC_S) &&
1494 	    (ns.conn == C_SYNC_SOURCE || ns.conn == C_PAUSED_SYNC_S) &&
1495 	    mdev->agreed_pro_version >= 96 && get_ldev(mdev)) {
1496 		drbd_gen_and_send_sync_uuid(mdev);
1497 		put_ldev(mdev);
1498 	}
1499 
1500 	/* Do not change the order of the if above and the two below... */
1501 	if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1502 		/* we probably will start a resync soon.
1503 		 * make sure those things are properly reset. */
1504 		mdev->rs_total = 0;
1505 		mdev->rs_failed = 0;
1506 		atomic_set(&mdev->rs_pending_cnt, 0);
1507 		drbd_rs_cancel_all(mdev);
1508 
1509 		drbd_send_uuids(mdev);
1510 		drbd_send_state(mdev, ns);
1511 	}
1512 	/* No point in queuing send_bitmap if we don't have a connection
1513 	 * anymore, so check also the _current_ state, not only the new state
1514 	 * at the time this work was queued. */
1515 	if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S &&
1516 	    mdev->state.conn == C_WF_BITMAP_S)
1517 		drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL,
1518 				"send_bitmap (WFBitMapS)",
1519 				BM_LOCKED_TEST_ALLOWED);
1520 
1521 	/* Lost contact to peer's copy of the data */
1522 	if ((os.pdsk >= D_INCONSISTENT &&
1523 	     os.pdsk != D_UNKNOWN &&
1524 	     os.pdsk != D_OUTDATED)
1525 	&&  (ns.pdsk < D_INCONSISTENT ||
1526 	     ns.pdsk == D_UNKNOWN ||
1527 	     ns.pdsk == D_OUTDATED)) {
1528 		if (get_ldev(mdev)) {
1529 			if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1530 			    mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1531 				if (is_susp(mdev->state)) {
1532 					set_bit(NEW_CUR_UUID, &mdev->flags);
1533 				} else {
1534 					drbd_uuid_new_current(mdev);
1535 					drbd_send_uuids(mdev);
1536 				}
1537 			}
1538 			put_ldev(mdev);
1539 		}
1540 	}
1541 
1542 	if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1543 		if (os.peer == R_SECONDARY && ns.peer == R_PRIMARY &&
1544 		    mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1545 			drbd_uuid_new_current(mdev);
1546 			drbd_send_uuids(mdev);
1547 		}
1548 		/* D_DISKLESS Peer becomes secondary */
1549 		if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1550 			/* We may still be Primary ourselves.
1551 			 * No harm done if the bitmap still changes,
1552 			 * redirtied pages will follow later. */
1553 			drbd_bitmap_io_from_worker(mdev, &drbd_bm_write,
1554 				"demote diskless peer", BM_LOCKED_SET_ALLOWED);
1555 		put_ldev(mdev);
1556 	}
1557 
1558 	/* Write out all changed bits on demote.
1559 	 * Though, no need to do that just yet
1560 	 * if there is a resync going on still */
1561 	if (os.role == R_PRIMARY && ns.role == R_SECONDARY &&
1562 		mdev->state.conn <= C_CONNECTED && get_ldev(mdev)) {
1563 		/* No changes to the bitmap expected this time, so assert that,
1564 		 * even though no harm was done if it did change. */
1565 		drbd_bitmap_io_from_worker(mdev, &drbd_bm_write,
1566 				"demote", BM_LOCKED_TEST_ALLOWED);
1567 		put_ldev(mdev);
1568 	}
1569 
1570 	/* Last part of the attaching process ... */
1571 	if (ns.conn >= C_CONNECTED &&
1572 	    os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1573 		drbd_send_sizes(mdev, 0, 0);  /* to start sync... */
1574 		drbd_send_uuids(mdev);
1575 		drbd_send_state(mdev, ns);
1576 	}
1577 
1578 	/* We want to pause/continue resync, tell peer. */
1579 	if (ns.conn >= C_CONNECTED &&
1580 	     ((os.aftr_isp != ns.aftr_isp) ||
1581 	      (os.user_isp != ns.user_isp)))
1582 		drbd_send_state(mdev, ns);
1583 
1584 	/* In case one of the isp bits got set, suspend other devices. */
1585 	if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1586 	    (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1587 		suspend_other_sg(mdev);
1588 
1589 	/* Make sure the peer gets informed about possible state
1590 	   changes (ISP bits) while we were in WFReportParams. */
1591 	if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1592 		drbd_send_state(mdev, ns);
1593 
1594 	if (os.conn != C_AHEAD && ns.conn == C_AHEAD)
1595 		drbd_send_state(mdev, ns);
1596 
1597 	/* We are in the process of starting a full sync... */
1598 	if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1599 	    (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1600 		/* no other bitmap changes expected during this phase */
1601 		drbd_queue_bitmap_io(mdev,
1602 			&drbd_bmio_set_n_write, &abw_start_sync,
1603 			"set_n_write from StartingSync", BM_LOCKED_TEST_ALLOWED);
1604 
1605 	/* We are invalidating ourselves... */
1606 	if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1607 	    os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1608 		/* other bitmap operation expected during this phase */
1609 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL,
1610 			"set_n_write from invalidate", BM_LOCKED_MASK);
1611 
1612 	/* first half of local IO error, failure to attach,
1613 	 * or administrative detach */
1614 	if (os.disk != D_FAILED && ns.disk == D_FAILED) {
1615 		enum drbd_io_error_p eh = EP_PASS_ON;
1616 		int was_io_error = 0;
1617 		/* corresponding get_ldev was in __drbd_set_state, to serialize
1618 		 * our cleanup here with the transition to D_DISKLESS.
1619 		 * But it is still not safe to dereference ldev here, since
1620 		 * we might come from a failed Attach before ldev was set. */
1621 		if (mdev->ldev) {
1622 			eh = mdev->ldev->dc.on_io_error;
1623 			was_io_error = test_and_clear_bit(WAS_IO_ERROR, &mdev->flags);
1624 
1625 			if (was_io_error && eh == EP_CALL_HELPER)
1626 				drbd_khelper(mdev, "local-io-error");
1627 
1628 			/* Immediately allow completion of all application IO,
1629 			 * that waits for completion from the local disk,
1630 			 * if this was a force-detach due to disk_timeout
1631 			 * or administrator request (drbdsetup detach --force).
1632 			 * Do NOT abort otherwise.
1633 			 * Aborting local requests may cause serious problems,
1634 			 * if requests are completed to upper layers already,
1635 			 * and then later the already submitted local bio completes.
1636 			 * This can cause DMA into former bio pages that meanwhile
1637 			 * have been re-used for other things.
1638 			 * So aborting local requests may cause crashes,
1639 			 * or even worse, silent data corruption.
1640 			 */
1641 			if (test_and_clear_bit(FORCE_DETACH, &mdev->flags))
1642 				tl_abort_disk_io(mdev);
1643 
1644 			/* current state still has to be D_FAILED,
1645 			 * there is only one way out: to D_DISKLESS,
1646 			 * and that may only happen after our put_ldev below. */
1647 			if (mdev->state.disk != D_FAILED)
1648 				dev_err(DEV,
1649 					"ASSERT FAILED: disk is %s during detach\n",
1650 					drbd_disk_str(mdev->state.disk));
1651 
1652 			if (ns.conn >= C_CONNECTED)
1653 				drbd_send_state(mdev, ns);
1654 
1655 			drbd_rs_cancel_all(mdev);
1656 
1657 			/* In case we want to get something to stable storage still,
1658 			 * this may be the last chance.
1659 			 * Following put_ldev may transition to D_DISKLESS. */
1660 			drbd_md_sync(mdev);
1661 		}
1662 		put_ldev(mdev);
1663 	}
1664 
1665 	/* second half of local IO error, failure to attach,
1666 	 * or administrative detach,
1667 	 * after local_cnt references have reached zero again */
1668 	if (os.disk != D_DISKLESS && ns.disk == D_DISKLESS) {
1669 		/* We must still be diskless,
1670 		 * re-attach has to be serialized with this! */
1671 		if (mdev->state.disk != D_DISKLESS)
1672 			dev_err(DEV,
1673 				"ASSERT FAILED: disk is %s while going diskless\n",
1674 				drbd_disk_str(mdev->state.disk));
1675 
1676 		if (ns.conn >= C_CONNECTED)
1677 			drbd_send_state(mdev, ns);
1678 
1679 		/* corresponding get_ldev in __drbd_set_state
1680 		 * this may finally trigger drbd_ldev_destroy. */
1681 		put_ldev(mdev);
1682 	}
1683 
1684 	/* Notify the peer that I had a local IO error and did not detach. */
1685 	if (os.disk == D_UP_TO_DATE && ns.disk == D_INCONSISTENT && ns.conn >= C_CONNECTED)
1686 		drbd_send_state(mdev, ns);
1687 
1688 	/* Disks got bigger while they were detached */
1689 	if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1690 	    test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1691 		if (ns.conn == C_CONNECTED)
1692 			resync_after_online_grow(mdev);
1693 	}
1694 
1695 	/* A resync finished or aborted, wake paused devices... */
1696 	if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1697 	    (os.peer_isp && !ns.peer_isp) ||
1698 	    (os.user_isp && !ns.user_isp))
1699 		resume_next_sg(mdev);
1700 
1701 	/* sync target done with resync.  Explicitly notify peer, even though
1702 	 * it should (at least for non-empty resyncs) already know this itself. */
1703 	if (os.disk < D_UP_TO_DATE && os.conn >= C_SYNC_SOURCE && ns.conn == C_CONNECTED)
1704 		drbd_send_state(mdev, ns);
1705 
1706 	/* Wake up role changes, that were delayed because of connection establishing */
1707 	if (os.conn == C_WF_REPORT_PARAMS && ns.conn != C_WF_REPORT_PARAMS) {
1708 		clear_bit(STATE_SENT, &mdev->flags);
1709 		wake_up(&mdev->state_wait);
1710 	}
1711 
1712 	/* This triggers bitmap writeout of potentially still unwritten pages
1713 	 * if the resync finished cleanly, or aborted because of peer disk
1714 	 * failure, or because of connection loss.
1715 	 * For resync aborted because of local disk failure, we cannot do
1716 	 * any bitmap writeout anymore.
1717 	 * No harm done if some bits change during this phase.
1718 	 */
1719 	if (os.conn > C_CONNECTED && ns.conn <= C_CONNECTED && get_ldev(mdev)) {
1720 		drbd_queue_bitmap_io(mdev, &drbd_bm_write_copy_pages, NULL,
1721 			"write from resync_finished", BM_LOCKED_CHANGE_ALLOWED);
1722 		put_ldev(mdev);
1723 	}
1724 
1725 	/* free tl_hash if we got thawed and are C_STANDALONE */
1726 	if (ns.conn == C_STANDALONE && !is_susp(ns) && mdev->tl_hash)
1727 		drbd_free_tl_hash(mdev);
1728 
1729 	/* Upon network connection, we need to start the receiver */
1730 	if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1731 		drbd_thread_start(&mdev->receiver);
1732 
1733 	/* Terminate worker thread if we are unconfigured - it will be
1734 	   restarted as needed... */
1735 	if (ns.disk == D_DISKLESS &&
1736 	    ns.conn == C_STANDALONE &&
1737 	    ns.role == R_SECONDARY) {
1738 		if (os.aftr_isp != ns.aftr_isp)
1739 			resume_next_sg(mdev);
1740 		/* set in __drbd_set_state, unless CONFIG_PENDING was set */
1741 		if (test_bit(DEVICE_DYING, &mdev->flags))
1742 			drbd_thread_stop_nowait(&mdev->worker);
1743 	}
1744 
1745 	drbd_md_sync(mdev);
1746 }
1747 
1748 
1749 static int drbd_thread_setup(void *arg)
1750 {
1751 	struct drbd_thread *thi = (struct drbd_thread *) arg;
1752 	struct drbd_conf *mdev = thi->mdev;
1753 	unsigned long flags;
1754 	int retval;
1755 
1756 restart:
1757 	retval = thi->function(thi);
1758 
1759 	spin_lock_irqsave(&thi->t_lock, flags);
1760 
1761 	/* if the receiver has been "Exiting", the last thing it did
1762 	 * was set the conn state to "StandAlone",
1763 	 * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
1764 	 * and receiver thread will be "started".
1765 	 * drbd_thread_start needs to set "Restarting" in that case.
1766 	 * t_state check and assignment needs to be within the same spinlock,
1767 	 * so either thread_start sees Exiting, and can remap to Restarting,
1768 	 * or thread_start sees None, and can proceed as normal.
1769 	 */
1770 
1771 	if (thi->t_state == Restarting) {
1772 		dev_info(DEV, "Restarting %s\n", current->comm);
1773 		thi->t_state = Running;
1774 		spin_unlock_irqrestore(&thi->t_lock, flags);
1775 		goto restart;
1776 	}
1777 
1778 	thi->task = NULL;
1779 	thi->t_state = None;
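	/* make task == NULL / t_state == None visible before complete() below
	 * lets a waiter in _drbd_thread_stop() proceed */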
1780 	smp_mb();
1781 	complete(&thi->stop);
1782 	spin_unlock_irqrestore(&thi->t_lock, flags);
1783 
1784 	dev_info(DEV, "Terminating %s\n", current->comm);
1785 
1786 	/* Release mod reference taken when thread was started */
1787 	module_put(THIS_MODULE);
1788 	return retval;
1789 }
1790 
1791 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1792 		      int (*func) (struct drbd_thread *))
1793 {
1794 	spin_lock_init(&thi->t_lock);
1795 	thi->task    = NULL;
1796 	thi->t_state = None;
1797 	thi->function = func;
1798 	thi->mdev = mdev;
1799 }
1800 
1801 int drbd_thread_start(struct drbd_thread *thi)
1802 {
1803 	struct drbd_conf *mdev = thi->mdev;
1804 	struct task_struct *nt;
1805 	unsigned long flags;
1806 
1807 	const char *me =
1808 		thi == &mdev->receiver ? "receiver" :
1809 		thi == &mdev->asender  ? "asender"  :
1810 		thi == &mdev->worker   ? "worker"   : "NONSENSE";
1811 
1812 	/* this lock is also used from the state engine, doing drbd_thread_stop_nowait
1813 	 * while holding the req lock irqsave */
1814 	spin_lock_irqsave(&thi->t_lock, flags);
1815 
1816 	switch (thi->t_state) {
1817 	case None:
1818 		dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1819 				me, current->comm, current->pid);
1820 
1821 		/* Get ref on module for thread - this is released when thread exits */
1822 		if (!try_module_get(THIS_MODULE)) {
1823 			dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1824 			spin_unlock_irqrestore(&thi->t_lock, flags);
1825 			return false;
1826 		}
1827 
1828 		init_completion(&thi->stop);
1829 		D_ASSERT(thi->task == NULL);
1830 		thi->reset_cpu_mask = 1;
1831 		thi->t_state = Running;
1832 		spin_unlock_irqrestore(&thi->t_lock, flags);
1833 		flush_signals(current); /* otherwise we may get -ERESTARTNOINTR */
1834 
1835 		nt = kthread_create(drbd_thread_setup, (void *) thi,
1836 				    "drbd%d_%s", mdev_to_minor(mdev), me);
1837 
1838 		if (IS_ERR(nt)) {
1839 			dev_err(DEV, "Couldn't start thread\n");
1840 
1841 			module_put(THIS_MODULE);
1842 			return false;
1843 		}
1844 		spin_lock_irqsave(&thi->t_lock, flags);
1845 		thi->task = nt;
1846 		thi->t_state = Running;
1847 		spin_unlock_irqrestore(&thi->t_lock, flags);
1848 		wake_up_process(nt);
1849 		break;
1850 	case Exiting:
1851 		thi->t_state = Restarting;
1852 		dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1853 				me, current->comm, current->pid);
1854 		/* fall through */
1855 	case Running:
1856 	case Restarting:
1857 	default:
1858 		spin_unlock_irqrestore(&thi->t_lock, flags);
1859 		break;
1860 	}
1861 
1862 	return true;
1863 }
1864 
1865 
1866 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1867 {
1868 	unsigned long flags;
1869 
1870 	enum drbd_thread_state ns = restart ? Restarting : Exiting;
1871 
1872 	/* may be called from state engine, holding the req lock irqsave */
1873 	spin_lock_irqsave(&thi->t_lock, flags);
1874 
1875 	if (thi->t_state == None) {
1876 		spin_unlock_irqrestore(&thi->t_lock, flags);
1877 		if (restart)
1878 			drbd_thread_start(thi);
1879 		return;
1880 	}
1881 
1882 	if (thi->t_state != ns) {
1883 		if (thi->task == NULL) {
1884 			spin_unlock_irqrestore(&thi->t_lock, flags);
1885 			return;
1886 		}
1887 
1888 		thi->t_state = ns;
1889 		smp_mb();
1890 		init_completion(&thi->stop);
1891 		if (thi->task != current)
1892 			force_sig(DRBD_SIGKILL, thi->task);
1893 
1894 	}
1895 
1896 	spin_unlock_irqrestore(&thi->t_lock, flags);
1897 
1898 	if (wait)
1899 		wait_for_completion(&thi->stop);
1900 }
1901 
1902 #ifdef CONFIG_SMP
1903 /**
1904  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1905  * @mdev:	DRBD device.
1906  *
1907  * Forces all threads of a device onto the same CPU. This is beneficial for
1908  * DRBD's performance. May be overridden by the user's configuration.
1909  */
1910 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1911 {
1912 	int ord, cpu;
1913 
1914 	/* user override. */
1915 	if (cpumask_weight(mdev->cpu_mask))
1916 		return;
1917 
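	/* round-robin: pin this minor's threads to the (minor % #online)-th
	 * online CPU, so different devices spread over the online CPUs */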
1918 	ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1919 	for_each_online_cpu(cpu) {
1920 		if (ord-- == 0) {
1921 			cpumask_set_cpu(cpu, mdev->cpu_mask);
1922 			return;
1923 		}
1924 	}
1925 	/* should not be reached */
1926 	cpumask_setall(mdev->cpu_mask);
1927 }
1928 
1929 /**
1930  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1931  * @mdev:	DRBD device.
1932  *
1933  * Call this in the "main loop" of _all_ threads; no mutex is needed, since current
1934  * won't die prematurely.
1935  */
1936 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1937 {
1938 	struct task_struct *p = current;
1939 	struct drbd_thread *thi =
1940 		p == mdev->asender.task  ? &mdev->asender  :
1941 		p == mdev->receiver.task ? &mdev->receiver :
1942 		p == mdev->worker.task   ? &mdev->worker   :
1943 		NULL;
1944 	ERR_IF(thi == NULL)
1945 		return;
1946 	if (!thi->reset_cpu_mask)
1947 		return;
1948 	thi->reset_cpu_mask = 0;
1949 	set_cpus_allowed_ptr(p, mdev->cpu_mask);
1950 }
1951 #endif
1952 
1953 /* the appropriate socket mutex must be held already */
1954 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1955 			  enum drbd_packets cmd, struct p_header80 *h,
1956 			  size_t size, unsigned msg_flags)
1957 {
1958 	int sent, ok;
1959 
1960 	ERR_IF(!h) return false;
1961 	ERR_IF(!size) return false;
1962 
1963 	h->magic   = BE_DRBD_MAGIC;
1964 	h->command = cpu_to_be16(cmd);
1965 	h->length  = cpu_to_be16(size-sizeof(struct p_header80));
1966 
1967 	sent = drbd_send(mdev, sock, h, size, msg_flags);
1968 
1969 	ok = (sent == size);
1970 	if (!ok && !signal_pending(current))
1971 		dev_warn(DEV, "short sent %s size=%d sent=%d\n",
1972 		    cmdname(cmd), (int)size, sent);
1973 	return ok;
1974 }
1975 
1976 /* don't pass the socket. we may only look at it
1977  * when we hold the appropriate socket mutex.
1978  */
1979 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1980 		  enum drbd_packets cmd, struct p_header80 *h, size_t size)
1981 {
1982 	int ok = 0;
1983 	struct socket *sock;
1984 
1985 	if (use_data_socket) {
1986 		mutex_lock(&mdev->data.mutex);
1987 		sock = mdev->data.socket;
1988 	} else {
1989 		mutex_lock(&mdev->meta.mutex);
1990 		sock = mdev->meta.socket;
1991 	}
1992 
1993 	/* drbd_disconnect() could have called drbd_free_sock()
1994 	 * while we were waiting in down()... */
1995 	if (likely(sock != NULL))
1996 		ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1997 
1998 	if (use_data_socket)
1999 		mutex_unlock(&mdev->data.mutex);
2000 	else
2001 		mutex_unlock(&mdev->meta.mutex);
2002 	return ok;
2003 }
2004 
2005 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
2006 		   size_t size)
2007 {
2008 	struct p_header80 h;
2009 	int ok;
2010 
2011 	h.magic   = BE_DRBD_MAGIC;
2012 	h.command = cpu_to_be16(cmd);
2013 	h.length  = cpu_to_be16(size);
2014 
2015 	if (!drbd_get_data_sock(mdev))
2016 		return 0;
2017 
2018 	ok = (sizeof(h) ==
2019 		drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
2020 	ok = ok && (size ==
2021 		drbd_send(mdev, mdev->data.socket, data, size, 0));
2022 
2023 	drbd_put_data_sock(mdev);
2024 
2025 	return ok;
2026 }
2027 
2028 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
2029 {
2030 	struct p_rs_param_95 *p;
2031 	struct socket *sock;
2032 	int size, rv;
2033 	const int apv = mdev->agreed_pro_version;
2034 
2035 	size = apv <= 87 ? sizeof(struct p_rs_param)
2036 		: apv == 88 ? sizeof(struct p_rs_param)
2037 			+ strlen(mdev->sync_conf.verify_alg) + 1
2038 		: apv <= 94 ? sizeof(struct p_rs_param_89)
2039 		: /* apv >= 95 */ sizeof(struct p_rs_param_95);
2040 
2041 	/* used from admin command context and receiver/worker context.
2042 	 * to avoid kmalloc, grab the socket right here,
2043 	 * then use the pre-allocated sbuf there */
2044 	mutex_lock(&mdev->data.mutex);
2045 	sock = mdev->data.socket;
2046 
2047 	if (likely(sock != NULL)) {
2048 		enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
2049 
2050 		p = &mdev->data.sbuf.rs_param_95;
2051 
2052 		/* initialize verify_alg and csums_alg */
2053 		memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2054 
2055 		p->rate = cpu_to_be32(sc->rate);
2056 		p->c_plan_ahead = cpu_to_be32(sc->c_plan_ahead);
2057 		p->c_delay_target = cpu_to_be32(sc->c_delay_target);
2058 		p->c_fill_target = cpu_to_be32(sc->c_fill_target);
2059 		p->c_max_rate = cpu_to_be32(sc->c_max_rate);
2060 
2061 		if (apv >= 88)
2062 			strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
2063 		if (apv >= 89)
2064 			strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
2065 
2066 		rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
2067 	} else
2068 		rv = 0; /* not ok */
2069 
2070 	mutex_unlock(&mdev->data.mutex);
2071 
2072 	return rv;
2073 }
2074 
2075 int drbd_send_protocol(struct drbd_conf *mdev)
2076 {
2077 	struct p_protocol *p;
2078 	int size, cf, rv;
2079 
2080 	size = sizeof(struct p_protocol);
2081 
2082 	if (mdev->agreed_pro_version >= 87)
2083 		size += strlen(mdev->net_conf->integrity_alg) + 1;
2084 
2085 	/* we must not recurse into our own queue,
2086 	 * as that is blocked during handshake */
2087 	p = kmalloc(size, GFP_NOIO);
2088 	if (p == NULL)
2089 		return 0;
2090 
2091 	p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
2092 	p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
2093 	p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
2094 	p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
2095 	p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
2096 
2097 	cf = 0;
2098 	if (mdev->net_conf->want_lose)
2099 		cf |= CF_WANT_LOSE;
2100 	if (mdev->net_conf->dry_run) {
2101 		if (mdev->agreed_pro_version >= 92)
2102 			cf |= CF_DRY_RUN;
2103 		else {
2104 			dev_err(DEV, "--dry-run is not supported by peer");
2105 			kfree(p);
2106 			return -1;
2107 		}
2108 	}
2109 	p->conn_flags    = cpu_to_be32(cf);
2110 
2111 	if (mdev->agreed_pro_version >= 87)
2112 		strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
2113 
2114 	rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
2115 			   (struct p_header80 *)p, size);
2116 	kfree(p);
2117 	return rv;
2118 }
2119 
2120 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
2121 {
2122 	struct p_uuids p;
2123 	int i;
2124 
2125 	if (!get_ldev_if_state(mdev, D_NEGOTIATING))
2126 		return 1;
2127 
2128 	for (i = UI_CURRENT; i < UI_SIZE; i++)
2129 		p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
2130 
2131 	mdev->comm_bm_set = drbd_bm_total_weight(mdev);
2132 	p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
2133 	uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
2134 	uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
2135 	uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
2136 	p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
2137 
2138 	put_ldev(mdev);
2139 
2140 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
2141 			     (struct p_header80 *)&p, sizeof(p));
2142 }
2143 
2144 int drbd_send_uuids(struct drbd_conf *mdev)
2145 {
2146 	return _drbd_send_uuids(mdev, 0);
2147 }
2148 
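/* uuid_flags bit 8 presumably asks the peer to also skip the initial full sync
 * (used by the new-current-uuid / clear-bitmap path); bits 1, 2 and 4 are set
 * in _drbd_send_uuids() above. */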
2149 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
2150 {
2151 	return _drbd_send_uuids(mdev, 8);
2152 }
2153 
2154 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
2155 {
2156 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
2157 		u64 *uuid = mdev->ldev->md.uuid;
2158 		dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
2159 		     text,
2160 		     (unsigned long long)uuid[UI_CURRENT],
2161 		     (unsigned long long)uuid[UI_BITMAP],
2162 		     (unsigned long long)uuid[UI_HISTORY_START],
2163 		     (unsigned long long)uuid[UI_HISTORY_END]);
2164 		put_ldev(mdev);
2165 	} else {
2166 		dev_info(DEV, "%s effective data uuid: %016llX\n",
2167 				text,
2168 				(unsigned long long)mdev->ed_uuid);
2169 	}
2170 }
2171 
2172 int drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
2173 {
2174 	struct p_rs_uuid p;
2175 	u64 uuid;
2176 
2177 	D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
2178 
2179 	uuid = mdev->ldev->md.uuid[UI_BITMAP];
2180 	if (uuid && uuid != UUID_JUST_CREATED)
2181 		uuid = uuid + UUID_NEW_BM_OFFSET;
2182 	else
2183 		get_random_bytes(&uuid, sizeof(u64));
2184 	drbd_uuid_set(mdev, UI_BITMAP, uuid);
2185 	drbd_print_uuids(mdev, "updated sync UUID");
2186 	drbd_md_sync(mdev);
2187 	p.uuid = cpu_to_be64(uuid);
2188 
2189 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
2190 			     (struct p_header80 *)&p, sizeof(p));
2191 }
2192 
2193 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
2194 {
2195 	struct p_sizes p;
2196 	sector_t d_size, u_size;
2197 	int q_order_type;
2198 	unsigned int max_bio_size;
2199 	int ok;
2200 
2201 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
2202 		D_ASSERT(mdev->ldev->backing_bdev);
2203 		d_size = drbd_get_max_capacity(mdev->ldev);
2204 		u_size = mdev->ldev->dc.disk_size;
2205 		q_order_type = drbd_queue_order_type(mdev);
2206 		max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
2207 		max_bio_size = min(max_bio_size, DRBD_MAX_BIO_SIZE);
2208 		put_ldev(mdev);
2209 	} else {
2210 		d_size = 0;
2211 		u_size = 0;
2212 		q_order_type = QUEUE_ORDERED_NONE;
2213 		max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
2214 	}
2215 
2216 	/* Never allow old drbd (up to 8.3.7) to see more than 32KiB */
2217 	if (mdev->agreed_pro_version <= 94)
2218 		max_bio_size = min(max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
2219 
2220 	p.d_size = cpu_to_be64(d_size);
2221 	p.u_size = cpu_to_be64(u_size);
2222 	p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
2223 	p.max_bio_size = cpu_to_be32(max_bio_size);
2224 	p.queue_order_type = cpu_to_be16(q_order_type);
2225 	p.dds_flags = cpu_to_be16(flags);
2226 
2227 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
2228 			   (struct p_header80 *)&p, sizeof(p));
2229 	return ok;
2230 }
2231 
2232 /**
2233  * drbd_send_current_state() - Sends the drbd state to the peer
2234  * @mdev:	DRBD device.
2235  */
2236 int drbd_send_current_state(struct drbd_conf *mdev)
2237 {
2238 	struct socket *sock;
2239 	struct p_state p;
2240 	int ok = 0;
2241 
2242 	/* Grab the state lock so we won't send state if we're in the middle
2243 	 * of a cluster-wide state change on another thread */
2244 	drbd_state_lock(mdev);
2245 
2246 	mutex_lock(&mdev->data.mutex);
2247 
2248 	p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
2249 	sock = mdev->data.socket;
2250 
2251 	if (likely(sock != NULL)) {
2252 		ok = _drbd_send_cmd(mdev, sock, P_STATE,
2253 				    (struct p_header80 *)&p, sizeof(p), 0);
2254 	}
2255 
2256 	mutex_unlock(&mdev->data.mutex);
2257 
2258 	drbd_state_unlock(mdev);
2259 	return ok;
2260 }
2261 
2262 /**
2263  * drbd_send_state() - After a state change, sends the new state to the peer
2264  * @mdev:	DRBD device.
2265  * @state:	the state to send, not necessarily the current state.
2266  *
2267  * Each state change queues an "after_state_ch" work, which will eventually
2268  * send the resulting new state to the peer. If more state changes happen
2269  * between queuing and processing of the after_state_ch work, we still
2270  * want to send each intermediary state in the order it occurred.
2271  */
2272 int drbd_send_state(struct drbd_conf *mdev, union drbd_state state)
2273 {
2274 	struct socket *sock;
2275 	struct p_state p;
2276 	int ok = 0;
2277 
2278 	mutex_lock(&mdev->data.mutex);
2279 
2280 	p.state = cpu_to_be32(state.i);
2281 	sock = mdev->data.socket;
2282 
2283 	if (likely(sock != NULL)) {
2284 		ok = _drbd_send_cmd(mdev, sock, P_STATE,
2285 				    (struct p_header80 *)&p, sizeof(p), 0);
2286 	}
2287 
2288 	mutex_unlock(&mdev->data.mutex);
2289 
2290 	return ok;
2291 }
2292 
2293 int drbd_send_state_req(struct drbd_conf *mdev,
2294 	union drbd_state mask, union drbd_state val)
2295 {
2296 	struct p_req_state p;
2297 
2298 	p.mask    = cpu_to_be32(mask.i);
2299 	p.val     = cpu_to_be32(val.i);
2300 
2301 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
2302 			     (struct p_header80 *)&p, sizeof(p));
2303 }
2304 
2305 int drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
2306 {
2307 	struct p_req_state_reply p;
2308 
2309 	p.retcode    = cpu_to_be32(retcode);
2310 
2311 	return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
2312 			     (struct p_header80 *)&p, sizeof(p));
2313 }
2314 
2315 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
2316 	struct p_compressed_bm *p,
2317 	struct bm_xfer_ctx *c)
2318 {
2319 	struct bitstream bs;
2320 	unsigned long plain_bits;
2321 	unsigned long tmp;
2322 	unsigned long rl;
2323 	unsigned len;
2324 	unsigned toggle;
2325 	int bits;
2326 
2327 	/* may we use this feature? */
2328 	if ((mdev->sync_conf.use_rle == 0) ||
2329 		(mdev->agreed_pro_version < 90))
2330 			return 0;
2331 
2332 	if (c->bit_offset >= c->bm_bits)
2333 		return 0; /* nothing to do. */
2334 
2335 	/* use at most this many bytes */
2336 	bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
2337 	memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
2338 	/* plain bits covered in this code string */
2339 	plain_bits = 0;
2340 
2341 	/* p->encoding & 0x80 stores whether the first run length is set.
2342 	 * bit offset is implicit.
2343 	 * start with toggle == 2 to be able to tell the first iteration */
2344 	toggle = 2;
2345 
2346 	/* see how many plain bits we can stuff into one packet
2347 	 * using RLE and VLI. */
2348 	do {
2349 		tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
2350 				    : _drbd_bm_find_next(mdev, c->bit_offset);
2351 		if (tmp == -1UL)
2352 			tmp = c->bm_bits;
2353 		rl = tmp - c->bit_offset;
2354 
2355 		if (toggle == 2) { /* first iteration */
2356 			if (rl == 0) {
2357 				/* the first checked bit was set,
2358 				 * store start value, */
2359 				DCBP_set_start(p, 1);
2360 				/* but skip encoding of zero run length */
2361 				toggle = !toggle;
2362 				continue;
2363 			}
2364 			DCBP_set_start(p, 0);
2365 		}
2366 
2367 		/* paranoia: catch zero runlength.
2368 		 * can only happen if bitmap is modified while we scan it. */
2369 		if (rl == 0) {
2370 			dev_err(DEV, "unexpected zero runlength while encoding bitmap "
2371 			    "t:%u bo:%lu\n", toggle, c->bit_offset);
2372 			return -1;
2373 		}
2374 
2375 		bits = vli_encode_bits(&bs, rl);
2376 		if (bits == -ENOBUFS) /* buffer full */
2377 			break;
2378 		if (bits <= 0) {
2379 			dev_err(DEV, "error while encoding bitmap: %d\n", bits);
2380 			return 0;
2381 		}
2382 
2383 		toggle = !toggle;
2384 		plain_bits += rl;
2385 		c->bit_offset = tmp;
2386 	} while (c->bit_offset < c->bm_bits);
2387 
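	/* code length in bytes: complete bytes written, plus one for a
	 * trailing, partially filled byte */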
2388 	len = bs.cur.b - p->code + !!bs.cur.bit;
2389 
2390 	if (plain_bits < (len << 3)) {
2391 		/* incompressible with this method.
2392 		 * we need to rewind both word and bit position. */
2393 		c->bit_offset -= plain_bits;
2394 		bm_xfer_ctx_bit_to_word_offset(c);
2395 		c->bit_offset = c->word_offset * BITS_PER_LONG;
2396 		return 0;
2397 	}
2398 
2399 	/* RLE + VLI was able to compress it just fine.
2400 	 * update c->word_offset. */
2401 	bm_xfer_ctx_bit_to_word_offset(c);
2402 
2403 	/* store pad_bits */
2404 	DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
2405 
2406 	return len;
2407 }
2408 
2409 /**
2410  * send_bitmap_rle_or_plain
2411  *
2412  * Return 0 when done, 1 when another iteration is needed, and a negative error
2413  * code upon failure.
2414  */
2415 static int
2416 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
2417 			 struct p_header80 *h, struct bm_xfer_ctx *c)
2418 {
2419 	struct p_compressed_bm *p = (void*)h;
2420 	unsigned long num_words;
2421 	int len;
2422 	int ok;
2423 
2424 	len = fill_bitmap_rle_bits(mdev, p, c);
2425 
2426 	if (len < 0)
2427 		return -EIO;
2428 
2429 	if (len) {
2430 		DCBP_set_code(p, RLE_VLI_Bits);
2431 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
2432 			sizeof(*p) + len, 0);
2433 
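		/* index 0 counts compressed packets, index 1 plain ones;
		 * both feed the transfer statistics printed when done */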
2434 		c->packets[0]++;
2435 		c->bytes[0] += sizeof(*p) + len;
2436 
2437 		if (c->bit_offset >= c->bm_bits)
2438 			len = 0; /* DONE */
2439 	} else {
2440 		/* was not compressible.
2441 		 * send a buffer full of plain text bits instead. */
2442 		num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
2443 		len = num_words * sizeof(long);
2444 		if (len)
2445 			drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
2446 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
2447 				   h, sizeof(struct p_header80) + len, 0);
2448 		c->word_offset += num_words;
2449 		c->bit_offset = c->word_offset * BITS_PER_LONG;
2450 
2451 		c->packets[1]++;
2452 		c->bytes[1] += sizeof(struct p_header80) + len;
2453 
2454 		if (c->bit_offset > c->bm_bits)
2455 			c->bit_offset = c->bm_bits;
2456 	}
2457 	if (ok) {
2458 		if (len == 0) {
2459 			INFO_bm_xfer_stats(mdev, "send", c);
2460 			return 0;
2461 		} else
2462 			return 1;
2463 	}
2464 	return -EIO;
2465 }
2466 
2467 /* See the comment at receive_bitmap() */
2468 int _drbd_send_bitmap(struct drbd_conf *mdev)
2469 {
2470 	struct bm_xfer_ctx c;
2471 	struct p_header80 *p;
2472 	int err;
2473 
2474 	ERR_IF(!mdev->bitmap) return false;
2475 
2476 	/* maybe we should use some per thread scratch page,
2477 	 * and allocate that during initial device creation? */
2478 	p = (struct p_header80 *) __get_free_page(GFP_NOIO);
2479 	if (!p) {
2480 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
2481 		return false;
2482 	}
2483 
2484 	if (get_ldev(mdev)) {
2485 		if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2486 			dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2487 			drbd_bm_set_all(mdev);
2488 			if (drbd_bm_write(mdev)) {
2489 				/* write_bm did fail! Leave full sync flag set in Meta P_DATA
2490 				 * but otherwise process as per normal - need to tell other
2491 				 * side that a full resync is required! */
2492 				dev_err(DEV, "Failed to write bitmap to disk!\n");
2493 			} else {
2494 				drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2495 				drbd_md_sync(mdev);
2496 			}
2497 		}
2498 		put_ldev(mdev);
2499 	}
2500 
2501 	c = (struct bm_xfer_ctx) {
2502 		.bm_bits = drbd_bm_bits(mdev),
2503 		.bm_words = drbd_bm_words(mdev),
2504 	};
2505 
2506 	do {
2507 		err = send_bitmap_rle_or_plain(mdev, p, &c);
2508 	} while (err > 0);
2509 
2510 	free_page((unsigned long) p);
2511 	return err == 0;
2512 }
2513 
2514 int drbd_send_bitmap(struct drbd_conf *mdev)
2515 {
2516 	int err;
2517 
2518 	if (!drbd_get_data_sock(mdev))
2519 		return -1;
2520 	err = !_drbd_send_bitmap(mdev);
2521 	drbd_put_data_sock(mdev);
2522 	return err;
2523 }
2524 
2525 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2526 {
2527 	int ok;
2528 	struct p_barrier_ack p;
2529 
2530 	p.barrier  = barrier_nr;
2531 	p.set_size = cpu_to_be32(set_size);
2532 
2533 	if (mdev->state.conn < C_CONNECTED)
2534 		return false;
2535 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2536 			(struct p_header80 *)&p, sizeof(p));
2537 	return ok;
2538 }
2539 
2540 /**
2541  * _drbd_send_ack() - Sends an ack packet
2542  * @mdev:	DRBD device.
2543  * @cmd:	Packet command code.
2544  * @sector:	sector, needs to be in big endian byte order
2545  * @blksize:	size in byte, needs to be in big endian byte order
2546  * @block_id:	Id, big endian byte order
2547  */
2548 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2549 			  u64 sector,
2550 			  u32 blksize,
2551 			  u64 block_id)
2552 {
2553 	int ok;
2554 	struct p_block_ack p;
2555 
2556 	p.sector   = sector;
2557 	p.block_id = block_id;
2558 	p.blksize  = blksize;
2559 	p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2560 
2561 	if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2562 		return false;
2563 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2564 				(struct p_header80 *)&p, sizeof(p));
2565 	return ok;
2566 }
2567 
2568 /* dp->sector and dp->block_id already/still in network byte order,
2569  * data_size is payload size according to dp->head,
2570  * and may need to be corrected for digest size. */
2571 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2572 		     struct p_data *dp, int data_size)
2573 {
2574 	data_size -= (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
2575 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
2576 	return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2577 			      dp->block_id);
2578 }
2579 
2580 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2581 		     struct p_block_req *rp)
2582 {
2583 	return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2584 }
2585 
2586 /**
2587  * drbd_send_ack() - Sends an ack packet
2588  * @mdev:	DRBD device.
2589  * @cmd:	Packet command code.
2590  * @e:		Epoch entry.
2591  */
2592 int drbd_send_ack(struct drbd_conf *mdev,
2593 	enum drbd_packets cmd, struct drbd_epoch_entry *e)
2594 {
2595 	return _drbd_send_ack(mdev, cmd,
2596 			      cpu_to_be64(e->sector),
2597 			      cpu_to_be32(e->size),
2598 			      e->block_id);
2599 }
2600 
2601 /* This function misuses the block_id field to signal whether the blocks
2602  * are in sync or not. */
2603 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2604 		     sector_t sector, int blksize, u64 block_id)
2605 {
2606 	return _drbd_send_ack(mdev, cmd,
2607 			      cpu_to_be64(sector),
2608 			      cpu_to_be32(blksize),
2609 			      cpu_to_be64(block_id));
2610 }
2611 
2612 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2613 		       sector_t sector, int size, u64 block_id)
2614 {
2615 	int ok;
2616 	struct p_block_req p;
2617 
2618 	p.sector   = cpu_to_be64(sector);
2619 	p.block_id = block_id;
2620 	p.blksize  = cpu_to_be32(size);
2621 
2622 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2623 				(struct p_header80 *)&p, sizeof(p));
2624 	return ok;
2625 }
2626 
2627 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2628 			    sector_t sector, int size,
2629 			    void *digest, int digest_size,
2630 			    enum drbd_packets cmd)
2631 {
2632 	int ok;
2633 	struct p_block_req p;
2634 
2635 	p.sector   = cpu_to_be64(sector);
2636 	p.block_id = BE_DRBD_MAGIC + 0xbeef;
2637 	p.blksize  = cpu_to_be32(size);
2638 
2639 	p.head.magic   = BE_DRBD_MAGIC;
2640 	p.head.command = cpu_to_be16(cmd);
2641 	p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + digest_size);
2642 
2643 	mutex_lock(&mdev->data.mutex);
2644 
2645 	ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2646 	ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2647 
2648 	mutex_unlock(&mdev->data.mutex);
2649 
2650 	return ok;
2651 }
2652 
2653 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2654 {
2655 	int ok;
2656 	struct p_block_req p;
2657 
2658 	p.sector   = cpu_to_be64(sector);
2659 	p.block_id = BE_DRBD_MAGIC + 0xbabe;
2660 	p.blksize  = cpu_to_be32(size);
2661 
2662 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2663 			   (struct p_header80 *)&p, sizeof(p));
2664 	return ok;
2665 }
2666 
2667 /* called on sndtimeo
2668  * returns false if we should retry,
2669  * true if we think the connection is dead
2670  */
2671 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2672 {
2673 	int drop_it;
2674 	/* long elapsed = (long)(jiffies - mdev->last_received); */
2675 
2676 	drop_it =   mdev->meta.socket == sock
2677 		|| !mdev->asender.task
2678 		|| get_t_state(&mdev->asender) != Running
2679 		|| mdev->state.conn < C_CONNECTED;
2680 
2681 	if (drop_it)
2682 		return true;
2683 
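	/* each sndtimeo expiry decrements ko_count; only when it reaches zero
	 * do we give up, otherwise we warn and ask the peer for a ping */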
2684 	drop_it = !--mdev->ko_count;
2685 	if (!drop_it) {
2686 		dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2687 		       current->comm, current->pid, mdev->ko_count);
2688 		request_ping(mdev);
2689 	}
2690 
2691 	return drop_it; /* && (mdev->state == R_PRIMARY) */;
2692 }
2693 
2694 /* The idea of sendpage seems to be to put some kind of reference
2695  * to the page into the skb, and to hand it over to the NIC. In
2696  * this process get_page() gets called.
2697  *
2698  * As soon as the page was really sent over the network put_page()
2699  * gets called by some part of the network layer. [ NIC driver? ]
2700  *
2701  * [ get_page() / put_page() increment/decrement the count. If count
2702  *   reaches 0 the page will be freed. ]
2703  *
2704  * This works nicely with pages from FSs.
2705  * But this means that in protocol A we might signal IO completion too early!
2706  *
2707  * In order not to corrupt data during a resync we must make sure
2708  * that we do not reuse our own buffer pages (EEs) too early, therefore
2709  * we have the net_ee list.
2710  *
2711  * XFS seems to have problems, still, it submits pages with page_count == 0!
2712  * As a workaround, we disable sendpage on pages
2713  * with page_count == 0 or PageSlab.
2714  */
2715 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2716 		   int offset, size_t size, unsigned msg_flags)
2717 {
2718 	int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, msg_flags);
2719 	kunmap(page);
2720 	if (sent == size)
2721 		mdev->send_cnt += size>>9;
2722 	return sent == size;
2723 }
2724 
2725 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2726 		    int offset, size_t size, unsigned msg_flags)
2727 {
2728 	mm_segment_t oldfs = get_fs();
2729 	int sent, ok;
2730 	int len = size;
2731 
2732 	/* e.g. XFS meta- & log-data is in slab pages, which have a
2733 	 * page_count of 0 and/or have PageSlab() set.
2734 	 * we cannot use send_page for those, as that does get_page();
2735 	 * put_page(); and would cause either a VM_BUG directly, or
2736 	 * __page_cache_release a page that would actually still be referenced
2737 	 * by someone, leading to some obscure delayed Oops somewhere else. */
2738 	if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2739 		return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
2740 
2741 	msg_flags |= MSG_NOSIGNAL;
2742 	drbd_update_congested(mdev);
2743 	set_fs(KERNEL_DS);
2744 	do {
2745 		sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2746 							offset, len,
2747 							msg_flags);
2748 		if (sent == -EAGAIN) {
2749 			if (we_should_drop_the_connection(mdev,
2750 							  mdev->data.socket))
2751 				break;
2752 			else
2753 				continue;
2754 		}
2755 		if (sent <= 0) {
2756 			dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2757 			     __func__, (int)size, len, sent);
2758 			break;
2759 		}
2760 		len    -= sent;
2761 		offset += sent;
2762 	} while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2763 	set_fs(oldfs);
2764 	clear_bit(NET_CONGESTED, &mdev->flags);
2765 
2766 	ok = (len == 0);
2767 	if (likely(ok))
2768 		mdev->send_cnt += size>>9;
2769 	return ok;
2770 }
2771 
2772 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2773 {
2774 	struct bio_vec *bvec;
2775 	int i;
2776 	/* hint all but last page with MSG_MORE */
2777 	bio_for_each_segment(bvec, bio, i) {
2778 		if (!_drbd_no_send_page(mdev, bvec->bv_page,
2779 				     bvec->bv_offset, bvec->bv_len,
2780 				     i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2781 			return 0;
2782 	}
2783 	return 1;
2784 }
2785 
2786 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2787 {
2788 	struct bio_vec *bvec;
2789 	int i;
2790 	/* hint all but last page with MSG_MORE */
2791 	bio_for_each_segment(bvec, bio, i) {
2792 		if (!_drbd_send_page(mdev, bvec->bv_page,
2793 				     bvec->bv_offset, bvec->bv_len,
2794 				     i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2795 			return 0;
2796 	}
2797 	return 1;
2798 }
2799 
2800 static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2801 {
2802 	struct page *page = e->pages;
2803 	unsigned len = e->size;
2804 	/* hint all but last page with MSG_MORE */
2805 	page_chain_for_each(page) {
2806 		unsigned l = min_t(unsigned, len, PAGE_SIZE);
2807 		if (!_drbd_send_page(mdev, page, 0, l,
2808 				page_chain_next(page) ? MSG_MORE : 0))
2809 			return 0;
2810 		len -= l;
2811 	}
2812 	return 1;
2813 }
2814 
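/* Map bio rw flags to the on-the-wire DP_* flags; peers speaking a protocol
 * version older than 95 only understand the SYNC hint. */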
2815 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
2816 {
2817 	if (mdev->agreed_pro_version >= 95)
2818 		return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
2819 			(bi_rw & REQ_FUA ? DP_FUA : 0) |
2820 			(bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
2821 			(bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
2822 	else
2823 		return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
2824 }
2825 
2826 /* Used to send write requests
2827  * R_PRIMARY -> Peer	(P_DATA)
2828  */
2829 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2830 {
2831 	int ok = 1;
2832 	struct p_data p;
2833 	unsigned int dp_flags = 0;
2834 	void *dgb;
2835 	int dgs;
2836 
2837 	if (!drbd_get_data_sock(mdev))
2838 		return 0;
2839 
2840 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2841 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2842 
2843 	if (req->size <= DRBD_MAX_SIZE_H80_PACKET) {
2844 		p.head.h80.magic   = BE_DRBD_MAGIC;
2845 		p.head.h80.command = cpu_to_be16(P_DATA);
2846 		p.head.h80.length  =
2847 			cpu_to_be16(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2848 	} else {
2849 		p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2850 		p.head.h95.command = cpu_to_be16(P_DATA);
2851 		p.head.h95.length  =
2852 			cpu_to_be32(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2853 	}
2854 
2855 	p.sector   = cpu_to_be64(req->sector);
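	/* block_id carries the request pointer; the peer echoes it back in its
	 * ack so the ack can be matched to this request */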
2856 	p.block_id = (unsigned long)req;
2857 	p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2858 
2859 	dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
2860 
2861 	if (mdev->state.conn >= C_SYNC_SOURCE &&
2862 	    mdev->state.conn <= C_PAUSED_SYNC_T)
2863 		dp_flags |= DP_MAY_SET_IN_SYNC;
2864 
2865 	p.dp_flags = cpu_to_be32(dp_flags);
2866 	set_bit(UNPLUG_REMOTE, &mdev->flags);
2867 	ok = (sizeof(p) ==
2868 		drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
2869 	if (ok && dgs) {
2870 		dgb = mdev->int_dig_out;
2871 		drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2872 		ok = dgs == drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2873 	}
2874 	if (ok) {
2875 		/* For protocol A, we have to memcpy the payload into
2876 		 * socket buffers, as we may complete right away
2877 		 * as soon as we handed it over to tcp, at which point the data
2878 		 * pages may become invalid.
2879 		 *
2880 		 * With data integrity enabled, we copy it as well, so we can be
2881 		 * sure that even if the bio pages may still be modified, it
2882 		 * won't change the data on the wire, thus if the digest checks
2883 		 * out ok after sending on this side, but does not fit on the
2884 		 * receiving side, we have surely detected corruption elsewhere.
2885 		 */
2886 		if (mdev->net_conf->wire_protocol == DRBD_PROT_A || dgs)
2887 			ok = _drbd_send_bio(mdev, req->master_bio);
2888 		else
2889 			ok = _drbd_send_zc_bio(mdev, req->master_bio);
2890 
2891 		/* double check digest, sometimes buffers have been modified in flight. */
2892 		if (dgs > 0 && dgs <= 64) {
2893 			/* 64 byte, 512 bit, is the largest digest size
2894 			 * currently supported in kernel crypto. */
2895 			unsigned char digest[64];
2896 			drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, digest);
2897 			if (memcmp(mdev->int_dig_out, digest, dgs)) {
2898 				dev_warn(DEV,
2899 					"Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
2900 					(unsigned long long)req->sector, req->size);
2901 			}
2902 		} /* else if (dgs > 64) {
2903 		     ... Be noisy about digest too large ...
2904 		} */
2905 	}
2906 
2907 	drbd_put_data_sock(mdev);
2908 
2909 	return ok;
2910 }
2911 
2912 /* answer packet, used to send data back for read requests:
2913  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2914  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2915  */
2916 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2917 		    struct drbd_epoch_entry *e)
2918 {
2919 	int ok;
2920 	struct p_data p;
2921 	void *dgb;
2922 	int dgs;
2923 
2924 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2925 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2926 
2927 	if (e->size <= DRBD_MAX_SIZE_H80_PACKET) {
2928 		p.head.h80.magic   = BE_DRBD_MAGIC;
2929 		p.head.h80.command = cpu_to_be16(cmd);
2930 		p.head.h80.length  =
2931 			cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2932 	} else {
2933 		p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2934 		p.head.h95.command = cpu_to_be16(cmd);
2935 		p.head.h95.length  =
2936 			cpu_to_be32(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2937 	}
2938 
2939 	p.sector   = cpu_to_be64(e->sector);
2940 	p.block_id = e->block_id;
2941 	/* p.seq_num  = 0;    No sequence numbers here.. */
2942 
2943 	/* Only called by our kernel thread.
2944 	 * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2945 	 * in response to admin command or module unload.
2946 	 */
2947 	if (!drbd_get_data_sock(mdev))
2948 		return 0;
2949 
2950 	ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0);
2951 	if (ok && dgs) {
2952 		dgb = mdev->int_dig_out;
2953 		drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
2954 		ok = dgs == drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2955 	}
2956 	if (ok)
2957 		ok = _drbd_send_zc_ee(mdev, e);
2958 
2959 	drbd_put_data_sock(mdev);
2960 
2961 	return ok;
2962 }
2963 
2964 int drbd_send_oos(struct drbd_conf *mdev, struct drbd_request *req)
2965 {
2966 	struct p_block_desc p;
2967 
2968 	p.sector  = cpu_to_be64(req->sector);
2969 	p.blksize = cpu_to_be32(req->size);
2970 
2971 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OUT_OF_SYNC, &p.head, sizeof(p));
2972 }
2973 
2974 /*
2975   drbd_send distinguishes two cases:
2976 
2977   Packets sent via the data socket "sock"
2978   and packets sent via the meta data socket "msock"
2979 
2980 		    sock                      msock
2981   -----------------+-------------------------+------------------------------
2982   timeout           conf.timeout / 2          conf.timeout / 2
2983   timeout action    send a ping via msock     Abort communication
2984 					      and close all sockets
2985 */
2986 
2987 /*
2988  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
2989  */
2990 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2991 	      void *buf, size_t size, unsigned msg_flags)
2992 {
2993 	struct kvec iov;
2994 	struct msghdr msg;
2995 	int rv, sent = 0;
2996 
2997 	if (!sock)
2998 		return -1000;
2999 
3000 	/* THINK  if (signal_pending) return ... ? */
3001 
3002 	iov.iov_base = buf;
3003 	iov.iov_len  = size;
3004 
3005 	msg.msg_name       = NULL;
3006 	msg.msg_namelen    = 0;
3007 	msg.msg_control    = NULL;
3008 	msg.msg_controllen = 0;
3009 	msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
3010 
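	/* congestion accounting (ko_count, NET_CONGESTED) only applies to the
	 * data socket */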
3011 	if (sock == mdev->data.socket) {
3012 		mdev->ko_count = mdev->net_conf->ko_count;
3013 		drbd_update_congested(mdev);
3014 	}
3015 	do {
3016 		/* STRANGE
3017 		 * tcp_sendmsg does _not_ use its size parameter at all ?
3018 		 *
3019 		 * -EAGAIN on timeout, -EINTR on signal.
3020 		 */
3021 /* THINK
3022  * do we need to block DRBD_SIG if sock == &meta.socket ??
3023  * otherwise wake_asender() might interrupt some send_*Ack !
3024  */
3025 		rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
3026 		if (rv == -EAGAIN) {
3027 			if (we_should_drop_the_connection(mdev, sock))
3028 				break;
3029 			else
3030 				continue;
3031 		}
3032 		D_ASSERT(rv != 0);
3033 		if (rv == -EINTR) {
3034 			flush_signals(current);
3035 			rv = 0;
3036 		}
3037 		if (rv < 0)
3038 			break;
3039 		sent += rv;
3040 		iov.iov_base += rv;
3041 		iov.iov_len  -= rv;
3042 	} while (sent < size);
3043 
3044 	if (sock == mdev->data.socket)
3045 		clear_bit(NET_CONGESTED, &mdev->flags);
3046 
3047 	if (rv <= 0) {
3048 		if (rv != -EAGAIN) {
3049 			dev_err(DEV, "%s_sendmsg returned %d\n",
3050 			    sock == mdev->meta.socket ? "msock" : "sock",
3051 			    rv);
3052 			drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
3053 		} else
3054 			drbd_force_state(mdev, NS(conn, C_TIMEOUT));
3055 	}
3056 
3057 	return sent;
3058 }
3059 
3060 static int drbd_open(struct block_device *bdev, fmode_t mode)
3061 {
3062 	struct drbd_conf *mdev = bdev->bd_disk->private_data;
3063 	unsigned long flags;
3064 	int rv = 0;
3065 
3066 	mutex_lock(&drbd_main_mutex);
3067 	spin_lock_irqsave(&mdev->req_lock, flags);
3068 	/* to have a stable mdev->state.role
3069 	 * and no race with updating open_cnt */
3070 
3071 	if (mdev->state.role != R_PRIMARY) {
3072 		if (mode & FMODE_WRITE)
3073 			rv = -EROFS;
3074 		else if (!allow_oos)
3075 			rv = -EMEDIUMTYPE;
3076 	}
3077 
3078 	if (!rv)
3079 		mdev->open_cnt++;
3080 	spin_unlock_irqrestore(&mdev->req_lock, flags);
3081 	mutex_unlock(&drbd_main_mutex);
3082 
3083 	return rv;
3084 }
3085 
3086 static int drbd_release(struct gendisk *gd, fmode_t mode)
3087 {
3088 	struct drbd_conf *mdev = gd->private_data;
3089 	mutex_lock(&drbd_main_mutex);
3090 	mdev->open_cnt--;
3091 	mutex_unlock(&drbd_main_mutex);
3092 	return 0;
3093 }
3094 
3095 static void drbd_set_defaults(struct drbd_conf *mdev)
3096 {
3097 	/* This way we get a compile error when sync_conf grows,
3098 	   and we forget to initialize it here */
3099 	mdev->sync_conf = (struct syncer_conf) {
3100 		/* .rate = */		DRBD_RATE_DEF,
3101 		/* .after = */		DRBD_AFTER_DEF,
3102 		/* .al_extents = */	DRBD_AL_EXTENTS_DEF,
3103 		/* .verify_alg = */	{}, 0,
3104 		/* .cpu_mask = */	{}, 0,
3105 		/* .csums_alg = */	{}, 0,
3106 		/* .use_rle = */	0,
3107 		/* .on_no_data = */	DRBD_ON_NO_DATA_DEF,
3108 		/* .c_plan_ahead = */	DRBD_C_PLAN_AHEAD_DEF,
3109 		/* .c_delay_target = */	DRBD_C_DELAY_TARGET_DEF,
3110 		/* .c_fill_target = */	DRBD_C_FILL_TARGET_DEF,
3111 		/* .c_max_rate = */	DRBD_C_MAX_RATE_DEF,
3112 		/* .c_min_rate = */	DRBD_C_MIN_RATE_DEF
3113 	};
3114 
3115 	/* Have to do it this way, because the layout differs between
3116 	   big endian and little endian */
3117 	mdev->state = (union drbd_state) {
3118 		{ .role = R_SECONDARY,
3119 		  .peer = R_UNKNOWN,
3120 		  .conn = C_STANDALONE,
3121 		  .disk = D_DISKLESS,
3122 		  .pdsk = D_UNKNOWN,
3123 		  .susp = 0,
3124 		  .susp_nod = 0,
3125 		  .susp_fen = 0
3126 		} };
3127 }
3128 
3129 void drbd_init_set_defaults(struct drbd_conf *mdev)
3130 {
3131 	/* the memset(,0,) did most of this.
3132 	 * note: only assignments, no allocation in here */
3133 
3134 	drbd_set_defaults(mdev);
3135 
3136 	atomic_set(&mdev->ap_bio_cnt, 0);
3137 	atomic_set(&mdev->ap_pending_cnt, 0);
3138 	atomic_set(&mdev->rs_pending_cnt, 0);
3139 	atomic_set(&mdev->unacked_cnt, 0);
3140 	atomic_set(&mdev->local_cnt, 0);
3141 	atomic_set(&mdev->net_cnt, 0);
3142 	atomic_set(&mdev->packet_seq, 0);
3143 	atomic_set(&mdev->pp_in_use, 0);
3144 	atomic_set(&mdev->pp_in_use_by_net, 0);
3145 	atomic_set(&mdev->rs_sect_in, 0);
3146 	atomic_set(&mdev->rs_sect_ev, 0);
3147 	atomic_set(&mdev->ap_in_flight, 0);
3148 	atomic_set(&mdev->md_io_in_use, 0);
3149 
3150 	mutex_init(&mdev->data.mutex);
3151 	mutex_init(&mdev->meta.mutex);
3152 	sema_init(&mdev->data.work.s, 0);
3153 	sema_init(&mdev->meta.work.s, 0);
3154 	mutex_init(&mdev->state_mutex);
3155 
3156 	spin_lock_init(&mdev->data.work.q_lock);
3157 	spin_lock_init(&mdev->meta.work.q_lock);
3158 
3159 	spin_lock_init(&mdev->al_lock);
3160 	spin_lock_init(&mdev->req_lock);
3161 	spin_lock_init(&mdev->peer_seq_lock);
3162 	spin_lock_init(&mdev->epoch_lock);
3163 
3164 	INIT_LIST_HEAD(&mdev->active_ee);
3165 	INIT_LIST_HEAD(&mdev->sync_ee);
3166 	INIT_LIST_HEAD(&mdev->done_ee);
3167 	INIT_LIST_HEAD(&mdev->read_ee);
3168 	INIT_LIST_HEAD(&mdev->net_ee);
3169 	INIT_LIST_HEAD(&mdev->resync_reads);
3170 	INIT_LIST_HEAD(&mdev->data.work.q);
3171 	INIT_LIST_HEAD(&mdev->meta.work.q);
3172 	INIT_LIST_HEAD(&mdev->resync_work.list);
3173 	INIT_LIST_HEAD(&mdev->unplug_work.list);
3174 	INIT_LIST_HEAD(&mdev->go_diskless.list);
3175 	INIT_LIST_HEAD(&mdev->md_sync_work.list);
3176 	INIT_LIST_HEAD(&mdev->start_resync_work.list);
3177 	INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
3178 
3179 	mdev->resync_work.cb  = w_resync_timer;
3180 	mdev->unplug_work.cb  = w_send_write_hint;
3181 	mdev->go_diskless.cb  = w_go_diskless;
3182 	mdev->md_sync_work.cb = w_md_sync;
3183 	mdev->bm_io_work.w.cb = w_bitmap_io;
3184 	mdev->start_resync_work.cb = w_start_resync;
3185 	init_timer(&mdev->resync_timer);
3186 	init_timer(&mdev->md_sync_timer);
3187 	init_timer(&mdev->start_resync_timer);
3188 	init_timer(&mdev->request_timer);
3189 	mdev->resync_timer.function = resync_timer_fn;
3190 	mdev->resync_timer.data = (unsigned long) mdev;
3191 	mdev->md_sync_timer.function = md_sync_timer_fn;
3192 	mdev->md_sync_timer.data = (unsigned long) mdev;
3193 	mdev->start_resync_timer.function = start_resync_timer_fn;
3194 	mdev->start_resync_timer.data = (unsigned long) mdev;
3195 	mdev->request_timer.function = request_timer_fn;
3196 	mdev->request_timer.data = (unsigned long) mdev;
3197 
3198 	init_waitqueue_head(&mdev->misc_wait);
3199 	init_waitqueue_head(&mdev->state_wait);
3200 	init_waitqueue_head(&mdev->net_cnt_wait);
3201 	init_waitqueue_head(&mdev->ee_wait);
3202 	init_waitqueue_head(&mdev->al_wait);
3203 	init_waitqueue_head(&mdev->seq_wait);
3204 
3205 	drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
3206 	drbd_thread_init(mdev, &mdev->worker, drbd_worker);
3207 	drbd_thread_init(mdev, &mdev->asender, drbd_asender);
3208 
3209 	mdev->agreed_pro_version = PRO_VERSION_MAX;
3210 	mdev->write_ordering = WO_bdev_flush;
3211 	mdev->resync_wenr = LC_FREE;
3212 	mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
3213 	mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
3214 }
3215 
3216 void drbd_mdev_cleanup(struct drbd_conf *mdev)
3217 {
3218 	int i;
3219 	if (mdev->receiver.t_state != None)
3220 		dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
3221 				mdev->receiver.t_state);
3222 
3223 	/* no need to lock it, I'm the only thread alive */
3224 	if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
3225 		dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
3226 	mdev->al_writ_cnt  =
3227 	mdev->bm_writ_cnt  =
3228 	mdev->read_cnt     =
3229 	mdev->recv_cnt     =
3230 	mdev->send_cnt     =
3231 	mdev->writ_cnt     =
3232 	mdev->p_size       =
3233 	mdev->rs_start     =
3234 	mdev->rs_total     =
3235 	mdev->rs_failed    = 0;
3236 	mdev->rs_last_events = 0;
3237 	mdev->rs_last_sect_ev = 0;
3238 	for (i = 0; i < DRBD_SYNC_MARKS; i++) {
3239 		mdev->rs_mark_left[i] = 0;
3240 		mdev->rs_mark_time[i] = 0;
3241 	}
3242 	D_ASSERT(mdev->net_conf == NULL);
3243 
3244 	drbd_set_my_capacity(mdev, 0);
3245 	if (mdev->bitmap) {
3246 		/* maybe never allocated. */
3247 		drbd_bm_resize(mdev, 0, 1);
3248 		drbd_bm_cleanup(mdev);
3249 	}
3250 
3251 	drbd_free_resources(mdev);
3252 	clear_bit(AL_SUSPENDED, &mdev->flags);
3253 
3254 	/*
3255 	 * currently we call drbd_init_ee only on module load, so
3256 	 * we may call drbd_release_ee only on module unload!
3257 	 */
3258 	D_ASSERT(list_empty(&mdev->active_ee));
3259 	D_ASSERT(list_empty(&mdev->sync_ee));
3260 	D_ASSERT(list_empty(&mdev->done_ee));
3261 	D_ASSERT(list_empty(&mdev->read_ee));
3262 	D_ASSERT(list_empty(&mdev->net_ee));
3263 	D_ASSERT(list_empty(&mdev->resync_reads));
3264 	D_ASSERT(list_empty(&mdev->data.work.q));
3265 	D_ASSERT(list_empty(&mdev->meta.work.q));
3266 	D_ASSERT(list_empty(&mdev->resync_work.list));
3267 	D_ASSERT(list_empty(&mdev->unplug_work.list));
3268 	D_ASSERT(list_empty(&mdev->go_diskless.list));
3269 
3270 	drbd_set_defaults(mdev);
3271 }
3272 
3273 
3274 static void drbd_destroy_mempools(void)
3275 {
3276 	struct page *page;
3277 
3278 	while (drbd_pp_pool) {
3279 		page = drbd_pp_pool;
3280 		drbd_pp_pool = (struct page *)page_private(page);
3281 		__free_page(page);
3282 		drbd_pp_vacant--;
3283 	}
3284 
3285 	/* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
3286 
3287 	if (drbd_md_io_bio_set)
3288 		bioset_free(drbd_md_io_bio_set);
3289 	if (drbd_md_io_page_pool)
3290 		mempool_destroy(drbd_md_io_page_pool);
3291 	if (drbd_ee_mempool)
3292 		mempool_destroy(drbd_ee_mempool);
3293 	if (drbd_request_mempool)
3294 		mempool_destroy(drbd_request_mempool);
3295 	if (drbd_ee_cache)
3296 		kmem_cache_destroy(drbd_ee_cache);
3297 	if (drbd_request_cache)
3298 		kmem_cache_destroy(drbd_request_cache);
3299 	if (drbd_bm_ext_cache)
3300 		kmem_cache_destroy(drbd_bm_ext_cache);
3301 	if (drbd_al_ext_cache)
3302 		kmem_cache_destroy(drbd_al_ext_cache);
3303 
3304 	drbd_md_io_bio_set   = NULL;
3305 	drbd_md_io_page_pool = NULL;
3306 	drbd_ee_mempool      = NULL;
3307 	drbd_request_mempool = NULL;
3308 	drbd_ee_cache        = NULL;
3309 	drbd_request_cache   = NULL;
3310 	drbd_bm_ext_cache    = NULL;
3311 	drbd_al_ext_cache    = NULL;
3312 
3313 	return;
3314 }
3315 
3316 static int drbd_create_mempools(void)
3317 {
3318 	struct page *page;
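	/* enough pages for one max-sized BIO per possible minor */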
3319 	const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
3320 	int i;
3321 
3322 	/* prepare our caches and mempools */
3323 	drbd_request_mempool = NULL;
3324 	drbd_ee_cache        = NULL;
3325 	drbd_request_cache   = NULL;
3326 	drbd_bm_ext_cache    = NULL;
3327 	drbd_al_ext_cache    = NULL;
3328 	drbd_pp_pool         = NULL;
3329 	drbd_md_io_page_pool = NULL;
3330 	drbd_md_io_bio_set   = NULL;
3331 
3332 	/* caches */
3333 	drbd_request_cache = kmem_cache_create(
3334 		"drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
3335 	if (drbd_request_cache == NULL)
3336 		goto Enomem;
3337 
3338 	drbd_ee_cache = kmem_cache_create(
3339 		"drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
3340 	if (drbd_ee_cache == NULL)
3341 		goto Enomem;
3342 
3343 	drbd_bm_ext_cache = kmem_cache_create(
3344 		"drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
3345 	if (drbd_bm_ext_cache == NULL)
3346 		goto Enomem;
3347 
3348 	drbd_al_ext_cache = kmem_cache_create(
3349 		"drbd_al", sizeof(struct lc_element), 0, 0, NULL);
3350 	if (drbd_al_ext_cache == NULL)
3351 		goto Enomem;
3352 
3353 	/* mempools */
3354 #ifdef COMPAT_HAVE_BIOSET_CREATE
3355 	drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
3356 	if (drbd_md_io_bio_set == NULL)
3357 		goto Enomem;
3358 #endif
3359 
3360 	drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
3361 	if (drbd_md_io_page_pool == NULL)
3362 		goto Enomem;
3363 
3364 	drbd_request_mempool = mempool_create(number,
3365 		mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
3366 	if (drbd_request_mempool == NULL)
3367 		goto Enomem;
3368 
3369 	drbd_ee_mempool = mempool_create(number,
3370 		mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
3371 	if (drbd_ee_mempool == NULL)
3372 		goto Enomem;
3373 
3374 	/* drbd's page pool */
3375 	spin_lock_init(&drbd_pp_lock);
3376 
3377 	for (i = 0; i < number; i++) {
3378 		page = alloc_page(GFP_HIGHUSER);
3379 		if (!page)
3380 			goto Enomem;
3381 		set_page_private(page, (unsigned long)drbd_pp_pool);
3382 		drbd_pp_pool = page;
3383 	}
3384 	drbd_pp_vacant = number;
3385 
3386 	return 0;
3387 
3388 Enomem:
3389 	drbd_destroy_mempools(); /* in case we allocated some */
3390 	return -ENOMEM;
3391 }
3392 
3393 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
3394 	void *unused)
3395 {
3396 	/* just so we have it.  you never know what interesting things we
3397 	 * might want to do here some day...
3398 	 */
3399 
3400 	return NOTIFY_DONE;
3401 }
3402 
3403 static struct notifier_block drbd_notifier = {
3404 	.notifier_call = drbd_notify_sys,
3405 };
3406 
3407 static void drbd_release_ee_lists(struct drbd_conf *mdev)
3408 {
3409 	int rr;
3410 
3411 	rr = drbd_release_ee(mdev, &mdev->active_ee);
3412 	if (rr)
3413 		dev_err(DEV, "%d EEs in active list found!\n", rr);
3414 
3415 	rr = drbd_release_ee(mdev, &mdev->sync_ee);
3416 	if (rr)
3417 		dev_err(DEV, "%d EEs in sync list found!\n", rr);
3418 
3419 	rr = drbd_release_ee(mdev, &mdev->read_ee);
3420 	if (rr)
3421 		dev_err(DEV, "%d EEs in read list found!\n", rr);
3422 
3423 	rr = drbd_release_ee(mdev, &mdev->done_ee);
3424 	if (rr)
3425 		dev_err(DEV, "%d EEs in done list found!\n", rr);
3426 
3427 	rr = drbd_release_ee(mdev, &mdev->net_ee);
3428 	if (rr)
3429 		dev_err(DEV, "%d EEs in net list found!\n", rr);
3430 }
3431 
3432 /* caution. no locking.
3433  * currently only used from module cleanup code. */
3434 static void drbd_delete_device(unsigned int minor)
3435 {
3436 	struct drbd_conf *mdev = minor_to_mdev(minor);
3437 
3438 	if (!mdev)
3439 		return;
3440 
3441 	del_timer_sync(&mdev->request_timer);
3442 
3443 	/* paranoia asserts */
3444 	if (mdev->open_cnt != 0)
3445 		dev_err(DEV, "open_cnt = %d in %s:%u", mdev->open_cnt,
3446 				__FILE__ , __LINE__);
3447 
3448 	ERR_IF (!list_empty(&mdev->data.work.q)) {
3449 		struct list_head *lp;
3450 		list_for_each(lp, &mdev->data.work.q) {
3451 			dev_err(DEV, "lp = %p\n", lp);
3452 		}
3453 	};
3454 	/* end paranoia asserts */
3455 
3456 	del_gendisk(mdev->vdisk);
3457 
3458 	/* cleanup stuff that may have been allocated during
3459 	 * device (re-)configuration or state changes */
3460 
3461 	if (mdev->this_bdev)
3462 		bdput(mdev->this_bdev);
3463 
3464 	drbd_free_resources(mdev);
3465 
3466 	drbd_release_ee_lists(mdev);
3467 
3468 	/* should be freed on disconnect? */
3469 	kfree(mdev->ee_hash);
3470 	/*
3471 	mdev->ee_hash_s = 0;
3472 	mdev->ee_hash = NULL;
3473 	*/
3474 
3475 	lc_destroy(mdev->act_log);
3476 	lc_destroy(mdev->resync);
3477 
3478 	kfree(mdev->p_uuid);
3479 	/* mdev->p_uuid = NULL; */
3480 
3481 	kfree(mdev->int_dig_out);
3482 	kfree(mdev->int_dig_in);
3483 	kfree(mdev->int_dig_vv);
3484 
3485 	/* cleanup the rest that has been
3486 	 * allocated from drbd_new_device
3487 	 * and actually free the mdev itself */
3488 	drbd_free_mdev(mdev);
3489 }
3490 
3491 static void drbd_cleanup(void)
3492 {
3493 	unsigned int i;
3494 
3495 	unregister_reboot_notifier(&drbd_notifier);
3496 
3497 	/* first remove proc,
3498 	 * drbdsetup uses its presence to detect
3499 	 * whether DRBD is loaded.
3500 	 * If we got stuck in proc removal,
3501 	 * but had netlink already deregistered,
3502 	 * some drbdsetup commands might wait forever
3503 	 * for an answer.
3504 	 */
3505 	if (drbd_proc)
3506 		remove_proc_entry("drbd", NULL);
3507 
3508 	drbd_nl_cleanup();
3509 
3510 	if (minor_table) {
3511 		i = minor_count;
3512 		while (i--)
3513 			drbd_delete_device(i);
3514 		drbd_destroy_mempools();
3515 	}
3516 
3517 	kfree(minor_table);
3518 
3519 	unregister_blkdev(DRBD_MAJOR, "drbd");
3520 
3521 	printk(KERN_INFO "drbd: module cleanup done.\n");
3522 }
3523 
3524 /**
3525  * drbd_congested() - Callback for the flusher thread
3526  * @congested_data:	User data
3527  * @bdi_bits:		Bits the BDI flusher thread is currently interested in
3528  *
3529  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
3530  */
3531 static int drbd_congested(void *congested_data, int bdi_bits)
3532 {
3533 	struct drbd_conf *mdev = congested_data;
3534 	struct request_queue *q;
3535 	char reason = '-';
3536 	int r = 0;
3537 
3538 	if (!may_inc_ap_bio(mdev)) {
3539 		/* DRBD has frozen IO */
3540 		r = bdi_bits;
3541 		reason = 'd';
3542 		goto out;
3543 	}
3544 
3545 	if (test_bit(CALLBACK_PENDING, &mdev->flags)) {
3546 		r |= (1 << BDI_async_congested);
3547 		/* Without good local data, we would need to read from remote,
3548 		 * and that would need the worker thread as well, which is
3549 		 * currently blocked waiting for that usermode helper to
3550 		 * finish.
3551 		 */
3552 		if (!get_ldev_if_state(mdev, D_UP_TO_DATE))
3553 			r |= (1 << BDI_sync_congested);
3554 		else
3555 			put_ldev(mdev);
3556 		r &= bdi_bits;
3557 		reason = 'c';
3558 		goto out;
3559 	}
3560 
3561 	if (get_ldev(mdev)) {
3562 		q = bdev_get_queue(mdev->ldev->backing_bdev);
3563 		r = bdi_congested(&q->backing_dev_info, bdi_bits);
3564 		put_ldev(mdev);
3565 		if (r)
3566 			reason = 'b';
3567 	}
3568 
3569 	if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
3570 		r |= (1 << BDI_async_congested);
3571 		reason = reason == 'b' ? 'a' : 'n';
3572 	}
3573 
3574 out:
3575 	mdev->congestion_reason = reason;
3576 	return r;
3577 }
3578 
3579 struct drbd_conf *drbd_new_device(unsigned int minor)
3580 {
3581 	struct drbd_conf *mdev;
3582 	struct gendisk *disk;
3583 	struct request_queue *q;
3584 
3585 	/* GFP_KERNEL, we are outside of all write-out paths */
3586 	mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
3587 	if (!mdev)
3588 		return NULL;
3589 	if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
3590 		goto out_no_cpumask;
3591 
3592 	mdev->minor = minor;
3593 
3594 	drbd_init_set_defaults(mdev);
3595 
3596 	q = blk_alloc_queue(GFP_KERNEL);
3597 	if (!q)
3598 		goto out_no_q;
3599 	mdev->rq_queue = q;
3600 	q->queuedata   = mdev;
3601 
3602 	disk = alloc_disk(1);
3603 	if (!disk)
3604 		goto out_no_disk;
3605 	mdev->vdisk = disk;
3606 
3607 	set_disk_ro(disk, true);
3608 
3609 	disk->queue = q;
3610 	disk->major = DRBD_MAJOR;
3611 	disk->first_minor = minor;
3612 	disk->fops = &drbd_ops;
3613 	sprintf(disk->disk_name, "drbd%d", minor);
3614 	disk->private_data = mdev;
3615 
3616 	mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3617 	/* we have no partitions. we contain only ourselves. */
3618 	mdev->this_bdev->bd_contains = mdev->this_bdev;
3619 
3620 	q->backing_dev_info.congested_fn = drbd_congested;
3621 	q->backing_dev_info.congested_data = mdev;
3622 
3623 	blk_queue_make_request(q, drbd_make_request);
3624 	blk_queue_flush(q, REQ_FLUSH | REQ_FUA);
3625 	/* Setting max_hw_sectors to an odd value of 8 KiB here
3626 	   triggers a max_bio_size message upon first attach or connect. */
3627 	blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
3628 	blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3629 	blk_queue_merge_bvec(q, drbd_merge_bvec);
3630 	q->queue_lock = &mdev->req_lock;
3631 
3632 	mdev->md_io_page = alloc_page(GFP_KERNEL);
3633 	if (!mdev->md_io_page)
3634 		goto out_no_io_page;
3635 
3636 	if (drbd_bm_init(mdev))
3637 		goto out_no_bitmap;
3638 	/* no need to lock access, we are still initializing this minor device. */
3639 	if (!tl_init(mdev))
3640 		goto out_no_tl;
3641 
3642 	mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3643 	if (!mdev->app_reads_hash)
3644 		goto out_no_app_reads;
3645 
3646 	mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3647 	if (!mdev->current_epoch)
3648 		goto out_no_epoch;
3649 
3650 	INIT_LIST_HEAD(&mdev->current_epoch->list);
3651 	mdev->epochs = 1;
3652 
3653 	return mdev;
3654 
3655 /* out_whatever_else:
3656 	kfree(mdev->current_epoch); */
3657 out_no_epoch:
3658 	kfree(mdev->app_reads_hash);
3659 out_no_app_reads:
3660 	tl_cleanup(mdev);
3661 out_no_tl:
3662 	drbd_bm_cleanup(mdev);
3663 out_no_bitmap:
3664 	__free_page(mdev->md_io_page);
3665 out_no_io_page:
3666 	put_disk(disk);
3667 out_no_disk:
3668 	blk_cleanup_queue(q);
3669 out_no_q:
3670 	free_cpumask_var(mdev->cpu_mask);
3671 out_no_cpumask:
3672 	kfree(mdev);
3673 	return NULL;
3674 }
3675 
3676 /* counterpart of drbd_new_device.
3677  * last part of drbd_delete_device. */
3678 void drbd_free_mdev(struct drbd_conf *mdev)
3679 {
3680 	kfree(mdev->current_epoch);
3681 	kfree(mdev->app_reads_hash);
3682 	tl_cleanup(mdev);
3683 	if (mdev->bitmap) /* should no longer be there. */
3684 		drbd_bm_cleanup(mdev);
3685 	__free_page(mdev->md_io_page);
3686 	put_disk(mdev->vdisk);
3687 	blk_cleanup_queue(mdev->rq_queue);
3688 	free_cpumask_var(mdev->cpu_mask);
3689 	drbd_free_tl_hash(mdev);
3690 	kfree(mdev);
3691 }
3692 
3693 
3694 int __init drbd_init(void)
3695 {
3696 	int err;
3697 
3698 	if (sizeof(struct p_handshake) != 80) {
3699 		printk(KERN_ERR
3700 		       "drbd: never change the size or layout "
3701 		       "of the HandShake packet.\n");
3702 		return -EINVAL;
3703 	}
3704 
3705 	if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
3706 		printk(KERN_ERR
3707 			"drbd: invalid minor_count (%d)\n", minor_count);
3708 #ifdef MODULE
3709 		return -EINVAL;
3710 #else
3711 		minor_count = 8;
3712 #endif
3713 	}
3714 
3715 	err = drbd_nl_init();
3716 	if (err)
3717 		return err;
3718 
3719 	err = register_blkdev(DRBD_MAJOR, "drbd");
3720 	if (err) {
3721 		printk(KERN_ERR
3722 		       "drbd: unable to register block device major %d\n",
3723 		       DRBD_MAJOR);
3724 		return err;
3725 	}
3726 
3727 	register_reboot_notifier(&drbd_notifier);
3728 
3729 	/*
3730 	 * allocate all necessary structs
3731 	 */
3732 	err = -ENOMEM;
3733 
3734 	init_waitqueue_head(&drbd_pp_wait);
3735 
3736 	drbd_proc = NULL; /* play safe for drbd_cleanup */
3737 	minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3738 				GFP_KERNEL);
3739 	if (!minor_table)
3740 		goto Enomem;
3741 
3742 	err = drbd_create_mempools();
3743 	if (err)
3744 		goto Enomem;
3745 
3746 	drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
3747 	if (!drbd_proc)	{
3748 		printk(KERN_ERR "drbd: unable to register proc file\n");
3749 		goto Enomem;
3750 	}
3751 
3752 	rwlock_init(&global_state_lock);
3753 
3754 	printk(KERN_INFO "drbd: initialized. "
3755 	       "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3756 	       API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3757 	printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3758 	printk(KERN_INFO "drbd: registered as block device major %d\n",
3759 		DRBD_MAJOR);
3760 	printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3761 
3762 	return 0; /* Success! */
3763 
3764 Enomem:
3765 	drbd_cleanup();
3766 	if (err == -ENOMEM)
3767 		/* currently always the case */
3768 		printk(KERN_ERR "drbd: ran out of memory\n");
3769 	else
3770 		printk(KERN_ERR "drbd: initialization failure\n");
3771 	return err;
3772 }
3773 
3774 void drbd_free_bc(struct drbd_backing_dev *ldev)
3775 {
3776 	if (ldev == NULL)
3777 		return;
3778 
3779 	blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
3780 	blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
3781 
3782 	kfree(ldev);
3783 }
3784 
3785 void drbd_free_sock(struct drbd_conf *mdev)
3786 {
3787 	if (mdev->data.socket) {
3788 		mutex_lock(&mdev->data.mutex);
3789 		kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3790 		sock_release(mdev->data.socket);
3791 		mdev->data.socket = NULL;
3792 		mutex_unlock(&mdev->data.mutex);
3793 	}
3794 	if (mdev->meta.socket) {
3795 		mutex_lock(&mdev->meta.mutex);
3796 		kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3797 		sock_release(mdev->meta.socket);
3798 		mdev->meta.socket = NULL;
3799 		mutex_unlock(&mdev->meta.mutex);
3800 	}
3801 }
3802 
3803 
3804 void drbd_free_resources(struct drbd_conf *mdev)
3805 {
3806 	crypto_free_hash(mdev->csums_tfm);
3807 	mdev->csums_tfm = NULL;
3808 	crypto_free_hash(mdev->verify_tfm);
3809 	mdev->verify_tfm = NULL;
3810 	crypto_free_hash(mdev->cram_hmac_tfm);
3811 	mdev->cram_hmac_tfm = NULL;
3812 	crypto_free_hash(mdev->integrity_w_tfm);
3813 	mdev->integrity_w_tfm = NULL;
3814 	crypto_free_hash(mdev->integrity_r_tfm);
3815 	mdev->integrity_r_tfm = NULL;
3816 
3817 	drbd_free_sock(mdev);
3818 
3819 	__no_warn(local,
3820 		  drbd_free_bc(mdev->ldev);
3821 		  mdev->ldev = NULL;);
3822 }
3823 
3824 /* meta data management */
3825 
3826 struct meta_data_on_disk {
3827 	u64 la_size;           /* last agreed size. */
3828 	u64 uuid[UI_SIZE];   /* UUIDs. */
3829 	u64 device_uuid;
3830 	u64 reserved_u64_1;
3831 	u32 flags;             /* MDF */
3832 	u32 magic;
3833 	u32 md_size_sect;
3834 	u32 al_offset;         /* offset to this block */
3835 	u32 al_nr_extents;     /* important for restoring the AL */
3836 	      /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3837 	u32 bm_offset;         /* offset to the bitmap, from here */
3838 	u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3839 	u32 la_peer_max_bio_size;   /* last peer max_bio_size */
3840 	u32 reserved_u32[3];
3841 
3842 } __packed;
3843 
3844 /**
3845  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3846  * @mdev:	DRBD device.
3847  */
3848 void drbd_md_sync(struct drbd_conf *mdev)
3849 {
3850 	struct meta_data_on_disk *buffer;
3851 	sector_t sector;
3852 	int i;
3853 
3854 	del_timer(&mdev->md_sync_timer);
3855 	/* timer may be rearmed by drbd_md_mark_dirty() now. */
3856 	if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3857 		return;
3858 
3859 	/* We use D_FAILED here and not D_ATTACHING because we try to write
3860 	 * metadata even if we detach due to a disk failure! */
3861 	if (!get_ldev_if_state(mdev, D_FAILED))
3862 		return;
3863 
3864 	buffer = drbd_md_get_buffer(mdev);
3865 	if (!buffer)
3866 		goto out;
3867 
3868 	memset(buffer, 0, 512);
3869 
3870 	buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3871 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3872 		buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3873 	buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3874 	buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3875 
3876 	buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3877 	buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3878 	buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3879 	buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3880 	buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3881 
3882 	buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3883 	buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
3884 
3885 	D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3886 	sector = mdev->ldev->md.md_offset;
3887 
3888 	if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3889 		/* this was a try anyway ... */
3890 		dev_err(DEV, "meta data update failed!\n");
3891 		drbd_chk_io_error(mdev, 1, DRBD_META_IO_ERROR);
3892 	}
3893 
3894 	/* Update mdev->ldev->md.la_size_sect,
3895 	 * since we just updated it in the on-disk metadata. */
3896 	mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3897 
3898 	drbd_md_put_buffer(mdev);
3899 out:
3900 	put_ldev(mdev);
3901 }
3902 
3903 /**
3904  * drbd_md_read() - Reads in the meta data super block
3905  * @mdev:	DRBD device.
3906  * @bdev:	Device from which the meta data should be read in.
3907  *
3908  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
3909  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3910  */
3911 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3912 {
3913 	struct meta_data_on_disk *buffer;
3914 	int i, rv = NO_ERROR;
3915 
3916 	if (!get_ldev_if_state(mdev, D_ATTACHING))
3917 		return ERR_IO_MD_DISK;
3918 
3919 	buffer = drbd_md_get_buffer(mdev);
3920 	if (!buffer)
3921 		goto out;
3922 
3923 	if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3924 		/* NOTE: can't do normal error processing here as this is
3925 		   called BEFORE disk is attached */
3926 		dev_err(DEV, "Error while reading metadata.\n");
3927 		rv = ERR_IO_MD_DISK;
3928 		goto err;
3929 	}
3930 
3931 	if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3932 		dev_err(DEV, "Error while reading metadata, magic not found.\n");
3933 		rv = ERR_MD_INVALID;
3934 		goto err;
3935 	}
3936 	if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3937 		dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3938 		    be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3939 		rv = ERR_MD_INVALID;
3940 		goto err;
3941 	}
3942 	if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3943 		dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3944 		    be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3945 		rv = ERR_MD_INVALID;
3946 		goto err;
3947 	}
3948 	if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3949 		dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3950 		    be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3951 		rv = ERR_MD_INVALID;
3952 		goto err;
3953 	}
3954 
3955 	if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3956 		dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3957 		    be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3958 		rv = ERR_MD_INVALID;
3959 		goto err;
3960 	}
3961 
3962 	bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3963 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3964 		bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3965 	bdev->md.flags = be32_to_cpu(buffer->flags);
3966 	mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3967 	bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3968 
3969 	spin_lock_irq(&mdev->req_lock);
3970 	if (mdev->state.conn < C_CONNECTED) {
3971 		unsigned int peer;
3972 		peer = be32_to_cpu(buffer->la_peer_max_bio_size);
3973 		peer = max(peer, DRBD_MAX_BIO_SIZE_SAFE);
3974 		mdev->peer_max_bio_size = peer;
3975 	}
3976 	spin_unlock_irq(&mdev->req_lock);
3977 
3978 	if (mdev->sync_conf.al_extents < 7)
3979 		mdev->sync_conf.al_extents = 127;
3980 
3981  err:
3982 	drbd_md_put_buffer(mdev);
3983  out:
3984 	put_ldev(mdev);
3985 
3986 	return rv;
3987 }
3988 
3989 /**
3990  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3991  * @mdev:	DRBD device.
3992  *
3993  * Call this function if you change anything that should be written to
3994  * the meta-data super block. This function sets MD_DIRTY, and starts a
3995  * timer that ensures drbd_md_sync() is called within five seconds.
3996  */
3997 #ifdef DEBUG
3998 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
3999 {
4000 	if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
4001 		mod_timer(&mdev->md_sync_timer, jiffies + HZ);
4002 		mdev->last_md_mark_dirty.line = line;
4003 		mdev->last_md_mark_dirty.func = func;
4004 	}
4005 }
4006 #else
4007 void drbd_md_mark_dirty(struct drbd_conf *mdev)
4008 {
4009 	if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
4010 		mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
4011 }
4012 #endif
4013 
4014 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
4015 {
4016 	int i;
4017 
4018 	for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
4019 		mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
4020 }
4021 
4022 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
4023 {
4024 	if (idx == UI_CURRENT) {
4025 		if (mdev->state.role == R_PRIMARY)
4026 			val |= 1;
4027 		else
4028 			val &= ~((u64)1);
4029 
4030 		drbd_set_ed_uuid(mdev, val);
4031 	}
4032 
4033 	mdev->ldev->md.uuid[idx] = val;
4034 	drbd_md_mark_dirty(mdev);
4035 }
4036 
4037 
4038 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
4039 {
4040 	if (mdev->ldev->md.uuid[idx]) {
4041 		drbd_uuid_move_history(mdev);
4042 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
4043 	}
4044 	_drbd_uuid_set(mdev, idx, val);
4045 }
4046 
4047 /**
4048  * drbd_uuid_new_current() - Creates a new current UUID
4049  * @mdev:	DRBD device.
4050  *
4051  * Creates a new current UUID, and rotates the old current UUID into
4052  * the bitmap slot. Causes an incremental resync upon next connect.
4053  */
4054 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
4055 {
4056 	u64 val;
4057 	unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
4058 
4059 	if (bm_uuid)
4060 		dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
4061 
4062 	mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
4063 
4064 	get_random_bytes(&val, sizeof(u64));
4065 	_drbd_uuid_set(mdev, UI_CURRENT, val);
4066 	drbd_print_uuids(mdev, "new current UUID");
4067 	/* get it to stable storage _now_ */
4068 	drbd_md_sync(mdev);
4069 }
4070 
4071 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
4072 {
4073 	if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
4074 		return;
4075 
4076 	if (val == 0) {
4077 		drbd_uuid_move_history(mdev);
4078 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
4079 		mdev->ldev->md.uuid[UI_BITMAP] = 0;
4080 	} else {
4081 		unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
4082 		if (bm_uuid)
4083 			dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
4084 
4085 		mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
4086 	}
4087 	drbd_md_mark_dirty(mdev);
4088 }
4089 
4090 /**
4091  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
4092  * @mdev:	DRBD device.
4093  *
4094  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
4095  */
4096 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
4097 {
4098 	int rv = -EIO;
4099 
4100 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
4101 		drbd_md_set_flag(mdev, MDF_FULL_SYNC);
4102 		drbd_md_sync(mdev);
4103 		drbd_bm_set_all(mdev);
4104 
4105 		rv = drbd_bm_write(mdev);
4106 
4107 		if (!rv) {
4108 			drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
4109 			drbd_md_sync(mdev);
4110 		}
4111 
4112 		put_ldev(mdev);
4113 	}
4114 
4115 	return rv;
4116 }
4117 
4118 /**
4119  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
4120  * @mdev:	DRBD device.
4121  *
4122  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
4123  */
4124 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
4125 {
4126 	int rv = -EIO;
4127 
4128 	drbd_resume_al(mdev);
4129 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
4130 		drbd_bm_clear_all(mdev);
4131 		rv = drbd_bm_write(mdev);
4132 		put_ldev(mdev);
4133 	}
4134 
4135 	return rv;
4136 }
4137 
4138 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
4139 {
4140 	struct bm_io_work *work = container_of(w, struct bm_io_work, w);
4141 	int rv = -EIO;
4142 
4143 	D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
4144 
4145 	if (get_ldev(mdev)) {
4146 		drbd_bm_lock(mdev, work->why, work->flags);
4147 		rv = work->io_fn(mdev);
4148 		drbd_bm_unlock(mdev);
4149 		put_ldev(mdev);
4150 	}
4151 
4152 	clear_bit(BITMAP_IO, &mdev->flags);
4153 	smp_mb__after_clear_bit();
4154 	wake_up(&mdev->misc_wait);
4155 
4156 	if (work->done)
4157 		work->done(mdev, rv);
4158 
4159 	clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
4160 	work->why = NULL;
4161 	work->flags = 0;
4162 
4163 	return 1;
4164 }
4165 
4166 void drbd_ldev_destroy(struct drbd_conf *mdev)
4167 {
4168 	lc_destroy(mdev->resync);
4169 	mdev->resync = NULL;
4170 	lc_destroy(mdev->act_log);
4171 	mdev->act_log = NULL;
4172 	__no_warn(local,
4173 		drbd_free_bc(mdev->ldev);
4174 		mdev->ldev = NULL;);
4175 
4176 	if (mdev->md_io_tmpp) {
4177 		__free_page(mdev->md_io_tmpp);
4178 		mdev->md_io_tmpp = NULL;
4179 	}
4180 	clear_bit(GO_DISKLESS, &mdev->flags);
4181 }
4182 
4183 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused)
4184 {
4185 	D_ASSERT(mdev->state.disk == D_FAILED);
4186 	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
4187 	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
4188 	 * the protected members anymore, though, so once local_cnt reaches zero
4189 	 * again (via put_ldev), it will be safe to free them. */
4190 	drbd_force_state(mdev, NS(disk, D_DISKLESS));
4191 	return 1;
4192 }
4193 
4194 void drbd_go_diskless(struct drbd_conf *mdev)
4195 {
4196 	D_ASSERT(mdev->state.disk == D_FAILED);
4197 	if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
4198 		drbd_queue_work(&mdev->data.work, &mdev->go_diskless);
4199 }
4200 
4201 /**
4202  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
4203  * @mdev:	DRBD device.
4204  * @io_fn:	IO callback to be called when bitmap IO is possible
4205  * @done:	callback to be called after the bitmap IO was performed
4206  * @why:	Descriptive text of the reason for doing the IO
4207  *
4208  * While IO on the bitmap happens we freeze application IO, thus ensuring
4209  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
4210  * called from worker context. It MUST NOT be used while a previous such
4211  * work is still pending!
4212  */
4213 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
4214 			  int (*io_fn)(struct drbd_conf *),
4215 			  void (*done)(struct drbd_conf *, int),
4216 			  char *why, enum bm_flag flags)
4217 {
4218 	D_ASSERT(current == mdev->worker.task);
4219 
4220 	D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
4221 	D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
4222 	D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
4223 	if (mdev->bm_io_work.why)
4224 		dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
4225 			why, mdev->bm_io_work.why);
4226 
4227 	mdev->bm_io_work.io_fn = io_fn;
4228 	mdev->bm_io_work.done = done;
4229 	mdev->bm_io_work.why = why;
4230 	mdev->bm_io_work.flags = flags;
4231 
4232 	spin_lock_irq(&mdev->req_lock);
4233 	set_bit(BITMAP_IO, &mdev->flags);
4234 	if (atomic_read(&mdev->ap_bio_cnt) == 0) {
4235 		if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
4236 			drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
4237 	}
4238 	spin_unlock_irq(&mdev->req_lock);
4239 }
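/*
 * A minimal usage sketch, assuming worker context and no bitmap work
 * already pending; the done callback, the descriptive string and the
 * flags value are illustrative, real callers pass their own:
 *
 *	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL,
 *			     "set_n_write sketch", BM_LOCKED_SET_ALLOWED);
 */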
4240 
4241 /**
4242  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
4243  * @mdev:	DRBD device.
4244  * @io_fn:	IO callback to be called when bitmap IO is possible
4245  * @why:	Descriptive text of the reason for doing the IO
4246  *
4247  * Freezes application IO while the actual IO operation runs. This
4248  * function MAY NOT be called from worker context.
4249  */
4250 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
4251 		char *why, enum bm_flag flags)
4252 {
4253 	int rv;
4254 
4255 	D_ASSERT(current != mdev->worker.task);
4256 
4257 	if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
4258 		drbd_suspend_io(mdev);
4259 
4260 	drbd_bm_lock(mdev, why, flags);
4261 	rv = io_fn(mdev);
4262 	drbd_bm_unlock(mdev);
4263 
4264 	if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
4265 		drbd_resume_io(mdev);
4266 
4267 	return rv;
4268 }
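/*
 * A minimal usage sketch from non-worker context; the descriptive string
 * and the flags value are illustrative, real callers pass their own:
 *
 *	int rv = drbd_bitmap_io(mdev, &drbd_bmio_set_n_write,
 *				"set_n_write sketch", BM_LOCKED_SET_ALLOWED);
 */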
4269 
4270 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
4271 {
4272 	if ((mdev->ldev->md.flags & flag) != flag) {
4273 		drbd_md_mark_dirty(mdev);
4274 		mdev->ldev->md.flags |= flag;
4275 	}
4276 }
4277 
4278 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
4279 {
4280 	if ((mdev->ldev->md.flags & flag) != 0) {
4281 		drbd_md_mark_dirty(mdev);
4282 		mdev->ldev->md.flags &= ~flag;
4283 	}
4284 }
4285 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
4286 {
4287 	return (bdev->md.flags & flag) != 0;
4288 }
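/*
 * These helpers are typically used as set/clear brackets around an
 * operation that must still be recognizable after a crash, as
 * drbd_bmio_set_n_write() above does with MDF_FULL_SYNC:
 *
 *	drbd_md_set_flag(mdev, MDF_FULL_SYNC);
 *	drbd_md_sync(mdev);
 *	... do the actual work ...
 *	drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
 *	drbd_md_sync(mdev);
 */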
4289 
4290 static void md_sync_timer_fn(unsigned long data)
4291 {
4292 	struct drbd_conf *mdev = (struct drbd_conf *) data;
4293 
4294 	drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
4295 }
4296 
4297 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
4298 {
4299 	dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
4300 #ifdef DEBUG
4301 	dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
4302 		mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
4303 #endif
4304 	drbd_md_sync(mdev);
4305 	return 1;
4306 }
4307 
4308 #ifdef CONFIG_DRBD_FAULT_INJECTION
4309 /* Fault insertion support including random number generator shamelessly
4310  * stolen from kernel/rcutorture.c */
4311 struct fault_random_state {
4312 	unsigned long state;
4313 	unsigned long count;
4314 };
4315 
4316 #define FAULT_RANDOM_MULT 39916801  /* prime */
4317 #define FAULT_RANDOM_ADD	479001701 /* prime */
4318 #define FAULT_RANDOM_REFRESH 10000
4319 
4320 /*
4321  * Crude but fast random-number generator.  Uses a linear congruential
4322  * generator, with occasional help from get_random_bytes().
4323  */
4324 static unsigned long
4325 _drbd_fault_random(struct fault_random_state *rsp)
4326 {
4327 	long refresh;
4328 
4329 	if (!rsp->count--) {
4330 		get_random_bytes(&refresh, sizeof(refresh));
4331 		rsp->state += refresh;
4332 		rsp->count = FAULT_RANDOM_REFRESH;
4333 	}
4334 	rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
4335 	return swahw32(rsp->state);
4336 }
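/*
 * The recurrence above is a classic truncated linear congruential
 * generator:
 *
 *	state[n+1] = state[n] * 39916801 + 479001701   (mod 2^BITS_PER_LONG)
 *
 * The swahw32() on the return value swaps the 16-bit halfwords of the
 * low 32 bits, so the weakest (low-order) bits of the LCG do not end up
 * in the low bits of the result.
 */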
4337 
4338 static char *
4339 _drbd_fault_str(unsigned int type) {
4340 	static char *_faults[] = {
4341 		[DRBD_FAULT_MD_WR] = "Meta-data write",
4342 		[DRBD_FAULT_MD_RD] = "Meta-data read",
4343 		[DRBD_FAULT_RS_WR] = "Resync write",
4344 		[DRBD_FAULT_RS_RD] = "Resync read",
4345 		[DRBD_FAULT_DT_WR] = "Data write",
4346 		[DRBD_FAULT_DT_RD] = "Data read",
4347 		[DRBD_FAULT_DT_RA] = "Data read ahead",
4348 		[DRBD_FAULT_BM_ALLOC] = "BM allocation",
4349 		[DRBD_FAULT_AL_EE] = "EE allocation",
4350 		[DRBD_FAULT_RECEIVE] = "receive data corruption",
4351 	};
4352 
4353 	return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
4354 }
4355 
4356 unsigned int
4357 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
4358 {
4359 	static struct fault_random_state rrs = {0, 0};
4360 
4361 	unsigned int ret = (
4362 		(fault_devs == 0 ||
4363 			((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
4364 		(((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
4365 
4366 	if (ret) {
4367 		fault_count++;
4368 
4369 		if (__ratelimit(&drbd_ratelimit_state))
4370 			dev_warn(DEV, "***Simulating %s failure\n",
4371 				_drbd_fault_str(type));
4372 	}
4373 
4374 	return ret;
4375 }
4376 #endif
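/*
 * Call sites reach _drbd_insert_fault() through the drbd_insert_fault()
 * wrapper (see drbd_int.h), which additionally checks fault_rate and the
 * enable_faults mask.  A fault site in an IO submission path then looks
 * roughly like this sketch (the surrounding bio handling is illustrative):
 *
 *	if (drbd_insert_fault(mdev, DRBD_FAULT_MD_WR))
 *		bio_endio(bio, -EIO);
 *	else
 *		generic_make_request(bio);
 */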
4377 
4378 const char *drbd_buildtag(void)
4379 {
4380 	/* DRBD built from external sources has a reference here to the
4381 	   git hash of the source code. */
4382 
4383 	static char buildtag[38] = "\0uilt-in";
4384 
4385 	if (buildtag[0] == 0) {
4386 #ifdef MODULE
4387 		sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
4388 #else
4389 		buildtag[0] = 'b';
4390 #endif
4391 	}
4392 
4393 	return buildtag;
4394 }
4395 
4396 module_init(drbd_init)
4397 module_exit(drbd_cleanup)
4398 
4399 EXPORT_SYMBOL(drbd_conn_str);
4400 EXPORT_SYMBOL(drbd_role_str);
4401 EXPORT_SYMBOL(drbd_disk_str);
4402 EXPORT_SYMBOL(drbd_set_st_err_str);
4403