1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24  */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37 
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40 
41 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42 
43 
44 
45 /* defined here:
46    drbd_md_io_complete
47    drbd_endio_sec
48    drbd_endio_pri
49 
50  * more endio handlers:
51    atodb_endio in drbd_actlog.c
52    drbd_bm_async_io_complete in drbd_bitmap.c
53 
54  * For all these callbacks, note the following:
55  * The callbacks will be called in irq context by the IDE drivers,
56  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
57  * Try to get the locking right :)
58  *
59  */
60 
61 
62 /* About the global_state_lock
63    Each state transition on a device holds a read lock. In case we have
64    to evaluate the sync after dependencies, we grab a write lock, because
65    we need stable states on all devices for that.  */
66 rwlock_t global_state_lock;
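/* In this file the write side is taken by resume_next_sg(), suspend_other_sg(),
 * drbd_alter_sa() and drbd_start_resync(), all of which re-evaluate the
 * sync-after dependencies and therefore need a stable view of all devices. */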
67 
68 /* used for synchronous meta data and bitmap IO
69  * submitted by drbd_md_sync_page_io()
70  */
71 void drbd_md_io_complete(struct bio *bio, int error)
72 {
73 	struct drbd_md_io *md_io;
74 
75 	md_io = (struct drbd_md_io *)bio->bi_private;
76 	md_io->error = error;
77 
78 	complete(&md_io->event);
79 }
80 
81 /* reads on behalf of the partner,
82  * "submitted" by the receiver
83  */
84 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
85 {
86 	unsigned long flags = 0;
87 	struct drbd_conf *mdev = e->mdev;
88 
89 	D_ASSERT(e->block_id != ID_VACANT);
90 
91 	spin_lock_irqsave(&mdev->req_lock, flags);
92 	mdev->read_cnt += e->size >> 9;
93 	list_del(&e->w.list);
94 	if (list_empty(&mdev->read_ee))
95 		wake_up(&mdev->ee_wait);
96 	if (test_bit(__EE_WAS_ERROR, &e->flags))
97 		__drbd_chk_io_error(mdev, FALSE);
98 	spin_unlock_irqrestore(&mdev->req_lock, flags);
99 
100 	drbd_queue_work(&mdev->data.work, &e->w);
101 	put_ldev(mdev);
102 }
103 
104 /* writes on behalf of the partner, or resync writes,
105  * "submitted" by the receiver, final stage.  */
106 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
107 {
108 	unsigned long flags = 0;
109 	struct drbd_conf *mdev = e->mdev;
110 	sector_t e_sector;
111 	int do_wake;
112 	int is_syncer_req;
113 	int do_al_complete_io;
114 
115 	D_ASSERT(e->block_id != ID_VACANT);
116 
117 	/* after we moved e to done_ee,
118 	 * we may no longer access it,
119 	 * it may be freed/reused already!
120 	 * (as soon as we release the req_lock) */
121 	e_sector = e->sector;
122 	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
123 	is_syncer_req = is_syncer_block_id(e->block_id);
124 
125 	spin_lock_irqsave(&mdev->req_lock, flags);
126 	mdev->writ_cnt += e->size >> 9;
127 	list_del(&e->w.list); /* has been on active_ee or sync_ee */
128 	list_add_tail(&e->w.list, &mdev->done_ee);
129 
130 	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
131 	 * neither did we wake possibly waiting conflicting requests.
132 	 * done from "drbd_process_done_ee" within the appropriate w.cb
133 	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
134 
135 	do_wake = is_syncer_req
136 		? list_empty(&mdev->sync_ee)
137 		: list_empty(&mdev->active_ee);
138 
139 	if (test_bit(__EE_WAS_ERROR, &e->flags))
140 		__drbd_chk_io_error(mdev, FALSE);
141 	spin_unlock_irqrestore(&mdev->req_lock, flags);
142 
143 	if (is_syncer_req)
144 		drbd_rs_complete_io(mdev, e_sector);
145 
146 	if (do_wake)
147 		wake_up(&mdev->ee_wait);
148 
149 	if (do_al_complete_io)
150 		drbd_al_complete_io(mdev, e_sector);
151 
152 	wake_asender(mdev);
153 	put_ldev(mdev);
154 }
155 
156 /* writes on behalf of the partner, or resync writes,
157  * "submitted" by the receiver.
158  */
159 void drbd_endio_sec(struct bio *bio, int error)
160 {
161 	struct drbd_epoch_entry *e = bio->bi_private;
162 	struct drbd_conf *mdev = e->mdev;
163 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
164 	int is_write = bio_data_dir(bio) == WRITE;
165 
166 	if (error)
167 		dev_warn(DEV, "%s: error=%d s=%llus\n",
168 				is_write ? "write" : "read", error,
169 				(unsigned long long)e->sector);
170 	if (!error && !uptodate) {
171 		dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
172 				is_write ? "write" : "read",
173 				(unsigned long long)e->sector);
174 		/* strange behavior of some lower level drivers...
175 		 * fail the request by clearing the uptodate flag,
176 		 * but do not return any error?! */
177 		error = -EIO;
178 	}
179 
180 	if (error)
181 		set_bit(__EE_WAS_ERROR, &e->flags);
182 
183 	bio_put(bio); /* no need for the bio anymore */
184 	if (atomic_dec_and_test(&e->pending_bios)) {
185 		if (is_write)
186 			drbd_endio_write_sec_final(e);
187 		else
188 			drbd_endio_read_sec_final(e);
189 	}
190 }
191 
192 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
193  */
194 void drbd_endio_pri(struct bio *bio, int error)
195 {
196 	unsigned long flags;
197 	struct drbd_request *req = bio->bi_private;
198 	struct drbd_conf *mdev = req->mdev;
199 	struct bio_and_error m;
200 	enum drbd_req_event what;
201 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
202 
203 	if (!error && !uptodate) {
204 		dev_warn(DEV, "p %s: setting error to -EIO\n",
205 			 bio_data_dir(bio) == WRITE ? "write" : "read");
206 		/* strange behavior of some lower level drivers...
207 		 * fail the request by clearing the uptodate flag,
208 		 * but do not return any error?! */
209 		error = -EIO;
210 	}
211 
212 	/* to avoid recursion in __req_mod */
213 	if (unlikely(error)) {
214 		what = (bio_data_dir(bio) == WRITE)
215 			? write_completed_with_error
216 			: (bio_rw(bio) == READ)
217 			  ? read_completed_with_error
218 			  : read_ahead_completed_with_error;
219 	} else
220 		what = completed_ok;
221 
222 	bio_put(req->private_bio);
223 	req->private_bio = ERR_PTR(error);
224 
225 	/* not req_mod(), we need irqsave here! */
226 	spin_lock_irqsave(&mdev->req_lock, flags);
227 	__req_mod(req, what, &m);
228 	spin_unlock_irqrestore(&mdev->req_lock, flags);
229 
230 	if (m.bio)
231 		complete_master_bio(mdev, &m);
232 }
233 
234 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
235 {
236 	struct drbd_request *req = container_of(w, struct drbd_request, w);
237 
238 	/* We should not detach for read io-error,
239 	 * but try to WRITE the P_DATA_REPLY to the failed location,
240 	 * to give the disk the chance to relocate that block */
241 
242 	spin_lock_irq(&mdev->req_lock);
243 	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
244 		_req_mod(req, read_retry_remote_canceled);
245 		spin_unlock_irq(&mdev->req_lock);
246 		return 1;
247 	}
248 	spin_unlock_irq(&mdev->req_lock);
249 
250 	return w_send_read_req(mdev, w, 0);
251 }
252 
253 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
254 {
255 	ERR_IF(cancel) return 1;
256 	dev_err(DEV, "resync inactive, but callback triggered??\n");
257 	return 1; /* Simply ignore this! */
258 }
259 
260 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
261 {
262 	struct hash_desc desc;
263 	struct scatterlist sg;
264 	struct page *page = e->pages;
265 	struct page *tmp;
266 	unsigned len;
267 
268 	desc.tfm = tfm;
269 	desc.flags = 0;
270 
271 	sg_init_table(&sg, 1);
272 	crypto_hash_init(&desc);
273 
274 	while ((tmp = page_chain_next(page))) {
275 		/* all but the last page will be fully used */
276 		sg_set_page(&sg, page, PAGE_SIZE, 0);
277 		crypto_hash_update(&desc, &sg, sg.length);
278 		page = tmp;
279 	}
280 	/* and now the last, possibly only partially used page */
281 	len = e->size & (PAGE_SIZE - 1);
282 	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
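	/* len == 0 here means e->size is a multiple of PAGE_SIZE,
	 * so the "partially used" page is in fact a full one */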
283 	crypto_hash_update(&desc, &sg, sg.length);
284 	crypto_hash_final(&desc, digest);
285 }
286 
287 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
288 {
289 	struct hash_desc desc;
290 	struct scatterlist sg;
291 	struct bio_vec *bvec;
292 	int i;
293 
294 	desc.tfm = tfm;
295 	desc.flags = 0;
296 
297 	sg_init_table(&sg, 1);
298 	crypto_hash_init(&desc);
299 
300 	__bio_for_each_segment(bvec, bio, i, 0) {
301 		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
302 		crypto_hash_update(&desc, &sg, sg.length);
303 	}
304 	crypto_hash_final(&desc, digest);
305 }
306 
307 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
308 {
309 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
310 	int digest_size;
311 	void *digest;
312 	int ok;
313 
314 	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
315 
316 	if (unlikely(cancel)) {
317 		drbd_free_ee(mdev, e);
318 		return 1;
319 	}
320 
321 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
322 		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
323 		digest = kmalloc(digest_size, GFP_NOIO);
324 		if (digest) {
325 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
326 
327 			inc_rs_pending(mdev);
328 			ok = drbd_send_drequest_csum(mdev,
329 						     e->sector,
330 						     e->size,
331 						     digest,
332 						     digest_size,
333 						     P_CSUM_RS_REQUEST);
334 			kfree(digest);
335 		} else {
336 			dev_err(DEV, "kmalloc() of digest failed.\n");
337 			ok = 0;
338 		}
339 	} else
340 		ok = 1;
341 
342 	drbd_free_ee(mdev, e);
343 
344 	if (unlikely(!ok))
345 		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
346 	return ok;
347 }
348 
349 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
350 
351 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
352 {
353 	struct drbd_epoch_entry *e;
354 
355 	if (!get_ldev(mdev))
356 		return -EIO;
357 
358 	if (drbd_rs_should_slow_down(mdev))
359 		goto defer;
360 
361 	/* GFP_TRY, because if there is no memory available right now, this may
362 	 * be rescheduled for later. It is "only" background resync, after all. */
363 	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
364 	if (!e)
365 		goto defer;
366 
367 	e->w.cb = w_e_send_csum;
368 	spin_lock_irq(&mdev->req_lock);
369 	list_add(&e->w.list, &mdev->read_ee);
370 	spin_unlock_irq(&mdev->req_lock);
371 
372 	atomic_add(size >> 9, &mdev->rs_sect_ev);
373 	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
374 		return 0;
375 
376 	/* drbd_submit_ee currently fails for one reason only:
377 	 * not being able to allocate enough bios.
378 	 * Is dropping the connection going to help? */
379 	spin_lock_irq(&mdev->req_lock);
380 	list_del(&e->w.list);
381 	spin_unlock_irq(&mdev->req_lock);
382 
383 	drbd_free_ee(mdev, e);
384 defer:
385 	put_ldev(mdev);
386 	return -EAGAIN;
387 }
388 
389 void resync_timer_fn(unsigned long data)
390 {
391 	struct drbd_conf *mdev = (struct drbd_conf *) data;
392 	int queue;
393 
394 	queue = 1;
395 	switch (mdev->state.conn) {
396 	case C_VERIFY_S:
397 		mdev->resync_work.cb = w_make_ov_request;
398 		break;
399 	case C_SYNC_TARGET:
400 		mdev->resync_work.cb = w_make_resync_request;
401 		break;
402 	default:
403 		queue = 0;
404 		mdev->resync_work.cb = w_resync_inactive;
405 	}
406 
407 	/* harmless race: list_empty outside data.work.q_lock */
408 	if (list_empty(&mdev->resync_work.list) && queue)
409 		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
410 }
411 
412 static void fifo_set(struct fifo_buffer *fb, int value)
413 {
414 	int i;
415 
416 	for (i = 0; i < fb->size; i++)
417 		fb->values[i] = value;
418 }
419 
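/* Overwrite the oldest slot with @value and return what was stored there.
 * Pushing 0 therefore "pops" the correction planned for the current tick;
 * see the use in drbd_rs_controller() below. */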
420 static int fifo_push(struct fifo_buffer *fb, int value)
421 {
422 	int ov;
423 
424 	ov = fb->values[fb->head_index];
425 	fb->values[fb->head_index++] = value;
426 
427 	if (fb->head_index >= fb->size)
428 		fb->head_index = 0;
429 
430 	return ov;
431 }
432 
433 static void fifo_add_val(struct fifo_buffer *fb, int value)
434 {
435 	int i;
436 
437 	for (i = 0; i < fb->size; i++)
438 		fb->values[i] += value;
439 }
440 
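/* Proportional controller for the resync rate: aim for "want" sectors in
 * flight/buffered, and spread any correction over the next "steps" ticks.
 * Illustrative numbers only: with steps = 10, want = 2000 sectors,
 * rs_in_flight = 1200 and rs_planed = 300, the correction is 500 sectors,
 * so 50 sectors are added to each of the 10 planned slots.  The slot popped
 * for the current tick (curr_corr) is added to sect_in to form the request
 * budget, clamped between 0 and the c_max_rate equivalent of one SLEEP_TIME
 * tick. */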
441 int drbd_rs_controller(struct drbd_conf *mdev)
442 {
443 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
444 	unsigned int want;     /* The number of sectors we want in the proxy */
445 	int req_sect; /* Number of sectors to request in this turn */
446 	int correction; /* Number of sectors more we need in the proxy*/
447 	int cps; /* correction per invocation of drbd_rs_controller() */
448 	int steps; /* Number of time steps to plan ahead */
449 	int curr_corr;
450 	int max_sect;
451 
452 	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
453 	mdev->rs_in_flight -= sect_in;
454 
455 	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
456 
457 	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
458 
459 	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
460 		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
461 	} else { /* normal path */
462 		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
463 			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
464 	}
465 
466 	correction = want - mdev->rs_in_flight - mdev->rs_planed;
467 
468 	/* Plan ahead */
469 	cps = correction / steps;
470 	fifo_add_val(&mdev->rs_plan_s, cps);
471 	mdev->rs_planed += cps * steps;
472 
473 	/* What we do in this step */
474 	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
475 	spin_unlock(&mdev->peer_seq_lock);
476 	mdev->rs_planed -= curr_corr;
477 
478 	req_sect = sect_in + curr_corr;
479 	if (req_sect < 0)
480 		req_sect = 0;
481 
482 	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
483 	if (req_sect > max_sect)
484 		req_sect = max_sect;
485 
486 	/*
487 	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
488 		 sect_in, mdev->rs_in_flight, want, correction,
489 		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
490 	*/
491 
492 	return req_sect;
493 }
494 
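/* Convert the controller output (sectors per SLEEP_TIME tick) into a number
 * of BM_BLOCK_SIZE sized requests.  Illustrative only: with 4 KiB bitmap
 * blocks the >> (BM_BLOCK_SHIFT - 9) divides the sector count by 8, and with
 * a 100 ms SLEEP_TIME tick a budget of 200 sectors becomes 25 requests,
 * accounted as a c_sync_rate of roughly 1000 KiB/s. */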
495 int drbd_rs_number_requests(struct drbd_conf *mdev)
496 {
497 	int number;
498 	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
499 		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
500 		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
501 	} else {
502 		mdev->c_sync_rate = mdev->sync_conf.rate;
503 		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
504 	}
505 
506 	/* Throttle resync on lower level disk activity, which may also be
507 	 * caused by application IO on Primary/SyncTarget.
508 	 * Keep this after the call to drbd_rs_controller, as that assumes
509 	 * to be called as precisely as possible every SLEEP_TIME,
510 	 * and would be confused otherwise. */
511 	if (number && drbd_rs_should_slow_down(mdev)) {
512 		mdev->c_sync_rate = 1;
513 		number = 0;
514 	}
515 
516 	/* ignore the amount of pending requests, the resync controller should
517 	 * throttle down to incoming reply rate soon enough anyways. */
518 	return number;
519 }
520 
521 int w_make_resync_request(struct drbd_conf *mdev,
522 		struct drbd_work *w, int cancel)
523 {
524 	unsigned long bit;
525 	sector_t sector;
526 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
527 	int max_bio_size;
528 	int number, rollback_i, size;
529 	int align, queued, sndbuf;
530 	int i = 0;
531 
532 	if (unlikely(cancel))
533 		return 1;
534 
535 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
536 		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected\n");
537 		return 0;
538 	}
539 
540 	if (mdev->state.conn != C_SYNC_TARGET)
541 		dev_err(DEV, "%s in w_make_resync_request\n",
542 			drbd_conn_str(mdev->state.conn));
543 
544 	if (mdev->rs_total == 0) {
545 		/* empty resync? */
546 		drbd_resync_finished(mdev);
547 		return 1;
548 	}
549 
550 	if (!get_ldev(mdev)) {
551 		/* Since we only need to access mdev->rsync, a
552 		   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
553 		   continuing resync with a broken disk makes no sense at
554 		   all */
555 		dev_err(DEV, "Disk broke down during resync!\n");
556 		mdev->resync_work.cb = w_resync_inactive;
557 		return 1;
558 	}
559 
560 	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
561 	 * if it should be necessary */
562 	max_bio_size =
563 		mdev->agreed_pro_version < 94 ? queue_max_hw_sectors(mdev->rq_queue) << 9 :
564 		mdev->agreed_pro_version < 95 ?	DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_BIO_SIZE;
565 
566 	number = drbd_rs_number_requests(mdev);
567 	if (number == 0)
568 		goto requeue;
569 
570 	for (i = 0; i < number; i++) {
571 		/* Stop generating RS requests, when half of the send buffer is filled */
572 		mutex_lock(&mdev->data.mutex);
573 		if (mdev->data.socket) {
574 			queued = mdev->data.socket->sk->sk_wmem_queued;
575 			sndbuf = mdev->data.socket->sk->sk_sndbuf;
576 		} else {
577 			queued = 1;
578 			sndbuf = 0;
579 		}
580 		mutex_unlock(&mdev->data.mutex);
581 		if (queued > sndbuf / 2)
582 			goto requeue;
583 
584 next_sector:
585 		size = BM_BLOCK_SIZE;
586 		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
587 
588 		if (bit == -1UL) {
589 			mdev->bm_resync_fo = drbd_bm_bits(mdev);
590 			mdev->resync_work.cb = w_resync_inactive;
591 			put_ldev(mdev);
592 			return 1;
593 		}
594 
595 		sector = BM_BIT_TO_SECT(bit);
596 
597 		if (drbd_try_rs_begin_io(mdev, sector)) {
598 			mdev->bm_resync_fo = bit;
599 			goto requeue;
600 		}
601 		mdev->bm_resync_fo = bit + 1;
602 
603 		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
604 			drbd_rs_complete_io(mdev, sector);
605 			goto next_sector;
606 		}
607 
608 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
609 		/* try to find some adjacent bits.
610 		 * we stop if we have already the maximum req size.
611 		 *
612 		 * Additionally always align bigger requests, in order to
613 		 * be prepared for all stripe sizes of software RAIDs.
614 		 */
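		/* Illustration (assuming the usual 4 KiB BM_BLOCK_SIZE): the request
		 * grows in 4k steps, but may only grow past 4k, 8k, 16k, ... while
		 * the start sector is aligned to the next such size; (1 << (align+3))
		 * below is that size expressed in 512-byte sectors.  The net effect
		 * is that a request never crosses a boundary of its own size. */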
615 		align = 1;
616 		rollback_i = i;
617 		for (;;) {
618 			if (size + BM_BLOCK_SIZE > max_bio_size)
619 				break;
620 
621 			/* Be always aligned */
622 			if (sector & ((1<<(align+3))-1))
623 				break;
624 
625 			/* do not cross extent boundaries */
626 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
627 				break;
628 			/* now, is it actually dirty, after all?
629 			 * caution, drbd_bm_test_bit is tri-state for some
630 			 * obscure reason; ( b == 0 ) would get the out-of-band
631 			 * only accidentally right because of the "oddly sized"
632 			 * adjustment below */
633 			if (drbd_bm_test_bit(mdev, bit+1) != 1)
634 				break;
635 			bit++;
636 			size += BM_BLOCK_SIZE;
637 			if ((BM_BLOCK_SIZE << align) <= size)
638 				align++;
639 			i++;
640 		}
641 		/* if we merged some,
642 		 * reset the offset to start the next drbd_bm_find_next from */
643 		if (size > BM_BLOCK_SIZE)
644 			mdev->bm_resync_fo = bit + 1;
645 #endif
646 
647 		/* adjust very last sectors, in case we are oddly sized */
648 		if (sector + (size>>9) > capacity)
649 			size = (capacity-sector)<<9;
650 		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
651 			switch (read_for_csum(mdev, sector, size)) {
652 			case -EIO: /* Disk failure */
653 				put_ldev(mdev);
654 				return 0;
655 			case -EAGAIN: /* allocation failed, or ldev busy */
656 				drbd_rs_complete_io(mdev, sector);
657 				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
658 				i = rollback_i;
659 				goto requeue;
660 			case 0:
661 				/* everything ok */
662 				break;
663 			default:
664 				BUG();
665 			}
666 		} else {
667 			inc_rs_pending(mdev);
668 			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
669 					       sector, size, ID_SYNCER)) {
670 				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
671 				dec_rs_pending(mdev);
672 				put_ldev(mdev);
673 				return 0;
674 			}
675 		}
676 	}
677 
678 	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
679 		/* last syncer _request_ was sent,
680 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
681 		 * next sync group will resume), as soon as we receive the last
682 		 * resync data block, and the last bit is cleared.
683 		 * until then resync "work" is "inactive" ...
684 		 */
685 		mdev->resync_work.cb = w_resync_inactive;
686 		put_ldev(mdev);
687 		return 1;
688 	}
689 
690  requeue:
691 	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
692 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
693 	put_ldev(mdev);
694 	return 1;
695 }
696 
697 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
698 {
699 	int number, i, size;
700 	sector_t sector;
701 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
702 
703 	if (unlikely(cancel))
704 		return 1;
705 
706 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
707 		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected\n");
708 		return 0;
709 	}
710 
711 	number = drbd_rs_number_requests(mdev);
712 
713 	sector = mdev->ov_position;
714 	for (i = 0; i < number; i++) {
715 		if (sector >= capacity) {
716 			mdev->resync_work.cb = w_resync_inactive;
717 			return 1;
718 		}
719 
720 		size = BM_BLOCK_SIZE;
721 
722 		if (drbd_try_rs_begin_io(mdev, sector)) {
723 			mdev->ov_position = sector;
724 			goto requeue;
725 		}
726 
727 		if (sector + (size>>9) > capacity)
728 			size = (capacity-sector)<<9;
729 
730 		inc_rs_pending(mdev);
731 		if (!drbd_send_ov_request(mdev, sector, size)) {
732 			dec_rs_pending(mdev);
733 			return 0;
734 		}
735 		sector += BM_SECT_PER_BIT;
736 	}
737 	mdev->ov_position = sector;
738 
739  requeue:
740 	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
741 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
742 	return 1;
743 }
744 
745 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
746 {
747 	kfree(w);
748 	ov_oos_print(mdev);
749 	drbd_resync_finished(mdev);
750 
751 	return 1;
752 }
753 
754 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
755 {
756 	kfree(w);
757 
758 	drbd_resync_finished(mdev);
759 
760 	return 1;
761 }
762 
763 static void ping_peer(struct drbd_conf *mdev)
764 {
765 	clear_bit(GOT_PING_ACK, &mdev->flags);
766 	request_ping(mdev);
767 	wait_event(mdev->misc_wait,
768 		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
769 }
770 
771 int drbd_resync_finished(struct drbd_conf *mdev)
772 {
773 	unsigned long db, dt, dbdt;
774 	unsigned long n_oos;
775 	union drbd_state os, ns;
776 	struct drbd_work *w;
777 	char *khelper_cmd = NULL;
778 	int verify_done = 0;
779 
780 	/* Remove all elements from the resync LRU. Since future actions
781 	 * might set bits in the (main) bitmap, the entries in the
782 	 * resync LRU would otherwise be wrong. */
783 	if (drbd_rs_del_all(mdev)) {
784 		/* In case this is not possible now, most probably because
785 		 * there are P_RS_DATA_REPLY packets lingering on the worker's
786 		 * queue (or even the read operations for those packets
787 		 * are not finished by now).   Retry in 100ms. */
788 
789 		__set_current_state(TASK_INTERRUPTIBLE);
790 		schedule_timeout(HZ / 10);
791 		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
792 		if (w) {
793 			w->cb = w_resync_finished;
794 			drbd_queue_work(&mdev->data.work, w);
795 			return 1;
796 		}
797 		dev_err(DEV, "Warning: failed to drbd_rs_del_all() and to kmalloc(w).\n");
798 	}
799 
800 	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
801 	if (dt <= 0)
802 		dt = 1;
803 	db = mdev->rs_total;
804 	dbdt = Bit2KB(db/dt);
805 	mdev->rs_paused /= HZ;
806 
807 	if (!get_ldev(mdev))
808 		goto out;
809 
810 	ping_peer(mdev);
811 
812 	spin_lock_irq(&mdev->req_lock);
813 	os = mdev->state;
814 
815 	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
816 
817 	/* This protects us against multiple calls (that can happen in the presence
818 	   of application IO), and against connectivity loss just before we arrive here. */
819 	if (os.conn <= C_CONNECTED)
820 		goto out_unlock;
821 
822 	ns = os;
823 	ns.conn = C_CONNECTED;
824 
825 	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
826 	     verify_done ? "Online verify " : "Resync",
827 	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
828 
829 	n_oos = drbd_bm_total_weight(mdev);
830 
831 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
832 		if (n_oos) {
833 			dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
834 			      n_oos, Bit2KB(1));
835 			khelper_cmd = "out-of-sync";
836 		}
837 	} else {
838 		D_ASSERT((n_oos - mdev->rs_failed) == 0);
839 
840 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
841 			khelper_cmd = "after-resync-target";
842 
843 		if (mdev->csums_tfm && mdev->rs_total) {
844 			const unsigned long s = mdev->rs_same_csum;
845 			const unsigned long t = mdev->rs_total;
846 			const int ratio =
847 				(t == 0)     ? 0 :
848 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
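			/* both branches compute s * 100 / t; the first keeps precision for
			 * small totals, the second avoids overflowing s * 100 for very
			 * large resyncs (presumably the reason for the split) */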
849 			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
850 			     "transferred %luK total %luK\n",
851 			     ratio,
852 			     Bit2KB(mdev->rs_same_csum),
853 			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
854 			     Bit2KB(mdev->rs_total));
855 		}
856 	}
857 
858 	if (mdev->rs_failed) {
859 		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
860 
861 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
862 			ns.disk = D_INCONSISTENT;
863 			ns.pdsk = D_UP_TO_DATE;
864 		} else {
865 			ns.disk = D_UP_TO_DATE;
866 			ns.pdsk = D_INCONSISTENT;
867 		}
868 	} else {
869 		ns.disk = D_UP_TO_DATE;
870 		ns.pdsk = D_UP_TO_DATE;
871 
872 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
873 			if (mdev->p_uuid) {
874 				int i;
875 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
876 					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
877 				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
878 				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
879 			} else {
880 				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
881 			}
882 		}
883 
884 		drbd_uuid_set_bm(mdev, 0UL);
885 
886 		if (mdev->p_uuid) {
887 			/* Now the two UUID sets are equal, update what we
888 			 * know of the peer. */
889 			int i;
890 			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
891 				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
892 		}
893 	}
894 
895 	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
896 out_unlock:
897 	spin_unlock_irq(&mdev->req_lock);
898 	put_ldev(mdev);
899 out:
900 	mdev->rs_total  = 0;
901 	mdev->rs_failed = 0;
902 	mdev->rs_paused = 0;
903 	if (verify_done)
904 		mdev->ov_start_sector = 0;
905 
906 	drbd_md_sync(mdev);
907 
908 	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
909 		dev_info(DEV, "Writing the whole bitmap\n");
910 		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
911 	}
912 
913 	if (khelper_cmd)
914 		drbd_khelper(mdev, khelper_cmd);
915 
916 	return 1;
917 }
918 
919 /* helper */
920 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
921 {
922 	if (drbd_ee_has_active_page(e)) {
923 		/* This might happen if sendpage() has not finished */
924 		int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
925 		atomic_add(i, &mdev->pp_in_use_by_net);
926 		atomic_sub(i, &mdev->pp_in_use);
927 		spin_lock_irq(&mdev->req_lock);
928 		list_add_tail(&e->w.list, &mdev->net_ee);
929 		spin_unlock_irq(&mdev->req_lock);
930 		wake_up(&drbd_pp_wait);
931 	} else
932 		drbd_free_ee(mdev, e);
933 }
934 
935 /**
936  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
937  * @mdev:	DRBD device.
938  * @w:		work object.
939  * @cancel:	The connection will be closed anyways
940  */
941 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
942 {
943 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
944 	int ok;
945 
946 	if (unlikely(cancel)) {
947 		drbd_free_ee(mdev, e);
948 		dec_unacked(mdev);
949 		return 1;
950 	}
951 
952 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
953 		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
954 	} else {
955 		if (__ratelimit(&drbd_ratelimit_state))
956 			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
957 			    (unsigned long long)e->sector);
958 
959 		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
960 	}
961 
962 	dec_unacked(mdev);
963 
964 	move_to_net_ee_or_free(mdev, e);
965 
966 	if (unlikely(!ok))
967 		dev_err(DEV, "drbd_send_block() failed\n");
968 	return ok;
969 }
970 
971 /**
972  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
973  * @mdev:	DRBD device.
974  * @w:		work object.
975  * @cancel:	The connection will be closed anyways
976  */
977 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
978 {
979 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
980 	int ok;
981 
982 	if (unlikely(cancel)) {
983 		drbd_free_ee(mdev, e);
984 		dec_unacked(mdev);
985 		return 1;
986 	}
987 
988 	if (get_ldev_if_state(mdev, D_FAILED)) {
989 		drbd_rs_complete_io(mdev, e->sector);
990 		put_ldev(mdev);
991 	}
992 
993 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
994 		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
995 			inc_rs_pending(mdev);
996 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
997 		} else {
998 			if (__ratelimit(&drbd_ratelimit_state))
999 				dev_err(DEV, "Not sending RSDataReply, "
1000 				    "partner DISKLESS!\n");
1001 			ok = 1;
1002 		}
1003 	} else {
1004 		if (__ratelimit(&drbd_ratelimit_state))
1005 			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1006 			    (unsigned long long)e->sector);
1007 
1008 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1009 
1010 		/* update resync data with failure */
1011 		drbd_rs_failed_io(mdev, e->sector, e->size);
1012 	}
1013 
1014 	dec_unacked(mdev);
1015 
1016 	move_to_net_ee_or_free(mdev, e);
1017 
1018 	if (unlikely(!ok))
1019 		dev_err(DEV, "drbd_send_block() failed\n");
1020 	return ok;
1021 }
1022 
1023 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1024 {
1025 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1026 	struct digest_info *di;
1027 	int digest_size;
1028 	void *digest = NULL;
1029 	int ok, eq = 0;
1030 
1031 	if (unlikely(cancel)) {
1032 		drbd_free_ee(mdev, e);
1033 		dec_unacked(mdev);
1034 		return 1;
1035 	}
1036 
1037 	if (get_ldev(mdev)) {
1038 		drbd_rs_complete_io(mdev, e->sector);
1039 		put_ldev(mdev);
1040 	}
1041 
1042 	di = e->digest;
1043 
1044 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1045 		/* quick hack to try to avoid a race against reconfiguration.
1046 		 * a real fix would be much more involved,
1047 		 * introducing more locking mechanisms */
1048 		if (mdev->csums_tfm) {
1049 			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1050 			D_ASSERT(digest_size == di->digest_size);
1051 			digest = kmalloc(digest_size, GFP_NOIO);
1052 		}
1053 		if (digest) {
1054 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1055 			eq = !memcmp(digest, di->digest, digest_size);
1056 			kfree(digest);
1057 		}
1058 
1059 		if (eq) {
1060 			drbd_set_in_sync(mdev, e->sector, e->size);
1061 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1062 			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1063 			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1064 		} else {
1065 			inc_rs_pending(mdev);
1066 			e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1067 			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1068 			kfree(di);
1069 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1070 		}
1071 	} else {
1072 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1073 		if (__ratelimit(&drbd_ratelimit_state))
1074 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1075 	}
1076 
1077 	dec_unacked(mdev);
1078 	move_to_net_ee_or_free(mdev, e);
1079 
1080 	if (unlikely(!ok))
1081 		dev_err(DEV, "drbd_send_block/ack() failed\n");
1082 	return ok;
1083 }
1084 
1085 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1086 {
1087 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1088 	int digest_size;
1089 	void *digest;
1090 	int ok = 1;
1091 
1092 	if (unlikely(cancel))
1093 		goto out;
1094 
1095 	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1096 		goto out;
1097 
1098 	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1099 	/* FIXME if this allocation fails, online verify will not terminate! */
1100 	digest = kmalloc(digest_size, GFP_NOIO);
1101 	if (digest) {
1102 		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1103 		inc_rs_pending(mdev);
1104 		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1105 					     digest, digest_size, P_OV_REPLY);
1106 		if (!ok)
1107 			dec_rs_pending(mdev);
1108 		kfree(digest);
1109 	}
1110 
1111 out:
1112 	drbd_free_ee(mdev, e);
1113 
1114 	dec_unacked(mdev);
1115 
1116 	return ok;
1117 }
1118 
1119 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1120 {
1121 	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1122 		mdev->ov_last_oos_size += size>>9;
1123 	} else {
1124 		mdev->ov_last_oos_start = sector;
1125 		mdev->ov_last_oos_size = size>>9;
1126 	}
1127 	drbd_set_out_of_sync(mdev, sector, size);
1128 	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1129 }
1130 
1131 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1132 {
1133 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1134 	struct digest_info *di;
1135 	int digest_size;
1136 	void *digest;
1137 	int ok, eq = 0;
1138 
1139 	if (unlikely(cancel)) {
1140 		drbd_free_ee(mdev, e);
1141 		dec_unacked(mdev);
1142 		return 1;
1143 	}
1144 
1145 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1146 	 * the resync lru has been cleaned up already */
1147 	if (get_ldev(mdev)) {
1148 		drbd_rs_complete_io(mdev, e->sector);
1149 		put_ldev(mdev);
1150 	}
1151 
1152 	di = e->digest;
1153 
1154 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1155 		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1156 		digest = kmalloc(digest_size, GFP_NOIO);
1157 		if (digest) {
1158 			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1159 
1160 			D_ASSERT(digest_size == di->digest_size);
1161 			eq = !memcmp(digest, di->digest, digest_size);
1162 			kfree(digest);
1163 		}
1164 	} else {
1165 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1166 		if (__ratelimit(&drbd_ratelimit_state))
1167 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1168 	}
1169 
1170 	dec_unacked(mdev);
1171 	if (!eq)
1172 		drbd_ov_oos_found(mdev, e->sector, e->size);
1173 	else
1174 		ov_oos_print(mdev);
1175 
1176 	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1177 			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1178 
1179 	drbd_free_ee(mdev, e);
1180 
1181 	--mdev->ov_left;
1182 
1183 	/* let's advance progress step marks only for every other megabyte */
1184 	if ((mdev->ov_left & 0x200) == 0x200)
1185 		drbd_advance_rs_marks(mdev, mdev->ov_left);
1186 
1187 	if (mdev->ov_left == 0) {
1188 		ov_oos_print(mdev);
1189 		drbd_resync_finished(mdev);
1190 	}
1191 
1192 	return ok;
1193 }
1194 
1195 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1196 {
1197 	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1198 	complete(&b->done);
1199 	return 1;
1200 }
1201 
1202 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1203 {
1204 	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1205 	struct p_barrier *p = &mdev->data.sbuf.barrier;
1206 	int ok = 1;
1207 
1208 	/* really avoid racing with tl_clear.  w.cb may have been referenced
1209 	 * just before it was reassigned and re-queued, so double check that.
1210 	 * actually, this race was harmless, since we only try to send the
1211 	 * barrier packet here, and otherwise do nothing with the object.
1212 	 * but compare with the head of w_clear_epoch */
1213 	spin_lock_irq(&mdev->req_lock);
1214 	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1215 		cancel = 1;
1216 	spin_unlock_irq(&mdev->req_lock);
1217 	if (cancel)
1218 		return 1;
1219 
1220 	if (!drbd_get_data_sock(mdev))
1221 		return 0;
1222 	p->barrier = b->br_number;
1223 	/* inc_ap_pending was done where this was queued.
1224 	 * dec_ap_pending will be done in got_BarrierAck
1225 	 * or (on connection loss) in w_clear_epoch.  */
1226 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1227 				(struct p_header80 *)p, sizeof(*p), 0);
1228 	drbd_put_data_sock(mdev);
1229 
1230 	return ok;
1231 }
1232 
1233 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1234 {
1235 	if (cancel)
1236 		return 1;
1237 	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1238 }
1239 
1240 /**
1241  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1242  * @mdev:	DRBD device.
1243  * @w:		work object.
1244  * @cancel:	The connection will be closed anyways
1245  */
1246 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1247 {
1248 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1249 	int ok;
1250 
1251 	if (unlikely(cancel)) {
1252 		req_mod(req, send_canceled);
1253 		return 1;
1254 	}
1255 
1256 	ok = drbd_send_dblock(mdev, req);
1257 	req_mod(req, ok ? handed_over_to_network : send_failed);
1258 
1259 	return ok;
1260 }
1261 
1262 /**
1263  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1264  * @mdev:	DRBD device.
1265  * @w:		work object.
1266  * @cancel:	The connection will be closed anyways
1267  */
1268 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1269 {
1270 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1271 	int ok;
1272 
1273 	if (unlikely(cancel)) {
1274 		req_mod(req, send_canceled);
1275 		return 1;
1276 	}
1277 
1278 	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1279 				(unsigned long)req);
1280 
1281 	if (!ok) {
1282 		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1283 		 * so this is probably redundant */
1284 		if (mdev->state.conn >= C_CONNECTED)
1285 			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1286 	}
1287 	req_mod(req, ok ? handed_over_to_network : send_failed);
1288 
1289 	return ok;
1290 }
1291 
1292 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1293 {
1294 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1295 
1296 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1297 		drbd_al_begin_io(mdev, req->sector);
1298 	/* Calling drbd_al_begin_io() out of the worker might deadlock
1299 	   theoretically. Practically it cannot deadlock, since this is
1300 	   only used when unfreezing IOs. All the extents of the requests
1301 	   that made it into the TL are already active */
1302 
1303 	drbd_req_make_private_bio(req, req->master_bio);
1304 	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1305 	generic_make_request(req->private_bio);
1306 
1307 	return 1;
1308 }
1309 
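/* Follow the sync-after chain (sync_conf.after minors); return 0 if any device
 * we depend on is currently resyncing (C_SYNC_SOURCE .. C_PAUSED_SYNC_T) or has
 * one of the *_isp sync-pause flags set, 1 once the end of the chain is reached. */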
1310 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1311 {
1312 	struct drbd_conf *odev = mdev;
1313 
1314 	while (1) {
1315 		if (odev->sync_conf.after == -1)
1316 			return 1;
1317 		odev = minor_to_mdev(odev->sync_conf.after);
1318 		ERR_IF(!odev) return 1;
1319 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1320 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1321 		    odev->state.aftr_isp || odev->state.peer_isp ||
1322 		    odev->state.user_isp)
1323 			return 0;
1324 	}
1325 }
1326 
1327 /**
1328  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1329  * @mdev:	DRBD device.
1330  *
1331  * Called from process context only (admin command and after_state_ch).
1332  */
1333 static int _drbd_pause_after(struct drbd_conf *mdev)
1334 {
1335 	struct drbd_conf *odev;
1336 	int i, rv = 0;
1337 
1338 	for (i = 0; i < minor_count; i++) {
1339 		odev = minor_to_mdev(i);
1340 		if (!odev)
1341 			continue;
1342 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1343 			continue;
1344 		if (!_drbd_may_sync_now(odev))
1345 			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1346 			       != SS_NOTHING_TO_DO);
1347 	}
1348 
1349 	return rv;
1350 }
1351 
1352 /**
1353  * _drbd_resume_next() - Resume resync on all devices that may resync now
1354  * @mdev:	DRBD device.
1355  *
1356  * Called from process context only (admin command and worker).
1357  */
1358 static int _drbd_resume_next(struct drbd_conf *mdev)
1359 {
1360 	struct drbd_conf *odev;
1361 	int i, rv = 0;
1362 
1363 	for (i = 0; i < minor_count; i++) {
1364 		odev = minor_to_mdev(i);
1365 		if (!odev)
1366 			continue;
1367 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1368 			continue;
1369 		if (odev->state.aftr_isp) {
1370 			if (_drbd_may_sync_now(odev))
1371 				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1372 							CS_HARD, NULL)
1373 				       != SS_NOTHING_TO_DO) ;
1374 		}
1375 	}
1376 	return rv;
1377 }
1378 
1379 void resume_next_sg(struct drbd_conf *mdev)
1380 {
1381 	write_lock_irq(&global_state_lock);
1382 	_drbd_resume_next(mdev);
1383 	write_unlock_irq(&global_state_lock);
1384 }
1385 
1386 void suspend_other_sg(struct drbd_conf *mdev)
1387 {
1388 	write_lock_irq(&global_state_lock);
1389 	_drbd_pause_after(mdev);
1390 	write_unlock_irq(&global_state_lock);
1391 }
1392 
1393 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1394 {
1395 	struct drbd_conf *odev;
1396 
1397 	if (o_minor == -1)
1398 		return NO_ERROR;
1399 	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1400 		return ERR_SYNC_AFTER;
1401 
1402 	/* check for loops */
1403 	odev = minor_to_mdev(o_minor);
1404 	while (1) {
1405 		if (odev == mdev)
1406 			return ERR_SYNC_AFTER_CYCLE;
1407 
1408 		/* dependency chain ends here, no cycles. */
1409 		if (odev->sync_conf.after == -1)
1410 			return NO_ERROR;
1411 
1412 		/* follow the dependency chain */
1413 		odev = minor_to_mdev(odev->sync_conf.after);
1414 	}
1415 }
1416 
1417 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1418 {
1419 	int changes;
1420 	int retcode;
1421 
1422 	write_lock_irq(&global_state_lock);
1423 	retcode = sync_after_error(mdev, na);
1424 	if (retcode == NO_ERROR) {
1425 		mdev->sync_conf.after = na;
1426 		do {
1427 			changes  = _drbd_pause_after(mdev);
1428 			changes |= _drbd_resume_next(mdev);
1429 		} while (changes);
1430 	}
1431 	write_unlock_irq(&global_state_lock);
1432 	return retcode;
1433 }
1434 
1435 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1436 {
1437 	atomic_set(&mdev->rs_sect_in, 0);
1438 	atomic_set(&mdev->rs_sect_ev, 0);
1439 	mdev->rs_in_flight = 0;
1440 	mdev->rs_planed = 0;
1441 	spin_lock(&mdev->peer_seq_lock);
1442 	fifo_set(&mdev->rs_plan_s, 0);
1443 	spin_unlock(&mdev->peer_seq_lock);
1444 }
1445 
1446 /**
1447  * drbd_start_resync() - Start the resync process
1448  * @mdev:	DRBD device.
1449  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1450  *
1451  * This function might bring you directly into one of the
1452  * C_PAUSED_SYNC_* states.
1453  */
1454 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1455 {
1456 	union drbd_state ns;
1457 	int r;
1458 
1459 	if (mdev->state.conn >= C_SYNC_SOURCE) {
1460 		dev_err(DEV, "Resync already running!\n");
1461 		return;
1462 	}
1463 
1464 	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1465 	drbd_rs_cancel_all(mdev);
1466 
1467 	if (side == C_SYNC_TARGET) {
1468 		/* Since application IO was locked out during C_WF_BITMAP_T and
1469 		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1470 		   which will make the data inconsistent, ask the handler whether we may. */
1471 		r = drbd_khelper(mdev, "before-resync-target");
1472 		r = (r >> 8) & 0xff;
1473 		if (r > 0) {
1474 			dev_info(DEV, "before-resync-target handler returned %d, "
1475 			     "dropping connection.\n", r);
1476 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1477 			return;
1478 		}
1479 	}
1480 
1481 	drbd_state_lock(mdev);
1482 
1483 	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1484 		drbd_state_unlock(mdev);
1485 		return;
1486 	}
1487 
1488 	if (side == C_SYNC_TARGET) {
1489 		mdev->bm_resync_fo = 0;
1490 	} else /* side == C_SYNC_SOURCE */ {
1491 		u64 uuid;
1492 
1493 		get_random_bytes(&uuid, sizeof(u64));
1494 		drbd_uuid_set(mdev, UI_BITMAP, uuid);
1495 		drbd_send_sync_uuid(mdev, uuid);
1496 
1497 		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1498 	}
1499 
1500 	write_lock_irq(&global_state_lock);
1501 	ns = mdev->state;
1502 
1503 	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1504 
1505 	ns.conn = side;
1506 
1507 	if (side == C_SYNC_TARGET)
1508 		ns.disk = D_INCONSISTENT;
1509 	else /* side == C_SYNC_SOURCE */
1510 		ns.pdsk = D_INCONSISTENT;
1511 
1512 	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1513 	ns = mdev->state;
1514 
1515 	if (ns.conn < C_CONNECTED)
1516 		r = SS_UNKNOWN_ERROR;
1517 
1518 	if (r == SS_SUCCESS) {
1519 		unsigned long tw = drbd_bm_total_weight(mdev);
1520 		unsigned long now = jiffies;
1521 		int i;
1522 
1523 		mdev->rs_failed    = 0;
1524 		mdev->rs_paused    = 0;
1525 		mdev->rs_same_csum = 0;
1526 		mdev->rs_last_events = 0;
1527 		mdev->rs_last_sect_ev = 0;
1528 		mdev->rs_total     = tw;
1529 		mdev->rs_start     = now;
1530 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1531 			mdev->rs_mark_left[i] = tw;
1532 			mdev->rs_mark_time[i] = now;
1533 		}
1534 		_drbd_pause_after(mdev);
1535 	}
1536 	write_unlock_irq(&global_state_lock);
1537 	put_ldev(mdev);
1538 
1539 	if (r == SS_SUCCESS) {
1540 		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1541 		     drbd_conn_str(ns.conn),
1542 		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1543 		     (unsigned long) mdev->rs_total);
1544 
1545 		if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1546 			/* This still has a race (about when exactly the peers
1547 			 * detect connection loss) that can lead to a full sync
1548 			 * on next handshake. In 8.3.9 we fixed this with explicit
1549 			 * resync-finished notifications, but the fix
1550 			 * introduces a protocol change.  Sleeping for some
1551 			 * time longer than the ping interval + timeout on the
1552 			 * SyncSource, to give the SyncTarget the chance to
1553 			 * detect connection loss, then waiting for a ping
1554 			 * response (implicit in drbd_resync_finished) reduces
1555 			 * the race considerably, but does not solve it. */
1556 			if (side == C_SYNC_SOURCE)
1557 				schedule_timeout_interruptible(
1558 					mdev->net_conf->ping_int * HZ +
1559 					mdev->net_conf->ping_timeo*HZ/9);
1560 			drbd_resync_finished(mdev);
1561 		}
1562 
1563 		drbd_rs_controller_reset(mdev);
1564 		/* ns.conn may already be != mdev->state.conn,
1565 		 * we may have been paused in between, or become paused until
1566 		 * the timer triggers.
1567 		 * No matter, that is handled in resync_timer_fn() */
1568 		if (ns.conn == C_SYNC_TARGET)
1569 			mod_timer(&mdev->resync_timer, jiffies);
1570 
1571 		drbd_md_sync(mdev);
1572 	}
1573 	drbd_state_unlock(mdev);
1574 }
1575 
1576 int drbd_worker(struct drbd_thread *thi)
1577 {
1578 	struct drbd_conf *mdev = thi->mdev;
1579 	struct drbd_work *w = NULL;
1580 	LIST_HEAD(work_list);
1581 	int intr = 0, i;
1582 
1583 	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1584 
1585 	while (get_t_state(thi) == Running) {
1586 		drbd_thread_current_set_cpu(mdev);
1587 
1588 		if (down_trylock(&mdev->data.work.s)) {
1589 			mutex_lock(&mdev->data.mutex);
1590 			if (mdev->data.socket && !mdev->net_conf->no_cork)
1591 				drbd_tcp_uncork(mdev->data.socket);
1592 			mutex_unlock(&mdev->data.mutex);
1593 
1594 			intr = down_interruptible(&mdev->data.work.s);
1595 
1596 			mutex_lock(&mdev->data.mutex);
1597 			if (mdev->data.socket  && !mdev->net_conf->no_cork)
1598 				drbd_tcp_cork(mdev->data.socket);
1599 			mutex_unlock(&mdev->data.mutex);
1600 		}
1601 
1602 		if (intr) {
1603 			D_ASSERT(intr == -EINTR);
1604 			flush_signals(current);
1605 			ERR_IF (get_t_state(thi) == Running)
1606 				continue;
1607 			break;
1608 		}
1609 
1610 		if (get_t_state(thi) != Running)
1611 			break;
1612 		/* With this break, we have done a down() but not consumed
1613 		   the entry from the list. The cleanup code takes care of
1614 		   this...   */
1615 
1616 		w = NULL;
1617 		spin_lock_irq(&mdev->data.work.q_lock);
1618 		ERR_IF(list_empty(&mdev->data.work.q)) {
1619 			/* something terribly wrong in our logic.
1620 			 * we were able to down() the semaphore,
1621 			 * but the list is empty... doh.
1622 			 *
1623 			 * what is the best thing to do now?
1624 			 * try again from scratch, restarting the receiver,
1625 			 * asender, whatnot? could break even more ugly,
1626 			 * e.g. when we are primary, but no good local data.
1627 			 *
1628 			 * I'll try to get away just starting over this loop.
1629 			 */
1630 			spin_unlock_irq(&mdev->data.work.q_lock);
1631 			continue;
1632 		}
1633 		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1634 		list_del_init(&w->list);
1635 		spin_unlock_irq(&mdev->data.work.q_lock);
1636 
1637 		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1638 			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1639 			if (mdev->state.conn >= C_CONNECTED)
1640 				drbd_force_state(mdev,
1641 						NS(conn, C_NETWORK_FAILURE));
1642 		}
1643 	}
1644 	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1645 	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1646 
1647 	spin_lock_irq(&mdev->data.work.q_lock);
1648 	i = 0;
1649 	while (!list_empty(&mdev->data.work.q)) {
1650 		list_splice_init(&mdev->data.work.q, &work_list);
1651 		spin_unlock_irq(&mdev->data.work.q_lock);
1652 
1653 		while (!list_empty(&work_list)) {
1654 			w = list_entry(work_list.next, struct drbd_work, list);
1655 			list_del_init(&w->list);
1656 			w->cb(mdev, w, 1);
1657 			i++; /* dead debugging code */
1658 		}
1659 
1660 		spin_lock_irq(&mdev->data.work.q_lock);
1661 	}
1662 	sema_init(&mdev->data.work.s, 0);
1663 	/* DANGEROUS race: if someone did queue his work within the spinlock,
1664 	 * but up() ed outside the spinlock, we could get an up() on the
1665 	 * semaphore without corresponding list entry.
1666 	 * So don't do that.
1667 	 */
1668 	spin_unlock_irq(&mdev->data.work.q_lock);
1669 
1670 	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1671 	/* _drbd_set_state only uses stop_nowait.
1672 	 * wait here for the Exiting receiver. */
1673 	drbd_thread_stop(&mdev->receiver);
1674 	drbd_mdev_cleanup(mdev);
1675 
1676 	dev_info(DEV, "worker terminated\n");
1677 
1678 	clear_bit(DEVICE_DYING, &mdev->flags);
1679 	clear_bit(CONFIG_PENDING, &mdev->flags);
1680 	wake_up(&mdev->state_wait);
1681 
1682 	return 0;
1683 }
1684