1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * bcache journalling code, for btree insertions 4 * 5 * Copyright 2012 Google, Inc. 6 */ 7 8 #include "bcache.h" 9 #include "btree.h" 10 #include "debug.h" 11 #include "extents.h" 12 13 #include <trace/events/bcache.h> 14 15 /* 16 * Journal replay/recovery: 17 * 18 * This code is all driven from run_cache_set(); we first read the journal 19 * entries, do some other stuff, then we mark all the keys in the journal 20 * entries (same as garbage collection would), then we replay them - reinserting 21 * them into the cache in precisely the same order as they appear in the 22 * journal. 23 * 24 * We only journal keys that go in leaf nodes, which simplifies things quite a 25 * bit. 26 */ 27 28 static void journal_read_endio(struct bio *bio) 29 { 30 struct closure *cl = bio->bi_private; 31 closure_put(cl); 32 } 33 34 static int journal_read_bucket(struct cache *ca, struct list_head *list, 35 unsigned bucket_index) 36 { 37 struct journal_device *ja = &ca->journal; 38 struct bio *bio = &ja->bio; 39 40 struct journal_replay *i; 41 struct jset *j, *data = ca->set->journal.w[0].data; 42 struct closure cl; 43 unsigned len, left, offset = 0; 44 int ret = 0; 45 sector_t bucket = bucket_to_sector(ca->set, ca->sb.d[bucket_index]); 46 47 closure_init_stack(&cl); 48 49 pr_debug("reading %u", bucket_index); 50 51 while (offset < ca->sb.bucket_size) { 52 reread: left = ca->sb.bucket_size - offset; 53 len = min_t(unsigned, left, PAGE_SECTORS << JSET_BITS); 54 55 bio_reset(bio); 56 bio->bi_iter.bi_sector = bucket + offset; 57 bio_set_dev(bio, ca->bdev); 58 bio->bi_iter.bi_size = len << 9; 59 60 bio->bi_end_io = journal_read_endio; 61 bio->bi_private = &cl; 62 bio_set_op_attrs(bio, REQ_OP_READ, 0); 63 bch_bio_map(bio, data); 64 65 closure_bio_submit(ca->set, bio, &cl); 66 closure_sync(&cl); 67 68 /* This function could be simpler now since we no longer write 69 * journal entries that overlap bucket boundaries; this means 70 * the start of a bucket will always have a valid journal entry 71 * if it has any journal entries at all. 72 */ 73 74 j = data; 75 while (len) { 76 struct list_head *where; 77 size_t blocks, bytes = set_bytes(j); 78 79 if (j->magic != jset_magic(&ca->sb)) { 80 pr_debug("%u: bad magic", bucket_index); 81 return ret; 82 } 83 84 if (bytes > left << 9 || 85 bytes > PAGE_SIZE << JSET_BITS) { 86 pr_info("%u: too big, %zu bytes, offset %u", 87 bucket_index, bytes, offset); 88 return ret; 89 } 90 91 if (bytes > len << 9) 92 goto reread; 93 94 if (j->csum != csum_set(j)) { 95 pr_info("%u: bad csum, %zu bytes, offset %u", 96 bucket_index, bytes, offset); 97 return ret; 98 } 99 100 blocks = set_blocks(j, block_bytes(ca->set)); 101 102 while (!list_empty(list)) { 103 i = list_first_entry(list, 104 struct journal_replay, list); 105 if (i->j.seq >= j->last_seq) 106 break; 107 list_del(&i->list); 108 kfree(i); 109 } 110 111 list_for_each_entry_reverse(i, list, list) { 112 if (j->seq == i->j.seq) 113 goto next_set; 114 115 if (j->seq < i->j.last_seq) 116 goto next_set; 117 118 if (j->seq > i->j.seq) { 119 where = &i->list; 120 goto add; 121 } 122 } 123 124 where = list; 125 add: 126 i = kmalloc(offsetof(struct journal_replay, j) + 127 bytes, GFP_KERNEL); 128 if (!i) 129 return -ENOMEM; 130 memcpy(&i->j, j, bytes); 131 list_add(&i->list, where); 132 ret = 1; 133 134 ja->seq[bucket_index] = j->seq; 135 next_set: 136 offset += blocks * ca->sb.block_size; 137 len -= blocks * ca->sb.block_size; 138 j = ((void *) j) + blocks * block_bytes(ca); 139 } 140 } 141 142 return ret; 143 } 144 145 int bch_journal_read(struct cache_set *c, struct list_head *list) 146 { 147 #define read_bucket(b) \ 148 ({ \ 149 int ret = journal_read_bucket(ca, list, b); \ 150 __set_bit(b, bitmap); \ 151 if (ret < 0) \ 152 return ret; \ 153 ret; \ 154 }) 155 156 struct cache *ca; 157 unsigned iter; 158 159 for_each_cache(ca, c, iter) { 160 struct journal_device *ja = &ca->journal; 161 DECLARE_BITMAP(bitmap, SB_JOURNAL_BUCKETS); 162 unsigned i, l, r, m; 163 uint64_t seq; 164 165 bitmap_zero(bitmap, SB_JOURNAL_BUCKETS); 166 pr_debug("%u journal buckets", ca->sb.njournal_buckets); 167 168 /* 169 * Read journal buckets ordered by golden ratio hash to quickly 170 * find a sequence of buckets with valid journal entries 171 */ 172 for (i = 0; i < ca->sb.njournal_buckets; i++) { 173 /* 174 * We must try the index l with ZERO first for 175 * correctness due to the scenario that the journal 176 * bucket is circular buffer which might have wrapped 177 */ 178 l = (i * 2654435769U) % ca->sb.njournal_buckets; 179 180 if (test_bit(l, bitmap)) 181 break; 182 183 if (read_bucket(l)) 184 goto bsearch; 185 } 186 187 /* 188 * If that fails, check all the buckets we haven't checked 189 * already 190 */ 191 pr_debug("falling back to linear search"); 192 193 for (l = find_first_zero_bit(bitmap, ca->sb.njournal_buckets); 194 l < ca->sb.njournal_buckets; 195 l = find_next_zero_bit(bitmap, ca->sb.njournal_buckets, l + 1)) 196 if (read_bucket(l)) 197 goto bsearch; 198 199 /* no journal entries on this device? */ 200 if (l == ca->sb.njournal_buckets) 201 continue; 202 bsearch: 203 BUG_ON(list_empty(list)); 204 205 /* Binary search */ 206 m = l; 207 r = find_next_bit(bitmap, ca->sb.njournal_buckets, l + 1); 208 pr_debug("starting binary search, l %u r %u", l, r); 209 210 while (l + 1 < r) { 211 seq = list_entry(list->prev, struct journal_replay, 212 list)->j.seq; 213 214 m = (l + r) >> 1; 215 read_bucket(m); 216 217 if (seq != list_entry(list->prev, struct journal_replay, 218 list)->j.seq) 219 l = m; 220 else 221 r = m; 222 } 223 224 /* 225 * Read buckets in reverse order until we stop finding more 226 * journal entries 227 */ 228 pr_debug("finishing up: m %u njournal_buckets %u", 229 m, ca->sb.njournal_buckets); 230 l = m; 231 232 while (1) { 233 if (!l--) 234 l = ca->sb.njournal_buckets - 1; 235 236 if (l == m) 237 break; 238 239 if (test_bit(l, bitmap)) 240 continue; 241 242 if (!read_bucket(l)) 243 break; 244 } 245 246 seq = 0; 247 248 for (i = 0; i < ca->sb.njournal_buckets; i++) 249 if (ja->seq[i] > seq) { 250 seq = ja->seq[i]; 251 /* 252 * When journal_reclaim() goes to allocate for 253 * the first time, it'll use the bucket after 254 * ja->cur_idx 255 */ 256 ja->cur_idx = i; 257 ja->last_idx = ja->discard_idx = (i + 1) % 258 ca->sb.njournal_buckets; 259 260 } 261 } 262 263 if (!list_empty(list)) 264 c->journal.seq = list_entry(list->prev, 265 struct journal_replay, 266 list)->j.seq; 267 268 return 0; 269 #undef read_bucket 270 } 271 272 void bch_journal_mark(struct cache_set *c, struct list_head *list) 273 { 274 atomic_t p = { 0 }; 275 struct bkey *k; 276 struct journal_replay *i; 277 struct journal *j = &c->journal; 278 uint64_t last = j->seq; 279 280 /* 281 * journal.pin should never fill up - we never write a journal 282 * entry when it would fill up. But if for some reason it does, we 283 * iterate over the list in reverse order so that we can just skip that 284 * refcount instead of bugging. 285 */ 286 287 list_for_each_entry_reverse(i, list, list) { 288 BUG_ON(last < i->j.seq); 289 i->pin = NULL; 290 291 while (last-- != i->j.seq) 292 if (fifo_free(&j->pin) > 1) { 293 fifo_push_front(&j->pin, p); 294 atomic_set(&fifo_front(&j->pin), 0); 295 } 296 297 if (fifo_free(&j->pin) > 1) { 298 fifo_push_front(&j->pin, p); 299 i->pin = &fifo_front(&j->pin); 300 atomic_set(i->pin, 1); 301 } 302 303 for (k = i->j.start; 304 k < bset_bkey_last(&i->j); 305 k = bkey_next(k)) 306 if (!__bch_extent_invalid(c, k)) { 307 unsigned j; 308 309 for (j = 0; j < KEY_PTRS(k); j++) 310 if (ptr_available(c, k, j)) 311 atomic_inc(&PTR_BUCKET(c, k, j)->pin); 312 313 bch_initial_mark_key(c, 0, k); 314 } 315 } 316 } 317 318 int bch_journal_replay(struct cache_set *s, struct list_head *list) 319 { 320 int ret = 0, keys = 0, entries = 0; 321 struct bkey *k; 322 struct journal_replay *i = 323 list_entry(list->prev, struct journal_replay, list); 324 325 uint64_t start = i->j.last_seq, end = i->j.seq, n = start; 326 struct keylist keylist; 327 328 list_for_each_entry(i, list, list) { 329 BUG_ON(i->pin && atomic_read(i->pin) != 1); 330 331 cache_set_err_on(n != i->j.seq, s, 332 "bcache: journal entries %llu-%llu missing! (replaying %llu-%llu)", 333 n, i->j.seq - 1, start, end); 334 335 for (k = i->j.start; 336 k < bset_bkey_last(&i->j); 337 k = bkey_next(k)) { 338 trace_bcache_journal_replay_key(k); 339 340 bch_keylist_init_single(&keylist, k); 341 342 ret = bch_btree_insert(s, &keylist, i->pin, NULL); 343 if (ret) 344 goto err; 345 346 BUG_ON(!bch_keylist_empty(&keylist)); 347 keys++; 348 349 cond_resched(); 350 } 351 352 if (i->pin) 353 atomic_dec(i->pin); 354 n = i->j.seq + 1; 355 entries++; 356 } 357 358 pr_info("journal replay done, %i keys in %i entries, seq %llu", 359 keys, entries, end); 360 err: 361 while (!list_empty(list)) { 362 i = list_first_entry(list, struct journal_replay, list); 363 list_del(&i->list); 364 kfree(i); 365 } 366 367 return ret; 368 } 369 370 /* Journalling */ 371 #define journal_max_cmp(l, r) \ 372 (fifo_idx(&c->journal.pin, btree_current_write(l)->journal) < \ 373 fifo_idx(&(c)->journal.pin, btree_current_write(r)->journal)) 374 #define journal_min_cmp(l, r) \ 375 (fifo_idx(&c->journal.pin, btree_current_write(l)->journal) > \ 376 fifo_idx(&(c)->journal.pin, btree_current_write(r)->journal)) 377 378 static void btree_flush_write(struct cache_set *c) 379 { 380 /* 381 * Try to find the btree node with that references the oldest journal 382 * entry, best is our current candidate and is locked if non NULL: 383 */ 384 struct btree *b; 385 int i; 386 387 atomic_long_inc(&c->flush_write); 388 389 retry: 390 spin_lock(&c->journal.lock); 391 if (heap_empty(&c->flush_btree)) { 392 for_each_cached_btree(b, c, i) 393 if (btree_current_write(b)->journal) { 394 if (!heap_full(&c->flush_btree)) 395 heap_add(&c->flush_btree, b, 396 journal_max_cmp); 397 else if (journal_max_cmp(b, 398 heap_peek(&c->flush_btree))) { 399 c->flush_btree.data[0] = b; 400 heap_sift(&c->flush_btree, 0, 401 journal_max_cmp); 402 } 403 } 404 405 for (i = c->flush_btree.used / 2 - 1; i >= 0; --i) 406 heap_sift(&c->flush_btree, i, journal_min_cmp); 407 } 408 409 b = NULL; 410 heap_pop(&c->flush_btree, b, journal_min_cmp); 411 spin_unlock(&c->journal.lock); 412 413 if (b) { 414 mutex_lock(&b->write_lock); 415 if (!btree_current_write(b)->journal) { 416 mutex_unlock(&b->write_lock); 417 /* We raced */ 418 atomic_long_inc(&c->retry_flush_write); 419 goto retry; 420 } 421 422 __bch_btree_node_write(b, NULL); 423 mutex_unlock(&b->write_lock); 424 } 425 } 426 427 #define last_seq(j) ((j)->seq - fifo_used(&(j)->pin) + 1) 428 429 static void journal_discard_endio(struct bio *bio) 430 { 431 struct journal_device *ja = 432 container_of(bio, struct journal_device, discard_bio); 433 struct cache *ca = container_of(ja, struct cache, journal); 434 435 atomic_set(&ja->discard_in_flight, DISCARD_DONE); 436 437 closure_wake_up(&ca->set->journal.wait); 438 closure_put(&ca->set->cl); 439 } 440 441 static void journal_discard_work(struct work_struct *work) 442 { 443 struct journal_device *ja = 444 container_of(work, struct journal_device, discard_work); 445 446 submit_bio(&ja->discard_bio); 447 } 448 449 static void do_journal_discard(struct cache *ca) 450 { 451 struct journal_device *ja = &ca->journal; 452 struct bio *bio = &ja->discard_bio; 453 454 if (!ca->discard) { 455 ja->discard_idx = ja->last_idx; 456 return; 457 } 458 459 switch (atomic_read(&ja->discard_in_flight)) { 460 case DISCARD_IN_FLIGHT: 461 return; 462 463 case DISCARD_DONE: 464 ja->discard_idx = (ja->discard_idx + 1) % 465 ca->sb.njournal_buckets; 466 467 atomic_set(&ja->discard_in_flight, DISCARD_READY); 468 /* fallthrough */ 469 470 case DISCARD_READY: 471 if (ja->discard_idx == ja->last_idx) 472 return; 473 474 atomic_set(&ja->discard_in_flight, DISCARD_IN_FLIGHT); 475 476 bio_init(bio, bio->bi_inline_vecs, 1); 477 bio_set_op_attrs(bio, REQ_OP_DISCARD, 0); 478 bio->bi_iter.bi_sector = bucket_to_sector(ca->set, 479 ca->sb.d[ja->discard_idx]); 480 bio_set_dev(bio, ca->bdev); 481 bio->bi_iter.bi_size = bucket_bytes(ca); 482 bio->bi_end_io = journal_discard_endio; 483 484 closure_get(&ca->set->cl); 485 INIT_WORK(&ja->discard_work, journal_discard_work); 486 schedule_work(&ja->discard_work); 487 } 488 } 489 490 static void journal_reclaim(struct cache_set *c) 491 { 492 struct bkey *k = &c->journal.key; 493 struct cache *ca; 494 uint64_t last_seq; 495 unsigned iter, n = 0; 496 atomic_t p __maybe_unused; 497 498 atomic_long_inc(&c->reclaim); 499 500 while (!atomic_read(&fifo_front(&c->journal.pin))) 501 fifo_pop(&c->journal.pin, p); 502 503 last_seq = last_seq(&c->journal); 504 505 /* Update last_idx */ 506 507 for_each_cache(ca, c, iter) { 508 struct journal_device *ja = &ca->journal; 509 510 while (ja->last_idx != ja->cur_idx && 511 ja->seq[ja->last_idx] < last_seq) 512 ja->last_idx = (ja->last_idx + 1) % 513 ca->sb.njournal_buckets; 514 } 515 516 for_each_cache(ca, c, iter) 517 do_journal_discard(ca); 518 519 if (c->journal.blocks_free) 520 goto out; 521 522 /* 523 * Allocate: 524 * XXX: Sort by free journal space 525 */ 526 527 for_each_cache(ca, c, iter) { 528 struct journal_device *ja = &ca->journal; 529 unsigned next = (ja->cur_idx + 1) % ca->sb.njournal_buckets; 530 531 /* No space available on this device */ 532 if (next == ja->discard_idx) 533 continue; 534 535 ja->cur_idx = next; 536 k->ptr[n++] = MAKE_PTR(0, 537 bucket_to_sector(c, ca->sb.d[ja->cur_idx]), 538 ca->sb.nr_this_dev); 539 } 540 541 bkey_init(k); 542 SET_KEY_PTRS(k, n); 543 544 if (n) 545 c->journal.blocks_free = c->sb.bucket_size >> c->block_bits; 546 out: 547 if (!journal_full(&c->journal)) 548 __closure_wake_up(&c->journal.wait); 549 } 550 551 void bch_journal_next(struct journal *j) 552 { 553 atomic_t p = { 1 }; 554 555 j->cur = (j->cur == j->w) 556 ? &j->w[1] 557 : &j->w[0]; 558 559 /* 560 * The fifo_push() needs to happen at the same time as j->seq is 561 * incremented for last_seq() to be calculated correctly 562 */ 563 BUG_ON(!fifo_push(&j->pin, p)); 564 atomic_set(&fifo_back(&j->pin), 1); 565 566 j->cur->data->seq = ++j->seq; 567 j->cur->dirty = false; 568 j->cur->need_write = false; 569 j->cur->data->keys = 0; 570 571 if (fifo_full(&j->pin)) 572 pr_debug("journal_pin full (%zu)", fifo_used(&j->pin)); 573 } 574 575 static void journal_write_endio(struct bio *bio) 576 { 577 struct journal_write *w = bio->bi_private; 578 579 cache_set_err_on(bio->bi_status, w->c, "journal io error"); 580 closure_put(&w->c->journal.io); 581 } 582 583 static void journal_write(struct closure *); 584 585 static void journal_write_done(struct closure *cl) 586 { 587 struct journal *j = container_of(cl, struct journal, io); 588 struct journal_write *w = (j->cur == j->w) 589 ? &j->w[1] 590 : &j->w[0]; 591 592 __closure_wake_up(&w->wait); 593 continue_at_nobarrier(cl, journal_write, system_wq); 594 } 595 596 static void journal_write_unlock(struct closure *cl) 597 __releases(&c->journal.lock) 598 { 599 struct cache_set *c = container_of(cl, struct cache_set, journal.io); 600 601 c->journal.io_in_flight = 0; 602 spin_unlock(&c->journal.lock); 603 } 604 605 static void journal_write_unlocked(struct closure *cl) 606 __releases(c->journal.lock) 607 { 608 struct cache_set *c = container_of(cl, struct cache_set, journal.io); 609 struct cache *ca; 610 struct journal_write *w = c->journal.cur; 611 struct bkey *k = &c->journal.key; 612 unsigned i, sectors = set_blocks(w->data, block_bytes(c)) * 613 c->sb.block_size; 614 615 struct bio *bio; 616 struct bio_list list; 617 bio_list_init(&list); 618 619 if (!w->need_write) { 620 closure_return_with_destructor(cl, journal_write_unlock); 621 return; 622 } else if (journal_full(&c->journal)) { 623 journal_reclaim(c); 624 spin_unlock(&c->journal.lock); 625 626 btree_flush_write(c); 627 continue_at(cl, journal_write, system_wq); 628 return; 629 } 630 631 c->journal.blocks_free -= set_blocks(w->data, block_bytes(c)); 632 633 w->data->btree_level = c->root->level; 634 635 bkey_copy(&w->data->btree_root, &c->root->key); 636 bkey_copy(&w->data->uuid_bucket, &c->uuid_bucket); 637 638 for_each_cache(ca, c, i) 639 w->data->prio_bucket[ca->sb.nr_this_dev] = ca->prio_buckets[0]; 640 641 w->data->magic = jset_magic(&c->sb); 642 w->data->version = BCACHE_JSET_VERSION; 643 w->data->last_seq = last_seq(&c->journal); 644 w->data->csum = csum_set(w->data); 645 646 for (i = 0; i < KEY_PTRS(k); i++) { 647 ca = PTR_CACHE(c, k, i); 648 bio = &ca->journal.bio; 649 650 atomic_long_add(sectors, &ca->meta_sectors_written); 651 652 bio_reset(bio); 653 bio->bi_iter.bi_sector = PTR_OFFSET(k, i); 654 bio_set_dev(bio, ca->bdev); 655 bio->bi_iter.bi_size = sectors << 9; 656 657 bio->bi_end_io = journal_write_endio; 658 bio->bi_private = w; 659 bio_set_op_attrs(bio, REQ_OP_WRITE, 660 REQ_SYNC|REQ_META|REQ_PREFLUSH|REQ_FUA); 661 bch_bio_map(bio, w->data); 662 663 trace_bcache_journal_write(bio); 664 bio_list_add(&list, bio); 665 666 SET_PTR_OFFSET(k, i, PTR_OFFSET(k, i) + sectors); 667 668 ca->journal.seq[ca->journal.cur_idx] = w->data->seq; 669 } 670 671 atomic_dec_bug(&fifo_back(&c->journal.pin)); 672 bch_journal_next(&c->journal); 673 journal_reclaim(c); 674 675 spin_unlock(&c->journal.lock); 676 677 while ((bio = bio_list_pop(&list))) 678 closure_bio_submit(c, bio, cl); 679 680 continue_at(cl, journal_write_done, NULL); 681 } 682 683 static void journal_write(struct closure *cl) 684 { 685 struct cache_set *c = container_of(cl, struct cache_set, journal.io); 686 687 spin_lock(&c->journal.lock); 688 journal_write_unlocked(cl); 689 } 690 691 static void journal_try_write(struct cache_set *c) 692 __releases(c->journal.lock) 693 { 694 struct closure *cl = &c->journal.io; 695 struct journal_write *w = c->journal.cur; 696 697 w->need_write = true; 698 699 if (!c->journal.io_in_flight) { 700 c->journal.io_in_flight = 1; 701 closure_call(cl, journal_write_unlocked, NULL, &c->cl); 702 } else { 703 spin_unlock(&c->journal.lock); 704 } 705 } 706 707 static struct journal_write *journal_wait_for_write(struct cache_set *c, 708 unsigned nkeys) 709 __acquires(&c->journal.lock) 710 { 711 size_t sectors; 712 struct closure cl; 713 bool wait = false; 714 715 closure_init_stack(&cl); 716 717 spin_lock(&c->journal.lock); 718 719 while (1) { 720 struct journal_write *w = c->journal.cur; 721 722 sectors = __set_blocks(w->data, w->data->keys + nkeys, 723 block_bytes(c)) * c->sb.block_size; 724 725 if (sectors <= min_t(size_t, 726 c->journal.blocks_free * c->sb.block_size, 727 PAGE_SECTORS << JSET_BITS)) 728 return w; 729 730 if (wait) 731 closure_wait(&c->journal.wait, &cl); 732 733 if (!journal_full(&c->journal)) { 734 if (wait) 735 trace_bcache_journal_entry_full(c); 736 737 /* 738 * XXX: If we were inserting so many keys that they 739 * won't fit in an _empty_ journal write, we'll 740 * deadlock. For now, handle this in 741 * bch_keylist_realloc() - but something to think about. 742 */ 743 BUG_ON(!w->data->keys); 744 745 journal_try_write(c); /* unlocks */ 746 } else { 747 if (wait) 748 trace_bcache_journal_full(c); 749 750 journal_reclaim(c); 751 spin_unlock(&c->journal.lock); 752 753 btree_flush_write(c); 754 } 755 756 closure_sync(&cl); 757 spin_lock(&c->journal.lock); 758 wait = true; 759 } 760 } 761 762 static void journal_write_work(struct work_struct *work) 763 { 764 struct cache_set *c = container_of(to_delayed_work(work), 765 struct cache_set, 766 journal.work); 767 spin_lock(&c->journal.lock); 768 if (c->journal.cur->dirty) 769 journal_try_write(c); 770 else 771 spin_unlock(&c->journal.lock); 772 } 773 774 /* 775 * Entry point to the journalling code - bio_insert() and btree_invalidate() 776 * pass bch_journal() a list of keys to be journalled, and then 777 * bch_journal() hands those same keys off to btree_insert_async() 778 */ 779 780 atomic_t *bch_journal(struct cache_set *c, 781 struct keylist *keys, 782 struct closure *parent) 783 { 784 struct journal_write *w; 785 atomic_t *ret; 786 787 if (!CACHE_SYNC(&c->sb)) 788 return NULL; 789 790 w = journal_wait_for_write(c, bch_keylist_nkeys(keys)); 791 792 memcpy(bset_bkey_last(w->data), keys->keys, bch_keylist_bytes(keys)); 793 w->data->keys += bch_keylist_nkeys(keys); 794 795 ret = &fifo_back(&c->journal.pin); 796 atomic_inc(ret); 797 798 if (parent) { 799 closure_wait(&w->wait, parent); 800 journal_try_write(c); 801 } else if (!w->dirty) { 802 w->dirty = true; 803 schedule_delayed_work(&c->journal.work, 804 msecs_to_jiffies(c->journal_delay_ms)); 805 spin_unlock(&c->journal.lock); 806 } else { 807 spin_unlock(&c->journal.lock); 808 } 809 810 811 return ret; 812 } 813 814 void bch_journal_meta(struct cache_set *c, struct closure *cl) 815 { 816 struct keylist keys; 817 atomic_t *ref; 818 819 bch_keylist_init(&keys); 820 821 ref = bch_journal(c, &keys, cl); 822 if (ref) 823 atomic_dec_bug(ref); 824 } 825 826 void bch_journal_free(struct cache_set *c) 827 { 828 free_pages((unsigned long) c->journal.w[1].data, JSET_BITS); 829 free_pages((unsigned long) c->journal.w[0].data, JSET_BITS); 830 free_fifo(&c->journal.pin); 831 } 832 833 int bch_journal_alloc(struct cache_set *c) 834 { 835 struct journal *j = &c->journal; 836 837 spin_lock_init(&j->lock); 838 INIT_DELAYED_WORK(&j->work, journal_write_work); 839 840 c->journal_delay_ms = 100; 841 842 j->w[0].c = c; 843 j->w[1].c = c; 844 845 if (!(init_heap(&c->flush_btree, 128, GFP_KERNEL)) || 846 !(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) || 847 !(j->w[0].data = (void *) __get_free_pages(GFP_KERNEL, JSET_BITS)) || 848 !(j->w[1].data = (void *) __get_free_pages(GFP_KERNEL, JSET_BITS))) 849 return -ENOMEM; 850 851 return 0; 852 } 853