xref: /openbmc/linux/drivers/block/drbd/drbd_main.c (revision 95e9fd10)
1 /*
2    drbd.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12 
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17 
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22 
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 
27  */
28 
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48 
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52 
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56 
57 #include "drbd_vli.h"
58 
59 struct after_state_chg_work {
60 	struct drbd_work w;
61 	union drbd_state os;
62 	union drbd_state ns;
63 	enum chg_state_flags flags;
64 	struct completion *done;
65 };
66 
67 static DEFINE_MUTEX(drbd_main_mutex);
68 int drbdd_init(struct drbd_thread *);
69 int drbd_worker(struct drbd_thread *);
70 int drbd_asender(struct drbd_thread *);
71 
72 int drbd_init(void);
73 static int drbd_open(struct block_device *bdev, fmode_t mode);
74 static int drbd_release(struct gendisk *gd, fmode_t mode);
75 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
76 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
77 			   union drbd_state ns, enum chg_state_flags flags);
78 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
79 static void md_sync_timer_fn(unsigned long data);
80 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
81 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused);
82 static void _tl_clear(struct drbd_conf *mdev);
83 
84 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
85 	      "Lars Ellenberg <lars@linbit.com>");
86 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
87 MODULE_VERSION(REL_VERSION);
88 MODULE_LICENSE("GPL");
89 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices ("
90 		 __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
91 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
92 
93 #include <linux/moduleparam.h>
94 /* allow_open_on_secondary */
95 MODULE_PARM_DESC(allow_oos, "DONT USE!");
96 /* thanks to these macros, if compiled into the kernel (not as a module),
97  * this becomes the boot parameter drbd.minor_count */
98 module_param(minor_count, uint, 0444);
99 module_param(disable_sendpage, bool, 0644);
100 module_param(allow_oos, bool, 0);
101 module_param(cn_idx, uint, 0444);
102 module_param(proc_details, int, 0644);
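/*
 * Usage sketch (illustrative, not from the original source): with the
 * module_param() lines above, these parameters can be set at load time,
 * or - when DRBD is built into the kernel - on the kernel command line
 * with the "drbd." prefix mentioned in the comment above, e.g.:
 *
 *	modprobe drbd minor_count=8 disable_sendpage=1
 *	drbd.minor_count=8	(kernel command line, built-in case)
 *
 * The particular values shown here are arbitrary examples.
 */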
103 
104 #ifdef CONFIG_DRBD_FAULT_INJECTION
105 int enable_faults;
106 int fault_rate;
107 static int fault_count;
108 int fault_devs;
109 /* bitmap of enabled faults */
110 module_param(enable_faults, int, 0664);
111 /* fault rate % value - applies to all enabled faults */
112 module_param(fault_rate, int, 0664);
113 /* count of faults inserted */
114 module_param(fault_count, int, 0664);
115 /* bitmap of devices to insert faults on */
116 module_param(fault_devs, int, 0644);
117 #endif
118 
119 /* module parameters, defined here */
120 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
121 bool disable_sendpage;
122 bool allow_oos;
123 unsigned int cn_idx = CN_IDX_DRBD;
124 int proc_details;       /* Detail level in /proc/drbd */
125 
126 /* Module parameter for setting the user mode helper program
127  * to run. Default is /sbin/drbdadm */
128 char usermode_helper[80] = "/sbin/drbdadm";
129 
130 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
131 
132 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
133  * as member "struct gendisk *vdisk;"
134  */
135 struct drbd_conf **minor_table;
136 
137 struct kmem_cache *drbd_request_cache;
138 struct kmem_cache *drbd_ee_cache;	/* epoch entries */
139 struct kmem_cache *drbd_bm_ext_cache;	/* bitmap extents */
140 struct kmem_cache *drbd_al_ext_cache;	/* activity log extents */
141 mempool_t *drbd_request_mempool;
142 mempool_t *drbd_ee_mempool;
143 mempool_t *drbd_md_io_page_pool;
144 struct bio_set *drbd_md_io_bio_set;
145 
146 /* I do not use a standard mempool, because:
147    1) I want to hand out the pre-allocated objects first.
148    2) I want to be able to interrupt sleeping allocation with a signal.
149    Note: This is a singly linked list; the next pointer is the private
150 	 member of struct page.
151  */
152 struct page *drbd_pp_pool;
153 spinlock_t   drbd_pp_lock;
154 int          drbd_pp_vacant;
155 wait_queue_head_t drbd_pp_wait;
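/*
 * Rough sketch of the list handling described above (illustrative only;
 * the real push/pop helpers live elsewhere in the driver). The "next"
 * pointer is stored in page->private, so, under drbd_pp_lock:
 *
 *	push:	set_page_private(page, (unsigned long)drbd_pp_pool);
 *		drbd_pp_pool = page;
 *		drbd_pp_vacant++;
 *
 *	pop:	page = drbd_pp_pool;
 *		drbd_pp_pool = (struct page *)page_private(page);
 *		drbd_pp_vacant--;
 */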
156 
157 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
158 
159 static const struct block_device_operations drbd_ops = {
160 	.owner =   THIS_MODULE,
161 	.open =    drbd_open,
162 	.release = drbd_release,
163 };
164 
165 static void bio_destructor_drbd(struct bio *bio)
166 {
167 	bio_free(bio, drbd_md_io_bio_set);
168 }
169 
170 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
171 {
172 	struct bio *bio;
173 
174 	if (!drbd_md_io_bio_set)
175 		return bio_alloc(gfp_mask, 1);
176 
177 	bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
178 	if (!bio)
179 		return NULL;
180 	bio->bi_destructor = bio_destructor_drbd;
181 	return bio;
182 }
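/*
 * Usage sketch (illustrative): a caller on the meta data IO path would
 * allocate its single-page bio through this helper, so that it comes
 * from drbd_md_io_bio_set when that bio set exists and from the generic
 * allocator otherwise:
 *
 *	struct bio *bio = bio_alloc_drbd(GFP_NOIO);
 *	if (!bio)
 *		return -ENOMEM;
 *
 * GFP_NOIO and the error handling are illustrative choices, not taken
 * from this file.
 */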
183 
184 #ifdef __CHECKER__
185 /* When checking with sparse, if this is an inline function, sparse will
186    give tons of false positives. When this is a real function, sparse works.
187  */
188 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
189 {
190 	int io_allowed;
191 
192 	atomic_inc(&mdev->local_cnt);
193 	io_allowed = (mdev->state.disk >= mins);
194 	if (!io_allowed) {
195 		if (atomic_dec_and_test(&mdev->local_cnt))
196 			wake_up(&mdev->misc_wait);
197 	}
198 	return io_allowed;
199 }
200 
201 #endif
202 
203 /**
204  * DOC: The transfer log
205  *
206  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
207  * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
208  * of the list. There is always at least one &struct drbd_tl_epoch object.
209  *
210  * Each &struct drbd_tl_epoch has a circular doubly linked list of requests
211  * attached.
212  */
213 static int tl_init(struct drbd_conf *mdev)
214 {
215 	struct drbd_tl_epoch *b;
216 
217 	/* during device minor initialization, we may well use GFP_KERNEL */
218 	b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
219 	if (!b)
220 		return 0;
221 	INIT_LIST_HEAD(&b->requests);
222 	INIT_LIST_HEAD(&b->w.list);
223 	b->next = NULL;
224 	b->br_number = 4711;
225 	b->n_writes = 0;
226 	b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
227 
228 	mdev->oldest_tle = b;
229 	mdev->newest_tle = b;
230 	INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
231 	INIT_LIST_HEAD(&mdev->barrier_acked_requests);
232 
233 	mdev->tl_hash = NULL;
234 	mdev->tl_hash_s = 0;
235 
236 	return 1;
237 }
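/*
 * Layout sketch of the transfer log right after tl_init() and after a
 * couple of barriers have been added (illustrative):
 *
 *	mdev->oldest_tle --> epoch 4711 --> epoch 4712 --> epoch 4713 --> NULL
 *	                                                       ^
 *	                                               mdev->newest_tle
 *
 * Each epoch carries its own circular list of write requests in
 * ->requests; 4711 is simply the start value used above, later numbers
 * come from _tl_add_barrier().
 */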
238 
239 static void tl_cleanup(struct drbd_conf *mdev)
240 {
241 	D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
242 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
243 	kfree(mdev->oldest_tle);
244 	mdev->oldest_tle = NULL;
245 	kfree(mdev->unused_spare_tle);
246 	mdev->unused_spare_tle = NULL;
247 	kfree(mdev->tl_hash);
248 	mdev->tl_hash = NULL;
249 	mdev->tl_hash_s = 0;
250 }
251 
252 /**
253  * _tl_add_barrier() - Adds a barrier to the transfer log
254  * @mdev:	DRBD device.
255  * @new:	Barrier to be added before the current head of the TL.
256  *
257  * The caller must hold the req_lock.
258  */
259 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
260 {
261 	struct drbd_tl_epoch *newest_before;
262 
263 	INIT_LIST_HEAD(&new->requests);
264 	INIT_LIST_HEAD(&new->w.list);
265 	new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
266 	new->next = NULL;
267 	new->n_writes = 0;
268 
269 	newest_before = mdev->newest_tle;
270 	new->br_number = newest_before->br_number+1;
271 	if (mdev->newest_tle != new) {
272 		mdev->newest_tle->next = new;
273 		mdev->newest_tle = new;
274 	}
275 }
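/*
 * Usage sketch (illustrative): a caller adds a freshly allocated epoch
 * while holding the request lock, roughly:
 *
 *	spin_lock_irq(&mdev->req_lock);
 *	_tl_add_barrier(mdev, b);
 *	spin_unlock_irq(&mdev->req_lock);
 *
 * where b was set up by the caller; see tl_release() below for the
 * in-tree recycling case.
 */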
276 
277 /**
278  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
279  * @mdev:	DRBD device.
280  * @barrier_nr:	Expected identifier of the DRBD write barrier packet.
281  * @set_size:	Expected number of requests before that barrier.
282  *
283  * In case the passed barrier_nr or set_size does not match the oldest
284  * &struct drbd_tl_epoch object, this function will cause a termination
285  * of the connection.
286  */
287 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
288 		       unsigned int set_size)
289 {
290 	struct drbd_tl_epoch *b, *nob; /* next old barrier */
291 	struct list_head *le, *tle;
292 	struct drbd_request *r;
293 
294 	spin_lock_irq(&mdev->req_lock);
295 
296 	b = mdev->oldest_tle;
297 
298 	/* first some paranoia code */
299 	if (b == NULL) {
300 		dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
301 			barrier_nr);
302 		goto bail;
303 	}
304 	if (b->br_number != barrier_nr) {
305 		dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
306 			barrier_nr, b->br_number);
307 		goto bail;
308 	}
309 	if (b->n_writes != set_size) {
310 		dev_err(DEV, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
311 			barrier_nr, set_size, b->n_writes);
312 		goto bail;
313 	}
314 
315 	/* Clean up list of requests processed during current epoch */
316 	list_for_each_safe(le, tle, &b->requests) {
317 		r = list_entry(le, struct drbd_request, tl_requests);
318 		_req_mod(r, barrier_acked);
319 	}
320 	/* There could be requests on the list waiting for completion
321 	   of the write to the local disk. To avoid corruption of the
322 	   slab's data structures we have to remove the list's head.
323 
324 	   Also there could have been a barrier ack out of sequence, overtaking
325 	   the write acks - which would be a bug and violate write ordering.
326 	   To not deadlock in case we lose connection while such requests are
327 	   still pending, we need some way to find them for the
328 	   _req_mod(connection_lost_while_pending).
329 
330 	   These have been list_move'd to the out_of_sequence_requests list in
331 	   _req_mod(, barrier_acked) above.
332 	   */
333 	list_splice_init(&b->requests, &mdev->barrier_acked_requests);
334 
335 	nob = b->next;
336 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
337 		_tl_add_barrier(mdev, b);
338 		if (nob)
339 			mdev->oldest_tle = nob;
340 		/* if nob == NULL b was the only barrier, and becomes the new
341 		   barrier. Therefore mdev->oldest_tle already points to b */
342 	} else {
343 		D_ASSERT(nob != NULL);
344 		mdev->oldest_tle = nob;
345 		kfree(b);
346 	}
347 
348 	spin_unlock_irq(&mdev->req_lock);
349 	dec_ap_pending(mdev);
350 
351 	return;
352 
353 bail:
354 	spin_unlock_irq(&mdev->req_lock);
355 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
356 }
357 
358 
359 /**
360  * _tl_restart() - Walks the transfer log, and applies an action to all requests
361  * @mdev:	DRBD device.
362  * @what:       The action/event to perform with all request objects
363  *
364  * @what might be one of connection_lost_while_pending, resend, fail_frozen_disk_io,
365  * restart_frozen_disk_io.
366  */
367 static void _tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
368 {
369 	struct drbd_tl_epoch *b, *tmp, **pn;
370 	struct list_head *le, *tle, carry_reads;
371 	struct drbd_request *req;
372 	int rv, n_writes, n_reads;
373 
374 	b = mdev->oldest_tle;
375 	pn = &mdev->oldest_tle;
376 	while (b) {
377 		n_writes = 0;
378 		n_reads = 0;
379 		INIT_LIST_HEAD(&carry_reads);
380 		list_for_each_safe(le, tle, &b->requests) {
381 			req = list_entry(le, struct drbd_request, tl_requests);
382 			rv = _req_mod(req, what);
383 
384 			n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
385 			n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
386 		}
387 		tmp = b->next;
388 
389 		if (n_writes) {
390 			if (what == resend) {
391 				b->n_writes = n_writes;
392 				if (b->w.cb == NULL) {
393 					b->w.cb = w_send_barrier;
394 					inc_ap_pending(mdev);
395 					set_bit(CREATE_BARRIER, &mdev->flags);
396 				}
397 
398 				drbd_queue_work(&mdev->data.work, &b->w);
399 			}
400 			pn = &b->next;
401 		} else {
402 			if (n_reads)
403 				list_add(&carry_reads, &b->requests);
404 			/* there could still be requests on that ring list,
405 			 * in case local io is still pending */
406 			list_del(&b->requests);
407 
408 			/* dec_ap_pending corresponding to queue_barrier.
409 			 * the newest barrier may not have been queued yet,
410 			 * in which case w.cb is still NULL. */
411 			if (b->w.cb != NULL)
412 				dec_ap_pending(mdev);
413 
414 			if (b == mdev->newest_tle) {
415 				/* recycle, but reinit! */
416 				D_ASSERT(tmp == NULL);
417 				INIT_LIST_HEAD(&b->requests);
418 				list_splice(&carry_reads, &b->requests);
419 				INIT_LIST_HEAD(&b->w.list);
420 				b->w.cb = NULL;
421 				b->br_number = net_random();
422 				b->n_writes = 0;
423 
424 				*pn = b;
425 				break;
426 			}
427 			*pn = tmp;
428 			kfree(b);
429 		}
430 		b = tmp;
431 		list_splice(&carry_reads, &b->requests);
432 	}
433 
434 	/* Actions operating on the disk state also want to work on
435 	   requests that got barrier acked. */
436 
437 	list_for_each_safe(le, tle, &mdev->barrier_acked_requests) {
438 		req = list_entry(le, struct drbd_request, tl_requests);
439 		_req_mod(req, what);
440 	}
441 }
442 
443 
444 /**
445  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
446  * @mdev:	DRBD device.
447  *
448  * This is called after the connection to the peer was lost. The storage covered
449  * by the requests on the transfer log gets marked as out of sync. Called from the
450  * receiver thread and the worker thread.
451  */
452 void tl_clear(struct drbd_conf *mdev)
453 {
454 	spin_lock_irq(&mdev->req_lock);
455 	_tl_clear(mdev);
456 	spin_unlock_irq(&mdev->req_lock);
457 }
458 
459 static void _tl_clear(struct drbd_conf *mdev)
460 {
461 	struct list_head *le, *tle;
462 	struct drbd_request *r;
463 
464 	_tl_restart(mdev, connection_lost_while_pending);
465 
466 	/* we expect this list to be empty. */
467 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
468 
469 	/* but just in case, clean it up anyway! */
470 	list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
471 		r = list_entry(le, struct drbd_request, tl_requests);
472 		/* It would be nice to complete outside of spinlock.
473 		 * But this is easier for now. */
474 		_req_mod(r, connection_lost_while_pending);
475 	}
476 
477 	/* ensure bit indicating barrier is required is clear */
478 	clear_bit(CREATE_BARRIER, &mdev->flags);
479 
480 	memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
481 
482 }
483 
484 void tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
485 {
486 	spin_lock_irq(&mdev->req_lock);
487 	_tl_restart(mdev, what);
488 	spin_unlock_irq(&mdev->req_lock);
489 }
490 
491 /**
492  * tl_abort_disk_io() - Abort disk I/O for all requests for a certain mdev in the TL
493  * @mdev:	DRBD device.
494  */
495 void tl_abort_disk_io(struct drbd_conf *mdev)
496 {
497 	struct drbd_tl_epoch *b;
498 	struct list_head *le, *tle;
499 	struct drbd_request *req;
500 
501 	spin_lock_irq(&mdev->req_lock);
502 	b = mdev->oldest_tle;
503 	while (b) {
504 		list_for_each_safe(le, tle, &b->requests) {
505 			req = list_entry(le, struct drbd_request, tl_requests);
506 			if (!(req->rq_state & RQ_LOCAL_PENDING))
507 				continue;
508 			_req_mod(req, abort_disk_io);
509 		}
510 		b = b->next;
511 	}
512 
513 	list_for_each_safe(le, tle, &mdev->barrier_acked_requests) {
514 		req = list_entry(le, struct drbd_request, tl_requests);
515 		if (!(req->rq_state & RQ_LOCAL_PENDING))
516 			continue;
517 		_req_mod(req, abort_disk_io);
518 	}
519 
520 	spin_unlock_irq(&mdev->req_lock);
521 }
522 
523 /**
524  * cl_wide_st_chg() - true if the state change is a cluster wide one
525  * @mdev:	DRBD device.
526  * @os:		old (current) state.
527  * @ns:		new (wanted) state.
528  */
529 static int cl_wide_st_chg(struct drbd_conf *mdev,
530 			  union drbd_state os, union drbd_state ns)
531 {
532 	return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
533 		 ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
534 		  (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
535 		  (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
536 		  (os.disk != D_FAILED && ns.disk == D_FAILED))) ||
537 		(os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
538 		(os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
539 }
540 
541 enum drbd_state_rv
542 drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
543 		  union drbd_state mask, union drbd_state val)
544 {
545 	unsigned long flags;
546 	union drbd_state os, ns;
547 	enum drbd_state_rv rv;
548 
549 	spin_lock_irqsave(&mdev->req_lock, flags);
550 	os = mdev->state;
551 	ns.i = (os.i & ~mask.i) | val.i;
552 	rv = _drbd_set_state(mdev, ns, f, NULL);
553 	ns = mdev->state;
554 	spin_unlock_irqrestore(&mdev->req_lock, flags);
555 
556 	return rv;
557 }
558 
559 /**
560  * drbd_force_state() - Impose a change which happens outside our control on our state
561  * @mdev:	DRBD device.
562  * @mask:	mask of state bits to change.
563  * @val:	value of new state bits.
564  */
565 void drbd_force_state(struct drbd_conf *mdev,
566 	union drbd_state mask, union drbd_state val)
567 {
568 	drbd_change_state(mdev, CS_HARD, mask, val);
569 }
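/*
 * Usage sketch (from elsewhere in this file): tl_release() uses this to
 * push the connection into a protocol error state when a barrier ack
 * does not match, bypassing the usual validity checks via CS_HARD:
 *
 *	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
 */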
570 
571 static enum drbd_state_rv is_valid_state(struct drbd_conf *, union drbd_state);
572 static enum drbd_state_rv is_valid_state_transition(struct drbd_conf *,
573 						    union drbd_state,
574 						    union drbd_state);
575 enum sanitize_state_warnings {
576 	NO_WARNING,
577 	ABORTED_ONLINE_VERIFY,
578 	ABORTED_RESYNC,
579 	CONNECTION_LOST_NEGOTIATING,
580 	IMPLICITLY_UPGRADED_DISK,
581 	IMPLICITLY_UPGRADED_PDSK,
582 };
583 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
584 				       union drbd_state ns, enum sanitize_state_warnings *warn);
585 int drbd_send_state_req(struct drbd_conf *,
586 			union drbd_state, union drbd_state);
587 
588 static enum drbd_state_rv
589 _req_st_cond(struct drbd_conf *mdev, union drbd_state mask,
590 	     union drbd_state val)
591 {
592 	union drbd_state os, ns;
593 	unsigned long flags;
594 	enum drbd_state_rv rv;
595 
596 	if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
597 		return SS_CW_SUCCESS;
598 
599 	if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
600 		return SS_CW_FAILED_BY_PEER;
601 
602 	rv = 0;
603 	spin_lock_irqsave(&mdev->req_lock, flags);
604 	os = mdev->state;
605 	ns.i = (os.i & ~mask.i) | val.i;
606 	ns = sanitize_state(mdev, os, ns, NULL);
607 
608 	if (!cl_wide_st_chg(mdev, os, ns))
609 		rv = SS_CW_NO_NEED;
610 	if (!rv) {
611 		rv = is_valid_state(mdev, ns);
612 		if (rv == SS_SUCCESS) {
613 			rv = is_valid_state_transition(mdev, ns, os);
614 			if (rv == SS_SUCCESS)
615 				rv = SS_UNKNOWN_ERROR; /* cont waiting, otherwise fail. */
616 		}
617 	}
618 	spin_unlock_irqrestore(&mdev->req_lock, flags);
619 
620 	return rv;
621 }
622 
623 /**
624  * drbd_req_state() - Perform a possibly cluster wide state change
625  * @mdev:	DRBD device.
626  * @mask:	mask of state bits to change.
627  * @val:	value of new state bits.
628  * @f:		flags
629  *
630  * Should not be called directly, use drbd_request_state() or
631  * _drbd_request_state().
632  */
633 static enum drbd_state_rv
634 drbd_req_state(struct drbd_conf *mdev, union drbd_state mask,
635 	       union drbd_state val, enum chg_state_flags f)
636 {
637 	struct completion done;
638 	unsigned long flags;
639 	union drbd_state os, ns;
640 	enum drbd_state_rv rv;
641 
642 	init_completion(&done);
643 
644 	if (f & CS_SERIALIZE)
645 		mutex_lock(&mdev->state_mutex);
646 
647 	spin_lock_irqsave(&mdev->req_lock, flags);
648 	os = mdev->state;
649 	ns.i = (os.i & ~mask.i) | val.i;
650 	ns = sanitize_state(mdev, os, ns, NULL);
651 
652 	if (cl_wide_st_chg(mdev, os, ns)) {
653 		rv = is_valid_state(mdev, ns);
654 		if (rv == SS_SUCCESS)
655 			rv = is_valid_state_transition(mdev, ns, os);
656 		spin_unlock_irqrestore(&mdev->req_lock, flags);
657 
658 		if (rv < SS_SUCCESS) {
659 			if (f & CS_VERBOSE)
660 				print_st_err(mdev, os, ns, rv);
661 			goto abort;
662 		}
663 
664 		drbd_state_lock(mdev);
665 		if (!drbd_send_state_req(mdev, mask, val)) {
666 			drbd_state_unlock(mdev);
667 			rv = SS_CW_FAILED_BY_PEER;
668 			if (f & CS_VERBOSE)
669 				print_st_err(mdev, os, ns, rv);
670 			goto abort;
671 		}
672 
673 		wait_event(mdev->state_wait,
674 			(rv = _req_st_cond(mdev, mask, val)));
675 
676 		if (rv < SS_SUCCESS) {
677 			drbd_state_unlock(mdev);
678 			if (f & CS_VERBOSE)
679 				print_st_err(mdev, os, ns, rv);
680 			goto abort;
681 		}
682 		spin_lock_irqsave(&mdev->req_lock, flags);
683 		os = mdev->state;
684 		ns.i = (os.i & ~mask.i) | val.i;
685 		rv = _drbd_set_state(mdev, ns, f, &done);
686 		drbd_state_unlock(mdev);
687 	} else {
688 		rv = _drbd_set_state(mdev, ns, f, &done);
689 	}
690 
691 	spin_unlock_irqrestore(&mdev->req_lock, flags);
692 
693 	if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
694 		D_ASSERT(current != mdev->worker.task);
695 		wait_for_completion(&done);
696 	}
697 
698 abort:
699 	if (f & CS_SERIALIZE)
700 		mutex_unlock(&mdev->state_mutex);
701 
702 	return rv;
703 }
704 
705 /**
706  * _drbd_request_state() - Request a state change (with flags)
707  * @mdev:	DRBD device.
708  * @mask:	mask of state bits to change.
709  * @val:	value of new state bits.
710  * @f:		flags
711  *
712  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
713  * flag, or when logging of failed state change requests is not desired.
714  */
715 enum drbd_state_rv
716 _drbd_request_state(struct drbd_conf *mdev, union drbd_state mask,
717 		    union drbd_state val, enum chg_state_flags f)
718 {
719 	enum drbd_state_rv rv;
720 
721 	wait_event(mdev->state_wait,
722 		   (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
723 
724 	return rv;
725 }
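/*
 * Usage sketch (as done by abw_start_sync() further down): request a
 * verbose state change and implicitly retry while the state is still in
 * transition:
 *
 *	_drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
 */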
726 
727 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
728 {
729 	dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
730 	    name,
731 	    drbd_conn_str(ns.conn),
732 	    drbd_role_str(ns.role),
733 	    drbd_role_str(ns.peer),
734 	    drbd_disk_str(ns.disk),
735 	    drbd_disk_str(ns.pdsk),
736 	    is_susp(ns) ? 's' : 'r',
737 	    ns.aftr_isp ? 'a' : '-',
738 	    ns.peer_isp ? 'p' : '-',
739 	    ns.user_isp ? 'u' : '-'
740 	    );
741 }
742 
743 void print_st_err(struct drbd_conf *mdev, union drbd_state os,
744 	          union drbd_state ns, enum drbd_state_rv err)
745 {
746 	if (err == SS_IN_TRANSIENT_STATE)
747 		return;
748 	dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
749 	print_st(mdev, " state", os);
750 	print_st(mdev, "wanted", ns);
751 }
752 
753 
754 /**
755  * is_valid_state() - Returns an SS_ error code if ns is not valid
756  * @mdev:	DRBD device.
757  * @ns:		State to consider.
758  */
759 static enum drbd_state_rv
760 is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
761 {
762 	/* See drbd_state_sw_errors in drbd_strings.c */
763 
764 	enum drbd_fencing_p fp;
765 	enum drbd_state_rv rv = SS_SUCCESS;
766 
767 	fp = FP_DONT_CARE;
768 	if (get_ldev(mdev)) {
769 		fp = mdev->ldev->dc.fencing;
770 		put_ldev(mdev);
771 	}
772 
773 	if (get_net_conf(mdev)) {
774 		if (!mdev->net_conf->two_primaries &&
775 		    ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
776 			rv = SS_TWO_PRIMARIES;
777 		put_net_conf(mdev);
778 	}
779 
780 	if (rv <= 0)
781 		/* already found a reason to abort */;
782 	else if (ns.role == R_SECONDARY && mdev->open_cnt)
783 		rv = SS_DEVICE_IN_USE;
784 
785 	else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
786 		rv = SS_NO_UP_TO_DATE_DISK;
787 
788 	else if (fp >= FP_RESOURCE &&
789 		 ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
790 		rv = SS_PRIMARY_NOP;
791 
792 	else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
793 		rv = SS_NO_UP_TO_DATE_DISK;
794 
795 	else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
796 		rv = SS_NO_LOCAL_DISK;
797 
798 	else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
799 		rv = SS_NO_REMOTE_DISK;
800 
801 	else if (ns.conn > C_CONNECTED && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE)
802 		rv = SS_NO_UP_TO_DATE_DISK;
803 
804 	else if ((ns.conn == C_CONNECTED ||
805 		  ns.conn == C_WF_BITMAP_S ||
806 		  ns.conn == C_SYNC_SOURCE ||
807 		  ns.conn == C_PAUSED_SYNC_S) &&
808 		  ns.disk == D_OUTDATED)
809 		rv = SS_CONNECTED_OUTDATES;
810 
811 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
812 		 (mdev->sync_conf.verify_alg[0] == 0))
813 		rv = SS_NO_VERIFY_ALG;
814 
815 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
816 		  mdev->agreed_pro_version < 88)
817 		rv = SS_NOT_SUPPORTED;
818 
819 	else if (ns.conn >= C_CONNECTED && ns.pdsk == D_UNKNOWN)
820 		rv = SS_CONNECTED_OUTDATES;
821 
822 	return rv;
823 }
824 
825 /**
826  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
827  * @mdev:	DRBD device.
828  * @ns:		new state.
829  * @os:		old state.
830  */
831 static enum drbd_state_rv
832 is_valid_state_transition(struct drbd_conf *mdev, union drbd_state ns,
833 			  union drbd_state os)
834 {
835 	enum drbd_state_rv rv = SS_SUCCESS;
836 
837 	if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
838 	    os.conn > C_CONNECTED)
839 		rv = SS_RESYNC_RUNNING;
840 
841 	if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
842 		rv = SS_ALREADY_STANDALONE;
843 
844 	if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
845 		rv = SS_IS_DISKLESS;
846 
847 	if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
848 		rv = SS_NO_NET_CONFIG;
849 
850 	if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
851 		rv = SS_LOWER_THAN_OUTDATED;
852 
853 	if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
854 		rv = SS_IN_TRANSIENT_STATE;
855 
856 	if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
857 		rv = SS_IN_TRANSIENT_STATE;
858 
859 	/* While establishing a connection, only allow cstate to change.
860 	   Delay/refuse role changes, detach, attach, etc... */
861 	if (test_bit(STATE_SENT, &mdev->flags) &&
862 	    !(os.conn == C_WF_REPORT_PARAMS ||
863 	      (ns.conn == C_WF_REPORT_PARAMS && os.conn == C_WF_CONNECTION)))
864 		rv = SS_IN_TRANSIENT_STATE;
865 
866 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
867 		rv = SS_NEED_CONNECTION;
868 
869 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
870 	    ns.conn != os.conn && os.conn > C_CONNECTED)
871 		rv = SS_RESYNC_RUNNING;
872 
873 	if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
874 	    os.conn < C_CONNECTED)
875 		rv = SS_NEED_CONNECTION;
876 
877 	if ((ns.conn == C_SYNC_TARGET || ns.conn == C_SYNC_SOURCE)
878 	    && os.conn < C_WF_REPORT_PARAMS)
879 		rv = SS_NEED_CONNECTION; /* No NetworkFailure -> SyncTarget etc... */
880 
881 	return rv;
882 }
883 
884 static void print_sanitize_warnings(struct drbd_conf *mdev, enum sanitize_state_warnings warn)
885 {
886 	static const char *msg_table[] = {
887 		[NO_WARNING] = "",
888 		[ABORTED_ONLINE_VERIFY] = "Online-verify aborted.",
889 		[ABORTED_RESYNC] = "Resync aborted.",
890 		[CONNECTION_LOST_NEGOTIATING] = "Connection lost while negotiating, no data!",
891 		[IMPLICITLY_UPGRADED_DISK] = "Implicitly upgraded disk",
892 		[IMPLICITLY_UPGRADED_PDSK] = "Implicitly upgraded pdsk",
893 	};
894 
895 	if (warn != NO_WARNING)
896 		dev_warn(DEV, "%s\n", msg_table[warn]);
897 }
898 
899 /**
900  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
901  * @mdev:	DRBD device.
902  * @os:		old state.
903  * @ns:		new state.
904  * @warn:	if not NULL, set to a warning for the caller to report
905  *
906  * When we lose the connection, we have to set the state of the peer's disk (pdsk)
907  * to D_UNKNOWN. This rule and many more along those lines are in this function.
908  */
909 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
910 				       union drbd_state ns, enum sanitize_state_warnings *warn)
911 {
912 	enum drbd_fencing_p fp;
913 	enum drbd_disk_state disk_min, disk_max, pdsk_min, pdsk_max;
914 
915 	if (warn)
916 		*warn = NO_WARNING;
917 
918 	fp = FP_DONT_CARE;
919 	if (get_ldev(mdev)) {
920 		fp = mdev->ldev->dc.fencing;
921 		put_ldev(mdev);
922 	}
923 
924 	/* Network errors must not affect a device whose network part is not configured */
925 	if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
926 	    os.conn <= C_DISCONNECTING)
927 		ns.conn = os.conn;
928 
929 	/* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow.
930 	 * If you try to go into some Sync* state, that shall fail (elsewhere). */
931 	if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
932 	    ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING && ns.conn <= C_CONNECTED)
933 		ns.conn = os.conn;
934 
935 	/* we cannot fail (again) if we already detached */
936 	if (ns.disk == D_FAILED && os.disk == D_DISKLESS)
937 		ns.disk = D_DISKLESS;
938 
939 	/* After C_DISCONNECTING only C_STANDALONE may follow */
940 	if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
941 		ns.conn = os.conn;
942 
943 	if (ns.conn < C_CONNECTED) {
944 		ns.peer_isp = 0;
945 		ns.peer = R_UNKNOWN;
946 		if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
947 			ns.pdsk = D_UNKNOWN;
948 	}
949 
950 	/* Clear the aftr_isp when becoming unconfigured */
951 	if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
952 		ns.aftr_isp = 0;
953 
954 	/* Abort resync if a disk fails/detaches */
955 	if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
956 	    (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
957 		if (warn)
958 			*warn =	os.conn == C_VERIFY_S || os.conn == C_VERIFY_T ?
959 				ABORTED_ONLINE_VERIFY : ABORTED_RESYNC;
960 		ns.conn = C_CONNECTED;
961 	}
962 
963 	/* Connection breaks down before we finished "Negotiating" */
964 	if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
965 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
966 		if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
967 			ns.disk = mdev->new_state_tmp.disk;
968 			ns.pdsk = mdev->new_state_tmp.pdsk;
969 		} else {
970 			if (warn)
971 				*warn = CONNECTION_LOST_NEGOTIATING;
972 			ns.disk = D_DISKLESS;
973 			ns.pdsk = D_UNKNOWN;
974 		}
975 		put_ldev(mdev);
976 	}
977 
978 	/* D_CONSISTENT and D_OUTDATED vanish when we get connected */
979 	if (ns.conn >= C_CONNECTED && ns.conn < C_AHEAD) {
980 		if (ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED)
981 			ns.disk = D_UP_TO_DATE;
982 		if (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)
983 			ns.pdsk = D_UP_TO_DATE;
984 	}
985 
986 	/* Implications of the connection state on the disk states */
987 	disk_min = D_DISKLESS;
988 	disk_max = D_UP_TO_DATE;
989 	pdsk_min = D_INCONSISTENT;
990 	pdsk_max = D_UNKNOWN;
991 	switch ((enum drbd_conns)ns.conn) {
992 	case C_WF_BITMAP_T:
993 	case C_PAUSED_SYNC_T:
994 	case C_STARTING_SYNC_T:
995 	case C_WF_SYNC_UUID:
996 	case C_BEHIND:
997 		disk_min = D_INCONSISTENT;
998 		disk_max = D_OUTDATED;
999 		pdsk_min = D_UP_TO_DATE;
1000 		pdsk_max = D_UP_TO_DATE;
1001 		break;
1002 	case C_VERIFY_S:
1003 	case C_VERIFY_T:
1004 		disk_min = D_UP_TO_DATE;
1005 		disk_max = D_UP_TO_DATE;
1006 		pdsk_min = D_UP_TO_DATE;
1007 		pdsk_max = D_UP_TO_DATE;
1008 		break;
1009 	case C_CONNECTED:
1010 		disk_min = D_DISKLESS;
1011 		disk_max = D_UP_TO_DATE;
1012 		pdsk_min = D_DISKLESS;
1013 		pdsk_max = D_UP_TO_DATE;
1014 		break;
1015 	case C_WF_BITMAP_S:
1016 	case C_PAUSED_SYNC_S:
1017 	case C_STARTING_SYNC_S:
1018 	case C_AHEAD:
1019 		disk_min = D_UP_TO_DATE;
1020 		disk_max = D_UP_TO_DATE;
1021 		pdsk_min = D_INCONSISTENT;
1022 		pdsk_max = D_CONSISTENT; /* D_OUTDATED would be nice. But explicit outdate necessary */
1023 		break;
1024 	case C_SYNC_TARGET:
1025 		disk_min = D_INCONSISTENT;
1026 		disk_max = D_INCONSISTENT;
1027 		pdsk_min = D_UP_TO_DATE;
1028 		pdsk_max = D_UP_TO_DATE;
1029 		break;
1030 	case C_SYNC_SOURCE:
1031 		disk_min = D_UP_TO_DATE;
1032 		disk_max = D_UP_TO_DATE;
1033 		pdsk_min = D_INCONSISTENT;
1034 		pdsk_max = D_INCONSISTENT;
1035 		break;
1036 	case C_STANDALONE:
1037 	case C_DISCONNECTING:
1038 	case C_UNCONNECTED:
1039 	case C_TIMEOUT:
1040 	case C_BROKEN_PIPE:
1041 	case C_NETWORK_FAILURE:
1042 	case C_PROTOCOL_ERROR:
1043 	case C_TEAR_DOWN:
1044 	case C_WF_CONNECTION:
1045 	case C_WF_REPORT_PARAMS:
1046 	case C_MASK:
1047 		break;
1048 	}
1049 	if (ns.disk > disk_max)
1050 		ns.disk = disk_max;
1051 
1052 	if (ns.disk < disk_min) {
1053 		if (warn)
1054 			*warn = IMPLICITLY_UPGRADED_DISK;
1055 		ns.disk = disk_min;
1056 	}
1057 	if (ns.pdsk > pdsk_max)
1058 		ns.pdsk = pdsk_max;
1059 
1060 	if (ns.pdsk < pdsk_min) {
1061 		if (warn)
1062 			*warn = IMPLICITLY_UPGRADED_PDSK;
1063 		ns.pdsk = pdsk_min;
1064 	}
1065 
1066 	if (fp == FP_STONITH &&
1067 	    (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
1068 	    !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
1069 		ns.susp_fen = 1; /* Suspend IO while fence-peer handler runs (peer lost) */
1070 
1071 	if (mdev->sync_conf.on_no_data == OND_SUSPEND_IO &&
1072 	    (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE) &&
1073 	    !(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE))
1074 		ns.susp_nod = 1; /* Suspend IO while no accessible data is available */
1075 
1076 	if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
1077 		if (ns.conn == C_SYNC_SOURCE)
1078 			ns.conn = C_PAUSED_SYNC_S;
1079 		if (ns.conn == C_SYNC_TARGET)
1080 			ns.conn = C_PAUSED_SYNC_T;
1081 	} else {
1082 		if (ns.conn == C_PAUSED_SYNC_S)
1083 			ns.conn = C_SYNC_SOURCE;
1084 		if (ns.conn == C_PAUSED_SYNC_T)
1085 			ns.conn = C_SYNC_TARGET;
1086 	}
1087 
1088 	return ns;
1089 }
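/*
 * Worked example of the rules above (illustrative): if the connection
 * breaks while the peer's disk was known good, e.g.
 *
 *	os = { .conn = C_CONNECTED,       .pdsk = D_UP_TO_DATE, ... }
 *	ns = { .conn = C_NETWORK_FAILURE, .pdsk = D_UP_TO_DATE, ... }
 *
 * then sanitize_state() sets ns.peer = R_UNKNOWN and ns.pdsk = D_UNKNOWN,
 * because with ns.conn < C_CONNECTED we can no longer know the peer's
 * disk state.
 */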
1090 
1091 /* helper for __drbd_set_state */
1092 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
1093 {
1094 	if (mdev->agreed_pro_version < 90)
1095 		mdev->ov_start_sector = 0;
1096 	mdev->rs_total = drbd_bm_bits(mdev);
1097 	mdev->ov_position = 0;
1098 	if (cs == C_VERIFY_T) {
1099 		/* starting online verify from an arbitrary position
1100 		 * does not fit well into the existing protocol.
1101 		 * on C_VERIFY_T, we initialize ov_left and friends
1102 		 * implicitly in receive_DataRequest once the
1103 		 * first P_OV_REQUEST is received */
1104 		mdev->ov_start_sector = ~(sector_t)0;
1105 	} else {
1106 		unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
1107 		if (bit >= mdev->rs_total) {
1108 			mdev->ov_start_sector =
1109 				BM_BIT_TO_SECT(mdev->rs_total - 1);
1110 			mdev->rs_total = 1;
1111 		} else
1112 			mdev->rs_total -= bit;
1113 		mdev->ov_position = mdev->ov_start_sector;
1114 	}
1115 	mdev->ov_left = mdev->rs_total;
1116 }
1117 
1118 static void drbd_resume_al(struct drbd_conf *mdev)
1119 {
1120 	if (test_and_clear_bit(AL_SUSPENDED, &mdev->flags))
1121 		dev_info(DEV, "Resumed AL updates\n");
1122 }
1123 
1124 /**
1125  * __drbd_set_state() - Set a new DRBD state
1126  * @mdev:	DRBD device.
1127  * @ns:		new state.
1128  * @flags:	Flags
1129  * @done:	Optional completion, that will get completed after the after_state_ch() finished
1130  *
1131  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
1132  */
1133 enum drbd_state_rv
1134 __drbd_set_state(struct drbd_conf *mdev, union drbd_state ns,
1135 	         enum chg_state_flags flags, struct completion *done)
1136 {
1137 	union drbd_state os;
1138 	enum drbd_state_rv rv = SS_SUCCESS;
1139 	enum sanitize_state_warnings ssw;
1140 	struct after_state_chg_work *ascw;
1141 
1142 	os = mdev->state;
1143 
1144 	ns = sanitize_state(mdev, os, ns, &ssw);
1145 
1146 	if (ns.i == os.i)
1147 		return SS_NOTHING_TO_DO;
1148 
1149 	if (!(flags & CS_HARD)) {
1150 		/* pre-state-change checks; only look at ns */
1151 		/* See drbd_state_sw_errors in drbd_strings.c */
1152 
1153 		rv = is_valid_state(mdev, ns);
1154 		if (rv < SS_SUCCESS) {
1155 			/* If the old state was illegal as well, then let
1156 			   this happen...*/
1157 
1158 			if (is_valid_state(mdev, os) == rv)
1159 				rv = is_valid_state_transition(mdev, ns, os);
1160 		} else
1161 			rv = is_valid_state_transition(mdev, ns, os);
1162 	}
1163 
1164 	if (rv < SS_SUCCESS) {
1165 		if (flags & CS_VERBOSE)
1166 			print_st_err(mdev, os, ns, rv);
1167 		return rv;
1168 	}
1169 
1170 	print_sanitize_warnings(mdev, ssw);
1171 
1172 	{
1173 	char *pbp, pb[300];
1174 	pbp = pb;
1175 	*pbp = 0;
1176 	if (ns.role != os.role)
1177 		pbp += sprintf(pbp, "role( %s -> %s ) ",
1178 			       drbd_role_str(os.role),
1179 			       drbd_role_str(ns.role));
1180 	if (ns.peer != os.peer)
1181 		pbp += sprintf(pbp, "peer( %s -> %s ) ",
1182 			       drbd_role_str(os.peer),
1183 			       drbd_role_str(ns.peer));
1184 	if (ns.conn != os.conn)
1185 		pbp += sprintf(pbp, "conn( %s -> %s ) ",
1186 			       drbd_conn_str(os.conn),
1187 			       drbd_conn_str(ns.conn));
1188 	if (ns.disk != os.disk)
1189 		pbp += sprintf(pbp, "disk( %s -> %s ) ",
1190 			       drbd_disk_str(os.disk),
1191 			       drbd_disk_str(ns.disk));
1192 	if (ns.pdsk != os.pdsk)
1193 		pbp += sprintf(pbp, "pdsk( %s -> %s ) ",
1194 			       drbd_disk_str(os.pdsk),
1195 			       drbd_disk_str(ns.pdsk));
1196 	if (is_susp(ns) != is_susp(os))
1197 		pbp += sprintf(pbp, "susp( %d -> %d ) ",
1198 			       is_susp(os),
1199 			       is_susp(ns));
1200 	if (ns.aftr_isp != os.aftr_isp)
1201 		pbp += sprintf(pbp, "aftr_isp( %d -> %d ) ",
1202 			       os.aftr_isp,
1203 			       ns.aftr_isp);
1204 	if (ns.peer_isp != os.peer_isp)
1205 		pbp += sprintf(pbp, "peer_isp( %d -> %d ) ",
1206 			       os.peer_isp,
1207 			       ns.peer_isp);
1208 	if (ns.user_isp != os.user_isp)
1209 		pbp += sprintf(pbp, "user_isp( %d -> %d ) ",
1210 			       os.user_isp,
1211 			       ns.user_isp);
1212 	dev_info(DEV, "%s\n", pb);
1213 	}
1214 
1215 	/* solve the race between becoming unconfigured,
1216 	 * worker doing the cleanup, and
1217 	 * admin reconfiguring us:
1218 	 * on (re)configure, first set CONFIG_PENDING,
1219 	 * then wait for a potentially exiting worker,
1220 	 * start the worker, and schedule one no_op.
1221 	 * then proceed with configuration.
1222 	 */
1223 	if (ns.disk == D_DISKLESS &&
1224 	    ns.conn == C_STANDALONE &&
1225 	    ns.role == R_SECONDARY &&
1226 	    !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
1227 		set_bit(DEVICE_DYING, &mdev->flags);
1228 
1229 	/* if we are going -> D_FAILED or D_DISKLESS, grab one extra reference
1230 	 * on the ldev here, to be sure the transition -> D_DISKLESS resp.
1231 	 * drbd_ldev_destroy() won't happen before our corresponding
1232 	 * after_state_ch works run, where we put_ldev again. */
1233 	if ((os.disk != D_FAILED && ns.disk == D_FAILED) ||
1234 	    (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))
1235 		atomic_inc(&mdev->local_cnt);
1236 
1237 	mdev->state = ns;
1238 
1239 	if (os.disk == D_ATTACHING && ns.disk >= D_NEGOTIATING)
1240 		drbd_print_uuids(mdev, "attached to UUIDs");
1241 
1242 	wake_up(&mdev->misc_wait);
1243 	wake_up(&mdev->state_wait);
1244 
1245 	/* aborted verify run. log the last position */
1246 	if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1247 	    ns.conn < C_CONNECTED) {
1248 		mdev->ov_start_sector =
1249 			BM_BIT_TO_SECT(drbd_bm_bits(mdev) - mdev->ov_left);
1250 		dev_info(DEV, "Online Verify reached sector %llu\n",
1251 			(unsigned long long)mdev->ov_start_sector);
1252 	}
1253 
1254 	if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1255 	    (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1256 		dev_info(DEV, "Syncer continues.\n");
1257 		mdev->rs_paused += (long)jiffies
1258 				  -(long)mdev->rs_mark_time[mdev->rs_last_mark];
1259 		if (ns.conn == C_SYNC_TARGET)
1260 			mod_timer(&mdev->resync_timer, jiffies);
1261 	}
1262 
1263 	if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1264 	    (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1265 		dev_info(DEV, "Resync suspended\n");
1266 		mdev->rs_mark_time[mdev->rs_last_mark] = jiffies;
1267 	}
1268 
1269 	if (os.conn == C_CONNECTED &&
1270 	    (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1271 		unsigned long now = jiffies;
1272 		int i;
1273 
1274 		set_ov_position(mdev, ns.conn);
1275 		mdev->rs_start = now;
1276 		mdev->rs_last_events = 0;
1277 		mdev->rs_last_sect_ev = 0;
1278 		mdev->ov_last_oos_size = 0;
1279 		mdev->ov_last_oos_start = 0;
1280 
1281 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1282 			mdev->rs_mark_left[i] = mdev->ov_left;
1283 			mdev->rs_mark_time[i] = now;
1284 		}
1285 
1286 		drbd_rs_controller_reset(mdev);
1287 
1288 		if (ns.conn == C_VERIFY_S) {
1289 			dev_info(DEV, "Starting Online Verify from sector %llu\n",
1290 					(unsigned long long)mdev->ov_position);
1291 			mod_timer(&mdev->resync_timer, jiffies);
1292 		}
1293 	}
1294 
1295 	if (get_ldev(mdev)) {
1296 		u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1297 						 MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1298 						 MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1299 
1300 		if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1301 			mdf |= MDF_CRASHED_PRIMARY;
1302 		if (mdev->state.role == R_PRIMARY ||
1303 		    (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1304 			mdf |= MDF_PRIMARY_IND;
1305 		if (mdev->state.conn > C_WF_REPORT_PARAMS)
1306 			mdf |= MDF_CONNECTED_IND;
1307 		if (mdev->state.disk > D_INCONSISTENT)
1308 			mdf |= MDF_CONSISTENT;
1309 		if (mdev->state.disk > D_OUTDATED)
1310 			mdf |= MDF_WAS_UP_TO_DATE;
1311 		if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1312 			mdf |= MDF_PEER_OUT_DATED;
1313 		if (mdf != mdev->ldev->md.flags) {
1314 			mdev->ldev->md.flags = mdf;
1315 			drbd_md_mark_dirty(mdev);
1316 		}
1317 		if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1318 			drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1319 		put_ldev(mdev);
1320 	}
1321 
1322 	/* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider resyncing */
1323 	if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1324 	    os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1325 		set_bit(CONSIDER_RESYNC, &mdev->flags);
1326 
1327 	/* Receiver should clean up itself */
1328 	if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1329 		drbd_thread_stop_nowait(&mdev->receiver);
1330 
1331 	/* Now the receiver finished cleaning up itself, it should die */
1332 	if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1333 		drbd_thread_stop_nowait(&mdev->receiver);
1334 
1335 	/* Upon network failure, we need to restart the receiver. */
1336 	if (os.conn > C_WF_CONNECTION &&
1337 	    ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1338 		drbd_thread_restart_nowait(&mdev->receiver);
1339 
1340 	/* Resume AL writing if we get a connection */
1341 	if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
1342 		drbd_resume_al(mdev);
1343 
1344 	/* remember last connect and attach times so request_timer_fn() won't
1345 	 * kill newly established sessions while we are still trying to thaw
1346 	 * previously frozen IO */
1347 	if (os.conn != C_WF_REPORT_PARAMS && ns.conn == C_WF_REPORT_PARAMS)
1348 		mdev->last_reconnect_jif = jiffies;
1349 	if ((os.disk == D_ATTACHING || os.disk == D_NEGOTIATING) &&
1350 	    ns.disk > D_NEGOTIATING)
1351 		mdev->last_reattach_jif = jiffies;
1352 
1353 	ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1354 	if (ascw) {
1355 		ascw->os = os;
1356 		ascw->ns = ns;
1357 		ascw->flags = flags;
1358 		ascw->w.cb = w_after_state_ch;
1359 		ascw->done = done;
1360 		drbd_queue_work(&mdev->data.work, &ascw->w);
1361 	} else {
1362 		dev_warn(DEV, "Could not kmalloc an ascw\n");
1363 	}
1364 
1365 	return rv;
1366 }
1367 
1368 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1369 {
1370 	struct after_state_chg_work *ascw =
1371 		container_of(w, struct after_state_chg_work, w);
1372 	after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1373 	if (ascw->flags & CS_WAIT_COMPLETE) {
1374 		D_ASSERT(ascw->done != NULL);
1375 		complete(ascw->done);
1376 	}
1377 	kfree(ascw);
1378 
1379 	return 1;
1380 }
1381 
1382 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1383 {
1384 	if (rv) {
1385 		dev_err(DEV, "Writing the bitmap failed not starting resync.\n");
1386 		_drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1387 		return;
1388 	}
1389 
1390 	switch (mdev->state.conn) {
1391 	case C_STARTING_SYNC_T:
1392 		_drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1393 		break;
1394 	case C_STARTING_SYNC_S:
1395 		drbd_start_resync(mdev, C_SYNC_SOURCE);
1396 		break;
1397 	}
1398 }
1399 
1400 int drbd_bitmap_io_from_worker(struct drbd_conf *mdev,
1401 		int (*io_fn)(struct drbd_conf *),
1402 		char *why, enum bm_flag flags)
1403 {
1404 	int rv;
1405 
1406 	D_ASSERT(current == mdev->worker.task);
1407 
1408 	/* open coded non-blocking drbd_suspend_io(mdev); */
1409 	set_bit(SUSPEND_IO, &mdev->flags);
1410 
1411 	drbd_bm_lock(mdev, why, flags);
1412 	rv = io_fn(mdev);
1413 	drbd_bm_unlock(mdev);
1414 
1415 	drbd_resume_io(mdev);
1416 
1417 	return rv;
1418 }
1419 
1420 /**
1421  * after_state_ch() - Perform after state change actions that may sleep
1422  * @mdev:	DRBD device.
1423  * @os:		old state.
1424  * @ns:		new state.
1425  * @flags:	Flags
1426  */
1427 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1428 			   union drbd_state ns, enum chg_state_flags flags)
1429 {
1430 	enum drbd_fencing_p fp;
1431 	enum drbd_req_event what = nothing;
1432 	union drbd_state nsm = (union drbd_state){ .i = -1 };
1433 
1434 	if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1435 		clear_bit(CRASHED_PRIMARY, &mdev->flags);
1436 		if (mdev->p_uuid)
1437 			mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1438 	}
1439 
1440 	fp = FP_DONT_CARE;
1441 	if (get_ldev(mdev)) {
1442 		fp = mdev->ldev->dc.fencing;
1443 		put_ldev(mdev);
1444 	}
1445 
1446 	/* Inform userspace about the change... */
1447 	drbd_bcast_state(mdev, ns);
1448 
1449 	if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1450 	    (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1451 		drbd_khelper(mdev, "pri-on-incon-degr");
1452 
1453 	/* Here we have the actions that are performed after a
1454 	   state change. This function might sleep */
1455 
1456 	if (os.disk <= D_NEGOTIATING && ns.disk > D_NEGOTIATING)
1457 		mod_timer(&mdev->request_timer, jiffies + HZ);
1458 
1459 	nsm.i = -1;
1460 	if (ns.susp_nod) {
1461 		if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
1462 			what = resend;
1463 
1464 		if ((os.disk == D_ATTACHING || os.disk == D_NEGOTIATING) &&
1465 		    ns.disk > D_NEGOTIATING)
1466 			what = restart_frozen_disk_io;
1467 
1468 		if (what != nothing)
1469 			nsm.susp_nod = 0;
1470 	}
1471 
1472 	if (ns.susp_fen) {
1473 		/* case1: The outdate peer handler is successful: */
1474 		if (os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) {
1475 			if (test_bit(NEW_CUR_UUID, &mdev->flags)) {
1476 				drbd_uuid_new_current(mdev);
1477 				clear_bit(NEW_CUR_UUID, &mdev->flags);
1478 			}
1479 			spin_lock_irq(&mdev->req_lock);
1480 			_tl_clear(mdev);
1481 			_drbd_set_state(_NS(mdev, susp_fen, 0), CS_VERBOSE, NULL);
1482 			spin_unlock_irq(&mdev->req_lock);
1483 		}
1484 		/* case2: The connection was established again: */
1485 		if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
1486 			clear_bit(NEW_CUR_UUID, &mdev->flags);
1487 			what = resend;
1488 			nsm.susp_fen = 0;
1489 		}
1490 	}
1491 
1492 	if (what != nothing) {
1493 		spin_lock_irq(&mdev->req_lock);
1494 		_tl_restart(mdev, what);
1495 		nsm.i &= mdev->state.i;
1496 		_drbd_set_state(mdev, nsm, CS_VERBOSE, NULL);
1497 		spin_unlock_irq(&mdev->req_lock);
1498 	}
1499 
1500 	/* Became sync source.  With protocol >= 96, we still need to send out
1501 	 * the sync uuid now. Need to do that before any drbd_send_state, or
1502 	 * the other side may go "paused sync" before receiving the sync uuids,
1503 	 * which is unexpected. */
1504 	if ((os.conn != C_SYNC_SOURCE && os.conn != C_PAUSED_SYNC_S) &&
1505 	    (ns.conn == C_SYNC_SOURCE || ns.conn == C_PAUSED_SYNC_S) &&
1506 	    mdev->agreed_pro_version >= 96 && get_ldev(mdev)) {
1507 		drbd_gen_and_send_sync_uuid(mdev);
1508 		put_ldev(mdev);
1509 	}
1510 
1511 	/* Do not change the order of the if above and the two below... */
1512 	if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1513 		/* we probably will start a resync soon.
1514 		 * make sure those things are properly reset. */
1515 		mdev->rs_total = 0;
1516 		mdev->rs_failed = 0;
1517 		atomic_set(&mdev->rs_pending_cnt, 0);
1518 		drbd_rs_cancel_all(mdev);
1519 
1520 		drbd_send_uuids(mdev);
1521 		drbd_send_state(mdev, ns);
1522 	}
1523 	/* No point in queuing send_bitmap if we don't have a connection
1524 	 * anymore, so check also the _current_ state, not only the new state
1525 	 * at the time this work was queued. */
1526 	if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S &&
1527 	    mdev->state.conn == C_WF_BITMAP_S)
1528 		drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL,
1529 				"send_bitmap (WFBitMapS)",
1530 				BM_LOCKED_TEST_ALLOWED);
1531 
1532 	/* Lost contact to peer's copy of the data */
1533 	if ((os.pdsk >= D_INCONSISTENT &&
1534 	     os.pdsk != D_UNKNOWN &&
1535 	     os.pdsk != D_OUTDATED)
1536 	&&  (ns.pdsk < D_INCONSISTENT ||
1537 	     ns.pdsk == D_UNKNOWN ||
1538 	     ns.pdsk == D_OUTDATED)) {
1539 		if (get_ldev(mdev)) {
1540 			if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1541 			    mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1542 				if (is_susp(mdev->state)) {
1543 					set_bit(NEW_CUR_UUID, &mdev->flags);
1544 				} else {
1545 					drbd_uuid_new_current(mdev);
1546 					drbd_send_uuids(mdev);
1547 				}
1548 			}
1549 			put_ldev(mdev);
1550 		}
1551 	}
1552 
1553 	if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1554 		if (os.peer == R_SECONDARY && ns.peer == R_PRIMARY &&
1555 		    mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1556 			drbd_uuid_new_current(mdev);
1557 			drbd_send_uuids(mdev);
1558 		}
1559 		/* D_DISKLESS Peer becomes secondary */
1560 		if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1561 			/* We may still be Primary ourselves.
1562 			 * No harm done if the bitmap still changes,
1563 			 * redirtied pages will follow later. */
1564 			drbd_bitmap_io_from_worker(mdev, &drbd_bm_write,
1565 				"demote diskless peer", BM_LOCKED_SET_ALLOWED);
1566 		put_ldev(mdev);
1567 	}
1568 
1569 	/* Write out all changed bits on demote.
1570 	 * Though, there is no need to do that just yet
1571 	 * if a resync is still going on */
1572 	if (os.role == R_PRIMARY && ns.role == R_SECONDARY &&
1573 		mdev->state.conn <= C_CONNECTED && get_ldev(mdev)) {
1574 		/* No changes to the bitmap expected this time, so assert that,
1575 		 * even though no harm was done if it did change. */
1576 		drbd_bitmap_io_from_worker(mdev, &drbd_bm_write,
1577 				"demote", BM_LOCKED_TEST_ALLOWED);
1578 		put_ldev(mdev);
1579 	}
1580 
1581 	/* Last part of the attaching process ... */
1582 	if (ns.conn >= C_CONNECTED &&
1583 	    os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1584 		drbd_send_sizes(mdev, 0, 0);  /* to start sync... */
1585 		drbd_send_uuids(mdev);
1586 		drbd_send_state(mdev, ns);
1587 	}
1588 
1589 	/* We want to pause/continue resync, tell peer. */
1590 	if (ns.conn >= C_CONNECTED &&
1591 	     ((os.aftr_isp != ns.aftr_isp) ||
1592 	      (os.user_isp != ns.user_isp)))
1593 		drbd_send_state(mdev, ns);
1594 
1595 	/* In case one of the isp bits got set, suspend other devices. */
1596 	if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1597 	    (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1598 		suspend_other_sg(mdev);
1599 
1600 	/* Make sure the peer gets informed about any state
1601 	   changes (ISP bits) while we were in WFReportParams. */
1602 	if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1603 		drbd_send_state(mdev, ns);
1604 
1605 	if (os.conn != C_AHEAD && ns.conn == C_AHEAD)
1606 		drbd_send_state(mdev, ns);
1607 
1608 	/* We are in the process of starting a full sync... */
1609 	if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1610 	    (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1611 		/* no other bitmap changes expected during this phase */
1612 		drbd_queue_bitmap_io(mdev,
1613 			&drbd_bmio_set_n_write, &abw_start_sync,
1614 			"set_n_write from StartingSync", BM_LOCKED_TEST_ALLOWED);
1615 
1616 	/* We are invalidating ourselves... */
1617 	if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1618 	    os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1619 		/* other bitmap operation expected during this phase */
1620 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL,
1621 			"set_n_write from invalidate", BM_LOCKED_MASK);
1622 
1623 	/* first half of local IO error, failure to attach,
1624 	 * or administrative detach */
1625 	if (os.disk != D_FAILED && ns.disk == D_FAILED) {
1626 		enum drbd_io_error_p eh = EP_PASS_ON;
1627 		int was_io_error = 0;
1628 		/* corresponding get_ldev was in __drbd_set_state, to serialize
1629 		 * our cleanup here with the transition to D_DISKLESS.
1630 		 * But it is still not safe to dereference ldev here, since
1631 		 * we might come from a failed Attach before ldev was set. */
1632 		if (mdev->ldev) {
1633 			eh = mdev->ldev->dc.on_io_error;
1634 			was_io_error = test_and_clear_bit(WAS_IO_ERROR, &mdev->flags);
1635 
1636 			if (was_io_error && eh == EP_CALL_HELPER)
1637 				drbd_khelper(mdev, "local-io-error");
1638 
1639 			/* Immediately allow completion of all application IO,
1640 			 * that waits for completion from the local disk,
1641 			 * if this was a force-detach due to disk_timeout
1642 			 * or administrator request (drbdsetup detach --force).
1643 			 * Do NOT abort otherwise.
1644 			 * Aborting local requests may cause serious problems,
1645 			 * if requests are completed to upper layers already,
1646 			 * and then later the already submitted local bio completes.
1647 			 * This can cause DMA into former bio pages that meanwhile
1648 			 * have been re-used for other things.
1649 			 * So aborting local requests may cause crashes,
1650 			 * or even worse, silent data corruption.
1651 			 */
1652 			if (test_and_clear_bit(FORCE_DETACH, &mdev->flags))
1653 				tl_abort_disk_io(mdev);
1654 
1655 			/* current state still has to be D_FAILED,
1656 			 * there is only one way out: to D_DISKLESS,
1657 			 * and that may only happen after our put_ldev below. */
1658 			if (mdev->state.disk != D_FAILED)
1659 				dev_err(DEV,
1660 					"ASSERT FAILED: disk is %s during detach\n",
1661 					drbd_disk_str(mdev->state.disk));
1662 
1663 			if (ns.conn >= C_CONNECTED)
1664 				drbd_send_state(mdev, ns);
1665 
1666 			drbd_rs_cancel_all(mdev);
1667 
1668 			/* In case we want to get something to stable storage still,
1669 			 * this may be the last chance.
1670 			 * Following put_ldev may transition to D_DISKLESS. */
1671 			drbd_md_sync(mdev);
1672 		}
1673 		put_ldev(mdev);
1674 	}
1675 
1676 	/* second half of local IO error, failure to attach,
1677 	 * or administrative detach,
1678 	 * after local_cnt references have reached zero again */
1679 	if (os.disk != D_DISKLESS && ns.disk == D_DISKLESS) {
1680 		/* We must still be diskless,
1681 		 * re-attach has to be serialized with this! */
1682 		if (mdev->state.disk != D_DISKLESS)
1683 			dev_err(DEV,
1684 				"ASSERT FAILED: disk is %s while going diskless\n",
1685 				drbd_disk_str(mdev->state.disk));
1686 
1687 		if (ns.conn >= C_CONNECTED)
1688 			drbd_send_state(mdev, ns);
1689 
1690 		/* corresponding get_ldev in __drbd_set_state
1691 		 * this may finally trigger drbd_ldev_destroy. */
1692 		put_ldev(mdev);
1693 	}
1694 
1695 	/* Notify peer that I had a local IO error, and did not detach. */
1696 	if (os.disk == D_UP_TO_DATE && ns.disk == D_INCONSISTENT && ns.conn >= C_CONNECTED)
1697 		drbd_send_state(mdev, ns);
1698 
1699 	/* Disks got bigger while they were detached */
1700 	if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1701 	    test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1702 		if (ns.conn == C_CONNECTED)
1703 			resync_after_online_grow(mdev);
1704 	}
1705 
1706 	/* A resync finished or aborted, wake paused devices... */
1707 	if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1708 	    (os.peer_isp && !ns.peer_isp) ||
1709 	    (os.user_isp && !ns.user_isp))
1710 		resume_next_sg(mdev);
1711 
1712 	/* sync target done with resync.  Explicitly notify peer, even though
1713 	 * it should (at least for non-empty resyncs) already know itself. */
1714 	if (os.disk < D_UP_TO_DATE && os.conn >= C_SYNC_SOURCE && ns.conn == C_CONNECTED)
1715 		drbd_send_state(mdev, ns);
1716 
1717 	/* Wake up role changes that were delayed while the connection was being established */
1718 	if (os.conn == C_WF_REPORT_PARAMS && ns.conn != C_WF_REPORT_PARAMS) {
1719 		clear_bit(STATE_SENT, &mdev->flags);
1720 		wake_up(&mdev->state_wait);
1721 	}
1722 
1723 	/* This triggers bitmap writeout of potentially still unwritten pages
1724 	 * if the resync finished cleanly, or aborted because of peer disk
1725 	 * failure, or because of connection loss.
1726 	 * For resync aborted because of local disk failure, we cannot do
1727 	 * any bitmap writeout anymore.
1728 	 * No harm done if some bits change during this phase.
1729 	 */
1730 	if (os.conn > C_CONNECTED && ns.conn <= C_CONNECTED && get_ldev(mdev)) {
1731 		drbd_queue_bitmap_io(mdev, &drbd_bm_write_copy_pages, NULL,
1732 			"write from resync_finished", BM_LOCKED_CHANGE_ALLOWED);
1733 		put_ldev(mdev);
1734 	}
1735 
1736 	/* free tl_hash if we got thawed and are C_STANDALONE */
1737 	if (ns.conn == C_STANDALONE && !is_susp(ns) && mdev->tl_hash)
1738 		drbd_free_tl_hash(mdev);
1739 
1740 	/* Upon network connection, we need to start the receiver */
1741 	if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1742 		drbd_thread_start(&mdev->receiver);
1743 
1744 	/* Terminate worker thread if we are unconfigured - it will be
1745 	   restarted as needed... */
1746 	if (ns.disk == D_DISKLESS &&
1747 	    ns.conn == C_STANDALONE &&
1748 	    ns.role == R_SECONDARY) {
1749 		if (os.aftr_isp != ns.aftr_isp)
1750 			resume_next_sg(mdev);
1751 		/* set in __drbd_set_state, unless CONFIG_PENDING was set */
1752 		if (test_bit(DEVICE_DYING, &mdev->flags))
1753 			drbd_thread_stop_nowait(&mdev->worker);
1754 	}
1755 
1756 	drbd_md_sync(mdev);
1757 }
1758 
1759 
1760 static int drbd_thread_setup(void *arg)
1761 {
1762 	struct drbd_thread *thi = (struct drbd_thread *) arg;
1763 	struct drbd_conf *mdev = thi->mdev;
1764 	unsigned long flags;
1765 	int retval;
1766 
1767 restart:
1768 	retval = thi->function(thi);
1769 
1770 	spin_lock_irqsave(&thi->t_lock, flags);
1771 
1772 	/* if the receiver has been "Exiting", the last thing it did
1773 	 * was set the conn state to "StandAlone",
1774 	 * if a re-connect request now comes in, the conn state goes to C_UNCONNECTED,
1775 	 * and the receiver thread will be "started".
1776 	 * drbd_thread_start needs to set "Restarting" in that case.
1777 	 * The t_state check and assignment need to be within the same spinlock,
1778 	 * so either thread_start sees Exiting and can remap it to Restarting,
1779 	 * or thread_start sees None and can proceed as normal.
1780 	 */
1781 
1782 	if (thi->t_state == Restarting) {
1783 		dev_info(DEV, "Restarting %s\n", current->comm);
1784 		thi->t_state = Running;
1785 		spin_unlock_irqrestore(&thi->t_lock, flags);
1786 		goto restart;
1787 	}
1788 
1789 	thi->task = NULL;
1790 	thi->t_state = None;
1791 	smp_mb();
1792 	complete(&thi->stop);
1793 	spin_unlock_irqrestore(&thi->t_lock, flags);
1794 
1795 	dev_info(DEV, "Terminating %s\n", current->comm);
1796 
1797 	/* Release mod reference taken when thread was started */
1798 	module_put(THIS_MODULE);
1799 	return retval;
1800 }
1801 
1802 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1803 		      int (*func) (struct drbd_thread *))
1804 {
1805 	spin_lock_init(&thi->t_lock);
1806 	thi->task    = NULL;
1807 	thi->t_state = None;
1808 	thi->function = func;
1809 	thi->mdev = mdev;
1810 }
1811 
1812 int drbd_thread_start(struct drbd_thread *thi)
1813 {
1814 	struct drbd_conf *mdev = thi->mdev;
1815 	struct task_struct *nt;
1816 	unsigned long flags;
1817 
1818 	const char *me =
1819 		thi == &mdev->receiver ? "receiver" :
1820 		thi == &mdev->asender  ? "asender"  :
1821 		thi == &mdev->worker   ? "worker"   : "NONSENSE";
1822 
1823 	/* is used from state engine doing drbd_thread_stop_nowait,
1824 	 * while holding the req lock irqsave */
1825 	spin_lock_irqsave(&thi->t_lock, flags);
1826 
1827 	switch (thi->t_state) {
1828 	case None:
1829 		dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1830 				me, current->comm, current->pid);
1831 
1832 		/* Get ref on module for thread - this is released when thread exits */
1833 		if (!try_module_get(THIS_MODULE)) {
1834 			dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1835 			spin_unlock_irqrestore(&thi->t_lock, flags);
1836 			return false;
1837 		}
1838 
1839 		init_completion(&thi->stop);
1840 		D_ASSERT(thi->task == NULL);
1841 		thi->reset_cpu_mask = 1;
1842 		thi->t_state = Running;
1843 		spin_unlock_irqrestore(&thi->t_lock, flags);
1844 		flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
1845 
1846 		nt = kthread_create(drbd_thread_setup, (void *) thi,
1847 				    "drbd%d_%s", mdev_to_minor(mdev), me);
1848 
1849 		if (IS_ERR(nt)) {
1850 			dev_err(DEV, "Couldn't start thread\n");
1851 
1852 			module_put(THIS_MODULE);
1853 			return false;
1854 		}
1855 		spin_lock_irqsave(&thi->t_lock, flags);
1856 		thi->task = nt;
1857 		thi->t_state = Running;
1858 		spin_unlock_irqrestore(&thi->t_lock, flags);
1859 		wake_up_process(nt);
1860 		break;
1861 	case Exiting:
1862 		thi->t_state = Restarting;
1863 		dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1864 				me, current->comm, current->pid);
1865 		/* fall through */
1866 	case Running:
1867 	case Restarting:
1868 	default:
1869 		spin_unlock_irqrestore(&thi->t_lock, flags);
1870 		break;
1871 	}
1872 
1873 	return true;
1874 }
1875 
1876 
1877 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1878 {
1879 	unsigned long flags;
1880 
1881 	enum drbd_thread_state ns = restart ? Restarting : Exiting;
1882 
1883 	/* may be called from state engine, holding the req lock irqsave */
1884 	spin_lock_irqsave(&thi->t_lock, flags);
1885 
1886 	if (thi->t_state == None) {
1887 		spin_unlock_irqrestore(&thi->t_lock, flags);
1888 		if (restart)
1889 			drbd_thread_start(thi);
1890 		return;
1891 	}
1892 
1893 	if (thi->t_state != ns) {
1894 		if (thi->task == NULL) {
1895 			spin_unlock_irqrestore(&thi->t_lock, flags);
1896 			return;
1897 		}
1898 
1899 		thi->t_state = ns;
1900 		smp_mb();
1901 		init_completion(&thi->stop);
1902 		if (thi->task != current)
1903 			force_sig(DRBD_SIGKILL, thi->task);
1904 
1905 	}
1906 
1907 	spin_unlock_irqrestore(&thi->t_lock, flags);
1908 
1909 	if (wait)
1910 		wait_for_completion(&thi->stop);
1911 }
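
/*
 * Lifecycle sketch, for orientation only (the drbd_thread_stop*() helpers in
 * drbd_int.h are thin wrappers around _drbd_thread_stop() with the flags
 * shown):
 *
 *	drbd_thread_init(mdev, &mdev->worker, drbd_worker);	// once, at device setup
 *	drbd_thread_start(&mdev->worker);			// spawns "drbd<minor>_worker"
 *	...
 *	_drbd_thread_stop(&mdev->worker, false, true);		// no restart, wait for exit
 */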
1912 
1913 #ifdef CONFIG_SMP
1914 /**
1915  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1916  * @mdev:	DRBD device.
1917  *
1918  * Forces all threads of a device onto the same CPU. This is beneficial for
1919  * DRBD's performance. May be overridden by the user's configuration.
1920  */
1921 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1922 {
1923 	int ord, cpu;
1924 
1925 	/* user override. */
1926 	if (cpumask_weight(mdev->cpu_mask))
1927 		return;
1928 
1929 	ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1930 	for_each_online_cpu(cpu) {
1931 		if (ord-- == 0) {
1932 			cpumask_set_cpu(cpu, mdev->cpu_mask);
1933 			return;
1934 		}
1935 	}
1936 	/* should not be reached */
1937 	cpumask_setall(mdev->cpu_mask);
1938 }
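
/*
 * Example: with four online CPUs and no cpu_mask configured, minor 0 is
 * pinned to the 1st online CPU, minor 1 to the 2nd, minor 3 to the 4th,
 * and minor 4 wraps around to the 1st again, since
 * ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask).
 */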
1939 
1940 /**
1941  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1942  * @mdev:	DRBD device.
1943  *
1944  * Call this in the "main loop" of _all_ threads; no mutex is needed, since
1945  * current won't die prematurely.
1946  */
1947 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1948 {
1949 	struct task_struct *p = current;
1950 	struct drbd_thread *thi =
1951 		p == mdev->asender.task  ? &mdev->asender  :
1952 		p == mdev->receiver.task ? &mdev->receiver :
1953 		p == mdev->worker.task   ? &mdev->worker   :
1954 		NULL;
1955 	ERR_IF(thi == NULL)
1956 		return;
1957 	if (!thi->reset_cpu_mask)
1958 		return;
1959 	thi->reset_cpu_mask = 0;
1960 	set_cpus_allowed_ptr(p, mdev->cpu_mask);
1961 }
1962 #endif
1963 
1964 /* the appropriate socket mutex must be held already */
1965 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1966 			  enum drbd_packets cmd, struct p_header80 *h,
1967 			  size_t size, unsigned msg_flags)
1968 {
1969 	int sent, ok;
1970 
1971 	ERR_IF(!h) return false;
1972 	ERR_IF(!size) return false;
1973 
1974 	h->magic   = BE_DRBD_MAGIC;
1975 	h->command = cpu_to_be16(cmd);
1976 	h->length  = cpu_to_be16(size-sizeof(struct p_header80));
1977 
1978 	sent = drbd_send(mdev, sock, h, size, msg_flags);
1979 
1980 	ok = (sent == size);
1981 	if (!ok && !signal_pending(current))
1982 		dev_warn(DEV, "short sent %s size=%d sent=%d\n",
1983 		    cmdname(cmd), (int)size, sent);
1984 	return ok;
1985 }
1986 
1987 /* don't pass the socket. we may only look at it
1988  * when we hold the appropriate socket mutex.
1989  */
1990 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1991 		  enum drbd_packets cmd, struct p_header80 *h, size_t size)
1992 {
1993 	int ok = 0;
1994 	struct socket *sock;
1995 
1996 	if (use_data_socket) {
1997 		mutex_lock(&mdev->data.mutex);
1998 		sock = mdev->data.socket;
1999 	} else {
2000 		mutex_lock(&mdev->meta.mutex);
2001 		sock = mdev->meta.socket;
2002 	}
2003 
2004 	/* drbd_disconnect() could have called drbd_free_sock()
2005 	 * while we were waiting in down()... */
2006 	if (likely(sock != NULL))
2007 		ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
2008 
2009 	if (use_data_socket)
2010 		mutex_unlock(&mdev->data.mutex);
2011 	else
2012 		mutex_unlock(&mdev->meta.mutex);
2013 	return ok;
2014 }
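
/*
 * Usage sketch, mirroring drbd_send_b_ack() further below: callers hand over
 * the complete packet struct, header included, and the header is filled in
 * here so that the on-wire length field covers only the payload:
 *
 *	struct p_barrier_ack p;
 *	p.barrier  = barrier_nr;
 *	p.set_size = cpu_to_be32(set_size);
 *	drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
 *		      (struct p_header80 *)&p, sizeof(p));
 *
 * i.e. h->length becomes sizeof(p) - sizeof(struct p_header80).
 */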
2015 
2016 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
2017 		   size_t size)
2018 {
2019 	struct p_header80 h;
2020 	int ok;
2021 
2022 	h.magic   = BE_DRBD_MAGIC;
2023 	h.command = cpu_to_be16(cmd);
2024 	h.length  = cpu_to_be16(size);
2025 
2026 	if (!drbd_get_data_sock(mdev))
2027 		return 0;
2028 
2029 	ok = (sizeof(h) ==
2030 		drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
2031 	ok = ok && (size ==
2032 		drbd_send(mdev, mdev->data.socket, data, size, 0));
2033 
2034 	drbd_put_data_sock(mdev);
2035 
2036 	return ok;
2037 }
2038 
2039 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
2040 {
2041 	struct p_rs_param_95 *p;
2042 	struct socket *sock;
2043 	int size, rv;
2044 	const int apv = mdev->agreed_pro_version;
2045 
2046 	size = apv <= 87 ? sizeof(struct p_rs_param)
2047 		: apv == 88 ? sizeof(struct p_rs_param)
2048 			+ strlen(mdev->sync_conf.verify_alg) + 1
2049 		: apv <= 94 ? sizeof(struct p_rs_param_89)
2050 		: /* apv >= 95 */ sizeof(struct p_rs_param_95);
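	/* Roughly: apv <= 87 sends the bare p_rs_param (rate only),
	 * apv == 88 appends the verify_alg name as a trailing string,
	 * apv 89..94 use p_rs_param_89 (fixed-size verify/csums alg names),
	 * and apv >= 95 use p_rs_param_95, which adds the c_* knobs of the
	 * dynamic resync rate controller. */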
2051 
2052 	/* used from admin command context and receiver/worker context.
2053 	 * to avoid kmalloc, grab the socket right here,
2054 	 * then use the pre-allocated sbuf there */
2055 	mutex_lock(&mdev->data.mutex);
2056 	sock = mdev->data.socket;
2057 
2058 	if (likely(sock != NULL)) {
2059 		enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
2060 
2061 		p = &mdev->data.sbuf.rs_param_95;
2062 
2063 		/* initialize verify_alg and csums_alg */
2064 		memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2065 
2066 		p->rate = cpu_to_be32(sc->rate);
2067 		p->c_plan_ahead = cpu_to_be32(sc->c_plan_ahead);
2068 		p->c_delay_target = cpu_to_be32(sc->c_delay_target);
2069 		p->c_fill_target = cpu_to_be32(sc->c_fill_target);
2070 		p->c_max_rate = cpu_to_be32(sc->c_max_rate);
2071 
2072 		if (apv >= 88)
2073 			strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
2074 		if (apv >= 89)
2075 			strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
2076 
2077 		rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
2078 	} else
2079 		rv = 0; /* not ok */
2080 
2081 	mutex_unlock(&mdev->data.mutex);
2082 
2083 	return rv;
2084 }
2085 
2086 int drbd_send_protocol(struct drbd_conf *mdev)
2087 {
2088 	struct p_protocol *p;
2089 	int size, cf, rv;
2090 
2091 	size = sizeof(struct p_protocol);
2092 
2093 	if (mdev->agreed_pro_version >= 87)
2094 		size += strlen(mdev->net_conf->integrity_alg) + 1;
2095 
2096 	/* we must not recurse into our own queue,
2097 	 * as that is blocked during handshake */
2098 	p = kmalloc(size, GFP_NOIO);
2099 	if (p == NULL)
2100 		return 0;
2101 
2102 	p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
2103 	p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
2104 	p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
2105 	p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
2106 	p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
2107 
2108 	cf = 0;
2109 	if (mdev->net_conf->want_lose)
2110 		cf |= CF_WANT_LOSE;
2111 	if (mdev->net_conf->dry_run) {
2112 		if (mdev->agreed_pro_version >= 92)
2113 			cf |= CF_DRY_RUN;
2114 		else {
2115 			dev_err(DEV, "--dry-run is not supported by peer");
2116 			kfree(p);
2117 			return -1;
2118 		}
2119 	}
2120 	p->conn_flags    = cpu_to_be32(cf);
2121 
2122 	if (mdev->agreed_pro_version >= 87)
2123 		strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
2124 
2125 	rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
2126 			   (struct p_header80 *)p, size);
2127 	kfree(p);
2128 	return rv;
2129 }
2130 
2131 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
2132 {
2133 	struct p_uuids p;
2134 	int i;
2135 
2136 	if (!get_ldev_if_state(mdev, D_NEGOTIATING))
2137 		return 1;
2138 
2139 	for (i = UI_CURRENT; i < UI_SIZE; i++)
2140 		p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
2141 
2142 	mdev->comm_bm_set = drbd_bm_total_weight(mdev);
2143 	p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
2144 	uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
2145 	uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
2146 	uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
2147 	p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
2148 
2149 	put_ldev(mdev);
2150 
2151 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
2152 			     (struct p_header80 *)&p, sizeof(p));
2153 }
2154 
2155 int drbd_send_uuids(struct drbd_conf *mdev)
2156 {
2157 	return _drbd_send_uuids(mdev, 0);
2158 }
2159 
2160 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
2161 {
2162 	return _drbd_send_uuids(mdev, 8);
2163 }
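
/*
 * The UI_FLAGS bits assembled in _drbd_send_uuids() above:
 *   1 - this node has "want_lose" set (--discard-my-data)
 *   2 - the CRASHED_PRIMARY flag is set on this node (unclean shutdown while Primary)
 *   4 - our disk will be D_INCONSISTENT in the state currently being negotiated
 *   8 - sent by drbd_send_uuids_skip_initial_sync() to announce that the
 *       initial full sync is being skipped
 */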
2164 
2165 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
2166 {
2167 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
2168 		u64 *uuid = mdev->ldev->md.uuid;
2169 		dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
2170 		     text,
2171 		     (unsigned long long)uuid[UI_CURRENT],
2172 		     (unsigned long long)uuid[UI_BITMAP],
2173 		     (unsigned long long)uuid[UI_HISTORY_START],
2174 		     (unsigned long long)uuid[UI_HISTORY_END]);
2175 		put_ldev(mdev);
2176 	} else {
2177 		dev_info(DEV, "%s effective data uuid: %016llX\n",
2178 				text,
2179 				(unsigned long long)mdev->ed_uuid);
2180 	}
2181 }
2182 
2183 int drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
2184 {
2185 	struct p_rs_uuid p;
2186 	u64 uuid;
2187 
2188 	D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
2189 
2190 	uuid = mdev->ldev->md.uuid[UI_BITMAP];
2191 	if (uuid && uuid != UUID_JUST_CREATED)
2192 		uuid = uuid + UUID_NEW_BM_OFFSET;
2193 	else
2194 		get_random_bytes(&uuid, sizeof(u64));
2195 	drbd_uuid_set(mdev, UI_BITMAP, uuid);
2196 	drbd_print_uuids(mdev, "updated sync UUID");
2197 	drbd_md_sync(mdev);
2198 	p.uuid = cpu_to_be64(uuid);
2199 
2200 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
2201 			     (struct p_header80 *)&p, sizeof(p));
2202 }
2203 
2204 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
2205 {
2206 	struct p_sizes p;
2207 	sector_t d_size, u_size;
2208 	int q_order_type;
2209 	unsigned int max_bio_size;
2210 	int ok;
2211 
2212 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
2213 		D_ASSERT(mdev->ldev->backing_bdev);
2214 		d_size = drbd_get_max_capacity(mdev->ldev);
2215 		u_size = mdev->ldev->dc.disk_size;
2216 		q_order_type = drbd_queue_order_type(mdev);
2217 		max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
2218 		max_bio_size = min(max_bio_size, DRBD_MAX_BIO_SIZE);
2219 		put_ldev(mdev);
2220 	} else {
2221 		d_size = 0;
2222 		u_size = 0;
2223 		q_order_type = QUEUE_ORDERED_NONE;
2224 		max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
2225 	}
2226 
2227 	/* Never allow old drbd (up to 8.3.7) to see more than 32KiB */
2228 	if (mdev->agreed_pro_version <= 94)
2229 		max_bio_size = min(max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
2230 
2231 	p.d_size = cpu_to_be64(d_size);
2232 	p.u_size = cpu_to_be64(u_size);
2233 	p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
2234 	p.max_bio_size = cpu_to_be32(max_bio_size);
2235 	p.queue_order_type = cpu_to_be16(q_order_type);
2236 	p.dds_flags = cpu_to_be16(flags);
2237 
2238 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
2239 			   (struct p_header80 *)&p, sizeof(p));
2240 	return ok;
2241 }
2242 
2243 /**
2244  * drbd_send_current_state() - Sends the drbd state to the peer
2245  * @mdev:	DRBD device.
2246  */
2247 int drbd_send_current_state(struct drbd_conf *mdev)
2248 {
2249 	struct socket *sock;
2250 	struct p_state p;
2251 	int ok = 0;
2252 
2253 	/* Grab the state lock so we won't send state if we're in the middle
2254 	 * of a cluster-wide state change on another thread */
2255 	drbd_state_lock(mdev);
2256 
2257 	mutex_lock(&mdev->data.mutex);
2258 
2259 	p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
2260 	sock = mdev->data.socket;
2261 
2262 	if (likely(sock != NULL)) {
2263 		ok = _drbd_send_cmd(mdev, sock, P_STATE,
2264 				    (struct p_header80 *)&p, sizeof(p), 0);
2265 	}
2266 
2267 	mutex_unlock(&mdev->data.mutex);
2268 
2269 	drbd_state_unlock(mdev);
2270 	return ok;
2271 }
2272 
2273 /**
2274  * drbd_send_state() - After a state change, sends the new state to the peer
2275  * @mdev:	DRBD device.
2276  * @state:	the state to send, not necessarily the current state.
2277  *
2278  * Each state change queues an "after_state_ch" work, which will eventually
2279  * send the resulting new state to the peer. If more state changes happen
2280  * between queuing and processing of the after_state_ch work, we still
2281  * want to send each intermediary state in the order it occurred.
2282  */
2283 int drbd_send_state(struct drbd_conf *mdev, union drbd_state state)
2284 {
2285 	struct socket *sock;
2286 	struct p_state p;
2287 	int ok = 0;
2288 
2289 	mutex_lock(&mdev->data.mutex);
2290 
2291 	p.state = cpu_to_be32(state.i);
2292 	sock = mdev->data.socket;
2293 
2294 	if (likely(sock != NULL)) {
2295 		ok = _drbd_send_cmd(mdev, sock, P_STATE,
2296 				    (struct p_header80 *)&p, sizeof(p), 0);
2297 	}
2298 
2299 	mutex_unlock(&mdev->data.mutex);
2300 
2301 	return ok;
2302 }
2303 
2304 int drbd_send_state_req(struct drbd_conf *mdev,
2305 	union drbd_state mask, union drbd_state val)
2306 {
2307 	struct p_req_state p;
2308 
2309 	p.mask    = cpu_to_be32(mask.i);
2310 	p.val     = cpu_to_be32(val.i);
2311 
2312 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
2313 			     (struct p_header80 *)&p, sizeof(p));
2314 }
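
/*
 * Convention for P_STATE_CHG_REQ: the receiving side computes the requested
 * state as ns.i = (os.i & ~mask.i) | val.i, i.e. "mask" selects which state
 * fields to change and "val" carries their new values.
 */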
2315 
2316 int drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
2317 {
2318 	struct p_req_state_reply p;
2319 
2320 	p.retcode    = cpu_to_be32(retcode);
2321 
2322 	return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
2323 			     (struct p_header80 *)&p, sizeof(p));
2324 }
2325 
2326 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
2327 	struct p_compressed_bm *p,
2328 	struct bm_xfer_ctx *c)
2329 {
2330 	struct bitstream bs;
2331 	unsigned long plain_bits;
2332 	unsigned long tmp;
2333 	unsigned long rl;
2334 	unsigned len;
2335 	unsigned toggle;
2336 	int bits;
2337 
2338 	/* may we use this feature? */
2339 	if ((mdev->sync_conf.use_rle == 0) ||
2340 		(mdev->agreed_pro_version < 90))
2341 			return 0;
2342 
2343 	if (c->bit_offset >= c->bm_bits)
2344 		return 0; /* nothing to do. */
2345 
2346 	/* use at most this many bytes */
2347 	bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
2348 	memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
2349 	/* plain bits covered in this code string */
2350 	plain_bits = 0;
2351 
2352 	/* p->encoding & 0x80 stores whether the first run length is set.
2353 	 * bit offset is implicit.
2354 	 * start with toggle == 2 to be able to recognize the first iteration */
2355 	toggle = 2;
2356 
2357 	/* see how many plain bits we can stuff into one packet
2358 	 * using RLE and VLI. */
2359 	do {
2360 		tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
2361 				    : _drbd_bm_find_next(mdev, c->bit_offset);
2362 		if (tmp == -1UL)
2363 			tmp = c->bm_bits;
2364 		rl = tmp - c->bit_offset;
2365 
2366 		if (toggle == 2) { /* first iteration */
2367 			if (rl == 0) {
2368 				/* the first checked bit was set,
2369 				 * store start value, */
2370 				DCBP_set_start(p, 1);
2371 				/* but skip encoding of zero run length */
2372 				toggle = !toggle;
2373 				continue;
2374 			}
2375 			DCBP_set_start(p, 0);
2376 		}
2377 
2378 		/* paranoia: catch zero runlength.
2379 		 * can only happen if bitmap is modified while we scan it. */
2380 		if (rl == 0) {
2381 			dev_err(DEV, "unexpected zero runlength while encoding bitmap "
2382 			    "t:%u bo:%lu\n", toggle, c->bit_offset);
2383 			return -1;
2384 		}
2385 
2386 		bits = vli_encode_bits(&bs, rl);
2387 		if (bits == -ENOBUFS) /* buffer full */
2388 			break;
2389 		if (bits <= 0) {
2390 			dev_err(DEV, "error while encoding bitmap: %d\n", bits);
2391 			return 0;
2392 		}
2393 
2394 		toggle = !toggle;
2395 		plain_bits += rl;
2396 		c->bit_offset = tmp;
2397 	} while (c->bit_offset < c->bm_bits);
2398 
2399 	len = bs.cur.b - p->code + !!bs.cur.bit;
2400 
2401 	if (plain_bits < (len << 3)) {
2402 		/* incompressible with this method.
2403 		 * we need to rewind both word and bit position. */
2404 		c->bit_offset -= plain_bits;
2405 		bm_xfer_ctx_bit_to_word_offset(c);
2406 		c->bit_offset = c->word_offset * BITS_PER_LONG;
2407 		return 0;
2408 	}
2409 
2410 	/* RLE + VLI was able to compress it just fine.
2411 	 * update c->word_offset. */
2412 	bm_xfer_ctx_bit_to_word_offset(c);
2413 
2414 	/* store pad_bits */
2415 	DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
2416 
2417 	return len;
2418 }
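
/*
 * Worked example: for a bitmap chunk starting with 0000011110... the loop
 * above first finds a run of 5 clear bits, then 4 set bits, and so on.
 * DCBP_set_start(p, 0) records that the first run describes clear bits;
 * after that only the run lengths 5, 4, ... are VLI-encoded into p->code.
 * Had the chunk started with a set bit, DCBP_set_start(p, 1) would be
 * stored and the leading zero-length run skipped.  If the encoding would
 * not end up smaller than the plain bits it covers, the context is rewound
 * and 0 is returned, so the caller falls back to sending plain bitmap words.
 */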
2419 
2420 /**
2421  * send_bitmap_rle_or_plain
2422  *
2423  * Return 0 when done, 1 when another iteration is needed, and a negative error
2424  * code upon failure.
2425  */
2426 static int
2427 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
2428 			 struct p_header80 *h, struct bm_xfer_ctx *c)
2429 {
2430 	struct p_compressed_bm *p = (void*)h;
2431 	unsigned long num_words;
2432 	int len;
2433 	int ok;
2434 
2435 	len = fill_bitmap_rle_bits(mdev, p, c);
2436 
2437 	if (len < 0)
2438 		return -EIO;
2439 
2440 	if (len) {
2441 		DCBP_set_code(p, RLE_VLI_Bits);
2442 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
2443 			sizeof(*p) + len, 0);
2444 
2445 		c->packets[0]++;
2446 		c->bytes[0] += sizeof(*p) + len;
2447 
2448 		if (c->bit_offset >= c->bm_bits)
2449 			len = 0; /* DONE */
2450 	} else {
2451 		/* was not compressible.
2452 		 * send a buffer full of plain text bits instead. */
2453 		num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
2454 		len = num_words * sizeof(long);
2455 		if (len)
2456 			drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
2457 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
2458 				   h, sizeof(struct p_header80) + len, 0);
2459 		c->word_offset += num_words;
2460 		c->bit_offset = c->word_offset * BITS_PER_LONG;
2461 
2462 		c->packets[1]++;
2463 		c->bytes[1] += sizeof(struct p_header80) + len;
2464 
2465 		if (c->bit_offset > c->bm_bits)
2466 			c->bit_offset = c->bm_bits;
2467 	}
2468 	if (ok) {
2469 		if (len == 0) {
2470 			INFO_bm_xfer_stats(mdev, "send", c);
2471 			return 0;
2472 		} else
2473 			return 1;
2474 	}
2475 	return -EIO;
2476 }
2477 
2478 /* See the comment at receive_bitmap() */
2479 int _drbd_send_bitmap(struct drbd_conf *mdev)
2480 {
2481 	struct bm_xfer_ctx c;
2482 	struct p_header80 *p;
2483 	int err;
2484 
2485 	ERR_IF(!mdev->bitmap) return false;
2486 
2487 	/* maybe we should use some per thread scratch page,
2488 	 * and allocate that during initial device creation? */
2489 	p = (struct p_header80 *) __get_free_page(GFP_NOIO);
2490 	if (!p) {
2491 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
2492 		return false;
2493 	}
2494 
2495 	if (get_ldev(mdev)) {
2496 		if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2497 			dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2498 			drbd_bm_set_all(mdev);
2499 			if (drbd_bm_write(mdev)) {
2500 				/* write_bm failed! Leave the full sync flag set in the meta data,
2501 				 * but otherwise proceed as normal - we need to tell the other
2502 				 * side that a full resync is required! */
2503 				dev_err(DEV, "Failed to write bitmap to disk!\n");
2504 			} else {
2505 				drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2506 				drbd_md_sync(mdev);
2507 			}
2508 		}
2509 		put_ldev(mdev);
2510 	}
2511 
2512 	c = (struct bm_xfer_ctx) {
2513 		.bm_bits = drbd_bm_bits(mdev),
2514 		.bm_words = drbd_bm_words(mdev),
2515 	};
2516 
2517 	do {
2518 		err = send_bitmap_rle_or_plain(mdev, p, &c);
2519 	} while (err > 0);
2520 
2521 	free_page((unsigned long) p);
2522 	return err == 0;
2523 }
2524 
2525 int drbd_send_bitmap(struct drbd_conf *mdev)
2526 {
2527 	int err;
2528 
2529 	if (!drbd_get_data_sock(mdev))
2530 		return -1;
2531 	err = !_drbd_send_bitmap(mdev);
2532 	drbd_put_data_sock(mdev);
2533 	return err;
2534 }
2535 
2536 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2537 {
2538 	int ok;
2539 	struct p_barrier_ack p;
2540 
2541 	p.barrier  = barrier_nr;
2542 	p.set_size = cpu_to_be32(set_size);
2543 
2544 	if (mdev->state.conn < C_CONNECTED)
2545 		return false;
2546 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2547 			(struct p_header80 *)&p, sizeof(p));
2548 	return ok;
2549 }
2550 
2551 /**
2552  * _drbd_send_ack() - Sends an ack packet
2553  * @mdev:	DRBD device.
2554  * @cmd:	Packet command code.
2555  * @sector:	sector, needs to be in big endian byte order
2556  * @blksize:	size in bytes, needs to be in big endian byte order
2557  * @block_id:	Id, big endian byte order
2558  */
2559 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2560 			  u64 sector,
2561 			  u32 blksize,
2562 			  u64 block_id)
2563 {
2564 	int ok;
2565 	struct p_block_ack p;
2566 
2567 	p.sector   = sector;
2568 	p.block_id = block_id;
2569 	p.blksize  = blksize;
2570 	p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2571 
2572 	if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2573 		return false;
2574 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2575 				(struct p_header80 *)&p, sizeof(p));
2576 	return ok;
2577 }
2578 
2579 /* dp->sector and dp->block_id already/still in network byte order,
2580  * data_size is payload size according to dp->head,
2581  * and may need to be corrected for digest size. */
2582 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2583 		     struct p_data *dp, int data_size)
2584 {
2585 	data_size -= (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
2586 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
2587 	return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2588 			      dp->block_id);
2589 }
2590 
2591 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2592 		     struct p_block_req *rp)
2593 {
2594 	return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2595 }
2596 
2597 /**
2598  * drbd_send_ack() - Sends an ack packet
2599  * @mdev:	DRBD device.
2600  * @cmd:	Packet command code.
2601  * @e:		Epoch entry.
2602  */
2603 int drbd_send_ack(struct drbd_conf *mdev,
2604 	enum drbd_packets cmd, struct drbd_epoch_entry *e)
2605 {
2606 	return _drbd_send_ack(mdev, cmd,
2607 			      cpu_to_be64(e->sector),
2608 			      cpu_to_be32(e->size),
2609 			      e->block_id);
2610 }
2611 
2612 /* This function misuses the block_id field to signal if the blocks
2613  * are in sync or not. */
2614 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2615 		     sector_t sector, int blksize, u64 block_id)
2616 {
2617 	return _drbd_send_ack(mdev, cmd,
2618 			      cpu_to_be64(sector),
2619 			      cpu_to_be32(blksize),
2620 			      cpu_to_be64(block_id));
2621 }
2622 
2623 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2624 		       sector_t sector, int size, u64 block_id)
2625 {
2626 	int ok;
2627 	struct p_block_req p;
2628 
2629 	p.sector   = cpu_to_be64(sector);
2630 	p.block_id = block_id;
2631 	p.blksize  = cpu_to_be32(size);
2632 
2633 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2634 				(struct p_header80 *)&p, sizeof(p));
2635 	return ok;
2636 }
2637 
2638 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2639 			    sector_t sector, int size,
2640 			    void *digest, int digest_size,
2641 			    enum drbd_packets cmd)
2642 {
2643 	int ok;
2644 	struct p_block_req p;
2645 
2646 	p.sector   = cpu_to_be64(sector);
2647 	p.block_id = BE_DRBD_MAGIC + 0xbeef;
2648 	p.blksize  = cpu_to_be32(size);
2649 
2650 	p.head.magic   = BE_DRBD_MAGIC;
2651 	p.head.command = cpu_to_be16(cmd);
2652 	p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + digest_size);
2653 
2654 	mutex_lock(&mdev->data.mutex);
2655 
2656 	ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2657 	ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2658 
2659 	mutex_unlock(&mdev->data.mutex);
2660 
2661 	return ok;
2662 }
2663 
2664 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2665 {
2666 	int ok;
2667 	struct p_block_req p;
2668 
2669 	p.sector   = cpu_to_be64(sector);
2670 	p.block_id = BE_DRBD_MAGIC + 0xbabe;
2671 	p.blksize  = cpu_to_be32(size);
2672 
2673 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2674 			   (struct p_header80 *)&p, sizeof(p));
2675 	return ok;
2676 }
2677 
2678 /* called on sndtimeo
2679  * returns false if we should retry,
2680  * true if we think connection is dead
2681  */
2682 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2683 {
2684 	int drop_it;
2685 	/* long elapsed = (long)(jiffies - mdev->last_received); */
2686 
2687 	drop_it =   mdev->meta.socket == sock
2688 		|| !mdev->asender.task
2689 		|| get_t_state(&mdev->asender) != Running
2690 		|| mdev->state.conn < C_CONNECTED;
2691 
2692 	if (drop_it)
2693 		return true;
2694 
2695 	drop_it = !--mdev->ko_count;
2696 	if (!drop_it) {
2697 		dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2698 		       current->comm, current->pid, mdev->ko_count);
2699 		request_ping(mdev);
2700 	}
2701 
2702 	return drop_it; /* && (mdev->state == R_PRIMARY) */;
2703 }
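
/*
 * In other words: each send timeout on the data socket decrements ko_count
 * (re-armed from net_conf->ko_count at the start of drbd_send() on that
 * socket).  As long as it has not reached zero we log, ask the asender to
 * ping the peer, and let the caller retry; once it hits zero the connection
 * is considered dead.
 */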
2704 
2705 /* The idea of sendpage seems to be to put some kind of reference
2706  * to the page into the skb, and to hand it over to the NIC. In
2707  * this process get_page() gets called.
2708  *
2709  * As soon as the page was really sent over the network put_page()
2710  * gets called by some part of the network layer. [ NIC driver? ]
2711  *
2712  * [ get_page() / put_page() increment/decrement the count. If count
2713  *   reaches 0 the page will be freed. ]
2714  *
2715  * This works nicely with pages from FSs.
2716  * But this means that in protocol A we might signal IO completion too early!
2717  *
2718  * In order not to corrupt data during a resync we must make sure
2719  * that we do not reuse our own buffer pages (EEs) too early, therefore
2720  * we have the net_ee list.
2721  *
2722  * XFS still seems to have problems with this: it submits pages with page_count == 0!
2723  * As a workaround, we disable sendpage on pages
2724  * with page_count == 0 or PageSlab.
2725  */
2726 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2727 		   int offset, size_t size, unsigned msg_flags)
2728 {
2729 	int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, msg_flags);
2730 	kunmap(page);
2731 	if (sent == size)
2732 		mdev->send_cnt += size>>9;
2733 	return sent == size;
2734 }
2735 
2736 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2737 		    int offset, size_t size, unsigned msg_flags)
2738 {
2739 	mm_segment_t oldfs = get_fs();
2740 	int sent, ok;
2741 	int len = size;
2742 
2743 	/* e.g. XFS meta- & log-data is in slab pages, which have a
2744 	 * page_count of 0 and/or have PageSlab() set.
2745 	 * we cannot use send_page for those, as that does get_page();
2746 	 * put_page(); and would cause either a VM_BUG directly, or
2747 	 * __page_cache_release a page that would actually still be referenced
2748 	 * by someone, leading to some obscure delayed Oops somewhere else. */
2749 	if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2750 		return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
2751 
2752 	msg_flags |= MSG_NOSIGNAL;
2753 	drbd_update_congested(mdev);
2754 	set_fs(KERNEL_DS);
2755 	do {
2756 		sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2757 							offset, len,
2758 							msg_flags);
2759 		if (sent == -EAGAIN) {
2760 			if (we_should_drop_the_connection(mdev,
2761 							  mdev->data.socket))
2762 				break;
2763 			else
2764 				continue;
2765 		}
2766 		if (sent <= 0) {
2767 			dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2768 			     __func__, (int)size, len, sent);
2769 			break;
2770 		}
2771 		len    -= sent;
2772 		offset += sent;
2773 	} while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2774 	set_fs(oldfs);
2775 	clear_bit(NET_CONGESTED, &mdev->flags);
2776 
2777 	ok = (len == 0);
2778 	if (likely(ok))
2779 		mdev->send_cnt += size>>9;
2780 	return ok;
2781 }
2782 
2783 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2784 {
2785 	struct bio_vec *bvec;
2786 	int i;
2787 	/* hint all but last page with MSG_MORE */
2788 	bio_for_each_segment(bvec, bio, i) {
2789 		if (!_drbd_no_send_page(mdev, bvec->bv_page,
2790 				     bvec->bv_offset, bvec->bv_len,
2791 				     i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2792 			return 0;
2793 	}
2794 	return 1;
2795 }
2796 
2797 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2798 {
2799 	struct bio_vec *bvec;
2800 	int i;
2801 	/* hint all but last page with MSG_MORE */
2802 	bio_for_each_segment(bvec, bio, i) {
2803 		if (!_drbd_send_page(mdev, bvec->bv_page,
2804 				     bvec->bv_offset, bvec->bv_len,
2805 				     i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2806 			return 0;
2807 	}
2808 	return 1;
2809 }
2810 
2811 static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2812 {
2813 	struct page *page = e->pages;
2814 	unsigned len = e->size;
2815 	/* hint all but last page with MSG_MORE */
2816 	page_chain_for_each(page) {
2817 		unsigned l = min_t(unsigned, len, PAGE_SIZE);
2818 		if (!_drbd_send_page(mdev, page, 0, l,
2819 				page_chain_next(page) ? MSG_MORE : 0))
2820 			return 0;
2821 		len -= l;
2822 	}
2823 	return 1;
2824 }
2825 
2826 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
2827 {
2828 	if (mdev->agreed_pro_version >= 95)
2829 		return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
2830 			(bi_rw & REQ_FUA ? DP_FUA : 0) |
2831 			(bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
2832 			(bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
2833 	else
2834 		return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
2835 }
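
/*
 * Example: a bio submitted with REQ_SYNC|REQ_FUA is announced to a peer
 * speaking protocol 95 or newer as DP_RW_SYNC|DP_FUA; towards an older
 * peer only DP_RW_SYNC survives, since the other flags did not exist on
 * the wire before protocol 95.
 */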
2836 
2837 /* Used to send write requests
2838  * R_PRIMARY -> Peer	(P_DATA)
2839  */
2840 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2841 {
2842 	int ok = 1;
2843 	struct p_data p;
2844 	unsigned int dp_flags = 0;
2845 	void *dgb;
2846 	int dgs;
2847 
2848 	if (!drbd_get_data_sock(mdev))
2849 		return 0;
2850 
2851 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2852 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2853 
2854 	if (req->size <= DRBD_MAX_SIZE_H80_PACKET) {
2855 		p.head.h80.magic   = BE_DRBD_MAGIC;
2856 		p.head.h80.command = cpu_to_be16(P_DATA);
2857 		p.head.h80.length  =
2858 			cpu_to_be16(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2859 	} else {
2860 		p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2861 		p.head.h95.command = cpu_to_be16(P_DATA);
2862 		p.head.h95.length  =
2863 			cpu_to_be32(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2864 	}
2865 
2866 	p.sector   = cpu_to_be64(req->sector);
2867 	p.block_id = (unsigned long)req;
2868 	p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2869 
2870 	dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
2871 
2872 	if (mdev->state.conn >= C_SYNC_SOURCE &&
2873 	    mdev->state.conn <= C_PAUSED_SYNC_T)
2874 		dp_flags |= DP_MAY_SET_IN_SYNC;
2875 
2876 	p.dp_flags = cpu_to_be32(dp_flags);
2877 	set_bit(UNPLUG_REMOTE, &mdev->flags);
2878 	ok = (sizeof(p) ==
2879 		drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
2880 	if (ok && dgs) {
2881 		dgb = mdev->int_dig_out;
2882 		drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2883 		ok = dgs == drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2884 	}
2885 	if (ok) {
2886 		/* For protocol A, we have to memcpy the payload into
2887 		 * socket buffers, as we may complete right away
2888 		 * as soon as we handed it over to tcp, at which point the data
2889 		 * pages may become invalid.
2890 		 *
2891 		 * With data integrity enabled, we copy it as well, so we can be
2892 		 * sure that even if the bio pages may still be modified, it
2893 		 * won't change the data on the wire, thus if the digest checks
2894 		 * out ok after sending on this side, but does not fit on the
2895 		 * receiving side, we know for sure the corruption happened elsewhere.
2896 		 */
2897 		if (mdev->net_conf->wire_protocol == DRBD_PROT_A || dgs)
2898 			ok = _drbd_send_bio(mdev, req->master_bio);
2899 		else
2900 			ok = _drbd_send_zc_bio(mdev, req->master_bio);
2901 
2902 		/* double check digest, sometimes buffers have been modified in flight. */
2903 		if (dgs > 0 && dgs <= 64) {
2904 			/* 64 byte, 512 bit, is the largest digest size
2905 			 * currently supported in kernel crypto. */
2906 			unsigned char digest[64];
2907 			drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, digest);
2908 			if (memcmp(mdev->int_dig_out, digest, dgs)) {
2909 				dev_warn(DEV,
2910 					"Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
2911 					(unsigned long long)req->sector, req->size);
2912 			}
2913 		} /* else if (dgs > 64) {
2914 		     ... Be noisy about digest too large ...
2915 		} */
2916 	}
2917 
2918 	drbd_put_data_sock(mdev);
2919 
2920 	return ok;
2921 }
2922 
2923 /* answer packet, used to send data back for read requests:
2924  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2925  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2926  */
2927 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2928 		    struct drbd_epoch_entry *e)
2929 {
2930 	int ok;
2931 	struct p_data p;
2932 	void *dgb;
2933 	int dgs;
2934 
2935 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2936 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2937 
2938 	if (e->size <= DRBD_MAX_SIZE_H80_PACKET) {
2939 		p.head.h80.magic   = BE_DRBD_MAGIC;
2940 		p.head.h80.command = cpu_to_be16(cmd);
2941 		p.head.h80.length  =
2942 			cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2943 	} else {
2944 		p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2945 		p.head.h95.command = cpu_to_be16(cmd);
2946 		p.head.h95.length  =
2947 			cpu_to_be32(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2948 	}
2949 
2950 	p.sector   = cpu_to_be64(e->sector);
2951 	p.block_id = e->block_id;
2952 	/* p.seq_num  = 0;    No sequence numbers here.. */
2953 
2954 	/* Only called by our kernel thread.
2955 	 * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2956 	 * in response to admin command or module unload.
2957 	 */
2958 	if (!drbd_get_data_sock(mdev))
2959 		return 0;
2960 
2961 	ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0);
2962 	if (ok && dgs) {
2963 		dgb = mdev->int_dig_out;
2964 		drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
2965 		ok = dgs == drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2966 	}
2967 	if (ok)
2968 		ok = _drbd_send_zc_ee(mdev, e);
2969 
2970 	drbd_put_data_sock(mdev);
2971 
2972 	return ok;
2973 }
2974 
2975 int drbd_send_oos(struct drbd_conf *mdev, struct drbd_request *req)
2976 {
2977 	struct p_block_desc p;
2978 
2979 	p.sector  = cpu_to_be64(req->sector);
2980 	p.blksize = cpu_to_be32(req->size);
2981 
2982 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OUT_OF_SYNC, &p.head, sizeof(p));
2983 }
2984 
2985 /*
2986   drbd_send distinguishes two cases:
2987 
2988   Packets sent via the data socket "sock"
2989   and packets sent via the meta data socket "msock"
2990 
2991 		    sock                      msock
2992   -----------------+-------------------------+------------------------------
2993   timeout           conf.timeout / 2          conf.timeout / 2
2994   timeout action    send a ping via msock     Abort communication
2995 					      and close all sockets
2996 */
2997 
2998 /*
2999  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
3000  */
3001 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
3002 	      void *buf, size_t size, unsigned msg_flags)
3003 {
3004 	struct kvec iov;
3005 	struct msghdr msg;
3006 	int rv, sent = 0;
3007 
3008 	if (!sock)
3009 		return -1000;
3010 
3011 	/* THINK  if (signal_pending) return ... ? */
3012 
3013 	iov.iov_base = buf;
3014 	iov.iov_len  = size;
3015 
3016 	msg.msg_name       = NULL;
3017 	msg.msg_namelen    = 0;
3018 	msg.msg_control    = NULL;
3019 	msg.msg_controllen = 0;
3020 	msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
3021 
3022 	if (sock == mdev->data.socket) {
3023 		mdev->ko_count = mdev->net_conf->ko_count;
3024 		drbd_update_congested(mdev);
3025 	}
3026 	do {
3027 		/* STRANGE
3028 		 * tcp_sendmsg does _not_ use its size parameter at all ?
3029 		 *
3030 		 * -EAGAIN on timeout, -EINTR on signal.
3031 		 */
3032 /* THINK
3033  * do we need to block DRBD_SIG if sock == &meta.socket ??
3034  * otherwise wake_asender() might interrupt some send_*Ack !
3035  */
3036 		rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
3037 		if (rv == -EAGAIN) {
3038 			if (we_should_drop_the_connection(mdev, sock))
3039 				break;
3040 			else
3041 				continue;
3042 		}
3043 		D_ASSERT(rv != 0);
3044 		if (rv == -EINTR) {
3045 			flush_signals(current);
3046 			rv = 0;
3047 		}
3048 		if (rv < 0)
3049 			break;
3050 		sent += rv;
3051 		iov.iov_base += rv;
3052 		iov.iov_len  -= rv;
3053 	} while (sent < size);
3054 
3055 	if (sock == mdev->data.socket)
3056 		clear_bit(NET_CONGESTED, &mdev->flags);
3057 
3058 	if (rv <= 0) {
3059 		if (rv != -EAGAIN) {
3060 			dev_err(DEV, "%s_sendmsg returned %d\n",
3061 			    sock == mdev->meta.socket ? "msock" : "sock",
3062 			    rv);
3063 			drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
3064 		} else
3065 			drbd_force_state(mdev, NS(conn, C_TIMEOUT));
3066 	}
3067 
3068 	return sent;
3069 }
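
/*
 * Return value: -1000 if called without a socket, otherwise the number of
 * bytes actually handed to the socket (less than "size" if sending failed
 * part way).  On a final send error the connection is forced to
 * C_BROKEN_PIPE, or to C_TIMEOUT if the send timed out (-EAGAIN).
 */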
3070 
3071 static int drbd_open(struct block_device *bdev, fmode_t mode)
3072 {
3073 	struct drbd_conf *mdev = bdev->bd_disk->private_data;
3074 	unsigned long flags;
3075 	int rv = 0;
3076 
3077 	mutex_lock(&drbd_main_mutex);
3078 	spin_lock_irqsave(&mdev->req_lock, flags);
3079 	/* to have a stable mdev->state.role
3080 	 * and no race with updating open_cnt */
3081 
3082 	if (mdev->state.role != R_PRIMARY) {
3083 		if (mode & FMODE_WRITE)
3084 			rv = -EROFS;
3085 		else if (!allow_oos)
3086 			rv = -EMEDIUMTYPE;
3087 	}
3088 
3089 	if (!rv)
3090 		mdev->open_cnt++;
3091 	spin_unlock_irqrestore(&mdev->req_lock, flags);
3092 	mutex_unlock(&drbd_main_mutex);
3093 
3094 	return rv;
3095 }
3096 
3097 static int drbd_release(struct gendisk *gd, fmode_t mode)
3098 {
3099 	struct drbd_conf *mdev = gd->private_data;
3100 	mutex_lock(&drbd_main_mutex);
3101 	mdev->open_cnt--;
3102 	mutex_unlock(&drbd_main_mutex);
3103 	return 0;
3104 }
3105 
3106 static void drbd_set_defaults(struct drbd_conf *mdev)
3107 {
3108 	/* This way we get a compile error when sync_conf grows,
3109 	   and we forget to initialize it here */
3110 	mdev->sync_conf = (struct syncer_conf) {
3111 		/* .rate = */		DRBD_RATE_DEF,
3112 		/* .after = */		DRBD_AFTER_DEF,
3113 		/* .al_extents = */	DRBD_AL_EXTENTS_DEF,
3114 		/* .verify_alg = */	{}, 0,
3115 		/* .cpu_mask = */	{}, 0,
3116 		/* .csums_alg = */	{}, 0,
3117 		/* .use_rle = */	0,
3118 		/* .on_no_data = */	DRBD_ON_NO_DATA_DEF,
3119 		/* .c_plan_ahead = */	DRBD_C_PLAN_AHEAD_DEF,
3120 		/* .c_delay_target = */	DRBD_C_DELAY_TARGET_DEF,
3121 		/* .c_fill_target = */	DRBD_C_FILL_TARGET_DEF,
3122 		/* .c_max_rate = */	DRBD_C_MAX_RATE_DEF,
3123 		/* .c_min_rate = */	DRBD_C_MIN_RATE_DEF
3124 	};
3125 
3126 	/* Have to do it this way, because the layout differs between
3127 	   big endian and little endian */
3128 	mdev->state = (union drbd_state) {
3129 		{ .role = R_SECONDARY,
3130 		  .peer = R_UNKNOWN,
3131 		  .conn = C_STANDALONE,
3132 		  .disk = D_DISKLESS,
3133 		  .pdsk = D_UNKNOWN,
3134 		  .susp = 0,
3135 		  .susp_nod = 0,
3136 		  .susp_fen = 0
3137 		} };
3138 }
3139 
3140 void drbd_init_set_defaults(struct drbd_conf *mdev)
3141 {
3142 	/* the memset(,0,) did most of this.
3143 	 * note: only assignments, no allocation in here */
3144 
3145 	drbd_set_defaults(mdev);
3146 
3147 	atomic_set(&mdev->ap_bio_cnt, 0);
3148 	atomic_set(&mdev->ap_pending_cnt, 0);
3149 	atomic_set(&mdev->rs_pending_cnt, 0);
3150 	atomic_set(&mdev->unacked_cnt, 0);
3151 	atomic_set(&mdev->local_cnt, 0);
3152 	atomic_set(&mdev->net_cnt, 0);
3153 	atomic_set(&mdev->packet_seq, 0);
3154 	atomic_set(&mdev->pp_in_use, 0);
3155 	atomic_set(&mdev->pp_in_use_by_net, 0);
3156 	atomic_set(&mdev->rs_sect_in, 0);
3157 	atomic_set(&mdev->rs_sect_ev, 0);
3158 	atomic_set(&mdev->ap_in_flight, 0);
3159 	atomic_set(&mdev->md_io_in_use, 0);
3160 
3161 	mutex_init(&mdev->data.mutex);
3162 	mutex_init(&mdev->meta.mutex);
3163 	sema_init(&mdev->data.work.s, 0);
3164 	sema_init(&mdev->meta.work.s, 0);
3165 	mutex_init(&mdev->state_mutex);
3166 
3167 	spin_lock_init(&mdev->data.work.q_lock);
3168 	spin_lock_init(&mdev->meta.work.q_lock);
3169 
3170 	spin_lock_init(&mdev->al_lock);
3171 	spin_lock_init(&mdev->req_lock);
3172 	spin_lock_init(&mdev->peer_seq_lock);
3173 	spin_lock_init(&mdev->epoch_lock);
3174 
3175 	INIT_LIST_HEAD(&mdev->active_ee);
3176 	INIT_LIST_HEAD(&mdev->sync_ee);
3177 	INIT_LIST_HEAD(&mdev->done_ee);
3178 	INIT_LIST_HEAD(&mdev->read_ee);
3179 	INIT_LIST_HEAD(&mdev->net_ee);
3180 	INIT_LIST_HEAD(&mdev->resync_reads);
3181 	INIT_LIST_HEAD(&mdev->data.work.q);
3182 	INIT_LIST_HEAD(&mdev->meta.work.q);
3183 	INIT_LIST_HEAD(&mdev->resync_work.list);
3184 	INIT_LIST_HEAD(&mdev->unplug_work.list);
3185 	INIT_LIST_HEAD(&mdev->go_diskless.list);
3186 	INIT_LIST_HEAD(&mdev->md_sync_work.list);
3187 	INIT_LIST_HEAD(&mdev->start_resync_work.list);
3188 	INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
3189 
3190 	mdev->resync_work.cb  = w_resync_timer;
3191 	mdev->unplug_work.cb  = w_send_write_hint;
3192 	mdev->go_diskless.cb  = w_go_diskless;
3193 	mdev->md_sync_work.cb = w_md_sync;
3194 	mdev->bm_io_work.w.cb = w_bitmap_io;
3195 	mdev->start_resync_work.cb = w_start_resync;
3196 	init_timer(&mdev->resync_timer);
3197 	init_timer(&mdev->md_sync_timer);
3198 	init_timer(&mdev->start_resync_timer);
3199 	init_timer(&mdev->request_timer);
3200 	mdev->resync_timer.function = resync_timer_fn;
3201 	mdev->resync_timer.data = (unsigned long) mdev;
3202 	mdev->md_sync_timer.function = md_sync_timer_fn;
3203 	mdev->md_sync_timer.data = (unsigned long) mdev;
3204 	mdev->start_resync_timer.function = start_resync_timer_fn;
3205 	mdev->start_resync_timer.data = (unsigned long) mdev;
3206 	mdev->request_timer.function = request_timer_fn;
3207 	mdev->request_timer.data = (unsigned long) mdev;
3208 
3209 	init_waitqueue_head(&mdev->misc_wait);
3210 	init_waitqueue_head(&mdev->state_wait);
3211 	init_waitqueue_head(&mdev->net_cnt_wait);
3212 	init_waitqueue_head(&mdev->ee_wait);
3213 	init_waitqueue_head(&mdev->al_wait);
3214 	init_waitqueue_head(&mdev->seq_wait);
3215 
3216 	drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
3217 	drbd_thread_init(mdev, &mdev->worker, drbd_worker);
3218 	drbd_thread_init(mdev, &mdev->asender, drbd_asender);
3219 
3220 	mdev->agreed_pro_version = PRO_VERSION_MAX;
3221 	mdev->write_ordering = WO_bdev_flush;
3222 	mdev->resync_wenr = LC_FREE;
3223 	mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
3224 	mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
3225 }
3226 
3227 void drbd_mdev_cleanup(struct drbd_conf *mdev)
3228 {
3229 	int i;
3230 	if (mdev->receiver.t_state != None)
3231 		dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
3232 				mdev->receiver.t_state);
3233 
3234 	/* no need to lock it, I'm the only thread alive */
3235 	if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
3236 		dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
3237 	mdev->al_writ_cnt  =
3238 	mdev->bm_writ_cnt  =
3239 	mdev->read_cnt     =
3240 	mdev->recv_cnt     =
3241 	mdev->send_cnt     =
3242 	mdev->writ_cnt     =
3243 	mdev->p_size       =
3244 	mdev->rs_start     =
3245 	mdev->rs_total     =
3246 	mdev->rs_failed    = 0;
3247 	mdev->rs_last_events = 0;
3248 	mdev->rs_last_sect_ev = 0;
3249 	for (i = 0; i < DRBD_SYNC_MARKS; i++) {
3250 		mdev->rs_mark_left[i] = 0;
3251 		mdev->rs_mark_time[i] = 0;
3252 	}
3253 	D_ASSERT(mdev->net_conf == NULL);
3254 
3255 	drbd_set_my_capacity(mdev, 0);
3256 	if (mdev->bitmap) {
3257 		/* maybe never allocated. */
3258 		drbd_bm_resize(mdev, 0, 1);
3259 		drbd_bm_cleanup(mdev);
3260 	}
3261 
3262 	drbd_free_resources(mdev);
3263 	clear_bit(AL_SUSPENDED, &mdev->flags);
3264 
3265 	/*
3266 	 * currently we drbd_init_ee only on module load, so
3267 	 * we may do drbd_release_ee only on module unload!
3268 	 */
3269 	D_ASSERT(list_empty(&mdev->active_ee));
3270 	D_ASSERT(list_empty(&mdev->sync_ee));
3271 	D_ASSERT(list_empty(&mdev->done_ee));
3272 	D_ASSERT(list_empty(&mdev->read_ee));
3273 	D_ASSERT(list_empty(&mdev->net_ee));
3274 	D_ASSERT(list_empty(&mdev->resync_reads));
3275 	D_ASSERT(list_empty(&mdev->data.work.q));
3276 	D_ASSERT(list_empty(&mdev->meta.work.q));
3277 	D_ASSERT(list_empty(&mdev->resync_work.list));
3278 	D_ASSERT(list_empty(&mdev->unplug_work.list));
3279 	D_ASSERT(list_empty(&mdev->go_diskless.list));
3280 
3281 	drbd_set_defaults(mdev);
3282 }
3283 
3284 
3285 static void drbd_destroy_mempools(void)
3286 {
3287 	struct page *page;
3288 
3289 	while (drbd_pp_pool) {
3290 		page = drbd_pp_pool;
3291 		drbd_pp_pool = (struct page *)page_private(page);
3292 		__free_page(page);
3293 		drbd_pp_vacant--;
3294 	}
3295 
3296 	/* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
3297 
3298 	if (drbd_md_io_bio_set)
3299 		bioset_free(drbd_md_io_bio_set);
3300 	if (drbd_md_io_page_pool)
3301 		mempool_destroy(drbd_md_io_page_pool);
3302 	if (drbd_ee_mempool)
3303 		mempool_destroy(drbd_ee_mempool);
3304 	if (drbd_request_mempool)
3305 		mempool_destroy(drbd_request_mempool);
3306 	if (drbd_ee_cache)
3307 		kmem_cache_destroy(drbd_ee_cache);
3308 	if (drbd_request_cache)
3309 		kmem_cache_destroy(drbd_request_cache);
3310 	if (drbd_bm_ext_cache)
3311 		kmem_cache_destroy(drbd_bm_ext_cache);
3312 	if (drbd_al_ext_cache)
3313 		kmem_cache_destroy(drbd_al_ext_cache);
3314 
3315 	drbd_md_io_bio_set   = NULL;
3316 	drbd_md_io_page_pool = NULL;
3317 	drbd_ee_mempool      = NULL;
3318 	drbd_request_mempool = NULL;
3319 	drbd_ee_cache        = NULL;
3320 	drbd_request_cache   = NULL;
3321 	drbd_bm_ext_cache    = NULL;
3322 	drbd_al_ext_cache    = NULL;
3323 
3324 	return;
3325 }
3326 
3327 static int drbd_create_mempools(void)
3328 {
3329 	struct page *page;
3330 	const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
3331 	int i;
3332 
3333 	/* prepare our caches and mempools */
3334 	drbd_request_mempool = NULL;
3335 	drbd_ee_cache        = NULL;
3336 	drbd_request_cache   = NULL;
3337 	drbd_bm_ext_cache    = NULL;
3338 	drbd_al_ext_cache    = NULL;
3339 	drbd_pp_pool         = NULL;
3340 	drbd_md_io_page_pool = NULL;
3341 	drbd_md_io_bio_set   = NULL;
3342 
3343 	/* caches */
3344 	drbd_request_cache = kmem_cache_create(
3345 		"drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
3346 	if (drbd_request_cache == NULL)
3347 		goto Enomem;
3348 
3349 	drbd_ee_cache = kmem_cache_create(
3350 		"drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
3351 	if (drbd_ee_cache == NULL)
3352 		goto Enomem;
3353 
3354 	drbd_bm_ext_cache = kmem_cache_create(
3355 		"drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
3356 	if (drbd_bm_ext_cache == NULL)
3357 		goto Enomem;
3358 
3359 	drbd_al_ext_cache = kmem_cache_create(
3360 		"drbd_al", sizeof(struct lc_element), 0, 0, NULL);
3361 	if (drbd_al_ext_cache == NULL)
3362 		goto Enomem;
3363 
3364 	/* mempools */
3365 #ifdef COMPAT_HAVE_BIOSET_CREATE
3366 	drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
3367 	if (drbd_md_io_bio_set == NULL)
3368 		goto Enomem;
3369 #endif
3370 
3371 	drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
3372 	if (drbd_md_io_page_pool == NULL)
3373 		goto Enomem;
3374 
3375 	drbd_request_mempool = mempool_create(number,
3376 		mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
3377 	if (drbd_request_mempool == NULL)
3378 		goto Enomem;
3379 
3380 	drbd_ee_mempool = mempool_create(number,
3381 		mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
3382 	if (drbd_ee_mempool == NULL)
3383 		goto Enomem;
3384 
3385 	/* drbd's page pool */
3386 	spin_lock_init(&drbd_pp_lock);
3387 
3388 	for (i = 0; i < number; i++) {
3389 		page = alloc_page(GFP_HIGHUSER);
3390 		if (!page)
3391 			goto Enomem;
3392 		set_page_private(page, (unsigned long)drbd_pp_pool);
3393 		drbd_pp_pool = page;
3394 	}
3395 	drbd_pp_vacant = number;
3396 
3397 	return 0;
3398 
3399 Enomem:
3400 	drbd_destroy_mempools(); /* in case we allocated some */
3401 	return -ENOMEM;
3402 }
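
/*
 * Sizing note: "number" above reserves DRBD_MAX_BIO_SIZE / PAGE_SIZE pages
 * per configured minor for the drbd_pp_pool page list, and the request and
 * EE mempools are created with the same minimum element count.
 */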
3403 
3404 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
3405 	void *unused)
3406 {
3407 	/* just so we have it.  you never know what interesting things we
3408 	 * might want to do here some day...
3409 	 */
3410 
3411 	return NOTIFY_DONE;
3412 }
3413 
3414 static struct notifier_block drbd_notifier = {
3415 	.notifier_call = drbd_notify_sys,
3416 };
3417 
3418 static void drbd_release_ee_lists(struct drbd_conf *mdev)
3419 {
3420 	int rr;
3421 
3422 	rr = drbd_release_ee(mdev, &mdev->active_ee);
3423 	if (rr)
3424 		dev_err(DEV, "%d EEs in active list found!\n", rr);
3425 
3426 	rr = drbd_release_ee(mdev, &mdev->sync_ee);
3427 	if (rr)
3428 		dev_err(DEV, "%d EEs in sync list found!\n", rr);
3429 
3430 	rr = drbd_release_ee(mdev, &mdev->read_ee);
3431 	if (rr)
3432 		dev_err(DEV, "%d EEs in read list found!\n", rr);
3433 
3434 	rr = drbd_release_ee(mdev, &mdev->done_ee);
3435 	if (rr)
3436 		dev_err(DEV, "%d EEs in done list found!\n", rr);
3437 
3438 	rr = drbd_release_ee(mdev, &mdev->net_ee);
3439 	if (rr)
3440 		dev_err(DEV, "%d EEs in net list found!\n", rr);
3441 }
3442 
3443 /* caution. no locking.
3444  * currently only used from module cleanup code. */
3445 static void drbd_delete_device(unsigned int minor)
3446 {
3447 	struct drbd_conf *mdev = minor_to_mdev(minor);
3448 
3449 	if (!mdev)
3450 		return;
3451 
3452 	del_timer_sync(&mdev->request_timer);
3453 
3454 	/* paranoia asserts */
3455 	if (mdev->open_cnt != 0)
3456 		dev_err(DEV, "open_cnt = %d in %s:%u", mdev->open_cnt,
3457 				__FILE__ , __LINE__);
3458 
3459 	ERR_IF (!list_empty(&mdev->data.work.q)) {
3460 		struct list_head *lp;
3461 		list_for_each(lp, &mdev->data.work.q) {
3462 			dev_err(DEV, "lp = %p\n", lp);
3463 		}
3464 	};
3465 	/* end paranoia asserts */
3466 
3467 	del_gendisk(mdev->vdisk);
3468 
3469 	/* cleanup stuff that may have been allocated during
3470 	 * device (re-)configuration or state changes */
3471 
3472 	if (mdev->this_bdev)
3473 		bdput(mdev->this_bdev);
3474 
3475 	drbd_free_resources(mdev);
3476 
3477 	drbd_release_ee_lists(mdev);
3478 
3479 	/* should be freed on disconnect? */
3480 	kfree(mdev->ee_hash);
3481 	/*
3482 	mdev->ee_hash_s = 0;
3483 	mdev->ee_hash = NULL;
3484 	*/
3485 
3486 	lc_destroy(mdev->act_log);
3487 	lc_destroy(mdev->resync);
3488 
3489 	kfree(mdev->p_uuid);
3490 	/* mdev->p_uuid = NULL; */
3491 
3492 	kfree(mdev->int_dig_out);
3493 	kfree(mdev->int_dig_in);
3494 	kfree(mdev->int_dig_vv);
3495 
3496 	/* cleanup the rest that has been
3497 	 * allocated from drbd_new_device
3498 	 * and actually free the mdev itself */
3499 	drbd_free_mdev(mdev);
3500 }
3501 
3502 static void drbd_cleanup(void)
3503 {
3504 	unsigned int i;
3505 
3506 	unregister_reboot_notifier(&drbd_notifier);
3507 
3508 	/* first remove proc,
3509 	 * drbdsetup uses its presence to detect
3510 	 * whether DRBD is loaded.
3511 	 * If we got stuck in proc removal
3512 	 * while netlink was already deregistered,
3513 	 * some drbdsetup commands could wait forever
3514 	 * for an answer.
3515 	 */
3516 	if (drbd_proc)
3517 		remove_proc_entry("drbd", NULL);
3518 
3519 	drbd_nl_cleanup();
3520 
3521 	if (minor_table) {
3522 		i = minor_count;
3523 		while (i--)
3524 			drbd_delete_device(i);
3525 		drbd_destroy_mempools();
3526 	}
3527 
3528 	kfree(minor_table);
3529 
3530 	unregister_blkdev(DRBD_MAJOR, "drbd");
3531 
3532 	printk(KERN_INFO "drbd: module cleanup done.\n");
3533 }
3534 
3535 /**
3536  * drbd_congested() - Callback for the flusher thread
3537  * @congested_data:	User data
3538  * @bdi_bits:		Bits the BDI flusher thread is currently interested in
3539  *
3540  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
3541  */
3542 static int drbd_congested(void *congested_data, int bdi_bits)
3543 {
3544 	struct drbd_conf *mdev = congested_data;
3545 	struct request_queue *q;
3546 	char reason = '-';
3547 	int r = 0;
3548 
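	/* "reason" is recorded in mdev->congestion_reason below as a one
	 * character hint of why congestion was reported: 'd' = DRBD has
	 * frozen IO, 'c' = usermode helper callback pending, 'b' = backing
	 * device congested, 'n' = network congested, 'a' = both backing
	 * device and network congested. */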
3549 	if (!may_inc_ap_bio(mdev)) {
3550 		/* DRBD has frozen IO */
3551 		r = bdi_bits;
3552 		reason = 'd';
3553 		goto out;
3554 	}
3555 
3556 	if (test_bit(CALLBACK_PENDING, &mdev->flags)) {
3557 		r |= (1 << BDI_async_congested);
3558 		/* Without good local data, we would need to read from remote,
3559 		 * and that would need the worker thread as well, which is
3560 		 * currently blocked waiting for that usermode helper to
3561 		 * finish.
3562 		 */
3563 		if (!get_ldev_if_state(mdev, D_UP_TO_DATE))
3564 			r |= (1 << BDI_sync_congested);
3565 		else
3566 			put_ldev(mdev);
3567 		r &= bdi_bits;
3568 		reason = 'c';
3569 		goto out;
3570 	}
3571 
3572 	if (get_ldev(mdev)) {
3573 		q = bdev_get_queue(mdev->ldev->backing_bdev);
3574 		r = bdi_congested(&q->backing_dev_info, bdi_bits);
3575 		put_ldev(mdev);
3576 		if (r)
3577 			reason = 'b';
3578 	}
3579 
3580 	if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
3581 		r |= (1 << BDI_async_congested);
3582 		reason = reason == 'b' ? 'a' : 'n';
3583 	}
3584 
3585 out:
3586 	mdev->congestion_reason = reason;
3587 	return r;
3588 }
3589 
3590 struct drbd_conf *drbd_new_device(unsigned int minor)
3591 {
3592 	struct drbd_conf *mdev;
3593 	struct gendisk *disk;
3594 	struct request_queue *q;
3595 
3596 	/* GFP_KERNEL, we are outside of all write-out paths */
3597 	mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
3598 	if (!mdev)
3599 		return NULL;
3600 	if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
3601 		goto out_no_cpumask;
3602 
3603 	mdev->minor = minor;
3604 
3605 	drbd_init_set_defaults(mdev);
3606 
3607 	q = blk_alloc_queue(GFP_KERNEL);
3608 	if (!q)
3609 		goto out_no_q;
3610 	mdev->rq_queue = q;
3611 	q->queuedata   = mdev;
3612 
3613 	disk = alloc_disk(1);
3614 	if (!disk)
3615 		goto out_no_disk;
3616 	mdev->vdisk = disk;
3617 
3618 	set_disk_ro(disk, true);
3619 
3620 	disk->queue = q;
3621 	disk->major = DRBD_MAJOR;
3622 	disk->first_minor = minor;
3623 	disk->fops = &drbd_ops;
3624 	sprintf(disk->disk_name, "drbd%d", minor);
3625 	disk->private_data = mdev;
3626 
3627 	mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3628 	/* we have no partitions. we contain only ourselves. */
3629 	mdev->this_bdev->bd_contains = mdev->this_bdev;
3630 
3631 	q->backing_dev_info.congested_fn = drbd_congested;
3632 	q->backing_dev_info.congested_data = mdev;
3633 
3634 	blk_queue_make_request(q, drbd_make_request);
3635 	blk_queue_flush(q, REQ_FLUSH | REQ_FUA);
3636 	/* Setting the max_hw_sectors to an odd value of 8 KiB here
3637 	   triggers a max_bio_size message upon first attach or connect. */
3638 	blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
3639 	blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3640 	blk_queue_merge_bvec(q, drbd_merge_bvec);
3641 	q->queue_lock = &mdev->req_lock;
3642 
3643 	mdev->md_io_page = alloc_page(GFP_KERNEL);
3644 	if (!mdev->md_io_page)
3645 		goto out_no_io_page;
3646 
3647 	if (drbd_bm_init(mdev))
3648 		goto out_no_bitmap;
3649 	/* no need to lock access, we are still initializing this minor device. */
3650 	if (!tl_init(mdev))
3651 		goto out_no_tl;
3652 
3653 	mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3654 	if (!mdev->app_reads_hash)
3655 		goto out_no_app_reads;
3656 
3657 	mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3658 	if (!mdev->current_epoch)
3659 		goto out_no_epoch;
3660 
3661 	INIT_LIST_HEAD(&mdev->current_epoch->list);
3662 	mdev->epochs = 1;
3663 
3664 	return mdev;
3665 
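/* Error unwinding: the labels below undo the allocations above in reverse
 * order, so each goto target frees exactly what was set up before the
 * failing step. */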
3666 /* out_whatever_else:
3667 	kfree(mdev->current_epoch); */
3668 out_no_epoch:
3669 	kfree(mdev->app_reads_hash);
3670 out_no_app_reads:
3671 	tl_cleanup(mdev);
3672 out_no_tl:
3673 	drbd_bm_cleanup(mdev);
3674 out_no_bitmap:
3675 	__free_page(mdev->md_io_page);
3676 out_no_io_page:
3677 	put_disk(disk);
3678 out_no_disk:
3679 	blk_cleanup_queue(q);
3680 out_no_q:
3681 	free_cpumask_var(mdev->cpu_mask);
3682 out_no_cpumask:
3683 	kfree(mdev);
3684 	return NULL;
3685 }
3686 
3687 /* counterpart of drbd_new_device.
3688  * last part of drbd_delete_device. */
3689 void drbd_free_mdev(struct drbd_conf *mdev)
3690 {
3691 	kfree(mdev->current_epoch);
3692 	kfree(mdev->app_reads_hash);
3693 	tl_cleanup(mdev);
3694 	if (mdev->bitmap) /* should no longer be there. */
3695 		drbd_bm_cleanup(mdev);
3696 	__free_page(mdev->md_io_page);
3697 	put_disk(mdev->vdisk);
3698 	blk_cleanup_queue(mdev->rq_queue);
3699 	free_cpumask_var(mdev->cpu_mask);
3700 	drbd_free_tl_hash(mdev);
3701 	kfree(mdev);
3702 }
3703 
3704 
3705 int __init drbd_init(void)
3706 {
3707 	int err;
3708 
3709 	if (sizeof(struct p_handshake) != 80) {
3710 		printk(KERN_ERR
3711 		       "drbd: never change the size or layout "
3712 		       "of the HandShake packet.\n");
3713 		return -EINVAL;
3714 	}
3715 
3716 	if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
3717 		printk(KERN_ERR
3718 			"drbd: invalid minor_count (%d)\n", minor_count);
3719 #ifdef MODULE
3720 		return -EINVAL;
3721 #else
3722 		minor_count = 8;
3723 #endif
3724 	}
3725 
3726 	err = drbd_nl_init();
3727 	if (err)
3728 		return err;
3729 
3730 	err = register_blkdev(DRBD_MAJOR, "drbd");
3731 	if (err) {
3732 		printk(KERN_ERR
3733 		       "drbd: unable to register block device major %d\n",
3734 		       DRBD_MAJOR);
3735 		return err;
3736 	}
3737 
3738 	register_reboot_notifier(&drbd_notifier);
3739 
3740 	/*
3741 	 * allocate all necessary structs
3742 	 */
3743 	err = -ENOMEM;
3744 
3745 	init_waitqueue_head(&drbd_pp_wait);
3746 
3747 	drbd_proc = NULL; /* play safe for drbd_cleanup */
3748 	minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3749 				GFP_KERNEL);
3750 	if (!minor_table)
3751 		goto Enomem;
3752 
3753 	err = drbd_create_mempools();
3754 	if (err)
3755 		goto Enomem;
3756 
3757 	drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
3758 	if (!drbd_proc)	{
3759 		printk(KERN_ERR "drbd: unable to register proc file\n");
3760 		goto Enomem;
3761 	}
3762 
3763 	rwlock_init(&global_state_lock);
3764 
3765 	printk(KERN_INFO "drbd: initialized. "
3766 	       "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3767 	       API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3768 	printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3769 	printk(KERN_INFO "drbd: registered as block device major %d\n",
3770 		DRBD_MAJOR);
3771 	printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3772 
3773 	return 0; /* Success! */
3774 
3775 Enomem:
3776 	drbd_cleanup();
3777 	if (err == -ENOMEM)
3778 		/* currently always the case */
3779 		printk(KERN_ERR "drbd: ran out of memory\n");
3780 	else
3781 		printk(KERN_ERR "drbd: initialization failure\n");
3782 	return err;
3783 }
3784 
3785 void drbd_free_bc(struct drbd_backing_dev *ldev)
3786 {
3787 	if (ldev == NULL)
3788 		return;
3789 
3790 	blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
3791 	blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
3792 
3793 	kfree(ldev);
3794 }
3795 
3796 void drbd_free_sock(struct drbd_conf *mdev)
3797 {
3798 	if (mdev->data.socket) {
3799 		mutex_lock(&mdev->data.mutex);
3800 		kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3801 		sock_release(mdev->data.socket);
3802 		mdev->data.socket = NULL;
3803 		mutex_unlock(&mdev->data.mutex);
3804 	}
3805 	if (mdev->meta.socket) {
3806 		mutex_lock(&mdev->meta.mutex);
3807 		kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3808 		sock_release(mdev->meta.socket);
3809 		mdev->meta.socket = NULL;
3810 		mutex_unlock(&mdev->meta.mutex);
3811 	}
3812 }
3813 
3814 
3815 void drbd_free_resources(struct drbd_conf *mdev)
3816 {
3817 	crypto_free_hash(mdev->csums_tfm);
3818 	mdev->csums_tfm = NULL;
3819 	crypto_free_hash(mdev->verify_tfm);
3820 	mdev->verify_tfm = NULL;
3821 	crypto_free_hash(mdev->cram_hmac_tfm);
3822 	mdev->cram_hmac_tfm = NULL;
3823 	crypto_free_hash(mdev->integrity_w_tfm);
3824 	mdev->integrity_w_tfm = NULL;
3825 	crypto_free_hash(mdev->integrity_r_tfm);
3826 	mdev->integrity_r_tfm = NULL;
3827 
3828 	drbd_free_sock(mdev);
3829 
3830 	__no_warn(local,
3831 		  drbd_free_bc(mdev->ldev);
3832 		  mdev->ldev = NULL;);
3833 }
3834 
3835 /* meta data management */
3836 
3837 struct meta_data_on_disk {
3838 	u64 la_size;           /* last agreed size. */
3839 	u64 uuid[UI_SIZE];   /* UUIDs. */
3840 	u64 device_uuid;
3841 	u64 reserved_u64_1;
3842 	u32 flags;             /* MDF */
3843 	u32 magic;
3844 	u32 md_size_sect;
3845 	u32 al_offset;         /* offset to this block */
3846 	u32 al_nr_extents;     /* important for restoring the AL */
3847 	      /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3848 	u32 bm_offset;         /* offset to the bitmap, from here */
3849 	u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3850 	u32 la_peer_max_bio_size;   /* last peer max_bio_size */
3851 	u32 reserved_u32[3];
3852 
3853 } __packed;
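/* Note: this describes the on-disk layout only.  drbd_md_sync() below writes
 * it into a zeroed 512 byte (one sector) buffer, and all multi-byte fields
 * are stored big-endian (cpu_to_be32/cpu_to_be64). */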
3854 
3855 /**
3856  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3857  * @mdev:	DRBD device.
3858  */
3859 void drbd_md_sync(struct drbd_conf *mdev)
3860 {
3861 	struct meta_data_on_disk *buffer;
3862 	sector_t sector;
3863 	int i;
3864 
3865 	del_timer(&mdev->md_sync_timer);
3866 	/* timer may be rearmed by drbd_md_mark_dirty() now. */
3867 	if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3868 		return;
3869 
3870 	/* We use D_FAILED here, and not D_ATTACHING, because we try to write
3871 	 * metadata even if we detach due to a disk failure! */
3872 	if (!get_ldev_if_state(mdev, D_FAILED))
3873 		return;
3874 
3875 	buffer = drbd_md_get_buffer(mdev);
3876 	if (!buffer)
3877 		goto out;
3878 
3879 	memset(buffer, 0, 512);
3880 
3881 	buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3882 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3883 		buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3884 	buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3885 	buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3886 
3887 	buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3888 	buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3889 	buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3890 	buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3891 	buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3892 
3893 	buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3894 	buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
3895 
3896 	D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3897 	sector = mdev->ldev->md.md_offset;
3898 
3899 	if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3900 		/* this was a try anyway ... */
3901 		dev_err(DEV, "meta data update failed!\n");
3902 		drbd_chk_io_error(mdev, 1, DRBD_META_IO_ERROR);
3903 	}
3904 
3905 	/* Update mdev->ldev->md.la_size_sect,
3906 	 * since we just committed it to the meta data on disk. */
3907 	mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3908 
3909 	drbd_md_put_buffer(mdev);
3910 out:
3911 	put_ldev(mdev);
3912 }
3913 
3914 /**
3915  * drbd_md_read() - Reads in the meta data super block
3916  * @mdev:	DRBD device.
3917  * @bdev:	Device from which the meta data should be read in.
3918  *
3919  * Return NO_ERROR on success, and an enum drbd_ret_code in case
3920  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3921  */
3922 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3923 {
3924 	struct meta_data_on_disk *buffer;
3925 	int i, rv = NO_ERROR;
3926 
3927 	if (!get_ldev_if_state(mdev, D_ATTACHING))
3928 		return ERR_IO_MD_DISK;
3929 
3930 	buffer = drbd_md_get_buffer(mdev);
3931 	if (!buffer)
3932 		goto out;
3933 
3934 	if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3935 		/* NOTE: can't do normal error processing here as this is
3936 		   called BEFORE disk is attached */
3937 		dev_err(DEV, "Error while reading metadata.\n");
3938 		rv = ERR_IO_MD_DISK;
3939 		goto err;
3940 	}
3941 
3942 	if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3943 		dev_err(DEV, "Error while reading metadata, magic not found.\n");
3944 		rv = ERR_MD_INVALID;
3945 		goto err;
3946 	}
3947 	if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3948 		dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3949 		    be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3950 		rv = ERR_MD_INVALID;
3951 		goto err;
3952 	}
3953 	if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3954 		dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3955 		    be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3956 		rv = ERR_MD_INVALID;
3957 		goto err;
3958 	}
3959 	if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3960 		dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3961 		    be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3962 		rv = ERR_MD_INVALID;
3963 		goto err;
3964 	}
3965 
3966 	if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3967 		dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3968 		    be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3969 		rv = ERR_MD_INVALID;
3970 		goto err;
3971 	}
3972 
3973 	bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3974 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3975 		bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3976 	bdev->md.flags = be32_to_cpu(buffer->flags);
3977 	mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3978 	bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3979 
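	/* While not connected, fall back to the last agreed peer max_bio_size
	 * stored in the meta data, but never go below DRBD_MAX_BIO_SIZE_SAFE. */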
3980 	spin_lock_irq(&mdev->req_lock);
3981 	if (mdev->state.conn < C_CONNECTED) {
3982 		unsigned int peer;
3983 		peer = be32_to_cpu(buffer->la_peer_max_bio_size);
3984 		peer = max(peer, DRBD_MAX_BIO_SIZE_SAFE);
3985 		mdev->peer_max_bio_size = peer;
3986 	}
3987 	spin_unlock_irq(&mdev->req_lock);
3988 
3989 	if (mdev->sync_conf.al_extents < 7)
3990 		mdev->sync_conf.al_extents = 127;
3991 
3992  err:
3993 	drbd_md_put_buffer(mdev);
3994  out:
3995 	put_ldev(mdev);
3996 
3997 	return rv;
3998 }
3999 
4000 /**
4001  * drbd_md_mark_dirty() - Mark meta data super block as dirty
4002  * @mdev:	DRBD device.
4003  *
4004  * Call this function if you change anything that should be written to
4005  * the meta-data super block. This function sets MD_DIRTY, and starts a
4006  * timer that ensures drbd_md_sync() gets called within five seconds.
4007  */
4008 #ifdef DEBUG
4009 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
4010 {
4011 	if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
4012 		mod_timer(&mdev->md_sync_timer, jiffies + HZ);
4013 		mdev->last_md_mark_dirty.line = line;
4014 		mdev->last_md_mark_dirty.func = func;
4015 	}
4016 }
4017 #else
4018 void drbd_md_mark_dirty(struct drbd_conf *mdev)
4019 {
4020 	if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
4021 		mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
4022 }
4023 #endif
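/* Note: the DEBUG variant above additionally records the call site (function
 * and line) of the last drbd_md_mark_dirty() and arms the timer after one
 * second instead of five, so a missing drbd_md_sync() is caught sooner and
 * reported by w_md_sync(). */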
4024 
4025 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
4026 {
4027 	int i;
4028 
4029 	for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
4030 		mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
4031 }
4032 
4033 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
4034 {
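	/* The least significant bit of the current UUID encodes our role:
	 * set while Primary, cleared otherwise. */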
4035 	if (idx == UI_CURRENT) {
4036 		if (mdev->state.role == R_PRIMARY)
4037 			val |= 1;
4038 		else
4039 			val &= ~((u64)1);
4040 
4041 		drbd_set_ed_uuid(mdev, val);
4042 	}
4043 
4044 	mdev->ldev->md.uuid[idx] = val;
4045 	drbd_md_mark_dirty(mdev);
4046 }
4047 
4048 
4049 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
4050 {
4051 	if (mdev->ldev->md.uuid[idx]) {
4052 		drbd_uuid_move_history(mdev);
4053 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
4054 	}
4055 	_drbd_uuid_set(mdev, idx, val);
4056 }
4057 
4058 /**
4059  * drbd_uuid_new_current() - Creates a new current UUID
4060  * @mdev:	DRBD device.
4061  *
4062  * Creates a new current UUID, and rotates the old current UUID into
4063  * the bitmap slot. Causes an incremental resync upon next connect.
4064  */
4065 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
4066 {
4067 	u64 val;
4068 	unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
4069 
4070 	if (bm_uuid)
4071 		dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
4072 
4073 	mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
4074 
4075 	get_random_bytes(&val, sizeof(u64));
4076 	_drbd_uuid_set(mdev, UI_CURRENT, val);
4077 	drbd_print_uuids(mdev, "new current UUID");
4078 	/* get it to stable storage _now_ */
4079 	drbd_md_sync(mdev);
4080 }
4081 
4082 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
4083 {
4084 	if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
4085 		return;
4086 
4087 	if (val == 0) {
4088 		drbd_uuid_move_history(mdev);
4089 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
4090 		mdev->ldev->md.uuid[UI_BITMAP] = 0;
4091 	} else {
4092 		unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
4093 		if (bm_uuid)
4094 			dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
4095 
4096 		mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
4097 	}
4098 	drbd_md_mark_dirty(mdev);
4099 }
4100 
4101 /**
4102  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
4103  * @mdev:	DRBD device.
4104  *
4105  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
4106  */
4107 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
4108 {
4109 	int rv = -EIO;
4110 
4111 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
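		/* Ordering matters: set MDF_FULL_SYNC and sync the meta data
		 * before touching the bitmap, so an interruption in between
		 * still leaves the full sync marked as pending; the flag is
		 * cleared only after the bitmap reached stable storage. */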
4112 		drbd_md_set_flag(mdev, MDF_FULL_SYNC);
4113 		drbd_md_sync(mdev);
4114 		drbd_bm_set_all(mdev);
4115 
4116 		rv = drbd_bm_write(mdev);
4117 
4118 		if (!rv) {
4119 			drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
4120 			drbd_md_sync(mdev);
4121 		}
4122 
4123 		put_ldev(mdev);
4124 	}
4125 
4126 	return rv;
4127 }
4128 
4129 /**
4130  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
4131  * @mdev:	DRBD device.
4132  *
4133  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
4134  */
4135 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
4136 {
4137 	int rv = -EIO;
4138 
4139 	drbd_resume_al(mdev);
4140 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
4141 		drbd_bm_clear_all(mdev);
4142 		rv = drbd_bm_write(mdev);
4143 		put_ldev(mdev);
4144 	}
4145 
4146 	return rv;
4147 }
4148 
4149 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
4150 {
4151 	struct bm_io_work *work = container_of(w, struct bm_io_work, w);
4152 	int rv = -EIO;
4153 
4154 	D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
4155 
4156 	if (get_ldev(mdev)) {
4157 		drbd_bm_lock(mdev, work->why, work->flags);
4158 		rv = work->io_fn(mdev);
4159 		drbd_bm_unlock(mdev);
4160 		put_ldev(mdev);
4161 	}
4162 
4163 	clear_bit(BITMAP_IO, &mdev->flags);
4164 	smp_mb__after_clear_bit();
4165 	wake_up(&mdev->misc_wait);
4166 
4167 	if (work->done)
4168 		work->done(mdev, rv);
4169 
4170 	clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
4171 	work->why = NULL;
4172 	work->flags = 0;
4173 
4174 	return 1;
4175 }
4176 
4177 void drbd_ldev_destroy(struct drbd_conf *mdev)
4178 {
4179 	lc_destroy(mdev->resync);
4180 	mdev->resync = NULL;
4181 	lc_destroy(mdev->act_log);
4182 	mdev->act_log = NULL;
4183 	__no_warn(local,
4184 		drbd_free_bc(mdev->ldev);
4185 		mdev->ldev = NULL;);
4186 
4187 	if (mdev->md_io_tmpp) {
4188 		__free_page(mdev->md_io_tmpp);
4189 		mdev->md_io_tmpp = NULL;
4190 	}
4191 	clear_bit(GO_DISKLESS, &mdev->flags);
4192 }
4193 
4194 static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused)
4195 {
4196 	D_ASSERT(mdev->state.disk == D_FAILED);
4197 	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
4198 	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
4199 	 * the protected members anymore, though, so once put_ldev reaches zero
4200 	 * again, it will be safe to free them. */
4201 	drbd_force_state(mdev, NS(disk, D_DISKLESS));
4202 	return 1;
4203 }
4204 
4205 void drbd_go_diskless(struct drbd_conf *mdev)
4206 {
4207 	D_ASSERT(mdev->state.disk == D_FAILED);
4208 	if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
4209 		drbd_queue_work(&mdev->data.work, &mdev->go_diskless);
4210 }
4211 
4212 /**
4213  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
4214  * @mdev:	DRBD device.
4215  * @io_fn:	IO callback to be called when bitmap IO is possible
4216  * @done:	callback to be called after the bitmap IO was performed
4217  * @why:	Descriptive text of the reason for doing the IO
4218  *
4219  * While IO on the bitmap happens, application IO is frozen; this ensures
4220  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
4221  * called from worker context. It MUST NOT be used while a previous such
4222  * work is still pending!
4223  */
4224 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
4225 			  int (*io_fn)(struct drbd_conf *),
4226 			  void (*done)(struct drbd_conf *, int),
4227 			  char *why, enum bm_flag flags)
4228 {
4229 	D_ASSERT(current == mdev->worker.task);
4230 
4231 	D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
4232 	D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
4233 	D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
4234 	if (mdev->bm_io_work.why)
4235 		dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
4236 			why, mdev->bm_io_work.why);
4237 
4238 	mdev->bm_io_work.io_fn = io_fn;
4239 	mdev->bm_io_work.done = done;
4240 	mdev->bm_io_work.why = why;
4241 	mdev->bm_io_work.flags = flags;
4242 
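	/* BITMAP_IO (set under req_lock) is what freezes new application IO,
	 * as described above.  The work is queued immediately only if no
	 * application IO is in flight (ap_bio_cnt == 0); otherwise it gets
	 * queued once the in-flight count has drained to zero. */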
4243 	spin_lock_irq(&mdev->req_lock);
4244 	set_bit(BITMAP_IO, &mdev->flags);
4245 	if (atomic_read(&mdev->ap_bio_cnt) == 0) {
4246 		if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
4247 			drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
4248 	}
4249 	spin_unlock_irq(&mdev->req_lock);
4250 }
4251 
4252 /**
4253  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
4254  * @mdev:	DRBD device.
4255  * @io_fn:	IO callback to be called when bitmap IO is possible
4256  * @why:	Descriptive text of the reason for doing the IO
4257  *
4258  * Freezes application IO while the actual IO operation runs. This
4259  * function MAY NOT be called from worker context.
4260  */
4261 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
4262 		char *why, enum bm_flag flags)
4263 {
4264 	int rv;
4265 
4266 	D_ASSERT(current != mdev->worker.task);
4267 
4268 	if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
4269 		drbd_suspend_io(mdev);
4270 
4271 	drbd_bm_lock(mdev, why, flags);
4272 	rv = io_fn(mdev);
4273 	drbd_bm_unlock(mdev);
4274 
4275 	if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
4276 		drbd_resume_io(mdev);
4277 
4278 	return rv;
4279 }
4280 
4281 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
4282 {
4283 	if ((mdev->ldev->md.flags & flag) != flag) {
4284 		drbd_md_mark_dirty(mdev);
4285 		mdev->ldev->md.flags |= flag;
4286 	}
4287 }
4288 
4289 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
4290 {
4291 	if ((mdev->ldev->md.flags & flag) != 0) {
4292 		drbd_md_mark_dirty(mdev);
4293 		mdev->ldev->md.flags &= ~flag;
4294 	}
4295 }
4296 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
4297 {
4298 	return (bdev->md.flags & flag) != 0;
4299 }
4300 
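/* Timer callback: runs in timer context, so it does not touch the meta data
 * itself; it only pushes w_md_sync() to the front of the worker queue, and
 * the worker then warns and calls drbd_md_sync(). */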
4301 static void md_sync_timer_fn(unsigned long data)
4302 {
4303 	struct drbd_conf *mdev = (struct drbd_conf *) data;
4304 
4305 	drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
4306 }
4307 
4308 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
4309 {
4310 	dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
4311 #ifdef DEBUG
4312 	dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
4313 		mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
4314 #endif
4315 	drbd_md_sync(mdev);
4316 	return 1;
4317 }
4318 
4319 #ifdef CONFIG_DRBD_FAULT_INJECTION
4320 /* Fault insertion support including random number generator shamelessly
4321  * stolen from kernel/rcutorture.c */
4322 struct fault_random_state {
4323 	unsigned long state;
4324 	unsigned long count;
4325 };
4326 
4327 #define FAULT_RANDOM_MULT 39916801  /* prime */
4328 #define FAULT_RANDOM_ADD	479001701 /* prime */
4329 #define FAULT_RANDOM_REFRESH 10000
4330 
4331 /*
4332  * Crude but fast random-number generator.  Uses a linear congruential
4333  * generator, with occasional help from get_random_bytes().
4334  */
4335 static unsigned long
4336 _drbd_fault_random(struct fault_random_state *rsp)
4337 {
4338 	long refresh;
4339 
4340 	if (!rsp->count--) {
4341 		get_random_bytes(&refresh, sizeof(refresh));
4342 		rsp->state += refresh;
4343 		rsp->count = FAULT_RANDOM_REFRESH;
4344 	}
4345 	rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
4346 	return swahw32(rsp->state);
4347 }
4348 
4349 static char *
4350 _drbd_fault_str(unsigned int type) {
4351 	static char *_faults[] = {
4352 		[DRBD_FAULT_MD_WR] = "Meta-data write",
4353 		[DRBD_FAULT_MD_RD] = "Meta-data read",
4354 		[DRBD_FAULT_RS_WR] = "Resync write",
4355 		[DRBD_FAULT_RS_RD] = "Resync read",
4356 		[DRBD_FAULT_DT_WR] = "Data write",
4357 		[DRBD_FAULT_DT_RD] = "Data read",
4358 		[DRBD_FAULT_DT_RA] = "Data read ahead",
4359 		[DRBD_FAULT_BM_ALLOC] = "BM allocation",
4360 		[DRBD_FAULT_AL_EE] = "EE allocation",
4361 		[DRBD_FAULT_RECEIVE] = "receive data corruption",
4362 	};
4363 
4364 	return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
4365 }
4366 
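/* Decide whether to inject a fault of the given type: fault_devs is a bit
 * mask of affected minors (0 means all devices), fault_rate is the
 * probability in percent; each injected fault bumps fault_count and is
 * reported with a ratelimited warning. */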
4367 unsigned int
4368 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
4369 {
4370 	static struct fault_random_state rrs = {0, 0};
4371 
4372 	unsigned int ret = (
4373 		(fault_devs == 0 ||
4374 			((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
4375 		(((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
4376 
4377 	if (ret) {
4378 		fault_count++;
4379 
4380 		if (__ratelimit(&drbd_ratelimit_state))
4381 			dev_warn(DEV, "***Simulating %s failure\n",
4382 				_drbd_fault_str(type));
4383 	}
4384 
4385 	return ret;
4386 }
4387 #endif
4388 
4389 const char *drbd_buildtag(void)
4390 {
4391 	/* DRBD built from external sources carries a reference to the
4392 	   git hash of the source code here. */
4393 
4394 	static char buildtag[38] = "\0uilt-in";
4395 
4396 	if (buildtag[0] == 0) {
4397 #ifdef MODULE
4398 		sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
4399 #else
4400 		buildtag[0] = 'b';
4401 #endif
4402 	}
4403 
4404 	return buildtag;
4405 }
4406 
4407 module_init(drbd_init)
4408 module_exit(drbd_cleanup)
4409 
4410 EXPORT_SYMBOL(drbd_conn_str);
4411 EXPORT_SYMBOL(drbd_role_str);
4412 EXPORT_SYMBOL(drbd_disk_str);
4413 EXPORT_SYMBOL(drbd_set_st_err_str);
4414