xref: /openbmc/linux/drivers/block/drbd/drbd_main.c (revision 1fa6ac37)
1 /*
2    drbd.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12 
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17 
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22 
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 
27  */
28 
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/smp_lock.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48 
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52 
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56 
57 #include "drbd_vli.h"
58 
59 struct after_state_chg_work {
60 	struct drbd_work w;
61 	union drbd_state os;
62 	union drbd_state ns;
63 	enum chg_state_flags flags;
64 	struct completion *done;
65 };
66 
67 int drbdd_init(struct drbd_thread *);
68 int drbd_worker(struct drbd_thread *);
69 int drbd_asender(struct drbd_thread *);
70 
71 int drbd_init(void);
72 static int drbd_open(struct block_device *bdev, fmode_t mode);
73 static int drbd_release(struct gendisk *gd, fmode_t mode);
74 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
75 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
76 			   union drbd_state ns, enum chg_state_flags flags);
77 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
78 static void md_sync_timer_fn(unsigned long data);
79 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
80 
81 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
82 	      "Lars Ellenberg <lars@linbit.com>");
83 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
84 MODULE_VERSION(REL_VERSION);
85 MODULE_LICENSE("GPL");
86 MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices (1-255)");
87 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
88 
89 #include <linux/moduleparam.h>
90 /* allow_open_on_secondary */
91 MODULE_PARM_DESC(allow_oos, "DONT USE!");
92 /* thanks to these macros, if compiled into the kernel (not as a module),
93  * this becomes the boot parameter drbd.minor_count */
94 module_param(minor_count, uint, 0444);
95 module_param(disable_sendpage, bool, 0644);
96 module_param(allow_oos, bool, 0);
97 module_param(cn_idx, uint, 0444);
98 module_param(proc_details, int, 0644);
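/*
 * Example usage (illustrative, not part of the original file): when built
 * as a module, these knobs can be set at load time, e.g.
 *
 *	modprobe drbd minor_count=64
 *
 * and, as the comment above notes, when built into the kernel the same
 * knob becomes the boot parameter drbd.minor_count, e.g.
 *
 *	drbd.minor_count=64
 */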
99 
100 #ifdef CONFIG_DRBD_FAULT_INJECTION
101 int enable_faults;
102 int fault_rate;
103 static int fault_count;
104 int fault_devs;
105 /* bitmap of enabled faults */
106 module_param(enable_faults, int, 0664);
107 /* fault rate % value - applies to all enabled faults */
108 module_param(fault_rate, int, 0664);
109 /* count of faults inserted */
110 module_param(fault_count, int, 0664);
111 /* bitmap of devices to insert faults on */
112 module_param(fault_devs, int, 0644);
113 #endif
114 
115 /* module parameters, defined here */
116 unsigned int minor_count = 32;
117 int disable_sendpage;
118 int allow_oos;
119 unsigned int cn_idx = CN_IDX_DRBD;
120 int proc_details;       /* Detail level in /proc/drbd */
121 
122 /* Module parameter for setting the user mode helper program
123  * to run. Default is /sbin/drbdadm */
124 char usermode_helper[80] = "/sbin/drbdadm";
125 
126 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
127 
128 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
129  * as member "struct gendisk *vdisk;"
130  */
131 struct drbd_conf **minor_table;
132 
133 struct kmem_cache *drbd_request_cache;
134 struct kmem_cache *drbd_ee_cache;	/* epoch entries */
135 struct kmem_cache *drbd_bm_ext_cache;	/* bitmap extents */
136 struct kmem_cache *drbd_al_ext_cache;	/* activity log extents */
137 mempool_t *drbd_request_mempool;
138 mempool_t *drbd_ee_mempool;
139 
140 /* I do not use a standard mempool, because:
141    1) I want to hand out the pre-allocated objects first.
142    2) I want to be able to interrupt sleeping allocation with a signal.
143    Note: This is a singly linked list; the next pointer is the private
144 	 member of struct page.
145  */
146 struct page *drbd_pp_pool;
147 spinlock_t   drbd_pp_lock;
148 int          drbd_pp_vacant;
149 wait_queue_head_t drbd_pp_wait;
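/*
 * Illustrative sketch (not part of the original file): how a page pool
 * like this can be used as a LIFO singly linked list by chaining the
 * pages through their private member, as the comment above describes.
 * Locking via drbd_pp_lock is deliberately left out; the helper names
 * are made up for illustration only.
 */
#if 0
static void example_pp_push(struct page *page)
{
	/* the "next" pointer lives in page->private */
	set_page_private(page, (unsigned long)drbd_pp_pool);
	drbd_pp_pool = page;
	drbd_pp_vacant++;
}

static struct page *example_pp_pop(void)
{
	struct page *page = drbd_pp_pool;

	if (page) {
		drbd_pp_pool = (struct page *)page_private(page);
		drbd_pp_vacant--;
	}
	return page;
}
#endif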
150 
151 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
152 
153 static const struct block_device_operations drbd_ops = {
154 	.owner =   THIS_MODULE,
155 	.open =    drbd_open,
156 	.release = drbd_release,
157 };
158 
159 #define ARRY_SIZE(A) (sizeof(A)/sizeof(A[0]))
160 
161 #ifdef __CHECKER__
162 /* When checking with sparse, and this is an inline function, sparse will
163    give tons of false positives. When this is a real function, sparse works.
164  */
165 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
166 {
167 	int io_allowed;
168 
169 	atomic_inc(&mdev->local_cnt);
170 	io_allowed = (mdev->state.disk >= mins);
171 	if (!io_allowed) {
172 		if (atomic_dec_and_test(&mdev->local_cnt))
173 			wake_up(&mdev->misc_wait);
174 	}
175 	return io_allowed;
176 }
177 
178 #endif
179 
180 /**
181  * DOC: The transfer log
182  *
183  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
184  * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
185  * of the list. There is always at least one &struct drbd_tl_epoch object.
186  *
187  * Each &struct drbd_tl_epoch has a circular doubly linked list of requests
188  * attached.
189  */
190 static int tl_init(struct drbd_conf *mdev)
191 {
192 	struct drbd_tl_epoch *b;
193 
194 	/* during device minor initialization, we may well use GFP_KERNEL */
195 	b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
196 	if (!b)
197 		return 0;
198 	INIT_LIST_HEAD(&b->requests);
199 	INIT_LIST_HEAD(&b->w.list);
200 	b->next = NULL;
201 	b->br_number = 4711;
202 	b->n_req = 0;
203 	b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
204 
205 	mdev->oldest_tle = b;
206 	mdev->newest_tle = b;
207 	INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
208 
209 	mdev->tl_hash = NULL;
210 	mdev->tl_hash_s = 0;
211 
212 	return 1;
213 }
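/*
 * Illustrative sketch (not part of the original file): a minimal,
 * self-contained model of the transfer log described in the DOC comment
 * above -- a singly linked list of epochs with an "oldest" and a "newest"
 * pointer.  New epochs are appended at the newest end (compare
 * _tl_add_barrier() below), barrier acks consume them from the oldest end
 * (compare tl_release()).  All names here are made up for illustration.
 */
#if 0
struct epoch_model {
	struct epoch_model *next;	/* singly linked, like drbd_tl_epoch */
	unsigned int br_number;		/* barrier number of this epoch */
	int n_req;			/* number of requests in this epoch */
};

struct tl_model {
	struct epoch_model *oldest;	/* consumed by barrier acks */
	struct epoch_model *newest;	/* new requests attach here */
};

/* append a fresh epoch at the newest end, mirroring _tl_add_barrier() */
static void tl_model_append(struct tl_model *tl, struct epoch_model *e)
{
	e->next = NULL;
	e->n_req = 0;
	/* never hand out barrier number 0, see _tl_add_barrier() */
	e->br_number = (tl->newest->br_number + 1) ?: 1;
	tl->newest->next = e;
	tl->newest = e;
}
#endif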
214 
215 static void tl_cleanup(struct drbd_conf *mdev)
216 {
217 	D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
218 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
219 	kfree(mdev->oldest_tle);
220 	mdev->oldest_tle = NULL;
221 	kfree(mdev->unused_spare_tle);
222 	mdev->unused_spare_tle = NULL;
223 	kfree(mdev->tl_hash);
224 	mdev->tl_hash = NULL;
225 	mdev->tl_hash_s = 0;
226 }
227 
228 /**
229  * _tl_add_barrier() - Adds a barrier to the transfer log
230  * @mdev:	DRBD device.
231  * @new:	Barrier to be added before the current head of the TL.
232  *
233  * The caller must hold the req_lock.
234  */
235 void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
236 {
237 	struct drbd_tl_epoch *newest_before;
238 
239 	INIT_LIST_HEAD(&new->requests);
240 	INIT_LIST_HEAD(&new->w.list);
241 	new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
242 	new->next = NULL;
243 	new->n_req = 0;
244 
245 	newest_before = mdev->newest_tle;
246 	/* never send a barrier number == 0, because that is special-cased
247 	 * when using TCQ for our write ordering code */
248 	new->br_number = (newest_before->br_number+1) ?: 1;
249 	if (mdev->newest_tle != new) {
250 		mdev->newest_tle->next = new;
251 		mdev->newest_tle = new;
252 	}
253 }
254 
255 /**
256  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
257  * @mdev:	DRBD device.
258  * @barrier_nr:	Expected identifier of the DRBD write barrier packet.
259  * @set_size:	Expected number of requests before that barrier.
260  *
261  * In case the passed barrier_nr or set_size does not match the oldest
262  * &struct drbd_tl_epoch object, this function will cause a termination
263  * of the connection.
264  */
265 void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
266 		       unsigned int set_size)
267 {
268 	struct drbd_tl_epoch *b, *nob; /* next old barrier */
269 	struct list_head *le, *tle;
270 	struct drbd_request *r;
271 
272 	spin_lock_irq(&mdev->req_lock);
273 
274 	b = mdev->oldest_tle;
275 
276 	/* first some paranoia code */
277 	if (b == NULL) {
278 		dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
279 			barrier_nr);
280 		goto bail;
281 	}
282 	if (b->br_number != barrier_nr) {
283 		dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
284 			barrier_nr, b->br_number);
285 		goto bail;
286 	}
287 	if (b->n_req != set_size) {
288 		dev_err(DEV, "BAD! BarrierAck #%u received with n_req=%u, expected n_req=%u!\n",
289 			barrier_nr, set_size, b->n_req);
290 		goto bail;
291 	}
292 
293 	/* Clean up list of requests processed during current epoch */
294 	list_for_each_safe(le, tle, &b->requests) {
295 		r = list_entry(le, struct drbd_request, tl_requests);
296 		_req_mod(r, barrier_acked);
297 	}
298 	/* There could be requests on the list waiting for completion
299 	   of the write to the local disk. To avoid corruption of the
300 	   slab's data structures we have to remove the list's head.
301 
302 	   Also there could have been a barrier ack out of sequence, overtaking
303 	   the write acks - which would be a bug and violating write ordering.
304 	   To not deadlock in case we lose connection while such requests are
305 	   still pending, we need some way to find them for the
306 	   _req_mod(connection_lost_while_pending).
307 
308 	   These have been list_move'd to the out_of_sequence_requests list in
309 	   _req_mod(, barrier_acked) above.
310 	   */
311 	list_del_init(&b->requests);
312 
313 	nob = b->next;
314 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
315 		_tl_add_barrier(mdev, b);
316 		if (nob)
317 			mdev->oldest_tle = nob;
318 		/* if nob == NULL, b was the only barrier, and becomes the new
319 		   barrier. Therefore mdev->oldest_tle already points to b */
320 	} else {
321 		D_ASSERT(nob != NULL);
322 		mdev->oldest_tle = nob;
323 		kfree(b);
324 	}
325 
326 	spin_unlock_irq(&mdev->req_lock);
327 	dec_ap_pending(mdev);
328 
329 	return;
330 
331 bail:
332 	spin_unlock_irq(&mdev->req_lock);
333 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
334 }
335 
336 
337 /**
338  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
339  * @mdev:	DRBD device.
340  *
341  * This is called after the connection to the peer was lost. The storage covered
342  * by the requests on the transfer log gets marked as out of sync. Called from the
343  * receiver thread and the worker thread.
344  */
345 void tl_clear(struct drbd_conf *mdev)
346 {
347 	struct drbd_tl_epoch *b, *tmp;
348 	struct list_head *le, *tle;
349 	struct drbd_request *r;
350 	int new_initial_bnr = net_random();
351 
352 	spin_lock_irq(&mdev->req_lock);
353 
354 	b = mdev->oldest_tle;
355 	while (b) {
356 		list_for_each_safe(le, tle, &b->requests) {
357 			r = list_entry(le, struct drbd_request, tl_requests);
358 			/* It would be nice to complete outside of spinlock.
359 			 * But this is easier for now. */
360 			_req_mod(r, connection_lost_while_pending);
361 		}
362 		tmp = b->next;
363 
364 		/* there could still be requests on that ring list,
365 		 * in case local io is still pending */
366 		list_del(&b->requests);
367 
368 		/* dec_ap_pending corresponding to queue_barrier.
369 		 * the newest barrier may not have been queued yet,
370 		 * in which case w.cb is still NULL. */
371 		if (b->w.cb != NULL)
372 			dec_ap_pending(mdev);
373 
374 		if (b == mdev->newest_tle) {
375 			/* recycle, but reinit! */
376 			D_ASSERT(tmp == NULL);
377 			INIT_LIST_HEAD(&b->requests);
378 			INIT_LIST_HEAD(&b->w.list);
379 			b->w.cb = NULL;
380 			b->br_number = new_initial_bnr;
381 			b->n_req = 0;
382 
383 			mdev->oldest_tle = b;
384 			break;
385 		}
386 		kfree(b);
387 		b = tmp;
388 	}
389 
390 	/* we expect this list to be empty. */
391 	D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
392 
393 	/* but just in case, clean it up anyways! */
394 	list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
395 		r = list_entry(le, struct drbd_request, tl_requests);
396 		/* It would be nice to complete outside of spinlock.
397 		 * But this is easier for now. */
398 		_req_mod(r, connection_lost_while_pending);
399 	}
400 
401 	/* ensure bit indicating barrier is required is clear */
402 	clear_bit(CREATE_BARRIER, &mdev->flags);
403 
404 	spin_unlock_irq(&mdev->req_lock);
405 }
406 
407 /**
408  * cl_wide_st_chg() - TRUE if the state change is a cluster wide one
409  * @mdev:	DRBD device.
410  * @os:		old (current) state.
411  * @ns:		new (wanted) state.
412  */
413 static int cl_wide_st_chg(struct drbd_conf *mdev,
414 			  union drbd_state os, union drbd_state ns)
415 {
416 	return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
417 		 ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
418 		  (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
419 		  (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
420 		  (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))) ||
421 		(os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
422 		(os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
423 }
424 
425 int drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
426 		      union drbd_state mask, union drbd_state val)
427 {
428 	unsigned long flags;
429 	union drbd_state os, ns;
430 	int rv;
431 
432 	spin_lock_irqsave(&mdev->req_lock, flags);
433 	os = mdev->state;
434 	ns.i = (os.i & ~mask.i) | val.i;
435 	rv = _drbd_set_state(mdev, ns, f, NULL);
436 	ns = mdev->state;
437 	spin_unlock_irqrestore(&mdev->req_lock, flags);
438 
439 	return rv;
440 }
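/*
 * Illustrative sketch (not part of the original file): how the mask/val
 * pair composes the new state word in drbd_change_state() above.  Only
 * the bits set in "mask" are taken from "val"; all other bits keep their
 * old value.
 */
#if 0
static unsigned int apply_state_mask(unsigned int os_i,
				     unsigned int mask_i, unsigned int val_i)
{
	/* clear the masked bits of the old state, then or in the new ones */
	return (os_i & ~mask_i) | val_i;
}

/*
 * With NS(conn, C_DISCONNECTING), for example, "mask" covers only the
 * conn field and "val" carries C_DISCONNECTING in that field, so every
 * other field of the old state is preserved.
 */
#endif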
441 
442 /**
443  * drbd_force_state() - Impose a change which happens outside our control on our state
444  * @mdev:	DRBD device.
445  * @mask:	mask of state bits to change.
446  * @val:	value of new state bits.
447  */
448 void drbd_force_state(struct drbd_conf *mdev,
449 	union drbd_state mask, union drbd_state val)
450 {
451 	drbd_change_state(mdev, CS_HARD, mask, val);
452 }
453 
454 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns);
455 static int is_valid_state_transition(struct drbd_conf *,
456 				     union drbd_state, union drbd_state);
457 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
458 				       union drbd_state ns, int *warn_sync_abort);
459 int drbd_send_state_req(struct drbd_conf *,
460 			union drbd_state, union drbd_state);
461 
462 static enum drbd_state_ret_codes _req_st_cond(struct drbd_conf *mdev,
463 				    union drbd_state mask, union drbd_state val)
464 {
465 	union drbd_state os, ns;
466 	unsigned long flags;
467 	int rv;
468 
469 	if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
470 		return SS_CW_SUCCESS;
471 
472 	if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
473 		return SS_CW_FAILED_BY_PEER;
474 
475 	rv = 0;
476 	spin_lock_irqsave(&mdev->req_lock, flags);
477 	os = mdev->state;
478 	ns.i = (os.i & ~mask.i) | val.i;
479 	ns = sanitize_state(mdev, os, ns, NULL);
480 
481 	if (!cl_wide_st_chg(mdev, os, ns))
482 		rv = SS_CW_NO_NEED;
483 	if (!rv) {
484 		rv = is_valid_state(mdev, ns);
485 		if (rv == SS_SUCCESS) {
486 			rv = is_valid_state_transition(mdev, ns, os);
487 			if (rv == SS_SUCCESS)
488 				rv = 0; /* cont waiting, otherwise fail. */
489 		}
490 	}
491 	spin_unlock_irqrestore(&mdev->req_lock, flags);
492 
493 	return rv;
494 }
495 
496 /**
497  * drbd_req_state() - Perform a possibly cluster-wide state change
498  * @mdev:	DRBD device.
499  * @mask:	mask of state bits to change.
500  * @val:	value of new state bits.
501  * @f:		flags
502  *
503  * Should not be called directly, use drbd_request_state() or
504  * _drbd_request_state().
505  */
506 static int drbd_req_state(struct drbd_conf *mdev,
507 			  union drbd_state mask, union drbd_state val,
508 			  enum chg_state_flags f)
509 {
510 	struct completion done;
511 	unsigned long flags;
512 	union drbd_state os, ns;
513 	int rv;
514 
515 	init_completion(&done);
516 
517 	if (f & CS_SERIALIZE)
518 		mutex_lock(&mdev->state_mutex);
519 
520 	spin_lock_irqsave(&mdev->req_lock, flags);
521 	os = mdev->state;
522 	ns.i = (os.i & ~mask.i) | val.i;
523 	ns = sanitize_state(mdev, os, ns, NULL);
524 
525 	if (cl_wide_st_chg(mdev, os, ns)) {
526 		rv = is_valid_state(mdev, ns);
527 		if (rv == SS_SUCCESS)
528 			rv = is_valid_state_transition(mdev, ns, os);
529 		spin_unlock_irqrestore(&mdev->req_lock, flags);
530 
531 		if (rv < SS_SUCCESS) {
532 			if (f & CS_VERBOSE)
533 				print_st_err(mdev, os, ns, rv);
534 			goto abort;
535 		}
536 
537 		drbd_state_lock(mdev);
538 		if (!drbd_send_state_req(mdev, mask, val)) {
539 			drbd_state_unlock(mdev);
540 			rv = SS_CW_FAILED_BY_PEER;
541 			if (f & CS_VERBOSE)
542 				print_st_err(mdev, os, ns, rv);
543 			goto abort;
544 		}
545 
546 		wait_event(mdev->state_wait,
547 			(rv = _req_st_cond(mdev, mask, val)));
548 
549 		if (rv < SS_SUCCESS) {
550 			drbd_state_unlock(mdev);
551 			if (f & CS_VERBOSE)
552 				print_st_err(mdev, os, ns, rv);
553 			goto abort;
554 		}
555 		spin_lock_irqsave(&mdev->req_lock, flags);
556 		os = mdev->state;
557 		ns.i = (os.i & ~mask.i) | val.i;
558 		rv = _drbd_set_state(mdev, ns, f, &done);
559 		drbd_state_unlock(mdev);
560 	} else {
561 		rv = _drbd_set_state(mdev, ns, f, &done);
562 	}
563 
564 	spin_unlock_irqrestore(&mdev->req_lock, flags);
565 
566 	if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
567 		D_ASSERT(current != mdev->worker.task);
568 		wait_for_completion(&done);
569 	}
570 
571 abort:
572 	if (f & CS_SERIALIZE)
573 		mutex_unlock(&mdev->state_mutex);
574 
575 	return rv;
576 }
577 
578 /**
579  * _drbd_request_state() - Request a state change (with flags)
580  * @mdev:	DRBD device.
581  * @mask:	mask of state bits to change.
582  * @val:	value of new state bits.
583  * @f:		flags
584  *
585  * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
586  * flag, or when logging of failed state change requests is not desired.
587  */
588 int _drbd_request_state(struct drbd_conf *mdev,	union drbd_state mask,
589 			union drbd_state val,	enum chg_state_flags f)
590 {
591 	int rv;
592 
593 	wait_event(mdev->state_wait,
594 		   (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);
595 
596 	return rv;
597 }
598 
599 static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
600 {
601 	dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
602 	    name,
603 	    drbd_conn_str(ns.conn),
604 	    drbd_role_str(ns.role),
605 	    drbd_role_str(ns.peer),
606 	    drbd_disk_str(ns.disk),
607 	    drbd_disk_str(ns.pdsk),
608 	    ns.susp ? 's' : 'r',
609 	    ns.aftr_isp ? 'a' : '-',
610 	    ns.peer_isp ? 'p' : '-',
611 	    ns.user_isp ? 'u' : '-'
612 	    );
613 }
614 
615 void print_st_err(struct drbd_conf *mdev,
616 	union drbd_state os, union drbd_state ns, int err)
617 {
618 	if (err == SS_IN_TRANSIENT_STATE)
619 		return;
620 	dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
621 	print_st(mdev, " state", os);
622 	print_st(mdev, "wanted", ns);
623 }
624 
625 
626 #define drbd_peer_str drbd_role_str
627 #define drbd_pdsk_str drbd_disk_str
628 
629 #define drbd_susp_str(A)     ((A) ? "1" : "0")
630 #define drbd_aftr_isp_str(A) ((A) ? "1" : "0")
631 #define drbd_peer_isp_str(A) ((A) ? "1" : "0")
632 #define drbd_user_isp_str(A) ((A) ? "1" : "0")
633 
634 #define PSC(A) \
635 	({ if (ns.A != os.A) { \
636 		pbp += sprintf(pbp, #A "( %s -> %s ) ", \
637 			      drbd_##A##_str(os.A), \
638 			      drbd_##A##_str(ns.A)); \
639 	} })
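/*
 * For example (illustrative, not from the original file): if only the
 * connection and peer disk state change, the dev_info() call in
 * __drbd_set_state() below ends up logging something like
 *
 *	conn( WFConnection -> Connected ) pdsk( DUnknown -> UpToDate )
 *
 * one "field( old -> new )" pair per field that actually changed.
 */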
640 
641 /**
642  * is_valid_state() - Returns an SS_ error code if ns is not valid
643  * @mdev:	DRBD device.
644  * @ns:		State to consider.
645  */
646 static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
647 {
648 	/* See drbd_state_sw_errors in drbd_strings.c */
649 
650 	enum drbd_fencing_p fp;
651 	int rv = SS_SUCCESS;
652 
653 	fp = FP_DONT_CARE;
654 	if (get_ldev(mdev)) {
655 		fp = mdev->ldev->dc.fencing;
656 		put_ldev(mdev);
657 	}
658 
659 	if (get_net_conf(mdev)) {
660 		if (!mdev->net_conf->two_primaries &&
661 		    ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
662 			rv = SS_TWO_PRIMARIES;
663 		put_net_conf(mdev);
664 	}
665 
666 	if (rv <= 0)
667 		/* already found a reason to abort */;
668 	else if (ns.role == R_SECONDARY && mdev->open_cnt)
669 		rv = SS_DEVICE_IN_USE;
670 
671 	else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
672 		rv = SS_NO_UP_TO_DATE_DISK;
673 
674 	else if (fp >= FP_RESOURCE &&
675 		 ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
676 		rv = SS_PRIMARY_NOP;
677 
678 	else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
679 		rv = SS_NO_UP_TO_DATE_DISK;
680 
681 	else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
682 		rv = SS_NO_LOCAL_DISK;
683 
684 	else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
685 		rv = SS_NO_REMOTE_DISK;
686 
687 	else if (ns.conn > C_CONNECTED && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE)
688 		rv = SS_NO_UP_TO_DATE_DISK;
689 
690 	else if ((ns.conn == C_CONNECTED ||
691 		  ns.conn == C_WF_BITMAP_S ||
692 		  ns.conn == C_SYNC_SOURCE ||
693 		  ns.conn == C_PAUSED_SYNC_S) &&
694 		  ns.disk == D_OUTDATED)
695 		rv = SS_CONNECTED_OUTDATES;
696 
697 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
698 		 (mdev->sync_conf.verify_alg[0] == 0))
699 		rv = SS_NO_VERIFY_ALG;
700 
701 	else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
702 		  mdev->agreed_pro_version < 88)
703 		rv = SS_NOT_SUPPORTED;
704 
705 	return rv;
706 }
707 
708 /**
709  * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
710  * @mdev:	DRBD device.
711  * @ns:		new state.
712  * @os:		old state.
713  */
714 static int is_valid_state_transition(struct drbd_conf *mdev,
715 				     union drbd_state ns, union drbd_state os)
716 {
717 	int rv = SS_SUCCESS;
718 
719 	if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
720 	    os.conn > C_CONNECTED)
721 		rv = SS_RESYNC_RUNNING;
722 
723 	if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
724 		rv = SS_ALREADY_STANDALONE;
725 
726 	if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
727 		rv = SS_IS_DISKLESS;
728 
729 	if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
730 		rv = SS_NO_NET_CONFIG;
731 
732 	if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
733 		rv = SS_LOWER_THAN_OUTDATED;
734 
735 	if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
736 		rv = SS_IN_TRANSIENT_STATE;
737 
738 	if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
739 		rv = SS_IN_TRANSIENT_STATE;
740 
741 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
742 		rv = SS_NEED_CONNECTION;
743 
744 	if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
745 	    ns.conn != os.conn && os.conn > C_CONNECTED)
746 		rv = SS_RESYNC_RUNNING;
747 
748 	if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
749 	    os.conn < C_CONNECTED)
750 		rv = SS_NEED_CONNECTION;
751 
752 	return rv;
753 }
754 
755 /**
756  * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
757  * @mdev:	DRBD device.
758  * @os:		old state.
759  * @ns:		new state.
760  * @warn_sync_abort:
761  *
762  * When we lose the connection, we have to set the state of the peer's disk (pdsk)
763  * to D_UNKNOWN. This rule and many more along those lines are in this function.
764  */
765 static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
766 				       union drbd_state ns, int *warn_sync_abort)
767 {
768 	enum drbd_fencing_p fp;
769 
770 	fp = FP_DONT_CARE;
771 	if (get_ldev(mdev)) {
772 		fp = mdev->ldev->dc.fencing;
773 		put_ldev(mdev);
774 	}
775 
776 	/* Do not let network errors configure a device's network part */
777 	if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
778 	    os.conn <= C_DISCONNECTING)
779 		ns.conn = os.conn;
780 
781 	/* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow */
782 	if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
783 	    ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING)
784 		ns.conn = os.conn;
785 
786 	/* After C_DISCONNECTING only C_STANDALONE may follow */
787 	if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
788 		ns.conn = os.conn;
789 
790 	if (ns.conn < C_CONNECTED) {
791 		ns.peer_isp = 0;
792 		ns.peer = R_UNKNOWN;
793 		if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
794 			ns.pdsk = D_UNKNOWN;
795 	}
796 
797 	/* Clear the aftr_isp when becoming unconfigured */
798 	if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
799 		ns.aftr_isp = 0;
800 
801 	if (ns.conn <= C_DISCONNECTING && ns.disk == D_DISKLESS)
802 		ns.pdsk = D_UNKNOWN;
803 
804 	/* Abort resync if a disk fails/detaches */
805 	if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
806 	    (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
807 		if (warn_sync_abort)
808 			*warn_sync_abort = 1;
809 		ns.conn = C_CONNECTED;
810 	}
811 
812 	if (ns.conn >= C_CONNECTED &&
813 	    ((ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED) ||
814 	     (ns.disk == D_NEGOTIATING && ns.conn == C_WF_BITMAP_T))) {
815 		switch (ns.conn) {
816 		case C_WF_BITMAP_T:
817 		case C_PAUSED_SYNC_T:
818 			ns.disk = D_OUTDATED;
819 			break;
820 		case C_CONNECTED:
821 		case C_WF_BITMAP_S:
822 		case C_SYNC_SOURCE:
823 		case C_PAUSED_SYNC_S:
824 			ns.disk = D_UP_TO_DATE;
825 			break;
826 		case C_SYNC_TARGET:
827 			ns.disk = D_INCONSISTENT;
828 			dev_warn(DEV, "Implicitly set disk state Inconsistent!\n");
829 			break;
830 		}
831 		if (os.disk == D_OUTDATED && ns.disk == D_UP_TO_DATE)
832 			dev_warn(DEV, "Implicitly set disk from Outdated to UpToDate\n");
833 	}
834 
835 	if (ns.conn >= C_CONNECTED &&
836 	    (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)) {
837 		switch (ns.conn) {
838 		case C_CONNECTED:
839 		case C_WF_BITMAP_T:
840 		case C_PAUSED_SYNC_T:
841 		case C_SYNC_TARGET:
842 			ns.pdsk = D_UP_TO_DATE;
843 			break;
844 		case C_WF_BITMAP_S:
845 		case C_PAUSED_SYNC_S:
846 			/* remap any consistent state to D_OUTDATED,
847 			 * but disallow "upgrade" of not even consistent states.
848 			 */
849 			ns.pdsk =
850 				(D_DISKLESS < os.pdsk && os.pdsk < D_OUTDATED)
851 				? os.pdsk : D_OUTDATED;
852 			break;
853 		case C_SYNC_SOURCE:
854 			ns.pdsk = D_INCONSISTENT;
855 			dev_warn(DEV, "Implicitly set pdsk Inconsistent!\n");
856 			break;
857 		}
858 		if (os.pdsk == D_OUTDATED && ns.pdsk == D_UP_TO_DATE)
859 			dev_warn(DEV, "Implicitly set pdsk from Outdated to UpToDate\n");
860 	}
861 
862 	/* The connection broke down before we finished "Negotiating" */
863 	if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
864 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
865 		if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
866 			ns.disk = mdev->new_state_tmp.disk;
867 			ns.pdsk = mdev->new_state_tmp.pdsk;
868 		} else {
869 			dev_alert(DEV, "Connection lost while negotiating, no data!\n");
870 			ns.disk = D_DISKLESS;
871 			ns.pdsk = D_UNKNOWN;
872 		}
873 		put_ldev(mdev);
874 	}
875 
876 	if (fp == FP_STONITH &&
877 	    (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
878 	    !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
879 		ns.susp = 1;
880 
881 	if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
882 		if (ns.conn == C_SYNC_SOURCE)
883 			ns.conn = C_PAUSED_SYNC_S;
884 		if (ns.conn == C_SYNC_TARGET)
885 			ns.conn = C_PAUSED_SYNC_T;
886 	} else {
887 		if (ns.conn == C_PAUSED_SYNC_S)
888 			ns.conn = C_SYNC_SOURCE;
889 		if (ns.conn == C_PAUSED_SYNC_T)
890 			ns.conn = C_SYNC_TARGET;
891 	}
892 
893 	return ns;
894 }
895 
896 /* helper for __drbd_set_state */
897 static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
898 {
899 	if (cs == C_VERIFY_T) {
900 		/* starting online verify from an arbitrary position
901 		 * does not fit well into the existing protocol.
902 		 * on C_VERIFY_T, we initialize ov_left and friends
903 		 * implicitly in receive_DataRequest once the
904 		 * first P_OV_REQUEST is received */
905 		mdev->ov_start_sector = ~(sector_t)0;
906 	} else {
907 		unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
908 		if (bit >= mdev->rs_total)
909 			mdev->ov_start_sector =
910 				BM_BIT_TO_SECT(mdev->rs_total - 1);
911 		mdev->ov_position = mdev->ov_start_sector;
912 	}
913 }
914 
915 /**
916  * __drbd_set_state() - Set a new DRBD state
917  * @mdev:	DRBD device.
918  * @ns:		new state.
919  * @flags:	Flags
920  * @done:	Optional completion that will be completed after after_state_ch() has finished
921  *
922  * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
923  */
924 int __drbd_set_state(struct drbd_conf *mdev,
925 		    union drbd_state ns, enum chg_state_flags flags,
926 		    struct completion *done)
927 {
928 	union drbd_state os;
929 	int rv = SS_SUCCESS;
930 	int warn_sync_abort = 0;
931 	struct after_state_chg_work *ascw;
932 
933 	os = mdev->state;
934 
935 	ns = sanitize_state(mdev, os, ns, &warn_sync_abort);
936 
937 	if (ns.i == os.i)
938 		return SS_NOTHING_TO_DO;
939 
940 	if (!(flags & CS_HARD)) {
941 		/*  pre-state-change checks ; only look at ns  */
942 		/* See drbd_state_sw_errors in drbd_strings.c */
943 
944 		rv = is_valid_state(mdev, ns);
945 		if (rv < SS_SUCCESS) {
946 			/* If the old state was illegal as well, then let
947 			   this happen...*/
948 
949 			if (is_valid_state(mdev, os) == rv) {
950 				dev_err(DEV, "Considering state change from bad state. "
951 				    "Error would be: '%s'\n",
952 				    drbd_set_st_err_str(rv));
953 				print_st(mdev, "old", os);
954 				print_st(mdev, "new", ns);
955 				rv = is_valid_state_transition(mdev, ns, os);
956 			}
957 		} else
958 			rv = is_valid_state_transition(mdev, ns, os);
959 	}
960 
961 	if (rv < SS_SUCCESS) {
962 		if (flags & CS_VERBOSE)
963 			print_st_err(mdev, os, ns, rv);
964 		return rv;
965 	}
966 
967 	if (warn_sync_abort)
968 		dev_warn(DEV, "Resync aborted.\n");
969 
970 	{
971 		char *pbp, pb[300];
972 		pbp = pb;
973 		*pbp = 0;
974 		PSC(role);
975 		PSC(peer);
976 		PSC(conn);
977 		PSC(disk);
978 		PSC(pdsk);
979 		PSC(susp);
980 		PSC(aftr_isp);
981 		PSC(peer_isp);
982 		PSC(user_isp);
983 		dev_info(DEV, "%s\n", pb);
984 	}
985 
986 	/* solve the race between becoming unconfigured,
987 	 * worker doing the cleanup, and
988 	 * admin reconfiguring us:
989 	 * on (re)configure, first set CONFIG_PENDING,
990 	 * then wait for a potentially exiting worker,
991 	 * start the worker, and schedule one no_op.
992 	 * then proceed with configuration.
993 	 */
994 	if (ns.disk == D_DISKLESS &&
995 	    ns.conn == C_STANDALONE &&
996 	    ns.role == R_SECONDARY &&
997 	    !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
998 		set_bit(DEVICE_DYING, &mdev->flags);
999 
1000 	mdev->state.i = ns.i;
1001 	wake_up(&mdev->misc_wait);
1002 	wake_up(&mdev->state_wait);
1003 
1004 	/*   post-state-change actions   */
1005 	if (os.conn >= C_SYNC_SOURCE   && ns.conn <= C_CONNECTED) {
1006 		set_bit(STOP_SYNC_TIMER, &mdev->flags);
1007 		mod_timer(&mdev->resync_timer, jiffies);
1008 	}
1009 
1010 	/* aborted verify run. log the last position */
1011 	if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1012 	    ns.conn < C_CONNECTED) {
1013 		mdev->ov_start_sector =
1014 			BM_BIT_TO_SECT(mdev->rs_total - mdev->ov_left);
1015 		dev_info(DEV, "Online Verify reached sector %llu\n",
1016 			(unsigned long long)mdev->ov_start_sector);
1017 	}
1018 
1019 	if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1020 	    (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1021 		dev_info(DEV, "Syncer continues.\n");
1022 		mdev->rs_paused += (long)jiffies-(long)mdev->rs_mark_time;
1023 		if (ns.conn == C_SYNC_TARGET) {
1024 			if (!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))
1025 				mod_timer(&mdev->resync_timer, jiffies);
1026 			/* This if (!test_bit) is only needed for the case
1027 			   that a device that has ceased to use its timer,
1028 			   i.e. it is already in drbd_resync_finished(), gets
1029 			   paused and resumed. */
1030 		}
1031 	}
1032 
1033 	if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1034 	    (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1035 		dev_info(DEV, "Resync suspended\n");
1036 		mdev->rs_mark_time = jiffies;
1037 		if (ns.conn == C_PAUSED_SYNC_T)
1038 			set_bit(STOP_SYNC_TIMER, &mdev->flags);
1039 	}
1040 
1041 	if (os.conn == C_CONNECTED &&
1042 	    (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1043 		mdev->ov_position = 0;
1044 		mdev->rs_total =
1045 		mdev->rs_mark_left = drbd_bm_bits(mdev);
1046 		if (mdev->agreed_pro_version >= 90)
1047 			set_ov_position(mdev, ns.conn);
1048 		else
1049 			mdev->ov_start_sector = 0;
1050 		mdev->ov_left = mdev->rs_total
1051 			      - BM_SECT_TO_BIT(mdev->ov_position);
1052 		mdev->rs_start     =
1053 		mdev->rs_mark_time = jiffies;
1054 		mdev->ov_last_oos_size = 0;
1055 		mdev->ov_last_oos_start = 0;
1056 
1057 		if (ns.conn == C_VERIFY_S) {
1058 			dev_info(DEV, "Starting Online Verify from sector %llu\n",
1059 					(unsigned long long)mdev->ov_position);
1060 			mod_timer(&mdev->resync_timer, jiffies);
1061 		}
1062 	}
1063 
1064 	if (get_ldev(mdev)) {
1065 		u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1066 						 MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1067 						 MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1068 
1069 		if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1070 			mdf |= MDF_CRASHED_PRIMARY;
1071 		if (mdev->state.role == R_PRIMARY ||
1072 		    (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1073 			mdf |= MDF_PRIMARY_IND;
1074 		if (mdev->state.conn > C_WF_REPORT_PARAMS)
1075 			mdf |= MDF_CONNECTED_IND;
1076 		if (mdev->state.disk > D_INCONSISTENT)
1077 			mdf |= MDF_CONSISTENT;
1078 		if (mdev->state.disk > D_OUTDATED)
1079 			mdf |= MDF_WAS_UP_TO_DATE;
1080 		if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1081 			mdf |= MDF_PEER_OUT_DATED;
1082 		if (mdf != mdev->ldev->md.flags) {
1083 			mdev->ldev->md.flags = mdf;
1084 			drbd_md_mark_dirty(mdev);
1085 		}
1086 		if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1087 			drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1088 		put_ldev(mdev);
1089 	}
1090 
1091 	/* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider resyncing */
1092 	if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1093 	    os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1094 		set_bit(CONSIDER_RESYNC, &mdev->flags);
1095 
1096 	/* Receiver should clean up itself */
1097 	if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1098 		drbd_thread_stop_nowait(&mdev->receiver);
1099 
1100 	/* Now the receiver finished cleaning up itself, it should die */
1101 	if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1102 		drbd_thread_stop_nowait(&mdev->receiver);
1103 
1104 	/* Upon network failure, we need to restart the receiver. */
1105 	if (os.conn > C_TEAR_DOWN &&
1106 	    ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1107 		drbd_thread_restart_nowait(&mdev->receiver);
1108 
1109 	ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1110 	if (ascw) {
1111 		ascw->os = os;
1112 		ascw->ns = ns;
1113 		ascw->flags = flags;
1114 		ascw->w.cb = w_after_state_ch;
1115 		ascw->done = done;
1116 		drbd_queue_work(&mdev->data.work, &ascw->w);
1117 	} else {
1118 		dev_warn(DEV, "Could not kmalloc an ascw\n");
1119 	}
1120 
1121 	return rv;
1122 }
1123 
1124 static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1125 {
1126 	struct after_state_chg_work *ascw =
1127 		container_of(w, struct after_state_chg_work, w);
1128 	after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1129 	if (ascw->flags & CS_WAIT_COMPLETE) {
1130 		D_ASSERT(ascw->done != NULL);
1131 		complete(ascw->done);
1132 	}
1133 	kfree(ascw);
1134 
1135 	return 1;
1136 }
1137 
1138 static void abw_start_sync(struct drbd_conf *mdev, int rv)
1139 {
1140 	if (rv) {
1141 		dev_err(DEV, "Writing the bitmap failed not starting resync.\n");
1142 		_drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1143 		return;
1144 	}
1145 
1146 	switch (mdev->state.conn) {
1147 	case C_STARTING_SYNC_T:
1148 		_drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1149 		break;
1150 	case C_STARTING_SYNC_S:
1151 		drbd_start_resync(mdev, C_SYNC_SOURCE);
1152 		break;
1153 	}
1154 }
1155 
1156 /**
1157  * after_state_ch() - Perform after state change actions that may sleep
1158  * @mdev:	DRBD device.
1159  * @os:		old state.
1160  * @ns:		new state.
1161  * @flags:	Flags
1162  */
1163 static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1164 			   union drbd_state ns, enum chg_state_flags flags)
1165 {
1166 	enum drbd_fencing_p fp;
1167 
1168 	if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1169 		clear_bit(CRASHED_PRIMARY, &mdev->flags);
1170 		if (mdev->p_uuid)
1171 			mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1172 	}
1173 
1174 	fp = FP_DONT_CARE;
1175 	if (get_ldev(mdev)) {
1176 		fp = mdev->ldev->dc.fencing;
1177 		put_ldev(mdev);
1178 	}
1179 
1180 	/* Inform userspace about the change... */
1181 	drbd_bcast_state(mdev, ns);
1182 
1183 	if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1184 	    (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1185 		drbd_khelper(mdev, "pri-on-incon-degr");
1186 
1187 	/* Here we have the actions that are performed after a
1188 	   state change. This function might sleep */
1189 
1190 	if (fp == FP_STONITH && ns.susp) {
1191 		/* case1: The outdate peer handler is successful:
1192 		 * case2: The connection was established again: */
1193 		if ((os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) ||
1194 		    (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)) {
1195 			tl_clear(mdev);
1196 			spin_lock_irq(&mdev->req_lock);
1197 			_drbd_set_state(_NS(mdev, susp, 0), CS_VERBOSE, NULL);
1198 			spin_unlock_irq(&mdev->req_lock);
1199 		}
1200 	}
1201 	/* Do not change the order of the if above and the two below... */
1202 	if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1203 		drbd_send_uuids(mdev);
1204 		drbd_send_state(mdev);
1205 	}
1206 	if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S)
1207 		drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL, "send_bitmap (WFBitMapS)");
1208 
1209 	/* Lost contact to peer's copy of the data */
1210 	if ((os.pdsk >= D_INCONSISTENT &&
1211 	     os.pdsk != D_UNKNOWN &&
1212 	     os.pdsk != D_OUTDATED)
1213 	&&  (ns.pdsk < D_INCONSISTENT ||
1214 	     ns.pdsk == D_UNKNOWN ||
1215 	     ns.pdsk == D_OUTDATED)) {
1216 		if (get_ldev(mdev)) {
1217 			if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1218 			    mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1219 				drbd_uuid_new_current(mdev);
1220 				drbd_send_uuids(mdev);
1221 			}
1222 			put_ldev(mdev);
1223 		}
1224 	}
1225 
1226 	if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1227 		if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0)
1228 			drbd_uuid_new_current(mdev);
1229 
1230 		/* D_DISKLESS Peer becomes secondary */
1231 		if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1232 			drbd_al_to_on_disk_bm(mdev);
1233 		put_ldev(mdev);
1234 	}
1235 
1236 	/* Last part of the attaching process ... */
1237 	if (ns.conn >= C_CONNECTED &&
1238 	    os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1239 		kfree(mdev->p_uuid); /* We expect to receive up-to-date UUIDs soon. */
1240 		mdev->p_uuid = NULL; /* ...to not use the old ones in the mean time */
1241 		drbd_send_sizes(mdev, 0, 0);  /* to start sync... */
1242 		drbd_send_uuids(mdev);
1243 		drbd_send_state(mdev);
1244 	}
1245 
1246 	/* We want to pause/continue resync, tell peer. */
1247 	if (ns.conn >= C_CONNECTED &&
1248 	     ((os.aftr_isp != ns.aftr_isp) ||
1249 	      (os.user_isp != ns.user_isp)))
1250 		drbd_send_state(mdev);
1251 
1252 	/* In case one of the isp bits got set, suspend other devices. */
1253 	if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1254 	    (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1255 		suspend_other_sg(mdev);
1256 
1257 	/* Make sure the peer gets informed about any state
1258 	   changes (ISP bits) that happened while we were in WFReportParams. */
1259 	if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1260 		drbd_send_state(mdev);
1261 
1262 	/* We are in the process of starting a full sync... */
1263 	if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1264 	    (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1265 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, &abw_start_sync, "set_n_write from StartingSync");
1266 
1267 	/* We are invalidating ourselves... */
1268 	if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1269 	    os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1270 		drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");
1271 
1272 	if (os.disk > D_FAILED && ns.disk == D_FAILED) {
1273 		enum drbd_io_error_p eh;
1274 
1275 		eh = EP_PASS_ON;
1276 		if (get_ldev_if_state(mdev, D_FAILED)) {
1277 			eh = mdev->ldev->dc.on_io_error;
1278 			put_ldev(mdev);
1279 		}
1280 
1281 		drbd_rs_cancel_all(mdev);
1282 		/* since get_ldev() only works as long as disk>=D_INCONSISTENT,
1283 		   and it is D_DISKLESS here, local_cnt can only go down, it can
1284 		   not increase... It will reach zero */
1285 		wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1286 		mdev->rs_total = 0;
1287 		mdev->rs_failed = 0;
1288 		atomic_set(&mdev->rs_pending_cnt, 0);
1289 
1290 		spin_lock_irq(&mdev->req_lock);
1291 		_drbd_set_state(_NS(mdev, disk, D_DISKLESS), CS_HARD, NULL);
1292 		spin_unlock_irq(&mdev->req_lock);
1293 
1294 		if (eh == EP_CALL_HELPER)
1295 			drbd_khelper(mdev, "local-io-error");
1296 	}
1297 
1298 	if (os.disk > D_DISKLESS && ns.disk == D_DISKLESS) {
1299 
1300 		if (os.disk == D_FAILED) /* && ns.disk == D_DISKLESS*/ {
1301 			if (drbd_send_state(mdev))
1302 				dev_warn(DEV, "Notified peer that my disk is broken.\n");
1303 			else
1304 				dev_err(DEV, "Sending state in drbd_io_error() failed\n");
1305 		}
1306 
1307 		wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
1308 		lc_destroy(mdev->resync);
1309 		mdev->resync = NULL;
1310 		lc_destroy(mdev->act_log);
1311 		mdev->act_log = NULL;
1312 		__no_warn(local,
1313 			drbd_free_bc(mdev->ldev);
1314 			mdev->ldev = NULL;);
1315 
1316 		if (mdev->md_io_tmpp)
1317 			__free_page(mdev->md_io_tmpp);
1318 	}
1319 
1320 	/* Disks got bigger while they were detached */
1321 	if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1322 	    test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1323 		if (ns.conn == C_CONNECTED)
1324 			resync_after_online_grow(mdev);
1325 	}
1326 
1327 	/* A resync finished or aborted, wake paused devices... */
1328 	if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1329 	    (os.peer_isp && !ns.peer_isp) ||
1330 	    (os.user_isp && !ns.user_isp))
1331 		resume_next_sg(mdev);
1332 
1333 	/* Upon network connection, we need to start the receiver */
1334 	if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1335 		drbd_thread_start(&mdev->receiver);
1336 
1337 	/* Terminate worker thread if we are unconfigured - it will be
1338 	   restarted as needed... */
1339 	if (ns.disk == D_DISKLESS &&
1340 	    ns.conn == C_STANDALONE &&
1341 	    ns.role == R_SECONDARY) {
1342 		if (os.aftr_isp != ns.aftr_isp)
1343 			resume_next_sg(mdev);
1344 		/* set in __drbd_set_state, unless CONFIG_PENDING was set */
1345 		if (test_bit(DEVICE_DYING, &mdev->flags))
1346 			drbd_thread_stop_nowait(&mdev->worker);
1347 	}
1348 
1349 	drbd_md_sync(mdev);
1350 }
1351 
1352 
1353 static int drbd_thread_setup(void *arg)
1354 {
1355 	struct drbd_thread *thi = (struct drbd_thread *) arg;
1356 	struct drbd_conf *mdev = thi->mdev;
1357 	unsigned long flags;
1358 	int retval;
1359 
1360 restart:
1361 	retval = thi->function(thi);
1362 
1363 	spin_lock_irqsave(&thi->t_lock, flags);
1364 
1365 	/* if the receiver has been "Exiting", the last thing it did
1366 	 * was set the conn state to "StandAlone",
1367 	 * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
1368 	 * and receiver thread will be "started".
1369 	 * drbd_thread_start needs to set "Restarting" in that case.
1370 	 * t_state check and assignment needs to be within the same spinlock,
1371 	 * so either thread_start sees Exiting, and can remap to Restarting,
1372 	 * or thread_start sees None, and can proceed as normal.
1373 	 */
1374 
1375 	if (thi->t_state == Restarting) {
1376 		dev_info(DEV, "Restarting %s\n", current->comm);
1377 		thi->t_state = Running;
1378 		spin_unlock_irqrestore(&thi->t_lock, flags);
1379 		goto restart;
1380 	}
1381 
1382 	thi->task = NULL;
1383 	thi->t_state = None;
1384 	smp_mb();
1385 	complete(&thi->stop);
1386 	spin_unlock_irqrestore(&thi->t_lock, flags);
1387 
1388 	dev_info(DEV, "Terminating %s\n", current->comm);
1389 
1390 	/* Release mod reference taken when thread was started */
1391 	module_put(THIS_MODULE);
1392 	return retval;
1393 }
1394 
1395 static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1396 		      int (*func) (struct drbd_thread *))
1397 {
1398 	spin_lock_init(&thi->t_lock);
1399 	thi->task    = NULL;
1400 	thi->t_state = None;
1401 	thi->function = func;
1402 	thi->mdev = mdev;
1403 }
1404 
1405 int drbd_thread_start(struct drbd_thread *thi)
1406 {
1407 	struct drbd_conf *mdev = thi->mdev;
1408 	struct task_struct *nt;
1409 	unsigned long flags;
1410 
1411 	const char *me =
1412 		thi == &mdev->receiver ? "receiver" :
1413 		thi == &mdev->asender  ? "asender"  :
1414 		thi == &mdev->worker   ? "worker"   : "NONSENSE";
1415 
1416 	/* is used from state engine doing drbd_thread_stop_nowait,
1417 	 * while holding the req lock irqsave */
1418 	spin_lock_irqsave(&thi->t_lock, flags);
1419 
1420 	switch (thi->t_state) {
1421 	case None:
1422 		dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1423 				me, current->comm, current->pid);
1424 
1425 		/* Get ref on module for thread - this is released when thread exits */
1426 		if (!try_module_get(THIS_MODULE)) {
1427 			dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1428 			spin_unlock_irqrestore(&thi->t_lock, flags);
1429 			return FALSE;
1430 		}
1431 
1432 		init_completion(&thi->stop);
1433 		D_ASSERT(thi->task == NULL);
1434 		thi->reset_cpu_mask = 1;
1435 		thi->t_state = Running;
1436 		spin_unlock_irqrestore(&thi->t_lock, flags);
1437 		flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
1438 
1439 		nt = kthread_create(drbd_thread_setup, (void *) thi,
1440 				    "drbd%d_%s", mdev_to_minor(mdev), me);
1441 
1442 		if (IS_ERR(nt)) {
1443 			dev_err(DEV, "Couldn't start thread\n");
1444 
1445 			module_put(THIS_MODULE);
1446 			return FALSE;
1447 		}
1448 		spin_lock_irqsave(&thi->t_lock, flags);
1449 		thi->task = nt;
1450 		thi->t_state = Running;
1451 		spin_unlock_irqrestore(&thi->t_lock, flags);
1452 		wake_up_process(nt);
1453 		break;
1454 	case Exiting:
1455 		thi->t_state = Restarting;
1456 		dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1457 				me, current->comm, current->pid);
1458 		/* fall through */
1459 	case Running:
1460 	case Restarting:
1461 	default:
1462 		spin_unlock_irqrestore(&thi->t_lock, flags);
1463 		break;
1464 	}
1465 
1466 	return TRUE;
1467 }
1468 
1469 
1470 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1471 {
1472 	unsigned long flags;
1473 
1474 	enum drbd_thread_state ns = restart ? Restarting : Exiting;
1475 
1476 	/* may be called from state engine, holding the req lock irqsave */
1477 	spin_lock_irqsave(&thi->t_lock, flags);
1478 
1479 	if (thi->t_state == None) {
1480 		spin_unlock_irqrestore(&thi->t_lock, flags);
1481 		if (restart)
1482 			drbd_thread_start(thi);
1483 		return;
1484 	}
1485 
1486 	if (thi->t_state != ns) {
1487 		if (thi->task == NULL) {
1488 			spin_unlock_irqrestore(&thi->t_lock, flags);
1489 			return;
1490 		}
1491 
1492 		thi->t_state = ns;
1493 		smp_mb();
1494 		init_completion(&thi->stop);
1495 		if (thi->task != current)
1496 			force_sig(DRBD_SIGKILL, thi->task);
1497 
1498 	}
1499 
1500 	spin_unlock_irqrestore(&thi->t_lock, flags);
1501 
1502 	if (wait)
1503 		wait_for_completion(&thi->stop);
1504 }
1505 
1506 #ifdef CONFIG_SMP
1507 /**
1508  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1509  * @mdev:	DRBD device.
1510  *
1511  * Forces all threads of a device onto the same CPU. This is beneficial for
1512  * DRBD's performance. May be overridden by the user's configuration.
1513  */
1514 void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1515 {
1516 	int ord, cpu;
1517 
1518 	/* user override. */
1519 	if (cpumask_weight(mdev->cpu_mask))
1520 		return;
1521 
1522 	ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1523 	for_each_online_cpu(cpu) {
1524 		if (ord-- == 0) {
1525 			cpumask_set_cpu(cpu, mdev->cpu_mask);
1526 			return;
1527 		}
1528 	}
1529 	/* should not be reached */
1530 	cpumask_setall(mdev->cpu_mask);
1531 }
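/*
 * For example (illustrative, not from the original file): with four
 * online CPUs, minors 0..7 end up pinned to CPUs 0, 1, 2, 3, 0, 1, 2, 3,
 * since the minor number is taken modulo the number of online CPUs.
 */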
1532 
1533 /**
1534  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1535  * @mdev:	DRBD device.
1536  *
1537  * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
1538  * prematurely.
1539  */
1540 void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1541 {
1542 	struct task_struct *p = current;
1543 	struct drbd_thread *thi =
1544 		p == mdev->asender.task  ? &mdev->asender  :
1545 		p == mdev->receiver.task ? &mdev->receiver :
1546 		p == mdev->worker.task   ? &mdev->worker   :
1547 		NULL;
1548 	ERR_IF(thi == NULL)
1549 		return;
1550 	if (!thi->reset_cpu_mask)
1551 		return;
1552 	thi->reset_cpu_mask = 0;
1553 	set_cpus_allowed_ptr(p, mdev->cpu_mask);
1554 }
1555 #endif
1556 
1557 /* the appropriate socket mutex must be held already */
1558 int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1559 			  enum drbd_packets cmd, struct p_header *h,
1560 			  size_t size, unsigned msg_flags)
1561 {
1562 	int sent, ok;
1563 
1564 	ERR_IF(!h) return FALSE;
1565 	ERR_IF(!size) return FALSE;
1566 
1567 	h->magic   = BE_DRBD_MAGIC;
1568 	h->command = cpu_to_be16(cmd);
1569 	h->length  = cpu_to_be16(size-sizeof(struct p_header));
1570 
1571 	sent = drbd_send(mdev, sock, h, size, msg_flags);
1572 
1573 	ok = (sent == size);
1574 	if (!ok)
1575 		dev_err(DEV, "short sent %s size=%d sent=%d\n",
1576 		    cmdname(cmd), (int)size, sent);
1577 	return ok;
1578 }
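/*
 * Illustrative sketch (not part of the original file): every packet sent
 * through _drbd_send_cmd() starts with this fixed header -- the DRBD
 * magic, a 16-bit command code and the 16-bit payload length, converted
 * to big endian so both peers agree on the wire format regardless of
 * their native byte order.  The helper name below is made up.
 */
#if 0
static void example_fill_header(struct p_header *h, enum drbd_packets cmd,
				size_t size)
{
	h->magic   = BE_DRBD_MAGIC;	/* constant, already in big endian */
	h->command = cpu_to_be16(cmd);	/* e.g. P_STATE */
	h->length  = cpu_to_be16(size - sizeof(struct p_header));
}
#endif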
1579 
1580 /* don't pass the socket. we may only look at it
1581  * when we hold the appropriate socket mutex.
1582  */
1583 int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1584 		  enum drbd_packets cmd, struct p_header *h, size_t size)
1585 {
1586 	int ok = 0;
1587 	struct socket *sock;
1588 
1589 	if (use_data_socket) {
1590 		mutex_lock(&mdev->data.mutex);
1591 		sock = mdev->data.socket;
1592 	} else {
1593 		mutex_lock(&mdev->meta.mutex);
1594 		sock = mdev->meta.socket;
1595 	}
1596 
1597 	/* drbd_disconnect() could have called drbd_free_sock()
1598 	 * while we were waiting in down()... */
1599 	if (likely(sock != NULL))
1600 		ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1601 
1602 	if (use_data_socket)
1603 		mutex_unlock(&mdev->data.mutex);
1604 	else
1605 		mutex_unlock(&mdev->meta.mutex);
1606 	return ok;
1607 }
1608 
1609 int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
1610 		   size_t size)
1611 {
1612 	struct p_header h;
1613 	int ok;
1614 
1615 	h.magic   = BE_DRBD_MAGIC;
1616 	h.command = cpu_to_be16(cmd);
1617 	h.length  = cpu_to_be16(size);
1618 
1619 	if (!drbd_get_data_sock(mdev))
1620 		return 0;
1621 
1622 	ok = (sizeof(h) ==
1623 		drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
1624 	ok = ok && (size ==
1625 		drbd_send(mdev, mdev->data.socket, data, size, 0));
1626 
1627 	drbd_put_data_sock(mdev);
1628 
1629 	return ok;
1630 }
1631 
1632 int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
1633 {
1634 	struct p_rs_param_89 *p;
1635 	struct socket *sock;
1636 	int size, rv;
1637 	const int apv = mdev->agreed_pro_version;
1638 
1639 	size = apv <= 87 ? sizeof(struct p_rs_param)
1640 		: apv == 88 ? sizeof(struct p_rs_param)
1641 			+ strlen(mdev->sync_conf.verify_alg) + 1
1642 		: /* 89 */    sizeof(struct p_rs_param_89);
1643 
1644 	/* used from admin command context and receiver/worker context.
1645 	 * to avoid kmalloc, grab the socket right here,
1646 	 * then use the pre-allocated sbuf there */
1647 	mutex_lock(&mdev->data.mutex);
1648 	sock = mdev->data.socket;
1649 
1650 	if (likely(sock != NULL)) {
1651 		enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
1652 
1653 		p = &mdev->data.sbuf.rs_param_89;
1654 
1655 		/* initialize verify_alg and csums_alg */
1656 		memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
1657 
1658 		p->rate = cpu_to_be32(sc->rate);
1659 
1660 		if (apv >= 88)
1661 			strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
1662 		if (apv >= 89)
1663 			strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
1664 
1665 		rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
1666 	} else
1667 		rv = 0; /* not ok */
1668 
1669 	mutex_unlock(&mdev->data.mutex);
1670 
1671 	return rv;
1672 }
1673 
1674 int drbd_send_protocol(struct drbd_conf *mdev)
1675 {
1676 	struct p_protocol *p;
1677 	int size, cf, rv;
1678 
1679 	size = sizeof(struct p_protocol);
1680 
1681 	if (mdev->agreed_pro_version >= 87)
1682 		size += strlen(mdev->net_conf->integrity_alg) + 1;
1683 
1684 	/* we must not recurse into our own queue,
1685 	 * as that is blocked during handshake */
1686 	p = kmalloc(size, GFP_NOIO);
1687 	if (p == NULL)
1688 		return 0;
1689 
1690 	p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
1691 	p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
1692 	p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
1693 	p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
1694 	p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
1695 
1696 	cf = 0;
1697 	if (mdev->net_conf->want_lose)
1698 		cf |= CF_WANT_LOSE;
1699 	if (mdev->net_conf->dry_run) {
1700 		if (mdev->agreed_pro_version >= 92)
1701 			cf |= CF_DRY_RUN;
1702 		else {
1703 			dev_err(DEV, "--dry-run is not supported by peer");
1704 			kfree(p);
1705 			return 0;
1706 		}
1707 	}
1708 	p->conn_flags    = cpu_to_be32(cf);
1709 
1710 	if (mdev->agreed_pro_version >= 87)
1711 		strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
1712 
1713 	rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
1714 			   (struct p_header *)p, size);
1715 	kfree(p);
1716 	return rv;
1717 }
1718 
1719 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1720 {
1721 	struct p_uuids p;
1722 	int i;
1723 
1724 	if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1725 		return 1;
1726 
1727 	for (i = UI_CURRENT; i < UI_SIZE; i++)
1728 		p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1729 
1730 	mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1731 	p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1732 	uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
1733 	uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1734 	uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1735 	p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1736 
1737 	put_ldev(mdev);
1738 
1739 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
1740 			     (struct p_header *)&p, sizeof(p));
1741 }
1742 
1743 int drbd_send_uuids(struct drbd_conf *mdev)
1744 {
1745 	return _drbd_send_uuids(mdev, 0);
1746 }
1747 
1748 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1749 {
1750 	return _drbd_send_uuids(mdev, 8);
1751 }
1752 
1753 
1754 int drbd_send_sync_uuid(struct drbd_conf *mdev, u64 val)
1755 {
1756 	struct p_rs_uuid p;
1757 
1758 	p.uuid = cpu_to_be64(val);
1759 
1760 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
1761 			     (struct p_header *)&p, sizeof(p));
1762 }
1763 
1764 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1765 {
1766 	struct p_sizes p;
1767 	sector_t d_size, u_size;
1768 	int q_order_type;
1769 	int ok;
1770 
1771 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1772 		D_ASSERT(mdev->ldev->backing_bdev);
1773 		d_size = drbd_get_max_capacity(mdev->ldev);
1774 		u_size = mdev->ldev->dc.disk_size;
1775 		q_order_type = drbd_queue_order_type(mdev);
1776 		put_ldev(mdev);
1777 	} else {
1778 		d_size = 0;
1779 		u_size = 0;
1780 		q_order_type = QUEUE_ORDERED_NONE;
1781 	}
1782 
1783 	p.d_size = cpu_to_be64(d_size);
1784 	p.u_size = cpu_to_be64(u_size);
1785 	p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1786 	p.max_segment_size = cpu_to_be32(queue_max_segment_size(mdev->rq_queue));
1787 	p.queue_order_type = cpu_to_be16(q_order_type);
1788 	p.dds_flags = cpu_to_be16(flags);
1789 
1790 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
1791 			   (struct p_header *)&p, sizeof(p));
1792 	return ok;
1793 }
1794 
1795 /**
1796  * drbd_send_state() - Sends the drbd state to the peer
1797  * @mdev:	DRBD device.
1798  */
1799 int drbd_send_state(struct drbd_conf *mdev)
1800 {
1801 	struct socket *sock;
1802 	struct p_state p;
1803 	int ok = 0;
1804 
1805 	/* Grab state lock so we won't send state if we're in the middle
1806 	 * of a cluster wide state change on another thread */
1807 	drbd_state_lock(mdev);
1808 
1809 	mutex_lock(&mdev->data.mutex);
1810 
1811 	p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1812 	sock = mdev->data.socket;
1813 
1814 	if (likely(sock != NULL)) {
1815 		ok = _drbd_send_cmd(mdev, sock, P_STATE,
1816 				    (struct p_header *)&p, sizeof(p), 0);
1817 	}
1818 
1819 	mutex_unlock(&mdev->data.mutex);
1820 
1821 	drbd_state_unlock(mdev);
1822 	return ok;
1823 }
1824 
1825 int drbd_send_state_req(struct drbd_conf *mdev,
1826 	union drbd_state mask, union drbd_state val)
1827 {
1828 	struct p_req_state p;
1829 
1830 	p.mask    = cpu_to_be32(mask.i);
1831 	p.val     = cpu_to_be32(val.i);
1832 
1833 	return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
1834 			     (struct p_header *)&p, sizeof(p));
1835 }
1836 
1837 int drbd_send_sr_reply(struct drbd_conf *mdev, int retcode)
1838 {
1839 	struct p_req_state_reply p;
1840 
1841 	p.retcode    = cpu_to_be32(retcode);
1842 
1843 	return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
1844 			     (struct p_header *)&p, sizeof(p));
1845 }
1846 
1847 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
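/* Run-length encode a chunk of the bitmap into p->code using VLI.
 * Returns the number of code bytes written, 0 if the caller should fall back
 * to plain bitmap transfer, or -1 on an unexpected encoding error. */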
1848 	struct p_compressed_bm *p,
1849 	struct bm_xfer_ctx *c)
1850 {
1851 	struct bitstream bs;
1852 	unsigned long plain_bits;
1853 	unsigned long tmp;
1854 	unsigned long rl;
1855 	unsigned len;
1856 	unsigned toggle;
1857 	int bits;
1858 
1859 	/* may we use this feature? */
1860 	if ((mdev->sync_conf.use_rle == 0) ||
1861 		(mdev->agreed_pro_version < 90))
1862 			return 0;
1863 
1864 	if (c->bit_offset >= c->bm_bits)
1865 		return 0; /* nothing to do. */
1866 
1867 	/* use at most this many bytes */
1868 	bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1869 	memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1870 	/* plain bits covered in this code string */
1871 	plain_bits = 0;
1872 
1873 	/* p->encoding & 0x80 stores whether the first run length is set.
1874 	 * bit offset is implicit.
1875 	 * start with toggle == 2 to be able to tell the first iteration */
1876 	toggle = 2;
1877 
1878 	/* see how many plain bits we can stuff into one packet
1879 	 * using RLE and VLI. */
1880 	do {
1881 		tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1882 				    : _drbd_bm_find_next(mdev, c->bit_offset);
1883 		if (tmp == -1UL)
1884 			tmp = c->bm_bits;
1885 		rl = tmp - c->bit_offset;
1886 
1887 		if (toggle == 2) { /* first iteration */
1888 			if (rl == 0) {
1889 				/* the first checked bit was set,
1890 				 * store start value, */
1891 				DCBP_set_start(p, 1);
1892 				/* but skip encoding of zero run length */
1893 				toggle = !toggle;
1894 				continue;
1895 			}
1896 			DCBP_set_start(p, 0);
1897 		}
1898 
1899 		/* paranoia: catch zero runlength.
1900 		 * can only happen if bitmap is modified while we scan it. */
1901 		if (rl == 0) {
1902 			dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1903 			    "t:%u bo:%lu\n", toggle, c->bit_offset);
1904 			return -1;
1905 		}
1906 
1907 		bits = vli_encode_bits(&bs, rl);
1908 		if (bits == -ENOBUFS) /* buffer full */
1909 			break;
1910 		if (bits <= 0) {
1911 			dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1912 			return 0;
1913 		}
1914 
1915 		toggle = !toggle;
1916 		plain_bits += rl;
1917 		c->bit_offset = tmp;
1918 	} while (c->bit_offset < c->bm_bits);
1919 
1920 	len = bs.cur.b - p->code + !!bs.cur.bit;
1921 
1922 	if (plain_bits < (len << 3)) {
1923 		/* incompressible with this method.
1924 		 * we need to rewind both word and bit position. */
1925 		c->bit_offset -= plain_bits;
1926 		bm_xfer_ctx_bit_to_word_offset(c);
1927 		c->bit_offset = c->word_offset * BITS_PER_LONG;
1928 		return 0;
1929 	}
1930 
1931 	/* RLE + VLI was able to compress it just fine.
1932 	 * update c->word_offset. */
1933 	bm_xfer_ctx_bit_to_word_offset(c);
1934 
1935 	/* store pad_bits */
1936 	DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1937 
1938 	return len;
1939 }
1940 
1941 enum { OK, FAILED, DONE }
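/* Send one bitmap packet: P_COMPRESSED_BITMAP if the RLE/VLI encoding saved
 * space, otherwise a plain P_BITMAP chunk. Returns OK to continue with the
 * next chunk, DONE once the whole bitmap has been sent, FAILED on error. */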
1942 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
1943 	struct p_header *h, struct bm_xfer_ctx *c)
1944 {
1945 	struct p_compressed_bm *p = (void*)h;
1946 	unsigned long num_words;
1947 	int len;
1948 	int ok;
1949 
1950 	len = fill_bitmap_rle_bits(mdev, p, c);
1951 
1952 	if (len < 0)
1953 		return FAILED;
1954 
1955 	if (len) {
1956 		DCBP_set_code(p, RLE_VLI_Bits);
1957 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
1958 			sizeof(*p) + len, 0);
1959 
1960 		c->packets[0]++;
1961 		c->bytes[0] += sizeof(*p) + len;
1962 
1963 		if (c->bit_offset >= c->bm_bits)
1964 			len = 0; /* DONE */
1965 	} else {
1966 		/* was not compressible.
1967 		 * send a buffer full of plain text bits instead. */
1968 		num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
1969 		len = num_words * sizeof(long);
1970 		if (len)
1971 			drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
1972 		ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
1973 				   h, sizeof(struct p_header) + len, 0);
1974 		c->word_offset += num_words;
1975 		c->bit_offset = c->word_offset * BITS_PER_LONG;
1976 
1977 		c->packets[1]++;
1978 		c->bytes[1] += sizeof(struct p_header) + len;
1979 
1980 		if (c->bit_offset > c->bm_bits)
1981 			c->bit_offset = c->bm_bits;
1982 	}
1983 	ok = ok ? ((len == 0) ? DONE : OK) : FAILED;
1984 
1985 	if (ok == DONE)
1986 		INFO_bm_xfer_stats(mdev, "send", c);
1987 	return ok;
1988 }
1989 
1990 /* See the comment at receive_bitmap() */
1991 int _drbd_send_bitmap(struct drbd_conf *mdev)
1992 {
1993 	struct bm_xfer_ctx c;
1994 	struct p_header *p;
1995 	int ret;
1996 
1997 	ERR_IF(!mdev->bitmap) return FALSE;
1998 
1999 	/* maybe we should use some per thread scratch page,
2000 	 * and allocate that during initial device creation? */
2001 	p = (struct p_header *) __get_free_page(GFP_NOIO);
2002 	if (!p) {
2003 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
2004 		return FALSE;
2005 	}
2006 
2007 	if (get_ldev(mdev)) {
2008 		if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2009 			dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2010 			drbd_bm_set_all(mdev);
2011 			if (drbd_bm_write(mdev)) {
2012 				/* write_bm did fail! Leave full sync flag set in Meta P_DATA
2013 				 * but otherwise process as per normal - need to tell other
2014 				 * side that a full resync is required! */
2015 				dev_err(DEV, "Failed to write bitmap to disk!\n");
2016 			} else {
2017 				drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2018 				drbd_md_sync(mdev);
2019 			}
2020 		}
2021 		put_ldev(mdev);
2022 	}
2023 
2024 	c = (struct bm_xfer_ctx) {
2025 		.bm_bits = drbd_bm_bits(mdev),
2026 		.bm_words = drbd_bm_words(mdev),
2027 	};
2028 
2029 	do {
2030 		ret = send_bitmap_rle_or_plain(mdev, p, &c);
2031 	} while (ret == OK);
2032 
2033 	free_page((unsigned long) p);
2034 	return (ret == DONE);
2035 }
2036 
2037 int drbd_send_bitmap(struct drbd_conf *mdev)
2038 {
2039 	int err;
2040 
2041 	if (!drbd_get_data_sock(mdev))
2042 		return -1;
2043 	err = !_drbd_send_bitmap(mdev);
2044 	drbd_put_data_sock(mdev);
2045 	return err;
2046 }
2047 
2048 int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2049 {
2050 	int ok;
2051 	struct p_barrier_ack p;
2052 
2053 	p.barrier  = barrier_nr;
2054 	p.set_size = cpu_to_be32(set_size);
2055 
2056 	if (mdev->state.conn < C_CONNECTED)
2057 		return FALSE;
2058 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2059 			(struct p_header *)&p, sizeof(p));
2060 	return ok;
2061 }
2062 
2063 /**
2064  * _drbd_send_ack() - Sends an ack packet
2065  * @mdev:	DRBD device.
2066  * @cmd:	Packet command code.
2067  * @sector:	sector, needs to be in big endian byte order
2068  * @blksize:	size in bytes, needs to be in big endian byte order
2069  * @block_id:	Id, big endian byte order
2070  */
2071 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2072 			  u64 sector,
2073 			  u32 blksize,
2074 			  u64 block_id)
2075 {
2076 	int ok;
2077 	struct p_block_ack p;
2078 
2079 	p.sector   = sector;
2080 	p.block_id = block_id;
2081 	p.blksize  = blksize;
2082 	p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2083 
2084 	if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2085 		return FALSE;
2086 	ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2087 				(struct p_header *)&p, sizeof(p));
2088 	return ok;
2089 }
2090 
2091 int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2092 		     struct p_data *dp)
2093 {
2094 	const int header_size = sizeof(struct p_data)
2095 			      - sizeof(struct p_header);
2096 	int data_size  = ((struct p_header *)dp)->length - header_size;
2097 
2098 	return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2099 			      dp->block_id);
2100 }
2101 
2102 int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2103 		     struct p_block_req *rp)
2104 {
2105 	return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2106 }
2107 
2108 /**
2109  * drbd_send_ack() - Sends an ack packet
2110  * @mdev:	DRBD device.
2111  * @cmd:	Packet command code.
2112  * @e:		Epoch entry.
2113  */
2114 int drbd_send_ack(struct drbd_conf *mdev,
2115 	enum drbd_packets cmd, struct drbd_epoch_entry *e)
2116 {
2117 	return _drbd_send_ack(mdev, cmd,
2118 			      cpu_to_be64(e->sector),
2119 			      cpu_to_be32(e->size),
2120 			      e->block_id);
2121 }
2122 
2123 /* This function misuses the block_id field to signal if the blocks
2124  * are in sync or not. */
2125 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2126 		     sector_t sector, int blksize, u64 block_id)
2127 {
2128 	return _drbd_send_ack(mdev, cmd,
2129 			      cpu_to_be64(sector),
2130 			      cpu_to_be32(blksize),
2131 			      cpu_to_be64(block_id));
2132 }
2133 
2134 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2135 		       sector_t sector, int size, u64 block_id)
2136 {
2137 	int ok;
2138 	struct p_block_req p;
2139 
2140 	p.sector   = cpu_to_be64(sector);
2141 	p.block_id = block_id;
2142 	p.blksize  = cpu_to_be32(size);
2143 
2144 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2145 				(struct p_header *)&p, sizeof(p));
2146 	return ok;
2147 }
2148 
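/* Like drbd_send_drequest(), but with a checksum digest appended to the
 * request; header and digest are sent back to back under the data mutex. */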
2149 int drbd_send_drequest_csum(struct drbd_conf *mdev,
2150 			    sector_t sector, int size,
2151 			    void *digest, int digest_size,
2152 			    enum drbd_packets cmd)
2153 {
2154 	int ok;
2155 	struct p_block_req p;
2156 
2157 	p.sector   = cpu_to_be64(sector);
2158 	p.block_id = BE_DRBD_MAGIC + 0xbeef;
2159 	p.blksize  = cpu_to_be32(size);
2160 
2161 	p.head.magic   = BE_DRBD_MAGIC;
2162 	p.head.command = cpu_to_be16(cmd);
2163 	p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header) + digest_size);
2164 
2165 	mutex_lock(&mdev->data.mutex);
2166 
2167 	ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2168 	ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2169 
2170 	mutex_unlock(&mdev->data.mutex);
2171 
2172 	return ok;
2173 }
2174 
2175 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2176 {
2177 	int ok;
2178 	struct p_block_req p;
2179 
2180 	p.sector   = cpu_to_be64(sector);
2181 	p.block_id = BE_DRBD_MAGIC + 0xbabe;
2182 	p.blksize  = cpu_to_be32(size);
2183 
2184 	ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2185 			   (struct p_header *)&p, sizeof(p));
2186 	return ok;
2187 }
2188 
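/* Send a P_DELAY_PROBE on the given socket, carrying the current probe
 * sequence number and the microseconds elapsed since dps_time, so that the
 * delay on the data and meta sockets can be compared on the receiving side. */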
2189 static int drbd_send_delay_probe(struct drbd_conf *mdev, struct drbd_socket *ds)
2190 {
2191 	struct p_delay_probe dp;
2192 	int offset, ok = 0;
2193 	struct timeval now;
2194 
2195 	mutex_lock(&ds->mutex);
2196 	if (likely(ds->socket)) {
2197 		do_gettimeofday(&now);
2198 		offset = now.tv_usec - mdev->dps_time.tv_usec +
2199 			 (now.tv_sec - mdev->dps_time.tv_sec) * 1000000;
2200 		dp.seq_num  = cpu_to_be32(mdev->delay_seq);
2201 		dp.offset   = cpu_to_be32(offset);
2202 
2203 		ok = _drbd_send_cmd(mdev, ds->socket, P_DELAY_PROBE,
2204 				    (struct p_header *)&dp, sizeof(dp), 0);
2205 	}
2206 	mutex_unlock(&ds->mutex);
2207 
2208 	return ok;
2209 }
2210 
2211 static int drbd_send_delay_probes(struct drbd_conf *mdev)
2212 {
2213 	int ok;
2214 
2215 	mdev->delay_seq++;
2216 	do_gettimeofday(&mdev->dps_time);
2217 	ok = drbd_send_delay_probe(mdev, &mdev->meta);
2218 	ok = ok && drbd_send_delay_probe(mdev, &mdev->data);
2219 
2220 	mdev->dp_volume_last = mdev->send_cnt;
2221 	mod_timer(&mdev->delay_probe_timer, jiffies + mdev->sync_conf.dp_interval * HZ / 10);
2222 
2223 	return ok;
2224 }
2225 
2226 /* called on sndtimeo
2227  * returns FALSE if we should retry,
2228  * TRUE if we think the connection is dead
2229  */
2230 static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2231 {
2232 	int drop_it;
2233 	/* long elapsed = (long)(jiffies - mdev->last_received); */
2234 
2235 	drop_it =   mdev->meta.socket == sock
2236 		|| !mdev->asender.task
2237 		|| get_t_state(&mdev->asender) != Running
2238 		|| mdev->state.conn < C_CONNECTED;
2239 
2240 	if (drop_it)
2241 		return TRUE;
2242 
2243 	drop_it = !--mdev->ko_count;
2244 	if (!drop_it) {
2245 		dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2246 		       current->comm, current->pid, mdev->ko_count);
2247 		request_ping(mdev);
2248 	}
2249 
2250 	return drop_it; /* && (mdev->state == R_PRIMARY) */;
2251 }
2252 
2253 /* The idea of sendpage seems to be to put some kind of reference
2254  * to the page into the skb, and to hand it over to the NIC. In
2255  * this process get_page() gets called.
2256  *
2257  * As soon as the page was really sent over the network put_page()
2258  * gets called by some part of the network layer. [ NIC driver? ]
2259  *
2260  * [ get_page() / put_page() increment/decrement the count. If count
2261  *   reaches 0 the page will be freed. ]
2262  *
2263  * This works nicely with pages from FSs.
2264  * But this means that in protocol A we might signal IO completion too early!
2265  *
2266  * In order not to corrupt data during a resync we must make sure
2267  * that we do not reuse our own buffer pages (EEs) too early, therefore
2268  * we have the net_ee list.
2269  *
2270  * XFS seems to have problems, still, it submits pages with page_count == 0!
2271  * As a workaround, we disable sendpage on pages
2272  * with page_count == 0 or PageSlab.
2273  */
2274 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2275 		   int offset, size_t size, unsigned msg_flags)
2276 {
2277 	int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, msg_flags);
2278 	kunmap(page);
2279 	if (sent == size)
2280 		mdev->send_cnt += size>>9;
2281 	return sent == size;
2282 }
2283 
2284 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2285 		    int offset, size_t size, unsigned msg_flags)
2286 {
2287 	mm_segment_t oldfs = get_fs();
2288 	int sent, ok;
2289 	int len = size;
2290 
2291 	/* e.g. XFS meta- & log-data is in slab pages, which have a
2292 	 * page_count of 0 and/or have PageSlab() set.
2293 	 * we cannot use send_page for those, as that does get_page();
2294 	 * put_page(); and would cause either a VM_BUG directly, or
2295 	 * __page_cache_release a page that would actually still be referenced
2296 	 * by someone, leading to some obscure delayed Oops somewhere else. */
2297 	if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2298 		return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
2299 
2300 	msg_flags |= MSG_NOSIGNAL;
2301 	drbd_update_congested(mdev);
2302 	set_fs(KERNEL_DS);
2303 	do {
2304 		sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2305 							offset, len,
2306 							msg_flags);
2307 		if (sent == -EAGAIN) {
2308 			if (we_should_drop_the_connection(mdev,
2309 							  mdev->data.socket))
2310 				break;
2311 			else
2312 				continue;
2313 		}
2314 		if (sent <= 0) {
2315 			dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2316 			     __func__, (int)size, len, sent);
2317 			break;
2318 		}
2319 		len    -= sent;
2320 		offset += sent;
2321 	} while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2322 	set_fs(oldfs);
2323 	clear_bit(NET_CONGESTED, &mdev->flags);
2324 
2325 	ok = (len == 0);
2326 	if (likely(ok))
2327 		mdev->send_cnt += size>>9;
2328 	return ok;
2329 }
2330 
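/* Send the bio payload by copying each segment through the regular send path
 * (no zero-copy). Used by drbd_send_dblock() for protocol A, where the
 * zero-copy path could let pages be reused before they are on the wire
 * (see the sendpage comment above). */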
2331 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2332 {
2333 	struct bio_vec *bvec;
2334 	int i;
2335 	/* hint all but last page with MSG_MORE */
2336 	__bio_for_each_segment(bvec, bio, i, 0) {
2337 		if (!_drbd_no_send_page(mdev, bvec->bv_page,
2338 				     bvec->bv_offset, bvec->bv_len,
2339 				     i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2340 			return 0;
2341 	}
2342 	return 1;
2343 }
2344 
2345 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2346 {
2347 	struct bio_vec *bvec;
2348 	int i;
2349 	/* hint all but last page with MSG_MORE */
2350 	__bio_for_each_segment(bvec, bio, i, 0) {
2351 		if (!_drbd_send_page(mdev, bvec->bv_page,
2352 				     bvec->bv_offset, bvec->bv_len,
2353 				     i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2354 			return 0;
2355 	}
2356 	return 1;
2357 }
2358 
2359 static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2360 {
2361 	struct page *page = e->pages;
2362 	unsigned len = e->size;
2363 	/* hint all but last page with MSG_MORE */
2364 	page_chain_for_each(page) {
2365 		unsigned l = min_t(unsigned, len, PAGE_SIZE);
2366 		if (!_drbd_send_page(mdev, page, 0, l,
2367 				page_chain_next(page) ? MSG_MORE : 0))
2368 			return 0;
2369 		len -= l;
2370 	}
2371 	return 1;
2372 }
2373 
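/* While we are SyncSource and the peer understands delay probes (apv >= 93),
 * send a fresh pair of probes whenever send_cnt has advanced by more than
 * twice sync_conf.dp_volume since the last probe. */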
2374 static void consider_delay_probes(struct drbd_conf *mdev)
2375 {
2376 	if (mdev->state.conn != C_SYNC_SOURCE || mdev->agreed_pro_version < 93)
2377 		return;
2378 
2379 	if (mdev->dp_volume_last + mdev->sync_conf.dp_volume * 2 < mdev->send_cnt)
2380 		drbd_send_delay_probes(mdev);
2381 }
2382 
2383 static int w_delay_probes(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
2384 {
2385 	if (!cancel && mdev->state.conn == C_SYNC_SOURCE)
2386 		drbd_send_delay_probes(mdev);
2387 
2388 	return 1;
2389 }
2390 
2391 static void delay_probe_timer_fn(unsigned long data)
2392 {
2393 	struct drbd_conf *mdev = (struct drbd_conf *) data;
2394 
2395 	if (list_empty(&mdev->delay_probe_work.list))
2396 		drbd_queue_work(&mdev->data.work, &mdev->delay_probe_work);
2397 }
2398 
2399 /* Used to send write requests
2400  * R_PRIMARY -> Peer	(P_DATA)
2401  */
2402 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2403 {
2404 	int ok = 1;
2405 	struct p_data p;
2406 	unsigned int dp_flags = 0;
2407 	void *dgb;
2408 	int dgs;
2409 
2410 	if (!drbd_get_data_sock(mdev))
2411 		return 0;
2412 
2413 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2414 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2415 
2416 	p.head.magic   = BE_DRBD_MAGIC;
2417 	p.head.command = cpu_to_be16(P_DATA);
2418 	p.head.length  =
2419 		cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + req->size);
2420 
2421 	p.sector   = cpu_to_be64(req->sector);
2422 	p.block_id = (unsigned long)req;
2423 	p.seq_num  = cpu_to_be32(req->seq_num =
2424 				 atomic_add_return(1, &mdev->packet_seq));
2425 	dp_flags = 0;
2426 
2427 	/* NOTE: no need to check if barriers supported here as we would
2428 	 *       not pass the test in make_request_common in that case
2429 	 */
2430 	if (bio_rw_flagged(req->master_bio, BIO_RW_BARRIER)) {
2431 		dev_err(DEV, "ASSERT FAILED would have set DP_HARDBARRIER\n");
2432 		/* dp_flags |= DP_HARDBARRIER; */
2433 	}
2434 	if (bio_rw_flagged(req->master_bio, BIO_RW_SYNCIO))
2435 		dp_flags |= DP_RW_SYNC;
2436 	/* for now handle SYNCIO and UNPLUG
2437 	 * as if they still were one and the same flag */
2438 	if (bio_rw_flagged(req->master_bio, BIO_RW_UNPLUG))
2439 		dp_flags |= DP_RW_SYNC;
2440 	if (mdev->state.conn >= C_SYNC_SOURCE &&
2441 	    mdev->state.conn <= C_PAUSED_SYNC_T)
2442 		dp_flags |= DP_MAY_SET_IN_SYNC;
2443 
2444 	p.dp_flags = cpu_to_be32(dp_flags);
2445 	set_bit(UNPLUG_REMOTE, &mdev->flags);
2446 	ok = (sizeof(p) ==
2447 		drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
2448 	if (ok && dgs) {
2449 		dgb = mdev->int_dig_out;
2450 		drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2451 		ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2452 	}
2453 	if (ok) {
2454 		if (mdev->net_conf->wire_protocol == DRBD_PROT_A)
2455 			ok = _drbd_send_bio(mdev, req->master_bio);
2456 		else
2457 			ok = _drbd_send_zc_bio(mdev, req->master_bio);
2458 	}
2459 
2460 	drbd_put_data_sock(mdev);
2461 
2462 	if (ok)
2463 		consider_delay_probes(mdev);
2464 
2465 	return ok;
2466 }
2467 
2468 /* answer packet, used to send data back for read requests:
2469  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2470  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2471  */
2472 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2473 		    struct drbd_epoch_entry *e)
2474 {
2475 	int ok;
2476 	struct p_data p;
2477 	void *dgb;
2478 	int dgs;
2479 
2480 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2481 		crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2482 
2483 	p.head.magic   = BE_DRBD_MAGIC;
2484 	p.head.command = cpu_to_be16(cmd);
2485 	p.head.length  =
2486 		cpu_to_be16(sizeof(p) - sizeof(struct p_header) + dgs + e->size);
2487 
2488 	p.sector   = cpu_to_be64(e->sector);
2489 	p.block_id = e->block_id;
2490 	/* p.seq_num  = 0;    No sequence numbers here.. */
2491 
2492 	/* Only called by our kernel thread.
2493 	 * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2494 	 * in response to admin command or module unload.
2495 	 */
2496 	if (!drbd_get_data_sock(mdev))
2497 		return 0;
2498 
2499 	ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p,
2500 					sizeof(p), dgs ? MSG_MORE : 0);
2501 	if (ok && dgs) {
2502 		dgb = mdev->int_dig_out;
2503 		drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
2504 		ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2505 	}
2506 	if (ok)
2507 		ok = _drbd_send_zc_ee(mdev, e);
2508 
2509 	drbd_put_data_sock(mdev);
2510 
2511 	if (ok)
2512 		consider_delay_probes(mdev);
2513 
2514 	return ok;
2515 }
2516 
2517 /*
2518   drbd_send distinguishes two cases:
2519 
2520   Packets sent via the data socket "sock"
2521   and packets sent via the meta data socket "msock"
2522 
2523 		    sock                      msock
2524   -----------------+-------------------------+------------------------------
2525   timeout           conf.timeout / 2          conf.timeout / 2
2526   timeout action    send a ping via msock     Abort communication
2527 					      and close all sockets
2528 */
2529 
2530 /*
2531  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
2532  */
2533 int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2534 	      void *buf, size_t size, unsigned msg_flags)
2535 {
2536 	struct kvec iov;
2537 	struct msghdr msg;
2538 	int rv, sent = 0;
2539 
2540 	if (!sock)
2541 		return -1000;
2542 
2543 	/* THINK  if (signal_pending) return ... ? */
2544 
2545 	iov.iov_base = buf;
2546 	iov.iov_len  = size;
2547 
2548 	msg.msg_name       = NULL;
2549 	msg.msg_namelen    = 0;
2550 	msg.msg_control    = NULL;
2551 	msg.msg_controllen = 0;
2552 	msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
2553 
2554 	if (sock == mdev->data.socket) {
2555 		mdev->ko_count = mdev->net_conf->ko_count;
2556 		drbd_update_congested(mdev);
2557 	}
2558 	do {
2559 		/* STRANGE
2560 		 * tcp_sendmsg does _not_ use its size parameter at all ?
2561 		 *
2562 		 * -EAGAIN on timeout, -EINTR on signal.
2563 		 */
2564 /* THINK
2565  * do we need to block DRBD_SIG if sock == &meta.socket ??
2566  * otherwise wake_asender() might interrupt some send_*Ack !
2567  */
2568 		rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
2569 		if (rv == -EAGAIN) {
2570 			if (we_should_drop_the_connection(mdev, sock))
2571 				break;
2572 			else
2573 				continue;
2574 		}
2575 		D_ASSERT(rv != 0);
2576 		if (rv == -EINTR) {
2577 			flush_signals(current);
2578 			rv = 0;
2579 		}
2580 		if (rv < 0)
2581 			break;
2582 		sent += rv;
2583 		iov.iov_base += rv;
2584 		iov.iov_len  -= rv;
2585 	} while (sent < size);
2586 
2587 	if (sock == mdev->data.socket)
2588 		clear_bit(NET_CONGESTED, &mdev->flags);
2589 
2590 	if (rv <= 0) {
2591 		if (rv != -EAGAIN) {
2592 			dev_err(DEV, "%s_sendmsg returned %d\n",
2593 			    sock == mdev->meta.socket ? "msock" : "sock",
2594 			    rv);
2595 			drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
2596 		} else
2597 			drbd_force_state(mdev, NS(conn, C_TIMEOUT));
2598 	}
2599 
2600 	return sent;
2601 }
2602 
2603 static int drbd_open(struct block_device *bdev, fmode_t mode)
2604 {
2605 	struct drbd_conf *mdev = bdev->bd_disk->private_data;
2606 	unsigned long flags;
2607 	int rv = 0;
2608 
2609 	spin_lock_irqsave(&mdev->req_lock, flags);
2610 	/* to have a stable mdev->state.role
2611 	 * and no race with updating open_cnt */
2612 
2613 	if (mdev->state.role != R_PRIMARY) {
2614 		if (mode & FMODE_WRITE)
2615 			rv = -EROFS;
2616 		else if (!allow_oos)
2617 			rv = -EMEDIUMTYPE;
2618 	}
2619 
2620 	if (!rv)
2621 		mdev->open_cnt++;
2622 	spin_unlock_irqrestore(&mdev->req_lock, flags);
2623 
2624 	return rv;
2625 }
2626 
2627 static int drbd_release(struct gendisk *gd, fmode_t mode)
2628 {
2629 	struct drbd_conf *mdev = gd->private_data;
2630 	mdev->open_cnt--;
2631 	return 0;
2632 }
2633 
2634 static void drbd_unplug_fn(struct request_queue *q)
2635 {
2636 	struct drbd_conf *mdev = q->queuedata;
2637 
2638 	/* unplug FIRST */
2639 	spin_lock_irq(q->queue_lock);
2640 	blk_remove_plug(q);
2641 	spin_unlock_irq(q->queue_lock);
2642 
2643 	/* only if connected */
2644 	spin_lock_irq(&mdev->req_lock);
2645 	if (mdev->state.pdsk >= D_INCONSISTENT && mdev->state.conn >= C_CONNECTED) {
2646 		D_ASSERT(mdev->state.role == R_PRIMARY);
2647 		if (test_and_clear_bit(UNPLUG_REMOTE, &mdev->flags)) {
2648 			/* add to the data.work queue,
2649 			 * unless already queued.
2650 			 * XXX this might be a good addition to drbd_queue_work
2651 			 * anyways, to detect "double queuing" ... */
2652 			if (list_empty(&mdev->unplug_work.list))
2653 				drbd_queue_work(&mdev->data.work,
2654 						&mdev->unplug_work);
2655 		}
2656 	}
2657 	spin_unlock_irq(&mdev->req_lock);
2658 
2659 	if (mdev->state.disk >= D_INCONSISTENT)
2660 		drbd_kick_lo(mdev);
2661 }
2662 
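/* Reset syncer configuration and device state to the compiled-in defaults:
 * Secondary role, peer unknown, standalone, diskless, peer disk unknown. */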
2663 static void drbd_set_defaults(struct drbd_conf *mdev)
2664 {
2665 	mdev->sync_conf.after      = DRBD_AFTER_DEF;
2666 	mdev->sync_conf.rate       = DRBD_RATE_DEF;
2667 	mdev->sync_conf.al_extents = DRBD_AL_EXTENTS_DEF;
2668 	mdev->state = (union drbd_state) {
2669 		{ .role = R_SECONDARY,
2670 		  .peer = R_UNKNOWN,
2671 		  .conn = C_STANDALONE,
2672 		  .disk = D_DISKLESS,
2673 		  .pdsk = D_UNKNOWN,
2674 		  .susp = 0
2675 		} };
2676 }
2677 
2678 void drbd_init_set_defaults(struct drbd_conf *mdev)
2679 {
2680 	/* the memset(,0,) did most of this.
2681 	 * note: only assignments, no allocation in here */
2682 
2683 	drbd_set_defaults(mdev);
2684 
2685 	/* for now, we do NOT yet support it,
2686 	 * even though we start some framework
2687 	 * to eventually support barriers */
2688 	set_bit(NO_BARRIER_SUPP, &mdev->flags);
2689 
2690 	atomic_set(&mdev->ap_bio_cnt, 0);
2691 	atomic_set(&mdev->ap_pending_cnt, 0);
2692 	atomic_set(&mdev->rs_pending_cnt, 0);
2693 	atomic_set(&mdev->unacked_cnt, 0);
2694 	atomic_set(&mdev->local_cnt, 0);
2695 	atomic_set(&mdev->net_cnt, 0);
2696 	atomic_set(&mdev->packet_seq, 0);
2697 	atomic_set(&mdev->pp_in_use, 0);
2698 
2699 	mutex_init(&mdev->md_io_mutex);
2700 	mutex_init(&mdev->data.mutex);
2701 	mutex_init(&mdev->meta.mutex);
2702 	sema_init(&mdev->data.work.s, 0);
2703 	sema_init(&mdev->meta.work.s, 0);
2704 	mutex_init(&mdev->state_mutex);
2705 
2706 	spin_lock_init(&mdev->data.work.q_lock);
2707 	spin_lock_init(&mdev->meta.work.q_lock);
2708 
2709 	spin_lock_init(&mdev->al_lock);
2710 	spin_lock_init(&mdev->req_lock);
2711 	spin_lock_init(&mdev->peer_seq_lock);
2712 	spin_lock_init(&mdev->epoch_lock);
2713 
2714 	INIT_LIST_HEAD(&mdev->active_ee);
2715 	INIT_LIST_HEAD(&mdev->sync_ee);
2716 	INIT_LIST_HEAD(&mdev->done_ee);
2717 	INIT_LIST_HEAD(&mdev->read_ee);
2718 	INIT_LIST_HEAD(&mdev->net_ee);
2719 	INIT_LIST_HEAD(&mdev->resync_reads);
2720 	INIT_LIST_HEAD(&mdev->data.work.q);
2721 	INIT_LIST_HEAD(&mdev->meta.work.q);
2722 	INIT_LIST_HEAD(&mdev->resync_work.list);
2723 	INIT_LIST_HEAD(&mdev->unplug_work.list);
2724 	INIT_LIST_HEAD(&mdev->md_sync_work.list);
2725 	INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2726 	INIT_LIST_HEAD(&mdev->delay_probes);
2727 	INIT_LIST_HEAD(&mdev->delay_probe_work.list);
2728 
2729 	mdev->resync_work.cb  = w_resync_inactive;
2730 	mdev->unplug_work.cb  = w_send_write_hint;
2731 	mdev->md_sync_work.cb = w_md_sync;
2732 	mdev->bm_io_work.w.cb = w_bitmap_io;
2733 	mdev->delay_probe_work.cb = w_delay_probes;
2734 	init_timer(&mdev->resync_timer);
2735 	init_timer(&mdev->md_sync_timer);
2736 	init_timer(&mdev->delay_probe_timer);
2737 	mdev->resync_timer.function = resync_timer_fn;
2738 	mdev->resync_timer.data = (unsigned long) mdev;
2739 	mdev->md_sync_timer.function = md_sync_timer_fn;
2740 	mdev->md_sync_timer.data = (unsigned long) mdev;
2741 	mdev->delay_probe_timer.function = delay_probe_timer_fn;
2742 	mdev->delay_probe_timer.data = (unsigned long) mdev;
2743 
2744 
2745 	init_waitqueue_head(&mdev->misc_wait);
2746 	init_waitqueue_head(&mdev->state_wait);
2747 	init_waitqueue_head(&mdev->ee_wait);
2748 	init_waitqueue_head(&mdev->al_wait);
2749 	init_waitqueue_head(&mdev->seq_wait);
2750 
2751 	drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
2752 	drbd_thread_init(mdev, &mdev->worker, drbd_worker);
2753 	drbd_thread_init(mdev, &mdev->asender, drbd_asender);
2754 
2755 	mdev->agreed_pro_version = PRO_VERSION_MAX;
2756 	mdev->write_ordering = WO_bio_barrier;
2757 	mdev->resync_wenr = LC_FREE;
2758 }
2759 
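/* Reset the per-configuration counters and release bitmap, crypto, socket and
 * backing device resources of a device that is being de-configured; the
 * asserts below assume that no other thread is still using this device. */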
2760 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2761 {
2762 	if (mdev->receiver.t_state != None)
2763 		dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2764 				mdev->receiver.t_state);
2765 
2766 	/* no need to lock it, I'm the only thread alive */
2767 	if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2768 		dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2769 	mdev->al_writ_cnt  =
2770 	mdev->bm_writ_cnt  =
2771 	mdev->read_cnt     =
2772 	mdev->recv_cnt     =
2773 	mdev->send_cnt     =
2774 	mdev->writ_cnt     =
2775 	mdev->p_size       =
2776 	mdev->rs_start     =
2777 	mdev->rs_total     =
2778 	mdev->rs_failed    =
2779 	mdev->rs_mark_left =
2780 	mdev->rs_mark_time = 0;
2781 	D_ASSERT(mdev->net_conf == NULL);
2782 
2783 	drbd_set_my_capacity(mdev, 0);
2784 	if (mdev->bitmap) {
2785 		/* maybe never allocated. */
2786 		drbd_bm_resize(mdev, 0, 1);
2787 		drbd_bm_cleanup(mdev);
2788 	}
2789 
2790 	drbd_free_resources(mdev);
2791 
2792 	/*
2793 	 * currently we call drbd_init_ee only on module load, so
2794 	 * we may call drbd_release_ee only on module unload!
2795 	 */
2796 	D_ASSERT(list_empty(&mdev->active_ee));
2797 	D_ASSERT(list_empty(&mdev->sync_ee));
2798 	D_ASSERT(list_empty(&mdev->done_ee));
2799 	D_ASSERT(list_empty(&mdev->read_ee));
2800 	D_ASSERT(list_empty(&mdev->net_ee));
2801 	D_ASSERT(list_empty(&mdev->resync_reads));
2802 	D_ASSERT(list_empty(&mdev->data.work.q));
2803 	D_ASSERT(list_empty(&mdev->meta.work.q));
2804 	D_ASSERT(list_empty(&mdev->resync_work.list));
2805 	D_ASSERT(list_empty(&mdev->unplug_work.list));
2806 
2807 }
2808 
2809 
2810 static void drbd_destroy_mempools(void)
2811 {
2812 	struct page *page;
2813 
2814 	while (drbd_pp_pool) {
2815 		page = drbd_pp_pool;
2816 		drbd_pp_pool = (struct page *)page_private(page);
2817 		__free_page(page);
2818 		drbd_pp_vacant--;
2819 	}
2820 
2821 	/* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2822 
2823 	if (drbd_ee_mempool)
2824 		mempool_destroy(drbd_ee_mempool);
2825 	if (drbd_request_mempool)
2826 		mempool_destroy(drbd_request_mempool);
2827 	if (drbd_ee_cache)
2828 		kmem_cache_destroy(drbd_ee_cache);
2829 	if (drbd_request_cache)
2830 		kmem_cache_destroy(drbd_request_cache);
2831 	if (drbd_bm_ext_cache)
2832 		kmem_cache_destroy(drbd_bm_ext_cache);
2833 	if (drbd_al_ext_cache)
2834 		kmem_cache_destroy(drbd_al_ext_cache);
2835 
2836 	drbd_ee_mempool      = NULL;
2837 	drbd_request_mempool = NULL;
2838 	drbd_ee_cache        = NULL;
2839 	drbd_request_cache   = NULL;
2840 	drbd_bm_ext_cache    = NULL;
2841 	drbd_al_ext_cache    = NULL;
2842 
2843 	return;
2844 }
2845 
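/* Allocate the slab caches, mempools and the private page pool, sized at
 * (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE) pages per possible device; on failure,
 * everything allocated so far is torn down via drbd_destroy_mempools(). */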
2846 static int drbd_create_mempools(void)
2847 {
2848 	struct page *page;
2849 	const int number = (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE) * minor_count;
2850 	int i;
2851 
2852 	/* prepare our caches and mempools */
2853 	drbd_request_mempool = NULL;
2854 	drbd_ee_cache        = NULL;
2855 	drbd_request_cache   = NULL;
2856 	drbd_bm_ext_cache    = NULL;
2857 	drbd_al_ext_cache    = NULL;
2858 	drbd_pp_pool         = NULL;
2859 
2860 	/* caches */
2861 	drbd_request_cache = kmem_cache_create(
2862 		"drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2863 	if (drbd_request_cache == NULL)
2864 		goto Enomem;
2865 
2866 	drbd_ee_cache = kmem_cache_create(
2867 		"drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
2868 	if (drbd_ee_cache == NULL)
2869 		goto Enomem;
2870 
2871 	drbd_bm_ext_cache = kmem_cache_create(
2872 		"drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2873 	if (drbd_bm_ext_cache == NULL)
2874 		goto Enomem;
2875 
2876 	drbd_al_ext_cache = kmem_cache_create(
2877 		"drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2878 	if (drbd_al_ext_cache == NULL)
2879 		goto Enomem;
2880 
2881 	/* mempools */
2882 	drbd_request_mempool = mempool_create(number,
2883 		mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2884 	if (drbd_request_mempool == NULL)
2885 		goto Enomem;
2886 
2887 	drbd_ee_mempool = mempool_create(number,
2888 		mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2889 	if (drbd_ee_mempool == NULL)
2890 		goto Enomem;
2891 
2892 	/* drbd's page pool */
2893 	spin_lock_init(&drbd_pp_lock);
2894 
2895 	for (i = 0; i < number; i++) {
2896 		page = alloc_page(GFP_HIGHUSER);
2897 		if (!page)
2898 			goto Enomem;
2899 		set_page_private(page, (unsigned long)drbd_pp_pool);
2900 		drbd_pp_pool = page;
2901 	}
2902 	drbd_pp_vacant = number;
2903 
2904 	return 0;
2905 
2906 Enomem:
2907 	drbd_destroy_mempools(); /* in case we allocated some */
2908 	return -ENOMEM;
2909 }
2910 
2911 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2912 	void *unused)
2913 {
2914 	/* just so we have it.  you never know what interesting things we
2915 	 * might want to do here some day...
2916 	 */
2917 
2918 	return NOTIFY_DONE;
2919 }
2920 
2921 static struct notifier_block drbd_notifier = {
2922 	.notifier_call = drbd_notify_sys,
2923 };
2924 
2925 static void drbd_release_ee_lists(struct drbd_conf *mdev)
2926 {
2927 	int rr;
2928 
2929 	rr = drbd_release_ee(mdev, &mdev->active_ee);
2930 	if (rr)
2931 		dev_err(DEV, "%d EEs in active list found!\n", rr);
2932 
2933 	rr = drbd_release_ee(mdev, &mdev->sync_ee);
2934 	if (rr)
2935 		dev_err(DEV, "%d EEs in sync list found!\n", rr);
2936 
2937 	rr = drbd_release_ee(mdev, &mdev->read_ee);
2938 	if (rr)
2939 		dev_err(DEV, "%d EEs in read list found!\n", rr);
2940 
2941 	rr = drbd_release_ee(mdev, &mdev->done_ee);
2942 	if (rr)
2943 		dev_err(DEV, "%d EEs in done list found!\n", rr);
2944 
2945 	rr = drbd_release_ee(mdev, &mdev->net_ee);
2946 	if (rr)
2947 		dev_err(DEV, "%d EEs in net list found!\n", rr);
2948 }
2949 
2950 /* caution. no locking.
2951  * currently only used from module cleanup code. */
2952 static void drbd_delete_device(unsigned int minor)
2953 {
2954 	struct drbd_conf *mdev = minor_to_mdev(minor);
2955 
2956 	if (!mdev)
2957 		return;
2958 
2959 	/* paranoia asserts */
2960 	if (mdev->open_cnt != 0)
2961 		dev_err(DEV, "open_cnt = %d in %s:%u", mdev->open_cnt,
2962 				__FILE__ , __LINE__);
2963 
2964 	ERR_IF (!list_empty(&mdev->data.work.q)) {
2965 		struct list_head *lp;
2966 		list_for_each(lp, &mdev->data.work.q) {
2967 			dev_err(DEV, "lp = %p\n", lp);
2968 		}
2969 	};
2970 	/* end paranoia asserts */
2971 
2972 	del_gendisk(mdev->vdisk);
2973 
2974 	/* cleanup stuff that may have been allocated during
2975 	 * device (re-)configuration or state changes */
2976 
2977 	if (mdev->this_bdev)
2978 		bdput(mdev->this_bdev);
2979 
2980 	drbd_free_resources(mdev);
2981 
2982 	drbd_release_ee_lists(mdev);
2983 
2984 	/* should be free'd on disconnect? */
2985 	kfree(mdev->ee_hash);
2986 	/*
2987 	mdev->ee_hash_s = 0;
2988 	mdev->ee_hash = NULL;
2989 	*/
2990 
2991 	lc_destroy(mdev->act_log);
2992 	lc_destroy(mdev->resync);
2993 
2994 	kfree(mdev->p_uuid);
2995 	/* mdev->p_uuid = NULL; */
2996 
2997 	kfree(mdev->int_dig_out);
2998 	kfree(mdev->int_dig_in);
2999 	kfree(mdev->int_dig_vv);
3000 
3001 	/* cleanup the rest that has been
3002 	 * allocated from drbd_new_device
3003 	 * and actually free the mdev itself */
3004 	drbd_free_mdev(mdev);
3005 }
3006 
3007 static void drbd_cleanup(void)
3008 {
3009 	unsigned int i;
3010 
3011 	unregister_reboot_notifier(&drbd_notifier);
3012 
3013 	drbd_nl_cleanup();
3014 
3015 	if (minor_table) {
3016 		if (drbd_proc)
3017 			remove_proc_entry("drbd", NULL);
3018 		i = minor_count;
3019 		while (i--)
3020 			drbd_delete_device(i);
3021 		drbd_destroy_mempools();
3022 	}
3023 
3024 	kfree(minor_table);
3025 
3026 	unregister_blkdev(DRBD_MAJOR, "drbd");
3027 
3028 	printk(KERN_INFO "drbd: module cleanup done.\n");
3029 }
3030 
3031 /**
3032  * drbd_congested() - Callback for pdflush
3033  * @congested_data:	User data
3034  * @bdi_bits:		Bits pdflush is currently interested in
3035  *
3036  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
3037  */
3038 static int drbd_congested(void *congested_data, int bdi_bits)
3039 {
3040 	struct drbd_conf *mdev = congested_data;
3041 	struct request_queue *q;
3042 	char reason = '-';
3043 	int r = 0;
3044 
3045 	if (!__inc_ap_bio_cond(mdev)) {
3046 		/* DRBD has frozen IO */
3047 		r = bdi_bits;
3048 		reason = 'd';
3049 		goto out;
3050 	}
3051 
3052 	if (get_ldev(mdev)) {
3053 		q = bdev_get_queue(mdev->ldev->backing_bdev);
3054 		r = bdi_congested(&q->backing_dev_info, bdi_bits);
3055 		put_ldev(mdev);
3056 		if (r)
3057 			reason = 'b';
3058 	}
3059 
3060 	if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
3061 		r |= (1 << BDI_async_congested);
3062 		reason = reason == 'b' ? 'a' : 'n';
3063 	}
3064 
3065 out:
3066 	mdev->congestion_reason = reason;
3067 	return r;
3068 }
3069 
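/* Allocate and initialize one drbd_conf, including its request queue, gendisk,
 * meta data IO page, bitmap, transfer log and initial epoch. Returns NULL and
 * unwinds all partial allocations on failure. */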
3070 struct drbd_conf *drbd_new_device(unsigned int minor)
3071 {
3072 	struct drbd_conf *mdev;
3073 	struct gendisk *disk;
3074 	struct request_queue *q;
3075 
3076 	/* GFP_KERNEL, we are outside of all write-out paths */
3077 	mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
3078 	if (!mdev)
3079 		return NULL;
3080 	if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
3081 		goto out_no_cpumask;
3082 
3083 	mdev->minor = minor;
3084 
3085 	drbd_init_set_defaults(mdev);
3086 
3087 	q = blk_alloc_queue(GFP_KERNEL);
3088 	if (!q)
3089 		goto out_no_q;
3090 	mdev->rq_queue = q;
3091 	q->queuedata   = mdev;
3092 
3093 	disk = alloc_disk(1);
3094 	if (!disk)
3095 		goto out_no_disk;
3096 	mdev->vdisk = disk;
3097 
3098 	set_disk_ro(disk, TRUE);
3099 
3100 	disk->queue = q;
3101 	disk->major = DRBD_MAJOR;
3102 	disk->first_minor = minor;
3103 	disk->fops = &drbd_ops;
3104 	sprintf(disk->disk_name, "drbd%d", minor);
3105 	disk->private_data = mdev;
3106 
3107 	mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3108 	/* we have no partitions. we contain only ourselves. */
3109 	mdev->this_bdev->bd_contains = mdev->this_bdev;
3110 
3111 	q->backing_dev_info.congested_fn = drbd_congested;
3112 	q->backing_dev_info.congested_data = mdev;
3113 
3114 	blk_queue_make_request(q, drbd_make_request_26);
3115 	blk_queue_max_segment_size(q, DRBD_MAX_SEGMENT_SIZE);
3116 	blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3117 	blk_queue_merge_bvec(q, drbd_merge_bvec);
3118 	q->queue_lock = &mdev->req_lock; /* needed since we use */
3119 		/* plugging on a queue that actually has no requests! */
3120 	q->unplug_fn = drbd_unplug_fn;
3121 
3122 	mdev->md_io_page = alloc_page(GFP_KERNEL);
3123 	if (!mdev->md_io_page)
3124 		goto out_no_io_page;
3125 
3126 	if (drbd_bm_init(mdev))
3127 		goto out_no_bitmap;
3128 	/* no need to lock access, we are still initializing this minor device. */
3129 	if (!tl_init(mdev))
3130 		goto out_no_tl;
3131 
3132 	mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3133 	if (!mdev->app_reads_hash)
3134 		goto out_no_app_reads;
3135 
3136 	mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3137 	if (!mdev->current_epoch)
3138 		goto out_no_epoch;
3139 
3140 	INIT_LIST_HEAD(&mdev->current_epoch->list);
3141 	mdev->epochs = 1;
3142 
3143 	return mdev;
3144 
3145 /* out_whatever_else:
3146 	kfree(mdev->current_epoch); */
3147 out_no_epoch:
3148 	kfree(mdev->app_reads_hash);
3149 out_no_app_reads:
3150 	tl_cleanup(mdev);
3151 out_no_tl:
3152 	drbd_bm_cleanup(mdev);
3153 out_no_bitmap:
3154 	__free_page(mdev->md_io_page);
3155 out_no_io_page:
3156 	put_disk(disk);
3157 out_no_disk:
3158 	blk_cleanup_queue(q);
3159 out_no_q:
3160 	free_cpumask_var(mdev->cpu_mask);
3161 out_no_cpumask:
3162 	kfree(mdev);
3163 	return NULL;
3164 }
3165 
3166 /* counterpart of drbd_new_device.
3167  * last part of drbd_delete_device. */
3168 void drbd_free_mdev(struct drbd_conf *mdev)
3169 {
3170 	kfree(mdev->current_epoch);
3171 	kfree(mdev->app_reads_hash);
3172 	tl_cleanup(mdev);
3173 	if (mdev->bitmap) /* should no longer be there. */
3174 		drbd_bm_cleanup(mdev);
3175 	__free_page(mdev->md_io_page);
3176 	put_disk(mdev->vdisk);
3177 	blk_cleanup_queue(mdev->rq_queue);
3178 	free_cpumask_var(mdev->cpu_mask);
3179 	kfree(mdev);
3180 }
3181 
3182 
3183 int __init drbd_init(void)
3184 {
3185 	int err;
3186 
3187 	if (sizeof(struct p_handshake) != 80) {
3188 		printk(KERN_ERR
3189 		       "drbd: never change the size or layout "
3190 		       "of the HandShake packet.\n");
3191 		return -EINVAL;
3192 	}
3193 
3194 	if (1 > minor_count || minor_count > 255) {
3195 		printk(KERN_ERR
3196 			"drbd: invalid minor_count (%d)\n", minor_count);
3197 #ifdef MODULE
3198 		return -EINVAL;
3199 #else
3200 		minor_count = 8;
3201 #endif
3202 	}
3203 
3204 	err = drbd_nl_init();
3205 	if (err)
3206 		return err;
3207 
3208 	err = register_blkdev(DRBD_MAJOR, "drbd");
3209 	if (err) {
3210 		printk(KERN_ERR
3211 		       "drbd: unable to register block device major %d\n",
3212 		       DRBD_MAJOR);
3213 		return err;
3214 	}
3215 
3216 	register_reboot_notifier(&drbd_notifier);
3217 
3218 	/*
3219 	 * allocate all necessary structs
3220 	 */
3221 	err = -ENOMEM;
3222 
3223 	init_waitqueue_head(&drbd_pp_wait);
3224 
3225 	drbd_proc = NULL; /* play safe for drbd_cleanup */
3226 	minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3227 				GFP_KERNEL);
3228 	if (!minor_table)
3229 		goto Enomem;
3230 
3231 	err = drbd_create_mempools();
3232 	if (err)
3233 		goto Enomem;
3234 
3235 	drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
3236 	if (!drbd_proc)	{
3237 		printk(KERN_ERR "drbd: unable to register proc file\n");
3238 		goto Enomem;
3239 	}
3240 
3241 	rwlock_init(&global_state_lock);
3242 
3243 	printk(KERN_INFO "drbd: initialized. "
3244 	       "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3245 	       API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3246 	printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3247 	printk(KERN_INFO "drbd: registered as block device major %d\n",
3248 		DRBD_MAJOR);
3249 	printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3250 
3251 	return 0; /* Success! */
3252 
3253 Enomem:
3254 	drbd_cleanup();
3255 	if (err == -ENOMEM)
3256 		/* currently always the case */
3257 		printk(KERN_ERR "drbd: ran out of memory\n");
3258 	else
3259 		printk(KERN_ERR "drbd: initialization failure\n");
3260 	return err;
3261 }
3262 
3263 void drbd_free_bc(struct drbd_backing_dev *ldev)
3264 {
3265 	if (ldev == NULL)
3266 		return;
3267 
3268 	bd_release(ldev->backing_bdev);
3269 	bd_release(ldev->md_bdev);
3270 
3271 	fput(ldev->lo_file);
3272 	fput(ldev->md_file);
3273 
3274 	kfree(ldev);
3275 }
3276 
3277 void drbd_free_sock(struct drbd_conf *mdev)
3278 {
3279 	if (mdev->data.socket) {
3280 		mutex_lock(&mdev->data.mutex);
3281 		kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3282 		sock_release(mdev->data.socket);
3283 		mdev->data.socket = NULL;
3284 		mutex_unlock(&mdev->data.mutex);
3285 	}
3286 	if (mdev->meta.socket) {
3287 		mutex_lock(&mdev->meta.mutex);
3288 		kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3289 		sock_release(mdev->meta.socket);
3290 		mdev->meta.socket = NULL;
3291 		mutex_unlock(&mdev->meta.mutex);
3292 	}
3293 }
3294 
3295 
3296 void drbd_free_resources(struct drbd_conf *mdev)
3297 {
3298 	crypto_free_hash(mdev->csums_tfm);
3299 	mdev->csums_tfm = NULL;
3300 	crypto_free_hash(mdev->verify_tfm);
3301 	mdev->verify_tfm = NULL;
3302 	crypto_free_hash(mdev->cram_hmac_tfm);
3303 	mdev->cram_hmac_tfm = NULL;
3304 	crypto_free_hash(mdev->integrity_w_tfm);
3305 	mdev->integrity_w_tfm = NULL;
3306 	crypto_free_hash(mdev->integrity_r_tfm);
3307 	mdev->integrity_r_tfm = NULL;
3308 
3309 	drbd_free_sock(mdev);
3310 
3311 	__no_warn(local,
3312 		  drbd_free_bc(mdev->ldev);
3313 		  mdev->ldev = NULL;);
3314 }
3315 
3316 /* meta data management */
3317 
3318 struct meta_data_on_disk {
3319 	u64 la_size;           /* last agreed size. */
3320 	u64 uuid[UI_SIZE];   /* UUIDs. */
3321 	u64 device_uuid;
3322 	u64 reserved_u64_1;
3323 	u32 flags;             /* MDF */
3324 	u32 magic;
3325 	u32 md_size_sect;
3326 	u32 al_offset;         /* offset to this block */
3327 	u32 al_nr_extents;     /* important for restoring the AL */
3328 	      /* `-- act_log->nr_elements <-- sync_conf.al_extents */
3329 	u32 bm_offset;         /* offset to the bitmap, from here */
3330 	u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
3331 	u32 reserved_u32[4];
3332 
3333 } __packed;
3334 
3335 /**
3336  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3337  * @mdev:	DRBD device.
3338  */
3339 void drbd_md_sync(struct drbd_conf *mdev)
3340 {
3341 	struct meta_data_on_disk *buffer;
3342 	sector_t sector;
3343 	int i;
3344 
3345 	if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3346 		return;
3347 	del_timer(&mdev->md_sync_timer);
3348 
3349 	/* We use D_FAILED here and not D_ATTACHING because we try to write
3350 	 * metadata even if we detach due to a disk failure! */
3351 	if (!get_ldev_if_state(mdev, D_FAILED))
3352 		return;
3353 
3354 	mutex_lock(&mdev->md_io_mutex);
3355 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3356 	memset(buffer, 0, 512);
3357 
3358 	buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3359 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3360 		buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3361 	buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3362 	buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3363 
3364 	buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3365 	buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3366 	buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3367 	buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3368 	buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3369 
3370 	buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3371 
3372 	D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3373 	sector = mdev->ldev->md.md_offset;
3374 
3375 	if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3376 		clear_bit(MD_DIRTY, &mdev->flags);
3377 	} else {
3378 		/* this was a try anyways ... */
3379 		dev_err(DEV, "meta data update failed!\n");
3380 
3381 		drbd_chk_io_error(mdev, 1, TRUE);
3382 	}
3383 
3384 	/* Update mdev->ldev->md.la_size_sect,
3385 	 * since we updated it on metadata. */
3386 	mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3387 
3388 	mutex_unlock(&mdev->md_io_mutex);
3389 	put_ldev(mdev);
3390 }
3391 
3392 /**
3393  * drbd_md_read() - Reads in the meta data super block
3394  * @mdev:	DRBD device.
3395  * @bdev:	Device from which the meta data should be read in.
3396  *
3397  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_codes in case
3398  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3399  */
3400 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3401 {
3402 	struct meta_data_on_disk *buffer;
3403 	int i, rv = NO_ERROR;
3404 
3405 	if (!get_ldev_if_state(mdev, D_ATTACHING))
3406 		return ERR_IO_MD_DISK;
3407 
3408 	mutex_lock(&mdev->md_io_mutex);
3409 	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3410 
3411 	if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3412 		/* NOTE: can't do normal error processing here as this is
3413 		   called BEFORE the disk is attached */
3414 		dev_err(DEV, "Error while reading metadata.\n");
3415 		rv = ERR_IO_MD_DISK;
3416 		goto err;
3417 	}
3418 
3419 	if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3420 		dev_err(DEV, "Error while reading metadata, magic not found.\n");
3421 		rv = ERR_MD_INVALID;
3422 		goto err;
3423 	}
3424 	if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3425 		dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3426 		    be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3427 		rv = ERR_MD_INVALID;
3428 		goto err;
3429 	}
3430 	if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3431 		dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3432 		    be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3433 		rv = ERR_MD_INVALID;
3434 		goto err;
3435 	}
3436 	if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3437 		dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3438 		    be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3439 		rv = ERR_MD_INVALID;
3440 		goto err;
3441 	}
3442 
3443 	if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3444 		dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3445 		    be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3446 		rv = ERR_MD_INVALID;
3447 		goto err;
3448 	}
3449 
3450 	bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3451 	for (i = UI_CURRENT; i < UI_SIZE; i++)
3452 		bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3453 	bdev->md.flags = be32_to_cpu(buffer->flags);
3454 	mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3455 	bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3456 
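	/* guard against an implausibly small on-disk al_nr_extents */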
3457 	if (mdev->sync_conf.al_extents < 7)
3458 		mdev->sync_conf.al_extents = 127;
3459 
3460  err:
3461 	mutex_unlock(&mdev->md_io_mutex);
3462 	put_ldev(mdev);
3463 
3464 	return rv;
3465 }
3466 
3467 /**
3468  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3469  * @mdev:	DRBD device.
3470  *
3471  * Call this function if you change anything that should be written to
3472  * the meta-data super block. This function sets MD_DIRTY, and starts a
3473  * timer that ensures that within five seconds you have to call drbd_md_sync().
3474  */
3475 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3476 {
3477 	set_bit(MD_DIRTY, &mdev->flags);
3478 	mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3479 }
3480 
3481 
3482 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3483 {
3484 	int i;
3485 
3486 	for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3487 		mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3488 }
3489 
3490 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3491 {
3492 	if (idx == UI_CURRENT) {
3493 		if (mdev->state.role == R_PRIMARY)
3494 			val |= 1;
3495 		else
3496 			val &= ~((u64)1);
3497 
3498 		drbd_set_ed_uuid(mdev, val);
3499 	}
3500 
3501 	mdev->ldev->md.uuid[idx] = val;
3502 	drbd_md_mark_dirty(mdev);
3503 }
3504 
3505 
3506 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3507 {
3508 	if (mdev->ldev->md.uuid[idx]) {
3509 		drbd_uuid_move_history(mdev);
3510 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3511 	}
3512 	_drbd_uuid_set(mdev, idx, val);
3513 }
3514 
3515 /**
3516  * drbd_uuid_new_current() - Creates a new current UUID
3517  * @mdev:	DRBD device.
3518  *
3519  * Creates a new current UUID, and rotates the old current UUID into
3520  * the bitmap slot. Causes an incremental resync upon next connect.
3521  */
3522 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3523 {
3524 	u64 val;
3525 
3526 	dev_info(DEV, "Creating new current UUID\n");
3527 	D_ASSERT(mdev->ldev->md.uuid[UI_BITMAP] == 0);
3528 	mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3529 
3530 	get_random_bytes(&val, sizeof(u64));
3531 	_drbd_uuid_set(mdev, UI_CURRENT, val);
3532 }
3533 
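/* Set the bitmap UUID. A val of 0 rotates the current bitmap UUID into the
 * history slots and clears it; a non-zero val installs it as the new bitmap
 * UUID, with the lowest bit cleared. */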
3534 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3535 {
3536 	if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3537 		return;
3538 
3539 	if (val == 0) {
3540 		drbd_uuid_move_history(mdev);
3541 		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3542 		mdev->ldev->md.uuid[UI_BITMAP] = 0;
3543 	} else {
3544 		if (mdev->ldev->md.uuid[UI_BITMAP])
3545 			dev_warn(DEV, "bm UUID already set");
3546 
3547 		mdev->ldev->md.uuid[UI_BITMAP] = val;
3548 		mdev->ldev->md.uuid[UI_BITMAP] &= ~((u64)1);
3549 
3550 	}
3551 	drbd_md_mark_dirty(mdev);
3552 }
3553 
3554 /**
3555  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3556  * @mdev:	DRBD device.
3557  *
3558  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3559  */
3560 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3561 {
3562 	int rv = -EIO;
3563 
3564 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
3565 		drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3566 		drbd_md_sync(mdev);
3567 		drbd_bm_set_all(mdev);
3568 
3569 		rv = drbd_bm_write(mdev);
3570 
3571 		if (!rv) {
3572 			drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3573 			drbd_md_sync(mdev);
3574 		}
3575 
3576 		put_ldev(mdev);
3577 	}
3578 
3579 	return rv;
3580 }
3581 
3582 /**
3583  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3584  * @mdev:	DRBD device.
3585  *
3586  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3587  */
3588 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3589 {
3590 	int rv = -EIO;
3591 
3592 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
3593 		drbd_bm_clear_all(mdev);
3594 		rv = drbd_bm_write(mdev);
3595 		put_ldev(mdev);
3596 	}
3597 
3598 	return rv;
3599 }
3600 
3601 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3602 {
3603 	struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3604 	int rv;
3605 
3606 	D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3607 
3608 	drbd_bm_lock(mdev, work->why);
3609 	rv = work->io_fn(mdev);
3610 	drbd_bm_unlock(mdev);
3611 
3612 	clear_bit(BITMAP_IO, &mdev->flags);
3613 	wake_up(&mdev->misc_wait);
3614 
3615 	if (work->done)
3616 		work->done(mdev, rv);
3617 
3618 	clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3619 	work->why = NULL;
3620 
3621 	return 1;
3622 }
3623 
3624 /**
3625  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3626  * @mdev:	DRBD device.
3627  * @io_fn:	IO callback to be called when bitmap IO is possible
3628  * @done:	callback to be called after the bitmap IO was performed
3629  * @why:	Descriptive text of the reason for doing the IO
3630  *
3631  * While IO on the bitmap happens, application IO is frozen; this ensures
3632  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
3633  * called from worker context. It MUST NOT be used while a previous such
3634  * work is still pending!
3635  */
3636 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3637 			  int (*io_fn)(struct drbd_conf *),
3638 			  void (*done)(struct drbd_conf *, int),
3639 			  char *why)
3640 {
3641 	D_ASSERT(current == mdev->worker.task);
3642 
3643 	D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3644 	D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3645 	D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3646 	if (mdev->bm_io_work.why)
3647 		dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3648 			why, mdev->bm_io_work.why);
3649 
3650 	mdev->bm_io_work.io_fn = io_fn;
3651 	mdev->bm_io_work.done = done;
3652 	mdev->bm_io_work.why = why;
3653 
3654 	set_bit(BITMAP_IO, &mdev->flags);
3655 	if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3656 		if (list_empty(&mdev->bm_io_work.w.list)) {
3657 			set_bit(BITMAP_IO_QUEUED, &mdev->flags);
3658 			drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
3659 		} else
3660 			dev_err(DEV, "FIXME avoided double queuing bm_io_work\n");
3661 	}
3662 }
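
/*
 * Hypothetical usage from worker context (illustration only; the helper names
 * are made up, only drbd_bmio_set_n_write() above is real): queue a full
 * bitmap write with a completion callback that merely logs the result.
 */
static void example_bitmap_io_done(struct drbd_conf *mdev, int rv)
{
	dev_info(DEV, "example bitmap IO finished: %d\n", rv);
}

static void __maybe_unused example_queue_full_sync(struct drbd_conf *mdev)
{
	/* must run on mdev->worker.task, see the D_ASSERT above */
	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write,
			     &example_bitmap_io_done, "example full sync");
}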
3663 
3664 /**
3665  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3666  * @mdev:	DRBD device.
3667  * @io_fn:	IO callback to be called when bitmap IO is possible
3668  * @why:	Descriptive text of the reason for doing the IO
3669  *
3670  * Freezes application IO while the actual IO operation runs. This
3671  * function MAY NOT be called from worker context.
3672  */
3673 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why)
3674 {
3675 	int rv;
3676 
3677 	D_ASSERT(current != mdev->worker.task);
3678 
3679 	drbd_suspend_io(mdev);
3680 
3681 	drbd_bm_lock(mdev, why);
3682 	rv = io_fn(mdev);
3683 	drbd_bm_unlock(mdev);
3684 
3685 	drbd_resume_io(mdev);
3686 
3687 	return rv;
3688 }
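
/*
 * Hypothetical counterpart for non-worker context (illustration only): the
 * synchronous variant suspends and resumes application IO itself, so a caller
 * outside the worker can simply do:
 */
static int __maybe_unused example_clear_bitmap_sync(struct drbd_conf *mdev)
{
	/* must NOT run on mdev->worker.task, see the D_ASSERT above */
	return drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
			      "example clear bitmap");
}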
3689 
3690 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3691 {
3692 	if ((mdev->ldev->md.flags & flag) != flag) {
3693 		drbd_md_mark_dirty(mdev);
3694 		mdev->ldev->md.flags |= flag;
3695 	}
3696 }
3697 
3698 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3699 {
3700 	if ((mdev->ldev->md.flags & flag) != 0) {
3701 		drbd_md_mark_dirty(mdev);
3702 		mdev->ldev->md.flags &= ~flag;
3703 	}
3704 }
3705 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3706 {
3707 	return (bdev->md.flags & flag) != 0;
3708 }
3709 
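/* Timers fire in softirq context, where the blocking disk IO done by
 * drbd_md_sync() is not allowed; the timer therefore only pushes md_sync_work
 * to the front of the worker queue and lets the worker do the actual write. */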
3710 static void md_sync_timer_fn(unsigned long data)
3711 {
3712 	struct drbd_conf *mdev = (struct drbd_conf *) data;
3713 
3714 	drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
3715 }
3716 
3717 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3718 {
3719 	dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3720 	drbd_md_sync(mdev);
3721 
3722 	return 1;
3723 }
3724 
3725 #ifdef CONFIG_DRBD_FAULT_INJECTION
3726 /* Fault insertion support including random number generator shamelessly
3727  * stolen from kernel/rcutorture.c */
3728 struct fault_random_state {
3729 	unsigned long state;
3730 	unsigned long count;
3731 };
3732 
3733 #define FAULT_RANDOM_MULT	39916801	/* prime */
3734 #define FAULT_RANDOM_ADD	479001701	/* prime */
3735 #define FAULT_RANDOM_REFRESH	10000
3736 
3737 /*
3738  * Crude but fast random-number generator.  Uses a linear congruential
3739  * generator, with occasional help from get_random_bytes().
3740  */
3741 static unsigned long
3742 _drbd_fault_random(struct fault_random_state *rsp)
3743 {
3744 	long refresh;
3745 
3746 	if (!rsp->count--) {
3747 		get_random_bytes(&refresh, sizeof(refresh));
3748 		rsp->state += refresh;
3749 		rsp->count = FAULT_RANDOM_REFRESH;
3750 	}
3751 	rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3752 	return swahw32(rsp->state);
3753 }
3754 
3755 static char *_drbd_fault_str(unsigned int type)
3756 {
3757 	static char *_faults[] = {
3758 		[DRBD_FAULT_MD_WR] = "Meta-data write",
3759 		[DRBD_FAULT_MD_RD] = "Meta-data read",
3760 		[DRBD_FAULT_RS_WR] = "Resync write",
3761 		[DRBD_FAULT_RS_RD] = "Resync read",
3762 		[DRBD_FAULT_DT_WR] = "Data write",
3763 		[DRBD_FAULT_DT_RD] = "Data read",
3764 		[DRBD_FAULT_DT_RA] = "Data read ahead",
3765 		[DRBD_FAULT_BM_ALLOC] = "BM allocation",
3766 		[DRBD_FAULT_AL_EE] = "EE allocation",
3767 		[DRBD_FAULT_RECEIVE] = "receive data corruption",
3768 	};
3769 
3770 	return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3771 }
3772 
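/* Decides whether a fault of the given @type is injected now: fault_devs is a
 * bitmask of minor numbers (0 means "all devices"), and since
 * (random % 100) + 1 is roughly uniform over 1..100, the comparison against
 * fault_rate makes about fault_rate percent of the matching operations fail. */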
3773 unsigned int
3774 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3775 {
3776 	static struct fault_random_state rrs = {0, 0};
3777 
3778 	unsigned int ret = (
3779 		(fault_devs == 0 ||
3780 			((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3781 		(((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3782 
3783 	if (ret) {
3784 		fault_count++;
3785 
3786 		if (__ratelimit(&drbd_ratelimit_state))
3787 			dev_warn(DEV, "***Simulating %s failure\n",
3788 				_drbd_fault_str(type));
3789 	}
3790 
3791 	return ret;
3792 }
3793 #endif
3794 
3795 const char *drbd_buildtag(void)
3796 {
3797 	/* DRBD built from external sources has here a reference to the
3798 	   git hash of the source code. */
3799 
3800 	static char buildtag[38] = "\0uilt-in";
3801 
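	/* The leading NUL in "\0uilt-in" makes the test below true on the first
	 * call; when drbd is built into the kernel (THIS_MODULE is NULL or
	 * CONFIG_MODULES is not set), only that first byte is patched to 'b',
	 * turning the string into "built-in". */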
3802 	if (buildtag[0] == 0) {
3803 #ifdef CONFIG_MODULES
3804 		if (THIS_MODULE != NULL)
3805 			sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3806 		else
3807 #endif
3808 			buildtag[0] = 'b';
3809 	}
3810 
3811 	return buildtag;
3812 }
3813 
3814 module_init(drbd_init)
3815 module_exit(drbd_cleanup)
3816 
3817 EXPORT_SYMBOL(drbd_conn_str);
3818 EXPORT_SYMBOL(drbd_role_str);
3819 EXPORT_SYMBOL(drbd_disk_str);
3820 EXPORT_SYMBOL(drbd_set_st_err_str);
3821