xref: /openbmc/linux/drivers/block/drbd/drbd_worker.c (revision f79e4d5f92a129a1159c973735007d4ddc8541f3)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24 */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched/signal.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37 
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
41 
/* Forward declarations: both helpers are defined further down in this file
 * but are already called from w_resync_timer(). */
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
44 
45 /* endio handlers:
46  *   drbd_md_endio (defined here)
47  *   drbd_request_endio (defined here)
48  *   drbd_peer_request_endio (defined here)
49  *   drbd_bm_endio (defined in drbd_bitmap.c)
50  *
51  * For all these callbacks, note the following:
52  * The callbacks will be called in irq context by the IDE drivers,
53  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54  * Try to get the locking right :)
55  *
56  */
57 
58 /* used for synchronous meta data and bitmap IO
59  * submitted by drbd_md_sync_page_io()
60  */
61 void drbd_md_endio(struct bio *bio)
62 {
63 	struct drbd_device *device;
64 
65 	device = bio->bi_private;
66 	device->md_io.error = blk_status_to_errno(bio->bi_status);
67 
68 	/* special case: drbd_md_read() during drbd_adm_attach() */
69 	if (device->ldev)
70 		put_ldev(device);
71 	bio_put(bio);
72 
73 	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
74 	 * to timeout on the lower level device, and eventually detach from it.
75 	 * If this io completion runs after that timeout expired, this
76 	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
77 	 * During normal operation, this only puts that extra reference
78 	 * down to 1 again.
79 	 * Make sure we first drop the reference, and only then signal
80 	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
81 	 * next drbd_md_sync_page_io(), that we trigger the
82 	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
83 	 */
84 	drbd_md_put_buffer(device);
85 	device->md_io.done = 1;
86 	wake_up(&device->misc_wait);
87 }
88 
89 /* reads on behalf of the partner,
90  * "submitted" by the receiver
91  */
92 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
93 {
94 	unsigned long flags = 0;
95 	struct drbd_peer_device *peer_device = peer_req->peer_device;
96 	struct drbd_device *device = peer_device->device;
97 
98 	spin_lock_irqsave(&device->resource->req_lock, flags);
99 	device->read_cnt += peer_req->i.size >> 9;
100 	list_del(&peer_req->w.list);
101 	if (list_empty(&device->read_ee))
102 		wake_up(&device->ee_wait);
103 	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
104 		__drbd_chk_io_error(device, DRBD_READ_ERROR);
105 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
106 
107 	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
108 	put_ldev(device);
109 }
110 
111 /* writes on behalf of the partner, or resync writes,
112  * "submitted" by the receiver, final stage.  */
113 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
114 {
115 	unsigned long flags = 0;
116 	struct drbd_peer_device *peer_device = peer_req->peer_device;
117 	struct drbd_device *device = peer_device->device;
118 	struct drbd_connection *connection = peer_device->connection;
119 	struct drbd_interval i;
120 	int do_wake;
121 	u64 block_id;
122 	int do_al_complete_io;
123 
124 	/* after we moved peer_req to done_ee,
125 	 * we may no longer access it,
126 	 * it may be freed/reused already!
127 	 * (as soon as we release the req_lock) */
128 	i = peer_req->i;
129 	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
130 	block_id = peer_req->block_id;
131 	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
132 
133 	if (peer_req->flags & EE_WAS_ERROR) {
134 		/* In protocol != C, we usually do not send write acks.
135 		 * In case of a write error, send the neg ack anyways. */
136 		if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
137 			inc_unacked(device);
138 		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
139 	}
140 
141 	spin_lock_irqsave(&device->resource->req_lock, flags);
142 	device->writ_cnt += peer_req->i.size >> 9;
143 	list_move_tail(&peer_req->w.list, &device->done_ee);
144 
145 	/*
146 	 * Do not remove from the write_requests tree here: we did not send the
147 	 * Ack yet and did not wake possibly waiting conflicting requests.
148 	 * Removed from the tree from "drbd_process_done_ee" within the
149 	 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
150 	 * _drbd_clear_done_ee.
151 	 */
152 
153 	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
154 
155 	/* FIXME do we want to detach for failed REQ_DISCARD?
156 	 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
157 	if (peer_req->flags & EE_WAS_ERROR)
158 		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
159 
160 	if (connection->cstate >= C_WF_REPORT_PARAMS) {
161 		kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
162 		if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
163 			kref_put(&device->kref, drbd_destroy_device);
164 	}
165 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
166 
167 	if (block_id == ID_SYNCER)
168 		drbd_rs_complete_io(device, i.sector);
169 
170 	if (do_wake)
171 		wake_up(&device->ee_wait);
172 
173 	if (do_al_complete_io)
174 		drbd_al_complete_io(device, &i);
175 
176 	put_ldev(device);
177 }
178 
179 /* writes on behalf of the partner, or resync writes,
180  * "submitted" by the receiver.
181  */
182 void drbd_peer_request_endio(struct bio *bio)
183 {
184 	struct drbd_peer_request *peer_req = bio->bi_private;
185 	struct drbd_device *device = peer_req->peer_device->device;
186 	bool is_write = bio_data_dir(bio) == WRITE;
187 	bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
188 			  bio_op(bio) == REQ_OP_DISCARD;
189 
190 	if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
191 		drbd_warn(device, "%s: error=%d s=%llus\n",
192 				is_write ? (is_discard ? "discard" : "write")
193 					: "read", bio->bi_status,
194 				(unsigned long long)peer_req->i.sector);
195 
196 	if (bio->bi_status)
197 		set_bit(__EE_WAS_ERROR, &peer_req->flags);
198 
199 	bio_put(bio); /* no need for the bio anymore */
200 	if (atomic_dec_and_test(&peer_req->pending_bios)) {
201 		if (is_write)
202 			drbd_endio_write_sec_final(peer_req);
203 		else
204 			drbd_endio_read_sec_final(peer_req);
205 	}
206 }
207 
208 static void
209 drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
210 {
211 	panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
212 		device->minor, device->resource->name, device->vnr);
213 }
214 
215 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
216  */
217 void drbd_request_endio(struct bio *bio)
218 {
219 	unsigned long flags;
220 	struct drbd_request *req = bio->bi_private;
221 	struct drbd_device *device = req->device;
222 	struct bio_and_error m;
223 	enum drbd_req_event what;
224 
225 	/* If this request was aborted locally before,
226 	 * but now was completed "successfully",
227 	 * chances are that this caused arbitrary data corruption.
228 	 *
229 	 * "aborting" requests, or force-detaching the disk, is intended for
230 	 * completely blocked/hung local backing devices which do no longer
231 	 * complete requests at all, not even do error completions.  In this
232 	 * situation, usually a hard-reset and failover is the only way out.
233 	 *
234 	 * By "aborting", basically faking a local error-completion,
235 	 * we allow for a more graceful swichover by cleanly migrating services.
236 	 * Still the affected node has to be rebooted "soon".
237 	 *
238 	 * By completing these requests, we allow the upper layers to re-use
239 	 * the associated data pages.
240 	 *
241 	 * If later the local backing device "recovers", and now DMAs some data
242 	 * from disk into the original request pages, in the best case it will
243 	 * just put random data into unused pages; but typically it will corrupt
244 	 * meanwhile completely unrelated data, causing all sorts of damage.
245 	 *
246 	 * Which means delayed successful completion,
247 	 * especially for READ requests,
248 	 * is a reason to panic().
249 	 *
250 	 * We assume that a delayed *error* completion is OK,
251 	 * though we still will complain noisily about it.
252 	 */
253 	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
254 		if (__ratelimit(&drbd_ratelimit_state))
255 			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
256 
257 		if (!bio->bi_status)
258 			drbd_panic_after_delayed_completion_of_aborted_request(device);
259 	}
260 
261 	/* to avoid recursion in __req_mod */
262 	if (unlikely(bio->bi_status)) {
263 		switch (bio_op(bio)) {
264 		case REQ_OP_WRITE_ZEROES:
265 		case REQ_OP_DISCARD:
266 			if (bio->bi_status == BLK_STS_NOTSUPP)
267 				what = DISCARD_COMPLETED_NOTSUPP;
268 			else
269 				what = DISCARD_COMPLETED_WITH_ERROR;
270 			break;
271 		case REQ_OP_READ:
272 			if (bio->bi_opf & REQ_RAHEAD)
273 				what = READ_AHEAD_COMPLETED_WITH_ERROR;
274 			else
275 				what = READ_COMPLETED_WITH_ERROR;
276 			break;
277 		default:
278 			what = WRITE_COMPLETED_WITH_ERROR;
279 			break;
280 		}
281 	} else {
282 		what = COMPLETED_OK;
283 	}
284 
285 	req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
286 	bio_put(bio);
287 
288 	/* not req_mod(), we need irqsave here! */
289 	spin_lock_irqsave(&device->resource->req_lock, flags);
290 	__req_mod(req, what, &m);
291 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
292 	put_ldev(device);
293 
294 	if (m.bio)
295 		complete_master_bio(device, &m);
296 }
297 
298 void drbd_csum_ee(struct crypto_ahash *tfm, struct drbd_peer_request *peer_req, void *digest)
299 {
300 	AHASH_REQUEST_ON_STACK(req, tfm);
301 	struct scatterlist sg;
302 	struct page *page = peer_req->pages;
303 	struct page *tmp;
304 	unsigned len;
305 
306 	ahash_request_set_tfm(req, tfm);
307 	ahash_request_set_callback(req, 0, NULL, NULL);
308 
309 	sg_init_table(&sg, 1);
310 	crypto_ahash_init(req);
311 
312 	while ((tmp = page_chain_next(page))) {
313 		/* all but the last page will be fully used */
314 		sg_set_page(&sg, page, PAGE_SIZE, 0);
315 		ahash_request_set_crypt(req, &sg, NULL, sg.length);
316 		crypto_ahash_update(req);
317 		page = tmp;
318 	}
319 	/* and now the last, possibly only partially used page */
320 	len = peer_req->i.size & (PAGE_SIZE - 1);
321 	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
322 	ahash_request_set_crypt(req, &sg, digest, sg.length);
323 	crypto_ahash_finup(req);
324 	ahash_request_zero(req);
325 }
326 
327 void drbd_csum_bio(struct crypto_ahash *tfm, struct bio *bio, void *digest)
328 {
329 	AHASH_REQUEST_ON_STACK(req, tfm);
330 	struct scatterlist sg;
331 	struct bio_vec bvec;
332 	struct bvec_iter iter;
333 
334 	ahash_request_set_tfm(req, tfm);
335 	ahash_request_set_callback(req, 0, NULL, NULL);
336 
337 	sg_init_table(&sg, 1);
338 	crypto_ahash_init(req);
339 
340 	bio_for_each_segment(bvec, bio, iter) {
341 		sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
342 		ahash_request_set_crypt(req, &sg, NULL, sg.length);
343 		crypto_ahash_update(req);
344 		/* REQ_OP_WRITE_SAME has only one segment,
345 		 * checksum the payload only once. */
346 		if (bio_op(bio) == REQ_OP_WRITE_SAME)
347 			break;
348 	}
349 	ahash_request_set_crypt(req, NULL, digest, 0);
350 	crypto_ahash_final(req);
351 	ahash_request_zero(req);
352 }
353 
354 /* MAYBE merge common code with w_e_end_ov_req */
355 static int w_e_send_csum(struct drbd_work *w, int cancel)
356 {
357 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
358 	struct drbd_peer_device *peer_device = peer_req->peer_device;
359 	struct drbd_device *device = peer_device->device;
360 	int digest_size;
361 	void *digest;
362 	int err = 0;
363 
364 	if (unlikely(cancel))
365 		goto out;
366 
367 	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
368 		goto out;
369 
370 	digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
371 	digest = kmalloc(digest_size, GFP_NOIO);
372 	if (digest) {
373 		sector_t sector = peer_req->i.sector;
374 		unsigned int size = peer_req->i.size;
375 		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
376 		/* Free peer_req and pages before send.
377 		 * In case we block on congestion, we could otherwise run into
378 		 * some distributed deadlock, if the other side blocks on
379 		 * congestion as well, because our receiver blocks in
380 		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
381 		drbd_free_peer_req(device, peer_req);
382 		peer_req = NULL;
383 		inc_rs_pending(device);
384 		err = drbd_send_drequest_csum(peer_device, sector, size,
385 					      digest, digest_size,
386 					      P_CSUM_RS_REQUEST);
387 		kfree(digest);
388 	} else {
389 		drbd_err(device, "kmalloc() of digest failed.\n");
390 		err = -ENOMEM;
391 	}
392 
393 out:
394 	if (peer_req)
395 		drbd_free_peer_req(device, peer_req);
396 
397 	if (unlikely(err))
398 		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
399 	return err;
400 }
401 
402 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
403 
404 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
405 {
406 	struct drbd_device *device = peer_device->device;
407 	struct drbd_peer_request *peer_req;
408 
409 	if (!get_ldev(device))
410 		return -EIO;
411 
412 	/* GFP_TRY, because if there is no memory available right now, this may
413 	 * be rescheduled for later. It is "only" background resync, after all. */
414 	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
415 				       size, size, GFP_TRY);
416 	if (!peer_req)
417 		goto defer;
418 
419 	peer_req->w.cb = w_e_send_csum;
420 	spin_lock_irq(&device->resource->req_lock);
421 	list_add_tail(&peer_req->w.list, &device->read_ee);
422 	spin_unlock_irq(&device->resource->req_lock);
423 
424 	atomic_add(size >> 9, &device->rs_sect_ev);
425 	if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
426 				     DRBD_FAULT_RS_RD) == 0)
427 		return 0;
428 
429 	/* If it failed because of ENOMEM, retry should help.  If it failed
430 	 * because bio_add_page failed (probably broken lower level driver),
431 	 * retry may or may not help.
432 	 * If it does not, you may need to force disconnect. */
433 	spin_lock_irq(&device->resource->req_lock);
434 	list_del(&peer_req->w.list);
435 	spin_unlock_irq(&device->resource->req_lock);
436 
437 	drbd_free_peer_req(device, peer_req);
438 defer:
439 	put_ldev(device);
440 	return -EAGAIN;
441 }
442 
443 int w_resync_timer(struct drbd_work *w, int cancel)
444 {
445 	struct drbd_device *device =
446 		container_of(w, struct drbd_device, resync_work);
447 
448 	switch (device->state.conn) {
449 	case C_VERIFY_S:
450 		make_ov_request(device, cancel);
451 		break;
452 	case C_SYNC_TARGET:
453 		make_resync_request(device, cancel);
454 		break;
455 	}
456 
457 	return 0;
458 }
459 
460 void resync_timer_fn(struct timer_list *t)
461 {
462 	struct drbd_device *device = from_timer(device, t, resync_timer);
463 
464 	drbd_queue_work_if_unqueued(
465 		&first_peer_device(device)->connection->sender_work,
466 		&device->resync_work);
467 }
468 
469 static void fifo_set(struct fifo_buffer *fb, int value)
470 {
471 	int i;
472 
473 	for (i = 0; i < fb->size; i++)
474 		fb->values[i] = value;
475 }
476 
477 static int fifo_push(struct fifo_buffer *fb, int value)
478 {
479 	int ov;
480 
481 	ov = fb->values[fb->head_index];
482 	fb->values[fb->head_index++] = value;
483 
484 	if (fb->head_index >= fb->size)
485 		fb->head_index = 0;
486 
487 	return ov;
488 }
489 
490 static void fifo_add_val(struct fifo_buffer *fb, int value)
491 {
492 	int i;
493 
494 	for (i = 0; i < fb->size; i++)
495 		fb->values[i] += value;
496 }
497 
498 struct fifo_buffer *fifo_alloc(int fifo_size)
499 {
500 	struct fifo_buffer *fb;
501 
502 	fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
503 	if (!fb)
504 		return NULL;
505 
506 	fb->head_index = 0;
507 	fb->size = fifo_size;
508 	fb->total = 0;
509 
510 	return fb;
511 }
512 
513 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
514 {
515 	struct disk_conf *dc;
516 	unsigned int want;     /* The number of sectors we want in-flight */
517 	int req_sect; /* Number of sectors to request in this turn */
518 	int correction; /* Number of sectors more we need in-flight */
519 	int cps; /* correction per invocation of drbd_rs_controller() */
520 	int steps; /* Number of time steps to plan ahead */
521 	int curr_corr;
522 	int max_sect;
523 	struct fifo_buffer *plan;
524 
525 	dc = rcu_dereference(device->ldev->disk_conf);
526 	plan = rcu_dereference(device->rs_plan_s);
527 
528 	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
529 
530 	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
531 		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
532 	} else { /* normal path */
533 		want = dc->c_fill_target ? dc->c_fill_target :
534 			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
535 	}
536 
537 	correction = want - device->rs_in_flight - plan->total;
538 
539 	/* Plan ahead */
540 	cps = correction / steps;
541 	fifo_add_val(plan, cps);
542 	plan->total += cps * steps;
543 
544 	/* What we do in this step */
545 	curr_corr = fifo_push(plan, 0);
546 	plan->total -= curr_corr;
547 
548 	req_sect = sect_in + curr_corr;
549 	if (req_sect < 0)
550 		req_sect = 0;
551 
552 	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
553 	if (req_sect > max_sect)
554 		req_sect = max_sect;
555 
556 	/*
557 	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
558 		 sect_in, device->rs_in_flight, want, correction,
559 		 steps, cps, device->rs_planed, curr_corr, req_sect);
560 	*/
561 
562 	return req_sect;
563 }
564 
565 static int drbd_rs_number_requests(struct drbd_device *device)
566 {
567 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
568 	int number, mxb;
569 
570 	sect_in = atomic_xchg(&device->rs_sect_in, 0);
571 	device->rs_in_flight -= sect_in;
572 
573 	rcu_read_lock();
574 	mxb = drbd_get_max_buffers(device) / 2;
575 	if (rcu_dereference(device->rs_plan_s)->size) {
576 		number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
577 		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
578 	} else {
579 		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
580 		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
581 	}
582 	rcu_read_unlock();
583 
584 	/* Don't have more than "max-buffers"/2 in-flight.
585 	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
586 	 * potentially causing a distributed deadlock on congestion during
587 	 * online-verify or (checksum-based) resync, if max-buffers,
588 	 * socket buffer sizes and resync rate settings are mis-configured. */
589 
590 	/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
591 	 * mxb (as used here, and in drbd_alloc_pages on the peer) is
592 	 * "number of pages" (typically also 4k),
593 	 * but "rs_in_flight" is in "sectors" (512 Byte). */
594 	if (mxb - device->rs_in_flight/8 < number)
595 		number = mxb - device->rs_in_flight/8;
596 
597 	return number;
598 }
599 
/*
 * make_resync_request() - generate the next batch of resync requests
 * @device: device being resynced
 * @cancel: non-zero if the work is being cancelled; returns 0 immediately
 *
 * Finishes the resync right away if rs_total is 0.  Otherwise takes an ldev
 * reference and issues up to drbd_rs_number_requests() requests, starting
 * the bitmap search at device->bm_resync_fo.  Depending on device->use_csums
 * a block is either read locally for checksumming (read_for_csum()) or
 * requested from the peer (drbd_send_drequest()).
 *
 * Every exit taken after get_ldev() succeeded drops the reference again via
 * put_ldev().  The "requeue" exit accounts the i blocks issued so far in
 * rs_in_flight (converted to sectors) and re-arms resync_timer.
 *
 * Returns 0, except: -EIO if read_for_csum() reported -EIO, or the error
 * returned by drbd_send_drequest().
 */
