1 // SPDX-License-Identifier: GPL-2.0 2 #include <linux/ceph/ceph_debug.h> 3 4 #include <linux/spinlock.h> 5 #include <linux/namei.h> 6 #include <linux/slab.h> 7 #include <linux/sched.h> 8 #include <linux/xattr.h> 9 10 #include "super.h" 11 #include "mds_client.h" 12 13 /* 14 * Directory operations: readdir, lookup, create, link, unlink, 15 * rename, etc. 16 */ 17 18 /* 19 * Ceph MDS operations are specified in terms of a base ino and 20 * relative path. Thus, the client can specify an operation on a 21 * specific inode (e.g., a getattr due to fstat(2)), or as a path 22 * relative to, say, the root directory. 23 * 24 * Normally, we limit ourselves to strict inode ops (no path component) 25 * or dentry operations (a single path component relative to an ino). The 26 * exception to this is open_root_dentry(), which will open the mount 27 * point by name. 28 */ 29 30 const struct dentry_operations ceph_dentry_ops; 31 32 static bool __dentry_lease_is_valid(struct ceph_dentry_info *di); 33 static int __dir_lease_try_check(const struct dentry *dentry); 34 35 /* 36 * Initialize ceph dentry state. 
37 */ 38 static int ceph_d_init(struct dentry *dentry) 39 { 40 struct ceph_dentry_info *di; 41 42 di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL); 43 if (!di) 44 return -ENOMEM; /* oh well */ 45 46 di->dentry = dentry; 47 di->lease_session = NULL; 48 di->time = jiffies; 49 dentry->d_fsdata = di; 50 INIT_LIST_HEAD(&di->lease_list); 51 return 0; 52 } 53 54 /* 55 * for f_pos for readdir: 56 * - hash order: 57 * (0xff << 52) | ((24 bits hash) << 28) | 58 * (the nth entry has hash collision); 59 * - frag+name order; 60 * ((frag value) << 28) | (the nth entry in frag); 61 */ 62 #define OFFSET_BITS 28 63 #define OFFSET_MASK ((1 << OFFSET_BITS) - 1) 64 #define HASH_ORDER (0xffull << (OFFSET_BITS + 24)) 65 loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order) 66 { 67 loff_t fpos = ((loff_t)high << 28) | (loff_t)off; 68 if (hash_order) 69 fpos |= HASH_ORDER; 70 return fpos; 71 } 72 73 static bool is_hash_order(loff_t p) 74 { 75 return (p & HASH_ORDER) == HASH_ORDER; 76 } 77 78 static unsigned fpos_frag(loff_t p) 79 { 80 return p >> OFFSET_BITS; 81 } 82 83 static unsigned fpos_hash(loff_t p) 84 { 85 return ceph_frag_value(fpos_frag(p)); 86 } 87 88 static unsigned fpos_off(loff_t p) 89 { 90 return p & OFFSET_MASK; 91 } 92 93 static int fpos_cmp(loff_t l, loff_t r) 94 { 95 int v = ceph_frag_compare(fpos_frag(l), fpos_frag(r)); 96 if (v) 97 return v; 98 return (int)(fpos_off(l) - fpos_off(r)); 99 } 100 101 /* 102 * make note of the last dentry we read, so we can 103 * continue at the same lexicographical point, 104 * regardless of what dir changes take place on the 105 * server. 
106 */ 107 static int note_last_dentry(struct ceph_dir_file_info *dfi, const char *name, 108 int len, unsigned next_offset) 109 { 110 char *buf = kmalloc(len+1, GFP_KERNEL); 111 if (!buf) 112 return -ENOMEM; 113 kfree(dfi->last_name); 114 dfi->last_name = buf; 115 memcpy(dfi->last_name, name, len); 116 dfi->last_name[len] = 0; 117 dfi->next_offset = next_offset; 118 dout("note_last_dentry '%s'\n", dfi->last_name); 119 return 0; 120 } 121 122 123 static struct dentry * 124 __dcache_find_get_entry(struct dentry *parent, u64 idx, 125 struct ceph_readdir_cache_control *cache_ctl) 126 { 127 struct inode *dir = d_inode(parent); 128 struct dentry *dentry; 129 unsigned idx_mask = (PAGE_SIZE / sizeof(struct dentry *)) - 1; 130 loff_t ptr_pos = idx * sizeof(struct dentry *); 131 pgoff_t ptr_pgoff = ptr_pos >> PAGE_SHIFT; 132 133 if (ptr_pos >= i_size_read(dir)) 134 return NULL; 135 136 if (!cache_ctl->page || ptr_pgoff != page_index(cache_ctl->page)) { 137 ceph_readdir_cache_release(cache_ctl); 138 cache_ctl->page = find_lock_page(&dir->i_data, ptr_pgoff); 139 if (!cache_ctl->page) { 140 dout(" page %lu not found\n", ptr_pgoff); 141 return ERR_PTR(-EAGAIN); 142 } 143 /* reading/filling the cache are serialized by 144 i_mutex, no need to use page lock */ 145 unlock_page(cache_ctl->page); 146 cache_ctl->dentries = kmap(cache_ctl->page); 147 } 148 149 cache_ctl->index = idx & idx_mask; 150 151 rcu_read_lock(); 152 spin_lock(&parent->d_lock); 153 /* check i_size again here, because empty directory can be 154 * marked as complete while not holding the i_mutex. */ 155 if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir)) 156 dentry = cache_ctl->dentries[cache_ctl->index]; 157 else 158 dentry = NULL; 159 spin_unlock(&parent->d_lock); 160 if (dentry && !lockref_get_not_dead(&dentry->d_lockref)) 161 dentry = NULL; 162 rcu_read_unlock(); 163 return dentry ? 
: ERR_PTR(-EAGAIN); 164 } 165 166 /* 167 * When possible, we try to satisfy a readdir by peeking at the 168 * dcache. We make this work by carefully ordering dentries on 169 * d_child when we initially get results back from the MDS, and 170 * falling back to a "normal" sync readdir if any dentries in the dir 171 * are dropped. 172 * 173 * Complete dir indicates that we have all dentries in the dir. It is 174 * defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by 175 * the MDS if/when the directory is modified). 176 */ 177 static int __dcache_readdir(struct file *file, struct dir_context *ctx, 178 int shared_gen) 179 { 180 struct ceph_dir_file_info *dfi = file->private_data; 181 struct dentry *parent = file->f_path.dentry; 182 struct inode *dir = d_inode(parent); 183 struct dentry *dentry, *last = NULL; 184 struct ceph_dentry_info *di; 185 struct ceph_readdir_cache_control cache_ctl = {}; 186 u64 idx = 0; 187 int err = 0; 188 189 dout("__dcache_readdir %p v%u at %llx\n", dir, (unsigned)shared_gen, ctx->pos); 190 191 /* search start position */ 192 if (ctx->pos > 2) { 193 u64 count = div_u64(i_size_read(dir), sizeof(struct dentry *)); 194 while (count > 0) { 195 u64 step = count >> 1; 196 dentry = __dcache_find_get_entry(parent, idx + step, 197 &cache_ctl); 198 if (!dentry) { 199 /* use linar search */ 200 idx = 0; 201 break; 202 } 203 if (IS_ERR(dentry)) { 204 err = PTR_ERR(dentry); 205 goto out; 206 } 207 di = ceph_dentry(dentry); 208 spin_lock(&dentry->d_lock); 209 if (fpos_cmp(di->offset, ctx->pos) < 0) { 210 idx += step + 1; 211 count -= step + 1; 212 } else { 213 count = step; 214 } 215 spin_unlock(&dentry->d_lock); 216 dput(dentry); 217 } 218 219 dout("__dcache_readdir %p cache idx %llu\n", dir, idx); 220 } 221 222 223 for (;;) { 224 bool emit_dentry = false; 225 dentry = __dcache_find_get_entry(parent, idx++, &cache_ctl); 226 if (!dentry) { 227 dfi->file_info.flags |= CEPH_F_ATEND; 228 err = 0; 229 break; 230 } 231 if (IS_ERR(dentry)) { 232 err 
= PTR_ERR(dentry); 233 goto out; 234 } 235 236 spin_lock(&dentry->d_lock); 237 di = ceph_dentry(dentry); 238 if (d_unhashed(dentry) || 239 d_really_is_negative(dentry) || 240 di->lease_shared_gen != shared_gen) { 241 spin_unlock(&dentry->d_lock); 242 dput(dentry); 243 err = -EAGAIN; 244 goto out; 245 } 246 if (fpos_cmp(ctx->pos, di->offset) <= 0) { 247 __ceph_dentry_dir_lease_touch(di); 248 emit_dentry = true; 249 } 250 spin_unlock(&dentry->d_lock); 251 252 if (emit_dentry) { 253 dout(" %llx dentry %p %pd %p\n", di->offset, 254 dentry, dentry, d_inode(dentry)); 255 ctx->pos = di->offset; 256 if (!dir_emit(ctx, dentry->d_name.name, 257 dentry->d_name.len, 258 ceph_translate_ino(dentry->d_sb, 259 d_inode(dentry)->i_ino), 260 d_inode(dentry)->i_mode >> 12)) { 261 dput(dentry); 262 err = 0; 263 break; 264 } 265 ctx->pos++; 266 267 if (last) 268 dput(last); 269 last = dentry; 270 } else { 271 dput(dentry); 272 } 273 } 274 out: 275 ceph_readdir_cache_release(&cache_ctl); 276 if (last) { 277 int ret; 278 di = ceph_dentry(last); 279 ret = note_last_dentry(dfi, last->d_name.name, last->d_name.len, 280 fpos_off(di->offset) + 1); 281 if (ret < 0) 282 err = ret; 283 dput(last); 284 /* last_name no longer match cache index */ 285 if (dfi->readdir_cache_idx >= 0) { 286 dfi->readdir_cache_idx = -1; 287 dfi->dir_release_count = 0; 288 } 289 } 290 return err; 291 } 292 293 static bool need_send_readdir(struct ceph_dir_file_info *dfi, loff_t pos) 294 { 295 if (!dfi->last_readdir) 296 return true; 297 if (is_hash_order(pos)) 298 return !ceph_frag_contains_value(dfi->frag, fpos_hash(pos)); 299 else 300 return dfi->frag != fpos_frag(pos); 301 } 302 303 static int ceph_readdir(struct file *file, struct dir_context *ctx) 304 { 305 struct ceph_dir_file_info *dfi = file->private_data; 306 struct inode *inode = file_inode(file); 307 struct ceph_inode_info *ci = ceph_inode(inode); 308 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 309 struct ceph_mds_client *mdsc = fsc->mdsc; 310 
int i; 311 int err; 312 unsigned frag = -1; 313 struct ceph_mds_reply_info_parsed *rinfo; 314 315 dout("readdir %p file %p pos %llx\n", inode, file, ctx->pos); 316 if (dfi->file_info.flags & CEPH_F_ATEND) 317 return 0; 318 319 /* always start with . and .. */ 320 if (ctx->pos == 0) { 321 dout("readdir off 0 -> '.'\n"); 322 if (!dir_emit(ctx, ".", 1, 323 ceph_translate_ino(inode->i_sb, inode->i_ino), 324 inode->i_mode >> 12)) 325 return 0; 326 ctx->pos = 1; 327 } 328 if (ctx->pos == 1) { 329 ino_t ino = parent_ino(file->f_path.dentry); 330 dout("readdir off 1 -> '..'\n"); 331 if (!dir_emit(ctx, "..", 2, 332 ceph_translate_ino(inode->i_sb, ino), 333 inode->i_mode >> 12)) 334 return 0; 335 ctx->pos = 2; 336 } 337 338 /* can we use the dcache? */ 339 spin_lock(&ci->i_ceph_lock); 340 if (ceph_test_mount_opt(fsc, DCACHE) && 341 !ceph_test_mount_opt(fsc, NOASYNCREADDIR) && 342 ceph_snap(inode) != CEPH_SNAPDIR && 343 __ceph_dir_is_complete_ordered(ci) && 344 __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) { 345 int shared_gen = atomic_read(&ci->i_shared_gen); 346 spin_unlock(&ci->i_ceph_lock); 347 err = __dcache_readdir(file, ctx, shared_gen); 348 if (err != -EAGAIN) 349 return err; 350 } else { 351 spin_unlock(&ci->i_ceph_lock); 352 } 353 354 /* proceed with a normal readdir */ 355 more: 356 /* do we have the correct frag content buffered? */ 357 if (need_send_readdir(dfi, ctx->pos)) { 358 struct ceph_mds_request *req; 359 int op = ceph_snap(inode) == CEPH_SNAPDIR ? 360 CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR; 361 362 /* discard old result, if any */ 363 if (dfi->last_readdir) { 364 ceph_mdsc_put_request(dfi->last_readdir); 365 dfi->last_readdir = NULL; 366 } 367 368 if (is_hash_order(ctx->pos)) { 369 /* fragtree isn't always accurate. choose frag 370 * based on previous reply when possible. 
*/ 371 if (frag == (unsigned)-1) 372 frag = ceph_choose_frag(ci, fpos_hash(ctx->pos), 373 NULL, NULL); 374 } else { 375 frag = fpos_frag(ctx->pos); 376 } 377 378 dout("readdir fetching %llx.%llx frag %x offset '%s'\n", 379 ceph_vinop(inode), frag, dfi->last_name); 380 req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); 381 if (IS_ERR(req)) 382 return PTR_ERR(req); 383 err = ceph_alloc_readdir_reply_buffer(req, inode); 384 if (err) { 385 ceph_mdsc_put_request(req); 386 return err; 387 } 388 /* hints to request -> mds selection code */ 389 req->r_direct_mode = USE_AUTH_MDS; 390 if (op == CEPH_MDS_OP_READDIR) { 391 req->r_direct_hash = ceph_frag_value(frag); 392 __set_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); 393 req->r_inode_drop = CEPH_CAP_FILE_EXCL; 394 } 395 if (dfi->last_name) { 396 req->r_path2 = kstrdup(dfi->last_name, GFP_KERNEL); 397 if (!req->r_path2) { 398 ceph_mdsc_put_request(req); 399 return -ENOMEM; 400 } 401 } else if (is_hash_order(ctx->pos)) { 402 req->r_args.readdir.offset_hash = 403 cpu_to_le32(fpos_hash(ctx->pos)); 404 } 405 406 req->r_dir_release_cnt = dfi->dir_release_count; 407 req->r_dir_ordered_cnt = dfi->dir_ordered_count; 408 req->r_readdir_cache_idx = dfi->readdir_cache_idx; 409 req->r_readdir_offset = dfi->next_offset; 410 req->r_args.readdir.frag = cpu_to_le32(frag); 411 req->r_args.readdir.flags = 412 cpu_to_le16(CEPH_READDIR_REPLY_BITFLAGS); 413 414 req->r_inode = inode; 415 ihold(inode); 416 req->r_dentry = dget(file->f_path.dentry); 417 err = ceph_mdsc_do_request(mdsc, NULL, req); 418 if (err < 0) { 419 ceph_mdsc_put_request(req); 420 return err; 421 } 422 dout("readdir got and parsed readdir result=%d on " 423 "frag %x, end=%d, complete=%d, hash_order=%d\n", 424 err, frag, 425 (int)req->r_reply_info.dir_end, 426 (int)req->r_reply_info.dir_complete, 427 (int)req->r_reply_info.hash_order); 428 429 rinfo = &req->r_reply_info; 430 if (le32_to_cpu(rinfo->dir_dir->frag) != frag) { 431 frag = le32_to_cpu(rinfo->dir_dir->frag); 
432 if (!rinfo->hash_order) { 433 dfi->next_offset = req->r_readdir_offset; 434 /* adjust ctx->pos to beginning of frag */ 435 ctx->pos = ceph_make_fpos(frag, 436 dfi->next_offset, 437 false); 438 } 439 } 440 441 dfi->frag = frag; 442 dfi->last_readdir = req; 443 444 if (test_bit(CEPH_MDS_R_DID_PREPOPULATE, &req->r_req_flags)) { 445 dfi->readdir_cache_idx = req->r_readdir_cache_idx; 446 if (dfi->readdir_cache_idx < 0) { 447 /* preclude from marking dir ordered */ 448 dfi->dir_ordered_count = 0; 449 } else if (ceph_frag_is_leftmost(frag) && 450 dfi->next_offset == 2) { 451 /* note dir version at start of readdir so 452 * we can tell if any dentries get dropped */ 453 dfi->dir_release_count = req->r_dir_release_cnt; 454 dfi->dir_ordered_count = req->r_dir_ordered_cnt; 455 } 456 } else { 457 dout("readdir !did_prepopulate\n"); 458 /* disable readdir cache */ 459 dfi->readdir_cache_idx = -1; 460 /* preclude from marking dir complete */ 461 dfi->dir_release_count = 0; 462 } 463 464 /* note next offset and last dentry name */ 465 if (rinfo->dir_nr > 0) { 466 struct ceph_mds_reply_dir_entry *rde = 467 rinfo->dir_entries + (rinfo->dir_nr-1); 468 unsigned next_offset = req->r_reply_info.dir_end ? 469 2 : (fpos_off(rde->offset) + 1); 470 err = note_last_dentry(dfi, rde->name, rde->name_len, 471 next_offset); 472 if (err) 473 return err; 474 } else if (req->r_reply_info.dir_end) { 475 dfi->next_offset = 2; 476 /* keep last name */ 477 } 478 } 479 480 rinfo = &dfi->last_readdir->r_reply_info; 481 dout("readdir frag %x num %d pos %llx chunk first %llx\n", 482 dfi->frag, rinfo->dir_nr, ctx->pos, 483 rinfo->dir_nr ? 
rinfo->dir_entries[0].offset : 0LL); 484 485 i = 0; 486 /* search start position */ 487 if (rinfo->dir_nr > 0) { 488 int step, nr = rinfo->dir_nr; 489 while (nr > 0) { 490 step = nr >> 1; 491 if (rinfo->dir_entries[i + step].offset < ctx->pos) { 492 i += step + 1; 493 nr -= step + 1; 494 } else { 495 nr = step; 496 } 497 } 498 } 499 for (; i < rinfo->dir_nr; i++) { 500 struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i; 501 struct ceph_vino vino; 502 ino_t ino; 503 u32 ftype; 504 505 BUG_ON(rde->offset < ctx->pos); 506 507 ctx->pos = rde->offset; 508 dout("readdir (%d/%d) -> %llx '%.*s' %p\n", 509 i, rinfo->dir_nr, ctx->pos, 510 rde->name_len, rde->name, &rde->inode.in); 511 512 BUG_ON(!rde->inode.in); 513 ftype = le32_to_cpu(rde->inode.in->mode) >> 12; 514 vino.ino = le64_to_cpu(rde->inode.in->ino); 515 vino.snap = le64_to_cpu(rde->inode.in->snapid); 516 ino = ceph_vino_to_ino(vino); 517 518 if (!dir_emit(ctx, rde->name, rde->name_len, 519 ceph_translate_ino(inode->i_sb, ino), ftype)) { 520 dout("filldir stopping us...\n"); 521 return 0; 522 } 523 ctx->pos++; 524 } 525 526 ceph_mdsc_put_request(dfi->last_readdir); 527 dfi->last_readdir = NULL; 528 529 if (dfi->next_offset > 2) { 530 frag = dfi->frag; 531 goto more; 532 } 533 534 /* more frags? */ 535 if (!ceph_frag_is_rightmost(dfi->frag)) { 536 frag = ceph_frag_next(dfi->frag); 537 if (is_hash_order(ctx->pos)) { 538 loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag), 539 dfi->next_offset, true); 540 if (new_pos > ctx->pos) 541 ctx->pos = new_pos; 542 /* keep last_name */ 543 } else { 544 ctx->pos = ceph_make_fpos(frag, dfi->next_offset, 545 false); 546 kfree(dfi->last_name); 547 dfi->last_name = NULL; 548 } 549 dout("readdir next frag is %x\n", frag); 550 goto more; 551 } 552 dfi->file_info.flags |= CEPH_F_ATEND; 553 554 /* 555 * if dir_release_count still matches the dir, no dentries 556 * were released during the whole readdir, and we should have 557 * the complete dir contents in our cache. 
558 */ 559 if (atomic64_read(&ci->i_release_count) == 560 dfi->dir_release_count) { 561 spin_lock(&ci->i_ceph_lock); 562 if (dfi->dir_ordered_count == 563 atomic64_read(&ci->i_ordered_count)) { 564 dout(" marking %p complete and ordered\n", inode); 565 /* use i_size to track number of entries in 566 * readdir cache */ 567 BUG_ON(dfi->readdir_cache_idx < 0); 568 i_size_write(inode, dfi->readdir_cache_idx * 569 sizeof(struct dentry*)); 570 } else { 571 dout(" marking %p complete\n", inode); 572 } 573 __ceph_dir_set_complete(ci, dfi->dir_release_count, 574 dfi->dir_ordered_count); 575 spin_unlock(&ci->i_ceph_lock); 576 } 577 578 dout("readdir %p file %p done.\n", inode, file); 579 return 0; 580 } 581 582 static void reset_readdir(struct ceph_dir_file_info *dfi) 583 { 584 if (dfi->last_readdir) { 585 ceph_mdsc_put_request(dfi->last_readdir); 586 dfi->last_readdir = NULL; 587 } 588 kfree(dfi->last_name); 589 dfi->last_name = NULL; 590 dfi->dir_release_count = 0; 591 dfi->readdir_cache_idx = -1; 592 dfi->next_offset = 2; /* compensate for . and .. */ 593 dfi->file_info.flags &= ~CEPH_F_ATEND; 594 } 595 596 /* 597 * discard buffered readdir content on seekdir(0), or seek to new frag, 598 * or seek prior to current chunk 599 */ 600 static bool need_reset_readdir(struct ceph_dir_file_info *dfi, loff_t new_pos) 601 { 602 struct ceph_mds_reply_info_parsed *rinfo; 603 loff_t chunk_offset; 604 if (new_pos == 0) 605 return true; 606 if (is_hash_order(new_pos)) { 607 /* no need to reset last_name for a forward seek when 608 * dentries are sotred in hash order */ 609 } else if (dfi->frag != fpos_frag(new_pos)) { 610 return true; 611 } 612 rinfo = dfi->last_readdir ? 
&dfi->last_readdir->r_reply_info : NULL; 613 if (!rinfo || !rinfo->dir_nr) 614 return true; 615 chunk_offset = rinfo->dir_entries[0].offset; 616 return new_pos < chunk_offset || 617 is_hash_order(new_pos) != is_hash_order(chunk_offset); 618 } 619 620 static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence) 621 { 622 struct ceph_dir_file_info *dfi = file->private_data; 623 struct inode *inode = file->f_mapping->host; 624 loff_t retval; 625 626 inode_lock(inode); 627 retval = -EINVAL; 628 switch (whence) { 629 case SEEK_CUR: 630 offset += file->f_pos; 631 case SEEK_SET: 632 break; 633 case SEEK_END: 634 retval = -EOPNOTSUPP; 635 default: 636 goto out; 637 } 638 639 if (offset >= 0) { 640 if (need_reset_readdir(dfi, offset)) { 641 dout("dir_llseek dropping %p content\n", file); 642 reset_readdir(dfi); 643 } else if (is_hash_order(offset) && offset > file->f_pos) { 644 /* for hash offset, we don't know if a forward seek 645 * is within same frag */ 646 dfi->dir_release_count = 0; 647 dfi->readdir_cache_idx = -1; 648 } 649 650 if (offset != file->f_pos) { 651 file->f_pos = offset; 652 file->f_version = 0; 653 dfi->file_info.flags &= ~CEPH_F_ATEND; 654 } 655 retval = offset; 656 } 657 out: 658 inode_unlock(inode); 659 return retval; 660 } 661 662 /* 663 * Handle lookups for the hidden .snap directory. 664 */ 665 int ceph_handle_snapdir(struct ceph_mds_request *req, 666 struct dentry *dentry, int err) 667 { 668 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); 669 struct inode *parent = d_inode(dentry->d_parent); /* we hold i_mutex */ 670 671 /* .snap dir? 
*/ 672 if (err == -ENOENT && 673 ceph_snap(parent) == CEPH_NOSNAP && 674 strcmp(dentry->d_name.name, 675 fsc->mount_options->snapdir_name) == 0) { 676 struct inode *inode = ceph_get_snapdir(parent); 677 dout("ENOENT on snapdir %p '%pd', linking to snapdir %p\n", 678 dentry, dentry, inode); 679 BUG_ON(!d_unhashed(dentry)); 680 d_add(dentry, inode); 681 err = 0; 682 } 683 return err; 684 } 685 686 /* 687 * Figure out final result of a lookup/open request. 688 * 689 * Mainly, make sure we return the final req->r_dentry (if it already 690 * existed) in place of the original VFS-provided dentry when they 691 * differ. 692 * 693 * Gracefully handle the case where the MDS replies with -ENOENT and 694 * no trace (which it may do, at its discretion, e.g., if it doesn't 695 * care to issue a lease on the negative dentry). 696 */ 697 struct dentry *ceph_finish_lookup(struct ceph_mds_request *req, 698 struct dentry *dentry, int err) 699 { 700 if (err == -ENOENT) { 701 /* no trace? */ 702 err = 0; 703 if (!req->r_reply_info.head->is_dentry) { 704 dout("ENOENT and no trace, dentry %p inode %p\n", 705 dentry, d_inode(dentry)); 706 if (d_really_is_positive(dentry)) { 707 d_drop(dentry); 708 err = -ENOENT; 709 } else { 710 d_add(dentry, NULL); 711 } 712 } 713 } 714 if (err) 715 dentry = ERR_PTR(err); 716 else if (dentry != req->r_dentry) 717 dentry = dget(req->r_dentry); /* we got spliced */ 718 else 719 dentry = NULL; 720 return dentry; 721 } 722 723 static bool is_root_ceph_dentry(struct inode *inode, struct dentry *dentry) 724 { 725 return ceph_ino(inode) == CEPH_INO_ROOT && 726 strncmp(dentry->d_name.name, ".ceph", 5) == 0; 727 } 728 729 /* 730 * Look up a single dir entry. If there is a lookup intent, inform 731 * the MDS so that it gets our 'caps wanted' value in a single op. 
732 */ 733 static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry, 734 unsigned int flags) 735 { 736 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); 737 struct ceph_mds_client *mdsc = fsc->mdsc; 738 struct ceph_mds_request *req; 739 int op; 740 int mask; 741 int err; 742 743 dout("lookup %p dentry %p '%pd'\n", 744 dir, dentry, dentry); 745 746 if (dentry->d_name.len > NAME_MAX) 747 return ERR_PTR(-ENAMETOOLONG); 748 749 /* can we conclude ENOENT locally? */ 750 if (d_really_is_negative(dentry)) { 751 struct ceph_inode_info *ci = ceph_inode(dir); 752 struct ceph_dentry_info *di = ceph_dentry(dentry); 753 754 spin_lock(&ci->i_ceph_lock); 755 dout(" dir %p flags are %d\n", dir, ci->i_ceph_flags); 756 if (strncmp(dentry->d_name.name, 757 fsc->mount_options->snapdir_name, 758 dentry->d_name.len) && 759 !is_root_ceph_dentry(dir, dentry) && 760 ceph_test_mount_opt(fsc, DCACHE) && 761 __ceph_dir_is_complete(ci) && 762 (__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) { 763 spin_unlock(&ci->i_ceph_lock); 764 dout(" dir %p complete, -ENOENT\n", dir); 765 d_add(dentry, NULL); 766 di->lease_shared_gen = atomic_read(&ci->i_shared_gen); 767 return NULL; 768 } 769 spin_unlock(&ci->i_ceph_lock); 770 } 771 772 op = ceph_snap(dir) == CEPH_SNAPDIR ? 
773 CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP; 774 req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS); 775 if (IS_ERR(req)) 776 return ERR_CAST(req); 777 req->r_dentry = dget(dentry); 778 req->r_num_caps = 2; 779 780 mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED; 781 if (ceph_security_xattr_wanted(dir)) 782 mask |= CEPH_CAP_XATTR_SHARED; 783 req->r_args.getattr.mask = cpu_to_le32(mask); 784 785 req->r_parent = dir; 786 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); 787 err = ceph_mdsc_do_request(mdsc, NULL, req); 788 err = ceph_handle_snapdir(req, dentry, err); 789 dentry = ceph_finish_lookup(req, dentry, err); 790 ceph_mdsc_put_request(req); /* will dput(dentry) */ 791 dout("lookup result=%p\n", dentry); 792 return dentry; 793 } 794 795 /* 796 * If we do a create but get no trace back from the MDS, follow up with 797 * a lookup (the VFS expects us to link up the provided dentry). 798 */ 799 int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry) 800 { 801 struct dentry *result = ceph_lookup(dir, dentry, 0); 802 803 if (result && !IS_ERR(result)) { 804 /* 805 * We created the item, then did a lookup, and found 806 * it was already linked to another inode we already 807 * had in our cache (and thus got spliced). To not 808 * confuse VFS (especially when inode is a directory), 809 * we don't link our dentry to that inode, return an 810 * error instead. 811 * 812 * This event should be rare and it happens only when 813 * we talk to old MDS. Recent MDS does not send traceless 814 * reply for request that creates new inode. 
815 */ 816 d_drop(result); 817 return -ESTALE; 818 } 819 return PTR_ERR(result); 820 } 821 822 static int ceph_mknod(struct inode *dir, struct dentry *dentry, 823 umode_t mode, dev_t rdev) 824 { 825 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); 826 struct ceph_mds_client *mdsc = fsc->mdsc; 827 struct ceph_mds_request *req; 828 struct ceph_acl_sec_ctx as_ctx = {}; 829 int err; 830 831 if (ceph_snap(dir) != CEPH_NOSNAP) 832 return -EROFS; 833 834 if (ceph_quota_is_max_files_exceeded(dir)) { 835 err = -EDQUOT; 836 goto out; 837 } 838 839 err = ceph_pre_init_acls(dir, &mode, &as_ctx); 840 if (err < 0) 841 goto out; 842 err = ceph_security_init_secctx(dentry, mode, &as_ctx); 843 if (err < 0) 844 goto out; 845 846 dout("mknod in dir %p dentry %p mode 0%ho rdev %d\n", 847 dir, dentry, mode, rdev); 848 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS); 849 if (IS_ERR(req)) { 850 err = PTR_ERR(req); 851 goto out; 852 } 853 req->r_dentry = dget(dentry); 854 req->r_num_caps = 2; 855 req->r_parent = dir; 856 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); 857 req->r_args.mknod.mode = cpu_to_le32(mode); 858 req->r_args.mknod.rdev = cpu_to_le32(rdev); 859 req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL; 860 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 861 if (as_ctx.pagelist) { 862 req->r_pagelist = as_ctx.pagelist; 863 as_ctx.pagelist = NULL; 864 } 865 err = ceph_mdsc_do_request(mdsc, dir, req); 866 if (!err && !req->r_reply_info.head->is_dentry) 867 err = ceph_handle_notrace_create(dir, dentry); 868 ceph_mdsc_put_request(req); 869 out: 870 if (!err) 871 ceph_init_inode_acls(d_inode(dentry), &as_ctx); 872 else 873 d_drop(dentry); 874 ceph_release_acl_sec_ctx(&as_ctx); 875 return err; 876 } 877 878 static int ceph_create(struct inode *dir, struct dentry *dentry, umode_t mode, 879 bool excl) 880 { 881 return ceph_mknod(dir, dentry, mode, 0); 882 } 883 884 static int ceph_symlink(struct inode *dir, struct dentry *dentry, 885 
const char *dest) 886 { 887 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); 888 struct ceph_mds_client *mdsc = fsc->mdsc; 889 struct ceph_mds_request *req; 890 struct ceph_acl_sec_ctx as_ctx = {}; 891 int err; 892 893 if (ceph_snap(dir) != CEPH_NOSNAP) 894 return -EROFS; 895 896 if (ceph_quota_is_max_files_exceeded(dir)) { 897 err = -EDQUOT; 898 goto out; 899 } 900 901 err = ceph_security_init_secctx(dentry, S_IFLNK | 0777, &as_ctx); 902 if (err < 0) 903 goto out; 904 905 dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest); 906 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS); 907 if (IS_ERR(req)) { 908 err = PTR_ERR(req); 909 goto out; 910 } 911 req->r_path2 = kstrdup(dest, GFP_KERNEL); 912 if (!req->r_path2) { 913 err = -ENOMEM; 914 ceph_mdsc_put_request(req); 915 goto out; 916 } 917 req->r_parent = dir; 918 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); 919 req->r_dentry = dget(dentry); 920 req->r_num_caps = 2; 921 req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL; 922 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 923 err = ceph_mdsc_do_request(mdsc, dir, req); 924 if (!err && !req->r_reply_info.head->is_dentry) 925 err = ceph_handle_notrace_create(dir, dentry); 926 ceph_mdsc_put_request(req); 927 out: 928 if (err) 929 d_drop(dentry); 930 ceph_release_acl_sec_ctx(&as_ctx); 931 return err; 932 } 933 934 static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode) 935 { 936 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); 937 struct ceph_mds_client *mdsc = fsc->mdsc; 938 struct ceph_mds_request *req; 939 struct ceph_acl_sec_ctx as_ctx = {}; 940 int err = -EROFS; 941 int op; 942 943 if (ceph_snap(dir) == CEPH_SNAPDIR) { 944 /* mkdir .snap/foo is a MKSNAP */ 945 op = CEPH_MDS_OP_MKSNAP; 946 dout("mksnap dir %p snap '%pd' dn %p\n", dir, 947 dentry, dentry); 948 } else if (ceph_snap(dir) == CEPH_NOSNAP) { 949 dout("mkdir dir %p dn %p mode 0%ho\n", dir, dentry, mode); 950 op = 
CEPH_MDS_OP_MKDIR; 951 } else { 952 goto out; 953 } 954 955 if (op == CEPH_MDS_OP_MKDIR && 956 ceph_quota_is_max_files_exceeded(dir)) { 957 err = -EDQUOT; 958 goto out; 959 } 960 961 mode |= S_IFDIR; 962 err = ceph_pre_init_acls(dir, &mode, &as_ctx); 963 if (err < 0) 964 goto out; 965 err = ceph_security_init_secctx(dentry, mode, &as_ctx); 966 if (err < 0) 967 goto out; 968 969 req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); 970 if (IS_ERR(req)) { 971 err = PTR_ERR(req); 972 goto out; 973 } 974 975 req->r_dentry = dget(dentry); 976 req->r_num_caps = 2; 977 req->r_parent = dir; 978 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); 979 req->r_args.mkdir.mode = cpu_to_le32(mode); 980 req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL; 981 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 982 if (as_ctx.pagelist) { 983 req->r_pagelist = as_ctx.pagelist; 984 as_ctx.pagelist = NULL; 985 } 986 err = ceph_mdsc_do_request(mdsc, dir, req); 987 if (!err && 988 !req->r_reply_info.head->is_target && 989 !req->r_reply_info.head->is_dentry) 990 err = ceph_handle_notrace_create(dir, dentry); 991 ceph_mdsc_put_request(req); 992 out: 993 if (!err) 994 ceph_init_inode_acls(d_inode(dentry), &as_ctx); 995 else 996 d_drop(dentry); 997 ceph_release_acl_sec_ctx(&as_ctx); 998 return err; 999 } 1000 1001 static int ceph_link(struct dentry *old_dentry, struct inode *dir, 1002 struct dentry *dentry) 1003 { 1004 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); 1005 struct ceph_mds_client *mdsc = fsc->mdsc; 1006 struct ceph_mds_request *req; 1007 int err; 1008 1009 if (ceph_snap(dir) != CEPH_NOSNAP) 1010 return -EROFS; 1011 1012 dout("link in dir %p old_dentry %p dentry %p\n", dir, 1013 old_dentry, dentry); 1014 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LINK, USE_AUTH_MDS); 1015 if (IS_ERR(req)) { 1016 d_drop(dentry); 1017 return PTR_ERR(req); 1018 } 1019 req->r_dentry = dget(dentry); 1020 req->r_num_caps = 2; 1021 req->r_old_dentry = dget(old_dentry); 1022 
req->r_parent = dir; 1023 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); 1024 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 1025 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 1026 /* release LINK_SHARED on source inode (mds will lock it) */ 1027 req->r_old_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL; 1028 err = ceph_mdsc_do_request(mdsc, dir, req); 1029 if (err) { 1030 d_drop(dentry); 1031 } else if (!req->r_reply_info.head->is_dentry) { 1032 ihold(d_inode(old_dentry)); 1033 d_instantiate(dentry, d_inode(old_dentry)); 1034 } 1035 ceph_mdsc_put_request(req); 1036 return err; 1037 } 1038 1039 /* 1040 * rmdir and unlink are differ only by the metadata op code 1041 */ 1042 static int ceph_unlink(struct inode *dir, struct dentry *dentry) 1043 { 1044 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); 1045 struct ceph_mds_client *mdsc = fsc->mdsc; 1046 struct inode *inode = d_inode(dentry); 1047 struct ceph_mds_request *req; 1048 int err = -EROFS; 1049 int op; 1050 1051 if (ceph_snap(dir) == CEPH_SNAPDIR) { 1052 /* rmdir .snap/foo is RMSNAP */ 1053 dout("rmsnap dir %p '%pd' dn %p\n", dir, dentry, dentry); 1054 op = CEPH_MDS_OP_RMSNAP; 1055 } else if (ceph_snap(dir) == CEPH_NOSNAP) { 1056 dout("unlink/rmdir dir %p dn %p inode %p\n", 1057 dir, dentry, inode); 1058 op = d_is_dir(dentry) ? 
1059 CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK; 1060 } else 1061 goto out; 1062 req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); 1063 if (IS_ERR(req)) { 1064 err = PTR_ERR(req); 1065 goto out; 1066 } 1067 req->r_dentry = dget(dentry); 1068 req->r_num_caps = 2; 1069 req->r_parent = dir; 1070 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); 1071 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 1072 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 1073 req->r_inode_drop = ceph_drop_caps_for_unlink(inode); 1074 err = ceph_mdsc_do_request(mdsc, dir, req); 1075 if (!err && !req->r_reply_info.head->is_dentry) 1076 d_delete(dentry); 1077 ceph_mdsc_put_request(req); 1078 out: 1079 return err; 1080 } 1081 1082 static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, 1083 struct inode *new_dir, struct dentry *new_dentry, 1084 unsigned int flags) 1085 { 1086 struct ceph_fs_client *fsc = ceph_sb_to_client(old_dir->i_sb); 1087 struct ceph_mds_client *mdsc = fsc->mdsc; 1088 struct ceph_mds_request *req; 1089 int op = CEPH_MDS_OP_RENAME; 1090 int err; 1091 1092 if (flags) 1093 return -EINVAL; 1094 1095 if (ceph_snap(old_dir) != ceph_snap(new_dir)) 1096 return -EXDEV; 1097 if (ceph_snap(old_dir) != CEPH_NOSNAP) { 1098 if (old_dir == new_dir && ceph_snap(old_dir) == CEPH_SNAPDIR) 1099 op = CEPH_MDS_OP_RENAMESNAP; 1100 else 1101 return -EROFS; 1102 } 1103 /* don't allow cross-quota renames */ 1104 if ((old_dir != new_dir) && 1105 (!ceph_quota_is_same_realm(old_dir, new_dir))) 1106 return -EXDEV; 1107 1108 dout("rename dir %p dentry %p to dir %p dentry %p\n", 1109 old_dir, old_dentry, new_dir, new_dentry); 1110 req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); 1111 if (IS_ERR(req)) 1112 return PTR_ERR(req); 1113 ihold(old_dir); 1114 req->r_dentry = dget(new_dentry); 1115 req->r_num_caps = 2; 1116 req->r_old_dentry = dget(old_dentry); 1117 req->r_old_dentry_dir = old_dir; 1118 req->r_parent = new_dir; 1119 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); 1120 
req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED; 1121 req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL; 1122 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 1123 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 1124 /* release LINK_RDCACHE on source inode (mds will lock it) */ 1125 req->r_old_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL; 1126 if (d_really_is_positive(new_dentry)) { 1127 req->r_inode_drop = 1128 ceph_drop_caps_for_unlink(d_inode(new_dentry)); 1129 } 1130 err = ceph_mdsc_do_request(mdsc, old_dir, req); 1131 if (!err && !req->r_reply_info.head->is_dentry) { 1132 /* 1133 * Normally d_move() is done by fill_trace (called by 1134 * do_request, above). If there is no trace, we need 1135 * to do it here. 1136 */ 1137 d_move(old_dentry, new_dentry); 1138 } 1139 ceph_mdsc_put_request(req); 1140 return err; 1141 } 1142 1143 /* 1144 * Move dentry to tail of mdsc->dentry_leases list when lease is updated. 1145 * Leases at front of the list will expire first. (Assume all leases have 1146 * similar duration) 1147 * 1148 * Called under dentry->d_lock. 
1149 */ 1150 void __ceph_dentry_lease_touch(struct ceph_dentry_info *di) 1151 { 1152 struct dentry *dn = di->dentry; 1153 struct ceph_mds_client *mdsc; 1154 1155 dout("dentry_lease_touch %p %p '%pd'\n", di, dn, dn); 1156 1157 di->flags |= CEPH_DENTRY_LEASE_LIST; 1158 if (di->flags & CEPH_DENTRY_SHRINK_LIST) { 1159 di->flags |= CEPH_DENTRY_REFERENCED; 1160 return; 1161 } 1162 1163 mdsc = ceph_sb_to_client(dn->d_sb)->mdsc; 1164 spin_lock(&mdsc->dentry_list_lock); 1165 list_move_tail(&di->lease_list, &mdsc->dentry_leases); 1166 spin_unlock(&mdsc->dentry_list_lock); 1167 } 1168 1169 static void __dentry_dir_lease_touch(struct ceph_mds_client* mdsc, 1170 struct ceph_dentry_info *di) 1171 { 1172 di->flags &= ~(CEPH_DENTRY_LEASE_LIST | CEPH_DENTRY_REFERENCED); 1173 di->lease_gen = 0; 1174 di->time = jiffies; 1175 list_move_tail(&di->lease_list, &mdsc->dentry_dir_leases); 1176 } 1177 1178 /* 1179 * When dir lease is used, add dentry to tail of mdsc->dentry_dir_leases 1180 * list if it's not in the list, otherwise set 'referenced' flag. 1181 * 1182 * Called under dentry->d_lock. 
1183 */ 1184 void __ceph_dentry_dir_lease_touch(struct ceph_dentry_info *di) 1185 { 1186 struct dentry *dn = di->dentry; 1187 struct ceph_mds_client *mdsc; 1188 1189 dout("dentry_dir_lease_touch %p %p '%pd' (offset %lld)\n", 1190 di, dn, dn, di->offset); 1191 1192 if (!list_empty(&di->lease_list)) { 1193 if (di->flags & CEPH_DENTRY_LEASE_LIST) { 1194 /* don't remove dentry from dentry lease list 1195 * if its lease is valid */ 1196 if (__dentry_lease_is_valid(di)) 1197 return; 1198 } else { 1199 di->flags |= CEPH_DENTRY_REFERENCED; 1200 return; 1201 } 1202 } 1203 1204 if (di->flags & CEPH_DENTRY_SHRINK_LIST) { 1205 di->flags |= CEPH_DENTRY_REFERENCED; 1206 di->flags &= ~CEPH_DENTRY_LEASE_LIST; 1207 return; 1208 } 1209 1210 mdsc = ceph_sb_to_client(dn->d_sb)->mdsc; 1211 spin_lock(&mdsc->dentry_list_lock); 1212 __dentry_dir_lease_touch(mdsc, di), 1213 spin_unlock(&mdsc->dentry_list_lock); 1214 } 1215 1216 static void __dentry_lease_unlist(struct ceph_dentry_info *di) 1217 { 1218 struct ceph_mds_client *mdsc; 1219 if (di->flags & CEPH_DENTRY_SHRINK_LIST) 1220 return; 1221 if (list_empty(&di->lease_list)) 1222 return; 1223 1224 mdsc = ceph_sb_to_client(di->dentry->d_sb)->mdsc; 1225 spin_lock(&mdsc->dentry_list_lock); 1226 list_del_init(&di->lease_list); 1227 spin_unlock(&mdsc->dentry_list_lock); 1228 } 1229 1230 enum { 1231 KEEP = 0, 1232 DELETE = 1, 1233 TOUCH = 2, 1234 STOP = 4, 1235 }; 1236 1237 struct ceph_lease_walk_control { 1238 bool dir_lease; 1239 bool expire_dir_lease; 1240 unsigned long nr_to_scan; 1241 unsigned long dir_lease_ttl; 1242 }; 1243 1244 static unsigned long 1245 __dentry_leases_walk(struct ceph_mds_client *mdsc, 1246 struct ceph_lease_walk_control *lwc, 1247 int (*check)(struct dentry*, void*)) 1248 { 1249 struct ceph_dentry_info *di, *tmp; 1250 struct dentry *dentry, *last = NULL; 1251 struct list_head* list; 1252 LIST_HEAD(dispose); 1253 unsigned long freed = 0; 1254 int ret = 0; 1255 1256 list = lwc->dir_lease ? 
&mdsc->dentry_dir_leases : &mdsc->dentry_leases; 1257 spin_lock(&mdsc->dentry_list_lock); 1258 list_for_each_entry_safe(di, tmp, list, lease_list) { 1259 if (!lwc->nr_to_scan) 1260 break; 1261 --lwc->nr_to_scan; 1262 1263 dentry = di->dentry; 1264 if (last == dentry) 1265 break; 1266 1267 if (!spin_trylock(&dentry->d_lock)) 1268 continue; 1269 1270 if (__lockref_is_dead(&dentry->d_lockref)) { 1271 list_del_init(&di->lease_list); 1272 goto next; 1273 } 1274 1275 ret = check(dentry, lwc); 1276 if (ret & TOUCH) { 1277 /* move it into tail of dir lease list */ 1278 __dentry_dir_lease_touch(mdsc, di); 1279 if (!last) 1280 last = dentry; 1281 } 1282 if (ret & DELETE) { 1283 /* stale lease */ 1284 di->flags &= ~CEPH_DENTRY_REFERENCED; 1285 if (dentry->d_lockref.count > 0) { 1286 /* update_dentry_lease() will re-add 1287 * it to lease list, or 1288 * ceph_d_delete() will return 1 when 1289 * last reference is dropped */ 1290 list_del_init(&di->lease_list); 1291 } else { 1292 di->flags |= CEPH_DENTRY_SHRINK_LIST; 1293 list_move_tail(&di->lease_list, &dispose); 1294 dget_dlock(dentry); 1295 } 1296 } 1297 next: 1298 spin_unlock(&dentry->d_lock); 1299 if (ret & STOP) 1300 break; 1301 } 1302 spin_unlock(&mdsc->dentry_list_lock); 1303 1304 while (!list_empty(&dispose)) { 1305 di = list_first_entry(&dispose, struct ceph_dentry_info, 1306 lease_list); 1307 dentry = di->dentry; 1308 spin_lock(&dentry->d_lock); 1309 1310 list_del_init(&di->lease_list); 1311 di->flags &= ~CEPH_DENTRY_SHRINK_LIST; 1312 if (di->flags & CEPH_DENTRY_REFERENCED) { 1313 spin_lock(&mdsc->dentry_list_lock); 1314 if (di->flags & CEPH_DENTRY_LEASE_LIST) { 1315 list_add_tail(&di->lease_list, 1316 &mdsc->dentry_leases); 1317 } else { 1318 __dentry_dir_lease_touch(mdsc, di); 1319 } 1320 spin_unlock(&mdsc->dentry_list_lock); 1321 } else { 1322 freed++; 1323 } 1324 1325 spin_unlock(&dentry->d_lock); 1326 /* ceph_d_delete() does the trick */ 1327 dput(dentry); 1328 } 1329 return freed; 1330 } 1331 1332 static int 
__dentry_lease_check(struct dentry *dentry, void *arg) 1333 { 1334 struct ceph_dentry_info *di = ceph_dentry(dentry); 1335 int ret; 1336 1337 if (__dentry_lease_is_valid(di)) 1338 return STOP; 1339 ret = __dir_lease_try_check(dentry); 1340 if (ret == -EBUSY) 1341 return KEEP; 1342 if (ret > 0) 1343 return TOUCH; 1344 return DELETE; 1345 } 1346 1347 static int __dir_lease_check(struct dentry *dentry, void *arg) 1348 { 1349 struct ceph_lease_walk_control *lwc = arg; 1350 struct ceph_dentry_info *di = ceph_dentry(dentry); 1351 1352 int ret = __dir_lease_try_check(dentry); 1353 if (ret == -EBUSY) 1354 return KEEP; 1355 if (ret > 0) { 1356 if (time_before(jiffies, di->time + lwc->dir_lease_ttl)) 1357 return STOP; 1358 /* Move dentry to tail of dir lease list if we don't want 1359 * to delete it. So dentries in the list are checked in a 1360 * round robin manner */ 1361 if (!lwc->expire_dir_lease) 1362 return TOUCH; 1363 if (dentry->d_lockref.count > 0 || 1364 (di->flags & CEPH_DENTRY_REFERENCED)) 1365 return TOUCH; 1366 /* invalidate dir lease */ 1367 di->lease_shared_gen = 0; 1368 } 1369 return DELETE; 1370 } 1371 1372 int ceph_trim_dentries(struct ceph_mds_client *mdsc) 1373 { 1374 struct ceph_lease_walk_control lwc; 1375 unsigned long count; 1376 unsigned long freed; 1377 1378 spin_lock(&mdsc->caps_list_lock); 1379 if (mdsc->caps_use_max > 0 && 1380 mdsc->caps_use_count > mdsc->caps_use_max) 1381 count = mdsc->caps_use_count - mdsc->caps_use_max; 1382 else 1383 count = 0; 1384 spin_unlock(&mdsc->caps_list_lock); 1385 1386 lwc.dir_lease = false; 1387 lwc.nr_to_scan = CEPH_CAPS_PER_RELEASE * 2; 1388 freed = __dentry_leases_walk(mdsc, &lwc, __dentry_lease_check); 1389 if (!lwc.nr_to_scan) /* more invalid leases */ 1390 return -EAGAIN; 1391 1392 if (lwc.nr_to_scan < CEPH_CAPS_PER_RELEASE) 1393 lwc.nr_to_scan = CEPH_CAPS_PER_RELEASE; 1394 1395 lwc.dir_lease = true; 1396 lwc.expire_dir_lease = freed < count; 1397 lwc.dir_lease_ttl = 
mdsc->fsc->mount_options->caps_wanted_delay_max * HZ; 1398 freed +=__dentry_leases_walk(mdsc, &lwc, __dir_lease_check); 1399 if (!lwc.nr_to_scan) /* more to check */ 1400 return -EAGAIN; 1401 1402 return freed > 0 ? 1 : 0; 1403 } 1404 1405 /* 1406 * Ensure a dentry lease will no longer revalidate. 1407 */ 1408 void ceph_invalidate_dentry_lease(struct dentry *dentry) 1409 { 1410 struct ceph_dentry_info *di = ceph_dentry(dentry); 1411 spin_lock(&dentry->d_lock); 1412 di->time = jiffies; 1413 di->lease_shared_gen = 0; 1414 __dentry_lease_unlist(di); 1415 spin_unlock(&dentry->d_lock); 1416 } 1417 1418 /* 1419 * Check if dentry lease is valid. If not, delete the lease. Try to 1420 * renew if the least is more than half up. 1421 */ 1422 static bool __dentry_lease_is_valid(struct ceph_dentry_info *di) 1423 { 1424 struct ceph_mds_session *session; 1425 1426 if (!di->lease_gen) 1427 return false; 1428 1429 session = di->lease_session; 1430 if (session) { 1431 u32 gen; 1432 unsigned long ttl; 1433 1434 spin_lock(&session->s_gen_ttl_lock); 1435 gen = session->s_cap_gen; 1436 ttl = session->s_cap_ttl; 1437 spin_unlock(&session->s_gen_ttl_lock); 1438 1439 if (di->lease_gen == gen && 1440 time_before(jiffies, ttl) && 1441 time_before(jiffies, di->time)) 1442 return true; 1443 } 1444 di->lease_gen = 0; 1445 return false; 1446 } 1447 1448 static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags) 1449 { 1450 struct ceph_dentry_info *di; 1451 struct ceph_mds_session *session = NULL; 1452 u32 seq = 0; 1453 int valid = 0; 1454 1455 spin_lock(&dentry->d_lock); 1456 di = ceph_dentry(dentry); 1457 if (di && __dentry_lease_is_valid(di)) { 1458 valid = 1; 1459 1460 if (di->lease_renew_after && 1461 time_after(jiffies, di->lease_renew_after)) { 1462 /* 1463 * We should renew. If we're in RCU walk mode 1464 * though, we can't do that so just return 1465 * -ECHILD. 
1466 */ 1467 if (flags & LOOKUP_RCU) { 1468 valid = -ECHILD; 1469 } else { 1470 session = ceph_get_mds_session(di->lease_session); 1471 seq = di->lease_seq; 1472 di->lease_renew_after = 0; 1473 di->lease_renew_from = jiffies; 1474 } 1475 } 1476 } 1477 spin_unlock(&dentry->d_lock); 1478 1479 if (session) { 1480 ceph_mdsc_lease_send_msg(session, dentry, 1481 CEPH_MDS_LEASE_RENEW, seq); 1482 ceph_put_mds_session(session); 1483 } 1484 dout("dentry_lease_is_valid - dentry %p = %d\n", dentry, valid); 1485 return valid; 1486 } 1487 1488 /* 1489 * Called under dentry->d_lock. 1490 */ 1491 static int __dir_lease_try_check(const struct dentry *dentry) 1492 { 1493 struct ceph_dentry_info *di = ceph_dentry(dentry); 1494 struct inode *dir; 1495 struct ceph_inode_info *ci; 1496 int valid = 0; 1497 1498 if (!di->lease_shared_gen) 1499 return 0; 1500 if (IS_ROOT(dentry)) 1501 return 0; 1502 1503 dir = d_inode(dentry->d_parent); 1504 ci = ceph_inode(dir); 1505 1506 if (spin_trylock(&ci->i_ceph_lock)) { 1507 if (atomic_read(&ci->i_shared_gen) == di->lease_shared_gen && 1508 __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 0)) 1509 valid = 1; 1510 spin_unlock(&ci->i_ceph_lock); 1511 } else { 1512 valid = -EBUSY; 1513 } 1514 1515 if (!valid) 1516 di->lease_shared_gen = 0; 1517 return valid; 1518 } 1519 1520 /* 1521 * Check if directory-wide content lease/cap is valid. 
*/
static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry)
{
	struct ceph_inode_info *ci = ceph_inode(dir);
	int valid;
	int shared_gen;

	/* sample cap state and generation together under i_ceph_lock */
	spin_lock(&ci->i_ceph_lock);
	valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1);
	shared_gen = atomic_read(&ci->i_shared_gen);
	spin_unlock(&ci->i_ceph_lock);
	if (valid) {
		struct ceph_dentry_info *di;
		spin_lock(&dentry->d_lock);
		di = ceph_dentry(dentry);
		/*
		 * Only trust the dentry if it is still a child of @dir and
		 * was set up under the generation sampled above.
		 */
		if (dir == d_inode(dentry->d_parent) &&
		    di && di->lease_shared_gen == shared_gen)
			__ceph_dentry_dir_lease_touch(di);
		else
			valid = 0;
		spin_unlock(&dentry->d_lock);
	}
	dout("dir_lease_is_valid dir %p v%u dentry %p = %d\n",
	     dir, (unsigned)atomic_read(&ci->i_shared_gen), dentry, valid);
	return valid;
}

