/*
   drbd_worker.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

 */

#include <linux/module.h>
#include <linux/version.h>
#include <linux/drbd.h>
#include <linux/sched.h>
#include <linux/smp_lock.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_int.h"
#include "drbd_req.h"

#define SLEEP_TIME (HZ/10)

static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);



/* defined here:
   drbd_md_io_complete
   drbd_endio_write_sec
   drbd_endio_read_sec
   drbd_endio_pri

 * more endio handlers:
   atodb_endio in drbd_actlog.c
   drbd_bm_async_io_complete in drbd_bitmap.c

 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 *
 */


/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync after dependencies, we grab a write lock, because
   we need stable states on all devices for that. */
rwlock_t global_state_lock;

/* used for synchronous meta data and bitmap IO
 * submitted by drbd_md_sync_page_io()
 */
void drbd_md_io_complete(struct bio *bio, int error)
{
	struct drbd_md_io *md_io;

	md_io = (struct drbd_md_io *)bio->bi_private;
	md_io->error = error;

	complete(&md_io->event);
}

/* reads on behalf of the partner,
 * "submitted" by the receiver
 */
void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_epoch_entry *e = NULL;
	struct drbd_conf *mdev;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);

	e = bio->bi_private;
	mdev = e->mdev;

	if (error)
		dev_warn(DEV, "read: error=%d s=%llus\n", error,
				(unsigned long long)e->sector);
	if (!error && !uptodate) {
		dev_warn(DEV, "read: setting error to -EIO s=%llus\n",
				(unsigned long long)e->sector);
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	D_ASSERT(e->block_id != ID_VACANT);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->read_cnt += e->size >> 9;
	list_del(&e->w.list);
	if (list_empty(&mdev->read_ee))
		wake_up(&mdev->ee_wait);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	drbd_chk_io_error(mdev, error, FALSE);
	drbd_queue_work(&mdev->data.work, &e->w);
	put_ldev(mdev);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver.
 */
void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_epoch_entry *e = NULL;
	struct drbd_conf *mdev;
	sector_t e_sector;
	int do_wake;
	int is_syncer_req;
	int do_al_complete_io;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);
	int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER);

	e = bio->bi_private;
	mdev = e->mdev;

	if (error)
		dev_warn(DEV, "write: error=%d s=%llus\n", error,
				(unsigned long long)e->sector);
	if (!error && !uptodate) {
		dev_warn(DEV, "write: setting error to -EIO s=%llus\n",
				(unsigned long long)e->sector);
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	/* error == -ENOTSUPP would be a better test,
	 * alas it is not reliable */
	if (error && is_barrier && e->flags & EE_IS_BARRIER) {
		drbd_bump_write_ordering(mdev, WO_bdev_flush);
		spin_lock_irqsave(&mdev->req_lock, flags);
		list_del(&e->w.list);
		e->w.cb = w_e_reissue;
		/* put_ldev actually happens below, once we come here again. */
		__release(local);
		spin_unlock_irqrestore(&mdev->req_lock, flags);
		drbd_queue_work(&mdev->data.work, &e->w);
		return;
	}

	D_ASSERT(e->block_id != ID_VACANT);

	spin_lock_irqsave(&mdev->req_lock, flags);
	mdev->writ_cnt += e->size >> 9;
	is_syncer_req = is_syncer_block_id(e->block_id);

	/* after we moved e to done_ee,
	 * we may no longer access it,
	 * it may be freed/reused already!
	 * (as soon as we release the req_lock) */
	e_sector = e->sector;
	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;

	list_del(&e->w.list); /* has been on active_ee or sync_ee */
	list_add_tail(&e->w.list, &mdev->done_ee);

	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
	 * neither did we wake possibly waiting conflicting requests.
	 * done from "drbd_process_done_ee" within the appropriate w.cb
	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */

	do_wake = is_syncer_req
		? list_empty(&mdev->sync_ee)
		: list_empty(&mdev->active_ee);

	if (error)
		__drbd_chk_io_error(mdev, FALSE);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	if (is_syncer_req)
		drbd_rs_complete_io(mdev, e_sector);

	if (do_wake)
		wake_up(&mdev->ee_wait);

	if (do_al_complete_io)
		drbd_al_complete_io(mdev, e_sector);

	wake_asender(mdev);
	put_ldev(mdev);

}

/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
void drbd_endio_pri(struct bio *bio, int error)
{
	unsigned long flags;
	struct drbd_request *req = bio->bi_private;
	struct drbd_conf *mdev = req->mdev;
	struct bio_and_error m;
	enum drbd_req_event what;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);

	if (error)
		dev_warn(DEV, "p %s: error=%d\n",
			 bio_data_dir(bio) == WRITE ? "write" : "read", error);
	if (!error && !uptodate) {
		dev_warn(DEV, "p %s: setting error to -EIO\n",
			 bio_data_dir(bio) == WRITE ? "write" : "read");
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	/* to avoid recursion in __req_mod */
	if (unlikely(error)) {
		what = (bio_data_dir(bio) == WRITE)
			? write_completed_with_error
			: (bio_rw(bio) == READA)
			  ? read_ahead_completed_with_error
			  : read_completed_with_error;
	} else
		what = completed_ok;

	bio_put(req->private_bio);
	req->private_bio = ERR_PTR(error);

	spin_lock_irqsave(&mdev->req_lock, flags);
	__req_mod(req, what, &m);
	spin_unlock_irqrestore(&mdev->req_lock, flags);

	if (m.bio)
		complete_master_bio(mdev, &m);
}

int w_io_error(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	/* NOTE: mdev->ldev can be NULL by the time we get here! */
	/* D_ASSERT(mdev->ldev->dc.on_io_error != EP_PASS_ON); */

	/* the only way this callback is scheduled is from _req_may_be_done,
	 * when it is done and had a local write error, see comments there */
	drbd_req_free(req);

	return TRUE;
}

int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	/* We should not detach for read io-error,
	 * but try to WRITE the P_DATA_REPLY to the failed location,
	 * to give the disk the chance to relocate that block */

	spin_lock_irq(&mdev->req_lock);
	if (cancel ||
	    mdev->state.conn < C_CONNECTED ||
	    mdev->state.pdsk <= D_INCONSISTENT) {
		_req_mod(req, send_canceled);
		spin_unlock_irq(&mdev->req_lock);
		dev_alert(DEV, "WE ARE LOST. Local IO failure, no peer.\n");
		return 1;
	}
	spin_unlock_irq(&mdev->req_lock);

	return w_send_read_req(mdev, w, 0);
}

int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	ERR_IF(cancel) return 1;
	dev_err(DEV, "resync inactive, but callback triggered??\n");
	return 1; /* Simply ignore this! */
}

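/* Compute a message digest over all data pages of @bio, using the given
 * crypto hash transform @tfm.  Used both for checksum based resync
 * (csums_tfm) and for online verify (verify_tfm). */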
void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct bio_vec *bvec;
	int i;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	__bio_for_each_segment(bvec, bio, i, 0) {
		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
		crypto_hash_update(&desc, &sg, sg.length);
	}
	crypto_hash_final(&desc, digest);
}

static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int digest_size;
	void *digest;
	int ok;

	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		return 1;
	}

	if (likely(drbd_bio_uptodate(e->private_bio))) {
		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);

			inc_rs_pending(mdev);
			ok = drbd_send_drequest_csum(mdev,
						     e->sector,
						     e->size,
						     digest,
						     digest_size,
						     P_CSUM_RS_REQUEST);
			kfree(digest);
		} else {
			dev_err(DEV, "kmalloc() of digest failed.\n");
			ok = 0;
		}
	} else
		ok = 1;

	drbd_free_ee(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
	return ok;
}

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

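/* Submit a local read for @sector on behalf of checksum based resync:
 * allocate an epoch entry, queue it on read_ee and start the read.
 * On completion, w_e_send_csum() sends the digest to the peer.
 * Returns 1 on success, 2 if the allocation failed, 0 without local disk. */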
static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
{
	struct drbd_epoch_entry *e;

	if (!get_ldev(mdev))
		return 0;

	/* GFP_TRY, because if there is no memory available right now, this may
	 * be rescheduled for later. It is "only" background resync, after all. */
	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
	if (!e) {
		put_ldev(mdev);
		return 2;
	}

	spin_lock_irq(&mdev->req_lock);
	list_add(&e->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->req_lock);

	e->private_bio->bi_end_io = drbd_endio_read_sec;
	e->private_bio->bi_rw = READ;
	e->w.cb = w_e_send_csum;

	mdev->read_cnt += size >> 9;
	drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio);

	return 1;
}

void resync_timer_fn(unsigned long data)
{
	unsigned long flags;
	struct drbd_conf *mdev = (struct drbd_conf *) data;
	int queue;

	spin_lock_irqsave(&mdev->req_lock, flags);

	if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
		queue = 1;
		if (mdev->state.conn == C_VERIFY_S)
			mdev->resync_work.cb = w_make_ov_request;
		else
			mdev->resync_work.cb = w_make_resync_request;
	} else {
		queue = 0;
		mdev->resync_work.cb = w_resync_inactive;
	}

	spin_unlock_irqrestore(&mdev->req_lock, flags);

	/* harmless race: list_empty outside data.work.q_lock */
	if (list_empty(&mdev->resync_work.list) && queue)
		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
}

int w_make_resync_request(struct drbd_conf *mdev,
		struct drbd_work *w, int cancel)
{
	unsigned long bit;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	int max_segment_size = queue_max_segment_size(mdev->rq_queue);
	int number, i, size, pe, mx;
	int align, queued, sndbuf;

	if (unlikely(cancel))
		return 1;

	if (unlikely(mdev->state.conn < C_CONNECTED)) {
		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
		return 0;
	}

	if (mdev->state.conn != C_SYNC_TARGET)
		dev_err(DEV, "%s in w_make_resync_request\n",
			drbd_conn_str(mdev->state.conn));

	if (!get_ldev(mdev)) {
		/* Since we only need to access mdev->rsync a
		   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
		   to continue resync with a broken disk makes no sense at
		   all */
		dev_err(DEV, "Disk broke down during resync!\n");
		mdev->resync_work.cb = w_resync_inactive;
		return 1;
	}

	number = SLEEP_TIME * mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
	pe = atomic_read(&mdev->rs_pending_cnt);

	mutex_lock(&mdev->data.mutex);
	if (mdev->data.socket)
		mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
	else
		mx = 1;
	mutex_unlock(&mdev->data.mutex);

	/* For resync rates >160MB/sec, allow more pending RS requests */
	if (number > mx)
		mx = number;

	/* Limit the number of pending RS requests to no more than the peer's receive buffer */
	if ((pe + number) > mx) {
		number = mx - pe;
	}

	for (i = 0; i < number; i++) {
		/* Stop generating RS requests, when half of the send buffer is filled */
		mutex_lock(&mdev->data.mutex);
		if (mdev->data.socket) {
			queued = mdev->data.socket->sk->sk_wmem_queued;
			sndbuf = mdev->data.socket->sk->sk_sndbuf;
		} else {
			queued = 1;
			sndbuf = 0;
		}
		mutex_unlock(&mdev->data.mutex);
		if (queued > sndbuf / 2)
			goto requeue;

next_sector:
		size = BM_BLOCK_SIZE;
		bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);

		if (bit == -1UL) {
			mdev->bm_resync_fo = drbd_bm_bits(mdev);
			mdev->resync_work.cb = w_resync_inactive;
			put_ldev(mdev);
			return 1;
		}

		sector = BM_BIT_TO_SECT(bit);

		if (drbd_try_rs_begin_io(mdev, sector)) {
			mdev->bm_resync_fo = bit;
			goto requeue;
		}
		mdev->bm_resync_fo = bit + 1;

		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
			drbd_rs_complete_io(mdev, sector);
			goto next_sector;
		}

#if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop if we have already the maximum req size.
		 *
		 * Additionally always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
		 *
		 * we _do_ care about the agreed-upon q->max_segment_size
		 * here, as splitting up the requests on the other side is more
		 * difficult.  the consequence is, that on lvm and md and other
		 * "indirect" devices, this is dead code, since
		 * q->max_segment_size will be PAGE_SIZE.
		 */
		align = 1;
		for (;;) {
			if (size + BM_BLOCK_SIZE > max_segment_size)
				break;

			/* Be always aligned */
			if (sector & ((1<<(align+3))-1))
				break;

			/* do not cross extent boundaries */
			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
				break;
			/* now, is it actually dirty, after all?
			 * caution, drbd_bm_test_bit is tri-state for some
			 * obscure reason; ( b == 0 ) would get the out-of-band
			 * only accidentally right because of the "oddly sized"
			 * adjustment below */
			if (drbd_bm_test_bit(mdev, bit+1) != 1)
				break;
			bit++;
			size += BM_BLOCK_SIZE;
			if ((BM_BLOCK_SIZE << align) <= size)
				align++;
			i++;
		}
		/* if we merged some,
		 * reset the offset to start the next drbd_bm_find_next from */
		if (size > BM_BLOCK_SIZE)
			mdev->bm_resync_fo = bit + 1;
#endif

		/* adjust very last sectors, in case we are oddly sized */
		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;
		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
			switch (read_for_csum(mdev, sector, size)) {
			case 0: /* Disk failure */
				put_ldev(mdev);
				return 0;
			case 2: /* Allocation failed */
				drbd_rs_complete_io(mdev, sector);
				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
				goto requeue;
			/* case 1: everything ok */
			}
		} else {
			inc_rs_pending(mdev);
			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
					       sector, size, ID_SYNCER)) {
				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
				dec_rs_pending(mdev);
				put_ldev(mdev);
				return 0;
			}
		}
	}

	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
		/* last syncer _request_ was sent,
		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
		 * next sync group will resume), as soon as we receive the last
		 * resync data block, and the last bit is cleared.
		 * until then resync "work" is "inactive" ...
		 */
		mdev->resync_work.cb = w_resync_inactive;
		put_ldev(mdev);
		return 1;
	}

 requeue:
	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
	put_ldev(mdev);
	return 1;
}

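/* Like w_make_resync_request(), but for online verify: queue P_OV_REQUEST
 * packets for the next range of sectors, limited by the configured sync
 * rate, and re-arm the resync timer when we have to wait. */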
static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	int number, i, size;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);

	if (unlikely(cancel))
		return 1;

	if (unlikely(mdev->state.conn < C_CONNECTED)) {
		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
		return 0;
	}

	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
	if (atomic_read(&mdev->rs_pending_cnt) > number)
		goto requeue;

	number -= atomic_read(&mdev->rs_pending_cnt);

	sector = mdev->ov_position;
	for (i = 0; i < number; i++) {
		if (sector >= capacity) {
			mdev->resync_work.cb = w_resync_inactive;
			return 1;
		}

		size = BM_BLOCK_SIZE;

		if (drbd_try_rs_begin_io(mdev, sector)) {
			mdev->ov_position = sector;
			goto requeue;
		}

		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

		inc_rs_pending(mdev);
		if (!drbd_send_ov_request(mdev, sector, size)) {
			dec_rs_pending(mdev);
			return 0;
		}
		sector += BM_SECT_PER_BIT;
	}
	mdev->ov_position = sector;

 requeue:
	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
	return 1;
}


int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);
	ov_oos_print(mdev);
	drbd_resync_finished(mdev);

	return 1;
}

static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	kfree(w);

	drbd_resync_finished(mdev);

	return 1;
}

int drbd_resync_finished(struct drbd_conf *mdev)
{
	unsigned long db, dt, dbdt;
	unsigned long n_oos;
	union drbd_state os, ns;
	struct drbd_work *w;
	char *khelper_cmd = NULL;

	/* Remove all elements from the resync LRU. Since future actions
	 * might set bits in the (main) bitmap, then the entries in the
	 * resync LRU would be wrong. */
	if (drbd_rs_del_all(mdev)) {
		/* In case this is not possible now, most probably because
		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * is not finished by now). Retry in 100ms. */

		drbd_kick_lo(mdev);
		__set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ / 10);
		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
		if (w) {
			w->cb = w_resync_finished;
			drbd_queue_work(&mdev->data.work, w);
			return 1;
		}
		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
	}

	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
	if (dt <= 0)
		dt = 1;
	db = mdev->rs_total;
	dbdt = Bit2KB(db/dt);
	mdev->rs_paused /= HZ;

	if (!get_ldev(mdev))
		goto out;

	spin_lock_irq(&mdev->req_lock);
	os = mdev->state;

	/* This protects us against multiple calls (that can happen in the presence
	   of application IO), and against connectivity loss just before we arrive here. */
	if (os.conn <= C_CONNECTED)
		goto out_unlock;

	ns = os;
	ns.conn = C_CONNECTED;

	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
	     (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
717 "Online verify " : "Resync", 718 dt + mdev->rs_paused, mdev->rs_paused, dbdt); 719 720 n_oos = drbd_bm_total_weight(mdev); 721 722 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) { 723 if (n_oos) { 724 dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n", 725 n_oos, Bit2KB(1)); 726 khelper_cmd = "out-of-sync"; 727 } 728 } else { 729 D_ASSERT((n_oos - mdev->rs_failed) == 0); 730 731 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) 732 khelper_cmd = "after-resync-target"; 733 734 if (mdev->csums_tfm && mdev->rs_total) { 735 const unsigned long s = mdev->rs_same_csum; 736 const unsigned long t = mdev->rs_total; 737 const int ratio = 738 (t == 0) ? 0 : 739 (t < 100000) ? ((s*100)/t) : (s/(t/100)); 740 dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; " 741 "transferred %luK total %luK\n", 742 ratio, 743 Bit2KB(mdev->rs_same_csum), 744 Bit2KB(mdev->rs_total - mdev->rs_same_csum), 745 Bit2KB(mdev->rs_total)); 746 } 747 } 748 749 if (mdev->rs_failed) { 750 dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed); 751 752 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 753 ns.disk = D_INCONSISTENT; 754 ns.pdsk = D_UP_TO_DATE; 755 } else { 756 ns.disk = D_UP_TO_DATE; 757 ns.pdsk = D_INCONSISTENT; 758 } 759 } else { 760 ns.disk = D_UP_TO_DATE; 761 ns.pdsk = D_UP_TO_DATE; 762 763 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 764 if (mdev->p_uuid) { 765 int i; 766 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++) 767 _drbd_uuid_set(mdev, i, mdev->p_uuid[i]); 768 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]); 769 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]); 770 } else { 771 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n"); 772 } 773 } 774 775 drbd_uuid_set_bm(mdev, 0UL); 776 777 if (mdev->p_uuid) { 778 /* Now the two UUID sets are equal, update what we 779 * know of the peer. */ 780 int i; 781 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++) 782 mdev->p_uuid[i] = mdev->ldev->md.uuid[i]; 783 } 784 } 785 786 _drbd_set_state(mdev, ns, CS_VERBOSE, NULL); 787 out_unlock: 788 spin_unlock_irq(&mdev->req_lock); 789 put_ldev(mdev); 790 out: 791 mdev->rs_total = 0; 792 mdev->rs_failed = 0; 793 mdev->rs_paused = 0; 794 mdev->ov_start_sector = 0; 795 796 if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) { 797 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n"); 798 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished"); 799 } 800 801 if (khelper_cmd) 802 drbd_khelper(mdev, khelper_cmd); 803 804 return 1; 805 } 806 807 /* helper */ 808 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e) 809 { 810 if (drbd_bio_has_active_page(e->private_bio)) { 811 /* This might happen if sendpage() has not finished */ 812 spin_lock_irq(&mdev->req_lock); 813 list_add_tail(&e->w.list, &mdev->net_ee); 814 spin_unlock_irq(&mdev->req_lock); 815 } else 816 drbd_free_ee(mdev, e); 817 } 818 819 /** 820 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST 821 * @mdev: DRBD device. 822 * @w: work object. 
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int ok;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	if (likely(drbd_bio_uptodate(e->private_bio))) {
		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
			    (unsigned long long)e->sector);

		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
	}

	dec_unacked(mdev);

	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block() failed\n");
	return ok;
}

/**
 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int ok;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, e->sector);
		put_ldev(mdev);
	}

	if (likely(drbd_bio_uptodate(e->private_bio))) {
		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
			inc_rs_pending(mdev);
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
		} else {
			if (__ratelimit(&drbd_ratelimit_state))
				dev_err(DEV, "Not sending RSDataReply, "
				    "partner DISKLESS!\n");
			ok = 1;
		}
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
			    (unsigned long long)e->sector);

		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);

		/* update resync data with failure */
		drbd_rs_failed_io(mdev, e->sector, e->size);
	}

	dec_unacked(mdev);

	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block() failed\n");
	return ok;
}

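/* Worker callback for a checksum based resync request (P_CSUM_RS_REQUEST):
 * compare the digest sent by the peer with the locally computed one.
 * If they match, the block is in sync and we send P_RS_IS_IN_SYNC;
 * otherwise we send the full block as a P_RS_DATA_REPLY. */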
int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct digest_info *di;
	int digest_size;
	void *digest = NULL;
	int ok, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	drbd_rs_complete_io(mdev, e->sector);

	di = (struct digest_info *)(unsigned long)e->block_id;

	if (likely(drbd_bio_uptodate(e->private_bio))) {
		/* quick hack to try to avoid a race against reconfiguration.
		 * a real fix would be much more involved,
		 * introducing more locking mechanisms */
		if (mdev->csums_tfm) {
			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
			D_ASSERT(digest_size == di->digest_size);
			digest = kmalloc(digest_size, GFP_NOIO);
		}
		if (digest) {
			drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}

		if (eq) {
			drbd_set_in_sync(mdev, e->sector, e->size);
			mdev->rs_same_csum++;
			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
		} else {
			inc_rs_pending(mdev);
			e->block_id = ID_SYNCER;
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
		}
	} else {
		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(mdev);

	kfree(di);

	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block/ack() failed\n");
	return ok;
}

int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int digest_size;
	void *digest;
	int ok = 1;

	if (unlikely(cancel))
		goto out;

	if (unlikely(!drbd_bio_uptodate(e->private_bio)))
		goto out;

	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
	/* FIXME if this allocation fails, online verify will not terminate! */
	digest = kmalloc(digest_size, GFP_NOIO);
	if (digest) {
		drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
		inc_rs_pending(mdev);
		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
					     digest, digest_size, P_OV_REPLY);
		if (!ok)
			dec_rs_pending(mdev);
		kfree(digest);
	}

out:
	drbd_free_ee(mdev, e);

	dec_unacked(mdev);

	return ok;
}

void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
{
	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
		mdev->ov_last_oos_size += size>>9;
	} else {
		mdev->ov_last_oos_start = sector;
		mdev->ov_last_oos_size = size>>9;
	}
	drbd_set_out_of_sync(mdev, sector, size);
	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
}

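/* Worker callback for an online verify reply (P_OV_REPLY): recompute the
 * digest for the block locally, compare it with the peer's digest, record
 * out of sync blocks, and report the result with P_OV_RESULT.  Finishes
 * the verify run once ov_left reaches zero. */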
int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct digest_info *di;
	int digest_size;
	void *digest;
	int ok, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		dec_unacked(mdev);
		return 1;
	}

	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
	 * the resync lru has been cleaned up already */
	drbd_rs_complete_io(mdev, e->sector);

	di = (struct digest_info *)(unsigned long)e->block_id;

	if (likely(drbd_bio_uptodate(e->private_bio))) {
		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);

			D_ASSERT(digest_size == di->digest_size);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}
	} else {
		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(mdev);

	kfree(di);

	if (!eq)
		drbd_ov_oos_found(mdev, e->sector, e->size);
	else
		ov_oos_print(mdev);

	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);

	drbd_free_ee(mdev, e);

	if (--mdev->ov_left == 0) {
		ov_oos_print(mdev);
		drbd_resync_finished(mdev);
	}

	return ok;
}

int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
	complete(&b->done);
	return 1;
}

int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
	struct p_barrier *p = &mdev->data.sbuf.barrier;
	int ok = 1;

	/* really avoid racing with tl_clear.  w.cb may have been referenced
	 * just before it was reassigned and re-queued, so double check that.
	 * actually, this race was harmless, since we only try to send the
	 * barrier packet here, and otherwise do nothing with the object.
	 * but compare with the head of w_clear_epoch */
	spin_lock_irq(&mdev->req_lock);
	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
		cancel = 1;
	spin_unlock_irq(&mdev->req_lock);
	if (cancel)
		return 1;

	if (!drbd_get_data_sock(mdev))
		return 0;
	p->barrier = b->br_number;
	/* inc_ap_pending was done where this was queued.
	 * dec_ap_pending will be done in got_BarrierAck
	 * or (on connection loss) in w_clear_epoch. */
	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
				(struct p_header *)p, sizeof(*p), 0);
	drbd_put_data_sock(mdev);

	return ok;
}

int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	if (cancel)
		return 1;
	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
}

/**
 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);
		return 1;
	}

	ok = drbd_send_dblock(mdev, req);
	req_mod(req, ok ? handed_over_to_network : send_failed);

	return ok;
}

/**
 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, send_canceled);
		return 1;
	}

	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
				(unsigned long)req);

	if (!ok) {
		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
		 * so this is probably redundant */
		if (mdev->state.conn >= C_CONNECTED)
			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
	}
	req_mod(req, ok ? handed_over_to_network : send_failed);

	return ok;
}

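/* Walk the sync-after dependency chain of @mdev.  Return 0 if any device
 * we depend on is currently resyncing or has its resync paused, 1 if it
 * is fine to resync now. */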
static int _drbd_may_sync_now(struct drbd_conf *mdev)
{
	struct drbd_conf *odev = mdev;

	while (1) {
		if (odev->sync_conf.after == -1)
			return 1;
		odev = minor_to_mdev(odev->sync_conf.after);
		ERR_IF(!odev) return 1;
		if ((odev->state.conn >= C_SYNC_SOURCE &&
		     odev->state.conn <= C_PAUSED_SYNC_T) ||
		    odev->state.aftr_isp || odev->state.peer_isp ||
		    odev->state.user_isp)
			return 0;
	}
}

/**
 * _drbd_pause_after() - Pause resync on all devices that may not resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and after_state_ch).
 */
static int _drbd_pause_after(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (!_drbd_may_sync_now(odev))
			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
			       != SS_NOTHING_TO_DO);
	}

	return rv;
}

/**
 * _drbd_resume_next() - Resume resync on all devices that may resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and worker).
 */
static int _drbd_resume_next(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (odev->state.aftr_isp) {
			if (_drbd_may_sync_now(odev))
				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
							CS_HARD, NULL)
				       != SS_NOTHING_TO_DO);
		}
	}
	return rv;
}

void resume_next_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_resume_next(mdev);
	write_unlock_irq(&global_state_lock);
}

void suspend_other_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_pause_after(mdev);
	write_unlock_irq(&global_state_lock);
}

static int sync_after_error(struct drbd_conf *mdev, int o_minor)
{
	struct drbd_conf *odev;

	if (o_minor == -1)
		return NO_ERROR;
	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
		return ERR_SYNC_AFTER;

	/* check for loops */
	odev = minor_to_mdev(o_minor);
	while (1) {
		if (odev == mdev)
			return ERR_SYNC_AFTER_CYCLE;

		/* dependency chain ends here, no cycles. */
		if (odev->sync_conf.after == -1)
			return NO_ERROR;

		/* follow the dependency chain */
		odev = minor_to_mdev(odev->sync_conf.after);
	}
}

int drbd_alter_sa(struct drbd_conf *mdev, int na)
{
	int changes;
	int retcode;

	write_lock_irq(&global_state_lock);
	retcode = sync_after_error(mdev, na);
	if (retcode == NO_ERROR) {
		mdev->sync_conf.after = na;
		do {
			changes  = _drbd_pause_after(mdev);
			changes |= _drbd_resume_next(mdev);
		} while (changes);
	}
	write_unlock_irq(&global_state_lock);
	return retcode;
}

/**
 * drbd_start_resync() - Start the resync process
 * @mdev:	DRBD device.
 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
 *
 * This function might bring you directly into one of the
 * C_PAUSED_SYNC_* states.
 */
void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
{
	union drbd_state ns;
	int r;

	if (mdev->state.conn >= C_SYNC_SOURCE) {
		dev_err(DEV, "Resync already running!\n");
		return;
	}

	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
	drbd_rs_cancel_all(mdev);

	if (side == C_SYNC_TARGET) {
		/* Since application IO was locked out during C_WF_BITMAP_T and
		   C_WF_SYNC_UUID we are still unmodified. Before going to
		   C_SYNC_TARGET we let the before-resync-target handler decide
		   whether we may make the data inconsistent. */
		r = drbd_khelper(mdev, "before-resync-target");
		r = (r >> 8) & 0xff;
		if (r > 0) {
			dev_info(DEV, "before-resync-target handler returned %d, "
			     "dropping connection.\n", r);
			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
			return;
		}
	}

	drbd_state_lock(mdev);

	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
		drbd_state_unlock(mdev);
		return;
	}

	if (side == C_SYNC_TARGET) {
		mdev->bm_resync_fo = 0;
	} else /* side == C_SYNC_SOURCE */ {
		u64 uuid;

		get_random_bytes(&uuid, sizeof(u64));
		drbd_uuid_set(mdev, UI_BITMAP, uuid);
		drbd_send_sync_uuid(mdev, uuid);

		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
	}

	write_lock_irq(&global_state_lock);
	ns = mdev->state;

	ns.aftr_isp = !_drbd_may_sync_now(mdev);

	ns.conn = side;

	if (side == C_SYNC_TARGET)
		ns.disk = D_INCONSISTENT;
	else /* side == C_SYNC_SOURCE */
		ns.pdsk = D_INCONSISTENT;

	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
	ns = mdev->state;

	if (ns.conn < C_CONNECTED)
		r = SS_UNKNOWN_ERROR;

	if (r == SS_SUCCESS) {
		mdev->rs_total =
		mdev->rs_mark_left = drbd_bm_total_weight(mdev);
		mdev->rs_failed = 0;
		mdev->rs_paused = 0;
		mdev->rs_start =
		mdev->rs_mark_time = jiffies;
		mdev->rs_same_csum = 0;
		_drbd_pause_after(mdev);
	}
	write_unlock_irq(&global_state_lock);
	drbd_state_unlock(mdev);
	put_ldev(mdev);

	if (r == SS_SUCCESS) {
		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
		     drbd_conn_str(ns.conn),
		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
		     (unsigned long) mdev->rs_total);

		if (mdev->rs_total == 0) {
			/* Peer still reachable? Beware of failing before-resync-target handlers! */
			request_ping(mdev);
			__set_current_state(TASK_INTERRUPTIBLE);
			schedule_timeout(mdev->net_conf->ping_timeo*HZ/9); /* 9 instead of 10 */
			drbd_resync_finished(mdev);
			return;
		}

		/* ns.conn may already be != mdev->state.conn,
		 * we may have been paused in between, or become paused until
		 * the timer triggers.
		 * No matter, that is handled in resync_timer_fn() */
		if (ns.conn == C_SYNC_TARGET)
			mod_timer(&mdev->resync_timer, jiffies);

		drbd_md_sync(mdev);
	}
}

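/* Main loop of the per-device worker thread: wait for work on the
 * data.work queue and run each callback.  If a callback fails while we
 * are connected, the connection is flagged as C_NETWORK_FAILURE.  On
 * exit, the remaining queue is drained with the cancel flag set before
 * the device is cleaned up. */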
int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_conf *mdev = thi->mdev;
	struct drbd_work *w = NULL;
	LIST_HEAD(work_list);
	int intr = 0, i;

	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));

	while (get_t_state(thi) == Running) {
		drbd_thread_current_set_cpu(mdev);

		if (down_trylock(&mdev->data.work.s)) {
			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket && !mdev->net_conf->no_cork)
				drbd_tcp_uncork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);

			intr = down_interruptible(&mdev->data.work.s);

			mutex_lock(&mdev->data.mutex);
			if (mdev->data.socket && !mdev->net_conf->no_cork)
				drbd_tcp_cork(mdev->data.socket);
			mutex_unlock(&mdev->data.mutex);
		}

		if (intr) {
			D_ASSERT(intr == -EINTR);
			flush_signals(current);
			ERR_IF (get_t_state(thi) == Running)
				continue;
			break;
		}

		if (get_t_state(thi) != Running)
			break;
		/* With this break, we have done a down() but not consumed
		   the entry from the list. The cleanup code takes care of
		   this... */

		w = NULL;
		spin_lock_irq(&mdev->data.work.q_lock);
		ERR_IF(list_empty(&mdev->data.work.q)) {
			/* something terribly wrong in our logic.
			 * we were able to down() the semaphore,
			 * but the list is empty... doh.
			 *
			 * what is the best thing to do now?
			 * try again from scratch, restarting the receiver,
			 * asender, whatnot? could break even more ugly,
			 * e.g. when we are primary, but no good local data.
			 *
			 * I'll try to get away just starting over this loop.
			 */
			spin_unlock_irq(&mdev->data.work.q_lock);
			continue;
		}
		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
		list_del_init(&w->list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
			/* dev_warn(DEV, "worker: a callback failed! \n"); */
			if (mdev->state.conn >= C_CONNECTED)
				drbd_force_state(mdev,
						NS(conn, C_NETWORK_FAILURE));
		}
	}
	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));

	spin_lock_irq(&mdev->data.work.q_lock);
	i = 0;
	while (!list_empty(&mdev->data.work.q)) {
		list_splice_init(&mdev->data.work.q, &work_list);
		spin_unlock_irq(&mdev->data.work.q_lock);

		while (!list_empty(&work_list)) {
			w = list_entry(work_list.next, struct drbd_work, list);
			list_del_init(&w->list);
			w->cb(mdev, w, 1);
			i++; /* dead debugging code */
		}

		spin_lock_irq(&mdev->data.work.q_lock);
	}
	sema_init(&mdev->data.work.s, 0);
	/* DANGEROUS race: if someone did queue his work within the spinlock,
	 * but up() ed outside the spinlock, we could get an up() on the
	 * semaphore without corresponding list entry.
	 * So don't do that.
	 */
	spin_unlock_irq(&mdev->data.work.q_lock);

	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
	/* _drbd_set_state only uses stop_nowait.
	 * wait here for the Exiting receiver. */
	drbd_thread_stop(&mdev->receiver);
	drbd_mdev_cleanup(mdev);

	dev_info(DEV, "worker terminated\n");

	clear_bit(DEVICE_DYING, &mdev->flags);
	clear_bit(CONFIG_PENDING, &mdev->flags);
	wake_up(&mdev->state_wait);

	return 0;
}