600 static int make_resync_request(struct drbd_device *const device, int cancel)
601 {
602 	struct drbd_peer_device *const peer_device = first_peer_device(device);
	/* NOTE(review): connection may be NULL according to this initializer,
	 * yet it is dereferenced unconditionally further down - confirm that
	 * peer_device cannot be NULL here. */
603 	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
604 	unsigned long bit;
605 	sector_t sector;
606 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
607 	int max_bio_size;
608 	int number, rollback_i, size;
	/* align and rollback_i are only assigned inside the
	 * "#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE" section below */
609 	int align, requeue = 0;
610 	int i = 0;
611 	int discard_granularity = 0;
612 
613 	if (unlikely(cancel))
614 		return 0;
615 
616 	if (device->rs_total == 0) {
617 		/* empty resync? */
618 		drbd_resync_finished(device);
619 		return 0;
620 	}
621 
622 	if (!get_ldev(device)) {
623 		/* Since we only need to access device->rsync a
624 		   get_ldev_if_state(device,D_FAILED) would be sufficient, but
625 		   to continue resync with a broken disk makes no sense at
626 		   all */
627 		drbd_err(device, "Disk broke down during resync!\n");
628 		return 0;
629 	}
630 
	/* discard_granularity stays 0 unless the peer agreed to thin resync */
631 	if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
632 		rcu_read_lock();
633 		discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
634 		rcu_read_unlock();
635 	}
636 
	/* upper bound for a merged request, in bytes */
