xref: /openbmc/linux/drivers/block/drbd/drbd_worker.c (revision e65f440d)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24  */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37 
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40 
41 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42 
43 
44 
45 /* defined here:
46    drbd_md_io_complete
47    drbd_endio_sec
48    drbd_endio_pri
49 
50  * more endio handlers:
51    atodb_endio in drbd_actlog.c
52    drbd_bm_async_io_complete in drbd_bitmap.c
53 
54  * For all these callbacks, note the following:
55  * The callbacks will be called in irq context by the IDE drivers,
56  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
57  * Try to get the locking right :)
58  *
59  */
60 
61 
62 /* About the global_state_lock
63    Each state transition on a device holds a read lock. In case we have
64    to evaluate the sync-after dependencies, we grab a write lock, because
65    we need stable states on all devices for that.  */
66 rwlock_t global_state_lock;
67 
68 /* used for synchronous meta data and bitmap IO
69  * submitted by drbd_md_sync_page_io()
70  */
71 void drbd_md_io_complete(struct bio *bio, int error)
72 {
73 	struct drbd_md_io *md_io;
74 
75 	md_io = (struct drbd_md_io *)bio->bi_private;
76 	md_io->error = error;
77 
78 	complete(&md_io->event);
79 }
80 
81 /* reads on behalf of the partner,
82  * "submitted" by the receiver
83  */
84 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
85 {
86 	unsigned long flags = 0;
87 	struct drbd_conf *mdev = e->mdev;
88 
89 	D_ASSERT(e->block_id != ID_VACANT);
90 
91 	spin_lock_irqsave(&mdev->req_lock, flags);
92 	mdev->read_cnt += e->size >> 9;
93 	list_del(&e->w.list);
94 	if (list_empty(&mdev->read_ee))
95 		wake_up(&mdev->ee_wait);
96 	if (test_bit(__EE_WAS_ERROR, &e->flags))
97 		__drbd_chk_io_error(mdev, FALSE);
98 	spin_unlock_irqrestore(&mdev->req_lock, flags);
99 
100 	drbd_queue_work(&mdev->data.work, &e->w);
101 	put_ldev(mdev);
102 }
103 
104 /* writes on behalf of the partner, or resync writes,
105  * "submitted" by the receiver, final stage.  */
106 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
107 {
108 	unsigned long flags = 0;
109 	struct drbd_conf *mdev = e->mdev;
110 	sector_t e_sector;
111 	int do_wake;
112 	int is_syncer_req;
113 	int do_al_complete_io;
114 
115 	D_ASSERT(e->block_id != ID_VACANT);
116 
117 	/* after we moved e to done_ee,
118 	 * we may no longer access it,
119 	 * it may be freed/reused already!
120 	 * (as soon as we release the req_lock) */
121 	e_sector = e->sector;
122 	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
123 	is_syncer_req = is_syncer_block_id(e->block_id);
124 
125 	spin_lock_irqsave(&mdev->req_lock, flags);
126 	mdev->writ_cnt += e->size >> 9;
127 	list_del(&e->w.list); /* has been on active_ee or sync_ee */
128 	list_add_tail(&e->w.list, &mdev->done_ee);
129 
130 	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
131 	 * neither did we wake possibly waiting conflicting requests.
132 	 * done from "drbd_process_done_ee" within the appropriate w.cb
133 	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
134 
135 	do_wake = is_syncer_req
136 		? list_empty(&mdev->sync_ee)
137 		: list_empty(&mdev->active_ee);
138 
139 	if (test_bit(__EE_WAS_ERROR, &e->flags))
140 		__drbd_chk_io_error(mdev, FALSE);
141 	spin_unlock_irqrestore(&mdev->req_lock, flags);
142 
143 	if (is_syncer_req)
144 		drbd_rs_complete_io(mdev, e_sector);
145 
146 	if (do_wake)
147 		wake_up(&mdev->ee_wait);
148 
149 	if (do_al_complete_io)
150 		drbd_al_complete_io(mdev, e_sector);
151 
152 	wake_asender(mdev);
153 	put_ldev(mdev);
154 }
155 
156 /* writes on behalf of the partner, or resync writes,
157  * "submitted" by the receiver.
158  */
159 void drbd_endio_sec(struct bio *bio, int error)
160 {
161 	struct drbd_epoch_entry *e = bio->bi_private;
162 	struct drbd_conf *mdev = e->mdev;
163 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
164 	int is_write = bio_data_dir(bio) == WRITE;
165 
166 	if (error)
167 		dev_warn(DEV, "%s: error=%d s=%llus\n",
168 				is_write ? "write" : "read", error,
169 				(unsigned long long)e->sector);
170 	if (!error && !uptodate) {
171 		dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
172 				is_write ? "write" : "read",
173 				(unsigned long long)e->sector);
174 		/* strange behavior of some lower level drivers...
175 		 * fail the request by clearing the uptodate flag,
176 		 * but do not return any error?! */
177 		error = -EIO;
178 	}
179 
180 	if (error)
181 		set_bit(__EE_WAS_ERROR, &e->flags);
182 
183 	bio_put(bio); /* no need for the bio anymore */
184 	if (atomic_dec_and_test(&e->pending_bios)) {
185 		if (is_write)
186 			drbd_endio_write_sec_final(e);
187 		else
188 			drbd_endio_read_sec_final(e);
189 	}
190 }
191 
192 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
193  */
194 void drbd_endio_pri(struct bio *bio, int error)
195 {
196 	unsigned long flags;
197 	struct drbd_request *req = bio->bi_private;
198 	struct drbd_conf *mdev = req->mdev;
199 	struct bio_and_error m;
200 	enum drbd_req_event what;
201 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
202 
203 	if (!error && !uptodate) {
204 		dev_warn(DEV, "p %s: setting error to -EIO\n",
205 			 bio_data_dir(bio) == WRITE ? "write" : "read");
206 		/* strange behavior of some lower level drivers...
207 		 * fail the request by clearing the uptodate flag,
208 		 * but do not return any error?! */
209 		error = -EIO;
210 	}
211 
212 	/* to avoid recursion in __req_mod */
213 	if (unlikely(error)) {
214 		what = (bio_data_dir(bio) == WRITE)
215 			? write_completed_with_error
216 			: (bio_rw(bio) == READ)
217 			  ? read_completed_with_error
218 			  : read_ahead_completed_with_error;
219 	} else
220 		what = completed_ok;
221 
222 	bio_put(req->private_bio);
223 	req->private_bio = ERR_PTR(error);
224 
225 	/* not req_mod(), we need irqsave here! */
226 	spin_lock_irqsave(&mdev->req_lock, flags);
227 	__req_mod(req, what, &m);
228 	spin_unlock_irqrestore(&mdev->req_lock, flags);
229 
230 	if (m.bio)
231 		complete_master_bio(mdev, &m);
232 }
233 
234 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
235 {
236 	struct drbd_request *req = container_of(w, struct drbd_request, w);
237 
238 	/* We should not detach for read io-error,
239 	 * but try to WRITE the P_DATA_REPLY to the failed location,
240 	 * to give the disk the chance to relocate that block */
241 
242 	spin_lock_irq(&mdev->req_lock);
243 	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
244 		_req_mod(req, read_retry_remote_canceled);
245 		spin_unlock_irq(&mdev->req_lock);
246 		return 1;
247 	}
248 	spin_unlock_irq(&mdev->req_lock);
249 
250 	return w_send_read_req(mdev, w, 0);
251 }
252 
253 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
254 {
255 	ERR_IF(cancel) return 1;
256 	dev_err(DEV, "resync inactive, but callback triggered??\n");
257 	return 1; /* Simply ignore this! */
258 }
259 
260 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
261 {
262 	struct hash_desc desc;
263 	struct scatterlist sg;
264 	struct page *page = e->pages;
265 	struct page *tmp;
266 	unsigned len;
267 
268 	desc.tfm = tfm;
269 	desc.flags = 0;
270 
271 	sg_init_table(&sg, 1);
272 	crypto_hash_init(&desc);
273 
274 	while ((tmp = page_chain_next(page))) {
275 		/* all but the last page will be fully used */
276 		sg_set_page(&sg, page, PAGE_SIZE, 0);
277 		crypto_hash_update(&desc, &sg, sg.length);
278 		page = tmp;
279 	}
280 	/* and now the last, possibly only partially used page */
281 	len = e->size & (PAGE_SIZE - 1);
282 	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
283 	crypto_hash_update(&desc, &sg, sg.length);
284 	crypto_hash_final(&desc, digest);
285 }
286 
287 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
288 {
289 	struct hash_desc desc;
290 	struct scatterlist sg;
291 	struct bio_vec *bvec;
292 	int i;
293 
294 	desc.tfm = tfm;
295 	desc.flags = 0;
296 
297 	sg_init_table(&sg, 1);
298 	crypto_hash_init(&desc);
299 
300 	__bio_for_each_segment(bvec, bio, i, 0) {
301 		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
302 		crypto_hash_update(&desc, &sg, sg.length);
303 	}
304 	crypto_hash_final(&desc, digest);
305 }
306 
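/*
 * w_e_send_csum() - worker callback for checksum based resync
 *
 * Runs after read_for_csum() has completed the local read: hash the block
 * with csums_tfm and send a P_CSUM_RS_REQUEST asking the peer to compare.
 * The peer answers with either P_RS_IS_IN_SYNC or the full data block
 * (see w_e_end_csum_rs_req() for the other side of this exchange).
 */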
307 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
308 {
309 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
310 	int digest_size;
311 	void *digest;
312 	int ok;
313 
314 	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
315 
316 	if (unlikely(cancel)) {
317 		drbd_free_ee(mdev, e);
318 		return 1;
319 	}
320 
321 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
322 		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
323 		digest = kmalloc(digest_size, GFP_NOIO);
324 		if (digest) {
325 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
326 
327 			inc_rs_pending(mdev);
328 			ok = drbd_send_drequest_csum(mdev,
329 						     e->sector,
330 						     e->size,
331 						     digest,
332 						     digest_size,
333 						     P_CSUM_RS_REQUEST);
334 			kfree(digest);
335 		} else {
336 			dev_err(DEV, "kmalloc() of digest failed.\n");
337 			ok = 0;
338 		}
339 	} else
340 		ok = 1;
341 
342 	drbd_free_ee(mdev, e);
343 
344 	if (unlikely(!ok))
345 		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
346 	return ok;
347 }
348 
349 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
350 
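/*
 * read_for_csum() - submit a local read for checksum based resync
 *
 * Allocates an epoch entry for @sector/@size, queues it on read_ee and
 * submits the read; the completion path then runs w_e_send_csum().
 * Returns 0 on success, -EAGAIN if the caller should retry later (throttled,
 * no memory, or submit failed) and -EIO if the local disk is gone.
 */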
351 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
352 {
353 	struct drbd_epoch_entry *e;
354 
355 	if (!get_ldev(mdev))
356 		return -EIO;
357 
358 	if (drbd_rs_should_slow_down(mdev))
359 		goto defer;
360 
361 	/* GFP_TRY, because if there is no memory available right now, this may
362 	 * be rescheduled for later. It is "only" background resync, after all. */
363 	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
364 	if (!e)
365 		goto defer;
366 
367 	e->w.cb = w_e_send_csum;
368 	spin_lock_irq(&mdev->req_lock);
369 	list_add(&e->w.list, &mdev->read_ee);
370 	spin_unlock_irq(&mdev->req_lock);
371 
372 	atomic_add(size >> 9, &mdev->rs_sect_ev);
373 	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
374 		return 0;
375 
376 	/* drbd_submit_ee currently fails for one reason only:
377 	 * not being able to allocate enough bios.
378 	 * Is dropping the connection going to help? */
379 	spin_lock_irq(&mdev->req_lock);
380 	list_del(&e->w.list);
381 	spin_unlock_irq(&mdev->req_lock);
382 
383 	drbd_free_ee(mdev, e);
384 defer:
385 	put_ldev(mdev);
386 	return -EAGAIN;
387 }
388 
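/*
 * resync_timer_fn() - pacing timer for resync and online verify
 *
 * Re-armed SLEEP_TIME after each batch of requests.  It only selects the
 * proper callback (w_make_resync_request for SyncTarget, w_make_ov_request
 * for VerifyS) and queues mdev->resync_work; the actual request generation
 * happens in worker context.
 */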
389 void resync_timer_fn(unsigned long data)
390 {
391 	struct drbd_conf *mdev = (struct drbd_conf *) data;
392 	int queue;
393 
394 	queue = 1;
395 	switch (mdev->state.conn) {
396 	case C_VERIFY_S:
397 		mdev->resync_work.cb = w_make_ov_request;
398 		break;
399 	case C_SYNC_TARGET:
400 		mdev->resync_work.cb = w_make_resync_request;
401 		break;
402 	default:
403 		queue = 0;
404 		mdev->resync_work.cb = w_resync_inactive;
405 	}
406 
407 	/* harmless race: list_empty outside data.work.q_lock */
408 	if (list_empty(&mdev->resync_work.list) && queue)
409 		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
410 }
411 
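/*
 * Ring buffer helpers for the resync controller's plan-ahead window
 * (mdev->rs_plan_s, protected by peer_seq_lock): fifo_push() stores @value
 * in the oldest slot and returns what had been planned for the current step,
 * fifo_add_val() spreads a correction evenly over all planned steps.
 */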
412 static void fifo_set(struct fifo_buffer *fb, int value)
413 {
414 	int i;
415 
416 	for (i = 0; i < fb->size; i++)
417 		fb->values[i] = value;
418 }
419 
420 static int fifo_push(struct fifo_buffer *fb, int value)
421 {
422 	int ov;
423 
424 	ov = fb->values[fb->head_index];
425 	fb->values[fb->head_index++] = value;
426 
427 	if (fb->head_index >= fb->size)
428 		fb->head_index = 0;
429 
430 	return ov;
431 }
432 
433 static void fifo_add_val(struct fifo_buffer *fb, int value)
434 {
435 	int i;
436 
437 	for (i = 0; i < fb->size; i++)
438 		fb->values[i] += value;
439 }
440 
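/*
 * drbd_rs_controller() - dynamic resync rate controller
 *
 * Decides how many sectors worth of resync requests to issue during the next
 * SLEEP_TIME interval, so that roughly "want" sectors are in flight towards
 * the peer: either the configured c_fill_target, or a value derived from the
 * observed reply rate and c_delay_target.  The difference to what is already
 * in flight or planned is spread over the plan-ahead FIFO.
 *
 * Illustration with made-up numbers: want = 1000 sectors, 200 in flight and
 * 300 already planned gives correction = 500; with 10 plan-ahead steps every
 * FIFO slot gets 50 sectors added, i.e. about 50 extra sectors become due on
 * each of the following steps.
 */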
441 int drbd_rs_controller(struct drbd_conf *mdev)
442 {
443 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
444 	unsigned int want;     /* The number of sectors we want in the proxy */
445 	int req_sect; /* Number of sectors to request in this turn */
446 	int correction; /* Number of sectors more we need in the proxy */
447 	int cps; /* correction per invocation of drbd_rs_controller() */
448 	int steps; /* Number of time steps to plan ahead */
449 	int curr_corr;
450 	int max_sect;
451 
452 	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
453 	mdev->rs_in_flight -= sect_in;
454 
455 	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
456 
457 	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
458 
459 	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
460 		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
461 	} else { /* normal path */
462 		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
463 			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
464 	}
465 
466 	correction = want - mdev->rs_in_flight - mdev->rs_planed;
467 
468 	/* Plan ahead */
469 	cps = correction / steps;
470 	fifo_add_val(&mdev->rs_plan_s, cps);
471 	mdev->rs_planed += cps * steps;
472 
473 	/* What we do in this step */
474 	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
475 	spin_unlock(&mdev->peer_seq_lock);
476 	mdev->rs_planed -= curr_corr;
477 
478 	req_sect = sect_in + curr_corr;
479 	if (req_sect < 0)
480 		req_sect = 0;
481 
482 	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
483 	if (req_sect > max_sect)
484 		req_sect = max_sect;
485 
486 	/*
487 	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
488 		 sect_in, mdev->rs_in_flight, want, correction,
489 		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
490 	*/
491 
492 	return req_sect;
493 }
494 
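/*
 * drbd_rs_number_requests() - number of resync requests for this turn
 *
 * Converts either the controller output or the static sync_conf.rate into a
 * count of BM_BLOCK_SIZE sized requests for one SLEEP_TIME slice, updating
 * mdev->c_sync_rate (the currently effective rate in KiB/s) along the way.
 * Example with made-up values, assuming a SLEEP_TIME of 100 ms and 4 KiB
 * blocks: a fixed rate of 10240 KiB/s yields 10240 / (4 * 10) = 256 requests
 * per turn, i.e. 1 MiB every 100 ms.
 */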
495 int drbd_rs_number_requests(struct drbd_conf *mdev)
496 {
497 	int number;
498 	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
499 		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
500 		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
501 	} else {
502 		mdev->c_sync_rate = mdev->sync_conf.rate;
503 		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
504 	}
505 
506 	/* Throttle resync on lower level disk activity, which may also be
507 	 * caused by application IO on Primary/SyncTarget.
508 	 * Keep this after the call to drbd_rs_controller, as it assumes
509 	 * it is called as precisely as possible every SLEEP_TIME,
510 	 * and would be confused otherwise. */
511 	if (number && drbd_rs_should_slow_down(mdev)) {
512 		mdev->c_sync_rate = 1;
513 		number = 0;
514 	}
515 
516 	/* ignore the amount of pending requests; the resync controller should
517 	 * throttle down to the incoming reply rate soon enough anyway. */
518 	return number;
519 }
520 
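/*
 * w_make_resync_request() - generate one batch of resync requests
 *
 * Runs on the SyncTarget, once per SLEEP_TIME (re-armed via resync_timer):
 * walk the bitmap from bm_resync_fo looking for out-of-sync bits, merge
 * adjacent bits into one request up to max_segment_size (power-of-two
 * aligned, never crossing a bitmap extent boundary), then either read the
 * block locally for checksum based resync or send a plain P_RS_DATA_REQUEST.
 * Stops early once half of the send buffer is queued.
 */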
521 int w_make_resync_request(struct drbd_conf *mdev,
522 		struct drbd_work *w, int cancel)
523 {
524 	unsigned long bit;
525 	sector_t sector;
526 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
527 	int max_segment_size;
528 	int number, rollback_i, size;
529 	int align, queued, sndbuf;
530 	int i = 0;
531 
532 	if (unlikely(cancel))
533 		return 1;
534 
535 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
536 		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
537 		return 0;
538 	}
539 
540 	if (mdev->state.conn != C_SYNC_TARGET)
541 		dev_err(DEV, "%s in w_make_resync_request\n",
542 			drbd_conn_str(mdev->state.conn));
543 
544 	if (mdev->rs_total == 0) {
545 		/* empty resync? */
546 		drbd_resync_finished(mdev);
547 		return 1;
548 	}
549 
550 	if (!get_ldev(mdev)) {
551 		/* Since we only need to access mdev->rsync, a
552 		   get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
553 		   continuing resync with a broken disk makes no sense at
554 		   all */
555 		dev_err(DEV, "Disk broke down during resync!\n");
556 		mdev->resync_work.cb = w_resync_inactive;
557 		return 1;
558 	}
559 
560 	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
561 	 * if it should be necessary */
562 	max_segment_size =
563 		mdev->agreed_pro_version < 94 ? queue_max_segment_size(mdev->rq_queue) :
564 		mdev->agreed_pro_version < 95 ?	DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_SEGMENT_SIZE;
565 
566 	number = drbd_rs_number_requests(mdev);
567 	if (number == 0)
568 		goto requeue;
569 
570 	for (i = 0; i < number; i++) {
571 		/* Stop generating RS requests when half of the send buffer is filled */
572 		mutex_lock(&mdev->data.mutex);
573 		if (mdev->data.socket) {
574 			queued = mdev->data.socket->sk->sk_wmem_queued;
575 			sndbuf = mdev->data.socket->sk->sk_sndbuf;
576 		} else {
577 			queued = 1;
578 			sndbuf = 0;
579 		}
580 		mutex_unlock(&mdev->data.mutex);
581 		if (queued > sndbuf / 2)
582 			goto requeue;
583 
584 next_sector:
585 		size = BM_BLOCK_SIZE;
586 		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
587 
588 		if (bit == -1UL) {
589 			mdev->bm_resync_fo = drbd_bm_bits(mdev);
590 			mdev->resync_work.cb = w_resync_inactive;
591 			put_ldev(mdev);
592 			return 1;
593 		}
594 
595 		sector = BM_BIT_TO_SECT(bit);
596 
597 		if (drbd_try_rs_begin_io(mdev, sector)) {
598 			mdev->bm_resync_fo = bit;
599 			goto requeue;
600 		}
601 		mdev->bm_resync_fo = bit + 1;
602 
603 		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
604 			drbd_rs_complete_io(mdev, sector);
605 			goto next_sector;
606 		}
607 
608 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
609 		/* try to find some adjacent bits.
610 		 * we stop once we already have the maximum req size.
611 		 *
612 		 * Additionally always align bigger requests, in order to
613 		 * be prepared for all stripe sizes of software RAIDs.
614 		 */
615 		align = 1;
616 		rollback_i = i;
617 		for (;;) {
618 			if (size + BM_BLOCK_SIZE > max_segment_size)
619 				break;
620 
621 			/* Always be aligned */
622 			if (sector & ((1<<(align+3))-1))
623 				break;
624 
625 			/* do not cross extent boundaries */
626 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
627 				break;
628 			/* now, is it actually dirty, after all?
629 			 * caution, drbd_bm_test_bit is tri-state for some
630 			 * obscure reason; ( b == 0 ) would get the out-of-band
631 			 * only accidentally right because of the "oddly sized"
632 			 * adjustment below */
633 			if (drbd_bm_test_bit(mdev, bit+1) != 1)
634 				break;
635 			bit++;
636 			size += BM_BLOCK_SIZE;
637 			if ((BM_BLOCK_SIZE << align) <= size)
638 				align++;
639 			i++;
640 		}
641 		/* if we merged some,
642 		 * reset the offset to start the next drbd_bm_find_next from */
643 		if (size > BM_BLOCK_SIZE)
644 			mdev->bm_resync_fo = bit + 1;
645 #endif
646 
647 		/* adjust very last sectors, in case we are oddly sized */
648 		if (sector + (size>>9) > capacity)
649 			size = (capacity-sector)<<9;
650 		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
651 			switch (read_for_csum(mdev, sector, size)) {
652 			case -EIO: /* Disk failure */
653 				put_ldev(mdev);
654 				return 0;
655 			case -EAGAIN: /* allocation failed, or ldev busy */
656 				drbd_rs_complete_io(mdev, sector);
657 				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
658 				i = rollback_i;
659 				goto requeue;
660 			case 0:
661 				/* everything ok */
662 				break;
663 			default:
664 				BUG();
665 			}
666 		} else {
667 			inc_rs_pending(mdev);
668 			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
669 					       sector, size, ID_SYNCER)) {
670 				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
671 				dec_rs_pending(mdev);
672 				put_ldev(mdev);
673 				return 0;
674 			}
675 		}
676 	}
677 
678 	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
679 		/* last syncer _request_ was sent,
680 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
681 		 * next sync group will resume), as soon as we receive the last
682 		 * resync data block, and the last bit is cleared.
683 		 * until then resync "work" is "inactive" ...
684 		 */
685 		mdev->resync_work.cb = w_resync_inactive;
686 		put_ldev(mdev);
687 		return 1;
688 	}
689 
690  requeue:
691 	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
692 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
693 	put_ldev(mdev);
694 	return 1;
695 }
696 
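/*
 * w_make_ov_request() - generate one batch of online verify requests
 *
 * Online verify counterpart of w_make_resync_request(): walk the device
 * linearly from ov_position and send one P_OV_REQUEST per BM_BLOCK_SIZE
 * chunk, limited by the configured sync rate and the number of replies
 * still pending.
 */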
697 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
698 {
699 	int number, i, size;
700 	sector_t sector;
701 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
702 
703 	if (unlikely(cancel))
704 		return 1;
705 
706 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
707 		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
708 		return 0;
709 	}
710 
711 	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
712 	if (atomic_read(&mdev->rs_pending_cnt) > number)
713 		goto requeue;
714 
715 	number -= atomic_read(&mdev->rs_pending_cnt);
716 
717 	sector = mdev->ov_position;
718 	for (i = 0; i < number; i++) {
719 		if (sector >= capacity) {
720 			mdev->resync_work.cb = w_resync_inactive;
721 			return 1;
722 		}
723 
724 		size = BM_BLOCK_SIZE;
725 
726 		if (drbd_try_rs_begin_io(mdev, sector)) {
727 			mdev->ov_position = sector;
728 			goto requeue;
729 		}
730 
731 		if (sector + (size>>9) > capacity)
732 			size = (capacity-sector)<<9;
733 
734 		inc_rs_pending(mdev);
735 		if (!drbd_send_ov_request(mdev, sector, size)) {
736 			dec_rs_pending(mdev);
737 			return 0;
738 		}
739 		sector += BM_SECT_PER_BIT;
740 	}
741 	mdev->ov_position = sector;
742 
743  requeue:
744 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
745 	return 1;
746 }
747 
748 
749 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
750 {
751 	kfree(w);
752 	ov_oos_print(mdev);
753 	drbd_resync_finished(mdev);
754 
755 	return 1;
756 }
757 
758 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
759 {
760 	kfree(w);
761 
762 	drbd_resync_finished(mdev);
763 
764 	return 1;
765 }
766 
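/* Request a ping and wait until the ack arrives or the connection is lost.
 * Used below before finishing a resync, presumably to make sure the peer is
 * still reachable and has seen our previous packets before we finalize the
 * resync state. */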
767 static void ping_peer(struct drbd_conf *mdev)
768 {
769 	clear_bit(GOT_PING_ACK, &mdev->flags);
770 	request_ping(mdev);
771 	wait_event(mdev->misc_wait,
772 		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
773 }
774 
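/*
 * drbd_resync_finished() - finalize a resync or online verify run
 *
 * Flushes the resync LRU (retrying from worker context if that is not yet
 * possible), reports throughput, moves the connection back to C_CONNECTED,
 * updates disk/pdsk state and the UUIDs according to the outcome, and may
 * trigger the "out-of-sync" or "after-resync-target" user space helpers.
 */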
775 int drbd_resync_finished(struct drbd_conf *mdev)
776 {
777 	unsigned long db, dt, dbdt;
778 	unsigned long n_oos;
779 	union drbd_state os, ns;
780 	struct drbd_work *w;
781 	char *khelper_cmd = NULL;
782 	int verify_done = 0;
783 
784 	/* Remove all elements from the resync LRU. Since future actions
785 	 * might set bits in the (main) bitmap, the entries in the
786 	 * resync LRU would otherwise be wrong. */
787 	if (drbd_rs_del_all(mdev)) {
788 		/* In case this is not possible now, most probably because
789 		 * there are P_RS_DATA_REPLY packets lingering on the worker's
790 		 * queue (or even the read operations for those packets
791 		 * are not finished by now).  Retry in 100ms. */
792 
793 		__set_current_state(TASK_INTERRUPTIBLE);
794 		schedule_timeout(HZ / 10);
795 		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
796 		if (w) {
797 			w->cb = w_resync_finished;
798 			drbd_queue_work(&mdev->data.work, w);
799 			return 1;
800 		}
801 		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
802 	}
803 
804 	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
805 	if (dt <= 0)
806 		dt = 1;
807 	db = mdev->rs_total;
808 	dbdt = Bit2KB(db/dt);
809 	mdev->rs_paused /= HZ;
810 
811 	if (!get_ldev(mdev))
812 		goto out;
813 
814 	ping_peer(mdev);
815 
816 	spin_lock_irq(&mdev->req_lock);
817 	os = mdev->state;
818 
819 	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
820 
821 	/* This protects us against multiple calls (that can happen in the presence
822 	   of application IO), and against connectivity loss just before we arrive here. */
823 	if (os.conn <= C_CONNECTED)
824 		goto out_unlock;
825 
826 	ns = os;
827 	ns.conn = C_CONNECTED;
828 
829 	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
830 	     verify_done ? "Online verify " : "Resync",
831 	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
832 
833 	n_oos = drbd_bm_total_weight(mdev);
834 
835 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
836 		if (n_oos) {
837 			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
838 			      n_oos, Bit2KB(1));
839 			khelper_cmd = "out-of-sync";
840 		}
841 	} else {
842 		D_ASSERT((n_oos - mdev->rs_failed) == 0);
843 
844 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
845 			khelper_cmd = "after-resync-target";
846 
847 		if (mdev->csums_tfm && mdev->rs_total) {
848 			const unsigned long s = mdev->rs_same_csum;
849 			const unsigned long t = mdev->rs_total;
850 			const int ratio =
851 				(t == 0)     ? 0 :
852 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
853 			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
854 			     "transferred %luK total %luK\n",
855 			     ratio,
856 			     Bit2KB(mdev->rs_same_csum),
857 			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
858 			     Bit2KB(mdev->rs_total));
859 		}
860 	}
861 
862 	if (mdev->rs_failed) {
863 		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
864 
865 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
866 			ns.disk = D_INCONSISTENT;
867 			ns.pdsk = D_UP_TO_DATE;
868 		} else {
869 			ns.disk = D_UP_TO_DATE;
870 			ns.pdsk = D_INCONSISTENT;
871 		}
872 	} else {
873 		ns.disk = D_UP_TO_DATE;
874 		ns.pdsk = D_UP_TO_DATE;
875 
876 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
877 			if (mdev->p_uuid) {
878 				int i;
879 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
880 					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
881 				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
882 				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
883 			} else {
884 				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
885 			}
886 		}
887 
888 		drbd_uuid_set_bm(mdev, 0UL);
889 
890 		if (mdev->p_uuid) {
891 			/* Now the two UUID sets are equal, update what we
892 			 * know of the peer. */
893 			int i;
894 			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
895 				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
896 		}
897 	}
898 
899 	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
900 out_unlock:
901 	spin_unlock_irq(&mdev->req_lock);
902 	put_ldev(mdev);
903 out:
904 	mdev->rs_total  = 0;
905 	mdev->rs_failed = 0;
906 	mdev->rs_paused = 0;
907 	if (verify_done)
908 		mdev->ov_start_sector = 0;
909 
910 	drbd_md_sync(mdev);
911 
912 	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
913 		dev_info(DEV, "Writing the whole bitmap\n");
914 		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
915 	}
916 
917 	if (khelper_cmd)
918 		drbd_khelper(mdev, khelper_cmd);
919 
920 	return 1;
921 }
922 
923 /* helper: park the epoch entry on net_ee while its pages may still be referenced by the network layer (sendpage), otherwise free it */
924 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
925 {
926 	if (drbd_ee_has_active_page(e)) {
927 		/* This might happen if sendpage() has not finished */
928 		int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
929 		atomic_add(i, &mdev->pp_in_use_by_net);
930 		atomic_sub(i, &mdev->pp_in_use);
931 		spin_lock_irq(&mdev->req_lock);
932 		list_add_tail(&e->w.list, &mdev->net_ee);
933 		spin_unlock_irq(&mdev->req_lock);
934 		wake_up(&drbd_pp_wait);
935 	} else
936 		drbd_free_ee(mdev, e);
937 }
938 
939 /**
940  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
941  * @mdev:	DRBD device.
942  * @w:		work object.
943  * @cancel:	The connection will be closed anyway
944  */
945 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
946 {
947 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
948 	int ok;
949 
950 	if (unlikely(cancel)) {
951 		drbd_free_ee(mdev, e);
952 		dec_unacked(mdev);
953 		return 1;
954 	}
955 
956 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
957 		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
958 	} else {
959 		if (__ratelimit(&drbd_ratelimit_state))
960 			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
961 			    (unsigned long long)e->sector);
962 
963 		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
964 	}
965 
966 	dec_unacked(mdev);
967 
968 	move_to_net_ee_or_free(mdev, e);
969 
970 	if (unlikely(!ok))
971 		dev_err(DEV, "drbd_send_block() failed\n");
972 	return ok;
973 }
974 
975 /**
976  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
977  * @mdev:	DRBD device.
978  * @w:		work object.
979  * @cancel:	The connection will be closed anyway
980  */
981 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
982 {
983 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
984 	int ok;
985 
986 	if (unlikely(cancel)) {
987 		drbd_free_ee(mdev, e);
988 		dec_unacked(mdev);
989 		return 1;
990 	}
991 
992 	if (get_ldev_if_state(mdev, D_FAILED)) {
993 		drbd_rs_complete_io(mdev, e->sector);
994 		put_ldev(mdev);
995 	}
996 
997 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
998 		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
999 			inc_rs_pending(mdev);
1000 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1001 		} else {
1002 			if (__ratelimit(&drbd_ratelimit_state))
1003 				dev_err(DEV, "Not sending RSDataReply, "
1004 				    "partner DISKLESS!\n");
1005 			ok = 1;
1006 		}
1007 	} else {
1008 		if (__ratelimit(&drbd_ratelimit_state))
1009 			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1010 			    (unsigned long long)e->sector);
1011 
1012 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1013 
1014 		/* update resync data with failure */
1015 		drbd_rs_failed_io(mdev, e->sector, e->size);
1016 	}
1017 
1018 	dec_unacked(mdev);
1019 
1020 	move_to_net_ee_or_free(mdev, e);
1021 
1022 	if (unlikely(!ok))
1023 		dev_err(DEV, "drbd_send_block() failed\n");
1024 	return ok;
1025 }
1026 
1027 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1028 {
1029 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1030 	struct digest_info *di;
1031 	int digest_size;
1032 	void *digest = NULL;
1033 	int ok, eq = 0;
1034 
1035 	if (unlikely(cancel)) {
1036 		drbd_free_ee(mdev, e);
1037 		dec_unacked(mdev);
1038 		return 1;
1039 	}
1040 
1041 	if (get_ldev(mdev)) {
1042 		drbd_rs_complete_io(mdev, e->sector);
1043 		put_ldev(mdev);
1044 	}
1045 
1046 	di = e->digest;
1047 
1048 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1049 		/* quick hack to try to avoid a race against reconfiguration.
1050 		 * a real fix would be much more involved,
1051 		 * introducing more locking mechanisms */
1052 		if (mdev->csums_tfm) {
1053 			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1054 			D_ASSERT(digest_size == di->digest_size);
1055 			digest = kmalloc(digest_size, GFP_NOIO);
1056 		}
1057 		if (digest) {
1058 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1059 			eq = !memcmp(digest, di->digest, digest_size);
1060 			kfree(digest);
1061 		}
1062 
1063 		if (eq) {
1064 			drbd_set_in_sync(mdev, e->sector, e->size);
1065 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1066 			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1067 			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1068 		} else {
1069 			inc_rs_pending(mdev);
1070 			e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1071 			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1072 			kfree(di);
1073 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1074 		}
1075 	} else {
1076 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1077 		if (__ratelimit(&drbd_ratelimit_state))
1078 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1079 	}
1080 
1081 	dec_unacked(mdev);
1082 	move_to_net_ee_or_free(mdev, e);
1083 
1084 	if (unlikely(!ok))
1085 		dev_err(DEV, "drbd_send_block/ack() failed\n");
1086 	return ok;
1087 }
1088 
1089 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1090 {
1091 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1092 	int digest_size;
1093 	void *digest;
1094 	int ok = 1;
1095 
1096 	if (unlikely(cancel))
1097 		goto out;
1098 
1099 	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1100 		goto out;
1101 
1102 	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1103 	/* FIXME if this allocation fails, online verify will not terminate! */
1104 	digest = kmalloc(digest_size, GFP_NOIO);
1105 	if (digest) {
1106 		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1107 		inc_rs_pending(mdev);
1108 		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1109 					     digest, digest_size, P_OV_REPLY);
1110 		if (!ok)
1111 			dec_rs_pending(mdev);
1112 		kfree(digest);
1113 	}
1114 
1115 out:
1116 	drbd_free_ee(mdev, e);
1117 
1118 	dec_unacked(mdev);
1119 
1120 	return ok;
1121 }
1122 
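/* Record a block that online verify found out of sync: extend the current
 * contiguous out-of-sync range if possible, mark the bits in the bitmap and
 * remember to write the bitmap out once this run is finished. */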
1123 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1124 {
1125 	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1126 		mdev->ov_last_oos_size += size>>9;
1127 	} else {
1128 		mdev->ov_last_oos_start = sector;
1129 		mdev->ov_last_oos_size = size>>9;
1130 	}
1131 	drbd_set_out_of_sync(mdev, sector, size);
1132 	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1133 }
1134 
1135 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1136 {
1137 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1138 	struct digest_info *di;
1139 	int digest_size;
1140 	void *digest;
1141 	int ok, eq = 0;
1142 
1143 	if (unlikely(cancel)) {
1144 		drbd_free_ee(mdev, e);
1145 		dec_unacked(mdev);
1146 		return 1;
1147 	}
1148 
1149 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1150 	 * the resync lru has been cleaned up already */
1151 	if (get_ldev(mdev)) {
1152 		drbd_rs_complete_io(mdev, e->sector);
1153 		put_ldev(mdev);
1154 	}
1155 
1156 	di = e->digest;
1157 
1158 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1159 		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1160 		digest = kmalloc(digest_size, GFP_NOIO);
1161 		if (digest) {
1162 			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1163 
1164 			D_ASSERT(digest_size == di->digest_size);
1165 			eq = !memcmp(digest, di->digest, digest_size);
1166 			kfree(digest);
1167 		}
1168 	} else {
1169 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1170 		if (__ratelimit(&drbd_ratelimit_state))
1171 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1172 	}
1173 
1174 	dec_unacked(mdev);
1175 	if (!eq)
1176 		drbd_ov_oos_found(mdev, e->sector, e->size);
1177 	else
1178 		ov_oos_print(mdev);
1179 
1180 	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1181 			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1182 
1183 	drbd_free_ee(mdev, e);
1184 
1185 	--mdev->ov_left;
1186 
1187 	/* let's advance progress step marks only for every other megabyte */
1188 	if ((mdev->ov_left & 0x200) == 0x200)
1189 		drbd_advance_rs_marks(mdev, mdev->ov_left);
1190 
1191 	if (mdev->ov_left == 0) {
1192 		ov_oos_print(mdev);
1193 		drbd_resync_finished(mdev);
1194 	}
1195 
1196 	return ok;
1197 }
1198 
1199 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1200 {
1201 	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1202 	complete(&b->done);
1203 	return 1;
1204 }
1205 
1206 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1207 {
1208 	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1209 	struct p_barrier *p = &mdev->data.sbuf.barrier;
1210 	int ok = 1;
1211 
1212 	/* really avoid racing with tl_clear.  w.cb may have been referenced
1213 	 * just before it was reassigned and re-queued, so double check that.
1214 	 * actually, this race was harmless, since we only try to send the
1215 	 * barrier packet here, and otherwise do nothing with the object.
1216 	 * but compare with the head of w_clear_epoch */
1217 	spin_lock_irq(&mdev->req_lock);
1218 	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1219 		cancel = 1;
1220 	spin_unlock_irq(&mdev->req_lock);
1221 	if (cancel)
1222 		return 1;
1223 
1224 	if (!drbd_get_data_sock(mdev))
1225 		return 0;
1226 	p->barrier = b->br_number;
1227 	/* inc_ap_pending was done where this was queued.
1228 	 * dec_ap_pending will be done in got_BarrierAck
1229 	 * or (on connection loss) in w_clear_epoch.  */
1230 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1231 				(struct p_header80 *)p, sizeof(*p), 0);
1232 	drbd_put_data_sock(mdev);
1233 
1234 	return ok;
1235 }
1236 
1237 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1238 {
1239 	if (cancel)
1240 		return 1;
1241 	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1242 }
1243 
1244 /**
1245  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1246  * @mdev:	DRBD device.
1247  * @w:		work object.
1248  * @cancel:	The connection will be closed anyway
1249  */
1250 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1251 {
1252 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1253 	int ok;
1254 
1255 	if (unlikely(cancel)) {
1256 		req_mod(req, send_canceled);
1257 		return 1;
1258 	}
1259 
1260 	ok = drbd_send_dblock(mdev, req);
1261 	req_mod(req, ok ? handed_over_to_network : send_failed);
1262 
1263 	return ok;
1264 }
1265 
1266 /**
1267  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1268  * @mdev:	DRBD device.
1269  * @w:		work object.
1270  * @cancel:	The connection will be closed anyway
1271  */
1272 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1273 {
1274 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1275 	int ok;
1276 
1277 	if (unlikely(cancel)) {
1278 		req_mod(req, send_canceled);
1279 		return 1;
1280 	}
1281 
1282 	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1283 				(unsigned long)req);
1284 
1285 	if (!ok) {
1286 		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1287 		 * so this is probably redundant */
1288 		if (mdev->state.conn >= C_CONNECTED)
1289 			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1290 	}
1291 	req_mod(req, ok ? handed_over_to_network : send_failed);
1292 
1293 	return ok;
1294 }
1295 
1296 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1297 {
1298 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1299 
1300 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1301 		drbd_al_begin_io(mdev, req->sector);
1302 	/* Calling drbd_al_begin_io() out of the worker might deadlock
1303 	   theoretically. Practically it cannot deadlock, since this is
1304 	   only used when unfreezing IOs. All the extents of the requests
1305 	   that made it into the TL are already active */
1306 
1307 	drbd_req_make_private_bio(req, req->master_bio);
1308 	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1309 	generic_make_request(req->private_bio);
1310 
1311 	return 1;
1312 }
1313 
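/*
 * _drbd_may_sync_now() - evaluate the sync-after dependency chain
 *
 * Returns 1 if no device that @mdev (transitively) syncs after is currently
 * resyncing or paused, 0 otherwise.  Callers hold the global_state_lock, so
 * the states of all devices in the chain are stable.
 */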
1314 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1315 {
1316 	struct drbd_conf *odev = mdev;
1317 
1318 	while (1) {
1319 		if (odev->sync_conf.after == -1)
1320 			return 1;
1321 		odev = minor_to_mdev(odev->sync_conf.after);
1322 		ERR_IF(!odev) return 1;
1323 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1324 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1325 		    odev->state.aftr_isp || odev->state.peer_isp ||
1326 		    odev->state.user_isp)
1327 			return 0;
1328 	}
1329 }
1330 
1331 /**
1332  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1333  * @mdev:	DRBD device.
1334  *
1335  * Called from process context only (admin command and after_state_ch).
1336  */
1337 static int _drbd_pause_after(struct drbd_conf *mdev)
1338 {
1339 	struct drbd_conf *odev;
1340 	int i, rv = 0;
1341 
1342 	for (i = 0; i < minor_count; i++) {
1343 		odev = minor_to_mdev(i);
1344 		if (!odev)
1345 			continue;
1346 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1347 			continue;
1348 		if (!_drbd_may_sync_now(odev))
1349 			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1350 			       != SS_NOTHING_TO_DO);
1351 	}
1352 
1353 	return rv;
1354 }
1355 
1356 /**
1357  * _drbd_resume_next() - Resume resync on all devices that may resync now
1358  * @mdev:	DRBD device.
1359  *
1360  * Called from process context only (admin command and worker).
1361  */
1362 static int _drbd_resume_next(struct drbd_conf *mdev)
1363 {
1364 	struct drbd_conf *odev;
1365 	int i, rv = 0;
1366 
1367 	for (i = 0; i < minor_count; i++) {
1368 		odev = minor_to_mdev(i);
1369 		if (!odev)
1370 			continue;
1371 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1372 			continue;
1373 		if (odev->state.aftr_isp) {
1374 			if (_drbd_may_sync_now(odev))
1375 				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1376 							CS_HARD, NULL)
1377 				       != SS_NOTHING_TO_DO) ;
1378 		}
1379 	}
1380 	return rv;
1381 }
1382 
1383 void resume_next_sg(struct drbd_conf *mdev)
1384 {
1385 	write_lock_irq(&global_state_lock);
1386 	_drbd_resume_next(mdev);
1387 	write_unlock_irq(&global_state_lock);
1388 }
1389 
1390 void suspend_other_sg(struct drbd_conf *mdev)
1391 {
1392 	write_lock_irq(&global_state_lock);
1393 	_drbd_pause_after(mdev);
1394 	write_unlock_irq(&global_state_lock);
1395 }
1396 
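/*
 * sync_after_error() - validate a new sync-after dependency
 *
 * Returns NO_ERROR if syncing after minor @o_minor is acceptable,
 * ERR_SYNC_AFTER if @o_minor is invalid or has no device, and
 * ERR_SYNC_AFTER_CYCLE if following the chain from @o_minor would lead
 * back to @mdev.  Called with global_state_lock held for writing,
 * see drbd_alter_sa().
 */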
1397 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1398 {
1399 	struct drbd_conf *odev;
1400 
1401 	if (o_minor == -1)
1402 		return NO_ERROR;
1403 	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1404 		return ERR_SYNC_AFTER;
1405 
1406 	/* check for loops */
1407 	odev = minor_to_mdev(o_minor);
1408 	while (1) {
1409 		if (odev == mdev)
1410 			return ERR_SYNC_AFTER_CYCLE;
1411 
1412 		/* dependency chain ends here, no cycles. */
1413 		if (odev->sync_conf.after == -1)
1414 			return NO_ERROR;
1415 
1416 		/* follow the dependency chain */
1417 		odev = minor_to_mdev(odev->sync_conf.after);
1418 	}
1419 }
1420 
1421 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1422 {
1423 	int changes;
1424 	int retcode;
1425 
1426 	write_lock_irq(&global_state_lock);
1427 	retcode = sync_after_error(mdev, na);
1428 	if (retcode == NO_ERROR) {
1429 		mdev->sync_conf.after = na;
1430 		do {
1431 			changes  = _drbd_pause_after(mdev);
1432 			changes |= _drbd_resume_next(mdev);
1433 		} while (changes);
1434 	}
1435 	write_unlock_irq(&global_state_lock);
1436 	return retcode;
1437 }
1438 
1439 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1440 {
1441 	atomic_set(&mdev->rs_sect_in, 0);
1442 	atomic_set(&mdev->rs_sect_ev, 0);
1443 	mdev->rs_in_flight = 0;
1444 	mdev->rs_planed = 0;
1445 	spin_lock(&mdev->peer_seq_lock);
1446 	fifo_set(&mdev->rs_plan_s, 0);
1447 	spin_unlock(&mdev->peer_seq_lock);
1448 }
1449 
1450 /**
1451  * drbd_start_resync() - Start the resync process
1452  * @mdev:	DRBD device.
1453  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1454  *
1455  * This function might bring you directly into one of the
1456  * C_PAUSED_SYNC_* states.
1457  */
1458 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1459 {
1460 	union drbd_state ns;
1461 	int r;
1462 
1463 	if (mdev->state.conn >= C_SYNC_SOURCE) {
1464 		dev_err(DEV, "Resync already running!\n");
1465 		return;
1466 	}
1467 
1468 	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1469 	drbd_rs_cancel_all(mdev);
1470 
1471 	if (side == C_SYNC_TARGET) {
1472 		/* Since application IO was locked out during C_WF_BITMAP_T and
1473 		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1474 		   we check whether we may make the data inconsistent. */
1475 		r = drbd_khelper(mdev, "before-resync-target");
1476 		r = (r >> 8) & 0xff;
1477 		if (r > 0) {
1478 			dev_info(DEV, "before-resync-target handler returned %d, "
1479 			     "dropping connection.\n", r);
1480 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1481 			return;
1482 		}
1483 	}
1484 
1485 	drbd_state_lock(mdev);
1486 
1487 	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1488 		drbd_state_unlock(mdev);
1489 		return;
1490 	}
1491 
1492 	if (side == C_SYNC_TARGET) {
1493 		mdev->bm_resync_fo = 0;
1494 	} else /* side == C_SYNC_SOURCE */ {
1495 		u64 uuid;
1496 
1497 		get_random_bytes(&uuid, sizeof(u64));
1498 		drbd_uuid_set(mdev, UI_BITMAP, uuid);
1499 		drbd_send_sync_uuid(mdev, uuid);
1500 
1501 		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1502 	}
1503 
1504 	write_lock_irq(&global_state_lock);
1505 	ns = mdev->state;
1506 
1507 	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1508 
1509 	ns.conn = side;
1510 
1511 	if (side == C_SYNC_TARGET)
1512 		ns.disk = D_INCONSISTENT;
1513 	else /* side == C_SYNC_SOURCE */
1514 		ns.pdsk = D_INCONSISTENT;
1515 
1516 	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1517 	ns = mdev->state;
1518 
1519 	if (ns.conn < C_CONNECTED)
1520 		r = SS_UNKNOWN_ERROR;
1521 
1522 	if (r == SS_SUCCESS) {
1523 		unsigned long tw = drbd_bm_total_weight(mdev);
1524 		unsigned long now = jiffies;
1525 		int i;
1526 
1527 		mdev->rs_failed    = 0;
1528 		mdev->rs_paused    = 0;
1529 		mdev->rs_same_csum = 0;
1530 		mdev->rs_last_events = 0;
1531 		mdev->rs_last_sect_ev = 0;
1532 		mdev->rs_total     = tw;
1533 		mdev->rs_start     = now;
1534 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1535 			mdev->rs_mark_left[i] = tw;
1536 			mdev->rs_mark_time[i] = now;
1537 		}
1538 		_drbd_pause_after(mdev);
1539 	}
1540 	write_unlock_irq(&global_state_lock);
1541 	put_ldev(mdev);
1542 
1543 	if (r == SS_SUCCESS) {
1544 		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1545 		     drbd_conn_str(ns.conn),
1546 		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1547 		     (unsigned long) mdev->rs_total);
1548 
1549 		if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1550 			/* This still has a race (about when exactly the peers
1551 			 * detect connection loss) that can lead to a full sync
1552 			 * on next handshake. In 8.3.9 we fixed this with explicit
1553 			 * resync-finished notifications, but the fix
1554 			 * introduces a protocol change.  Sleeping for some
1555 			 * time longer than the ping interval + timeout on the
1556 			 * SyncSource, to give the SyncTarget the chance to
1557 			 * detect connection loss, then waiting for a ping
1558 			 * response (implicit in drbd_resync_finished) reduces
1559 			 * the race considerably, but does not solve it. */
1560 			if (side == C_SYNC_SOURCE)
1561 				schedule_timeout_interruptible(
1562 					mdev->net_conf->ping_int * HZ +
1563 					mdev->net_conf->ping_timeo*HZ/9);
1564 			drbd_resync_finished(mdev);
1565 		}
1566 
1567 		drbd_rs_controller_reset(mdev);
1568 		/* ns.conn may already be != mdev->state.conn,
1569 		 * we may have been paused in between, or become paused until
1570 		 * the timer triggers.
1571 		 * No matter, that is handled in resync_timer_fn() */
1572 		if (ns.conn == C_SYNC_TARGET)
1573 			mod_timer(&mdev->resync_timer, jiffies);
1574 
1575 		drbd_md_sync(mdev);
1576 	}
1577 	drbd_state_unlock(mdev);
1578 }
1579 
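/*
 * drbd_worker() - main loop of the per-device "drbd%d_worker" thread
 *
 * Waits on the work queue semaphore (uncorking the data socket while idle
 * and corking it again before processing), runs queued work callbacks, and
 * forces C_NETWORK_FAILURE if a callback fails while we still think we are
 * connected.  On shutdown all remaining work is run with cancel set, the
 * receiver thread is stopped and the device state is cleaned up.
 */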
1580 int drbd_worker(struct drbd_thread *thi)
1581 {
1582 	struct drbd_conf *mdev = thi->mdev;
1583 	struct drbd_work *w = NULL;
1584 	LIST_HEAD(work_list);
1585 	int intr = 0, i;
1586 
1587 	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1588 
1589 	while (get_t_state(thi) == Running) {
1590 		drbd_thread_current_set_cpu(mdev);
1591 
1592 		if (down_trylock(&mdev->data.work.s)) {
1593 			mutex_lock(&mdev->data.mutex);
1594 			if (mdev->data.socket && !mdev->net_conf->no_cork)
1595 				drbd_tcp_uncork(mdev->data.socket);
1596 			mutex_unlock(&mdev->data.mutex);
1597 
1598 			intr = down_interruptible(&mdev->data.work.s);
1599 
1600 			mutex_lock(&mdev->data.mutex);
1601 			if (mdev->data.socket  && !mdev->net_conf->no_cork)
1602 				drbd_tcp_cork(mdev->data.socket);
1603 			mutex_unlock(&mdev->data.mutex);
1604 		}
1605 
1606 		if (intr) {
1607 			D_ASSERT(intr == -EINTR);
1608 			flush_signals(current);
1609 			ERR_IF (get_t_state(thi) == Running)
1610 				continue;
1611 			break;
1612 		}
1613 
1614 		if (get_t_state(thi) != Running)
1615 			break;
1616 		/* With this break, we have done a down() but not consumed
1617 		   the entry from the list. The cleanup code takes care of
1618 		   this...   */
1619 
1620 		w = NULL;
1621 		spin_lock_irq(&mdev->data.work.q_lock);
1622 		ERR_IF(list_empty(&mdev->data.work.q)) {
1623 			/* something terribly wrong in our logic.
1624 			 * we were able to down() the semaphore,
1625 			 * but the list is empty... doh.
1626 			 *
1627 			 * what is the best thing to do now?
1628 			 * try again from scratch, restarting the receiver,
1629 			 * asender, whatnot? could break even more ugly,
1630 			 * e.g. when we are primary, but no good local data.
1631 			 *
1632 			 * I'll try to get away just starting over this loop.
1633 			 */
1634 			spin_unlock_irq(&mdev->data.work.q_lock);
1635 			continue;
1636 		}
1637 		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1638 		list_del_init(&w->list);
1639 		spin_unlock_irq(&mdev->data.work.q_lock);
1640 
1641 		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1642 			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1643 			if (mdev->state.conn >= C_CONNECTED)
1644 				drbd_force_state(mdev,
1645 						NS(conn, C_NETWORK_FAILURE));
1646 		}
1647 	}
1648 	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1649 	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1650 
1651 	spin_lock_irq(&mdev->data.work.q_lock);
1652 	i = 0;
1653 	while (!list_empty(&mdev->data.work.q)) {
1654 		list_splice_init(&mdev->data.work.q, &work_list);
1655 		spin_unlock_irq(&mdev->data.work.q_lock);
1656 
1657 		while (!list_empty(&work_list)) {
1658 			w = list_entry(work_list.next, struct drbd_work, list);
1659 			list_del_init(&w->list);
1660 			w->cb(mdev, w, 1);
1661 			i++; /* dead debugging code */
1662 		}
1663 
1664 		spin_lock_irq(&mdev->data.work.q_lock);
1665 	}
1666 	sema_init(&mdev->data.work.s, 0);
1667 	/* DANGEROUS race: if someone queued their work while holding the spinlock,
1668 	 * but called up() outside the spinlock, we could get an up() on the
1669 	 * semaphore without a corresponding list entry.
1670 	 * So don't do that.
1671 	 */
1672 	spin_unlock_irq(&mdev->data.work.q_lock);
1673 
1674 	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1675 	/* _drbd_set_state only uses stop_nowait.
1676 	 * wait here for the Exiting receiver. */
1677 	drbd_thread_stop(&mdev->receiver);
1678 	drbd_mdev_cleanup(mdev);
1679 
1680 	dev_info(DEV, "worker terminated\n");
1681 
1682 	clear_bit(DEVICE_DYING, &mdev->flags);
1683 	clear_bit(CONFIG_PENDING, &mdev->flags);
1684 	wake_up(&mdev->state_wait);
1685 
1686 	return 0;
1687 }
1688