xref: /openbmc/linux/drivers/block/drbd/drbd_worker.c (revision a06c488d)
1 /*
2    drbd_worker.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 
24 */
25 
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37 
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
41 
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
44 
45 /* endio handlers:
46  *   drbd_md_endio (defined here)
47  *   drbd_request_endio (defined here)
48  *   drbd_peer_request_endio (defined here)
49  *   drbd_bm_endio (defined in drbd_bitmap.c)
50  *
51  * For all these callbacks, note the following:
52  * The callbacks will be called in irq context by the IDE drivers,
53  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54  * Try to get the locking right :)
55  *
56  */
57 
58 /* used for synchronous meta data and bitmap IO
59  * submitted by drbd_md_sync_page_io()
60  */
61 void drbd_md_endio(struct bio *bio)
62 {
63 	struct drbd_device *device;
64 
65 	device = bio->bi_private;
66 	device->md_io.error = bio->bi_error;
67 
68 	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
69 	 * to timeout on the lower level device, and eventually detach from it.
70 	 * If this io completion runs after that timeout expired, this
71 	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
72 	 * During normal operation, this only puts that extra reference
73 	 * down to 1 again.
74 	 * Make sure we first drop the reference, and only then signal
75 	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
76 	 * next drbd_md_sync_page_io(), that we trigger the
77 	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
78 	 */
79 	drbd_md_put_buffer(device);
80 	device->md_io.done = 1;
81 	wake_up(&device->misc_wait);
82 	bio_put(bio);
83 	if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
84 		put_ldev(device);
85 }
86 
87 /* reads on behalf of the partner,
88  * "submitted" by the receiver
89  */
90 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
91 {
92 	unsigned long flags = 0;
93 	struct drbd_peer_device *peer_device = peer_req->peer_device;
94 	struct drbd_device *device = peer_device->device;
95 
96 	spin_lock_irqsave(&device->resource->req_lock, flags);
97 	device->read_cnt += peer_req->i.size >> 9;
98 	list_del(&peer_req->w.list);
99 	if (list_empty(&device->read_ee))
100 		wake_up(&device->ee_wait);
101 	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
102 		__drbd_chk_io_error(device, DRBD_READ_ERROR);
103 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
104 
105 	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
106 	put_ldev(device);
107 }
108 
109 /* writes on behalf of the partner, or resync writes,
110  * "submitted" by the receiver, final stage.  */
111 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
112 {
113 	unsigned long flags = 0;
114 	struct drbd_peer_device *peer_device = peer_req->peer_device;
115 	struct drbd_device *device = peer_device->device;
116 	struct drbd_connection *connection = peer_device->connection;
117 	struct drbd_interval i;
118 	int do_wake;
119 	u64 block_id;
120 	int do_al_complete_io;
121 
122 	/* after we moved peer_req to done_ee,
123 	 * we may no longer access it,
124 	 * it may be freed/reused already!
125 	 * (as soon as we release the req_lock) */
126 	i = peer_req->i;
127 	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
128 	block_id = peer_req->block_id;
129 	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
130 
131 	spin_lock_irqsave(&device->resource->req_lock, flags);
132 	device->writ_cnt += peer_req->i.size >> 9;
133 	list_move_tail(&peer_req->w.list, &device->done_ee);
134 
135 	/*
136 	 * Do not remove from the write_requests tree here: we did not send the
137 	 * Ack yet and did not wake possibly waiting conflicting requests.
138 	 * Removal from the tree happens in "drbd_process_done_ee", within the
139 	 * appropriate dw.cb (e_end_block/e_end_resync_block), or in
140 	 * _drbd_clear_done_ee.
141 	 */
142 
143 	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
144 
145 	/* FIXME do we want to detach for failed REQ_DISCARD?
146 	 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
147 	if (peer_req->flags & EE_WAS_ERROR)
148 		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
149 
150 	if (connection->cstate >= C_WF_REPORT_PARAMS) {
151 		kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
152 		if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
153 			kref_put(&device->kref, drbd_destroy_device);
154 	}
155 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
156 
157 	if (block_id == ID_SYNCER)
158 		drbd_rs_complete_io(device, i.sector);
159 
160 	if (do_wake)
161 		wake_up(&device->ee_wait);
162 
163 	if (do_al_complete_io)
164 		drbd_al_complete_io(device, &i);
165 
166 	put_ldev(device);
167 }
168 
169 /* writes on behalf of the partner, or resync writes,
170  * "submitted" by the receiver.
171  */
172 void drbd_peer_request_endio(struct bio *bio)
173 {
174 	struct drbd_peer_request *peer_req = bio->bi_private;
175 	struct drbd_device *device = peer_req->peer_device->device;
176 	int is_write = bio_data_dir(bio) == WRITE;
177 	int is_discard = !!(bio->bi_rw & REQ_DISCARD);
178 
179 	if (bio->bi_error && __ratelimit(&drbd_ratelimit_state))
180 		drbd_warn(device, "%s: error=%d s=%llus\n",
181 				is_write ? (is_discard ? "discard" : "write")
182 					: "read", bio->bi_error,
183 				(unsigned long long)peer_req->i.sector);
184 
185 	if (bio->bi_error)
186 		set_bit(__EE_WAS_ERROR, &peer_req->flags);
187 
188 	bio_put(bio); /* no need for the bio anymore */
189 	if (atomic_dec_and_test(&peer_req->pending_bios)) {
190 		if (is_write)
191 			drbd_endio_write_sec_final(peer_req);
192 		else
193 			drbd_endio_read_sec_final(peer_req);
194 	}
195 }
196 
197 void drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
198 {
199 	panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
200 		device->minor, device->resource->name, device->vnr);
201 }
202 
203 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
204  */
205 void drbd_request_endio(struct bio *bio)
206 {
207 	unsigned long flags;
208 	struct drbd_request *req = bio->bi_private;
209 	struct drbd_device *device = req->device;
210 	struct bio_and_error m;
211 	enum drbd_req_event what;
212 
213 	/* If this request was aborted locally before,
214 	 * but now was completed "successfully",
215 	 * chances are that this caused arbitrary data corruption.
216 	 *
217 	 * "aborting" requests, or force-detaching the disk, is intended for
218 	 * completely blocked/hung local backing devices which no longer
219 	 * complete requests at all, not even error completions.  In this
220 	 * situation, usually a hard-reset and failover is the only way out.
221 	 *
222 	 * By "aborting", basically faking a local error-completion,
223 	 * we allow for a more graceful switchover by cleanly migrating services.
224 	 * Still the affected node has to be rebooted "soon".
225 	 *
226 	 * By completing these requests, we allow the upper layers to re-use
227 	 * the associated data pages.
228 	 *
229 	 * If later the local backing device "recovers", and now DMAs some data
230 	 * from disk into the original request pages, in the best case it will
231 	 * just put random data into unused pages; but typically it will corrupt
232 	 * meanwhile completely unrelated data, causing all sorts of damage.
233 	 *
234 	 * Which means delayed successful completion,
235 	 * especially for READ requests,
236 	 * is a reason to panic().
237 	 *
238 	 * We assume that a delayed *error* completion is OK,
239 	 * though we still will complain noisily about it.
240 	 */
241 	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
242 		if (__ratelimit(&drbd_ratelimit_state))
243 			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
244 
245 		if (!bio->bi_error)
246 			drbd_panic_after_delayed_completion_of_aborted_request(device);
247 	}
248 
249 	/* to avoid recursion in __req_mod */
250 	if (unlikely(bio->bi_error)) {
251 		if (bio->bi_rw & REQ_DISCARD)
252 			what = (bio->bi_error == -EOPNOTSUPP)
253 				? DISCARD_COMPLETED_NOTSUPP
254 				: DISCARD_COMPLETED_WITH_ERROR;
255 		else
256 			what = (bio_data_dir(bio) == WRITE)
257 			? WRITE_COMPLETED_WITH_ERROR
258 			: (bio_rw(bio) == READ)
259 			  ? READ_COMPLETED_WITH_ERROR
260 			  : READ_AHEAD_COMPLETED_WITH_ERROR;
261 	} else
262 		what = COMPLETED_OK;
263 
264 	bio_put(req->private_bio);
265 	req->private_bio = ERR_PTR(bio->bi_error);
266 
267 	/* not req_mod(), we need irqsave here! */
268 	spin_lock_irqsave(&device->resource->req_lock, flags);
269 	__req_mod(req, what, &m);
270 	spin_unlock_irqrestore(&device->resource->req_lock, flags);
271 	put_ldev(device);
272 
273 	if (m.bio)
274 		complete_master_bio(device, &m);
275 }
276 
277 void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
278 {
279 	struct hash_desc desc;
280 	struct scatterlist sg;
281 	struct page *page = peer_req->pages;
282 	struct page *tmp;
283 	unsigned len;
284 
285 	desc.tfm = tfm;
286 	desc.flags = 0;
287 
288 	sg_init_table(&sg, 1);
289 	crypto_hash_init(&desc);
290 
291 	while ((tmp = page_chain_next(page))) {
292 		/* all but the last page will be fully used */
293 		sg_set_page(&sg, page, PAGE_SIZE, 0);
294 		crypto_hash_update(&desc, &sg, sg.length);
295 		page = tmp;
296 	}
297 	/* and now the last, possibly only partially used page */
298 	len = peer_req->i.size & (PAGE_SIZE - 1);
299 	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
300 	crypto_hash_update(&desc, &sg, sg.length);
301 	crypto_hash_final(&desc, digest);
302 }
303 
304 void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
305 {
306 	struct hash_desc desc;
307 	struct scatterlist sg;
308 	struct bio_vec bvec;
309 	struct bvec_iter iter;
310 
311 	desc.tfm = tfm;
312 	desc.flags = 0;
313 
314 	sg_init_table(&sg, 1);
315 	crypto_hash_init(&desc);
316 
317 	bio_for_each_segment(bvec, bio, iter) {
318 		sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
319 		crypto_hash_update(&desc, &sg, sg.length);
320 	}
321 	crypto_hash_final(&desc, digest);
322 }
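/*
 * Both checksum helpers above walk their data one page (resp. bio_vec) at a
 * time through a single reused scatterlist entry and fold everything into
 * one digest of crypto_hash_digestsize(tfm) bytes.  drbd_csum_ee() hashes
 * the page chain of a peer request (all pages fully, the last one possibly
 * only partially, as given by peer_req->i.size); drbd_csum_bio() hashes the
 * segments of a bio.  Callers (w_e_send_csum, w_e_end_ov_req, ...) allocate
 * the digest buffer with kmalloc(digest_size, GFP_NOIO) before calling in.
 */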
323 
324 /* MAYBE merge common code with w_e_end_ov_req */
325 static int w_e_send_csum(struct drbd_work *w, int cancel)
326 {
327 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
328 	struct drbd_peer_device *peer_device = peer_req->peer_device;
329 	struct drbd_device *device = peer_device->device;
330 	int digest_size;
331 	void *digest;
332 	int err = 0;
333 
334 	if (unlikely(cancel))
335 		goto out;
336 
337 	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
338 		goto out;
339 
340 	digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
341 	digest = kmalloc(digest_size, GFP_NOIO);
342 	if (digest) {
343 		sector_t sector = peer_req->i.sector;
344 		unsigned int size = peer_req->i.size;
345 		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
346 		/* Free peer_req and pages before send.
347 		 * In case we block on congestion, we could otherwise run into
348 		 * some distributed deadlock, if the other side blocks on
349 		 * congestion as well, because our receiver blocks in
350 		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
351 		drbd_free_peer_req(device, peer_req);
352 		peer_req = NULL;
353 		inc_rs_pending(device);
354 		err = drbd_send_drequest_csum(peer_device, sector, size,
355 					      digest, digest_size,
356 					      P_CSUM_RS_REQUEST);
357 		kfree(digest);
358 	} else {
359 		drbd_err(device, "kmalloc() of digest failed.\n");
360 		err = -ENOMEM;
361 	}
362 
363 out:
364 	if (peer_req)
365 		drbd_free_peer_req(device, peer_req);
366 
367 	if (unlikely(err))
368 		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
369 	return err;
370 }
371 
372 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
373 
374 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
375 {
376 	struct drbd_device *device = peer_device->device;
377 	struct drbd_peer_request *peer_req;
378 
379 	if (!get_ldev(device))
380 		return -EIO;
381 
382 	/* GFP_TRY, because if there is no memory available right now, this may
383 	 * be rescheduled for later. It is "only" background resync, after all. */
384 	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
385 				       size, true /* has real payload */, GFP_TRY);
386 	if (!peer_req)
387 		goto defer;
388 
389 	peer_req->w.cb = w_e_send_csum;
390 	spin_lock_irq(&device->resource->req_lock);
391 	list_add_tail(&peer_req->w.list, &device->read_ee);
392 	spin_unlock_irq(&device->resource->req_lock);
393 
394 	atomic_add(size >> 9, &device->rs_sect_ev);
395 	if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
396 		return 0;
397 
398 	/* If it failed because of ENOMEM, retry should help.  If it failed
399 	 * because bio_add_page failed (probably broken lower level driver),
400 	 * retry may or may not help.
401 	 * If it does not, you may need to force disconnect. */
402 	spin_lock_irq(&device->resource->req_lock);
403 	list_del(&peer_req->w.list);
404 	spin_unlock_irq(&device->resource->req_lock);
405 
406 	drbd_free_peer_req(device, peer_req);
407 defer:
408 	put_ldev(device);
409 	return -EAGAIN;
410 }
411 
412 int w_resync_timer(struct drbd_work *w, int cancel)
413 {
414 	struct drbd_device *device =
415 		container_of(w, struct drbd_device, resync_work);
416 
417 	switch (device->state.conn) {
418 	case C_VERIFY_S:
419 		make_ov_request(device, cancel);
420 		break;
421 	case C_SYNC_TARGET:
422 		make_resync_request(device, cancel);
423 		break;
424 	}
425 
426 	return 0;
427 }
428 
429 void resync_timer_fn(unsigned long data)
430 {
431 	struct drbd_device *device = (struct drbd_device *) data;
432 
433 	drbd_queue_work_if_unqueued(
434 		&first_peer_device(device)->connection->sender_work,
435 		&device->resync_work);
436 }
437 
438 static void fifo_set(struct fifo_buffer *fb, int value)
439 {
440 	int i;
441 
442 	for (i = 0; i < fb->size; i++)
443 		fb->values[i] = value;
444 }
445 
446 static int fifo_push(struct fifo_buffer *fb, int value)
447 {
448 	int ov;
449 
450 	ov = fb->values[fb->head_index];
451 	fb->values[fb->head_index++] = value;
452 
453 	if (fb->head_index >= fb->size)
454 		fb->head_index = 0;
455 
456 	return ov;
457 }
458 
459 static void fifo_add_val(struct fifo_buffer *fb, int value)
460 {
461 	int i;
462 
463 	for (i = 0; i < fb->size; i++)
464 		fb->values[i] += value;
465 }
466 
467 struct fifo_buffer *fifo_alloc(int fifo_size)
468 {
469 	struct fifo_buffer *fb;
470 
471 	fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
472 	if (!fb)
473 		return NULL;
474 
475 	fb->head_index = 0;
476 	fb->size = fifo_size;
477 	fb->total = 0;
478 
479 	return fb;
480 }
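/*
 * The fifo_buffer above serves the resync controller as a simple delay line
 * for planned corrections: fifo_push() stores a new value at the head and
 * returns the value that was pushed fb->size turns ago, fifo_add_val()
 * spreads a correction over all pending slots, and fb->total (maintained by
 * the caller) tracks the sum still "in the plan".  Rough usage, as done in
 * drbd_rs_controller() below:
 *
 *	fifo_add_val(plan, cps);	// plan cps extra sectors per step
 *	plan->total += cps * steps;
 *	curr = fifo_push(plan, 0);	// take this step's share out again
 *	plan->total -= curr;
 */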
481 
482 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
483 {
484 	struct disk_conf *dc;
485 	unsigned int want;     /* The number of sectors we want in-flight */
486 	int req_sect; /* Number of sectors to request in this turn */
487 	int correction; /* Number of sectors more we need in-flight */
488 	int cps; /* correction per invocation of drbd_rs_controller() */
489 	int steps; /* Number of time steps to plan ahead */
490 	int curr_corr;
491 	int max_sect;
492 	struct fifo_buffer *plan;
493 
494 	dc = rcu_dereference(device->ldev->disk_conf);
495 	plan = rcu_dereference(device->rs_plan_s);
496 
497 	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
498 
499 	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
500 		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
501 	} else { /* normal path */
502 		want = dc->c_fill_target ? dc->c_fill_target :
503 			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
504 	}
505 
506 	correction = want - device->rs_in_flight - plan->total;
507 
508 	/* Plan ahead */
509 	cps = correction / steps;
510 	fifo_add_val(plan, cps);
511 	plan->total += cps * steps;
512 
513 	/* What we do in this step */
514 	curr_corr = fifo_push(plan, 0);
515 	plan->total -= curr_corr;
516 
517 	req_sect = sect_in + curr_corr;
518 	if (req_sect < 0)
519 		req_sect = 0;
520 
521 	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
522 	if (req_sect > max_sect)
523 		req_sect = max_sect;
524 
525 	/*
526 	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
527 		 sect_in, device->rs_in_flight, want, correction,
528 		 steps, cps, device->rs_planed, curr_corr, req_sect);
529 	*/
530 
531 	return req_sect;
532 }
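/*
 * Summary of the controller above (one invocation per SLEEP_TIME tick):
 *
 *	want       = c_fill_target                                   (if set)
 *	           = sect_in * c_delay_target * HZ / (SLEEP_TIME*10)  (else)
 *	correction = want - rs_in_flight - plan->total
 *	cps        = correction / steps      -- spread over the plan fifo
 *	curr_corr  = fifo_push(plan, 0)      -- this step's planned share
 *	req_sect   = clamp(sect_in + curr_corr,
 *	                   0, c_max_rate * 2 * SLEEP_TIME / HZ)
 *
 * At the very start of a resync, "want" is seeded from resync_rate instead.
 * In effect it tries to keep "want" sectors in flight: issue roughly as many
 * new sectors as completed during the last tick (sect_in), plus or minus the
 * correction planned some steps ago, capped by c_max_rate.
 */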
533 
534 static int drbd_rs_number_requests(struct drbd_device *device)
535 {
536 	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
537 	int number, mxb;
538 
539 	sect_in = atomic_xchg(&device->rs_sect_in, 0);
540 	device->rs_in_flight -= sect_in;
541 
542 	rcu_read_lock();
543 	mxb = drbd_get_max_buffers(device) / 2;
544 	if (rcu_dereference(device->rs_plan_s)->size) {
545 		number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
546 		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
547 	} else {
548 		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
549 		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
550 	}
551 	rcu_read_unlock();
552 
553 	/* Don't have more than "max-buffers"/2 in-flight.
554 	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
555 	 * potentially causing a distributed deadlock on congestion during
556 	 * online-verify or (checksum-based) resync, if max-buffers,
557 	 * socket buffer sizes and resync rate settings are mis-configured. */
558 
559 	/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
560 	 * mxb (as used here, and in drbd_alloc_pages on the peer) is
561 	 * "number of pages" (typically also 4k),
562 	 * but "rs_in_flight" is in "sectors" (512 Byte). */
563 	if (mxb - device->rs_in_flight/8 < number)
564 		number = mxb - device->rs_in_flight/8;
565 
566 	return number;
567 }
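/*
 * Unit check for the conversions above, assuming SLEEP_TIME is one resync
 * timer tick of 100ms (HZ/10) and BM_BLOCK_SIZE is 4k: with a fixed
 * resync_rate of, say, 4000 (KiB/s),
 *	number = SLEEP_TIME * 4000 / ((4096/1024) * HZ) = 4000/40 = 100
 * requests of 4k per tick, i.e. again 4000 KiB/s.  In the dynamic case the
 * shift by (BM_BLOCK_SHIFT - 9) converts the controller's sectors into 4k
 * bitmap blocks, and c_sync_rate is recomputed from it only so the current
 * effective rate can be reported.
 */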
568 
569 static int make_resync_request(struct drbd_device *const device, int cancel)
570 {
571 	struct drbd_peer_device *const peer_device = first_peer_device(device);
572 	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
573 	unsigned long bit;
574 	sector_t sector;
575 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
576 	int max_bio_size;
577 	int number, rollback_i, size;
578 	int align, requeue = 0;
579 	int i = 0;
580 
581 	if (unlikely(cancel))
582 		return 0;
583 
584 	if (device->rs_total == 0) {
585 		/* empty resync? */
586 		drbd_resync_finished(device);
587 		return 0;
588 	}
589 
590 	if (!get_ldev(device)) {
591 	/* Since we only need to access device->resync, a
592 	   get_ldev_if_state(device, D_FAILED) would be sufficient, but
593 	   continuing resync with a broken disk makes no sense at
594 	   all */
595 		drbd_err(device, "Disk broke down during resync!\n");
596 		return 0;
597 	}
598 
599 	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
600 	number = drbd_rs_number_requests(device);
601 	if (number <= 0)
602 		goto requeue;
603 
604 	for (i = 0; i < number; i++) {
605 		/* Stop generating RS requests when half of the send buffer is filled,
606 		 * but notify TCP that we'd like to have more space. */
607 		mutex_lock(&connection->data.mutex);
608 		if (connection->data.socket) {
609 			struct sock *sk = connection->data.socket->sk;
610 			int queued = sk->sk_wmem_queued;
611 			int sndbuf = sk->sk_sndbuf;
612 			if (queued > sndbuf / 2) {
613 				requeue = 1;
614 				if (sk->sk_socket)
615 					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
616 			}
617 		} else
618 			requeue = 1;
619 		mutex_unlock(&connection->data.mutex);
620 		if (requeue)
621 			goto requeue;
622 
623 next_sector:
624 		size = BM_BLOCK_SIZE;
625 		bit  = drbd_bm_find_next(device, device->bm_resync_fo);
626 
627 		if (bit == DRBD_END_OF_BITMAP) {
628 			device->bm_resync_fo = drbd_bm_bits(device);
629 			put_ldev(device);
630 			return 0;
631 		}
632 
633 		sector = BM_BIT_TO_SECT(bit);
634 
635 		if (drbd_try_rs_begin_io(device, sector)) {
636 			device->bm_resync_fo = bit;
637 			goto requeue;
638 		}
639 		device->bm_resync_fo = bit + 1;
640 
641 		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
642 			drbd_rs_complete_io(device, sector);
643 			goto next_sector;
644 		}
645 
646 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
647 		/* try to find some adjacent bits.
648 		 * we stop if we have already the maximum req size.
649 		 *
650 		 * Additionally always align bigger requests, in order to
651 		 * be prepared for all stripe sizes of software RAIDs.
652 		 */
653 		align = 1;
654 		rollback_i = i;
655 		while (i < number) {
656 			if (size + BM_BLOCK_SIZE > max_bio_size)
657 				break;
658 
659 			/* Be always aligned */
660 			if (sector & ((1<<(align+3))-1))
661 				break;
662 
663 			/* do not cross extent boundaries */
664 			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
665 				break;
666 			/* now, is it actually dirty, after all?
667 			 * caution, drbd_bm_test_bit is tri-state for some
668 			 * obscure reason; ( b == 0 ) would get the out-of-band
669 			 * only accidentally right because of the "oddly sized"
670 			 * adjustment below */
671 			if (drbd_bm_test_bit(device, bit+1) != 1)
672 				break;
673 			bit++;
674 			size += BM_BLOCK_SIZE;
675 			if ((BM_BLOCK_SIZE << align) <= size)
676 				align++;
677 			i++;
678 		}
679 		/* if we merged some,
680 		 * reset the offset to start the next drbd_bm_find_next from */
681 		if (size > BM_BLOCK_SIZE)
682 			device->bm_resync_fo = bit + 1;
683 #endif
684 
685 		/* adjust very last sectors, in case we are oddly sized */
686 		if (sector + (size>>9) > capacity)
687 			size = (capacity-sector)<<9;
688 
689 		if (device->use_csums) {
690 			switch (read_for_csum(peer_device, sector, size)) {
691 			case -EIO: /* Disk failure */
692 				put_ldev(device);
693 				return -EIO;
694 			case -EAGAIN: /* allocation failed, or ldev busy */
695 				drbd_rs_complete_io(device, sector);
696 				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
697 				i = rollback_i;
698 				goto requeue;
699 			case 0:
700 				/* everything ok */
701 				break;
702 			default:
703 				BUG();
704 			}
705 		} else {
706 			int err;
707 
708 			inc_rs_pending(device);
709 			err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
710 						 sector, size, ID_SYNCER);
711 			if (err) {
712 				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
713 				dec_rs_pending(device);
714 				put_ldev(device);
715 				return err;
716 			}
717 		}
718 	}
719 
720 	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
721 		/* last syncer _request_ was sent,
722 		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
723 		 * next sync group will resume), as soon as we receive the last
724 		 * resync data block, and the last bit is cleared.
725 		 * until then resync "work" is "inactive" ...
726 		 */
727 		put_ldev(device);
728 		return 0;
729 	}
730 
731  requeue:
732 	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
733 	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
734 	put_ldev(device);
735 	return 0;
736 }
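/*
 * Per-tick flow of make_resync_request() above: as long as the TCP send
 * buffer is less than half full, take the next dirty bit from the bitmap
 * (bm_resync_fo), try to lock the corresponding resync extent via
 * drbd_try_rs_begin_io(), merge adjacent dirty bits into one request of up
 * to max_bio_size (only growing requests whose start sector is suitably
 * aligned), and then either submit a local read for a checksum-based
 * request (read_for_csum) or send a plain P_RS_DATA_REQUEST.  Whenever we
 * have to stop early we fall through to "requeue", account the sectors now
 * in flight, and re-arm the resync timer for the next SLEEP_TIME tick.
 */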
737 
738 static int make_ov_request(struct drbd_device *device, int cancel)
739 {
740 	int number, i, size;
741 	sector_t sector;
742 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
743 	bool stop_sector_reached = false;
744 
745 	if (unlikely(cancel))
746 		return 1;
747 
748 	number = drbd_rs_number_requests(device);
749 
750 	sector = device->ov_position;
751 	for (i = 0; i < number; i++) {
752 		if (sector >= capacity)
753 			return 1;
754 
755 		/* We check for "finished" only in the reply path:
756 		 * w_e_end_ov_reply().
757 		 * We need to send at least one request out. */
758 		stop_sector_reached = i > 0
759 			&& verify_can_do_stop_sector(device)
760 			&& sector >= device->ov_stop_sector;
761 		if (stop_sector_reached)
762 			break;
763 
764 		size = BM_BLOCK_SIZE;
765 
766 		if (drbd_try_rs_begin_io(device, sector)) {
767 			device->ov_position = sector;
768 			goto requeue;
769 		}
770 
771 		if (sector + (size>>9) > capacity)
772 			size = (capacity-sector)<<9;
773 
774 		inc_rs_pending(device);
775 		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
776 			dec_rs_pending(device);
777 			return 0;
778 		}
779 		sector += BM_SECT_PER_BIT;
780 	}
781 	device->ov_position = sector;
782 
783  requeue:
784 	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
785 	if (i == 0 || !stop_sector_reached)
786 		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
787 	return 1;
788 }
789 
790 int w_ov_finished(struct drbd_work *w, int cancel)
791 {
792 	struct drbd_device_work *dw =
793 		container_of(w, struct drbd_device_work, w);
794 	struct drbd_device *device = dw->device;
795 	kfree(dw);
796 	ov_out_of_sync_print(device);
797 	drbd_resync_finished(device);
798 
799 	return 0;
800 }
801 
802 static int w_resync_finished(struct drbd_work *w, int cancel)
803 {
804 	struct drbd_device_work *dw =
805 		container_of(w, struct drbd_device_work, w);
806 	struct drbd_device *device = dw->device;
807 	kfree(dw);
808 
809 	drbd_resync_finished(device);
810 
811 	return 0;
812 }
813 
814 static void ping_peer(struct drbd_device *device)
815 {
816 	struct drbd_connection *connection = first_peer_device(device)->connection;
817 
818 	clear_bit(GOT_PING_ACK, &connection->flags);
819 	request_ping(connection);
820 	wait_event(connection->ping_wait,
821 		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
822 }
823 
824 int drbd_resync_finished(struct drbd_device *device)
825 {
826 	unsigned long db, dt, dbdt;
827 	unsigned long n_oos;
828 	union drbd_state os, ns;
829 	struct drbd_device_work *dw;
830 	char *khelper_cmd = NULL;
831 	int verify_done = 0;
832 
833 	/* Remove all elements from the resync LRU. Since future actions
834 	 * might set bits in the (main) bitmap, then the entries in the
835 	 * resync LRU would be wrong. */
836 	if (drbd_rs_del_all(device)) {
837 		/* In case this is not possible now, most probably because
838 	 * there are P_RS_DATA_REPLY packets lingering on the worker's
839 	 * queue (or even the read operations for those packets
840 	 * are not finished by now).  Retry in 100ms.
841 
842 		schedule_timeout_interruptible(HZ / 10);
843 		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
844 		if (dw) {
845 			dw->w.cb = w_resync_finished;
846 			dw->device = device;
847 			drbd_queue_work(&first_peer_device(device)->connection->sender_work,
848 					&dw->w);
849 			return 1;
850 		}
851 		drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
852 	}
853 
854 	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
855 	if (dt <= 0)
856 		dt = 1;
857 
858 	db = device->rs_total;
859 	/* adjust for verify start and stop sectors, respectively the reached position */
860 	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
861 		db -= device->ov_left;
862 
863 	dbdt = Bit2KB(db/dt);
864 	device->rs_paused /= HZ;
865 
866 	if (!get_ldev(device))
867 		goto out;
868 
869 	ping_peer(device);
870 
871 	spin_lock_irq(&device->resource->req_lock);
872 	os = drbd_read_state(device);
873 
874 	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
875 
876 	/* This protects us against multiple calls (that can happen in the presence
877 	   of application IO), and against connectivity loss just before we arrive here. */
878 	if (os.conn <= C_CONNECTED)
879 		goto out_unlock;
880 
881 	ns = os;
882 	ns.conn = C_CONNECTED;
883 
884 	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
885 	     verify_done ? "Online verify" : "Resync",
886 	     dt + device->rs_paused, device->rs_paused, dbdt);
887 
888 	n_oos = drbd_bm_total_weight(device);
889 
890 	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
891 		if (n_oos) {
892 			drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
893 			      n_oos, Bit2KB(1));
894 			khelper_cmd = "out-of-sync";
895 		}
896 	} else {
897 		D_ASSERT(device, (n_oos - device->rs_failed) == 0);
898 
899 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
900 			khelper_cmd = "after-resync-target";
901 
902 		if (device->use_csums && device->rs_total) {
903 			const unsigned long s = device->rs_same_csum;
904 			const unsigned long t = device->rs_total;
905 			const int ratio =
906 				(t == 0)     ? 0 :
907 			(t < 100000) ? ((s*100)/t) : (s/(t/100));
908 			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
909 			     "transferred %luK total %luK\n",
910 			     ratio,
911 			     Bit2KB(device->rs_same_csum),
912 			     Bit2KB(device->rs_total - device->rs_same_csum),
913 			     Bit2KB(device->rs_total));
914 		}
915 	}
916 
917 	if (device->rs_failed) {
918 		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
919 
920 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
921 			ns.disk = D_INCONSISTENT;
922 			ns.pdsk = D_UP_TO_DATE;
923 		} else {
924 			ns.disk = D_UP_TO_DATE;
925 			ns.pdsk = D_INCONSISTENT;
926 		}
927 	} else {
928 		ns.disk = D_UP_TO_DATE;
929 		ns.pdsk = D_UP_TO_DATE;
930 
931 		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
932 			if (device->p_uuid) {
933 				int i;
934 				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
935 					_drbd_uuid_set(device, i, device->p_uuid[i]);
936 				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
937 				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
938 			} else {
939 				drbd_err(device, "device->p_uuid is NULL! BUG\n");
940 			}
941 		}
942 
943 		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
944 			/* for verify runs, we don't update uuids here,
945 			 * so there would be nothing to report. */
946 			drbd_uuid_set_bm(device, 0UL);
947 			drbd_print_uuids(device, "updated UUIDs");
948 			if (device->p_uuid) {
949 				/* Now the two UUID sets are equal, update what we
950 				 * know of the peer. */
951 				int i;
952 				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
953 					device->p_uuid[i] = device->ldev->md.uuid[i];
954 			}
955 		}
956 	}
957 
958 	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
959 out_unlock:
960 	spin_unlock_irq(&device->resource->req_lock);
961 	put_ldev(device);
962 out:
963 	device->rs_total  = 0;
964 	device->rs_failed = 0;
965 	device->rs_paused = 0;
966 
967 	/* reset start sector, if we reached end of device */
968 	if (verify_done && device->ov_left == 0)
969 		device->ov_start_sector = 0;
970 
971 	drbd_md_sync(device);
972 
973 	if (khelper_cmd)
974 		drbd_khelper(device, khelper_cmd);
975 
976 	return 1;
977 }
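/*
 * Note on the state transitions above: when a resync finishes without
 * failed blocks, both disk and pdsk become D_UP_TO_DATE and the UUID sets
 * are made equal (the former SyncTarget adopts the peer's UUIDs).  If
 * blocks failed, only the former SyncSource side is considered
 * D_UP_TO_DATE while the target side stays D_INCONSISTENT.  Online verify
 * runs skip the UUID update entirely and merely report blocks found out of
 * sync via the "out-of-sync" helper.
 */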
978 
979 /* helper */
980 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
981 {
982 	if (drbd_peer_req_has_active_page(peer_req)) {
983 		/* This might happen if sendpage() has not finished */
984 		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
985 		atomic_add(i, &device->pp_in_use_by_net);
986 		atomic_sub(i, &device->pp_in_use);
987 		spin_lock_irq(&device->resource->req_lock);
988 		list_add_tail(&peer_req->w.list, &device->net_ee);
989 		spin_unlock_irq(&device->resource->req_lock);
990 		wake_up(&drbd_pp_wait);
991 	} else
992 		drbd_free_peer_req(device, peer_req);
993 }
994 
995 /**
996  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
997  * @device:	DRBD device.
998  * @w:		work object.
999  * @cancel:	The connection will be closed anyways
1000  */
1001 int w_e_end_data_req(struct drbd_work *w, int cancel)
1002 {
1003 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1004 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1005 	struct drbd_device *device = peer_device->device;
1006 	int err;
1007 
1008 	if (unlikely(cancel)) {
1009 		drbd_free_peer_req(device, peer_req);
1010 		dec_unacked(device);
1011 		return 0;
1012 	}
1013 
1014 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1015 		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1016 	} else {
1017 		if (__ratelimit(&drbd_ratelimit_state))
1018 			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1019 			    (unsigned long long)peer_req->i.sector);
1020 
1021 		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1022 	}
1023 
1024 	dec_unacked(device);
1025 
1026 	move_to_net_ee_or_free(device, peer_req);
1027 
1028 	if (unlikely(err))
1029 		drbd_err(device, "drbd_send_block() failed\n");
1030 	return err;
1031 }
1032 
1033 /**
1034  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1035  * @w:		work object.
1036  * @cancel:	The connection will be closed anyways
1037  */
1038 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1039 {
1040 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1041 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1042 	struct drbd_device *device = peer_device->device;
1043 	int err;
1044 
1045 	if (unlikely(cancel)) {
1046 		drbd_free_peer_req(device, peer_req);
1047 		dec_unacked(device);
1048 		return 0;
1049 	}
1050 
1051 	if (get_ldev_if_state(device, D_FAILED)) {
1052 		drbd_rs_complete_io(device, peer_req->i.sector);
1053 		put_ldev(device);
1054 	}
1055 
1056 	if (device->state.conn == C_AHEAD) {
1057 		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1058 	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1059 		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1060 			inc_rs_pending(device);
1061 			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1062 		} else {
1063 			if (__ratelimit(&drbd_ratelimit_state))
1064 				drbd_err(device, "Not sending RSDataReply, "
1065 				    "partner DISKLESS!\n");
1066 			err = 0;
1067 		}
1068 	} else {
1069 		if (__ratelimit(&drbd_ratelimit_state))
1070 			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1071 			    (unsigned long long)peer_req->i.sector);
1072 
1073 		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1074 
1075 		/* update resync data with failure */
1076 		drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1077 	}
1078 
1079 	dec_unacked(device);
1080 
1081 	move_to_net_ee_or_free(device, peer_req);
1082 
1083 	if (unlikely(err))
1084 		drbd_err(device, "drbd_send_block() failed\n");
1085 	return err;
1086 }
1087 
1088 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1089 {
1090 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1091 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1092 	struct drbd_device *device = peer_device->device;
1093 	struct digest_info *di;
1094 	int digest_size;
1095 	void *digest = NULL;
1096 	int err, eq = 0;
1097 
1098 	if (unlikely(cancel)) {
1099 		drbd_free_peer_req(device, peer_req);
1100 		dec_unacked(device);
1101 		return 0;
1102 	}
1103 
1104 	if (get_ldev(device)) {
1105 		drbd_rs_complete_io(device, peer_req->i.sector);
1106 		put_ldev(device);
1107 	}
1108 
1109 	di = peer_req->digest;
1110 
1111 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1112 		/* quick hack to try to avoid a race against reconfiguration.
1113 		 * a real fix would be much more involved,
1114 		 * introducing more locking mechanisms */
1115 		if (peer_device->connection->csums_tfm) {
1116 			digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
1117 			D_ASSERT(device, digest_size == di->digest_size);
1118 			digest = kmalloc(digest_size, GFP_NOIO);
1119 		}
1120 		if (digest) {
1121 			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1122 			eq = !memcmp(digest, di->digest, digest_size);
1123 			kfree(digest);
1124 		}
1125 
1126 		if (eq) {
1127 			drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1128 			/* rs_same_csums unit is BM_BLOCK_SIZE */
1129 			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1130 			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1131 		} else {
1132 			inc_rs_pending(device);
1133 			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1134 			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1135 			kfree(di);
1136 			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1137 		}
1138 	} else {
1139 		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1140 		if (__ratelimit(&drbd_ratelimit_state))
1141 			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1142 	}
1143 
1144 	dec_unacked(device);
1145 	move_to_net_ee_or_free(device, peer_req);
1146 
1147 	if (unlikely(err))
1148 		drbd_err(device, "drbd_send_block/ack() failed\n");
1149 	return err;
1150 }
1151 
1152 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1153 {
1154 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1155 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1156 	struct drbd_device *device = peer_device->device;
1157 	sector_t sector = peer_req->i.sector;
1158 	unsigned int size = peer_req->i.size;
1159 	int digest_size;
1160 	void *digest;
1161 	int err = 0;
1162 
1163 	if (unlikely(cancel))
1164 		goto out;
1165 
1166 	digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1167 	digest = kmalloc(digest_size, GFP_NOIO);
1168 	if (!digest) {
1169 		err = 1;	/* terminate the connection in case the allocation failed */
1170 		goto out;
1171 	}
1172 
1173 	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1174 		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1175 	else
1176 		memset(digest, 0, digest_size);
1177 
1178 	/* Free e and pages before send.
1179 	 * In case we block on congestion, we could otherwise run into
1180 	 * some distributed deadlock, if the other side blocks on
1181 	 * congestion as well, because our receiver blocks in
1182 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1183 	drbd_free_peer_req(device, peer_req);
1184 	peer_req = NULL;
1185 	inc_rs_pending(device);
1186 	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1187 	if (err)
1188 		dec_rs_pending(device);
1189 	kfree(digest);
1190 
1191 out:
1192 	if (peer_req)
1193 		drbd_free_peer_req(device, peer_req);
1194 	dec_unacked(device);
1195 	return err;
1196 }
1197 
1198 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1199 {
1200 	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1201 		device->ov_last_oos_size += size>>9;
1202 	} else {
1203 		device->ov_last_oos_start = sector;
1204 		device->ov_last_oos_size = size>>9;
1205 	}
1206 	drbd_set_out_of_sync(device, sector, size);
1207 }
1208 
1209 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1210 {
1211 	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1212 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1213 	struct drbd_device *device = peer_device->device;
1214 	struct digest_info *di;
1215 	void *digest;
1216 	sector_t sector = peer_req->i.sector;
1217 	unsigned int size = peer_req->i.size;
1218 	int digest_size;
1219 	int err, eq = 0;
1220 	bool stop_sector_reached = false;
1221 
1222 	if (unlikely(cancel)) {
1223 		drbd_free_peer_req(device, peer_req);
1224 		dec_unacked(device);
1225 		return 0;
1226 	}
1227 
1228 	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1229 	 * the resync lru has been cleaned up already */
1230 	if (get_ldev(device)) {
1231 		drbd_rs_complete_io(device, peer_req->i.sector);
1232 		put_ldev(device);
1233 	}
1234 
1235 	di = peer_req->digest;
1236 
1237 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1238 		digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1239 		digest = kmalloc(digest_size, GFP_NOIO);
1240 		if (digest) {
1241 			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1242 
1243 			D_ASSERT(device, digest_size == di->digest_size);
1244 			eq = !memcmp(digest, di->digest, digest_size);
1245 			kfree(digest);
1246 		}
1247 	}
1248 
1249 	/* Free peer_req and pages before send.
1250 	 * In case we block on congestion, we could otherwise run into
1251 	 * some distributed deadlock, if the other side blocks on
1252 	 * congestion as well, because our receiver blocks in
1253 	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1254 	drbd_free_peer_req(device, peer_req);
1255 	if (!eq)
1256 		drbd_ov_out_of_sync_found(device, sector, size);
1257 	else
1258 		ov_out_of_sync_print(device);
1259 
1260 	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1261 			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1262 
1263 	dec_unacked(device);
1264 
1265 	--device->ov_left;
1266 
1267 	/* let's advance progress step marks only for every other megabyte */
1268 	if ((device->ov_left & 0x200) == 0x200)
1269 		drbd_advance_rs_marks(device, device->ov_left);
1270 
1271 	stop_sector_reached = verify_can_do_stop_sector(device) &&
1272 		(sector + (size>>9)) >= device->ov_stop_sector;
1273 
1274 	if (device->ov_left == 0 || stop_sector_reached) {
1275 		ov_out_of_sync_print(device);
1276 		drbd_resync_finished(device);
1277 	}
1278 
1279 	return err;
1280 }
1281 
1282 /* FIXME
1283  * We need to track the number of pending barrier acks,
1284  * and to be able to wait for them.
1285  * See also comment in drbd_adm_attach before drbd_suspend_io.
1286  */
1287 static int drbd_send_barrier(struct drbd_connection *connection)
1288 {
1289 	struct p_barrier *p;
1290 	struct drbd_socket *sock;
1291 
1292 	sock = &connection->data;
1293 	p = conn_prepare_command(connection, sock);
1294 	if (!p)
1295 		return -EIO;
1296 	p->barrier = connection->send.current_epoch_nr;
1297 	p->pad = 0;
1298 	connection->send.current_epoch_writes = 0;
1299 	connection->send.last_sent_barrier_jif = jiffies;
1300 
1301 	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1302 }
1303 
1304 int w_send_write_hint(struct drbd_work *w, int cancel)
1305 {
1306 	struct drbd_device *device =
1307 		container_of(w, struct drbd_device, unplug_work);
1308 	struct drbd_socket *sock;
1309 
1310 	if (cancel)
1311 		return 0;
1312 	sock = &first_peer_device(device)->connection->data;
1313 	if (!drbd_prepare_command(first_peer_device(device), sock))
1314 		return -EIO;
1315 	return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1316 }
1317 
1318 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1319 {
1320 	if (!connection->send.seen_any_write_yet) {
1321 		connection->send.seen_any_write_yet = true;
1322 		connection->send.current_epoch_nr = epoch;
1323 		connection->send.current_epoch_writes = 0;
1324 		connection->send.last_sent_barrier_jif = jiffies;
1325 	}
1326 }
1327 
1328 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1329 {
1330 	/* re-init if first write on this connection */
1331 	if (!connection->send.seen_any_write_yet)
1332 		return;
1333 	if (connection->send.current_epoch_nr != epoch) {
1334 		if (connection->send.current_epoch_writes)
1335 			drbd_send_barrier(connection);
1336 		connection->send.current_epoch_nr = epoch;
1337 	}
1338 }
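/*
 * The two helpers above implement the sender's write-epoch bookkeeping: the
 * first write ever seen on a connection initializes the epoch counters, and
 * whenever a request belongs to a newer epoch than the one we are currently
 * sending for, a P_BARRIER is emitted first -- but only if the epoch being
 * closed actually contained writes.  Read requests and out-of-sync
 * notifications call maybe_send_barrier() as well, so they too can close a
 * preceding write epoch.
 */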
1339 
1340 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1341 {
1342 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1343 	struct drbd_device *device = req->device;
1344 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1345 	struct drbd_connection *const connection = peer_device->connection;
1346 	int err;
1347 
1348 	if (unlikely(cancel)) {
1349 		req_mod(req, SEND_CANCELED);
1350 		return 0;
1351 	}
1352 	req->pre_send_jif = jiffies;
1353 
1354 	/* this time, no connection->send.current_epoch_writes++;
1355 	 * If it was sent, it was the closing barrier for the last
1356 	 * replicated epoch, before we went into AHEAD mode.
1357 	 * No more barriers will be sent, until we leave AHEAD mode again. */
1358 	maybe_send_barrier(connection, req->epoch);
1359 
1360 	err = drbd_send_out_of_sync(peer_device, req);
1361 	req_mod(req, OOS_HANDED_TO_NETWORK);
1362 
1363 	return err;
1364 }
1365 
1366 /**
1367  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1368  * @w:		work object.
1369  * @cancel:	The connection will be closed anyways
1370  */
1371 int w_send_dblock(struct drbd_work *w, int cancel)
1372 {
1373 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1374 	struct drbd_device *device = req->device;
1375 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1376 	struct drbd_connection *connection = peer_device->connection;
1377 	int err;
1378 
1379 	if (unlikely(cancel)) {
1380 		req_mod(req, SEND_CANCELED);
1381 		return 0;
1382 	}
1383 	req->pre_send_jif = jiffies;
1384 
1385 	re_init_if_first_write(connection, req->epoch);
1386 	maybe_send_barrier(connection, req->epoch);
1387 	connection->send.current_epoch_writes++;
1388 
1389 	err = drbd_send_dblock(peer_device, req);
1390 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1391 
1392 	return err;
1393 }
1394 
1395 /**
1396  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1397  * @w:		work object.
1398  * @cancel:	The connection will be closed anyways
1399  */
1400 int w_send_read_req(struct drbd_work *w, int cancel)
1401 {
1402 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1403 	struct drbd_device *device = req->device;
1404 	struct drbd_peer_device *const peer_device = first_peer_device(device);
1405 	struct drbd_connection *connection = peer_device->connection;
1406 	int err;
1407 
1408 	if (unlikely(cancel)) {
1409 		req_mod(req, SEND_CANCELED);
1410 		return 0;
1411 	}
1412 	req->pre_send_jif = jiffies;
1413 
1414 	/* Even read requests may close a write epoch,
1415 	 * if there have been any writes yet. */
1416 	maybe_send_barrier(connection, req->epoch);
1417 
1418 	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1419 				 (unsigned long)req);
1420 
1421 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1422 
1423 	return err;
1424 }
1425 
1426 int w_restart_disk_io(struct drbd_work *w, int cancel)
1427 {
1428 	struct drbd_request *req = container_of(w, struct drbd_request, w);
1429 	struct drbd_device *device = req->device;
1430 
1431 	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1432 		drbd_al_begin_io(device, &req->i);
1433 
1434 	drbd_req_make_private_bio(req, req->master_bio);
1435 	req->private_bio->bi_bdev = device->ldev->backing_bdev;
1436 	generic_make_request(req->private_bio);
1437 
1438 	return 0;
1439 }
1440 
1441 static int _drbd_may_sync_now(struct drbd_device *device)
1442 {
1443 	struct drbd_device *odev = device;
1444 	int resync_after;
1445 
1446 	while (1) {
1447 		if (!odev->ldev || odev->state.disk == D_DISKLESS)
1448 			return 1;
1449 		rcu_read_lock();
1450 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1451 		rcu_read_unlock();
1452 		if (resync_after == -1)
1453 			return 1;
1454 		odev = minor_to_device(resync_after);
1455 		if (!odev)
1456 			return 1;
1457 		if ((odev->state.conn >= C_SYNC_SOURCE &&
1458 		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1459 		    odev->state.aftr_isp || odev->state.peer_isp ||
1460 		    odev->state.user_isp)
1461 			return 0;
1462 	}
1463 }
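/*
 * _drbd_may_sync_now() walks the resync-after dependency chain: starting at
 * the given device it follows disk_conf->resync_after from minor to minor
 * and returns 0 as soon as a device we depend on is itself syncing (or
 * paused via one of the *_isp flags), and 1 if the chain ends at a
 * diskless, missing, or independent (resync_after == -1) minor.
 * drbd_pause_after() and drbd_resume_next() below use it to set or clear
 * aftr_isp on all devices after configuration or state changes.
 */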
1464 
1465 /**
1466  * drbd_pause_after() - Pause resync on all devices that may not resync now
1467  * @device:	DRBD device.
1468  *
1469  * Called from process context only (admin command and after_state_ch).
1470  */
1471 static bool drbd_pause_after(struct drbd_device *device)
1472 {
1473 	bool changed = false;
1474 	struct drbd_device *odev;
1475 	int i;
1476 
1477 	rcu_read_lock();
1478 	idr_for_each_entry(&drbd_devices, odev, i) {
1479 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1480 			continue;
1481 		if (!_drbd_may_sync_now(odev) &&
1482 		    _drbd_set_state(_NS(odev, aftr_isp, 1),
1483 				    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1484 			changed = true;
1485 	}
1486 	rcu_read_unlock();
1487 
1488 	return changed;
1489 }
1490 
1491 /**
1492  * drbd_resume_next() - Resume resync on all devices that may resync now
1493  * @device:	DRBD device.
1494  *
1495  * Called from process context only (admin command and worker).
1496  */
1497 static bool drbd_resume_next(struct drbd_device *device)
1498 {
1499 	bool changed = false;
1500 	struct drbd_device *odev;
1501 	int i;
1502 
1503 	rcu_read_lock();
1504 	idr_for_each_entry(&drbd_devices, odev, i) {
1505 		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1506 			continue;
1507 		if (odev->state.aftr_isp) {
1508 			if (_drbd_may_sync_now(odev) &&
1509 			    _drbd_set_state(_NS(odev, aftr_isp, 0),
1510 					    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1511 				changed = true;
1512 		}
1513 	}
1514 	rcu_read_unlock();
1515 	return changed;
1516 }
1517 
1518 void resume_next_sg(struct drbd_device *device)
1519 {
1520 	lock_all_resources();
1521 	drbd_resume_next(device);
1522 	unlock_all_resources();
1523 }
1524 
1525 void suspend_other_sg(struct drbd_device *device)
1526 {
1527 	lock_all_resources();
1528 	drbd_pause_after(device);
1529 	unlock_all_resources();
1530 }
1531 
1532 /* caller must lock_all_resources() */
1533 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1534 {
1535 	struct drbd_device *odev;
1536 	int resync_after;
1537 
1538 	if (o_minor == -1)
1539 		return NO_ERROR;
1540 	if (o_minor < -1 || o_minor > MINORMASK)
1541 		return ERR_RESYNC_AFTER;
1542 
1543 	/* check for loops */
1544 	odev = minor_to_device(o_minor);
1545 	while (1) {
1546 		if (odev == device)
1547 			return ERR_RESYNC_AFTER_CYCLE;
1548 
1549 		/* You are free to depend on diskless, non-existing,
1550 		 * or not yet/no longer existing minors.
1551 		 * We only reject dependency loops.
1552 		 * We cannot follow the dependency chain beyond a detached or
1553 		 * missing minor.
1554 		 */
1555 		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1556 			return NO_ERROR;
1557 
1558 		rcu_read_lock();
1559 		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1560 		rcu_read_unlock();
1561 		/* dependency chain ends here, no cycles. */
1562 		if (resync_after == -1)
1563 			return NO_ERROR;
1564 
1565 		/* follow the dependency chain */
1566 		odev = minor_to_device(resync_after);
1567 	}
1568 }
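/*
 * Apart from the minor range check, drbd_resync_after_valid() only has to
 * reject dependency cycles: it walks the proposed chain the same way
 * _drbd_may_sync_now() does and fails with ERR_RESYNC_AFTER_CYCLE if it
 * ever arrives back at the device being configured.  Diskless or missing
 * minors terminate the walk successfully, so such dependencies are allowed
 * and simply have no effect.
 */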
1569 
1570 /* caller must lock_all_resources() */
1571 void drbd_resync_after_changed(struct drbd_device *device)
1572 {
1573 	int changed;
1574 
1575 	do {
1576 		changed  = drbd_pause_after(device);
1577 		changed |= drbd_resume_next(device);
1578 	} while (changed);
1579 }
1580 
1581 void drbd_rs_controller_reset(struct drbd_device *device)
1582 {
1583 	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1584 	struct fifo_buffer *plan;
1585 
1586 	atomic_set(&device->rs_sect_in, 0);
1587 	atomic_set(&device->rs_sect_ev, 0);
1588 	device->rs_in_flight = 0;
1589 	device->rs_last_events =
1590 		(int)part_stat_read(&disk->part0, sectors[0]) +
1591 		(int)part_stat_read(&disk->part0, sectors[1]);
1592 
1593 	/* Updating the RCU protected object in place is necessary since
1594 	   this function gets called from atomic context.
1595 	   It is valid since all other updates also lead to a completely
1596 	   empty fifo */
1597 	rcu_read_lock();
1598 	plan = rcu_dereference(device->rs_plan_s);
1599 	plan->total = 0;
1600 	fifo_set(plan, 0);
1601 	rcu_read_unlock();
1602 }
1603 
1604 void start_resync_timer_fn(unsigned long data)
1605 {
1606 	struct drbd_device *device = (struct drbd_device *) data;
1607 	drbd_device_post_work(device, RS_START);
1608 }
1609 
1610 static void do_start_resync(struct drbd_device *device)
1611 {
1612 	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1613 		drbd_warn(device, "postponing start_resync ...\n");
1614 		device->start_resync_timer.expires = jiffies + HZ/10;
1615 		add_timer(&device->start_resync_timer);
1616 		return;
1617 	}
1618 
1619 	drbd_start_resync(device, C_SYNC_SOURCE);
1620 	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1621 }
1622 
1623 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1624 {
1625 	bool csums_after_crash_only;
1626 	rcu_read_lock();
1627 	csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1628 	rcu_read_unlock();
1629 	return connection->agreed_pro_version >= 89 &&		/* supported? */
1630 		connection->csums_tfm &&			/* configured? */
1631 		(csums_after_crash_only == 0			/* use for each resync? */
1632 		 || test_bit(CRASHED_PRIMARY, &device->flags));	/* or only after Primary crash? */
1633 }
1634 
1635 /**
1636  * drbd_start_resync() - Start the resync process
1637  * @device:	DRBD device.
1638  * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1639  *
1640  * This function might bring you directly into one of the
1641  * C_PAUSED_SYNC_* states.
1642  */
1643 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1644 {
1645 	struct drbd_peer_device *peer_device = first_peer_device(device);
1646 	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1647 	union drbd_state ns;
1648 	int r;
1649 
1650 	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1651 		drbd_err(device, "Resync already running!\n");
1652 		return;
1653 	}
1654 
1655 	if (!test_bit(B_RS_H_DONE, &device->flags)) {
1656 		if (side == C_SYNC_TARGET) {
1657 			/* Since application IO was locked out during C_WF_BITMAP_T and
1658 			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1659 			   which will make the data inconsistent, ask the before-resync-target handler. */
1660 			r = drbd_khelper(device, "before-resync-target");
1661 			r = (r >> 8) & 0xff;
1662 			if (r > 0) {
1663 				drbd_info(device, "before-resync-target handler returned %d, "
1664 					 "dropping connection.\n", r);
1665 				conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1666 				return;
1667 			}
1668 		} else /* C_SYNC_SOURCE */ {
1669 			r = drbd_khelper(device, "before-resync-source");
1670 			r = (r >> 8) & 0xff;
1671 			if (r > 0) {
1672 				if (r == 3) {
1673 					drbd_info(device, "before-resync-source handler returned %d, "
1674 						 "ignoring. Old userland tools?", r);
1675 				} else {
1676 					drbd_info(device, "before-resync-source handler returned %d, "
1677 						 "dropping connection.\n", r);
1678 					conn_request_state(connection,
1679 							   NS(conn, C_DISCONNECTING), CS_HARD);
1680 					return;
1681 				}
1682 			}
1683 		}
1684 	}
1685 
1686 	if (current == connection->worker.task) {
1687 		/* The worker should not sleep waiting for state_mutex,
1688 		   that can take a long time. */
1689 		if (!mutex_trylock(device->state_mutex)) {
1690 			set_bit(B_RS_H_DONE, &device->flags);
1691 			device->start_resync_timer.expires = jiffies + HZ/5;
1692 			add_timer(&device->start_resync_timer);
1693 			return;
1694 		}
1695 	} else {
1696 		mutex_lock(device->state_mutex);
1697 	}
1698 
1699 	lock_all_resources();
1700 	clear_bit(B_RS_H_DONE, &device->flags);
1701 	/* Did some connection breakage or IO error race with us? */
1702 	if (device->state.conn < C_CONNECTED
1703 	    || !get_ldev_if_state(device, D_NEGOTIATING)) {
1704 		unlock_all_resources();
1705 		goto out;
1706 	}
1707 
1708 	ns = drbd_read_state(device);
1709 
1710 	ns.aftr_isp = !_drbd_may_sync_now(device);
1711 
1712 	ns.conn = side;
1713 
1714 	if (side == C_SYNC_TARGET)
1715 		ns.disk = D_INCONSISTENT;
1716 	else /* side == C_SYNC_SOURCE */
1717 		ns.pdsk = D_INCONSISTENT;
1718 
1719 	r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1720 	ns = drbd_read_state(device);
1721 
1722 	if (ns.conn < C_CONNECTED)
1723 		r = SS_UNKNOWN_ERROR;
1724 
1725 	if (r == SS_SUCCESS) {
1726 		unsigned long tw = drbd_bm_total_weight(device);
1727 		unsigned long now = jiffies;
1728 		int i;
1729 
1730 		device->rs_failed    = 0;
1731 		device->rs_paused    = 0;
1732 		device->rs_same_csum = 0;
1733 		device->rs_last_sect_ev = 0;
1734 		device->rs_total     = tw;
1735 		device->rs_start     = now;
1736 		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1737 			device->rs_mark_left[i] = tw;
1738 			device->rs_mark_time[i] = now;
1739 		}
1740 		drbd_pause_after(device);
1741 		/* Forget potentially stale cached per-resync-extent bit counts.
1742 		 * Open-coded drbd_rs_cancel_all(device): we already have IRQs
1743 		 * disabled, and know the disk state is ok. */
1744 		spin_lock(&device->al_lock);
1745 		lc_reset(device->resync);
1746 		device->resync_locked = 0;
1747 		device->resync_wenr = LC_FREE;
1748 		spin_unlock(&device->al_lock);
1749 	}
1750 	unlock_all_resources();
1751 
1752 	if (r == SS_SUCCESS) {
1753 		wake_up(&device->al_wait); /* for lc_reset() above */
1754 		/* reset rs_last_bcast when a resync or verify is started,
1755 		 * to deal with potential jiffies wrap. */
1756 		device->rs_last_bcast = jiffies - HZ;
1757 
1758 		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1759 		     drbd_conn_str(ns.conn),
1760 		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1761 		     (unsigned long) device->rs_total);
1762 		if (side == C_SYNC_TARGET) {
1763 			device->bm_resync_fo = 0;
1764 			device->use_csums = use_checksum_based_resync(connection, device);
1765 		} else {
1766 			device->use_csums = 0;
1767 		}
1768 
1769 		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1770 		 * with w_send_oos, or the sync target will get confused as to
1771 		 * how many bits to resync.  We cannot do that always, because for an
1772 		 * empty resync and protocol < 95, we need to do it here, as we call
1773 		 * drbd_resync_finished() from here in that case.
1774 		 * We call drbd_gen_and_send_sync_uuid() here for protocol < 96,
1775 		 * and from after_state_ch otherwise. */
1776 		if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1777 			drbd_gen_and_send_sync_uuid(peer_device);
1778 
1779 		if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1780 			/* This still has a race (about when exactly the peers
1781 			 * detect connection loss) that can lead to a full sync
1782 			 * on next handshake. In 8.3.9 we fixed this with explicit
1783 			 * resync-finished notifications, but the fix
1784 			 * introduces a protocol change.  Sleeping for some
1785 			 * time longer than the ping interval + timeout on the
1786 			 * SyncSource, to give the SyncTarget the chance to
1787 			 * detect connection loss, then waiting for a ping
1788 			 * response (implicit in drbd_resync_finished) reduces
1789 			 * the race considerably, but does not solve it. */
1790 			if (side == C_SYNC_SOURCE) {
1791 				struct net_conf *nc;
1792 				int timeo;
1793 
1794 				rcu_read_lock();
1795 				nc = rcu_dereference(connection->net_conf);
1796 				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1797 				rcu_read_unlock();
1798 				schedule_timeout_interruptible(timeo);
1799 			}
1800 			drbd_resync_finished(device);
1801 		}
1802 
1803 		drbd_rs_controller_reset(device);
1804 		/* ns.conn may already be != device->state.conn,
1805 		 * we may have been paused in between, or become paused until
1806 		 * the timer triggers.
1807 		 * No matter, that is handled in resync_timer_fn() */
1808 		if (ns.conn == C_SYNC_TARGET)
1809 			mod_timer(&device->resync_timer, jiffies);
1810 
1811 		drbd_md_sync(device);
1812 	}
1813 	put_ldev(device);
1814 out:
1815 	mutex_unlock(device->state_mutex);
1816 }
1817 
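/* Write out changed bitmap pages and broadcast sync progress.
 * If the resync just finished (and we are still in a sync state),
 * also wrap it up via drbd_resync_finished(). */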
1818 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1819 {
1820 	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1821 	device->rs_last_bcast = jiffies;
1822 
1823 	if (!get_ldev(device))
1824 		return;
1825 
1826 	drbd_bm_write_lazy(device, 0);
1827 	if (resync_done && is_sync_state(device->state.conn))
1828 		drbd_resync_finished(device);
1829 
1830 	drbd_bcast_event(device, &sib);
1831 	/* update timestamp, in case the bitmap writeout took a while */
1832 	device->rs_last_bcast = jiffies;
1833 	put_ldev(device);
1834 }
1835 
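/* Final cleanup of the backing device: free the resync and activity log
 * LRU caches and the ldev itself.  The __acquire/__release(local) pair
 * below is only for sparse context checking and generates no code. */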
1836 static void drbd_ldev_destroy(struct drbd_device *device)
1837 {
1838 	lc_destroy(device->resync);
1839 	device->resync = NULL;
1840 	lc_destroy(device->act_log);
1841 	device->act_log = NULL;
1842 
1843 	__acquire(local);
1844 	drbd_backing_dev_free(device, device->ldev);
1845 	device->ldev = NULL;
1846 	__release(local);
1847 
1848 	clear_bit(GOING_DISKLESS, &device->flags);
1849 	wake_up(&device->misc_wait);
1850 }
1851 
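/* Handles the GO_DISKLESS device work: the disk is already D_FAILED,
 * try to flush dirty bitmap pages, then force the state to D_DISKLESS. */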
1852 static void go_diskless(struct drbd_device *device)
1853 {
1854 	D_ASSERT(device, device->state.disk == D_FAILED);
1855 	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1856 	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1857 	 * the protected members anymore, though, so once put_ldev reaches zero
1858 	 * again, it will be safe to free them. */
1859 
1860 	/* Try to write changed bitmap pages, read errors may have just
1861 	 * set some bits outside the area covered by the activity log.
1862 	 *
1863 	 * If we have an IO error during the bitmap writeout,
1864 	 * we will want a full sync next time, just in case.
1865 	 * (Do we want a specific meta data flag for this?)
1866 	 *
1867 	 * If that does not make it to stable storage either,
1868 	 * we cannot do anything about that anymore.
1869 	 *
1870 	 * We still need to check if both bitmap and ldev are present, we may
1871 	 * end up here after a failed attach, before ldev was even assigned.
1872 	 */
1873 	if (device->bitmap && device->ldev) {
1874 		/* An interrupted resync or similar is allowed to recount bits
1875 		 * while we detach.
1876 		 * Any modifications would not be expected anymore, though.
1877 		 */
1878 		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1879 					"detach", BM_LOCKED_TEST_ALLOWED)) {
1880 			if (test_bit(WAS_READ_ERROR, &device->flags)) {
1881 				drbd_md_set_flag(device, MDF_FULL_SYNC);
1882 				drbd_md_sync(device);
1883 			}
1884 		}
1885 	}
1886 
1887 	drbd_force_state(device, NS(disk, D_DISKLESS));
1888 }
1889 
1890 static int do_md_sync(struct drbd_device *device)
1891 {
1892 	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1893 	drbd_md_sync(device);
1894 	return 0;
1895 }
1896 
1897 /* only called from drbd_worker thread, no locking */
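/* Record the callback about to run into a small circular history of
 * DRBD_THREAD_DETAILS_HIST entries; the following slot is zeroed so it
 * marks the current end of the ring. */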
1898 void __update_timing_details(
1899 		struct drbd_thread_timing_details *tdp,
1900 		unsigned int *cb_nr,
1901 		void *cb,
1902 		const char *fn, const unsigned int line)
1903 {
1904 	unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1905 	struct drbd_thread_timing_details *td = tdp + i;
1906 
1907 	td->start_jif = jiffies;
1908 	td->cb_addr = cb;
1909 	td->caller_fn = fn;
1910 	td->line = line;
1911 	td->cb_nr = *cb_nr;
1912 
1913 	i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1914 	td = tdp + i;
1915 	memset(td, 0, sizeof(*td));
1916 
1917 	++(*cb_nr);
1918 }
1919 
1920 static void do_device_work(struct drbd_device *device, const unsigned long todo)
1921 {
1922 	if (test_bit(MD_SYNC, &todo))
1923 		do_md_sync(device);
1924 	if (test_bit(RS_DONE, &todo) ||
1925 	    test_bit(RS_PROGRESS, &todo))
1926 		update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
1927 	if (test_bit(GO_DISKLESS, &todo))
1928 		go_diskless(device);
1929 	if (test_bit(DESTROY_DISK, &todo))
1930 		drbd_ldev_destroy(device);
1931 	if (test_bit(RS_START, &todo))
1932 		do_start_resync(device);
1933 }
1934 
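/* The device->flags bits that are treated as "device work" by the worker;
 * see do_device_work() above and get_work_bits() below. */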
1935 #define DRBD_DEVICE_WORK_MASK	\
1936 	((1UL << GO_DISKLESS)	\
1937 	|(1UL << DESTROY_DISK)	\
1938 	|(1UL << MD_SYNC)	\
1939 	|(1UL << RS_START)	\
1940 	|(1UL << RS_PROGRESS)	\
1941 	|(1UL << RS_DONE)	\
1942 	)
1943 
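/* Atomically fetch and clear the device work bits in *flags using a
 * cmpxchg() loop; returns the bits that were set. */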
1944 static unsigned long get_work_bits(unsigned long *flags)
1945 {
1946 	unsigned long old, new;
1947 	do {
1948 		old = *flags;
1949 		new = old & ~DRBD_DEVICE_WORK_MASK;
1950 	} while (cmpxchg(flags, old, new) != old);
1951 	return old & DRBD_DEVICE_WORK_MASK;
1952 }
1953 
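/* Run pending device work for all volumes of this connection.  A kref
 * is taken before dropping the RCU read lock, since do_device_work()
 * may sleep. */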
1954 static void do_unqueued_work(struct drbd_connection *connection)
1955 {
1956 	struct drbd_peer_device *peer_device;
1957 	int vnr;
1958 
1959 	rcu_read_lock();
1960 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1961 		struct drbd_device *device = peer_device->device;
1962 		unsigned long todo = get_work_bits(&device->flags);
1963 		if (!todo)
1964 			continue;
1965 
1966 		kref_get(&device->kref);
1967 		rcu_read_unlock();
1968 		do_device_work(device, todo);
1969 		kref_put(&device->kref, drbd_destroy_device);
1970 		rcu_read_lock();
1971 	}
1972 	rcu_read_unlock();
1973 }
1974 
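/* Move everything queued on @queue to @work_list under the queue lock;
 * returns true if @work_list is non-empty afterwards. */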
1975 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1976 {
1977 	spin_lock_irq(&queue->q_lock);
1978 	list_splice_tail_init(&queue->q, work_list);
1979 	spin_unlock_irq(&queue->q_lock);
1980 	return !list_empty(work_list);
1981 }
1982 
1983 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
1984 {
1985 	DEFINE_WAIT(wait);
1986 	struct net_conf *nc;
1987 	int uncork, cork;
1988 
1989 	dequeue_work_batch(&connection->sender_work, work_list);
1990 	if (!list_empty(work_list))
1991 		return;
1992 
1993 	/* Still nothing to do?
1994 	 * Maybe we still need to close the current epoch,
1995 	 * even if no new requests are queued yet.
1996 	 *
1997 	 * Also, poke TCP, just in case.
1998 	 * Then wait for new work (or signal). */
1999 	rcu_read_lock();
2000 	nc = rcu_dereference(connection->net_conf);
2001 	uncork = nc ? nc->tcp_cork : 0;
2002 	rcu_read_unlock();
2003 	if (uncork) {
2004 		mutex_lock(&connection->data.mutex);
2005 		if (connection->data.socket)
2006 			drbd_tcp_uncork(connection->data.socket);
2007 		mutex_unlock(&connection->data.mutex);
2008 	}
2009 
2010 	for (;;) {
2011 		int send_barrier;
2012 		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2013 		spin_lock_irq(&connection->resource->req_lock);
2014 		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2015 		if (!list_empty(&connection->sender_work.q))
2016 			list_splice_tail_init(&connection->sender_work.q, work_list);
2017 		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
2018 		if (!list_empty(work_list) || signal_pending(current)) {
2019 			spin_unlock_irq(&connection->resource->req_lock);
2020 			break;
2021 		}
2022 
2023 		/* We found nothing new to do, no to-be-communicated request,
2024 		 * no other work item.  We may still need to close the last
2025 		 * epoch.  The next incoming request epoch will be the connection's
2026 		 * current transfer log epoch number.  If that is different
2027 		 * from the epoch of the last request we communicated, it is
2028 		 * safe to send the epoch separating barrier now.
2029 		 */
2030 		send_barrier =
2031 			atomic_read(&connection->current_tle_nr) !=
2032 			connection->send.current_epoch_nr;
2033 		spin_unlock_irq(&connection->resource->req_lock);
2034 
2035 		if (send_barrier)
2036 			maybe_send_barrier(connection,
2037 					connection->send.current_epoch_nr + 1);
2038 
2039 		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2040 			break;
2041 
2042 		/* drbd_send() may have called flush_signals() */
2043 		if (get_t_state(&connection->worker) != RUNNING)
2044 			break;
2045 
2046 		schedule();
2047 		/* may be woken up for things other than new work, too,
2048 		 * e.g. if the current epoch got closed;
2049 		 * in that case we send the barrier above. */
2050 	}
2051 	finish_wait(&connection->sender_work.q_wait, &wait);
2052 
2053 	/* someone may have changed the config while we have been waiting above. */
2054 	rcu_read_lock();
2055 	nc = rcu_dereference(connection->net_conf);
2056 	cork = nc ? nc->tcp_cork : 0;
2057 	rcu_read_unlock();
2058 	mutex_lock(&connection->data.mutex);
2059 	if (connection->data.socket) {
2060 		if (cork)
2061 			drbd_tcp_cork(connection->data.socket);
2062 		else if (!uncork)
2063 			drbd_tcp_uncork(connection->data.socket);
2064 	}
2065 	mutex_unlock(&connection->data.mutex);
2066 }
2067 
2068 int drbd_worker(struct drbd_thread *thi)
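/* Main loop of the per-connection worker thread: wait for work, handle
 * "device work" flagged via DEVICE_WORK_PENDING, and run queued work
 * callbacks.  Once the thread leaves the RUNNING state, the remaining
 * work is drained and the volumes are cleaned up. */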
2069 {
2070 	struct drbd_connection *connection = thi->connection;
2071 	struct drbd_work *w = NULL;
2072 	struct drbd_peer_device *peer_device;
2073 	LIST_HEAD(work_list);
2074 	int vnr;
2075 
2076 	while (get_t_state(thi) == RUNNING) {
2077 		drbd_thread_current_set_cpu(thi);
2078 
2079 		if (list_empty(&work_list)) {
2080 			update_worker_timing_details(connection, wait_for_work);
2081 			wait_for_work(connection, &work_list);
2082 		}
2083 
2084 		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2085 			update_worker_timing_details(connection, do_unqueued_work);
2086 			do_unqueued_work(connection);
2087 		}
2088 
2089 		if (signal_pending(current)) {
2090 			flush_signals(current);
2091 			if (get_t_state(thi) == RUNNING) {
2092 				drbd_warn(connection, "Worker got an unexpected signal\n");
2093 				continue;
2094 			}
2095 			break;
2096 		}
2097 
2098 		if (get_t_state(thi) != RUNNING)
2099 			break;
2100 
2101 		if (!list_empty(&work_list)) {
2102 			w = list_first_entry(&work_list, struct drbd_work, list);
2103 			list_del_init(&w->list);
2104 			update_worker_timing_details(connection, w->cb);
2105 			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2106 				continue;
2107 			if (connection->cstate >= C_WF_REPORT_PARAMS)
2108 				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2109 		}
2110 	}
2111 
2112 	do {
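	/* Drain whatever is still queued; callbacks are invoked with the
	 * cancel argument set. */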
2113 		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2114 			update_worker_timing_details(connection, do_unqueued_work);
2115 			do_unqueued_work(connection);
2116 		}
2117 		if (!list_empty(&work_list)) {
2118 			w = list_first_entry(&work_list, struct drbd_work, list);
2119 			list_del_init(&w->list);
2120 			update_worker_timing_details(connection, w->cb);
2121 			w->cb(w, 1);
2122 		} else
2123 			dequeue_work_batch(&connection->sender_work, &work_list);
2124 	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2125 
2126 	rcu_read_lock();
2127 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2128 		struct drbd_device *device = peer_device->device;
2129 		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2130 		kref_get(&device->kref);
2131 		rcu_read_unlock();
2132 		drbd_device_cleanup(device);
2133 		kref_put(&device->kref, drbd_destroy_device);
2134 		rcu_read_lock();
2135 	}
2136 	rcu_read_unlock();
2137 
2138 	return 0;
2139 }
2140