1 /* 2 * bcache journalling code, for btree insertions 3 * 4 * Copyright 2012 Google, Inc. 5 */ 6 7 #include "bcache.h" 8 #include "btree.h" 9 #include "debug.h" 10 #include "request.h" 11 12 #include <trace/events/bcache.h> 13 14 /* 15 * Journal replay/recovery: 16 * 17 * This code is all driven from run_cache_set(); we first read the journal 18 * entries, do some other stuff, then we mark all the keys in the journal 19 * entries (same as garbage collection would), then we replay them - reinserting 20 * them into the cache in precisely the same order as they appear in the 21 * journal. 22 * 23 * We only journal keys that go in leaf nodes, which simplifies things quite a 24 * bit. 25 */ 26 27 static void journal_read_endio(struct bio *bio, int error) 28 { 29 struct closure *cl = bio->bi_private; 30 closure_put(cl); 31 } 32 33 static int journal_read_bucket(struct cache *ca, struct list_head *list, 34 struct btree_op *op, unsigned bucket_index) 35 { 36 struct journal_device *ja = &ca->journal; 37 struct bio *bio = &ja->bio; 38 39 struct journal_replay *i; 40 struct jset *j, *data = ca->set->journal.w[0].data; 41 unsigned len, left, offset = 0; 42 int ret = 0; 43 sector_t bucket = bucket_to_sector(ca->set, ca->sb.d[bucket_index]); 44 45 pr_debug("reading %llu", (uint64_t) bucket); 46 47 while (offset < ca->sb.bucket_size) { 48 reread: left = ca->sb.bucket_size - offset; 49 len = min_t(unsigned, left, PAGE_SECTORS * 8); 50 51 bio_reset(bio); 52 bio->bi_sector = bucket + offset; 53 bio->bi_bdev = ca->bdev; 54 bio->bi_rw = READ; 55 bio->bi_size = len << 9; 56 57 bio->bi_end_io = journal_read_endio; 58 bio->bi_private = &op->cl; 59 bch_bio_map(bio, data); 60 61 closure_bio_submit(bio, &op->cl, ca); 62 closure_sync(&op->cl); 63 64 /* This function could be simpler now since we no longer write 65 * journal entries that overlap bucket boundaries; this means 66 * the start of a bucket will always have a valid journal entry 67 * if it has any journal entries at all. 68 */ 69 70 j = data; 71 while (len) { 72 struct list_head *where; 73 size_t blocks, bytes = set_bytes(j); 74 75 if (j->magic != jset_magic(ca->set)) 76 return ret; 77 78 if (bytes > left << 9) 79 return ret; 80 81 if (bytes > len << 9) 82 goto reread; 83 84 if (j->csum != csum_set(j)) 85 return ret; 86 87 blocks = set_blocks(j, ca->set); 88 89 while (!list_empty(list)) { 90 i = list_first_entry(list, 91 struct journal_replay, list); 92 if (i->j.seq >= j->last_seq) 93 break; 94 list_del(&i->list); 95 kfree(i); 96 } 97 98 list_for_each_entry_reverse(i, list, list) { 99 if (j->seq == i->j.seq) 100 goto next_set; 101 102 if (j->seq < i->j.last_seq) 103 goto next_set; 104 105 if (j->seq > i->j.seq) { 106 where = &i->list; 107 goto add; 108 } 109 } 110 111 where = list; 112 add: 113 i = kmalloc(offsetof(struct journal_replay, j) + 114 bytes, GFP_KERNEL); 115 if (!i) 116 return -ENOMEM; 117 memcpy(&i->j, j, bytes); 118 list_add(&i->list, where); 119 ret = 1; 120 121 ja->seq[bucket_index] = j->seq; 122 next_set: 123 offset += blocks * ca->sb.block_size; 124 len -= blocks * ca->sb.block_size; 125 j = ((void *) j) + blocks * block_bytes(ca); 126 } 127 } 128 129 return ret; 130 } 131 132 int bch_journal_read(struct cache_set *c, struct list_head *list, 133 struct btree_op *op) 134 { 135 #define read_bucket(b) \ 136 ({ \ 137 int ret = journal_read_bucket(ca, list, op, b); \ 138 __set_bit(b, bitmap); \ 139 if (ret < 0) \ 140 return ret; \ 141 ret; \ 142 }) 143 144 struct cache *ca; 145 unsigned iter; 146 147 for_each_cache(ca, c, iter) { 148 struct journal_device *ja = &ca->journal; 149 unsigned long bitmap[SB_JOURNAL_BUCKETS / BITS_PER_LONG]; 150 unsigned i, l, r, m; 151 uint64_t seq; 152 153 bitmap_zero(bitmap, SB_JOURNAL_BUCKETS); 154 pr_debug("%u journal buckets", ca->sb.njournal_buckets); 155 156 /* Read journal buckets ordered by golden ratio hash to quickly 157 * find a sequence of buckets with valid journal entries 158 */ 159 for (i = 0; i < ca->sb.njournal_buckets; i++) { 160 l = (i * 2654435769U) % ca->sb.njournal_buckets; 161 162 if (test_bit(l, bitmap)) 163 break; 164 165 if (read_bucket(l)) 166 goto bsearch; 167 } 168 169 /* If that fails, check all the buckets we haven't checked 170 * already 171 */ 172 pr_debug("falling back to linear search"); 173 174 for (l = 0; l < ca->sb.njournal_buckets; l++) { 175 if (test_bit(l, bitmap)) 176 continue; 177 178 if (read_bucket(l)) 179 goto bsearch; 180 } 181 bsearch: 182 /* Binary search */ 183 m = r = find_next_bit(bitmap, ca->sb.njournal_buckets, l + 1); 184 pr_debug("starting binary search, l %u r %u", l, r); 185 186 while (l + 1 < r) { 187 m = (l + r) >> 1; 188 189 if (read_bucket(m)) 190 l = m; 191 else 192 r = m; 193 } 194 195 /* Read buckets in reverse order until we stop finding more 196 * journal entries 197 */ 198 pr_debug("finishing up"); 199 l = m; 200 201 while (1) { 202 if (!l--) 203 l = ca->sb.njournal_buckets - 1; 204 205 if (l == m) 206 break; 207 208 if (test_bit(l, bitmap)) 209 continue; 210 211 if (!read_bucket(l)) 212 break; 213 } 214 215 seq = 0; 216 217 for (i = 0; i < ca->sb.njournal_buckets; i++) 218 if (ja->seq[i] > seq) { 219 seq = ja->seq[i]; 220 ja->cur_idx = ja->discard_idx = 221 ja->last_idx = i; 222 223 } 224 } 225 226 c->journal.seq = list_entry(list->prev, 227 struct journal_replay, 228 list)->j.seq; 229 230 return 0; 231 #undef read_bucket 232 } 233 234 void bch_journal_mark(struct cache_set *c, struct list_head *list) 235 { 236 atomic_t p = { 0 }; 237 struct bkey *k; 238 struct journal_replay *i; 239 struct journal *j = &c->journal; 240 uint64_t last = j->seq; 241 242 /* 243 * journal.pin should never fill up - we never write a journal 244 * entry when it would fill up. But if for some reason it does, we 245 * iterate over the list in reverse order so that we can just skip that 246 * refcount instead of bugging. 247 */ 248 249 list_for_each_entry_reverse(i, list, list) { 250 BUG_ON(last < i->j.seq); 251 i->pin = NULL; 252 253 while (last-- != i->j.seq) 254 if (fifo_free(&j->pin) > 1) { 255 fifo_push_front(&j->pin, p); 256 atomic_set(&fifo_front(&j->pin), 0); 257 } 258 259 if (fifo_free(&j->pin) > 1) { 260 fifo_push_front(&j->pin, p); 261 i->pin = &fifo_front(&j->pin); 262 atomic_set(i->pin, 1); 263 } 264 265 for (k = i->j.start; 266 k < end(&i->j); 267 k = bkey_next(k)) { 268 unsigned j; 269 270 for (j = 0; j < KEY_PTRS(k); j++) { 271 struct bucket *g = PTR_BUCKET(c, k, j); 272 atomic_inc(&g->pin); 273 274 if (g->prio == BTREE_PRIO && 275 !ptr_stale(c, k, j)) 276 g->prio = INITIAL_PRIO; 277 } 278 279 __bch_btree_mark_key(c, 0, k); 280 } 281 } 282 } 283 284 int bch_journal_replay(struct cache_set *s, struct list_head *list, 285 struct btree_op *op) 286 { 287 int ret = 0, keys = 0, entries = 0; 288 struct bkey *k; 289 struct journal_replay *i = 290 list_entry(list->prev, struct journal_replay, list); 291 292 uint64_t start = i->j.last_seq, end = i->j.seq, n = start; 293 294 list_for_each_entry(i, list, list) { 295 BUG_ON(i->pin && atomic_read(i->pin) != 1); 296 297 if (n != i->j.seq) 298 pr_err( 299 "journal entries %llu-%llu missing! (replaying %llu-%llu)\n", 300 n, i->j.seq - 1, start, end); 301 302 for (k = i->j.start; 303 k < end(&i->j); 304 k = bkey_next(k)) { 305 trace_bcache_journal_replay_key(k); 306 307 bkey_copy(op->keys.top, k); 308 bch_keylist_push(&op->keys); 309 310 op->journal = i->pin; 311 atomic_inc(op->journal); 312 313 ret = bch_btree_insert(op, s); 314 if (ret) 315 goto err; 316 317 BUG_ON(!bch_keylist_empty(&op->keys)); 318 keys++; 319 320 cond_resched(); 321 } 322 323 if (i->pin) 324 atomic_dec(i->pin); 325 n = i->j.seq + 1; 326 entries++; 327 } 328 329 pr_info("journal replay done, %i keys in %i entries, seq %llu", 330 keys, entries, end); 331 332 while (!list_empty(list)) { 333 i = list_first_entry(list, struct journal_replay, list); 334 list_del(&i->list); 335 kfree(i); 336 } 337 err: 338 closure_sync(&op->cl); 339 return ret; 340 } 341 342 /* Journalling */ 343 344 static void btree_flush_write(struct cache_set *c) 345 { 346 /* 347 * Try to find the btree node with that references the oldest journal 348 * entry, best is our current candidate and is locked if non NULL: 349 */ 350 struct btree *b, *best = NULL; 351 unsigned iter; 352 353 for_each_cached_btree(b, c, iter) { 354 if (!down_write_trylock(&b->lock)) 355 continue; 356 357 if (!btree_node_dirty(b) || 358 !btree_current_write(b)->journal) { 359 rw_unlock(true, b); 360 continue; 361 } 362 363 if (!best) 364 best = b; 365 else if (journal_pin_cmp(c, 366 btree_current_write(best), 367 btree_current_write(b))) { 368 rw_unlock(true, best); 369 best = b; 370 } else 371 rw_unlock(true, b); 372 } 373 374 if (best) 375 goto out; 376 377 /* We can't find the best btree node, just pick the first */ 378 list_for_each_entry(b, &c->btree_cache, list) 379 if (!b->level && btree_node_dirty(b)) { 380 best = b; 381 rw_lock(true, best, best->level); 382 goto found; 383 } 384 385 out: 386 if (!best) 387 return; 388 found: 389 if (btree_node_dirty(best)) 390 bch_btree_node_write(best, NULL); 391 rw_unlock(true, best); 392 } 393 394 #define last_seq(j) ((j)->seq - fifo_used(&(j)->pin) + 1) 395 396 static void journal_discard_endio(struct bio *bio, int error) 397 { 398 struct journal_device *ja = 399 container_of(bio, struct journal_device, discard_bio); 400 struct cache *ca = container_of(ja, struct cache, journal); 401 402 atomic_set(&ja->discard_in_flight, DISCARD_DONE); 403 404 closure_wake_up(&ca->set->journal.wait); 405 closure_put(&ca->set->cl); 406 } 407 408 static void journal_discard_work(struct work_struct *work) 409 { 410 struct journal_device *ja = 411 container_of(work, struct journal_device, discard_work); 412 413 submit_bio(0, &ja->discard_bio); 414 } 415 416 static void do_journal_discard(struct cache *ca) 417 { 418 struct journal_device *ja = &ca->journal; 419 struct bio *bio = &ja->discard_bio; 420 421 if (!ca->discard) { 422 ja->discard_idx = ja->last_idx; 423 return; 424 } 425 426 switch (atomic_read(&ja->discard_in_flight) == DISCARD_IN_FLIGHT) { 427 case DISCARD_IN_FLIGHT: 428 return; 429 430 case DISCARD_DONE: 431 ja->discard_idx = (ja->discard_idx + 1) % 432 ca->sb.njournal_buckets; 433 434 atomic_set(&ja->discard_in_flight, DISCARD_READY); 435 /* fallthrough */ 436 437 case DISCARD_READY: 438 if (ja->discard_idx == ja->last_idx) 439 return; 440 441 atomic_set(&ja->discard_in_flight, DISCARD_IN_FLIGHT); 442 443 bio_init(bio); 444 bio->bi_sector = bucket_to_sector(ca->set, 445 ca->sb.d[ja->discard_idx]); 446 bio->bi_bdev = ca->bdev; 447 bio->bi_rw = REQ_WRITE|REQ_DISCARD; 448 bio->bi_max_vecs = 1; 449 bio->bi_io_vec = bio->bi_inline_vecs; 450 bio->bi_size = bucket_bytes(ca); 451 bio->bi_end_io = journal_discard_endio; 452 453 closure_get(&ca->set->cl); 454 INIT_WORK(&ja->discard_work, journal_discard_work); 455 schedule_work(&ja->discard_work); 456 } 457 } 458 459 static void journal_reclaim(struct cache_set *c) 460 { 461 struct bkey *k = &c->journal.key; 462 struct cache *ca; 463 uint64_t last_seq; 464 unsigned iter, n = 0; 465 atomic_t p; 466 467 while (!atomic_read(&fifo_front(&c->journal.pin))) 468 fifo_pop(&c->journal.pin, p); 469 470 last_seq = last_seq(&c->journal); 471 472 /* Update last_idx */ 473 474 for_each_cache(ca, c, iter) { 475 struct journal_device *ja = &ca->journal; 476 477 while (ja->last_idx != ja->cur_idx && 478 ja->seq[ja->last_idx] < last_seq) 479 ja->last_idx = (ja->last_idx + 1) % 480 ca->sb.njournal_buckets; 481 } 482 483 for_each_cache(ca, c, iter) 484 do_journal_discard(ca); 485 486 if (c->journal.blocks_free) 487 return; 488 489 /* 490 * Allocate: 491 * XXX: Sort by free journal space 492 */ 493 494 for_each_cache(ca, c, iter) { 495 struct journal_device *ja = &ca->journal; 496 unsigned next = (ja->cur_idx + 1) % ca->sb.njournal_buckets; 497 498 /* No space available on this device */ 499 if (next == ja->discard_idx) 500 continue; 501 502 ja->cur_idx = next; 503 k->ptr[n++] = PTR(0, 504 bucket_to_sector(c, ca->sb.d[ja->cur_idx]), 505 ca->sb.nr_this_dev); 506 } 507 508 bkey_init(k); 509 SET_KEY_PTRS(k, n); 510 511 if (n) 512 c->journal.blocks_free = c->sb.bucket_size >> c->block_bits; 513 514 if (!journal_full(&c->journal)) 515 __closure_wake_up(&c->journal.wait); 516 } 517 518 void bch_journal_next(struct journal *j) 519 { 520 atomic_t p = { 1 }; 521 522 j->cur = (j->cur == j->w) 523 ? &j->w[1] 524 : &j->w[0]; 525 526 /* 527 * The fifo_push() needs to happen at the same time as j->seq is 528 * incremented for last_seq() to be calculated correctly 529 */ 530 BUG_ON(!fifo_push(&j->pin, p)); 531 atomic_set(&fifo_back(&j->pin), 1); 532 533 j->cur->data->seq = ++j->seq; 534 j->cur->need_write = false; 535 j->cur->data->keys = 0; 536 537 if (fifo_full(&j->pin)) 538 pr_debug("journal_pin full (%zu)", fifo_used(&j->pin)); 539 } 540 541 static void journal_write_endio(struct bio *bio, int error) 542 { 543 struct journal_write *w = bio->bi_private; 544 545 cache_set_err_on(error, w->c, "journal io error"); 546 closure_put(&w->c->journal.io.cl); 547 } 548 549 static void journal_write(struct closure *); 550 551 static void journal_write_done(struct closure *cl) 552 { 553 struct journal *j = container_of(cl, struct journal, io.cl); 554 struct cache_set *c = container_of(j, struct cache_set, journal); 555 556 struct journal_write *w = (j->cur == j->w) 557 ? &j->w[1] 558 : &j->w[0]; 559 560 __closure_wake_up(&w->wait); 561 562 if (c->journal_delay_ms) 563 closure_delay(&j->io, msecs_to_jiffies(c->journal_delay_ms)); 564 565 continue_at(cl, journal_write, system_wq); 566 } 567 568 static void journal_write_unlocked(struct closure *cl) 569 __releases(c->journal.lock) 570 { 571 struct cache_set *c = container_of(cl, struct cache_set, journal.io.cl); 572 struct cache *ca; 573 struct journal_write *w = c->journal.cur; 574 struct bkey *k = &c->journal.key; 575 unsigned i, sectors = set_blocks(w->data, c) * c->sb.block_size; 576 577 struct bio *bio; 578 struct bio_list list; 579 bio_list_init(&list); 580 581 if (!w->need_write) { 582 /* 583 * XXX: have to unlock closure before we unlock journal lock, 584 * else we race with bch_journal(). But this way we race 585 * against cache set unregister. Doh. 586 */ 587 set_closure_fn(cl, NULL, NULL); 588 closure_sub(cl, CLOSURE_RUNNING + 1); 589 spin_unlock(&c->journal.lock); 590 return; 591 } else if (journal_full(&c->journal)) { 592 journal_reclaim(c); 593 spin_unlock(&c->journal.lock); 594 595 btree_flush_write(c); 596 continue_at(cl, journal_write, system_wq); 597 } 598 599 c->journal.blocks_free -= set_blocks(w->data, c); 600 601 w->data->btree_level = c->root->level; 602 603 bkey_copy(&w->data->btree_root, &c->root->key); 604 bkey_copy(&w->data->uuid_bucket, &c->uuid_bucket); 605 606 for_each_cache(ca, c, i) 607 w->data->prio_bucket[ca->sb.nr_this_dev] = ca->prio_buckets[0]; 608 609 w->data->magic = jset_magic(c); 610 w->data->version = BCACHE_JSET_VERSION; 611 w->data->last_seq = last_seq(&c->journal); 612 w->data->csum = csum_set(w->data); 613 614 for (i = 0; i < KEY_PTRS(k); i++) { 615 ca = PTR_CACHE(c, k, i); 616 bio = &ca->journal.bio; 617 618 atomic_long_add(sectors, &ca->meta_sectors_written); 619 620 bio_reset(bio); 621 bio->bi_sector = PTR_OFFSET(k, i); 622 bio->bi_bdev = ca->bdev; 623 bio->bi_rw = REQ_WRITE|REQ_SYNC|REQ_META|REQ_FLUSH; 624 bio->bi_size = sectors << 9; 625 626 bio->bi_end_io = journal_write_endio; 627 bio->bi_private = w; 628 bch_bio_map(bio, w->data); 629 630 trace_bcache_journal_write(bio); 631 bio_list_add(&list, bio); 632 633 SET_PTR_OFFSET(k, i, PTR_OFFSET(k, i) + sectors); 634 635 ca->journal.seq[ca->journal.cur_idx] = w->data->seq; 636 } 637 638 atomic_dec_bug(&fifo_back(&c->journal.pin)); 639 bch_journal_next(&c->journal); 640 journal_reclaim(c); 641 642 spin_unlock(&c->journal.lock); 643 644 while ((bio = bio_list_pop(&list))) 645 closure_bio_submit(bio, cl, c->cache[0]); 646 647 continue_at(cl, journal_write_done, NULL); 648 } 649 650 static void journal_write(struct closure *cl) 651 { 652 struct cache_set *c = container_of(cl, struct cache_set, journal.io.cl); 653 654 spin_lock(&c->journal.lock); 655 journal_write_unlocked(cl); 656 } 657 658 static void __journal_try_write(struct cache_set *c, bool noflush) 659 __releases(c->journal.lock) 660 { 661 struct closure *cl = &c->journal.io.cl; 662 663 if (!closure_trylock(cl, &c->cl)) 664 spin_unlock(&c->journal.lock); 665 else if (noflush && journal_full(&c->journal)) { 666 spin_unlock(&c->journal.lock); 667 continue_at(cl, journal_write, system_wq); 668 } else 669 journal_write_unlocked(cl); 670 } 671 672 #define journal_try_write(c) __journal_try_write(c, false) 673 674 void bch_journal_meta(struct cache_set *c, struct closure *cl) 675 { 676 struct journal_write *w; 677 678 if (CACHE_SYNC(&c->sb)) { 679 spin_lock(&c->journal.lock); 680 681 w = c->journal.cur; 682 w->need_write = true; 683 684 if (cl) 685 BUG_ON(!closure_wait(&w->wait, cl)); 686 687 __journal_try_write(c, true); 688 } 689 } 690 691 /* 692 * Entry point to the journalling code - bio_insert() and btree_invalidate() 693 * pass bch_journal() a list of keys to be journalled, and then 694 * bch_journal() hands those same keys off to btree_insert_async() 695 */ 696 697 void bch_journal(struct closure *cl) 698 { 699 struct btree_op *op = container_of(cl, struct btree_op, cl); 700 struct cache_set *c = op->c; 701 struct journal_write *w; 702 size_t b, n = ((uint64_t *) op->keys.top) - op->keys.list; 703 704 if (op->type != BTREE_INSERT || 705 !CACHE_SYNC(&c->sb)) 706 goto out; 707 708 /* 709 * If we're looping because we errored, might already be waiting on 710 * another journal write: 711 */ 712 while (atomic_read(&cl->parent->remaining) & CLOSURE_WAITING) 713 closure_sync(cl->parent); 714 715 spin_lock(&c->journal.lock); 716 717 if (journal_full(&c->journal)) { 718 trace_bcache_journal_full(c); 719 720 closure_wait(&c->journal.wait, cl); 721 722 journal_reclaim(c); 723 spin_unlock(&c->journal.lock); 724 725 btree_flush_write(c); 726 continue_at(cl, bch_journal, bcache_wq); 727 } 728 729 w = c->journal.cur; 730 w->need_write = true; 731 b = __set_blocks(w->data, w->data->keys + n, c); 732 733 if (b * c->sb.block_size > PAGE_SECTORS << JSET_BITS || 734 b > c->journal.blocks_free) { 735 trace_bcache_journal_entry_full(c); 736 737 /* 738 * XXX: If we were inserting so many keys that they won't fit in 739 * an _empty_ journal write, we'll deadlock. For now, handle 740 * this in bch_keylist_realloc() - but something to think about. 741 */ 742 BUG_ON(!w->data->keys); 743 744 BUG_ON(!closure_wait(&w->wait, cl)); 745 746 closure_flush(&c->journal.io); 747 748 journal_try_write(c); 749 continue_at(cl, bch_journal, bcache_wq); 750 } 751 752 memcpy(end(w->data), op->keys.list, n * sizeof(uint64_t)); 753 w->data->keys += n; 754 755 op->journal = &fifo_back(&c->journal.pin); 756 atomic_inc(op->journal); 757 758 if (op->flush_journal) { 759 closure_flush(&c->journal.io); 760 closure_wait(&w->wait, cl->parent); 761 } 762 763 journal_try_write(c); 764 out: 765 bch_btree_insert_async(cl); 766 } 767 768 void bch_journal_free(struct cache_set *c) 769 { 770 free_pages((unsigned long) c->journal.w[1].data, JSET_BITS); 771 free_pages((unsigned long) c->journal.w[0].data, JSET_BITS); 772 free_fifo(&c->journal.pin); 773 } 774 775 int bch_journal_alloc(struct cache_set *c) 776 { 777 struct journal *j = &c->journal; 778 779 closure_init_unlocked(&j->io); 780 spin_lock_init(&j->lock); 781 782 c->journal_delay_ms = 100; 783 784 j->w[0].c = c; 785 j->w[1].c = c; 786 787 if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) || 788 !(j->w[0].data = (void *) __get_free_pages(GFP_KERNEL, JSET_BITS)) || 789 !(j->w[1].data = (void *) __get_free_pages(GFP_KERNEL, JSET_BITS))) 790 return -ENOMEM; 791 792 return 0; 793 } 794