1 /* 2 drbd_actlog.c 3 4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg. 5 6 Copyright (C) 2003-2008, LINBIT Information Technologies GmbH. 7 Copyright (C) 2003-2008, Philipp Reisner <philipp.reisner@linbit.com>. 8 Copyright (C) 2003-2008, Lars Ellenberg <lars.ellenberg@linbit.com>. 9 10 drbd is free software; you can redistribute it and/or modify 11 it under the terms of the GNU General Public License as published by 12 the Free Software Foundation; either version 2, or (at your option) 13 any later version. 14 15 drbd is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU General Public License for more details. 19 20 You should have received a copy of the GNU General Public License 21 along with drbd; see the file COPYING. If not, write to 22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 23 24 */ 25 26 #include <linux/slab.h> 27 #include <linux/crc32c.h> 28 #include <linux/drbd.h> 29 #include <linux/drbd_limits.h> 30 #include "drbd_int.h" 31 32 33 enum al_transaction_types { 34 AL_TR_UPDATE = 0, 35 AL_TR_INITIALIZED = 0xffff 36 }; 37 /* all fields on disc in big endian */ 38 struct __packed al_transaction_on_disk { 39 /* don't we all like magic */ 40 __be32 magic; 41 42 /* to identify the most recent transaction block 43 * in the on disk ring buffer */ 44 __be32 tr_number; 45 46 /* checksum on the full 4k block, with this field set to 0. */ 47 __be32 crc32c; 48 49 /* type of transaction, special transaction types like: 50 * purge-all, set-all-idle, set-all-active, ... to-be-defined 51 * see also enum al_transaction_types */ 52 __be16 transaction_type; 53 54 /* we currently allow only a few thousand extents, 55 * so 16bit will be enough for the slot number. */ 56 57 /* how many updates in this transaction */ 58 __be16 n_updates; 59 60 /* maximum slot number, "al-extents" in drbd.conf speak. 61 * Having this in each transaction should make reconfiguration 62 * of that parameter easier. */ 63 __be16 context_size; 64 65 /* slot number the context starts with */ 66 __be16 context_start_slot_nr; 67 68 /* Some reserved bytes. Expected usage is a 64bit counter of 69 * sectors-written since device creation, and other data generation tag 70 * supporting usage */ 71 __be32 __reserved[4]; 72 73 /* --- 36 byte used --- */ 74 75 /* Reserve space for up to AL_UPDATES_PER_TRANSACTION changes 76 * in one transaction, then use the remaining byte in the 4k block for 77 * context information. "Flexible" number of updates per transaction 78 * does not help, as we have to account for the case when all update 79 * slots are used anyways, so it would only complicate code without 80 * additional benefit. 81 */ 82 __be16 update_slot_nr[AL_UPDATES_PER_TRANSACTION]; 83 84 /* but the extent number is 32bit, which at an extent size of 4 MiB 85 * allows to cover device sizes of up to 2**54 Byte (16 PiB) */ 86 __be32 update_extent_nr[AL_UPDATES_PER_TRANSACTION]; 87 88 /* --- 420 bytes used (36 + 64*6) --- */ 89 90 /* 4096 - 420 = 3676 = 919 * 4 */ 91 __be32 context[AL_CONTEXT_PER_TRANSACTION]; 92 }; 93 94 void *drbd_md_get_buffer(struct drbd_device *device, const char *intent) 95 { 96 int r; 97 98 wait_event(device->misc_wait, 99 (r = atomic_cmpxchg(&device->md_io.in_use, 0, 1)) == 0 || 100 device->state.disk <= D_FAILED); 101 102 if (r) 103 return NULL; 104 105 device->md_io.current_use = intent; 106 device->md_io.start_jif = jiffies; 107 device->md_io.submit_jif = device->md_io.start_jif - 1; 108 return page_address(device->md_io.page); 109 } 110 111 void drbd_md_put_buffer(struct drbd_device *device) 112 { 113 if (atomic_dec_and_test(&device->md_io.in_use)) 114 wake_up(&device->misc_wait); 115 } 116 117 void wait_until_done_or_force_detached(struct drbd_device *device, struct drbd_backing_dev *bdev, 118 unsigned int *done) 119 { 120 long dt; 121 122 rcu_read_lock(); 123 dt = rcu_dereference(bdev->disk_conf)->disk_timeout; 124 rcu_read_unlock(); 125 dt = dt * HZ / 10; 126 if (dt == 0) 127 dt = MAX_SCHEDULE_TIMEOUT; 128 129 dt = wait_event_timeout(device->misc_wait, 130 *done || test_bit(FORCE_DETACH, &device->flags), dt); 131 if (dt == 0) { 132 drbd_err(device, "meta-data IO operation timed out\n"); 133 drbd_chk_io_error(device, 1, DRBD_FORCE_DETACH); 134 } 135 } 136 137 static int _drbd_md_sync_page_io(struct drbd_device *device, 138 struct drbd_backing_dev *bdev, 139 sector_t sector, int op) 140 { 141 struct bio *bio; 142 /* we do all our meta data IO in aligned 4k blocks. */ 143 const int size = 4096; 144 int err, op_flags = 0; 145 146 device->md_io.done = 0; 147 device->md_io.error = -ENODEV; 148 149 if ((op == REQ_OP_WRITE) && !test_bit(MD_NO_FUA, &device->flags)) 150 op_flags |= REQ_FUA | REQ_PREFLUSH; 151 op_flags |= REQ_SYNC; 152 153 bio = bio_alloc_drbd(GFP_NOIO); 154 bio_set_dev(bio, bdev->md_bdev); 155 bio->bi_iter.bi_sector = sector; 156 err = -EIO; 157 if (bio_add_page(bio, device->md_io.page, size, 0) != size) 158 goto out; 159 bio->bi_private = device; 160 bio->bi_end_io = drbd_md_endio; 161 bio_set_op_attrs(bio, op, op_flags); 162 163 if (op != REQ_OP_WRITE && device->state.disk == D_DISKLESS && device->ldev == NULL) 164 /* special case, drbd_md_read() during drbd_adm_attach(): no get_ldev */ 165 ; 166 else if (!get_ldev_if_state(device, D_ATTACHING)) { 167 /* Corresponding put_ldev in drbd_md_endio() */ 168 drbd_err(device, "ASSERT FAILED: get_ldev_if_state() == 1 in _drbd_md_sync_page_io()\n"); 169 err = -ENODEV; 170 goto out; 171 } 172 173 bio_get(bio); /* one bio_put() is in the completion handler */ 174 atomic_inc(&device->md_io.in_use); /* drbd_md_put_buffer() is in the completion handler */ 175 device->md_io.submit_jif = jiffies; 176 if (drbd_insert_fault(device, (op == REQ_OP_WRITE) ? DRBD_FAULT_MD_WR : DRBD_FAULT_MD_RD)) 177 bio_io_error(bio); 178 else 179 submit_bio(bio); 180 wait_until_done_or_force_detached(device, bdev, &device->md_io.done); 181 if (!bio->bi_status) 182 err = device->md_io.error; 183 184 out: 185 bio_put(bio); 186 return err; 187 } 188 189 int drbd_md_sync_page_io(struct drbd_device *device, struct drbd_backing_dev *bdev, 190 sector_t sector, int op) 191 { 192 int err; 193 D_ASSERT(device, atomic_read(&device->md_io.in_use) == 1); 194 195 BUG_ON(!bdev->md_bdev); 196 197 dynamic_drbd_dbg(device, "meta_data io: %s [%d]:%s(,%llus,%s) %pS\n", 198 current->comm, current->pid, __func__, 199 (unsigned long long)sector, (op == REQ_OP_WRITE) ? "WRITE" : "READ", 200 (void*)_RET_IP_ ); 201 202 if (sector < drbd_md_first_sector(bdev) || 203 sector + 7 > drbd_md_last_sector(bdev)) 204 drbd_alert(device, "%s [%d]:%s(,%llus,%s) out of range md access!\n", 205 current->comm, current->pid, __func__, 206 (unsigned long long)sector, 207 (op == REQ_OP_WRITE) ? "WRITE" : "READ"); 208 209 err = _drbd_md_sync_page_io(device, bdev, sector, op); 210 if (err) { 211 drbd_err(device, "drbd_md_sync_page_io(,%llus,%s) failed with error %d\n", 212 (unsigned long long)sector, 213 (op == REQ_OP_WRITE) ? "WRITE" : "READ", err); 214 } 215 return err; 216 } 217 218 static struct bm_extent *find_active_resync_extent(struct drbd_device *device, unsigned int enr) 219 { 220 struct lc_element *tmp; 221 tmp = lc_find(device->resync, enr/AL_EXT_PER_BM_SECT); 222 if (unlikely(tmp != NULL)) { 223 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce); 224 if (test_bit(BME_NO_WRITES, &bm_ext->flags)) 225 return bm_ext; 226 } 227 return NULL; 228 } 229 230 static struct lc_element *_al_get(struct drbd_device *device, unsigned int enr, bool nonblock) 231 { 232 struct lc_element *al_ext; 233 struct bm_extent *bm_ext; 234 int wake; 235 236 spin_lock_irq(&device->al_lock); 237 bm_ext = find_active_resync_extent(device, enr); 238 if (bm_ext) { 239 wake = !test_and_set_bit(BME_PRIORITY, &bm_ext->flags); 240 spin_unlock_irq(&device->al_lock); 241 if (wake) 242 wake_up(&device->al_wait); 243 return NULL; 244 } 245 if (nonblock) 246 al_ext = lc_try_get(device->act_log, enr); 247 else 248 al_ext = lc_get(device->act_log, enr); 249 spin_unlock_irq(&device->al_lock); 250 return al_ext; 251 } 252 253 bool drbd_al_begin_io_fastpath(struct drbd_device *device, struct drbd_interval *i) 254 { 255 /* for bios crossing activity log extent boundaries, 256 * we may need to activate two extents in one go */ 257 unsigned first = i->sector >> (AL_EXTENT_SHIFT-9); 258 unsigned last = i->size == 0 ? first : (i->sector + (i->size >> 9) - 1) >> (AL_EXTENT_SHIFT-9); 259 260 D_ASSERT(device, first <= last); 261 D_ASSERT(device, atomic_read(&device->local_cnt) > 0); 262 263 /* FIXME figure out a fast path for bios crossing AL extent boundaries */ 264 if (first != last) 265 return false; 266 267 return _al_get(device, first, true); 268 } 269 270 bool drbd_al_begin_io_prepare(struct drbd_device *device, struct drbd_interval *i) 271 { 272 /* for bios crossing activity log extent boundaries, 273 * we may need to activate two extents in one go */ 274 unsigned first = i->sector >> (AL_EXTENT_SHIFT-9); 275 unsigned last = i->size == 0 ? first : (i->sector + (i->size >> 9) - 1) >> (AL_EXTENT_SHIFT-9); 276 unsigned enr; 277 bool need_transaction = false; 278 279 D_ASSERT(device, first <= last); 280 D_ASSERT(device, atomic_read(&device->local_cnt) > 0); 281 282 for (enr = first; enr <= last; enr++) { 283 struct lc_element *al_ext; 284 wait_event(device->al_wait, 285 (al_ext = _al_get(device, enr, false)) != NULL); 286 if (al_ext->lc_number != enr) 287 need_transaction = true; 288 } 289 return need_transaction; 290 } 291 292 #if (PAGE_SHIFT + 3) < (AL_EXTENT_SHIFT - BM_BLOCK_SHIFT) 293 /* Currently BM_BLOCK_SHIFT, BM_EXT_SHIFT and AL_EXTENT_SHIFT 294 * are still coupled, or assume too much about their relation. 295 * Code below will not work if this is violated. 296 * Will be cleaned up with some followup patch. 297 */ 298 # error FIXME 299 #endif 300 301 static unsigned int al_extent_to_bm_page(unsigned int al_enr) 302 { 303 return al_enr >> 304 /* bit to page */ 305 ((PAGE_SHIFT + 3) - 306 /* al extent number to bit */ 307 (AL_EXTENT_SHIFT - BM_BLOCK_SHIFT)); 308 } 309 310 static sector_t al_tr_number_to_on_disk_sector(struct drbd_device *device) 311 { 312 const unsigned int stripes = device->ldev->md.al_stripes; 313 const unsigned int stripe_size_4kB = device->ldev->md.al_stripe_size_4k; 314 315 /* transaction number, modulo on-disk ring buffer wrap around */ 316 unsigned int t = device->al_tr_number % (device->ldev->md.al_size_4k); 317 318 /* ... to aligned 4k on disk block */ 319 t = ((t % stripes) * stripe_size_4kB) + t/stripes; 320 321 /* ... to 512 byte sector in activity log */ 322 t *= 8; 323 324 /* ... plus offset to the on disk position */ 325 return device->ldev->md.md_offset + device->ldev->md.al_offset + t; 326 } 327 328 static int __al_write_transaction(struct drbd_device *device, struct al_transaction_on_disk *buffer) 329 { 330 struct lc_element *e; 331 sector_t sector; 332 int i, mx; 333 unsigned extent_nr; 334 unsigned crc = 0; 335 int err = 0; 336 337 memset(buffer, 0, sizeof(*buffer)); 338 buffer->magic = cpu_to_be32(DRBD_AL_MAGIC); 339 buffer->tr_number = cpu_to_be32(device->al_tr_number); 340 341 i = 0; 342 343 drbd_bm_reset_al_hints(device); 344 345 /* Even though no one can start to change this list 346 * once we set the LC_LOCKED -- from drbd_al_begin_io(), 347 * lc_try_lock_for_transaction() --, someone may still 348 * be in the process of changing it. */ 349 spin_lock_irq(&device->al_lock); 350 list_for_each_entry(e, &device->act_log->to_be_changed, list) { 351 if (i == AL_UPDATES_PER_TRANSACTION) { 352 i++; 353 break; 354 } 355 buffer->update_slot_nr[i] = cpu_to_be16(e->lc_index); 356 buffer->update_extent_nr[i] = cpu_to_be32(e->lc_new_number); 357 if (e->lc_number != LC_FREE) 358 drbd_bm_mark_for_writeout(device, 359 al_extent_to_bm_page(e->lc_number)); 360 i++; 361 } 362 spin_unlock_irq(&device->al_lock); 363 BUG_ON(i > AL_UPDATES_PER_TRANSACTION); 364 365 buffer->n_updates = cpu_to_be16(i); 366 for ( ; i < AL_UPDATES_PER_TRANSACTION; i++) { 367 buffer->update_slot_nr[i] = cpu_to_be16(-1); 368 buffer->update_extent_nr[i] = cpu_to_be32(LC_FREE); 369 } 370 371 buffer->context_size = cpu_to_be16(device->act_log->nr_elements); 372 buffer->context_start_slot_nr = cpu_to_be16(device->al_tr_cycle); 373 374 mx = min_t(int, AL_CONTEXT_PER_TRANSACTION, 375 device->act_log->nr_elements - device->al_tr_cycle); 376 for (i = 0; i < mx; i++) { 377 unsigned idx = device->al_tr_cycle + i; 378 extent_nr = lc_element_by_index(device->act_log, idx)->lc_number; 379 buffer->context[i] = cpu_to_be32(extent_nr); 380 } 381 for (; i < AL_CONTEXT_PER_TRANSACTION; i++) 382 buffer->context[i] = cpu_to_be32(LC_FREE); 383 384 device->al_tr_cycle += AL_CONTEXT_PER_TRANSACTION; 385 if (device->al_tr_cycle >= device->act_log->nr_elements) 386 device->al_tr_cycle = 0; 387 388 sector = al_tr_number_to_on_disk_sector(device); 389 390 crc = crc32c(0, buffer, 4096); 391 buffer->crc32c = cpu_to_be32(crc); 392 393 if (drbd_bm_write_hinted(device)) 394 err = -EIO; 395 else { 396 bool write_al_updates; 397 rcu_read_lock(); 398 write_al_updates = rcu_dereference(device->ldev->disk_conf)->al_updates; 399 rcu_read_unlock(); 400 if (write_al_updates) { 401 if (drbd_md_sync_page_io(device, device->ldev, sector, WRITE)) { 402 err = -EIO; 403 drbd_chk_io_error(device, 1, DRBD_META_IO_ERROR); 404 } else { 405 device->al_tr_number++; 406 device->al_writ_cnt++; 407 } 408 } 409 } 410 411 return err; 412 } 413 414 static int al_write_transaction(struct drbd_device *device) 415 { 416 struct al_transaction_on_disk *buffer; 417 int err; 418 419 if (!get_ldev(device)) { 420 drbd_err(device, "disk is %s, cannot start al transaction\n", 421 drbd_disk_str(device->state.disk)); 422 return -EIO; 423 } 424 425 /* The bitmap write may have failed, causing a state change. */ 426 if (device->state.disk < D_INCONSISTENT) { 427 drbd_err(device, 428 "disk is %s, cannot write al transaction\n", 429 drbd_disk_str(device->state.disk)); 430 put_ldev(device); 431 return -EIO; 432 } 433 434 /* protects md_io_buffer, al_tr_cycle, ... */ 435 buffer = drbd_md_get_buffer(device, __func__); 436 if (!buffer) { 437 drbd_err(device, "disk failed while waiting for md_io buffer\n"); 438 put_ldev(device); 439 return -ENODEV; 440 } 441 442 err = __al_write_transaction(device, buffer); 443 444 drbd_md_put_buffer(device); 445 put_ldev(device); 446 447 return err; 448 } 449 450 451 void drbd_al_begin_io_commit(struct drbd_device *device) 452 { 453 bool locked = false; 454 455 /* Serialize multiple transactions. 456 * This uses test_and_set_bit, memory barrier is implicit. 457 */ 458 wait_event(device->al_wait, 459 device->act_log->pending_changes == 0 || 460 (locked = lc_try_lock_for_transaction(device->act_log))); 461 462 if (locked) { 463 /* Double check: it may have been committed by someone else, 464 * while we have been waiting for the lock. */ 465 if (device->act_log->pending_changes) { 466 bool write_al_updates; 467 468 rcu_read_lock(); 469 write_al_updates = rcu_dereference(device->ldev->disk_conf)->al_updates; 470 rcu_read_unlock(); 471 472 if (write_al_updates) 473 al_write_transaction(device); 474 spin_lock_irq(&device->al_lock); 475 /* FIXME 476 if (err) 477 we need an "lc_cancel" here; 478 */ 479 lc_committed(device->act_log); 480 spin_unlock_irq(&device->al_lock); 481 } 482 lc_unlock(device->act_log); 483 wake_up(&device->al_wait); 484 } 485 } 486 487 /* 488 * @delegate: delegate activity log I/O to the worker thread 489 */ 490 void drbd_al_begin_io(struct drbd_device *device, struct drbd_interval *i) 491 { 492 if (drbd_al_begin_io_prepare(device, i)) 493 drbd_al_begin_io_commit(device); 494 } 495 496 int drbd_al_begin_io_nonblock(struct drbd_device *device, struct drbd_interval *i) 497 { 498 struct lru_cache *al = device->act_log; 499 /* for bios crossing activity log extent boundaries, 500 * we may need to activate two extents in one go */ 501 unsigned first = i->sector >> (AL_EXTENT_SHIFT-9); 502 unsigned last = i->size == 0 ? first : (i->sector + (i->size >> 9) - 1) >> (AL_EXTENT_SHIFT-9); 503 unsigned nr_al_extents; 504 unsigned available_update_slots; 505 unsigned enr; 506 507 D_ASSERT(device, first <= last); 508 509 nr_al_extents = 1 + last - first; /* worst case: all touched extends are cold. */ 510 available_update_slots = min(al->nr_elements - al->used, 511 al->max_pending_changes - al->pending_changes); 512 513 /* We want all necessary updates for a given request within the same transaction 514 * We could first check how many updates are *actually* needed, 515 * and use that instead of the worst-case nr_al_extents */ 516 if (available_update_slots < nr_al_extents) { 517 /* Too many activity log extents are currently "hot". 518 * 519 * If we have accumulated pending changes already, 520 * we made progress. 521 * 522 * If we cannot get even a single pending change through, 523 * stop the fast path until we made some progress, 524 * or requests to "cold" extents could be starved. */ 525 if (!al->pending_changes) 526 __set_bit(__LC_STARVING, &device->act_log->flags); 527 return -ENOBUFS; 528 } 529 530 /* Is resync active in this area? */ 531 for (enr = first; enr <= last; enr++) { 532 struct lc_element *tmp; 533 tmp = lc_find(device->resync, enr/AL_EXT_PER_BM_SECT); 534 if (unlikely(tmp != NULL)) { 535 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce); 536 if (test_bit(BME_NO_WRITES, &bm_ext->flags)) { 537 if (!test_and_set_bit(BME_PRIORITY, &bm_ext->flags)) 538 return -EBUSY; 539 return -EWOULDBLOCK; 540 } 541 } 542 } 543 544 /* Checkout the refcounts. 545 * Given that we checked for available elements and update slots above, 546 * this has to be successful. */ 547 for (enr = first; enr <= last; enr++) { 548 struct lc_element *al_ext; 549 al_ext = lc_get_cumulative(device->act_log, enr); 550 if (!al_ext) 551 drbd_info(device, "LOGIC BUG for enr=%u\n", enr); 552 } 553 return 0; 554 } 555 556 void drbd_al_complete_io(struct drbd_device *device, struct drbd_interval *i) 557 { 558 /* for bios crossing activity log extent boundaries, 559 * we may need to activate two extents in one go */ 560 unsigned first = i->sector >> (AL_EXTENT_SHIFT-9); 561 unsigned last = i->size == 0 ? first : (i->sector + (i->size >> 9) - 1) >> (AL_EXTENT_SHIFT-9); 562 unsigned enr; 563 struct lc_element *extent; 564 unsigned long flags; 565 566 D_ASSERT(device, first <= last); 567 spin_lock_irqsave(&device->al_lock, flags); 568 569 for (enr = first; enr <= last; enr++) { 570 extent = lc_find(device->act_log, enr); 571 if (!extent) { 572 drbd_err(device, "al_complete_io() called on inactive extent %u\n", enr); 573 continue; 574 } 575 lc_put(device->act_log, extent); 576 } 577 spin_unlock_irqrestore(&device->al_lock, flags); 578 wake_up(&device->al_wait); 579 } 580 581 static int _try_lc_del(struct drbd_device *device, struct lc_element *al_ext) 582 { 583 int rv; 584 585 spin_lock_irq(&device->al_lock); 586 rv = (al_ext->refcnt == 0); 587 if (likely(rv)) 588 lc_del(device->act_log, al_ext); 589 spin_unlock_irq(&device->al_lock); 590 591 return rv; 592 } 593 594 /** 595 * drbd_al_shrink() - Removes all active extents form the activity log 596 * @device: DRBD device. 597 * 598 * Removes all active extents form the activity log, waiting until 599 * the reference count of each entry dropped to 0 first, of course. 600 * 601 * You need to lock device->act_log with lc_try_lock() / lc_unlock() 602 */ 603 void drbd_al_shrink(struct drbd_device *device) 604 { 605 struct lc_element *al_ext; 606 int i; 607 608 D_ASSERT(device, test_bit(__LC_LOCKED, &device->act_log->flags)); 609 610 for (i = 0; i < device->act_log->nr_elements; i++) { 611 al_ext = lc_element_by_index(device->act_log, i); 612 if (al_ext->lc_number == LC_FREE) 613 continue; 614 wait_event(device->al_wait, _try_lc_del(device, al_ext)); 615 } 616 617 wake_up(&device->al_wait); 618 } 619 620 int drbd_al_initialize(struct drbd_device *device, void *buffer) 621 { 622 struct al_transaction_on_disk *al = buffer; 623 struct drbd_md *md = &device->ldev->md; 624 int al_size_4k = md->al_stripes * md->al_stripe_size_4k; 625 int i; 626 627 __al_write_transaction(device, al); 628 /* There may or may not have been a pending transaction. */ 629 spin_lock_irq(&device->al_lock); 630 lc_committed(device->act_log); 631 spin_unlock_irq(&device->al_lock); 632 633 /* The rest of the transactions will have an empty "updates" list, and 634 * are written out only to provide the context, and to initialize the 635 * on-disk ring buffer. */ 636 for (i = 1; i < al_size_4k; i++) { 637 int err = __al_write_transaction(device, al); 638 if (err) 639 return err; 640 } 641 return 0; 642 } 643 644 static const char *drbd_change_sync_fname[] = { 645 [RECORD_RS_FAILED] = "drbd_rs_failed_io", 646 [SET_IN_SYNC] = "drbd_set_in_sync", 647 [SET_OUT_OF_SYNC] = "drbd_set_out_of_sync" 648 }; 649 650 /* ATTENTION. The AL's extents are 4MB each, while the extents in the 651 * resync LRU-cache are 16MB each. 652 * The caller of this function has to hold an get_ldev() reference. 653 * 654 * Adjusts the caching members ->rs_left (success) or ->rs_failed (!success), 655 * potentially pulling in (and recounting the corresponding bits) 656 * this resync extent into the resync extent lru cache. 657 * 658 * Returns whether all bits have been cleared for this resync extent, 659 * precisely: (rs_left <= rs_failed) 660 * 661 * TODO will be obsoleted once we have a caching lru of the on disk bitmap 662 */ 663 static bool update_rs_extent(struct drbd_device *device, 664 unsigned int enr, int count, 665 enum update_sync_bits_mode mode) 666 { 667 struct lc_element *e; 668 669 D_ASSERT(device, atomic_read(&device->local_cnt)); 670 671 /* When setting out-of-sync bits, 672 * we don't need it cached (lc_find). 673 * But if it is present in the cache, 674 * we should update the cached bit count. 675 * Otherwise, that extent should be in the resync extent lru cache 676 * already -- or we want to pull it in if necessary -- (lc_get), 677 * then update and check rs_left and rs_failed. */ 678 if (mode == SET_OUT_OF_SYNC) 679 e = lc_find(device->resync, enr); 680 else 681 e = lc_get(device->resync, enr); 682 if (e) { 683 struct bm_extent *ext = lc_entry(e, struct bm_extent, lce); 684 if (ext->lce.lc_number == enr) { 685 if (mode == SET_IN_SYNC) 686 ext->rs_left -= count; 687 else if (mode == SET_OUT_OF_SYNC) 688 ext->rs_left += count; 689 else 690 ext->rs_failed += count; 691 if (ext->rs_left < ext->rs_failed) { 692 drbd_warn(device, "BAD! enr=%u rs_left=%d " 693 "rs_failed=%d count=%d cstate=%s\n", 694 ext->lce.lc_number, ext->rs_left, 695 ext->rs_failed, count, 696 drbd_conn_str(device->state.conn)); 697 698 /* We don't expect to be able to clear more bits 699 * than have been set when we originally counted 700 * the set bits to cache that value in ext->rs_left. 701 * Whatever the reason (disconnect during resync, 702 * delayed local completion of an application write), 703 * try to fix it up by recounting here. */ 704 ext->rs_left = drbd_bm_e_weight(device, enr); 705 } 706 } else { 707 /* Normally this element should be in the cache, 708 * since drbd_rs_begin_io() pulled it already in. 709 * 710 * But maybe an application write finished, and we set 711 * something outside the resync lru_cache in sync. 712 */ 713 int rs_left = drbd_bm_e_weight(device, enr); 714 if (ext->flags != 0) { 715 drbd_warn(device, "changing resync lce: %d[%u;%02lx]" 716 " -> %d[%u;00]\n", 717 ext->lce.lc_number, ext->rs_left, 718 ext->flags, enr, rs_left); 719 ext->flags = 0; 720 } 721 if (ext->rs_failed) { 722 drbd_warn(device, "Kicking resync_lru element enr=%u " 723 "out with rs_failed=%d\n", 724 ext->lce.lc_number, ext->rs_failed); 725 } 726 ext->rs_left = rs_left; 727 ext->rs_failed = (mode == RECORD_RS_FAILED) ? count : 0; 728 /* we don't keep a persistent log of the resync lru, 729 * we can commit any change right away. */ 730 lc_committed(device->resync); 731 } 732 if (mode != SET_OUT_OF_SYNC) 733 lc_put(device->resync, &ext->lce); 734 /* no race, we are within the al_lock! */ 735 736 if (ext->rs_left <= ext->rs_failed) { 737 ext->rs_failed = 0; 738 return true; 739 } 740 } else if (mode != SET_OUT_OF_SYNC) { 741 /* be quiet if lc_find() did not find it. */ 742 drbd_err(device, "lc_get() failed! locked=%d/%d flags=%lu\n", 743 device->resync_locked, 744 device->resync->nr_elements, 745 device->resync->flags); 746 } 747 return false; 748 } 749 750 void drbd_advance_rs_marks(struct drbd_device *device, unsigned long still_to_go) 751 { 752 unsigned long now = jiffies; 753 unsigned long last = device->rs_mark_time[device->rs_last_mark]; 754 int next = (device->rs_last_mark + 1) % DRBD_SYNC_MARKS; 755 if (time_after_eq(now, last + DRBD_SYNC_MARK_STEP)) { 756 if (device->rs_mark_left[device->rs_last_mark] != still_to_go && 757 device->state.conn != C_PAUSED_SYNC_T && 758 device->state.conn != C_PAUSED_SYNC_S) { 759 device->rs_mark_time[next] = now; 760 device->rs_mark_left[next] = still_to_go; 761 device->rs_last_mark = next; 762 } 763 } 764 } 765 766 /* It is called lazy update, so don't do write-out too often. */ 767 static bool lazy_bitmap_update_due(struct drbd_device *device) 768 { 769 return time_after(jiffies, device->rs_last_bcast + 2*HZ); 770 } 771 772 static void maybe_schedule_on_disk_bitmap_update(struct drbd_device *device, bool rs_done) 773 { 774 if (rs_done) { 775 struct drbd_connection *connection = first_peer_device(device)->connection; 776 if (connection->agreed_pro_version <= 95 || 777 is_sync_target_state(device->state.conn)) 778 set_bit(RS_DONE, &device->flags); 779 /* and also set RS_PROGRESS below */ 780 781 /* Else: rather wait for explicit notification via receive_state, 782 * to avoid uuids-rotated-too-fast causing full resync 783 * in next handshake, in case the replication link breaks 784 * at the most unfortunate time... */ 785 } else if (!lazy_bitmap_update_due(device)) 786 return; 787 788 drbd_device_post_work(device, RS_PROGRESS); 789 } 790 791 static int update_sync_bits(struct drbd_device *device, 792 unsigned long sbnr, unsigned long ebnr, 793 enum update_sync_bits_mode mode) 794 { 795 /* 796 * We keep a count of set bits per resync-extent in the ->rs_left 797 * caching member, so we need to loop and work within the resync extent 798 * alignment. Typically this loop will execute exactly once. 799 */ 800 unsigned long flags; 801 unsigned long count = 0; 802 unsigned int cleared = 0; 803 while (sbnr <= ebnr) { 804 /* set temporary boundary bit number to last bit number within 805 * the resync extent of the current start bit number, 806 * but cap at provided end bit number */ 807 unsigned long tbnr = min(ebnr, sbnr | BM_BLOCKS_PER_BM_EXT_MASK); 808 unsigned long c; 809 810 if (mode == RECORD_RS_FAILED) 811 /* Only called from drbd_rs_failed_io(), bits 812 * supposedly still set. Recount, maybe some 813 * of the bits have been successfully cleared 814 * by application IO meanwhile. 815 */ 816 c = drbd_bm_count_bits(device, sbnr, tbnr); 817 else if (mode == SET_IN_SYNC) 818 c = drbd_bm_clear_bits(device, sbnr, tbnr); 819 else /* if (mode == SET_OUT_OF_SYNC) */ 820 c = drbd_bm_set_bits(device, sbnr, tbnr); 821 822 if (c) { 823 spin_lock_irqsave(&device->al_lock, flags); 824 cleared += update_rs_extent(device, BM_BIT_TO_EXT(sbnr), c, mode); 825 spin_unlock_irqrestore(&device->al_lock, flags); 826 count += c; 827 } 828 sbnr = tbnr + 1; 829 } 830 if (count) { 831 if (mode == SET_IN_SYNC) { 832 unsigned long still_to_go = drbd_bm_total_weight(device); 833 bool rs_is_done = (still_to_go <= device->rs_failed); 834 drbd_advance_rs_marks(device, still_to_go); 835 if (cleared || rs_is_done) 836 maybe_schedule_on_disk_bitmap_update(device, rs_is_done); 837 } else if (mode == RECORD_RS_FAILED) 838 device->rs_failed += count; 839 wake_up(&device->al_wait); 840 } 841 return count; 842 } 843 844 static bool plausible_request_size(int size) 845 { 846 return size > 0 847 && size <= DRBD_MAX_BATCH_BIO_SIZE 848 && IS_ALIGNED(size, 512); 849 } 850 851 /* clear the bit corresponding to the piece of storage in question: 852 * size byte of data starting from sector. Only clear a bits of the affected 853 * one ore more _aligned_ BM_BLOCK_SIZE blocks. 854 * 855 * called by worker on C_SYNC_TARGET and receiver on SyncSource. 856 * 857 */ 858 int __drbd_change_sync(struct drbd_device *device, sector_t sector, int size, 859 enum update_sync_bits_mode mode) 860 { 861 /* Is called from worker and receiver context _only_ */ 862 unsigned long sbnr, ebnr, lbnr; 863 unsigned long count = 0; 864 sector_t esector, nr_sectors; 865 866 /* This would be an empty REQ_PREFLUSH, be silent. */ 867 if ((mode == SET_OUT_OF_SYNC) && size == 0) 868 return 0; 869 870 if (!plausible_request_size(size)) { 871 drbd_err(device, "%s: sector=%llus size=%d nonsense!\n", 872 drbd_change_sync_fname[mode], 873 (unsigned long long)sector, size); 874 return 0; 875 } 876 877 if (!get_ldev(device)) 878 return 0; /* no disk, no metadata, no bitmap to manipulate bits in */ 879 880 nr_sectors = drbd_get_capacity(device->this_bdev); 881 esector = sector + (size >> 9) - 1; 882 883 if (!expect(sector < nr_sectors)) 884 goto out; 885 if (!expect(esector < nr_sectors)) 886 esector = nr_sectors - 1; 887 888 lbnr = BM_SECT_TO_BIT(nr_sectors-1); 889 890 if (mode == SET_IN_SYNC) { 891 /* Round up start sector, round down end sector. We make sure 892 * we only clear full, aligned, BM_BLOCK_SIZE blocks. */ 893 if (unlikely(esector < BM_SECT_PER_BIT-1)) 894 goto out; 895 if (unlikely(esector == (nr_sectors-1))) 896 ebnr = lbnr; 897 else 898 ebnr = BM_SECT_TO_BIT(esector - (BM_SECT_PER_BIT-1)); 899 sbnr = BM_SECT_TO_BIT(sector + BM_SECT_PER_BIT-1); 900 } else { 901 /* We set it out of sync, or record resync failure. 902 * Should not round anything here. */ 903 sbnr = BM_SECT_TO_BIT(sector); 904 ebnr = BM_SECT_TO_BIT(esector); 905 } 906 907 count = update_sync_bits(device, sbnr, ebnr, mode); 908 out: 909 put_ldev(device); 910 return count; 911 } 912 913 static 914 struct bm_extent *_bme_get(struct drbd_device *device, unsigned int enr) 915 { 916 struct lc_element *e; 917 struct bm_extent *bm_ext; 918 int wakeup = 0; 919 unsigned long rs_flags; 920 921 spin_lock_irq(&device->al_lock); 922 if (device->resync_locked > device->resync->nr_elements/2) { 923 spin_unlock_irq(&device->al_lock); 924 return NULL; 925 } 926 e = lc_get(device->resync, enr); 927 bm_ext = e ? lc_entry(e, struct bm_extent, lce) : NULL; 928 if (bm_ext) { 929 if (bm_ext->lce.lc_number != enr) { 930 bm_ext->rs_left = drbd_bm_e_weight(device, enr); 931 bm_ext->rs_failed = 0; 932 lc_committed(device->resync); 933 wakeup = 1; 934 } 935 if (bm_ext->lce.refcnt == 1) 936 device->resync_locked++; 937 set_bit(BME_NO_WRITES, &bm_ext->flags); 938 } 939 rs_flags = device->resync->flags; 940 spin_unlock_irq(&device->al_lock); 941 if (wakeup) 942 wake_up(&device->al_wait); 943 944 if (!bm_ext) { 945 if (rs_flags & LC_STARVING) 946 drbd_warn(device, "Have to wait for element" 947 " (resync LRU too small?)\n"); 948 BUG_ON(rs_flags & LC_LOCKED); 949 } 950 951 return bm_ext; 952 } 953 954 static int _is_in_al(struct drbd_device *device, unsigned int enr) 955 { 956 int rv; 957 958 spin_lock_irq(&device->al_lock); 959 rv = lc_is_used(device->act_log, enr); 960 spin_unlock_irq(&device->al_lock); 961 962 return rv; 963 } 964 965 /** 966 * drbd_rs_begin_io() - Gets an extent in the resync LRU cache and sets it to BME_LOCKED 967 * @device: DRBD device. 968 * @sector: The sector number. 969 * 970 * This functions sleeps on al_wait. Returns 0 on success, -EINTR if interrupted. 971 */ 972 int drbd_rs_begin_io(struct drbd_device *device, sector_t sector) 973 { 974 unsigned int enr = BM_SECT_TO_EXT(sector); 975 struct bm_extent *bm_ext; 976 int i, sig; 977 bool sa; 978 979 retry: 980 sig = wait_event_interruptible(device->al_wait, 981 (bm_ext = _bme_get(device, enr))); 982 if (sig) 983 return -EINTR; 984 985 if (test_bit(BME_LOCKED, &bm_ext->flags)) 986 return 0; 987 988 /* step aside only while we are above c-min-rate; unless disabled. */ 989 sa = drbd_rs_c_min_rate_throttle(device); 990 991 for (i = 0; i < AL_EXT_PER_BM_SECT; i++) { 992 sig = wait_event_interruptible(device->al_wait, 993 !_is_in_al(device, enr * AL_EXT_PER_BM_SECT + i) || 994 (sa && test_bit(BME_PRIORITY, &bm_ext->flags))); 995 996 if (sig || (sa && test_bit(BME_PRIORITY, &bm_ext->flags))) { 997 spin_lock_irq(&device->al_lock); 998 if (lc_put(device->resync, &bm_ext->lce) == 0) { 999 bm_ext->flags = 0; /* clears BME_NO_WRITES and eventually BME_PRIORITY */ 1000 device->resync_locked--; 1001 wake_up(&device->al_wait); 1002 } 1003 spin_unlock_irq(&device->al_lock); 1004 if (sig) 1005 return -EINTR; 1006 if (schedule_timeout_interruptible(HZ/10)) 1007 return -EINTR; 1008 goto retry; 1009 } 1010 } 1011 set_bit(BME_LOCKED, &bm_ext->flags); 1012 return 0; 1013 } 1014 1015 /** 1016 * drbd_try_rs_begin_io() - Gets an extent in the resync LRU cache, does not sleep 1017 * @device: DRBD device. 1018 * @sector: The sector number. 1019 * 1020 * Gets an extent in the resync LRU cache, sets it to BME_NO_WRITES, then 1021 * tries to set it to BME_LOCKED. Returns 0 upon success, and -EAGAIN 1022 * if there is still application IO going on in this area. 1023 */ 1024 int drbd_try_rs_begin_io(struct drbd_device *device, sector_t sector) 1025 { 1026 unsigned int enr = BM_SECT_TO_EXT(sector); 1027 const unsigned int al_enr = enr*AL_EXT_PER_BM_SECT; 1028 struct lc_element *e; 1029 struct bm_extent *bm_ext; 1030 int i; 1031 bool throttle = drbd_rs_should_slow_down(device, sector, true); 1032 1033 /* If we need to throttle, a half-locked (only marked BME_NO_WRITES, 1034 * not yet BME_LOCKED) extent needs to be kicked out explicitly if we 1035 * need to throttle. There is at most one such half-locked extent, 1036 * which is remembered in resync_wenr. */ 1037 1038 if (throttle && device->resync_wenr != enr) 1039 return -EAGAIN; 1040 1041 spin_lock_irq(&device->al_lock); 1042 if (device->resync_wenr != LC_FREE && device->resync_wenr != enr) { 1043 /* in case you have very heavy scattered io, it may 1044 * stall the syncer undefined if we give up the ref count 1045 * when we try again and requeue. 1046 * 1047 * if we don't give up the refcount, but the next time 1048 * we are scheduled this extent has been "synced" by new 1049 * application writes, we'd miss the lc_put on the 1050 * extent we keep the refcount on. 1051 * so we remembered which extent we had to try again, and 1052 * if the next requested one is something else, we do 1053 * the lc_put here... 1054 * we also have to wake_up 1055 */ 1056 e = lc_find(device->resync, device->resync_wenr); 1057 bm_ext = e ? lc_entry(e, struct bm_extent, lce) : NULL; 1058 if (bm_ext) { 1059 D_ASSERT(device, !test_bit(BME_LOCKED, &bm_ext->flags)); 1060 D_ASSERT(device, test_bit(BME_NO_WRITES, &bm_ext->flags)); 1061 clear_bit(BME_NO_WRITES, &bm_ext->flags); 1062 device->resync_wenr = LC_FREE; 1063 if (lc_put(device->resync, &bm_ext->lce) == 0) { 1064 bm_ext->flags = 0; 1065 device->resync_locked--; 1066 } 1067 wake_up(&device->al_wait); 1068 } else { 1069 drbd_alert(device, "LOGIC BUG\n"); 1070 } 1071 } 1072 /* TRY. */ 1073 e = lc_try_get(device->resync, enr); 1074 bm_ext = e ? lc_entry(e, struct bm_extent, lce) : NULL; 1075 if (bm_ext) { 1076 if (test_bit(BME_LOCKED, &bm_ext->flags)) 1077 goto proceed; 1078 if (!test_and_set_bit(BME_NO_WRITES, &bm_ext->flags)) { 1079 device->resync_locked++; 1080 } else { 1081 /* we did set the BME_NO_WRITES, 1082 * but then could not set BME_LOCKED, 1083 * so we tried again. 1084 * drop the extra reference. */ 1085 bm_ext->lce.refcnt--; 1086 D_ASSERT(device, bm_ext->lce.refcnt > 0); 1087 } 1088 goto check_al; 1089 } else { 1090 /* do we rather want to try later? */ 1091 if (device->resync_locked > device->resync->nr_elements-3) 1092 goto try_again; 1093 /* Do or do not. There is no try. -- Yoda */ 1094 e = lc_get(device->resync, enr); 1095 bm_ext = e ? lc_entry(e, struct bm_extent, lce) : NULL; 1096 if (!bm_ext) { 1097 const unsigned long rs_flags = device->resync->flags; 1098 if (rs_flags & LC_STARVING) 1099 drbd_warn(device, "Have to wait for element" 1100 " (resync LRU too small?)\n"); 1101 BUG_ON(rs_flags & LC_LOCKED); 1102 goto try_again; 1103 } 1104 if (bm_ext->lce.lc_number != enr) { 1105 bm_ext->rs_left = drbd_bm_e_weight(device, enr); 1106 bm_ext->rs_failed = 0; 1107 lc_committed(device->resync); 1108 wake_up(&device->al_wait); 1109 D_ASSERT(device, test_bit(BME_LOCKED, &bm_ext->flags) == 0); 1110 } 1111 set_bit(BME_NO_WRITES, &bm_ext->flags); 1112 D_ASSERT(device, bm_ext->lce.refcnt == 1); 1113 device->resync_locked++; 1114 goto check_al; 1115 } 1116 check_al: 1117 for (i = 0; i < AL_EXT_PER_BM_SECT; i++) { 1118 if (lc_is_used(device->act_log, al_enr+i)) 1119 goto try_again; 1120 } 1121 set_bit(BME_LOCKED, &bm_ext->flags); 1122 proceed: 1123 device->resync_wenr = LC_FREE; 1124 spin_unlock_irq(&device->al_lock); 1125 return 0; 1126 1127 try_again: 1128 if (bm_ext) { 1129 if (throttle) { 1130 D_ASSERT(device, !test_bit(BME_LOCKED, &bm_ext->flags)); 1131 D_ASSERT(device, test_bit(BME_NO_WRITES, &bm_ext->flags)); 1132 clear_bit(BME_NO_WRITES, &bm_ext->flags); 1133 device->resync_wenr = LC_FREE; 1134 if (lc_put(device->resync, &bm_ext->lce) == 0) { 1135 bm_ext->flags = 0; 1136 device->resync_locked--; 1137 } 1138 wake_up(&device->al_wait); 1139 } else 1140 device->resync_wenr = enr; 1141 } 1142 spin_unlock_irq(&device->al_lock); 1143 return -EAGAIN; 1144 } 1145 1146 void drbd_rs_complete_io(struct drbd_device *device, sector_t sector) 1147 { 1148 unsigned int enr = BM_SECT_TO_EXT(sector); 1149 struct lc_element *e; 1150 struct bm_extent *bm_ext; 1151 unsigned long flags; 1152 1153 spin_lock_irqsave(&device->al_lock, flags); 1154 e = lc_find(device->resync, enr); 1155 bm_ext = e ? lc_entry(e, struct bm_extent, lce) : NULL; 1156 if (!bm_ext) { 1157 spin_unlock_irqrestore(&device->al_lock, flags); 1158 if (__ratelimit(&drbd_ratelimit_state)) 1159 drbd_err(device, "drbd_rs_complete_io() called, but extent not found\n"); 1160 return; 1161 } 1162 1163 if (bm_ext->lce.refcnt == 0) { 1164 spin_unlock_irqrestore(&device->al_lock, flags); 1165 drbd_err(device, "drbd_rs_complete_io(,%llu [=%u]) called, " 1166 "but refcnt is 0!?\n", 1167 (unsigned long long)sector, enr); 1168 return; 1169 } 1170 1171 if (lc_put(device->resync, &bm_ext->lce) == 0) { 1172 bm_ext->flags = 0; /* clear BME_LOCKED, BME_NO_WRITES and BME_PRIORITY */ 1173 device->resync_locked--; 1174 wake_up(&device->al_wait); 1175 } 1176 1177 spin_unlock_irqrestore(&device->al_lock, flags); 1178 } 1179 1180 /** 1181 * drbd_rs_cancel_all() - Removes all extents from the resync LRU (even BME_LOCKED) 1182 * @device: DRBD device. 1183 */ 1184 void drbd_rs_cancel_all(struct drbd_device *device) 1185 { 1186 spin_lock_irq(&device->al_lock); 1187 1188 if (get_ldev_if_state(device, D_FAILED)) { /* Makes sure ->resync is there. */ 1189 lc_reset(device->resync); 1190 put_ldev(device); 1191 } 1192 device->resync_locked = 0; 1193 device->resync_wenr = LC_FREE; 1194 spin_unlock_irq(&device->al_lock); 1195 wake_up(&device->al_wait); 1196 } 1197 1198 /** 1199 * drbd_rs_del_all() - Gracefully remove all extents from the resync LRU 1200 * @device: DRBD device. 1201 * 1202 * Returns 0 upon success, -EAGAIN if at least one reference count was 1203 * not zero. 1204 */ 1205 int drbd_rs_del_all(struct drbd_device *device) 1206 { 1207 struct lc_element *e; 1208 struct bm_extent *bm_ext; 1209 int i; 1210 1211 spin_lock_irq(&device->al_lock); 1212 1213 if (get_ldev_if_state(device, D_FAILED)) { 1214 /* ok, ->resync is there. */ 1215 for (i = 0; i < device->resync->nr_elements; i++) { 1216 e = lc_element_by_index(device->resync, i); 1217 bm_ext = lc_entry(e, struct bm_extent, lce); 1218 if (bm_ext->lce.lc_number == LC_FREE) 1219 continue; 1220 if (bm_ext->lce.lc_number == device->resync_wenr) { 1221 drbd_info(device, "dropping %u in drbd_rs_del_all, apparently" 1222 " got 'synced' by application io\n", 1223 device->resync_wenr); 1224 D_ASSERT(device, !test_bit(BME_LOCKED, &bm_ext->flags)); 1225 D_ASSERT(device, test_bit(BME_NO_WRITES, &bm_ext->flags)); 1226 clear_bit(BME_NO_WRITES, &bm_ext->flags); 1227 device->resync_wenr = LC_FREE; 1228 lc_put(device->resync, &bm_ext->lce); 1229 } 1230 if (bm_ext->lce.refcnt != 0) { 1231 drbd_info(device, "Retrying drbd_rs_del_all() later. " 1232 "refcnt=%d\n", bm_ext->lce.refcnt); 1233 put_ldev(device); 1234 spin_unlock_irq(&device->al_lock); 1235 return -EAGAIN; 1236 } 1237 D_ASSERT(device, !test_bit(BME_LOCKED, &bm_ext->flags)); 1238 D_ASSERT(device, !test_bit(BME_NO_WRITES, &bm_ext->flags)); 1239 lc_del(device->resync, &bm_ext->lce); 1240 } 1241 D_ASSERT(device, device->resync->used == 0); 1242 put_ldev(device); 1243 } 1244 spin_unlock_irq(&device->al_lock); 1245 wake_up(&device->al_wait); 1246 1247 return 0; 1248 } 1249