#include <linux/ceph/ceph_debug.h>

#include <linux/backing-dev.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/writeback.h>	/* generic_writepages */
#include <linux/slab.h>
#include <linux/pagevec.h>
#include <linux/task_io_accounting_ops.h>

#include "super.h"
#include "mds_client.h"
#include "cache.h"
#include <linux/ceph/osd_client.h>

/*
 * Ceph address space ops.
 *
 * There are a few funny things going on here.
 *
 * The page->private field is used to reference a struct
 * ceph_snap_context for _every_ dirty page.  This indicates which
 * snapshot the page was logically dirtied in, and thus which snap
 * context needs to be associated with the osd write during writeback.
 *
 * Similarly, struct ceph_inode_info maintains a set of counters to
 * count dirty pages on the inode.  In the absence of snapshots,
 * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count.
 *
 * When a snapshot is taken (that is, when the client receives
 * notification that a snapshot was taken), each inode with caps and
 * with dirty pages (dirty pages implies there is a cap) gets a new
 * ceph_cap_snap in the i_cap_snaps list (which is sorted in ascending
 * order, new snaps go to the tail).  The i_wrbuffer_ref_head count is
 * moved to capsnap->dirty. (Unless a sync write is currently in
 * progress.  In that case, the capsnap is said to be "pending", new
 * writes cannot start, and the capsnap isn't "finalized" until the
 * write completes (or fails) and a final size/mtime for the inode for
 * that snap can be settled upon.)  i_wrbuffer_ref_head is reset to 0.
 *
 * On writeback, we must submit writes to the osd IN SNAP ORDER.  So,
 * we look for the first capsnap in i_cap_snaps and write out pages in
 * that snap context _only_.  Then we move on to the next capsnap,
 * eventually reaching the "live" or "head" context (i.e., pages that
 * are not yet snapped) and write out the most recently dirtied pages.
 *
 * Invalidate and so forth must take care to ensure the dirty page
 * accounting is preserved.
 */

#define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10))
#define CONGESTION_OFF_THRESH(congestion_kb)				\
	(CONGESTION_ON_THRESH(congestion_kb) -				\
	 (CONGESTION_ON_THRESH(congestion_kb) >> 2))
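/*
 * Illustrative example (made-up numbers, not defaults): with 4K pages
 * (PAGE_SHIFT == 12) and congestion_kb == 8192, writeback is marked
 * congested once more than 8192 >> 2 == 2048 pages are under writeback,
 * and the congestion flag is cleared again once the in-flight count
 * drops below 2048 - (2048 >> 2) == 1536 pages.
 */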

static inline struct ceph_snap_context *page_snap_context(struct page *page)
{
	if (PagePrivate(page))
		return (void *)page->private;
	return NULL;
}

/*
 * Dirty a page.  Optimistically adjust accounting, on the assumption
 * that we won't race with invalidate.  If we do, readjust.
 */
static int ceph_set_page_dirty(struct page *page)
{
	struct address_space *mapping = page->mapping;
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_snap_context *snapc;
	int ret;

	if (unlikely(!mapping))
		return !TestSetPageDirty(page);

	if (PageDirty(page)) {
		dout("%p set_page_dirty %p idx %lu -- already dirty\n",
		     mapping->host, page, page->index);
		BUG_ON(!PagePrivate(page));
		return 0;
	}

	inode = mapping->host;
	ci = ceph_inode(inode);

	/* dirty the head */
	spin_lock(&ci->i_ceph_lock);
	BUG_ON(ci->i_wr_ref == 0); // caller should hold Fw reference
	if (__ceph_have_pending_cap_snap(ci)) {
		struct ceph_cap_snap *capsnap =
				list_last_entry(&ci->i_cap_snaps,
						struct ceph_cap_snap,
						ci_item);
		snapc = ceph_get_snap_context(capsnap->context);
		capsnap->dirty_pages++;
	} else {
		BUG_ON(!ci->i_head_snapc);
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		++ci->i_wrbuffer_ref_head;
	}
	if (ci->i_wrbuffer_ref == 0)
		ihold(inode);
	++ci->i_wrbuffer_ref;
	dout("%p set_page_dirty %p idx %lu head %d/%d -> %d/%d "
	     "snapc %p seq %lld (%d snaps)\n",
	     mapping->host, page, page->index,
	     ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1,
	     ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
	     snapc, snapc->seq, snapc->num_snaps);
	spin_unlock(&ci->i_ceph_lock);

	/*
	 * Reference snap context in page->private.  Also set
	 * PagePrivate so that we get invalidatepage callback.
	 */
	BUG_ON(PagePrivate(page));
	page->private = (unsigned long)snapc;
	SetPagePrivate(page);

	ret = __set_page_dirty_nobuffers(page);
	WARN_ON(!PageLocked(page));
	WARN_ON(!page->mapping);

	return ret;
}

/*
 * If we are truncating the full page (i.e. offset == 0), adjust the
 * dirty page counters appropriately.  Only called if there is private
 * data on the page.
 */
static void ceph_invalidatepage(struct page *page, unsigned int offset,
				unsigned int length)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_snap_context *snapc = page_snap_context(page);

	inode = page->mapping->host;
	ci = ceph_inode(inode);

	if (offset != 0 || length != PAGE_SIZE) {
		dout("%p invalidatepage %p idx %lu partial dirty page %u~%u\n",
		     inode, page, page->index, offset, length);
		return;
	}

	ceph_invalidate_fscache_page(inode, page);

	if (!PagePrivate(page))
		return;

	/*
	 * We can get non-dirty pages here due to races between
	 * set_page_dirty and truncate_complete_page; just spit out a
	 * warning, in case we end up with accounting problems later.
	 */
	if (!PageDirty(page))
		pr_err("%p invalidatepage %p page not dirty\n", inode, page);

	ClearPageChecked(page);

	dout("%p invalidatepage %p idx %lu full dirty page\n",
	     inode, page, page->index);

	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
	ceph_put_snap_context(snapc);
	page->private = 0;
	ClearPagePrivate(page);
}

static int ceph_releasepage(struct page *page, gfp_t g)
{
	dout("%p releasepage %p idx %lu\n", page->mapping->host,
	     page, page->index);
	WARN_ON(PageDirty(page));

	/* Can we release the page from the cache? */
	if (!ceph_release_fscache_page(page, g))
		return 0;

	return !PagePrivate(page);
}

/*
 * read a single page, without unlocking it.
 */
static int readpage_nounlock(struct file *filp, struct page *page)
{
	struct inode *inode = file_inode(filp);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_osd_client *osdc =
		&ceph_inode_to_client(inode)->client->osdc;
	int err = 0;
	u64 off = page_offset(page);
	u64 len = PAGE_SIZE;

	if (off >= i_size_read(inode)) {
		zero_user_segment(page, 0, PAGE_SIZE);
		SetPageUptodate(page);
		return 0;
	}

	if (ci->i_inline_version != CEPH_INLINE_NONE) {
		/*
		 * Uptodate inline data should have been added
		 * into page cache while getting Fcr caps.
		 */
		if (off == 0)
			return -EINVAL;
		zero_user_segment(page, 0, PAGE_SIZE);
		SetPageUptodate(page);
		return 0;
	}

	err = ceph_readpage_from_fscache(inode, page);
	if (err == 0)
		goto out;

	dout("readpage inode %p file %p page %p index %lu\n",
	     inode, filp, page, page->index);
	err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
				  off, &len,
				  ci->i_truncate_seq, ci->i_truncate_size,
				  &page, 1, 0);
	if (err == -ENOENT)
		err = 0;
	if (err < 0) {
		SetPageError(page);
		ceph_fscache_readpage_cancel(inode, page);
		goto out;
	}
	if (err < PAGE_SIZE)
		/* zero fill remainder of page */
		zero_user_segment(page, err, PAGE_SIZE);
	else
		flush_dcache_page(page);

	SetPageUptodate(page);
	ceph_readpage_to_fscache(inode, page);

out:
	return err < 0 ? err : 0;
}

static int ceph_readpage(struct file *filp, struct page *page)
{
	int r = readpage_nounlock(filp, page);
	unlock_page(page);
	return r;
}

/*
 * Finish an async read(ahead) op.
 */
static void finish_read(struct ceph_osd_request *req)
{
	struct inode *inode = req->r_inode;
	struct ceph_osd_data *osd_data;
	int rc = req->r_result <= 0 ? req->r_result : 0;
	int bytes = req->r_result >= 0 ? req->r_result : 0;
	int num_pages;
	int i;

	dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);

	/* unlock all pages, zeroing any data we didn't read */
	osd_data = osd_req_op_extent_osd_data(req, 0);
	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
	num_pages = calc_pages_for((u64)osd_data->alignment,
				   (u64)osd_data->length);
	for (i = 0; i < num_pages; i++) {
		struct page *page = osd_data->pages[i];

		if (rc < 0 && rc != -ENOENT) {
			ceph_fscache_readpage_cancel(inode, page);
			goto unlock;
		}
		if (bytes < (int)PAGE_SIZE) {
			/* zero (remainder of) page */
			int s = bytes < 0 ? 0 : bytes;
			zero_user_segment(page, s, PAGE_SIZE);
		}
		dout("finish_read %p uptodate %p idx %lu\n", inode, page,
		     page->index);
		flush_dcache_page(page);
		SetPageUptodate(page);
		ceph_readpage_to_fscache(inode, page);
unlock:
		unlock_page(page);
		put_page(page);
		bytes -= PAGE_SIZE;
	}
	kfree(osd_data->pages);
}

static void ceph_unlock_page_vector(struct page **pages, int num_pages)
{
	int i;

	for (i = 0; i < num_pages; i++)
		unlock_page(pages[i]);
}

/*
 * start an async read(ahead) operation.  return nr_pages we submitted
 * a read for on success, or negative error code.
 */
static int start_read(struct inode *inode, struct list_head *page_list, int max)
{
	struct ceph_osd_client *osdc =
		&ceph_inode_to_client(inode)->client->osdc;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct page *page = list_entry(page_list->prev, struct page, lru);
	struct ceph_vino vino;
	struct ceph_osd_request *req;
	u64 off;
	u64 len;
	int i;
	struct page **pages;
	pgoff_t next_index;
	int nr_pages = 0;
	int ret;

	off = (u64) page_offset(page);

	/* count pages */
	next_index = page->index;
	list_for_each_entry_reverse(page, page_list, lru) {
		if (page->index != next_index)
			break;
		nr_pages++;
		next_index++;
		if (max && nr_pages == max)
			break;
	}
	len = nr_pages << PAGE_SHIFT;
	dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
	     off, len);
	vino = ceph_vino(inode);
	req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len,
				    0, 1, CEPH_OSD_OP_READ,
				    CEPH_OSD_FLAG_READ, NULL,
				    ci->i_truncate_seq, ci->i_truncate_size,
				    false);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* build page vector */
	nr_pages = calc_pages_for(0, len);
	pages = kmalloc(sizeof(*pages) * nr_pages, GFP_KERNEL);
	ret = -ENOMEM;
	if (!pages)
		goto out;
	for (i = 0; i < nr_pages; ++i) {
		page = list_entry(page_list->prev, struct page, lru);
		BUG_ON(PageLocked(page));
		list_del(&page->lru);

		dout("start_read %p adding %p idx %lu\n", inode, page,
		     page->index);
		if (add_to_page_cache_lru(page, &inode->i_data, page->index,
					  GFP_KERNEL)) {
			ceph_fscache_uncache_page(inode, page);
			put_page(page);
			dout("start_read %p add_to_page_cache failed %p\n",
			     inode, page);
			nr_pages = i;
			goto out_pages;
		}
		pages[i] = page;
	}
	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
	req->r_callback = finish_read;
	req->r_inode = inode;

	dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto out_pages;
	ceph_osdc_put_request(req);
	return nr_pages;

out_pages:
	ceph_unlock_page_vector(pages, nr_pages);
	ceph_release_page_vector(pages, nr_pages);
out:
	ceph_osdc_put_request(req);
	return ret;
}


/*
 * Read multiple pages.  Leave pages we don't read + unlock in page_list;
 * the caller (VM) cleans them up.
 */
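/*
 * Illustrative example (assumed page indexes): if page_list holds pages
 * 10, 11, 12 and 20 and max is 0 (unlimited), the first start_read()
 * call submits one OSD read for the consecutive run 10-12; the loop
 * below then calls start_read() again, which submits a second read for
 * page 20 on its own.
 */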
static int ceph_readpages(struct file *file, struct address_space *mapping,
			  struct list_head *page_list, unsigned nr_pages)
{
	struct inode *inode = file_inode(file);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	int rc = 0;
	int max = 0;

	if (ceph_inode(inode)->i_inline_version != CEPH_INLINE_NONE)
		return -EINVAL;

	rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list,
					 &nr_pages);

	if (rc == 0)
		goto out;

	if (fsc->mount_options->rsize >= PAGE_SIZE)
		max = (fsc->mount_options->rsize + PAGE_SIZE - 1)
			>> PAGE_SHIFT;

	dout("readpages %p file %p nr_pages %d max %d\n", inode,
	     file, nr_pages, max);
	while (!list_empty(page_list)) {
		rc = start_read(inode, page_list, max);
		if (rc < 0)
			goto out;
		BUG_ON(rc == 0);
	}
out:
	ceph_fscache_readpages_cancel(inode, page_list);

	dout("readpages %p file %p ret %d\n", inode, file, rc);
	return rc;
}

/*
 * Get ref for the oldest snapc for an inode with dirty data... that is, the
 * only snap context we are allowed to write back.
 */
static struct ceph_snap_context *get_oldest_context(struct inode *inode,
						    loff_t *snap_size)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc = NULL;
	struct ceph_cap_snap *capsnap = NULL;

	spin_lock(&ci->i_ceph_lock);
	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
		dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
		     capsnap->context, capsnap->dirty_pages);
		if (capsnap->dirty_pages) {
			snapc = ceph_get_snap_context(capsnap->context);
			if (snap_size)
				*snap_size = capsnap->size;
			break;
		}
	}
	if (!snapc && ci->i_wrbuffer_ref_head) {
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		dout(" head snapc %p has %d dirty pages\n",
		     snapc, ci->i_wrbuffer_ref_head);
	}
	spin_unlock(&ci->i_ceph_lock);
	return snapc;
}
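/*
 * Illustrative example (made-up seq numbers): if i_cap_snaps holds
 * capsnaps whose contexts have seq 3 and seq 5, both with dirty pages,
 * and the head context has seq 7, get_oldest_context() returns the
 * seq-3 context; pages dirtied under seq 5 or 7 are not written back
 * until every seq-3 page has been flushed, preserving the snap-order
 * requirement described at the top of this file.
 */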

/*
 * Write a single page, but leave the page locked.
 *
 * If we get a write error, set the page error bit, but still adjust the
 * dirty page accounting (i.e., page is no longer dirty).
 */
static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_fs_client *fsc;
	struct ceph_osd_client *osdc;
	struct ceph_snap_context *snapc, *oldest;
	loff_t page_off = page_offset(page);
	loff_t snap_size = -1;
	long writeback_stat;
	u64 truncate_size;
	u32 truncate_seq;
	int err = 0, len = PAGE_SIZE;

	dout("writepage %p idx %lu\n", page, page->index);

	if (!page->mapping || !page->mapping->host) {
		dout("writepage %p - no mapping\n", page);
		return -EFAULT;
	}
	inode = page->mapping->host;
	ci = ceph_inode(inode);
	fsc = ceph_inode_to_client(inode);
	osdc = &fsc->client->osdc;

	/* verify this is a writeable snap context */
	snapc = page_snap_context(page);
	if (snapc == NULL) {
		dout("writepage %p page %p not dirty?\n", inode, page);
		goto out;
	}
	oldest = get_oldest_context(inode, &snap_size);
	if (snapc->seq > oldest->seq) {
		dout("writepage %p page %p snapc %p not writeable - noop\n",
		     inode, page, snapc);
		/* we should only noop if called by kswapd */
		WARN_ON((current->flags & PF_MEMALLOC) == 0);
		ceph_put_snap_context(oldest);
		goto out;
	}
	ceph_put_snap_context(oldest);

	spin_lock(&ci->i_ceph_lock);
	truncate_seq = ci->i_truncate_seq;
	truncate_size = ci->i_truncate_size;
	if (snap_size == -1)
		snap_size = i_size_read(inode);
	spin_unlock(&ci->i_ceph_lock);

	/* is this a partial page at end of file? */
	if (page_off >= snap_size) {
		dout("%p page eof %llu\n", page, snap_size);
		goto out;
	}
	if (snap_size < page_off + len)
		len = snap_size - page_off;

	dout("writepage %p page %p index %lu on %llu~%u snapc %p\n",
	     inode, page, page->index, page_off, len, snapc);

	writeback_stat = atomic_long_inc_return(&fsc->writeback_count);
	if (writeback_stat >
	    CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
		set_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC);

	set_page_writeback(page);
	err = ceph_osdc_writepages(osdc, ceph_vino(inode),
				   &ci->i_layout, snapc,
				   page_off, len,
				   truncate_seq, truncate_size,
				   &inode->i_mtime, &page, 1);
	if (err < 0) {
		struct writeback_control tmp_wbc;
		if (!wbc)
			wbc = &tmp_wbc;
		if (err == -ERESTARTSYS) {
			/* killed by SIGKILL */
			dout("writepage interrupted page %p\n", page);
			redirty_page_for_writepage(wbc, page);
			end_page_writeback(page);
			goto out;
		}
		dout("writepage setting page/mapping error %d %p\n",
		     err, page);
		SetPageError(page);
		mapping_set_error(&inode->i_data, err);
		wbc->pages_skipped++;
	} else {
		dout("writepage cleaned page %p\n", page);
		err = 0;  /* vfs expects us to return 0 */
	}
	page->private = 0;
	ClearPagePrivate(page);
	end_page_writeback(page);
	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
	ceph_put_snap_context(snapc);	/* page's reference */
out:
	return err;
}

static int ceph_writepage(struct page *page, struct writeback_control *wbc)
{
	int err;
	struct inode *inode = page->mapping->host;
	BUG_ON(!inode);
	ihold(inode);
	err = writepage_nounlock(page, wbc);
	if (err == -ERESTARTSYS) {
		/* direct memory reclaimer was killed by SIGKILL.
		 * return 0 to prevent caller from setting mapping/page error */
		err = 0;
	}
	unlock_page(page);
	iput(inode);
	return err;
}

/*
 * lame release_pages helper.  release_pages() isn't exported to
 * modules.
 */
static void ceph_release_pages(struct page **pages, int num)
{
	struct pagevec pvec;
	int i;

	pagevec_init(&pvec, 0);
	for (i = 0; i < num; i++) {
		if (pagevec_add(&pvec, pages[i]) == 0)
			pagevec_release(&pvec);
	}
	pagevec_release(&pvec);
}

/*
 * async writeback completion handler.
 *
 * If we get an error, set the mapping error bit, but not the individual
 * page error bits.
 */
static void writepages_finish(struct ceph_osd_request *req)
{
	struct inode *inode = req->r_inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_osd_data *osd_data;
	struct page *page;
	int num_pages, total_pages = 0;
	int i, j;
	int rc = req->r_result;
	struct ceph_snap_context *snapc = req->r_snapc;
	struct address_space *mapping = inode->i_mapping;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	bool remove_page;

	dout("writepages_finish %p rc %d\n", inode, rc);
	if (rc < 0)
		mapping_set_error(mapping, rc);

	/*
	 * We lost the cache cap, need to truncate the page before
	 * it is unlocked, otherwise we'd truncate it later in the
	 * page truncation thread, possibly losing some data that
	 * raced its way in
	 */
	remove_page = !(ceph_caps_issued(ci) &
			(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));

	/* clean all pages */
	for (i = 0; i < req->r_num_ops; i++) {
		if (req->r_ops[i].op != CEPH_OSD_OP_WRITE)
			break;

		osd_data = osd_req_op_extent_osd_data(req, i);
		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
		num_pages = calc_pages_for((u64)osd_data->alignment,
					   (u64)osd_data->length);
		total_pages += num_pages;
		for (j = 0; j < num_pages; j++) {
			page = osd_data->pages[j];
			BUG_ON(!page);
			WARN_ON(!PageUptodate(page));

			if (atomic_long_dec_return(&fsc->writeback_count) <
			    CONGESTION_OFF_THRESH(
					fsc->mount_options->congestion_kb))
				clear_bdi_congested(&fsc->backing_dev_info,
						    BLK_RW_ASYNC);

			if (rc < 0)
				SetPageError(page);

			ceph_put_snap_context(page_snap_context(page));
			page->private = 0;
			ClearPagePrivate(page);
			dout("unlocking %p\n", page);
			end_page_writeback(page);

			if (remove_page)
				generic_error_remove_page(inode->i_mapping,
							  page);

			unlock_page(page);
		}
		dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
		     inode, osd_data->length, rc >= 0 ? num_pages : 0);

		ceph_release_pages(osd_data->pages, num_pages);
	}

	ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);

	osd_data = osd_req_op_extent_osd_data(req, 0);
	if (osd_data->pages_from_pool)
		mempool_free(osd_data->pages,
			     ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
	else
		kfree(osd_data->pages);
	ceph_osdc_put_request(req);
}

/*
 * initiate async writeback
 */
static int ceph_writepages_start(struct address_space *mapping,
				 struct writeback_control *wbc)
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_vino vino = ceph_vino(inode);
	pgoff_t index, start, end;
	int range_whole = 0;
	int should_loop = 1;
	pgoff_t max_pages = 0, max_pages_ever = 0;
	struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
	struct pagevec pvec;
	int done = 0;
	int rc = 0;
	unsigned wsize = 1 << inode->i_blkbits;
	struct ceph_osd_request *req = NULL;
	int do_sync = 0;
	loff_t snap_size, i_size;
	u64 truncate_size;
	u32 truncate_seq;

	/*
	 * Include a 'sync' in the OSD request if this is a data
	 * integrity write (e.g., O_SYNC write or fsync()), or if our
	 * cap is being revoked.
	 */
	if ((wbc->sync_mode == WB_SYNC_ALL) ||
	    ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER))
		do_sync = 1;
	dout("writepages_start %p dosync=%d (mode=%s)\n",
	     inode, do_sync,
	     wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
	     (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));

	if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
		if (ci->i_wrbuffer_ref > 0) {
			pr_warn_ratelimited(
				"writepage_start %p %lld forced umount\n",
				inode, ceph_ino(inode));
		}
		mapping_set_error(mapping, -EIO);
		return -EIO; /* we're in a forced umount, don't write! */
	}
	if (fsc->mount_options->wsize && fsc->mount_options->wsize < wsize)
		wsize = fsc->mount_options->wsize;
	if (wsize < PAGE_SIZE)
		wsize = PAGE_SIZE;
	max_pages_ever = wsize >> PAGE_SHIFT;

	pagevec_init(&pvec, 0);

	/* where to start/end? */
	if (wbc->range_cyclic) {
		start = mapping->writeback_index; /* Start from prev offset */
		end = -1;
		dout(" cyclic, start at %lu\n", start);
	} else {
		start = wbc->range_start >> PAGE_SHIFT;
		end = wbc->range_end >> PAGE_SHIFT;
		if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
			range_whole = 1;
		should_loop = 0;
		dout(" not cyclic, %lu to %lu\n", start, end);
	}
	index = start;

retry:
	/* find oldest snap context with dirty data */
	ceph_put_snap_context(snapc);
	snap_size = -1;
	snapc = get_oldest_context(inode, &snap_size);
	if (!snapc) {
		/* hmm, why does writepages get called when there
		   is no dirty data? */
		dout(" no snap context with dirty data?\n");
		goto out;
	}
	dout(" oldest snapc is %p seq %lld (%d snaps)\n",
	     snapc, snapc->seq, snapc->num_snaps);

	spin_lock(&ci->i_ceph_lock);
	truncate_seq = ci->i_truncate_seq;
	truncate_size = ci->i_truncate_size;
	i_size = i_size_read(inode);
	spin_unlock(&ci->i_ceph_lock);

	if (last_snapc && snapc != last_snapc) {
		/* if we switched to a newer snapc, restart our scan at the
		 * start of the original file range.
		 */
		dout(" snapc differs from last pass, restarting at %lu\n",
		     index);
		index = start;
	}
	last_snapc = snapc;

	while (!done && index <= end) {
		unsigned i;
		int first;
		pgoff_t strip_unit_end = 0;
		int num_ops = 0, op_idx;
		int pvec_pages, locked_pages = 0;
		struct page **pages = NULL, **data_pages;
		mempool_t *pool = NULL;	/* Becomes non-null if mempool used */
		struct page *page;
		int want;
		u64 offset = 0, len = 0;

		max_pages = max_pages_ever;

get_more_pages:
		first = -1;
		want = min(end - index,
			   min((pgoff_t)PAGEVEC_SIZE,
			       max_pages - (pgoff_t)locked_pages) - 1)
			+ 1;
		pvec_pages = pagevec_lookup_tag(&pvec, mapping, &index,
						PAGECACHE_TAG_DIRTY,
						want);
		dout("pagevec_lookup_tag got %d\n", pvec_pages);
		if (!pvec_pages && !locked_pages)
			break;
		for (i = 0; i < pvec_pages && locked_pages < max_pages; i++) {
			page = pvec.pages[i];
			dout("? %p idx %lu\n", page, page->index);
			if (locked_pages == 0)
				lock_page(page);  /* first page */
			else if (!trylock_page(page))
				break;

			/* only dirty pages, or our accounting breaks */
			if (unlikely(!PageDirty(page)) ||
			    unlikely(page->mapping != mapping)) {
				dout("!dirty or !mapping %p\n", page);
				unlock_page(page);
				break;
			}
			if (!wbc->range_cyclic && page->index > end) {
				dout("end of range %p\n", page);
				done = 1;
				unlock_page(page);
				break;
			}
			if (strip_unit_end && (page->index > strip_unit_end)) {
				dout("end of strip unit %p\n", page);
				unlock_page(page);
				break;
			}
			if (wbc->sync_mode != WB_SYNC_NONE) {
				dout("waiting on writeback %p\n", page);
				wait_on_page_writeback(page);
			}
			if (page_offset(page) >=
			    (snap_size == -1 ? i_size : snap_size)) {
				dout("%p page eof %llu\n", page,
				     (snap_size == -1 ? i_size : snap_size));
				done = 1;
				unlock_page(page);
				break;
			}
			if (PageWriteback(page)) {
				dout("%p under writeback\n", page);
				unlock_page(page);
				break;
			}

			/* only if matching snap context */
			pgsnapc = page_snap_context(page);
			if (pgsnapc->seq > snapc->seq) {
				dout("page snapc %p %lld > oldest %p %lld\n",
				     pgsnapc, pgsnapc->seq, snapc, snapc->seq);
				unlock_page(page);
				if (!locked_pages)
					continue; /* keep looking for snap */
				break;
			}

			if (!clear_page_dirty_for_io(page)) {
				dout("%p !clear_page_dirty_for_io\n", page);
				unlock_page(page);
				break;
			}

			/*
			 * We have something to write.  If this is
			 * the first locked page this time through,
			 * calculate max possible write size and
			 * allocate a page array
			 */
			if (locked_pages == 0) {
				u64 objnum;
				u64 objoff;

				/* prepare async write request */
				offset = (u64)page_offset(page);
				len = wsize;

				rc = ceph_calc_file_object_mapping(&ci->i_layout,
								   offset, len,
								   &objnum, &objoff,
								   &len);
				if (rc < 0) {
					unlock_page(page);
					break;
				}

				num_ops = 1 + do_sync;
				strip_unit_end = page->index +
					((len - 1) >> PAGE_SHIFT);

				BUG_ON(pages);
				max_pages = calc_pages_for(0, (u64)len);
				pages = kmalloc(max_pages * sizeof(*pages),
						GFP_NOFS);
				if (!pages) {
					pool = fsc->wb_pagevec_pool;
					pages = mempool_alloc(pool, GFP_NOFS);
					BUG_ON(!pages);
				}

				len = 0;
			} else if (page->index !=
				   (offset + len) >> PAGE_SHIFT) {
				if (num_ops >= (pool ? CEPH_OSD_SLAB_OPS :
						       CEPH_OSD_MAX_OPS)) {
					redirty_page_for_writepage(wbc, page);
					unlock_page(page);
					break;
				}

				num_ops++;
				offset = (u64)page_offset(page);
				len = 0;
			}

			/* note position of first page in pvec */
			if (first < 0)
				first = i;
			dout("%p will write page %p idx %lu\n",
			     inode, page, page->index);

			if (atomic_long_inc_return(&fsc->writeback_count) >
			    CONGESTION_ON_THRESH(
				    fsc->mount_options->congestion_kb)) {
				set_bdi_congested(&fsc->backing_dev_info,
						  BLK_RW_ASYNC);
			}

			pages[locked_pages] = page;
			locked_pages++;
			len += PAGE_SIZE;
		}

		/* did we get anything? */
		if (!locked_pages)
			goto release_pvec_pages;
		if (i) {
			int j;
			BUG_ON(!locked_pages || first < 0);

			if (pvec_pages && i == pvec_pages &&
			    locked_pages < max_pages) {
				dout("reached end pvec, trying for more\n");
				pagevec_reinit(&pvec);
				goto get_more_pages;
			}

			/* shift unused pages over in the pvec...  we
			 * will need to release them below. */
			for (j = i; j < pvec_pages; j++) {
				dout(" pvec leftover page %p\n", pvec.pages[j]);
				pvec.pages[j-i+first] = pvec.pages[j];
			}
			pvec.nr -= i-first;
		}

new_request:
		offset = page_offset(pages[0]);
		len = wsize;

		req = ceph_osdc_new_request(&fsc->client->osdc,
					    &ci->i_layout, vino,
					    offset, &len, 0, num_ops,
					    CEPH_OSD_OP_WRITE,
					    CEPH_OSD_FLAG_WRITE |
					    CEPH_OSD_FLAG_ONDISK,
					    snapc, truncate_seq,
					    truncate_size, false);
		if (IS_ERR(req)) {
			req = ceph_osdc_new_request(&fsc->client->osdc,
						    &ci->i_layout, vino,
						    offset, &len, 0,
						    min(num_ops,
							CEPH_OSD_SLAB_OPS),
						    CEPH_OSD_OP_WRITE,
						    CEPH_OSD_FLAG_WRITE |
						    CEPH_OSD_FLAG_ONDISK,
						    snapc, truncate_seq,
						    truncate_size, true);
			BUG_ON(IS_ERR(req));
		}
		BUG_ON(len < page_offset(pages[locked_pages - 1]) +
			     PAGE_SIZE - offset);

		req->r_callback = writepages_finish;
		req->r_inode = inode;

		/* Format the osd request message and submit the write */
		len = 0;
		data_pages = pages;
		op_idx = 0;
		for (i = 0; i < locked_pages; i++) {
			u64 cur_offset = page_offset(pages[i]);
			if (offset + len != cur_offset) {
				if (op_idx + do_sync + 1 == req->r_num_ops)
					break;
				osd_req_op_extent_dup_last(req, op_idx,
							   cur_offset - offset);
				dout("writepages got pages at %llu~%llu\n",
				     offset, len);
				osd_req_op_extent_osd_data_pages(req, op_idx,
							data_pages, len, 0,
							!!pool, false);
				osd_req_op_extent_update(req, op_idx, len);

				len = 0;
				offset = cur_offset;
				data_pages = pages + i;
				op_idx++;
			}

			set_page_writeback(pages[i]);
			len += PAGE_SIZE;
		}

		if (snap_size != -1) {
			len = min(len, snap_size - offset);
		} else if (i == locked_pages) {
			/* writepages_finish() clears writeback pages
			 * according to the data length, so make sure
			 * data length covers all locked pages */
			u64 min_len = len + 1 - PAGE_SIZE;
			len = min(len, (u64)i_size_read(inode) - offset);
			len = max(len, min_len);
		}
		dout("writepages got pages at %llu~%llu\n", offset, len);

		osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
						 0, !!pool, false);
		osd_req_op_extent_update(req, op_idx, len);

		if (do_sync) {
			op_idx++;
			osd_req_op_init(req, op_idx, CEPH_OSD_OP_STARTSYNC, 0);
		}
		BUG_ON(op_idx + 1 != req->r_num_ops);
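
		/*
		 * The request built above covers pages[0..i-1].  If some of
		 * the locked pages did not fit into its ops (i < locked_pages),
		 * the leftover pages are copied into a fresh array below and
		 * written with another request via the new_request label.
		 */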
		pool = NULL;
		if (i < locked_pages) {
			BUG_ON(num_ops <= req->r_num_ops);
			num_ops -= req->r_num_ops;
			num_ops += do_sync;
			locked_pages -= i;

			/* allocate new pages array for next request */
			data_pages = pages;
			pages = kmalloc(locked_pages * sizeof(*pages),
					GFP_NOFS);
			if (!pages) {
				pool = fsc->wb_pagevec_pool;
				pages = mempool_alloc(pool, GFP_NOFS);
				BUG_ON(!pages);
			}
			memcpy(pages, data_pages + i,
			       locked_pages * sizeof(*pages));
			memset(data_pages + i, 0,
			       locked_pages * sizeof(*pages));
		} else {
			BUG_ON(num_ops != req->r_num_ops);
			index = pages[i - 1]->index + 1;
			/* request message now owns the pages array */
			pages = NULL;
		}

		req->r_mtime = inode->i_mtime;
		rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
		BUG_ON(rc);
		req = NULL;

		wbc->nr_to_write -= i;
		if (pages)
			goto new_request;

		if (wbc->nr_to_write <= 0)
			done = 1;

release_pvec_pages:
		dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
		     pvec.nr ? pvec.pages[0] : NULL);
		pagevec_release(&pvec);

		if (locked_pages && !done)
			goto retry;
	}

	if (should_loop && !done) {
		/* more to do; loop back to beginning of file */
		dout("writepages looping back to beginning of file\n");
		should_loop = 0;
		index = 0;
		goto retry;
	}

	if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
		mapping->writeback_index = index;

out:
	ceph_osdc_put_request(req);
	ceph_put_snap_context(snapc);
	dout("writepages done, rc = %d\n", rc);
	return rc;
}



/*
 * See if a given @snapc is either writeable, or already written.
 */
static int context_is_writeable_or_written(struct inode *inode,
					   struct ceph_snap_context *snapc)
{
	struct ceph_snap_context *oldest = get_oldest_context(inode, NULL);
	int ret = !oldest || snapc->seq <= oldest->seq;

	ceph_put_snap_context(oldest);
	return ret;
}

/*
 * We are only allowed to write into/dirty the page if the page is
 * clean, or already dirty within the same snap context.
 *
 * called with page locked.
 * return success with page locked,
 * or any failure (incl -EAGAIN) with page unlocked.
 */
static int ceph_update_writeable_page(struct file *file,
				      loff_t pos, unsigned len,
				      struct page *page)
{
	struct inode *inode = file_inode(file);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	loff_t page_off = pos & PAGE_MASK;
	int pos_in_page = pos & ~PAGE_MASK;
	int end_in_page = pos_in_page + len;
	loff_t i_size;
	int r;
	struct ceph_snap_context *snapc, *oldest;

	if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
		dout(" page %p forced umount\n", page);
		unlock_page(page);
		return -EIO;
	}

retry_locked:
	/* writepages currently holds page lock, but if we change that later, */
	wait_on_page_writeback(page);

	snapc = page_snap_context(page);
	if (snapc && snapc != ci->i_head_snapc) {
		/*
		 * this page is already dirty in another (older) snap
		 * context!  is it writeable now?
1173 */ 1174 oldest = get_oldest_context(inode, NULL); 1175 1176 if (snapc->seq > oldest->seq) { 1177 ceph_put_snap_context(oldest); 1178 dout(" page %p snapc %p not current or oldest\n", 1179 page, snapc); 1180 /* 1181 * queue for writeback, and wait for snapc to 1182 * be writeable or written 1183 */ 1184 snapc = ceph_get_snap_context(snapc); 1185 unlock_page(page); 1186 ceph_queue_writeback(inode); 1187 r = wait_event_killable(ci->i_cap_wq, 1188 context_is_writeable_or_written(inode, snapc)); 1189 ceph_put_snap_context(snapc); 1190 if (r == -ERESTARTSYS) 1191 return r; 1192 return -EAGAIN; 1193 } 1194 ceph_put_snap_context(oldest); 1195 1196 /* yay, writeable, do it now (without dropping page lock) */ 1197 dout(" page %p snapc %p not current, but oldest\n", 1198 page, snapc); 1199 if (!clear_page_dirty_for_io(page)) 1200 goto retry_locked; 1201 r = writepage_nounlock(page, NULL); 1202 if (r < 0) 1203 goto fail_nosnap; 1204 goto retry_locked; 1205 } 1206 1207 if (PageUptodate(page)) { 1208 dout(" page %p already uptodate\n", page); 1209 return 0; 1210 } 1211 1212 /* full page? */ 1213 if (pos_in_page == 0 && len == PAGE_SIZE) 1214 return 0; 1215 1216 /* past end of file? */ 1217 i_size = i_size_read(inode); 1218 1219 if (page_off >= i_size || 1220 (pos_in_page == 0 && (pos+len) >= i_size && 1221 end_in_page - pos_in_page != PAGE_SIZE)) { 1222 dout(" zeroing %p 0 - %d and %d - %d\n", 1223 page, pos_in_page, end_in_page, (int)PAGE_SIZE); 1224 zero_user_segments(page, 1225 0, pos_in_page, 1226 end_in_page, PAGE_SIZE); 1227 return 0; 1228 } 1229 1230 /* we need to read it. */ 1231 r = readpage_nounlock(file, page); 1232 if (r < 0) 1233 goto fail_nosnap; 1234 goto retry_locked; 1235 fail_nosnap: 1236 unlock_page(page); 1237 return r; 1238 } 1239 1240 /* 1241 * We are only allowed to write into/dirty the page if the page is 1242 * clean, or already dirty within the same snap context. 1243 */ 1244 static int ceph_write_begin(struct file *file, struct address_space *mapping, 1245 loff_t pos, unsigned len, unsigned flags, 1246 struct page **pagep, void **fsdata) 1247 { 1248 struct inode *inode = file_inode(file); 1249 struct page *page; 1250 pgoff_t index = pos >> PAGE_SHIFT; 1251 int r; 1252 1253 do { 1254 /* get a page */ 1255 page = grab_cache_page_write_begin(mapping, index, 0); 1256 if (!page) 1257 return -ENOMEM; 1258 1259 dout("write_begin file %p inode %p page %p %d~%d\n", file, 1260 inode, page, (int)pos, (int)len); 1261 1262 r = ceph_update_writeable_page(file, pos, len, page); 1263 if (r < 0) 1264 put_page(page); 1265 else 1266 *pagep = page; 1267 } while (r == -EAGAIN); 1268 1269 return r; 1270 } 1271 1272 /* 1273 * we don't do anything in here that simple_write_end doesn't do 1274 * except adjust dirty page accounting 1275 */ 1276 static int ceph_write_end(struct file *file, struct address_space *mapping, 1277 loff_t pos, unsigned len, unsigned copied, 1278 struct page *page, void *fsdata) 1279 { 1280 struct inode *inode = file_inode(file); 1281 unsigned from = pos & (PAGE_SIZE - 1); 1282 int check_cap = 0; 1283 1284 dout("write_end file %p inode %p page %p %d~%d (%d)\n", file, 1285 inode, page, (int)pos, (int)copied, (int)len); 1286 1287 /* zero the stale part of the page if we did a short copy */ 1288 if (copied < len) 1289 zero_user_segment(page, from+copied, len); 1290 1291 /* did file size increase? 
	 */
	if (pos+copied > i_size_read(inode))
		check_cap = ceph_inode_set_size(inode, pos+copied);

	if (!PageUptodate(page))
		SetPageUptodate(page);

	set_page_dirty(page);

	unlock_page(page);
	put_page(page);

	if (check_cap)
		ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);

	return copied;
}

/*
 * we set .direct_IO to indicate direct io is supported, but since we
 * intercept O_DIRECT reads and writes early, this function should
 * never get called.
 */
static ssize_t ceph_direct_io(struct kiocb *iocb, struct iov_iter *iter)
{
	WARN_ON(1);
	return -EINVAL;
}

const struct address_space_operations ceph_aops = {
	.readpage = ceph_readpage,
	.readpages = ceph_readpages,
	.writepage = ceph_writepage,
	.writepages = ceph_writepages_start,
	.write_begin = ceph_write_begin,
	.write_end = ceph_write_end,
	.set_page_dirty = ceph_set_page_dirty,
	.invalidatepage = ceph_invalidatepage,
	.releasepage = ceph_releasepage,
	.direct_IO = ceph_direct_io,
};

static void ceph_block_sigs(sigset_t *oldset)
{
	sigset_t mask;
	siginitsetinv(&mask, sigmask(SIGKILL));
	sigprocmask(SIG_BLOCK, &mask, oldset);
}

static void ceph_restore_sigs(sigset_t *oldset)
{
	sigprocmask(SIG_SETMASK, oldset, NULL);
}

/*
 * vm ops
 */
static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
{
	struct inode *inode = file_inode(vma->vm_file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_info *fi = vma->vm_file->private_data;
	struct page *pinned_page = NULL;
	loff_t off = vmf->pgoff << PAGE_SHIFT;
	int want, got, ret;
	sigset_t oldset;

	ceph_block_sigs(&oldset);

	dout("filemap_fault %p %llx.%llx %llu~%zd trying to get caps\n",
	     inode, ceph_vinop(inode), off, (size_t)PAGE_SIZE);
	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_CACHE;

	got = 0;
	ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
	if (ret < 0)
		goto out_restore;

	dout("filemap_fault %p %llu~%zd got cap refs on %s\n",
	     inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got));

	if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
	    ci->i_inline_version == CEPH_INLINE_NONE)
		ret = filemap_fault(vma, vmf);
	else
		ret = -EAGAIN;

	dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
	     inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got), ret);
	if (pinned_page)
		put_page(pinned_page);
	ceph_put_cap_refs(ci, got);

	if (ret != -EAGAIN)
		goto out_restore;

	/* read inline data */
	if (off >= PAGE_SIZE) {
		/* does not support inline data > PAGE_SIZE */
		ret = VM_FAULT_SIGBUS;
	} else {
		int ret1;
		struct address_space *mapping = inode->i_mapping;
		struct page *page = find_or_create_page(mapping, 0,
						mapping_gfp_constraint(mapping,
						~__GFP_FS));
		if (!page) {
			ret = VM_FAULT_OOM;
			goto out_inline;
		}
		ret1 = __ceph_do_getattr(inode, page,
					 CEPH_STAT_CAP_INLINE_DATA, true);
		if (ret1 < 0 || off >= i_size_read(inode)) {
			unlock_page(page);
			put_page(page);
			if (ret1 < 0)
				ret = ret1;
			else
				ret = VM_FAULT_SIGBUS;
			goto out_inline;
		}
		if (ret1 < PAGE_SIZE)
			zero_user_segment(page, ret1, PAGE_SIZE);
		else
			flush_dcache_page(page);
		SetPageUptodate(page);
		vmf->page = page;
		ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
out_inline:
		dout("filemap_fault %p %llu~%zd read inline data ret %d\n",
		     inode, off, (size_t)PAGE_SIZE, ret);
	}
out_restore:
	ceph_restore_sigs(&oldset);
	if (ret < 0)
		ret = (ret == -ENOMEM) ? VM_FAULT_OOM : VM_FAULT_SIGBUS;

	return ret;
}

/*
 * Reuse write_begin here for simplicity.
 */
static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
{
	struct inode *inode = file_inode(vma->vm_file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_info *fi = vma->vm_file->private_data;
	struct ceph_cap_flush *prealloc_cf;
	struct page *page = vmf->page;
	loff_t off = page_offset(page);
	loff_t size = i_size_read(inode);
	size_t len;
	int want, got, ret;
	sigset_t oldset;

	prealloc_cf = ceph_alloc_cap_flush();
	if (!prealloc_cf)
		return VM_FAULT_OOM;

	ceph_block_sigs(&oldset);

	if (ci->i_inline_version != CEPH_INLINE_NONE) {
		struct page *locked_page = NULL;
		if (off == 0) {
			lock_page(page);
			locked_page = page;
		}
		ret = ceph_uninline_data(vma->vm_file, locked_page);
		if (locked_page)
			unlock_page(locked_page);
		if (ret < 0)
			goto out_free;
	}

	if (off + PAGE_SIZE <= size)
		len = PAGE_SIZE;
	else
		len = size & ~PAGE_MASK;

	dout("page_mkwrite %p %llx.%llx %llu~%zd getting caps i_size %llu\n",
	     inode, ceph_vinop(inode), off, len, size);
	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_BUFFER;

	got = 0;
	ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len,
			    &got, NULL);
	if (ret < 0)
		goto out_free;

	dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
	     inode, off, len, ceph_cap_string(got));

	/* Update time before taking page lock */
	file_update_time(vma->vm_file);

	do {
		lock_page(page);

		if ((off > size) || (page->mapping != inode->i_mapping)) {
			unlock_page(page);
			ret = VM_FAULT_NOPAGE;
			break;
		}

		ret = ceph_update_writeable_page(vma->vm_file, off, len, page);
		if (ret >= 0) {
			/* success.  we'll keep the page locked. */
			set_page_dirty(page);
			ret = VM_FAULT_LOCKED;
		}
	} while (ret == -EAGAIN);

	if (ret == VM_FAULT_LOCKED ||
	    ci->i_inline_version != CEPH_INLINE_NONE) {
		int dirty;
		spin_lock(&ci->i_ceph_lock);
		ci->i_inline_version = CEPH_INLINE_NONE;
		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
					       &prealloc_cf);
		spin_unlock(&ci->i_ceph_lock);
		if (dirty)
			__mark_inode_dirty(inode, dirty);
	}

	dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %d\n",
	     inode, off, len, ceph_cap_string(got), ret);
	ceph_put_cap_refs(ci, got);
out_free:
	ceph_restore_sigs(&oldset);
	ceph_free_cap_flush(prealloc_cf);
	if (ret < 0)
		ret = (ret == -ENOMEM) ? VM_FAULT_OOM : VM_FAULT_SIGBUS;
	return ret;
}

void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
			   char *data, size_t len)
{
	struct address_space *mapping = inode->i_mapping;
	struct page *page;

	if (locked_page) {
		page = locked_page;
	} else {
		if (i_size_read(inode) == 0)
			return;
		page = find_or_create_page(mapping, 0,
					   mapping_gfp_constraint(mapping,
					   ~__GFP_FS));
		if (!page)
			return;
		if (PageUptodate(page)) {
			unlock_page(page);
			put_page(page);
			return;
		}
	}

	dout("fill_inline_data %p %llx.%llx len %zu locked_page %p\n",
	     inode, ceph_vinop(inode), len, locked_page);

	if (len > 0) {
		void *kaddr = kmap_atomic(page);
		memcpy(kaddr, data, len);
		kunmap_atomic(kaddr);
	}

	if (page != locked_page) {
		if (len < PAGE_SIZE)
			zero_user_segment(page, len, PAGE_SIZE);
		else
			flush_dcache_page(page);

		SetPageUptodate(page);
		unlock_page(page);
		put_page(page);
	}
}

int ceph_uninline_data(struct file *filp, struct page *locked_page)
{
	struct inode *inode = file_inode(filp);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_osd_request *req;
	struct page *page = NULL;
	u64 len, inline_version;
	int err = 0;
	bool from_pagecache = false;

	spin_lock(&ci->i_ceph_lock);
	inline_version = ci->i_inline_version;
	spin_unlock(&ci->i_ceph_lock);

	dout("uninline_data %p %llx.%llx inline_version %llu\n",
	     inode, ceph_vinop(inode), inline_version);

	if (inline_version == 1 || /* initial version, no data */
	    inline_version == CEPH_INLINE_NONE)
		goto out;

	if (locked_page) {
		page = locked_page;
		WARN_ON(!PageUptodate(page));
	} else if (ceph_caps_issued(ci) &
		   (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) {
		page = find_get_page(inode->i_mapping, 0);
		if (page) {
			if (PageUptodate(page)) {
				from_pagecache = true;
				lock_page(page);
			} else {
				put_page(page);
				page = NULL;
			}
		}
	}

	if (page) {
		len = i_size_read(inode);
		if (len > PAGE_SIZE)
			len = PAGE_SIZE;
	} else {
		page = __page_cache_alloc(GFP_NOFS);
		if (!page) {
			err = -ENOMEM;
			goto out;
		}
		err = __ceph_do_getattr(inode, page,
					CEPH_STAT_CAP_INLINE_DATA, true);
		if (err < 0) {
			/* no inline data */
			if (err == -ENODATA)
				err = 0;
			goto out;
		}
		len = err;
	}

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 0, 1,
				    CEPH_OSD_OP_CREATE,
				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
				    NULL, 0, 0, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}

	req->r_mtime = inode->i_mtime;
	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
	if (!err)
		err = ceph_osdc_wait_request(&fsc->client->osdc, req);
	ceph_osdc_put_request(req);
	if (err < 0)
		goto out;

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 1, 3,
				    CEPH_OSD_OP_WRITE,
				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
				    NULL, ci->i_truncate_seq,
				    ci->i_truncate_size, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}
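
	/*
	 * The three ops set up below: op 0 is a CMPXATTR guard that only
	 * lets the request proceed if the object's stored "inline_version"
	 * xattr is older than ours, op 1 writes the inline data itself,
	 * and op 2 is a SETXATTR recording the migrated version.  A
	 * -ECANCELED result (guard failed) means someone else already
	 * uninlined the data and is treated as success below.
	 */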
	osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);

	{
		__le64 xattr_buf = cpu_to_le64(inline_version);
		err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
					    "inline_version", &xattr_buf,
					    sizeof(xattr_buf),
					    CEPH_OSD_CMPXATTR_OP_GT,
					    CEPH_OSD_CMPXATTR_MODE_U64);
		if (err)
			goto out_put;
	}

	{
		char xattr_buf[32];
		int xattr_len = snprintf(xattr_buf, sizeof(xattr_buf),
					 "%llu", inline_version);
		err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
					    "inline_version",
					    xattr_buf, xattr_len, 0, 0);
		if (err)
			goto out_put;
	}

	req->r_mtime = inode->i_mtime;
	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
	if (!err)
		err = ceph_osdc_wait_request(&fsc->client->osdc, req);
out_put:
	ceph_osdc_put_request(req);
	if (err == -ECANCELED)
		err = 0;
out:
	if (page && page != locked_page) {
		if (from_pagecache) {
			unlock_page(page);
			put_page(page);
		} else
			__free_pages(page, 0);
	}

	dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
	     inode, ceph_vinop(inode), inline_version, err);
	return err;
}

static const struct vm_operations_struct ceph_vmops = {
	.fault = ceph_filemap_fault,
	.page_mkwrite = ceph_page_mkwrite,
};

int ceph_mmap(struct file *file, struct vm_area_struct *vma)
{
	struct address_space *mapping = file->f_mapping;

	if (!mapping->a_ops->readpage)
		return -ENOEXEC;
	file_accessed(file);
	vma->vm_ops = &ceph_vmops;
	return 0;
}

enum {
	POOL_READ	= 1,
	POOL_WRITE	= 2,
};

static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_osd_request *rd_req = NULL, *wr_req = NULL;
	struct rb_node **p, *parent;
	struct ceph_pool_perm *perm;
	struct page **pages;
	int err = 0, err2 = 0, have = 0;

	down_read(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	while (*p) {
		perm = rb_entry(*p, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			have = perm->perm;
			break;
		}
	}
	up_read(&mdsc->pool_perm_rwsem);
	if (*p)
		goto out;

	dout("__ceph_pool_perm_get pool %u no perm cached\n", pool);

	down_write(&mdsc->pool_perm_rwsem);
	parent = NULL;
	while (*p) {
		parent = *p;
		perm = rb_entry(parent, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			have = perm->perm;
			break;
		}
	}
	if (*p) {
		up_write(&mdsc->pool_perm_rwsem);
		goto out;
	}

	rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!rd_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	rd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
	rd_req->r_base_oloc.pool = pool;
	ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino);

	err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!wr_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	wr_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ACK;
	osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
	ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc);
	ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid);

	err = ceph_osdc_alloc_messages(wr_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	/* one page should be large enough for STAT data */
	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
	if (IS_ERR(pages)) {
		err = PTR_ERR(pages);
		goto out_unlock;
	}

	osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
				     0, false, true);
	err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);

	wr_req->r_mtime = ci->vfs_inode.i_mtime;
	err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);

	if (!err)
		err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req);
	if (!err2)
		err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req);

	if (err >= 0 || err == -ENOENT)
		have |= POOL_READ;
	else if (err != -EPERM)
		goto out_unlock;

	if (err2 == 0 || err2 == -EEXIST)
		have |= POOL_WRITE;
	else if (err2 != -EPERM) {
		err = err2;
		goto out_unlock;
	}

	perm = kmalloc(sizeof(*perm), GFP_NOFS);
	if (!perm) {
		err = -ENOMEM;
		goto out_unlock;
	}

	perm->pool = pool;
	perm->perm = have;
	rb_link_node(&perm->node, parent, p);
	rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
	err = 0;
out_unlock:
	up_write(&mdsc->pool_perm_rwsem);

	ceph_osdc_put_request(rd_req);
	ceph_osdc_put_request(wr_req);
out:
	if (!err)
		err = have;
	dout("__ceph_pool_perm_get pool %u result = %d\n", pool, err);
	return err;
}

int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
{
	u32 pool;
	int ret, flags;

	/* does not support pool namespace yet */
	if (ci->i_pool_ns_len)
		return -EIO;

	if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
				NOPOOLPERM))
		return 0;

	spin_lock(&ci->i_ceph_lock);
	flags = ci->i_ceph_flags;
	pool = ceph_file_layout_pg_pool(ci->i_layout);
	spin_unlock(&ci->i_ceph_lock);
check:
	if (flags & CEPH_I_POOL_PERM) {
		if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) {
			dout("ceph_pool_perm_check pool %u no read perm\n",
			     pool);
			return -EPERM;
		}
		if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) {
			dout("ceph_pool_perm_check pool %u no write perm\n",
			     pool);
			return -EPERM;
		}
		return 0;
	}

	ret = __ceph_pool_perm_get(ci, pool);
	if (ret < 0)
		return ret;

	flags = CEPH_I_POOL_PERM;
	if (ret & POOL_READ)
		flags |= CEPH_I_POOL_RD;
	if (ret & POOL_WRITE)
		flags |= CEPH_I_POOL_WR;

	spin_lock(&ci->i_ceph_lock);
	if (pool == ceph_file_layout_pg_pool(ci->i_layout)) {
		ci->i_ceph_flags = flags;
	} else {
		pool = ceph_file_layout_pg_pool(ci->i_layout);
		flags = ci->i_ceph_flags;
	}
	spin_unlock(&ci->i_ceph_lock);
	goto check;
}

void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc)
{
	struct ceph_pool_perm *perm;
	struct rb_node *n;

	while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) {
		n = rb_first(&mdsc->pool_perm_tree);
		perm = rb_entry(n, struct ceph_pool_perm, node);
		rb_erase(n, &mdsc->pool_perm_tree);
		kfree(perm);
	}
}