1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24  */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/smp_lock.h>
30 #include <linux/wait.h>
31 #include <linux/mm.h>
32 #include <linux/memcontrol.h>
33 #include <linux/mm_inline.h>
34 #include <linux/slab.h>
35 #include <linux/random.h>
36 #include <linux/string.h>
37 #include <linux/scatterlist.h>
38 
39 #include "drbd_int.h"
40 #include "drbd_req.h"
41 
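/* Interval at which the resync/online-verify request generators re-arm
 * their timer: HZ/10 jiffies, i.e. a new batch of requests every 100 ms. */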
42 #define SLEEP_TIME (HZ/10)
43 
44 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
45 
46 
47 
48 /* defined here:
49    drbd_md_io_complete
50    drbd_endio_write_sec
51    drbd_endio_read_sec
52    drbd_endio_pri
53 
54  * more endio handlers:
55    atodb_endio in drbd_actlog.c
56    drbd_bm_async_io_complete in drbd_bitmap.c
57 
58  * For all these callbacks, note the following:
59  * The callbacks will be called in irq context by the IDE drivers,
60  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
61  * Try to get the locking right :)
62  *
63  */
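/* Because these completion callbacks may run in hard-irq context, the
 * handlers below only ever take mdev->req_lock via spin_lock_irqsave()/
 * spin_unlock_irqrestore(). */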
64 
65 
66 /* About the global_state_lock
67    Each state transition on a device holds a read lock. In case we have
68    to evaluate the sync after dependencies, we grab a write lock, because
69    we need stable states on all devices for that.  */
70 rwlock_t global_state_lock;
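/* Writers in this file: drbd_alter_sa(), resume_next_sg(), suspend_other_sg()
 * and drbd_start_resync() all take write_lock_irq(&global_state_lock) while
 * they evaluate or modify the sync-after dependencies. */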
71 
72 /* used for synchronous meta data and bitmap IO
73  * submitted by drbd_md_sync_page_io()
74  */
75 void drbd_md_io_complete(struct bio *bio, int error)
76 {
77 	struct drbd_md_io *md_io;
78 
79 	md_io = (struct drbd_md_io *)bio->bi_private;
80 	md_io->error = error;
81 
82 	complete(&md_io->event);
83 }
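
/* Typical caller pattern, as a sketch only (see drbd_md_sync_page_io()
 * for the real thing):
 *
 *	struct drbd_md_io md_io;
 *
 *	init_completion(&md_io.event);
 *	bio->bi_private = &md_io;
 *	bio->bi_end_io  = drbd_md_io_complete;
 *	submit_bio(rw, bio);
 *	wait_for_completion(&md_io.event);
 *	if (md_io.error)
 *		... handle the failed meta data IO ...
 */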
84 
85 /* reads on behalf of the partner,
86  * "submitted" by the receiver
87  */
88 void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
89 {
90 	unsigned long flags = 0;
91 	struct drbd_epoch_entry *e = NULL;
92 	struct drbd_conf *mdev;
93 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
94 
95 	e = bio->bi_private;
96 	mdev = e->mdev;
97 
98 	if (error)
99 		dev_warn(DEV, "read: error=%d s=%llus\n", error,
100 				(unsigned long long)e->sector);
101 	if (!error && !uptodate) {
102 		dev_warn(DEV, "read: setting error to -EIO s=%llus\n",
103 				(unsigned long long)e->sector);
104 		/* strange behavior of some lower level drivers...
105 		 * fail the request by clearing the uptodate flag,
106 		 * but do not return any error?! */
107 		error = -EIO;
108 	}
109 
110 	D_ASSERT(e->block_id != ID_VACANT);
111 
112 	spin_lock_irqsave(&mdev->req_lock, flags);
113 	mdev->read_cnt += e->size >> 9;
114 	list_del(&e->w.list);
115 	if (list_empty(&mdev->read_ee))
116 		wake_up(&mdev->ee_wait);
117 	spin_unlock_irqrestore(&mdev->req_lock, flags);
118 
119 	drbd_chk_io_error(mdev, error, FALSE);
120 	drbd_queue_work(&mdev->data.work, &e->w);
121 	put_ldev(mdev);
122 }
123 
124 /* writes on behalf of the partner, or resync writes,
125  * "submitted" by the receiver.
126  */
127 void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
128 {
129 	unsigned long flags = 0;
130 	struct drbd_epoch_entry *e = NULL;
131 	struct drbd_conf *mdev;
132 	sector_t e_sector;
133 	int do_wake;
134 	int is_syncer_req;
135 	int do_al_complete_io;
136 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
137 	int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER);
138 
139 	e = bio->bi_private;
140 	mdev = e->mdev;
141 
142 	if (error)
143 		dev_warn(DEV, "write: error=%d s=%llus\n", error,
144 				(unsigned long long)e->sector);
145 	if (!error && !uptodate) {
146 		dev_warn(DEV, "write: setting error to -EIO s=%llus\n",
147 				(unsigned long long)e->sector);
148 		/* strange behavior of some lower level drivers...
149 		 * fail the request by clearing the uptodate flag,
150 		 * but do not return any error?! */
151 		error = -EIO;
152 	}
153 
154 	/* error == -ENOTSUPP would be a better test,
155 	 * alas it is not reliable */
156 	if (error && is_barrier && e->flags & EE_IS_BARRIER) {
157 		drbd_bump_write_ordering(mdev, WO_bdev_flush);
158 		spin_lock_irqsave(&mdev->req_lock, flags);
159 		list_del(&e->w.list);
160 		e->w.cb = w_e_reissue;
161 		/* put_ldev actually happens below, once we come here again. */
162 		__release(local);
163 		spin_unlock_irqrestore(&mdev->req_lock, flags);
164 		drbd_queue_work(&mdev->data.work, &e->w);
165 		return;
166 	}
167 
168 	D_ASSERT(e->block_id != ID_VACANT);
169 
170 	spin_lock_irqsave(&mdev->req_lock, flags);
171 	mdev->writ_cnt += e->size >> 9;
172 	is_syncer_req = is_syncer_block_id(e->block_id);
173 
174 	/* after we moved e to done_ee,
175 	 * we may no longer access it,
176 	 * it may be freed/reused already!
177 	 * (as soon as we release the req_lock) */
178 	e_sector = e->sector;
179 	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
180 
181 	list_del(&e->w.list); /* has been on active_ee or sync_ee */
182 	list_add_tail(&e->w.list, &mdev->done_ee);
183 
184 	/* No hlist_del_init(&e->colision) here: we have not sent the Ack yet,
185 	 * nor have we woken possibly waiting conflicting requests.
186 	 * That is done from "drbd_process_done_ee" within the appropriate w.cb
187 	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
188 
189 	do_wake = is_syncer_req
190 		? list_empty(&mdev->sync_ee)
191 		: list_empty(&mdev->active_ee);
192 
193 	if (error)
194 		__drbd_chk_io_error(mdev, FALSE);
195 	spin_unlock_irqrestore(&mdev->req_lock, flags);
196 
197 	if (is_syncer_req)
198 		drbd_rs_complete_io(mdev, e_sector);
199 
200 	if (do_wake)
201 		wake_up(&mdev->ee_wait);
202 
203 	if (do_al_complete_io)
204 		drbd_al_complete_io(mdev, e_sector);
205 
206 	wake_asender(mdev);
207 	put_ldev(mdev);
208 
209 }
210 
211 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
212  */
213 void drbd_endio_pri(struct bio *bio, int error)
214 {
215 	unsigned long flags;
216 	struct drbd_request *req = bio->bi_private;
217 	struct drbd_conf *mdev = req->mdev;
218 	struct bio_and_error m;
219 	enum drbd_req_event what;
220 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
221 
222 	if (error)
223 		dev_warn(DEV, "p %s: error=%d\n",
224 			 bio_data_dir(bio) == WRITE ? "write" : "read", error);
225 	if (!error && !uptodate) {
226 		dev_warn(DEV, "p %s: setting error to -EIO\n",
227 			 bio_data_dir(bio) == WRITE ? "write" : "read");
228 		/* strange behavior of some lower level drivers...
229 		 * fail the request by clearing the uptodate flag,
230 		 * but do not return any error?! */
231 		error = -EIO;
232 	}
233 
234 	/* to avoid recursion in __req_mod */
235 	if (unlikely(error)) {
236 		what = (bio_data_dir(bio) == WRITE)
237 			? write_completed_with_error
238 			: (bio_rw(bio) == READA)
239 			  ? read_ahead_completed_with_error
240 			  : read_completed_with_error;
241 	} else
242 		what = completed_ok;
243 
244 	bio_put(req->private_bio);
245 	req->private_bio = ERR_PTR(error);
246 
247 	spin_lock_irqsave(&mdev->req_lock, flags);
248 	__req_mod(req, what, &m);
249 	spin_unlock_irqrestore(&mdev->req_lock, flags);
250 
251 	if (m.bio)
252 		complete_master_bio(mdev, &m);
253 }
254 
255 int w_io_error(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
256 {
257 	struct drbd_request *req = container_of(w, struct drbd_request, w);
258 
259 	/* NOTE: mdev->ldev can be NULL by the time we get here! */
260 	/* D_ASSERT(mdev->ldev->dc.on_io_error != EP_PASS_ON); */
261 
262 	/* the only way this callback is scheduled is from _req_may_be_done,
263 	 * when it is done and had a local write error, see comments there */
264 	drbd_req_free(req);
265 
266 	return TRUE;
267 }
268 
269 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
270 {
271 	struct drbd_request *req = container_of(w, struct drbd_request, w);
272 
273 	/* We should not detach for read io-error,
274 	 * but try to WRITE the P_DATA_REPLY to the failed location,
275 	 * to give the disk the chance to relocate that block */
276 
277 	spin_lock_irq(&mdev->req_lock);
278 	if (cancel ||
279 	    mdev->state.conn < C_CONNECTED ||
280 	    mdev->state.pdsk <= D_INCONSISTENT) {
281 		_req_mod(req, send_canceled);
282 		spin_unlock_irq(&mdev->req_lock);
283 		dev_alert(DEV, "WE ARE LOST. Local IO failure, no peer.\n");
284 		return 1;
285 	}
286 	spin_unlock_irq(&mdev->req_lock);
287 
288 	return w_send_read_req(mdev, w, 0);
289 }
290 
291 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
292 {
293 	ERR_IF(cancel) return 1;
294 	dev_err(DEV, "resync inactive, but callback triggered??\n");
295 	return 1; /* Simply ignore this! */
296 }
297 
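/* Compute a digest over all data segments of @bio using the hash transform
 * @tfm.  The result (crypto_hash_digestsize(tfm) bytes) is written into
 * @digest, which the caller must have allocated large enough. */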
298 void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
299 {
300 	struct hash_desc desc;
301 	struct scatterlist sg;
302 	struct bio_vec *bvec;
303 	int i;
304 
305 	desc.tfm = tfm;
306 	desc.flags = 0;
307 
308 	sg_init_table(&sg, 1);
309 	crypto_hash_init(&desc);
310 
311 	__bio_for_each_segment(bvec, bio, i, 0) {
312 		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
313 		crypto_hash_update(&desc, &sg, sg.length);
314 	}
315 	crypto_hash_final(&desc, digest);
316 }
317 
318 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
319 {
320 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
321 	int digest_size;
322 	void *digest;
323 	int ok;
324 
325 	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
326 
327 	if (unlikely(cancel)) {
328 		drbd_free_ee(mdev, e);
329 		return 1;
330 	}
331 
332 	if (likely(drbd_bio_uptodate(e->private_bio))) {
333 		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
334 		digest = kmalloc(digest_size, GFP_NOIO);
335 		if (digest) {
336 			drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
337 
338 			inc_rs_pending(mdev);
339 			ok = drbd_send_drequest_csum(mdev,
340 						     e->sector,
341 						     e->size,
342 						     digest,
343 						     digest_size,
344 						     P_CSUM_RS_REQUEST);
345 			kfree(digest);
346 		} else {
347 			dev_err(DEV, "kmalloc() of digest failed.\n");
348 			ok = 0;
349 		}
350 	} else
351 		ok = 1;
352 
353 	drbd_free_ee(mdev, e);
354 
355 	if (unlikely(!ok))
356 		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
357 	return ok;
358 }
359 
360 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
361 
362 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
363 {
364 	struct drbd_epoch_entry *e;
365 
366 	if (!get_ldev(mdev))
367 		return 0;
368 
369 	/* GFP_TRY, because if there is no memory available right now, this may
370 	 * be rescheduled for later. It is "only" background resync, after all. */
371 	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
372 	if (!e) {
373 		put_ldev(mdev);
374 		return 2;
375 	}
376 
377 	spin_lock_irq(&mdev->req_lock);
378 	list_add(&e->w.list, &mdev->read_ee);
379 	spin_unlock_irq(&mdev->req_lock);
380 
381 	e->private_bio->bi_end_io = drbd_endio_read_sec;
382 	e->private_bio->bi_rw = READ;
383 	e->w.cb = w_e_send_csum;
384 
385 	mdev->read_cnt += size >> 9;
386 	drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio);
387 
388 	return 1;
389 }
390 
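/* Timer callback, re-armed every SLEEP_TIME while resync or online verify is
 * active.  Unless STOP_SYNC_TIMER is set, (re)queue the matching request
 * generator on the worker queue: w_make_ov_request for C_VERIFY_S,
 * w_make_resync_request otherwise. */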
391 void resync_timer_fn(unsigned long data)
392 {
393 	unsigned long flags;
394 	struct drbd_conf *mdev = (struct drbd_conf *) data;
395 	int queue;
396 
397 	spin_lock_irqsave(&mdev->req_lock, flags);
398 
399 	if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
400 		queue = 1;
401 		if (mdev->state.conn == C_VERIFY_S)
402 			mdev->resync_work.cb = w_make_ov_request;
403 		else
404 			mdev->resync_work.cb = w_make_resync_request;
405 	} else {
406 		queue = 0;
407 		mdev->resync_work.cb = w_resync_inactive;
408 	}
409 
410 	spin_unlock_irqrestore(&mdev->req_lock, flags);
411 
412 	/* harmless race: list_empty outside data.work.q_lock */
413 	if (list_empty(&mdev->resync_work.list) && queue)
414 		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
415 }
416 
417 int w_make_resync_request(struct drbd_conf *mdev,
418 		struct drbd_work *w, int cancel)
419 {
420 	unsigned long bit;
421 	sector_t sector;
422 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
423 	int max_segment_size = queue_max_segment_size(mdev->rq_queue);
424 	int number, i, size, pe, mx;
425 	int align, queued, sndbuf;
426 
427 	if (unlikely(cancel))
428 		return 1;
429 
430 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
431 		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected\n");
432 		return 0;
433 	}
434 
435 	if (mdev->state.conn != C_SYNC_TARGET)
436 		dev_err(DEV, "%s in w_make_resync_request\n",
437 			drbd_conn_str(mdev->state.conn));
438 
439 	if (!get_ldev(mdev)) {
440 		/* Since we only need to access mdev->resync, a
441 		   get_ldev_if_state(mdev, D_FAILED) would be sufficient; but
442 		   continuing a resync with a broken disk makes no sense at
443 		   all */
444 		dev_err(DEV, "Disk broke down during resync!\n");
445 		mdev->resync_work.cb = w_resync_inactive;
446 		return 1;
447 	}
448 
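	/* Number of requests to generate per SLEEP_TIME tick so that we hit the
	 * configured rate on average (sync_conf.rate is in KiB/s, one bitmap
	 * bit covers BM_BLOCK_SIZE bytes).  For example, assuming the usual
	 * 4 KiB BM_BLOCK_SIZE and rate = 10000 KiB/s:
	 * (HZ/10) * 10000 / (4 * HZ) = 250 requests per 100 ms tick,
	 * i.e. 250 * 4 KiB = 1000 KiB per tick = 10000 KiB/s. */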
449 	number = SLEEP_TIME * mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
450 	pe = atomic_read(&mdev->rs_pending_cnt);
451 
452 	mutex_lock(&mdev->data.mutex);
453 	if (mdev->data.socket)
454 		mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
455 	else
456 		mx = 1;
457 	mutex_unlock(&mdev->data.mutex);
458 
459 	/* For resync rates >160MB/sec, allow more pending RS requests */
460 	if (number > mx)
461 		mx = number;
462 
463 	/* Limit the number of pending RS requests to no more than the peer's receive buffer */
464 	if ((pe + number) > mx) {
465 		number = mx - pe;
466 	}
467 
468 	for (i = 0; i < number; i++) {
469 		/* Stop generating RS requests, when half of the send buffer is filled */
470 		mutex_lock(&mdev->data.mutex);
471 		if (mdev->data.socket) {
472 			queued = mdev->data.socket->sk->sk_wmem_queued;
473 			sndbuf = mdev->data.socket->sk->sk_sndbuf;
474 		} else {
475 			queued = 1;
476 			sndbuf = 0;
477 		}
478 		mutex_unlock(&mdev->data.mutex);
479 		if (queued > sndbuf / 2)
480 			goto requeue;
481 
482 next_sector:
483 		size = BM_BLOCK_SIZE;
484 		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
485 
486 		if (bit == -1UL) {
487 			mdev->bm_resync_fo = drbd_bm_bits(mdev);
488 			mdev->resync_work.cb = w_resync_inactive;
489 			put_ldev(mdev);
490 			return 1;
491 		}
492 
493 		sector = BM_BIT_TO_SECT(bit);
494 
495 		if (drbd_try_rs_begin_io(mdev, sector)) {
496 			mdev->bm_resync_fo = bit;
497 			goto requeue;
498 		}
499 		mdev->bm_resync_fo = bit + 1;
500 
501 		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
502 			drbd_rs_complete_io(mdev, sector);
503 			goto next_sector;
504 		}
505 
506 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
507 		/* try to find some adjacent bits.
508 		 * we stop once we already have the maximum request size.
509 		 *
510 		 * Additionally always align bigger requests, in order to
511 		 * be prepared for all stripe sizes of software RAIDs.
512 		 *
513 		 * we _do_ care about the agreed-upon q->max_segment_size
514 		 * here, as splitting up the requests on the other side is more
515 		 * difficult.  the consequence is that on lvm and md and other
516 		 * "indirect" devices, this is dead code, since
517 		 * q->max_segment_size will be PAGE_SIZE.
518 		 */
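		/* For example, assuming a 32 KiB max_segment_size and the usual
		 * 4 KiB BM_BLOCK_SIZE: up to 8 adjacent dirty bits may be merged
		 * into a single 32 KiB request, as long as the start sector is
		 * suitably aligned and we do not cross an extent boundary. */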
519 		align = 1;
520 		for (;;) {
521 			if (size + BM_BLOCK_SIZE > max_segment_size)
522 				break;
523 
524 			/* Be always aligned */
525 			if (sector & ((1<<(align+3))-1))
526 				break;
527 
528 			/* do not cross extent boundaries */
529 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
530 				break;
531 			/* now, is it actually dirty, after all?
532 			 * caution, drbd_bm_test_bit is tri-state for some
533 			 * obscure reason; ( b == 0 ) would get the out-of-band
534 			 * only accidentally right because of the "oddly sized"
535 			 * adjustment below */
536 			if (drbd_bm_test_bit(mdev, bit+1) != 1)
537 				break;
538 			bit++;
539 			size += BM_BLOCK_SIZE;
540 			if ((BM_BLOCK_SIZE << align) <= size)
541 				align++;
542 			i++;
543 		}
544 		/* if we merged some,
545 		 * reset the offset to start the next drbd_bm_find_next from */
546 		if (size > BM_BLOCK_SIZE)
547 			mdev->bm_resync_fo = bit + 1;
548 #endif
549 
550 		/* adjust very last sectors, in case we are oddly sized */
551 		if (sector + (size>>9) > capacity)
552 			size = (capacity-sector)<<9;
553 		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
554 			switch (read_for_csum(mdev, sector, size)) {
555 			case 0: /* Disk failure*/
556 				put_ldev(mdev);
557 				return 0;
558 			case 2: /* Allocation failed */
559 				drbd_rs_complete_io(mdev, sector);
560 				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
561 				goto requeue;
562 			/* case 1: everything ok */
563 			}
564 		} else {
565 			inc_rs_pending(mdev);
566 			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
567 					       sector, size, ID_SYNCER)) {
568 				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
569 				dec_rs_pending(mdev);
570 				put_ldev(mdev);
571 				return 0;
572 			}
573 		}
574 	}
575 
576 	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
577 		/* last syncer _request_ was sent,
578 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
579 		 * next sync group will resume), as soon as we receive the last
580 		 * resync data block, and the last bit is cleared.
581 		 * until then resync "work" is "inactive" ...
582 		 */
583 		mdev->resync_work.cb = w_resync_inactive;
584 		put_ldev(mdev);
585 		return 1;
586 	}
587 
588  requeue:
589 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
590 	put_ldev(mdev);
591 	return 1;
592 }
593 
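/* Generate online-verify requests (P_OV_REQUEST), starting at
 * mdev->ov_position.  Throttled like resync requests: at most "number"
 * requests per SLEEP_TIME tick, reduced by what is still pending. */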
594 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
595 {
596 	int number, i, size;
597 	sector_t sector;
598 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
599 
600 	if (unlikely(cancel))
601 		return 1;
602 
603 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
604 		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected\n");
605 		return 0;
606 	}
607 
608 	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
609 	if (atomic_read(&mdev->rs_pending_cnt) > number)
610 		goto requeue;
611 
612 	number -= atomic_read(&mdev->rs_pending_cnt);
613 
614 	sector = mdev->ov_position;
615 	for (i = 0; i < number; i++) {
616 		if (sector >= capacity) {
617 			mdev->resync_work.cb = w_resync_inactive;
618 			return 1;
619 		}
620 
621 		size = BM_BLOCK_SIZE;
622 
623 		if (drbd_try_rs_begin_io(mdev, sector)) {
624 			mdev->ov_position = sector;
625 			goto requeue;
626 		}
627 
628 		if (sector + (size>>9) > capacity)
629 			size = (capacity-sector)<<9;
630 
631 		inc_rs_pending(mdev);
632 		if (!drbd_send_ov_request(mdev, sector, size)) {
633 			dec_rs_pending(mdev);
634 			return 0;
635 		}
636 		sector += BM_SECT_PER_BIT;
637 	}
638 	mdev->ov_position = sector;
639 
640  requeue:
641 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
642 	return 1;
643 }
644 
645 
646 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
647 {
648 	kfree(w);
649 	ov_oos_print(mdev);
650 	drbd_resync_finished(mdev);
651 
652 	return 1;
653 }
654 
655 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
656 {
657 	kfree(w);
658 
659 	drbd_resync_finished(mdev);
660 
661 	return 1;
662 }
663 
664 int drbd_resync_finished(struct drbd_conf *mdev)
665 {
666 	unsigned long db, dt, dbdt;
667 	unsigned long n_oos;
668 	union drbd_state os, ns;
669 	struct drbd_work *w;
670 	char *khelper_cmd = NULL;
671 
672 	/* Remove all elements from the resync LRU. Future actions may set
673 	 * bits in the (main) bitmap, which would make stale entries in the
674 	 * resync LRU wrong. */
675 	if (drbd_rs_del_all(mdev)) {
676 		/* This was not possible right now, most probably because
677 		 * P_RS_DATA_REPLY packets are still lingering on the worker's
678 		 * queue (or the read operations for those packets have not
679 		 * finished yet).  Retry in 100 ms. */
680 
681 		drbd_kick_lo(mdev);
682 		__set_current_state(TASK_INTERRUPTIBLE);
683 		schedule_timeout(HZ / 10);
684 		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
685 		if (w) {
686 			w->cb = w_resync_finished;
687 			drbd_queue_work(&mdev->data.work, w);
688 			return 1;
689 		}
690 		dev_err(DEV, "drbd_rs_del_all() failed, and kmalloc(w) for retrying failed as well.\n");
691 	}
692 
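	/* Statistics for the log message below:
	 * dt   = elapsed seconds, not counting paused time,
	 * db   = number of bitmap bits covered by this run,
	 * dbdt = average speed in KiB/s (Bit2KB() turns 4 KiB bits into KiB). */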
693 	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
694 	if (dt <= 0)
695 		dt = 1;
696 	db = mdev->rs_total;
697 	dbdt = Bit2KB(db/dt);
698 	mdev->rs_paused /= HZ;
699 
700 	if (!get_ldev(mdev))
701 		goto out;
702 
703 	spin_lock_irq(&mdev->req_lock);
704 	os = mdev->state;
705 
706 	/* This protects us against multiple calls (that can happen in the presence
707 	   of application IO), and against connectivity loss just before we arrive here. */
708 	if (os.conn <= C_CONNECTED)
709 		goto out_unlock;
710 
711 	ns = os;
712 	ns.conn = C_CONNECTED;
713 
714 	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
715 	     (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
716 	     "Online verify " : "Resync",
717 	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
718 
719 	n_oos = drbd_bm_total_weight(mdev);
720 
721 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
722 		if (n_oos) {
723 			dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
724 			      n_oos, Bit2KB(1));
725 			khelper_cmd = "out-of-sync";
726 		}
727 	} else {
728 		D_ASSERT((n_oos - mdev->rs_failed) == 0);
729 
730 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
731 			khelper_cmd = "after-resync-target";
732 
733 		if (mdev->csums_tfm && mdev->rs_total) {
734 			const unsigned long s = mdev->rs_same_csum;
735 			const unsigned long t = mdev->rs_total;
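			/* Integer percentage of blocks that had equal checksums;
			 * multiply first for small totals, divide first for large
			 * ones, to avoid overflowing the unsigned long. */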
736 			const int ratio =
737 				(t == 0)     ? 0 :
738 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
739 			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
740 			     "transferred %luK total %luK\n",
741 			     ratio,
742 			     Bit2KB(mdev->rs_same_csum),
743 			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
744 			     Bit2KB(mdev->rs_total));
745 		}
746 	}
747 
748 	if (mdev->rs_failed) {
749 		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
750 
751 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
752 			ns.disk = D_INCONSISTENT;
753 			ns.pdsk = D_UP_TO_DATE;
754 		} else {
755 			ns.disk = D_UP_TO_DATE;
756 			ns.pdsk = D_INCONSISTENT;
757 		}
758 	} else {
759 		ns.disk = D_UP_TO_DATE;
760 		ns.pdsk = D_UP_TO_DATE;
761 
762 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
763 			if (mdev->p_uuid) {
764 				int i;
765 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
766 					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
767 				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
768 				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
769 			} else {
770 				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
771 			}
772 		}
773 
774 		drbd_uuid_set_bm(mdev, 0UL);
775 
776 		if (mdev->p_uuid) {
777 			/* Now the two UUID sets are equal, update what we
778 			 * know of the peer. */
779 			int i;
780 			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
781 				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
782 		}
783 	}
784 
785 	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
786 out_unlock:
787 	spin_unlock_irq(&mdev->req_lock);
788 	put_ldev(mdev);
789 out:
790 	mdev->rs_total  = 0;
791 	mdev->rs_failed = 0;
792 	mdev->rs_paused = 0;
793 	mdev->ov_start_sector = 0;
794 
795 	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
796 		dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
797 		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
798 	}
799 
800 	if (khelper_cmd)
801 		drbd_khelper(mdev, khelper_cmd);
802 
803 	return 1;
804 }
805 
806 /* helper */
807 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
808 {
809 	if (drbd_bio_has_active_page(e->private_bio)) {
810 		/* This might happen if sendpage() has not finished */
811 		spin_lock_irq(&mdev->req_lock);
812 		list_add_tail(&e->w.list, &mdev->net_ee);
813 		spin_unlock_irq(&mdev->req_lock);
814 	} else
815 		drbd_free_ee(mdev, e);
816 }
817 
818 /**
819  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
820  * @mdev:	DRBD device.
821  * @w:		work object.
822  * @cancel:	The connection will be closed anyway
823  */
824 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
825 {
826 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
827 	int ok;
828 
829 	if (unlikely(cancel)) {
830 		drbd_free_ee(mdev, e);
831 		dec_unacked(mdev);
832 		return 1;
833 	}
834 
835 	if (likely(drbd_bio_uptodate(e->private_bio))) {
836 		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
837 	} else {
838 		if (__ratelimit(&drbd_ratelimit_state))
839 			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
840 			    (unsigned long long)e->sector);
841 
842 		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
843 	}
844 
845 	dec_unacked(mdev);
846 
847 	move_to_net_ee_or_free(mdev, e);
848 
849 	if (unlikely(!ok))
850 		dev_err(DEV, "drbd_send_block() failed\n");
851 	return ok;
852 }
853 
854 /**
855  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
856  * @mdev:	DRBD device.
857  * @w:		work object.
858  * @cancel:	The connection will be closed anyway
859  */
860 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
861 {
862 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
863 	int ok;
864 
865 	if (unlikely(cancel)) {
866 		drbd_free_ee(mdev, e);
867 		dec_unacked(mdev);
868 		return 1;
869 	}
870 
871 	if (get_ldev_if_state(mdev, D_FAILED)) {
872 		drbd_rs_complete_io(mdev, e->sector);
873 		put_ldev(mdev);
874 	}
875 
876 	if (likely(drbd_bio_uptodate(e->private_bio))) {
877 		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
878 			inc_rs_pending(mdev);
879 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
880 		} else {
881 			if (__ratelimit(&drbd_ratelimit_state))
882 				dev_err(DEV, "Not sending RSDataReply, "
883 				    "partner DISKLESS!\n");
884 			ok = 1;
885 		}
886 	} else {
887 		if (__ratelimit(&drbd_ratelimit_state))
888 			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
889 			    (unsigned long long)e->sector);
890 
891 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
892 
893 		/* update resync data with failure */
894 		drbd_rs_failed_io(mdev, e->sector, e->size);
895 	}
896 
897 	dec_unacked(mdev);
898 
899 	move_to_net_ee_or_free(mdev, e);
900 
901 	if (unlikely(!ok))
902 		dev_err(DEV, "drbd_send_block() failed\n");
903 	return ok;
904 }
905 
906 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
907 {
908 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
909 	struct digest_info *di;
910 	int digest_size;
911 	void *digest = NULL;
912 	int ok, eq = 0;
913 
914 	if (unlikely(cancel)) {
915 		drbd_free_ee(mdev, e);
916 		dec_unacked(mdev);
917 		return 1;
918 	}
919 
920 	drbd_rs_complete_io(mdev, e->sector);
921 
922 	di = (struct digest_info *)(unsigned long)e->block_id;
923 
924 	if (likely(drbd_bio_uptodate(e->private_bio))) {
925 		/* quick hack to try to avoid a race against reconfiguration.
926 		 * a real fix would be much more involved,
927 		 * introducing more locking mechanisms */
928 		if (mdev->csums_tfm) {
929 			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
930 			D_ASSERT(digest_size == di->digest_size);
931 			digest = kmalloc(digest_size, GFP_NOIO);
932 		}
933 		if (digest) {
934 			drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
935 			eq = !memcmp(digest, di->digest, digest_size);
936 			kfree(digest);
937 		}
938 
939 		if (eq) {
940 			drbd_set_in_sync(mdev, e->sector, e->size);
941 			mdev->rs_same_csum++;
942 			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
943 		} else {
944 			inc_rs_pending(mdev);
945 			e->block_id = ID_SYNCER;
946 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
947 		}
948 	} else {
949 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
950 		if (__ratelimit(&drbd_ratelimit_state))
951 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
952 	}
953 
954 	dec_unacked(mdev);
955 
956 	kfree(di);
957 
958 	move_to_net_ee_or_free(mdev, e);
959 
960 	if (unlikely(!ok))
961 		dev_err(DEV, "drbd_send_block/ack() failed\n");
962 	return ok;
963 }
964 
965 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
966 {
967 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
968 	int digest_size;
969 	void *digest;
970 	int ok = 1;
971 
972 	if (unlikely(cancel))
973 		goto out;
974 
975 	if (unlikely(!drbd_bio_uptodate(e->private_bio)))
976 		goto out;
977 
978 	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
979 	/* FIXME if this allocation fails, online verify will not terminate! */
980 	digest = kmalloc(digest_size, GFP_NOIO);
981 	if (digest) {
982 		drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
983 		inc_rs_pending(mdev);
984 		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
985 					     digest, digest_size, P_OV_REPLY);
986 		if (!ok)
987 			dec_rs_pending(mdev);
988 		kfree(digest);
989 	}
990 
991 out:
992 	drbd_free_ee(mdev, e);
993 
994 	dec_unacked(mdev);
995 
996 	return ok;
997 }
998 
999 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1000 {
1001 	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1002 		mdev->ov_last_oos_size += size>>9;
1003 	} else {
1004 		mdev->ov_last_oos_start = sector;
1005 		mdev->ov_last_oos_size = size>>9;
1006 	}
1007 	drbd_set_out_of_sync(mdev, sector, size);
1008 	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1009 }
1010 
1011 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1012 {
1013 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1014 	struct digest_info *di;
1015 	int digest_size;
1016 	void *digest;
1017 	int ok, eq = 0;
1018 
1019 	if (unlikely(cancel)) {
1020 		drbd_free_ee(mdev, e);
1021 		dec_unacked(mdev);
1022 		return 1;
1023 	}
1024 
1025 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1026 	 * the resync lru has been cleaned up already */
1027 	drbd_rs_complete_io(mdev, e->sector);
1028 
1029 	di = (struct digest_info *)(unsigned long)e->block_id;
1030 
1031 	if (likely(drbd_bio_uptodate(e->private_bio))) {
1032 		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1033 		digest = kmalloc(digest_size, GFP_NOIO);
1034 		if (digest) {
1035 			drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
1036 
1037 			D_ASSERT(digest_size == di->digest_size);
1038 			eq = !memcmp(digest, di->digest, digest_size);
1039 			kfree(digest);
1040 		}
1041 	} else {
1042 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1043 		if (__ratelimit(&drbd_ratelimit_state))
1044 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1045 	}
1046 
1047 	dec_unacked(mdev);
1048 
1049 	kfree(di);
1050 
1051 	if (!eq)
1052 		drbd_ov_oos_found(mdev, e->sector, e->size);
1053 	else
1054 		ov_oos_print(mdev);
1055 
1056 	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1057 			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1058 
1059 	drbd_free_ee(mdev, e);
1060 
1061 	if (--mdev->ov_left == 0) {
1062 		ov_oos_print(mdev);
1063 		drbd_resync_finished(mdev);
1064 	}
1065 
1066 	return ok;
1067 }
1068 
1069 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1070 {
1071 	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1072 	complete(&b->done);
1073 	return 1;
1074 }
1075 
1076 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1077 {
1078 	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1079 	struct p_barrier *p = &mdev->data.sbuf.barrier;
1080 	int ok = 1;
1081 
1082 	/* really avoid racing with tl_clear.  w.cb may have been referenced
1083 	 * just before it was reassigned and re-queued, so double check that.
1084 	 * actually, this race was harmless, since we only try to send the
1085 	 * barrier packet here, and otherwise do nothing with the object.
1086 	 * but compare with the head of w_clear_epoch */
1087 	spin_lock_irq(&mdev->req_lock);
1088 	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1089 		cancel = 1;
1090 	spin_unlock_irq(&mdev->req_lock);
1091 	if (cancel)
1092 		return 1;
1093 
1094 	if (!drbd_get_data_sock(mdev))
1095 		return 0;
1096 	p->barrier = b->br_number;
1097 	/* inc_ap_pending was done where this was queued.
1098 	 * dec_ap_pending will be done in got_BarrierAck
1099 	 * or (on connection loss) in w_clear_epoch.  */
1100 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1101 				(struct p_header *)p, sizeof(*p), 0);
1102 	drbd_put_data_sock(mdev);
1103 
1104 	return ok;
1105 }
1106 
1107 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1108 {
1109 	if (cancel)
1110 		return 1;
1111 	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1112 }
1113 
1114 /**
1115  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1116  * @mdev:	DRBD device.
1117  * @w:		work object.
1118  * @cancel:	The connection will be closed anyway
1119  */
1120 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1121 {
1122 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1123 	int ok;
1124 
1125 	if (unlikely(cancel)) {
1126 		req_mod(req, send_canceled);
1127 		return 1;
1128 	}
1129 
1130 	ok = drbd_send_dblock(mdev, req);
1131 	req_mod(req, ok ? handed_over_to_network : send_failed);
1132 
1133 	return ok;
1134 }
1135 
1136 /**
1137  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1138  * @mdev:	DRBD device.
1139  * @w:		work object.
1140  * @cancel:	The connection will be closed anyway
1141  */
1142 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1143 {
1144 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1145 	int ok;
1146 
1147 	if (unlikely(cancel)) {
1148 		req_mod(req, send_canceled);
1149 		return 1;
1150 	}
1151 
1152 	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1153 				(unsigned long)req);
1154 
1155 	if (!ok) {
1156 		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1157 		 * so this is probably redundant */
1158 		if (mdev->state.conn >= C_CONNECTED)
1159 			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1160 	}
1161 	req_mod(req, ok ? handed_over_to_network : send_failed);
1162 
1163 	return ok;
1164 }
1165 
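/* Walk the sync-after dependency chain (sync_conf.after) starting at @mdev.
 * Return 0 if any device further up the chain is currently resyncing or has
 * one of its "inhibit sync" (isp) flags set, 1 if @mdev may resync now. */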
1166 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1167 {
1168 	struct drbd_conf *odev = mdev;
1169 
1170 	while (1) {
1171 		if (odev->sync_conf.after == -1)
1172 			return 1;
1173 		odev = minor_to_mdev(odev->sync_conf.after);
1174 		ERR_IF(!odev) return 1;
1175 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1176 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1177 		    odev->state.aftr_isp || odev->state.peer_isp ||
1178 		    odev->state.user_isp)
1179 			return 0;
1180 	}
1181 }
1182 
1183 /**
1184  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1185  * @mdev:	DRBD device.
1186  *
1187  * Called from process context only (admin command and after_state_ch).
1188  */
1189 static int _drbd_pause_after(struct drbd_conf *mdev)
1190 {
1191 	struct drbd_conf *odev;
1192 	int i, rv = 0;
1193 
1194 	for (i = 0; i < minor_count; i++) {
1195 		odev = minor_to_mdev(i);
1196 		if (!odev)
1197 			continue;
1198 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1199 			continue;
1200 		if (!_drbd_may_sync_now(odev))
1201 			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1202 			       != SS_NOTHING_TO_DO);
1203 	}
1204 
1205 	return rv;
1206 }
1207 
1208 /**
1209  * _drbd_resume_next() - Resume resync on all devices that may resync now
1210  * @mdev:	DRBD device.
1211  *
1212  * Called from process context only (admin command and worker).
1213  */
1214 static int _drbd_resume_next(struct drbd_conf *mdev)
1215 {
1216 	struct drbd_conf *odev;
1217 	int i, rv = 0;
1218 
1219 	for (i = 0; i < minor_count; i++) {
1220 		odev = minor_to_mdev(i);
1221 		if (!odev)
1222 			continue;
1223 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1224 			continue;
1225 		if (odev->state.aftr_isp) {
1226 			if (_drbd_may_sync_now(odev))
1227 				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1228 							CS_HARD, NULL)
1229 				       != SS_NOTHING_TO_DO) ;
1230 		}
1231 	}
1232 	return rv;
1233 }
1234 
1235 void resume_next_sg(struct drbd_conf *mdev)
1236 {
1237 	write_lock_irq(&global_state_lock);
1238 	_drbd_resume_next(mdev);
1239 	write_unlock_irq(&global_state_lock);
1240 }
1241 
1242 void suspend_other_sg(struct drbd_conf *mdev)
1243 {
1244 	write_lock_irq(&global_state_lock);
1245 	_drbd_pause_after(mdev);
1246 	write_unlock_irq(&global_state_lock);
1247 }
1248 
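/* Validate a new sync-after dependency: minor @o_minor must refer to an
 * existing device (or be -1 for "no dependency"), and following the chain
 * from it must not lead back to @mdev, i.e. must not create a cycle. */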
1249 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1250 {
1251 	struct drbd_conf *odev;
1252 
1253 	if (o_minor == -1)
1254 		return NO_ERROR;
1255 	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1256 		return ERR_SYNC_AFTER;
1257 
1258 	/* check for loops */
1259 	odev = minor_to_mdev(o_minor);
1260 	while (1) {
1261 		if (odev == mdev)
1262 			return ERR_SYNC_AFTER_CYCLE;
1263 
1264 		/* dependency chain ends here, no cycles. */
1265 		if (odev->sync_conf.after == -1)
1266 			return NO_ERROR;
1267 
1268 		/* follow the dependency chain */
1269 		odev = minor_to_mdev(odev->sync_conf.after);
1270 	}
1271 }
1272 
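/* Change the sync-after dependency of @mdev to minor @na under the
 * global_state_lock, then iterate pausing/resuming over all devices until
 * the set of paused resyncs no longer changes. */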
1273 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1274 {
1275 	int changes;
1276 	int retcode;
1277 
1278 	write_lock_irq(&global_state_lock);
1279 	retcode = sync_after_error(mdev, na);
1280 	if (retcode == NO_ERROR) {
1281 		mdev->sync_conf.after = na;
1282 		do {
1283 			changes  = _drbd_pause_after(mdev);
1284 			changes |= _drbd_resume_next(mdev);
1285 		} while (changes);
1286 	}
1287 	write_unlock_irq(&global_state_lock);
1288 	return retcode;
1289 }
1290 
1291 /**
1292  * drbd_start_resync() - Start the resync process
1293  * @mdev:	DRBD device.
1294  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1295  *
1296  * This function might bring you directly into one of the
1297  * C_PAUSED_SYNC_* states.
1298  */
1299 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1300 {
1301 	union drbd_state ns;
1302 	int r;
1303 
1304 	if (mdev->state.conn >= C_SYNC_SOURCE) {
1305 		dev_err(DEV, "Resync already running!\n");
1306 		return;
1307 	}
1308 
1309 	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1310 	drbd_rs_cancel_all(mdev);
1311 
1312 	if (side == C_SYNC_TARGET) {
1313 		/* Since application IO was locked out during C_WF_BITMAP_T and
1314 		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1315 		   ask the before-resync-target handler whether we may make the data inconsistent. */
1316 		r = drbd_khelper(mdev, "before-resync-target");
1317 		r = (r >> 8) & 0xff;
1318 		if (r > 0) {
1319 			dev_info(DEV, "before-resync-target handler returned %d, "
1320 			     "dropping connection.\n", r);
1321 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1322 			return;
1323 		}
1324 	}
1325 
1326 	drbd_state_lock(mdev);
1327 
1328 	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1329 		drbd_state_unlock(mdev);
1330 		return;
1331 	}
1332 
1333 	if (side == C_SYNC_TARGET) {
1334 		mdev->bm_resync_fo = 0;
1335 	} else /* side == C_SYNC_SOURCE */ {
1336 		u64 uuid;
1337 
1338 		get_random_bytes(&uuid, sizeof(u64));
1339 		drbd_uuid_set(mdev, UI_BITMAP, uuid);
1340 		drbd_send_sync_uuid(mdev, uuid);
1341 
1342 		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1343 	}
1344 
1345 	write_lock_irq(&global_state_lock);
1346 	ns = mdev->state;
1347 
1348 	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1349 
1350 	ns.conn = side;
1351 
1352 	if (side == C_SYNC_TARGET)
1353 		ns.disk = D_INCONSISTENT;
1354 	else /* side == C_SYNC_SOURCE */
1355 		ns.pdsk = D_INCONSISTENT;
1356 
1357 	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1358 	ns = mdev->state;
1359 
1360 	if (ns.conn < C_CONNECTED)
1361 		r = SS_UNKNOWN_ERROR;
1362 
1363 	if (r == SS_SUCCESS) {
1364 		mdev->rs_total     =
1365 		mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1366 		mdev->rs_failed    = 0;
1367 		mdev->rs_paused    = 0;
1368 		mdev->rs_start     =
1369 		mdev->rs_mark_time = jiffies;
1370 		mdev->rs_same_csum = 0;
1371 		_drbd_pause_after(mdev);
1372 	}
1373 	write_unlock_irq(&global_state_lock);
1374 	drbd_state_unlock(mdev);
1375 	put_ldev(mdev);
1376 
1377 	if (r == SS_SUCCESS) {
1378 		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1379 		     drbd_conn_str(ns.conn),
1380 		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1381 		     (unsigned long) mdev->rs_total);
1382 
1383 		if (mdev->rs_total == 0) {
1384 			/* Peer still reachable? Beware of failing before-resync-target handlers! */
1385 			request_ping(mdev);
1386 			__set_current_state(TASK_INTERRUPTIBLE);
1387 			schedule_timeout(mdev->net_conf->ping_timeo*HZ/9); /* 9 instead of 10 */
1388 			drbd_resync_finished(mdev);
1389 			return;
1390 		}
1391 
1392 		/* ns.conn may already be != mdev->state.conn,
1393 		 * we may have been paused in between, or become paused until
1394 		 * the timer triggers.
1395 		 * No matter, that is handled in resync_timer_fn() */
1396 		if (ns.conn == C_SYNC_TARGET)
1397 			mod_timer(&mdev->resync_timer, jiffies);
1398 
1399 		drbd_md_sync(mdev);
1400 	}
1401 }
1402 
1403 int drbd_worker(struct drbd_thread *thi)
1404 {
1405 	struct drbd_conf *mdev = thi->mdev;
1406 	struct drbd_work *w = NULL;
1407 	LIST_HEAD(work_list);
1408 	int intr = 0, i;
1409 
1410 	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1411 
1412 	while (get_t_state(thi) == Running) {
1413 		drbd_thread_current_set_cpu(mdev);
1414 
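		/* If no work is immediately available, uncork the data socket so
		 * that anything batched up so far gets sent, sleep until new work
		 * is queued, then cork again to keep batching packets. */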
1415 		if (down_trylock(&mdev->data.work.s)) {
1416 			mutex_lock(&mdev->data.mutex);
1417 			if (mdev->data.socket && !mdev->net_conf->no_cork)
1418 				drbd_tcp_uncork(mdev->data.socket);
1419 			mutex_unlock(&mdev->data.mutex);
1420 
1421 			intr = down_interruptible(&mdev->data.work.s);
1422 
1423 			mutex_lock(&mdev->data.mutex);
1424 			if (mdev->data.socket  && !mdev->net_conf->no_cork)
1425 				drbd_tcp_cork(mdev->data.socket);
1426 			mutex_unlock(&mdev->data.mutex);
1427 		}
1428 
1429 		if (intr) {
1430 			D_ASSERT(intr == -EINTR);
1431 			flush_signals(current);
1432 			ERR_IF (get_t_state(thi) == Running)
1433 				continue;
1434 			break;
1435 		}
1436 
1437 		if (get_t_state(thi) != Running)
1438 			break;
1439 		/* With this break, we have done a down() but not consumed
1440 		   the entry from the list. The cleanup code takes care of
1441 		   this...   */
1442 
1443 		w = NULL;
1444 		spin_lock_irq(&mdev->data.work.q_lock);
1445 		ERR_IF(list_empty(&mdev->data.work.q)) {
1446 			/* something terribly wrong in our logic.
1447 			 * we were able to down() the semaphore,
1448 			 * but the list is empty... doh.
1449 			 *
1450 			 * what is the best thing to do now?
1451 			 * try again from scratch, restarting the receiver,
1452 			 * asender, whatnot? could break even more ugly,
1453 			 * e.g. when we are primary, but no good local data.
1454 			 *
1455 			 * I'll try to get away just starting over this loop.
1456 			 */
1457 			spin_unlock_irq(&mdev->data.work.q_lock);
1458 			continue;
1459 		}
1460 		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1461 		list_del_init(&w->list);
1462 		spin_unlock_irq(&mdev->data.work.q_lock);
1463 
1464 		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1465 			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1466 			if (mdev->state.conn >= C_CONNECTED)
1467 				drbd_force_state(mdev,
1468 						NS(conn, C_NETWORK_FAILURE));
1469 		}
1470 	}
1471 	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1472 	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1473 
1474 	spin_lock_irq(&mdev->data.work.q_lock);
1475 	i = 0;
1476 	while (!list_empty(&mdev->data.work.q)) {
1477 		list_splice_init(&mdev->data.work.q, &work_list);
1478 		spin_unlock_irq(&mdev->data.work.q_lock);
1479 
1480 		while (!list_empty(&work_list)) {
1481 			w = list_entry(work_list.next, struct drbd_work, list);
1482 			list_del_init(&w->list);
1483 			w->cb(mdev, w, 1);
1484 			i++; /* dead debugging code */
1485 		}
1486 
1487 		spin_lock_irq(&mdev->data.work.q_lock);
1488 	}
1489 	sema_init(&mdev->data.work.s, 0);
1490 	/* DANGEROUS race: if someone queued their work while holding the
1491 	 * spinlock, but called up() outside of it, we could get an up() on
1492 	 * the semaphore without a corresponding list entry.
1493 	 * So don't do that.
1494 	 */
1495 	spin_unlock_irq(&mdev->data.work.q_lock);
1496 
1497 	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1498 	/* _drbd_set_state only uses stop_nowait.
1499 	 * wait here for the Exiting receiver. */
1500 	drbd_thread_stop(&mdev->receiver);
1501 	drbd_mdev_cleanup(mdev);
1502 
1503 	dev_info(DEV, "worker terminated\n");
1504 
1505 	clear_bit(DEVICE_DYING, &mdev->flags);
1506 	clear_bit(CONFIG_PENDING, &mdev->flags);
1507 	wake_up(&mdev->state_wait);
1508 
1509 	return 0;
1510 }
1511