/*
 * Check if cached dentry can be trusted.
 *
 * Returns non-zero if the dentry is valid, 0 if it is not, and -ECHILD
 * when LOOKUP_RCU is set in @flags and the answer cannot be obtained
 * without blocking (lease renewal or an MDS lookup would be needed, or
 * the parent has no inode).
 */
static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
{
	int valid = 0;
	struct dentry *parent;
	struct inode *dir, *inode;

	if (flags & LOOKUP_RCU) {
		/* RCU walk: no reference is taken on the parent */
		parent = READ_ONCE(dentry->d_parent);
		dir = d_inode_rcu(parent);
		if (!dir)
			return -ECHILD;
		inode = d_inode_rcu(dentry);
	} else {
		/* ref walk: pin the parent; released by dput() at the end */
		parent = dget_parent(dentry);
		dir = d_inode(parent);
		inode = d_inode(dentry);
	}

	dout("d_revalidate %p '%pd' inode %p offset %lld\n", dentry,
	     dentry, inode, ceph_dentry(dentry)->offset);

	/* always trust cached snapped dentries, snapdir dentry */
	if (ceph_snap(dir) != CEPH_NOSNAP) {
		dout("d_revalidate %p '%pd' inode %p is SNAPPED\n", dentry,
		     dentry, inode);
		valid = 1;
	} else if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
		valid = 1;
	} else {
		valid = dentry_lease_is_valid(dentry, flags);
		/*
		 * -ECHILD is only produced when LOOKUP_RCU is set, so no
		 * parent reference is held on this return path.
		 */
		if (valid == -ECHILD)
			return valid;
		if (valid || dir_lease_is_valid(dir, dentry)) {
			/* a positive dentry also needs caps on its inode */
			if (inode)
				valid = ceph_is_any_caps(inode);
			else
				valid = 1;
		}
	}

	if (!valid) {
		/* nothing cached vouches for the dentry: ask the MDS */
		struct ceph_mds_client *mdsc =
			ceph_sb_to_client(dir->i_sb)->mdsc;
		struct ceph_mds_request *req;
		int op, err;
		u32 mask;

		/* a request may block, which RCU walk does not allow */
		if (flags & LOOKUP_RCU)
			return -ECHILD;

		/*
		 * NOTE(review): a dir with ceph_snap(dir) != CEPH_NOSNAP was
		 * already declared valid above, so the LOOKUPSNAP arm looks
		 * unreachable from here - confirm.
		 */
		op = ceph_snap(dir) == CEPH_SNAPDIR ?
			CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
		req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
		/* if no request could be created, the dentry stays invalid */
		if (!IS_ERR(req)) {
			req->r_dentry = dget(dentry);
			req->r_num_caps = 2;
			req->r_parent = dir;

			mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
			if (ceph_security_xattr_wanted(dir))
				mask |= CEPH_CAP_XATTR_SHARED;
			req->r_args.getattr.mask = cpu_to_le32(mask);

			err = ceph_mdsc_do_request(mdsc, NULL, req);
			switch (err) {
			case 0:
				/* valid only if it still names the same inode */
				if (d_really_is_positive(dentry) &&
				    d_inode(dentry) == req->r_target_inode)
					valid = 1;
				break;
			case -ENOENT:
				/* a negative dentry is confirmed by ENOENT */
				if (d_really_is_negative(dentry))
					valid = 1;
				/* Fallthrough */
			default:
				break;
			}
			ceph_mdsc_put_request(req);
			dout("d_revalidate %p lookup result=%d\n",
			     dentry, err);
		}
	}

	dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
	if (!valid)
		ceph_dir_clear_complete(dir);

	if (!(flags & LOOKUP_RCU))
		dput(parent);
	return valid;
}

