1 // SPDX-License-Identifier: GPL-2.0 2 #include <linux/ceph/ceph_debug.h> 3 4 #include <linux/spinlock.h> 5 #include <linux/namei.h> 6 #include <linux/slab.h> 7 #include <linux/sched.h> 8 #include <linux/xattr.h> 9 10 #include "super.h" 11 #include "mds_client.h" 12 13 /* 14 * Directory operations: readdir, lookup, create, link, unlink, 15 * rename, etc. 16 */ 17 18 /* 19 * Ceph MDS operations are specified in terms of a base ino and 20 * relative path. Thus, the client can specify an operation on a 21 * specific inode (e.g., a getattr due to fstat(2)), or as a path 22 * relative to, say, the root directory. 23 * 24 * Normally, we limit ourselves to strict inode ops (no path component) 25 * or dentry operations (a single path component relative to an ino). The 26 * exception to this is open_root_dentry(), which will open the mount 27 * point by name. 28 */ 29 30 const struct dentry_operations ceph_dentry_ops; 31 32 static bool __dentry_lease_is_valid(struct ceph_dentry_info *di); 33 static int __dir_lease_try_check(const struct dentry *dentry); 34 35 /* 36 * Initialize ceph dentry state. 37 */ 38 static int ceph_d_init(struct dentry *dentry) 39 { 40 struct ceph_dentry_info *di; 41 42 di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL); 43 if (!di) 44 return -ENOMEM; /* oh well */ 45 46 di->dentry = dentry; 47 di->lease_session = NULL; 48 di->time = jiffies; 49 dentry->d_fsdata = di; 50 INIT_LIST_HEAD(&di->lease_list); 51 return 0; 52 } 53 54 /* 55 * for f_pos for readdir: 56 * - hash order: 57 * (0xff << 52) | ((24 bits hash) << 28) | 58 * (the nth entry has hash collision); 59 * - frag+name order; 60 * ((frag value) << 28) | (the nth entry in frag); 61 */ 62 #define OFFSET_BITS 28 63 #define OFFSET_MASK ((1 << OFFSET_BITS) - 1) 64 #define HASH_ORDER (0xffull << (OFFSET_BITS + 24)) 65 loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order) 66 { 67 loff_t fpos = ((loff_t)high << 28) | (loff_t)off; 68 if (hash_order) 69 fpos |= HASH_ORDER; 70 return fpos; 71 } 72 73 static bool is_hash_order(loff_t p) 74 { 75 return (p & HASH_ORDER) == HASH_ORDER; 76 } 77 78 static unsigned fpos_frag(loff_t p) 79 { 80 return p >> OFFSET_BITS; 81 } 82 83 static unsigned fpos_hash(loff_t p) 84 { 85 return ceph_frag_value(fpos_frag(p)); 86 } 87 88 static unsigned fpos_off(loff_t p) 89 { 90 return p & OFFSET_MASK; 91 } 92 93 static int fpos_cmp(loff_t l, loff_t r) 94 { 95 int v = ceph_frag_compare(fpos_frag(l), fpos_frag(r)); 96 if (v) 97 return v; 98 return (int)(fpos_off(l) - fpos_off(r)); 99 } 100 101 /* 102 * make note of the last dentry we read, so we can 103 * continue at the same lexicographical point, 104 * regardless of what dir changes take place on the 105 * server. 106 */ 107 static int note_last_dentry(struct ceph_dir_file_info *dfi, const char *name, 108 int len, unsigned next_offset) 109 { 110 char *buf = kmalloc(len+1, GFP_KERNEL); 111 if (!buf) 112 return -ENOMEM; 113 kfree(dfi->last_name); 114 dfi->last_name = buf; 115 memcpy(dfi->last_name, name, len); 116 dfi->last_name[len] = 0; 117 dfi->next_offset = next_offset; 118 dout("note_last_dentry '%s'\n", dfi->last_name); 119 return 0; 120 } 121 122 123 static struct dentry * 124 __dcache_find_get_entry(struct dentry *parent, u64 idx, 125 struct ceph_readdir_cache_control *cache_ctl) 126 { 127 struct inode *dir = d_inode(parent); 128 struct dentry *dentry; 129 unsigned idx_mask = (PAGE_SIZE / sizeof(struct dentry *)) - 1; 130 loff_t ptr_pos = idx * sizeof(struct dentry *); 131 pgoff_t ptr_pgoff = ptr_pos >> PAGE_SHIFT; 132 133 if (ptr_pos >= i_size_read(dir)) 134 return NULL; 135 136 if (!cache_ctl->page || ptr_pgoff != page_index(cache_ctl->page)) { 137 ceph_readdir_cache_release(cache_ctl); 138 cache_ctl->page = find_lock_page(&dir->i_data, ptr_pgoff); 139 if (!cache_ctl->page) { 140 dout(" page %lu not found\n", ptr_pgoff); 141 return ERR_PTR(-EAGAIN); 142 } 143 /* reading/filling the cache are serialized by 144 i_mutex, no need to use page lock */ 145 unlock_page(cache_ctl->page); 146 cache_ctl->dentries = kmap(cache_ctl->page); 147 } 148 149 cache_ctl->index = idx & idx_mask; 150 151 rcu_read_lock(); 152 spin_lock(&parent->d_lock); 153 /* check i_size again here, because empty directory can be 154 * marked as complete while not holding the i_mutex. */ 155 if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir)) 156 dentry = cache_ctl->dentries[cache_ctl->index]; 157 else 158 dentry = NULL; 159 spin_unlock(&parent->d_lock); 160 if (dentry && !lockref_get_not_dead(&dentry->d_lockref)) 161 dentry = NULL; 162 rcu_read_unlock(); 163 return dentry ? : ERR_PTR(-EAGAIN); 164 } 165 166 /* 167 * When possible, we try to satisfy a readdir by peeking at the 168 * dcache. We make this work by carefully ordering dentries on 169 * d_child when we initially get results back from the MDS, and 170 * falling back to a "normal" sync readdir if any dentries in the dir 171 * are dropped. 172 * 173 * Complete dir indicates that we have all dentries in the dir. It is 174 * defined IFF we hold CEPH_CAP_FILE_SHARED (which will be revoked by 175 * the MDS if/when the directory is modified). 176 */ 177 static int __dcache_readdir(struct file *file, struct dir_context *ctx, 178 int shared_gen) 179 { 180 struct ceph_dir_file_info *dfi = file->private_data; 181 struct dentry *parent = file->f_path.dentry; 182 struct inode *dir = d_inode(parent); 183 struct dentry *dentry, *last = NULL; 184 struct ceph_dentry_info *di; 185 struct ceph_readdir_cache_control cache_ctl = {}; 186 u64 idx = 0; 187 int err = 0; 188 189 dout("__dcache_readdir %p v%u at %llx\n", dir, (unsigned)shared_gen, ctx->pos); 190 191 /* search start position */ 192 if (ctx->pos > 2) { 193 u64 count = div_u64(i_size_read(dir), sizeof(struct dentry *)); 194 while (count > 0) { 195 u64 step = count >> 1; 196 dentry = __dcache_find_get_entry(parent, idx + step, 197 &cache_ctl); 198 if (!dentry) { 199 /* use linar search */ 200 idx = 0; 201 break; 202 } 203 if (IS_ERR(dentry)) { 204 err = PTR_ERR(dentry); 205 goto out; 206 } 207 di = ceph_dentry(dentry); 208 spin_lock(&dentry->d_lock); 209 if (fpos_cmp(di->offset, ctx->pos) < 0) { 210 idx += step + 1; 211 count -= step + 1; 212 } else { 213 count = step; 214 } 215 spin_unlock(&dentry->d_lock); 216 dput(dentry); 217 } 218 219 dout("__dcache_readdir %p cache idx %llu\n", dir, idx); 220 } 221 222 223 for (;;) { 224 bool emit_dentry = false; 225 dentry = __dcache_find_get_entry(parent, idx++, &cache_ctl); 226 if (!dentry) { 227 dfi->file_info.flags |= CEPH_F_ATEND; 228 err = 0; 229 break; 230 } 231 if (IS_ERR(dentry)) { 232 err = PTR_ERR(dentry); 233 goto out; 234 } 235 236 spin_lock(&dentry->d_lock); 237 di = ceph_dentry(dentry); 238 if (d_unhashed(dentry) || 239 d_really_is_negative(dentry) || 240 di->lease_shared_gen != shared_gen) { 241 spin_unlock(&dentry->d_lock); 242 dput(dentry); 243 err = -EAGAIN; 244 goto out; 245 } 246 if (fpos_cmp(ctx->pos, di->offset) <= 0) { 247 __ceph_dentry_dir_lease_touch(di); 248 emit_dentry = true; 249 } 250 spin_unlock(&dentry->d_lock); 251 252 if (emit_dentry) { 253 dout(" %llx dentry %p %pd %p\n", di->offset, 254 dentry, dentry, d_inode(dentry)); 255 ctx->pos = di->offset; 256 if (!dir_emit(ctx, dentry->d_name.name, 257 dentry->d_name.len, 258 ceph_translate_ino(dentry->d_sb, 259 d_inode(dentry)->i_ino), 260 d_inode(dentry)->i_mode >> 12)) { 261 dput(dentry); 262 err = 0; 263 break; 264 } 265 ctx->pos++; 266 267 if (last) 268 dput(last); 269 last = dentry; 270 } else { 271 dput(dentry); 272 } 273 } 274 out: 275 ceph_readdir_cache_release(&cache_ctl); 276 if (last) { 277 int ret; 278 di = ceph_dentry(last); 279 ret = note_last_dentry(dfi, last->d_name.name, last->d_name.len, 280 fpos_off(di->offset) + 1); 281 if (ret < 0) 282 err = ret; 283 dput(last); 284 /* last_name no longer match cache index */ 285 if (dfi->readdir_cache_idx >= 0) { 286 dfi->readdir_cache_idx = -1; 287 dfi->dir_release_count = 0; 288 } 289 } 290 return err; 291 } 292 293 static bool need_send_readdir(struct ceph_dir_file_info *dfi, loff_t pos) 294 { 295 if (!dfi->last_readdir) 296 return true; 297 if (is_hash_order(pos)) 298 return !ceph_frag_contains_value(dfi->frag, fpos_hash(pos)); 299 else 300 return dfi->frag != fpos_frag(pos); 301 } 302 303 static int ceph_readdir(struct file *file, struct dir_context *ctx) 304 { 305 struct ceph_dir_file_info *dfi = file->private_data; 306 struct inode *inode = file_inode(file); 307 struct ceph_inode_info *ci = ceph_inode(inode); 308 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 309 struct ceph_mds_client *mdsc = fsc->mdsc; 310 int i; 311 int err; 312 unsigned frag = -1; 313 struct ceph_mds_reply_info_parsed *rinfo; 314 315 dout("readdir %p file %p pos %llx\n", inode, file, ctx->pos); 316 if (dfi->file_info.flags & CEPH_F_ATEND) 317 return 0; 318 319 /* always start with . and .. */ 320 if (ctx->pos == 0) { 321 dout("readdir off 0 -> '.'\n"); 322 if (!dir_emit(ctx, ".", 1, 323 ceph_translate_ino(inode->i_sb, inode->i_ino), 324 inode->i_mode >> 12)) 325 return 0; 326 ctx->pos = 1; 327 } 328 if (ctx->pos == 1) { 329 ino_t ino = parent_ino(file->f_path.dentry); 330 dout("readdir off 1 -> '..'\n"); 331 if (!dir_emit(ctx, "..", 2, 332 ceph_translate_ino(inode->i_sb, ino), 333 inode->i_mode >> 12)) 334 return 0; 335 ctx->pos = 2; 336 } 337 338 /* can we use the dcache? */ 339 spin_lock(&ci->i_ceph_lock); 340 if (ceph_test_mount_opt(fsc, DCACHE) && 341 !ceph_test_mount_opt(fsc, NOASYNCREADDIR) && 342 ceph_snap(inode) != CEPH_SNAPDIR && 343 __ceph_dir_is_complete_ordered(ci) && 344 __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) { 345 int shared_gen = atomic_read(&ci->i_shared_gen); 346 spin_unlock(&ci->i_ceph_lock); 347 err = __dcache_readdir(file, ctx, shared_gen); 348 if (err != -EAGAIN) 349 return err; 350 } else { 351 spin_unlock(&ci->i_ceph_lock); 352 } 353 354 /* proceed with a normal readdir */ 355 more: 356 /* do we have the correct frag content buffered? */ 357 if (need_send_readdir(dfi, ctx->pos)) { 358 struct ceph_mds_request *req; 359 int op = ceph_snap(inode) == CEPH_SNAPDIR ? 360 CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR; 361 362 /* discard old result, if any */ 363 if (dfi->last_readdir) { 364 ceph_mdsc_put_request(dfi->last_readdir); 365 dfi->last_readdir = NULL; 366 } 367 368 if (is_hash_order(ctx->pos)) { 369 /* fragtree isn't always accurate. choose frag 370 * based on previous reply when possible. */ 371 if (frag == (unsigned)-1) 372 frag = ceph_choose_frag(ci, fpos_hash(ctx->pos), 373 NULL, NULL); 374 } else { 375 frag = fpos_frag(ctx->pos); 376 } 377 378 dout("readdir fetching %llx.%llx frag %x offset '%s'\n", 379 ceph_vinop(inode), frag, dfi->last_name); 380 req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); 381 if (IS_ERR(req)) 382 return PTR_ERR(req); 383 err = ceph_alloc_readdir_reply_buffer(req, inode); 384 if (err) { 385 ceph_mdsc_put_request(req); 386 return err; 387 } 388 /* hints to request -> mds selection code */ 389 req->r_direct_mode = USE_AUTH_MDS; 390 if (op == CEPH_MDS_OP_READDIR) { 391 req->r_direct_hash = ceph_frag_value(frag); 392 __set_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); 393 req->r_inode_drop = CEPH_CAP_FILE_EXCL; 394 } 395 if (dfi->last_name) { 396 req->r_path2 = kstrdup(dfi->last_name, GFP_KERNEL); 397 if (!req->r_path2) { 398 ceph_mdsc_put_request(req); 399 return -ENOMEM; 400 } 401 } else if (is_hash_order(ctx->pos)) { 402 req->r_args.readdir.offset_hash = 403 cpu_to_le32(fpos_hash(ctx->pos)); 404 } 405 406 req->r_dir_release_cnt = dfi->dir_release_count; 407 req->r_dir_ordered_cnt = dfi->dir_ordered_count; 408 req->r_readdir_cache_idx = dfi->readdir_cache_idx; 409 req->r_readdir_offset = dfi->next_offset; 410 req->r_args.readdir.frag = cpu_to_le32(frag); 411 req->r_args.readdir.flags = 412 cpu_to_le16(CEPH_READDIR_REPLY_BITFLAGS); 413 414 req->r_inode = inode; 415 ihold(inode); 416 req->r_dentry = dget(file->f_path.dentry); 417 err = ceph_mdsc_do_request(mdsc, NULL, req); 418 if (err < 0) { 419 ceph_mdsc_put_request(req); 420 return err; 421 } 422 dout("readdir got and parsed readdir result=%d on " 423 "frag %x, end=%d, complete=%d, hash_order=%d\n", 424 err, frag, 425 (int)req->r_reply_info.dir_end, 426 (int)req->r_reply_info.dir_complete, 427 (int)req->r_reply_info.hash_order); 428 429 rinfo = &req->r_reply_info; 430 if (le32_to_cpu(rinfo->dir_dir->frag) != frag) { 431 frag = le32_to_cpu(rinfo->dir_dir->frag); 432 if (!rinfo->hash_order) { 433 dfi->next_offset = req->r_readdir_offset; 434 /* adjust ctx->pos to beginning of frag */ 435 ctx->pos = ceph_make_fpos(frag, 436 dfi->next_offset, 437 false); 438 } 439 } 440 441 dfi->frag = frag; 442 dfi->last_readdir = req; 443 444 if (test_bit(CEPH_MDS_R_DID_PREPOPULATE, &req->r_req_flags)) { 445 dfi->readdir_cache_idx = req->r_readdir_cache_idx; 446 if (dfi->readdir_cache_idx < 0) { 447 /* preclude from marking dir ordered */ 448 dfi->dir_ordered_count = 0; 449 } else if (ceph_frag_is_leftmost(frag) && 450 dfi->next_offset == 2) { 451 /* note dir version at start of readdir so 452 * we can tell if any dentries get dropped */ 453 dfi->dir_release_count = req->r_dir_release_cnt; 454 dfi->dir_ordered_count = req->r_dir_ordered_cnt; 455 } 456 } else { 457 dout("readdir !did_prepopulate\n"); 458 /* disable readdir cache */ 459 dfi->readdir_cache_idx = -1; 460 /* preclude from marking dir complete */ 461 dfi->dir_release_count = 0; 462 } 463 464 /* note next offset and last dentry name */ 465 if (rinfo->dir_nr > 0) { 466 struct ceph_mds_reply_dir_entry *rde = 467 rinfo->dir_entries + (rinfo->dir_nr-1); 468 unsigned next_offset = req->r_reply_info.dir_end ? 469 2 : (fpos_off(rde->offset) + 1); 470 err = note_last_dentry(dfi, rde->name, rde->name_len, 471 next_offset); 472 if (err) 473 return err; 474 } else if (req->r_reply_info.dir_end) { 475 dfi->next_offset = 2; 476 /* keep last name */ 477 } 478 } 479 480 rinfo = &dfi->last_readdir->r_reply_info; 481 dout("readdir frag %x num %d pos %llx chunk first %llx\n", 482 dfi->frag, rinfo->dir_nr, ctx->pos, 483 rinfo->dir_nr ? rinfo->dir_entries[0].offset : 0LL); 484 485 i = 0; 486 /* search start position */ 487 if (rinfo->dir_nr > 0) { 488 int step, nr = rinfo->dir_nr; 489 while (nr > 0) { 490 step = nr >> 1; 491 if (rinfo->dir_entries[i + step].offset < ctx->pos) { 492 i += step + 1; 493 nr -= step + 1; 494 } else { 495 nr = step; 496 } 497 } 498 } 499 for (; i < rinfo->dir_nr; i++) { 500 struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i; 501 struct ceph_vino vino; 502 ino_t ino; 503 u32 ftype; 504 505 BUG_ON(rde->offset < ctx->pos); 506 507 ctx->pos = rde->offset; 508 dout("readdir (%d/%d) -> %llx '%.*s' %p\n", 509 i, rinfo->dir_nr, ctx->pos, 510 rde->name_len, rde->name, &rde->inode.in); 511 512 BUG_ON(!rde->inode.in); 513 ftype = le32_to_cpu(rde->inode.in->mode) >> 12; 514 vino.ino = le64_to_cpu(rde->inode.in->ino); 515 vino.snap = le64_to_cpu(rde->inode.in->snapid); 516 ino = ceph_vino_to_ino(vino); 517 518 if (!dir_emit(ctx, rde->name, rde->name_len, 519 ceph_translate_ino(inode->i_sb, ino), ftype)) { 520 dout("filldir stopping us...\n"); 521 return 0; 522 } 523 ctx->pos++; 524 } 525 526 ceph_mdsc_put_request(dfi->last_readdir); 527 dfi->last_readdir = NULL; 528 529 if (dfi->next_offset > 2) { 530 frag = dfi->frag; 531 goto more; 532 } 533 534 /* more frags? */ 535 if (!ceph_frag_is_rightmost(dfi->frag)) { 536 frag = ceph_frag_next(dfi->frag); 537 if (is_hash_order(ctx->pos)) { 538 loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag), 539 dfi->next_offset, true); 540 if (new_pos > ctx->pos) 541 ctx->pos = new_pos; 542 /* keep last_name */ 543 } else { 544 ctx->pos = ceph_make_fpos(frag, dfi->next_offset, 545 false); 546 kfree(dfi->last_name); 547 dfi->last_name = NULL; 548 } 549 dout("readdir next frag is %x\n", frag); 550 goto more; 551 } 552 dfi->file_info.flags |= CEPH_F_ATEND; 553 554 /* 555 * if dir_release_count still matches the dir, no dentries 556 * were released during the whole readdir, and we should have 557 * the complete dir contents in our cache. 558 */ 559 if (atomic64_read(&ci->i_release_count) == 560 dfi->dir_release_count) { 561 spin_lock(&ci->i_ceph_lock); 562 if (dfi->dir_ordered_count == 563 atomic64_read(&ci->i_ordered_count)) { 564 dout(" marking %p complete and ordered\n", inode); 565 /* use i_size to track number of entries in 566 * readdir cache */ 567 BUG_ON(dfi->readdir_cache_idx < 0); 568 i_size_write(inode, dfi->readdir_cache_idx * 569 sizeof(struct dentry*)); 570 } else { 571 dout(" marking %p complete\n", inode); 572 } 573 __ceph_dir_set_complete(ci, dfi->dir_release_count, 574 dfi->dir_ordered_count); 575 spin_unlock(&ci->i_ceph_lock); 576 } 577 578 dout("readdir %p file %p done.\n", inode, file); 579 return 0; 580 } 581 582 static void reset_readdir(struct ceph_dir_file_info *dfi) 583 { 584 if (dfi->last_readdir) { 585 ceph_mdsc_put_request(dfi->last_readdir); 586 dfi->last_readdir = NULL; 587 } 588 kfree(dfi->last_name); 589 dfi->last_name = NULL; 590 dfi->dir_release_count = 0; 591 dfi->readdir_cache_idx = -1; 592 dfi->next_offset = 2; /* compensate for . and .. */ 593 dfi->file_info.flags &= ~CEPH_F_ATEND; 594 } 595 596 /* 597 * discard buffered readdir content on seekdir(0), or seek to new frag, 598 * or seek prior to current chunk 599 */ 600 static bool need_reset_readdir(struct ceph_dir_file_info *dfi, loff_t new_pos) 601 { 602 struct ceph_mds_reply_info_parsed *rinfo; 603 loff_t chunk_offset; 604 if (new_pos == 0) 605 return true; 606 if (is_hash_order(new_pos)) { 607 /* no need to reset last_name for a forward seek when 608 * dentries are sotred in hash order */ 609 } else if (dfi->frag != fpos_frag(new_pos)) { 610 return true; 611 } 612 rinfo = dfi->last_readdir ? &dfi->last_readdir->r_reply_info : NULL; 613 if (!rinfo || !rinfo->dir_nr) 614 return true; 615 chunk_offset = rinfo->dir_entries[0].offset; 616 return new_pos < chunk_offset || 617 is_hash_order(new_pos) != is_hash_order(chunk_offset); 618 } 619 620 static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence) 621 { 622 struct ceph_dir_file_info *dfi = file->private_data; 623 struct inode *inode = file->f_mapping->host; 624 loff_t retval; 625 626 inode_lock(inode); 627 retval = -EINVAL; 628 switch (whence) { 629 case SEEK_CUR: 630 offset += file->f_pos; 631 case SEEK_SET: 632 break; 633 case SEEK_END: 634 retval = -EOPNOTSUPP; 635 default: 636 goto out; 637 } 638 639 if (offset >= 0) { 640 if (need_reset_readdir(dfi, offset)) { 641 dout("dir_llseek dropping %p content\n", file); 642 reset_readdir(dfi); 643 } else if (is_hash_order(offset) && offset > file->f_pos) { 644 /* for hash offset, we don't know if a forward seek 645 * is within same frag */ 646 dfi->dir_release_count = 0; 647 dfi->readdir_cache_idx = -1; 648 } 649 650 if (offset != file->f_pos) { 651 file->f_pos = offset; 652 file->f_version = 0; 653 dfi->file_info.flags &= ~CEPH_F_ATEND; 654 } 655 retval = offset; 656 } 657 out: 658 inode_unlock(inode); 659 return retval; 660 } 661 662 /* 663 * Handle lookups for the hidden .snap directory. 664 */ 665 int ceph_handle_snapdir(struct ceph_mds_request *req, 666 struct dentry *dentry, int err) 667 { 668 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); 669 struct inode *parent = d_inode(dentry->d_parent); /* we hold i_mutex */ 670 671 /* .snap dir? */ 672 if (err == -ENOENT && 673 ceph_snap(parent) == CEPH_NOSNAP && 674 strcmp(dentry->d_name.name, 675 fsc->mount_options->snapdir_name) == 0) { 676 struct inode *inode = ceph_get_snapdir(parent); 677 dout("ENOENT on snapdir %p '%pd', linking to snapdir %p\n", 678 dentry, dentry, inode); 679 BUG_ON(!d_unhashed(dentry)); 680 d_add(dentry, inode); 681 err = 0; 682 } 683 return err; 684 } 685 686 /* 687 * Figure out final result of a lookup/open request. 688 * 689 * Mainly, make sure we return the final req->r_dentry (if it already 690 * existed) in place of the original VFS-provided dentry when they 691 * differ. 692 * 693 * Gracefully handle the case where the MDS replies with -ENOENT and 694 * no trace (which it may do, at its discretion, e.g., if it doesn't 695 * care to issue a lease on the negative dentry). 696 */ 697 struct dentry *ceph_finish_lookup(struct ceph_mds_request *req, 698 struct dentry *dentry, int err) 699 { 700 if (err == -ENOENT) { 701 /* no trace? */ 702 err = 0; 703 if (!req->r_reply_info.head->is_dentry) { 704 dout("ENOENT and no trace, dentry %p inode %p\n", 705 dentry, d_inode(dentry)); 706 if (d_really_is_positive(dentry)) { 707 d_drop(dentry); 708 err = -ENOENT; 709 } else { 710 d_add(dentry, NULL); 711 } 712 } 713 } 714 if (err) 715 dentry = ERR_PTR(err); 716 else if (dentry != req->r_dentry) 717 dentry = dget(req->r_dentry); /* we got spliced */ 718 else 719 dentry = NULL; 720 return dentry; 721 } 722 723 static bool is_root_ceph_dentry(struct inode *inode, struct dentry *dentry) 724 { 725 return ceph_ino(inode) == CEPH_INO_ROOT && 726 strncmp(dentry->d_name.name, ".ceph", 5) == 0; 727 } 728 729 /* 730 * Look up a single dir entry. If there is a lookup intent, inform 731 * the MDS so that it gets our 'caps wanted' value in a single op. 732 */ 733 static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry, 734 unsigned int flags) 735 { 736 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); 737 struct ceph_mds_client *mdsc = fsc->mdsc; 738 struct ceph_mds_request *req; 739 int op; 740 int mask; 741 int err; 742 743 dout("lookup %p dentry %p '%pd'\n", 744 dir, dentry, dentry); 745 746 if (dentry->d_name.len > NAME_MAX) 747 return ERR_PTR(-ENAMETOOLONG); 748 749 /* can we conclude ENOENT locally? */ 750 if (d_really_is_negative(dentry)) { 751 struct ceph_inode_info *ci = ceph_inode(dir); 752 struct ceph_dentry_info *di = ceph_dentry(dentry); 753 754 spin_lock(&ci->i_ceph_lock); 755 dout(" dir %p flags are %d\n", dir, ci->i_ceph_flags); 756 if (strncmp(dentry->d_name.name, 757 fsc->mount_options->snapdir_name, 758 dentry->d_name.len) && 759 !is_root_ceph_dentry(dir, dentry) && 760 ceph_test_mount_opt(fsc, DCACHE) && 761 __ceph_dir_is_complete(ci) && 762 (__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) { 763 spin_unlock(&ci->i_ceph_lock); 764 dout(" dir %p complete, -ENOENT\n", dir); 765 d_add(dentry, NULL); 766 di->lease_shared_gen = atomic_read(&ci->i_shared_gen); 767 return NULL; 768 } 769 spin_unlock(&ci->i_ceph_lock); 770 } 771 772 op = ceph_snap(dir) == CEPH_SNAPDIR ? 773 CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP; 774 req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS); 775 if (IS_ERR(req)) 776 return ERR_CAST(req); 777 req->r_dentry = dget(dentry); 778 req->r_num_caps = 2; 779 780 mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED; 781 if (ceph_security_xattr_wanted(dir)) 782 mask |= CEPH_CAP_XATTR_SHARED; 783 req->r_args.getattr.mask = cpu_to_le32(mask); 784 785 req->r_parent = dir; 786 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); 787 err = ceph_mdsc_do_request(mdsc, NULL, req); 788 err = ceph_handle_snapdir(req, dentry, err); 789 dentry = ceph_finish_lookup(req, dentry, err); 790 ceph_mdsc_put_request(req); /* will dput(dentry) */ 791 dout("lookup result=%p\n", dentry); 792 return dentry; 793 } 794 795 /* 796 * If we do a create but get no trace back from the MDS, follow up with 797 * a lookup (the VFS expects us to link up the provided dentry). 798 */ 799 int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry) 800 { 801 struct dentry *result = ceph_lookup(dir, dentry, 0); 802 803 if (result && !IS_ERR(result)) { 804 /* 805 * We created the item, then did a lookup, and found 806 * it was already linked to another inode we already 807 * had in our cache (and thus got spliced). To not 808 * confuse VFS (especially when inode is a directory), 809 * we don't link our dentry to that inode, return an 810 * error instead. 811 * 812 * This event should be rare and it happens only when 813 * we talk to old MDS. Recent MDS does not send traceless 814 * reply for request that creates new inode. 815 */ 816 d_drop(result); 817 return -ESTALE; 818 } 819 return PTR_ERR(result); 820 } 821 822 static int ceph_mknod(struct inode *dir, struct dentry *dentry, 823 umode_t mode, dev_t rdev) 824 { 825 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); 826 struct ceph_mds_client *mdsc = fsc->mdsc; 827 struct ceph_mds_request *req; 828 struct ceph_acls_info acls = {}; 829 int err; 830 831 if (ceph_snap(dir) != CEPH_NOSNAP) 832 return -EROFS; 833 834 if (ceph_quota_is_max_files_exceeded(dir)) { 835 err = -EDQUOT; 836 goto out; 837 } 838 839 err = ceph_pre_init_acls(dir, &mode, &acls); 840 if (err < 0) 841 goto out; 842 843 dout("mknod in dir %p dentry %p mode 0%ho rdev %d\n", 844 dir, dentry, mode, rdev); 845 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS); 846 if (IS_ERR(req)) { 847 err = PTR_ERR(req); 848 goto out; 849 } 850 req->r_dentry = dget(dentry); 851 req->r_num_caps = 2; 852 req->r_parent = dir; 853 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); 854 req->r_args.mknod.mode = cpu_to_le32(mode); 855 req->r_args.mknod.rdev = cpu_to_le32(rdev); 856 req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL; 857 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 858 if (acls.pagelist) { 859 req->r_pagelist = acls.pagelist; 860 acls.pagelist = NULL; 861 } 862 err = ceph_mdsc_do_request(mdsc, dir, req); 863 if (!err && !req->r_reply_info.head->is_dentry) 864 err = ceph_handle_notrace_create(dir, dentry); 865 ceph_mdsc_put_request(req); 866 out: 867 if (!err) 868 ceph_init_inode_acls(d_inode(dentry), &acls); 869 else 870 d_drop(dentry); 871 ceph_release_acls_info(&acls); 872 return err; 873 } 874 875 static int ceph_create(struct inode *dir, struct dentry *dentry, umode_t mode, 876 bool excl) 877 { 878 return ceph_mknod(dir, dentry, mode, 0); 879 } 880 881 static int ceph_symlink(struct inode *dir, struct dentry *dentry, 882 const char *dest) 883 { 884 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); 885 struct ceph_mds_client *mdsc = fsc->mdsc; 886 struct ceph_mds_request *req; 887 int err; 888 889 if (ceph_snap(dir) != CEPH_NOSNAP) 890 return -EROFS; 891 892 if (ceph_quota_is_max_files_exceeded(dir)) { 893 err = -EDQUOT; 894 goto out; 895 } 896 897 dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest); 898 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS); 899 if (IS_ERR(req)) { 900 err = PTR_ERR(req); 901 goto out; 902 } 903 req->r_path2 = kstrdup(dest, GFP_KERNEL); 904 if (!req->r_path2) { 905 err = -ENOMEM; 906 ceph_mdsc_put_request(req); 907 goto out; 908 } 909 req->r_parent = dir; 910 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); 911 req->r_dentry = dget(dentry); 912 req->r_num_caps = 2; 913 req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL; 914 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 915 err = ceph_mdsc_do_request(mdsc, dir, req); 916 if (!err && !req->r_reply_info.head->is_dentry) 917 err = ceph_handle_notrace_create(dir, dentry); 918 ceph_mdsc_put_request(req); 919 out: 920 if (err) 921 d_drop(dentry); 922 return err; 923 } 924 925 static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode) 926 { 927 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); 928 struct ceph_mds_client *mdsc = fsc->mdsc; 929 struct ceph_mds_request *req; 930 struct ceph_acls_info acls = {}; 931 int err = -EROFS; 932 int op; 933 934 if (ceph_snap(dir) == CEPH_SNAPDIR) { 935 /* mkdir .snap/foo is a MKSNAP */ 936 op = CEPH_MDS_OP_MKSNAP; 937 dout("mksnap dir %p snap '%pd' dn %p\n", dir, 938 dentry, dentry); 939 } else if (ceph_snap(dir) == CEPH_NOSNAP) { 940 dout("mkdir dir %p dn %p mode 0%ho\n", dir, dentry, mode); 941 op = CEPH_MDS_OP_MKDIR; 942 } else { 943 goto out; 944 } 945 946 if (op == CEPH_MDS_OP_MKDIR && 947 ceph_quota_is_max_files_exceeded(dir)) { 948 err = -EDQUOT; 949 goto out; 950 } 951 952 mode |= S_IFDIR; 953 err = ceph_pre_init_acls(dir, &mode, &acls); 954 if (err < 0) 955 goto out; 956 957 req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); 958 if (IS_ERR(req)) { 959 err = PTR_ERR(req); 960 goto out; 961 } 962 963 req->r_dentry = dget(dentry); 964 req->r_num_caps = 2; 965 req->r_parent = dir; 966 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); 967 req->r_args.mkdir.mode = cpu_to_le32(mode); 968 req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL; 969 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 970 if (acls.pagelist) { 971 req->r_pagelist = acls.pagelist; 972 acls.pagelist = NULL; 973 } 974 err = ceph_mdsc_do_request(mdsc, dir, req); 975 if (!err && 976 !req->r_reply_info.head->is_target && 977 !req->r_reply_info.head->is_dentry) 978 err = ceph_handle_notrace_create(dir, dentry); 979 ceph_mdsc_put_request(req); 980 out: 981 if (!err) 982 ceph_init_inode_acls(d_inode(dentry), &acls); 983 else 984 d_drop(dentry); 985 ceph_release_acls_info(&acls); 986 return err; 987 } 988 989 static int ceph_link(struct dentry *old_dentry, struct inode *dir, 990 struct dentry *dentry) 991 { 992 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); 993 struct ceph_mds_client *mdsc = fsc->mdsc; 994 struct ceph_mds_request *req; 995 int err; 996 997 if (ceph_snap(dir) != CEPH_NOSNAP) 998 return -EROFS; 999 1000 dout("link in dir %p old_dentry %p dentry %p\n", dir, 1001 old_dentry, dentry); 1002 req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LINK, USE_AUTH_MDS); 1003 if (IS_ERR(req)) { 1004 d_drop(dentry); 1005 return PTR_ERR(req); 1006 } 1007 req->r_dentry = dget(dentry); 1008 req->r_num_caps = 2; 1009 req->r_old_dentry = dget(old_dentry); 1010 req->r_parent = dir; 1011 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); 1012 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 1013 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 1014 /* release LINK_SHARED on source inode (mds will lock it) */ 1015 req->r_old_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL; 1016 err = ceph_mdsc_do_request(mdsc, dir, req); 1017 if (err) { 1018 d_drop(dentry); 1019 } else if (!req->r_reply_info.head->is_dentry) { 1020 ihold(d_inode(old_dentry)); 1021 d_instantiate(dentry, d_inode(old_dentry)); 1022 } 1023 ceph_mdsc_put_request(req); 1024 return err; 1025 } 1026 1027 /* 1028 * rmdir and unlink are differ only by the metadata op code 1029 */ 1030 static int ceph_unlink(struct inode *dir, struct dentry *dentry) 1031 { 1032 struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); 1033 struct ceph_mds_client *mdsc = fsc->mdsc; 1034 struct inode *inode = d_inode(dentry); 1035 struct ceph_mds_request *req; 1036 int err = -EROFS; 1037 int op; 1038 1039 if (ceph_snap(dir) == CEPH_SNAPDIR) { 1040 /* rmdir .snap/foo is RMSNAP */ 1041 dout("rmsnap dir %p '%pd' dn %p\n", dir, dentry, dentry); 1042 op = CEPH_MDS_OP_RMSNAP; 1043 } else if (ceph_snap(dir) == CEPH_NOSNAP) { 1044 dout("unlink/rmdir dir %p dn %p inode %p\n", 1045 dir, dentry, inode); 1046 op = d_is_dir(dentry) ? 1047 CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK; 1048 } else 1049 goto out; 1050 req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); 1051 if (IS_ERR(req)) { 1052 err = PTR_ERR(req); 1053 goto out; 1054 } 1055 req->r_dentry = dget(dentry); 1056 req->r_num_caps = 2; 1057 req->r_parent = dir; 1058 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); 1059 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 1060 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 1061 req->r_inode_drop = ceph_drop_caps_for_unlink(inode); 1062 err = ceph_mdsc_do_request(mdsc, dir, req); 1063 if (!err && !req->r_reply_info.head->is_dentry) 1064 d_delete(dentry); 1065 ceph_mdsc_put_request(req); 1066 out: 1067 return err; 1068 } 1069 1070 static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, 1071 struct inode *new_dir, struct dentry *new_dentry, 1072 unsigned int flags) 1073 { 1074 struct ceph_fs_client *fsc = ceph_sb_to_client(old_dir->i_sb); 1075 struct ceph_mds_client *mdsc = fsc->mdsc; 1076 struct ceph_mds_request *req; 1077 int op = CEPH_MDS_OP_RENAME; 1078 int err; 1079 1080 if (flags) 1081 return -EINVAL; 1082 1083 if (ceph_snap(old_dir) != ceph_snap(new_dir)) 1084 return -EXDEV; 1085 if (ceph_snap(old_dir) != CEPH_NOSNAP) { 1086 if (old_dir == new_dir && ceph_snap(old_dir) == CEPH_SNAPDIR) 1087 op = CEPH_MDS_OP_RENAMESNAP; 1088 else 1089 return -EROFS; 1090 } 1091 /* don't allow cross-quota renames */ 1092 if ((old_dir != new_dir) && 1093 (!ceph_quota_is_same_realm(old_dir, new_dir))) 1094 return -EXDEV; 1095 1096 dout("rename dir %p dentry %p to dir %p dentry %p\n", 1097 old_dir, old_dentry, new_dir, new_dentry); 1098 req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); 1099 if (IS_ERR(req)) 1100 return PTR_ERR(req); 1101 ihold(old_dir); 1102 req->r_dentry = dget(new_dentry); 1103 req->r_num_caps = 2; 1104 req->r_old_dentry = dget(old_dentry); 1105 req->r_old_dentry_dir = old_dir; 1106 req->r_parent = new_dir; 1107 set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); 1108 req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED; 1109 req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL; 1110 req->r_dentry_drop = CEPH_CAP_FILE_SHARED; 1111 req->r_dentry_unless = CEPH_CAP_FILE_EXCL; 1112 /* release LINK_RDCACHE on source inode (mds will lock it) */ 1113 req->r_old_inode_drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL; 1114 if (d_really_is_positive(new_dentry)) { 1115 req->r_inode_drop = 1116 ceph_drop_caps_for_unlink(d_inode(new_dentry)); 1117 } 1118 err = ceph_mdsc_do_request(mdsc, old_dir, req); 1119 if (!err && !req->r_reply_info.head->is_dentry) { 1120 /* 1121 * Normally d_move() is done by fill_trace (called by 1122 * do_request, above). If there is no trace, we need 1123 * to do it here. 1124 */ 1125 d_move(old_dentry, new_dentry); 1126 } 1127 ceph_mdsc_put_request(req); 1128 return err; 1129 } 1130 1131 /* 1132 * Move dentry to tail of mdsc->dentry_leases list when lease is updated. 1133 * Leases at front of the list will expire first. (Assume all leases have 1134 * similar duration) 1135 * 1136 * Called under dentry->d_lock. 1137 */ 1138 void __ceph_dentry_lease_touch(struct ceph_dentry_info *di) 1139 { 1140 struct dentry *dn = di->dentry; 1141 struct ceph_mds_client *mdsc; 1142 1143 dout("dentry_lease_touch %p %p '%pd'\n", di, dn, dn); 1144 1145 di->flags |= CEPH_DENTRY_LEASE_LIST; 1146 if (di->flags & CEPH_DENTRY_SHRINK_LIST) { 1147 di->flags |= CEPH_DENTRY_REFERENCED; 1148 return; 1149 } 1150 1151 mdsc = ceph_sb_to_client(dn->d_sb)->mdsc; 1152 spin_lock(&mdsc->dentry_list_lock); 1153 list_move_tail(&di->lease_list, &mdsc->dentry_leases); 1154 spin_unlock(&mdsc->dentry_list_lock); 1155 } 1156 1157 static void __dentry_dir_lease_touch(struct ceph_mds_client* mdsc, 1158 struct ceph_dentry_info *di) 1159 { 1160 di->flags &= ~(CEPH_DENTRY_LEASE_LIST | CEPH_DENTRY_REFERENCED); 1161 di->lease_gen = 0; 1162 di->time = jiffies; 1163 list_move_tail(&di->lease_list, &mdsc->dentry_dir_leases); 1164 } 1165 1166 /* 1167 * When dir lease is used, add dentry to tail of mdsc->dentry_dir_leases 1168 * list if it's not in the list, otherwise set 'referenced' flag. 1169 * 1170 * Called under dentry->d_lock. 1171 */ 1172 void __ceph_dentry_dir_lease_touch(struct ceph_dentry_info *di) 1173 { 1174 struct dentry *dn = di->dentry; 1175 struct ceph_mds_client *mdsc; 1176 1177 dout("dentry_dir_lease_touch %p %p '%pd' (offset %lld)\n", 1178 di, dn, dn, di->offset); 1179 1180 if (!list_empty(&di->lease_list)) { 1181 if (di->flags & CEPH_DENTRY_LEASE_LIST) { 1182 /* don't remove dentry from dentry lease list 1183 * if its lease is valid */ 1184 if (__dentry_lease_is_valid(di)) 1185 return; 1186 } else { 1187 di->flags |= CEPH_DENTRY_REFERENCED; 1188 return; 1189 } 1190 } 1191 1192 if (di->flags & CEPH_DENTRY_SHRINK_LIST) { 1193 di->flags |= CEPH_DENTRY_REFERENCED; 1194 di->flags &= ~CEPH_DENTRY_LEASE_LIST; 1195 return; 1196 } 1197 1198 mdsc = ceph_sb_to_client(dn->d_sb)->mdsc; 1199 spin_lock(&mdsc->dentry_list_lock); 1200 __dentry_dir_lease_touch(mdsc, di), 1201 spin_unlock(&mdsc->dentry_list_lock); 1202 } 1203 1204 static void __dentry_lease_unlist(struct ceph_dentry_info *di) 1205 { 1206 struct ceph_mds_client *mdsc; 1207 if (di->flags & CEPH_DENTRY_SHRINK_LIST) 1208 return; 1209 if (list_empty(&di->lease_list)) 1210 return; 1211 1212 mdsc = ceph_sb_to_client(di->dentry->d_sb)->mdsc; 1213 spin_lock(&mdsc->dentry_list_lock); 1214 list_del_init(&di->lease_list); 1215 spin_unlock(&mdsc->dentry_list_lock); 1216 } 1217 1218 enum { 1219 KEEP = 0, 1220 DELETE = 1, 1221 TOUCH = 2, 1222 STOP = 4, 1223 }; 1224 1225 struct ceph_lease_walk_control { 1226 bool dir_lease; 1227 bool expire_dir_lease; 1228 unsigned long nr_to_scan; 1229 unsigned long dir_lease_ttl; 1230 }; 1231 1232 static unsigned long 1233 __dentry_leases_walk(struct ceph_mds_client *mdsc, 1234 struct ceph_lease_walk_control *lwc, 1235 int (*check)(struct dentry*, void*)) 1236 { 1237 struct ceph_dentry_info *di, *tmp; 1238 struct dentry *dentry, *last = NULL; 1239 struct list_head* list; 1240 LIST_HEAD(dispose); 1241 unsigned long freed = 0; 1242 int ret = 0; 1243 1244 list = lwc->dir_lease ? &mdsc->dentry_dir_leases : &mdsc->dentry_leases; 1245 spin_lock(&mdsc->dentry_list_lock); 1246 list_for_each_entry_safe(di, tmp, list, lease_list) { 1247 if (!lwc->nr_to_scan) 1248 break; 1249 --lwc->nr_to_scan; 1250 1251 dentry = di->dentry; 1252 if (last == dentry) 1253 break; 1254 1255 if (!spin_trylock(&dentry->d_lock)) 1256 continue; 1257 1258 if (dentry->d_lockref.count < 0) { 1259 list_del_init(&di->lease_list); 1260 goto next; 1261 } 1262 1263 ret = check(dentry, lwc); 1264 if (ret & TOUCH) { 1265 /* move it into tail of dir lease list */ 1266 __dentry_dir_lease_touch(mdsc, di); 1267 if (!last) 1268 last = dentry; 1269 } 1270 if (ret & DELETE) { 1271 /* stale lease */ 1272 di->flags &= ~CEPH_DENTRY_REFERENCED; 1273 if (dentry->d_lockref.count > 0) { 1274 /* update_dentry_lease() will re-add 1275 * it to lease list, or 1276 * ceph_d_delete() will return 1 when 1277 * last reference is dropped */ 1278 list_del_init(&di->lease_list); 1279 } else { 1280 di->flags |= CEPH_DENTRY_SHRINK_LIST; 1281 list_move_tail(&di->lease_list, &dispose); 1282 dget_dlock(dentry); 1283 } 1284 } 1285 next: 1286 spin_unlock(&dentry->d_lock); 1287 if (ret & STOP) 1288 break; 1289 } 1290 spin_unlock(&mdsc->dentry_list_lock); 1291 1292 while (!list_empty(&dispose)) { 1293 di = list_first_entry(&dispose, struct ceph_dentry_info, 1294 lease_list); 1295 dentry = di->dentry; 1296 spin_lock(&dentry->d_lock); 1297 1298 list_del_init(&di->lease_list); 1299 di->flags &= ~CEPH_DENTRY_SHRINK_LIST; 1300 if (di->flags & CEPH_DENTRY_REFERENCED) { 1301 spin_lock(&mdsc->dentry_list_lock); 1302 if (di->flags & CEPH_DENTRY_LEASE_LIST) { 1303 list_add_tail(&di->lease_list, 1304 &mdsc->dentry_leases); 1305 } else { 1306 __dentry_dir_lease_touch(mdsc, di); 1307 } 1308 spin_unlock(&mdsc->dentry_list_lock); 1309 } else { 1310 freed++; 1311 } 1312 1313 spin_unlock(&dentry->d_lock); 1314 /* ceph_d_delete() does the trick */ 1315 dput(dentry); 1316 } 1317 return freed; 1318 } 1319 1320 static int __dentry_lease_check(struct dentry *dentry, void *arg) 1321 { 1322 struct ceph_dentry_info *di = ceph_dentry(dentry); 1323 int ret; 1324 1325 if (__dentry_lease_is_valid(di)) 1326 return STOP; 1327 ret = __dir_lease_try_check(dentry); 1328 if (ret == -EBUSY) 1329 return KEEP; 1330 if (ret > 0) 1331 return TOUCH; 1332 return DELETE; 1333 } 1334 1335 static int __dir_lease_check(struct dentry *dentry, void *arg) 1336 { 1337 struct ceph_lease_walk_control *lwc = arg; 1338 struct ceph_dentry_info *di = ceph_dentry(dentry); 1339 1340 int ret = __dir_lease_try_check(dentry); 1341 if (ret == -EBUSY) 1342 return KEEP; 1343 if (ret > 0) { 1344 if (time_before(jiffies, di->time + lwc->dir_lease_ttl)) 1345 return STOP; 1346 /* Move dentry to tail of dir lease list if we don't want 1347 * to delete it. So dentries in the list are checked in a 1348 * round robin manner */ 1349 if (!lwc->expire_dir_lease) 1350 return TOUCH; 1351 if (dentry->d_lockref.count > 0 || 1352 (di->flags & CEPH_DENTRY_REFERENCED)) 1353 return TOUCH; 1354 /* invalidate dir lease */ 1355 di->lease_shared_gen = 0; 1356 } 1357 return DELETE; 1358 } 1359 1360 int ceph_trim_dentries(struct ceph_mds_client *mdsc) 1361 { 1362 struct ceph_lease_walk_control lwc; 1363 unsigned long count; 1364 unsigned long freed; 1365 1366 spin_lock(&mdsc->caps_list_lock); 1367 if (mdsc->caps_use_max > 0 && 1368 mdsc->caps_use_count > mdsc->caps_use_max) 1369 count = mdsc->caps_use_count - mdsc->caps_use_max; 1370 else 1371 count = 0; 1372 spin_unlock(&mdsc->caps_list_lock); 1373 1374 lwc.dir_lease = false; 1375 lwc.nr_to_scan = CEPH_CAPS_PER_RELEASE * 2; 1376 freed = __dentry_leases_walk(mdsc, &lwc, __dentry_lease_check); 1377 if (!lwc.nr_to_scan) /* more invalid leases */ 1378 return -EAGAIN; 1379 1380 if (lwc.nr_to_scan < CEPH_CAPS_PER_RELEASE) 1381 lwc.nr_to_scan = CEPH_CAPS_PER_RELEASE; 1382 1383 lwc.dir_lease = true; 1384 lwc.expire_dir_lease = freed < count; 1385 lwc.dir_lease_ttl = mdsc->fsc->mount_options->caps_wanted_delay_max * HZ; 1386 freed +=__dentry_leases_walk(mdsc, &lwc, __dir_lease_check); 1387 if (!lwc.nr_to_scan) /* more to check */ 1388 return -EAGAIN; 1389 1390 return freed > 0 ? 1 : 0; 1391 } 1392 1393 /* 1394 * Ensure a dentry lease will no longer revalidate. 1395 */ 1396 void ceph_invalidate_dentry_lease(struct dentry *dentry) 1397 { 1398 struct ceph_dentry_info *di = ceph_dentry(dentry); 1399 spin_lock(&dentry->d_lock); 1400 di->time = jiffies; 1401 di->lease_shared_gen = 0; 1402 __dentry_lease_unlist(di); 1403 spin_unlock(&dentry->d_lock); 1404 } 1405 1406 /* 1407 * Check if dentry lease is valid. If not, delete the lease. Try to 1408 * renew if the least is more than half up. 1409 */ 1410 static bool __dentry_lease_is_valid(struct ceph_dentry_info *di) 1411 { 1412 struct ceph_mds_session *session; 1413 1414 if (!di->lease_gen) 1415 return false; 1416 1417 session = di->lease_session; 1418 if (session) { 1419 u32 gen; 1420 unsigned long ttl; 1421 1422 spin_lock(&session->s_gen_ttl_lock); 1423 gen = session->s_cap_gen; 1424 ttl = session->s_cap_ttl; 1425 spin_unlock(&session->s_gen_ttl_lock); 1426 1427 if (di->lease_gen == gen && 1428 time_before(jiffies, ttl) && 1429 time_before(jiffies, di->time)) 1430 return true; 1431 } 1432 di->lease_gen = 0; 1433 return false; 1434 } 1435 1436 static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags, 1437 struct inode *dir) 1438 { 1439 struct ceph_dentry_info *di; 1440 struct ceph_mds_session *session = NULL; 1441 u32 seq = 0; 1442 int valid = 0; 1443 1444 spin_lock(&dentry->d_lock); 1445 di = ceph_dentry(dentry); 1446 if (di && __dentry_lease_is_valid(di)) { 1447 valid = 1; 1448 1449 if (di->lease_renew_after && 1450 time_after(jiffies, di->lease_renew_after)) { 1451 /* 1452 * We should renew. If we're in RCU walk mode 1453 * though, we can't do that so just return 1454 * -ECHILD. 1455 */ 1456 if (flags & LOOKUP_RCU) { 1457 valid = -ECHILD; 1458 } else { 1459 session = ceph_get_mds_session(di->lease_session); 1460 seq = di->lease_seq; 1461 di->lease_renew_after = 0; 1462 di->lease_renew_from = jiffies; 1463 } 1464 } 1465 } 1466 spin_unlock(&dentry->d_lock); 1467 1468 if (session) { 1469 ceph_mdsc_lease_send_msg(session, dir, dentry, 1470 CEPH_MDS_LEASE_RENEW, seq); 1471 ceph_put_mds_session(session); 1472 } 1473 dout("dentry_lease_is_valid - dentry %p = %d\n", dentry, valid); 1474 return valid; 1475 } 1476 1477 /* 1478 * Called under dentry->d_lock. 1479 */ 1480 static int __dir_lease_try_check(const struct dentry *dentry) 1481 { 1482 struct ceph_dentry_info *di = ceph_dentry(dentry); 1483 struct inode *dir; 1484 struct ceph_inode_info *ci; 1485 int valid = 0; 1486 1487 if (!di->lease_shared_gen) 1488 return 0; 1489 if (IS_ROOT(dentry)) 1490 return 0; 1491 1492 dir = d_inode(dentry->d_parent); 1493 ci = ceph_inode(dir); 1494 1495 if (spin_trylock(&ci->i_ceph_lock)) { 1496 if (atomic_read(&ci->i_shared_gen) == di->lease_shared_gen && 1497 __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 0)) 1498 valid = 1; 1499 spin_unlock(&ci->i_ceph_lock); 1500 } else { 1501 valid = -EBUSY; 1502 } 1503 1504 if (!valid) 1505 di->lease_shared_gen = 0; 1506 return valid; 1507 } 1508 1509 /* 1510 * Check if directory-wide content lease/cap is valid. 1511 */ 1512 static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry) 1513 { 1514 struct ceph_inode_info *ci = ceph_inode(dir); 1515 struct ceph_dentry_info *di = ceph_dentry(dentry); 1516 int valid = 0; 1517 1518 spin_lock(&ci->i_ceph_lock); 1519 if (atomic_read(&ci->i_shared_gen) == di->lease_shared_gen) 1520 valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1); 1521 spin_unlock(&ci->i_ceph_lock); 1522 if (valid) 1523 __ceph_dentry_dir_lease_touch(di); 1524 dout("dir_lease_is_valid dir %p v%u dentry %p v%u = %d\n", 1525 dir, (unsigned)atomic_read(&ci->i_shared_gen), 1526 dentry, (unsigned)di->lease_shared_gen, valid); 1527 return valid; 1528 } 1529 1530 /* 1531 * Check if cached dentry can be trusted. 1532 */ 1533 static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) 1534 { 1535 int valid = 0; 1536 struct dentry *parent; 1537 struct inode *dir; 1538 1539 if (flags & LOOKUP_RCU) { 1540 parent = READ_ONCE(dentry->d_parent); 1541 dir = d_inode_rcu(parent); 1542 if (!dir) 1543 return -ECHILD; 1544 } else { 1545 parent = dget_parent(dentry); 1546 dir = d_inode(parent); 1547 } 1548 1549 dout("d_revalidate %p '%pd' inode %p offset %lld\n", dentry, 1550 dentry, d_inode(dentry), ceph_dentry(dentry)->offset); 1551 1552 /* always trust cached snapped dentries, snapdir dentry */ 1553 if (ceph_snap(dir) != CEPH_NOSNAP) { 1554 dout("d_revalidate %p '%pd' inode %p is SNAPPED\n", dentry, 1555 dentry, d_inode(dentry)); 1556 valid = 1; 1557 } else if (d_really_is_positive(dentry) && 1558 ceph_snap(d_inode(dentry)) == CEPH_SNAPDIR) { 1559 valid = 1; 1560 } else { 1561 valid = dentry_lease_is_valid(dentry, flags, dir); 1562 if (valid == -ECHILD) 1563 return valid; 1564 if (valid || dir_lease_is_valid(dir, dentry)) { 1565 if (d_really_is_positive(dentry)) 1566 valid = ceph_is_any_caps(d_inode(dentry)); 1567 else 1568 valid = 1; 1569 } 1570 } 1571 1572 if (!valid) { 1573 struct ceph_mds_client *mdsc = 1574 ceph_sb_to_client(dir->i_sb)->mdsc; 1575 struct ceph_mds_request *req; 1576 int op, err; 1577 u32 mask; 1578 1579 if (flags & LOOKUP_RCU) 1580 return -ECHILD; 1581 1582 op = ceph_snap(dir) == CEPH_SNAPDIR ? 1583 CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP; 1584 req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS); 1585 if (!IS_ERR(req)) { 1586 req->r_dentry = dget(dentry); 1587 req->r_num_caps = 2; 1588 req->r_parent = dir; 1589 1590 mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED; 1591 if (ceph_security_xattr_wanted(dir)) 1592 mask |= CEPH_CAP_XATTR_SHARED; 1593 req->r_args.getattr.mask = cpu_to_le32(mask); 1594 1595 err = ceph_mdsc_do_request(mdsc, NULL, req); 1596 switch (err) { 1597 case 0: 1598 if (d_really_is_positive(dentry) && 1599 d_inode(dentry) == req->r_target_inode) 1600 valid = 1; 1601 break; 1602 case -ENOENT: 1603 if (d_really_is_negative(dentry)) 1604 valid = 1; 1605 /* Fallthrough */ 1606 default: 1607 break; 1608 } 1609 ceph_mdsc_put_request(req); 1610 dout("d_revalidate %p lookup result=%d\n", 1611 dentry, err); 1612 } 1613 } 1614 1615 dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid"); 1616 if (!valid) 1617 ceph_dir_clear_complete(dir); 1618 1619 if (!(flags & LOOKUP_RCU)) 1620 dput(parent); 1621 return valid; 1622 } 1623 1624 /* 1625 * Delete unused dentry that doesn't have valid lease 1626 * 1627 * Called under dentry->d_lock. 1628 */ 1629 static int ceph_d_delete(const struct dentry *dentry) 1630 { 1631 struct ceph_dentry_info *di; 1632 1633 /* won't release caps */ 1634 if (d_really_is_negative(dentry)) 1635 return 0; 1636 if (ceph_snap(d_inode(dentry)) != CEPH_NOSNAP) 1637 return 0; 1638 /* vaild lease? */ 1639 di = ceph_dentry(dentry); 1640 if (di) { 1641 if (__dentry_lease_is_valid(di)) 1642 return 0; 1643 if (__dir_lease_try_check(dentry)) 1644 return 0; 1645 } 1646 return 1; 1647 } 1648 1649 /* 1650 * Release our ceph_dentry_info. 1651 */ 1652 static void ceph_d_release(struct dentry *dentry) 1653 { 1654 struct ceph_dentry_info *di = ceph_dentry(dentry); 1655 1656 dout("d_release %p\n", dentry); 1657 1658 spin_lock(&dentry->d_lock); 1659 __dentry_lease_unlist(di); 1660 dentry->d_fsdata = NULL; 1661 spin_unlock(&dentry->d_lock); 1662 1663 if (di->lease_session) 1664 ceph_put_mds_session(di->lease_session); 1665 kmem_cache_free(ceph_dentry_cachep, di); 1666 } 1667 1668 /* 1669 * When the VFS prunes a dentry from the cache, we need to clear the 1670 * complete flag on the parent directory. 1671 * 1672 * Called under dentry->d_lock. 1673 */ 1674 static void ceph_d_prune(struct dentry *dentry) 1675 { 1676 struct ceph_inode_info *dir_ci; 1677 struct ceph_dentry_info *di; 1678 1679 dout("ceph_d_prune %pd %p\n", dentry, dentry); 1680 1681 /* do we have a valid parent? */ 1682 if (IS_ROOT(dentry)) 1683 return; 1684 1685 /* we hold d_lock, so d_parent is stable */ 1686 dir_ci = ceph_inode(d_inode(dentry->d_parent)); 1687 if (dir_ci->i_vino.snap == CEPH_SNAPDIR) 1688 return; 1689 1690 /* who calls d_delete() should also disable dcache readdir */ 1691 if (d_really_is_negative(dentry)) 1692 return; 1693 1694 /* d_fsdata does not get cleared until d_release */ 1695 if (!d_unhashed(dentry)) { 1696 __ceph_dir_clear_complete(dir_ci); 1697 return; 1698 } 1699 1700 /* Disable dcache readdir just in case that someone called d_drop() 1701 * or d_invalidate(), but MDS didn't revoke CEPH_CAP_FILE_SHARED 1702 * properly (dcache readdir is still enabled) */ 1703 di = ceph_dentry(dentry); 1704 if (di->offset > 0 && 1705 di->lease_shared_gen == atomic_read(&dir_ci->i_shared_gen)) 1706 __ceph_dir_clear_ordered(dir_ci); 1707 } 1708 1709 /* 1710 * read() on a dir. This weird interface hack only works if mounted 1711 * with '-o dirstat'. 1712 */ 1713 static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size, 1714 loff_t *ppos) 1715 { 1716 struct ceph_dir_file_info *dfi = file->private_data; 1717 struct inode *inode = file_inode(file); 1718 struct ceph_inode_info *ci = ceph_inode(inode); 1719 int left; 1720 const int bufsize = 1024; 1721 1722 if (!ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb), DIRSTAT)) 1723 return -EISDIR; 1724 1725 if (!dfi->dir_info) { 1726 dfi->dir_info = kmalloc(bufsize, GFP_KERNEL); 1727 if (!dfi->dir_info) 1728 return -ENOMEM; 1729 dfi->dir_info_len = 1730 snprintf(dfi->dir_info, bufsize, 1731 "entries: %20lld\n" 1732 " files: %20lld\n" 1733 " subdirs: %20lld\n" 1734 "rentries: %20lld\n" 1735 " rfiles: %20lld\n" 1736 " rsubdirs: %20lld\n" 1737 "rbytes: %20lld\n" 1738 "rctime: %10lld.%09ld\n", 1739 ci->i_files + ci->i_subdirs, 1740 ci->i_files, 1741 ci->i_subdirs, 1742 ci->i_rfiles + ci->i_rsubdirs, 1743 ci->i_rfiles, 1744 ci->i_rsubdirs, 1745 ci->i_rbytes, 1746 ci->i_rctime.tv_sec, 1747 ci->i_rctime.tv_nsec); 1748 } 1749 1750 if (*ppos >= dfi->dir_info_len) 1751 return 0; 1752 size = min_t(unsigned, size, dfi->dir_info_len-*ppos); 1753 left = copy_to_user(buf, dfi->dir_info + *ppos, size); 1754 if (left == size) 1755 return -EFAULT; 1756 *ppos += (size - left); 1757 return size - left; 1758 } 1759 1760 1761 1762 /* 1763 * Return name hash for a given dentry. This is dependent on 1764 * the parent directory's hash function. 1765 */ 1766 unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn) 1767 { 1768 struct ceph_inode_info *dci = ceph_inode(dir); 1769 unsigned hash; 1770 1771 switch (dci->i_dir_layout.dl_dir_hash) { 1772 case 0: /* for backward compat */ 1773 case CEPH_STR_HASH_LINUX: 1774 return dn->d_name.hash; 1775 1776 default: 1777 spin_lock(&dn->d_lock); 1778 hash = ceph_str_hash(dci->i_dir_layout.dl_dir_hash, 1779 dn->d_name.name, dn->d_name.len); 1780 spin_unlock(&dn->d_lock); 1781 return hash; 1782 } 1783 } 1784 1785 const struct file_operations ceph_dir_fops = { 1786 .read = ceph_read_dir, 1787 .iterate = ceph_readdir, 1788 .llseek = ceph_dir_llseek, 1789 .open = ceph_open, 1790 .release = ceph_release, 1791 .unlocked_ioctl = ceph_ioctl, 1792 .fsync = ceph_fsync, 1793 .lock = ceph_lock, 1794 .flock = ceph_flock, 1795 }; 1796 1797 const struct file_operations ceph_snapdir_fops = { 1798 .iterate = ceph_readdir, 1799 .llseek = ceph_dir_llseek, 1800 .open = ceph_open, 1801 .release = ceph_release, 1802 }; 1803 1804 const struct inode_operations ceph_dir_iops = { 1805 .lookup = ceph_lookup, 1806 .permission = ceph_permission, 1807 .getattr = ceph_getattr, 1808 .setattr = ceph_setattr, 1809 .listxattr = ceph_listxattr, 1810 .get_acl = ceph_get_acl, 1811 .set_acl = ceph_set_acl, 1812 .mknod = ceph_mknod, 1813 .symlink = ceph_symlink, 1814 .mkdir = ceph_mkdir, 1815 .link = ceph_link, 1816 .unlink = ceph_unlink, 1817 .rmdir = ceph_unlink, 1818 .rename = ceph_rename, 1819 .create = ceph_create, 1820 .atomic_open = ceph_atomic_open, 1821 }; 1822 1823 const struct inode_operations ceph_snapdir_iops = { 1824 .lookup = ceph_lookup, 1825 .permission = ceph_permission, 1826 .getattr = ceph_getattr, 1827 .mkdir = ceph_mkdir, 1828 .rmdir = ceph_unlink, 1829 .rename = ceph_rename, 1830 }; 1831 1832 const struct dentry_operations ceph_dentry_ops = { 1833 .d_revalidate = ceph_d_revalidate, 1834 .d_delete = ceph_d_delete, 1835 .d_release = ceph_d_release, 1836 .d_prune = ceph_d_prune, 1837 .d_init = ceph_d_init, 1838 }; 1839