xref: /openbmc/linux/drivers/block/drbd/drbd_worker.c (revision bc571b8c)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24  */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/smp_lock.h>
30 #include <linux/wait.h>
31 #include <linux/mm.h>
32 #include <linux/memcontrol.h>
33 #include <linux/mm_inline.h>
34 #include <linux/slab.h>
35 #include <linux/random.h>
36 #include <linux/string.h>
37 #include <linux/scatterlist.h>
38 
39 #include "drbd_int.h"
40 #include "drbd_req.h"
41 
42 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
43 
44 
45 
46 /* defined here:
47    drbd_md_io_complete
48    drbd_endio_sec
49    drbd_endio_pri
50 
51  * more endio handlers:
52    atodb_endio in drbd_actlog.c
53    drbd_bm_async_io_complete in drbd_bitmap.c
54 
55  * For all these callbacks, note the following:
56  * The callbacks will be called in irq context by the IDE drivers,
57  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
58  * Try to get the locking right :)
59  *
60  */
61 
62 
63 /* About the global_state_lock
64    Each state transition on a device holds a read lock. In case we have
65    to evaluate the sync-after dependencies, we grab a write lock, because
66    we need stable states on all devices for that.  */
67 rwlock_t global_state_lock;
68 
69 /* used for synchronous meta data and bitmap IO
70  * submitted by drbd_md_sync_page_io()
71  */
72 void drbd_md_io_complete(struct bio *bio, int error)
73 {
74 	struct drbd_md_io *md_io;
75 
76 	md_io = (struct drbd_md_io *)bio->bi_private;
77 	md_io->error = error;
78 
79 	complete(&md_io->event);
80 }
81 
82 /* reads on behalf of the partner,
83  * "submitted" by the receiver
84  */
85 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
86 {
87 	unsigned long flags = 0;
88 	struct drbd_conf *mdev = e->mdev;
89 
90 	D_ASSERT(e->block_id != ID_VACANT);
91 
92 	spin_lock_irqsave(&mdev->req_lock, flags);
93 	mdev->read_cnt += e->size >> 9;
94 	list_del(&e->w.list);
95 	if (list_empty(&mdev->read_ee))
96 		wake_up(&mdev->ee_wait);
97 	if (test_bit(__EE_WAS_ERROR, &e->flags))
98 		__drbd_chk_io_error(mdev, FALSE);
99 	spin_unlock_irqrestore(&mdev->req_lock, flags);
100 
101 	drbd_queue_work(&mdev->data.work, &e->w);
102 	put_ldev(mdev);
103 }
104 
105 static int is_failed_barrier(int ee_flags)
106 {
107 	return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
108 			== (EE_IS_BARRIER|EE_WAS_ERROR);
109 }
110 
111 /* writes on behalf of the partner, or resync writes,
112  * "submitted" by the receiver, final stage.  */
113 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
114 {
115 	unsigned long flags = 0;
116 	struct drbd_conf *mdev = e->mdev;
117 	sector_t e_sector;
118 	int do_wake;
119 	int is_syncer_req;
120 	int do_al_complete_io;
121 
122 	/* if this is a failed barrier request, disable use of barriers,
123 	 * and schedule for resubmission */
124 	if (is_failed_barrier(e->flags)) {
125 		drbd_bump_write_ordering(mdev, WO_bdev_flush);
126 		spin_lock_irqsave(&mdev->req_lock, flags);
127 		list_del(&e->w.list);
128 		e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
129 		e->w.cb = w_e_reissue;
130 		/* put_ldev actually happens below, once we come here again. */
131 		__release(local);
132 		spin_unlock_irqrestore(&mdev->req_lock, flags);
133 		drbd_queue_work(&mdev->data.work, &e->w);
134 		return;
135 	}
136 
137 	D_ASSERT(e->block_id != ID_VACANT);
138 
139 	/* after we moved e to done_ee,
140 	 * we may no longer access it,
141 	 * it may be freed/reused already!
142 	 * (as soon as we release the req_lock) */
143 	e_sector = e->sector;
144 	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
145 	is_syncer_req = is_syncer_block_id(e->block_id);
146 
147 	spin_lock_irqsave(&mdev->req_lock, flags);
148 	mdev->writ_cnt += e->size >> 9;
149 	list_del(&e->w.list); /* has been on active_ee or sync_ee */
150 	list_add_tail(&e->w.list, &mdev->done_ee);
151 
152 	/* No hlist_del_init(&e->colision) here: we have not sent the Ack yet,
153 	 * nor have we woken possibly waiting conflicting requests.
154 	 * That is done from "drbd_process_done_ee" within the appropriate w.cb
155 	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
156 
157 	do_wake = is_syncer_req
158 		? list_empty(&mdev->sync_ee)
159 		: list_empty(&mdev->active_ee);
160 
161 	if (test_bit(__EE_WAS_ERROR, &e->flags))
162 		__drbd_chk_io_error(mdev, FALSE);
163 	spin_unlock_irqrestore(&mdev->req_lock, flags);
164 
165 	if (is_syncer_req)
166 		drbd_rs_complete_io(mdev, e_sector);
167 
168 	if (do_wake)
169 		wake_up(&mdev->ee_wait);
170 
171 	if (do_al_complete_io)
172 		drbd_al_complete_io(mdev, e_sector);
173 
174 	wake_asender(mdev);
175 	put_ldev(mdev);
176 }
177 
178 /* writes on behalf of the partner, or resync writes,
179  * "submitted" by the receiver.
180  */
181 void drbd_endio_sec(struct bio *bio, int error)
182 {
183 	struct drbd_epoch_entry *e = bio->bi_private;
184 	struct drbd_conf *mdev = e->mdev;
185 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
186 	int is_write = bio_data_dir(bio) == WRITE;
187 
188 	if (error)
189 		dev_warn(DEV, "%s: error=%d s=%llus\n",
190 				is_write ? "write" : "read", error,
191 				(unsigned long long)e->sector);
192 	if (!error && !uptodate) {
193 		dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
194 				is_write ? "write" : "read",
195 				(unsigned long long)e->sector);
196 		/* strange behavior of some lower level drivers...
197 		 * fail the request by clearing the uptodate flag,
198 		 * but do not return any error?! */
199 		error = -EIO;
200 	}
201 
202 	if (error)
203 		set_bit(__EE_WAS_ERROR, &e->flags);
204 
205 	bio_put(bio); /* no need for the bio anymore */
206 	if (atomic_dec_and_test(&e->pending_bios)) {
207 		if (is_write)
208 			drbd_endio_write_sec_final(e);
209 		else
210 			drbd_endio_read_sec_final(e);
211 	}
212 }
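/* Note on the completion path above: drbd_endio_sec() may run in irq or
 * softirq context, so it only records a possible error, drops its bio
 * reference and counts down e->pending_bios.  Only the completion that
 * brings pending_bios to zero does the heavier final work: the
 * *_sec_final() helpers take req_lock for the accounting, move the epoch
 * entry off read_ee/active_ee/sync_ee, and hand further processing to the
 * worker (reads) or to done_ee plus the asender (writes). */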
213 
214 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
215  */
216 void drbd_endio_pri(struct bio *bio, int error)
217 {
218 	struct drbd_request *req = bio->bi_private;
219 	struct drbd_conf *mdev = req->mdev;
220 	enum drbd_req_event what;
221 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
222 
223 	if (!error && !uptodate) {
224 		dev_warn(DEV, "p %s: setting error to -EIO\n",
225 			 bio_data_dir(bio) == WRITE ? "write" : "read");
226 		/* strange behavior of some lower level drivers...
227 		 * fail the request by clearing the uptodate flag,
228 		 * but do not return any error?! */
229 		error = -EIO;
230 	}
231 
232 	/* to avoid recursion in __req_mod */
233 	if (unlikely(error)) {
234 		what = (bio_data_dir(bio) == WRITE)
235 			? write_completed_with_error
236 			: (bio_rw(bio) == READ)
237 			  ? read_completed_with_error
238 			  : read_ahead_completed_with_error;
239 	} else
240 		what = completed_ok;
241 
242 	bio_put(req->private_bio);
243 	req->private_bio = ERR_PTR(error);
244 
245 	req_mod(req, what);
246 }
247 
248 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
249 {
250 	struct drbd_request *req = container_of(w, struct drbd_request, w);
251 
252 	/* We should not detach for read io-error,
253 	 * but try to WRITE the P_DATA_REPLY to the failed location,
254 	 * to give the disk the chance to relocate that block */
255 
256 	spin_lock_irq(&mdev->req_lock);
257 	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
258 		_req_mod(req, read_retry_remote_canceled);
259 		spin_unlock_irq(&mdev->req_lock);
260 		return 1;
261 	}
262 	spin_unlock_irq(&mdev->req_lock);
263 
264 	return w_send_read_req(mdev, w, 0);
265 }
266 
267 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
268 {
269 	ERR_IF(cancel) return 1;
270 	dev_err(DEV, "resync inactive, but callback triggered??\n");
271 	return 1; /* Simply ignore this! */
272 }
273 
274 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
275 {
276 	struct hash_desc desc;
277 	struct scatterlist sg;
278 	struct page *page = e->pages;
279 	struct page *tmp;
280 	unsigned len;
281 
282 	desc.tfm = tfm;
283 	desc.flags = 0;
284 
285 	sg_init_table(&sg, 1);
286 	crypto_hash_init(&desc);
287 
288 	while ((tmp = page_chain_next(page))) {
289 		/* all but the last page will be fully used */
290 		sg_set_page(&sg, page, PAGE_SIZE, 0);
291 		crypto_hash_update(&desc, &sg, sg.length);
292 		page = tmp;
293 	}
294 	/* and now the last, possibly only partially used page */
295 	len = e->size & (PAGE_SIZE - 1);
296 	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
297 	crypto_hash_update(&desc, &sg, sg.length);
298 	crypto_hash_final(&desc, digest);
299 }
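/* Illustration of the page-chain hashing above, assuming 4 KiB pages:
 * for e->size == 10240 (a chain of three pages) the loop hashes the first
 * two pages in full, and the final sg_set_page() hashes
 * len = 10240 & 4095 = 2048 bytes of the last page.  If e->size is an
 * exact multiple of PAGE_SIZE, len evaluates to 0 and the "len ?: PAGE_SIZE"
 * fallback hashes the last page in full. */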
300 
301 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
302 {
303 	struct hash_desc desc;
304 	struct scatterlist sg;
305 	struct bio_vec *bvec;
306 	int i;
307 
308 	desc.tfm = tfm;
309 	desc.flags = 0;
310 
311 	sg_init_table(&sg, 1);
312 	crypto_hash_init(&desc);
313 
314 	__bio_for_each_segment(bvec, bio, i, 0) {
315 		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
316 		crypto_hash_update(&desc, &sg, sg.length);
317 	}
318 	crypto_hash_final(&desc, digest);
319 }
320 
321 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
322 {
323 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
324 	int digest_size;
325 	void *digest;
326 	int ok;
327 
328 	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
329 
330 	if (unlikely(cancel)) {
331 		drbd_free_ee(mdev, e);
332 		return 1;
333 	}
334 
335 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
336 		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
337 		digest = kmalloc(digest_size, GFP_NOIO);
338 		if (digest) {
339 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
340 
341 			inc_rs_pending(mdev);
342 			ok = drbd_send_drequest_csum(mdev,
343 						     e->sector,
344 						     e->size,
345 						     digest,
346 						     digest_size,
347 						     P_CSUM_RS_REQUEST);
348 			kfree(digest);
349 		} else {
350 			dev_err(DEV, "kmalloc() of digest failed.\n");
351 			ok = 0;
352 		}
353 	} else
354 		ok = 1;
355 
356 	drbd_free_ee(mdev, e);
357 
358 	if (unlikely(!ok))
359 		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
360 	return ok;
361 }
362 
363 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
364 
365 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
366 {
367 	struct drbd_epoch_entry *e;
368 
369 	if (!get_ldev(mdev))
370 		return -EIO;
371 
372 	if (drbd_rs_should_slow_down(mdev))
373 		goto defer;
374 
375 	/* GFP_TRY, because if there is no memory available right now, this may
376 	 * be rescheduled for later. It is "only" background resync, after all. */
377 	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
378 	if (!e)
379 		goto defer;
380 
381 	e->w.cb = w_e_send_csum;
382 	spin_lock_irq(&mdev->req_lock);
383 	list_add(&e->w.list, &mdev->read_ee);
384 	spin_unlock_irq(&mdev->req_lock);
385 
386 	atomic_add(size >> 9, &mdev->rs_sect_ev);
387 	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
388 		return 0;
389 
390 	/* drbd_submit_ee currently fails for one reason only:
391 	 * not being able to allocate enough bios.
392 	 * Is dropping the connection going to help? */
393 	spin_lock_irq(&mdev->req_lock);
394 	list_del(&e->w.list);
395 	spin_unlock_irq(&mdev->req_lock);
396 
397 	drbd_free_ee(mdev, e);
398 defer:
399 	put_ldev(mdev);
400 	return -EAGAIN;
401 }
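/* read_for_csum() above is the SyncTarget side of checksum-based resync:
 * instead of requesting the data right away, it reads the local block and
 * queues w_e_send_csum(), so that only a digest travels in the
 * P_CSUM_RS_REQUEST and the peer can skip sending blocks that already
 * match. */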
402 
403 void resync_timer_fn(unsigned long data)
404 {
405 	struct drbd_conf *mdev = (struct drbd_conf *) data;
406 	int queue;
407 
408 	queue = 1;
409 	switch (mdev->state.conn) {
410 	case C_VERIFY_S:
411 		mdev->resync_work.cb = w_make_ov_request;
412 		break;
413 	case C_SYNC_TARGET:
414 		mdev->resync_work.cb = w_make_resync_request;
415 		break;
416 	default:
417 		queue = 0;
418 		mdev->resync_work.cb = w_resync_inactive;
419 	}
420 
421 	/* harmless race: list_empty outside data.work.q_lock */
422 	if (list_empty(&mdev->resync_work.list) && queue)
423 		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
424 }
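/* resync_timer_fn() picks the work callback that matches the current
 * connection state (online verify vs. resync target) and re-queues
 * mdev->resync_work on the worker.  The request generators below re-arm
 * the timer from their requeue paths, so this cycle repeats roughly every
 * SLEEP_TIME while a resync or online verify is active. */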
425 
426 static void fifo_set(struct fifo_buffer *fb, int value)
427 {
428 	int i;
429 
430 	for (i = 0; i < fb->size; i++)
431 		fb->values[i] = value;
432 }
433 
434 static int fifo_push(struct fifo_buffer *fb, int value)
435 {
436 	int ov;
437 
438 	ov = fb->values[fb->head_index];
439 	fb->values[fb->head_index++] = value;
440 
441 	if (fb->head_index >= fb->size)
442 		fb->head_index = 0;
443 
444 	return ov;
445 }
446 
447 static void fifo_add_val(struct fifo_buffer *fb, int value)
448 {
449 	int i;
450 
451 	for (i = 0; i < fb->size; i++)
452 		fb->values[i] += value;
453 }
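/* The fifo helpers above implement the resync controller's plan-ahead
 * ring buffer: one slot per upcoming SLEEP_TIME step.  fifo_add_val()
 * spreads a newly computed correction over all planned steps, and each
 * controller invocation pops the current step's share with
 * fifo_push(&fb, 0), which returns the value that falls out of the window
 * and reuses its slot for the newly planned step.  For example, with
 * fb->size == 4 and all slots at 0, fifo_add_val(fb, 8) yields
 * {8, 8, 8, 8}; the next fifo_push(fb, 0) then returns 8 and leaves
 * {0, 8, 8, 8} with head_index advanced by one. */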
454 
455 int drbd_rs_controller(struct drbd_conf *mdev)
456 {
457 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
458 	unsigned int want;     /* The number of sectors we want in the proxy */
459 	int req_sect; /* Number of sectors to request in this turn */
460 	int correction; /* Number of additional sectors we need in the proxy */
461 	int cps; /* correction per invocation of drbd_rs_controller() */
462 	int steps; /* Number of time steps to plan ahead */
463 	int curr_corr;
464 	int max_sect;
465 
466 	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
467 	mdev->rs_in_flight -= sect_in;
468 
469 	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
470 
471 	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
472 
473 	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
474 		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
475 	} else { /* normal path */
476 		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
477 			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
478 	}
479 
480 	correction = want - mdev->rs_in_flight - mdev->rs_planed;
481 
482 	/* Plan ahead */
483 	cps = correction / steps;
484 	fifo_add_val(&mdev->rs_plan_s, cps);
485 	mdev->rs_planed += cps * steps;
486 
487 	/* What we do in this step */
488 	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
489 	spin_unlock(&mdev->peer_seq_lock);
490 	mdev->rs_planed -= curr_corr;
491 
492 	req_sect = sect_in + curr_corr;
493 	if (req_sect < 0)
494 		req_sect = 0;
495 
496 	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
497 	if (req_sect > max_sect)
498 		req_sect = max_sect;
499 
500 	/*
501 	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
502 		 sect_in, mdev->rs_in_flight, want, correction,
503 		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
504 	*/
505 
506 	return req_sect;
507 }
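/* Worked example for the controller step above (illustrative numbers,
 * assuming SLEEP_TIME corresponds to 100 ms, i.e. HZ/10):
 * with sync_conf.rate = 10000 KiB/s the start-of-resync branch wants
 * (10000 * 2 * SLEEP_TIME) / HZ = 2000 sectors per planned step, times
 * "steps" slots in rs_plan_s.  The difference between that target and what
 * is already in flight or planned becomes "correction"; cps spreads it
 * evenly over all slots, and fifo_push() pops this step's share as
 * curr_corr.  The resulting req_sect = sect_in + curr_corr is finally
 * clamped to [0, (c_max_rate * 2 * SLEEP_TIME) / HZ]. */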
508 
509 int w_make_resync_request(struct drbd_conf *mdev,
510 		struct drbd_work *w, int cancel)
511 {
512 	unsigned long bit;
513 	sector_t sector;
514 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
515 	int max_segment_size;
516 	int number, rollback_i, size, pe, mx;
517 	int align, queued, sndbuf;
518 	int i = 0;
519 
520 	if (unlikely(cancel))
521 		return 1;
522 
523 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
524 		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
525 		return 0;
526 	}
527 
528 	if (mdev->state.conn != C_SYNC_TARGET)
529 		dev_err(DEV, "%s in w_make_resync_request\n",
530 			drbd_conn_str(mdev->state.conn));
531 
532 	if (mdev->rs_total == 0) {
533 		/* empty resync? */
534 		drbd_resync_finished(mdev);
535 		return 1;
536 	}
537 
538 	if (!get_ldev(mdev)) {
539 		/* Since we only need to access mdev->rsync, a
540 		   get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
541 		   continuing resync with a broken disk makes no sense at
542 		   all */
543 		dev_err(DEV, "Disk broke down during resync!\n");
544 		mdev->resync_work.cb = w_resync_inactive;
545 		return 1;
546 	}
547 
548 	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
549 	 * if it should be necessary */
550 	max_segment_size =
551 		mdev->agreed_pro_version < 94 ? queue_max_segment_size(mdev->rq_queue) :
552 		mdev->agreed_pro_version < 95 ?	DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_SEGMENT_SIZE;
553 
554 	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
555 		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
556 		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
557 	} else {
558 		mdev->c_sync_rate = mdev->sync_conf.rate;
559 		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
560 	}
561 
562 	/* Throttle resync on lower level disk activity, which may also be
563 	 * caused by application IO on Primary/SyncTarget.
564 	 * Keep this after the call to drbd_rs_controller, as it assumes
565 	 * it is called as precisely as possible every SLEEP_TIME,
566 	 * and would be confused otherwise. */
567 	if (drbd_rs_should_slow_down(mdev))
568 		goto requeue;
569 
570 	mutex_lock(&mdev->data.mutex);
571 	if (mdev->data.socket)
572 		mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
573 	else
574 		mx = 1;
575 	mutex_unlock(&mdev->data.mutex);
576 
577 	/* For resync rates >160MB/sec, allow more pending RS requests */
578 	if (number > mx)
579 		mx = number;
580 
581 	/* Limit the number of pending RS requests to no more than the peer's receive buffer */
582 	pe = atomic_read(&mdev->rs_pending_cnt);
583 	if ((pe + number) > mx) {
584 		number = mx - pe;
585 	}
586 
587 	for (i = 0; i < number; i++) {
588 		/* Stop generating RS requests when half of the send buffer is filled */
589 		mutex_lock(&mdev->data.mutex);
590 		if (mdev->data.socket) {
591 			queued = mdev->data.socket->sk->sk_wmem_queued;
592 			sndbuf = mdev->data.socket->sk->sk_sndbuf;
593 		} else {
594 			queued = 1;
595 			sndbuf = 0;
596 		}
597 		mutex_unlock(&mdev->data.mutex);
598 		if (queued > sndbuf / 2)
599 			goto requeue;
600 
601 next_sector:
602 		size = BM_BLOCK_SIZE;
603 		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
604 
605 		if (bit == -1UL) {
606 			mdev->bm_resync_fo = drbd_bm_bits(mdev);
607 			mdev->resync_work.cb = w_resync_inactive;
608 			put_ldev(mdev);
609 			return 1;
610 		}
611 
612 		sector = BM_BIT_TO_SECT(bit);
613 
614 		if (drbd_try_rs_begin_io(mdev, sector)) {
615 			mdev->bm_resync_fo = bit;
616 			goto requeue;
617 		}
618 		mdev->bm_resync_fo = bit + 1;
619 
620 		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
621 			drbd_rs_complete_io(mdev, sector);
622 			goto next_sector;
623 		}
624 
625 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
626 		/* try to find some adjacent bits.
627 		 * we stop once we already have the maximum request size.
628 		 *
629 		 * Additionally always align bigger requests, in order to
630 		 * be prepared for all stripe sizes of software RAIDs.
631 		 */
632 		align = 1;
633 		rollback_i = i;
634 		for (;;) {
635 			if (size + BM_BLOCK_SIZE > max_segment_size)
636 				break;
637 
638 			/* Be always aligned */
639 			if (sector & ((1<<(align+3))-1))
640 				break;
641 
642 			/* do not cross extent boundaries */
643 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
644 				break;
645 			/* now, is it actually dirty, after all?
646 			 * caution, drbd_bm_test_bit is tri-state for some
647 			 * obscure reason; ( b == 0 ) would get the out-of-band
648 			 * only accidentally right because of the "oddly sized"
649 			 * adjustment below */
650 			if (drbd_bm_test_bit(mdev, bit+1) != 1)
651 				break;
652 			bit++;
653 			size += BM_BLOCK_SIZE;
654 			if ((BM_BLOCK_SIZE << align) <= size)
655 				align++;
656 			i++;
657 		}
658 		/* if we merged some,
659 		 * reset the offset to start the next drbd_bm_find_next from */
660 		if (size > BM_BLOCK_SIZE)
661 			mdev->bm_resync_fo = bit + 1;
662 #endif
663 
664 		/* adjust very last sectors, in case we are oddly sized */
665 		if (sector + (size>>9) > capacity)
666 			size = (capacity-sector)<<9;
667 		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
668 			switch (read_for_csum(mdev, sector, size)) {
669 			case -EIO: /* Disk failure */
670 				put_ldev(mdev);
671 				return 0;
672 			case -EAGAIN: /* allocation failed, or ldev busy */
673 				drbd_rs_complete_io(mdev, sector);
674 				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
675 				i = rollback_i;
676 				goto requeue;
677 			case 0:
678 				/* everything ok */
679 				break;
680 			default:
681 				BUG();
682 			}
683 		} else {
684 			inc_rs_pending(mdev);
685 			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
686 					       sector, size, ID_SYNCER)) {
687 				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
688 				dec_rs_pending(mdev);
689 				put_ldev(mdev);
690 				return 0;
691 			}
692 		}
693 	}
694 
695 	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
696 		/* The last syncer _request_ was sent,
697 		 * but the P_RS_DATA_REPLY has not been received yet.  Sync will end (and
698 		 * the next sync group will resume) as soon as we receive the last
699 		 * resync data block and the last bit is cleared.
700 		 * Until then, resync "work" is "inactive" ...
701 		 */
702 		mdev->resync_work.cb = w_resync_inactive;
703 		put_ldev(mdev);
704 		return 1;
705 	}
706 
707  requeue:
708 	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
709 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
710 	put_ldev(mdev);
711 	return 1;
712 }
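/* Example of the bit-merging loop in w_make_resync_request() above,
 * assuming the usual 4 KiB BM_BLOCK_SIZE (8 sectors per bitmap bit):
 * starting at a dirty bit whose sector is 16-sector aligned, a second
 * adjacent dirty bit is merged into one 8 KiB request; from then on the
 * required start alignment doubles each time the request size doubles
 * (the "sector & ((1 << (align + 3)) - 1)" check), so large requests stay
 * aligned for typical software-RAID stripe sizes.  Merging also stops at
 * resync-extent boundaries, at the first clean bit, and once
 * max_segment_size would be exceeded. */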
713 
714 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
715 {
716 	int number, i, size;
717 	sector_t sector;
718 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
719 
720 	if (unlikely(cancel))
721 		return 1;
722 
723 	if (unlikely(mdev->state.conn < C_CONNECTED)) {
724 		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
725 		return 0;
726 	}
727 
728 	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
729 	if (atomic_read(&mdev->rs_pending_cnt) > number)
730 		goto requeue;
731 
732 	number -= atomic_read(&mdev->rs_pending_cnt);
733 
734 	sector = mdev->ov_position;
735 	for (i = 0; i < number; i++) {
736 		if (sector >= capacity) {
737 			mdev->resync_work.cb = w_resync_inactive;
738 			return 1;
739 		}
740 
741 		size = BM_BLOCK_SIZE;
742 
743 		if (drbd_try_rs_begin_io(mdev, sector)) {
744 			mdev->ov_position = sector;
745 			goto requeue;
746 		}
747 
748 		if (sector + (size>>9) > capacity)
749 			size = (capacity-sector)<<9;
750 
751 		inc_rs_pending(mdev);
752 		if (!drbd_send_ov_request(mdev, sector, size)) {
753 			dec_rs_pending(mdev);
754 			return 0;
755 		}
756 		sector += BM_SECT_PER_BIT;
757 	}
758 	mdev->ov_position = sector;
759 
760  requeue:
761 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
762 	return 1;
763 }
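/* w_make_ov_request() above is the online-verify counterpart of the resync
 * request generator: it walks linearly from ov_position, issues
 * P_OV_REQUESTs limited by the configured rate and the current
 * rs_pending_cnt, and relies on the same drbd_try_rs_begin_io()/requeue
 * mechanism for throttling. */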
764 
765 
766 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
767 {
768 	kfree(w);
769 	ov_oos_print(mdev);
770 	drbd_resync_finished(mdev);
771 
772 	return 1;
773 }
774 
775 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
776 {
777 	kfree(w);
778 
779 	drbd_resync_finished(mdev);
780 
781 	return 1;
782 }
783 
784 static void ping_peer(struct drbd_conf *mdev)
785 {
786 	clear_bit(GOT_PING_ACK, &mdev->flags);
787 	request_ping(mdev);
788 	wait_event(mdev->misc_wait,
789 		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
790 }
791 
792 int drbd_resync_finished(struct drbd_conf *mdev)
793 {
794 	unsigned long db, dt, dbdt;
795 	unsigned long n_oos;
796 	union drbd_state os, ns;
797 	struct drbd_work *w;
798 	char *khelper_cmd = NULL;
799 
800 	/* Remove all elements from the resync LRU. Since future actions
801 	 * might set bits in the (main) bitmap, the entries in the
802 	 * resync LRU would otherwise be wrong. */
803 	if (drbd_rs_del_all(mdev)) {
804 		/* In case this is not possible now, most probably because
805 		 * there are P_RS_DATA_REPLY packets lingering on the worker's
806 		 * queue (or even the read operations for those packets
807 		 * are not finished by now), retry in 100 ms. */
808 
809 		drbd_kick_lo(mdev);
810 		__set_current_state(TASK_INTERRUPTIBLE);
811 		schedule_timeout(HZ / 10);
812 		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
813 		if (w) {
814 			w->cb = w_resync_finished;
815 			drbd_queue_work(&mdev->data.work, w);
816 			return 1;
817 		}
818 		dev_err(DEV, "Both drbd_rs_del_all() and kmalloc(w) failed.\n");
819 	}
820 
821 	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
822 	if (dt <= 0)
823 		dt = 1;
824 	db = mdev->rs_total;
825 	dbdt = Bit2KB(db/dt);
826 	mdev->rs_paused /= HZ;
827 
828 	if (!get_ldev(mdev))
829 		goto out;
830 
831 	ping_peer(mdev);
832 
833 	spin_lock_irq(&mdev->req_lock);
834 	os = mdev->state;
835 
836 	/* This protects us against multiple calls (that can happen in the presence
837 	   of application IO), and against connectivity loss just before we arrive here. */
838 	if (os.conn <= C_CONNECTED)
839 		goto out_unlock;
840 
841 	ns = os;
842 	ns.conn = C_CONNECTED;
843 
844 	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
845 	     (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
846 	     "Online verify " : "Resync",
847 	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
848 
849 	n_oos = drbd_bm_total_weight(mdev);
850 
851 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
852 		if (n_oos) {
853 			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
854 			      n_oos, Bit2KB(1));
855 			khelper_cmd = "out-of-sync";
856 		}
857 	} else {
858 		D_ASSERT((n_oos - mdev->rs_failed) == 0);
859 
860 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
861 			khelper_cmd = "after-resync-target";
862 
863 		if (mdev->csums_tfm && mdev->rs_total) {
864 			const unsigned long s = mdev->rs_same_csum;
865 			const unsigned long t = mdev->rs_total;
866 			const int ratio =
867 				(t == 0)     ? 0 :
868 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
869 			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
870 			     "transferred %luK total %luK\n",
871 			     ratio,
872 			     Bit2KB(mdev->rs_same_csum),
873 			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
874 			     Bit2KB(mdev->rs_total));
875 		}
876 	}
877 
878 	if (mdev->rs_failed) {
879 		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
880 
881 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
882 			ns.disk = D_INCONSISTENT;
883 			ns.pdsk = D_UP_TO_DATE;
884 		} else {
885 			ns.disk = D_UP_TO_DATE;
886 			ns.pdsk = D_INCONSISTENT;
887 		}
888 	} else {
889 		ns.disk = D_UP_TO_DATE;
890 		ns.pdsk = D_UP_TO_DATE;
891 
892 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
893 			if (mdev->p_uuid) {
894 				int i;
895 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
896 					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
897 				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
898 				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
899 			} else {
900 				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
901 			}
902 		}
903 
904 		drbd_uuid_set_bm(mdev, 0UL);
905 
906 		if (mdev->p_uuid) {
907 			/* Now the two UUID sets are equal, update what we
908 			 * know of the peer. */
909 			int i;
910 			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
911 				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
912 		}
913 	}
914 
915 	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
916 out_unlock:
917 	spin_unlock_irq(&mdev->req_lock);
918 	put_ldev(mdev);
919 out:
920 	mdev->rs_total  = 0;
921 	mdev->rs_failed = 0;
922 	mdev->rs_paused = 0;
923 	mdev->ov_start_sector = 0;
924 
925 	drbd_md_sync(mdev);
926 
927 	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
928 		dev_info(DEV, "Writing the whole bitmap\n");
929 		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
930 	}
931 
932 	if (khelper_cmd)
933 		drbd_khelper(mdev, khelper_cmd);
934 
935 	return 1;
936 }
937 
938 /* helper */
939 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
940 {
941 	if (drbd_ee_has_active_page(e)) {
942 		/* This might happen if sendpage() has not finished */
943 		int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
944 		atomic_add(i, &mdev->pp_in_use_by_net);
945 		atomic_sub(i, &mdev->pp_in_use);
946 		spin_lock_irq(&mdev->req_lock);
947 		list_add_tail(&e->w.list, &mdev->net_ee);
948 		spin_unlock_irq(&mdev->req_lock);
949 		wake_up(&drbd_pp_wait);
950 	} else
951 		drbd_free_ee(mdev, e);
952 }
953 
954 /**
955  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
956  * @mdev:	DRBD device.
957  * @w:		work object.
958  * @cancel:	The connection will be closed anyway
959  */
960 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
961 {
962 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
963 	int ok;
964 
965 	if (unlikely(cancel)) {
966 		drbd_free_ee(mdev, e);
967 		dec_unacked(mdev);
968 		return 1;
969 	}
970 
971 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
972 		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
973 	} else {
974 		if (__ratelimit(&drbd_ratelimit_state))
975 			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
976 			    (unsigned long long)e->sector);
977 
978 		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
979 	}
980 
981 	dec_unacked(mdev);
982 
983 	move_to_net_ee_or_free(mdev, e);
984 
985 	if (unlikely(!ok))
986 		dev_err(DEV, "drbd_send_block() failed\n");
987 	return ok;
988 }
989 
990 /**
991  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
992  * @mdev:	DRBD device.
993  * @w:		work object.
994  * @cancel:	The connection will be closed anyway
995  */
996 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
997 {
998 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
999 	int ok;
1000 
1001 	if (unlikely(cancel)) {
1002 		drbd_free_ee(mdev, e);
1003 		dec_unacked(mdev);
1004 		return 1;
1005 	}
1006 
1007 	if (get_ldev_if_state(mdev, D_FAILED)) {
1008 		drbd_rs_complete_io(mdev, e->sector);
1009 		put_ldev(mdev);
1010 	}
1011 
1012 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1013 		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
1014 			inc_rs_pending(mdev);
1015 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1016 		} else {
1017 			if (__ratelimit(&drbd_ratelimit_state))
1018 				dev_err(DEV, "Not sending RSDataReply, "
1019 				    "partner DISKLESS!\n");
1020 			ok = 1;
1021 		}
1022 	} else {
1023 		if (__ratelimit(&drbd_ratelimit_state))
1024 			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1025 			    (unsigned long long)e->sector);
1026 
1027 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1028 
1029 		/* update resync data with failure */
1030 		drbd_rs_failed_io(mdev, e->sector, e->size);
1031 	}
1032 
1033 	dec_unacked(mdev);
1034 
1035 	move_to_net_ee_or_free(mdev, e);
1036 
1037 	if (unlikely(!ok))
1038 		dev_err(DEV, "drbd_send_block() failed\n");
1039 	return ok;
1040 }
1041 
1042 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1043 {
1044 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1045 	struct digest_info *di;
1046 	int digest_size;
1047 	void *digest = NULL;
1048 	int ok, eq = 0;
1049 
1050 	if (unlikely(cancel)) {
1051 		drbd_free_ee(mdev, e);
1052 		dec_unacked(mdev);
1053 		return 1;
1054 	}
1055 
1056 	if (get_ldev(mdev)) {
1057 		drbd_rs_complete_io(mdev, e->sector);
1058 		put_ldev(mdev);
1059 	}
1060 
1061 	di = e->digest;
1062 
1063 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1064 		/* quick hack to try to avoid a race against reconfiguration.
1065 		 * a real fix would be much more involved,
1066 		 * introducing more locking mechanisms */
1067 		if (mdev->csums_tfm) {
1068 			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1069 			D_ASSERT(digest_size == di->digest_size);
1070 			digest = kmalloc(digest_size, GFP_NOIO);
1071 		}
1072 		if (digest) {
1073 			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1074 			eq = !memcmp(digest, di->digest, digest_size);
1075 			kfree(digest);
1076 		}
1077 
1078 		if (eq) {
1079 			drbd_set_in_sync(mdev, e->sector, e->size);
1080 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1081 			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1082 			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1083 		} else {
1084 			inc_rs_pending(mdev);
1085 			e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1086 			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1087 			kfree(di);
1088 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1089 		}
1090 	} else {
1091 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1092 		if (__ratelimit(&drbd_ratelimit_state))
1093 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1094 	}
1095 
1096 	dec_unacked(mdev);
1097 	move_to_net_ee_or_free(mdev, e);
1098 
1099 	if (unlikely(!ok))
1100 		dev_err(DEV, "drbd_send_block/ack() failed\n");
1101 	return ok;
1102 }
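/* The checksum-based resync path above is what makes csums-alg worthwhile:
 * if the locally computed digest matches the one the peer sent, the block
 * is marked in sync and only a small P_RS_IS_IN_SYNC ack goes back; only
 * on a mismatch is the full block shipped as P_RS_DATA_REPLY. */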
1103 
1104 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1105 {
1106 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1107 	int digest_size;
1108 	void *digest;
1109 	int ok = 1;
1110 
1111 	if (unlikely(cancel))
1112 		goto out;
1113 
1114 	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1115 		goto out;
1116 
1117 	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1118 	/* FIXME if this allocation fails, online verify will not terminate! */
1119 	digest = kmalloc(digest_size, GFP_NOIO);
1120 	if (digest) {
1121 		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1122 		inc_rs_pending(mdev);
1123 		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1124 					     digest, digest_size, P_OV_REPLY);
1125 		if (!ok)
1126 			dec_rs_pending(mdev);
1127 		kfree(digest);
1128 	}
1129 
1130 out:
1131 	drbd_free_ee(mdev, e);
1132 
1133 	dec_unacked(mdev);
1134 
1135 	return ok;
1136 }
1137 
1138 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1139 {
1140 	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1141 		mdev->ov_last_oos_size += size>>9;
1142 	} else {
1143 		mdev->ov_last_oos_start = sector;
1144 		mdev->ov_last_oos_size = size>>9;
1145 	}
1146 	drbd_set_out_of_sync(mdev, sector, size);
1147 	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1148 }
1149 
1150 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1151 {
1152 	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1153 	struct digest_info *di;
1154 	int digest_size;
1155 	void *digest;
1156 	int ok, eq = 0;
1157 
1158 	if (unlikely(cancel)) {
1159 		drbd_free_ee(mdev, e);
1160 		dec_unacked(mdev);
1161 		return 1;
1162 	}
1163 
1164 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1165 	 * the resync lru has been cleaned up already */
1166 	if (get_ldev(mdev)) {
1167 		drbd_rs_complete_io(mdev, e->sector);
1168 		put_ldev(mdev);
1169 	}
1170 
1171 	di = e->digest;
1172 
1173 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1174 		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1175 		digest = kmalloc(digest_size, GFP_NOIO);
1176 		if (digest) {
1177 			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1178 
1179 			D_ASSERT(digest_size == di->digest_size);
1180 			eq = !memcmp(digest, di->digest, digest_size);
1181 			kfree(digest);
1182 		}
1183 	} else {
1184 		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1185 		if (__ratelimit(&drbd_ratelimit_state))
1186 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1187 	}
1188 
1189 	dec_unacked(mdev);
1190 	if (!eq)
1191 		drbd_ov_oos_found(mdev, e->sector, e->size);
1192 	else
1193 		ov_oos_print(mdev);
1194 
1195 	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1196 			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1197 
1198 	drbd_free_ee(mdev, e);
1199 
1200 	if (--mdev->ov_left == 0) {
1201 		ov_oos_print(mdev);
1202 		drbd_resync_finished(mdev);
1203 	}
1204 
1205 	return ok;
1206 }
1207 
1208 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1209 {
1210 	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1211 	complete(&b->done);
1212 	return 1;
1213 }
1214 
1215 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1216 {
1217 	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1218 	struct p_barrier *p = &mdev->data.sbuf.barrier;
1219 	int ok = 1;
1220 
1221 	/* really avoid racing with tl_clear.  w.cb may have been referenced
1222 	 * just before it was reassigned and re-queued, so double check that.
1223 	 * actually, this race was harmless, since we only try to send the
1224 	 * barrier packet here, and otherwise do nothing with the object.
1225 	 * but compare with the head of w_clear_epoch */
1226 	spin_lock_irq(&mdev->req_lock);
1227 	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1228 		cancel = 1;
1229 	spin_unlock_irq(&mdev->req_lock);
1230 	if (cancel)
1231 		return 1;
1232 
1233 	if (!drbd_get_data_sock(mdev))
1234 		return 0;
1235 	p->barrier = b->br_number;
1236 	/* inc_ap_pending was done where this was queued.
1237 	 * dec_ap_pending will be done in got_BarrierAck
1238 	 * or (on connection loss) in w_clear_epoch.  */
1239 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1240 				(struct p_header80 *)p, sizeof(*p), 0);
1241 	drbd_put_data_sock(mdev);
1242 
1243 	return ok;
1244 }
1245 
1246 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1247 {
1248 	if (cancel)
1249 		return 1;
1250 	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1251 }
1252 
1253 /**
1254  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1255  * @mdev:	DRBD device.
1256  * @w:		work object.
1257  * @cancel:	The connection will be closed anyway
1258  */
1259 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1260 {
1261 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1262 	int ok;
1263 
1264 	if (unlikely(cancel)) {
1265 		req_mod(req, send_canceled);
1266 		return 1;
1267 	}
1268 
1269 	ok = drbd_send_dblock(mdev, req);
1270 	req_mod(req, ok ? handed_over_to_network : send_failed);
1271 
1272 	return ok;
1273 }
1274 
1275 /**
1276  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1277  * @mdev:	DRBD device.
1278  * @w:		work object.
1279  * @cancel:	The connection will be closed anyway
1280  */
1281 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1282 {
1283 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1284 	int ok;
1285 
1286 	if (unlikely(cancel)) {
1287 		req_mod(req, send_canceled);
1288 		return 1;
1289 	}
1290 
1291 	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1292 				(unsigned long)req);
1293 
1294 	if (!ok) {
1295 		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1296 		 * so this is probably redundant */
1297 		if (mdev->state.conn >= C_CONNECTED)
1298 			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1299 	}
1300 	req_mod(req, ok ? handed_over_to_network : send_failed);
1301 
1302 	return ok;
1303 }
1304 
1305 int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1306 {
1307 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1308 
1309 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1310 		drbd_al_begin_io(mdev, req->sector);
1311 	/* Calling drbd_al_begin_io() out of the worker might deadlock
1312 	   theoretically. Practically it cannot deadlock, since this is
1313 	   only used when unfreezing IOs. All the extents of the requests
1314 	   that made it into the TL are already active */
1315 
1316 	drbd_req_make_private_bio(req, req->master_bio);
1317 	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1318 	generic_make_request(req->private_bio);
1319 
1320 	return 1;
1321 }
1322 
1323 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1324 {
1325 	struct drbd_conf *odev = mdev;
1326 
1327 	while (1) {
1328 		if (odev->sync_conf.after == -1)
1329 			return 1;
1330 		odev = minor_to_mdev(odev->sync_conf.after);
1331 		ERR_IF(!odev) return 1;
1332 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1333 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1334 		    odev->state.aftr_isp || odev->state.peer_isp ||
1335 		    odev->state.user_isp)
1336 			return 0;
1337 	}
1338 }
1339 
1340 /**
1341  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1342  * @mdev:	DRBD device.
1343  *
1344  * Called from process context only (admin command and after_state_ch).
1345  */
1346 static int _drbd_pause_after(struct drbd_conf *mdev)
1347 {
1348 	struct drbd_conf *odev;
1349 	int i, rv = 0;
1350 
1351 	for (i = 0; i < minor_count; i++) {
1352 		odev = minor_to_mdev(i);
1353 		if (!odev)
1354 			continue;
1355 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1356 			continue;
1357 		if (!_drbd_may_sync_now(odev))
1358 			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1359 			       != SS_NOTHING_TO_DO);
1360 	}
1361 
1362 	return rv;
1363 }
1364 
1365 /**
1366  * _drbd_resume_next() - Resume resync on all devices that may resync now
1367  * @mdev:	DRBD device.
1368  *
1369  * Called from process context only (admin command and worker).
1370  */
1371 static int _drbd_resume_next(struct drbd_conf *mdev)
1372 {
1373 	struct drbd_conf *odev;
1374 	int i, rv = 0;
1375 
1376 	for (i = 0; i < minor_count; i++) {
1377 		odev = minor_to_mdev(i);
1378 		if (!odev)
1379 			continue;
1380 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1381 			continue;
1382 		if (odev->state.aftr_isp) {
1383 			if (_drbd_may_sync_now(odev))
1384 				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1385 							CS_HARD, NULL)
1386 				       != SS_NOTHING_TO_DO) ;
1387 		}
1388 	}
1389 	return rv;
1390 }
1391 
1392 void resume_next_sg(struct drbd_conf *mdev)
1393 {
1394 	write_lock_irq(&global_state_lock);
1395 	_drbd_resume_next(mdev);
1396 	write_unlock_irq(&global_state_lock);
1397 }
1398 
1399 void suspend_other_sg(struct drbd_conf *mdev)
1400 {
1401 	write_lock_irq(&global_state_lock);
1402 	_drbd_pause_after(mdev);
1403 	write_unlock_irq(&global_state_lock);
1404 }
1405 
1406 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1407 {
1408 	struct drbd_conf *odev;
1409 
1410 	if (o_minor == -1)
1411 		return NO_ERROR;
1412 	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1413 		return ERR_SYNC_AFTER;
1414 
1415 	/* check for loops */
1416 	odev = minor_to_mdev(o_minor);
1417 	while (1) {
1418 		if (odev == mdev)
1419 			return ERR_SYNC_AFTER_CYCLE;
1420 
1421 		/* dependency chain ends here, no cycles. */
1422 		if (odev->sync_conf.after == -1)
1423 			return NO_ERROR;
1424 
1425 		/* follow the dependency chain */
1426 		odev = minor_to_mdev(odev->sync_conf.after);
1427 	}
1428 }
1429 
1430 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1431 {
1432 	int changes;
1433 	int retcode;
1434 
1435 	write_lock_irq(&global_state_lock);
1436 	retcode = sync_after_error(mdev, na);
1437 	if (retcode == NO_ERROR) {
1438 		mdev->sync_conf.after = na;
1439 		do {
1440 			changes  = _drbd_pause_after(mdev);
1441 			changes |= _drbd_resume_next(mdev);
1442 		} while (changes);
1443 	}
1444 	write_unlock_irq(&global_state_lock);
1445 	return retcode;
1446 }
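/* drbd_alter_sa() above changes the sync-after dependency under the
 * global_state_lock write lock and then alternates _drbd_pause_after()
 * and _drbd_resume_next() until neither changes any state, i.e. until the
 * aftr_isp flags of all minors have reached a fixpoint consistent with
 * the new dependency chain. */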
1447 
1448 /**
1449  * drbd_start_resync() - Start the resync process
1450  * @mdev:	DRBD device.
1451  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1452  *
1453  * This function might bring you directly into one of the
1454  * C_PAUSED_SYNC_* states.
1455  */
1456 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1457 {
1458 	union drbd_state ns;
1459 	int r;
1460 
1461 	if (mdev->state.conn >= C_SYNC_SOURCE) {
1462 		dev_err(DEV, "Resync already running!\n");
1463 		return;
1464 	}
1465 
1466 	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1467 	drbd_rs_cancel_all(mdev);
1468 
1469 	if (side == C_SYNC_TARGET) {
1470 		/* Since application IO was locked out during C_WF_BITMAP_T and
1471 		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1472 		   we check whether we may make the data inconsistent. */
1473 		r = drbd_khelper(mdev, "before-resync-target");
1474 		r = (r >> 8) & 0xff;
1475 		if (r > 0) {
1476 			dev_info(DEV, "before-resync-target handler returned %d, "
1477 			     "dropping connection.\n", r);
1478 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1479 			return;
1480 		}
1481 	}
1482 
1483 	drbd_state_lock(mdev);
1484 
1485 	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1486 		drbd_state_unlock(mdev);
1487 		return;
1488 	}
1489 
1490 	if (side == C_SYNC_TARGET) {
1491 		mdev->bm_resync_fo = 0;
1492 	} else /* side == C_SYNC_SOURCE */ {
1493 		u64 uuid;
1494 
1495 		get_random_bytes(&uuid, sizeof(u64));
1496 		drbd_uuid_set(mdev, UI_BITMAP, uuid);
1497 		drbd_send_sync_uuid(mdev, uuid);
1498 
1499 		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1500 	}
1501 
1502 	write_lock_irq(&global_state_lock);
1503 	ns = mdev->state;
1504 
1505 	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1506 
1507 	ns.conn = side;
1508 
1509 	if (side == C_SYNC_TARGET)
1510 		ns.disk = D_INCONSISTENT;
1511 	else /* side == C_SYNC_SOURCE */
1512 		ns.pdsk = D_INCONSISTENT;
1513 
1514 	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1515 	ns = mdev->state;
1516 
1517 	if (ns.conn < C_CONNECTED)
1518 		r = SS_UNKNOWN_ERROR;
1519 
1520 	if (r == SS_SUCCESS) {
1521 		unsigned long tw = drbd_bm_total_weight(mdev);
1522 		unsigned long now = jiffies;
1523 		int i;
1524 
1525 		mdev->rs_failed    = 0;
1526 		mdev->rs_paused    = 0;
1527 		mdev->rs_same_csum = 0;
1528 		mdev->rs_last_events = 0;
1529 		mdev->rs_last_sect_ev = 0;
1530 		mdev->rs_total     = tw;
1531 		mdev->rs_start     = now;
1532 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1533 			mdev->rs_mark_left[i] = tw;
1534 			mdev->rs_mark_time[i] = now;
1535 		}
1536 		_drbd_pause_after(mdev);
1537 	}
1538 	write_unlock_irq(&global_state_lock);
1539 	put_ldev(mdev);
1540 
1541 	if (r == SS_SUCCESS) {
1542 		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1543 		     drbd_conn_str(ns.conn),
1544 		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1545 		     (unsigned long) mdev->rs_total);
1546 
1547 		if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1548 			/* This still has a race (about when exactly the peers
1549 			 * detect connection loss) that can lead to a full sync
1550 			 * on next handshake. In 8.3.9 we fixed this with explicit
1551 			 * resync-finished notifications, but the fix
1552 			 * introduces a protocol change.  Sleeping for some
1553 			 * time longer than the ping interval + timeout on the
1554 			 * SyncSource, to give the SyncTarget the chance to
1555 			 * detect connection loss, then waiting for a ping
1556 			 * response (implicit in drbd_resync_finished) reduces
1557 			 * the race considerably, but does not solve it. */
1558 			if (side == C_SYNC_SOURCE)
1559 				schedule_timeout_interruptible(
1560 					mdev->net_conf->ping_int * HZ +
1561 					mdev->net_conf->ping_timeo*HZ/9);
1562 			drbd_resync_finished(mdev);
1563 		}
1564 
1565 		atomic_set(&mdev->rs_sect_in, 0);
1566 		atomic_set(&mdev->rs_sect_ev, 0);
1567 		mdev->rs_in_flight = 0;
1568 		mdev->rs_planed = 0;
1569 		spin_lock(&mdev->peer_seq_lock);
1570 		fifo_set(&mdev->rs_plan_s, 0);
1571 		spin_unlock(&mdev->peer_seq_lock);
1572 		/* ns.conn may already be != mdev->state.conn,
1573 		 * we may have been paused in between, or become paused until
1574 		 * the timer triggers.
1575 		 * No matter, that is handled in resync_timer_fn() */
1576 		if (ns.conn == C_SYNC_TARGET)
1577 			mod_timer(&mdev->resync_timer, jiffies);
1578 
1579 		drbd_md_sync(mdev);
1580 	}
1581 	drbd_state_unlock(mdev);
1582 }
1583 
1584 int drbd_worker(struct drbd_thread *thi)
1585 {
1586 	struct drbd_conf *mdev = thi->mdev;
1587 	struct drbd_work *w = NULL;
1588 	LIST_HEAD(work_list);
1589 	int intr = 0, i;
1590 
1591 	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1592 
1593 	while (get_t_state(thi) == Running) {
1594 		drbd_thread_current_set_cpu(mdev);
1595 
1596 		if (down_trylock(&mdev->data.work.s)) {
1597 			mutex_lock(&mdev->data.mutex);
1598 			if (mdev->data.socket && !mdev->net_conf->no_cork)
1599 				drbd_tcp_uncork(mdev->data.socket);
1600 			mutex_unlock(&mdev->data.mutex);
1601 
1602 			intr = down_interruptible(&mdev->data.work.s);
1603 
1604 			mutex_lock(&mdev->data.mutex);
1605 			if (mdev->data.socket  && !mdev->net_conf->no_cork)
1606 				drbd_tcp_cork(mdev->data.socket);
1607 			mutex_unlock(&mdev->data.mutex);
1608 		}
1609 
1610 		if (intr) {
1611 			D_ASSERT(intr == -EINTR);
1612 			flush_signals(current);
1613 			ERR_IF (get_t_state(thi) == Running)
1614 				continue;
1615 			break;
1616 		}
1617 
1618 		if (get_t_state(thi) != Running)
1619 			break;
1620 		/* With this break, we have done a down() but not consumed
1621 		   the entry from the list. The cleanup code takes care of
1622 		   this...   */
1623 
1624 		w = NULL;
1625 		spin_lock_irq(&mdev->data.work.q_lock);
1626 		ERR_IF(list_empty(&mdev->data.work.q)) {
1627 			/* something terribly wrong in our logic.
1628 			 * we were able to down() the semaphore,
1629 			 * but the list is empty... doh.
1630 			 *
1631 			 * what is the best thing to do now?
1632 			 * try again from scratch, restarting the receiver,
1633 			 * asender, whatnot? could break even more ugly,
1634 			 * e.g. when we are primary, but no good local data.
1635 			 *
1636 			 * I'll try to get away just starting over this loop.
1637 			 */
1638 			spin_unlock_irq(&mdev->data.work.q_lock);
1639 			continue;
1640 		}
1641 		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1642 		list_del_init(&w->list);
1643 		spin_unlock_irq(&mdev->data.work.q_lock);
1644 
1645 		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1646 			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1647 			if (mdev->state.conn >= C_CONNECTED)
1648 				drbd_force_state(mdev,
1649 						NS(conn, C_NETWORK_FAILURE));
1650 		}
1651 	}
1652 	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1653 	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1654 
1655 	spin_lock_irq(&mdev->data.work.q_lock);
1656 	i = 0;
1657 	while (!list_empty(&mdev->data.work.q)) {
1658 		list_splice_init(&mdev->data.work.q, &work_list);
1659 		spin_unlock_irq(&mdev->data.work.q_lock);
1660 
1661 		while (!list_empty(&work_list)) {
1662 			w = list_entry(work_list.next, struct drbd_work, list);
1663 			list_del_init(&w->list);
1664 			w->cb(mdev, w, 1);
1665 			i++; /* dead debugging code */
1666 		}
1667 
1668 		spin_lock_irq(&mdev->data.work.q_lock);
1669 	}
1670 	sema_init(&mdev->data.work.s, 0);
1671 	/* DANGEROUS race: if someone queued their work while holding the spinlock,
1672 	 * but called up() outside the spinlock, we could get an up() on the
1673 	 * semaphore without a corresponding list entry.
1674 	 * So don't do that.
1675 	 */
1676 	spin_unlock_irq(&mdev->data.work.q_lock);
1677 
1678 	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1679 	/* _drbd_set_state only uses stop_nowait.
1680 	 * wait here for the Exiting receiver. */
1681 	drbd_thread_stop(&mdev->receiver);
1682 	drbd_mdev_cleanup(mdev);
1683 
1684 	dev_info(DEV, "worker terminated\n");
1685 
1686 	clear_bit(DEVICE_DYING, &mdev->flags);
1687 	clear_bit(CONFIG_PENDING, &mdev->flags);
1688 	wake_up(&mdev->state_wait);
1689 
1690 	return 0;
1691 }
1692