xref: /openbmc/linux/drivers/block/drbd/drbd_worker.c (revision 9d749629)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24  */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37 
38 #include "drbd_int.h"
39 #include "drbd_req.h"
40 
41 static int w_make_ov_request(struct drbd_work *w, int cancel);
42 
43 
44 /* endio handlers:
45  *   drbd_md_io_complete (defined here)
46  *   drbd_request_endio (defined here)
47  *   drbd_peer_request_endio (defined here)
48  *   bm_async_io_complete (defined in drbd_bitmap.c)
49  *
50  * For all these callbacks, note the following:
51  * The callbacks will be called in irq context by the IDE drivers,
52  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
53  * Try to get the locking right :)
54  *
55  */
56 
57 
58 /* About the global_state_lock
59    Each state transition on a device holds a read lock. When we have
60    to evaluate the resync-after dependencies, we grab a write lock, because
61    we need stable states on all devices for that.  */
62 rwlock_t global_state_lock;
63 
64 /* used for synchronous meta data and bitmap IO
65  * submitted by drbd_md_sync_page_io()
66  */
67 void drbd_md_io_complete(struct bio *bio, int error)
68 {
69 	struct drbd_md_io *md_io;
70 	struct drbd_conf *mdev;
71 
72 	md_io = (struct drbd_md_io *)bio->bi_private;
73 	mdev = container_of(md_io, struct drbd_conf, md_io);
74 
75 	md_io->error = error;
76 
77 	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
78 	 * to timeout on the lower level device, and eventually detach from it.
79 	 * If this io completion runs after that timeout expired, this
80 	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
81 	 * During normal operation, this only puts that extra reference
82 	 * down to 1 again.
83 	 * Make sure we first drop the reference, and only then signal
84 	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
85 	 * next drbd_md_sync_page_io(), that we trigger the
86 	 * ASSERT(atomic_read(&mdev->md_io_in_use) == 1) there.
87 	 */
88 	drbd_md_put_buffer(mdev);
89 	md_io->done = 1;
90 	wake_up(&mdev->misc_wait);
91 	bio_put(bio);
92 	put_ldev(mdev);
93 }
94 
95 /* reads on behalf of the partner,
96  * "submitted" by the receiver
97  */
98 void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
99 {
100 	unsigned long flags = 0;
101 	struct drbd_conf *mdev = peer_req->w.mdev;
102 
103 	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
104 	mdev->read_cnt += peer_req->i.size >> 9;
105 	list_del(&peer_req->w.list);
106 	if (list_empty(&mdev->read_ee))
107 		wake_up(&mdev->ee_wait);
108 	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
109 		__drbd_chk_io_error(mdev, DRBD_READ_ERROR);
110 	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
111 
112 	drbd_queue_work(&mdev->tconn->sender_work, &peer_req->w);
113 	put_ldev(mdev);
114 }
115 
116 /* writes on behalf of the partner, or resync writes,
117  * "submitted" by the receiver, final stage.  */
118 static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
119 {
120 	unsigned long flags = 0;
121 	struct drbd_conf *mdev = peer_req->w.mdev;
122 	struct drbd_interval i;
123 	int do_wake;
124 	u64 block_id;
125 	int do_al_complete_io;
126 
127 	/* after we moved peer_req to done_ee,
128 	 * we may no longer access it,
129 	 * it may be freed/reused already!
130 	 * (as soon as we release the req_lock) */
131 	i = peer_req->i;
132 	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
133 	block_id = peer_req->block_id;
134 
135 	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
136 	mdev->writ_cnt += peer_req->i.size >> 9;
137 	list_move_tail(&peer_req->w.list, &mdev->done_ee);
138 
139 	/*
140 	 * Do not remove from the write_requests tree here: we did not send the
141 	 * Ack yet and did not wake possibly waiting conflicting requests.
142 	 * The entry is removed from the tree by "drbd_process_done_ee" within the
143 	 * appropriate w.cb (e_end_block/e_end_resync_block) or by
144 	 * _drbd_clear_done_ee.
145 	 */
146 
147 	do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
148 
149 	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
150 		__drbd_chk_io_error(mdev, DRBD_WRITE_ERROR);
151 	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
152 
153 	if (block_id == ID_SYNCER)
154 		drbd_rs_complete_io(mdev, i.sector);
155 
156 	if (do_wake)
157 		wake_up(&mdev->ee_wait);
158 
159 	if (do_al_complete_io)
160 		drbd_al_complete_io(mdev, &i);
161 
162 	wake_asender(mdev->tconn);
163 	put_ldev(mdev);
164 }
165 
166 /* writes on behalf of the partner, or resync writes,
167  * "submitted" by the receiver.
168  */
169 void drbd_peer_request_endio(struct bio *bio, int error)
170 {
171 	struct drbd_peer_request *peer_req = bio->bi_private;
172 	struct drbd_conf *mdev = peer_req->w.mdev;
173 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
174 	int is_write = bio_data_dir(bio) == WRITE;
175 
176 	if (error && __ratelimit(&drbd_ratelimit_state))
177 		dev_warn(DEV, "%s: error=%d s=%llus\n",
178 				is_write ? "write" : "read", error,
179 				(unsigned long long)peer_req->i.sector);
180 	if (!error && !uptodate) {
181 		if (__ratelimit(&drbd_ratelimit_state))
182 			dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
183 					is_write ? "write" : "read",
184 					(unsigned long long)peer_req->i.sector);
185 		/* strange behavior of some lower level drivers...
186 		 * fail the request by clearing the uptodate flag,
187 		 * but do not return any error?! */
188 		error = -EIO;
189 	}
190 
191 	if (error)
192 		set_bit(__EE_WAS_ERROR, &peer_req->flags);
193 
194 	bio_put(bio); /* no need for the bio anymore */
195 	if (atomic_dec_and_test(&peer_req->pending_bios)) {
196 		if (is_write)
197 			drbd_endio_write_sec_final(peer_req);
198 		else
199 			drbd_endio_read_sec_final(peer_req);
200 	}
201 }
202 
203 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
204  */
205 void drbd_request_endio(struct bio *bio, int error)
206 {
207 	unsigned long flags;
208 	struct drbd_request *req = bio->bi_private;
209 	struct drbd_conf *mdev = req->w.mdev;
210 	struct bio_and_error m;
211 	enum drbd_req_event what;
212 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
213 
214 	if (!error && !uptodate) {
215 		dev_warn(DEV, "p %s: setting error to -EIO\n",
216 			 bio_data_dir(bio) == WRITE ? "write" : "read");
217 		/* strange behavior of some lower level drivers...
218 		 * fail the request by clearing the uptodate flag,
219 		 * but do not return any error?! */
220 		error = -EIO;
221 	}
222 
223 
224 	/* If this request was aborted locally before,
225 	 * but now was completed "successfully",
226 	 * chances are that this caused arbitrary data corruption.
227 	 *
228 	 * "aborting" requests, or force-detaching the disk, is intended for
229 	 * completely blocked/hung local backing devices which no longer
230 	 * complete requests at all, not even with error completions.  In this
231 	 * situation, usually a hard-reset and failover is the only way out.
232 	 *
233 	 * By "aborting", basically faking a local error-completion,
234 	 * we allow for a more graceful switchover by cleanly migrating services.
235 	 * Still the affected node has to be rebooted "soon".
236 	 *
237 	 * By completing these requests, we allow the upper layers to re-use
238 	 * the associated data pages.
239 	 *
240 	 * If later the local backing device "recovers", and now DMAs some data
241 	 * from disk into the original request pages, in the best case it will
242 	 * just put random data into unused pages; but typically it will corrupt
243 	 * meanwhile completely unrelated data, causing all sorts of damage.
244 	 *
245 	 * Which means delayed successful completion,
246 	 * especially for READ requests,
247 	 * is a reason to panic().
248 	 *
249 	 * We assume that a delayed *error* completion is OK,
250 	 * though we still will complain noisily about it.
251 	 */
252 	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
253 		if (__ratelimit(&drbd_ratelimit_state))
254 			dev_emerg(DEV, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
255 
256 		if (!error)
257 			panic("possible random memory corruption caused by delayed completion of aborted local request\n");
258 	}
259 
260 	/* to avoid recursion in __req_mod */
261 	if (unlikely(error)) {
262 		what = (bio_data_dir(bio) == WRITE)
263 			? WRITE_COMPLETED_WITH_ERROR
264 			: (bio_rw(bio) == READ)
265 			  ? READ_COMPLETED_WITH_ERROR
266 			  : READ_AHEAD_COMPLETED_WITH_ERROR;
267 	} else
268 		what = COMPLETED_OK;
269 
270 	bio_put(req->private_bio);
271 	req->private_bio = ERR_PTR(error);
272 
273 	/* not req_mod(), we need irqsave here! */
274 	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
275 	__req_mod(req, what, &m);
276 	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
277 	put_ldev(mdev);
278 
279 	if (m.bio)
280 		complete_master_bio(mdev, &m);
281 }
282 
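/* Compute a digest over the page chain of a peer request; all pages are
 * hashed in full except the last one, which may be only partially used. */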
283 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
284 		  struct drbd_peer_request *peer_req, void *digest)
285 {
286 	struct hash_desc desc;
287 	struct scatterlist sg;
288 	struct page *page = peer_req->pages;
289 	struct page *tmp;
290 	unsigned len;
291 
292 	desc.tfm = tfm;
293 	desc.flags = 0;
294 
295 	sg_init_table(&sg, 1);
296 	crypto_hash_init(&desc);
297 
298 	while ((tmp = page_chain_next(page))) {
299 		/* all but the last page will be fully used */
300 		sg_set_page(&sg, page, PAGE_SIZE, 0);
301 		crypto_hash_update(&desc, &sg, sg.length);
302 		page = tmp;
303 	}
304 	/* and now the last, possibly only partially used page */
305 	len = peer_req->i.size & (PAGE_SIZE - 1);
306 	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
307 	crypto_hash_update(&desc, &sg, sg.length);
308 	crypto_hash_final(&desc, digest);
309 }
310 
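/* Compute a digest over all segments of a bio. */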
311 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
312 {
313 	struct hash_desc desc;
314 	struct scatterlist sg;
315 	struct bio_vec *bvec;
316 	int i;
317 
318 	desc.tfm = tfm;
319 	desc.flags = 0;
320 
321 	sg_init_table(&sg, 1);
322 	crypto_hash_init(&desc);
323 
324 	bio_for_each_segment(bvec, bio, i) {
325 		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
326 		crypto_hash_update(&desc, &sg, sg.length);
327 	}
328 	crypto_hash_final(&desc, digest);
329 }
330 
331 /* MAYBE merge common code with w_e_end_ov_req */
332 static int w_e_send_csum(struct drbd_work *w, int cancel)
333 {
334 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
335 	struct drbd_conf *mdev = w->mdev;
336 	int digest_size;
337 	void *digest;
338 	int err = 0;
339 
340 	if (unlikely(cancel))
341 		goto out;
342 
343 	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
344 		goto out;
345 
346 	digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
347 	digest = kmalloc(digest_size, GFP_NOIO);
348 	if (digest) {
349 		sector_t sector = peer_req->i.sector;
350 		unsigned int size = peer_req->i.size;
351 		drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
352 		/* Free peer_req and pages before send.
353 		 * In case we block on congestion, we could otherwise run into
354 		 * some distributed deadlock, if the other side blocks on
355 		 * congestion as well, because our receiver blocks in
356 		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
357 		drbd_free_peer_req(mdev, peer_req);
358 		peer_req = NULL;
359 		inc_rs_pending(mdev);
360 		err = drbd_send_drequest_csum(mdev, sector, size,
361 					      digest, digest_size,
362 					      P_CSUM_RS_REQUEST);
363 		kfree(digest);
364 	} else {
365 		dev_err(DEV, "kmalloc() of digest failed.\n");
366 		err = -ENOMEM;
367 	}
368 
369 out:
370 	if (peer_req)
371 		drbd_free_peer_req(mdev, peer_req);
372 
373 	if (unlikely(err))
374 		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
375 	return err;
376 }
377 
378 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
379 
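/* Submit a local read of the given block so that its checksum can be sent to
 * the peer (w_e_send_csum); returns -EAGAIN to defer when throttled or when
 * no memory is available right now. */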
380 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
381 {
382 	struct drbd_peer_request *peer_req;
383 
384 	if (!get_ldev(mdev))
385 		return -EIO;
386 
387 	if (drbd_rs_should_slow_down(mdev, sector))
388 		goto defer;
389 
390 	/* GFP_TRY, because if there is no memory available right now, this may
391 	 * be rescheduled for later. It is "only" background resync, after all. */
392 	peer_req = drbd_alloc_peer_req(mdev, ID_SYNCER /* unused */, sector,
393 				       size, GFP_TRY);
394 	if (!peer_req)
395 		goto defer;
396 
397 	peer_req->w.cb = w_e_send_csum;
398 	spin_lock_irq(&mdev->tconn->req_lock);
399 	list_add(&peer_req->w.list, &mdev->read_ee);
400 	spin_unlock_irq(&mdev->tconn->req_lock);
401 
402 	atomic_add(size >> 9, &mdev->rs_sect_ev);
403 	if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
404 		return 0;
405 
406 	/* If it failed because of ENOMEM, retry should help.  If it failed
407 	 * because bio_add_page failed (probably broken lower level driver),
408 	 * retry may or may not help.
409 	 * If it does not, you may need to force disconnect. */
410 	spin_lock_irq(&mdev->tconn->req_lock);
411 	list_del(&peer_req->w.list);
412 	spin_unlock_irq(&mdev->tconn->req_lock);
413 
414 	drbd_free_peer_req(mdev, peer_req);
415 defer:
416 	put_ldev(mdev);
417 	return -EAGAIN;
418 }
419 
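/* Resync/online-verify timer work: issue the next batch of requests,
 * depending on whether we are verify source or sync target. */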
420 int w_resync_timer(struct drbd_work *w, int cancel)
421 {
422 	struct drbd_conf *mdev = w->mdev;
423 	switch (mdev->state.conn) {
424 	case C_VERIFY_S:
425 		w_make_ov_request(w, cancel);
426 		break;
427 	case C_SYNC_TARGET:
428 		w_make_resync_request(w, cancel);
429 		break;
430 	}
431 
432 	return 0;
433 }
434 
435 void resync_timer_fn(unsigned long data)
436 {
437 	struct drbd_conf *mdev = (struct drbd_conf *) data;
438 
439 	if (list_empty(&mdev->resync_work.list))
440 		drbd_queue_work(&mdev->tconn->sender_work, &mdev->resync_work);
441 }
442 
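/* Small helpers for the circular plan-ahead buffer (fifo_buffer) used by the
 * resync controller: fifo_push() stores a value at the head and returns the
 * value that was there before; fifo_set() and fifo_add_val() operate on all
 * slots at once. */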
443 static void fifo_set(struct fifo_buffer *fb, int value)
444 {
445 	int i;
446 
447 	for (i = 0; i < fb->size; i++)
448 		fb->values[i] = value;
449 }
450 
451 static int fifo_push(struct fifo_buffer *fb, int value)
452 {
453 	int ov;
454 
455 	ov = fb->values[fb->head_index];
456 	fb->values[fb->head_index++] = value;
457 
458 	if (fb->head_index >= fb->size)
459 		fb->head_index = 0;
460 
461 	return ov;
462 }
463 
464 static void fifo_add_val(struct fifo_buffer *fb, int value)
465 {
466 	int i;
467 
468 	for (i = 0; i < fb->size; i++)
469 		fb->values[i] += value;
470 }
471 
472 struct fifo_buffer *fifo_alloc(int fifo_size)
473 {
474 	struct fifo_buffer *fb;
475 
476 	fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
477 	if (!fb)
478 		return NULL;
479 
480 	fb->head_index = 0;
481 	fb->size = fifo_size;
482 	fb->total = 0;
483 
484 	return fb;
485 }
486 
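/* Dynamic resync rate controller: based on the number of resync sectors that
 * came in during the last SLEEP_TIME interval and the configured
 * c_fill_target/c_delay_target, plan corrections over the next "steps"
 * intervals and return how many sectors to request in this turn, capped by
 * c_max_rate. */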
487 static int drbd_rs_controller(struct drbd_conf *mdev)
488 {
489 	struct disk_conf *dc;
490 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
491 	unsigned int want;     /* The number of sectors we want in the proxy */
492 	int req_sect; /* Number of sectors to request in this turn */
493 	int correction; /* Number of sectors more we need in the proxy*/
494 	int cps; /* correction per invocation of drbd_rs_controller() */
495 	int steps; /* Number of time steps to plan ahead */
496 	int curr_corr;
497 	int max_sect;
498 	struct fifo_buffer *plan;
499 
500 	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
501 	mdev->rs_in_flight -= sect_in;
502 
503 	dc = rcu_dereference(mdev->ldev->disk_conf);
504 	plan = rcu_dereference(mdev->rs_plan_s);
505 
506 	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
507 
508 	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
509 		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
510 	} else { /* normal path */
511 		want = dc->c_fill_target ? dc->c_fill_target :
512 			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
513 	}
514 
515 	correction = want - mdev->rs_in_flight - plan->total;
516 
517 	/* Plan ahead */
518 	cps = correction / steps;
519 	fifo_add_val(plan, cps);
520 	plan->total += cps * steps;
521 
522 	/* What we do in this step */
523 	curr_corr = fifo_push(plan, 0);
524 	plan->total -= curr_corr;
525 
526 	req_sect = sect_in + curr_corr;
527 	if (req_sect < 0)
528 		req_sect = 0;
529 
530 	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
531 	if (req_sect > max_sect)
532 		req_sect = max_sect;
533 
534 	/*
535 	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
536 		 sect_in, mdev->rs_in_flight, want, correction,
537 		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
538 	*/
539 
540 	return req_sect;
541 }
542 
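/* How many resync requests (in units of BM_BLOCK_SIZE) to issue in this turn:
 * taken from the dynamic controller above if a plan is configured, otherwise
 * derived from the static resync_rate. */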
543 static int drbd_rs_number_requests(struct drbd_conf *mdev)
544 {
545 	int number;
546 
547 	rcu_read_lock();
548 	if (rcu_dereference(mdev->rs_plan_s)->size) {
549 		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
550 		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
551 	} else {
552 		mdev->c_sync_rate = rcu_dereference(mdev->ldev->disk_conf)->resync_rate;
553 		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
554 	}
555 	rcu_read_unlock();
556 
557 	/* Ignore the number of pending requests; the resync controller should
558 	 * throttle down to the incoming reply rate soon enough anyway. */
559 	return number;
560 }
561 
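/* Worker callback: scan the bitmap for out-of-sync blocks and send the next
 * batch of resync (data or checksum) requests to the peer. */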
562 int w_make_resync_request(struct drbd_work *w, int cancel)
563 {
564 	struct drbd_conf *mdev = w->mdev;
565 	unsigned long bit;
566 	sector_t sector;
567 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
568 	int max_bio_size;
569 	int number, rollback_i, size;
570 	int align, queued, sndbuf;
571 	int i = 0;
572 
573 	if (unlikely(cancel))
574 		return 0;
575 
576 	if (mdev->rs_total == 0) {
577 		/* empty resync? */
578 		drbd_resync_finished(mdev);
579 		return 0;
580 	}
581 
582 	if (!get_ldev(mdev)) {
583 		/* Since we only need to access mdev->resync, a
584 		   get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
585 		   continuing resync with a broken disk makes no sense at
586 		   all */
587 		dev_err(DEV, "Disk broke down during resync!\n");
588 		return 0;
589 	}
590 
591 	max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
592 	number = drbd_rs_number_requests(mdev);
593 	if (number == 0)
594 		goto requeue;
595 
596 	for (i = 0; i < number; i++) {
597 		/* Stop generating RS requests when half of the send buffer is filled */
598 		mutex_lock(&mdev->tconn->data.mutex);
599 		if (mdev->tconn->data.socket) {
600 			queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
601 			sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
602 		} else {
603 			queued = 1;
604 			sndbuf = 0;
605 		}
606 		mutex_unlock(&mdev->tconn->data.mutex);
607 		if (queued > sndbuf / 2)
608 			goto requeue;
609 
610 next_sector:
611 		size = BM_BLOCK_SIZE;
612 		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
613 
614 		if (bit == DRBD_END_OF_BITMAP) {
615 			mdev->bm_resync_fo = drbd_bm_bits(mdev);
616 			put_ldev(mdev);
617 			return 0;
618 		}
619 
620 		sector = BM_BIT_TO_SECT(bit);
621 
622 		if (drbd_rs_should_slow_down(mdev, sector) ||
623 		    drbd_try_rs_begin_io(mdev, sector)) {
624 			mdev->bm_resync_fo = bit;
625 			goto requeue;
626 		}
627 		mdev->bm_resync_fo = bit + 1;
628 
629 		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
630 			drbd_rs_complete_io(mdev, sector);
631 			goto next_sector;
632 		}
633 
634 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
635 		/* try to find some adjacent bits.
636 		 * we stop if we already have the maximum req size.
637 		 *
638 		 * Additionally always align bigger requests, in order to
639 		 * be prepared for all stripe sizes of software RAIDs.
640 		 */
641 		align = 1;
642 		rollback_i = i;
643 		for (;;) {
644 			if (size + BM_BLOCK_SIZE > max_bio_size)
645 				break;
646 
647 			/* Always be aligned */
648 			if (sector & ((1<<(align+3))-1))
649 				break;
650 
651 			/* do not cross extent boundaries */
652 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
653 				break;
654 			/* now, is it actually dirty, after all?
655 			 * caution, drbd_bm_test_bit is tri-state for some
656 			 * obscure reason; ( b == 0 ) would get the out-of-band
657 			 * only accidentally right because of the "oddly sized"
658 			 * adjustment below */
659 			if (drbd_bm_test_bit(mdev, bit+1) != 1)
660 				break;
661 			bit++;
662 			size += BM_BLOCK_SIZE;
663 			if ((BM_BLOCK_SIZE << align) <= size)
664 				align++;
665 			i++;
666 		}
667 		/* if we merged some,
668 		 * reset the offset to start the next drbd_bm_find_next from */
669 		if (size > BM_BLOCK_SIZE)
670 			mdev->bm_resync_fo = bit + 1;
671 #endif
672 
673 		/* adjust very last sectors, in case we are oddly sized */
674 		if (sector + (size>>9) > capacity)
675 			size = (capacity-sector)<<9;
676 		if (mdev->tconn->agreed_pro_version >= 89 && mdev->tconn->csums_tfm) {
677 			switch (read_for_csum(mdev, sector, size)) {
678 			case -EIO: /* Disk failure */
679 				put_ldev(mdev);
680 				return -EIO;
681 			case -EAGAIN: /* allocation failed, or ldev busy */
682 				drbd_rs_complete_io(mdev, sector);
683 				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
684 				i = rollback_i;
685 				goto requeue;
686 			case 0:
687 				/* everything ok */
688 				break;
689 			default:
690 				BUG();
691 			}
692 		} else {
693 			int err;
694 
695 			inc_rs_pending(mdev);
696 			err = drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
697 						 sector, size, ID_SYNCER);
698 			if (err) {
699 				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
700 				dec_rs_pending(mdev);
701 				put_ldev(mdev);
702 				return err;
703 			}
704 		}
705 	}
706 
707 	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
708 		/* last syncer _request_ was sent,
709 		 * but the P_RS_DATA_REPLY has not yet been received.  sync will end (and
710 		 * next sync group will resume), as soon as we receive the last
711 		 * resync data block, and the last bit is cleared.
712 		 * until then resync "work" is "inactive" ...
713 		 */
714 		put_ldev(mdev);
715 		return 0;
716 	}
717 
718  requeue:
719 	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
720 	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
721 	put_ldev(mdev);
722 	return 0;
723 }
724 
725 static int w_make_ov_request(struct drbd_work *w, int cancel)
726 {
727 	struct drbd_conf *mdev = w->mdev;
728 	int number, i, size;
729 	sector_t sector;
730 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
731 	bool stop_sector_reached = false;
732 
733 	if (unlikely(cancel))
734 		return 1;
735 
736 	number = drbd_rs_number_requests(mdev);
737 
738 	sector = mdev->ov_position;
739 	for (i = 0; i < number; i++) {
740 		if (sector >= capacity)
741 			return 1;
742 
743 		/* We check for "finished" only in the reply path:
744 		 * w_e_end_ov_reply().
745 		 * We need to send at least one request out. */
746 		stop_sector_reached = i > 0
747 			&& verify_can_do_stop_sector(mdev)
748 			&& sector >= mdev->ov_stop_sector;
749 		if (stop_sector_reached)
750 			break;
751 
752 		size = BM_BLOCK_SIZE;
753 
754 		if (drbd_rs_should_slow_down(mdev, sector) ||
755 		    drbd_try_rs_begin_io(mdev, sector)) {
756 			mdev->ov_position = sector;
757 			goto requeue;
758 		}
759 
760 		if (sector + (size>>9) > capacity)
761 			size = (capacity-sector)<<9;
762 
763 		inc_rs_pending(mdev);
764 		if (drbd_send_ov_request(mdev, sector, size)) {
765 			dec_rs_pending(mdev);
766 			return 0;
767 		}
768 		sector += BM_SECT_PER_BIT;
769 	}
770 	mdev->ov_position = sector;
771 
772  requeue:
773 	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
774 	if (i == 0 || !stop_sector_reached)
775 		mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
776 	return 1;
777 }
778 
779 int w_ov_finished(struct drbd_work *w, int cancel)
780 {
781 	struct drbd_conf *mdev = w->mdev;
782 	kfree(w);
783 	ov_out_of_sync_print(mdev);
784 	drbd_resync_finished(mdev);
785 
786 	return 0;
787 }
788 
789 static int w_resync_finished(struct drbd_work *w, int cancel)
790 {
791 	struct drbd_conf *mdev = w->mdev;
792 	kfree(w);
793 
794 	drbd_resync_finished(mdev);
795 
796 	return 0;
797 }
798 
799 static void ping_peer(struct drbd_conf *mdev)
800 {
801 	struct drbd_tconn *tconn = mdev->tconn;
802 
803 	clear_bit(GOT_PING_ACK, &tconn->flags);
804 	request_ping(tconn);
805 	wait_event(tconn->ping_wait,
806 		   test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
807 }
808 
809 int drbd_resync_finished(struct drbd_conf *mdev)
810 {
811 	unsigned long db, dt, dbdt;
812 	unsigned long n_oos;
813 	union drbd_state os, ns;
814 	struct drbd_work *w;
815 	char *khelper_cmd = NULL;
816 	int verify_done = 0;
817 
818 	/* Remove all elements from the resync LRU. If future actions
819 	 * set bits in the (main) bitmap, the entries in the
820 	 * resync LRU would otherwise be wrong. */
821 	if (drbd_rs_del_all(mdev)) {
822 		/* In case this is not possible now, most probably because
823 		 * there are P_RS_DATA_REPLY packets lingering on the worker's
824 		 * queue (or even the read operations for those packets
825 		 * are not finished by now).   Retry in 100ms. */
826 
827 		schedule_timeout_interruptible(HZ / 10);
828 		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
829 		if (w) {
830 			w->cb = w_resync_finished;
831 			w->mdev = mdev;
832 			drbd_queue_work(&mdev->tconn->sender_work, w);
833 			return 1;
834 		}
835 		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
836 	}
837 
838 	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
839 	if (dt <= 0)
840 		dt = 1;
841 
842 	db = mdev->rs_total;
843 	/* adjust for verify start and stop sectors, respectively the reached position */
844 	if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
845 		db -= mdev->ov_left;
846 
847 	dbdt = Bit2KB(db/dt);
848 	mdev->rs_paused /= HZ;
849 
850 	if (!get_ldev(mdev))
851 		goto out;
852 
853 	ping_peer(mdev);
854 
855 	spin_lock_irq(&mdev->tconn->req_lock);
856 	os = drbd_read_state(mdev);
857 
858 	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
859 
860 	/* This protects us against multiple calls (that can happen in the presence
861 	   of application IO), and against connectivity loss just before we arrive here. */
862 	if (os.conn <= C_CONNECTED)
863 		goto out_unlock;
864 
865 	ns = os;
866 	ns.conn = C_CONNECTED;
867 
868 	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
869 	     verify_done ? "Online verify" : "Resync",
870 	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
871 
872 	n_oos = drbd_bm_total_weight(mdev);
873 
874 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
875 		if (n_oos) {
876 			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
877 			      n_oos, Bit2KB(1));
878 			khelper_cmd = "out-of-sync";
879 		}
880 	} else {
881 		D_ASSERT((n_oos - mdev->rs_failed) == 0);
882 
883 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
884 			khelper_cmd = "after-resync-target";
885 
886 		if (mdev->tconn->csums_tfm && mdev->rs_total) {
887 			const unsigned long s = mdev->rs_same_csum;
888 			const unsigned long t = mdev->rs_total;
889 			const int ratio =
890 				(t == 0)     ? 0 :
891 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
892 			dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
893 			     "transferred %luK total %luK\n",
894 			     ratio,
895 			     Bit2KB(mdev->rs_same_csum),
896 			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
897 			     Bit2KB(mdev->rs_total));
898 		}
899 	}
900 
901 	if (mdev->rs_failed) {
902 		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
903 
904 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
905 			ns.disk = D_INCONSISTENT;
906 			ns.pdsk = D_UP_TO_DATE;
907 		} else {
908 			ns.disk = D_UP_TO_DATE;
909 			ns.pdsk = D_INCONSISTENT;
910 		}
911 	} else {
912 		ns.disk = D_UP_TO_DATE;
913 		ns.pdsk = D_UP_TO_DATE;
914 
915 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
916 			if (mdev->p_uuid) {
917 				int i;
918 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
919 					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
920 				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
921 				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
922 			} else {
923 				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
924 			}
925 		}
926 
927 		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
928 			/* for verify runs, we don't update uuids here,
929 			 * so there would be nothing to report. */
930 			drbd_uuid_set_bm(mdev, 0UL);
931 			drbd_print_uuids(mdev, "updated UUIDs");
932 			if (mdev->p_uuid) {
933 				/* Now the two UUID sets are equal, update what we
934 				 * know of the peer. */
935 				int i;
936 				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
937 					mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
938 			}
939 		}
940 	}
941 
942 	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
943 out_unlock:
944 	spin_unlock_irq(&mdev->tconn->req_lock);
945 	put_ldev(mdev);
946 out:
947 	mdev->rs_total  = 0;
948 	mdev->rs_failed = 0;
949 	mdev->rs_paused = 0;
950 
951 	/* reset start sector, if we reached end of device */
952 	if (verify_done && mdev->ov_left == 0)
953 		mdev->ov_start_sector = 0;
954 
955 	drbd_md_sync(mdev);
956 
957 	if (khelper_cmd)
958 		drbd_khelper(mdev, khelper_cmd);
959 
960 	return 1;
961 }
962 
963 /* helper */
964 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
965 {
966 	if (drbd_peer_req_has_active_page(peer_req)) {
967 		/* This might happen if sendpage() has not finished */
968 		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
969 		atomic_add(i, &mdev->pp_in_use_by_net);
970 		atomic_sub(i, &mdev->pp_in_use);
971 		spin_lock_irq(&mdev->tconn->req_lock);
972 		list_add_tail(&peer_req->w.list, &mdev->net_ee);
973 		spin_unlock_irq(&mdev->tconn->req_lock);
974 		wake_up(&drbd_pp_wait);
975 	} else
976 		drbd_free_peer_req(mdev, peer_req);
977 }
978 
979 /**
980  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
981  * @mdev:	DRBD device.
982  * @w:		work object.
983  * @cancel:	The connection will be closed anyways
984  */
985 int w_e_end_data_req(struct drbd_work *w, int cancel)
986 {
987 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
988 	struct drbd_conf *mdev = w->mdev;
989 	int err;
990 
991 	if (unlikely(cancel)) {
992 		drbd_free_peer_req(mdev, peer_req);
993 		dec_unacked(mdev);
994 		return 0;
995 	}
996 
997 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
998 		err = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
999 	} else {
1000 		if (__ratelimit(&drbd_ratelimit_state))
1001 			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
1002 			    (unsigned long long)peer_req->i.sector);
1003 
1004 		err = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
1005 	}
1006 
1007 	dec_unacked(mdev);
1008 
1009 	move_to_net_ee_or_free(mdev, peer_req);
1010 
1011 	if (unlikely(err))
1012 		dev_err(DEV, "drbd_send_block() failed\n");
1013 	return err;
1014 }
1015 
1016 /**
1017  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1018  * @mdev:	DRBD device.
1019  * @w:		work object.
1020  * @cancel:	The connection will be closed anyways
1021  */
1022 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1023 {
1024 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1025 	struct drbd_conf *mdev = w->mdev;
1026 	int err;
1027 
1028 	if (unlikely(cancel)) {
1029 		drbd_free_peer_req(mdev, peer_req);
1030 		dec_unacked(mdev);
1031 		return 0;
1032 	}
1033 
1034 	if (get_ldev_if_state(mdev, D_FAILED)) {
1035 		drbd_rs_complete_io(mdev, peer_req->i.sector);
1036 		put_ldev(mdev);
1037 	}
1038 
1039 	if (mdev->state.conn == C_AHEAD) {
1040 		err = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
1041 	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1042 		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
1043 			inc_rs_pending(mdev);
1044 			err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1045 		} else {
1046 			if (__ratelimit(&drbd_ratelimit_state))
1047 				dev_err(DEV, "Not sending RSDataReply, "
1048 				    "partner DISKLESS!\n");
1049 			err = 0;
1050 		}
1051 	} else {
1052 		if (__ratelimit(&drbd_ratelimit_state))
1053 			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1054 			    (unsigned long long)peer_req->i.sector);
1055 
1056 		err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1057 
1058 		/* update resync data with failure */
1059 		drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
1060 	}
1061 
1062 	dec_unacked(mdev);
1063 
1064 	move_to_net_ee_or_free(mdev, peer_req);
1065 
1066 	if (unlikely(err))
1067 		dev_err(DEV, "drbd_send_block() failed\n");
1068 	return err;
1069 }
1070 
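/**
 * w_e_end_csum_rs_req() - Worker callback to answer a P_CSUM_RS_REQUEST: send P_RS_IS_IN_SYNC if the checksums match, the block data otherwise
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */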
1071 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1072 {
1073 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1074 	struct drbd_conf *mdev = w->mdev;
1075 	struct digest_info *di;
1076 	int digest_size;
1077 	void *digest = NULL;
1078 	int err, eq = 0;
1079 
1080 	if (unlikely(cancel)) {
1081 		drbd_free_peer_req(mdev, peer_req);
1082 		dec_unacked(mdev);
1083 		return 0;
1084 	}
1085 
1086 	if (get_ldev(mdev)) {
1087 		drbd_rs_complete_io(mdev, peer_req->i.sector);
1088 		put_ldev(mdev);
1089 	}
1090 
1091 	di = peer_req->digest;
1092 
1093 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1094 		/* quick hack to try to avoid a race against reconfiguration.
1095 		 * a real fix would be much more involved,
1096 		 * introducing more locking mechanisms */
1097 		if (mdev->tconn->csums_tfm) {
1098 			digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
1099 			D_ASSERT(digest_size == di->digest_size);
1100 			digest = kmalloc(digest_size, GFP_NOIO);
1101 		}
1102 		if (digest) {
1103 			drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
1104 			eq = !memcmp(digest, di->digest, digest_size);
1105 			kfree(digest);
1106 		}
1107 
1108 		if (eq) {
1109 			drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1110 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1111 			mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1112 			err = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1113 		} else {
1114 			inc_rs_pending(mdev);
1115 			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1116 			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1117 			kfree(di);
1118 			err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1119 		}
1120 	} else {
1121 		err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1122 		if (__ratelimit(&drbd_ratelimit_state))
1123 			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1124 	}
1125 
1126 	dec_unacked(mdev);
1127 	move_to_net_ee_or_free(mdev, peer_req);
1128 
1129 	if (unlikely(err))
1130 		dev_err(DEV, "drbd_send_block/ack() failed\n");
1131 	return err;
1132 }
1133 
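/**
 * w_e_end_ov_req() - Worker callback to send the checksum of a block for online verify (P_OV_REPLY)
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */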
1134 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1135 {
1136 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1137 	struct drbd_conf *mdev = w->mdev;
1138 	sector_t sector = peer_req->i.sector;
1139 	unsigned int size = peer_req->i.size;
1140 	int digest_size;
1141 	void *digest;
1142 	int err = 0;
1143 
1144 	if (unlikely(cancel))
1145 		goto out;
1146 
1147 	digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1148 	digest = kmalloc(digest_size, GFP_NOIO);
1149 	if (!digest) {
1150 		err = 1;	/* terminate the connection in case the allocation failed */
1151 		goto out;
1152 	}
1153 
1154 	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1155 		drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1156 	else
1157 		memset(digest, 0, digest_size);
1158 
1159 	/* Free e and pages before send.
1160 	 * In case we block on congestion, we could otherwise run into
1161 	 * some distributed deadlock, if the other side blocks on
1162 	 * congestion as well, because our receiver blocks in
1163 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1164 	drbd_free_peer_req(mdev, peer_req);
1165 	peer_req = NULL;
1166 	inc_rs_pending(mdev);
1167 	err = drbd_send_drequest_csum(mdev, sector, size, digest, digest_size, P_OV_REPLY);
1168 	if (err)
1169 		dec_rs_pending(mdev);
1170 	kfree(digest);
1171 
1172 out:
1173 	if (peer_req)
1174 		drbd_free_peer_req(mdev, peer_req);
1175 	dec_unacked(mdev);
1176 	return err;
1177 }
1178 
1179 void drbd_ov_out_of_sync_found(struct drbd_conf *mdev, sector_t sector, int size)
1180 {
1181 	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1182 		mdev->ov_last_oos_size += size>>9;
1183 	} else {
1184 		mdev->ov_last_oos_start = sector;
1185 		mdev->ov_last_oos_size = size>>9;
1186 	}
1187 	drbd_set_out_of_sync(mdev, sector, size);
1188 }
1189 
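/**
 * w_e_end_ov_reply() - Worker callback to compare the peer's checksum with our own for online verify and send P_OV_RESULT
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */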
1190 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1191 {
1192 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1193 	struct drbd_conf *mdev = w->mdev;
1194 	struct digest_info *di;
1195 	void *digest;
1196 	sector_t sector = peer_req->i.sector;
1197 	unsigned int size = peer_req->i.size;
1198 	int digest_size;
1199 	int err, eq = 0;
1200 	bool stop_sector_reached = false;
1201 
1202 	if (unlikely(cancel)) {
1203 		drbd_free_peer_req(mdev, peer_req);
1204 		dec_unacked(mdev);
1205 		return 0;
1206 	}
1207 
1208 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1209 	 * the resync lru has been cleaned up already */
1210 	if (get_ldev(mdev)) {
1211 		drbd_rs_complete_io(mdev, peer_req->i.sector);
1212 		put_ldev(mdev);
1213 	}
1214 
1215 	di = peer_req->digest;
1216 
1217 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1218 		digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1219 		digest = kmalloc(digest_size, GFP_NOIO);
1220 		if (digest) {
1221 			drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1222 
1223 			D_ASSERT(digest_size == di->digest_size);
1224 			eq = !memcmp(digest, di->digest, digest_size);
1225 			kfree(digest);
1226 		}
1227 	}
1228 
1229 	/* Free peer_req and pages before send.
1230 	 * In case we block on congestion, we could otherwise run into
1231 	 * some distributed deadlock, if the other side blocks on
1232 	 * congestion as well, because our receiver blocks in
1233 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1234 	drbd_free_peer_req(mdev, peer_req);
1235 	if (!eq)
1236 		drbd_ov_out_of_sync_found(mdev, sector, size);
1237 	else
1238 		ov_out_of_sync_print(mdev);
1239 
1240 	err = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1241 			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1242 
1243 	dec_unacked(mdev);
1244 
1245 	--mdev->ov_left;
1246 
1247 	/* let's advance progress step marks only for every other megabyte */
1248 	if ((mdev->ov_left & 0x200) == 0x200)
1249 		drbd_advance_rs_marks(mdev, mdev->ov_left);
1250 
1251 	stop_sector_reached = verify_can_do_stop_sector(mdev) &&
1252 		(sector + (size>>9)) >= mdev->ov_stop_sector;
1253 
1254 	if (mdev->ov_left == 0 || stop_sector_reached) {
1255 		ov_out_of_sync_print(mdev);
1256 		drbd_resync_finished(mdev);
1257 	}
1258 
1259 	return err;
1260 }
1261 
1262 int w_prev_work_done(struct drbd_work *w, int cancel)
1263 {
1264 	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1265 
1266 	complete(&b->done);
1267 	return 0;
1268 }
1269 
1270 /* FIXME
1271  * We need to track the number of pending barrier acks,
1272  * and to be able to wait for them.
1273  * See also comment in drbd_adm_attach before drbd_suspend_io.
1274  */
1275 int drbd_send_barrier(struct drbd_tconn *tconn)
1276 {
1277 	struct p_barrier *p;
1278 	struct drbd_socket *sock;
1279 
1280 	sock = &tconn->data;
1281 	p = conn_prepare_command(tconn, sock);
1282 	if (!p)
1283 		return -EIO;
1284 	p->barrier = tconn->send.current_epoch_nr;
1285 	p->pad = 0;
1286 	tconn->send.current_epoch_writes = 0;
1287 
1288 	return conn_send_command(tconn, sock, P_BARRIER, sizeof(*p), NULL, 0);
1289 }
1290 
1291 int w_send_write_hint(struct drbd_work *w, int cancel)
1292 {
1293 	struct drbd_conf *mdev = w->mdev;
1294 	struct drbd_socket *sock;
1295 
1296 	if (cancel)
1297 		return 0;
1298 	sock = &mdev->tconn->data;
1299 	if (!drbd_prepare_command(mdev, sock))
1300 		return -EIO;
1301 	return drbd_send_command(mdev, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1302 }
1303 
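/* Remember the epoch of the first write seen on this connection, so that
 * barrier bookkeeping can start from there. */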
1304 static void re_init_if_first_write(struct drbd_tconn *tconn, unsigned int epoch)
1305 {
1306 	if (!tconn->send.seen_any_write_yet) {
1307 		tconn->send.seen_any_write_yet = true;
1308 		tconn->send.current_epoch_nr = epoch;
1309 		tconn->send.current_epoch_writes = 0;
1310 	}
1311 }
1312 
1313 static void maybe_send_barrier(struct drbd_tconn *tconn, unsigned int epoch)
1314 {
1315 	/* nothing to do as long as no write has been seen on this connection */
1316 	if (!tconn->send.seen_any_write_yet)
1317 		return;
1318 	if (tconn->send.current_epoch_nr != epoch) {
1319 		if (tconn->send.current_epoch_writes)
1320 			drbd_send_barrier(tconn);
1321 		tconn->send.current_epoch_nr = epoch;
1322 	}
1323 }
1324 
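/**
 * w_send_out_of_sync() - Worker callback to tell the peer that a block is out of sync (Ahead/Behind mode), instead of mirroring the data
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */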
1325 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1326 {
1327 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1328 	struct drbd_conf *mdev = w->mdev;
1329 	struct drbd_tconn *tconn = mdev->tconn;
1330 	int err;
1331 
1332 	if (unlikely(cancel)) {
1333 		req_mod(req, SEND_CANCELED);
1334 		return 0;
1335 	}
1336 
1337 	/* this time, no tconn->send.current_epoch_writes++;
1338 	 * If it was sent, it was the closing barrier for the last
1339 	 * replicated epoch, before we went into AHEAD mode.
1340 	 * No more barriers will be sent, until we leave AHEAD mode again. */
1341 	maybe_send_barrier(tconn, req->epoch);
1342 
1343 	err = drbd_send_out_of_sync(mdev, req);
1344 	req_mod(req, OOS_HANDED_TO_NETWORK);
1345 
1346 	return err;
1347 }
1348 
1349 /**
1350  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1351  * @mdev:	DRBD device.
1352  * @w:		work object.
1353  * @cancel:	The connection will be closed anyways
1354  */
1355 int w_send_dblock(struct drbd_work *w, int cancel)
1356 {
1357 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1358 	struct drbd_conf *mdev = w->mdev;
1359 	struct drbd_tconn *tconn = mdev->tconn;
1360 	int err;
1361 
1362 	if (unlikely(cancel)) {
1363 		req_mod(req, SEND_CANCELED);
1364 		return 0;
1365 	}
1366 
1367 	re_init_if_first_write(tconn, req->epoch);
1368 	maybe_send_barrier(tconn, req->epoch);
1369 	tconn->send.current_epoch_writes++;
1370 
1371 	err = drbd_send_dblock(mdev, req);
1372 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1373 
1374 	return err;
1375 }
1376 
1377 /**
1378  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1379  * @mdev:	DRBD device.
1380  * @w:		work object.
1381  * @cancel:	The connection will be closed anyways
1382  */
1383 int w_send_read_req(struct drbd_work *w, int cancel)
1384 {
1385 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1386 	struct drbd_conf *mdev = w->mdev;
1387 	struct drbd_tconn *tconn = mdev->tconn;
1388 	int err;
1389 
1390 	if (unlikely(cancel)) {
1391 		req_mod(req, SEND_CANCELED);
1392 		return 0;
1393 	}
1394 
1395 	/* Even read requests may close a write epoch,
1396 	 * if there has been one already. */
1397 	maybe_send_barrier(tconn, req->epoch);
1398 
1399 	err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1400 				 (unsigned long)req);
1401 
1402 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1403 
1404 	return err;
1405 }
1406 
1407 int w_restart_disk_io(struct drbd_work *w, int cancel)
1408 {
1409 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1410 	struct drbd_conf *mdev = w->mdev;
1411 
1412 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1413 		drbd_al_begin_io(mdev, &req->i);
1414 
1415 	drbd_req_make_private_bio(req, req->master_bio);
1416 	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1417 	generic_make_request(req->private_bio);
1418 
1419 	return 0;
1420 }
1421 
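/* Walk the resync-after dependency chain: returns 1 if this device may resync
 * now, 0 if a device it depends on is currently resyncing or paused. */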
1422 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1423 {
1424 	struct drbd_conf *odev = mdev;
1425 	int resync_after;
1426 
1427 	while (1) {
1428 		if (!odev->ldev)
1429 			return 1;
1430 		rcu_read_lock();
1431 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1432 		rcu_read_unlock();
1433 		if (resync_after == -1)
1434 			return 1;
1435 		odev = minor_to_mdev(resync_after);
1436 		if (!expect(odev))
1437 			return 1;
1438 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1439 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1440 		    odev->state.aftr_isp || odev->state.peer_isp ||
1441 		    odev->state.user_isp)
1442 			return 0;
1443 	}
1444 }
1445 
1446 /**
1447  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1448  * @mdev:	DRBD device.
1449  *
1450  * Called from process context only (admin command and after_state_ch).
1451  */
1452 static int _drbd_pause_after(struct drbd_conf *mdev)
1453 {
1454 	struct drbd_conf *odev;
1455 	int i, rv = 0;
1456 
1457 	rcu_read_lock();
1458 	idr_for_each_entry(&minors, odev, i) {
1459 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1460 			continue;
1461 		if (!_drbd_may_sync_now(odev))
1462 			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1463 			       != SS_NOTHING_TO_DO);
1464 	}
1465 	rcu_read_unlock();
1466 
1467 	return rv;
1468 }
1469 
1470 /**
1471  * _drbd_resume_next() - Resume resync on all devices that may resync now
1472  * @mdev:	DRBD device.
1473  *
1474  * Called from process context only (admin command and worker).
1475  */
1476 static int _drbd_resume_next(struct drbd_conf *mdev)
1477 {
1478 	struct drbd_conf *odev;
1479 	int i, rv = 0;
1480 
1481 	rcu_read_lock();
1482 	idr_for_each_entry(&minors, odev, i) {
1483 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1484 			continue;
1485 		if (odev->state.aftr_isp) {
1486 			if (_drbd_may_sync_now(odev))
1487 				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1488 							CS_HARD, NULL)
1489 				       != SS_NOTHING_TO_DO) ;
1490 		}
1491 	}
1492 	rcu_read_unlock();
1493 	return rv;
1494 }
1495 
1496 void resume_next_sg(struct drbd_conf *mdev)
1497 {
1498 	write_lock_irq(&global_state_lock);
1499 	_drbd_resume_next(mdev);
1500 	write_unlock_irq(&global_state_lock);
1501 }
1502 
1503 void suspend_other_sg(struct drbd_conf *mdev)
1504 {
1505 	write_lock_irq(&global_state_lock);
1506 	_drbd_pause_after(mdev);
1507 	write_unlock_irq(&global_state_lock);
1508 }
1509 
1510 /* caller must hold global_state_lock */
1511 enum drbd_ret_code drbd_resync_after_valid(struct drbd_conf *mdev, int o_minor)
1512 {
1513 	struct drbd_conf *odev;
1514 	int resync_after;
1515 
1516 	if (o_minor == -1)
1517 		return NO_ERROR;
1518 	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1519 		return ERR_RESYNC_AFTER;
1520 
1521 	/* check for loops */
1522 	odev = minor_to_mdev(o_minor);
1523 	while (1) {
1524 		if (odev == mdev)
1525 			return ERR_RESYNC_AFTER_CYCLE;
1526 
1527 		rcu_read_lock();
1528 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1529 		rcu_read_unlock();
1530 		/* dependency chain ends here, no cycles. */
1531 		if (resync_after == -1)
1532 			return NO_ERROR;
1533 
1534 		/* follow the dependency chain */
1535 		odev = minor_to_mdev(resync_after);
1536 	}
1537 }
1538 
1539 /* caller must hold global_state_lock */
1540 void drbd_resync_after_changed(struct drbd_conf *mdev)
1541 {
1542 	int changes;
1543 
1544 	do {
1545 		changes  = _drbd_pause_after(mdev);
1546 		changes |= _drbd_resume_next(mdev);
1547 	} while (changes);
1548 }
1549 
1550 void drbd_rs_controller_reset(struct drbd_conf *mdev)
1551 {
1552 	struct fifo_buffer *plan;
1553 
1554 	atomic_set(&mdev->rs_sect_in, 0);
1555 	atomic_set(&mdev->rs_sect_ev, 0);
1556 	mdev->rs_in_flight = 0;
1557 
1558 	/* Updating the RCU protected object in place is necessary since
1559 	   this function gets called from atomic context.
1560 	   It is valid since all other updates also lead to a completely
1561 	   empty fifo */
1562 	rcu_read_lock();
1563 	plan = rcu_dereference(mdev->rs_plan_s);
1564 	plan->total = 0;
1565 	fifo_set(plan, 0);
1566 	rcu_read_unlock();
1567 }
1568 
1569 void start_resync_timer_fn(unsigned long data)
1570 {
1571 	struct drbd_conf *mdev = (struct drbd_conf *) data;
1572 
1573 	drbd_queue_work(&mdev->tconn->sender_work, &mdev->start_resync_work);
1574 }
1575 
1576 int w_start_resync(struct drbd_work *w, int cancel)
1577 {
1578 	struct drbd_conf *mdev = w->mdev;
1579 
1580 	if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1581 		dev_warn(DEV, "w_start_resync later...\n");
1582 		mdev->start_resync_timer.expires = jiffies + HZ/10;
1583 		add_timer(&mdev->start_resync_timer);
1584 		return 0;
1585 	}
1586 
1587 	drbd_start_resync(mdev, C_SYNC_SOURCE);
1588 	clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags);
1589 	return 0;
1590 }
1591 
1592 /**
1593  * drbd_start_resync() - Start the resync process
1594  * @mdev:	DRBD device.
1595  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1596  *
1597  * This function might bring you directly into one of the
1598  * C_PAUSED_SYNC_* states.
1599  */
1600 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1601 {
1602 	union drbd_state ns;
1603 	int r;
1604 
1605 	if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1606 		dev_err(DEV, "Resync already running!\n");
1607 		return;
1608 	}
1609 
1610 	if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1611 		if (side == C_SYNC_TARGET) {
1612 			/* Since application IO was locked out during C_WF_BITMAP_T and
1613 			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1614 			   which would make our data inconsistent, we ask the handler first. */
1615 			r = drbd_khelper(mdev, "before-resync-target");
1616 			r = (r >> 8) & 0xff;
1617 			if (r > 0) {
1618 				dev_info(DEV, "before-resync-target handler returned %d, "
1619 					 "dropping connection.\n", r);
1620 				conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1621 				return;
1622 			}
1623 		} else /* C_SYNC_SOURCE */ {
1624 			r = drbd_khelper(mdev, "before-resync-source");
1625 			r = (r >> 8) & 0xff;
1626 			if (r > 0) {
1627 				if (r == 3) {
1628 					dev_info(DEV, "before-resync-source handler returned %d, "
1629 						 "ignoring. Old userland tools?", r);
1630 				} else {
1631 					dev_info(DEV, "before-resync-source handler returned %d, "
1632 						 "dropping connection.\n", r);
1633 					conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1634 					return;
1635 				}
1636 			}
1637 		}
1638 	}
1639 
1640 	if (current == mdev->tconn->worker.task) {
1641 		/* The worker should not sleep waiting for state_mutex,
1642 		   that can take a long time */
1643 		if (!mutex_trylock(mdev->state_mutex)) {
1644 			set_bit(B_RS_H_DONE, &mdev->flags);
1645 			mdev->start_resync_timer.expires = jiffies + HZ/5;
1646 			add_timer(&mdev->start_resync_timer);
1647 			return;
1648 		}
1649 	} else {
1650 		mutex_lock(mdev->state_mutex);
1651 	}
1652 	clear_bit(B_RS_H_DONE, &mdev->flags);
1653 
1654 	write_lock_irq(&global_state_lock);
1655 	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1656 		write_unlock_irq(&global_state_lock);
1657 		mutex_unlock(mdev->state_mutex);
1658 		return;
1659 	}
1660 
1661 	ns = drbd_read_state(mdev);
1662 
1663 	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1664 
1665 	ns.conn = side;
1666 
1667 	if (side == C_SYNC_TARGET)
1668 		ns.disk = D_INCONSISTENT;
1669 	else /* side == C_SYNC_SOURCE */
1670 		ns.pdsk = D_INCONSISTENT;
1671 
1672 	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1673 	ns = drbd_read_state(mdev);
1674 
1675 	if (ns.conn < C_CONNECTED)
1676 		r = SS_UNKNOWN_ERROR;
1677 
1678 	if (r == SS_SUCCESS) {
1679 		unsigned long tw = drbd_bm_total_weight(mdev);
1680 		unsigned long now = jiffies;
1681 		int i;
1682 
1683 		mdev->rs_failed    = 0;
1684 		mdev->rs_paused    = 0;
1685 		mdev->rs_same_csum = 0;
1686 		mdev->rs_last_events = 0;
1687 		mdev->rs_last_sect_ev = 0;
1688 		mdev->rs_total     = tw;
1689 		mdev->rs_start     = now;
1690 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1691 			mdev->rs_mark_left[i] = tw;
1692 			mdev->rs_mark_time[i] = now;
1693 		}
1694 		_drbd_pause_after(mdev);
1695 	}
1696 	write_unlock_irq(&global_state_lock);
1697 
1698 	if (r == SS_SUCCESS) {
1699 		/* reset rs_last_bcast when a resync or verify is started,
1700 		 * to deal with potential jiffies wrap. */
1701 		mdev->rs_last_bcast = jiffies - HZ;
1702 
1703 		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1704 		     drbd_conn_str(ns.conn),
1705 		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1706 		     (unsigned long) mdev->rs_total);
1707 		if (side == C_SYNC_TARGET)
1708 			mdev->bm_resync_fo = 0;
1709 
1710 		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1711 		 * with w_send_oos, or the sync target will get confused as to
1712 		 * how much bits to resync.  We cannot do that always, because for an
1713 		 * empty resync and protocol < 95, we need to do it here, as we call
1714 		 * drbd_resync_finished from here in that case.
1715 		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1716 		 * and from after_state_ch otherwise. */
1717 		if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1718 			drbd_gen_and_send_sync_uuid(mdev);
1719 
1720 		if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1721 			/* This still has a race (about when exactly the peers
1722 			 * detect connection loss) that can lead to a full sync
1723 			 * on next handshake. In 8.3.9 we fixed this with explicit
1724 			 * resync-finished notifications, but the fix
1725 			 * introduces a protocol change.  Sleeping for some
1726 			 * time longer than the ping interval + timeout on the
1727 			 * SyncSource, to give the SyncTarget the chance to
1728 			 * detect connection loss, then waiting for a ping
1729 			 * response (implicit in drbd_resync_finished) reduces
1730 			 * the race considerably, but does not solve it. */
1731 			if (side == C_SYNC_SOURCE) {
1732 				struct net_conf *nc;
1733 				int timeo;
1734 
1735 				rcu_read_lock();
1736 				nc = rcu_dereference(mdev->tconn->net_conf);
1737 				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1738 				rcu_read_unlock();
1739 				schedule_timeout_interruptible(timeo);
1740 			}
1741 			drbd_resync_finished(mdev);
1742 		}
1743 
1744 		drbd_rs_controller_reset(mdev);
1745 		/* ns.conn may already be != mdev->state.conn,
1746 		 * we may have been paused in between, or become paused until
1747 		 * the timer triggers.
1748 		 * No matter, that is handled in resync_timer_fn() */
1749 		if (ns.conn == C_SYNC_TARGET)
1750 			mod_timer(&mdev->resync_timer, jiffies);
1751 
1752 		drbd_md_sync(mdev);
1753 	}
1754 	put_ldev(mdev);
1755 	mutex_unlock(mdev->state_mutex);
1756 }
1757 
1758 /* If the resource already closed the current epoch, but we did not
1759  * (because we have not yet seen new requests), we should send the
1760  * corresponding barrier now.  Must be checked within the same spinlock
1761  * that is used to check for new requests. */
1762 bool need_to_send_barrier(struct drbd_tconn *connection)
1763 {
1764 	if (!connection->send.seen_any_write_yet)
1765 		return false;
1766 
1767 	/* Skip barriers that do not contain any writes.
1768 	 * This may happen during AHEAD mode. */
1769 	if (!connection->send.current_epoch_writes)
1770 		return false;
1771 
1772 	/* ->req_lock is held when requests are queued on
1773 	 * connection->sender_work, and put into ->transfer_log.
1774 	 * It is also held when ->current_tle_nr is increased.
1775 	 * So either there are already new requests queued,
1776 	 * and corresponding barriers will be sent there.
1777 	 * Or nothing new is queued yet, so the difference will be 1.
1778 	 */
1779 	if (atomic_read(&connection->current_tle_nr) !=
1780 	    connection->send.current_epoch_nr + 1)
1781 		return false;
1782 
1783 	return true;
1784 }
1785 
1786 bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1787 {
1788 	spin_lock_irq(&queue->q_lock);
1789 	list_splice_init(&queue->q, work_list);
1790 	spin_unlock_irq(&queue->q_lock);
1791 	return !list_empty(work_list);
1792 }
1793 
1794 bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
1795 {
1796 	spin_lock_irq(&queue->q_lock);
1797 	if (!list_empty(&queue->q))
1798 		list_move(queue->q.next, work_list);
1799 	spin_unlock_irq(&queue->q_lock);
1800 	return !list_empty(work_list);
1801 }
1802 
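/* Dequeue the next work item for the sender. If nothing is queued, possibly
 * close the current epoch (send a barrier), uncork/re-cork the data socket,
 * and sleep until new work arrives or a signal is pending. */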
1803 void wait_for_work(struct drbd_tconn *connection, struct list_head *work_list)
1804 {
1805 	DEFINE_WAIT(wait);
1806 	struct net_conf *nc;
1807 	int uncork, cork;
1808 
1809 	dequeue_work_item(&connection->sender_work, work_list);
1810 	if (!list_empty(work_list))
1811 		return;
1812 
1813 	/* Still nothing to do?
1814 	 * Maybe we still need to close the current epoch,
1815 	 * even if no new requests are queued yet.
1816 	 *
1817 	 * Also, poke TCP, just in case.
1818 	 * Then wait for new work (or signal). */
1819 	rcu_read_lock();
1820 	nc = rcu_dereference(connection->net_conf);
1821 	uncork = nc ? nc->tcp_cork : 0;
1822 	rcu_read_unlock();
1823 	if (uncork) {
1824 		mutex_lock(&connection->data.mutex);
1825 		if (connection->data.socket)
1826 			drbd_tcp_uncork(connection->data.socket);
1827 		mutex_unlock(&connection->data.mutex);
1828 	}
1829 
1830 	for (;;) {
1831 		int send_barrier;
1832 		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
1833 		spin_lock_irq(&connection->req_lock);
1834 		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
1835 		/* dequeue single item only,
1836 		 * we still use drbd_queue_work_front() in some places */
1837 		if (!list_empty(&connection->sender_work.q))
1838 			list_move(connection->sender_work.q.next, work_list);
1839 		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
1840 		if (!list_empty(work_list) || signal_pending(current)) {
1841 			spin_unlock_irq(&connection->req_lock);
1842 			break;
1843 		}
1844 		send_barrier = need_to_send_barrier(connection);
1845 		spin_unlock_irq(&connection->req_lock);
1846 		if (send_barrier) {
1847 			drbd_send_barrier(connection);
1848 			connection->send.current_epoch_nr++;
1849 		}
1850 		schedule();
1851 		/* may be woken up for things other than new work, too,
1852 		 * e.g. if the current epoch got closed.
1853 		 * In which case we send the barrier above. */
1854 	}
1855 	finish_wait(&connection->sender_work.q_wait, &wait);
1856 
1857 	/* someone may have changed the config while we have been waiting above. */
1858 	rcu_read_lock();
1859 	nc = rcu_dereference(connection->net_conf);
1860 	cork = nc ? nc->tcp_cork : 0;
1861 	rcu_read_unlock();
1862 	mutex_lock(&connection->data.mutex);
1863 	if (connection->data.socket) {
1864 		if (cork)
1865 			drbd_tcp_cork(connection->data.socket);
1866 		else if (!uncork)
1867 			drbd_tcp_uncork(connection->data.socket);
1868 	}
1869 	mutex_unlock(&connection->data.mutex);
1870 }
1871 
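/* Main loop of the per-connection worker thread: dequeue work items and run
 * their callbacks until told to stop, then drain the remaining work and clean
 * up all volumes of this connection. */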
1872 int drbd_worker(struct drbd_thread *thi)
1873 {
1874 	struct drbd_tconn *tconn = thi->tconn;
1875 	struct drbd_work *w = NULL;
1876 	struct drbd_conf *mdev;
1877 	LIST_HEAD(work_list);
1878 	int vnr;
1879 
1880 	while (get_t_state(thi) == RUNNING) {
1881 		drbd_thread_current_set_cpu(thi);
1882 
1883 		/* as long as we use drbd_queue_work_front(),
1884 		 * we may only dequeue single work items here, not batches. */
1885 		if (list_empty(&work_list))
1886 			wait_for_work(tconn, &work_list);
1887 
1888 		if (signal_pending(current)) {
1889 			flush_signals(current);
1890 			if (get_t_state(thi) == RUNNING) {
1891 				conn_warn(tconn, "Worker got an unexpected signal\n");
1892 				continue;
1893 			}
1894 			break;
1895 		}
1896 
1897 		if (get_t_state(thi) != RUNNING)
1898 			break;
1899 
1900 		while (!list_empty(&work_list)) {
1901 			w = list_first_entry(&work_list, struct drbd_work, list);
1902 			list_del_init(&w->list);
1903 			if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS) == 0)
1904 				continue;
1905 			if (tconn->cstate >= C_WF_REPORT_PARAMS)
1906 				conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1907 		}
1908 	}
1909 
1910 	do {
1911 		while (!list_empty(&work_list)) {
1912 			w = list_first_entry(&work_list, struct drbd_work, list);
1913 			list_del_init(&w->list);
1914 			w->cb(w, 1);
1915 		}
1916 		dequeue_work_batch(&tconn->sender_work, &work_list);
1917 	} while (!list_empty(&work_list));
1918 
1919 	rcu_read_lock();
1920 	idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1921 		D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1922 		kref_get(&mdev->kref);
1923 		rcu_read_unlock();
1924 		drbd_mdev_cleanup(mdev);
1925 		kref_put(&mdev->kref, &drbd_minor_destroy);
1926 		rcu_read_lock();
1927 	}
1928 	rcu_read_unlock();
1929 
1930 	return 0;
1931 }
1932