xref: /openbmc/linux/drivers/block/drbd/drbd_worker.c (revision 7b886f4f)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24  */
25 
26 #include <linux/module.h>
27 #include <linux/version.h>
28 #include <linux/drbd.h>
29 #include <linux/sched.h>
30 #include <linux/smp_lock.h>
31 #include <linux/wait.h>
32 #include <linux/mm.h>
33 #include <linux/memcontrol.h>
34 #include <linux/mm_inline.h>
35 #include <linux/slab.h>
36 #include <linux/random.h>
37 #include <linux/string.h>
38 #include <linux/scatterlist.h>
39 
40 #include "drbd_int.h"
41 #include "drbd_req.h"
42 
43 #define SLEEP_TIME (HZ/10)
44 
45 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
46 
47 
48 
49 /* defined here:
50    drbd_md_io_complete
51    drbd_endio_write_sec
52    drbd_endio_read_sec
53    drbd_endio_pri
54 
55  * more endio handlers:
56    atodb_endio in drbd_actlog.c
57    drbd_bm_async_io_complete in drbd_bitmap.c
58 
59  * For all these callbacks, note the following:
60  * The callbacks will be called in irq context by the IDE drivers,
61  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
62  * Try to get the locking right :)
63  *
64  */
65 
66 
/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync-after dependencies, we grab a write lock, because
   we need stable states on all devices for that.  */
