xref: /openbmc/linux/drivers/block/drbd/drbd_worker.c (revision 02851e9f)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24  */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37 
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40 
41 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42 static int w_make_resync_request(struct drbd_conf *mdev,
43 				 struct drbd_work *w, int cancel);
44 
45 
46 
47 /* defined here:
48    drbd_md_io_complete
49    drbd_endio_sec
50    drbd_endio_pri
51 
52  * more endio handlers:
53    atodb_endio in drbd_actlog.c
54    drbd_bm_async_io_complete in drbd_bitmap.c
55 
56  * For all these callbacks, note the following:
57  * The callbacks will be called in irq context by the IDE drivers,
58  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
59  * Try to get the locking right :)
60  *
61  */
62 
63 
64 /* About the global_state_lock
65    Each state transition on a device holds a read lock. In case we have
66    to evaluate the sync-after dependencies, we grab a write lock, because
67    we need stable states on all devices for that.  */
68 rwlock_t global_state_lock;
69 
70 /* used for synchronous meta data and bitmap IO
71  * submitted by drbd_md_sync_page_io()
72  */
73 void drbd_md_io_complete(struct bio *bio, int error)
74 {
75 	struct drbd_md_io *md_io;
76 
77 	md_io = (struct drbd_md_io *)bio->bi_private;
78 	md_io->error = error;
79 
80 	complete(&md_io->event);
81 }
82 
83 /* reads on behalf of the partner,
84  * "submitted" by the receiver
85  */
86 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
87 {
88 	unsigned long flags = 0;
89 	struct drbd_conf *mdev = e->mdev;
90 
91 	D_ASSERT(e->block_id != ID_VACANT);
92 
93 	spin_lock_irqsave(&mdev->req_lock, flags);
94 	mdev->read_cnt += e->size >> 9;
95 	list_del(&e->w.list);
96 	if (list_empty(&mdev->read_ee))
97 		wake_up(&mdev->ee_wait);
98 	if (test_bit(__EE_WAS_ERROR, &e->flags))
99 		__drbd_chk_io_error(mdev, false);
100 	spin_unlock_irqrestore(&mdev->req_lock, flags);
101 
102 	drbd_queue_work(&mdev->data.work, &e->w);
103 	put_ldev(mdev);
104 }
105 
106 /* writes on behalf of the partner, or resync writes,
107  * "submitted" by the receiver, final stage.  */
108 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
109 {
110 	unsigned long flags = 0;
111 	struct drbd_conf *mdev = e->mdev;
112 	sector_t e_sector;
113 	int do_wake;
114 	int is_syncer_req;
115 	int do_al_complete_io;
116 
117 	D_ASSERT(e->block_id != ID_VACANT);
118 
119 	/* after we moved e to done_ee,
120 	 * we may no longer access it,
121 	 * it may be freed/reused already!
122 	 * (as soon as we release the req_lock) */
123 	e_sector = e->sector;
124 	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
125 	is_syncer_req = is_syncer_block_id(e->block_id);
126 
127 	spin_lock_irqsave(&mdev->req_lock, flags);
128 	mdev->writ_cnt += e->size >> 9;
129 	list_del(&e->w.list); /* has been on active_ee or sync_ee */
130 	list_add_tail(&e->w.list, &mdev->done_ee);
131 
132 	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
133 	 * neither did we wake possibly waiting conflicting requests.
134 	 * done from "drbd_process_done_ee" within the appropriate w.cb
135 	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
136 
137 	do_wake = is_syncer_req
138 		? list_empty(&mdev->sync_ee)
139 		: list_empty(&mdev->active_ee);
140 
141 	if (test_bit(__EE_WAS_ERROR, &e->flags))
142 		__drbd_chk_io_error(mdev, false);
143 	spin_unlock_irqrestore(&mdev->req_lock, flags);
144 
145 	if (is_syncer_req)
146 		drbd_rs_complete_io(mdev, e_sector);
147 
148 	if (do_wake)
149 		wake_up(&mdev->ee_wait);
150 
151 	if (do_al_complete_io)
152 		drbd_al_complete_io(mdev, e_sector);
153 
154 	wake_asender(mdev);
155 	put_ldev(mdev);
156 }
157 
158 /* writes on behalf of the partner, or resync writes,
159  * "submitted" by the receiver.
160  */
161 void drbd_endio_sec(struct bio *bio, int error)
162 {
163 	struct drbd_epoch_entry *e = bio->bi_private;
164 	struct drbd_conf *mdev = e->mdev;
165 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
166 	int is_write = bio_data_dir(bio) == WRITE;
167 
168 	if (error)
169 		dev_warn(DEV, "%s: error=%d s=%llus\n",
170 				is_write ? "write" : "read", error,
171 				(unsigned long long)e->sector);
172 	if (!error && !uptodate) {
173 		dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
174 				is_write ? "write" : "read",
175 				(unsigned long long)e->sector);
176 		/* strange behavior of some lower level drivers...
177 		 * fail the request by clearing the uptodate flag,
178 		 * but do not return any error?! */
179 		error = -EIO;
180 	}
181 
182 	if (error)
183 		set_bit(__EE_WAS_ERROR, &e->flags);
184 
185 	bio_put(bio); /* no need for the bio anymore */
186 	if (atomic_dec_and_test(&e->pending_bios)) {
187 		if (is_write)
188 			drbd_endio_write_sec_final(e);
189 		else
190 			drbd_endio_read_sec_final(e);
191 	}
192 }
193 
194 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
195  */
196 void drbd_endio_pri(struct bio *bio, int error)
197 {
198 	unsigned long flags;
199 	struct drbd_request *req = bio->bi_private;
200 	struct drbd_conf *mdev = req->mdev;
201 	struct bio_and_error m;
202 	enum drbd_req_event what;
203 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
204 
205 	if (!error && !uptodate) {
206 		dev_warn(DEV, "p %s: setting error to -EIO\n",
207 			 bio_data_dir(bio) == WRITE ? "write" : "read");
208 		/* strange behavior of some lower level drivers...
209 		 * fail the request by clearing the uptodate flag,
210 		 * but do not return any error?! */
211 		error = -EIO;
212 	}
213 
214 	/* to avoid recursion in __req_mod */
215 	if (unlikely(error)) {
216 		what = (bio_data_dir(bio) == WRITE)
217 			? write_completed_with_error
218 			: (bio_rw(bio) == READ)
219 			  ? read_completed_with_error
220 			  : read_ahead_completed_with_error;
221 	} else
222 		what = completed_ok;
223 
224 	bio_put(req->private_bio);
225 	req->private_bio = ERR_PTR(error);
226 
227 	/* not req_mod(), we need irqsave here! */
228 	spin_lock_irqsave(&mdev->req_lock, flags);
229 	__req_mod(req, what, &m);
230 	spin_unlock_irqrestore(&mdev->req_lock, flags);
231 
232 	if (m.bio)
233 		complete_master_bio(mdev, &m);
234 }
235 
236 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
237 {
238 	struct drbd_request *req = container_of(w, struct drbd_request, w);
239 
240 	/* We should not detach for read io-error,
241 	 * but try to WRITE the P_DATA_REPLY to the failed location,
242 	 * to give the disk the chance to relocate that block */
243 
244 	spin_lock_irq(&mdev->req_lock);
245 	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
246 		_req_mod(req, read_retry_remote_canceled);
247 		spin_unlock_irq(&mdev->req_lock);
248 		return 1;
249 	}
250 	spin_unlock_irq(&mdev->req_lock);
251 
252 	return w_send_read_req(mdev, w, 0);
253 }
254 
255 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
256 {
257 	ERR_IF(cancel) return 1;
258 	dev_err(DEV, "resync inactive, but callback triggered??\n");
259 	return 1; /* Simply ignore this! */
260 }
261 
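/* Compute a digest over all pages of an epoch entry: every page but the
 * last is hashed in full, the last one only up to e->size modulo
 * PAGE_SIZE (or in full, if the size is page aligned). */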
262 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
263 {
264 	struct hash_desc desc;
265 	struct scatterlist sg;
266 	struct page *page = e->pages;
267 	struct page *tmp;
268 	unsigned len;
269 
270 	desc.tfm = tfm;
271 	desc.flags = 0;
272 
273 	sg_init_table(&sg, 1);
274 	crypto_hash_init(&desc);
275 
276 	while ((tmp = page_chain_next(page))) {
277 		/* all but the last page will be fully used */
278 		sg_set_page(&sg, page, PAGE_SIZE, 0);
279 		crypto_hash_update(&desc, &sg, sg.length);
280 		page = tmp;
281 	}
282 	/* and now the last, possibly only partially used page */
283 	len = e->size & (PAGE_SIZE - 1);
284 	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
285 	crypto_hash_update(&desc, &sg, sg.length);
286 	crypto_hash_final(&desc, digest);
287 }
288 
289 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
290 {
291 	struct hash_desc desc;
292 	struct scatterlist sg;
293 	struct bio_vec *bvec;
294 	int i;
295 
296 	desc.tfm = tfm;
297 	desc.flags = 0;
298 
299 	sg_init_table(&sg, 1);
300 	crypto_hash_init(&desc);
301 
302 	__bio_for_each_segment(bvec, bio, i, 0) {
303 		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
304 		crypto_hash_update(&desc, &sg, sg.length);
305 	}
306 	crypto_hash_final(&desc, digest);
307 }
308 
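/* Checksum based resync: hash the block we just read locally and send the
 * digest to the peer with a P_CSUM_RS_REQUEST.  The epoch entry is freed
 * in every case. */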
309 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
310 {
311 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
312 	int digest_size;
313 	void *digest;
314 	int ok;
315 
316 	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
317 
318 	if (unlikely(cancel)) {
319 		drbd_free_ee(mdev, e);
320 		return 1;
321 	}
322 
323 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
324 		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
325 		digest = kmalloc(digest_size, GFP_NOIO);
326 		if (digest) {
327 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
328 
329 			inc_rs_pending(mdev);
330 			ok = drbd_send_drequest_csum(mdev,
331 						     e->sector,
332 						     e->size,
333 						     digest,
334 						     digest_size,
335 						     P_CSUM_RS_REQUEST);
336 			kfree(digest);
337 		} else {
338 			dev_err(DEV, "kmalloc() of digest failed.\n");
339 			ok = 0;
340 		}
341 	} else
342 		ok = 1;
343 
344 	drbd_free_ee(mdev, e);
345 
346 	if (unlikely(!ok))
347 		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
348 	return ok;
349 }
350 
351 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
352 
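/* Checksum based resync: submit a local read of (sector, size) and queue
 * w_e_send_csum() as completion work.  Returns 0 on success, -EIO if we
 * have no local disk, -EAGAIN if the request should be retried later
 * (throttled, or out of memory/bios right now). */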
353 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
354 {
355 	struct drbd_epoch_entry *e;
356 
357 	if (!get_ldev(mdev))
358 		return -EIO;
359 
360 	if (drbd_rs_should_slow_down(mdev, sector))
361 		goto defer;
362 
363 	/* GFP_TRY, because if there is no memory available right now, this may
364 	 * be rescheduled for later. It is "only" background resync, after all. */
365 	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
366 	if (!e)
367 		goto defer;
368 
369 	e->w.cb = w_e_send_csum;
370 	spin_lock_irq(&mdev->req_lock);
371 	list_add(&e->w.list, &mdev->read_ee);
372 	spin_unlock_irq(&mdev->req_lock);
373 
374 	atomic_add(size >> 9, &mdev->rs_sect_ev);
375 	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
376 		return 0;
377 
378 	/* drbd_submit_ee currently fails for one reason only:
379 	 * not being able to allocate enough bios.
380 	 * Is dropping the connection going to help? */
381 	spin_lock_irq(&mdev->req_lock);
382 	list_del(&e->w.list);
383 	spin_unlock_irq(&mdev->req_lock);
384 
385 	drbd_free_ee(mdev, e);
386 defer:
387 	put_ldev(mdev);
388 	return -EAGAIN;
389 }
390 
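/* Resync/online-verify timer: depending on the connection state, (re)queue
 * the work that generates the next batch of verify or resync requests. */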
391 void resync_timer_fn(unsigned long data)
392 {
393 	struct drbd_conf *mdev = (struct drbd_conf *) data;
394 	int queue;
395 
396 	queue = 1;
397 	switch (mdev->state.conn) {
398 	case C_VERIFY_S:
399 		mdev->resync_work.cb = w_make_ov_request;
400 		break;
401 	case C_SYNC_TARGET:
402 		mdev->resync_work.cb = w_make_resync_request;
403 		break;
404 	default:
405 		queue = 0;
406 		mdev->resync_work.cb = w_resync_inactive;
407 	}
408 
409 	/* harmless race: list_empty outside data.work.q_lock */
410 	if (list_empty(&mdev->resync_work.list) && queue)
411 		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
412 }
413 
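/* Small helpers for the resync controller's plan FIFO (mdev->rs_plan_s):
 * fifo_set() initializes every slot, fifo_push() stores a new value in the
 * head slot and returns its previous content, and fifo_add_val() adds a
 * value to every slot. */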
414 static void fifo_set(struct fifo_buffer *fb, int value)
415 {
416 	int i;
417 
418 	for (i = 0; i < fb->size; i++)
419 		fb->values[i] = value;
420 }
421 
422 static int fifo_push(struct fifo_buffer *fb, int value)
423 {
424 	int ov;
425 
426 	ov = fb->values[fb->head_index];
427 	fb->values[fb->head_index++] = value;
428 
429 	if (fb->head_index >= fb->size)
430 		fb->head_index = 0;
431 
432 	return ov;
433 }
434 
435 static void fifo_add_val(struct fifo_buffer *fb, int value)
436 {
437 	int i;
438 
439 	for (i = 0; i < fb->size; i++)
440 		fb->values[i] += value;
441 }
442 
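/* Dynamic resync rate controller, invoked once per SLEEP_TIME tick from
 * drbd_rs_number_requests().  It tries to keep roughly "want" sectors in
 * flight towards the peer: the deviation from that target is spread as
 * equal corrections over the "steps" slots of the plan FIFO, and the
 * correction planned for the current slot is added on top of the sectors
 * that just came in.  The result is clamped to be non-negative and to at
 * most c_max_rate's worth of sectors per tick.
 *
 * Illustrative example (assumed numbers): with c_fill_target = 1000
 * sectors, rs_in_flight = 600, rs_planed = 0, sect_in = 200 and
 * steps = 10, the correction is 1000 - 600 - 0 = 400 sectors, so
 * cps = 40 is added to every plan slot.  If the plan was all zero
 * before, curr_corr = 40 and req_sect = 200 + 40 = 240 sectors are
 * requested in this tick. */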
443 static int drbd_rs_controller(struct drbd_conf *mdev)
444 {
445 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
446 	unsigned int want;     /* The number of sectors we want in the proxy */
447 	int req_sect; /* Number of sectors to request in this turn */
448 	int correction; /* Number of sectors more we need in the proxy*/
449 	int cps; /* correction per invocation of drbd_rs_controller() */
450 	int steps; /* Number of time steps to plan ahead */
451 	int curr_corr;
452 	int max_sect;
453 
454 	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
455 	mdev->rs_in_flight -= sect_in;
456 
457 	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
458 
459 	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
460 
461 	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
462 		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
463 	} else { /* normal path */
464 		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
465 			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
466 	}
467 
468 	correction = want - mdev->rs_in_flight - mdev->rs_planed;
469 
470 	/* Plan ahead */
471 	cps = correction / steps;
472 	fifo_add_val(&mdev->rs_plan_s, cps);
473 	mdev->rs_planed += cps * steps;
474 
475 	/* What we do in this step */
476 	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
477 	spin_unlock(&mdev->peer_seq_lock);
478 	mdev->rs_planed -= curr_corr;
479 
480 	req_sect = sect_in + curr_corr;
481 	if (req_sect < 0)
482 		req_sect = 0;
483 
484 	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
485 	if (req_sect > max_sect)
486 		req_sect = max_sect;
487 
488 	/*
489 	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
490 		 sect_in, mdev->rs_in_flight, want, correction,
491 		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
492 	*/
493 
494 	return req_sect;
495 }
496 
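/* Decide how many resync requests (in units of BM_BLOCK_SIZE) to generate
 * in this SLEEP_TIME tick: either ask the dynamic controller (if a plan is
 * configured) or derive the number from the fixed sync rate.  Also keeps
 * mdev->c_sync_rate up to date. */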
497 static int drbd_rs_number_requests(struct drbd_conf *mdev)
498 {
499 	int number;
500 	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
501 		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
502 		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
503 	} else {
504 		mdev->c_sync_rate = mdev->sync_conf.rate;
505 		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
506 	}
507 
508 	/* ignore the number of pending requests, the resync controller should
509 	 * throttle down to the incoming reply rate soon enough anyway. */
510 	return number;
511 }
512 
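/* Worker callback of the resync timer on the SyncTarget: walk the
 * out-of-sync bitmap and issue up to drbd_rs_number_requests() requests
 * in this tick (P_RS_DATA_REQUEST, or local reads for the checksum based
 * path), merging adjacent bits into larger, aligned requests where
 * possible.  Backs off and re-arms the timer when the send buffer fills
 * up or the lower level device is busy. */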
513 static int w_make_resync_request(struct drbd_conf *mdev,
514 				 struct drbd_work *w, int cancel)
515 {
516 	unsigned long bit;
517 	sector_t sector;
518 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
519 	int max_bio_size;
520 	int number, rollback_i, size;
521 	int align, queued, sndbuf;
522 	int i = 0;
523 
524 	if (unlikely(cancel))
525 		return 1;
526 
527 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
528 		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
529 		return 0;
530 	}
531 
532 	if (mdev->state.conn != C_SYNC_TARGET)
533 		dev_err(DEV, "%s in w_make_resync_request\n",
534 			drbd_conn_str(mdev->state.conn));
535 
536 	if (mdev->rs_total == 0) {
537 		/* empty resync? */
538 		drbd_resync_finished(mdev);
539 		return 1;
540 	}
541 
542 	if (!get_ldev(mdev)) {
543 		/* Since we only need to access mdev->resync, a
544 		   get_ldev_if_state(mdev, D_FAILED) would be sufficient; but
545 		   continuing a resync with a broken disk makes no sense at
546 		   all. */
547 		dev_err(DEV, "Disk broke down during resync!\n");
548 		mdev->resync_work.cb = w_resync_inactive;
549 		return 1;
550 	}
551 
552 	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
553 	 * if it should be necessary */
554 	max_bio_size =
555 		mdev->agreed_pro_version < 94 ? queue_max_hw_sectors(mdev->rq_queue) << 9 :
556 		mdev->agreed_pro_version < 95 ?	DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_BIO_SIZE;
557 
558 	number = drbd_rs_number_requests(mdev);
559 	if (number == 0)
560 		goto requeue;
561 
562 	for (i = 0; i < number; i++) {
563 		/* Stop generating RS requests, when half of the send buffer is filled */
564 		mutex_lock(&mdev->data.mutex);
565 		if (mdev->data.socket) {
566 			queued = mdev->data.socket->sk->sk_wmem_queued;
567 			sndbuf = mdev->data.socket->sk->sk_sndbuf;
568 		} else {
569 			queued = 1;
570 			sndbuf = 0;
571 		}
572 		mutex_unlock(&mdev->data.mutex);
573 		if (queued > sndbuf / 2)
574 			goto requeue;
575 
576 next_sector:
577 		size = BM_BLOCK_SIZE;
578 		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
579 
580 		if (bit == DRBD_END_OF_BITMAP) {
581 			mdev->bm_resync_fo = drbd_bm_bits(mdev);
582 			mdev->resync_work.cb = w_resync_inactive;
583 			put_ldev(mdev);
584 			return 1;
585 		}
586 
587 		sector = BM_BIT_TO_SECT(bit);
588 
589 		if (drbd_rs_should_slow_down(mdev, sector) ||
590 		    drbd_try_rs_begin_io(mdev, sector)) {
591 			mdev->bm_resync_fo = bit;
592 			goto requeue;
593 		}
594 		mdev->bm_resync_fo = bit + 1;
595 
596 		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
597 			drbd_rs_complete_io(mdev, sector);
598 			goto next_sector;
599 		}
600 
601 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
602 		/* try to find some adjacent bits.
603 		 * we stop if we already have the maximum req size.
604 		 *
605 		 * Additionally always align bigger requests, in order to
606 		 * be prepared for all stripe sizes of software RAIDs.
607 		 */
608 		align = 1;
609 		rollback_i = i;
610 		for (;;) {
611 			if (size + BM_BLOCK_SIZE > max_bio_size)
612 				break;
613 
614 			/* Always be aligned */
615 			if (sector & ((1<<(align+3))-1))
616 				break;
617 
618 			/* do not cross extent boundaries */
619 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
620 				break;
621 			/* now, is it actually dirty, after all?
622 			 * caution, drbd_bm_test_bit is tri-state for some
623 			 * obscure reason; ( b == 0 ) would get the out-of-band
624 			 * only accidentally right because of the "oddly sized"
625 			 * adjustment below */
626 			if (drbd_bm_test_bit(mdev, bit+1) != 1)
627 				break;
628 			bit++;
629 			size += BM_BLOCK_SIZE;
630 			if ((BM_BLOCK_SIZE << align) <= size)
631 				align++;
632 			i++;
633 		}
634 		/* if we merged some,
635 		 * reset the offset to start the next drbd_bm_find_next from */
636 		if (size > BM_BLOCK_SIZE)
637 			mdev->bm_resync_fo = bit + 1;
638 #endif
639 
640 		/* adjust very last sectors, in case we are oddly sized */
641 		if (sector + (size>>9) > capacity)
642 			size = (capacity-sector)<<9;
643 		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
644 			switch (read_for_csum(mdev, sector, size)) {
645 			case -EIO: /* Disk failure */
646 				put_ldev(mdev);
647 				return 0;
648 			case -EAGAIN: /* allocation failed, or ldev busy */
649 				drbd_rs_complete_io(mdev, sector);
650 				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
651 				i = rollback_i;
652 				goto requeue;
653 			case 0:
654 				/* everything ok */
655 				break;
656 			default:
657 				BUG();
658 			}
659 		} else {
660 			inc_rs_pending(mdev);
661 			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
662 					       sector, size, ID_SYNCER)) {
663 				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
664 				dec_rs_pending(mdev);
665 				put_ldev(mdev);
666 				return 0;
667 			}
668 		}
669 	}
670 
671 	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
672 		/* last syncer _request_ was sent,
673 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
674 		 * next sync group will resume), as soon as we receive the last
675 		 * resync data block, and the last bit is cleared.
676 		 * until then resync "work" is "inactive" ...
677 		 */
678 		mdev->resync_work.cb = w_resync_inactive;
679 		put_ldev(mdev);
680 		return 1;
681 	}
682 
683  requeue:
684 	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
685 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
686 	put_ldev(mdev);
687 	return 1;
688 }
689 
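/* Worker callback of the resync timer during online verify: issue the
 * next batch of verify requests via drbd_send_ov_request(), starting at
 * mdev->ov_position, then re-arm the timer. */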
690 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
691 {
692 	int number, i, size;
693 	sector_t sector;
694 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
695 
696 	if (unlikely(cancel))
697 		return 1;
698 
699 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
700 		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
701 		return 0;
702 	}
703 
704 	number = drbd_rs_number_requests(mdev);
705 
706 	sector = mdev->ov_position;
707 	for (i = 0; i < number; i++) {
708 		if (sector >= capacity) {
709 			mdev->resync_work.cb = w_resync_inactive;
710 			return 1;
711 		}
712 
713 		size = BM_BLOCK_SIZE;
714 
715 		if (drbd_rs_should_slow_down(mdev, sector) ||
716 		    drbd_try_rs_begin_io(mdev, sector)) {
717 			mdev->ov_position = sector;
718 			goto requeue;
719 		}
720 
721 		if (sector + (size>>9) > capacity)
722 			size = (capacity-sector)<<9;
723 
724 		inc_rs_pending(mdev);
725 		if (!drbd_send_ov_request(mdev, sector, size)) {
726 			dec_rs_pending(mdev);
727 			return 0;
728 		}
729 		sector += BM_SECT_PER_BIT;
730 	}
731 	mdev->ov_position = sector;
732 
733  requeue:
734 	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
735 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
736 	return 1;
737 }
738 
739 
740 int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
741 {
742 	drbd_start_resync(mdev, C_SYNC_SOURCE);
743 
744 	return 1;
745 }
746 
747 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
748 {
749 	kfree(w);
750 	ov_oos_print(mdev);
751 	drbd_resync_finished(mdev);
752 
753 	return 1;
754 }
755 
756 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
757 {
758 	kfree(w);
759 
760 	drbd_resync_finished(mdev);
761 
762 	return 1;
763 }
764 
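/* Ask the asender to ping the peer and wait until the ping ack arrives
 * or the connection is lost. */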
765 static void ping_peer(struct drbd_conf *mdev)
766 {
767 	clear_bit(GOT_PING_ACK, &mdev->flags);
768 	request_ping(mdev);
769 	wait_event(mdev->misc_wait,
770 		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
771 }
772 
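/* Called when a resync or online verify run ends: drain the resync LRU
 * (retrying from the worker if that is not possible yet), log throughput
 * statistics, update disk/pdsk state and the UUIDs, and possibly invoke
 * the "out-of-sync" or "after-resync-target" helpers. */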
773 int drbd_resync_finished(struct drbd_conf *mdev)
774 {
775 	unsigned long db, dt, dbdt;
776 	unsigned long n_oos;
777 	union drbd_state os, ns;
778 	struct drbd_work *w;
779 	char *khelper_cmd = NULL;
780 	int verify_done = 0;
781 
782 	/* Remove all elements from the resync LRU. Since future actions
783 	 * might set bits in the (main) bitmap, the entries in the
784 	 * resync LRU would otherwise be wrong. */
785 	if (drbd_rs_del_all(mdev)) {
786 		/* In case this is not possible now, most probably because
787 		 * there are P_RS_DATA_REPLY packets lingering on the worker's
788 		 * queue (or even the read operations for those packets
789 		 * are not finished by now).  Retry in 100ms. */
790 
791 		__set_current_state(TASK_INTERRUPTIBLE);
792 		schedule_timeout(HZ / 10);
793 		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
794 		if (w) {
795 			w->cb = w_resync_finished;
796 			drbd_queue_work(&mdev->data.work, w);
797 			return 1;
798 		}
799 		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
800 	}
801 
802 	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
803 	if (dt <= 0)
804 		dt = 1;
805 	db = mdev->rs_total;
806 	dbdt = Bit2KB(db/dt);
807 	mdev->rs_paused /= HZ;
808 
809 	if (!get_ldev(mdev))
810 		goto out;
811 
812 	ping_peer(mdev);
813 
814 	spin_lock_irq(&mdev->req_lock);
815 	os = mdev->state;
816 
817 	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
818 
819 	/* This protects us against multiple calls (that can happen in the presence
820 	   of application IO), and against connectivity loss just before we arrive here. */
821 	if (os.conn <= C_CONNECTED)
822 		goto out_unlock;
823 
824 	ns = os;
825 	ns.conn = C_CONNECTED;
826 
827 	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
828 	     verify_done ? "Online verify " : "Resync",
829 	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
830 
831 	n_oos = drbd_bm_total_weight(mdev);
832 
833 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
834 		if (n_oos) {
835 			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
836 			      n_oos, Bit2KB(1));
837 			khelper_cmd = "out-of-sync";
838 		}
839 	} else {
840 		D_ASSERT((n_oos - mdev->rs_failed) == 0);
841 
842 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
843 			khelper_cmd = "after-resync-target";
844 
845 		if (mdev->csums_tfm && mdev->rs_total) {
846 			const unsigned long s = mdev->rs_same_csum;
847 			const unsigned long t = mdev->rs_total;
848 			const int ratio =
849 				(t == 0)     ? 0 :
850 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
851 			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
852 			     "transferred %luK total %luK\n",
853 			     ratio,
854 			     Bit2KB(mdev->rs_same_csum),
855 			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
856 			     Bit2KB(mdev->rs_total));
857 		}
858 	}
859 
860 	if (mdev->rs_failed) {
861 		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
862 
863 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
864 			ns.disk = D_INCONSISTENT;
865 			ns.pdsk = D_UP_TO_DATE;
866 		} else {
867 			ns.disk = D_UP_TO_DATE;
868 			ns.pdsk = D_INCONSISTENT;
869 		}
870 	} else {
871 		ns.disk = D_UP_TO_DATE;
872 		ns.pdsk = D_UP_TO_DATE;
873 
874 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
875 			if (mdev->p_uuid) {
876 				int i;
877 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
878 					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
879 				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
880 				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
881 			} else {
882 				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
883 			}
884 		}
885 
886 		drbd_uuid_set_bm(mdev, 0UL);
887 
888 		if (mdev->p_uuid) {
889 			/* Now the two UUID sets are equal, update what we
890 			 * know of the peer. */
891 			int i;
892 			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
893 				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
894 		}
895 	}
896 
897 	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
898 out_unlock:
899 	spin_unlock_irq(&mdev->req_lock);
900 	put_ldev(mdev);
901 out:
902 	mdev->rs_total  = 0;
903 	mdev->rs_failed = 0;
904 	mdev->rs_paused = 0;
905 	if (verify_done)
906 		mdev->ov_start_sector = 0;
907 
908 	drbd_md_sync(mdev);
909 
910 	if (khelper_cmd)
911 		drbd_khelper(mdev, khelper_cmd);
912 
913 	return 1;
914 }
915 
916 /* helper */
917 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
918 {
919 	if (drbd_ee_has_active_page(e)) {
920 		/* This might happen if sendpage() has not finished */
921 		int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
922 		atomic_add(i, &mdev->pp_in_use_by_net);
923 		atomic_sub(i, &mdev->pp_in_use);
924 		spin_lock_irq(&mdev->req_lock);
925 		list_add_tail(&e->w.list, &mdev->net_ee);
926 		spin_unlock_irq(&mdev->req_lock);
927 		wake_up(&drbd_pp_wait);
928 	} else
929 		drbd_free_ee(mdev, e);
930 }
931 
932 /**
933  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
934  * @mdev:	DRBD device.
935  * @w:		work object.
936  * @cancel:	The connection will be closed anyway
937  */
938 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
939 {
940 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
941 	int ok;
942 
943 	if (unlikely(cancel)) {
944 		drbd_free_ee(mdev, e);
945 		dec_unacked(mdev);
946 		return 1;
947 	}
948 
949 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
950 		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
951 	} else {
952 		if (__ratelimit(&drbd_ratelimit_state))
953 			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
954 			    (unsigned long long)e->sector);
955 
956 		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
957 	}
958 
959 	dec_unacked(mdev);
960 
961 	move_to_net_ee_or_free(mdev, e);
962 
963 	if (unlikely(!ok))
964 		dev_err(DEV, "drbd_send_block() failed\n");
965 	return ok;
966 }
967 
968 /**
969  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
970  * @mdev:	DRBD device.
971  * @w:		work object.
972  * @cancel:	The connection will be closed anyway
973  */
974 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
975 {
976 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
977 	int ok;
978 
979 	if (unlikely(cancel)) {
980 		drbd_free_ee(mdev, e);
981 		dec_unacked(mdev);
982 		return 1;
983 	}
984 
985 	if (get_ldev_if_state(mdev, D_FAILED)) {
986 		drbd_rs_complete_io(mdev, e->sector);
987 		put_ldev(mdev);
988 	}
989 
990 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
991 		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
992 			inc_rs_pending(mdev);
993 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
994 		} else {
995 			if (__ratelimit(&drbd_ratelimit_state))
996 				dev_err(DEV, "Not sending RSDataReply, "
997 				    "partner DISKLESS!\n");
998 			ok = 1;
999 		}
1000 	} else {
1001 		if (__ratelimit(&drbd_ratelimit_state))
1002 			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1003 			    (unsigned long long)e->sector);
1004 
1005 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1006 
1007 		/* update resync data with failure */
1008 		drbd_rs_failed_io(mdev, e->sector, e->size);
1009 	}
1010 
1011 	dec_unacked(mdev);
1012 
1013 	move_to_net_ee_or_free(mdev, e);
1014 
1015 	if (unlikely(!ok))
1016 		dev_err(DEV, "drbd_send_block() failed\n");
1017 	return ok;
1018 }
1019 
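/**
 * w_e_end_csum_rs_req() - Worker callback to answer a P_CSUM_RS_REQUEST
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyway
 *
 * If the locally computed digest matches the one sent by the peer, reply
 * with P_RS_IS_IN_SYNC; otherwise send the full block as P_RS_DATA_REPLY.
 */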
1020 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1021 {
1022 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1023 	struct digest_info *di;
1024 	int digest_size;
1025 	void *digest = NULL;
1026 	int ok, eq = 0;
1027 
1028 	if (unlikely(cancel)) {
1029 		drbd_free_ee(mdev, e);
1030 		dec_unacked(mdev);
1031 		return 1;
1032 	}
1033 
1034 	if (get_ldev(mdev)) {
1035 		drbd_rs_complete_io(mdev, e->sector);
1036 		put_ldev(mdev);
1037 	}
1038 
1039 	di = e->digest;
1040 
1041 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1042 		/* quick hack to try to avoid a race against reconfiguration.
1043 		 * a real fix would be much more involved,
1044 		 * introducing more locking mechanisms */
1045 		if (mdev->csums_tfm) {
1046 			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1047 			D_ASSERT(digest_size == di->digest_size);
1048 			digest = kmalloc(digest_size, GFP_NOIO);
1049 		}
1050 		if (digest) {
1051 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1052 			eq = !memcmp(digest, di->digest, digest_size);
1053 			kfree(digest);
1054 		}
1055 
1056 		if (eq) {
1057 			drbd_set_in_sync(mdev, e->sector, e->size);
1058 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1059 			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1060 			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1061 		} else {
1062 			inc_rs_pending(mdev);
1063 			e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1064 			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1065 			kfree(di);
1066 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1067 		}
1068 	} else {
1069 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1070 		if (__ratelimit(&drbd_ratelimit_state))
1071 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1072 	}
1073 
1074 	dec_unacked(mdev);
1075 	move_to_net_ee_or_free(mdev, e);
1076 
1077 	if (unlikely(!ok))
1078 		dev_err(DEV, "drbd_send_block/ack() failed\n");
1079 	return ok;
1080 }
1081 
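/**
 * w_e_end_ov_req() - Worker callback for the online verify request path
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyway
 *
 * Hash the locally read block and send the digest to the peer with
 * P_OV_REPLY.
 */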
1082 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1083 {
1084 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1085 	int digest_size;
1086 	void *digest;
1087 	int ok = 1;
1088 
1089 	if (unlikely(cancel))
1090 		goto out;
1091 
1092 	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1093 		goto out;
1094 
1095 	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1096 	/* FIXME if this allocation fails, online verify will not terminate! */
1097 	digest = kmalloc(digest_size, GFP_NOIO);
1098 	if (digest) {
1099 		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1100 		inc_rs_pending(mdev);
1101 		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1102 					     digest, digest_size, P_OV_REPLY);
1103 		if (!ok)
1104 			dec_rs_pending(mdev);
1105 		kfree(digest);
1106 	}
1107 
1108 out:
1109 	drbd_free_ee(mdev, e);
1110 
1111 	dec_unacked(mdev);
1112 
1113 	return ok;
1114 }
1115 
1116 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1117 {
1118 	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1119 		mdev->ov_last_oos_size += size>>9;
1120 	} else {
1121 		mdev->ov_last_oos_start = sector;
1122 		mdev->ov_last_oos_size = size>>9;
1123 	}
1124 	drbd_set_out_of_sync(mdev, sector, size);
1125 }
1126 
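/**
 * w_e_end_ov_reply() - Worker callback for the online verify reply path
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyway
 *
 * Compare the digest received in the P_OV_REPLY with the locally computed
 * one, record any out-of-sync range, and report the result to the peer
 * with P_OV_RESULT.  Finishes the verify run after the last reply.
 */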
1127 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1128 {
1129 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1130 	struct digest_info *di;
1131 	int digest_size;
1132 	void *digest;
1133 	int ok, eq = 0;
1134 
1135 	if (unlikely(cancel)) {
1136 		drbd_free_ee(mdev, e);
1137 		dec_unacked(mdev);
1138 		return 1;
1139 	}
1140 
1141 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1142 	 * the resync lru has been cleaned up already */
1143 	if (get_ldev(mdev)) {
1144 		drbd_rs_complete_io(mdev, e->sector);
1145 		put_ldev(mdev);
1146 	}
1147 
1148 	di = e->digest;
1149 
1150 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1151 		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1152 		digest = kmalloc(digest_size, GFP_NOIO);
1153 		if (digest) {
1154 			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1155 
1156 			D_ASSERT(digest_size == di->digest_size);
1157 			eq = !memcmp(digest, di->digest, digest_size);
1158 			kfree(digest);
1159 		}
1160 	} else {
1161 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1162 		if (__ratelimit(&drbd_ratelimit_state))
1163 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1164 	}
1165 
1166 	dec_unacked(mdev);
1167 	if (!eq)
1168 		drbd_ov_oos_found(mdev, e->sector, e->size);
1169 	else
1170 		ov_oos_print(mdev);
1171 
1172 	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1173 			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1174 
1175 	drbd_free_ee(mdev, e);
1176 
1177 	--mdev->ov_left;
1178 
1179 	/* let's advance progress step marks only for every other megabyte */
1180 	if ((mdev->ov_left & 0x200) == 0x200)
1181 		drbd_advance_rs_marks(mdev, mdev->ov_left);
1182 
1183 	if (mdev->ov_left == 0) {
1184 		ov_oos_print(mdev);
1185 		drbd_resync_finished(mdev);
1186 	}
1187 
1188 	return ok;
1189 }
1190 
1191 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1192 {
1193 	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1194 	complete(&b->done);
1195 	return 1;
1196 }
1197 
1198 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1199 {
1200 	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1201 	struct p_barrier *p = &mdev->data.sbuf.barrier;
1202 	int ok = 1;
1203 
1204 	/* really avoid racing with tl_clear.  w.cb may have been referenced
1205 	 * just before it was reassigned and re-queued, so double check that.
1206 	 * actually, this race was harmless, since we only try to send the
1207 	 * barrier packet here, and otherwise do nothing with the object.
1208 	 * but compare with the head of w_clear_epoch */
1209 	spin_lock_irq(&mdev->req_lock);
1210 	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1211 		cancel = 1;
1212 	spin_unlock_irq(&mdev->req_lock);
1213 	if (cancel)
1214 		return 1;
1215 
1216 	if (!drbd_get_data_sock(mdev))
1217 		return 0;
1218 	p->barrier = b->br_number;
1219 	/* inc_ap_pending was done where this was queued.
1220 	 * dec_ap_pending will be done in got_BarrierAck
1221 	 * or (on connection loss) in w_clear_epoch.  */
1222 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1223 				(struct p_header80 *)p, sizeof(*p), 0);
1224 	drbd_put_data_sock(mdev);
1225 
1226 	return ok;
1227 }
1228 
1229 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1230 {
1231 	if (cancel)
1232 		return 1;
1233 	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1234 }
1235 
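/* Worker callback to tell the peer that a request's range is out of sync
 * (via drbd_send_oos()) instead of mirroring the data; used while the
 * connection is in Ahead/Behind mode. */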
1236 int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1237 {
1238 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1239 	int ok;
1240 
1241 	if (unlikely(cancel)) {
1242 		req_mod(req, send_canceled);
1243 		return 1;
1244 	}
1245 
1246 	ok = drbd_send_oos(mdev, req);
1247 	req_mod(req, oos_handed_to_network);
1248 
1249 	return ok;
1250 }
1251 
1252 /**
1253  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1254  * @mdev:	DRBD device.
1255  * @w:		work object.
1256  * @cancel:	The connection will be closed anyway
1257  */
1258 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1259 {
1260 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1261 	int ok;
1262 
1263 	if (unlikely(cancel)) {
1264 		req_mod(req, send_canceled);
1265 		return 1;
1266 	}
1267 
1268 	ok = drbd_send_dblock(mdev, req);
1269 	req_mod(req, ok ? handed_over_to_network : send_failed);
1270 
1271 	return ok;
1272 }
1273 
1274 /**
1275  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1276  * @mdev:	DRBD device.
1277  * @w:		work object.
1278  * @cancel:	The connection will be closed anyway
1279  */
1280 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1281 {
1282 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1283 	int ok;
1284 
1285 	if (unlikely(cancel)) {
1286 		req_mod(req, send_canceled);
1287 		return 1;
1288 	}
1289 
1290 	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1291 				(unsigned long)req);
1292 
1293 	if (!ok) {
1294 		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1295 		 * so this is probably redundant */
1296 		if (mdev->state.conn >= C_CONNECTED)
1297 			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1298 	}
1299 	req_mod(req, ok ? handed_over_to_network : send_failed);
1300 
1301 	return ok;
1302 }
1303 
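/* Worker callback to re-issue the local part of a request after IO was
 * unfrozen: re-activate the AL extent if needed and submit a fresh
 * private bio to the backing device. */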
1304 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1305 {
1306 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1307 
1308 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1309 		drbd_al_begin_io(mdev, req->sector);
1310 	/* Calling drbd_al_begin_io() out of the worker might deadlock
1311 	   theoretically. In practice it cannot deadlock, since this is
1312 	   only used when unfreezing IOs. All the extents of the requests
1313 	   that made it into the TL are already active */
1314 
1315 	drbd_req_make_private_bio(req, req->master_bio);
1316 	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1317 	generic_make_request(req->private_bio);
1318 
1319 	return 1;
1320 }
1321 
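/* Follow the sync-after dependency chain of @mdev.  Returns 0 if any
 * device this one depends on is currently resyncing, paused, or otherwise
 * suspended via the isp flags; 1 if this device may resync now. */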
1322 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1323 {
1324 	struct drbd_conf *odev = mdev;
1325 
1326 	while (1) {
1327 		if (odev->sync_conf.after == -1)
1328 			return 1;
1329 		odev = minor_to_mdev(odev->sync_conf.after);
1330 		ERR_IF(!odev) return 1;
1331 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1332 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1333 		    odev->state.aftr_isp || odev->state.peer_isp ||
1334 		    odev->state.user_isp)
1335 			return 0;
1336 	}
1337 }
1338 
1339 /**
1340  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1341  * @mdev:	DRBD device.
1342  *
1343  * Called from process context only (admin command and after_state_ch).
1344  */
1345 static int _drbd_pause_after(struct drbd_conf *mdev)
1346 {
1347 	struct drbd_conf *odev;
1348 	int i, rv = 0;
1349 
1350 	for (i = 0; i < minor_count; i++) {
1351 		odev = minor_to_mdev(i);
1352 		if (!odev)
1353 			continue;
1354 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1355 			continue;
1356 		if (!_drbd_may_sync_now(odev))
1357 			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1358 			       != SS_NOTHING_TO_DO);
1359 	}
1360 
1361 	return rv;
1362 }
1363 
1364 /**
1365  * _drbd_resume_next() - Resume resync on all devices that may resync now
1366  * @mdev:	DRBD device.
1367  *
1368  * Called from process context only (admin command and worker).
1369  */
1370 static int _drbd_resume_next(struct drbd_conf *mdev)
1371 {
1372 	struct drbd_conf *odev;
1373 	int i, rv = 0;
1374 
1375 	for (i = 0; i < minor_count; i++) {
1376 		odev = minor_to_mdev(i);
1377 		if (!odev)
1378 			continue;
1379 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1380 			continue;
1381 		if (odev->state.aftr_isp) {
1382 			if (_drbd_may_sync_now(odev))
1383 				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1384 							CS_HARD, NULL)
1385 				       != SS_NOTHING_TO_DO) ;
1386 		}
1387 	}
1388 	return rv;
1389 }
1390 
1391 void resume_next_sg(struct drbd_conf *mdev)
1392 {
1393 	write_lock_irq(&global_state_lock);
1394 	_drbd_resume_next(mdev);
1395 	write_unlock_irq(&global_state_lock);
1396 }
1397 
1398 void suspend_other_sg(struct drbd_conf *mdev)
1399 {
1400 	write_lock_irq(&global_state_lock);
1401 	_drbd_pause_after(mdev);
1402 	write_unlock_irq(&global_state_lock);
1403 }
1404 
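/* Validate a proposed sync-after minor: it must refer to an existing
 * device and must not introduce a dependency cycle.  Returns NO_ERROR,
 * ERR_SYNC_AFTER or ERR_SYNC_AFTER_CYCLE. */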
1405 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1406 {
1407 	struct drbd_conf *odev;
1408 
1409 	if (o_minor == -1)
1410 		return NO_ERROR;
1411 	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1412 		return ERR_SYNC_AFTER;
1413 
1414 	/* check for loops */
1415 	odev = minor_to_mdev(o_minor);
1416 	while (1) {
1417 		if (odev == mdev)
1418 			return ERR_SYNC_AFTER_CYCLE;
1419 
1420 		/* dependency chain ends here, no cycles. */
1421 		if (odev->sync_conf.after == -1)
1422 			return NO_ERROR;
1423 
1424 		/* follow the dependency chain */
1425 		odev = minor_to_mdev(odev->sync_conf.after);
1426 	}
1427 }
1428 
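/* Change the sync-after dependency of @mdev to minor @na.  If the new
 * setting is valid, repeatedly pause and resume resyncs until the device
 * states no longer change. */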
1429 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1430 {
1431 	int changes;
1432 	int retcode;
1433 
1434 	write_lock_irq(&global_state_lock);
1435 	retcode = sync_after_error(mdev, na);
1436 	if (retcode == NO_ERROR) {
1437 		mdev->sync_conf.after = na;
1438 		do {
1439 			changes  = _drbd_pause_after(mdev);
1440 			changes |= _drbd_resume_next(mdev);
1441 		} while (changes);
1442 	}
1443 	write_unlock_irq(&global_state_lock);
1444 	return retcode;
1445 }
1446 
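/* Reset the resync controller's bookkeeping: the in-flight and event
 * counters, the number of planned sectors, and the plan FIFO itself. */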
1447 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1448 {
1449 	atomic_set(&mdev->rs_sect_in, 0);
1450 	atomic_set(&mdev->rs_sect_ev, 0);
1451 	mdev->rs_in_flight = 0;
1452 	mdev->rs_planed = 0;
1453 	spin_lock(&mdev->peer_seq_lock);
1454 	fifo_set(&mdev->rs_plan_s, 0);
1455 	spin_unlock(&mdev->peer_seq_lock);
1456 }
1457 
1458 /**
1459  * drbd_start_resync() - Start the resync process
1460  * @mdev:	DRBD device.
1461  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1462  *
1463  * This function might bring you directly into one of the
1464  * C_PAUSED_SYNC_* states.
1465  */
1466 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1467 {
1468 	union drbd_state ns;
1469 	int r;
1470 
1471 	if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1472 		dev_err(DEV, "Resync already running!\n");
1473 		return;
1474 	}
1475 
1476 	if (mdev->state.conn < C_AHEAD) {
1477 		/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1478 		drbd_rs_cancel_all(mdev);
1479 		/* This should be done when we abort the resync. We definitely do not
1480 		   want to have this for connections going back and forth between
1481 		   Ahead/Behind and SyncSource/SyncTarget */
1482 	}
1483 
1484 	if (side == C_SYNC_TARGET) {
1485 		/* Since application IO was locked out during C_WF_BITMAP_T and
1486 		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1487 		   which will mark our data inconsistent, let the handler below veto. */
1488 		r = drbd_khelper(mdev, "before-resync-target");
1489 		r = (r >> 8) & 0xff;
1490 		if (r > 0) {
1491 			dev_info(DEV, "before-resync-target handler returned %d, "
1492 			     "dropping connection.\n", r);
1493 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1494 			return;
1495 		}
1496 	} else /* C_SYNC_SOURCE */ {
1497 		r = drbd_khelper(mdev, "before-resync-source");
1498 		r = (r >> 8) & 0xff;
1499 		if (r > 0) {
1500 			if (r == 3) {
1501 				dev_info(DEV, "before-resync-source handler returned %d, "
1502 					 "ignoring. Old userland tools?", r);
1503 			} else {
1504 				dev_info(DEV, "before-resync-source handler returned %d, "
1505 					 "dropping connection.\n", r);
1506 				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1507 				return;
1508 			}
1509 		}
1510 	}
1511 
1512 	drbd_state_lock(mdev);
1513 
1514 	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1515 		drbd_state_unlock(mdev);
1516 		return;
1517 	}
1518 
1519 	if (side == C_SYNC_TARGET) {
1520 		mdev->bm_resync_fo = 0;
1521 	} else /* side == C_SYNC_SOURCE */ {
1522 		u64 uuid;
1523 
1524 		get_random_bytes(&uuid, sizeof(u64));
1525 		drbd_uuid_set(mdev, UI_BITMAP, uuid);
1526 		drbd_send_sync_uuid(mdev, uuid);
1527 
1528 		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1529 	}
1530 
1531 	write_lock_irq(&global_state_lock);
1532 	ns = mdev->state;
1533 
1534 	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1535 
1536 	ns.conn = side;
1537 
1538 	if (side == C_SYNC_TARGET)
1539 		ns.disk = D_INCONSISTENT;
1540 	else /* side == C_SYNC_SOURCE */
1541 		ns.pdsk = D_INCONSISTENT;
1542 
1543 	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1544 	ns = mdev->state;
1545 
1546 	if (ns.conn < C_CONNECTED)
1547 		r = SS_UNKNOWN_ERROR;
1548 
1549 	if (r == SS_SUCCESS) {
1550 		unsigned long tw = drbd_bm_total_weight(mdev);
1551 		unsigned long now = jiffies;
1552 		int i;
1553 
1554 		mdev->rs_failed    = 0;
1555 		mdev->rs_paused    = 0;
1556 		mdev->rs_same_csum = 0;
1557 		mdev->rs_last_events = 0;
1558 		mdev->rs_last_sect_ev = 0;
1559 		mdev->rs_total     = tw;
1560 		mdev->rs_start     = now;
1561 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1562 			mdev->rs_mark_left[i] = tw;
1563 			mdev->rs_mark_time[i] = now;
1564 		}
1565 		_drbd_pause_after(mdev);
1566 	}
1567 	write_unlock_irq(&global_state_lock);
1568 	put_ldev(mdev);
1569 
1570 	if (r == SS_SUCCESS) {
1571 		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1572 		     drbd_conn_str(ns.conn),
1573 		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1574 		     (unsigned long) mdev->rs_total);
1575 
1576 		if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1577 			/* This still has a race (about when exactly the peers
1578 			 * detect connection loss) that can lead to a full sync
1579 			 * on next handshake. In 8.3.9 we fixed this with explicit
1580 			 * resync-finished notifications, but the fix
1581 			 * introduces a protocol change.  Sleeping for some
1582 			 * time longer than the ping interval + timeout on the
1583 			 * SyncSource, to give the SyncTarget the chance to
1584 			 * detect connection loss, then waiting for a ping
1585 			 * response (implicit in drbd_resync_finished) reduces
1586 			 * the race considerably, but does not solve it. */
1587 			if (side == C_SYNC_SOURCE)
1588 				schedule_timeout_interruptible(
1589 					mdev->net_conf->ping_int * HZ +
1590 					mdev->net_conf->ping_timeo*HZ/9);
1591 			drbd_resync_finished(mdev);
1592 		}
1593 
1594 		drbd_rs_controller_reset(mdev);
1595 		/* ns.conn may already be != mdev->state.conn,
1596 		 * we may have been paused in between, or become paused until
1597 		 * the timer triggers.
1598 		 * No matter, that is handled in resync_timer_fn() */
1599 		if (ns.conn == C_SYNC_TARGET)
1600 			mod_timer(&mdev->resync_timer, jiffies);
1601 
1602 		drbd_md_sync(mdev);
1603 	}
1604 	drbd_state_unlock(mdev);
1605 }
1606 
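/* Main loop of the per-device worker thread: wait for work queued on
 * mdev->data.work, uncorking the data socket while idle and corking it
 * again once work is pending; run each callback, forcing a network
 * failure if a callback fails while connected.  On exit, cancel all
 * remaining work, wait for the receiver thread and clean up the device. */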
1607 int drbd_worker(struct drbd_thread *thi)
1608 {
1609 	struct drbd_conf *mdev = thi->mdev;
1610 	struct drbd_work *w = NULL;
1611 	LIST_HEAD(work_list);
1612 	int intr = 0, i;
1613 
1614 	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1615 
1616 	while (get_t_state(thi) == Running) {
1617 		drbd_thread_current_set_cpu(mdev);
1618 
1619 		if (down_trylock(&mdev->data.work.s)) {
1620 			mutex_lock(&mdev->data.mutex);
1621 			if (mdev->data.socket && !mdev->net_conf->no_cork)
1622 				drbd_tcp_uncork(mdev->data.socket);
1623 			mutex_unlock(&mdev->data.mutex);
1624 
1625 			intr = down_interruptible(&mdev->data.work.s);
1626 
1627 			mutex_lock(&mdev->data.mutex);
1628 			if (mdev->data.socket  && !mdev->net_conf->no_cork)
1629 				drbd_tcp_cork(mdev->data.socket);
1630 			mutex_unlock(&mdev->data.mutex);
1631 		}
1632 
1633 		if (intr) {
1634 			D_ASSERT(intr == -EINTR);
1635 			flush_signals(current);
1636 			ERR_IF (get_t_state(thi) == Running)
1637 				continue;
1638 			break;
1639 		}
1640 
1641 		if (get_t_state(thi) != Running)
1642 			break;
1643 		/* With this break, we have done a down() but not consumed
1644 		   the entry from the list. The cleanup code takes care of
1645 		   this...   */
1646 
1647 		w = NULL;
1648 		spin_lock_irq(&mdev->data.work.q_lock);
1649 		ERR_IF(list_empty(&mdev->data.work.q)) {
1650 			/* something terribly wrong in our logic.
1651 			 * we were able to down() the semaphore,
1652 			 * but the list is empty... doh.
1653 			 *
1654 			 * what is the best thing to do now?
1655 			 * try again from scratch, restarting the receiver,
1656 			 * asender, whatnot? could break even more ugly,
1657 			 * e.g. when we are primary, but no good local data.
1658 			 *
1659 			 * I'll try to get away just starting over this loop.
1660 			 */
1661 			spin_unlock_irq(&mdev->data.work.q_lock);
1662 			continue;
1663 		}
1664 		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1665 		list_del_init(&w->list);
1666 		spin_unlock_irq(&mdev->data.work.q_lock);
1667 
1668 		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1669 			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1670 			if (mdev->state.conn >= C_CONNECTED)
1671 				drbd_force_state(mdev,
1672 						NS(conn, C_NETWORK_FAILURE));
1673 		}
1674 	}
1675 	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1676 	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1677 
1678 	spin_lock_irq(&mdev->data.work.q_lock);
1679 	i = 0;
1680 	while (!list_empty(&mdev->data.work.q)) {
1681 		list_splice_init(&mdev->data.work.q, &work_list);
1682 		spin_unlock_irq(&mdev->data.work.q_lock);
1683 
1684 		while (!list_empty(&work_list)) {
1685 			w = list_entry(work_list.next, struct drbd_work, list);
1686 			list_del_init(&w->list);
1687 			w->cb(mdev, w, 1);
1688 			i++; /* dead debugging code */
1689 		}
1690 
1691 		spin_lock_irq(&mdev->data.work.q_lock);
1692 	}
1693 	sema_init(&mdev->data.work.s, 0);
1694 	/* DANGEROUS race: if someone did queue his work within the spinlock,
1695 	 * but up() ed outside the spinlock, we could get an up() on the
1696 	 * semaphore without corresponding list entry.
1697 	 * So don't do that.
1698 	 */
1699 	spin_unlock_irq(&mdev->data.work.q_lock);
1700 
1701 	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1702 	/* _drbd_set_state only uses stop_nowait.
1703 	 * wait here for the Exiting receiver. */
1704 	drbd_thread_stop(&mdev->receiver);
1705 	drbd_mdev_cleanup(mdev);
1706 
1707 	dev_info(DEV, "worker terminated\n");
1708 
1709 	clear_bit(DEVICE_DYING, &mdev->flags);
1710 	clear_bit(CONFIG_PENDING, &mdev->flags);
1711 	wake_up(&mdev->state_wait);
1712 
1713 	return 0;
1714 }
1715