xref: /openbmc/linux/drivers/block/drbd/drbd_worker.c (revision cdd67a74)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24  */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/smp_lock.h>
30 #include <linux/wait.h>
31 #include <linux/mm.h>
32 #include <linux/memcontrol.h>
33 #include <linux/mm_inline.h>
34 #include <linux/slab.h>
35 #include <linux/random.h>
36 #include <linux/string.h>
37 #include <linux/scatterlist.h>
38 
39 #include "drbd_int.h"
40 #include "drbd_req.h"
41 
42 #define SLEEP_TIME (HZ/10)
43 
44 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
45 
46 
47 
48 /* defined here:
49    drbd_md_io_complete
50    drbd_endio_write_sec
51    drbd_endio_read_sec
52    drbd_endio_pri
53 
54  * more endio handlers:
55    atodb_endio in drbd_actlog.c
56    drbd_bm_async_io_complete in drbd_bitmap.c
57 
58  * For all these callbacks, note the following:
59  * The callbacks will be called in irq context by the IDE drivers,
60  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
61  * Try to get the locking right :)
62  *
63  */
64 
65 
66 /* About the global_state_lock
67    Each state transition on a device holds a read lock. In case we have
68    to evaluate the sync after dependencies, we grab a write lock, because
69    we need stable states on all devices for that.  */
70 rwlock_t global_state_lock;
71 
72 /* used for synchronous meta data and bitmap IO
73  * submitted by drbd_md_sync_page_io()
74  */
75 void drbd_md_io_complete(struct bio *bio, int error)
76 {
77 	struct drbd_md_io *md_io;
78 
79 	md_io = (struct drbd_md_io *)bio->bi_private;
80 	md_io->error = error;
81 
82 	complete(&md_io->event);
83 }
84 
85 /* reads on behalf of the partner,
86  * "submitted" by the receiver
87  */
88 void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
89 {
90 	unsigned long flags = 0;
91 	struct drbd_epoch_entry *e = NULL;
92 	struct drbd_conf *mdev;
93 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
94 
95 	e = bio->bi_private;
96 	mdev = e->mdev;
97 
98 	if (error)
99 		dev_warn(DEV, "read: error=%d s=%llus\n", error,
100 				(unsigned long long)e->sector);
101 	if (!error && !uptodate) {
102 		dev_warn(DEV, "read: setting error to -EIO s=%llus\n",
103 				(unsigned long long)e->sector);
104 		/* strange behavior of some lower level drivers...
105 		 * fail the request by clearing the uptodate flag,
106 		 * but do not return any error?! */
107 		error = -EIO;
108 	}
109 
110 	D_ASSERT(e->block_id != ID_VACANT);
111 
112 	spin_lock_irqsave(&mdev->req_lock, flags);
113 	mdev->read_cnt += e->size >> 9;
114 	list_del(&e->w.list);
115 	if (list_empty(&mdev->read_ee))
116 		wake_up(&mdev->ee_wait);
117 	spin_unlock_irqrestore(&mdev->req_lock, flags);
118 
119 	drbd_chk_io_error(mdev, error, FALSE);
120 	drbd_queue_work(&mdev->data.work, &e->w);
121 	put_ldev(mdev);
122 }
123 
124 /* writes on behalf of the partner, or resync writes,
125  * "submitted" by the receiver.
126  */
127 void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
128 {
129 	unsigned long flags = 0;
130 	struct drbd_epoch_entry *e = NULL;
131 	struct drbd_conf *mdev;
132 	sector_t e_sector;
133 	int do_wake;
134 	int is_syncer_req;
135 	int do_al_complete_io;
136 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
137 	int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER);
138 
139 	e = bio->bi_private;
140 	mdev = e->mdev;
141 
142 	if (error)
143 		dev_warn(DEV, "write: error=%d s=%llus\n", error,
144 				(unsigned long long)e->sector);
145 	if (!error && !uptodate) {
146 		dev_warn(DEV, "write: setting error to -EIO s=%llus\n",
147 				(unsigned long long)e->sector);
148 		/* strange behavior of some lower level drivers...
149 		 * fail the request by clearing the uptodate flag,
150 		 * but do not return any error?! */
151 		error = -EIO;
152 	}
153 
154 	/* error == -ENOTSUPP would be a better test,
155 	 * alas it is not reliable */
156 	if (error && is_barrier && e->flags & EE_IS_BARRIER) {
157 		drbd_bump_write_ordering(mdev, WO_bdev_flush);
158 		spin_lock_irqsave(&mdev->req_lock, flags);
159 		list_del(&e->w.list);
160 		e->w.cb = w_e_reissue;
161 		/* put_ldev actually happens below, once we come here again. */
162 		__release(local);
163 		spin_unlock_irqrestore(&mdev->req_lock, flags);
164 		drbd_queue_work(&mdev->data.work, &e->w);
165 		return;
166 	}
167 
168 	D_ASSERT(e->block_id != ID_VACANT);
169 
170 	spin_lock_irqsave(&mdev->req_lock, flags);
171 	mdev->writ_cnt += e->size >> 9;
172 	is_syncer_req = is_syncer_block_id(e->block_id);
173 
174 	/* after we moved e to done_ee,
175 	 * we may no longer access it,
176 	 * it may be freed/reused already!
177 	 * (as soon as we release the req_lock) */
178 	e_sector = e->sector;
179 	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
180 
181 	list_del(&e->w.list); /* has been on active_ee or sync_ee */
182 	list_add_tail(&e->w.list, &mdev->done_ee);
183 
184 	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
185 	 * neither did we wake possibly waiting conflicting requests.
186 	 * done from "drbd_process_done_ee" within the appropriate w.cb
187 	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
188 
189 	do_wake = is_syncer_req
190 		? list_empty(&mdev->sync_ee)
191 		: list_empty(&mdev->active_ee);
192 
193 	if (error)
194 		__drbd_chk_io_error(mdev, FALSE);
195 	spin_unlock_irqrestore(&mdev->req_lock, flags);
196 
197 	if (is_syncer_req)
198 		drbd_rs_complete_io(mdev, e_sector);
199 
200 	if (do_wake)
201 		wake_up(&mdev->ee_wait);
202 
203 	if (do_al_complete_io)
204 		drbd_al_complete_io(mdev, e_sector);
205 
206 	wake_asender(mdev);
207 	put_ldev(mdev);
208 
209 }
210 
211 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
212  */
213 void drbd_endio_pri(struct bio *bio, int error)
214 {
215 	unsigned long flags;
216 	struct drbd_request *req = bio->bi_private;
217 	struct drbd_conf *mdev = req->mdev;
218 	struct bio_and_error m;
219 	enum drbd_req_event what;
220 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
221 
222 	if (error)
223 		dev_warn(DEV, "p %s: error=%d\n",
224 			 bio_data_dir(bio) == WRITE ? "write" : "read", error);
225 	if (!error && !uptodate) {
226 		dev_warn(DEV, "p %s: setting error to -EIO\n",
227 			 bio_data_dir(bio) == WRITE ? "write" : "read");
228 		/* strange behavior of some lower level drivers...
229 		 * fail the request by clearing the uptodate flag,
230 		 * but do not return any error?! */
231 		error = -EIO;
232 	}
233 
234 	/* to avoid recursion in __req_mod */
235 	if (unlikely(error)) {
236 		what = (bio_data_dir(bio) == WRITE)
237 			? write_completed_with_error
238 			: (bio_rw(bio) == READ)
239 			  ? read_completed_with_error
240 			  : read_ahead_completed_with_error;
241 	} else
242 		what = completed_ok;
243 
244 	bio_put(req->private_bio);
245 	req->private_bio = ERR_PTR(error);
246 
247 	spin_lock_irqsave(&mdev->req_lock, flags);
248 	__req_mod(req, what, &m);
249 	spin_unlock_irqrestore(&mdev->req_lock, flags);
250 
251 	if (m.bio)
252 		complete_master_bio(mdev, &m);
253 }
254 
255 int w_io_error(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
256 {
257 	struct drbd_request *req = container_of(w, struct drbd_request, w);
258 
259 	/* NOTE: mdev->ldev can be NULL by the time we get here! */
260 	/* D_ASSERT(mdev->ldev->dc.on_io_error != EP_PASS_ON); */
261 
262 	/* the only way this callback is scheduled is from _req_may_be_done,
263 	 * when it is done and had a local write error, see comments there */
264 	drbd_req_free(req);
265 
266 	return TRUE;
267 }
268 
269 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
270 {
271 	struct drbd_request *req = container_of(w, struct drbd_request, w);
272 
273 	/* We should not detach for read io-error,
274 	 * but try to WRITE the P_DATA_REPLY to the failed location,
275 	 * to give the disk the chance to relocate that block */
276 
277 	spin_lock_irq(&mdev->req_lock);
278 	if (cancel ||
279 	    mdev->state.conn < C_CONNECTED ||
280 	    mdev->state.pdsk <= D_INCONSISTENT) {
281 		_req_mod(req, send_canceled);
282 		spin_unlock_irq(&mdev->req_lock);
283 		dev_alert(DEV, "WE ARE LOST. Local IO failure, no peer.\n");
284 		return 1;
285 	}
286 	spin_unlock_irq(&mdev->req_lock);
287 
288 	return w_send_read_req(mdev, w, 0);
289 }
290 
291 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
292 {
293 	ERR_IF(cancel) return 1;
294 	dev_err(DEV, "resync inactive, but callback triggered??\n");
295 	return 1; /* Simply ignore this! */
296 }
297 
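/* Hash every segment of @bio with the hash transform @tfm and store the
 * result in @digest.  Used both for checksum based resync (csums_tfm)
 * and for online verify (verify_tfm). */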
298 void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
299 {
300 	struct hash_desc desc;
301 	struct scatterlist sg;
302 	struct bio_vec *bvec;
303 	int i;
304 
305 	desc.tfm = tfm;
306 	desc.flags = 0;
307 
308 	sg_init_table(&sg, 1);
309 	crypto_hash_init(&desc);
310 
311 	__bio_for_each_segment(bvec, bio, i, 0) {
312 		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
313 		crypto_hash_update(&desc, &sg, sg.length);
314 	}
315 	crypto_hash_final(&desc, digest);
316 }
317 
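/* Worker callback for the checksum based resync path: hash the block that
 * read_for_csum() submitted and ask the peer to compare, by sending the
 * digest in a P_CSUM_RS_REQUEST. */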
318 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
319 {
320 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
321 	int digest_size;
322 	void *digest;
323 	int ok;
324 
325 	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
326 
327 	if (unlikely(cancel)) {
328 		drbd_free_ee(mdev, e);
329 		return 1;
330 	}
331 
332 	if (likely(drbd_bio_uptodate(e->private_bio))) {
333 		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
334 		digest = kmalloc(digest_size, GFP_NOIO);
335 		if (digest) {
336 			drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
337 
338 			inc_rs_pending(mdev);
339 			ok = drbd_send_drequest_csum(mdev,
340 						     e->sector,
341 						     e->size,
342 						     digest,
343 						     digest_size,
344 						     P_CSUM_RS_REQUEST);
345 			kfree(digest);
346 		} else {
347 			dev_err(DEV, "kmalloc() of digest failed.\n");
348 			ok = 0;
349 		}
350 	} else
351 		ok = 1;
352 
353 	drbd_free_ee(mdev, e);
354 
355 	if (unlikely(!ok))
356 		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
357 	return ok;
358 }
359 
360 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
361 
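/* Submit a local read of the given area so that its checksum can be sent
 * to the peer (via drbd_endio_read_sec -> w_e_send_csum).
 * Returns 1 if the read was submitted, 2 if no epoch entry could be
 * allocated right now (the caller requeues), 0 if the local disk is gone. */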
362 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
363 {
364 	struct drbd_epoch_entry *e;
365 
366 	if (!get_ldev(mdev))
367 		return 0;
368 
369 	/* GFP_TRY, because if there is no memory available right now, this may
370 	 * be rescheduled for later. It is "only" background resync, after all. */
371 	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
372 	if (!e) {
373 		put_ldev(mdev);
374 		return 2;
375 	}
376 
377 	spin_lock_irq(&mdev->req_lock);
378 	list_add(&e->w.list, &mdev->read_ee);
379 	spin_unlock_irq(&mdev->req_lock);
380 
381 	e->private_bio->bi_end_io = drbd_endio_read_sec;
382 	e->private_bio->bi_rw = READ;
383 	e->w.cb = w_e_send_csum;
384 
385 	mdev->read_cnt += size >> 9;
386 	drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio);
387 
388 	return 1;
389 }
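/* Resync pacing timer.  Unless STOP_SYNC_TIMER is set, pick the work
 * callback for the current mode (w_make_ov_request while in C_VERIFY_S,
 * w_make_resync_request otherwise) and queue mdev->resync_work on the
 * worker, unless it is queued already. */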
390 
391 void resync_timer_fn(unsigned long data)
392 {
393 	unsigned long flags;
394 	struct drbd_conf *mdev = (struct drbd_conf *) data;
395 	int queue;
396 
397 	spin_lock_irqsave(&mdev->req_lock, flags);
398 
399 	if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
400 		queue = 1;
401 		if (mdev->state.conn == C_VERIFY_S)
402 			mdev->resync_work.cb = w_make_ov_request;
403 		else
404 			mdev->resync_work.cb = w_make_resync_request;
405 	} else {
406 		queue = 0;
407 		mdev->resync_work.cb = w_resync_inactive;
408 	}
409 
410 	spin_unlock_irqrestore(&mdev->req_lock, flags);
411 
412 	/* harmless race: list_empty outside data.work.q_lock */
413 	if (list_empty(&mdev->resync_work.list) && queue)
414 		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
415 }
416 
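/* Derive the current sync rate from the configured rate and the observed
 * data_delay: full sync_conf.rate while the delay stays at or below the
 * throttle threshold, zero at or above the hold-off threshold, and a linear
 * interpolation in between (halfway between the thresholds the rate has
 * dropped to half the configured value). */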
417 static int calc_resync_rate(struct drbd_conf *mdev)
418 {
419 	int d = mdev->data_delay / 1000; /* us -> ms */
420 	int td = mdev->sync_conf.throttle_th * 100;  /* 0.1s -> ms */
421 	int hd = mdev->sync_conf.hold_off_th * 100;  /* 0.1s -> ms */
422 	int cr = mdev->sync_conf.rate;
423 
424 	return d <= td ? cr :
425 		d >= hd ? 0 :
426 		cr + (cr * (td - d) / (hd - td));
427 }
428 
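/* Worker callback that drives a resync as C_SYNC_TARGET.  Each SLEEP_TIME
 * tick it computes how many BM_BLOCK_SIZE requests the current sync rate
 * allows, caps that by the peer's receive buffer and by the fill level of
 * our send buffer, then walks the bitmap and either sends P_RS_DATA_REQUEST
 * directly or, with protocol >= 89 and csums_tfm, reads the block locally
 * first so that only a checksum request needs to go over the wire. */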
429 int w_make_resync_request(struct drbd_conf *mdev,
430 		struct drbd_work *w, int cancel)
431 {
432 	unsigned long bit;
433 	sector_t sector;
434 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
435 	int max_segment_size = queue_max_segment_size(mdev->rq_queue);
436 	int number, i, size, pe, mx;
437 	int align, queued, sndbuf;
438 
439 	if (unlikely(cancel))
440 		return 1;
441 
442 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
443 		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
444 		return 0;
445 	}
446 
447 	if (mdev->state.conn != C_SYNC_TARGET)
448 		dev_err(DEV, "%s in w_make_resync_request\n",
449 			drbd_conn_str(mdev->state.conn));
450 
451 	if (!get_ldev(mdev)) {
452 		/* Since we only need to access mdev->resync, a
453 		   get_ldev_if_state(mdev, D_FAILED) would be sufficient; but
454 		   continuing a resync with a broken disk makes no sense at
455 		   all. */
456 		dev_err(DEV, "Disk broke down during resync!\n");
457 		mdev->resync_work.cb = w_resync_inactive;
458 		return 1;
459 	}
460 
461 	mdev->c_sync_rate = calc_resync_rate(mdev);
462 	number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
463 	pe = atomic_read(&mdev->rs_pending_cnt);
464 
465 	mutex_lock(&mdev->data.mutex);
466 	if (mdev->data.socket)
467 		mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
468 	else
469 		mx = 1;
470 	mutex_unlock(&mdev->data.mutex);
471 
472 	/* For resync rates >160MB/sec, allow more pending RS requests */
473 	if (number > mx)
474 		mx = number;
475 
476 	/* Limit the number of pending RS requests to no more than the peer's receive buffer */
477 	if ((pe + number) > mx) {
478 		number = mx - pe;
479 	}
480 
481 	for (i = 0; i < number; i++) {
482 		/* Stop generating RS requests when half of the send buffer is filled */
483 		mutex_lock(&mdev->data.mutex);
484 		if (mdev->data.socket) {
485 			queued = mdev->data.socket->sk->sk_wmem_queued;
486 			sndbuf = mdev->data.socket->sk->sk_sndbuf;
487 		} else {
488 			queued = 1;
489 			sndbuf = 0;
490 		}
491 		mutex_unlock(&mdev->data.mutex);
492 		if (queued > sndbuf / 2)
493 			goto requeue;
494 
495 next_sector:
496 		size = BM_BLOCK_SIZE;
497 		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
498 
499 		if (bit == -1UL) {
500 			mdev->bm_resync_fo = drbd_bm_bits(mdev);
501 			mdev->resync_work.cb = w_resync_inactive;
502 			put_ldev(mdev);
503 			return 1;
504 		}
505 
506 		sector = BM_BIT_TO_SECT(bit);
507 
508 		if (drbd_try_rs_begin_io(mdev, sector)) {
509 			mdev->bm_resync_fo = bit;
510 			goto requeue;
511 		}
512 		mdev->bm_resync_fo = bit + 1;
513 
514 		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
515 			drbd_rs_complete_io(mdev, sector);
516 			goto next_sector;
517 		}
518 
519 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
520 		/* try to find some adjacent bits.
521 		 * we stop if we have already the maximum req size.
522 		 *
523 		 * Additionally always align bigger requests, in order to
524 		 * be prepared for all stripe sizes of software RAIDs.
525 		 *
526 		 * we _do_ care about the agreed-upon q->max_segment_size
527 		 * here, as splitting up the requests on the other side is more
528 		 * difficult.  the consequence is, that on lvm and md and other
529 		 * "indirect" devices, this is dead code, since
530 		 * q->max_segment_size will be PAGE_SIZE.
531 		 */
532 		align = 1;
533 		for (;;) {
534 			if (size + BM_BLOCK_SIZE > max_segment_size)
535 				break;
536 
537 			/* Always be aligned */
538 			if (sector & ((1<<(align+3))-1))
539 				break;
540 
541 			/* do not cross extent boundaries */
542 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
543 				break;
544 			/* now, is it actually dirty, after all?
545 			 * caution, drbd_bm_test_bit is tri-state for some
546 			 * obscure reason; ( b == 0 ) would get the out-of-band
547 			 * only accidentally right because of the "oddly sized"
548 			 * adjustment below */
549 			if (drbd_bm_test_bit(mdev, bit+1) != 1)
550 				break;
551 			bit++;
552 			size += BM_BLOCK_SIZE;
553 			if ((BM_BLOCK_SIZE << align) <= size)
554 				align++;
555 			i++;
556 		}
557 		/* if we merged some,
558 		 * reset the offset to start the next drbd_bm_find_next from */
559 		if (size > BM_BLOCK_SIZE)
560 			mdev->bm_resync_fo = bit + 1;
561 #endif
562 
563 		/* adjust very last sectors, in case we are oddly sized */
564 		if (sector + (size>>9) > capacity)
565 			size = (capacity-sector)<<9;
566 		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
567 			switch (read_for_csum(mdev, sector, size)) {
568 			case 0: /* Disk failure*/
569 				put_ldev(mdev);
570 				return 0;
571 			case 2: /* Allocation failed */
572 				drbd_rs_complete_io(mdev, sector);
573 				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
574 				goto requeue;
575 			/* case 1: everything ok */
576 			}
577 		} else {
578 			inc_rs_pending(mdev);
579 			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
580 					       sector, size, ID_SYNCER)) {
581 				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
582 				dec_rs_pending(mdev);
583 				put_ldev(mdev);
584 				return 0;
585 			}
586 		}
587 	}
588 
589 	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
590 		/* last syncer _request_ was sent,
591 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
592 		 * next sync group will resume), as soon as we receive the last
593 		 * resync data block, and the last bit is cleared.
594 		 * until then resync "work" is "inactive" ...
595 		 */
596 		mdev->resync_work.cb = w_resync_inactive;
597 		put_ldev(mdev);
598 		return 1;
599 	}
600 
601  requeue:
602 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
603 	put_ldev(mdev);
604 	return 1;
605 }
606 
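/* Worker callback that drives an online verify as C_VERIFY_S: paced by
 * sync_conf.rate, it sends P_OV_REQUEST packets for consecutive
 * BM_BLOCK_SIZE areas starting at ov_position, until the capacity of the
 * device is reached. */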
607 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
608 {
609 	int number, i, size;
610 	sector_t sector;
611 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
612 
613 	if (unlikely(cancel))
614 		return 1;
615 
616 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
617 		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
618 		return 0;
619 	}
620 
621 	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
622 	if (atomic_read(&mdev->rs_pending_cnt) > number)
623 		goto requeue;
624 
625 	number -= atomic_read(&mdev->rs_pending_cnt);
626 
627 	sector = mdev->ov_position;
628 	for (i = 0; i < number; i++) {
629 		if (sector >= capacity) {
630 			mdev->resync_work.cb = w_resync_inactive;
631 			return 1;
632 		}
633 
634 		size = BM_BLOCK_SIZE;
635 
636 		if (drbd_try_rs_begin_io(mdev, sector)) {
637 			mdev->ov_position = sector;
638 			goto requeue;
639 		}
640 
641 		if (sector + (size>>9) > capacity)
642 			size = (capacity-sector)<<9;
643 
644 		inc_rs_pending(mdev);
645 		if (!drbd_send_ov_request(mdev, sector, size)) {
646 			dec_rs_pending(mdev);
647 			return 0;
648 		}
649 		sector += BM_SECT_PER_BIT;
650 	}
651 	mdev->ov_position = sector;
652 
653  requeue:
654 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
655 	return 1;
656 }
657 
658 
659 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
660 {
661 	kfree(w);
662 	ov_oos_print(mdev);
663 	drbd_resync_finished(mdev);
664 
665 	return 1;
666 }
667 
668 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
669 {
670 	kfree(w);
671 
672 	drbd_resync_finished(mdev);
673 
674 	return 1;
675 }
676 
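/* Called when a resync or online verify run has completed: flush the resync
 * LRU (retrying from the worker in 100ms if that is not possible yet), log
 * the achieved throughput, update disk/pdsk state and the UUIDs, and decide
 * whether the "out-of-sync" or "after-resync-target" helper has to run. */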
677 int drbd_resync_finished(struct drbd_conf *mdev)
678 {
679 	unsigned long db, dt, dbdt;
680 	unsigned long n_oos;
681 	union drbd_state os, ns;
682 	struct drbd_work *w;
683 	char *khelper_cmd = NULL;
684 
685 	/* Remove all elements from the resync LRU. Since future actions
686 	 * might set bits in the (main) bitmap, then the entries in the
687 	 * resync LRU would be wrong. */
688 	if (drbd_rs_del_all(mdev)) {
689 		/* In case this is not possible now, most probably because
690 		 * there are P_RS_DATA_REPLY packets lingering on the worker's
691 		 * queue (or even the read operations for those packets
692 		 * are not finished by now).   Retry in 100ms. */
693 
694 		drbd_kick_lo(mdev);
695 		__set_current_state(TASK_INTERRUPTIBLE);
696 		schedule_timeout(HZ / 10);
697 		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
698 		if (w) {
699 			w->cb = w_resync_finished;
700 			drbd_queue_work(&mdev->data.work, w);
701 			return 1;
702 		}
703 		dev_err(DEV, "Warning: failed to drbd_rs_del_all() and to kmalloc(w).\n");
704 	}
705 
706 	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
707 	if (dt <= 0)
708 		dt = 1;
709 	db = mdev->rs_total;
710 	dbdt = Bit2KB(db/dt);
711 	mdev->rs_paused /= HZ;
712 
713 	if (!get_ldev(mdev))
714 		goto out;
715 
716 	spin_lock_irq(&mdev->req_lock);
717 	os = mdev->state;
718 
719 	/* This protects us against multiple calls (that can happen in the presence
720 	   of application IO), and against connectivity loss just before we arrive here. */
721 	if (os.conn <= C_CONNECTED)
722 		goto out_unlock;
723 
724 	ns = os;
725 	ns.conn = C_CONNECTED;
726 
727 	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
728 	     (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
729 	     "Online verify " : "Resync",
730 	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
731 
732 	n_oos = drbd_bm_total_weight(mdev);
733 
734 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
735 		if (n_oos) {
736 			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
737 			      n_oos, Bit2KB(1));
738 			khelper_cmd = "out-of-sync";
739 		}
740 	} else {
741 		D_ASSERT((n_oos - mdev->rs_failed) == 0);
742 
743 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
744 			khelper_cmd = "after-resync-target";
745 
746 		if (mdev->csums_tfm && mdev->rs_total) {
747 			const unsigned long s = mdev->rs_same_csum;
748 			const unsigned long t = mdev->rs_total;
749 			const int ratio =
750 				(t == 0)     ? 0 :
751 				(t < 100000) ? ((s*100)/t) : (s/(t/100));
752 			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
753 			     "transferred %luK total %luK\n",
754 			     ratio,
755 			     Bit2KB(mdev->rs_same_csum),
756 			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
757 			     Bit2KB(mdev->rs_total));
758 		}
759 	}
760 
761 	if (mdev->rs_failed) {
762 		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
763 
764 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
765 			ns.disk = D_INCONSISTENT;
766 			ns.pdsk = D_UP_TO_DATE;
767 		} else {
768 			ns.disk = D_UP_TO_DATE;
769 			ns.pdsk = D_INCONSISTENT;
770 		}
771 	} else {
772 		ns.disk = D_UP_TO_DATE;
773 		ns.pdsk = D_UP_TO_DATE;
774 
775 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
776 			if (mdev->p_uuid) {
777 				int i;
778 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
779 					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
780 				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
781 				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
782 			} else {
783 				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
784 			}
785 		}
786 
787 		drbd_uuid_set_bm(mdev, 0UL);
788 
789 		if (mdev->p_uuid) {
790 			/* Now the two UUID sets are equal, update what we
791 			 * know of the peer. */
792 			int i;
793 			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
794 				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
795 		}
796 	}
797 
798 	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
799 out_unlock:
800 	spin_unlock_irq(&mdev->req_lock);
801 	put_ldev(mdev);
802 out:
803 	mdev->rs_total  = 0;
804 	mdev->rs_failed = 0;
805 	mdev->rs_paused = 0;
806 	mdev->ov_start_sector = 0;
807 
808 	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
809 		dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
810 		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
811 	}
812 
813 	if (khelper_cmd)
814 		drbd_khelper(mdev, khelper_cmd);
815 
816 	return 1;
817 }
818 
819 /* helper: keep the epoch entry on net_ee while sendpage() may still reference its pages, otherwise free it right away */
820 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
821 {
822 	if (drbd_bio_has_active_page(e->private_bio)) {
823 		/* This might happen if sendpage() has not finished */
824 		spin_lock_irq(&mdev->req_lock);
825 		list_add_tail(&e->w.list, &mdev->net_ee);
826 		spin_unlock_irq(&mdev->req_lock);
827 	} else
828 		drbd_free_ee(mdev, e);
829 }
830 
831 /**
832  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
833  * @mdev:	DRBD device.
834  * @w:		work object.
835  * @cancel:	The connection will be closed anyways
836  */
837 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
838 {
839 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
840 	int ok;
841 
842 	if (unlikely(cancel)) {
843 		drbd_free_ee(mdev, e);
844 		dec_unacked(mdev);
845 		return 1;
846 	}
847 
848 	if (likely(drbd_bio_uptodate(e->private_bio))) {
849 		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
850 	} else {
851 		if (__ratelimit(&drbd_ratelimit_state))
852 			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
853 			    (unsigned long long)e->sector);
854 
855 		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
856 	}
857 
858 	dec_unacked(mdev);
859 
860 	move_to_net_ee_or_free(mdev, e);
861 
862 	if (unlikely(!ok))
863 		dev_err(DEV, "drbd_send_block() failed\n");
864 	return ok;
865 }
866 
867 /**
868  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
869  * @mdev:	DRBD device.
870  * @w:		work object.
871  * @cancel:	The connection will be closed anyways
872  */
873 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
874 {
875 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
876 	int ok;
877 
878 	if (unlikely(cancel)) {
879 		drbd_free_ee(mdev, e);
880 		dec_unacked(mdev);
881 		return 1;
882 	}
883 
884 	if (get_ldev_if_state(mdev, D_FAILED)) {
885 		drbd_rs_complete_io(mdev, e->sector);
886 		put_ldev(mdev);
887 	}
888 
889 	if (likely(drbd_bio_uptodate(e->private_bio))) {
890 		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
891 			inc_rs_pending(mdev);
892 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
893 		} else {
894 			if (__ratelimit(&drbd_ratelimit_state))
895 				dev_err(DEV, "Not sending RSDataReply, "
896 				    "partner DISKLESS!\n");
897 			ok = 1;
898 		}
899 	} else {
900 		if (__ratelimit(&drbd_ratelimit_state))
901 			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
902 			    (unsigned long long)e->sector);
903 
904 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
905 
906 		/* update resync data with failure */
907 		drbd_rs_failed_io(mdev, e->sector, e->size);
908 	}
909 
910 	dec_unacked(mdev);
911 
912 	move_to_net_ee_or_free(mdev, e);
913 
914 	if (unlikely(!ok))
915 		dev_err(DEV, "drbd_send_block() failed\n");
916 	return ok;
917 }
918 
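/* Worker callback answering a checksum based resync request: recompute the
 * digest over the local block and compare it with the one the peer attached
 * (carried in e->block_id as a struct digest_info).  On a match acknowledge
 * with P_RS_IS_IN_SYNC and account the block in rs_same_csum, otherwise
 * transfer the full block as P_RS_DATA_REPLY. */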
919 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
920 {
921 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
922 	struct digest_info *di;
923 	int digest_size;
924 	void *digest = NULL;
925 	int ok, eq = 0;
926 
927 	if (unlikely(cancel)) {
928 		drbd_free_ee(mdev, e);
929 		dec_unacked(mdev);
930 		return 1;
931 	}
932 
933 	drbd_rs_complete_io(mdev, e->sector);
934 
935 	di = (struct digest_info *)(unsigned long)e->block_id;
936 
937 	if (likely(drbd_bio_uptodate(e->private_bio))) {
938 		/* quick hack to try to avoid a race against reconfiguration.
939 		 * a real fix would be much more involved,
940 		 * introducing more locking mechanisms */
941 		if (mdev->csums_tfm) {
942 			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
943 			D_ASSERT(digest_size == di->digest_size);
944 			digest = kmalloc(digest_size, GFP_NOIO);
945 		}
946 		if (digest) {
947 			drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
948 			eq = !memcmp(digest, di->digest, digest_size);
949 			kfree(digest);
950 		}
951 
952 		if (eq) {
953 			drbd_set_in_sync(mdev, e->sector, e->size);
954 			/* rs_same_csums unit is BM_BLOCK_SIZE */
955 			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
956 			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
957 		} else {
958 			inc_rs_pending(mdev);
959 			e->block_id = ID_SYNCER;
960 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
961 		}
962 	} else {
963 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
964 		if (__ratelimit(&drbd_ratelimit_state))
965 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
966 	}
967 
968 	dec_unacked(mdev);
969 
970 	kfree(di);
971 
972 	move_to_net_ee_or_free(mdev, e);
973 
974 	if (unlikely(!ok))
975 		dev_err(DEV, "drbd_send_block/ack() failed\n");
976 	return ok;
977 }
978 
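/* Worker callback for online verify: hash the block that was just read on
 * behalf of the peer and send the digest back in a P_OV_REPLY, so the peer
 * can compare it against its own copy. */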
979 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
980 {
981 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
982 	int digest_size;
983 	void *digest;
984 	int ok = 1;
985 
986 	if (unlikely(cancel))
987 		goto out;
988 
989 	if (unlikely(!drbd_bio_uptodate(e->private_bio)))
990 		goto out;
991 
992 	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
993 	/* FIXME if this allocation fails, online verify will not terminate! */
994 	digest = kmalloc(digest_size, GFP_NOIO);
995 	if (digest) {
996 		drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
997 		inc_rs_pending(mdev);
998 		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
999 					     digest, digest_size, P_OV_REPLY);
1000 		if (!ok)
1001 			dec_rs_pending(mdev);
1002 		kfree(digest);
1003 	}
1004 
1005 out:
1006 	drbd_free_ee(mdev, e);
1007 
1008 	dec_unacked(mdev);
1009 
1010 	return ok;
1011 }
1012 
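/* Record an out-of-sync area found by online verify: extend the previously
 * reported range if this one is adjacent to it, mark the bits out of sync,
 * and remember to write out the bitmap once the run has finished. */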
1013 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1014 {
1015 	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1016 		mdev->ov_last_oos_size += size>>9;
1017 	} else {
1018 		mdev->ov_last_oos_start = sector;
1019 		mdev->ov_last_oos_size = size>>9;
1020 	}
1021 	drbd_set_out_of_sync(mdev, sector, size);
1022 	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1023 }
1024 
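/* Worker callback processing the peer's P_OV_REPLY during online verify:
 * compare the received digest against one computed over the local block,
 * report the outcome with P_OV_RESULT, and finish the verify run once
 * ov_left reaches zero. */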
1025 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1026 {
1027 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1028 	struct digest_info *di;
1029 	int digest_size;
1030 	void *digest;
1031 	int ok, eq = 0;
1032 
1033 	if (unlikely(cancel)) {
1034 		drbd_free_ee(mdev, e);
1035 		dec_unacked(mdev);
1036 		return 1;
1037 	}
1038 
1039 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1040 	 * the resync lru has been cleaned up already */
1041 	drbd_rs_complete_io(mdev, e->sector);
1042 
1043 	di = (struct digest_info *)(unsigned long)e->block_id;
1044 
1045 	if (likely(drbd_bio_uptodate(e->private_bio))) {
1046 		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1047 		digest = kmalloc(digest_size, GFP_NOIO);
1048 		if (digest) {
1049 			drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
1050 
1051 			D_ASSERT(digest_size == di->digest_size);
1052 			eq = !memcmp(digest, di->digest, digest_size);
1053 			kfree(digest);
1054 		}
1055 	} else {
1056 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1057 		if (__ratelimit(&drbd_ratelimit_state))
1058 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1059 	}
1060 
1061 	dec_unacked(mdev);
1062 
1063 	kfree(di);
1064 
1065 	if (!eq)
1066 		drbd_ov_oos_found(mdev, e->sector, e->size);
1067 	else
1068 		ov_oos_print(mdev);
1069 
1070 	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1071 			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1072 
1073 	drbd_free_ee(mdev, e);
1074 
1075 	if (--mdev->ov_left == 0) {
1076 		ov_oos_print(mdev);
1077 		drbd_resync_finished(mdev);
1078 	}
1079 
1080 	return ok;
1081 }
1082 
1083 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1084 {
1085 	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1086 	complete(&b->done);
1087 	return 1;
1088 }
1089 
1090 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1091 {
1092 	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1093 	struct p_barrier *p = &mdev->data.sbuf.barrier;
1094 	int ok = 1;
1095 
1096 	/* really avoid racing with tl_clear.  w.cb may have been referenced
1097 	 * just before it was reassigned and re-queued, so double check that.
1098 	 * actually, this race was harmless, since we only try to send the
1099 	 * barrier packet here, and otherwise do nothing with the object.
1100 	 * but compare with the head of w_clear_epoch */
1101 	spin_lock_irq(&mdev->req_lock);
1102 	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1103 		cancel = 1;
1104 	spin_unlock_irq(&mdev->req_lock);
1105 	if (cancel)
1106 		return 1;
1107 
1108 	if (!drbd_get_data_sock(mdev))
1109 		return 0;
1110 	p->barrier = b->br_number;
1111 	/* inc_ap_pending was done where this was queued.
1112 	 * dec_ap_pending will be done in got_BarrierAck
1113 	 * or (on connection loss) in w_clear_epoch.  */
1114 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1115 				(struct p_header *)p, sizeof(*p), 0);
1116 	drbd_put_data_sock(mdev);
1117 
1118 	return ok;
1119 }
1120 
1121 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1122 {
1123 	if (cancel)
1124 		return 1;
1125 	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1126 }
1127 
1128 /**
1129  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1130  * @mdev:	DRBD device.
1131  * @w:		work object.
1132  * @cancel:	The connection will be closed anyways
1133  */
1134 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1135 {
1136 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1137 	int ok;
1138 
1139 	if (unlikely(cancel)) {
1140 		req_mod(req, send_canceled);
1141 		return 1;
1142 	}
1143 
1144 	ok = drbd_send_dblock(mdev, req);
1145 	req_mod(req, ok ? handed_over_to_network : send_failed);
1146 
1147 	return ok;
1148 }
1149 
1150 /**
1151  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1152  * @mdev:	DRBD device.
1153  * @w:		work object.
1154  * @cancel:	The connection will be closed anyways
1155  */
1156 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1157 {
1158 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1159 	int ok;
1160 
1161 	if (unlikely(cancel)) {
1162 		req_mod(req, send_canceled);
1163 		return 1;
1164 	}
1165 
1166 	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1167 				(unsigned long)req);
1168 
1169 	if (!ok) {
1170 		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1171 		 * so this is probably redundant */
1172 		if (mdev->state.conn >= C_CONNECTED)
1173 			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1174 	}
1175 	req_mod(req, ok ? handed_over_to_network : send_failed);
1176 
1177 	return ok;
1178 }
1179 
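/* Walk the sync-after dependency chain of @mdev.  Return 0 as long as any
 * device we (transitively) depend on is currently resyncing or is paused
 * (aftr_isp/peer_isp/user_isp), 1 once the chain allows us to sync. */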
1180 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1181 {
1182 	struct drbd_conf *odev = mdev;
1183 
1184 	while (1) {
1185 		if (odev->sync_conf.after == -1)
1186 			return 1;
1187 		odev = minor_to_mdev(odev->sync_conf.after);
1188 		ERR_IF(!odev) return 1;
1189 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1190 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1191 		    odev->state.aftr_isp || odev->state.peer_isp ||
1192 		    odev->state.user_isp)
1193 			return 0;
1194 	}
1195 }
1196 
1197 /**
1198  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1199  * @mdev:	DRBD device.
1200  *
1201  * Called from process context only (admin command and after_state_ch).
1202  */
1203 static int _drbd_pause_after(struct drbd_conf *mdev)
1204 {
1205 	struct drbd_conf *odev;
1206 	int i, rv = 0;
1207 
1208 	for (i = 0; i < minor_count; i++) {
1209 		odev = minor_to_mdev(i);
1210 		if (!odev)
1211 			continue;
1212 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1213 			continue;
1214 		if (!_drbd_may_sync_now(odev))
1215 			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1216 			       != SS_NOTHING_TO_DO);
1217 	}
1218 
1219 	return rv;
1220 }
1221 
1222 /**
1223  * _drbd_resume_next() - Resume resync on all devices that may resync now
1224  * @mdev:	DRBD device.
1225  *
1226  * Called from process context only (admin command and worker).
1227  */
1228 static int _drbd_resume_next(struct drbd_conf *mdev)
1229 {
1230 	struct drbd_conf *odev;
1231 	int i, rv = 0;
1232 
1233 	for (i = 0; i < minor_count; i++) {
1234 		odev = minor_to_mdev(i);
1235 		if (!odev)
1236 			continue;
1237 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1238 			continue;
1239 		if (odev->state.aftr_isp) {
1240 			if (_drbd_may_sync_now(odev))
1241 				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1242 							CS_HARD, NULL)
1243 				       != SS_NOTHING_TO_DO);
1244 		}
1245 	}
1246 	return rv;
1247 }
1248 
1249 void resume_next_sg(struct drbd_conf *mdev)
1250 {
1251 	write_lock_irq(&global_state_lock);
1252 	_drbd_resume_next(mdev);
1253 	write_unlock_irq(&global_state_lock);
1254 }
1255 
1256 void suspend_other_sg(struct drbd_conf *mdev)
1257 {
1258 	write_lock_irq(&global_state_lock);
1259 	_drbd_pause_after(mdev);
1260 	write_unlock_irq(&global_state_lock);
1261 }
1262 
1263 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1264 {
1265 	struct drbd_conf *odev;
1266 
1267 	if (o_minor == -1)
1268 		return NO_ERROR;
1269 	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1270 		return ERR_SYNC_AFTER;
1271 
1272 	/* check for loops */
1273 	odev = minor_to_mdev(o_minor);
1274 	while (1) {
1275 		if (odev == mdev)
1276 			return ERR_SYNC_AFTER_CYCLE;
1277 
1278 		/* dependency chain ends here, no cycles. */
1279 		if (odev->sync_conf.after == -1)
1280 			return NO_ERROR;
1281 
1282 		/* follow the dependency chain */
1283 		odev = minor_to_mdev(odev->sync_conf.after);
1284 	}
1285 }
1286 
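/* Change the sync-after dependency of @mdev to minor @na.  The setting is
 * validated by sync_after_error() (unknown minors and dependency cycles are
 * rejected), then resyncs are paused/resumed on all devices until no further
 * state change results. */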
1287 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1288 {
1289 	int changes;
1290 	int retcode;
1291 
1292 	write_lock_irq(&global_state_lock);
1293 	retcode = sync_after_error(mdev, na);
1294 	if (retcode == NO_ERROR) {
1295 		mdev->sync_conf.after = na;
1296 		do {
1297 			changes  = _drbd_pause_after(mdev);
1298 			changes |= _drbd_resume_next(mdev);
1299 		} while (changes);
1300 	}
1301 	write_unlock_irq(&global_state_lock);
1302 	return retcode;
1303 }
1304 
1305 static void ping_peer(struct drbd_conf *mdev)
1306 {
1307 	clear_bit(GOT_PING_ACK, &mdev->flags);
1308 	request_ping(mdev);
1309 	wait_event(mdev->misc_wait,
1310 		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
1311 }
1312 
1313 /**
1314  * drbd_start_resync() - Start the resync process
1315  * @mdev:	DRBD device.
1316  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1317  *
1318  * This function might bring you directly into one of the
1319  * C_PAUSED_SYNC_* states.
1320  */
1321 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1322 {
1323 	union drbd_state ns;
1324 	int r;
1325 
1326 	if (mdev->state.conn >= C_SYNC_SOURCE) {
1327 		dev_err(DEV, "Resync already running!\n");
1328 		return;
1329 	}
1330 
1331 	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1332 	drbd_rs_cancel_all(mdev);
1333 
1334 	if (side == C_SYNC_TARGET) {
1335 		/* Since application IO was locked out during C_WF_BITMAP_T and
1336 		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1337 		   (which makes our data inconsistent) let the before-resync-target handler veto. */
1338 		r = drbd_khelper(mdev, "before-resync-target");
1339 		r = (r >> 8) & 0xff;
1340 		if (r > 0) {
1341 			dev_info(DEV, "before-resync-target handler returned %d, "
1342 			     "dropping connection.\n", r);
1343 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1344 			return;
1345 		}
1346 	}
1347 
1348 	drbd_state_lock(mdev);
1349 
1350 	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1351 		drbd_state_unlock(mdev);
1352 		return;
1353 	}
1354 
1355 	if (side == C_SYNC_TARGET) {
1356 		mdev->bm_resync_fo = 0;
1357 	} else /* side == C_SYNC_SOURCE */ {
1358 		u64 uuid;
1359 
1360 		get_random_bytes(&uuid, sizeof(u64));
1361 		drbd_uuid_set(mdev, UI_BITMAP, uuid);
1362 		drbd_send_sync_uuid(mdev, uuid);
1363 
1364 		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1365 	}
1366 
1367 	write_lock_irq(&global_state_lock);
1368 	ns = mdev->state;
1369 
1370 	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1371 
1372 	ns.conn = side;
1373 
1374 	if (side == C_SYNC_TARGET)
1375 		ns.disk = D_INCONSISTENT;
1376 	else /* side == C_SYNC_SOURCE */
1377 		ns.pdsk = D_INCONSISTENT;
1378 
1379 	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1380 	ns = mdev->state;
1381 
1382 	if (ns.conn < C_CONNECTED)
1383 		r = SS_UNKNOWN_ERROR;
1384 
1385 	if (r == SS_SUCCESS) {
1386 		mdev->rs_total     =
1387 		mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1388 		mdev->rs_failed    = 0;
1389 		mdev->rs_paused    = 0;
1390 		mdev->rs_start     =
1391 		mdev->rs_mark_time = jiffies;
1392 		mdev->rs_same_csum = 0;
1393 		_drbd_pause_after(mdev);
1394 	}
1395 	write_unlock_irq(&global_state_lock);
1396 	put_ldev(mdev);
1397 
1398 	if (r == SS_SUCCESS) {
1399 		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1400 		     drbd_conn_str(ns.conn),
1401 		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1402 		     (unsigned long) mdev->rs_total);
1403 
1404 		if (mdev->rs_total == 0) {
1405 			/* Peer still reachable? Beware of failing before-resync-target handlers! */
1406 			ping_peer(mdev);
1407 			drbd_resync_finished(mdev);
1408 		}
1409 
1410 		/* ns.conn may already be != mdev->state.conn,
1411 		 * we may have been paused in between, or become paused until
1412 		 * the timer triggers.
1413 		 * No matter, that is handled in resync_timer_fn() */
1414 		if (ns.conn == C_SYNC_TARGET)
1415 			mod_timer(&mdev->resync_timer, jiffies);
1416 
1417 		drbd_md_sync(mdev);
1418 	}
1419 	drbd_state_unlock(mdev);
1420 }
1421 
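/* Main loop of the per-device worker thread: wait for work on data.work,
 * uncorking the data socket before blocking and corking it again afterwards
 * (unless no_cork is set), run each drbd_work callback, and on shutdown
 * cancel whatever is still queued before cleaning up the device. */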
1422 int drbd_worker(struct drbd_thread *thi)
1423 {
1424 	struct drbd_conf *mdev = thi->mdev;
1425 	struct drbd_work *w = NULL;
1426 	LIST_HEAD(work_list);
1427 	int intr = 0, i;
1428 
1429 	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1430 
1431 	while (get_t_state(thi) == Running) {
1432 		drbd_thread_current_set_cpu(mdev);
1433 
1434 		if (down_trylock(&mdev->data.work.s)) {
1435 			mutex_lock(&mdev->data.mutex);
1436 			if (mdev->data.socket && !mdev->net_conf->no_cork)
1437 				drbd_tcp_uncork(mdev->data.socket);
1438 			mutex_unlock(&mdev->data.mutex);
1439 
1440 			intr = down_interruptible(&mdev->data.work.s);
1441 
1442 			mutex_lock(&mdev->data.mutex);
1443 			if (mdev->data.socket && !mdev->net_conf->no_cork)
1444 				drbd_tcp_cork(mdev->data.socket);
1445 			mutex_unlock(&mdev->data.mutex);
1446 		}
1447 
1448 		if (intr) {
1449 			D_ASSERT(intr == -EINTR);
1450 			flush_signals(current);
1451 			ERR_IF (get_t_state(thi) == Running)
1452 				continue;
1453 			break;
1454 		}
1455 
1456 		if (get_t_state(thi) != Running)
1457 			break;
1458 		/* With this break, we have done a down() but not consumed
1459 		   the entry from the list. The cleanup code takes care of
1460 		   this...   */
1461 
1462 		w = NULL;
1463 		spin_lock_irq(&mdev->data.work.q_lock);
1464 		ERR_IF(list_empty(&mdev->data.work.q)) {
1465 			/* something terribly wrong in our logic.
1466 			 * we were able to down() the semaphore,
1467 			 * but the list is empty... doh.
1468 			 *
1469 			 * what is the best thing to do now?
1470 			 * try again from scratch, restarting the receiver,
1471 			 * asender, whatnot? could break even more ugly,
1472 			 * e.g. when we are primary, but no good local data.
1473 			 *
1474 			 * I'll try to get away just starting over this loop.
1475 			 */
1476 			spin_unlock_irq(&mdev->data.work.q_lock);
1477 			continue;
1478 		}
1479 		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1480 		list_del_init(&w->list);
1481 		spin_unlock_irq(&mdev->data.work.q_lock);
1482 
1483 		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1484 			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1485 			if (mdev->state.conn >= C_CONNECTED)
1486 				drbd_force_state(mdev,
1487 						NS(conn, C_NETWORK_FAILURE));
1488 		}
1489 	}
1490 	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1491 	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1492 
1493 	spin_lock_irq(&mdev->data.work.q_lock);
1494 	i = 0;
1495 	while (!list_empty(&mdev->data.work.q)) {
1496 		list_splice_init(&mdev->data.work.q, &work_list);
1497 		spin_unlock_irq(&mdev->data.work.q_lock);
1498 
1499 		while (!list_empty(&work_list)) {
1500 			w = list_entry(work_list.next, struct drbd_work, list);
1501 			list_del_init(&w->list);
1502 			w->cb(mdev, w, 1);
1503 			i++; /* dead debugging code */
1504 		}
1505 
1506 		spin_lock_irq(&mdev->data.work.q_lock);
1507 	}
1508 	sema_init(&mdev->data.work.s, 0);
1509 	/* DANGEROUS race: if someone queued their work within the spinlock,
1510 	 * but called up() outside the spinlock, we could get an up() on the
1511 	 * semaphore without a corresponding list entry.
1512 	 * So don't do that.
1513 	 */
1514 	spin_unlock_irq(&mdev->data.work.q_lock);
1515 
1516 	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1517 	/* _drbd_set_state only uses stop_nowait.
1518 	 * wait here for the Exiting receiver. */
1519 	drbd_thread_stop(&mdev->receiver);
1520 	drbd_mdev_cleanup(mdev);
1521 
1522 	dev_info(DEV, "worker terminated\n");
1523 
1524 	clear_bit(DEVICE_DYING, &mdev->flags);
1525 	clear_bit(CONFIG_PENDING, &mdev->flags);
1526 	wake_up(&mdev->state_wait);
1527 
1528 	return 0;
1529 }
1530