1 /* 2 drbd_worker.c 3 4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg. 5 6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH. 7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>. 8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>. 9 10 drbd is free software; you can redistribute it and/or modify 11 it under the terms of the GNU General Public License as published by 12 the Free Software Foundation; either version 2, or (at your option) 13 any later version. 14 15 drbd is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU General Public License for more details. 19 20 You should have received a copy of the GNU General Public License 21 along with drbd; see the file COPYING. If not, write to 22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 23 24 */ 25 26 #include <linux/module.h> 27 #include <linux/drbd.h> 28 #include <linux/sched/signal.h> 29 #include <linux/wait.h> 30 #include <linux/mm.h> 31 #include <linux/memcontrol.h> 32 #include <linux/mm_inline.h> 33 #include <linux/slab.h> 34 #include <linux/random.h> 35 #include <linux/string.h> 36 #include <linux/scatterlist.h> 37 38 #include "drbd_int.h" 39 #include "drbd_protocol.h" 40 #include "drbd_req.h" 41 42 static int make_ov_request(struct drbd_device *, int); 43 static int make_resync_request(struct drbd_device *, int); 44 45 /* endio handlers: 46 * drbd_md_endio (defined here) 47 * drbd_request_endio (defined here) 48 * drbd_peer_request_endio (defined here) 49 * drbd_bm_endio (defined in drbd_bitmap.c) 50 * 51 * For all these callbacks, note the following: 52 * The callbacks will be called in irq context by the IDE drivers, 53 * and in Softirqs/Tasklets/BH context by the SCSI drivers. 54 * Try to get the locking right :) 55 * 56 */ 57 58 /* used for synchronous meta data and bitmap IO 59 * submitted by drbd_md_sync_page_io() 60 */ 61 void drbd_md_endio(struct bio *bio) 62 { 63 struct drbd_device *device; 64 65 device = bio->bi_private; 66 device->md_io.error = blk_status_to_errno(bio->bi_status); 67 68 /* special case: drbd_md_read() during drbd_adm_attach() */ 69 if (device->ldev) 70 put_ldev(device); 71 bio_put(bio); 72 73 /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able 74 * to timeout on the lower level device, and eventually detach from it. 75 * If this io completion runs after that timeout expired, this 76 * drbd_md_put_buffer() may allow us to finally try and re-attach. 77 * During normal operation, this only puts that extra reference 78 * down to 1 again. 79 * Make sure we first drop the reference, and only then signal 80 * completion, or we may (in drbd_al_read_log()) cycle so fast into the 81 * next drbd_md_sync_page_io(), that we trigger the 82 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there. 
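 * (Roughly: if we signalled md_io.done and woke the waiter before the
 * put, the waiter could submit the next meta data IO while this extra
 * reference is still held, and see md_io_in_use == 2.)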
83 */ 84 drbd_md_put_buffer(device); 85 device->md_io.done = 1; 86 wake_up(&device->misc_wait); 87 } 88 89 /* reads on behalf of the partner, 90 * "submitted" by the receiver 91 */ 92 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local) 93 { 94 unsigned long flags = 0; 95 struct drbd_peer_device *peer_device = peer_req->peer_device; 96 struct drbd_device *device = peer_device->device; 97 98 spin_lock_irqsave(&device->resource->req_lock, flags); 99 device->read_cnt += peer_req->i.size >> 9; 100 list_del(&peer_req->w.list); 101 if (list_empty(&device->read_ee)) 102 wake_up(&device->ee_wait); 103 if (test_bit(__EE_WAS_ERROR, &peer_req->flags)) 104 __drbd_chk_io_error(device, DRBD_READ_ERROR); 105 spin_unlock_irqrestore(&device->resource->req_lock, flags); 106 107 drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w); 108 put_ldev(device); 109 } 110 111 /* writes on behalf of the partner, or resync writes, 112 * "submitted" by the receiver, final stage. */ 113 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local) 114 { 115 unsigned long flags = 0; 116 struct drbd_peer_device *peer_device = peer_req->peer_device; 117 struct drbd_device *device = peer_device->device; 118 struct drbd_connection *connection = peer_device->connection; 119 struct drbd_interval i; 120 int do_wake; 121 u64 block_id; 122 int do_al_complete_io; 123 124 /* after we moved peer_req to done_ee, 125 * we may no longer access it, 126 * it may be freed/reused already! 127 * (as soon as we release the req_lock) */ 128 i = peer_req->i; 129 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO; 130 block_id = peer_req->block_id; 131 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO; 132 133 if (peer_req->flags & EE_WAS_ERROR) { 134 /* In protocol != C, we usually do not send write acks. 135 * In case of a write error, send the neg ack anyways. */ 136 if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags)) 137 inc_unacked(device); 138 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size); 139 } 140 141 spin_lock_irqsave(&device->resource->req_lock, flags); 142 device->writ_cnt += peer_req->i.size >> 9; 143 list_move_tail(&peer_req->w.list, &device->done_ee); 144 145 /* 146 * Do not remove from the write_requests tree here: we did not send the 147 * Ack yet and did not wake possibly waiting conflicting requests. 148 * Removed from the tree from "drbd_process_done_ee" within the 149 * appropriate dw.cb (e_end_block/e_end_resync_block) or from 150 * _drbd_clear_done_ee. 151 */ 152 153 do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee); 154 155 /* FIXME do we want to detach for failed REQ_OP_DISCARD? 156 * ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */ 157 if (peer_req->flags & EE_WAS_ERROR) 158 __drbd_chk_io_error(device, DRBD_WRITE_ERROR); 159 160 if (connection->cstate >= C_WF_REPORT_PARAMS) { 161 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */ 162 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work)) 163 kref_put(&device->kref, drbd_destroy_device); 164 } 165 spin_unlock_irqrestore(&device->resource->req_lock, flags); 166 167 if (block_id == ID_SYNCER) 168 drbd_rs_complete_io(device, i.sector); 169 170 if (do_wake) 171 wake_up(&device->ee_wait); 172 173 if (do_al_complete_io) 174 drbd_al_complete_io(device, &i); 175 176 put_ldev(device); 177 } 178 179 /* writes on behalf of the partner, or resync writes, 180 * "submitted" by the receiver. 
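 * (Editor's note: drbd_peer_request_endio() below is the bio endio for
 * both reads and writes done on behalf of the peer; it dispatches to the
 * matching *_sec_final() helper above once all bios of the peer request
 * have completed.)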
 */
void drbd_peer_request_endio(struct bio *bio)
{
	struct drbd_peer_request *peer_req = bio->bi_private;
	struct drbd_device *device = peer_req->peer_device->device;
	bool is_write = bio_data_dir(bio) == WRITE;
	bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
			  bio_op(bio) == REQ_OP_DISCARD;

	if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
		drbd_warn(device, "%s: error=%d s=%llus\n",
				is_write ? (is_discard ? "discard" : "write")
					: "read", bio->bi_status,
				(unsigned long long)peer_req->i.sector);

	if (bio->bi_status)
		set_bit(__EE_WAS_ERROR, &peer_req->flags);

	bio_put(bio); /* no need for the bio anymore */
	if (atomic_dec_and_test(&peer_req->pending_bios)) {
		if (is_write)
			drbd_endio_write_sec_final(peer_req);
		else
			drbd_endio_read_sec_final(peer_req);
	}
}

static void
drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
{
	panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
		device->minor, device->resource->name, device->vnr);
}

/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
void drbd_request_endio(struct bio *bio)
{
	unsigned long flags;
	struct drbd_request *req = bio->bi_private;
	struct drbd_device *device = req->device;
	struct bio_and_error m;
	enum drbd_req_event what;

	/* If this request was aborted locally before,
	 * but now was completed "successfully",
	 * chances are that this caused arbitrary data corruption.
	 *
	 * "aborting" requests, or force-detaching the disk, is intended for
	 * completely blocked/hung local backing devices which no longer
	 * complete requests at all, not even do error completions.  In this
	 * situation, usually a hard-reset and failover is the only way out.
	 *
	 * By "aborting", basically faking a local error-completion,
	 * we allow for a more graceful switchover by cleanly migrating services.
	 * Still the affected node has to be rebooted "soon".
	 *
	 * By completing these requests, we allow the upper layers to re-use
	 * the associated data pages.
	 *
	 * If later the local backing device "recovers", and now DMAs some data
	 * from disk into the original request pages, in the best case it will
	 * just put random data into unused pages; but typically it will corrupt
	 * meanwhile completely unrelated data, causing all sorts of damage.
	 *
	 * Which means delayed successful completion,
	 * especially for READ requests,
	 * is a reason to panic().
	 *
	 * We assume that a delayed *error* completion is OK,
	 * though we still will complain noisily about it.
252 */ 253 if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) { 254 if (__ratelimit(&drbd_ratelimit_state)) 255 drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n"); 256 257 if (!bio->bi_status) 258 drbd_panic_after_delayed_completion_of_aborted_request(device); 259 } 260 261 /* to avoid recursion in __req_mod */ 262 if (unlikely(bio->bi_status)) { 263 switch (bio_op(bio)) { 264 case REQ_OP_WRITE_ZEROES: 265 case REQ_OP_DISCARD: 266 if (bio->bi_status == BLK_STS_NOTSUPP) 267 what = DISCARD_COMPLETED_NOTSUPP; 268 else 269 what = DISCARD_COMPLETED_WITH_ERROR; 270 break; 271 case REQ_OP_READ: 272 if (bio->bi_opf & REQ_RAHEAD) 273 what = READ_AHEAD_COMPLETED_WITH_ERROR; 274 else 275 what = READ_COMPLETED_WITH_ERROR; 276 break; 277 default: 278 what = WRITE_COMPLETED_WITH_ERROR; 279 break; 280 } 281 } else { 282 what = COMPLETED_OK; 283 } 284 285 req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status)); 286 bio_put(bio); 287 288 /* not req_mod(), we need irqsave here! */ 289 spin_lock_irqsave(&device->resource->req_lock, flags); 290 __req_mod(req, what, &m); 291 spin_unlock_irqrestore(&device->resource->req_lock, flags); 292 put_ldev(device); 293 294 if (m.bio) 295 complete_master_bio(device, &m); 296 } 297 298 void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest) 299 { 300 SHASH_DESC_ON_STACK(desc, tfm); 301 struct page *page = peer_req->pages; 302 struct page *tmp; 303 unsigned len; 304 void *src; 305 306 desc->tfm = tfm; 307 308 crypto_shash_init(desc); 309 310 src = kmap_atomic(page); 311 while ((tmp = page_chain_next(page))) { 312 /* all but the last page will be fully used */ 313 crypto_shash_update(desc, src, PAGE_SIZE); 314 kunmap_atomic(src); 315 page = tmp; 316 src = kmap_atomic(page); 317 } 318 /* and now the last, possibly only partially used page */ 319 len = peer_req->i.size & (PAGE_SIZE - 1); 320 crypto_shash_update(desc, src, len ?: PAGE_SIZE); 321 kunmap_atomic(src); 322 323 crypto_shash_final(desc, digest); 324 shash_desc_zero(desc); 325 } 326 327 void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest) 328 { 329 SHASH_DESC_ON_STACK(desc, tfm); 330 struct bio_vec bvec; 331 struct bvec_iter iter; 332 333 desc->tfm = tfm; 334 335 crypto_shash_init(desc); 336 337 bio_for_each_segment(bvec, bio, iter) { 338 u8 *src; 339 340 src = kmap_atomic(bvec.bv_page); 341 crypto_shash_update(desc, src + bvec.bv_offset, bvec.bv_len); 342 kunmap_atomic(src); 343 344 /* REQ_OP_WRITE_SAME has only one segment, 345 * checksum the payload only once. 
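	 * (A write-same bio carries a single payload segment that the device
	 * replicates over the whole range, so hashing that one segment once
	 * is, presumably, exactly what the peer side computes as well.)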
*/ 346 if (bio_op(bio) == REQ_OP_WRITE_SAME) 347 break; 348 } 349 crypto_shash_final(desc, digest); 350 shash_desc_zero(desc); 351 } 352 353 /* MAYBE merge common code with w_e_end_ov_req */ 354 static int w_e_send_csum(struct drbd_work *w, int cancel) 355 { 356 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 357 struct drbd_peer_device *peer_device = peer_req->peer_device; 358 struct drbd_device *device = peer_device->device; 359 int digest_size; 360 void *digest; 361 int err = 0; 362 363 if (unlikely(cancel)) 364 goto out; 365 366 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0)) 367 goto out; 368 369 digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm); 370 digest = kmalloc(digest_size, GFP_NOIO); 371 if (digest) { 372 sector_t sector = peer_req->i.sector; 373 unsigned int size = peer_req->i.size; 374 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest); 375 /* Free peer_req and pages before send. 376 * In case we block on congestion, we could otherwise run into 377 * some distributed deadlock, if the other side blocks on 378 * congestion as well, because our receiver blocks in 379 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 380 drbd_free_peer_req(device, peer_req); 381 peer_req = NULL; 382 inc_rs_pending(device); 383 err = drbd_send_drequest_csum(peer_device, sector, size, 384 digest, digest_size, 385 P_CSUM_RS_REQUEST); 386 kfree(digest); 387 } else { 388 drbd_err(device, "kmalloc() of digest failed.\n"); 389 err = -ENOMEM; 390 } 391 392 out: 393 if (peer_req) 394 drbd_free_peer_req(device, peer_req); 395 396 if (unlikely(err)) 397 drbd_err(device, "drbd_send_drequest(..., csum) failed\n"); 398 return err; 399 } 400 401 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) 402 403 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size) 404 { 405 struct drbd_device *device = peer_device->device; 406 struct drbd_peer_request *peer_req; 407 408 if (!get_ldev(device)) 409 return -EIO; 410 411 /* GFP_TRY, because if there is no memory available right now, this may 412 * be rescheduled for later. It is "only" background resync, after all. */ 413 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector, 414 size, size, GFP_TRY); 415 if (!peer_req) 416 goto defer; 417 418 peer_req->w.cb = w_e_send_csum; 419 spin_lock_irq(&device->resource->req_lock); 420 list_add_tail(&peer_req->w.list, &device->read_ee); 421 spin_unlock_irq(&device->resource->req_lock); 422 423 atomic_add(size >> 9, &device->rs_sect_ev); 424 if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0, 425 DRBD_FAULT_RS_RD) == 0) 426 return 0; 427 428 /* If it failed because of ENOMEM, retry should help. If it failed 429 * because bio_add_page failed (probably broken lower level driver), 430 * retry may or may not help. 431 * If it does not, you may need to force disconnect. 
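 * (The -EAGAIN returned from the defer path below is handled by
 * make_resync_request(): it rolls this request back and re-arms the
 * resync timer, so the retry happens on the next timer tick.)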
*/ 432 spin_lock_irq(&device->resource->req_lock); 433 list_del(&peer_req->w.list); 434 spin_unlock_irq(&device->resource->req_lock); 435 436 drbd_free_peer_req(device, peer_req); 437 defer: 438 put_ldev(device); 439 return -EAGAIN; 440 } 441 442 int w_resync_timer(struct drbd_work *w, int cancel) 443 { 444 struct drbd_device *device = 445 container_of(w, struct drbd_device, resync_work); 446 447 switch (device->state.conn) { 448 case C_VERIFY_S: 449 make_ov_request(device, cancel); 450 break; 451 case C_SYNC_TARGET: 452 make_resync_request(device, cancel); 453 break; 454 } 455 456 return 0; 457 } 458 459 void resync_timer_fn(struct timer_list *t) 460 { 461 struct drbd_device *device = from_timer(device, t, resync_timer); 462 463 drbd_queue_work_if_unqueued( 464 &first_peer_device(device)->connection->sender_work, 465 &device->resync_work); 466 } 467 468 static void fifo_set(struct fifo_buffer *fb, int value) 469 { 470 int i; 471 472 for (i = 0; i < fb->size; i++) 473 fb->values[i] = value; 474 } 475 476 static int fifo_push(struct fifo_buffer *fb, int value) 477 { 478 int ov; 479 480 ov = fb->values[fb->head_index]; 481 fb->values[fb->head_index++] = value; 482 483 if (fb->head_index >= fb->size) 484 fb->head_index = 0; 485 486 return ov; 487 } 488 489 static void fifo_add_val(struct fifo_buffer *fb, int value) 490 { 491 int i; 492 493 for (i = 0; i < fb->size; i++) 494 fb->values[i] += value; 495 } 496 497 struct fifo_buffer *fifo_alloc(int fifo_size) 498 { 499 struct fifo_buffer *fb; 500 501 fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO); 502 if (!fb) 503 return NULL; 504 505 fb->head_index = 0; 506 fb->size = fifo_size; 507 fb->total = 0; 508 509 return fb; 510 } 511 512 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in) 513 { 514 struct disk_conf *dc; 515 unsigned int want; /* The number of sectors we want in-flight */ 516 int req_sect; /* Number of sectors to request in this turn */ 517 int correction; /* Number of sectors more we need in-flight */ 518 int cps; /* correction per invocation of drbd_rs_controller() */ 519 int steps; /* Number of time steps to plan ahead */ 520 int curr_corr; 521 int max_sect; 522 struct fifo_buffer *plan; 523 524 dc = rcu_dereference(device->ldev->disk_conf); 525 plan = rcu_dereference(device->rs_plan_s); 526 527 steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */ 528 529 if (device->rs_in_flight + sect_in == 0) { /* At start of resync */ 530 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps; 531 } else { /* normal path */ 532 want = dc->c_fill_target ? 
dc->c_fill_target : 533 sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10); 534 } 535 536 correction = want - device->rs_in_flight - plan->total; 537 538 /* Plan ahead */ 539 cps = correction / steps; 540 fifo_add_val(plan, cps); 541 plan->total += cps * steps; 542 543 /* What we do in this step */ 544 curr_corr = fifo_push(plan, 0); 545 plan->total -= curr_corr; 546 547 req_sect = sect_in + curr_corr; 548 if (req_sect < 0) 549 req_sect = 0; 550 551 max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ; 552 if (req_sect > max_sect) 553 req_sect = max_sect; 554 555 /* 556 drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n", 557 sect_in, device->rs_in_flight, want, correction, 558 steps, cps, device->rs_planed, curr_corr, req_sect); 559 */ 560 561 return req_sect; 562 } 563 564 static int drbd_rs_number_requests(struct drbd_device *device) 565 { 566 unsigned int sect_in; /* Number of sectors that came in since the last turn */ 567 int number, mxb; 568 569 sect_in = atomic_xchg(&device->rs_sect_in, 0); 570 device->rs_in_flight -= sect_in; 571 572 rcu_read_lock(); 573 mxb = drbd_get_max_buffers(device) / 2; 574 if (rcu_dereference(device->rs_plan_s)->size) { 575 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9); 576 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME; 577 } else { 578 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate; 579 number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ); 580 } 581 rcu_read_unlock(); 582 583 /* Don't have more than "max-buffers"/2 in-flight. 584 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(), 585 * potentially causing a distributed deadlock on congestion during 586 * online-verify or (checksum-based) resync, if max-buffers, 587 * socket buffer sizes and resync rate settings are mis-configured. */ 588 589 /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k), 590 * mxb (as used here, and in drbd_alloc_pages on the peer) is 591 * "number of pages" (typically also 4k), 592 * but "rs_in_flight" is in "sectors" (512 Byte). */ 593 if (mxb - device->rs_in_flight/8 < number) 594 number = mxb - device->rs_in_flight/8; 595 596 return number; 597 } 598 599 static int make_resync_request(struct drbd_device *const device, int cancel) 600 { 601 struct drbd_peer_device *const peer_device = first_peer_device(device); 602 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL; 603 unsigned long bit; 604 sector_t sector; 605 const sector_t capacity = drbd_get_capacity(device->this_bdev); 606 int max_bio_size; 607 int number, rollback_i, size; 608 int align, requeue = 0; 609 int i = 0; 610 int discard_granularity = 0; 611 612 if (unlikely(cancel)) 613 return 0; 614 615 if (device->rs_total == 0) { 616 /* empty resync? 
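		 * (rs_total == 0 means nothing was marked out of sync when this
		 * resync started, so there is nothing to request; just report
		 * it as finished.)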
		 */
		drbd_resync_finished(device);
		return 0;
	}

	if (!get_ldev(device)) {
		/* Since we only need to access device->resync a
		   get_ldev_if_state(device,D_FAILED) would be sufficient, but
		   to continue resync with a broken disk makes no sense at
		   all */
		drbd_err(device, "Disk broke down during resync!\n");
		return 0;
	}

	if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
		rcu_read_lock();
		discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
		rcu_read_unlock();
	}

	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
	number = drbd_rs_number_requests(device);
	if (number <= 0)
		goto requeue;

	for (i = 0; i < number; i++) {
		/* Stop generating RS requests when half of the send buffer is filled,
		 * but notify TCP that we'd like to have more space. */
		mutex_lock(&connection->data.mutex);
		if (connection->data.socket) {
			struct sock *sk = connection->data.socket->sk;
			int queued = sk->sk_wmem_queued;
			int sndbuf = sk->sk_sndbuf;
			if (queued > sndbuf / 2) {
				requeue = 1;
				if (sk->sk_socket)
					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			}
		} else
			requeue = 1;
		mutex_unlock(&connection->data.mutex);
		if (requeue)
			goto requeue;

next_sector:
		size = BM_BLOCK_SIZE;
		bit = drbd_bm_find_next(device, device->bm_resync_fo);

		if (bit == DRBD_END_OF_BITMAP) {
			device->bm_resync_fo = drbd_bm_bits(device);
			put_ldev(device);
			return 0;
		}

		sector = BM_BIT_TO_SECT(bit);

		if (drbd_try_rs_begin_io(device, sector)) {
			device->bm_resync_fo = bit;
			goto requeue;
		}
		device->bm_resync_fo = bit + 1;

		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
			drbd_rs_complete_io(device, sector);
			goto next_sector;
		}

#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop if we already have the maximum req size.
		 *
		 * Additionally always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
		 */
		align = 1;
		rollback_i = i;
		while (i < number) {
			if (size + BM_BLOCK_SIZE > max_bio_size)
				break;

			/* Always be aligned */
			if (sector & ((1<<(align+3))-1))
				break;

			if (discard_granularity && size == discard_granularity)
				break;

			/* do not cross extent boundaries */
			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
				break;
			/* now, is it actually dirty, after all?
707 * caution, drbd_bm_test_bit is tri-state for some 708 * obscure reason; ( b == 0 ) would get the out-of-band 709 * only accidentally right because of the "oddly sized" 710 * adjustment below */ 711 if (drbd_bm_test_bit(device, bit+1) != 1) 712 break; 713 bit++; 714 size += BM_BLOCK_SIZE; 715 if ((BM_BLOCK_SIZE << align) <= size) 716 align++; 717 i++; 718 } 719 /* if we merged some, 720 * reset the offset to start the next drbd_bm_find_next from */ 721 if (size > BM_BLOCK_SIZE) 722 device->bm_resync_fo = bit + 1; 723 #endif 724 725 /* adjust very last sectors, in case we are oddly sized */ 726 if (sector + (size>>9) > capacity) 727 size = (capacity-sector)<<9; 728 729 if (device->use_csums) { 730 switch (read_for_csum(peer_device, sector, size)) { 731 case -EIO: /* Disk failure */ 732 put_ldev(device); 733 return -EIO; 734 case -EAGAIN: /* allocation failed, or ldev busy */ 735 drbd_rs_complete_io(device, sector); 736 device->bm_resync_fo = BM_SECT_TO_BIT(sector); 737 i = rollback_i; 738 goto requeue; 739 case 0: 740 /* everything ok */ 741 break; 742 default: 743 BUG(); 744 } 745 } else { 746 int err; 747 748 inc_rs_pending(device); 749 err = drbd_send_drequest(peer_device, 750 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST, 751 sector, size, ID_SYNCER); 752 if (err) { 753 drbd_err(device, "drbd_send_drequest() failed, aborting...\n"); 754 dec_rs_pending(device); 755 put_ldev(device); 756 return err; 757 } 758 } 759 } 760 761 if (device->bm_resync_fo >= drbd_bm_bits(device)) { 762 /* last syncer _request_ was sent, 763 * but the P_RS_DATA_REPLY not yet received. sync will end (and 764 * next sync group will resume), as soon as we receive the last 765 * resync data block, and the last bit is cleared. 766 * until then resync "work" is "inactive" ... 767 */ 768 put_ldev(device); 769 return 0; 770 } 771 772 requeue: 773 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 774 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME); 775 put_ldev(device); 776 return 0; 777 } 778 779 static int make_ov_request(struct drbd_device *device, int cancel) 780 { 781 int number, i, size; 782 sector_t sector; 783 const sector_t capacity = drbd_get_capacity(device->this_bdev); 784 bool stop_sector_reached = false; 785 786 if (unlikely(cancel)) 787 return 1; 788 789 number = drbd_rs_number_requests(device); 790 791 sector = device->ov_position; 792 for (i = 0; i < number; i++) { 793 if (sector >= capacity) 794 return 1; 795 796 /* We check for "finished" only in the reply path: 797 * w_e_end_ov_reply(). 798 * We need to send at least one request out. 
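	 * (Hence the i > 0 in the check below: the stop sector is only
	 * honored once at least one verify request has gone out in this run.)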
*/ 799 stop_sector_reached = i > 0 800 && verify_can_do_stop_sector(device) 801 && sector >= device->ov_stop_sector; 802 if (stop_sector_reached) 803 break; 804 805 size = BM_BLOCK_SIZE; 806 807 if (drbd_try_rs_begin_io(device, sector)) { 808 device->ov_position = sector; 809 goto requeue; 810 } 811 812 if (sector + (size>>9) > capacity) 813 size = (capacity-sector)<<9; 814 815 inc_rs_pending(device); 816 if (drbd_send_ov_request(first_peer_device(device), sector, size)) { 817 dec_rs_pending(device); 818 return 0; 819 } 820 sector += BM_SECT_PER_BIT; 821 } 822 device->ov_position = sector; 823 824 requeue: 825 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 826 if (i == 0 || !stop_sector_reached) 827 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME); 828 return 1; 829 } 830 831 int w_ov_finished(struct drbd_work *w, int cancel) 832 { 833 struct drbd_device_work *dw = 834 container_of(w, struct drbd_device_work, w); 835 struct drbd_device *device = dw->device; 836 kfree(dw); 837 ov_out_of_sync_print(device); 838 drbd_resync_finished(device); 839 840 return 0; 841 } 842 843 static int w_resync_finished(struct drbd_work *w, int cancel) 844 { 845 struct drbd_device_work *dw = 846 container_of(w, struct drbd_device_work, w); 847 struct drbd_device *device = dw->device; 848 kfree(dw); 849 850 drbd_resync_finished(device); 851 852 return 0; 853 } 854 855 static void ping_peer(struct drbd_device *device) 856 { 857 struct drbd_connection *connection = first_peer_device(device)->connection; 858 859 clear_bit(GOT_PING_ACK, &connection->flags); 860 request_ping(connection); 861 wait_event(connection->ping_wait, 862 test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED); 863 } 864 865 int drbd_resync_finished(struct drbd_device *device) 866 { 867 struct drbd_connection *connection = first_peer_device(device)->connection; 868 unsigned long db, dt, dbdt; 869 unsigned long n_oos; 870 union drbd_state os, ns; 871 struct drbd_device_work *dw; 872 char *khelper_cmd = NULL; 873 int verify_done = 0; 874 875 /* Remove all elements from the resync LRU. Since future actions 876 * might set bits in the (main) bitmap, then the entries in the 877 * resync LRU would be wrong. */ 878 if (drbd_rs_del_all(device)) { 879 /* In case this is not possible now, most probably because 880 * there are P_RS_DATA_REPLY Packets lingering on the worker's 881 * queue (or even the read operations for those packets 882 * is not finished by now). Retry in 100ms. 
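		 * (HZ/10 is 100ms. We re-queue ourselves as w_resync_finished,
		 * so the retry runs from the worker after the pending work has
		 * been processed.)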
*/ 883 884 schedule_timeout_interruptible(HZ / 10); 885 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC); 886 if (dw) { 887 dw->w.cb = w_resync_finished; 888 dw->device = device; 889 drbd_queue_work(&connection->sender_work, &dw->w); 890 return 1; 891 } 892 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n"); 893 } 894 895 dt = (jiffies - device->rs_start - device->rs_paused) / HZ; 896 if (dt <= 0) 897 dt = 1; 898 899 db = device->rs_total; 900 /* adjust for verify start and stop sectors, respective reached position */ 901 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T) 902 db -= device->ov_left; 903 904 dbdt = Bit2KB(db/dt); 905 device->rs_paused /= HZ; 906 907 if (!get_ldev(device)) 908 goto out; 909 910 ping_peer(device); 911 912 spin_lock_irq(&device->resource->req_lock); 913 os = drbd_read_state(device); 914 915 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T); 916 917 /* This protects us against multiple calls (that can happen in the presence 918 of application IO), and against connectivity loss just before we arrive here. */ 919 if (os.conn <= C_CONNECTED) 920 goto out_unlock; 921 922 ns = os; 923 ns.conn = C_CONNECTED; 924 925 drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n", 926 verify_done ? "Online verify" : "Resync", 927 dt + device->rs_paused, device->rs_paused, dbdt); 928 929 n_oos = drbd_bm_total_weight(device); 930 931 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) { 932 if (n_oos) { 933 drbd_alert(device, "Online verify found %lu %dk block out of sync!\n", 934 n_oos, Bit2KB(1)); 935 khelper_cmd = "out-of-sync"; 936 } 937 } else { 938 D_ASSERT(device, (n_oos - device->rs_failed) == 0); 939 940 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) 941 khelper_cmd = "after-resync-target"; 942 943 if (device->use_csums && device->rs_total) { 944 const unsigned long s = device->rs_same_csum; 945 const unsigned long t = device->rs_total; 946 const int ratio = 947 (t == 0) ? 0 : 948 (t < 100000) ? ((s*100)/t) : (s/(t/100)); 949 drbd_info(device, "%u %% had equal checksums, eliminated: %luK; " 950 "transferred %luK total %luK\n", 951 ratio, 952 Bit2KB(device->rs_same_csum), 953 Bit2KB(device->rs_total - device->rs_same_csum), 954 Bit2KB(device->rs_total)); 955 } 956 } 957 958 if (device->rs_failed) { 959 drbd_info(device, " %lu failed blocks\n", device->rs_failed); 960 961 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 962 ns.disk = D_INCONSISTENT; 963 ns.pdsk = D_UP_TO_DATE; 964 } else { 965 ns.disk = D_UP_TO_DATE; 966 ns.pdsk = D_INCONSISTENT; 967 } 968 } else { 969 ns.disk = D_UP_TO_DATE; 970 ns.pdsk = D_UP_TO_DATE; 971 972 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 973 if (device->p_uuid) { 974 int i; 975 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++) 976 _drbd_uuid_set(device, i, device->p_uuid[i]); 977 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]); 978 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]); 979 } else { 980 drbd_err(device, "device->p_uuid is NULL! BUG\n"); 981 } 982 } 983 984 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) { 985 /* for verify runs, we don't update uuids here, 986 * so there would be nothing to report. */ 987 drbd_uuid_set_bm(device, 0UL); 988 drbd_print_uuids(device, "updated UUIDs"); 989 if (device->p_uuid) { 990 /* Now the two UUID sets are equal, update what we 991 * know of the peer. 
*/ 992 int i; 993 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++) 994 device->p_uuid[i] = device->ldev->md.uuid[i]; 995 } 996 } 997 } 998 999 _drbd_set_state(device, ns, CS_VERBOSE, NULL); 1000 out_unlock: 1001 spin_unlock_irq(&device->resource->req_lock); 1002 1003 /* If we have been sync source, and have an effective fencing-policy, 1004 * once *all* volumes are back in sync, call "unfence". */ 1005 if (os.conn == C_SYNC_SOURCE) { 1006 enum drbd_disk_state disk_state = D_MASK; 1007 enum drbd_disk_state pdsk_state = D_MASK; 1008 enum drbd_fencing_p fp = FP_DONT_CARE; 1009 1010 rcu_read_lock(); 1011 fp = rcu_dereference(device->ldev->disk_conf)->fencing; 1012 if (fp != FP_DONT_CARE) { 1013 struct drbd_peer_device *peer_device; 1014 int vnr; 1015 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 1016 struct drbd_device *device = peer_device->device; 1017 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk); 1018 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk); 1019 } 1020 } 1021 rcu_read_unlock(); 1022 if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE) 1023 conn_khelper(connection, "unfence-peer"); 1024 } 1025 1026 put_ldev(device); 1027 out: 1028 device->rs_total = 0; 1029 device->rs_failed = 0; 1030 device->rs_paused = 0; 1031 1032 /* reset start sector, if we reached end of device */ 1033 if (verify_done && device->ov_left == 0) 1034 device->ov_start_sector = 0; 1035 1036 drbd_md_sync(device); 1037 1038 if (khelper_cmd) 1039 drbd_khelper(device, khelper_cmd); 1040 1041 return 1; 1042 } 1043 1044 /* helper */ 1045 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req) 1046 { 1047 if (drbd_peer_req_has_active_page(peer_req)) { 1048 /* This might happen if sendpage() has not finished */ 1049 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT; 1050 atomic_add(i, &device->pp_in_use_by_net); 1051 atomic_sub(i, &device->pp_in_use); 1052 spin_lock_irq(&device->resource->req_lock); 1053 list_add_tail(&peer_req->w.list, &device->net_ee); 1054 spin_unlock_irq(&device->resource->req_lock); 1055 wake_up(&drbd_pp_wait); 1056 } else 1057 drbd_free_peer_req(device, peer_req); 1058 } 1059 1060 /** 1061 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST 1062 * @w: work object. 1063 * @cancel: The connection will be closed anyways 1064 */ 1065 int w_e_end_data_req(struct drbd_work *w, int cancel) 1066 { 1067 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1068 struct drbd_peer_device *peer_device = peer_req->peer_device; 1069 struct drbd_device *device = peer_device->device; 1070 int err; 1071 1072 if (unlikely(cancel)) { 1073 drbd_free_peer_req(device, peer_req); 1074 dec_unacked(device); 1075 return 0; 1076 } 1077 1078 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1079 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req); 1080 } else { 1081 if (__ratelimit(&drbd_ratelimit_state)) 1082 drbd_err(device, "Sending NegDReply. 
sector=%llus.\n", 1083 (unsigned long long)peer_req->i.sector); 1084 1085 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req); 1086 } 1087 1088 dec_unacked(device); 1089 1090 move_to_net_ee_or_free(device, peer_req); 1091 1092 if (unlikely(err)) 1093 drbd_err(device, "drbd_send_block() failed\n"); 1094 return err; 1095 } 1096 1097 static bool all_zero(struct drbd_peer_request *peer_req) 1098 { 1099 struct page *page = peer_req->pages; 1100 unsigned int len = peer_req->i.size; 1101 1102 page_chain_for_each(page) { 1103 unsigned int l = min_t(unsigned int, len, PAGE_SIZE); 1104 unsigned int i, words = l / sizeof(long); 1105 unsigned long *d; 1106 1107 d = kmap_atomic(page); 1108 for (i = 0; i < words; i++) { 1109 if (d[i]) { 1110 kunmap_atomic(d); 1111 return false; 1112 } 1113 } 1114 kunmap_atomic(d); 1115 len -= l; 1116 } 1117 1118 return true; 1119 } 1120 1121 /** 1122 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST 1123 * @w: work object. 1124 * @cancel: The connection will be closed anyways 1125 */ 1126 int w_e_end_rsdata_req(struct drbd_work *w, int cancel) 1127 { 1128 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1129 struct drbd_peer_device *peer_device = peer_req->peer_device; 1130 struct drbd_device *device = peer_device->device; 1131 int err; 1132 1133 if (unlikely(cancel)) { 1134 drbd_free_peer_req(device, peer_req); 1135 dec_unacked(device); 1136 return 0; 1137 } 1138 1139 if (get_ldev_if_state(device, D_FAILED)) { 1140 drbd_rs_complete_io(device, peer_req->i.sector); 1141 put_ldev(device); 1142 } 1143 1144 if (device->state.conn == C_AHEAD) { 1145 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req); 1146 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1147 if (likely(device->state.pdsk >= D_INCONSISTENT)) { 1148 inc_rs_pending(device); 1149 if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req)) 1150 err = drbd_send_rs_deallocated(peer_device, peer_req); 1151 else 1152 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req); 1153 } else { 1154 if (__ratelimit(&drbd_ratelimit_state)) 1155 drbd_err(device, "Not sending RSDataReply, " 1156 "partner DISKLESS!\n"); 1157 err = 0; 1158 } 1159 } else { 1160 if (__ratelimit(&drbd_ratelimit_state)) 1161 drbd_err(device, "Sending NegRSDReply. 
sector %llus.\n", 1162 (unsigned long long)peer_req->i.sector); 1163 1164 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req); 1165 1166 /* update resync data with failure */ 1167 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size); 1168 } 1169 1170 dec_unacked(device); 1171 1172 move_to_net_ee_or_free(device, peer_req); 1173 1174 if (unlikely(err)) 1175 drbd_err(device, "drbd_send_block() failed\n"); 1176 return err; 1177 } 1178 1179 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel) 1180 { 1181 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1182 struct drbd_peer_device *peer_device = peer_req->peer_device; 1183 struct drbd_device *device = peer_device->device; 1184 struct digest_info *di; 1185 int digest_size; 1186 void *digest = NULL; 1187 int err, eq = 0; 1188 1189 if (unlikely(cancel)) { 1190 drbd_free_peer_req(device, peer_req); 1191 dec_unacked(device); 1192 return 0; 1193 } 1194 1195 if (get_ldev(device)) { 1196 drbd_rs_complete_io(device, peer_req->i.sector); 1197 put_ldev(device); 1198 } 1199 1200 di = peer_req->digest; 1201 1202 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1203 /* quick hack to try to avoid a race against reconfiguration. 1204 * a real fix would be much more involved, 1205 * introducing more locking mechanisms */ 1206 if (peer_device->connection->csums_tfm) { 1207 digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm); 1208 D_ASSERT(device, digest_size == di->digest_size); 1209 digest = kmalloc(digest_size, GFP_NOIO); 1210 } 1211 if (digest) { 1212 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest); 1213 eq = !memcmp(digest, di->digest, digest_size); 1214 kfree(digest); 1215 } 1216 1217 if (eq) { 1218 drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size); 1219 /* rs_same_csums unit is BM_BLOCK_SIZE */ 1220 device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT; 1221 err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req); 1222 } else { 1223 inc_rs_pending(device); 1224 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */ 1225 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */ 1226 kfree(di); 1227 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req); 1228 } 1229 } else { 1230 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req); 1231 if (__ratelimit(&drbd_ratelimit_state)) 1232 drbd_err(device, "Sending NegDReply. 
I guess it gets messy.\n"); 1233 } 1234 1235 dec_unacked(device); 1236 move_to_net_ee_or_free(device, peer_req); 1237 1238 if (unlikely(err)) 1239 drbd_err(device, "drbd_send_block/ack() failed\n"); 1240 return err; 1241 } 1242 1243 int w_e_end_ov_req(struct drbd_work *w, int cancel) 1244 { 1245 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1246 struct drbd_peer_device *peer_device = peer_req->peer_device; 1247 struct drbd_device *device = peer_device->device; 1248 sector_t sector = peer_req->i.sector; 1249 unsigned int size = peer_req->i.size; 1250 int digest_size; 1251 void *digest; 1252 int err = 0; 1253 1254 if (unlikely(cancel)) 1255 goto out; 1256 1257 digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm); 1258 digest = kmalloc(digest_size, GFP_NOIO); 1259 if (!digest) { 1260 err = 1; /* terminate the connection in case the allocation failed */ 1261 goto out; 1262 } 1263 1264 if (likely(!(peer_req->flags & EE_WAS_ERROR))) 1265 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest); 1266 else 1267 memset(digest, 0, digest_size); 1268 1269 /* Free e and pages before send. 1270 * In case we block on congestion, we could otherwise run into 1271 * some distributed deadlock, if the other side blocks on 1272 * congestion as well, because our receiver blocks in 1273 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 1274 drbd_free_peer_req(device, peer_req); 1275 peer_req = NULL; 1276 inc_rs_pending(device); 1277 err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY); 1278 if (err) 1279 dec_rs_pending(device); 1280 kfree(digest); 1281 1282 out: 1283 if (peer_req) 1284 drbd_free_peer_req(device, peer_req); 1285 dec_unacked(device); 1286 return err; 1287 } 1288 1289 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size) 1290 { 1291 if (device->ov_last_oos_start + device->ov_last_oos_size == sector) { 1292 device->ov_last_oos_size += size>>9; 1293 } else { 1294 device->ov_last_oos_start = sector; 1295 device->ov_last_oos_size = size>>9; 1296 } 1297 drbd_set_out_of_sync(device, sector, size); 1298 } 1299 1300 int w_e_end_ov_reply(struct drbd_work *w, int cancel) 1301 { 1302 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1303 struct drbd_peer_device *peer_device = peer_req->peer_device; 1304 struct drbd_device *device = peer_device->device; 1305 struct digest_info *di; 1306 void *digest; 1307 sector_t sector = peer_req->i.sector; 1308 unsigned int size = peer_req->i.size; 1309 int digest_size; 1310 int err, eq = 0; 1311 bool stop_sector_reached = false; 1312 1313 if (unlikely(cancel)) { 1314 drbd_free_peer_req(device, peer_req); 1315 dec_unacked(device); 1316 return 0; 1317 } 1318 1319 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all 1320 * the resync lru has been cleaned up already */ 1321 if (get_ldev(device)) { 1322 drbd_rs_complete_io(device, peer_req->i.sector); 1323 put_ldev(device); 1324 } 1325 1326 di = peer_req->digest; 1327 1328 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1329 digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm); 1330 digest = kmalloc(digest_size, GFP_NOIO); 1331 if (digest) { 1332 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest); 1333 1334 D_ASSERT(device, digest_size == di->digest_size); 1335 eq = !memcmp(digest, di->digest, digest_size); 1336 kfree(digest); 1337 } 1338 } 1339 1340 /* Free peer_req and pages before send. 
1341 * In case we block on congestion, we could otherwise run into 1342 * some distributed deadlock, if the other side blocks on 1343 * congestion as well, because our receiver blocks in 1344 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 1345 drbd_free_peer_req(device, peer_req); 1346 if (!eq) 1347 drbd_ov_out_of_sync_found(device, sector, size); 1348 else 1349 ov_out_of_sync_print(device); 1350 1351 err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, 1352 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC); 1353 1354 dec_unacked(device); 1355 1356 --device->ov_left; 1357 1358 /* let's advance progress step marks only for every other megabyte */ 1359 if ((device->ov_left & 0x200) == 0x200) 1360 drbd_advance_rs_marks(device, device->ov_left); 1361 1362 stop_sector_reached = verify_can_do_stop_sector(device) && 1363 (sector + (size>>9)) >= device->ov_stop_sector; 1364 1365 if (device->ov_left == 0 || stop_sector_reached) { 1366 ov_out_of_sync_print(device); 1367 drbd_resync_finished(device); 1368 } 1369 1370 return err; 1371 } 1372 1373 /* FIXME 1374 * We need to track the number of pending barrier acks, 1375 * and to be able to wait for them. 1376 * See also comment in drbd_adm_attach before drbd_suspend_io. 1377 */ 1378 static int drbd_send_barrier(struct drbd_connection *connection) 1379 { 1380 struct p_barrier *p; 1381 struct drbd_socket *sock; 1382 1383 sock = &connection->data; 1384 p = conn_prepare_command(connection, sock); 1385 if (!p) 1386 return -EIO; 1387 p->barrier = connection->send.current_epoch_nr; 1388 p->pad = 0; 1389 connection->send.current_epoch_writes = 0; 1390 connection->send.last_sent_barrier_jif = jiffies; 1391 1392 return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0); 1393 } 1394 1395 static int pd_send_unplug_remote(struct drbd_peer_device *pd) 1396 { 1397 struct drbd_socket *sock = &pd->connection->data; 1398 if (!drbd_prepare_command(pd, sock)) 1399 return -EIO; 1400 return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0); 1401 } 1402 1403 int w_send_write_hint(struct drbd_work *w, int cancel) 1404 { 1405 struct drbd_device *device = 1406 container_of(w, struct drbd_device, unplug_work); 1407 1408 if (cancel) 1409 return 0; 1410 return pd_send_unplug_remote(first_peer_device(device)); 1411 } 1412 1413 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch) 1414 { 1415 if (!connection->send.seen_any_write_yet) { 1416 connection->send.seen_any_write_yet = true; 1417 connection->send.current_epoch_nr = epoch; 1418 connection->send.current_epoch_writes = 0; 1419 connection->send.last_sent_barrier_jif = jiffies; 1420 } 1421 } 1422 1423 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch) 1424 { 1425 /* re-init if first write on this connection */ 1426 if (!connection->send.seen_any_write_yet) 1427 return; 1428 if (connection->send.current_epoch_nr != epoch) { 1429 if (connection->send.current_epoch_writes) 1430 drbd_send_barrier(connection); 1431 connection->send.current_epoch_nr = epoch; 1432 } 1433 } 1434 1435 int w_send_out_of_sync(struct drbd_work *w, int cancel) 1436 { 1437 struct drbd_request *req = container_of(w, struct drbd_request, w); 1438 struct drbd_device *device = req->device; 1439 struct drbd_peer_device *const peer_device = first_peer_device(device); 1440 struct drbd_connection *const connection = peer_device->connection; 1441 int err; 1442 1443 if (unlikely(cancel)) { 1444 req_mod(req, SEND_CANCELED); 1445 return 0; 1446 } 1447 req->pre_send_jif = 
jiffies; 1448 1449 /* this time, no connection->send.current_epoch_writes++; 1450 * If it was sent, it was the closing barrier for the last 1451 * replicated epoch, before we went into AHEAD mode. 1452 * No more barriers will be sent, until we leave AHEAD mode again. */ 1453 maybe_send_barrier(connection, req->epoch); 1454 1455 err = drbd_send_out_of_sync(peer_device, req); 1456 req_mod(req, OOS_HANDED_TO_NETWORK); 1457 1458 return err; 1459 } 1460 1461 /** 1462 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request 1463 * @w: work object. 1464 * @cancel: The connection will be closed anyways 1465 */ 1466 int w_send_dblock(struct drbd_work *w, int cancel) 1467 { 1468 struct drbd_request *req = container_of(w, struct drbd_request, w); 1469 struct drbd_device *device = req->device; 1470 struct drbd_peer_device *const peer_device = first_peer_device(device); 1471 struct drbd_connection *connection = peer_device->connection; 1472 bool do_send_unplug = req->rq_state & RQ_UNPLUG; 1473 int err; 1474 1475 if (unlikely(cancel)) { 1476 req_mod(req, SEND_CANCELED); 1477 return 0; 1478 } 1479 req->pre_send_jif = jiffies; 1480 1481 re_init_if_first_write(connection, req->epoch); 1482 maybe_send_barrier(connection, req->epoch); 1483 connection->send.current_epoch_writes++; 1484 1485 err = drbd_send_dblock(peer_device, req); 1486 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK); 1487 1488 if (do_send_unplug && !err) 1489 pd_send_unplug_remote(peer_device); 1490 1491 return err; 1492 } 1493 1494 /** 1495 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet 1496 * @w: work object. 1497 * @cancel: The connection will be closed anyways 1498 */ 1499 int w_send_read_req(struct drbd_work *w, int cancel) 1500 { 1501 struct drbd_request *req = container_of(w, struct drbd_request, w); 1502 struct drbd_device *device = req->device; 1503 struct drbd_peer_device *const peer_device = first_peer_device(device); 1504 struct drbd_connection *connection = peer_device->connection; 1505 bool do_send_unplug = req->rq_state & RQ_UNPLUG; 1506 int err; 1507 1508 if (unlikely(cancel)) { 1509 req_mod(req, SEND_CANCELED); 1510 return 0; 1511 } 1512 req->pre_send_jif = jiffies; 1513 1514 /* Even read requests may close a write epoch, 1515 * if there was any yet. */ 1516 maybe_send_barrier(connection, req->epoch); 1517 1518 err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size, 1519 (unsigned long)req); 1520 1521 req_mod(req, err ? 
SEND_FAILED : HANDED_OVER_TO_NETWORK); 1522 1523 if (do_send_unplug && !err) 1524 pd_send_unplug_remote(peer_device); 1525 1526 return err; 1527 } 1528 1529 int w_restart_disk_io(struct drbd_work *w, int cancel) 1530 { 1531 struct drbd_request *req = container_of(w, struct drbd_request, w); 1532 struct drbd_device *device = req->device; 1533 1534 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG) 1535 drbd_al_begin_io(device, &req->i); 1536 1537 drbd_req_make_private_bio(req, req->master_bio); 1538 bio_set_dev(req->private_bio, device->ldev->backing_bdev); 1539 generic_make_request(req->private_bio); 1540 1541 return 0; 1542 } 1543 1544 static int _drbd_may_sync_now(struct drbd_device *device) 1545 { 1546 struct drbd_device *odev = device; 1547 int resync_after; 1548 1549 while (1) { 1550 if (!odev->ldev || odev->state.disk == D_DISKLESS) 1551 return 1; 1552 rcu_read_lock(); 1553 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after; 1554 rcu_read_unlock(); 1555 if (resync_after == -1) 1556 return 1; 1557 odev = minor_to_device(resync_after); 1558 if (!odev) 1559 return 1; 1560 if ((odev->state.conn >= C_SYNC_SOURCE && 1561 odev->state.conn <= C_PAUSED_SYNC_T) || 1562 odev->state.aftr_isp || odev->state.peer_isp || 1563 odev->state.user_isp) 1564 return 0; 1565 } 1566 } 1567 1568 /** 1569 * drbd_pause_after() - Pause resync on all devices that may not resync now 1570 * @device: DRBD device. 1571 * 1572 * Called from process context only (admin command and after_state_ch). 1573 */ 1574 static bool drbd_pause_after(struct drbd_device *device) 1575 { 1576 bool changed = false; 1577 struct drbd_device *odev; 1578 int i; 1579 1580 rcu_read_lock(); 1581 idr_for_each_entry(&drbd_devices, odev, i) { 1582 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1583 continue; 1584 if (!_drbd_may_sync_now(odev) && 1585 _drbd_set_state(_NS(odev, aftr_isp, 1), 1586 CS_HARD, NULL) != SS_NOTHING_TO_DO) 1587 changed = true; 1588 } 1589 rcu_read_unlock(); 1590 1591 return changed; 1592 } 1593 1594 /** 1595 * drbd_resume_next() - Resume resync on all devices that may resync now 1596 * @device: DRBD device. 1597 * 1598 * Called from process context only (admin command and worker). 
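 * (Roughly: a device "may resync now" when _drbd_may_sync_now() finds no
 * resync-after dependency, or a dependency that is itself neither
 * resyncing nor paused.)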
1599 */ 1600 static bool drbd_resume_next(struct drbd_device *device) 1601 { 1602 bool changed = false; 1603 struct drbd_device *odev; 1604 int i; 1605 1606 rcu_read_lock(); 1607 idr_for_each_entry(&drbd_devices, odev, i) { 1608 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1609 continue; 1610 if (odev->state.aftr_isp) { 1611 if (_drbd_may_sync_now(odev) && 1612 _drbd_set_state(_NS(odev, aftr_isp, 0), 1613 CS_HARD, NULL) != SS_NOTHING_TO_DO) 1614 changed = true; 1615 } 1616 } 1617 rcu_read_unlock(); 1618 return changed; 1619 } 1620 1621 void resume_next_sg(struct drbd_device *device) 1622 { 1623 lock_all_resources(); 1624 drbd_resume_next(device); 1625 unlock_all_resources(); 1626 } 1627 1628 void suspend_other_sg(struct drbd_device *device) 1629 { 1630 lock_all_resources(); 1631 drbd_pause_after(device); 1632 unlock_all_resources(); 1633 } 1634 1635 /* caller must lock_all_resources() */ 1636 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor) 1637 { 1638 struct drbd_device *odev; 1639 int resync_after; 1640 1641 if (o_minor == -1) 1642 return NO_ERROR; 1643 if (o_minor < -1 || o_minor > MINORMASK) 1644 return ERR_RESYNC_AFTER; 1645 1646 /* check for loops */ 1647 odev = minor_to_device(o_minor); 1648 while (1) { 1649 if (odev == device) 1650 return ERR_RESYNC_AFTER_CYCLE; 1651 1652 /* You are free to depend on diskless, non-existing, 1653 * or not yet/no longer existing minors. 1654 * We only reject dependency loops. 1655 * We cannot follow the dependency chain beyond a detached or 1656 * missing minor. 1657 */ 1658 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS) 1659 return NO_ERROR; 1660 1661 rcu_read_lock(); 1662 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after; 1663 rcu_read_unlock(); 1664 /* dependency chain ends here, no cycles. */ 1665 if (resync_after == -1) 1666 return NO_ERROR; 1667 1668 /* follow the dependency chain */ 1669 odev = minor_to_device(resync_after); 1670 } 1671 } 1672 1673 /* caller must lock_all_resources() */ 1674 void drbd_resync_after_changed(struct drbd_device *device) 1675 { 1676 int changed; 1677 1678 do { 1679 changed = drbd_pause_after(device); 1680 changed |= drbd_resume_next(device); 1681 } while (changed); 1682 } 1683 1684 void drbd_rs_controller_reset(struct drbd_device *device) 1685 { 1686 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk; 1687 struct fifo_buffer *plan; 1688 1689 atomic_set(&device->rs_sect_in, 0); 1690 atomic_set(&device->rs_sect_ev, 0); 1691 device->rs_in_flight = 0; 1692 device->rs_last_events = (int)part_stat_read_accum(&disk->part0, sectors); 1693 1694 /* Updating the RCU protected object in place is necessary since 1695 this function gets called from atomic context. 
1696 It is valid since all other updates also lead to an completely 1697 empty fifo */ 1698 rcu_read_lock(); 1699 plan = rcu_dereference(device->rs_plan_s); 1700 plan->total = 0; 1701 fifo_set(plan, 0); 1702 rcu_read_unlock(); 1703 } 1704 1705 void start_resync_timer_fn(struct timer_list *t) 1706 { 1707 struct drbd_device *device = from_timer(device, t, start_resync_timer); 1708 drbd_device_post_work(device, RS_START); 1709 } 1710 1711 static void do_start_resync(struct drbd_device *device) 1712 { 1713 if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) { 1714 drbd_warn(device, "postponing start_resync ...\n"); 1715 device->start_resync_timer.expires = jiffies + HZ/10; 1716 add_timer(&device->start_resync_timer); 1717 return; 1718 } 1719 1720 drbd_start_resync(device, C_SYNC_SOURCE); 1721 clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags); 1722 } 1723 1724 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device) 1725 { 1726 bool csums_after_crash_only; 1727 rcu_read_lock(); 1728 csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only; 1729 rcu_read_unlock(); 1730 return connection->agreed_pro_version >= 89 && /* supported? */ 1731 connection->csums_tfm && /* configured? */ 1732 (csums_after_crash_only == false /* use for each resync? */ 1733 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */ 1734 } 1735 1736 /** 1737 * drbd_start_resync() - Start the resync process 1738 * @device: DRBD device. 1739 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET 1740 * 1741 * This function might bring you directly into one of the 1742 * C_PAUSED_SYNC_* states. 1743 */ 1744 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side) 1745 { 1746 struct drbd_peer_device *peer_device = first_peer_device(device); 1747 struct drbd_connection *connection = peer_device ? peer_device->connection : NULL; 1748 union drbd_state ns; 1749 int r; 1750 1751 if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) { 1752 drbd_err(device, "Resync already running!\n"); 1753 return; 1754 } 1755 1756 if (!connection) { 1757 drbd_err(device, "No connection to peer, aborting!\n"); 1758 return; 1759 } 1760 1761 if (!test_bit(B_RS_H_DONE, &device->flags)) { 1762 if (side == C_SYNC_TARGET) { 1763 /* Since application IO was locked out during C_WF_BITMAP_T and 1764 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET 1765 we check that we might make the data inconsistent. */ 1766 r = drbd_khelper(device, "before-resync-target"); 1767 r = (r >> 8) & 0xff; 1768 if (r > 0) { 1769 drbd_info(device, "before-resync-target handler returned %d, " 1770 "dropping connection.\n", r); 1771 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 1772 return; 1773 } 1774 } else /* C_SYNC_SOURCE */ { 1775 r = drbd_khelper(device, "before-resync-source"); 1776 r = (r >> 8) & 0xff; 1777 if (r > 0) { 1778 if (r == 3) { 1779 drbd_info(device, "before-resync-source handler returned %d, " 1780 "ignoring. 
static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
{
	bool csums_after_crash_only;
	rcu_read_lock();
	csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
	rcu_read_unlock();
	return connection->agreed_pro_version >= 89 &&		/* supported? */
		connection->csums_tfm &&			/* configured? */
		(csums_after_crash_only == false		/* use for each resync? */
		 || test_bit(CRASHED_PRIMARY, &device->flags));	/* or only after Primary crash? */
}

/**
 * drbd_start_resync() - Start the resync process
 * @device:	DRBD device.
 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
 *
 * This function might bring you directly into one of the
 * C_PAUSED_SYNC_* states.
 */
void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
{
	struct drbd_peer_device *peer_device = first_peer_device(device);
	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
	union drbd_state ns;
	int r;

	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
		drbd_err(device, "Resync already running!\n");
		return;
	}

	if (!connection) {
		drbd_err(device, "No connection to peer, aborting!\n");
		return;
	}

	if (!test_bit(B_RS_H_DONE, &device->flags)) {
		if (side == C_SYNC_TARGET) {
			/* Since application IO was locked out during C_WF_BITMAP_T and
			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
			   we check whether we may make the data inconsistent. */
			r = drbd_khelper(device, "before-resync-target");
			r = (r >> 8) & 0xff;
			if (r > 0) {
				drbd_info(device, "before-resync-target handler returned %d, "
					 "dropping connection.\n", r);
				conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
				return;
			}
		} else /* C_SYNC_SOURCE */ {
			r = drbd_khelper(device, "before-resync-source");
			r = (r >> 8) & 0xff;
			if (r > 0) {
				if (r == 3) {
					drbd_info(device, "before-resync-source handler returned %d, "
						 "ignoring. Old userland tools?", r);
				} else {
					drbd_info(device, "before-resync-source handler returned %d, "
						 "dropping connection.\n", r);
					conn_request_state(connection,
							   NS(conn, C_DISCONNECTING), CS_HARD);
					return;
				}
			}
		}
	}

	if (current == connection->worker.task) {
		/* The worker should not sleep waiting for state_mutex,
		   that can take a long time */
		if (!mutex_trylock(device->state_mutex)) {
			set_bit(B_RS_H_DONE, &device->flags);
			device->start_resync_timer.expires = jiffies + HZ/5;
			add_timer(&device->start_resync_timer);
			return;
		}
	} else {
		mutex_lock(device->state_mutex);
	}

	lock_all_resources();
	clear_bit(B_RS_H_DONE, &device->flags);
	/* Did some connection breakage or IO error race with us? */
	if (device->state.conn < C_CONNECTED
	    || !get_ldev_if_state(device, D_NEGOTIATING)) {
		unlock_all_resources();
		goto out;
	}

	ns = drbd_read_state(device);

	ns.aftr_isp = !_drbd_may_sync_now(device);

	ns.conn = side;

	if (side == C_SYNC_TARGET)
		ns.disk = D_INCONSISTENT;
	else /* side == C_SYNC_SOURCE */
		ns.pdsk = D_INCONSISTENT;

	r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
	ns = drbd_read_state(device);

	if (ns.conn < C_CONNECTED)
		r = SS_UNKNOWN_ERROR;

	if (r == SS_SUCCESS) {
		unsigned long tw = drbd_bm_total_weight(device);
		unsigned long now = jiffies;
		int i;

		device->rs_failed = 0;
		device->rs_paused = 0;
		device->rs_same_csum = 0;
		device->rs_last_sect_ev = 0;
		device->rs_total = tw;
		device->rs_start = now;
		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
			device->rs_mark_left[i] = tw;
			device->rs_mark_time[i] = now;
		}
		drbd_pause_after(device);
		/* Forget potentially stale cached per resync extent bit-counts.
		 * Open coded drbd_rs_cancel_all(device), we already have IRQs
		 * disabled, and know the disk state is ok. */
		spin_lock(&device->al_lock);
		lc_reset(device->resync);
		device->resync_locked = 0;
		device->resync_wenr = LC_FREE;
		spin_unlock(&device->al_lock);
	}
	unlock_all_resources();
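
	/* The state change is committed and the all-resources lock has been
	 * dropped; the remaining setup below (logging, sync uuid exchange,
	 * resync controller reset, timers) runs without it. */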
	if (r == SS_SUCCESS) {
		wake_up(&device->al_wait); /* for lc_reset() above */
		/* reset rs_last_bcast when a resync or verify is started,
		 * to deal with potential jiffies wrap. */
		device->rs_last_bcast = jiffies - HZ;

		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
			  drbd_conn_str(ns.conn),
			  (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
			  (unsigned long) device->rs_total);
		if (side == C_SYNC_TARGET) {
			device->bm_resync_fo = 0;
			device->use_csums = use_checksum_based_resync(connection, device);
		} else {
			device->use_csums = false;
		}

		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
		 * with w_send_oos, or the sync target will get confused as to
		 * how many bits to resync.  We cannot do that always, because for an
		 * empty resync and protocol < 95, we need to do it here, as we call
		 * drbd_resync_finished from here in that case.
		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
		 * and from after_state_ch otherwise. */
		if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
			drbd_gen_and_send_sync_uuid(peer_device);

		if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
			/* This still has a race (about when exactly the peers
			 * detect connection loss) that can lead to a full sync
			 * on next handshake. In 8.3.9 we fixed this with explicit
			 * resync-finished notifications, but the fix
			 * introduces a protocol change.  Sleeping for some
			 * time longer than the ping interval + timeout on the
			 * SyncSource, to give the SyncTarget the chance to
			 * detect connection loss, then waiting for a ping
			 * response (implicit in drbd_resync_finished) reduces
			 * the race considerably, but does not solve it. */
			if (side == C_SYNC_SOURCE) {
				struct net_conf *nc;
				int timeo;

				rcu_read_lock();
				nc = rcu_dereference(connection->net_conf);
				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
				rcu_read_unlock();
				schedule_timeout_interruptible(timeo);
			}
			drbd_resync_finished(device);
		}

		drbd_rs_controller_reset(device);
		/* ns.conn may already be != device->state.conn,
		 * we may have been paused in between, or become paused until
		 * the timer triggers.
		 * No matter, that is handled in resync_timer_fn() */
		if (ns.conn == C_SYNC_TARGET)
			mod_timer(&device->resync_timer, jiffies);

		drbd_md_sync(device);
	}
	put_ldev(device);
out:
	mutex_unlock(device->state_mutex);
}
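
/* Lazily flush changed bitmap pages to disk and broadcast resync progress.
 * Runs from the worker when RS_PROGRESS or RS_DONE device work is pending;
 * if the resync just completed, it also wraps up via drbd_resync_finished(). */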
static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
{
	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
	device->rs_last_bcast = jiffies;

	if (!get_ldev(device))
		return;

	drbd_bm_write_lazy(device, 0);
	if (resync_done && is_sync_state(device->state.conn))
		drbd_resync_finished(device);

	drbd_bcast_event(device, &sib);
	/* update timestamp, in case it took a while to write out stuff */
	device->rs_last_bcast = jiffies;
	put_ldev(device);
}

static void drbd_ldev_destroy(struct drbd_device *device)
{
	lc_destroy(device->resync);
	device->resync = NULL;
	lc_destroy(device->act_log);
	device->act_log = NULL;

	__acquire(local);
	drbd_backing_dev_free(device, device->ldev);
	device->ldev = NULL;
	__release(local);

	clear_bit(GOING_DISKLESS, &device->flags);
	wake_up(&device->misc_wait);
}

static void go_diskless(struct drbd_device *device)
{
	D_ASSERT(device, device->state.disk == D_FAILED);
	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
	 * the protected members anymore, though, so once put_ldev reaches zero
	 * again, it will be safe to free them. */

	/* Try to write changed bitmap pages; read errors may have just
	 * set some bits outside the area covered by the activity log.
	 *
	 * If we have an IO error during the bitmap writeout,
	 * we will want a full sync next time, just in case.
	 * (Do we want a specific meta data flag for this?)
	 *
	 * If that does not make it to stable storage either,
	 * we cannot do anything about that anymore.
	 *
	 * We still need to check if both bitmap and ldev are present, we may
	 * end up here after a failed attach, before ldev was even assigned.
	 */
	if (device->bitmap && device->ldev) {
		/* An interrupted resync or similar is allowed to recount bits
		 * while we detach.
		 * Any modifications would not be expected anymore, though.
		 */
		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
					"detach", BM_LOCKED_TEST_ALLOWED)) {
			if (test_bit(WAS_READ_ERROR, &device->flags)) {
				drbd_md_set_flag(device, MDF_FULL_SYNC);
				drbd_md_sync(device);
			}
		}
	}

	drbd_force_state(device, NS(disk, D_DISKLESS));
}

static int do_md_sync(struct drbd_device *device)
{
	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
	drbd_md_sync(device);
	return 0;
}

/* only called from drbd_worker thread, no locking */
void __update_timing_details(
		struct drbd_thread_timing_details *tdp,
		unsigned int *cb_nr,
		void *cb,
		const char *fn, const unsigned int line)
{
	unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
	struct drbd_thread_timing_details *td = tdp + i;

	td->start_jif = jiffies;
	td->cb_addr = cb;
	td->caller_fn = fn;
	td->line = line;
	td->cb_nr = *cb_nr;

	i = (i+1) % DRBD_THREAD_DETAILS_HIST;
	td = tdp + i;
	memset(td, 0, sizeof(*td));

	++(*cb_nr);
}

static void do_device_work(struct drbd_device *device, const unsigned long todo)
{
	if (test_bit(MD_SYNC, &todo))
		do_md_sync(device);
	if (test_bit(RS_DONE, &todo) ||
	    test_bit(RS_PROGRESS, &todo))
		update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
	if (test_bit(GO_DISKLESS, &todo))
		go_diskless(device);
	if (test_bit(DESTROY_DISK, &todo))
		drbd_ldev_destroy(device);
	if (test_bit(RS_START, &todo))
		do_start_resync(device);
}

#define DRBD_DEVICE_WORK_MASK	\
	((1UL << GO_DISKLESS)	\
	|(1UL << DESTROY_DISK)	\
	|(1UL << MD_SYNC)	\
	|(1UL << RS_START)	\
	|(1UL << RS_PROGRESS)	\
	|(1UL << RS_DONE)	\
	)

/* Atomically fetch and clear the device work bits in *flags. */
static unsigned long get_work_bits(unsigned long *flags)
{
	unsigned long old, new;
	do {
		old = *flags;
		new = old & ~DRBD_DEVICE_WORK_MASK;
	} while (cmpxchg(flags, old, new) != old);
	return old & DRBD_DEVICE_WORK_MASK;
}

static void do_unqueued_work(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr;

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		unsigned long todo = get_work_bits(&device->flags);
		if (!todo)
			continue;

		kref_get(&device->kref);
		rcu_read_unlock();
		do_device_work(device, todo);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();
}

static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
{
	spin_lock_irq(&queue->q_lock);
	list_splice_tail_init(&queue->q, work_list);
	spin_unlock_irq(&queue->q_lock);
	return !list_empty(work_list);
}
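
/* Wait until there is new work for the sender.  While idle, close the
 * current transfer log epoch with a barrier if needed, and drop/restore
 * TCP cork on the data socket around the wait. */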
static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
{
	DEFINE_WAIT(wait);
	struct net_conf *nc;
	int uncork, cork;

	dequeue_work_batch(&connection->sender_work, work_list);
	if (!list_empty(work_list))
		return;

	/* Still nothing to do?
	 * Maybe we still need to close the current epoch,
	 * even if no new requests are queued yet.
	 *
	 * Also, poke TCP, just in case.
	 * Then wait for new work (or signal). */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	uncork = nc ? nc->tcp_cork : 0;
	rcu_read_unlock();
	if (uncork) {
		mutex_lock(&connection->data.mutex);
		if (connection->data.socket)
			drbd_tcp_uncork(connection->data.socket);
		mutex_unlock(&connection->data.mutex);
	}

	for (;;) {
		int send_barrier;
		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
		spin_lock_irq(&connection->resource->req_lock);
		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
		if (!list_empty(&connection->sender_work.q))
			list_splice_tail_init(&connection->sender_work.q, work_list);
		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
		if (!list_empty(work_list) || signal_pending(current)) {
			spin_unlock_irq(&connection->resource->req_lock);
			break;
		}

		/* We found nothing new to do, no to-be-communicated request,
		 * no other work item.  We may still need to close the last
		 * epoch.  The next incoming request's epoch will be the
		 * connection's current transfer log epoch number.  If that is
		 * different from the epoch of the last request we
		 * communicated, it is safe to send the epoch separating
		 * barrier now.
		 */
		send_barrier =
			atomic_read(&connection->current_tle_nr) !=
			connection->send.current_epoch_nr;
		spin_unlock_irq(&connection->resource->req_lock);

		if (send_barrier)
			maybe_send_barrier(connection,
					connection->send.current_epoch_nr + 1);

		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
			break;

		/* drbd_send() may have called flush_signals() */
		if (get_t_state(&connection->worker) != RUNNING)
			break;

		schedule();
		/* may be woken up for things other than new work, too,
		 * e.g. if the current epoch got closed.
		 * In which case we send the barrier above. */
	}
	finish_wait(&connection->sender_work.q_wait, &wait);

	/* someone may have changed the config while we have been waiting above. */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	cork = nc ? nc->tcp_cork : 0;
	rcu_read_unlock();
	mutex_lock(&connection->data.mutex);
	if (connection->data.socket) {
		if (cork)
			drbd_tcp_cork(connection->data.socket);
		else if (!uncork)
			drbd_tcp_uncork(connection->data.socket);
	}
	mutex_unlock(&connection->data.mutex);
}
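
/* Main loop of the per-connection worker thread: dequeue batches of sender
 * work and run their callbacks, handle pending per-device work bits
 * (DEVICE_WORK_PENDING), and on exit drain the remaining work and clean up
 * all devices of this connection. */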
int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	struct drbd_work *w = NULL;
	struct drbd_peer_device *peer_device;
	LIST_HEAD(work_list);
	int vnr;

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		if (list_empty(&work_list)) {
			update_worker_timing_details(connection, wait_for_work);
			wait_for_work(connection, &work_list);
		}

		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
			update_worker_timing_details(connection, do_unqueued_work);
			do_unqueued_work(connection);
		}

		if (signal_pending(current)) {
			flush_signals(current);
			if (get_t_state(thi) == RUNNING) {
				drbd_warn(connection, "Worker got an unexpected signal\n");
				continue;
			}
			break;
		}

		if (get_t_state(thi) != RUNNING)
			break;

		if (!list_empty(&work_list)) {
			w = list_first_entry(&work_list, struct drbd_work, list);
			list_del_init(&w->list);
			update_worker_timing_details(connection, w->cb);
			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
				continue;
			if (connection->cstate >= C_WF_REPORT_PARAMS)
				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		}
	}

	do {
		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
			update_worker_timing_details(connection, do_unqueued_work);
			do_unqueued_work(connection);
		}
		if (!list_empty(&work_list)) {
			w = list_first_entry(&work_list, struct drbd_work, list);
			list_del_init(&w->list);
			update_worker_timing_details(connection, w->cb);
			w->cb(w, 1);
		} else
			dequeue_work_batch(&connection->sender_work, &work_list);
	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_device_cleanup(device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	return 0;
}