xref: /openbmc/linux/drivers/block/drbd/drbd_worker.c (revision d612d309)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24  */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37 
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40 
41 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42 static int w_make_resync_request(struct drbd_conf *mdev,
43 				 struct drbd_work *w, int cancel);
44 
45 
46 
47 /* defined here:
48    drbd_md_io_complete
49    drbd_endio_sec
50    drbd_endio_pri
51 
52  * more endio handlers:
53    atodb_endio in drbd_actlog.c
54    drbd_bm_async_io_complete in drbd_bitmap.c
55 
56  * For all these callbacks, note the following:
57  * The callbacks will be called in irq context by the IDE drivers,
58  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
59  * Try to get the locking right :)
60  *
61  */
62 
63 
64 /* About the global_state_lock
65    Each state transition on a device holds a read lock. In case we have
66    to evaluate the sync-after dependencies, we grab a write lock, because
67    we need stable states on all devices for that.  */
68 rwlock_t global_state_lock;
69 
70 /* used for synchronous meta data and bitmap IO
71  * submitted by drbd_md_sync_page_io()
72  */
73 void drbd_md_io_complete(struct bio *bio, int error)
74 {
75 	struct drbd_md_io *md_io;
76 
77 	md_io = (struct drbd_md_io *)bio->bi_private;
78 	md_io->error = error;
79 
80 	complete(&md_io->event);
81 }
82 
83 /* reads on behalf of the partner,
84  * "submitted" by the receiver
85  */
86 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
87 {
88 	unsigned long flags = 0;
89 	struct drbd_conf *mdev = e->mdev;
90 
91 	D_ASSERT(e->block_id != ID_VACANT);
92 
93 	spin_lock_irqsave(&mdev->req_lock, flags);
94 	mdev->read_cnt += e->size >> 9;
95 	list_del(&e->w.list);
96 	if (list_empty(&mdev->read_ee))
97 		wake_up(&mdev->ee_wait);
98 	if (test_bit(__EE_WAS_ERROR, &e->flags))
99 		__drbd_chk_io_error(mdev, false);
100 	spin_unlock_irqrestore(&mdev->req_lock, flags);
101 
102 	drbd_queue_work(&mdev->data.work, &e->w);
103 	put_ldev(mdev);
104 }
105 
106 /* writes on behalf of the partner, or resync writes,
107  * "submitted" by the receiver, final stage.  */
108 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
109 {
110 	unsigned long flags = 0;
111 	struct drbd_conf *mdev = e->mdev;
112 	sector_t e_sector;
113 	int do_wake;
114 	int is_syncer_req;
115 	int do_al_complete_io;
116 
117 	D_ASSERT(e->block_id != ID_VACANT);
118 
119 	/* after we moved e to done_ee,
120 	 * we may no longer access it,
121 	 * it may be freed/reused already!
122 	 * (as soon as we release the req_lock) */
123 	e_sector = e->sector;
124 	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
125 	is_syncer_req = is_syncer_block_id(e->block_id);
126 
127 	spin_lock_irqsave(&mdev->req_lock, flags);
128 	mdev->writ_cnt += e->size >> 9;
129 	list_del(&e->w.list); /* has been on active_ee or sync_ee */
130 	list_add_tail(&e->w.list, &mdev->done_ee);
131 
132 	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
133 	 * neither did we wake possibly waiting conflicting requests.
134 	 * done from "drbd_process_done_ee" within the appropriate w.cb
135 	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
136 
137 	do_wake = is_syncer_req
138 		? list_empty(&mdev->sync_ee)
139 		: list_empty(&mdev->active_ee);
140 
141 	if (test_bit(__EE_WAS_ERROR, &e->flags))
142 		__drbd_chk_io_error(mdev, false);
143 	spin_unlock_irqrestore(&mdev->req_lock, flags);
144 
145 	if (is_syncer_req)
146 		drbd_rs_complete_io(mdev, e_sector);
147 
148 	if (do_wake)
149 		wake_up(&mdev->ee_wait);
150 
151 	if (do_al_complete_io)
152 		drbd_al_complete_io(mdev, e_sector);
153 
154 	wake_asender(mdev);
155 	put_ldev(mdev);
156 }
157 
158 /* writes on behalf of the partner, or resync writes,
159  * "submitted" by the receiver.
160  */
161 void drbd_endio_sec(struct bio *bio, int error)
162 {
163 	struct drbd_epoch_entry *e = bio->bi_private;
164 	struct drbd_conf *mdev = e->mdev;
165 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
166 	int is_write = bio_data_dir(bio) == WRITE;
167 
168 	if (error && __ratelimit(&drbd_ratelimit_state))
169 		dev_warn(DEV, "%s: error=%d s=%llus\n",
170 				is_write ? "write" : "read", error,
171 				(unsigned long long)e->sector);
172 	if (!error && !uptodate) {
173 		if (__ratelimit(&drbd_ratelimit_state))
174 			dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
175 					is_write ? "write" : "read",
176 					(unsigned long long)e->sector);
177 		/* strange behavior of some lower level drivers...
178 		 * fail the request by clearing the uptodate flag,
179 		 * but do not return any error?! */
180 		error = -EIO;
181 	}
182 
183 	if (error)
184 		set_bit(__EE_WAS_ERROR, &e->flags);
185 
186 	bio_put(bio); /* no need for the bio anymore */
187 	if (atomic_dec_and_test(&e->pending_bios)) {
188 		if (is_write)
189 			drbd_endio_write_sec_final(e);
190 		else
191 			drbd_endio_read_sec_final(e);
192 	}
193 }
194 
195 /* read, read-ahead (readA), or write requests on R_PRIMARY coming from drbd_make_request
196  */
197 void drbd_endio_pri(struct bio *bio, int error)
198 {
199 	unsigned long flags;
200 	struct drbd_request *req = bio->bi_private;
201 	struct drbd_conf *mdev = req->mdev;
202 	struct bio_and_error m;
203 	enum drbd_req_event what;
204 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
205 
206 	if (!error && !uptodate) {
207 		dev_warn(DEV, "p %s: setting error to -EIO\n",
208 			 bio_data_dir(bio) == WRITE ? "write" : "read");
209 		/* strange behavior of some lower level drivers...
210 		 * fail the request by clearing the uptodate flag,
211 		 * but do not return any error?! */
212 		error = -EIO;
213 	}
214 
215 	/* to avoid recursion in __req_mod */
216 	if (unlikely(error)) {
217 		what = (bio_data_dir(bio) == WRITE)
218 			? write_completed_with_error
219 			: (bio_rw(bio) == READ)
220 			  ? read_completed_with_error
221 			  : read_ahead_completed_with_error;
222 	} else
223 		what = completed_ok;
224 
225 	bio_put(req->private_bio);
226 	req->private_bio = ERR_PTR(error);
227 
228 	/* not req_mod(), we need irqsave here! */
229 	spin_lock_irqsave(&mdev->req_lock, flags);
230 	__req_mod(req, what, &m);
231 	spin_unlock_irqrestore(&mdev->req_lock, flags);
232 
233 	if (m.bio)
234 		complete_master_bio(mdev, &m);
235 }
236 
237 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
238 {
239 	struct drbd_request *req = container_of(w, struct drbd_request, w);
240 
241 	/* We should not detach for read io-error,
242 	 * but try to WRITE the P_DATA_REPLY to the failed location,
243 	 * to give the disk the chance to relocate that block */
244 
245 	spin_lock_irq(&mdev->req_lock);
246 	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
247 		_req_mod(req, read_retry_remote_canceled);
248 		spin_unlock_irq(&mdev->req_lock);
249 		return 1;
250 	}
251 	spin_unlock_irq(&mdev->req_lock);
252 
253 	return w_send_read_req(mdev, w, 0);
254 }
255 
256 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
257 {
258 	ERR_IF(cancel) return 1;
259 	dev_err(DEV, "resync inactive, but callback triggered??\n");
260 	return 1; /* Simply ignore this! */
261 }
262 
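/* Compute a digest over the payload of an epoch entry: every page of the
 * page chain is hashed in full, the last page possibly only partially,
 * as determined by e->size. */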
263 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
264 {
265 	struct hash_desc desc;
266 	struct scatterlist sg;
267 	struct page *page = e->pages;
268 	struct page *tmp;
269 	unsigned len;
270 
271 	desc.tfm = tfm;
272 	desc.flags = 0;
273 
274 	sg_init_table(&sg, 1);
275 	crypto_hash_init(&desc);
276 
277 	while ((tmp = page_chain_next(page))) {
278 		/* all but the last page will be fully used */
279 		sg_set_page(&sg, page, PAGE_SIZE, 0);
280 		crypto_hash_update(&desc, &sg, sg.length);
281 		page = tmp;
282 	}
283 	/* and now the last, possibly only partially used page */
284 	len = e->size & (PAGE_SIZE - 1);
285 	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
286 	crypto_hash_update(&desc, &sg, sg.length);
287 	crypto_hash_final(&desc, digest);
288 }
289 
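/* Same as drbd_csum_ee(), but hash the segments of a bio in order. */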
290 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
291 {
292 	struct hash_desc desc;
293 	struct scatterlist sg;
294 	struct bio_vec *bvec;
295 	int i;
296 
297 	desc.tfm = tfm;
298 	desc.flags = 0;
299 
300 	sg_init_table(&sg, 1);
301 	crypto_hash_init(&desc);
302 
303 	__bio_for_each_segment(bvec, bio, i, 0) {
304 		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
305 		crypto_hash_update(&desc, &sg, sg.length);
306 	}
307 	crypto_hash_final(&desc, digest);
308 }
309 
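/* Worker callback: checksum the locally read block and ask the peer to
 * compare it by sending a P_CSUM_RS_REQUEST (checksum based resync). */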
310 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
311 {
312 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
313 	int digest_size;
314 	void *digest;
315 	int ok;
316 
317 	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
318 
319 	if (unlikely(cancel)) {
320 		drbd_free_ee(mdev, e);
321 		return 1;
322 	}
323 
324 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
325 		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
326 		digest = kmalloc(digest_size, GFP_NOIO);
327 		if (digest) {
328 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
329 
330 			inc_rs_pending(mdev);
331 			ok = drbd_send_drequest_csum(mdev,
332 						     e->sector,
333 						     e->size,
334 						     digest,
335 						     digest_size,
336 						     P_CSUM_RS_REQUEST);
337 			kfree(digest);
338 		} else {
339 			dev_err(DEV, "kmalloc() of digest failed.\n");
340 			ok = 0;
341 		}
342 	} else
343 		ok = 1;
344 
345 	drbd_free_ee(mdev, e);
346 
347 	if (unlikely(!ok))
348 		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
349 	return ok;
350 }
351 
352 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
353 
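/* Submit a local read of the given area so that its checksum can be sent to
 * the peer via w_e_send_csum.  Returns 0 on success, -EAGAIN if this should
 * be retried later (no memory, or we should slow down), -EIO if the local
 * disk is gone. */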
354 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
355 {
356 	struct drbd_epoch_entry *e;
357 
358 	if (!get_ldev(mdev))
359 		return -EIO;
360 
361 	if (drbd_rs_should_slow_down(mdev, sector))
362 		goto defer;
363 
364 	/* GFP_TRY, because if there is no memory available right now, this may
365 	 * be rescheduled for later. It is "only" background resync, after all. */
366 	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
367 	if (!e)
368 		goto defer;
369 
370 	e->w.cb = w_e_send_csum;
371 	spin_lock_irq(&mdev->req_lock);
372 	list_add(&e->w.list, &mdev->read_ee);
373 	spin_unlock_irq(&mdev->req_lock);
374 
375 	atomic_add(size >> 9, &mdev->rs_sect_ev);
376 	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
377 		return 0;
378 
379 	/* drbd_submit_ee currently fails for one reason only:
380 	 * not being able to allocate enough bios.
381 	 * Is dropping the connection going to help? */
382 	spin_lock_irq(&mdev->req_lock);
383 	list_del(&e->w.list);
384 	spin_unlock_irq(&mdev->req_lock);
385 
386 	drbd_free_ee(mdev, e);
387 defer:
388 	put_ldev(mdev);
389 	return -EAGAIN;
390 }
391 
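/* Resync/online-verify pacing timer: depending on the connection state,
 * (re)queue w_make_resync_request or w_make_ov_request on the worker.
 * It is re-armed from those callbacks every SLEEP_TIME while work remains. */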
392 void resync_timer_fn(unsigned long data)
393 {
394 	struct drbd_conf *mdev = (struct drbd_conf *) data;
395 	int queue;
396 
397 	queue = 1;
398 	switch (mdev->state.conn) {
399 	case C_VERIFY_S:
400 		mdev->resync_work.cb = w_make_ov_request;
401 		break;
402 	case C_SYNC_TARGET:
403 		mdev->resync_work.cb = w_make_resync_request;
404 		break;
405 	default:
406 		queue = 0;
407 		mdev->resync_work.cb = w_resync_inactive;
408 	}
409 
410 	/* harmless race: list_empty outside data.work.q_lock */
411 	if (list_empty(&mdev->resync_work.list) && queue)
412 		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
413 }
414 
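/* Helpers for the rs_plan_s ring buffer used by the resync controller:
 * fifo_set() fills all slots with one value, fifo_push() stores a new value
 * at the head and returns the value it replaced, and fifo_add_val() adds a
 * value to every slot. */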
415 static void fifo_set(struct fifo_buffer *fb, int value)
416 {
417 	int i;
418 
419 	for (i = 0; i < fb->size; i++)
420 		fb->values[i] = value;
421 }
422 
423 static int fifo_push(struct fifo_buffer *fb, int value)
424 {
425 	int ov;
426 
427 	ov = fb->values[fb->head_index];
428 	fb->values[fb->head_index++] = value;
429 
430 	if (fb->head_index >= fb->size)
431 		fb->head_index = 0;
432 
433 	return ov;
434 }
435 
436 static void fifo_add_val(struct fifo_buffer *fb, int value)
437 {
438 	int i;
439 
440 	for (i = 0; i < fb->size; i++)
441 		fb->values[i] += value;
442 }
443 
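/* Feedback controller for the resync rate: based on how many resync sectors
 * came back since the last invocation, decide how many sectors to request
 * during the next SLEEP_TIME interval.  The deviation from the configured
 * fill target (c_fill_target) or delay target (c_delay_target) is spread
 * over the next 'steps' intervals via the rs_plan_s FIFO, and the result is
 * capped by c_max_rate. */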
444 static int drbd_rs_controller(struct drbd_conf *mdev)
445 {
446 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
447 	unsigned int want;     /* The number of sectors we want in the proxy */
448 	int req_sect; /* Number of sectors to request in this turn */
449 	int correction; /* Number of additional sectors we need in the proxy */
450 	int cps; /* correction per invocation of drbd_rs_controller() */
451 	int steps; /* Number of time steps to plan ahead */
452 	int curr_corr;
453 	int max_sect;
454 
455 	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
456 	mdev->rs_in_flight -= sect_in;
457 
458 	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
459 
460 	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
461 
462 	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
463 		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
464 	} else { /* normal path */
465 		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
466 			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
467 	}
468 
469 	correction = want - mdev->rs_in_flight - mdev->rs_planed;
470 
471 	/* Plan ahead */
472 	cps = correction / steps;
473 	fifo_add_val(&mdev->rs_plan_s, cps);
474 	mdev->rs_planed += cps * steps;
475 
476 	/* What we do in this step */
477 	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
478 	spin_unlock(&mdev->peer_seq_lock);
479 	mdev->rs_planed -= curr_corr;
480 
481 	req_sect = sect_in + curr_corr;
482 	if (req_sect < 0)
483 		req_sect = 0;
484 
485 	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
486 	if (req_sect > max_sect)
487 		req_sect = max_sect;
488 
489 	/*
490 	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
491 		 sect_in, mdev->rs_in_flight, want, correction,
492 		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
493 	*/
494 
495 	return req_sect;
496 }
497 
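/* How many resync requests to generate in this SLEEP_TIME interval: either
 * ask the controller above (which works in sectors) and convert to
 * BM_BLOCK_SIZE sized requests, or derive the number from the static
 * sync_conf.rate.  c_sync_rate is updated to the currently effective rate. */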
498 static int drbd_rs_number_requests(struct drbd_conf *mdev)
499 {
500 	int number;
501 	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
502 		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
503 		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
504 	} else {
505 		mdev->c_sync_rate = mdev->sync_conf.rate;
506 		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
507 	}
508 
509 	/* ignore the number of pending requests; the resync controller should
510 	 * throttle down to the incoming reply rate soon enough anyway. */
511 	return number;
512 }
513 
514 static int w_make_resync_request(struct drbd_conf *mdev,
515 				 struct drbd_work *w, int cancel)
516 {
517 	unsigned long bit;
518 	sector_t sector;
519 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
520 	int max_bio_size;
521 	int number, rollback_i, size;
522 	int align, queued, sndbuf;
523 	int i = 0;
524 
525 	if (unlikely(cancel))
526 		return 1;
527 
528 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
529 		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
530 		return 0;
531 	}
532 
533 	if (mdev->state.conn != C_SYNC_TARGET)
534 		dev_err(DEV, "%s in w_make_resync_request\n",
535 			drbd_conn_str(mdev->state.conn));
536 
537 	if (mdev->rs_total == 0) {
538 		/* empty resync? */
539 		drbd_resync_finished(mdev);
540 		return 1;
541 	}
542 
543 	if (!get_ldev(mdev)) {
544 		/* Since we only need to access mdev->rsync, a
545 		   get_ldev_if_state(mdev,D_FAILED) would be sufficient,
546 		   but continuing resync with a broken disk makes no sense
547 		   at all */
548 		dev_err(DEV, "Disk broke down during resync!\n");
549 		mdev->resync_work.cb = w_resync_inactive;
550 		return 1;
551 	}
552 
553 	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
554 	 * if it should be necessary */
555 	max_bio_size =
556 		mdev->agreed_pro_version < 94 ? queue_max_hw_sectors(mdev->rq_queue) << 9 :
557 		mdev->agreed_pro_version < 95 ?	DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_BIO_SIZE;
558 
559 	number = drbd_rs_number_requests(mdev);
560 	if (number == 0)
561 		goto requeue;
562 
563 	for (i = 0; i < number; i++) {
564 		/* Stop generating RS requests when half of the send buffer is filled */
565 		mutex_lock(&mdev->data.mutex);
566 		if (mdev->data.socket) {
567 			queued = mdev->data.socket->sk->sk_wmem_queued;
568 			sndbuf = mdev->data.socket->sk->sk_sndbuf;
569 		} else {
570 			queued = 1;
571 			sndbuf = 0;
572 		}
573 		mutex_unlock(&mdev->data.mutex);
574 		if (queued > sndbuf / 2)
575 			goto requeue;
576 
577 next_sector:
578 		size = BM_BLOCK_SIZE;
579 		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
580 
581 		if (bit == DRBD_END_OF_BITMAP) {
582 			mdev->bm_resync_fo = drbd_bm_bits(mdev);
583 			mdev->resync_work.cb = w_resync_inactive;
584 			put_ldev(mdev);
585 			return 1;
586 		}
587 
588 		sector = BM_BIT_TO_SECT(bit);
589 
590 		if (drbd_rs_should_slow_down(mdev, sector) ||
591 		    drbd_try_rs_begin_io(mdev, sector)) {
592 			mdev->bm_resync_fo = bit;
593 			goto requeue;
594 		}
595 		mdev->bm_resync_fo = bit + 1;
596 
597 		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
598 			drbd_rs_complete_io(mdev, sector);
599 			goto next_sector;
600 		}
601 
602 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
603 		/* try to find some adjacent bits.
604 		 * we stop once we already have the maximum request size.
605 		 *
606 		 * Additionally always align bigger requests, in order to
607 		 * be prepared for all stripe sizes of software RAIDs.
608 		 */
609 		align = 1;
610 		rollback_i = i;
611 		for (;;) {
612 			if (size + BM_BLOCK_SIZE > max_bio_size)
613 				break;
614 
615 			/* Always stay aligned */
616 			if (sector & ((1<<(align+3))-1))
617 				break;
618 
619 			/* do not cross extent boundaries */
620 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
621 				break;
622 			/* now, is it actually dirty, after all?
623 			 * caution, drbd_bm_test_bit is tri-state for some
624 			 * obscure reason; ( b == 0 ) would get the out-of-band
625 			 * only accidentally right because of the "oddly sized"
626 			 * adjustment below */
627 			if (drbd_bm_test_bit(mdev, bit+1) != 1)
628 				break;
629 			bit++;
630 			size += BM_BLOCK_SIZE;
631 			if ((BM_BLOCK_SIZE << align) <= size)
632 				align++;
633 			i++;
634 		}
635 		/* if we merged some,
636 		 * reset the offset to start the next drbd_bm_find_next from */
637 		if (size > BM_BLOCK_SIZE)
638 			mdev->bm_resync_fo = bit + 1;
639 #endif
640 
641 		/* adjust very last sectors, in case we are oddly sized */
642 		if (sector + (size>>9) > capacity)
643 			size = (capacity-sector)<<9;
644 		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
645 			switch (read_for_csum(mdev, sector, size)) {
646 			case -EIO: /* Disk failure */
647 				put_ldev(mdev);
648 				return 0;
649 			case -EAGAIN: /* allocation failed, or ldev busy */
650 				drbd_rs_complete_io(mdev, sector);
651 				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
652 				i = rollback_i;
653 				goto requeue;
654 			case 0:
655 				/* everything ok */
656 				break;
657 			default:
658 				BUG();
659 			}
660 		} else {
661 			inc_rs_pending(mdev);
662 			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
663 					       sector, size, ID_SYNCER)) {
664 				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
665 				dec_rs_pending(mdev);
666 				put_ldev(mdev);
667 				return 0;
668 			}
669 		}
670 	}
671 
672 	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
673 		/* last syncer _request_ was sent,
674 		 * but the P_RS_DATA_REPLY has not yet been received.  Sync will end
675 		 * (and the next sync group will resume) as soon as we receive the
676 		 * last resync data block, and the last bit is cleared.
677 		 * Until then resync "work" is "inactive" ...
678 		 */
679 		mdev->resync_work.cb = w_resync_inactive;
680 		put_ldev(mdev);
681 		return 1;
682 	}
683 
684  requeue:
685 	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
686 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
687 	put_ldev(mdev);
688 	return 1;
689 }
690 
691 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
692 {
693 	int number, i, size;
694 	sector_t sector;
695 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
696 
697 	if (unlikely(cancel))
698 		return 1;
699 
700 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
701 		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
702 		return 0;
703 	}
704 
705 	number = drbd_rs_number_requests(mdev);
706 
707 	sector = mdev->ov_position;
708 	for (i = 0; i < number; i++) {
709 		if (sector >= capacity) {
710 			mdev->resync_work.cb = w_resync_inactive;
711 			return 1;
712 		}
713 
714 		size = BM_BLOCK_SIZE;
715 
716 		if (drbd_rs_should_slow_down(mdev, sector) ||
717 		    drbd_try_rs_begin_io(mdev, sector)) {
718 			mdev->ov_position = sector;
719 			goto requeue;
720 		}
721 
722 		if (sector + (size>>9) > capacity)
723 			size = (capacity-sector)<<9;
724 
725 		inc_rs_pending(mdev);
726 		if (!drbd_send_ov_request(mdev, sector, size)) {
727 			dec_rs_pending(mdev);
728 			return 0;
729 		}
730 		sector += BM_SECT_PER_BIT;
731 	}
732 	mdev->ov_position = sector;
733 
734  requeue:
735 	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
736 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
737 	return 1;
738 }
739 
740 
741 int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
742 {
743 	drbd_start_resync(mdev, C_SYNC_SOURCE);
744 
745 	return 1;
746 }
747 
748 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
749 {
750 	kfree(w);
751 	ov_oos_print(mdev);
752 	drbd_resync_finished(mdev);
753 
754 	return 1;
755 }
756 
757 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
758 {
759 	kfree(w);
760 
761 	drbd_resync_finished(mdev);
762 
763 	return 1;
764 }
765 
766 static void ping_peer(struct drbd_conf *mdev)
767 {
768 	clear_bit(GOT_PING_ACK, &mdev->flags);
769 	request_ping(mdev);
770 	wait_event(mdev->misc_wait,
771 		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
772 }
773 
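/* Called when a resync or online verify run is (about to be) finished:
 * drain the resync LRU, report the achieved throughput, compute the new
 * disk/pdsk states and UUIDs, and possibly trigger the "out-of-sync" or
 * "after-resync-target" user-space helpers. */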
774 int drbd_resync_finished(struct drbd_conf *mdev)
775 {
776 	unsigned long db, dt, dbdt;
777 	unsigned long n_oos;
778 	union drbd_state os, ns;
779 	struct drbd_work *w;
780 	char *khelper_cmd = NULL;
781 	int verify_done = 0;
782 
783 	/* Remove all elements from the resync LRU. Since future actions
784 	 * might set bits in the (main) bitmap, the entries in the
785 	 * resync LRU would otherwise be wrong. */
786 	if (drbd_rs_del_all(mdev)) {
787 		/* In case this is not possible now, most probably because
788 		 * there are P_RS_DATA_REPLY packets lingering on the worker's
789 		 * queue (or even the read operations for those packets
790 		 * are not finished by now).  Retry in 100ms. */
791 
792 		__set_current_state(TASK_INTERRUPTIBLE);
793 		schedule_timeout(HZ / 10);
794 		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
795 		if (w) {
796 			w->cb = w_resync_finished;
797 			drbd_queue_work(&mdev->data.work, w);
798 			return 1;
799 		}
800 		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
801 	}
802 
803 	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
804 	if (dt <= 0)
805 		dt = 1;
806 	db = mdev->rs_total;
807 	dbdt = Bit2KB(db/dt);
808 	mdev->rs_paused /= HZ;
809 
810 	if (!get_ldev(mdev))
811 		goto out;
812 
813 	ping_peer(mdev);
814 
815 	spin_lock_irq(&mdev->req_lock);
816 	os = mdev->state;
817 
818 	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
819 
820 	/* This protects us against multiple calls (that can happen in the presence
821 	   of application IO), and against connectivity loss just before we arrive here. */
822 	if (os.conn <= C_CONNECTED)
823 		goto out_unlock;
824 
825 	ns = os;
826 	ns.conn = C_CONNECTED;
827 
828 	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
829 	     verify_done ? "Online verify " : "Resync",
830 	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
831 
832 	n_oos = drbd_bm_total_weight(mdev);
833 
834 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
835 		if (n_oos) {
836 			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
837 			      n_oos, Bit2KB(1));
838 			khelper_cmd = "out-of-sync";
839 		}
840 	} else {
841 		D_ASSERT((n_oos - mdev->rs_failed) == 0);
842 
843 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
844 			khelper_cmd = "after-resync-target";
845 
846 		if (mdev->csums_tfm && mdev->rs_total) {
847 			const unsigned long s = mdev->rs_same_csum;
848 			const unsigned long t = mdev->rs_total;
849 			const int ratio =
850 				(t == 0)     ? 0 :
851 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
852 			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
853 			     "transferred %luK total %luK\n",
854 			     ratio,
855 			     Bit2KB(mdev->rs_same_csum),
856 			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
857 			     Bit2KB(mdev->rs_total));
858 		}
859 	}
860 
861 	if (mdev->rs_failed) {
862 		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
863 
864 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
865 			ns.disk = D_INCONSISTENT;
866 			ns.pdsk = D_UP_TO_DATE;
867 		} else {
868 			ns.disk = D_UP_TO_DATE;
869 			ns.pdsk = D_INCONSISTENT;
870 		}
871 	} else {
872 		ns.disk = D_UP_TO_DATE;
873 		ns.pdsk = D_UP_TO_DATE;
874 
875 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
876 			if (mdev->p_uuid) {
877 				int i;
878 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
879 					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
880 				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
881 				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
882 			} else {
883 				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
884 			}
885 		}
886 
887 		drbd_uuid_set_bm(mdev, 0UL);
888 
889 		if (mdev->p_uuid) {
890 			/* Now the two UUID sets are equal, update what we
891 			 * know of the peer. */
892 			int i;
893 			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
894 				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
895 		}
896 	}
897 
898 	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
899 out_unlock:
900 	spin_unlock_irq(&mdev->req_lock);
901 	put_ldev(mdev);
902 out:
903 	mdev->rs_total  = 0;
904 	mdev->rs_failed = 0;
905 	mdev->rs_paused = 0;
906 	if (verify_done)
907 		mdev->ov_start_sector = 0;
908 
909 	drbd_md_sync(mdev);
910 
911 	if (khelper_cmd)
912 		drbd_khelper(mdev, khelper_cmd);
913 
914 	return 1;
915 }
916 
917 /* helper */
918 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
919 {
920 	if (drbd_ee_has_active_page(e)) {
921 		/* This might happen if sendpage() has not finished */
922 		int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
923 		atomic_add(i, &mdev->pp_in_use_by_net);
924 		atomic_sub(i, &mdev->pp_in_use);
925 		spin_lock_irq(&mdev->req_lock);
926 		list_add_tail(&e->w.list, &mdev->net_ee);
927 		spin_unlock_irq(&mdev->req_lock);
928 		wake_up(&drbd_pp_wait);
929 	} else
930 		drbd_free_ee(mdev, e);
931 }
932 
933 /**
934  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
935  * @mdev:	DRBD device.
936  * @w:		work object.
937  * @cancel:	The connection will be closed anyway
938  */
939 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
940 {
941 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
942 	int ok;
943 
944 	if (unlikely(cancel)) {
945 		drbd_free_ee(mdev, e);
946 		dec_unacked(mdev);
947 		return 1;
948 	}
949 
950 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
951 		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
952 	} else {
953 		if (__ratelimit(&drbd_ratelimit_state))
954 			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
955 			    (unsigned long long)e->sector);
956 
957 		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
958 	}
959 
960 	dec_unacked(mdev);
961 
962 	move_to_net_ee_or_free(mdev, e);
963 
964 	if (unlikely(!ok))
965 		dev_err(DEV, "drbd_send_block() failed\n");
966 	return ok;
967 }
968 
969 /**
970  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
971  * @mdev:	DRBD device.
972  * @w:		work object.
973  * @cancel:	The connection will be closed anyway
974  */
975 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
976 {
977 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
978 	int ok;
979 
980 	if (unlikely(cancel)) {
981 		drbd_free_ee(mdev, e);
982 		dec_unacked(mdev);
983 		return 1;
984 	}
985 
986 	if (get_ldev_if_state(mdev, D_FAILED)) {
987 		drbd_rs_complete_io(mdev, e->sector);
988 		put_ldev(mdev);
989 	}
990 
991 	if (mdev->state.conn == C_AHEAD) {
992 		ok = drbd_send_ack(mdev, P_RS_CANCEL, e);
993 	} else if (likely((e->flags & EE_WAS_ERROR) == 0)) {
994 		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
995 			inc_rs_pending(mdev);
996 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
997 		} else {
998 			if (__ratelimit(&drbd_ratelimit_state))
999 				dev_err(DEV, "Not sending RSDataReply, "
1000 				    "partner DISKLESS!\n");
1001 			ok = 1;
1002 		}
1003 	} else {
1004 		if (__ratelimit(&drbd_ratelimit_state))
1005 			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1006 			    (unsigned long long)e->sector);
1007 
1008 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1009 
1010 		/* update resync data with failure */
1011 		drbd_rs_failed_io(mdev, e->sector, e->size);
1012 	}
1013 
1014 	dec_unacked(mdev);
1015 
1016 	move_to_net_ee_or_free(mdev, e);
1017 
1018 	if (unlikely(!ok))
1019 		dev_err(DEV, "drbd_send_block() failed\n");
1020 	return ok;
1021 }
1022 
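/* Worker callback for a checksum based resync request: compare the peer's
 * digest with one computed over the locally read block.  If they match,
 * the block is marked in sync and only P_RS_IS_IN_SYNC is acknowledged;
 * otherwise the full block is sent as P_RS_DATA_REPLY. */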
1023 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1024 {
1025 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1026 	struct digest_info *di;
1027 	int digest_size;
1028 	void *digest = NULL;
1029 	int ok, eq = 0;
1030 
1031 	if (unlikely(cancel)) {
1032 		drbd_free_ee(mdev, e);
1033 		dec_unacked(mdev);
1034 		return 1;
1035 	}
1036 
1037 	if (get_ldev(mdev)) {
1038 		drbd_rs_complete_io(mdev, e->sector);
1039 		put_ldev(mdev);
1040 	}
1041 
1042 	di = e->digest;
1043 
1044 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1045 		/* quick hack to try to avoid a race against reconfiguration.
1046 		 * a real fix would be much more involved,
1047 		 * introducing more locking mechanisms */
1048 		if (mdev->csums_tfm) {
1049 			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1050 			D_ASSERT(digest_size == di->digest_size);
1051 			digest = kmalloc(digest_size, GFP_NOIO);
1052 		}
1053 		if (digest) {
1054 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1055 			eq = !memcmp(digest, di->digest, digest_size);
1056 			kfree(digest);
1057 		}
1058 
1059 		if (eq) {
1060 			drbd_set_in_sync(mdev, e->sector, e->size);
1061 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1062 			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1063 			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1064 		} else {
1065 			inc_rs_pending(mdev);
1066 			e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1067 			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1068 			kfree(di);
1069 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1070 		}
1071 	} else {
1072 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1073 		if (__ratelimit(&drbd_ratelimit_state))
1074 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1075 	}
1076 
1077 	dec_unacked(mdev);
1078 	move_to_net_ee_or_free(mdev, e);
1079 
1080 	if (unlikely(!ok))
1081 		dev_err(DEV, "drbd_send_block/ack() failed\n");
1082 	return ok;
1083 }
1084 
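/* Worker callback run after the local read for an online-verify request:
 * compute the digest of the block and send it to the peer as P_OV_REPLY. */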
1085 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1086 {
1087 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1088 	int digest_size;
1089 	void *digest;
1090 	int ok = 1;
1091 
1092 	if (unlikely(cancel))
1093 		goto out;
1094 
1095 	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1096 		goto out;
1097 
1098 	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1099 	/* FIXME if this allocation fails, online verify will not terminate! */
1100 	digest = kmalloc(digest_size, GFP_NOIO);
1101 	if (digest) {
1102 		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1103 		inc_rs_pending(mdev);
1104 		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1105 					     digest, digest_size, P_OV_REPLY);
1106 		if (!ok)
1107 			dec_rs_pending(mdev);
1108 		kfree(digest);
1109 	}
1110 
1111 out:
1112 	drbd_free_ee(mdev, e);
1113 
1114 	dec_unacked(mdev);
1115 
1116 	return ok;
1117 }
1118 
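/* Record an out-of-sync area found by online verify: extend the previous
 * range if it is contiguous, otherwise start a new one, and mark the area
 * out of sync in the bitmap. */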
1119 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1120 {
1121 	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1122 		mdev->ov_last_oos_size += size>>9;
1123 	} else {
1124 		mdev->ov_last_oos_start = sector;
1125 		mdev->ov_last_oos_size = size>>9;
1126 	}
1127 	drbd_set_out_of_sync(mdev, sector, size);
1128 }
1129 
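/* Worker callback run when the peer's P_OV_REPLY digest has arrived and the
 * corresponding local read completed: compare the two digests, record any
 * mismatch as out of sync, and report the result with P_OV_RESULT. */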
1130 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1131 {
1132 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1133 	struct digest_info *di;
1134 	int digest_size;
1135 	void *digest;
1136 	int ok, eq = 0;
1137 
1138 	if (unlikely(cancel)) {
1139 		drbd_free_ee(mdev, e);
1140 		dec_unacked(mdev);
1141 		return 1;
1142 	}
1143 
1144 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1145 	 * the resync lru has been cleaned up already */
1146 	if (get_ldev(mdev)) {
1147 		drbd_rs_complete_io(mdev, e->sector);
1148 		put_ldev(mdev);
1149 	}
1150 
1151 	di = e->digest;
1152 
1153 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1154 		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1155 		digest = kmalloc(digest_size, GFP_NOIO);
1156 		if (digest) {
1157 			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1158 
1159 			D_ASSERT(digest_size == di->digest_size);
1160 			eq = !memcmp(digest, di->digest, digest_size);
1161 			kfree(digest);
1162 		}
1163 	} else {
1164 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1165 		if (__ratelimit(&drbd_ratelimit_state))
1166 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1167 	}
1168 
1169 	dec_unacked(mdev);
1170 	if (!eq)
1171 		drbd_ov_oos_found(mdev, e->sector, e->size);
1172 	else
1173 		ov_oos_print(mdev);
1174 
1175 	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1176 			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1177 
1178 	drbd_free_ee(mdev, e);
1179 
1180 	--mdev->ov_left;
1181 
1182 	/* let's advance progress step marks only for every other megabyte */
1183 	if ((mdev->ov_left & 0x200) == 0x200)
1184 		drbd_advance_rs_marks(mdev, mdev->ov_left);
1185 
1186 	if (mdev->ov_left == 0) {
1187 		ov_oos_print(mdev);
1188 		drbd_resync_finished(mdev);
1189 	}
1190 
1191 	return ok;
1192 }
1193 
1194 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1195 {
1196 	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1197 	complete(&b->done);
1198 	return 1;
1199 }
1200 
1201 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1202 {
1203 	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1204 	struct p_barrier *p = &mdev->data.sbuf.barrier;
1205 	int ok = 1;
1206 
1207 	/* really avoid racing with tl_clear.  w.cb may have been referenced
1208 	 * just before it was reassigned and re-queued, so double check that.
1209 	 * actually, this race was harmless, since we only try to send the
1210 	 * barrier packet here, and otherwise do nothing with the object.
1211 	 * but compare with the head of w_clear_epoch */
1212 	spin_lock_irq(&mdev->req_lock);
1213 	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1214 		cancel = 1;
1215 	spin_unlock_irq(&mdev->req_lock);
1216 	if (cancel)
1217 		return 1;
1218 
1219 	if (!drbd_get_data_sock(mdev))
1220 		return 0;
1221 	p->barrier = b->br_number;
1222 	/* inc_ap_pending was done where this was queued.
1223 	 * dec_ap_pending will be done in got_BarrierAck
1224 	 * or (on connection loss) in w_clear_epoch.  */
1225 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1226 				(struct p_header80 *)p, sizeof(*p), 0);
1227 	drbd_put_data_sock(mdev);
1228 
1229 	return ok;
1230 }
1231 
1232 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1233 {
1234 	if (cancel)
1235 		return 1;
1236 	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1237 }
1238 
1239 int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1240 {
1241 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1242 	int ok;
1243 
1244 	if (unlikely(cancel)) {
1245 		req_mod(req, send_canceled);
1246 		return 1;
1247 	}
1248 
1249 	ok = drbd_send_oos(mdev, req);
1250 	req_mod(req, oos_handed_to_network);
1251 
1252 	return ok;
1253 }
1254 
1255 /**
1256  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1257  * @mdev:	DRBD device.
1258  * @w:		work object.
1259  * @cancel:	The connection will be closed anyway
1260  */
1261 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1262 {
1263 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1264 	int ok;
1265 
1266 	if (unlikely(cancel)) {
1267 		req_mod(req, send_canceled);
1268 		return 1;
1269 	}
1270 
1271 	ok = drbd_send_dblock(mdev, req);
1272 	req_mod(req, ok ? handed_over_to_network : send_failed);
1273 
1274 	return ok;
1275 }
1276 
1277 /**
1278  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1279  * @mdev:	DRBD device.
1280  * @w:		work object.
1281  * @cancel:	The connection will be closed anyway
1282  */
1283 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1284 {
1285 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1286 	int ok;
1287 
1288 	if (unlikely(cancel)) {
1289 		req_mod(req, send_canceled);
1290 		return 1;
1291 	}
1292 
1293 	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1294 				(unsigned long)req);
1295 
1296 	if (!ok) {
1297 		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1298 		 * so this is probably redundant */
1299 		if (mdev->state.conn >= C_CONNECTED)
1300 			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1301 	}
1302 	req_mod(req, ok ? handed_over_to_network : send_failed);
1303 
1304 	return ok;
1305 }
1306 
1307 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1308 {
1309 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1310 
1311 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1312 		drbd_al_begin_io(mdev, req->sector);
1313 	/* Calling drbd_al_begin_io() out of the worker might theoretically
1314 	   deadlock. In practice it cannot deadlock, since this is
1315 	   only used when unfreezing IOs. All the extents of the requests
1316 	   that made it into the TL are already active */
1317 
1318 	drbd_req_make_private_bio(req, req->master_bio);
1319 	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1320 	generic_make_request(req->private_bio);
1321 
1322 	return 1;
1323 }
1324 
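/* Walk the sync-after dependency chain; return 0 if any device this one
 * depends on is currently resyncing or has one of its sync-pause flags
 * (aftr_isp, peer_isp, user_isp) set, 1 if this device may resync now. */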
1325 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1326 {
1327 	struct drbd_conf *odev = mdev;
1328 
1329 	while (1) {
1330 		if (odev->sync_conf.after == -1)
1331 			return 1;
1332 		odev = minor_to_mdev(odev->sync_conf.after);
1333 		ERR_IF(!odev) return 1;
1334 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1335 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1336 		    odev->state.aftr_isp || odev->state.peer_isp ||
1337 		    odev->state.user_isp)
1338 			return 0;
1339 	}
1340 }
1341 
1342 /**
1343  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1344  * @mdev:	DRBD device.
1345  *
1346  * Called from process context only (admin command and after_state_ch).
1347  */
1348 static int _drbd_pause_after(struct drbd_conf *mdev)
1349 {
1350 	struct drbd_conf *odev;
1351 	int i, rv = 0;
1352 
1353 	for (i = 0; i < minor_count; i++) {
1354 		odev = minor_to_mdev(i);
1355 		if (!odev)
1356 			continue;
1357 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1358 			continue;
1359 		if (!_drbd_may_sync_now(odev))
1360 			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1361 			       != SS_NOTHING_TO_DO);
1362 	}
1363 
1364 	return rv;
1365 }
1366 
1367 /**
1368  * _drbd_resume_next() - Resume resync on all devices that may resync now
1369  * @mdev:	DRBD device.
1370  *
1371  * Called from process context only (admin command and worker).
1372  */
1373 static int _drbd_resume_next(struct drbd_conf *mdev)
1374 {
1375 	struct drbd_conf *odev;
1376 	int i, rv = 0;
1377 
1378 	for (i = 0; i < minor_count; i++) {
1379 		odev = minor_to_mdev(i);
1380 		if (!odev)
1381 			continue;
1382 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1383 			continue;
1384 		if (odev->state.aftr_isp) {
1385 			if (_drbd_may_sync_now(odev))
1386 				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1387 							CS_HARD, NULL)
1388 				       != SS_NOTHING_TO_DO) ;
1389 		}
1390 	}
1391 	return rv;
1392 }
1393 
1394 void resume_next_sg(struct drbd_conf *mdev)
1395 {
1396 	write_lock_irq(&global_state_lock);
1397 	_drbd_resume_next(mdev);
1398 	write_unlock_irq(&global_state_lock);
1399 }
1400 
1401 void suspend_other_sg(struct drbd_conf *mdev)
1402 {
1403 	write_lock_irq(&global_state_lock);
1404 	_drbd_pause_after(mdev);
1405 	write_unlock_irq(&global_state_lock);
1406 }
1407 
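/* Validate a proposed sync-after minor: it must exist (or be -1 for "none"),
 * and following the dependency chain from it must not lead back to mdev. */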
1408 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1409 {
1410 	struct drbd_conf *odev;
1411 
1412 	if (o_minor == -1)
1413 		return NO_ERROR;
1414 	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1415 		return ERR_SYNC_AFTER;
1416 
1417 	/* check for loops */
1418 	odev = minor_to_mdev(o_minor);
1419 	while (1) {
1420 		if (odev == mdev)
1421 			return ERR_SYNC_AFTER_CYCLE;
1422 
1423 		/* dependency chain ends here, no cycles. */
1424 		if (odev->sync_conf.after == -1)
1425 			return NO_ERROR;
1426 
1427 		/* follow the dependency chain */
1428 		odev = minor_to_mdev(odev->sync_conf.after);
1429 	}
1430 }
1431 
1432 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1433 {
1434 	int changes;
1435 	int retcode;
1436 
1437 	write_lock_irq(&global_state_lock);
1438 	retcode = sync_after_error(mdev, na);
1439 	if (retcode == NO_ERROR) {
1440 		mdev->sync_conf.after = na;
1441 		do {
1442 			changes  = _drbd_pause_after(mdev);
1443 			changes |= _drbd_resume_next(mdev);
1444 		} while (changes);
1445 	}
1446 	write_unlock_irq(&global_state_lock);
1447 	return retcode;
1448 }
1449 
1450 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1451 {
1452 	atomic_set(&mdev->rs_sect_in, 0);
1453 	atomic_set(&mdev->rs_sect_ev, 0);
1454 	mdev->rs_in_flight = 0;
1455 	mdev->rs_planed = 0;
1456 	spin_lock(&mdev->peer_seq_lock);
1457 	fifo_set(&mdev->rs_plan_s, 0);
1458 	spin_unlock(&mdev->peer_seq_lock);
1459 }
1460 
1461 /**
1462  * drbd_start_resync() - Start the resync process
1463  * @mdev:	DRBD device.
1464  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1465  *
1466  * This function might bring you directly into one of the
1467  * C_PAUSED_SYNC_* states.
1468  */
1469 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1470 {
1471 	union drbd_state ns;
1472 	int r;
1473 
1474 	if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1475 		dev_err(DEV, "Resync already running!\n");
1476 		return;
1477 	}
1478 
1479 	if (mdev->state.conn < C_AHEAD) {
1480 		/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1481 		drbd_rs_cancel_all(mdev);
1482 		/* This should be done when we abort the resync. We definitely do not
1483 		   want to have this for connections going back and forth between
1484 		   Ahead/Behind and SyncSource/SyncTarget */
1485 	}
1486 
1487 	if (side == C_SYNC_TARGET) {
1488 		/* Since application IO was locked out during C_WF_BITMAP_T and
1489 		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1490 		   we check whether we may make the data inconsistent. */
1491 		r = drbd_khelper(mdev, "before-resync-target");
1492 		r = (r >> 8) & 0xff;
1493 		if (r > 0) {
1494 			dev_info(DEV, "before-resync-target handler returned %d, "
1495 			     "dropping connection.\n", r);
1496 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1497 			return;
1498 		}
1499 	} else /* C_SYNC_SOURCE */ {
1500 		r = drbd_khelper(mdev, "before-resync-source");
1501 		r = (r >> 8) & 0xff;
1502 		if (r > 0) {
1503 			if (r == 3) {
1504 				dev_info(DEV, "before-resync-source handler returned %d, "
1505 					 "ignoring. Old userland tools?", r);
1506 			} else {
1507 				dev_info(DEV, "before-resync-source handler returned %d, "
1508 					 "dropping connection.\n", r);
1509 				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1510 				return;
1511 			}
1512 		}
1513 	}
1514 
1515 	drbd_state_lock(mdev);
1516 
1517 	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1518 		drbd_state_unlock(mdev);
1519 		return;
1520 	}
1521 
1522 	write_lock_irq(&global_state_lock);
1523 	ns = mdev->state;
1524 
1525 	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1526 
1527 	ns.conn = side;
1528 
1529 	if (side == C_SYNC_TARGET)
1530 		ns.disk = D_INCONSISTENT;
1531 	else /* side == C_SYNC_SOURCE */
1532 		ns.pdsk = D_INCONSISTENT;
1533 
1534 	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1535 	ns = mdev->state;
1536 
1537 	if (ns.conn < C_CONNECTED)
1538 		r = SS_UNKNOWN_ERROR;
1539 
1540 	if (r == SS_SUCCESS) {
1541 		unsigned long tw = drbd_bm_total_weight(mdev);
1542 		unsigned long now = jiffies;
1543 		int i;
1544 
1545 		mdev->rs_failed    = 0;
1546 		mdev->rs_paused    = 0;
1547 		mdev->rs_same_csum = 0;
1548 		mdev->rs_last_events = 0;
1549 		mdev->rs_last_sect_ev = 0;
1550 		mdev->rs_total     = tw;
1551 		mdev->rs_start     = now;
1552 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1553 			mdev->rs_mark_left[i] = tw;
1554 			mdev->rs_mark_time[i] = now;
1555 		}
1556 		_drbd_pause_after(mdev);
1557 	}
1558 	write_unlock_irq(&global_state_lock);
1559 
1560 	if (side == C_SYNC_TARGET)
1561 		mdev->bm_resync_fo = 0;
1562 
1563 	/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1564 	 * with w_send_oos, or the sync target will get confused as to
1565 	 * how many bits to resync.  We cannot always do that, because for an
1566 	 * empty resync and protocol < 95, we need to do it here, as we call
1567 	 * drbd_resync_finished from here in that case.
1568 	 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1569 	 * and from after_state_ch otherwise. */
1570 	if (side == C_SYNC_SOURCE && mdev->agreed_pro_version < 96)
1571 		drbd_gen_and_send_sync_uuid(mdev);
1572 
1573 	if (r == SS_SUCCESS) {
1574 		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1575 		     drbd_conn_str(ns.conn),
1576 		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1577 		     (unsigned long) mdev->rs_total);
1578 
1579 		if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1580 			/* This still has a race (about when exactly the peers
1581 			 * detect connection loss) that can lead to a full sync
1582 			 * on next handshake. In 8.3.9 we fixed this with explicit
1583 			 * resync-finished notifications, but the fix
1584 			 * introduces a protocol change.  Sleeping for some
1585 			 * time longer than the ping interval + timeout on the
1586 			 * SyncSource, to give the SyncTarget the chance to
1587 			 * detect connection loss, then waiting for a ping
1588 			 * response (implicit in drbd_resync_finished) reduces
1589 			 * the race considerably, but does not solve it. */
1590 			if (side == C_SYNC_SOURCE)
1591 				schedule_timeout_interruptible(
1592 					mdev->net_conf->ping_int * HZ +
1593 					mdev->net_conf->ping_timeo*HZ/9);
1594 			drbd_resync_finished(mdev);
1595 		}
1596 
1597 		drbd_rs_controller_reset(mdev);
1598 		/* ns.conn may already be != mdev->state.conn,
1599 		 * we may have been paused in between, or become paused until
1600 		 * the timer triggers.
1601 		 * No matter, that is handled in resync_timer_fn() */
1602 		if (ns.conn == C_SYNC_TARGET)
1603 			mod_timer(&mdev->resync_timer, jiffies);
1604 
1605 		drbd_md_sync(mdev);
1606 	}
1607 	put_ldev(mdev);
1608 	drbd_state_unlock(mdev);
1609 }
1610 
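/* Main loop of the per-device worker thread: sleep on the work semaphore
 * (uncorking the data socket while waiting, corking it again while busy),
 * run queued work callbacks, and force C_NETWORK_FAILURE if a callback
 * fails while connected.  On shutdown, drain the remaining work with
 * cancel = 1 and clean up the device. */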
1611 int drbd_worker(struct drbd_thread *thi)
1612 {
1613 	struct drbd_conf *mdev = thi->mdev;
1614 	struct drbd_work *w = NULL;
1615 	LIST_HEAD(work_list);
1616 	int intr = 0, i;
1617 
1618 	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1619 
1620 	while (get_t_state(thi) == Running) {
1621 		drbd_thread_current_set_cpu(mdev);
1622 
1623 		if (down_trylock(&mdev->data.work.s)) {
1624 			mutex_lock(&mdev->data.mutex);
1625 			if (mdev->data.socket && !mdev->net_conf->no_cork)
1626 				drbd_tcp_uncork(mdev->data.socket);
1627 			mutex_unlock(&mdev->data.mutex);
1628 
1629 			intr = down_interruptible(&mdev->data.work.s);
1630 
1631 			mutex_lock(&mdev->data.mutex);
1632 			if (mdev->data.socket  && !mdev->net_conf->no_cork)
1633 				drbd_tcp_cork(mdev->data.socket);
1634 			mutex_unlock(&mdev->data.mutex);
1635 		}
1636 
1637 		if (intr) {
1638 			D_ASSERT(intr == -EINTR);
1639 			flush_signals(current);
1640 			ERR_IF (get_t_state(thi) == Running)
1641 				continue;
1642 			break;
1643 		}
1644 
1645 		if (get_t_state(thi) != Running)
1646 			break;
1647 		/* With this break, we have done a down() but not consumed
1648 		   the entry from the list. The cleanup code takes care of
1649 		   this...   */
1650 
1651 		w = NULL;
1652 		spin_lock_irq(&mdev->data.work.q_lock);
1653 		ERR_IF(list_empty(&mdev->data.work.q)) {
1654 			/* something terribly wrong in our logic.
1655 			 * we were able to down() the semaphore,
1656 			 * but the list is empty... doh.
1657 			 *
1658 			 * what is the best thing to do now?
1659 			 * try again from scratch, restarting the receiver,
1660 			 * asender, whatnot? could break even more ugly,
1661 			 * e.g. when we are primary, but no good local data.
1662 			 *
1663 			 * I'll try to get away just starting over this loop.
1664 			 */
1665 			spin_unlock_irq(&mdev->data.work.q_lock);
1666 			continue;
1667 		}
1668 		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1669 		list_del_init(&w->list);
1670 		spin_unlock_irq(&mdev->data.work.q_lock);
1671 
1672 		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1673 			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1674 			if (mdev->state.conn >= C_CONNECTED)
1675 				drbd_force_state(mdev,
1676 						NS(conn, C_NETWORK_FAILURE));
1677 		}
1678 	}
1679 	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1680 	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1681 
1682 	spin_lock_irq(&mdev->data.work.q_lock);
1683 	i = 0;
1684 	while (!list_empty(&mdev->data.work.q)) {
1685 		list_splice_init(&mdev->data.work.q, &work_list);
1686 		spin_unlock_irq(&mdev->data.work.q_lock);
1687 
1688 		while (!list_empty(&work_list)) {
1689 			w = list_entry(work_list.next, struct drbd_work, list);
1690 			list_del_init(&w->list);
1691 			w->cb(mdev, w, 1);
1692 			i++; /* dead debugging code */
1693 		}
1694 
1695 		spin_lock_irq(&mdev->data.work.q_lock);
1696 	}
1697 	sema_init(&mdev->data.work.s, 0);
1698 	/* DANGEROUS race: if someone did queue their work within the spinlock,
1699 	 * but up()ed outside the spinlock, we could get an up() on the
1700 	 * semaphore without a corresponding list entry.
1701 	 * So don't do that.
1702 	 */
1703 	spin_unlock_irq(&mdev->data.work.q_lock);
1704 
1705 	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1706 	/* _drbd_set_state only uses stop_nowait.
1707 	 * wait here for the Exiting receiver. */
1708 	drbd_thread_stop(&mdev->receiver);
1709 	drbd_mdev_cleanup(mdev);
1710 
1711 	dev_info(DEV, "worker terminated\n");
1712 
1713 	clear_bit(DEVICE_DYING, &mdev->flags);
1714 	clear_bit(CONFIG_PENDING, &mdev->flags);
1715 	wake_up(&mdev->state_wait);
1716 
1717 	return 0;
1718 }
1719