637 	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
638 	number = drbd_rs_number_requests(device);
639 	if (number <= 0)
640 		goto requeue;
641 
642 	for (i = 0; i < number; i++) {
643 		/* Stop generating RS requests when half of the send buffer is filled,
644 		 * but notify TCP that we'd like to have more space. */
645 		mutex_lock(&connection->data.mutex);
646 		if (connection->data.socket) {
647 			struct sock *sk = connection->data.socket->sk;
648 			int queued = sk->sk_wmem_queued;
649 			int sndbuf = sk->sk_sndbuf;
650 			if (queued > sndbuf / 2) {
651 				requeue = 1;
652 				if (sk->sk_socket)
653 					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
654 			}
655 		} else
656 			requeue = 1;
657 		mutex_unlock(&connection->data.mutex);
658 		if (requeue)
659 			goto requeue;
660 
		/* re-entered via goto when the bit found turned out to be clean;
		 * that retry does not count against "number" */
661 next_sector:
662 		size = BM_BLOCK_SIZE;
663 		bit  = drbd_bm_find_next(device, device->bm_resync_fo);
664 
665 		if (bit == DRBD_END_OF_BITMAP) {
666 			device->bm_resync_fo = drbd_bm_bits(device);
667 			put_ldev(device);
668 			return 0;
669 		}
670 
671 		sector = BM_BIT_TO_SECT(bit);
672 
		/* on failure remember this bit, so the next run starts here again */
673 		if (drbd_try_rs_begin_io(device, sector)) {
674 			device->bm_resync_fo = bit;
675 			goto requeue;
676 		}
677 		device->bm_resync_fo = bit + 1;
678 
679 		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
680 			drbd_rs_complete_io(device, sector);
681 			goto next_sector;
682 		}
683 
684 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
685 		/* try to find some adjacent bits.
686 		 * we stop if we have already the maximum req size.
687 		 *
688 		 * Additionally always align bigger requests, in order to
689 		 * be prepared for all stripe sizes of software RAIDs.
690 		 */
691 		align = 1;
		/* i is restored to this value if read_for_csum() asks for a retry */
692 		rollback_i = i;
693 		while (i < number) {
694 			if (size + BM_BLOCK_SIZE > max_bio_size)
695 				break;
696 
697 			/* Be always aligned */
698 			if (sector & ((1<<(align+3))-1))
699 				break;
700 
701 			if (discard_granularity && size == discard_granularity)
702 				break;
703 
704 			/* do not cross extent boundaries */
705 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
706 				break;
707 			/* now, is it actually dirty, after all?
708 			 * caution, drbd_bm_test_bit is tri-state for some
709 			 * obscure reason; ( b == 0 ) would get the out-of-band
710 			 * only accidentally right because of the "oddly sized"
711 			 * adjustment below */
712 			if (drbd_bm_test_bit(device, bit+1) != 1)
713 				break;
714 			bit++;
715 			size += BM_BLOCK_SIZE;
716 			if ((BM_BLOCK_SIZE << align) <= size)
717 				align++;
718 			i++;
719 		}
720 		/* if we merged some,
721 		 * reset the offset to start the next drbd_bm_find_next from */
722 		if (size > BM_BLOCK_SIZE)
723 			device->bm_resync_fo = bit + 1;
724 #endif
725 
726 		/* adjust very last sectors, in case we are oddly sized */
727 		if (sector + (size>>9) > capacity)
728 			size = (capacity-sector)<<9;
729 
730 		if (device->use_csums) {
731 			switch (read_for_csum(peer_device, sector, size)) {
732 			case -EIO: /* Disk failure */
733 				put_ldev(device);
734 				return -EIO;
735 			case -EAGAIN: /* allocation failed, or ldev busy */
736 				drbd_rs_complete_io(device, sector);
737 				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
738 				i = rollback_i;
739 				goto requeue;
740 			case 0:
741 				/* everything ok */
742 				break;
743 			default:
744 				BUG();
745 			}
746 		} else {
747 			int err;
748 
749 			inc_rs_pending(device);
			/* a request of exactly discard_granularity bytes is sent as
			 * P_RS_THIN_REQ, anything else as P_RS_DATA_REQUEST */
750 			err = drbd_send_drequest(peer_device,
751 						 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
752 						 sector, size, ID_SYNCER);
753 			if (err) {
754 				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
755 				dec_rs_pending(device);
756 				put_ldev(device);
757 				return err;
758 			}
759 		}
760 	}
761 
762 	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
763 		/* last syncer _request_ was sent,
764 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
765 		 * next sync group will resume), as soon as we receive the last
766 		 * resync data block, and the last bit is cleared.
767 		 * until then resync "work" is "inactive" ...
768 		 */
769 		put_ldev(device);
770 		return 0;
771 	}
772 
773  requeue:
	/* account the i blocks issued in this run (as sectors) and run again
	 * after SLEEP_TIME */
