/*
   drbd_worker.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

 */

#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/sched.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_int.h"
#include "drbd_req.h"

static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
static int w_make_resync_request(struct drbd_conf *mdev,
				 struct drbd_work *w, int cancel);



/* endio handlers:
 *   drbd_md_io_complete (defined here)
 *   drbd_endio_pri (defined here)
 *   drbd_endio_sec (defined here)
 *   bm_async_io_complete (defined in drbd_bitmap.c)
 *
 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 *
 */

/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync-after dependencies, we grab a write lock, because
   we need stable states on all devices for that.  */
rwlock_t global_state_lock;

/* used for synchronous meta data and bitmap IO
 * submitted by drbd_md_sync_page_io()
 */
void drbd_md_io_complete(struct bio *bio, int error)
{
	struct drbd_md_io *md_io;
	struct drbd_conf *mdev;

	md_io = (struct drbd_md_io *)bio->bi_private;
	mdev = container_of(md_io, struct drbd_conf, md_io);

	md_io->error = error;

	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
	 * to timeout on the lower level device, and eventually detach from it.
	 * If this io completion runs after that timeout expired, this
	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
	 * During normal operation, this only puts that extra reference
	 * down to 1 again.
	 * Make sure we first drop the reference, and only then signal
	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
	 * next drbd_md_sync_page_io(), that we trigger the
	 * ASSERT(atomic_read(&mdev->md_io_in_use) == 1) there.
	 */
	drbd_md_put_buffer(mdev);
	md_io->done = 1;
	wake_up(&mdev->misc_wait);
	bio_put(bio);
	put_ldev(mdev);
}

/* reads on behalf of the partner,
 * "submitted" by the receiver
 */
void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = e->mdev;

	D_ASSERT(e->block_id != ID_VACANT);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->read_cnt += e->size >> 9;
	list_del(&e->w.list);
	if (list_empty(&mdev->read_ee))
		wake_up(&mdev->ee_wait);
	if (test_bit(__EE_WAS_ERROR, &e->flags))
		__drbd_chk_io_error(mdev, false);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	drbd_queue_work(&mdev->data.work, &e->w);
	put_ldev(mdev);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage.  */
static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = e->mdev;
	sector_t e_sector;
	int do_wake;
	int is_syncer_req;
	int do_al_complete_io;

	D_ASSERT(e->block_id != ID_VACANT);

	/* after we moved e to done_ee,
	 * we may no longer access it,
	 * it may be freed/reused already!
	 * (as soon as we release the req_lock) */
	e_sector = e->sector;
	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
	is_syncer_req = is_syncer_block_id(e->block_id);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->writ_cnt += e->size >> 9;
	list_del(&e->w.list); /* has been on active_ee or sync_ee */
	list_add_tail(&e->w.list, &mdev->done_ee);

	/* No hlist_del_init(&e->collision) here, we did not send the Ack yet,
	 * neither did we wake possibly waiting conflicting requests.
	 * done from "drbd_process_done_ee" within the appropriate w.cb
	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */

	do_wake = is_syncer_req
		? list_empty(&mdev->sync_ee)
		: list_empty(&mdev->active_ee);

	if (test_bit(__EE_WAS_ERROR, &e->flags))
		__drbd_chk_io_error(mdev, false);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	if (is_syncer_req)
		drbd_rs_complete_io(mdev, e_sector);

	if (do_wake)
		wake_up(&mdev->ee_wait);

	if (do_al_complete_io)
		drbd_al_complete_io(mdev, e_sector);

	wake_asender(mdev);
	put_ldev(mdev);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver.
 */
void drbd_endio_sec(struct bio *bio, int error)
{
	struct drbd_epoch_entry *e = bio->bi_private;
	struct drbd_conf *mdev = e->mdev;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);
	int is_write = bio_data_dir(bio) == WRITE;

	if (error && __ratelimit(&drbd_ratelimit_state))
		dev_warn(DEV, "%s: error=%d s=%llus\n",
				is_write ? "write" : "read", error,
				(unsigned long long)e->sector);
	if (!error && !uptodate) {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
					is_write ? "write" : "read",
					(unsigned long long)e->sector);
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	if (error)
		set_bit(__EE_WAS_ERROR, &e->flags);

	bio_put(bio); /* no need for the bio anymore */
	if (atomic_dec_and_test(&e->pending_bios)) {
		if (is_write)
			drbd_endio_write_sec_final(e);
		else
			drbd_endio_read_sec_final(e);
	}
}

/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
void drbd_endio_pri(struct bio *bio, int error)
{
	unsigned long flags;
	struct drbd_request *req = bio->bi_private;
	struct drbd_conf *mdev = req->mdev;
	struct bio_and_error m;
	enum drbd_req_event what;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);

	if (!error && !uptodate) {
		dev_warn(DEV, "p %s: setting error to -EIO\n",
			 bio_data_dir(bio) == WRITE ? "write" : "read");
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	/* to avoid recursion in __req_mod */
	if (unlikely(error)) {
		what = (bio_data_dir(bio) == WRITE)
			? write_completed_with_error
			: (bio_rw(bio) == READ)
			  ? read_completed_with_error
			  : read_ahead_completed_with_error;
	} else
		what = completed_ok;

	bio_put(req->private_bio);
	req->private_bio = ERR_PTR(error);

	/* not req_mod(), we need irqsave here! */
	spin_lock_irqsave(&mdev->req_lock, flags);
	__req_mod(req, what, &m);
	spin_unlock_irqrestore(&mdev->req_lock, flags);
	put_ldev(mdev);

	if (m.bio)
		complete_master_bio(mdev, &m);
}

int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	/* We should not detach for read io-error,
	 * but try to WRITE the P_DATA_REPLY to the failed location,
	 * to give the disk the chance to relocate that block */

	spin_lock_irq(&mdev->req_lock);
	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
		_req_mod(req, read_retry_remote_canceled);
		spin_unlock_irq(&mdev->req_lock);
		return 1;
	}
	spin_unlock_irq(&mdev->req_lock);

	return w_send_read_req(mdev, w, 0);
}

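/* Hash the payload of an epoch entry / of a bio with the given transform.
 * drbd_csum_ee() walks the page chain of the epoch entry (all but the last
 * page are fully used), drbd_csum_bio() walks the bio's segments.  The
 * digest buffer must hold at least crypto_hash_digestsize(tfm) bytes. */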
void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct page *page = e->pages;
	struct page *tmp;
	unsigned len;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	while ((tmp = page_chain_next(page))) {
		/* all but the last page will be fully used */
		sg_set_page(&sg, page, PAGE_SIZE, 0);
		crypto_hash_update(&desc, &sg, sg.length);
		page = tmp;
	}
	/* and now the last, possibly only partially used page */
	len = e->size & (PAGE_SIZE - 1);
	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
	crypto_hash_update(&desc, &sg, sg.length);
	crypto_hash_final(&desc, digest);
}

void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct bio_vec *bvec;
	int i;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	bio_for_each_segment(bvec, bio, i) {
		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
		crypto_hash_update(&desc, &sg, sg.length);
	}
	crypto_hash_final(&desc, digest);
}

/* TODO merge common code with w_e_end_ov_req */
int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int digest_size;
	void *digest;
	int ok = 1;

	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);

	if (unlikely(cancel))
		goto out;

	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
		goto out;

	digest_size = crypto_hash_digestsize(mdev->csums_tfm);
	digest = kmalloc(digest_size, GFP_NOIO);
	if (digest) {
		sector_t sector = e->sector;
		unsigned int size = e->size;
		drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
		/* Free e and pages before send.
		 * In case we block on congestion, we could otherwise run into
		 * some distributed deadlock, if the other side blocks on
		 * congestion as well, because our receiver blocks in
		 * drbd_pp_alloc due to pp_in_use > max_buffers. */
		drbd_free_ee(mdev, e);
		e = NULL;
		inc_rs_pending(mdev);
		ok = drbd_send_drequest_csum(mdev, sector, size,
					     digest, digest_size,
					     P_CSUM_RS_REQUEST);
		kfree(digest);
	} else {
		dev_err(DEV, "kmalloc() of digest failed.\n");
		ok = 0;
	}

out:
	if (e)
		drbd_free_ee(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
	return ok;
}

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

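/* Submit a local read for the checksum based resync path: allocate an epoch
 * entry for [sector, sector+size), queue it on read_ee and submit it; once
 * the read completes, w_e_send_csum() hashes the data and sends the digest
 * to the peer.  Returns 0 on success, -EAGAIN if the request should be
 * retried later, -EIO if the local disk is gone. */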
static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
{
	struct drbd_epoch_entry *e;

	if (!get_ldev(mdev))
		return -EIO;

	if (drbd_rs_should_slow_down(mdev, sector))
		goto defer;

	/* GFP_TRY, because if there is no memory available right now, this may
	 * be rescheduled for later. It is "only" background resync, after all. */
	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
	if (!e)
		goto defer;

	e->w.cb = w_e_send_csum;
	spin_lock_irq(&mdev->req_lock);
	list_add(&e->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->req_lock);

	atomic_add(size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
		return 0;

	/* If it failed because of ENOMEM, retry should help.  If it failed
	 * because bio_add_page failed (probably broken lower level driver),
	 * retry may or may not help.
	 * If it does not, you may need to force disconnect. */
	spin_lock_irq(&mdev->req_lock);
	list_del(&e->w.list);
	spin_unlock_irq(&mdev->req_lock);

	drbd_free_ee(mdev, e);
defer:
	put_ldev(mdev);
	return -EAGAIN;
}

int w_resync_timer(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	switch (mdev->state.conn) {
	case C_VERIFY_S:
		w_make_ov_request(mdev, w, cancel);
		break;
	case C_SYNC_TARGET:
		w_make_resync_request(mdev, w, cancel);
		break;
	}

	return 1;
}

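/* Timer callback: queue the resync work item, unless it is already queued.
 * The timer itself is re-armed from w_make_resync_request() and
 * w_make_ov_request() via mod_timer(..., jiffies + SLEEP_TIME). */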
void resync_timer_fn(unsigned long data)
{
	struct drbd_conf *mdev = (struct drbd_conf *) data;

	if (list_empty(&mdev->resync_work.list))
		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
}

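/* Small fixed-size ring buffer helpers for the resync controller's
 * "plan ahead" fifo (mdev->rs_plan_s):
 *   fifo_set()     - fill all slots with one value
 *   fifo_push()    - return the value at head_index, store the new value
 *                    there, and advance head_index (wrapping around)
 *   fifo_add_val() - add a value to every slot
 * Callers serialize access, here via mdev->peer_seq_lock. */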
static void fifo_set(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] = value;
}

static int fifo_push(struct fifo_buffer *fb, int value)
{
	int ov;

	ov = fb->values[fb->head_index];
	fb->values[fb->head_index++] = value;

	if (fb->head_index >= fb->size)
		fb->head_index = 0;

	return ov;
}

static void fifo_add_val(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] += value;
}

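/* Dynamic resync speed controller.  Each SLEEP_TIME tick it compares the
 * amount of resync data currently in flight (requested but not yet answered,
 * plus what is still planned in rs_plan_s) against a target fill level, and
 * returns how many sectors to request during the next tick.
 *
 * Rough illustration (numbers invented, not authoritative): with
 * c_fill_target = 1000 sectors, 600 sectors in flight and 200 still planned,
 * the correction is 1000 - 600 - 200 = 200 sectors; that correction is
 * spread over the plan-ahead fifo, and only the share popped off the fifo
 * this tick (plus the sectors that just came in) makes up the next request
 * size. */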
static int drbd_rs_controller(struct drbd_conf *mdev)
{
	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
	unsigned int want;     /* The number of sectors we want in the proxy */
	int req_sect; /* Number of sectors to request in this turn */
	int correction; /* Number of sectors more we need in the proxy*/
	int cps; /* correction per invocation of drbd_rs_controller() */
	int steps; /* Number of time steps to plan ahead */
	int curr_corr;
	int max_sect;

	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
	mdev->rs_in_flight -= sect_in;

	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */

	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */

	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
	} else { /* normal path */
		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
	}

	correction = want - mdev->rs_in_flight - mdev->rs_planed;

	/* Plan ahead */
	cps = correction / steps;
	fifo_add_val(&mdev->rs_plan_s, cps);
	mdev->rs_planed += cps * steps;

	/* What we do in this step */
	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
	spin_unlock(&mdev->peer_seq_lock);
	mdev->rs_planed -= curr_corr;

	req_sect = sect_in + curr_corr;
	if (req_sect < 0)
		req_sect = 0;

	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
	if (req_sect > max_sect)
		req_sect = max_sect;

	/*
	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
		 sect_in, mdev->rs_in_flight, want, correction,
		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
	*/

	return req_sect;
}

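/* Convert the controller output (or the fixed sync rate, if no plan-ahead is
 * configured) into the number of BM_BLOCK_SIZE sized requests to generate
 * during the next SLEEP_TIME interval, and keep c_sync_rate (the currently
 * wanted sync rate in KiB/s, used for status reporting) up to date. */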
static int drbd_rs_number_requests(struct drbd_conf *mdev)
{
	int number;
	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
	} else {
		mdev->c_sync_rate = mdev->sync_conf.rate;
		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
	}

	/* ignore the amount of pending requests, the resync controller should
	 * throttle down to incoming reply rate soon enough anyways. */
	return number;
}

static int w_make_resync_request(struct drbd_conf *mdev,
				 struct drbd_work *w, int cancel)
{
	unsigned long bit;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	int max_bio_size;
	int number, rollback_i, size;
	int align, queued, sndbuf;
	int i = 0;

	if (unlikely(cancel))
		return 1;

	if (mdev->rs_total == 0) {
		/* empty resync? */
		drbd_resync_finished(mdev);
		return 1;
	}

	if (!get_ldev(mdev)) {
		/* Since we only need to access mdev->rsync, a
		   get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
		   continuing the resync with a broken disk makes no sense at
		   all */
		dev_err(DEV, "Disk broke down during resync!\n");
		return 1;
	}

	max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
	number = drbd_rs_number_requests(mdev);
	if (number == 0)
		goto requeue;

	for (i = 0; i < number; i++) {
		/* Stop generating RS requests, when half of the send buffer is filled */
		mutex_lock(&mdev->data.mutex);
		if (mdev->data.socket) {
			queued = mdev->data.socket->sk->sk_wmem_queued;
			sndbuf = mdev->data.socket->sk->sk_sndbuf;
		} else {
			queued = 1;
			sndbuf = 0;
		}
		mutex_unlock(&mdev->data.mutex);
		if (queued > sndbuf / 2)
			goto requeue;

next_sector:
		size = BM_BLOCK_SIZE;
		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);

		if (bit == DRBD_END_OF_BITMAP) {
			mdev->bm_resync_fo = drbd_bm_bits(mdev);
			put_ldev(mdev);
			return 1;
		}

		sector = BM_BIT_TO_SECT(bit);

		if (drbd_rs_should_slow_down(mdev, sector) ||
		    drbd_try_rs_begin_io(mdev, sector)) {
			mdev->bm_resync_fo = bit;
			goto requeue;
		}
		mdev->bm_resync_fo = bit + 1;

		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
			drbd_rs_complete_io(mdev, sector);
			goto next_sector;
		}

#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop if we have already the maximum req size.
		 *
		 * Additionally always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
		 */
		align = 1;
		rollback_i = i;
		for (;;) {
			if (size + BM_BLOCK_SIZE > max_bio_size)
				break;

			/* Be always aligned */
			if (sector & ((1<<(align+3))-1))
				break;

			/* do not cross extent boundaries */
			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
				break;
			/* now, is it actually dirty, after all?
			 * caution, drbd_bm_test_bit is tri-state for some
			 * obscure reason; ( b == 0 ) would get the out-of-band
			 * only accidentally right because of the "oddly sized"
			 * adjustment below */
			if (drbd_bm_test_bit(mdev, bit+1) != 1)
				break;
			bit++;
			size += BM_BLOCK_SIZE;
			if ((BM_BLOCK_SIZE << align) <= size)
				align++;
			i++;
		}
		/* if we merged some,
		 * reset the offset to start the next drbd_bm_find_next from */
		if (size > BM_BLOCK_SIZE)
			mdev->bm_resync_fo = bit + 1;
#endif

		/* adjust very last sectors, in case we are oddly sized */
		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;
		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
			switch (read_for_csum(mdev, sector, size)) {
			case -EIO: /* Disk failure */
				put_ldev(mdev);
				return 0;
			case -EAGAIN: /* allocation failed, or ldev busy */
				drbd_rs_complete_io(mdev, sector);
				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
				i = rollback_i;
				goto requeue;
			case 0:
				/* everything ok */
				break;
			default:
				BUG();
			}
		} else {
			inc_rs_pending(mdev);
			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
					       sector, size, ID_SYNCER)) {
				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
				dec_rs_pending(mdev);
				put_ldev(mdev);
				return 0;
			}
		}
	}

	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
		/* last syncer _request_ was sent,
		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
		 * next sync group will resume), as soon as we receive the last
		 * resync data block, and the last bit is cleared.
		 * until then resync "work" is "inactive" ...
		 */
		put_ldev(mdev);
		return 1;
	}

 requeue:
	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
	put_ldev(mdev);
	return 1;
}

static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	int number, i, size;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);

	if (unlikely(cancel))
		return 1;

	number = drbd_rs_number_requests(mdev);

	sector = mdev->ov_position;
	for (i = 0; i < number; i++) {
		if (sector >= capacity) {
			return 1;
		}

		size = BM_BLOCK_SIZE;

		if (drbd_rs_should_slow_down(mdev, sector) ||
		    drbd_try_rs_begin_io(mdev, sector)) {
			mdev->ov_position = sector;
			goto requeue;
		}

		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

		inc_rs_pending(mdev);
		if (!drbd_send_ov_request(mdev, sector, size)) {
			dec_rs_pending(mdev);
			return 0;
		}
		sector += BM_SECT_PER_BIT;
	}
	mdev->ov_position = sector;

 requeue:
	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
	return 1;
}


void start_resync_timer_fn(unsigned long data)
{
	struct drbd_conf *mdev = (struct drbd_conf *) data;

	drbd_queue_work(&mdev->data.work, &mdev->start_resync_work);
}

int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
		dev_warn(DEV, "w_start_resync later...\n");
		mdev->start_resync_timer.expires = jiffies + HZ/10;
		add_timer(&mdev->start_resync_timer);
		return 1;
	}

	drbd_start_resync(mdev, C_SYNC_SOURCE);
	clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags);
	return 1;
}

int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);
	ov_oos_print(mdev);
	drbd_resync_finished(mdev);

	return 1;
}

static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);

	drbd_resync_finished(mdev);

	return 1;
}

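/* Ask the peer for a ping and wait until the ack arrives (or the connection
 * is lost).  Since meta socket packets are processed in order, this acts as
 * a barrier: by the time the ack arrives, earlier packets from the peer have
 * been processed as well. */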
static void ping_peer(struct drbd_conf *mdev)
{
	clear_bit(GOT_PING_ACK, &mdev->flags);
	request_ping(mdev);
	wait_event(mdev->misc_wait,
		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
}

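/* Called when a resync or online verify run ends.  Computes and logs the
 * throughput, updates UUIDs and disk states according to the outcome, and
 * finally triggers the configured handlers (out-of-sync,
 * after-resync-target) where applicable. */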
int drbd_resync_finished(struct drbd_conf *mdev)
{
	unsigned long db, dt, dbdt;
	unsigned long n_oos;
	union drbd_state os, ns;
	struct drbd_work *w;
	char *khelper_cmd = NULL;
	int verify_done = 0;

	/* Remove all elements from the resync LRU. Since future actions
	 * might set bits in the (main) bitmap, then the entries in the
	 * resync LRU would be wrong. */
	if (drbd_rs_del_all(mdev)) {
		/* In case this is not possible now, most probably because
		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * is not finished by now).   Retry in 100ms. */

		schedule_timeout_interruptible(HZ / 10);
		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
		if (w) {
			w->cb = w_resync_finished;
			drbd_queue_work(&mdev->data.work, w);
			return 1;
		}
		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
	}

	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
	if (dt <= 0)
		dt = 1;
	db = mdev->rs_total;
	dbdt = Bit2KB(db/dt);
	mdev->rs_paused /= HZ;

	if (!get_ldev(mdev))
		goto out;

	ping_peer(mdev);

	spin_lock_irq(&mdev->req_lock);
	os = mdev->state;

	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);

	/* This protects us against multiple calls (that can happen in the presence
	   of application IO), and against connectivity loss just before we arrive here. */
	if (os.conn <= C_CONNECTED)
		goto out_unlock;

	ns = os;
	ns.conn = C_CONNECTED;

	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
	     verify_done ? "Online verify " : "Resync",
	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);

	n_oos = drbd_bm_total_weight(mdev);

	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
		if (n_oos) {
			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
			      n_oos, Bit2KB(1));
			khelper_cmd = "out-of-sync";
		}
	} else {
		D_ASSERT((n_oos - mdev->rs_failed) == 0);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
			khelper_cmd = "after-resync-target";

		if (mdev->csums_tfm && mdev->rs_total) {
			const unsigned long s = mdev->rs_same_csum;
			const unsigned long t = mdev->rs_total;
			const int ratio =
				(t == 0)     ? 0 :
			(t < 100000) ? ((s*100)/t) : (s/(t/100));
			dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
			     "transferred %luK total %luK\n",
			     ratio,
			     Bit2KB(mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total));
		}
	}

	if (mdev->rs_failed) {
		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			ns.disk = D_INCONSISTENT;
			ns.pdsk = D_UP_TO_DATE;
		} else {
			ns.disk = D_UP_TO_DATE;
			ns.pdsk = D_INCONSISTENT;
		}
	} else {
		ns.disk = D_UP_TO_DATE;
		ns.pdsk = D_UP_TO_DATE;

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			if (mdev->p_uuid) {
				int i;
				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
			} else {
				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
			}
		}

		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
			/* for verify runs, we don't update uuids here,
			 * so there would be nothing to report. */
			drbd_uuid_set_bm(mdev, 0UL);
			drbd_print_uuids(mdev, "updated UUIDs");
			if (mdev->p_uuid) {
				/* Now the two UUID sets are equal, update what we
				 * know of the peer. */
				int i;
				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
					mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
			}
		}
	}

	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
out_unlock:
	spin_unlock_irq(&mdev->req_lock);
	put_ldev(mdev);
out:
	mdev->rs_total  = 0;
	mdev->rs_failed = 0;
	mdev->rs_paused = 0;
	if (verify_done)
		mdev->ov_start_sector = 0;

	drbd_md_sync(mdev);

	if (khelper_cmd)
		drbd_khelper(mdev, khelper_cmd);

	return 1;
}

/* helper */
static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
{
	if (drbd_ee_has_active_page(e)) {
		/* This might happen if sendpage() has not finished */
		int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
		atomic_add(i, &mdev->pp_in_use_by_net);
		atomic_sub(i, &mdev->pp_in_use);
		spin_lock_irq(&mdev->req_lock);
		list_add_tail(&e->w.list, &mdev->net_ee);
		spin_unlock_irq(&mdev->req_lock);
		wake_up(&drbd_pp_wait);
	} else
		drbd_free_ee(mdev, e);
}

/**
 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int ok;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
			    (unsigned long long)e->sector);

		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
	}

	dec_unacked(mdev);

	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block() failed\n");
	return ok;
}

/**
 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int ok;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, e->sector);
		put_ldev(mdev);
	}

	if (mdev->state.conn == C_AHEAD) {
		ok = drbd_send_ack(mdev, P_RS_CANCEL, e);
	} else if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
			inc_rs_pending(mdev);
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
		} else {
			if (__ratelimit(&drbd_ratelimit_state))
				dev_err(DEV, "Not sending RSDataReply, "
				    "partner DISKLESS!\n");
			ok = 1;
		}
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
			    (unsigned long long)e->sector);

		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);

		/* update resync data with failure */
		drbd_rs_failed_io(mdev, e->sector, e->size);
	}

	dec_unacked(mdev);

	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block() failed\n");
	return ok;
}

int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct digest_info *di;
	int digest_size;
	void *digest = NULL;
	int ok, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, e->sector);
		put_ldev(mdev);
	}

	di = e->digest;

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		/* quick hack to try to avoid a race against reconfiguration.
		 * a real fix would be much more involved,
		 * introducing more locking mechanisms */
		if (mdev->csums_tfm) {
			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
			D_ASSERT(digest_size == di->digest_size);
			digest = kmalloc(digest_size, GFP_NOIO);
		}
		if (digest) {
			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}

		if (eq) {
			drbd_set_in_sync(mdev, e->sector, e->size);
			/* rs_same_csums unit is BM_BLOCK_SIZE */
			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
		} else {
			inc_rs_pending(mdev);
			e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
			kfree(di);
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
		}
	} else {
		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(mdev);
	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block/ack() failed\n");
	return ok;
}

/* TODO merge common code with w_e_send_csum */
int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	sector_t sector = e->sector;
	unsigned int size = e->size;
	int digest_size;
	void *digest;
	int ok = 1;

	if (unlikely(cancel))
		goto out;

	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
	digest = kmalloc(digest_size, GFP_NOIO);
	if (!digest) {
		ok = 0;	/* terminate the connection in case the allocation failed */
		goto out;
	}

	if (likely(!(e->flags & EE_WAS_ERROR)))
		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
	else
		memset(digest, 0, digest_size);

	/* Free e and pages before send.
	 * In case we block on congestion, we could otherwise run into
	 * some distributed deadlock, if the other side blocks on
	 * congestion as well, because our receiver blocks in
	 * drbd_pp_alloc due to pp_in_use > max_buffers. */
	drbd_free_ee(mdev, e);
	e = NULL;
	inc_rs_pending(mdev);
	ok = drbd_send_drequest_csum(mdev, sector, size,
				     digest, digest_size,
				     P_OV_REPLY);
	if (!ok)
		dec_rs_pending(mdev);
	kfree(digest);

out:
	if (e)
		drbd_free_ee(mdev, e);
	dec_unacked(mdev);
	return ok;
}

void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
{
	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
		mdev->ov_last_oos_size += size>>9;
	} else {
		mdev->ov_last_oos_start = sector;
		mdev->ov_last_oos_size = size>>9;
	}
	drbd_set_out_of_sync(mdev, sector, size);
}

int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct digest_info *di;
	void *digest;
	sector_t sector = e->sector;
	unsigned int size = e->size;
	int digest_size;
	int ok, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
	 * the resync lru has been cleaned up already */
	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, e->sector);
		put_ldev(mdev);
	}

	di = e->digest;

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);

			D_ASSERT(digest_size == di->digest_size);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}
	}

	/* Free e and pages before send.
	 * In case we block on congestion, we could otherwise run into
	 * some distributed deadlock, if the other side blocks on
	 * congestion as well, because our receiver blocks in
	 * drbd_pp_alloc due to pp_in_use > max_buffers. */
	drbd_free_ee(mdev, e);
	if (!eq)
		drbd_ov_oos_found(mdev, sector, size);
	else
		ov_oos_print(mdev);

	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);

	dec_unacked(mdev);

	--mdev->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((mdev->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(mdev, mdev->ov_left);

	if (mdev->ov_left == 0) {
		ov_oos_print(mdev);
		drbd_resync_finished(mdev);
	}

	return ok;
}

int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
	complete(&b->done);
	return 1;
}

int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
	struct p_barrier *p = &mdev->data.sbuf.barrier;
	int ok = 1;

	/* really avoid racing with tl_clear.  w.cb may have been referenced
	 * just before it was reassigned and re-queued, so double check that.
	 * actually, this race was harmless, since we only try to send the
	 * barrier packet here, and otherwise do nothing with the object.
	 * but compare with the head of w_clear_epoch */
	spin_lock_irq(&mdev->req_lock);
	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
		cancel = 1;
	spin_unlock_irq(&mdev->req_lock);
	if (cancel)
		return 1;

	if (!drbd_get_data_sock(mdev))
		return 0;
	p->barrier = b->br_number;
	/* inc_ap_pending was done where this was queued.
	 * dec_ap_pending will be done in got_BarrierAck
	 * or (on connection loss) in w_clear_epoch.  */
	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
				(struct p_header80 *)p, sizeof(*p), 0);
	drbd_put_data_sock(mdev);

	return ok;
}

int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	if (cancel)
		return 1;
	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
}

int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);
		return 1;
	}

	ok = drbd_send_oos(mdev, req);
	req_mod(req, oos_handed_to_network);

	return ok;
}

/**
 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);
		return 1;
	}

	ok = drbd_send_dblock(mdev, req);
	req_mod(req, ok ? handed_over_to_network : send_failed);

	return ok;
}

/**
 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);
		return 1;
	}

	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
				(unsigned long)req);

	if (!ok) {
		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
		 * so this is probably redundant */
		if (mdev->state.conn >= C_CONNECTED)
			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
	}
	req_mod(req, ok ? handed_over_to_network : send_failed);

	return ok;
}

int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
		drbd_al_begin_io(mdev, req->sector);
	/* Calling drbd_al_begin_io() out of the worker might deadlock in
	   theory.  In practice it cannot deadlock, since this is only used
	   when unfreezing IOs.  All the extents of the requests that made it
	   into the TL are already active */

	drbd_req_make_private_bio(req, req->master_bio);
	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
	generic_make_request(req->private_bio);

	return 1;
}

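/* Follow the sync-after dependency chain of @mdev.  Return 0 if some device
 * earlier in the chain is currently resyncing or has its resync suspended,
 * 1 if this device may resync now.  Callers hold the global_state_lock so
 * the inspected states are stable. */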
static int _drbd_may_sync_now(struct drbd_conf *mdev)
{
	struct drbd_conf *odev = mdev;

	while (1) {
		if (odev->sync_conf.after == -1)
			return 1;
		odev = minor_to_mdev(odev->sync_conf.after);
		ERR_IF(!odev) return 1;
		if ((odev->state.conn >= C_SYNC_SOURCE &&
		     odev->state.conn <= C_PAUSED_SYNC_T) ||
		    odev->state.aftr_isp || odev->state.peer_isp ||
		    odev->state.user_isp)
			return 0;
	}
}

/**
 * _drbd_pause_after() - Pause resync on all devices that may not resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and after_state_ch).
 */
static int _drbd_pause_after(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (!_drbd_may_sync_now(odev))
			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
			       != SS_NOTHING_TO_DO);
	}

	return rv;
}

/**
 * _drbd_resume_next() - Resume resync on all devices that may resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and worker).
 */
static int _drbd_resume_next(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (odev->state.aftr_isp) {
			if (_drbd_may_sync_now(odev))
				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
							CS_HARD, NULL)
				       != SS_NOTHING_TO_DO) ;
		}
	}
	return rv;
}

void resume_next_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_resume_next(mdev);
	write_unlock_irq(&global_state_lock);
}

void suspend_other_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_pause_after(mdev);
	write_unlock_irq(&global_state_lock);
}

static int sync_after_error(struct drbd_conf *mdev, int o_minor)
{
	struct drbd_conf *odev;

	if (o_minor == -1)
		return NO_ERROR;
	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
		return ERR_SYNC_AFTER;

	/* check for loops */
	odev = minor_to_mdev(o_minor);
	while (1) {
		if (odev == mdev)
			return ERR_SYNC_AFTER_CYCLE;

		/* dependency chain ends here, no cycles. */
		if (odev->sync_conf.after == -1)
			return NO_ERROR;

		/* follow the dependency chain */
		odev = minor_to_mdev(odev->sync_conf.after);
	}
}

int drbd_alter_sa(struct drbd_conf *mdev, int na)
{
	int changes;
	int retcode;

	write_lock_irq(&global_state_lock);
	retcode = sync_after_error(mdev, na);
	if (retcode == NO_ERROR) {
		mdev->sync_conf.after = na;
		do {
			changes  = _drbd_pause_after(mdev);
			changes |= _drbd_resume_next(mdev);
		} while (changes);
	}
	write_unlock_irq(&global_state_lock);
	return retcode;
}

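/* Reset the dynamic resync speed controller state (sector counters
 * rs_sect_in/rs_sect_ev, the in-flight accounting and the plan-ahead fifo)
 * before a new resync run. */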
void drbd_rs_controller_reset(struct drbd_conf *mdev)
{
	atomic_set(&mdev->rs_sect_in, 0);
	atomic_set(&mdev->rs_sect_ev, 0);
	mdev->rs_in_flight = 0;
	mdev->rs_planed = 0;
	spin_lock(&mdev->peer_seq_lock);
	fifo_set(&mdev->rs_plan_s, 0);
	spin_unlock(&mdev->peer_seq_lock);
}

/**
 * drbd_start_resync() - Start the resync process
 * @mdev:	DRBD device.
 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
 *
 * This function might bring you directly into one of the
 * C_PAUSED_SYNC_* states.
 */
void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
{
	union drbd_state ns;
	int r;

	if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
		dev_err(DEV, "Resync already running!\n");
		return;
	}

	if (mdev->state.conn < C_AHEAD) {
		/* In case a previous resync run was aborted by an IO error/detach on the peer. */
		drbd_rs_cancel_all(mdev);
		/* This should be done when we abort the resync. We definitely do not
		   want to have this for connections going back and forth between
		   Ahead/Behind and SyncSource/SyncTarget */
	}

	if (side == C_SYNC_TARGET) {
		/* Since application IO was locked out during C_WF_BITMAP_T and
		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
		   we check that we might make the data inconsistent. */
		r = drbd_khelper(mdev, "before-resync-target");
		r = (r >> 8) & 0xff;
		if (r > 0) {
			dev_info(DEV, "before-resync-target handler returned %d, "
			     "dropping connection.\n", r);
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
			return;
		}
	} else /* C_SYNC_SOURCE */ {
		r = drbd_khelper(mdev, "before-resync-source");
		r = (r >> 8) & 0xff;
		if (r > 0) {
			if (r == 3) {
				dev_info(DEV, "before-resync-source handler returned %d, "
					 "ignoring. Old userland tools?", r);
			} else {
				dev_info(DEV, "before-resync-source handler returned %d, "
					 "dropping connection.\n", r);
				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
				return;
			}
		}
	}

	drbd_state_lock(mdev);
	write_lock_irq(&global_state_lock);
	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
		write_unlock_irq(&global_state_lock);
		drbd_state_unlock(mdev);
		return;
	}

	ns.i = mdev->state.i;

	ns.aftr_isp = !_drbd_may_sync_now(mdev);

	ns.conn = side;

	if (side == C_SYNC_TARGET)
		ns.disk = D_INCONSISTENT;
	else /* side == C_SYNC_SOURCE */
		ns.pdsk = D_INCONSISTENT;

	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
	ns = mdev->state;

	if (ns.conn < C_CONNECTED)
		r = SS_UNKNOWN_ERROR;

	if (r == SS_SUCCESS) {
		unsigned long tw = drbd_bm_total_weight(mdev);
		unsigned long now = jiffies;
		int i;

		mdev->rs_failed    = 0;
		mdev->rs_paused    = 0;
		mdev->rs_same_csum = 0;
		mdev->rs_last_events = 0;
		mdev->rs_last_sect_ev = 0;
		mdev->rs_total     = tw;
		mdev->rs_start     = now;
		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
			mdev->rs_mark_left[i] = tw;
			mdev->rs_mark_time[i] = now;
		}
		_drbd_pause_after(mdev);
	}
	write_unlock_irq(&global_state_lock);

	if (r == SS_SUCCESS) {
		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
		     drbd_conn_str(ns.conn),
		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
		     (unsigned long) mdev->rs_total);
		if (side == C_SYNC_TARGET)
			mdev->bm_resync_fo = 0;

		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
		 * with w_send_oos, or the sync target will get confused as to
		 * how many bits to resync.  We cannot do that always, because for an
		 * empty resync and protocol < 95, we need to do it here, as we call
		 * drbd_resync_finished from here in that case.
		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
		 * and from after_state_ch otherwise. */
		if (side == C_SYNC_SOURCE && mdev->agreed_pro_version < 96)
			drbd_gen_and_send_sync_uuid(mdev);

		if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
			/* This still has a race (about when exactly the peers
			 * detect connection loss) that can lead to a full sync
			 * on next handshake. In 8.3.9 we fixed this with explicit
			 * resync-finished notifications, but the fix
			 * introduces a protocol change.  Sleeping for some
			 * time longer than the ping interval + timeout on the
			 * SyncSource, to give the SyncTarget the chance to
			 * detect connection loss, then waiting for a ping
			 * response (implicit in drbd_resync_finished) reduces
			 * the race considerably, but does not solve it. */
			if (side == C_SYNC_SOURCE)
				schedule_timeout_interruptible(
					mdev->net_conf->ping_int * HZ +
					mdev->net_conf->ping_timeo*HZ/9);
			drbd_resync_finished(mdev);
		}

		drbd_rs_controller_reset(mdev);
		/* ns.conn may already be != mdev->state.conn,
		 * we may have been paused in between, or become paused until
		 * the timer triggers.
		 * No matter, that is handled in resync_timer_fn() */
		if (ns.conn == C_SYNC_TARGET)
			mod_timer(&mdev->resync_timer, jiffies);

		drbd_md_sync(mdev);
	}
	put_ldev(mdev);
	drbd_state_unlock(mdev);
}

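/* Main loop of the per-device worker thread ("drbdX_worker").
 * It sleeps on the work queue semaphore, uncorking the data socket before it
 * goes to sleep and corking it again while it is busy, and processes queued
 * work items one by one.  A work callback returning 0 is treated as a
 * network failure.  On shutdown, all remaining work items are completed with
 * cancel = 1 before the receiver is stopped and the device is cleaned up. */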
int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_conf *mdev = thi->mdev;
	struct drbd_work *w = NULL;
	LIST_HEAD(work_list);
	int intr = 0, i;

	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));

	while (get_t_state(thi) == Running) {
		drbd_thread_current_set_cpu(mdev);

		if (down_trylock(&mdev->data.work.s)) {
			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket && !mdev->net_conf->no_cork)
				drbd_tcp_uncork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);

			intr = down_interruptible(&mdev->data.work.s);

			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket  && !mdev->net_conf->no_cork)
				drbd_tcp_cork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);
		}

		if (intr) {
			D_ASSERT(intr == -EINTR);
			flush_signals(current);
			ERR_IF (get_t_state(thi) == Running)
				continue;
			break;
		}

		if (get_t_state(thi) != Running)
			break;
		/* With this break, we have done a down() but not consumed
		   the entry from the list. The cleanup code takes care of
		   this...   */

		w = NULL;
		spin_lock_irq(&mdev->data.work.q_lock);
		ERR_IF(list_empty(&mdev->data.work.q)) {
			/* something terribly wrong in our logic.
			 * we were able to down() the semaphore,
			 * but the list is empty... doh.
			 *
			 * what is the best thing to do now?
			 * try again from scratch, restarting the receiver,
			 * asender, whatnot? could break even more ugly,
			 * e.g. when we are primary, but no good local data.
			 *
			 * I'll try to get away just starting over this loop.
			 */
			spin_unlock_irq(&mdev->data.work.q_lock);
			continue;
		}
		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
		list_del_init(&w->list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
			/* dev_warn(DEV, "worker: a callback failed! \n"); */
			if (mdev->state.conn >= C_CONNECTED)
				drbd_force_state(mdev,
						NS(conn, C_NETWORK_FAILURE));
		}
	}
	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));

	spin_lock_irq(&mdev->data.work.q_lock);
	i = 0;
	while (!list_empty(&mdev->data.work.q)) {
		list_splice_init(&mdev->data.work.q, &work_list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		while (!list_empty(&work_list)) {
			w = list_entry(work_list.next, struct drbd_work, list);
			list_del_init(&w->list);
			w->cb(mdev, w, 1);
			i++; /* dead debugging code */
		}

		spin_lock_irq(&mdev->data.work.q_lock);
	}
	sema_init(&mdev->data.work.s, 0);
	/* DANGEROUS race: if someone did queue his work within the spinlock,
	 * but up() ed outside the spinlock, we could get an up() on the
	 * semaphore without corresponding list entry.
	 * So don't do that.
	 */
	spin_unlock_irq(&mdev->data.work.q_lock);

	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
	/* _drbd_set_state only uses stop_nowait.
	 * wait here for the Exiting receiver. */
	drbd_thread_stop(&mdev->receiver);
	drbd_mdev_cleanup(mdev);

	dev_info(DEV, "worker terminated\n");

	clear_bit(DEVICE_DYING, &mdev->flags);
	clear_bit(CONFIG_PENDING, &mdev->flags);
	wake_up(&mdev->state_wait);

	return 0;
}
1743