/*
   drbd_worker.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

 */

#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/sched.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_int.h"
#include "drbd_req.h"

static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
static int w_make_resync_request(struct drbd_conf *mdev,
				 struct drbd_work *w, int cancel);



/* defined here:
   drbd_md_io_complete
   drbd_endio_sec
   drbd_endio_pri

 * more endio handlers:
   atodb_endio in drbd_actlog.c
   drbd_bm_async_io_complete in drbd_bitmap.c

 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 *
 */


/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync-after dependencies, we grab a write lock, because
   we need stable states on all devices for that. */
rwlock_t global_state_lock;

/* used for synchronous meta data and bitmap IO
 * submitted by drbd_md_sync_page_io()
 */
void drbd_md_io_complete(struct bio *bio, int error)
{
	struct drbd_md_io *md_io;

	md_io = (struct drbd_md_io *)bio->bi_private;
	md_io->error = error;

	complete(&md_io->event);
}

/* reads on behalf of the partner,
 * "submitted" by the receiver
 */
void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = e->mdev;

	D_ASSERT(e->block_id != ID_VACANT);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->read_cnt += e->size >> 9;
	list_del(&e->w.list);
	if (list_empty(&mdev->read_ee))
		wake_up(&mdev->ee_wait);
	if (test_bit(__EE_WAS_ERROR, &e->flags))
		__drbd_chk_io_error(mdev, false);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	drbd_queue_work(&mdev->data.work, &e->w);
	put_ldev(mdev);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage.  */
static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = e->mdev;
	sector_t e_sector;
	int do_wake;
	int is_syncer_req;
	int do_al_complete_io;

	D_ASSERT(e->block_id != ID_VACANT);

	/* after we moved e to done_ee,
	 * we may no longer access it,
	 * it may be freed/reused already!
	 * (as soon as we release the req_lock) */
	e_sector = e->sector;
	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
	is_syncer_req = is_syncer_block_id(e->block_id);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->writ_cnt += e->size >> 9;
	list_del(&e->w.list); /* has been on active_ee or sync_ee */
	list_add_tail(&e->w.list, &mdev->done_ee);

	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
	 * neither did we wake possibly waiting conflicting requests.
	 * done from "drbd_process_done_ee" within the appropriate w.cb
	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */

	do_wake = is_syncer_req
		? list_empty(&mdev->sync_ee)
		: list_empty(&mdev->active_ee);

	if (test_bit(__EE_WAS_ERROR, &e->flags))
		__drbd_chk_io_error(mdev, false);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	if (is_syncer_req)
		drbd_rs_complete_io(mdev, e_sector);

	if (do_wake)
		wake_up(&mdev->ee_wait);

	if (do_al_complete_io)
		drbd_al_complete_io(mdev, e_sector);

	wake_asender(mdev);
	put_ldev(mdev);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver.
 */
void drbd_endio_sec(struct bio *bio, int error)
{
	struct drbd_epoch_entry *e = bio->bi_private;
	struct drbd_conf *mdev = e->mdev;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);
	int is_write = bio_data_dir(bio) == WRITE;

	if (error)
		dev_warn(DEV, "%s: error=%d s=%llus\n",
				is_write ? "write" : "read", error,
				(unsigned long long)e->sector);
	if (!error && !uptodate) {
		dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
				is_write ? "write" : "read",
				(unsigned long long)e->sector);
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	if (error)
		set_bit(__EE_WAS_ERROR, &e->flags);

	bio_put(bio); /* no need for the bio anymore */
	if (atomic_dec_and_test(&e->pending_bios)) {
		if (is_write)
			drbd_endio_write_sec_final(e);
		else
			drbd_endio_read_sec_final(e);
	}
}
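/* A hypothetical completion sequence, to illustrate the accounting above:
 * an epoch entry submitted as two bios starts with pending_bios == 2.
 * Each bio completion drops the count by one; only the completion that
 * takes it to zero runs the *_sec_final handler, so the entry is
 * finalized exactly once, regardless of bio completion order. */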
/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
void drbd_endio_pri(struct bio *bio, int error)
{
	unsigned long flags;
	struct drbd_request *req = bio->bi_private;
	struct drbd_conf *mdev = req->mdev;
	struct bio_and_error m;
	enum drbd_req_event what;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);

	if (!error && !uptodate) {
		dev_warn(DEV, "p %s: setting error to -EIO\n",
			 bio_data_dir(bio) == WRITE ? "write" : "read");
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	/* to avoid recursion in __req_mod */
	if (unlikely(error)) {
		what = (bio_data_dir(bio) == WRITE)
			? write_completed_with_error
			: (bio_rw(bio) == READ)
			  ? read_completed_with_error
			  : read_ahead_completed_with_error;
	} else
		what = completed_ok;

	bio_put(req->private_bio);
	req->private_bio = ERR_PTR(error);

	/* not req_mod(), we need irqsave here! */
	spin_lock_irqsave(&mdev->req_lock, flags);
	__req_mod(req, what, &m);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	if (m.bio)
		complete_master_bio(mdev, &m);
}

int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	/* We should not detach for a read io-error,
	 * but try to WRITE the P_DATA_REPLY to the failed location,
	 * to give the disk the chance to relocate that block */

	spin_lock_irq(&mdev->req_lock);
	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
		_req_mod(req, read_retry_remote_canceled);
		spin_unlock_irq(&mdev->req_lock);
		return 1;
	}
	spin_unlock_irq(&mdev->req_lock);

	return w_send_read_req(mdev, w, 0);
}

int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	ERR_IF(cancel) return 1;
	dev_err(DEV, "resync inactive, but callback triggered??\n");
	return 1; /* Simply ignore this! */
}

void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct page *page = e->pages;
	struct page *tmp;
	unsigned len;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	while ((tmp = page_chain_next(page))) {
		/* all but the last page will be fully used */
		sg_set_page(&sg, page, PAGE_SIZE, 0);
		crypto_hash_update(&desc, &sg, sg.length);
		page = tmp;
	}
	/* and now the last, possibly only partially used page */
	len = e->size & (PAGE_SIZE - 1);
	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
	crypto_hash_update(&desc, &sg, sg.length);
	crypto_hash_final(&desc, digest);
}

void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct bio_vec *bvec;
	int i;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	__bio_for_each_segment(bvec, bio, i, 0) {
		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
		crypto_hash_update(&desc, &sg, sg.length);
	}
	crypto_hash_final(&desc, digest);
}
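/* Note on the last-page length math in drbd_csum_ee() above: e->size is
 * masked with (PAGE_SIZE - 1) to get the used part of the final page.
 * For example (hypothetical sizes, 4 KiB pages): e->size == 9216 leaves
 * len == 1024, so only the first KiB of the last page is hashed, while
 * e->size == 8192 leaves len == 0 and "len ?: PAGE_SIZE" hashes the
 * full page. */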
"drbd_send_drequest(..., csum) failed\n"); 348 return ok; 349 } 350 351 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) 352 353 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size) 354 { 355 struct drbd_epoch_entry *e; 356 357 if (!get_ldev(mdev)) 358 return -EIO; 359 360 if (drbd_rs_should_slow_down(mdev, sector)) 361 goto defer; 362 363 /* GFP_TRY, because if there is no memory available right now, this may 364 * be rescheduled for later. It is "only" background resync, after all. */ 365 e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY); 366 if (!e) 367 goto defer; 368 369 e->w.cb = w_e_send_csum; 370 spin_lock_irq(&mdev->req_lock); 371 list_add(&e->w.list, &mdev->read_ee); 372 spin_unlock_irq(&mdev->req_lock); 373 374 atomic_add(size >> 9, &mdev->rs_sect_ev); 375 if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0) 376 return 0; 377 378 /* drbd_submit_ee currently fails for one reason only: 379 * not being able to allocate enough bios. 380 * Is dropping the connection going to help? */ 381 spin_lock_irq(&mdev->req_lock); 382 list_del(&e->w.list); 383 spin_unlock_irq(&mdev->req_lock); 384 385 drbd_free_ee(mdev, e); 386 defer: 387 put_ldev(mdev); 388 return -EAGAIN; 389 } 390 391 void resync_timer_fn(unsigned long data) 392 { 393 struct drbd_conf *mdev = (struct drbd_conf *) data; 394 int queue; 395 396 queue = 1; 397 switch (mdev->state.conn) { 398 case C_VERIFY_S: 399 mdev->resync_work.cb = w_make_ov_request; 400 break; 401 case C_SYNC_TARGET: 402 mdev->resync_work.cb = w_make_resync_request; 403 break; 404 default: 405 queue = 0; 406 mdev->resync_work.cb = w_resync_inactive; 407 } 408 409 /* harmless race: list_empty outside data.work.q_lock */ 410 if (list_empty(&mdev->resync_work.list) && queue) 411 drbd_queue_work(&mdev->data.work, &mdev->resync_work); 412 } 413 414 static void fifo_set(struct fifo_buffer *fb, int value) 415 { 416 int i; 417 418 for (i = 0; i < fb->size; i++) 419 fb->values[i] = value; 420 } 421 422 static int fifo_push(struct fifo_buffer *fb, int value) 423 { 424 int ov; 425 426 ov = fb->values[fb->head_index]; 427 fb->values[fb->head_index++] = value; 428 429 if (fb->head_index >= fb->size) 430 fb->head_index = 0; 431 432 return ov; 433 } 434 435 static void fifo_add_val(struct fifo_buffer *fb, int value) 436 { 437 int i; 438 439 for (i = 0; i < fb->size; i++) 440 fb->values[i] += value; 441 } 442 443 static int drbd_rs_controller(struct drbd_conf *mdev) 444 { 445 unsigned int sect_in; /* Number of sectors that came in since the last turn */ 446 unsigned int want; /* The number of sectors we want in the proxy */ 447 int req_sect; /* Number of sectors to request in this turn */ 448 int correction; /* Number of sectors more we need in the proxy*/ 449 int cps; /* correction per invocation of drbd_rs_controller() */ 450 int steps; /* Number of time steps to plan ahead */ 451 int curr_corr; 452 int max_sect; 453 454 sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */ 455 mdev->rs_in_flight -= sect_in; 456 457 spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */ 458 459 steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */ 460 461 if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */ 462 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps; 463 } else { /* normal path */ 464 want = mdev->sync_conf.c_fill_target ? 
static int drbd_rs_number_requests(struct drbd_conf *mdev)
{
	int number;
	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
	} else {
		mdev->c_sync_rate = mdev->sync_conf.rate;
		number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
	}

	/* ignore the amount of pending requests, the resync controller should
	 * throttle down to incoming reply rate soon enough anyways. */
	return number;
}
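/* Unit conversions in drbd_rs_number_requests(), assuming the usual
 * 4 KiB resync block size (BM_BLOCK_SHIFT == 12, sectors of 512 bytes):
 * ">> (BM_BLOCK_SHIFT - 9)" turns a sector count into a count of resync
 * requests, e.g. a controller budget of 80 sectors becomes 10 requests.
 * The c_sync_rate bookkeeping converts back into KiB/s for reporting. */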
static int w_make_resync_request(struct drbd_conf *mdev,
				 struct drbd_work *w, int cancel)
{
	unsigned long bit;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	int max_bio_size;
	int number, rollback_i, size;
	int align, queued, sndbuf;
	int i = 0;

	if (unlikely(cancel))
		return 1;

	if (unlikely(mdev->state.conn < C_CONNECTED)) {
		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
		return 0;
	}

	if (mdev->state.conn != C_SYNC_TARGET)
		dev_err(DEV, "%s in w_make_resync_request\n",
			drbd_conn_str(mdev->state.conn));

	if (mdev->rs_total == 0) {
		/* empty resync? */
		drbd_resync_finished(mdev);
		return 1;
	}

	if (!get_ldev(mdev)) {
		/* Since we only need to access mdev->rsync a
		   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
		   to continue resync with a broken disk makes no sense at
		   all */
		dev_err(DEV, "Disk broke down during resync!\n");
		mdev->resync_work.cb = w_resync_inactive;
		return 1;
	}

	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
	 * if it should be necessary */
	max_bio_size =
		mdev->agreed_pro_version < 94 ? queue_max_hw_sectors(mdev->rq_queue) << 9 :
		mdev->agreed_pro_version < 95 ? DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_BIO_SIZE;

	number = drbd_rs_number_requests(mdev);
	if (number == 0)
		goto requeue;

	for (i = 0; i < number; i++) {
		/* Stop generating RS requests when half of the send buffer is filled */
		mutex_lock(&mdev->data.mutex);
		if (mdev->data.socket) {
			queued = mdev->data.socket->sk->sk_wmem_queued;
			sndbuf = mdev->data.socket->sk->sk_sndbuf;
		} else {
			queued = 1;
			sndbuf = 0;
		}
		mutex_unlock(&mdev->data.mutex);
		if (queued > sndbuf / 2)
			goto requeue;

next_sector:
		size = BM_BLOCK_SIZE;
		bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);

		if (bit == DRBD_END_OF_BITMAP) {
			mdev->bm_resync_fo = drbd_bm_bits(mdev);
			mdev->resync_work.cb = w_resync_inactive;
			put_ldev(mdev);
			return 1;
		}

		sector = BM_BIT_TO_SECT(bit);

		if (drbd_rs_should_slow_down(mdev, sector) ||
		    drbd_try_rs_begin_io(mdev, sector)) {
			mdev->bm_resync_fo = bit;
			goto requeue;
		}
		mdev->bm_resync_fo = bit + 1;

		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
			drbd_rs_complete_io(mdev, sector);
			goto next_sector;
		}

#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop if we have already the maximum req size.
		 *
		 * Additionally always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
		 */
		align = 1;
		rollback_i = i;
		for (;;) {
			if (size + BM_BLOCK_SIZE > max_bio_size)
				break;

			/* Always stay aligned */
			if (sector & ((1<<(align+3))-1))
				break;

			/* do not cross extent boundaries */
			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
				break;
			/* now, is it actually dirty, after all?
			 * caution, drbd_bm_test_bit is tri-state for some
			 * obscure reason; ( b == 0 ) would get the out-of-band
			 * only accidentally right because of the "oddly sized"
			 * adjustment below */
			if (drbd_bm_test_bit(mdev, bit+1) != 1)
				break;
			bit++;
			size += BM_BLOCK_SIZE;
			if ((BM_BLOCK_SIZE << align) <= size)
				align++;
			i++;
		}
		/* if we merged some,
		 * reset the offset to start the next drbd_bm_find_next from */
		if (size > BM_BLOCK_SIZE)
			mdev->bm_resync_fo = bit + 1;
#endif

		/* adjust very last sectors, in case we are oddly sized */
		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;
		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
			switch (read_for_csum(mdev, sector, size)) {
			case -EIO: /* Disk failure */
				put_ldev(mdev);
				return 0;
			case -EAGAIN: /* allocation failed, or ldev busy */
				drbd_rs_complete_io(mdev, sector);
				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
				i = rollback_i;
				goto requeue;
			case 0:
				/* everything ok */
				break;
			default:
				BUG();
			}
		} else {
			inc_rs_pending(mdev);
			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
					       sector, size, ID_SYNCER)) {
				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
				dec_rs_pending(mdev);
				put_ldev(mdev);
				return 0;
			}
		}
	}

	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
		/* last syncer _request_ was sent,
		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
		 * next sync group will resume), as soon as we receive the last
		 * resync data block, and the last bit is cleared.
		 * until then resync "work" is "inactive" ...
		 */
		mdev->resync_work.cb = w_resync_inactive;
		put_ldev(mdev);
		return 1;
	}

requeue:
	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
	put_ldev(mdev);
	return 1;
}
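/* Example of the request-merging above (hypothetical bitmap state):
 * with three adjacent dirty 4 KiB bits starting at a 16 KiB aligned
 * sector, the merge loop grows the request 4 -> 8 -> 12 KiB before the
 * next clean bit stops it; a start sector with weaker alignment stops
 * the merge earlier, which keeps larger requests aligned for typical
 * software-RAID stripe sizes. */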
static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	int number, i, size;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);

	if (unlikely(cancel))
		return 1;

	if (unlikely(mdev->state.conn < C_CONNECTED)) {
		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
		return 0;
	}

	number = drbd_rs_number_requests(mdev);

	sector = mdev->ov_position;
	for (i = 0; i < number; i++) {
		if (sector >= capacity) {
			mdev->resync_work.cb = w_resync_inactive;
			return 1;
		}

		size = BM_BLOCK_SIZE;

		if (drbd_rs_should_slow_down(mdev, sector) ||
		    drbd_try_rs_begin_io(mdev, sector)) {
			mdev->ov_position = sector;
			goto requeue;
		}

		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

		inc_rs_pending(mdev);
		if (!drbd_send_ov_request(mdev, sector, size)) {
			dec_rs_pending(mdev);
			return 0;
		}
		sector += BM_SECT_PER_BIT;
	}
	mdev->ov_position = sector;

requeue:
	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
	return 1;
}


int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	drbd_start_resync(mdev, C_SYNC_SOURCE);

	return 1;
}

int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);
	ov_oos_print(mdev);
	drbd_resync_finished(mdev);

	return 1;
}

static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);

	drbd_resync_finished(mdev);

	return 1;
}

static void ping_peer(struct drbd_conf *mdev)
{
	clear_bit(GOT_PING_ACK, &mdev->flags);
	request_ping(mdev);
	wait_event(mdev->misc_wait,
		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
}
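/* ping_peer() below acts as a lightweight round-trip barrier: we wait
 * until the peer answered a ping (or the connection was lost) before
 * drbd_resync_finished() commits the final state transition. */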
int drbd_resync_finished(struct drbd_conf *mdev)
{
	unsigned long db, dt, dbdt;
	unsigned long n_oos;
	union drbd_state os, ns;
	struct drbd_work *w;
	char *khelper_cmd = NULL;
	int verify_done = 0;

	/* Remove all elements from the resync LRU. Since future actions
	 * might set bits in the (main) bitmap, the entries in the
	 * resync LRU would be wrong. */
	if (drbd_rs_del_all(mdev)) {
		/* In case this is not possible right now, most probably because
		 * there are P_RS_DATA_REPLY packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * are not finished by now), retry in 100ms.
		 */

		__set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ / 10);
		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
		if (w) {
			w->cb = w_resync_finished;
			drbd_queue_work(&mdev->data.work, w);
			return 1;
		}
		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
	}

	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
	if (dt <= 0)
		dt = 1;
	db = mdev->rs_total;
	dbdt = Bit2KB(db/dt);
	mdev->rs_paused /= HZ;

	if (!get_ldev(mdev))
		goto out;

	ping_peer(mdev);

	spin_lock_irq(&mdev->req_lock);
	os = mdev->state;

	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);

	/* This protects us against multiple calls (that can happen in the presence
	   of application IO), and against connectivity loss just before we arrive here. */
	if (os.conn <= C_CONNECTED)
		goto out_unlock;

	ns = os;
	ns.conn = C_CONNECTED;

	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
	     verify_done ? "Online verify " : "Resync",
	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);

	n_oos = drbd_bm_total_weight(mdev);

	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
		if (n_oos) {
			dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
			      n_oos, Bit2KB(1));
			khelper_cmd = "out-of-sync";
		}
	} else {
		D_ASSERT((n_oos - mdev->rs_failed) == 0);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
			khelper_cmd = "after-resync-target";

		if (mdev->csums_tfm && mdev->rs_total) {
			const unsigned long s = mdev->rs_same_csum;
			const unsigned long t = mdev->rs_total;
			const int ratio =
				(t == 0)     ? 0 :
				(t < 100000) ? ((s*100)/t) : (s/(t/100));
			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
			     "transferred %luK total %luK\n",
			     ratio,
			     Bit2KB(mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
			     Bit2KB(mdev->rs_total));
		}
	}

	if (mdev->rs_failed) {
		dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			ns.disk = D_INCONSISTENT;
			ns.pdsk = D_UP_TO_DATE;
		} else {
			ns.disk = D_UP_TO_DATE;
			ns.pdsk = D_INCONSISTENT;
		}
	} else {
		ns.disk = D_UP_TO_DATE;
		ns.pdsk = D_UP_TO_DATE;

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			if (mdev->p_uuid) {
				int i;
				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
			} else {
				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
			}
		}

		drbd_uuid_set_bm(mdev, 0UL);

		if (mdev->p_uuid) {
			/* Now the two UUID sets are equal, update what we
			 * know of the peer. */
			int i;
			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
		}
	}

	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
out_unlock:
	spin_unlock_irq(&mdev->req_lock);
	put_ldev(mdev);
out:
	mdev->rs_total = 0;
	mdev->rs_failed = 0;
	mdev->rs_paused = 0;
	if (verify_done)
		mdev->ov_start_sector = 0;

	drbd_md_sync(mdev);

	if (khelper_cmd)
		drbd_khelper(mdev, khelper_cmd);

	return 1;
}

/* helper */
static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
{
	if (drbd_ee_has_active_page(e)) {
		/* This might happen if sendpage() has not finished */
		int i = (e->size + PAGE_SIZE - 1) >> PAGE_SHIFT;
		atomic_add(i, &mdev->pp_in_use_by_net);
		atomic_sub(i, &mdev->pp_in_use);
		spin_lock_irq(&mdev->req_lock);
		list_add_tail(&e->w.list, &mdev->net_ee);
		spin_unlock_irq(&mdev->req_lock);
		wake_up(&drbd_pp_wait);
	} else
		drbd_free_ee(mdev, e);
}
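/* Page accounting in move_to_net_ee_or_free(), by example: a 32 KiB
 * epoch entry whose pages are still referenced by the network layer
 * moves 8 pages (on 4 KiB page systems) from pp_in_use over to
 * pp_in_use_by_net and parks the entry on net_ee instead of freeing it,
 * so the pages are not reused while sendpage() may still read them. */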
sector %llus.\n", 1003 (unsigned long long)e->sector); 1004 1005 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e); 1006 1007 /* update resync data with failure */ 1008 drbd_rs_failed_io(mdev, e->sector, e->size); 1009 } 1010 1011 dec_unacked(mdev); 1012 1013 move_to_net_ee_or_free(mdev, e); 1014 1015 if (unlikely(!ok)) 1016 dev_err(DEV, "drbd_send_block() failed\n"); 1017 return ok; 1018 } 1019 1020 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1021 { 1022 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w); 1023 struct digest_info *di; 1024 int digest_size; 1025 void *digest = NULL; 1026 int ok, eq = 0; 1027 1028 if (unlikely(cancel)) { 1029 drbd_free_ee(mdev, e); 1030 dec_unacked(mdev); 1031 return 1; 1032 } 1033 1034 if (get_ldev(mdev)) { 1035 drbd_rs_complete_io(mdev, e->sector); 1036 put_ldev(mdev); 1037 } 1038 1039 di = e->digest; 1040 1041 if (likely((e->flags & EE_WAS_ERROR) == 0)) { 1042 /* quick hack to try to avoid a race against reconfiguration. 1043 * a real fix would be much more involved, 1044 * introducing more locking mechanisms */ 1045 if (mdev->csums_tfm) { 1046 digest_size = crypto_hash_digestsize(mdev->csums_tfm); 1047 D_ASSERT(digest_size == di->digest_size); 1048 digest = kmalloc(digest_size, GFP_NOIO); 1049 } 1050 if (digest) { 1051 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest); 1052 eq = !memcmp(digest, di->digest, digest_size); 1053 kfree(digest); 1054 } 1055 1056 if (eq) { 1057 drbd_set_in_sync(mdev, e->sector, e->size); 1058 /* rs_same_csums unit is BM_BLOCK_SIZE */ 1059 mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT; 1060 ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e); 1061 } else { 1062 inc_rs_pending(mdev); 1063 e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */ 1064 e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */ 1065 kfree(di); 1066 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e); 1067 } 1068 } else { 1069 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e); 1070 if (__ratelimit(&drbd_ratelimit_state)) 1071 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n"); 1072 } 1073 1074 dec_unacked(mdev); 1075 move_to_net_ee_or_free(mdev, e); 1076 1077 if (unlikely(!ok)) 1078 dev_err(DEV, "drbd_send_block/ack() failed\n"); 1079 return ok; 1080 } 1081 1082 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1083 { 1084 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w); 1085 int digest_size; 1086 void *digest; 1087 int ok = 1; 1088 1089 if (unlikely(cancel)) 1090 goto out; 1091 1092 if (unlikely((e->flags & EE_WAS_ERROR) != 0)) 1093 goto out; 1094 1095 digest_size = crypto_hash_digestsize(mdev->verify_tfm); 1096 /* FIXME if this allocation fails, online verify will not terminate! 
int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int digest_size;
	void *digest;
	int ok = 1;

	if (unlikely(cancel))
		goto out;

	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
		goto out;

	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
	/* FIXME if this allocation fails, online verify will not terminate! */
	digest = kmalloc(digest_size, GFP_NOIO);
	if (digest) {
		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
		inc_rs_pending(mdev);
		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
					     digest, digest_size, P_OV_REPLY);
		if (!ok)
			dec_rs_pending(mdev);
		kfree(digest);
	}

out:
	drbd_free_ee(mdev, e);

	dec_unacked(mdev);

	return ok;
}

void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
{
	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
		mdev->ov_last_oos_size += size>>9;
	} else {
		mdev->ov_last_oos_start = sector;
		mdev->ov_last_oos_size = size>>9;
	}
	drbd_set_out_of_sync(mdev, sector, size);
}

int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct digest_info *di;
	int digest_size;
	void *digest;
	int ok, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
	 * the resync lru has been cleaned up already */
	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, e->sector);
		put_ldev(mdev);
	}

	di = e->digest;

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);

			D_ASSERT(digest_size == di->digest_size);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}
	} else {
		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(mdev);
	if (!eq)
		drbd_ov_oos_found(mdev, e->sector, e->size);
	else
		ov_oos_print(mdev);

	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);

	drbd_free_ee(mdev, e);

	--mdev->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((mdev->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(mdev, mdev->ov_left);

	if (mdev->ov_left == 0) {
		ov_oos_print(mdev);
		drbd_resync_finished(mdev);
	}

	return ok;
}

int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
	complete(&b->done);
	return 1;
}

int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
	struct p_barrier *p = &mdev->data.sbuf.barrier;
	int ok = 1;

	/* really avoid racing with tl_clear.  w.cb may have been referenced
	 * just before it was reassigned and re-queued, so double check that.
	 * actually, this race was harmless, since we only try to send the
	 * barrier packet here, and otherwise do nothing with the object.
	 * but compare with the head of w_clear_epoch */
	spin_lock_irq(&mdev->req_lock);
	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
		cancel = 1;
	spin_unlock_irq(&mdev->req_lock);
	if (cancel)
		return 1;

	if (!drbd_get_data_sock(mdev))
		return 0;
	p->barrier = b->br_number;
	/* inc_ap_pending was done where this was queued.
	 * dec_ap_pending will be done in got_BarrierAck
	 * or (on connection loss) in w_clear_epoch.  */
	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
				(struct p_header80 *)p, sizeof(*p), 0);
	drbd_put_data_sock(mdev);

	return ok;
}

int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	if (cancel)
		return 1;
	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
}

int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);
		return 1;
	}

	ok = drbd_send_oos(mdev, req);
	req_mod(req, oos_handed_to_network);

	return ok;
}

/**
 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);
		return 1;
	}

	ok = drbd_send_dblock(mdev, req);
	req_mod(req, ok ? handed_over_to_network : send_failed);

	return ok;
}

/**
 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);
		return 1;
	}

	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
				(unsigned long)req);

	if (!ok) {
		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
		 * so this is probably redundant */
		if (mdev->state.conn >= C_CONNECTED)
			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
	}
	req_mod(req, ok ? handed_over_to_network : send_failed);

	return ok;
}
int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
		drbd_al_begin_io(mdev, req->sector);
	/* Calling drbd_al_begin_io() out of the worker might deadlock
	   theoretically. In practice it cannot deadlock, since this is
	   only used when unfreezing IOs. All the extents of the requests
	   that made it into the TL are already active */

	drbd_req_make_private_bio(req, req->master_bio);
	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
	generic_make_request(req->private_bio);

	return 1;
}

static int _drbd_may_sync_now(struct drbd_conf *mdev)
{
	struct drbd_conf *odev = mdev;

	while (1) {
		if (odev->sync_conf.after == -1)
			return 1;
		odev = minor_to_mdev(odev->sync_conf.after);
		ERR_IF(!odev) return 1;
		if ((odev->state.conn >= C_SYNC_SOURCE &&
		     odev->state.conn <= C_PAUSED_SYNC_T) ||
		    odev->state.aftr_isp || odev->state.peer_isp ||
		    odev->state.user_isp)
			return 0;
	}
}

/**
 * _drbd_pause_after() - Pause resync on all devices that may not resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and after_state_ch).
 */
static int _drbd_pause_after(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (!_drbd_may_sync_now(odev))
			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
			       != SS_NOTHING_TO_DO);
	}

	return rv;
}

/**
 * _drbd_resume_next() - Resume resync on all devices that may resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and worker).
 */
static int _drbd_resume_next(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (odev->state.aftr_isp) {
			if (_drbd_may_sync_now(odev))
				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
							CS_HARD, NULL)
				       != SS_NOTHING_TO_DO) ;
		}
	}
	return rv;
}

void resume_next_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_resume_next(mdev);
	write_unlock_irq(&global_state_lock);
}

void suspend_other_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_pause_after(mdev);
	write_unlock_irq(&global_state_lock);
}
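/* The sync-after dependencies checked below form a chain by minor
 * number.  For example (hypothetical config): if drbd2 has
 * "sync-after drbd1" and drbd1 has "sync-after drbd0", drbd2 may only
 * resync once neither drbd1 nor drbd0 is resyncing or paused.
 * sync_after_error() walks that chain and rejects a configuration that
 * would lead back to the starting device, since a cycle could pause
 * every member forever. */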
static int sync_after_error(struct drbd_conf *mdev, int o_minor)
{
	struct drbd_conf *odev;

	if (o_minor == -1)
		return NO_ERROR;
	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
		return ERR_SYNC_AFTER;

	/* check for loops */
	odev = minor_to_mdev(o_minor);
	while (1) {
		if (odev == mdev)
			return ERR_SYNC_AFTER_CYCLE;

		/* dependency chain ends here, no cycles. */
		if (odev->sync_conf.after == -1)
			return NO_ERROR;

		/* follow the dependency chain */
		odev = minor_to_mdev(odev->sync_conf.after);
	}
}

int drbd_alter_sa(struct drbd_conf *mdev, int na)
{
	int changes;
	int retcode;

	write_lock_irq(&global_state_lock);
	retcode = sync_after_error(mdev, na);
	if (retcode == NO_ERROR) {
		mdev->sync_conf.after = na;
		do {
			changes  = _drbd_pause_after(mdev);
			changes |= _drbd_resume_next(mdev);
		} while (changes);
	}
	write_unlock_irq(&global_state_lock);
	return retcode;
}

void drbd_rs_controller_reset(struct drbd_conf *mdev)
{
	atomic_set(&mdev->rs_sect_in, 0);
	atomic_set(&mdev->rs_sect_ev, 0);
	mdev->rs_in_flight = 0;
	mdev->rs_planed = 0;
	spin_lock(&mdev->peer_seq_lock);
	fifo_set(&mdev->rs_plan_s, 0);
	spin_unlock(&mdev->peer_seq_lock);
}

/**
 * drbd_start_resync() - Start the resync process
 * @mdev:	DRBD device.
 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
 *
 * This function might bring you directly into one of the
 * C_PAUSED_SYNC_* states.
 */
void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
{
	union drbd_state ns;
	int r;

	if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
		dev_err(DEV, "Resync already running!\n");
		return;
	}

	if (mdev->state.conn < C_AHEAD) {
		/* In case a previous resync run was aborted by an IO error/detach on the peer. */
		drbd_rs_cancel_all(mdev);
		/* This should be done when we abort the resync. We definitely do not
		   want to have this for connections going back and forth between
		   Ahead/Behind and SyncSource/SyncTarget */
	}

	if (side == C_SYNC_TARGET) {
		/* Since application IO was locked out during C_WF_BITMAP_T and
		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
		   we check whether we may make the data inconsistent. */
		r = drbd_khelper(mdev, "before-resync-target");
		r = (r >> 8) & 0xff;
		if (r > 0) {
			dev_info(DEV, "before-resync-target handler returned %d, "
				 "dropping connection.\n", r);
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
			return;
		}
	} else /* C_SYNC_SOURCE */ {
		r = drbd_khelper(mdev, "before-resync-source");
		r = (r >> 8) & 0xff;
		if (r > 0) {
			if (r == 3) {
Old userland tools?", r); 1503 } else { 1504 dev_info(DEV, "before-resync-source handler returned %d, " 1505 "dropping connection.\n", r); 1506 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 1507 return; 1508 } 1509 } 1510 } 1511 1512 drbd_state_lock(mdev); 1513 1514 if (!get_ldev_if_state(mdev, D_NEGOTIATING)) { 1515 drbd_state_unlock(mdev); 1516 return; 1517 } 1518 1519 if (side == C_SYNC_TARGET) { 1520 mdev->bm_resync_fo = 0; 1521 } else /* side == C_SYNC_SOURCE */ { 1522 u64 uuid; 1523 1524 get_random_bytes(&uuid, sizeof(u64)); 1525 drbd_uuid_set(mdev, UI_BITMAP, uuid); 1526 drbd_send_sync_uuid(mdev, uuid); 1527 1528 D_ASSERT(mdev->state.disk == D_UP_TO_DATE); 1529 } 1530 1531 write_lock_irq(&global_state_lock); 1532 ns = mdev->state; 1533 1534 ns.aftr_isp = !_drbd_may_sync_now(mdev); 1535 1536 ns.conn = side; 1537 1538 if (side == C_SYNC_TARGET) 1539 ns.disk = D_INCONSISTENT; 1540 else /* side == C_SYNC_SOURCE */ 1541 ns.pdsk = D_INCONSISTENT; 1542 1543 r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL); 1544 ns = mdev->state; 1545 1546 if (ns.conn < C_CONNECTED) 1547 r = SS_UNKNOWN_ERROR; 1548 1549 if (r == SS_SUCCESS) { 1550 unsigned long tw = drbd_bm_total_weight(mdev); 1551 unsigned long now = jiffies; 1552 int i; 1553 1554 mdev->rs_failed = 0; 1555 mdev->rs_paused = 0; 1556 mdev->rs_same_csum = 0; 1557 mdev->rs_last_events = 0; 1558 mdev->rs_last_sect_ev = 0; 1559 mdev->rs_total = tw; 1560 mdev->rs_start = now; 1561 for (i = 0; i < DRBD_SYNC_MARKS; i++) { 1562 mdev->rs_mark_left[i] = tw; 1563 mdev->rs_mark_time[i] = now; 1564 } 1565 _drbd_pause_after(mdev); 1566 } 1567 write_unlock_irq(&global_state_lock); 1568 put_ldev(mdev); 1569 1570 if (r == SS_SUCCESS) { 1571 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n", 1572 drbd_conn_str(ns.conn), 1573 (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10), 1574 (unsigned long) mdev->rs_total); 1575 1576 if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) { 1577 /* This still has a race (about when exactly the peers 1578 * detect connection loss) that can lead to a full sync 1579 * on next handshake. In 8.3.9 we fixed this with explicit 1580 * resync-finished notifications, but the fix 1581 * introduces a protocol change. Sleeping for some 1582 * time longer than the ping interval + timeout on the 1583 * SyncSource, to give the SyncTarget the chance to 1584 * detect connection loss, then waiting for a ping 1585 * response (implicit in drbd_resync_finished) reduces 1586 * the race considerably, but does not solve it. */ 1587 if (side == C_SYNC_SOURCE) 1588 schedule_timeout_interruptible( 1589 mdev->net_conf->ping_int * HZ + 1590 mdev->net_conf->ping_timeo*HZ/9); 1591 drbd_resync_finished(mdev); 1592 } 1593 1594 drbd_rs_controller_reset(mdev); 1595 /* ns.conn may already be != mdev->state.conn, 1596 * we may have been paused in between, or become paused until 1597 * the timer triggers. 
		 * No matter, that is handled in resync_timer_fn() */
		if (ns.conn == C_SYNC_TARGET)
			mod_timer(&mdev->resync_timer, jiffies);

		drbd_md_sync(mdev);
	}
	drbd_state_unlock(mdev);
}

int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_conf *mdev = thi->mdev;
	struct drbd_work *w = NULL;
	LIST_HEAD(work_list);
	int intr = 0, i;

	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));

	while (get_t_state(thi) == Running) {
		drbd_thread_current_set_cpu(mdev);

		if (down_trylock(&mdev->data.work.s)) {
			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket && !mdev->net_conf->no_cork)
				drbd_tcp_uncork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);

			intr = down_interruptible(&mdev->data.work.s);

			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket && !mdev->net_conf->no_cork)
				drbd_tcp_cork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);
		}

		if (intr) {
			D_ASSERT(intr == -EINTR);
			flush_signals(current);
			ERR_IF (get_t_state(thi) == Running)
				continue;
			break;
		}

		if (get_t_state(thi) != Running)
			break;
		/* With this break, we have done a down() but not consumed
		   the entry from the list. The cleanup code takes care of
		   this...   */

		w = NULL;
		spin_lock_irq(&mdev->data.work.q_lock);
		ERR_IF(list_empty(&mdev->data.work.q)) {
			/* something terribly wrong in our logic.
			 * we were able to down() the semaphore,
			 * but the list is empty... doh.
			 *
			 * what is the best thing to do now?
			 * try again from scratch, restarting the receiver,
			 * asender, whatnot? could break even more ugly,
			 * e.g. when we are primary, but no good local data.
			 *
			 * I'll try to get away just starting over this loop.
			 */
			spin_unlock_irq(&mdev->data.work.q_lock);
			continue;
		}
		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
		list_del_init(&w->list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
			/* dev_warn(DEV, "worker: a callback failed! \n"); */
			if (mdev->state.conn >= C_CONNECTED)
				drbd_force_state(mdev,
						NS(conn, C_NETWORK_FAILURE));
		}
	}
	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));

	spin_lock_irq(&mdev->data.work.q_lock);
	i = 0;
	while (!list_empty(&mdev->data.work.q)) {
		list_splice_init(&mdev->data.work.q, &work_list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		while (!list_empty(&work_list)) {
			w = list_entry(work_list.next, struct drbd_work, list);
			list_del_init(&w->list);
			w->cb(mdev, w, 1);
			i++; /* dead debugging code */
		}

		spin_lock_irq(&mdev->data.work.q_lock);
	}
	sema_init(&mdev->data.work.s, 0);
	/* DANGEROUS race: if someone did queue his work within the spinlock,
	 * but up() ed outside the spinlock, we could get an up() on the
	 * semaphore without corresponding list entry.
	 * So don't do that.
	 */
	spin_unlock_irq(&mdev->data.work.q_lock);

	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
	/* _drbd_set_state only uses stop_nowait.
	 * wait here for the Exiting receiver. */
	drbd_thread_stop(&mdev->receiver);
	drbd_mdev_cleanup(mdev);

	dev_info(DEV, "worker terminated\n");

	clear_bit(DEVICE_DYING, &mdev->flags);
	clear_bit(CONFIG_PENDING, &mdev->flags);
	wake_up(&mdev->state_wait);

	return 0;
}