// SPDX-License-Identifier: GPL-2.0
/*
 * bcache journalling code, for btree insertions
 *
 * Copyright 2012 Google, Inc.
 */

#include "bcache.h"
#include "btree.h"
#include "debug.h"
#include "extents.h"

#include <trace/events/bcache.h>

/*
 * Journal replay/recovery:
 *
 * This code is all driven from run_cache_set(); we first read the journal
 * entries, do some other stuff, then we mark all the keys in the journal
 * entries (same as garbage collection would), then we replay them - reinserting
 * them into the cache in precisely the same order as they appear in the
 * journal.
 *
 * We only journal keys that go in leaf nodes, which simplifies things quite a
 * bit.
 */

static void journal_read_endio(struct bio *bio)
{
	struct closure *cl = bio->bi_private;
	closure_put(cl);
}

static int journal_read_bucket(struct cache *ca, struct list_head *list,
			       unsigned bucket_index)
{
	struct journal_device *ja = &ca->journal;
	struct bio *bio = &ja->bio;

	struct journal_replay *i;
	struct jset *j, *data = ca->set->journal.w[0].data;
	struct closure cl;
	unsigned len, left, offset = 0;
	int ret = 0;
	sector_t bucket = bucket_to_sector(ca->set, ca->sb.d[bucket_index]);

	closure_init_stack(&cl);

	pr_debug("reading %u", bucket_index);

	while (offset < ca->sb.bucket_size) {
reread:		left = ca->sb.bucket_size - offset;
		len = min_t(unsigned, left, PAGE_SECTORS << JSET_BITS);

		bio_reset(bio);
		bio->bi_iter.bi_sector = bucket + offset;
		bio_set_dev(bio, ca->bdev);
		bio->bi_iter.bi_size = len << 9;

		bio->bi_end_io = journal_read_endio;
		bio->bi_private = &cl;
		bio_set_op_attrs(bio, REQ_OP_READ, 0);
		bch_bio_map(bio, data);

		closure_bio_submit(bio, &cl);
		closure_sync(&cl);

		/* This function could be simpler now since we no longer write
		 * journal entries that overlap bucket boundaries; this means
		 * the start of a bucket will always have a valid journal entry
		 * if it has any journal entries at all.
		 */
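
		/*
		 * Walk the jsets packed into the data we just read: check
		 * each entry's magic and checksum, re-read from the current
		 * offset if an entry extends past the buffered data, drop
		 * entries already obsoleted by a newer last_seq, and splice
		 * the rest into 'list' in sequence-number order.
		 */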

		j = data;
		while (len) {
			struct list_head *where;
			size_t blocks, bytes = set_bytes(j);

			if (j->magic != jset_magic(&ca->sb)) {
				pr_debug("%u: bad magic", bucket_index);
				return ret;
			}

			if (bytes > left << 9 ||
			    bytes > PAGE_SIZE << JSET_BITS) {
				pr_info("%u: too big, %zu bytes, offset %u",
					bucket_index, bytes, offset);
				return ret;
			}

			if (bytes > len << 9)
				goto reread;

			if (j->csum != csum_set(j)) {
				pr_info("%u: bad csum, %zu bytes, offset %u",
					bucket_index, bytes, offset);
				return ret;
			}

			blocks = set_blocks(j, block_bytes(ca->set));

			while (!list_empty(list)) {
				i = list_first_entry(list,
					struct journal_replay, list);
				if (i->j.seq >= j->last_seq)
					break;
				list_del(&i->list);
				kfree(i);
			}

			list_for_each_entry_reverse(i, list, list) {
				if (j->seq == i->j.seq)
					goto next_set;

				if (j->seq < i->j.last_seq)
					goto next_set;

				if (j->seq > i->j.seq) {
					where = &i->list;
					goto add;
				}
			}

			where = list;
add:
			i = kmalloc(offsetof(struct journal_replay, j) +
				    bytes, GFP_KERNEL);
			if (!i)
				return -ENOMEM;
			memcpy(&i->j, j, bytes);
			list_add(&i->list, where);
			ret = 1;

			ja->seq[bucket_index] = j->seq;
next_set:
			offset += blocks * ca->sb.block_size;
			len -= blocks * ca->sb.block_size;
			j = ((void *) j) + blocks * block_bytes(ca);
		}
	}

	return ret;
}

int bch_journal_read(struct cache_set *c, struct list_head *list)
{
#define read_bucket(b)						\
	({							\
		int ret = journal_read_bucket(ca, list, b);	\
		__set_bit(b, bitmap);				\
		if (ret < 0)					\
			return ret;				\
		ret;						\
	})

	struct cache *ca;
	unsigned iter;

	for_each_cache(ca, c, iter) {
		struct journal_device *ja = &ca->journal;
		DECLARE_BITMAP(bitmap, SB_JOURNAL_BUCKETS);
		unsigned i, l, r, m;
		uint64_t seq;

		bitmap_zero(bitmap, SB_JOURNAL_BUCKETS);
		pr_debug("%u journal buckets", ca->sb.njournal_buckets);

		/*
		 * Read journal buckets ordered by golden ratio hash to quickly
		 * find a sequence of buckets with valid journal entries
		 */
		for (i = 0; i < ca->sb.njournal_buckets; i++) {
			/*
			 * We must try index l == 0 first for correctness:
			 * the journal buckets form a circular buffer which
			 * might have wrapped around.
			 */
			l = (i * 2654435769U) % ca->sb.njournal_buckets;

			if (test_bit(l, bitmap))
				break;

			if (read_bucket(l))
				goto bsearch;
		}

		/*
		 * If that fails, check all the buckets we haven't checked
		 * already
		 */
		pr_debug("falling back to linear search");

		for (l = find_first_zero_bit(bitmap, ca->sb.njournal_buckets);
		     l < ca->sb.njournal_buckets;
		     l = find_next_zero_bit(bitmap, ca->sb.njournal_buckets, l + 1))
			if (read_bucket(l))
				goto bsearch;

		/* no journal entries on this device? */
		if (l == ca->sb.njournal_buckets)
			continue;
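
		/*
		 * Found a bucket with journal entries: binary search between
		 * it and the next bucket already read to find where the
		 * newest entries live, using whether a read advances the
		 * newest seq on 'list' as the probe; the loop below then
		 * walks backwards from there to pick up older entries.
		 */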
bsearch:
		BUG_ON(list_empty(list));

		/* Binary search */
		m = l;
		r = find_next_bit(bitmap, ca->sb.njournal_buckets, l + 1);
		pr_debug("starting binary search, l %u r %u", l, r);

		while (l + 1 < r) {
			seq = list_entry(list->prev, struct journal_replay,
					 list)->j.seq;

			m = (l + r) >> 1;
			read_bucket(m);

			if (seq != list_entry(list->prev, struct journal_replay,
					      list)->j.seq)
				l = m;
			else
				r = m;
		}

		/*
		 * Read buckets in reverse order until we stop finding more
		 * journal entries
		 */
		pr_debug("finishing up: m %u njournal_buckets %u",
			 m, ca->sb.njournal_buckets);
		l = m;

		while (1) {
			if (!l--)
				l = ca->sb.njournal_buckets - 1;

			if (l == m)
				break;

			if (test_bit(l, bitmap))
				continue;

			if (!read_bucket(l))
				break;
		}

		seq = 0;

		for (i = 0; i < ca->sb.njournal_buckets; i++)
			if (ja->seq[i] > seq) {
				seq = ja->seq[i];
				/*
				 * When journal_reclaim() goes to allocate for
				 * the first time, it'll use the bucket after
				 * ja->cur_idx
				 */
				ja->cur_idx = i;
				ja->last_idx = ja->discard_idx = (i + 1) %
					ca->sb.njournal_buckets;

			}
	}

	if (!list_empty(list))
		c->journal.seq = list_entry(list->prev,
					    struct journal_replay,
					    list)->j.seq;

	return 0;
#undef read_bucket
}

void bch_journal_mark(struct cache_set *c, struct list_head *list)
{
	atomic_t p = { 0 };
	struct bkey *k;
	struct journal_replay *i;
	struct journal *j = &c->journal;
	uint64_t last = j->seq;

	/*
	 * journal.pin should never fill up - we never write a journal
	 * entry when it would fill up. But if for some reason it does, we
	 * iterate over the list in reverse order so that we can just skip that
	 * refcount instead of bugging.
	 */

	list_for_each_entry_reverse(i, list, list) {
		BUG_ON(last < i->j.seq);
		i->pin = NULL;

		while (last-- != i->j.seq)
			if (fifo_free(&j->pin) > 1) {
				fifo_push_front(&j->pin, p);
				atomic_set(&fifo_front(&j->pin), 0);
			}

		if (fifo_free(&j->pin) > 1) {
			fifo_push_front(&j->pin, p);
			i->pin = &fifo_front(&j->pin);
			atomic_set(i->pin, 1);
		}

		for (k = i->j.start;
		     k < bset_bkey_last(&i->j);
		     k = bkey_next(k))
			if (!__bch_extent_invalid(c, k)) {
				unsigned j;

				for (j = 0; j < KEY_PTRS(k); j++)
					if (ptr_available(c, k, j))
						atomic_inc(&PTR_BUCKET(c, k, j)->pin);

				bch_initial_mark_key(c, 0, k);
			}
	}
}
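
/*
 * Replay: re-insert the journalled keys into the btree in the order they
 * were logged. Sequence numbers must run contiguously from the newest
 * entry's last_seq through its seq; a gap is reported via cache_set_err_on().
 * Each entry's pin (set up in bch_journal_mark()) is dropped once its keys
 * have been inserted.
 */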

int bch_journal_replay(struct cache_set *s, struct list_head *list)
{
	int ret = 0, keys = 0, entries = 0;
	struct bkey *k;
	struct journal_replay *i =
		list_entry(list->prev, struct journal_replay, list);

	uint64_t start = i->j.last_seq, end = i->j.seq, n = start;
	struct keylist keylist;

	list_for_each_entry(i, list, list) {
		BUG_ON(i->pin && atomic_read(i->pin) != 1);

		cache_set_err_on(n != i->j.seq, s,
				 "bcache: journal entries %llu-%llu missing! (replaying %llu-%llu)",
				 n, i->j.seq - 1, start, end);

		for (k = i->j.start;
		     k < bset_bkey_last(&i->j);
		     k = bkey_next(k)) {
			trace_bcache_journal_replay_key(k);

			bch_keylist_init_single(&keylist, k);

			ret = bch_btree_insert(s, &keylist, i->pin, NULL);
			if (ret)
				goto err;

			BUG_ON(!bch_keylist_empty(&keylist));
			keys++;

			cond_resched();
		}

		if (i->pin)
			atomic_dec(i->pin);
		n = i->j.seq + 1;
		entries++;
	}

	pr_info("journal replay done, %i keys in %i entries, seq %llu",
		keys, entries, end);
err:
	while (!list_empty(list)) {
		i = list_first_entry(list, struct journal_replay, list);
		list_del(&i->list);
		kfree(i);
	}

	return ret;
}

/* Journalling */
#define journal_max_cmp(l, r) \
	(fifo_idx(&c->journal.pin, btree_current_write(l)->journal) < \
	 fifo_idx(&(c)->journal.pin, btree_current_write(r)->journal))
#define journal_min_cmp(l, r) \
	(fifo_idx(&c->journal.pin, btree_current_write(l)->journal) > \
	 fifo_idx(&(c)->journal.pin, btree_current_write(r)->journal))

static void btree_flush_write(struct cache_set *c)
{
	/*
	 * Try to find the btree node that references the oldest journal
	 * entry and write it out, so the journal entries it pins can be
	 * reclaimed:
	 */
	struct btree *b;
	int i;

	atomic_long_inc(&c->flush_write);

retry:
	spin_lock(&c->journal.lock);
	if (heap_empty(&c->flush_btree)) {
		for_each_cached_btree(b, c, i)
			if (btree_current_write(b)->journal) {
				if (!heap_full(&c->flush_btree))
					heap_add(&c->flush_btree, b,
						 journal_max_cmp);
				else if (journal_max_cmp(b,
					 heap_peek(&c->flush_btree))) {
					c->flush_btree.data[0] = b;
					heap_sift(&c->flush_btree, 0,
						  journal_max_cmp);
				}
			}

		for (i = c->flush_btree.used / 2 - 1; i >= 0; --i)
			heap_sift(&c->flush_btree, i, journal_min_cmp);
	}

	b = NULL;
	heap_pop(&c->flush_btree, b, journal_min_cmp);
	spin_unlock(&c->journal.lock);

	if (b) {
		mutex_lock(&b->write_lock);
		if (!btree_current_write(b)->journal) {
			mutex_unlock(&b->write_lock);
			/* We raced */
			atomic_long_inc(&c->retry_flush_write);
			goto retry;
		}

		__bch_btree_node_write(b, NULL);
		mutex_unlock(&b->write_lock);
	}
}

#define last_seq(j) ((j)->seq - fifo_used(&(j)->pin) + 1)

static void journal_discard_endio(struct bio *bio)
{
	struct journal_device *ja =
		container_of(bio, struct journal_device, discard_bio);
	struct cache *ca = container_of(ja, struct cache, journal);

	atomic_set(&ja->discard_in_flight, DISCARD_DONE);

	closure_wake_up(&ca->set->journal.wait);
	closure_put(&ca->set->cl);
}

static void journal_discard_work(struct work_struct *work)
{
	struct journal_device *ja =
		container_of(work, struct journal_device, discard_work);

	submit_bio(&ja->discard_bio);
}

static void do_journal_discard(struct cache *ca)
{
	struct journal_device *ja = &ca->journal;
	struct bio *bio = &ja->discard_bio;

	if (!ca->discard) {
		ja->discard_idx = ja->last_idx;
		return;
	}

	switch (atomic_read(&ja->discard_in_flight)) {
	case DISCARD_IN_FLIGHT:
		return;

	case DISCARD_DONE:
		ja->discard_idx = (ja->discard_idx + 1) %
			ca->sb.njournal_buckets;

		atomic_set(&ja->discard_in_flight, DISCARD_READY);
		/* fallthrough */

	case DISCARD_READY:
		if (ja->discard_idx == ja->last_idx)
			return;

		atomic_set(&ja->discard_in_flight, DISCARD_IN_FLIGHT);

		bio_init(bio, bio->bi_inline_vecs, 1);
		bio_set_op_attrs(bio, REQ_OP_DISCARD, 0);
		bio->bi_iter.bi_sector = bucket_to_sector(ca->set,
						ca->sb.d[ja->discard_idx]);
		bio_set_dev(bio, ca->bdev);
		bio->bi_iter.bi_size = bucket_bytes(ca);
		bio->bi_end_io = journal_discard_endio;

		closure_get(&ca->set->cl);
		INIT_WORK(&ja->discard_work, journal_discard_work);
		schedule_work(&ja->discard_work);
	}
}
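
/*
 * journal_reclaim() frees up journal space: it pops front pins whose
 * refcount has dropped to zero, advances each device's last_idx past buckets
 * whose entries are all older than the new last_seq, lets do_journal_discard()
 * discard those buckets, and (if the current bucket is used up) points
 * c->journal.key at the next bucket on each device. Waiters are woken once
 * the journal is no longer full.
 */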

static void journal_reclaim(struct cache_set *c)
{
	struct bkey *k = &c->journal.key;
	struct cache *ca;
	uint64_t last_seq;
	unsigned iter, n = 0;
	atomic_t p;

	atomic_long_inc(&c->reclaim);

	while (!atomic_read(&fifo_front(&c->journal.pin)))
		fifo_pop(&c->journal.pin, p);

	last_seq = last_seq(&c->journal);

	/* Update last_idx */

	for_each_cache(ca, c, iter) {
		struct journal_device *ja = &ca->journal;

		while (ja->last_idx != ja->cur_idx &&
		       ja->seq[ja->last_idx] < last_seq)
			ja->last_idx = (ja->last_idx + 1) %
				ca->sb.njournal_buckets;
	}

	for_each_cache(ca, c, iter)
		do_journal_discard(ca);

	if (c->journal.blocks_free)
		goto out;

	/*
	 * Allocate:
	 * XXX: Sort by free journal space
	 */

	for_each_cache(ca, c, iter) {
		struct journal_device *ja = &ca->journal;
		unsigned next = (ja->cur_idx + 1) % ca->sb.njournal_buckets;

		/* No space available on this device */
		if (next == ja->discard_idx)
			continue;

		ja->cur_idx = next;
		k->ptr[n++] = MAKE_PTR(0,
				bucket_to_sector(c, ca->sb.d[ja->cur_idx]),
				ca->sb.nr_this_dev);
	}

	bkey_init(k);
	SET_KEY_PTRS(k, n);

	if (n)
		c->journal.blocks_free = c->sb.bucket_size >> c->block_bits;
out:
	if (!journal_full(&c->journal))
		__closure_wake_up(&c->journal.wait);
}

void bch_journal_next(struct journal *j)
{
	atomic_t p = { 1 };

	j->cur = (j->cur == j->w)
		? &j->w[1]
		: &j->w[0];

	/*
	 * The fifo_push() needs to happen at the same time as j->seq is
	 * incremented for last_seq() to be calculated correctly
	 */
	BUG_ON(!fifo_push(&j->pin, p));
	atomic_set(&fifo_back(&j->pin), 1);

	j->cur->data->seq = ++j->seq;
	j->cur->dirty = false;
	j->cur->need_write = false;
	j->cur->data->keys = 0;

	if (fifo_full(&j->pin))
		pr_debug("journal_pin full (%zu)", fifo_used(&j->pin));
}

static void journal_write_endio(struct bio *bio)
{
	struct journal_write *w = bio->bi_private;

	cache_set_err_on(bio->bi_status, w->c, "journal io error");
	closure_put(&w->c->journal.io);
}
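
/*
 * Journal writes are double buffered (j->w[0]/j->w[1]): while one buffer is
 * being written out, new keys accumulate in the other. journal_write_done()
 * runs when the in-flight write completes: it wakes anything waiting on that
 * buffer and kicks journal_write() again to flush whatever has accumulated
 * in the meantime.
 */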

static void journal_write(struct closure *);

static void journal_write_done(struct closure *cl)
{
	struct journal *j = container_of(cl, struct journal, io);
	struct journal_write *w = (j->cur == j->w)
		? &j->w[1]
		: &j->w[0];

	__closure_wake_up(&w->wait);
	continue_at_nobarrier(cl, journal_write, system_wq);
}

static void journal_write_unlock(struct closure *cl)
{
	struct cache_set *c = container_of(cl, struct cache_set, journal.io);

	c->journal.io_in_flight = 0;
	spin_unlock(&c->journal.lock);
}

static void journal_write_unlocked(struct closure *cl)
	__releases(c->journal.lock)
{
	struct cache_set *c = container_of(cl, struct cache_set, journal.io);
	struct cache *ca;
	struct journal_write *w = c->journal.cur;
	struct bkey *k = &c->journal.key;
	unsigned i, sectors = set_blocks(w->data, block_bytes(c)) *
		c->sb.block_size;

	struct bio *bio;
	struct bio_list list;
	bio_list_init(&list);

	if (!w->need_write) {
		closure_return_with_destructor(cl, journal_write_unlock);
		return;
	} else if (journal_full(&c->journal)) {
		journal_reclaim(c);
		spin_unlock(&c->journal.lock);

		btree_flush_write(c);
		continue_at(cl, journal_write, system_wq);
		return;
	}

	c->journal.blocks_free -= set_blocks(w->data, block_bytes(c));

	w->data->btree_level = c->root->level;

	bkey_copy(&w->data->btree_root, &c->root->key);
	bkey_copy(&w->data->uuid_bucket, &c->uuid_bucket);

	for_each_cache(ca, c, i)
		w->data->prio_bucket[ca->sb.nr_this_dev] = ca->prio_buckets[0];

	w->data->magic = jset_magic(&c->sb);
	w->data->version = BCACHE_JSET_VERSION;
	w->data->last_seq = last_seq(&c->journal);
	w->data->csum = csum_set(w->data);

	for (i = 0; i < KEY_PTRS(k); i++) {
		ca = PTR_CACHE(c, k, i);
		bio = &ca->journal.bio;

		atomic_long_add(sectors, &ca->meta_sectors_written);

		bio_reset(bio);
		bio->bi_iter.bi_sector = PTR_OFFSET(k, i);
		bio_set_dev(bio, ca->bdev);
		bio->bi_iter.bi_size = sectors << 9;

		bio->bi_end_io = journal_write_endio;
		bio->bi_private = w;
		bio_set_op_attrs(bio, REQ_OP_WRITE,
				 REQ_SYNC|REQ_META|REQ_PREFLUSH|REQ_FUA);
		bch_bio_map(bio, w->data);

		trace_bcache_journal_write(bio);
		bio_list_add(&list, bio);

		SET_PTR_OFFSET(k, i, PTR_OFFSET(k, i) + sectors);

		ca->journal.seq[ca->journal.cur_idx] = w->data->seq;
	}

	atomic_dec_bug(&fifo_back(&c->journal.pin));
	bch_journal_next(&c->journal);
	journal_reclaim(c);

	spin_unlock(&c->journal.lock);

	while ((bio = bio_list_pop(&list)))
		closure_bio_submit(bio, cl);

	continue_at(cl, journal_write_done, NULL);
}

static void journal_write(struct closure *cl)
{
	struct cache_set *c = container_of(cl, struct cache_set, journal.io);

	spin_lock(&c->journal.lock);
	journal_write_unlocked(cl);
}

static void journal_try_write(struct cache_set *c)
	__releases(c->journal.lock)
{
	struct closure *cl = &c->journal.io;
	struct journal_write *w = c->journal.cur;

	w->need_write = true;

	if (!c->journal.io_in_flight) {
		c->journal.io_in_flight = 1;
		closure_call(cl, journal_write_unlocked, NULL, &c->cl);
	} else {
		spin_unlock(&c->journal.lock);
	}
}
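
/*
 * Wait until the current journal write has room for nkeys more keys and
 * return it, with c->journal.lock held. If the keys don't fit, either kick
 * off a write of the current entry (if there is still journal space) or
 * reclaim space and flush btree nodes pinning old journal entries (if the
 * journal is full), then wait and retry.
 */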

static struct journal_write *journal_wait_for_write(struct cache_set *c,
						    unsigned nkeys)
{
	size_t sectors;
	struct closure cl;
	bool wait = false;

	closure_init_stack(&cl);

	spin_lock(&c->journal.lock);

	while (1) {
		struct journal_write *w = c->journal.cur;

		sectors = __set_blocks(w->data, w->data->keys + nkeys,
				       block_bytes(c)) * c->sb.block_size;

		if (sectors <= min_t(size_t,
				     c->journal.blocks_free * c->sb.block_size,
				     PAGE_SECTORS << JSET_BITS))
			return w;

		if (wait)
			closure_wait(&c->journal.wait, &cl);

		if (!journal_full(&c->journal)) {
			if (wait)
				trace_bcache_journal_entry_full(c);

			/*
			 * XXX: If we were inserting so many keys that they
			 * won't fit in an _empty_ journal write, we'll
			 * deadlock. For now, handle this in
			 * bch_keylist_realloc() - but something to think about.
			 */
			BUG_ON(!w->data->keys);

			journal_try_write(c); /* unlocks */
		} else {
			if (wait)
				trace_bcache_journal_full(c);

			journal_reclaim(c);
			spin_unlock(&c->journal.lock);

			btree_flush_write(c);
		}

		closure_sync(&cl);
		spin_lock(&c->journal.lock);
		wait = true;
	}
}

static void journal_write_work(struct work_struct *work)
{
	struct cache_set *c = container_of(to_delayed_work(work),
					   struct cache_set,
					   journal.work);
	spin_lock(&c->journal.lock);
	if (c->journal.cur->dirty)
		journal_try_write(c);
	else
		spin_unlock(&c->journal.lock);
}

/*
 * Entry point to the journalling code - bio_insert() and btree_invalidate()
 * pass bch_journal() a list of keys to be journalled, and then
 * bch_journal() hands those same keys off to btree_insert_async()
 */

atomic_t *bch_journal(struct cache_set *c,
		      struct keylist *keys,
		      struct closure *parent)
{
	struct journal_write *w;
	atomic_t *ret;

	if (!CACHE_SYNC(&c->sb))
		return NULL;

	w = journal_wait_for_write(c, bch_keylist_nkeys(keys));

	memcpy(bset_bkey_last(w->data), keys->keys, bch_keylist_bytes(keys));
	w->data->keys += bch_keylist_nkeys(keys);

	ret = &fifo_back(&c->journal.pin);
	atomic_inc(ret);

	if (parent) {
		closure_wait(&w->wait, parent);
		journal_try_write(c);
	} else if (!w->dirty) {
		w->dirty = true;
		schedule_delayed_work(&c->journal.work,
				      msecs_to_jiffies(c->journal_delay_ms));
		spin_unlock(&c->journal.lock);
	} else {
		spin_unlock(&c->journal.lock);
	}

	return ret;
}

void bch_journal_meta(struct cache_set *c, struct closure *cl)
{
	struct keylist keys;
	atomic_t *ref;

	bch_keylist_init(&keys);

	ref = bch_journal(c, &keys, cl);
	if (ref)
		atomic_dec_bug(ref);
}

void bch_journal_free(struct cache_set *c)
{
	free_pages((unsigned long) c->journal.w[1].data, JSET_BITS);
	free_pages((unsigned long) c->journal.w[0].data, JSET_BITS);
	free_fifo(&c->journal.pin);
}

int bch_journal_alloc(struct cache_set *c)
{
	struct journal *j = &c->journal;

	spin_lock_init(&j->lock);
	INIT_DELAYED_WORK(&j->work, journal_write_work);

	c->journal_delay_ms = 100;

	j->w[0].c = c;
	j->w[1].c = c;

	if (!(init_heap(&c->flush_btree, 128, GFP_KERNEL)) ||
	    !(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
	    !(j->w[0].data = (void *) __get_free_pages(GFP_KERNEL, JSET_BITS)) ||
	    !(j->w[1].data = (void *) __get_free_pages(GFP_KERNEL, JSET_BITS)))
		return -ENOMEM;

	return 0;
}