#include <linux/ceph/ceph_debug.h>

#include <linux/backing-dev.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/writeback.h>	/* generic_writepages */
#include <linux/slab.h>
#include <linux/pagevec.h>
#include <linux/task_io_accounting_ops.h>

#include "super.h"
#include "mds_client.h"
#include <linux/ceph/osd_client.h>

/*
 * Ceph address space ops.
 *
 * There are a few funny things going on here.
 *
 * The page->private field is used to reference a struct
 * ceph_snap_context for _every_ dirty page.  This indicates which
 * snapshot the page was logically dirtied in, and thus which snap
 * context needs to be associated with the osd write during writeback.
 *
 * Similarly, struct ceph_inode_info maintains a set of counters to
 * count dirty pages on the inode.  In the absence of snapshots,
 * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count.
 *
 * When a snapshot is taken (that is, when the client receives
 * notification that a snapshot was taken), each inode with caps and
 * with dirty pages (dirty pages imply there is a cap) gets a new
 * ceph_cap_snap in the i_cap_snaps list (which is sorted in ascending
 * order, new snaps go to the tail).  The i_wrbuffer_ref_head count is
 * moved to capsnap->dirty.  (Unless a sync write is currently in
 * progress.  In that case, the capsnap is said to be "pending", new
 * writes cannot start, and the capsnap isn't "finalized" until the
 * write completes (or fails) and a final size/mtime for the inode for
 * that snap can be settled upon.)  i_wrbuffer_ref_head is reset to 0.
 *
 * On writeback, we must submit writes to the osd IN SNAP ORDER.  So,
 * we look for the first capsnap in i_cap_snaps and write out pages in
 * that snap context _only_.  Then we move on to the next capsnap,
 * eventually reaching the "live" or "head" context (i.e., pages that
 * are not yet snapped) and are writing the most recently dirtied
 * pages.
 *
 * Invalidate and so forth must take care to ensure the dirty page
 * accounting is preserved.
 */

#define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10))
#define CONGESTION_OFF_THRESH(congestion_kb)				\
	(CONGESTION_ON_THRESH(congestion_kb) -				\
	 (CONGESTION_ON_THRESH(congestion_kb) >> 2))
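/*
 * Worked example for the thresholds above (illustrative, assuming
 * 4 KB pages, i.e. PAGE_SHIFT == 12): congestion_kb is in kilobytes,
 * and the >> (PAGE_SHIFT-10) converts kilobytes to pages.  With
 * congestion_kb == 8192 we flag the bdi congested once
 * fsc->writeback_count exceeds 8192 >> 2 == 2048 pages, and clear it
 * again once it drops below 2048 - (2048 >> 2) == 1536 pages.
 */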
static inline struct ceph_snap_context *page_snap_context(struct page *page)
{
	if (PagePrivate(page))
		return (void *)page->private;
	return NULL;
}

/*
 * Dirty a page.  Optimistically adjust accounting, on the assumption
 * that we won't race with invalidate.  If we do, readjust.
 */
static int ceph_set_page_dirty(struct page *page)
{
	struct address_space *mapping = page->mapping;
	struct inode *inode;
	struct ceph_inode_info *ci;
	int undo = 0;
	struct ceph_snap_context *snapc;

	if (unlikely(!mapping))
		return !TestSetPageDirty(page);

	if (TestSetPageDirty(page)) {
		dout("%p set_page_dirty %p idx %lu -- already dirty\n",
		     mapping->host, page, page->index);
		return 0;
	}

	inode = mapping->host;
	ci = ceph_inode(inode);

	/*
	 * Note that we're grabbing a snapc ref here without holding
	 * any locks!
	 */
	snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context);

	/* dirty the head */
	spin_lock(&ci->i_ceph_lock);
	if (ci->i_head_snapc == NULL)
		ci->i_head_snapc = ceph_get_snap_context(snapc);
	++ci->i_wrbuffer_ref_head;
	if (ci->i_wrbuffer_ref == 0)
		ihold(inode);
	++ci->i_wrbuffer_ref;
	dout("%p set_page_dirty %p idx %lu head %d/%d -> %d/%d "
	     "snapc %p seq %lld (%d snaps)\n",
	     mapping->host, page, page->index,
	     ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1,
	     ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
	     snapc, snapc->seq, snapc->num_snaps);
	spin_unlock(&ci->i_ceph_lock);

	/* now adjust page */
	spin_lock_irq(&mapping->tree_lock);
	if (page->mapping) {	/* Race with truncate? */
		WARN_ON_ONCE(!PageUptodate(page));
		account_page_dirtied(page, page->mapping);
		radix_tree_tag_set(&mapping->page_tree,
				   page_index(page), PAGECACHE_TAG_DIRTY);

		/*
		 * Reference snap context in page->private.  Also set
		 * PagePrivate so that we get invalidatepage callback.
		 */
		page->private = (unsigned long)snapc;
		SetPagePrivate(page);
	} else {
		dout("ANON set_page_dirty %p (raced truncate?)\n", page);
		undo = 1;
	}

	spin_unlock_irq(&mapping->tree_lock);

	if (undo)
		/* whoops, we failed to dirty the page */
		ceph_put_wrbuffer_cap_refs(ci, 1, snapc);

	__mark_inode_dirty(mapping->host, I_DIRTY_PAGES);

	BUG_ON(!PageDirty(page));
	return 1;
}

/*
 * If we are truncating the full page (i.e. offset == 0), adjust the
 * dirty page counters appropriately.  Only called if there is private
 * data on the page.
 */
static void ceph_invalidatepage(struct page *page, unsigned long offset)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_snap_context *snapc = page_snap_context(page);

	BUG_ON(!PageLocked(page));
	BUG_ON(!PagePrivate(page));
	BUG_ON(!page->mapping);

	inode = page->mapping->host;

	/*
	 * We can get non-dirty pages here due to races between
	 * set_page_dirty and truncate_complete_page; just spit out a
	 * warning, in case we end up with accounting problems later.
	 */
	if (!PageDirty(page))
		pr_err("%p invalidatepage %p page not dirty\n", inode, page);

	if (offset == 0)
		ClearPageChecked(page);

	ci = ceph_inode(inode);
	if (offset == 0) {
		dout("%p invalidatepage %p idx %lu full dirty page %lu\n",
		     inode, page, page->index, offset);
		ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
		ceph_put_snap_context(snapc);
		page->private = 0;
		ClearPagePrivate(page);
	} else {
		dout("%p invalidatepage %p idx %lu partial dirty page\n",
		     inode, page, page->index);
	}
}

/* just a sanity check */
static int ceph_releasepage(struct page *page, gfp_t g)
{
	struct inode *inode = page->mapping ? page->mapping->host : NULL;
	dout("%p releasepage %p idx %lu\n", inode, page, page->index);
	WARN_ON(PageDirty(page));
	WARN_ON(PagePrivate(page));
	return 0;
}
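/*
 * Accounting sketch for the helpers above (illustrative, head context
 * only): dirtying three clean pages takes i_wrbuffer_ref and
 * i_wrbuffer_ref_head from 0/0 to 3/3, with each page->private
 * holding a snapc ref; the 0 -> 1 transition also pins the inode via
 * ihold().  Each full-page invalidate or completed writeback then
 * gives back one ref with ceph_put_wrbuffer_cap_refs(ci, 1, snapc)
 * and puts the page's snapc, so the counters return to 0/0.
 */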
/*
 * read a single page, without unlocking it.
 */
static int readpage_nounlock(struct file *filp, struct page *page)
{
	struct inode *inode = file_inode(filp);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_osd_client *osdc =
		&ceph_inode_to_client(inode)->client->osdc;
	int err = 0;
	u64 len = PAGE_CACHE_SIZE;

	dout("readpage inode %p file %p page %p index %lu\n",
	     inode, filp, page, page->index);
	err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
				  (u64) page_offset(page), &len,
				  ci->i_truncate_seq, ci->i_truncate_size,
				  &page, 1, 0);
	if (err == -ENOENT)
		err = 0;
	if (err < 0) {
		SetPageError(page);
		goto out;
	} else if (err < PAGE_CACHE_SIZE) {
		/* zero fill remainder of page */
		zero_user_segment(page, err, PAGE_CACHE_SIZE);
	}
	SetPageUptodate(page);

out:
	return err < 0 ? err : 0;
}

static int ceph_readpage(struct file *filp, struct page *page)
{
	int r = readpage_nounlock(filp, page);
	unlock_page(page);
	return r;
}

/*
 * Finish an async read(ahead) op.
 */
static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct inode *inode = req->r_inode;
	int rc = req->r_result;
	int bytes = le32_to_cpu(msg->hdr.data_len);
	int i;

	dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);

	/* unlock all pages, zeroing any data we didn't read */
	for (i = 0; i < req->r_data.num_pages; i++, bytes -= PAGE_CACHE_SIZE) {
		struct page *page = req->r_data.pages[i];

		if (bytes < (int)PAGE_CACHE_SIZE) {
			/* zero (remainder of) page */
			int s = bytes < 0 ? 0 : bytes;
			zero_user_segment(page, s, PAGE_CACHE_SIZE);
		}
		dout("finish_read %p uptodate %p idx %lu\n", inode, page,
		     page->index);
		flush_dcache_page(page);
		SetPageUptodate(page);
		unlock_page(page);
		page_cache_release(page);
	}
	kfree(req->r_data.pages);
}

static void ceph_unlock_page_vector(struct page **pages, int num_pages)
{
	int i;

	for (i = 0; i < num_pages; i++)
		unlock_page(pages[i]);
}
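/*
 * Example of finish_read()'s zero-fill above (illustrative numbers,
 * 4 KB pages): a 3-page read that replies with data_len == 5000
 * leaves page 0 intact (bytes == 5000), zeroes page 1 from offset 904
 * onward (bytes == 904), and zeroes page 2 entirely (bytes is
 * negative by then, so s == 0).  A short read past EOF thus still
 * produces fully uptodate, zero-filled pages.
 */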
/*
 * start an async read(ahead) operation.  return nr_pages we submitted
 * a read for on success, or negative error code.
 */
static int start_read(struct inode *inode, struct list_head *page_list, int max)
{
	struct ceph_osd_client *osdc =
		&ceph_inode_to_client(inode)->client->osdc;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct page *page = list_entry(page_list->prev, struct page, lru);
	struct ceph_osd_request *req;
	u64 off;
	u64 len;
	int i;
	struct page **pages;
	pgoff_t next_index;
	int nr_pages = 0;
	int ret;

	off = (u64) page_offset(page);

	/* count pages */
	next_index = page->index;
	list_for_each_entry_reverse(page, page_list, lru) {
		if (page->index != next_index)
			break;
		nr_pages++;
		next_index++;
		if (max && nr_pages == max)
			break;
	}
	len = nr_pages << PAGE_CACHE_SHIFT;
	dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
	     off, len);

	req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode),
				    off, &len,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
				    NULL, 0,
				    ci->i_truncate_seq, ci->i_truncate_size,
				    NULL, false);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* build page vector */
	nr_pages = calc_pages_for(0, len);
	pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS);
	ret = -ENOMEM;
	if (!pages)
		goto out;
	for (i = 0; i < nr_pages; ++i) {
		page = list_entry(page_list->prev, struct page, lru);
		BUG_ON(PageLocked(page));
		list_del(&page->lru);

		dout("start_read %p adding %p idx %lu\n", inode, page,
		     page->index);
		if (add_to_page_cache_lru(page, &inode->i_data, page->index,
					  GFP_NOFS)) {
			page_cache_release(page);
			dout("start_read %p add_to_page_cache failed %p\n",
			     inode, page);
			nr_pages = i;
			goto out_pages;
		}
		pages[i] = page;
	}
	req->r_data.pages = pages;
	req->r_data.num_pages = nr_pages;
	req->r_data.alignment = 0;
	req->r_callback = finish_read;
	req->r_inode = inode;

	dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto out_pages;
	ceph_osdc_put_request(req);
	return nr_pages;

out_pages:
	ceph_unlock_page_vector(pages, nr_pages);
	ceph_release_page_vector(pages, nr_pages);
out:
	ceph_osdc_put_request(req);
	return ret;
}


/*
 * Read multiple pages.  Leave pages we don't read + unlock in page_list;
 * the caller (VM) cleans them up.
 */
static int ceph_readpages(struct file *file, struct address_space *mapping,
			  struct list_head *page_list, unsigned nr_pages)
{
	struct inode *inode = file_inode(file);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	int rc = 0;
	int max = 0;

	if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE)
		max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1)
			>> PAGE_SHIFT;

	dout("readpages %p file %p nr_pages %d max %d\n", inode,
	     file, nr_pages, max);
	while (!list_empty(page_list)) {
		rc = start_read(inode, page_list, max);
		if (rc < 0)
			goto out;
		BUG_ON(rc == 0);
	}
out:
	dout("readpages %p file %p ret %d\n", inode, file, rc);
	return rc;
}
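/*
 * Readahead batching example (illustrative, assuming 4 KB pages):
 * with rsize == 65536, max above comes to 16 pages per OSD read, so
 * ceph_readpages() submits a run of 20 contiguous pages as one
 * 16-page start_read() followed by a 4-page one.  A gap in the page
 * indices ends a batch early, since start_read() only counts
 * consecutive indices from the tail of page_list.
 */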
/*
 * Get ref for the oldest snapc for an inode with dirty data... that is, the
 * only snap context we are allowed to write back.
 */
static struct ceph_snap_context *get_oldest_context(struct inode *inode,
						    u64 *snap_size)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc = NULL;
	struct ceph_cap_snap *capsnap = NULL;

	spin_lock(&ci->i_ceph_lock);
	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
		dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
		     capsnap->context, capsnap->dirty_pages);
		if (capsnap->dirty_pages) {
			snapc = ceph_get_snap_context(capsnap->context);
			if (snap_size)
				*snap_size = capsnap->size;
			break;
		}
	}
	if (!snapc && ci->i_wrbuffer_ref_head) {
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		dout(" head snapc %p has %d dirty pages\n",
		     snapc, ci->i_wrbuffer_ref_head);
	}
	spin_unlock(&ci->i_ceph_lock);
	return snapc;
}
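/*
 * Snap-order example (hypothetical seqs): if i_cap_snaps holds
 * capsnaps with snapc seqs 3 and 5 and there are also dirty head
 * pages, get_oldest_context() keeps returning the seq-3 context until
 * its dirty_pages count drains to zero, then the seq-5 context, and
 * only then the head snapc.  Pages dirtied under a newer context must
 * wait; see the pgsnapc->seq > snapc->seq checks in the writeback
 * paths below.
 */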
/*
 * Write a single page, but leave the page locked.
 *
 * If we get a write error, set the page error bit, but still adjust the
 * dirty page accounting (i.e., page is no longer dirty).
 */
static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_fs_client *fsc;
	struct ceph_osd_client *osdc;
	loff_t page_off = page_offset(page);
	int len = PAGE_CACHE_SIZE;
	loff_t i_size;
	int err = 0;
	struct ceph_snap_context *snapc, *oldest;
	u64 snap_size = 0;
	long writeback_stat;

	dout("writepage %p idx %lu\n", page, page->index);

	if (!page->mapping || !page->mapping->host) {
		dout("writepage %p - no mapping\n", page);
		return -EFAULT;
	}
	inode = page->mapping->host;
	ci = ceph_inode(inode);
	fsc = ceph_inode_to_client(inode);
	osdc = &fsc->client->osdc;

	/* verify this is a writeable snap context */
	snapc = page_snap_context(page);
	if (snapc == NULL) {
		dout("writepage %p page %p not dirty?\n", inode, page);
		goto out;
	}
	oldest = get_oldest_context(inode, &snap_size);
	if (snapc->seq > oldest->seq) {
		dout("writepage %p page %p snapc %p not writeable - noop\n",
		     inode, page, snapc);
		/* we should only noop if called by kswapd */
		WARN_ON((current->flags & PF_MEMALLOC) == 0);
		ceph_put_snap_context(oldest);
		goto out;
	}
	ceph_put_snap_context(oldest);

	/* is this a partial page at end of file? */
	if (snap_size)
		i_size = snap_size;
	else
		i_size = i_size_read(inode);
	if (i_size < page_off + len)
		len = i_size - page_off;

	dout("writepage %p page %p index %lu on %llu~%u snapc %p\n",
	     inode, page, page->index, page_off, len, snapc);

	writeback_stat = atomic_long_inc_return(&fsc->writeback_count);
	if (writeback_stat >
	    CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
		set_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC);

	set_page_writeback(page);
	err = ceph_osdc_writepages(osdc, ceph_vino(inode),
				   &ci->i_layout, snapc,
				   page_off, len,
				   ci->i_truncate_seq, ci->i_truncate_size,
				   &inode->i_mtime, &page, 1);
	if (err < 0) {
		dout("writepage setting page/mapping error %d %p\n", err, page);
		SetPageError(page);
		mapping_set_error(&inode->i_data, err);
		if (wbc)
			wbc->pages_skipped++;
	} else {
		dout("writepage cleaned page %p\n", page);
		err = 0;	/* vfs expects us to return 0 */
	}
	page->private = 0;
	ClearPagePrivate(page);
	end_page_writeback(page);
	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
	ceph_put_snap_context(snapc);	/* page's reference */
out:
	return err;
}

static int ceph_writepage(struct page *page, struct writeback_control *wbc)
{
	int err;
	struct inode *inode = page->mapping->host;
	BUG_ON(!inode);
	ihold(inode);
	err = writepage_nounlock(page, wbc);
	unlock_page(page);
	iput(inode);
	return err;
}


/*
 * lame release_pages helper.  release_pages() isn't exported to
 * modules.
 */
static void ceph_release_pages(struct page **pages, int num)
{
	struct pagevec pvec;
	int i;

	pagevec_init(&pvec, 0);
	for (i = 0; i < num; i++) {
		if (pagevec_add(&pvec, pages[i]) == 0)
			pagevec_release(&pvec);
	}
	pagevec_release(&pvec);
}
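/*
 * Rough life cycle of one dirty page, tying the above together (a
 * sketch, not an exhaustive state diagram): ceph_set_page_dirty()
 * takes a wrbuffer ref and stashes the snapc in page->private;
 * writepage_nounlock() (or the batched path below) writes the data
 * against that snapc and afterwards drops the ref with
 * ceph_put_wrbuffer_cap_refs() and puts the snapc.  A full-page
 * invalidate does the same accounting in ceph_invalidatepage()
 * instead.
 */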
/*
 * async writeback completion handler.
 *
 * If we get an error, set the mapping error bit, but not the individual
 * page error bits.
 */
static void writepages_finish(struct ceph_osd_request *req,
			      struct ceph_msg *msg)
{
	struct inode *inode = req->r_inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	unsigned wrote;
	struct page *page;
	int i;
	struct ceph_snap_context *snapc = req->r_snapc;
	struct address_space *mapping = inode->i_mapping;
	int rc = req->r_result;
	u64 bytes = le64_to_cpu(req->r_request_ops[0].extent.length);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	long writeback_stat;
	unsigned issued = ceph_caps_issued(ci);

	if (rc >= 0) {
		/*
		 * Assume we wrote the pages we originally sent.  The
		 * osd might reply with fewer pages if our writeback
		 * raced with a truncation and was adjusted at the osd,
		 * so don't believe the reply.
		 */
		wrote = req->r_data.num_pages;
	} else {
		wrote = 0;
		mapping_set_error(mapping, rc);
	}
	dout("writepages_finish %p rc %d bytes %llu wrote %d (pages)\n",
	     inode, rc, bytes, wrote);

	/* clean all pages */
	for (i = 0; i < req->r_data.num_pages; i++) {
		page = req->r_data.pages[i];
		BUG_ON(!page);
		WARN_ON(!PageUptodate(page));

		writeback_stat =
			atomic_long_dec_return(&fsc->writeback_count);
		if (writeback_stat <
		    CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
			clear_bdi_congested(&fsc->backing_dev_info,
					    BLK_RW_ASYNC);

		ceph_put_snap_context(page_snap_context(page));
		page->private = 0;
		ClearPagePrivate(page);
		dout("unlocking %d %p\n", i, page);
		end_page_writeback(page);

		/*
		 * We lost the cache cap, need to truncate the page before
		 * it is unlocked, otherwise we'd truncate it later in the
		 * page truncation thread, possibly losing some data that
		 * raced its way in.
		 */
		if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
			generic_error_remove_page(inode->i_mapping, page);

		unlock_page(page);
	}
	dout("%p wrote+cleaned %d pages\n", inode, wrote);
	ceph_put_wrbuffer_cap_refs(ci, req->r_data.num_pages, snapc);

	ceph_release_pages(req->r_data.pages, req->r_data.num_pages);
	if (req->r_data.pages_from_pool)
		mempool_free(req->r_data.pages,
			     ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
	else
		kfree(req->r_data.pages);
	ceph_osdc_put_request(req);
}

/*
 * allocate a page vec, either directly, or if necessary, via the
 * mempool.  we avoid the mempool if we can because req->r_data.num_pages
 * may be less than the maximum write size.
 */
static void alloc_page_vec(struct ceph_fs_client *fsc,
			   struct ceph_osd_request *req)
{
	req->r_data.pages = kmalloc(sizeof(struct page *) * req->r_data.num_pages,
				    GFP_NOFS);
	if (!req->r_data.pages) {
		req->r_data.pages = mempool_alloc(fsc->wb_pagevec_pool, GFP_NOFS);
		req->r_data.pages_from_pool = 1;
		WARN_ON(!req->r_data.pages);
	}
}
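/*
 * Overview of ceph_writepages_start() below (a summary, not a spec):
 * find the oldest snapc with dirty data, then walk the mapping's
 * dirty tag with pagevec_lookup_tag(), batching pages that are
 * index-consecutive and belong to that snapc into a single OSD write
 * of up to wsize bytes.  A page from a newer snapc ends the current
 * batch; if the oldest snapc changes between passes, the scan
 * restarts so older snaps always drain first.
 */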
/*
 * initiate async writeback
 */
static int ceph_writepages_start(struct address_space *mapping,
				 struct writeback_control *wbc)
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc;
	pgoff_t index, start, end;
	int range_whole = 0;
	int should_loop = 1;
	pgoff_t max_pages = 0, max_pages_ever = 0;
	struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
	struct pagevec pvec;
	int done = 0;
	int rc = 0;
	unsigned wsize = 1 << inode->i_blkbits;
	struct ceph_osd_request *req = NULL;
	int do_sync;
	u64 snap_size = 0;

	/*
	 * Include a 'sync' in the OSD request if this is a data
	 * integrity write (e.g., O_SYNC write or fsync()), or if our
	 * cap is being revoked.
	 */
	do_sync = wbc->sync_mode == WB_SYNC_ALL;
	if (ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER))
		do_sync = 1;
	dout("writepages_start %p dosync=%d (mode=%s)\n",
	     inode, do_sync,
	     wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
	     (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));

	fsc = ceph_inode_to_client(inode);
	if (fsc->mount_state == CEPH_MOUNT_SHUTDOWN) {
		pr_warning("writepage_start %p on forced umount\n", inode);
		return -EIO;	/* we're in a forced umount, don't write! */
	}
	if (fsc->mount_options->wsize && fsc->mount_options->wsize < wsize)
		wsize = fsc->mount_options->wsize;
	if (wsize < PAGE_CACHE_SIZE)
		wsize = PAGE_CACHE_SIZE;
	max_pages_ever = wsize >> PAGE_CACHE_SHIFT;

	pagevec_init(&pvec, 0);

	/* where to start/end? */
	if (wbc->range_cyclic) {
		start = mapping->writeback_index; /* Start from prev offset */
		end = -1;
		dout(" cyclic, start at %lu\n", start);
	} else {
		start = wbc->range_start >> PAGE_CACHE_SHIFT;
		end = wbc->range_end >> PAGE_CACHE_SHIFT;
		if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
			range_whole = 1;
		should_loop = 0;
		dout(" not cyclic, %lu to %lu\n", start, end);
	}
	index = start;

retry:
	/* find oldest snap context with dirty data */
	ceph_put_snap_context(snapc);
	snapc = get_oldest_context(inode, &snap_size);
	if (!snapc) {
		/* hmm, why does writepages get called when there
		   is no dirty data? */
		dout(" no snap context with dirty data?\n");
		goto out;
	}
	dout(" oldest snapc is %p seq %lld (%d snaps)\n",
	     snapc, snapc->seq, snapc->num_snaps);
	if (last_snapc && snapc != last_snapc) {
		/* if we switched to a newer snapc, restart our scan at the
		 * start of the original file range. */
		dout(" snapc differs from last pass, restarting at %lu\n",
		     index);
		index = start;
	}
	last_snapc = snapc;

	while (!done && index <= end) {
		unsigned i;
		int first;
		pgoff_t next;
		int pvec_pages, locked_pages;
		struct page *page;
		int want;
		u64 offset, len;
		long writeback_stat;

		next = 0;
		locked_pages = 0;
		max_pages = max_pages_ever;

get_more_pages:
		first = -1;
		want = min(end - index,
			   min((pgoff_t)PAGEVEC_SIZE,
			       max_pages - (pgoff_t)locked_pages) - 1)
			+ 1;
		pvec_pages = pagevec_lookup_tag(&pvec, mapping, &index,
						PAGECACHE_TAG_DIRTY,
						want);
		dout("pagevec_lookup_tag got %d\n", pvec_pages);
		if (!pvec_pages && !locked_pages)
			break;
		for (i = 0; i < pvec_pages && locked_pages < max_pages; i++) {
			page = pvec.pages[i];
			dout("? %p idx %lu\n", page, page->index);
			if (locked_pages == 0)
				lock_page(page);  /* first page */
			else if (!trylock_page(page))
				break;

			/* only dirty pages, or our accounting breaks */
			if (unlikely(!PageDirty(page)) ||
			    unlikely(page->mapping != mapping)) {
				dout("!dirty or !mapping %p\n", page);
				unlock_page(page);
				break;
			}
			if (!wbc->range_cyclic && page->index > end) {
				dout("end of range %p\n", page);
				done = 1;
				unlock_page(page);
				break;
			}
			if (next && (page->index != next)) {
				dout("not consecutive %p\n", page);
				unlock_page(page);
				break;
			}
			if (wbc->sync_mode != WB_SYNC_NONE) {
				dout("waiting on writeback %p\n", page);
				wait_on_page_writeback(page);
			}
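			/*
			 * Past-EOF check below: for pages that belong
			 * to a capsnap, the size frozen in
			 * capsnap->size (snap_size here) is the
			 * relevant EOF, not i_size, which later
			 * writes may already have grown.
			 */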
			if ((snap_size && page_offset(page) > snap_size) ||
			    (!snap_size &&
			     page_offset(page) > i_size_read(inode))) {
				dout("%p page eof %llu\n", page, snap_size ?
				     snap_size : i_size_read(inode));
				done = 1;
				unlock_page(page);
				break;
			}
			if (PageWriteback(page)) {
				dout("%p under writeback\n", page);
				unlock_page(page);
				break;
			}

			/* only if matching snap context */
			pgsnapc = page_snap_context(page);
			if (pgsnapc->seq > snapc->seq) {
				dout("page snapc %p %lld > oldest %p %lld\n",
				     pgsnapc, pgsnapc->seq, snapc, snapc->seq);
				unlock_page(page);
				if (!locked_pages)
					continue; /* keep looking for snap */
				break;
			}

			if (!clear_page_dirty_for_io(page)) {
				dout("%p !clear_page_dirty_for_io\n", page);
				unlock_page(page);
				break;
			}

			/* ok */
			if (locked_pages == 0) {
				/* prepare async write request */
				offset = (u64) page_offset(page);
				len = wsize;
				req = ceph_osdc_new_request(&fsc->client->osdc,
					    &ci->i_layout,
					    ceph_vino(inode),
					    offset, &len,
					    CEPH_OSD_OP_WRITE,
					    CEPH_OSD_FLAG_WRITE |
						    CEPH_OSD_FLAG_ONDISK,
					    snapc, do_sync,
					    ci->i_truncate_seq,
					    ci->i_truncate_size,
					    &inode->i_mtime, true);

				if (IS_ERR(req)) {
					rc = PTR_ERR(req);
					unlock_page(page);
					break;
				}

				req->r_data.num_pages = calc_pages_for(0, len);
				req->r_data.alignment = 0;
				max_pages = req->r_data.num_pages;

				alloc_page_vec(fsc, req);
				req->r_callback = writepages_finish;
				req->r_inode = inode;
			}

			/* note position of first page in pvec */
			if (first < 0)
				first = i;
			dout("%p will write page %p idx %lu\n",
			     inode, page, page->index);

			writeback_stat =
				atomic_long_inc_return(&fsc->writeback_count);
			if (writeback_stat > CONGESTION_ON_THRESH(
				    fsc->mount_options->congestion_kb)) {
				set_bdi_congested(&fsc->backing_dev_info,
						  BLK_RW_ASYNC);
			}

			set_page_writeback(page);
			req->r_data.pages[locked_pages] = page;
			locked_pages++;
			next = page->index + 1;
		}

		/* did we get anything? */
		if (!locked_pages)
			goto release_pvec_pages;
		if (i) {
			int j;
			BUG_ON(!locked_pages || first < 0);

			if (pvec_pages && i == pvec_pages &&
			    locked_pages < max_pages) {
				dout("reached end pvec, trying for more\n");
				pagevec_reinit(&pvec);
				goto get_more_pages;
			}

			/* shift unused pages over in the pvec...  we
			 * will need to release them below. */
			for (j = i; j < pvec_pages; j++) {
				dout(" pvec leftover page %p\n",
				     pvec.pages[j]);
				pvec.pages[j-i+first] = pvec.pages[j];
			}
			pvec.nr -= i-first;
		}
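		/*
		 * The request was sized for a full wsize write when its
		 * first page was added; now that we know how many pages
		 * were actually locked, trim the extent to what will
		 * really be sent, and never past EOF for this context.
		 */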
		/* submit the write */
		offset = req->r_data.pages[0]->index << PAGE_CACHE_SHIFT;
		len = min((snap_size ? snap_size : i_size_read(inode)) - offset,
			  (u64)locked_pages << PAGE_CACHE_SHIFT);
		dout("writepages got %d pages at %llu~%llu\n",
		     locked_pages, offset, len);

		/* revise final length, page count */
		req->r_data.num_pages = locked_pages;
		req->r_request_ops[0].extent.length = cpu_to_le64(len);
		req->r_request_ops[0].payload_len = cpu_to_le32(len);
		req->r_request->hdr.data_len = cpu_to_le32(len);

		rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
		BUG_ON(rc);
		req = NULL;

		/* continue? */
		index = next;
		wbc->nr_to_write -= locked_pages;
		if (wbc->nr_to_write <= 0)
			done = 1;

release_pvec_pages:
		dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
		     pvec.nr ? pvec.pages[0] : NULL);
		pagevec_release(&pvec);

		if (locked_pages && !done)
			goto retry;
	}

	if (should_loop && !done) {
		/* more to do; loop back to beginning of file */
		dout("writepages looping back to beginning of file\n");
		should_loop = 0;
		index = 0;
		goto retry;
	}

	if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
		mapping->writeback_index = index;

out:
	if (req)
		ceph_osdc_put_request(req);
	ceph_put_snap_context(snapc);
	dout("writepages done, rc = %d\n", rc);
	return rc;
}



/*
 * See if a given @snapc is either writeable, or already written.
 */
static int context_is_writeable_or_written(struct inode *inode,
					   struct ceph_snap_context *snapc)
{
	struct ceph_snap_context *oldest = get_oldest_context(inode, NULL);
	int ret = !oldest || snapc->seq <= oldest->seq;

	ceph_put_snap_context(oldest);
	return ret;
}

/*
 * We are only allowed to write into/dirty the page if the page is
 * clean, or already dirty within the same snap context.
 *
 * called with page locked.
 * return success with page locked,
 * or any failure (incl -EAGAIN) with page unlocked.
 */
static int ceph_update_writeable_page(struct file *file,
				      loff_t pos, unsigned len,
				      struct page *page)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
	loff_t page_off = pos & PAGE_CACHE_MASK;
	int pos_in_page = pos & ~PAGE_CACHE_MASK;
	int end_in_page = pos_in_page + len;
	loff_t i_size;
	int r;
	struct ceph_snap_context *snapc, *oldest;

retry_locked:
	/* writepages currently holds the page lock, but if we change that
	 * later we will still need to wait for any writeback to finish */
	wait_on_page_writeback(page);

	/* check snap context */
	BUG_ON(!ci->i_snap_realm);
	down_read(&mdsc->snap_rwsem);
	BUG_ON(!ci->i_snap_realm->cached_context);
	snapc = page_snap_context(page);
	if (snapc && snapc != ci->i_head_snapc) {
		/*
		 * this page is already dirty in another (older) snap
		 * context!  is it writeable now?
		 */
		oldest = get_oldest_context(inode, NULL);
		up_read(&mdsc->snap_rwsem);

		if (snapc->seq > oldest->seq) {
			ceph_put_snap_context(oldest);
			dout(" page %p snapc %p not current or oldest\n",
			     page, snapc);
			/*
			 * queue for writeback, and wait for snapc to
			 * be writeable or written
			 */
			snapc = ceph_get_snap_context(snapc);
			unlock_page(page);
			ceph_queue_writeback(inode);
			r = wait_event_interruptible(ci->i_cap_wq,
			       context_is_writeable_or_written(inode, snapc));
			ceph_put_snap_context(snapc);
			if (r == -ERESTARTSYS)
				return r;
			return -EAGAIN;
		}
		ceph_put_snap_context(oldest);

		/* yay, writeable, do it now (without dropping page lock) */
		dout(" page %p snapc %p not current, but oldest\n",
		     page, snapc);
		if (!clear_page_dirty_for_io(page))
			goto retry_locked;
		r = writepage_nounlock(page, NULL);
		if (r < 0)
			goto fail_nosnap;
		goto retry_locked;
	}

	if (PageUptodate(page)) {
		dout(" page %p already uptodate\n", page);
		return 0;
	}

	/* full page? */
	if (pos_in_page == 0 && len == PAGE_CACHE_SIZE)
		return 0;
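	/*
	 * Partial page write below: choose between zeroing the
	 * untouched parts of the page and reading the old contents in.
	 * Zeroing is only safe when no existing file data could show
	 * through, i.e. the page starts at or past EOF, or the write
	 * begins at the page start and extends to or past EOF.
	 */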
	/* past end of file? */
	i_size = inode->i_size;	/* caller holds i_mutex */

	if (i_size + len > inode->i_sb->s_maxbytes) {
		/* file is too big */
		r = -EINVAL;
		goto fail;
	}

	if (page_off >= i_size ||
	    (pos_in_page == 0 && (pos+len) >= i_size &&
	     end_in_page - pos_in_page != PAGE_CACHE_SIZE)) {
		dout(" zeroing %p 0 - %d and %d - %d\n",
		     page, pos_in_page, end_in_page, (int)PAGE_CACHE_SIZE);
		zero_user_segments(page,
				   0, pos_in_page,
				   end_in_page, PAGE_CACHE_SIZE);
		return 0;
	}

	/* we need to read it. */
	up_read(&mdsc->snap_rwsem);
	r = readpage_nounlock(file, page);
	if (r < 0)
		goto fail_nosnap;
	goto retry_locked;

fail:
	up_read(&mdsc->snap_rwsem);
fail_nosnap:
	unlock_page(page);
	return r;
}

/*
 * We are only allowed to write into/dirty the page if the page is
 * clean, or already dirty within the same snap context.
 */
static int ceph_write_begin(struct file *file, struct address_space *mapping,
			    loff_t pos, unsigned len, unsigned flags,
			    struct page **pagep, void **fsdata)
{
	struct inode *inode = file_inode(file);
	struct page *page;
	pgoff_t index = pos >> PAGE_CACHE_SHIFT;
	int r;

	do {
		/* get a page */
		page = grab_cache_page_write_begin(mapping, index, 0);
		if (!page)
			return -ENOMEM;
		*pagep = page;

		dout("write_begin file %p inode %p page %p %d~%d\n", file,
		     inode, page, (int)pos, (int)len);

		r = ceph_update_writeable_page(file, pos, len, page);
	} while (r == -EAGAIN);

	return r;
}

/*
 * we don't do anything in here that simple_write_end doesn't do
 * except adjust dirty page accounting and drop the read lock on
 * mdsc->snap_rwsem.
 */
static int ceph_write_end(struct file *file, struct address_space *mapping,
			  loff_t pos, unsigned len, unsigned copied,
			  struct page *page, void *fsdata)
{
	struct inode *inode = file_inode(file);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	unsigned from = pos & (PAGE_CACHE_SIZE - 1);
	int check_cap = 0;

	dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
	     inode, page, (int)pos, (int)copied, (int)len);

	/* zero the stale part of the page if we did a short copy */
	if (copied < len)
		zero_user_segment(page, from+copied, len);

	/* did file size increase? */
	/* (no need for i_size_read(); the caller holds i_mutex) */
	if (pos+copied > inode->i_size)
		check_cap = ceph_inode_set_size(inode, pos+copied);

	if (!PageUptodate(page))
		SetPageUptodate(page);

	set_page_dirty(page);

	unlock_page(page);
	up_read(&mdsc->snap_rwsem);
	page_cache_release(page);

	if (check_cap)
		ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);

	return copied;
}
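/*
 * Locking note on the write_begin/write_end pairing above: on success
 * ceph_update_writeable_page() returns with mdsc->snap_rwsem held for
 * read (it drops it only on the failure/-EAGAIN paths), and the lock
 * is released in ceph_write_end() -- or in ceph_page_mkwrite() below,
 * for the mmap path -- once the page has been dirtied under a stable
 * snap context.
 */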
1140 */ 1141 static ssize_t ceph_direct_io(int rw, struct kiocb *iocb, 1142 const struct iovec *iov, 1143 loff_t pos, unsigned long nr_segs) 1144 { 1145 WARN_ON(1); 1146 return -EINVAL; 1147 } 1148 1149 const struct address_space_operations ceph_aops = { 1150 .readpage = ceph_readpage, 1151 .readpages = ceph_readpages, 1152 .writepage = ceph_writepage, 1153 .writepages = ceph_writepages_start, 1154 .write_begin = ceph_write_begin, 1155 .write_end = ceph_write_end, 1156 .set_page_dirty = ceph_set_page_dirty, 1157 .invalidatepage = ceph_invalidatepage, 1158 .releasepage = ceph_releasepage, 1159 .direct_IO = ceph_direct_io, 1160 }; 1161 1162 1163 /* 1164 * vm ops 1165 */ 1166 1167 /* 1168 * Reuse write_begin here for simplicity. 1169 */ 1170 static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) 1171 { 1172 struct inode *inode = file_inode(vma->vm_file); 1173 struct page *page = vmf->page; 1174 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; 1175 loff_t off = page_offset(page); 1176 loff_t size, len; 1177 int ret; 1178 1179 /* Update time before taking page lock */ 1180 file_update_time(vma->vm_file); 1181 1182 size = i_size_read(inode); 1183 if (off + PAGE_CACHE_SIZE <= size) 1184 len = PAGE_CACHE_SIZE; 1185 else 1186 len = size & ~PAGE_CACHE_MASK; 1187 1188 dout("page_mkwrite %p %llu~%llu page %p idx %lu\n", inode, 1189 off, len, page, page->index); 1190 1191 lock_page(page); 1192 1193 ret = VM_FAULT_NOPAGE; 1194 if ((off > size) || 1195 (page->mapping != inode->i_mapping)) 1196 goto out; 1197 1198 ret = ceph_update_writeable_page(vma->vm_file, off, len, page); 1199 if (ret == 0) { 1200 /* success. we'll keep the page locked. */ 1201 set_page_dirty(page); 1202 up_read(&mdsc->snap_rwsem); 1203 ret = VM_FAULT_LOCKED; 1204 } else { 1205 if (ret == -ENOMEM) 1206 ret = VM_FAULT_OOM; 1207 else 1208 ret = VM_FAULT_SIGBUS; 1209 } 1210 out: 1211 dout("page_mkwrite %p %llu~%llu = %d\n", inode, off, len, ret); 1212 if (ret != VM_FAULT_LOCKED) 1213 unlock_page(page); 1214 return ret; 1215 } 1216 1217 static struct vm_operations_struct ceph_vmops = { 1218 .fault = filemap_fault, 1219 .page_mkwrite = ceph_page_mkwrite, 1220 .remap_pages = generic_file_remap_pages, 1221 }; 1222 1223 int ceph_mmap(struct file *file, struct vm_area_struct *vma) 1224 { 1225 struct address_space *mapping = file->f_mapping; 1226 1227 if (!mapping->a_ops->readpage) 1228 return -ENOEXEC; 1229 file_accessed(file); 1230 vma->vm_ops = &ceph_vmops; 1231 return 0; 1232 } 1233