/*
   drbd_worker.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

 */

#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/sched.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"

static int make_ov_request(struct drbd_device *, int);
static int make_resync_request(struct drbd_device *, int);

/* endio handlers:
 *   drbd_md_io_complete (defined here)
 *   drbd_request_endio (defined here)
 *   drbd_peer_request_endio (defined here)
 *   bm_async_io_complete (defined in drbd_bitmap.c)
 *
 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 *
 */


/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the resync after dependencies, we grab a write lock, because
   we need stable states on all devices for that.  */
rwlock_t global_state_lock;

/* used for synchronous meta data and bitmap IO
 * submitted by drbd_md_sync_page_io()
 */
void drbd_md_io_complete(struct bio *bio, int error)
{
	struct drbd_device *device;

	device = bio->bi_private;
	device->md_io.error = error;

	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
	 * to timeout on the lower level device, and eventually detach from it.
	 * If this io completion runs after that timeout expired, this
	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
	 * During normal operation, this only puts that extra reference
	 * down to 1 again.
	 * Make sure we first drop the reference, and only then signal
	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
	 * next drbd_md_sync_page_io(), that we trigger the
	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
	 */
	drbd_md_put_buffer(device);
	device->md_io.done = 1;
	wake_up(&device->misc_wait);
	bio_put(bio);
	if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
		put_ldev(device);
}

/* reads on behalf of the partner,
 * "submitted" by the receiver
 */
static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;

	spin_lock_irqsave(&device->resource->req_lock, flags);
	device->read_cnt += peer_req->i.size >> 9;
	list_del(&peer_req->w.list);
	if (list_empty(&device->read_ee))
		wake_up(&device->ee_wait);
	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
		__drbd_chk_io_error(device, DRBD_READ_ERROR);
	spin_unlock_irqrestore(&device->resource->req_lock, flags);

	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
	put_ldev(device);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage.  */
void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	struct drbd_interval i;
	int do_wake;
	u64 block_id;
	int do_al_complete_io;

	/* after we moved peer_req to done_ee,
	 * we may no longer access it,
	 * it may be freed/reused already!
	 * (as soon as we release the req_lock) */
	i = peer_req->i;
	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
	block_id = peer_req->block_id;
	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;

	spin_lock_irqsave(&device->resource->req_lock, flags);
	device->writ_cnt += peer_req->i.size >> 9;
	list_move_tail(&peer_req->w.list, &device->done_ee);

	/*
	 * Do not remove from the write_requests tree here: we did not send the
	 * Ack yet and did not wake possibly waiting conflicting requests.
	 * Removed from the tree from "drbd_process_done_ee" within the
	 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
	 * _drbd_clear_done_ee.
	 */

	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);

	/* FIXME do we want to detach for failed REQ_DISCARD?
	 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
	if (peer_req->flags & EE_WAS_ERROR)
		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
	spin_unlock_irqrestore(&device->resource->req_lock, flags);

	if (block_id == ID_SYNCER)
		drbd_rs_complete_io(device, i.sector);

	if (do_wake)
		wake_up(&device->ee_wait);

	if (do_al_complete_io)
		drbd_al_complete_io(device, &i);

	wake_asender(peer_device->connection);
	put_ldev(device);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver.
 */
void drbd_peer_request_endio(struct bio *bio, int error)
{
	struct drbd_peer_request *peer_req = bio->bi_private;
	struct drbd_device *device = peer_req->peer_device->device;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);
	int is_write = bio_data_dir(bio) == WRITE;
	int is_discard = !!(bio->bi_rw & REQ_DISCARD);

	if (error && __ratelimit(&drbd_ratelimit_state))
		drbd_warn(device, "%s: error=%d s=%llus\n",
			is_write ? (is_discard ? "discard" : "write")
				 : "read", error,
			(unsigned long long)peer_req->i.sector);
	if (!error && !uptodate) {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_warn(device, "%s: setting error to -EIO s=%llus\n",
				is_write ? "write" : "read",
				(unsigned long long)peer_req->i.sector);
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	if (error)
		set_bit(__EE_WAS_ERROR, &peer_req->flags);

	bio_put(bio); /* no need for the bio anymore */
	if (atomic_dec_and_test(&peer_req->pending_bios)) {
		if (is_write)
			drbd_endio_write_sec_final(peer_req);
		else
			drbd_endio_read_sec_final(peer_req);
	}
}

/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
void drbd_request_endio(struct bio *bio, int error)
{
	unsigned long flags;
	struct drbd_request *req = bio->bi_private;
	struct drbd_device *device = req->device;
	struct bio_and_error m;
	enum drbd_req_event what;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);

	if (!error && !uptodate) {
		drbd_warn(device, "p %s: setting error to -EIO\n",
			bio_data_dir(bio) == WRITE ? "write" : "read");
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}


	/* If this request was aborted locally before,
	 * but now was completed "successfully",
	 * chances are that this caused arbitrary data corruption.
	 *
	 * "aborting" requests, or force-detaching the disk, is intended for
	 * completely blocked/hung local backing devices which do no longer
	 * complete requests at all, not even do error completions.  In this
	 * situation, usually a hard-reset and failover is the only way out.
	 *
	 * By "aborting", basically faking a local error-completion,
	 * we allow for a more graceful switchover by cleanly migrating services.
	 * Still the affected node has to be rebooted "soon".
	 *
	 * By completing these requests, we allow the upper layers to re-use
	 * the associated data pages.
	 *
	 * If later the local backing device "recovers", and now DMAs some data
	 * from disk into the original request pages, in the best case it will
	 * just put random data into unused pages; but typically it will corrupt
	 * meanwhile completely unrelated data, causing all sorts of damage.
	 *
	 * Which means delayed successful completion,
	 * especially for READ requests,
	 * is a reason to panic().
	 *
	 * We assume that a delayed *error* completion is OK,
	 * though we still will complain noisily about it.
	 */
	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");

		if (!error)
			panic("possible random memory corruption caused by delayed completion of aborted local request\n");
	}

	/* to avoid recursion in __req_mod */
	if (unlikely(error)) {
		if (bio->bi_rw & REQ_DISCARD)
			what = (error == -EOPNOTSUPP)
				? DISCARD_COMPLETED_NOTSUPP
				: DISCARD_COMPLETED_WITH_ERROR;
		else
			what = (bio_data_dir(bio) == WRITE)
				? WRITE_COMPLETED_WITH_ERROR
				: (bio_rw(bio) == READ)
				  ? READ_COMPLETED_WITH_ERROR
				  : READ_AHEAD_COMPLETED_WITH_ERROR;
	} else
		what = COMPLETED_OK;

	bio_put(req->private_bio);
	req->private_bio = ERR_PTR(error);

	/* not req_mod(), we need irqsave here! */
	spin_lock_irqsave(&device->resource->req_lock, flags);
	__req_mod(req, what, &m);
	spin_unlock_irqrestore(&device->resource->req_lock, flags);
	put_ldev(device);

	if (m.bio)
		complete_master_bio(device, &m);
}

void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct page *page = peer_req->pages;
	struct page *tmp;
	unsigned len;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	while ((tmp = page_chain_next(page))) {
		/* all but the last page will be fully used */
		sg_set_page(&sg, page, PAGE_SIZE, 0);
		crypto_hash_update(&desc, &sg, sg.length);
		page = tmp;
	}
	/* and now the last, possibly only partially used page */
	len = peer_req->i.size & (PAGE_SIZE - 1);
	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
	crypto_hash_update(&desc, &sg, sg.length);
	crypto_hash_final(&desc, digest);
}

void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct bio_vec bvec;
	struct bvec_iter iter;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	bio_for_each_segment(bvec, bio, iter) {
		sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
		crypto_hash_update(&desc, &sg, sg.length);
	}
	crypto_hash_final(&desc, digest);
}

/* MAYBE merge common code with w_e_end_ov_req */
static int w_e_send_csum(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	int digest_size;
	void *digest;
	int err = 0;

	if (unlikely(cancel))
		goto out;

	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
		goto out;

	digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
	digest = kmalloc(digest_size, GFP_NOIO);
	if (digest) {
		sector_t sector = peer_req->i.sector;
		unsigned int size = peer_req->i.size;
		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
		/* Free peer_req and pages before send.
		 * In case we block on congestion, we could otherwise run into
		 * some distributed deadlock, if the other side blocks on
		 * congestion as well, because our receiver blocks in
		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
		drbd_free_peer_req(device, peer_req);
		peer_req = NULL;
		inc_rs_pending(device);
		err = drbd_send_drequest_csum(peer_device, sector, size,
					      digest, digest_size,
					      P_CSUM_RS_REQUEST);
		kfree(digest);
	} else {
		drbd_err(device, "kmalloc() of digest failed.\n");
		err = -ENOMEM;
	}

out:
	if (peer_req)
		drbd_free_peer_req(device, peer_req);

	if (unlikely(err))
		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
	return err;
}

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_peer_request *peer_req;

	if (!get_ldev(device))
		return -EIO;

	/* GFP_TRY, because if there is no memory available right now, this may
	 * be rescheduled for later. It is "only" background resync, after all. */
	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
				       size, true /* has real payload */, GFP_TRY);
	if (!peer_req)
		goto defer;

	peer_req->w.cb = w_e_send_csum;
	spin_lock_irq(&device->resource->req_lock);
	list_add_tail(&peer_req->w.list, &device->read_ee);
	spin_unlock_irq(&device->resource->req_lock);

	atomic_add(size >> 9, &device->rs_sect_ev);
	if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
		return 0;

	/* If it failed because of ENOMEM, retry should help.  If it failed
	 * because bio_add_page failed (probably broken lower level driver),
	 * retry may or may not help.
	 * If it does not, you may need to force disconnect. */
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&device->resource->req_lock);

	drbd_free_peer_req(device, peer_req);
defer:
	put_ldev(device);
	return -EAGAIN;
}

int w_resync_timer(struct drbd_work *w, int cancel)
{
	struct drbd_device *device =
		container_of(w, struct drbd_device, resync_work);

	switch (device->state.conn) {
	case C_VERIFY_S:
		make_ov_request(device, cancel);
		break;
	case C_SYNC_TARGET:
		make_resync_request(device, cancel);
		break;
	}

	return 0;
}

void resync_timer_fn(unsigned long data)
{
	struct drbd_device *device = (struct drbd_device *) data;

	drbd_queue_work_if_unqueued(
		&first_peer_device(device)->connection->sender_work,
		&device->resync_work);
}

static void fifo_set(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] = value;
}

static int fifo_push(struct fifo_buffer *fb, int value)
{
	int ov;

	ov = fb->values[fb->head_index];
	fb->values[fb->head_index++] = value;

	if (fb->head_index >= fb->size)
		fb->head_index = 0;

	return ov;
}

static void fifo_add_val(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] += value;
}

struct fifo_buffer *fifo_alloc(int fifo_size)
{
	struct fifo_buffer *fb;

	fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
	if (!fb)
		return NULL;

	fb->head_index = 0;
	fb->size = fifo_size;
	fb->total = 0;

	return fb;
}

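/*
 * drbd_rs_controller(): dynamic resync speed controller.
 * Given the number of sectors that came in since the last step (sect_in)
 * and the number still in flight, compute a correction towards the
 * configured fill target (c_fill_target, or one derived from
 * c_delay_target), spread that correction over the planning fifo
 * (c_plan_ahead steps), and return the number of sectors to request in
 * this step, capped by c_max_rate.
 */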
static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
{
	struct disk_conf *dc;
	unsigned int want;     /* The number of sectors we want in-flight */
	int req_sect;          /* Number of sectors to request in this turn */
	int correction;        /* Number of sectors more we need in-flight */
	int cps;               /* correction per invocation of drbd_rs_controller() */
	int steps;             /* Number of time steps to plan ahead */
	int curr_corr;
	int max_sect;
	struct fifo_buffer *plan;

	dc = rcu_dereference(device->ldev->disk_conf);
	plan = rcu_dereference(device->rs_plan_s);

	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */

	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
	} else { /* normal path */
		want = dc->c_fill_target ? dc->c_fill_target :
			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
	}

	correction = want - device->rs_in_flight - plan->total;

	/* Plan ahead */
	cps = correction / steps;
	fifo_add_val(plan, cps);
	plan->total += cps * steps;

	/* What we do in this step */
	curr_corr = fifo_push(plan, 0);
	plan->total -= curr_corr;

	req_sect = sect_in + curr_corr;
	if (req_sect < 0)
		req_sect = 0;

	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
	if (req_sect > max_sect)
		req_sect = max_sect;

	/*
	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
		 sect_in, device->rs_in_flight, want, correction,
		 steps, cps, device->rs_planed, curr_corr, req_sect);
	*/

	return req_sect;
}

static int drbd_rs_number_requests(struct drbd_device *device)
{
	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
	int number, mxb;

	sect_in = atomic_xchg(&device->rs_sect_in, 0);
	device->rs_in_flight -= sect_in;

	rcu_read_lock();
	mxb = drbd_get_max_buffers(device) / 2;
	if (rcu_dereference(device->rs_plan_s)->size) {
		number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
	} else {
		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
	}
	rcu_read_unlock();

	/* Don't have more than "max-buffers"/2 in-flight.
	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
	 * potentially causing a distributed deadlock on congestion during
	 * online-verify or (checksum-based) resync, if max-buffers,
	 * socket buffer sizes and resync rate settings are mis-configured. */

	/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
	 * mxb (as used here, and in drbd_alloc_pages on the peer) is
	 * "number of pages" (typically also 4k),
	 * but "rs_in_flight" is in "sectors" (512 Byte). */
	if (mxb - device->rs_in_flight/8 < number)
		number = mxb - device->rs_in_flight/8;

	return number;
}

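/*
 * make_resync_request(): generate up to the number of resync requests
 * allowed by drbd_rs_number_requests() in one step.  Walk the out-of-sync
 * bitmap starting at bm_resync_fo, merge adjacent dirty bits into larger,
 * aligned requests, and re-arm the resync timer whenever we have to back
 * off (send buffer more than half full, or a temporarily busy resync
 * extent).
 */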
static int make_resync_request(struct drbd_device *const device, int cancel)
{
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
	unsigned long bit;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	int max_bio_size;
	int number, rollback_i, size;
	int align, requeue = 0;
	int i = 0;

	if (unlikely(cancel))
		return 0;

	if (device->rs_total == 0) {
		/* empty resync? */
		drbd_resync_finished(device);
		return 0;
	}

	if (!get_ldev(device)) {
		/* Since we only need to access device->rsync a
		   get_ldev_if_state(device,D_FAILED) would be sufficient, but
		   to continue resync with a broken disk makes no sense at
		   all */
		drbd_err(device, "Disk broke down during resync!\n");
		return 0;
	}

	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
	number = drbd_rs_number_requests(device);
	if (number <= 0)
		goto requeue;

	for (i = 0; i < number; i++) {
		/* Stop generating RS requests when half of the send buffer is filled,
		 * but notify TCP that we'd like to have more space. */
		mutex_lock(&connection->data.mutex);
		if (connection->data.socket) {
			struct sock *sk = connection->data.socket->sk;
			int queued = sk->sk_wmem_queued;
			int sndbuf = sk->sk_sndbuf;
			if (queued > sndbuf / 2) {
				requeue = 1;
				if (sk->sk_socket)
					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			}
		} else
			requeue = 1;
		mutex_unlock(&connection->data.mutex);
		if (requeue)
			goto requeue;

next_sector:
		size = BM_BLOCK_SIZE;
		bit  = drbd_bm_find_next(device, device->bm_resync_fo);

		if (bit == DRBD_END_OF_BITMAP) {
			device->bm_resync_fo = drbd_bm_bits(device);
			put_ldev(device);
			return 0;
		}

		sector = BM_BIT_TO_SECT(bit);

		if (drbd_try_rs_begin_io(device, sector)) {
			device->bm_resync_fo = bit;
			goto requeue;
		}
		device->bm_resync_fo = bit + 1;

		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
			drbd_rs_complete_io(device, sector);
			goto next_sector;
		}

#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop if we have already the maximum req size.
		 *
		 * Additionally always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
		 */
		align = 1;
		rollback_i = i;
		while (i < number) {
			if (size + BM_BLOCK_SIZE > max_bio_size)
				break;

			/* Be always aligned */
			if (sector & ((1<<(align+3))-1))
				break;

			/* do not cross extent boundaries */
			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
				break;
			/* now, is it actually dirty, after all?
			 * caution, drbd_bm_test_bit is tri-state for some
			 * obscure reason; ( b == 0 ) would get the out-of-band
			 * only accidentally right because of the "oddly sized"
			 * adjustment below */
			if (drbd_bm_test_bit(device, bit+1) != 1)
				break;
			bit++;
			size += BM_BLOCK_SIZE;
			if ((BM_BLOCK_SIZE << align) <= size)
				align++;
			i++;
		}
		/* if we merged some,
		 * reset the offset to start the next drbd_bm_find_next from */
		if (size > BM_BLOCK_SIZE)
			device->bm_resync_fo = bit + 1;
#endif

		/* adjust very last sectors, in case we are oddly sized */
		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

		if (device->use_csums) {
			switch (read_for_csum(peer_device, sector, size)) {
			case -EIO: /* Disk failure */
				put_ldev(device);
				return -EIO;
			case -EAGAIN: /* allocation failed, or ldev busy */
				drbd_rs_complete_io(device, sector);
				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
				i = rollback_i;
				goto requeue;
			case 0:
				/* everything ok */
				break;
			default:
				BUG();
			}
		} else {
			int err;

			inc_rs_pending(device);
			err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
						 sector, size, ID_SYNCER);
			if (err) {
				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
				dec_rs_pending(device);
				put_ldev(device);
				return err;
			}
		}
	}

	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
		/* last syncer _request_ was sent,
		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
		 * next sync group will resume), as soon as we receive the last
		 * resync data block, and the last bit is cleared.
		 * until then resync "work" is "inactive" ...
		 */
		put_ldev(device);
		return 0;
	}

requeue:
	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
	put_ldev(device);
	return 0;
}

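/*
 * make_ov_request(): online-verify counterpart of make_resync_request().
 * Send verify requests for consecutive sectors starting at ov_position,
 * within the budget computed by drbd_rs_number_requests() and up to an
 * optional stop sector, and re-arm the resync timer unless the stop sector
 * has been reached.
 */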
static int make_ov_request(struct drbd_device *device, int cancel)
{
	int number, i, size;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	bool stop_sector_reached = false;

	if (unlikely(cancel))
		return 1;

	number = drbd_rs_number_requests(device);

	sector = device->ov_position;
	for (i = 0; i < number; i++) {
		if (sector >= capacity)
			return 1;

		/* We check for "finished" only in the reply path:
		 * w_e_end_ov_reply().
		 * We need to send at least one request out. */
		stop_sector_reached = i > 0
			&& verify_can_do_stop_sector(device)
			&& sector >= device->ov_stop_sector;
		if (stop_sector_reached)
			break;

		size = BM_BLOCK_SIZE;

		if (drbd_try_rs_begin_io(device, sector)) {
			device->ov_position = sector;
			goto requeue;
		}

		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

		inc_rs_pending(device);
		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
			dec_rs_pending(device);
			return 0;
		}
		sector += BM_SECT_PER_BIT;
	}
	device->ov_position = sector;

requeue:
	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	if (i == 0 || !stop_sector_reached)
		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
	return 1;
}

int w_ov_finished(struct drbd_work *w, int cancel)
{
	struct drbd_device_work *dw =
		container_of(w, struct drbd_device_work, w);
	struct drbd_device *device = dw->device;
	kfree(dw);
	ov_out_of_sync_print(device);
	drbd_resync_finished(device);

	return 0;
}

static int w_resync_finished(struct drbd_work *w, int cancel)
{
	struct drbd_device_work *dw =
		container_of(w, struct drbd_device_work, w);
	struct drbd_device *device = dw->device;
	kfree(dw);

	drbd_resync_finished(device);

	return 0;
}

static void ping_peer(struct drbd_device *device)
{
	struct drbd_connection *connection = first_peer_device(device)->connection;

	clear_bit(GOT_PING_ACK, &connection->flags);
	request_ping(connection);
	wait_event(connection->ping_wait,
		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
}

int drbd_resync_finished(struct drbd_device *device)
{
	unsigned long db, dt, dbdt;
	unsigned long n_oos;
	union drbd_state os, ns;
	struct drbd_device_work *dw;
	char *khelper_cmd = NULL;
	int verify_done = 0;

	/* Remove all elements from the resync LRU.  Since future actions
	 * might set bits in the (main) bitmap, the entries in the
	 * resync LRU would be wrong. */
	if (drbd_rs_del_all(device)) {
		/* In case this is not possible now, most probably because
		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * are not finished by now).  Retry in 100ms. */

		schedule_timeout_interruptible(HZ / 10);
		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
		if (dw) {
			dw->w.cb = w_resync_finished;
			dw->device = device;
			drbd_queue_work(&first_peer_device(device)->connection->sender_work,
					&dw->w);
			return 1;
		}
		drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
	}

	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
	if (dt <= 0)
		dt = 1;

	db = device->rs_total;
	/* adjust for verify start and stop sectors, respective reached position */
	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
		db -= device->ov_left;

	dbdt = Bit2KB(db/dt);
	device->rs_paused /= HZ;

	if (!get_ldev(device))
		goto out;

	ping_peer(device);

	spin_lock_irq(&device->resource->req_lock);
	os = drbd_read_state(device);

	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);

	/* This protects us against multiple calls (that can happen in the presence
	   of application IO), and against connectivity loss just before we arrive here. */
	if (os.conn <= C_CONNECTED)
		goto out_unlock;

	ns = os;
	ns.conn = C_CONNECTED;

	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
	     verify_done ? "Online verify" : "Resync",
	     dt + device->rs_paused, device->rs_paused, dbdt);

	n_oos = drbd_bm_total_weight(device);

	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
		if (n_oos) {
			drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
			      n_oos, Bit2KB(1));
			khelper_cmd = "out-of-sync";
		}
	} else {
		D_ASSERT(device, (n_oos - device->rs_failed) == 0);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
			khelper_cmd = "after-resync-target";

		if (device->use_csums && device->rs_total) {
			const unsigned long s = device->rs_same_csum;
			const unsigned long t = device->rs_total;
			const int ratio =
				(t == 0)     ? 0 :
			(t < 100000) ? ((s*100)/t) : (s/(t/100));
			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
			     "transferred %luK total %luK\n",
			     ratio,
			     Bit2KB(device->rs_same_csum),
			     Bit2KB(device->rs_total - device->rs_same_csum),
			     Bit2KB(device->rs_total));
		}
	}

	if (device->rs_failed) {
		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			ns.disk = D_INCONSISTENT;
			ns.pdsk = D_UP_TO_DATE;
		} else {
			ns.disk = D_UP_TO_DATE;
			ns.pdsk = D_INCONSISTENT;
		}
	} else {
		ns.disk = D_UP_TO_DATE;
		ns.pdsk = D_UP_TO_DATE;

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			if (device->p_uuid) {
				int i;
				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
					_drbd_uuid_set(device, i, device->p_uuid[i]);
				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
			} else {
				drbd_err(device, "device->p_uuid is NULL! BUG\n");
			}
		}

		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
			/* for verify runs, we don't update uuids here,
			 * so there would be nothing to report. */
			drbd_uuid_set_bm(device, 0UL);
			drbd_print_uuids(device, "updated UUIDs");
			if (device->p_uuid) {
				/* Now the two UUID sets are equal, update what we
				 * know of the peer. */
				int i;
				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
					device->p_uuid[i] = device->ldev->md.uuid[i];
			}
		}
	}

	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
out_unlock:
	spin_unlock_irq(&device->resource->req_lock);
	put_ldev(device);
out:
	device->rs_total  = 0;
	device->rs_failed = 0;
	device->rs_paused = 0;

	/* reset start sector, if we reached end of device */
	if (verify_done && device->ov_left == 0)
		device->ov_start_sector = 0;

	drbd_md_sync(device);

	if (khelper_cmd)
		drbd_khelper(device, khelper_cmd);

	return 1;
}

/* helper */
static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
{
	if (drbd_peer_req_has_active_page(peer_req)) {
		/* This might happen if sendpage() has not finished */
		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
		atomic_add(i, &device->pp_in_use_by_net);
		atomic_sub(i, &device->pp_in_use);
		spin_lock_irq(&device->resource->req_lock);
		list_add_tail(&peer_req->w.list, &device->net_ee);
		spin_unlock_irq(&device->resource->req_lock);
		wake_up(&drbd_pp_wait);
	} else
		drbd_free_peer_req(device, peer_req);
}

/**
 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 * @device:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_data_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	int err;

	if (unlikely(cancel)) {
		drbd_free_peer_req(device, peer_req);
		dec_unacked(device);
		return 0;
	}

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
			    (unsigned long long)peer_req->i.sector);

		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
	}

	dec_unacked(device);

	move_to_net_ee_or_free(device, peer_req);

	if (unlikely(err))
		drbd_err(device, "drbd_send_block() failed\n");
	return err;
}

/**
 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	int err;

	if (unlikely(cancel)) {
		drbd_free_peer_req(device, peer_req);
		dec_unacked(device);
		return 0;
	}

	if (get_ldev_if_state(device, D_FAILED)) {
		drbd_rs_complete_io(device, peer_req->i.sector);
		put_ldev(device);
	}

	if (device->state.conn == C_AHEAD) {
		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
			inc_rs_pending(device);
			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
		} else {
			if (__ratelimit(&drbd_ratelimit_state))
				drbd_err(device, "Not sending RSDataReply, "
				    "partner DISKLESS!\n");
			err = 0;
		}
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
			    (unsigned long long)peer_req->i.sector);

		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);

		/* update resync data with failure */
		drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
	}

	dec_unacked(device);

	move_to_net_ee_or_free(device, peer_req);

	if (unlikely(err))
		drbd_err(device, "drbd_send_block() failed\n");
	return err;
}

int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	struct digest_info *di;
	int digest_size;
	void *digest = NULL;
	int err, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_peer_req(device, peer_req);
		dec_unacked(device);
		return 0;
	}

	if (get_ldev(device)) {
		drbd_rs_complete_io(device, peer_req->i.sector);
		put_ldev(device);
	}

	di = peer_req->digest;

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		/* quick hack to try to avoid a race against reconfiguration.
		 * a real fix would be much more involved,
		 * introducing more locking mechanisms */
		if (peer_device->connection->csums_tfm) {
			digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
			D_ASSERT(device, digest_size == di->digest_size);
			digest = kmalloc(digest_size, GFP_NOIO);
		}
		if (digest) {
			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}

		if (eq) {
			drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
			/* rs_same_csums unit is BM_BLOCK_SIZE */
			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
		} else {
			inc_rs_pending(device);
			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
			kfree(di);
			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
		}
	} else {
		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(device);
	move_to_net_ee_or_free(device, peer_req);

	if (unlikely(err))
		drbd_err(device, "drbd_send_block/ack() failed\n");
	return err;
}

int w_e_end_ov_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	sector_t sector = peer_req->i.sector;
	unsigned int size = peer_req->i.size;
	int digest_size;
	void *digest;
	int err = 0;

	if (unlikely(cancel))
		goto out;

	digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
	digest = kmalloc(digest_size, GFP_NOIO);
	if (!digest) {
		err = 1;	/* terminate the connection in case the allocation failed */
		goto out;
	}

	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
	else
		memset(digest, 0, digest_size);

	/* Free e and pages before send.
	 * In case we block on congestion, we could otherwise run into
	 * some distributed deadlock, if the other side blocks on
	 * congestion as well, because our receiver blocks in
	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
	drbd_free_peer_req(device, peer_req);
	peer_req = NULL;
	inc_rs_pending(device);
	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
	if (err)
		dec_rs_pending(device);
	kfree(digest);

out:
	if (peer_req)
		drbd_free_peer_req(device, peer_req);
	dec_unacked(device);
	return err;
}

void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
{
	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
		device->ov_last_oos_size += size>>9;
	} else {
		device->ov_last_oos_start = sector;
		device->ov_last_oos_size = size>>9;
	}
	drbd_set_out_of_sync(device, sector, size);
}

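/*
 * w_e_end_ov_reply(): handle the peer's reply to one of our verify requests.
 * Compute the digest over the locally read block, compare it with the digest
 * received from the peer, record a mismatch as out of sync, send a
 * P_OV_RESULT ack, and finish the verify run once ov_left reaches zero or
 * the stop sector has been passed.
 */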
int w_e_end_ov_reply(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	struct digest_info *di;
	void *digest;
	sector_t sector = peer_req->i.sector;
	unsigned int size = peer_req->i.size;
	int digest_size;
	int err, eq = 0;
	bool stop_sector_reached = false;

	if (unlikely(cancel)) {
		drbd_free_peer_req(device, peer_req);
		dec_unacked(device);
		return 0;
	}

	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
	 * the resync lru has been cleaned up already */
	if (get_ldev(device)) {
		drbd_rs_complete_io(device, peer_req->i.sector);
		put_ldev(device);
	}

	di = peer_req->digest;

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);

			D_ASSERT(device, digest_size == di->digest_size);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}
	}

	/* Free peer_req and pages before send.
	 * In case we block on congestion, we could otherwise run into
	 * some distributed deadlock, if the other side blocks on
	 * congestion as well, because our receiver blocks in
	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
	drbd_free_peer_req(device, peer_req);
	if (!eq)
		drbd_ov_out_of_sync_found(device, sector, size);
	else
		ov_out_of_sync_print(device);

	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);

	dec_unacked(device);

	--device->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((device->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(device, device->ov_left);

	stop_sector_reached = verify_can_do_stop_sector(device) &&
		(sector + (size>>9)) >= device->ov_stop_sector;

	if (device->ov_left == 0 || stop_sector_reached) {
		ov_out_of_sync_print(device);
		drbd_resync_finished(device);
	}

	return err;
}

/* FIXME
 * We need to track the number of pending barrier acks,
 * and to be able to wait for them.
 * See also comment in drbd_adm_attach before drbd_suspend_io.
 */
static int drbd_send_barrier(struct drbd_connection *connection)
{
	struct p_barrier *p;
	struct drbd_socket *sock;

	sock = &connection->data;
	p = conn_prepare_command(connection, sock);
	if (!p)
		return -EIO;
	p->barrier = connection->send.current_epoch_nr;
	p->pad = 0;
	connection->send.current_epoch_writes = 0;

	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
}

int w_send_write_hint(struct drbd_work *w, int cancel)
{
	struct drbd_device *device =
		container_of(w, struct drbd_device, unplug_work);
	struct drbd_socket *sock;

	if (cancel)
		return 0;
	sock = &first_peer_device(device)->connection->data;
	if (!drbd_prepare_command(first_peer_device(device), sock))
		return -EIO;
	return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
}

static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
{
	if (!connection->send.seen_any_write_yet) {
		connection->send.seen_any_write_yet = true;
		connection->send.current_epoch_nr = epoch;
		connection->send.current_epoch_writes = 0;
	}
}

static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
{
	/* re-init if first write on this connection */
	if (!connection->send.seen_any_write_yet)
		return;
	if (connection->send.current_epoch_nr != epoch) {
		if (connection->send.current_epoch_writes)
			drbd_send_barrier(connection);
		connection->send.current_epoch_nr = epoch;
	}
}

int w_send_out_of_sync(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_device *device = req->device;
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *const connection = peer_device->connection;
	int err;

	if (unlikely(cancel)) {
		req_mod(req, SEND_CANCELED);
		return 0;
	}
	req->pre_send_jif = jiffies;

	/* this time, no connection->send.current_epoch_writes++;
	 * If it was sent, it was the closing barrier for the last
	 * replicated epoch, before we went into AHEAD mode.
	 * No more barriers will be sent, until we leave AHEAD mode again. */
	maybe_send_barrier(connection, req->epoch);

	err = drbd_send_out_of_sync(peer_device, req);
	req_mod(req, OOS_HANDED_TO_NETWORK);

	return err;
}

/**
 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_dblock(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_device *device = req->device;
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *connection = peer_device->connection;
	int err;

	if (unlikely(cancel)) {
		req_mod(req, SEND_CANCELED);
		return 0;
	}
	req->pre_send_jif = jiffies;

	re_init_if_first_write(connection, req->epoch);
	maybe_send_barrier(connection, req->epoch);
	connection->send.current_epoch_writes++;

	err = drbd_send_dblock(peer_device, req);
	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);

	return err;
}

/**
 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_read_req(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_device *device = req->device;
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *connection = peer_device->connection;
	int err;

	if (unlikely(cancel)) {
		req_mod(req, SEND_CANCELED);
		return 0;
	}
	req->pre_send_jif = jiffies;

	/* Even read requests may close a write epoch,
	 * if there was any yet. */
	maybe_send_barrier(connection, req->epoch);

	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
				 (unsigned long)req);

	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);

	return err;
}

int w_restart_disk_io(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_device *device = req->device;

	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
		drbd_al_begin_io(device, &req->i);

	drbd_req_make_private_bio(req, req->master_bio);
	req->private_bio->bi_bdev = device->ldev->backing_bdev;
	generic_make_request(req->private_bio);

	return 0;
}

static int _drbd_may_sync_now(struct drbd_device *device)
{
	struct drbd_device *odev = device;
	int resync_after;

	while (1) {
		if (!odev->ldev || odev->state.disk == D_DISKLESS)
			return 1;
		rcu_read_lock();
		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
		rcu_read_unlock();
		if (resync_after == -1)
			return 1;
		odev = minor_to_device(resync_after);
		if (!odev)
			return 1;
		if ((odev->state.conn >= C_SYNC_SOURCE &&
		     odev->state.conn <= C_PAUSED_SYNC_T) ||
		    odev->state.aftr_isp || odev->state.peer_isp ||
		    odev->state.user_isp)
			return 0;
	}
}

/**
 * _drbd_pause_after() - Pause resync on all devices that may not resync now
 * @device:	DRBD device.
 *
 * Called from process context only (admin command and after_state_ch).
 */
static int _drbd_pause_after(struct drbd_device *device)
{
	struct drbd_device *odev;
	int i, rv = 0;

	rcu_read_lock();
	idr_for_each_entry(&drbd_devices, odev, i) {
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (!_drbd_may_sync_now(odev))
			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
			       != SS_NOTHING_TO_DO);
	}
	rcu_read_unlock();

	return rv;
}

/**
 * _drbd_resume_next() - Resume resync on all devices that may resync now
 * @device:	DRBD device.
 *
 * Called from process context only (admin command and worker).
 */
static int _drbd_resume_next(struct drbd_device *device)
{
	struct drbd_device *odev;
	int i, rv = 0;

	rcu_read_lock();
	idr_for_each_entry(&drbd_devices, odev, i) {
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (odev->state.aftr_isp) {
			if (_drbd_may_sync_now(odev))
				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
							CS_HARD, NULL)
				       != SS_NOTHING_TO_DO) ;
		}
	}
	rcu_read_unlock();
	return rv;
}

void resume_next_sg(struct drbd_device *device)
{
	write_lock_irq(&global_state_lock);
	_drbd_resume_next(device);
	write_unlock_irq(&global_state_lock);
}

void suspend_other_sg(struct drbd_device *device)
{
	write_lock_irq(&global_state_lock);
	_drbd_pause_after(device);
	write_unlock_irq(&global_state_lock);
}

/* caller must hold global_state_lock */
enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
{
	struct drbd_device *odev;
	int resync_after;

	if (o_minor == -1)
		return NO_ERROR;
	if (o_minor < -1 || o_minor > MINORMASK)
		return ERR_RESYNC_AFTER;

	/* check for loops */
	odev = minor_to_device(o_minor);
	while (1) {
		if (odev == device)
			return ERR_RESYNC_AFTER_CYCLE;

		/* You are free to depend on diskless, non-existing,
		 * or not yet/no longer existing minors.
		 * We only reject dependency loops.
		 * We cannot follow the dependency chain beyond a detached or
		 * missing minor.
		 */
		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
			return NO_ERROR;

		rcu_read_lock();
		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
		rcu_read_unlock();
		/* dependency chain ends here, no cycles. */
		if (resync_after == -1)
			return NO_ERROR;

		/* follow the dependency chain */
		odev = minor_to_device(resync_after);
	}
}

/* caller must hold global_state_lock */
void drbd_resync_after_changed(struct drbd_device *device)
{
	int changes;

	do {
		changes  = _drbd_pause_after(device);
		changes |= _drbd_resume_next(device);
	} while (changes);
}

void drbd_rs_controller_reset(struct drbd_device *device)
{
	struct fifo_buffer *plan;

	atomic_set(&device->rs_sect_in, 0);
	atomic_set(&device->rs_sect_ev, 0);
	device->rs_in_flight = 0;

	/* Updating the RCU protected object in place is necessary since
	   this function gets called from atomic context.
	   It is valid since all other updates also lead to a completely
	   empty fifo */
	rcu_read_lock();
	plan = rcu_dereference(device->rs_plan_s);
	plan->total = 0;
	fifo_set(plan, 0);
	rcu_read_unlock();
}

void start_resync_timer_fn(unsigned long data)
{
	struct drbd_device *device = (struct drbd_device *) data;
	drbd_device_post_work(device, RS_START);
}

static void do_start_resync(struct drbd_device *device)
{
	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
		drbd_warn(device, "postponing start_resync ...\n");
		device->start_resync_timer.expires = jiffies + HZ/10;
		add_timer(&device->start_resync_timer);
		return;
	}

	drbd_start_resync(device, C_SYNC_SOURCE);
	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
}

static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
{
	bool csums_after_crash_only;
	rcu_read_lock();
	csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
	rcu_read_unlock();
	return connection->agreed_pro_version >= 89 &&		/* supported? */
		connection->csums_tfm &&			/* configured? */
		(csums_after_crash_only == 0			/* use for each resync? */
		 || test_bit(CRASHED_PRIMARY, &device->flags));	/* or only after Primary crash? */
}

/**
 * drbd_start_resync() - Start the resync process
 * @device:	DRBD device.
 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
 *
 * This function might bring you directly into one of the
 * C_PAUSED_SYNC_* states.
 */
void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
{
	struct drbd_peer_device *peer_device = first_peer_device(device);
	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
	union drbd_state ns;
	int r;

	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
		drbd_err(device, "Resync already running!\n");
		return;
	}

	if (!test_bit(B_RS_H_DONE, &device->flags)) {
		if (side == C_SYNC_TARGET) {
			/* Since application IO was locked out during C_WF_BITMAP_T and
			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
			   we check that we might make the data inconsistent. */
			r = drbd_khelper(device, "before-resync-target");
			r = (r >> 8) & 0xff;
			if (r > 0) {
				drbd_info(device, "before-resync-target handler returned %d, "
					 "dropping connection.\n", r);
				conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
				return;
			}
		} else /* C_SYNC_SOURCE */ {
			r = drbd_khelper(device, "before-resync-source");
			r = (r >> 8) & 0xff;
			if (r > 0) {
				if (r == 3) {
					drbd_info(device, "before-resync-source handler returned %d, "
						 "ignoring. Old userland tools?", r);
				} else {
					drbd_info(device, "before-resync-source handler returned %d, "
						 "dropping connection.\n", r);
					conn_request_state(connection,
							   NS(conn, C_DISCONNECTING), CS_HARD);
					return;
				}
			}
		}
	}

	if (current == connection->worker.task) {
		/* The worker should not sleep waiting for state_mutex,
		   that can take long */
		if (!mutex_trylock(device->state_mutex)) {
			set_bit(B_RS_H_DONE, &device->flags);
			device->start_resync_timer.expires = jiffies + HZ/5;
			add_timer(&device->start_resync_timer);
			return;
		}
	} else {
		mutex_lock(device->state_mutex);
	}
	clear_bit(B_RS_H_DONE, &device->flags);

	/* req_lock: serialize with drbd_send_and_submit() and others
	 * global_state_lock: for stable sync-after dependencies */
	spin_lock_irq(&device->resource->req_lock);
	write_lock(&global_state_lock);
	/* Did some connection breakage or IO error race with us? */
	if (device->state.conn < C_CONNECTED
	|| !get_ldev_if_state(device, D_NEGOTIATING)) {
		write_unlock(&global_state_lock);
		spin_unlock_irq(&device->resource->req_lock);
		mutex_unlock(device->state_mutex);
		return;
	}

	ns = drbd_read_state(device);

	ns.aftr_isp = !_drbd_may_sync_now(device);

	ns.conn = side;

	if (side == C_SYNC_TARGET)
		ns.disk = D_INCONSISTENT;
	else /* side == C_SYNC_SOURCE */
		ns.pdsk = D_INCONSISTENT;

	r = __drbd_set_state(device, ns, CS_VERBOSE, NULL);
	ns = drbd_read_state(device);

	if (ns.conn < C_CONNECTED)
		r = SS_UNKNOWN_ERROR;

	if (r == SS_SUCCESS) {
		unsigned long tw = drbd_bm_total_weight(device);
		unsigned long now = jiffies;
		int i;

		device->rs_failed    = 0;
		device->rs_paused    = 0;
		device->rs_same_csum = 0;
		device->rs_last_events = 0;
		device->rs_last_sect_ev = 0;
		device->rs_total     = tw;
		device->rs_start     = now;
		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
			device->rs_mark_left[i] = tw;
			device->rs_mark_time[i] = now;
		}
		_drbd_pause_after(device);
		/* Forget potentially stale cached per resync extent bit-counts.
		 * Open coded drbd_rs_cancel_all(device), we already have IRQs
		 * disabled, and know the disk state is ok. */
		spin_lock(&device->al_lock);
		lc_reset(device->resync);
		device->resync_locked = 0;
		device->resync_wenr = LC_FREE;
		spin_unlock(&device->al_lock);
	}
	write_unlock(&global_state_lock);
	spin_unlock_irq(&device->resource->req_lock);

	if (r == SS_SUCCESS) {
		wake_up(&device->al_wait); /* for lc_reset() above */
		/* reset rs_last_bcast when a resync or verify is started,
		 * to deal with potential jiffies wrap. */
		device->rs_last_bcast = jiffies - HZ;

		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
		     drbd_conn_str(ns.conn),
		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
		     (unsigned long) device->rs_total);
		if (side == C_SYNC_TARGET) {
			device->bm_resync_fo = 0;
			device->use_csums = use_checksum_based_resync(connection, device);
		} else {
			device->use_csums = 0;
		}

		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
		 * with w_send_oos, or the sync target will get confused as to
                drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
                     drbd_conn_str(ns.conn),
                     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
                     (unsigned long) device->rs_total);
                if (side == C_SYNC_TARGET) {
                        device->bm_resync_fo = 0;
                        device->use_csums = use_checksum_based_resync(connection, device);
                } else {
                        device->use_csums = 0;
                }

                /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
                 * with w_send_oos, or the sync target will get confused as to
                 * how many bits to resync.  We cannot do that always, because for an
                 * empty resync and protocol < 95, we need to do it here,
                 * as we call drbd_resync_finished from here in that case.
                 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
                 * and from after_state_ch otherwise. */
                if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
                        drbd_gen_and_send_sync_uuid(peer_device);

                if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
                        /* This still has a race (about when exactly the peers
                         * detect connection loss) that can lead to a full sync
                         * on next handshake. In 8.3.9 we fixed this with explicit
                         * resync-finished notifications, but the fix
                         * introduces a protocol change.  Sleeping for some
                         * time longer than the ping interval + timeout on the
                         * SyncSource, to give the SyncTarget the chance to
                         * detect connection loss, then waiting for a ping
                         * response (implicit in drbd_resync_finished) reduces
                         * the race considerably, but does not solve it. */
                        if (side == C_SYNC_SOURCE) {
                                struct net_conf *nc;
                                int timeo;

                                rcu_read_lock();
                                nc = rcu_dereference(connection->net_conf);
                                timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
                                rcu_read_unlock();
                                schedule_timeout_interruptible(timeo);
                        }
                        drbd_resync_finished(device);
                }

                drbd_rs_controller_reset(device);
                /* ns.conn may already be != device->state.conn,
                 * we may have been paused in between, or become paused until
                 * the timer triggers.
                 * No matter, that is handled in resync_timer_fn() */
                if (ns.conn == C_SYNC_TARGET)
                        mod_timer(&device->resync_timer, jiffies);

                drbd_md_sync(device);
        }
        put_ldev(device);
        mutex_unlock(device->state_mutex);
}

static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
{
        struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
        device->rs_last_bcast = jiffies;

        if (!get_ldev(device))
                return;

        drbd_bm_write_lazy(device, 0);
        if (resync_done && is_sync_state(device->state.conn))
                drbd_resync_finished(device);

        drbd_bcast_event(device, &sib);
        /* update timestamp, in case it took a while to write out stuff */
        device->rs_last_bcast = jiffies;
        put_ldev(device);
}

static void drbd_ldev_destroy(struct drbd_device *device)
{
        lc_destroy(device->resync);
        device->resync = NULL;
        lc_destroy(device->act_log);
        device->act_log = NULL;
        __no_warn(local,
                drbd_free_ldev(device->ldev);
                device->ldev = NULL;);
        clear_bit(GOING_DISKLESS, &device->flags);
        wake_up(&device->misc_wait);
}
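
/* Called from the worker (do_device_work) once the disk state has reached
 * D_FAILED: try to flush out dirty bitmap pages, then force the transition
 * to D_DISKLESS. */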
static void go_diskless(struct drbd_device *device)
{
        D_ASSERT(device, device->state.disk == D_FAILED);
        /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
         * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
         * the protected members anymore, though, so once put_ldev reaches zero
         * again, it will be safe to free them. */

        /* Try to write changed bitmap pages, read errors may have just
         * set some bits outside the area covered by the activity log.
         *
         * If we have an IO error during the bitmap writeout,
         * we will want a full sync next time, just in case.
         * (Do we want a specific meta data flag for this?)
         *
         * If that does not make it to stable storage either,
         * we cannot do anything about that anymore.
         *
         * We still need to check if both bitmap and ldev are present, we may
         * end up here after a failed attach, before ldev was even assigned.
         */
        if (device->bitmap && device->ldev) {
                /* An interrupted resync or similar is allowed to re-count bits
                 * while we detach.
                 * Any modifications would not be expected anymore, though.
                 */
                if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
                                        "detach", BM_LOCKED_TEST_ALLOWED)) {
                        if (test_bit(WAS_READ_ERROR, &device->flags)) {
                                drbd_md_set_flag(device, MDF_FULL_SYNC);
                                drbd_md_sync(device);
                        }
                }
        }

        drbd_force_state(device, NS(disk, D_DISKLESS));
}

static int do_md_sync(struct drbd_device *device)
{
        drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
        drbd_md_sync(device);
        return 0;
}

/* only called from drbd_worker thread, no locking */
void __update_timing_details(
                struct drbd_thread_timing_details *tdp,
                unsigned int *cb_nr,
                void *cb,
                const char *fn, const unsigned int line)
{
        unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
        struct drbd_thread_timing_details *td = tdp + i;

        td->start_jif = jiffies;
        td->cb_addr = cb;
        td->caller_fn = fn;
        td->line = line;
        td->cb_nr = *cb_nr;

        i = (i+1) % DRBD_THREAD_DETAILS_HIST;
        td = tdp + i;
        memset(td, 0, sizeof(*td));

        ++(*cb_nr);
}

#define WORK_PENDING(work_bit, todo)    (todo & (1UL << work_bit))
static void do_device_work(struct drbd_device *device, const unsigned long todo)
{
        if (WORK_PENDING(MD_SYNC, todo))
                do_md_sync(device);
        if (WORK_PENDING(RS_DONE, todo) ||
            WORK_PENDING(RS_PROGRESS, todo))
                update_on_disk_bitmap(device, WORK_PENDING(RS_DONE, todo));
        if (WORK_PENDING(GO_DISKLESS, todo))
                go_diskless(device);
        if (WORK_PENDING(DESTROY_DISK, todo))
                drbd_ldev_destroy(device);
        if (WORK_PENDING(RS_START, todo))
                do_start_resync(device);
}

#define DRBD_DEVICE_WORK_MASK   \
        ((1UL << GO_DISKLESS)   \
        |(1UL << DESTROY_DISK)  \
        |(1UL << MD_SYNC)       \
        |(1UL << RS_START)      \
        |(1UL << RS_PROGRESS)   \
        |(1UL << RS_DONE)       \
        )

static unsigned long get_work_bits(unsigned long *flags)
{
        unsigned long old, new;
        do {
                old = *flags;
                new = old & ~DRBD_DEVICE_WORK_MASK;
        } while (cmpxchg(flags, old, new) != old);
        return old & DRBD_DEVICE_WORK_MASK;
}

static void do_unqueued_work(struct drbd_connection *connection)
{
        struct drbd_peer_device *peer_device;
        int vnr;

        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;
                unsigned long todo = get_work_bits(&device->flags);
                if (!todo)
                        continue;

                kref_get(&device->kref);
                rcu_read_unlock();
                do_device_work(device, todo);
                kref_put(&device->kref, drbd_destroy_device);
                rcu_read_lock();
        }
        rcu_read_unlock();
}

static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
{
        spin_lock_irq(&queue->q_lock);
        list_splice_tail_init(&queue->q, work_list);
        spin_unlock_irq(&queue->q_lock);
        return !list_empty(work_list);
}
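
/* Like dequeue_work_batch() above, but move at most one entry off the queue;
 * returns true if work_list is non-empty afterwards. */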
static bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
{
        spin_lock_irq(&queue->q_lock);
        if (!list_empty(&queue->q))
                list_move(queue->q.next, work_list);
        spin_unlock_irq(&queue->q_lock);
        return !list_empty(work_list);
}

static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
{
        DEFINE_WAIT(wait);
        struct net_conf *nc;
        int uncork, cork;

        dequeue_work_item(&connection->sender_work, work_list);
        if (!list_empty(work_list))
                return;

        /* Still nothing to do?
         * Maybe we still need to close the current epoch,
         * even if no new requests are queued yet.
         *
         * Also, poke TCP, just in case.
         * Then wait for new work (or signal). */
        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        uncork = nc ? nc->tcp_cork : 0;
        rcu_read_unlock();
        if (uncork) {
                mutex_lock(&connection->data.mutex);
                if (connection->data.socket)
                        drbd_tcp_uncork(connection->data.socket);
                mutex_unlock(&connection->data.mutex);
        }

        for (;;) {
                int send_barrier;
                prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
                spin_lock_irq(&connection->resource->req_lock);
                spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
                /* dequeue single item only,
                 * we still use drbd_queue_work_front() in some places */
                if (!list_empty(&connection->sender_work.q))
                        list_splice_tail_init(&connection->sender_work.q, work_list);
                spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
                if (!list_empty(work_list) || signal_pending(current)) {
                        spin_unlock_irq(&connection->resource->req_lock);
                        break;
                }

                /* We found nothing new to do, no to-be-communicated request,
                 * no other work item.  We may still need to close the last
                 * epoch.  The next incoming request's epoch will be the
                 * connection's current transfer log epoch number.  If that is
                 * different from the epoch of the last request we communicated,
                 * it is safe to send the epoch separating barrier now.
                 */
                send_barrier =
                        atomic_read(&connection->current_tle_nr) !=
                        connection->send.current_epoch_nr;
                spin_unlock_irq(&connection->resource->req_lock);

                if (send_barrier)
                        maybe_send_barrier(connection,
                                        connection->send.current_epoch_nr + 1);

                if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
                        break;

                /* drbd_send() may have called flush_signals() */
                if (get_t_state(&connection->worker) != RUNNING)
                        break;

                schedule();
                /* may be woken up for other things than new work, too,
                 * e.g. if the current epoch got closed.
                 * In which case we send the barrier above. */
        }
        finish_wait(&connection->sender_work.q_wait, &wait);

        /* someone may have changed the config while we have been waiting above. */
        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        cork = nc ? nc->tcp_cork : 0;
        rcu_read_unlock();
        mutex_lock(&connection->data.mutex);
        if (connection->data.socket) {
                if (cork)
                        drbd_tcp_cork(connection->data.socket);
                else if (!uncork)
                        drbd_tcp_uncork(connection->data.socket);
        }
        mutex_unlock(&connection->data.mutex);
}
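
/* Main loop of the per-connection worker thread: wait for queued sender work
 * and per-device work bits, run the callbacks, and escalate to
 * C_NETWORK_FAILURE if a callback fails while we are connected.  Once asked
 * to stop, drain whatever is still queued before cleaning up the devices. */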
int drbd_worker(struct drbd_thread *thi)
{
        struct drbd_connection *connection = thi->connection;
        struct drbd_work *w = NULL;
        struct drbd_peer_device *peer_device;
        LIST_HEAD(work_list);
        int vnr;

        while (get_t_state(thi) == RUNNING) {
                drbd_thread_current_set_cpu(thi);

                if (list_empty(&work_list)) {
                        update_worker_timing_details(connection, wait_for_work);
                        wait_for_work(connection, &work_list);
                }

                if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
                        update_worker_timing_details(connection, do_unqueued_work);
                        do_unqueued_work(connection);
                }

                if (signal_pending(current)) {
                        flush_signals(current);
                        if (get_t_state(thi) == RUNNING) {
                                drbd_warn(connection, "Worker got an unexpected signal\n");
                                continue;
                        }
                        break;
                }

                if (get_t_state(thi) != RUNNING)
                        break;

                while (!list_empty(&work_list)) {
                        w = list_first_entry(&work_list, struct drbd_work, list);
                        list_del_init(&w->list);
                        update_worker_timing_details(connection, w->cb);
                        if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
                                continue;
                        if (connection->cstate >= C_WF_REPORT_PARAMS)
                                conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
                }
        }

        do {
                if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
                        update_worker_timing_details(connection, do_unqueued_work);
                        do_unqueued_work(connection);
                }
                while (!list_empty(&work_list)) {
                        w = list_first_entry(&work_list, struct drbd_work, list);
                        list_del_init(&w->list);
                        update_worker_timing_details(connection, w->cb);
                        w->cb(w, 1);
                }
                dequeue_work_batch(&connection->sender_work, &work_list);
        } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));

        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;
                D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
                kref_get(&device->kref);
                rcu_read_unlock();
                drbd_device_cleanup(device);
                kref_put(&device->kref, drbd_destroy_device);
                rcu_read_lock();
        }
        rcu_read_unlock();

        return 0;
}