774 	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
775 	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
776 	put_ldev(device);
777 	return 0;
778 }
779 
780 static int make_ov_request(struct drbd_device *device, int cancel)
781 {
782 	int number, i, size;
783 	sector_t sector;
784 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
785 	bool stop_sector_reached = false;
786 
787 	if (unlikely(cancel))
788 		return 1;
789 
790 	number = drbd_rs_number_requests(device);
791 
792 	sector = device->ov_position;
793 	for (i = 0; i < number; i++) {
794 		if (sector >= capacity)
795 			return 1;
796 
797 		/* We check for "finished" only in the reply path:
798 		 * w_e_end_ov_reply().
799 		 * We need to send at least one request out. */
800 		stop_sector_reached = i > 0
801 			&& verify_can_do_stop_sector(device)
802 			&& sector >= device->ov_stop_sector;
803 		if (stop_sector_reached)
804 			break;
805 
806 		size = BM_BLOCK_SIZE;
807 
808 		if (drbd_try_rs_begin_io(device, sector)) {
809 			device->ov_position = sector;
810 			goto requeue;
811 		}
812 
813 		if (sector + (size>>9) > capacity)
814 			size = (capacity-sector)<<9;
815 
816 		inc_rs_pending(device);
817 		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
818 			dec_rs_pending(device);
819 			return 0;
820 		}
821 		sector += BM_SECT_PER_BIT;
822 	}
823 	device->ov_position = sector;
824 
825  requeue:
826 	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
827 	if (i == 0 || !stop_sector_reached)
828 		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
829 	return 1;
830 }
831 
832 int w_ov_finished(struct drbd_work *w, int cancel)
833 {
834 	struct drbd_device_work *dw =
835 		container_of(w, struct drbd_device_work, w);
836 	struct drbd_device *device = dw->device;
837 	kfree(dw);
838 	ov_out_of_sync_print(device);
839 	drbd_resync_finished(device);
840 
841 	return 0;
842 }
843 
844 static int w_resync_finished(struct drbd_work *w, int cancel)
845 {
846 	struct drbd_device_work *dw =
847 		container_of(w, struct drbd_device_work, w);
848 	struct drbd_device *device = dw->device;
849 	kfree(dw);
850 
851 	drbd_resync_finished(device);
852 
853 	return 0;
854 }
855 
856 static void ping_peer(struct drbd_device *device)
857 {
858 	struct drbd_connection *connection = first_peer_device(device)->connection;
859 
860 	clear_bit(GOT_PING_ACK, &connection->flags);
861 	request_ping(connection);
862 	wait_event(connection->ping_wait,
863 		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
864 }
865 
866 int drbd_resync_finished(struct drbd_device *device)
867 {
868 	struct drbd_connection *connection = first_peer_device(device)->connection;
869 	unsigned long db, dt, dbdt;
870 	unsigned long n_oos;
871 	union drbd_state os, ns;
872 	struct drbd_device_work *dw;
873 	char *khelper_cmd = NULL;
874 	int verify_done = 0;
875 
876 	/* Remove all elements from the resync LRU. Since future actions
877 	 * might set bits in the (main) bitmap, then the entries in the
878 	 * resync LRU would be wrong. */
879 	if (drbd_rs_del_all(device)) {
880 		/* In case this is not possible now, most probably because
881 		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
882 		 * queue (or even the read operations for those packets
883 		 * is not finished by now).   Retry in 100ms. */
884 
885 		schedule_timeout_interruptible(HZ / 10);
886 		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
887 		if (dw) {
888 			dw->w.cb = w_resync_finished;
889 			dw->device = device;
890 			drbd_queue_work(&connection->sender_work, &dw->w);
891 			return 1;
892 		}
893 		drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
894 	}
895 
896 	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
897 	if (dt <= 0)
898 		dt = 1;
899 
900 	db = device->rs_total;
901 	/* adjust for verify start and stop sectors, respective reached position */
902 	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
903 		db -= device->ov_left;
904 
905 	dbdt = Bit2KB(db/dt);
906 	device->rs_paused /= HZ;
907 
908 	if (!get_ldev(device))
909 		goto out;
910 
911 	ping_peer(device);
912 
913 	spin_lock_irq(&device->resource->req_lock);
914 	os = drbd_read_state(device);
915 
916 	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
917 
918 	/* This protects us against multiple calls (that can happen in the presence
919 	   of application IO), and against connectivity loss just before we arrive here. */
920 	if (os.conn <= C_CONNECTED)
921 		goto out_unlock;
922 
923 	ns = os;
924 	ns.conn = C_CONNECTED;
925 
926 	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
927 	     verify_done ? "Online verify" : "Resync",
928 	     dt + device->rs_paused, device->rs_paused, dbdt);
929 
930 	n_oos = drbd_bm_total_weight(device);
931 
932 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
933 		if (n_oos) {
934 			drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
935 			      n_oos, Bit2KB(1));
936 			khelper_cmd = "out-of-sync";
937 		}
938 	} else {
939 		D_ASSERT(device, (n_oos - device->rs_failed) == 0);
940 
941 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
942 			khelper_cmd = "after-resync-target";
943 
944 		if (device->use_csums && device->rs_total) {
945 			const unsigned long s = device->rs_same_csum;
946 			const unsigned long t = device->rs_total;
947 			const int ratio =
948 				(t == 0)     ? 0 :
949 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
950 			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
951 			     "transferred %luK total %luK\n",
952 			     ratio,
953 			     Bit2KB(device->rs_same_csum),
954 			     Bit2KB(device->rs_total - device->rs_same_csum),
955 			     Bit2KB(device->rs_total));
956 		}
957 	}
958 
959 	if (device->rs_failed) {
960 		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
961 
962 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
963 			ns.disk = D_INCONSISTENT;
964 			ns.pdsk = D_UP_TO_DATE;
965 		} else {
966 			ns.disk = D_UP_TO_DATE;
967 			ns.pdsk = D_INCONSISTENT;
968 		}
969 	} else {
970 		ns.disk = D_UP_TO_DATE;
971 		ns.pdsk = D_UP_TO_DATE;
972 
973 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
974 			if (device->p_uuid) {
975 				int i;
976 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
977 					_drbd_uuid_set(device, i, device->p_uuid[i]);
978 				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
979 				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
980 			} else {
981 				drbd_err(device, "device->p_uuid is NULL! BUG\n");
982 			}
983 		}
984 
985 		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
986 			/* for verify runs, we don't update uuids here,
987 			 * so there would be nothing to report. */
988 			drbd_uuid_set_bm(device, 0UL);
989 			drbd_print_uuids(device, "updated UUIDs");
990 			if (device->p_uuid) {
991 				/* Now the two UUID sets are equal, update what we
992 				 * know of the peer. */
993 				int i;
994 				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
995 					device->p_uuid[i] = device->ldev->md.uuid[i];
996 			}
997 		}
998 	}
999 
1000 	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
1001 out_unlock:
1002 	spin_unlock_irq(&device->resource->req_lock);
1003 
1004 	/* If we have been sync source, and have an effective fencing-policy,
1005 	 * once *all* volumes are back in sync, call "unfence". */
1006 	if (os.conn == C_SYNC_SOURCE) {
1007 		enum drbd_disk_state disk_state = D_MASK;
1008 		enum drbd_disk_state pdsk_state = D_MASK;
1009 		enum drbd_fencing_p fp = FP_DONT_CARE;
1010 
1011 		rcu_read_lock();
1012 		fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1013 		if (fp != FP_DONT_CARE) {
1014 			struct drbd_peer_device *peer_device;
1015 			int vnr;
1016 			idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1017 				struct drbd_device *device = peer_device->device;
1018 				disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1019 				pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1020 			}
1021 		}
1022 		rcu_read_unlock();
1023 		if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1024 			conn_khelper(connection, "unfence-peer");
1025 	}
1026 
1027 	put_ldev(device);
1028 out:
1029 	device->rs_total  = 0;
1030 	device->rs_failed = 0;
1031 	device->rs_paused = 0;
1032 
1033 	/* reset start sector, if we reached end of device */
1034 	if (verify_done && device->ov_left == 0)
1035 		device->ov_start_sector = 0;
1036 
1037 	drbd_md_sync(device);
1038 
1039 	if (khelper_cmd)
1040 		drbd_khelper(device, khelper_cmd);
1041 
1042 	return 1;
1043 }
1044 
1045 /* helper */
1046 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1047 {
1048 	if (drbd_peer_req_has_active_page(peer_req)) {
1049 		/* This might happen if sendpage() has not finished */
1050 		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1051 		atomic_add(i, &device->pp_in_use_by_net);
1052 		atomic_sub(i, &device->pp_in_use);
1053 		spin_lock_irq(&device->resource->req_lock);
1054 		list_add_tail(&peer_req->w.list, &device->net_ee);
1055 		spin_unlock_irq(&device->resource->req_lock);
1056 		wake_up(&drbd_pp_wait);
1057 	} else
1058 		drbd_free_peer_req(device, peer_req);
1059 }
1060 
1061 /**
1062  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1063  * @w:		work object.
1064  * @cancel:	The connection will be closed anyways
1065  */
1066 int w_e_end_data_req(struct drbd_work *w, int cancel)
1067 {
1068 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1069 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1070 	struct drbd_device *device = peer_device->device;
1071 	int err;
1072 
1073 	if (unlikely(cancel)) {
1074 		drbd_free_peer_req(device, peer_req);
1075 		dec_unacked(device);
1076 		return 0;
1077 	}
1078 
1079 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1080 		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1081 	} else {
1082 		if (__ratelimit(&drbd_ratelimit_state))
1083 			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1084 			    (unsigned long long)peer_req->i.sector);
1085 
1086 		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1087 	}
1088 
1089 	dec_unacked(device);
1090 
1091 	move_to_net_ee_or_free(device, peer_req);
1092 
1093 	if (unlikely(err))
1094 		drbd_err(device, "drbd_send_block() failed\n");
1095 	return err;
1096 }
1097 
1098 static bool all_zero(struct drbd_peer_request *peer_req)
1099 {
1100 	struct page *page = peer_req->pages;
1101 	unsigned int len = peer_req->i.size;
1102 
1103 	page_chain_for_each(page) {
1104 		unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1105 		unsigned int i, words = l / sizeof(long);
1106 		unsigned long *d;
1107 
1108 		d = kmap_atomic(page);
1109 		for (i = 0; i < words; i++) {
1110 			if (d[i]) {
1111 				kunmap_atomic(d);
1112 				return false;
1113 			}
1114 		}
1115 		kunmap_atomic(d);
1116 		len -= l;
1117 	}
1118 
1119 	return true;
1120 }
1121 
1122 /**
1123  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1124  * @w:		work object.
1125  * @cancel:	The connection will be closed anyways
1126  */
1127 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1128 {
1129 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1130 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1131 	struct drbd_device *device = peer_device->device;
1132 	int err;
1133 
1134 	if (unlikely(cancel)) {
1135 		drbd_free_peer_req(device, peer_req);
1136 		dec_unacked(device);
1137 		return 0;
1138 	}
1139 
1140 	if (get_ldev_if_state(device, D_FAILED)) {
1141 		drbd_rs_complete_io(device, peer_req->i.sector);
1142 		put_ldev(device);
1143 	}
1144 
1145 	if (device->state.conn == C_AHEAD) {
1146 		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1147 	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1148 		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1149 			inc_rs_pending(device);
1150 			if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1151 				err = drbd_send_rs_deallocated(peer_device, peer_req);
1152 			else
1153 				err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1154 		} else {
1155 			if (__ratelimit(&drbd_ratelimit_state))
1156 				drbd_err(device, "Not sending RSDataReply, "
1157 				    "partner DISKLESS!\n");
1158 			err = 0;
1159 		}
1160 	} else {
1161 		if (__ratelimit(&drbd_ratelimit_state))
1162 			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1163 			    (unsigned long long)peer_req->i.sector);
1164 
1165 		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1166 
1167 		/* update resync data with failure */
1168 		drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1169 	}
1170 
1171 	dec_unacked(device);
1172 
1173 	move_to_net_ee_or_free(device, peer_req);
1174 
1175 	if (unlikely(err))
1176 		drbd_err(device, "drbd_send_block() failed\n");
1177 	return err;
1178 }
1179 
1180 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1181 {
1182 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1183 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1184 	struct drbd_device *device = peer_device->device;
1185 	struct digest_info *di;
1186 	int digest_size;
1187 	void *digest = NULL;
1188 	int err, eq = 0;
1189 
1190 	if (unlikely(cancel)) {
1191 		drbd_free_peer_req(device, peer_req);
1192 		dec_unacked(device);
1193 		return 0;
1194 	}
1195 
1196 	if (get_ldev(device)) {
1197 		drbd_rs_complete_io(device, peer_req->i.sector);
1198 		put_ldev(device);
1199 	}
1200 
1201 	di = peer_req->digest;
1202 
1203 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1204 		/* quick hack to try to avoid a race against reconfiguration.
1205 		 * a real fix would be much more involved,
1206 		 * introducing more locking mechanisms */
1207 		if (peer_device->connection->csums_tfm) {
1208 			digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
1209 			D_ASSERT(device, digest_size == di->digest_size);
1210 			digest = kmalloc(digest_size, GFP_NOIO);
1211 		}
1212 		if (digest) {
1213 			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1214 			eq = !memcmp(digest, di->digest, digest_size);
1215 			kfree(digest);
1216 		}
1217 
1218 		if (eq) {
1219 			drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1220 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1221 			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1222 			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1223 		} else {
1224 			inc_rs_pending(device);
1225 			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1226 			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1227 			kfree(di);
1228 			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1229 		}
1230 	} else {
1231 		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1232 		if (__ratelimit(&drbd_ratelimit_state))
1233 			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1234 	}
1235 
1236 	dec_unacked(device);
1237 	move_to_net_ee_or_free(device, peer_req);
1238 
1239 	if (unlikely(err))
1240 		drbd_err(device, "drbd_send_block/ack() failed\n");
1241 	return err;
1242 }
1243 
1244 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1245 {
1246 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1247 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1248 	struct drbd_device *device = peer_device->device;
1249 	sector_t sector = peer_req->i.sector;
1250 	unsigned int size = peer_req->i.size;
1251 	int digest_size;
1252 	void *digest;
1253 	int err = 0;
1254 
1255 	if (unlikely(cancel))
1256 		goto out;
1257 
1258 	digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1259 	digest = kmalloc(digest_size, GFP_NOIO);
1260 	if (!digest) {
1261 		err = 1;	/* terminate the connection in case the allocation failed */
1262 		goto out;
1263 	}
1264 
1265 	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1266 		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1267 	else
1268 		memset(digest, 0, digest_size);
1269 
1270 	/* Free e and pages before send.
1271 	 * In case we block on congestion, we could otherwise run into
1272 	 * some distributed deadlock, if the other side blocks on
1273 	 * congestion as well, because our receiver blocks in
1274 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1275 	drbd_free_peer_req(device, peer_req);
1276 	peer_req = NULL;
1277 	inc_rs_pending(device);
1278 	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1279 	if (err)
1280 		dec_rs_pending(device);
1281 	kfree(digest);
1282 
1283 out:
1284 	if (peer_req)
1285 		drbd_free_peer_req(device, peer_req);
1286 	dec_unacked(device);
1287 	return err;
1288 }
1289 
1290 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1291 {
1292 	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1293 		device->ov_last_oos_size += size>>9;
1294 	} else {
1295 		device->ov_last_oos_start = sector;
1296 		device->ov_last_oos_size = size>>9;
1297 	}
1298 	drbd_set_out_of_sync(device, sector, size);
1299 }
1300 
1301 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1302 {
1303 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1304 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1305 	struct drbd_device *device = peer_device->device;
1306 	struct digest_info *di;
1307 	void *digest;
1308 	sector_t sector = peer_req->i.sector;
1309 	unsigned int size = peer_req->i.size;
1310 	int digest_size;
1311 	int err, eq = 0;
1312 	bool stop_sector_reached = false;
1313 
1314 	if (unlikely(cancel)) {
1315 		drbd_free_peer_req(device, peer_req);
1316 		dec_unacked(device);
1317 		return 0;
1318 	}
1319 
1320 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1321 	 * the resync lru has been cleaned up already */
1322 	if (get_ldev(device)) {
1323 		drbd_rs_complete_io(device, peer_req->i.sector);
1324 		put_ldev(device);
1325 	}
1326 
1327 	di = peer_req->digest;
1328 
1329 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1330 		digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1331 		digest = kmalloc(digest_size, GFP_NOIO);
1332 		if (digest) {
1333 			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1334 
1335 			D_ASSERT(device, digest_size == di->digest_size);
1336 			eq = !memcmp(digest, di->digest, digest_size);
1337 			kfree(digest);
1338 		}
1339 	}
1340 
1341 	/* Free peer_req and pages before send.
1342 	 * In case we block on congestion, we could otherwise run into
1343 	 * some distributed deadlock, if the other side blocks on
1344 	 * congestion as well, because our receiver blocks in
1345 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1346 	drbd_free_peer_req(device, peer_req);
1347 	if (!eq)
1348 		drbd_ov_out_of_sync_found(device, sector, size);
1349 	else
1350 		ov_out_of_sync_print(device);
1351 
1352 	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1353 			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1354 
1355 	dec_unacked(device);
1356 
1357 	--device->ov_left;
1358 
1359 	/* let's advance progress step marks only for every other megabyte */
1360 	if ((device->ov_left & 0x200) == 0x200)
1361 		drbd_advance_rs_marks(device, device->ov_left);
1362 
1363 	stop_sector_reached = verify_can_do_stop_sector(device) &&
1364 		(sector + (size>>9)) >= device->ov_stop_sector;
1365 
1366 	if (device->ov_left == 0 || stop_sector_reached) {
1367 		ov_out_of_sync_print(device);
1368 		drbd_resync_finished(device);
1369 	}
1370 
1371 	return err;
1372 }
1373 
1374 /* FIXME
1375  * We need to track the number of pending barrier acks,
1376  * and to be able to wait for them.
1377  * See also comment in drbd_adm_attach before drbd_suspend_io.
1378  */
1379 static int drbd_send_barrier(struct drbd_connection *connection)
1380 {
1381 	struct p_barrier *p;
1382 	struct drbd_socket *sock;
1383 
1384 	sock = &connection->data;
1385 	p = conn_prepare_command(connection, sock);
1386 	if (!p)
1387 		return -EIO;
1388 	p->barrier = connection->send.current_epoch_nr;
1389 	p->pad = 0;
1390 	connection->send.current_epoch_writes = 0;
1391 	connection->send.last_sent_barrier_jif = jiffies;
1392 
1393 	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1394 }
1395 
1396 static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1397 {
1398 	struct drbd_socket *sock = &pd->connection->data;
1399 	if (!drbd_prepare_command(pd, sock))
1400 		return -EIO;
1401 	return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1402 }
1403 
1404 int w_send_write_hint(struct drbd_work *w, int cancel)
1405 {
1406 	struct drbd_device *device =
1407 		container_of(w, struct drbd_device, unplug_work);
1408 
1409 	if (cancel)
1410 		return 0;
1411 	return pd_send_unplug_remote(first_peer_device(device));
1412 }
1413 
1414 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1415 {
1416 	if (!connection->send.seen_any_write_yet) {
1417 		connection->send.seen_any_write_yet = true;
1418 		connection->send.current_epoch_nr = epoch;
1419 		connection->send.current_epoch_writes = 0;
1420 		connection->send.last_sent_barrier_jif = jiffies;
1421 	}
1422 }
1423 
1424 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1425 {
1426 	/* re-init if first write on this connection */
1427 	if (!connection->send.seen_any_write_yet)
1428 		return;
1429 	if (connection->send.current_epoch_nr != epoch) {
1430 		if (connection->send.current_epoch_writes)
1431 			drbd_send_barrier(connection);
1432 		connection->send.current_epoch_nr = epoch;
1433 	}
1434 }
1435 
1436 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1437 {
1438 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1439 	struct drbd_device *device = req->device;
1440 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1441 	struct drbd_connection *const connection = peer_device->connection;
1442 	int err;
1443 
1444 	if (unlikely(cancel)) {
1445 		req_mod(req, SEND_CANCELED);
1446 		return 0;
1447 	}
1448 	req->pre_send_jif = jiffies;
1449 
1450 	/* this time, no connection->send.current_epoch_writes++;
1451 	 * If it was sent, it was the closing barrier for the last
1452 	 * replicated epoch, before we went into AHEAD mode.
1453 	 * No more barriers will be sent, until we leave AHEAD mode again. */
1454 	maybe_send_barrier(connection, req->epoch);
1455 
1456 	err = drbd_send_out_of_sync(peer_device, req);
1457 	req_mod(req, OOS_HANDED_TO_NETWORK);
1458 
1459 	return err;
1460 }
1461 
1462 /**
1463  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1464  * @w:		work object.
1465  * @cancel:	The connection will be closed anyways
1466  */
1467 int w_send_dblock(struct drbd_work *w, int cancel)
1468 {
1469 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1470 	struct drbd_device *device = req->device;
1471 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1472 	struct drbd_connection *connection = peer_device->connection;
1473 	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1474 	int err;
1475 
1476 	if (unlikely(cancel)) {
1477 		req_mod(req, SEND_CANCELED);
1478 		return 0;
1479 	}
1480 	req->pre_send_jif = jiffies;
1481 
1482 	re_init_if_first_write(connection, req->epoch);
1483 	maybe_send_barrier(connection, req->epoch);
1484 	connection->send.current_epoch_writes++;
1485 
1486 	err = drbd_send_dblock(peer_device, req);
1487 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1488 
1489 	if (do_send_unplug && !err)
1490 		pd_send_unplug_remote(peer_device);
1491 
1492 	return err;
1493 }
1494 
1495 /**
1496  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1497  * @w:		work object.
1498  * @cancel:	The connection will be closed anyways
1499  */
1500 int w_send_read_req(struct drbd_work *w, int cancel)
1501 {
1502 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1503 	struct drbd_device *device = req->device;
1504 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1505 	struct drbd_connection *connection = peer_device->connection;
1506 	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1507 	int err;
1508 
1509 	if (unlikely(cancel)) {
1510 		req_mod(req, SEND_CANCELED);
1511 		return 0;
1512 	}
1513 	req->pre_send_jif = jiffies;
1514 
1515 	/* Even read requests may close a write epoch,
1516 	 * if there was any yet. */
1517 	maybe_send_barrier(connection, req->epoch);
1518 
1519 	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1520 				 (unsigned long)req);
1521 
1522 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1523 
1524 	if (do_send_unplug && !err)
1525 		pd_send_unplug_remote(peer_device);
1526 
1527 	return err;
1528 }
1529 
1530 int w_restart_disk_io(struct drbd_work *w, int cancel)
1531 {
1532 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1533 	struct drbd_device *device = req->device;
1534 
1535 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1536 		drbd_al_begin_io(device, &req->i);
1537 
1538 	drbd_req_make_private_bio(req, req->master_bio);
1539 	bio_set_dev(req->private_bio, device->ldev->backing_bdev);
1540 	generic_make_request(req->private_bio);
1541 
1542 	return 0;
1543 }
1544 
1545 static int _drbd_may_sync_now(struct drbd_device *device)
1546 {
1547 	struct drbd_device *odev = device;
1548 	int resync_after;
1549 
1550 	while (1) {
1551 		if (!odev->ldev || odev->state.disk == D_DISKLESS)
1552 			return 1;
1553 		rcu_read_lock();
1554 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1555 		rcu_read_unlock();
1556 		if (resync_after == -1)
1557 			return 1;
1558 		odev = minor_to_device(resync_after);
1559 		if (!odev)
1560 			return 1;
1561 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1562 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1563 		    odev->state.aftr_isp || odev->state.peer_isp ||
1564 		    odev->state.user_isp)
1565 			return 0;
1566 	}
1567 }
1568 
1569 /**
1570  * drbd_pause_after() - Pause resync on all devices that may not resync now
1571  * @device:	DRBD device.
1572  *
1573  * Called from process context only (admin command and after_state_ch).
1574  */
1575 static bool drbd_pause_after(struct drbd_device *device)
1576 {
1577 	bool changed = false;
1578 	struct drbd_device *odev;
1579 	int i;
1580 
1581 	rcu_read_lock();
1582 	idr_for_each_entry(&drbd_devices, odev, i) {
1583 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1584 			continue;
1585 		if (!_drbd_may_sync_now(odev) &&
1586 		    _drbd_set_state(_NS(odev, aftr_isp, 1),
1587 				    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1588 			changed = true;
1589 	}
1590 	rcu_read_unlock();
1591 
1592 	return changed;
1593 }
1594 
1595 /**
1596  * drbd_resume_next() - Resume resync on all devices that may resync now
1597  * @device:	DRBD device.
1598  *
1599  * Called from process context only (admin command and worker).
1600  */
1601 static bool drbd_resume_next(struct drbd_device *device)
1602 {
1603 	bool changed = false;
1604 	struct drbd_device *odev;
1605 	int i;
1606 
1607 	rcu_read_lock();
1608 	idr_for_each_entry(&drbd_devices, odev, i) {
1609 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1610 			continue;
1611 		if (odev->state.aftr_isp) {
1612 			if (_drbd_may_sync_now(odev) &&
1613 			    _drbd_set_state(_NS(odev, aftr_isp, 0),
1614 					    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1615 				changed = true;
1616 		}
1617 	}
1618 	rcu_read_unlock();
1619 	return changed;
1620 }
1621 
/*
 * Locked wrapper around drbd_resume_next(): takes all resource locks,
 * resumes the devices that may resync now, and drops the locks again.
 */
void resume_next_sg(struct drbd_device *device)
{
	lock_all_resources();
	(void)drbd_resume_next(device);	/* "changed" is of no interest here */
	unlock_all_resources();
}
1628 
/*
 * Locked wrapper around drbd_pause_after(): takes all resource locks,
 * pauses the devices that may not resync now, and drops the locks again.
 */
void suspend_other_sg(struct drbd_device *device)
{
	lock_all_resources();
	(void)drbd_pause_after(device);	/* "changed" is of no interest here */
	unlock_all_resources();
}
1635 
1636 /* caller must lock_all_resources() */
1637 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1638 {
1639 	struct drbd_device *odev;
1640 	int resync_after;
1641 
1642 	if (o_minor == -1)
1643 		return NO_ERROR;
1644 	if (o_minor < -1 || o_minor > MINORMASK)
1645 		return ERR_RESYNC_AFTER;
1646 
1647 	/* check for loops */
1648 	odev = minor_to_device(o_minor);
1649 	while (1) {
1650 		if (odev == device)
1651 			return ERR_RESYNC_AFTER_CYCLE;
1652 
1653 		/* You are free to depend on diskless, non-existing,
1654 		 * or not yet/no longer existing minors.
1655 		 * We only reject dependency loops.
1656 		 * We cannot follow the dependency chain beyond a detached or
1657 		 * missing minor.
1658 		 */
1659 		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1660 			return NO_ERROR;
1661 
1662 		rcu_read_lock();
1663 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1664 		rcu_read_unlock();
1665 		/* dependency chain ends here, no cycles. */
1666 		if (resync_after == -1)
1667 			return NO_ERROR;
1668 
1669 		/* follow the dependency chain */
1670 		odev = minor_to_device(resync_after);
1671 	}
1672 }
1673 
1674 /* caller must lock_all_resources() */
1675 void drbd_resync_after_changed(struct drbd_device *device)
1676 {
1677 	int changed;
1678 
1679 	do {
1680 		changed  = drbd_pause_after(device);
1681 		changed |= drbd_resume_next(device);
1682 	} while (changed);
1683 }
1684 
1685 void drbd_rs_controller_reset(struct drbd_device *device)
1686 {
1687 	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1688 	struct fifo_buffer *plan;
1689 
1690 	atomic_set(&device->rs_sect_in, 0);
1691 	atomic_set(&device->rs_sect_ev, 0);
1692 	device->rs_in_flight = 0;
1693 	device->rs_last_events =
1694 		(int)part_stat_read(&disk->part0, sectors[0]) +
1695 		(int)part_stat_read(&disk->part0, sectors[1]);
1696 
1697 	/* Updating the RCU protected object in place is necessary since
1698 	   this function gets called from atomic context.
1699 	   It is valid since all other updates also lead to an completely
1700 	   empty fifo */
1701 	rcu_read_lock();
1702 	plan = rcu_dereference(device->rs_plan_s);
1703 	plan->total = 0;
1704 	fifo_set(plan, 0);
1705 	rcu_read_unlock();
1706 }
1707 
1708 void start_resync_timer_fn(struct timer_list *t)
1709 {
1710 	struct drbd_device *device = from_timer(device, t, start_resync_timer);
1711 	drbd_device_post_work(device, RS_START);
1712 }
1713 
1714 static void do_start_resync(struct drbd_device *device)
1715 {
1716 	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1717 		drbd_warn(device, "postponing start_resync ...\n");
1718 		device->start_resync_timer.expires = jiffies + HZ/10;
1719 		add_timer(&device->start_resync_timer);
1720 		return;
1721 	}
1722 
1723 	drbd_start_resync(device, C_SYNC_SOURCE);
1724 	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1725 }
1726 
1727 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1728 {
1729 	bool csums_after_crash_only;
1730 	rcu_read_lock();
1731 	csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1732 	rcu_read_unlock();
1733 	return connection->agreed_pro_version >= 89 &&		/* supported? */
1734 		connection->csums_tfm &&			/* configured? */
1735 		(csums_after_crash_only == false		/* use for each resync? */
1736 		 || test_bit(CRASHED_PRIMARY, &device->flags));	/* or only after Primary crash? */
1737 }
1738 
1739 /**
1740  * drbd_start_resync() - Start the resync process
1741  * @device:	DRBD device.
1742  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1743  *
1744  * This function might bring you directly into one of the
1745  * C_PAUSED_SYNC_* states.
1746  */
1747 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1748 {
1749 	struct drbd_peer_device *peer_device = first_peer_device(device);
1750 	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1751 	union drbd_state ns;
1752 	int r;
1753 
1754 	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1755 		drbd_err(device, "Resync already running!\n");
1756 		return;
1757 	}
1758 
1759 	if (!connection) {
1760 		drbd_err(device, "No connection to peer, aborting!\n");
1761 		return;
1762 	}
1763 
1764 	if (!test_bit(B_RS_H_DONE, &device->flags)) {
1765 		if (side == C_SYNC_TARGET) {
1766 			/* Since application IO was locked out during C_WF_BITMAP_T and
1767 			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1768 			   we check that we might make the data inconsistent. */
1769 			r = drbd_khelper(device, "before-resync-target");
1770 			r = (r >> 8) & 0xff;
1771 			if (r > 0) {
1772 				drbd_info(device, "before-resync-target handler returned %d, "
1773 					 "dropping connection.\n", r);
1774 				conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1775 				return;
1776 			}
1777 		} else /* C_SYNC_SOURCE */ {
1778 			r = drbd_khelper(device, "before-resync-source");
1779 			r = (r >> 8) & 0xff;
1780 			if (r > 0) {
1781 				if (r == 3) {
1782 					drbd_info(device, "before-resync-source handler returned %d, "
1783 						 "ignoring. Old userland tools?", r);
1784 				} else {
1785 					drbd_info(device, "before-resync-source handler returned %d, "
1786 						 "dropping connection.\n", r);
1787 					conn_request_state(connection,
1788 							   NS(conn, C_DISCONNECTING), CS_HARD);
1789 					return;
1790 				}
1791 			}
1792 		}
1793 	}
1794 
1795 	if (current == connection->worker.task) {
1796 		/* The worker should not sleep waiting for state_mutex,
1797 		   that can take long */
1798 		if (!mutex_trylock(device->state_mutex)) {
1799 			set_bit(B_RS_H_DONE, &device->flags);
1800 			device->start_resync_timer.expires = jiffies + HZ/5;
1801 			add_timer(&device->start_resync_timer);
1802 			return;
1803 		}
1804 	} else {
1805 		mutex_lock(device->state_mutex);
1806 	}
1807 
1808 	lock_all_resources();
1809 	clear_bit(B_RS_H_DONE, &device->flags);
1810 	/* Did some connection breakage or IO error race with us? */
1811 	if (device->state.conn < C_CONNECTED
1812 	|| !get_ldev_if_state(device, D_NEGOTIATING)) {
1813 		unlock_all_resources();
1814 		goto out;
1815 	}
1816 
1817 	ns = drbd_read_state(device);
1818 
1819 	ns.aftr_isp = !_drbd_may_sync_now(device);
1820 
1821 	ns.conn = side;
1822 
1823 	if (side == C_SYNC_TARGET)
1824 		ns.disk = D_INCONSISTENT;
1825 	else /* side == C_SYNC_SOURCE */
1826 		ns.pdsk = D_INCONSISTENT;
1827 
1828 	r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1829 	ns = drbd_read_state(device);
1830 
1831 	if (ns.conn < C_CONNECTED)
1832 		r = SS_UNKNOWN_ERROR;
1833 
1834 	if (r == SS_SUCCESS) {
1835 		unsigned long tw = drbd_bm_total_weight(device);
1836 		unsigned long now = jiffies;
1837 		int i;
1838 
1839 		device->rs_failed    = 0;
1840 		device->rs_paused    = 0;
1841 		device->rs_same_csum = 0;
1842 		device->rs_last_sect_ev = 0;
1843 		device->rs_total     = tw;
1844 		device->rs_start     = now;
1845 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1846 			device->rs_mark_left[i] = tw;
1847 			device->rs_mark_time[i] = now;
1848 		}
1849 		drbd_pause_after(device);
1850 		/* Forget potentially stale cached per resync extent bit-counts.
1851 		 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1852 		 * disabled, and know the disk state is ok. */
1853 		spin_lock(&device->al_lock);
1854 		lc_reset(device->resync);
1855 		device->resync_locked = 0;
1856 		device->resync_wenr = LC_FREE;
1857 		spin_unlock(&device->al_lock);
1858 	}
1859 	unlock_all_resources();
1860 
1861 	if (r == SS_SUCCESS) {
1862 		wake_up(&device->al_wait); /* for lc_reset() above */
1863 		/* reset rs_last_bcast when a resync or verify is started,
1864 		 * to deal with potential jiffies wrap. */
1865 		device->rs_last_bcast = jiffies - HZ;
1866 
1867 		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1868 		     drbd_conn_str(ns.conn),
1869 		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1870 		     (unsigned long) device->rs_total);
1871 		if (side == C_SYNC_TARGET) {
1872 			device->bm_resync_fo = 0;
1873 			device->use_csums = use_checksum_based_resync(connection, device);
1874 		} else {
1875 			device->use_csums = false;
1876 		}
1877 
1878 		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1879 		 * with w_send_oos, or the sync target will get confused as to
1880 		 * how much bits to resync.  We cannot do that always, because for an
1881 		 * empty resync and protocol < 95, we need to do it here, as we call
1882 		 * drbd_resync_finished from here in that case.
1883 		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1884 		 * and from after_state_ch otherwise. */
1885 		if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1886 			drbd_gen_and_send_sync_uuid(peer_device);
1887 
1888 		if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1889 			/* This still has a race (about when exactly the peers
1890 			 * detect connection loss) that can lead to a full sync
1891 			 * on next handshake. In 8.3.9 we fixed this with explicit
1892 			 * resync-finished notifications, but the fix
1893 			 * introduces a protocol change.  Sleeping for some
1894 			 * time longer than the ping interval + timeout on the
1895 			 * SyncSource, to give the SyncTarget the chance to
1896 			 * detect connection loss, then waiting for a ping
1897 			 * response (implicit in drbd_resync_finished) reduces
1898 			 * the race considerably, but does not solve it. */
1899 			if (side == C_SYNC_SOURCE) {
1900 				struct net_conf *nc;
1901 				int timeo;
1902 
1903 				rcu_read_lock();
1904 				nc = rcu_dereference(connection->net_conf);
1905 				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1906 				rcu_read_unlock();
1907 				schedule_timeout_interruptible(timeo);
1908 			}
1909 			drbd_resync_finished(device);
1910 		}
1911 
1912 		drbd_rs_controller_reset(device);
1913 		/* ns.conn may already be != device->state.conn,
1914 		 * we may have been paused in between, or become paused until
1915 		 * the timer triggers.
1916 		 * No matter, that is handled in resync_timer_fn() */
1917 		if (ns.conn == C_SYNC_TARGET)
1918 			mod_timer(&device->resync_timer, jiffies);
1919 
1920 		drbd_md_sync(device);
1921 	}
1922 	put_ldev(device);
1923 out:
1924 	mutex_unlock(device->state_mutex);
1925 }
1926 
1927 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1928 {
1929 	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1930 	device->rs_last_bcast = jiffies;
1931 
1932 	if (!get_ldev(device))
1933 		return;
1934 
1935 	drbd_bm_write_lazy(device, 0);
1936 	if (resync_done && is_sync_state(device->state.conn))
1937 		drbd_resync_finished(device);
1938 
1939 	drbd_bcast_event(device, &sib);
1940 	/* update timestamp, in case it took a while to write out stuff */
1941 	device->rs_last_bcast = jiffies;
1942 	put_ldev(device);
1943 }
1944 
1945 static void drbd_ldev_destroy(struct drbd_device *device)
1946 {
1947 	lc_destroy(device->resync);
1948 	device->resync = NULL;
1949 	lc_destroy(device->act_log);
1950 	device->act_log = NULL;
1951 
1952 	__acquire(local);
1953 	drbd_backing_dev_free(device, device->ldev);
1954 	device->ldev = NULL;
1955 	__release(local);
1956 
1957 	clear_bit(GOING_DISKLESS, &device->flags);
1958 	wake_up(&device->misc_wait);
1959 }
1960 
1961 static void go_diskless(struct drbd_device *device)
1962 {
1963 	D_ASSERT(device, device->state.disk == D_FAILED);
1964 	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1965 	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1966 	 * the protected members anymore, though, so once put_ldev reaches zero
1967 	 * again, it will be safe to free them. */
1968 
1969 	/* Try to write changed bitmap pages, read errors may have just
1970 	 * set some bits outside the area covered by the activity log.
1971 	 *
1972 	 * If we have an IO error during the bitmap writeout,
1973 	 * we will want a full sync next time, just in case.
1974 	 * (Do we want a specific meta data flag for this?)
1975 	 *
1976 	 * If that does not make it to stable storage either,
1977 	 * we cannot do anything about that anymore.
1978 	 *
1979 	 * We still need to check if both bitmap and ldev are present, we may
1980 	 * end up here after a failed attach, before ldev was even assigned.
1981 	 */
1982 	if (device->bitmap && device->ldev) {
1983 		/* An interrupted resync or similar is allowed to recounts bits
1984 		 * while we detach.
1985 		 * Any modifications would not be expected anymore, though.
1986 		 */
1987 		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1988 					"detach", BM_LOCKED_TEST_ALLOWED)) {
1989 			if (test_bit(WAS_READ_ERROR, &device->flags)) {
1990 				drbd_md_set_flag(device, MDF_FULL_SYNC);
1991 				drbd_md_sync(device);
1992 			}
1993 		}
1994 	}
1995 
1996 	drbd_force_state(device, NS(disk, D_DISKLESS));
1997 }
1998 
/*
 * do_md_sync() - worker-side handling of the MD_SYNC work bit
 * @device: device whose meta data is synced
 *
 * Logs a warning and calls drbd_md_sync().  Always returns 0.
 */
