xref: /openbmc/linux/drivers/block/drbd/drbd_worker.c (revision d2c43ff1)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24 */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched/signal.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37 
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
41 
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
44 
45 /* endio handlers:
46  *   drbd_md_endio (defined here)
47  *   drbd_request_endio (defined here)
48  *   drbd_peer_request_endio (defined here)
49  *   drbd_bm_endio (defined in drbd_bitmap.c)
50  *
51  * For all these callbacks, note the following:
52  * The callbacks will be called in irq context by the IDE drivers,
53  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54  * Try to get the locking right :)
55  *
56  */
57 
58 /* used for synchronous meta data and bitmap IO
59  * submitted by drbd_md_sync_page_io()
60  */
61 void drbd_md_endio(struct bio *bio)
62 {
63 	struct drbd_device *device;
64 
65 	device = bio->bi_private;
66 	device->md_io.error = blk_status_to_errno(bio->bi_status);
67 
68 	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
69 	 * to timeout on the lower level device, and eventually detach from it.
70 	 * If this io completion runs after that timeout expired, this
71 	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
72 	 * During normal operation, this only puts that extra reference
73 	 * down to 1 again.
74 	 * Make sure we first drop the reference, and only then signal
75 	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
76 	 * next drbd_md_sync_page_io() that we trigger the
77 	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
78 	 */
79 	drbd_md_put_buffer(device);
80 	device->md_io.done = 1;
81 	wake_up(&device->misc_wait);
82 	bio_put(bio);
83 	if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
84 		put_ldev(device);
85 }
86 
87 /* reads on behalf of the partner,
88  * "submitted" by the receiver
89  */
90 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
91 {
92 	unsigned long flags = 0;
93 	struct drbd_peer_device *peer_device = peer_req->peer_device;
94 	struct drbd_device *device = peer_device->device;
95 
96 	spin_lock_irqsave(&device->resource->req_lock, flags);
97 	device->read_cnt += peer_req->i.size >> 9;
98 	list_del(&peer_req->w.list);
99 	if (list_empty(&device->read_ee))
100 		wake_up(&device->ee_wait);
101 	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
102 		__drbd_chk_io_error(device, DRBD_READ_ERROR);
103 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
104 
105 	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
106 	put_ldev(device);
107 }
108 
109 /* writes on behalf of the partner, or resync writes,
110  * "submitted" by the receiver, final stage.  */
111 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
112 {
113 	unsigned long flags = 0;
114 	struct drbd_peer_device *peer_device = peer_req->peer_device;
115 	struct drbd_device *device = peer_device->device;
116 	struct drbd_connection *connection = peer_device->connection;
117 	struct drbd_interval i;
118 	int do_wake;
119 	u64 block_id;
120 	int do_al_complete_io;
121 
122 	/* after we moved peer_req to done_ee,
123 	 * we may no longer access it,
124 	 * it may be freed/reused already!
125 	 * (as soon as we release the req_lock) */
126 	i = peer_req->i;
127 	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
128 	block_id = peer_req->block_id;
129 	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
130 
131 	spin_lock_irqsave(&device->resource->req_lock, flags);
132 	device->writ_cnt += peer_req->i.size >> 9;
133 	list_move_tail(&peer_req->w.list, &device->done_ee);
134 
135 	/*
136 	 * Do not remove from the write_requests tree here: we did not send the
137 	 * Ack yet and did not wake possibly waiting conflicting requests.
138 	 * It will be removed from the tree in "drbd_process_done_ee" within the
139 	 * appropriate dw.cb (e_end_block/e_end_resync_block) or in
140 	 * _drbd_clear_done_ee.
141 	 */
142 
143 	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
144 
145 	/* FIXME do we want to detach for failed REQ_DISCARD?
146 	 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
147 	if (peer_req->flags & EE_WAS_ERROR)
148 		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
149 
150 	if (connection->cstate >= C_WF_REPORT_PARAMS) {
151 		kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
152 		if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
153 			kref_put(&device->kref, drbd_destroy_device);
154 	}
155 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
156 
157 	if (block_id == ID_SYNCER)
158 		drbd_rs_complete_io(device, i.sector);
159 
160 	if (do_wake)
161 		wake_up(&device->ee_wait);
162 
163 	if (do_al_complete_io)
164 		drbd_al_complete_io(device, &i);
165 
166 	put_ldev(device);
167 }
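/* A note on the pattern used above, as a minimal generic sketch (names are
 * hypothetical, not part of this driver): everything still needed from
 * peer_req is copied into locals *before* it is moved onto done_ee, because
 * once the req_lock is dropped another context may free or reuse it:
 *
 *	i = obj->interval;                        // snapshot while we still own obj
 *	spin_lock_irqsave(&lock, flags);
 *	list_move_tail(&obj->list, &done_list);   // ownership passes on here
 *	spin_unlock_irqrestore(&lock, flags);
 *	do_something(i.sector);                   // never touch obj again
 */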
168 
169 /* writes on behalf of the partner, or resync writes,
170  * "submitted" by the receiver.
171  */
172 void drbd_peer_request_endio(struct bio *bio)
173 {
174 	struct drbd_peer_request *peer_req = bio->bi_private;
175 	struct drbd_device *device = peer_req->peer_device->device;
176 	bool is_write = bio_data_dir(bio) == WRITE;
177 	bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
178 			  bio_op(bio) == REQ_OP_DISCARD;
179 
180 	if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
181 		drbd_warn(device, "%s: error=%d s=%llus\n",
182 				is_write ? (is_discard ? "discard" : "write")
183 					: "read", bio->bi_status,
184 				(unsigned long long)peer_req->i.sector);
185 
186 	if (bio->bi_status)
187 		set_bit(__EE_WAS_ERROR, &peer_req->flags);
188 
189 	bio_put(bio); /* no need for the bio anymore */
190 	if (atomic_dec_and_test(&peer_req->pending_bios)) {
191 		if (is_write)
192 			drbd_endio_write_sec_final(peer_req);
193 		else
194 			drbd_endio_read_sec_final(peer_req);
195 	}
196 }
197 
198 void drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
199 {
200 	panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
201 		device->minor, device->resource->name, device->vnr);
202 }
203 
204 /* read, read-ahead (readA) or write requests on R_PRIMARY coming from drbd_make_request
205  */
206 void drbd_request_endio(struct bio *bio)
207 {
208 	unsigned long flags;
209 	struct drbd_request *req = bio->bi_private;
210 	struct drbd_device *device = req->device;
211 	struct bio_and_error m;
212 	enum drbd_req_event what;
213 
214 	/* If this request was aborted locally before,
215 	 * but now was completed "successfully",
216 	 * chances are that this caused arbitrary data corruption.
217 	 *
218 	 * "aborting" requests, or force-detaching the disk, is intended for
219 	 * completely blocked/hung local backing devices which no longer
220 	 * complete requests at all, not even error completions.  In this
221 	 * situation, usually a hard-reset and failover is the only way out.
222 	 *
223 	 * By "aborting", basically faking a local error-completion,
224 	 * we allow for a more graceful switchover by cleanly migrating services.
225 	 * Still the affected node has to be rebooted "soon".
226 	 *
227 	 * By completing these requests, we allow the upper layers to re-use
228 	 * the associated data pages.
229 	 *
230 	 * If later the local backing device "recovers", and now DMAs some data
231 	 * from disk into the original request pages, in the best case it will
232 	 * just put random data into unused pages; but typically it will corrupt
233 	 * meanwhile completely unrelated data, causing all sorts of damage.
234 	 *
235 	 * Which means delayed successful completion,
236 	 * especially for READ requests,
237 	 * is a reason to panic().
238 	 *
239 	 * We assume that a delayed *error* completion is OK,
240 	 * though we still will complain noisily about it.
241 	 */
242 	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
243 		if (__ratelimit(&drbd_ratelimit_state))
244 			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
245 
246 		if (!bio->bi_status)
247 			drbd_panic_after_delayed_completion_of_aborted_request(device);
248 	}
249 
250 	/* to avoid recursion in __req_mod */
251 	if (unlikely(bio->bi_status)) {
252 		switch (bio_op(bio)) {
253 		case REQ_OP_WRITE_ZEROES:
254 		case REQ_OP_DISCARD:
255 			if (bio->bi_status == BLK_STS_NOTSUPP)
256 				what = DISCARD_COMPLETED_NOTSUPP;
257 			else
258 				what = DISCARD_COMPLETED_WITH_ERROR;
259 			break;
260 		case REQ_OP_READ:
261 			if (bio->bi_opf & REQ_RAHEAD)
262 				what = READ_AHEAD_COMPLETED_WITH_ERROR;
263 			else
264 				what = READ_COMPLETED_WITH_ERROR;
265 			break;
266 		default:
267 			what = WRITE_COMPLETED_WITH_ERROR;
268 			break;
269 		}
270 	} else {
271 		what = COMPLETED_OK;
272 	}
273 
274 	bio_put(req->private_bio);
275 	req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
276 
277 	/* not req_mod(), we need irqsave here! */
278 	spin_lock_irqsave(&device->resource->req_lock, flags);
279 	__req_mod(req, what, &m);
280 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
281 	put_ldev(device);
282 
283 	if (m.bio)
284 		complete_master_bio(device, &m);
285 }
286 
287 void drbd_csum_ee(struct crypto_ahash *tfm, struct drbd_peer_request *peer_req, void *digest)
288 {
289 	AHASH_REQUEST_ON_STACK(req, tfm);
290 	struct scatterlist sg;
291 	struct page *page = peer_req->pages;
292 	struct page *tmp;
293 	unsigned len;
294 
295 	ahash_request_set_tfm(req, tfm);
296 	ahash_request_set_callback(req, 0, NULL, NULL);
297 
298 	sg_init_table(&sg, 1);
299 	crypto_ahash_init(req);
300 
301 	while ((tmp = page_chain_next(page))) {
302 		/* all but the last page will be fully used */
303 		sg_set_page(&sg, page, PAGE_SIZE, 0);
304 		ahash_request_set_crypt(req, &sg, NULL, sg.length);
305 		crypto_ahash_update(req);
306 		page = tmp;
307 	}
308 	/* and now the last, possibly only partially used page */
309 	len = peer_req->i.size & (PAGE_SIZE - 1);
310 	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
311 	ahash_request_set_crypt(req, &sg, digest, sg.length);
312 	crypto_ahash_finup(req);
313 	ahash_request_zero(req);
314 }
315 
316 void drbd_csum_bio(struct crypto_ahash *tfm, struct bio *bio, void *digest)
317 {
318 	AHASH_REQUEST_ON_STACK(req, tfm);
319 	struct scatterlist sg;
320 	struct bio_vec bvec;
321 	struct bvec_iter iter;
322 
323 	ahash_request_set_tfm(req, tfm);
324 	ahash_request_set_callback(req, 0, NULL, NULL);
325 
326 	sg_init_table(&sg, 1);
327 	crypto_ahash_init(req);
328 
329 	bio_for_each_segment(bvec, bio, iter) {
330 		sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
331 		ahash_request_set_crypt(req, &sg, NULL, sg.length);
332 		crypto_ahash_update(req);
333 		/* REQ_OP_WRITE_SAME has only one segment;
334 		 * checksum the payload only once. */
335 		if (bio_op(bio) == REQ_OP_WRITE_SAME)
336 			break;
337 	}
338 	ahash_request_set_crypt(req, NULL, digest, 0);
339 	crypto_ahash_final(req);
340 	ahash_request_zero(req);
341 }
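/* Illustrative usage sketch for the two checksum helpers above (not new
 * driver code; this just restates what the real callers in this file,
 * w_e_send_csum() and w_e_end_ov_req(), do): the caller provides a digest
 * buffer sized by the transform:
 *
 *	int digest_size = crypto_ahash_digestsize(tfm);
 *	void *digest = kmalloc(digest_size, GFP_NOIO);
 *	if (digest) {
 *		drbd_csum_ee(tfm, peer_req, digest);   // or drbd_csum_bio(tfm, bio, digest)
 *		// ... send or compare the digest ...
 *		kfree(digest);
 *	}
 */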
342 
343 /* MAYBE merge common code with w_e_end_ov_req */
344 static int w_e_send_csum(struct drbd_work *w, int cancel)
345 {
346 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
347 	struct drbd_peer_device *peer_device = peer_req->peer_device;
348 	struct drbd_device *device = peer_device->device;
349 	int digest_size;
350 	void *digest;
351 	int err = 0;
352 
353 	if (unlikely(cancel))
354 		goto out;
355 
356 	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
357 		goto out;
358 
359 	digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
360 	digest = kmalloc(digest_size, GFP_NOIO);
361 	if (digest) {
362 		sector_t sector = peer_req->i.sector;
363 		unsigned int size = peer_req->i.size;
364 		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
365 		/* Free peer_req and pages before send.
366 		 * In case we block on congestion, we could otherwise run into
367 		 * some distributed deadlock, if the other side blocks on
368 		 * congestion as well, because our receiver blocks in
369 		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
370 		drbd_free_peer_req(device, peer_req);
371 		peer_req = NULL;
372 		inc_rs_pending(device);
373 		err = drbd_send_drequest_csum(peer_device, sector, size,
374 					      digest, digest_size,
375 					      P_CSUM_RS_REQUEST);
376 		kfree(digest);
377 	} else {
378 		drbd_err(device, "kmalloc() of digest failed.\n");
379 		err = -ENOMEM;
380 	}
381 
382 out:
383 	if (peer_req)
384 		drbd_free_peer_req(device, peer_req);
385 
386 	if (unlikely(err))
387 		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
388 	return err;
389 }
390 
391 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
392 
393 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
394 {
395 	struct drbd_device *device = peer_device->device;
396 	struct drbd_peer_request *peer_req;
397 
398 	if (!get_ldev(device))
399 		return -EIO;
400 
401 	/* GFP_TRY, because if there is no memory available right now, this may
402 	 * be rescheduled for later. It is "only" background resync, after all. */
403 	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
404 				       size, size, GFP_TRY);
405 	if (!peer_req)
406 		goto defer;
407 
408 	peer_req->w.cb = w_e_send_csum;
409 	spin_lock_irq(&device->resource->req_lock);
410 	list_add_tail(&peer_req->w.list, &device->read_ee);
411 	spin_unlock_irq(&device->resource->req_lock);
412 
413 	atomic_add(size >> 9, &device->rs_sect_ev);
414 	if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
415 				     DRBD_FAULT_RS_RD) == 0)
416 		return 0;
417 
418 	/* If it failed because of ENOMEM, retry should help.  If it failed
419 	 * because bio_add_page failed (probably broken lower level driver),
420 	 * retry may or may not help.
421 	 * If it does not, you may need to force disconnect. */
422 	spin_lock_irq(&device->resource->req_lock);
423 	list_del(&peer_req->w.list);
424 	spin_unlock_irq(&device->resource->req_lock);
425 
426 	drbd_free_peer_req(device, peer_req);
427 defer:
428 	put_ldev(device);
429 	return -EAGAIN;
430 }
431 
432 int w_resync_timer(struct drbd_work *w, int cancel)
433 {
434 	struct drbd_device *device =
435 		container_of(w, struct drbd_device, resync_work);
436 
437 	switch (device->state.conn) {
438 	case C_VERIFY_S:
439 		make_ov_request(device, cancel);
440 		break;
441 	case C_SYNC_TARGET:
442 		make_resync_request(device, cancel);
443 		break;
444 	}
445 
446 	return 0;
447 }
448 
449 void resync_timer_fn(unsigned long data)
450 {
451 	struct drbd_device *device = (struct drbd_device *) data;
452 
453 	drbd_queue_work_if_unqueued(
454 		&first_peer_device(device)->connection->sender_work,
455 		&device->resync_work);
456 }
457 
458 static void fifo_set(struct fifo_buffer *fb, int value)
459 {
460 	int i;
461 
462 	for (i = 0; i < fb->size; i++)
463 		fb->values[i] = value;
464 }
465 
466 static int fifo_push(struct fifo_buffer *fb, int value)
467 {
468 	int ov;
469 
470 	ov = fb->values[fb->head_index];
471 	fb->values[fb->head_index++] = value;
472 
473 	if (fb->head_index >= fb->size)
474 		fb->head_index = 0;
475 
476 	return ov;
477 }
478 
479 static void fifo_add_val(struct fifo_buffer *fb, int value)
480 {
481 	int i;
482 
483 	for (i = 0; i < fb->size; i++)
484 		fb->values[i] += value;
485 }
486 
487 struct fifo_buffer *fifo_alloc(int fifo_size)
488 {
489 	struct fifo_buffer *fb;
490 
491 	fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
492 	if (!fb)
493 		return NULL;
494 
495 	fb->head_index = 0;
496 	fb->size = fifo_size;
497 	fb->total = 0;
498 
499 	return fb;
500 }
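/* Usage sketch for the plan fifo above (illustrative values only, not part
 * of the driver): fifo_push() writes the new value at head and hands back
 * the value it overwrites, which is how drbd_rs_controller() retires the
 * oldest planned correction each step.  With a freshly allocated 3-slot
 * fifo (kzalloc'ed, so all slots start at 0):
 *
 *	struct fifo_buffer *fb = fifo_alloc(3);
 *	if (fb) {
 *		fifo_push(fb, 5);    // returns 0
 *		fifo_push(fb, 7);    // returns 0
 *		fifo_push(fb, 9);    // returns 0
 *		fifo_push(fb, 11);   // returns 5, the slot just overwritten
 *		kfree(fb);
 *	}
 */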
501 
502 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
503 {
504 	struct disk_conf *dc;
505 	unsigned int want;     /* The number of sectors we want in-flight */
506 	int req_sect; /* Number of sectors to request in this turn */
507 	int correction; /* Number of sectors more we need in-flight */
508 	int cps; /* correction per invocation of drbd_rs_controller() */
509 	int steps; /* Number of time steps to plan ahead */
510 	int curr_corr;
511 	int max_sect;
512 	struct fifo_buffer *plan;
513 
514 	dc = rcu_dereference(device->ldev->disk_conf);
515 	plan = rcu_dereference(device->rs_plan_s);
516 
517 	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
518 
519 	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
520 		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
521 	} else { /* normal path */
522 		want = dc->c_fill_target ? dc->c_fill_target :
523 			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
524 	}
525 
526 	correction = want - device->rs_in_flight - plan->total;
527 
528 	/* Plan ahead */
529 	cps = correction / steps;
530 	fifo_add_val(plan, cps);
531 	plan->total += cps * steps;
532 
533 	/* What we do in this step */
534 	curr_corr = fifo_push(plan, 0);
535 	plan->total -= curr_corr;
536 
537 	req_sect = sect_in + curr_corr;
538 	if (req_sect < 0)
539 		req_sect = 0;
540 
541 	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
542 	if (req_sect > max_sect)
543 		req_sect = max_sect;
544 
545 	/*
546 	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
547 		 sect_in, device->rs_in_flight, want, correction,
548 		 steps, cps, device->rs_planed, curr_corr, req_sect);
549 	*/
550 
551 	return req_sect;
552 }
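/* Worked example for the controller above, with made-up numbers and the
 * usual SLEEP_TIME of HZ/10 (100ms): assume c_fill_target = 0,
 * c_delay_target = 10 (i.e. 1 second) and sect_in = 800 sectors received
 * during the last step.  Then
 *
 *	want = 800 * 10 * HZ / (SLEEP_TIME * 10) = 8000 sectors in flight.
 *
 * With rs_in_flight = 6000 and plan->total = 1000 the correction is 1000;
 * with steps = 20 that spreads cps = 50 over every slot of the plan fifo,
 * and the value popped by fifo_push(plan, 0) is what we may additionally
 * request in this step, capped at c_max_rate * 2 * SLEEP_TIME / HZ sectors.
 */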
553 
554 static int drbd_rs_number_requests(struct drbd_device *device)
555 {
556 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
557 	int number, mxb;
558 
559 	sect_in = atomic_xchg(&device->rs_sect_in, 0);
560 	device->rs_in_flight -= sect_in;
561 
562 	rcu_read_lock();
563 	mxb = drbd_get_max_buffers(device) / 2;
564 	if (rcu_dereference(device->rs_plan_s)->size) {
565 		number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
566 		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
567 	} else {
568 		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
569 		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
570 	}
571 	rcu_read_unlock();
572 
573 	/* Don't have more than "max-buffers"/2 in-flight.
574 	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
575 	 * potentially causing a distributed deadlock on congestion during
576 	 * online-verify or (checksum-based) resync, if max-buffers,
577 	 * socket buffer sizes and resync rate settings are mis-configured. */
578 
579 	/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
580 	 * mxb (as used here, and in drbd_alloc_pages on the peer) is
581 	 * "number of pages" (typically also 4k),
582 	 * but "rs_in_flight" is in "sectors" (512 Byte). */
583 	if (mxb - device->rs_in_flight/8 < number)
584 		number = mxb - device->rs_in_flight/8;
585 
586 	return number;
587 }
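/* Unit check for the conversions above (illustrative numbers, assuming the
 * usual SLEEP_TIME of HZ/10): "number" counts BM_BLOCK_SIZE (4 KiB) resync
 * requests per 100ms step, so number = 250 gives
 *
 *	c_sync_rate = 250 * HZ * (4096 / 1024) / (HZ / 10) = 10000 KiB/s.
 *
 * rs_in_flight is kept in 512-byte sectors, hence the division by 8 when
 * comparing it against mxb, which counts pages (typically also 4 KiB)
 * on the peer.
 */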
588 
589 static int make_resync_request(struct drbd_device *const device, int cancel)
590 {
591 	struct drbd_peer_device *const peer_device = first_peer_device(device);
592 	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
593 	unsigned long bit;
594 	sector_t sector;
595 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
596 	int max_bio_size;
597 	int number, rollback_i, size;
598 	int align, requeue = 0;
599 	int i = 0;
600 	int discard_granularity = 0;
601 
602 	if (unlikely(cancel))
603 		return 0;
604 
605 	if (device->rs_total == 0) {
606 		/* empty resync? */
607 		drbd_resync_finished(device);
608 		return 0;
609 	}
610 
611 	if (!get_ldev(device)) {
612 		/* Since we only need to access the device's resync bookkeeping,
613 		   get_ldev_if_state(device, D_FAILED) would be sufficient, but
614 		   continuing a resync with a broken disk makes no sense at
615 		   all */
616 		drbd_err(device, "Disk broke down during resync!\n");
617 		return 0;
618 	}
619 
620 	if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
621 		rcu_read_lock();
622 		discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
623 		rcu_read_unlock();
624 	}
625 
626 	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
627 	number = drbd_rs_number_requests(device);
628 	if (number <= 0)
629 		goto requeue;
630 
631 	for (i = 0; i < number; i++) {
632 		/* Stop generating RS requests when half of the send buffer is filled,
633 		 * but notify TCP that we'd like to have more space. */
634 		mutex_lock(&connection->data.mutex);
635 		if (connection->data.socket) {
636 			struct sock *sk = connection->data.socket->sk;
637 			int queued = sk->sk_wmem_queued;
638 			int sndbuf = sk->sk_sndbuf;
639 			if (queued > sndbuf / 2) {
640 				requeue = 1;
641 				if (sk->sk_socket)
642 					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
643 			}
644 		} else
645 			requeue = 1;
646 		mutex_unlock(&connection->data.mutex);
647 		if (requeue)
648 			goto requeue;
649 
650 next_sector:
651 		size = BM_BLOCK_SIZE;
652 		bit  = drbd_bm_find_next(device, device->bm_resync_fo);
653 
654 		if (bit == DRBD_END_OF_BITMAP) {
655 			device->bm_resync_fo = drbd_bm_bits(device);
656 			put_ldev(device);
657 			return 0;
658 		}
659 
660 		sector = BM_BIT_TO_SECT(bit);
661 
662 		if (drbd_try_rs_begin_io(device, sector)) {
663 			device->bm_resync_fo = bit;
664 			goto requeue;
665 		}
666 		device->bm_resync_fo = bit + 1;
667 
668 		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
669 			drbd_rs_complete_io(device, sector);
670 			goto next_sector;
671 		}
672 
673 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
674 		/* try to find some adjacent bits.
675 		 * we stop once we have reached the maximum request size.
676 		 *
677 		 * Additionally, always align bigger requests, in order to
678 		 * be prepared for all stripe sizes of software RAIDs.
679 		 */
680 		align = 1;
681 		rollback_i = i;
682 		while (i < number) {
683 			if (size + BM_BLOCK_SIZE > max_bio_size)
684 				break;
685 
686 			/* Always stay aligned */
687 			if (sector & ((1<<(align+3))-1))
688 				break;
689 
690 			if (discard_granularity && size == discard_granularity)
691 				break;
692 
693 			/* do not cross extent boundaries */
694 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
695 				break;
696 			/* now, is it actually dirty, after all?
697 			 * caution, drbd_bm_test_bit is tri-state for some
698 			 * obscure reason; ( b == 0 ) would get the out-of-band
699 			 * only accidentally right because of the "oddly sized"
700 			 * adjustment below */
701 			if (drbd_bm_test_bit(device, bit+1) != 1)
702 				break;
703 			bit++;
704 			size += BM_BLOCK_SIZE;
705 			if ((BM_BLOCK_SIZE << align) <= size)
706 				align++;
707 			i++;
708 		}
709 		/* if we merged some,
710 		 * reset the offset to start the next drbd_bm_find_next from */
711 		if (size > BM_BLOCK_SIZE)
712 			device->bm_resync_fo = bit + 1;
713 #endif
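		/* Example of the alignment rule in the merge loop above
		 * (illustrative): with align = 1 the test
		 * "sector & ((1<<(align+3))-1)" only lets us merge at all if the
		 * start sector is a multiple of 16 sectors (8 KiB).  Each time the
		 * merged size reaches BM_BLOCK_SIZE << align, align is bumped, so a
		 * request may only grow beyond 8 KiB if it starts 16 KiB aligned,
		 * beyond 16 KiB only if it starts 32 KiB aligned, and so on - which
		 * keeps large resync requests aligned to typical software RAID
		 * stripe sizes.
		 */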
714 
715 		/* adjust very last sectors, in case we are oddly sized */
716 		if (sector + (size>>9) > capacity)
717 			size = (capacity-sector)<<9;
718 
719 		if (device->use_csums) {
720 			switch (read_for_csum(peer_device, sector, size)) {
721 			case -EIO: /* Disk failure */
722 				put_ldev(device);
723 				return -EIO;
724 			case -EAGAIN: /* allocation failed, or ldev busy */
725 				drbd_rs_complete_io(device, sector);
726 				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
727 				i = rollback_i;
728 				goto requeue;
729 			case 0:
730 				/* everything ok */
731 				break;
732 			default:
733 				BUG();
734 			}
735 		} else {
736 			int err;
737 
738 			inc_rs_pending(device);
739 			err = drbd_send_drequest(peer_device,
740 						 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
741 						 sector, size, ID_SYNCER);
742 			if (err) {
743 				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
744 				dec_rs_pending(device);
745 				put_ldev(device);
746 				return err;
747 			}
748 		}
749 	}
750 
751 	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
752 		/* last syncer _request_ was sent,
753 		 * but the P_RS_DATA_REPLY has not yet been received.  sync will end (and
754 		 * next sync group will resume), as soon as we receive the last
755 		 * resync data block, and the last bit is cleared.
756 		 * until then resync "work" is "inactive" ...
757 		 */
758 		put_ldev(device);
759 		return 0;
760 	}
761 
762  requeue:
763 	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
764 	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
765 	put_ldev(device);
766 	return 0;
767 }
768 
769 static int make_ov_request(struct drbd_device *device, int cancel)
770 {
771 	int number, i, size;
772 	sector_t sector;
773 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
774 	bool stop_sector_reached = false;
775 
776 	if (unlikely(cancel))
777 		return 1;
778 
779 	number = drbd_rs_number_requests(device);
780 
781 	sector = device->ov_position;
782 	for (i = 0; i < number; i++) {
783 		if (sector >= capacity)
784 			return 1;
785 
786 		/* We check for "finished" only in the reply path:
787 		 * w_e_end_ov_reply().
788 		 * We need to send at least one request out. */
789 		stop_sector_reached = i > 0
790 			&& verify_can_do_stop_sector(device)
791 			&& sector >= device->ov_stop_sector;
792 		if (stop_sector_reached)
793 			break;
794 
795 		size = BM_BLOCK_SIZE;
796 
797 		if (drbd_try_rs_begin_io(device, sector)) {
798 			device->ov_position = sector;
799 			goto requeue;
800 		}
801 
802 		if (sector + (size>>9) > capacity)
803 			size = (capacity-sector)<<9;
804 
805 		inc_rs_pending(device);
806 		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
807 			dec_rs_pending(device);
808 			return 0;
809 		}
810 		sector += BM_SECT_PER_BIT;
811 	}
812 	device->ov_position = sector;
813 
814  requeue:
815 	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
816 	if (i == 0 || !stop_sector_reached)
817 		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
818 	return 1;
819 }
820 
821 int w_ov_finished(struct drbd_work *w, int cancel)
822 {
823 	struct drbd_device_work *dw =
824 		container_of(w, struct drbd_device_work, w);
825 	struct drbd_device *device = dw->device;
826 	kfree(dw);
827 	ov_out_of_sync_print(device);
828 	drbd_resync_finished(device);
829 
830 	return 0;
831 }
832 
833 static int w_resync_finished(struct drbd_work *w, int cancel)
834 {
835 	struct drbd_device_work *dw =
836 		container_of(w, struct drbd_device_work, w);
837 	struct drbd_device *device = dw->device;
838 	kfree(dw);
839 
840 	drbd_resync_finished(device);
841 
842 	return 0;
843 }
844 
845 static void ping_peer(struct drbd_device *device)
846 {
847 	struct drbd_connection *connection = first_peer_device(device)->connection;
848 
849 	clear_bit(GOT_PING_ACK, &connection->flags);
850 	request_ping(connection);
851 	wait_event(connection->ping_wait,
852 		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
853 }
854 
855 int drbd_resync_finished(struct drbd_device *device)
856 {
857 	struct drbd_connection *connection = first_peer_device(device)->connection;
858 	unsigned long db, dt, dbdt;
859 	unsigned long n_oos;
860 	union drbd_state os, ns;
861 	struct drbd_device_work *dw;
862 	char *khelper_cmd = NULL;
863 	int verify_done = 0;
864 
865 	/* Remove all elements from the resync LRU. Since future actions
866 	 * might set bits in the (main) bitmap, the entries in the
867 	 * resync LRU would otherwise be wrong. */
868 	if (drbd_rs_del_all(device)) {
869 		/* In case this is not possible right now, most probably because
870 		 * there are P_RS_DATA_REPLY packets lingering on the worker's
871 		 * queue (or the read operations for those packets have
872 		 * not finished yet), retry in 100ms. */
873 
874 		schedule_timeout_interruptible(HZ / 10);
875 		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
876 		if (dw) {
877 			dw->w.cb = w_resync_finished;
878 			dw->device = device;
879 			drbd_queue_work(&connection->sender_work, &dw->w);
880 			return 1;
881 		}
882 		drbd_err(device, "Failed to drbd_rs_del_all() and to kmalloc(dw).\n");
883 	}
884 
885 	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
886 	if (dt <= 0)
887 		dt = 1;
888 
889 	db = device->rs_total;
890 	/* adjust for verify start and stop sectors, respectively the reached position */
891 	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
892 		db -= device->ov_left;
893 
894 	dbdt = Bit2KB(db/dt);
895 	device->rs_paused /= HZ;
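	/* Arithmetic sanity check for the numbers computed above (made-up
	 * example): rs_total counts BM_BLOCK_SIZE (4 KiB) bits, so a 1 GiB
	 * resync is rs_total = 262144 bits.  Finishing it in dt = 100 seconds
	 * gives dbdt = Bit2KB(262144 / 100) = 2621 * 4 = 10484 KiB/s, which is
	 * what the "K/sec" figure in the log message below reports.
	 */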
896 
897 	if (!get_ldev(device))
898 		goto out;
899 
900 	ping_peer(device);
901 
902 	spin_lock_irq(&device->resource->req_lock);
903 	os = drbd_read_state(device);
904 
905 	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
906 
907 	/* This protects us against multiple calls (that can happen in the presence
908 	   of application IO), and against connectivity loss just before we arrive here. */
909 	if (os.conn <= C_CONNECTED)
910 		goto out_unlock;
911 
912 	ns = os;
913 	ns.conn = C_CONNECTED;
914 
915 	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
916 	     verify_done ? "Online verify" : "Resync",
917 	     dt + device->rs_paused, device->rs_paused, dbdt);
918 
919 	n_oos = drbd_bm_total_weight(device);
920 
921 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
922 		if (n_oos) {
923 			drbd_alert(device, "Online verify found %lu %dk blocks out of sync!\n",
924 			      n_oos, Bit2KB(1));
925 			khelper_cmd = "out-of-sync";
926 		}
927 	} else {
928 		D_ASSERT(device, (n_oos - device->rs_failed) == 0);
929 
930 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
931 			khelper_cmd = "after-resync-target";
932 
933 		if (device->use_csums && device->rs_total) {
934 			const unsigned long s = device->rs_same_csum;
935 			const unsigned long t = device->rs_total;
936 			const int ratio =
937 				(t == 0)     ? 0 :
938 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
939 			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
940 			     "transferred %luK total %luK\n",
941 			     ratio,
942 			     Bit2KB(device->rs_same_csum),
943 			     Bit2KB(device->rs_total - device->rs_same_csum),
944 			     Bit2KB(device->rs_total));
945 		}
946 	}
947 
948 	if (device->rs_failed) {
949 		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
950 
951 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
952 			ns.disk = D_INCONSISTENT;
953 			ns.pdsk = D_UP_TO_DATE;
954 		} else {
955 			ns.disk = D_UP_TO_DATE;
956 			ns.pdsk = D_INCONSISTENT;
957 		}
958 	} else {
959 		ns.disk = D_UP_TO_DATE;
960 		ns.pdsk = D_UP_TO_DATE;
961 
962 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
963 			if (device->p_uuid) {
964 				int i;
965 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
966 					_drbd_uuid_set(device, i, device->p_uuid[i]);
967 				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
968 				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
969 			} else {
970 				drbd_err(device, "device->p_uuid is NULL! BUG\n");
971 			}
972 		}
973 
974 		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
975 			/* for verify runs, we don't update uuids here,
976 			 * so there would be nothing to report. */
977 			drbd_uuid_set_bm(device, 0UL);
978 			drbd_print_uuids(device, "updated UUIDs");
979 			if (device->p_uuid) {
980 				/* Now the two UUID sets are equal, update what we
981 				 * know of the peer. */
982 				int i;
983 				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
984 					device->p_uuid[i] = device->ldev->md.uuid[i];
985 			}
986 		}
987 	}
988 
989 	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
990 out_unlock:
991 	spin_unlock_irq(&device->resource->req_lock);
992 
993 	/* If we have been sync source, and have an effective fencing-policy,
994 	 * once *all* volumes are back in sync, call "unfence". */
995 	if (os.conn == C_SYNC_SOURCE) {
996 		enum drbd_disk_state disk_state = D_MASK;
997 		enum drbd_disk_state pdsk_state = D_MASK;
998 		enum drbd_fencing_p fp = FP_DONT_CARE;
999 
1000 		rcu_read_lock();
1001 		fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1002 		if (fp != FP_DONT_CARE) {
1003 			struct drbd_peer_device *peer_device;
1004 			int vnr;
1005 			idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1006 				struct drbd_device *device = peer_device->device;
1007 				disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1008 				pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1009 			}
1010 		}
1011 		rcu_read_unlock();
1012 		if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1013 			conn_khelper(connection, "unfence-peer");
1014 	}
1015 
1016 	put_ldev(device);
1017 out:
1018 	device->rs_total  = 0;
1019 	device->rs_failed = 0;
1020 	device->rs_paused = 0;
1021 
1022 	/* reset start sector, if we reached end of device */
1023 	if (verify_done && device->ov_left == 0)
1024 		device->ov_start_sector = 0;
1025 
1026 	drbd_md_sync(device);
1027 
1028 	if (khelper_cmd)
1029 		drbd_khelper(device, khelper_cmd);
1030 
1031 	return 1;
1032 }
1033 
1034 /* helper */
1035 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1036 {
1037 	if (drbd_peer_req_has_active_page(peer_req)) {
1038 		/* This might happen if sendpage() has not finished */
1039 		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1040 		atomic_add(i, &device->pp_in_use_by_net);
1041 		atomic_sub(i, &device->pp_in_use);
1042 		spin_lock_irq(&device->resource->req_lock);
1043 		list_add_tail(&peer_req->w.list, &device->net_ee);
1044 		spin_unlock_irq(&device->resource->req_lock);
1045 		wake_up(&drbd_pp_wait);
1046 	} else
1047 		drbd_free_peer_req(device, peer_req);
1048 }
1049 
1050 /**
1051  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1052  * @w:		work object.
1053  * @cancel:	The connection will be closed anyway
1054  */
1055 int w_e_end_data_req(struct drbd_work *w, int cancel)
1056 {
1057 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1058 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1059 	struct drbd_device *device = peer_device->device;
1060 	int err;
1061 
1062 	if (unlikely(cancel)) {
1063 		drbd_free_peer_req(device, peer_req);
1064 		dec_unacked(device);
1065 		return 0;
1066 	}
1067 
1068 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1069 		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1070 	} else {
1071 		if (__ratelimit(&drbd_ratelimit_state))
1072 			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1073 			    (unsigned long long)peer_req->i.sector);
1074 
1075 		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1076 	}
1077 
1078 	dec_unacked(device);
1079 
1080 	move_to_net_ee_or_free(device, peer_req);
1081 
1082 	if (unlikely(err))
1083 		drbd_err(device, "drbd_send_block() failed\n");
1084 	return err;
1085 }
1086 
1087 static bool all_zero(struct drbd_peer_request *peer_req)
1088 {
1089 	struct page *page = peer_req->pages;
1090 	unsigned int len = peer_req->i.size;
1091 
1092 	page_chain_for_each(page) {
1093 		unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1094 		unsigned int i, words = l / sizeof(long);
1095 		unsigned long *d;
1096 
1097 		d = kmap_atomic(page);
1098 		for (i = 0; i < words; i++) {
1099 			if (d[i]) {
1100 				kunmap_atomic(d);
1101 				return false;
1102 			}
1103 		}
1104 		kunmap_atomic(d);
1105 		len -= l;
1106 	}
1107 
1108 	return true;
1109 }
1110 
1111 /**
1112  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1113  * @w:		work object.
1114  * @cancel:	The connection will be closed anyway
1115  */
1116 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1117 {
1118 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1119 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1120 	struct drbd_device *device = peer_device->device;
1121 	int err;
1122 
1123 	if (unlikely(cancel)) {
1124 		drbd_free_peer_req(device, peer_req);
1125 		dec_unacked(device);
1126 		return 0;
1127 	}
1128 
1129 	if (get_ldev_if_state(device, D_FAILED)) {
1130 		drbd_rs_complete_io(device, peer_req->i.sector);
1131 		put_ldev(device);
1132 	}
1133 
1134 	if (device->state.conn == C_AHEAD) {
1135 		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1136 	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1137 		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1138 			inc_rs_pending(device);
1139 			if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1140 				err = drbd_send_rs_deallocated(peer_device, peer_req);
1141 			else
1142 				err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1143 		} else {
1144 			if (__ratelimit(&drbd_ratelimit_state))
1145 				drbd_err(device, "Not sending RSDataReply, "
1146 				    "partner DISKLESS!\n");
1147 			err = 0;
1148 		}
1149 	} else {
1150 		if (__ratelimit(&drbd_ratelimit_state))
1151 			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1152 			    (unsigned long long)peer_req->i.sector);
1153 
1154 		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1155 
1156 		/* update resync data with failure */
1157 		drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1158 	}
1159 
1160 	dec_unacked(device);
1161 
1162 	move_to_net_ee_or_free(device, peer_req);
1163 
1164 	if (unlikely(err))
1165 		drbd_err(device, "drbd_send_block() failed\n");
1166 	return err;
1167 }
1168 
1169 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1170 {
1171 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1172 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1173 	struct drbd_device *device = peer_device->device;
1174 	struct digest_info *di;
1175 	int digest_size;
1176 	void *digest = NULL;
1177 	int err, eq = 0;
1178 
1179 	if (unlikely(cancel)) {
1180 		drbd_free_peer_req(device, peer_req);
1181 		dec_unacked(device);
1182 		return 0;
1183 	}
1184 
1185 	if (get_ldev(device)) {
1186 		drbd_rs_complete_io(device, peer_req->i.sector);
1187 		put_ldev(device);
1188 	}
1189 
1190 	di = peer_req->digest;
1191 
1192 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1193 		/* quick hack to try to avoid a race against reconfiguration.
1194 		 * a real fix would be much more involved,
1195 		 * introducing more locking mechanisms */
1196 		if (peer_device->connection->csums_tfm) {
1197 			digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
1198 			D_ASSERT(device, digest_size == di->digest_size);
1199 			digest = kmalloc(digest_size, GFP_NOIO);
1200 		}
1201 		if (digest) {
1202 			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1203 			eq = !memcmp(digest, di->digest, digest_size);
1204 			kfree(digest);
1205 		}
1206 
1207 		if (eq) {
1208 			drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1209 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1210 			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1211 			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1212 		} else {
1213 			inc_rs_pending(device);
1214 			peer_req->block_id = ID_SYNCER; /* By setting block_id, the digest pointer becomes invalid! */
1215 			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1216 			kfree(di);
1217 			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1218 		}
1219 	} else {
1220 		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1221 		if (__ratelimit(&drbd_ratelimit_state))
1222 			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1223 	}
1224 
1225 	dec_unacked(device);
1226 	move_to_net_ee_or_free(device, peer_req);
1227 
1228 	if (unlikely(err))
1229 		drbd_err(device, "drbd_send_block/ack() failed\n");
1230 	return err;
1231 }
1232 
1233 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1234 {
1235 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1236 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1237 	struct drbd_device *device = peer_device->device;
1238 	sector_t sector = peer_req->i.sector;
1239 	unsigned int size = peer_req->i.size;
1240 	int digest_size;
1241 	void *digest;
1242 	int err = 0;
1243 
1244 	if (unlikely(cancel))
1245 		goto out;
1246 
1247 	digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1248 	digest = kmalloc(digest_size, GFP_NOIO);
1249 	if (!digest) {
1250 		err = 1;	/* terminate the connection in case the allocation failed */
1251 		goto out;
1252 	}
1253 
1254 	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1255 		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1256 	else
1257 		memset(digest, 0, digest_size);
1258 
1259 	/* Free peer_req and pages before send.
1260 	 * In case we block on congestion, we could otherwise run into
1261 	 * some distributed deadlock, if the other side blocks on
1262 	 * congestion as well, because our receiver blocks in
1263 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1264 	drbd_free_peer_req(device, peer_req);
1265 	peer_req = NULL;
1266 	inc_rs_pending(device);
1267 	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1268 	if (err)
1269 		dec_rs_pending(device);
1270 	kfree(digest);
1271 
1272 out:
1273 	if (peer_req)
1274 		drbd_free_peer_req(device, peer_req);
1275 	dec_unacked(device);
1276 	return err;
1277 }
1278 
1279 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1280 {
1281 	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1282 		device->ov_last_oos_size += size>>9;
1283 	} else {
1284 		device->ov_last_oos_start = sector;
1285 		device->ov_last_oos_size = size>>9;
1286 	}
1287 	drbd_set_out_of_sync(device, sector, size);
1288 }
1289 
1290 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1291 {
1292 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1293 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1294 	struct drbd_device *device = peer_device->device;
1295 	struct digest_info *di;
1296 	void *digest;
1297 	sector_t sector = peer_req->i.sector;
1298 	unsigned int size = peer_req->i.size;
1299 	int digest_size;
1300 	int err, eq = 0;
1301 	bool stop_sector_reached = false;
1302 
1303 	if (unlikely(cancel)) {
1304 		drbd_free_peer_req(device, peer_req);
1305 		dec_unacked(device);
1306 		return 0;
1307 	}
1308 
1309 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1310 	 * the resync lru has been cleaned up already */
1311 	if (get_ldev(device)) {
1312 		drbd_rs_complete_io(device, peer_req->i.sector);
1313 		put_ldev(device);
1314 	}
1315 
1316 	di = peer_req->digest;
1317 
1318 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1319 		digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1320 		digest = kmalloc(digest_size, GFP_NOIO);
1321 		if (digest) {
1322 			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1323 
1324 			D_ASSERT(device, digest_size == di->digest_size);
1325 			eq = !memcmp(digest, di->digest, digest_size);
1326 			kfree(digest);
1327 		}
1328 	}
1329 
1330 	/* Free peer_req and pages before send.
1331 	 * In case we block on congestion, we could otherwise run into
1332 	 * some distributed deadlock, if the other side blocks on
1333 	 * congestion as well, because our receiver blocks in
1334 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1335 	drbd_free_peer_req(device, peer_req);
1336 	if (!eq)
1337 		drbd_ov_out_of_sync_found(device, sector, size);
1338 	else
1339 		ov_out_of_sync_print(device);
1340 
1341 	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1342 			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1343 
1344 	dec_unacked(device);
1345 
1346 	--device->ov_left;
1347 
1348 	/* let's advance progress step marks only for every other megabyte */
1349 	if ((device->ov_left & 0x200) == 0x200)
1350 		drbd_advance_rs_marks(device, device->ov_left);
1351 
1352 	stop_sector_reached = verify_can_do_stop_sector(device) &&
1353 		(sector + (size>>9)) >= device->ov_stop_sector;
1354 
1355 	if (device->ov_left == 0 || stop_sector_reached) {
1356 		ov_out_of_sync_print(device);
1357 		drbd_resync_finished(device);
1358 	}
1359 
1360 	return err;
1361 }
1362 
1363 /* FIXME
1364  * We need to track the number of pending barrier acks,
1365  * and to be able to wait for them.
1366  * See also comment in drbd_adm_attach before drbd_suspend_io.
1367  */
1368 static int drbd_send_barrier(struct drbd_connection *connection)
1369 {
1370 	struct p_barrier *p;
1371 	struct drbd_socket *sock;
1372 
1373 	sock = &connection->data;
1374 	p = conn_prepare_command(connection, sock);
1375 	if (!p)
1376 		return -EIO;
1377 	p->barrier = connection->send.current_epoch_nr;
1378 	p->pad = 0;
1379 	connection->send.current_epoch_writes = 0;
1380 	connection->send.last_sent_barrier_jif = jiffies;
1381 
1382 	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1383 }
1384 
1385 int w_send_write_hint(struct drbd_work *w, int cancel)
1386 {
1387 	struct drbd_device *device =
1388 		container_of(w, struct drbd_device, unplug_work);
1389 	struct drbd_socket *sock;
1390 
1391 	if (cancel)
1392 		return 0;
1393 	sock = &first_peer_device(device)->connection->data;
1394 	if (!drbd_prepare_command(first_peer_device(device), sock))
1395 		return -EIO;
1396 	return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1397 }
1398 
1399 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1400 {
1401 	if (!connection->send.seen_any_write_yet) {
1402 		connection->send.seen_any_write_yet = true;
1403 		connection->send.current_epoch_nr = epoch;
1404 		connection->send.current_epoch_writes = 0;
1405 		connection->send.last_sent_barrier_jif = jiffies;
1406 	}
1407 }
1408 
1409 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1410 {
1411 	/* nothing to do here if we have not seen any write on this connection yet */
1412 	if (!connection->send.seen_any_write_yet)
1413 		return;
1414 	if (connection->send.current_epoch_nr != epoch) {
1415 		if (connection->send.current_epoch_writes)
1416 			drbd_send_barrier(connection);
1417 		connection->send.current_epoch_nr = epoch;
1418 	}
1419 }
1420 
1421 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1422 {
1423 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1424 	struct drbd_device *device = req->device;
1425 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1426 	struct drbd_connection *const connection = peer_device->connection;
1427 	int err;
1428 
1429 	if (unlikely(cancel)) {
1430 		req_mod(req, SEND_CANCELED);
1431 		return 0;
1432 	}
1433 	req->pre_send_jif = jiffies;
1434 
1435 	/* this time, no connection->send.current_epoch_writes++;
1436 	 * If a barrier gets sent here, it was the closing barrier for the last
1437 	 * replicated epoch, before we went into AHEAD mode.
1438 	 * No more barriers will be sent until we leave AHEAD mode again. */
1439 	maybe_send_barrier(connection, req->epoch);
1440 
1441 	err = drbd_send_out_of_sync(peer_device, req);
1442 	req_mod(req, OOS_HANDED_TO_NETWORK);
1443 
1444 	return err;
1445 }
1446 
1447 /**
1448  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1449  * @w:		work object.
1450  * @cancel:	The connection will be closed anyway
1451  */
1452 int w_send_dblock(struct drbd_work *w, int cancel)
1453 {
1454 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1455 	struct drbd_device *device = req->device;
1456 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1457 	struct drbd_connection *connection = peer_device->connection;
1458 	int err;
1459 
1460 	if (unlikely(cancel)) {
1461 		req_mod(req, SEND_CANCELED);
1462 		return 0;
1463 	}
1464 	req->pre_send_jif = jiffies;
1465 
1466 	re_init_if_first_write(connection, req->epoch);
1467 	maybe_send_barrier(connection, req->epoch);
1468 	connection->send.current_epoch_writes++;
1469 
1470 	err = drbd_send_dblock(peer_device, req);
1471 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1472 
1473 	return err;
1474 }
1475 
1476 /**
1477  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1478  * @w:		work object.
1479  * @cancel:	The connection will be closed anyway
1480  */
1481 int w_send_read_req(struct drbd_work *w, int cancel)
1482 {
1483 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1484 	struct drbd_device *device = req->device;
1485 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1486 	struct drbd_connection *connection = peer_device->connection;
1487 	int err;
1488 
1489 	if (unlikely(cancel)) {
1490 		req_mod(req, SEND_CANCELED);
1491 		return 0;
1492 	}
1493 	req->pre_send_jif = jiffies;
1494 
1495 	/* Even read requests may close a write epoch,
1496 	 * if one is still open. */
1497 	maybe_send_barrier(connection, req->epoch);
1498 
1499 	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1500 				 (unsigned long)req);
1501 
1502 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1503 
1504 	return err;
1505 }
1506 
1507 int w_restart_disk_io(struct drbd_work *w, int cancel)
1508 {
1509 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1510 	struct drbd_device *device = req->device;
1511 
1512 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1513 		drbd_al_begin_io(device, &req->i);
1514 
1515 	drbd_req_make_private_bio(req, req->master_bio);
1516 	req->private_bio->bi_bdev = device->ldev->backing_bdev;
1517 	generic_make_request(req->private_bio);
1518 
1519 	return 0;
1520 }
1521 
1522 static int _drbd_may_sync_now(struct drbd_device *device)
1523 {
1524 	struct drbd_device *odev = device;
1525 	int resync_after;
1526 
1527 	while (1) {
1528 		if (!odev->ldev || odev->state.disk == D_DISKLESS)
1529 			return 1;
1530 		rcu_read_lock();
1531 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1532 		rcu_read_unlock();
1533 		if (resync_after == -1)
1534 			return 1;
1535 		odev = minor_to_device(resync_after);
1536 		if (!odev)
1537 			return 1;
1538 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1539 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1540 		    odev->state.aftr_isp || odev->state.peer_isp ||
1541 		    odev->state.user_isp)
1542 			return 0;
1543 	}
1544 }
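/* Illustrative trace of the dependency walk above (hypothetical minors,
 * not from this file): assume minor 2 has resync-after = 1 and minor 1 has
 * resync-after = 0.  Then _drbd_may_sync_now(minor_to_device(2)) follows
 *
 *	odev = 2: resync_after = 1  -> look at minor 1;
 *	          if minor 1 is resyncing/paused or has an _isp flag -> return 0
 *	odev = 1: resync_after = 0  -> look at minor 0;
 *	          if minor 0 is busy the same way                    -> return 0
 *	odev = 0: resync_after = -1 -> end of chain                  -> return 1
 *
 * so a device may only resync once nothing it (transitively) depends on is
 * currently resyncing or administratively paused.
 */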
1545 
1546 /**
1547  * drbd_pause_after() - Pause resync on all devices that may not resync now
1548  * @device:	DRBD device.
1549  *
1550  * Called from process context only (admin command and after_state_ch).
1551  */
1552 static bool drbd_pause_after(struct drbd_device *device)
1553 {
1554 	bool changed = false;
1555 	struct drbd_device *odev;
1556 	int i;
1557 
1558 	rcu_read_lock();
1559 	idr_for_each_entry(&drbd_devices, odev, i) {
1560 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1561 			continue;
1562 		if (!_drbd_may_sync_now(odev) &&
1563 		    _drbd_set_state(_NS(odev, aftr_isp, 1),
1564 				    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1565 			changed = true;
1566 	}
1567 	rcu_read_unlock();
1568 
1569 	return changed;
1570 }
1571 
1572 /**
1573  * drbd_resume_next() - Resume resync on all devices that may resync now
1574  * @device:	DRBD device.
1575  *
1576  * Called from process context only (admin command and worker).
1577  */
1578 static bool drbd_resume_next(struct drbd_device *device)
1579 {
1580 	bool changed = false;
1581 	struct drbd_device *odev;
1582 	int i;
1583 
1584 	rcu_read_lock();
1585 	idr_for_each_entry(&drbd_devices, odev, i) {
1586 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1587 			continue;
1588 		if (odev->state.aftr_isp) {
1589 			if (_drbd_may_sync_now(odev) &&
1590 			    _drbd_set_state(_NS(odev, aftr_isp, 0),
1591 					    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1592 				changed = true;
1593 		}
1594 	}
1595 	rcu_read_unlock();
1596 	return changed;
1597 }
1598 
1599 void resume_next_sg(struct drbd_device *device)
1600 {
1601 	lock_all_resources();
1602 	drbd_resume_next(device);
1603 	unlock_all_resources();
1604 }
1605 
1606 void suspend_other_sg(struct drbd_device *device)
1607 {
1608 	lock_all_resources();
1609 	drbd_pause_after(device);
1610 	unlock_all_resources();
1611 }
1612 
1613 /* caller must lock_all_resources() */
1614 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1615 {
1616 	struct drbd_device *odev;
1617 	int resync_after;
1618 
1619 	if (o_minor == -1)
1620 		return NO_ERROR;
1621 	if (o_minor < -1 || o_minor > MINORMASK)
1622 		return ERR_RESYNC_AFTER;
1623 
1624 	/* check for loops */
1625 	odev = minor_to_device(o_minor);
1626 	while (1) {
1627 		if (odev == device)
1628 			return ERR_RESYNC_AFTER_CYCLE;
1629 
1630 		/* You are free to depend on diskless, non-existing,
1631 		 * or not yet/no longer existing minors.
1632 		 * We only reject dependency loops.
1633 		 * We cannot follow the dependency chain beyond a detached or
1634 		 * missing minor.
1635 		 */
1636 		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1637 			return NO_ERROR;
1638 
1639 		rcu_read_lock();
1640 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1641 		rcu_read_unlock();
1642 		/* dependency chain ends here, no cycles. */
1643 		if (resync_after == -1)
1644 			return NO_ERROR;
1645 
1646 		/* follow the dependency chain */
1647 		odev = minor_to_device(resync_after);
1648 	}
1649 }
1650 
1651 /* caller must lock_all_resources() */
1652 void drbd_resync_after_changed(struct drbd_device *device)
1653 {
1654 	int changed;
1655 
1656 	do {
1657 		changed  = drbd_pause_after(device);
1658 		changed |= drbd_resume_next(device);
1659 	} while (changed);
1660 }
1661 
1662 void drbd_rs_controller_reset(struct drbd_device *device)
1663 {
1664 	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1665 	struct fifo_buffer *plan;
1666 
1667 	atomic_set(&device->rs_sect_in, 0);
1668 	atomic_set(&device->rs_sect_ev, 0);
1669 	device->rs_in_flight = 0;
1670 	device->rs_last_events =
1671 		(int)part_stat_read(&disk->part0, sectors[0]) +
1672 		(int)part_stat_read(&disk->part0, sectors[1]);
1673 
1674 	/* Updating the RCU protected object in place is necessary since
1675 	   this function gets called from atomic context.
1676 	   It is valid since all other updates also lead to a completely
1677 	   empty fifo. */
1678 	rcu_read_lock();
1679 	plan = rcu_dereference(device->rs_plan_s);
1680 	plan->total = 0;
1681 	fifo_set(plan, 0);
1682 	rcu_read_unlock();
1683 }
1684 
1685 void start_resync_timer_fn(unsigned long data)
1686 {
1687 	struct drbd_device *device = (struct drbd_device *) data;
1688 	drbd_device_post_work(device, RS_START);
1689 }
1690 
1691 static void do_start_resync(struct drbd_device *device)
1692 {
1693 	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1694 		drbd_warn(device, "postponing start_resync ...\n");
1695 		device->start_resync_timer.expires = jiffies + HZ/10;
1696 		add_timer(&device->start_resync_timer);
1697 		return;
1698 	}
1699 
1700 	drbd_start_resync(device, C_SYNC_SOURCE);
1701 	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1702 }
1703 
1704 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1705 {
1706 	bool csums_after_crash_only;
1707 	rcu_read_lock();
1708 	csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1709 	rcu_read_unlock();
1710 	return connection->agreed_pro_version >= 89 &&		/* supported? */
1711 		connection->csums_tfm &&			/* configured? */
1712 		(csums_after_crash_only == false		/* use for each resync? */
1713 		 || test_bit(CRASHED_PRIMARY, &device->flags));	/* or only after Primary crash? */
1714 }
1715 
1716 /**
1717  * drbd_start_resync() - Start the resync process
1718  * @device:	DRBD device.
1719  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1720  *
1721  * This function might bring you directly into one of the
1722  * C_PAUSED_SYNC_* states.
1723  */
1724 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1725 {
1726 	struct drbd_peer_device *peer_device = first_peer_device(device);
1727 	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1728 	union drbd_state ns;
1729 	int r;
1730 
1731 	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1732 		drbd_err(device, "Resync already running!\n");
1733 		return;
1734 	}
1735 
1736 	if (!test_bit(B_RS_H_DONE, &device->flags)) {
1737 		if (side == C_SYNC_TARGET) {
1738 			/* Since application IO was locked out during C_WF_BITMAP_T and
1739 			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1740 			   ask the before-resync-target handler whether we may make the data inconsistent. */
1741 			r = drbd_khelper(device, "before-resync-target");
1742 			r = (r >> 8) & 0xff;
1743 			if (r > 0) {
1744 				drbd_info(device, "before-resync-target handler returned %d, "
1745 					 "dropping connection.\n", r);
1746 				conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1747 				return;
1748 			}
1749 		} else /* C_SYNC_SOURCE */ {
1750 			r = drbd_khelper(device, "before-resync-source");
1751 			r = (r >> 8) & 0xff;
1752 			if (r > 0) {
1753 				if (r == 3) {
1754 					drbd_info(device, "before-resync-source handler returned %d, "
1755 						 "ignoring. Old userland tools?\n", r);
1756 				} else {
1757 					drbd_info(device, "before-resync-source handler returned %d, "
1758 						 "dropping connection.\n", r);
1759 					conn_request_state(connection,
1760 							   NS(conn, C_DISCONNECTING), CS_HARD);
1761 					return;
1762 				}
1763 			}
1764 		}
1765 	}
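
	/*
	 * Editor's note (not part of the original source): drbd_khelper()
	 * returns a wait()-style status word, so "(r >> 8) & 0xff" above
	 * extracts the helper's exit code, much like WEXITSTATUS() in user
	 * space.  A handler that does exit(3) therefore shows up here as
	 * r == 3 after the shift and mask.
	 */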
1766 
1767 	if (current == connection->worker.task) {
1768 		/* The worker should not sleep waiting for state_mutex,
1769 		   that can take a long time. */
1770 		if (!mutex_trylock(device->state_mutex)) {
1771 			set_bit(B_RS_H_DONE, &device->flags);
1772 			device->start_resync_timer.expires = jiffies + HZ/5;
1773 			add_timer(&device->start_resync_timer);
1774 			return;
1775 		}
1776 	} else {
1777 		mutex_lock(device->state_mutex);
1778 	}
1779 
1780 	lock_all_resources();
1781 	clear_bit(B_RS_H_DONE, &device->flags);
1782 	/* Did some connection breakage or IO error race with us? */
1783 	if (device->state.conn < C_CONNECTED
1784 	    || !get_ldev_if_state(device, D_NEGOTIATING)) {
1785 		unlock_all_resources();
1786 		goto out;
1787 	}
1788 
1789 	ns = drbd_read_state(device);
1790 
1791 	ns.aftr_isp = !_drbd_may_sync_now(device);
1792 
1793 	ns.conn = side;
1794 
1795 	if (side == C_SYNC_TARGET)
1796 		ns.disk = D_INCONSISTENT;
1797 	else /* side == C_SYNC_SOURCE */
1798 		ns.pdsk = D_INCONSISTENT;
1799 
1800 	r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1801 	ns = drbd_read_state(device);
1802 
1803 	if (ns.conn < C_CONNECTED)
1804 		r = SS_UNKNOWN_ERROR;
1805 
1806 	if (r == SS_SUCCESS) {
1807 		unsigned long tw = drbd_bm_total_weight(device);
1808 		unsigned long now = jiffies;
1809 		int i;
1810 
1811 		device->rs_failed    = 0;
1812 		device->rs_paused    = 0;
1813 		device->rs_same_csum = 0;
1814 		device->rs_last_sect_ev = 0;
1815 		device->rs_total     = tw;
1816 		device->rs_start     = now;
1817 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1818 			device->rs_mark_left[i] = tw;
1819 			device->rs_mark_time[i] = now;
1820 		}
1821 		drbd_pause_after(device);
1822 		/* Forget potentially stale cached per-resync-extent bit counts.
1823 		 * Open-coded drbd_rs_cancel_all(device): we already have IRQs
1824 		 * disabled, and know the disk state is ok. */
1825 		spin_lock(&device->al_lock);
1826 		lc_reset(device->resync);
1827 		device->resync_locked = 0;
1828 		device->resync_wenr = LC_FREE;
1829 		spin_unlock(&device->al_lock);
1830 	}
1831 	unlock_all_resources();
1832 
1833 	if (r == SS_SUCCESS) {
1834 		wake_up(&device->al_wait); /* for lc_reset() above */
1835 		/* reset rs_last_bcast when a resync or verify is started,
1836 		 * to deal with potential jiffies wrap. */
1837 		device->rs_last_bcast = jiffies - HZ;
1838 
1839 		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1840 		     drbd_conn_str(ns.conn),
1841 		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1842 		     (unsigned long) device->rs_total);
1843 		if (side == C_SYNC_TARGET) {
1844 			device->bm_resync_fo = 0;
1845 			device->use_csums = use_checksum_based_resync(connection, device);
1846 		} else {
1847 			device->use_csums = false;
1848 		}
1849 
1850 		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1851 		 * with w_send_oos, or the sync target will get confused as to
1852 		 * how many bits to resync.  We cannot always do that, because for an
1853 		 * empty resync and protocol < 95, we need to do it here, as we call
1854 		 * drbd_resync_finished from here in that case.
1855 		 * We call drbd_gen_and_send_sync_uuid() here for protocol < 96,
1856 		 * and from after_state_ch otherwise. */
1857 		if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1858 			drbd_gen_and_send_sync_uuid(peer_device);
1859 
1860 		if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1861 			/* This still has a race (about when exactly the peers
1862 			 * detect connection loss) that can lead to a full sync
1863 			 * on next handshake. In 8.3.9 we fixed this with explicit
1864 			 * resync-finished notifications, but the fix
1865 			 * introduces a protocol change.  Sleeping for some
1866 			 * time longer than the ping interval + timeout on the
1867 			 * SyncSource, to give the SyncTarget the chance to
1868 			 * detect connection loss, then waiting for a ping
1869 			 * response (implicit in drbd_resync_finished) reduces
1870 			 * the race considerably, but does not solve it. */
1871 			if (side == C_SYNC_SOURCE) {
1872 				struct net_conf *nc;
1873 				int timeo;
1874 
1875 				rcu_read_lock();
1876 				nc = rcu_dereference(connection->net_conf);
1877 				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1878 				rcu_read_unlock();
1879 				schedule_timeout_interruptible(timeo);
1880 			}
1881 			drbd_resync_finished(device);
1882 		}
1883 
1884 		drbd_rs_controller_reset(device);
1885 		/* ns.conn may already be != device->state.conn,
1886 		 * we may have been paused in between, or become paused until
1887 		 * the timer triggers.
1888 		 * No matter, that is handled in resync_timer_fn() */
1889 		if (ns.conn == C_SYNC_TARGET)
1890 			mod_timer(&device->resync_timer, jiffies);
1891 
1892 		drbd_md_sync(device);
1893 	}
1894 	put_ldev(device);
1895 out:
1896 	mutex_unlock(device->state_mutex);
1897 }
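
/*
 * Editor's sketch (not part of the original source): the "will sync %lu KB"
 * figure printed above converts bitmap bits to KiB.  Each bitmap bit covers
 * one BM_BLOCK_SIZE block (4 KiB, i.e. BM_BLOCK_SHIFT == 12), so shifting
 * the bit count left by (BM_BLOCK_SHIFT - 10) multiplies it by 4:
 */
#if 0	/* illustration only */
static inline unsigned long resync_bits_to_kib(unsigned long bits)
{
	/* e.g. 1000 dirty bits -> 4000 KiB left to resync */
	return bits << (BM_BLOCK_SHIFT - 10);
}
#endif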
1898 
1899 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1900 {
1901 	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1902 	device->rs_last_bcast = jiffies;
1903 
1904 	if (!get_ldev(device))
1905 		return;
1906 
1907 	drbd_bm_write_lazy(device, 0);
1908 	if (resync_done && is_sync_state(device->state.conn))
1909 		drbd_resync_finished(device);
1910 
1911 	drbd_bcast_event(device, &sib);
1912 	/* update timestamp, in case it took a while to write out stuff */
1913 	device->rs_last_bcast = jiffies;
1914 	put_ldev(device);
1915 }
1916 
1917 static void drbd_ldev_destroy(struct drbd_device *device)
1918 {
1919 	lc_destroy(device->resync);
1920 	device->resync = NULL;
1921 	lc_destroy(device->act_log);
1922 	device->act_log = NULL;
1923 
1924 	__acquire(local);
1925 	drbd_backing_dev_free(device, device->ldev);
1926 	device->ldev = NULL;
1927 	__release(local);
1928 
1929 	clear_bit(GOING_DISKLESS, &device->flags);
1930 	wake_up(&device->misc_wait);
1931 }
1932 
1933 static void go_diskless(struct drbd_device *device)
1934 {
1935 	D_ASSERT(device, device->state.disk == D_FAILED);
1936 	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1937 	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1938 	 * the protected members anymore, though, so once put_ldev reaches zero
1939 	 * again, it will be safe to free them. */
1940 
1941 	/* Try to write changed bitmap pages, read errors may have just
1942 	 * set some bits outside the area covered by the activity log.
1943 	 *
1944 	 * If we have an IO error during the bitmap writeout,
1945 	 * we will want a full sync next time, just in case.
1946 	 * (Do we want a specific meta data flag for this?)
1947 	 *
1948 	 * If that does not make it to stable storage either,
1949 	 * we cannot do anything about that anymore.
1950 	 *
1951 	 * We still need to check if both bitmap and ldev are present, we may
1952 	 * end up here after a failed attach, before ldev was even assigned.
1953 	 */
1954 	if (device->bitmap && device->ldev) {
1955 		/* An interrupted resync or similar is allowed to recount bits
1956 		 * while we detach.
1957 		 * Any modifications would not be expected anymore, though.
1958 		 */
1959 		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1960 					"detach", BM_LOCKED_TEST_ALLOWED)) {
1961 			if (test_bit(WAS_READ_ERROR, &device->flags)) {
1962 				drbd_md_set_flag(device, MDF_FULL_SYNC);
1963 				drbd_md_sync(device);
1964 			}
1965 		}
1966 	}
1967 
1968 	drbd_force_state(device, NS(disk, D_DISKLESS));
1969 }
1970 
1971 static int do_md_sync(struct drbd_device *device)
1972 {
1973 	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1974 	drbd_md_sync(device);
1975 	return 0;
1976 }
1977 
1978 /* only called from drbd_worker thread, no locking */
1979 void __update_timing_details(
1980 		struct drbd_thread_timing_details *tdp,
1981 		unsigned int *cb_nr,
1982 		void *cb,
1983 		const char *fn, const unsigned int line)
1984 {
1985 	unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1986 	struct drbd_thread_timing_details *td = tdp + i;
1987 
1988 	td->start_jif = jiffies;
1989 	td->cb_addr = cb;
1990 	td->caller_fn = fn;
1991 	td->line = line;
1992 	td->cb_nr = *cb_nr;
1993 
1994 	i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1995 	td = tdp + i;
1996 	memset(td, 0, sizeof(*td));
1997 
1998 	++(*cb_nr);
1999 }
2000 
2001 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2002 {
2003 	if (test_bit(MD_SYNC, &todo))
2004 		do_md_sync(device);
2005 	if (test_bit(RS_DONE, &todo) ||
2006 	    test_bit(RS_PROGRESS, &todo))
2007 		update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2008 	if (test_bit(GO_DISKLESS, &todo))
2009 		go_diskless(device);
2010 	if (test_bit(DESTROY_DISK, &todo))
2011 		drbd_ldev_destroy(device);
2012 	if (test_bit(RS_START, &todo))
2013 		do_start_resync(device);
2014 }
2015 
2016 #define DRBD_DEVICE_WORK_MASK	\
2017 	((1UL << GO_DISKLESS)	\
2018 	|(1UL << DESTROY_DISK)	\
2019 	|(1UL << MD_SYNC)	\
2020 	|(1UL << RS_START)	\
2021 	|(1UL << RS_PROGRESS)	\
2022 	|(1UL << RS_DONE)	\
2023 	)
2024 
2025 static unsigned long get_work_bits(unsigned long *flags)
2026 {
2027 	unsigned long old, new;
2028 	do {
2029 		old = *flags;
2030 		new = old & ~DRBD_DEVICE_WORK_MASK;
2031 	} while (cmpxchg(flags, old, new) != old);
2032 	return old & DRBD_DEVICE_WORK_MASK;
2033 }
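
/*
 * Editor's sketch (not part of the original source): get_work_bits() is a
 * lock-free "fetch and clear these bits".  The cmpxchg loop retries until
 * nobody else modified *flags between the read and the swap, so every
 * queued work bit is claimed by exactly one caller.  A user-space C11
 * analogue of the same pattern:
 */
#if 0	/* illustration only; would need <stdatomic.h> */
static unsigned long claim_bits(_Atomic unsigned long *flags, unsigned long mask)
{
	unsigned long old = atomic_load(flags);

	/* on failure, "old" is reloaded with the current value and we retry */
	while (!atomic_compare_exchange_weak(flags, &old, old & ~mask))
		;
	return old & mask;	/* the bits this caller now owns exclusively */
}
#endif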
2034 
2035 static void do_unqueued_work(struct drbd_connection *connection)
2036 {
2037 	struct drbd_peer_device *peer_device;
2038 	int vnr;
2039 
2040 	rcu_read_lock();
2041 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2042 		struct drbd_device *device = peer_device->device;
2043 		unsigned long todo = get_work_bits(&device->flags);
2044 		if (!todo)
2045 			continue;
2046 
2047 		kref_get(&device->kref);
2048 		rcu_read_unlock();
2049 		do_device_work(device, todo);
2050 		kref_put(&device->kref, drbd_destroy_device);
2051 		rcu_read_lock();
2052 	}
2053 	rcu_read_unlock();
2054 }
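
/*
 * Editor's note (not part of the original source): the loop above follows
 * the usual pattern for doing blocking work on RCU-protected objects.
 * do_device_work() may sleep (bitmap writeout, state changes), which is not
 * allowed under rcu_read_lock(), so the iteration takes a kref to pin the
 * device, drops the read lock around the call, and re-takes it before
 * continuing; idr_for_each_entry() then simply resumes from the next index.
 */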
2055 
2056 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2057 {
2058 	spin_lock_irq(&queue->q_lock);
2059 	list_splice_tail_init(&queue->q, work_list);
2060 	spin_unlock_irq(&queue->q_lock);
2061 	return !list_empty(work_list);
2062 }
2063 
2064 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2065 {
2066 	DEFINE_WAIT(wait);
2067 	struct net_conf *nc;
2068 	int uncork, cork;
2069 
2070 	dequeue_work_batch(&connection->sender_work, work_list);
2071 	if (!list_empty(work_list))
2072 		return;
2073 
2074 	/* Still nothing to do?
2075 	 * Maybe we still need to close the current epoch,
2076 	 * even if no new requests are queued yet.
2077 	 *
2078 	 * Also, poke TCP, just in case.
2079 	 * Then wait for new work (or signal). */
2080 	rcu_read_lock();
2081 	nc = rcu_dereference(connection->net_conf);
2082 	uncork = nc ? nc->tcp_cork : 0;
2083 	rcu_read_unlock();
2084 	if (uncork) {
2085 		mutex_lock(&connection->data.mutex);
2086 		if (connection->data.socket)
2087 			drbd_tcp_uncork(connection->data.socket);
2088 		mutex_unlock(&connection->data.mutex);
2089 	}
2090 
2091 	for (;;) {
2092 		int send_barrier;
2093 		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2094 		spin_lock_irq(&connection->resource->req_lock);
2095 		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2096 		if (!list_empty(&connection->sender_work.q))
2097 			list_splice_tail_init(&connection->sender_work.q, work_list);
2098 		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2099 		if (!list_empty(work_list) || signal_pending(current)) {
2100 			spin_unlock_irq(&connection->resource->req_lock);
2101 			break;
2102 		}
2103 
2104 		/* We found nothing new to do, no to-be-communicated request,
2105 		 * no other work item.  We may still need to close the last
2106 		 * epoch.  The next incoming request's epoch will be the connection's
2107 		 * current transfer log epoch number.  If that is different
2108 		 * from the epoch of the last request we communicated, it is
2109 		 * safe to send the epoch-separating barrier now.
2110 		 */
2111 		send_barrier =
2112 			atomic_read(&connection->current_tle_nr) !=
2113 			connection->send.current_epoch_nr;
2114 		spin_unlock_irq(&connection->resource->req_lock);
2115 
2116 		if (send_barrier)
2117 			maybe_send_barrier(connection,
2118 					connection->send.current_epoch_nr + 1);
2119 
2120 		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2121 			break;
2122 
2123 		/* drbd_send() may have called flush_signals() */
2124 		if (get_t_state(&connection->worker) != RUNNING)
2125 			break;
2126 
2127 		schedule();
2128 		/* We may be woken up for things other than new work, too,
2129 		 * e.g. if the current epoch got closed;
2130 		 * in that case we send the barrier above. */
2131 	}
2132 	finish_wait(&connection->sender_work.q_wait, &wait);
2133 
2134 	/* someone may have changed the config while we have been waiting above. */
2135 	rcu_read_lock();
2136 	nc = rcu_dereference(connection->net_conf);
2137 	cork = nc ? nc->tcp_cork : 0;
2138 	rcu_read_unlock();
2139 	mutex_lock(&connection->data.mutex);
2140 	if (connection->data.socket) {
2141 		if (cork)
2142 			drbd_tcp_cork(connection->data.socket);
2143 		else if (!uncork)
2144 			drbd_tcp_uncork(connection->data.socket);
2145 	}
2146 	mutex_unlock(&connection->data.mutex);
2147 }
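
/*
 * Editor's sketch (not part of the original source): drbd_tcp_cork() and
 * drbd_tcp_uncork() toggle the TCP_CORK socket option, so the many small
 * packets produced while a work batch is queued coalesce into full-sized
 * segments and get pushed out once the batch is done.  A user-space
 * analogue on an ordinary socket fd:
 */
#if 0	/* illustration only; would need <sys/socket.h> and <netinet/tcp.h> */
static int tcp_cork(int fd, int on)
{
	/* while corked, the kernel delays partial segments (bounded at ~200 ms) */
	return setsockopt(fd, IPPROTO_TCP, TCP_CORK, &on, sizeof(on));
}
#endif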
2148 
2149 int drbd_worker(struct drbd_thread *thi)
2150 {
2151 	struct drbd_connection *connection = thi->connection;
2152 	struct drbd_work *w = NULL;
2153 	struct drbd_peer_device *peer_device;
2154 	LIST_HEAD(work_list);
2155 	int vnr;
2156 
2157 	while (get_t_state(thi) == RUNNING) {
2158 		drbd_thread_current_set_cpu(thi);
2159 
2160 		if (list_empty(&work_list)) {
2161 			update_worker_timing_details(connection, wait_for_work);
2162 			wait_for_work(connection, &work_list);
2163 		}
2164 
2165 		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2166 			update_worker_timing_details(connection, do_unqueued_work);
2167 			do_unqueued_work(connection);
2168 		}
2169 
2170 		if (signal_pending(current)) {
2171 			flush_signals(current);
2172 			if (get_t_state(thi) == RUNNING) {
2173 				drbd_warn(connection, "Worker got an unexpected signal\n");
2174 				continue;
2175 			}
2176 			break;
2177 		}
2178 
2179 		if (get_t_state(thi) != RUNNING)
2180 			break;
2181 
2182 		if (!list_empty(&work_list)) {
2183 			w = list_first_entry(&work_list, struct drbd_work, list);
2184 			list_del_init(&w->list);
2185 			update_worker_timing_details(connection, w->cb);
2186 			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2187 				continue;
2188 			if (connection->cstate >= C_WF_REPORT_PARAMS)
2189 				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2190 		}
2191 	}
2192 
2193 	do {
2194 		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2195 			update_worker_timing_details(connection, do_unqueued_work);
2196 			do_unqueued_work(connection);
2197 		}
2198 		if (!list_empty(&work_list)) {
2199 			w = list_first_entry(&work_list, struct drbd_work, list);
2200 			list_del_init(&w->list);
2201 			update_worker_timing_details(connection, w->cb);
2202 			w->cb(w, 1);
2203 		} else
2204 			dequeue_work_batch(&connection->sender_work, &work_list);
2205 	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2206 
2207 	rcu_read_lock();
2208 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2209 		struct drbd_device *device = peer_device->device;
2210 		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2211 		kref_get(&device->kref);
2212 		rcu_read_unlock();
2213 		drbd_device_cleanup(device);
2214 		kref_put(&device->kref, drbd_destroy_device);
2215 		rcu_read_lock();
2216 	}
2217 	rcu_read_unlock();
2218 
2219 	return 0;
2220 }
2221