xref: /openbmc/linux/drivers/block/drbd/drbd_worker.c (revision 151f4e2b)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24 */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched/signal.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37 
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
41 
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
44 
45 /* endio handlers:
46  *   drbd_md_endio (defined here)
47  *   drbd_request_endio (defined here)
48  *   drbd_peer_request_endio (defined here)
49  *   drbd_bm_endio (defined in drbd_bitmap.c)
50  *
51  * For all these callbacks, note the following:
52  * The callbacks will be called in irq context by the IDE drivers,
53  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54  * Try to get the locking right :)
55  *
56  */
57 
58 /* used for synchronous meta data and bitmap IO
59  * submitted by drbd_md_sync_page_io()
60  */
61 void drbd_md_endio(struct bio *bio)
62 {
63 	struct drbd_device *device;
64 
65 	device = bio->bi_private;
66 	device->md_io.error = blk_status_to_errno(bio->bi_status);
67 
68 	/* special case: drbd_md_read() during drbd_adm_attach() */
69 	if (device->ldev)
70 		put_ldev(device);
71 	bio_put(bio);
72 
73 	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
74 	 * to time out on the lower level device, and eventually detach from it.
75 	 * If this io completion runs after that timeout expired, this
76 	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
77 	 * During normal operation, this only puts that extra reference
78 	 * down to 1 again.
79 	 * Make sure we first drop the reference, and only then signal
80 	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
81 	 * next drbd_md_sync_page_io(), that we trigger the
82 	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
83 	 */
84 	drbd_md_put_buffer(device);
85 	device->md_io.done = 1;
86 	wake_up(&device->misc_wait);
87 }
88 
89 /* reads on behalf of the partner,
90  * "submitted" by the receiver
91  */
92 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
93 {
94 	unsigned long flags = 0;
95 	struct drbd_peer_device *peer_device = peer_req->peer_device;
96 	struct drbd_device *device = peer_device->device;
97 
98 	spin_lock_irqsave(&device->resource->req_lock, flags);
99 	device->read_cnt += peer_req->i.size >> 9;
100 	list_del(&peer_req->w.list);
101 	if (list_empty(&device->read_ee))
102 		wake_up(&device->ee_wait);
103 	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
104 		__drbd_chk_io_error(device, DRBD_READ_ERROR);
105 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
106 
107 	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
108 	put_ldev(device);
109 }
110 
111 /* writes on behalf of the partner, or resync writes,
112  * "submitted" by the receiver, final stage.  */
113 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
114 {
115 	unsigned long flags = 0;
116 	struct drbd_peer_device *peer_device = peer_req->peer_device;
117 	struct drbd_device *device = peer_device->device;
118 	struct drbd_connection *connection = peer_device->connection;
119 	struct drbd_interval i;
120 	int do_wake;
121 	u64 block_id;
122 	int do_al_complete_io;
123 
124 	/* after we moved peer_req to done_ee,
125 	 * we may no longer access it,
126 	 * it may be freed/reused already!
127 	 * (as soon as we release the req_lock) */
128 	i = peer_req->i;
129 	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
130 	block_id = peer_req->block_id;
131 	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
132 
133 	if (peer_req->flags & EE_WAS_ERROR) {
134 		/* In protocol != C, we usually do not send write acks.
135 		 * In case of a write error, send the neg ack anyway. */
136 		if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
137 			inc_unacked(device);
138 		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
139 	}
140 
141 	spin_lock_irqsave(&device->resource->req_lock, flags);
142 	device->writ_cnt += peer_req->i.size >> 9;
143 	list_move_tail(&peer_req->w.list, &device->done_ee);
144 
145 	/*
146 	 * Do not remove from the write_requests tree here: we did not send the
147 	 * Ack yet and did not wake possibly waiting conflicting requests.
148 	 * The request is removed from the tree in "drbd_process_done_ee",
149 	 * within the appropriate dw.cb (e_end_block/e_end_resync_block),
150 	 * or in _drbd_clear_done_ee.
151 	 */
152 
153 	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
154 
155 	/* FIXME do we want to detach for failed REQ_OP_DISCARD?
156 	 * ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */
157 	if (peer_req->flags & EE_WAS_ERROR)
158 		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
159 
160 	if (connection->cstate >= C_WF_REPORT_PARAMS) {
161 		kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
162 		if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
163 			kref_put(&device->kref, drbd_destroy_device);
164 	}
165 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
166 
167 	if (block_id == ID_SYNCER)
168 		drbd_rs_complete_io(device, i.sector);
169 
170 	if (do_wake)
171 		wake_up(&device->ee_wait);
172 
173 	if (do_al_complete_io)
174 		drbd_al_complete_io(device, &i);
175 
176 	put_ldev(device);
177 }
178 
179 /* writes on behalf of the partner, or resync writes,
180  * "submitted" by the receiver.
181  */
182 void drbd_peer_request_endio(struct bio *bio)
183 {
184 	struct drbd_peer_request *peer_req = bio->bi_private;
185 	struct drbd_device *device = peer_req->peer_device->device;
186 	bool is_write = bio_data_dir(bio) == WRITE;
187 	bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
188 			  bio_op(bio) == REQ_OP_DISCARD;
189 
190 	if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
191 		drbd_warn(device, "%s: error=%d s=%llus\n",
192 				is_write ? (is_discard ? "discard" : "write")
193 					: "read", bio->bi_status,
194 				(unsigned long long)peer_req->i.sector);
195 
196 	if (bio->bi_status)
197 		set_bit(__EE_WAS_ERROR, &peer_req->flags);
198 
199 	bio_put(bio); /* no need for the bio anymore */
200 	if (atomic_dec_and_test(&peer_req->pending_bios)) {
201 		if (is_write)
202 			drbd_endio_write_sec_final(peer_req);
203 		else
204 			drbd_endio_read_sec_final(peer_req);
205 	}
206 }
207 
208 static void
209 drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
210 {
211 	panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
212 		device->minor, device->resource->name, device->vnr);
213 }
214 
215 /* read, read-ahead or write requests on R_PRIMARY coming from drbd_make_request
216  */
217 void drbd_request_endio(struct bio *bio)
218 {
219 	unsigned long flags;
220 	struct drbd_request *req = bio->bi_private;
221 	struct drbd_device *device = req->device;
222 	struct bio_and_error m;
223 	enum drbd_req_event what;
224 
225 	/* If this request was aborted locally before,
226 	 * but now was completed "successfully",
227 	 * chances are that this caused arbitrary data corruption.
228 	 *
229 	 * "aborting" requests, or force-detaching the disk, is intended for
230 	 * completely blocked/hung local backing devices which no longer
231 	 * complete requests at all, not even error completions.  In this
232 	 * situation, usually a hard-reset and failover is the only way out.
233 	 *
234 	 * By "aborting", basically faking a local error-completion,
235 	 * we allow for a more graceful switchover by cleanly migrating services.
236 	 * Still the affected node has to be rebooted "soon".
237 	 *
238 	 * By completing these requests, we allow the upper layers to re-use
239 	 * the associated data pages.
240 	 *
241 	 * If later the local backing device "recovers", and now DMAs some data
242 	 * from disk into the original request pages, in the best case it will
243 	 * just put random data into unused pages; but typically it will corrupt
244 	 * data that is by now completely unrelated, causing all sorts of damage.
245 	 *
246 	 * Which means delayed successful completion,
247 	 * especially for READ requests,
248 	 * is a reason to panic().
249 	 *
250 	 * We assume that a delayed *error* completion is OK,
251 	 * though we still will complain noisily about it.
252 	 */
253 	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
254 		if (__ratelimit(&drbd_ratelimit_state))
255 			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
256 
257 		if (!bio->bi_status)
258 			drbd_panic_after_delayed_completion_of_aborted_request(device);
259 	}
260 
261 	/* to avoid recursion in __req_mod */
262 	if (unlikely(bio->bi_status)) {
263 		switch (bio_op(bio)) {
264 		case REQ_OP_WRITE_ZEROES:
265 		case REQ_OP_DISCARD:
266 			if (bio->bi_status == BLK_STS_NOTSUPP)
267 				what = DISCARD_COMPLETED_NOTSUPP;
268 			else
269 				what = DISCARD_COMPLETED_WITH_ERROR;
270 			break;
271 		case REQ_OP_READ:
272 			if (bio->bi_opf & REQ_RAHEAD)
273 				what = READ_AHEAD_COMPLETED_WITH_ERROR;
274 			else
275 				what = READ_COMPLETED_WITH_ERROR;
276 			break;
277 		default:
278 			what = WRITE_COMPLETED_WITH_ERROR;
279 			break;
280 		}
281 	} else {
282 		what = COMPLETED_OK;
283 	}
284 
285 	req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
286 	bio_put(bio);
287 
288 	/* not req_mod(), we need irqsave here! */
289 	spin_lock_irqsave(&device->resource->req_lock, flags);
290 	__req_mod(req, what, &m);
291 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
292 	put_ldev(device);
293 
294 	if (m.bio)
295 		complete_master_bio(device, &m);
296 }
297 
298 void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest)
299 {
300 	SHASH_DESC_ON_STACK(desc, tfm);
301 	struct page *page = peer_req->pages;
302 	struct page *tmp;
303 	unsigned len;
304 	void *src;
305 
306 	desc->tfm = tfm;
307 
308 	crypto_shash_init(desc);
309 
310 	src = kmap_atomic(page);
311 	while ((tmp = page_chain_next(page))) {
312 		/* all but the last page will be fully used */
313 		crypto_shash_update(desc, src, PAGE_SIZE);
314 		kunmap_atomic(src);
315 		page = tmp;
316 		src = kmap_atomic(page);
317 	}
318 	/* and now the last, possibly only partially used page */
319 	len = peer_req->i.size & (PAGE_SIZE - 1);
320 	crypto_shash_update(desc, src, len ?: PAGE_SIZE);
321 	kunmap_atomic(src);
322 
323 	crypto_shash_final(desc, digest);
324 	shash_desc_zero(desc);
325 }
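
/* Illustrative note, not part of the original code: a worked example of how
 * the page chain is hashed above.  Assuming PAGE_SIZE == 4096 and, say,
 * peer_req->i.size == 9216 (two full pages plus 1024 bytes), the loop hashes
 * 4096 bytes of page 0 and of page 1, then the tail computes
 * len = 9216 & 4095 = 1024 and hashes only the first 1024 bytes of page 2.
 * If i.size were an exact multiple of PAGE_SIZE, len would be 0 and the
 * "len ?: PAGE_SIZE" fallback hashes the whole last page instead.
 */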
326 
327 void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest)
328 {
329 	SHASH_DESC_ON_STACK(desc, tfm);
330 	struct bio_vec bvec;
331 	struct bvec_iter iter;
332 
333 	desc->tfm = tfm;
334 
335 	crypto_shash_init(desc);
336 
337 	bio_for_each_segment(bvec, bio, iter) {
338 		u8 *src;
339 
340 		src = kmap_atomic(bvec.bv_page);
341 		crypto_shash_update(desc, src + bvec.bv_offset, bvec.bv_len);
342 		kunmap_atomic(src);
343 
344 		/* REQ_OP_WRITE_SAME has only one segment,
345 		 * checksum the payload only once. */
346 		if (bio_op(bio) == REQ_OP_WRITE_SAME)
347 			break;
348 	}
349 	crypto_shash_final(desc, digest);
350 	shash_desc_zero(desc);
351 }
352 
353 /* MAYBE merge common code with w_e_end_ov_req */
354 static int w_e_send_csum(struct drbd_work *w, int cancel)
355 {
356 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
357 	struct drbd_peer_device *peer_device = peer_req->peer_device;
358 	struct drbd_device *device = peer_device->device;
359 	int digest_size;
360 	void *digest;
361 	int err = 0;
362 
363 	if (unlikely(cancel))
364 		goto out;
365 
366 	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
367 		goto out;
368 
369 	digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
370 	digest = kmalloc(digest_size, GFP_NOIO);
371 	if (digest) {
372 		sector_t sector = peer_req->i.sector;
373 		unsigned int size = peer_req->i.size;
374 		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
375 		/* Free peer_req and pages before send.
376 		 * In case we block on congestion, we could otherwise run into
377 		 * some distributed deadlock, if the other side blocks on
378 		 * congestion as well, because our receiver blocks in
379 		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
380 		drbd_free_peer_req(device, peer_req);
381 		peer_req = NULL;
382 		inc_rs_pending(device);
383 		err = drbd_send_drequest_csum(peer_device, sector, size,
384 					      digest, digest_size,
385 					      P_CSUM_RS_REQUEST);
386 		kfree(digest);
387 	} else {
388 		drbd_err(device, "kmalloc() of digest failed.\n");
389 		err = -ENOMEM;
390 	}
391 
392 out:
393 	if (peer_req)
394 		drbd_free_peer_req(device, peer_req);
395 
396 	if (unlikely(err))
397 		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
398 	return err;
399 }
400 
401 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
402 
403 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
404 {
405 	struct drbd_device *device = peer_device->device;
406 	struct drbd_peer_request *peer_req;
407 
408 	if (!get_ldev(device))
409 		return -EIO;
410 
411 	/* GFP_TRY, because if there is no memory available right now, this may
412 	 * be rescheduled for later. It is "only" background resync, after all. */
413 	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
414 				       size, size, GFP_TRY);
415 	if (!peer_req)
416 		goto defer;
417 
418 	peer_req->w.cb = w_e_send_csum;
419 	spin_lock_irq(&device->resource->req_lock);
420 	list_add_tail(&peer_req->w.list, &device->read_ee);
421 	spin_unlock_irq(&device->resource->req_lock);
422 
423 	atomic_add(size >> 9, &device->rs_sect_ev);
424 	if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
425 				     DRBD_FAULT_RS_RD) == 0)
426 		return 0;
427 
428 	/* If it failed because of ENOMEM, retry should help.  If it failed
429 	 * because bio_add_page failed (probably broken lower level driver),
430 	 * retry may or may not help.
431 	 * If it does not, you may need to force disconnect. */
432 	spin_lock_irq(&device->resource->req_lock);
433 	list_del(&peer_req->w.list);
434 	spin_unlock_irq(&device->resource->req_lock);
435 
436 	drbd_free_peer_req(device, peer_req);
437 defer:
438 	put_ldev(device);
439 	return -EAGAIN;
440 }
441 
442 int w_resync_timer(struct drbd_work *w, int cancel)
443 {
444 	struct drbd_device *device =
445 		container_of(w, struct drbd_device, resync_work);
446 
447 	switch (device->state.conn) {
448 	case C_VERIFY_S:
449 		make_ov_request(device, cancel);
450 		break;
451 	case C_SYNC_TARGET:
452 		make_resync_request(device, cancel);
453 		break;
454 	}
455 
456 	return 0;
457 }
458 
459 void resync_timer_fn(struct timer_list *t)
460 {
461 	struct drbd_device *device = from_timer(device, t, resync_timer);
462 
463 	drbd_queue_work_if_unqueued(
464 		&first_peer_device(device)->connection->sender_work,
465 		&device->resync_work);
466 }
467 
468 static void fifo_set(struct fifo_buffer *fb, int value)
469 {
470 	int i;
471 
472 	for (i = 0; i < fb->size; i++)
473 		fb->values[i] = value;
474 }
475 
476 static int fifo_push(struct fifo_buffer *fb, int value)
477 {
478 	int ov;
479 
480 	ov = fb->values[fb->head_index];
481 	fb->values[fb->head_index++] = value;
482 
483 	if (fb->head_index >= fb->size)
484 		fb->head_index = 0;
485 
486 	return ov;
487 }
488 
489 static void fifo_add_val(struct fifo_buffer *fb, int value)
490 {
491 	int i;
492 
493 	for (i = 0; i < fb->size; i++)
494 		fb->values[i] += value;
495 }
496 
497 struct fifo_buffer *fifo_alloc(int fifo_size)
498 {
499 	struct fifo_buffer *fb;
500 
501 	fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
502 	if (!fb)
503 		return NULL;
504 
505 	fb->head_index = 0;
506 	fb->size = fifo_size;
507 	fb->total = 0;
508 
509 	return fb;
510 }
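
/* Illustrative note, not part of the original code: these helpers implement
 * the small ring buffer used as the resync controller's "plan ahead" FIFO.
 * With a hypothetical fb->size == 3, values {5, 7, 9} and head_index == 0:
 *   fifo_push(fb, 0)    returns 5 (the oldest slot), stores 0 in its place
 *                       and advances head_index to 1;
 *   fifo_add_val(fb, 2) turns the stored plan into {2, 9, 11};
 *   fifo_set(fb, 0)     clears every slot, e.g. on a controller reset.
 * fb->total is maintained by the callers (drbd_rs_controller and
 * drbd_rs_controller_reset) as the sum of all planned corrections.
 */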
511 
512 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
513 {
514 	struct disk_conf *dc;
515 	unsigned int want;     /* The number of sectors we want in-flight */
516 	int req_sect; /* Number of sectors to request in this turn */
517 	int correction; /* Number of sectors more we need in-flight */
518 	int cps; /* correction per invocation of drbd_rs_controller() */
519 	int steps; /* Number of time steps to plan ahead */
520 	int curr_corr;
521 	int max_sect;
522 	struct fifo_buffer *plan;
523 
524 	dc = rcu_dereference(device->ldev->disk_conf);
525 	plan = rcu_dereference(device->rs_plan_s);
526 
527 	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
528 
529 	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
530 		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
531 	} else { /* normal path */
532 		want = dc->c_fill_target ? dc->c_fill_target :
533 			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
534 	}
535 
536 	correction = want - device->rs_in_flight - plan->total;
537 
538 	/* Plan ahead */
539 	cps = correction / steps;
540 	fifo_add_val(plan, cps);
541 	plan->total += cps * steps;
542 
543 	/* What we do in this step */
544 	curr_corr = fifo_push(plan, 0);
545 	plan->total -= curr_corr;
546 
547 	req_sect = sect_in + curr_corr;
548 	if (req_sect < 0)
549 		req_sect = 0;
550 
551 	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
552 	if (req_sect > max_sect)
553 		req_sect = max_sect;
554 
555 	/*
556 	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
557 		 sect_in, device->rs_in_flight, want, correction,
558 		 steps, cps, device->rs_planed, curr_corr, req_sect);
559 	*/
560 
561 	return req_sect;
562 }
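
/* Illustrative walk-through, not part of the original code.  Assuming
 * SLEEP_TIME corresponds to 0.1 s and resync_rate/c_max_rate are configured
 * in KiB/s (so the "* 2" converts KiB to 512-byte sectors), take
 * plan->size == 20 steps (a 2 second planning horizon) and
 * resync_rate == 10240 KiB/s.  At resync start (rs_in_flight + sect_in == 0):
 *   want       = (10240 * 2 * 0.1) * 20 = 40960 sectors to keep in flight
 *   correction = 40960 - 0 - 0          = 40960
 *   cps        = 40960 / 20             = 2048 sectors added to each step
 *   curr_corr  = fifo_push(plan, 0)     = this step's share (2048 here)
 *   req_sect   = sect_in + curr_corr, clamped to max_sect (c_max_rate per step)
 * In steady state the controller requests roughly what just came back in
 * (sect_in) plus the planned correction, keeping about "want" sectors
 * in flight.
 */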
563 
564 static int drbd_rs_number_requests(struct drbd_device *device)
565 {
566 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
567 	int number, mxb;
568 
569 	sect_in = atomic_xchg(&device->rs_sect_in, 0);
570 	device->rs_in_flight -= sect_in;
571 
572 	rcu_read_lock();
573 	mxb = drbd_get_max_buffers(device) / 2;
574 	if (rcu_dereference(device->rs_plan_s)->size) {
575 		number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
576 		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
577 	} else {
578 		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
579 		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
580 	}
581 	rcu_read_unlock();
582 
583 	/* Don't have more than "max-buffers"/2 in-flight.
584 	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
585 	 * potentially causing a distributed deadlock on congestion during
586 	 * online-verify or (checksum-based) resync, if max-buffers,
587 	 * socket buffer sizes and resync rate settings are mis-configured. */
588 
589 	/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
590 	 * mxb (as used here, and in drbd_alloc_pages on the peer) is
591 	 * "number of pages" (typically also 4k),
592 	 * but "rs_in_flight" is in "sectors" (512 Byte). */
593 	if (mxb - device->rs_in_flight/8 < number)
594 		number = mxb - device->rs_in_flight/8;
595 
596 	return number;
597 }
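
/* Illustrative unit check, not part of the original code: suppose
 * max-buffers is 2048, so mxb == 1024 (pages), and rs_in_flight is
 * 4096 sectors (2 MiB).  rs_in_flight/8 converts 512-byte sectors into
 * 4 KiB units, giving 512, so at most 1024 - 512 = 512 further resync
 * requests of BM_BLOCK_SIZE each are allowed this turn, regardless of
 * what the rate controller asked for.
 */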
598 
599 static int make_resync_request(struct drbd_device *const device, int cancel)
600 {
601 	struct drbd_peer_device *const peer_device = first_peer_device(device);
602 	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
603 	unsigned long bit;
604 	sector_t sector;
605 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
606 	int max_bio_size;
607 	int number, rollback_i, size;
608 	int align, requeue = 0;
609 	int i = 0;
610 	int discard_granularity = 0;
611 
612 	if (unlikely(cancel))
613 		return 0;
614 
615 	if (device->rs_total == 0) {
616 		/* empty resync? */
617 		drbd_resync_finished(device);
618 		return 0;
619 	}
620 
621 	if (!get_ldev(device)) {
622 		/* Since we only need to access device->rsync, a
623 		   get_ldev_if_state(device, D_FAILED) would be sufficient, but
624 		   continuing resync with a broken disk makes no sense at
625 		   all */
626 		drbd_err(device, "Disk broke down during resync!\n");
627 		return 0;
628 	}
629 
630 	if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
631 		rcu_read_lock();
632 		discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
633 		rcu_read_unlock();
634 	}
635 
636 	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
637 	number = drbd_rs_number_requests(device);
638 	if (number <= 0)
639 		goto requeue;
640 
641 	for (i = 0; i < number; i++) {
642 		/* Stop generating RS requests when half of the send buffer is filled,
643 		 * but notify TCP that we'd like to have more space. */
644 		mutex_lock(&connection->data.mutex);
645 		if (connection->data.socket) {
646 			struct sock *sk = connection->data.socket->sk;
647 			int queued = sk->sk_wmem_queued;
648 			int sndbuf = sk->sk_sndbuf;
649 			if (queued > sndbuf / 2) {
650 				requeue = 1;
651 				if (sk->sk_socket)
652 					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
653 			}
654 		} else
655 			requeue = 1;
656 		mutex_unlock(&connection->data.mutex);
657 		if (requeue)
658 			goto requeue;
659 
660 next_sector:
661 		size = BM_BLOCK_SIZE;
662 		bit  = drbd_bm_find_next(device, device->bm_resync_fo);
663 
664 		if (bit == DRBD_END_OF_BITMAP) {
665 			device->bm_resync_fo = drbd_bm_bits(device);
666 			put_ldev(device);
667 			return 0;
668 		}
669 
670 		sector = BM_BIT_TO_SECT(bit);
671 
672 		if (drbd_try_rs_begin_io(device, sector)) {
673 			device->bm_resync_fo = bit;
674 			goto requeue;
675 		}
676 		device->bm_resync_fo = bit + 1;
677 
678 		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
679 			drbd_rs_complete_io(device, sector);
680 			goto next_sector;
681 		}
682 
683 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
684 		/* try to find some adjacent bits.
685 		 * we stop if we already have the maximum req size.
686 		 *
687 		 * Additionally always align bigger requests, in order to
688 		 * be prepared for all stripe sizes of software RAIDs.
689 		 */
690 		align = 1;
691 		rollback_i = i;
692 		while (i < number) {
693 			if (size + BM_BLOCK_SIZE > max_bio_size)
694 				break;
695 
696 			/* Always stay aligned */
697 			if (sector & ((1<<(align+3))-1))
698 				break;
699 
700 			if (discard_granularity && size == discard_granularity)
701 				break;
702 
703 			/* do not cross extent boundaries */
704 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
705 				break;
706 			/* now, is it actually dirty, after all?
707 			 * caution, drbd_bm_test_bit is tri-state for some
708 			 * obscure reason; a check for ( b == 0 ) would handle the
709 			 * out-of-band value only accidentally right because of the
710 			 * "oddly sized" adjustment below */
711 			if (drbd_bm_test_bit(device, bit+1) != 1)
712 				break;
713 			bit++;
714 			size += BM_BLOCK_SIZE;
715 			if ((BM_BLOCK_SIZE << align) <= size)
716 				align++;
717 			i++;
718 		}
719 		/* if we merged some,
720 		 * reset the offset to start the next drbd_bm_find_next from */
721 		if (size > BM_BLOCK_SIZE)
722 			device->bm_resync_fo = bit + 1;
723 #endif
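		/* Illustrative example, not part of the original code, of the
		 * alignment rule above (BM_BLOCK_SIZE == 4 KiB == 8 sectors):
		 * a request starting at a sector aligned to 16 sectors (8 KiB)
		 * but not to 32 passes the "sector & ((1<<(align+3))-1)" test
		 * once, grows to 8 KiB, align becomes 2, and the next test
		 * (sector & 31) stops further merging.  Requests thus never
		 * grow beyond the natural alignment of their start sector,
		 * which keeps them stripe-friendly for software RAIDs. */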
724 
725 		/* adjust very last sectors, in case we are oddly sized */
726 		if (sector + (size>>9) > capacity)
727 			size = (capacity-sector)<<9;
728 
729 		if (device->use_csums) {
730 			switch (read_for_csum(peer_device, sector, size)) {
731 			case -EIO: /* Disk failure */
732 				put_ldev(device);
733 				return -EIO;
734 			case -EAGAIN: /* allocation failed, or ldev busy */
735 				drbd_rs_complete_io(device, sector);
736 				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
737 				i = rollback_i;
738 				goto requeue;
739 			case 0:
740 				/* everything ok */
741 				break;
742 			default:
743 				BUG();
744 			}
745 		} else {
746 			int err;
747 
748 			inc_rs_pending(device);
749 			err = drbd_send_drequest(peer_device,
750 						 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
751 						 sector, size, ID_SYNCER);
752 			if (err) {
753 				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
754 				dec_rs_pending(device);
755 				put_ldev(device);
756 				return err;
757 			}
758 		}
759 	}
760 
761 	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
762 		/* last syncer _request_ was sent,
763 		 * but the P_RS_DATA_REPLY has not yet been received.  sync will end (and
764 		 * next sync group will resume), as soon as we receive the last
765 		 * resync data block, and the last bit is cleared.
766 		 * until then resync "work" is "inactive" ...
767 		 */
768 		put_ldev(device);
769 		return 0;
770 	}
771 
772  requeue:
773 	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
774 	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
775 	put_ldev(device);
776 	return 0;
777 }
778 
779 static int make_ov_request(struct drbd_device *device, int cancel)
780 {
781 	int number, i, size;
782 	sector_t sector;
783 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
784 	bool stop_sector_reached = false;
785 
786 	if (unlikely(cancel))
787 		return 1;
788 
789 	number = drbd_rs_number_requests(device);
790 
791 	sector = device->ov_position;
792 	for (i = 0; i < number; i++) {
793 		if (sector >= capacity)
794 			return 1;
795 
796 		/* We check for "finished" only in the reply path:
797 		 * w_e_end_ov_reply().
798 		 * We need to send at least one request out. */
799 		stop_sector_reached = i > 0
800 			&& verify_can_do_stop_sector(device)
801 			&& sector >= device->ov_stop_sector;
802 		if (stop_sector_reached)
803 			break;
804 
805 		size = BM_BLOCK_SIZE;
806 
807 		if (drbd_try_rs_begin_io(device, sector)) {
808 			device->ov_position = sector;
809 			goto requeue;
810 		}
811 
812 		if (sector + (size>>9) > capacity)
813 			size = (capacity-sector)<<9;
814 
815 		inc_rs_pending(device);
816 		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
817 			dec_rs_pending(device);
818 			return 0;
819 		}
820 		sector += BM_SECT_PER_BIT;
821 	}
822 	device->ov_position = sector;
823 
824  requeue:
825 	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
826 	if (i == 0 || !stop_sector_reached)
827 		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
828 	return 1;
829 }
830 
831 int w_ov_finished(struct drbd_work *w, int cancel)
832 {
833 	struct drbd_device_work *dw =
834 		container_of(w, struct drbd_device_work, w);
835 	struct drbd_device *device = dw->device;
836 	kfree(dw);
837 	ov_out_of_sync_print(device);
838 	drbd_resync_finished(device);
839 
840 	return 0;
841 }
842 
843 static int w_resync_finished(struct drbd_work *w, int cancel)
844 {
845 	struct drbd_device_work *dw =
846 		container_of(w, struct drbd_device_work, w);
847 	struct drbd_device *device = dw->device;
848 	kfree(dw);
849 
850 	drbd_resync_finished(device);
851 
852 	return 0;
853 }
854 
855 static void ping_peer(struct drbd_device *device)
856 {
857 	struct drbd_connection *connection = first_peer_device(device)->connection;
858 
859 	clear_bit(GOT_PING_ACK, &connection->flags);
860 	request_ping(connection);
861 	wait_event(connection->ping_wait,
862 		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
863 }
864 
865 int drbd_resync_finished(struct drbd_device *device)
866 {
867 	struct drbd_connection *connection = first_peer_device(device)->connection;
868 	unsigned long db, dt, dbdt;
869 	unsigned long n_oos;
870 	union drbd_state os, ns;
871 	struct drbd_device_work *dw;
872 	char *khelper_cmd = NULL;
873 	int verify_done = 0;
874 
875 	/* Remove all elements from the resync LRU. Since future actions
876 	 * might set bits in the (main) bitmap, the entries in the
877 	 * resync LRU would otherwise be wrong. */
878 	if (drbd_rs_del_all(device)) {
879 		/* This is not possible right now, most probably because
880 		 * there are P_RS_DATA_REPLY packets lingering on the worker's
881 		 * queue (or the read operations for those packets
882 		 * are not finished yet).  Retry in 100ms. */
883 
884 		schedule_timeout_interruptible(HZ / 10);
885 		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
886 		if (dw) {
887 			dw->w.cb = w_resync_finished;
888 			dw->device = device;
889 			drbd_queue_work(&connection->sender_work, &dw->w);
890 			return 1;
891 		}
892 		drbd_err(device, "Failed to drbd_rs_del_all() and to kmalloc(dw).\n");
893 	}
894 
895 	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
896 	if (dt <= 0)
897 		dt = 1;
898 
899 	db = device->rs_total;
900 	/* adjust for verify start and stop sectors, respectively the reached position */
901 	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
902 		db -= device->ov_left;
903 
904 	dbdt = Bit2KB(db/dt);
905 	device->rs_paused /= HZ;
906 
907 	if (!get_ldev(device))
908 		goto out;
909 
910 	ping_peer(device);
911 
912 	spin_lock_irq(&device->resource->req_lock);
913 	os = drbd_read_state(device);
914 
915 	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
916 
917 	/* This protects us against multiple calls (that can happen in the presence
918 	   of application IO), and against connectivity loss just before we arrive here. */
919 	if (os.conn <= C_CONNECTED)
920 		goto out_unlock;
921 
922 	ns = os;
923 	ns.conn = C_CONNECTED;
924 
925 	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
926 	     verify_done ? "Online verify" : "Resync",
927 	     dt + device->rs_paused, device->rs_paused, dbdt);
928 
929 	n_oos = drbd_bm_total_weight(device);
930 
931 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
932 		if (n_oos) {
933 			drbd_alert(device, "Online verify found %lu %dk blocks out of sync!\n",
934 			      n_oos, Bit2KB(1));
935 			khelper_cmd = "out-of-sync";
936 		}
937 	} else {
938 		D_ASSERT(device, (n_oos - device->rs_failed) == 0);
939 
940 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
941 			khelper_cmd = "after-resync-target";
942 
943 		if (device->use_csums && device->rs_total) {
944 			const unsigned long s = device->rs_same_csum;
945 			const unsigned long t = device->rs_total;
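			/* Explanatory note, not part of the original code: the
			 * two branches below presumably keep the percentage in
			 * integer range: for small t, (s*100)/t preserves
			 * precision, while for t >= 100000 dividing t first
			 * avoids overflowing s * 100 on 32-bit longs. */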
946 			const int ratio =
947 				(t == 0)     ? 0 :
948 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
949 			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
950 			     "transferred %luK total %luK\n",
951 			     ratio,
952 			     Bit2KB(device->rs_same_csum),
953 			     Bit2KB(device->rs_total - device->rs_same_csum),
954 			     Bit2KB(device->rs_total));
955 		}
956 	}
957 
958 	if (device->rs_failed) {
959 		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
960 
961 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
962 			ns.disk = D_INCONSISTENT;
963 			ns.pdsk = D_UP_TO_DATE;
964 		} else {
965 			ns.disk = D_UP_TO_DATE;
966 			ns.pdsk = D_INCONSISTENT;
967 		}
968 	} else {
969 		ns.disk = D_UP_TO_DATE;
970 		ns.pdsk = D_UP_TO_DATE;
971 
972 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
973 			if (device->p_uuid) {
974 				int i;
975 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
976 					_drbd_uuid_set(device, i, device->p_uuid[i]);
977 				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
978 				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
979 			} else {
980 				drbd_err(device, "device->p_uuid is NULL! BUG\n");
981 			}
982 		}
983 
984 		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
985 			/* for verify runs, we don't update uuids here,
986 			 * so there would be nothing to report. */
987 			drbd_uuid_set_bm(device, 0UL);
988 			drbd_print_uuids(device, "updated UUIDs");
989 			if (device->p_uuid) {
990 				/* Now the two UUID sets are equal, update what we
991 				 * know of the peer. */
992 				int i;
993 				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
994 					device->p_uuid[i] = device->ldev->md.uuid[i];
995 			}
996 		}
997 	}
998 
999 	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
1000 out_unlock:
1001 	spin_unlock_irq(&device->resource->req_lock);
1002 
1003 	/* If we have been sync source, and have an effective fencing-policy,
1004 	 * once *all* volumes are back in sync, call "unfence". */
1005 	if (os.conn == C_SYNC_SOURCE) {
1006 		enum drbd_disk_state disk_state = D_MASK;
1007 		enum drbd_disk_state pdsk_state = D_MASK;
1008 		enum drbd_fencing_p fp = FP_DONT_CARE;
1009 
1010 		rcu_read_lock();
1011 		fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1012 		if (fp != FP_DONT_CARE) {
1013 			struct drbd_peer_device *peer_device;
1014 			int vnr;
1015 			idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1016 				struct drbd_device *device = peer_device->device;
1017 				disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1018 				pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1019 			}
1020 		}
1021 		rcu_read_unlock();
1022 		if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1023 			conn_khelper(connection, "unfence-peer");
1024 	}
1025 
1026 	put_ldev(device);
1027 out:
1028 	device->rs_total  = 0;
1029 	device->rs_failed = 0;
1030 	device->rs_paused = 0;
1031 
1032 	/* reset start sector, if we reached end of device */
1033 	if (verify_done && device->ov_left == 0)
1034 		device->ov_start_sector = 0;
1035 
1036 	drbd_md_sync(device);
1037 
1038 	if (khelper_cmd)
1039 		drbd_khelper(device, khelper_cmd);
1040 
1041 	return 1;
1042 }
1043 
1044 /* helper */
1045 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1046 {
1047 	if (drbd_peer_req_has_active_page(peer_req)) {
1048 		/* This might happen if sendpage() has not finished */
1049 		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1050 		atomic_add(i, &device->pp_in_use_by_net);
1051 		atomic_sub(i, &device->pp_in_use);
1052 		spin_lock_irq(&device->resource->req_lock);
1053 		list_add_tail(&peer_req->w.list, &device->net_ee);
1054 		spin_unlock_irq(&device->resource->req_lock);
1055 		wake_up(&drbd_pp_wait);
1056 	} else
1057 		drbd_free_peer_req(device, peer_req);
1058 }
1059 
1060 /**
1061  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1062  * @w:		work object.
1063  * @cancel:	The connection will be closed anyway
1064  */
1065 int w_e_end_data_req(struct drbd_work *w, int cancel)
1066 {
1067 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1068 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1069 	struct drbd_device *device = peer_device->device;
1070 	int err;
1071 
1072 	if (unlikely(cancel)) {
1073 		drbd_free_peer_req(device, peer_req);
1074 		dec_unacked(device);
1075 		return 0;
1076 	}
1077 
1078 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1079 		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1080 	} else {
1081 		if (__ratelimit(&drbd_ratelimit_state))
1082 			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1083 			    (unsigned long long)peer_req->i.sector);
1084 
1085 		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1086 	}
1087 
1088 	dec_unacked(device);
1089 
1090 	move_to_net_ee_or_free(device, peer_req);
1091 
1092 	if (unlikely(err))
1093 		drbd_err(device, "drbd_send_block() failed\n");
1094 	return err;
1095 }
1096 
1097 static bool all_zero(struct drbd_peer_request *peer_req)
1098 {
1099 	struct page *page = peer_req->pages;
1100 	unsigned int len = peer_req->i.size;
1101 
1102 	page_chain_for_each(page) {
1103 		unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1104 		unsigned int i, words = l / sizeof(long);
1105 		unsigned long *d;
1106 
1107 		d = kmap_atomic(page);
1108 		for (i = 0; i < words; i++) {
1109 			if (d[i]) {
1110 				kunmap_atomic(d);
1111 				return false;
1112 			}
1113 		}
1114 		kunmap_atomic(d);
1115 		len -= l;
1116 	}
1117 
1118 	return true;
1119 }
1120 
1121 /**
1122  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1123  * @w:		work object.
1124  * @cancel:	The connection will be closed anyway
1125  */
1126 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1127 {
1128 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1129 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1130 	struct drbd_device *device = peer_device->device;
1131 	int err;
1132 
1133 	if (unlikely(cancel)) {
1134 		drbd_free_peer_req(device, peer_req);
1135 		dec_unacked(device);
1136 		return 0;
1137 	}
1138 
1139 	if (get_ldev_if_state(device, D_FAILED)) {
1140 		drbd_rs_complete_io(device, peer_req->i.sector);
1141 		put_ldev(device);
1142 	}
1143 
1144 	if (device->state.conn == C_AHEAD) {
1145 		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1146 	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1147 		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1148 			inc_rs_pending(device);
1149 			if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1150 				err = drbd_send_rs_deallocated(peer_device, peer_req);
1151 			else
1152 				err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1153 		} else {
1154 			if (__ratelimit(&drbd_ratelimit_state))
1155 				drbd_err(device, "Not sending RSDataReply, "
1156 				    "partner DISKLESS!\n");
1157 			err = 0;
1158 		}
1159 	} else {
1160 		if (__ratelimit(&drbd_ratelimit_state))
1161 			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1162 			    (unsigned long long)peer_req->i.sector);
1163 
1164 		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1165 
1166 		/* update resync data with failure */
1167 		drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1168 	}
1169 
1170 	dec_unacked(device);
1171 
1172 	move_to_net_ee_or_free(device, peer_req);
1173 
1174 	if (unlikely(err))
1175 		drbd_err(device, "drbd_send_block() failed\n");
1176 	return err;
1177 }
1178 
1179 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1180 {
1181 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1182 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1183 	struct drbd_device *device = peer_device->device;
1184 	struct digest_info *di;
1185 	int digest_size;
1186 	void *digest = NULL;
1187 	int err, eq = 0;
1188 
1189 	if (unlikely(cancel)) {
1190 		drbd_free_peer_req(device, peer_req);
1191 		dec_unacked(device);
1192 		return 0;
1193 	}
1194 
1195 	if (get_ldev(device)) {
1196 		drbd_rs_complete_io(device, peer_req->i.sector);
1197 		put_ldev(device);
1198 	}
1199 
1200 	di = peer_req->digest;
1201 
1202 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1203 		/* quick hack to try to avoid a race against reconfiguration.
1204 		 * a real fix would be much more involved,
1205 		 * introducing more locking mechanisms */
1206 		if (peer_device->connection->csums_tfm) {
1207 			digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
1208 			D_ASSERT(device, digest_size == di->digest_size);
1209 			digest = kmalloc(digest_size, GFP_NOIO);
1210 		}
1211 		if (digest) {
1212 			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1213 			eq = !memcmp(digest, di->digest, digest_size);
1214 			kfree(digest);
1215 		}
1216 
1217 		if (eq) {
1218 			drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1219 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1220 			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1221 			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1222 		} else {
1223 			inc_rs_pending(device);
1224 			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1225 			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1226 			kfree(di);
1227 			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1228 		}
1229 	} else {
1230 		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1231 		if (__ratelimit(&drbd_ratelimit_state))
1232 			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1233 	}
1234 
1235 	dec_unacked(device);
1236 	move_to_net_ee_or_free(device, peer_req);
1237 
1238 	if (unlikely(err))
1239 		drbd_err(device, "drbd_send_block/ack() failed\n");
1240 	return err;
1241 }
1242 
1243 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1244 {
1245 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1246 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1247 	struct drbd_device *device = peer_device->device;
1248 	sector_t sector = peer_req->i.sector;
1249 	unsigned int size = peer_req->i.size;
1250 	int digest_size;
1251 	void *digest;
1252 	int err = 0;
1253 
1254 	if (unlikely(cancel))
1255 		goto out;
1256 
1257 	digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1258 	digest = kmalloc(digest_size, GFP_NOIO);
1259 	if (!digest) {
1260 		err = 1;	/* terminate the connection in case the allocation failed */
1261 		goto out;
1262 	}
1263 
1264 	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1265 		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1266 	else
1267 		memset(digest, 0, digest_size);
1268 
1269 	/* Free peer_req and pages before send.
1270 	 * In case we block on congestion, we could otherwise run into
1271 	 * some distributed deadlock, if the other side blocks on
1272 	 * congestion as well, because our receiver blocks in
1273 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1274 	drbd_free_peer_req(device, peer_req);
1275 	peer_req = NULL;
1276 	inc_rs_pending(device);
1277 	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1278 	if (err)
1279 		dec_rs_pending(device);
1280 	kfree(digest);
1281 
1282 out:
1283 	if (peer_req)
1284 		drbd_free_peer_req(device, peer_req);
1285 	dec_unacked(device);
1286 	return err;
1287 }
1288 
1289 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1290 {
1291 	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1292 		device->ov_last_oos_size += size>>9;
1293 	} else {
1294 		device->ov_last_oos_start = sector;
1295 		device->ov_last_oos_size = size>>9;
1296 	}
1297 	drbd_set_out_of_sync(device, sector, size);
1298 }
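
/* Illustrative example, not part of the original code: the function above
 * coalesces adjacent out-of-sync ranges found by online verify.  If the last
 * recorded range started at sector 1000 with a size of 8 sectors and the next
 * bad block starts at sector 1008, the existing range is simply extended;
 * a non-adjacent hit starts a new range, so ov_out_of_sync_print() can later
 * report whole ranges instead of individual 4 KiB blocks.
 */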
1299 
1300 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1301 {
1302 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1303 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1304 	struct drbd_device *device = peer_device->device;
1305 	struct digest_info *di;
1306 	void *digest;
1307 	sector_t sector = peer_req->i.sector;
1308 	unsigned int size = peer_req->i.size;
1309 	int digest_size;
1310 	int err, eq = 0;
1311 	bool stop_sector_reached = false;
1312 
1313 	if (unlikely(cancel)) {
1314 		drbd_free_peer_req(device, peer_req);
1315 		dec_unacked(device);
1316 		return 0;
1317 	}
1318 
1319 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1320 	 * the resync lru has been cleaned up already */
1321 	if (get_ldev(device)) {
1322 		drbd_rs_complete_io(device, peer_req->i.sector);
1323 		put_ldev(device);
1324 	}
1325 
1326 	di = peer_req->digest;
1327 
1328 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1329 		digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1330 		digest = kmalloc(digest_size, GFP_NOIO);
1331 		if (digest) {
1332 			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1333 
1334 			D_ASSERT(device, digest_size == di->digest_size);
1335 			eq = !memcmp(digest, di->digest, digest_size);
1336 			kfree(digest);
1337 		}
1338 	}
1339 
1340 	/* Free peer_req and pages before send.
1341 	 * In case we block on congestion, we could otherwise run into
1342 	 * some distributed deadlock, if the other side blocks on
1343 	 * congestion as well, because our receiver blocks in
1344 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1345 	drbd_free_peer_req(device, peer_req);
1346 	if (!eq)
1347 		drbd_ov_out_of_sync_found(device, sector, size);
1348 	else
1349 		ov_out_of_sync_print(device);
1350 
1351 	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1352 			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1353 
1354 	dec_unacked(device);
1355 
1356 	--device->ov_left;
1357 
1358 	/* let's advance progress step marks only for every other megabyte */
1359 	if ((device->ov_left & 0x200) == 0x200)
1360 		drbd_advance_rs_marks(device, device->ov_left);
1361 
1362 	stop_sector_reached = verify_can_do_stop_sector(device) &&
1363 		(sector + (size>>9)) >= device->ov_stop_sector;
1364 
1365 	if (device->ov_left == 0 || stop_sector_reached) {
1366 		ov_out_of_sync_print(device);
1367 		drbd_resync_finished(device);
1368 	}
1369 
1370 	return err;
1371 }
1372 
1373 /* FIXME
1374  * We need to track the number of pending barrier acks,
1375  * and to be able to wait for them.
1376  * See also comment in drbd_adm_attach before drbd_suspend_io.
1377  */
1378 static int drbd_send_barrier(struct drbd_connection *connection)
1379 {
1380 	struct p_barrier *p;
1381 	struct drbd_socket *sock;
1382 
1383 	sock = &connection->data;
1384 	p = conn_prepare_command(connection, sock);
1385 	if (!p)
1386 		return -EIO;
1387 	p->barrier = connection->send.current_epoch_nr;
1388 	p->pad = 0;
1389 	connection->send.current_epoch_writes = 0;
1390 	connection->send.last_sent_barrier_jif = jiffies;
1391 
1392 	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1393 }
1394 
1395 static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1396 {
1397 	struct drbd_socket *sock = &pd->connection->data;
1398 	if (!drbd_prepare_command(pd, sock))
1399 		return -EIO;
1400 	return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1401 }
1402 
1403 int w_send_write_hint(struct drbd_work *w, int cancel)
1404 {
1405 	struct drbd_device *device =
1406 		container_of(w, struct drbd_device, unplug_work);
1407 
1408 	if (cancel)
1409 		return 0;
1410 	return pd_send_unplug_remote(first_peer_device(device));
1411 }
1412 
1413 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1414 {
1415 	if (!connection->send.seen_any_write_yet) {
1416 		connection->send.seen_any_write_yet = true;
1417 		connection->send.current_epoch_nr = epoch;
1418 		connection->send.current_epoch_writes = 0;
1419 		connection->send.last_sent_barrier_jif = jiffies;
1420 	}
1421 }
1422 
1423 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1424 {
1425 	/* nothing to do before the first write on this connection */
1426 	if (!connection->send.seen_any_write_yet)
1427 		return;
1428 	if (connection->send.current_epoch_nr != epoch) {
1429 		if (connection->send.current_epoch_writes)
1430 			drbd_send_barrier(connection);
1431 		connection->send.current_epoch_nr = epoch;
1432 	}
1433 }
1434 
1435 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1436 {
1437 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1438 	struct drbd_device *device = req->device;
1439 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1440 	struct drbd_connection *const connection = peer_device->connection;
1441 	int err;
1442 
1443 	if (unlikely(cancel)) {
1444 		req_mod(req, SEND_CANCELED);
1445 		return 0;
1446 	}
1447 	req->pre_send_jif = jiffies;
1448 
1449 	/* this time, no connection->send.current_epoch_writes++;
1450 	 * If it was sent, it was the closing barrier for the last
1451 	 * replicated epoch, before we went into AHEAD mode.
1452 	 * No more barriers will be sent, until we leave AHEAD mode again. */
1453 	maybe_send_barrier(connection, req->epoch);
1454 
1455 	err = drbd_send_out_of_sync(peer_device, req);
1456 	req_mod(req, OOS_HANDED_TO_NETWORK);
1457 
1458 	return err;
1459 }
1460 
1461 /**
1462  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1463  * @w:		work object.
1464  * @cancel:	The connection will be closed anyway
1465  */
1466 int w_send_dblock(struct drbd_work *w, int cancel)
1467 {
1468 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1469 	struct drbd_device *device = req->device;
1470 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1471 	struct drbd_connection *connection = peer_device->connection;
1472 	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1473 	int err;
1474 
1475 	if (unlikely(cancel)) {
1476 		req_mod(req, SEND_CANCELED);
1477 		return 0;
1478 	}
1479 	req->pre_send_jif = jiffies;
1480 
1481 	re_init_if_first_write(connection, req->epoch);
1482 	maybe_send_barrier(connection, req->epoch);
1483 	connection->send.current_epoch_writes++;
1484 
1485 	err = drbd_send_dblock(peer_device, req);
1486 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1487 
1488 	if (do_send_unplug && !err)
1489 		pd_send_unplug_remote(peer_device);
1490 
1491 	return err;
1492 }
1493 
1494 /**
1495  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1496  * @w:		work object.
1497  * @cancel:	The connection will be closed anyway
1498  */
1499 int w_send_read_req(struct drbd_work *w, int cancel)
1500 {
1501 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1502 	struct drbd_device *device = req->device;
1503 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1504 	struct drbd_connection *connection = peer_device->connection;
1505 	bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1506 	int err;
1507 
1508 	if (unlikely(cancel)) {
1509 		req_mod(req, SEND_CANCELED);
1510 		return 0;
1511 	}
1512 	req->pre_send_jif = jiffies;
1513 
1514 	/* Even read requests may close a write epoch,
1515 	 * if one has been opened yet. */
1516 	maybe_send_barrier(connection, req->epoch);
1517 
1518 	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1519 				 (unsigned long)req);
1520 
1521 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1522 
1523 	if (do_send_unplug && !err)
1524 		pd_send_unplug_remote(peer_device);
1525 
1526 	return err;
1527 }
1528 
1529 int w_restart_disk_io(struct drbd_work *w, int cancel)
1530 {
1531 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1532 	struct drbd_device *device = req->device;
1533 
1534 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1535 		drbd_al_begin_io(device, &req->i);
1536 
1537 	drbd_req_make_private_bio(req, req->master_bio);
1538 	bio_set_dev(req->private_bio, device->ldev->backing_bdev);
1539 	generic_make_request(req->private_bio);
1540 
1541 	return 0;
1542 }
1543 
1544 static int _drbd_may_sync_now(struct drbd_device *device)
1545 {
1546 	struct drbd_device *odev = device;
1547 	int resync_after;
1548 
1549 	while (1) {
1550 		if (!odev->ldev || odev->state.disk == D_DISKLESS)
1551 			return 1;
1552 		rcu_read_lock();
1553 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1554 		rcu_read_unlock();
1555 		if (resync_after == -1)
1556 			return 1;
1557 		odev = minor_to_device(resync_after);
1558 		if (!odev)
1559 			return 1;
1560 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1561 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1562 		    odev->state.aftr_isp || odev->state.peer_isp ||
1563 		    odev->state.user_isp)
1564 			return 0;
1565 	}
1566 }
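
/* Illustrative note, not part of the original code: with resync-after
 * dependencies configured as minor 2 -> minor 1 -> minor 0, the loop above
 * walks that chain and lets minor 2 resync only while neither minor 1 nor
 * minor 0 is currently resyncing (or paused via any of the *_isp flags).
 */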
1567 
1568 /**
1569  * drbd_pause_after() - Pause resync on all devices that may not resync now
1570  * @device:	DRBD device.
1571  *
1572  * Called from process context only (admin command and after_state_ch).
1573  */
1574 static bool drbd_pause_after(struct drbd_device *device)
1575 {
1576 	bool changed = false;
1577 	struct drbd_device *odev;
1578 	int i;
1579 
1580 	rcu_read_lock();
1581 	idr_for_each_entry(&drbd_devices, odev, i) {
1582 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1583 			continue;
1584 		if (!_drbd_may_sync_now(odev) &&
1585 		    _drbd_set_state(_NS(odev, aftr_isp, 1),
1586 				    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1587 			changed = true;
1588 	}
1589 	rcu_read_unlock();
1590 
1591 	return changed;
1592 }
1593 
1594 /**
1595  * drbd_resume_next() - Resume resync on all devices that may resync now
1596  * @device:	DRBD device.
1597  *
1598  * Called from process context only (admin command and worker).
1599  */
1600 static bool drbd_resume_next(struct drbd_device *device)
1601 {
1602 	bool changed = false;
1603 	struct drbd_device *odev;
1604 	int i;
1605 
1606 	rcu_read_lock();
1607 	idr_for_each_entry(&drbd_devices, odev, i) {
1608 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1609 			continue;
1610 		if (odev->state.aftr_isp) {
1611 			if (_drbd_may_sync_now(odev) &&
1612 			    _drbd_set_state(_NS(odev, aftr_isp, 0),
1613 					    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1614 				changed = true;
1615 		}
1616 	}
1617 	rcu_read_unlock();
1618 	return changed;
1619 }
1620 
1621 void resume_next_sg(struct drbd_device *device)
1622 {
1623 	lock_all_resources();
1624 	drbd_resume_next(device);
1625 	unlock_all_resources();
1626 }
1627 
1628 void suspend_other_sg(struct drbd_device *device)
1629 {
1630 	lock_all_resources();
1631 	drbd_pause_after(device);
1632 	unlock_all_resources();
1633 }
1634 
1635 /* caller must lock_all_resources() */
1636 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1637 {
1638 	struct drbd_device *odev;
1639 	int resync_after;
1640 
1641 	if (o_minor == -1)
1642 		return NO_ERROR;
1643 	if (o_minor < -1 || o_minor > MINORMASK)
1644 		return ERR_RESYNC_AFTER;
1645 
1646 	/* check for loops */
1647 	odev = minor_to_device(o_minor);
1648 	while (1) {
1649 		if (odev == device)
1650 			return ERR_RESYNC_AFTER_CYCLE;
1651 
1652 		/* You are free to depend on diskless, non-existing,
1653 		 * or not yet/no longer existing minors.
1654 		 * We only reject dependency loops.
1655 		 * We cannot follow the dependency chain beyond a detached or
1656 		 * missing minor.
1657 		 */
1658 		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1659 			return NO_ERROR;
1660 
1661 		rcu_read_lock();
1662 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1663 		rcu_read_unlock();
1664 		/* dependency chain ends here, no cycles. */
1665 		if (resync_after == -1)
1666 			return NO_ERROR;
1667 
1668 		/* follow the dependency chain */
1669 		odev = minor_to_device(resync_after);
1670 	}
1671 }
1672 
1673 /* caller must lock_all_resources() */
1674 void drbd_resync_after_changed(struct drbd_device *device)
1675 {
1676 	int changed;
1677 
1678 	do {
1679 		changed  = drbd_pause_after(device);
1680 		changed |= drbd_resume_next(device);
1681 	} while (changed);
1682 }
1683 
1684 void drbd_rs_controller_reset(struct drbd_device *device)
1685 {
1686 	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1687 	struct fifo_buffer *plan;
1688 
1689 	atomic_set(&device->rs_sect_in, 0);
1690 	atomic_set(&device->rs_sect_ev, 0);
1691 	device->rs_in_flight = 0;
1692 	device->rs_last_events = (int)part_stat_read_accum(&disk->part0, sectors);
1693 
1694 	/* Updating the RCU protected object in place is necessary since
1695 	   this function gets called from atomic context.
1696 	   It is valid since all other updates also lead to a completely
1697 	   empty fifo */
1698 	rcu_read_lock();
1699 	plan = rcu_dereference(device->rs_plan_s);
1700 	plan->total = 0;
1701 	fifo_set(plan, 0);
1702 	rcu_read_unlock();
1703 }
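/* This reset is used when a resync (re)starts, see the call in
 * drbd_start_resync() below.  Zeroing rs_sect_in/rs_sect_ev, the in-flight
 * counter and the rate-plan fifo lets the dynamic resync-rate controller
 * start from a clean state instead of reacting to stale measurements.
 */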
1704 
1705 void start_resync_timer_fn(struct timer_list *t)
1706 {
1707 	struct drbd_device *device = from_timer(device, t, start_resync_timer);
1708 	drbd_device_post_work(device, RS_START);
1709 }
1710 
1711 static void do_start_resync(struct drbd_device *device)
1712 {
1713 	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1714 		drbd_warn(device, "postponing start_resync ...\n");
1715 		device->start_resync_timer.expires = jiffies + HZ/10;
1716 		add_timer(&device->start_resync_timer);
1717 		return;
1718 	}
1719 
1720 	drbd_start_resync(device, C_SYNC_SOURCE);
1721 	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1722 }
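/* do_start_resync() is the handler for the RS_START device work bit queued
 * by start_resync_timer_fn() above.  While acknowledgements or resync
 * replies are still outstanding, it merely re-arms the timer (HZ/10, i.e.
 * 100 ms) and retries later instead of blocking the worker.
 */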
1723 
1724 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1725 {
1726 	bool csums_after_crash_only;
1727 	rcu_read_lock();
1728 	csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1729 	rcu_read_unlock();
1730 	return connection->agreed_pro_version >= 89 &&		/* supported? */
1731 		connection->csums_tfm &&			/* configured? */
1732 		(csums_after_crash_only == false		/* use for each resync? */
1733 		 || test_bit(CRASHED_PRIMARY, &device->flags));	/* or only after Primary crash? */
1734 }
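/* In short, the helper above enables checksum-based resync only if the peer
 * speaks protocol 89 or newer, a csums algorithm is configured, and either
 * csums-after-crash-only is disabled or CRASHED_PRIMARY is set on the
 * device.
 */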
1735 
1736 /**
1737  * drbd_start_resync() - Start the resync process
1738  * @device:	DRBD device.
1739  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1740  *
1741  * This function might bring you directly into one of the
1742  * C_PAUSED_SYNC_* states.
1743  */
1744 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1745 {
1746 	struct drbd_peer_device *peer_device = first_peer_device(device);
1747 	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1748 	union drbd_state ns;
1749 	int r;
1750 
1751 	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1752 		drbd_err(device, "Resync already running!\n");
1753 		return;
1754 	}
1755 
1756 	if (!connection) {
1757 		drbd_err(device, "No connection to peer, aborting!\n");
1758 		return;
1759 	}
1760 
1761 	if (!test_bit(B_RS_H_DONE, &device->flags)) {
1762 		if (side == C_SYNC_TARGET) {
1763 			/* Since application IO was locked out during C_WF_BITMAP_T and
1764 			   C_WF_SYNC_UUID, we are still unmodified.  Before going to C_SYNC_TARGET,
1765 			   which marks our data inconsistent, give the handler a chance to veto. */
1766 			r = drbd_khelper(device, "before-resync-target");
1767 			r = (r >> 8) & 0xff;
1768 			if (r > 0) {
1769 				drbd_info(device, "before-resync-target handler returned %d, "
1770 					 "dropping connection.\n", r);
1771 				conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1772 				return;
1773 			}
1774 		} else /* C_SYNC_SOURCE */ {
1775 			r = drbd_khelper(device, "before-resync-source");
1776 			r = (r >> 8) & 0xff;
1777 			if (r > 0) {
1778 				if (r == 3) {
1779 					drbd_info(device, "before-resync-source handler returned %d, "
1780 						 "ignoring. Old userland tools?\n", r);
1781 				} else {
1782 					drbd_info(device, "before-resync-source handler returned %d, "
1783 						 "dropping connection.\n", r);
1784 					conn_request_state(connection,
1785 							   NS(conn, C_DISCONNECTING), CS_HARD);
1786 					return;
1787 				}
1788 			}
1789 		}
1790 	}
1791 
1792 	if (current == connection->worker.task) {
1793 		/* The worker should not sleep waiting for state_mutex,
1794 		   as that can take a long time */
1795 		if (!mutex_trylock(device->state_mutex)) {
1796 			set_bit(B_RS_H_DONE, &device->flags);
1797 			device->start_resync_timer.expires = jiffies + HZ/5;
1798 			add_timer(&device->start_resync_timer);
1799 			return;
1800 		}
1801 	} else {
1802 		mutex_lock(device->state_mutex);
1803 	}
1804 
1805 	lock_all_resources();
1806 	clear_bit(B_RS_H_DONE, &device->flags);
1807 	/* Did some connection breakage or IO error race with us? */
1808 	if (device->state.conn < C_CONNECTED
1809 	|| !get_ldev_if_state(device, D_NEGOTIATING)) {
1810 		unlock_all_resources();
1811 		goto out;
1812 	}
1813 
1814 	ns = drbd_read_state(device);
1815 
1816 	ns.aftr_isp = !_drbd_may_sync_now(device);
1817 
1818 	ns.conn = side;
1819 
1820 	if (side == C_SYNC_TARGET)
1821 		ns.disk = D_INCONSISTENT;
1822 	else /* side == C_SYNC_SOURCE */
1823 		ns.pdsk = D_INCONSISTENT;
1824 
1825 	r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1826 	ns = drbd_read_state(device);
1827 
1828 	if (ns.conn < C_CONNECTED)
1829 		r = SS_UNKNOWN_ERROR;
1830 
1831 	if (r == SS_SUCCESS) {
1832 		unsigned long tw = drbd_bm_total_weight(device);
1833 		unsigned long now = jiffies;
1834 		int i;
1835 
1836 		device->rs_failed    = 0;
1837 		device->rs_paused    = 0;
1838 		device->rs_same_csum = 0;
1839 		device->rs_last_sect_ev = 0;
1840 		device->rs_total     = tw;
1841 		device->rs_start     = now;
1842 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1843 			device->rs_mark_left[i] = tw;
1844 			device->rs_mark_time[i] = now;
1845 		}
1846 		drbd_pause_after(device);
1847 		/* Forget potentially stale cached per-resync-extent bit-counts.
1848 		 * Open-coded drbd_rs_cancel_all(device): we already have IRQs
1849 		 * disabled, and know the disk state is ok. */
1850 		spin_lock(&device->al_lock);
1851 		lc_reset(device->resync);
1852 		device->resync_locked = 0;
1853 		device->resync_wenr = LC_FREE;
1854 		spin_unlock(&device->al_lock);
1855 	}
1856 	unlock_all_resources();
1857 
1858 	if (r == SS_SUCCESS) {
1859 		wake_up(&device->al_wait); /* for lc_reset() above */
1860 		/* reset rs_last_bcast when a resync or verify is started,
1861 		 * to deal with potential jiffies wrap. */
1862 		device->rs_last_bcast = jiffies - HZ;
1863 
1864 		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1865 		     drbd_conn_str(ns.conn),
1866 		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1867 		     (unsigned long) device->rs_total);
1868 		if (side == C_SYNC_TARGET) {
1869 			device->bm_resync_fo = 0;
1870 			device->use_csums = use_checksum_based_resync(connection, device);
1871 		} else {
1872 			device->use_csums = false;
1873 		}
1874 
1875 		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1876 		 * with w_send_oos, or the sync target will get confused as to
1877 		 * how many bits to resync.  We cannot always do that, because for an
1878 		 * empty resync and protocol < 95, we need to do it here, as we call
1879 		 * drbd_resync_finished from here in that case.
1880 		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1881 		 * and from after_state_ch otherwise. */
1882 		if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1883 			drbd_gen_and_send_sync_uuid(peer_device);
1884 
1885 		if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1886 			/* This still has a race (about when exactly the peers
1887 			 * detect connection loss) that can lead to a full sync
1888 			 * on next handshake. In 8.3.9 we fixed this with explicit
1889 			 * resync-finished notifications, but the fix
1890 			 * introduces a protocol change.  Sleeping for some
1891 			 * time longer than the ping interval + timeout on the
1892 			 * SyncSource, to give the SyncTarget the chance to
1893 			 * detect connection loss, then waiting for a ping
1894 			 * response (implicit in drbd_resync_finished) reduces
1895 			 * the race considerably, but does not solve it. */
1896 			if (side == C_SYNC_SOURCE) {
1897 				struct net_conf *nc;
1898 				int timeo;
1899 
1900 				rcu_read_lock();
1901 				nc = rcu_dereference(connection->net_conf);
1902 				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1903 				rcu_read_unlock();
1904 				schedule_timeout_interruptible(timeo);
1905 			}
1906 			drbd_resync_finished(device);
1907 		}
1908 
1909 		drbd_rs_controller_reset(device);
1910 		/* ns.conn may already be != device->state.conn,
1911 		 * we may have been paused in between, or become paused until
1912 		 * the timer triggers.
1913 		 * Either way, that is handled in resync_timer_fn() */
1914 		if (ns.conn == C_SYNC_TARGET)
1915 			mod_timer(&device->resync_timer, jiffies);
1916 
1917 		drbd_md_sync(device);
1918 	}
1919 	put_ldev(device);
1920 out:
1921 	mutex_unlock(device->state_mutex);
1922 }
1923 
1924 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1925 {
1926 	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1927 	device->rs_last_bcast = jiffies;
1928 
1929 	if (!get_ldev(device))
1930 		return;
1931 
1932 	drbd_bm_write_lazy(device, 0);
1933 	if (resync_done && is_sync_state(device->state.conn))
1934 		drbd_resync_finished(device);
1935 
1936 	drbd_bcast_event(device, &sib);
1937 	/* update timestamp, in case it took a while to write out stuff */
1938 	device->rs_last_bcast = jiffies;
1939 	put_ldev(device);
1940 }
1941 
1942 static void drbd_ldev_destroy(struct drbd_device *device)
1943 {
1944 	lc_destroy(device->resync);
1945 	device->resync = NULL;
1946 	lc_destroy(device->act_log);
1947 	device->act_log = NULL;
1948 
1949 	__acquire(local);
1950 	drbd_backing_dev_free(device, device->ldev);
1951 	device->ldev = NULL;
1952 	__release(local);
1953 
1954 	clear_bit(GOING_DISKLESS, &device->flags);
1955 	wake_up(&device->misc_wait);
1956 }
1957 
1958 static void go_diskless(struct drbd_device *device)
1959 {
1960 	D_ASSERT(device, device->state.disk == D_FAILED);
1961 	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1962 	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1963 	 * the protected members anymore, though, so once put_ldev reaches zero
1964 	 * again, it will be safe to free them. */
1965 
1966 	/* Try to write changed bitmap pages, read errors may have just
1967 	 * set some bits outside the area covered by the activity log.
1968 	 *
1969 	 * If we have an IO error during the bitmap writeout,
1970 	 * we will want a full sync next time, just in case.
1971 	 * (Do we want a specific meta data flag for this?)
1972 	 *
1973 	 * If that does not make it to stable storage either,
1974 	 * we cannot do anything about that anymore.
1975 	 *
1976 	 * We still need to check if both bitmap and ldev are present, we may
1977 	 * end up here after a failed attach, before ldev was even assigned.
1978 	 */
1979 	if (device->bitmap && device->ldev) {
1980 		/* An interrupted resync or similar is allowed to recount bits
1981 		 * while we detach.
1982 		 * Any modifications would not be expected anymore, though.
1983 		 */
1984 		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1985 					"detach", BM_LOCKED_TEST_ALLOWED)) {
1986 			if (test_bit(WAS_READ_ERROR, &device->flags)) {
1987 				drbd_md_set_flag(device, MDF_FULL_SYNC);
1988 				drbd_md_sync(device);
1989 			}
1990 		}
1991 	}
1992 
1993 	drbd_force_state(device, NS(disk, D_DISKLESS));
1994 }
1995 
1996 static int do_md_sync(struct drbd_device *device)
1997 {
1998 	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1999 	drbd_md_sync(device);
2000 	return 0;
2001 }
2002 
2003 /* only called from drbd_worker thread, no locking */
2004 void __update_timing_details(
2005 		struct drbd_thread_timing_details *tdp,
2006 		unsigned int *cb_nr,
2007 		void *cb,
2008 		const char *fn, const unsigned int line)
2009 {
2010 	unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
2011 	struct drbd_thread_timing_details *td = tdp + i;
2012 
2013 	td->start_jif = jiffies;
2014 	td->cb_addr = cb;
2015 	td->caller_fn = fn;
2016 	td->line = line;
2017 	td->cb_nr = *cb_nr;
2018 
2019 	i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2020 	td = tdp + i;
2021 	memset(td, 0, sizeof(*td));
2022 
2023 	++(*cb_nr);
2024 }
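/* The timing details above form a ring buffer of DRBD_THREAD_DETAILS_HIST
 * entries: each call records the current callback in slot *cb_nr modulo the
 * history size and zeroes the following slot, so the cleared entry always
 * marks the boundary between the newest and the oldest record when the
 * history is inspected.
 */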
2025 
2026 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2027 {
2028 	if (test_bit(MD_SYNC, &todo))
2029 		do_md_sync(device);
2030 	if (test_bit(RS_DONE, &todo) ||
2031 	    test_bit(RS_PROGRESS, &todo))
2032 		update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2033 	if (test_bit(GO_DISKLESS, &todo))
2034 		go_diskless(device);
2035 	if (test_bit(DESTROY_DISK, &todo))
2036 		drbd_ldev_destroy(device);
2037 	if (test_bit(RS_START, &todo))
2038 		do_start_resync(device);
2039 }
2040 
2041 #define DRBD_DEVICE_WORK_MASK	\
2042 	((1UL << GO_DISKLESS)	\
2043 	|(1UL << DESTROY_DISK)	\
2044 	|(1UL << MD_SYNC)	\
2045 	|(1UL << RS_START)	\
2046 	|(1UL << RS_PROGRESS)	\
2047 	|(1UL << RS_DONE)	\
2048 	)
2049 
2050 static unsigned long get_work_bits(unsigned long *flags)
2051 {
2052 	unsigned long old, new;
2053 	do {
2054 		old = *flags;
2055 		new = old & ~DRBD_DEVICE_WORK_MASK;
2056 	} while (cmpxchg(flags, old, new) != old);
2057 	return old & DRBD_DEVICE_WORK_MASK;
2058 }
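/* get_work_bits() claims all pending device work bits in one step: the
 * cmpxchg() loop atomically clears everything in DRBD_DEVICE_WORK_MASK and
 * returns the bits that were set, so a whole batch goes to exactly one
 * caller even if drbd_device_post_work() sets new bits concurrently.
 * A minimal usage sketch (mirroring do_unqueued_work() below):
 *
 *	unsigned long todo = get_work_bits(&device->flags);
 *	if (test_bit(RS_START, &todo))
 *		do_start_resync(device);
 */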
2059 
2060 static void do_unqueued_work(struct drbd_connection *connection)
2061 {
2062 	struct drbd_peer_device *peer_device;
2063 	int vnr;
2064 
2065 	rcu_read_lock();
2066 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2067 		struct drbd_device *device = peer_device->device;
2068 		unsigned long todo = get_work_bits(&device->flags);
2069 		if (!todo)
2070 			continue;
2071 
2072 		kref_get(&device->kref);
2073 		rcu_read_unlock();
2074 		do_device_work(device, todo);
2075 		kref_put(&device->kref, drbd_destroy_device);
2076 		rcu_read_lock();
2077 	}
2078 	rcu_read_unlock();
2079 }
2080 
2081 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2082 {
2083 	spin_lock_irq(&queue->q_lock);
2084 	list_splice_tail_init(&queue->q, work_list);
2085 	spin_unlock_irq(&queue->q_lock);
2086 	return !list_empty(work_list);
2087 }
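/* dequeue_work_batch() drains the whole queue with a single
 * list_splice_tail_init() while holding q_lock, so the potentially slow
 * work callbacks later run without the spinlock held.  It returns true if
 * anything was moved onto work_list.
 */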
2088 
2089 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2090 {
2091 	DEFINE_WAIT(wait);
2092 	struct net_conf *nc;
2093 	int uncork, cork;
2094 
2095 	dequeue_work_batch(&connection->sender_work, work_list);
2096 	if (!list_empty(work_list))
2097 		return;
2098 
2099 	/* Still nothing to do?
2100 	 * Maybe we still need to close the current epoch,
2101 	 * even if no new requests are queued yet.
2102 	 *
2103 	 * Also, poke TCP, just in case.
2104 	 * Then wait for new work (or signal). */
2105 	rcu_read_lock();
2106 	nc = rcu_dereference(connection->net_conf);
2107 	uncork = nc ? nc->tcp_cork : 0;
2108 	rcu_read_unlock();
2109 	if (uncork) {
2110 		mutex_lock(&connection->data.mutex);
2111 		if (connection->data.socket)
2112 			drbd_tcp_uncork(connection->data.socket);
2113 		mutex_unlock(&connection->data.mutex);
2114 	}
2115 
2116 	for (;;) {
2117 		int send_barrier;
2118 		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2119 		spin_lock_irq(&connection->resource->req_lock);
2120 		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2121 		if (!list_empty(&connection->sender_work.q))
2122 			list_splice_tail_init(&connection->sender_work.q, work_list);
2123 		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2124 		if (!list_empty(work_list) || signal_pending(current)) {
2125 			spin_unlock_irq(&connection->resource->req_lock);
2126 			break;
2127 		}
2128 
2129 		/* We found nothing new to do, no to-be-communicated request,
2130 		 * no other work item.  We may still need to close the last
2131 		 * epoch.  Next incoming request epoch will be connection ->
2132 		 * current transfer log epoch number.  If that is different
2133 		 * from the epoch of the last request we communicated, it is
2134 		 * safe to send the epoch separating barrier now.
2135 		 */
2136 		send_barrier =
2137 			atomic_read(&connection->current_tle_nr) !=
2138 			connection->send.current_epoch_nr;
2139 		spin_unlock_irq(&connection->resource->req_lock);
2140 
2141 		if (send_barrier)
2142 			maybe_send_barrier(connection,
2143 					connection->send.current_epoch_nr + 1);
2144 
2145 		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2146 			break;
2147 
2148 		/* drbd_send() may have called flush_signals() */
2149 		if (get_t_state(&connection->worker) != RUNNING)
2150 			break;
2151 
2152 		schedule();
2153 		/* may be woken up for things other than new work, too,
2154 		 * e.g. if the current epoch got closed.
2155 		 * In which case we send the barrier above. */
2156 	}
2157 	finish_wait(&connection->sender_work.q_wait, &wait);
2158 
2159 	/* someone may have changed the config while we have been waiting above. */
2160 	rcu_read_lock();
2161 	nc = rcu_dereference(connection->net_conf);
2162 	cork = nc ? nc->tcp_cork : 0;
2163 	rcu_read_unlock();
2164 	mutex_lock(&connection->data.mutex);
2165 	if (connection->data.socket) {
2166 		if (cork)
2167 			drbd_tcp_cork(connection->data.socket);
2168 		else if (!uncork)
2169 			drbd_tcp_uncork(connection->data.socket);
2170 	}
2171 	mutex_unlock(&connection->data.mutex);
2172 }
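/* Cork handling in wait_for_work() above: before sleeping, the data socket
 * is uncorked (when tcp_cork is configured) so any partially filled TCP
 * segment gets pushed out while the sender idles; once there is work again,
 * the socket is corked anew (if still configured) so subsequent packets can
 * be batched into full segments.
 */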
2173 
2174 int drbd_worker(struct drbd_thread *thi)
2175 {
2176 	struct drbd_connection *connection = thi->connection;
2177 	struct drbd_work *w = NULL;
2178 	struct drbd_peer_device *peer_device;
2179 	LIST_HEAD(work_list);
2180 	int vnr;
2181 
2182 	while (get_t_state(thi) == RUNNING) {
2183 		drbd_thread_current_set_cpu(thi);
2184 
2185 		if (list_empty(&work_list)) {
2186 			update_worker_timing_details(connection, wait_for_work);
2187 			wait_for_work(connection, &work_list);
2188 		}
2189 
2190 		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2191 			update_worker_timing_details(connection, do_unqueued_work);
2192 			do_unqueued_work(connection);
2193 		}
2194 
2195 		if (signal_pending(current)) {
2196 			flush_signals(current);
2197 			if (get_t_state(thi) == RUNNING) {
2198 				drbd_warn(connection, "Worker got an unexpected signal\n");
2199 				continue;
2200 			}
2201 			break;
2202 		}
2203 
2204 		if (get_t_state(thi) != RUNNING)
2205 			break;
2206 
2207 		if (!list_empty(&work_list)) {
2208 			w = list_first_entry(&work_list, struct drbd_work, list);
2209 			list_del_init(&w->list);
2210 			update_worker_timing_details(connection, w->cb);
2211 			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2212 				continue;
2213 			if (connection->cstate >= C_WF_REPORT_PARAMS)
2214 				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2215 		}
2216 	}
2217 
2218 	do {
2219 		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2220 			update_worker_timing_details(connection, do_unqueued_work);
2221 			do_unqueued_work(connection);
2222 		}
2223 		if (!list_empty(&work_list)) {
2224 			w = list_first_entry(&work_list, struct drbd_work, list);
2225 			list_del_init(&w->list);
2226 			update_worker_timing_details(connection, w->cb);
2227 			w->cb(w, 1);
2228 		} else
2229 			dequeue_work_batch(&connection->sender_work, &work_list);
2230 	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2231 
2232 	rcu_read_lock();
2233 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2234 		struct drbd_device *device = peer_device->device;
2235 		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2236 		kref_get(&device->kref);
2237 		rcu_read_unlock();
2238 		drbd_device_cleanup(device);
2239 		kref_put(&device->kref, drbd_destroy_device);
2240 		rcu_read_lock();
2241 	}
2242 	rcu_read_unlock();
2243 
2244 	return 0;
2245 }
2246