xref: /openbmc/linux/drivers/block/drbd/drbd_worker.c (revision 2451fc3b)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24  */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/smp_lock.h>
30 #include <linux/wait.h>
31 #include <linux/mm.h>
32 #include <linux/memcontrol.h>
33 #include <linux/mm_inline.h>
34 #include <linux/slab.h>
35 #include <linux/random.h>
36 #include <linux/string.h>
37 #include <linux/scatterlist.h>
38 
39 #include "drbd_int.h"
40 #include "drbd_req.h"
41 
42 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
43 
44 
45 
46 /* defined here:
47    drbd_md_io_complete
48    drbd_endio_sec
49    drbd_endio_pri
50 
51  * more endio handlers:
52    atodb_endio in drbd_actlog.c
53    drbd_bm_async_io_complete in drbd_bitmap.c
54 
55  * For all these callbacks, note the following:
56  * The callbacks will be called in irq context by the IDE drivers,
57  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
58  * Try to get the locking right :)
59  *
60  */
61 
62 
63 /* About the global_state_lock
64    Each state transition on a device holds a read lock. In case we have
65    to evaluate the sync-after dependencies, we grab a write lock, because
66    we need stable states on all devices for that.  */
67 rwlock_t global_state_lock;
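/*
 * Illustrative sketch only (assumed usage, following the comment above;
 * the actual state machine lives in drbd_main.c): a single-device state
 * transition would take the reader side,
 *
 *	read_lock(&global_state_lock);
 *	... validate and commit the new state of this device ...
 *	read_unlock(&global_state_lock);
 *
 * while the sync-after handling in this file (resume_next_sg(),
 * suspend_other_sg(), drbd_alter_sa(), drbd_start_resync()) takes the
 * writer side via write_lock_irq(&global_state_lock), so it sees a
 * stable state on all devices at once.
 */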
68 
69 /* used for synchronous meta data and bitmap IO
70  * submitted by drbd_md_sync_page_io()
71  */
72 void drbd_md_io_complete(struct bio *bio, int error)
73 {
74 	struct drbd_md_io *md_io;
75 
76 	md_io = (struct drbd_md_io *)bio->bi_private;
77 	md_io->error = error;
78 
79 	complete(&md_io->event);
80 }
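/* The submitter of the synchronous meta data IO (drbd_md_sync_page_io()
 * in drbd_actlog.c) is expected to block on md_io->event, e.g. via
 * wait_for_completion(), and to inspect md_io->error afterwards; this
 * handler only records the result and signals the completion. */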
81 
82 /* reads on behalf of the partner,
83  * "submitted" by the receiver
84  */
85 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
86 {
87 	unsigned long flags = 0;
88 	struct drbd_conf *mdev = e->mdev;
89 
90 	D_ASSERT(e->block_id != ID_VACANT);
91 
92 	spin_lock_irqsave(&mdev->req_lock, flags);
93 	mdev->read_cnt += e->size >> 9;
94 	list_del(&e->w.list);
95 	if (list_empty(&mdev->read_ee))
96 		wake_up(&mdev->ee_wait);
97 	if (test_bit(__EE_WAS_ERROR, &e->flags))
98 		__drbd_chk_io_error(mdev, FALSE);
99 	spin_unlock_irqrestore(&mdev->req_lock, flags);
100 
101 	drbd_queue_work(&mdev->data.work, &e->w);
102 	put_ldev(mdev);
103 }
104 
105 /* writes on behalf of the partner, or resync writes,
106  * "submitted" by the receiver, final stage.  */
107 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
108 {
109 	unsigned long flags = 0;
110 	struct drbd_conf *mdev = e->mdev;
111 	sector_t e_sector;
112 	int do_wake;
113 	int is_syncer_req;
114 	int do_al_complete_io;
115 
116 	D_ASSERT(e->block_id != ID_VACANT);
117 
118 	/* after we moved e to done_ee,
119 	 * we may no longer access it,
120 	 * it may be freed/reused already!
121 	 * (as soon as we release the req_lock) */
122 	e_sector = e->sector;
123 	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
124 	is_syncer_req = is_syncer_block_id(e->block_id);
125 
126 	spin_lock_irqsave(&mdev->req_lock, flags);
127 	mdev->writ_cnt += e->size >> 9;
128 	list_del(&e->w.list); /* has been on active_ee or sync_ee */
129 	list_add_tail(&e->w.list, &mdev->done_ee);
130 
131 	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
132 	 * neither did we wake possibly waiting conflicting requests.
133 	 * done from "drbd_process_done_ee" within the appropriate w.cb
134 	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
135 
136 	do_wake = is_syncer_req
137 		? list_empty(&mdev->sync_ee)
138 		: list_empty(&mdev->active_ee);
139 
140 	if (test_bit(__EE_WAS_ERROR, &e->flags))
141 		__drbd_chk_io_error(mdev, FALSE);
142 	spin_unlock_irqrestore(&mdev->req_lock, flags);
143 
144 	if (is_syncer_req)
145 		drbd_rs_complete_io(mdev, e_sector);
146 
147 	if (do_wake)
148 		wake_up(&mdev->ee_wait);
149 
150 	if (do_al_complete_io)
151 		drbd_al_complete_io(mdev, e_sector);
152 
153 	wake_asender(mdev);
154 	put_ldev(mdev);
155 }
156 
157 /* writes on behalf of the partner, or resync writes,
158  * "submitted" by the receiver.
159  */
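/* An epoch entry may have been split into several bios by
 * drbd_submit_ee() ("multi-bio EEs", see w_make_resync_request() below).
 * Every bio completion ends up in drbd_endio_sec(); only the completion
 * that drops e->pending_bios to zero hands the entry on to
 * drbd_endio_read_sec_final() or drbd_endio_write_sec_final(). */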
160 void drbd_endio_sec(struct bio *bio, int error)
161 {
162 	struct drbd_epoch_entry *e = bio->bi_private;
163 	struct drbd_conf *mdev = e->mdev;
164 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
165 	int is_write = bio_data_dir(bio) == WRITE;
166 
167 	if (error)
168 		dev_warn(DEV, "%s: error=%d s=%llus\n",
169 				is_write ? "write" : "read", error,
170 				(unsigned long long)e->sector);
171 	if (!error && !uptodate) {
172 		dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
173 				is_write ? "write" : "read",
174 				(unsigned long long)e->sector);
175 		/* strange behavior of some lower level drivers...
176 		 * fail the request by clearing the uptodate flag,
177 		 * but do not return any error?! */
178 		error = -EIO;
179 	}
180 
181 	if (error)
182 		set_bit(__EE_WAS_ERROR, &e->flags);
183 
184 	bio_put(bio); /* no need for the bio anymore */
185 	if (atomic_dec_and_test(&e->pending_bios)) {
186 		if (is_write)
187 			drbd_endio_write_sec_final(e);
188 		else
189 			drbd_endio_read_sec_final(e);
190 	}
191 }
192 
193 /* read, read-ahead (readA) or write requests on R_PRIMARY coming from drbd_make_request
194  */
195 void drbd_endio_pri(struct bio *bio, int error)
196 {
197 	struct drbd_request *req = bio->bi_private;
198 	struct drbd_conf *mdev = req->mdev;
199 	enum drbd_req_event what;
200 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
201 
202 	if (!error && !uptodate) {
203 		dev_warn(DEV, "p %s: setting error to -EIO\n",
204 			 bio_data_dir(bio) == WRITE ? "write" : "read");
205 		/* strange behavior of some lower level drivers...
206 		 * fail the request by clearing the uptodate flag,
207 		 * but do not return any error?! */
208 		error = -EIO;
209 	}
210 
211 	/* to avoid recursion in __req_mod */
212 	if (unlikely(error)) {
213 		what = (bio_data_dir(bio) == WRITE)
214 			? write_completed_with_error
215 			: (bio_rw(bio) == READ)
216 			  ? read_completed_with_error
217 			  : read_ahead_completed_with_error;
218 	} else
219 		what = completed_ok;
220 
221 	bio_put(req->private_bio);
222 	req->private_bio = ERR_PTR(error);
223 
224 	req_mod(req, what);
225 }
226 
227 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
228 {
229 	struct drbd_request *req = container_of(w, struct drbd_request, w);
230 
231 	/* We should not detach on a read I/O error,
232 	 * but try to WRITE the P_DATA_REPLY to the failed location,
233 	 * to give the disk a chance to relocate that block */
234 
235 	spin_lock_irq(&mdev->req_lock);
236 	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
237 		_req_mod(req, read_retry_remote_canceled);
238 		spin_unlock_irq(&mdev->req_lock);
239 		return 1;
240 	}
241 	spin_unlock_irq(&mdev->req_lock);
242 
243 	return w_send_read_req(mdev, w, 0);
244 }
245 
246 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
247 {
248 	ERR_IF(cancel) return 1;
249 	dev_err(DEV, "resync inactive, but callback triggered??\n");
250 	return 1; /* Simply ignore this! */
251 }
252 
253 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
254 {
255 	struct hash_desc desc;
256 	struct scatterlist sg;
257 	struct page *page = e->pages;
258 	struct page *tmp;
259 	unsigned len;
260 
261 	desc.tfm = tfm;
262 	desc.flags = 0;
263 
264 	sg_init_table(&sg, 1);
265 	crypto_hash_init(&desc);
266 
267 	while ((tmp = page_chain_next(page))) {
268 		/* all but the last page will be fully used */
269 		sg_set_page(&sg, page, PAGE_SIZE, 0);
270 		crypto_hash_update(&desc, &sg, sg.length);
271 		page = tmp;
272 	}
273 	/* and now the last, possibly only partially used page */
274 	len = e->size & (PAGE_SIZE - 1);
275 	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
276 	crypto_hash_update(&desc, &sg, sg.length);
277 	crypto_hash_final(&desc, digest);
278 }
279 
280 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
281 {
282 	struct hash_desc desc;
283 	struct scatterlist sg;
284 	struct bio_vec *bvec;
285 	int i;
286 
287 	desc.tfm = tfm;
288 	desc.flags = 0;
289 
290 	sg_init_table(&sg, 1);
291 	crypto_hash_init(&desc);
292 
293 	__bio_for_each_segment(bvec, bio, i, 0) {
294 		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
295 		crypto_hash_update(&desc, &sg, sg.length);
296 	}
297 	crypto_hash_final(&desc, digest);
298 }
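/*
 * Typical calling convention for both checksum helpers (a sketch that
 * mirrors w_e_send_csum() below; the caller owns the digest buffer):
 *
 *	digest_size = crypto_hash_digestsize(tfm);
 *	digest = kmalloc(digest_size, GFP_NOIO);
 *	if (digest) {
 *		drbd_csum_ee(mdev, tfm, e, digest);
 *		... send or compare the digest ...
 *		kfree(digest);
 *	}
 */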
299 
300 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
301 {
302 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
303 	int digest_size;
304 	void *digest;
305 	int ok;
306 
307 	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
308 
309 	if (unlikely(cancel)) {
310 		drbd_free_ee(mdev, e);
311 		return 1;
312 	}
313 
314 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
315 		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
316 		digest = kmalloc(digest_size, GFP_NOIO);
317 		if (digest) {
318 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
319 
320 			inc_rs_pending(mdev);
321 			ok = drbd_send_drequest_csum(mdev,
322 						     e->sector,
323 						     e->size,
324 						     digest,
325 						     digest_size,
326 						     P_CSUM_RS_REQUEST);
327 			kfree(digest);
328 		} else {
329 			dev_err(DEV, "kmalloc() of digest failed.\n");
330 			ok = 0;
331 		}
332 	} else
333 		ok = 1;
334 
335 	drbd_free_ee(mdev, e);
336 
337 	if (unlikely(!ok))
338 		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
339 	return ok;
340 }
341 
342 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
343 
344 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
345 {
346 	struct drbd_epoch_entry *e;
347 
348 	if (!get_ldev(mdev))
349 		return -EIO;
350 
351 	if (drbd_rs_should_slow_down(mdev))
352 		goto defer;
353 
354 	/* GFP_TRY, because if there is no memory available right now, this may
355 	 * be rescheduled for later. It is "only" background resync, after all. */
356 	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
357 	if (!e)
358 		goto defer;
359 
360 	e->w.cb = w_e_send_csum;
361 	spin_lock_irq(&mdev->req_lock);
362 	list_add(&e->w.list, &mdev->read_ee);
363 	spin_unlock_irq(&mdev->req_lock);
364 
365 	atomic_add(size >> 9, &mdev->rs_sect_ev);
366 	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
367 		return 0;
368 
369 	/* drbd_submit_ee currently fails for one reason only:
370 	 * not being able to allocate enough bios.
371 	 * Is dropping the connection going to help? */
372 	spin_lock_irq(&mdev->req_lock);
373 	list_del(&e->w.list);
374 	spin_unlock_irq(&mdev->req_lock);
375 
376 	drbd_free_ee(mdev, e);
377 defer:
378 	put_ldev(mdev);
379 	return -EAGAIN;
380 }
381 
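/* Resync/online-verify pacing timer.  Depending on the connection state
 * it points mdev->resync_work.cb at w_make_resync_request (C_SYNC_TARGET)
 * or w_make_ov_request (C_VERIFY_S) and queues that work; otherwise the
 * callback is parked on w_resync_inactive.  The request makers re-arm
 * the timer every SLEEP_TIME via mod_timer(). */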
382 void resync_timer_fn(unsigned long data)
383 {
384 	struct drbd_conf *mdev = (struct drbd_conf *) data;
385 	int queue;
386 
387 	queue = 1;
388 	switch (mdev->state.conn) {
389 	case C_VERIFY_S:
390 		mdev->resync_work.cb = w_make_ov_request;
391 		break;
392 	case C_SYNC_TARGET:
393 		mdev->resync_work.cb = w_make_resync_request;
394 		break;
395 	default:
396 		queue = 0;
397 		mdev->resync_work.cb = w_resync_inactive;
398 	}
399 
400 	/* harmless race: list_empty outside data.work.q_lock */
401 	if (list_empty(&mdev->resync_work.list) && queue)
402 		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
403 }
404 
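/* Small ring buffer helpers for mdev->rs_plan_s, the planning FIFO of
 * the resync-rate controller below: fifo_set() initializes every slot,
 * fifo_push() returns the value stored fb->size pushes ago and replaces
 * that slot with the new value, and fifo_add_val() adds a value to all
 * slots. */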
405 static void fifo_set(struct fifo_buffer *fb, int value)
406 {
407 	int i;
408 
409 	for (i = 0; i < fb->size; i++)
410 		fb->values[i] = value;
411 }
412 
413 static int fifo_push(struct fifo_buffer *fb, int value)
414 {
415 	int ov;
416 
417 	ov = fb->values[fb->head_index];
418 	fb->values[fb->head_index++] = value;
419 
420 	if (fb->head_index >= fb->size)
421 		fb->head_index = 0;
422 
423 	return ov;
424 }
425 
426 static void fifo_add_val(struct fifo_buffer *fb, int value)
427 {
428 	int i;
429 
430 	for (i = 0; i < fb->size; i++)
431 		fb->values[i] += value;
432 }
433 
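/* Dynamic resync-rate controller, used when a plan-ahead window is
 * configured (rs_plan_s.size != 0).  Returns the number of sectors to
 * request during the next SLEEP_TIME.  "want" is the desired amount of
 * resync data in flight: either the configured c_fill_target, or derived
 * from c_delay_target and the rate at which replies came in (rs_sect_in).
 * The difference to what is already in flight plus already planned is
 * spread evenly over the next "steps" invocations via the FIFO above. */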
434 int drbd_rs_controller(struct drbd_conf *mdev)
435 {
436 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
437 	unsigned int want;     /* The number of sectors we want in the proxy */
438 	int req_sect; /* Number of sectors to request in this turn */
439 	int correction; /* Number of sectors more we need in the proxy*/
440 	int cps; /* correction per invocation of drbd_rs_controller() */
441 	int steps; /* Number of time steps to plan ahead */
442 	int curr_corr;
443 	int max_sect;
444 
445 	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
446 	mdev->rs_in_flight -= sect_in;
447 
448 	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
449 
450 	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
451 
452 	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
453 		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
454 	} else { /* normal path */
455 		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
456 			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
457 	}
458 
459 	correction = want - mdev->rs_in_flight - mdev->rs_planed;
460 
461 	/* Plan ahead */
462 	cps = correction / steps;
463 	fifo_add_val(&mdev->rs_plan_s, cps);
464 	mdev->rs_planed += cps * steps;
465 
466 	/* What we do in this step */
467 	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
468 	spin_unlock(&mdev->peer_seq_lock);
469 	mdev->rs_planed -= curr_corr;
470 
471 	req_sect = sect_in + curr_corr;
472 	if (req_sect < 0)
473 		req_sect = 0;
474 
475 	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
476 	if (req_sect > max_sect)
477 		req_sect = max_sect;
478 
479 	/*
480 	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
481 		 sect_in, mdev->rs_in_flight, want, correction,
482 		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
483 	*/
484 
485 	return req_sect;
486 }
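/*
 * Unit-conversion sketch for the caller below, assuming the usual DRBD
 * bitmap granularity of BM_BLOCK_SIZE == 4096 (BM_BLOCK_SHIFT == 12):
 *
 *	number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
 *
 * converts 512-byte sectors into 4 KiB bitmap blocks (divide by 8), and
 *
 *	mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
 *
 * converts "blocks per SLEEP_TIME" back into KiB/s for reporting,
 * SLEEP_TIME being given in jiffies.
 */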
487 
488 int w_make_resync_request(struct drbd_conf *mdev,
489 		struct drbd_work *w, int cancel)
490 {
491 	unsigned long bit;
492 	sector_t sector;
493 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
494 	int max_segment_size;
495 	int number, rollback_i, size, pe, mx;
496 	int align, queued, sndbuf;
497 	int i = 0;
498 
499 	if (unlikely(cancel))
500 		return 1;
501 
502 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
503 		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
504 		return 0;
505 	}
506 
507 	if (mdev->state.conn != C_SYNC_TARGET)
508 		dev_err(DEV, "%s in w_make_resync_request\n",
509 			drbd_conn_str(mdev->state.conn));
510 
511 	if (mdev->rs_total == 0) {
512 		/* empty resync? */
513 		drbd_resync_finished(mdev);
514 		return 1;
515 	}
516 
517 	if (!get_ldev(mdev)) {
518 		/* Since we only need to access mdev->rsync, a
519 		   get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
520 		   continuing the resync with a broken disk makes no sense at
521 		   all */
522 		dev_err(DEV, "Disk broke down during resync!\n");
523 		mdev->resync_work.cb = w_resync_inactive;
524 		return 1;
525 	}
526 
527 	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
528 	 * if it should be necessary */
529 	max_segment_size =
530 		mdev->agreed_pro_version < 94 ? queue_max_segment_size(mdev->rq_queue) :
531 		mdev->agreed_pro_version < 95 ?	DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_SEGMENT_SIZE;
532 
533 	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
534 		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
535 		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
536 	} else {
537 		mdev->c_sync_rate = mdev->sync_conf.rate;
538 		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
539 	}
540 
541 	/* Throttle resync on lower level disk activity, which may also be
542 	 * caused by application IO on Primary/SyncTarget.
543 	 * Keep this after the call to drbd_rs_controller, as that assumes
544 	 * it is called as precisely as possible every SLEEP_TIME,
545 	 * and would be confused otherwise. */
546 	if (drbd_rs_should_slow_down(mdev))
547 		goto requeue;
548 
549 	mutex_lock(&mdev->data.mutex);
550 	if (mdev->data.socket)
551 		mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
552 	else
553 		mx = 1;
554 	mutex_unlock(&mdev->data.mutex);
555 
556 	/* For resync rates >160MB/sec, allow more pending RS requests */
557 	if (number > mx)
558 		mx = number;
559 
560 	/* Limit the number of pending RS requests to no more than the peer's receive buffer */
561 	pe = atomic_read(&mdev->rs_pending_cnt);
562 	if ((pe + number) > mx) {
563 		number = mx - pe;
564 	}
565 
566 	for (i = 0; i < number; i++) {
567 		/* Stop generating RS requests when half of the send buffer is filled */
568 		mutex_lock(&mdev->data.mutex);
569 		if (mdev->data.socket) {
570 			queued = mdev->data.socket->sk->sk_wmem_queued;
571 			sndbuf = mdev->data.socket->sk->sk_sndbuf;
572 		} else {
573 			queued = 1;
574 			sndbuf = 0;
575 		}
576 		mutex_unlock(&mdev->data.mutex);
577 		if (queued > sndbuf / 2)
578 			goto requeue;
579 
580 next_sector:
581 		size = BM_BLOCK_SIZE;
582 		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
583 
584 		if (bit == -1UL) {
585 			mdev->bm_resync_fo = drbd_bm_bits(mdev);
586 			mdev->resync_work.cb = w_resync_inactive;
587 			put_ldev(mdev);
588 			return 1;
589 		}
590 
591 		sector = BM_BIT_TO_SECT(bit);
592 
593 		if (drbd_try_rs_begin_io(mdev, sector)) {
594 			mdev->bm_resync_fo = bit;
595 			goto requeue;
596 		}
597 		mdev->bm_resync_fo = bit + 1;
598 
599 		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
600 			drbd_rs_complete_io(mdev, sector);
601 			goto next_sector;
602 		}
603 
604 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
605 		/* try to find some adjacent bits.
606 		 * we stop if we already have the maximum request size.
607 		 *
608 		 * Additionally always align bigger requests, in order to
609 		 * be prepared for all stripe sizes of software RAIDs.
610 		 */
611 		align = 1;
612 		rollback_i = i;
613 		for (;;) {
614 			if (size + BM_BLOCK_SIZE > max_segment_size)
615 				break;
616 
617 			/* Always be aligned */
618 			if (sector & ((1<<(align+3))-1))
619 				break;
620 
621 			/* do not cross extent boundaries */
622 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
623 				break;
624 			/* now, is it actually dirty, after all?
625 			 * caution, drbd_bm_test_bit is tri-state for some
626 			 * obscure reason; a check for (b == 0) would handle the
627 			 * out-of-band value correctly only by accident, because
628 			 * of the "oddly sized" adjustment below */
629 			if (drbd_bm_test_bit(mdev, bit+1) != 1)
630 				break;
631 			bit++;
632 			size += BM_BLOCK_SIZE;
633 			if ((BM_BLOCK_SIZE << align) <= size)
634 				align++;
635 			i++;
636 		}
637 		/* if we merged some,
638 		 * reset the offset to start the next drbd_bm_find_next from */
639 		if (size > BM_BLOCK_SIZE)
640 			mdev->bm_resync_fo = bit + 1;
641 #endif
642 
643 		/* adjust very last sectors, in case we are oddly sized */
644 		if (sector + (size>>9) > capacity)
645 			size = (capacity-sector)<<9;
646 		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
647 			switch (read_for_csum(mdev, sector, size)) {
648 			case -EIO: /* Disk failure */
649 				put_ldev(mdev);
650 				return 0;
651 			case -EAGAIN: /* allocation failed, or ldev busy */
652 				drbd_rs_complete_io(mdev, sector);
653 				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
654 				i = rollback_i;
655 				goto requeue;
656 			case 0:
657 				/* everything ok */
658 				break;
659 			default:
660 				BUG();
661 			}
662 		} else {
663 			inc_rs_pending(mdev);
664 			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
665 					       sector, size, ID_SYNCER)) {
666 				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
667 				dec_rs_pending(mdev);
668 				put_ldev(mdev);
669 				return 0;
670 			}
671 		}
672 	}
673 
674 	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
675 		/* last syncer _request_ was sent,
676 		 * but the P_RS_DATA_REPLY has not yet been received.  sync will end (and
677 		 * next sync group will resume), as soon as we receive the last
678 		 * resync data block, and the last bit is cleared.
679 		 * until then resync "work" is "inactive" ...
680 		 */
681 		mdev->resync_work.cb = w_resync_inactive;
682 		put_ldev(mdev);
683 		return 1;
684 	}
685 
686  requeue:
687 	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
688 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
689 	put_ldev(mdev);
690 	return 1;
691 }
692 
693 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
694 {
695 	int number, i, size;
696 	sector_t sector;
697 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
698 
699 	if (unlikely(cancel))
700 		return 1;
701 
702 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
703 		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
704 		return 0;
705 	}
706 
707 	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
708 	if (atomic_read(&mdev->rs_pending_cnt) > number)
709 		goto requeue;
710 
711 	number -= atomic_read(&mdev->rs_pending_cnt);
712 
713 	sector = mdev->ov_position;
714 	for (i = 0; i < number; i++) {
715 		if (sector >= capacity) {
716 			mdev->resync_work.cb = w_resync_inactive;
717 			return 1;
718 		}
719 
720 		size = BM_BLOCK_SIZE;
721 
722 		if (drbd_try_rs_begin_io(mdev, sector)) {
723 			mdev->ov_position = sector;
724 			goto requeue;
725 		}
726 
727 		if (sector + (size>>9) > capacity)
728 			size = (capacity-sector)<<9;
729 
730 		inc_rs_pending(mdev);
731 		if (!drbd_send_ov_request(mdev, sector, size)) {
732 			dec_rs_pending(mdev);
733 			return 0;
734 		}
735 		sector += BM_SECT_PER_BIT;
736 	}
737 	mdev->ov_position = sector;
738 
739  requeue:
740 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
741 	return 1;
742 }
743 
744 
745 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
746 {
747 	kfree(w);
748 	ov_oos_print(mdev);
749 	drbd_resync_finished(mdev);
750 
751 	return 1;
752 }
753 
754 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
755 {
756 	kfree(w);
757 
758 	drbd_resync_finished(mdev);
759 
760 	return 1;
761 }
762 
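/* Send a ping to the peer and wait until either GOT_PING_ACK is set
 * (i.e. the peer's P_PING_ACK was received) or the connection is lost.
 * Used by drbd_resync_finished() below before it commits the final
 * resync state. */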
763 static void ping_peer(struct drbd_conf *mdev)
764 {
765 	clear_bit(GOT_PING_ACK, &mdev->flags);
766 	request_ping(mdev);
767 	wait_event(mdev->misc_wait,
768 		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
769 }
770 
771 int drbd_resync_finished(struct drbd_conf *mdev)
772 {
773 	unsigned long db, dt, dbdt;
774 	unsigned long n_oos;
775 	union drbd_state os, ns;
776 	struct drbd_work *w;
777 	char *khelper_cmd = NULL;
778 
779 	/* Remove all elements from the resync LRU. If future actions were
780 	 * to set bits in the (main) bitmap, the entries in the resync LRU
781 	 * would be wrong. */
782 	if (drbd_rs_del_all(mdev)) {
783 		/* In case this is not possible now, most probably because
784 		 * there are P_RS_DATA_REPLY packets lingering on the worker's
785 		 * queue (or the read operations for those packets have not
786 		 * finished yet), retry in 100ms. */
787 
788 		drbd_kick_lo(mdev);
789 		__set_current_state(TASK_INTERRUPTIBLE);
790 		schedule_timeout(HZ / 10);
791 		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
792 		if (w) {
793 			w->cb = w_resync_finished;
794 			drbd_queue_work(&mdev->data.work, w);
795 			return 1;
796 		}
797 		dev_err(DEV, "Warning: failed to drbd_rs_del_all() and to kmalloc(w).\n");
798 	}
799 
800 	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
801 	if (dt <= 0)
802 		dt = 1;
803 	db = mdev->rs_total;
804 	dbdt = Bit2KB(db/dt);
805 	mdev->rs_paused /= HZ;
806 
807 	if (!get_ldev(mdev))
808 		goto out;
809 
810 	ping_peer(mdev);
811 
812 	spin_lock_irq(&mdev->req_lock);
813 	os = mdev->state;
814 
815 	/* This protects us against multiple calls (that can happen in the presence
816 	   of application IO), and against connectivity loss just before we arrive here. */
817 	if (os.conn <= C_CONNECTED)
818 		goto out_unlock;
819 
820 	ns = os;
821 	ns.conn = C_CONNECTED;
822 
823 	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
824 	     (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
825 	     "Online verify " : "Resync",
826 	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
827 
828 	n_oos = drbd_bm_total_weight(mdev);
829 
830 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
831 		if (n_oos) {
832 			dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
833 			      n_oos, Bit2KB(1));
834 			khelper_cmd = "out-of-sync";
835 		}
836 	} else {
837 		D_ASSERT((n_oos - mdev->rs_failed) == 0);
838 
839 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
840 			khelper_cmd = "after-resync-target";
841 
842 		if (mdev->csums_tfm && mdev->rs_total) {
843 			const unsigned long s = mdev->rs_same_csum;
844 			const unsigned long t = mdev->rs_total;
845 			const int ratio =
846 				(t == 0)     ? 0 :
847 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
848 			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
849 			     "transferred %luK total %luK\n",
850 			     ratio,
851 			     Bit2KB(mdev->rs_same_csum),
852 			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
853 			     Bit2KB(mdev->rs_total));
854 		}
855 	}
856 
857 	if (mdev->rs_failed) {
858 		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
859 
860 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
861 			ns.disk = D_INCONSISTENT;
862 			ns.pdsk = D_UP_TO_DATE;
863 		} else {
864 			ns.disk = D_UP_TO_DATE;
865 			ns.pdsk = D_INCONSISTENT;
866 		}
867 	} else {
868 		ns.disk = D_UP_TO_DATE;
869 		ns.pdsk = D_UP_TO_DATE;
870 
871 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
872 			if (mdev->p_uuid) {
873 				int i;
874 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
875 					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
876 				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
877 				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
878 			} else {
879 				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
880 			}
881 		}
882 
883 		drbd_uuid_set_bm(mdev, 0UL);
884 
885 		if (mdev->p_uuid) {
886 			/* Now the two UUID sets are equal, update what we
887 			 * know of the peer. */
888 			int i;
889 			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
890 				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
891 		}
892 	}
893 
894 	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
895 out_unlock:
896 	spin_unlock_irq(&mdev->req_lock);
897 	put_ldev(mdev);
898 out:
899 	mdev->rs_total  = 0;
900 	mdev->rs_failed = 0;
901 	mdev->rs_paused = 0;
902 	mdev->ov_start_sector = 0;
903 
904 	drbd_md_sync(mdev);
905 
906 	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
907 		dev_info(DEV, "Writing the whole bitmap\n");
908 		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
909 	}
910 
911 	if (khelper_cmd)
912 		drbd_khelper(mdev, khelper_cmd);
913 
914 	return 1;
915 }
916 
917 /* helper */
918 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
919 {
920 	if (drbd_ee_has_active_page(e)) {
921 		/* This might happen if sendpage() has not finished */
922 		int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
923 		atomic_add(i, &mdev->pp_in_use_by_net);
924 		atomic_sub(i, &mdev->pp_in_use);
925 		spin_lock_irq(&mdev->req_lock);
926 		list_add_tail(&e->w.list, &mdev->net_ee);
927 		spin_unlock_irq(&mdev->req_lock);
928 		wake_up(&drbd_pp_wait);
929 	} else
930 		drbd_free_ee(mdev, e);
931 }
932 
933 /**
934  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
935  * @mdev:	DRBD device.
936  * @w:		work object.
937  * @cancel:	The connection will be closed anyway
938  */
939 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
940 {
941 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
942 	int ok;
943 
944 	if (unlikely(cancel)) {
945 		drbd_free_ee(mdev, e);
946 		dec_unacked(mdev);
947 		return 1;
948 	}
949 
950 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
951 		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
952 	} else {
953 		if (__ratelimit(&drbd_ratelimit_state))
954 			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
955 			    (unsigned long long)e->sector);
956 
957 		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
958 	}
959 
960 	dec_unacked(mdev);
961 
962 	move_to_net_ee_or_free(mdev, e);
963 
964 	if (unlikely(!ok))
965 		dev_err(DEV, "drbd_send_block() failed\n");
966 	return ok;
967 }
968 
969 /**
970  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
971  * @mdev:	DRBD device.
972  * @w:		work object.
973  * @cancel:	The connection will be closed anyway
974  */
975 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
976 {
977 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
978 	int ok;
979 
980 	if (unlikely(cancel)) {
981 		drbd_free_ee(mdev, e);
982 		dec_unacked(mdev);
983 		return 1;
984 	}
985 
986 	if (get_ldev_if_state(mdev, D_FAILED)) {
987 		drbd_rs_complete_io(mdev, e->sector);
988 		put_ldev(mdev);
989 	}
990 
991 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
992 		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
993 			inc_rs_pending(mdev);
994 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
995 		} else {
996 			if (__ratelimit(&drbd_ratelimit_state))
997 				dev_err(DEV, "Not sending RSDataReply, "
998 				    "partner DISKLESS!\n");
999 			ok = 1;
1000 		}
1001 	} else {
1002 		if (__ratelimit(&drbd_ratelimit_state))
1003 			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1004 			    (unsigned long long)e->sector);
1005 
1006 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1007 
1008 		/* update resync data with failure */
1009 		drbd_rs_failed_io(mdev, e->sector, e->size);
1010 	}
1011 
1012 	dec_unacked(mdev);
1013 
1014 	move_to_net_ee_or_free(mdev, e);
1015 
1016 	if (unlikely(!ok))
1017 		dev_err(DEV, "drbd_send_block() failed\n");
1018 	return ok;
1019 }
1020 
1021 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1022 {
1023 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1024 	struct digest_info *di;
1025 	int digest_size;
1026 	void *digest = NULL;
1027 	int ok, eq = 0;
1028 
1029 	if (unlikely(cancel)) {
1030 		drbd_free_ee(mdev, e);
1031 		dec_unacked(mdev);
1032 		return 1;
1033 	}
1034 
1035 	if (get_ldev(mdev)) {
1036 		drbd_rs_complete_io(mdev, e->sector);
1037 		put_ldev(mdev);
1038 	}
1039 
1040 	di = e->digest;
1041 
1042 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1043 		/* quick hack to try to avoid a race against reconfiguration.
1044 		 * a real fix would be much more involved,
1045 		 * introducing more locking mechanisms */
1046 		if (mdev->csums_tfm) {
1047 			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1048 			D_ASSERT(digest_size == di->digest_size);
1049 			digest = kmalloc(digest_size, GFP_NOIO);
1050 		}
1051 		if (digest) {
1052 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1053 			eq = !memcmp(digest, di->digest, digest_size);
1054 			kfree(digest);
1055 		}
1056 
1057 		if (eq) {
1058 			drbd_set_in_sync(mdev, e->sector, e->size);
1059 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1060 			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1061 			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1062 		} else {
1063 			inc_rs_pending(mdev);
1064 			e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1065 			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1066 			kfree(di);
1067 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1068 		}
1069 	} else {
1070 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1071 		if (__ratelimit(&drbd_ratelimit_state))
1072 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1073 	}
1074 
1075 	dec_unacked(mdev);
1076 	move_to_net_ee_or_free(mdev, e);
1077 
1078 	if (unlikely(!ok))
1079 		dev_err(DEV, "drbd_send_block/ack() failed\n");
1080 	return ok;
1081 }
1082 
1083 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1084 {
1085 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1086 	int digest_size;
1087 	void *digest;
1088 	int ok = 1;
1089 
1090 	if (unlikely(cancel))
1091 		goto out;
1092 
1093 	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1094 		goto out;
1095 
1096 	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1097 	/* FIXME if this allocation fails, online verify will not terminate! */
1098 	digest = kmalloc(digest_size, GFP_NOIO);
1099 	if (digest) {
1100 		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1101 		inc_rs_pending(mdev);
1102 		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1103 					     digest, digest_size, P_OV_REPLY);
1104 		if (!ok)
1105 			dec_rs_pending(mdev);
1106 		kfree(digest);
1107 	}
1108 
1109 out:
1110 	drbd_free_ee(mdev, e);
1111 
1112 	dec_unacked(mdev);
1113 
1114 	return ok;
1115 }
1116 
1117 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1118 {
1119 	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1120 		mdev->ov_last_oos_size += size>>9;
1121 	} else {
1122 		mdev->ov_last_oos_start = sector;
1123 		mdev->ov_last_oos_size = size>>9;
1124 	}
1125 	drbd_set_out_of_sync(mdev, sector, size);
1126 	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1127 }
1128 
1129 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1130 {
1131 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1132 	struct digest_info *di;
1133 	int digest_size;
1134 	void *digest;
1135 	int ok, eq = 0;
1136 
1137 	if (unlikely(cancel)) {
1138 		drbd_free_ee(mdev, e);
1139 		dec_unacked(mdev);
1140 		return 1;
1141 	}
1142 
1143 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1144 	 * the resync lru has been cleaned up already */
1145 	if (get_ldev(mdev)) {
1146 		drbd_rs_complete_io(mdev, e->sector);
1147 		put_ldev(mdev);
1148 	}
1149 
1150 	di = e->digest;
1151 
1152 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1153 		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1154 		digest = kmalloc(digest_size, GFP_NOIO);
1155 		if (digest) {
1156 			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1157 
1158 			D_ASSERT(digest_size == di->digest_size);
1159 			eq = !memcmp(digest, di->digest, digest_size);
1160 			kfree(digest);
1161 		}
1162 	} else {
1163 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1164 		if (__ratelimit(&drbd_ratelimit_state))
1165 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1166 	}
1167 
1168 	dec_unacked(mdev);
1169 	if (!eq)
1170 		drbd_ov_oos_found(mdev, e->sector, e->size);
1171 	else
1172 		ov_oos_print(mdev);
1173 
1174 	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1175 			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1176 
1177 	drbd_free_ee(mdev, e);
1178 
1179 	if (--mdev->ov_left == 0) {
1180 		ov_oos_print(mdev);
1181 		drbd_resync_finished(mdev);
1182 	}
1183 
1184 	return ok;
1185 }
1186 
1187 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1188 {
1189 	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1190 	complete(&b->done);
1191 	return 1;
1192 }
1193 
1194 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1195 {
1196 	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1197 	struct p_barrier *p = &mdev->data.sbuf.barrier;
1198 	int ok = 1;
1199 
1200 	/* really avoid racing with tl_clear.  w.cb may have been referenced
1201 	 * just before it was reassigned and re-queued, so double check that.
1202 	 * actually, this race was harmless, since we only try to send the
1203 	 * barrier packet here, and otherwise do nothing with the object.
1204 	 * but compare with the head of w_clear_epoch */
1205 	spin_lock_irq(&mdev->req_lock);
1206 	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1207 		cancel = 1;
1208 	spin_unlock_irq(&mdev->req_lock);
1209 	if (cancel)
1210 		return 1;
1211 
1212 	if (!drbd_get_data_sock(mdev))
1213 		return 0;
1214 	p->barrier = b->br_number;
1215 	/* inc_ap_pending was done where this was queued.
1216 	 * dec_ap_pending will be done in got_BarrierAck
1217 	 * or (on connection loss) in w_clear_epoch.  */
1218 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1219 				(struct p_header80 *)p, sizeof(*p), 0);
1220 	drbd_put_data_sock(mdev);
1221 
1222 	return ok;
1223 }
1224 
1225 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1226 {
1227 	if (cancel)
1228 		return 1;
1229 	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1230 }
1231 
1232 /**
1233  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1234  * @mdev:	DRBD device.
1235  * @w:		work object.
1236  * @cancel:	The connection will be closed anyway
1237  */
1238 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1239 {
1240 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1241 	int ok;
1242 
1243 	if (unlikely(cancel)) {
1244 		req_mod(req, send_canceled);
1245 		return 1;
1246 	}
1247 
1248 	ok = drbd_send_dblock(mdev, req);
1249 	req_mod(req, ok ? handed_over_to_network : send_failed);
1250 
1251 	return ok;
1252 }
1253 
1254 /**
1255  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1256  * @mdev:	DRBD device.
1257  * @w:		work object.
1258  * @cancel:	The connection will be closed anyway
1259  */
1260 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1261 {
1262 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1263 	int ok;
1264 
1265 	if (unlikely(cancel)) {
1266 		req_mod(req, send_canceled);
1267 		return 1;
1268 	}
1269 
1270 	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1271 				(unsigned long)req);
1272 
1273 	if (!ok) {
1274 		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1275 		 * so this is probably redundant */
1276 		if (mdev->state.conn >= C_CONNECTED)
1277 			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1278 	}
1279 	req_mod(req, ok ? handed_over_to_network : send_failed);
1280 
1281 	return ok;
1282 }
1283 
1284 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1285 {
1286 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1287 
1288 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1289 		drbd_al_begin_io(mdev, req->sector);
1290 	/* Calling drbd_al_begin_io() from within the worker might deadlock
1291 	   in theory. In practice it cannot deadlock, since this is only
1292 	   used when unfreezing IOs. All the extents of the requests that
1293 	   made it into the TL are already active */
1294 
1295 	drbd_req_make_private_bio(req, req->master_bio);
1296 	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1297 	generic_make_request(req->private_bio);
1298 
1299 	return 1;
1300 }
1301 
1302 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1303 {
1304 	struct drbd_conf *odev = mdev;
1305 
1306 	while (1) {
1307 		if (odev->sync_conf.after == -1)
1308 			return 1;
1309 		odev = minor_to_mdev(odev->sync_conf.after);
1310 		ERR_IF(!odev) return 1;
1311 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1312 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1313 		    odev->state.aftr_isp || odev->state.peer_isp ||
1314 		    odev->state.user_isp)
1315 			return 0;
1316 	}
1317 }
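/* Example (sketch): with sync_conf.after == 1 on minor 0 and == -1 on
 * minor 1, device 0 may only resync while device 1 is not itself
 * between C_SYNC_SOURCE and C_PAUSED_SYNC_T and has none of the *_isp
 * pause flags set.  The chain is followed until a device without an
 * "after" dependency (-1) is reached. */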
1318 
1319 /**
1320  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1321  * @mdev:	DRBD device.
1322  *
1323  * Called from process context only (admin command and after_state_ch).
1324  */
1325 static int _drbd_pause_after(struct drbd_conf *mdev)
1326 {
1327 	struct drbd_conf *odev;
1328 	int i, rv = 0;
1329 
1330 	for (i = 0; i < minor_count; i++) {
1331 		odev = minor_to_mdev(i);
1332 		if (!odev)
1333 			continue;
1334 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1335 			continue;
1336 		if (!_drbd_may_sync_now(odev))
1337 			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1338 			       != SS_NOTHING_TO_DO);
1339 	}
1340 
1341 	return rv;
1342 }
1343 
1344 /**
1345  * _drbd_resume_next() - Resume resync on all devices that may resync now
1346  * @mdev:	DRBD device.
1347  *
1348  * Called from process context only (admin command and worker).
1349  */
1350 static int _drbd_resume_next(struct drbd_conf *mdev)
1351 {
1352 	struct drbd_conf *odev;
1353 	int i, rv = 0;
1354 
1355 	for (i = 0; i < minor_count; i++) {
1356 		odev = minor_to_mdev(i);
1357 		if (!odev)
1358 			continue;
1359 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1360 			continue;
1361 		if (odev->state.aftr_isp) {
1362 			if (_drbd_may_sync_now(odev))
1363 				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1364 							CS_HARD, NULL)
1365 				       != SS_NOTHING_TO_DO) ;
1366 		}
1367 	}
1368 	return rv;
1369 }
1370 
1371 void resume_next_sg(struct drbd_conf *mdev)
1372 {
1373 	write_lock_irq(&global_state_lock);
1374 	_drbd_resume_next(mdev);
1375 	write_unlock_irq(&global_state_lock);
1376 }
1377 
1378 void suspend_other_sg(struct drbd_conf *mdev)
1379 {
1380 	write_lock_irq(&global_state_lock);
1381 	_drbd_pause_after(mdev);
1382 	write_unlock_irq(&global_state_lock);
1383 }
1384 
1385 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1386 {
1387 	struct drbd_conf *odev;
1388 
1389 	if (o_minor == -1)
1390 		return NO_ERROR;
1391 	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1392 		return ERR_SYNC_AFTER;
1393 
1394 	/* check for loops */
1395 	odev = minor_to_mdev(o_minor);
1396 	while (1) {
1397 		if (odev == mdev)
1398 			return ERR_SYNC_AFTER_CYCLE;
1399 
1400 		/* dependency chain ends here, no cycles. */
1401 		if (odev->sync_conf.after == -1)
1402 			return NO_ERROR;
1403 
1404 		/* follow the dependency chain */
1405 		odev = minor_to_mdev(odev->sync_conf.after);
1406 	}
1407 }
1408 
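/* Change the sync-after dependency of @mdev to minor @na (-1 clears it).
 * After sync_after_error() has ruled out invalid minors and dependency
 * cycles, _drbd_pause_after()/_drbd_resume_next() are iterated until the
 * aftr_isp flags of all devices reach a fixed point under the new
 * dependency graph. */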
1409 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1410 {
1411 	int changes;
1412 	int retcode;
1413 
1414 	write_lock_irq(&global_state_lock);
1415 	retcode = sync_after_error(mdev, na);
1416 	if (retcode == NO_ERROR) {
1417 		mdev->sync_conf.after = na;
1418 		do {
1419 			changes  = _drbd_pause_after(mdev);
1420 			changes |= _drbd_resume_next(mdev);
1421 		} while (changes);
1422 	}
1423 	write_unlock_irq(&global_state_lock);
1424 	return retcode;
1425 }
1426 
1427 /**
1428  * drbd_start_resync() - Start the resync process
1429  * @mdev:	DRBD device.
1430  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1431  *
1432  * This function might bring you directly into one of the
1433  * C_PAUSED_SYNC_* states.
1434  */
1435 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1436 {
1437 	union drbd_state ns;
1438 	int r;
1439 
1440 	if (mdev->state.conn >= C_SYNC_SOURCE) {
1441 		dev_err(DEV, "Resync already running!\n");
1442 		return;
1443 	}
1444 
1445 	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1446 	drbd_rs_cancel_all(mdev);
1447 
1448 	if (side == C_SYNC_TARGET) {
1449 		/* Since application IO was locked out during C_WF_BITMAP_T and
1450 		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1451 		   check whether we may make the data inconsistent. */
1452 		r = drbd_khelper(mdev, "before-resync-target");
1453 		r = (r >> 8) & 0xff;
1454 		if (r > 0) {
1455 			dev_info(DEV, "before-resync-target handler returned %d, "
1456 			     "dropping connection.\n", r);
1457 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1458 			return;
1459 		}
1460 	}
1461 
1462 	drbd_state_lock(mdev);
1463 
1464 	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1465 		drbd_state_unlock(mdev);
1466 		return;
1467 	}
1468 
1469 	if (side == C_SYNC_TARGET) {
1470 		mdev->bm_resync_fo = 0;
1471 	} else /* side == C_SYNC_SOURCE */ {
1472 		u64 uuid;
1473 
1474 		get_random_bytes(&uuid, sizeof(u64));
1475 		drbd_uuid_set(mdev, UI_BITMAP, uuid);
1476 		drbd_send_sync_uuid(mdev, uuid);
1477 
1478 		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1479 	}
1480 
1481 	write_lock_irq(&global_state_lock);
1482 	ns = mdev->state;
1483 
1484 	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1485 
1486 	ns.conn = side;
1487 
1488 	if (side == C_SYNC_TARGET)
1489 		ns.disk = D_INCONSISTENT;
1490 	else /* side == C_SYNC_SOURCE */
1491 		ns.pdsk = D_INCONSISTENT;
1492 
1493 	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1494 	ns = mdev->state;
1495 
1496 	if (ns.conn < C_CONNECTED)
1497 		r = SS_UNKNOWN_ERROR;
1498 
1499 	if (r == SS_SUCCESS) {
1500 		unsigned long tw = drbd_bm_total_weight(mdev);
1501 		unsigned long now = jiffies;
1502 		int i;
1503 
1504 		mdev->rs_failed    = 0;
1505 		mdev->rs_paused    = 0;
1506 		mdev->rs_same_csum = 0;
1507 		mdev->rs_last_events = 0;
1508 		mdev->rs_last_sect_ev = 0;
1509 		mdev->rs_total     = tw;
1510 		mdev->rs_start     = now;
1511 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1512 			mdev->rs_mark_left[i] = tw;
1513 			mdev->rs_mark_time[i] = now;
1514 		}
1515 		_drbd_pause_after(mdev);
1516 	}
1517 	write_unlock_irq(&global_state_lock);
1518 	put_ldev(mdev);
1519 
1520 	if (r == SS_SUCCESS) {
1521 		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1522 		     drbd_conn_str(ns.conn),
1523 		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1524 		     (unsigned long) mdev->rs_total);
1525 
1526 		if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1527 			/* This still has a race (about when exactly the peers
1528 			 * detect connection loss) that can lead to a full sync
1529 			 * on next handshake. In 8.3.9 we fixed this with explicit
1530 			 * resync-finished notifications, but the fix
1531 			 * introduces a protocol change.  Sleeping for some
1532 			 * time longer than the ping interval + timeout on the
1533 			 * SyncSource, to give the SyncTarget the chance to
1534 			 * detect connection loss, then waiting for a ping
1535 			 * response (implicit in drbd_resync_finished) reduces
1536 			 * the race considerably, but does not solve it. */
1537 			if (side == C_SYNC_SOURCE)
1538 				schedule_timeout_interruptible(
1539 					mdev->net_conf->ping_int * HZ +
1540 					mdev->net_conf->ping_timeo*HZ/9);
1541 			drbd_resync_finished(mdev);
1542 		}
1543 
1544 		atomic_set(&mdev->rs_sect_in, 0);
1545 		atomic_set(&mdev->rs_sect_ev, 0);
1546 		mdev->rs_in_flight = 0;
1547 		mdev->rs_planed = 0;
1548 		spin_lock(&mdev->peer_seq_lock);
1549 		fifo_set(&mdev->rs_plan_s, 0);
1550 		spin_unlock(&mdev->peer_seq_lock);
1551 		/* ns.conn may already be != mdev->state.conn,
1552 		 * we may have been paused in between, or become paused until
1553 		 * the timer triggers.
1554 		 * No matter, that is handled in resync_timer_fn() */
1555 		if (ns.conn == C_SYNC_TARGET)
1556 			mod_timer(&mdev->resync_timer, jiffies);
1557 
1558 		drbd_md_sync(mdev);
1559 	}
1560 	drbd_state_unlock(mdev);
1561 }
1562 
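/* Main loop of the per-device worker thread ("drbd%d_worker").  It waits
 * on the data.work semaphore, pops one work item under q_lock and runs
 * its callback.  While waiting for new work the data socket is uncorked
 * (unless no_cork is set) so that corked packets actually go out, and
 * corked again afterwards.  A failing callback while connected forces
 * the connection into C_NETWORK_FAILURE.  On shutdown the remaining
 * queue is drained with cancel = 1. */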
1563 int drbd_worker(struct drbd_thread *thi)
1564 {
1565 	struct drbd_conf *mdev = thi->mdev;
1566 	struct drbd_work *w = NULL;
1567 	LIST_HEAD(work_list);
1568 	int intr = 0, i;
1569 
1570 	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1571 
1572 	while (get_t_state(thi) == Running) {
1573 		drbd_thread_current_set_cpu(mdev);
1574 
1575 		if (down_trylock(&mdev->data.work.s)) {
1576 			mutex_lock(&mdev->data.mutex);
1577 			if (mdev->data.socket && !mdev->net_conf->no_cork)
1578 				drbd_tcp_uncork(mdev->data.socket);
1579 			mutex_unlock(&mdev->data.mutex);
1580 
1581 			intr = down_interruptible(&mdev->data.work.s);
1582 
1583 			mutex_lock(&mdev->data.mutex);
1584 			if (mdev->data.socket  && !mdev->net_conf->no_cork)
1585 				drbd_tcp_cork(mdev->data.socket);
1586 			mutex_unlock(&mdev->data.mutex);
1587 		}
1588 
1589 		if (intr) {
1590 			D_ASSERT(intr == -EINTR);
1591 			flush_signals(current);
1592 			ERR_IF (get_t_state(thi) == Running)
1593 				continue;
1594 			break;
1595 		}
1596 
1597 		if (get_t_state(thi) != Running)
1598 			break;
1599 		/* With this break, we have done a down() but not consumed
1600 		   the entry from the list. The cleanup code takes care of
1601 		   this...   */
1602 
1603 		w = NULL;
1604 		spin_lock_irq(&mdev->data.work.q_lock);
1605 		ERR_IF(list_empty(&mdev->data.work.q)) {
1606 			/* something terribly wrong in our logic.
1607 			 * we were able to down() the semaphore,
1608 			 * but the list is empty... doh.
1609 			 *
1610 			 * what is the best thing to do now?
1611 			 * try again from scratch, restarting the receiver,
1612 			 * asender, whatnot? That could break things even worse,
1613 			 * e.g. when we are primary, but have no good local data.
1614 			 *
1615 			 * I'll try to get away with just starting over this loop.
1616 			 */
1617 			spin_unlock_irq(&mdev->data.work.q_lock);
1618 			continue;
1619 		}
1620 		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1621 		list_del_init(&w->list);
1622 		spin_unlock_irq(&mdev->data.work.q_lock);
1623 
1624 		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1625 			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1626 			if (mdev->state.conn >= C_CONNECTED)
1627 				drbd_force_state(mdev,
1628 						NS(conn, C_NETWORK_FAILURE));
1629 		}
1630 	}
1631 	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1632 	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1633 
1634 	spin_lock_irq(&mdev->data.work.q_lock);
1635 	i = 0;
1636 	while (!list_empty(&mdev->data.work.q)) {
1637 		list_splice_init(&mdev->data.work.q, &work_list);
1638 		spin_unlock_irq(&mdev->data.work.q_lock);
1639 
1640 		while (!list_empty(&work_list)) {
1641 			w = list_entry(work_list.next, struct drbd_work, list);
1642 			list_del_init(&w->list);
1643 			w->cb(mdev, w, 1);
1644 			i++; /* dead debugging code */
1645 		}
1646 
1647 		spin_lock_irq(&mdev->data.work.q_lock);
1648 	}
1649 	sema_init(&mdev->data.work.s, 0);
1650 	/* DANGEROUS race: if someone queued work while holding the spinlock,
1651 	 * but called up() outside of it, we could get an up() on the
1652 	 * semaphore without a corresponding list entry.
1653 	 * So don't do that.
1654 	 */
1655 	spin_unlock_irq(&mdev->data.work.q_lock);
1656 
1657 	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1658 	/* _drbd_set_state only uses stop_nowait.
1659 	 * wait here for the Exiting receiver. */
1660 	drbd_thread_stop(&mdev->receiver);
1661 	drbd_mdev_cleanup(mdev);
1662 
1663 	dev_info(DEV, "worker terminated\n");
1664 
1665 	clear_bit(DEVICE_DYING, &mdev->flags);
1666 	clear_bit(CONFIG_PENDING, &mdev->flags);
1667 	wake_up(&mdev->state_wait);
1668 
1669 	return 0;
1670 }
1671