/*
 * Delete unused dentry that doesn't have valid lease
 *
 * Returns 1 if the dentry should be deleted, 0 if it should be kept.
 *
 * Called under dentry->d_lock.
 */
static int ceph_d_delete(const struct dentry *dentry)
{
	struct ceph_dentry_info *di;

	/* won't release caps */
	if (d_really_is_negative(dentry))
		return 0;
	if (ceph_snap(d_inode(dentry)) != CEPH_NOSNAP)
		return 0;
	/* valid lease? */
	di = ceph_dentry(dentry);
	if (di) {
		if (__dentry_lease_is_valid(di))
			return 0;
		/* non-zero covers -EBUSY too: when unsure, keep the dentry */
		if (__dir_lease_try_check(dentry))
			return 0;
	}
	return 1;
}

/*
 * Release our ceph_dentry_info.
 */
static void ceph_d_release(struct dentry *dentry)
{
	struct ceph_dentry_info *di = ceph_dentry(dentry);

	dout("d_release %p\n", dentry);

	/* unhook from the lease lists and detach from the dentry */
	spin_lock(&dentry->d_lock);
	__dentry_lease_unlist(di);
	dentry->d_fsdata = NULL;
	spin_unlock(&dentry->d_lock);

	/* drop the session reference held for the lease, if any */
	if (di->lease_session)
		ceph_put_mds_session(di->lease_session);
	kmem_cache_free(ceph_dentry_cachep, di);
}

