1 /* 2 drbd_worker.c 3 4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg. 5 6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH. 7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>. 8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>. 9 10 drbd is free software; you can redistribute it and/or modify 11 it under the terms of the GNU General Public License as published by 12 the Free Software Foundation; either version 2, or (at your option) 13 any later version. 14 15 drbd is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU General Public License for more details. 19 20 You should have received a copy of the GNU General Public License 21 along with drbd; see the file COPYING. If not, write to 22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 23 24 */ 25 26 #include <linux/module.h> 27 #include <linux/drbd.h> 28 #include <linux/sched.h> 29 #include <linux/wait.h> 30 #include <linux/mm.h> 31 #include <linux/memcontrol.h> 32 #include <linux/mm_inline.h> 33 #include <linux/slab.h> 34 #include <linux/random.h> 35 #include <linux/string.h> 36 #include <linux/scatterlist.h> 37 38 #include "drbd_int.h" 39 #include "drbd_protocol.h" 40 #include "drbd_req.h" 41 42 static int make_ov_request(struct drbd_device *, int); 43 static int make_resync_request(struct drbd_device *, int); 44 45 /* endio handlers: 46 * drbd_md_endio (defined here) 47 * drbd_request_endio (defined here) 48 * drbd_peer_request_endio (defined here) 49 * drbd_bm_endio (defined in drbd_bitmap.c) 50 * 51 * For all these callbacks, note the following: 52 * The callbacks will be called in irq context by the IDE drivers, 53 * and in Softirqs/Tasklets/BH context by the SCSI drivers. 54 * Try to get the locking right :) 55 * 56 */ 57 58 /* used for synchronous meta data and bitmap IO 59 * submitted by drbd_md_sync_page_io() 60 */ 61 void drbd_md_endio(struct bio *bio) 62 { 63 struct drbd_device *device; 64 65 device = bio->bi_private; 66 device->md_io.error = bio->bi_error; 67 68 /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able 69 * to timeout on the lower level device, and eventually detach from it. 70 * If this io completion runs after that timeout expired, this 71 * drbd_md_put_buffer() may allow us to finally try and re-attach. 72 * During normal operation, this only puts that extra reference 73 * down to 1 again. 74 * Make sure we first drop the reference, and only then signal 75 * completion, or we may (in drbd_al_read_log()) cycle so fast into the 76 * next drbd_md_sync_page_io(), that we trigger the 77 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there. 78 */ 79 drbd_md_put_buffer(device); 80 device->md_io.done = 1; 81 wake_up(&device->misc_wait); 82 bio_put(bio); 83 if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */ 84 put_ldev(device); 85 } 86 87 /* reads on behalf of the partner, 88 * "submitted" by the receiver 89 */ 90 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local) 91 { 92 unsigned long flags = 0; 93 struct drbd_peer_device *peer_device = peer_req->peer_device; 94 struct drbd_device *device = peer_device->device; 95 96 spin_lock_irqsave(&device->resource->req_lock, flags); 97 device->read_cnt += peer_req->i.size >> 9; 98 list_del(&peer_req->w.list); 99 if (list_empty(&device->read_ee)) 100 wake_up(&device->ee_wait); 101 if (test_bit(__EE_WAS_ERROR, &peer_req->flags)) 102 __drbd_chk_io_error(device, DRBD_READ_ERROR); 103 spin_unlock_irqrestore(&device->resource->req_lock, flags); 104 105 drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w); 106 put_ldev(device); 107 } 108 109 /* writes on behalf of the partner, or resync writes, 110 * "submitted" by the receiver, final stage. */ 111 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local) 112 { 113 unsigned long flags = 0; 114 struct drbd_peer_device *peer_device = peer_req->peer_device; 115 struct drbd_device *device = peer_device->device; 116 struct drbd_connection *connection = peer_device->connection; 117 struct drbd_interval i; 118 int do_wake; 119 u64 block_id; 120 int do_al_complete_io; 121 122 /* after we moved peer_req to done_ee, 123 * we may no longer access it, 124 * it may be freed/reused already! 125 * (as soon as we release the req_lock) */ 126 i = peer_req->i; 127 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO; 128 block_id = peer_req->block_id; 129 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO; 130 131 spin_lock_irqsave(&device->resource->req_lock, flags); 132 device->writ_cnt += peer_req->i.size >> 9; 133 list_move_tail(&peer_req->w.list, &device->done_ee); 134 135 /* 136 * Do not remove from the write_requests tree here: we did not send the 137 * Ack yet and did not wake possibly waiting conflicting requests. 138 * Removed from the tree from "drbd_process_done_ee" within the 139 * appropriate dw.cb (e_end_block/e_end_resync_block) or from 140 * _drbd_clear_done_ee. 141 */ 142 143 do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee); 144 145 /* FIXME do we want to detach for failed REQ_DISCARD? 146 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */ 147 if (peer_req->flags & EE_WAS_ERROR) 148 __drbd_chk_io_error(device, DRBD_WRITE_ERROR); 149 150 if (connection->cstate >= C_WF_REPORT_PARAMS) { 151 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */ 152 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work)) 153 kref_put(&device->kref, drbd_destroy_device); 154 } 155 spin_unlock_irqrestore(&device->resource->req_lock, flags); 156 157 if (block_id == ID_SYNCER) 158 drbd_rs_complete_io(device, i.sector); 159 160 if (do_wake) 161 wake_up(&device->ee_wait); 162 163 if (do_al_complete_io) 164 drbd_al_complete_io(device, &i); 165 166 put_ldev(device); 167 } 168 169 /* writes on behalf of the partner, or resync writes, 170 * "submitted" by the receiver. 171 */ 172 void drbd_peer_request_endio(struct bio *bio) 173 { 174 struct drbd_peer_request *peer_req = bio->bi_private; 175 struct drbd_device *device = peer_req->peer_device->device; 176 int is_write = bio_data_dir(bio) == WRITE; 177 int is_discard = !!(bio->bi_rw & REQ_DISCARD); 178 179 if (bio->bi_error && __ratelimit(&drbd_ratelimit_state)) 180 drbd_warn(device, "%s: error=%d s=%llus\n", 181 is_write ? (is_discard ? "discard" : "write") 182 : "read", bio->bi_error, 183 (unsigned long long)peer_req->i.sector); 184 185 if (bio->bi_error) 186 set_bit(__EE_WAS_ERROR, &peer_req->flags); 187 188 bio_put(bio); /* no need for the bio anymore */ 189 if (atomic_dec_and_test(&peer_req->pending_bios)) { 190 if (is_write) 191 drbd_endio_write_sec_final(peer_req); 192 else 193 drbd_endio_read_sec_final(peer_req); 194 } 195 } 196 197 void drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device) 198 { 199 panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n", 200 device->minor, device->resource->name, device->vnr); 201 } 202 203 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request 204 */ 205 void drbd_request_endio(struct bio *bio) 206 { 207 unsigned long flags; 208 struct drbd_request *req = bio->bi_private; 209 struct drbd_device *device = req->device; 210 struct bio_and_error m; 211 enum drbd_req_event what; 212 213 /* If this request was aborted locally before, 214 * but now was completed "successfully", 215 * chances are that this caused arbitrary data corruption. 216 * 217 * "aborting" requests, or force-detaching the disk, is intended for 218 * completely blocked/hung local backing devices which do no longer 219 * complete requests at all, not even do error completions. In this 220 * situation, usually a hard-reset and failover is the only way out. 221 * 222 * By "aborting", basically faking a local error-completion, 223 * we allow for a more graceful swichover by cleanly migrating services. 224 * Still the affected node has to be rebooted "soon". 225 * 226 * By completing these requests, we allow the upper layers to re-use 227 * the associated data pages. 228 * 229 * If later the local backing device "recovers", and now DMAs some data 230 * from disk into the original request pages, in the best case it will 231 * just put random data into unused pages; but typically it will corrupt 232 * meanwhile completely unrelated data, causing all sorts of damage. 233 * 234 * Which means delayed successful completion, 235 * especially for READ requests, 236 * is a reason to panic(). 237 * 238 * We assume that a delayed *error* completion is OK, 239 * though we still will complain noisily about it. 240 */ 241 if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) { 242 if (__ratelimit(&drbd_ratelimit_state)) 243 drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n"); 244 245 if (!bio->bi_error) 246 drbd_panic_after_delayed_completion_of_aborted_request(device); 247 } 248 249 /* to avoid recursion in __req_mod */ 250 if (unlikely(bio->bi_error)) { 251 if (bio->bi_rw & REQ_DISCARD) 252 what = (bio->bi_error == -EOPNOTSUPP) 253 ? DISCARD_COMPLETED_NOTSUPP 254 : DISCARD_COMPLETED_WITH_ERROR; 255 else 256 what = (bio_data_dir(bio) == WRITE) 257 ? WRITE_COMPLETED_WITH_ERROR 258 : (bio_rw(bio) == READ) 259 ? READ_COMPLETED_WITH_ERROR 260 : READ_AHEAD_COMPLETED_WITH_ERROR; 261 } else 262 what = COMPLETED_OK; 263 264 bio_put(req->private_bio); 265 req->private_bio = ERR_PTR(bio->bi_error); 266 267 /* not req_mod(), we need irqsave here! */ 268 spin_lock_irqsave(&device->resource->req_lock, flags); 269 __req_mod(req, what, &m); 270 spin_unlock_irqrestore(&device->resource->req_lock, flags); 271 put_ldev(device); 272 273 if (m.bio) 274 complete_master_bio(device, &m); 275 } 276 277 void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest) 278 { 279 struct hash_desc desc; 280 struct scatterlist sg; 281 struct page *page = peer_req->pages; 282 struct page *tmp; 283 unsigned len; 284 285 desc.tfm = tfm; 286 desc.flags = 0; 287 288 sg_init_table(&sg, 1); 289 crypto_hash_init(&desc); 290 291 while ((tmp = page_chain_next(page))) { 292 /* all but the last page will be fully used */ 293 sg_set_page(&sg, page, PAGE_SIZE, 0); 294 crypto_hash_update(&desc, &sg, sg.length); 295 page = tmp; 296 } 297 /* and now the last, possibly only partially used page */ 298 len = peer_req->i.size & (PAGE_SIZE - 1); 299 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0); 300 crypto_hash_update(&desc, &sg, sg.length); 301 crypto_hash_final(&desc, digest); 302 } 303 304 void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest) 305 { 306 struct hash_desc desc; 307 struct scatterlist sg; 308 struct bio_vec bvec; 309 struct bvec_iter iter; 310 311 desc.tfm = tfm; 312 desc.flags = 0; 313 314 sg_init_table(&sg, 1); 315 crypto_hash_init(&desc); 316 317 bio_for_each_segment(bvec, bio, iter) { 318 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset); 319 crypto_hash_update(&desc, &sg, sg.length); 320 } 321 crypto_hash_final(&desc, digest); 322 } 323 324 /* MAYBE merge common code with w_e_end_ov_req */ 325 static int w_e_send_csum(struct drbd_work *w, int cancel) 326 { 327 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 328 struct drbd_peer_device *peer_device = peer_req->peer_device; 329 struct drbd_device *device = peer_device->device; 330 int digest_size; 331 void *digest; 332 int err = 0; 333 334 if (unlikely(cancel)) 335 goto out; 336 337 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0)) 338 goto out; 339 340 digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm); 341 digest = kmalloc(digest_size, GFP_NOIO); 342 if (digest) { 343 sector_t sector = peer_req->i.sector; 344 unsigned int size = peer_req->i.size; 345 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest); 346 /* Free peer_req and pages before send. 347 * In case we block on congestion, we could otherwise run into 348 * some distributed deadlock, if the other side blocks on 349 * congestion as well, because our receiver blocks in 350 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 351 drbd_free_peer_req(device, peer_req); 352 peer_req = NULL; 353 inc_rs_pending(device); 354 err = drbd_send_drequest_csum(peer_device, sector, size, 355 digest, digest_size, 356 P_CSUM_RS_REQUEST); 357 kfree(digest); 358 } else { 359 drbd_err(device, "kmalloc() of digest failed.\n"); 360 err = -ENOMEM; 361 } 362 363 out: 364 if (peer_req) 365 drbd_free_peer_req(device, peer_req); 366 367 if (unlikely(err)) 368 drbd_err(device, "drbd_send_drequest(..., csum) failed\n"); 369 return err; 370 } 371 372 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) 373 374 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size) 375 { 376 struct drbd_device *device = peer_device->device; 377 struct drbd_peer_request *peer_req; 378 379 if (!get_ldev(device)) 380 return -EIO; 381 382 /* GFP_TRY, because if there is no memory available right now, this may 383 * be rescheduled for later. It is "only" background resync, after all. */ 384 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector, 385 size, true /* has real payload */, GFP_TRY); 386 if (!peer_req) 387 goto defer; 388 389 peer_req->w.cb = w_e_send_csum; 390 spin_lock_irq(&device->resource->req_lock); 391 list_add_tail(&peer_req->w.list, &device->read_ee); 392 spin_unlock_irq(&device->resource->req_lock); 393 394 atomic_add(size >> 9, &device->rs_sect_ev); 395 if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0) 396 return 0; 397 398 /* If it failed because of ENOMEM, retry should help. If it failed 399 * because bio_add_page failed (probably broken lower level driver), 400 * retry may or may not help. 401 * If it does not, you may need to force disconnect. */ 402 spin_lock_irq(&device->resource->req_lock); 403 list_del(&peer_req->w.list); 404 spin_unlock_irq(&device->resource->req_lock); 405 406 drbd_free_peer_req(device, peer_req); 407 defer: 408 put_ldev(device); 409 return -EAGAIN; 410 } 411 412 int w_resync_timer(struct drbd_work *w, int cancel) 413 { 414 struct drbd_device *device = 415 container_of(w, struct drbd_device, resync_work); 416 417 switch (device->state.conn) { 418 case C_VERIFY_S: 419 make_ov_request(device, cancel); 420 break; 421 case C_SYNC_TARGET: 422 make_resync_request(device, cancel); 423 break; 424 } 425 426 return 0; 427 } 428 429 void resync_timer_fn(unsigned long data) 430 { 431 struct drbd_device *device = (struct drbd_device *) data; 432 433 drbd_queue_work_if_unqueued( 434 &first_peer_device(device)->connection->sender_work, 435 &device->resync_work); 436 } 437 438 static void fifo_set(struct fifo_buffer *fb, int value) 439 { 440 int i; 441 442 for (i = 0; i < fb->size; i++) 443 fb->values[i] = value; 444 } 445 446 static int fifo_push(struct fifo_buffer *fb, int value) 447 { 448 int ov; 449 450 ov = fb->values[fb->head_index]; 451 fb->values[fb->head_index++] = value; 452 453 if (fb->head_index >= fb->size) 454 fb->head_index = 0; 455 456 return ov; 457 } 458 459 static void fifo_add_val(struct fifo_buffer *fb, int value) 460 { 461 int i; 462 463 for (i = 0; i < fb->size; i++) 464 fb->values[i] += value; 465 } 466 467 struct fifo_buffer *fifo_alloc(int fifo_size) 468 { 469 struct fifo_buffer *fb; 470 471 fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO); 472 if (!fb) 473 return NULL; 474 475 fb->head_index = 0; 476 fb->size = fifo_size; 477 fb->total = 0; 478 479 return fb; 480 } 481 482 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in) 483 { 484 struct disk_conf *dc; 485 unsigned int want; /* The number of sectors we want in-flight */ 486 int req_sect; /* Number of sectors to request in this turn */ 487 int correction; /* Number of sectors more we need in-flight */ 488 int cps; /* correction per invocation of drbd_rs_controller() */ 489 int steps; /* Number of time steps to plan ahead */ 490 int curr_corr; 491 int max_sect; 492 struct fifo_buffer *plan; 493 494 dc = rcu_dereference(device->ldev->disk_conf); 495 plan = rcu_dereference(device->rs_plan_s); 496 497 steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */ 498 499 if (device->rs_in_flight + sect_in == 0) { /* At start of resync */ 500 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps; 501 } else { /* normal path */ 502 want = dc->c_fill_target ? dc->c_fill_target : 503 sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10); 504 } 505 506 correction = want - device->rs_in_flight - plan->total; 507 508 /* Plan ahead */ 509 cps = correction / steps; 510 fifo_add_val(plan, cps); 511 plan->total += cps * steps; 512 513 /* What we do in this step */ 514 curr_corr = fifo_push(plan, 0); 515 plan->total -= curr_corr; 516 517 req_sect = sect_in + curr_corr; 518 if (req_sect < 0) 519 req_sect = 0; 520 521 max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ; 522 if (req_sect > max_sect) 523 req_sect = max_sect; 524 525 /* 526 drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n", 527 sect_in, device->rs_in_flight, want, correction, 528 steps, cps, device->rs_planed, curr_corr, req_sect); 529 */ 530 531 return req_sect; 532 } 533 534 static int drbd_rs_number_requests(struct drbd_device *device) 535 { 536 unsigned int sect_in; /* Number of sectors that came in since the last turn */ 537 int number, mxb; 538 539 sect_in = atomic_xchg(&device->rs_sect_in, 0); 540 device->rs_in_flight -= sect_in; 541 542 rcu_read_lock(); 543 mxb = drbd_get_max_buffers(device) / 2; 544 if (rcu_dereference(device->rs_plan_s)->size) { 545 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9); 546 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME; 547 } else { 548 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate; 549 number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ); 550 } 551 rcu_read_unlock(); 552 553 /* Don't have more than "max-buffers"/2 in-flight. 554 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(), 555 * potentially causing a distributed deadlock on congestion during 556 * online-verify or (checksum-based) resync, if max-buffers, 557 * socket buffer sizes and resync rate settings are mis-configured. */ 558 559 /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k), 560 * mxb (as used here, and in drbd_alloc_pages on the peer) is 561 * "number of pages" (typically also 4k), 562 * but "rs_in_flight" is in "sectors" (512 Byte). */ 563 if (mxb - device->rs_in_flight/8 < number) 564 number = mxb - device->rs_in_flight/8; 565 566 return number; 567 } 568 569 static int make_resync_request(struct drbd_device *const device, int cancel) 570 { 571 struct drbd_peer_device *const peer_device = first_peer_device(device); 572 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL; 573 unsigned long bit; 574 sector_t sector; 575 const sector_t capacity = drbd_get_capacity(device->this_bdev); 576 int max_bio_size; 577 int number, rollback_i, size; 578 int align, requeue = 0; 579 int i = 0; 580 581 if (unlikely(cancel)) 582 return 0; 583 584 if (device->rs_total == 0) { 585 /* empty resync? */ 586 drbd_resync_finished(device); 587 return 0; 588 } 589 590 if (!get_ldev(device)) { 591 /* Since we only need to access device->rsync a 592 get_ldev_if_state(device,D_FAILED) would be sufficient, but 593 to continue resync with a broken disk makes no sense at 594 all */ 595 drbd_err(device, "Disk broke down during resync!\n"); 596 return 0; 597 } 598 599 max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9; 600 number = drbd_rs_number_requests(device); 601 if (number <= 0) 602 goto requeue; 603 604 for (i = 0; i < number; i++) { 605 /* Stop generating RS requests when half of the send buffer is filled, 606 * but notify TCP that we'd like to have more space. */ 607 mutex_lock(&connection->data.mutex); 608 if (connection->data.socket) { 609 struct sock *sk = connection->data.socket->sk; 610 int queued = sk->sk_wmem_queued; 611 int sndbuf = sk->sk_sndbuf; 612 if (queued > sndbuf / 2) { 613 requeue = 1; 614 if (sk->sk_socket) 615 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags); 616 } 617 } else 618 requeue = 1; 619 mutex_unlock(&connection->data.mutex); 620 if (requeue) 621 goto requeue; 622 623 next_sector: 624 size = BM_BLOCK_SIZE; 625 bit = drbd_bm_find_next(device, device->bm_resync_fo); 626 627 if (bit == DRBD_END_OF_BITMAP) { 628 device->bm_resync_fo = drbd_bm_bits(device); 629 put_ldev(device); 630 return 0; 631 } 632 633 sector = BM_BIT_TO_SECT(bit); 634 635 if (drbd_try_rs_begin_io(device, sector)) { 636 device->bm_resync_fo = bit; 637 goto requeue; 638 } 639 device->bm_resync_fo = bit + 1; 640 641 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) { 642 drbd_rs_complete_io(device, sector); 643 goto next_sector; 644 } 645 646 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE 647 /* try to find some adjacent bits. 648 * we stop if we have already the maximum req size. 649 * 650 * Additionally always align bigger requests, in order to 651 * be prepared for all stripe sizes of software RAIDs. 652 */ 653 align = 1; 654 rollback_i = i; 655 while (i < number) { 656 if (size + BM_BLOCK_SIZE > max_bio_size) 657 break; 658 659 /* Be always aligned */ 660 if (sector & ((1<<(align+3))-1)) 661 break; 662 663 /* do not cross extent boundaries */ 664 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0) 665 break; 666 /* now, is it actually dirty, after all? 667 * caution, drbd_bm_test_bit is tri-state for some 668 * obscure reason; ( b == 0 ) would get the out-of-band 669 * only accidentally right because of the "oddly sized" 670 * adjustment below */ 671 if (drbd_bm_test_bit(device, bit+1) != 1) 672 break; 673 bit++; 674 size += BM_BLOCK_SIZE; 675 if ((BM_BLOCK_SIZE << align) <= size) 676 align++; 677 i++; 678 } 679 /* if we merged some, 680 * reset the offset to start the next drbd_bm_find_next from */ 681 if (size > BM_BLOCK_SIZE) 682 device->bm_resync_fo = bit + 1; 683 #endif 684 685 /* adjust very last sectors, in case we are oddly sized */ 686 if (sector + (size>>9) > capacity) 687 size = (capacity-sector)<<9; 688 689 if (device->use_csums) { 690 switch (read_for_csum(peer_device, sector, size)) { 691 case -EIO: /* Disk failure */ 692 put_ldev(device); 693 return -EIO; 694 case -EAGAIN: /* allocation failed, or ldev busy */ 695 drbd_rs_complete_io(device, sector); 696 device->bm_resync_fo = BM_SECT_TO_BIT(sector); 697 i = rollback_i; 698 goto requeue; 699 case 0: 700 /* everything ok */ 701 break; 702 default: 703 BUG(); 704 } 705 } else { 706 int err; 707 708 inc_rs_pending(device); 709 err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST, 710 sector, size, ID_SYNCER); 711 if (err) { 712 drbd_err(device, "drbd_send_drequest() failed, aborting...\n"); 713 dec_rs_pending(device); 714 put_ldev(device); 715 return err; 716 } 717 } 718 } 719 720 if (device->bm_resync_fo >= drbd_bm_bits(device)) { 721 /* last syncer _request_ was sent, 722 * but the P_RS_DATA_REPLY not yet received. sync will end (and 723 * next sync group will resume), as soon as we receive the last 724 * resync data block, and the last bit is cleared. 725 * until then resync "work" is "inactive" ... 726 */ 727 put_ldev(device); 728 return 0; 729 } 730 731 requeue: 732 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 733 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME); 734 put_ldev(device); 735 return 0; 736 } 737 738 static int make_ov_request(struct drbd_device *device, int cancel) 739 { 740 int number, i, size; 741 sector_t sector; 742 const sector_t capacity = drbd_get_capacity(device->this_bdev); 743 bool stop_sector_reached = false; 744 745 if (unlikely(cancel)) 746 return 1; 747 748 number = drbd_rs_number_requests(device); 749 750 sector = device->ov_position; 751 for (i = 0; i < number; i++) { 752 if (sector >= capacity) 753 return 1; 754 755 /* We check for "finished" only in the reply path: 756 * w_e_end_ov_reply(). 757 * We need to send at least one request out. */ 758 stop_sector_reached = i > 0 759 && verify_can_do_stop_sector(device) 760 && sector >= device->ov_stop_sector; 761 if (stop_sector_reached) 762 break; 763 764 size = BM_BLOCK_SIZE; 765 766 if (drbd_try_rs_begin_io(device, sector)) { 767 device->ov_position = sector; 768 goto requeue; 769 } 770 771 if (sector + (size>>9) > capacity) 772 size = (capacity-sector)<<9; 773 774 inc_rs_pending(device); 775 if (drbd_send_ov_request(first_peer_device(device), sector, size)) { 776 dec_rs_pending(device); 777 return 0; 778 } 779 sector += BM_SECT_PER_BIT; 780 } 781 device->ov_position = sector; 782 783 requeue: 784 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 785 if (i == 0 || !stop_sector_reached) 786 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME); 787 return 1; 788 } 789 790 int w_ov_finished(struct drbd_work *w, int cancel) 791 { 792 struct drbd_device_work *dw = 793 container_of(w, struct drbd_device_work, w); 794 struct drbd_device *device = dw->device; 795 kfree(dw); 796 ov_out_of_sync_print(device); 797 drbd_resync_finished(device); 798 799 return 0; 800 } 801 802 static int w_resync_finished(struct drbd_work *w, int cancel) 803 { 804 struct drbd_device_work *dw = 805 container_of(w, struct drbd_device_work, w); 806 struct drbd_device *device = dw->device; 807 kfree(dw); 808 809 drbd_resync_finished(device); 810 811 return 0; 812 } 813 814 static void ping_peer(struct drbd_device *device) 815 { 816 struct drbd_connection *connection = first_peer_device(device)->connection; 817 818 clear_bit(GOT_PING_ACK, &connection->flags); 819 request_ping(connection); 820 wait_event(connection->ping_wait, 821 test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED); 822 } 823 824 int drbd_resync_finished(struct drbd_device *device) 825 { 826 unsigned long db, dt, dbdt; 827 unsigned long n_oos; 828 union drbd_state os, ns; 829 struct drbd_device_work *dw; 830 char *khelper_cmd = NULL; 831 int verify_done = 0; 832 833 /* Remove all elements from the resync LRU. Since future actions 834 * might set bits in the (main) bitmap, then the entries in the 835 * resync LRU would be wrong. */ 836 if (drbd_rs_del_all(device)) { 837 /* In case this is not possible now, most probably because 838 * there are P_RS_DATA_REPLY Packets lingering on the worker's 839 * queue (or even the read operations for those packets 840 * is not finished by now). Retry in 100ms. */ 841 842 schedule_timeout_interruptible(HZ / 10); 843 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC); 844 if (dw) { 845 dw->w.cb = w_resync_finished; 846 dw->device = device; 847 drbd_queue_work(&first_peer_device(device)->connection->sender_work, 848 &dw->w); 849 return 1; 850 } 851 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n"); 852 } 853 854 dt = (jiffies - device->rs_start - device->rs_paused) / HZ; 855 if (dt <= 0) 856 dt = 1; 857 858 db = device->rs_total; 859 /* adjust for verify start and stop sectors, respective reached position */ 860 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T) 861 db -= device->ov_left; 862 863 dbdt = Bit2KB(db/dt); 864 device->rs_paused /= HZ; 865 866 if (!get_ldev(device)) 867 goto out; 868 869 ping_peer(device); 870 871 spin_lock_irq(&device->resource->req_lock); 872 os = drbd_read_state(device); 873 874 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T); 875 876 /* This protects us against multiple calls (that can happen in the presence 877 of application IO), and against connectivity loss just before we arrive here. */ 878 if (os.conn <= C_CONNECTED) 879 goto out_unlock; 880 881 ns = os; 882 ns.conn = C_CONNECTED; 883 884 drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n", 885 verify_done ? "Online verify" : "Resync", 886 dt + device->rs_paused, device->rs_paused, dbdt); 887 888 n_oos = drbd_bm_total_weight(device); 889 890 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) { 891 if (n_oos) { 892 drbd_alert(device, "Online verify found %lu %dk block out of sync!\n", 893 n_oos, Bit2KB(1)); 894 khelper_cmd = "out-of-sync"; 895 } 896 } else { 897 D_ASSERT(device, (n_oos - device->rs_failed) == 0); 898 899 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) 900 khelper_cmd = "after-resync-target"; 901 902 if (device->use_csums && device->rs_total) { 903 const unsigned long s = device->rs_same_csum; 904 const unsigned long t = device->rs_total; 905 const int ratio = 906 (t == 0) ? 0 : 907 (t < 100000) ? ((s*100)/t) : (s/(t/100)); 908 drbd_info(device, "%u %% had equal checksums, eliminated: %luK; " 909 "transferred %luK total %luK\n", 910 ratio, 911 Bit2KB(device->rs_same_csum), 912 Bit2KB(device->rs_total - device->rs_same_csum), 913 Bit2KB(device->rs_total)); 914 } 915 } 916 917 if (device->rs_failed) { 918 drbd_info(device, " %lu failed blocks\n", device->rs_failed); 919 920 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 921 ns.disk = D_INCONSISTENT; 922 ns.pdsk = D_UP_TO_DATE; 923 } else { 924 ns.disk = D_UP_TO_DATE; 925 ns.pdsk = D_INCONSISTENT; 926 } 927 } else { 928 ns.disk = D_UP_TO_DATE; 929 ns.pdsk = D_UP_TO_DATE; 930 931 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 932 if (device->p_uuid) { 933 int i; 934 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++) 935 _drbd_uuid_set(device, i, device->p_uuid[i]); 936 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]); 937 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]); 938 } else { 939 drbd_err(device, "device->p_uuid is NULL! BUG\n"); 940 } 941 } 942 943 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) { 944 /* for verify runs, we don't update uuids here, 945 * so there would be nothing to report. */ 946 drbd_uuid_set_bm(device, 0UL); 947 drbd_print_uuids(device, "updated UUIDs"); 948 if (device->p_uuid) { 949 /* Now the two UUID sets are equal, update what we 950 * know of the peer. */ 951 int i; 952 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++) 953 device->p_uuid[i] = device->ldev->md.uuid[i]; 954 } 955 } 956 } 957 958 _drbd_set_state(device, ns, CS_VERBOSE, NULL); 959 out_unlock: 960 spin_unlock_irq(&device->resource->req_lock); 961 put_ldev(device); 962 out: 963 device->rs_total = 0; 964 device->rs_failed = 0; 965 device->rs_paused = 0; 966 967 /* reset start sector, if we reached end of device */ 968 if (verify_done && device->ov_left == 0) 969 device->ov_start_sector = 0; 970 971 drbd_md_sync(device); 972 973 if (khelper_cmd) 974 drbd_khelper(device, khelper_cmd); 975 976 return 1; 977 } 978 979 /* helper */ 980 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req) 981 { 982 if (drbd_peer_req_has_active_page(peer_req)) { 983 /* This might happen if sendpage() has not finished */ 984 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT; 985 atomic_add(i, &device->pp_in_use_by_net); 986 atomic_sub(i, &device->pp_in_use); 987 spin_lock_irq(&device->resource->req_lock); 988 list_add_tail(&peer_req->w.list, &device->net_ee); 989 spin_unlock_irq(&device->resource->req_lock); 990 wake_up(&drbd_pp_wait); 991 } else 992 drbd_free_peer_req(device, peer_req); 993 } 994 995 /** 996 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST 997 * @device: DRBD device. 998 * @w: work object. 999 * @cancel: The connection will be closed anyways 1000 */ 1001 int w_e_end_data_req(struct drbd_work *w, int cancel) 1002 { 1003 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1004 struct drbd_peer_device *peer_device = peer_req->peer_device; 1005 struct drbd_device *device = peer_device->device; 1006 int err; 1007 1008 if (unlikely(cancel)) { 1009 drbd_free_peer_req(device, peer_req); 1010 dec_unacked(device); 1011 return 0; 1012 } 1013 1014 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1015 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req); 1016 } else { 1017 if (__ratelimit(&drbd_ratelimit_state)) 1018 drbd_err(device, "Sending NegDReply. sector=%llus.\n", 1019 (unsigned long long)peer_req->i.sector); 1020 1021 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req); 1022 } 1023 1024 dec_unacked(device); 1025 1026 move_to_net_ee_or_free(device, peer_req); 1027 1028 if (unlikely(err)) 1029 drbd_err(device, "drbd_send_block() failed\n"); 1030 return err; 1031 } 1032 1033 /** 1034 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST 1035 * @w: work object. 1036 * @cancel: The connection will be closed anyways 1037 */ 1038 int w_e_end_rsdata_req(struct drbd_work *w, int cancel) 1039 { 1040 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1041 struct drbd_peer_device *peer_device = peer_req->peer_device; 1042 struct drbd_device *device = peer_device->device; 1043 int err; 1044 1045 if (unlikely(cancel)) { 1046 drbd_free_peer_req(device, peer_req); 1047 dec_unacked(device); 1048 return 0; 1049 } 1050 1051 if (get_ldev_if_state(device, D_FAILED)) { 1052 drbd_rs_complete_io(device, peer_req->i.sector); 1053 put_ldev(device); 1054 } 1055 1056 if (device->state.conn == C_AHEAD) { 1057 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req); 1058 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1059 if (likely(device->state.pdsk >= D_INCONSISTENT)) { 1060 inc_rs_pending(device); 1061 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req); 1062 } else { 1063 if (__ratelimit(&drbd_ratelimit_state)) 1064 drbd_err(device, "Not sending RSDataReply, " 1065 "partner DISKLESS!\n"); 1066 err = 0; 1067 } 1068 } else { 1069 if (__ratelimit(&drbd_ratelimit_state)) 1070 drbd_err(device, "Sending NegRSDReply. sector %llus.\n", 1071 (unsigned long long)peer_req->i.sector); 1072 1073 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req); 1074 1075 /* update resync data with failure */ 1076 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size); 1077 } 1078 1079 dec_unacked(device); 1080 1081 move_to_net_ee_or_free(device, peer_req); 1082 1083 if (unlikely(err)) 1084 drbd_err(device, "drbd_send_block() failed\n"); 1085 return err; 1086 } 1087 1088 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel) 1089 { 1090 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1091 struct drbd_peer_device *peer_device = peer_req->peer_device; 1092 struct drbd_device *device = peer_device->device; 1093 struct digest_info *di; 1094 int digest_size; 1095 void *digest = NULL; 1096 int err, eq = 0; 1097 1098 if (unlikely(cancel)) { 1099 drbd_free_peer_req(device, peer_req); 1100 dec_unacked(device); 1101 return 0; 1102 } 1103 1104 if (get_ldev(device)) { 1105 drbd_rs_complete_io(device, peer_req->i.sector); 1106 put_ldev(device); 1107 } 1108 1109 di = peer_req->digest; 1110 1111 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1112 /* quick hack to try to avoid a race against reconfiguration. 1113 * a real fix would be much more involved, 1114 * introducing more locking mechanisms */ 1115 if (peer_device->connection->csums_tfm) { 1116 digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm); 1117 D_ASSERT(device, digest_size == di->digest_size); 1118 digest = kmalloc(digest_size, GFP_NOIO); 1119 } 1120 if (digest) { 1121 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest); 1122 eq = !memcmp(digest, di->digest, digest_size); 1123 kfree(digest); 1124 } 1125 1126 if (eq) { 1127 drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size); 1128 /* rs_same_csums unit is BM_BLOCK_SIZE */ 1129 device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT; 1130 err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req); 1131 } else { 1132 inc_rs_pending(device); 1133 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */ 1134 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */ 1135 kfree(di); 1136 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req); 1137 } 1138 } else { 1139 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req); 1140 if (__ratelimit(&drbd_ratelimit_state)) 1141 drbd_err(device, "Sending NegDReply. I guess it gets messy.\n"); 1142 } 1143 1144 dec_unacked(device); 1145 move_to_net_ee_or_free(device, peer_req); 1146 1147 if (unlikely(err)) 1148 drbd_err(device, "drbd_send_block/ack() failed\n"); 1149 return err; 1150 } 1151 1152 int w_e_end_ov_req(struct drbd_work *w, int cancel) 1153 { 1154 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1155 struct drbd_peer_device *peer_device = peer_req->peer_device; 1156 struct drbd_device *device = peer_device->device; 1157 sector_t sector = peer_req->i.sector; 1158 unsigned int size = peer_req->i.size; 1159 int digest_size; 1160 void *digest; 1161 int err = 0; 1162 1163 if (unlikely(cancel)) 1164 goto out; 1165 1166 digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm); 1167 digest = kmalloc(digest_size, GFP_NOIO); 1168 if (!digest) { 1169 err = 1; /* terminate the connection in case the allocation failed */ 1170 goto out; 1171 } 1172 1173 if (likely(!(peer_req->flags & EE_WAS_ERROR))) 1174 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest); 1175 else 1176 memset(digest, 0, digest_size); 1177 1178 /* Free e and pages before send. 1179 * In case we block on congestion, we could otherwise run into 1180 * some distributed deadlock, if the other side blocks on 1181 * congestion as well, because our receiver blocks in 1182 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 1183 drbd_free_peer_req(device, peer_req); 1184 peer_req = NULL; 1185 inc_rs_pending(device); 1186 err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY); 1187 if (err) 1188 dec_rs_pending(device); 1189 kfree(digest); 1190 1191 out: 1192 if (peer_req) 1193 drbd_free_peer_req(device, peer_req); 1194 dec_unacked(device); 1195 return err; 1196 } 1197 1198 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size) 1199 { 1200 if (device->ov_last_oos_start + device->ov_last_oos_size == sector) { 1201 device->ov_last_oos_size += size>>9; 1202 } else { 1203 device->ov_last_oos_start = sector; 1204 device->ov_last_oos_size = size>>9; 1205 } 1206 drbd_set_out_of_sync(device, sector, size); 1207 } 1208 1209 int w_e_end_ov_reply(struct drbd_work *w, int cancel) 1210 { 1211 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1212 struct drbd_peer_device *peer_device = peer_req->peer_device; 1213 struct drbd_device *device = peer_device->device; 1214 struct digest_info *di; 1215 void *digest; 1216 sector_t sector = peer_req->i.sector; 1217 unsigned int size = peer_req->i.size; 1218 int digest_size; 1219 int err, eq = 0; 1220 bool stop_sector_reached = false; 1221 1222 if (unlikely(cancel)) { 1223 drbd_free_peer_req(device, peer_req); 1224 dec_unacked(device); 1225 return 0; 1226 } 1227 1228 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all 1229 * the resync lru has been cleaned up already */ 1230 if (get_ldev(device)) { 1231 drbd_rs_complete_io(device, peer_req->i.sector); 1232 put_ldev(device); 1233 } 1234 1235 di = peer_req->digest; 1236 1237 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1238 digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm); 1239 digest = kmalloc(digest_size, GFP_NOIO); 1240 if (digest) { 1241 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest); 1242 1243 D_ASSERT(device, digest_size == di->digest_size); 1244 eq = !memcmp(digest, di->digest, digest_size); 1245 kfree(digest); 1246 } 1247 } 1248 1249 /* Free peer_req and pages before send. 1250 * In case we block on congestion, we could otherwise run into 1251 * some distributed deadlock, if the other side blocks on 1252 * congestion as well, because our receiver blocks in 1253 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 1254 drbd_free_peer_req(device, peer_req); 1255 if (!eq) 1256 drbd_ov_out_of_sync_found(device, sector, size); 1257 else 1258 ov_out_of_sync_print(device); 1259 1260 err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, 1261 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC); 1262 1263 dec_unacked(device); 1264 1265 --device->ov_left; 1266 1267 /* let's advance progress step marks only for every other megabyte */ 1268 if ((device->ov_left & 0x200) == 0x200) 1269 drbd_advance_rs_marks(device, device->ov_left); 1270 1271 stop_sector_reached = verify_can_do_stop_sector(device) && 1272 (sector + (size>>9)) >= device->ov_stop_sector; 1273 1274 if (device->ov_left == 0 || stop_sector_reached) { 1275 ov_out_of_sync_print(device); 1276 drbd_resync_finished(device); 1277 } 1278 1279 return err; 1280 } 1281 1282 /* FIXME 1283 * We need to track the number of pending barrier acks, 1284 * and to be able to wait for them. 1285 * See also comment in drbd_adm_attach before drbd_suspend_io. 1286 */ 1287 static int drbd_send_barrier(struct drbd_connection *connection) 1288 { 1289 struct p_barrier *p; 1290 struct drbd_socket *sock; 1291 1292 sock = &connection->data; 1293 p = conn_prepare_command(connection, sock); 1294 if (!p) 1295 return -EIO; 1296 p->barrier = connection->send.current_epoch_nr; 1297 p->pad = 0; 1298 connection->send.current_epoch_writes = 0; 1299 connection->send.last_sent_barrier_jif = jiffies; 1300 1301 return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0); 1302 } 1303 1304 int w_send_write_hint(struct drbd_work *w, int cancel) 1305 { 1306 struct drbd_device *device = 1307 container_of(w, struct drbd_device, unplug_work); 1308 struct drbd_socket *sock; 1309 1310 if (cancel) 1311 return 0; 1312 sock = &first_peer_device(device)->connection->data; 1313 if (!drbd_prepare_command(first_peer_device(device), sock)) 1314 return -EIO; 1315 return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0); 1316 } 1317 1318 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch) 1319 { 1320 if (!connection->send.seen_any_write_yet) { 1321 connection->send.seen_any_write_yet = true; 1322 connection->send.current_epoch_nr = epoch; 1323 connection->send.current_epoch_writes = 0; 1324 connection->send.last_sent_barrier_jif = jiffies; 1325 } 1326 } 1327 1328 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch) 1329 { 1330 /* re-init if first write on this connection */ 1331 if (!connection->send.seen_any_write_yet) 1332 return; 1333 if (connection->send.current_epoch_nr != epoch) { 1334 if (connection->send.current_epoch_writes) 1335 drbd_send_barrier(connection); 1336 connection->send.current_epoch_nr = epoch; 1337 } 1338 } 1339 1340 int w_send_out_of_sync(struct drbd_work *w, int cancel) 1341 { 1342 struct drbd_request *req = container_of(w, struct drbd_request, w); 1343 struct drbd_device *device = req->device; 1344 struct drbd_peer_device *const peer_device = first_peer_device(device); 1345 struct drbd_connection *const connection = peer_device->connection; 1346 int err; 1347 1348 if (unlikely(cancel)) { 1349 req_mod(req, SEND_CANCELED); 1350 return 0; 1351 } 1352 req->pre_send_jif = jiffies; 1353 1354 /* this time, no connection->send.current_epoch_writes++; 1355 * If it was sent, it was the closing barrier for the last 1356 * replicated epoch, before we went into AHEAD mode. 1357 * No more barriers will be sent, until we leave AHEAD mode again. */ 1358 maybe_send_barrier(connection, req->epoch); 1359 1360 err = drbd_send_out_of_sync(peer_device, req); 1361 req_mod(req, OOS_HANDED_TO_NETWORK); 1362 1363 return err; 1364 } 1365 1366 /** 1367 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request 1368 * @w: work object. 1369 * @cancel: The connection will be closed anyways 1370 */ 1371 int w_send_dblock(struct drbd_work *w, int cancel) 1372 { 1373 struct drbd_request *req = container_of(w, struct drbd_request, w); 1374 struct drbd_device *device = req->device; 1375 struct drbd_peer_device *const peer_device = first_peer_device(device); 1376 struct drbd_connection *connection = peer_device->connection; 1377 int err; 1378 1379 if (unlikely(cancel)) { 1380 req_mod(req, SEND_CANCELED); 1381 return 0; 1382 } 1383 req->pre_send_jif = jiffies; 1384 1385 re_init_if_first_write(connection, req->epoch); 1386 maybe_send_barrier(connection, req->epoch); 1387 connection->send.current_epoch_writes++; 1388 1389 err = drbd_send_dblock(peer_device, req); 1390 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK); 1391 1392 return err; 1393 } 1394 1395 /** 1396 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet 1397 * @w: work object. 1398 * @cancel: The connection will be closed anyways 1399 */ 1400 int w_send_read_req(struct drbd_work *w, int cancel) 1401 { 1402 struct drbd_request *req = container_of(w, struct drbd_request, w); 1403 struct drbd_device *device = req->device; 1404 struct drbd_peer_device *const peer_device = first_peer_device(device); 1405 struct drbd_connection *connection = peer_device->connection; 1406 int err; 1407 1408 if (unlikely(cancel)) { 1409 req_mod(req, SEND_CANCELED); 1410 return 0; 1411 } 1412 req->pre_send_jif = jiffies; 1413 1414 /* Even read requests may close a write epoch, 1415 * if there was any yet. */ 1416 maybe_send_barrier(connection, req->epoch); 1417 1418 err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size, 1419 (unsigned long)req); 1420 1421 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK); 1422 1423 return err; 1424 } 1425 1426 int w_restart_disk_io(struct drbd_work *w, int cancel) 1427 { 1428 struct drbd_request *req = container_of(w, struct drbd_request, w); 1429 struct drbd_device *device = req->device; 1430 1431 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG) 1432 drbd_al_begin_io(device, &req->i); 1433 1434 drbd_req_make_private_bio(req, req->master_bio); 1435 req->private_bio->bi_bdev = device->ldev->backing_bdev; 1436 generic_make_request(req->private_bio); 1437 1438 return 0; 1439 } 1440 1441 static int _drbd_may_sync_now(struct drbd_device *device) 1442 { 1443 struct drbd_device *odev = device; 1444 int resync_after; 1445 1446 while (1) { 1447 if (!odev->ldev || odev->state.disk == D_DISKLESS) 1448 return 1; 1449 rcu_read_lock(); 1450 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after; 1451 rcu_read_unlock(); 1452 if (resync_after == -1) 1453 return 1; 1454 odev = minor_to_device(resync_after); 1455 if (!odev) 1456 return 1; 1457 if ((odev->state.conn >= C_SYNC_SOURCE && 1458 odev->state.conn <= C_PAUSED_SYNC_T) || 1459 odev->state.aftr_isp || odev->state.peer_isp || 1460 odev->state.user_isp) 1461 return 0; 1462 } 1463 } 1464 1465 /** 1466 * drbd_pause_after() - Pause resync on all devices that may not resync now 1467 * @device: DRBD device. 1468 * 1469 * Called from process context only (admin command and after_state_ch). 1470 */ 1471 static bool drbd_pause_after(struct drbd_device *device) 1472 { 1473 bool changed = false; 1474 struct drbd_device *odev; 1475 int i; 1476 1477 rcu_read_lock(); 1478 idr_for_each_entry(&drbd_devices, odev, i) { 1479 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1480 continue; 1481 if (!_drbd_may_sync_now(odev) && 1482 _drbd_set_state(_NS(odev, aftr_isp, 1), 1483 CS_HARD, NULL) != SS_NOTHING_TO_DO) 1484 changed = true; 1485 } 1486 rcu_read_unlock(); 1487 1488 return changed; 1489 } 1490 1491 /** 1492 * drbd_resume_next() - Resume resync on all devices that may resync now 1493 * @device: DRBD device. 1494 * 1495 * Called from process context only (admin command and worker). 1496 */ 1497 static bool drbd_resume_next(struct drbd_device *device) 1498 { 1499 bool changed = false; 1500 struct drbd_device *odev; 1501 int i; 1502 1503 rcu_read_lock(); 1504 idr_for_each_entry(&drbd_devices, odev, i) { 1505 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1506 continue; 1507 if (odev->state.aftr_isp) { 1508 if (_drbd_may_sync_now(odev) && 1509 _drbd_set_state(_NS(odev, aftr_isp, 0), 1510 CS_HARD, NULL) != SS_NOTHING_TO_DO) 1511 changed = true; 1512 } 1513 } 1514 rcu_read_unlock(); 1515 return changed; 1516 } 1517 1518 void resume_next_sg(struct drbd_device *device) 1519 { 1520 lock_all_resources(); 1521 drbd_resume_next(device); 1522 unlock_all_resources(); 1523 } 1524 1525 void suspend_other_sg(struct drbd_device *device) 1526 { 1527 lock_all_resources(); 1528 drbd_pause_after(device); 1529 unlock_all_resources(); 1530 } 1531 1532 /* caller must lock_all_resources() */ 1533 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor) 1534 { 1535 struct drbd_device *odev; 1536 int resync_after; 1537 1538 if (o_minor == -1) 1539 return NO_ERROR; 1540 if (o_minor < -1 || o_minor > MINORMASK) 1541 return ERR_RESYNC_AFTER; 1542 1543 /* check for loops */ 1544 odev = minor_to_device(o_minor); 1545 while (1) { 1546 if (odev == device) 1547 return ERR_RESYNC_AFTER_CYCLE; 1548 1549 /* You are free to depend on diskless, non-existing, 1550 * or not yet/no longer existing minors. 1551 * We only reject dependency loops. 1552 * We cannot follow the dependency chain beyond a detached or 1553 * missing minor. 1554 */ 1555 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS) 1556 return NO_ERROR; 1557 1558 rcu_read_lock(); 1559 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after; 1560 rcu_read_unlock(); 1561 /* dependency chain ends here, no cycles. */ 1562 if (resync_after == -1) 1563 return NO_ERROR; 1564 1565 /* follow the dependency chain */ 1566 odev = minor_to_device(resync_after); 1567 } 1568 } 1569 1570 /* caller must lock_all_resources() */ 1571 void drbd_resync_after_changed(struct drbd_device *device) 1572 { 1573 int changed; 1574 1575 do { 1576 changed = drbd_pause_after(device); 1577 changed |= drbd_resume_next(device); 1578 } while (changed); 1579 } 1580 1581 void drbd_rs_controller_reset(struct drbd_device *device) 1582 { 1583 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk; 1584 struct fifo_buffer *plan; 1585 1586 atomic_set(&device->rs_sect_in, 0); 1587 atomic_set(&device->rs_sect_ev, 0); 1588 device->rs_in_flight = 0; 1589 device->rs_last_events = 1590 (int)part_stat_read(&disk->part0, sectors[0]) + 1591 (int)part_stat_read(&disk->part0, sectors[1]); 1592 1593 /* Updating the RCU protected object in place is necessary since 1594 this function gets called from atomic context. 1595 It is valid since all other updates also lead to an completely 1596 empty fifo */ 1597 rcu_read_lock(); 1598 plan = rcu_dereference(device->rs_plan_s); 1599 plan->total = 0; 1600 fifo_set(plan, 0); 1601 rcu_read_unlock(); 1602 } 1603 1604 void start_resync_timer_fn(unsigned long data) 1605 { 1606 struct drbd_device *device = (struct drbd_device *) data; 1607 drbd_device_post_work(device, RS_START); 1608 } 1609 1610 static void do_start_resync(struct drbd_device *device) 1611 { 1612 if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) { 1613 drbd_warn(device, "postponing start_resync ...\n"); 1614 device->start_resync_timer.expires = jiffies + HZ/10; 1615 add_timer(&device->start_resync_timer); 1616 return; 1617 } 1618 1619 drbd_start_resync(device, C_SYNC_SOURCE); 1620 clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags); 1621 } 1622 1623 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device) 1624 { 1625 bool csums_after_crash_only; 1626 rcu_read_lock(); 1627 csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only; 1628 rcu_read_unlock(); 1629 return connection->agreed_pro_version >= 89 && /* supported? */ 1630 connection->csums_tfm && /* configured? */ 1631 (csums_after_crash_only == 0 /* use for each resync? */ 1632 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */ 1633 } 1634 1635 /** 1636 * drbd_start_resync() - Start the resync process 1637 * @device: DRBD device. 1638 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET 1639 * 1640 * This function might bring you directly into one of the 1641 * C_PAUSED_SYNC_* states. 1642 */ 1643 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side) 1644 { 1645 struct drbd_peer_device *peer_device = first_peer_device(device); 1646 struct drbd_connection *connection = peer_device ? peer_device->connection : NULL; 1647 union drbd_state ns; 1648 int r; 1649 1650 if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) { 1651 drbd_err(device, "Resync already running!\n"); 1652 return; 1653 } 1654 1655 if (!test_bit(B_RS_H_DONE, &device->flags)) { 1656 if (side == C_SYNC_TARGET) { 1657 /* Since application IO was locked out during C_WF_BITMAP_T and 1658 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET 1659 we check that we might make the data inconsistent. */ 1660 r = drbd_khelper(device, "before-resync-target"); 1661 r = (r >> 8) & 0xff; 1662 if (r > 0) { 1663 drbd_info(device, "before-resync-target handler returned %d, " 1664 "dropping connection.\n", r); 1665 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 1666 return; 1667 } 1668 } else /* C_SYNC_SOURCE */ { 1669 r = drbd_khelper(device, "before-resync-source"); 1670 r = (r >> 8) & 0xff; 1671 if (r > 0) { 1672 if (r == 3) { 1673 drbd_info(device, "before-resync-source handler returned %d, " 1674 "ignoring. Old userland tools?", r); 1675 } else { 1676 drbd_info(device, "before-resync-source handler returned %d, " 1677 "dropping connection.\n", r); 1678 conn_request_state(connection, 1679 NS(conn, C_DISCONNECTING), CS_HARD); 1680 return; 1681 } 1682 } 1683 } 1684 } 1685 1686 if (current == connection->worker.task) { 1687 /* The worker should not sleep waiting for state_mutex, 1688 that can take long */ 1689 if (!mutex_trylock(device->state_mutex)) { 1690 set_bit(B_RS_H_DONE, &device->flags); 1691 device->start_resync_timer.expires = jiffies + HZ/5; 1692 add_timer(&device->start_resync_timer); 1693 return; 1694 } 1695 } else { 1696 mutex_lock(device->state_mutex); 1697 } 1698 1699 lock_all_resources(); 1700 clear_bit(B_RS_H_DONE, &device->flags); 1701 /* Did some connection breakage or IO error race with us? */ 1702 if (device->state.conn < C_CONNECTED 1703 || !get_ldev_if_state(device, D_NEGOTIATING)) { 1704 unlock_all_resources(); 1705 goto out; 1706 } 1707 1708 ns = drbd_read_state(device); 1709 1710 ns.aftr_isp = !_drbd_may_sync_now(device); 1711 1712 ns.conn = side; 1713 1714 if (side == C_SYNC_TARGET) 1715 ns.disk = D_INCONSISTENT; 1716 else /* side == C_SYNC_SOURCE */ 1717 ns.pdsk = D_INCONSISTENT; 1718 1719 r = _drbd_set_state(device, ns, CS_VERBOSE, NULL); 1720 ns = drbd_read_state(device); 1721 1722 if (ns.conn < C_CONNECTED) 1723 r = SS_UNKNOWN_ERROR; 1724 1725 if (r == SS_SUCCESS) { 1726 unsigned long tw = drbd_bm_total_weight(device); 1727 unsigned long now = jiffies; 1728 int i; 1729 1730 device->rs_failed = 0; 1731 device->rs_paused = 0; 1732 device->rs_same_csum = 0; 1733 device->rs_last_sect_ev = 0; 1734 device->rs_total = tw; 1735 device->rs_start = now; 1736 for (i = 0; i < DRBD_SYNC_MARKS; i++) { 1737 device->rs_mark_left[i] = tw; 1738 device->rs_mark_time[i] = now; 1739 } 1740 drbd_pause_after(device); 1741 /* Forget potentially stale cached per resync extent bit-counts. 1742 * Open coded drbd_rs_cancel_all(device), we already have IRQs 1743 * disabled, and know the disk state is ok. */ 1744 spin_lock(&device->al_lock); 1745 lc_reset(device->resync); 1746 device->resync_locked = 0; 1747 device->resync_wenr = LC_FREE; 1748 spin_unlock(&device->al_lock); 1749 } 1750 unlock_all_resources(); 1751 1752 if (r == SS_SUCCESS) { 1753 wake_up(&device->al_wait); /* for lc_reset() above */ 1754 /* reset rs_last_bcast when a resync or verify is started, 1755 * to deal with potential jiffies wrap. */ 1756 device->rs_last_bcast = jiffies - HZ; 1757 1758 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n", 1759 drbd_conn_str(ns.conn), 1760 (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10), 1761 (unsigned long) device->rs_total); 1762 if (side == C_SYNC_TARGET) { 1763 device->bm_resync_fo = 0; 1764 device->use_csums = use_checksum_based_resync(connection, device); 1765 } else { 1766 device->use_csums = 0; 1767 } 1768 1769 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid 1770 * with w_send_oos, or the sync target will get confused as to 1771 * how much bits to resync. We cannot do that always, because for an 1772 * empty resync and protocol < 95, we need to do it here, as we call 1773 * drbd_resync_finished from here in that case. 1774 * We drbd_gen_and_send_sync_uuid here for protocol < 96, 1775 * and from after_state_ch otherwise. */ 1776 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96) 1777 drbd_gen_and_send_sync_uuid(peer_device); 1778 1779 if (connection->agreed_pro_version < 95 && device->rs_total == 0) { 1780 /* This still has a race (about when exactly the peers 1781 * detect connection loss) that can lead to a full sync 1782 * on next handshake. In 8.3.9 we fixed this with explicit 1783 * resync-finished notifications, but the fix 1784 * introduces a protocol change. Sleeping for some 1785 * time longer than the ping interval + timeout on the 1786 * SyncSource, to give the SyncTarget the chance to 1787 * detect connection loss, then waiting for a ping 1788 * response (implicit in drbd_resync_finished) reduces 1789 * the race considerably, but does not solve it. */ 1790 if (side == C_SYNC_SOURCE) { 1791 struct net_conf *nc; 1792 int timeo; 1793 1794 rcu_read_lock(); 1795 nc = rcu_dereference(connection->net_conf); 1796 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9; 1797 rcu_read_unlock(); 1798 schedule_timeout_interruptible(timeo); 1799 } 1800 drbd_resync_finished(device); 1801 } 1802 1803 drbd_rs_controller_reset(device); 1804 /* ns.conn may already be != device->state.conn, 1805 * we may have been paused in between, or become paused until 1806 * the timer triggers. 1807 * No matter, that is handled in resync_timer_fn() */ 1808 if (ns.conn == C_SYNC_TARGET) 1809 mod_timer(&device->resync_timer, jiffies); 1810 1811 drbd_md_sync(device); 1812 } 1813 put_ldev(device); 1814 out: 1815 mutex_unlock(device->state_mutex); 1816 } 1817 1818 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done) 1819 { 1820 struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, }; 1821 device->rs_last_bcast = jiffies; 1822 1823 if (!get_ldev(device)) 1824 return; 1825 1826 drbd_bm_write_lazy(device, 0); 1827 if (resync_done && is_sync_state(device->state.conn)) 1828 drbd_resync_finished(device); 1829 1830 drbd_bcast_event(device, &sib); 1831 /* update timestamp, in case it took a while to write out stuff */ 1832 device->rs_last_bcast = jiffies; 1833 put_ldev(device); 1834 } 1835 1836 static void drbd_ldev_destroy(struct drbd_device *device) 1837 { 1838 lc_destroy(device->resync); 1839 device->resync = NULL; 1840 lc_destroy(device->act_log); 1841 device->act_log = NULL; 1842 1843 __acquire(local); 1844 drbd_backing_dev_free(device, device->ldev); 1845 device->ldev = NULL; 1846 __release(local); 1847 1848 clear_bit(GOING_DISKLESS, &device->flags); 1849 wake_up(&device->misc_wait); 1850 } 1851 1852 static void go_diskless(struct drbd_device *device) 1853 { 1854 D_ASSERT(device, device->state.disk == D_FAILED); 1855 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will 1856 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch 1857 * the protected members anymore, though, so once put_ldev reaches zero 1858 * again, it will be safe to free them. */ 1859 1860 /* Try to write changed bitmap pages, read errors may have just 1861 * set some bits outside the area covered by the activity log. 1862 * 1863 * If we have an IO error during the bitmap writeout, 1864 * we will want a full sync next time, just in case. 1865 * (Do we want a specific meta data flag for this?) 1866 * 1867 * If that does not make it to stable storage either, 1868 * we cannot do anything about that anymore. 1869 * 1870 * We still need to check if both bitmap and ldev are present, we may 1871 * end up here after a failed attach, before ldev was even assigned. 1872 */ 1873 if (device->bitmap && device->ldev) { 1874 /* An interrupted resync or similar is allowed to recounts bits 1875 * while we detach. 1876 * Any modifications would not be expected anymore, though. 1877 */ 1878 if (drbd_bitmap_io_from_worker(device, drbd_bm_write, 1879 "detach", BM_LOCKED_TEST_ALLOWED)) { 1880 if (test_bit(WAS_READ_ERROR, &device->flags)) { 1881 drbd_md_set_flag(device, MDF_FULL_SYNC); 1882 drbd_md_sync(device); 1883 } 1884 } 1885 } 1886 1887 drbd_force_state(device, NS(disk, D_DISKLESS)); 1888 } 1889 1890 static int do_md_sync(struct drbd_device *device) 1891 { 1892 drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n"); 1893 drbd_md_sync(device); 1894 return 0; 1895 } 1896 1897 /* only called from drbd_worker thread, no locking */ 1898 void __update_timing_details( 1899 struct drbd_thread_timing_details *tdp, 1900 unsigned int *cb_nr, 1901 void *cb, 1902 const char *fn, const unsigned int line) 1903 { 1904 unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST; 1905 struct drbd_thread_timing_details *td = tdp + i; 1906 1907 td->start_jif = jiffies; 1908 td->cb_addr = cb; 1909 td->caller_fn = fn; 1910 td->line = line; 1911 td->cb_nr = *cb_nr; 1912 1913 i = (i+1) % DRBD_THREAD_DETAILS_HIST; 1914 td = tdp + i; 1915 memset(td, 0, sizeof(*td)); 1916 1917 ++(*cb_nr); 1918 } 1919 1920 static void do_device_work(struct drbd_device *device, const unsigned long todo) 1921 { 1922 if (test_bit(MD_SYNC, &todo)) 1923 do_md_sync(device); 1924 if (test_bit(RS_DONE, &todo) || 1925 test_bit(RS_PROGRESS, &todo)) 1926 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo)); 1927 if (test_bit(GO_DISKLESS, &todo)) 1928 go_diskless(device); 1929 if (test_bit(DESTROY_DISK, &todo)) 1930 drbd_ldev_destroy(device); 1931 if (test_bit(RS_START, &todo)) 1932 do_start_resync(device); 1933 } 1934 1935 #define DRBD_DEVICE_WORK_MASK \ 1936 ((1UL << GO_DISKLESS) \ 1937 |(1UL << DESTROY_DISK) \ 1938 |(1UL << MD_SYNC) \ 1939 |(1UL << RS_START) \ 1940 |(1UL << RS_PROGRESS) \ 1941 |(1UL << RS_DONE) \ 1942 ) 1943 1944 static unsigned long get_work_bits(unsigned long *flags) 1945 { 1946 unsigned long old, new; 1947 do { 1948 old = *flags; 1949 new = old & ~DRBD_DEVICE_WORK_MASK; 1950 } while (cmpxchg(flags, old, new) != old); 1951 return old & DRBD_DEVICE_WORK_MASK; 1952 } 1953 1954 static void do_unqueued_work(struct drbd_connection *connection) 1955 { 1956 struct drbd_peer_device *peer_device; 1957 int vnr; 1958 1959 rcu_read_lock(); 1960 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 1961 struct drbd_device *device = peer_device->device; 1962 unsigned long todo = get_work_bits(&device->flags); 1963 if (!todo) 1964 continue; 1965 1966 kref_get(&device->kref); 1967 rcu_read_unlock(); 1968 do_device_work(device, todo); 1969 kref_put(&device->kref, drbd_destroy_device); 1970 rcu_read_lock(); 1971 } 1972 rcu_read_unlock(); 1973 } 1974 1975 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list) 1976 { 1977 spin_lock_irq(&queue->q_lock); 1978 list_splice_tail_init(&queue->q, work_list); 1979 spin_unlock_irq(&queue->q_lock); 1980 return !list_empty(work_list); 1981 } 1982 1983 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list) 1984 { 1985 DEFINE_WAIT(wait); 1986 struct net_conf *nc; 1987 int uncork, cork; 1988 1989 dequeue_work_batch(&connection->sender_work, work_list); 1990 if (!list_empty(work_list)) 1991 return; 1992 1993 /* Still nothing to do? 1994 * Maybe we still need to close the current epoch, 1995 * even if no new requests are queued yet. 1996 * 1997 * Also, poke TCP, just in case. 1998 * Then wait for new work (or signal). */ 1999 rcu_read_lock(); 2000 nc = rcu_dereference(connection->net_conf); 2001 uncork = nc ? nc->tcp_cork : 0; 2002 rcu_read_unlock(); 2003 if (uncork) { 2004 mutex_lock(&connection->data.mutex); 2005 if (connection->data.socket) 2006 drbd_tcp_uncork(connection->data.socket); 2007 mutex_unlock(&connection->data.mutex); 2008 } 2009 2010 for (;;) { 2011 int send_barrier; 2012 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE); 2013 spin_lock_irq(&connection->resource->req_lock); 2014 spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */ 2015 if (!list_empty(&connection->sender_work.q)) 2016 list_splice_tail_init(&connection->sender_work.q, work_list); 2017 spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */ 2018 if (!list_empty(work_list) || signal_pending(current)) { 2019 spin_unlock_irq(&connection->resource->req_lock); 2020 break; 2021 } 2022 2023 /* We found nothing new to do, no to-be-communicated request, 2024 * no other work item. We may still need to close the last 2025 * epoch. Next incoming request epoch will be connection -> 2026 * current transfer log epoch number. If that is different 2027 * from the epoch of the last request we communicated, it is 2028 * safe to send the epoch separating barrier now. 2029 */ 2030 send_barrier = 2031 atomic_read(&connection->current_tle_nr) != 2032 connection->send.current_epoch_nr; 2033 spin_unlock_irq(&connection->resource->req_lock); 2034 2035 if (send_barrier) 2036 maybe_send_barrier(connection, 2037 connection->send.current_epoch_nr + 1); 2038 2039 if (test_bit(DEVICE_WORK_PENDING, &connection->flags)) 2040 break; 2041 2042 /* drbd_send() may have called flush_signals() */ 2043 if (get_t_state(&connection->worker) != RUNNING) 2044 break; 2045 2046 schedule(); 2047 /* may be woken up for other things but new work, too, 2048 * e.g. if the current epoch got closed. 2049 * In which case we send the barrier above. */ 2050 } 2051 finish_wait(&connection->sender_work.q_wait, &wait); 2052 2053 /* someone may have changed the config while we have been waiting above. */ 2054 rcu_read_lock(); 2055 nc = rcu_dereference(connection->net_conf); 2056 cork = nc ? nc->tcp_cork : 0; 2057 rcu_read_unlock(); 2058 mutex_lock(&connection->data.mutex); 2059 if (connection->data.socket) { 2060 if (cork) 2061 drbd_tcp_cork(connection->data.socket); 2062 else if (!uncork) 2063 drbd_tcp_uncork(connection->data.socket); 2064 } 2065 mutex_unlock(&connection->data.mutex); 2066 } 2067 2068 int drbd_worker(struct drbd_thread *thi) 2069 { 2070 struct drbd_connection *connection = thi->connection; 2071 struct drbd_work *w = NULL; 2072 struct drbd_peer_device *peer_device; 2073 LIST_HEAD(work_list); 2074 int vnr; 2075 2076 while (get_t_state(thi) == RUNNING) { 2077 drbd_thread_current_set_cpu(thi); 2078 2079 if (list_empty(&work_list)) { 2080 update_worker_timing_details(connection, wait_for_work); 2081 wait_for_work(connection, &work_list); 2082 } 2083 2084 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) { 2085 update_worker_timing_details(connection, do_unqueued_work); 2086 do_unqueued_work(connection); 2087 } 2088 2089 if (signal_pending(current)) { 2090 flush_signals(current); 2091 if (get_t_state(thi) == RUNNING) { 2092 drbd_warn(connection, "Worker got an unexpected signal\n"); 2093 continue; 2094 } 2095 break; 2096 } 2097 2098 if (get_t_state(thi) != RUNNING) 2099 break; 2100 2101 if (!list_empty(&work_list)) { 2102 w = list_first_entry(&work_list, struct drbd_work, list); 2103 list_del_init(&w->list); 2104 update_worker_timing_details(connection, w->cb); 2105 if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0) 2106 continue; 2107 if (connection->cstate >= C_WF_REPORT_PARAMS) 2108 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD); 2109 } 2110 } 2111 2112 do { 2113 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) { 2114 update_worker_timing_details(connection, do_unqueued_work); 2115 do_unqueued_work(connection); 2116 } 2117 if (!list_empty(&work_list)) { 2118 w = list_first_entry(&work_list, struct drbd_work, list); 2119 list_del_init(&w->list); 2120 update_worker_timing_details(connection, w->cb); 2121 w->cb(w, 1); 2122 } else 2123 dequeue_work_batch(&connection->sender_work, &work_list); 2124 } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags)); 2125 2126 rcu_read_lock(); 2127 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { 2128 struct drbd_device *device = peer_device->device; 2129 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE); 2130 kref_get(&device->kref); 2131 rcu_read_unlock(); 2132 drbd_device_cleanup(device); 2133 kref_put(&device->kref, drbd_destroy_device); 2134 rcu_read_lock(); 2135 } 2136 rcu_read_unlock(); 2137 2138 return 0; 2139 } 2140