xref: /openbmc/linux/drivers/block/drbd/drbd_main.c (revision a09d2831)
1 /*
2    drbd.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12 
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17 
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22 
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 
27  */
28 
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/smp_lock.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48 
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52 
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56 
57 #include "drbd_vli.h"
58 
59 struct after_state_chg_work {
60 	struct drbd_work w;
61 	union drbd_state os;
62 	union drbd_state ns;
63 	enum chg_state_flags flags;
64 	struct completion *done;
65 };
66 
67 int drbdd_init(struct drbd_thread *);
68 int drbd_worker(struct drbd_thread *);
69 int drbd_asender(struct drbd_thread *);
70 
71 int drbd_init(void);
72 static int drbd_open(struct block_device *bdev, fmode_t mode);
73 static int drbd_release(struct gendisk *gd, fmode_t mode);
74 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
75 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
76 			   union drbd_state ns, enum chg_state_flags flags);
77 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
78 static void md_sync_timer_fn(unsigned long data);
79 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
80 
81 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
82 	      "Lars Ellenberg <lars@linbit.com>");
83 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
84 MODULE_VERSION(REL_VERSION);
85 MODULE_LICENSE("GPL");
86 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices (1-255)");
87 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
88 
89 #include <linux/moduleparam.h>
90 /* allow_open_on_secondary */
91 MODULE_PARM_DESC(allow_oos, "DONT USE!");
92 /* thanks to these macros, if compiled into the kernel (not as a module),
93  * this becomes the boot parameter drbd.minor_count; see the example below */
94 module_param(minor_count, uint, 0444);
95 module_param(disable_sendpage, bool, 0644);
96 module_param(allow_oos, bool, 0);
97 module_param(cn_idx, uint, 0444);
98 module_param(proc_details, int, 0644);
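/*
 * Illustrative example (editorial note, not part of the original source):
 * with DRBD built into the kernel, the parameters above become boot
 * parameters carrying a "drbd." prefix, e.g. on the kernel command line:
 *
 *	drbd.minor_count=64 drbd.disable_sendpage=1
 *
 * Built as a module, the same values would be given at load time, e.g.
 * "modprobe drbd minor_count=64".
 */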
99 
100 #ifdef CONFIG_DRBD_FAULT_INJECTION
101 int enable_faults;
102 int fault_rate;
103 static int fault_count;
104 int fault_devs;
105 /* bitmap of enabled faults */
106 module_param(enable_faults, int, 0664);
107 /* fault rate % value - applies to all enabled faults */
108 module_param(fault_rate, int, 0664);
109 /* count of faults inserted */
110 module_param(fault_count, int, 0664);
111 /* bitmap of devices to insert faults on */
112 module_param(fault_devs, int, 0644);
113 #endif
114 
115 /* module parameter, defined */
116 unsigned int minor_count = 32;
117 int disable_sendpage;
118 int allow_oos;
119 unsigned int cn_idx = CN_IDX_DRBD;
120 int proc_details;       /* Detail level in /proc/drbd */
121 
122 /* Module parameter for setting the user mode helper program
123  * to run. Default is /sbin/drbdadm */
124 char usermode_helper[80] = "/sbin/drbdadm";
125 
126 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
127 
128 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
129  * as member "struct gendisk *vdisk;"
130  */
131 struct drbd_conf **minor_table;
132 
133 struct kmem_cache *drbd_request_cache;
134 struct kmem_cache *drbd_ee_cache;	/* epoch entries */
135 struct kmem_cache *drbd_bm_ext_cache;	/* bitmap extents */
136 struct kmem_cache *drbd_al_ext_cache;	/* activity log extents */
137 mempool_t *drbd_request_mempool;
138 mempool_t *drbd_ee_mempool;
139 
140 /* I do not use a standard mempool, because:
141    1) I want to hand out the pre-allocated objects first.
142    2) I want to be able to interrupt sleeping allocation with a signal.
143    Note: This is a singly linked list; the next pointer is stored in the
144 	 private member of struct page (see the sketch below).
145  */
146 struct page *drbd_pp_pool;
147 spinlock_t   drbd_pp_lock;
148 int          drbd_pp_vacant;
149 wait_queue_head_t drbd_pp_wait;
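/*
 * Minimal sketch (editorial, not part of the original source) of how a page
 * may be pushed onto / popped off this LIFO list, with the private member of
 * struct page acting as the "next" pointer:
 *
 *	static void example_pp_push(struct page *page)
 *	{
 *		spin_lock(&drbd_pp_lock);
 *		set_page_private(page, (unsigned long)drbd_pp_pool);
 *		drbd_pp_pool = page;
 *		drbd_pp_vacant++;
 *		spin_unlock(&drbd_pp_lock);
 *	}
 *
 *	static struct page *example_pp_pop(void)
 *	{
 *		struct page *page;
 *
 *		spin_lock(&drbd_pp_lock);
 *		page = drbd_pp_pool;
 *		if (page) {
 *			drbd_pp_pool = (struct page *)page_private(page);
 *			drbd_pp_vacant--;
 *		}
 *		spin_unlock(&drbd_pp_lock);
 *		return page;
 *	}
 */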
150 
151 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
152 
153 static const struct block_device_operations drbd_ops = {
154 	.owner =   THIS_MODULE,
155 	.open =    drbd_open,
156 	.release = drbd_release,
157 };
158 
159 #define ARRY_SIZE(A) (sizeof(A)/sizeof(A[0]))
160 
161 #ifdef __CHECKER__
162 /* When checking with sparse, if this is an inline function, sparse will
163    give tons of false positives. When this is a real function, sparse works.
164  */
165 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
166 {
167 	int io_allowed;
168 
169 	atomic_inc(&mdev->local_cnt);
170 	io_allowed = (mdev->state.disk >= mins);
171 	if (!io_allowed) {
172 		if (atomic_dec_and_test(&mdev->local_cnt))
173 			wake_up(&mdev->misc_wait);
174 	}
175 	return io_allowed;
176 }
177 
178 #endif
179 
180 /**
181  * DOC: The transfer log
182  *
183  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
184  * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
185  * of the list. There is always at least one &struct drbd_tl_epoch object.
186  *
187  * Each &struct drbd_tl_epoch has a circular, doubly linked list of requests
188  * attached.
189  */
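/*
 * Illustrative sketch (editorial, not part of the original source): walking
 * the transfer log from the oldest to the newest epoch and over the requests
 * attached to each epoch, much like tl_clear() below does:
 *
 *	struct drbd_tl_epoch *b;
 *	struct drbd_request *req;
 *
 *	for (b = mdev->oldest_tle; b != NULL; b = b->next)
 *		list_for_each_entry(req, &b->requests, tl_requests)
 *			;	// inspect each request of this epoch
 */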
190 static int tl_init(struct drbd_conf *mdev)
191 {
192 	struct drbd_tl_epoch *b;
193 
194 	/* during device minor initialization, we may well use GFP_KERNEL */
195 	b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
196 	if (!b)
197 		return 0;
198 	INIT_LIST_HEAD(&b->requests);
199 	INIT_LIST_HEAD(&b->w.list);
200 	b->next = NULL;
201 	b->br_number = 4711;
202 	b->n_req = 0;
203 	b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
204 
205 	mdev->oldest_tle = b;
206 	mdev->newest_tle = b;
207 	INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
208 
209 	mdev->tl_hash = NULL;
210 	mdev->tl_hash_s = 0;
211 
212 	return 1;
213 }
214 
215 static void tl_cleanup(struct drbd_conf *mdev)
216 {
217 	D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
218 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
219 	kfree(mdev->oldest_tle);
220 	mdev->oldest_tle = NULL;
221 	kfree(mdev->unused_spare_tle);
222 	mdev->unused_spare_tle = NULL;
223 	kfree(mdev->tl_hash);
224 	mdev->tl_hash = NULL;
225 	mdev->tl_hash_s = 0;
226 }
227 
228 /**
229  * _tl_add_barrier() - Adds a barrier to the transfer log
230  * @mdev:	DRBD device.
231  * @new:	Barrier to be added before the current head of the TL.
232  *
233  * The caller must hold the req_lock.
234  */
235 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
236 {
237 	struct drbd_tl_epoch *newest_before;
238 
239 	INIT_LIST_HEAD(&new->requests);
240 	INIT_LIST_HEAD(&new->w.list);
241 	new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
242 	new->next = NULL;
243 	new->n_req = 0;
244 
245 	newest_before = mdev->newest_tle;
246 	/* never send a barrier number == 0, because that is special-cased
247 	 * when using TCQ for our write ordering code */
248 	new->br_number = (newest_before->br_number+1) ?: 1;
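	/* Worked example (editorial note): if newest_before->br_number is
	 * 0xffffffff, the "+1" wraps to 0 and the "?: 1" picks 1 instead,
	 * so the reserved barrier number 0 is never handed out. */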
249 	if (mdev->newest_tle != new) {
250 		mdev->newest_tle->next = new;
251 		mdev->newest_tle = new;
252 	}
253 }
254 
255 /**
256  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
257  * @mdev:	DRBD device.
258  * @barrier_nr:	Expected identifier of the DRBD write barrier packet.
259  * @set_size:	Expected number of requests before that barrier.
260  *
261  * In case the passed barrier_nr or set_size does not match the oldest
262  * &struct drbd_tl_epoch objects this function will cause a termination
263  * of the connection.
264  */
265 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
266 		       unsigned int set_size)
267 {
268 	struct drbd_tl_epoch *b, *nob; /* next old barrier */
269 	struct list_head *le, *tle;
270 	struct drbd_request *r;
271 
272 	spin_lock_irq(&mdev->req_lock);
273 
274 	b = mdev->oldest_tle;
275 
276 	/* first some paranoia code */
277 	if (b == NULL) {
278 		dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
279 			barrier_nr);
280 		goto bail;
281 	}
282 	if (b->br_number != barrier_nr) {
283 		dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
284 			barrier_nr, b->br_number);
285 		goto bail;
286 	}
287 	if (b->n_req != set_size) {
288 		dev_err(DEV, "BAD! BarrierAck #%u received with n_req=%u, expected n_req=%u!\n",
289 			barrier_nr, set_size, b->n_req);
290 		goto bail;
291 	}
292 
293 	/* Clean up list of requests processed during current epoch */
294 	list_for_each_safe(le, tle, &b->requests) {
295 		r = list_entry(le, struct drbd_request, tl_requests);
296 		_req_mod(r, barrier_acked);
297 	}
298 	/* There could be requests on the list waiting for completion
299 	   of the write to the local disk. To avoid corrupting the
300 	   slab's data structures we have to remove the list's head.
301 
302 	   Also there could have been a barrier ack out of sequence, overtaking
303 	   the write acks - which would be a bug and would violate write ordering.
304 	   To not deadlock in case we lose connection while such requests are
305 	   still pending, we need some way to find them for
306 	   _req_mod(connection_lost_while_pending).
307 
308 	   These have been list_move'd to the out_of_sequence_requests list in
309 	   _req_mod(, barrier_acked) above.
310 	   */
311 	list_del_init(&b->requests);
312 
313 	nob = b->next;
314 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
315 		_tl_add_barrier(mdev, b);
316 		if (nob)
317 			mdev->oldest_tle = nob;
318 		/* if nob == NULL, b was the only barrier and becomes the new
319 		   barrier. Therefore mdev->oldest_tle already points to b */
320 	} else {
321 		D_ASSERT(nob != NULL);
322 		mdev->oldest_tle = nob;
323 		kfree(b);
324 	}
325 
326 	spin_unlock_irq(&mdev->req_lock);
327 	dec_ap_pending(mdev);
328 
329 	return;
330 
331 bail:
332 	spin_unlock_irq(&mdev->req_lock);
333 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
334 }
335 
336 
337 /**
338  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
339  * @mdev:	DRBD device.
340  *
341  * This is called after the connection to the peer was lost. The storage covered
342  * by the requests on the transfer log gets marked as out of sync. Called from the
343  * receiver thread and the worker thread.
344  */
345 void tl_clear(struct drbd_conf *mdev)
346 {
347 	struct drbd_tl_epoch *b, *tmp;
348 	struct list_head *le, *tle;
349 	struct drbd_request *r;
350 	int new_initial_bnr = net_random();
351 
352 	spin_lock_irq(&mdev->req_lock);
353 
354 	b = mdev->oldest_tle;
355 	while (b) {
356 		list_for_each_safe(le, tle, &b->requests) {
357 			r = list_entry(le, struct drbd_request, tl_requests);
358 			/* It would be nice to complete outside of spinlock.
359 			 * But this is easier for now. */
360 			_req_mod(r, connection_lost_while_pending);
361 		}
362 		tmp = b->next;
363 
364 		/* there could still be requests on that ring list,
365 		 * in case local io is still pending */
366 		list_del(&b->requests);
367 
368 		/* dec_ap_pending corresponding to queue_barrier.
369 		 * the newest barrier may not have been queued yet,
370 		 * in which case w.cb is still NULL. */
371 		if (b->w.cb != NULL)
372 			dec_ap_pending(mdev);
373 
374 		if (b == mdev->newest_tle) {
375 			/* recycle, but reinit! */
376 			D_ASSERT(tmp == NULL);
377 			INIT_LIST_HEAD(&b->requests);
378 			INIT_LIST_HEAD(&b->w.list);
379 			b->w.cb = NULL;
380 			b->br_number = new_initial_bnr;
381 			b->n_req = 0;
382 
383 			mdev->oldest_tle = b;
384 			break;
385 		}
386 		kfree(b);
387 		b = tmp;
388 	}
389 
390 	/* we expect this list to be empty. */
391 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
392 
393 	/* but just in case, clean it up anyways! */
394 	list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
395 		r = list_entry(le, struct drbd_request, tl_requests);
396 		/* It would be nice to complete outside of spinlock.
397 		 * But this is easier for now. */
398 		_req_mod(r, connection_lost_while_pending);
399 	}
400 
401 	/* ensure bit indicating barrier is required is clear */
402 	clear_bit(CREATE_BARRIER, &mdev->flags);
403 
404 	spin_unlock_irq(&mdev->req_lock);
405 }
406 
407 /**
408  * cl_wide_st_chg() - TRUE if the state change is a cluster wide one
409  * @mdev:	DRBD device.
410  * @os:		old (current) state.
411  * @ns:		new (wanted) state.
412  */
413 static int cl_wide_st_chg(struct drbd_conf *mdev,
414 			  union drbd_state os, union drbd_state ns)
415 {
416 	return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
417 		 ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
418 		  (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
419 		  (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
420 		  (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))) ||
421 		(os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
422 		(os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
423 }
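/*
 * Illustrative examples (editorial note): promoting to R_PRIMARY while both
 * nodes are connected is cluster wide, because the peer has to acknowledge
 * it; a purely local change such as D_ATTACHING -> D_NEGOTIATING is not and
 * is decided under the local req_lock only.
 */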
424 
425 int drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
426 		      union drbd_state mask, union drbd_state val)
427 {
428 	unsigned long flags;
429 	union drbd_state os, ns;
430 	int rv;
431 
432 	spin_lock_irqsave(&mdev->req_lock, flags);
433 	os = mdev->state;
434 	ns.i = (os.i & ~mask.i) | val.i;
435 	rv = _drbd_set_state(mdev, ns, f, NULL);
436 	ns = mdev->state;
437 	spin_unlock_irqrestore(&mdev->req_lock, flags);
438 
439 	return rv;
440 }
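/*
 * Usage sketch (editorial; NS() builds such a mask/val pair and is used the
 * same way in the drbd_force_state() calls elsewhere in this file):
 *
 *	drbd_change_state(mdev, CS_VERBOSE, NS(conn, C_DISCONNECTING));
 *
 * only touches the connection field - "ns.i = (os.i & ~mask.i) | val.i"
 * above leaves every other state field unchanged.
 */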
441 
442 /**
443  * drbd_force_state() - Impose a change which happens outside our control on our state
444  * @mdev:	DRBD device.
445  * @mask:	mask of state bits to change.
446  * @val:	value of new state bits.
447  */
448 void drbd_force_state(struct drbd_conf *mdev,
449 	union drbd_state mask, union drbd_state val)
450 {
451 	drbd_change_state(mdev, CS_HARD, mask, val);
452 }
453 
454 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns);
455 static int is_valid_state_transition(struct drbd_conf *,
456 				     union drbd_state, union drbd_state);
457 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
458 				       union drbd_state ns, int *warn_sync_abort);
459 int drbd_send_state_req(struct drbd_conf *,
460 			union drbd_state, union drbd_state);
461 
462 static enum drbd_state_ret_codes _req_st_cond(struct drbd_conf *mdev,
463 				    union drbd_state mask, union drbd_state val)
464 {
465 	union drbd_state os, ns;
466 	unsigned long flags;
467 	int rv;
468 
469 	if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
470 		return SS_CW_SUCCESS;
471 
472 	if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
473 		return SS_CW_FAILED_BY_PEER;
474 
475 	rv = 0;
476 	spin_lock_irqsave(&mdev->req_lock, flags);
477 	os = mdev->state;
478 	ns.i = (os.i & ~mask.i) | val.i;
479 	ns = sanitize_state(mdev, os, ns, NULL);
480 
481 	if (!cl_wide_st_chg(mdev, os, ns))
482 		rv = SS_CW_NO_NEED;
483 	if (!rv) {
484 		rv = is_valid_state(mdev, ns);
485 		if (rv == SS_SUCCESS) {
486 			rv = is_valid_state_transition(mdev, ns, os);
487 			if (rv == SS_SUCCESS)
488 				rv = 0; /* cont waiting, otherwise fail. */
489 		}
490 	}
491 	spin_unlock_irqrestore(&mdev->req_lock, flags);
492 
493 	return rv;
494 }
495 
496 /**
497  * drbd_req_state() - Perform a possibly cluster-wide state change
498  * @mdev:	DRBD device.
499  * @mask:	mask of state bits to change.
500  * @val:	value of new state bits.
501  * @f:		flags
502  *
503  * Should not be called directly, use drbd_request_state() or
504  * _drbd_request_state().
505  */
506 static int drbd_req_state(struct drbd_conf *mdev,
507 			  union drbd_state mask, union drbd_state val,
508 			  enum chg_state_flags f)
509 {
510 	struct completion done;
511 	unsigned long flags;
512 	union drbd_state os, ns;
513 	int rv;
514 
515 	init_completion(&done);
516 
517 	if (f & CS_SERIALIZE)
518 		mutex_lock(&mdev->state_mutex);
519 
520 	spin_lock_irqsave(&mdev->req_lock, flags);
521 	os = mdev->state;
522 	ns.i = (os.i & ~mask.i) | val.i;
523 	ns = sanitize_state(mdev, os, ns, NULL);
524 
525 	if (cl_wide_st_chg(mdev, os, ns)) {
526 		rv = is_valid_state(mdev, ns);
527 		if (rv == SS_SUCCESS)
528 			rv = is_valid_state_transition(mdev, ns, os);
529 		spin_unlock_irqrestore(&mdev->req_lock, flags);
530 
531 		if (rv < SS_SUCCESS) {
532 			if (f & CS_VERBOSE)
533 				print_st_err(mdev, os, ns, rv);
534 			goto abort;
535 		}
536 
537 		drbd_state_lock(mdev);
538 		if (!drbd_send_state_req(mdev, mask, val)) {
539 			drbd_state_unlock(mdev);
540 			rv = SS_CW_FAILED_BY_PEER;
541 			if (f & CS_VERBOSE)
542 				print_st_err(mdev, os, ns, rv);
543 			goto abort;
544 		}
545 
546 		wait_event(mdev->state_wait,
547 			(rv = _req_st_cond(mdev, mask, val)));
548 
549 		if (rv < SS_SUCCESS) {
550 			drbd_state_unlock(mdev);
551 			if (f & CS_VERBOSE)
552 				print_st_err(mdev, os, ns, rv);
553 			goto abort;
554 		}
555 		spin_lock_irqsave(&mdev->req_lock, flags);
556 		os = mdev->state;
557 		ns.i = (os.i & ~mask.i) | val.i;
558 		rv = _drbd_set_state(mdev, ns, f, &done);
559 		drbd_state_unlock(mdev);
560 	} else {
561 		rv = _drbd_set_state(mdev, ns, f, &done);
562 	}
563 
564 	spin_unlock_irqrestore(&mdev->req_lock, flags);
565 
566 	if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
567 		D_ASSERT(current != mdev->worker.task);
568 		wait_for_completion(&done);
569 	}
570 
571 abort:
572 	if (f & CS_SERIALIZE)
573 		mutex_unlock(&mdev->state_mutex);
574 
575 	return rv;
576 }
577 
578 /**
579  * _drbd_request_state() - Request a state change (with flags)
580  * @mdev:	DRBD device.
581  * @mask:	mask of state bits to change.
582  * @val:	value of new state bits.
583  * @f:		flags
584  *
585  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
586  * flag, or when logging of failed state change requests is not desired.
587  */
588 int _drbd_request_state(struct drbd_conf *mdev,	union drbd_state mask,
589 			union drbd_state val,	enum chg_state_flags f)
590 {
591 	int rv;
592 
593 	wait_event(mdev->state_wait,
594 		   (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
595 
596 	return rv;
597 }
598 
599 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
600 {
601 	dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
602 	    name,
603 	    drbd_conn_str(ns.conn),
604 	    drbd_role_str(ns.role),
605 	    drbd_role_str(ns.peer),
606 	    drbd_disk_str(ns.disk),
607 	    drbd_disk_str(ns.pdsk),
608 	    ns.susp ? 's' : 'r',
609 	    ns.aftr_isp ? 'a' : '-',
610 	    ns.peer_isp ? 'p' : '-',
611 	    ns.user_isp ? 'u' : '-'
612 	    );
613 }
614 
615 void print_st_err(struct drbd_conf *mdev,
616 	union drbd_state os, union drbd_state ns, int err)
617 {
618 	if (err == SS_IN_TRANSIENT_STATE)
619 		return;
620 	dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
621 	print_st(mdev, " state", os);
622 	print_st(mdev, "wanted", ns);
623 }
624 
625 
626 #define drbd_peer_str drbd_role_str
627 #define drbd_pdsk_str drbd_disk_str
628 
629 #define drbd_susp_str(A)     ((A) ? "1" : "0")
630 #define drbd_aftr_isp_str(A) ((A) ? "1" : "0")
631 #define drbd_peer_isp_str(A) ((A) ? "1" : "0")
632 #define drbd_user_isp_str(A) ((A) ? "1" : "0")
633 
634 #define PSC(A) \
635 	({ if (ns.A != os.A) { \
636 		pbp += sprintf(pbp, #A "( %s -> %s ) ", \
637 			      drbd_##A##_str(os.A), \
638 			      drbd_##A##_str(ns.A)); \
639 	} })
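/*
 * Example (editorial note): for a promotion combined with a disk state
 * change, the PSC() invocations in __drbd_set_state() below assemble a log
 * line roughly like
 *
 *	role( Secondary -> Primary ) disk( Inconsistent -> UpToDate )
 *
 * which is then emitted in one dev_info() call.
 */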
640 
641 /**
642  * is_valid_state() - Returns an SS_ error code if ns is not valid
643  * @mdev:	DRBD device.
644  * @ns:		State to consider.
645  */
646 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
647 {
648 	/* See drbd_state_sw_errors in drbd_strings.c */
649 
650 	enum drbd_fencing_p fp;
651 	int rv = SS_SUCCESS;
652 
653 	fp = FP_DONT_CARE;
654 	if (get_ldev(mdev)) {
655 		fp = mdev->ldev->dc.fencing;
656 		put_ldev(mdev);
657 	}
658 
659 	if (get_net_conf(mdev)) {
660 		if (!mdev->net_conf->two_primaries &&
661 		    ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
662 			rv = SS_TWO_PRIMARIES;
663 		put_net_conf(mdev);
664 	}
665 
666 	if (rv <= 0)
667 		/* already found a reason to abort */;
668 	else if (ns.role == R_SECONDARY && mdev->open_cnt)
669 		rv = SS_DEVICE_IN_USE;
670 
671 	else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
672 		rv = SS_NO_UP_TO_DATE_DISK;
673 
674 	else if (fp >= FP_RESOURCE &&
675 		 ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
676 		rv = SS_PRIMARY_NOP;
677 
678 	else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
679 		rv = SS_NO_UP_TO_DATE_DISK;
680 
681 	else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
682 		rv = SS_NO_LOCAL_DISK;
683 
684 	else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
685 		rv = SS_NO_REMOTE_DISK;
686 
687 	else if ((ns.conn == C_CONNECTED ||
688 		  ns.conn == C_WF_BITMAP_S ||
689 		  ns.conn == C_SYNC_SOURCE ||
690 		  ns.conn == C_PAUSED_SYNC_S) &&
691 		  ns.disk == D_OUTDATED)
692 		rv = SS_CONNECTED_OUTDATES;
693 
694 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
695 		 (mdev->sync_conf.verify_alg[0] == 0))
696 		rv = SS_NO_VERIFY_ALG;
697 
698 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
699 		  mdev->agreed_pro_version < 88)
700 		rv = SS_NOT_SUPPORTED;
701 
702 	return rv;
703 }
704 
705 /**
706  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
707  * @mdev:	DRBD device.
708  * @ns:		new state.
709  * @os:		old state.
710  */
711 static int is_valid_state_transition(struct drbd_conf *mdev,
712 				     union drbd_state ns, union drbd_state os)
713 {
714 	int rv = SS_SUCCESS;
715 
716 	if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
717 	    os.conn > C_CONNECTED)
718 		rv = SS_RESYNC_RUNNING;
719 
720 	if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
721 		rv = SS_ALREADY_STANDALONE;
722 
723 	if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
724 		rv = SS_IS_DISKLESS;
725 
726 	if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
727 		rv = SS_NO_NET_CONFIG;
728 
729 	if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
730 		rv = SS_LOWER_THAN_OUTDATED;
731 
732 	if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
733 		rv = SS_IN_TRANSIENT_STATE;
734 
735 	if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
736 		rv = SS_IN_TRANSIENT_STATE;
737 
738 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
739 		rv = SS_NEED_CONNECTION;
740 
741 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
742 	    ns.conn != os.conn && os.conn > C_CONNECTED)
743 		rv = SS_RESYNC_RUNNING;
744 
745 	if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
746 	    os.conn < C_CONNECTED)
747 		rv = SS_NEED_CONNECTION;
748 
749 	return rv;
750 }
751 
752 /**
753  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
754  * @mdev:	DRBD device.
755  * @os:		old state.
756  * @ns:		new state.
757  * @warn_sync_abort:
758  *
759  * When we lose the connection, we have to set the state of the peer's disk (pdsk)
760  * to D_UNKNOWN. This rule and many more along those lines are in this function.
761  */
762 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
763 				       union drbd_state ns, int *warn_sync_abort)
764 {
765 	enum drbd_fencing_p fp;
766 
767 	fp = FP_DONT_CARE;
768 	if (get_ldev(mdev)) {
769 		fp = mdev->ldev->dc.fencing;
770 		put_ldev(mdev);
771 	}
772 
773 	/* Disallow network errors from configuring a device's network part */
774 	if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
775 	    os.conn <= C_DISCONNECTING)
776 		ns.conn = os.conn;
777 
778 	/* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow */
779 	if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
780 	    ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING)
781 		ns.conn = os.conn;
782 
783 	/* After C_DISCONNECTING only C_STANDALONE may follow */
784 	if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
785 		ns.conn = os.conn;
786 
787 	if (ns.conn < C_CONNECTED) {
788 		ns.peer_isp = 0;
789 		ns.peer = R_UNKNOWN;
790 		if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
791 			ns.pdsk = D_UNKNOWN;
792 	}
793 
794 	/* Clear the aftr_isp when becoming unconfigured */
795 	if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
796 		ns.aftr_isp = 0;
797 
798 	if (ns.conn <= C_DISCONNECTING && ns.disk == D_DISKLESS)
799 		ns.pdsk = D_UNKNOWN;
800 
801 	/* Abort resync if a disk fails/detaches */
802 	if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
803 	    (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
804 		if (warn_sync_abort)
805 			*warn_sync_abort = 1;
806 		ns.conn = C_CONNECTED;
807 	}
808 
809 	if (ns.conn >= C_CONNECTED &&
810 	    ((ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED) ||
811 	     (ns.disk == D_NEGOTIATING && ns.conn == C_WF_BITMAP_T))) {
812 		switch (ns.conn) {
813 		case C_WF_BITMAP_T:
814 		case C_PAUSED_SYNC_T:
815 			ns.disk = D_OUTDATED;
816 			break;
817 		case C_CONNECTED:
818 		case C_WF_BITMAP_S:
819 		case C_SYNC_SOURCE:
820 		case C_PAUSED_SYNC_S:
821 			ns.disk = D_UP_TO_DATE;
822 			break;
823 		case C_SYNC_TARGET:
824 			ns.disk = D_INCONSISTENT;
825 			dev_warn(DEV, "Implicitly set disk state Inconsistent!\n");
826 			break;
827 		}
828 		if (os.disk == D_OUTDATED && ns.disk == D_UP_TO_DATE)
829 			dev_warn(DEV, "Implicitly set disk from Outdated to UpToDate\n");
830 	}
831 
832 	if (ns.conn >= C_CONNECTED &&
833 	    (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)) {
834 		switch (ns.conn) {
835 		case C_CONNECTED:
836 		case C_WF_BITMAP_T:
837 		case C_PAUSED_SYNC_T:
838 		case C_SYNC_TARGET:
839 			ns.pdsk = D_UP_TO_DATE;
840 			break;
841 		case C_WF_BITMAP_S:
842 		case C_PAUSED_SYNC_S:
843 			ns.pdsk = D_OUTDATED;
844 			break;
845 		case C_SYNC_SOURCE:
846 			ns.pdsk = D_INCONSISTENT;
847 			dev_warn(DEV, "Implicitly set pdsk Inconsistent!\n");
848 			break;
849 		}
850 		if (os.pdsk == D_OUTDATED && ns.pdsk == D_UP_TO_DATE)
851 			dev_warn(DEV, "Implicitly set pdsk from Outdated to UpToDate\n");
852 	}
853 
854 	/* Connection broke down before we finished "Negotiating" */
855 	if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
856 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
857 		if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
858 			ns.disk = mdev->new_state_tmp.disk;
859 			ns.pdsk = mdev->new_state_tmp.pdsk;
860 		} else {
861 			dev_alert(DEV, "Connection lost while negotiating, no data!\n");
862 			ns.disk = D_DISKLESS;
863 			ns.pdsk = D_UNKNOWN;
864 		}
865 		put_ldev(mdev);
866 	}
867 
868 	if (fp == FP_STONITH &&
869 	    (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
870 	    !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
871 		ns.susp = 1;
872 
873 	if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
874 		if (ns.conn == C_SYNC_SOURCE)
875 			ns.conn = C_PAUSED_SYNC_S;
876 		if (ns.conn == C_SYNC_TARGET)
877 			ns.conn = C_PAUSED_SYNC_T;
878 	} else {
879 		if (ns.conn == C_PAUSED_SYNC_S)
880 			ns.conn = C_SYNC_SOURCE;
881 		if (ns.conn == C_PAUSED_SYNC_T)
882 			ns.conn = C_SYNC_TARGET;
883 	}
884 
885 	return ns;
886 }
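/*
 * Worked example (editorial note): losing the connection while we are
 * SyncTarget, e.g.
 *
 *	os.conn == C_SYNC_TARGET, os.pdsk == D_UP_TO_DATE
 *	ns.conn == C_TIMEOUT
 *
 * makes sanitize_state() force ns.peer = R_UNKNOWN and ns.pdsk = D_UNKNOWN,
 * since nothing reliable is known about the peer once the link is gone.
 */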
887 
888 /* helper for __drbd_set_state */
889 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
890 {
891 	if (cs == C_VERIFY_T) {
892 		/* starting online verify from an arbitrary position
893 		 * does not fit well into the existing protocol.
894 		 * on C_VERIFY_T, we initialize ov_left and friends
895 		 * implicitly in receive_DataRequest once the
896 		 * first P_OV_REQUEST is received */
897 		mdev->ov_start_sector = ~(sector_t)0;
898 	} else {
899 		unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
900 		if (bit >= mdev->rs_total)
901 			mdev->ov_start_sector =
902 				BM_BIT_TO_SECT(mdev->rs_total - 1);
903 		mdev->ov_position = mdev->ov_start_sector;
904 	}
905 }
906 
907 /**
908  * __drbd_set_state() - Set a new DRBD state
909  * @mdev:	DRBD device.
910  * @ns:		new state.
911  * @flags:	Flags
912  * @done:	Optional completion that will be completed after after_state_ch() has finished
913  *
914  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
915  */
916 int __drbd_set_state(struct drbd_conf *mdev,
917 		    union drbd_state ns, enum chg_state_flags flags,
918 		    struct completion *done)
919 {
920 	union drbd_state os;
921 	int rv = SS_SUCCESS;
922 	int warn_sync_abort = 0;
923 	struct after_state_chg_work *ascw;
924 
925 	os = mdev->state;
926 
927 	ns = sanitize_state(mdev, os, ns, &warn_sync_abort);
928 
929 	if (ns.i == os.i)
930 		return SS_NOTHING_TO_DO;
931 
932 	if (!(flags & CS_HARD)) {
933 		/* pre-state-change checks; only look at ns */
934 		/* See drbd_state_sw_errors in drbd_strings.c */
935 
936 		rv = is_valid_state(mdev, ns);
937 		if (rv < SS_SUCCESS) {
938 			/* If the old state was illegal as well, then let
939 			   this happen...*/
940 
941 			if (is_valid_state(mdev, os) == rv) {
942 				dev_err(DEV, "Considering state change from bad state. "
943 				    "Error would be: '%s'\n",
944 				    drbd_set_st_err_str(rv));
945 				print_st(mdev, "old", os);
946 				print_st(mdev, "new", ns);
947 				rv = is_valid_state_transition(mdev, ns, os);
948 			}
949 		} else
950 			rv = is_valid_state_transition(mdev, ns, os);
951 	}
952 
953 	if (rv < SS_SUCCESS) {
954 		if (flags & CS_VERBOSE)
955 			print_st_err(mdev, os, ns, rv);
956 		return rv;
957 	}
958 
959 	if (warn_sync_abort)
960 		dev_warn(DEV, "Resync aborted.\n");
961 
962 	{
963 		char *pbp, pb[300];
964 		pbp = pb;
965 		*pbp = 0;
966 		PSC(role);
967 		PSC(peer);
968 		PSC(conn);
969 		PSC(disk);
970 		PSC(pdsk);
971 		PSC(susp);
972 		PSC(aftr_isp);
973 		PSC(peer_isp);
974 		PSC(user_isp);
975 		dev_info(DEV, "%s\n", pb);
976 	}
977 
978 	/* solve the race between becoming unconfigured,
979 	 * worker doing the cleanup, and
980 	 * admin reconfiguring us:
981 	 * on (re)configure, first set CONFIG_PENDING,
982 	 * then wait for a potentially exiting worker,
983 	 * start the worker, and schedule one no_op.
984 	 * then proceed with configuration.
985 	 */
986 	if (ns.disk == D_DISKLESS &&
987 	    ns.conn == C_STANDALONE &&
988 	    ns.role == R_SECONDARY &&
989 	    !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
990 		set_bit(DEVICE_DYING, &mdev->flags);
991 
992 	mdev->state.i = ns.i;
993 	wake_up(&mdev->misc_wait);
994 	wake_up(&mdev->state_wait);
995 
996 	/*   post-state-change actions   */
997 	if (os.conn >= C_SYNC_SOURCE   && ns.conn <= C_CONNECTED) {
998 		set_bit(STOP_SYNC_TIMER, &mdev->flags);
999 		mod_timer(&mdev->resync_timer, jiffies);
1000 	}
1001 
1002 	/* aborted verify run. log the last position */
1003 	if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1004 	    ns.conn < C_CONNECTED) {
1005 		mdev->ov_start_sector =
1006 			BM_BIT_TO_SECT(mdev->rs_total - mdev->ov_left);
1007 		dev_info(DEV, "Online Verify reached sector %llu\n",
1008 			(unsigned long long)mdev->ov_start_sector);
1009 	}
1010 
1011 	if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1012 	    (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1013 		dev_info(DEV, "Syncer continues.\n");
1014 		mdev->rs_paused += (long)jiffies-(long)mdev->rs_mark_time;
1015 		if (ns.conn == C_SYNC_TARGET) {
1016 			if (!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))
1017 				mod_timer(&mdev->resync_timer, jiffies);
1018 			/* This if (!test_bit) is only needed for the case
1019 			   that a device that has ceased to use its timer,
1020 			   i.e. is already in drbd_resync_finished(), gets
1021 			   paused and resumed. */
1022 		}
1023 	}
1024 
1025 	if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1026 	    (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1027 		dev_info(DEV, "Resync suspended\n");
1028 		mdev->rs_mark_time = jiffies;
1029 		if (ns.conn == C_PAUSED_SYNC_T)
1030 			set_bit(STOP_SYNC_TIMER, &mdev->flags);
1031 	}
1032 
1033 	if (os.conn == C_CONNECTED &&
1034 	    (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1035 		mdev->ov_position = 0;
1036 		mdev->rs_total =
1037 		mdev->rs_mark_left = drbd_bm_bits(mdev);
1038 		if (mdev->agreed_pro_version >= 90)
1039 			set_ov_position(mdev, ns.conn);
1040 		else
1041 			mdev->ov_start_sector = 0;
1042 		mdev->ov_left = mdev->rs_total
1043 			      - BM_SECT_TO_BIT(mdev->ov_position);
1044 		mdev->rs_start     =
1045 		mdev->rs_mark_time = jiffies;
1046 		mdev->ov_last_oos_size = 0;
1047 		mdev->ov_last_oos_start = 0;
1048 
1049 		if (ns.conn == C_VERIFY_S) {
1050 			dev_info(DEV, "Starting Online Verify from sector %llu\n",
1051 					(unsigned long long)mdev->ov_position);
1052 			mod_timer(&mdev->resync_timer, jiffies);
1053 		}
1054 	}
1055 
1056 	if (get_ldev(mdev)) {
1057 		u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1058 						 MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1059 						 MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1060 
1061 		if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1062 			mdf |= MDF_CRASHED_PRIMARY;
1063 		if (mdev->state.role == R_PRIMARY ||
1064 		    (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1065 			mdf |= MDF_PRIMARY_IND;
1066 		if (mdev->state.conn > C_WF_REPORT_PARAMS)
1067 			mdf |= MDF_CONNECTED_IND;
1068 		if (mdev->state.disk > D_INCONSISTENT)
1069 			mdf |= MDF_CONSISTENT;
1070 		if (mdev->state.disk > D_OUTDATED)
1071 			mdf |= MDF_WAS_UP_TO_DATE;
1072 		if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1073 			mdf |= MDF_PEER_OUT_DATED;
1074 		if (mdf != mdev->ldev->md.flags) {
1075 			mdev->ldev->md.flags = mdf;
1076 			drbd_md_mark_dirty(mdev);
1077 		}
1078 		if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1079 			drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1080 		put_ldev(mdev);
1081 	}
1082 
1083 	/* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider resyncing */
1084 	if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1085 	    os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1086 		set_bit(CONSIDER_RESYNC, &mdev->flags);
1087 
1088 	/* Receiver should clean itself up */
1089 	if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1090 		drbd_thread_stop_nowait(&mdev->receiver);
1091 
1092 	/* Now that the receiver has finished cleaning up, it should die */
1093 	if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1094 		drbd_thread_stop_nowait(&mdev->receiver);
1095 
1096 	/* Upon network failure, we need to restart the receiver. */
1097 	if (os.conn > C_TEAR_DOWN &&
1098 	    ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1099 		drbd_thread_restart_nowait(&mdev->receiver);
1100 
1101 	ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1102 	if (ascw) {
1103 		ascw->os = os;
1104 		ascw->ns = ns;
1105 		ascw->flags = flags;
1106 		ascw->w.cb = w_after_state_ch;
1107 		ascw->done = done;
1108 		drbd_queue_work(&mdev->data.work, &ascw->w);
1109 	} else {
1110 		dev_warn(DEV, "Could not kmalloc an ascw\n");
1111 	}
1112 
1113 	return rv;
1114 }
1115 
1116 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1117 {
1118 	struct after_state_chg_work *ascw =
1119 		container_of(w, struct after_state_chg_work, w);
1120 	after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1121 	if (ascw->flags & CS_WAIT_COMPLETE) {
1122 		D_ASSERT(ascw->done != NULL);
1123 		complete(ascw->done);
1124 	}
1125 	kfree(ascw);
1126 
1127 	return 1;
1128 }
1129 
1130 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1131 {
1132 	if (rv) {
1133 		dev_err(DEV, "Writing the bitmap failed, not starting resync.\n");
1134 		_drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1135 		return;
1136 	}
1137 
1138 	switch (mdev->state.conn) {
1139 	case C_STARTING_SYNC_T:
1140 		_drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1141 		break;
1142 	case C_STARTING_SYNC_S:
1143 		drbd_start_resync(mdev, C_SYNC_SOURCE);
1144 		break;
1145 	}
1146 }
1147 
1148 /**
1149  * after_state_ch() - Perform after state change actions that may sleep
1150  * @mdev:	DRBD device.
1151  * @os:		old state.
1152  * @ns:		new state.
1153  * @flags:	Flags
1154  */
1155 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1156 			   union drbd_state ns, enum chg_state_flags flags)
1157 {
1158 	enum drbd_fencing_p fp;
1159 
1160 	if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1161 		clear_bit(CRASHED_PRIMARY, &mdev->flags);
1162 		if (mdev->p_uuid)
1163 			mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1164 	}
1165 
1166 	fp = FP_DONT_CARE;
1167 	if (get_ldev(mdev)) {
1168 		fp = mdev->ldev->dc.fencing;
1169 		put_ldev(mdev);
1170 	}
1171 
1172 	/* Inform userspace about the change... */
1173 	drbd_bcast_state(mdev, ns);
1174 
1175 	if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1176 	    (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1177 		drbd_khelper(mdev, "pri-on-incon-degr");
1178 
1179 	/* Here we have the actions that are performed after a
1180 	   state change. This function might sleep */
1181 
1182 	if (fp == FP_STONITH && ns.susp) {
1183 		/* case1: The outdate peer handler is successful:
1184 		 * case2: The connection was established again: */
1185 		if ((os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) ||
1186 		    (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)) {
1187 			tl_clear(mdev);
1188 			spin_lock_irq(&mdev->req_lock);
1189 			_drbd_set_state(_NS(mdev, susp, 0), CS_VERBOSE, NULL);
1190 			spin_unlock_irq(&mdev->req_lock);
1191 		}
1192 	}
1193 	/* Do not change the order of the if above and the two below... */
1194 	if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1195 		drbd_send_uuids(mdev);
1196 		drbd_send_state(mdev);
1197 	}
1198 	if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S)
1199 		drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL, "send_bitmap (WFBitMapS)");
1200 
1201 	/* Lost contact to peer's copy of the data */
1202 	if ((os.pdsk >= D_INCONSISTENT &&
1203 	     os.pdsk != D_UNKNOWN &&
1204 	     os.pdsk != D_OUTDATED)
1205 	&&  (ns.pdsk < D_INCONSISTENT ||
1206 	     ns.pdsk == D_UNKNOWN ||
1207 	     ns.pdsk == D_OUTDATED)) {
1208 		kfree(mdev->p_uuid);
1209 		mdev->p_uuid = NULL;
1210 		if (get_ldev(mdev)) {
1211 			if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1212 			    mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1213 				drbd_uuid_new_current(mdev);
1214 				drbd_send_uuids(mdev);
1215 			}
1216 			put_ldev(mdev);
1217 		}
1218 	}
1219 
1220 	if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1221 		if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0)
1222 			drbd_uuid_new_current(mdev);
1223 
1224 		/* D_DISKLESS Peer becomes secondary */
1225 		if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1226 			drbd_al_to_on_disk_bm(mdev);
1227 		put_ldev(mdev);
1228 	}
1229 
1230 	/* Last part of the attaching process ... */
1231 	if (ns.conn >= C_CONNECTED &&
1232 	    os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1233 		kfree(mdev->p_uuid); /* We expect to receive up-to-date UUIDs soon. */
1234 		mdev->p_uuid = NULL; /* ...to not use the old ones in the mean time */
1235 		drbd_send_sizes(mdev, 0);  /* to start sync... */
1236 		drbd_send_uuids(mdev);
1237 		drbd_send_state(mdev);
1238 	}
1239 
1240 	/* We want to pause/continue resync, tell peer. */
1241 	if (ns.conn >= C_CONNECTED &&
1242 	     ((os.aftr_isp != ns.aftr_isp) ||
1243 	      (os.user_isp != ns.user_isp)))
1244 		drbd_send_state(mdev);
1245 
1246 	/* In case one of the isp bits got set, suspend other devices. */
1247 	if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1248 	    (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1249 		suspend_other_sg(mdev);
1250 
1251 	/* Make sure the peer gets informed about any state
1252 	   changes (ISP bits) that happened while we were in WFReportParams. */
1253 	if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1254 		drbd_send_state(mdev);
1255 
1256 	/* We are in the process of starting a full sync... */
1257 	if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1258 	    (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1259 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, &abw_start_sync, "set_n_write from StartingSync");
1260 
1261 	/* We are invalidating ourselves... */
1262 	if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1263 	    os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1264 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");
1265 
1266 	if (os.disk > D_FAILED && ns.disk == D_FAILED) {
1267 		enum drbd_io_error_p eh;
1268 
1269 		eh = EP_PASS_ON;
1270 		if (get_ldev_if_state(mdev, D_FAILED)) {
1271 			eh = mdev->ldev->dc.on_io_error;
1272 			put_ldev(mdev);
1273 		}
1274 
1275 		drbd_rs_cancel_all(mdev);
1276 		/* since get_ldev() only works as long as disk>=D_INCONSISTENT,
1277 		   and it is D_DISKLESS here, local_cnt can only go down, it can
1278 		   not increase... It will reach zero */
1279 		wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1280 		mdev->rs_total = 0;
1281 		mdev->rs_failed = 0;
1282 		atomic_set(&mdev->rs_pending_cnt, 0);
1283 
1284 		spin_lock_irq(&mdev->req_lock);
1285 		_drbd_set_state(_NS(mdev, disk, D_DISKLESS), CS_HARD, NULL);
1286 		spin_unlock_irq(&mdev->req_lock);
1287 
1288 		if (eh == EP_CALL_HELPER)
1289 			drbd_khelper(mdev, "local-io-error");
1290 	}
1291 
1292 	if (os.disk > D_DISKLESS && ns.disk == D_DISKLESS) {
1293 
1294 		if (os.disk == D_FAILED) /* && ns.disk == D_DISKLESS*/ {
1295 			if (drbd_send_state(mdev))
1296 				dev_warn(DEV, "Notified peer that my disk is broken.\n");
1297 			else
1298 				dev_err(DEV, "Sending state in drbd_io_error() failed\n");
1299 		}
1300 
1301 		lc_destroy(mdev->resync);
1302 		mdev->resync = NULL;
1303 		lc_destroy(mdev->act_log);
1304 		mdev->act_log = NULL;
1305 		__no_warn(local,
1306 			drbd_free_bc(mdev->ldev);
1307 			mdev->ldev = NULL;);
1308 
1309 		if (mdev->md_io_tmpp)
1310 			__free_page(mdev->md_io_tmpp);
1311 	}
1312 
1313 	/* Disks got bigger while they were detached */
1314 	if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1315 	    test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1316 		if (ns.conn == C_CONNECTED)
1317 			resync_after_online_grow(mdev);
1318 	}
1319 
1320 	/* A resync finished or aborted, wake paused devices... */
1321 	if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1322 	    (os.peer_isp && !ns.peer_isp) ||
1323 	    (os.user_isp && !ns.user_isp))
1324 		resume_next_sg(mdev);
1325 
1326 	/* Upon network connection, we need to start the receiver */
1327 	if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1328 		drbd_thread_start(&mdev->receiver);
1329 
1330 	/* Terminate worker thread if we are unconfigured - it will be
1331 	   restarted as needed... */
1332 	if (ns.disk == D_DISKLESS &&
1333 	    ns.conn == C_STANDALONE &&
1334 	    ns.role == R_SECONDARY) {
1335 		if (os.aftr_isp != ns.aftr_isp)
1336 			resume_next_sg(mdev);
1337 		/* set in __drbd_set_state, unless CONFIG_PENDING was set */
1338 		if (test_bit(DEVICE_DYING, &mdev->flags))
1339 			drbd_thread_stop_nowait(&mdev->worker);
1340 	}
1341 
1342 	drbd_md_sync(mdev);
1343 }
1344 
1345 
1346 static int drbd_thread_setup(void *arg)
1347 {
1348 	struct drbd_thread *thi = (struct drbd_thread *) arg;
1349 	struct drbd_conf *mdev = thi->mdev;
1350 	unsigned long flags;
1351 	int retval;
1352 
1353 restart:
1354 	retval = thi->function(thi);
1355 
1356 	spin_lock_irqsave(&thi->t_lock, flags);
1357 
1358 	/* if the receiver has been "Exiting", the last thing it did
1359 	 * was set the conn state to "StandAlone",
1360 	 * if now a re-connect request comes in, the conn state goes to C_UNCONNECTED,
1361 	 * and receiver thread will be "started".
1362 	 * drbd_thread_start needs to set "Restarting" in that case.
1363 	 * t_state check and assignment needs to be within the same spinlock,
1364 	 * so either thread_start sees Exiting, and can remap to Restarting,
1365 	 * or thread_start sees None, and can proceed as normal.
1366 	 */
1367 
1368 	if (thi->t_state == Restarting) {
1369 		dev_info(DEV, "Restarting %s\n", current->comm);
1370 		thi->t_state = Running;
1371 		spin_unlock_irqrestore(&thi->t_lock, flags);
1372 		goto restart;
1373 	}
1374 
1375 	thi->task = NULL;
1376 	thi->t_state = None;
1377 	smp_mb();
1378 	complete(&thi->stop);
1379 	spin_unlock_irqrestore(&thi->t_lock, flags);
1380 
1381 	dev_info(DEV, "Terminating %s\n", current->comm);
1382 
1383 	/* Release mod reference taken when thread was started */
1384 	module_put(THIS_MODULE);
1385 	return retval;
1386 }
1387 
1388 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1389 		      int (*func) (struct drbd_thread *))
1390 {
1391 	spin_lock_init(&thi->t_lock);
1392 	thi->task    = NULL;
1393 	thi->t_state = None;
1394 	thi->function = func;
1395 	thi->mdev = mdev;
1396 }
1397 
1398 int drbd_thread_start(struct drbd_thread *thi)
1399 {
1400 	struct drbd_conf *mdev = thi->mdev;
1401 	struct task_struct *nt;
1402 	unsigned long flags;
1403 
1404 	const char *me =
1405 		thi == &mdev->receiver ? "receiver" :
1406 		thi == &mdev->asender  ? "asender"  :
1407 		thi == &mdev->worker   ? "worker"   : "NONSENSE";
1408 
1409 	/* is used from state engine doing drbd_thread_stop_nowait,
1410 	 * while holding the req lock irqsave */
1411 	spin_lock_irqsave(&thi->t_lock, flags);
1412 
1413 	switch (thi->t_state) {
1414 	case None:
1415 		dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1416 				me, current->comm, current->pid);
1417 
1418 		/* Get ref on module for thread - this is released when thread exits */
1419 		if (!try_module_get(THIS_MODULE)) {
1420 			dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1421 			spin_unlock_irqrestore(&thi->t_lock, flags);
1422 			return FALSE;
1423 		}
1424 
1425 		init_completion(&thi->stop);
1426 		D_ASSERT(thi->task == NULL);
1427 		thi->reset_cpu_mask = 1;
1428 		thi->t_state = Running;
1429 		spin_unlock_irqrestore(&thi->t_lock, flags);
1430 		flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
1431 
1432 		nt = kthread_create(drbd_thread_setup, (void *) thi,
1433 				    "drbd%d_%s", mdev_to_minor(mdev), me);
1434 
1435 		if (IS_ERR(nt)) {
1436 			dev_err(DEV, "Couldn't start thread\n");
1437 
1438 			module_put(THIS_MODULE);
1439 			return FALSE;
1440 		}
1441 		spin_lock_irqsave(&thi->t_lock, flags);
1442 		thi->task = nt;
1443 		thi->t_state = Running;
1444 		spin_unlock_irqrestore(&thi->t_lock, flags);
1445 		wake_up_process(nt);
1446 		break;
1447 	case Exiting:
1448 		thi->t_state = Restarting;
1449 		dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1450 				me, current->comm, current->pid);
1451 		/* fall through */
1452 	case Running:
1453 	case Restarting:
1454 	default:
1455 		spin_unlock_irqrestore(&thi->t_lock, flags);
1456 		break;
1457 	}
1458 
1459 	return TRUE;
1460 }
1461 
1462 
1463 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1464 {
1465 	unsigned long flags;
1466 
1467 	enum drbd_thread_state ns = restart ? Restarting : Exiting;
1468 
1469 	/* may be called from state engine, holding the req lock irqsave */
1470 	spin_lock_irqsave(&thi->t_lock, flags);
1471 
1472 	if (thi->t_state == None) {
1473 		spin_unlock_irqrestore(&thi->t_lock, flags);
1474 		if (restart)
1475 			drbd_thread_start(thi);
1476 		return;
1477 	}
1478 
1479 	if (thi->t_state != ns) {
1480 		if (thi->task == NULL) {
1481 			spin_unlock_irqrestore(&thi->t_lock, flags);
1482 			return;
1483 		}
1484 
1485 		thi->t_state = ns;
1486 		smp_mb();
1487 		init_completion(&thi->stop);
1488 		if (thi->task != current)
1489 			force_sig(DRBD_SIGKILL, thi->task);
1490 
1491 	}
1492 
1493 	spin_unlock_irqrestore(&thi->t_lock, flags);
1494 
1495 	if (wait)
1496 		wait_for_completion(&thi->stop);
1497 }
1498 
1499 #ifdef CONFIG_SMP
1500 /**
1501  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1502  * @mdev:	DRBD device.
1503  *
1504  * Forces all threads of a device onto the same CPU. This is beneficial for
1505  * DRBD's performance. May be overridden by the user's configuration.
1506  */
1507 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1508 {
1509 	int ord, cpu;
1510 
1511 	/* user override. */
1512 	if (cpumask_weight(mdev->cpu_mask))
1513 		return;
1514 
1515 	ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1516 	for_each_online_cpu(cpu) {
1517 		if (ord-- == 0) {
1518 			cpumask_set_cpu(cpu, mdev->cpu_mask);
1519 			return;
1520 		}
1521 	}
1522 	/* should not be reached */
1523 	cpumask_setall(mdev->cpu_mask);
1524 }
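/*
 * Example (editorial note): with two CPUs online, minors 0 and 2 end up
 * pinned to the first online CPU, minors 1 and 3 to the second one
 * (ord == minor modulo the number of online CPUs).
 */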
1525 
1526 /**
1527  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1528  * @mdev:	DRBD device.
1529  *
1530  * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
1531  * prematurely.
1532  */
1533 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1534 {
1535 	struct task_struct *p = current;
1536 	struct drbd_thread *thi =
1537 		p == mdev->asender.task  ? &mdev->asender  :
1538 		p == mdev->receiver.task ? &mdev->receiver :
1539 		p == mdev->worker.task   ? &mdev->worker   :
1540 		NULL;
1541 	ERR_IF(thi == NULL)
1542 		return;
1543 	if (!thi->reset_cpu_mask)
1544 		return;
1545 	thi->reset_cpu_mask = 0;
1546 	set_cpus_allowed_ptr(p, mdev->cpu_mask);
1547 }
1548 #endif
1549 
1550 /* the appropriate socket mutex must be held already */
1551 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1552 			  enum drbd_packets cmd, struct p_header *h,
1553 			  size_t size, unsigned msg_flags)
1554 {
1555 	int sent, ok;
1556 
1557 	ERR_IF(!h) return FALSE;
1558 	ERR_IF(!size) return FALSE;
1559 
1560 	h->magic   = BE_DRBD_MAGIC;
1561 	h->command = cpu_to_be16(cmd);
1562 	h->length  = cpu_to_be16(size-sizeof(struct p_header));
1563 
1564 	sent = drbd_send(mdev, sock, h, size, msg_flags);
1565 
1566 	ok = (sent == size);
1567 	if (!ok)
1568 		dev_err(DEV, "short sent %s size=%d sent=%d\n",
1569 		    cmdname(cmd), (int)size, sent);
1570 	return ok;
1571 }
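/*
 * Worked example (editorial; assumes the usual 8 byte struct p_header of
 * magic, command and length): sending a struct p_state, i.e. the header plus
 * one 32 bit state word, passes size == 12, so h->length becomes
 * cpu_to_be16(4) - the length field counts payload bytes only.
 */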
1572 
1573 /* don't pass the socket. we may only look at it
1574  * when we hold the appropriate socket mutex.
1575  */
1576 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1577 		  enum drbd_packets cmd, struct p_header *h, size_t size)
1578 {
1579 	int ok = 0;
1580 	struct socket *sock;
1581 
1582 	if (use_data_socket) {
1583 		mutex_lock(&mdev->data.mutex);
1584 		sock = mdev->data.socket;
1585 	} else {
1586 		mutex_lock(&mdev->meta.mutex);
1587 		sock = mdev->meta.socket;
1588 	}
1589 
1590 	/* drbd_disconnect() could have called drbd_free_sock()
1591 	 * while we were waiting in down()... */
1592 	if (likely(sock != NULL))
1593 		ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1594 
1595 	if (use_data_socket)
1596 		mutex_unlock(&mdev->data.mutex);
1597 	else
1598 		mutex_unlock(&mdev->meta.mutex);
1599 	return ok;
1600 }
1601 
1602 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
1603 		   size_t size)
1604 {
1605 	struct p_header h;
1606 	int ok;
1607 
1608 	h.magic   = BE_DRBD_MAGIC;
1609 	h.command = cpu_to_be16(cmd);
1610 	h.length  = cpu_to_be16(size);
1611 
1612 	if (!drbd_get_data_sock(mdev))
1613 		return 0;
1614 
1615 	ok = (sizeof(h) ==
1616 		drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
1617 	ok = ok && (size ==
1618 		drbd_send(mdev, mdev->data.socket, data, size, 0));
1619 
1620 	drbd_put_data_sock(mdev);
1621 
1622 	return ok;
1623 }
1624 
1625 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
1626 {
1627 	struct p_rs_param_89 *p;
1628 	struct socket *sock;
1629 	int size, rv;
1630 	const int apv = mdev->agreed_pro_version;
1631 
1632 	size = apv <= 87 ? sizeof(struct p_rs_param)
1633 		: apv == 88 ? sizeof(struct p_rs_param)
1634 			+ strlen(mdev->sync_conf.verify_alg) + 1
1635 		: /* 89 */    sizeof(struct p_rs_param_89);
1636 
1637 	/* used from admin command context and receiver/worker context.
1638 	 * to avoid kmalloc, grab the socket right here,
1639 	 * then use the pre-allocated sbuf there */
1640 	mutex_lock(&mdev->data.mutex);
1641 	sock = mdev->data.socket;
1642 
1643 	if (likely(sock != NULL)) {
1644 		enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
1645 
1646 		p = &mdev->data.sbuf.rs_param_89;
1647 
1648 		/* initialize verify_alg and csums_alg */
1649 		memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
1650 
1651 		p->rate = cpu_to_be32(sc->rate);
1652 
1653 		if (apv >= 88)
1654 			strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
1655 		if (apv >= 89)
1656 			strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
1657 
1658 		rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
1659 	} else
1660 		rv = 0; /* not ok */
1661 
1662 	mutex_unlock(&mdev->data.mutex);
1663 
1664 	return rv;
1665 }
1666 
1667 int drbd_send_protocol(struct drbd_conf *mdev)
1668 {
1669 	struct p_protocol *p;
1670 	int size, rv;
1671 
1672 	size = sizeof(struct p_protocol);
1673 
1674 	if (mdev->agreed_pro_version >= 87)
1675 		size += strlen(mdev->net_conf->integrity_alg) + 1;
1676 
1677 	/* we must not recurse into our own queue,
1678 	 * as that is blocked during handshake */
1679 	p = kmalloc(size, GFP_NOIO);
1680 	if (p == NULL)
1681 		return 0;
1682 
1683 	p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
1684 	p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
1685 	p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
1686 	p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
1687 	p->want_lose     = cpu_to_be32(mdev->net_conf->want_lose);
1688 	p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
1689 
1690 	if (mdev->agreed_pro_version >= 87)
1691 		strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
1692 
1693 	rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
1694 			   (struct p_header *)p, size);
1695 	kfree(p);
1696 	return rv;
1697 }
1698 
1699 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1700 {
1701 	struct p_uuids p;
1702 	int i;
1703 
1704 	if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1705 		return 1;
1706 
1707 	for (i = UI_CURRENT; i < UI_SIZE; i++)
1708 		p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1709 
1710 	mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1711 	p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1712 	uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
1713 	uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1714 	uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1715 	p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1716 
1717 	put_ldev(mdev);
1718 
1719 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
1720 			     (struct p_header *)&p, sizeof(p));
1721 }
1722 
1723 int drbd_send_uuids(struct drbd_conf *mdev)
1724 {
1725 	return _drbd_send_uuids(mdev, 0);
1726 }
1727 
1728 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1729 {
1730 	return _drbd_send_uuids(mdev, 8);
1731 }
1732 
1733 
1734 int drbd_send_sync_uuid(struct drbd_conf *mdev, u64 val)
1735 {
1736 	struct p_rs_uuid p;
1737 
1738 	p.uuid = cpu_to_be64(val);
1739 
1740 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
1741 			     (struct p_header *)&p, sizeof(p));
1742 }
1743 
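/**
 * drbd_send_sizes() - Sends our disk and queue parameters to the peer
 * @mdev:		DRBD device.
 * @trigger_reply:	If set, a current capacity (c_size) of zero is
 *			reported instead of the real one.
 *
 * Transmits the backing device capacity, the user configured disk size,
 * our current capacity, the maximum segment size and the queue order type.
 */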
1744 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply)
1745 {
1746 	struct p_sizes p;
1747 	sector_t d_size, u_size;
1748 	int q_order_type;
1749 	int ok;
1750 
1751 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1752 		D_ASSERT(mdev->ldev->backing_bdev);
1753 		d_size = drbd_get_max_capacity(mdev->ldev);
1754 		u_size = mdev->ldev->dc.disk_size;
1755 		q_order_type = drbd_queue_order_type(mdev);
1756 		p.queue_order_type = cpu_to_be32(drbd_queue_order_type(mdev));
1757 		put_ldev(mdev);
1758 	} else {
1759 		d_size = 0;
1760 		u_size = 0;
1761 		q_order_type = QUEUE_ORDERED_NONE;
1762 	}
1763 
1764 	p.d_size = cpu_to_be64(d_size);
1765 	p.u_size = cpu_to_be64(u_size);
1766 	p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1767 	p.max_segment_size = cpu_to_be32(queue_max_segment_size(mdev->rq_queue));
1768 	p.queue_order_type = cpu_to_be32(q_order_type);
1769 
1770 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
1771 			   (struct p_header *)&p, sizeof(p));
1772 	return ok;
1773 }
1774 
1775 /**
1776  * drbd_send_state() - Sends the drbd state to the peer
1777  * @mdev:	DRBD device.
1778  */
1779 int drbd_send_state(struct drbd_conf *mdev)
1780 {
1781 	struct socket *sock;
1782 	struct p_state p;
1783 	int ok = 0;
1784 
1785 	/* Grab state lock so we won't send state if we're in the middle
1786 	 * of a cluster wide state change on another thread */
1787 	drbd_state_lock(mdev);
1788 
1789 	mutex_lock(&mdev->data.mutex);
1790 
1791 	p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1792 	sock = mdev->data.socket;
1793 
1794 	if (likely(sock != NULL)) {
1795 		ok = _drbd_send_cmd(mdev, sock, P_STATE,
1796 				    (struct p_header *)&p, sizeof(p), 0);
1797 	}
1798 
1799 	mutex_unlock(&mdev->data.mutex);
1800 
1801 	drbd_state_unlock(mdev);
1802 	return ok;
1803 }
1804 
1805 int drbd_send_state_req(struct drbd_conf *mdev,
1806 	union drbd_state mask, union drbd_state val)
1807 {
1808 	struct p_req_state p;
1809 
1810 	p.mask    = cpu_to_be32(mask.i);
1811 	p.val     = cpu_to_be32(val.i);
1812 
1813 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
1814 			     (struct p_header *)&p, sizeof(p));
1815 }
1816 
1817 int drbd_send_sr_reply(struct drbd_conf *mdev, int retcode)
1818 {
1819 	struct p_req_state_reply p;
1820 
1821 	p.retcode    = cpu_to_be32(retcode);
1822 
1823 	return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
1824 			     (struct p_header *)&p, sizeof(p));
1825 }
1826 
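/**
 * fill_bitmap_rle_bits() - RLE+VLI encode a chunk of the bitmap into @p
 * @mdev:	DRBD device.
 * @p:		Compressed bitmap packet to be filled.
 * @c:		Bitmap transfer context; bit/word offsets are advanced.
 *
 * Returns -1 on error, 0 if the plain bitmap should be sent instead
 * (RLE disabled, peer too old, or the chunk did not compress), or the
 * number of code bytes to transmit.
 */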
1827 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1828 	struct p_compressed_bm *p,
1829 	struct bm_xfer_ctx *c)
1830 {
1831 	struct bitstream bs;
1832 	unsigned long plain_bits;
1833 	unsigned long tmp;
1834 	unsigned long rl;
1835 	unsigned len;
1836 	unsigned toggle;
1837 	int bits;
1838 
1839 	/* may we use this feature? */
1840 	if ((mdev->sync_conf.use_rle == 0) ||
1841 		(mdev->agreed_pro_version < 90))
1842 			return 0;
1843 
1844 	if (c->bit_offset >= c->bm_bits)
1845 		return 0; /* nothing to do. */
1846 
1847 	/* use at most this many bytes */
1848 	bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1849 	memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1850 	/* plain bits covered in this code string */
1851 	plain_bits = 0;
1852 
1853 	/* p->encoding & 0x80 stores whether the first run length is set.
1854 	 * bit offset is implicit.
1855 	 * start with toggle == 2 to be able to tell the first iteration */
1856 	toggle = 2;
1857 
1858 	/* see how many plain bits we can stuff into one packet
1859 	 * using RLE and VLI. */
1860 	do {
1861 		tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1862 				    : _drbd_bm_find_next(mdev, c->bit_offset);
1863 		if (tmp == -1UL)
1864 			tmp = c->bm_bits;
1865 		rl = tmp - c->bit_offset;
1866 
1867 		if (toggle == 2) { /* first iteration */
1868 			if (rl == 0) {
1869 				/* the first checked bit was set,
1870 				 * store start value, */
1871 				DCBP_set_start(p, 1);
1872 				/* but skip encoding of zero run length */
1873 				toggle = !toggle;
1874 				continue;
1875 			}
1876 			DCBP_set_start(p, 0);
1877 		}
1878 
1879 		/* paranoia: catch zero runlength.
1880 		 * can only happen if bitmap is modified while we scan it. */
1881 		if (rl == 0) {
1882 			dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1883 			    "t:%u bo:%lu\n", toggle, c->bit_offset);
1884 			return -1;
1885 		}
1886 
1887 		bits = vli_encode_bits(&bs, rl);
1888 		if (bits == -ENOBUFS) /* buffer full */
1889 			break;
1890 		if (bits <= 0) {
1891 			dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1892 			return 0;
1893 		}
1894 
1895 		toggle = !toggle;
1896 		plain_bits += rl;
1897 		c->bit_offset = tmp;
1898 	} while (c->bit_offset < c->bm_bits);
1899 
1900 	len = bs.cur.b - p->code + !!bs.cur.bit;
1901 
1902 	if (plain_bits < (len << 3)) {
1903 		/* incompressible with this method.
1904 		 * we need to rewind both word and bit position. */
1905 		c->bit_offset -= plain_bits;
1906 		bm_xfer_ctx_bit_to_word_offset(c);
1907 		c->bit_offset = c->word_offset * BITS_PER_LONG;
1908 		return 0;
1909 	}
1910 
1911 	/* RLE + VLI was able to compress it just fine.
1912 	 * update c->word_offset. */
1913 	bm_xfer_ctx_bit_to_word_offset(c);
1914 
1915 	/* store pad_bits */
1916 	DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1917 
1918 	return len;
1919 }
1920 
1921 enum { OK, FAILED, DONE }
1922 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
1923 	struct p_header *h, struct bm_xfer_ctx *c)
1924 {
1925 	struct p_compressed_bm *p = (void *)h;
1926 	unsigned long num_words;
1927 	int len;
1928 	int ok;
1929 
1930 	len = fill_bitmap_rle_bits(mdev, p, c);
1931 
1932 	if (len < 0)
1933 		return FAILED;
1934 
1935 	if (len) {
1936 		DCBP_set_code(p, RLE_VLI_Bits);
1937 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
1938 			sizeof(*p) + len, 0);
1939 
1940 		c->packets[0]++;
1941 		c->bytes[0] += sizeof(*p) + len;
1942 
1943 		if (c->bit_offset >= c->bm_bits)
1944 			len = 0; /* DONE */
1945 	} else {
1946 		/* was not compressible.
1947 		 * send a buffer full of plain text bits instead. */
1948 		num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
1949 		len = num_words * sizeof(long);
1950 		if (len)
1951 			drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
1952 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
1953 				   h, sizeof(struct p_header) + len, 0);
1954 		c->word_offset += num_words;
1955 		c->bit_offset = c->word_offset * BITS_PER_LONG;
1956 
1957 		c->packets[1]++;
1958 		c->bytes[1] += sizeof(struct p_header) + len;
1959 
1960 		if (c->bit_offset > c->bm_bits)
1961 			c->bit_offset = c->bm_bits;
1962 	}
1963 	ok = ok ? ((len == 0) ? DONE : OK) : FAILED;
1964 
1965 	if (ok == DONE)
1966 		INFO_bm_xfer_stats(mdev, "send", c);
1967 	return ok;
1968 }
1969 
1970 /* See the comment at receive_bitmap() */
1971 int _drbd_send_bitmap(struct drbd_conf *mdev)
1972 {
1973 	struct bm_xfer_ctx c;
1974 	struct p_header *p;
1975 	int ret;
1976 
1977 	ERR_IF(!mdev->bitmap) return FALSE;
1978 
1979 	/* maybe we should use some per thread scratch page,
1980 	 * and allocate that during initial device creation? */
1981 	p = (struct p_header *) __get_free_page(GFP_NOIO);
1982 	if (!p) {
1983 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
1984 		return FALSE;
1985 	}
1986 
1987 	if (get_ldev(mdev)) {
1988 		if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1989 			dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1990 			drbd_bm_set_all(mdev);
1991 			if (drbd_bm_write(mdev)) {
1992 				/* write_bm did fail! Leave full sync flag set in Meta P_DATA
1993 				 * but otherwise process as per normal - need to tell other
1994 				 * side that a full resync is required! */
1995 				dev_err(DEV, "Failed to write bitmap to disk!\n");
1996 			} else {
1997 				drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1998 				drbd_md_sync(mdev);
1999 			}
2000 		}
2001 		put_ldev(mdev);
2002 	}
2003 
2004 	c = (struct bm_xfer_ctx) {
2005 		.bm_bits = drbd_bm_bits(mdev),
2006 		.bm_words = drbd_bm_words(mdev),
2007 	};
2008 
2009 	do {
2010 		ret = send_bitmap_rle_or_plain(mdev, p, &c);
2011 	} while (ret == OK);
2012 
2013 	free_page((unsigned long) p);
2014 	return (ret == DONE);
2015 }
2016 
2017 int drbd_send_bitmap(struct drbd_conf *mdev)
2018 {
2019 	int err;
2020 
2021 	if (!drbd_get_data_sock(mdev))
2022 		return -1;
2023 	err = !_drbd_send_bitmap(mdev);
2024 	drbd_put_data_sock(mdev);
2025 	return err;
2026 }
2027 
2028 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2029 {
2030 	int ok;
2031 	struct p_barrier_ack p;
2032 
2033 	p.barrier  = barrier_nr;
2034 	p.set_size = cpu_to_be32(set_size);
2035 
2036 	if (mdev->state.conn < C_CONNECTED)
2037 		return FALSE;
2038 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2039 			(struct p_header *)&p, sizeof(p));
2040 	return ok;
2041 }
2042 
2043 /**
2044  * _drbd_send_ack() - Sends an ack packet
2045  * @mdev:	DRBD device.
2046  * @cmd:	Packet command code.
2047  * @sector:	sector, needs to be in big endian byte order
2048  * @blksize:	size in bytes, needs to be in big endian byte order
2049  * @block_id:	Id, big endian byte order
2050  */
2051 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2052 			  u64 sector,
2053 			  u32 blksize,
2054 			  u64 block_id)
2055 {
2056 	int ok;
2057 	struct p_block_ack p;
2058 
2059 	p.sector   = sector;
2060 	p.block_id = block_id;
2061 	p.blksize  = blksize;
2062 	p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2063 
2064 	if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2065 		return FALSE;
2066 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2067 				(struct p_header *)&p, sizeof(p));
2068 	return ok;
2069 }
2070 
2071 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2072 		     struct p_data *dp)
2073 {
2074 	const int header_size = sizeof(struct p_data)
2075 			      - sizeof(struct p_header);
2076 	int data_size  = ((struct p_header *)dp)->length - header_size;
2077 
2078 	return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2079 			      dp->block_id);
2080 }
2081 
2082 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2083 		     struct p_block_req *rp)
2084 {
2085 	return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2086 }
2087 
2088 /**
2089  * drbd_send_ack() - Sends an ack packet
2090  * @mdev:	DRBD device.
2091  * @cmd:	Packet command code.
2092  * @e:		Epoch entry.
2093  */
2094 int drbd_send_ack(struct drbd_conf *mdev,
2095 	enum drbd_packets cmd, struct drbd_epoch_entry *e)
2096 {
2097 	return _drbd_send_ack(mdev, cmd,
2098 			      cpu_to_be64(e->sector),
2099 			      cpu_to_be32(e->size),
2100 			      e->block_id);
2101 }
2102 
2103 /* This function misuses the block_id field to signal if the blocks
2104  * are in sync or not. */
2105 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2106 		     sector_t sector, int blksize, u64 block_id)
2107 {
2108 	return _drbd_send_ack(mdev, cmd,
2109 			      cpu_to_be64(sector),
2110 			      cpu_to_be32(blksize),
2111 			      cpu_to_be64(block_id));
2112 }
2113 
2114 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2115 		       sector_t sector, int size, u64 block_id)
2116 {
2117 	int ok;
2118 	struct p_block_req p;
2119 
2120 	p.sector   = cpu_to_be64(sector);
2121 	p.block_id = block_id;
2122 	p.blksize  = cpu_to_be32(size);
2123 
2124 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2125 				(struct p_header *)&p, sizeof(p));
2126 	return ok;
2127 }
2128 
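/* Like drbd_send_drequest(), but with a digest appended to the request.
 * The header is built by hand because the digest is sent as a separate
 * payload after struct p_block_req, with the data mutex held across
 * both sends. */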
2129 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2130 			    sector_t sector, int size,
2131 			    void *digest, int digest_size,
2132 			    enum drbd_packets cmd)
2133 {
2134 	int ok;
2135 	struct p_block_req p;
2136 
2137 	p.sector   = cpu_to_be64(sector);
2138 	p.block_id = BE_DRBD_MAGIC + 0xbeef;
2139 	p.blksize  = cpu_to_be32(size);
2140 
2141 	p.head.magic   = BE_DRBD_MAGIC;
2142 	p.head.command = cpu_to_be16(cmd);
2143 	p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header) + digest_size);
2144 
2145 	mutex_lock(&mdev->data.mutex);
2146 
2147 	ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2148 	ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2149 
2150 	mutex_unlock(&mdev->data.mutex);
2151 
2152 	return ok;
2153 }
2154 
2155 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2156 {
2157 	int ok;
2158 	struct p_block_req p;
2159 
2160 	p.sector   = cpu_to_be64(sector);
2161 	p.block_id = BE_DRBD_MAGIC + 0xbabe;
2162 	p.blksize  = cpu_to_be32(size);
2163 
2164 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2165 			   (struct p_header *)&p, sizeof(p));
2166 	return ok;
2167 }
2168 
2169 /* called on sndtimeo
2170  * returns FALSE if we should retry,
2171  * TRUE if we think connection is dead
2172  */
2173 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2174 {
2175 	int drop_it;
2176 	/* long elapsed = (long)(jiffies - mdev->last_received); */
2177 
2178 	drop_it =   mdev->meta.socket == sock
2179 		|| !mdev->asender.task
2180 		|| get_t_state(&mdev->asender) != Running
2181 		|| mdev->state.conn < C_CONNECTED;
2182 
2183 	if (drop_it)
2184 		return TRUE;
2185 
2186 	drop_it = !--mdev->ko_count;
2187 	if (!drop_it) {
2188 		dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2189 		       current->comm, current->pid, mdev->ko_count);
2190 		request_ping(mdev);
2191 	}
2192 
2193 	return drop_it; /* && (mdev->state == R_PRIMARY) */;
2194 }
2195 
2196 /* The idea of sendpage seems to be to put some kind of reference
2197  * to the page into the skb, and to hand it over to the NIC. In
2198  * this process get_page() gets called.
2199  *
2200  * As soon as the page was really sent over the network put_page()
2201  * gets called by some part of the network layer. [ NIC driver? ]
2202  *
2203  * [ get_page() / put_page() increment/decrement the count. If count
2204  *   reaches 0 the page will be freed. ]
2205  *
2206  * This works nicely with pages from FSs.
2207  * But this means that in protocol A we might signal IO completion too early!
2208  *
2209  * In order not to corrupt data during a resync we must make sure
2210  * that we do not reuse our own buffer pages (EEs) too early, therefore
2211  * we have the net_ee list.
2212  *
2213  * XFS seems to have problems, still, it submits pages with page_count == 0!
2214  * As a workaround, we disable sendpage on pages
2215  * with page_count == 0 or PageSlab.
2216  */
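/* kmap() based copying fallback for the cases listed above in which
 * sendpage must not be used. */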
2217 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2218 		   int offset, size_t size)
2219 {
2220 	int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, 0);
2221 	kunmap(page);
2222 	if (sent == size)
2223 		mdev->send_cnt += size>>9;
2224 	return sent == size;
2225 }
2226 
2227 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2228 		    int offset, size_t size)
2229 {
2230 	mm_segment_t oldfs = get_fs();
2231 	int sent, ok;
2232 	int len = size;
2233 
2234 	/* e.g. XFS meta- & log-data is in slab pages, which have a
2235 	 * page_count of 0 and/or have PageSlab() set.
2236 	 * we cannot use send_page for those, as that does get_page();
2237 	 * put_page(); and would cause either a VM_BUG directly, or
2238 	 * __page_cache_release a page that would actually still be referenced
2239 	 * by someone, leading to some obscure delayed Oops somewhere else. */
2240 	if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2241 		return _drbd_no_send_page(mdev, page, offset, size);
2242 
2243 	drbd_update_congested(mdev);
2244 	set_fs(KERNEL_DS);
2245 	do {
2246 		sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2247 							offset, len,
2248 							MSG_NOSIGNAL);
2249 		if (sent == -EAGAIN) {
2250 			if (we_should_drop_the_connection(mdev,
2251 							  mdev->data.socket))
2252 				break;
2253 			else
2254 				continue;
2255 		}
2256 		if (sent <= 0) {
2257 			dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2258 			     __func__, (int)size, len, sent);
2259 			break;
2260 		}
2261 		len    -= sent;
2262 		offset += sent;
2263 	} while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2264 	set_fs(oldfs);
2265 	clear_bit(NET_CONGESTED, &mdev->flags);
2266 
2267 	ok = (len == 0);
2268 	if (likely(ok))
2269 		mdev->send_cnt += size>>9;
2270 	return ok;
2271 }
2272 
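/* Send all pages of a bio through the copying path above.
 * drbd_send_dblock() uses this for protocol A, where the request may
 * complete before the data is actually on the wire; the zero copy
 * variant below is used for the other protocols. */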
2273 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2274 {
2275 	struct bio_vec *bvec;
2276 	int i;
2277 	__bio_for_each_segment(bvec, bio, i, 0) {
2278 		if (!_drbd_no_send_page(mdev, bvec->bv_page,
2279 				     bvec->bv_offset, bvec->bv_len))
2280 			return 0;
2281 	}
2282 	return 1;
2283 }
2284 
2285 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2286 {
2287 	struct bio_vec *bvec;
2288 	int i;
2289 	__bio_for_each_segment(bvec, bio, i, 0) {
2290 		if (!_drbd_send_page(mdev, bvec->bv_page,
2291 				     bvec->bv_offset, bvec->bv_len))
2292 			return 0;
2293 	}
2294 
2295 	return 1;
2296 }
2297 
2298 /* Used to send write requests
2299  * R_PRIMARY -> Peer	(P_DATA)
2300  */
2301 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2302 {
2303 	int ok = 1;
2304 	struct p_data p;
2305 	unsigned int dp_flags = 0;
2306 	void *dgb;
2307 	int dgs;
2308 
2309 	if (!drbd_get_data_sock(mdev))
2310 		return 0;
2311 
2312 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2313 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2314 
2315 	p.head.magic   = BE_DRBD_MAGIC;
2316 	p.head.command = cpu_to_be16(P_DATA);
2317 	p.head.length  =
2318 		cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + req->size);
2319 
2320 	p.sector   = cpu_to_be64(req->sector);
2321 	p.block_id = (unsigned long)req;
2322 	p.seq_num  = cpu_to_be32(req->seq_num =
2323 				 atomic_add_return(1, &mdev->packet_seq));
2324 	dp_flags = 0;
2325 
2326 	/* NOTE: no need to check if barriers supported here as we would
2327 	 *       not pass the test in make_request_common in that case
2328 	 */
2329 	if (bio_rw_flagged(req->master_bio, BIO_RW_BARRIER)) {
2330 		dev_err(DEV, "ASSERT FAILED would have set DP_HARDBARRIER\n");
2331 		/* dp_flags |= DP_HARDBARRIER; */
2332 	}
2333 	if (bio_rw_flagged(req->master_bio, BIO_RW_SYNCIO))
2334 		dp_flags |= DP_RW_SYNC;
2335 	/* for now handle SYNCIO and UNPLUG
2336 	 * as if they still were one and the same flag */
2337 	if (bio_rw_flagged(req->master_bio, BIO_RW_UNPLUG))
2338 		dp_flags |= DP_RW_SYNC;
2339 	if (mdev->state.conn >= C_SYNC_SOURCE &&
2340 	    mdev->state.conn <= C_PAUSED_SYNC_T)
2341 		dp_flags |= DP_MAY_SET_IN_SYNC;
2342 
2343 	p.dp_flags = cpu_to_be32(dp_flags);
2344 	set_bit(UNPLUG_REMOTE, &mdev->flags);
2345 	ok = (sizeof(p) ==
2346 		drbd_send(mdev, mdev->data.socket, &p, sizeof(p), MSG_MORE));
2347 	if (ok && dgs) {
2348 		dgb = mdev->int_dig_out;
2349 		drbd_csum(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2350 		ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
2351 	}
2352 	if (ok) {
2353 		if (mdev->net_conf->wire_protocol == DRBD_PROT_A)
2354 			ok = _drbd_send_bio(mdev, req->master_bio);
2355 		else
2356 			ok = _drbd_send_zc_bio(mdev, req->master_bio);
2357 	}
2358 
2359 	drbd_put_data_sock(mdev);
2360 	return ok;
2361 }
2362 
2363 /* answer packet, used to send data back for read requests:
2364  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2365  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2366  */
2367 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2368 		    struct drbd_epoch_entry *e)
2369 {
2370 	int ok;
2371 	struct p_data p;
2372 	void *dgb;
2373 	int dgs;
2374 
2375 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2376 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2377 
2378 	p.head.magic   = BE_DRBD_MAGIC;
2379 	p.head.command = cpu_to_be16(cmd);
2380 	p.head.length  =
2381 		cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + e->size);
2382 
2383 	p.sector   = cpu_to_be64(e->sector);
2384 	p.block_id = e->block_id;
2385 	/* p.seq_num  = 0;    No sequence numbers here.. */
2386 
2387 	/* Only called by our kernel thread.
2388 	 * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2389 	 * in response to admin command or module unload.
2390 	 */
2391 	if (!drbd_get_data_sock(mdev))
2392 		return 0;
2393 
2394 	ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p,
2395 					sizeof(p), MSG_MORE);
2396 	if (ok && dgs) {
2397 		dgb = mdev->int_dig_out;
2398 		drbd_csum(mdev, mdev->integrity_w_tfm, e->private_bio, dgb);
2399 		ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
2400 	}
2401 	if (ok)
2402 		ok = _drbd_send_zc_bio(mdev, e->private_bio);
2403 
2404 	drbd_put_data_sock(mdev);
2405 	return ok;
2406 }
2407 
2408 /*
2409   drbd_send distinguishes two cases:
2410 
2411   Packets sent via the data socket "sock"
2412   and packets sent via the meta data socket "msock"
2413 
2414 		    sock                      msock
2415   -----------------+-------------------------+------------------------------
2416   timeout           conf.timeout / 2          conf.timeout / 2
2417   timeout action    send a ping via msock     Abort communication
2418 					      and close all sockets
2419 */
2420 
2421 /*
2422  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
2423  */
2424 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2425 	      void *buf, size_t size, unsigned msg_flags)
2426 {
2427 	struct kvec iov;
2428 	struct msghdr msg;
2429 	int rv, sent = 0;
2430 
2431 	if (!sock)
2432 		return -1000;
2433 
2434 	/* THINK  if (signal_pending) return ... ? */
2435 
2436 	iov.iov_base = buf;
2437 	iov.iov_len  = size;
2438 
2439 	msg.msg_name       = NULL;
2440 	msg.msg_namelen    = 0;
2441 	msg.msg_control    = NULL;
2442 	msg.msg_controllen = 0;
2443 	msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
2444 
2445 	if (sock == mdev->data.socket) {
2446 		mdev->ko_count = mdev->net_conf->ko_count;
2447 		drbd_update_congested(mdev);
2448 	}
2449 	do {
2450 		/* STRANGE
2451 		 * tcp_sendmsg does _not_ use its size parameter at all ?
2452 		 *
2453 		 * -EAGAIN on timeout, -EINTR on signal.
2454 		 */
2455 /* THINK
2456  * do we need to block DRBD_SIG if sock == &meta.socket ??
2457  * otherwise wake_asender() might interrupt some send_*Ack !
2458  */
2459 		rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
2460 		if (rv == -EAGAIN) {
2461 			if (we_should_drop_the_connection(mdev, sock))
2462 				break;
2463 			else
2464 				continue;
2465 		}
2466 		D_ASSERT(rv != 0);
2467 		if (rv == -EINTR) {
2468 			flush_signals(current);
2469 			rv = 0;
2470 		}
2471 		if (rv < 0)
2472 			break;
2473 		sent += rv;
2474 		iov.iov_base += rv;
2475 		iov.iov_len  -= rv;
2476 	} while (sent < size);
2477 
2478 	if (sock == mdev->data.socket)
2479 		clear_bit(NET_CONGESTED, &mdev->flags);
2480 
2481 	if (rv <= 0) {
2482 		if (rv != -EAGAIN) {
2483 			dev_err(DEV, "%s_sendmsg returned %d\n",
2484 			    sock == mdev->meta.socket ? "msock" : "sock",
2485 			    rv);
2486 			drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
2487 		} else
2488 			drbd_force_state(mdev, NS(conn, C_TIMEOUT));
2489 	}
2490 
2491 	return sent;
2492 }
2493 
2494 static int drbd_open(struct block_device *bdev, fmode_t mode)
2495 {
2496 	struct drbd_conf *mdev = bdev->bd_disk->private_data;
2497 	unsigned long flags;
2498 	int rv = 0;
2499 
2500 	spin_lock_irqsave(&mdev->req_lock, flags);
2501 	/* to have a stable mdev->state.role
2502 	 * and no race with updating open_cnt */
2503 
2504 	if (mdev->state.role != R_PRIMARY) {
2505 		if (mode & FMODE_WRITE)
2506 			rv = -EROFS;
2507 		else if (!allow_oos)
2508 			rv = -EMEDIUMTYPE;
2509 	}
2510 
2511 	if (!rv)
2512 		mdev->open_cnt++;
2513 	spin_unlock_irqrestore(&mdev->req_lock, flags);
2514 
2515 	return rv;
2516 }
2517 
2518 static int drbd_release(struct gendisk *gd, fmode_t mode)
2519 {
2520 	struct drbd_conf *mdev = gd->private_data;
2521 	mdev->open_cnt--;
2522 	return 0;
2523 }
2524 
2525 static void drbd_unplug_fn(struct request_queue *q)
2526 {
2527 	struct drbd_conf *mdev = q->queuedata;
2528 
2529 	/* unplug FIRST */
2530 	spin_lock_irq(q->queue_lock);
2531 	blk_remove_plug(q);
2532 	spin_unlock_irq(q->queue_lock);
2533 
2534 	/* only if connected */
2535 	spin_lock_irq(&mdev->req_lock);
2536 	if (mdev->state.pdsk >= D_INCONSISTENT && mdev->state.conn >= C_CONNECTED) {
2537 		D_ASSERT(mdev->state.role == R_PRIMARY);
2538 		if (test_and_clear_bit(UNPLUG_REMOTE, &mdev->flags)) {
2539 			/* add to the data.work queue,
2540 			 * unless already queued.
2541 			 * XXX this might be a good addition to drbd_queue_work
2542 			 * anyways, to detect "double queuing" ... */
2543 			if (list_empty(&mdev->unplug_work.list))
2544 				drbd_queue_work(&mdev->data.work,
2545 						&mdev->unplug_work);
2546 		}
2547 	}
2548 	spin_unlock_irq(&mdev->req_lock);
2549 
2550 	if (mdev->state.disk >= D_INCONSISTENT)
2551 		drbd_kick_lo(mdev);
2552 }
2553 
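/* Reset the syncer configuration and the device state to their
 * compile time defaults (Secondary / StandAlone / Diskless). */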
2554 static void drbd_set_defaults(struct drbd_conf *mdev)
2555 {
2556 	mdev->sync_conf.after      = DRBD_AFTER_DEF;
2557 	mdev->sync_conf.rate       = DRBD_RATE_DEF;
2558 	mdev->sync_conf.al_extents = DRBD_AL_EXTENTS_DEF;
2559 	mdev->state = (union drbd_state) {
2560 		{ .role = R_SECONDARY,
2561 		  .peer = R_UNKNOWN,
2562 		  .conn = C_STANDALONE,
2563 		  .disk = D_DISKLESS,
2564 		  .pdsk = D_UNKNOWN,
2565 		  .susp = 0
2566 		} };
2567 }
2568 
2569 void drbd_init_set_defaults(struct drbd_conf *mdev)
2570 {
2571 	/* the memset(,0,) did most of this.
2572 	 * note: only assignments, no allocation in here */
2573 
2574 	drbd_set_defaults(mdev);
2575 
2576 	/* for now, we do NOT yet support it,
2577 	 * even though we start some framework
2578 	 * to eventually support barriers */
2579 	set_bit(NO_BARRIER_SUPP, &mdev->flags);
2580 
2581 	atomic_set(&mdev->ap_bio_cnt, 0);
2582 	atomic_set(&mdev->ap_pending_cnt, 0);
2583 	atomic_set(&mdev->rs_pending_cnt, 0);
2584 	atomic_set(&mdev->unacked_cnt, 0);
2585 	atomic_set(&mdev->local_cnt, 0);
2586 	atomic_set(&mdev->net_cnt, 0);
2587 	atomic_set(&mdev->packet_seq, 0);
2588 	atomic_set(&mdev->pp_in_use, 0);
2589 
2590 	mutex_init(&mdev->md_io_mutex);
2591 	mutex_init(&mdev->data.mutex);
2592 	mutex_init(&mdev->meta.mutex);
2593 	sema_init(&mdev->data.work.s, 0);
2594 	sema_init(&mdev->meta.work.s, 0);
2595 	mutex_init(&mdev->state_mutex);
2596 
2597 	spin_lock_init(&mdev->data.work.q_lock);
2598 	spin_lock_init(&mdev->meta.work.q_lock);
2599 
2600 	spin_lock_init(&mdev->al_lock);
2601 	spin_lock_init(&mdev->req_lock);
2602 	spin_lock_init(&mdev->peer_seq_lock);
2603 	spin_lock_init(&mdev->epoch_lock);
2604 
2605 	INIT_LIST_HEAD(&mdev->active_ee);
2606 	INIT_LIST_HEAD(&mdev->sync_ee);
2607 	INIT_LIST_HEAD(&mdev->done_ee);
2608 	INIT_LIST_HEAD(&mdev->read_ee);
2609 	INIT_LIST_HEAD(&mdev->net_ee);
2610 	INIT_LIST_HEAD(&mdev->resync_reads);
2611 	INIT_LIST_HEAD(&mdev->data.work.q);
2612 	INIT_LIST_HEAD(&mdev->meta.work.q);
2613 	INIT_LIST_HEAD(&mdev->resync_work.list);
2614 	INIT_LIST_HEAD(&mdev->unplug_work.list);
2615 	INIT_LIST_HEAD(&mdev->md_sync_work.list);
2616 	INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2617 	mdev->resync_work.cb  = w_resync_inactive;
2618 	mdev->unplug_work.cb  = w_send_write_hint;
2619 	mdev->md_sync_work.cb = w_md_sync;
2620 	mdev->bm_io_work.w.cb = w_bitmap_io;
2621 	init_timer(&mdev->resync_timer);
2622 	init_timer(&mdev->md_sync_timer);
2623 	mdev->resync_timer.function = resync_timer_fn;
2624 	mdev->resync_timer.data = (unsigned long) mdev;
2625 	mdev->md_sync_timer.function = md_sync_timer_fn;
2626 	mdev->md_sync_timer.data = (unsigned long) mdev;
2627 
2628 	init_waitqueue_head(&mdev->misc_wait);
2629 	init_waitqueue_head(&mdev->state_wait);
2630 	init_waitqueue_head(&mdev->ee_wait);
2631 	init_waitqueue_head(&mdev->al_wait);
2632 	init_waitqueue_head(&mdev->seq_wait);
2633 
2634 	drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
2635 	drbd_thread_init(mdev, &mdev->worker, drbd_worker);
2636 	drbd_thread_init(mdev, &mdev->asender, drbd_asender);
2637 
2638 	mdev->agreed_pro_version = PRO_VERSION_MAX;
2639 	mdev->write_ordering = WO_bio_barrier;
2640 	mdev->resync_wenr = LC_FREE;
2641 }
2642 
2643 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2644 {
2645 	if (mdev->receiver.t_state != None)
2646 		dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2647 				mdev->receiver.t_state);
2648 
2649 	/* no need to lock it, I'm the only thread alive */
2650 	if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2651 		dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2652 	mdev->al_writ_cnt  =
2653 	mdev->bm_writ_cnt  =
2654 	mdev->read_cnt     =
2655 	mdev->recv_cnt     =
2656 	mdev->send_cnt     =
2657 	mdev->writ_cnt     =
2658 	mdev->p_size       =
2659 	mdev->rs_start     =
2660 	mdev->rs_total     =
2661 	mdev->rs_failed    =
2662 	mdev->rs_mark_left =
2663 	mdev->rs_mark_time = 0;
2664 	D_ASSERT(mdev->net_conf == NULL);
2665 
2666 	drbd_set_my_capacity(mdev, 0);
2667 	if (mdev->bitmap) {
2668 		/* maybe never allocated. */
2669 		drbd_bm_resize(mdev, 0);
2670 		drbd_bm_cleanup(mdev);
2671 	}
2672 
2673 	drbd_free_resources(mdev);
2674 
2675 	/*
2676 	 * currently we call drbd_init_ee only on module load, so
2677 	 * we may call drbd_release_ee only on module unload!
2678 	 */
2679 	D_ASSERT(list_empty(&mdev->active_ee));
2680 	D_ASSERT(list_empty(&mdev->sync_ee));
2681 	D_ASSERT(list_empty(&mdev->done_ee));
2682 	D_ASSERT(list_empty(&mdev->read_ee));
2683 	D_ASSERT(list_empty(&mdev->net_ee));
2684 	D_ASSERT(list_empty(&mdev->resync_reads));
2685 	D_ASSERT(list_empty(&mdev->data.work.q));
2686 	D_ASSERT(list_empty(&mdev->meta.work.q));
2687 	D_ASSERT(list_empty(&mdev->resync_work.list));
2688 	D_ASSERT(list_empty(&mdev->unplug_work.list));
2689 
2690 }
2691 
2692 
2693 static void drbd_destroy_mempools(void)
2694 {
2695 	struct page *page;
2696 
2697 	while (drbd_pp_pool) {
2698 		page = drbd_pp_pool;
2699 		drbd_pp_pool = (struct page *)page_private(page);
2700 		__free_page(page);
2701 		drbd_pp_vacant--;
2702 	}
2703 
2704 	/* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2705 
2706 	if (drbd_ee_mempool)
2707 		mempool_destroy(drbd_ee_mempool);
2708 	if (drbd_request_mempool)
2709 		mempool_destroy(drbd_request_mempool);
2710 	if (drbd_ee_cache)
2711 		kmem_cache_destroy(drbd_ee_cache);
2712 	if (drbd_request_cache)
2713 		kmem_cache_destroy(drbd_request_cache);
2714 	if (drbd_bm_ext_cache)
2715 		kmem_cache_destroy(drbd_bm_ext_cache);
2716 	if (drbd_al_ext_cache)
2717 		kmem_cache_destroy(drbd_al_ext_cache);
2718 
2719 	drbd_ee_mempool      = NULL;
2720 	drbd_request_mempool = NULL;
2721 	drbd_ee_cache        = NULL;
2722 	drbd_request_cache   = NULL;
2723 	drbd_bm_ext_cache    = NULL;
2724 	drbd_al_ext_cache    = NULL;
2725 
2726 	return;
2727 }
2728 
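/* Allocate the slab caches, the mempools and the page pool.
 * The mempools and the page pool are sized relative to minor_count:
 * (DRBD_MAX_SEGMENT_SIZE / PAGE_SIZE) * minor_count elements each. */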
2729 static int drbd_create_mempools(void)
2730 {
2731 	struct page *page;
2732 	const int number = (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE) * minor_count;
2733 	int i;
2734 
2735 	/* prepare our caches and mempools */
2736 	drbd_request_mempool = NULL;
2737 	drbd_ee_cache        = NULL;
2738 	drbd_request_cache   = NULL;
2739 	drbd_bm_ext_cache    = NULL;
2740 	drbd_al_ext_cache    = NULL;
2741 	drbd_pp_pool         = NULL;
2742 
2743 	/* caches */
2744 	drbd_request_cache = kmem_cache_create(
2745 		"drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2746 	if (drbd_request_cache == NULL)
2747 		goto Enomem;
2748 
2749 	drbd_ee_cache = kmem_cache_create(
2750 		"drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
2751 	if (drbd_ee_cache == NULL)
2752 		goto Enomem;
2753 
2754 	drbd_bm_ext_cache = kmem_cache_create(
2755 		"drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2756 	if (drbd_bm_ext_cache == NULL)
2757 		goto Enomem;
2758 
2759 	drbd_al_ext_cache = kmem_cache_create(
2760 		"drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2761 	if (drbd_al_ext_cache == NULL)
2762 		goto Enomem;
2763 
2764 	/* mempools */
2765 	drbd_request_mempool = mempool_create(number,
2766 		mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2767 	if (drbd_request_mempool == NULL)
2768 		goto Enomem;
2769 
2770 	drbd_ee_mempool = mempool_create(number,
2771 		mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2772 	if (drbd_ee_mempool == NULL)
2773 		goto Enomem;
2774 
2775 	/* drbd's page pool */
2776 	spin_lock_init(&drbd_pp_lock);
2777 
2778 	for (i = 0; i < number; i++) {
2779 		page = alloc_page(GFP_HIGHUSER);
2780 		if (!page)
2781 			goto Enomem;
2782 		set_page_private(page, (unsigned long)drbd_pp_pool);
2783 		drbd_pp_pool = page;
2784 	}
2785 	drbd_pp_vacant = number;
2786 
2787 	return 0;
2788 
2789 Enomem:
2790 	drbd_destroy_mempools(); /* in case we allocated some */
2791 	return -ENOMEM;
2792 }
2793 
2794 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2795 	void *unused)
2796 {
2797 	/* just so we have it.  you never know what interesting things we
2798 	 * might want to do here some day...
2799 	 */
2800 
2801 	return NOTIFY_DONE;
2802 }
2803 
2804 static struct notifier_block drbd_notifier = {
2805 	.notifier_call = drbd_notify_sys,
2806 };
2807 
2808 static void drbd_release_ee_lists(struct drbd_conf *mdev)
2809 {
2810 	int rr;
2811 
2812 	rr = drbd_release_ee(mdev, &mdev->active_ee);
2813 	if (rr)
2814 		dev_err(DEV, "%d EEs in active list found!\n", rr);
2815 
2816 	rr = drbd_release_ee(mdev, &mdev->sync_ee);
2817 	if (rr)
2818 		dev_err(DEV, "%d EEs in sync list found!\n", rr);
2819 
2820 	rr = drbd_release_ee(mdev, &mdev->read_ee);
2821 	if (rr)
2822 		dev_err(DEV, "%d EEs in read list found!\n", rr);
2823 
2824 	rr = drbd_release_ee(mdev, &mdev->done_ee);
2825 	if (rr)
2826 		dev_err(DEV, "%d EEs in done list found!\n", rr);
2827 
2828 	rr = drbd_release_ee(mdev, &mdev->net_ee);
2829 	if (rr)
2830 		dev_err(DEV, "%d EEs in net list found!\n", rr);
2831 }
2832 
2833 /* caution. no locking.
2834  * currently only used from module cleanup code. */
2835 static void drbd_delete_device(unsigned int minor)
2836 {
2837 	struct drbd_conf *mdev = minor_to_mdev(minor);
2838 
2839 	if (!mdev)
2840 		return;
2841 
2842 	/* paranoia asserts */
2843 	if (mdev->open_cnt != 0)
2844 		dev_err(DEV, "open_cnt = %d in %s:%u", mdev->open_cnt,
2845 				__FILE__ , __LINE__);
2846 
2847 	ERR_IF (!list_empty(&mdev->data.work.q)) {
2848 		struct list_head *lp;
2849 		list_for_each(lp, &mdev->data.work.q) {
2850 			dev_err(DEV, "lp = %p\n", lp);
2851 		}
2852 	};
2853 	/* end paranoia asserts */
2854 
2855 	del_gendisk(mdev->vdisk);
2856 
2857 	/* cleanup stuff that may have been allocated during
2858 	 * device (re-)configuration or state changes */
2859 
2860 	if (mdev->this_bdev)
2861 		bdput(mdev->this_bdev);
2862 
2863 	drbd_free_resources(mdev);
2864 
2865 	drbd_release_ee_lists(mdev);
2866 
2867 	/* should be free'd on disconnect? */
2868 	kfree(mdev->ee_hash);
2869 	/*
2870 	mdev->ee_hash_s = 0;
2871 	mdev->ee_hash = NULL;
2872 	*/
2873 
2874 	lc_destroy(mdev->act_log);
2875 	lc_destroy(mdev->resync);
2876 
2877 	kfree(mdev->p_uuid);
2878 	/* mdev->p_uuid = NULL; */
2879 
2880 	kfree(mdev->int_dig_out);
2881 	kfree(mdev->int_dig_in);
2882 	kfree(mdev->int_dig_vv);
2883 
2884 	/* cleanup the rest that has been
2885 	 * allocated from drbd_new_device
2886 	 * and actually free the mdev itself */
2887 	drbd_free_mdev(mdev);
2888 }
2889 
2890 static void drbd_cleanup(void)
2891 {
2892 	unsigned int i;
2893 
2894 	unregister_reboot_notifier(&drbd_notifier);
2895 
2896 	drbd_nl_cleanup();
2897 
2898 	if (minor_table) {
2899 		if (drbd_proc)
2900 			remove_proc_entry("drbd", NULL);
2901 		i = minor_count;
2902 		while (i--)
2903 			drbd_delete_device(i);
2904 		drbd_destroy_mempools();
2905 	}
2906 
2907 	kfree(minor_table);
2908 
2909 	unregister_blkdev(DRBD_MAJOR, "drbd");
2910 
2911 	printk(KERN_INFO "drbd: module cleanup done.\n");
2912 }
2913 
2914 /**
2915  * drbd_congested() - Callback for pdflush
2916  * @congested_data:	User data
2917  * @bdi_bits:		Bits pdflush is currently interested in
2918  *
2919  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2920  */
2921 static int drbd_congested(void *congested_data, int bdi_bits)
2922 {
2923 	struct drbd_conf *mdev = congested_data;
2924 	struct request_queue *q;
2925 	char reason = '-';
2926 	int r = 0;
2927 
2928 	if (!__inc_ap_bio_cond(mdev)) {
2929 		/* DRBD has frozen IO */
2930 		r = bdi_bits;
2931 		reason = 'd';
2932 		goto out;
2933 	}
2934 
2935 	if (get_ldev(mdev)) {
2936 		q = bdev_get_queue(mdev->ldev->backing_bdev);
2937 		r = bdi_congested(&q->backing_dev_info, bdi_bits);
2938 		put_ldev(mdev);
2939 		if (r)
2940 			reason = 'b';
2941 	}
2942 
2943 	if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
2944 		r |= (1 << BDI_async_congested);
2945 		reason = reason == 'b' ? 'a' : 'n';
2946 	}
2947 
2948 out:
2949 	mdev->congestion_reason = reason;
2950 	return r;
2951 }
2952 
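/**
 * drbd_new_device() - Allocate and initialize one drbd minor device
 * @minor:	Device minor number.
 *
 * Sets up the drbd_conf structure, the request queue, the gendisk, the
 * meta data IO page, the bitmap, the transfer log and the initial epoch.
 * Returns NULL if any allocation fails; partial allocations are undone.
 */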
2953 struct drbd_conf *drbd_new_device(unsigned int minor)
2954 {
2955 	struct drbd_conf *mdev;
2956 	struct gendisk *disk;
2957 	struct request_queue *q;
2958 
2959 	/* GFP_KERNEL, we are outside of all write-out paths */
2960 	mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2961 	if (!mdev)
2962 		return NULL;
2963 	if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
2964 		goto out_no_cpumask;
2965 
2966 	mdev->minor = minor;
2967 
2968 	drbd_init_set_defaults(mdev);
2969 
2970 	q = blk_alloc_queue(GFP_KERNEL);
2971 	if (!q)
2972 		goto out_no_q;
2973 	mdev->rq_queue = q;
2974 	q->queuedata   = mdev;
2975 	blk_queue_max_segment_size(q, DRBD_MAX_SEGMENT_SIZE);
2976 
2977 	disk = alloc_disk(1);
2978 	if (!disk)
2979 		goto out_no_disk;
2980 	mdev->vdisk = disk;
2981 
2982 	set_disk_ro(disk, TRUE);
2983 
2984 	disk->queue = q;
2985 	disk->major = DRBD_MAJOR;
2986 	disk->first_minor = minor;
2987 	disk->fops = &drbd_ops;
2988 	sprintf(disk->disk_name, "drbd%d", minor);
2989 	disk->private_data = mdev;
2990 
2991 	mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2992 	/* we have no partitions. we contain only ourselves. */
2993 	mdev->this_bdev->bd_contains = mdev->this_bdev;
2994 
2995 	q->backing_dev_info.congested_fn = drbd_congested;
2996 	q->backing_dev_info.congested_data = mdev;
2997 
2998 	blk_queue_make_request(q, drbd_make_request_26);
2999 	blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3000 	blk_queue_merge_bvec(q, drbd_merge_bvec);
3001 	q->queue_lock = &mdev->req_lock; /* needed since we use */
3002 		/* plugging on a queue, that actually has no requests! */
3003 	q->unplug_fn = drbd_unplug_fn;
3004 
3005 	mdev->md_io_page = alloc_page(GFP_KERNEL);
3006 	if (!mdev->md_io_page)
3007 		goto out_no_io_page;
3008 
3009 	if (drbd_bm_init(mdev))
3010 		goto out_no_bitmap;
3011 	/* no need to lock access, we are still initializing this minor device. */
3012 	if (!tl_init(mdev))
3013 		goto out_no_tl;
3014 
3015 	mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3016 	if (!mdev->app_reads_hash)
3017 		goto out_no_app_reads;
3018 
3019 	mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3020 	if (!mdev->current_epoch)
3021 		goto out_no_epoch;
3022 
3023 	INIT_LIST_HEAD(&mdev->current_epoch->list);
3024 	mdev->epochs = 1;
3025 
3026 	return mdev;
3027 
3028 /* out_whatever_else:
3029 	kfree(mdev->current_epoch); */
3030 out_no_epoch:
3031 	kfree(mdev->app_reads_hash);
3032 out_no_app_reads:
3033 	tl_cleanup(mdev);
3034 out_no_tl:
3035 	drbd_bm_cleanup(mdev);
3036 out_no_bitmap:
3037 	__free_page(mdev->md_io_page);
3038 out_no_io_page:
3039 	put_disk(disk);
3040 out_no_disk:
3041 	blk_cleanup_queue(q);
3042 out_no_q:
3043 	free_cpumask_var(mdev->cpu_mask);
3044 out_no_cpumask:
3045 	kfree(mdev);
3046 	return NULL;
3047 }
3048 
3049 /* counterpart of drbd_new_device.
3050  * last part of drbd_delete_device. */
3051 void drbd_free_mdev(struct drbd_conf *mdev)
3052 {
3053 	kfree(mdev->current_epoch);
3054 	kfree(mdev->app_reads_hash);
3055 	tl_cleanup(mdev);
3056 	if (mdev->bitmap) /* should no longer be there. */
3057 		drbd_bm_cleanup(mdev);
3058 	__free_page(mdev->md_io_page);
3059 	put_disk(mdev->vdisk);
3060 	blk_cleanup_queue(mdev->rq_queue);
3061 	free_cpumask_var(mdev->cpu_mask);
3062 	kfree(mdev);
3063 }
3064 
3065 
3066 int __init drbd_init(void)
3067 {
3068 	int err;
3069 
3070 	if (sizeof(struct p_handshake) != 80) {
3071 		printk(KERN_ERR
3072 		       "drbd: never change the size or layout "
3073 		       "of the HandShake packet.\n");
3074 		return -EINVAL;
3075 	}
3076 
3077 	if (1 > minor_count || minor_count > 255) {
3078 		printk(KERN_ERR
3079 			"drbd: invalid minor_count (%d)\n", minor_count);
3080 #ifdef MODULE
3081 		return -EINVAL;
3082 #else
3083 		minor_count = 8;
3084 #endif
3085 	}
3086 
3087 	err = drbd_nl_init();
3088 	if (err)
3089 		return err;
3090 
3091 	err = register_blkdev(DRBD_MAJOR, "drbd");
3092 	if (err) {
3093 		printk(KERN_ERR
3094 		       "drbd: unable to register block device major %d\n",
3095 		       DRBD_MAJOR);
3096 		return err;
3097 	}
3098 
3099 	register_reboot_notifier(&drbd_notifier);
3100 
3101 	/*
3102 	 * allocate all necessary structs
3103 	 */
3104 	err = -ENOMEM;
3105 
3106 	init_waitqueue_head(&drbd_pp_wait);
3107 
3108 	drbd_proc = NULL; /* play safe for drbd_cleanup */
3109 	minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3110 				GFP_KERNEL);
3111 	if (!minor_table)
3112 		goto Enomem;
3113 
3114 	err = drbd_create_mempools();
3115 	if (err)
3116 		goto Enomem;
3117 
3118 	drbd_proc = proc_create("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops);
3119 	if (!drbd_proc)	{
3120 		printk(KERN_ERR "drbd: unable to register proc file\n");
3121 		goto Enomem;
3122 	}
3123 
3124 	rwlock_init(&global_state_lock);
3125 
3126 	printk(KERN_INFO "drbd: initialized. "
3127 	       "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3128 	       API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3129 	printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3130 	printk(KERN_INFO "drbd: registered as block device major %d\n",
3131 		DRBD_MAJOR);
3132 	printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3133 
3134 	return 0; /* Success! */
3135 
3136 Enomem:
3137 	drbd_cleanup();
3138 	if (err == -ENOMEM)
3139 		/* currently always the case */
3140 		printk(KERN_ERR "drbd: ran out of memory\n");
3141 	else
3142 		printk(KERN_ERR "drbd: initialization failure\n");
3143 	return err;
3144 }
3145 
3146 void drbd_free_bc(struct drbd_backing_dev *ldev)
3147 {
3148 	if (ldev == NULL)
3149 		return;
3150 
3151 	bd_release(ldev->backing_bdev);
3152 	bd_release(ldev->md_bdev);
3153 
3154 	fput(ldev->lo_file);
3155 	fput(ldev->md_file);
3156 
3157 	kfree(ldev);
3158 }
3159 
3160 void drbd_free_sock(struct drbd_conf *mdev)
3161 {
3162 	if (mdev->data.socket) {
3163 		kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3164 		sock_release(mdev->data.socket);
3165 		mdev->data.socket = NULL;
3166 	}
3167 	if (mdev->meta.socket) {
3168 		kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3169 		sock_release(mdev->meta.socket);
3170 		mdev->meta.socket = NULL;
3171 	}
3172 }
3173 
3174 
3175 void drbd_free_resources(struct drbd_conf *mdev)
3176 {
3177 	crypto_free_hash(mdev->csums_tfm);
3178 	mdev->csums_tfm = NULL;
3179 	crypto_free_hash(mdev->verify_tfm);
3180 	mdev->verify_tfm = NULL;
3181 	crypto_free_hash(mdev->cram_hmac_tfm);
3182 	mdev->cram_hmac_tfm = NULL;
3183 	crypto_free_hash(mdev->integrity_w_tfm);
3184 	mdev->integrity_w_tfm = NULL;
3185 	crypto_free_hash(mdev->integrity_r_tfm);
3186 	mdev->integrity_r_tfm = NULL;
3187 
3188 	drbd_free_sock(mdev);
3189 
3190 	__no_warn(local,
3191 		  drbd_free_bc(mdev->ldev);
3192 		  mdev->ldev = NULL;);
3193 }
3194 
3195 /* meta data management */
3196 
3197 struct meta_data_on_disk {
3198 	u64 la_size;           /* last agreed size. */
3199 	u64 uuid[UI_SIZE];   /* UUIDs. */
3200 	u64 device_uuid;
3201 	u64 reserved_u64_1;
3202 	u32 flags;             /* MDF */
3203 	u32 magic;
3204 	u32 md_size_sect;
3205 	u32 al_offset;         /* offset to this block */
3206 	u32 al_nr_extents;     /* important for restoring the AL */
3207 	      /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3208 	u32 bm_offset;         /* offset to the bitmap, from here */
3209 	u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3210 	u32 reserved_u32[4];
3211 
3212 } __packed;
3213 
3214 /**
3215  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3216  * @mdev:	DRBD device.
3217  */
3218 void drbd_md_sync(struct drbd_conf *mdev)
3219 {
3220 	struct meta_data_on_disk *buffer;
3221 	sector_t sector;
3222 	int i;
3223 
3224 	if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3225 		return;
3226 	del_timer(&mdev->md_sync_timer);
3227 
3228 	/* We use here D_FAILED and not D_ATTACHING because we try to write
3229 	 * metadata even if we detach due to a disk failure! */
3230 	if (!get_ldev_if_state(mdev, D_FAILED))
3231 		return;
3232 
3233 	mutex_lock(&mdev->md_io_mutex);
3234 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3235 	memset(buffer, 0, 512);
3236 
3237 	buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3238 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3239 		buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3240 	buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3241 	buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3242 
3243 	buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3244 	buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3245 	buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3246 	buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3247 	buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3248 
3249 	buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3250 
3251 	D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3252 	sector = mdev->ldev->md.md_offset;
3253 
3254 	if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3255 		clear_bit(MD_DIRTY, &mdev->flags);
3256 	} else {
3257 		/* this was a try anyways ... */
3258 		dev_err(DEV, "meta data update failed!\n");
3259 
3260 		drbd_chk_io_error(mdev, 1, TRUE);
3261 	}
3262 
3263 	/* Update mdev->ldev->md.la_size_sect,
3264 	 * since we updated it on metadata. */
3265 	mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3266 
3267 	mutex_unlock(&mdev->md_io_mutex);
3268 	put_ldev(mdev);
3269 }
3270 
3271 /**
3272  * drbd_md_read() - Reads in the meta data super block
3273  * @mdev:	DRBD device.
3274  * @bdev:	Device from which the meta data should be read in.
3275  *
3276  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_codes in case
3277  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3278  */
3279 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3280 {
3281 	struct meta_data_on_disk *buffer;
3282 	int i, rv = NO_ERROR;
3283 
3284 	if (!get_ldev_if_state(mdev, D_ATTACHING))
3285 		return ERR_IO_MD_DISK;
3286 
3287 	mutex_lock(&mdev->md_io_mutex);
3288 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3289 
3290 	if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3291 		/* NOTE: can't do normal error processing here as this is
3292 		   called BEFORE disk is attached */
3293 		dev_err(DEV, "Error while reading metadata.\n");
3294 		rv = ERR_IO_MD_DISK;
3295 		goto err;
3296 	}
3297 
3298 	if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3299 		dev_err(DEV, "Error while reading metadata, magic not found.\n");
3300 		rv = ERR_MD_INVALID;
3301 		goto err;
3302 	}
3303 	if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3304 		dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3305 		    be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3306 		rv = ERR_MD_INVALID;
3307 		goto err;
3308 	}
3309 	if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3310 		dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3311 		    be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3312 		rv = ERR_MD_INVALID;
3313 		goto err;
3314 	}
3315 	if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3316 		dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3317 		    be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3318 		rv = ERR_MD_INVALID;
3319 		goto err;
3320 	}
3321 
3322 	if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3323 		dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3324 		    be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3325 		rv = ERR_MD_INVALID;
3326 		goto err;
3327 	}
3328 
3329 	bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3330 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3331 		bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3332 	bdev->md.flags = be32_to_cpu(buffer->flags);
3333 	mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3334 	bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3335 
3336 	if (mdev->sync_conf.al_extents < 7)
3337 		mdev->sync_conf.al_extents = 127;
3338 
3339  err:
3340 	mutex_unlock(&mdev->md_io_mutex);
3341 	put_ldev(mdev);
3342 
3343 	return rv;
3344 }
3345 
3346 /**
3347  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3348  * @mdev:	DRBD device.
3349  *
3350  * Call this function if you change anything that should be written to
3351  * the meta-data super block. This function sets MD_DIRTY, and starts a
3352  * the meta-data super block. This function sets MD_DIRTY and starts a
3353  * timer that makes sure drbd_md_sync() gets called within five seconds.
3354 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3355 {
3356 	set_bit(MD_DIRTY, &mdev->flags);
3357 	mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3358 }
3359 
3360 
3361 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3362 {
3363 	int i;
3364 
3365 	for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3366 		mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3367 }
3368 
3369 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3370 {
3371 	if (idx == UI_CURRENT) {
3372 		if (mdev->state.role == R_PRIMARY)
3373 			val |= 1;
3374 		else
3375 			val &= ~((u64)1);
3376 
3377 		drbd_set_ed_uuid(mdev, val);
3378 	}
3379 
3380 	mdev->ldev->md.uuid[idx] = val;
3381 	drbd_md_mark_dirty(mdev);
3382 }
3383 
3384 
3385 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3386 {
3387 	if (mdev->ldev->md.uuid[idx]) {
3388 		drbd_uuid_move_history(mdev);
3389 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3390 	}
3391 	_drbd_uuid_set(mdev, idx, val);
3392 }
3393 
3394 /**
3395  * drbd_uuid_new_current() - Creates a new current UUID
3396  * @mdev:	DRBD device.
3397  *
3398  * Creates a new current UUID, and rotates the old current UUID into
3399  * the bitmap slot. Causes an incremental resync upon next connect.
3400  */
3401 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3402 {
3403 	u64 val;
3404 
3405 	dev_info(DEV, "Creating new current UUID\n");
3406 	D_ASSERT(mdev->ldev->md.uuid[UI_BITMAP] == 0);
3407 	mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3408 
3409 	get_random_bytes(&val, sizeof(u64));
3410 	_drbd_uuid_set(mdev, UI_CURRENT, val);
3411 }
3412 
3413 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3414 {
3415 	if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3416 		return;
3417 
3418 	if (val == 0) {
3419 		drbd_uuid_move_history(mdev);
3420 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3421 		mdev->ldev->md.uuid[UI_BITMAP] = 0;
3422 	} else {
3423 		if (mdev->ldev->md.uuid[UI_BITMAP])
3424 			dev_warn(DEV, "bm UUID already set");
3425 
3426 		mdev->ldev->md.uuid[UI_BITMAP] = val;
3427 		mdev->ldev->md.uuid[UI_BITMAP] &= ~((u64)1);
3428 
3429 	}
3430 	drbd_md_mark_dirty(mdev);
3431 }
3432 
3433 /**
3434  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3435  * @mdev:	DRBD device.
3436  *
3437  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3438  */
3439 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3440 {
3441 	int rv = -EIO;
3442 
3443 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
3444 		drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3445 		drbd_md_sync(mdev);
3446 		drbd_bm_set_all(mdev);
3447 
3448 		rv = drbd_bm_write(mdev);
3449 
3450 		if (!rv) {
3451 			drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3452 			drbd_md_sync(mdev);
3453 		}
3454 
3455 		put_ldev(mdev);
3456 	}
3457 
3458 	return rv;
3459 }
3460 
3461 /**
3462  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3463  * @mdev:	DRBD device.
3464  *
3465  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3466  */
3467 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3468 {
3469 	int rv = -EIO;
3470 
3471 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
3472 		drbd_bm_clear_all(mdev);
3473 		rv = drbd_bm_write(mdev);
3474 		put_ldev(mdev);
3475 	}
3476 
3477 	return rv;
3478 }
3479 
3480 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3481 {
3482 	struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3483 	int rv;
3484 
3485 	D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3486 
3487 	drbd_bm_lock(mdev, work->why);
3488 	rv = work->io_fn(mdev);
3489 	drbd_bm_unlock(mdev);
3490 
3491 	clear_bit(BITMAP_IO, &mdev->flags);
3492 	wake_up(&mdev->misc_wait);
3493 
3494 	if (work->done)
3495 		work->done(mdev, rv);
3496 
3497 	clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3498 	work->why = NULL;
3499 
3500 	return 1;
3501 }
3502 
3503 /**
3504  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3505  * @mdev:	DRBD device.
3506  * @io_fn:	IO callback to be called when bitmap IO is possible
3507  * @done:	callback to be called after the bitmap IO was performed
3508  * @why:	Descriptive text of the reason for doing the IO
3509  *
3510  * While IO on the bitmap happens we freeze application IO, thus ensuring
3511  * that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
3512  * called from worker context. It MUST NOT be used while a previous such
3513  * work is still pending!
3514  */
3515 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3516 			  int (*io_fn)(struct drbd_conf *),
3517 			  void (*done)(struct drbd_conf *, int),
3518 			  char *why)
3519 {
3520 	D_ASSERT(current == mdev->worker.task);
3521 
3522 	D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3523 	D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3524 	D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3525 	if (mdev->bm_io_work.why)
3526 		dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3527 			why, mdev->bm_io_work.why);
3528 
3529 	mdev->bm_io_work.io_fn = io_fn;
3530 	mdev->bm_io_work.done = done;
3531 	mdev->bm_io_work.why = why;
3532 
3533 	set_bit(BITMAP_IO, &mdev->flags);
3534 	if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3535 		if (list_empty(&mdev->bm_io_work.w.list)) {
3536 			set_bit(BITMAP_IO_QUEUED, &mdev->flags);
3537 			drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
3538 		} else
3539 			dev_err(DEV, "FIXME avoided double queuing bm_io_work\n");
3540 	}
3541 }
3542 
3543 /**
3544  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3545  * @mdev:	DRBD device.
3546  * @io_fn:	IO callback to be called when bitmap IO is possible
3547  * @why:	Descriptive text of the reason for doing the IO
3548  *
3549  * freezes application IO while the actual IO operation runs. This
3550  * function MAY NOT be called from worker context.
3551  */
3552 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why)
3553 {
3554 	int rv;
3555 
3556 	D_ASSERT(current != mdev->worker.task);
3557 
3558 	drbd_suspend_io(mdev);
3559 
3560 	drbd_bm_lock(mdev, why);
3561 	rv = io_fn(mdev);
3562 	drbd_bm_unlock(mdev);
3563 
3564 	drbd_resume_io(mdev);
3565 
3566 	return rv;
3567 }
3568 
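/*
 * Illustrative sketch, not part of the original source: from outside the
 * worker (e.g. while handling a configuration request) the synchronous
 * variant above can be called directly; it suspends and resumes application
 * IO around the callback itself.  The reason string is hypothetical.
 *
 *	int rv = drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
 *				"example: clear_n_write");
 *	if (rv)
 *		dev_err(DEV, "writing out cleared bitmap failed: %d\n", rv);
 */
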
3569 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3570 {
3571 	if ((mdev->ldev->md.flags & flag) != flag) {
3572 		drbd_md_mark_dirty(mdev);
3573 		mdev->ldev->md.flags |= flag;
3574 	}
3575 }
3576 
3577 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3578 {
3579 	if ((mdev->ldev->md.flags & flag) != 0) {
3580 		drbd_md_mark_dirty(mdev);
3581 		mdev->ldev->md.flags &= ~flag;
3582 	}
3583 }

3584 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3585 {
3586 	return (bdev->md.flags & flag) != 0;
3587 }
3588 
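/*
 * Illustrative sketch, not part of the original source: the MDF_* flags are
 * part of the on-disk meta-data, so callers of drbd_md_set_flag() and
 * drbd_md_clear_flag() must hold a local-disk reference (__must_hold(local)),
 * and a change only reaches stable storage once drbd_md_sync() runs, as in
 * the MDF_FULL_SYNC handling further up in this file.
 *
 *	if (get_ldev_if_state(mdev, D_ATTACHING)) {
 *		if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
 *			// ... react to the pending full sync request ...
 *			drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
 *			drbd_md_sync(mdev);
 *		}
 *		put_ldev(mdev);
 *	}
 */
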
3589 static void md_sync_timer_fn(unsigned long data)
3590 {
3591 	struct drbd_conf *mdev = (struct drbd_conf *) data;
3592 
3593 	drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
3594 }
3595 
3596 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3597 {
3598 	dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3599 	drbd_md_sync(mdev);
3600 
3601 	return 1;
3602 }
3603 
3604 #ifdef CONFIG_DRBD_FAULT_INJECTION
3605 /* Fault insertion support including random number generator shamelessly
3606  * stolen from kernel/rcutorture.c */
3607 struct fault_random_state {
3608 	unsigned long state;
3609 	unsigned long count;
3610 };
3611 
3612 #define FAULT_RANDOM_MULT 39916801  /* prime */
3613 #define FAULT_RANDOM_ADD	479001701 /* prime */
3614 #define FAULT_RANDOM_REFRESH 10000
3615 
3616 /*
3617  * Crude but fast random-number generator.  Uses a linear congruential
3618  * generator, with occasional help from get_random_bytes().
3619  */
3620 static unsigned long
3621 _drbd_fault_random(struct fault_random_state *rsp)
3622 {
3623 	long refresh;
3624 
3625 	if (!rsp->count--) {
3626 		get_random_bytes(&refresh, sizeof(refresh));
3627 		rsp->state += refresh;
3628 		rsp->count = FAULT_RANDOM_REFRESH;
3629 	}
3630 	rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3631 	return swahw32(rsp->state);
3632 }
3633 
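/*
 * Editorial note, not part of the original source: the generator above is
 * the plain linear congruential recurrence
 *
 *	state = state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD
 *
 * reseeded from get_random_bytes() roughly every FAULT_RANDOM_REFRESH calls.
 * The final swahw32() swaps the two 16-bit halves of the low 32 bits, so the
 * "% 100" in _drbd_insert_fault() below consumes the better-mixed upper bits
 * of the recurrence rather than its low bits.
 */
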
3634 static char *
3635 _drbd_fault_str(unsigned int type)
{
3636 	static char *_faults[] = {
3637 		[DRBD_FAULT_MD_WR] = "Meta-data write",
3638 		[DRBD_FAULT_MD_RD] = "Meta-data read",
3639 		[DRBD_FAULT_RS_WR] = "Resync write",
3640 		[DRBD_FAULT_RS_RD] = "Resync read",
3641 		[DRBD_FAULT_DT_WR] = "Data write",
3642 		[DRBD_FAULT_DT_RD] = "Data read",
3643 		[DRBD_FAULT_DT_RA] = "Data read ahead",
3644 		[DRBD_FAULT_BM_ALLOC] = "BM allocation",
3645 		[DRBD_FAULT_AL_EE] = "EE allocation"
3646 	};
3647 
3648 	return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3649 }
3650 
3651 unsigned int
3652 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3653 {
3654 	static struct fault_random_state rrs = {0, 0};
3655 
3656 	unsigned int ret = (
3657 		(fault_devs == 0 ||
3658 			((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3659 		(((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3660 
3661 	if (ret) {
3662 		fault_count++;
3663 
3664 		if (printk_ratelimit())
3665 			dev_warn(DEV, "***Simulating %s failure\n",
3666 				_drbd_fault_str(type));
3667 	}
3668 
3669 	return ret;
3670 }
3671 #endif
3672 
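/*
 * Illustrative sketch, not part of the original source: IO submission paths
 * typically ask the fault injection code whether to fail an operation of a
 * given type before issuing it, usually through a small wrapper in
 * drbd_int.h that compiles away when CONFIG_DRBD_FAULT_INJECTION is off.
 * The error handling below is hypothetical.
 *
 *	if (_drbd_insert_fault(mdev, DRBD_FAULT_MD_WR))
 *		bio_endio(bio, -EIO);	// pretend the meta-data write failed
 *	else
 *		submit_bio(rw, bio);
 */
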
3673 const char *drbd_buildtag(void)
3674 {
3675 	/* DRBD built from external sources has a reference here to the
3676 	   git hash of the source code. */
3677 
3678 	static char buildtag[38] = "\0uilt-in";
3679 
3680 	if (buildtag[0] == 0) {
3681 #ifdef CONFIG_MODULES
3682 		if (THIS_MODULE != NULL)
3683 			sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3684 		else
3685 #endif
3686 			buildtag[0] = 'b';
3687 	}
3688 
3689 	return buildtag;
3690 }
3691 
3692 module_init(drbd_init)
3693 module_exit(drbd_cleanup)
3694 
3695 EXPORT_SYMBOL(drbd_conn_str);
3696 EXPORT_SYMBOL(drbd_role_str);
3697 EXPORT_SYMBOL(drbd_disk_str);
3698 EXPORT_SYMBOL(drbd_set_st_err_str);
3699