/*
 * When the VFS prunes a dentry from the cache, we need to clear the
 * complete flag on the parent directory.
 *
 * Called under dentry->d_lock.
 */
static void ceph_d_prune(struct dentry *dentry)
{
	struct ceph_inode_info *dir_ci;
	struct ceph_dentry_info *di;

	dout("ceph_d_prune %pd %p\n", dentry, dentry);

	/* do we have a valid parent? */
	if (IS_ROOT(dentry))
		return;

	/* we hold d_lock, so d_parent is stable */
	dir_ci = ceph_inode(d_inode(dentry->d_parent));
	if (dir_ci->i_vino.snap == CEPH_SNAPDIR)
		return;

	/* who calls d_delete() should also disable dcache readdir */
	if (d_really_is_negative(dentry))
		return;

	/* d_fsdata does not get cleared until d_release */
	if (!d_unhashed(dentry)) {
		/* a hashed entry is going away: the dir is no longer complete */
		__ceph_dir_clear_complete(dir_ci);
		return;
	}

	/* Disable dcache readdir just in case that someone called d_drop()
	 * or d_invalidate(), but MDS didn't revoke CEPH_CAP_FILE_SHARED
	 * properly (dcache readdir is still enabled) */
	di = ceph_dentry(dentry);
	if (di->offset > 0 &&
	    di->lease_shared_gen == atomic_read(&dir_ci->i_shared_gen))
		__ceph_dir_clear_ordered(dir_ci);
}