static int do_md_sync(struct drbd_device *device)
{
	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
	drbd_md_sync(device);

	return 0;
}
2005 
2006 /* only called from drbd_worker thread, no locking */
2007 void __update_timing_details(
2008 		struct drbd_thread_timing_details *tdp,
2009 		unsigned int *cb_nr,
2010 		void *cb,
2011 		const char *fn, const unsigned int line)
2012 {
2013 	unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
2014 	struct drbd_thread_timing_details *td = tdp + i;
2015 
2016 	td->start_jif = jiffies;
2017 	td->cb_addr = cb;
2018 	td->caller_fn = fn;
2019 	td->line = line;
2020 	td->cb_nr = *cb_nr;
2021 
2022 	i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2023 	td = tdp + i;
2024 	memset(td, 0, sizeof(*td));
2025 
2026 	++(*cb_nr);
2027 }
2028 
2029 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2030 {
2031 	if (test_bit(MD_SYNC, &todo))
2032 		do_md_sync(device);
2033 	if (test_bit(RS_DONE, &todo) ||
2034 	    test_bit(RS_PROGRESS, &todo))
2035 		update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2036 	if (test_bit(GO_DISKLESS, &todo))
2037 		go_diskless(device);
2038 	if (test_bit(DESTROY_DISK, &todo))
2039 		drbd_ldev_destroy(device);
2040 	if (test_bit(RS_START, &todo))
2041 		do_start_resync(device);
2042 }
2043 
/*
 * All device flag bits that get_work_bits() collects and clears, and that
 * do_device_work() knows how to handle.
 */