71 rwlock_t global_state_lock;
72 
73 /* used for synchronous meta data and bitmap IO
74  * submitted by drbd_md_sync_page_io()
75  */
76 void drbd_md_io_complete(struct bio *bio, int error)
77 {
78 	struct drbd_md_io *md_io;
79 
80 	md_io = (struct drbd_md_io *)bio->bi_private;
81 	md_io->error = error;
82 
83 	complete(&md_io->event);
84 }
85 
86 /* reads on behalf of the partner,
87  * "submitted" by the receiver
88  */
89 void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
90 {
91 	unsigned long flags = 0;
92 	struct drbd_epoch_entry *e = NULL;
93 	struct drbd_conf *mdev;
94 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
95 
96 	e = bio->bi_private;
97 	mdev = e->mdev;
98 
99 	if (error)
100 		dev_warn(DEV, "read: error=%d s=%llus\n", error,
101 				(unsigned long long)e->sector);
102 	if (!error && !uptodate) {
103 		dev_warn(DEV, "read: setting error to -EIO s=%llus\n",
104 				(unsigned long long)e->sector);
105 		/* strange behavior of some lower level drivers...
106 		 * fail the request by clearing the uptodate flag,
107 		 * but do not return any error?! */
108 		error = -EIO;
109 	}
110 
111 	D_ASSERT(e->block_id != ID_VACANT);
112 
113 	spin_lock_irqsave(&mdev->req_lock, flags);
114 	mdev->read_cnt += e->size >> 9;
115 	list_del(&e->w.list);
116 	if (list_empty(&mdev->read_ee))
117 		wake_up(&mdev->ee_wait);
118 	spin_unlock_irqrestore(&mdev->req_lock, flags);
119 
120 	drbd_chk_io_error(mdev, error, FALSE);
121 	drbd_queue_work(&mdev->data.work, &e->w);
122 	put_ldev(mdev);
123 }
124 
125 /* writes on behalf of the partner, or resync writes,
126  * "submitted" by the receiver.
127  */
128 void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
129 {
130 	unsigned long flags = 0;
131 	struct drbd_epoch_entry *e = NULL;
132 	struct drbd_conf *mdev;
133 	sector_t e_sector;
134 	int do_wake;
135 	int is_syncer_req;
136 	int do_al_complete_io;
137 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
138 	int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER);
139 
140 	e = bio->bi_private;
141 	mdev = e->mdev;
142 
143 	if (error)
144 		dev_warn(DEV, "write: error=%d s=%llus\n", error,
145 				(unsigned long long)e->sector);
146 	if (!error && !uptodate) {
147 		dev_warn(DEV, "write: setting error to -EIO s=%llus\n",
148 				(unsigned long long)e->sector);
149 		/* strange behavior of some lower level drivers...
150 		 * fail the request by clearing the uptodate flag,
151 		 * but do not return any error?! */
152 		error = -EIO;
153 	}
154 
155 	/* error == -ENOTSUPP would be a better test,
156 	 * alas it is not reliable */
157 	if (error && is_barrier && e->flags & EE_IS_BARRIER) {
158 		drbd_bump_write_ordering(mdev, WO_bdev_flush);
159 		spin_lock_irqsave(&mdev->req_lock, flags);
160 		list_del(&e->w.list);
161 		e->w.cb = w_e_reissue;
162 		/* put_ldev actually happens below, once we come here again. */
163 		__release(local);
164 		spin_unlock_irqrestore(&mdev->req_lock, flags);
165 		drbd_queue_work(&mdev->data.work, &e->w);
166 		return;
167 	}
168 
169 	D_ASSERT(e->block_id != ID_VACANT);
170 
171 	spin_lock_irqsave(&mdev->req_lock, flags);
172 	mdev->writ_cnt += e->size >> 9;
173 	is_syncer_req = is_syncer_block_id(e->block_id);
174 
175 	/* after we moved e to done_ee,
176 	 * we may no longer access it,
177 	 * it may be freed/reused already!
178 	 * (as soon as we release the req_lock) */
179 	e_sector = e->sector;
180 	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
181 
182 	list_del(&e->w.list); /* has been on active_ee or sync_ee */
183 	list_add_tail(&e->w.list, &mdev->done_ee);
184 
185 	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
186 	 * neither did we wake possibly waiting conflicting requests.
187 	 * done from "drbd_process_done_ee" within the appropriate w.cb
188 	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
189 
190 	do_wake = is_syncer_req
191 		? list_empty(&mdev->sync_ee)
192 		: list_empty(&mdev->active_ee);
193 
194 	if (error)
195 		__drbd_chk_io_error(mdev, FALSE);
196 	spin_unlock_irqrestore(&mdev->req_lock, flags);
197 
198 	if (is_syncer_req)
199 		drbd_rs_complete_io(mdev, e_sector);
200 
201 	if (do_wake)
202 		wake_up(&mdev->ee_wait);
203 
204 	if (do_al_complete_io)
205 		drbd_al_complete_io(mdev, e_sector);
206 
207 	wake_asender(mdev);
208 	put_ldev(mdev);
209 
210 }
211 
212 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
213  */
214 void drbd_endio_pri(struct bio *bio, int error)
215 {
216 	unsigned long flags;
217 	struct drbd_request *req = bio->bi_private;
218 	struct drbd_conf *mdev = req->mdev;
219 	struct bio_and_error m;
220 	enum drbd_req_event what;
221 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
222 
223 	if (error)
224 		dev_warn(DEV, "p %s: error=%d\n",
225 			 bio_data_dir(bio) == WRITE ? "write" : "read", error);
226 	if (!error && !uptodate) {
227 		dev_warn(DEV, "p %s: setting error to -EIO\n",
228 			 bio_data_dir(bio) == WRITE ? "write" : "read");
229 		/* strange behavior of some lower level drivers...
230 		 * fail the request by clearing the uptodate flag,
231 		 * but do not return any error?! */
232 		error = -EIO;
233 	}
234 
235 	/* to avoid recursion in __req_mod */
236 	if (unlikely(error)) {
237 		what = (bio_data_dir(bio) == WRITE)
238 			? write_completed_with_error
239 			: (bio_rw(bio) == READA)
240 			  ? read_completed_with_error
241 			  : read_ahead_completed_with_error;
242 	} else
243 		what = completed_ok;
244 
245 	bio_put(req->private_bio);
246 	req->private_bio = ERR_PTR(error);
247 
248 	spin_lock_irqsave(&mdev->req_lock, flags);
249 	__req_mod(req, what, &m);
250 	spin_unlock_irqrestore(&mdev->req_lock, flags);
251 
252 	if (m.bio)
253 		complete_master_bio(mdev, &m);
254 }
255 
256 int w_io_error(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
257 {
258 	struct drbd_request *req = container_of(w, struct drbd_request, w);
259 
260 	/* NOTE: mdev->ldev can be NULL by the time we get here! */
261 	/* D_ASSERT(mdev->ldev->dc.on_io_error != EP_PASS_ON); */
262 
263 	/* the only way this callback is scheduled is from _req_may_be_done,
264 	 * when it is done and had a local write error, see comments there */
265 	drbd_req_free(req);
266 
267 	return TRUE;
268 }
269 
270 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
271 {
272 	struct drbd_request *req = container_of(w, struct drbd_request, w);
273 
274 	/* We should not detach for read io-error,
275 	 * but try to WRITE the P_DATA_REPLY to the failed location,
276 	 * to give the disk the chance to relocate that block */
277 
278 	spin_lock_irq(&mdev->req_lock);
279 	if (cancel ||
280 	    mdev->state.conn < C_CONNECTED ||
281 	    mdev->state.pdsk <= D_INCONSISTENT) {
282 		_req_mod(req, send_canceled);
283 		spin_unlock_irq(&mdev->req_lock);
284 		dev_alert(DEV, "WE ARE LOST. Local IO failure, no peer.\n");
285 		return 1;
286 	}
287 	spin_unlock_irq(&mdev->req_lock);
288 
289 	return w_send_read_req(mdev, w, 0);
290 }
291 
292 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
293 {
294 	ERR_IF(cancel) return 1;
295 	dev_err(DEV, "resync inactive, but callback triggered??\n");
296 	return 1; /* Simply ignore this! */
297 }
298 
299 void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
300 {
301 	struct hash_desc desc;
302 	struct scatterlist sg;
303 	struct bio_vec *bvec;
304 	int i;
305 
306 	desc.tfm = tfm;
307 	desc.flags = 0;
308 
309 	sg_init_table(&sg, 1);
310 	crypto_hash_init(&desc);
311 
312 	__bio_for_each_segment(bvec, bio, i, 0) {
313 		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
314 		crypto_hash_update(&desc, &sg, sg.length);
315 	}
316 	crypto_hash_final(&desc, digest);
317 }
318 
319 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
320 {
321 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
322 	int digest_size;
323 	void *digest;
324 	int ok;
325 
326 	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
327 
328 	if (unlikely(cancel)) {
329 		drbd_free_ee(mdev, e);
330 		return 1;
331 	}
332 
333 	if (likely(drbd_bio_uptodate(e->private_bio))) {
334 		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
335 		digest = kmalloc(digest_size, GFP_NOIO);
336 		if (digest) {
337 			drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
338 
339 			inc_rs_pending(mdev);
340 			ok = drbd_send_drequest_csum(mdev,
341 						     e->sector,
342 						     e->size,
343 						     digest,
344 						     digest_size,
345 						     P_CSUM_RS_REQUEST);
346 			kfree(digest);
347 		} else {
348 			dev_err(DEV, "kmalloc() of digest failed.\n");
349 			ok = 0;
350 		}
351 	} else
352 		ok = 1;
353 
354 	drbd_free_ee(mdev, e);
355 
356 	if (unlikely(!ok))
357 		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
358 	return ok;
359 }
360 
361 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
362 
363 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
364 {
365 	struct drbd_epoch_entry *e;
366 
367 	if (!get_ldev(mdev))
368 		return 0;
369 
370 	/* GFP_TRY, because if there is no memory available right now, this may
371 	 * be rescheduled for later. It is "only" background resync, after all. */
372 	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
373 	if (!e) {
374 		put_ldev(mdev);
375 		return 2;
376 	}
377 
378 	spin_lock_irq(&mdev->req_lock);
379 	list_add(&e->w.list, &mdev->read_ee);
380 	spin_unlock_irq(&mdev->req_lock);
381 
382 	e->private_bio->bi_end_io = drbd_endio_read_sec;
383 	e->private_bio->bi_rw = READ;
384 	e->w.cb = w_e_send_csum;
385 
386 	mdev->read_cnt += size >> 9;
387 	drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio);
388 
389 	return 1;
390 }
391 
392 void resync_timer_fn(unsigned long data)
393 {
394 	unsigned long flags;
395 	struct drbd_conf *mdev = (struct drbd_conf *) data;
396 	int queue;
397 
398 	spin_lock_irqsave(&mdev->req_lock, flags);
399 
400 	if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
401 		queue = 1;
402 		if (mdev->state.conn == C_VERIFY_S)
403 			mdev->resync_work.cb = w_make_ov_request;
404 		else
405 			mdev->resync_work.cb = w_make_resync_request;
406 	} else {
407 		queue = 0;
408 		mdev->resync_work.cb = w_resync_inactive;
409 	}
410 
411 	spin_unlock_irqrestore(&mdev->req_lock, flags);
412 
413 	/* harmless race: list_empty outside data.work.q_lock */
414 	if (list_empty(&mdev->resync_work.list) && queue)
415 		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
416 }
417 
418 int w_make_resync_request(struct drbd_conf *mdev,
419 		struct drbd_work *w, int cancel)
420 {
421 	unsigned long bit;
422 	sector_t sector;
423 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
424 	int max_segment_size = queue_max_segment_size(mdev->rq_queue);
425 	int number, i, size, pe, mx;
426 	int align, queued, sndbuf;
427 
428 	if (unlikely(cancel))
429 		return 1;
430 
431 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
432 		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
433 		return 0;
434 	}
435 
436 	if (mdev->state.conn != C_SYNC_TARGET)
437 		dev_err(DEV, "%s in w_make_resync_request\n",
438 			drbd_conn_str(mdev->state.conn));
439 
440 	if (!get_ldev(mdev)) {
441 		/* Since we only need to access mdev->rsync a
442 		   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
443 		   to continue resync with a broken disk makes no sense at
444 		   all */
445 		dev_err(DEV, "Disk broke down during resync!\n");
446 		mdev->resync_work.cb = w_resync_inactive;
447 		return 1;
448 	}
449 
450 	number = SLEEP_TIME * mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
451 	pe = atomic_read(&mdev->rs_pending_cnt);
452 
453 	mutex_lock(&mdev->data.mutex);
454 	if (mdev->data.socket)
455 		mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
456 	else
457 		mx = 1;
458 	mutex_unlock(&mdev->data.mutex);
459 
460 	/* For resync rates >160MB/sec, allow more pending RS requests */
461 	if (number > mx)
462 		mx = number;
463 
464 	/* Limit the number of pending RS requests to no more than the peer's receive buffer */
465 	if ((pe + number) > mx) {
466 		number = mx - pe;
467 	}
468 
469 	for (i = 0; i < number; i++) {
470 		/* Stop generating RS requests, when half of the send buffer is filled */
471 		mutex_lock(&mdev->data.mutex);
472 		if (mdev->data.socket) {
473 			queued = mdev->data.socket->sk->sk_wmem_queued;
474 			sndbuf = mdev->data.socket->sk->sk_sndbuf;
475 		} else {
476 			queued = 1;
477 			sndbuf = 0;
478 		}
479 		mutex_unlock(&mdev->data.mutex);
480 		if (queued > sndbuf / 2)
481 			goto requeue;
482 
483 next_sector:
484 		size = BM_BLOCK_SIZE;
485 		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
486 
487 		if (bit == -1UL) {
488 			mdev->bm_resync_fo = drbd_bm_bits(mdev);
489 			mdev->resync_work.cb = w_resync_inactive;
490 			put_ldev(mdev);
491 			return 1;
492 		}
493 
494 		sector = BM_BIT_TO_SECT(bit);
495 
496 		if (drbd_try_rs_begin_io(mdev, sector)) {
497 			mdev->bm_resync_fo = bit;
498 			goto requeue;
499 		}
500 		mdev->bm_resync_fo = bit + 1;
501 
502 		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
503 			drbd_rs_complete_io(mdev, sector);
504 			goto next_sector;
505 		}
506 
507 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
508 		/* try to find some adjacent bits.
509 		 * we stop if we have already the maximum req size.
510 		 *
511 		 * Additionally always align bigger requests, in order to
512 		 * be prepared for all stripe sizes of software RAIDs.
513 		 *
514 		 * we _do_ care about the agreed-upon q->max_segment_size
515 		 * here, as splitting up the requests on the other side is more
516 		 * difficult.  the consequence is, that on lvm and md and other
517 		 * "indirect" devices, this is dead code, since
518 		 * q->max_segment_size will be PAGE_SIZE.
519 		 */
520 		align = 1;
521 		for (;;) {
522 			if (size + BM_BLOCK_SIZE > max_segment_size)
523 				break;
524 
525 			/* Be always aligned */
526 			if (sector & ((1<<(align+3))-1))
527 				break;
528 
529 			/* do not cross extent boundaries */
530 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
531 				break;
532 			/* now, is it actually dirty, after all?
533 			 * caution, drbd_bm_test_bit is tri-state for some
534 			 * obscure reason; ( b == 0 ) would get the out-of-band
535 			 * only accidentally right because of the "oddly sized"
536 			 * adjustment below */
537 			if (drbd_bm_test_bit(mdev, bit+1) != 1)
538 				break;
539 			bit++;
540 			size += BM_BLOCK_SIZE;
541 			if ((BM_BLOCK_SIZE << align) <= size)
542 				align++;
543 			i++;
544 		}
545 		/* if we merged some,
546 		 * reset the offset to start the next drbd_bm_find_next from */
547 		if (size > BM_BLOCK_SIZE)
548 			mdev->bm_resync_fo = bit + 1;
549 #endif
550 
551 		/* adjust very last sectors, in case we are oddly sized */
552 		if (sector + (size>>9) > capacity)
553 			size = (capacity-sector)<<9;
554 		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
555 			switch (read_for_csum(mdev, sector, size)) {
556 			case 0: /* Disk failure*/
557 				put_ldev(mdev);
558 				return 0;
559 			case 2: /* Allocation failed */
560 				drbd_rs_complete_io(mdev, sector);
561 				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
562 				goto requeue;
563 			/* case 1: everything ok */
564 			}
565 		} else {
566 			inc_rs_pending(mdev);
567 			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
568 					       sector, size, ID_SYNCER)) {
569 				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
570 				dec_rs_pending(mdev);
571 				put_ldev(mdev);
572 				return 0;
573 			}
574 		}
575 	}
576 
577 	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
578 		/* last syncer _request_ was sent,
579 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
580 		 * next sync group will resume), as soon as we receive the last
581 		 * resync data block, and the last bit is cleared.
582 		 * until then resync "work" is "inactive" ...
583 		 */
584 		mdev->resync_work.cb = w_resync_inactive;
585 		put_ldev(mdev);
586 		return 1;
587 	}
588 
589  requeue:
590 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
591 	put_ldev(mdev);
592 	return 1;
593 }
594 
595 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
596 {
597 	int number, i, size;
598 	sector_t sector;
599 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
600 
601 	if (unlikely(cancel))
602 		return 1;
603 
604 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
605 		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
606 		return 0;
607 	}
608 
609 	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
610 	if (atomic_read(&mdev->rs_pending_cnt) > number)
611 		goto requeue;
612 
613 	number -= atomic_read(&mdev->rs_pending_cnt);
614 
615 	sector = mdev->ov_position;
616 	for (i = 0; i < number; i++) {
617 		if (sector >= capacity) {
618 			mdev->resync_work.cb = w_resync_inactive;
619 			return 1;
620 		}
621 
622 		size = BM_BLOCK_SIZE;
623 
624 		if (drbd_try_rs_begin_io(mdev, sector)) {
625 			mdev->ov_position = sector;
626 			goto requeue;
627 		}
628 
629 		if (sector + (size>>9) > capacity)
630 			size = (capacity-sector)<<9;
631 
632 		inc_rs_pending(mdev);
633 		if (!drbd_send_ov_request(mdev, sector, size)) {
634 			dec_rs_pending(mdev);
635 			return 0;
636 		}
637 		sector += BM_SECT_PER_BIT;
638 	}
639 	mdev->ov_position = sector;
640 
641  requeue:
642 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
643 	return 1;
644 }
645 
646 
/*
 * Worker callback run when online verify is done: frees the work item
 * itself, prints the out-of-sync summary via ov_oos_print(), and
 * finishes the resync.  Always returns 1.
 */
