// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2010 Red Hat, Inc. All Rights Reserved.
 */

#include "xfs.h"
#include "xfs_fs.h"
#include "xfs_format.h"
#include "xfs_log_format.h"
#include "xfs_shared.h"
#include "xfs_trans_resv.h"
#include "xfs_mount.h"
#include "xfs_extent_busy.h"
#include "xfs_trans.h"
#include "xfs_trans_priv.h"
#include "xfs_log.h"
#include "xfs_log_priv.h"
#include "xfs_trace.h"

struct workqueue_struct *xfs_discard_wq;

/*
 * Allocate a new ticket. Failing to get a new ticket makes it really hard to
 * recover, so we don't allow failure here. Also, we allocate in a context that
 * we don't want to be issuing transactions from, so we need to tell the
 * allocation code this as well.
 *
 * We don't reserve any space for the ticket - we are going to steal whatever
 * space we require from transactions as they commit. To ensure we reserve all
 * the space required, we need to set the current reservation of the ticket to
 * zero so that we know to steal the initial transaction overhead from the
 * first transaction commit.
 */
static struct xlog_ticket *
xlog_cil_ticket_alloc(
	struct xlog	*log)
{
	struct xlog_ticket *tic;

	tic = xlog_ticket_alloc(log, 0, 1, 0);

	/*
	 * set the current reservation to zero so we know to steal the basic
	 * transaction overhead reservation from the first transaction commit.
	 */
	tic->t_curr_res = 0;
	return tic;
}

/*
 * Check if the current log item was first committed in this sequence.
 * We can't rely on just the log item being in the CIL, we have to check
 * the recorded commit sequence number.
 *
 * Note: for this to be used in a non-racy manner, it has to be called with
 * CIL flushing locked out. As a result, it should only be used during the
 * transaction commit process when deciding what to format into the item.
 */
static bool
xlog_item_in_current_chkpt(
	struct xfs_cil		*cil,
	struct xfs_log_item	*lip)
{
	if (list_empty(&lip->li_cil))
		return false;

	/*
	 * li_seq is written on the first commit of a log item to record the
	 * first checkpoint it is written to. Hence if it is different to the
	 * current sequence, we're in a new checkpoint.
	 */
	return lip->li_seq == READ_ONCE(cil->xc_current_sequence);
}

bool
xfs_log_item_in_current_chkpt(
	struct xfs_log_item *lip)
{
	return xlog_item_in_current_chkpt(lip->li_log->l_cilp, lip);
}

/*
 * Unavoidable forward declaration - xlog_cil_push_work() calls
 * xlog_cil_ctx_alloc() itself.
 */
static void xlog_cil_push_work(struct work_struct *work);

static struct xfs_cil_ctx *
xlog_cil_ctx_alloc(void)
{
	struct xfs_cil_ctx	*ctx;

	ctx = kmem_zalloc(sizeof(*ctx), KM_NOFS);
	INIT_LIST_HEAD(&ctx->committing);
	INIT_LIST_HEAD(&ctx->busy_extents);
	INIT_WORK(&ctx->push_work, xlog_cil_push_work);
	return ctx;
}

static void
xlog_cil_ctx_switch(
	struct xfs_cil		*cil,
	struct xfs_cil_ctx	*ctx)
{
	ctx->sequence = ++cil->xc_current_sequence;
	ctx->cil = cil;
	cil->xc_ctx = ctx;
}
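
/*
 * Illustrative lifecycle of a CIL context (added summary, not from the
 * original source): xlog_cil_ctx_alloc() creates it, xlog_cil_ctx_switch()
 * makes it the current context and assigns it the next sequence number, a
 * CIL push moves it onto the xc_committing list, and xlog_cil_committed()
 * frees it once the checkpoint IO completes.
 */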

/*
 * After the first stage of log recovery is done, we know where the head and
 * tail of the log are. We need this log initialisation done before we can
 * initialise the first CIL checkpoint context.
 *
 * Here we allocate a log ticket to track space usage during a CIL push. This
 * ticket is passed to xlog_write() directly so that we don't slowly leak log
 * space by failing to account for space used by log headers and additional
 * region headers for split regions.
 */
void
xlog_cil_init_post_recovery(
	struct xlog	*log)
{
	log->l_cilp->xc_ctx->ticket = xlog_cil_ticket_alloc(log);
	log->l_cilp->xc_ctx->sequence = 1;
}

static inline int
xlog_cil_iovec_space(
	uint	niovecs)
{
	return round_up((sizeof(struct xfs_log_vec) +
					niovecs * sizeof(struct xfs_log_iovec)),
			sizeof(uint64_t));
}
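
/*
 * Illustrative shadow buffer layout (added example, not from the original
 * source): for an item with two iovecs, the buffer allocated by
 * xlog_cil_alloc_shadow_bufs() below is laid out as
 *
 *	| xfs_log_vec | xfs_log_iovec[0] | xfs_log_iovec[1] | pad | data |
 *
 * where xlog_cil_iovec_space(2) covers everything up to and including the
 * padding, so the data area (lv_buf) starts 64-bit aligned.
 */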

/*
 * shadow buffers can be large, so we need to use kvmalloc() here to ensure
 * success. Unfortunately, kvmalloc() only allows GFP_KERNEL contexts to fall
 * back to vmalloc, so we can't actually do anything useful with gfp flags to
 * control the kmalloc() behaviour within kvmalloc(). Hence kmalloc() will do
 * direct reclaim and compaction in the slow path, both of which are
 * horrendously expensive. We just want kmalloc to fail fast and fall back to
 * vmalloc if it can't get something straight away from the free lists or
 * buddy allocator. Hence we have to open code kvmalloc ourselves here.
 *
 * Also, we are in memalloc_nofs_save task context here, so despite the use of
 * GFP_KERNEL here, we are actually going to be doing GFP_NOFS allocations.
 * This is actually the only way to make vmalloc() do GFP_NOFS allocations, so
 * let's just all pretend this is a GFP_KERNEL context operation....
 */
static inline void *
xlog_cil_kvmalloc(
	size_t		buf_size)
{
	gfp_t		flags = GFP_KERNEL;
	void		*p;

	flags &= ~__GFP_DIRECT_RECLAIM;
	flags |= __GFP_NOWARN | __GFP_NORETRY;
	do {
		p = kmalloc(buf_size, flags);
		if (!p)
			p = vmalloc(buf_size);
	} while (!p);

	return p;
}

/*
 * Allocate or pin log vector buffers for CIL insertion.
 *
 * The CIL currently uses disposable buffers for copying a snapshot of the
 * modified items into the log during a push. The biggest problem with this is
 * the requirement to allocate the disposable buffer during the commit if:
 *	a) it does not exist; or
 *	b) it is too small
 *
 * If we do this allocation within xlog_cil_insert_format_items(), it is done
 * under the xc_ctx_lock, which means that a CIL push cannot occur during
 * the memory allocation. This means that we have a potential deadlock situation
 * under low memory conditions when we have lots of dirty metadata pinned in
 * the CIL and we need a CIL commit to occur to free memory.
 *
 * To avoid this, we need to move the memory allocation outside the
 * xc_ctx_lock, but because the log vector buffers are disposable, that opens
 * up a TOCTOU race condition w.r.t. the CIL committing and removing the log
 * vector buffers between the check and the formatting of the item into the
 * log vector buffer within the xc_ctx_lock.
 *
 * Because the log vector buffer needs to be unchanged during the CIL push
 * process, we cannot share the buffer between the transaction commit (which
 * modifies the buffer) and the CIL push context that is writing the changes
 * into the log. This means skipping preallocation of buffer space is
 * unreliable, but we most definitely do not want to be allocating and freeing
 * buffers unnecessarily during commits when overwrites can be done safely.
 *
 * The simplest solution to this problem is to allocate a shadow buffer when a
 * log item is committed for the second time, and then to only use this buffer
 * if necessary. The buffer can remain attached to the log item until such time
 * it is needed, and this is the buffer that is reallocated to match the size of
 * the incoming modification. Then during the formatting of the item we can swap
 * the active buffer with the new one if we can't reuse the existing buffer. We
 * don't free the old buffer as it may be reused on the next modification if
 * its size is right, otherwise we'll free and reallocate it at that point.
 *
 * This function builds a vector for the changes in each log item in the
 * transaction. It then works out the length of the buffer needed for each log
 * item, allocates them and attaches the vector to the log item in preparation
 * for the formatting step which occurs under the xc_ctx_lock.
 *
 * While this means the memory footprint goes up, it avoids the repeated
 * alloc/free pattern that repeated modifications of an item would otherwise
 * cause, and hence minimises the CPU overhead of such behaviour.
 */
static void
xlog_cil_alloc_shadow_bufs(
	struct xlog		*log,
	struct xfs_trans	*tp)
{
	struct xfs_log_item	*lip;

	list_for_each_entry(lip, &tp->t_items, li_trans) {
		struct xfs_log_vec *lv;
		int	niovecs = 0;
		int	nbytes = 0;
		int	buf_size;
		bool	ordered = false;

		/* Skip items which aren't dirty in this transaction. */
		if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
			continue;

		/* get number of vecs and size of data to be stored */
		lip->li_ops->iop_size(lip, &niovecs, &nbytes);

		/*
		 * Ordered items need to be tracked but we do not wish to write
		 * them. We need a logvec to track the object, but we do not
		 * need an iovec or buffer to be allocated for copying data.
		 */
		if (niovecs == XFS_LOG_VEC_ORDERED) {
			ordered = true;
			niovecs = 0;
			nbytes = 0;
		}

		/*
		 * We 64-bit align the length of each iovec so that the start of
		 * the next one is naturally aligned. We'll need to account for
		 * that slack space here.
		 *
		 * We also add the xlog_op_header to each region when
		 * formatting, but that's not accounted to the size of the item
		 * at this point. Hence we'll need an additional number of bytes
		 * for each vector to hold an opheader.
		 *
		 * Then round nbytes up to 64-bit alignment so that the initial
		 * buffer alignment is easy to calculate and verify.
		 */
		nbytes += niovecs *
			(sizeof(uint64_t) + sizeof(struct xlog_op_header));
		nbytes = round_up(nbytes, sizeof(uint64_t));
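
		/*
		 * Worked example of the calculation above (added illustration,
		 * not from the original source): an item reporting niovecs = 3
		 * and nbytes = 100 becomes
		 * 100 + 3 * (8 + sizeof(struct xlog_op_header)), rounded up to
		 * the next 8 byte boundary, so both the iovec padding and one
		 * opheader per region are reserved up front.
		 */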

		/*
		 * The data buffer needs to start 64-bit aligned, so round up
		 * that space to ensure we can align it appropriately and not
		 * overrun the buffer.
		 */
		buf_size = nbytes + xlog_cil_iovec_space(niovecs);

		/*
		 * if we have no shadow buffer, or it is too small, we need to
		 * reallocate it.
		 */
		if (!lip->li_lv_shadow ||
		    buf_size > lip->li_lv_shadow->lv_size) {
			/*
			 * We free and allocate here as a realloc would copy
			 * unnecessary data. We don't use kvzalloc() for the
			 * same reason - we don't need to zero the data area in
			 * the buffer, only the log vector header and the iovec
			 * storage.
			 */
			kmem_free(lip->li_lv_shadow);
			lv = xlog_cil_kvmalloc(buf_size);

			memset(lv, 0, xlog_cil_iovec_space(niovecs));

			lv->lv_item = lip;
			lv->lv_size = buf_size;
			if (ordered)
				lv->lv_buf_len = XFS_LOG_VEC_ORDERED;
			else
				lv->lv_iovecp = (struct xfs_log_iovec *)&lv[1];
			lip->li_lv_shadow = lv;
		} else {
			/* same or smaller, optimise common overwrite case */
			lv = lip->li_lv_shadow;
			if (ordered)
				lv->lv_buf_len = XFS_LOG_VEC_ORDERED;
			else
				lv->lv_buf_len = 0;
			lv->lv_bytes = 0;
			lv->lv_next = NULL;
		}

		/* Ensure the lv is set up according to ->iop_size */
		lv->lv_niovecs = niovecs;

		/* The allocated data region lies beyond the iovec region */
		lv->lv_buf = (char *)lv + xlog_cil_iovec_space(niovecs);
	}
}

/*
 * Prepare the log item for insertion into the CIL. Calculate the difference in
 * log space it will consume, and if it is a new item pin it as well.
 */
STATIC void
xfs_cil_prepare_item(
	struct xlog		*log,
	struct xfs_log_vec	*lv,
	struct xfs_log_vec	*old_lv,
	int			*diff_len)
{
	/* Account for the new LV being passed in */
	if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED)
		*diff_len += lv->lv_bytes;

	/*
	 * If there is no old LV, this is the first time we've seen the item in
	 * this CIL context and so we need to pin it. If we are replacing the
	 * old_lv, then remove the space it accounts for and make it the shadow
	 * buffer for later freeing. In both cases we are now switching to the
	 * shadow buffer, so update the pointer to it appropriately.
	 */
	if (!old_lv) {
		if (lv->lv_item->li_ops->iop_pin)
			lv->lv_item->li_ops->iop_pin(lv->lv_item);
		lv->lv_item->li_lv_shadow = NULL;
	} else if (old_lv != lv) {
		ASSERT(lv->lv_buf_len != XFS_LOG_VEC_ORDERED);

		*diff_len -= old_lv->lv_bytes;
		lv->lv_item->li_lv_shadow = old_lv;
	}

	/* attach new log vector to log item */
	lv->lv_item->li_lv = lv;

	/*
	 * If this is the first time the item is being committed to the
	 * CIL, store the sequence number on the log item so we can
	 * tell in future commits whether this is the first checkpoint
	 * the item is being committed into.
	 */
	if (!lv->lv_item->li_seq)
		lv->lv_item->li_seq = log->l_cilp->xc_ctx->sequence;
}
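
/*
 * Worked example of the accounting above (added illustration, not from the
 * original source): relogging an item whose old vector consumed 256 bytes
 * with a new shadow vector of 320 bytes yields *diff_len += 320 followed by
 * *diff_len -= 256, so only the 64 bytes of net growth is stolen from the
 * committing transaction's reservation.
 */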

/*
 * Format log item into a flat buffer
 *
 * For delayed logging, we need to hold a formatted buffer containing all the
 * changes on the log item. This enables us to relog the item in memory and
 * write it out asynchronously without needing to relock the object that was
 * modified at the time it gets written into the iclog.
 *
 * This function takes the prepared log vectors attached to each log item, and
 * formats the changes into the log vector buffer. The buffer it uses is
 * dependent on the current state of the vector in the CIL - the shadow lv is
 * guaranteed to be large enough for the current modification, but we will only
 * use that if we can't reuse the existing lv. If we can't reuse the existing
 * lv, then simply swap it out for the shadow lv. We don't free it - that is
 * done lazily either by the next modification or the freeing of the log item.
 *
 * We don't set up region headers during this process; we simply copy the
 * regions into the flat buffer. We can do this because we still have to do a
 * formatting step to write the regions into the iclog buffer. Writing the
 * ophdrs during the iclog write means that we can support splitting large
 * regions across iclog boundaries without needing a change in the format of the
 * item/region encapsulation.
 *
 * Hence what we need to do now is rewrite the vector array to point to the
 * copied region inside the buffer we just allocated. This allows us to format
 * the regions into the iclog as though they are being formatted directly out
 * of the objects themselves.
 */
static void
xlog_cil_insert_format_items(
	struct xlog		*log,
	struct xfs_trans	*tp,
	int			*diff_len)
{
	struct xfs_log_item	*lip;

	/* Bail out if we didn't find a log item.  */
	if (list_empty(&tp->t_items)) {
		ASSERT(0);
		return;
	}

	list_for_each_entry(lip, &tp->t_items, li_trans) {
		struct xfs_log_vec *lv;
		struct xfs_log_vec *old_lv = NULL;
		struct xfs_log_vec *shadow;
		bool	ordered = false;

		/* Skip items which aren't dirty in this transaction. */
		if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
			continue;

		/*
		 * The formatting size information is already attached to
		 * the shadow lv on the log item.
		 */
		shadow = lip->li_lv_shadow;
		if (shadow->lv_buf_len == XFS_LOG_VEC_ORDERED)
			ordered = true;

		/* Skip items that do not have any vectors for writing */
		if (!shadow->lv_niovecs && !ordered)
			continue;

		/* compare to existing item size */
		old_lv = lip->li_lv;
		if (lip->li_lv && shadow->lv_size <= lip->li_lv->lv_size) {
			/* same or smaller, optimise common overwrite case */
			lv = lip->li_lv;
			lv->lv_next = NULL;

			if (ordered)
				goto insert;

			/*
			 * set the item up as though it is a new insertion so
			 * that the space reservation accounting is correct.
			 */
			*diff_len -= lv->lv_bytes;

			/* Ensure the lv is set up according to ->iop_size */
			lv->lv_niovecs = shadow->lv_niovecs;

			/* reset the lv buffer information for new formatting */
			lv->lv_buf_len = 0;
			lv->lv_bytes = 0;
			lv->lv_buf = (char *)lv +
					xlog_cil_iovec_space(lv->lv_niovecs);
		} else {
			/* switch to shadow buffer! */
			lv = shadow;
			lv->lv_item = lip;
			if (ordered) {
				/* track as an ordered logvec */
				ASSERT(lip->li_lv == NULL);
				goto insert;
			}
		}

		ASSERT(IS_ALIGNED((unsigned long)lv->lv_buf, sizeof(uint64_t)));
		lip->li_ops->iop_format(lip, lv);
insert:
		xfs_cil_prepare_item(log, lv, old_lv, diff_len);
	}
}
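
/*
 * Summary of the buffer selection above (added illustration, not from the
 * original source):
 *
 *	no existing lv			-> format into shadow, pin the item
 *	existing lv large enough	-> reformat in place, reuse lv
 *	existing lv too small		-> swap in shadow, old lv becomes
 *					   the new shadow for lazy freeing
 */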

/*
 * Insert the log items into the CIL and calculate the difference in space
 * consumed by the item. Add the space to the checkpoint ticket and calculate
 * if the change requires additional log metadata. If it does, take that space
 * as well. Remove the amount of space we added to the checkpoint ticket from
 * the current transaction ticket so that the accounting works out correctly.
 */
static void
xlog_cil_insert_items(
	struct xlog		*log,
	struct xfs_trans	*tp,
	uint32_t		released_space)
{
	struct xfs_cil		*cil = log->l_cilp;
	struct xfs_cil_ctx	*ctx = cil->xc_ctx;
	struct xfs_log_item	*lip;
	int			len = 0;
	int			iclog_space;
	int			iovhdr_res = 0, split_res = 0, ctx_res = 0;

	ASSERT(tp);

	/*
	 * We can do this safely because the context can't checkpoint until we
	 * are done so it doesn't matter exactly how we update the CIL.
	 */
	xlog_cil_insert_format_items(log, tp, &len);

	spin_lock(&cil->xc_cil_lock);

	/* attach the transaction to the CIL if it has any busy extents */
	if (!list_empty(&tp->t_busy))
		list_splice_init(&tp->t_busy, &ctx->busy_extents);

	/*
	 * Now transfer enough transaction reservation to the context ticket
	 * for the checkpoint. The context ticket is special - the unit
	 * reservation has to grow as well as the current reservation as we
	 * steal from tickets so we can correctly determine the space used
	 * during the transaction commit.
	 */
	if (ctx->ticket->t_curr_res == 0) {
		ctx_res = ctx->ticket->t_unit_res;
		ctx->ticket->t_curr_res = ctx_res;
		tp->t_ticket->t_curr_res -= ctx_res;
	}

	/* do we need space for more log record headers? */
	iclog_space = log->l_iclog_size - log->l_iclog_hsize;
	if (len > 0 && (ctx->space_used / iclog_space !=
				(ctx->space_used + len) / iclog_space)) {
		split_res = (len + iclog_space - 1) / iclog_space;
		/* need to take into account split region headers, too */
		split_res *= log->l_iclog_hsize + sizeof(struct xlog_op_header);
		ctx->ticket->t_unit_res += split_res;
		ctx->ticket->t_curr_res += split_res;
		tp->t_ticket->t_curr_res -= split_res;
		ASSERT(tp->t_ticket->t_curr_res >= len);
	}
	tp->t_ticket->t_curr_res -= len;
	tp->t_ticket->t_curr_res += released_space;
	ctx->space_used += len;
	ctx->space_used -= released_space;

	/*
	 * If we've overrun the reservation, dump the tx details before we move
	 * the log items. Shutdown is imminent...
	 */
	if (WARN_ON(tp->t_ticket->t_curr_res < 0)) {
		xfs_warn(log->l_mp, "Transaction log reservation overrun:");
		xfs_warn(log->l_mp,
			 "  log items: %d bytes (iov hdrs: %d bytes)",
			 len, iovhdr_res);
		xfs_warn(log->l_mp, "  split region headers: %d bytes",
			 split_res);
		xfs_warn(log->l_mp, "  ctx ticket: %d bytes", ctx_res);
		xlog_print_trans(tp);
	}

	/*
	 * Now (re-)position everything modified at the tail of the CIL.
	 * We do this here so we only need to take the CIL lock once during
	 * the transaction commit.
	 */
	list_for_each_entry(lip, &tp->t_items, li_trans) {

		/* Skip items which aren't dirty in this transaction. */
		if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
			continue;

		/*
		 * Only move the item if it isn't already at the tail. This is
		 * to prevent a transient list_empty() state when reinserting
		 * an item that is already the only item in the CIL.
		 */
		if (!list_is_last(&lip->li_cil, &cil->xc_cil))
			list_move_tail(&lip->li_cil, &cil->xc_cil);
	}

	spin_unlock(&cil->xc_cil_lock);

	if (tp->t_ticket->t_curr_res < 0)
		xlog_force_shutdown(log, SHUTDOWN_LOG_IO_ERROR);
}
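
/*
 * Worked example of the split reservation above (added illustration with
 * assumed numbers, not from the original source): with iclog_space = 32256
 * bytes and a commit that moves space_used from 30000 to 40000, the items
 * cross one iclog boundary, so split_res reserves one extra log record
 * header plus one xlog_op_header for the region split across that boundary.
 */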

static void
xlog_cil_free_logvec(
	struct xfs_log_vec	*log_vector)
{
	struct xfs_log_vec	*lv;

	for (lv = log_vector; lv; ) {
		struct xfs_log_vec *next = lv->lv_next;
		kmem_free(lv);
		lv = next;
	}
}

static void
xlog_discard_endio_work(
	struct work_struct	*work)
{
	struct xfs_cil_ctx	*ctx =
		container_of(work, struct xfs_cil_ctx, discard_endio_work);
	struct xfs_mount	*mp = ctx->cil->xc_log->l_mp;

	xfs_extent_busy_clear(mp, &ctx->busy_extents, false);
	kmem_free(ctx);
}

/*
 * Queue up the actual completion to a thread to avoid IRQ-safe locking for
 * pagb_lock. Note that we need an unbounded workqueue, otherwise we might
 * get the execution delayed up to 30 seconds for weird reasons.
 */
static void
xlog_discard_endio(
	struct bio		*bio)
{
	struct xfs_cil_ctx	*ctx = bio->bi_private;

	INIT_WORK(&ctx->discard_endio_work, xlog_discard_endio_work);
	queue_work(xfs_discard_wq, &ctx->discard_endio_work);
	bio_put(bio);
}

static void
xlog_discard_busy_extents(
	struct xfs_mount	*mp,
	struct xfs_cil_ctx	*ctx)
{
	struct list_head	*list = &ctx->busy_extents;
	struct xfs_extent_busy	*busyp;
	struct bio		*bio = NULL;
	struct blk_plug		plug;
	int			error = 0;

	ASSERT(xfs_has_discard(mp));

	blk_start_plug(&plug);
	list_for_each_entry(busyp, list, list) {
		trace_xfs_discard_extent(mp, busyp->agno, busyp->bno,
					 busyp->length);

		error = __blkdev_issue_discard(mp->m_ddev_targp->bt_bdev,
				XFS_AGB_TO_DADDR(mp, busyp->agno, busyp->bno),
				XFS_FSB_TO_BB(mp, busyp->length),
				GFP_NOFS, 0, &bio);
		if (error && error != -EOPNOTSUPP) {
			xfs_info(mp,
				 "discard failed for extent [0x%llx,%u], error %d",
				 (unsigned long long)busyp->bno,
				 busyp->length,
				 error);
			break;
		}
	}

	if (bio) {
		bio->bi_private = ctx;
		bio->bi_end_io = xlog_discard_endio;
		submit_bio(bio);
	} else {
		xlog_discard_endio_work(&ctx->discard_endio_work);
	}
	blk_finish_plug(&plug);
}

/*
 * Mark all items committed and clear busy extents. We free the log vector
 * chains in a separate pass so that we unpin the log items as quickly as
 * possible.
 */
static void
xlog_cil_committed(
	struct xfs_cil_ctx	*ctx)
{
	struct xfs_mount	*mp = ctx->cil->xc_log->l_mp;
	bool			abort = xlog_is_shutdown(ctx->cil->xc_log);

	/*
	 * If the I/O failed, we're aborting the commit and already shutdown.
	 * Wake any commit waiters before aborting the log items so we don't
	 * block async log pushers on callbacks. Async log pushers explicitly do
	 * not wait on log force completion because they may be holding locks
	 * required to unpin items.
	 */
	if (abort) {
		spin_lock(&ctx->cil->xc_push_lock);
		wake_up_all(&ctx->cil->xc_start_wait);
		wake_up_all(&ctx->cil->xc_commit_wait);
		spin_unlock(&ctx->cil->xc_push_lock);
	}

	xfs_trans_committed_bulk(ctx->cil->xc_log->l_ailp, ctx->lv_chain,
					ctx->start_lsn, abort);

	xfs_extent_busy_sort(&ctx->busy_extents);
	xfs_extent_busy_clear(mp, &ctx->busy_extents,
			      xfs_has_discard(mp) && !abort);

	spin_lock(&ctx->cil->xc_push_lock);
	list_del(&ctx->committing);
	spin_unlock(&ctx->cil->xc_push_lock);

	xlog_cil_free_logvec(ctx->lv_chain);

	if (!list_empty(&ctx->busy_extents))
		xlog_discard_busy_extents(mp, ctx);
	else
		kmem_free(ctx);
}

void
xlog_cil_process_committed(
	struct list_head	*list)
{
	struct xfs_cil_ctx	*ctx;

	while ((ctx = list_first_entry_or_null(list,
			struct xfs_cil_ctx, iclog_entry))) {
		list_del(&ctx->iclog_entry);
		xlog_cil_committed(ctx);
	}
}

/*
 * Record the LSN of the iclog we were just granted space to start writing into.
 * If the context doesn't have a start_lsn recorded, then this iclog will
 * contain the start record for the checkpoint. Otherwise this write contains
 * the commit record for the checkpoint.
 */
void
xlog_cil_set_ctx_write_state(
	struct xfs_cil_ctx	*ctx,
	struct xlog_in_core	*iclog)
{
	struct xfs_cil		*cil = ctx->cil;
	xfs_lsn_t		lsn = be64_to_cpu(iclog->ic_header.h_lsn);

	ASSERT(!ctx->commit_lsn);
	if (!ctx->start_lsn) {
		spin_lock(&cil->xc_push_lock);
		/*
		 * The LSN we need to pass to the log items on transaction
		 * commit is the LSN reported by the first log vector write, not
		 * the commit lsn. If we use the commit record lsn then we can
		 * move the grant write head beyond the tail LSN and overwrite
		 * it.
		 */
		ctx->start_lsn = lsn;
		wake_up_all(&cil->xc_start_wait);
		spin_unlock(&cil->xc_push_lock);

		/*
		 * Make sure the metadata we are about to overwrite in the log
		 * has been flushed to stable storage before this iclog is
		 * issued.
		 */
		spin_lock(&cil->xc_log->l_icloglock);
		iclog->ic_flags |= XLOG_ICL_NEED_FLUSH;
		spin_unlock(&cil->xc_log->l_icloglock);
		return;
	}

	/*
	 * Take a reference to the iclog for the context so that we still hold
	 * it when xlog_write is done and has released it. This means the
	 * context controls when the iclog is released for IO.
	 */
	atomic_inc(&iclog->ic_refcnt);

	/*
	 * xlog_state_get_iclog_space() guarantees there is enough space in the
	 * iclog for an entire commit record, so we can attach the context
	 * callbacks now. This needs to be done before we make the commit_lsn
	 * visible to waiters so that checkpoints with commit records in the
	 * same iclog order their IO completion callbacks in the same order that
	 * the commit records appear in the iclog.
	 */
	spin_lock(&cil->xc_log->l_icloglock);
	list_add_tail(&ctx->iclog_entry, &iclog->ic_callbacks);
	spin_unlock(&cil->xc_log->l_icloglock);

	/*
	 * Now we can record the commit LSN and wake anyone waiting for this
	 * sequence to have the ordered commit record assigned to a physical
	 * location in the log.
	 */
	spin_lock(&cil->xc_push_lock);
	ctx->commit_iclog = iclog;
	ctx->commit_lsn = lsn;
	wake_up_all(&cil->xc_commit_wait);
	spin_unlock(&cil->xc_push_lock);
}
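
/*
 * Illustrative ordering guarantee (added summary, not from the original
 * source): for two checkpoints N and N+1 being pushed concurrently, the
 * code below ensures
 *
 *	start_lsn(N) <= start_lsn(N+1) and commit_lsn(N) <= commit_lsn(N+1)
 *
 * by making the writer of sequence N+1 wait until sequence N has recorded
 * the corresponding LSN.
 */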

/*
 * Ensure that the order of log writes follows checkpoint sequence order. This
 * relies on the context LSN being zero until the log write has guaranteed the
 * LSN that the log write will start at via xlog_state_get_iclog_space().
 */
enum _record_type {
	_START_RECORD,
	_COMMIT_RECORD,
};

static int
xlog_cil_order_write(
	struct xfs_cil		*cil,
	xfs_csn_t		sequence,
	enum _record_type	record)
{
	struct xfs_cil_ctx	*ctx;

restart:
	spin_lock(&cil->xc_push_lock);
	list_for_each_entry(ctx, &cil->xc_committing, committing) {
		/*
		 * Avoid getting stuck in this loop because we were woken by the
		 * shutdown, but then went back to sleep once already in the
		 * shutdown state.
		 */
		if (xlog_is_shutdown(cil->xc_log)) {
			spin_unlock(&cil->xc_push_lock);
			return -EIO;
		}

		/*
		 * Higher sequences will wait for this one so skip them.
		 * Don't wait for our own sequence, either.
		 */
		if (ctx->sequence >= sequence)
			continue;

		/* Wait until the LSN for the record has been recorded. */
		switch (record) {
		case _START_RECORD:
			if (!ctx->start_lsn) {
				xlog_wait(&cil->xc_start_wait,
					  &cil->xc_push_lock);
				goto restart;
			}
			break;
		case _COMMIT_RECORD:
			if (!ctx->commit_lsn) {
				xlog_wait(&cil->xc_commit_wait,
					  &cil->xc_push_lock);
				goto restart;
			}
			break;
		}
	}
	spin_unlock(&cil->xc_push_lock);
	return 0;
}

/*
 * Write out the log vector change now attached to the CIL context. This will
 * write a start record that needs to be strictly ordered in ascending CIL
 * sequence order so that log recovery will always use in-order start LSNs when
 * replaying checkpoints.
 */
static int
xlog_cil_write_chain(
	struct xfs_cil_ctx	*ctx,
	struct xfs_log_vec	*chain,
	uint32_t		chain_len)
{
	struct xlog		*log = ctx->cil->xc_log;
	int			error;

	error = xlog_cil_order_write(ctx->cil, ctx->sequence, _START_RECORD);
	if (error)
		return error;
	return xlog_write(log, ctx, chain, ctx->ticket, chain_len);
}

/*
 * Write out the commit record of a checkpoint transaction to close off a
 * running log write. These commit records are strictly ordered in ascending CIL
 * sequence order so that log recovery will always replay the checkpoints in the
 * correct order.
 */
static int
xlog_cil_write_commit_record(
	struct xfs_cil_ctx	*ctx)
{
	struct xlog		*log = ctx->cil->xc_log;
	struct xlog_op_header	ophdr = {
		.oh_clientid	= XFS_TRANSACTION,
		.oh_tid		= cpu_to_be32(ctx->ticket->t_tid),
		.oh_flags	= XLOG_COMMIT_TRANS,
	};
	struct xfs_log_iovec	reg = {
		.i_addr		= &ophdr,
		.i_len		= sizeof(struct xlog_op_header),
		.i_type		= XLOG_REG_TYPE_COMMIT,
	};
	struct xfs_log_vec	vec = {
		.lv_niovecs	= 1,
		.lv_iovecp	= &reg,
	};
	int			error;

	if (xlog_is_shutdown(log))
		return -EIO;

	error = xlog_cil_order_write(ctx->cil, ctx->sequence, _COMMIT_RECORD);
	if (error)
		return error;

	/* account for space used by record data */
	ctx->ticket->t_curr_res -= reg.i_len;
	error = xlog_write(log, ctx, &vec, ctx->ticket, reg.i_len);
	if (error)
		xlog_force_shutdown(log, SHUTDOWN_LOG_IO_ERROR);
	return error;
}

struct xlog_cil_trans_hdr {
	struct xlog_op_header	oph[2];
	struct xfs_trans_header	thdr;
	struct xfs_log_iovec	lhdr[2];
};
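
/*
 * Illustrative layout of the checkpoint header regions built below (added
 * summary, not from the original source):
 *
 *	lhdr[0]: opheader carrying XLOG_START_TRANS
 *	lhdr[1]: opheader + xfs_trans_header (XLOG_REG_TYPE_TRANSHDR)
 *
 * followed by the log vector chain for the checkpoint items.
 */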

/*
 * Build a checkpoint transaction header to begin the journal transaction. We
 * need to account for the space used by the transaction header here as it is
 * not accounted for in xlog_write().
 *
 * This is the only place we write a transaction header, so we also build the
 * log opheaders that indicate the start of a log transaction and wrap the
 * transaction header. We keep the start record in its own log vector rather
 * than compacting them into a single region as this ends up making the logic
 * in xlog_write() for handling empty opheaders for start, commit and unmount
 * records much simpler.
 */
static void
xlog_cil_build_trans_hdr(
	struct xfs_cil_ctx	*ctx,
	struct xlog_cil_trans_hdr *hdr,
	struct xfs_log_vec	*lvhdr,
	int			num_iovecs)
{
	struct xlog_ticket	*tic = ctx->ticket;
	__be32			tid = cpu_to_be32(tic->t_tid);

	memset(hdr, 0, sizeof(*hdr));

	/* Log start record */
	hdr->oph[0].oh_tid = tid;
	hdr->oph[0].oh_clientid = XFS_TRANSACTION;
	hdr->oph[0].oh_flags = XLOG_START_TRANS;

	/* log iovec region pointer */
	hdr->lhdr[0].i_addr = &hdr->oph[0];
	hdr->lhdr[0].i_len = sizeof(struct xlog_op_header);
	hdr->lhdr[0].i_type = XLOG_REG_TYPE_LRHEADER;

	/* log opheader */
	hdr->oph[1].oh_tid = tid;
	hdr->oph[1].oh_clientid = XFS_TRANSACTION;
	hdr->oph[1].oh_len = cpu_to_be32(sizeof(struct xfs_trans_header));

	/* transaction header in host byte order format */
	hdr->thdr.th_magic = XFS_TRANS_HEADER_MAGIC;
	hdr->thdr.th_type = XFS_TRANS_CHECKPOINT;
	hdr->thdr.th_tid = tic->t_tid;
	hdr->thdr.th_num_items = num_iovecs;

	/* log iovec region pointer */
	hdr->lhdr[1].i_addr = &hdr->oph[1];
	hdr->lhdr[1].i_len = sizeof(struct xlog_op_header) +
			sizeof(struct xfs_trans_header);
	hdr->lhdr[1].i_type = XLOG_REG_TYPE_TRANSHDR;

	lvhdr->lv_niovecs = 2;
	lvhdr->lv_iovecp = &hdr->lhdr[0];
	lvhdr->lv_bytes = hdr->lhdr[0].i_len + hdr->lhdr[1].i_len;
	lvhdr->lv_next = ctx->lv_chain;

	tic->t_curr_res -= lvhdr->lv_bytes;
}

/*
 * Pull all the log vectors off the items in the CIL, and remove the items from
 * the CIL. We don't need the CIL lock here because it's only needed on the
 * transaction commit side which is currently locked out by the flush lock.
 *
 * If a log item is marked with a whiteout, we do not need to write it to the
 * journal and so we just move it to the whiteout list for the caller to
 * dispose of appropriately.
 */
static void
xlog_cil_build_lv_chain(
	struct xfs_cil		*cil,
	struct xfs_cil_ctx	*ctx,
	struct list_head	*whiteouts,
	uint32_t		*num_iovecs,
	uint32_t		*num_bytes)
{
	struct xfs_log_vec	*lv = NULL;

	while (!list_empty(&cil->xc_cil)) {
		struct xfs_log_item *item;

		item = list_first_entry(&cil->xc_cil,
					struct xfs_log_item, li_cil);

		if (test_bit(XFS_LI_WHITEOUT, &item->li_flags)) {
			list_move(&item->li_cil, whiteouts);
			trace_xfs_cil_whiteout_skip(item);
			continue;
		}

		list_del_init(&item->li_cil);
		if (!ctx->lv_chain)
			ctx->lv_chain = item->li_lv;
		else
			lv->lv_next = item->li_lv;
		lv = item->li_lv;
		item->li_lv = NULL;
		*num_iovecs += lv->lv_niovecs;

		/* we don't write ordered log vectors */
		if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED)
			*num_bytes += lv->lv_bytes;
	}
}

static void
xlog_cil_cleanup_whiteouts(
	struct list_head	*whiteouts)
{
	while (!list_empty(whiteouts)) {
		struct xfs_log_item *item = list_first_entry(whiteouts,
						struct xfs_log_item, li_cil);
		list_del_init(&item->li_cil);
		trace_xfs_cil_whiteout_unpin(item);
		item->li_ops->iop_unpin(item, 1);
	}
}
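
/*
 * Note on the unpin above (added commentary, an interpretation rather than
 * text from the original source): whiteout items were pinned at CIL
 * insertion but are never written to the journal, so ->iop_unpin() is
 * called with its second (remove) argument set to drop the item rather than
 * completing normal checkpoint IO processing on it.
 */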

/*
 * Push the Committed Item List to the log.
 *
 * If the current sequence is the same as xc_push_seq we need to do a flush. If
 * xc_push_seq is less than the current sequence, then it has already been
 * flushed and we don't need to do anything - the caller will wait for it to
 * complete if necessary.
 *
 * xc_push_seq is checked unlocked against the sequence number for a match.
 * Hence we can allow log forces to run racily and not issue pushes for the
 * same sequence twice. If we get a race between multiple pushes for the same
 * sequence they will block on the first one and then abort, hence avoiding
 * needless pushes.
 */
static void
xlog_cil_push_work(
	struct work_struct	*work)
{
	struct xfs_cil_ctx	*ctx =
		container_of(work, struct xfs_cil_ctx, push_work);
	struct xfs_cil		*cil = ctx->cil;
	struct xlog		*log = cil->xc_log;
	struct xfs_cil_ctx	*new_ctx;
	int			num_iovecs = 0;
	int			num_bytes = 0;
	int			error = 0;
	struct xlog_cil_trans_hdr thdr;
	struct xfs_log_vec	lvhdr = { NULL };
	xfs_csn_t		push_seq;
	bool			push_commit_stable;
	LIST_HEAD		(whiteouts);

	new_ctx = xlog_cil_ctx_alloc();
	new_ctx->ticket = xlog_cil_ticket_alloc(log);

	down_write(&cil->xc_ctx_lock);

	spin_lock(&cil->xc_push_lock);
	push_seq = cil->xc_push_seq;
	ASSERT(push_seq <= ctx->sequence);
	push_commit_stable = cil->xc_push_commit_stable;
	cil->xc_push_commit_stable = false;

	/*
	 * As we are about to switch to a new, empty CIL context, we no longer
	 * need to throttle tasks on CIL space overruns. Wake any waiters that
	 * the hard push throttle may have caught so they can start committing
	 * to the new context. The ctx->xc_push_lock provides the serialisation
	 * necessary for safely using the lockless waitqueue_active() check in
	 * this context.
	 */
	if (waitqueue_active(&cil->xc_push_wait))
		wake_up_all(&cil->xc_push_wait);

	/*
	 * Check if we've anything to push. If there is nothing, then we don't
	 * move on to a new sequence number and so we have to be able to push
	 * this sequence again later.
	 */
	if (list_empty(&cil->xc_cil)) {
		cil->xc_push_seq = 0;
		spin_unlock(&cil->xc_push_lock);
		goto out_skip;
	}

	/* check for a previously pushed sequence */
	if (push_seq < ctx->sequence) {
		spin_unlock(&cil->xc_push_lock);
		goto out_skip;
	}

	/*
	 * We are now going to push this context, so add it to the committing
	 * list before we do anything else. This ensures that anyone waiting on
	 * this push can easily detect the difference between a "push in
	 * progress" and "CIL is empty, nothing to do".
	 *
	 * IOWs, a wait loop can now check for:
	 *	the current sequence not being found on the committing list;
	 *	an empty CIL; and
	 *	an unchanged sequence number
	 * to detect a push that had nothing to do and therefore does not need
	 * waiting on. If the CIL is not empty, we get put on the committing
	 * list before emptying the CIL and bumping the sequence number. Hence
	 * an empty CIL and an unchanged sequence number means we jumped out
	 * above after doing nothing.
	 *
	 * Hence the waiter will either find the commit sequence on the
	 * committing list or the sequence number will be unchanged and the CIL
	 * still dirty. In that latter case, the push has not yet started, and
	 * so the waiter will have to continue trying to check the CIL
	 * committing list until it is found. In extreme cases of delay, the
	 * sequence may fully commit between the attempts the waiter makes to
	 * check the commit sequence.
	 */
	list_add(&ctx->committing, &cil->xc_committing);
	spin_unlock(&cil->xc_push_lock);

	xlog_cil_build_lv_chain(cil, ctx, &whiteouts, &num_iovecs, &num_bytes);

	/*
	 * Switch the contexts so we can drop the context lock and move out
	 * of a shared context. We can't just go straight to the commit record,
	 * though - we need to synchronise with previous and future commits so
	 * that the commit records are correctly ordered in the log to ensure
	 * that we process items during log IO completion in the correct order.
	 *
	 * For example, if we get an EFI in one checkpoint and the EFD in the
	 * next (e.g. due to log forces), we do not want the checkpoint with
	 * the EFD to be committed before the checkpoint with the EFI. Hence
	 * we must strictly order the commit records of the checkpoints so
	 * that: a) the checkpoint callbacks are attached to the iclogs in the
	 * correct order; and b) the checkpoints are replayed in correct order
	 * in log recovery.
	 *
	 * Hence we need to add this context to the committing context list so
	 * that higher sequences will wait for us to write out a commit record
	 * before they do.
	 *
	 * xfs_log_force_seq requires us to mirror the new sequence into the cil
	 * structure atomically with the addition of this sequence to the
	 * committing list. This also ensures that we can do unlocked checks
	 * against the current sequence in log forces without risking
	 * dereferencing a freed context pointer.
	 */
	spin_lock(&cil->xc_push_lock);
	xlog_cil_ctx_switch(cil, new_ctx);
	spin_unlock(&cil->xc_push_lock);
	up_write(&cil->xc_ctx_lock);

	/*
	 * Build a checkpoint transaction header and write it to the log to
	 * begin the transaction. We need to account for the space used by the
	 * transaction header here as it is not accounted for in xlog_write().
	 */
	xlog_cil_build_trans_hdr(ctx, &thdr, &lvhdr, num_iovecs);
	num_bytes += lvhdr.lv_bytes;

	error = xlog_cil_write_chain(ctx, &lvhdr, num_bytes);
	if (error)
		goto out_abort_free_ticket;

	error = xlog_cil_write_commit_record(ctx);
	if (error)
		goto out_abort_free_ticket;

	xfs_log_ticket_ungrant(log, ctx->ticket);

	/*
	 * If the checkpoint spans multiple iclogs, wait for all previous iclogs
	 * to complete before we submit the commit_iclog. We can't use state
	 * checks for this - ACTIVE can be either a past completed iclog or a
	 * future iclog being filled, while WANT_SYNC through SYNC_DONE can be a
	 * past or future iclog awaiting IO or ordered IO completion to be run.
	 * In the latter case, if it's a future iclog and we wait on it, then we
	 * will hang because it won't get processed through to ic_force_wait
	 * wakeup until this commit_iclog is written to disk. Hence we use the
	 * iclog header lsn and compare it to the commit lsn to determine if we
	 * need to wait on iclogs or not.
	 */
	spin_lock(&log->l_icloglock);
	if (ctx->start_lsn != ctx->commit_lsn) {
		xfs_lsn_t	plsn;

		plsn = be64_to_cpu(ctx->commit_iclog->ic_prev->ic_header.h_lsn);
		if (plsn && XFS_LSN_CMP(plsn, ctx->commit_lsn) < 0) {
			/*
			 * Waiting on ic_force_wait orders the completion of
			 * iclogs older than ic_prev. Hence we only need to wait
			 * on the most recent older iclog here.
			 */
			xlog_wait_on_iclog(ctx->commit_iclog->ic_prev);
			spin_lock(&log->l_icloglock);
		}

		/*
		 * We need to issue a pre-flush so that the ordering for this
		 * checkpoint is correctly preserved down to stable storage.
		 */
		ctx->commit_iclog->ic_flags |= XLOG_ICL_NEED_FLUSH;
	}

	/*
	 * The commit iclog must be written to stable storage to guarantee
	 * journal IO vs metadata writeback IO is correctly ordered on stable
	 * storage.
	 *
	 * If the push caller needs the commit to be immediately stable and the
	 * commit_iclog is not yet marked as XLOG_STATE_WANT_SYNC to indicate it
	 * will be written when released, switch its state to WANT_SYNC right
	 * now.
	 */
	ctx->commit_iclog->ic_flags |= XLOG_ICL_NEED_FUA;
	if (push_commit_stable &&
	    ctx->commit_iclog->ic_state == XLOG_STATE_ACTIVE)
		xlog_state_switch_iclogs(log, ctx->commit_iclog, 0);
	xlog_state_release_iclog(log, ctx->commit_iclog);

	/* Not safe to reference ctx now! */

	spin_unlock(&log->l_icloglock);
	xlog_cil_cleanup_whiteouts(&whiteouts);
	return;

out_skip:
	up_write(&cil->xc_ctx_lock);
	xfs_log_ticket_put(new_ctx->ticket);
	kmem_free(new_ctx);
	return;

out_abort_free_ticket:
	xfs_log_ticket_ungrant(log, ctx->ticket);
	ASSERT(xlog_is_shutdown(log));
	xlog_cil_cleanup_whiteouts(&whiteouts);
	if (!ctx->commit_iclog) {
		xlog_cil_committed(ctx);
		return;
	}
	spin_lock(&log->l_icloglock);
	xlog_state_release_iclog(log, ctx->commit_iclog);
	/* Not safe to reference ctx now! */
	spin_unlock(&log->l_icloglock);
}

/*
 * We need to push CIL every so often so we don't cache more than we can fit in
 * the log. The limit really is that a checkpoint can't be more than half the
 * log (the current checkpoint is not allowed to overwrite the previous
 * checkpoint), but commit latency and memory usage limit this to a smaller
 * size.
 */
static void
xlog_cil_push_background(
	struct xlog	*log) __releases(cil->xc_ctx_lock)
{
	struct xfs_cil	*cil = log->l_cilp;

	/*
	 * The cil won't be empty because we are called while holding the
	 * context lock so whatever we added to the CIL will still be there.
	 */
	ASSERT(!list_empty(&cil->xc_cil));

	/*
	 * Don't do a background push if we haven't used up all the
	 * space available yet.
	 */
	if (cil->xc_ctx->space_used < XLOG_CIL_SPACE_LIMIT(log)) {
		up_read(&cil->xc_ctx_lock);
		return;
	}

	spin_lock(&cil->xc_push_lock);
	if (cil->xc_push_seq < cil->xc_current_sequence) {
		cil->xc_push_seq = cil->xc_current_sequence;
		queue_work(cil->xc_push_wq, &cil->xc_ctx->push_work);
	}

	/*
	 * Drop the context lock now, we can't hold that if we need to sleep
	 * because we are over the blocking threshold. The push_lock is still
	 * held, so blocking threshold sleep/wakeup is still correctly
	 * serialised here.
	 */
	up_read(&cil->xc_ctx_lock);

	/*
	 * If we are well over the space limit, throttle the work that is being
	 * done until the push work on this context has begun. Enforce the hard
	 * throttle on all transaction commits once it has been activated, even
	 * if the committing transactions have resulted in the space usage
	 * dipping back down under the hard limit.
	 *
	 * The ctx->xc_push_lock provides the serialisation necessary for safely
	 * using the lockless waitqueue_active() check in this context.
	 */
	if (cil->xc_ctx->space_used >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log) ||
	    waitqueue_active(&cil->xc_push_wait)) {
		trace_xfs_log_cil_wait(log, cil->xc_ctx->ticket);
		ASSERT(cil->xc_ctx->space_used < log->l_logsize);
		xlog_wait(&cil->xc_push_wait, &cil->xc_push_lock);
		return;
	}

	spin_unlock(&cil->xc_push_lock);
}

/*
 * xlog_cil_push_now() is used to trigger an immediate CIL push to the sequence
 * number that is passed. When it returns, the work will be queued for
 * @push_seq, but it won't be completed.
 *
 * If the caller is performing a synchronous force, we will flush the workqueue
 * to get previously queued work moving to minimise the wait time they will
 * undergo waiting for all outstanding pushes to complete. The caller is
 * expected to do the required waiting for push_seq to complete.
 *
 * If the caller is performing an async push, we need to ensure that the
 * checkpoint is fully flushed out of the iclogs when we finish the push. If we
 * don't do this, then the commit record may remain sitting in memory in an
 * ACTIVE iclog. This then requires another full log force to push to disk,
 * which defeats the purpose of having an async, non-blocking CIL force
 * mechanism. Hence in this case we need to pass a flag to the push work to
 * indicate it needs to flush the commit record itself.
 */
static void
xlog_cil_push_now(
	struct xlog	*log,
	xfs_lsn_t	push_seq,
	bool		async)
{
	struct xfs_cil	*cil = log->l_cilp;

	if (!cil)
		return;

	ASSERT(push_seq && push_seq <= cil->xc_current_sequence);

	/* start on any pending background push to minimise wait time on it */
	if (!async)
		flush_workqueue(cil->xc_push_wq);

	spin_lock(&cil->xc_push_lock);

	/*
	 * If this is an async flush request, we always need to set the
	 * xc_push_commit_stable flag even if something else has already queued
	 * a push. The flush caller is asking for the CIL to be on stable
	 * storage when the next push completes, so regardless of who has queued
	 * the push, the flush requires stable semantics from it.
	 */
	cil->xc_push_commit_stable = async;

	/*
	 * If the CIL is empty or we've already pushed the sequence then
	 * there's no more work that we need to do.
	 */
	if (list_empty(&cil->xc_cil) || push_seq <= cil->xc_push_seq) {
		spin_unlock(&cil->xc_push_lock);
		return;
	}

	cil->xc_push_seq = push_seq;
	queue_work(cil->xc_push_wq, &cil->xc_ctx->push_work);
	spin_unlock(&cil->xc_push_lock);
}

bool
xlog_cil_empty(
	struct xlog	*log)
{
	struct xfs_cil	*cil = log->l_cilp;
	bool		empty = false;

	spin_lock(&cil->xc_push_lock);
	if (list_empty(&cil->xc_cil))
		empty = true;
	spin_unlock(&cil->xc_push_lock);
	return empty;
}
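
/*
 * Illustrative example for the whiteout optimisation below (added, not from
 * the original source): an EFI logged earlier in the current checkpoint and
 * cancelled by an EFD in this transaction never needs to reach the journal.
 * The EFI is marked with XFS_LI_WHITEOUT so the CIL push drops it, and the
 * EFD is released here without ever entering the CIL.
 */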

/*
 * If there are intent done items in this transaction and the related intent was
 * committed in the current (same) CIL checkpoint, we don't need to write either
 * the intent or intent done item to the journal as the change will be
 * journalled atomically within this checkpoint. As we cannot remove items from
 * the CIL here, mark the related intent with a whiteout so that the CIL push
 * can remove it rather than writing it to the journal. Then remove the intent
 * done item from the current transaction and release it so it doesn't get put
 * into the CIL at all.
 */
static uint32_t
xlog_cil_process_intents(
	struct xfs_cil		*cil,
	struct xfs_trans	*tp)
{
	struct xfs_log_item	*lip, *ilip, *next;
	uint32_t		len = 0;

	list_for_each_entry_safe(lip, next, &tp->t_items, li_trans) {
		if (!(lip->li_ops->flags & XFS_ITEM_INTENT_DONE))
			continue;

		ilip = lip->li_ops->iop_intent(lip);
		if (!ilip || !xlog_item_in_current_chkpt(cil, ilip))
			continue;
		set_bit(XFS_LI_WHITEOUT, &ilip->li_flags);
		trace_xfs_cil_whiteout_mark(ilip);
		len += ilip->li_lv->lv_bytes;
		kmem_free(ilip->li_lv);
		ilip->li_lv = NULL;

		xfs_trans_del_item(lip);
		lip->li_ops->iop_release(lip);
	}
	return len;
}
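
/*
 * Note on the return value above (added commentary, not from the original
 * source): len is the journal space the whiteout-marked intents were
 * consuming. It is handed back to the committing transaction as
 * released_space in xlog_cil_insert_items(), keeping the CIL space
 * accounting balanced.
 */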

/*
 * Commit a transaction with the given vector to the Committed Item List.
 *
 * To do this, we need to format the item, pin it in memory if required and
 * account for the space used by the transaction. Once we have done that we
 * need to release the unused reservation for the transaction, attach the
 * transaction to the checkpoint context so we carry the busy extents through
 * to checkpoint completion, and then unlock all the items in the transaction.
 *
 * Called with the context lock already held in read mode to lock out
 * background commit, returns without it held once background commits are
 * allowed again.
 */
void
xlog_cil_commit(
	struct xlog		*log,
	struct xfs_trans	*tp,
	xfs_csn_t		*commit_seq,
	bool			regrant)
{
	struct xfs_cil		*cil = log->l_cilp;
	struct xfs_log_item	*lip, *next;
	uint32_t		released_space = 0;

	/*
	 * Do all necessary memory allocation before we lock the CIL.
	 * This ensures the allocation does not deadlock with a CIL
	 * push in memory reclaim (e.g. from kswapd).
	 */
	xlog_cil_alloc_shadow_bufs(log, tp);

	/* lock out background commit */
	down_read(&cil->xc_ctx_lock);

	if (tp->t_flags & XFS_TRANS_HAS_INTENT_DONE)
		released_space = xlog_cil_process_intents(cil, tp);

	xlog_cil_insert_items(log, tp, released_space);

	if (regrant && !xlog_is_shutdown(log))
		xfs_log_ticket_regrant(log, tp->t_ticket);
	else
		xfs_log_ticket_ungrant(log, tp->t_ticket);
	tp->t_ticket = NULL;
	xfs_trans_unreserve_and_mod_sb(tp);

	/*
	 * Once all the items of the transaction have been copied to the CIL,
	 * the items can be unlocked and possibly freed.
	 *
	 * This needs to be done before we drop the CIL context lock because we
	 * have to update state in the log items and unlock them before they go
	 * to disk. If we don't, then the CIL checkpoint can race with us and
	 * we can run checkpoint completion before we've updated and unlocked
	 * the log items. This affects (at least) processing of stale buffers,
	 * inodes and EFIs.
	 */
	trace_xfs_trans_commit_items(tp, _RET_IP_);
	list_for_each_entry_safe(lip, next, &tp->t_items, li_trans) {
		xfs_trans_del_item(lip);
		if (lip->li_ops->iop_committing)
			lip->li_ops->iop_committing(lip, cil->xc_ctx->sequence);
	}
	if (commit_seq)
		*commit_seq = cil->xc_ctx->sequence;

	/* xlog_cil_push_background() releases cil->xc_ctx_lock */
	xlog_cil_push_background(log);
}

/*
 * Flush the CIL to stable storage but don't wait for it to complete. This
 * requires the CIL push to ensure the commit record for the push hits the disk,
 * but otherwise is no different to a push done from a log force.
 */
void
xlog_cil_flush(
	struct xlog	*log)
{
	xfs_csn_t	seq = log->l_cilp->xc_current_sequence;

	trace_xfs_log_force(log->l_mp, seq, _RET_IP_);
	xlog_cil_push_now(log, seq, true);

	/*
	 * If the CIL is empty, make sure that any previous checkpoint that may
	 * still be in an active iclog is pushed to stable storage.
	 */
	if (list_empty(&log->l_cilp->xc_cil))
		xfs_log_force(log->l_mp, 0);
}
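
/*
 * Typical caller pattern for the force below (added commentary, not from
 * the original source): a log force such as xfs_log_force_seq() passes the
 * sequence returned via commit_seq from xlog_cil_commit() and then waits on
 * the returned commit_lsn to reach stable storage.
 */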

/*
 * Conditionally push the CIL based on the sequence passed in.
 *
 * We only need to push if we haven't already pushed the sequence number given.
 * Hence the only time we will trigger a push here is if the push sequence is
 * the same as the current context.
 *
 * We return the current commit lsn to allow the callers to determine if an
 * iclog flush is necessary following this call.
 */
xfs_lsn_t
xlog_cil_force_seq(
	struct xlog	*log,
	xfs_csn_t	sequence)
{
	struct xfs_cil		*cil = log->l_cilp;
	struct xfs_cil_ctx	*ctx;
	xfs_lsn_t		commit_lsn = NULLCOMMITLSN;

	ASSERT(sequence <= cil->xc_current_sequence);

	if (!sequence)
		sequence = cil->xc_current_sequence;
	trace_xfs_log_force(log->l_mp, sequence, _RET_IP_);

	/*
	 * check to see if we need to force out the current context.
	 * xlog_cil_push() handles racing pushes for the same sequence,
	 * so no need to deal with it here.
	 */
restart:
	xlog_cil_push_now(log, sequence, false);

	/*
	 * See if we can find a previous sequence still committing.
	 * We need to wait for all previous sequence commits to complete
	 * before allowing the force of push_seq to go ahead. Hence block
	 * on commits for those as well.
	 */
	spin_lock(&cil->xc_push_lock);
	list_for_each_entry(ctx, &cil->xc_committing, committing) {
		/*
		 * Avoid getting stuck in this loop because we were woken by the
		 * shutdown, but then went back to sleep once already in the
		 * shutdown state.
		 */
		if (xlog_is_shutdown(log))
			goto out_shutdown;
		if (ctx->sequence > sequence)
			continue;
		if (!ctx->commit_lsn) {
			/*
			 * It is still being pushed! Wait for the push to
			 * complete, then start again from the beginning.
			 */
			XFS_STATS_INC(log->l_mp, xs_log_force_sleep);
			xlog_wait(&cil->xc_commit_wait, &cil->xc_push_lock);
			goto restart;
		}
		if (ctx->sequence != sequence)
			continue;
		/* found it! */
		commit_lsn = ctx->commit_lsn;
	}

	/*
	 * The call to xlog_cil_push_now() executes the push in the background.
	 * Hence by the time we have got here, our sequence may not have been
	 * pushed yet. This is true if the current sequence still matches the
	 * push sequence after the above wait loop and the CIL still contains
	 * dirty objects. This is guaranteed by the push code first adding the
	 * context to the committing list before emptying the CIL.
	 *
	 * Hence if we don't find the context in the committing list and the
	 * current sequence number is unchanged then the CIL contents are
	 * significant. If the CIL is empty, it means there was nothing to push
	 * and that means there is nothing to wait for. If the CIL is not empty,
	 * it means we haven't yet started the push, because if it had started
	 * we would have found the context on the committing list.
	 */
	if (sequence == cil->xc_current_sequence &&
	    !list_empty(&cil->xc_cil)) {
		spin_unlock(&cil->xc_push_lock);
		goto restart;
	}

	spin_unlock(&cil->xc_push_lock);
	return commit_lsn;

	/*
	 * We detected a shutdown in progress. We need to trigger the log force
	 * to pass through its iclog state machine error handling, even though
	 * we are already in a shutdown state. Hence we can't return
	 * NULLCOMMITLSN here as that has special meaning to log forces (i.e.
	 * LSN is already stable), so we return a zero LSN instead.
	 */
out_shutdown:
	spin_unlock(&cil->xc_push_lock);
	return 0;
}

/*
 * Perform initial CIL structure initialisation.
 */
int
xlog_cil_init(
	struct xlog	*log)
{
	struct xfs_cil	*cil;
	struct xfs_cil_ctx *ctx;

	cil = kmem_zalloc(sizeof(*cil), KM_MAYFAIL);
	if (!cil)
		return -ENOMEM;
	/*
	 * Limit the CIL pipeline depth to 4 concurrent works to bound the
	 * concurrency the log spinlocks will be exposed to.
	 */
	cil->xc_push_wq = alloc_workqueue("xfs-cil/%s",
			XFS_WQFLAGS(WQ_FREEZABLE | WQ_MEM_RECLAIM | WQ_UNBOUND),
			4, log->l_mp->m_super->s_id);
	if (!cil->xc_push_wq)
		goto out_destroy_cil;

	INIT_LIST_HEAD(&cil->xc_cil);
	INIT_LIST_HEAD(&cil->xc_committing);
	spin_lock_init(&cil->xc_cil_lock);
	spin_lock_init(&cil->xc_push_lock);
	init_waitqueue_head(&cil->xc_push_wait);
	init_rwsem(&cil->xc_ctx_lock);
	init_waitqueue_head(&cil->xc_start_wait);
	init_waitqueue_head(&cil->xc_commit_wait);
	cil->xc_log = log;
	log->l_cilp = cil;

	ctx = xlog_cil_ctx_alloc();
	xlog_cil_ctx_switch(cil, ctx);

	return 0;

out_destroy_cil:
	kmem_free(cil);
	return -ENOMEM;
}

void
xlog_cil_destroy(
	struct xlog	*log)
{
	if (log->l_cilp->xc_ctx) {
		if (log->l_cilp->xc_ctx->ticket)
			xfs_log_ticket_put(log->l_cilp->xc_ctx->ticket);
		kmem_free(log->l_cilp->xc_ctx);
	}

	ASSERT(list_empty(&log->l_cilp->xc_cil));
	destroy_workqueue(log->l_cilp->xc_push_wq);
	kmem_free(log->l_cilp);
}