xref: /openbmc/linux/drivers/block/drbd/drbd_worker.c (revision f7018c21)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24 */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37 
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
41 
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
44 
45 /* endio handlers:
46  *   drbd_md_io_complete (defined here)
47  *   drbd_request_endio (defined here)
48  *   drbd_peer_request_endio (defined here)
49  *   bm_async_io_complete (defined in drbd_bitmap.c)
50  *
51  * For all these callbacks, note the following:
52  * The callbacks will be called in irq context by the IDE drivers,
53  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54  * Try to get the locking right :)
55  *
56  */
57 
58 
59 /* About the global_state_lock
60    Each state transition on a device holds a read lock. In case we have
61    to evaluate the resync-after dependencies, we grab a write lock, because
62    we need stable states on all devices for that.  */
63 rwlock_t global_state_lock;
64 
65 /* used for synchronous meta data and bitmap IO
66  * submitted by drbd_md_sync_page_io()
67  */
68 void drbd_md_io_complete(struct bio *bio, int error)
69 {
70 	struct drbd_md_io *md_io;
71 	struct drbd_device *device;
72 
73 	md_io = (struct drbd_md_io *)bio->bi_private;
74 	device = container_of(md_io, struct drbd_device, md_io);
75 
76 	md_io->error = error;
77 
78 	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
79 	 * to time out on the lower level device, and eventually detach from it.
80 	 * If this io completion runs after that timeout expired, this
81 	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
82 	 * During normal operation, this only puts that extra reference
83 	 * down to 1 again.
84 	 * Make sure we first drop the reference, and only then signal
85 	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
86 	 * next drbd_md_sync_page_io(), that we trigger the
87 	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
88 	 */
89 	drbd_md_put_buffer(device);
90 	md_io->done = 1;
91 	wake_up(&device->misc_wait);
92 	bio_put(bio);
93 	if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
94 		put_ldev(device);
95 }
96 
97 /* reads on behalf of the partner,
98  * "submitted" by the receiver
99  */
100 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
101 {
102 	unsigned long flags = 0;
103 	struct drbd_peer_device *peer_device = peer_req->peer_device;
104 	struct drbd_device *device = peer_device->device;
105 
106 	spin_lock_irqsave(&device->resource->req_lock, flags);
107 	device->read_cnt += peer_req->i.size >> 9;
108 	list_del(&peer_req->w.list);
109 	if (list_empty(&device->read_ee))
110 		wake_up(&device->ee_wait);
111 	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
112 		__drbd_chk_io_error(device, DRBD_READ_ERROR);
113 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
114 
115 	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
116 	put_ldev(device);
117 }
118 
119 /* writes on behalf of the partner, or resync writes,
120  * "submitted" by the receiver, final stage.  */
121 static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
122 {
123 	unsigned long flags = 0;
124 	struct drbd_peer_device *peer_device = peer_req->peer_device;
125 	struct drbd_device *device = peer_device->device;
126 	struct drbd_interval i;
127 	int do_wake;
128 	u64 block_id;
129 	int do_al_complete_io;
130 
131 	/* after we moved peer_req to done_ee,
132 	 * we may no longer access it,
133 	 * it may be freed/reused already!
134 	 * (as soon as we release the req_lock) */
135 	i = peer_req->i;
136 	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
137 	block_id = peer_req->block_id;
138 
139 	spin_lock_irqsave(&device->resource->req_lock, flags);
140 	device->writ_cnt += peer_req->i.size >> 9;
141 	list_move_tail(&peer_req->w.list, &device->done_ee);
142 
143 	/*
144 	 * Do not remove from the write_requests tree here: we did not send the
145 	 * Ack yet and did not wake possibly waiting conflicting requests.
146 	 * Removed from the tree from "drbd_process_done_ee" within the
147 	 * It is removed from the tree in "drbd_process_done_ee" within the
148 	 * appropriate dw.cb (e_end_block/e_end_resync_block) or in
149 	 */
150 
151 	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
152 
153 	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
154 		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
155 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
156 
157 	if (block_id == ID_SYNCER)
158 		drbd_rs_complete_io(device, i.sector);
159 
160 	if (do_wake)
161 		wake_up(&device->ee_wait);
162 
163 	if (do_al_complete_io)
164 		drbd_al_complete_io(device, &i);
165 
166 	wake_asender(peer_device->connection);
167 	put_ldev(device);
168 }
169 
170 /* writes on behalf of the partner, or resync writes,
171  * "submitted" by the receiver.
172  */
173 void drbd_peer_request_endio(struct bio *bio, int error)
174 {
175 	struct drbd_peer_request *peer_req = bio->bi_private;
176 	struct drbd_device *device = peer_req->peer_device->device;
177 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
178 	int is_write = bio_data_dir(bio) == WRITE;
179 
180 	if (error && __ratelimit(&drbd_ratelimit_state))
181 		drbd_warn(device, "%s: error=%d s=%llus\n",
182 				is_write ? "write" : "read", error,
183 				(unsigned long long)peer_req->i.sector);
184 	if (!error && !uptodate) {
185 		if (__ratelimit(&drbd_ratelimit_state))
186 			drbd_warn(device, "%s: setting error to -EIO s=%llus\n",
187 					is_write ? "write" : "read",
188 					(unsigned long long)peer_req->i.sector);
189 		/* strange behavior of some lower level drivers...
190 		 * fail the request by clearing the uptodate flag,
191 		 * but do not return any error?! */
192 		error = -EIO;
193 	}
194 
195 	if (error)
196 		set_bit(__EE_WAS_ERROR, &peer_req->flags);
197 
198 	bio_put(bio); /* no need for the bio anymore */
199 	if (atomic_dec_and_test(&peer_req->pending_bios)) {
200 		if (is_write)
201 			drbd_endio_write_sec_final(peer_req);
202 		else
203 			drbd_endio_read_sec_final(peer_req);
204 	}
205 }
206 
207 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
208  */
209 void drbd_request_endio(struct bio *bio, int error)
210 {
211 	unsigned long flags;
212 	struct drbd_request *req = bio->bi_private;
213 	struct drbd_device *device = req->device;
214 	struct bio_and_error m;
215 	enum drbd_req_event what;
216 	int uptodate = bio_flagged(bio, BIO_UPTODATE);
217 
218 	if (!error && !uptodate) {
219 		drbd_warn(device, "p %s: setting error to -EIO\n",
220 			 bio_data_dir(bio) == WRITE ? "write" : "read");
221 		/* strange behavior of some lower level drivers...
222 		 * fail the request by clearing the uptodate flag,
223 		 * but do not return any error?! */
224 		error = -EIO;
225 	}
226 
227 
228 	/* If this request was aborted locally before,
229 	 * but now was completed "successfully",
230 	 * chances are that this caused arbitrary data corruption.
231 	 *
232 	 * "aborting" requests, or force-detaching the disk, is intended for
233 	 * completely blocked/hung local backing devices which do no longer
234 	 * complete requests at all, not even do error completions.  In this
235 	 * situation, usually a hard-reset and failover is the only way out.
236 	 *
237 	 * By "aborting", basically faking a local error-completion,
238 	 * we allow for a more graceful switchover by cleanly migrating services.
239 	 * Still the affected node has to be rebooted "soon".
240 	 *
241 	 * By completing these requests, we allow the upper layers to re-use
242 	 * the associated data pages.
243 	 *
244 	 * If later the local backing device "recovers", and now DMAs some data
245 	 * from disk into the original request pages, in the best case it will
246 	 * just put random data into unused pages; but typically it will corrupt
247 	 * meanwhile completely unrelated data, causing all sorts of damage.
248 	 *
249 	 * Which means delayed successful completion,
250 	 * especially for READ requests,
251 	 * is a reason to panic().
252 	 *
253 	 * We assume that a delayed *error* completion is OK,
254 	 * though we still will complain noisily about it.
255 	 */
256 	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
257 		if (__ratelimit(&drbd_ratelimit_state))
258 			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
259 
260 		if (!error)
261 			panic("possible random memory corruption caused by delayed completion of aborted local request\n");
262 	}
263 
264 	/* to avoid recursion in __req_mod */
265 	if (unlikely(error)) {
266 		what = (bio_data_dir(bio) == WRITE)
267 			? WRITE_COMPLETED_WITH_ERROR
268 			: (bio_rw(bio) == READ)
269 			  ? READ_COMPLETED_WITH_ERROR
270 			  : READ_AHEAD_COMPLETED_WITH_ERROR;
271 	} else
272 		what = COMPLETED_OK;
273 
274 	bio_put(req->private_bio);
275 	req->private_bio = ERR_PTR(error);
276 
277 	/* not req_mod(), we need irqsave here! */
278 	spin_lock_irqsave(&device->resource->req_lock, flags);
279 	__req_mod(req, what, &m);
280 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
281 	put_ldev(device);
282 
283 	if (m.bio)
284 		complete_master_bio(device, &m);
285 }
286 
287 void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
288 {
289 	struct hash_desc desc;
290 	struct scatterlist sg;
291 	struct page *page = peer_req->pages;
292 	struct page *tmp;
293 	unsigned len;
294 
295 	desc.tfm = tfm;
296 	desc.flags = 0;
297 
298 	sg_init_table(&sg, 1);
299 	crypto_hash_init(&desc);
300 
301 	while ((tmp = page_chain_next(page))) {
302 		/* all but the last page will be fully used */
303 		sg_set_page(&sg, page, PAGE_SIZE, 0);
304 		crypto_hash_update(&desc, &sg, sg.length);
305 		page = tmp;
306 	}
307 	/* and now the last, possibly only partially used page */
308 	len = peer_req->i.size & (PAGE_SIZE - 1);
309 	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
310 	crypto_hash_update(&desc, &sg, sg.length);
311 	crypto_hash_final(&desc, digest);
312 }
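/* A worked example of the loop above, assuming 4 KiB pages: for a
 * 9216 byte peer request the page chain holds three pages; the first
 * two are hashed in full, and the last one with
 * len = 9216 & (PAGE_SIZE - 1) = 1024 bytes.  If the request size is
 * an exact multiple of PAGE_SIZE, len is 0 and the "len ?: PAGE_SIZE"
 * fallback hashes the final page in full. */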
313 
314 void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
315 {
316 	struct hash_desc desc;
317 	struct scatterlist sg;
318 	struct bio_vec bvec;
319 	struct bvec_iter iter;
320 
321 	desc.tfm = tfm;
322 	desc.flags = 0;
323 
324 	sg_init_table(&sg, 1);
325 	crypto_hash_init(&desc);
326 
327 	bio_for_each_segment(bvec, bio, iter) {
328 		sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
329 		crypto_hash_update(&desc, &sg, sg.length);
330 	}
331 	crypto_hash_final(&desc, digest);
332 }
333 
334 /* MAYBE merge common code with w_e_end_ov_req */
335 static int w_e_send_csum(struct drbd_work *w, int cancel)
336 {
337 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
338 	struct drbd_peer_device *peer_device = peer_req->peer_device;
339 	struct drbd_device *device = peer_device->device;
340 	int digest_size;
341 	void *digest;
342 	int err = 0;
343 
344 	if (unlikely(cancel))
345 		goto out;
346 
347 	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
348 		goto out;
349 
350 	digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
351 	digest = kmalloc(digest_size, GFP_NOIO);
352 	if (digest) {
353 		sector_t sector = peer_req->i.sector;
354 		unsigned int size = peer_req->i.size;
355 		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
356 		/* Free peer_req and pages before send.
357 		 * In case we block on congestion, we could otherwise run into
358 		 * some distributed deadlock, if the other side blocks on
359 		 * congestion as well, because our receiver blocks in
360 		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
361 		drbd_free_peer_req(device, peer_req);
362 		peer_req = NULL;
363 		inc_rs_pending(device);
364 		err = drbd_send_drequest_csum(peer_device, sector, size,
365 					      digest, digest_size,
366 					      P_CSUM_RS_REQUEST);
367 		kfree(digest);
368 	} else {
369 		drbd_err(device, "kmalloc() of digest failed.\n");
370 		err = -ENOMEM;
371 	}
372 
373 out:
374 	if (peer_req)
375 		drbd_free_peer_req(device, peer_req);
376 
377 	if (unlikely(err))
378 		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
379 	return err;
380 }
381 
382 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
383 
384 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
385 {
386 	struct drbd_device *device = peer_device->device;
387 	struct drbd_peer_request *peer_req;
388 
389 	if (!get_ldev(device))
390 		return -EIO;
391 
392 	if (drbd_rs_should_slow_down(device, sector))
393 		goto defer;
394 
395 	/* GFP_TRY, because if there is no memory available right now, this may
396 	 * be rescheduled for later. It is "only" background resync, after all. */
397 	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
398 				       size, GFP_TRY);
399 	if (!peer_req)
400 		goto defer;
401 
402 	peer_req->w.cb = w_e_send_csum;
403 	spin_lock_irq(&device->resource->req_lock);
404 	list_add(&peer_req->w.list, &device->read_ee);
405 	spin_unlock_irq(&device->resource->req_lock);
406 
407 	atomic_add(size >> 9, &device->rs_sect_ev);
408 	if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
409 		return 0;
410 
411 	/* If it failed because of ENOMEM, retry should help.  If it failed
412 	 * because bio_add_page failed (probably broken lower level driver),
413 	 * retry may or may not help.
414 	 * If it does not, you may need to force disconnect. */
415 	spin_lock_irq(&device->resource->req_lock);
416 	list_del(&peer_req->w.list);
417 	spin_unlock_irq(&device->resource->req_lock);
418 
419 	drbd_free_peer_req(device, peer_req);
420 defer:
421 	put_ldev(device);
422 	return -EAGAIN;
423 }
424 
425 int w_resync_timer(struct drbd_work *w, int cancel)
426 {
427 	struct drbd_device *device =
428 		container_of(w, struct drbd_device, resync_work);
429 
430 	switch (device->state.conn) {
431 	case C_VERIFY_S:
432 		make_ov_request(device, cancel);
433 		break;
434 	case C_SYNC_TARGET:
435 		make_resync_request(device, cancel);
436 		break;
437 	}
438 
439 	return 0;
440 }
441 
442 void resync_timer_fn(unsigned long data)
443 {
444 	struct drbd_device *device = (struct drbd_device *) data;
445 
446 	if (list_empty(&device->resync_work.list))
447 		drbd_queue_work(&first_peer_device(device)->connection->sender_work,
448 				&device->resync_work);
449 }
450 
451 static void fifo_set(struct fifo_buffer *fb, int value)
452 {
453 	int i;
454 
455 	for (i = 0; i < fb->size; i++)
456 		fb->values[i] = value;
457 }
458 
459 static int fifo_push(struct fifo_buffer *fb, int value)
460 {
461 	int ov;
462 
463 	ov = fb->values[fb->head_index];
464 	fb->values[fb->head_index++] = value;
465 
466 	if (fb->head_index >= fb->size)
467 		fb->head_index = 0;
468 
469 	return ov;
470 }
471 
472 static void fifo_add_val(struct fifo_buffer *fb, int value)
473 {
474 	int i;
475 
476 	for (i = 0; i < fb->size; i++)
477 		fb->values[i] += value;
478 }
479 
480 struct fifo_buffer *fifo_alloc(int fifo_size)
481 {
482 	struct fifo_buffer *fb;
483 
484 	fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
485 	if (!fb)
486 		return NULL;
487 
488 	fb->head_index = 0;
489 	fb->size = fifo_size;
490 	fb->total = 0;
491 
492 	return fb;
493 }
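/* These fifo helpers implement the resync controller's "plan":
 * fifo_push() returns the correction scheduled for the current step
 * (at head_index), stores the new value in that slot and advances the
 * head circularly, while fifo_add_val() spreads an additional
 * correction evenly over all planned steps.  fb->total is maintained
 * by the caller (drbd_rs_controller()) as the sum of all values still
 * queued in the plan.  E.g. pushing 0 into a size-3 fifo holding
 * {5, 7, 9} with the head at 5 returns 5 and leaves {0, 7, 9}, the
 * head now pointing at 7. */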
494 
495 static int drbd_rs_controller(struct drbd_device *device)
496 {
497 	struct disk_conf *dc;
498 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
499 	unsigned int want;     /* The number of sectors we want in the proxy */
500 	int req_sect; /* Number of sectors to request in this turn */
501 	int correction; /* Number of sectors more we need in the proxy*/
502 	int cps; /* correction per invocation of drbd_rs_controller() */
503 	int steps; /* Number of time steps to plan ahead */
504 	int curr_corr;
505 	int max_sect;
506 	struct fifo_buffer *plan;
507 
508 	sect_in = atomic_xchg(&device->rs_sect_in, 0); /* Number of sectors that came in */
509 	device->rs_in_flight -= sect_in;
510 
511 	dc = rcu_dereference(device->ldev->disk_conf);
512 	plan = rcu_dereference(device->rs_plan_s);
513 
514 	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
515 
516 	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
517 		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
518 	} else { /* normal path */
519 		want = dc->c_fill_target ? dc->c_fill_target :
520 			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
521 	}
522 
523 	correction = want - device->rs_in_flight - plan->total;
524 
525 	/* Plan ahead */
526 	cps = correction / steps;
527 	fifo_add_val(plan, cps);
528 	plan->total += cps * steps;
529 
530 	/* What we do in this step */
531 	curr_corr = fifo_push(plan, 0);
532 	plan->total -= curr_corr;
533 
534 	req_sect = sect_in + curr_corr;
535 	if (req_sect < 0)
536 		req_sect = 0;
537 
538 	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
539 	if (req_sect > max_sect)
540 		req_sect = max_sect;
541 
542 	/*
543 	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
544 		 sect_in, device->rs_in_flight, want, correction,
545 		 steps, cps, device->rs_planed, curr_corr, req_sect);
546 	*/
547 
548 	return req_sect;
549 }
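/* Illustration of one controller step, with made-up numbers and the
 * usual SLEEP_TIME of 100ms: suppose steps = 10, c_fill_target = 0,
 * c_delay_target = 10 (apparently in units of 0.1s, i.e. 1 second),
 * and sect_in = 2000 sectors of resync replies arrived during the
 * last interval.  Then
 * want = 2000 * 10 * HZ / (SLEEP_TIME * 10) = 20000 sectors in flight.
 * With rs_in_flight = 15000 and plan->total = 3000 the correction is
 * 2000 sectors, so cps = 200 is added to each of the 10 planned steps.
 * The step popped off the plan for this interval (curr_corr) is added
 * to sect_in to form req_sect, which is finally capped at
 * c_max_rate * 2 * SLEEP_TIME / HZ sectors (c_max_rate being in KiB/s,
 * the factor 2 converts to 512-byte sectors). */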
550 
551 static int drbd_rs_number_requests(struct drbd_device *device)
552 {
553 	int number;
554 
555 	rcu_read_lock();
556 	if (rcu_dereference(device->rs_plan_s)->size) {
557 		number = drbd_rs_controller(device) >> (BM_BLOCK_SHIFT - 9);
558 		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
559 	} else {
560 		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
561 		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
562 	}
563 	rcu_read_unlock();
564 
565 	/* ignore the number of pending requests; the resync controller should
566 	 * throttle down to the incoming reply rate soon enough anyway. */
567 	return number;
568 }
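/* The controller above works in 512-byte sectors; the shift by
 * (BM_BLOCK_SHIFT - 9) converts that into resync requests of
 * BM_BLOCK_SIZE (4 KiB) each, and c_sync_rate ends up in KiB/s.
 * For the fixed-rate branch, e.g. resync_rate = 400 KiB/s together
 * with a 100ms SLEEP_TIME yields
 * number = SLEEP_TIME * 400 / (4 * HZ) = 10 requests per interval,
 * i.e. 40 KiB every 100ms = 400 KiB/s, as expected. */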
569 
570 static int make_resync_request(struct drbd_device *device, int cancel)
571 {
572 	unsigned long bit;
573 	sector_t sector;
574 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
575 	int max_bio_size;
576 	int number, rollback_i, size;
577 	int align, queued, sndbuf;
578 	int i = 0;
579 
580 	if (unlikely(cancel))
581 		return 0;
582 
583 	if (device->rs_total == 0) {
584 		/* empty resync? */
585 		drbd_resync_finished(device);
586 		return 0;
587 	}
588 
589 	if (!get_ldev(device)) {
590 		/* Since we only need to access device->rsync, a
591 		   get_ldev_if_state(device,D_FAILED) would be sufficient, but
592 		   continuing the resync with a broken disk makes no sense at
593 		   all */
594 		drbd_err(device, "Disk broke down during resync!\n");
595 		return 0;
596 	}
597 
598 	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
599 	number = drbd_rs_number_requests(device);
600 	if (number == 0)
601 		goto requeue;
602 
603 	for (i = 0; i < number; i++) {
604 		/* Stop generating RS requests when half of the send buffer is filled */
605 		mutex_lock(&first_peer_device(device)->connection->data.mutex);
606 		if (first_peer_device(device)->connection->data.socket) {
607 			queued = first_peer_device(device)->connection->data.socket->sk->sk_wmem_queued;
608 			sndbuf = first_peer_device(device)->connection->data.socket->sk->sk_sndbuf;
609 		} else {
610 			queued = 1;
611 			sndbuf = 0;
612 		}
613 		mutex_unlock(&first_peer_device(device)->connection->data.mutex);
614 		if (queued > sndbuf / 2)
615 			goto requeue;
616 
617 next_sector:
618 		size = BM_BLOCK_SIZE;
619 		bit  = drbd_bm_find_next(device, device->bm_resync_fo);
620 
621 		if (bit == DRBD_END_OF_BITMAP) {
622 			device->bm_resync_fo = drbd_bm_bits(device);
623 			put_ldev(device);
624 			return 0;
625 		}
626 
627 		sector = BM_BIT_TO_SECT(bit);
628 
629 		if (drbd_rs_should_slow_down(device, sector) ||
630 		    drbd_try_rs_begin_io(device, sector)) {
631 			device->bm_resync_fo = bit;
632 			goto requeue;
633 		}
634 		device->bm_resync_fo = bit + 1;
635 
636 		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
637 			drbd_rs_complete_io(device, sector);
638 			goto next_sector;
639 		}
640 
641 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
642 		/* try to find some adjacent bits.
643 		 * we stop if we have already the maximum req size.
644 		 *
645 		 * Additionally always align bigger requests, in order to
646 		 * be prepared for all stripe sizes of software RAIDs.
647 		 */
648 		align = 1;
649 		rollback_i = i;
650 		for (;;) {
651 			if (size + BM_BLOCK_SIZE > max_bio_size)
652 				break;
653 
654 			/* Always be aligned */
655 			if (sector & ((1<<(align+3))-1))
656 				break;
657 
658 			/* do not cross extent boundaries */
659 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
660 				break;
661 			/* now, is it actually dirty, after all?
662 			 * caution, drbd_bm_test_bit is tri-state for some
663 			 * obscure reason; ( b == 0 ) would get the out-of-band
664 			 * only accidentally right because of the "oddly sized"
665 			 * adjustment below */
666 			if (drbd_bm_test_bit(device, bit+1) != 1)
667 				break;
668 			bit++;
669 			size += BM_BLOCK_SIZE;
670 			if ((BM_BLOCK_SIZE << align) <= size)
671 				align++;
672 			i++;
673 		}
674 		/* if we merged some,
675 		 * reset the offset to start the next drbd_bm_find_next from */
676 		if (size > BM_BLOCK_SIZE)
677 			device->bm_resync_fo = bit + 1;
678 #endif
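		/* In short, the merge loop above keeps growing the request by
		 * whole bitmap blocks (4 KiB, i.e. 8 sectors) until it would
		 * exceed max_bio_size, cross a resync extent boundary, hit a
		 * clean bit, or lose alignment: (1 << (align + 3)) sectors is
		 * roughly the power-of-two request size being built, and bigger
		 * requests are only formed from suitably aligned start sectors,
		 * to play nicely with software-RAID stripe sizes. */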
679 
680 		/* adjust very last sectors, in case we are oddly sized */
681 		if (sector + (size>>9) > capacity)
682 			size = (capacity-sector)<<9;
683 		if (first_peer_device(device)->connection->agreed_pro_version >= 89 &&
684 		    first_peer_device(device)->connection->csums_tfm) {
685 			switch (read_for_csum(first_peer_device(device), sector, size)) {
686 			case -EIO: /* Disk failure */
687 				put_ldev(device);
688 				return -EIO;
689 			case -EAGAIN: /* allocation failed, or ldev busy */
690 				drbd_rs_complete_io(device, sector);
691 				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
692 				i = rollback_i;
693 				goto requeue;
694 			case 0:
695 				/* everything ok */
696 				break;
697 			default:
698 				BUG();
699 			}
700 		} else {
701 			int err;
702 
703 			inc_rs_pending(device);
704 			err = drbd_send_drequest(first_peer_device(device), P_RS_DATA_REQUEST,
705 						 sector, size, ID_SYNCER);
706 			if (err) {
707 				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
708 				dec_rs_pending(device);
709 				put_ldev(device);
710 				return err;
711 			}
712 		}
713 	}
714 
715 	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
716 		/* last syncer _request_ was sent,
717 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
718 		 * next sync group will resume), as soon as we receive the last
719 		 * resync data block, and the last bit is cleared.
720 		 * until then resync "work" is "inactive" ...
721 		 */
722 		put_ldev(device);
723 		return 0;
724 	}
725 
726  requeue:
727 	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
728 	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
729 	put_ldev(device);
730 	return 0;
731 }
732 
733 static int make_ov_request(struct drbd_device *device, int cancel)
734 {
735 	int number, i, size;
736 	sector_t sector;
737 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
738 	bool stop_sector_reached = false;
739 
740 	if (unlikely(cancel))
741 		return 1;
742 
743 	number = drbd_rs_number_requests(device);
744 
745 	sector = device->ov_position;
746 	for (i = 0; i < number; i++) {
747 		if (sector >= capacity)
748 			return 1;
749 
750 		/* We check for "finished" only in the reply path:
751 		 * w_e_end_ov_reply().
752 		 * We need to send at least one request out. */
753 		stop_sector_reached = i > 0
754 			&& verify_can_do_stop_sector(device)
755 			&& sector >= device->ov_stop_sector;
756 		if (stop_sector_reached)
757 			break;
758 
759 		size = BM_BLOCK_SIZE;
760 
761 		if (drbd_rs_should_slow_down(device, sector) ||
762 		    drbd_try_rs_begin_io(device, sector)) {
763 			device->ov_position = sector;
764 			goto requeue;
765 		}
766 
767 		if (sector + (size>>9) > capacity)
768 			size = (capacity-sector)<<9;
769 
770 		inc_rs_pending(device);
771 		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
772 			dec_rs_pending(device);
773 			return 0;
774 		}
775 		sector += BM_SECT_PER_BIT;
776 	}
777 	device->ov_position = sector;
778 
779  requeue:
780 	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
781 	if (i == 0 || !stop_sector_reached)
782 		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
783 	return 1;
784 }
785 
786 int w_ov_finished(struct drbd_work *w, int cancel)
787 {
788 	struct drbd_device_work *dw =
789 		container_of(w, struct drbd_device_work, w);
790 	struct drbd_device *device = dw->device;
791 	kfree(dw);
792 	ov_out_of_sync_print(device);
793 	drbd_resync_finished(device);
794 
795 	return 0;
796 }
797 
798 static int w_resync_finished(struct drbd_work *w, int cancel)
799 {
800 	struct drbd_device_work *dw =
801 		container_of(w, struct drbd_device_work, w);
802 	struct drbd_device *device = dw->device;
803 	kfree(dw);
804 
805 	drbd_resync_finished(device);
806 
807 	return 0;
808 }
809 
810 static void ping_peer(struct drbd_device *device)
811 {
812 	struct drbd_connection *connection = first_peer_device(device)->connection;
813 
814 	clear_bit(GOT_PING_ACK, &connection->flags);
815 	request_ping(connection);
816 	wait_event(connection->ping_wait,
817 		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
818 }
819 
820 int drbd_resync_finished(struct drbd_device *device)
821 {
822 	unsigned long db, dt, dbdt;
823 	unsigned long n_oos;
824 	union drbd_state os, ns;
825 	struct drbd_device_work *dw;
826 	char *khelper_cmd = NULL;
827 	int verify_done = 0;
828 
829 	/* Remove all elements from the resync LRU. Since future actions
830 	 * might set bits in the (main) bitmap, the entries in the
831 	 * resync LRU would otherwise be wrong. */
832 	if (drbd_rs_del_all(device)) {
833 		/* In case this is not possible now, most probably because
834 		 * there are P_RS_DATA_REPLY packets lingering on the worker's
835 		 * queue (or even the read operations for those packets
836 		 * are not finished by now).   Retry in 100ms. */
837 
838 		schedule_timeout_interruptible(HZ / 10);
839 		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
840 		if (dw) {
841 			dw->w.cb = w_resync_finished;
842 			dw->device = device;
843 			drbd_queue_work(&first_peer_device(device)->connection->sender_work,
844 					&dw->w);
845 			return 1;
846 		}
847 		drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
848 	}
849 
850 	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
851 	if (dt <= 0)
852 		dt = 1;
853 
854 	db = device->rs_total;
855 	/* adjust for verify start and stop sectors, and the position actually reached */
856 	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
857 		db -= device->ov_left;
858 
859 	dbdt = Bit2KB(db/dt);
860 	device->rs_paused /= HZ;
861 
862 	if (!get_ldev(device))
863 		goto out;
864 
865 	ping_peer(device);
866 
867 	spin_lock_irq(&device->resource->req_lock);
868 	os = drbd_read_state(device);
869 
870 	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
871 
872 	/* This protects us against multiple calls (that can happen in the presence
873 	   of application IO), and against connectivity loss just before we arrive here. */
874 	if (os.conn <= C_CONNECTED)
875 		goto out_unlock;
876 
877 	ns = os;
878 	ns.conn = C_CONNECTED;
879 
880 	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
881 	     verify_done ? "Online verify" : "Resync",
882 	     dt + device->rs_paused, device->rs_paused, dbdt);
883 
884 	n_oos = drbd_bm_total_weight(device);
885 
886 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
887 		if (n_oos) {
888 			drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
889 			      n_oos, Bit2KB(1));
890 			khelper_cmd = "out-of-sync";
891 		}
892 	} else {
893 		D_ASSERT(device, (n_oos - device->rs_failed) == 0);
894 
895 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
896 			khelper_cmd = "after-resync-target";
897 
898 		if (first_peer_device(device)->connection->csums_tfm && device->rs_total) {
899 			const unsigned long s = device->rs_same_csum;
900 			const unsigned long t = device->rs_total;
901 			const int ratio =
902 				(t == 0)     ? 0 :
903 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
904 			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
905 			     "transferred %luK total %luK\n",
906 			     ratio,
907 			     Bit2KB(device->rs_same_csum),
908 			     Bit2KB(device->rs_total - device->rs_same_csum),
909 			     Bit2KB(device->rs_total));
910 		}
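		/* The two-branch percentage above presumably trades a little
		 * precision for overflow safety: for small totals s * 100 / t is
		 * fine, while for t >= 100000 the s / (t / 100) form avoids
		 * overflowing s * 100 for very large bitmaps on 32-bit. */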
911 	}
912 
913 	if (device->rs_failed) {
914 		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
915 
916 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
917 			ns.disk = D_INCONSISTENT;
918 			ns.pdsk = D_UP_TO_DATE;
919 		} else {
920 			ns.disk = D_UP_TO_DATE;
921 			ns.pdsk = D_INCONSISTENT;
922 		}
923 	} else {
924 		ns.disk = D_UP_TO_DATE;
925 		ns.pdsk = D_UP_TO_DATE;
926 
927 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
928 			if (device->p_uuid) {
929 				int i;
930 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
931 					_drbd_uuid_set(device, i, device->p_uuid[i]);
932 				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
933 				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
934 			} else {
935 				drbd_err(device, "device->p_uuid is NULL! BUG\n");
936 			}
937 		}
938 
939 		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
940 			/* for verify runs, we don't update uuids here,
941 			 * so there would be nothing to report. */
942 			drbd_uuid_set_bm(device, 0UL);
943 			drbd_print_uuids(device, "updated UUIDs");
944 			if (device->p_uuid) {
945 				/* Now the two UUID sets are equal, update what we
946 				 * know of the peer. */
947 				int i;
948 				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
949 					device->p_uuid[i] = device->ldev->md.uuid[i];
950 			}
951 		}
952 	}
953 
954 	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
955 out_unlock:
956 	spin_unlock_irq(&device->resource->req_lock);
957 	put_ldev(device);
958 out:
959 	device->rs_total  = 0;
960 	device->rs_failed = 0;
961 	device->rs_paused = 0;
962 
963 	/* reset start sector, if we reached end of device */
964 	if (verify_done && device->ov_left == 0)
965 		device->ov_start_sector = 0;
966 
967 	drbd_md_sync(device);
968 
969 	if (khelper_cmd)
970 		drbd_khelper(device, khelper_cmd);
971 
972 	return 1;
973 }
974 
975 /* helper */
976 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
977 {
978 	if (drbd_peer_req_has_active_page(peer_req)) {
979 		/* This might happen if sendpage() has not finished */
980 		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
981 		atomic_add(i, &device->pp_in_use_by_net);
982 		atomic_sub(i, &device->pp_in_use);
983 		spin_lock_irq(&device->resource->req_lock);
984 		list_add_tail(&peer_req->w.list, &device->net_ee);
985 		spin_unlock_irq(&device->resource->req_lock);
986 		wake_up(&drbd_pp_wait);
987 	} else
988 		drbd_free_peer_req(device, peer_req);
989 }
990 
991 /**
992  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
994  * @w:		work object.
995  * @cancel:	The connection will be closed anyway
996  */
997 int w_e_end_data_req(struct drbd_work *w, int cancel)
998 {
999 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1000 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1001 	struct drbd_device *device = peer_device->device;
1002 	int err;
1003 
1004 	if (unlikely(cancel)) {
1005 		drbd_free_peer_req(device, peer_req);
1006 		dec_unacked(device);
1007 		return 0;
1008 	}
1009 
1010 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1011 		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1012 	} else {
1013 		if (__ratelimit(&drbd_ratelimit_state))
1014 			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1015 			    (unsigned long long)peer_req->i.sector);
1016 
1017 		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1018 	}
1019 
1020 	dec_unacked(device);
1021 
1022 	move_to_net_ee_or_free(device, peer_req);
1023 
1024 	if (unlikely(err))
1025 		drbd_err(device, "drbd_send_block() failed\n");
1026 	return err;
1027 }
1028 
1029 /**
1030  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1031  * @w:		work object.
1032  * @cancel:	The connection will be closed anyway
1033  */
1034 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1035 {
1036 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1037 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1038 	struct drbd_device *device = peer_device->device;
1039 	int err;
1040 
1041 	if (unlikely(cancel)) {
1042 		drbd_free_peer_req(device, peer_req);
1043 		dec_unacked(device);
1044 		return 0;
1045 	}
1046 
1047 	if (get_ldev_if_state(device, D_FAILED)) {
1048 		drbd_rs_complete_io(device, peer_req->i.sector);
1049 		put_ldev(device);
1050 	}
1051 
1052 	if (device->state.conn == C_AHEAD) {
1053 		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1054 	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1055 		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1056 			inc_rs_pending(device);
1057 			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1058 		} else {
1059 			if (__ratelimit(&drbd_ratelimit_state))
1060 				drbd_err(device, "Not sending RSDataReply, "
1061 				    "partner DISKLESS!\n");
1062 			err = 0;
1063 		}
1064 	} else {
1065 		if (__ratelimit(&drbd_ratelimit_state))
1066 			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1067 			    (unsigned long long)peer_req->i.sector);
1068 
1069 		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1070 
1071 		/* update resync data with failure */
1072 		drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1073 	}
1074 
1075 	dec_unacked(device);
1076 
1077 	move_to_net_ee_or_free(device, peer_req);
1078 
1079 	if (unlikely(err))
1080 		drbd_err(device, "drbd_send_block() failed\n");
1081 	return err;
1082 }
1083 
1084 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1085 {
1086 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1087 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1088 	struct drbd_device *device = peer_device->device;
1089 	struct digest_info *di;
1090 	int digest_size;
1091 	void *digest = NULL;
1092 	int err, eq = 0;
1093 
1094 	if (unlikely(cancel)) {
1095 		drbd_free_peer_req(device, peer_req);
1096 		dec_unacked(device);
1097 		return 0;
1098 	}
1099 
1100 	if (get_ldev(device)) {
1101 		drbd_rs_complete_io(device, peer_req->i.sector);
1102 		put_ldev(device);
1103 	}
1104 
1105 	di = peer_req->digest;
1106 
1107 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1108 		/* quick hack to try to avoid a race against reconfiguration.
1109 		 * a real fix would be much more involved,
1110 		 * introducing more locking mechanisms */
1111 		if (peer_device->connection->csums_tfm) {
1112 			digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
1113 			D_ASSERT(device, digest_size == di->digest_size);
1114 			digest = kmalloc(digest_size, GFP_NOIO);
1115 		}
1116 		if (digest) {
1117 			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1118 			eq = !memcmp(digest, di->digest, digest_size);
1119 			kfree(digest);
1120 		}
1121 
1122 		if (eq) {
1123 			drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1124 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1125 			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1126 			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1127 		} else {
1128 			inc_rs_pending(device);
1129 			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1130 			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1131 			kfree(di);
1132 			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1133 		}
1134 	} else {
1135 		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1136 		if (__ratelimit(&drbd_ratelimit_state))
1137 			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1138 	}
1139 
1140 	dec_unacked(device);
1141 	move_to_net_ee_or_free(device, peer_req);
1142 
1143 	if (unlikely(err))
1144 		drbd_err(device, "drbd_send_block/ack() failed\n");
1145 	return err;
1146 }
1147 
1148 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1149 {
1150 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1151 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1152 	struct drbd_device *device = peer_device->device;
1153 	sector_t sector = peer_req->i.sector;
1154 	unsigned int size = peer_req->i.size;
1155 	int digest_size;
1156 	void *digest;
1157 	int err = 0;
1158 
1159 	if (unlikely(cancel))
1160 		goto out;
1161 
1162 	digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1163 	digest = kmalloc(digest_size, GFP_NOIO);
1164 	if (!digest) {
1165 		err = 1;	/* terminate the connection in case the allocation failed */
1166 		goto out;
1167 	}
1168 
1169 	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1170 		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1171 	else
1172 		memset(digest, 0, digest_size);
1173 
1174 	/* Free e and pages before send.
1175 	 * In case we block on congestion, we could otherwise run into
1176 	 * some distributed deadlock, if the other side blocks on
1177 	 * congestion as well, because our receiver blocks in
1178 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1179 	drbd_free_peer_req(device, peer_req);
1180 	peer_req = NULL;
1181 	inc_rs_pending(device);
1182 	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1183 	if (err)
1184 		dec_rs_pending(device);
1185 	kfree(digest);
1186 
1187 out:
1188 	if (peer_req)
1189 		drbd_free_peer_req(device, peer_req);
1190 	dec_unacked(device);
1191 	return err;
1192 }
1193 
1194 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1195 {
1196 	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1197 		device->ov_last_oos_size += size>>9;
1198 	} else {
1199 		device->ov_last_oos_start = sector;
1200 		device->ov_last_oos_size = size>>9;
1201 	}
1202 	drbd_set_out_of_sync(device, sector, size);
1203 }
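/* Consecutive out-of-sync blocks found by online verify are merged into
 * a single range here; e.g. (with 4 KiB blocks, 8 sectors each) hits at
 * sectors 1000 and 1008 become one 16-sector range starting at 1000.
 * The accumulated range is reported via ov_out_of_sync_print() once an
 * in-sync block ends the run, or when the verify finishes
 * (see w_e_end_ov_reply()). */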
1204 
1205 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1206 {
1207 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1208 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1209 	struct drbd_device *device = peer_device->device;
1210 	struct digest_info *di;
1211 	void *digest;
1212 	sector_t sector = peer_req->i.sector;
1213 	unsigned int size = peer_req->i.size;
1214 	int digest_size;
1215 	int err, eq = 0;
1216 	bool stop_sector_reached = false;
1217 
1218 	if (unlikely(cancel)) {
1219 		drbd_free_peer_req(device, peer_req);
1220 		dec_unacked(device);
1221 		return 0;
1222 	}
1223 
1224 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1225 	 * the resync lru has been cleaned up already */
1226 	if (get_ldev(device)) {
1227 		drbd_rs_complete_io(device, peer_req->i.sector);
1228 		put_ldev(device);
1229 	}
1230 
1231 	di = peer_req->digest;
1232 
1233 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1234 		digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1235 		digest = kmalloc(digest_size, GFP_NOIO);
1236 		if (digest) {
1237 			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1238 
1239 			D_ASSERT(device, digest_size == di->digest_size);
1240 			eq = !memcmp(digest, di->digest, digest_size);
1241 			kfree(digest);
1242 		}
1243 	}
1244 
1245 	/* Free peer_req and pages before send.
1246 	 * In case we block on congestion, we could otherwise run into
1247 	 * some distributed deadlock, if the other side blocks on
1248 	 * congestion as well, because our receiver blocks in
1249 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1250 	drbd_free_peer_req(device, peer_req);
1251 	if (!eq)
1252 		drbd_ov_out_of_sync_found(device, sector, size);
1253 	else
1254 		ov_out_of_sync_print(device);
1255 
1256 	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1257 			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1258 
1259 	dec_unacked(device);
1260 
1261 	--device->ov_left;
1262 
1263 	/* let's advance progress step marks only for every other megabyte */
1264 	if ((device->ov_left & 0x200) == 0x200)
1265 		drbd_advance_rs_marks(device, device->ov_left);
1266 
1267 	stop_sector_reached = verify_can_do_stop_sector(device) &&
1268 		(sector + (size>>9)) >= device->ov_stop_sector;
1269 
1270 	if (device->ov_left == 0 || stop_sector_reached) {
1271 		ov_out_of_sync_print(device);
1272 		drbd_resync_finished(device);
1273 	}
1274 
1275 	return err;
1276 }
1277 
1278 /* FIXME
1279  * We need to track the number of pending barrier acks,
1280  * and to be able to wait for them.
1281  * See also comment in drbd_adm_attach before drbd_suspend_io.
1282  */
1283 static int drbd_send_barrier(struct drbd_connection *connection)
1284 {
1285 	struct p_barrier *p;
1286 	struct drbd_socket *sock;
1287 
1288 	sock = &connection->data;
1289 	p = conn_prepare_command(connection, sock);
1290 	if (!p)
1291 		return -EIO;
1292 	p->barrier = connection->send.current_epoch_nr;
1293 	p->pad = 0;
1294 	connection->send.current_epoch_writes = 0;
1295 
1296 	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1297 }
1298 
1299 int w_send_write_hint(struct drbd_work *w, int cancel)
1300 {
1301 	struct drbd_device *device =
1302 		container_of(w, struct drbd_device, unplug_work);
1303 	struct drbd_socket *sock;
1304 
1305 	if (cancel)
1306 		return 0;
1307 	sock = &first_peer_device(device)->connection->data;
1308 	if (!drbd_prepare_command(first_peer_device(device), sock))
1309 		return -EIO;
1310 	return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1311 }
1312 
1313 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1314 {
1315 	if (!connection->send.seen_any_write_yet) {
1316 		connection->send.seen_any_write_yet = true;
1317 		connection->send.current_epoch_nr = epoch;
1318 		connection->send.current_epoch_writes = 0;
1319 	}
1320 }
1321 
1322 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1323 {
1324 	/* re-init if first write on this connection */
1325 	if (!connection->send.seen_any_write_yet)
1326 		return;
1327 	if (connection->send.current_epoch_nr != epoch) {
1328 		if (connection->send.current_epoch_writes)
1329 			drbd_send_barrier(connection);
1330 		connection->send.current_epoch_nr = epoch;
1331 	}
1332 }
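/* Taken together, re_init_if_first_write() and maybe_send_barrier()
 * make sure a P_BARRIER goes out exactly when the first request of a
 * new epoch is about to be sent and the previous epoch actually
 * contained writes; epochs without writes need no barrier, and nothing
 * is sent before the very first write on this connection. */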
1333 
1334 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1335 {
1336 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1337 	struct drbd_device *device = req->device;
1338 	struct drbd_connection *connection = first_peer_device(device)->connection;
1339 	int err;
1340 
1341 	if (unlikely(cancel)) {
1342 		req_mod(req, SEND_CANCELED);
1343 		return 0;
1344 	}
1345 
1346 	/* this time, no connection->send.current_epoch_writes++;
1347 	 * If it was sent, it was the closing barrier for the last
1348 	 * replicated epoch, before we went into AHEAD mode.
1349 	 * No more barriers will be sent, until we leave AHEAD mode again. */
1350 	maybe_send_barrier(connection, req->epoch);
1351 
1352 	err = drbd_send_out_of_sync(first_peer_device(device), req);
1353 	req_mod(req, OOS_HANDED_TO_NETWORK);
1354 
1355 	return err;
1356 }
1357 
1358 /**
1359  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1360  * @w:		work object.
1361  * @cancel:	The connection will be closed anyway
1362  */
1363 int w_send_dblock(struct drbd_work *w, int cancel)
1364 {
1365 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1366 	struct drbd_device *device = req->device;
1367 	struct drbd_connection *connection = first_peer_device(device)->connection;
1368 	int err;
1369 
1370 	if (unlikely(cancel)) {
1371 		req_mod(req, SEND_CANCELED);
1372 		return 0;
1373 	}
1374 
1375 	re_init_if_first_write(connection, req->epoch);
1376 	maybe_send_barrier(connection, req->epoch);
1377 	connection->send.current_epoch_writes++;
1378 
1379 	err = drbd_send_dblock(first_peer_device(device), req);
1380 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1381 
1382 	return err;
1383 }
1384 
1385 /**
1386  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1387  * @w:		work object.
1388  * @cancel:	The connection will be closed anyway
1389  */
1390 int w_send_read_req(struct drbd_work *w, int cancel)
1391 {
1392 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1393 	struct drbd_device *device = req->device;
1394 	struct drbd_connection *connection = first_peer_device(device)->connection;
1395 	int err;
1396 
1397 	if (unlikely(cancel)) {
1398 		req_mod(req, SEND_CANCELED);
1399 		return 0;
1400 	}
1401 
1402 	/* Even read requests may close a write epoch,
1403 	 * if there was any yet. */
1404 	maybe_send_barrier(connection, req->epoch);
1405 
1406 	err = drbd_send_drequest(first_peer_device(device), P_DATA_REQUEST, req->i.sector, req->i.size,
1407 				 (unsigned long)req);
1408 
1409 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1410 
1411 	return err;
1412 }
1413 
1414 int w_restart_disk_io(struct drbd_work *w, int cancel)
1415 {
1416 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1417 	struct drbd_device *device = req->device;
1418 
1419 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1420 		drbd_al_begin_io(device, &req->i, false);
1421 
1422 	drbd_req_make_private_bio(req, req->master_bio);
1423 	req->private_bio->bi_bdev = device->ldev->backing_bdev;
1424 	generic_make_request(req->private_bio);
1425 
1426 	return 0;
1427 }
1428 
1429 static int _drbd_may_sync_now(struct drbd_device *device)
1430 {
1431 	struct drbd_device *odev = device;
1432 	int resync_after;
1433 
1434 	while (1) {
1435 		if (!odev->ldev || odev->state.disk == D_DISKLESS)
1436 			return 1;
1437 		rcu_read_lock();
1438 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1439 		rcu_read_unlock();
1440 		if (resync_after == -1)
1441 			return 1;
1442 		odev = minor_to_device(resync_after);
1443 		if (!odev)
1444 			return 1;
1445 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1446 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1447 		    odev->state.aftr_isp || odev->state.peer_isp ||
1448 		    odev->state.user_isp)
1449 			return 0;
1450 	}
1451 }
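/* Example: with resync-after dependencies C -> B -> A (C is configured
 * to resync after B, and B after A), a resync on C is allowed only
 * while neither B nor A is itself resyncing or paused; the walk above
 * stops once it reaches a device whose resync_after is -1, a diskless
 * device, or a non-existing minor. */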
1452 
1453 /**
1454  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1455  * @device:	DRBD device.
1456  *
1457  * Called from process context only (admin command and after_state_ch).
1458  */
1459 static int _drbd_pause_after(struct drbd_device *device)
1460 {
1461 	struct drbd_device *odev;
1462 	int i, rv = 0;
1463 
1464 	rcu_read_lock();
1465 	idr_for_each_entry(&drbd_devices, odev, i) {
1466 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1467 			continue;
1468 		if (!_drbd_may_sync_now(odev))
1469 			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1470 			       != SS_NOTHING_TO_DO);
1471 	}
1472 	rcu_read_unlock();
1473 
1474 	return rv;
1475 }
1476 
1477 /**
1478  * _drbd_resume_next() - Resume resync on all devices that may resync now
1479  * @device:	DRBD device.
1480  *
1481  * Called from process context only (admin command and worker).
1482  */
1483 static int _drbd_resume_next(struct drbd_device *device)
1484 {
1485 	struct drbd_device *odev;
1486 	int i, rv = 0;
1487 
1488 	rcu_read_lock();
1489 	idr_for_each_entry(&drbd_devices, odev, i) {
1490 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1491 			continue;
1492 		if (odev->state.aftr_isp) {
1493 			if (_drbd_may_sync_now(odev))
1494 				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1495 							CS_HARD, NULL)
1496 				       != SS_NOTHING_TO_DO) ;
1497 		}
1498 	}
1499 	rcu_read_unlock();
1500 	return rv;
1501 }
1502 
1503 void resume_next_sg(struct drbd_device *device)
1504 {
1505 	write_lock_irq(&global_state_lock);
1506 	_drbd_resume_next(device);
1507 	write_unlock_irq(&global_state_lock);
1508 }
1509 
1510 void suspend_other_sg(struct drbd_device *device)
1511 {
1512 	write_lock_irq(&global_state_lock);
1513 	_drbd_pause_after(device);
1514 	write_unlock_irq(&global_state_lock);
1515 }
1516 
1517 /* caller must hold global_state_lock */
1518 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1519 {
1520 	struct drbd_device *odev;
1521 	int resync_after;
1522 
1523 	if (o_minor == -1)
1524 		return NO_ERROR;
1525 	if (o_minor < -1 || o_minor > MINORMASK)
1526 		return ERR_RESYNC_AFTER;
1527 
1528 	/* check for loops */
1529 	odev = minor_to_device(o_minor);
1530 	while (1) {
1531 		if (odev == device)
1532 			return ERR_RESYNC_AFTER_CYCLE;
1533 
1534 		/* You are free to depend on diskless, non-existing,
1535 		 * or not yet/no longer existing minors.
1536 		 * We only reject dependency loops.
1537 		 * We cannot follow the dependency chain beyond a detached or
1538 		 * missing minor.
1539 		 */
1540 		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1541 			return NO_ERROR;
1542 
1543 		rcu_read_lock();
1544 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1545 		rcu_read_unlock();
1546 		/* dependency chain ends here, no cycles. */
1547 		if (resync_after == -1)
1548 			return NO_ERROR;
1549 
1550 		/* follow the dependency chain */
1551 		odev = minor_to_device(resync_after);
1552 	}
1553 }
1554 
1555 /* caller must hold global_state_lock */
1556 void drbd_resync_after_changed(struct drbd_device *device)
1557 {
1558 	int changes;
1559 
1560 	do {
1561 		changes  = _drbd_pause_after(device);
1562 		changes |= _drbd_resume_next(device);
1563 	} while (changes);
1564 }
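/* Pausing and resuming are iterated to a fixed point because changing
 * the aftr_isp bit of one device may in turn change what its
 * resync-after dependents are allowed to do. */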
1565 
1566 void drbd_rs_controller_reset(struct drbd_device *device)
1567 {
1568 	struct fifo_buffer *plan;
1569 
1570 	atomic_set(&device->rs_sect_in, 0);
1571 	atomic_set(&device->rs_sect_ev, 0);
1572 	device->rs_in_flight = 0;
1573 
1574 	/* Updating the RCU protected object in place is necessary since
1575 	   this function gets called from atomic context.
1576 	   It is valid since all other updates also lead to a completely
1577 	   empty fifo */
1578 	rcu_read_lock();
1579 	plan = rcu_dereference(device->rs_plan_s);
1580 	plan->total = 0;
1581 	fifo_set(plan, 0);
1582 	rcu_read_unlock();
1583 }
1584 
1585 void start_resync_timer_fn(unsigned long data)
1586 {
1587 	struct drbd_device *device = (struct drbd_device *) data;
1588 
1589 	drbd_queue_work(&first_peer_device(device)->connection->sender_work,
1590 			&device->start_resync_work);
1591 }
1592 
1593 int w_start_resync(struct drbd_work *w, int cancel)
1594 {
1595 	struct drbd_device *device =
1596 		container_of(w, struct drbd_device, start_resync_work);
1597 
1598 	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1599 		drbd_warn(device, "w_start_resync later...\n");
1600 		device->start_resync_timer.expires = jiffies + HZ/10;
1601 		add_timer(&device->start_resync_timer);
1602 		return 0;
1603 	}
1604 
1605 	drbd_start_resync(device, C_SYNC_SOURCE);
1606 	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1607 	return 0;
1608 }
1609 
1610 /**
1611  * drbd_start_resync() - Start the resync process
1612  * @device:	DRBD device.
1613  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1614  *
1615  * This function might bring you directly into one of the
1616  * C_PAUSED_SYNC_* states.
1617  */
1618 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1619 {
1620 	union drbd_state ns;
1621 	int r;
1622 
1623 	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1624 		drbd_err(device, "Resync already running!\n");
1625 		return;
1626 	}
1627 
1628 	if (!test_bit(B_RS_H_DONE, &device->flags)) {
1629 		if (side == C_SYNC_TARGET) {
1630 			/* Since application IO was locked out during C_WF_BITMAP_T and
1631 			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1632 			   we check whether we may make the data inconsistent. */
1633 			r = drbd_khelper(device, "before-resync-target");
1634 			r = (r >> 8) & 0xff;
1635 			if (r > 0) {
1636 				drbd_info(device, "before-resync-target handler returned %d, "
1637 					 "dropping connection.\n", r);
1638 				conn_request_state(first_peer_device(device)->connection, NS(conn, C_DISCONNECTING), CS_HARD);
1639 				return;
1640 			}
1641 		} else /* C_SYNC_SOURCE */ {
1642 			r = drbd_khelper(device, "before-resync-source");
1643 			r = (r >> 8) & 0xff;
1644 			if (r > 0) {
1645 				if (r == 3) {
1646 					drbd_info(device, "before-resync-source handler returned %d, "
1647 						 "ignoring. Old userland tools?", r);
1648 				} else {
1649 					drbd_info(device, "before-resync-source handler returned %d, "
1650 						 "dropping connection.\n", r);
1651 					conn_request_state(first_peer_device(device)->connection,
1652 							   NS(conn, C_DISCONNECTING), CS_HARD);
1653 					return;
1654 				}
1655 			}
1656 		}
1657 	}
1658 
1659 	if (current == first_peer_device(device)->connection->worker.task) {
1660 		/* The worker should not sleep waiting for state_mutex,
1661 		   as that can take long */
1662 		if (!mutex_trylock(device->state_mutex)) {
1663 			set_bit(B_RS_H_DONE, &device->flags);
1664 			device->start_resync_timer.expires = jiffies + HZ/5;
1665 			add_timer(&device->start_resync_timer);
1666 			return;
1667 		}
1668 	} else {
1669 		mutex_lock(device->state_mutex);
1670 	}
1671 	clear_bit(B_RS_H_DONE, &device->flags);
1672 
1673 	write_lock_irq(&global_state_lock);
1674 	/* Did some connection breakage or IO error race with us? */
1675 	if (device->state.conn < C_CONNECTED
1676 	|| !get_ldev_if_state(device, D_NEGOTIATING)) {
1677 		write_unlock_irq(&global_state_lock);
1678 		mutex_unlock(device->state_mutex);
1679 		return;
1680 	}
1681 
1682 	ns = drbd_read_state(device);
1683 
1684 	ns.aftr_isp = !_drbd_may_sync_now(device);
1685 
1686 	ns.conn = side;
1687 
1688 	if (side == C_SYNC_TARGET)
1689 		ns.disk = D_INCONSISTENT;
1690 	else /* side == C_SYNC_SOURCE */
1691 		ns.pdsk = D_INCONSISTENT;
1692 
1693 	r = __drbd_set_state(device, ns, CS_VERBOSE, NULL);
1694 	ns = drbd_read_state(device);
1695 
1696 	if (ns.conn < C_CONNECTED)
1697 		r = SS_UNKNOWN_ERROR;
1698 
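	/* The state change to SyncSource/SyncTarget succeeded: (re)initialize
	 * the resync bookkeeping (counters, totals and progress marks) and
	 * re-evaluate the resync-after dependencies, still holding
	 * global_state_lock. */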
1699 	if (r == SS_SUCCESS) {
1700 		unsigned long tw = drbd_bm_total_weight(device);
1701 		unsigned long now = jiffies;
1702 		int i;
1703 
1704 		device->rs_failed    = 0;
1705 		device->rs_paused    = 0;
1706 		device->rs_same_csum = 0;
1707 		device->rs_last_events = 0;
1708 		device->rs_last_sect_ev = 0;
1709 		device->rs_total     = tw;
1710 		device->rs_start     = now;
1711 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1712 			device->rs_mark_left[i] = tw;
1713 			device->rs_mark_time[i] = now;
1714 		}
1715 		_drbd_pause_after(device);
1716 	}
1717 	write_unlock_irq(&global_state_lock);
1718 
1719 	if (r == SS_SUCCESS) {
1720 		/* reset rs_last_bcast when a resync or verify is started,
1721 		 * to deal with potential jiffies wrap. */
1722 		device->rs_last_bcast = jiffies - HZ;
1723 
1724 		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1725 		     drbd_conn_str(ns.conn),
1726 		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1727 		     (unsigned long) device->rs_total);
1728 		if (side == C_SYNC_TARGET)
1729 			device->bm_resync_fo = 0;
1730 
1731 		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1732 		 * with w_send_oos, or the sync target will get confused as to
1733 		 * how many bits to resync.  We cannot always do that, because for an
1734 		 * empty resync and protocol < 95, we need to do it here, as we call
1735 		 * drbd_resync_finished from here in that case.
1736 		 * We call drbd_gen_and_send_sync_uuid here for protocol < 96,
1737 		 * and from after_state_ch otherwise. */
1738 		if (side == C_SYNC_SOURCE &&
1739 		    first_peer_device(device)->connection->agreed_pro_version < 96)
1740 			drbd_gen_and_send_sync_uuid(first_peer_device(device));
1741 
1742 		if (first_peer_device(device)->connection->agreed_pro_version < 95 &&
1743 		    device->rs_total == 0) {
1744 			/* This still has a race (about when exactly the peers
1745 			 * detect connection loss) that can lead to a full sync
1746 			 * on next handshake. In 8.3.9 we fixed this with explicit
1747 			 * resync-finished notifications, but the fix
1748 			 * introduces a protocol change.  Sleeping for some
1749 			 * time longer than the ping interval + timeout on the
1750 			 * SyncSource, to give the SyncTarget the chance to
1751 			 * detect connection loss, then waiting for a ping
1752 			 * response (implicit in drbd_resync_finished) reduces
1753 			 * the race considerably, but does not solve it. */
1754 			if (side == C_SYNC_SOURCE) {
1755 				struct net_conf *nc;
1756 				int timeo;
1757 
1758 				rcu_read_lock();
1759 				nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
1760 				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1761 				rcu_read_unlock();
1762 				schedule_timeout_interruptible(timeo);
1763 			}
1764 			drbd_resync_finished(device);
1765 		}
1766 
1767 		drbd_rs_controller_reset(device);
1768 		/* ns.conn may already be != device->state.conn,
1769 		 * we may have been paused in between, or become paused until
1770 		 * the timer triggers.
1771 		 * Either way, that is handled in resync_timer_fn(). */
1772 		if (ns.conn == C_SYNC_TARGET)
1773 			mod_timer(&device->resync_timer, jiffies);
1774 
1775 		drbd_md_sync(device);
1776 	}
1777 	put_ldev(device);
1778 	mutex_unlock(device->state_mutex);
1779 }
1780 
1781 /* If the resource already closed the current epoch, but we did not
1782  * (because we have not yet seen new requests), we should send the
1783  * corresponding barrier now.  Must be checked within the same spinlock
1784  * that is used to check for new requests. */
1785 static bool need_to_send_barrier(struct drbd_connection *connection)
1786 {
1787 	if (!connection->send.seen_any_write_yet)
1788 		return false;
1789 
1790 	/* Skip barriers that do not contain any writes.
1791 	 * This may happen during AHEAD mode. */
1792 	if (!connection->send.current_epoch_writes)
1793 		return false;
1794 
1795 	/* ->req_lock is held when requests are queued on
1796 	 * connection->sender_work, and put into ->transfer_log.
1797 	 * It is also held when ->current_tle_nr is increased.
1798 	 * So either there are already new requests queued,
1799 	 * and corresponding barriers will be sent there.
1800 	 * Or nothing new is queued yet, so the difference will be 1.
1801 	 */
1802 	if (atomic_read(&connection->current_tle_nr) !=
1803 	    connection->send.current_epoch_nr + 1)
1804 		return false;
1805 
1806 	return true;
1807 }
1808 
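/* Splice all queued work onto work_list at once.
 * Returns true if work_list is non-empty afterwards. */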
1809 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1810 {
1811 	spin_lock_irq(&queue->q_lock);
1812 	list_splice_init(&queue->q, work_list);
1813 	spin_unlock_irq(&queue->q_lock);
1814 	return !list_empty(work_list);
1815 }
1816 
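/* Move at most one work item from the queue onto work_list.
 * Returns true if work_list is non-empty afterwards. */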
1817 static bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
1818 {
1819 	spin_lock_irq(&queue->q_lock);
1820 	if (!list_empty(&queue->q))
1821 		list_move(queue->q.next, work_list);
1822 	spin_unlock_irq(&queue->q_lock);
1823 	return !list_empty(work_list);
1824 }
1825 
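/* Dequeue a single work item for the worker.  If nothing is queued,
 * uncork the data socket (when corking is configured) and wait until new
 * work arrives or a signal is pending; if waking up finds that the current
 * epoch still needs to be closed, the corresponding barrier is sent from
 * here.  Before returning, the socket's cork state is restored according
 * to the (possibly changed) tcp_cork setting. */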
1826 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
1827 {
1828 	DEFINE_WAIT(wait);
1829 	struct net_conf *nc;
1830 	int uncork, cork;
1831 
1832 	dequeue_work_item(&connection->sender_work, work_list);
1833 	if (!list_empty(work_list))
1834 		return;
1835 
1836 	/* Still nothing to do?
1837 	 * Maybe we still need to close the current epoch,
1838 	 * even if no new requests are queued yet.
1839 	 *
1840 	 * Also, poke TCP, just in case.
1841 	 * Then wait for new work (or signal). */
1842 	rcu_read_lock();
1843 	nc = rcu_dereference(connection->net_conf);
1844 	uncork = nc ? nc->tcp_cork : 0;
1845 	rcu_read_unlock();
1846 	if (uncork) {
1847 		mutex_lock(&connection->data.mutex);
1848 		if (connection->data.socket)
1849 			drbd_tcp_uncork(connection->data.socket);
1850 		mutex_unlock(&connection->data.mutex);
1851 	}
1852 
1853 	for (;;) {
1854 		int send_barrier;
1855 		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
1856 		spin_lock_irq(&connection->resource->req_lock);
1857 		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
1858 		/* dequeue a single item only,
1859 		 * we still use drbd_queue_work_front() in some places */
1860 		if (!list_empty(&connection->sender_work.q))
1861 			list_move(connection->sender_work.q.next, work_list);
1862 		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
1863 		if (!list_empty(work_list) || signal_pending(current)) {
1864 			spin_unlock_irq(&connection->resource->req_lock);
1865 			break;
1866 		}
1867 		send_barrier = need_to_send_barrier(connection);
1868 		spin_unlock_irq(&connection->resource->req_lock);
1869 		if (send_barrier) {
1870 			drbd_send_barrier(connection);
1871 			connection->send.current_epoch_nr++;
1872 		}
1873 		schedule();
1874 		/* We may be woken up for things other than new work, too,
1875 		 * e.g. if the current epoch got closed,
1876 		 * in which case we send the barrier above. */
1877 	}
1878 	finish_wait(&connection->sender_work.q_wait, &wait);
1879 
1880 	/* Someone may have changed the config while we were waiting above. */
1881 	rcu_read_lock();
1882 	nc = rcu_dereference(connection->net_conf);
1883 	cork = nc ? nc->tcp_cork : 0;
1884 	rcu_read_unlock();
1885 	mutex_lock(&connection->data.mutex);
1886 	if (connection->data.socket) {
1887 		if (cork)
1888 			drbd_tcp_cork(connection->data.socket);
1889 		else if (!uncork)
1890 			drbd_tcp_uncork(connection->data.socket);
1891 	}
1892 	mutex_unlock(&connection->data.mutex);
1893 }
1894 
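/* Main loop of the per-connection worker thread: dequeue work items one at a
 * time and run their callbacks (the second callback argument is the "cancel"
 * flag, set while the connection is not established); a callback failing
 * while in C_WF_REPORT_PARAMS or better forces the connection into
 * C_NETWORK_FAILURE.  Once told to stop, the remaining work is drained with
 * cancel set and the attached devices are cleaned up. */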
1895 int drbd_worker(struct drbd_thread *thi)
1896 {
1897 	struct drbd_connection *connection = thi->connection;
1898 	struct drbd_work *w = NULL;
1899 	struct drbd_peer_device *peer_device;
1900 	LIST_HEAD(work_list);
1901 	int vnr;
1902 
1903 	while (get_t_state(thi) == RUNNING) {
1904 		drbd_thread_current_set_cpu(thi);
1905 
1906 		/* as long as we use drbd_queue_work_front(),
1907 		 * we may only dequeue single work items here, not batches. */
1908 		if (list_empty(&work_list))
1909 			wait_for_work(connection, &work_list);
1910 
1911 		if (signal_pending(current)) {
1912 			flush_signals(current);
1913 			if (get_t_state(thi) == RUNNING) {
1914 				drbd_warn(connection, "Worker got an unexpected signal\n");
1915 				continue;
1916 			}
1917 			break;
1918 		}
1919 
1920 		if (get_t_state(thi) != RUNNING)
1921 			break;
1922 
1923 		while (!list_empty(&work_list)) {
1924 			w = list_first_entry(&work_list, struct drbd_work, list);
1925 			list_del_init(&w->list);
1926 			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
1927 				continue;
1928 			if (connection->cstate >= C_WF_REPORT_PARAMS)
1929 				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1930 		}
1931 	}
1932 
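	/* No longer RUNNING: drain whatever is still queued, running the
	 * callbacks with the cancel flag set, before cleaning up the devices. */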
1933 	do {
1934 		while (!list_empty(&work_list)) {
1935 			w = list_first_entry(&work_list, struct drbd_work, list);
1936 			list_del_init(&w->list);
1937 			w->cb(w, 1);
1938 		}
1939 		dequeue_work_batch(&connection->sender_work, &work_list);
1940 	} while (!list_empty(&work_list));
1941 
1942 	rcu_read_lock();
1943 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1944 		struct drbd_device *device = peer_device->device;
1945 		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
1946 		kref_get(&device->kref);
1947 		rcu_read_unlock();
1948 		drbd_device_cleanup(device);
1949 		kref_put(&device->kref, drbd_destroy_device);
1950 		rcu_read_lock();
1951 	}
1952 	rcu_read_unlock();
1953 
1954 	return 0;
1955 }
1956