// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/backing-dev.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/swap.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/pagevec.h>
#include <linux/task_io_accounting_ops.h>
#include <linux/signal.h>
#include <linux/iversion.h>
#include <linux/ktime.h>
#include <linux/netfs.h>

#include "super.h"
#include "mds_client.h"
#include "cache.h"
#include "metric.h"
#include "crypto.h"
#include <linux/ceph/osd_client.h>
#include <linux/ceph/striper.h>

/*
 * Ceph address space ops.
 *
 * There are a few funny things going on here.
 *
 * The page->private field is used to reference a struct
 * ceph_snap_context for _every_ dirty page.  This indicates which
 * snapshot the page was logically dirtied in, and thus which snap
 * context needs to be associated with the osd write during writeback.
 *
 * Similarly, struct ceph_inode_info maintains a set of counters to
 * count dirty pages on the inode.  In the absence of snapshots,
 * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count.
 *
 * When a snapshot is taken (that is, when the client receives
 * notification that a snapshot was taken), each inode with caps and
 * with dirty pages (dirty pages implies there is a cap) gets a new
 * ceph_cap_snap in the i_cap_snaps list (which is sorted in ascending
 * order, new snaps go to the tail).  The i_wrbuffer_ref_head count is
 * moved to capsnap->dirty. (Unless a sync write is currently in
 * progress.  In that case, the capsnap is said to be "pending", new
 * writes cannot start, and the capsnap isn't "finalized" until the
 * write completes (or fails) and a final size/mtime for the inode for
 * that snap can be settled upon.)  i_wrbuffer_ref_head is reset to 0.
 *
 * On writeback, we must submit writes to the osd IN SNAP ORDER.  So,
 * we look for the first capsnap in i_cap_snaps and write out pages in
 * that snap context _only_.  Then we move on to the next capsnap,
 * eventually reaching the "live" or "head" context (i.e., pages that
 * are not yet snapped) and are writing the most recently dirtied
 * pages.
 *
 * Invalidate and so forth must take care to ensure the dirty page
 * accounting is preserved.
 */

#define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10))
#define CONGESTION_OFF_THRESH(congestion_kb)				\
	(CONGESTION_ON_THRESH(congestion_kb) -				\
	 (CONGESTION_ON_THRESH(congestion_kb) >> 2))

static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len,
					struct folio **foliop, void **_fsdata);

static inline struct ceph_snap_context *page_snap_context(struct page *page)
{
	if (PagePrivate(page))
		return (void *)page->private;
	return NULL;
}

/*
 * Dirty a page.  Optimistically adjust accounting, on the assumption
 * that we won't race with invalidate.  If we do, readjust.
 */
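/*
 * Illustration (editorial example, not part of the original file): with
 * no snapshot pending, dirtying three clean pages takes i_wrbuffer_ref
 * and i_wrbuffer_ref_head from 0/0 to 3/3, and each folio's ->private
 * holds a reference on i_head_snapc.  If a snapshot is then taken, the
 * 3 in i_wrbuffer_ref_head moves to capsnap->dirty (i_wrbuffer_ref
 * still counts all dirty pages) and the head counter restarts from
 * zero for pages dirtied after the snap.
 */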
static bool ceph_dirty_folio(struct address_space *mapping, struct folio *folio)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_snap_context *snapc;

	if (folio_test_dirty(folio)) {
		dout("%p dirty_folio %p idx %lu -- already dirty\n",
		     mapping->host, folio, folio->index);
		VM_BUG_ON_FOLIO(!folio_test_private(folio), folio);
		return false;
	}

	inode = mapping->host;
	ci = ceph_inode(inode);

	/* dirty the head */
	spin_lock(&ci->i_ceph_lock);
	BUG_ON(ci->i_wr_ref == 0); // caller should hold Fw reference
	if (__ceph_have_pending_cap_snap(ci)) {
		struct ceph_cap_snap *capsnap =
				list_last_entry(&ci->i_cap_snaps,
						struct ceph_cap_snap,
						ci_item);
		snapc = ceph_get_snap_context(capsnap->context);
		capsnap->dirty_pages++;
	} else {
		BUG_ON(!ci->i_head_snapc);
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		++ci->i_wrbuffer_ref_head;
	}
	if (ci->i_wrbuffer_ref == 0)
		ihold(inode);
	++ci->i_wrbuffer_ref;
	dout("%p dirty_folio %p idx %lu head %d/%d -> %d/%d "
	     "snapc %p seq %lld (%d snaps)\n",
	     mapping->host, folio, folio->index,
	     ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1,
	     ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
	     snapc, snapc->seq, snapc->num_snaps);
	spin_unlock(&ci->i_ceph_lock);

	/*
	 * Reference snap context in folio->private.  Also set
	 * PagePrivate so that we get invalidate_folio callback.
	 */
	VM_WARN_ON_FOLIO(folio->private, folio);
	folio_attach_private(folio, snapc);

	return ceph_fscache_dirty_folio(mapping, folio);
}

/*
 * If we are truncating the full folio (i.e. offset == 0), adjust the
 * dirty folio counters appropriately.  Only called if there is private
 * data on the folio.
 */
static void ceph_invalidate_folio(struct folio *folio, size_t offset,
				  size_t length)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_snap_context *snapc;

	inode = folio->mapping->host;
	ci = ceph_inode(inode);

	if (offset != 0 || length != folio_size(folio)) {
		dout("%p invalidate_folio idx %lu partial dirty page %zu~%zu\n",
		     inode, folio->index, offset, length);
		return;
	}

	WARN_ON(!folio_test_locked(folio));
	if (folio_test_private(folio)) {
		dout("%p invalidate_folio idx %lu full dirty page\n",
		     inode, folio->index);

		snapc = folio_detach_private(folio);
		ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
		ceph_put_snap_context(snapc);
	}

	folio_wait_fscache(folio);
}

/*
 * A folio with ->private still set carries a snap context reference and
 * dirty-page accounting, so we must refuse to free it until writeback
 * has detached that state.
 */
static bool ceph_release_folio(struct folio *folio, gfp_t gfp)
{
	struct inode *inode = folio->mapping->host;

	dout("%llx:%llx release_folio idx %lu (%sdirty)\n",
	     ceph_vinop(inode),
	     folio->index, folio_test_dirty(folio) ? "" : "not ");

	if (folio_test_private(folio))
		return false;

	if (folio_test_fscache(folio)) {
		if (current_is_kswapd() || !(gfp & __GFP_FS))
			return false;
		folio_wait_fscache(folio);
	}
	ceph_fscache_note_page_release(inode);
	return true;
}

static void ceph_netfs_expand_readahead(struct netfs_io_request *rreq)
{
	struct inode *inode = rreq->inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_layout *lo = &ci->i_layout;
	unsigned long max_pages = inode->i_sb->s_bdi->ra_pages;
	loff_t end = rreq->start + rreq->len, new_end;
	struct ceph_netfs_request_data *priv = rreq->netfs_priv;
	unsigned long max_len;
	u32 blockoff;

	if (priv) {
		/* Readahead is disabled by posix_fadvise POSIX_FADV_RANDOM */
		if (priv->file_ra_disabled)
			max_pages = 0;
		else
			max_pages = priv->file_ra_pages;
	}

	/* Readahead is disabled */
	if (!max_pages)
		return;

	max_len = max_pages << PAGE_SHIFT;

	/*
	 * Try to expand the length forward by rounding it up to the next
	 * block, but do not exceed the file size, unless the original
	 * request already exceeds it.
	 */
	new_end = min(round_up(end, lo->stripe_unit), rreq->i_size);
	if (new_end > end && new_end <= rreq->start + max_len)
		rreq->len = new_end - rreq->start;

	/* Try to expand the start downward */
	div_u64_rem(rreq->start, lo->stripe_unit, &blockoff);
	if (rreq->len + blockoff <= max_len) {
		rreq->start -= blockoff;
		rreq->len += blockoff;
	}
}

static bool ceph_netfs_clamp_length(struct netfs_io_subrequest *subreq)
{
	struct inode *inode = subreq->rreq->inode;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	u64 objno, objoff;
	u32 xlen;

	/* Truncate the extent at the end of the current block */
	ceph_calc_file_object_mapping(&ci->i_layout, subreq->start, subreq->len,
				      &objno, &objoff, &xlen);
	subreq->len = min(xlen, fsc->mount_options->rsize);
	return true;
}

/*
 * Worked example for the clamping above (illustrative layout values,
 * not from this file): with the common 4 MiB stripe_unit, a 1 MiB
 * subrequest starting at file offset 3.5 MiB maps to xlen = 0.5 MiB
 * within its object, so the subrequest is trimmed to 0.5 MiB and netfs
 * issues the remainder as a separate subrequest for the next object.
 */
static void finish_netfs_read(struct ceph_osd_request *req)
{
	struct inode *inode = req->r_inode;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
	struct netfs_io_subrequest *subreq = req->r_priv;
	struct ceph_osd_req_op *op = &req->r_ops[0];
	int err = req->r_result;
	bool sparse = (op->op == CEPH_OSD_OP_SPARSE_READ);

	ceph_update_read_metrics(&fsc->mdsc->metric, req->r_start_latency,
				 req->r_end_latency, osd_data->length, err);

	dout("%s: result %d subreq->len=%zu i_size=%lld\n", __func__, req->r_result,
	     subreq->len, i_size_read(req->r_inode));

	/* no object means success but no data */
	if (err == -ENOENT)
		err = 0;
	else if (err == -EBLOCKLISTED)
		fsc->blocklisted = true;

	if (err >= 0) {
		if (sparse && err > 0)
			err = ceph_sparse_ext_map_end(op);
		if (err < subreq->len)
			__set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);
		if (IS_ENCRYPTED(inode) && err > 0) {
			err = ceph_fscrypt_decrypt_extents(inode,
					osd_data->pages, subreq->start,
					op->extent.sparse_ext,
					op->extent.sparse_ext_cnt);
			if (err > subreq->len)
				err = subreq->len;
		}
	}

	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
		ceph_put_page_vector(osd_data->pages,
				     calc_pages_for(osd_data->alignment,
						    osd_data->length),
				     false);
	}
	netfs_subreq_terminated(subreq, err, false);
	iput(req->r_inode);
}

static bool ceph_netfs_issue_op_inline(struct netfs_io_subrequest *subreq)
{
	struct netfs_io_request *rreq = subreq->rreq;
	struct inode *inode = rreq->inode;
	struct ceph_mds_reply_info_parsed *rinfo;
	struct ceph_mds_reply_info_in *iinfo;
	struct ceph_mds_request *req;
	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct iov_iter iter;
	ssize_t err = 0;
	size_t len;
	int mode;

	__set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);
	__clear_bit(NETFS_SREQ_COPY_TO_CACHE, &subreq->flags);

	if (subreq->start >= inode->i_size)
		goto out;

	/* We need to fetch the inline data. */
	mode = ceph_try_to_choose_auth_mds(inode, CEPH_STAT_CAP_INLINE_DATA);
	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, mode);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}
	req->r_ino1 = ci->i_vino;
	req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INLINE_DATA);
	req->r_num_caps = 2;

	err = ceph_mdsc_do_request(mdsc, NULL, req);
	if (err < 0)
		goto out;

	rinfo = &req->r_reply_info;
	iinfo = &rinfo->targeti;
	if (iinfo->inline_version == CEPH_INLINE_NONE) {
		/* The data got uninlined */
		ceph_mdsc_put_request(req);
		return false;
	}

	len = min_t(size_t, iinfo->inline_len - subreq->start, subreq->len);
	iov_iter_xarray(&iter, ITER_DEST, &rreq->mapping->i_pages, subreq->start, len);
	err = copy_to_iter(iinfo->inline_data + subreq->start, len, &iter);
	if (err == 0)
		err = -EFAULT;

	ceph_mdsc_put_request(req);
out:
	netfs_subreq_terminated(subreq, err, false);
	return true;
}
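/*
 * Note: ceph_netfs_issue_op_inline() returns false when the MDS reports
 * CEPH_INLINE_NONE (the data was uninlined in the meantime); the caller
 * below then falls through to a normal OSD read.
 */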
Once the 393 * dust settles on the write helpers and encrypt/decrypt routines for 394 * netfs, we should be able to rework this. 395 */ 396 if (IS_ENCRYPTED(inode)) { 397 struct page **pages; 398 size_t page_off; 399 400 err = iov_iter_get_pages_alloc2(&iter, &pages, len, &page_off); 401 if (err < 0) { 402 dout("%s: iov_ter_get_pages_alloc returned %d\n", 403 __func__, err); 404 goto out; 405 } 406 407 /* should always give us a page-aligned read */ 408 WARN_ON_ONCE(page_off); 409 len = err; 410 err = 0; 411 412 osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, 413 false); 414 } else { 415 osd_req_op_extent_osd_iter(req, 0, &iter); 416 } 417 req->r_callback = finish_netfs_read; 418 req->r_priv = subreq; 419 req->r_inode = inode; 420 ihold(inode); 421 422 ceph_osdc_start_request(req->r_osdc, req); 423 out: 424 ceph_osdc_put_request(req); 425 if (err) 426 netfs_subreq_terminated(subreq, err, false); 427 dout("%s: result %d\n", __func__, err); 428 } 429 430 static int ceph_init_request(struct netfs_io_request *rreq, struct file *file) 431 { 432 struct inode *inode = rreq->inode; 433 int got = 0, want = CEPH_CAP_FILE_CACHE; 434 struct ceph_netfs_request_data *priv; 435 int ret = 0; 436 437 if (rreq->origin != NETFS_READAHEAD) 438 return 0; 439 440 priv = kzalloc(sizeof(*priv), GFP_NOFS); 441 if (!priv) 442 return -ENOMEM; 443 444 if (file) { 445 struct ceph_rw_context *rw_ctx; 446 struct ceph_file_info *fi = file->private_data; 447 448 priv->file_ra_pages = file->f_ra.ra_pages; 449 priv->file_ra_disabled = file->f_mode & FMODE_RANDOM; 450 451 rw_ctx = ceph_find_rw_context(fi); 452 if (rw_ctx) { 453 rreq->netfs_priv = priv; 454 return 0; 455 } 456 } 457 458 /* 459 * readahead callers do not necessarily hold Fcb caps 460 * (e.g. fadvise, madvise). 
	ret = ceph_try_get_caps(inode, CEPH_CAP_FILE_RD, want, true, &got);
	if (ret < 0) {
		dout("start_read %p, error getting cap\n", inode);
		goto out;
	}

	if (!(got & want)) {
		dout("start_read %p, no cache cap\n", inode);
		ret = -EACCES;
		goto out;
	}
	if (ret == 0) {
		ret = -EACCES;
		goto out;
	}

	priv->caps = got;
	rreq->netfs_priv = priv;

out:
	if (ret < 0)
		kfree(priv);

	return ret;
}

static void ceph_netfs_free_request(struct netfs_io_request *rreq)
{
	struct ceph_netfs_request_data *priv = rreq->netfs_priv;

	if (!priv)
		return;

	if (priv->caps)
		ceph_put_cap_refs(ceph_inode(rreq->inode), priv->caps);
	kfree(priv);
	rreq->netfs_priv = NULL;
}

const struct netfs_request_ops ceph_netfs_ops = {
	.init_request = ceph_init_request,
	.free_request = ceph_netfs_free_request,
	.begin_cache_operation = ceph_begin_cache_operation,
	.issue_read = ceph_netfs_issue_read,
	.expand_readahead = ceph_netfs_expand_readahead,
	.clamp_length = ceph_netfs_clamp_length,
	.check_write_begin = ceph_netfs_check_write_begin,
};

#ifdef CONFIG_CEPH_FSCACHE
static void ceph_set_page_fscache(struct page *page)
{
	set_page_fscache(page);
}

static void ceph_fscache_write_terminated(void *priv, ssize_t error, bool was_async)
{
	struct inode *inode = priv;

	if (IS_ERR_VALUE(error) && error != -ENOBUFS)
		ceph_fscache_invalidate(inode, false);
}

static void ceph_fscache_write_to_cache(struct inode *inode, u64 off, u64 len, bool caching)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct fscache_cookie *cookie = ceph_fscache_cookie(ci);

	fscache_write_to_cache(cookie, inode->i_mapping, off, len, i_size_read(inode),
			       ceph_fscache_write_terminated, inode, caching);
}
#else
static inline void ceph_set_page_fscache(struct page *page)
{
}

static inline void ceph_fscache_write_to_cache(struct inode *inode, u64 off, u64 len, bool caching)
{
}
#endif /* CONFIG_CEPH_FSCACHE */

struct ceph_writeback_ctl
{
	loff_t i_size;
	u64 truncate_size;
	u32 truncate_seq;
	bool size_stable;
	bool head_snapc;
};

/*
 * Get ref for the oldest snapc for an inode with dirty data... that is, the
 * only snap context we are allowed to write back.
 */
static struct ceph_snap_context *
get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl,
		   struct ceph_snap_context *page_snapc)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc = NULL;
	struct ceph_cap_snap *capsnap = NULL;

	spin_lock(&ci->i_ceph_lock);
	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
		dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
		     capsnap->context, capsnap->dirty_pages);
		if (!capsnap->dirty_pages)
			continue;

		/* get i_size, truncate_{seq,size} for page_snapc? */
		if (snapc && capsnap->context != page_snapc)
			continue;

		if (ctl) {
			if (capsnap->writing) {
				ctl->i_size = i_size_read(inode);
				ctl->size_stable = false;
			} else {
				ctl->i_size = capsnap->size;
				ctl->size_stable = true;
			}
			ctl->truncate_size = capsnap->truncate_size;
			ctl->truncate_seq = capsnap->truncate_seq;
			ctl->head_snapc = false;
		}

		if (snapc)
			break;

		snapc = ceph_get_snap_context(capsnap->context);
		if (!page_snapc ||
		    page_snapc == snapc ||
		    page_snapc->seq > snapc->seq)
			break;
	}
	if (!snapc && ci->i_wrbuffer_ref_head) {
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		dout(" head snapc %p has %d dirty pages\n",
		     snapc, ci->i_wrbuffer_ref_head);
		if (ctl) {
			ctl->i_size = i_size_read(inode);
			ctl->truncate_size = ci->i_truncate_size;
			ctl->truncate_seq = ci->i_truncate_seq;
			ctl->size_stable = false;
			ctl->head_snapc = true;
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	return snapc;
}

static u64 get_writepages_data_length(struct inode *inode,
				      struct page *page, u64 start)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc;
	struct ceph_cap_snap *capsnap = NULL;
	u64 end = i_size_read(inode);
	u64 ret;

	snapc = page_snap_context(ceph_fscrypt_pagecache_page(page));
	if (snapc != ci->i_head_snapc) {
		bool found = false;

		spin_lock(&ci->i_ceph_lock);
		list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
			if (capsnap->context == snapc) {
				if (!capsnap->writing)
					end = capsnap->size;
				found = true;
				break;
			}
		}
		spin_unlock(&ci->i_ceph_lock);
		WARN_ON(!found);
	}
	if (end > ceph_fscrypt_page_offset(page) + thp_size(page))
		end = ceph_fscrypt_page_offset(page) + thp_size(page);
	ret = end > start ? end - start : 0;
	if (ret && fscrypt_is_bounce_page(page))
		ret = round_up(ret, CEPH_FSCRYPT_BLOCK_SIZE);
	return ret;
}

/*
 * Write a single page, but leave the page locked.
 *
 * If we get a write error, mark the mapping for error, but still adjust the
 * dirty page accounting (i.e., page is no longer dirty).
 */
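/*
 * Note: writeback must preserve snap ordering, so if this page's snapc
 * is newer than the oldest dirty snapc returned by get_oldest_context(),
 * the page is simply redirtied and picked up again after the older snap
 * context has been flushed.
 */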
static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
{
	struct folio *folio = page_folio(page);
	struct inode *inode = page->mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_snap_context *snapc, *oldest;
	loff_t page_off = page_offset(page);
	int err;
	loff_t len = thp_size(page);
	loff_t wlen;
	struct ceph_writeback_ctl ceph_wbc;
	struct ceph_osd_client *osdc = &fsc->client->osdc;
	struct ceph_osd_request *req;
	bool caching = ceph_is_cache_enabled(inode);
	struct page *bounce_page = NULL;

	dout("writepage %p idx %lu\n", page, page->index);

	if (ceph_inode_is_shutdown(inode))
		return -EIO;

	/* verify this is a writeable snap context */
	snapc = page_snap_context(page);
	if (!snapc) {
		dout("writepage %p page %p not dirty?\n", inode, page);
		return 0;
	}
	oldest = get_oldest_context(inode, &ceph_wbc, snapc);
	if (snapc->seq > oldest->seq) {
		dout("writepage %p page %p snapc %p not writeable - noop\n",
		     inode, page, snapc);
		/* we should only noop if called by kswapd */
		WARN_ON(!(current->flags & PF_MEMALLOC));
		ceph_put_snap_context(oldest);
		redirty_page_for_writepage(wbc, page);
		return 0;
	}
	ceph_put_snap_context(oldest);

	/* is this a partial page at end of file? */
	if (page_off >= ceph_wbc.i_size) {
		dout("folio at %lu beyond eof %llu\n", folio->index,
		     ceph_wbc.i_size);
		folio_invalidate(folio, 0, folio_size(folio));
		return 0;
	}

	if (ceph_wbc.i_size < page_off + len)
		len = ceph_wbc.i_size - page_off;

	wlen = IS_ENCRYPTED(inode) ? round_up(len, CEPH_FSCRYPT_BLOCK_SIZE) : len;
	dout("writepage %p page %p index %lu on %llu~%llu snapc %p seq %lld\n",
	     inode, page, page->index, page_off, wlen, snapc, snapc->seq);

	if (atomic_long_inc_return(&fsc->writeback_count) >
	    CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
		fsc->write_congested = true;

	req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode),
				    page_off, &wlen, 0, 1, CEPH_OSD_OP_WRITE,
				    CEPH_OSD_FLAG_WRITE, snapc,
				    ceph_wbc.truncate_seq,
				    ceph_wbc.truncate_size, true);
	if (IS_ERR(req)) {
		redirty_page_for_writepage(wbc, page);
		return PTR_ERR(req);
	}

	if (wlen < len)
		len = wlen;

	set_page_writeback(page);
	if (caching)
		ceph_set_page_fscache(page);
	ceph_fscache_write_to_cache(inode, page_off, len, caching);

	if (IS_ENCRYPTED(inode)) {
		bounce_page = fscrypt_encrypt_pagecache_blocks(page,
						    CEPH_FSCRYPT_BLOCK_SIZE, 0,
						    GFP_NOFS);
		if (IS_ERR(bounce_page)) {
			redirty_page_for_writepage(wbc, page);
			end_page_writeback(page);
			ceph_osdc_put_request(req);
			return PTR_ERR(bounce_page);
		}
	}

	/* it may be a short write due to an object boundary */
	WARN_ON_ONCE(len > thp_size(page));
	osd_req_op_extent_osd_data_pages(req, 0,
			bounce_page ? &bounce_page : &page, wlen, 0,
			false, false);
	dout("writepage %llu~%llu (%llu bytes, %sencrypted)\n",
	     page_off, len, wlen, IS_ENCRYPTED(inode) ? "" : "not ");

	req->r_mtime = inode->i_mtime;
	ceph_osdc_start_request(osdc, req);
	err = ceph_osdc_wait_request(osdc, req);

	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, len, err);
	fscrypt_free_bounce_page(bounce_page);
	ceph_osdc_put_request(req);
	if (err == 0)
		err = len;

	if (err < 0) {
		struct writeback_control tmp_wbc;

		if (!wbc)
			wbc = &tmp_wbc;
		if (err == -ERESTARTSYS) {
			/* killed by SIGKILL */
			dout("writepage interrupted page %p\n", page);
			redirty_page_for_writepage(wbc, page);
			end_page_writeback(page);
			return err;
		}
		if (err == -EBLOCKLISTED)
			fsc->blocklisted = true;
		dout("writepage setting page/mapping error %d %p\n",
		     err, page);
		mapping_set_error(&inode->i_data, err);
		wbc->pages_skipped++;
	} else {
		dout("writepage cleaned page %p\n", page);
		err = 0;  /* vfs expects us to return 0 */
	}
	oldest = detach_page_private(page);
	WARN_ON_ONCE(oldest != snapc);
	end_page_writeback(page);
	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
	ceph_put_snap_context(snapc);	/* page's reference */

	if (atomic_long_dec_return(&fsc->writeback_count) <
	    CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
		fsc->write_congested = false;

	return err;
}

static int ceph_writepage(struct page *page, struct writeback_control *wbc)
{
	int err;
	struct inode *inode = page->mapping->host;

	BUG_ON(!inode);

	/* Check congestion before taking an inode reference, so that the
	 * early return cannot leak the ihold below. */
	if (wbc->sync_mode == WB_SYNC_NONE &&
	    ceph_inode_to_client(inode)->write_congested)
		return AOP_WRITEPAGE_ACTIVATE;

	ihold(inode);

	wait_on_page_fscache(page);

	err = writepage_nounlock(page, wbc);
	if (err == -ERESTARTSYS) {
		/* direct memory reclaimer was killed by SIGKILL. return 0
		 * to prevent caller from setting mapping/page error */
		err = 0;
	}
	unlock_page(page);
	iput(inode);
	return err;
}

/*
 * async writeback completion handler.
 *
 * If we get an error, set the mapping error bit, but not the individual
 * page error bits.
 */

/*
 * Illustrative numbers (assumed mount option value, not from this file):
 * with congestion_kb = 8192 and 4 KiB pages, CONGESTION_ON_THRESH is
 * 2048 in-flight writeback pages and CONGESTION_OFF_THRESH is 1536, so
 * write_congested only clears once the backlog has drained by a quarter.
 */
static void writepages_finish(struct ceph_osd_request *req)
{
	struct inode *inode = req->r_inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_osd_data *osd_data;
	struct page *page;
	int num_pages, total_pages = 0;
	int i, j;
	int rc = req->r_result;
	struct ceph_snap_context *snapc = req->r_snapc;
	struct address_space *mapping = inode->i_mapping;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	unsigned int len = 0;
	bool remove_page;

	dout("writepages_finish %p rc %d\n", inode, rc);
	if (rc < 0) {
		mapping_set_error(mapping, rc);
		ceph_set_error_write(ci);
		if (rc == -EBLOCKLISTED)
			fsc->blocklisted = true;
	} else {
		ceph_clear_error_write(ci);
	}

	/*
	 * We lost the cache cap, need to truncate the page before
	 * it is unlocked, otherwise we'd truncate it later in the
	 * page truncation thread, possibly losing some data that
	 * raced its way in
	 */
	remove_page = !(ceph_caps_issued(ci) &
			(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));

	/* clean all pages */
	for (i = 0; i < req->r_num_ops; i++) {
		if (req->r_ops[i].op != CEPH_OSD_OP_WRITE) {
			pr_warn("%s incorrect op %d req %p index %d tid %llu\n",
				__func__, req->r_ops[i].op, req, i, req->r_tid);
			break;
		}

		osd_data = osd_req_op_extent_osd_data(req, i);
		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
		len += osd_data->length;
		num_pages = calc_pages_for((u64)osd_data->alignment,
					   (u64)osd_data->length);
		total_pages += num_pages;
		for (j = 0; j < num_pages; j++) {
			page = osd_data->pages[j];
			if (fscrypt_is_bounce_page(page)) {
				page = fscrypt_pagecache_page(page);
				fscrypt_free_bounce_page(osd_data->pages[j]);
				osd_data->pages[j] = page;
			}
			BUG_ON(!page);
			WARN_ON(!PageUptodate(page));

			if (atomic_long_dec_return(&fsc->writeback_count) <
			    CONGESTION_OFF_THRESH(
					fsc->mount_options->congestion_kb))
				fsc->write_congested = false;

			ceph_put_snap_context(detach_page_private(page));
			end_page_writeback(page);
			dout("unlocking %p\n", page);

			if (remove_page)
				generic_error_remove_page(inode->i_mapping,
							  page);

			unlock_page(page);
		}
		dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
		     inode, osd_data->length, rc >= 0 ? num_pages : 0);
		release_pages(osd_data->pages, num_pages);
	}

	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, len, rc);

	ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);

	osd_data = osd_req_op_extent_osd_data(req, 0);
	if (osd_data->pages_from_pool)
		mempool_free(osd_data->pages, ceph_wb_pagevec_pool);
	else
		kfree(osd_data->pages);
	ceph_osdc_put_request(req);
}

/*
 * initiate async writeback
 */
static int ceph_writepages_start(struct address_space *mapping,
				 struct writeback_control *wbc)
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_vino vino = ceph_vino(inode);
	pgoff_t index, start_index, end = -1;
	struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
	struct folio_batch fbatch;
	int rc = 0;
	unsigned int wsize = i_blocksize(inode);
	struct ceph_osd_request *req = NULL;
	struct ceph_writeback_ctl ceph_wbc;
	bool should_loop, range_whole = false;
	bool done = false;
	bool caching = ceph_is_cache_enabled(inode);
	xa_mark_t tag;

	if (wbc->sync_mode == WB_SYNC_NONE &&
	    fsc->write_congested)
		return 0;

	dout("writepages_start %p (mode=%s)\n", inode,
	     wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
	     (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));

	if (ceph_inode_is_shutdown(inode)) {
		if (ci->i_wrbuffer_ref > 0) {
			pr_warn_ratelimited(
				"writepage_start %p %lld forced umount\n",
				inode, ceph_ino(inode));
		}
		mapping_set_error(mapping, -EIO);
		return -EIO; /* we're in a forced umount, don't write! */
	}
	if (fsc->mount_options->wsize < wsize)
		wsize = fsc->mount_options->wsize;

	folio_batch_init(&fbatch);

	start_index = wbc->range_cyclic ? mapping->writeback_index : 0;
	index = start_index;

	if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages) {
		tag = PAGECACHE_TAG_TOWRITE;
	} else {
		tag = PAGECACHE_TAG_DIRTY;
	}
retry:
	/* find oldest snap context with dirty data */
	snapc = get_oldest_context(inode, &ceph_wbc, NULL);
	if (!snapc) {
		/* hmm, why does writepages get called when there
		   is no dirty data? */
		dout(" no snap context with dirty data?\n");
		goto out;
	}
	dout(" oldest snapc is %p seq %lld (%d snaps)\n",
	     snapc, snapc->seq, snapc->num_snaps);

	should_loop = false;
	if (ceph_wbc.head_snapc && snapc != last_snapc) {
		/* where to start/end? */
		if (wbc->range_cyclic) {
			index = start_index;
			end = -1;
			if (index > 0)
				should_loop = true;
			dout(" cyclic, start at %lu\n", index);
		} else {
			index = wbc->range_start >> PAGE_SHIFT;
			end = wbc->range_end >> PAGE_SHIFT;
			if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
				range_whole = true;
			dout(" not cyclic, %lu to %lu\n", index, end);
		}
	} else if (!ceph_wbc.head_snapc) {
		/* Do not respect wbc->range_{start,end}.  Dirty pages
		 * in that range can be associated with a newer snapc.
		 * They are not writeable until all dirty pages
		 * associated with 'snapc' get written. */
		if (index > 0)
			should_loop = true;
		dout(" non-head snapc, range whole\n");
	}

	if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages)
		tag_pages_for_writeback(mapping, index, end);

	ceph_put_snap_context(last_snapc);
	last_snapc = snapc;

	while (!done && index <= end) {
		int num_ops = 0, op_idx;
		unsigned i, nr_folios, max_pages, locked_pages = 0;
		struct page **pages = NULL, **data_pages;
		struct page *page;
		pgoff_t strip_unit_end = 0;
		u64 offset = 0, len = 0;
		bool from_pool = false;

		max_pages = wsize >> PAGE_SHIFT;

get_more_pages:
		nr_folios = filemap_get_folios_tag(mapping, &index,
						   end, tag, &fbatch);
		dout("pagevec_lookup_range_tag got %d\n", nr_folios);
		if (!nr_folios && !locked_pages)
			break;
		for (i = 0; i < nr_folios && locked_pages < max_pages; i++) {
			page = &fbatch.folios[i]->page;
			dout("? %p idx %lu\n", page, page->index);
			if (locked_pages == 0)
				lock_page(page);  /* first page */
			else if (!trylock_page(page))
				break;

			/* only dirty pages, or our accounting breaks */
			if (unlikely(!PageDirty(page)) ||
			    unlikely(page->mapping != mapping)) {
				dout("!dirty or !mapping %p\n", page);
				unlock_page(page);
				continue;
			}
			/* only if matching snap context */
			pgsnapc = page_snap_context(page);
			if (pgsnapc != snapc) {
				dout("page snapc %p %lld != oldest %p %lld\n",
				     pgsnapc, pgsnapc->seq, snapc, snapc->seq);
				if (!should_loop &&
				    !ceph_wbc.head_snapc &&
				    wbc->sync_mode != WB_SYNC_NONE)
					should_loop = true;
				unlock_page(page);
				continue;
			}
			if (page_offset(page) >= ceph_wbc.i_size) {
				struct folio *folio = page_folio(page);

				dout("folio at %lu beyond eof %llu\n",
				     folio->index, ceph_wbc.i_size);
				if ((ceph_wbc.size_stable ||
				    folio_pos(folio) >= i_size_read(inode)) &&
				    folio_clear_dirty_for_io(folio))
					folio_invalidate(folio, 0,
							folio_size(folio));
				folio_unlock(folio);
				continue;
			}
			if (strip_unit_end && (page->index > strip_unit_end)) {
				dout("end of strip unit %p\n", page);
				unlock_page(page);
				break;
			}
			if (PageWriteback(page) || PageFsCache(page)) {
				if (wbc->sync_mode == WB_SYNC_NONE) {
					dout("%p under writeback\n", page);
					unlock_page(page);
					continue;
				}
				dout("waiting on writeback %p\n", page);
				wait_on_page_writeback(page);
				wait_on_page_fscache(page);
			}

			if (!clear_page_dirty_for_io(page)) {
				dout("%p !clear_page_dirty_for_io\n", page);
				unlock_page(page);
				continue;
			}

			/*
			 * We have something to write.  If this is
			 * the first locked page this time through,
			 * calculate max possible write size and
			 * allocate a page array
			 */
			if (locked_pages == 0) {
				u64 objnum;
				u64 objoff;
				u32 xlen;

				/* prepare async write request */
				offset = (u64)page_offset(page);
				ceph_calc_file_object_mapping(&ci->i_layout,
							      offset, wsize,
							      &objnum, &objoff,
							      &xlen);
				len = xlen;

				num_ops = 1;
				strip_unit_end = page->index +
					((len - 1) >> PAGE_SHIFT);

				BUG_ON(pages);
				max_pages = calc_pages_for(0, (u64)len);
				pages = kmalloc_array(max_pages,
						      sizeof(*pages),
						      GFP_NOFS);
				if (!pages) {
					from_pool = true;
					pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
					BUG_ON(!pages);
				}

				len = 0;
			} else if (page->index !=
				   (offset + len) >> PAGE_SHIFT) {
				if (num_ops >= (from_pool ? CEPH_OSD_SLAB_OPS :
						            CEPH_OSD_MAX_OPS)) {
					redirty_page_for_writepage(wbc, page);
					unlock_page(page);
					break;
				}

				num_ops++;
				offset = (u64)page_offset(page);
				len = 0;
			}

			/* note position of first page in fbatch */
			dout("%p will write page %p idx %lu\n",
			     inode, page, page->index);

			if (atomic_long_inc_return(&fsc->writeback_count) >
			    CONGESTION_ON_THRESH(
				    fsc->mount_options->congestion_kb))
				fsc->write_congested = true;

			if (IS_ENCRYPTED(inode)) {
				pages[locked_pages] =
					fscrypt_encrypt_pagecache_blocks(page,
						PAGE_SIZE, 0,
						locked_pages ? GFP_NOWAIT : GFP_NOFS);
				if (IS_ERR(pages[locked_pages])) {
					if (PTR_ERR(pages[locked_pages]) == -EINVAL)
						pr_err("%s: inode->i_blkbits=%hhu\n",
							__func__, inode->i_blkbits);
					/* better not fail on first page! */
					BUG_ON(locked_pages == 0);
					pages[locked_pages] = NULL;
					redirty_page_for_writepage(wbc, page);
					unlock_page(page);
					break;
				}
				++locked_pages;
			} else {
				pages[locked_pages++] = page;
			}

			fbatch.folios[i] = NULL;
			len += thp_size(page);
		}

		/* did we get anything? */
		if (!locked_pages)
			goto release_folios;
		if (i) {
			unsigned j, n = 0;

			/* shift unused page to beginning of fbatch */
			for (j = 0; j < nr_folios; j++) {
				if (!fbatch.folios[j])
					continue;
				if (n < j)
					fbatch.folios[n] = fbatch.folios[j];
				n++;
			}
			fbatch.nr = n;

			if (nr_folios && i == nr_folios &&
			    locked_pages < max_pages) {
				dout("reached end fbatch, trying for more\n");
				folio_batch_release(&fbatch);
				goto get_more_pages;
			}
		}

new_request:
		offset = ceph_fscrypt_page_offset(pages[0]);
		len = wsize;

		req = ceph_osdc_new_request(&fsc->client->osdc,
					    &ci->i_layout, vino,
					    offset, &len, 0, num_ops,
					    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
					    snapc, ceph_wbc.truncate_seq,
					    ceph_wbc.truncate_size, false);
		if (IS_ERR(req)) {
			req = ceph_osdc_new_request(&fsc->client->osdc,
						    &ci->i_layout, vino,
						    offset, &len, 0,
						    min(num_ops,
							CEPH_OSD_SLAB_OPS),
						    CEPH_OSD_OP_WRITE,
						    CEPH_OSD_FLAG_WRITE,
						    snapc, ceph_wbc.truncate_seq,
						    ceph_wbc.truncate_size, true);
			BUG_ON(IS_ERR(req));
		}
		BUG_ON(len < ceph_fscrypt_page_offset(pages[locked_pages - 1]) +
			     thp_size(pages[locked_pages - 1]) - offset);

		req->r_callback = writepages_finish;
		req->r_inode = inode;

		/* Format the osd request message and submit the write */
		len = 0;
		data_pages = pages;
		op_idx = 0;
		for (i = 0; i < locked_pages; i++) {
			struct page *page = ceph_fscrypt_pagecache_page(pages[i]);
			u64 cur_offset = page_offset(page);

			/*
			 * Discontinuity in page range? Ceph can handle that by just passing
			 * multiple extents in the write op.
			 */
			if (offset + len != cur_offset) {
				/* If it's full, stop here */
				if (op_idx + 1 == req->r_num_ops)
					break;

				/* Kick off an fscache write with what we have so far. */
				ceph_fscache_write_to_cache(inode, offset, len, caching);

				/* Start a new extent */
				osd_req_op_extent_dup_last(req, op_idx,
							   cur_offset - offset);
				dout("writepages got pages at %llu~%llu\n",
				     offset, len);
				osd_req_op_extent_osd_data_pages(req, op_idx,
							data_pages, len, 0,
							from_pool, false);
				osd_req_op_extent_update(req, op_idx, len);

				len = 0;
				offset = cur_offset;
				data_pages = pages + i;
				op_idx++;
			}

			set_page_writeback(page);
			if (caching)
				ceph_set_page_fscache(page);
			len += thp_size(page);
		}
		ceph_fscache_write_to_cache(inode, offset, len, caching);

		if (ceph_wbc.size_stable) {
			len = min(len, ceph_wbc.i_size - offset);
		} else if (i == locked_pages) {
			/* writepages_finish() clears writeback pages
			 * according to the data length, so make sure
			 * data length covers all locked pages */
			u64 min_len = len + 1 - thp_size(page);

			len = get_writepages_data_length(inode, pages[i - 1],
							 offset);
			len = max(len, min_len);
		}
		if (IS_ENCRYPTED(inode))
			len = round_up(len, CEPH_FSCRYPT_BLOCK_SIZE);

		dout("writepages got pages at %llu~%llu\n", offset, len);

		if (IS_ENCRYPTED(inode) &&
		    ((offset | len) & ~CEPH_FSCRYPT_BLOCK_MASK))
			pr_warn("%s: bad encrypted write offset=%lld len=%llu\n",
				__func__, offset, len);

		osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
						 0, from_pool, false);
		osd_req_op_extent_update(req, op_idx, len);

		BUG_ON(op_idx + 1 != req->r_num_ops);

		from_pool = false;
		if (i < locked_pages) {
			BUG_ON(num_ops <= req->r_num_ops);
			num_ops -= req->r_num_ops;
			locked_pages -= i;

			/* allocate new pages array for next request */
			data_pages = pages;
			pages = kmalloc_array(locked_pages, sizeof(*pages),
					      GFP_NOFS);
			if (!pages) {
				from_pool = true;
				pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
				BUG_ON(!pages);
			}
			memcpy(pages, data_pages + i,
			       locked_pages * sizeof(*pages));
			memset(data_pages + i, 0,
			       locked_pages * sizeof(*pages));
		} else {
			BUG_ON(num_ops != req->r_num_ops);
			index = pages[i - 1]->index + 1;
			/* request message now owns the pages array */
			pages = NULL;
		}

		req->r_mtime = inode->i_mtime;
		ceph_osdc_start_request(&fsc->client->osdc, req);
		req = NULL;

		wbc->nr_to_write -= i;
		if (pages)
			goto new_request;

		/*
		 * We stop writing back only if we are not doing
		 * integrity sync. In case of integrity sync we have to
		 * keep going until we have written all the pages
		 * we tagged for writeback prior to entering this loop.
		 */
		if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE)
			done = true;

release_folios:
		dout("folio_batch release on %d folios (%p)\n", (int)fbatch.nr,
		     fbatch.nr ? fbatch.folios[0] : NULL);
		folio_batch_release(&fbatch);
	}

	if (should_loop && !done) {
		/* more to do; loop back to beginning of file */
		dout("writepages looping back to beginning of file\n");
		end = start_index - 1; /* OK even when start_index == 0 */

		/* to write dirty pages associated with next snapc,
		 * we need to wait until current writes complete */
		if (wbc->sync_mode != WB_SYNC_NONE &&
		    start_index == 0 && /* all dirty pages were checked */
		    !ceph_wbc.head_snapc) {
			struct page *page;
			unsigned i, nr;

			index = 0;
			while ((index <= end) &&
			       (nr = filemap_get_folios_tag(mapping, &index,
							    (pgoff_t)-1,
							    PAGECACHE_TAG_WRITEBACK,
							    &fbatch))) {
				for (i = 0; i < nr; i++) {
					page = &fbatch.folios[i]->page;
					if (page_snap_context(page) != snapc)
						continue;
					wait_on_page_writeback(page);
				}
				folio_batch_release(&fbatch);
				cond_resched();
			}
		}

		start_index = 0;
		index = 0;
		goto retry;
	}

	if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
		mapping->writeback_index = index;

out:
	ceph_osdc_put_request(req);
	ceph_put_snap_context(last_snapc);
	dout("writepages dend - startone, rc = %d\n", rc);
	return rc;
}

/*
 * See if a given @snapc is either writeable, or already written.
 */
static int context_is_writeable_or_written(struct inode *inode,
					   struct ceph_snap_context *snapc)
{
	struct ceph_snap_context *oldest = get_oldest_context(inode, NULL, NULL);
	int ret = !oldest || snapc->seq <= oldest->seq;

	ceph_put_snap_context(oldest);
	return ret;
}

/**
 * ceph_find_incompatible - find an incompatible context and return it
 * @page: page being dirtied
 *
 * We are only allowed to write into/dirty a page if the page is
 * clean, or already dirty within the same snap context. Returns a
 * conflicting context if there is one, NULL if there isn't, or a
 * negative error code on other errors.
 *
 * Must be called with page lock held.
 */
static struct ceph_snap_context *
ceph_find_incompatible(struct page *page)
{
	struct inode *inode = page->mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);

	if (ceph_inode_is_shutdown(inode)) {
		dout(" page %p %llx:%llx is shutdown\n", page,
		     ceph_vinop(inode));
		return ERR_PTR(-ESTALE);
	}

	for (;;) {
		struct ceph_snap_context *snapc, *oldest;

		wait_on_page_writeback(page);

		snapc = page_snap_context(page);
		if (!snapc || snapc == ci->i_head_snapc)
			break;

		/*
		 * this page is already dirty in another (older) snap
		 * context!  is it writeable now?
		 */
		oldest = get_oldest_context(inode, NULL, NULL);
		if (snapc->seq > oldest->seq) {
			/* not writeable -- return it for the caller to deal with */
			ceph_put_snap_context(oldest);
			dout(" page %p snapc %p not current or oldest\n", page, snapc);
			return ceph_get_snap_context(snapc);
		}
		ceph_put_snap_context(oldest);

		/* yay, writeable, do it now (without dropping page lock) */
		dout(" page %p snapc %p not current, but oldest\n", page, snapc);
		if (clear_page_dirty_for_io(page)) {
			int r = writepage_nounlock(page, NULL);

			if (r < 0)
				return ERR_PTR(r);
		}
	}
	return NULL;
}

static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len,
					struct folio **foliop, void **_fsdata)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc;

	snapc = ceph_find_incompatible(folio_page(*foliop, 0));
	if (snapc) {
		int r;

		folio_unlock(*foliop);
		folio_put(*foliop);
		*foliop = NULL;
		if (IS_ERR(snapc))
			return PTR_ERR(snapc);

		ceph_queue_writeback(inode);
		r = wait_event_killable(ci->i_cap_wq,
					context_is_writeable_or_written(inode, snapc));
		ceph_put_snap_context(snapc);
		return r == 0 ? -EAGAIN : r;
	}
	return 0;
}

/*
 * We are only allowed to write into/dirty the page if the page is
 * clean, or already dirty within the same snap context.
 */
static int ceph_write_begin(struct file *file, struct address_space *mapping,
			    loff_t pos, unsigned len,
			    struct page **pagep, void **fsdata)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct folio *folio = NULL;
	int r;

	r = netfs_write_begin(&ci->netfs, file, inode->i_mapping, pos, len, &folio, NULL);
	if (r < 0)
		return r;

	folio_wait_fscache(folio);
	WARN_ON_ONCE(!folio_test_locked(folio));
	*pagep = &folio->page;
	return 0;
}

/*
 * we don't do anything in here that simple_write_end doesn't do
 * except adjust dirty page accounting
 */
static int ceph_write_end(struct file *file, struct address_space *mapping,
			  loff_t pos, unsigned len, unsigned copied,
			  struct page *subpage, void *fsdata)
{
	struct folio *folio = page_folio(subpage);
	struct inode *inode = file_inode(file);
	bool check_cap = false;

	dout("write_end file %p inode %p folio %p %d~%d (%d)\n", file,
	     inode, folio, (int)pos, (int)copied, (int)len);

	if (!folio_test_uptodate(folio)) {
		/* just return that nothing was copied on a short copy */
		if (copied < len) {
			copied = 0;
			goto out;
		}
		folio_mark_uptodate(folio);
	}

	/* did file size increase? */
	if (pos+copied > i_size_read(inode))
		check_cap = ceph_inode_set_size(inode, pos+copied);

	folio_mark_dirty(folio);

out:
	folio_unlock(folio);
	folio_put(folio);

	if (check_cap)
		ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY);

	return copied;
}

const struct address_space_operations ceph_aops = {
	.read_folio = netfs_read_folio,
	.readahead = netfs_readahead,
	.writepage = ceph_writepage,
	.writepages = ceph_writepages_start,
	.write_begin = ceph_write_begin,
	.write_end = ceph_write_end,
	.dirty_folio = ceph_dirty_folio,
	.invalidate_folio = ceph_invalidate_folio,
	.release_folio = ceph_release_folio,
	.direct_IO = noop_direct_IO,
};

static void ceph_block_sigs(sigset_t *oldset)
{
	sigset_t mask;

	siginitsetinv(&mask, sigmask(SIGKILL));
	sigprocmask(SIG_BLOCK, &mask, oldset);
}

static void ceph_restore_sigs(sigset_t *oldset)
{
	sigprocmask(SIG_SETMASK, oldset, NULL);
}

/*
 * vm ops
 */
static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
{
	struct vm_area_struct *vma = vmf->vma;
	struct inode *inode = file_inode(vma->vm_file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_info *fi = vma->vm_file->private_data;
	loff_t off = (loff_t)vmf->pgoff << PAGE_SHIFT;
	int want, got, err;
	sigset_t oldset;
	vm_fault_t ret = VM_FAULT_SIGBUS;

	if (ceph_inode_is_shutdown(inode))
		return ret;

	ceph_block_sigs(&oldset);

	dout("filemap_fault %p %llx.%llx %llu trying to get caps\n",
	     inode, ceph_vinop(inode), off);
	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_CACHE;

	got = 0;
	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_RD, want, -1, &got);
	if (err < 0)
		goto out_restore;

	dout("filemap_fault %p %llu got cap refs on %s\n",
	     inode, off, ceph_cap_string(got));

	if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
	    !ceph_has_inline_data(ci)) {
		CEPH_DEFINE_RW_CONTEXT(rw_ctx, got);
		ceph_add_rw_context(fi, &rw_ctx);
		ret = filemap_fault(vmf);
		ceph_del_rw_context(fi, &rw_ctx);
		dout("filemap_fault %p %llu drop cap refs %s ret %x\n",
		     inode, off, ceph_cap_string(got), ret);
	} else
		err = -EAGAIN;

	ceph_put_cap_refs(ci, got);

	if (err != -EAGAIN)
		goto out_restore;

	/* read inline data */
	if (off >= PAGE_SIZE) {
		/* does not support inline data > PAGE_SIZE */
		ret = VM_FAULT_SIGBUS;
	} else {
		struct address_space *mapping = inode->i_mapping;
		struct page *page;

		filemap_invalidate_lock_shared(mapping);
		page = find_or_create_page(mapping, 0,
				mapping_gfp_constraint(mapping, ~__GFP_FS));
		if (!page) {
			ret = VM_FAULT_OOM;
			goto out_inline;
		}
		err = __ceph_do_getattr(inode, page,
					CEPH_STAT_CAP_INLINE_DATA, true);
		if (err < 0 || off >= i_size_read(inode)) {
			unlock_page(page);
			put_page(page);
			ret = vmf_error(err);
			goto out_inline;
		}
		if (err < PAGE_SIZE)
			zero_user_segment(page, err, PAGE_SIZE);
		else
			flush_dcache_page(page);
		SetPageUptodate(page);
		vmf->page = page;
		ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
out_inline:
		filemap_invalidate_unlock_shared(mapping);
		dout("filemap_fault %p %llu read inline data ret %x\n",
		     inode, off, ret);
	}
out_restore:
	ceph_restore_sigs(&oldset);
	if (err < 0)
		ret = vmf_error(err);

	return ret;
}

static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
{
	struct vm_area_struct *vma = vmf->vma;
	struct inode *inode = file_inode(vma->vm_file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_info *fi = vma->vm_file->private_data;
	struct ceph_cap_flush *prealloc_cf;
	struct page *page = vmf->page;
	loff_t off = page_offset(page);
	loff_t size = i_size_read(inode);
	size_t len;
	int want, got, err;
	sigset_t oldset;
	vm_fault_t ret = VM_FAULT_SIGBUS;

	if (ceph_inode_is_shutdown(inode))
		return ret;

	prealloc_cf = ceph_alloc_cap_flush();
	if (!prealloc_cf)
		return VM_FAULT_OOM;

	sb_start_pagefault(inode->i_sb);
	ceph_block_sigs(&oldset);

	if (off + thp_size(page) <= size)
		len = thp_size(page);
	else
		len = offset_in_thp(page, size);

	dout("page_mkwrite %p %llx.%llx %llu~%zd getting caps i_size %llu\n",
	     inode, ceph_vinop(inode), off, len, size);
	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_BUFFER;

	got = 0;
	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_WR, want, off + len, &got);
	if (err < 0)
		goto out_free;

	dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
	     inode, off, len, ceph_cap_string(got));

	/* Update time before taking page lock */
	file_update_time(vma->vm_file);
	inode_inc_iversion_raw(inode);

	do {
		struct ceph_snap_context *snapc;

		lock_page(page);

		if (page_mkwrite_check_truncate(page, inode) < 0) {
			unlock_page(page);
			ret = VM_FAULT_NOPAGE;
			break;
		}

		snapc = ceph_find_incompatible(page);
		if (!snapc) {
			/* success.  we'll keep the page locked. */
			set_page_dirty(page);
			ret = VM_FAULT_LOCKED;
			break;
		}

		unlock_page(page);

		if (IS_ERR(snapc)) {
			ret = VM_FAULT_SIGBUS;
			break;
		}

		ceph_queue_writeback(inode);
		err = wait_event_killable(ci->i_cap_wq,
				context_is_writeable_or_written(inode, snapc));
		ceph_put_snap_context(snapc);
	} while (err == 0);

	if (ret == VM_FAULT_LOCKED) {
		int dirty;

		spin_lock(&ci->i_ceph_lock);
		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
					       &prealloc_cf);
		spin_unlock(&ci->i_ceph_lock);
		if (dirty)
			__mark_inode_dirty(inode, dirty);
	}

	dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %x\n",
	     inode, off, len, ceph_cap_string(got), ret);
	ceph_put_cap_refs_async(ci, got);
out_free:
	ceph_restore_sigs(&oldset);
	sb_end_pagefault(inode->i_sb);
	ceph_free_cap_flush(prealloc_cf);
	if (err < 0)
		ret = vmf_error(err);
	return ret;
}

void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
			   char *data, size_t len)
{
	struct address_space *mapping = inode->i_mapping;
	struct page *page;

	if (locked_page) {
		page = locked_page;
	} else {
		if (i_size_read(inode) == 0)
			return;
		page = find_or_create_page(mapping, 0,
					   mapping_gfp_constraint(mapping,
					   ~__GFP_FS));
		if (!page)
			return;
		if (PageUptodate(page)) {
			unlock_page(page);
			put_page(page);
			return;
		}
	}

	dout("fill_inline_data %p %llx.%llx len %zu locked_page %p\n",
	     inode, ceph_vinop(inode), len, locked_page);

	if (len > 0) {
		void *kaddr = kmap_atomic(page);

		memcpy(kaddr, data, len);
		kunmap_atomic(kaddr);
	}

	if (page != locked_page) {
		if (len < PAGE_SIZE)
			zero_user_segment(page, len, PAGE_SIZE);
		else
			flush_dcache_page(page);

		SetPageUptodate(page);
		unlock_page(page);
		put_page(page);
	}
}

int ceph_uninline_data(struct file *file)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_osd_request *req = NULL;
	struct ceph_cap_flush *prealloc_cf = NULL;
	struct folio *folio = NULL;
	u64 inline_version = CEPH_INLINE_NONE;
	struct page *pages[1];
	int err = 0;
	u64 len;

	spin_lock(&ci->i_ceph_lock);
	inline_version = ci->i_inline_version;
	spin_unlock(&ci->i_ceph_lock);

	dout("uninline_data %p %llx.%llx inline_version %llu\n",
	     inode, ceph_vinop(inode), inline_version);

	if (ceph_inode_is_shutdown(inode)) {
		err = -EIO;
		goto out;
	}

	if (inline_version == CEPH_INLINE_NONE)
		return 0;

	prealloc_cf = ceph_alloc_cap_flush();
	if (!prealloc_cf)
		return -ENOMEM;

	if (inline_version == 1) /* initial version, no data */
		goto out_uninline;

	folio = read_mapping_folio(inode->i_mapping, 0, file);
	if (IS_ERR(folio)) {
		err = PTR_ERR(folio);
		goto out;
	}

	folio_lock(folio);

	len = i_size_read(inode);
	if (len > folio_size(folio))
		len = folio_size(folio);

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 0, 1,
				    CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_WRITE,
				    NULL, 0, 0, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out_unlock;
	}

	req->r_mtime = inode->i_mtime;
	ceph_osdc_start_request(&fsc->client->osdc, req);
	err = ceph_osdc_wait_request(&fsc->client->osdc, req);
	ceph_osdc_put_request(req);
	if (err < 0)
		goto out_unlock;

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 1, 3,
				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
				    NULL, ci->i_truncate_seq,
				    ci->i_truncate_size, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out_unlock;
	}

	pages[0] = folio_page(folio, 0);
	osd_req_op_extent_osd_data_pages(req, 1, pages, len, 0, false, false);

	{
		__le64 xattr_buf = cpu_to_le64(inline_version);

		err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
					    "inline_version", &xattr_buf,
					    sizeof(xattr_buf),
					    CEPH_OSD_CMPXATTR_OP_GT,
					    CEPH_OSD_CMPXATTR_MODE_U64);
		if (err)
			goto out_put_req;
	}

	{
		char xattr_buf[32];
		int xattr_len = snprintf(xattr_buf, sizeof(xattr_buf),
					 "%llu", inline_version);

		err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
					    "inline_version",
					    xattr_buf, xattr_len, 0, 0);
		if (err)
			goto out_put_req;
	}

	req->r_mtime = inode->i_mtime;
	ceph_osdc_start_request(&fsc->client->osdc, req);
	err = ceph_osdc_wait_request(&fsc->client->osdc, req);

	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, len, err);

out_uninline:
	if (!err) {
		int dirty;

		/* Set to CAP_INLINE_NONE and dirty the caps */
		down_read(&fsc->mdsc->snap_rwsem);
		spin_lock(&ci->i_ceph_lock);
		ci->i_inline_version = CEPH_INLINE_NONE;
		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, &prealloc_cf);
		spin_unlock(&ci->i_ceph_lock);
		up_read(&fsc->mdsc->snap_rwsem);
		if (dirty)
			__mark_inode_dirty(inode, dirty);
	}
out_put_req:
	ceph_osdc_put_request(req);
	if (err == -ECANCELED)
		err = 0;
out_unlock:
	if (folio) {
		folio_unlock(folio);
		folio_put(folio);
	}
out:
	ceph_free_cap_flush(prealloc_cf);
	dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
	     inode, ceph_vinop(inode), inline_version, err);
	return err;
}

static const struct vm_operations_struct ceph_vmops = {
	.fault		= ceph_filemap_fault,
	.page_mkwrite	= ceph_page_mkwrite,
};

int ceph_mmap(struct file *file, struct vm_area_struct *vma)
{
	struct address_space *mapping = file->f_mapping;

	if (!mapping->a_ops->read_folio)
		return -ENOEXEC;
	vma->vm_ops = &ceph_vmops;
	return 0;
}

enum {
	POOL_READ	= 1,
	POOL_WRITE	= 2,
};

static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
				s64 pool, struct ceph_string *pool_ns)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->netfs.inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_osd_request *rd_req = NULL, *wr_req = NULL;
	struct rb_node **p, *parent;
	struct ceph_pool_perm *perm;
	struct page **pages;
	size_t pool_ns_len;
	int err = 0, err2 = 0, have = 0;

	down_read(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	while (*p) {
		perm = rb_entry(*p, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
enum {
	POOL_READ	= 1,
	POOL_WRITE	= 2,
};

static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
				s64 pool, struct ceph_string *pool_ns)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->netfs.inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_osd_request *rd_req = NULL, *wr_req = NULL;
	struct rb_node **p, *parent;
	struct ceph_pool_perm *perm;
	struct page **pages;
	size_t pool_ns_len;
	int err = 0, err2 = 0, have = 0;

	/* fast path: look for a cached result under the shared lock */
	down_read(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	while (*p) {
		perm = rb_entry(*p, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			int ret = ceph_compare_string(pool_ns,
						perm->pool_ns,
						perm->pool_ns_len);
			if (ret < 0)
				p = &(*p)->rb_left;
			else if (ret > 0)
				p = &(*p)->rb_right;
			else {
				have = perm->perm;
				break;
			}
		}
	}
	up_read(&mdsc->pool_perm_rwsem);
	if (*p)
		goto out;

	if (pool_ns)
		dout("__ceph_pool_perm_get pool %lld ns %.*s no perm cached\n",
		     pool, (int)pool_ns->len, pool_ns->str);
	else
		dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool);

	/* retake the tree write-locked and re-check before probing */
	down_write(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	parent = NULL;
	while (*p) {
		parent = *p;
		perm = rb_entry(parent, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			int ret = ceph_compare_string(pool_ns,
						perm->pool_ns,
						perm->pool_ns_len);
			if (ret < 0)
				p = &(*p)->rb_left;
			else if (ret > 0)
				p = &(*p)->rb_right;
			else {
				have = perm->perm;
				break;
			}
		}
	}
	if (*p) {
		up_write(&mdsc->pool_perm_rwsem);
		goto out;
	}

	rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!rd_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	rd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
	rd_req->r_base_oloc.pool = pool;
	if (pool_ns)
		rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns);
	ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino);

	err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!wr_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	wr_req->r_flags = CEPH_OSD_FLAG_WRITE;
	osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
	ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc);
	ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid);

	err = ceph_osdc_alloc_messages(wr_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	/* one page should be large enough for STAT data */
	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
	if (IS_ERR(pages)) {
		err = PTR_ERR(pages);
		goto out_unlock;
	}

	osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
				     0, false, true);
	ceph_osdc_start_request(&fsc->client->osdc, rd_req);

	wr_req->r_mtime = ci->netfs.inode.i_mtime;
	ceph_osdc_start_request(&fsc->client->osdc, wr_req);

	err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req);
	err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req);

	if (err >= 0 || err == -ENOENT)
		have |= POOL_READ;
	else if (err != -EPERM) {
		if (err == -EBLOCKLISTED)
			fsc->blocklisted = true;
		goto out_unlock;
	}

	if (err2 == 0 || err2 == -EEXIST)
		have |= POOL_WRITE;
	else if (err2 != -EPERM) {
		if (err2 == -EBLOCKLISTED)
			fsc->blocklisted = true;
		err = err2;
		goto out_unlock;
	}

	pool_ns_len = pool_ns ? pool_ns->len : 0;
	/* the namespace string is stored inline at the end of the entry */
	perm = kmalloc(sizeof(*perm) + pool_ns_len + 1, GFP_NOFS);
	if (!perm) {
		err = -ENOMEM;
		goto out_unlock;
	}

	perm->pool = pool;
	perm->perm = have;
	perm->pool_ns_len = pool_ns_len;
	if (pool_ns_len > 0)
		memcpy(perm->pool_ns, pool_ns->str, pool_ns_len);
	perm->pool_ns[pool_ns_len] = 0;

	rb_link_node(&perm->node, parent, p);
	rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
	err = 0;
out_unlock:
	up_write(&mdsc->pool_perm_rwsem);

	ceph_osdc_put_request(rd_req);
	ceph_osdc_put_request(wr_req);
out:
	if (!err)
		err = have;
	if (pool_ns)
		dout("__ceph_pool_perm_get pool %lld ns %.*s result = %d\n",
		     pool, (int)pool_ns->len, pool_ns->str, err);
	else
		dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err);
	return err;
}

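/*
 * Gate reads and writes on the cached pool permissions, probing on
 * first use. Once probed, the result is latched into i_ceph_flags as
 * CEPH_I_POOL_PERM plus CEPH_I_POOL_RD/CEPH_I_POOL_WR. The "goto check"
 * loop below exists because the file layout can change while
 * __ceph_pool_perm_get() runs without i_ceph_lock held: if the pool or
 * namespace no longer matches, the freshly computed flags are discarded
 * and the check restarts against the new layout.
 */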
int ceph_pool_perm_check(struct inode *inode, int need)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_string *pool_ns;
	s64 pool;
	int ret, flags;

	/* Only need to do this for regular files */
	if (!S_ISREG(inode->i_mode))
		return 0;

	if (ci->i_vino.snap != CEPH_NOSNAP) {
		/*
		 * Pool permission check needs to write to the first object.
		 * But for a snapshot, the head of the first object may have
		 * already been deleted. Skip the check to avoid creating an
		 * orphan object.
		 */
		return 0;
	}

	if (ceph_test_mount_opt(ceph_inode_to_client(inode),
				NOPOOLPERM))
		return 0;

	spin_lock(&ci->i_ceph_lock);
	flags = ci->i_ceph_flags;
	pool = ci->i_layout.pool_id;
	spin_unlock(&ci->i_ceph_lock);
check:
	if (flags & CEPH_I_POOL_PERM) {
		if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) {
			dout("ceph_pool_perm_check pool %lld no read perm\n",
			     pool);
			return -EPERM;
		}
		if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) {
			dout("ceph_pool_perm_check pool %lld no write perm\n",
			     pool);
			return -EPERM;
		}
		return 0;
	}

	pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
	ret = __ceph_pool_perm_get(ci, pool, pool_ns);
	ceph_put_string(pool_ns);
	if (ret < 0)
		return ret;

	flags = CEPH_I_POOL_PERM;
	if (ret & POOL_READ)
		flags |= CEPH_I_POOL_RD;
	if (ret & POOL_WRITE)
		flags |= CEPH_I_POOL_WR;

	spin_lock(&ci->i_ceph_lock);
	if (pool == ci->i_layout.pool_id &&
	    pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) {
		ci->i_ceph_flags |= flags;
	} else {
		pool = ci->i_layout.pool_id;
		flags = ci->i_ceph_flags;
	}
	spin_unlock(&ci->i_ceph_lock);
	goto check;
}

void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc)
{
	struct ceph_pool_perm *perm;
	struct rb_node *n;

	while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) {
		n = rb_first(&mdsc->pool_perm_tree);
		perm = rb_entry(n, struct ceph_pool_perm, node);
		rb_erase(n, &mdsc->pool_perm_tree);
		kfree(perm);
	}
}