1 // SPDX-License-Identifier: GPL-2.0 2 #include <linux/ceph/ceph_debug.h> 3 #include <linux/ceph/striper.h> 4 5 #include <linux/module.h> 6 #include <linux/sched.h> 7 #include <linux/slab.h> 8 #include <linux/file.h> 9 #include <linux/mount.h> 10 #include <linux/namei.h> 11 #include <linux/writeback.h> 12 #include <linux/falloc.h> 13 14 #include "super.h" 15 #include "mds_client.h" 16 #include "cache.h" 17 18 static __le32 ceph_flags_sys2wire(u32 flags) 19 { 20 u32 wire_flags = 0; 21 22 switch (flags & O_ACCMODE) { 23 case O_RDONLY: 24 wire_flags |= CEPH_O_RDONLY; 25 break; 26 case O_WRONLY: 27 wire_flags |= CEPH_O_WRONLY; 28 break; 29 case O_RDWR: 30 wire_flags |= CEPH_O_RDWR; 31 break; 32 } 33 34 flags &= ~O_ACCMODE; 35 36 #define ceph_sys2wire(a) if (flags & a) { wire_flags |= CEPH_##a; flags &= ~a; } 37 38 ceph_sys2wire(O_CREAT); 39 ceph_sys2wire(O_EXCL); 40 ceph_sys2wire(O_TRUNC); 41 ceph_sys2wire(O_DIRECTORY); 42 ceph_sys2wire(O_NOFOLLOW); 43 44 #undef ceph_sys2wire 45 46 if (flags) 47 dout("unused open flags: %x\n", flags); 48 49 return cpu_to_le32(wire_flags); 50 } 51 52 /* 53 * Ceph file operations 54 * 55 * Implement basic open/close functionality, and implement 56 * read/write. 57 * 58 * We implement three modes of file I/O: 59 * - buffered uses the generic_file_aio_{read,write} helpers 60 * 61 * - synchronous is used when there is multi-client read/write 62 * sharing, avoids the page cache, and synchronously waits for an 63 * ack from the OSD. 64 * 65 * - direct io takes the variant of the sync path that references 66 * user pages directly. 67 * 68 * fsync() flushes and waits on dirty pages, but just queues metadata 69 * for writeback: since the MDS can recover size and mtime there is no 70 * need to wait for MDS acknowledgement. 71 */ 72 73 /* 74 * How many pages to get in one call to iov_iter_get_pages(). This 75 * determines the size of the on-stack array used as a buffer. 76 */ 77 #define ITER_GET_BVECS_PAGES 64 78 79 static ssize_t __iter_get_bvecs(struct iov_iter *iter, size_t maxsize, 80 struct bio_vec *bvecs) 81 { 82 size_t size = 0; 83 int bvec_idx = 0; 84 85 if (maxsize > iov_iter_count(iter)) 86 maxsize = iov_iter_count(iter); 87 88 while (size < maxsize) { 89 struct page *pages[ITER_GET_BVECS_PAGES]; 90 ssize_t bytes; 91 size_t start; 92 int idx = 0; 93 94 bytes = iov_iter_get_pages(iter, pages, maxsize - size, 95 ITER_GET_BVECS_PAGES, &start); 96 if (bytes < 0) 97 return size ?: bytes; 98 99 iov_iter_advance(iter, bytes); 100 size += bytes; 101 102 for ( ; bytes; idx++, bvec_idx++) { 103 struct bio_vec bv = { 104 .bv_page = pages[idx], 105 .bv_len = min_t(int, bytes, PAGE_SIZE - start), 106 .bv_offset = start, 107 }; 108 109 bvecs[bvec_idx] = bv; 110 bytes -= bv.bv_len; 111 start = 0; 112 } 113 } 114 115 return size; 116 } 117 118 /* 119 * iov_iter_get_pages() only considers one iov_iter segment, no matter 120 * what maxsize or maxpages are given. For ITER_BVEC that is a single 121 * page. 122 * 123 * Attempt to get up to @maxsize bytes worth of pages from @iter. 124 * Return the number of bytes in the created bio_vec array, or an error. 125 */ 126 static ssize_t iter_get_bvecs_alloc(struct iov_iter *iter, size_t maxsize, 127 struct bio_vec **bvecs, int *num_bvecs) 128 { 129 struct bio_vec *bv; 130 size_t orig_count = iov_iter_count(iter); 131 ssize_t bytes; 132 int npages; 133 134 iov_iter_truncate(iter, maxsize); 135 npages = iov_iter_npages(iter, INT_MAX); 136 iov_iter_reexpand(iter, orig_count); 137 138 /* 139 * __iter_get_bvecs() may populate only part of the array -- zero it 140 * out. 141 */ 142 bv = kvmalloc_array(npages, sizeof(*bv), GFP_KERNEL | __GFP_ZERO); 143 if (!bv) 144 return -ENOMEM; 145 146 bytes = __iter_get_bvecs(iter, maxsize, bv); 147 if (bytes < 0) { 148 /* 149 * No pages were pinned -- just free the array. 150 */ 151 kvfree(bv); 152 return bytes; 153 } 154 155 *bvecs = bv; 156 *num_bvecs = npages; 157 return bytes; 158 } 159 160 static void put_bvecs(struct bio_vec *bvecs, int num_bvecs, bool should_dirty) 161 { 162 int i; 163 164 for (i = 0; i < num_bvecs; i++) { 165 if (bvecs[i].bv_page) { 166 if (should_dirty) 167 set_page_dirty_lock(bvecs[i].bv_page); 168 put_page(bvecs[i].bv_page); 169 } 170 } 171 kvfree(bvecs); 172 } 173 174 /* 175 * Prepare an open request. Preallocate ceph_cap to avoid an 176 * inopportune ENOMEM later. 177 */ 178 static struct ceph_mds_request * 179 prepare_open_request(struct super_block *sb, int flags, int create_mode) 180 { 181 struct ceph_fs_client *fsc = ceph_sb_to_client(sb); 182 struct ceph_mds_client *mdsc = fsc->mdsc; 183 struct ceph_mds_request *req; 184 int want_auth = USE_ANY_MDS; 185 int op = (flags & O_CREAT) ? CEPH_MDS_OP_CREATE : CEPH_MDS_OP_OPEN; 186 187 if (flags & (O_WRONLY|O_RDWR|O_CREAT|O_TRUNC)) 188 want_auth = USE_AUTH_MDS; 189 190 req = ceph_mdsc_create_request(mdsc, op, want_auth); 191 if (IS_ERR(req)) 192 goto out; 193 req->r_fmode = ceph_flags_to_mode(flags); 194 req->r_args.open.flags = ceph_flags_sys2wire(flags); 195 req->r_args.open.mode = cpu_to_le32(create_mode); 196 out: 197 return req; 198 } 199 200 static int ceph_init_file_info(struct inode *inode, struct file *file, 201 int fmode, bool isdir) 202 { 203 struct ceph_file_info *fi; 204 205 dout("%s %p %p 0%o (%s)\n", __func__, inode, file, 206 inode->i_mode, isdir ? "dir" : "regular"); 207 BUG_ON(inode->i_fop->release != ceph_release); 208 209 if (isdir) { 210 struct ceph_dir_file_info *dfi = 211 kmem_cache_zalloc(ceph_dir_file_cachep, GFP_KERNEL); 212 if (!dfi) { 213 ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */ 214 return -ENOMEM; 215 } 216 217 file->private_data = dfi; 218 fi = &dfi->file_info; 219 dfi->next_offset = 2; 220 dfi->readdir_cache_idx = -1; 221 } else { 222 fi = kmem_cache_zalloc(ceph_file_cachep, GFP_KERNEL); 223 if (!fi) { 224 ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */ 225 return -ENOMEM; 226 } 227 228 file->private_data = fi; 229 } 230 231 fi->fmode = fmode; 232 spin_lock_init(&fi->rw_contexts_lock); 233 INIT_LIST_HEAD(&fi->rw_contexts); 234 235 return 0; 236 } 237 238 /* 239 * initialize private struct file data. 240 * if we fail, clean up by dropping fmode reference on the ceph_inode 241 */ 242 static int ceph_init_file(struct inode *inode, struct file *file, int fmode) 243 { 244 int ret = 0; 245 246 switch (inode->i_mode & S_IFMT) { 247 case S_IFREG: 248 ceph_fscache_register_inode_cookie(inode); 249 ceph_fscache_file_set_cookie(inode, file); 250 /* fall through */ 251 case S_IFDIR: 252 ret = ceph_init_file_info(inode, file, fmode, 253 S_ISDIR(inode->i_mode)); 254 if (ret) 255 return ret; 256 break; 257 258 case S_IFLNK: 259 dout("init_file %p %p 0%o (symlink)\n", inode, file, 260 inode->i_mode); 261 ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */ 262 break; 263 264 default: 265 dout("init_file %p %p 0%o (special)\n", inode, file, 266 inode->i_mode); 267 /* 268 * we need to drop the open ref now, since we don't 269 * have .release set to ceph_release. 270 */ 271 ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */ 272 BUG_ON(inode->i_fop->release == ceph_release); 273 274 /* call the proper open fop */ 275 ret = inode->i_fop->open(inode, file); 276 } 277 return ret; 278 } 279 280 /* 281 * try renew caps after session gets killed. 282 */ 283 int ceph_renew_caps(struct inode *inode) 284 { 285 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 286 struct ceph_inode_info *ci = ceph_inode(inode); 287 struct ceph_mds_request *req; 288 int err, flags, wanted; 289 290 spin_lock(&ci->i_ceph_lock); 291 wanted = __ceph_caps_file_wanted(ci); 292 if (__ceph_is_any_real_caps(ci) && 293 (!(wanted & CEPH_CAP_ANY_WR) || ci->i_auth_cap)) { 294 int issued = __ceph_caps_issued(ci, NULL); 295 spin_unlock(&ci->i_ceph_lock); 296 dout("renew caps %p want %s issued %s updating mds_wanted\n", 297 inode, ceph_cap_string(wanted), ceph_cap_string(issued)); 298 ceph_check_caps(ci, 0, NULL); 299 return 0; 300 } 301 spin_unlock(&ci->i_ceph_lock); 302 303 flags = 0; 304 if ((wanted & CEPH_CAP_FILE_RD) && (wanted & CEPH_CAP_FILE_WR)) 305 flags = O_RDWR; 306 else if (wanted & CEPH_CAP_FILE_RD) 307 flags = O_RDONLY; 308 else if (wanted & CEPH_CAP_FILE_WR) 309 flags = O_WRONLY; 310 #ifdef O_LAZY 311 if (wanted & CEPH_CAP_FILE_LAZYIO) 312 flags |= O_LAZY; 313 #endif 314 315 req = prepare_open_request(inode->i_sb, flags, 0); 316 if (IS_ERR(req)) { 317 err = PTR_ERR(req); 318 goto out; 319 } 320 321 req->r_inode = inode; 322 ihold(inode); 323 req->r_num_caps = 1; 324 req->r_fmode = -1; 325 326 err = ceph_mdsc_do_request(mdsc, NULL, req); 327 ceph_mdsc_put_request(req); 328 out: 329 dout("renew caps %p open result=%d\n", inode, err); 330 return err < 0 ? err : 0; 331 } 332 333 /* 334 * If we already have the requisite capabilities, we can satisfy 335 * the open request locally (no need to request new caps from the 336 * MDS). We do, however, need to inform the MDS (asynchronously) 337 * if our wanted caps set expands. 338 */ 339 int ceph_open(struct inode *inode, struct file *file) 340 { 341 struct ceph_inode_info *ci = ceph_inode(inode); 342 struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb); 343 struct ceph_mds_client *mdsc = fsc->mdsc; 344 struct ceph_mds_request *req; 345 struct ceph_file_info *fi = file->private_data; 346 int err; 347 int flags, fmode, wanted; 348 349 if (fi) { 350 dout("open file %p is already opened\n", file); 351 return 0; 352 } 353 354 /* filter out O_CREAT|O_EXCL; vfs did that already. yuck. */ 355 flags = file->f_flags & ~(O_CREAT|O_EXCL); 356 if (S_ISDIR(inode->i_mode)) 357 flags = O_DIRECTORY; /* mds likes to know */ 358 359 dout("open inode %p ino %llx.%llx file %p flags %d (%d)\n", inode, 360 ceph_vinop(inode), file, flags, file->f_flags); 361 fmode = ceph_flags_to_mode(flags); 362 wanted = ceph_caps_for_mode(fmode); 363 364 /* snapped files are read-only */ 365 if (ceph_snap(inode) != CEPH_NOSNAP && (file->f_mode & FMODE_WRITE)) 366 return -EROFS; 367 368 /* trivially open snapdir */ 369 if (ceph_snap(inode) == CEPH_SNAPDIR) { 370 spin_lock(&ci->i_ceph_lock); 371 __ceph_get_fmode(ci, fmode); 372 spin_unlock(&ci->i_ceph_lock); 373 return ceph_init_file(inode, file, fmode); 374 } 375 376 /* 377 * No need to block if we have caps on the auth MDS (for 378 * write) or any MDS (for read). Update wanted set 379 * asynchronously. 380 */ 381 spin_lock(&ci->i_ceph_lock); 382 if (__ceph_is_any_real_caps(ci) && 383 (((fmode & CEPH_FILE_MODE_WR) == 0) || ci->i_auth_cap)) { 384 int mds_wanted = __ceph_caps_mds_wanted(ci, true); 385 int issued = __ceph_caps_issued(ci, NULL); 386 387 dout("open %p fmode %d want %s issued %s using existing\n", 388 inode, fmode, ceph_cap_string(wanted), 389 ceph_cap_string(issued)); 390 __ceph_get_fmode(ci, fmode); 391 spin_unlock(&ci->i_ceph_lock); 392 393 /* adjust wanted? */ 394 if ((issued & wanted) != wanted && 395 (mds_wanted & wanted) != wanted && 396 ceph_snap(inode) != CEPH_SNAPDIR) 397 ceph_check_caps(ci, 0, NULL); 398 399 return ceph_init_file(inode, file, fmode); 400 } else if (ceph_snap(inode) != CEPH_NOSNAP && 401 (ci->i_snap_caps & wanted) == wanted) { 402 __ceph_get_fmode(ci, fmode); 403 spin_unlock(&ci->i_ceph_lock); 404 return ceph_init_file(inode, file, fmode); 405 } 406 407 spin_unlock(&ci->i_ceph_lock); 408 409 dout("open fmode %d wants %s\n", fmode, ceph_cap_string(wanted)); 410 req = prepare_open_request(inode->i_sb, flags, 0); 411 if (IS_ERR(req)) { 412 err = PTR_ERR(req); 413 goto out; 414 } 415 req->r_inode = inode; 416 ihold(inode); 417 418 req->r_num_caps = 1; 419 err = ceph_mdsc_do_request(mdsc, NULL, req); 420 if (!err) 421 err = ceph_init_file(inode, file, req->r_fmode); 422 ceph_mdsc_put_request(req); 423 dout("open result=%d on %llx.%llx\n", err, ceph_vinop(inode)); 424 out: 425 return err; 426 } 427 428 429 /* 430 * Do a lookup + open with a single request. If we get a non-existent 431 * file or symlink, return 1 so the VFS can retry. 432 */ 433 int ceph_atomic_open(struct inode *dir, struct dentry *dentry, 434 struct file *file, unsigned flags, umode_t mode) 435 { 436 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); 437 struct ceph_mds_client *mdsc = fsc->mdsc; 438 struct ceph_mds_request *req; 439 struct dentry *dn; 440 struct ceph_acls_info acls = {}; 441 int mask; 442 int err; 443 444 dout("atomic_open %p dentry %p '%pd' %s flags %d mode 0%o\n", 445 dir, dentry, dentry, 446 d_unhashed(dentry) ? "unhashed" : "hashed", flags, mode); 447 448 if (dentry->d_name.len > NAME_MAX) 449 return -ENAMETOOLONG; 450 451 if (flags & O_CREAT) { 452 if (ceph_quota_is_max_files_exceeded(dir)) 453 return -EDQUOT; 454 err = ceph_pre_init_acls(dir, &mode, &acls); 455 if (err < 0) 456 return err; 457 } 458 459 /* do the open */ 460 req = prepare_open_request(dir->i_sb, flags, mode); 461 if (IS_ERR(req)) { 462 err = PTR_ERR(req); 463 goto out_acl; 464 } 465 req->r_dentry = dget(dentry); 466 req->r_num_caps = 2; 467 if (flags & O_CREAT) { 468 req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL; 469 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 470 if (acls.pagelist) { 471 req->r_pagelist = acls.pagelist; 472 acls.pagelist = NULL; 473 } 474 } 475 476 mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED; 477 if (ceph_security_xattr_wanted(dir)) 478 mask |= CEPH_CAP_XATTR_SHARED; 479 req->r_args.open.mask = cpu_to_le32(mask); 480 481 req->r_parent = dir; 482 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); 483 err = ceph_mdsc_do_request(mdsc, 484 (flags & (O_CREAT|O_TRUNC)) ? dir : NULL, 485 req); 486 err = ceph_handle_snapdir(req, dentry, err); 487 if (err) 488 goto out_req; 489 490 if ((flags & O_CREAT) && !req->r_reply_info.head->is_dentry) 491 err = ceph_handle_notrace_create(dir, dentry); 492 493 if (d_in_lookup(dentry)) { 494 dn = ceph_finish_lookup(req, dentry, err); 495 if (IS_ERR(dn)) 496 err = PTR_ERR(dn); 497 } else { 498 /* we were given a hashed negative dentry */ 499 dn = NULL; 500 } 501 if (err) 502 goto out_req; 503 if (dn || d_really_is_negative(dentry) || d_is_symlink(dentry)) { 504 /* make vfs retry on splice, ENOENT, or symlink */ 505 dout("atomic_open finish_no_open on dn %p\n", dn); 506 err = finish_no_open(file, dn); 507 } else { 508 dout("atomic_open finish_open on dn %p\n", dn); 509 if (req->r_op == CEPH_MDS_OP_CREATE && req->r_reply_info.has_create_ino) { 510 ceph_init_inode_acls(d_inode(dentry), &acls); 511 file->f_mode |= FMODE_CREATED; 512 } 513 err = finish_open(file, dentry, ceph_open); 514 } 515 out_req: 516 if (!req->r_err && req->r_target_inode) 517 ceph_put_fmode(ceph_inode(req->r_target_inode), req->r_fmode); 518 ceph_mdsc_put_request(req); 519 out_acl: 520 ceph_release_acls_info(&acls); 521 dout("atomic_open result=%d\n", err); 522 return err; 523 } 524 525 int ceph_release(struct inode *inode, struct file *file) 526 { 527 struct ceph_inode_info *ci = ceph_inode(inode); 528 529 if (S_ISDIR(inode->i_mode)) { 530 struct ceph_dir_file_info *dfi = file->private_data; 531 dout("release inode %p dir file %p\n", inode, file); 532 WARN_ON(!list_empty(&dfi->file_info.rw_contexts)); 533 534 ceph_put_fmode(ci, dfi->file_info.fmode); 535 536 if (dfi->last_readdir) 537 ceph_mdsc_put_request(dfi->last_readdir); 538 kfree(dfi->last_name); 539 kfree(dfi->dir_info); 540 kmem_cache_free(ceph_dir_file_cachep, dfi); 541 } else { 542 struct ceph_file_info *fi = file->private_data; 543 dout("release inode %p regular file %p\n", inode, file); 544 WARN_ON(!list_empty(&fi->rw_contexts)); 545 546 ceph_put_fmode(ci, fi->fmode); 547 kmem_cache_free(ceph_file_cachep, fi); 548 } 549 550 /* wake up anyone waiting for caps on this inode */ 551 wake_up_all(&ci->i_cap_wq); 552 return 0; 553 } 554 555 enum { 556 HAVE_RETRIED = 1, 557 CHECK_EOF = 2, 558 READ_INLINE = 3, 559 }; 560 561 /* 562 * Completely synchronous read and write methods. Direct from __user 563 * buffer to osd, or directly to user pages (if O_DIRECT). 564 * 565 * If the read spans object boundary, just do multiple reads. (That's not 566 * atomic, but good enough for now.) 567 * 568 * If we get a short result from the OSD, check against i_size; we need to 569 * only return a short read to the caller if we hit EOF. 570 */ 571 static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, 572 int *retry_op) 573 { 574 struct file *file = iocb->ki_filp; 575 struct inode *inode = file_inode(file); 576 struct ceph_inode_info *ci = ceph_inode(inode); 577 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 578 struct ceph_osd_client *osdc = &fsc->client->osdc; 579 ssize_t ret; 580 u64 off = iocb->ki_pos; 581 u64 len = iov_iter_count(to); 582 583 dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len, 584 (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); 585 586 if (!len) 587 return 0; 588 /* 589 * flush any page cache pages in this range. this 590 * will make concurrent normal and sync io slow, 591 * but it will at least behave sensibly when they are 592 * in sequence. 593 */ 594 ret = filemap_write_and_wait_range(inode->i_mapping, 595 off, off + len - 1); 596 if (ret < 0) 597 return ret; 598 599 ret = 0; 600 while ((len = iov_iter_count(to)) > 0) { 601 struct ceph_osd_request *req; 602 struct page **pages; 603 int num_pages; 604 size_t page_off; 605 u64 i_size; 606 bool more; 607 608 req = ceph_osdc_new_request(osdc, &ci->i_layout, 609 ci->i_vino, off, &len, 0, 1, 610 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 611 NULL, ci->i_truncate_seq, 612 ci->i_truncate_size, false); 613 if (IS_ERR(req)) { 614 ret = PTR_ERR(req); 615 break; 616 } 617 618 more = len < iov_iter_count(to); 619 620 if (unlikely(iov_iter_is_pipe(to))) { 621 ret = iov_iter_get_pages_alloc(to, &pages, len, 622 &page_off); 623 if (ret <= 0) { 624 ceph_osdc_put_request(req); 625 ret = -ENOMEM; 626 break; 627 } 628 num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE); 629 if (ret < len) { 630 len = ret; 631 osd_req_op_extent_update(req, 0, len); 632 more = false; 633 } 634 } else { 635 num_pages = calc_pages_for(off, len); 636 page_off = off & ~PAGE_MASK; 637 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); 638 if (IS_ERR(pages)) { 639 ceph_osdc_put_request(req); 640 ret = PTR_ERR(pages); 641 break; 642 } 643 } 644 645 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off, 646 false, false); 647 ret = ceph_osdc_start_request(osdc, req, false); 648 if (!ret) 649 ret = ceph_osdc_wait_request(osdc, req); 650 ceph_osdc_put_request(req); 651 652 i_size = i_size_read(inode); 653 dout("sync_read %llu~%llu got %zd i_size %llu%s\n", 654 off, len, ret, i_size, (more ? " MORE" : "")); 655 656 if (ret == -ENOENT) 657 ret = 0; 658 if (ret >= 0 && ret < len && (off + ret < i_size)) { 659 int zlen = min(len - ret, i_size - off - ret); 660 int zoff = page_off + ret; 661 dout("sync_read zero gap %llu~%llu\n", 662 off + ret, off + ret + zlen); 663 ceph_zero_page_vector_range(zoff, zlen, pages); 664 ret += zlen; 665 } 666 667 if (unlikely(iov_iter_is_pipe(to))) { 668 if (ret > 0) { 669 iov_iter_advance(to, ret); 670 off += ret; 671 } else { 672 iov_iter_advance(to, 0); 673 } 674 ceph_put_page_vector(pages, num_pages, false); 675 } else { 676 int idx = 0; 677 size_t left = ret > 0 ? ret : 0; 678 while (left > 0) { 679 size_t len, copied; 680 page_off = off & ~PAGE_MASK; 681 len = min_t(size_t, left, PAGE_SIZE - page_off); 682 copied = copy_page_to_iter(pages[idx++], 683 page_off, len, to); 684 off += copied; 685 left -= copied; 686 if (copied < len) { 687 ret = -EFAULT; 688 break; 689 } 690 } 691 ceph_release_page_vector(pages, num_pages); 692 } 693 694 if (ret <= 0 || off >= i_size || !more) 695 break; 696 } 697 698 if (off > iocb->ki_pos) { 699 if (ret >= 0 && 700 iov_iter_count(to) > 0 && off >= i_size_read(inode)) 701 *retry_op = CHECK_EOF; 702 ret = off - iocb->ki_pos; 703 iocb->ki_pos = off; 704 } 705 706 dout("sync_read result %zd retry_op %d\n", ret, *retry_op); 707 return ret; 708 } 709 710 struct ceph_aio_request { 711 struct kiocb *iocb; 712 size_t total_len; 713 bool write; 714 bool should_dirty; 715 int error; 716 struct list_head osd_reqs; 717 unsigned num_reqs; 718 atomic_t pending_reqs; 719 struct timespec64 mtime; 720 struct ceph_cap_flush *prealloc_cf; 721 }; 722 723 struct ceph_aio_work { 724 struct work_struct work; 725 struct ceph_osd_request *req; 726 }; 727 728 static void ceph_aio_retry_work(struct work_struct *work); 729 730 static void ceph_aio_complete(struct inode *inode, 731 struct ceph_aio_request *aio_req) 732 { 733 struct ceph_inode_info *ci = ceph_inode(inode); 734 int ret; 735 736 if (!atomic_dec_and_test(&aio_req->pending_reqs)) 737 return; 738 739 ret = aio_req->error; 740 if (!ret) 741 ret = aio_req->total_len; 742 743 dout("ceph_aio_complete %p rc %d\n", inode, ret); 744 745 if (ret >= 0 && aio_req->write) { 746 int dirty; 747 748 loff_t endoff = aio_req->iocb->ki_pos + aio_req->total_len; 749 if (endoff > i_size_read(inode)) { 750 if (ceph_inode_set_size(inode, endoff)) 751 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); 752 } 753 754 spin_lock(&ci->i_ceph_lock); 755 ci->i_inline_version = CEPH_INLINE_NONE; 756 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, 757 &aio_req->prealloc_cf); 758 spin_unlock(&ci->i_ceph_lock); 759 if (dirty) 760 __mark_inode_dirty(inode, dirty); 761 762 } 763 764 ceph_put_cap_refs(ci, (aio_req->write ? CEPH_CAP_FILE_WR : 765 CEPH_CAP_FILE_RD)); 766 767 aio_req->iocb->ki_complete(aio_req->iocb, ret, 0); 768 769 ceph_free_cap_flush(aio_req->prealloc_cf); 770 kfree(aio_req); 771 } 772 773 static void ceph_aio_complete_req(struct ceph_osd_request *req) 774 { 775 int rc = req->r_result; 776 struct inode *inode = req->r_inode; 777 struct ceph_aio_request *aio_req = req->r_priv; 778 struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0); 779 780 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS); 781 BUG_ON(!osd_data->num_bvecs); 782 783 dout("ceph_aio_complete_req %p rc %d bytes %u\n", 784 inode, rc, osd_data->bvec_pos.iter.bi_size); 785 786 if (rc == -EOLDSNAPC) { 787 struct ceph_aio_work *aio_work; 788 BUG_ON(!aio_req->write); 789 790 aio_work = kmalloc(sizeof(*aio_work), GFP_NOFS); 791 if (aio_work) { 792 INIT_WORK(&aio_work->work, ceph_aio_retry_work); 793 aio_work->req = req; 794 queue_work(ceph_inode_to_client(inode)->wb_wq, 795 &aio_work->work); 796 return; 797 } 798 rc = -ENOMEM; 799 } else if (!aio_req->write) { 800 if (rc == -ENOENT) 801 rc = 0; 802 if (rc >= 0 && osd_data->bvec_pos.iter.bi_size > rc) { 803 struct iov_iter i; 804 int zlen = osd_data->bvec_pos.iter.bi_size - rc; 805 806 /* 807 * If read is satisfied by single OSD request, 808 * it can pass EOF. Otherwise read is within 809 * i_size. 810 */ 811 if (aio_req->num_reqs == 1) { 812 loff_t i_size = i_size_read(inode); 813 loff_t endoff = aio_req->iocb->ki_pos + rc; 814 if (endoff < i_size) 815 zlen = min_t(size_t, zlen, 816 i_size - endoff); 817 aio_req->total_len = rc + zlen; 818 } 819 820 iov_iter_bvec(&i, READ, osd_data->bvec_pos.bvecs, 821 osd_data->num_bvecs, 822 osd_data->bvec_pos.iter.bi_size); 823 iov_iter_advance(&i, rc); 824 iov_iter_zero(zlen, &i); 825 } 826 } 827 828 put_bvecs(osd_data->bvec_pos.bvecs, osd_data->num_bvecs, 829 aio_req->should_dirty); 830 ceph_osdc_put_request(req); 831 832 if (rc < 0) 833 cmpxchg(&aio_req->error, 0, rc); 834 835 ceph_aio_complete(inode, aio_req); 836 return; 837 } 838 839 static void ceph_aio_retry_work(struct work_struct *work) 840 { 841 struct ceph_aio_work *aio_work = 842 container_of(work, struct ceph_aio_work, work); 843 struct ceph_osd_request *orig_req = aio_work->req; 844 struct ceph_aio_request *aio_req = orig_req->r_priv; 845 struct inode *inode = orig_req->r_inode; 846 struct ceph_inode_info *ci = ceph_inode(inode); 847 struct ceph_snap_context *snapc; 848 struct ceph_osd_request *req; 849 int ret; 850 851 spin_lock(&ci->i_ceph_lock); 852 if (__ceph_have_pending_cap_snap(ci)) { 853 struct ceph_cap_snap *capsnap = 854 list_last_entry(&ci->i_cap_snaps, 855 struct ceph_cap_snap, 856 ci_item); 857 snapc = ceph_get_snap_context(capsnap->context); 858 } else { 859 BUG_ON(!ci->i_head_snapc); 860 snapc = ceph_get_snap_context(ci->i_head_snapc); 861 } 862 spin_unlock(&ci->i_ceph_lock); 863 864 req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 1, 865 false, GFP_NOFS); 866 if (!req) { 867 ret = -ENOMEM; 868 req = orig_req; 869 goto out; 870 } 871 872 req->r_flags = /* CEPH_OSD_FLAG_ORDERSNAP | */ CEPH_OSD_FLAG_WRITE; 873 ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc); 874 ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid); 875 876 req->r_ops[0] = orig_req->r_ops[0]; 877 878 req->r_mtime = aio_req->mtime; 879 req->r_data_offset = req->r_ops[0].extent.offset; 880 881 ret = ceph_osdc_alloc_messages(req, GFP_NOFS); 882 if (ret) { 883 ceph_osdc_put_request(req); 884 req = orig_req; 885 goto out; 886 } 887 888 ceph_osdc_put_request(orig_req); 889 890 req->r_callback = ceph_aio_complete_req; 891 req->r_inode = inode; 892 req->r_priv = aio_req; 893 894 ret = ceph_osdc_start_request(req->r_osdc, req, false); 895 out: 896 if (ret < 0) { 897 req->r_result = ret; 898 ceph_aio_complete_req(req); 899 } 900 901 ceph_put_snap_context(snapc); 902 kfree(aio_work); 903 } 904 905 static ssize_t 906 ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, 907 struct ceph_snap_context *snapc, 908 struct ceph_cap_flush **pcf) 909 { 910 struct file *file = iocb->ki_filp; 911 struct inode *inode = file_inode(file); 912 struct ceph_inode_info *ci = ceph_inode(inode); 913 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 914 struct ceph_vino vino; 915 struct ceph_osd_request *req; 916 struct bio_vec *bvecs; 917 struct ceph_aio_request *aio_req = NULL; 918 int num_pages = 0; 919 int flags; 920 int ret; 921 struct timespec64 mtime = current_time(inode); 922 size_t count = iov_iter_count(iter); 923 loff_t pos = iocb->ki_pos; 924 bool write = iov_iter_rw(iter) == WRITE; 925 bool should_dirty = !write && iter_is_iovec(iter); 926 927 if (write && ceph_snap(file_inode(file)) != CEPH_NOSNAP) 928 return -EROFS; 929 930 dout("sync_direct_%s on file %p %lld~%u snapc %p seq %lld\n", 931 (write ? "write" : "read"), file, pos, (unsigned)count, 932 snapc, snapc->seq); 933 934 ret = filemap_write_and_wait_range(inode->i_mapping, 935 pos, pos + count - 1); 936 if (ret < 0) 937 return ret; 938 939 if (write) { 940 int ret2 = invalidate_inode_pages2_range(inode->i_mapping, 941 pos >> PAGE_SHIFT, 942 (pos + count - 1) >> PAGE_SHIFT); 943 if (ret2 < 0) 944 dout("invalidate_inode_pages2_range returned %d\n", ret2); 945 946 flags = /* CEPH_OSD_FLAG_ORDERSNAP | */ CEPH_OSD_FLAG_WRITE; 947 } else { 948 flags = CEPH_OSD_FLAG_READ; 949 } 950 951 while (iov_iter_count(iter) > 0) { 952 u64 size = iov_iter_count(iter); 953 ssize_t len; 954 955 if (write) 956 size = min_t(u64, size, fsc->mount_options->wsize); 957 else 958 size = min_t(u64, size, fsc->mount_options->rsize); 959 960 vino = ceph_vino(inode); 961 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 962 vino, pos, &size, 0, 963 1, 964 write ? CEPH_OSD_OP_WRITE : 965 CEPH_OSD_OP_READ, 966 flags, snapc, 967 ci->i_truncate_seq, 968 ci->i_truncate_size, 969 false); 970 if (IS_ERR(req)) { 971 ret = PTR_ERR(req); 972 break; 973 } 974 975 len = iter_get_bvecs_alloc(iter, size, &bvecs, &num_pages); 976 if (len < 0) { 977 ceph_osdc_put_request(req); 978 ret = len; 979 break; 980 } 981 if (len != size) 982 osd_req_op_extent_update(req, 0, len); 983 984 /* 985 * To simplify error handling, allow AIO when IO within i_size 986 * or IO can be satisfied by single OSD request. 987 */ 988 if (pos == iocb->ki_pos && !is_sync_kiocb(iocb) && 989 (len == count || pos + count <= i_size_read(inode))) { 990 aio_req = kzalloc(sizeof(*aio_req), GFP_KERNEL); 991 if (aio_req) { 992 aio_req->iocb = iocb; 993 aio_req->write = write; 994 aio_req->should_dirty = should_dirty; 995 INIT_LIST_HEAD(&aio_req->osd_reqs); 996 if (write) { 997 aio_req->mtime = mtime; 998 swap(aio_req->prealloc_cf, *pcf); 999 } 1000 } 1001 /* ignore error */ 1002 } 1003 1004 if (write) { 1005 /* 1006 * throw out any page cache pages in this range. this 1007 * may block. 1008 */ 1009 truncate_inode_pages_range(inode->i_mapping, pos, 1010 (pos+len) | (PAGE_SIZE - 1)); 1011 1012 req->r_mtime = mtime; 1013 } 1014 1015 osd_req_op_extent_osd_data_bvecs(req, 0, bvecs, num_pages, len); 1016 1017 if (aio_req) { 1018 aio_req->total_len += len; 1019 aio_req->num_reqs++; 1020 atomic_inc(&aio_req->pending_reqs); 1021 1022 req->r_callback = ceph_aio_complete_req; 1023 req->r_inode = inode; 1024 req->r_priv = aio_req; 1025 list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs); 1026 1027 pos += len; 1028 continue; 1029 } 1030 1031 ret = ceph_osdc_start_request(req->r_osdc, req, false); 1032 if (!ret) 1033 ret = ceph_osdc_wait_request(&fsc->client->osdc, req); 1034 1035 size = i_size_read(inode); 1036 if (!write) { 1037 if (ret == -ENOENT) 1038 ret = 0; 1039 if (ret >= 0 && ret < len && pos + ret < size) { 1040 struct iov_iter i; 1041 int zlen = min_t(size_t, len - ret, 1042 size - pos - ret); 1043 1044 iov_iter_bvec(&i, READ, bvecs, num_pages, len); 1045 iov_iter_advance(&i, ret); 1046 iov_iter_zero(zlen, &i); 1047 ret += zlen; 1048 } 1049 if (ret >= 0) 1050 len = ret; 1051 } 1052 1053 put_bvecs(bvecs, num_pages, should_dirty); 1054 ceph_osdc_put_request(req); 1055 if (ret < 0) 1056 break; 1057 1058 pos += len; 1059 if (!write && pos >= size) 1060 break; 1061 1062 if (write && pos > size) { 1063 if (ceph_inode_set_size(inode, pos)) 1064 ceph_check_caps(ceph_inode(inode), 1065 CHECK_CAPS_AUTHONLY, 1066 NULL); 1067 } 1068 } 1069 1070 if (aio_req) { 1071 LIST_HEAD(osd_reqs); 1072 1073 if (aio_req->num_reqs == 0) { 1074 kfree(aio_req); 1075 return ret; 1076 } 1077 1078 ceph_get_cap_refs(ci, write ? CEPH_CAP_FILE_WR : 1079 CEPH_CAP_FILE_RD); 1080 1081 list_splice(&aio_req->osd_reqs, &osd_reqs); 1082 while (!list_empty(&osd_reqs)) { 1083 req = list_first_entry(&osd_reqs, 1084 struct ceph_osd_request, 1085 r_unsafe_item); 1086 list_del_init(&req->r_unsafe_item); 1087 if (ret >= 0) 1088 ret = ceph_osdc_start_request(req->r_osdc, 1089 req, false); 1090 if (ret < 0) { 1091 req->r_result = ret; 1092 ceph_aio_complete_req(req); 1093 } 1094 } 1095 return -EIOCBQUEUED; 1096 } 1097 1098 if (ret != -EOLDSNAPC && pos > iocb->ki_pos) { 1099 ret = pos - iocb->ki_pos; 1100 iocb->ki_pos = pos; 1101 } 1102 return ret; 1103 } 1104 1105 /* 1106 * Synchronous write, straight from __user pointer or user pages. 1107 * 1108 * If write spans object boundary, just do multiple writes. (For a 1109 * correct atomic write, we should e.g. take write locks on all 1110 * objects, rollback on failure, etc.) 1111 */ 1112 static ssize_t 1113 ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, 1114 struct ceph_snap_context *snapc) 1115 { 1116 struct file *file = iocb->ki_filp; 1117 struct inode *inode = file_inode(file); 1118 struct ceph_inode_info *ci = ceph_inode(inode); 1119 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 1120 struct ceph_vino vino; 1121 struct ceph_osd_request *req; 1122 struct page **pages; 1123 u64 len; 1124 int num_pages; 1125 int written = 0; 1126 int flags; 1127 int ret; 1128 bool check_caps = false; 1129 struct timespec64 mtime = current_time(inode); 1130 size_t count = iov_iter_count(from); 1131 1132 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) 1133 return -EROFS; 1134 1135 dout("sync_write on file %p %lld~%u snapc %p seq %lld\n", 1136 file, pos, (unsigned)count, snapc, snapc->seq); 1137 1138 ret = filemap_write_and_wait_range(inode->i_mapping, 1139 pos, pos + count - 1); 1140 if (ret < 0) 1141 return ret; 1142 1143 ret = invalidate_inode_pages2_range(inode->i_mapping, 1144 pos >> PAGE_SHIFT, 1145 (pos + count - 1) >> PAGE_SHIFT); 1146 if (ret < 0) 1147 dout("invalidate_inode_pages2_range returned %d\n", ret); 1148 1149 flags = /* CEPH_OSD_FLAG_ORDERSNAP | */ CEPH_OSD_FLAG_WRITE; 1150 1151 while ((len = iov_iter_count(from)) > 0) { 1152 size_t left; 1153 int n; 1154 1155 vino = ceph_vino(inode); 1156 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 1157 vino, pos, &len, 0, 1, 1158 CEPH_OSD_OP_WRITE, flags, snapc, 1159 ci->i_truncate_seq, 1160 ci->i_truncate_size, 1161 false); 1162 if (IS_ERR(req)) { 1163 ret = PTR_ERR(req); 1164 break; 1165 } 1166 1167 /* 1168 * write from beginning of first page, 1169 * regardless of io alignment 1170 */ 1171 num_pages = (len + PAGE_SIZE - 1) >> PAGE_SHIFT; 1172 1173 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); 1174 if (IS_ERR(pages)) { 1175 ret = PTR_ERR(pages); 1176 goto out; 1177 } 1178 1179 left = len; 1180 for (n = 0; n < num_pages; n++) { 1181 size_t plen = min_t(size_t, left, PAGE_SIZE); 1182 ret = copy_page_from_iter(pages[n], 0, plen, from); 1183 if (ret != plen) { 1184 ret = -EFAULT; 1185 break; 1186 } 1187 left -= ret; 1188 } 1189 1190 if (ret < 0) { 1191 ceph_release_page_vector(pages, num_pages); 1192 goto out; 1193 } 1194 1195 req->r_inode = inode; 1196 1197 osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, 1198 false, true); 1199 1200 req->r_mtime = mtime; 1201 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); 1202 if (!ret) 1203 ret = ceph_osdc_wait_request(&fsc->client->osdc, req); 1204 1205 out: 1206 ceph_osdc_put_request(req); 1207 if (ret != 0) { 1208 ceph_set_error_write(ci); 1209 break; 1210 } 1211 1212 ceph_clear_error_write(ci); 1213 pos += len; 1214 written += len; 1215 if (pos > i_size_read(inode)) { 1216 check_caps = ceph_inode_set_size(inode, pos); 1217 if (check_caps) 1218 ceph_check_caps(ceph_inode(inode), 1219 CHECK_CAPS_AUTHONLY, 1220 NULL); 1221 } 1222 1223 } 1224 1225 if (ret != -EOLDSNAPC && written > 0) { 1226 ret = written; 1227 iocb->ki_pos = pos; 1228 } 1229 return ret; 1230 } 1231 1232 /* 1233 * Wrap generic_file_aio_read with checks for cap bits on the inode. 1234 * Atomically grab references, so that those bits are not released 1235 * back to the MDS mid-read. 1236 * 1237 * Hmm, the sync read case isn't actually async... should it be? 1238 */ 1239 static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) 1240 { 1241 struct file *filp = iocb->ki_filp; 1242 struct ceph_file_info *fi = filp->private_data; 1243 size_t len = iov_iter_count(to); 1244 struct inode *inode = file_inode(filp); 1245 struct ceph_inode_info *ci = ceph_inode(inode); 1246 struct page *pinned_page = NULL; 1247 ssize_t ret; 1248 int want, got = 0; 1249 int retry_op = 0, read = 0; 1250 1251 again: 1252 dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n", 1253 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode); 1254 1255 if (fi->fmode & CEPH_FILE_MODE_LAZY) 1256 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; 1257 else 1258 want = CEPH_CAP_FILE_CACHE; 1259 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page); 1260 if (ret < 0) 1261 return ret; 1262 1263 if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 || 1264 (iocb->ki_flags & IOCB_DIRECT) || 1265 (fi->flags & CEPH_F_SYNC)) { 1266 1267 dout("aio_sync_read %p %llx.%llx %llu~%u got cap refs on %s\n", 1268 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, 1269 ceph_cap_string(got)); 1270 1271 if (ci->i_inline_version == CEPH_INLINE_NONE) { 1272 if (!retry_op && (iocb->ki_flags & IOCB_DIRECT)) { 1273 ret = ceph_direct_read_write(iocb, to, 1274 NULL, NULL); 1275 if (ret >= 0 && ret < len) 1276 retry_op = CHECK_EOF; 1277 } else { 1278 ret = ceph_sync_read(iocb, to, &retry_op); 1279 } 1280 } else { 1281 retry_op = READ_INLINE; 1282 } 1283 } else { 1284 CEPH_DEFINE_RW_CONTEXT(rw_ctx, got); 1285 dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n", 1286 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, 1287 ceph_cap_string(got)); 1288 ceph_add_rw_context(fi, &rw_ctx); 1289 ret = generic_file_read_iter(iocb, to); 1290 ceph_del_rw_context(fi, &rw_ctx); 1291 } 1292 dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", 1293 inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); 1294 if (pinned_page) { 1295 put_page(pinned_page); 1296 pinned_page = NULL; 1297 } 1298 ceph_put_cap_refs(ci, got); 1299 if (retry_op > HAVE_RETRIED && ret >= 0) { 1300 int statret; 1301 struct page *page = NULL; 1302 loff_t i_size; 1303 if (retry_op == READ_INLINE) { 1304 page = __page_cache_alloc(GFP_KERNEL); 1305 if (!page) 1306 return -ENOMEM; 1307 } 1308 1309 statret = __ceph_do_getattr(inode, page, 1310 CEPH_STAT_CAP_INLINE_DATA, !!page); 1311 if (statret < 0) { 1312 if (page) 1313 __free_page(page); 1314 if (statret == -ENODATA) { 1315 BUG_ON(retry_op != READ_INLINE); 1316 goto again; 1317 } 1318 return statret; 1319 } 1320 1321 i_size = i_size_read(inode); 1322 if (retry_op == READ_INLINE) { 1323 BUG_ON(ret > 0 || read > 0); 1324 if (iocb->ki_pos < i_size && 1325 iocb->ki_pos < PAGE_SIZE) { 1326 loff_t end = min_t(loff_t, i_size, 1327 iocb->ki_pos + len); 1328 end = min_t(loff_t, end, PAGE_SIZE); 1329 if (statret < end) 1330 zero_user_segment(page, statret, end); 1331 ret = copy_page_to_iter(page, 1332 iocb->ki_pos & ~PAGE_MASK, 1333 end - iocb->ki_pos, to); 1334 iocb->ki_pos += ret; 1335 read += ret; 1336 } 1337 if (iocb->ki_pos < i_size && read < len) { 1338 size_t zlen = min_t(size_t, len - read, 1339 i_size - iocb->ki_pos); 1340 ret = iov_iter_zero(zlen, to); 1341 iocb->ki_pos += ret; 1342 read += ret; 1343 } 1344 __free_pages(page, 0); 1345 return read; 1346 } 1347 1348 /* hit EOF or hole? */ 1349 if (retry_op == CHECK_EOF && iocb->ki_pos < i_size && 1350 ret < len) { 1351 dout("sync_read hit hole, ppos %lld < size %lld" 1352 ", reading more\n", iocb->ki_pos, i_size); 1353 1354 read += ret; 1355 len -= ret; 1356 retry_op = HAVE_RETRIED; 1357 goto again; 1358 } 1359 } 1360 1361 if (ret >= 0) 1362 ret += read; 1363 1364 return ret; 1365 } 1366 1367 /* 1368 * Take cap references to avoid releasing caps to MDS mid-write. 1369 * 1370 * If we are synchronous, and write with an old snap context, the OSD 1371 * may return EOLDSNAPC. In that case, retry the write.. _after_ 1372 * dropping our cap refs and allowing the pending snap to logically 1373 * complete _before_ this write occurs. 1374 * 1375 * If we are near ENOSPC, write synchronously. 1376 */ 1377 static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) 1378 { 1379 struct file *file = iocb->ki_filp; 1380 struct ceph_file_info *fi = file->private_data; 1381 struct inode *inode = file_inode(file); 1382 struct ceph_inode_info *ci = ceph_inode(inode); 1383 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 1384 struct ceph_cap_flush *prealloc_cf; 1385 ssize_t count, written = 0; 1386 int err, want, got; 1387 loff_t pos; 1388 loff_t limit = max(i_size_read(inode), fsc->max_file_size); 1389 1390 if (ceph_snap(inode) != CEPH_NOSNAP) 1391 return -EROFS; 1392 1393 prealloc_cf = ceph_alloc_cap_flush(); 1394 if (!prealloc_cf) 1395 return -ENOMEM; 1396 1397 retry_snap: 1398 inode_lock(inode); 1399 1400 /* We can write back this queue in page reclaim */ 1401 current->backing_dev_info = inode_to_bdi(inode); 1402 1403 if (iocb->ki_flags & IOCB_APPEND) { 1404 err = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false); 1405 if (err < 0) 1406 goto out; 1407 } 1408 1409 err = generic_write_checks(iocb, from); 1410 if (err <= 0) 1411 goto out; 1412 1413 pos = iocb->ki_pos; 1414 if (unlikely(pos >= limit)) { 1415 err = -EFBIG; 1416 goto out; 1417 } else { 1418 iov_iter_truncate(from, limit - pos); 1419 } 1420 1421 count = iov_iter_count(from); 1422 if (ceph_quota_is_max_bytes_exceeded(inode, pos + count)) { 1423 err = -EDQUOT; 1424 goto out; 1425 } 1426 1427 err = file_remove_privs(file); 1428 if (err) 1429 goto out; 1430 1431 err = file_update_time(file); 1432 if (err) 1433 goto out; 1434 1435 if (ci->i_inline_version != CEPH_INLINE_NONE) { 1436 err = ceph_uninline_data(file, NULL); 1437 if (err < 0) 1438 goto out; 1439 } 1440 1441 /* FIXME: not complete since it doesn't account for being at quota */ 1442 if (ceph_osdmap_flag(&fsc->client->osdc, CEPH_OSDMAP_FULL)) { 1443 err = -ENOSPC; 1444 goto out; 1445 } 1446 1447 dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n", 1448 inode, ceph_vinop(inode), pos, count, i_size_read(inode)); 1449 if (fi->fmode & CEPH_FILE_MODE_LAZY) 1450 want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; 1451 else 1452 want = CEPH_CAP_FILE_BUFFER; 1453 got = 0; 1454 err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count, 1455 &got, NULL); 1456 if (err < 0) 1457 goto out; 1458 1459 dout("aio_write %p %llx.%llx %llu~%zd got cap refs on %s\n", 1460 inode, ceph_vinop(inode), pos, count, ceph_cap_string(got)); 1461 1462 if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || 1463 (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC) || 1464 (ci->i_ceph_flags & CEPH_I_ERROR_WRITE)) { 1465 struct ceph_snap_context *snapc; 1466 struct iov_iter data; 1467 inode_unlock(inode); 1468 1469 spin_lock(&ci->i_ceph_lock); 1470 if (__ceph_have_pending_cap_snap(ci)) { 1471 struct ceph_cap_snap *capsnap = 1472 list_last_entry(&ci->i_cap_snaps, 1473 struct ceph_cap_snap, 1474 ci_item); 1475 snapc = ceph_get_snap_context(capsnap->context); 1476 } else { 1477 BUG_ON(!ci->i_head_snapc); 1478 snapc = ceph_get_snap_context(ci->i_head_snapc); 1479 } 1480 spin_unlock(&ci->i_ceph_lock); 1481 1482 /* we might need to revert back to that point */ 1483 data = *from; 1484 if (iocb->ki_flags & IOCB_DIRECT) 1485 written = ceph_direct_read_write(iocb, &data, snapc, 1486 &prealloc_cf); 1487 else 1488 written = ceph_sync_write(iocb, &data, pos, snapc); 1489 if (written > 0) 1490 iov_iter_advance(from, written); 1491 ceph_put_snap_context(snapc); 1492 } else { 1493 /* 1494 * No need to acquire the i_truncate_mutex. Because 1495 * the MDS revokes Fwb caps before sending truncate 1496 * message to us. We can't get Fwb cap while there 1497 * are pending vmtruncate. So write and vmtruncate 1498 * can not run at the same time 1499 */ 1500 written = generic_perform_write(file, from, pos); 1501 if (likely(written >= 0)) 1502 iocb->ki_pos = pos + written; 1503 inode_unlock(inode); 1504 } 1505 1506 if (written >= 0) { 1507 int dirty; 1508 1509 spin_lock(&ci->i_ceph_lock); 1510 ci->i_inline_version = CEPH_INLINE_NONE; 1511 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, 1512 &prealloc_cf); 1513 spin_unlock(&ci->i_ceph_lock); 1514 if (dirty) 1515 __mark_inode_dirty(inode, dirty); 1516 if (ceph_quota_is_max_bytes_approaching(inode, iocb->ki_pos)) 1517 ceph_check_caps(ci, CHECK_CAPS_NODELAY, NULL); 1518 } 1519 1520 dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n", 1521 inode, ceph_vinop(inode), pos, (unsigned)count, 1522 ceph_cap_string(got)); 1523 ceph_put_cap_refs(ci, got); 1524 1525 if (written == -EOLDSNAPC) { 1526 dout("aio_write %p %llx.%llx %llu~%u" "got EOLDSNAPC, retrying\n", 1527 inode, ceph_vinop(inode), pos, (unsigned)count); 1528 goto retry_snap; 1529 } 1530 1531 if (written >= 0) { 1532 if (ceph_osdmap_flag(&fsc->client->osdc, CEPH_OSDMAP_NEARFULL)) 1533 iocb->ki_flags |= IOCB_DSYNC; 1534 written = generic_write_sync(iocb, written); 1535 } 1536 1537 goto out_unlocked; 1538 1539 out: 1540 inode_unlock(inode); 1541 out_unlocked: 1542 ceph_free_cap_flush(prealloc_cf); 1543 current->backing_dev_info = NULL; 1544 return written ? written : err; 1545 } 1546 1547 /* 1548 * llseek. be sure to verify file size on SEEK_END. 1549 */ 1550 static loff_t ceph_llseek(struct file *file, loff_t offset, int whence) 1551 { 1552 struct inode *inode = file->f_mapping->host; 1553 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 1554 loff_t i_size; 1555 loff_t ret; 1556 1557 inode_lock(inode); 1558 1559 if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) { 1560 ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false); 1561 if (ret < 0) 1562 goto out; 1563 } 1564 1565 i_size = i_size_read(inode); 1566 switch (whence) { 1567 case SEEK_END: 1568 offset += i_size; 1569 break; 1570 case SEEK_CUR: 1571 /* 1572 * Here we special-case the lseek(fd, 0, SEEK_CUR) 1573 * position-querying operation. Avoid rewriting the "same" 1574 * f_pos value back to the file because a concurrent read(), 1575 * write() or lseek() might have altered it 1576 */ 1577 if (offset == 0) { 1578 ret = file->f_pos; 1579 goto out; 1580 } 1581 offset += file->f_pos; 1582 break; 1583 case SEEK_DATA: 1584 if (offset < 0 || offset >= i_size) { 1585 ret = -ENXIO; 1586 goto out; 1587 } 1588 break; 1589 case SEEK_HOLE: 1590 if (offset < 0 || offset >= i_size) { 1591 ret = -ENXIO; 1592 goto out; 1593 } 1594 offset = i_size; 1595 break; 1596 } 1597 1598 ret = vfs_setpos(file, offset, max(i_size, fsc->max_file_size)); 1599 1600 out: 1601 inode_unlock(inode); 1602 return ret; 1603 } 1604 1605 static inline void ceph_zero_partial_page( 1606 struct inode *inode, loff_t offset, unsigned size) 1607 { 1608 struct page *page; 1609 pgoff_t index = offset >> PAGE_SHIFT; 1610 1611 page = find_lock_page(inode->i_mapping, index); 1612 if (page) { 1613 wait_on_page_writeback(page); 1614 zero_user(page, offset & (PAGE_SIZE - 1), size); 1615 unlock_page(page); 1616 put_page(page); 1617 } 1618 } 1619 1620 static void ceph_zero_pagecache_range(struct inode *inode, loff_t offset, 1621 loff_t length) 1622 { 1623 loff_t nearly = round_up(offset, PAGE_SIZE); 1624 if (offset < nearly) { 1625 loff_t size = nearly - offset; 1626 if (length < size) 1627 size = length; 1628 ceph_zero_partial_page(inode, offset, size); 1629 offset += size; 1630 length -= size; 1631 } 1632 if (length >= PAGE_SIZE) { 1633 loff_t size = round_down(length, PAGE_SIZE); 1634 truncate_pagecache_range(inode, offset, offset + size - 1); 1635 offset += size; 1636 length -= size; 1637 } 1638 if (length) 1639 ceph_zero_partial_page(inode, offset, length); 1640 } 1641 1642 static int ceph_zero_partial_object(struct inode *inode, 1643 loff_t offset, loff_t *length) 1644 { 1645 struct ceph_inode_info *ci = ceph_inode(inode); 1646 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 1647 struct ceph_osd_request *req; 1648 int ret = 0; 1649 loff_t zero = 0; 1650 int op; 1651 1652 if (!length) { 1653 op = offset ? CEPH_OSD_OP_DELETE : CEPH_OSD_OP_TRUNCATE; 1654 length = &zero; 1655 } else { 1656 op = CEPH_OSD_OP_ZERO; 1657 } 1658 1659 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 1660 ceph_vino(inode), 1661 offset, length, 1662 0, 1, op, 1663 CEPH_OSD_FLAG_WRITE, 1664 NULL, 0, 0, false); 1665 if (IS_ERR(req)) { 1666 ret = PTR_ERR(req); 1667 goto out; 1668 } 1669 1670 req->r_mtime = inode->i_mtime; 1671 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); 1672 if (!ret) { 1673 ret = ceph_osdc_wait_request(&fsc->client->osdc, req); 1674 if (ret == -ENOENT) 1675 ret = 0; 1676 } 1677 ceph_osdc_put_request(req); 1678 1679 out: 1680 return ret; 1681 } 1682 1683 static int ceph_zero_objects(struct inode *inode, loff_t offset, loff_t length) 1684 { 1685 int ret = 0; 1686 struct ceph_inode_info *ci = ceph_inode(inode); 1687 s32 stripe_unit = ci->i_layout.stripe_unit; 1688 s32 stripe_count = ci->i_layout.stripe_count; 1689 s32 object_size = ci->i_layout.object_size; 1690 u64 object_set_size = object_size * stripe_count; 1691 u64 nearly, t; 1692 1693 /* round offset up to next period boundary */ 1694 nearly = offset + object_set_size - 1; 1695 t = nearly; 1696 nearly -= do_div(t, object_set_size); 1697 1698 while (length && offset < nearly) { 1699 loff_t size = length; 1700 ret = ceph_zero_partial_object(inode, offset, &size); 1701 if (ret < 0) 1702 return ret; 1703 offset += size; 1704 length -= size; 1705 } 1706 while (length >= object_set_size) { 1707 int i; 1708 loff_t pos = offset; 1709 for (i = 0; i < stripe_count; ++i) { 1710 ret = ceph_zero_partial_object(inode, pos, NULL); 1711 if (ret < 0) 1712 return ret; 1713 pos += stripe_unit; 1714 } 1715 offset += object_set_size; 1716 length -= object_set_size; 1717 } 1718 while (length) { 1719 loff_t size = length; 1720 ret = ceph_zero_partial_object(inode, offset, &size); 1721 if (ret < 0) 1722 return ret; 1723 offset += size; 1724 length -= size; 1725 } 1726 return ret; 1727 } 1728 1729 static long ceph_fallocate(struct file *file, int mode, 1730 loff_t offset, loff_t length) 1731 { 1732 struct ceph_file_info *fi = file->private_data; 1733 struct inode *inode = file_inode(file); 1734 struct ceph_inode_info *ci = ceph_inode(inode); 1735 struct ceph_cap_flush *prealloc_cf; 1736 int want, got = 0; 1737 int dirty; 1738 int ret = 0; 1739 loff_t endoff = 0; 1740 loff_t size; 1741 1742 if (mode != (FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE)) 1743 return -EOPNOTSUPP; 1744 1745 if (!S_ISREG(inode->i_mode)) 1746 return -EOPNOTSUPP; 1747 1748 prealloc_cf = ceph_alloc_cap_flush(); 1749 if (!prealloc_cf) 1750 return -ENOMEM; 1751 1752 inode_lock(inode); 1753 1754 if (ceph_snap(inode) != CEPH_NOSNAP) { 1755 ret = -EROFS; 1756 goto unlock; 1757 } 1758 1759 if (ci->i_inline_version != CEPH_INLINE_NONE) { 1760 ret = ceph_uninline_data(file, NULL); 1761 if (ret < 0) 1762 goto unlock; 1763 } 1764 1765 size = i_size_read(inode); 1766 1767 /* Are we punching a hole beyond EOF? */ 1768 if (offset >= size) 1769 goto unlock; 1770 if ((offset + length) > size) 1771 length = size - offset; 1772 1773 if (fi->fmode & CEPH_FILE_MODE_LAZY) 1774 want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; 1775 else 1776 want = CEPH_CAP_FILE_BUFFER; 1777 1778 ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL); 1779 if (ret < 0) 1780 goto unlock; 1781 1782 ceph_zero_pagecache_range(inode, offset, length); 1783 ret = ceph_zero_objects(inode, offset, length); 1784 1785 if (!ret) { 1786 spin_lock(&ci->i_ceph_lock); 1787 ci->i_inline_version = CEPH_INLINE_NONE; 1788 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, 1789 &prealloc_cf); 1790 spin_unlock(&ci->i_ceph_lock); 1791 if (dirty) 1792 __mark_inode_dirty(inode, dirty); 1793 } 1794 1795 ceph_put_cap_refs(ci, got); 1796 unlock: 1797 inode_unlock(inode); 1798 ceph_free_cap_flush(prealloc_cf); 1799 return ret; 1800 } 1801 1802 /* 1803 * This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for 1804 * src_ci. Two attempts are made to obtain both caps, and an error is return if 1805 * this fails; zero is returned on success. 1806 */ 1807 static int get_rd_wr_caps(struct ceph_inode_info *src_ci, 1808 loff_t src_endoff, int *src_got, 1809 struct ceph_inode_info *dst_ci, 1810 loff_t dst_endoff, int *dst_got) 1811 { 1812 int ret = 0; 1813 bool retrying = false; 1814 1815 retry_caps: 1816 ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER, 1817 dst_endoff, dst_got, NULL); 1818 if (ret < 0) 1819 return ret; 1820 1821 /* 1822 * Since we're already holding the FILE_WR capability for the dst file, 1823 * we would risk a deadlock by using ceph_get_caps. Thus, we'll do some 1824 * retry dance instead to try to get both capabilities. 1825 */ 1826 ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED, 1827 false, src_got); 1828 if (ret <= 0) { 1829 /* Start by dropping dst_ci caps and getting src_ci caps */ 1830 ceph_put_cap_refs(dst_ci, *dst_got); 1831 if (retrying) { 1832 if (!ret) 1833 /* ceph_try_get_caps masks EAGAIN */ 1834 ret = -EAGAIN; 1835 return ret; 1836 } 1837 ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD, 1838 CEPH_CAP_FILE_SHARED, src_endoff, 1839 src_got, NULL); 1840 if (ret < 0) 1841 return ret; 1842 /*... drop src_ci caps too, and retry */ 1843 ceph_put_cap_refs(src_ci, *src_got); 1844 retrying = true; 1845 goto retry_caps; 1846 } 1847 return ret; 1848 } 1849 1850 static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got, 1851 struct ceph_inode_info *dst_ci, int dst_got) 1852 { 1853 ceph_put_cap_refs(src_ci, src_got); 1854 ceph_put_cap_refs(dst_ci, dst_got); 1855 } 1856 1857 /* 1858 * This function does several size-related checks, returning an error if: 1859 * - source file is smaller than off+len 1860 * - destination file size is not OK (inode_newsize_ok()) 1861 * - max bytes quotas is exceeded 1862 */ 1863 static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode, 1864 loff_t src_off, loff_t dst_off, size_t len) 1865 { 1866 loff_t size, endoff; 1867 1868 size = i_size_read(src_inode); 1869 /* 1870 * Don't copy beyond source file EOF. Instead of simply setting length 1871 * to (size - src_off), just drop to VFS default implementation, as the 1872 * local i_size may be stale due to other clients writing to the source 1873 * inode. 1874 */ 1875 if (src_off + len > size) { 1876 dout("Copy beyond EOF (%llu + %zu > %llu)\n", 1877 src_off, len, size); 1878 return -EOPNOTSUPP; 1879 } 1880 size = i_size_read(dst_inode); 1881 1882 endoff = dst_off + len; 1883 if (inode_newsize_ok(dst_inode, endoff)) 1884 return -EOPNOTSUPP; 1885 1886 if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff)) 1887 return -EDQUOT; 1888 1889 return 0; 1890 } 1891 1892 static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off, 1893 struct file *dst_file, loff_t dst_off, 1894 size_t len, unsigned int flags) 1895 { 1896 struct inode *src_inode = file_inode(src_file); 1897 struct inode *dst_inode = file_inode(dst_file); 1898 struct ceph_inode_info *src_ci = ceph_inode(src_inode); 1899 struct ceph_inode_info *dst_ci = ceph_inode(dst_inode); 1900 struct ceph_cap_flush *prealloc_cf; 1901 struct ceph_object_locator src_oloc, dst_oloc; 1902 struct ceph_object_id src_oid, dst_oid; 1903 loff_t endoff = 0, size; 1904 ssize_t ret = -EIO; 1905 u64 src_objnum, dst_objnum, src_objoff, dst_objoff; 1906 u32 src_objlen, dst_objlen, object_size; 1907 int src_got = 0, dst_got = 0, err, dirty; 1908 bool do_final_copy = false; 1909 1910 if (src_inode == dst_inode) 1911 return -EINVAL; 1912 if (ceph_snap(dst_inode) != CEPH_NOSNAP) 1913 return -EROFS; 1914 1915 /* 1916 * Some of the checks below will return -EOPNOTSUPP, which will force a 1917 * fallback to the default VFS copy_file_range implementation. This is 1918 * desirable in several cases (for ex, the 'len' is smaller than the 1919 * size of the objects, or in cases where that would be more 1920 * efficient). 1921 */ 1922 1923 if (ceph_test_mount_opt(ceph_inode_to_client(src_inode), NOCOPYFROM)) 1924 return -EOPNOTSUPP; 1925 1926 if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) || 1927 (src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) || 1928 (src_ci->i_layout.object_size != dst_ci->i_layout.object_size)) 1929 return -EOPNOTSUPP; 1930 1931 if (len < src_ci->i_layout.object_size) 1932 return -EOPNOTSUPP; /* no remote copy will be done */ 1933 1934 prealloc_cf = ceph_alloc_cap_flush(); 1935 if (!prealloc_cf) 1936 return -ENOMEM; 1937 1938 /* Start by sync'ing the source and destination files */ 1939 ret = file_write_and_wait_range(src_file, src_off, (src_off + len)); 1940 if (ret < 0) { 1941 dout("failed to write src file (%zd)\n", ret); 1942 goto out; 1943 } 1944 ret = file_write_and_wait_range(dst_file, dst_off, (dst_off + len)); 1945 if (ret < 0) { 1946 dout("failed to write dst file (%zd)\n", ret); 1947 goto out; 1948 } 1949 1950 /* 1951 * We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other 1952 * clients may have dirty data in their caches. And OSDs know nothing 1953 * about caps, so they can't safely do the remote object copies. 1954 */ 1955 err = get_rd_wr_caps(src_ci, (src_off + len), &src_got, 1956 dst_ci, (dst_off + len), &dst_got); 1957 if (err < 0) { 1958 dout("get_rd_wr_caps returned %d\n", err); 1959 ret = -EOPNOTSUPP; 1960 goto out; 1961 } 1962 1963 ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len); 1964 if (ret < 0) 1965 goto out_caps; 1966 1967 size = i_size_read(dst_inode); 1968 endoff = dst_off + len; 1969 1970 /* Drop dst file cached pages */ 1971 ret = invalidate_inode_pages2_range(dst_inode->i_mapping, 1972 dst_off >> PAGE_SHIFT, 1973 endoff >> PAGE_SHIFT); 1974 if (ret < 0) { 1975 dout("Failed to invalidate inode pages (%zd)\n", ret); 1976 ret = 0; /* XXX */ 1977 } 1978 src_oloc.pool = src_ci->i_layout.pool_id; 1979 src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns); 1980 dst_oloc.pool = dst_ci->i_layout.pool_id; 1981 dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns); 1982 1983 ceph_calc_file_object_mapping(&src_ci->i_layout, src_off, 1984 src_ci->i_layout.object_size, 1985 &src_objnum, &src_objoff, &src_objlen); 1986 ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off, 1987 dst_ci->i_layout.object_size, 1988 &dst_objnum, &dst_objoff, &dst_objlen); 1989 /* object-level offsets need to the same */ 1990 if (src_objoff != dst_objoff) { 1991 ret = -EOPNOTSUPP; 1992 goto out_caps; 1993 } 1994 1995 /* 1996 * Do a manual copy if the object offset isn't object aligned. 1997 * 'src_objlen' contains the bytes left until the end of the object, 1998 * starting at the src_off 1999 */ 2000 if (src_objoff) { 2001 /* 2002 * we need to temporarily drop all caps as we'll be calling 2003 * {read,write}_iter, which will get caps again. 2004 */ 2005 put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got); 2006 ret = do_splice_direct(src_file, &src_off, dst_file, 2007 &dst_off, src_objlen, flags); 2008 if (ret < 0) { 2009 dout("do_splice_direct returned %d\n", err); 2010 goto out; 2011 } 2012 len -= ret; 2013 err = get_rd_wr_caps(src_ci, (src_off + len), 2014 &src_got, dst_ci, 2015 (dst_off + len), &dst_got); 2016 if (err < 0) 2017 goto out; 2018 err = is_file_size_ok(src_inode, dst_inode, 2019 src_off, dst_off, len); 2020 if (err < 0) 2021 goto out_caps; 2022 } 2023 object_size = src_ci->i_layout.object_size; 2024 while (len >= object_size) { 2025 ceph_calc_file_object_mapping(&src_ci->i_layout, src_off, 2026 object_size, &src_objnum, 2027 &src_objoff, &src_objlen); 2028 ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off, 2029 object_size, &dst_objnum, 2030 &dst_objoff, &dst_objlen); 2031 ceph_oid_init(&src_oid); 2032 ceph_oid_printf(&src_oid, "%llx.%08llx", 2033 src_ci->i_vino.ino, src_objnum); 2034 ceph_oid_init(&dst_oid); 2035 ceph_oid_printf(&dst_oid, "%llx.%08llx", 2036 dst_ci->i_vino.ino, dst_objnum); 2037 /* Do an object remote copy */ 2038 err = ceph_osdc_copy_from( 2039 &ceph_inode_to_client(src_inode)->client->osdc, 2040 src_ci->i_vino.snap, 0, 2041 &src_oid, &src_oloc, 2042 CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | 2043 CEPH_OSD_OP_FLAG_FADVISE_NOCACHE, 2044 &dst_oid, &dst_oloc, 2045 CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | 2046 CEPH_OSD_OP_FLAG_FADVISE_DONTNEED, 0); 2047 if (err) { 2048 dout("ceph_osdc_copy_from returned %d\n", err); 2049 if (!ret) 2050 ret = err; 2051 goto out_caps; 2052 } 2053 len -= object_size; 2054 src_off += object_size; 2055 dst_off += object_size; 2056 ret += object_size; 2057 } 2058 2059 if (len) 2060 /* We still need one final local copy */ 2061 do_final_copy = true; 2062 2063 file_update_time(dst_file); 2064 if (endoff > size) { 2065 int caps_flags = 0; 2066 2067 /* Let the MDS know about dst file size change */ 2068 if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff)) 2069 caps_flags |= CHECK_CAPS_NODELAY; 2070 if (ceph_inode_set_size(dst_inode, endoff)) 2071 caps_flags |= CHECK_CAPS_AUTHONLY; 2072 if (caps_flags) 2073 ceph_check_caps(dst_ci, caps_flags, NULL); 2074 } 2075 /* Mark Fw dirty */ 2076 spin_lock(&dst_ci->i_ceph_lock); 2077 dst_ci->i_inline_version = CEPH_INLINE_NONE; 2078 dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf); 2079 spin_unlock(&dst_ci->i_ceph_lock); 2080 if (dirty) 2081 __mark_inode_dirty(dst_inode, dirty); 2082 2083 out_caps: 2084 put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got); 2085 2086 if (do_final_copy) { 2087 err = do_splice_direct(src_file, &src_off, dst_file, 2088 &dst_off, len, flags); 2089 if (err < 0) { 2090 dout("do_splice_direct returned %d\n", err); 2091 goto out; 2092 } 2093 len -= err; 2094 ret += err; 2095 } 2096 2097 out: 2098 ceph_free_cap_flush(prealloc_cf); 2099 2100 return ret; 2101 } 2102 2103 const struct file_operations ceph_file_fops = { 2104 .open = ceph_open, 2105 .release = ceph_release, 2106 .llseek = ceph_llseek, 2107 .read_iter = ceph_read_iter, 2108 .write_iter = ceph_write_iter, 2109 .mmap = ceph_mmap, 2110 .fsync = ceph_fsync, 2111 .lock = ceph_lock, 2112 .flock = ceph_flock, 2113 .splice_read = generic_file_splice_read, 2114 .splice_write = iter_file_splice_write, 2115 .unlocked_ioctl = ceph_ioctl, 2116 .compat_ioctl = ceph_ioctl, 2117 .fallocate = ceph_fallocate, 2118 .copy_file_range = ceph_copy_file_range, 2119 }; 2120