// SPDX-License-Identifier: GPL-2.0
/*
 * bcache journalling code, for btree insertions
 *
 * Copyright 2012 Google, Inc.
 */

#include "bcache.h"
#include "btree.h"
#include "debug.h"
#include "extents.h"

#include <trace/events/bcache.h>

/*
 * Journal replay/recovery:
 *
 * This code is all driven from run_cache_set(); we first read the journal
 * entries, do some other stuff, then we mark all the keys in the journal
 * entries (same as garbage collection would), then we replay them - reinserting
 * them into the cache in precisely the same order as they appear in the
 * journal.
 *
 * We only journal keys that go in leaf nodes, which simplifies things quite a
 * bit.
 */

static void journal_read_endio(struct bio *bio)
{
	struct closure *cl = bio->bi_private;

	closure_put(cl);
}

static int journal_read_bucket(struct cache *ca, struct list_head *list,
			       unsigned int bucket_index)
{
	struct journal_device *ja = &ca->journal;
	struct bio *bio = &ja->bio;

	struct journal_replay *i;
	struct jset *j, *data = ca->set->journal.w[0].data;
	struct closure cl;
	unsigned int len, left, offset = 0;
	int ret = 0;
	sector_t bucket = bucket_to_sector(ca->set, ca->sb.d[bucket_index]);

	closure_init_stack(&cl);

	pr_debug("reading %u\n", bucket_index);

	while (offset < ca->sb.bucket_size) {
reread:		left = ca->sb.bucket_size - offset;
		len = min_t(unsigned int, left, PAGE_SECTORS << JSET_BITS);

		bio_reset(bio, ca->bdev, REQ_OP_READ);
		bio->bi_iter.bi_sector = bucket + offset;
		bio->bi_iter.bi_size = len << 9;

		bio->bi_end_io = journal_read_endio;
		bio->bi_private = &cl;
		bch_bio_map(bio, data);

		closure_bio_submit(ca->set, bio, &cl);
		closure_sync(&cl);

		/* This function could be simpler now since we no longer write
		 * journal entries that overlap bucket boundaries; this means
		 * the start of a bucket will always have a valid journal entry
		 * if it has any journal entries at all.
		 */

		j = data;
		while (len) {
			struct list_head *where;
			size_t blocks, bytes = set_bytes(j);

			if (j->magic != jset_magic(&ca->sb)) {
				pr_debug("%u: bad magic\n", bucket_index);
				return ret;
			}

			if (bytes > left << 9 ||
			    bytes > PAGE_SIZE << JSET_BITS) {
				pr_info("%u: too big, %zu bytes, offset %u\n",
					bucket_index, bytes, offset);
				return ret;
			}

			if (bytes > len << 9)
				goto reread;

			if (j->csum != csum_set(j)) {
				pr_info("%u: bad csum, %zu bytes, offset %u\n",
					bucket_index, bytes, offset);
				return ret;
			}

			blocks = set_blocks(j, block_bytes(ca));

			/*
			 * Nodes in 'list' are kept in increasing order of
			 * i->j.seq; the node at the head has the smallest
			 * (oldest) journal seq, and the node at the tail has
			 * the biggest (latest) journal seq.
			 */

			/*
			 * Check from the oldest jset for last_seq. If
			 * i->j.seq < j->last_seq, the oldest jset in the
			 * list is expired and useless, so remove it from
			 * the list. Otherwise, j is a candidate jset for
			 * the checks below.
			 */
			while (!list_empty(list)) {
				i = list_first_entry(list,
					struct journal_replay, list);
				if (i->j.seq >= j->last_seq)
					break;
				list_del(&i->list);
				kfree(i);
			}

			/* iterate list in reverse order (from latest jset) */
			list_for_each_entry_reverse(i, list, list) {
				if (j->seq == i->j.seq)
					goto next_set;

				/*
				 * if j->seq is less than any i->j.last_seq
				 * in list, j is an expired and useless jset.
				 */
				if (j->seq < i->j.last_seq)
					goto next_set;

				/*
				 * 'where' points to the first jset in the
				 * list which is older than j.
				 */
				if (j->seq > i->j.seq) {
					where = &i->list;
					goto add;
				}
			}

			where = list;
add:
			i = kmalloc(offsetof(struct journal_replay, j) +
				    bytes, GFP_KERNEL);
			if (!i)
				return -ENOMEM;
			memcpy(&i->j, j, bytes);
			/* Add to the location after 'where' points to */
			list_add(&i->list, where);
			ret = 1;

			if (j->seq > ja->seq[bucket_index])
				ja->seq[bucket_index] = j->seq;
next_set:
			offset += blocks * ca->sb.block_size;
			len -= blocks * ca->sb.block_size;
			j = ((void *) j) + blocks * block_bytes(ca);
		}
	}

	return ret;
}

int bch_journal_read(struct cache_set *c, struct list_head *list)
{
#define read_bucket(b)						\
	({							\
		ret = journal_read_bucket(ca, list, b);		\
		__set_bit(b, bitmap);				\
		if (ret < 0)					\
			return ret;				\
		ret;						\
	})

	struct cache *ca = c->cache;
	int ret = 0;
	struct journal_device *ja = &ca->journal;
	DECLARE_BITMAP(bitmap, SB_JOURNAL_BUCKETS);
	unsigned int i, l, r, m;
	uint64_t seq;

	bitmap_zero(bitmap, SB_JOURNAL_BUCKETS);
	pr_debug("%u journal buckets\n", ca->sb.njournal_buckets);

	/*
	 * Read journal buckets ordered by golden ratio hash to quickly
	 * find a sequence of buckets with valid journal entries
	 */
	for (i = 0; i < ca->sb.njournal_buckets; i++) {
		/*
		 * We must try index l == 0 first for correctness: the
		 * journal buckets form a circular buffer which might
		 * have wrapped around.
		 */
		l = (i * 2654435769U) % ca->sb.njournal_buckets;

		if (test_bit(l, bitmap))
			break;

		if (read_bucket(l))
			goto bsearch;
	}

	/*
	 * If that fails, check all the buckets we haven't checked
	 * already
	 */
	pr_debug("falling back to linear search\n");

	for_each_clear_bit(l, bitmap, ca->sb.njournal_buckets)
		if (read_bucket(l))
			goto bsearch;

	/* no journal entries on this device? */
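	/*
	 * Reaching this point means neither scan found anything, so
	 * for_each_clear_bit() has left l == ca->sb.njournal_buckets.
	 */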
	if (l == ca->sb.njournal_buckets)
		goto out;
bsearch:
	BUG_ON(list_empty(list));

	/* Binary search */
	m = l;
	r = find_next_bit(bitmap, ca->sb.njournal_buckets, l + 1);
	pr_debug("starting binary search, l %u r %u\n", l, r);

	while (l + 1 < r) {
		seq = list_entry(list->prev, struct journal_replay,
				 list)->j.seq;

		m = (l + r) >> 1;
		read_bucket(m);

		if (seq != list_entry(list->prev, struct journal_replay,
				      list)->j.seq)
			l = m;
		else
			r = m;
	}

	/*
	 * Read buckets in reverse order until we stop finding more
	 * journal entries
	 */
	pr_debug("finishing up: m %u njournal_buckets %u\n",
		 m, ca->sb.njournal_buckets);
	l = m;

	while (1) {
		if (!l--)
			l = ca->sb.njournal_buckets - 1;

		if (l == m)
			break;

		if (test_bit(l, bitmap))
			continue;

		if (!read_bucket(l))
			break;
	}

	seq = 0;

	for (i = 0; i < ca->sb.njournal_buckets; i++)
		if (ja->seq[i] > seq) {
			seq = ja->seq[i];
			/*
			 * When journal_reclaim() goes to allocate for
			 * the first time, it'll use the bucket after
			 * ja->cur_idx
			 */
			ja->cur_idx = i;
			ja->last_idx = ja->discard_idx = (i + 1) %
				ca->sb.njournal_buckets;
		}

out:
	if (!list_empty(list))
		c->journal.seq = list_entry(list->prev,
					    struct journal_replay,
					    list)->j.seq;

	return 0;
#undef read_bucket
}

void bch_journal_mark(struct cache_set *c, struct list_head *list)
{
	atomic_t p = { 0 };
	struct bkey *k;
	struct journal_replay *i;
	struct journal *j = &c->journal;
	uint64_t last = j->seq;

	/*
	 * journal.pin should never fill up - we never write a journal
	 * entry when it would fill up. But if for some reason it does, we
	 * iterate over the list in reverse order so that we can just skip that
	 * refcount instead of bugging.
	 */

	list_for_each_entry_reverse(i, list, list) {
		BUG_ON(last < i->j.seq);
		i->pin = NULL;

		while (last-- != i->j.seq)
			if (fifo_free(&j->pin) > 1) {
				fifo_push_front(&j->pin, p);
				atomic_set(&fifo_front(&j->pin), 0);
			}

		if (fifo_free(&j->pin) > 1) {
			fifo_push_front(&j->pin, p);
			i->pin = &fifo_front(&j->pin);
			atomic_set(i->pin, 1);
		}

		for (k = i->j.start;
		     k < bset_bkey_last(&i->j);
		     k = bkey_next(k))
			if (!__bch_extent_invalid(c, k)) {
				unsigned int j;

				for (j = 0; j < KEY_PTRS(k); j++)
					if (ptr_available(c, k, j))
						atomic_inc(&PTR_BUCKET(c, k, j)->pin);

				bch_initial_mark_key(c, 0, k);
			}
	}
}

static bool is_discard_enabled(struct cache_set *s)
{
	struct cache *ca = s->cache;

	if (ca->discard)
		return true;

	return false;
}

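/*
 * Re-insert the keys from every journal entry on the replay list into the
 * btree, in journal order. A gap in the sequence numbers is tolerated only
 * at the start of the range and only when discard is enabled (the missing
 * entries may have lived in an already-discarded bucket); any other gap is
 * treated as an I/O error.
 */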
int bch_journal_replay(struct cache_set *s, struct list_head *list)
{
	int ret = 0, keys = 0, entries = 0;
	struct bkey *k;
	struct journal_replay *i =
		list_entry(list->prev, struct journal_replay, list);

	uint64_t start = i->j.last_seq, end = i->j.seq, n = start;
	struct keylist keylist;

	list_for_each_entry(i, list, list) {
		BUG_ON(i->pin && atomic_read(i->pin) != 1);

		if (n != i->j.seq) {
			if (n == start && is_discard_enabled(s))
				pr_info("journal entries %llu-%llu may be discarded! (replaying %llu-%llu)\n",
					n, i->j.seq - 1, start, end);
			else {
				pr_err("journal entries %llu-%llu missing! (replaying %llu-%llu)\n",
				       n, i->j.seq - 1, start, end);
				ret = -EIO;
				goto err;
			}
		}

		for (k = i->j.start;
		     k < bset_bkey_last(&i->j);
		     k = bkey_next(k)) {
			trace_bcache_journal_replay_key(k);

			bch_keylist_init_single(&keylist, k);

			ret = bch_btree_insert(s, &keylist, i->pin, NULL);
			if (ret)
				goto err;

			BUG_ON(!bch_keylist_empty(&keylist));
			keys++;

			cond_resched();
		}

		if (i->pin)
			atomic_dec(i->pin);
		n = i->j.seq + 1;
		entries++;
	}

	pr_info("journal replay done, %i keys in %i entries, seq %llu\n",
		keys, entries, end);
err:
	while (!list_empty(list)) {
		i = list_first_entry(list, struct journal_replay, list);
		list_del(&i->list);
		kfree(i);
	}

	return ret;
}

/* Journalling */

static void btree_flush_write(struct cache_set *c)
{
	struct btree *b, *t, *btree_nodes[BTREE_FLUSH_NR];
	unsigned int i, nr;
	int ref_nr;
	atomic_t *fifo_front_p, *now_fifo_front_p;
	size_t mask;

	if (c->journal.btree_flushing)
		return;

	spin_lock(&c->journal.flush_write_lock);
	if (c->journal.btree_flushing) {
		spin_unlock(&c->journal.flush_write_lock);
		return;
	}
	c->journal.btree_flushing = true;
	spin_unlock(&c->journal.flush_write_lock);

	/* get the oldest journal entry and check its refcount */
	spin_lock(&c->journal.lock);
	fifo_front_p = &fifo_front(&c->journal.pin);
	ref_nr = atomic_read(fifo_front_p);
	if (ref_nr <= 0) {
		/*
		 * do nothing if no btree node references
		 * the oldest journal entry
		 */
		spin_unlock(&c->journal.lock);
		goto out;
	}
	spin_unlock(&c->journal.lock);

	mask = c->journal.pin.mask;
	nr = 0;
	atomic_long_inc(&c->flush_write);
	memset(btree_nodes, 0, sizeof(btree_nodes));

	mutex_lock(&c->bucket_lock);
	list_for_each_entry_safe_reverse(b, t, &c->btree_cache, list) {
		/*
		 * It is safe to get now_fifo_front_p without holding
		 * c->journal.lock here, because we don't need to know
		 * the exact value; we only check whether the front
		 * pointer of c->journal.pin has changed.
		 */
		now_fifo_front_p = &fifo_front(&c->journal.pin);
		/*
		 * If the oldest journal entry is reclaimed and the front
		 * pointer of c->journal.pin changes, there is no need to
		 * scan c->btree_cache any further; just quit the loop and
		 * flush out what we already have.
		 */
		if (now_fifo_front_p != fifo_front_p)
			break;
		/*
		 * Quit this loop if all matching btree nodes have been
		 * scanned and recorded in btree_nodes[] already.
		 */
		ref_nr = atomic_read(fifo_front_p);
		if (nr >= ref_nr)
			break;

		if (btree_node_journal_flush(b))
			pr_err("BUG: flush_write bit should not be set here!\n");

		mutex_lock(&b->write_lock);

		if (!btree_node_dirty(b)) {
			mutex_unlock(&b->write_lock);
			continue;
		}

		if (!btree_current_write(b)->journal) {
			mutex_unlock(&b->write_lock);
			continue;
		}

		/*
		 * Only select the btree node which exactly references
		 * the oldest journal entry.
		 *
		 * If the journal entry pointed to by fifo_front_p is
		 * reclaimed in parallel, don't worry:
		 * - the list_for_each_xxx loop will quit when it next
		 *   checks now_fifo_front_p.
		 * - If there are matched nodes recorded in btree_nodes[],
		 *   they are clean now (this is why and how the oldest
		 *   journal entry can be reclaimed). These selected nodes
		 *   will be ignored and skipped in the following for-loop.
		 */
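		/*
		 * journal.pin is a power-of-two ring of atomic_t, so the
		 * pointer difference masked with pin.mask is the ring
		 * distance between this node's journal pin and the fifo
		 * front; it is zero only when both point at the same
		 * (i.e. the oldest) entry.
		 */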
		if (((btree_current_write(b)->journal - fifo_front_p) &
		     mask) != 0) {
			mutex_unlock(&b->write_lock);
			continue;
		}

		set_btree_node_journal_flush(b);

		mutex_unlock(&b->write_lock);

		btree_nodes[nr++] = b;
		/*
		 * To avoid holding c->bucket_lock for too long, only
		 * scan for at most BTREE_FLUSH_NR matching btree nodes.
		 * If more btree nodes reference the oldest journal
		 * entry, try to flush them the next time
		 * btree_flush_write() is called.
		 */
		if (nr == BTREE_FLUSH_NR)
			break;
	}
	mutex_unlock(&c->bucket_lock);

	for (i = 0; i < nr; i++) {
		b = btree_nodes[i];
		if (!b) {
			pr_err("BUG: btree_nodes[%d] is NULL\n", i);
			continue;
		}

		/* safe to check without holding b->write_lock */
		if (!btree_node_journal_flush(b)) {
			pr_err("BUG: bnode %p: journal_flush bit cleaned\n", b);
			continue;
		}

		mutex_lock(&b->write_lock);
		if (!btree_current_write(b)->journal) {
			clear_bit(BTREE_NODE_journal_flush, &b->flags);
			mutex_unlock(&b->write_lock);
			pr_debug("bnode %p: written by others\n", b);
			continue;
		}

		if (!btree_node_dirty(b)) {
			clear_bit(BTREE_NODE_journal_flush, &b->flags);
			mutex_unlock(&b->write_lock);
			pr_debug("bnode %p: dirty bit cleaned by others\n", b);
			continue;
		}

		__bch_btree_node_write(b, NULL);
		clear_bit(BTREE_NODE_journal_flush, &b->flags);
		mutex_unlock(&b->write_lock);
	}

out:
	spin_lock(&c->journal.flush_write_lock);
	c->journal.btree_flushing = false;
	spin_unlock(&c->journal.flush_write_lock);
}

#define last_seq(j)	((j)->seq - fifo_used(&(j)->pin) + 1)

static void journal_discard_endio(struct bio *bio)
{
	struct journal_device *ja =
		container_of(bio, struct journal_device, discard_bio);
	struct cache *ca = container_of(ja, struct cache, journal);

	atomic_set(&ja->discard_in_flight, DISCARD_DONE);

	closure_wake_up(&ca->set->journal.wait);
	closure_put(&ca->set->cl);
}

static void journal_discard_work(struct work_struct *work)
{
	struct journal_device *ja =
		container_of(work, struct journal_device, discard_work);

	submit_bio(&ja->discard_bio);
}

static void do_journal_discard(struct cache *ca)
{
	struct journal_device *ja = &ca->journal;
	struct bio *bio = &ja->discard_bio;

	if (!ca->discard) {
		ja->discard_idx = ja->last_idx;
		return;
	}

	switch (atomic_read(&ja->discard_in_flight)) {
	case DISCARD_IN_FLIGHT:
		return;

	case DISCARD_DONE:
		ja->discard_idx = (ja->discard_idx + 1) %
			ca->sb.njournal_buckets;

		atomic_set(&ja->discard_in_flight, DISCARD_READY);
		fallthrough;

	case DISCARD_READY:
		if (ja->discard_idx == ja->last_idx)
			return;

		atomic_set(&ja->discard_in_flight, DISCARD_IN_FLIGHT);

		bio_init(bio, ca->bdev, bio->bi_inline_vecs, 1, REQ_OP_DISCARD);
		bio->bi_iter.bi_sector = bucket_to_sector(ca->set,
						ca->sb.d[ja->discard_idx]);
		bio->bi_iter.bi_size = bucket_bytes(ca);
		bio->bi_end_io = journal_discard_endio;

		closure_get(&ca->set->cl);
		INIT_WORK(&ja->discard_work, journal_discard_work);
		queue_work(bch_journal_wq, &ja->discard_work);
	}
}

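/*
 * Free up journal space: pop pin fifo entries whose refcount has dropped to
 * zero, advance last_idx past buckets whose newest entry is older than the
 * resulting last_seq (issuing discards if enabled), and if the current
 * bucket is used up, move cur_idx on to the next available bucket.
 */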
static void journal_reclaim(struct cache_set *c)
{
	struct bkey *k = &c->journal.key;
	struct cache *ca = c->cache;
	uint64_t last_seq;
	unsigned int next;
	struct journal_device *ja = &ca->journal;
	atomic_t p __maybe_unused;

	atomic_long_inc(&c->reclaim);

	while (!atomic_read(&fifo_front(&c->journal.pin)))
		fifo_pop(&c->journal.pin, p);

	last_seq = last_seq(&c->journal);

	/* Update last_idx */

	while (ja->last_idx != ja->cur_idx &&
	       ja->seq[ja->last_idx] < last_seq)
		ja->last_idx = (ja->last_idx + 1) %
			ca->sb.njournal_buckets;

	do_journal_discard(ca);

	if (c->journal.blocks_free)
		goto out;

	next = (ja->cur_idx + 1) % ca->sb.njournal_buckets;
	/* No space available on this device */
	if (next == ja->discard_idx)
		goto out;

	ja->cur_idx = next;
	k->ptr[0] = MAKE_PTR(0,
			     bucket_to_sector(c, ca->sb.d[ja->cur_idx]),
			     ca->sb.nr_this_dev);
	atomic_long_inc(&c->reclaimed_journal_buckets);

	bkey_init(k);
	SET_KEY_PTRS(k, 1);
	c->journal.blocks_free = ca->sb.bucket_size >> c->block_bits;

out:
	if (!journal_full(&c->journal))
		__closure_wake_up(&c->journal.wait);
}

void bch_journal_next(struct journal *j)
{
	atomic_t p = { 1 };

	j->cur = (j->cur == j->w)
		? &j->w[1]
		: &j->w[0];

	/*
	 * The fifo_push() needs to happen at the same time as j->seq is
	 * incremented for last_seq() to be calculated correctly
	 */
	BUG_ON(!fifo_push(&j->pin, p));
	atomic_set(&fifo_back(&j->pin), 1);

	j->cur->data->seq = ++j->seq;
	j->cur->dirty = false;
	j->cur->need_write = false;
	j->cur->data->keys = 0;

	if (fifo_full(&j->pin))
		pr_debug("journal_pin full (%zu)\n", fifo_used(&j->pin));
}

static void journal_write_endio(struct bio *bio)
{
	struct journal_write *w = bio->bi_private;

	cache_set_err_on(bio->bi_status, w->c, "journal io error");
	closure_put(&w->c->journal.io);
}

static void journal_write(struct closure *cl);

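/*
 * Called via the journal's io closure once all bios for the just-written
 * entry have completed: wake anyone waiting on that entry, then re-run
 * journal_write() from the workqueue to push out the now-current entry if
 * it needs writing.
 */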
static void journal_write_done(struct closure *cl)
{
	struct journal *j = container_of(cl, struct journal, io);
	struct journal_write *w = (j->cur == j->w)
		? &j->w[1]
		: &j->w[0];

	__closure_wake_up(&w->wait);
	continue_at_nobarrier(cl, journal_write, bch_journal_wq);
}

static void journal_write_unlock(struct closure *cl)
	__releases(&c->journal.lock)
{
	struct cache_set *c = container_of(cl, struct cache_set, journal.io);

	c->journal.io_in_flight = 0;
	spin_unlock(&c->journal.lock);
}

static void journal_write_unlocked(struct closure *cl)
	__releases(c->journal.lock)
{
	struct cache_set *c = container_of(cl, struct cache_set, journal.io);
	struct cache *ca = c->cache;
	struct journal_write *w = c->journal.cur;
	struct bkey *k = &c->journal.key;
	unsigned int i, sectors = set_blocks(w->data, block_bytes(ca)) *
		ca->sb.block_size;

	struct bio *bio;
	struct bio_list list;

	bio_list_init(&list);

	if (!w->need_write) {
		closure_return_with_destructor(cl, journal_write_unlock);
		return;
	} else if (journal_full(&c->journal)) {
		journal_reclaim(c);
		spin_unlock(&c->journal.lock);

		btree_flush_write(c);
		continue_at(cl, journal_write, bch_journal_wq);
		return;
	}

	c->journal.blocks_free -= set_blocks(w->data, block_bytes(ca));

	w->data->btree_level = c->root->level;

	bkey_copy(&w->data->btree_root, &c->root->key);
	bkey_copy(&w->data->uuid_bucket, &c->uuid_bucket);

	w->data->prio_bucket[ca->sb.nr_this_dev] = ca->prio_buckets[0];
	w->data->magic = jset_magic(&ca->sb);
	w->data->version = BCACHE_JSET_VERSION;
	w->data->last_seq = last_seq(&c->journal);
	w->data->csum = csum_set(w->data);

	for (i = 0; i < KEY_PTRS(k); i++) {
		ca = c->cache;
		bio = &ca->journal.bio;

		atomic_long_add(sectors, &ca->meta_sectors_written);

		bio_reset(bio, ca->bdev, REQ_OP_WRITE |
			  REQ_SYNC | REQ_META | REQ_PREFLUSH | REQ_FUA);
		bch_bio_map(bio, w->data);
		bio->bi_iter.bi_sector = PTR_OFFSET(k, i);
		bio->bi_iter.bi_size = sectors << 9;

		bio->bi_end_io = journal_write_endio;
		bio->bi_private = w;

		trace_bcache_journal_write(bio, w->data->keys);
		bio_list_add(&list, bio);

		SET_PTR_OFFSET(k, i, PTR_OFFSET(k, i) + sectors);

		ca->journal.seq[ca->journal.cur_idx] = w->data->seq;
	}

	/* If KEY_PTRS(k) == 0, this jset would simply be lost */
	BUG_ON(i == 0);

	atomic_dec_bug(&fifo_back(&c->journal.pin));
	bch_journal_next(&c->journal);
	journal_reclaim(c);

	spin_unlock(&c->journal.lock);

	while ((bio = bio_list_pop(&list)))
		closure_bio_submit(c, bio, cl);

	continue_at(cl, journal_write_done, NULL);
}

static void journal_write(struct closure *cl)
{
	struct cache_set *c = container_of(cl, struct cache_set, journal.io);

	spin_lock(&c->journal.lock);
	journal_write_unlocked(cl);
}

static void journal_try_write(struct cache_set *c)
	__releases(c->journal.lock)
{
	struct closure *cl = &c->journal.io;
	struct journal_write *w = c->journal.cur;

	w->need_write = true;

	if (!c->journal.io_in_flight) {
		c->journal.io_in_flight = 1;
		closure_call(cl, journal_write_unlocked, NULL, &c->cl);
	} else {
		spin_unlock(&c->journal.lock);
	}
}

static struct journal_write *journal_wait_for_write(struct cache_set *c,
						    unsigned int nkeys)
	__acquires(&c->journal.lock)
{
	size_t sectors;
	struct closure cl;
	bool wait = false;
	struct cache *ca = c->cache;

	closure_init_stack(&cl);

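	/*
	 * Loop with c->journal.lock held until the current journal entry has
	 * room for nkeys more keys: if the entry itself is too full, start
	 * writing it out; if the whole journal is out of buckets, reclaim and
	 * flush the btree nodes pinning the oldest entry, then retry.
	 */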
	spin_lock(&c->journal.lock);

	while (1) {
		struct journal_write *w = c->journal.cur;

		sectors = __set_blocks(w->data, w->data->keys + nkeys,
				       block_bytes(ca)) * ca->sb.block_size;

		if (sectors <= min_t(size_t,
				     c->journal.blocks_free * ca->sb.block_size,
				     PAGE_SECTORS << JSET_BITS))
			return w;

		if (wait)
			closure_wait(&c->journal.wait, &cl);

		if (!journal_full(&c->journal)) {
			if (wait)
				trace_bcache_journal_entry_full(c);

			/*
			 * XXX: If we were inserting so many keys that they
			 * won't fit in an _empty_ journal write, we'll
			 * deadlock. For now, handle this in
			 * bch_keylist_realloc() - but something to think about.
			 */
			BUG_ON(!w->data->keys);

			journal_try_write(c); /* unlocks */
		} else {
			if (wait)
				trace_bcache_journal_full(c);

			journal_reclaim(c);
			spin_unlock(&c->journal.lock);

			btree_flush_write(c);
		}

		closure_sync(&cl);
		spin_lock(&c->journal.lock);
		wait = true;
	}
}

static void journal_write_work(struct work_struct *work)
{
	struct cache_set *c = container_of(to_delayed_work(work),
					   struct cache_set,
					   journal.work);
	spin_lock(&c->journal.lock);
	if (c->journal.cur->dirty)
		journal_try_write(c);
	else
		spin_unlock(&c->journal.lock);
}

/*
 * Entry point to the journalling code - bio_insert() and btree_invalidate()
 * pass bch_journal() a list of keys to be journalled, and then
 * bch_journal() hands those same keys off to btree_insert_async()
 */

atomic_t *bch_journal(struct cache_set *c,
		      struct keylist *keys,
		      struct closure *parent)
{
	struct journal_write *w;
	atomic_t *ret;

	/* No journalling if CACHE_SET_IO_DISABLE is already set */
	if (unlikely(test_bit(CACHE_SET_IO_DISABLE, &c->flags)))
		return NULL;

	if (!CACHE_SYNC(&c->cache->sb))
		return NULL;

	w = journal_wait_for_write(c, bch_keylist_nkeys(keys));

	memcpy(bset_bkey_last(w->data), keys->keys, bch_keylist_bytes(keys));
	w->data->keys += bch_keylist_nkeys(keys);

	ret = &fifo_back(&c->journal.pin);
	atomic_inc(ret);

	if (parent) {
		closure_wait(&w->wait, parent);
		journal_try_write(c);
	} else if (!w->dirty) {
		w->dirty = true;
		queue_delayed_work(bch_flush_wq, &c->journal.work,
				   msecs_to_jiffies(c->journal_delay_ms));
		spin_unlock(&c->journal.lock);
	} else {
		spin_unlock(&c->journal.lock);
	}

	return ret;
}

void bch_journal_meta(struct cache_set *c, struct closure *cl)
{
	struct keylist keys;
	atomic_t *ref;

	bch_keylist_init(&keys);

	ref = bch_journal(c, &keys, cl);
	if (ref)
		atomic_dec_bug(ref);
}

void bch_journal_free(struct cache_set *c)
{
	free_pages((unsigned long) c->journal.w[1].data, JSET_BITS);
	free_pages((unsigned long) c->journal.w[0].data, JSET_BITS);
	free_fifo(&c->journal.pin);
}

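/*
 * Allocate the journal's pin fifo and the two jset write buffers
 * (1 << JSET_BITS pages each) that journal writes alternate between;
 * bch_journal_free() releases them.
 */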
int bch_journal_alloc(struct cache_set *c)
{
	struct journal *j = &c->journal;

	spin_lock_init(&j->lock);
	spin_lock_init(&j->flush_write_lock);
	INIT_DELAYED_WORK(&j->work, journal_write_work);

	c->journal_delay_ms = 100;

	j->w[0].c = c;
	j->w[1].c = c;

	if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) ||
	    !(j->w[0].data = (void *) __get_free_pages(GFP_KERNEL|__GFP_COMP, JSET_BITS)) ||
	    !(j->w[1].data = (void *) __get_free_pages(GFP_KERNEL|__GFP_COMP, JSET_BITS)))
		return -ENOMEM;

	return 0;
}