xref: /openbmc/linux/drivers/block/drbd/drbd_worker.c (revision baa7eb025ab14f3cba2e35c0a8648f9c9f01d24f)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24  */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37 
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40 
41 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42 
43 
44 
45 /* defined here:
46    drbd_md_io_complete
47    drbd_endio_sec
48    drbd_endio_pri
49 
50  * more endio handlers:
51    atodb_endio in drbd_actlog.c
52    drbd_bm_async_io_complete in drbd_bitmap.c
53 
54  * For all these callbacks, note the following:
55  * The callbacks will be called in irq context by the IDE drivers,
56  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
57  * Try to get the locking right :)
58  *
59  */
60 
61 
/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync-after dependencies, we grab a write lock, because
   we need stable states on all devices for that.  */
rwlock_t global_state_lock;
67 
68 /* used for synchronous meta data and bitmap IO
69  * submitted by drbd_md_sync_page_io()
70  */
71 void drbd_md_io_complete(struct bio *bio, int error)
72 {
73 	struct drbd_md_io *md_io;
74 
75 	md_io = (struct drbd_md_io *)bio->bi_private;
76 	md_io->error = error;
77 
78 	complete(&md_io->event);
79 }
80 
81 /* reads on behalf of the partner,
82  * "submitted" by the receiver
83  */
84 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
85 {
86 	unsigned long flags = 0;
87 	struct drbd_conf *mdev = e->mdev;
88 
89 	D_ASSERT(e->block_id != ID_VACANT);
90 
91 	spin_lock_irqsave(&mdev->req_lock, flags);
92 	mdev->read_cnt += e->size >> 9;
93 	list_del(&e->w.list);
94 	if (list_empty(&mdev->read_ee))
95 		wake_up(&mdev->ee_wait);
96 	if (test_bit(__EE_WAS_ERROR, &e->flags))
97 		__drbd_chk_io_error(mdev, FALSE);
98 	spin_unlock_irqrestore(&mdev->req_lock, flags);
99 
100 	drbd_queue_work(&mdev->data.work, &e->w);
101 	put_ldev(mdev);
102 }
103 
104 /* writes on behalf of the partner, or resync writes,
105  * "submitted" by the receiver, final stage.  */
106 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
107 {
108 	unsigned long flags = 0;
109 	struct drbd_conf *mdev = e->mdev;
110 	sector_t e_sector;
111 	int do_wake;
112 	int is_syncer_req;
113 	int do_al_complete_io;
114 
115 	D_ASSERT(e->block_id != ID_VACANT);
116 
117 	/* after we moved e to done_ee,
118 	 * we may no longer access it,
119 	 * it may be freed/reused already!
120 	 * (as soon as we release the req_lock) */
121 	e_sector = e->sector;
122 	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
123 	is_syncer_req = is_syncer_block_id(e->block_id);
124 
125 	spin_lock_irqsave(&mdev->req_lock, flags);
126 	mdev->writ_cnt += e->size >> 9;
127 	list_del(&e->w.list); /* has been on active_ee or sync_ee */
128 	list_add_tail(&e->w.list, &mdev->done_ee);
129 
130 	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
131 	 * neither did we wake possibly waiting conflicting requests.
132 	 * done from "drbd_process_done_ee" within the appropriate w.cb
133 	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
134 
135 	do_wake = is_syncer_req
136 		? list_empty(&mdev->sync_ee)
137 		: list_empty(&mdev->active_ee);
138 
139 	if (test_bit(__EE_WAS_ERROR, &e->flags))
140 		__drbd_chk_io_error(mdev, FALSE);
141 	spin_unlock_irqrestore(&mdev->req_lock, flags);
142 
143 	if (is_syncer_req)
144 		drbd_rs_complete_io(mdev, e_sector);
145 
146 	if (do_wake)
147 		wake_up(&mdev->ee_wait);
148 
149 	if (do_al_complete_io)
150 		drbd_al_complete_io(mdev, e_sector);
151 
152 	wake_asender(mdev);
153 	put_ldev(mdev);
154 }
155 
156 /* writes on behalf of the partner, or resync writes,
157  * "submitted" by the receiver.
158  */
159 void drbd_endio_sec(struct bio *bio, int error)
160 {
161 	struct drbd_epoch_entry *e = bio->bi_private;
162 	struct drbd_conf *mdev = e->mdev;
163 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
164 	int is_write = bio_data_dir(bio) == WRITE;
165 
166 	if (error)
167 		dev_warn(DEV, "%s: error=%d s=%llus\n",
168 				is_write ? "write" : "read", error,
169 				(unsigned long long)e->sector);
170 	if (!error && !uptodate) {
171 		dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
172 				is_write ? "write" : "read",
173 				(unsigned long long)e->sector);
174 		/* strange behavior of some lower level drivers...
175 		 * fail the request by clearing the uptodate flag,
176 		 * but do not return any error?! */
177 		error = -EIO;
178 	}
179 
180 	if (error)
181 		set_bit(__EE_WAS_ERROR, &e->flags);
182 
183 	bio_put(bio); /* no need for the bio anymore */
184 	if (atomic_dec_and_test(&e->pending_bios)) {
185 		if (is_write)
186 			drbd_endio_write_sec_final(e);
187 		else
188 			drbd_endio_read_sec_final(e);
189 	}
190 }
191 
192 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
193  */
194 void drbd_endio_pri(struct bio *bio, int error)
195 {
196 	unsigned long flags;
197 	struct drbd_request *req = bio->bi_private;
198 	struct drbd_conf *mdev = req->mdev;
199 	struct bio_and_error m;
200 	enum drbd_req_event what;
201 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
202 
203 	if (!error && !uptodate) {
204 		dev_warn(DEV, "p %s: setting error to -EIO\n",
205 			 bio_data_dir(bio) == WRITE ? "write" : "read");
206 		/* strange behavior of some lower level drivers...
207 		 * fail the request by clearing the uptodate flag,
208 		 * but do not return any error?! */
209 		error = -EIO;
210 	}
211 
212 	/* to avoid recursion in __req_mod */
213 	if (unlikely(error)) {
214 		what = (bio_data_dir(bio) == WRITE)
215 			? write_completed_with_error
216 			: (bio_rw(bio) == READ)
217 			  ? read_completed_with_error
218 			  : read_ahead_completed_with_error;
219 	} else
220 		what = completed_ok;
221 
222 	bio_put(req->private_bio);
223 	req->private_bio = ERR_PTR(error);
224 
225 	/* not req_mod(), we need irqsave here! */
226 	spin_lock_irqsave(&mdev->req_lock, flags);
227 	__req_mod(req, what, &m);
228 	spin_unlock_irqrestore(&mdev->req_lock, flags);
229 
230 	if (m.bio)
231 		complete_master_bio(mdev, &m);
232 }
233 
234 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
235 {
236 	struct drbd_request *req = container_of(w, struct drbd_request, w);
237 
238 	/* We should not detach for read io-error,
239 	 * but try to WRITE the P_DATA_REPLY to the failed location,
240 	 * to give the disk the chance to relocate that block */
241 
242 	spin_lock_irq(&mdev->req_lock);
243 	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
244 		_req_mod(req, read_retry_remote_canceled);
245 		spin_unlock_irq(&mdev->req_lock);
246 		return 1;
247 	}
248 	spin_unlock_irq(&mdev->req_lock);
249 
250 	return w_send_read_req(mdev, w, 0);
251 }
252 
253 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
254 {
255 	ERR_IF(cancel) return 1;
256 	dev_err(DEV, "resync inactive, but callback triggered??\n");
257 	return 1; /* Simply ignore this! */
258 }
259 
260 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
261 {
262 	struct hash_desc desc;
263 	struct scatterlist sg;
264 	struct page *page = e->pages;
265 	struct page *tmp;
266 	unsigned len;
267 
268 	desc.tfm = tfm;
269 	desc.flags = 0;
270 
271 	sg_init_table(&sg, 1);
272 	crypto_hash_init(&desc);
273 
274 	while ((tmp = page_chain_next(page))) {
275 		/* all but the last page will be fully used */
276 		sg_set_page(&sg, page, PAGE_SIZE, 0);
277 		crypto_hash_update(&desc, &sg, sg.length);
278 		page = tmp;
279 	}
280 	/* and now the last, possibly only partially used page */
281 	len = e->size & (PAGE_SIZE - 1);
282 	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
283 	crypto_hash_update(&desc, &sg, sg.length);
284 	crypto_hash_final(&desc, digest);
285 }
286 
287 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
288 {
289 	struct hash_desc desc;
290 	struct scatterlist sg;
291 	struct bio_vec *bvec;
292 	int i;
293 
294 	desc.tfm = tfm;
295 	desc.flags = 0;
296 
297 	sg_init_table(&sg, 1);
298 	crypto_hash_init(&desc);
299 
300 	__bio_for_each_segment(bvec, bio, i, 0) {
301 		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
302 		crypto_hash_update(&desc, &sg, sg.length);
303 	}
304 	crypto_hash_final(&desc, digest);
305 }
306 
307 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
308 {
309 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
310 	int digest_size;
311 	void *digest;
312 	int ok;
313 
314 	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
315 
316 	if (unlikely(cancel)) {
317 		drbd_free_ee(mdev, e);
318 		return 1;
319 	}
320 
321 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
322 		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
323 		digest = kmalloc(digest_size, GFP_NOIO);
324 		if (digest) {
325 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
326 
327 			inc_rs_pending(mdev);
328 			ok = drbd_send_drequest_csum(mdev,
329 						     e->sector,
330 						     e->size,
331 						     digest,
332 						     digest_size,
333 						     P_CSUM_RS_REQUEST);
334 			kfree(digest);
335 		} else {
336 			dev_err(DEV, "kmalloc() of digest failed.\n");
337 			ok = 0;
338 		}
339 	} else
340 		ok = 1;
341 
342 	drbd_free_ee(mdev, e);
343 
344 	if (unlikely(!ok))
345 		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
346 	return ok;
347 }
348 
/* Allocation flags for "try only" allocations: just __GFP_HIGHMEM and
 * __GFP_NOWARN.  Used by read_for_csum(), which defers the request and
 * retries later when the allocation fails. */
