xref: /openbmc/linux/drivers/block/drbd/drbd_main.c (revision e190bfe5)
1 /*
2    drbd.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12 
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17 
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22 
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 
27  */
28 
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/smp_lock.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48 
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52 
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56 
57 #include "drbd_vli.h"
58 
59 struct after_state_chg_work {
60 	struct drbd_work w;
61 	union drbd_state os;
62 	union drbd_state ns;
63 	enum chg_state_flags flags;
64 	struct completion *done;
65 };
66 
67 int drbdd_init(struct drbd_thread *);
68 int drbd_worker(struct drbd_thread *);
69 int drbd_asender(struct drbd_thread *);
70 
71 int drbd_init(void);
72 static int drbd_open(struct block_device *bdev, fmode_t mode);
73 static int drbd_release(struct gendisk *gd, fmode_t mode);
74 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
75 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
76 			   union drbd_state ns, enum chg_state_flags flags);
77 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
78 static void md_sync_timer_fn(unsigned long data);
79 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
80 
81 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
82 	      "Lars Ellenberg <lars@linbit.com>");
83 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
84 MODULE_VERSION(REL_VERSION);
85 MODULE_LICENSE("GPL");
86 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices (1-255)");
87 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
88 
89 #include <linux/moduleparam.h>
90 /* allow_open_on_secondary */
91 MODULE_PARM_DESC(allow_oos, "DONT USE!");
92 /* thanks to these macros, if compiled into the kernel (not-module),
93  * this becomes the boot parameter drbd.minor_count */
94 module_param(minor_count, uint, 0444);
95 module_param(disable_sendpage, bool, 0644);
96 module_param(allow_oos, bool, 0);
97 module_param(cn_idx, uint, 0444);
98 module_param(proc_details, int, 0644);
99 
100 #ifdef CONFIG_DRBD_FAULT_INJECTION
101 int enable_faults;
102 int fault_rate;
103 static int fault_count;
104 int fault_devs;
105 /* bitmap of enabled faults */
106 module_param(enable_faults, int, 0664);
107 /* fault rate % value - applies to all enabled faults */
108 module_param(fault_rate, int, 0664);
109 /* count of faults inserted */
110 module_param(fault_count, int, 0664);
111 /* bitmap of devices to insert faults on */
112 module_param(fault_devs, int, 0644);
113 #endif
114 
115 /* module parameters; the variables registered above are defined here */
116 unsigned int minor_count = 32;
117 int disable_sendpage;
118 int allow_oos;
119 unsigned int cn_idx = CN_IDX_DRBD;
120 int proc_details;       /* Detail level in /proc/drbd */
121 
122 /* Module parameter for setting the user mode helper program
123  * to run. Default is /sbin/drbdadm */
124 char usermode_helper[80] = "/sbin/drbdadm";
125 
126 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
127 
128 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
129  * as member "struct gendisk *vdisk;"
130  */
131 struct drbd_conf **minor_table;
132 
133 struct kmem_cache *drbd_request_cache;
134 struct kmem_cache *drbd_ee_cache;	/* epoch entries */
135 struct kmem_cache *drbd_bm_ext_cache;	/* bitmap extents */
136 struct kmem_cache *drbd_al_ext_cache;	/* activity log extents */
137 mempool_t *drbd_request_mempool;
138 mempool_t *drbd_ee_mempool;
139 
140 /* I do not use a standard mempool, because:
141    1) I want to hand out the pre-allocated objects first.
142    2) I want to be able to interrupt sleeping allocation with a signal.
143    Note: This is a singly linked list; the next pointer is the private
144 	 member of struct page.
145  */
146 struct page *drbd_pp_pool;
147 spinlock_t   drbd_pp_lock;
148 int          drbd_pp_vacant;
149 wait_queue_head_t drbd_pp_wait;
150 
151 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
152 
153 static const struct block_device_operations drbd_ops = {
154 	.owner =   THIS_MODULE,
155 	.open =    drbd_open,
156 	.release = drbd_release,
157 };
158 
159 #define ARRY_SIZE(A) (sizeof(A)/sizeof(A[0]))
160 
161 #ifdef __CHECKER__
162 /* When checking with sparse, if this is an inline function, sparse will
163    give tons of false positives. When this is a real function, sparse works.
164  */
165 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
166 {
167 	int io_allowed;
168 
169 	atomic_inc(&mdev->local_cnt);
170 	io_allowed = (mdev->state.disk >= mins);
171 	if (!io_allowed) {
172 		if (atomic_dec_and_test(&mdev->local_cnt))
173 			wake_up(&mdev->misc_wait);
174 	}
175 	return io_allowed;
176 }
177 
178 #endif
179 
180 /**
181  * DOC: The transfer log
182  *
183  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
184  * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
185  * of the list. There is always at least one &struct drbd_tl_epoch object.
186  *
187  * Each &struct drbd_tl_epoch has a circular doubly linked list of requests
188  * attached.
189  */
190 static int tl_init(struct drbd_conf *mdev)
191 {
192 	struct drbd_tl_epoch *b;
193 
194 	/* during device minor initialization, we may well use GFP_KERNEL */
195 	b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
196 	if (!b)
197 		return 0;
198 	INIT_LIST_HEAD(&b->requests);
199 	INIT_LIST_HEAD(&b->w.list);
200 	b->next = NULL;
201 	b->br_number = 4711;
202 	b->n_req = 0;
203 	b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
204 
205 	mdev->oldest_tle = b;
206 	mdev->newest_tle = b;
207 	INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
208 
209 	mdev->tl_hash = NULL;
210 	mdev->tl_hash_s = 0;
211 
212 	return 1;
213 }
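/*
 * Illustrative overview (editor's sketch, summarizing the helpers in this
 * file, not a code path of its own): tl_init() allocates the very first epoch
 * object (barrier number 4711), write requests are attached to the epoch at
 * mdev->newest_tle, a barrier ack received from the peer lets tl_release()
 * retire mdev->oldest_tle, and if the connection is lost tl_clear() drains
 * everything that is still in flight.
 */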
214 
215 static void tl_cleanup(struct drbd_conf *mdev)
216 {
217 	D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
218 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
219 	kfree(mdev->oldest_tle);
220 	mdev->oldest_tle = NULL;
221 	kfree(mdev->unused_spare_tle);
222 	mdev->unused_spare_tle = NULL;
223 	kfree(mdev->tl_hash);
224 	mdev->tl_hash = NULL;
225 	mdev->tl_hash_s = 0;
226 }
227 
228 /**
229  * _tl_add_barrier() - Adds a barrier to the transfer log
230  * @mdev:	DRBD device.
231  * @new:	Barrier to be added before the current head of the TL.
232  *
233  * The caller must hold the req_lock.
234  */
235 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
236 {
237 	struct drbd_tl_epoch *newest_before;
238 
239 	INIT_LIST_HEAD(&new->requests);
240 	INIT_LIST_HEAD(&new->w.list);
241 	new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
242 	new->next = NULL;
243 	new->n_req = 0;
244 
245 	newest_before = mdev->newest_tle;
246 	/* never send a barrier number == 0, because that is special-cased
247 	 * when using TCQ for our write ordering code */
248 	new->br_number = (newest_before->br_number+1) ?: 1;
249 	if (mdev->newest_tle != new) {
250 		mdev->newest_tle->next = new;
251 		mdev->newest_tle = new;
252 	}
253 }
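/*
 * Note on the barrier numbering above: "(x + 1) ?: 1" uses the GNU C binary
 * "?:" operator (a ?: b evaluates to a if a is non-zero, else b), so when the
 * counter wraps around to 0 the next epoch gets number 1, keeping 0 reserved
 * as described in the comment above.
 */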
254 
255 /**
256  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
257  * @mdev:	DRBD device.
258  * @barrier_nr:	Expected identifier of the DRBD write barrier packet.
259  * @set_size:	Expected number of requests before that barrier.
260  *
261  * In case the passed barrier_nr or set_size does not match the oldest
262  * &struct drbd_tl_epoch object, this function will cause a termination
263  * of the connection.
264  */
265 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
266 		       unsigned int set_size)
267 {
268 	struct drbd_tl_epoch *b, *nob; /* next old barrier */
269 	struct list_head *le, *tle;
270 	struct drbd_request *r;
271 
272 	spin_lock_irq(&mdev->req_lock);
273 
274 	b = mdev->oldest_tle;
275 
276 	/* first some paranoia code */
277 	if (b == NULL) {
278 		dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
279 			barrier_nr);
280 		goto bail;
281 	}
282 	if (b->br_number != barrier_nr) {
283 		dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
284 			barrier_nr, b->br_number);
285 		goto bail;
286 	}
287 	if (b->n_req != set_size) {
288 		dev_err(DEV, "BAD! BarrierAck #%u received with n_req=%u, expected n_req=%u!\n",
289 			barrier_nr, set_size, b->n_req);
290 		goto bail;
291 	}
292 
293 	/* Clean up list of requests processed during current epoch */
294 	list_for_each_safe(le, tle, &b->requests) {
295 		r = list_entry(le, struct drbd_request, tl_requests);
296 		_req_mod(r, barrier_acked);
297 	}
298 	/* There could be requests on the list waiting for completion
299 	   of the write to the local disk. To avoid corruption of
300 	   slab's data structures we have to remove the list's head.
301 
302 	   Also there could have been a barrier ack out of sequence, overtaking
303 	   the write acks - which would be a bug and a violation of write ordering.
304 	   To avoid deadlocking in case we lose the connection while such requests
305 	   are still pending, we need some way to find them for
306 	   _req_mod(connection_lost_while_pending).
307 
308 	   These have been list_move'd to the out_of_sequence_requests list in
309 	   _req_mod(, barrier_acked) above.
310 	   */
311 	list_del_init(&b->requests);
312 
313 	nob = b->next;
314 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
315 		_tl_add_barrier(mdev, b);
316 		if (nob)
317 			mdev->oldest_tle = nob;
318 		/* if nob == NULL, b was the only barrier, and becomes the new
319 		   barrier. Therefore mdev->oldest_tle already points to b */
320 	} else {
321 		D_ASSERT(nob != NULL);
322 		mdev->oldest_tle = nob;
323 		kfree(b);
324 	}
325 
326 	spin_unlock_irq(&mdev->req_lock);
327 	dec_ap_pending(mdev);
328 
329 	return;
330 
331 bail:
332 	spin_unlock_irq(&mdev->req_lock);
333 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
334 }
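/*
 * Example of the paranoia checks above (illustrative numbers): if the peer
 * acknowledges barrier #8 while mdev->oldest_tle->br_number is 9, or reports
 * a request count different from b->n_req, we can no longer trust the write
 * ordering and bail out by forcing the connection into C_PROTOCOL_ERROR.
 */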
335 
336 
337 /**
338  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
339  * @mdev:	DRBD device.
340  *
341  * This is called after the connection to the peer was lost. The storage covered
342  * by the requests on the transfer log gets marked as out of sync. Called from the
343  * receiver thread and the worker thread.
344  */
345 void tl_clear(struct drbd_conf *mdev)
346 {
347 	struct drbd_tl_epoch *b, *tmp;
348 	struct list_head *le, *tle;
349 	struct drbd_request *r;
350 	int new_initial_bnr = net_random();
351 
352 	spin_lock_irq(&mdev->req_lock);
353 
354 	b = mdev->oldest_tle;
355 	while (b) {
356 		list_for_each_safe(le, tle, &b->requests) {
357 			r = list_entry(le, struct drbd_request, tl_requests);
358 			/* It would be nice to complete outside of spinlock.
359 			 * But this is easier for now. */
360 			_req_mod(r, connection_lost_while_pending);
361 		}
362 		tmp = b->next;
363 
364 		/* there could still be requests on that ring list,
365 		 * in case local io is still pending */
366 		list_del(&b->requests);
367 
368 		/* dec_ap_pending corresponding to queue_barrier.
369 		 * the newest barrier may not have been queued yet,
370 		 * in which case w.cb is still NULL. */
371 		if (b->w.cb != NULL)
372 			dec_ap_pending(mdev);
373 
374 		if (b == mdev->newest_tle) {
375 			/* recycle, but reinit! */
376 			D_ASSERT(tmp == NULL);
377 			INIT_LIST_HEAD(&b->requests);
378 			INIT_LIST_HEAD(&b->w.list);
379 			b->w.cb = NULL;
380 			b->br_number = new_initial_bnr;
381 			b->n_req = 0;
382 
383 			mdev->oldest_tle = b;
384 			break;
385 		}
386 		kfree(b);
387 		b = tmp;
388 	}
389 
390 	/* we expect this list to be empty. */
391 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
392 
393 	/* but just in case, clean it up anyway! */
394 	list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
395 		r = list_entry(le, struct drbd_request, tl_requests);
396 		/* It would be nice to complete outside of spinlock.
397 		 * But this is easier for now. */
398 		_req_mod(r, connection_lost_while_pending);
399 	}
400 
401 	/* ensure bit indicating barrier is required is clear */
402 	clear_bit(CREATE_BARRIER, &mdev->flags);
403 
404 	spin_unlock_irq(&mdev->req_lock);
405 }
406 
407 /**
408  * cl_wide_st_chg() - TRUE if the state change is a cluster wide one
409  * @mdev:	DRBD device.
410  * @os:		old (current) state.
411  * @ns:		new (wanted) state.
412  */
413 static int cl_wide_st_chg(struct drbd_conf *mdev,
414 			  union drbd_state os, union drbd_state ns)
415 {
416 	return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
417 		 ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
418 		  (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
419 		  (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
420 		  (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))) ||
421 		(os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
422 		(os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
423 }
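/*
 * Example: promoting a connected node (os.role == R_SECONDARY,
 * ns.role == R_PRIMARY, both conn >= C_CONNECTED) is a cluster-wide change,
 * so drbd_req_state() below must first ask the peer via drbd_send_state_req()
 * before committing the new state locally.
 */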
424 
425 int drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
426 		      union drbd_state mask, union drbd_state val)
427 {
428 	unsigned long flags;
429 	union drbd_state os, ns;
430 	int rv;
431 
432 	spin_lock_irqsave(&mdev->req_lock, flags);
433 	os = mdev->state;
434 	ns.i = (os.i & ~mask.i) | val.i;
435 	rv = _drbd_set_state(mdev, ns, f, NULL);
436 	ns = mdev->state;
437 	spin_unlock_irqrestore(&mdev->req_lock, flags);
438 
439 	return rv;
440 }
441 
442 /**
443  * drbd_force_state() - Impose a change which happens outside our control on our state
444  * @mdev:	DRBD device.
445  * @mask:	mask of state bits to change.
446  * @val:	value of new state bits.
447  */
448 void drbd_force_state(struct drbd_conf *mdev,
449 	union drbd_state mask, union drbd_state val)
450 {
451 	drbd_change_state(mdev, CS_HARD, mask, val);
452 }
453 
454 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns);
455 static int is_valid_state_transition(struct drbd_conf *,
456 				     union drbd_state, union drbd_state);
457 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
458 				       union drbd_state ns, int *warn_sync_abort);
459 int drbd_send_state_req(struct drbd_conf *,
460 			union drbd_state, union drbd_state);
461 
462 static enum drbd_state_ret_codes _req_st_cond(struct drbd_conf *mdev,
463 				    union drbd_state mask, union drbd_state val)
464 {
465 	union drbd_state os, ns;
466 	unsigned long flags;
467 	int rv;
468 
469 	if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
470 		return SS_CW_SUCCESS;
471 
472 	if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
473 		return SS_CW_FAILED_BY_PEER;
474 
475 	rv = 0;
476 	spin_lock_irqsave(&mdev->req_lock, flags);
477 	os = mdev->state;
478 	ns.i = (os.i & ~mask.i) | val.i;
479 	ns = sanitize_state(mdev, os, ns, NULL);
480 
481 	if (!cl_wide_st_chg(mdev, os, ns))
482 		rv = SS_CW_NO_NEED;
483 	if (!rv) {
484 		rv = is_valid_state(mdev, ns);
485 		if (rv == SS_SUCCESS) {
486 			rv = is_valid_state_transition(mdev, ns, os);
487 			if (rv == SS_SUCCESS)
488 				rv = 0; /* cont waiting, otherwise fail. */
489 		}
490 	}
491 	spin_unlock_irqrestore(&mdev->req_lock, flags);
492 
493 	return rv;
494 }
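/*
 * Return contract of _req_st_cond(): a value of 0 keeps the wait_event() in
 * drbd_req_state() sleeping; SS_CW_SUCCESS / SS_CW_FAILED_BY_PEER are reported
 * once the peer's reply has set the corresponding CL_ST_CHG_* bit, and
 * SS_CW_NO_NEED short-circuits changes that turn out not to be cluster wide.
 */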
495 
496 /**
497  * drbd_req_state() - Perform a possibly cluster-wide state change
498  * @mdev:	DRBD device.
499  * @mask:	mask of state bits to change.
500  * @val:	value of new state bits.
501  * @f:		flags
502  *
503  * Should not be called directly, use drbd_request_state() or
504  * _drbd_request_state().
505  */
506 static int drbd_req_state(struct drbd_conf *mdev,
507 			  union drbd_state mask, union drbd_state val,
508 			  enum chg_state_flags f)
509 {
510 	struct completion done;
511 	unsigned long flags;
512 	union drbd_state os, ns;
513 	int rv;
514 
515 	init_completion(&done);
516 
517 	if (f & CS_SERIALIZE)
518 		mutex_lock(&mdev->state_mutex);
519 
520 	spin_lock_irqsave(&mdev->req_lock, flags);
521 	os = mdev->state;
522 	ns.i = (os.i & ~mask.i) | val.i;
523 	ns = sanitize_state(mdev, os, ns, NULL);
524 
525 	if (cl_wide_st_chg(mdev, os, ns)) {
526 		rv = is_valid_state(mdev, ns);
527 		if (rv == SS_SUCCESS)
528 			rv = is_valid_state_transition(mdev, ns, os);
529 		spin_unlock_irqrestore(&mdev->req_lock, flags);
530 
531 		if (rv < SS_SUCCESS) {
532 			if (f & CS_VERBOSE)
533 				print_st_err(mdev, os, ns, rv);
534 			goto abort;
535 		}
536 
537 		drbd_state_lock(mdev);
538 		if (!drbd_send_state_req(mdev, mask, val)) {
539 			drbd_state_unlock(mdev);
540 			rv = SS_CW_FAILED_BY_PEER;
541 			if (f & CS_VERBOSE)
542 				print_st_err(mdev, os, ns, rv);
543 			goto abort;
544 		}
545 
546 		wait_event(mdev->state_wait,
547 			(rv = _req_st_cond(mdev, mask, val)));
548 
549 		if (rv < SS_SUCCESS) {
550 			drbd_state_unlock(mdev);
551 			if (f & CS_VERBOSE)
552 				print_st_err(mdev, os, ns, rv);
553 			goto abort;
554 		}
555 		spin_lock_irqsave(&mdev->req_lock, flags);
556 		os = mdev->state;
557 		ns.i = (os.i & ~mask.i) | val.i;
558 		rv = _drbd_set_state(mdev, ns, f, &done);
559 		drbd_state_unlock(mdev);
560 	} else {
561 		rv = _drbd_set_state(mdev, ns, f, &done);
562 	}
563 
564 	spin_unlock_irqrestore(&mdev->req_lock, flags);
565 
566 	if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
567 		D_ASSERT(current != mdev->worker.task);
568 		wait_for_completion(&done);
569 	}
570 
571 abort:
572 	if (f & CS_SERIALIZE)
573 		mutex_unlock(&mdev->state_mutex);
574 
575 	return rv;
576 }
577 
578 /**
579  * _drbd_request_state() - Request a state change (with flags)
580  * @mdev:	DRBD device.
581  * @mask:	mask of state bits to change.
582  * @val:	value of new state bits.
583  * @f:		flags
584  *
585  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
586  * flag, or when logging of failed state change requests is not desired.
587  */
588 int _drbd_request_state(struct drbd_conf *mdev,	union drbd_state mask,
589 			union drbd_state val,	enum chg_state_flags f)
590 {
591 	int rv;
592 
593 	wait_event(mdev->state_wait,
594 		   (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
595 
596 	return rv;
597 }
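/*
 * Example call site (see abw_start_sync() further down): a request such as
 *
 *	_drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
 *
 * keeps retrying through the wait_event() above for as long as
 * drbd_req_state() answers SS_IN_TRANSIENT_STATE.
 */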
598 
599 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
600 {
601 	dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
602 	    name,
603 	    drbd_conn_str(ns.conn),
604 	    drbd_role_str(ns.role),
605 	    drbd_role_str(ns.peer),
606 	    drbd_disk_str(ns.disk),
607 	    drbd_disk_str(ns.pdsk),
608 	    ns.susp ? 's' : 'r',
609 	    ns.aftr_isp ? 'a' : '-',
610 	    ns.peer_isp ? 'p' : '-',
611 	    ns.user_isp ? 'u' : '-'
612 	    );
613 }
614 
615 void print_st_err(struct drbd_conf *mdev,
616 	union drbd_state os, union drbd_state ns, int err)
617 {
618 	if (err == SS_IN_TRANSIENT_STATE)
619 		return;
620 	dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
621 	print_st(mdev, " state", os);
622 	print_st(mdev, "wanted", ns);
623 }
624 
625 
626 #define drbd_peer_str drbd_role_str
627 #define drbd_pdsk_str drbd_disk_str
628 
629 #define drbd_susp_str(A)     ((A) ? "1" : "0")
630 #define drbd_aftr_isp_str(A) ((A) ? "1" : "0")
631 #define drbd_peer_isp_str(A) ((A) ? "1" : "0")
632 #define drbd_user_isp_str(A) ((A) ? "1" : "0")
633 
634 #define PSC(A) \
635 	({ if (ns.A != os.A) { \
636 		pbp += sprintf(pbp, #A "( %s -> %s ) ", \
637 			      drbd_##A##_str(os.A), \
638 			      drbd_##A##_str(ns.A)); \
639 	} })
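/*
 * Example of the PSC() helper: with os.role == R_SECONDARY and
 * ns.role == R_PRIMARY, PSC(role) appends something like
 * "role( Secondary -> Primary ) " to the buffer, using the drbd_##A##_str()
 * conversion selected by the token pasting above (drbd_role_str() here).
 */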
640 
641 /**
642  * is_valid_state() - Returns an SS_ error code if ns is not valid
643  * @mdev:	DRBD device.
644  * @ns:		State to consider.
645  */
646 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
647 {
648 	/* See drbd_state_sw_errors in drbd_strings.c */
649 
650 	enum drbd_fencing_p fp;
651 	int rv = SS_SUCCESS;
652 
653 	fp = FP_DONT_CARE;
654 	if (get_ldev(mdev)) {
655 		fp = mdev->ldev->dc.fencing;
656 		put_ldev(mdev);
657 	}
658 
659 	if (get_net_conf(mdev)) {
660 		if (!mdev->net_conf->two_primaries &&
661 		    ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
662 			rv = SS_TWO_PRIMARIES;
663 		put_net_conf(mdev);
664 	}
665 
666 	if (rv <= 0)
667 		/* already found a reason to abort */;
668 	else if (ns.role == R_SECONDARY && mdev->open_cnt)
669 		rv = SS_DEVICE_IN_USE;
670 
671 	else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
672 		rv = SS_NO_UP_TO_DATE_DISK;
673 
674 	else if (fp >= FP_RESOURCE &&
675 		 ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
676 		rv = SS_PRIMARY_NOP;
677 
678 	else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
679 		rv = SS_NO_UP_TO_DATE_DISK;
680 
681 	else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
682 		rv = SS_NO_LOCAL_DISK;
683 
684 	else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
685 		rv = SS_NO_REMOTE_DISK;
686 
687 	else if (ns.conn > C_CONNECTED && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE)
688 		rv = SS_NO_UP_TO_DATE_DISK;
689 
690 	else if ((ns.conn == C_CONNECTED ||
691 		  ns.conn == C_WF_BITMAP_S ||
692 		  ns.conn == C_SYNC_SOURCE ||
693 		  ns.conn == C_PAUSED_SYNC_S) &&
694 		  ns.disk == D_OUTDATED)
695 		rv = SS_CONNECTED_OUTDATES;
696 
697 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
698 		 (mdev->sync_conf.verify_alg[0] == 0))
699 		rv = SS_NO_VERIFY_ALG;
700 
701 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
702 		  mdev->agreed_pro_version < 88)
703 		rv = SS_NOT_SUPPORTED;
704 
705 	return rv;
706 }
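/*
 * Example: a request to become R_PRIMARY while disconnected and with the
 * local disk below D_UP_TO_DATE is refused with SS_NO_UP_TO_DATE_DISK by the
 * checks above; __drbd_set_state() then reports that error to the caller.
 */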
707 
708 /**
709  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
710  * @mdev:	DRBD device.
711  * @ns:		new state.
712  * @os:		old state.
713  */
714 static int is_valid_state_transition(struct drbd_conf *mdev,
715 				     union drbd_state ns, union drbd_state os)
716 {
717 	int rv = SS_SUCCESS;
718 
719 	if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
720 	    os.conn > C_CONNECTED)
721 		rv = SS_RESYNC_RUNNING;
722 
723 	if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
724 		rv = SS_ALREADY_STANDALONE;
725 
726 	if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
727 		rv = SS_IS_DISKLESS;
728 
729 	if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
730 		rv = SS_NO_NET_CONFIG;
731 
732 	if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
733 		rv = SS_LOWER_THAN_OUTDATED;
734 
735 	if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
736 		rv = SS_IN_TRANSIENT_STATE;
737 
738 	if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
739 		rv = SS_IN_TRANSIENT_STATE;
740 
741 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
742 		rv = SS_NEED_CONNECTION;
743 
744 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
745 	    ns.conn != os.conn && os.conn > C_CONNECTED)
746 		rv = SS_RESYNC_RUNNING;
747 
748 	if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
749 	    os.conn < C_CONNECTED)
750 		rv = SS_NEED_CONNECTION;
751 
752 	return rv;
753 }
754 
755 /**
756  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
757  * @mdev:	DRBD device.
758  * @os:		old state.
759  * @ns:		new state.
760  * @warn_sync_abort:
761  *
762  * When we lose the connection, we have to set the state of the peer's disk (pdsk)
763  * to D_UNKNOWN. This rule and many more along those lines are in this function.
764  */
765 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
766 				       union drbd_state ns, int *warn_sync_abort)
767 {
768 	enum drbd_fencing_p fp;
769 
770 	fp = FP_DONT_CARE;
771 	if (get_ldev(mdev)) {
772 		fp = mdev->ldev->dc.fencing;
773 		put_ldev(mdev);
774 	}
775 
776 	/* A network error state may not be applied while the network part is unconfigured or already disconnecting */
777 	if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
778 	    os.conn <= C_DISCONNECTING)
779 		ns.conn = os.conn;
780 
781 	/* After a network error (including C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow */
782 	if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
783 	    ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING)
784 		ns.conn = os.conn;
785 
786 	/* After C_DISCONNECTING only C_STANDALONE may follow */
787 	if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
788 		ns.conn = os.conn;
789 
790 	if (ns.conn < C_CONNECTED) {
791 		ns.peer_isp = 0;
792 		ns.peer = R_UNKNOWN;
793 		if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
794 			ns.pdsk = D_UNKNOWN;
795 	}
796 
797 	/* Clear the aftr_isp when becoming unconfigured */
798 	if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
799 		ns.aftr_isp = 0;
800 
801 	if (ns.conn <= C_DISCONNECTING && ns.disk == D_DISKLESS)
802 		ns.pdsk = D_UNKNOWN;
803 
804 	/* Abort resync if a disk fails/detaches */
805 	if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
806 	    (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
807 		if (warn_sync_abort)
808 			*warn_sync_abort = 1;
809 		ns.conn = C_CONNECTED;
810 	}
811 
812 	if (ns.conn >= C_CONNECTED &&
813 	    ((ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED) ||
814 	     (ns.disk == D_NEGOTIATING && ns.conn == C_WF_BITMAP_T))) {
815 		switch (ns.conn) {
816 		case C_WF_BITMAP_T:
817 		case C_PAUSED_SYNC_T:
818 			ns.disk = D_OUTDATED;
819 			break;
820 		case C_CONNECTED:
821 		case C_WF_BITMAP_S:
822 		case C_SYNC_SOURCE:
823 		case C_PAUSED_SYNC_S:
824 			ns.disk = D_UP_TO_DATE;
825 			break;
826 		case C_SYNC_TARGET:
827 			ns.disk = D_INCONSISTENT;
828 			dev_warn(DEV, "Implicitly set disk state Inconsistent!\n");
829 			break;
830 		}
831 		if (os.disk == D_OUTDATED && ns.disk == D_UP_TO_DATE)
832 			dev_warn(DEV, "Implicitly set disk from Outdated to UpToDate\n");
833 	}
834 
835 	if (ns.conn >= C_CONNECTED &&
836 	    (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)) {
837 		switch (ns.conn) {
838 		case C_CONNECTED:
839 		case C_WF_BITMAP_T:
840 		case C_PAUSED_SYNC_T:
841 		case C_SYNC_TARGET:
842 			ns.pdsk = D_UP_TO_DATE;
843 			break;
844 		case C_WF_BITMAP_S:
845 		case C_PAUSED_SYNC_S:
846 			/* remap any consistent state to D_OUTDATED,
847 			 * but disallow "upgrading" states that are not even consistent.
848 			 */
849 			ns.pdsk =
850 				(D_DISKLESS < os.pdsk && os.pdsk < D_OUTDATED)
851 				? os.pdsk : D_OUTDATED;
852 			break;
853 		case C_SYNC_SOURCE:
854 			ns.pdsk = D_INCONSISTENT;
855 			dev_warn(DEV, "Implicitly set pdsk Inconsistent!\n");
856 			break;
857 		}
858 		if (os.pdsk == D_OUTDATED && ns.pdsk == D_UP_TO_DATE)
859 			dev_warn(DEV, "Implicitly set pdsk from Outdated to UpToDate\n");
860 	}
861 
862 	/* Connection breaks down before we finished "Negotiating" */
863 	if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
864 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
865 		if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
866 			ns.disk = mdev->new_state_tmp.disk;
867 			ns.pdsk = mdev->new_state_tmp.pdsk;
868 		} else {
869 			dev_alert(DEV, "Connection lost while negotiating, no data!\n");
870 			ns.disk = D_DISKLESS;
871 			ns.pdsk = D_UNKNOWN;
872 		}
873 		put_ldev(mdev);
874 	}
875 
876 	if (fp == FP_STONITH &&
877 	    (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
878 	    !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
879 		ns.susp = 1;
880 
881 	if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
882 		if (ns.conn == C_SYNC_SOURCE)
883 			ns.conn = C_PAUSED_SYNC_S;
884 		if (ns.conn == C_SYNC_TARGET)
885 			ns.conn = C_PAUSED_SYNC_T;
886 	} else {
887 		if (ns.conn == C_PAUSED_SYNC_S)
888 			ns.conn = C_SYNC_SOURCE;
889 		if (ns.conn == C_PAUSED_SYNC_T)
890 			ns.conn = C_SYNC_TARGET;
891 	}
892 
893 	return ns;
894 }
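/*
 * Example of the implicit pause/resume mapping at the end of sanitize_state():
 * as soon as any of aftr_isp/peer_isp/user_isp is set, C_SYNC_SOURCE becomes
 * C_PAUSED_SYNC_S (and C_SYNC_TARGET becomes C_PAUSED_SYNC_T); once all three
 * bits are clear again, the paused states map back to the running ones.
 */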
895 
896 /* helper for __drbd_set_state */
897 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
898 {
899 	if (cs == C_VERIFY_T) {
900 		/* starting online verify from an arbitrary position
901 		 * does not fit well into the existing protocol.
902 		 * on C_VERIFY_T, we initialize ov_left and friends
903 		 * implicitly in receive_DataRequest once the
904 		 * first P_OV_REQUEST is received */
905 		mdev->ov_start_sector = ~(sector_t)0;
906 	} else {
907 		unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
908 		if (bit >= mdev->rs_total)
909 			mdev->ov_start_sector =
910 				BM_BIT_TO_SECT(mdev->rs_total - 1);
911 		mdev->ov_position = mdev->ov_start_sector;
912 	}
913 }
914 
915 /**
916  * __drbd_set_state() - Set a new DRBD state
917  * @mdev:	DRBD device.
918  * @ns:		new state.
919  * @flags:	Flags
920  * @done:	Optional completion that will be completed after after_state_ch() has finished
921  *
922  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
923  */
924 int __drbd_set_state(struct drbd_conf *mdev,
925 		    union drbd_state ns, enum chg_state_flags flags,
926 		    struct completion *done)
927 {
928 	union drbd_state os;
929 	int rv = SS_SUCCESS;
930 	int warn_sync_abort = 0;
931 	struct after_state_chg_work *ascw;
932 
933 	os = mdev->state;
934 
935 	ns = sanitize_state(mdev, os, ns, &warn_sync_abort);
936 
937 	if (ns.i == os.i)
938 		return SS_NOTHING_TO_DO;
939 
940 	if (!(flags & CS_HARD)) {
941 		/*  pre-state-change checks ; only look at ns  */
942 		/* See drbd_state_sw_errors in drbd_strings.c */
943 
944 		rv = is_valid_state(mdev, ns);
945 		if (rv < SS_SUCCESS) {
946 			/* If the old state was illegal as well, then let
947 			   this happen...*/
948 
949 			if (is_valid_state(mdev, os) == rv) {
950 				dev_err(DEV, "Considering state change from bad state. "
951 				    "Error would be: '%s'\n",
952 				    drbd_set_st_err_str(rv));
953 				print_st(mdev, "old", os);
954 				print_st(mdev, "new", ns);
955 				rv = is_valid_state_transition(mdev, ns, os);
956 			}
957 		} else
958 			rv = is_valid_state_transition(mdev, ns, os);
959 	}
960 
961 	if (rv < SS_SUCCESS) {
962 		if (flags & CS_VERBOSE)
963 			print_st_err(mdev, os, ns, rv);
964 		return rv;
965 	}
966 
967 	if (warn_sync_abort)
968 		dev_warn(DEV, "Resync aborted.\n");
969 
970 	{
971 		char *pbp, pb[300];
972 		pbp = pb;
973 		*pbp = 0;
974 		PSC(role);
975 		PSC(peer);
976 		PSC(conn);
977 		PSC(disk);
978 		PSC(pdsk);
979 		PSC(susp);
980 		PSC(aftr_isp);
981 		PSC(peer_isp);
982 		PSC(user_isp);
983 		dev_info(DEV, "%s\n", pb);
984 	}
985 
986 	/* solve the race between becoming unconfigured,
987 	 * worker doing the cleanup, and
988 	 * admin reconfiguring us:
989 	 * on (re)configure, first set CONFIG_PENDING,
990 	 * then wait for a potentially exiting worker,
991 	 * start the worker, and schedule one no_op.
992 	 * then proceed with configuration.
993 	 */
994 	if (ns.disk == D_DISKLESS &&
995 	    ns.conn == C_STANDALONE &&
996 	    ns.role == R_SECONDARY &&
997 	    !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
998 		set_bit(DEVICE_DYING, &mdev->flags);
999 
1000 	mdev->state.i = ns.i;
1001 	wake_up(&mdev->misc_wait);
1002 	wake_up(&mdev->state_wait);
1003 
1004 	/*   post-state-change actions   */
1005 	if (os.conn >= C_SYNC_SOURCE   && ns.conn <= C_CONNECTED) {
1006 		set_bit(STOP_SYNC_TIMER, &mdev->flags);
1007 		mod_timer(&mdev->resync_timer, jiffies);
1008 	}
1009 
1010 	/* aborted verify run. log the last position */
1011 	if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1012 	    ns.conn < C_CONNECTED) {
1013 		mdev->ov_start_sector =
1014 			BM_BIT_TO_SECT(mdev->rs_total - mdev->ov_left);
1015 		dev_info(DEV, "Online Verify reached sector %llu\n",
1016 			(unsigned long long)mdev->ov_start_sector);
1017 	}
1018 
1019 	if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1020 	    (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1021 		dev_info(DEV, "Syncer continues.\n");
1022 		mdev->rs_paused += (long)jiffies-(long)mdev->rs_mark_time;
1023 		if (ns.conn == C_SYNC_TARGET) {
1024 			if (!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))
1025 				mod_timer(&mdev->resync_timer, jiffies);
1026 			/* This if (!test_bit) is only needed for the case
1027 			   that a device that has ceased to use its timer,
1028 			   i.e. is already in drbd_resync_finished(), gets
1029 			   paused and resumed. */
1030 		}
1031 	}
1032 
1033 	if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1034 	    (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1035 		dev_info(DEV, "Resync suspended\n");
1036 		mdev->rs_mark_time = jiffies;
1037 		if (ns.conn == C_PAUSED_SYNC_T)
1038 			set_bit(STOP_SYNC_TIMER, &mdev->flags);
1039 	}
1040 
1041 	if (os.conn == C_CONNECTED &&
1042 	    (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1043 		mdev->ov_position = 0;
1044 		mdev->rs_total =
1045 		mdev->rs_mark_left = drbd_bm_bits(mdev);
1046 		if (mdev->agreed_pro_version >= 90)
1047 			set_ov_position(mdev, ns.conn);
1048 		else
1049 			mdev->ov_start_sector = 0;
1050 		mdev->ov_left = mdev->rs_total
1051 			      - BM_SECT_TO_BIT(mdev->ov_position);
1052 		mdev->rs_start     =
1053 		mdev->rs_mark_time = jiffies;
1054 		mdev->ov_last_oos_size = 0;
1055 		mdev->ov_last_oos_start = 0;
1056 
1057 		if (ns.conn == C_VERIFY_S) {
1058 			dev_info(DEV, "Starting Online Verify from sector %llu\n",
1059 					(unsigned long long)mdev->ov_position);
1060 			mod_timer(&mdev->resync_timer, jiffies);
1061 		}
1062 	}
1063 
1064 	if (get_ldev(mdev)) {
1065 		u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1066 						 MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1067 						 MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1068 
1069 		if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1070 			mdf |= MDF_CRASHED_PRIMARY;
1071 		if (mdev->state.role == R_PRIMARY ||
1072 		    (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1073 			mdf |= MDF_PRIMARY_IND;
1074 		if (mdev->state.conn > C_WF_REPORT_PARAMS)
1075 			mdf |= MDF_CONNECTED_IND;
1076 		if (mdev->state.disk > D_INCONSISTENT)
1077 			mdf |= MDF_CONSISTENT;
1078 		if (mdev->state.disk > D_OUTDATED)
1079 			mdf |= MDF_WAS_UP_TO_DATE;
1080 		if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1081 			mdf |= MDF_PEER_OUT_DATED;
1082 		if (mdf != mdev->ldev->md.flags) {
1083 			mdev->ldev->md.flags = mdf;
1084 			drbd_md_mark_dirty(mdev);
1085 		}
1086 		if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1087 			drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1088 		put_ldev(mdev);
1089 	}
1090 
1091 	/* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider resync */
1092 	if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1093 	    os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1094 		set_bit(CONSIDER_RESYNC, &mdev->flags);
1095 
1096 	/* Receiver should clean up itself */
1097 	if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1098 		drbd_thread_stop_nowait(&mdev->receiver);
1099 
1100 	/* Now the receiver finished cleaning up itself, it should die */
1101 	if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1102 		drbd_thread_stop_nowait(&mdev->receiver);
1103 
1104 	/* Upon network failure, we need to restart the receiver. */
1105 	if (os.conn > C_TEAR_DOWN &&
1106 	    ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1107 		drbd_thread_restart_nowait(&mdev->receiver);
1108 
1109 	ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1110 	if (ascw) {
1111 		ascw->os = os;
1112 		ascw->ns = ns;
1113 		ascw->flags = flags;
1114 		ascw->w.cb = w_after_state_ch;
1115 		ascw->done = done;
1116 		drbd_queue_work(&mdev->data.work, &ascw->w);
1117 	} else {
1118 		dev_warn(DEV, "Could not kmalloc an ascw\n");
1119 	}
1120 
1121 	return rv;
1122 }
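/*
 * Typical use of this state machinery, as seen in after_state_ch() further
 * down (via the _drbd_set_state()/_NS() wrappers):
 *
 *	spin_lock_irq(&mdev->req_lock);
 *	_drbd_set_state(_NS(mdev, susp, 0), CS_VERBOSE, NULL);
 *	spin_unlock_irq(&mdev->req_lock);
 *
 * i.e. the caller holds req_lock, and the after-state work (w_after_state_ch)
 * is queued to the worker rather than run in place.
 */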
1123 
1124 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1125 {
1126 	struct after_state_chg_work *ascw =
1127 		container_of(w, struct after_state_chg_work, w);
1128 	after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1129 	if (ascw->flags & CS_WAIT_COMPLETE) {
1130 		D_ASSERT(ascw->done != NULL);
1131 		complete(ascw->done);
1132 	}
1133 	kfree(ascw);
1134 
1135 	return 1;
1136 }
1137 
1138 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1139 {
1140 	if (rv) {
1141 		dev_err(DEV, "Writing the bitmap failed, not starting resync.\n");
1142 		_drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1143 		return;
1144 	}
1145 
1146 	switch (mdev->state.conn) {
1147 	case C_STARTING_SYNC_T:
1148 		_drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1149 		break;
1150 	case C_STARTING_SYNC_S:
1151 		drbd_start_resync(mdev, C_SYNC_SOURCE);
1152 		break;
1153 	}
1154 }
1155 
1156 /**
1157  * after_state_ch() - Perform after state change actions that may sleep
1158  * @mdev:	DRBD device.
1159  * @os:		old state.
1160  * @ns:		new state.
1161  * @flags:	Flags
1162  */
1163 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1164 			   union drbd_state ns, enum chg_state_flags flags)
1165 {
1166 	enum drbd_fencing_p fp;
1167 
1168 	if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1169 		clear_bit(CRASHED_PRIMARY, &mdev->flags);
1170 		if (mdev->p_uuid)
1171 			mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1172 	}
1173 
1174 	fp = FP_DONT_CARE;
1175 	if (get_ldev(mdev)) {
1176 		fp = mdev->ldev->dc.fencing;
1177 		put_ldev(mdev);
1178 	}
1179 
1180 	/* Inform userspace about the change... */
1181 	drbd_bcast_state(mdev, ns);
1182 
1183 	if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1184 	    (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1185 		drbd_khelper(mdev, "pri-on-incon-degr");
1186 
1187 	/* Here we have the actions that are performed after a
1188 	   state change. This function might sleep */
1189 
1190 	if (fp == FP_STONITH && ns.susp) {
1191 		/* case1: The outdate peer handler is successful:
1192 		 * case2: The connection was established again: */
1193 		if ((os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) ||
1194 		    (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)) {
1195 			tl_clear(mdev);
1196 			spin_lock_irq(&mdev->req_lock);
1197 			_drbd_set_state(_NS(mdev, susp, 0), CS_VERBOSE, NULL);
1198 			spin_unlock_irq(&mdev->req_lock);
1199 		}
1200 	}
1201 	/* Do not change the order of the if above and the two below... */
1202 	if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1203 		drbd_send_uuids(mdev);
1204 		drbd_send_state(mdev);
1205 	}
1206 	if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S)
1207 		drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL, "send_bitmap (WFBitMapS)");
1208 
1209 	/* Lost contact to peer's copy of the data */
1210 	if ((os.pdsk >= D_INCONSISTENT &&
1211 	     os.pdsk != D_UNKNOWN &&
1212 	     os.pdsk != D_OUTDATED)
1213 	&&  (ns.pdsk < D_INCONSISTENT ||
1214 	     ns.pdsk == D_UNKNOWN ||
1215 	     ns.pdsk == D_OUTDATED)) {
1216 		if (get_ldev(mdev)) {
1217 			if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1218 			    mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1219 				drbd_uuid_new_current(mdev);
1220 				drbd_send_uuids(mdev);
1221 			}
1222 			put_ldev(mdev);
1223 		}
1224 	}
1225 
1226 	if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1227 		if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0)
1228 			drbd_uuid_new_current(mdev);
1229 
1230 		/* D_DISKLESS Peer becomes secondary */
1231 		if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1232 			drbd_al_to_on_disk_bm(mdev);
1233 		put_ldev(mdev);
1234 	}
1235 
1236 	/* Last part of the attaching process ... */
1237 	if (ns.conn >= C_CONNECTED &&
1238 	    os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1239 		drbd_send_sizes(mdev, 0, 0);  /* to start sync... */
1240 		drbd_send_uuids(mdev);
1241 		drbd_send_state(mdev);
1242 	}
1243 
1244 	/* We want to pause/continue resync, tell peer. */
1245 	if (ns.conn >= C_CONNECTED &&
1246 	     ((os.aftr_isp != ns.aftr_isp) ||
1247 	      (os.user_isp != ns.user_isp)))
1248 		drbd_send_state(mdev);
1249 
1250 	/* In case one of the isp bits got set, suspend other devices. */
1251 	if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1252 	    (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1253 		suspend_other_sg(mdev);
1254 
1255 	/* Make sure the peer gets informed about possible state
1256 	   changes (ISP bits) while we were in WFReportParams. */
1257 	if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1258 		drbd_send_state(mdev);
1259 
1260 	/* We are in the process of starting a full sync... */
1261 	if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1262 	    (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1263 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, &abw_start_sync, "set_n_write from StartingSync");
1264 
1265 	/* We are invalidating ourselves... */
1266 	if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1267 	    os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1268 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");
1269 
1270 	if (os.disk > D_FAILED && ns.disk == D_FAILED) {
1271 		enum drbd_io_error_p eh;
1272 
1273 		eh = EP_PASS_ON;
1274 		if (get_ldev_if_state(mdev, D_FAILED)) {
1275 			eh = mdev->ldev->dc.on_io_error;
1276 			put_ldev(mdev);
1277 		}
1278 
1279 		drbd_rs_cancel_all(mdev);
1280 		/* since get_ldev() only works as long as disk>=D_INCONSISTENT,
1281 		   and it is D_DISKLESS here, local_cnt can only go down, it can
1282 		   not increase... It will reach zero */
1283 		wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1284 		mdev->rs_total = 0;
1285 		mdev->rs_failed = 0;
1286 		atomic_set(&mdev->rs_pending_cnt, 0);
1287 
1288 		spin_lock_irq(&mdev->req_lock);
1289 		_drbd_set_state(_NS(mdev, disk, D_DISKLESS), CS_HARD, NULL);
1290 		spin_unlock_irq(&mdev->req_lock);
1291 
1292 		if (eh == EP_CALL_HELPER)
1293 			drbd_khelper(mdev, "local-io-error");
1294 	}
1295 
1296 	if (os.disk > D_DISKLESS && ns.disk == D_DISKLESS) {
1297 
1298 		if (os.disk == D_FAILED) /* && ns.disk == D_DISKLESS*/ {
1299 			if (drbd_send_state(mdev))
1300 				dev_warn(DEV, "Notified peer that my disk is broken.\n");
1301 			else
1302 				dev_err(DEV, "Sending state in drbd_io_error() failed\n");
1303 		}
1304 
1305 		wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1306 		lc_destroy(mdev->resync);
1307 		mdev->resync = NULL;
1308 		lc_destroy(mdev->act_log);
1309 		mdev->act_log = NULL;
1310 		__no_warn(local,
1311 			drbd_free_bc(mdev->ldev);
1312 			mdev->ldev = NULL;);
1313 
1314 		if (mdev->md_io_tmpp)
1315 			__free_page(mdev->md_io_tmpp);
1316 	}
1317 
1318 	/* Disks got bigger while they were detached */
1319 	if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1320 	    test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1321 		if (ns.conn == C_CONNECTED)
1322 			resync_after_online_grow(mdev);
1323 	}
1324 
1325 	/* A resync finished or aborted, wake paused devices... */
1326 	if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1327 	    (os.peer_isp && !ns.peer_isp) ||
1328 	    (os.user_isp && !ns.user_isp))
1329 		resume_next_sg(mdev);
1330 
1331 	/* Upon network connection, we need to start the receiver */
1332 	if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1333 		drbd_thread_start(&mdev->receiver);
1334 
1335 	/* Terminate worker thread if we are unconfigured - it will be
1336 	   restarted as needed... */
1337 	if (ns.disk == D_DISKLESS &&
1338 	    ns.conn == C_STANDALONE &&
1339 	    ns.role == R_SECONDARY) {
1340 		if (os.aftr_isp != ns.aftr_isp)
1341 			resume_next_sg(mdev);
1342 		/* set in __drbd_set_state, unless CONFIG_PENDING was set */
1343 		if (test_bit(DEVICE_DYING, &mdev->flags))
1344 			drbd_thread_stop_nowait(&mdev->worker);
1345 	}
1346 
1347 	drbd_md_sync(mdev);
1348 }
1349 
1350 
1351 static int drbd_thread_setup(void *arg)
1352 {
1353 	struct drbd_thread *thi = (struct drbd_thread *) arg;
1354 	struct drbd_conf *mdev = thi->mdev;
1355 	unsigned long flags;
1356 	int retval;
1357 
1358 restart:
1359 	retval = thi->function(thi);
1360 
1361 	spin_lock_irqsave(&thi->t_lock, flags);
1362 
1363 	/* if the receiver has been "Exiting", the last thing it did
1364 	 * was set the conn state to "StandAlone",
1365 	 * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
1366 	 * and receiver thread will be "started".
1367 	 * drbd_thread_start needs to set "Restarting" in that case.
1368 	 * t_state check and assignment needs to be within the same spinlock,
1369 	 * so either thread_start sees Exiting, and can remap to Restarting,
1370 	 * or thread_start sees None, and can proceed as normal.
1371 	 */
1372 
1373 	if (thi->t_state == Restarting) {
1374 		dev_info(DEV, "Restarting %s\n", current->comm);
1375 		thi->t_state = Running;
1376 		spin_unlock_irqrestore(&thi->t_lock, flags);
1377 		goto restart;
1378 	}
1379 
1380 	thi->task = NULL;
1381 	thi->t_state = None;
1382 	smp_mb();
1383 	complete(&thi->stop);
1384 	spin_unlock_irqrestore(&thi->t_lock, flags);
1385 
1386 	dev_info(DEV, "Terminating %s\n", current->comm);
1387 
1388 	/* Release mod reference taken when thread was started */
1389 	module_put(THIS_MODULE);
1390 	return retval;
1391 }
1392 
1393 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1394 		      int (*func) (struct drbd_thread *))
1395 {
1396 	spin_lock_init(&thi->t_lock);
1397 	thi->task    = NULL;
1398 	thi->t_state = None;
1399 	thi->function = func;
1400 	thi->mdev = mdev;
1401 }
1402 
1403 int drbd_thread_start(struct drbd_thread *thi)
1404 {
1405 	struct drbd_conf *mdev = thi->mdev;
1406 	struct task_struct *nt;
1407 	unsigned long flags;
1408 
1409 	const char *me =
1410 		thi == &mdev->receiver ? "receiver" :
1411 		thi == &mdev->asender  ? "asender"  :
1412 		thi == &mdev->worker   ? "worker"   : "NONSENSE";
1413 
1414 	/* is used from state engine doing drbd_thread_stop_nowait,
1415 	 * while holding the req lock irqsave */
1416 	spin_lock_irqsave(&thi->t_lock, flags);
1417 
1418 	switch (thi->t_state) {
1419 	case None:
1420 		dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1421 				me, current->comm, current->pid);
1422 
1423 		/* Get ref on module for thread - this is released when thread exits */
1424 		if (!try_module_get(THIS_MODULE)) {
1425 			dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1426 			spin_unlock_irqrestore(&thi->t_lock, flags);
1427 			return FALSE;
1428 		}
1429 
1430 		init_completion(&thi->stop);
1431 		D_ASSERT(thi->task == NULL);
1432 		thi->reset_cpu_mask = 1;
1433 		thi->t_state = Running;
1434 		spin_unlock_irqrestore(&thi->t_lock, flags);
1435 		flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
1436 
1437 		nt = kthread_create(drbd_thread_setup, (void *) thi,
1438 				    "drbd%d_%s", mdev_to_minor(mdev), me);
1439 
1440 		if (IS_ERR(nt)) {
1441 			dev_err(DEV, "Couldn't start thread\n");
1442 
1443 			module_put(THIS_MODULE);
1444 			return FALSE;
1445 		}
1446 		spin_lock_irqsave(&thi->t_lock, flags);
1447 		thi->task = nt;
1448 		thi->t_state = Running;
1449 		spin_unlock_irqrestore(&thi->t_lock, flags);
1450 		wake_up_process(nt);
1451 		break;
1452 	case Exiting:
1453 		thi->t_state = Restarting;
1454 		dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1455 				me, current->comm, current->pid);
1456 		/* fall through */
1457 	case Running:
1458 	case Restarting:
1459 	default:
1460 		spin_unlock_irqrestore(&thi->t_lock, flags);
1461 		break;
1462 	}
1463 
1464 	return TRUE;
1465 }
1466 
1467 
1468 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1469 {
1470 	unsigned long flags;
1471 
1472 	enum drbd_thread_state ns = restart ? Restarting : Exiting;
1473 
1474 	/* may be called from state engine, holding the req lock irqsave */
1475 	spin_lock_irqsave(&thi->t_lock, flags);
1476 
1477 	if (thi->t_state == None) {
1478 		spin_unlock_irqrestore(&thi->t_lock, flags);
1479 		if (restart)
1480 			drbd_thread_start(thi);
1481 		return;
1482 	}
1483 
1484 	if (thi->t_state != ns) {
1485 		if (thi->task == NULL) {
1486 			spin_unlock_irqrestore(&thi->t_lock, flags);
1487 			return;
1488 		}
1489 
1490 		thi->t_state = ns;
1491 		smp_mb();
1492 		init_completion(&thi->stop);
1493 		if (thi->task != current)
1494 			force_sig(DRBD_SIGKILL, thi->task);
1495 
1496 	}
1497 
1498 	spin_unlock_irqrestore(&thi->t_lock, flags);
1499 
1500 	if (wait)
1501 		wait_for_completion(&thi->stop);
1502 }
1503 
1504 #ifdef CONFIG_SMP
1505 /**
1506  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1507  * @mdev:	DRBD device.
1508  *
1509  * Forces all threads of a device onto the same CPU. This is beneficial for
1510  * DRBD's performance. May be overridden by the user's configuration.
1511  */
1512 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1513 {
1514 	int ord, cpu;
1515 
1516 	/* user override. */
1517 	if (cpumask_weight(mdev->cpu_mask))
1518 		return;
1519 
1520 	ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1521 	for_each_online_cpu(cpu) {
1522 		if (ord-- == 0) {
1523 			cpumask_set_cpu(cpu, mdev->cpu_mask);
1524 			return;
1525 		}
1526 	}
1527 	/* should not be reached */
1528 	cpumask_setall(mdev->cpu_mask);
1529 }
1530 
1531 /**
1532  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1533  * @mdev:	DRBD device.
1534  *
1535  * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
1536  * prematurely.
1537  */
1538 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1539 {
1540 	struct task_struct *p = current;
1541 	struct drbd_thread *thi =
1542 		p == mdev->asender.task  ? &mdev->asender  :
1543 		p == mdev->receiver.task ? &mdev->receiver :
1544 		p == mdev->worker.task   ? &mdev->worker   :
1545 		NULL;
1546 	ERR_IF(thi == NULL)
1547 		return;
1548 	if (!thi->reset_cpu_mask)
1549 		return;
1550 	thi->reset_cpu_mask = 0;
1551 	set_cpus_allowed_ptr(p, mdev->cpu_mask);
1552 }
1553 #endif
1554 
1555 /* the appropriate socket mutex must be held already */
1556 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1557 			  enum drbd_packets cmd, struct p_header *h,
1558 			  size_t size, unsigned msg_flags)
1559 {
1560 	int sent, ok;
1561 
1562 	ERR_IF(!h) return FALSE;
1563 	ERR_IF(!size) return FALSE;
1564 
1565 	h->magic   = BE_DRBD_MAGIC;
1566 	h->command = cpu_to_be16(cmd);
1567 	h->length  = cpu_to_be16(size-sizeof(struct p_header));
1568 
1569 	sent = drbd_send(mdev, sock, h, size, msg_flags);
1570 
1571 	ok = (sent == size);
1572 	if (!ok)
1573 		dev_err(DEV, "short sent %s size=%d sent=%d\n",
1574 		    cmdname(cmd), (int)size, sent);
1575 	return ok;
1576 }
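/*
 * On the wire, every packet sent through _drbd_send_cmd() starts with the
 * common p_header: the DRBD magic, the 16-bit command code and a 16-bit
 * length that counts only the payload (size - sizeof(struct p_header)).
 * drbd_send_state() below, for instance, passes P_STATE together with
 * sizeof(struct p_state).
 */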
1577 
1578 /* don't pass the socket. we may only look at it
1579  * when we hold the appropriate socket mutex.
1580  */
1581 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1582 		  enum drbd_packets cmd, struct p_header *h, size_t size)
1583 {
1584 	int ok = 0;
1585 	struct socket *sock;
1586 
1587 	if (use_data_socket) {
1588 		mutex_lock(&mdev->data.mutex);
1589 		sock = mdev->data.socket;
1590 	} else {
1591 		mutex_lock(&mdev->meta.mutex);
1592 		sock = mdev->meta.socket;
1593 	}
1594 
1595 	/* drbd_disconnect() could have called drbd_free_sock()
1596 	 * while we were waiting in down()... */
1597 	if (likely(sock != NULL))
1598 		ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1599 
1600 	if (use_data_socket)
1601 		mutex_unlock(&mdev->data.mutex);
1602 	else
1603 		mutex_unlock(&mdev->meta.mutex);
1604 	return ok;
1605 }
1606 
1607 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
1608 		   size_t size)
1609 {
1610 	struct p_header h;
1611 	int ok;
1612 
1613 	h.magic   = BE_DRBD_MAGIC;
1614 	h.command = cpu_to_be16(cmd);
1615 	h.length  = cpu_to_be16(size);
1616 
1617 	if (!drbd_get_data_sock(mdev))
1618 		return 0;
1619 
1620 	ok = (sizeof(h) ==
1621 		drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
1622 	ok = ok && (size ==
1623 		drbd_send(mdev, mdev->data.socket, data, size, 0));
1624 
1625 	drbd_put_data_sock(mdev);
1626 
1627 	return ok;
1628 }
1629 
1630 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
1631 {
1632 	struct p_rs_param_89 *p;
1633 	struct socket *sock;
1634 	int size, rv;
1635 	const int apv = mdev->agreed_pro_version;
1636 
1637 	size = apv <= 87 ? sizeof(struct p_rs_param)
1638 		: apv == 88 ? sizeof(struct p_rs_param)
1639 			+ strlen(mdev->sync_conf.verify_alg) + 1
1640 		: /* 89 */    sizeof(struct p_rs_param_89);
1641 
1642 	/* used from admin command context and receiver/worker context.
1643 	 * to avoid kmalloc, grab the socket right here,
1644 	 * then use the pre-allocated sbuf there */
1645 	mutex_lock(&mdev->data.mutex);
1646 	sock = mdev->data.socket;
1647 
1648 	if (likely(sock != NULL)) {
1649 		enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
1650 
1651 		p = &mdev->data.sbuf.rs_param_89;
1652 
1653 		/* initialize verify_alg and csums_alg */
1654 		memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
1655 
1656 		p->rate = cpu_to_be32(sc->rate);
1657 
1658 		if (apv >= 88)
1659 			strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
1660 		if (apv >= 89)
1661 			strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
1662 
1663 		rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
1664 	} else
1665 		rv = 0; /* not ok */
1666 
1667 	mutex_unlock(&mdev->data.mutex);
1668 
1669 	return rv;
1670 }
1671 
1672 int drbd_send_protocol(struct drbd_conf *mdev)
1673 {
1674 	struct p_protocol *p;
1675 	int size, cf, rv;
1676 
1677 	size = sizeof(struct p_protocol);
1678 
1679 	if (mdev->agreed_pro_version >= 87)
1680 		size += strlen(mdev->net_conf->integrity_alg) + 1;
1681 
1682 	/* we must not recurse into our own queue,
1683 	 * as that is blocked during handshake */
1684 	p = kmalloc(size, GFP_NOIO);
1685 	if (p == NULL)
1686 		return 0;
1687 
1688 	p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
1689 	p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
1690 	p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
1691 	p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
1692 	p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
1693 
1694 	cf = 0;
1695 	if (mdev->net_conf->want_lose)
1696 		cf |= CF_WANT_LOSE;
1697 	if (mdev->net_conf->dry_run) {
1698 		if (mdev->agreed_pro_version >= 92)
1699 			cf |= CF_DRY_RUN;
1700 		else {
1701 			dev_err(DEV, "--dry-run is not supported by peer\n");
1702 			kfree(p);
1703 			return 0;
1704 		}
1705 	}
1706 	p->conn_flags    = cpu_to_be32(cf);
1707 
1708 	if (mdev->agreed_pro_version >= 87)
1709 		strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
1710 
1711 	rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
1712 			   (struct p_header *)p, size);
1713 	kfree(p);
1714 	return rv;
1715 }
1716 
1717 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1718 {
1719 	struct p_uuids p;
1720 	int i;
1721 
1722 	if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1723 		return 1;
1724 
1725 	for (i = UI_CURRENT; i < UI_SIZE; i++)
1726 		p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1727 
1728 	mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1729 	p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1730 	uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
1731 	uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1732 	uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1733 	p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1734 
1735 	put_ldev(mdev);
1736 
1737 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
1738 			     (struct p_header *)&p, sizeof(p));
1739 }
1740 
1741 int drbd_send_uuids(struct drbd_conf *mdev)
1742 {
1743 	return _drbd_send_uuids(mdev, 0);
1744 }
1745 
1746 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1747 {
1748 	return _drbd_send_uuids(mdev, 8);
1749 }
1750 
1751 
1752 int drbd_send_sync_uuid(struct drbd_conf *mdev, u64 val)
1753 {
1754 	struct p_rs_uuid p;
1755 
1756 	p.uuid = cpu_to_be64(val);
1757 
1758 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
1759 			     (struct p_header *)&p, sizeof(p));
1760 }
1761 
1762 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1763 {
1764 	struct p_sizes p;
1765 	sector_t d_size, u_size;
1766 	int q_order_type;
1767 	int ok;
1768 
1769 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1770 		D_ASSERT(mdev->ldev->backing_bdev);
1771 		d_size = drbd_get_max_capacity(mdev->ldev);
1772 		u_size = mdev->ldev->dc.disk_size;
1773 		q_order_type = drbd_queue_order_type(mdev);
1774 		put_ldev(mdev);
1775 	} else {
1776 		d_size = 0;
1777 		u_size = 0;
1778 		q_order_type = QUEUE_ORDERED_NONE;
1779 	}
1780 
1781 	p.d_size = cpu_to_be64(d_size);
1782 	p.u_size = cpu_to_be64(u_size);
1783 	p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1784 	p.max_segment_size = cpu_to_be32(queue_max_segment_size(mdev->rq_queue));
1785 	p.queue_order_type = cpu_to_be16(q_order_type);
1786 	p.dds_flags = cpu_to_be16(flags);
1787 
1788 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
1789 			   (struct p_header *)&p, sizeof(p));
1790 	return ok;
1791 }
1792 
1793 /**
1794  * drbd_send_state() - Sends the drbd state to the peer
1795  * @mdev:	DRBD device.
1796  */
1797 int drbd_send_state(struct drbd_conf *mdev)
1798 {
1799 	struct socket *sock;
1800 	struct p_state p;
1801 	int ok = 0;
1802 
1803 	/* Grab state lock so we won't send state if we're in the middle
1804 	 * of a cluster wide state change on another thread */
1805 	drbd_state_lock(mdev);
1806 
1807 	mutex_lock(&mdev->data.mutex);
1808 
1809 	p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1810 	sock = mdev->data.socket;
1811 
1812 	if (likely(sock != NULL)) {
1813 		ok = _drbd_send_cmd(mdev, sock, P_STATE,
1814 				    (struct p_header *)&p, sizeof(p), 0);
1815 	}
1816 
1817 	mutex_unlock(&mdev->data.mutex);
1818 
1819 	drbd_state_unlock(mdev);
1820 	return ok;
1821 }
1822 
1823 int drbd_send_state_req(struct drbd_conf *mdev,
1824 	union drbd_state mask, union drbd_state val)
1825 {
1826 	struct p_req_state p;
1827 
1828 	p.mask    = cpu_to_be32(mask.i);
1829 	p.val     = cpu_to_be32(val.i);
1830 
1831 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
1832 			     (struct p_header *)&p, sizeof(p));
1833 }
1834 
1835 int drbd_send_sr_reply(struct drbd_conf *mdev, int retcode)
1836 {
1837 	struct p_req_state_reply p;
1838 
1839 	p.retcode    = cpu_to_be32(retcode);
1840 
1841 	return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
1842 			     (struct p_header *)&p, sizeof(p));
1843 }
1844 
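/* Encode a chunk of the bitmap as run-length + VLI codes into p->code:
 * alternating runs of clear and set bits are measured with
 * _drbd_bm_find_next{,_zero}() and each run length is VLI-encoded.
 * Returns the number of code bytes produced, 0 if the chunk did not
 * compress (or RLE is disabled / the peer is too old), -1 on error. */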
1845 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1846 	struct p_compressed_bm *p,
1847 	struct bm_xfer_ctx *c)
1848 {
1849 	struct bitstream bs;
1850 	unsigned long plain_bits;
1851 	unsigned long tmp;
1852 	unsigned long rl;
1853 	unsigned len;
1854 	unsigned toggle;
1855 	int bits;
1856 
1857 	/* may we use this feature? */
1858 	if ((mdev->sync_conf.use_rle == 0) ||
1859 	    (mdev->agreed_pro_version < 90))
1860 		return 0;
1861 
1862 	if (c->bit_offset >= c->bm_bits)
1863 		return 0; /* nothing to do. */
1864 
1865 	/* use at most this many bytes */
1866 	bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1867 	memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1868 	/* plain bits covered in this code string */
1869 	plain_bits = 0;
1870 
1871 	/* p->encoding & 0x80 stores whether the first run length is set.
1872 	 * bit offset is implicit.
1873 	 * start with toggle == 2 to be able to tell the first iteration */
1874 	toggle = 2;
1875 
1876 	/* see how many plain bits we can stuff into one packet
1877 	 * using RLE and VLI. */
1878 	do {
1879 		tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1880 				    : _drbd_bm_find_next(mdev, c->bit_offset);
1881 		if (tmp == -1UL)
1882 			tmp = c->bm_bits;
1883 		rl = tmp - c->bit_offset;
1884 
1885 		if (toggle == 2) { /* first iteration */
1886 			if (rl == 0) {
1887 				/* the first checked bit was set,
1888 				 * store start value, */
1889 				DCBP_set_start(p, 1);
1890 				/* but skip encoding of zero run length */
1891 				toggle = !toggle;
1892 				continue;
1893 			}
1894 			DCBP_set_start(p, 0);
1895 		}
1896 
1897 		/* paranoia: catch zero runlength.
1898 		 * can only happen if bitmap is modified while we scan it. */
1899 		if (rl == 0) {
1900 			dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1901 			    "t:%u bo:%lu\n", toggle, c->bit_offset);
1902 			return -1;
1903 		}
1904 
1905 		bits = vli_encode_bits(&bs, rl);
1906 		if (bits == -ENOBUFS) /* buffer full */
1907 			break;
1908 		if (bits <= 0) {
1909 			dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1910 			return 0;
1911 		}
1912 
1913 		toggle = !toggle;
1914 		plain_bits += rl;
1915 		c->bit_offset = tmp;
1916 	} while (c->bit_offset < c->bm_bits);
1917 
1918 	len = bs.cur.b - p->code + !!bs.cur.bit;
1919 
1920 	if (plain_bits < (len << 3)) {
1921 		/* incompressible with this method.
1922 		 * we need to rewind both word and bit position. */
1923 		c->bit_offset -= plain_bits;
1924 		bm_xfer_ctx_bit_to_word_offset(c);
1925 		c->bit_offset = c->word_offset * BITS_PER_LONG;
1926 		return 0;
1927 	}
1928 
1929 	/* RLE + VLI was able to compress it just fine.
1930 	 * update c->word_offset. */
1931 	bm_xfer_ctx_bit_to_word_offset(c);
1932 
1933 	/* store pad_bits */
1934 	DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1935 
1936 	return len;
1937 }
1938 
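/* Send one bitmap packet: try the RLE/VLI compressed encoding first and
 * fall back to a plain P_BITMAP chunk of up to BM_PACKET_WORDS words if it
 * does not pay off.  Returns OK while more packets are needed, DONE after
 * the last one, FAILED on a send or encoding error. */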
1939 enum { OK, FAILED, DONE }
1940 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
1941 	struct p_header *h, struct bm_xfer_ctx *c)
1942 {
1943 	struct p_compressed_bm *p = (void*)h;
1944 	unsigned long num_words;
1945 	int len;
1946 	int ok;
1947 
1948 	len = fill_bitmap_rle_bits(mdev, p, c);
1949 
1950 	if (len < 0)
1951 		return FAILED;
1952 
1953 	if (len) {
1954 		DCBP_set_code(p, RLE_VLI_Bits);
1955 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
1956 			sizeof(*p) + len, 0);
1957 
1958 		c->packets[0]++;
1959 		c->bytes[0] += sizeof(*p) + len;
1960 
1961 		if (c->bit_offset >= c->bm_bits)
1962 			len = 0; /* DONE */
1963 	} else {
1964 		/* was not compressible.
1965 		 * send a buffer full of plain text bits instead. */
1966 		num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
1967 		len = num_words * sizeof(long);
1968 		if (len)
1969 			drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
1970 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
1971 				   h, sizeof(struct p_header) + len, 0);
1972 		c->word_offset += num_words;
1973 		c->bit_offset = c->word_offset * BITS_PER_LONG;
1974 
1975 		c->packets[1]++;
1976 		c->bytes[1] += sizeof(struct p_header) + len;
1977 
1978 		if (c->bit_offset > c->bm_bits)
1979 			c->bit_offset = c->bm_bits;
1980 	}
1981 	ok = ok ? ((len == 0) ? DONE : OK) : FAILED;
1982 
1983 	if (ok == DONE)
1984 		INFO_bm_xfer_stats(mdev, "send", c);
1985 	return ok;
1986 }
1987 
1988 /* See the comment at receive_bitmap() */
1989 int _drbd_send_bitmap(struct drbd_conf *mdev)
1990 {
1991 	struct bm_xfer_ctx c;
1992 	struct p_header *p;
1993 	int ret;
1994 
1995 	ERR_IF(!mdev->bitmap) return FALSE;
1996 
1997 	/* maybe we should use some per thread scratch page,
1998 	 * and allocate that during initial device creation? */
1999 	p = (struct p_header *) __get_free_page(GFP_NOIO);
2000 	if (!p) {
2001 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
2002 		return FALSE;
2003 	}
2004 
2005 	if (get_ldev(mdev)) {
2006 		if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2007 			dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2008 			drbd_bm_set_all(mdev);
2009 			if (drbd_bm_write(mdev)) {
2010 				/* writing the bitmap failed! Leave the full sync flag set in the
2011 				 * meta data, but otherwise proceed as normal - we need to tell the
2012 				 * other side that a full resync is required! */
2013 				dev_err(DEV, "Failed to write bitmap to disk!\n");
2014 			} else {
2015 				drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2016 				drbd_md_sync(mdev);
2017 			}
2018 		}
2019 		put_ldev(mdev);
2020 	}
2021 
2022 	c = (struct bm_xfer_ctx) {
2023 		.bm_bits = drbd_bm_bits(mdev),
2024 		.bm_words = drbd_bm_words(mdev),
2025 	};
2026 
2027 	do {
2028 		ret = send_bitmap_rle_or_plain(mdev, p, &c);
2029 	} while (ret == OK);
2030 
2031 	free_page((unsigned long) p);
2032 	return (ret == DONE);
2033 }
2034 
2035 int drbd_send_bitmap(struct drbd_conf *mdev)
2036 {
2037 	int err;
2038 
2039 	if (!drbd_get_data_sock(mdev))
2040 		return -1;
2041 	err = !_drbd_send_bitmap(mdev);
2042 	drbd_put_data_sock(mdev);
2043 	return err;
2044 }
2045 
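/* Acknowledge an epoch on the meta socket: barrier_nr is echoed back as
 * received (already in wire byte order, hence no cpu_to_be32 here),
 * set_size tells the peer the size of the completed epoch. */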
2046 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2047 {
2048 	int ok;
2049 	struct p_barrier_ack p;
2050 
2051 	p.barrier  = barrier_nr;
2052 	p.set_size = cpu_to_be32(set_size);
2053 
2054 	if (mdev->state.conn < C_CONNECTED)
2055 		return FALSE;
2056 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2057 			(struct p_header *)&p, sizeof(p));
2058 	return ok;
2059 }
2060 
2061 /**
2062  * _drbd_send_ack() - Sends an ack packet
2063  * @mdev:	DRBD device.
2064  * @cmd:	Packet command code.
2065  * @sector:	sector, needs to be in big endian byte order
2066  * @blksize:	size in bytes, needs to be in big endian byte order
2067  * @block_id:	Id, big endian byte order
2068  */
2069 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2070 			  u64 sector,
2071 			  u32 blksize,
2072 			  u64 block_id)
2073 {
2074 	int ok;
2075 	struct p_block_ack p;
2076 
2077 	p.sector   = sector;
2078 	p.block_id = block_id;
2079 	p.blksize  = blksize;
2080 	p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2081 
2082 	if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2083 		return FALSE;
2084 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2085 				(struct p_header *)&p, sizeof(p));
2086 	return ok;
2087 }
2088 
2089 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2090 		     struct p_data *dp)
2091 {
2092 	const int header_size = sizeof(struct p_data)
2093 			      - sizeof(struct p_header);
2094 	int data_size  = ((struct p_header *)dp)->length - header_size;
2095 
2096 	return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2097 			      dp->block_id);
2098 }
2099 
2100 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2101 		     struct p_block_req *rp)
2102 {
2103 	return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2104 }
2105 
2106 /**
2107  * drbd_send_ack() - Sends an ack packet
2108  * @mdev:	DRBD device.
2109  * @cmd:	Packet command code.
2110  * @e:		Epoch entry.
2111  */
2112 int drbd_send_ack(struct drbd_conf *mdev,
2113 	enum drbd_packets cmd, struct drbd_epoch_entry *e)
2114 {
2115 	return _drbd_send_ack(mdev, cmd,
2116 			      cpu_to_be64(e->sector),
2117 			      cpu_to_be32(e->size),
2118 			      e->block_id);
2119 }
2120 
2121 /* This function misuses the block_id field to signal if the blocks
2122  * are in sync or not. */
2123 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2124 		     sector_t sector, int blksize, u64 block_id)
2125 {
2126 	return _drbd_send_ack(mdev, cmd,
2127 			      cpu_to_be64(sector),
2128 			      cpu_to_be32(blksize),
2129 			      cpu_to_be64(block_id));
2130 }
2131 
2132 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2133 		       sector_t sector, int size, u64 block_id)
2134 {
2135 	int ok;
2136 	struct p_block_req p;
2137 
2138 	p.sector   = cpu_to_be64(sector);
2139 	p.block_id = block_id;
2140 	p.blksize  = cpu_to_be32(size);
2141 
2142 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2143 				(struct p_header *)&p, sizeof(p));
2144 	return ok;
2145 }
2146 
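/* Like drbd_send_drequest(), but a digest follows the request header on the
 * wire; the header length accounts for it, so header and digest are pushed
 * back-to-back under data.mutex instead of going through drbd_send_cmd(). */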
2147 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2148 			    sector_t sector, int size,
2149 			    void *digest, int digest_size,
2150 			    enum drbd_packets cmd)
2151 {
2152 	int ok;
2153 	struct p_block_req p;
2154 
2155 	p.sector   = cpu_to_be64(sector);
2156 	p.block_id = BE_DRBD_MAGIC + 0xbeef;
2157 	p.blksize  = cpu_to_be32(size);
2158 
2159 	p.head.magic   = BE_DRBD_MAGIC;
2160 	p.head.command = cpu_to_be16(cmd);
2161 	p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header) + digest_size);
2162 
2163 	mutex_lock(&mdev->data.mutex);
2164 
2165 	ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2166 	ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2167 
2168 	mutex_unlock(&mdev->data.mutex);
2169 
2170 	return ok;
2171 }
2172 
2173 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2174 {
2175 	int ok;
2176 	struct p_block_req p;
2177 
2178 	p.sector   = cpu_to_be64(sector);
2179 	p.block_id = BE_DRBD_MAGIC + 0xbabe;
2180 	p.blksize  = cpu_to_be32(size);
2181 
2182 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2183 			   (struct p_header *)&p, sizeof(p));
2184 	return ok;
2185 }
2186 
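/* Send a P_DELAY_PROBE carrying the current probe sequence number and the
 * microsecond offset since dps_time on the given socket.
 * drbd_send_delay_probes() below sends one probe on the meta and one on the
 * data socket and re-arms the delay_probe_timer. */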
2187 static int drbd_send_delay_probe(struct drbd_conf *mdev, struct drbd_socket *ds)
2188 {
2189 	struct p_delay_probe dp;
2190 	int offset, ok = 0;
2191 	struct timeval now;
2192 
2193 	mutex_lock(&ds->mutex);
2194 	if (likely(ds->socket)) {
2195 		do_gettimeofday(&now);
2196 		offset = now.tv_usec - mdev->dps_time.tv_usec +
2197 			 (now.tv_sec - mdev->dps_time.tv_sec) * 1000000;
2198 		dp.seq_num  = cpu_to_be32(mdev->delay_seq);
2199 		dp.offset   = cpu_to_be32(offset);
2200 
2201 		ok = _drbd_send_cmd(mdev, ds->socket, P_DELAY_PROBE,
2202 				    (struct p_header *)&dp, sizeof(dp), 0);
2203 	}
2204 	mutex_unlock(&ds->mutex);
2205 
2206 	return ok;
2207 }
2208 
2209 static int drbd_send_delay_probes(struct drbd_conf *mdev)
2210 {
2211 	int ok;
2212 
2213 	mdev->delay_seq++;
2214 	do_gettimeofday(&mdev->dps_time);
2215 	ok = drbd_send_delay_probe(mdev, &mdev->meta);
2216 	ok = ok && drbd_send_delay_probe(mdev, &mdev->data);
2217 
2218 	mdev->dp_volume_last = mdev->send_cnt;
2219 	mod_timer(&mdev->delay_probe_timer, jiffies + mdev->sync_conf.dp_interval * HZ / 10);
2220 
2221 	return ok;
2222 }
2223 
2224 /* called on sndtimeo
2225  * returns FALSE if we should retry,
2226  * TRUE if we think the connection is dead
2227  */
2228 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2229 {
2230 	int drop_it;
2231 	/* long elapsed = (long)(jiffies - mdev->last_received); */
2232 
2233 	drop_it =   mdev->meta.socket == sock
2234 		|| !mdev->asender.task
2235 		|| get_t_state(&mdev->asender) != Running
2236 		|| mdev->state.conn < C_CONNECTED;
2237 
2238 	if (drop_it)
2239 		return TRUE;
2240 
2241 	drop_it = !--mdev->ko_count;
2242 	if (!drop_it) {
2243 		dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2244 		       current->comm, current->pid, mdev->ko_count);
2245 		request_ping(mdev);
2246 	}
2247 
2248 	return drop_it; /* && (mdev->state == R_PRIMARY) */
2249 }
2250 
2251 /* The idea of sendpage seems to be to put some kind of reference
2252  * to the page into the skb, and to hand it over to the NIC. In
2253  * this process get_page() gets called.
2254  *
2255  * As soon as the page was really sent over the network put_page()
2256  * gets called by some part of the network layer. [ NIC driver? ]
2257  *
2258  * [ get_page() / put_page() increment/decrement the count. If count
2259  *   reaches 0 the page will be freed. ]
2260  *
2261  * This works nicely with pages from FSs.
2262  * But this means that in protocol A we might signal IO completion too early!
2263  *
2264  * In order not to corrupt data during a resync we must make sure
2265  * that we do not reuse our own buffer pages (EEs) too early, therefore
2266  * we have the net_ee list.
2267  *
2268  * XFS seems to have problems, still, it submits pages with page_count == 0!
2269  * As a workaround, we disable sendpage on pages
2270  * with page_count == 0 or PageSlab.
2271  */
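/* Copying fallback for pages we must not hand to sendpage (see above):
 * kmap() the page and push the data through drbd_send(). */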
2272 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2273 		   int offset, size_t size, unsigned msg_flags)
2274 {
2275 	int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, msg_flags);
2276 	kunmap(page);
2277 	if (sent == size)
2278 		mdev->send_cnt += size>>9;
2279 	return sent == size;
2280 }
2281 
2282 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2283 		    int offset, size_t size, unsigned msg_flags)
2284 {
2285 	mm_segment_t oldfs = get_fs();
2286 	int sent, ok;
2287 	int len = size;
2288 
2289 	/* e.g. XFS meta- & log-data is in slab pages, which have a
2290 	 * page_count of 0 and/or have PageSlab() set.
2291 	 * we cannot use send_page for those, as that does get_page();
2292 	 * put_page(); and would cause either a VM_BUG directly, or
2293 	 * __page_cache_release a page that would actually still be referenced
2294 	 * by someone, leading to some obscure delayed Oops somewhere else. */
2295 	if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2296 		return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
2297 
2298 	msg_flags |= MSG_NOSIGNAL;
2299 	drbd_update_congested(mdev);
2300 	set_fs(KERNEL_DS);
2301 	do {
2302 		sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2303 							offset, len,
2304 							msg_flags);
2305 		if (sent == -EAGAIN) {
2306 			if (we_should_drop_the_connection(mdev,
2307 							  mdev->data.socket))
2308 				break;
2309 			else
2310 				continue;
2311 		}
2312 		if (sent <= 0) {
2313 			dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2314 			     __func__, (int)size, len, sent);
2315 			break;
2316 		}
2317 		len    -= sent;
2318 		offset += sent;
2319 	} while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2320 	set_fs(oldfs);
2321 	clear_bit(NET_CONGESTED, &mdev->flags);
2322 
2323 	ok = (len == 0);
2324 	if (likely(ok))
2325 		mdev->send_cnt += size>>9;
2326 	return ok;
2327 }
2328 
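/* Send all pages of a bio through the copying path.  drbd_send_dblock()
 * uses this for protocol A writes, where handing the pages to sendpage
 * could let the upper layer reuse them before the data is actually on the
 * wire (see the comment above _drbd_no_send_page()). */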
2329 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2330 {
2331 	struct bio_vec *bvec;
2332 	int i;
2333 	/* hint all but last page with MSG_MORE */
2334 	__bio_for_each_segment(bvec, bio, i, 0) {
2335 		if (!_drbd_no_send_page(mdev, bvec->bv_page,
2336 				     bvec->bv_offset, bvec->bv_len,
2337 				     i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2338 			return 0;
2339 	}
2340 	return 1;
2341 }
2342 
2343 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2344 {
2345 	struct bio_vec *bvec;
2346 	int i;
2347 	/* hint all but last page with MSG_MORE */
2348 	__bio_for_each_segment(bvec, bio, i, 0) {
2349 		if (!_drbd_send_page(mdev, bvec->bv_page,
2350 				     bvec->bv_offset, bvec->bv_len,
2351 				     i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2352 			return 0;
2353 	}
2354 	return 1;
2355 }
2356 
2357 static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2358 {
2359 	struct page *page = e->pages;
2360 	unsigned len = e->size;
2361 	/* hint all but last page with MSG_MORE */
2362 	page_chain_for_each(page) {
2363 		unsigned l = min_t(unsigned, len, PAGE_SIZE);
2364 		if (!_drbd_send_page(mdev, page, 0, l,
2365 				page_chain_next(page) ? MSG_MORE : 0))
2366 			return 0;
2367 		len -= l;
2368 	}
2369 	return 1;
2370 }
2371 
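/* Trigger a new pair of delay probes while we are SyncSource on an agreed
 * protocol >= 93 and have sent at least 2 * dp_volume sectors (send_cnt is
 * kept in 512-byte sectors) since the last probe. */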
2372 static void consider_delay_probes(struct drbd_conf *mdev)
2373 {
2374 	if (mdev->state.conn != C_SYNC_SOURCE || mdev->agreed_pro_version < 93)
2375 		return;
2376 
2377 	if (mdev->dp_volume_last + mdev->sync_conf.dp_volume * 2 < mdev->send_cnt)
2378 		drbd_send_delay_probes(mdev);
2379 }
2380 
2381 static int w_delay_probes(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
2382 {
2383 	if (!cancel && mdev->state.conn == C_SYNC_SOURCE)
2384 		drbd_send_delay_probes(mdev);
2385 
2386 	return 1;
2387 }
2388 
2389 static void delay_probe_timer_fn(unsigned long data)
2390 {
2391 	struct drbd_conf *mdev = (struct drbd_conf *) data;
2392 
2393 	if (list_empty(&mdev->delay_probe_work.list))
2394 		drbd_queue_work(&mdev->data.work, &mdev->delay_probe_work);
2395 }
2396 
2397 /* Used to send write requests
2398  * R_PRIMARY -> Peer	(P_DATA)
2399  */
2400 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2401 {
2402 	int ok = 1;
2403 	struct p_data p;
2404 	unsigned int dp_flags = 0;
2405 	void *dgb;
2406 	int dgs;
2407 
2408 	if (!drbd_get_data_sock(mdev))
2409 		return 0;
2410 
2411 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2412 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2413 
2414 	p.head.magic   = BE_DRBD_MAGIC;
2415 	p.head.command = cpu_to_be16(P_DATA);
2416 	p.head.length  =
2417 		cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + req->size);
2418 
2419 	p.sector   = cpu_to_be64(req->sector);
2420 	p.block_id = (unsigned long)req;
2421 	p.seq_num  = cpu_to_be32(req->seq_num =
2422 				 atomic_add_return(1, &mdev->packet_seq));
2423 	dp_flags = 0;
2424 
2425 	/* NOTE: no need to check if barriers supported here as we would
2426 	 *       not pass the test in make_request_common in that case
2427 	 */
2428 	if (bio_rw_flagged(req->master_bio, BIO_RW_BARRIER)) {
2429 		dev_err(DEV, "ASSERT FAILED would have set DP_HARDBARRIER\n");
2430 		/* dp_flags |= DP_HARDBARRIER; */
2431 	}
2432 	if (bio_rw_flagged(req->master_bio, BIO_RW_SYNCIO))
2433 		dp_flags |= DP_RW_SYNC;
2434 	/* for now handle SYNCIO and UNPLUG
2435 	 * as if they still were one and the same flag */
2436 	if (bio_rw_flagged(req->master_bio, BIO_RW_UNPLUG))
2437 		dp_flags |= DP_RW_SYNC;
2438 	if (mdev->state.conn >= C_SYNC_SOURCE &&
2439 	    mdev->state.conn <= C_PAUSED_SYNC_T)
2440 		dp_flags |= DP_MAY_SET_IN_SYNC;
2441 
2442 	p.dp_flags = cpu_to_be32(dp_flags);
2443 	set_bit(UNPLUG_REMOTE, &mdev->flags);
2444 	ok = (sizeof(p) ==
2445 		drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
2446 	if (ok && dgs) {
2447 		dgb = mdev->int_dig_out;
2448 		drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2449 		ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2450 	}
2451 	if (ok) {
2452 		if (mdev->net_conf->wire_protocol == DRBD_PROT_A)
2453 			ok = _drbd_send_bio(mdev, req->master_bio);
2454 		else
2455 			ok = _drbd_send_zc_bio(mdev, req->master_bio);
2456 	}
2457 
2458 	drbd_put_data_sock(mdev);
2459 
2460 	if (ok)
2461 		consider_delay_probes(mdev);
2462 
2463 	return ok;
2464 }
2465 
2466 /* answer packet, used to send data back for read requests:
2467  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2468  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2469  */
2470 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2471 		    struct drbd_epoch_entry *e)
2472 {
2473 	int ok;
2474 	struct p_data p;
2475 	void *dgb;
2476 	int dgs;
2477 
2478 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2479 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2480 
2481 	p.head.magic   = BE_DRBD_MAGIC;
2482 	p.head.command = cpu_to_be16(cmd);
2483 	p.head.length  =
2484 		cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + e->size);
2485 
2486 	p.sector   = cpu_to_be64(e->sector);
2487 	p.block_id = e->block_id;
2488 	/* p.seq_num  = 0;    No sequence numbers here.. */
2489 
2490 	/* Only called by our kernel thread.
2491 	 * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2492 	 * in response to an admin command or module unload.
2493 	 */
2494 	if (!drbd_get_data_sock(mdev))
2495 		return 0;
2496 
2497 	ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p,
2498 					sizeof(p), dgs ? MSG_MORE : 0);
2499 	if (ok && dgs) {
2500 		dgb = mdev->int_dig_out;
2501 		drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
2502 		ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2503 	}
2504 	if (ok)
2505 		ok = _drbd_send_zc_ee(mdev, e);
2506 
2507 	drbd_put_data_sock(mdev);
2508 
2509 	if (ok)
2510 		consider_delay_probes(mdev);
2511 
2512 	return ok;
2513 }
2514 
2515 /*
2516   drbd_send distinguishes two cases:
2517 
2518   Packets sent via the data socket "sock"
2519   and packets sent via the meta data socket "msock"
2520 
2521 		    sock                      msock
2522   -----------------+-------------------------+------------------------------
2523   timeout           conf.timeout / 2          conf.timeout / 2
2524   timeout action    send a ping via msock     Abort communication
2525 					      and close all sockets
2526 */
2527 
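/* Typical caller pattern (sketch, mirroring drbd_send_cmd2() above): take
 * the data socket via drbd_get_data_sock(), push header then payload, and
 * drop the socket again:
 *
 *	if (drbd_get_data_sock(mdev)) {
 *		ok = sizeof(h) == drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0);
 *		ok = ok && size == drbd_send(mdev, mdev->data.socket, data, size, 0);
 *		drbd_put_data_sock(mdev);
 *	}
 */
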
2528 /*
2529  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
2530  */
2531 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2532 	      void *buf, size_t size, unsigned msg_flags)
2533 {
2534 	struct kvec iov;
2535 	struct msghdr msg;
2536 	int rv, sent = 0;
2537 
2538 	if (!sock)
2539 		return -1000;
2540 
2541 	/* THINK  if (signal_pending) return ... ? */
2542 
2543 	iov.iov_base = buf;
2544 	iov.iov_len  = size;
2545 
2546 	msg.msg_name       = NULL;
2547 	msg.msg_namelen    = 0;
2548 	msg.msg_control    = NULL;
2549 	msg.msg_controllen = 0;
2550 	msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
2551 
2552 	if (sock == mdev->data.socket) {
2553 		mdev->ko_count = mdev->net_conf->ko_count;
2554 		drbd_update_congested(mdev);
2555 	}
2556 	do {
2557 		/* STRANGE
2558 		 * tcp_sendmsg does _not_ use its size parameter at all ?
2559 		 *
2560 		 * -EAGAIN on timeout, -EINTR on signal.
2561 		 */
2562 /* THINK
2563  * do we need to block DRBD_SIG if sock == &meta.socket ??
2564  * otherwise wake_asender() might interrupt some send_*Ack !
2565  */
2566 		rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
2567 		if (rv == -EAGAIN) {
2568 			if (we_should_drop_the_connection(mdev, sock))
2569 				break;
2570 			else
2571 				continue;
2572 		}
2573 		D_ASSERT(rv != 0);
2574 		if (rv == -EINTR) {
2575 			flush_signals(current);
2576 			rv = 0;
2577 		}
2578 		if (rv < 0)
2579 			break;
2580 		sent += rv;
2581 		iov.iov_base += rv;
2582 		iov.iov_len  -= rv;
2583 	} while (sent < size);
2584 
2585 	if (sock == mdev->data.socket)
2586 		clear_bit(NET_CONGESTED, &mdev->flags);
2587 
2588 	if (rv <= 0) {
2589 		if (rv != -EAGAIN) {
2590 			dev_err(DEV, "%s_sendmsg returned %d\n",
2591 			    sock == mdev->meta.socket ? "msock" : "sock",
2592 			    rv);
2593 			drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
2594 		} else
2595 			drbd_force_state(mdev, NS(conn, C_TIMEOUT));
2596 	}
2597 
2598 	return sent;
2599 }
2600 
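/* Block device open: while we are not Primary, refuse writes with -EROFS
 * and read-only opens with -EMEDIUMTYPE unless allow_oos is set; open_cnt
 * is updated under req_lock so it stays consistent with role changes. */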
2601 static int drbd_open(struct block_device *bdev, fmode_t mode)
2602 {
2603 	struct drbd_conf *mdev = bdev->bd_disk->private_data;
2604 	unsigned long flags;
2605 	int rv = 0;
2606 
2607 	spin_lock_irqsave(&mdev->req_lock, flags);
2608 	/* to have a stable mdev->state.role
2609 	 * and no race with updating open_cnt */
2610 
2611 	if (mdev->state.role != R_PRIMARY) {
2612 		if (mode & FMODE_WRITE)
2613 			rv = -EROFS;
2614 		else if (!allow_oos)
2615 			rv = -EMEDIUMTYPE;
2616 	}
2617 
2618 	if (!rv)
2619 		mdev->open_cnt++;
2620 	spin_unlock_irqrestore(&mdev->req_lock, flags);
2621 
2622 	return rv;
2623 }
2624 
2625 static int drbd_release(struct gendisk *gd, fmode_t mode)
2626 {
2627 	struct drbd_conf *mdev = gd->private_data;
2628 	mdev->open_cnt--;
2629 	return 0;
2630 }
2631 
2632 static void drbd_unplug_fn(struct request_queue *q)
2633 {
2634 	struct drbd_conf *mdev = q->queuedata;
2635 
2636 	/* unplug FIRST */
2637 	spin_lock_irq(q->queue_lock);
2638 	blk_remove_plug(q);
2639 	spin_unlock_irq(q->queue_lock);
2640 
2641 	/* only if connected */
2642 	spin_lock_irq(&mdev->req_lock);
2643 	if (mdev->state.pdsk >= D_INCONSISTENT && mdev->state.conn >= C_CONNECTED) {
2644 		D_ASSERT(mdev->state.role == R_PRIMARY);
2645 		if (test_and_clear_bit(UNPLUG_REMOTE, &mdev->flags)) {
2646 			/* add to the data.work queue,
2647 			 * unless already queued.
2648 			 * XXX this might be a good addition to drbd_queue_work
2649 			 * anyways, to detect "double queuing" ... */
2650 			if (list_empty(&mdev->unplug_work.list))
2651 				drbd_queue_work(&mdev->data.work,
2652 						&mdev->unplug_work);
2653 		}
2654 	}
2655 	spin_unlock_irq(&mdev->req_lock);
2656 
2657 	if (mdev->state.disk >= D_INCONSISTENT)
2658 		drbd_kick_lo(mdev);
2659 }
2660 
2661 static void drbd_set_defaults(struct drbd_conf *mdev)
2662 {
2663 	mdev->sync_conf.after      = DRBD_AFTER_DEF;
2664 	mdev->sync_conf.rate       = DRBD_RATE_DEF;
2665 	mdev->sync_conf.al_extents = DRBD_AL_EXTENTS_DEF;
2666 	mdev->state = (union drbd_state) {
2667 		{ .role = R_SECONDARY,
2668 		  .peer = R_UNKNOWN,
2669 		  .conn = C_STANDALONE,
2670 		  .disk = D_DISKLESS,
2671 		  .pdsk = D_UNKNOWN,
2672 		  .susp = 0
2673 		} };
2674 }
2675 
2676 void drbd_init_set_defaults(struct drbd_conf *mdev)
2677 {
2678 	/* the memset(,0,) did most of this.
2679 	 * note: only assignments, no allocation in here */
2680 
2681 	drbd_set_defaults(mdev);
2682 
2683 	/* for now, we do NOT yet support it,
2684 	 * even though we start some framework
2685 	 * to eventually support barriers */
2686 	set_bit(NO_BARRIER_SUPP, &mdev->flags);
2687 
2688 	atomic_set(&mdev->ap_bio_cnt, 0);
2689 	atomic_set(&mdev->ap_pending_cnt, 0);
2690 	atomic_set(&mdev->rs_pending_cnt, 0);
2691 	atomic_set(&mdev->unacked_cnt, 0);
2692 	atomic_set(&mdev->local_cnt, 0);
2693 	atomic_set(&mdev->net_cnt, 0);
2694 	atomic_set(&mdev->packet_seq, 0);
2695 	atomic_set(&mdev->pp_in_use, 0);
2696 
2697 	mutex_init(&mdev->md_io_mutex);
2698 	mutex_init(&mdev->data.mutex);
2699 	mutex_init(&mdev->meta.mutex);
2700 	sema_init(&mdev->data.work.s, 0);
2701 	sema_init(&mdev->meta.work.s, 0);
2702 	mutex_init(&mdev->state_mutex);
2703 
2704 	spin_lock_init(&mdev->data.work.q_lock);
2705 	spin_lock_init(&mdev->meta.work.q_lock);
2706 
2707 	spin_lock_init(&mdev->al_lock);
2708 	spin_lock_init(&mdev->req_lock);
2709 	spin_lock_init(&mdev->peer_seq_lock);
2710 	spin_lock_init(&mdev->epoch_lock);
2711 
2712 	INIT_LIST_HEAD(&mdev->active_ee);
2713 	INIT_LIST_HEAD(&mdev->sync_ee);
2714 	INIT_LIST_HEAD(&mdev->done_ee);
2715 	INIT_LIST_HEAD(&mdev->read_ee);
2716 	INIT_LIST_HEAD(&mdev->net_ee);
2717 	INIT_LIST_HEAD(&mdev->resync_reads);
2718 	INIT_LIST_HEAD(&mdev->data.work.q);
2719 	INIT_LIST_HEAD(&mdev->meta.work.q);
2720 	INIT_LIST_HEAD(&mdev->resync_work.list);
2721 	INIT_LIST_HEAD(&mdev->unplug_work.list);
2722 	INIT_LIST_HEAD(&mdev->md_sync_work.list);
2723 	INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2724 	INIT_LIST_HEAD(&mdev->delay_probes);
2725 	INIT_LIST_HEAD(&mdev->delay_probe_work.list);
2726 
2727 	mdev->resync_work.cb  = w_resync_inactive;
2728 	mdev->unplug_work.cb  = w_send_write_hint;
2729 	mdev->md_sync_work.cb = w_md_sync;
2730 	mdev->bm_io_work.w.cb = w_bitmap_io;
2731 	mdev->delay_probe_work.cb = w_delay_probes;
2732 	init_timer(&mdev->resync_timer);
2733 	init_timer(&mdev->md_sync_timer);
2734 	init_timer(&mdev->delay_probe_timer);
2735 	mdev->resync_timer.function = resync_timer_fn;
2736 	mdev->resync_timer.data = (unsigned long) mdev;
2737 	mdev->md_sync_timer.function = md_sync_timer_fn;
2738 	mdev->md_sync_timer.data = (unsigned long) mdev;
2739 	mdev->delay_probe_timer.function = delay_probe_timer_fn;
2740 	mdev->delay_probe_timer.data = (unsigned long) mdev;
2741 
2742 
2743 	init_waitqueue_head(&mdev->misc_wait);
2744 	init_waitqueue_head(&mdev->state_wait);
2745 	init_waitqueue_head(&mdev->ee_wait);
2746 	init_waitqueue_head(&mdev->al_wait);
2747 	init_waitqueue_head(&mdev->seq_wait);
2748 
2749 	drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
2750 	drbd_thread_init(mdev, &mdev->worker, drbd_worker);
2751 	drbd_thread_init(mdev, &mdev->asender, drbd_asender);
2752 
2753 	mdev->agreed_pro_version = PRO_VERSION_MAX;
2754 	mdev->write_ordering = WO_bio_barrier;
2755 	mdev->resync_wenr = LC_FREE;
2756 }
2757 
2758 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2759 {
2760 	if (mdev->receiver.t_state != None)
2761 		dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2762 				mdev->receiver.t_state);
2763 
2764 	/* no need to lock it, I'm the only thread alive */
2765 	if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2766 		dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2767 	mdev->al_writ_cnt  =
2768 	mdev->bm_writ_cnt  =
2769 	mdev->read_cnt     =
2770 	mdev->recv_cnt     =
2771 	mdev->send_cnt     =
2772 	mdev->writ_cnt     =
2773 	mdev->p_size       =
2774 	mdev->rs_start     =
2775 	mdev->rs_total     =
2776 	mdev->rs_failed    =
2777 	mdev->rs_mark_left =
2778 	mdev->rs_mark_time = 0;
2779 	D_ASSERT(mdev->net_conf == NULL);
2780 
2781 	drbd_set_my_capacity(mdev, 0);
2782 	if (mdev->bitmap) {
2783 		/* maybe never allocated. */
2784 		drbd_bm_resize(mdev, 0, 1);
2785 		drbd_bm_cleanup(mdev);
2786 	}
2787 
2788 	drbd_free_resources(mdev);
2789 
2790 	/*
2791 	 * currently we drbd_init_ee only on module load, so
2792 	 * we may do drbd_release_ee only on module unload!
2793 	 */
2794 	D_ASSERT(list_empty(&mdev->active_ee));
2795 	D_ASSERT(list_empty(&mdev->sync_ee));
2796 	D_ASSERT(list_empty(&mdev->done_ee));
2797 	D_ASSERT(list_empty(&mdev->read_ee));
2798 	D_ASSERT(list_empty(&mdev->net_ee));
2799 	D_ASSERT(list_empty(&mdev->resync_reads));
2800 	D_ASSERT(list_empty(&mdev->data.work.q));
2801 	D_ASSERT(list_empty(&mdev->meta.work.q));
2802 	D_ASSERT(list_empty(&mdev->resync_work.list));
2803 	D_ASSERT(list_empty(&mdev->unplug_work.list));
2804 
2805 }
2806 
2807 
2808 static void drbd_destroy_mempools(void)
2809 {
2810 	struct page *page;
2811 
2812 	while (drbd_pp_pool) {
2813 		page = drbd_pp_pool;
2814 		drbd_pp_pool = (struct page *)page_private(page);
2815 		__free_page(page);
2816 		drbd_pp_vacant--;
2817 	}
2818 
2819 	/* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2820 
2821 	if (drbd_ee_mempool)
2822 		mempool_destroy(drbd_ee_mempool);
2823 	if (drbd_request_mempool)
2824 		mempool_destroy(drbd_request_mempool);
2825 	if (drbd_ee_cache)
2826 		kmem_cache_destroy(drbd_ee_cache);
2827 	if (drbd_request_cache)
2828 		kmem_cache_destroy(drbd_request_cache);
2829 	if (drbd_bm_ext_cache)
2830 		kmem_cache_destroy(drbd_bm_ext_cache);
2831 	if (drbd_al_ext_cache)
2832 		kmem_cache_destroy(drbd_al_ext_cache);
2833 
2834 	drbd_ee_mempool      = NULL;
2835 	drbd_request_mempool = NULL;
2836 	drbd_ee_cache        = NULL;
2837 	drbd_request_cache   = NULL;
2838 	drbd_bm_ext_cache    = NULL;
2839 	drbd_al_ext_cache    = NULL;
2840 
2841 	return;
2842 }
2843 
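/* Allocate the slab caches, the request/EE mempools and DRBD's private page
 * pool: "number" pages, enough for DRBD_MAX_SEGMENT_SIZE worth of payload
 * per configured minor, chained through page_private(). */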
2844 static int drbd_create_mempools(void)
2845 {
2846 	struct page *page;
2847 	const int number = (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE) * minor_count;
2848 	int i;
2849 
2850 	/* prepare our caches and mempools */
2851 	drbd_request_mempool = NULL;
2852 	drbd_ee_cache        = NULL;
2853 	drbd_request_cache   = NULL;
2854 	drbd_bm_ext_cache    = NULL;
2855 	drbd_al_ext_cache    = NULL;
2856 	drbd_pp_pool         = NULL;
2857 
2858 	/* caches */
2859 	drbd_request_cache = kmem_cache_create(
2860 		"drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2861 	if (drbd_request_cache == NULL)
2862 		goto Enomem;
2863 
2864 	drbd_ee_cache = kmem_cache_create(
2865 		"drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
2866 	if (drbd_ee_cache == NULL)
2867 		goto Enomem;
2868 
2869 	drbd_bm_ext_cache = kmem_cache_create(
2870 		"drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2871 	if (drbd_bm_ext_cache == NULL)
2872 		goto Enomem;
2873 
2874 	drbd_al_ext_cache = kmem_cache_create(
2875 		"drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2876 	if (drbd_al_ext_cache == NULL)
2877 		goto Enomem;
2878 
2879 	/* mempools */
2880 	drbd_request_mempool = mempool_create(number,
2881 		mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2882 	if (drbd_request_mempool == NULL)
2883 		goto Enomem;
2884 
2885 	drbd_ee_mempool = mempool_create(number,
2886 		mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2887 	if (drbd_ee_mempool == NULL)
2888 		goto Enomem;
2889 
2890 	/* drbd's page pool */
2891 	spin_lock_init(&drbd_pp_lock);
2892 
2893 	for (i = 0; i < number; i++) {
2894 		page = alloc_page(GFP_HIGHUSER);
2895 		if (!page)
2896 			goto Enomem;
2897 		set_page_private(page, (unsigned long)drbd_pp_pool);
2898 		drbd_pp_pool = page;
2899 	}
2900 	drbd_pp_vacant = number;
2901 
2902 	return 0;
2903 
2904 Enomem:
2905 	drbd_destroy_mempools(); /* in case we allocated some */
2906 	return -ENOMEM;
2907 }
2908 
2909 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2910 	void *unused)
2911 {
2912 	/* just so we have it.  you never know what interesting things we
2913 	 * might want to do here some day...
2914 	 */
2915 
2916 	return NOTIFY_DONE;
2917 }
2918 
2919 static struct notifier_block drbd_notifier = {
2920 	.notifier_call = drbd_notify_sys,
2921 };
2922 
2923 static void drbd_release_ee_lists(struct drbd_conf *mdev)
2924 {
2925 	int rr;
2926 
2927 	rr = drbd_release_ee(mdev, &mdev->active_ee);
2928 	if (rr)
2929 		dev_err(DEV, "%d EEs in active list found!\n", rr);
2930 
2931 	rr = drbd_release_ee(mdev, &mdev->sync_ee);
2932 	if (rr)
2933 		dev_err(DEV, "%d EEs in sync list found!\n", rr);
2934 
2935 	rr = drbd_release_ee(mdev, &mdev->read_ee);
2936 	if (rr)
2937 		dev_err(DEV, "%d EEs in read list found!\n", rr);
2938 
2939 	rr = drbd_release_ee(mdev, &mdev->done_ee);
2940 	if (rr)
2941 		dev_err(DEV, "%d EEs in done list found!\n", rr);
2942 
2943 	rr = drbd_release_ee(mdev, &mdev->net_ee);
2944 	if (rr)
2945 		dev_err(DEV, "%d EEs in net list found!\n", rr);
2946 }
2947 
2948 /* caution. no locking.
2949  * currently only used from module cleanup code. */
2950 static void drbd_delete_device(unsigned int minor)
2951 {
2952 	struct drbd_conf *mdev = minor_to_mdev(minor);
2953 
2954 	if (!mdev)
2955 		return;
2956 
2957 	/* paranoia asserts */
2958 	if (mdev->open_cnt != 0)
2959 		dev_err(DEV, "open_cnt = %d in %s:%u\n", mdev->open_cnt,
2960 				__FILE__ , __LINE__);
2961 
2962 	ERR_IF (!list_empty(&mdev->data.work.q)) {
2963 		struct list_head *lp;
2964 		list_for_each(lp, &mdev->data.work.q) {
2965 			dev_err(DEV, "lp = %p\n", lp);
2966 		}
2967 	};
2968 	/* end paranoia asserts */
2969 
2970 	del_gendisk(mdev->vdisk);
2971 
2972 	/* cleanup stuff that may have been allocated during
2973 	 * device (re-)configuration or state changes */
2974 
2975 	if (mdev->this_bdev)
2976 		bdput(mdev->this_bdev);
2977 
2978 	drbd_free_resources(mdev);
2979 
2980 	drbd_release_ee_lists(mdev);
2981 
2982 	/* should be free'd on disconnect? */
2983 	kfree(mdev->ee_hash);
2984 	/*
2985 	mdev->ee_hash_s = 0;
2986 	mdev->ee_hash = NULL;
2987 	*/
2988 
2989 	lc_destroy(mdev->act_log);
2990 	lc_destroy(mdev->resync);
2991 
2992 	kfree(mdev->p_uuid);
2993 	/* mdev->p_uuid = NULL; */
2994 
2995 	kfree(mdev->int_dig_out);
2996 	kfree(mdev->int_dig_in);
2997 	kfree(mdev->int_dig_vv);
2998 
2999 	/* cleanup the rest that has been
3000 	 * allocated from drbd_new_device
3001 	 * and actually free the mdev itself */
3002 	drbd_free_mdev(mdev);
3003 }
3004 
3005 static void drbd_cleanup(void)
3006 {
3007 	unsigned int i;
3008 
3009 	unregister_reboot_notifier(&drbd_notifier);
3010 
3011 	drbd_nl_cleanup();
3012 
3013 	if (minor_table) {
3014 		if (drbd_proc)
3015 			remove_proc_entry("drbd", NULL);
3016 		i = minor_count;
3017 		while (i--)
3018 			drbd_delete_device(i);
3019 		drbd_destroy_mempools();
3020 	}
3021 
3022 	kfree(minor_table);
3023 
3024 	unregister_blkdev(DRBD_MAJOR, "drbd");
3025 
3026 	printk(KERN_INFO "drbd: module cleanup done.\n");
3027 }
3028 
3029 /**
3030  * drbd_congested() - Callback for pdflush
3031  * @congested_data:	User data
3032  * @bdi_bits:		Bits pdflush is currently interested in
3033  *
3034  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
3035  */
3036 static int drbd_congested(void *congested_data, int bdi_bits)
3037 {
3038 	struct drbd_conf *mdev = congested_data;
3039 	struct request_queue *q;
3040 	char reason = '-';
3041 	int r = 0;
3042 
3043 	if (!__inc_ap_bio_cond(mdev)) {
3044 		/* DRBD has frozen IO */
3045 		r = bdi_bits;
3046 		reason = 'd';
3047 		goto out;
3048 	}
3049 
3050 	if (get_ldev(mdev)) {
3051 		q = bdev_get_queue(mdev->ldev->backing_bdev);
3052 		r = bdi_congested(&q->backing_dev_info, bdi_bits);
3053 		put_ldev(mdev);
3054 		if (r)
3055 			reason = 'b';
3056 	}
3057 
3058 	if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
3059 		r |= (1 << BDI_async_congested);
3060 		reason = reason == 'b' ? 'a' : 'n';
3061 	}
3062 
3063 out:
3064 	mdev->congestion_reason = reason;
3065 	return r;
3066 }
3067 
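/* Allocate and wire up everything for one minor: the drbd_conf itself, its
 * request queue and gendisk, the meta data IO page, bitmap, transfer log,
 * application read hash and the initial epoch.  Returns NULL (after a full
 * unwind) if any allocation fails. */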
3068 struct drbd_conf *drbd_new_device(unsigned int minor)
3069 {
3070 	struct drbd_conf *mdev;
3071 	struct gendisk *disk;
3072 	struct request_queue *q;
3073 
3074 	/* GFP_KERNEL, we are outside of all write-out paths */
3075 	mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
3076 	if (!mdev)
3077 		return NULL;
3078 	if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
3079 		goto out_no_cpumask;
3080 
3081 	mdev->minor = minor;
3082 
3083 	drbd_init_set_defaults(mdev);
3084 
3085 	q = blk_alloc_queue(GFP_KERNEL);
3086 	if (!q)
3087 		goto out_no_q;
3088 	mdev->rq_queue = q;
3089 	q->queuedata   = mdev;
3090 
3091 	disk = alloc_disk(1);
3092 	if (!disk)
3093 		goto out_no_disk;
3094 	mdev->vdisk = disk;
3095 
3096 	set_disk_ro(disk, TRUE);
3097 
3098 	disk->queue = q;
3099 	disk->major = DRBD_MAJOR;
3100 	disk->first_minor = minor;
3101 	disk->fops = &drbd_ops;
3102 	sprintf(disk->disk_name, "drbd%d", minor);
3103 	disk->private_data = mdev;
3104 
3105 	mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3106 	/* we have no partitions. we contain only ourselves. */
3107 	mdev->this_bdev->bd_contains = mdev->this_bdev;
3108 
3109 	q->backing_dev_info.congested_fn = drbd_congested;
3110 	q->backing_dev_info.congested_data = mdev;
3111 
3112 	blk_queue_make_request(q, drbd_make_request_26);
3113 	blk_queue_max_segment_size(q, DRBD_MAX_SEGMENT_SIZE);
3114 	blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3115 	blk_queue_merge_bvec(q, drbd_merge_bvec);
3116 	q->queue_lock = &mdev->req_lock; /* needed since we use */
3117 		/* plugging on a queue, that actually has no requests! */
3118 	q->unplug_fn = drbd_unplug_fn;
3119 
3120 	mdev->md_io_page = alloc_page(GFP_KERNEL);
3121 	if (!mdev->md_io_page)
3122 		goto out_no_io_page;
3123 
3124 	if (drbd_bm_init(mdev))
3125 		goto out_no_bitmap;
3126 	/* no need to lock access, we are still initializing this minor device. */
3127 	if (!tl_init(mdev))
3128 		goto out_no_tl;
3129 
3130 	mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3131 	if (!mdev->app_reads_hash)
3132 		goto out_no_app_reads;
3133 
3134 	mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3135 	if (!mdev->current_epoch)
3136 		goto out_no_epoch;
3137 
3138 	INIT_LIST_HEAD(&mdev->current_epoch->list);
3139 	mdev->epochs = 1;
3140 
3141 	return mdev;
3142 
3143 /* out_whatever_else:
3144 	kfree(mdev->current_epoch); */
3145 out_no_epoch:
3146 	kfree(mdev->app_reads_hash);
3147 out_no_app_reads:
3148 	tl_cleanup(mdev);
3149 out_no_tl:
3150 	drbd_bm_cleanup(mdev);
3151 out_no_bitmap:
3152 	__free_page(mdev->md_io_page);
3153 out_no_io_page:
3154 	put_disk(disk);
3155 out_no_disk:
3156 	blk_cleanup_queue(q);
3157 out_no_q:
3158 	free_cpumask_var(mdev->cpu_mask);
3159 out_no_cpumask:
3160 	kfree(mdev);
3161 	return NULL;
3162 }
3163 
3164 /* counterpart of drbd_new_device.
3165  * last part of drbd_delete_device. */
3166 void drbd_free_mdev(struct drbd_conf *mdev)
3167 {
3168 	kfree(mdev->current_epoch);
3169 	kfree(mdev->app_reads_hash);
3170 	tl_cleanup(mdev);
3171 	if (mdev->bitmap) /* should no longer be there. */
3172 		drbd_bm_cleanup(mdev);
3173 	__free_page(mdev->md_io_page);
3174 	put_disk(mdev->vdisk);
3175 	blk_cleanup_queue(mdev->rq_queue);
3176 	free_cpumask_var(mdev->cpu_mask);
3177 	kfree(mdev);
3178 }
3179 
3180 
3181 int __init drbd_init(void)
3182 {
3183 	int err;
3184 
3185 	if (sizeof(struct p_handshake) != 80) {
3186 		printk(KERN_ERR
3187 		       "drbd: never change the size or layout "
3188 		       "of the HandShake packet.\n");
3189 		return -EINVAL;
3190 	}
3191 
3192 	if (1 > minor_count || minor_count > 255) {
3193 		printk(KERN_ERR
3194 			"drbd: invalid minor_count (%d)\n", minor_count);
3195 #ifdef MODULE
3196 		return -EINVAL;
3197 #else
3198 		minor_count = 8;
3199 #endif
3200 	}
3201 
3202 	err = drbd_nl_init();
3203 	if (err)
3204 		return err;
3205 
3206 	err = register_blkdev(DRBD_MAJOR, "drbd");
3207 	if (err) {
3208 		printk(KERN_ERR
3209 		       "drbd: unable to register block device major %d\n",
3210 		       DRBD_MAJOR);
3211 		return err;
3212 	}
3213 
3214 	register_reboot_notifier(&drbd_notifier);
3215 
3216 	/*
3217 	 * allocate all necessary structs
3218 	 */
3219 	err = -ENOMEM;
3220 
3221 	init_waitqueue_head(&drbd_pp_wait);
3222 
3223 	drbd_proc = NULL; /* play safe for drbd_cleanup */
3224 	minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3225 				GFP_KERNEL);
3226 	if (!minor_table)
3227 		goto Enomem;
3228 
3229 	err = drbd_create_mempools();
3230 	if (err)
3231 		goto Enomem;
3232 
3233 	drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
3234 	if (!drbd_proc)	{
3235 		printk(KERN_ERR "drbd: unable to register proc file\n");
3236 		goto Enomem;
3237 	}
3238 
3239 	rwlock_init(&global_state_lock);
3240 
3241 	printk(KERN_INFO "drbd: initialized. "
3242 	       "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3243 	       API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3244 	printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3245 	printk(KERN_INFO "drbd: registered as block device major %d\n",
3246 		DRBD_MAJOR);
3247 	printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3248 
3249 	return 0; /* Success! */
3250 
3251 Enomem:
3252 	drbd_cleanup();
3253 	if (err == -ENOMEM)
3254 		/* currently always the case */
3255 		printk(KERN_ERR "drbd: ran out of memory\n");
3256 	else
3257 		printk(KERN_ERR "drbd: initialization failure\n");
3258 	return err;
3259 }
3260 
3261 void drbd_free_bc(struct drbd_backing_dev *ldev)
3262 {
3263 	if (ldev == NULL)
3264 		return;
3265 
3266 	bd_release(ldev->backing_bdev);
3267 	bd_release(ldev->md_bdev);
3268 
3269 	fput(ldev->lo_file);
3270 	fput(ldev->md_file);
3271 
3272 	kfree(ldev);
3273 }
3274 
3275 void drbd_free_sock(struct drbd_conf *mdev)
3276 {
3277 	if (mdev->data.socket) {
3278 		mutex_lock(&mdev->data.mutex);
3279 		kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3280 		sock_release(mdev->data.socket);
3281 		mdev->data.socket = NULL;
3282 		mutex_unlock(&mdev->data.mutex);
3283 	}
3284 	if (mdev->meta.socket) {
3285 		mutex_lock(&mdev->meta.mutex);
3286 		kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3287 		sock_release(mdev->meta.socket);
3288 		mdev->meta.socket = NULL;
3289 		mutex_unlock(&mdev->meta.mutex);
3290 	}
3291 }
3292 
3293 
3294 void drbd_free_resources(struct drbd_conf *mdev)
3295 {
3296 	crypto_free_hash(mdev->csums_tfm);
3297 	mdev->csums_tfm = NULL;
3298 	crypto_free_hash(mdev->verify_tfm);
3299 	mdev->verify_tfm = NULL;
3300 	crypto_free_hash(mdev->cram_hmac_tfm);
3301 	mdev->cram_hmac_tfm = NULL;
3302 	crypto_free_hash(mdev->integrity_w_tfm);
3303 	mdev->integrity_w_tfm = NULL;
3304 	crypto_free_hash(mdev->integrity_r_tfm);
3305 	mdev->integrity_r_tfm = NULL;
3306 
3307 	drbd_free_sock(mdev);
3308 
3309 	__no_warn(local,
3310 		  drbd_free_bc(mdev->ldev);
3311 		  mdev->ldev = NULL;);
3312 }
3313 
3314 /* meta data management */
3315 
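/* On-disk layout of the meta data super block; all multi-byte fields are
 * big endian.  Written by drbd_md_sync(), validated field by field in
 * drbd_md_read() below. */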
3316 struct meta_data_on_disk {
3317 	u64 la_size;           /* last agreed size. */
3318 	u64 uuid[UI_SIZE];   /* UUIDs. */
3319 	u64 device_uuid;
3320 	u64 reserved_u64_1;
3321 	u32 flags;             /* MDF */
3322 	u32 magic;
3323 	u32 md_size_sect;
3324 	u32 al_offset;         /* offset to this block */
3325 	u32 al_nr_extents;     /* important for restoring the AL */
3326 	      /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3327 	u32 bm_offset;         /* offset to the bitmap, from here */
3328 	u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3329 	u32 reserved_u32[4];
3330 
3331 } __packed;
3332 
3333 /**
3334  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3335  * @mdev:	DRBD device.
3336  */
3337 void drbd_md_sync(struct drbd_conf *mdev)
3338 {
3339 	struct meta_data_on_disk *buffer;
3340 	sector_t sector;
3341 	int i;
3342 
3343 	if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3344 		return;
3345 	del_timer(&mdev->md_sync_timer);
3346 
3347 	/* We use D_FAILED here and not D_ATTACHING because we try to write
3348 	 * metadata even if we detach due to a disk failure! */
3349 	if (!get_ldev_if_state(mdev, D_FAILED))
3350 		return;
3351 
3352 	mutex_lock(&mdev->md_io_mutex);
3353 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3354 	memset(buffer, 0, 512);
3355 
3356 	buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3357 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3358 		buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3359 	buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3360 	buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3361 
3362 	buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3363 	buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3364 	buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3365 	buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3366 	buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3367 
3368 	buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3369 
3370 	D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3371 	sector = mdev->ldev->md.md_offset;
3372 
3373 	if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3374 		clear_bit(MD_DIRTY, &mdev->flags);
3375 	} else {
3376 		/* this was a try anyways ... */
3377 		dev_err(DEV, "meta data update failed!\n");
3378 
3379 		drbd_chk_io_error(mdev, 1, TRUE);
3380 	}
3381 
3382 	/* Update mdev->ldev->md.la_size_sect,
3383 	 * since we updated it on metadata. */
3384 	mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3385 
3386 	mutex_unlock(&mdev->md_io_mutex);
3387 	put_ldev(mdev);
3388 }
3389 
3390 /**
3391  * drbd_md_read() - Reads in the meta data super block
3392  * @mdev:	DRBD device.
3393  * @bdev:	Device from which the meta data should be read in.
3394  *
3395  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_codes in case
3396  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3397  */
3398 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3399 {
3400 	struct meta_data_on_disk *buffer;
3401 	int i, rv = NO_ERROR;
3402 
3403 	if (!get_ldev_if_state(mdev, D_ATTACHING))
3404 		return ERR_IO_MD_DISK;
3405 
3406 	mutex_lock(&mdev->md_io_mutex);
3407 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3408 
3409 	if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3410 		/* NOTE: can't do normal error processing here, as this is
3411 		   called BEFORE the disk is attached */
3412 		dev_err(DEV, "Error while reading metadata.\n");
3413 		rv = ERR_IO_MD_DISK;
3414 		goto err;
3415 	}
3416 
3417 	if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3418 		dev_err(DEV, "Error while reading metadata, magic not found.\n");
3419 		rv = ERR_MD_INVALID;
3420 		goto err;
3421 	}
3422 	if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3423 		dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3424 		    be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3425 		rv = ERR_MD_INVALID;
3426 		goto err;
3427 	}
3428 	if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3429 		dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3430 		    be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3431 		rv = ERR_MD_INVALID;
3432 		goto err;
3433 	}
3434 	if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3435 		dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3436 		    be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3437 		rv = ERR_MD_INVALID;
3438 		goto err;
3439 	}
3440 
3441 	if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3442 		dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3443 		    be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3444 		rv = ERR_MD_INVALID;
3445 		goto err;
3446 	}
3447 
3448 	bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3449 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3450 		bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3451 	bdev->md.flags = be32_to_cpu(buffer->flags);
3452 	mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3453 	bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3454 
3455 	if (mdev->sync_conf.al_extents < 7)
3456 		mdev->sync_conf.al_extents = 127;
3457 
3458  err:
3459 	mutex_unlock(&mdev->md_io_mutex);
3460 	put_ldev(mdev);
3461 
3462 	return rv;
3463 }
3464 
3465 /**
3466  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3467  * @mdev:	DRBD device.
3468  *
3469  * Call this function if you change anything that should be written to
3470  * the meta-data super block. This function sets MD_DIRTY, and starts a
3471  * timer that ensures that within five seconds you have to call drbd_md_sync().
3472  */
3473 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3474 {
3475 	set_bit(MD_DIRTY, &mdev->flags);
3476 	mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3477 }
3478 
3479 
3480 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3481 {
3482 	int i;
3483 
3484 	for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3485 		mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3486 }
3487 
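/* Set UUID slot @idx without rotating the history.  For UI_CURRENT the
 * least significant bit encodes whether we are Primary, and
 * drbd_set_ed_uuid() is called with the new value as well. */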
3488 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3489 {
3490 	if (idx == UI_CURRENT) {
3491 		if (mdev->state.role == R_PRIMARY)
3492 			val |= 1;
3493 		else
3494 			val &= ~((u64)1);
3495 
3496 		drbd_set_ed_uuid(mdev, val);
3497 	}
3498 
3499 	mdev->ldev->md.uuid[idx] = val;
3500 	drbd_md_mark_dirty(mdev);
3501 }
3502 
3503 
3504 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3505 {
3506 	if (mdev->ldev->md.uuid[idx]) {
3507 		drbd_uuid_move_history(mdev);
3508 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3509 	}
3510 	_drbd_uuid_set(mdev, idx, val);
3511 }
3512 
3513 /**
3514  * drbd_uuid_new_current() - Creates a new current UUID
3515  * @mdev:	DRBD device.
3516  *
3517  * Creates a new current UUID, and rotates the old current UUID into
3518  * the bitmap slot. Causes an incremental resync upon next connect.
3519  */
3520 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3521 {
3522 	u64 val;
3523 
3524 	dev_info(DEV, "Creating new current UUID\n");
3525 	D_ASSERT(mdev->ldev->md.uuid[UI_BITMAP] == 0);
3526 	mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3527 
3528 	get_random_bytes(&val, sizeof(u64));
3529 	_drbd_uuid_set(mdev, UI_CURRENT, val);
3530 }
3531 
3532 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3533 {
3534 	if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3535 		return;
3536 
3537 	if (val == 0) {
3538 		drbd_uuid_move_history(mdev);
3539 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3540 		mdev->ldev->md.uuid[UI_BITMAP] = 0;
3541 	} else {
3542 		if (mdev->ldev->md.uuid[UI_BITMAP])
3543 			dev_warn(DEV, "bm UUID already set\n");
3544 
3545 		mdev->ldev->md.uuid[UI_BITMAP] = val;
3546 		mdev->ldev->md.uuid[UI_BITMAP] &= ~((u64)1);
3547 
3548 	}
3549 	drbd_md_mark_dirty(mdev);
3550 }
3551 
3552 /**
3553  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3554  * @mdev:	DRBD device.
3555  *
3556  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3557  */
3558 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3559 {
3560 	int rv = -EIO;
3561 
3562 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
3563 		drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3564 		drbd_md_sync(mdev);
3565 		drbd_bm_set_all(mdev);
3566 
3567 		rv = drbd_bm_write(mdev);
3568 
3569 		if (!rv) {
3570 			drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3571 			drbd_md_sync(mdev);
3572 		}
3573 
3574 		put_ldev(mdev);
3575 	}
3576 
3577 	return rv;
3578 }
3579 
3580 /**
3581  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3582  * @mdev:	DRBD device.
3583  *
3584  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3585  */
3586 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3587 {
3588 	int rv = -EIO;
3589 
3590 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
3591 		drbd_bm_clear_all(mdev);
3592 		rv = drbd_bm_write(mdev);
3593 		put_ldev(mdev);
3594 	}
3595 
3596 	return rv;
3597 }
3598 
3599 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3600 {
3601 	struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3602 	int rv;
3603 
3604 	D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3605 
3606 	drbd_bm_lock(mdev, work->why);
3607 	rv = work->io_fn(mdev);
3608 	drbd_bm_unlock(mdev);
3609 
3610 	clear_bit(BITMAP_IO, &mdev->flags);
3611 	wake_up(&mdev->misc_wait);
3612 
3613 	if (work->done)
3614 		work->done(mdev, rv);
3615 
3616 	clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3617 	work->why = NULL;
3618 
3619 	return 1;
3620 }
3621 
3622 /**
3623  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3624  * @mdev:	DRBD device.
3625  * @io_fn:	IO callback to be called when bitmap IO is possible
3626  * @done:	callback to be called after the bitmap IO was performed
3627  * @why:	Descriptive text of the reason for doing the IO
3628  *
3629  * While IO on the bitmap is in progress, application IO is frozen; this
3630  * ensures that drbd_set_out_of_sync() cannot be called. This function MAY
3631  * ONLY be called from worker context. It MUST NOT be used while a previous
3632  * such work is still pending!
3633  */
3634 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3635 			  int (*io_fn)(struct drbd_conf *),
3636 			  void (*done)(struct drbd_conf *, int),
3637 			  char *why)
3638 {
3639 	D_ASSERT(current == mdev->worker.task);
3640 
3641 	D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3642 	D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3643 	D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3644 	if (mdev->bm_io_work.why)
3645 		dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3646 			why, mdev->bm_io_work.why);
3647 
3648 	mdev->bm_io_work.io_fn = io_fn;
3649 	mdev->bm_io_work.done = done;
3650 	mdev->bm_io_work.why = why;
3651 
3652 	set_bit(BITMAP_IO, &mdev->flags);
3653 	if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3654 		if (list_empty(&mdev->bm_io_work.w.list)) {
3655 			set_bit(BITMAP_IO_QUEUED, &mdev->flags);
3656 			drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
3657 		} else
3658 			dev_err(DEV, "FIXME avoided double queuing bm_io_work\n");
3659 	}
3660 }
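
/*
 * Editor's sketch, not part of the driver: queueing a full-bitmap write from
 * worker context, reusing drbd_bmio_set_n_write() from above as io_fn.  The
 * function names and the "why" string are invented for illustration.
 */
#if 0
static void example_set_n_write_done(struct drbd_conf *mdev, int rv)
{
	if (rv)
		dev_err(DEV, "example: bitmap write failed (%d)\n", rv);
}

static void example_queue_full_bitmap_write(struct drbd_conf *mdev)
{
	/* must run in mdev->worker context, see the D_ASSERT above */
	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write,
			     &example_set_n_write_done, "example full write");
}
#endif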
3661 
3662 /**
3663  * drbd_bitmap_io() - Does an IO operation on the whole bitmap
3664  * @mdev:	DRBD device.
3665  * @io_fn:	IO callback to be called when bitmap IO is possible
3666  * @why:	Descriptive text of the reason for doing the IO
3667  *
3668  * Freezes application IO while the actual IO operation runs. This
3669  * function MAY NOT be called from worker context.
3670  */
3671 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why)
3672 {
3673 	int rv;
3674 
3675 	D_ASSERT(current != mdev->worker.task);
3676 
3677 	drbd_suspend_io(mdev);
3678 
3679 	drbd_bm_lock(mdev, why);
3680 	rv = io_fn(mdev);
3681 	drbd_bm_unlock(mdev);
3682 
3683 	drbd_resume_io(mdev);
3684 
3685 	return rv;
3686 }
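
/*
 * Editor's sketch, not part of the driver: the blocking variant, callable from
 * any context except the worker (e.g. while processing a configuration
 * request).  The wrapper name and the "why" string are invented.
 */
#if 0
static int example_clear_whole_bitmap(struct drbd_conf *mdev)
{
	/* suspends application IO, runs the io_fn, then resumes IO */
	return drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
			      "example clear bitmap");
}
#endif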
3687 
3688 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3689 {
3690 	if ((mdev->ldev->md.flags & flag) != flag) {
3691 		drbd_md_mark_dirty(mdev);
3692 		mdev->ldev->md.flags |= flag;
3693 	}
3694 }
3695 
3696 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3697 {
3698 	if ((mdev->ldev->md.flags & flag) != 0) {
3699 		drbd_md_mark_dirty(mdev);
3700 		mdev->ldev->md.flags &= ~flag;
3701 	}
3702 }

3703 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3704 {
3705 	return (bdev->md.flags & flag) != 0;
3706 }
3707 
3708 static void md_sync_timer_fn(unsigned long data)
3709 {
3710 	struct drbd_conf *mdev = (struct drbd_conf *) data;
3711 
3712 	drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
3713 }
3714 
3715 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3716 {
3717 	dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3718 	drbd_md_sync(mdev);
3719 
3720 	return 1;
3721 }
3722 
3723 #ifdef CONFIG_DRBD_FAULT_INJECTION
3724 /* Fault insertion support including random number generator shamelessly
3725  * stolen from kernel/rcutorture.c */
3726 struct fault_random_state {
3727 	unsigned long state;
3728 	unsigned long count;
3729 };
3730 
3731 #define FAULT_RANDOM_MULT 39916801  /* prime */
3732 #define FAULT_RANDOM_ADD	479001701 /* prime */
3733 #define FAULT_RANDOM_REFRESH 10000
3734 
3735 /*
3736  * Crude but fast random-number generator.  Uses a linear congruential
3737  * generator, with occasional help from get_random_bytes().
3738  */
3739 static unsigned long
3740 _drbd_fault_random(struct fault_random_state *rsp)
3741 {
3742 	long refresh;
3743 
3744 	if (!rsp->count--) {
3745 		get_random_bytes(&refresh, sizeof(refresh));
3746 		rsp->state += refresh;
3747 		rsp->count = FAULT_RANDOM_REFRESH;
3748 	}
3749 	rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3750 	return swahw32(rsp->state);
3751 }
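
/*
 * Editor's note: the recurrence above is state = state * 39916801 + 479001701
 * (mod 2^BITS_PER_LONG).  swahw32() swaps the two 16-bit halves of the low
 * 32 bits of the state, presumably so that the better mixed upper bits end up
 * in the low half that the "% 100" in _drbd_insert_fault() looks at.
 */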
3752 
3753 static char *
3754 _drbd_fault_str(unsigned int type) {
3755 	static char *_faults[] = {
3756 		[DRBD_FAULT_MD_WR] = "Meta-data write",
3757 		[DRBD_FAULT_MD_RD] = "Meta-data read",
3758 		[DRBD_FAULT_RS_WR] = "Resync write",
3759 		[DRBD_FAULT_RS_RD] = "Resync read",
3760 		[DRBD_FAULT_DT_WR] = "Data write",
3761 		[DRBD_FAULT_DT_RD] = "Data read",
3762 		[DRBD_FAULT_DT_RA] = "Data read ahead",
3763 		[DRBD_FAULT_BM_ALLOC] = "BM allocation",
3764 		[DRBD_FAULT_AL_EE] = "EE allocation",
3765 		[DRBD_FAULT_RECEIVE] = "receive data corruption",
3766 	};
3767 
3768 	return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3769 }
3770 
3771 unsigned int
3772 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3773 {
3774 	static struct fault_random_state rrs = {0, 0};
3775 
3776 	unsigned int ret = (
3777 		(fault_devs == 0 ||
3778 			((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3779 		(((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3780 
3781 	if (ret) {
3782 		fault_count++;
3783 
3784 		if (__ratelimit(&drbd_ratelimit_state))
3785 			dev_warn(DEV, "***Simulating %s failure\n",
3786 				_drbd_fault_str(type));
3787 	}
3788 
3789 	return ret;
3790 }
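
/*
 * Editor's note, not part of the driver: with fault_rate=10 and fault_devs=0,
 * roughly 10% of the operations passing through a DRBD_FAULT_* hook fail, on
 * every minor; fault_devs=0x2 restricts injection to minor 1.  A call site
 * looks roughly like this (the bio handling below is invented):
 *
 *	if (_drbd_insert_fault(mdev, DRBD_FAULT_MD_WR))
 *		bio_endio(bio, -EIO);
 *	else
 *		submit_bio(WRITE, bio);
 */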
3791 #endif
3792 
3793 const char *drbd_buildtag(void)
3794 {
3795 	/* DRBD built from external sources carries a reference to the
3796 	   git hash of the source code here. */
3797 
3798 	static char buildtag[38] = "\0uilt-in";
3799 
3800 	if (buildtag[0] == 0) {
3801 #ifdef CONFIG_MODULES
3802 		if (THIS_MODULE != NULL)
3803 			sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3804 		else
3805 #endif
3806 			buildtag[0] = 'b';
3807 	}
3808 
3809 	return buildtag;
3810 }
3811 
3812 module_init(drbd_init)
3813 module_exit(drbd_cleanup)
3814 
3815 EXPORT_SYMBOL(drbd_conn_str);
3816 EXPORT_SYMBOL(drbd_role_str);
3817 EXPORT_SYMBOL(drbd_disk_str);
3818 EXPORT_SYMBOL(drbd_set_st_err_str);
3819