xref: /openbmc/linux/drivers/block/drbd/drbd_worker.c (revision 73a01a18)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24  */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37 
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40 
41 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42 
43 
44 
45 /* defined here:
46    drbd_md_io_complete
47    drbd_endio_sec
48    drbd_endio_pri
49 
50  * more endio handlers:
51    atodb_endio in drbd_actlog.c
52    drbd_bm_async_io_complete in drbd_bitmap.c
53 
54  * For all these callbacks, note the following:
55  * The callbacks will be called in irq context by the IDE drivers,
56  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
57  * Try to get the locking right :)
58  *
59  */
60 
61 
62 /* About the global_state_lock
63    Each state transition on a device holds a read lock. In case we have
64    to evaluate the sync-after dependencies, we grab a write lock, because
65    we need stable states on all devices for that.  */
66 rwlock_t global_state_lock;
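/* In this file, the write side of global_state_lock is taken by
 * resume_next_sg(), suspend_other_sg(), drbd_alter_sa() and
 * drbd_start_resync(); the per-transition read side is taken by the state
 * change code, as described above. */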
67 
68 /* used for synchronous meta data and bitmap IO
69  * submitted by drbd_md_sync_page_io()
70  */
71 void drbd_md_io_complete(struct bio *bio, int error)
72 {
73 	struct drbd_md_io *md_io;
74 
75 	md_io = (struct drbd_md_io *)bio->bi_private;
76 	md_io->error = error;
77 
78 	complete(&md_io->event);
79 }
80 
81 /* reads on behalf of the partner,
82  * "submitted" by the receiver
83  */
84 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
85 {
86 	unsigned long flags = 0;
87 	struct drbd_conf *mdev = e->mdev;
88 
89 	D_ASSERT(e->block_id != ID_VACANT);
90 
91 	spin_lock_irqsave(&mdev->req_lock, flags);
92 	mdev->read_cnt += e->size >> 9;
93 	list_del(&e->w.list);
94 	if (list_empty(&mdev->read_ee))
95 		wake_up(&mdev->ee_wait);
96 	if (test_bit(__EE_WAS_ERROR, &e->flags))
97 		__drbd_chk_io_error(mdev, FALSE);
98 	spin_unlock_irqrestore(&mdev->req_lock, flags);
99 
100 	drbd_queue_work(&mdev->data.work, &e->w);
101 	put_ldev(mdev);
102 }
103 
104 /* writes on behalf of the partner, or resync writes,
105  * "submitted" by the receiver, final stage.  */
106 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
107 {
108 	unsigned long flags = 0;
109 	struct drbd_conf *mdev = e->mdev;
110 	sector_t e_sector;
111 	int do_wake;
112 	int is_syncer_req;
113 	int do_al_complete_io;
114 
115 	D_ASSERT(e->block_id != ID_VACANT);
116 
117 	/* after we moved e to done_ee,
118 	 * we may no longer access it,
119 	 * it may be freed/reused already!
120 	 * (as soon as we release the req_lock) */
121 	e_sector = e->sector;
122 	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
123 	is_syncer_req = is_syncer_block_id(e->block_id);
124 
125 	spin_lock_irqsave(&mdev->req_lock, flags);
126 	mdev->writ_cnt += e->size >> 9;
127 	list_del(&e->w.list); /* has been on active_ee or sync_ee */
128 	list_add_tail(&e->w.list, &mdev->done_ee);
129 
130 	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
131 	 * neither did we wake possibly waiting conflicting requests.
132 	 * done from "drbd_process_done_ee" within the appropriate w.cb
133 	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
134 
135 	do_wake = is_syncer_req
136 		? list_empty(&mdev->sync_ee)
137 		: list_empty(&mdev->active_ee);
138 
139 	if (test_bit(__EE_WAS_ERROR, &e->flags))
140 		__drbd_chk_io_error(mdev, FALSE);
141 	spin_unlock_irqrestore(&mdev->req_lock, flags);
142 
143 	if (is_syncer_req)
144 		drbd_rs_complete_io(mdev, e_sector);
145 
146 	if (do_wake)
147 		wake_up(&mdev->ee_wait);
148 
149 	if (do_al_complete_io)
150 		drbd_al_complete_io(mdev, e_sector);
151 
152 	wake_asender(mdev);
153 	put_ldev(mdev);
154 }
155 
156 /* writes on behalf of the partner, or resync writes,
157  * "submitted" by the receiver.
158  */
159 void drbd_endio_sec(struct bio *bio, int error)
160 {
161 	struct drbd_epoch_entry *e = bio->bi_private;
162 	struct drbd_conf *mdev = e->mdev;
163 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
164 	int is_write = bio_data_dir(bio) == WRITE;
165 
166 	if (error)
167 		dev_warn(DEV, "%s: error=%d s=%llus\n",
168 				is_write ? "write" : "read", error,
169 				(unsigned long long)e->sector);
170 	if (!error && !uptodate) {
171 		dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
172 				is_write ? "write" : "read",
173 				(unsigned long long)e->sector);
174 		/* strange behavior of some lower level drivers...
175 		 * fail the request by clearing the uptodate flag,
176 		 * but do not return any error?! */
177 		error = -EIO;
178 	}
179 
180 	if (error)
181 		set_bit(__EE_WAS_ERROR, &e->flags);
182 
183 	bio_put(bio); /* no need for the bio anymore */
184 	if (atomic_dec_and_test(&e->pending_bios)) {
185 		if (is_write)
186 			drbd_endio_write_sec_final(e);
187 		else
188 			drbd_endio_read_sec_final(e);
189 	}
190 }
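/* Note: e->pending_bios counts the bios submitted for one epoch entry; only
 * the last completion (the atomic_dec_and_test above) hands the entry to the
 * final read/write handler, so multi-bio entries are completed exactly once. */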
191 
192 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
193  */
194 void drbd_endio_pri(struct bio *bio, int error)
195 {
196 	unsigned long flags;
197 	struct drbd_request *req = bio->bi_private;
198 	struct drbd_conf *mdev = req->mdev;
199 	struct bio_and_error m;
200 	enum drbd_req_event what;
201 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
202 
203 	if (!error && !uptodate) {
204 		dev_warn(DEV, "p %s: setting error to -EIO\n",
205 			 bio_data_dir(bio) == WRITE ? "write" : "read");
206 		/* strange behavior of some lower level drivers...
207 		 * fail the request by clearing the uptodate flag,
208 		 * but do not return any error?! */
209 		error = -EIO;
210 	}
211 
212 	/* to avoid recursion in __req_mod */
213 	if (unlikely(error)) {
214 		what = (bio_data_dir(bio) == WRITE)
215 			? write_completed_with_error
216 			: (bio_rw(bio) == READ)
217 			  ? read_completed_with_error
218 			  : read_ahead_completed_with_error;
219 	} else
220 		what = completed_ok;
221 
222 	bio_put(req->private_bio);
223 	req->private_bio = ERR_PTR(error);
224 
225 	/* not req_mod(), we need irqsave here! */
226 	spin_lock_irqsave(&mdev->req_lock, flags);
227 	__req_mod(req, what, &m);
228 	spin_unlock_irqrestore(&mdev->req_lock, flags);
229 
230 	if (m.bio)
231 		complete_master_bio(mdev, &m);
232 }
233 
234 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
235 {
236 	struct drbd_request *req = container_of(w, struct drbd_request, w);
237 
238 	/* We should not detach for read io-error,
239 	 * but try to WRITE the P_DATA_REPLY to the failed location,
240 	 * to give the disk the chance to relocate that block */
241 
242 	spin_lock_irq(&mdev->req_lock);
243 	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
244 		_req_mod(req, read_retry_remote_canceled);
245 		spin_unlock_irq(&mdev->req_lock);
246 		return 1;
247 	}
248 	spin_unlock_irq(&mdev->req_lock);
249 
250 	return w_send_read_req(mdev, w, 0);
251 }
252 
253 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
254 {
255 	ERR_IF(cancel) return 1;
256 	dev_err(DEV, "resync inactive, but callback triggered??\n");
257 	return 1; /* Simply ignore this! */
258 }
259 
260 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
261 {
262 	struct hash_desc desc;
263 	struct scatterlist sg;
264 	struct page *page = e->pages;
265 	struct page *tmp;
266 	unsigned len;
267 
268 	desc.tfm = tfm;
269 	desc.flags = 0;
270 
271 	sg_init_table(&sg, 1);
272 	crypto_hash_init(&desc);
273 
274 	while ((tmp = page_chain_next(page))) {
275 		/* all but the last page will be fully used */
276 		sg_set_page(&sg, page, PAGE_SIZE, 0);
277 		crypto_hash_update(&desc, &sg, sg.length);
278 		page = tmp;
279 	}
280 	/* and now the last, possibly only partially used page */
281 	len = e->size & (PAGE_SIZE - 1);
282 	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
283 	crypto_hash_update(&desc, &sg, sg.length);
284 	crypto_hash_final(&desc, digest);
285 }
286 
287 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
288 {
289 	struct hash_desc desc;
290 	struct scatterlist sg;
291 	struct bio_vec *bvec;
292 	int i;
293 
294 	desc.tfm = tfm;
295 	desc.flags = 0;
296 
297 	sg_init_table(&sg, 1);
298 	crypto_hash_init(&desc);
299 
300 	__bio_for_each_segment(bvec, bio, i, 0) {
301 		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
302 		crypto_hash_update(&desc, &sg, sg.length);
303 	}
304 	crypto_hash_final(&desc, digest);
305 }
306 
307 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
308 {
309 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
310 	int digest_size;
311 	void *digest;
312 	int ok;
313 
314 	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
315 
316 	if (unlikely(cancel)) {
317 		drbd_free_ee(mdev, e);
318 		return 1;
319 	}
320 
321 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
322 		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
323 		digest = kmalloc(digest_size, GFP_NOIO);
324 		if (digest) {
325 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
326 
327 			inc_rs_pending(mdev);
328 			ok = drbd_send_drequest_csum(mdev,
329 						     e->sector,
330 						     e->size,
331 						     digest,
332 						     digest_size,
333 						     P_CSUM_RS_REQUEST);
334 			kfree(digest);
335 		} else {
336 			dev_err(DEV, "kmalloc() of digest failed.\n");
337 			ok = 0;
338 		}
339 	} else
340 		ok = 1;
341 
342 	drbd_free_ee(mdev, e);
343 
344 	if (unlikely(!ok))
345 		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
346 	return ok;
347 }
348 
349 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
350 
351 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
352 {
353 	struct drbd_epoch_entry *e;
354 
355 	if (!get_ldev(mdev))
356 		return -EIO;
357 
358 	if (drbd_rs_should_slow_down(mdev))
359 		goto defer;
360 
361 	/* GFP_TRY, because if there is no memory available right now, this may
362 	 * be rescheduled for later. It is "only" background resync, after all. */
363 	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
364 	if (!e)
365 		goto defer;
366 
367 	e->w.cb = w_e_send_csum;
368 	spin_lock_irq(&mdev->req_lock);
369 	list_add(&e->w.list, &mdev->read_ee);
370 	spin_unlock_irq(&mdev->req_lock);
371 
372 	atomic_add(size >> 9, &mdev->rs_sect_ev);
373 	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
374 		return 0;
375 
376 	/* drbd_submit_ee currently fails for one reason only:
377 	 * not being able to allocate enough bios.
378 	 * Is dropping the connection going to help? */
379 	spin_lock_irq(&mdev->req_lock);
380 	list_del(&e->w.list);
381 	spin_unlock_irq(&mdev->req_lock);
382 
383 	drbd_free_ee(mdev, e);
384 defer:
385 	put_ldev(mdev);
386 	return -EAGAIN;
387 }
388 
389 void resync_timer_fn(unsigned long data)
390 {
391 	struct drbd_conf *mdev = (struct drbd_conf *) data;
392 	int queue;
393 
394 	queue = 1;
395 	switch (mdev->state.conn) {
396 	case C_VERIFY_S:
397 		mdev->resync_work.cb = w_make_ov_request;
398 		break;
399 	case C_SYNC_TARGET:
400 		mdev->resync_work.cb = w_make_resync_request;
401 		break;
402 	default:
403 		queue = 0;
404 		mdev->resync_work.cb = w_resync_inactive;
405 	}
406 
407 	/* harmless race: list_empty outside data.work.q_lock */
408 	if (list_empty(&mdev->resync_work.list) && queue)
409 		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
410 }
411 
412 static void fifo_set(struct fifo_buffer *fb, int value)
413 {
414 	int i;
415 
416 	for (i = 0; i < fb->size; i++)
417 		fb->values[i] = value;
418 }
419 
420 static int fifo_push(struct fifo_buffer *fb, int value)
421 {
422 	int ov;
423 
424 	ov = fb->values[fb->head_index];
425 	fb->values[fb->head_index++] = value;
426 
427 	if (fb->head_index >= fb->size)
428 		fb->head_index = 0;
429 
430 	return ov;
431 }
432 
433 static void fifo_add_val(struct fifo_buffer *fb, int value)
434 {
435 	int i;
436 
437 	for (i = 0; i < fb->size; i++)
438 		fb->values[i] += value;
439 }
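/* The fifo_buffer helpers above implement the resync controller's plan
 * (mdev->rs_plan_s, protected by peer_seq_lock): each slot holds the
 * correction, in sectors, planned for one future SLEEP_TIME step.
 * fifo_push() pops the correction due in the current step while queueing a
 * new tail value, and fifo_add_val() spreads an additional correction evenly
 * over all planned steps. */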
440 
441 int drbd_rs_controller(struct drbd_conf *mdev)
442 {
443 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
444 	unsigned int want;     /* The number of sectors we want in the proxy */
445 	int req_sect; /* Number of sectors to request in this turn */
446 	int correction; /* Number of additional sectors we need in the proxy */
447 	int cps; /* correction per invocation of drbd_rs_controller() */
448 	int steps; /* Number of time steps to plan ahead */
449 	int curr_corr;
450 	int max_sect;
451 
452 	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
453 	mdev->rs_in_flight -= sect_in;
454 
455 	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
456 
457 	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
458 
459 	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
460 		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
461 	} else { /* normal path */
462 		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
463 			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
464 	}
465 
466 	correction = want - mdev->rs_in_flight - mdev->rs_planed;
467 
468 	/* Plan ahead */
469 	cps = correction / steps;
470 	fifo_add_val(&mdev->rs_plan_s, cps);
471 	mdev->rs_planed += cps * steps;
472 
473 	/* What we do in this step */
474 	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
475 	spin_unlock(&mdev->peer_seq_lock);
476 	mdev->rs_planed -= curr_corr;
477 
478 	req_sect = sect_in + curr_corr;
479 	if (req_sect < 0)
480 		req_sect = 0;
481 
482 	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
483 	if (req_sect > max_sect)
484 		req_sect = max_sect;
485 
486 	/*
487 	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
488 		 sect_in, mdev->rs_in_flight, want, correction,
489 		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
490 	*/
491 
492 	return req_sect;
493 }
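/* Illustrative numbers for the controller above (a sketch, assuming
 * SLEEP_TIME is HZ/10, i.e. 100 ms, and c_fill_target == 0 so the
 * c_delay_target branch applies):
 *   c_delay_target = 10 (1 second), steps = 10, sect_in = 2000 sectors
 *   received during the last step (roughly 10 MB/s).
 *   want       = 2000 * 10 * HZ / ((HZ/10) * 10) = 20000 sectors,
 *                i.e. one c_delay_target worth of data "in the pipe".
 *   With rs_in_flight = 15000 and rs_planed = 1000:
 *   correction = 20000 - 15000 - 1000 = 4000 sectors,
 *   cps        = 4000 / 10 = 400, added to every planned step,
 *   req_sect   = sect_in plus whatever correction fifo_push() pops for this
 *                step, clamped to [0, c_max_rate * 2 * SLEEP_TIME / HZ]. */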
494 
495 int drbd_rs_number_requests(struct drbd_conf *mdev)
496 {
497 	int number;
498 	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
499 		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
500 		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
501 	} else {
502 		mdev->c_sync_rate = mdev->sync_conf.rate;
503 		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
504 	}
505 
506 	/* Throttle resync on lower level disk activity, which may also be
507 	 * caused by application IO on Primary/SyncTarget.
508 	 * Keep this after the call to drbd_rs_controller, as that assumes
509 	 * it is called as precisely as possible every SLEEP_TIME,
510 	 * and would be confused otherwise. */
511 	if (number && drbd_rs_should_slow_down(mdev)) {
512 		mdev->c_sync_rate = 1;
513 		number = 0;
514 	}
515 
516 	/* ignore the number of pending requests; the resync controller should
517 	 * throttle down to the incoming reply rate soon enough anyway. */
518 	return number;
519 }
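/* Unit conversions above (assuming BM_BLOCK_SIZE is 4 KiB and SLEEP_TIME is
 * HZ/10): the controller works in 512-byte sectors, while resync requests
 * are issued per bitmap block, so ">> (BM_BLOCK_SHIFT - 9)" turns sectors
 * into a request count.  c_sync_rate is kept in KiB/s:
 * number * (BM_BLOCK_SIZE / 1024) KiB per step, times HZ / SLEEP_TIME steps
 * per second. */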
520 
521 int w_make_resync_request(struct drbd_conf *mdev,
522 		struct drbd_work *w, int cancel)
523 {
524 	unsigned long bit;
525 	sector_t sector;
526 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
527 	int max_bio_size;
528 	int number, rollback_i, size;
529 	int align, queued, sndbuf;
530 	int i = 0;
531 
532 	if (unlikely(cancel))
533 		return 1;
534 
535 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
536 		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
537 		return 0;
538 	}
539 
540 	if (mdev->state.conn != C_SYNC_TARGET)
541 		dev_err(DEV, "%s in w_make_resync_request\n",
542 			drbd_conn_str(mdev->state.conn));
543 
544 	if (mdev->rs_total == 0) {
545 		/* empty resync? */
546 		drbd_resync_finished(mdev);
547 		return 1;
548 	}
549 
550 	if (!get_ldev(mdev)) {
551 		/* Since we only need to access mdev->resync, a
552 		   get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
553 		   continuing to resync with a broken disk makes no sense at
554 		   all */
555 		dev_err(DEV, "Disk broke down during resync!\n");
556 		mdev->resync_work.cb = w_resync_inactive;
557 		return 1;
558 	}
559 
560 	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
561 	 * if it should be necessary */
562 	max_bio_size =
563 		mdev->agreed_pro_version < 94 ? queue_max_hw_sectors(mdev->rq_queue) << 9 :
564 		mdev->agreed_pro_version < 95 ?	DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_BIO_SIZE;
565 
566 	number = drbd_rs_number_requests(mdev);
567 	if (number == 0)
568 		goto requeue;
569 
570 	for (i = 0; i < number; i++) {
571 		/* Stop generating RS requests, when half of the send buffer is filled */
572 		mutex_lock(&mdev->data.mutex);
573 		if (mdev->data.socket) {
574 			queued = mdev->data.socket->sk->sk_wmem_queued;
575 			sndbuf = mdev->data.socket->sk->sk_sndbuf;
576 		} else {
577 			queued = 1;
578 			sndbuf = 0;
579 		}
580 		mutex_unlock(&mdev->data.mutex);
581 		if (queued > sndbuf / 2)
582 			goto requeue;
583 
584 next_sector:
585 		size = BM_BLOCK_SIZE;
586 		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
587 
588 		if (bit == -1UL) {
589 			mdev->bm_resync_fo = drbd_bm_bits(mdev);
590 			mdev->resync_work.cb = w_resync_inactive;
591 			put_ldev(mdev);
592 			return 1;
593 		}
594 
595 		sector = BM_BIT_TO_SECT(bit);
596 
597 		if (drbd_try_rs_begin_io(mdev, sector)) {
598 			mdev->bm_resync_fo = bit;
599 			goto requeue;
600 		}
601 		mdev->bm_resync_fo = bit + 1;
602 
603 		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
604 			drbd_rs_complete_io(mdev, sector);
605 			goto next_sector;
606 		}
607 
608 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
609 		/* try to find some adjacent bits.
610 		 * we stop if we already have the maximum req size.
611 		 *
612 		 * Additionally always align bigger requests, in order to
613 		 * be prepared for all stripe sizes of software RAIDs.
614 		 */
615 		align = 1;
616 		rollback_i = i;
617 		for (;;) {
618 			if (size + BM_BLOCK_SIZE > max_bio_size)
619 				break;
620 
621 			/* Be always aligned */
622 			if (sector & ((1<<(align+3))-1))
623 				break;
624 
625 			/* do not cross extent boundaries */
626 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
627 				break;
628 			/* now, is it actually dirty, after all?
629 			 * caution, drbd_bm_test_bit is tri-state for some
630 			 * obscure reason; ( b == 0 ) would get the out-of-band
631 			 * only accidentally right because of the "oddly sized"
632 			 * adjustment below */
633 			if (drbd_bm_test_bit(mdev, bit+1) != 1)
634 				break;
635 			bit++;
636 			size += BM_BLOCK_SIZE;
637 			if ((BM_BLOCK_SIZE << align) <= size)
638 				align++;
639 			i++;
640 		}
641 		/* if we merged some,
642 		 * reset the offset to start the next drbd_bm_find_next from */
643 		if (size > BM_BLOCK_SIZE)
644 			mdev->bm_resync_fo = bit + 1;
645 #endif
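		/* Merge behaviour of the loop above (a sketch, assuming BM_BLOCK_SIZE
		 * is 4 KiB, i.e. one bitmap bit per 8 sectors): merging only continues
		 * while the start sector stays aligned to (1 << (align+3)) sectors, and
		 * align is bumped whenever the size reaches the next power of two.  A
		 * start that is not 8 KiB aligned is not merged at all; an 8 KiB
		 * aligned start can grow to at most 8 KiB, a 16 KiB aligned start to
		 * 16 KiB, and so on, capped by max_bio_size and the extent boundary. */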
646 
647 		/* adjust very last sectors, in case we are oddly sized */
648 		if (sector + (size>>9) > capacity)
649 			size = (capacity-sector)<<9;
650 		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
651 			switch (read_for_csum(mdev, sector, size)) {
652 			case -EIO: /* Disk failure */
653 				put_ldev(mdev);
654 				return 0;
655 			case -EAGAIN: /* allocation failed, or ldev busy */
656 				drbd_rs_complete_io(mdev, sector);
657 				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
658 				i = rollback_i;
659 				goto requeue;
660 			case 0:
661 				/* everything ok */
662 				break;
663 			default:
664 				BUG();
665 			}
666 		} else {
667 			inc_rs_pending(mdev);
668 			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
669 					       sector, size, ID_SYNCER)) {
670 				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
671 				dec_rs_pending(mdev);
672 				put_ldev(mdev);
673 				return 0;
674 			}
675 		}
676 	}
677 
678 	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
679 		/* last syncer _request_ was sent,
680 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
681 		 * next sync group will resume), as soon as we receive the last
682 		 * resync data block, and the last bit is cleared.
683 		 * until then resync "work" is "inactive" ...
684 		 */
685 		mdev->resync_work.cb = w_resync_inactive;
686 		put_ldev(mdev);
687 		return 1;
688 	}
689 
690  requeue:
691 	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
692 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
693 	put_ldev(mdev);
694 	return 1;
695 }
696 
697 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
698 {
699 	int number, i, size;
700 	sector_t sector;
701 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
702 
703 	if (unlikely(cancel))
704 		return 1;
705 
706 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
707 		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
708 		return 0;
709 	}
710 
711 	number = drbd_rs_number_requests(mdev);
712 
713 	sector = mdev->ov_position;
714 	for (i = 0; i < number; i++) {
715 		if (sector >= capacity) {
716 			mdev->resync_work.cb = w_resync_inactive;
717 			return 1;
718 		}
719 
720 		size = BM_BLOCK_SIZE;
721 
722 		if (drbd_try_rs_begin_io(mdev, sector)) {
723 			mdev->ov_position = sector;
724 			goto requeue;
725 		}
726 
727 		if (sector + (size>>9) > capacity)
728 			size = (capacity-sector)<<9;
729 
730 		inc_rs_pending(mdev);
731 		if (!drbd_send_ov_request(mdev, sector, size)) {
732 			dec_rs_pending(mdev);
733 			return 0;
734 		}
735 		sector += BM_SECT_PER_BIT;
736 	}
737 	mdev->ov_position = sector;
738 
739  requeue:
740 	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
741 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
742 	return 1;
743 }
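/* Online verify requests walk the device linearly from ov_position; unlike
 * w_make_resync_request() there is no bitmap to consult and no request
 * merging, so each P_OV_REQUEST covers exactly one BM_BLOCK_SIZE block. */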
744 
745 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
746 {
747 	kfree(w);
748 	ov_oos_print(mdev);
749 	drbd_resync_finished(mdev);
750 
751 	return 1;
752 }
753 
754 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
755 {
756 	kfree(w);
757 
758 	drbd_resync_finished(mdev);
759 
760 	return 1;
761 }
762 
763 static void ping_peer(struct drbd_conf *mdev)
764 {
765 	clear_bit(GOT_PING_ACK, &mdev->flags);
766 	request_ping(mdev);
767 	wait_event(mdev->misc_wait,
768 		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
769 }
770 
771 int drbd_resync_finished(struct drbd_conf *mdev)
772 {
773 	unsigned long db, dt, dbdt;
774 	unsigned long n_oos;
775 	union drbd_state os, ns;
776 	struct drbd_work *w;
777 	char *khelper_cmd = NULL;
778 	int verify_done = 0;
779 
780 	/* Remove all elements from the resync LRU. Since future actions
781 	 * might set bits in the (main) bitmap, the entries in the
782 	 * resync LRU would otherwise be wrong. */
783 	if (drbd_rs_del_all(mdev)) {
784 		/* In case this is not possible now, most probably because
785 		 * there are P_RS_DATA_REPLY packets lingering on the worker's
786 		 * queue (or even the read operations for those packets
787 		 * are not finished by now).  Retry in 100ms. */
788 
789 		__set_current_state(TASK_INTERRUPTIBLE);
790 		schedule_timeout(HZ / 10);
791 		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
792 		if (w) {
793 			w->cb = w_resync_finished;
794 			drbd_queue_work(&mdev->data.work, w);
795 			return 1;
796 		}
797 		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
798 	}
799 
800 	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
801 	if (dt <= 0)
802 		dt = 1;
803 	db = mdev->rs_total;
804 	dbdt = Bit2KB(db/dt);
805 	mdev->rs_paused /= HZ;
806 
807 	if (!get_ldev(mdev))
808 		goto out;
809 
810 	ping_peer(mdev);
811 
812 	spin_lock_irq(&mdev->req_lock);
813 	os = mdev->state;
814 
815 	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
816 
817 	/* This protects us against multiple calls (that can happen in the presence
818 	   of application IO), and against connectivity loss just before we arrive here. */
819 	if (os.conn <= C_CONNECTED)
820 		goto out_unlock;
821 
822 	ns = os;
823 	ns.conn = C_CONNECTED;
824 
825 	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
826 	     verify_done ? "Online verify " : "Resync",
827 	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
828 
829 	n_oos = drbd_bm_total_weight(mdev);
830 
831 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
832 		if (n_oos) {
833 			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
834 			      n_oos, Bit2KB(1));
835 			khelper_cmd = "out-of-sync";
836 		}
837 	} else {
838 		D_ASSERT((n_oos - mdev->rs_failed) == 0);
839 
840 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
841 			khelper_cmd = "after-resync-target";
842 
843 		if (mdev->csums_tfm && mdev->rs_total) {
844 			const unsigned long s = mdev->rs_same_csum;
845 			const unsigned long t = mdev->rs_total;
846 			const int ratio =
847 				(t == 0)     ? 0 :
848 				(t < 100000) ? ((s*100)/t) : (s/(t/100));
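			/* the second branch presumably avoids overflowing s*100 in
			 * 32-bit long arithmetic when rs_total is very large; the
			 * precision lost to t/100 is irrelevant for a log message */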
849 			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
850 			     "transferred %luK total %luK\n",
851 			     ratio,
852 			     Bit2KB(mdev->rs_same_csum),
853 			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
854 			     Bit2KB(mdev->rs_total));
855 		}
856 	}
857 
858 	if (mdev->rs_failed) {
859 		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
860 
861 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
862 			ns.disk = D_INCONSISTENT;
863 			ns.pdsk = D_UP_TO_DATE;
864 		} else {
865 			ns.disk = D_UP_TO_DATE;
866 			ns.pdsk = D_INCONSISTENT;
867 		}
868 	} else {
869 		ns.disk = D_UP_TO_DATE;
870 		ns.pdsk = D_UP_TO_DATE;
871 
872 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
873 			if (mdev->p_uuid) {
874 				int i;
875 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
876 					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
877 				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
878 				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
879 			} else {
880 				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
881 			}
882 		}
883 
884 		drbd_uuid_set_bm(mdev, 0UL);
885 
886 		if (mdev->p_uuid) {
887 			/* Now the two UUID sets are equal, update what we
888 			 * know of the peer. */
889 			int i;
890 			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
891 				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
892 		}
893 	}
894 
895 	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
896 out_unlock:
897 	spin_unlock_irq(&mdev->req_lock);
898 	put_ldev(mdev);
899 out:
900 	mdev->rs_total  = 0;
901 	mdev->rs_failed = 0;
902 	mdev->rs_paused = 0;
903 	if (verify_done)
904 		mdev->ov_start_sector = 0;
905 
906 	drbd_md_sync(mdev);
907 
908 	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
909 		dev_info(DEV, "Writing the whole bitmap\n");
910 		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
911 	}
912 
913 	if (khelper_cmd)
914 		drbd_khelper(mdev, khelper_cmd);
915 
916 	return 1;
917 }
918 
919 /* helper */
920 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
921 {
922 	if (drbd_ee_has_active_page(e)) {
923 		/* This might happen if sendpage() has not finished */
924 		int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
925 		atomic_add(i, &mdev->pp_in_use_by_net);
926 		atomic_sub(i, &mdev->pp_in_use);
927 		spin_lock_irq(&mdev->req_lock);
928 		list_add_tail(&e->w.list, &mdev->net_ee);
929 		spin_unlock_irq(&mdev->req_lock);
930 		wake_up(&drbd_pp_wait);
931 	} else
932 		drbd_free_ee(mdev, e);
933 }
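/* Pages still referenced by the network stack after sendpage() are
 * re-accounted from pp_in_use to pp_in_use_by_net and parked on net_ee
 * instead of being freed; they are freed later, once their page reference
 * counts drop (see drbd_ee_has_active_page()). */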
934 
935 /**
936  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
937  * @mdev:	DRBD device.
938  * @w:		work object.
939  * @cancel:	The connection will be closed anyway
940  */
941 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
942 {
943 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
944 	int ok;
945 
946 	if (unlikely(cancel)) {
947 		drbd_free_ee(mdev, e);
948 		dec_unacked(mdev);
949 		return 1;
950 	}
951 
952 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
953 		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
954 	} else {
955 		if (__ratelimit(&drbd_ratelimit_state))
956 			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
957 			    (unsigned long long)e->sector);
958 
959 		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
960 	}
961 
962 	dec_unacked(mdev);
963 
964 	move_to_net_ee_or_free(mdev, e);
965 
966 	if (unlikely(!ok))
967 		dev_err(DEV, "drbd_send_block() failed\n");
968 	return ok;
969 }
970 
971 /**
972  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
973  * @mdev:	DRBD device.
974  * @w:		work object.
975  * @cancel:	The connection will be closed anyway
976  */
977 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
978 {
979 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
980 	int ok;
981 
982 	if (unlikely(cancel)) {
983 		drbd_free_ee(mdev, e);
984 		dec_unacked(mdev);
985 		return 1;
986 	}
987 
988 	if (get_ldev_if_state(mdev, D_FAILED)) {
989 		drbd_rs_complete_io(mdev, e->sector);
990 		put_ldev(mdev);
991 	}
992 
993 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
994 		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
995 			inc_rs_pending(mdev);
996 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
997 		} else {
998 			if (__ratelimit(&drbd_ratelimit_state))
999 				dev_err(DEV, "Not sending RSDataReply, "
1000 				    "partner DISKLESS!\n");
1001 			ok = 1;
1002 		}
1003 	} else {
1004 		if (__ratelimit(&drbd_ratelimit_state))
1005 			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1006 			    (unsigned long long)e->sector);
1007 
1008 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1009 
1010 		/* update resync data with failure */
1011 		drbd_rs_failed_io(mdev, e->sector, e->size);
1012 	}
1013 
1014 	dec_unacked(mdev);
1015 
1016 	move_to_net_ee_or_free(mdev, e);
1017 
1018 	if (unlikely(!ok))
1019 		dev_err(DEV, "drbd_send_block() failed\n");
1020 	return ok;
1021 }
1022 
1023 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1024 {
1025 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1026 	struct digest_info *di;
1027 	int digest_size;
1028 	void *digest = NULL;
1029 	int ok, eq = 0;
1030 
1031 	if (unlikely(cancel)) {
1032 		drbd_free_ee(mdev, e);
1033 		dec_unacked(mdev);
1034 		return 1;
1035 	}
1036 
1037 	if (get_ldev(mdev)) {
1038 		drbd_rs_complete_io(mdev, e->sector);
1039 		put_ldev(mdev);
1040 	}
1041 
1042 	di = e->digest;
1043 
1044 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1045 		/* quick hack to try to avoid a race against reconfiguration.
1046 		 * a real fix would be much more involved,
1047 		 * introducing more locking mechanisms */
1048 		if (mdev->csums_tfm) {
1049 			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1050 			D_ASSERT(digest_size == di->digest_size);
1051 			digest = kmalloc(digest_size, GFP_NOIO);
1052 		}
1053 		if (digest) {
1054 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1055 			eq = !memcmp(digest, di->digest, digest_size);
1056 			kfree(digest);
1057 		}
1058 
1059 		if (eq) {
1060 			drbd_set_in_sync(mdev, e->sector, e->size);
1061 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1062 			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1063 			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1064 		} else {
1065 			inc_rs_pending(mdev);
1066 			e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1067 			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1068 			kfree(di);
1069 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1070 		}
1071 	} else {
1072 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1073 		if (__ratelimit(&drbd_ratelimit_state))
1074 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1075 	}
1076 
1077 	dec_unacked(mdev);
1078 	move_to_net_ee_or_free(mdev, e);
1079 
1080 	if (unlikely(!ok))
1081 		dev_err(DEV, "drbd_send_block/ack() failed\n");
1082 	return ok;
1083 }
1084 
1085 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1086 {
1087 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1088 	int digest_size;
1089 	void *digest;
1090 	int ok = 1;
1091 
1092 	if (unlikely(cancel))
1093 		goto out;
1094 
1095 	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1096 		goto out;
1097 
1098 	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1099 	/* FIXME if this allocation fails, online verify will not terminate! */
1100 	digest = kmalloc(digest_size, GFP_NOIO);
1101 	if (digest) {
1102 		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1103 		inc_rs_pending(mdev);
1104 		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1105 					     digest, digest_size, P_OV_REPLY);
1106 		if (!ok)
1107 			dec_rs_pending(mdev);
1108 		kfree(digest);
1109 	}
1110 
1111 out:
1112 	drbd_free_ee(mdev, e);
1113 
1114 	dec_unacked(mdev);
1115 
1116 	return ok;
1117 }
1118 
1119 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1120 {
1121 	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1122 		mdev->ov_last_oos_size += size>>9;
1123 	} else {
1124 		mdev->ov_last_oos_start = sector;
1125 		mdev->ov_last_oos_size = size>>9;
1126 	}
1127 	drbd_set_out_of_sync(mdev, sector, size);
1128 	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1129 }
1130 
1131 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1132 {
1133 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1134 	struct digest_info *di;
1135 	int digest_size;
1136 	void *digest;
1137 	int ok, eq = 0;
1138 
1139 	if (unlikely(cancel)) {
1140 		drbd_free_ee(mdev, e);
1141 		dec_unacked(mdev);
1142 		return 1;
1143 	}
1144 
1145 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1146 	 * the resync lru has been cleaned up already */
1147 	if (get_ldev(mdev)) {
1148 		drbd_rs_complete_io(mdev, e->sector);
1149 		put_ldev(mdev);
1150 	}
1151 
1152 	di = e->digest;
1153 
1154 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1155 		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1156 		digest = kmalloc(digest_size, GFP_NOIO);
1157 		if (digest) {
1158 			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1159 
1160 			D_ASSERT(digest_size == di->digest_size);
1161 			eq = !memcmp(digest, di->digest, digest_size);
1162 			kfree(digest);
1163 		}
1164 	} else {
1165 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1166 		if (__ratelimit(&drbd_ratelimit_state))
1167 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1168 	}
1169 
1170 	dec_unacked(mdev);
1171 	if (!eq)
1172 		drbd_ov_oos_found(mdev, e->sector, e->size);
1173 	else
1174 		ov_oos_print(mdev);
1175 
1176 	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1177 			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1178 
1179 	drbd_free_ee(mdev, e);
1180 
1181 	--mdev->ov_left;
1182 
1183 	/* let's advance progress step marks only for every other megabyte */
1184 	if ((mdev->ov_left & 0x200) == 0x200)
1185 		drbd_advance_rs_marks(mdev, mdev->ov_left);
1186 
1187 	if (mdev->ov_left == 0) {
1188 		ov_oos_print(mdev);
1189 		drbd_resync_finished(mdev);
1190 	}
1191 
1192 	return ok;
1193 }
1194 
1195 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1196 {
1197 	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1198 	complete(&b->done);
1199 	return 1;
1200 }
1201 
1202 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1203 {
1204 	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1205 	struct p_barrier *p = &mdev->data.sbuf.barrier;
1206 	int ok = 1;
1207 
1208 	/* really avoid racing with tl_clear.  w.cb may have been referenced
1209 	 * just before it was reassigned and re-queued, so double check that.
1210 	 * actually, this race was harmless, since we only try to send the
1211 	 * barrier packet here, and otherwise do nothing with the object.
1212 	 * but compare with the head of w_clear_epoch */
1213 	spin_lock_irq(&mdev->req_lock);
1214 	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1215 		cancel = 1;
1216 	spin_unlock_irq(&mdev->req_lock);
1217 	if (cancel)
1218 		return 1;
1219 
1220 	if (!drbd_get_data_sock(mdev))
1221 		return 0;
1222 	p->barrier = b->br_number;
1223 	/* inc_ap_pending was done where this was queued.
1224 	 * dec_ap_pending will be done in got_BarrierAck
1225 	 * or (on connection loss) in w_clear_epoch.  */
1226 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1227 				(struct p_header80 *)p, sizeof(*p), 0);
1228 	drbd_put_data_sock(mdev);
1229 
1230 	return ok;
1231 }
1232 
1233 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1234 {
1235 	if (cancel)
1236 		return 1;
1237 	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1238 }
1239 
1240 int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1241 {
1242 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1243 	int ok;
1244 
1245 	if (unlikely(cancel)) {
1246 		req_mod(req, send_canceled);
1247 		return 1;
1248 	}
1249 
1250 	ok = drbd_send_oos(mdev, req);
1251 	req_mod(req, oos_handed_to_network);
1252 
1253 	return ok;
1254 }
1255 
1256 /**
1257  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1258  * @mdev:	DRBD device.
1259  * @w:		work object.
1260  * @cancel:	The connection will be closed anyway
1261  */
1262 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1263 {
1264 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1265 	int ok;
1266 
1267 	if (unlikely(cancel)) {
1268 		req_mod(req, send_canceled);
1269 		return 1;
1270 	}
1271 
1272 	ok = drbd_send_dblock(mdev, req);
1273 	req_mod(req, ok ? handed_over_to_network : send_failed);
1274 
1275 	return ok;
1276 }
1277 
1278 /**
1279  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1280  * @mdev:	DRBD device.
1281  * @w:		work object.
1282  * @cancel:	The connection will be closed anyway
1283  */
1284 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1285 {
1286 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1287 	int ok;
1288 
1289 	if (unlikely(cancel)) {
1290 		req_mod(req, send_canceled);
1291 		return 1;
1292 	}
1293 
1294 	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1295 				(unsigned long)req);
1296 
1297 	if (!ok) {
1298 		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1299 		 * so this is probably redundant */
1300 		if (mdev->state.conn >= C_CONNECTED)
1301 			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1302 	}
1303 	req_mod(req, ok ? handed_over_to_network : send_failed);
1304 
1305 	return ok;
1306 }
1307 
1308 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1309 {
1310 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1311 
1312 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1313 		drbd_al_begin_io(mdev, req->sector);
1314 	/* Calling drbd_al_begin_io() out of the worker might deadlock
1315 	   theoretically. Practically it cannot deadlock, since this is
1316 	   only used when unfreezing IOs. All the extents of the requests
1317 	   that made it into the TL are already active */
1318 
1319 	drbd_req_make_private_bio(req, req->master_bio);
1320 	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1321 	generic_make_request(req->private_bio);
1322 
1323 	return 1;
1324 }
1325 
1326 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1327 {
1328 	struct drbd_conf *odev = mdev;
1329 
1330 	while (1) {
1331 		if (odev->sync_conf.after == -1)
1332 			return 1;
1333 		odev = minor_to_mdev(odev->sync_conf.after);
1334 		ERR_IF(!odev) return 1;
1335 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1336 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1337 		    odev->state.aftr_isp || odev->state.peer_isp ||
1338 		    odev->state.user_isp)
1339 			return 0;
1340 	}
1341 }
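/* _drbd_may_sync_now() walks the sync-after chain: this device may resync
 * only if no device it (transitively) syncs after is currently between
 * C_SYNC_SOURCE and C_PAUSED_SYNC_T or has any of its *_isp pause flags set. */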
1342 
1343 /**
1344  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1345  * @mdev:	DRBD device.
1346  *
1347  * Called from process context only (admin command and after_state_ch).
1348  */
1349 static int _drbd_pause_after(struct drbd_conf *mdev)
1350 {
1351 	struct drbd_conf *odev;
1352 	int i, rv = 0;
1353 
1354 	for (i = 0; i < minor_count; i++) {
1355 		odev = minor_to_mdev(i);
1356 		if (!odev)
1357 			continue;
1358 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1359 			continue;
1360 		if (!_drbd_may_sync_now(odev))
1361 			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1362 			       != SS_NOTHING_TO_DO);
1363 	}
1364 
1365 	return rv;
1366 }
1367 
1368 /**
1369  * _drbd_resume_next() - Resume resync on all devices that may resync now
1370  * @mdev:	DRBD device.
1371  *
1372  * Called from process context only (admin command and worker).
1373  */
1374 static int _drbd_resume_next(struct drbd_conf *mdev)
1375 {
1376 	struct drbd_conf *odev;
1377 	int i, rv = 0;
1378 
1379 	for (i = 0; i < minor_count; i++) {
1380 		odev = minor_to_mdev(i);
1381 		if (!odev)
1382 			continue;
1383 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1384 			continue;
1385 		if (odev->state.aftr_isp) {
1386 			if (_drbd_may_sync_now(odev))
1387 				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1388 							CS_HARD, NULL)
1389 				       != SS_NOTHING_TO_DO) ;
1390 		}
1391 	}
1392 	return rv;
1393 }
1394 
1395 void resume_next_sg(struct drbd_conf *mdev)
1396 {
1397 	write_lock_irq(&global_state_lock);
1398 	_drbd_resume_next(mdev);
1399 	write_unlock_irq(&global_state_lock);
1400 }
1401 
1402 void suspend_other_sg(struct drbd_conf *mdev)
1403 {
1404 	write_lock_irq(&global_state_lock);
1405 	_drbd_pause_after(mdev);
1406 	write_unlock_irq(&global_state_lock);
1407 }
1408 
1409 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1410 {
1411 	struct drbd_conf *odev;
1412 
1413 	if (o_minor == -1)
1414 		return NO_ERROR;
1415 	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1416 		return ERR_SYNC_AFTER;
1417 
1418 	/* check for loops */
1419 	odev = minor_to_mdev(o_minor);
1420 	while (1) {
1421 		if (odev == mdev)
1422 			return ERR_SYNC_AFTER_CYCLE;
1423 
1424 		/* dependency chain ends here, no cycles. */
1425 		if (odev->sync_conf.after == -1)
1426 			return NO_ERROR;
1427 
1428 		/* follow the dependency chain */
1429 		odev = minor_to_mdev(odev->sync_conf.after);
1430 	}
1431 }
1432 
1433 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1434 {
1435 	int changes;
1436 	int retcode;
1437 
1438 	write_lock_irq(&global_state_lock);
1439 	retcode = sync_after_error(mdev, na);
1440 	if (retcode == NO_ERROR) {
1441 		mdev->sync_conf.after = na;
1442 		do {
1443 			changes  = _drbd_pause_after(mdev);
1444 			changes |= _drbd_resume_next(mdev);
1445 		} while (changes);
1446 	}
1447 	write_unlock_irq(&global_state_lock);
1448 	return retcode;
1449 }
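/* Changing the sync-after dependency may both pause this device and allow
 * other devices to resume; the do/while loop above re-applies pause and
 * resume until the set of aftr_isp flags reaches a fixed point. */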
1450 
1451 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1452 {
1453 	atomic_set(&mdev->rs_sect_in, 0);
1454 	atomic_set(&mdev->rs_sect_ev, 0);
1455 	mdev->rs_in_flight = 0;
1456 	mdev->rs_planed = 0;
1457 	spin_lock(&mdev->peer_seq_lock);
1458 	fifo_set(&mdev->rs_plan_s, 0);
1459 	spin_unlock(&mdev->peer_seq_lock);
1460 }
1461 
1462 /**
1463  * drbd_start_resync() - Start the resync process
1464  * @mdev:	DRBD device.
1465  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1466  *
1467  * This function might bring you directly into one of the
1468  * C_PAUSED_SYNC_* states.
1469  */
1470 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1471 {
1472 	union drbd_state ns;
1473 	int r;
1474 
1475 	if (mdev->state.conn >= C_SYNC_SOURCE) {
1476 		dev_err(DEV, "Resync already running!\n");
1477 		return;
1478 	}
1479 
1480 	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1481 	drbd_rs_cancel_all(mdev);
1482 
1483 	if (side == C_SYNC_TARGET) {
1484 		/* Since application IO was locked out during C_WF_BITMAP_T and
1485 		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1486 		   and making the data inconsistent, let the before-resync-target handler object. */
1487 		r = drbd_khelper(mdev, "before-resync-target");
1488 		r = (r >> 8) & 0xff;
1489 		if (r > 0) {
1490 			dev_info(DEV, "before-resync-target handler returned %d, "
1491 			     "dropping connection.\n", r);
1492 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1493 			return;
1494 		}
1495 	}
1496 
1497 	drbd_state_lock(mdev);
1498 
1499 	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1500 		drbd_state_unlock(mdev);
1501 		return;
1502 	}
1503 
1504 	if (side == C_SYNC_TARGET) {
1505 		mdev->bm_resync_fo = 0;
1506 	} else /* side == C_SYNC_SOURCE */ {
1507 		u64 uuid;
1508 
1509 		get_random_bytes(&uuid, sizeof(u64));
1510 		drbd_uuid_set(mdev, UI_BITMAP, uuid);
1511 		drbd_send_sync_uuid(mdev, uuid);
1512 
1513 		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1514 	}
1515 
1516 	write_lock_irq(&global_state_lock);
1517 	ns = mdev->state;
1518 
1519 	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1520 
1521 	ns.conn = side;
1522 
1523 	if (side == C_SYNC_TARGET)
1524 		ns.disk = D_INCONSISTENT;
1525 	else /* side == C_SYNC_SOURCE */
1526 		ns.pdsk = D_INCONSISTENT;
1527 
1528 	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1529 	ns = mdev->state;
1530 
1531 	if (ns.conn < C_CONNECTED)
1532 		r = SS_UNKNOWN_ERROR;
1533 
1534 	if (r == SS_SUCCESS) {
1535 		unsigned long tw = drbd_bm_total_weight(mdev);
1536 		unsigned long now = jiffies;
1537 		int i;
1538 
1539 		mdev->rs_failed    = 0;
1540 		mdev->rs_paused    = 0;
1541 		mdev->rs_same_csum = 0;
1542 		mdev->rs_last_events = 0;
1543 		mdev->rs_last_sect_ev = 0;
1544 		mdev->rs_total     = tw;
1545 		mdev->rs_start     = now;
1546 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1547 			mdev->rs_mark_left[i] = tw;
1548 			mdev->rs_mark_time[i] = now;
1549 		}
1550 		_drbd_pause_after(mdev);
1551 	}
1552 	write_unlock_irq(&global_state_lock);
1553 	put_ldev(mdev);
1554 
1555 	if (r == SS_SUCCESS) {
1556 		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1557 		     drbd_conn_str(ns.conn),
1558 		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1559 		     (unsigned long) mdev->rs_total);
1560 
1561 		if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1562 			/* This still has a race (about when exactly the peers
1563 			 * detect connection loss) that can lead to a full sync
1564 			 * on next handshake. In 8.3.9 we fixed this with explicit
1565 			 * resync-finished notifications, but the fix
1566 			 * introduces a protocol change.  Sleeping for some
1567 			 * time longer than the ping interval + timeout on the
1568 			 * SyncSource, to give the SyncTarget the chance to
1569 			 * detect connection loss, then waiting for a ping
1570 			 * response (implicit in drbd_resync_finished) reduces
1571 			 * the race considerably, but does not solve it. */
1572 			if (side == C_SYNC_SOURCE)
1573 				schedule_timeout_interruptible(
1574 					mdev->net_conf->ping_int * HZ +
1575 					mdev->net_conf->ping_timeo*HZ/9);
1576 			drbd_resync_finished(mdev);
1577 		}
1578 
1579 		drbd_rs_controller_reset(mdev);
1580 		/* ns.conn may already be != mdev->state.conn,
1581 		 * we may have been paused in between, or become paused until
1582 		 * the timer triggers.
1583 		 * No matter, that is handled in resync_timer_fn() */
1584 		if (ns.conn == C_SYNC_TARGET)
1585 			mod_timer(&mdev->resync_timer, jiffies);
1586 
1587 		drbd_md_sync(mdev);
1588 	}
1589 	drbd_state_unlock(mdev);
1590 }
1591 
1592 int drbd_worker(struct drbd_thread *thi)
1593 {
1594 	struct drbd_conf *mdev = thi->mdev;
1595 	struct drbd_work *w = NULL;
1596 	LIST_HEAD(work_list);
1597 	int intr = 0, i;
1598 
1599 	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1600 
1601 	while (get_t_state(thi) == Running) {
1602 		drbd_thread_current_set_cpu(mdev);
1603 
1604 		if (down_trylock(&mdev->data.work.s)) {
1605 			mutex_lock(&mdev->data.mutex);
1606 			if (mdev->data.socket && !mdev->net_conf->no_cork)
1607 				drbd_tcp_uncork(mdev->data.socket);
1608 			mutex_unlock(&mdev->data.mutex);
1609 
1610 			intr = down_interruptible(&mdev->data.work.s);
1611 
1612 			mutex_lock(&mdev->data.mutex);
1613 			if (mdev->data.socket  && !mdev->net_conf->no_cork)
1614 				drbd_tcp_cork(mdev->data.socket);
1615 			mutex_unlock(&mdev->data.mutex);
1616 		}
1617 
1618 		if (intr) {
1619 			D_ASSERT(intr == -EINTR);
1620 			flush_signals(current);
1621 			ERR_IF (get_t_state(thi) == Running)
1622 				continue;
1623 			break;
1624 		}
1625 
1626 		if (get_t_state(thi) != Running)
1627 			break;
1628 		/* With this break, we have done a down() but not consumed
1629 		   the entry from the list. The cleanup code takes care of
1630 		   this...   */
1631 
1632 		w = NULL;
1633 		spin_lock_irq(&mdev->data.work.q_lock);
1634 		ERR_IF(list_empty(&mdev->data.work.q)) {
1635 			/* something terribly wrong in our logic.
1636 			 * we were able to down() the semaphore,
1637 			 * but the list is empty... doh.
1638 			 *
1639 			 * what is the best thing to do now?
1640 			 * try again from scratch, restarting the receiver,
1641 			 * asender, whatnot? could break even more ugly,
1642 			 * e.g. when we are primary, but no good local data.
1643 			 *
1644 			 * I'll try to get away just starting over this loop.
1645 			 */
1646 			spin_unlock_irq(&mdev->data.work.q_lock);
1647 			continue;
1648 		}
1649 		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1650 		list_del_init(&w->list);
1651 		spin_unlock_irq(&mdev->data.work.q_lock);
1652 
1653 		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1654 			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1655 			if (mdev->state.conn >= C_CONNECTED)
1656 				drbd_force_state(mdev,
1657 						NS(conn, C_NETWORK_FAILURE));
1658 		}
1659 	}
1660 	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1661 	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1662 
1663 	spin_lock_irq(&mdev->data.work.q_lock);
1664 	i = 0;
1665 	while (!list_empty(&mdev->data.work.q)) {
1666 		list_splice_init(&mdev->data.work.q, &work_list);
1667 		spin_unlock_irq(&mdev->data.work.q_lock);
1668 
1669 		while (!list_empty(&work_list)) {
1670 			w = list_entry(work_list.next, struct drbd_work, list);
1671 			list_del_init(&w->list);
1672 			w->cb(mdev, w, 1);
1673 			i++; /* dead debugging code */
1674 		}
1675 
1676 		spin_lock_irq(&mdev->data.work.q_lock);
1677 	}
1678 	sema_init(&mdev->data.work.s, 0);
1679 	/* DANGEROUS race: if someone did queue his work within the spinlock,
1680 	 * but up() ed outside the spinlock, we could get an up() on the
1681 	 * semaphore without corresponding list entry.
1682 	 * So don't do that.
1683 	 */
1684 	spin_unlock_irq(&mdev->data.work.q_lock);
1685 
1686 	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1687 	/* _drbd_set_state only uses stop_nowait.
1688 	 * wait here for the Exiting receiver. */
1689 	drbd_thread_stop(&mdev->receiver);
1690 	drbd_mdev_cleanup(mdev);
1691 
1692 	dev_info(DEV, "worker terminated\n");
1693 
1694 	clear_bit(DEVICE_DYING, &mdev->flags);
1695 	clear_bit(CONFIG_PENDING, &mdev->flags);
1696 	wake_up(&mdev->state_wait);
1697 
1698 	return 0;
1699 }
1700