int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	/* the work item is owned by this callback; release it first */
	kfree(w);

	ov_oos_print(mdev);
	drbd_resync_finished(mdev);

	return 1;
}
655 
/*
 * Worker callback queued by drbd_resync_finished() to retry itself:
 * frees the work item and calls drbd_resync_finished() again.
 * Always returns 1.
 */
static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	/* the work item is owned by this callback; release it first */
	kfree(w);
	drbd_resync_finished(mdev);

	return 1;
}
664 
665 int drbd_resync_finished(struct drbd_conf *mdev)
666 {
667 	unsigned long db, dt, dbdt;
668 	unsigned long n_oos;
669 	union drbd_state os, ns;
670 	struct drbd_work *w;
671 	char *khelper_cmd = NULL;
672 
673 	/* Remove all elements from the resync LRU. Since future actions
674 	 * might set bits in the (main) bitmap, then the entries in the
675 	 * resync LRU would be wrong. */
676 	if (drbd_rs_del_all(mdev)) {
677 		/* In case this is not possible now, most probably because
678 		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
679 		 * queue (or even the read operations for those packets
680 		 * is not finished by now).   Retry in 100ms. */
681 
682 		drbd_kick_lo(mdev);
683 		__set_current_state(TASK_INTERRUPTIBLE);
684 		schedule_timeout(HZ / 10);
685 		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
686 		if (w) {
687 			w->cb = w_resync_finished;
688 			drbd_queue_work(&mdev->data.work, w);
689 			return 1;
690 		}
691 		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
692 	}
693 
694 	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
695 	if (dt <= 0)
696 		dt = 1;
697 	db = mdev->rs_total;
698 	dbdt = Bit2KB(db/dt);
699 	mdev->rs_paused /= HZ;
700 
701 	if (!get_ldev(mdev))
702 		goto out;
703 
704 	spin_lock_irq(&mdev->req_lock);
705 	os = mdev->state;
706 
707 	/* This protects us against multiple calls (that can happen in the presence
708 	   of application IO), and against connectivity loss just before we arrive here. */
709 	if (os.conn <= C_CONNECTED)
710 		goto out_unlock;
711 
712 	ns = os;
713 	ns.conn = C_CONNECTED;
714 
715 	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
716 	     (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
717 	     "Online verify " : "Resync",
718 	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
719 
720 	n_oos = drbd_bm_total_weight(mdev);
721 
722 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
723 		if (n_oos) {
724 			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
725 			      n_oos, Bit2KB(1));
726 			khelper_cmd = "out-of-sync";
727 		}
728 	} else {
729 		D_ASSERT((n_oos - mdev->rs_failed) == 0);
730 
731 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
732 			khelper_cmd = "after-resync-target";
733 
734 		if (mdev->csums_tfm && mdev->rs_total) {
735 			const unsigned long s = mdev->rs_same_csum;
736 			const unsigned long t = mdev->rs_total;
737 			const int ratio =
738 				(t == 0)     ? 0 :
739 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
740 			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
741 			     "transferred %luK total %luK\n",
742 			     ratio,
743 			     Bit2KB(mdev->rs_same_csum),
744 			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
745 			     Bit2KB(mdev->rs_total));
746 		}
747 	}
748 
749 	if (mdev->rs_failed) {
750 		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
751 
752 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
753 			ns.disk = D_INCONSISTENT;
754 			ns.pdsk = D_UP_TO_DATE;
755 		} else {
756 			ns.disk = D_UP_TO_DATE;
757 			ns.pdsk = D_INCONSISTENT;
758 		}
759 	} else {
760 		ns.disk = D_UP_TO_DATE;
761 		ns.pdsk = D_UP_TO_DATE;
762 
763 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
764 			if (mdev->p_uuid) {
765 				int i;
766 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
767 					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
768 				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
769 				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
770 			} else {
771 				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
772 			}
773 		}
774 
775 		drbd_uuid_set_bm(mdev, 0UL);
776 
777 		if (mdev->p_uuid) {
778 			/* Now the two UUID sets are equal, update what we
779 			 * know of the peer. */
780 			int i;
781 			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
782 				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
783 		}
784 	}
785 
786 	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
787 out_unlock:
788 	spin_unlock_irq(&mdev->req_lock);
789 	put_ldev(mdev);
790 out:
791 	mdev->rs_total  = 0;
792 	mdev->rs_failed = 0;
793 	mdev->rs_paused = 0;
794 	mdev->ov_start_sector = 0;
795 
796 	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
797 		dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
798 		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
799 	}
800 
801 	if (khelper_cmd)
802 		drbd_khelper(mdev, khelper_cmd);
803 
804 	return 1;
805 }
806 
807 /* helper */
808 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
809 {
810 	if (drbd_bio_has_active_page(e->private_bio)) {
811 		/* This might happen if sendpage() has not finished */
812 		spin_lock_irq(&mdev->req_lock);
813 		list_add_tail(&e->w.list, &mdev->net_ee);
814 		spin_unlock_irq(&mdev->req_lock);
815 	} else
816 		drbd_free_ee(mdev, e);
817 }
818 
819 /**
820  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
821  * @mdev:	DRBD device.
822  * @w:		work object.
823  * @cancel:	The connection will be closed anyways
824  */
825 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
826 {
827 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
828 	int ok;
829 
830 	if (unlikely(cancel)) {
831 		drbd_free_ee(mdev, e);
832 		dec_unacked(mdev);
833 		return 1;
834 	}
835 
836 	if (likely(drbd_bio_uptodate(e->private_bio))) {
837 		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
838 	} else {
839 		if (__ratelimit(&drbd_ratelimit_state))
840 			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
841 			    (unsigned long long)e->sector);
842 
843 		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
844 	}
845 
846 	dec_unacked(mdev);
847 
848 	move_to_net_ee_or_free(mdev, e);
849 
850 	if (unlikely(!ok))
851 		dev_err(DEV, "drbd_send_block() failed\n");
852 	return ok;
853 }
854 
855 /**
856  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
857  * @mdev:	DRBD device.
858  * @w:		work object.
859  * @cancel:	The connection will be closed anyways
860  */
861 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
862 {
863 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
864 	int ok;
865 
866 	if (unlikely(cancel)) {
867 		drbd_free_ee(mdev, e);
868 		dec_unacked(mdev);
869 		return 1;
870 	}
871 
872 	if (get_ldev_if_state(mdev, D_FAILED)) {
873 		drbd_rs_complete_io(mdev, e->sector);
874 		put_ldev(mdev);
875 	}
876 
877 	if (likely(drbd_bio_uptodate(e->private_bio))) {
878 		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
879 			inc_rs_pending(mdev);
880 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
881 		} else {
882 			if (__ratelimit(&drbd_ratelimit_state))
883 				dev_err(DEV, "Not sending RSDataReply, "
884 				    "partner DISKLESS!\n");
885 			ok = 1;
886 		}
887 	} else {
888 		if (__ratelimit(&drbd_ratelimit_state))
889 			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
890 			    (unsigned long long)e->sector);
891 
892 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
893 
894 		/* update resync data with failure */
895 		drbd_rs_failed_io(mdev, e->sector, e->size);
896 	}
897 
898 	dec_unacked(mdev);
899 
900 	move_to_net_ee_or_free(mdev, e);
901 
902 	if (unlikely(!ok))
903 		dev_err(DEV, "drbd_send_block() failed\n");
904 	return ok;
905 }
906 
907 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
908 {
909 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
910 	struct digest_info *di;
911 	int digest_size;
912 	void *digest = NULL;
913 	int ok, eq = 0;
914 
915 	if (unlikely(cancel)) {
916 		drbd_free_ee(mdev, e);
917 		dec_unacked(mdev);
918 		return 1;
919 	}
920 
921 	drbd_rs_complete_io(mdev, e->sector);
922 
923 	di = (struct digest_info *)(unsigned long)e->block_id;
924 
925 	if (likely(drbd_bio_uptodate(e->private_bio))) {
926 		/* quick hack to try to avoid a race against reconfiguration.
927 		 * a real fix would be much more involved,
928 		 * introducing more locking mechanisms */
929 		if (mdev->csums_tfm) {
930 			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
931 			D_ASSERT(digest_size == di->digest_size);
932 			digest = kmalloc(digest_size, GFP_NOIO);
933 		}
934 		if (digest) {
935 			drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
936 			eq = !memcmp(digest, di->digest, digest_size);
937 			kfree(digest);
938 		}
939 
940 		if (eq) {
941 			drbd_set_in_sync(mdev, e->sector, e->size);
942 			mdev->rs_same_csum++;
943 			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
944 		} else {
945 			inc_rs_pending(mdev);
946 			e->block_id = ID_SYNCER;
947 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
948 		}
949 	} else {
950 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
951 		if (__ratelimit(&drbd_ratelimit_state))
952 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
953 	}
954 
955 	dec_unacked(mdev);
956 
957 	kfree(di);
958 
959 	move_to_net_ee_or_free(mdev, e);
960 
961 	if (unlikely(!ok))
962 		dev_err(DEV, "drbd_send_block/ack() failed\n");
963 	return ok;
964 }
965 
966 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
967 {
968 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
969 	int digest_size;
970 	void *digest;
971 	int ok = 1;
972 
973 	if (unlikely(cancel))
974 		goto out;
975 
976 	if (unlikely(!drbd_bio_uptodate(e->private_bio)))
977 		goto out;
978 
979 	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
980 	/* FIXME if this allocation fails, online verify will not terminate! */
981 	digest = kmalloc(digest_size, GFP_NOIO);
982 	if (digest) {
983 		drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
984 		inc_rs_pending(mdev);
985 		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
986 					     digest, digest_size, P_OV_REPLY);
987 		if (!ok)
988 			dec_rs_pending(mdev);
989 		kfree(digest);
990 	}
991 
992 out:
993 	drbd_free_ee(mdev, e);
994 
995 	dec_unacked(mdev);
996 
997 	return ok;
998 }
999 
1000 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1001 {
1002 	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1003 		mdev->ov_last_oos_size += size>>9;
1004 	} else {
1005 		mdev->ov_last_oos_start = sector;
1006 		mdev->ov_last_oos_size = size>>9;
1007 	}
1008 	drbd_set_out_of_sync(mdev, sector, size);
1009 	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1010 }
1011 
1012 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1013 {
1014 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1015 	struct digest_info *di;
1016 	int digest_size;
1017 	void *digest;
1018 	int ok, eq = 0;
1019 
1020 	if (unlikely(cancel)) {
1021 		drbd_free_ee(mdev, e);
1022 		dec_unacked(mdev);
1023 		return 1;
1024 	}
1025 
1026 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1027 	 * the resync lru has been cleaned up already */
1028 	drbd_rs_complete_io(mdev, e->sector);
1029 
1030 	di = (struct digest_info *)(unsigned long)e->block_id;
1031 
1032 	if (likely(drbd_bio_uptodate(e->private_bio))) {
1033 		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1034 		digest = kmalloc(digest_size, GFP_NOIO);
1035 		if (digest) {
1036 			drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
1037 
1038 			D_ASSERT(digest_size == di->digest_size);
1039 			eq = !memcmp(digest, di->digest, digest_size);
1040 			kfree(digest);
1041 		}
1042 	} else {
1043 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1044 		if (__ratelimit(&drbd_ratelimit_state))
1045 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1046 	}
1047 
1048 	dec_unacked(mdev);
1049 
1050 	kfree(di);
1051 
1052 	if (!eq)
1053 		drbd_ov_oos_found(mdev, e->sector, e->size);
1054 	else
1055 		ov_oos_print(mdev);
1056 
1057 	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1058 			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1059 
1060 	drbd_free_ee(mdev, e);
1061 
1062 	if (--mdev->ov_left == 0) {
1063 		ov_oos_print(mdev);
1064 		drbd_resync_finished(mdev);
1065 	}
1066 
1067 	return ok;
1068 }
1069 
1070 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1071 {
1072 	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1073 	complete(&b->done);
1074 	return 1;
1075 }
1076 
1077 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1078 {
1079 	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1080 	struct p_barrier *p = &mdev->data.sbuf.barrier;
1081 	int ok = 1;
1082 
1083 	/* really avoid racing with tl_clear.  w.cb may have been referenced
1084 	 * just before it was reassigned and re-queued, so double check that.
1085 	 * actually, this race was harmless, since we only try to send the
1086 	 * barrier packet here, and otherwise do nothing with the object.
1087 	 * but compare with the head of w_clear_epoch */
1088 	spin_lock_irq(&mdev->req_lock);
1089 	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1090 		cancel = 1;
1091 	spin_unlock_irq(&mdev->req_lock);
1092 	if (cancel)
1093 		return 1;
1094 
1095 	if (!drbd_get_data_sock(mdev))
1096 		return 0;
1097 	p->barrier = b->br_number;
1098 	/* inc_ap_pending was done where this was queued.
1099 	 * dec_ap_pending will be done in got_BarrierAck
1100 	 * or (on connection loss) in w_clear_epoch.  */
1101 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1102 				(struct p_header *)p, sizeof(*p), 0);
1103 	drbd_put_data_sock(mdev);
1104 
1105 	return ok;
1106 }
1107 
1108 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1109 {
1110 	if (cancel)
1111 		return 1;
1112 	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1113 }
1114 
1115 /**
1116  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1117  * @mdev:	DRBD device.
1118  * @w:		work object.
1119  * @cancel:	The connection will be closed anyways
1120  */
1121 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1122 {
1123 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1124 	int ok;
1125 
1126 	if (unlikely(cancel)) {
1127 		req_mod(req, send_canceled);
1128 		return 1;
1129 	}
1130 
1131 	ok = drbd_send_dblock(mdev, req);
1132 	req_mod(req, ok ? handed_over_to_network : send_failed);
1133 
1134 	return ok;
1135 }
1136 
1137 /**
1138  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1139  * @mdev:	DRBD device.
1140  * @w:		work object.
1141  * @cancel:	The connection will be closed anyways
1142  */
1143 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1144 {
1145 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1146 	int ok;
1147 
1148 	if (unlikely(cancel)) {
1149 		req_mod(req, send_canceled);
1150 		return 1;
1151 	}
1152 
1153 	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1154 				(unsigned long)req);
1155 
1156 	if (!ok) {
1157 		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1158 		 * so this is probably redundant */
1159 		if (mdev->state.conn >= C_CONNECTED)
1160 			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1161 	}
1162 	req_mod(req, ok ? handed_over_to_network : send_failed);
1163 
1164 	return ok;
1165 }
1166 
1167 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1168 {
1169 	struct drbd_conf *odev = mdev;
1170 
1171 	while (1) {
1172 		if (odev->sync_conf.after == -1)
1173 			return 1;
1174 		odev = minor_to_mdev(odev->sync_conf.after);
1175 		ERR_IF(!odev) return 1;
1176 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1177 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1178 		    odev->state.aftr_isp || odev->state.peer_isp ||
1179 		    odev->state.user_isp)
1180 			return 0;
1181 	}
1182 }
1183 
1184 /**
1185  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1186  * @mdev:	DRBD device.
1187  *
1188  * Called from process context only (admin command and after_state_ch).
1189  */
1190 static int _drbd_pause_after(struct drbd_conf *mdev)
1191 {
1192 	struct drbd_conf *odev;
1193 	int i, rv = 0;
1194 
1195 	for (i = 0; i < minor_count; i++) {
1196 		odev = minor_to_mdev(i);
1197 		if (!odev)
1198 			continue;
1199 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1200 			continue;
1201 		if (!_drbd_may_sync_now(odev))
1202 			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1203 			       != SS_NOTHING_TO_DO);
1204 	}
1205 
1206 	return rv;
1207 }
1208 
1209 /**
1210  * _drbd_resume_next() - Resume resync on all devices that may resync now
1211  * @mdev:	DRBD device.
1212  *
1213  * Called from process context only (admin command and worker).
1214  */
1215 static int _drbd_resume_next(struct drbd_conf *mdev)
1216 {
1217 	struct drbd_conf *odev;
1218 	int i, rv = 0;
1219 
1220 	for (i = 0; i < minor_count; i++) {
1221 		odev = minor_to_mdev(i);
1222 		if (!odev)
1223 			continue;
1224 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1225 			continue;
1226 		if (odev->state.aftr_isp) {
1227 			if (_drbd_may_sync_now(odev))
1228 				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1229 							CS_HARD, NULL)
1230 				       != SS_NOTHING_TO_DO) ;
1231 		}
1232 	}
1233 	return rv;
1234 }
1235 
1236 void resume_next_sg(struct drbd_conf *mdev)
1237 {
1238 	write_lock_irq(&global_state_lock);
1239 	_drbd_resume_next(mdev);
1240 	write_unlock_irq(&global_state_lock);
1241 }
1242 
1243 void suspend_other_sg(struct drbd_conf *mdev)
1244 {
1245 	write_lock_irq(&global_state_lock);
1246 	_drbd_pause_after(mdev);
1247 	write_unlock_irq(&global_state_lock);
1248 }
1249 
1250 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1251 {
1252 	struct drbd_conf *odev;
1253 
1254 	if (o_minor == -1)
1255 		return NO_ERROR;
1256 	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1257 		return ERR_SYNC_AFTER;
1258 
1259 	/* check for loops */
1260 	odev = minor_to_mdev(o_minor);
1261 	while (1) {
1262 		if (odev == mdev)
1263 			return ERR_SYNC_AFTER_CYCLE;
1264 
1265 		/* dependency chain ends here, no cycles. */
1266 		if (odev->sync_conf.after == -1)
1267 			return NO_ERROR;
1268 
1269 		/* follow the dependency chain */
1270 		odev = minor_to_mdev(odev->sync_conf.after);
1271 	}
1272 }
1273 
1274 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1275 {
1276 	int changes;
1277 	int retcode;
1278 
1279 	write_lock_irq(&global_state_lock);
1280 	retcode = sync_after_error(mdev, na);
1281 	if (retcode == NO_ERROR) {
1282 		mdev->sync_conf.after = na;
1283 		do {
1284 			changes  = _drbd_pause_after(mdev);
1285 			changes |= _drbd_resume_next(mdev);
1286 		} while (changes);
1287 	}
1288 	write_unlock_irq(&global_state_lock);
1289 	return retcode;
1290 }
1291 
1292 /**
1293  * drbd_start_resync() - Start the resync process
1294  * @mdev:	DRBD device.
1295  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1296  *
1297  * This function might bring you directly into one of the
1298  * C_PAUSED_SYNC_* states.
1299  */
1300 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1301 {
1302 	union drbd_state ns;
1303 	int r;
1304 
1305 	if (mdev->state.conn >= C_SYNC_SOURCE) {
1306 		dev_err(DEV, "Resync already running!\n");
1307 		return;
1308 	}
1309 
1310 	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1311 	drbd_rs_cancel_all(mdev);
1312 
1313 	if (side == C_SYNC_TARGET) {
1314 		/* Since application IO was locked out during C_WF_BITMAP_T and
1315 		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1316 		   we check that we might make the data inconsistent. */
1317 		r = drbd_khelper(mdev, "before-resync-target");
1318 		r = (r >> 8) & 0xff;
1319 		if (r > 0) {
1320 			dev_info(DEV, "before-resync-target handler returned %d, "
1321 			     "dropping connection.\n", r);
1322 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1323 			return;
1324 		}
1325 	}
1326 
1327 	drbd_state_lock(mdev);
1328 
1329 	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1330 		drbd_state_unlock(mdev);
1331 		return;
1332 	}
1333 
1334 	if (side == C_SYNC_TARGET) {
1335 		mdev->bm_resync_fo = 0;
1336 	} else /* side == C_SYNC_SOURCE */ {
1337 		u64 uuid;
1338 
1339 		get_random_bytes(&uuid, sizeof(u64));
1340 		drbd_uuid_set(mdev, UI_BITMAP, uuid);
1341 		drbd_send_sync_uuid(mdev, uuid);
1342 
1343 		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1344 	}
1345 
1346 	write_lock_irq(&global_state_lock);
1347 	ns = mdev->state;
1348 
1349 	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1350 
1351 	ns.conn = side;
1352 
1353 	if (side == C_SYNC_TARGET)
1354 		ns.disk = D_INCONSISTENT;
1355 	else /* side == C_SYNC_SOURCE */
1356 		ns.pdsk = D_INCONSISTENT;
1357 
1358 	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1359 	ns = mdev->state;
1360 
1361 	if (ns.conn < C_CONNECTED)
1362 		r = SS_UNKNOWN_ERROR;
1363 
1364 	if (r == SS_SUCCESS) {
1365 		mdev->rs_total     =
1366 		mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1367 		mdev->rs_failed    = 0;
1368 		mdev->rs_paused    = 0;
1369 		mdev->rs_start     =
1370 		mdev->rs_mark_time = jiffies;
1371 		mdev->rs_same_csum = 0;
1372 		_drbd_pause_after(mdev);
1373 	}
1374 	write_unlock_irq(&global_state_lock);
1375 	drbd_state_unlock(mdev);
1376 	put_ldev(mdev);
1377 
1378 	if (r == SS_SUCCESS) {
1379 		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1380 		     drbd_conn_str(ns.conn),
1381 		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1382 		     (unsigned long) mdev->rs_total);
1383 
1384 		if (mdev->rs_total == 0) {
1385 			/* Peer still reachable? Beware of failing before-resync-target handlers! */
1386 			request_ping(mdev);
1387 			__set_current_state(TASK_INTERRUPTIBLE);
1388 			schedule_timeout(mdev->net_conf->ping_timeo*HZ/9); /* 9 instead 10 */
1389 			drbd_resync_finished(mdev);
1390 			return;
1391 		}
1392 
1393 		/* ns.conn may already be != mdev->state.conn,
1394 		 * we may have been paused in between, or become paused until
1395 		 * the timer triggers.
1396 		 * No matter, that is handled in resync_timer_fn() */
1397 		if (ns.conn == C_SYNC_TARGET)
1398 			mod_timer(&mdev->resync_timer, jiffies);
1399 
1400 		drbd_md_sync(mdev);
1401 	}
1402 }
1403 
1404 int drbd_worker(struct drbd_thread *thi)
1405 {
1406 	struct drbd_conf *mdev = thi->mdev;
1407 	struct drbd_work *w = NULL;
1408 	LIST_HEAD(work_list);
1409 	int intr = 0, i;
1410 
1411 	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1412 
1413 	while (get_t_state(thi) == Running) {
1414 		drbd_thread_current_set_cpu(mdev);
1415 
1416 		if (down_trylock(&mdev->data.work.s)) {
1417 			mutex_lock(&mdev->data.mutex);
1418 			if (mdev->data.socket && !mdev->net_conf->no_cork)
1419 				drbd_tcp_uncork(mdev->data.socket);
1420 			mutex_unlock(&mdev->data.mutex);
1421 
1422 			intr = down_interruptible(&mdev->data.work.s);
1423 
1424 			mutex_lock(&mdev->data.mutex);
1425 			if (mdev->data.socket  && !mdev->net_conf->no_cork)
1426 				drbd_tcp_cork(mdev->data.socket);
1427 			mutex_unlock(&mdev->data.mutex);
1428 		}
1429 
1430 		if (intr) {
1431 			D_ASSERT(intr == -EINTR);
1432 			flush_signals(current);
1433 			ERR_IF (get_t_state(thi) == Running)
1434 				continue;
1435 			break;
1436 		}
1437 
1438 		if (get_t_state(thi) != Running)
1439 			break;
1440 		/* With this break, we have done a down() but not consumed
1441 		   the entry from the list. The cleanup code takes care of
1442 		   this...   */
1443 
1444 		w = NULL;
1445 		spin_lock_irq(&mdev->data.work.q_lock);
1446 		ERR_IF(list_empty(&mdev->data.work.q)) {
1447 			/* something terribly wrong in our logic.
1448 			 * we were able to down() the semaphore,
1449 			 * but the list is empty... doh.
1450 			 *
1451 			 * what is the best thing to do now?
1452 			 * try again from scratch, restarting the receiver,
1453 			 * asender, whatnot? could break even more ugly,
1454 			 * e.g. when we are primary, but no good local data.
1455 			 *
1456 			 * I'll try to get away just starting over this loop.
1457 			 */
1458 			spin_unlock_irq(&mdev->data.work.q_lock);
1459 			continue;
1460 		}
1461 		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1462 		list_del_init(&w->list);
1463 		spin_unlock_irq(&mdev->data.work.q_lock);
1464 
1465 		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1466 			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1467 			if (mdev->state.conn >= C_CONNECTED)
1468 				drbd_force_state(mdev,
1469 						NS(conn, C_NETWORK_FAILURE));
1470 		}
1471 	}
1472 	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1473 	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1474 
1475 	spin_lock_irq(&mdev->data.work.q_lock);
1476 	i = 0;
1477 	while (!list_empty(&mdev->data.work.q)) {
1478 		list_splice_init(&mdev->data.work.q, &work_list);
1479 		spin_unlock_irq(&mdev->data.work.q_lock);
1480 
1481 		while (!list_empty(&work_list)) {
1482 			w = list_entry(work_list.next, struct drbd_work, list);
1483 			list_del_init(&w->list);
1484 			w->cb(mdev, w, 1);
1485 			i++; /* dead debugging code */
1486 		}
1487 
1488 		spin_lock_irq(&mdev->data.work.q_lock);
1489 	}
1490 	sema_init(&mdev->data.work.s, 0);
1491 	/* DANGEROUS race: if someone did queue his work within the spinlock,
1492 	 * but up() ed outside the spinlock, we could get an up() on the
1493 	 * semaphore without corresponding list entry.
1494 	 * So don't do that.
1495 	 */
1496 	spin_unlock_irq(&mdev->data.work.q_lock);
1497 
1498 	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1499 	/* _drbd_set_state only uses stop_nowait.
1500 	 * wait here for the Exiting receiver. */
1501 	drbd_thread_stop(&mdev->receiver);
1502 	drbd_mdev_cleanup(mdev);
1503 
1504 	dev_info(DEV, "worker terminated\n");
1505 
1506 	clear_bit(DEVICE_DYING, &mdev->flags);
1507 	clear_bit(CONFIG_PENDING, &mdev->flags);
1508 	wake_up(&mdev->state_wait);
1509 
1510 	return 0;
1511 }
1512