#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
350 
351 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
352 {
353 	struct drbd_epoch_entry *e;
354 
355 	if (!get_ldev(mdev))
356 		return -EIO;
357 
358 	if (drbd_rs_should_slow_down(mdev))
359 		goto defer;
360 
361 	/* GFP_TRY, because if there is no memory available right now, this may
362 	 * be rescheduled for later. It is "only" background resync, after all. */
363 	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
364 	if (!e)
365 		goto defer;
366 
367 	e->w.cb = w_e_send_csum;
368 	spin_lock_irq(&mdev->req_lock);
369 	list_add(&e->w.list, &mdev->read_ee);
370 	spin_unlock_irq(&mdev->req_lock);
371 
372 	atomic_add(size >> 9, &mdev->rs_sect_ev);
373 	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
374 		return 0;
375 
376 	/* drbd_submit_ee currently fails for one reason only:
377 	 * not being able to allocate enough bios.
378 	 * Is dropping the connection going to help? */
379 	spin_lock_irq(&mdev->req_lock);
380 	list_del(&e->w.list);
381 	spin_unlock_irq(&mdev->req_lock);
382 
383 	drbd_free_ee(mdev, e);
384 defer:
385 	put_ldev(mdev);
386 	return -EAGAIN;
387 }
388 
389 void resync_timer_fn(unsigned long data)
390 {
391 	struct drbd_conf *mdev = (struct drbd_conf *) data;
392 	int queue;
393 
394 	queue = 1;
395 	switch (mdev->state.conn) {
396 	case C_VERIFY_S:
397 		mdev->resync_work.cb = w_make_ov_request;
398 		break;
399 	case C_SYNC_TARGET:
400 		mdev->resync_work.cb = w_make_resync_request;
401 		break;
402 	default:
403 		queue = 0;
404 		mdev->resync_work.cb = w_resync_inactive;
405 	}
406 
407 	/* harmless race: list_empty outside data.work.q_lock */
408 	if (list_empty(&mdev->resync_work.list) && queue)
409 		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
410 }
411 
412 static void fifo_set(struct fifo_buffer *fb, int value)
413 {
414 	int i;
415 
416 	for (i = 0; i < fb->size; i++)
417 		fb->values[i] = value;
418 }
419 
420 static int fifo_push(struct fifo_buffer *fb, int value)
421 {
422 	int ov;
423 
424 	ov = fb->values[fb->head_index];
425 	fb->values[fb->head_index++] = value;
426 
427 	if (fb->head_index >= fb->size)
428 		fb->head_index = 0;
429 
430 	return ov;
431 }
432 
433 static void fifo_add_val(struct fifo_buffer *fb, int value)
434 {
435 	int i;
436 
437 	for (i = 0; i < fb->size; i++)
438 		fb->values[i] += value;
439 }
440 
441 int drbd_rs_controller(struct drbd_conf *mdev)
442 {
443 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
444 	unsigned int want;     /* The number of sectors we want in the proxy */
445 	int req_sect; /* Number of sectors to request in this turn */
446 	int correction; /* Number of sectors more we need in the proxy*/
447 	int cps; /* correction per invocation of drbd_rs_controller() */
448 	int steps; /* Number of time steps to plan ahead */
449 	int curr_corr;
450 	int max_sect;
451 
452 	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
453 	mdev->rs_in_flight -= sect_in;
454 
455 	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
456 
457 	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
458 
459 	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
460 		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
461 	} else { /* normal path */
462 		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
463 			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
464 	}
465 
466 	correction = want - mdev->rs_in_flight - mdev->rs_planed;
467 
468 	/* Plan ahead */
469 	cps = correction / steps;
470 	fifo_add_val(&mdev->rs_plan_s, cps);
471 	mdev->rs_planed += cps * steps;
472 
473 	/* What we do in this step */
474 	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
475 	spin_unlock(&mdev->peer_seq_lock);
476 	mdev->rs_planed -= curr_corr;
477 
478 	req_sect = sect_in + curr_corr;
479 	if (req_sect < 0)
480 		req_sect = 0;
481 
482 	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
483 	if (req_sect > max_sect)
484 		req_sect = max_sect;
485 
486 	/*
487 	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
488 		 sect_in, mdev->rs_in_flight, want, correction,
489 		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
490 	*/
491 
492 	return req_sect;
493 }
494 
495 int w_make_resync_request(struct drbd_conf *mdev,
496 		struct drbd_work *w, int cancel)
497 {
498 	unsigned long bit;
499 	sector_t sector;
500 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
501 	int max_segment_size;
502 	int number, rollback_i, size, pe, mx;
503 	int align, queued, sndbuf;
504 	int i = 0;
505 
506 	if (unlikely(cancel))
507 		return 1;
508 
509 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
510 		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
511 		return 0;
512 	}
513 
514 	if (mdev->state.conn != C_SYNC_TARGET)
515 		dev_err(DEV, "%s in w_make_resync_request\n",
516 			drbd_conn_str(mdev->state.conn));
517 
518 	if (mdev->rs_total == 0) {
519 		/* empty resync? */
520 		drbd_resync_finished(mdev);
521 		return 1;
522 	}
523 
524 	if (!get_ldev(mdev)) {
525 		/* Since we only need to access mdev->rsync a
526 		   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
527 		   to continue resync with a broken disk makes no sense at
528 		   all */
529 		dev_err(DEV, "Disk broke down during resync!\n");
530 		mdev->resync_work.cb = w_resync_inactive;
531 		return 1;
532 	}
533 
534 	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
535 	 * if it should be necessary */
536 	max_segment_size =
537 		mdev->agreed_pro_version < 94 ? queue_max_segment_size(mdev->rq_queue) :
538 		mdev->agreed_pro_version < 95 ?	DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_SEGMENT_SIZE;
539 
540 	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
541 		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
542 		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
543 	} else {
544 		mdev->c_sync_rate = mdev->sync_conf.rate;
545 		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
546 	}
547 
548 	/* Throttle resync on lower level disk activity, which may also be
549 	 * caused by application IO on Primary/SyncTarget.
550 	 * Keep this after the call to drbd_rs_controller, as that assumes
551 	 * to be called as precisely as possible every SLEEP_TIME,
552 	 * and would be confused otherwise. */
553 	if (drbd_rs_should_slow_down(mdev))
554 		goto requeue;
555 
556 	mutex_lock(&mdev->data.mutex);
557 	if (mdev->data.socket)
558 		mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
559 	else
560 		mx = 1;
561 	mutex_unlock(&mdev->data.mutex);
562 
563 	/* For resync rates >160MB/sec, allow more pending RS requests */
564 	if (number > mx)
565 		mx = number;
566 
567 	/* Limit the number of pending RS requests to no more than the peer's receive buffer */
568 	pe = atomic_read(&mdev->rs_pending_cnt);
569 	if ((pe + number) > mx) {
570 		number = mx - pe;
571 	}
572 
573 	for (i = 0; i < number; i++) {
574 		/* Stop generating RS requests, when half of the send buffer is filled */
575 		mutex_lock(&mdev->data.mutex);
576 		if (mdev->data.socket) {
577 			queued = mdev->data.socket->sk->sk_wmem_queued;
578 			sndbuf = mdev->data.socket->sk->sk_sndbuf;
579 		} else {
580 			queued = 1;
581 			sndbuf = 0;
582 		}
583 		mutex_unlock(&mdev->data.mutex);
584 		if (queued > sndbuf / 2)
585 			goto requeue;
586 
587 next_sector:
588 		size = BM_BLOCK_SIZE;
589 		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
590 
591 		if (bit == -1UL) {
592 			mdev->bm_resync_fo = drbd_bm_bits(mdev);
593 			mdev->resync_work.cb = w_resync_inactive;
594 			put_ldev(mdev);
595 			return 1;
596 		}
597 
598 		sector = BM_BIT_TO_SECT(bit);
599 
600 		if (drbd_try_rs_begin_io(mdev, sector)) {
601 			mdev->bm_resync_fo = bit;
602 			goto requeue;
603 		}
604 		mdev->bm_resync_fo = bit + 1;
605 
606 		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
607 			drbd_rs_complete_io(mdev, sector);
608 			goto next_sector;
609 		}
610 
611 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
612 		/* try to find some adjacent bits.
613 		 * we stop if we have already the maximum req size.
614 		 *
615 		 * Additionally always align bigger requests, in order to
616 		 * be prepared for all stripe sizes of software RAIDs.
617 		 */
618 		align = 1;
619 		rollback_i = i;
620 		for (;;) {
621 			if (size + BM_BLOCK_SIZE > max_segment_size)
622 				break;
623 
624 			/* Be always aligned */
625 			if (sector & ((1<<(align+3))-1))
626 				break;
627 
628 			/* do not cross extent boundaries */
629 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
630 				break;
631 			/* now, is it actually dirty, after all?
632 			 * caution, drbd_bm_test_bit is tri-state for some
633 			 * obscure reason; ( b == 0 ) would get the out-of-band
634 			 * only accidentally right because of the "oddly sized"
635 			 * adjustment below */
636 			if (drbd_bm_test_bit(mdev, bit+1) != 1)
637 				break;
638 			bit++;
639 			size += BM_BLOCK_SIZE;
640 			if ((BM_BLOCK_SIZE << align) <= size)
641 				align++;
642 			i++;
643 		}
644 		/* if we merged some,
645 		 * reset the offset to start the next drbd_bm_find_next from */
646 		if (size > BM_BLOCK_SIZE)
647 			mdev->bm_resync_fo = bit + 1;
648 #endif
649 
650 		/* adjust very last sectors, in case we are oddly sized */
651 		if (sector + (size>>9) > capacity)
652 			size = (capacity-sector)<<9;
653 		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
654 			switch (read_for_csum(mdev, sector, size)) {
655 			case -EIO: /* Disk failure */
656 				put_ldev(mdev);
657 				return 0;
658 			case -EAGAIN: /* allocation failed, or ldev busy */
659 				drbd_rs_complete_io(mdev, sector);
660 				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
661 				i = rollback_i;
662 				goto requeue;
663 			case 0:
664 				/* everything ok */
665 				break;
666 			default:
667 				BUG();
668 			}
669 		} else {
670 			inc_rs_pending(mdev);
671 			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
672 					       sector, size, ID_SYNCER)) {
673 				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
674 				dec_rs_pending(mdev);
675 				put_ldev(mdev);
676 				return 0;
677 			}
678 		}
679 	}
680 
681 	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
682 		/* last syncer _request_ was sent,
683 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
684 		 * next sync group will resume), as soon as we receive the last
685 		 * resync data block, and the last bit is cleared.
686 		 * until then resync "work" is "inactive" ...
687 		 */
688 		mdev->resync_work.cb = w_resync_inactive;
689 		put_ldev(mdev);
690 		return 1;
691 	}
692 
693  requeue:
694 	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
695 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
696 	put_ldev(mdev);
697 	return 1;
698 }
699 
700 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
701 {
702 	int number, i, size;
703 	sector_t sector;
704 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
705 
706 	if (unlikely(cancel))
707 		return 1;
708 
709 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
710 		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
711 		return 0;
712 	}
713 
714 	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
715 	if (atomic_read(&mdev->rs_pending_cnt) > number)
716 		goto requeue;
717 
718 	number -= atomic_read(&mdev->rs_pending_cnt);
719 
720 	sector = mdev->ov_position;
721 	for (i = 0; i < number; i++) {
722 		if (sector >= capacity) {
723 			mdev->resync_work.cb = w_resync_inactive;
724 			return 1;
725 		}
726 
727 		size = BM_BLOCK_SIZE;
728 
729 		if (drbd_try_rs_begin_io(mdev, sector)) {
730 			mdev->ov_position = sector;
731 			goto requeue;
732 		}
733 
734 		if (sector + (size>>9) > capacity)
735 			size = (capacity-sector)<<9;
736 
737 		inc_rs_pending(mdev);
738 		if (!drbd_send_ov_request(mdev, sector, size)) {
739 			dec_rs_pending(mdev);
740 			return 0;
741 		}
742 		sector += BM_SECT_PER_BIT;
743 	}
744 	mdev->ov_position = sector;
745 
746  requeue:
747 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
748 	return 1;
749 }
750 
751 
/* Worker callback run when online verify is done: frees the work item,
 * prints the out-of-sync summary via ov_oos_print() and finishes the
 * resync.  Always returns 1. */