/*
 * read() on a dir.
This weird interface hack only works if mounted 1731 * with '-o dirstat'. 1732 */ 1733 static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size, 1734 loff_t *ppos) 1735 { 1736 struct ceph_dir_file_info *dfi = file->private_data; 1737 struct inode *inode = file_inode(file); 1738 struct ceph_inode_info *ci = ceph_inode(inode); 1739 int left; 1740 const int bufsize = 1024; 1741 1742 if (!ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb), DIRSTAT)) 1743 return -EISDIR; 1744 1745 if (!dfi->dir_info) { 1746 dfi->dir_info = kmalloc(bufsize, GFP_KERNEL); 1747 if (!dfi->dir_info) 1748 return -ENOMEM; 1749 dfi->dir_info_len = 1750 snprintf(dfi->dir_info, bufsize, 1751 "entries: %20lld\n" 1752 " files: %20lld\n" 1753 " subdirs: %20lld\n" 1754 "rentries: %20lld\n" 1755 " rfiles: %20lld\n" 1756 " rsubdirs: %20lld\n" 1757 "rbytes: %20lld\n" 1758 "rctime: %10lld.%09ld\n", 1759 ci->i_files + ci->i_subdirs, 1760 ci->i_files, 1761 ci->i_subdirs, 1762 ci->i_rfiles + ci->i_rsubdirs, 1763 ci->i_rfiles, 1764 ci->i_rsubdirs, 1765 ci->i_rbytes, 1766 ci->i_rctime.tv_sec, 1767 ci->i_rctime.tv_nsec); 1768 } 1769 1770 if (*ppos >= dfi->dir_info_len) 1771 return 0; 1772 size = min_t(unsigned, size, dfi->dir_info_len-*ppos); 1773 left = copy_to_user(buf, dfi->dir_info + *ppos, size); 1774 if (left == size) 1775 return -EFAULT; 1776 *ppos += (size - left); 1777 return size - left; 1778 } 1779 1780 1781 1782 /* 1783 * Return name hash for a given dentry. This is dependent on 1784 * the parent directory's hash function. 
1785 */ 1786 unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn) 1787 { 1788 struct ceph_inode_info *dci = ceph_inode(dir); 1789 unsigned hash; 1790 1791 switch (dci->i_dir_layout.dl_dir_hash) { 1792 case 0: /* for backward compat */ 1793 case CEPH_STR_HASH_LINUX: 1794 return dn->d_name.hash; 1795 1796 default: 1797 spin_lock(&dn->d_lock); 1798 hash = ceph_str_hash(dci->i_dir_layout.dl_dir_hash, 1799 dn->d_name.name, dn->d_name.len); 1800 spin_unlock(&dn->d_lock); 1801 return hash; 1802 } 1803 } 1804 1805 const struct file_operations ceph_dir_fops = { 1806 .read = ceph_read_dir, 1807 .iterate = ceph_readdir, 1808 .llseek = ceph_dir_llseek, 1809 .open = ceph_open, 1810 .release = ceph_release, 1811 .unlocked_ioctl = ceph_ioctl, 1812 .fsync = ceph_fsync, 1813 .lock = ceph_lock, 1814 .flock = ceph_flock, 1815 }; 1816 1817 const struct file_operations ceph_snapdir_fops = { 1818 .iterate = ceph_readdir, 1819 .llseek = ceph_dir_llseek, 1820 .open = ceph_open, 1821 .release = ceph_release, 1822 }; 1823 1824 const struct inode_operations ceph_dir_iops = { 1825 .lookup = ceph_lookup, 1826 .permission = ceph_permission, 1827 .getattr = ceph_getattr, 1828 .setattr = ceph_setattr, 1829 .listxattr = ceph_listxattr, 1830 .get_acl = ceph_get_acl, 1831 .set_acl = ceph_set_acl, 1832 .mknod = ceph_mknod, 1833 .symlink = ceph_symlink, 1834 .mkdir = ceph_mkdir, 1835 .link = ceph_link, 1836 .unlink = ceph_unlink, 1837 .rmdir = ceph_unlink, 1838 .rename = ceph_rename, 1839 .create = ceph_create, 1840 .atomic_open = ceph_atomic_open, 1841 }; 1842 1843 const struct inode_operations ceph_snapdir_iops = { 1844 .lookup = ceph_lookup, 1845 .permission = ceph_permission, 1846 .getattr = ceph_getattr, 1847 .mkdir = ceph_mkdir, 1848 .rmdir = ceph_unlink, 1849 .rename = ceph_rename, 1850 }; 1851 1852 const struct dentry_operations ceph_dentry_ops = { 1853 .d_revalidate = ceph_d_revalidate, 1854 .d_delete = ceph_d_delete, 1855 .d_release = ceph_d_release, 1856 .d_prune = ceph_d_prune, 
1857 .d_init = ceph_d_init, 1858 }; 1859