#define DRBD_DEVICE_WORK_MASK		\
	((1UL << GO_DISKLESS) |		\
	 (1UL << DESTROY_DISK) |	\
	 (1UL << MD_SYNC) |		\
	 (1UL << RS_START) |		\
	 (1UL << RS_PROGRESS) |		\
	 (1UL << RS_DONE))
2052 
2053 static unsigned long get_work_bits(unsigned long *flags)
2054 {
2055 	unsigned long old, new;
2056 	do {
2057 		old = *flags;
2058 		new = old & ~DRBD_DEVICE_WORK_MASK;
2059 	} while (cmpxchg(flags, old, new) != old);
2060 	return old & DRBD_DEVICE_WORK_MASK;
2061 }
2062 
2063 static void do_unqueued_work(struct drbd_connection *connection)
2064 {
2065 	struct drbd_peer_device *peer_device;
2066 	int vnr;
2067 
2068 	rcu_read_lock();
2069 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2070 		struct drbd_device *device = peer_device->device;
2071 		unsigned long todo = get_work_bits(&device->flags);
2072 		if (!todo)
2073 			continue;
2074 
2075 		kref_get(&device->kref);
2076 		rcu_read_unlock();
2077 		do_device_work(device, todo);
2078 		kref_put(&device->kref, drbd_destroy_device);
2079 		rcu_read_lock();
2080 	}
2081 	rcu_read_unlock();
2082 }
2083 
2084 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2085 {
2086 	spin_lock_irq(&queue->q_lock);
2087 	list_splice_tail_init(&queue->q, work_list);
2088 	spin_unlock_irq(&queue->q_lock);
2089 	return !list_empty(work_list);
2090 }
2091 
2092 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2093 {
2094 	DEFINE_WAIT(wait);
2095 	struct net_conf *nc;
2096 	int uncork, cork;
2097 
2098 	dequeue_work_batch(&connection->sender_work, work_list);
2099 	if (!list_empty(work_list))
2100 		return;
2101 
2102 	/* Still nothing to do?
2103 	 * Maybe we still need to close the current epoch,
2104 	 * even if no new requests are queued yet.
2105 	 *
2106 	 * Also, poke TCP, just in case.
2107 	 * Then wait for new work (or signal). */
2108 	rcu_read_lock();
2109 	nc = rcu_dereference(connection->net_conf);
2110 	uncork = nc ? nc->tcp_cork : 0;
2111 	rcu_read_unlock();
2112 	if (uncork) {
2113 		mutex_lock(&connection->data.mutex);
2114 		if (connection->data.socket)
2115 			drbd_tcp_uncork(connection->data.socket);
2116 		mutex_unlock(&connection->data.mutex);
2117 	}
2118 
2119 	for (;;) {
2120 		int send_barrier;
2121 		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2122 		spin_lock_irq(&connection->resource->req_lock);
2123 		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2124 		if (!list_empty(&connection->sender_work.q))
2125 			list_splice_tail_init(&connection->sender_work.q, work_list);
2126 		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2127 		if (!list_empty(work_list) || signal_pending(current)) {
2128 			spin_unlock_irq(&connection->resource->req_lock);
2129 			break;
2130 		}
2131 
2132 		/* We found nothing new to do, no to-be-communicated request,
2133 		 * no other work item.  We may still need to close the last
2134 		 * epoch.  Next incoming request epoch will be connection ->
2135 		 * current transfer log epoch number.  If that is different
2136 		 * from the epoch of the last request we communicated, it is
2137 		 * safe to send the epoch separating barrier now.
2138 		 */
2139 		send_barrier =
2140 			atomic_read(&connection->current_tle_nr) !=
2141 			connection->send.current_epoch_nr;
2142 		spin_unlock_irq(&connection->resource->req_lock);
2143 
2144 		if (send_barrier)
2145 			maybe_send_barrier(connection,
2146 					connection->send.current_epoch_nr + 1);
2147 
2148 		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2149 			break;
2150 
2151 		/* drbd_send() may have called flush_signals() */
2152 		if (get_t_state(&connection->worker) != RUNNING)
2153 			break;
2154 
2155 		schedule();
2156 		/* may be woken up for other things but new work, too,
2157 		 * e.g. if the current epoch got closed.
2158 		 * In which case we send the barrier above. */
2159 	}
2160 	finish_wait(&connection->sender_work.q_wait, &wait);
2161 
2162 	/* someone may have changed the config while we have been waiting above. */
2163 	rcu_read_lock();
2164 	nc = rcu_dereference(connection->net_conf);
2165 	cork = nc ? nc->tcp_cork : 0;
2166 	rcu_read_unlock();
2167 	mutex_lock(&connection->data.mutex);
2168 	if (connection->data.socket) {
2169 		if (cork)
2170 			drbd_tcp_cork(connection->data.socket);
2171 		else if (!uncork)
2172 			drbd_tcp_uncork(connection->data.socket);
2173 	}
2174 	mutex_unlock(&connection->data.mutex);
2175 }
2176 
2177 int drbd_worker(struct drbd_thread *thi)
2178 {
2179 	struct drbd_connection *connection = thi->connection;
2180 	struct drbd_work *w = NULL;
2181 	struct drbd_peer_device *peer_device;
2182 	LIST_HEAD(work_list);
2183 	int vnr;
2184 
2185 	while (get_t_state(thi) == RUNNING) {
2186 		drbd_thread_current_set_cpu(thi);
2187 
2188 		if (list_empty(&work_list)) {
2189 			update_worker_timing_details(connection, wait_for_work);
2190 			wait_for_work(connection, &work_list);
2191 		}
2192 
2193 		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2194 			update_worker_timing_details(connection, do_unqueued_work);
2195 			do_unqueued_work(connection);
2196 		}
2197 
2198 		if (signal_pending(current)) {
2199 			flush_signals(current);
2200 			if (get_t_state(thi) == RUNNING) {
2201 				drbd_warn(connection, "Worker got an unexpected signal\n");
2202 				continue;
2203 			}
2204 			break;
2205 		}
2206 
2207 		if (get_t_state(thi) != RUNNING)
2208 			break;
2209 
2210 		if (!list_empty(&work_list)) {
2211 			w = list_first_entry(&work_list, struct drbd_work, list);
2212 			list_del_init(&w->list);
2213 			update_worker_timing_details(connection, w->cb);
2214 			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2215 				continue;
2216 			if (connection->cstate >= C_WF_REPORT_PARAMS)
2217 				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2218 		}
2219 	}
2220 
2221 	do {
2222 		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2223 			update_worker_timing_details(connection, do_unqueued_work);
2224 			do_unqueued_work(connection);
2225 		}
2226 		if (!list_empty(&work_list)) {
2227 			w = list_first_entry(&work_list, struct drbd_work, list);
2228 			list_del_init(&w->list);
2229 			update_worker_timing_details(connection, w->cb);
2230 			w->cb(w, 1);
2231 		} else
2232 			dequeue_work_batch(&connection->sender_work, &work_list);
2233 	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2234 
2235 	rcu_read_lock();
2236 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2237 		struct drbd_device *device = peer_device->device;
2238 		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2239 		kref_get(&device->kref);
2240 		rcu_read_unlock();
2241 		drbd_device_cleanup(device);
2242 		kref_put(&device->kref, drbd_destroy_device);
2243 		rcu_read_lock();
2244 	}
2245 	rcu_read_unlock();
2246 
2247 	return 0;
2248 }
2249