int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	/* the work item is released here */
	kfree(w);

	ov_oos_print(mdev);
	drbd_resync_finished(mdev);

	return 1;
}
760 
/* Worker callback queued by drbd_resync_finished() to retry itself:
 * frees the work item and calls drbd_resync_finished() again.
 * Always returns 1. */
static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	/* w was kmalloc()ed by drbd_resync_finished() */
	kfree(w);
	drbd_resync_finished(mdev);

	return 1;
}
769 
770 static void ping_peer(struct drbd_conf *mdev)
771 {
772 	clear_bit(GOT_PING_ACK, &mdev->flags);
773 	request_ping(mdev);
774 	wait_event(mdev->misc_wait,
775 		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
776 }
777 
/* Finish a resync or online verify run.
 *
 * Empties the resync LRU (re-queueing itself as w_resync_finished work if
 * that is not possible yet), logs the statistics, computes the new
 * disk/pdsk states and UUIDs, switches the connection state to
 * C_CONNECTED, resets the resync counters, syncs the meta data, and
 * finally calls the user-space helper if one was selected.
 *
 * Always returns 1.
 */
int drbd_resync_finished(struct drbd_conf *mdev)
{
	unsigned long db, dt, dbdt;
	unsigned long n_oos;
	union drbd_state os, ns;
	struct drbd_work *w;
	char *khelper_cmd = NULL;

	/* Remove all elements from the resync LRU. Since future actions
	 * might set bits in the (main) bitmap, then the entries in the
	 * resync LRU would be wrong. */
	if (drbd_rs_del_all(mdev)) {
		/* In case this is not possible now, most probably because
		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * is not finished by now).   Retry in 100ms. */

		drbd_kick_lo(mdev);
		__set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ / 10);
		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
		if (w) {
			/* w is freed by w_resync_finished() */
			w->cb = w_resync_finished;
			drbd_queue_work(&mdev->data.work, w);
			return 1;
		}
		/* allocation failed: log it and carry on with finishing anyway */
		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
	}

	/* dt: run time in seconds, not counting the paused time */
	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
	if (dt <= 0)
		dt = 1;	/* avoid the division by zero below */
	db = mdev->rs_total;
	dbdt = Bit2KB(db/dt);
	/* from here on rs_paused is in seconds, no longer in jiffies */
	mdev->rs_paused /= HZ;

	if (!get_ldev(mdev))
		goto out;

	ping_peer(mdev);

	spin_lock_irq(&mdev->req_lock);
	os = mdev->state;

	/* This protects us against multiple calls (that can happen in the presence
	   of application IO), and against connectivity loss just before we arrive here. */
	if (os.conn <= C_CONNECTED)
		goto out_unlock;

	ns = os;
	ns.conn = C_CONNECTED;

	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
	     (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
	     "Online verify " : "Resync",
	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);

	/* number of bits still set in the bitmap */
	n_oos = drbd_bm_total_weight(mdev);

	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
		if (n_oos) {
			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
			      n_oos, Bit2KB(1));
			khelper_cmd = "out-of-sync";
		}
	} else {
		/* after a resync, only the failed blocks may still be out of sync */
		D_ASSERT((n_oos - mdev->rs_failed) == 0);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
			khelper_cmd = "after-resync-target";

		if (mdev->csums_tfm && mdev->rs_total) {
			const unsigned long s = mdev->rs_same_csum;
			const unsigned long t = mdev->rs_total;
			/* percentage s/t; for large t divide first instead of
			 * multiplying s by 100 */
			const int ratio =
				(t == 0)     ? 0 :
			(t < 100000) ? ((s*100)/t) : (s/(t/100));
			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
			     "transferred %luK total %luK\n",
			     ratio,
			     Bit2KB(mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total));
		}
	}

	if (mdev->rs_failed) {
		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);

		/* with failed blocks, the sync target side stays D_INCONSISTENT */
		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			ns.disk = D_INCONSISTENT;
			ns.pdsk = D_UP_TO_DATE;
		} else {
			ns.disk = D_UP_TO_DATE;
			ns.pdsk = D_INCONSISTENT;
		}
	} else {
		ns.disk = D_UP_TO_DATE;
		ns.pdsk = D_UP_TO_DATE;

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			if (mdev->p_uuid) {
				/* as sync target, take over the UUIDs of the peer */
				int i;
				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
			} else {
				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
			}
		}

		drbd_uuid_set_bm(mdev, 0UL);

		if (mdev->p_uuid) {
			/* Now the two UUID sets are equal, update what we
			 * know of the peer. */
			int i;
			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
		}
	}

	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
