xref: /openbmc/linux/drivers/block/drbd/drbd_worker.c (revision 4f205687)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24 */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37 
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
41 
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
44 
45 /* endio handlers:
46  *   drbd_md_endio (defined here)
47  *   drbd_request_endio (defined here)
48  *   drbd_peer_request_endio (defined here)
49  *   drbd_bm_endio (defined in drbd_bitmap.c)
50  *
51  * For all these callbacks, note the following:
52  * The callbacks will be called in irq context by the IDE drivers,
53  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54  * Try to get the locking right :)
55  *
56  */
57 
58 /* used for synchronous meta data and bitmap IO
59  * submitted by drbd_md_sync_page_io()
60  */
61 void drbd_md_endio(struct bio *bio)
62 {
63 	struct drbd_device *device;
64 
65 	device = bio->bi_private;
66 	device->md_io.error = bio->bi_error;
67 
68 	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
69 	 * to timeout on the lower level device, and eventually detach from it.
70 	 * If this io completion runs after that timeout expired, this
71 	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
72 	 * During normal operation, this only puts that extra reference
73 	 * down to 1 again.
74 	 * Make sure we first drop the reference, and only then signal
75 	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
76 	 * next drbd_md_sync_page_io() that we trigger the
77 	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
78 	 */
79 	drbd_md_put_buffer(device);
80 	device->md_io.done = 1;
81 	wake_up(&device->misc_wait);
82 	bio_put(bio);
83 	if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
84 		put_ldev(device);
85 }
86 
87 /* reads on behalf of the partner,
88  * "submitted" by the receiver
89  */
90 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
91 {
92 	unsigned long flags = 0;
93 	struct drbd_peer_device *peer_device = peer_req->peer_device;
94 	struct drbd_device *device = peer_device->device;
95 
96 	spin_lock_irqsave(&device->resource->req_lock, flags);
97 	device->read_cnt += peer_req->i.size >> 9;
98 	list_del(&peer_req->w.list);
99 	if (list_empty(&device->read_ee))
100 		wake_up(&device->ee_wait);
101 	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
102 		__drbd_chk_io_error(device, DRBD_READ_ERROR);
103 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
104 
105 	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
106 	put_ldev(device);
107 }
108 
109 /* writes on behalf of the partner, or resync writes,
110  * "submitted" by the receiver, final stage.  */
111 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
112 {
113 	unsigned long flags = 0;
114 	struct drbd_peer_device *peer_device = peer_req->peer_device;
115 	struct drbd_device *device = peer_device->device;
116 	struct drbd_connection *connection = peer_device->connection;
117 	struct drbd_interval i;
118 	int do_wake;
119 	u64 block_id;
120 	int do_al_complete_io;
121 
122 	/* after we moved peer_req to done_ee,
123 	 * we may no longer access it,
124 	 * it may be freed/reused already!
125 	 * (as soon as we release the req_lock) */
126 	i = peer_req->i;
127 	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
128 	block_id = peer_req->block_id;
129 	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
130 
131 	spin_lock_irqsave(&device->resource->req_lock, flags);
132 	device->writ_cnt += peer_req->i.size >> 9;
133 	list_move_tail(&peer_req->w.list, &device->done_ee);
134 
135 	/*
136 	 * Do not remove from the write_requests tree here: we did not send the
137 	 * Ack yet and did not wake possibly waiting conflicting requests.
138 	 * Removal from the tree happens in "drbd_process_done_ee" within the
139 	 * appropriate dw.cb (e_end_block/e_end_resync_block) or in
140 	 * _drbd_clear_done_ee.
141 	 */
142 
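	/* Resync writes were tracked on sync_ee, application writes from the
	 * peer on active_ee; check whether the respective list just drained,
	 * so that waiters on ee_wait can be woken up below. */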
143 	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
144 
145 	/* FIXME do we want to detach for failed REQ_DISCARD?
146 	 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
147 	if (peer_req->flags & EE_WAS_ERROR)
148 		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
149 
150 	if (connection->cstate >= C_WF_REPORT_PARAMS) {
151 		kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
152 		if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
153 			kref_put(&device->kref, drbd_destroy_device);
154 	}
155 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
156 
157 	if (block_id == ID_SYNCER)
158 		drbd_rs_complete_io(device, i.sector);
159 
160 	if (do_wake)
161 		wake_up(&device->ee_wait);
162 
163 	if (do_al_complete_io)
164 		drbd_al_complete_io(device, &i);
165 
166 	put_ldev(device);
167 }
168 
169 /* writes on behalf of the partner, or resync writes,
170  * "submitted" by the receiver.
171  */
172 void drbd_peer_request_endio(struct bio *bio)
173 {
174 	struct drbd_peer_request *peer_req = bio->bi_private;
175 	struct drbd_device *device = peer_req->peer_device->device;
176 	int is_write = bio_data_dir(bio) == WRITE;
177 	int is_discard = !!(bio->bi_rw & REQ_DISCARD);
178 
179 	if (bio->bi_error && __ratelimit(&drbd_ratelimit_state))
180 		drbd_warn(device, "%s: error=%d s=%llus\n",
181 				is_write ? (is_discard ? "discard" : "write")
182 					: "read", bio->bi_error,
183 				(unsigned long long)peer_req->i.sector);
184 
185 	if (bio->bi_error)
186 		set_bit(__EE_WAS_ERROR, &peer_req->flags);
187 
188 	bio_put(bio); /* no need for the bio anymore */
189 	if (atomic_dec_and_test(&peer_req->pending_bios)) {
190 		if (is_write)
191 			drbd_endio_write_sec_final(peer_req);
192 		else
193 			drbd_endio_read_sec_final(peer_req);
194 	}
195 }
196 
197 void drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
198 {
199 	panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
200 		device->minor, device->resource->name, device->vnr);
201 }
202 
203 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
204  */
205 void drbd_request_endio(struct bio *bio)
206 {
207 	unsigned long flags;
208 	struct drbd_request *req = bio->bi_private;
209 	struct drbd_device *device = req->device;
210 	struct bio_and_error m;
211 	enum drbd_req_event what;
212 
213 	/* If this request was aborted locally before,
214 	 * but now was completed "successfully",
215 	 * chances are that this caused arbitrary data corruption.
216 	 *
217 	 * "aborting" requests, or force-detaching the disk, is intended for
218 	 * completely blocked/hung local backing devices which no longer
219 	 * complete requests at all, not even error completions.  In this
220 	 * situation, usually a hard-reset and failover is the only way out.
221 	 *
222 	 * By "aborting", basically faking a local error-completion,
223 	 * we allow for a more graceful switchover by cleanly migrating services.
224 	 * Still the affected node has to be rebooted "soon".
225 	 *
226 	 * By completing these requests, we allow the upper layers to re-use
227 	 * the associated data pages.
228 	 *
229 	 * If later the local backing device "recovers", and now DMAs some data
230 	 * from disk into the original request pages, in the best case it will
231 	 * just put random data into unused pages; but typically it will corrupt
232 	 * data that has meanwhile become completely unrelated, causing all sorts of damage.
233 	 *
234 	 * Which means delayed successful completion,
235 	 * especially for READ requests,
236 	 * is a reason to panic().
237 	 *
238 	 * We assume that a delayed *error* completion is OK,
239 	 * though we still will complain noisily about it.
240 	 */
241 	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
242 		if (__ratelimit(&drbd_ratelimit_state))
243 			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
244 
245 		if (!bio->bi_error)
246 			drbd_panic_after_delayed_completion_of_aborted_request(device);
247 	}
248 
249 	/* to avoid recursion in __req_mod */
250 	if (unlikely(bio->bi_error)) {
251 		if (bio->bi_rw & REQ_DISCARD)
252 			what = (bio->bi_error == -EOPNOTSUPP)
253 				? DISCARD_COMPLETED_NOTSUPP
254 				: DISCARD_COMPLETED_WITH_ERROR;
255 		else
256 			what = (bio_data_dir(bio) == WRITE)
257 			? WRITE_COMPLETED_WITH_ERROR
258 			: (bio_rw(bio) == READ)
259 			  ? READ_COMPLETED_WITH_ERROR
260 			  : READ_AHEAD_COMPLETED_WITH_ERROR;
261 	} else
262 		what = COMPLETED_OK;
263 
264 	bio_put(req->private_bio);
265 	req->private_bio = ERR_PTR(bio->bi_error);
266 
267 	/* not req_mod(), we need irqsave here! */
268 	spin_lock_irqsave(&device->resource->req_lock, flags);
269 	__req_mod(req, what, &m);
270 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
271 	put_ldev(device);
272 
273 	if (m.bio)
274 		complete_master_bio(device, &m);
275 }
276 
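/* Hash all pages of a peer request into @digest: every page except the last
 * is fed in full, the last one only up to the request's remaining length. */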
277 void drbd_csum_ee(struct crypto_ahash *tfm, struct drbd_peer_request *peer_req, void *digest)
278 {
279 	AHASH_REQUEST_ON_STACK(req, tfm);
280 	struct scatterlist sg;
281 	struct page *page = peer_req->pages;
282 	struct page *tmp;
283 	unsigned len;
284 
285 	ahash_request_set_tfm(req, tfm);
286 	ahash_request_set_callback(req, 0, NULL, NULL);
287 
288 	sg_init_table(&sg, 1);
289 	crypto_ahash_init(req);
290 
291 	while ((tmp = page_chain_next(page))) {
292 		/* all but the last page will be fully used */
293 		sg_set_page(&sg, page, PAGE_SIZE, 0);
294 		ahash_request_set_crypt(req, &sg, NULL, sg.length);
295 		crypto_ahash_update(req);
296 		page = tmp;
297 	}
298 	/* and now the last, possibly only partially used page */
299 	len = peer_req->i.size & (PAGE_SIZE - 1);
300 	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
301 	ahash_request_set_crypt(req, &sg, digest, sg.length);
302 	crypto_ahash_finup(req);
303 	ahash_request_zero(req);
304 }
305 
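/* Like drbd_csum_ee(), but hash all data segments of a bio into @digest. */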
306 void drbd_csum_bio(struct crypto_ahash *tfm, struct bio *bio, void *digest)
307 {
308 	AHASH_REQUEST_ON_STACK(req, tfm);
309 	struct scatterlist sg;
310 	struct bio_vec bvec;
311 	struct bvec_iter iter;
312 
313 	ahash_request_set_tfm(req, tfm);
314 	ahash_request_set_callback(req, 0, NULL, NULL);
315 
316 	sg_init_table(&sg, 1);
317 	crypto_ahash_init(req);
318 
319 	bio_for_each_segment(bvec, bio, iter) {
320 		sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
321 		ahash_request_set_crypt(req, &sg, NULL, sg.length);
322 		crypto_ahash_update(req);
323 	}
324 	ahash_request_set_crypt(req, NULL, digest, 0);
325 	crypto_ahash_final(req);
326 	ahash_request_zero(req);
327 }
328 
329 /* MAYBE merge common code with w_e_end_ov_req */
330 static int w_e_send_csum(struct drbd_work *w, int cancel)
331 {
332 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
333 	struct drbd_peer_device *peer_device = peer_req->peer_device;
334 	struct drbd_device *device = peer_device->device;
335 	int digest_size;
336 	void *digest;
337 	int err = 0;
338 
339 	if (unlikely(cancel))
340 		goto out;
341 
342 	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
343 		goto out;
344 
345 	digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
346 	digest = kmalloc(digest_size, GFP_NOIO);
347 	if (digest) {
348 		sector_t sector = peer_req->i.sector;
349 		unsigned int size = peer_req->i.size;
350 		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
351 		/* Free peer_req and pages before send.
352 		 * In case we block on congestion, we could otherwise run into
353 		 * some distributed deadlock, if the other side blocks on
354 		 * congestion as well, because our receiver blocks in
355 		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
356 		drbd_free_peer_req(device, peer_req);
357 		peer_req = NULL;
358 		inc_rs_pending(device);
359 		err = drbd_send_drequest_csum(peer_device, sector, size,
360 					      digest, digest_size,
361 					      P_CSUM_RS_REQUEST);
362 		kfree(digest);
363 	} else {
364 		drbd_err(device, "kmalloc() of digest failed.\n");
365 		err = -ENOMEM;
366 	}
367 
368 out:
369 	if (peer_req)
370 		drbd_free_peer_req(device, peer_req);
371 
372 	if (unlikely(err))
373 		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
374 	return err;
375 }
376 
377 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
378 
379 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
380 {
381 	struct drbd_device *device = peer_device->device;
382 	struct drbd_peer_request *peer_req;
383 
384 	if (!get_ldev(device))
385 		return -EIO;
386 
387 	/* GFP_TRY, because if there is no memory available right now, this may
388 	 * be rescheduled for later. It is "only" background resync, after all. */
389 	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
390 				       size, true /* has real payload */, GFP_TRY);
391 	if (!peer_req)
392 		goto defer;
393 
394 	peer_req->w.cb = w_e_send_csum;
395 	spin_lock_irq(&device->resource->req_lock);
396 	list_add_tail(&peer_req->w.list, &device->read_ee);
397 	spin_unlock_irq(&device->resource->req_lock);
398 
399 	atomic_add(size >> 9, &device->rs_sect_ev);
400 	if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
401 		return 0;
402 
403 	/* If it failed because of ENOMEM, retry should help.  If it failed
404 	 * because bio_add_page failed (probably broken lower level driver),
405 	 * retry may or may not help.
406 	 * If it does not, you may need to force disconnect. */
407 	spin_lock_irq(&device->resource->req_lock);
408 	list_del(&peer_req->w.list);
409 	spin_unlock_irq(&device->resource->req_lock);
410 
411 	drbd_free_peer_req(device, peer_req);
412 defer:
413 	put_ldev(device);
414 	return -EAGAIN;
415 }
416 
417 int w_resync_timer(struct drbd_work *w, int cancel)
418 {
419 	struct drbd_device *device =
420 		container_of(w, struct drbd_device, resync_work);
421 
422 	switch (device->state.conn) {
423 	case C_VERIFY_S:
424 		make_ov_request(device, cancel);
425 		break;
426 	case C_SYNC_TARGET:
427 		make_resync_request(device, cancel);
428 		break;
429 	}
430 
431 	return 0;
432 }
433 
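/* Timer callback: (re)queue the resync work on the connection's sender work
 * queue, unless it is already queued. */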
434 void resync_timer_fn(unsigned long data)
435 {
436 	struct drbd_device *device = (struct drbd_device *) data;
437 
438 	drbd_queue_work_if_unqueued(
439 		&first_peer_device(device)->connection->sender_work,
440 		&device->resync_work);
441 }
442 
443 static void fifo_set(struct fifo_buffer *fb, int value)
444 {
445 	int i;
446 
447 	for (i = 0; i < fb->size; i++)
448 		fb->values[i] = value;
449 }
450 
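/* Store @value at the head of the ring buffer, advance the head with
 * wrap-around, and return the value that was overwritten (the oldest entry). */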
451 static int fifo_push(struct fifo_buffer *fb, int value)
452 {
453 	int ov;
454 
455 	ov = fb->values[fb->head_index];
456 	fb->values[fb->head_index++] = value;
457 
458 	if (fb->head_index >= fb->size)
459 		fb->head_index = 0;
460 
461 	return ov;
462 }
463 
464 static void fifo_add_val(struct fifo_buffer *fb, int value)
465 {
466 	int i;
467 
468 	for (i = 0; i < fb->size; i++)
469 		fb->values[i] += value;
470 }
471 
472 struct fifo_buffer *fifo_alloc(int fifo_size)
473 {
474 	struct fifo_buffer *fb;
475 
476 	fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
477 	if (!fb)
478 		return NULL;
479 
480 	fb->head_index = 0;
481 	fb->size = fifo_size;
482 	fb->total = 0;
483 
484 	return fb;
485 }
486 
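/* Dynamic resync speed controller, invoked roughly once per SLEEP_TIME
 * interval.  It returns the number of sectors to request next so that about
 * "want" sectors stay in flight: "want" is c_fill_target if set, otherwise it
 * is derived from the incoming rate and c_delay_target (and seeded from
 * resync_rate at the start of a resync).  The required correction is spread
 * over "steps" future intervals via the plan fifo, and the result is capped
 * by c_max_rate. */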
487 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
488 {
489 	struct disk_conf *dc;
490 	unsigned int want;     /* The number of sectors we want in-flight */
491 	int req_sect; /* Number of sectors to request in this turn */
492 	int correction; /* Number of sectors more we need in-flight */
493 	int cps; /* correction per invocation of drbd_rs_controller() */
494 	int steps; /* Number of time steps to plan ahead */
495 	int curr_corr;
496 	int max_sect;
497 	struct fifo_buffer *plan;
498 
499 	dc = rcu_dereference(device->ldev->disk_conf);
500 	plan = rcu_dereference(device->rs_plan_s);
501 
502 	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
503 
504 	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
505 		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
506 	} else { /* normal path */
507 		want = dc->c_fill_target ? dc->c_fill_target :
508 			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
509 	}
510 
511 	correction = want - device->rs_in_flight - plan->total;
512 
513 	/* Plan ahead */
514 	cps = correction / steps;
515 	fifo_add_val(plan, cps);
516 	plan->total += cps * steps;
517 
518 	/* What we do in this step */
519 	curr_corr = fifo_push(plan, 0);
520 	plan->total -= curr_corr;
521 
522 	req_sect = sect_in + curr_corr;
523 	if (req_sect < 0)
524 		req_sect = 0;
525 
526 	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
527 	if (req_sect > max_sect)
528 		req_sect = max_sect;
529 
530 	/*
531 	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
532 		 sect_in, device->rs_in_flight, want, correction,
533 		 steps, cps, device->rs_planed, curr_corr, req_sect);
534 	*/
535 
536 	return req_sect;
537 }
538 
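/* Turn the configured resync speed (dynamic controller above, or the static
 * resync_rate) into a number of BM_BLOCK_SIZE requests for this interval,
 * clamped so that no more than max-buffers/2 requests are in flight. */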
539 static int drbd_rs_number_requests(struct drbd_device *device)
540 {
541 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
542 	int number, mxb;
543 
544 	sect_in = atomic_xchg(&device->rs_sect_in, 0);
545 	device->rs_in_flight -= sect_in;
546 
547 	rcu_read_lock();
548 	mxb = drbd_get_max_buffers(device) / 2;
549 	if (rcu_dereference(device->rs_plan_s)->size) {
550 		number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
551 		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
552 	} else {
553 		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
554 		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
555 	}
556 	rcu_read_unlock();
557 
558 	/* Don't have more than "max-buffers"/2 in-flight.
559 	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
560 	 * potentially causing a distributed deadlock on congestion during
561 	 * online-verify or (checksum-based) resync, if max-buffers,
562 	 * socket buffer sizes and resync rate settings are mis-configured. */
563 
564 	/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
565 	 * mxb (as used here, and in drbd_alloc_pages on the peer) is
566 	 * "number of pages" (typically also 4k),
567 	 * but "rs_in_flight" is in "sectors" (512 Byte). */
568 	if (mxb - device->rs_in_flight/8 < number)
569 		number = mxb - device->rs_in_flight/8;
570 
571 	return number;
572 }
573 
574 static int make_resync_request(struct drbd_device *const device, int cancel)
575 {
576 	struct drbd_peer_device *const peer_device = first_peer_device(device);
577 	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
578 	unsigned long bit;
579 	sector_t sector;
580 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
581 	int max_bio_size;
582 	int number, rollback_i, size;
583 	int align, requeue = 0;
584 	int i = 0;
585 
586 	if (unlikely(cancel))
587 		return 0;
588 
589 	if (device->rs_total == 0) {
590 		/* empty resync? */
591 		drbd_resync_finished(device);
592 		return 0;
593 	}
594 
595 	if (!get_ldev(device)) {
596 		/* Since we only need to access device->resync a
597 		   get_ldev_if_state(device,D_FAILED) would be sufficient, but
598 		   to continue resync with a broken disk makes no sense at
599 		   all */
600 		drbd_err(device, "Disk broke down during resync!\n");
601 		return 0;
602 	}
603 
604 	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
605 	number = drbd_rs_number_requests(device);
606 	if (number <= 0)
607 		goto requeue;
608 
609 	for (i = 0; i < number; i++) {
610 		/* Stop generating RS requests when half of the send buffer is filled,
611 		 * but notify TCP that we'd like to have more space. */
612 		mutex_lock(&connection->data.mutex);
613 		if (connection->data.socket) {
614 			struct sock *sk = connection->data.socket->sk;
615 			int queued = sk->sk_wmem_queued;
616 			int sndbuf = sk->sk_sndbuf;
617 			if (queued > sndbuf / 2) {
618 				requeue = 1;
619 				if (sk->sk_socket)
620 					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
621 			}
622 		} else
623 			requeue = 1;
624 		mutex_unlock(&connection->data.mutex);
625 		if (requeue)
626 			goto requeue;
627 
628 next_sector:
629 		size = BM_BLOCK_SIZE;
630 		bit  = drbd_bm_find_next(device, device->bm_resync_fo);
631 
632 		if (bit == DRBD_END_OF_BITMAP) {
633 			device->bm_resync_fo = drbd_bm_bits(device);
634 			put_ldev(device);
635 			return 0;
636 		}
637 
638 		sector = BM_BIT_TO_SECT(bit);
639 
640 		if (drbd_try_rs_begin_io(device, sector)) {
641 			device->bm_resync_fo = bit;
642 			goto requeue;
643 		}
644 		device->bm_resync_fo = bit + 1;
645 
646 		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
647 			drbd_rs_complete_io(device, sector);
648 			goto next_sector;
649 		}
650 
651 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
652 		/* try to find some adjacent bits.
653 		 * we stop if we already have the maximum req size.
654 		 *
655 		 * Additionally always align bigger requests, in order to
656 		 * be prepared for all stripe sizes of software RAIDs.
657 		 */
658 		align = 1;
659 		rollback_i = i;
660 		while (i < number) {
661 			if (size + BM_BLOCK_SIZE > max_bio_size)
662 				break;
663 
664 			/* Be always aligned */
665 			if (sector & ((1<<(align+3))-1))
666 				break;
667 
668 			/* do not cross extent boundaries */
669 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
670 				break;
671 			/* now, is it actually dirty, after all?
672 			 * caution, drbd_bm_test_bit is tri-state for some
673 			 * obscure reason; ( b == 0 ) would get the out-of-band
674 			 * only accidentally right because of the "oddly sized"
675 			 * adjustment below */
676 			if (drbd_bm_test_bit(device, bit+1) != 1)
677 				break;
678 			bit++;
679 			size += BM_BLOCK_SIZE;
680 			if ((BM_BLOCK_SIZE << align) <= size)
681 				align++;
682 			i++;
683 		}
684 		/* if we merged some,
685 		 * reset the offset to start the next drbd_bm_find_next from */
686 		if (size > BM_BLOCK_SIZE)
687 			device->bm_resync_fo = bit + 1;
688 #endif
689 
690 		/* adjust very last sectors, in case we are oddly sized */
691 		if (sector + (size>>9) > capacity)
692 			size = (capacity-sector)<<9;
693 
694 		if (device->use_csums) {
695 			switch (read_for_csum(peer_device, sector, size)) {
696 			case -EIO: /* Disk failure */
697 				put_ldev(device);
698 				return -EIO;
699 			case -EAGAIN: /* allocation failed, or ldev busy */
700 				drbd_rs_complete_io(device, sector);
701 				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
702 				i = rollback_i;
703 				goto requeue;
704 			case 0:
705 				/* everything ok */
706 				break;
707 			default:
708 				BUG();
709 			}
710 		} else {
711 			int err;
712 
713 			inc_rs_pending(device);
714 			err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
715 						 sector, size, ID_SYNCER);
716 			if (err) {
717 				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
718 				dec_rs_pending(device);
719 				put_ldev(device);
720 				return err;
721 			}
722 		}
723 	}
724 
725 	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
726 		/* last syncer _request_ was sent,
727 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
728 		 * next sync group will resume), as soon as we receive the last
729 		 * resync data block, and the last bit is cleared.
730 		 * until then resync "work" is "inactive" ...
731 		 */
732 		put_ldev(device);
733 		return 0;
734 	}
735 
736  requeue:
737 	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
738 	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
739 	put_ldev(device);
740 	return 0;
741 }
742 
743 static int make_ov_request(struct drbd_device *device, int cancel)
744 {
745 	int number, i, size;
746 	sector_t sector;
747 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
748 	bool stop_sector_reached = false;
749 
750 	if (unlikely(cancel))
751 		return 1;
752 
753 	number = drbd_rs_number_requests(device);
754 
755 	sector = device->ov_position;
756 	for (i = 0; i < number; i++) {
757 		if (sector >= capacity)
758 			return 1;
759 
760 		/* We check for "finished" only in the reply path:
761 		 * w_e_end_ov_reply().
762 		 * We need to send at least one request out. */
763 		stop_sector_reached = i > 0
764 			&& verify_can_do_stop_sector(device)
765 			&& sector >= device->ov_stop_sector;
766 		if (stop_sector_reached)
767 			break;
768 
769 		size = BM_BLOCK_SIZE;
770 
771 		if (drbd_try_rs_begin_io(device, sector)) {
772 			device->ov_position = sector;
773 			goto requeue;
774 		}
775 
776 		if (sector + (size>>9) > capacity)
777 			size = (capacity-sector)<<9;
778 
779 		inc_rs_pending(device);
780 		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
781 			dec_rs_pending(device);
782 			return 0;
783 		}
784 		sector += BM_SECT_PER_BIT;
785 	}
786 	device->ov_position = sector;
787 
788  requeue:
789 	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
790 	if (i == 0 || !stop_sector_reached)
791 		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
792 	return 1;
793 }
794 
795 int w_ov_finished(struct drbd_work *w, int cancel)
796 {
797 	struct drbd_device_work *dw =
798 		container_of(w, struct drbd_device_work, w);
799 	struct drbd_device *device = dw->device;
800 	kfree(dw);
801 	ov_out_of_sync_print(device);
802 	drbd_resync_finished(device);
803 
804 	return 0;
805 }
806 
807 static int w_resync_finished(struct drbd_work *w, int cancel)
808 {
809 	struct drbd_device_work *dw =
810 		container_of(w, struct drbd_device_work, w);
811 	struct drbd_device *device = dw->device;
812 	kfree(dw);
813 
814 	drbd_resync_finished(device);
815 
816 	return 0;
817 }
818 
819 static void ping_peer(struct drbd_device *device)
820 {
821 	struct drbd_connection *connection = first_peer_device(device)->connection;
822 
823 	clear_bit(GOT_PING_ACK, &connection->flags);
824 	request_ping(connection);
825 	wait_event(connection->ping_wait,
826 		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
827 }
828 
829 int drbd_resync_finished(struct drbd_device *device)
830 {
831 	unsigned long db, dt, dbdt;
832 	unsigned long n_oos;
833 	union drbd_state os, ns;
834 	struct drbd_device_work *dw;
835 	char *khelper_cmd = NULL;
836 	int verify_done = 0;
837 
838 	/* Remove all elements from the resync LRU. Since future actions
839 	 * might set bits in the (main) bitmap, then the entries in the
840 	 * might set bits in the (main) bitmap, the entries in the
841 	 * resync LRU would otherwise be wrong. */
842 		/* In case this is not possible now, most probably because
843 		 * there are P_RS_DATA_REPLY packets lingering on the worker's
844 		 * queue (or even the read operations for those packets
845 		 * are not finished by now).  Retry in 100ms. */
846 
847 		schedule_timeout_interruptible(HZ / 10);
848 		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
849 		if (dw) {
850 			dw->w.cb = w_resync_finished;
851 			dw->device = device;
852 			drbd_queue_work(&first_peer_device(device)->connection->sender_work,
853 					&dw->w);
854 			return 1;
855 		}
856 		drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
857 	}
858 
859 	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
860 	if (dt <= 0)
861 		dt = 1;
862 
863 	db = device->rs_total;
864 	/* adjust for verify start and stop sectors, respectively the position reached */
865 	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
866 		db -= device->ov_left;
867 
868 	dbdt = Bit2KB(db/dt);
869 	device->rs_paused /= HZ;
870 
871 	if (!get_ldev(device))
872 		goto out;
873 
874 	ping_peer(device);
875 
876 	spin_lock_irq(&device->resource->req_lock);
877 	os = drbd_read_state(device);
878 
879 	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
880 
881 	/* This protects us against multiple calls (that can happen in the presence
882 	   of application IO), and against connectivity loss just before we arrive here. */
883 	if (os.conn <= C_CONNECTED)
884 		goto out_unlock;
885 
886 	ns = os;
887 	ns.conn = C_CONNECTED;
888 
889 	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
890 	     verify_done ? "Online verify" : "Resync",
891 	     dt + device->rs_paused, device->rs_paused, dbdt);
892 
893 	n_oos = drbd_bm_total_weight(device);
894 
895 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
896 		if (n_oos) {
897 			drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
898 			      n_oos, Bit2KB(1));
899 			khelper_cmd = "out-of-sync";
900 		}
901 	} else {
902 		D_ASSERT(device, (n_oos - device->rs_failed) == 0);
903 
904 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
905 			khelper_cmd = "after-resync-target";
906 
907 		if (device->use_csums && device->rs_total) {
908 			const unsigned long s = device->rs_same_csum;
909 			const unsigned long t = device->rs_total;
910 			const int ratio =
911 				(t == 0)     ? 0 :
912 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
913 			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
914 			     "transferred %luK total %luK\n",
915 			     ratio,
916 			     Bit2KB(device->rs_same_csum),
917 			     Bit2KB(device->rs_total - device->rs_same_csum),
918 			     Bit2KB(device->rs_total));
919 		}
920 	}
921 
922 	if (device->rs_failed) {
923 		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
924 
925 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
926 			ns.disk = D_INCONSISTENT;
927 			ns.pdsk = D_UP_TO_DATE;
928 		} else {
929 			ns.disk = D_UP_TO_DATE;
930 			ns.pdsk = D_INCONSISTENT;
931 		}
932 	} else {
933 		ns.disk = D_UP_TO_DATE;
934 		ns.pdsk = D_UP_TO_DATE;
935 
936 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
937 			if (device->p_uuid) {
938 				int i;
939 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
940 					_drbd_uuid_set(device, i, device->p_uuid[i]);
941 				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
942 				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
943 			} else {
944 				drbd_err(device, "device->p_uuid is NULL! BUG\n");
945 			}
946 		}
947 
948 		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
949 			/* for verify runs, we don't update uuids here,
950 			 * so there would be nothing to report. */
951 			drbd_uuid_set_bm(device, 0UL);
952 			drbd_print_uuids(device, "updated UUIDs");
953 			if (device->p_uuid) {
954 				/* Now the two UUID sets are equal, update what we
955 				 * know of the peer. */
956 				int i;
957 				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
958 					device->p_uuid[i] = device->ldev->md.uuid[i];
959 			}
960 		}
961 	}
962 
963 	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
964 out_unlock:
965 	spin_unlock_irq(&device->resource->req_lock);
966 	put_ldev(device);
967 out:
968 	device->rs_total  = 0;
969 	device->rs_failed = 0;
970 	device->rs_paused = 0;
971 
972 	/* reset start sector, if we reached end of device */
973 	if (verify_done && device->ov_left == 0)
974 		device->ov_start_sector = 0;
975 
976 	drbd_md_sync(device);
977 
978 	if (khelper_cmd)
979 		drbd_khelper(device, khelper_cmd);
980 
981 	return 1;
982 }
983 
984 /* helper: park the peer request on net_ee while the network layer may still hold references to its pages (sendpage not finished), otherwise free it right away */
985 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
986 {
987 	if (drbd_peer_req_has_active_page(peer_req)) {
988 		/* This might happen if sendpage() has not finished */
989 		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
990 		atomic_add(i, &device->pp_in_use_by_net);
991 		atomic_sub(i, &device->pp_in_use);
992 		spin_lock_irq(&device->resource->req_lock);
993 		list_add_tail(&peer_req->w.list, &device->net_ee);
994 		spin_unlock_irq(&device->resource->req_lock);
995 		wake_up(&drbd_pp_wait);
996 	} else
997 		drbd_free_peer_req(device, peer_req);
998 }
999 
1000 /**
1001  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1003  * @w:		work object.
1004  * @cancel:	The connection will be closed anyways
1005  */
1006 int w_e_end_data_req(struct drbd_work *w, int cancel)
1007 {
1008 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1009 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1010 	struct drbd_device *device = peer_device->device;
1011 	int err;
1012 
1013 	if (unlikely(cancel)) {
1014 		drbd_free_peer_req(device, peer_req);
1015 		dec_unacked(device);
1016 		return 0;
1017 	}
1018 
1019 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1020 		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1021 	} else {
1022 		if (__ratelimit(&drbd_ratelimit_state))
1023 			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1024 			    (unsigned long long)peer_req->i.sector);
1025 
1026 		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1027 	}
1028 
1029 	dec_unacked(device);
1030 
1031 	move_to_net_ee_or_free(device, peer_req);
1032 
1033 	if (unlikely(err))
1034 		drbd_err(device, "drbd_send_block() failed\n");
1035 	return err;
1036 }
1037 
1038 /**
1039  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1040  * @w:		work object.
1041  * @cancel:	The connection will be closed anyways
1042  */
1043 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1044 {
1045 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1046 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1047 	struct drbd_device *device = peer_device->device;
1048 	int err;
1049 
1050 	if (unlikely(cancel)) {
1051 		drbd_free_peer_req(device, peer_req);
1052 		dec_unacked(device);
1053 		return 0;
1054 	}
1055 
1056 	if (get_ldev_if_state(device, D_FAILED)) {
1057 		drbd_rs_complete_io(device, peer_req->i.sector);
1058 		put_ldev(device);
1059 	}
1060 
1061 	if (device->state.conn == C_AHEAD) {
1062 		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1063 	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1064 		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1065 			inc_rs_pending(device);
1066 			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1067 		} else {
1068 			if (__ratelimit(&drbd_ratelimit_state))
1069 				drbd_err(device, "Not sending RSDataReply, "
1070 				    "partner DISKLESS!\n");
1071 			err = 0;
1072 		}
1073 	} else {
1074 		if (__ratelimit(&drbd_ratelimit_state))
1075 			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1076 			    (unsigned long long)peer_req->i.sector);
1077 
1078 		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1079 
1080 		/* update resync data with failure */
1081 		drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1082 	}
1083 
1084 	dec_unacked(device);
1085 
1086 	move_to_net_ee_or_free(device, peer_req);
1087 
1088 	if (unlikely(err))
1089 		drbd_err(device, "drbd_send_block() failed\n");
1090 	return err;
1091 }
1092 
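/* w_e_end_csum_rs_req: compare the digest received from the peer with the
 * digest of our local block; if they match, acknowledge with P_RS_IS_IN_SYNC,
 * otherwise send the whole block back as P_RS_DATA_REPLY. */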
1093 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1094 {
1095 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1096 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1097 	struct drbd_device *device = peer_device->device;
1098 	struct digest_info *di;
1099 	int digest_size;
1100 	void *digest = NULL;
1101 	int err, eq = 0;
1102 
1103 	if (unlikely(cancel)) {
1104 		drbd_free_peer_req(device, peer_req);
1105 		dec_unacked(device);
1106 		return 0;
1107 	}
1108 
1109 	if (get_ldev(device)) {
1110 		drbd_rs_complete_io(device, peer_req->i.sector);
1111 		put_ldev(device);
1112 	}
1113 
1114 	di = peer_req->digest;
1115 
1116 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1117 		/* quick hack to try to avoid a race against reconfiguration.
1118 		 * a real fix would be much more involved,
1119 		 * introducing more locking mechanisms */
1120 		if (peer_device->connection->csums_tfm) {
1121 			digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
1122 			D_ASSERT(device, digest_size == di->digest_size);
1123 			digest = kmalloc(digest_size, GFP_NOIO);
1124 		}
1125 		if (digest) {
1126 			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1127 			eq = !memcmp(digest, di->digest, digest_size);
1128 			kfree(digest);
1129 		}
1130 
1131 		if (eq) {
1132 			drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1133 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1134 			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1135 			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1136 		} else {
1137 			inc_rs_pending(device);
1138 			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1139 			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1140 			kfree(di);
1141 			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1142 		}
1143 	} else {
1144 		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1145 		if (__ratelimit(&drbd_ratelimit_state))
1146 			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1147 	}
1148 
1149 	dec_unacked(device);
1150 	move_to_net_ee_or_free(device, peer_req);
1151 
1152 	if (unlikely(err))
1153 		drbd_err(device, "drbd_send_block/ack() failed\n");
1154 	return err;
1155 }
1156 
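/* w_e_end_ov_req: hash the locally read block for online verify and send the
 * digest to the peer as P_OV_REPLY. */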
1157 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1158 {
1159 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1160 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1161 	struct drbd_device *device = peer_device->device;
1162 	sector_t sector = peer_req->i.sector;
1163 	unsigned int size = peer_req->i.size;
1164 	int digest_size;
1165 	void *digest;
1166 	int err = 0;
1167 
1168 	if (unlikely(cancel))
1169 		goto out;
1170 
1171 	digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1172 	digest = kmalloc(digest_size, GFP_NOIO);
1173 	if (!digest) {
1174 		err = 1;	/* terminate the connection in case the allocation failed */
1175 		goto out;
1176 	}
1177 
1178 	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1179 		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1180 	else
1181 		memset(digest, 0, digest_size);
1182 
1183 	/* Free e and pages before send.
1184 	 * In case we block on congestion, we could otherwise run into
1185 	 * some distributed deadlock, if the other side blocks on
1186 	 * congestion as well, because our receiver blocks in
1187 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1188 	drbd_free_peer_req(device, peer_req);
1189 	peer_req = NULL;
1190 	inc_rs_pending(device);
1191 	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1192 	if (err)
1193 		dec_rs_pending(device);
1194 	kfree(digest);
1195 
1196 out:
1197 	if (peer_req)
1198 		drbd_free_peer_req(device, peer_req);
1199 	dec_unacked(device);
1200 	return err;
1201 }
1202 
1203 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1204 {
1205 	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1206 		device->ov_last_oos_size += size>>9;
1207 	} else {
1208 		device->ov_last_oos_start = sector;
1209 		device->ov_last_oos_size = size>>9;
1210 	}
1211 	drbd_set_out_of_sync(device, sector, size);
1212 }
1213 
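/* w_e_end_ov_reply: hash our local block, compare it with the digest the peer
 * sent, record any out-of-sync range and report the result via P_OV_RESULT. */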
1214 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1215 {
1216 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1217 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1218 	struct drbd_device *device = peer_device->device;
1219 	struct digest_info *di;
1220 	void *digest;
1221 	sector_t sector = peer_req->i.sector;
1222 	unsigned int size = peer_req->i.size;
1223 	int digest_size;
1224 	int err, eq = 0;
1225 	bool stop_sector_reached = false;
1226 
1227 	if (unlikely(cancel)) {
1228 		drbd_free_peer_req(device, peer_req);
1229 		dec_unacked(device);
1230 		return 0;
1231 	}
1232 
1233 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1234 	 * the resync lru has been cleaned up already */
1235 	if (get_ldev(device)) {
1236 		drbd_rs_complete_io(device, peer_req->i.sector);
1237 		put_ldev(device);
1238 	}
1239 
1240 	di = peer_req->digest;
1241 
1242 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1243 		digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1244 		digest = kmalloc(digest_size, GFP_NOIO);
1245 		if (digest) {
1246 			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1247 
1248 			D_ASSERT(device, digest_size == di->digest_size);
1249 			eq = !memcmp(digest, di->digest, digest_size);
1250 			kfree(digest);
1251 		}
1252 	}
1253 
1254 	/* Free peer_req and pages before send.
1255 	 * In case we block on congestion, we could otherwise run into
1256 	 * some distributed deadlock, if the other side blocks on
1257 	 * congestion as well, because our receiver blocks in
1258 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1259 	drbd_free_peer_req(device, peer_req);
1260 	if (!eq)
1261 		drbd_ov_out_of_sync_found(device, sector, size);
1262 	else
1263 		ov_out_of_sync_print(device);
1264 
1265 	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1266 			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1267 
1268 	dec_unacked(device);
1269 
1270 	--device->ov_left;
1271 
1272 	/* let's advance progress step marks only for every other megabyte */
1273 	if ((device->ov_left & 0x200) == 0x200)
1274 		drbd_advance_rs_marks(device, device->ov_left);
1275 
1276 	stop_sector_reached = verify_can_do_stop_sector(device) &&
1277 		(sector + (size>>9)) >= device->ov_stop_sector;
1278 
1279 	if (device->ov_left == 0 || stop_sector_reached) {
1280 		ov_out_of_sync_print(device);
1281 		drbd_resync_finished(device);
1282 	}
1283 
1284 	return err;
1285 }
1286 
1287 /* FIXME
1288  * We need to track the number of pending barrier acks,
1289  * and to be able to wait for them.
1290  * See also comment in drbd_adm_attach before drbd_suspend_io.
1291  */
1292 static int drbd_send_barrier(struct drbd_connection *connection)
1293 {
1294 	struct p_barrier *p;
1295 	struct drbd_socket *sock;
1296 
1297 	sock = &connection->data;
1298 	p = conn_prepare_command(connection, sock);
1299 	if (!p)
1300 		return -EIO;
1301 	p->barrier = connection->send.current_epoch_nr;
1302 	p->pad = 0;
1303 	connection->send.current_epoch_writes = 0;
1304 	connection->send.last_sent_barrier_jif = jiffies;
1305 
1306 	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1307 }
1308 
1309 int w_send_write_hint(struct drbd_work *w, int cancel)
1310 {
1311 	struct drbd_device *device =
1312 		container_of(w, struct drbd_device, unplug_work);
1313 	struct drbd_socket *sock;
1314 
1315 	if (cancel)
1316 		return 0;
1317 	sock = &first_peer_device(device)->connection->data;
1318 	if (!drbd_prepare_command(first_peer_device(device), sock))
1319 		return -EIO;
1320 	return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1321 }
1322 
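/* The first write sent over a connection defines the current epoch; seed the
 * epoch bookkeeping from it. */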
1323 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1324 {
1325 	if (!connection->send.seen_any_write_yet) {
1326 		connection->send.seen_any_write_yet = true;
1327 		connection->send.current_epoch_nr = epoch;
1328 		connection->send.current_epoch_writes = 0;
1329 		connection->send.last_sent_barrier_jif = jiffies;
1330 	}
1331 }
1332 
1333 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1334 {
1335 	/* nothing to close yet if no write has been sent on this connection */
1336 	if (!connection->send.seen_any_write_yet)
1337 		return;
1338 	if (connection->send.current_epoch_nr != epoch) {
1339 		if (connection->send.current_epoch_writes)
1340 			drbd_send_barrier(connection);
1341 		connection->send.current_epoch_nr = epoch;
1342 	}
1343 }
1344 
1345 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1346 {
1347 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1348 	struct drbd_device *device = req->device;
1349 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1350 	struct drbd_connection *const connection = peer_device->connection;
1351 	int err;
1352 
1353 	if (unlikely(cancel)) {
1354 		req_mod(req, SEND_CANCELED);
1355 		return 0;
1356 	}
1357 	req->pre_send_jif = jiffies;
1358 
1359 	/* this time, no connection->send.current_epoch_writes++;
1360 	 * If it was sent, it was the closing barrier for the last
1361 	 * replicated epoch, before we went into AHEAD mode.
1362 	 * No more barriers will be sent, until we leave AHEAD mode again. */
1363 	maybe_send_barrier(connection, req->epoch);
1364 
1365 	err = drbd_send_out_of_sync(peer_device, req);
1366 	req_mod(req, OOS_HANDED_TO_NETWORK);
1367 
1368 	return err;
1369 }
1370 
1371 /**
1372  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1373  * @w:		work object.
1374  * @cancel:	The connection will be closed anyways
1375  */
1376 int w_send_dblock(struct drbd_work *w, int cancel)
1377 {
1378 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1379 	struct drbd_device *device = req->device;
1380 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1381 	struct drbd_connection *connection = peer_device->connection;
1382 	int err;
1383 
1384 	if (unlikely(cancel)) {
1385 		req_mod(req, SEND_CANCELED);
1386 		return 0;
1387 	}
1388 	req->pre_send_jif = jiffies;
1389 
1390 	re_init_if_first_write(connection, req->epoch);
1391 	maybe_send_barrier(connection, req->epoch);
1392 	connection->send.current_epoch_writes++;
1393 
1394 	err = drbd_send_dblock(peer_device, req);
1395 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1396 
1397 	return err;
1398 }
1399 
1400 /**
1401  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1402  * @w:		work object.
1403  * @cancel:	The connection will be closed anyways
1404  */
1405 int w_send_read_req(struct drbd_work *w, int cancel)
1406 {
1407 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1408 	struct drbd_device *device = req->device;
1409 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1410 	struct drbd_connection *connection = peer_device->connection;
1411 	int err;
1412 
1413 	if (unlikely(cancel)) {
1414 		req_mod(req, SEND_CANCELED);
1415 		return 0;
1416 	}
1417 	req->pre_send_jif = jiffies;
1418 
1419 	/* Even read requests may close a write epoch,
1420 	 * if there was any yet. */
1421 	maybe_send_barrier(connection, req->epoch);
1422 
1423 	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1424 				 (unsigned long)req);
1425 
1426 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1427 
1428 	return err;
1429 }
1430 
1431 int w_restart_disk_io(struct drbd_work *w, int cancel)
1432 {
1433 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1434 	struct drbd_device *device = req->device;
1435 
1436 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1437 		drbd_al_begin_io(device, &req->i);
1438 
1439 	drbd_req_make_private_bio(req, req->master_bio);
1440 	req->private_bio->bi_bdev = device->ldev->backing_bdev;
1441 	generic_make_request(req->private_bio);
1442 
1443 	return 0;
1444 }
1445 
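/* Walk the resync-after dependency chain of @device.  Return 0 as soon as a
 * device on the chain is itself resyncing or has a sync-pause flag set,
 * 1 once the chain ends (no dependency, or a diskless/missing minor). */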
1446 static int _drbd_may_sync_now(struct drbd_device *device)
1447 {
1448 	struct drbd_device *odev = device;
1449 	int resync_after;
1450 
1451 	while (1) {
1452 		if (!odev->ldev || odev->state.disk == D_DISKLESS)
1453 			return 1;
1454 		rcu_read_lock();
1455 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1456 		rcu_read_unlock();
1457 		if (resync_after == -1)
1458 			return 1;
1459 		odev = minor_to_device(resync_after);
1460 		if (!odev)
1461 			return 1;
1462 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1463 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1464 		    odev->state.aftr_isp || odev->state.peer_isp ||
1465 		    odev->state.user_isp)
1466 			return 0;
1467 	}
1468 }
1469 
1470 /**
1471  * drbd_pause_after() - Pause resync on all devices that may not resync now
1472  * @device:	DRBD device.
1473  *
1474  * Called from process context only (admin command and after_state_ch).
1475  */
1476 static bool drbd_pause_after(struct drbd_device *device)
1477 {
1478 	bool changed = false;
1479 	struct drbd_device *odev;
1480 	int i;
1481 
1482 	rcu_read_lock();
1483 	idr_for_each_entry(&drbd_devices, odev, i) {
1484 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1485 			continue;
1486 		if (!_drbd_may_sync_now(odev) &&
1487 		    _drbd_set_state(_NS(odev, aftr_isp, 1),
1488 				    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1489 			changed = true;
1490 	}
1491 	rcu_read_unlock();
1492 
1493 	return changed;
1494 }
1495 
1496 /**
1497  * drbd_resume_next() - Resume resync on all devices that may resync now
1498  * @device:	DRBD device.
1499  *
1500  * Called from process context only (admin command and worker).
1501  */
1502 static bool drbd_resume_next(struct drbd_device *device)
1503 {
1504 	bool changed = false;
1505 	struct drbd_device *odev;
1506 	int i;
1507 
1508 	rcu_read_lock();
1509 	idr_for_each_entry(&drbd_devices, odev, i) {
1510 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1511 			continue;
1512 		if (odev->state.aftr_isp) {
1513 			if (_drbd_may_sync_now(odev) &&
1514 			    _drbd_set_state(_NS(odev, aftr_isp, 0),
1515 					    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1516 				changed = true;
1517 		}
1518 	}
1519 	rcu_read_unlock();
1520 	return changed;
1521 }
1522 
1523 void resume_next_sg(struct drbd_device *device)
1524 {
1525 	lock_all_resources();
1526 	drbd_resume_next(device);
1527 	unlock_all_resources();
1528 }
1529 
1530 void suspend_other_sg(struct drbd_device *device)
1531 {
1532 	lock_all_resources();
1533 	drbd_pause_after(device);
1534 	unlock_all_resources();
1535 }
1536 
1537 /* caller must lock_all_resources() */
1538 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1539 {
1540 	struct drbd_device *odev;
1541 	int resync_after;
1542 
1543 	if (o_minor == -1)
1544 		return NO_ERROR;
1545 	if (o_minor < -1 || o_minor > MINORMASK)
1546 		return ERR_RESYNC_AFTER;
1547 
1548 	/* check for loops */
1549 	odev = minor_to_device(o_minor);
1550 	while (1) {
1551 		if (odev == device)
1552 			return ERR_RESYNC_AFTER_CYCLE;
1553 
1554 		/* You are free to depend on diskless, non-existing,
1555 		 * or not yet/no longer existing minors.
1556 		 * We only reject dependency loops.
1557 		 * We cannot follow the dependency chain beyond a detached or
1558 		 * missing minor.
1559 		 */
1560 		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1561 			return NO_ERROR;
1562 
1563 		rcu_read_lock();
1564 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1565 		rcu_read_unlock();
1566 		/* dependency chain ends here, no cycles. */
1567 		if (resync_after == -1)
1568 			return NO_ERROR;
1569 
1570 		/* follow the dependency chain */
1571 		odev = minor_to_device(resync_after);
1572 	}
1573 }
1574 
1575 /* caller must lock_all_resources() */
1576 void drbd_resync_after_changed(struct drbd_device *device)
1577 {
1578 	int changed;
1579 
1580 	do {
1581 		changed  = drbd_pause_after(device);
1582 		changed |= drbd_resume_next(device);
1583 	} while (changed);
1584 }
1585 
1586 void drbd_rs_controller_reset(struct drbd_device *device)
1587 {
1588 	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1589 	struct fifo_buffer *plan;
1590 
1591 	atomic_set(&device->rs_sect_in, 0);
1592 	atomic_set(&device->rs_sect_ev, 0);
1593 	device->rs_in_flight = 0;
1594 	device->rs_last_events =
1595 		(int)part_stat_read(&disk->part0, sectors[0]) +
1596 		(int)part_stat_read(&disk->part0, sectors[1]);
1597 
1598 	/* Updating the RCU protected object in place is necessary since
1599 	   this function gets called from atomic context.
1600 	   It is valid since all other updates also lead to a completely
1601 	   empty fifo */
1602 	rcu_read_lock();
1603 	plan = rcu_dereference(device->rs_plan_s);
1604 	plan->total = 0;
1605 	fifo_set(plan, 0);
1606 	rcu_read_unlock();
1607 }
1608 
1609 void start_resync_timer_fn(unsigned long data)
1610 {
1611 	struct drbd_device *device = (struct drbd_device *) data;
1612 	drbd_device_post_work(device, RS_START);
1613 }
1614 
1615 static void do_start_resync(struct drbd_device *device)
1616 {
1617 	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1618 		drbd_warn(device, "postponing start_resync ...\n");
1619 		device->start_resync_timer.expires = jiffies + HZ/10;
1620 		add_timer(&device->start_resync_timer);
1621 		return;
1622 	}
1623 
1624 	drbd_start_resync(device, C_SYNC_SOURCE);
1625 	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1626 }
1627 
1628 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1629 {
1630 	bool csums_after_crash_only;
1631 	rcu_read_lock();
1632 	csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1633 	rcu_read_unlock();
1634 	return connection->agreed_pro_version >= 89 &&		/* supported? */
1635 		connection->csums_tfm &&			/* configured? */
1636 		(csums_after_crash_only == 0			/* use for each resync? */
1637 		 || test_bit(CRASHED_PRIMARY, &device->flags));	/* or only after Primary crash? */
1638 }
1639 
1640 /**
1641  * drbd_start_resync() - Start the resync process
1642  * @device:	DRBD device.
1643  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1644  *
1645  * This function might bring you directly into one of the
1646  * C_PAUSED_SYNC_* states.
1647  */
1648 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1649 {
1650 	struct drbd_peer_device *peer_device = first_peer_device(device);
1651 	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1652 	union drbd_state ns;
1653 	int r;
1654 
1655 	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1656 		drbd_err(device, "Resync already running!\n");
1657 		return;
1658 	}
1659 
1660 	if (!test_bit(B_RS_H_DONE, &device->flags)) {
1661 		if (side == C_SYNC_TARGET) {
1662 			/* Since application IO was locked out during C_WF_BITMAP_T and
1663 			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1664 			   we check whether we may make the data inconsistent. */
1665 			r = drbd_khelper(device, "before-resync-target");
1666 			r = (r >> 8) & 0xff;
1667 			if (r > 0) {
1668 				drbd_info(device, "before-resync-target handler returned %d, "
1669 					 "dropping connection.\n", r);
1670 				conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1671 				return;
1672 			}
1673 		} else /* C_SYNC_SOURCE */ {
1674 			r = drbd_khelper(device, "before-resync-source");
1675 			r = (r >> 8) & 0xff;
1676 			if (r > 0) {
1677 				if (r == 3) {
1678 					drbd_info(device, "before-resync-source handler returned %d, "
1679 						 "ignoring. Old userland tools?", r);
1680 				} else {
1681 					drbd_info(device, "before-resync-source handler returned %d, "
1682 						 "dropping connection.\n", r);
1683 					conn_request_state(connection,
1684 							   NS(conn, C_DISCONNECTING), CS_HARD);
1685 					return;
1686 				}
1687 			}
1688 		}
1689 	}
1690 
1691 	if (current == connection->worker.task) {
1692 		/* The worker should not sleep waiting for state_mutex,
1693 		   which can take a long time */
1694 		if (!mutex_trylock(device->state_mutex)) {
1695 			set_bit(B_RS_H_DONE, &device->flags);
1696 			device->start_resync_timer.expires = jiffies + HZ/5;
1697 			add_timer(&device->start_resync_timer);
1698 			return;
1699 		}
1700 	} else {
1701 		mutex_lock(device->state_mutex);
1702 	}
1703 
1704 	lock_all_resources();
1705 	clear_bit(B_RS_H_DONE, &device->flags);
1706 	/* Did some connection breakage or IO error race with us? */
1707 	if (device->state.conn < C_CONNECTED
1708 	|| !get_ldev_if_state(device, D_NEGOTIATING)) {
1709 		unlock_all_resources();
1710 		goto out;
1711 	}
1712 
1713 	ns = drbd_read_state(device);
1714 
1715 	ns.aftr_isp = !_drbd_may_sync_now(device);
1716 
1717 	ns.conn = side;
1718 
1719 	if (side == C_SYNC_TARGET)
1720 		ns.disk = D_INCONSISTENT;
1721 	else /* side == C_SYNC_SOURCE */
1722 		ns.pdsk = D_INCONSISTENT;
1723 
1724 	r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1725 	ns = drbd_read_state(device);
1726 
1727 	if (ns.conn < C_CONNECTED)
1728 		r = SS_UNKNOWN_ERROR;
1729 
1730 	if (r == SS_SUCCESS) {
1731 		unsigned long tw = drbd_bm_total_weight(device);
1732 		unsigned long now = jiffies;
1733 		int i;
1734 
1735 		device->rs_failed    = 0;
1736 		device->rs_paused    = 0;
1737 		device->rs_same_csum = 0;
1738 		device->rs_last_sect_ev = 0;
1739 		device->rs_total     = tw;
1740 		device->rs_start     = now;
1741 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1742 			device->rs_mark_left[i] = tw;
1743 			device->rs_mark_time[i] = now;
1744 		}
1745 		drbd_pause_after(device);
1746 		/* Forget potentially stale cached per-resync-extent bit counts.
1747 		 * This is an open-coded drbd_rs_cancel_all(device); we already have
1748 		 * IRQs disabled and know the disk state is ok. */
1749 		spin_lock(&device->al_lock);
1750 		lc_reset(device->resync);
1751 		device->resync_locked = 0;
1752 		device->resync_wenr = LC_FREE;
1753 		spin_unlock(&device->al_lock);
1754 	}
1755 	unlock_all_resources();
1756 
1757 	if (r == SS_SUCCESS) {
1758 		wake_up(&device->al_wait); /* for lc_reset() above */
1759 		/* reset rs_last_bcast when a resync or verify is started,
1760 		 * to deal with potential jiffies wrap. */
1761 		device->rs_last_bcast = jiffies - HZ;
1762 
1763 		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1764 		     drbd_conn_str(ns.conn),
1765 		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1766 		     (unsigned long) device->rs_total);
1767 		if (side == C_SYNC_TARGET) {
1768 			device->bm_resync_fo = 0;
1769 			device->use_csums = use_checksum_based_resync(connection, device);
1770 		} else {
1771 			device->use_csums = 0;
1772 		}
1773 
1774 		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1775 		 * with w_send_oos, or the sync target will get confused as to
1776 		 * how many bits to resync.  We cannot do that always, because for an
1777 		 * empty resync and protocol < 95, we need to do it here, as we call
1778 		 * drbd_resync_finished from here in that case.
1779 		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1780 		 * and from after_state_ch otherwise. */
1781 		if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1782 			drbd_gen_and_send_sync_uuid(peer_device);
1783 
1784 		if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1785 			/* This still has a race (about when exactly the peers
1786 			 * detect connection loss) that can lead to a full sync
1787 			 * on next handshake. In 8.3.9 we fixed this with explicit
1788 			 * resync-finished notifications, but the fix
1789 			 * introduces a protocol change.  Sleeping for some
1790 			 * time longer than the ping interval + timeout on the
1791 			 * SyncSource, to give the SyncTarget the chance to
1792 			 * detect connection loss, then waiting for a ping
1793 			 * response (implicit in drbd_resync_finished) reduces
1794 			 * the race considerably, but does not solve it. */
1795 			if (side == C_SYNC_SOURCE) {
1796 				struct net_conf *nc;
1797 				int timeo;
1798 
1799 				rcu_read_lock();
1800 				nc = rcu_dereference(connection->net_conf);
1801 				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1802 				rcu_read_unlock();
1803 				schedule_timeout_interruptible(timeo);
1804 			}
1805 			drbd_resync_finished(device);
1806 		}
1807 
1808 		drbd_rs_controller_reset(device);
1809 		/* ns.conn may already be != device->state.conn,
1810 		 * we may have been paused in between, or become paused until
1811 		 * the timer triggers.
1812 		 * No matter, that is handled in resync_timer_fn() */
1813 		if (ns.conn == C_SYNC_TARGET)
1814 			mod_timer(&device->resync_timer, jiffies);
1815 
1816 		drbd_md_sync(device);
1817 	}
1818 	put_ldev(device);
1819 out:
1820 	mutex_unlock(device->state_mutex);
1821 }
1822 
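/* Write out dirty bitmap pages and broadcast sync progress to userspace.
 * If the resync just completed, also run the resync-finished handling. */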
1823 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1824 {
1825 	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1826 	device->rs_last_bcast = jiffies;
1827 
1828 	if (!get_ldev(device))
1829 		return;
1830 
1831 	drbd_bm_write_lazy(device, 0);
1832 	if (resync_done && is_sync_state(device->state.conn))
1833 		drbd_resync_finished(device);
1834 
1835 	drbd_bcast_event(device, &sib);
1836 	/* update timestamp, in case it took a while to write out stuff */
1837 	device->rs_last_bcast = jiffies;
1838 	put_ldev(device);
1839 }
1840 
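/* Final part of going diskless: free the resync extent and activity log
 * LRU caches, release the backing device, and wake up anyone waiting for
 * the detach to complete. */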
1841 static void drbd_ldev_destroy(struct drbd_device *device)
1842 {
1843 	lc_destroy(device->resync);
1844 	device->resync = NULL;
1845 	lc_destroy(device->act_log);
1846 	device->act_log = NULL;
1847 
1848 	__acquire(local);
1849 	drbd_backing_dev_free(device, device->ldev);
1850 	device->ldev = NULL;
1851 	__release(local);
1852 
1853 	clear_bit(GOING_DISKLESS, &device->flags);
1854 	wake_up(&device->misc_wait);
1855 }
1856 
1857 static void go_diskless(struct drbd_device *device)
1858 {
1859 	D_ASSERT(device, device->state.disk == D_FAILED);
1860 	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1861 	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1862 	 * the protected members anymore, though, so once put_ldev reaches zero
1863 	 * again, it will be safe to free them. */
1864 
1865 	/* Try to write out changed bitmap pages; read errors may have just
1866 	 * set some bits outside the area covered by the activity log.
1867 	 *
1868 	 * If we have an IO error during the bitmap writeout,
1869 	 * we will want a full sync next time, just in case.
1870 	 * (Do we want a specific meta data flag for this?)
1871 	 *
1872 	 * If that does not make it to stable storage either,
1873 	 * we cannot do anything about that anymore.
1874 	 *
1875 	 * We still need to check if both bitmap and ldev are present, we may
1876 	 * end up here after a failed attach, before ldev was even assigned.
1877 	 */
1878 	if (device->bitmap && device->ldev) {
1879 		/* An interrupted resync or similar is allowed to recount bits
1880 		 * while we detach.
1881 		 * Any modifications would not be expected anymore, though.
1882 		 */
1883 		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1884 					"detach", BM_LOCKED_TEST_ALLOWED)) {
1885 			if (test_bit(WAS_READ_ERROR, &device->flags)) {
1886 				drbd_md_set_flag(device, MDF_FULL_SYNC);
1887 				drbd_md_sync(device);
1888 			}
1889 		}
1890 	}
1891 
1892 	drbd_force_state(device, NS(disk, D_DISKLESS));
1893 }
1894 
1895 static int do_md_sync(struct drbd_device *device)
1896 {
1897 	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1898 	drbd_md_sync(device);
1899 	return 0;
1900 }
1901 
1902 /* only called from drbd_worker thread, no locking */
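/* Record which work callback ran when into a small ring buffer of
 * DRBD_THREAD_DETAILS_HIST entries; the slot after the current one is
 * zeroed so it marks the end of the history. */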
1903 void __update_timing_details(
1904 		struct drbd_thread_timing_details *tdp,
1905 		unsigned int *cb_nr,
1906 		void *cb,
1907 		const char *fn, const unsigned int line)
1908 {
1909 	unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1910 	struct drbd_thread_timing_details *td = tdp + i;
1911 
1912 	td->start_jif = jiffies;
1913 	td->cb_addr = cb;
1914 	td->caller_fn = fn;
1915 	td->line = line;
1916 	td->cb_nr = *cb_nr;
1917 
1918 	i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1919 	td = tdp + i;
1920 	memset(td, 0, sizeof(*td));
1921 
1922 	++(*cb_nr);
1923 }
1924 
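/* Dispatch the posted device work bits collected from device->flags. */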
1925 static void do_device_work(struct drbd_device *device, const unsigned long todo)
1926 {
1927 	if (test_bit(MD_SYNC, &todo))
1928 		do_md_sync(device);
1929 	if (test_bit(RS_DONE, &todo) ||
1930 	    test_bit(RS_PROGRESS, &todo))
1931 		update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
1932 	if (test_bit(GO_DISKLESS, &todo))
1933 		go_diskless(device);
1934 	if (test_bit(DESTROY_DISK, &todo))
1935 		drbd_ldev_destroy(device);
1936 	if (test_bit(RS_START, &todo))
1937 		do_start_resync(device);
1938 }
1939 
1940 #define DRBD_DEVICE_WORK_MASK	\
1941 	((1UL << GO_DISKLESS)	\
1942 	|(1UL << DESTROY_DISK)	\
1943 	|(1UL << MD_SYNC)	\
1944 	|(1UL << RS_START)	\
1945 	|(1UL << RS_PROGRESS)	\
1946 	|(1UL << RS_DONE)	\
1947 	)
1948 
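/* Atomically fetch and clear the device work bits in *flags; the cmpxchg
 * loop makes sure we do not lose bits that get set concurrently. */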
1949 static unsigned long get_work_bits(unsigned long *flags)
1950 {
1951 	unsigned long old, new;
1952 	do {
1953 		old = *flags;
1954 		new = old & ~DRBD_DEVICE_WORK_MASK;
1955 	} while (cmpxchg(flags, old, new) != old);
1956 	return old & DRBD_DEVICE_WORK_MASK;
1957 }
1958 
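/* Run posted device work for all volumes of this connection.  Hold a kref
 * on each device and drop the RCU read lock while doing the actual work,
 * which may sleep. */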
1959 static void do_unqueued_work(struct drbd_connection *connection)
1960 {
1961 	struct drbd_peer_device *peer_device;
1962 	int vnr;
1963 
1964 	rcu_read_lock();
1965 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1966 		struct drbd_device *device = peer_device->device;
1967 		unsigned long todo = get_work_bits(&device->flags);
1968 		if (!todo)
1969 			continue;
1970 
1971 		kref_get(&device->kref);
1972 		rcu_read_unlock();
1973 		do_device_work(device, todo);
1974 		kref_put(&device->kref, drbd_destroy_device);
1975 		rcu_read_lock();
1976 	}
1977 	rcu_read_unlock();
1978 }
1979 
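/* Move all currently queued work items onto the caller's list in one go;
 * returns true if there is now something on that list. */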
1980 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1981 {
1982 	spin_lock_irq(&queue->q_lock);
1983 	list_splice_tail_init(&queue->q, work_list);
1984 	spin_unlock_irq(&queue->q_lock);
1985 	return !list_empty(work_list);
1986 }
1987 
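/* Wait until there is sender work to do.  While idle, uncork the data
 * socket so pending TCP data gets flushed, close the current epoch with a
 * barrier if needed, and restore the cork setting before returning. */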
1988 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
1989 {
1990 	DEFINE_WAIT(wait);
1991 	struct net_conf *nc;
1992 	int uncork, cork;
1993 
1994 	dequeue_work_batch(&connection->sender_work, work_list);
1995 	if (!list_empty(work_list))
1996 		return;
1997 
1998 	/* Still nothing to do?
1999 	 * Maybe we still need to close the current epoch,
2000 	 * even if no new requests are queued yet.
2001 	 *
2002 	 * Also, poke TCP, just in case.
2003 	 * Then wait for new work (or signal). */
2004 	rcu_read_lock();
2005 	nc = rcu_dereference(connection->net_conf);
2006 	uncork = nc ? nc->tcp_cork : 0;
2007 	rcu_read_unlock();
2008 	if (uncork) {
2009 		mutex_lock(&connection->data.mutex);
2010 		if (connection->data.socket)
2011 			drbd_tcp_uncork(connection->data.socket);
2012 		mutex_unlock(&connection->data.mutex);
2013 	}
2014 
2015 	for (;;) {
2016 		int send_barrier;
2017 		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2018 		spin_lock_irq(&connection->resource->req_lock);
2019 		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2020 		if (!list_empty(&connection->sender_work.q))
2021 			list_splice_tail_init(&connection->sender_work.q, work_list);
2022 		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2023 		if (!list_empty(work_list) || signal_pending(current)) {
2024 			spin_unlock_irq(&connection->resource->req_lock);
2025 			break;
2026 		}
2027 
2028 		/* We found nothing new to do, no to-be-communicated request,
2029 		 * no other work item.  We may still need to close the last
2030 		 * epoch.  Next incoming request epoch will be connection ->
2031 		 * current transfer log epoch number.  If that is different
2032 		 * from the epoch of the last request we communicated, it is
2033 		 * safe to send the epoch separating barrier now.
2034 		 */
2035 		send_barrier =
2036 			atomic_read(&connection->current_tle_nr) !=
2037 			connection->send.current_epoch_nr;
2038 		spin_unlock_irq(&connection->resource->req_lock);
2039 
2040 		if (send_barrier)
2041 			maybe_send_barrier(connection,
2042 					connection->send.current_epoch_nr + 1);
2043 
2044 		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2045 			break;
2046 
2047 		/* drbd_send() may have called flush_signals() */
2048 		if (get_t_state(&connection->worker) != RUNNING)
2049 			break;
2050 
2051 		schedule();
2052 		/* We may be woken up for things other than new work, too,
2053 		 * e.g. if the current epoch got closed.
2054 		 * In that case we send the barrier above. */
2055 	}
2056 	finish_wait(&connection->sender_work.q_wait, &wait);
2057 
2058 	/* someone may have changed the config while we have been waiting above. */
2059 	rcu_read_lock();
2060 	nc = rcu_dereference(connection->net_conf);
2061 	cork = nc ? nc->tcp_cork : 0;
2062 	rcu_read_unlock();
2063 	mutex_lock(&connection->data.mutex);
2064 	if (connection->data.socket) {
2065 		if (cork)
2066 			drbd_tcp_cork(connection->data.socket);
2067 		else if (!uncork)
2068 			drbd_tcp_uncork(connection->data.socket);
2069 	}
2070 	mutex_unlock(&connection->data.mutex);
2071 }
2072 
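/* Main loop of the per-connection worker thread: wait for work, handle
 * posted device work, and run the queued work callbacks until asked to
 * stop; then drain the remaining work and clean up all volumes. */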
2073 int drbd_worker(struct drbd_thread *thi)
2074 {
2075 	struct drbd_connection *connection = thi->connection;
2076 	struct drbd_work *w = NULL;
2077 	struct drbd_peer_device *peer_device;
2078 	LIST_HEAD(work_list);
2079 	int vnr;
2080 
2081 	while (get_t_state(thi) == RUNNING) {
2082 		drbd_thread_current_set_cpu(thi);
2083 
2084 		if (list_empty(&work_list)) {
2085 			update_worker_timing_details(connection, wait_for_work);
2086 			wait_for_work(connection, &work_list);
2087 		}
2088 
2089 		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2090 			update_worker_timing_details(connection, do_unqueued_work);
2091 			do_unqueued_work(connection);
2092 		}
2093 
2094 		if (signal_pending(current)) {
2095 			flush_signals(current);
2096 			if (get_t_state(thi) == RUNNING) {
2097 				drbd_warn(connection, "Worker got an unexpected signal\n");
2098 				continue;
2099 			}
2100 			break;
2101 		}
2102 
2103 		if (get_t_state(thi) != RUNNING)
2104 			break;
2105 
2106 		if (!list_empty(&work_list)) {
2107 			w = list_first_entry(&work_list, struct drbd_work, list);
2108 			list_del_init(&w->list);
2109 			update_worker_timing_details(connection, w->cb);
2110 			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2111 				continue;
2112 			if (connection->cstate >= C_WF_REPORT_PARAMS)
2113 				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2114 		}
2115 	}
2116 
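	/* We are asked to stop: drain what is left, running each remaining
	 * callback with the cancel flag set, until both the local work list
	 * and the posted device work are empty. */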
2117 	do {
2118 		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2119 			update_worker_timing_details(connection, do_unqueued_work);
2120 			do_unqueued_work(connection);
2121 		}
2122 		if (!list_empty(&work_list)) {
2123 			w = list_first_entry(&work_list, struct drbd_work, list);
2124 			list_del_init(&w->list);
2125 			update_worker_timing_details(connection, w->cb);
2126 			w->cb(w, 1);
2127 		} else
2128 			dequeue_work_batch(&connection->sender_work, &work_list);
2129 	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2130 
2131 	rcu_read_lock();
2132 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2133 		struct drbd_device *device = peer_device->device;
2134 		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2135 		kref_get(&device->kref);
2136 		rcu_read_unlock();
2137 		drbd_device_cleanup(device);
2138 		kref_put(&device->kref, drbd_destroy_device);
2139 		rcu_read_lock();
2140 	}
2141 	rcu_read_unlock();
2142 
2143 	return 0;
2144 }
2145