xref: /openbmc/linux/drivers/block/drbd/drbd_main.c (revision b6dcefde)
1 /*
2    drbd.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12 
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17 
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22 
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 
27  */
28 
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/smp_lock.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48 
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52 
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56 
57 #include "drbd_vli.h"
58 
59 struct after_state_chg_work {
60 	struct drbd_work w;
61 	union drbd_state os;
62 	union drbd_state ns;
63 	enum chg_state_flags flags;
64 	struct completion *done;
65 };
66 
67 int drbdd_init(struct drbd_thread *);
68 int drbd_worker(struct drbd_thread *);
69 int drbd_asender(struct drbd_thread *);
70 
71 int drbd_init(void);
72 static int drbd_open(struct block_device *bdev, fmode_t mode);
73 static int drbd_release(struct gendisk *gd, fmode_t mode);
74 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
75 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
76 			   union drbd_state ns, enum chg_state_flags flags);
77 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
78 static void md_sync_timer_fn(unsigned long data);
79 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
80 
81 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
82 	      "Lars Ellenberg <lars@linbit.com>");
83 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
84 MODULE_VERSION(REL_VERSION);
85 MODULE_LICENSE("GPL");
86 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices (1-255)");
87 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
88 
89 #include <linux/moduleparam.h>
90 /* allow_open_on_secondary */
91 MODULE_PARM_DESC(allow_oos, "DONT USE!");
92 /* thanks to these macros, if compiled into the kernel (not as a module),
93  * this becomes the boot parameter drbd.minor_count */
94 module_param(minor_count, uint, 0444);
95 module_param(disable_sendpage, bool, 0644);
96 module_param(allow_oos, bool, 0);
97 module_param(cn_idx, uint, 0444);
98 module_param(proc_details, int, 0644);
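
/* Example (illustrative): when built as a module, parameters are passed as
 * "modprobe drbd minor_count=64 usermode_helper=/sbin/drbdadm"; when built
 * into the kernel, they become boot parameters such as "drbd.minor_count=64". */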
99 
100 #ifdef CONFIG_DRBD_FAULT_INJECTION
101 int enable_faults;
102 int fault_rate;
103 static int fault_count;
104 int fault_devs;
105 /* bitmap of enabled faults */
106 module_param(enable_faults, int, 0664);
107 /* fault rate % value - applies to all enabled faults */
108 module_param(fault_rate, int, 0664);
109 /* count of faults inserted */
110 module_param(fault_count, int, 0664);
111 /* bitmap of devices to insert faults on */
112 module_param(fault_devs, int, 0644);
113 #endif
114 
115 /* module parameters, defined here */
116 unsigned int minor_count = 32;
117 int disable_sendpage;
118 int allow_oos;
119 unsigned int cn_idx = CN_IDX_DRBD;
120 int proc_details;       /* Detail level in /proc/drbd */
121 
122 /* Module parameter for setting the user mode helper program
123  * to run. Default is /sbin/drbdadm */
124 char usermode_helper[80] = "/sbin/drbdadm";
125 
126 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
127 
128 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
129  * as member "struct gendisk *vdisk;"
130  */
131 struct drbd_conf **minor_table;
132 
133 struct kmem_cache *drbd_request_cache;
134 struct kmem_cache *drbd_ee_cache;	/* epoch entries */
135 struct kmem_cache *drbd_bm_ext_cache;	/* bitmap extents */
136 struct kmem_cache *drbd_al_ext_cache;	/* activity log extents */
137 mempool_t *drbd_request_mempool;
138 mempool_t *drbd_ee_mempool;
139 
140 /* I do not use a standard mempool, because:
141    1) I want to hand out the pre-allocated objects first.
142    2) I want to be able to interrupt sleeping allocation with a signal.
143    Note: This is a singly linked list; the next pointer is the private
144 	 member of struct page.
145  */
146 struct page *drbd_pp_pool;
147 spinlock_t   drbd_pp_lock;
148 int          drbd_pp_vacant;
149 wait_queue_head_t drbd_pp_wait;
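
/*
 * Illustrative sketch only (not compiled in): popping a page off the
 * drbd_pp_pool chain described above.  The helper name drbd_pp_pop() is
 * made up for this example; the real producers/consumers live elsewhere.
 */
#if 0
static struct page *drbd_pp_pop(void)
{
	struct page *page;

	spin_lock(&drbd_pp_lock);
	page = drbd_pp_pool;
	if (page) {
		/* the "next" pointer is stashed in the page's private member */
		drbd_pp_pool = (struct page *)page_private(page);
		set_page_private(page, 0);
		drbd_pp_vacant--;
	}
	spin_unlock(&drbd_pp_lock);
	return page;
}
#endif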
150 
151 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
152 
153 static const struct block_device_operations drbd_ops = {
154 	.owner =   THIS_MODULE,
155 	.open =    drbd_open,
156 	.release = drbd_release,
157 };
158 
159 #define ARRY_SIZE(A) (sizeof(A)/sizeof(A[0]))
160 
161 #ifdef __CHECKER__
162 /* When checking with sparse, if this is an inline function, sparse will
163    give tons of false positives. When this is a real function, sparse works.
164  */
165 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
166 {
167 	int io_allowed;
168 
169 	atomic_inc(&mdev->local_cnt);
170 	io_allowed = (mdev->state.disk >= mins);
171 	if (!io_allowed) {
172 		if (atomic_dec_and_test(&mdev->local_cnt))
173 			wake_up(&mdev->misc_wait);
174 	}
175 	return io_allowed;
176 }
177 
178 #endif
179 
180 /**
181  * DOC: The transfer log
182  *
183  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
184  * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
185  * of the list. There is always at least one &struct drbd_tl_epoch object.
186  *
187  * Each &struct drbd_tl_epoch has a circular doubly linked list of requests
188  * attached.
189  */
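
/*
 * Illustrative sketch only (not compiled in): walking the transfer log as
 * described above, oldest epoch first, visiting each attached request.
 * The helper name tl_count_requests() is made up for this example, and the
 * caller would have to hold mdev->req_lock.
 */
#if 0
static unsigned int tl_count_requests(struct drbd_conf *mdev)
{
	struct drbd_tl_epoch *b;
	struct drbd_request *req;
	unsigned int n = 0;

	for (b = mdev->oldest_tle; b != NULL; b = b->next)
		list_for_each_entry(req, &b->requests, tl_requests)
			n++;
	return n;
}
#endif
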
190 static int tl_init(struct drbd_conf *mdev)
191 {
192 	struct drbd_tl_epoch *b;
193 
194 	/* during device minor initialization, we may well use GFP_KERNEL */
195 	b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
196 	if (!b)
197 		return 0;
198 	INIT_LIST_HEAD(&b->requests);
199 	INIT_LIST_HEAD(&b->w.list);
200 	b->next = NULL;
201 	b->br_number = 4711;
202 	b->n_req = 0;
203 	b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
204 
205 	mdev->oldest_tle = b;
206 	mdev->newest_tle = b;
207 	INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
208 
209 	mdev->tl_hash = NULL;
210 	mdev->tl_hash_s = 0;
211 
212 	return 1;
213 }
214 
215 static void tl_cleanup(struct drbd_conf *mdev)
216 {
217 	D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
218 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
219 	kfree(mdev->oldest_tle);
220 	mdev->oldest_tle = NULL;
221 	kfree(mdev->unused_spare_tle);
222 	mdev->unused_spare_tle = NULL;
223 	kfree(mdev->tl_hash);
224 	mdev->tl_hash = NULL;
225 	mdev->tl_hash_s = 0;
226 }
227 
228 /**
229  * _tl_add_barrier() - Adds a barrier to the transfer log
230  * @mdev:	DRBD device.
231  * @new:	Barrier to be added before the current head of the TL.
232  *
233  * The caller must hold the req_lock.
234  */
235 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
236 {
237 	struct drbd_tl_epoch *newest_before;
238 
239 	INIT_LIST_HEAD(&new->requests);
240 	INIT_LIST_HEAD(&new->w.list);
241 	new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
242 	new->next = NULL;
243 	new->n_req = 0;
244 
245 	newest_before = mdev->newest_tle;
246 	/* never send a barrier number == 0, because that is special-cased
247 	 * when using TCQ for our write ordering code */
248 	new->br_number = (newest_before->br_number+1) ?: 1;
249 	if (mdev->newest_tle != new) {
250 		mdev->newest_tle->next = new;
251 		mdev->newest_tle = new;
252 	}
253 }
254 
255 /**
256  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
257  * @mdev:	DRBD device.
258  * @barrier_nr:	Expected identifier of the DRBD write barrier packet.
259  * @set_size:	Expected number of requests before that barrier.
260  *
261  * In case the passed barrier_nr or set_size does not match the oldest
262  * &struct drbd_tl_epoch object, this function will cause a termination
263  * of the connection.
264  */
265 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
266 		       unsigned int set_size)
267 {
268 	struct drbd_tl_epoch *b, *nob; /* next old barrier */
269 	struct list_head *le, *tle;
270 	struct drbd_request *r;
271 
272 	spin_lock_irq(&mdev->req_lock);
273 
274 	b = mdev->oldest_tle;
275 
276 	/* first some paranoia code */
277 	if (b == NULL) {
278 		dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
279 			barrier_nr);
280 		goto bail;
281 	}
282 	if (b->br_number != barrier_nr) {
283 		dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
284 			barrier_nr, b->br_number);
285 		goto bail;
286 	}
287 	if (b->n_req != set_size) {
288 		dev_err(DEV, "BAD! BarrierAck #%u received with n_req=%u, expected n_req=%u!\n",
289 			barrier_nr, set_size, b->n_req);
290 		goto bail;
291 	}
292 
293 	/* Clean up list of requests processed during current epoch */
294 	list_for_each_safe(le, tle, &b->requests) {
295 		r = list_entry(le, struct drbd_request, tl_requests);
296 		_req_mod(r, barrier_acked);
297 	}
298 	/* There could be requests on the list waiting for completion
299 	   of the write to the local disk. To avoid corruption of the
300 	   slab's data structures we have to remove the list's head.
301 
302 	   Also there could have been a barrier ack out of sequence, overtaking
303 	   the write acks - which would be a bug and violating write ordering.
304 	   To not deadlock in case we lose connection while such requests are
305 	   still pending, we need some way to find them for the
306 	   _req_mod(connection_lost_while_pending).
307 
308 	   These have been list_move'd to the out_of_sequence_requests list in
309 	   _req_mod(, barrier_acked) above.
310 	   */
311 	list_del_init(&b->requests);
312 
313 	nob = b->next;
314 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
315 		_tl_add_barrier(mdev, b);
316 		if (nob)
317 			mdev->oldest_tle = nob;
318 		/* if nob == NULL, b was the only barrier, and becomes the new
319 		   barrier. Therefore mdev->oldest_tle already points to b */
320 	} else {
321 		D_ASSERT(nob != NULL);
322 		mdev->oldest_tle = nob;
323 		kfree(b);
324 	}
325 
326 	spin_unlock_irq(&mdev->req_lock);
327 	dec_ap_pending(mdev);
328 
329 	return;
330 
331 bail:
332 	spin_unlock_irq(&mdev->req_lock);
333 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
334 }
335 
336 
337 /**
338  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
339  * @mdev:	DRBD device.
340  *
341  * This is called after the connection to the peer was lost. The storage covered
342  * by the requests on the transfer log gets marked as out of sync. Called from the
343  * receiver thread and the worker thread.
344  */
345 void tl_clear(struct drbd_conf *mdev)
346 {
347 	struct drbd_tl_epoch *b, *tmp;
348 	struct list_head *le, *tle;
349 	struct drbd_request *r;
350 	int new_initial_bnr = net_random();
351 
352 	spin_lock_irq(&mdev->req_lock);
353 
354 	b = mdev->oldest_tle;
355 	while (b) {
356 		list_for_each_safe(le, tle, &b->requests) {
357 			r = list_entry(le, struct drbd_request, tl_requests);
358 			/* It would be nice to complete outside of spinlock.
359 			 * But this is easier for now. */
360 			_req_mod(r, connection_lost_while_pending);
361 		}
362 		tmp = b->next;
363 
364 		/* there could still be requests on that ring list,
365 		 * in case local io is still pending */
366 		list_del(&b->requests);
367 
368 		/* dec_ap_pending corresponding to queue_barrier.
369 		 * the newest barrier may not have been queued yet,
370 		 * in which case w.cb is still NULL. */
371 		if (b->w.cb != NULL)
372 			dec_ap_pending(mdev);
373 
374 		if (b == mdev->newest_tle) {
375 			/* recycle, but reinit! */
376 			D_ASSERT(tmp == NULL);
377 			INIT_LIST_HEAD(&b->requests);
378 			INIT_LIST_HEAD(&b->w.list);
379 			b->w.cb = NULL;
380 			b->br_number = new_initial_bnr;
381 			b->n_req = 0;
382 
383 			mdev->oldest_tle = b;
384 			break;
385 		}
386 		kfree(b);
387 		b = tmp;
388 	}
389 
390 	/* we expect this list to be empty. */
391 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
392 
393 	/* but just in case, clean it up anyways! */
394 	list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
395 		r = list_entry(le, struct drbd_request, tl_requests);
396 		/* It would be nice to complete outside of spinlock.
397 		 * But this is easier for now. */
398 		_req_mod(r, connection_lost_while_pending);
399 	}
400 
401 	/* ensure bit indicating barrier is required is clear */
402 	clear_bit(CREATE_BARRIER, &mdev->flags);
403 
404 	spin_unlock_irq(&mdev->req_lock);
405 }
406 
407 /**
408  * cl_wide_st_chg() - TRUE if the state change is a cluster-wide one
409  * @mdev:	DRBD device.
410  * @os:		old (current) state.
411  * @ns:		new (wanted) state.
412  */
413 static int cl_wide_st_chg(struct drbd_conf *mdev,
414 			  union drbd_state os, union drbd_state ns)
415 {
416 	return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
417 		 ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
418 		  (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
419 		  (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
420 		  (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))) ||
421 		(os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
422 		(os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
423 }
424 
425 int drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
426 		      union drbd_state mask, union drbd_state val)
427 {
428 	unsigned long flags;
429 	union drbd_state os, ns;
430 	int rv;
431 
432 	spin_lock_irqsave(&mdev->req_lock, flags);
433 	os = mdev->state;
434 	ns.i = (os.i & ~mask.i) | val.i;
435 	rv = _drbd_set_state(mdev, ns, f, NULL);
436 	ns = mdev->state;
437 	spin_unlock_irqrestore(&mdev->req_lock, flags);
438 
439 	return rv;
440 }
441 
442 /**
443  * drbd_force_state() - Impose a change which happens outside our control on our state
444  * @mdev:	DRBD device.
445  * @mask:	mask of state bits to change.
446  * @val:	value of new state bits.
447  */
448 void drbd_force_state(struct drbd_conf *mdev,
449 	union drbd_state mask, union drbd_state val)
450 {
451 	drbd_change_state(mdev, CS_HARD, mask, val);
452 }
453 
454 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns);
455 static int is_valid_state_transition(struct drbd_conf *,
456 				     union drbd_state, union drbd_state);
457 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
458 				       union drbd_state ns, int *warn_sync_abort);
459 int drbd_send_state_req(struct drbd_conf *,
460 			union drbd_state, union drbd_state);
461 
462 static enum drbd_state_ret_codes _req_st_cond(struct drbd_conf *mdev,
463 				    union drbd_state mask, union drbd_state val)
464 {
465 	union drbd_state os, ns;
466 	unsigned long flags;
467 	int rv;
468 
469 	if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
470 		return SS_CW_SUCCESS;
471 
472 	if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
473 		return SS_CW_FAILED_BY_PEER;
474 
475 	rv = 0;
476 	spin_lock_irqsave(&mdev->req_lock, flags);
477 	os = mdev->state;
478 	ns.i = (os.i & ~mask.i) | val.i;
479 	ns = sanitize_state(mdev, os, ns, NULL);
480 
481 	if (!cl_wide_st_chg(mdev, os, ns))
482 		rv = SS_CW_NO_NEED;
483 	if (!rv) {
484 		rv = is_valid_state(mdev, ns);
485 		if (rv == SS_SUCCESS) {
486 			rv = is_valid_state_transition(mdev, ns, os);
487 			if (rv == SS_SUCCESS)
488 				rv = 0; /* cont waiting, otherwise fail. */
489 		}
490 	}
491 	spin_unlock_irqrestore(&mdev->req_lock, flags);
492 
493 	return rv;
494 }
495 
496 /**
497  * drbd_req_state() - Perform a possibly cluster-wide state change
498  * @mdev:	DRBD device.
499  * @mask:	mask of state bits to change.
500  * @val:	value of new state bits.
501  * @f:		flags
502  *
503  * Should not be called directly, use drbd_request_state() or
504  * _drbd_request_state().
505  */
506 static int drbd_req_state(struct drbd_conf *mdev,
507 			  union drbd_state mask, union drbd_state val,
508 			  enum chg_state_flags f)
509 {
510 	struct completion done;
511 	unsigned long flags;
512 	union drbd_state os, ns;
513 	int rv;
514 
515 	init_completion(&done);
516 
517 	if (f & CS_SERIALIZE)
518 		mutex_lock(&mdev->state_mutex);
519 
520 	spin_lock_irqsave(&mdev->req_lock, flags);
521 	os = mdev->state;
522 	ns.i = (os.i & ~mask.i) | val.i;
523 	ns = sanitize_state(mdev, os, ns, NULL);
524 
525 	if (cl_wide_st_chg(mdev, os, ns)) {
526 		rv = is_valid_state(mdev, ns);
527 		if (rv == SS_SUCCESS)
528 			rv = is_valid_state_transition(mdev, ns, os);
529 		spin_unlock_irqrestore(&mdev->req_lock, flags);
530 
531 		if (rv < SS_SUCCESS) {
532 			if (f & CS_VERBOSE)
533 				print_st_err(mdev, os, ns, rv);
534 			goto abort;
535 		}
536 
537 		drbd_state_lock(mdev);
538 		if (!drbd_send_state_req(mdev, mask, val)) {
539 			drbd_state_unlock(mdev);
540 			rv = SS_CW_FAILED_BY_PEER;
541 			if (f & CS_VERBOSE)
542 				print_st_err(mdev, os, ns, rv);
543 			goto abort;
544 		}
545 
546 		wait_event(mdev->state_wait,
547 			(rv = _req_st_cond(mdev, mask, val)));
548 
549 		if (rv < SS_SUCCESS) {
550 			drbd_state_unlock(mdev);
551 			if (f & CS_VERBOSE)
552 				print_st_err(mdev, os, ns, rv);
553 			goto abort;
554 		}
555 		spin_lock_irqsave(&mdev->req_lock, flags);
556 		os = mdev->state;
557 		ns.i = (os.i & ~mask.i) | val.i;
558 		rv = _drbd_set_state(mdev, ns, f, &done);
559 		drbd_state_unlock(mdev);
560 	} else {
561 		rv = _drbd_set_state(mdev, ns, f, &done);
562 	}
563 
564 	spin_unlock_irqrestore(&mdev->req_lock, flags);
565 
566 	if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
567 		D_ASSERT(current != mdev->worker.task);
568 		wait_for_completion(&done);
569 	}
570 
571 abort:
572 	if (f & CS_SERIALIZE)
573 		mutex_unlock(&mdev->state_mutex);
574 
575 	return rv;
576 }
577 
578 /**
579  * _drbd_request_state() - Request a state change (with flags)
580  * @mdev:	DRBD device.
581  * @mask:	mask of state bits to change.
582  * @val:	value of new state bits.
583  * @f:		flags
584  *
585  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
586  * flag, or when logging of failed state change requests is not desired.
587  */
588 int _drbd_request_state(struct drbd_conf *mdev,	union drbd_state mask,
589 			union drbd_state val,	enum chg_state_flags f)
590 {
591 	int rv;
592 
593 	wait_event(mdev->state_wait,
594 		   (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
595 
596 	return rv;
597 }
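/*
 * Illustrative sketch only (not compiled in): a typical state change request
 * using the NS() convenience macro (mask/value pair for a single field), as
 * done by other callers in this file.  example_disconnect() is a made-up name.
 */
#if 0
static void example_disconnect(struct drbd_conf *mdev)
{
	int rv;

	rv = _drbd_request_state(mdev, NS(conn, C_DISCONNECTING), CS_VERBOSE);
	if (rv < SS_SUCCESS)
		dev_err(DEV, "disconnect request failed: %s\n",
			drbd_set_st_err_str(rv));
}
#endif
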
598 
599 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
600 {
601 	dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
602 	    name,
603 	    drbd_conn_str(ns.conn),
604 	    drbd_role_str(ns.role),
605 	    drbd_role_str(ns.peer),
606 	    drbd_disk_str(ns.disk),
607 	    drbd_disk_str(ns.pdsk),
608 	    ns.susp ? 's' : 'r',
609 	    ns.aftr_isp ? 'a' : '-',
610 	    ns.peer_isp ? 'p' : '-',
611 	    ns.user_isp ? 'u' : '-'
612 	    );
613 }
614 
615 void print_st_err(struct drbd_conf *mdev,
616 	union drbd_state os, union drbd_state ns, int err)
617 {
618 	if (err == SS_IN_TRANSIENT_STATE)
619 		return;
620 	dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
621 	print_st(mdev, " state", os);
622 	print_st(mdev, "wanted", ns);
623 }
624 
625 
626 #define drbd_peer_str drbd_role_str
627 #define drbd_pdsk_str drbd_disk_str
628 
629 #define drbd_susp_str(A)     ((A) ? "1" : "0")
630 #define drbd_aftr_isp_str(A) ((A) ? "1" : "0")
631 #define drbd_peer_isp_str(A) ((A) ? "1" : "0")
632 #define drbd_user_isp_str(A) ((A) ? "1" : "0")
633 
634 #define PSC(A) \
635 	({ if (ns.A != os.A) { \
636 		pbp += sprintf(pbp, #A "( %s -> %s ) ", \
637 			      drbd_##A##_str(os.A), \
638 			      drbd_##A##_str(ns.A)); \
639 	} })
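
/* Example (illustrative): after a promotion, PSC(role) below appends
 * something like "role( Secondary -> Primary ) " to the buffer that is
 * printed as a single state change line in __drbd_set_state(). */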
640 
641 /**
642  * is_valid_state() - Returns an SS_ error code if ns is not valid
643  * @mdev:	DRBD device.
644  * @ns:		State to consider.
645  */
646 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
647 {
648 	/* See drbd_state_sw_errors in drbd_strings.c */
649 
650 	enum drbd_fencing_p fp;
651 	int rv = SS_SUCCESS;
652 
653 	fp = FP_DONT_CARE;
654 	if (get_ldev(mdev)) {
655 		fp = mdev->ldev->dc.fencing;
656 		put_ldev(mdev);
657 	}
658 
659 	if (get_net_conf(mdev)) {
660 		if (!mdev->net_conf->two_primaries &&
661 		    ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
662 			rv = SS_TWO_PRIMARIES;
663 		put_net_conf(mdev);
664 	}
665 
666 	if (rv <= 0)
667 		/* already found a reason to abort */;
668 	else if (ns.role == R_SECONDARY && mdev->open_cnt)
669 		rv = SS_DEVICE_IN_USE;
670 
671 	else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
672 		rv = SS_NO_UP_TO_DATE_DISK;
673 
674 	else if (fp >= FP_RESOURCE &&
675 		 ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
676 		rv = SS_PRIMARY_NOP;
677 
678 	else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
679 		rv = SS_NO_UP_TO_DATE_DISK;
680 
681 	else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
682 		rv = SS_NO_LOCAL_DISK;
683 
684 	else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
685 		rv = SS_NO_REMOTE_DISK;
686 
687 	else if ((ns.conn == C_CONNECTED ||
688 		  ns.conn == C_WF_BITMAP_S ||
689 		  ns.conn == C_SYNC_SOURCE ||
690 		  ns.conn == C_PAUSED_SYNC_S) &&
691 		  ns.disk == D_OUTDATED)
692 		rv = SS_CONNECTED_OUTDATES;
693 
694 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
695 		 (mdev->sync_conf.verify_alg[0] == 0))
696 		rv = SS_NO_VERIFY_ALG;
697 
698 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
699 		  mdev->agreed_pro_version < 88)
700 		rv = SS_NOT_SUPPORTED;
701 
702 	return rv;
703 }
704 
705 /**
706  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
707  * @mdev:	DRBD device.
708  * @ns:		new state.
709  * @os:		old state.
710  */
711 static int is_valid_state_transition(struct drbd_conf *mdev,
712 				     union drbd_state ns, union drbd_state os)
713 {
714 	int rv = SS_SUCCESS;
715 
716 	if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
717 	    os.conn > C_CONNECTED)
718 		rv = SS_RESYNC_RUNNING;
719 
720 	if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
721 		rv = SS_ALREADY_STANDALONE;
722 
723 	if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
724 		rv = SS_IS_DISKLESS;
725 
726 	if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
727 		rv = SS_NO_NET_CONFIG;
728 
729 	if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
730 		rv = SS_LOWER_THAN_OUTDATED;
731 
732 	if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
733 		rv = SS_IN_TRANSIENT_STATE;
734 
735 	if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
736 		rv = SS_IN_TRANSIENT_STATE;
737 
738 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
739 		rv = SS_NEED_CONNECTION;
740 
741 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
742 	    ns.conn != os.conn && os.conn > C_CONNECTED)
743 		rv = SS_RESYNC_RUNNING;
744 
745 	if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
746 	    os.conn < C_CONNECTED)
747 		rv = SS_NEED_CONNECTION;
748 
749 	return rv;
750 }
751 
752 /**
753  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
754  * @mdev:	DRBD device.
755  * @os:		old state.
756  * @ns:		new state.
757  * @warn_sync_abort:
758  *
759  * When we lose the connection, we have to set the state of the peer's disk (pdsk)
760  * to D_UNKNOWN. This rule and many more along those lines are in this function.
761  */
762 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
763 				       union drbd_state ns, int *warn_sync_abort)
764 {
765 	enum drbd_fencing_p fp;
766 
767 	fp = FP_DONT_CARE;
768 	if (get_ldev(mdev)) {
769 		fp = mdev->ldev->dc.fencing;
770 		put_ldev(mdev);
771 	}
772 
773 	/* Do not let network errors set a failure state on an unconfigured connection */
774 	if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
775 	    os.conn <= C_DISCONNECTING)
776 		ns.conn = os.conn;
777 
778 	/* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow */
779 	if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
780 	    ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING)
781 		ns.conn = os.conn;
782 
783 	/* After C_DISCONNECTING only C_STANDALONE may follow */
784 	if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
785 		ns.conn = os.conn;
786 
787 	if (ns.conn < C_CONNECTED) {
788 		ns.peer_isp = 0;
789 		ns.peer = R_UNKNOWN;
790 		if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
791 			ns.pdsk = D_UNKNOWN;
792 	}
793 
794 	/* Clear the aftr_isp when becoming unconfigured */
795 	if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
796 		ns.aftr_isp = 0;
797 
798 	if (ns.conn <= C_DISCONNECTING && ns.disk == D_DISKLESS)
799 		ns.pdsk = D_UNKNOWN;
800 
801 	/* Abort resync if a disk fails/detaches */
802 	if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
803 	    (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
804 		if (warn_sync_abort)
805 			*warn_sync_abort = 1;
806 		ns.conn = C_CONNECTED;
807 	}
808 
809 	if (ns.conn >= C_CONNECTED &&
810 	    ((ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED) ||
811 	     (ns.disk == D_NEGOTIATING && ns.conn == C_WF_BITMAP_T))) {
812 		switch (ns.conn) {
813 		case C_WF_BITMAP_T:
814 		case C_PAUSED_SYNC_T:
815 			ns.disk = D_OUTDATED;
816 			break;
817 		case C_CONNECTED:
818 		case C_WF_BITMAP_S:
819 		case C_SYNC_SOURCE:
820 		case C_PAUSED_SYNC_S:
821 			ns.disk = D_UP_TO_DATE;
822 			break;
823 		case C_SYNC_TARGET:
824 			ns.disk = D_INCONSISTENT;
825 			dev_warn(DEV, "Implicitly set disk state Inconsistent!\n");
826 			break;
827 		}
828 		if (os.disk == D_OUTDATED && ns.disk == D_UP_TO_DATE)
829 			dev_warn(DEV, "Implicitly set disk from Outdated to UpToDate\n");
830 	}
831 
832 	if (ns.conn >= C_CONNECTED &&
833 	    (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)) {
834 		switch (ns.conn) {
835 		case C_CONNECTED:
836 		case C_WF_BITMAP_T:
837 		case C_PAUSED_SYNC_T:
838 		case C_SYNC_TARGET:
839 			ns.pdsk = D_UP_TO_DATE;
840 			break;
841 		case C_WF_BITMAP_S:
842 		case C_PAUSED_SYNC_S:
843 			ns.pdsk = D_OUTDATED;
844 			break;
845 		case C_SYNC_SOURCE:
846 			ns.pdsk = D_INCONSISTENT;
847 			dev_warn(DEV, "Implicitly set pdsk Inconsistent!\n");
848 			break;
849 		}
850 		if (os.pdsk == D_OUTDATED && ns.pdsk == D_UP_TO_DATE)
851 			dev_warn(DEV, "Implicitly set pdsk from Outdated to UpToDate\n");
852 	}
853 
854 	/* Connection breaks down before we finished "Negotiating" */
855 	if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
856 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
857 		if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
858 			ns.disk = mdev->new_state_tmp.disk;
859 			ns.pdsk = mdev->new_state_tmp.pdsk;
860 		} else {
861 			dev_alert(DEV, "Connection lost while negotiating, no data!\n");
862 			ns.disk = D_DISKLESS;
863 			ns.pdsk = D_UNKNOWN;
864 		}
865 		put_ldev(mdev);
866 	}
867 
868 	if (fp == FP_STONITH &&
869 	    (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
870 	    !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
871 		ns.susp = 1;
872 
873 	if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
874 		if (ns.conn == C_SYNC_SOURCE)
875 			ns.conn = C_PAUSED_SYNC_S;
876 		if (ns.conn == C_SYNC_TARGET)
877 			ns.conn = C_PAUSED_SYNC_T;
878 	} else {
879 		if (ns.conn == C_PAUSED_SYNC_S)
880 			ns.conn = C_SYNC_SOURCE;
881 		if (ns.conn == C_PAUSED_SYNC_T)
882 			ns.conn = C_SYNC_TARGET;
883 	}
884 
885 	return ns;
886 }
887 
888 /* helper for __drbd_set_state */
889 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
890 {
891 	if (cs == C_VERIFY_T) {
892 		/* starting online verify from an arbitrary position
893 		 * does not fit well into the existing protocol.
894 		 * on C_VERIFY_T, we initialize ov_left and friends
895 		 * implicitly in receive_DataRequest once the
896 		 * first P_OV_REQUEST is received */
897 		mdev->ov_start_sector = ~(sector_t)0;
898 	} else {
899 		unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
900 		if (bit >= mdev->rs_total)
901 			mdev->ov_start_sector =
902 				BM_BIT_TO_SECT(mdev->rs_total - 1);
903 		mdev->ov_position = mdev->ov_start_sector;
904 	}
905 }
906 
907 /**
908  * __drbd_set_state() - Set a new DRBD state
909  * @mdev:	DRBD device.
910  * @ns:		new state.
911  * @flags:	Flags
912  * @done:	Optional completion, that will get completed after the after_state_ch() finished
913  *
914  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
915  */
916 int __drbd_set_state(struct drbd_conf *mdev,
917 		    union drbd_state ns, enum chg_state_flags flags,
918 		    struct completion *done)
919 {
920 	union drbd_state os;
921 	int rv = SS_SUCCESS;
922 	int warn_sync_abort = 0;
923 	struct after_state_chg_work *ascw;
924 
925 	os = mdev->state;
926 
927 	ns = sanitize_state(mdev, os, ns, &warn_sync_abort);
928 
929 	if (ns.i == os.i)
930 		return SS_NOTHING_TO_DO;
931 
932 	if (!(flags & CS_HARD)) {
933 		/*  pre-state-change checks ; only look at ns  */
934 		/* See drbd_state_sw_errors in drbd_strings.c */
935 
936 		rv = is_valid_state(mdev, ns);
937 		if (rv < SS_SUCCESS) {
938 			/* If the old state was illegal as well, then let
939 			   this happen...*/
940 
941 			if (is_valid_state(mdev, os) == rv) {
942 				dev_err(DEV, "Considering state change from bad state. "
943 				    "Error would be: '%s'\n",
944 				    drbd_set_st_err_str(rv));
945 				print_st(mdev, "old", os);
946 				print_st(mdev, "new", ns);
947 				rv = is_valid_state_transition(mdev, ns, os);
948 			}
949 		} else
950 			rv = is_valid_state_transition(mdev, ns, os);
951 	}
952 
953 	if (rv < SS_SUCCESS) {
954 		if (flags & CS_VERBOSE)
955 			print_st_err(mdev, os, ns, rv);
956 		return rv;
957 	}
958 
959 	if (warn_sync_abort)
960 		dev_warn(DEV, "Resync aborted.\n");
961 
962 	{
963 		char *pbp, pb[300];
964 		pbp = pb;
965 		*pbp = 0;
966 		PSC(role);
967 		PSC(peer);
968 		PSC(conn);
969 		PSC(disk);
970 		PSC(pdsk);
971 		PSC(susp);
972 		PSC(aftr_isp);
973 		PSC(peer_isp);
974 		PSC(user_isp);
975 		dev_info(DEV, "%s\n", pb);
976 	}
977 
978 	/* solve the race between becoming unconfigured,
979 	 * worker doing the cleanup, and
980 	 * admin reconfiguring us:
981 	 * on (re)configure, first set CONFIG_PENDING,
982 	 * then wait for a potentially exiting worker,
983 	 * start the worker, and schedule one no_op.
984 	 * then proceed with configuration.
985 	 */
986 	if (ns.disk == D_DISKLESS &&
987 	    ns.conn == C_STANDALONE &&
988 	    ns.role == R_SECONDARY &&
989 	    !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
990 		set_bit(DEVICE_DYING, &mdev->flags);
991 
992 	mdev->state.i = ns.i;
993 	wake_up(&mdev->misc_wait);
994 	wake_up(&mdev->state_wait);
995 
996 	/*   post-state-change actions   */
997 	if (os.conn >= C_SYNC_SOURCE   && ns.conn <= C_CONNECTED) {
998 		set_bit(STOP_SYNC_TIMER, &mdev->flags);
999 		mod_timer(&mdev->resync_timer, jiffies);
1000 	}
1001 
1002 	/* aborted verify run. log the last position */
1003 	if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1004 	    ns.conn < C_CONNECTED) {
1005 		mdev->ov_start_sector =
1006 			BM_BIT_TO_SECT(mdev->rs_total - mdev->ov_left);
1007 		dev_info(DEV, "Online Verify reached sector %llu\n",
1008 			(unsigned long long)mdev->ov_start_sector);
1009 	}
1010 
1011 	if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1012 	    (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1013 		dev_info(DEV, "Syncer continues.\n");
1014 		mdev->rs_paused += (long)jiffies-(long)mdev->rs_mark_time;
1015 		if (ns.conn == C_SYNC_TARGET) {
1016 			if (!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))
1017 				mod_timer(&mdev->resync_timer, jiffies);
1018 			/* This if (!test_bit) is only needed for the case
1019 			   that a device that has ceased to use its timer,
1020 			   i.e. is already in drbd_resync_finished(), gets
1021 			   paused and resumed. */
1022 		}
1023 	}
1024 
1025 	if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1026 	    (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1027 		dev_info(DEV, "Resync suspended\n");
1028 		mdev->rs_mark_time = jiffies;
1029 		if (ns.conn == C_PAUSED_SYNC_T)
1030 			set_bit(STOP_SYNC_TIMER, &mdev->flags);
1031 	}
1032 
1033 	if (os.conn == C_CONNECTED &&
1034 	    (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1035 		mdev->ov_position = 0;
1036 		mdev->rs_total =
1037 		mdev->rs_mark_left = drbd_bm_bits(mdev);
1038 		if (mdev->agreed_pro_version >= 90)
1039 			set_ov_position(mdev, ns.conn);
1040 		else
1041 			mdev->ov_start_sector = 0;
1042 		mdev->ov_left = mdev->rs_total
1043 			      - BM_SECT_TO_BIT(mdev->ov_position);
1044 		mdev->rs_start     =
1045 		mdev->rs_mark_time = jiffies;
1046 		mdev->ov_last_oos_size = 0;
1047 		mdev->ov_last_oos_start = 0;
1048 
1049 		if (ns.conn == C_VERIFY_S) {
1050 			dev_info(DEV, "Starting Online Verify from sector %llu\n",
1051 					(unsigned long long)mdev->ov_position);
1052 			mod_timer(&mdev->resync_timer, jiffies);
1053 		}
1054 	}
1055 
1056 	if (get_ldev(mdev)) {
1057 		u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1058 						 MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1059 						 MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1060 
1061 		if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1062 			mdf |= MDF_CRASHED_PRIMARY;
1063 		if (mdev->state.role == R_PRIMARY ||
1064 		    (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1065 			mdf |= MDF_PRIMARY_IND;
1066 		if (mdev->state.conn > C_WF_REPORT_PARAMS)
1067 			mdf |= MDF_CONNECTED_IND;
1068 		if (mdev->state.disk > D_INCONSISTENT)
1069 			mdf |= MDF_CONSISTENT;
1070 		if (mdev->state.disk > D_OUTDATED)
1071 			mdf |= MDF_WAS_UP_TO_DATE;
1072 		if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1073 			mdf |= MDF_PEER_OUT_DATED;
1074 		if (mdf != mdev->ldev->md.flags) {
1075 			mdev->ldev->md.flags = mdf;
1076 			drbd_md_mark_dirty(mdev);
1077 		}
1078 		if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1079 			drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1080 		put_ldev(mdev);
1081 	}
1082 
1083 	/* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider resyncing */
1084 	if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1085 	    os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1086 		set_bit(CONSIDER_RESYNC, &mdev->flags);
1087 
1088 	/* Receiver should clean up itself */
1089 	if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1090 		drbd_thread_stop_nowait(&mdev->receiver);
1091 
1092 	/* Now the receiver finished cleaning up itself, it should die */
1093 	if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1094 		drbd_thread_stop_nowait(&mdev->receiver);
1095 
1096 	/* Upon network failure, we need to restart the receiver. */
1097 	if (os.conn > C_TEAR_DOWN &&
1098 	    ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1099 		drbd_thread_restart_nowait(&mdev->receiver);
1100 
1101 	ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1102 	if (ascw) {
1103 		ascw->os = os;
1104 		ascw->ns = ns;
1105 		ascw->flags = flags;
1106 		ascw->w.cb = w_after_state_ch;
1107 		ascw->done = done;
1108 		drbd_queue_work(&mdev->data.work, &ascw->w);
1109 	} else {
1110 		dev_warn(DEV, "Could not kmalloc an ascw\n");
1111 	}
1112 
1113 	return rv;
1114 }
1115 
1116 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1117 {
1118 	struct after_state_chg_work *ascw =
1119 		container_of(w, struct after_state_chg_work, w);
1120 	after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1121 	if (ascw->flags & CS_WAIT_COMPLETE) {
1122 		D_ASSERT(ascw->done != NULL);
1123 		complete(ascw->done);
1124 	}
1125 	kfree(ascw);
1126 
1127 	return 1;
1128 }
1129 
1130 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1131 {
1132 	if (rv) {
1133 		dev_err(DEV, "Writing the bitmap failed, not starting resync.\n");
1134 		_drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1135 		return;
1136 	}
1137 
1138 	switch (mdev->state.conn) {
1139 	case C_STARTING_SYNC_T:
1140 		_drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1141 		break;
1142 	case C_STARTING_SYNC_S:
1143 		drbd_start_resync(mdev, C_SYNC_SOURCE);
1144 		break;
1145 	}
1146 }
1147 
1148 /**
1149  * after_state_ch() - Perform after state change actions that may sleep
1150  * @mdev:	DRBD device.
1151  * @os:		old state.
1152  * @ns:		new state.
1153  * @flags:	Flags
1154  */
1155 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1156 			   union drbd_state ns, enum chg_state_flags flags)
1157 {
1158 	enum drbd_fencing_p fp;
1159 
1160 	if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1161 		clear_bit(CRASHED_PRIMARY, &mdev->flags);
1162 		if (mdev->p_uuid)
1163 			mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1164 	}
1165 
1166 	fp = FP_DONT_CARE;
1167 	if (get_ldev(mdev)) {
1168 		fp = mdev->ldev->dc.fencing;
1169 		put_ldev(mdev);
1170 	}
1171 
1172 	/* Inform userspace about the change... */
1173 	drbd_bcast_state(mdev, ns);
1174 
1175 	if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1176 	    (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1177 		drbd_khelper(mdev, "pri-on-incon-degr");
1178 
1179 	/* Here we have the actions that are performed after a
1180 	   state change. This function might sleep */
1181 
1182 	if (fp == FP_STONITH && ns.susp) {
1183 		/* case 1: The outdate-peer handler was successful.
1184 		 * case 2: The connection was established again. */
1185 		if ((os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) ||
1186 		    (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)) {
1187 			tl_clear(mdev);
1188 			spin_lock_irq(&mdev->req_lock);
1189 			_drbd_set_state(_NS(mdev, susp, 0), CS_VERBOSE, NULL);
1190 			spin_unlock_irq(&mdev->req_lock);
1191 		}
1192 	}
1193 	/* Do not change the order of the if above and the two below... */
1194 	if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1195 		drbd_send_uuids(mdev);
1196 		drbd_send_state(mdev);
1197 	}
1198 	if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S)
1199 		drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL, "send_bitmap (WFBitMapS)");
1200 
1201 	/* Lost contact to peer's copy of the data */
1202 	if ((os.pdsk >= D_INCONSISTENT &&
1203 	     os.pdsk != D_UNKNOWN &&
1204 	     os.pdsk != D_OUTDATED)
1205 	&&  (ns.pdsk < D_INCONSISTENT ||
1206 	     ns.pdsk == D_UNKNOWN ||
1207 	     ns.pdsk == D_OUTDATED)) {
1208 		kfree(mdev->p_uuid);
1209 		mdev->p_uuid = NULL;
1210 		if (get_ldev(mdev)) {
1211 			if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1212 			    mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1213 				drbd_uuid_new_current(mdev);
1214 				drbd_send_uuids(mdev);
1215 			}
1216 			put_ldev(mdev);
1217 		}
1218 	}
1219 
1220 	if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1221 		if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0)
1222 			drbd_uuid_new_current(mdev);
1223 
1224 		/* D_DISKLESS Peer becomes secondary */
1225 		if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1226 			drbd_al_to_on_disk_bm(mdev);
1227 		put_ldev(mdev);
1228 	}
1229 
1230 	/* Last part of the attaching process ... */
1231 	if (ns.conn >= C_CONNECTED &&
1232 	    os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1233 		kfree(mdev->p_uuid); /* We expect to receive up-to-date UUIDs soon. */
1234 		mdev->p_uuid = NULL; /* ...to not use the old ones in the mean time */
1235 		drbd_send_sizes(mdev, 0);  /* to start sync... */
1236 		drbd_send_uuids(mdev);
1237 		drbd_send_state(mdev);
1238 	}
1239 
1240 	/* We want to pause/continue resync, tell peer. */
1241 	if (ns.conn >= C_CONNECTED &&
1242 	     ((os.aftr_isp != ns.aftr_isp) ||
1243 	      (os.user_isp != ns.user_isp)))
1244 		drbd_send_state(mdev);
1245 
1246 	/* In case one of the isp bits got set, suspend other devices. */
1247 	if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1248 	    (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1249 		suspend_other_sg(mdev);
1250 
1251 	/* Make sure the peer gets informed about possible state
1252 	   changes (ISP bits) that happened while we were in WFReportParams. */
1253 	if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1254 		drbd_send_state(mdev);
1255 
1256 	/* We are in the process of starting a full sync... */
1257 	if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1258 	    (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1259 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, &abw_start_sync, "set_n_write from StartingSync");
1260 
1261 	/* We are invalidating ourselves... */
1262 	if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1263 	    os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1264 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");
1265 
1266 	if (os.disk > D_FAILED && ns.disk == D_FAILED) {
1267 		enum drbd_io_error_p eh;
1268 
1269 		eh = EP_PASS_ON;
1270 		if (get_ldev_if_state(mdev, D_FAILED)) {
1271 			eh = mdev->ldev->dc.on_io_error;
1272 			put_ldev(mdev);
1273 		}
1274 
1275 		drbd_rs_cancel_all(mdev);
1276 		/* since get_ldev() only works as long as disk>=D_INCONSISTENT,
1277 		   and it is D_DISKLESS here, local_cnt can only go down, it can
1278 		   not increase... It will reach zero */
1279 		wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1280 		mdev->rs_total = 0;
1281 		mdev->rs_failed = 0;
1282 		atomic_set(&mdev->rs_pending_cnt, 0);
1283 
1284 		spin_lock_irq(&mdev->req_lock);
1285 		_drbd_set_state(_NS(mdev, disk, D_DISKLESS), CS_HARD, NULL);
1286 		spin_unlock_irq(&mdev->req_lock);
1287 
1288 		if (eh == EP_CALL_HELPER)
1289 			drbd_khelper(mdev, "local-io-error");
1290 	}
1291 
1292 	if (os.disk > D_DISKLESS && ns.disk == D_DISKLESS) {
1293 
1294 		if (os.disk == D_FAILED) /* && ns.disk == D_DISKLESS*/ {
1295 			if (drbd_send_state(mdev))
1296 				dev_warn(DEV, "Notified peer that my disk is broken.\n");
1297 			else
1298 				dev_err(DEV, "Sending state in drbd_io_error() failed\n");
1299 		}
1300 
1301 		wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1302 		lc_destroy(mdev->resync);
1303 		mdev->resync = NULL;
1304 		lc_destroy(mdev->act_log);
1305 		mdev->act_log = NULL;
1306 		__no_warn(local,
1307 			drbd_free_bc(mdev->ldev);
1308 			mdev->ldev = NULL;);
1309 
1310 		if (mdev->md_io_tmpp)
1311 			__free_page(mdev->md_io_tmpp);
1312 	}
1313 
1314 	/* Disks got bigger while they were detached */
1315 	if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1316 	    test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1317 		if (ns.conn == C_CONNECTED)
1318 			resync_after_online_grow(mdev);
1319 	}
1320 
1321 	/* A resync finished or aborted, wake paused devices... */
1322 	if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1323 	    (os.peer_isp && !ns.peer_isp) ||
1324 	    (os.user_isp && !ns.user_isp))
1325 		resume_next_sg(mdev);
1326 
1327 	/* Upon network connection, we need to start the receiver */
1328 	if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1329 		drbd_thread_start(&mdev->receiver);
1330 
1331 	/* Terminate worker thread if we are unconfigured - it will be
1332 	   restarted as needed... */
1333 	if (ns.disk == D_DISKLESS &&
1334 	    ns.conn == C_STANDALONE &&
1335 	    ns.role == R_SECONDARY) {
1336 		if (os.aftr_isp != ns.aftr_isp)
1337 			resume_next_sg(mdev);
1338 		/* set in __drbd_set_state, unless CONFIG_PENDING was set */
1339 		if (test_bit(DEVICE_DYING, &mdev->flags))
1340 			drbd_thread_stop_nowait(&mdev->worker);
1341 	}
1342 
1343 	drbd_md_sync(mdev);
1344 }
1345 
1346 
1347 static int drbd_thread_setup(void *arg)
1348 {
1349 	struct drbd_thread *thi = (struct drbd_thread *) arg;
1350 	struct drbd_conf *mdev = thi->mdev;
1351 	unsigned long flags;
1352 	int retval;
1353 
1354 restart:
1355 	retval = thi->function(thi);
1356 
1357 	spin_lock_irqsave(&thi->t_lock, flags);
1358 
1359 	/* if the receiver has been "Exiting", the last thing it did
1360 	 * was set the conn state to "StandAlone".
1361 	 * If a re-connect request now comes in, the conn state goes to C_UNCONNECTED,
1362 	 * and the receiver thread will be "started".
1363 	 * drbd_thread_start needs to set "Restarting" in that case.
1364 	 * t_state check and assignment need to be within the same spinlock,
1365 	 * so either thread_start sees Exiting, and can remap to Restarting,
1366 	 * or thread_start sees None, and can proceed as normal.
1367 	 */
1368 
1369 	if (thi->t_state == Restarting) {
1370 		dev_info(DEV, "Restarting %s\n", current->comm);
1371 		thi->t_state = Running;
1372 		spin_unlock_irqrestore(&thi->t_lock, flags);
1373 		goto restart;
1374 	}
1375 
1376 	thi->task = NULL;
1377 	thi->t_state = None;
1378 	smp_mb();
1379 	complete(&thi->stop);
1380 	spin_unlock_irqrestore(&thi->t_lock, flags);
1381 
1382 	dev_info(DEV, "Terminating %s\n", current->comm);
1383 
1384 	/* Release mod reference taken when thread was started */
1385 	module_put(THIS_MODULE);
1386 	return retval;
1387 }
1388 
1389 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1390 		      int (*func) (struct drbd_thread *))
1391 {
1392 	spin_lock_init(&thi->t_lock);
1393 	thi->task    = NULL;
1394 	thi->t_state = None;
1395 	thi->function = func;
1396 	thi->mdev = mdev;
1397 }
1398 
1399 int drbd_thread_start(struct drbd_thread *thi)
1400 {
1401 	struct drbd_conf *mdev = thi->mdev;
1402 	struct task_struct *nt;
1403 	unsigned long flags;
1404 
1405 	const char *me =
1406 		thi == &mdev->receiver ? "receiver" :
1407 		thi == &mdev->asender  ? "asender"  :
1408 		thi == &mdev->worker   ? "worker"   : "NONSENSE";
1409 
1410 	/* is used from state engine doing drbd_thread_stop_nowait,
1411 	 * while holding the req lock irqsave */
1412 	spin_lock_irqsave(&thi->t_lock, flags);
1413 
1414 	switch (thi->t_state) {
1415 	case None:
1416 		dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1417 				me, current->comm, current->pid);
1418 
1419 		/* Get ref on module for thread - this is released when thread exits */
1420 		if (!try_module_get(THIS_MODULE)) {
1421 			dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1422 			spin_unlock_irqrestore(&thi->t_lock, flags);
1423 			return FALSE;
1424 		}
1425 
1426 		init_completion(&thi->stop);
1427 		D_ASSERT(thi->task == NULL);
1428 		thi->reset_cpu_mask = 1;
1429 		thi->t_state = Running;
1430 		spin_unlock_irqrestore(&thi->t_lock, flags);
1431 		flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
1432 
1433 		nt = kthread_create(drbd_thread_setup, (void *) thi,
1434 				    "drbd%d_%s", mdev_to_minor(mdev), me);
1435 
1436 		if (IS_ERR(nt)) {
1437 			dev_err(DEV, "Couldn't start thread\n");
1438 
1439 			module_put(THIS_MODULE);
1440 			return FALSE;
1441 		}
1442 		spin_lock_irqsave(&thi->t_lock, flags);
1443 		thi->task = nt;
1444 		thi->t_state = Running;
1445 		spin_unlock_irqrestore(&thi->t_lock, flags);
1446 		wake_up_process(nt);
1447 		break;
1448 	case Exiting:
1449 		thi->t_state = Restarting;
1450 		dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1451 				me, current->comm, current->pid);
1452 		/* fall through */
1453 	case Running:
1454 	case Restarting:
1455 	default:
1456 		spin_unlock_irqrestore(&thi->t_lock, flags);
1457 		break;
1458 	}
1459 
1460 	return TRUE;
1461 }
1462 
1463 
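/*
 * Note (illustrative): callers usually do not invoke _drbd_thread_stop()
 * directly but use wrappers such as drbd_thread_stop_nowait() and
 * drbd_thread_restart_nowait(), as seen elsewhere in this file, which
 * select the restart/wait combination.
 */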
1464 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1465 {
1466 	unsigned long flags;
1467 
1468 	enum drbd_thread_state ns = restart ? Restarting : Exiting;
1469 
1470 	/* may be called from state engine, holding the req lock irqsave */
1471 	spin_lock_irqsave(&thi->t_lock, flags);
1472 
1473 	if (thi->t_state == None) {
1474 		spin_unlock_irqrestore(&thi->t_lock, flags);
1475 		if (restart)
1476 			drbd_thread_start(thi);
1477 		return;
1478 	}
1479 
1480 	if (thi->t_state != ns) {
1481 		if (thi->task == NULL) {
1482 			spin_unlock_irqrestore(&thi->t_lock, flags);
1483 			return;
1484 		}
1485 
1486 		thi->t_state = ns;
1487 		smp_mb();
1488 		init_completion(&thi->stop);
1489 		if (thi->task != current)
1490 			force_sig(DRBD_SIGKILL, thi->task);
1491 
1492 	}
1493 
1494 	spin_unlock_irqrestore(&thi->t_lock, flags);
1495 
1496 	if (wait)
1497 		wait_for_completion(&thi->stop);
1498 }
1499 
1500 #ifdef CONFIG_SMP
1501 /**
1502  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1503  * @mdev:	DRBD device.
1504  *
1505  * Forces all threads of a device onto the same CPU. This is beneficial for
1506  * DRBD's performance. May be overridden by the user's configuration.
1507  */
1508 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1509 {
1510 	int ord, cpu;
1511 
1512 	/* user override. */
1513 	if (cpumask_weight(mdev->cpu_mask))
1514 		return;
1515 
1516 	ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1517 	for_each_online_cpu(cpu) {
1518 		if (ord-- == 0) {
1519 			cpumask_set_cpu(cpu, mdev->cpu_mask);
1520 			return;
1521 		}
1522 	}
1523 	/* should not be reached */
1524 	cpumask_setall(mdev->cpu_mask);
1525 }
1526 
1527 /**
1528  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1529  * @mdev:	DRBD device.
1530  *
1531  * Call this in the "main loop" of _all_ threads; no mutex is needed, current won't die
1532  * prematurely.
1533  */
1534 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1535 {
1536 	struct task_struct *p = current;
1537 	struct drbd_thread *thi =
1538 		p == mdev->asender.task  ? &mdev->asender  :
1539 		p == mdev->receiver.task ? &mdev->receiver :
1540 		p == mdev->worker.task   ? &mdev->worker   :
1541 		NULL;
1542 	ERR_IF(thi == NULL)
1543 		return;
1544 	if (!thi->reset_cpu_mask)
1545 		return;
1546 	thi->reset_cpu_mask = 0;
1547 	set_cpus_allowed_ptr(p, mdev->cpu_mask);
1548 }
1549 #endif
1550 
1551 /* the appropriate socket mutex must be held already */
1552 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1553 			  enum drbd_packets cmd, struct p_header *h,
1554 			  size_t size, unsigned msg_flags)
1555 {
1556 	int sent, ok;
1557 
1558 	ERR_IF(!h) return FALSE;
1559 	ERR_IF(!size) return FALSE;
1560 
1561 	h->magic   = BE_DRBD_MAGIC;
1562 	h->command = cpu_to_be16(cmd);
1563 	h->length  = cpu_to_be16(size-sizeof(struct p_header));
1564 
1565 	sent = drbd_send(mdev, sock, h, size, msg_flags);
1566 
1567 	ok = (sent == size);
1568 	if (!ok)
1569 		dev_err(DEV, "short sent %s size=%d sent=%d\n",
1570 		    cmdname(cmd), (int)size, sent);
1571 	return ok;
1572 }
1573 
1574 /* don't pass the socket. we may only look at it
1575  * when we hold the appropriate socket mutex.
1576  */
1577 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1578 		  enum drbd_packets cmd, struct p_header *h, size_t size)
1579 {
1580 	int ok = 0;
1581 	struct socket *sock;
1582 
1583 	if (use_data_socket) {
1584 		mutex_lock(&mdev->data.mutex);
1585 		sock = mdev->data.socket;
1586 	} else {
1587 		mutex_lock(&mdev->meta.mutex);
1588 		sock = mdev->meta.socket;
1589 	}
1590 
1591 	/* drbd_disconnect() could have called drbd_free_sock()
1592 	 * while we were waiting in down()... */
1593 	if (likely(sock != NULL))
1594 		ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1595 
1596 	if (use_data_socket)
1597 		mutex_unlock(&mdev->data.mutex);
1598 	else
1599 		mutex_unlock(&mdev->meta.mutex);
1600 	return ok;
1601 }
1602 
1603 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
1604 		   size_t size)
1605 {
1606 	struct p_header h;
1607 	int ok;
1608 
1609 	h.magic   = BE_DRBD_MAGIC;
1610 	h.command = cpu_to_be16(cmd);
1611 	h.length  = cpu_to_be16(size);
1612 
1613 	if (!drbd_get_data_sock(mdev))
1614 		return 0;
1615 
1616 	ok = (sizeof(h) ==
1617 		drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
1618 	ok = ok && (size ==
1619 		drbd_send(mdev, mdev->data.socket, data, size, 0));
1620 
1621 	drbd_put_data_sock(mdev);
1622 
1623 	return ok;
1624 }
1625 
1626 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
1627 {
1628 	struct p_rs_param_89 *p;
1629 	struct socket *sock;
1630 	int size, rv;
1631 	const int apv = mdev->agreed_pro_version;
1632 
1633 	size = apv <= 87 ? sizeof(struct p_rs_param)
1634 		: apv == 88 ? sizeof(struct p_rs_param)
1635 			+ strlen(mdev->sync_conf.verify_alg) + 1
1636 		: /* 89 */    sizeof(struct p_rs_param_89);
1637 
1638 	/* used from admin command context and receiver/worker context.
1639 	 * to avoid kmalloc, grab the socket right here,
1640 	 * then use the pre-allocated sbuf there */
1641 	mutex_lock(&mdev->data.mutex);
1642 	sock = mdev->data.socket;
1643 
1644 	if (likely(sock != NULL)) {
1645 		enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
1646 
1647 		p = &mdev->data.sbuf.rs_param_89;
1648 
1649 		/* initialize verify_alg and csums_alg */
1650 		memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
1651 
1652 		p->rate = cpu_to_be32(sc->rate);
1653 
1654 		if (apv >= 88)
1655 			strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
1656 		if (apv >= 89)
1657 			strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
1658 
1659 		rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
1660 	} else
1661 		rv = 0; /* not ok */
1662 
1663 	mutex_unlock(&mdev->data.mutex);
1664 
1665 	return rv;
1666 }
1667 
1668 int drbd_send_protocol(struct drbd_conf *mdev)
1669 {
1670 	struct p_protocol *p;
1671 	int size, rv;
1672 
1673 	size = sizeof(struct p_protocol);
1674 
1675 	if (mdev->agreed_pro_version >= 87)
1676 		size += strlen(mdev->net_conf->integrity_alg) + 1;
1677 
1678 	/* we must not recurse into our own queue,
1679 	 * as that is blocked during handshake */
1680 	p = kmalloc(size, GFP_NOIO);
1681 	if (p == NULL)
1682 		return 0;
1683 
1684 	p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
1685 	p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
1686 	p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
1687 	p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
1688 	p->want_lose     = cpu_to_be32(mdev->net_conf->want_lose);
1689 	p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
1690 
1691 	if (mdev->agreed_pro_version >= 87)
1692 		strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
1693 
1694 	rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
1695 			   (struct p_header *)p, size);
1696 	kfree(p);
1697 	return rv;
1698 }
1699 
1700 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1701 {
1702 	struct p_uuids p;
1703 	int i;
1704 
1705 	if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1706 		return 1;
1707 
1708 	for (i = UI_CURRENT; i < UI_SIZE; i++)
1709 		p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1710 
1711 	mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1712 	p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1713 	uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
1714 	uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1715 	uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1716 	p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1717 
1718 	put_ldev(mdev);
1719 
1720 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
1721 			     (struct p_header *)&p, sizeof(p));
1722 }
1723 
1724 int drbd_send_uuids(struct drbd_conf *mdev)
1725 {
1726 	return _drbd_send_uuids(mdev, 0);
1727 }
1728 
1729 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1730 {
1731 	return _drbd_send_uuids(mdev, 8);
1732 }
1733 
1734 
1735 int drbd_send_sync_uuid(struct drbd_conf *mdev, u64 val)
1736 {
1737 	struct p_rs_uuid p;
1738 
1739 	p.uuid = cpu_to_be64(val);
1740 
1741 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
1742 			     (struct p_header *)&p, sizeof(p));
1743 }
1744 
1745 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply)
1746 {
1747 	struct p_sizes p;
1748 	sector_t d_size, u_size;
1749 	int q_order_type;
1750 	int ok;
1751 
1752 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1753 		D_ASSERT(mdev->ldev->backing_bdev);
1754 		d_size = drbd_get_max_capacity(mdev->ldev);
1755 		u_size = mdev->ldev->dc.disk_size;
1756 		q_order_type = drbd_queue_order_type(mdev);
1757 		p.queue_order_type = cpu_to_be32(drbd_queue_order_type(mdev));
1758 		put_ldev(mdev);
1759 	} else {
1760 		d_size = 0;
1761 		u_size = 0;
1762 		q_order_type = QUEUE_ORDERED_NONE;
1763 	}
1764 
1765 	p.d_size = cpu_to_be64(d_size);
1766 	p.u_size = cpu_to_be64(u_size);
1767 	p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1768 	p.max_segment_size = cpu_to_be32(queue_max_segment_size(mdev->rq_queue));
1769 	p.queue_order_type = cpu_to_be32(q_order_type);
1770 
1771 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
1772 			   (struct p_header *)&p, sizeof(p));
1773 	return ok;
1774 }
1775 
1776 /**
1777  * drbd_send_state() - Sends the drbd state to the peer
1778  * @mdev:	DRBD device.
1779  */
1780 int drbd_send_state(struct drbd_conf *mdev)
1781 {
1782 	struct socket *sock;
1783 	struct p_state p;
1784 	int ok = 0;
1785 
1786 	/* Grab state lock so we won't send state if we're in the middle
1787 	 * of a cluster wide state change on another thread */
1788 	drbd_state_lock(mdev);
1789 
1790 	mutex_lock(&mdev->data.mutex);
1791 
1792 	p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1793 	sock = mdev->data.socket;
1794 
1795 	if (likely(sock != NULL)) {
1796 		ok = _drbd_send_cmd(mdev, sock, P_STATE,
1797 				    (struct p_header *)&p, sizeof(p), 0);
1798 	}
1799 
1800 	mutex_unlock(&mdev->data.mutex);
1801 
1802 	drbd_state_unlock(mdev);
1803 	return ok;
1804 }
1805 
1806 int drbd_send_state_req(struct drbd_conf *mdev,
1807 	union drbd_state mask, union drbd_state val)
1808 {
1809 	struct p_req_state p;
1810 
1811 	p.mask    = cpu_to_be32(mask.i);
1812 	p.val     = cpu_to_be32(val.i);
1813 
1814 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
1815 			     (struct p_header *)&p, sizeof(p));
1816 }
1817 
1818 int drbd_send_sr_reply(struct drbd_conf *mdev, int retcode)
1819 {
1820 	struct p_req_state_reply p;
1821 
1822 	p.retcode    = cpu_to_be32(retcode);
1823 
1824 	return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
1825 			     (struct p_header *)&p, sizeof(p));
1826 }
1827 
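/* Run-length encode the bitmap into p->code, starting at c->bit_offset,
 * storing the run lengths VLI encoded.
 * Returns the number of code bytes used on success, 0 if the plain bitmap
 * should be sent instead (feature disabled, old peer, nothing left to do,
 * or the data turned out to be incompressible), or -1 on error. */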
1828 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1829 	struct p_compressed_bm *p,
1830 	struct bm_xfer_ctx *c)
1831 {
1832 	struct bitstream bs;
1833 	unsigned long plain_bits;
1834 	unsigned long tmp;
1835 	unsigned long rl;
1836 	unsigned len;
1837 	unsigned toggle;
1838 	int bits;
1839 
1840 	/* may we use this feature? */
1841 	if ((mdev->sync_conf.use_rle == 0) ||
1842 		(mdev->agreed_pro_version < 90))
1843 			return 0;
1844 
1845 	if (c->bit_offset >= c->bm_bits)
1846 		return 0; /* nothing to do. */
1847 
1848 	/* use at most this many bytes */
1849 	bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1850 	memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1851 	/* plain bits covered in this code string */
1852 	plain_bits = 0;
1853 
1854 	/* p->encoding & 0x80 stores whether the first run length is set.
1855 	 * bit offset is implicit.
1856 	 * start with toggle == 2 to be able to tell the first iteration */
1857 	toggle = 2;
1858 
1859 	/* see how many plain bits we can stuff into one packet
1860 	 * using RLE and VLI. */
1861 	do {
1862 		tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1863 				    : _drbd_bm_find_next(mdev, c->bit_offset);
1864 		if (tmp == -1UL)
1865 			tmp = c->bm_bits;
1866 		rl = tmp - c->bit_offset;
1867 
1868 		if (toggle == 2) { /* first iteration */
1869 			if (rl == 0) {
1870 				/* the first checked bit was set,
1871 				 * store start value, */
1872 				DCBP_set_start(p, 1);
1873 				/* but skip encoding of zero run length */
1874 				toggle = !toggle;
1875 				continue;
1876 			}
1877 			DCBP_set_start(p, 0);
1878 		}
1879 
1880 		/* paranoia: catch zero runlength.
1881 		 * can only happen if bitmap is modified while we scan it. */
1882 		if (rl == 0) {
1883 			dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1884 			    "t:%u bo:%lu\n", toggle, c->bit_offset);
1885 			return -1;
1886 		}
1887 
1888 		bits = vli_encode_bits(&bs, rl);
1889 		if (bits == -ENOBUFS) /* buffer full */
1890 			break;
1891 		if (bits <= 0) {
1892 			dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1893 			return 0;
1894 		}
1895 
1896 		toggle = !toggle;
1897 		plain_bits += rl;
1898 		c->bit_offset = tmp;
1899 	} while (c->bit_offset < c->bm_bits);
1900 
1901 	len = bs.cur.b - p->code + !!bs.cur.bit;
1902 
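	/* number of code bytes used; a partially filled last byte counts as a
	 * whole one.  The encoding only pays off if those len bytes (len << 3
	 * bits) cover more plain bitmap bits than they occupy themselves. */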
1903 	if (plain_bits < (len << 3)) {
1904 		/* incompressible with this method.
1905 		 * we need to rewind both word and bit position. */
1906 		c->bit_offset -= plain_bits;
1907 		bm_xfer_ctx_bit_to_word_offset(c);
1908 		c->bit_offset = c->word_offset * BITS_PER_LONG;
1909 		return 0;
1910 	}
1911 
1912 	/* RLE + VLI was able to compress it just fine.
1913 	 * update c->word_offset. */
1914 	bm_xfer_ctx_bit_to_word_offset(c);
1915 
1916 	/* store pad_bits */
1917 	DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1918 
1919 	return len;
1920 }
1921 
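/* Send one bitmap chunk: P_COMPRESSED_BITMAP if the RLE+VLI encoding pays
 * off, a plain P_BITMAP packet otherwise.
 * Returns OK while more chunks remain, DONE once the whole bitmap has been
 * sent, FAILED on a send error. */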
1922 enum { OK, FAILED, DONE }
1923 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
1924 	struct p_header *h, struct bm_xfer_ctx *c)
1925 {
1926 	struct p_compressed_bm *p = (void*)h;
1927 	unsigned long num_words;
1928 	int len;
1929 	int ok;
1930 
1931 	len = fill_bitmap_rle_bits(mdev, p, c);
1932 
1933 	if (len < 0)
1934 		return FAILED;
1935 
1936 	if (len) {
1937 		DCBP_set_code(p, RLE_VLI_Bits);
1938 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
1939 			sizeof(*p) + len, 0);
1940 
1941 		c->packets[0]++;
1942 		c->bytes[0] += sizeof(*p) + len;
1943 
1944 		if (c->bit_offset >= c->bm_bits)
1945 			len = 0; /* DONE */
1946 	} else {
1947 		/* was not compressible.
1948 		 * send a buffer full of plain text bits instead. */
1949 		num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
1950 		len = num_words * sizeof(long);
1951 		if (len)
1952 			drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
1953 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
1954 				   h, sizeof(struct p_header) + len, 0);
1955 		c->word_offset += num_words;
1956 		c->bit_offset = c->word_offset * BITS_PER_LONG;
1957 
1958 		c->packets[1]++;
1959 		c->bytes[1] += sizeof(struct p_header) + len;
1960 
1961 		if (c->bit_offset > c->bm_bits)
1962 			c->bit_offset = c->bm_bits;
1963 	}
1964 	ok = ok ? ((len == 0) ? DONE : OK) : FAILED;
1965 
1966 	if (ok == DONE)
1967 		INFO_bm_xfer_stats(mdev, "send", c);
1968 	return ok;
1969 }
1970 
1971 /* See the comment at receive_bitmap() */
1972 int _drbd_send_bitmap(struct drbd_conf *mdev)
1973 {
1974 	struct bm_xfer_ctx c;
1975 	struct p_header *p;
1976 	int ret;
1977 
1978 	ERR_IF(!mdev->bitmap) return FALSE;
1979 
1980 	/* maybe we should use some per thread scratch page,
1981 	 * and allocate that during initial device creation? */
1982 	p = (struct p_header *) __get_free_page(GFP_NOIO);
1983 	if (!p) {
1984 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
1985 		return FALSE;
1986 	}
1987 
1988 	if (get_ldev(mdev)) {
1989 		if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1990 			dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1991 			drbd_bm_set_all(mdev);
1992 			if (drbd_bm_write(mdev)) {
1993 				/* write_bm did fail! Leave full sync flag set in Meta P_DATA
1994 				 * but otherwise process as per normal - need to tell other
1995 				 * side that a full resync is required! */
1996 				dev_err(DEV, "Failed to write bitmap to disk!\n");
1997 			} else {
1998 				drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1999 				drbd_md_sync(mdev);
2000 			}
2001 		}
2002 		put_ldev(mdev);
2003 	}
2004 
2005 	c = (struct bm_xfer_ctx) {
2006 		.bm_bits = drbd_bm_bits(mdev),
2007 		.bm_words = drbd_bm_words(mdev),
2008 	};
2009 
2010 	do {
2011 		ret = send_bitmap_rle_or_plain(mdev, p, &c);
2012 	} while (ret == OK);
2013 
2014 	free_page((unsigned long) p);
2015 	return (ret == DONE);
2016 }
2017 
2018 int drbd_send_bitmap(struct drbd_conf *mdev)
2019 {
2020 	int err;
2021 
2022 	if (!drbd_get_data_sock(mdev))
2023 		return -1;
2024 	err = !_drbd_send_bitmap(mdev);
2025 	drbd_put_data_sock(mdev);
2026 	return err;
2027 }
2028 
2029 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2030 {
2031 	int ok;
2032 	struct p_barrier_ack p;
2033 
2034 	p.barrier  = barrier_nr;
2035 	p.set_size = cpu_to_be32(set_size);
2036 
2037 	if (mdev->state.conn < C_CONNECTED)
2038 		return FALSE;
2039 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2040 			(struct p_header *)&p, sizeof(p));
2041 	return ok;
2042 }
2043 
2044 /**
2045  * _drbd_send_ack() - Sends an ack packet
2046  * @mdev:	DRBD device.
2047  * @cmd:	Packet command code.
2048  * @sector:	sector, needs to be in big endian byte order
2049  * @blksize:	size in bytes, needs to be in big endian byte order
2050  * @block_id:	Id, big endian byte order
2051  */
2052 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2053 			  u64 sector,
2054 			  u32 blksize,
2055 			  u64 block_id)
2056 {
2057 	int ok;
2058 	struct p_block_ack p;
2059 
2060 	p.sector   = sector;
2061 	p.block_id = block_id;
2062 	p.blksize  = blksize;
2063 	p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2064 
2065 	if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2066 		return FALSE;
2067 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2068 				(struct p_header *)&p, sizeof(p));
2069 	return ok;
2070 }
2071 
2072 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2073 		     struct p_data *dp)
2074 {
2075 	const int header_size = sizeof(struct p_data)
2076 			      - sizeof(struct p_header);
2077 	int data_size  = ((struct p_header *)dp)->length - header_size;
2078 
2079 	return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2080 			      dp->block_id);
2081 }
2082 
2083 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2084 		     struct p_block_req *rp)
2085 {
2086 	return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2087 }
2088 
2089 /**
2090  * drbd_send_ack() - Sends an ack packet
2091  * @mdev:	DRBD device.
2092  * @cmd:	Packet command code.
2093  * @e:		Epoch entry.
2094  */
2095 int drbd_send_ack(struct drbd_conf *mdev,
2096 	enum drbd_packets cmd, struct drbd_epoch_entry *e)
2097 {
2098 	return _drbd_send_ack(mdev, cmd,
2099 			      cpu_to_be64(e->sector),
2100 			      cpu_to_be32(e->size),
2101 			      e->block_id);
2102 }
2103 
2104 /* This function misuses the block_id field to signal if the blocks
2105  * are is sync or not. */
2106 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2107 		     sector_t sector, int blksize, u64 block_id)
2108 {
2109 	return _drbd_send_ack(mdev, cmd,
2110 			      cpu_to_be64(sector),
2111 			      cpu_to_be32(blksize),
2112 			      cpu_to_be64(block_id));
2113 }
2114 
2115 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2116 		       sector_t sector, int size, u64 block_id)
2117 {
2118 	int ok;
2119 	struct p_block_req p;
2120 
2121 	p.sector   = cpu_to_be64(sector);
2122 	p.block_id = block_id;
2123 	p.blksize  = cpu_to_be32(size);
2124 
2125 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2126 				(struct p_header *)&p, sizeof(p));
2127 	return ok;
2128 }
2129 
2130 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2131 			    sector_t sector, int size,
2132 			    void *digest, int digest_size,
2133 			    enum drbd_packets cmd)
2134 {
2135 	int ok;
2136 	struct p_block_req p;
2137 
2138 	p.sector   = cpu_to_be64(sector);
2139 	p.block_id = BE_DRBD_MAGIC + 0xbeef;
2140 	p.blksize  = cpu_to_be32(size);
2141 
2142 	p.head.magic   = BE_DRBD_MAGIC;
2143 	p.head.command = cpu_to_be16(cmd);
2144 	p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header) + digest_size);
2145 
2146 	mutex_lock(&mdev->data.mutex);
2147 
2148 	ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2149 	ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2150 
2151 	mutex_unlock(&mdev->data.mutex);
2152 
2153 	return ok;
2154 }
2155 
2156 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2157 {
2158 	int ok;
2159 	struct p_block_req p;
2160 
2161 	p.sector   = cpu_to_be64(sector);
2162 	p.block_id = BE_DRBD_MAGIC + 0xbabe;
2163 	p.blksize  = cpu_to_be32(size);
2164 
2165 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2166 			   (struct p_header *)&p, sizeof(p));
2167 	return ok;
2168 }
2169 
2170 /* called on sndtimeo
2171  * returns FALSE if we should retry,
2172  * TRUE if we think connection is dead
2173  */
2174 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2175 {
2176 	int drop_it;
2177 	/* long elapsed = (long)(jiffies - mdev->last_received); */
2178 
2179 	drop_it =   mdev->meta.socket == sock
2180 		|| !mdev->asender.task
2181 		|| get_t_state(&mdev->asender) != Running
2182 		|| mdev->state.conn < C_CONNECTED;
2183 
2184 	if (drop_it)
2185 		return TRUE;
2186 
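	/* ko_count is re-armed to net_conf->ko_count in drbd_send() on every
	 * send via the data socket; each send timeout that ends up here
	 * decrements it, and once it reaches zero we give up on the peer. */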
2187 	drop_it = !--mdev->ko_count;
2188 	if (!drop_it) {
2189 		dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2190 		       current->comm, current->pid, mdev->ko_count);
2191 		request_ping(mdev);
2192 	}
2193 
2194 	return drop_it; /* && (mdev->state == R_PRIMARY) */;
2195 }
2196 
2197 /* The idea of sendpage seems to be to put some kind of reference
2198  * to the page into the skb, and to hand it over to the NIC. In
2199  * this process get_page() gets called.
2200  *
2201  * As soon as the page was really sent over the network put_page()
2202  * gets called by some part of the network layer. [ NIC driver? ]
2203  *
2204  * [ get_page() / put_page() increment/decrement the count. If count
2205  *   reaches 0 the page will be freed. ]
2206  *
2207  * This works nicely with pages from FSs.
2208  * But this means that in protocol A we might signal IO completion too early!
2209  *
2210  * In order not to corrupt data during a resync we must make sure
2211  * that we do not reuse our own buffer pages (EEs) too early, therefore
2212  * we have the net_ee list.
2213  *
2214  * XFS still seems to have problems with this: it submits pages with page_count == 0!
2215  * As a workaround, we disable sendpage on pages
2216  * with page_count == 0 or PageSlab.
2217  */
2218 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2219 		   int offset, size_t size)
2220 {
2221 	int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, 0);
2222 	kunmap(page);
2223 	if (sent == size)
2224 		mdev->send_cnt += size>>9;
2225 	return sent == size;
2226 }
2227 
2228 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2229 		    int offset, size_t size)
2230 {
2231 	mm_segment_t oldfs = get_fs();
2232 	int sent, ok;
2233 	int len = size;
2234 
2235 	/* e.g. XFS meta- & log-data is in slab pages, which have a
2236 	 * page_count of 0 and/or have PageSlab() set.
2237 	 * we cannot use send_page for those, as that does get_page();
2238 	 * put_page(); and would cause either a VM_BUG directly, or
2239 	 * __page_cache_release a page that would actually still be referenced
2240 	 * by someone, leading to some obscure delayed Oops somewhere else. */
2241 	if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2242 		return _drbd_no_send_page(mdev, page, offset, size);
2243 
2244 	drbd_update_congested(mdev);
2245 	set_fs(KERNEL_DS);
2246 	do {
2247 		sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2248 							offset, len,
2249 							MSG_NOSIGNAL);
2250 		if (sent == -EAGAIN) {
2251 			if (we_should_drop_the_connection(mdev,
2252 							  mdev->data.socket))
2253 				break;
2254 			else
2255 				continue;
2256 		}
2257 		if (sent <= 0) {
2258 			dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2259 			     __func__, (int)size, len, sent);
2260 			break;
2261 		}
2262 		len    -= sent;
2263 		offset += sent;
2264 	} while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2265 	set_fs(oldfs);
2266 	clear_bit(NET_CONGESTED, &mdev->flags);
2267 
2268 	ok = (len == 0);
2269 	if (likely(ok))
2270 		mdev->send_cnt += size>>9;
2271 	return ok;
2272 }
2273 
2274 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2275 {
2276 	struct bio_vec *bvec;
2277 	int i;
2278 	__bio_for_each_segment(bvec, bio, i, 0) {
2279 		if (!_drbd_no_send_page(mdev, bvec->bv_page,
2280 				     bvec->bv_offset, bvec->bv_len))
2281 			return 0;
2282 	}
2283 	return 1;
2284 }
2285 
2286 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2287 {
2288 	struct bio_vec *bvec;
2289 	int i;
2290 	__bio_for_each_segment(bvec, bio, i, 0) {
2291 		if (!_drbd_send_page(mdev, bvec->bv_page,
2292 				     bvec->bv_offset, bvec->bv_len))
2293 			return 0;
2294 	}
2295 
2296 	return 1;
2297 }
2298 
2299 /* Used to send write requests
2300  * R_PRIMARY -> Peer	(P_DATA)
2301  */
2302 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2303 {
2304 	int ok = 1;
2305 	struct p_data p;
2306 	unsigned int dp_flags = 0;
2307 	void *dgb;
2308 	int dgs;
2309 
2310 	if (!drbd_get_data_sock(mdev))
2311 		return 0;
2312 
2313 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2314 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2315 
2316 	p.head.magic   = BE_DRBD_MAGIC;
2317 	p.head.command = cpu_to_be16(P_DATA);
2318 	p.head.length  =
2319 		cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + req->size);
2320 
2321 	p.sector   = cpu_to_be64(req->sector);
2322 	p.block_id = (unsigned long)req;
2323 	p.seq_num  = cpu_to_be32(req->seq_num =
2324 				 atomic_add_return(1, &mdev->packet_seq));
2325 	dp_flags = 0;
2326 
2327 	/* NOTE: no need to check if barriers supported here as we would
2328 	 *       not pass the test in make_request_common in that case
2329 	 */
2330 	if (bio_rw_flagged(req->master_bio, BIO_RW_BARRIER)) {
2331 		dev_err(DEV, "ASSERT FAILED would have set DP_HARDBARRIER\n");
2332 		/* dp_flags |= DP_HARDBARRIER; */
2333 	}
2334 	if (bio_rw_flagged(req->master_bio, BIO_RW_SYNCIO))
2335 		dp_flags |= DP_RW_SYNC;
2336 	/* for now handle SYNCIO and UNPLUG
2337 	 * as if they still were one and the same flag */
2338 	if (bio_rw_flagged(req->master_bio, BIO_RW_UNPLUG))
2339 		dp_flags |= DP_RW_SYNC;
2340 	if (mdev->state.conn >= C_SYNC_SOURCE &&
2341 	    mdev->state.conn <= C_PAUSED_SYNC_T)
2342 		dp_flags |= DP_MAY_SET_IN_SYNC;
2343 
2344 	p.dp_flags = cpu_to_be32(dp_flags);
2345 	set_bit(UNPLUG_REMOTE, &mdev->flags);
2346 	ok = (sizeof(p) ==
2347 		drbd_send(mdev, mdev->data.socket, &p, sizeof(p), MSG_MORE));
2348 	if (ok && dgs) {
2349 		dgb = mdev->int_dig_out;
2350 		drbd_csum(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2351 		ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
2352 	}
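	/* With protocol A the request may be completed towards the upper
	 * layers as soon as the data sits in the TCP send queue (see the
	 * sendpage comment above), so copy the bio payload here instead of
	 * handing out page references via the zero-copy path. */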
2353 	if (ok) {
2354 		if (mdev->net_conf->wire_protocol == DRBD_PROT_A)
2355 			ok = _drbd_send_bio(mdev, req->master_bio);
2356 		else
2357 			ok = _drbd_send_zc_bio(mdev, req->master_bio);
2358 	}
2359 
2360 	drbd_put_data_sock(mdev);
2361 	return ok;
2362 }
2363 
2364 /* answer packet, used to send data back for read requests:
2365  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2366  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2367  */
2368 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2369 		    struct drbd_epoch_entry *e)
2370 {
2371 	int ok;
2372 	struct p_data p;
2373 	void *dgb;
2374 	int dgs;
2375 
2376 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2377 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2378 
2379 	p.head.magic   = BE_DRBD_MAGIC;
2380 	p.head.command = cpu_to_be16(cmd);
2381 	p.head.length  =
2382 		cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + e->size);
2383 
2384 	p.sector   = cpu_to_be64(e->sector);
2385 	p.block_id = e->block_id;
2386 	/* p.seq_num  = 0;    No sequence numbers here.. */
2387 
2388 	/* Only called by our kernel thread.
2389 	 * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2390 	 * in response to admin command or module unload.
2391 	 */
2392 	if (!drbd_get_data_sock(mdev))
2393 		return 0;
2394 
2395 	ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p,
2396 					sizeof(p), MSG_MORE);
2397 	if (ok && dgs) {
2398 		dgb = mdev->int_dig_out;
2399 		drbd_csum(mdev, mdev->integrity_w_tfm, e->private_bio, dgb);
2400 		ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, MSG_MORE);
2401 	}
2402 	if (ok)
2403 		ok = _drbd_send_zc_bio(mdev, e->private_bio);
2404 
2405 	drbd_put_data_sock(mdev);
2406 	return ok;
2407 }
2408 
2409 /*
2410   drbd_send distinguishes two cases:
2411 
2412   Packets sent via the data socket "sock"
2413   and packets sent via the meta data socket "msock"
2414 
2415 		    sock                      msock
2416   -----------------+-------------------------+------------------------------
2417   timeout           conf.timeout / 2          conf.timeout / 2
2418   timeout action    send a ping via msock     Abort communication
2419 					      and close all sockets
2420 */
2421 
2422 /*
2423  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
2424  */
2425 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2426 	      void *buf, size_t size, unsigned msg_flags)
2427 {
2428 	struct kvec iov;
2429 	struct msghdr msg;
2430 	int rv, sent = 0;
2431 
2432 	if (!sock)
2433 		return -1000;
2434 
2435 	/* THINK  if (signal_pending) return ... ? */
2436 
2437 	iov.iov_base = buf;
2438 	iov.iov_len  = size;
2439 
2440 	msg.msg_name       = NULL;
2441 	msg.msg_namelen    = 0;
2442 	msg.msg_control    = NULL;
2443 	msg.msg_controllen = 0;
2444 	msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
2445 
2446 	if (sock == mdev->data.socket) {
2447 		mdev->ko_count = mdev->net_conf->ko_count;
2448 		drbd_update_congested(mdev);
2449 	}
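	/* kernel_sendmsg() may transmit only part of the buffer, so loop until
	 * everything is out: -EINTR means a signal was flushed and we retry,
	 * -EAGAIN means the send timed out and we_should_drop_the_connection()
	 * decides whether to keep trying. */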
2450 	do {
2451 		/* STRANGE
2452 		 * tcp_sendmsg does _not_ use its size parameter at all ?
2453 		 *
2454 		 * -EAGAIN on timeout, -EINTR on signal.
2455 		 */
2456 /* THINK
2457  * do we need to block DRBD_SIG if sock == &meta.socket ??
2458  * otherwise wake_asender() might interrupt some send_*Ack !
2459  */
2460 		rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
2461 		if (rv == -EAGAIN) {
2462 			if (we_should_drop_the_connection(mdev, sock))
2463 				break;
2464 			else
2465 				continue;
2466 		}
2467 		D_ASSERT(rv != 0);
2468 		if (rv == -EINTR) {
2469 			flush_signals(current);
2470 			rv = 0;
2471 		}
2472 		if (rv < 0)
2473 			break;
2474 		sent += rv;
2475 		iov.iov_base += rv;
2476 		iov.iov_len  -= rv;
2477 	} while (sent < size);
2478 
2479 	if (sock == mdev->data.socket)
2480 		clear_bit(NET_CONGESTED, &mdev->flags);
2481 
2482 	if (rv <= 0) {
2483 		if (rv != -EAGAIN) {
2484 			dev_err(DEV, "%s_sendmsg returned %d\n",
2485 			    sock == mdev->meta.socket ? "msock" : "sock",
2486 			    rv);
2487 			drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
2488 		} else
2489 			drbd_force_state(mdev, NS(conn, C_TIMEOUT));
2490 	}
2491 
2492 	return sent;
2493 }
2494 
2495 static int drbd_open(struct block_device *bdev, fmode_t mode)
2496 {
2497 	struct drbd_conf *mdev = bdev->bd_disk->private_data;
2498 	unsigned long flags;
2499 	int rv = 0;
2500 
2501 	spin_lock_irqsave(&mdev->req_lock, flags);
2502 	/* to have a stable mdev->state.role
2503 	 * and no race with updating open_cnt */
2504 
2505 	if (mdev->state.role != R_PRIMARY) {
2506 		if (mode & FMODE_WRITE)
2507 			rv = -EROFS;
2508 		else if (!allow_oos)
2509 			rv = -EMEDIUMTYPE;
2510 	}
2511 
2512 	if (!rv)
2513 		mdev->open_cnt++;
2514 	spin_unlock_irqrestore(&mdev->req_lock, flags);
2515 
2516 	return rv;
2517 }
2518 
2519 static int drbd_release(struct gendisk *gd, fmode_t mode)
2520 {
2521 	struct drbd_conf *mdev = gd->private_data;
2522 	mdev->open_cnt--;
2523 	return 0;
2524 }
2525 
2526 static void drbd_unplug_fn(struct request_queue *q)
2527 {
2528 	struct drbd_conf *mdev = q->queuedata;
2529 
2530 	/* unplug FIRST */
2531 	spin_lock_irq(q->queue_lock);
2532 	blk_remove_plug(q);
2533 	spin_unlock_irq(q->queue_lock);
2534 
2535 	/* only if connected */
2536 	spin_lock_irq(&mdev->req_lock);
2537 	if (mdev->state.pdsk >= D_INCONSISTENT && mdev->state.conn >= C_CONNECTED) {
2538 		D_ASSERT(mdev->state.role == R_PRIMARY);
2539 		if (test_and_clear_bit(UNPLUG_REMOTE, &mdev->flags)) {
2540 			/* add to the data.work queue,
2541 			 * unless already queued.
2542 			 * XXX this might be a good addition to drbd_queue_work
2543 			 * anyways, to detect "double queuing" ... */
2544 			if (list_empty(&mdev->unplug_work.list))
2545 				drbd_queue_work(&mdev->data.work,
2546 						&mdev->unplug_work);
2547 		}
2548 	}
2549 	spin_unlock_irq(&mdev->req_lock);
2550 
2551 	if (mdev->state.disk >= D_INCONSISTENT)
2552 		drbd_kick_lo(mdev);
2553 }
2554 
2555 static void drbd_set_defaults(struct drbd_conf *mdev)
2556 {
2557 	mdev->sync_conf.after      = DRBD_AFTER_DEF;
2558 	mdev->sync_conf.rate       = DRBD_RATE_DEF;
2559 	mdev->sync_conf.al_extents = DRBD_AL_EXTENTS_DEF;
2560 	mdev->state = (union drbd_state) {
2561 		{ .role = R_SECONDARY,
2562 		  .peer = R_UNKNOWN,
2563 		  .conn = C_STANDALONE,
2564 		  .disk = D_DISKLESS,
2565 		  .pdsk = D_UNKNOWN,
2566 		  .susp = 0
2567 		} };
2568 }
2569 
2570 void drbd_init_set_defaults(struct drbd_conf *mdev)
2571 {
2572 	/* the memset(,0,) did most of this.
2573 	 * note: only assignments, no allocation in here */
2574 
2575 	drbd_set_defaults(mdev);
2576 
2577 	/* for now, we do NOT yet support it,
2578 	 * even though we start some framework
2579 	 * to eventually support barriers */
2580 	set_bit(NO_BARRIER_SUPP, &mdev->flags);
2581 
2582 	atomic_set(&mdev->ap_bio_cnt, 0);
2583 	atomic_set(&mdev->ap_pending_cnt, 0);
2584 	atomic_set(&mdev->rs_pending_cnt, 0);
2585 	atomic_set(&mdev->unacked_cnt, 0);
2586 	atomic_set(&mdev->local_cnt, 0);
2587 	atomic_set(&mdev->net_cnt, 0);
2588 	atomic_set(&mdev->packet_seq, 0);
2589 	atomic_set(&mdev->pp_in_use, 0);
2590 
2591 	mutex_init(&mdev->md_io_mutex);
2592 	mutex_init(&mdev->data.mutex);
2593 	mutex_init(&mdev->meta.mutex);
2594 	sema_init(&mdev->data.work.s, 0);
2595 	sema_init(&mdev->meta.work.s, 0);
2596 	mutex_init(&mdev->state_mutex);
2597 
2598 	spin_lock_init(&mdev->data.work.q_lock);
2599 	spin_lock_init(&mdev->meta.work.q_lock);
2600 
2601 	spin_lock_init(&mdev->al_lock);
2602 	spin_lock_init(&mdev->req_lock);
2603 	spin_lock_init(&mdev->peer_seq_lock);
2604 	spin_lock_init(&mdev->epoch_lock);
2605 
2606 	INIT_LIST_HEAD(&mdev->active_ee);
2607 	INIT_LIST_HEAD(&mdev->sync_ee);
2608 	INIT_LIST_HEAD(&mdev->done_ee);
2609 	INIT_LIST_HEAD(&mdev->read_ee);
2610 	INIT_LIST_HEAD(&mdev->net_ee);
2611 	INIT_LIST_HEAD(&mdev->resync_reads);
2612 	INIT_LIST_HEAD(&mdev->data.work.q);
2613 	INIT_LIST_HEAD(&mdev->meta.work.q);
2614 	INIT_LIST_HEAD(&mdev->resync_work.list);
2615 	INIT_LIST_HEAD(&mdev->unplug_work.list);
2616 	INIT_LIST_HEAD(&mdev->md_sync_work.list);
2617 	INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2618 	mdev->resync_work.cb  = w_resync_inactive;
2619 	mdev->unplug_work.cb  = w_send_write_hint;
2620 	mdev->md_sync_work.cb = w_md_sync;
2621 	mdev->bm_io_work.w.cb = w_bitmap_io;
2622 	init_timer(&mdev->resync_timer);
2623 	init_timer(&mdev->md_sync_timer);
2624 	mdev->resync_timer.function = resync_timer_fn;
2625 	mdev->resync_timer.data = (unsigned long) mdev;
2626 	mdev->md_sync_timer.function = md_sync_timer_fn;
2627 	mdev->md_sync_timer.data = (unsigned long) mdev;
2628 
2629 	init_waitqueue_head(&mdev->misc_wait);
2630 	init_waitqueue_head(&mdev->state_wait);
2631 	init_waitqueue_head(&mdev->ee_wait);
2632 	init_waitqueue_head(&mdev->al_wait);
2633 	init_waitqueue_head(&mdev->seq_wait);
2634 
2635 	drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
2636 	drbd_thread_init(mdev, &mdev->worker, drbd_worker);
2637 	drbd_thread_init(mdev, &mdev->asender, drbd_asender);
2638 
2639 	mdev->agreed_pro_version = PRO_VERSION_MAX;
2640 	mdev->write_ordering = WO_bio_barrier;
2641 	mdev->resync_wenr = LC_FREE;
2642 }
2643 
2644 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2645 {
2646 	if (mdev->receiver.t_state != None)
2647 		dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2648 				mdev->receiver.t_state);
2649 
2650 	/* no need to lock it, I'm the only thread alive */
2651 	if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2652 		dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2653 	mdev->al_writ_cnt  =
2654 	mdev->bm_writ_cnt  =
2655 	mdev->read_cnt     =
2656 	mdev->recv_cnt     =
2657 	mdev->send_cnt     =
2658 	mdev->writ_cnt     =
2659 	mdev->p_size       =
2660 	mdev->rs_start     =
2661 	mdev->rs_total     =
2662 	mdev->rs_failed    =
2663 	mdev->rs_mark_left =
2664 	mdev->rs_mark_time = 0;
2665 	D_ASSERT(mdev->net_conf == NULL);
2666 
2667 	drbd_set_my_capacity(mdev, 0);
2668 	if (mdev->bitmap) {
2669 		/* maybe never allocated. */
2670 		drbd_bm_resize(mdev, 0);
2671 		drbd_bm_cleanup(mdev);
2672 	}
2673 
2674 	drbd_free_resources(mdev);
2675 
2676 	/*
2677 	 * currently we call drbd_init_ee only on module load, so
2678 	 * we may call drbd_release_ee only on module unload!
2679 	 */
2680 	D_ASSERT(list_empty(&mdev->active_ee));
2681 	D_ASSERT(list_empty(&mdev->sync_ee));
2682 	D_ASSERT(list_empty(&mdev->done_ee));
2683 	D_ASSERT(list_empty(&mdev->read_ee));
2684 	D_ASSERT(list_empty(&mdev->net_ee));
2685 	D_ASSERT(list_empty(&mdev->resync_reads));
2686 	D_ASSERT(list_empty(&mdev->data.work.q));
2687 	D_ASSERT(list_empty(&mdev->meta.work.q));
2688 	D_ASSERT(list_empty(&mdev->resync_work.list));
2689 	D_ASSERT(list_empty(&mdev->unplug_work.list));
2690 
2691 }
2692 
2693 
2694 static void drbd_destroy_mempools(void)
2695 {
2696 	struct page *page;
2697 
2698 	while (drbd_pp_pool) {
2699 		page = drbd_pp_pool;
2700 		drbd_pp_pool = (struct page *)page_private(page);
2701 		__free_page(page);
2702 		drbd_pp_vacant--;
2703 	}
2704 
2705 	/* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2706 
2707 	if (drbd_ee_mempool)
2708 		mempool_destroy(drbd_ee_mempool);
2709 	if (drbd_request_mempool)
2710 		mempool_destroy(drbd_request_mempool);
2711 	if (drbd_ee_cache)
2712 		kmem_cache_destroy(drbd_ee_cache);
2713 	if (drbd_request_cache)
2714 		kmem_cache_destroy(drbd_request_cache);
2715 	if (drbd_bm_ext_cache)
2716 		kmem_cache_destroy(drbd_bm_ext_cache);
2717 	if (drbd_al_ext_cache)
2718 		kmem_cache_destroy(drbd_al_ext_cache);
2719 
2720 	drbd_ee_mempool      = NULL;
2721 	drbd_request_mempool = NULL;
2722 	drbd_ee_cache        = NULL;
2723 	drbd_request_cache   = NULL;
2724 	drbd_bm_ext_cache    = NULL;
2725 	drbd_al_ext_cache    = NULL;
2726 
2727 	return;
2728 }
2729 
2730 static int drbd_create_mempools(void)
2731 {
2732 	struct page *page;
2733 	const int number = (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE) * minor_count;
2734 	int i;
2735 
2736 	/* prepare our caches and mempools */
2737 	drbd_request_mempool = NULL;
2738 	drbd_ee_cache        = NULL;
2739 	drbd_request_cache   = NULL;
2740 	drbd_bm_ext_cache    = NULL;
2741 	drbd_al_ext_cache    = NULL;
2742 	drbd_pp_pool         = NULL;
2743 
2744 	/* caches */
2745 	drbd_request_cache = kmem_cache_create(
2746 		"drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2747 	if (drbd_request_cache == NULL)
2748 		goto Enomem;
2749 
2750 	drbd_ee_cache = kmem_cache_create(
2751 		"drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
2752 	if (drbd_ee_cache == NULL)
2753 		goto Enomem;
2754 
2755 	drbd_bm_ext_cache = kmem_cache_create(
2756 		"drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2757 	if (drbd_bm_ext_cache == NULL)
2758 		goto Enomem;
2759 
2760 	drbd_al_ext_cache = kmem_cache_create(
2761 		"drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2762 	if (drbd_al_ext_cache == NULL)
2763 		goto Enomem;
2764 
2765 	/* mempools */
2766 	drbd_request_mempool = mempool_create(number,
2767 		mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2768 	if (drbd_request_mempool == NULL)
2769 		goto Enomem;
2770 
2771 	drbd_ee_mempool = mempool_create(number,
2772 		mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2773 	if (drbd_ee_mempool == NULL)
2774 		goto Enomem;
2775 
2776 	/* drbd's page pool */
2777 	spin_lock_init(&drbd_pp_lock);
2778 
2779 	for (i = 0; i < number; i++) {
2780 		page = alloc_page(GFP_HIGHUSER);
2781 		if (!page)
2782 			goto Enomem;
2783 		set_page_private(page, (unsigned long)drbd_pp_pool);
2784 		drbd_pp_pool = page;
2785 	}
2786 	drbd_pp_vacant = number;
2787 
2788 	return 0;
2789 
2790 Enomem:
2791 	drbd_destroy_mempools(); /* in case we allocated some */
2792 	return -ENOMEM;
2793 }
2794 
2795 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2796 	void *unused)
2797 {
2798 	/* just so we have it.  you never know what interesting things we
2799 	 * might want to do here some day...
2800 	 */
2801 
2802 	return NOTIFY_DONE;
2803 }
2804 
2805 static struct notifier_block drbd_notifier = {
2806 	.notifier_call = drbd_notify_sys,
2807 };
2808 
2809 static void drbd_release_ee_lists(struct drbd_conf *mdev)
2810 {
2811 	int rr;
2812 
2813 	rr = drbd_release_ee(mdev, &mdev->active_ee);
2814 	if (rr)
2815 		dev_err(DEV, "%d EEs in active list found!\n", rr);
2816 
2817 	rr = drbd_release_ee(mdev, &mdev->sync_ee);
2818 	if (rr)
2819 		dev_err(DEV, "%d EEs in sync list found!\n", rr);
2820 
2821 	rr = drbd_release_ee(mdev, &mdev->read_ee);
2822 	if (rr)
2823 		dev_err(DEV, "%d EEs in read list found!\n", rr);
2824 
2825 	rr = drbd_release_ee(mdev, &mdev->done_ee);
2826 	if (rr)
2827 		dev_err(DEV, "%d EEs in done list found!\n", rr);
2828 
2829 	rr = drbd_release_ee(mdev, &mdev->net_ee);
2830 	if (rr)
2831 		dev_err(DEV, "%d EEs in net list found!\n", rr);
2832 }
2833 
2834 /* caution. no locking.
2835  * currently only used from module cleanup code. */
2836 static void drbd_delete_device(unsigned int minor)
2837 {
2838 	struct drbd_conf *mdev = minor_to_mdev(minor);
2839 
2840 	if (!mdev)
2841 		return;
2842 
2843 	/* paranoia asserts */
2844 	if (mdev->open_cnt != 0)
2845 		dev_err(DEV, "open_cnt = %d in %s:%u", mdev->open_cnt,
2846 				__FILE__ , __LINE__);
2847 
2848 	ERR_IF (!list_empty(&mdev->data.work.q)) {
2849 		struct list_head *lp;
2850 		list_for_each(lp, &mdev->data.work.q) {
2851 			dev_err(DEV, "lp = %p\n", lp);
2852 		}
2853 	};
2854 	/* end paranoia asserts */
2855 
2856 	del_gendisk(mdev->vdisk);
2857 
2858 	/* cleanup stuff that may have been allocated during
2859 	 * device (re-)configuration or state changes */
2860 
2861 	if (mdev->this_bdev)
2862 		bdput(mdev->this_bdev);
2863 
2864 	drbd_free_resources(mdev);
2865 
2866 	drbd_release_ee_lists(mdev);
2867 
2868 	/* should be free'd on disconnect? */
2869 	kfree(mdev->ee_hash);
2870 	/*
2871 	mdev->ee_hash_s = 0;
2872 	mdev->ee_hash = NULL;
2873 	*/
2874 
2875 	lc_destroy(mdev->act_log);
2876 	lc_destroy(mdev->resync);
2877 
2878 	kfree(mdev->p_uuid);
2879 	/* mdev->p_uuid = NULL; */
2880 
2881 	kfree(mdev->int_dig_out);
2882 	kfree(mdev->int_dig_in);
2883 	kfree(mdev->int_dig_vv);
2884 
2885 	/* cleanup the rest that has been
2886 	 * allocated from drbd_new_device
2887 	 * and actually free the mdev itself */
2888 	drbd_free_mdev(mdev);
2889 }
2890 
2891 static void drbd_cleanup(void)
2892 {
2893 	unsigned int i;
2894 
2895 	unregister_reboot_notifier(&drbd_notifier);
2896 
2897 	drbd_nl_cleanup();
2898 
2899 	if (minor_table) {
2900 		if (drbd_proc)
2901 			remove_proc_entry("drbd", NULL);
2902 		i = minor_count;
2903 		while (i--)
2904 			drbd_delete_device(i);
2905 		drbd_destroy_mempools();
2906 	}
2907 
2908 	kfree(minor_table);
2909 
2910 	unregister_blkdev(DRBD_MAJOR, "drbd");
2911 
2912 	printk(KERN_INFO "drbd: module cleanup done.\n");
2913 }
2914 
2915 /**
2916  * drbd_congested() - Callback for pdflush
2917  * @congested_data:	User data
2918  * @bdi_bits:		Bits pdflush is currently interested in
2919  *
2920  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2921  */
2922 static int drbd_congested(void *congested_data, int bdi_bits)
2923 {
2924 	struct drbd_conf *mdev = congested_data;
2925 	struct request_queue *q;
2926 	char reason = '-';
2927 	int r = 0;
2928 
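	/* the single character remembered in mdev->congestion_reason:
	 * 'd' = IO frozen by DRBD, 'b' = backing device congested,
	 * 'n' = network send path congested, 'a' = both backing device and
	 * network, '-' = not congested. */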
2929 	if (!__inc_ap_bio_cond(mdev)) {
2930 		/* DRBD has frozen IO */
2931 		r = bdi_bits;
2932 		reason = 'd';
2933 		goto out;
2934 	}
2935 
2936 	if (get_ldev(mdev)) {
2937 		q = bdev_get_queue(mdev->ldev->backing_bdev);
2938 		r = bdi_congested(&q->backing_dev_info, bdi_bits);
2939 		put_ldev(mdev);
2940 		if (r)
2941 			reason = 'b';
2942 	}
2943 
2944 	if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
2945 		r |= (1 << BDI_async_congested);
2946 		reason = reason == 'b' ? 'a' : 'n';
2947 	}
2948 
2949 out:
2950 	mdev->congestion_reason = reason;
2951 	return r;
2952 }
2953 
2954 struct drbd_conf *drbd_new_device(unsigned int minor)
2955 {
2956 	struct drbd_conf *mdev;
2957 	struct gendisk *disk;
2958 	struct request_queue *q;
2959 
2960 	/* GFP_KERNEL, we are outside of all write-out paths */
2961 	mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2962 	if (!mdev)
2963 		return NULL;
2964 	if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
2965 		goto out_no_cpumask;
2966 
2967 	mdev->minor = minor;
2968 
2969 	drbd_init_set_defaults(mdev);
2970 
2971 	q = blk_alloc_queue(GFP_KERNEL);
2972 	if (!q)
2973 		goto out_no_q;
2974 	mdev->rq_queue = q;
2975 	q->queuedata   = mdev;
2976 
2977 	disk = alloc_disk(1);
2978 	if (!disk)
2979 		goto out_no_disk;
2980 	mdev->vdisk = disk;
2981 
2982 	set_disk_ro(disk, TRUE);
2983 
2984 	disk->queue = q;
2985 	disk->major = DRBD_MAJOR;
2986 	disk->first_minor = minor;
2987 	disk->fops = &drbd_ops;
2988 	sprintf(disk->disk_name, "drbd%d", minor);
2989 	disk->private_data = mdev;
2990 
2991 	mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2992 	/* we have no partitions. we contain only ourselves. */
2993 	mdev->this_bdev->bd_contains = mdev->this_bdev;
2994 
2995 	q->backing_dev_info.congested_fn = drbd_congested;
2996 	q->backing_dev_info.congested_data = mdev;
2997 
2998 	blk_queue_make_request(q, drbd_make_request_26);
2999 	blk_queue_max_segment_size(q, DRBD_MAX_SEGMENT_SIZE);
3000 	blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3001 	blk_queue_merge_bvec(q, drbd_merge_bvec);
3002 	q->queue_lock = &mdev->req_lock; /* needed since we use */
3003 		/* plugging on a queue that actually has no requests! */
3004 	q->unplug_fn = drbd_unplug_fn;
3005 
3006 	mdev->md_io_page = alloc_page(GFP_KERNEL);
3007 	if (!mdev->md_io_page)
3008 		goto out_no_io_page;
3009 
3010 	if (drbd_bm_init(mdev))
3011 		goto out_no_bitmap;
3012 	/* no need to lock access, we are still initializing this minor device. */
3013 	if (!tl_init(mdev))
3014 		goto out_no_tl;
3015 
3016 	mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3017 	if (!mdev->app_reads_hash)
3018 		goto out_no_app_reads;
3019 
3020 	mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3021 	if (!mdev->current_epoch)
3022 		goto out_no_epoch;
3023 
3024 	INIT_LIST_HEAD(&mdev->current_epoch->list);
3025 	mdev->epochs = 1;
3026 
3027 	return mdev;
3028 
3029 /* out_whatever_else:
3030 	kfree(mdev->current_epoch); */
3031 out_no_epoch:
3032 	kfree(mdev->app_reads_hash);
3033 out_no_app_reads:
3034 	tl_cleanup(mdev);
3035 out_no_tl:
3036 	drbd_bm_cleanup(mdev);
3037 out_no_bitmap:
3038 	__free_page(mdev->md_io_page);
3039 out_no_io_page:
3040 	put_disk(disk);
3041 out_no_disk:
3042 	blk_cleanup_queue(q);
3043 out_no_q:
3044 	free_cpumask_var(mdev->cpu_mask);
3045 out_no_cpumask:
3046 	kfree(mdev);
3047 	return NULL;
3048 }
3049 
3050 /* counterpart of drbd_new_device.
3051  * last part of drbd_delete_device. */
3052 void drbd_free_mdev(struct drbd_conf *mdev)
3053 {
3054 	kfree(mdev->current_epoch);
3055 	kfree(mdev->app_reads_hash);
3056 	tl_cleanup(mdev);
3057 	if (mdev->bitmap) /* should no longer be there. */
3058 		drbd_bm_cleanup(mdev);
3059 	__free_page(mdev->md_io_page);
3060 	put_disk(mdev->vdisk);
3061 	blk_cleanup_queue(mdev->rq_queue);
3062 	free_cpumask_var(mdev->cpu_mask);
3063 	kfree(mdev);
3064 }
3065 
3066 
3067 int __init drbd_init(void)
3068 {
3069 	int err;
3070 
3071 	if (sizeof(struct p_handshake) != 80) {
3072 		printk(KERN_ERR
3073 		       "drbd: never change the size or layout "
3074 		       "of the HandShake packet.\n");
3075 		return -EINVAL;
3076 	}
3077 
3078 	if (1 > minor_count || minor_count > 255) {
3079 		printk(KERN_ERR
3080 			"drbd: invalid minor_count (%d)\n", minor_count);
3081 #ifdef MODULE
3082 		return -EINVAL;
3083 #else
3084 		minor_count = 8;
3085 #endif
3086 	}
3087 
3088 	err = drbd_nl_init();
3089 	if (err)
3090 		return err;
3091 
3092 	err = register_blkdev(DRBD_MAJOR, "drbd");
3093 	if (err) {
3094 		printk(KERN_ERR
3095 		       "drbd: unable to register block device major %d\n",
3096 		       DRBD_MAJOR);
3097 		return err;
3098 	}
3099 
3100 	register_reboot_notifier(&drbd_notifier);
3101 
3102 	/*
3103 	 * allocate all necessary structs
3104 	 */
3105 	err = -ENOMEM;
3106 
3107 	init_waitqueue_head(&drbd_pp_wait);
3108 
3109 	drbd_proc = NULL; /* play safe for drbd_cleanup */
3110 	minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3111 				GFP_KERNEL);
3112 	if (!minor_table)
3113 		goto Enomem;
3114 
3115 	err = drbd_create_mempools();
3116 	if (err)
3117 		goto Enomem;
3118 
3119 	drbd_proc = proc_create("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops);
3120 	if (!drbd_proc)	{
3121 		printk(KERN_ERR "drbd: unable to register proc file\n");
3122 		goto Enomem;
3123 	}
3124 
3125 	rwlock_init(&global_state_lock);
3126 
3127 	printk(KERN_INFO "drbd: initialized. "
3128 	       "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3129 	       API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3130 	printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3131 	printk(KERN_INFO "drbd: registered as block device major %d\n",
3132 		DRBD_MAJOR);
3133 	printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3134 
3135 	return 0; /* Success! */
3136 
3137 Enomem:
3138 	drbd_cleanup();
3139 	if (err == -ENOMEM)
3140 		/* currently always the case */
3141 		printk(KERN_ERR "drbd: ran out of memory\n");
3142 	else
3143 		printk(KERN_ERR "drbd: initialization failure\n");
3144 	return err;
3145 }
3146 
3147 void drbd_free_bc(struct drbd_backing_dev *ldev)
3148 {
3149 	if (ldev == NULL)
3150 		return;
3151 
3152 	bd_release(ldev->backing_bdev);
3153 	bd_release(ldev->md_bdev);
3154 
3155 	fput(ldev->lo_file);
3156 	fput(ldev->md_file);
3157 
3158 	kfree(ldev);
3159 }
3160 
3161 void drbd_free_sock(struct drbd_conf *mdev)
3162 {
3163 	if (mdev->data.socket) {
3164 		kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3165 		sock_release(mdev->data.socket);
3166 		mdev->data.socket = NULL;
3167 	}
3168 	if (mdev->meta.socket) {
3169 		kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3170 		sock_release(mdev->meta.socket);
3171 		mdev->meta.socket = NULL;
3172 	}
3173 }
3174 
3175 
3176 void drbd_free_resources(struct drbd_conf *mdev)
3177 {
3178 	crypto_free_hash(mdev->csums_tfm);
3179 	mdev->csums_tfm = NULL;
3180 	crypto_free_hash(mdev->verify_tfm);
3181 	mdev->verify_tfm = NULL;
3182 	crypto_free_hash(mdev->cram_hmac_tfm);
3183 	mdev->cram_hmac_tfm = NULL;
3184 	crypto_free_hash(mdev->integrity_w_tfm);
3185 	mdev->integrity_w_tfm = NULL;
3186 	crypto_free_hash(mdev->integrity_r_tfm);
3187 	mdev->integrity_r_tfm = NULL;
3188 
3189 	drbd_free_sock(mdev);
3190 
3191 	__no_warn(local,
3192 		  drbd_free_bc(mdev->ldev);
3193 		  mdev->ldev = NULL;);
3194 }
3195 
3196 /* meta data management */
3197 
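/* On-disk layout of the DRBD meta data super block.  drbd_md_sync() fills
 * it into a zeroed 512 byte buffer and writes it at md_offset; all
 * multi-byte fields are stored in big endian byte order. */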
3198 struct meta_data_on_disk {
3199 	u64 la_size;           /* last agreed size. */
3200 	u64 uuid[UI_SIZE];   /* UUIDs. */
3201 	u64 device_uuid;
3202 	u64 reserved_u64_1;
3203 	u32 flags;             /* MDF */
3204 	u32 magic;
3205 	u32 md_size_sect;
3206 	u32 al_offset;         /* offset to this block */
3207 	u32 al_nr_extents;     /* important for restoring the AL */
3208 	      /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3209 	u32 bm_offset;         /* offset to the bitmap, from here */
3210 	u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3211 	u32 reserved_u32[4];
3212 
3213 } __packed;
3214 
3215 /**
3216  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3217  * @mdev:	DRBD device.
3218  */
3219 void drbd_md_sync(struct drbd_conf *mdev)
3220 {
3221 	struct meta_data_on_disk *buffer;
3222 	sector_t sector;
3223 	int i;
3224 
3225 	if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3226 		return;
3227 	del_timer(&mdev->md_sync_timer);
3228 
3229 	/* We use here D_FAILED and not D_ATTACHING because we try to write
3230 	 * metadata even if we detach due to a disk failure! */
3231 	if (!get_ldev_if_state(mdev, D_FAILED))
3232 		return;
3233 
3234 	mutex_lock(&mdev->md_io_mutex);
3235 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3236 	memset(buffer, 0, 512);
3237 
3238 	buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3239 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3240 		buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3241 	buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3242 	buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3243 
3244 	buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3245 	buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3246 	buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3247 	buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3248 	buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3249 
3250 	buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3251 
3252 	D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3253 	sector = mdev->ldev->md.md_offset;
3254 
3255 	if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3256 		clear_bit(MD_DIRTY, &mdev->flags);
3257 	} else {
3258 		/* this was only a try anyway ... */
3259 		dev_err(DEV, "meta data update failed!\n");
3260 
3261 		drbd_chk_io_error(mdev, 1, TRUE);
3262 	}
3263 
3264 	/* Update mdev->ldev->md.la_size_sect,
3265 	 * since we updated it on metadata. */
3266 	mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3267 
3268 	mutex_unlock(&mdev->md_io_mutex);
3269 	put_ldev(mdev);
3270 }
3271 
3272 /**
3273  * drbd_md_read() - Reads in the meta data super block
3274  * @mdev:	DRBD device.
3275  * @bdev:	Device from which the meta data should be read in.
3276  *
3277  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_codes in case
3278  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3279  */
3280 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3281 {
3282 	struct meta_data_on_disk *buffer;
3283 	int i, rv = NO_ERROR;
3284 
3285 	if (!get_ldev_if_state(mdev, D_ATTACHING))
3286 		return ERR_IO_MD_DISK;
3287 
3288 	mutex_lock(&mdev->md_io_mutex);
3289 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3290 
3291 	if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3292 		/* NOTE: can't do normal error processing here as this is
3293 		   called BEFORE disk is attached */
3294 		dev_err(DEV, "Error while reading metadata.\n");
3295 		rv = ERR_IO_MD_DISK;
3296 		goto err;
3297 	}
3298 
3299 	if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3300 		dev_err(DEV, "Error while reading metadata, magic not found.\n");
3301 		rv = ERR_MD_INVALID;
3302 		goto err;
3303 	}
3304 	if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3305 		dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3306 		    be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3307 		rv = ERR_MD_INVALID;
3308 		goto err;
3309 	}
3310 	if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3311 		dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3312 		    be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3313 		rv = ERR_MD_INVALID;
3314 		goto err;
3315 	}
3316 	if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3317 		dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3318 		    be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3319 		rv = ERR_MD_INVALID;
3320 		goto err;
3321 	}
3322 
3323 	if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3324 		dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3325 		    be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3326 		rv = ERR_MD_INVALID;
3327 		goto err;
3328 	}
3329 
3330 	bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3331 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3332 		bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3333 	bdev->md.flags = be32_to_cpu(buffer->flags);
3334 	mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3335 	bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3336 
3337 	if (mdev->sync_conf.al_extents < 7)
3338 		mdev->sync_conf.al_extents = 127;
3339 
3340  err:
3341 	mutex_unlock(&mdev->md_io_mutex);
3342 	put_ldev(mdev);
3343 
3344 	return rv;
3345 }
3346 
3347 /**
3348  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3349  * @mdev:	DRBD device.
3350  *
3351  * Call this function if you change anything that should be written to
3352  * the meta-data super block. This function sets MD_DIRTY, and starts a
3353  * timer that ensures that within five seconds you have to call drbd_md_sync().
3354  */
3355 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3356 {
3357 	set_bit(MD_DIRTY, &mdev->flags);
3358 	mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3359 }
3360 
3361 
3362 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3363 {
3364 	int i;
3365 
3366 	for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3367 		mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3368 }
3369 
3370 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3371 {
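	/* the lowest bit of the current UUID records whether it was set while
	 * this node was in the Primary role */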
3372 	if (idx == UI_CURRENT) {
3373 		if (mdev->state.role == R_PRIMARY)
3374 			val |= 1;
3375 		else
3376 			val &= ~((u64)1);
3377 
3378 		drbd_set_ed_uuid(mdev, val);
3379 	}
3380 
3381 	mdev->ldev->md.uuid[idx] = val;
3382 	drbd_md_mark_dirty(mdev);
3383 }
3384 
3385 
3386 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3387 {
3388 	if (mdev->ldev->md.uuid[idx]) {
3389 		drbd_uuid_move_history(mdev);
3390 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3391 	}
3392 	_drbd_uuid_set(mdev, idx, val);
3393 }
3394 
3395 /**
3396  * drbd_uuid_new_current() - Creates a new current UUID
3397  * @mdev:	DRBD device.
3398  *
3399  * Creates a new current UUID, and rotates the old current UUID into
3400  * the bitmap slot. Causes an incremental resync upon next connect.
3401  */
3402 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3403 {
3404 	u64 val;
3405 
3406 	dev_info(DEV, "Creating new current UUID\n");
3407 	D_ASSERT(mdev->ldev->md.uuid[UI_BITMAP] == 0);
3408 	mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3409 
3410 	get_random_bytes(&val, sizeof(u64));
3411 	_drbd_uuid_set(mdev, UI_CURRENT, val);
3412 }
3413 
3414 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3415 {
3416 	if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3417 		return;
3418 
3419 	if (val == 0) {
3420 		drbd_uuid_move_history(mdev);
3421 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3422 		mdev->ldev->md.uuid[UI_BITMAP] = 0;
3423 	} else {
3424 		if (mdev->ldev->md.uuid[UI_BITMAP])
3425 			dev_warn(DEV, "bm UUID already set");
3426 
3427 		mdev->ldev->md.uuid[UI_BITMAP] = val;
3428 		mdev->ldev->md.uuid[UI_BITMAP] &= ~((u64)1);
3429 
3430 	}
3431 	drbd_md_mark_dirty(mdev);
3432 }
3433 
3434 /**
3435  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3436  * @mdev:	DRBD device.
3437  *
3438  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3439  */
3440 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3441 {
3442 	int rv = -EIO;
3443 
3444 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
3445 		drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3446 		drbd_md_sync(mdev);
3447 		drbd_bm_set_all(mdev);
3448 
3449 		rv = drbd_bm_write(mdev);
3450 
3451 		if (!rv) {
3452 			drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3453 			drbd_md_sync(mdev);
3454 		}
3455 
3456 		put_ldev(mdev);
3457 	}
3458 
3459 	return rv;
3460 }
3461 
3462 /**
3463  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3464  * @mdev:	DRBD device.
3465  *
3466  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3467  */
3468 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3469 {
3470 	int rv = -EIO;
3471 
3472 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
3473 		drbd_bm_clear_all(mdev);
3474 		rv = drbd_bm_write(mdev);
3475 		put_ldev(mdev);
3476 	}
3477 
3478 	return rv;
3479 }
3480 
3481 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3482 {
3483 	struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3484 	int rv;
3485 
3486 	D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3487 
3488 	drbd_bm_lock(mdev, work->why);
3489 	rv = work->io_fn(mdev);
3490 	drbd_bm_unlock(mdev);
3491 
3492 	clear_bit(BITMAP_IO, &mdev->flags);
3493 	wake_up(&mdev->misc_wait);
3494 
3495 	if (work->done)
3496 		work->done(mdev, rv);
3497 
3498 	clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3499 	work->why = NULL;
3500 
3501 	return 1;
3502 }
3503 
3504 /**
3505  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3506  * @mdev:	DRBD device.
3507  * @io_fn:	IO callback to be called when bitmap IO is possible
3508  * @done:	callback to be called after the bitmap IO was performed
3509  * @why:	Descriptive text of the reason for doing the IO
3510  *
3511  * While IO on the bitmap happens we freeze application IO, thus ensuring
3512  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
3513  * called from worker context. It MUST NOT be used while a previous such
3514  * work is still pending!
3515  */
3516 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3517 			  int (*io_fn)(struct drbd_conf *),
3518 			  void (*done)(struct drbd_conf *, int),
3519 			  char *why)
3520 {
3521 	D_ASSERT(current == mdev->worker.task);
3522 
3523 	D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3524 	D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3525 	D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3526 	if (mdev->bm_io_work.why)
3527 		dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3528 			why, mdev->bm_io_work.why);
3529 
3530 	mdev->bm_io_work.io_fn = io_fn;
3531 	mdev->bm_io_work.done = done;
3532 	mdev->bm_io_work.why = why;
3533 
3534 	set_bit(BITMAP_IO, &mdev->flags);
3535 	if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3536 		if (list_empty(&mdev->bm_io_work.w.list)) {
3537 			set_bit(BITMAP_IO_QUEUED, &mdev->flags);
3538 			drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
3539 		} else
3540 			dev_err(DEV, "FIXME avoided double queuing bm_io_work\n");
3541 	}
3542 }
3543 
3544 /**
3545  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3546  * @mdev:	DRBD device.
3547  * @io_fn:	IO callback to be called when bitmap IO is possible
3548  * @why:	Descriptive text of the reason for doing the IO
3549  *
3550  * Freezes application IO while the actual IO operation runs. This
3551  * function MAY NOT be called from worker context.
3552  */
3553 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why)
3554 {
3555 	int rv;
3556 
3557 	D_ASSERT(current != mdev->worker.task);
3558 
3559 	drbd_suspend_io(mdev);
3560 
3561 	drbd_bm_lock(mdev, why);
3562 	rv = io_fn(mdev);
3563 	drbd_bm_unlock(mdev);
3564 
3565 	drbd_resume_io(mdev);
3566 
3567 	return rv;
3568 }
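
/*
 * Illustrative sketch only, not an actual call site: the synchronous
 * variant, used from process context other than the worker (the
 * D_ASSERT above enforces this).  Application IO stays suspended for
 * the duration of the bitmap IO.
 *
 *	int rv = drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
 *				"example: clear bitmap");
 *	if (rv)
 *		dev_err(DEV, "bitmap IO failed: %d\n", rv);
 */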
3569 
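/**
 * drbd_md_set_flag() - Set a flag in the on-disk meta-data
 * @mdev:	DRBD device.
 * @flag:	MDF_* flag bit(s) to set.
 *
 * Only marks the meta-data dirty if the flag was not already set; the
 * actual write-out happens later, via drbd_md_sync().
 */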
3570 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3571 {
3572 	if ((mdev->ldev->md.flags & flag) != flag) {
3573 		drbd_md_mark_dirty(mdev);
3574 		mdev->ldev->md.flags |= flag;
3575 	}
3576 }
3577 
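/**
 * drbd_md_clear_flag() - Clear a flag in the on-disk meta-data
 * @mdev:	DRBD device.
 * @flag:	MDF_* flag bit(s) to clear.
 *
 * Only marks the meta-data dirty if at least one of the bits was set.
 */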
3578 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3579 {
3580 	if ((mdev->ldev->md.flags & flag) != 0) {
3581 		drbd_md_mark_dirty(mdev);
3582 		mdev->ldev->md.flags &= ~flag;
3583 	}
3584 }
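
/**
 * drbd_md_test_flag() - Test a flag in the meta-data
 * @bdev:	DRBD backing device.
 * @flag:	MDF_* flag bit(s) to test.
 *
 * Returns non-zero if any of the given bits is set.
 */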
3585 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3586 {
3587 	return (bdev->md.flags & flag) != 0;
3588 }
3589 
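/*
 * md_sync_timer_fn() runs in timer (softirq) context and therefore
 * must not do the meta-data IO itself; it only pushes md_sync_work to
 * the front of the worker queue.  The worker then performs the actual
 * drbd_md_sync() from process context in w_md_sync() below.
 */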
3590 static void md_sync_timer_fn(unsigned long data)
3591 {
3592 	struct drbd_conf *mdev = (struct drbd_conf *) data;
3593 
3594 	drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
3595 }
3596 
3597 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3598 {
3599 	dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3600 	drbd_md_sync(mdev);
3601 
3602 	return 1;
3603 }
3604 
3605 #ifdef CONFIG_DRBD_FAULT_INJECTION
3606 /* Fault insertion support including random number generator shamelessly
3607  * stolen from kernel/rcutorture.c */
3608 struct fault_random_state {
3609 	unsigned long state;
3610 	unsigned long count;
3611 };
3612 
3613 #define FAULT_RANDOM_MULT 39916801  /* prime */
3614 #define FAULT_RANDOM_ADD	479001701 /* prime */
3615 #define FAULT_RANDOM_REFRESH 10000
3616 
3617 /*
3618  * Crude but fast random-number generator.  Uses a linear congruential
3619  * generator, with occasional help from get_random_bytes().
3620  */
3621 static unsigned long
3622 _drbd_fault_random(struct fault_random_state *rsp)
3623 {
3624 	long refresh;
3625 
3626 	if (!rsp->count--) {
3627 		get_random_bytes(&refresh, sizeof(refresh));
3628 		rsp->state += refresh;
3629 		rsp->count = FAULT_RANDOM_REFRESH;
3630 	}
3631 	rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3632 	return swahw32(rsp->state);
3633 }
3634 
3635 static char *
3636 _drbd_fault_str(unsigned int type) {
3637 	static char *_faults[] = {
3638 		[DRBD_FAULT_MD_WR] = "Meta-data write",
3639 		[DRBD_FAULT_MD_RD] = "Meta-data read",
3640 		[DRBD_FAULT_RS_WR] = "Resync write",
3641 		[DRBD_FAULT_RS_RD] = "Resync read",
3642 		[DRBD_FAULT_DT_WR] = "Data write",
3643 		[DRBD_FAULT_DT_RD] = "Data read",
3644 		[DRBD_FAULT_DT_RA] = "Data read ahead",
3645 		[DRBD_FAULT_BM_ALLOC] = "BM allocation",
3646 		[DRBD_FAULT_AL_EE] = "EE allocation"
3647 	};
3648 
3649 	return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3650 }
3651 
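/*
 * Decide whether to simulate a failure of the given type for this
 * device: the minor must be selected by the fault_devs bitmask (0
 * means "all devices"), and a pseudo-random roll of 1..100 must not
 * exceed fault_rate (a percentage).  Every simulated failure bumps
 * fault_count and emits a ratelimited warning.
 */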
3652 unsigned int
3653 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3654 {
3655 	static struct fault_random_state rrs = {0, 0};
3656 
3657 	unsigned int ret = (
3658 		(fault_devs == 0 ||
3659 			((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3660 		(((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3661 
3662 	if (ret) {
3663 		fault_count++;
3664 
3665 		if (printk_ratelimit())
3666 			dev_warn(DEV, "***Simulating %s failure\n",
3667 				_drbd_fault_str(type));
3668 	}
3669 
3670 	return ret;
3671 }
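
/*
 * Illustrative sketch only, not an actual call site: an IO submission
 * path consults the fault machinery roughly like this (real callers
 * typically go through a small wrapper in drbd_int.h rather than
 * calling _drbd_insert_fault() directly):
 *
 *	if (fault_rate && _drbd_insert_fault(mdev, DRBD_FAULT_MD_WR))
 *		bio_endio(bio, -EIO);	(fail the bio without submitting it)
 *	else
 *		submit_bio(rw, bio);
 */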
3672 #endif
3673 
3674 const char *drbd_buildtag(void)
3675 {
3676 	/* When DRBD is built from external sources, a reference to the
3677 	   git hash of the source code is stored here. */
3678 
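	/* buildtag initially holds "built-in" with its first byte zeroed out.
	 * On first use it is either overwritten with the module srcversion,
	 * or, when compiled into the kernel, the leading 'b' is restored so
	 * that the string reads "built-in" again. */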
3679 	static char buildtag[38] = "\0uilt-in";
3680 
3681 	if (buildtag[0] == 0) {
3682 #ifdef CONFIG_MODULES
3683 		if (THIS_MODULE != NULL)
3684 			sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3685 		else
3686 #endif
3687 			buildtag[0] = 'b';
3688 	}
3689 
3690 	return buildtag;
3691 }
3692 
3693 module_init(drbd_init)
3694 module_exit(drbd_cleanup)
3695 
3696 EXPORT_SYMBOL(drbd_conn_str);
3697 EXPORT_SYMBOL(drbd_role_str);
3698 EXPORT_SYMBOL(drbd_disk_str);
3699 EXPORT_SYMBOL(drbd_set_st_err_str);
3700