// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2010 Red Hat, Inc. All Rights Reserved.
 */

#include "xfs.h"
#include "xfs_fs.h"
#include "xfs_format.h"
#include "xfs_log_format.h"
#include "xfs_shared.h"
#include "xfs_trans_resv.h"
#include "xfs_mount.h"
#include "xfs_extent_busy.h"
#include "xfs_trans.h"
#include "xfs_trans_priv.h"
#include "xfs_log.h"
#include "xfs_log_priv.h"
#include "xfs_trace.h"

struct workqueue_struct *xfs_discard_wq;

/*
 * Allocate a new ticket. Failing to get a new ticket makes it really hard to
 * recover, so we don't allow failure here. Also, we allocate in a context that
 * we don't want to be issuing transactions from, so we need to tell the
 * allocation code this as well.
 *
 * We don't reserve any space for the ticket - we are going to steal whatever
 * space we require from transactions as they commit. To ensure we reserve all
 * the space required, we need to set the current reservation of the ticket to
 * zero so that we know to steal the initial transaction overhead from the
 * first transaction commit.
 */
static struct xlog_ticket *
xlog_cil_ticket_alloc(
	struct xlog	*log)
{
	struct xlog_ticket *tic;

	tic = xlog_ticket_alloc(log, 0, 1, 0);

	/*
	 * set the current reservation to zero so we know to steal the basic
	 * transaction overhead reservation from the first transaction commit.
	 */
	tic->t_curr_res = 0;
	return tic;
}

/*
 * Check if the current log item was first committed in this sequence.
 * We can't rely on just the log item being in the CIL, we have to check
 * the recorded commit sequence number.
 *
 * Note: for this to be used in a non-racy manner, it has to be called with
 * CIL flushing locked out. As a result, it should only be used during the
 * transaction commit process when deciding what to format into the item.
 */
static bool
xlog_item_in_current_chkpt(
	struct xfs_cil		*cil,
	struct xfs_log_item	*lip)
{
	if (list_empty(&lip->li_cil))
		return false;

	/*
	 * li_seq is written on the first commit of a log item to record the
	 * first checkpoint it is written to. Hence if it is different to the
	 * current sequence, we're in a new checkpoint.
	 */
	return lip->li_seq == READ_ONCE(cil->xc_current_sequence);
}

bool
xfs_log_item_in_current_chkpt(
	struct xfs_log_item *lip)
{
	return xlog_item_in_current_chkpt(lip->li_log->l_cilp, lip);
}

/*
 * Unavoidable forward declaration - xlog_cil_push_work() calls
 * xlog_cil_ctx_alloc() itself.
 */
static void xlog_cil_push_work(struct work_struct *work);

static struct xfs_cil_ctx *
xlog_cil_ctx_alloc(void)
{
	struct xfs_cil_ctx	*ctx;

	ctx = kmem_zalloc(sizeof(*ctx), KM_NOFS);
	INIT_LIST_HEAD(&ctx->committing);
	INIT_LIST_HEAD(&ctx->busy_extents);
	INIT_WORK(&ctx->push_work, xlog_cil_push_work);
	return ctx;
}

static void
xlog_cil_ctx_switch(
	struct xfs_cil		*cil,
	struct xfs_cil_ctx	*ctx)
{
	ctx->sequence = ++cil->xc_current_sequence;
	ctx->cil = cil;
	cil->xc_ctx = ctx;
}

/*
 * After the first stage of log recovery is done, we know where the head and
 * tail of the log are. We need this log initialisation done before we can
 * initialise the first CIL checkpoint context.
 *
 * Here we allocate a log ticket to track space usage during a CIL push. This
 * ticket is passed to xlog_write() directly so that we don't slowly leak log
 * space by failing to account for space used by log headers and additional
 * region headers for split regions.
 */
void
xlog_cil_init_post_recovery(
	struct xlog	*log)
{
	log->l_cilp->xc_ctx->ticket = xlog_cil_ticket_alloc(log);
	log->l_cilp->xc_ctx->sequence = 1;
}

static inline int
xlog_cil_iovec_space(
	uint	niovecs)
{
	return round_up((sizeof(struct xfs_log_vec) +
			 niovecs * sizeof(struct xfs_log_iovec)),
			sizeof(uint64_t));
}
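
/*
 * Illustrative memory layout of a single log vector buffer (a sketch,
 * not a definition from this file; exact sizes are build dependent):
 *
 *	[ struct xfs_log_vec | lv_niovecs * struct xfs_log_iovec | pad ]
 *	[ 64-bit aligned data region ...				]
 *
 * xlog_cil_iovec_space() returns the size of the first region, so
 * lv->lv_buf = (char *)lv + xlog_cil_iovec_space(niovecs) always starts
 * on a 64-bit boundary.
 */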

/*
 * shadow buffers can be large, so we need to use kvmalloc() here to ensure
 * success. Unfortunately, kvmalloc() only allows GFP_KERNEL contexts to fall
 * back to vmalloc, so we can't actually do anything useful with gfp flags to
 * control the kmalloc() behaviour within kvmalloc(). Hence kmalloc() will do
 * direct reclaim and compaction in the slow path, both of which are
 * horrendously expensive. We just want kmalloc to fail fast and fall back to
 * vmalloc if it can't get something straight away from the free lists or
 * buddy allocator. Hence we have to open code kvmalloc ourselves here.
 *
 * Also, we are in memalloc_nofs_save task context here, so despite the use of
 * GFP_KERNEL here, we are actually going to be doing GFP_NOFS allocations.
 * This is actually the only way to make vmalloc() do GFP_NOFS allocations, so
 * let's all just pretend this is a GFP_KERNEL context operation....
 */
static inline void *
xlog_cil_kvmalloc(
	size_t		buf_size)
{
	gfp_t		flags = GFP_KERNEL;
	void		*p;

	flags &= ~__GFP_DIRECT_RECLAIM;
	flags |= __GFP_NOWARN | __GFP_NORETRY;
	do {
		p = kmalloc(buf_size, flags);
		if (!p)
			p = vmalloc(buf_size);
	} while (!p);

	return p;
}
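
/*
 * A minimal sketch of the calling context assumed by the comment above
 * (hypothetical caller, not code from this file): the commit path runs
 * under memalloc_nofs_save(), which is what demotes the GFP_KERNEL
 * allocations above to GFP_NOFS behaviour:
 *
 *	unsigned int nofs_flags = memalloc_nofs_save();
 *	lv = xlog_cil_kvmalloc(buf_size);	// behaves as GFP_NOFS
 *	memalloc_nofs_restore(nofs_flags);
 */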

/*
 * Allocate or pin log vector buffers for CIL insertion.
 *
 * The CIL currently uses disposable buffers for copying a snapshot of the
 * modified items into the log during a push. The biggest problem with this is
 * the requirement to allocate the disposable buffer during the commit if:
 *	a) it does not exist; or
 *	b) it is too small
 *
 * If we do this allocation within xlog_cil_insert_format_items(), it is done
 * under the xc_ctx_lock, which means that a CIL push cannot occur during
 * the memory allocation. This means that we have a potential deadlock situation
 * under low memory conditions when we have lots of dirty metadata pinned in
 * the CIL and we need a CIL commit to occur to free memory.
 *
 * To avoid this, we need to move the memory allocation outside the
 * xc_ctx_lock, but because the log vector buffers are disposable, that opens
 * up a TOCTOU race condition w.r.t. the CIL committing and removing the log
 * vector buffers between the check and the formatting of the item into the
 * log vector buffer within the xc_ctx_lock.
 *
 * Because the log vector buffer needs to be unchanged during the CIL push
 * process, we cannot share the buffer between the transaction commit (which
 * modifies the buffer) and the CIL push context that is writing the changes
 * into the log. This means skipping preallocation of buffer space is
 * unreliable, but we most definitely do not want to be allocating and freeing
 * buffers unnecessarily during commits when overwrites can be done safely.
 *
 * The simplest solution to this problem is to allocate a shadow buffer when a
 * log item is committed for the second time, and then to only use this buffer
 * if necessary. The buffer can remain attached to the log item until such time
 * it is needed, and this is the buffer that is reallocated to match the size of
 * the incoming modification. Then during the formatting of the item we can swap
 * the active buffer with the new one if we can't reuse the existing buffer. We
 * don't free the old buffer as it may be reused on the next modification if
 * its size is right, otherwise we'll free and reallocate it at that point.
 *
 * This function builds a vector for the changes in each log item in the
 * transaction. It then works out the length of the buffer needed for each log
 * item, allocates them and attaches the vector to the log item in preparation
 * for the formatting step which occurs under the xc_ctx_lock.
 *
 * While this means the memory footprint goes up, it avoids the repeated
 * alloc/free pattern that repeated modifications of an item would otherwise
 * cause, and hence minimises the CPU overhead of such behaviour.
 */
static void
xlog_cil_alloc_shadow_bufs(
	struct xlog		*log,
	struct xfs_trans	*tp)
{
	struct xfs_log_item	*lip;

	list_for_each_entry(lip, &tp->t_items, li_trans) {
		struct xfs_log_vec *lv;
		int	niovecs = 0;
		int	nbytes = 0;
		int	buf_size;
		bool	ordered = false;

		/* Skip items which aren't dirty in this transaction. */
		if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
			continue;

		/* get number of vecs and size of data to be stored */
		lip->li_ops->iop_size(lip, &niovecs, &nbytes);

		/*
		 * Ordered items need to be tracked but we do not wish to write
		 * them. We need a logvec to track the object, but we do not
		 * need an iovec or buffer to be allocated for copying data.
		 */
		if (niovecs == XFS_LOG_VEC_ORDERED) {
			ordered = true;
			niovecs = 0;
			nbytes = 0;
		}

		/*
		 * We 64-bit align the length of each iovec so that the start of
		 * the next one is naturally aligned. We'll need to account for
		 * that slack space here.
		 *
		 * We also add the xlog_op_header to each region when
		 * formatting, but that's not accounted to the size of the item
		 * at this point. Hence we'll need an additional number of bytes
		 * for each vector to hold an opheader.
		 *
		 * Then round nbytes up to 64-bit alignment so that the initial
		 * buffer alignment is easy to calculate and verify.
		 */
		nbytes += niovecs *
			(sizeof(uint64_t) + sizeof(struct xlog_op_header));
		nbytes = round_up(nbytes, sizeof(uint64_t));

		/*
		 * The data buffer needs to start 64-bit aligned, so round up
		 * that space to ensure we can align it appropriately and not
		 * overrun the buffer.
		 */
		buf_size = nbytes + xlog_cil_iovec_space(niovecs);

		/*
		 * if we have no shadow buffer, or it is too small, we need to
		 * reallocate it.
		 */
		if (!lip->li_lv_shadow ||
		    buf_size > lip->li_lv_shadow->lv_size) {
			/*
			 * We free and allocate here as a realloc would copy
			 * unnecessary data. We don't use kvzalloc() for the
			 * same reason - we don't need to zero the data area in
			 * the buffer, only the log vector header and the iovec
			 * storage.
			 */
			kmem_free(lip->li_lv_shadow);
			lv = xlog_cil_kvmalloc(buf_size);

			memset(lv, 0, xlog_cil_iovec_space(niovecs));

			lv->lv_item = lip;
			lv->lv_size = buf_size;
			if (ordered)
				lv->lv_buf_len = XFS_LOG_VEC_ORDERED;
			else
				lv->lv_iovecp = (struct xfs_log_iovec *)&lv[1];
			lip->li_lv_shadow = lv;
		} else {
			/* same or smaller, optimise common overwrite case */
			lv = lip->li_lv_shadow;
			if (ordered)
				lv->lv_buf_len = XFS_LOG_VEC_ORDERED;
			else
				lv->lv_buf_len = 0;
			lv->lv_bytes = 0;
			lv->lv_next = NULL;
		}

		/* Ensure the lv is set up according to ->iop_size */
		lv->lv_niovecs = niovecs;

		/* The allocated data region lies beyond the iovec region */
		lv->lv_buf = (char *)lv + xlog_cil_iovec_space(niovecs);
	}
}
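
/*
 * Worked example of the sizing above (illustrative numbers only): an
 * item whose ->iop_size reports niovecs = 2 and nbytes = 100 gets
 *
 *	nbytes   = 100 + 2 * (sizeof(uint64_t) + sizeof(struct xlog_op_header));
 *	nbytes   = round_up(nbytes, sizeof(uint64_t));
 *	buf_size = nbytes + xlog_cil_iovec_space(2);
 *
 * i.e. slack for per-vector alignment and opheaders is added to the data
 * size before the vector header region is prepended.
 */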

/*
 * Prepare the log item for insertion into the CIL. Calculate the difference in
 * log space it will consume, and if it is a new item pin it as well.
 */
STATIC void
xfs_cil_prepare_item(
	struct xlog		*log,
	struct xfs_log_vec	*lv,
	struct xfs_log_vec	*old_lv,
	int			*diff_len)
{
	/* Account for the new LV being passed in */
	if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED)
		*diff_len += lv->lv_bytes;

	/*
	 * If there is no old LV, this is the first time we've seen the item in
	 * this CIL context and so we need to pin it. If we are replacing the
	 * old_lv, then remove the space it accounts for and make it the shadow
	 * buffer for later freeing. In both cases we are now switching to the
	 * shadow buffer, so update the pointer to it appropriately.
	 */
	if (!old_lv) {
		if (lv->lv_item->li_ops->iop_pin)
			lv->lv_item->li_ops->iop_pin(lv->lv_item);
		lv->lv_item->li_lv_shadow = NULL;
	} else if (old_lv != lv) {
		ASSERT(lv->lv_buf_len != XFS_LOG_VEC_ORDERED);

		*diff_len -= old_lv->lv_bytes;
		lv->lv_item->li_lv_shadow = old_lv;
	}

	/* attach new log vector to log item */
	lv->lv_item->li_lv = lv;

	/*
	 * If this is the first time the item is being committed to the
	 * CIL, store the sequence number on the log item so we can
	 * tell in future commits whether this is the first checkpoint
	 * the item is being committed into.
	 */
	if (!lv->lv_item->li_seq)
		lv->lv_item->li_seq = log->l_cilp->xc_ctx->sequence;
}
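
/*
 * The pointer shuffle performed by xfs_cil_prepare_item() above, in
 * pictures (A = lv active before this commit, S = shadow buffer that was
 * just formatted into):
 *
 *	before:	li_lv = A, li_lv_shadow = S
 *	after:	li_lv = S, li_lv_shadow = A	(A is freed lazily later)
 *
 * A first-time commit has no A, so the item is pinned instead and
 * li_lv_shadow is cleared.
 */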

/*
 * Format a log item into a flat buffer.
 *
 * For delayed logging, we need to hold a formatted buffer containing all the
 * changes on the log item. This enables us to relog the item in memory and
 * write it out asynchronously without needing to relock the object that was
 * modified at the time it gets written into the iclog.
 *
 * This function takes the prepared log vectors attached to each log item, and
 * formats the changes into the log vector buffer. The buffer it uses is
 * dependent on the current state of the vector in the CIL - the shadow lv is
 * guaranteed to be large enough for the current modification, but we will only
 * use that if we can't reuse the existing lv. If we can't reuse the existing
 * lv, then simply swap it out for the shadow lv. We don't free it - that is
 * done lazily either by the next modification or the freeing of the log item.
 *
 * We don't set up region headers during this process; we simply copy the
 * regions into the flat buffer. We can do this because we still have to do a
 * formatting step to write the regions into the iclog buffer. Writing the
 * ophdrs during the iclog write means that we can support splitting large
 * regions across iclog boundaries without needing a change in the format of the
 * item/region encapsulation.
 *
 * Hence what we need to do now is rewrite the vector array to point to the
 * copied region inside the buffer we just allocated. This allows us to format
 * the regions into the iclog as though they are being formatted directly out
 * of the objects themselves.
 */
static void
xlog_cil_insert_format_items(
	struct xlog		*log,
	struct xfs_trans	*tp,
	int			*diff_len)
{
	struct xfs_log_item	*lip;

	/* Bail out if we didn't find a log item. */
	if (list_empty(&tp->t_items)) {
		ASSERT(0);
		return;
	}

	list_for_each_entry(lip, &tp->t_items, li_trans) {
		struct xfs_log_vec *lv;
		struct xfs_log_vec *old_lv = NULL;
		struct xfs_log_vec *shadow;
		bool	ordered = false;

		/* Skip items which aren't dirty in this transaction. */
		if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
			continue;

		/*
		 * The formatting size information is already attached to
		 * the shadow lv on the log item.
		 */
		shadow = lip->li_lv_shadow;
		if (shadow->lv_buf_len == XFS_LOG_VEC_ORDERED)
			ordered = true;

		/* Skip items that do not have any vectors for writing */
		if (!shadow->lv_niovecs && !ordered)
			continue;

		/* compare to existing item size */
		old_lv = lip->li_lv;
		if (lip->li_lv && shadow->lv_size <= lip->li_lv->lv_size) {
			/* same or smaller, optimise common overwrite case */
			lv = lip->li_lv;
			lv->lv_next = NULL;

			if (ordered)
				goto insert;

			/*
			 * set the item up as though it is a new insertion so
			 * that the space reservation accounting is correct.
			 */
			*diff_len -= lv->lv_bytes;

			/* Ensure the lv is set up according to ->iop_size */
			lv->lv_niovecs = shadow->lv_niovecs;

			/* reset the lv buffer information for new formatting */
			lv->lv_buf_len = 0;
			lv->lv_bytes = 0;
			lv->lv_buf = (char *)lv +
					xlog_cil_iovec_space(lv->lv_niovecs);
		} else {
			/* switch to shadow buffer! */
			lv = shadow;
			lv->lv_item = lip;
			if (ordered) {
				/* track as an ordered logvec */
				ASSERT(lip->li_lv == NULL);
				goto insert;
			}
		}

		ASSERT(IS_ALIGNED((unsigned long)lv->lv_buf, sizeof(uint64_t)));
		lip->li_ops->iop_format(lip, lv);
insert:
		xfs_cil_prepare_item(log, lv, old_lv, diff_len);
	}
}
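
/*
 * In short, the buffer choice made above for each dirty item (a sketch
 * of the logic, not additional code):
 *
 *	if (lip->li_lv && shadow->lv_size <= lip->li_lv->lv_size)
 *		reformat in place into li_lv	// common overwrite case
 *	else
 *		li_lv = shadow			// switch to the larger buffer
 *
 * Either way, the buffer handed to ->iop_format() starts 64-bit aligned.
 */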

/*
 * Insert the log items into the CIL and calculate the difference in space
 * consumed by the item. Add the space to the checkpoint ticket and calculate
 * if the change requires additional log metadata. If it does, take that space
 * as well. Remove the amount of space we added to the checkpoint ticket from
 * the current transaction ticket so that the accounting works out correctly.
 */
static void
xlog_cil_insert_items(
	struct xlog		*log,
	struct xfs_trans	*tp)
{
	struct xfs_cil		*cil = log->l_cilp;
	struct xfs_cil_ctx	*ctx = cil->xc_ctx;
	struct xfs_log_item	*lip;
	int			len = 0;
	int			iclog_space;
	int			iovhdr_res = 0, split_res = 0, ctx_res = 0;

	ASSERT(tp);

	/*
	 * We can do this safely because the context can't checkpoint until we
	 * are done so it doesn't matter exactly how we update the CIL.
	 */
	xlog_cil_insert_format_items(log, tp, &len);

	spin_lock(&cil->xc_cil_lock);

	/* attach the transaction to the CIL if it has any busy extents */
	if (!list_empty(&tp->t_busy))
		list_splice_init(&tp->t_busy, &ctx->busy_extents);

	/*
	 * Now transfer enough transaction reservation to the context ticket
	 * for the checkpoint. The context ticket is special - the unit
	 * reservation has to grow as well as the current reservation as we
	 * steal from tickets so we can correctly determine the space used
	 * during the transaction commit.
	 */
	if (ctx->ticket->t_curr_res == 0) {
		ctx_res = ctx->ticket->t_unit_res;
		ctx->ticket->t_curr_res = ctx_res;
		tp->t_ticket->t_curr_res -= ctx_res;
	}

	/* do we need space for more log record headers? */
	iclog_space = log->l_iclog_size - log->l_iclog_hsize;
	if (len > 0 && (ctx->space_used / iclog_space !=
				(ctx->space_used + len) / iclog_space)) {
		split_res = (len + iclog_space - 1) / iclog_space;
		/* need to take into account split region headers, too */
		split_res *= log->l_iclog_hsize + sizeof(struct xlog_op_header);
		ctx->ticket->t_unit_res += split_res;
		ctx->ticket->t_curr_res += split_res;
		tp->t_ticket->t_curr_res -= split_res;
		ASSERT(tp->t_ticket->t_curr_res >= len);
	}
	tp->t_ticket->t_curr_res -= len;
	ctx->space_used += len;

	/*
	 * If we've overrun the reservation, dump the tx details before we move
	 * the log items. Shutdown is imminent...
	 */
	if (WARN_ON(tp->t_ticket->t_curr_res < 0)) {
		xfs_warn(log->l_mp, "Transaction log reservation overrun:");
		xfs_warn(log->l_mp,
			 " log items: %d bytes (iov hdrs: %d bytes)",
			 len, iovhdr_res);
		xfs_warn(log->l_mp, " split region headers: %d bytes",
			 split_res);
		xfs_warn(log->l_mp, " ctx ticket: %d bytes", ctx_res);
		xlog_print_trans(tp);
	}

	/*
	 * Now (re-)position everything modified at the tail of the CIL.
	 * We do this here so we only need to take the CIL lock once during
	 * the transaction commit.
	 */
	list_for_each_entry(lip, &tp->t_items, li_trans) {

		/* Skip items which aren't dirty in this transaction. */
		if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
			continue;

		/*
		 * Only move the item if it isn't already at the tail. This is
		 * to prevent a transient list_empty() state when reinserting
		 * an item that is already the only item in the CIL.
		 */
		if (!list_is_last(&lip->li_cil, &cil->xc_cil))
			list_move_tail(&lip->li_cil, &cil->xc_cil);
	}

	spin_unlock(&cil->xc_cil_lock);

	if (tp->t_ticket->t_curr_res < 0)
		xlog_force_shutdown(log, SHUTDOWN_LOG_IO_ERROR);
}
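
/*
 * Worked example of the record header accounting above (all numbers
 * illustrative): with a 32kB iclog and a 512 byte iclog header,
 * iclog_space = 32768 - 512 = 32256. If the checkpoint has already used
 * 30000 bytes and this commit adds len = 5000, then
 *
 *	30000 / 32256 == 0  !=  35000 / 32256 == 1
 *
 * so the insertion crosses an iclog boundary and we steal space for one
 * extra record header plus one opheader for the split region:
 *
 *	split_res = (5000 + 32256 - 1) / 32256;			// == 1
 *	split_res *= 512 + sizeof(struct xlog_op_header);
 */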

static void
xlog_cil_free_logvec(
	struct xfs_log_vec	*log_vector)
{
	struct xfs_log_vec	*lv;

	for (lv = log_vector; lv; ) {
		struct xfs_log_vec *next = lv->lv_next;
		kmem_free(lv);
		lv = next;
	}
}

static void
xlog_discard_endio_work(
	struct work_struct	*work)
{
	struct xfs_cil_ctx	*ctx =
		container_of(work, struct xfs_cil_ctx, discard_endio_work);
	struct xfs_mount	*mp = ctx->cil->xc_log->l_mp;

	xfs_extent_busy_clear(mp, &ctx->busy_extents, false);
	kmem_free(ctx);
}

/*
 * Queue up the actual completion to a thread to avoid IRQ-safe locking for
 * pagb_lock. Note that we need an unbounded workqueue, otherwise we might
 * get the execution delayed up to 30 seconds for weird reasons.
 */
static void
xlog_discard_endio(
	struct bio		*bio)
{
	struct xfs_cil_ctx	*ctx = bio->bi_private;

	INIT_WORK(&ctx->discard_endio_work, xlog_discard_endio_work);
	queue_work(xfs_discard_wq, &ctx->discard_endio_work);
	bio_put(bio);
}

static void
xlog_discard_busy_extents(
	struct xfs_mount	*mp,
	struct xfs_cil_ctx	*ctx)
{
	struct list_head	*list = &ctx->busy_extents;
	struct xfs_extent_busy	*busyp;
	struct bio		*bio = NULL;
	struct blk_plug		plug;
	int			error = 0;

	ASSERT(xfs_has_discard(mp));

	blk_start_plug(&plug);
	list_for_each_entry(busyp, list, list) {
		trace_xfs_discard_extent(mp, busyp->agno, busyp->bno,
					 busyp->length);

		error = __blkdev_issue_discard(mp->m_ddev_targp->bt_bdev,
				XFS_AGB_TO_DADDR(mp, busyp->agno, busyp->bno),
				XFS_FSB_TO_BB(mp, busyp->length),
				GFP_NOFS, 0, &bio);
		if (error && error != -EOPNOTSUPP) {
			xfs_info(mp,
	 "discard failed for extent [0x%llx,%u], error %d",
				 (unsigned long long)busyp->bno,
				 busyp->length,
				 error);
			break;
		}
	}

	if (bio) {
		bio->bi_private = ctx;
		bio->bi_end_io = xlog_discard_endio;
		submit_bio(bio);
	} else {
		xlog_discard_endio_work(&ctx->discard_endio_work);
	}
	blk_finish_plug(&plug);
}

/*
 * Mark all items committed and clear busy extents. We free the log vector
 * chains in a separate pass so that we unpin the log items as quickly as
 * possible.
 */
static void
xlog_cil_committed(
	struct xfs_cil_ctx	*ctx)
{
	struct xfs_mount	*mp = ctx->cil->xc_log->l_mp;
	bool			abort = xlog_is_shutdown(ctx->cil->xc_log);

	/*
	 * If the I/O failed, we're aborting the commit and already shutdown.
	 * Wake any commit waiters before aborting the log items so we don't
	 * block async log pushers on callbacks. Async log pushers explicitly do
	 * not wait on log force completion because they may be holding locks
	 * required to unpin items.
	 */
	if (abort) {
		spin_lock(&ctx->cil->xc_push_lock);
		wake_up_all(&ctx->cil->xc_start_wait);
		wake_up_all(&ctx->cil->xc_commit_wait);
		spin_unlock(&ctx->cil->xc_push_lock);
	}

	xfs_trans_committed_bulk(ctx->cil->xc_log->l_ailp, ctx->lv_chain,
					ctx->start_lsn, abort);

	xfs_extent_busy_sort(&ctx->busy_extents);
	xfs_extent_busy_clear(mp, &ctx->busy_extents,
			      xfs_has_discard(mp) && !abort);

	spin_lock(&ctx->cil->xc_push_lock);
	list_del(&ctx->committing);
	spin_unlock(&ctx->cil->xc_push_lock);

	xlog_cil_free_logvec(ctx->lv_chain);

	if (!list_empty(&ctx->busy_extents))
		xlog_discard_busy_extents(mp, ctx);
	else
		kmem_free(ctx);
}

void
xlog_cil_process_committed(
	struct list_head	*list)
{
	struct xfs_cil_ctx	*ctx;

	while ((ctx = list_first_entry_or_null(list,
			struct xfs_cil_ctx, iclog_entry))) {
		list_del(&ctx->iclog_entry);
		xlog_cil_committed(ctx);
	}
}
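
/*
 * Completion flow sketch: iclog IO completion hands the iclog's callback
 * list to xlog_cil_process_committed(), which runs xlog_cil_committed()
 * on each checkpoint context attached to that iclog. That unpins the
 * items, clears (and, with "discard" enabled, discards) the busy
 * extents, and frees the context once any discard IO has completed.
 */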

/*
 * Record the LSN of the iclog we were just granted space to start writing into.
 * If the context doesn't have a start_lsn recorded, then this iclog will
 * contain the start record for the checkpoint. Otherwise this write contains
 * the commit record for the checkpoint.
 */
void
xlog_cil_set_ctx_write_state(
	struct xfs_cil_ctx	*ctx,
	struct xlog_in_core	*iclog)
{
	struct xfs_cil		*cil = ctx->cil;
	xfs_lsn_t		lsn = be64_to_cpu(iclog->ic_header.h_lsn);

	ASSERT(!ctx->commit_lsn);
	if (!ctx->start_lsn) {
		spin_lock(&cil->xc_push_lock);
		/*
		 * The LSN we need to pass to the log items on transaction
		 * commit is the LSN reported by the first log vector write, not
		 * the commit lsn. If we use the commit record lsn then we can
		 * move the grant write head beyond the tail LSN and overwrite
		 * it.
		 */
		ctx->start_lsn = lsn;
		wake_up_all(&cil->xc_start_wait);
		spin_unlock(&cil->xc_push_lock);

		/*
		 * Make sure the metadata we are about to overwrite in the log
		 * has been flushed to stable storage before this iclog is
		 * issued.
		 */
		spin_lock(&cil->xc_log->l_icloglock);
		iclog->ic_flags |= XLOG_ICL_NEED_FLUSH;
		spin_unlock(&cil->xc_log->l_icloglock);
		return;
	}

	/*
	 * Take a reference to the iclog for the context so that we still hold
	 * it when xlog_write is done and has released it. This means the
	 * context controls when the iclog is released for IO.
	 */
	atomic_inc(&iclog->ic_refcnt);

	/*
	 * xlog_state_get_iclog_space() guarantees there is enough space in the
	 * iclog for an entire commit record, so we can attach the context
	 * callbacks now. This needs to be done before we make the commit_lsn
	 * visible to waiters so that checkpoints with commit records in the
	 * same iclog order their IO completion callbacks in the same order that
	 * the commit records appear in the iclog.
	 */
	spin_lock(&cil->xc_log->l_icloglock);
	list_add_tail(&ctx->iclog_entry, &iclog->ic_callbacks);
	spin_unlock(&cil->xc_log->l_icloglock);

	/*
	 * Now we can record the commit LSN and wake anyone waiting for this
	 * sequence to have the ordered commit record assigned to a physical
	 * location in the log.
	 */
	spin_lock(&cil->xc_push_lock);
	ctx->commit_iclog = iclog;
	ctx->commit_lsn = lsn;
	wake_up_all(&cil->xc_commit_wait);
	spin_unlock(&cil->xc_push_lock);
}

/*
 * Ensure that the order of log writes follows checkpoint sequence order. This
 * relies on the context LSN being zero until the log write has guaranteed the
 * LSN that the log write will start at via xlog_state_get_iclog_space().
 */
enum _record_type {
	_START_RECORD,
	_COMMIT_RECORD,
};

static int
xlog_cil_order_write(
	struct xfs_cil		*cil,
	xfs_csn_t		sequence,
	enum _record_type	record)
{
	struct xfs_cil_ctx	*ctx;

restart:
	spin_lock(&cil->xc_push_lock);
	list_for_each_entry(ctx, &cil->xc_committing, committing) {
		/*
		 * Avoid getting stuck in this loop because we were woken by the
		 * shutdown, but then went back to sleep once already in the
		 * shutdown state.
		 */
		if (xlog_is_shutdown(cil->xc_log)) {
			spin_unlock(&cil->xc_push_lock);
			return -EIO;
		}

		/*
		 * Higher sequences will wait for this one so skip them.
		 * Don't wait for our own sequence, either.
		 */
		if (ctx->sequence >= sequence)
			continue;

		/* Wait until the LSN for the record has been recorded. */
		switch (record) {
		case _START_RECORD:
			if (!ctx->start_lsn) {
				xlog_wait(&cil->xc_start_wait, &cil->xc_push_lock);
				goto restart;
			}
			break;
		case _COMMIT_RECORD:
			if (!ctx->commit_lsn) {
				xlog_wait(&cil->xc_commit_wait, &cil->xc_push_lock);
				goto restart;
			}
			break;
		}
	}
	spin_unlock(&cil->xc_push_lock);
	return 0;
}
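
/*
 * Example of the ordering this enforces (sequence numbers illustrative):
 * if checkpoints 5 and 6 are committing concurrently, sequence 6 sleeps
 * in xlog_cil_order_write() until ctx 5 has recorded its start_lsn
 * (before the chain write) or its commit_lsn (before the commit record),
 * so both record types always appear in the log in sequence order.
 */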

/*
 * Write out the log vector change now attached to the CIL context. This will
 * write a start record that needs to be strictly ordered in ascending CIL
 * sequence order so that log recovery will always use in-order start LSNs when
 * replaying checkpoints.
 */
static int
xlog_cil_write_chain(
	struct xfs_cil_ctx	*ctx,
	struct xfs_log_vec	*chain,
	uint32_t		chain_len)
{
	struct xlog		*log = ctx->cil->xc_log;
	int			error;

	error = xlog_cil_order_write(ctx->cil, ctx->sequence, _START_RECORD);
	if (error)
		return error;
	return xlog_write(log, ctx, chain, ctx->ticket, chain_len);
}

/*
 * Write out the commit record of a checkpoint transaction to close off a
 * running log write. These commit records are strictly ordered in ascending CIL
 * sequence order so that log recovery will always replay the checkpoints in the
 * correct order.
 */
static int
xlog_cil_write_commit_record(
	struct xfs_cil_ctx	*ctx)
{
	struct xlog		*log = ctx->cil->xc_log;
	struct xlog_op_header	ophdr = {
		.oh_clientid	= XFS_TRANSACTION,
		.oh_tid		= cpu_to_be32(ctx->ticket->t_tid),
		.oh_flags	= XLOG_COMMIT_TRANS,
	};
	struct xfs_log_iovec	reg = {
		.i_addr = &ophdr,
		.i_len = sizeof(struct xlog_op_header),
		.i_type = XLOG_REG_TYPE_COMMIT,
	};
	struct xfs_log_vec	vec = {
		.lv_niovecs = 1,
		.lv_iovecp = &reg,
	};
	int			error;

	if (xlog_is_shutdown(log))
		return -EIO;

	error = xlog_cil_order_write(ctx->cil, ctx->sequence, _COMMIT_RECORD);
	if (error)
		return error;

	/* account for space used by record data */
	ctx->ticket->t_curr_res -= reg.i_len;
	error = xlog_write(log, ctx, &vec, ctx->ticket, reg.i_len);
	if (error)
		xlog_force_shutdown(log, SHUTDOWN_LOG_IO_ERROR);
	return error;
}

struct xlog_cil_trans_hdr {
	struct xlog_op_header	oph[2];
	struct xfs_trans_header	thdr;
	struct xfs_log_iovec	lhdr[2];
};

/*
 * Build a checkpoint transaction header to begin the journal transaction. We
 * need to account for the space used by the transaction header here as it is
 * not accounted for in xlog_write().
 *
 * This is the only place we write a transaction header, so we also build the
 * log opheaders that indicate the start of a log transaction and wrap the
 * transaction header. We keep the start record in its own log vector rather
 * than compacting them into a single region as this ends up making the logic
 * in xlog_write() for handling empty opheaders for start, commit and unmount
 * records much simpler.
 */
static void
xlog_cil_build_trans_hdr(
	struct xfs_cil_ctx	*ctx,
	struct xlog_cil_trans_hdr *hdr,
	struct xfs_log_vec	*lvhdr,
	int			num_iovecs)
{
	struct xlog_ticket	*tic = ctx->ticket;
	__be32			tid = cpu_to_be32(tic->t_tid);

	memset(hdr, 0, sizeof(*hdr));

	/* Log start record */
	hdr->oph[0].oh_tid = tid;
	hdr->oph[0].oh_clientid = XFS_TRANSACTION;
	hdr->oph[0].oh_flags = XLOG_START_TRANS;

	/* log iovec region pointer */
	hdr->lhdr[0].i_addr = &hdr->oph[0];
	hdr->lhdr[0].i_len = sizeof(struct xlog_op_header);
	hdr->lhdr[0].i_type = XLOG_REG_TYPE_LRHEADER;

	/* log opheader */
	hdr->oph[1].oh_tid = tid;
	hdr->oph[1].oh_clientid = XFS_TRANSACTION;
	hdr->oph[1].oh_len = cpu_to_be32(sizeof(struct xfs_trans_header));

	/* transaction header in host byte order format */
	hdr->thdr.th_magic = XFS_TRANS_HEADER_MAGIC;
	hdr->thdr.th_type = XFS_TRANS_CHECKPOINT;
	hdr->thdr.th_tid = tic->t_tid;
	hdr->thdr.th_num_items = num_iovecs;

	/* log iovec region pointer */
	hdr->lhdr[1].i_addr = &hdr->oph[1];
	hdr->lhdr[1].i_len = sizeof(struct xlog_op_header) +
			sizeof(struct xfs_trans_header);
	hdr->lhdr[1].i_type = XLOG_REG_TYPE_TRANSHDR;

	lvhdr->lv_niovecs = 2;
	lvhdr->lv_iovecp = &hdr->lhdr[0];
	lvhdr->lv_bytes = hdr->lhdr[0].i_len + hdr->lhdr[1].i_len;
	lvhdr->lv_next = ctx->lv_chain;

	tic->t_curr_res -= lvhdr->lv_bytes;
}

/*
 * Pull all the log vectors off the items in the CIL, and remove the items from
 * the CIL. We don't need the CIL lock here because it's only needed on the
 * transaction commit side which is currently locked out by the flush lock.
 */
static void
xlog_cil_build_lv_chain(
	struct xfs_cil		*cil,
	struct xfs_cil_ctx	*ctx,
	uint32_t		*num_iovecs,
	uint32_t		*num_bytes)
{
	struct xfs_log_vec	*lv = NULL;

	while (!list_empty(&cil->xc_cil)) {
		struct xfs_log_item	*item;

		item = list_first_entry(&cil->xc_cil,
					struct xfs_log_item, li_cil);
		list_del_init(&item->li_cil);
		if (!ctx->lv_chain)
			ctx->lv_chain = item->li_lv;
		else
			lv->lv_next = item->li_lv;
		lv = item->li_lv;
		item->li_lv = NULL;
		*num_iovecs += lv->lv_niovecs;

		/* we don't write ordered log vectors */
		if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED)
			*num_bytes += lv->lv_bytes;
	}
}
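
/*
 * Shape of the chain this builds, headed later by the stack-allocated
 * transaction header vector (illustrative, for two dirty items):
 *
 *	lvhdr -> item A lv -> item B lv -> NULL
 *
 * Ordered log vectors sit in the chain so their items are tracked, but
 * they contribute no bytes to the write.
 */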

/*
 * Push the Committed Item List to the log.
 *
 * If the current sequence is the same as xc_push_seq we need to do a flush. If
 * xc_push_seq is less than the current sequence, then it has already been
 * flushed and we don't need to do anything - the caller will wait for it to
 * complete if necessary.
 *
 * xc_push_seq is checked unlocked against the sequence number for a match.
 * Hence we can allow log forces to run racily and not issue pushes for the
 * same sequence twice. If we get a race between multiple pushes for the same
 * sequence they will block on the first one and then abort, hence avoiding
 * needless pushes.
 */
static void
xlog_cil_push_work(
	struct work_struct	*work)
{
	struct xfs_cil_ctx	*ctx =
		container_of(work, struct xfs_cil_ctx, push_work);
	struct xfs_cil		*cil = ctx->cil;
	struct xlog		*log = cil->xc_log;
	struct xfs_cil_ctx	*new_ctx;
	int			num_iovecs = 0;
	int			num_bytes = 0;
	int			error = 0;
	struct xlog_cil_trans_hdr thdr;
	struct xfs_log_vec	lvhdr = { NULL };
	xfs_csn_t		push_seq;
	bool			push_commit_stable;

	new_ctx = xlog_cil_ctx_alloc();
	new_ctx->ticket = xlog_cil_ticket_alloc(log);

	down_write(&cil->xc_ctx_lock);

	spin_lock(&cil->xc_push_lock);
	push_seq = cil->xc_push_seq;
	ASSERT(push_seq <= ctx->sequence);
	push_commit_stable = cil->xc_push_commit_stable;
	cil->xc_push_commit_stable = false;

	/*
	 * As we are about to switch to a new, empty CIL context, we no longer
	 * need to throttle tasks on CIL space overruns. Wake any waiters that
	 * the hard push throttle may have caught so they can start committing
	 * to the new context. The ctx->xc_push_lock provides the serialisation
	 * necessary for safely using the lockless waitqueue_active() check in
	 * this context.
	 */
	if (waitqueue_active(&cil->xc_push_wait))
		wake_up_all(&cil->xc_push_wait);

	/*
	 * Check if we've anything to push. If there is nothing, then we don't
	 * move on to a new sequence number and so we have to be able to push
	 * this sequence again later.
	 */
	if (list_empty(&cil->xc_cil)) {
		cil->xc_push_seq = 0;
		spin_unlock(&cil->xc_push_lock);
		goto out_skip;
	}

	/* check for a previously pushed sequence */
	if (push_seq < ctx->sequence) {
		spin_unlock(&cil->xc_push_lock);
		goto out_skip;
	}

	/*
	 * We are now going to push this context, so add it to the committing
	 * list before we do anything else. This ensures that anyone waiting on
	 * this push can easily detect the difference between a "push in
	 * progress" and "CIL is empty, nothing to do".
	 *
	 * IOWs, a wait loop can now check for:
	 *	the current sequence not being found on the committing list;
	 *	an empty CIL; and
	 *	an unchanged sequence number
	 * to detect a push that had nothing to do and therefore does not need
	 * waiting on. If the CIL is not empty, we get put on the committing
	 * list before emptying the CIL and bumping the sequence number. Hence
	 * an empty CIL and an unchanged sequence number means we jumped out
	 * above after doing nothing.
	 *
	 * Hence the waiter will either find the commit sequence on the
	 * committing list or the sequence number will be unchanged and the CIL
	 * still dirty. In that latter case, the push has not yet started, and
	 * so the waiter will have to continue trying to check the CIL
	 * committing list until it is found. In extreme cases of delay, the
	 * sequence may fully commit between the attempts the waiter makes to
	 * check the commit sequence.
	 */
	list_add(&ctx->committing, &cil->xc_committing);
	spin_unlock(&cil->xc_push_lock);

	xlog_cil_build_lv_chain(cil, ctx, &num_iovecs, &num_bytes);

	/*
	 * Switch the contexts so we can drop the context lock and move out
	 * of a shared context. We can't just go straight to the commit record,
	 * though - we need to synchronise with previous and future commits so
	 * that the commit records are correctly ordered in the log to ensure
	 * that we process items during log IO completion in the correct order.
	 *
	 * For example, if we get an EFI in one checkpoint and the EFD in the
	 * next (e.g. due to log forces), we do not want the checkpoint with
	 * the EFD to be committed before the checkpoint with the EFI. Hence
	 * we must strictly order the commit records of the checkpoints so
	 * that: a) the checkpoint callbacks are attached to the iclogs in the
	 * correct order; and b) the checkpoints are replayed in correct order
	 * in log recovery.
	 *
	 * Hence we need to add this context to the committing context list so
	 * that higher sequences will wait for us to write out a commit record
	 * before they do.
	 *
	 * xfs_log_force_seq requires us to mirror the new sequence into the cil
	 * structure atomically with the addition of this sequence to the
	 * committing list. This also ensures that we can do unlocked checks
	 * against the current sequence in log forces without risking
	 * dereferencing a freed context pointer.
	 */
	spin_lock(&cil->xc_push_lock);
	xlog_cil_ctx_switch(cil, new_ctx);
	spin_unlock(&cil->xc_push_lock);
	up_write(&cil->xc_ctx_lock);

	/*
	 * Build a checkpoint transaction header and write it to the log to
	 * begin the transaction. We need to account for the space used by the
	 * transaction header here as it is not accounted for in xlog_write().
	 */
	xlog_cil_build_trans_hdr(ctx, &thdr, &lvhdr, num_iovecs);
	num_bytes += lvhdr.lv_bytes;

	error = xlog_cil_write_chain(ctx, &lvhdr, num_bytes);
	if (error)
		goto out_abort_free_ticket;

	error = xlog_cil_write_commit_record(ctx);
	if (error)
		goto out_abort_free_ticket;

	xfs_log_ticket_ungrant(log, ctx->ticket);

	/*
	 * If the checkpoint spans multiple iclogs, wait for all previous iclogs
	 * to complete before we submit the commit_iclog. We can't use state
	 * checks for this - ACTIVE can be either a past completed iclog or a
	 * future iclog being filled, while WANT_SYNC through SYNC_DONE can be a
	 * past or future iclog awaiting IO or ordered IO completion to be run.
	 * In the latter case, if it's a future iclog and we wait on it, then we
	 * will hang because it won't get processed through to ic_force_wait
	 * wakeup until this commit_iclog is written to disk. Hence we use the
	 * iclog header lsn and compare it to the commit lsn to determine if we
	 * need to wait on iclogs or not.
	 */
	spin_lock(&log->l_icloglock);
	if (ctx->start_lsn != ctx->commit_lsn) {
		xfs_lsn_t	plsn;

		plsn = be64_to_cpu(ctx->commit_iclog->ic_prev->ic_header.h_lsn);
		if (plsn && XFS_LSN_CMP(plsn, ctx->commit_lsn) < 0) {
			/*
			 * Waiting on ic_force_wait orders the completion of
			 * iclogs older than ic_prev. Hence we only need to wait
			 * on the most recent older iclog here.
			 */
			xlog_wait_on_iclog(ctx->commit_iclog->ic_prev);
			spin_lock(&log->l_icloglock);
		}

		/*
		 * We need to issue a pre-flush so that the ordering for this
		 * checkpoint is correctly preserved down to stable storage.
		 */
		ctx->commit_iclog->ic_flags |= XLOG_ICL_NEED_FLUSH;
	}

	/*
	 * The commit iclog must be written to stable storage to guarantee
	 * journal IO vs metadata writeback IO is correctly ordered on stable
	 * storage.
	 *
	 * If the push caller needs the commit to be immediately stable and the
	 * commit_iclog is not yet marked as XLOG_STATE_WANT_SYNC to indicate it
	 * will be written when released, switch its state to WANT_SYNC right
	 * now.
	 */
	ctx->commit_iclog->ic_flags |= XLOG_ICL_NEED_FUA;
	if (push_commit_stable &&
	    ctx->commit_iclog->ic_state == XLOG_STATE_ACTIVE)
		xlog_state_switch_iclogs(log, ctx->commit_iclog, 0);
	xlog_state_release_iclog(log, ctx->commit_iclog);

	/* Not safe to reference ctx now! */

	spin_unlock(&log->l_icloglock);
	return;

out_skip:
	up_write(&cil->xc_ctx_lock);
	xfs_log_ticket_put(new_ctx->ticket);
	kmem_free(new_ctx);
	return;

out_abort_free_ticket:
	xfs_log_ticket_ungrant(log, ctx->ticket);
	ASSERT(xlog_is_shutdown(log));
	if (!ctx->commit_iclog) {
		xlog_cil_committed(ctx);
		return;
	}
	spin_lock(&log->l_icloglock);
	xlog_state_release_iclog(log, ctx->commit_iclog);
	/* Not safe to reference ctx now! */
	spin_unlock(&log->l_icloglock);
}
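
/*
 * On-disk result of a successful push, as a sketch: a start record, the
 * checkpoint transaction header, the formatted item regions and finally
 * a commit record, possibly spread over multiple iclogs:
 *
 *	| XLOG_START_TRANS | trans hdr | item regions ... | XLOG_COMMIT_TRANS |
 *
 * Log recovery only replays a checkpoint once its commit record has been
 * found.
 */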

/*
 * We need to push CIL every so often so we don't cache more than we can fit in
 * the log. The limit really is that a checkpoint can't be more than half the
 * log (the current checkpoint is not allowed to overwrite the previous
 * checkpoint), but commit latency and memory usage limit this to a smaller
 * size.
 */
static void
xlog_cil_push_background(
	struct xlog	*log) __releases(cil->xc_ctx_lock)
{
	struct xfs_cil	*cil = log->l_cilp;

	/*
	 * The cil won't be empty because we are called while holding the
	 * context lock so whatever we added to the CIL will still be there.
	 */
	ASSERT(!list_empty(&cil->xc_cil));

	/*
	 * Don't do a background push if we haven't used up all the
	 * space available yet.
	 */
	if (cil->xc_ctx->space_used < XLOG_CIL_SPACE_LIMIT(log)) {
		up_read(&cil->xc_ctx_lock);
		return;
	}

	spin_lock(&cil->xc_push_lock);
	if (cil->xc_push_seq < cil->xc_current_sequence) {
		cil->xc_push_seq = cil->xc_current_sequence;
		queue_work(cil->xc_push_wq, &cil->xc_ctx->push_work);
	}

	/*
	 * Drop the context lock now, we can't hold that if we need to sleep
	 * because we are over the blocking threshold. The push_lock is still
	 * held, so blocking threshold sleep/wakeup is still correctly
	 * serialised here.
	 */
	up_read(&cil->xc_ctx_lock);

	/*
	 * If we are well over the space limit, throttle the work that is being
	 * done until the push work on this context has begun. Enforce the hard
	 * throttle on all transaction commits once it has been activated, even
	 * if the committing transactions have resulted in the space usage
	 * dipping back down under the hard limit.
	 *
	 * The ctx->xc_push_lock provides the serialisation necessary for safely
	 * using the lockless waitqueue_active() check in this context.
	 */
	if (cil->xc_ctx->space_used >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log) ||
	    waitqueue_active(&cil->xc_push_wait)) {
		trace_xfs_log_cil_wait(log, cil->xc_ctx->ticket);
		ASSERT(cil->xc_ctx->space_used < log->l_logsize);
		xlog_wait(&cil->xc_push_wait, &cil->xc_push_lock);
		return;
	}

	spin_unlock(&cil->xc_push_lock);
}
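
/*
 * Behaviour of the two thresholds checked above (actual values are
 * derived from the log size by XLOG_CIL_SPACE_LIMIT() and
 * XLOG_CIL_BLOCKING_SPACE_LIMIT()):
 *
 *	space_used <  SPACE_LIMIT	-> return, no push
 *	space_used >= SPACE_LIMIT	-> queue background push
 *	space_used >= BLOCKING_LIMIT	-> queue push, then sleep until the
 *					   push work has started
 */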

/*
 * xlog_cil_push_now() is used to trigger an immediate CIL push to the sequence
 * number that is passed. When it returns, the work will be queued for
 * @push_seq, but it won't be completed.
 *
 * If the caller is performing a synchronous force, we will flush the workqueue
 * to get previously queued work moving and so minimise the time the caller
 * spends waiting for all outstanding pushes to complete. The caller is
 * expected to do the required waiting for push_seq to complete.
 *
 * If the caller is performing an async push, we need to ensure that the
 * checkpoint is fully flushed out of the iclogs when we finish the push. If we
 * don't do this, then the commit record may remain sitting in memory in an
 * ACTIVE iclog. This then requires another full log force to push to disk,
 * which defeats the purpose of having an async, non-blocking CIL force
 * mechanism. Hence in this case we need to pass a flag to the push work to
 * indicate it needs to flush the commit record itself.
 */
static void
xlog_cil_push_now(
	struct xlog	*log,
	xfs_lsn_t	push_seq,
	bool		async)
{
	struct xfs_cil	*cil = log->l_cilp;

	if (!cil)
		return;

	ASSERT(push_seq && push_seq <= cil->xc_current_sequence);

	/* start on any pending background push to minimise wait time on it */
	if (!async)
		flush_workqueue(cil->xc_push_wq);

	spin_lock(&cil->xc_push_lock);

	/*
	 * If this is an async flush request, we always need to set the
	 * xc_push_commit_stable flag even if something else has already queued
	 * a push. The flush caller is asking for the CIL to be on stable
	 * storage when the next push completes, so regardless of who has queued
	 * the push, the flush requires stable semantics from it.
	 */
	cil->xc_push_commit_stable = async;

	/*
	 * If the CIL is empty or we've already pushed the sequence then
	 * there's no more work that we need to do.
	 */
	if (list_empty(&cil->xc_cil) || push_seq <= cil->xc_push_seq) {
		spin_unlock(&cil->xc_push_lock);
		return;
	}

	cil->xc_push_seq = push_seq;
	queue_work(cil->xc_push_wq, &cil->xc_ctx->push_work);
	spin_unlock(&cil->xc_push_lock);
}

bool
xlog_cil_empty(
	struct xlog	*log)
{
	struct xfs_cil	*cil = log->l_cilp;
	bool		empty = false;

	spin_lock(&cil->xc_push_lock);
	if (list_empty(&cil->xc_cil))
		empty = true;
	spin_unlock(&cil->xc_push_lock);
	return empty;
}

/*
 * Commit a transaction with the given vector to the Committed Item List.
 *
 * To do this, we need to format the item, pin it in memory if required and
 * account for the space used by the transaction. Once we have done that we
 * need to release the unused reservation for the transaction, attach the
 * transaction to the checkpoint context so we carry the busy extents through
 * to checkpoint completion, and then unlock all the items in the transaction.
 *
 * Called with the context lock already held in read mode to lock out
 * background commit, returns without it held once background commits are
 * allowed again.
 */
void
xlog_cil_commit(
	struct xlog		*log,
	struct xfs_trans	*tp,
	xfs_csn_t		*commit_seq,
	bool			regrant)
{
	struct xfs_cil		*cil = log->l_cilp;
	struct xfs_log_item	*lip, *next;

	/*
	 * Do all necessary memory allocation before we lock the CIL.
	 * This ensures the allocation does not deadlock with a CIL
	 * push in memory reclaim (e.g. from kswapd).
	 */
	xlog_cil_alloc_shadow_bufs(log, tp);

	/* lock out background commit */
	down_read(&cil->xc_ctx_lock);

	xlog_cil_insert_items(log, tp);

	if (regrant && !xlog_is_shutdown(log))
		xfs_log_ticket_regrant(log, tp->t_ticket);
	else
		xfs_log_ticket_ungrant(log, tp->t_ticket);
	tp->t_ticket = NULL;
	xfs_trans_unreserve_and_mod_sb(tp);

	/*
	 * Once all the items of the transaction have been copied to the CIL,
	 * the items can be unlocked and possibly freed.
	 *
	 * This needs to be done before we drop the CIL context lock because we
	 * have to update state in the log items and unlock them before they go
	 * to disk. If we don't, then the CIL checkpoint can race with us and
	 * we can run checkpoint completion before we've updated and unlocked
	 * the log items. This affects (at least) processing of stale buffers,
	 * inodes and EFIs.
	 */
	trace_xfs_trans_commit_items(tp, _RET_IP_);
	list_for_each_entry_safe(lip, next, &tp->t_items, li_trans) {
		xfs_trans_del_item(lip);
		if (lip->li_ops->iop_committing)
			lip->li_ops->iop_committing(lip, cil->xc_ctx->sequence);
	}
	if (commit_seq)
		*commit_seq = cil->xc_ctx->sequence;

	/* xlog_cil_push_background() releases cil->xc_ctx_lock */
	xlog_cil_push_background(log);
}

/*
 * Flush the CIL to stable storage but don't wait for it to complete. This
 * requires the CIL push to ensure the commit record for the push hits the disk,
 * but otherwise is no different to a push done from a log force.
 */
void
xlog_cil_flush(
	struct xlog	*log)
{
	xfs_csn_t	seq = log->l_cilp->xc_current_sequence;

	trace_xfs_log_force(log->l_mp, seq, _RET_IP_);
	xlog_cil_push_now(log, seq, true);

	/*
	 * If the CIL is empty, make sure that any previous checkpoint that may
	 * still be in an active iclog is pushed to stable storage.
	 */
	if (list_empty(&log->l_cilp->xc_cil))
		xfs_log_force(log->l_mp, 0);
}

/*
 * Conditionally push the CIL based on the sequence passed in.
 *
 * We only need to push if we haven't already pushed the sequence number given.
 * Hence the only time we will trigger a push here is if the push sequence is
 * the same as the current context.
 *
 * We return the current commit lsn to allow the callers to determine if an
 * iclog flush is necessary following this call.
 */
xfs_lsn_t
xlog_cil_force_seq(
	struct xlog	*log,
	xfs_csn_t	sequence)
{
	struct xfs_cil		*cil = log->l_cilp;
	struct xfs_cil_ctx	*ctx;
	xfs_lsn_t		commit_lsn = NULLCOMMITLSN;

	ASSERT(sequence <= cil->xc_current_sequence);

	if (!sequence)
		sequence = cil->xc_current_sequence;
	trace_xfs_log_force(log->l_mp, sequence, _RET_IP_);

	/*
	 * check to see if we need to force out the current context.
	 * xlog_cil_push() handles racing pushes for the same sequence,
	 * so no need to deal with it here.
	 */
restart:
	xlog_cil_push_now(log, sequence, false);

	/*
	 * See if we can find a previous sequence still committing.
	 * We need to wait for all previous sequence commits to complete
	 * before allowing the force of push_seq to go ahead. Hence block
	 * on commits for those as well.
	 */
	spin_lock(&cil->xc_push_lock);
	list_for_each_entry(ctx, &cil->xc_committing, committing) {
		/*
		 * Avoid getting stuck in this loop because we were woken by the
		 * shutdown, but then went back to sleep once already in the
		 * shutdown state.
		 */
		if (xlog_is_shutdown(log))
			goto out_shutdown;
		if (ctx->sequence > sequence)
			continue;
		if (!ctx->commit_lsn) {
			/*
			 * It is still being pushed! Wait for the push to
			 * complete, then start again from the beginning.
			 */
			XFS_STATS_INC(log->l_mp, xs_log_force_sleep);
			xlog_wait(&cil->xc_commit_wait, &cil->xc_push_lock);
			goto restart;
		}
		if (ctx->sequence != sequence)
			continue;
		/* found it! */
		commit_lsn = ctx->commit_lsn;
	}

	/*
	 * The call to xlog_cil_push_now() executes the push in the background.
	 * Hence by the time we have got here our sequence may not have been
	 * pushed yet. This is true if the current sequence still matches the
	 * push sequence after the above wait loop and the CIL still contains
	 * dirty objects. This is guaranteed by the push code first adding the
	 * context to the committing list before emptying the CIL.
	 *
	 * Hence if we don't find the context in the committing list and the
	 * current sequence number is unchanged then the CIL contents are
	 * significant. If the CIL is empty, it means there was nothing to push
	 * and that means there is nothing to wait for. If the CIL is not empty,
	 * it means we haven't yet started the push, because if it had started
	 * we would have found the context on the committing list.
	 */
	if (sequence == cil->xc_current_sequence &&
	    !list_empty(&cil->xc_cil)) {
		spin_unlock(&cil->xc_push_lock);
		goto restart;
	}

	spin_unlock(&cil->xc_push_lock);
	return commit_lsn;

	/*
	 * We detected a shutdown in progress. We need to trigger the log force
	 * to pass through its iclog state machine error handling, even though
	 * we are already in a shutdown state. Hence we can't return
	 * NULLCOMMITLSN here as that has special meaning to log forces (i.e.
	 * LSN is already stable), so we return a zero LSN instead.
	 */
out_shutdown:
	spin_unlock(&cil->xc_push_lock);
	return 0;
}
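
/*
 * Usage sketch for the two force interfaces (hypothetical caller, shapes
 * only): an async force is just
 *
 *	xlog_cil_flush(log);
 *
 * while a synchronous force of a specific sequence acts on the LSN that
 * xlog_cil_force_seq() returns:
 *
 *	lsn = xlog_cil_force_seq(log, seq);
 *	// 0 means shutdown; NULLCOMMITLSN means nothing to wait for;
 *	// otherwise force the iclog containing lsn to stable storage.
 */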

/*
 * Perform initial CIL structure initialisation.
 */
int
xlog_cil_init(
	struct xlog	*log)
{
	struct xfs_cil	*cil;
	struct xfs_cil_ctx *ctx;

	cil = kmem_zalloc(sizeof(*cil), KM_MAYFAIL);
	if (!cil)
		return -ENOMEM;
	/*
	 * Limit the CIL pipeline depth to 4 concurrent works to bound the
	 * concurrency the log spinlocks will be exposed to.
	 */
	cil->xc_push_wq = alloc_workqueue("xfs-cil/%s",
			XFS_WQFLAGS(WQ_FREEZABLE | WQ_MEM_RECLAIM | WQ_UNBOUND),
			4, log->l_mp->m_super->s_id);
	if (!cil->xc_push_wq)
		goto out_destroy_cil;

	INIT_LIST_HEAD(&cil->xc_cil);
	INIT_LIST_HEAD(&cil->xc_committing);
	spin_lock_init(&cil->xc_cil_lock);
	spin_lock_init(&cil->xc_push_lock);
	init_waitqueue_head(&cil->xc_push_wait);
	init_rwsem(&cil->xc_ctx_lock);
	init_waitqueue_head(&cil->xc_start_wait);
	init_waitqueue_head(&cil->xc_commit_wait);
	cil->xc_log = log;
	log->l_cilp = cil;

	ctx = xlog_cil_ctx_alloc();
	xlog_cil_ctx_switch(cil, ctx);

	return 0;

out_destroy_cil:
	kmem_free(cil);
	return -ENOMEM;
}

void
xlog_cil_destroy(
	struct xlog	*log)
{
	if (log->l_cilp->xc_ctx) {
		if (log->l_cilp->xc_ctx->ticket)
			xfs_log_ticket_put(log->l_cilp->xc_ctx->ticket);
		kmem_free(log->l_cilp->xc_ctx);
	}

	ASSERT(list_empty(&log->l_cilp->xc_cil));
	destroy_workqueue(log->l_cilp->xc_push_wq);
	kmem_free(log->l_cilp);
}