out_unlock:
	spin_unlock_irq(&mdev->req_lock);
	put_ldev(mdev);
out:
	/* reset the per-run counters; also reached without a local disk */
	mdev->rs_total  = 0;
	mdev->rs_failed = 0;
	mdev->rs_paused = 0;
	mdev->ov_start_sector = 0;

	drbd_md_sync(mdev);

	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
		dev_info(DEV, "Writing the whole bitmap\n");
		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
	}

	/* khelper_cmd was chosen above, under the req_lock; the helper is
	 * called only here, after the lock was released */
	if (khelper_cmd)
		drbd_khelper(mdev, khelper_cmd);

	return 1;
}
923 
924 /* helper */
925 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
926 {
927 	if (drbd_ee_has_active_page(e)) {
928 		/* This might happen if sendpage() has not finished */
929 		int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
930 		atomic_add(i, &mdev->pp_in_use_by_net);
931 		atomic_sub(i, &mdev->pp_in_use);
932 		spin_lock_irq(&mdev->req_lock);
933 		list_add_tail(&e->w.list, &mdev->net_ee);
934 		spin_unlock_irq(&mdev->req_lock);
935 		wake_up(&drbd_pp_wait);
936 	} else
937 		drbd_free_ee(mdev, e);
938 }
939 
940 /**
941  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
942  * @mdev:	DRBD device.
943  * @w:		work object.
944  * @cancel:	The connection will be closed anyways
945  */
946 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
947 {
948 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
949 	int ok;
950 
951 	if (unlikely(cancel)) {
952 		drbd_free_ee(mdev, e);
953 		dec_unacked(mdev);
954 		return 1;
955 	}
956 
957 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
958 		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
959 	} else {
960 		if (__ratelimit(&drbd_ratelimit_state))
961 			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
962 			    (unsigned long long)e->sector);
963 
964 		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
965 	}
966 
967 	dec_unacked(mdev);
968 
969 	move_to_net_ee_or_free(mdev, e);
970 
971 	if (unlikely(!ok))
972 		dev_err(DEV, "drbd_send_block() failed\n");
973 	return ok;
974 }
975 
976 /**
977  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
978  * @mdev:	DRBD device.
979  * @w:		work object.
980  * @cancel:	The connection will be closed anyways
981  */
982 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
983 {
984 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
985 	int ok;
986 
987 	if (unlikely(cancel)) {
988 		drbd_free_ee(mdev, e);
989 		dec_unacked(mdev);
990 		return 1;
991 	}
992 
993 	if (get_ldev_if_state(mdev, D_FAILED)) {
994 		drbd_rs_complete_io(mdev, e->sector);
995 		put_ldev(mdev);
996 	}
997 
998 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
999 		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
1000 			inc_rs_pending(mdev);
1001 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1002 		} else {
1003 			if (__ratelimit(&drbd_ratelimit_state))
1004 				dev_err(DEV, "Not sending RSDataReply, "
1005 				    "partner DISKLESS!\n");
1006 			ok = 1;
1007 		}
1008 	} else {
1009 		if (__ratelimit(&drbd_ratelimit_state))
1010 			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1011 			    (unsigned long long)e->sector);
1012 
1013 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1014 
1015 		/* update resync data with failure */
1016 		drbd_rs_failed_io(mdev, e->sector, e->size);
1017 	}
1018 
1019 	dec_unacked(mdev);
1020 
1021 	move_to_net_ee_or_free(mdev, e);
1022 
1023 	if (unlikely(!ok))
1024 		dev_err(DEV, "drbd_send_block() failed\n");
1025 	return ok;
1026 }
1027 
1028 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1029 {
1030 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1031 	struct digest_info *di;
1032 	int digest_size;
1033 	void *digest = NULL;
1034 	int ok, eq = 0;
1035 
1036 	if (unlikely(cancel)) {
1037 		drbd_free_ee(mdev, e);
1038 		dec_unacked(mdev);
1039 		return 1;
1040 	}
1041 
1042 	if (get_ldev(mdev)) {
1043 		drbd_rs_complete_io(mdev, e->sector);
1044 		put_ldev(mdev);
1045 	}
1046 
1047 	di = e->digest;
1048 
1049 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1050 		/* quick hack to try to avoid a race against reconfiguration.
1051 		 * a real fix would be much more involved,
1052 		 * introducing more locking mechanisms */
1053 		if (mdev->csums_tfm) {
1054 			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1055 			D_ASSERT(digest_size == di->digest_size);
1056 			digest = kmalloc(digest_size, GFP_NOIO);
1057 		}
1058 		if (digest) {
1059 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1060 			eq = !memcmp(digest, di->digest, digest_size);
1061 			kfree(digest);
1062 		}
1063 
1064 		if (eq) {
1065 			drbd_set_in_sync(mdev, e->sector, e->size);
1066 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1067 			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1068 			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1069 		} else {
1070 			inc_rs_pending(mdev);
1071 			e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1072 			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1073 			kfree(di);
1074 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1075 		}
1076 	} else {
1077 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1078 		if (__ratelimit(&drbd_ratelimit_state))
1079 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1080 	}
1081 
1082 	dec_unacked(mdev);
1083 	move_to_net_ee_or_free(mdev, e);
1084 
1085 	if (unlikely(!ok))
1086 		dev_err(DEV, "drbd_send_block/ack() failed\n");
1087 	return ok;
1088 }
1089 
1090 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1091 {
1092 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1093 	int digest_size;
1094 	void *digest;
1095 	int ok = 1;
1096 
1097 	if (unlikely(cancel))
1098 		goto out;
1099 
1100 	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1101 		goto out;
1102 
1103 	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1104 	/* FIXME if this allocation fails, online verify will not terminate! */
1105 	digest = kmalloc(digest_size, GFP_NOIO);
1106 	if (digest) {
1107 		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1108 		inc_rs_pending(mdev);
1109 		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1110 					     digest, digest_size, P_OV_REPLY);
1111 		if (!ok)
1112 			dec_rs_pending(mdev);
1113 		kfree(digest);
1114 	}
1115 
1116 out:
1117 	drbd_free_ee(mdev, e);
1118 
1119 	dec_unacked(mdev);
1120 
1121 	return ok;
1122 }
1123 
1124 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1125 {
1126 	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1127 		mdev->ov_last_oos_size += size>>9;
1128 	} else {
1129 		mdev->ov_last_oos_start = sector;
1130 		mdev->ov_last_oos_size = size>>9;
1131 	}
1132 	drbd_set_out_of_sync(mdev, sector, size);
1133 	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1134 }
1135 
1136 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1137 {
1138 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1139 	struct digest_info *di;
1140 	int digest_size;
1141 	void *digest;
1142 	int ok, eq = 0;
1143 
1144 	if (unlikely(cancel)) {
1145 		drbd_free_ee(mdev, e);
1146 		dec_unacked(mdev);
1147 		return 1;
1148 	}
1149 
1150 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1151 	 * the resync lru has been cleaned up already */
1152 	if (get_ldev(mdev)) {
1153 		drbd_rs_complete_io(mdev, e->sector);
1154 		put_ldev(mdev);
1155 	}
1156 
1157 	di = e->digest;
1158 
1159 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1160 		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1161 		digest = kmalloc(digest_size, GFP_NOIO);
1162 		if (digest) {
1163 			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1164 
1165 			D_ASSERT(digest_size == di->digest_size);
1166 			eq = !memcmp(digest, di->digest, digest_size);
1167 			kfree(digest);
1168 		}
1169 	} else {
1170 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1171 		if (__ratelimit(&drbd_ratelimit_state))
1172 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1173 	}
1174 
1175 	dec_unacked(mdev);
1176 	if (!eq)
1177 		drbd_ov_oos_found(mdev, e->sector, e->size);
1178 	else
1179 		ov_oos_print(mdev);
1180 
1181 	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1182 			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1183 
1184 	drbd_free_ee(mdev, e);
1185 
1186 	if (--mdev->ov_left == 0) {
1187 		ov_oos_print(mdev);
1188 		drbd_resync_finished(mdev);
1189 	}
1190 
1191 	return ok;
1192 }
1193 
1194 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1195 {
1196 	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1197 	complete(&b->done);
1198 	return 1;
1199 }
1200 
1201 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1202 {
1203 	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1204 	struct p_barrier *p = &mdev->data.sbuf.barrier;
1205 	int ok = 1;
1206 
1207 	/* really avoid racing with tl_clear.  w.cb may have been referenced
1208 	 * just before it was reassigned and re-queued, so double check that.
1209 	 * actually, this race was harmless, since we only try to send the
1210 	 * barrier packet here, and otherwise do nothing with the object.
1211 	 * but compare with the head of w_clear_epoch */
1212 	spin_lock_irq(&mdev->req_lock);
1213 	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1214 		cancel = 1;
1215 	spin_unlock_irq(&mdev->req_lock);
1216 	if (cancel)
1217 		return 1;
1218 
1219 	if (!drbd_get_data_sock(mdev))
1220 		return 0;
1221 	p->barrier = b->br_number;
1222 	/* inc_ap_pending was done where this was queued.
1223 	 * dec_ap_pending will be done in got_BarrierAck
1224 	 * or (on connection loss) in w_clear_epoch.  */
1225 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1226 				(struct p_header80 *)p, sizeof(*p), 0);
1227 	drbd_put_data_sock(mdev);
1228 
1229 	return ok;
1230 }
1231 
1232 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1233 {
1234 	if (cancel)
1235 		return 1;
1236 	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1237 }
1238 
1239 /**
1240  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1241  * @mdev:	DRBD device.
1242  * @w:		work object.
1243  * @cancel:	The connection will be closed anyways
1244  */
1245 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1246 {
1247 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1248 	int ok;
1249 
1250 	if (unlikely(cancel)) {
1251 		req_mod(req, send_canceled);
1252 		return 1;
1253 	}
1254 
1255 	ok = drbd_send_dblock(mdev, req);
1256 	req_mod(req, ok ? handed_over_to_network : send_failed);
1257 
1258 	return ok;
1259 }
1260 
1261 /**
1262  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1263  * @mdev:	DRBD device.
1264  * @w:		work object.
1265  * @cancel:	The connection will be closed anyways
1266  */
1267 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1268 {
1269 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1270 	int ok;
1271 
1272 	if (unlikely(cancel)) {
1273 		req_mod(req, send_canceled);
1274 		return 1;
1275 	}
1276 
1277 	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1278 				(unsigned long)req);
1279 
1280 	if (!ok) {
1281 		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1282 		 * so this is probably redundant */
1283 		if (mdev->state.conn >= C_CONNECTED)
1284 			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1285 	}
1286 	req_mod(req, ok ? handed_over_to_network : send_failed);
1287 
1288 	return ok;
1289 }
1290 
1291 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1292 {
1293 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1294 
1295 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1296 		drbd_al_begin_io(mdev, req->sector);
1297 	/* Calling drbd_al_begin_io() out of the worker might deadlocks
1298 	   theoretically. Practically it can not deadlock, since this is
1299 	   only used when unfreezing IOs. All the extents of the requests
1300 	   that made it into the TL are already active */
1301 
1302 	drbd_req_make_private_bio(req, req->master_bio);
1303 	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1304 	generic_make_request(req->private_bio);
1305 
1306 	return 1;
1307 }
1308 
1309 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1310 {
1311 	struct drbd_conf *odev = mdev;
1312 
1313 	while (1) {
1314 		if (odev->sync_conf.after == -1)
1315 			return 1;
1316 		odev = minor_to_mdev(odev->sync_conf.after);
1317 		ERR_IF(!odev) return 1;
1318 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1319 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1320 		    odev->state.aftr_isp || odev->state.peer_isp ||
1321 		    odev->state.user_isp)
1322 			return 0;
1323 	}
1324 }
1325 
1326 /**
1327  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1328  * @mdev:	DRBD device.
1329  *
1330  * Called from process context only (admin command and after_state_ch).
1331  */
1332 static int _drbd_pause_after(struct drbd_conf *mdev)
1333 {
1334 	struct drbd_conf *odev;
1335 	int i, rv = 0;
1336 
1337 	for (i = 0; i < minor_count; i++) {
1338 		odev = minor_to_mdev(i);
1339 		if (!odev)
1340 			continue;
1341 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1342 			continue;
1343 		if (!_drbd_may_sync_now(odev))
1344 			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1345 			       != SS_NOTHING_TO_DO);
1346 	}
1347 
1348 	return rv;
1349 }
1350 
1351 /**
1352  * _drbd_resume_next() - Resume resync on all devices that may resync now
1353  * @mdev:	DRBD device.
1354  *
1355  * Called from process context only (admin command and worker).
1356  */
1357 static int _drbd_resume_next(struct drbd_conf *mdev)
1358 {
1359 	struct drbd_conf *odev;
1360 	int i, rv = 0;
1361 
1362 	for (i = 0; i < minor_count; i++) {
1363 		odev = minor_to_mdev(i);
1364 		if (!odev)
1365 			continue;
1366 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1367 			continue;
1368 		if (odev->state.aftr_isp) {
1369 			if (_drbd_may_sync_now(odev))
1370 				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1371 							CS_HARD, NULL)
1372 				       != SS_NOTHING_TO_DO) ;
1373 		}
1374 	}
1375 	return rv;
1376 }
1377 
1378 void resume_next_sg(struct drbd_conf *mdev)
1379 {
1380 	write_lock_irq(&global_state_lock);
1381 	_drbd_resume_next(mdev);
1382 	write_unlock_irq(&global_state_lock);
1383 }
1384 
1385 void suspend_other_sg(struct drbd_conf *mdev)
1386 {
1387 	write_lock_irq(&global_state_lock);
1388 	_drbd_pause_after(mdev);
1389 	write_unlock_irq(&global_state_lock);
1390 }
1391 
1392 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1393 {
1394 	struct drbd_conf *odev;
1395 
1396 	if (o_minor == -1)
1397 		return NO_ERROR;
1398 	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1399 		return ERR_SYNC_AFTER;
1400 
1401 	/* check for loops */
1402 	odev = minor_to_mdev(o_minor);
1403 	while (1) {
1404 		if (odev == mdev)
1405 			return ERR_SYNC_AFTER_CYCLE;
1406 
1407 		/* dependency chain ends here, no cycles. */
1408 		if (odev->sync_conf.after == -1)
1409 			return NO_ERROR;
1410 
1411 		/* follow the dependency chain */
1412 		odev = minor_to_mdev(odev->sync_conf.after);
1413 	}
1414 }
1415 
1416 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1417 {
1418 	int changes;
1419 	int retcode;
1420 
1421 	write_lock_irq(&global_state_lock);
1422 	retcode = sync_after_error(mdev, na);
1423 	if (retcode == NO_ERROR) {
1424 		mdev->sync_conf.after = na;
1425 		do {
1426 			changes  = _drbd_pause_after(mdev);
1427 			changes |= _drbd_resume_next(mdev);
1428 		} while (changes);
1429 	}
1430 	write_unlock_irq(&global_state_lock);
1431 	return retcode;
1432 }
1433 
1434 /**
1435  * drbd_start_resync() - Start the resync process
1436  * @mdev:	DRBD device.
1437  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1438  *
1439  * This function might bring you directly into one of the
1440  * C_PAUSED_SYNC_* states.
1441  */
1442 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1443 {
1444 	union drbd_state ns;
1445 	int r;
1446 
1447 	if (mdev->state.conn >= C_SYNC_SOURCE) {
1448 		dev_err(DEV, "Resync already running!\n");
1449 		return;
1450 	}
1451 
1452 	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1453 	drbd_rs_cancel_all(mdev);
1454 
1455 	if (side == C_SYNC_TARGET) {
1456 		/* Since application IO was locked out during C_WF_BITMAP_T and
1457 		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1458 		   we check that we might make the data inconsistent. */
1459 		r = drbd_khelper(mdev, "before-resync-target");
1460 		r = (r >> 8) & 0xff;
1461 		if (r > 0) {
1462 			dev_info(DEV, "before-resync-target handler returned %d, "
1463 			     "dropping connection.\n", r);
1464 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1465 			return;
1466 		}
1467 	}
1468 
1469 	drbd_state_lock(mdev);
1470 
1471 	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1472 		drbd_state_unlock(mdev);
1473 		return;
1474 	}
1475 
1476 	if (side == C_SYNC_TARGET) {
1477 		mdev->bm_resync_fo = 0;
1478 	} else /* side == C_SYNC_SOURCE */ {
1479 		u64 uuid;
1480 
1481 		get_random_bytes(&uuid, sizeof(u64));
1482 		drbd_uuid_set(mdev, UI_BITMAP, uuid);
1483 		drbd_send_sync_uuid(mdev, uuid);
1484 
1485 		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1486 	}
1487 
1488 	write_lock_irq(&global_state_lock);
1489 	ns = mdev->state;
1490 
1491 	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1492 
1493 	ns.conn = side;
1494 
1495 	if (side == C_SYNC_TARGET)
1496 		ns.disk = D_INCONSISTENT;
1497 	else /* side == C_SYNC_SOURCE */
1498 		ns.pdsk = D_INCONSISTENT;
1499 
1500 	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1501 	ns = mdev->state;
1502 
1503 	if (ns.conn < C_CONNECTED)
1504 		r = SS_UNKNOWN_ERROR;
1505 
1506 	if (r == SS_SUCCESS) {
1507 		unsigned long tw = drbd_bm_total_weight(mdev);
1508 		unsigned long now = jiffies;
1509 		int i;
1510 
1511 		mdev->rs_failed    = 0;
1512 		mdev->rs_paused    = 0;
1513 		mdev->rs_same_csum = 0;
1514 		mdev->rs_last_events = 0;
1515 		mdev->rs_last_sect_ev = 0;
1516 		mdev->rs_total     = tw;
1517 		mdev->rs_start     = now;
1518 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1519 			mdev->rs_mark_left[i] = tw;
1520 			mdev->rs_mark_time[i] = now;
1521 		}
1522 		_drbd_pause_after(mdev);
1523 	}
1524 	write_unlock_irq(&global_state_lock);
1525 	put_ldev(mdev);
1526 
1527 	if (r == SS_SUCCESS) {
1528 		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1529 		     drbd_conn_str(ns.conn),
1530 		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1531 		     (unsigned long) mdev->rs_total);
1532 
1533 		if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1534 			/* This still has a race (about when exactly the peers
1535 			 * detect connection loss) that can lead to a full sync
1536 			 * on next handshake. In 8.3.9 we fixed this with explicit
1537 			 * resync-finished notifications, but the fix
1538 			 * introduces a protocol change.  Sleeping for some
1539 			 * time longer than the ping interval + timeout on the
1540 			 * SyncSource, to give the SyncTarget the chance to
1541 			 * detect connection loss, then waiting for a ping
1542 			 * response (implicit in drbd_resync_finished) reduces
1543 			 * the race considerably, but does not solve it. */
1544 			if (side == C_SYNC_SOURCE)
1545 				schedule_timeout_interruptible(
1546 					mdev->net_conf->ping_int * HZ +
1547 					mdev->net_conf->ping_timeo*HZ/9);
1548 			drbd_resync_finished(mdev);
1549 		}
1550 
1551 		atomic_set(&mdev->rs_sect_in, 0);
1552 		atomic_set(&mdev->rs_sect_ev, 0);
1553 		mdev->rs_in_flight = 0;
1554 		mdev->rs_planed = 0;
1555 		spin_lock(&mdev->peer_seq_lock);
1556 		fifo_set(&mdev->rs_plan_s, 0);
1557 		spin_unlock(&mdev->peer_seq_lock);
1558 		/* ns.conn may already be != mdev->state.conn,
1559 		 * we may have been paused in between, or become paused until
1560 		 * the timer triggers.
1561 		 * No matter, that is handled in resync_timer_fn() */
1562 		if (ns.conn == C_SYNC_TARGET)
1563 			mod_timer(&mdev->resync_timer, jiffies);
1564 
1565 		drbd_md_sync(mdev);
1566 	}
1567 	drbd_state_unlock(mdev);
1568 }
1569 
1570 int drbd_worker(struct drbd_thread *thi)
1571 {
1572 	struct drbd_conf *mdev = thi->mdev;
1573 	struct drbd_work *w = NULL;
1574 	LIST_HEAD(work_list);
1575 	int intr = 0, i;
1576 
1577 	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1578 
1579 	while (get_t_state(thi) == Running) {
1580 		drbd_thread_current_set_cpu(mdev);
1581 
1582 		if (down_trylock(&mdev->data.work.s)) {
1583 			mutex_lock(&mdev->data.mutex);
1584 			if (mdev->data.socket && !mdev->net_conf->no_cork)
1585 				drbd_tcp_uncork(mdev->data.socket);
1586 			mutex_unlock(&mdev->data.mutex);
1587 
1588 			intr = down_interruptible(&mdev->data.work.s);
1589 
1590 			mutex_lock(&mdev->data.mutex);
1591 			if (mdev->data.socket  && !mdev->net_conf->no_cork)
1592 				drbd_tcp_cork(mdev->data.socket);
1593 			mutex_unlock(&mdev->data.mutex);
1594 		}
1595 
1596 		if (intr) {
1597 			D_ASSERT(intr == -EINTR);
1598 			flush_signals(current);
1599 			ERR_IF (get_t_state(thi) == Running)
1600 				continue;
1601 			break;
1602 		}
1603 
1604 		if (get_t_state(thi) != Running)
1605 			break;
1606 		/* With this break, we have done a down() but not consumed
1607 		   the entry from the list. The cleanup code takes care of
1608 		   this...   */
1609 
1610 		w = NULL;
1611 		spin_lock_irq(&mdev->data.work.q_lock);
1612 		ERR_IF(list_empty(&mdev->data.work.q)) {
1613 			/* something terribly wrong in our logic.
1614 			 * we were able to down() the semaphore,
1615 			 * but the list is empty... doh.
1616 			 *
1617 			 * what is the best thing to do now?
1618 			 * try again from scratch, restarting the receiver,
1619 			 * asender, whatnot? could break even more ugly,
1620 			 * e.g. when we are primary, but no good local data.
1621 			 *
1622 			 * I'll try to get away just starting over this loop.
1623 			 */
1624 			spin_unlock_irq(&mdev->data.work.q_lock);
1625 			continue;
1626 		}
1627 		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1628 		list_del_init(&w->list);
1629 		spin_unlock_irq(&mdev->data.work.q_lock);
1630 
1631 		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1632 			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1633 			if (mdev->state.conn >= C_CONNECTED)
1634 				drbd_force_state(mdev,
1635 						NS(conn, C_NETWORK_FAILURE));
1636 		}
1637 	}
1638 	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1639 	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1640 
1641 	spin_lock_irq(&mdev->data.work.q_lock);
1642 	i = 0;
1643 	while (!list_empty(&mdev->data.work.q)) {
1644 		list_splice_init(&mdev->data.work.q, &work_list);
1645 		spin_unlock_irq(&mdev->data.work.q_lock);
1646 
1647 		while (!list_empty(&work_list)) {
1648 			w = list_entry(work_list.next, struct drbd_work, list);
1649 			list_del_init(&w->list);
1650 			w->cb(mdev, w, 1);
1651 			i++; /* dead debugging code */
1652 		}
1653 
1654 		spin_lock_irq(&mdev->data.work.q_lock);
1655 	}
1656 	sema_init(&mdev->data.work.s, 0);
1657 	/* DANGEROUS race: if someone did queue his work within the spinlock,
1658 	 * but up() ed outside the spinlock, we could get an up() on the
1659 	 * semaphore without corresponding list entry.
1660 	 * So don't do that.
1661 	 */
1662 	spin_unlock_irq(&mdev->data.work.q_lock);
1663 
1664 	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1665 	/* _drbd_set_state only uses stop_nowait.
1666 	 * wait here for the Exiting receiver. */
1667 	drbd_thread_stop(&mdev->receiver);
1668 	drbd_mdev_cleanup(mdev);
1669 
1670 	dev_info(DEV, "worker terminated\n");
1671 
1672 	clear_bit(DEVICE_DYING, &mdev->flags);
1673 	clear_bit(CONFIG_PENDING, &mdev->flags);
1674 	wake_up(&mdev->state_wait);
1675 
1676 	return 0;
1677 }
1678