// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/kernel.h>
#include <linux/sched/signal.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/wait.h>
#include <linux/writeback.h>

#include "super.h"
#include "mds_client.h"
#include "cache.h"
#include <linux/ceph/decode.h>
#include <linux/ceph/messenger.h>

/*
 * Capability management
 *
 * The Ceph metadata servers control client access to inode metadata
 * and file data by issuing capabilities, granting clients permission
 * to read and/or write both inode fields and file data to OSDs
 * (storage nodes).  Each capability consists of a set of bits
 * indicating which operations are allowed.
 *
 * If the client holds a *_SHARED cap, the client has a coherent value
 * that can be safely read from the cached inode.
 *
 * In the case of *_EXCL (exclusive) or FILE_WR capabilities, the
 * client is allowed to change inode attributes (e.g., file size,
 * mtime), note its dirty state in the ceph_cap, and asynchronously
 * flush that metadata change to the MDS.
 *
 * In the event of a conflicting operation (perhaps by another
 * client), the MDS will revoke the conflicting client capabilities.
 *
 * In order for a client to cache an inode, it must hold a capability
 * with at least one MDS server.  When inodes are released, release
 * notifications are batched and periodically sent en masse to the MDS
 * cluster to release server state.
 */

static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc);
static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *session,
				 struct ceph_inode_info *ci,
				 u64 oldest_flush_tid);

/*
 * Generate readable cap strings for debugging output.
 */
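/*
 * A small ring of static buffers is used so that several cap strings can
 * be in flight at once (e.g. within a single dout() call); slots are
 * handed out round-robin under cap_str_lock and eventually reused.
 */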
#define MAX_CAP_STR 20
static char cap_str[MAX_CAP_STR][40];
static DEFINE_SPINLOCK(cap_str_lock);
static int last_cap_str;

static char *gcap_string(char *s, int c)
{
	if (c & CEPH_CAP_GSHARED)
		*s++ = 's';
	if (c & CEPH_CAP_GEXCL)
		*s++ = 'x';
	if (c & CEPH_CAP_GCACHE)
		*s++ = 'c';
	if (c & CEPH_CAP_GRD)
		*s++ = 'r';
	if (c & CEPH_CAP_GWR)
		*s++ = 'w';
	if (c & CEPH_CAP_GBUFFER)
		*s++ = 'b';
	if (c & CEPH_CAP_GWREXTEND)
		*s++ = 'a';
	if (c & CEPH_CAP_GLAZYIO)
		*s++ = 'l';
	return s;
}

const char *ceph_cap_string(int caps)
{
	int i;
	char *s;
	int c;

	spin_lock(&cap_str_lock);
	i = last_cap_str++;
	if (last_cap_str == MAX_CAP_STR)
		last_cap_str = 0;
	spin_unlock(&cap_str_lock);

	s = cap_str[i];

	if (caps & CEPH_CAP_PIN)
		*s++ = 'p';

	c = (caps >> CEPH_CAP_SAUTH) & 3;
	if (c) {
		*s++ = 'A';
		s = gcap_string(s, c);
	}

	c = (caps >> CEPH_CAP_SLINK) & 3;
	if (c) {
		*s++ = 'L';
		s = gcap_string(s, c);
	}

	c = (caps >> CEPH_CAP_SXATTR) & 3;
	if (c) {
		*s++ = 'X';
		s = gcap_string(s, c);
	}

	c = caps >> CEPH_CAP_SFILE;
	if (c) {
		*s++ = 'F';
		s = gcap_string(s, c);
	}

	if (s == cap_str[i])
		*s++ = '-';
	*s = 0;
	return cap_str[i];
}

void ceph_caps_init(struct ceph_mds_client *mdsc)
{
	INIT_LIST_HEAD(&mdsc->caps_list);
	spin_lock_init(&mdsc->caps_list_lock);
}

void ceph_caps_finalize(struct ceph_mds_client *mdsc)
{
	struct ceph_cap *cap;

	spin_lock(&mdsc->caps_list_lock);
	while (!list_empty(&mdsc->caps_list)) {
		cap = list_first_entry(&mdsc->caps_list,
				       struct ceph_cap, caps_item);
		list_del(&cap->caps_item);
		kmem_cache_free(ceph_cap_cachep, cap);
	}
	mdsc->caps_total_count = 0;
	mdsc->caps_avail_count = 0;
	mdsc->caps_use_count = 0;
	mdsc->caps_reserve_count = 0;
	mdsc->caps_min_count = 0;
	spin_unlock(&mdsc->caps_list_lock);
}

void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
{
	spin_lock(&mdsc->caps_list_lock);
	mdsc->caps_min_count += delta;
	BUG_ON(mdsc->caps_min_count < 0);
	spin_unlock(&mdsc->caps_list_lock);
}

static void __ceph_unreserve_caps(struct ceph_mds_client *mdsc, int nr_caps)
{
	struct ceph_cap *cap;
	int i;

	if (nr_caps) {
		BUG_ON(mdsc->caps_reserve_count < nr_caps);
		mdsc->caps_reserve_count -= nr_caps;
		if (mdsc->caps_avail_count >=
		    mdsc->caps_reserve_count + mdsc->caps_min_count) {
			mdsc->caps_total_count -= nr_caps;
			for (i = 0; i < nr_caps; i++) {
				cap = list_first_entry(&mdsc->caps_list,
					struct ceph_cap, caps_item);
				list_del(&cap->caps_item);
				kmem_cache_free(ceph_cap_cachep, cap);
			}
		} else {
			mdsc->caps_avail_count += nr_caps;
		}

		dout("%s: caps %d = %d used + %d resv + %d avail\n",
		     __func__,
		     mdsc->caps_total_count, mdsc->caps_use_count,
		     mdsc->caps_reserve_count, mdsc->caps_avail_count);
		BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
						 mdsc->caps_reserve_count +
						 mdsc->caps_avail_count);
	}
}

/*
 * Called under mdsc->mutex.
 */
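/*
 * Note: if allocating new caps fails, mdsc->mutex is dropped temporarily
 * while per-session caps are trimmed to free up entries, then re-taken
 * before retrying.
 */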
int ceph_reserve_caps(struct ceph_mds_client *mdsc,
		      struct ceph_cap_reservation *ctx, int need)
{
	int i, j;
	struct ceph_cap *cap;
	int have;
	int alloc = 0;
	int max_caps;
	int err = 0;
	bool trimmed = false;
	struct ceph_mds_session *s;
	LIST_HEAD(newcaps);

	dout("reserve caps ctx=%p need=%d\n", ctx, need);

	/* first reserve any caps that are already allocated */
	spin_lock(&mdsc->caps_list_lock);
	if (mdsc->caps_avail_count >= need)
		have = need;
	else
		have = mdsc->caps_avail_count;
	mdsc->caps_avail_count -= have;
	mdsc->caps_reserve_count += have;
	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
					 mdsc->caps_reserve_count +
					 mdsc->caps_avail_count);
	spin_unlock(&mdsc->caps_list_lock);

	for (i = have; i < need; ) {
		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
		if (cap) {
			list_add(&cap->caps_item, &newcaps);
			alloc++;
			i++;
			continue;
		}

		if (!trimmed) {
			for (j = 0; j < mdsc->max_sessions; j++) {
				s = __ceph_lookup_mds_session(mdsc, j);
				if (!s)
					continue;
				mutex_unlock(&mdsc->mutex);

				mutex_lock(&s->s_mutex);
				max_caps = s->s_nr_caps - (need - i);
				ceph_trim_caps(mdsc, s, max_caps);
				mutex_unlock(&s->s_mutex);

				ceph_put_mds_session(s);
				mutex_lock(&mdsc->mutex);
			}
			trimmed = true;

			spin_lock(&mdsc->caps_list_lock);
			if (mdsc->caps_avail_count) {
				int more_have;
				if (mdsc->caps_avail_count >= need - i)
					more_have = need - i;
				else
					more_have = mdsc->caps_avail_count;

				i += more_have;
				have += more_have;
				mdsc->caps_avail_count -= more_have;
				mdsc->caps_reserve_count += more_have;

			}
			spin_unlock(&mdsc->caps_list_lock);

			continue;
		}

		pr_warn("reserve caps ctx=%p ENOMEM need=%d got=%d\n",
			ctx, need, have + alloc);
		err = -ENOMEM;
		break;
	}

	if (!err) {
		BUG_ON(have + alloc != need);
		ctx->count = need;
	}

	spin_lock(&mdsc->caps_list_lock);
	mdsc->caps_total_count += alloc;
	mdsc->caps_reserve_count += alloc;
	list_splice(&newcaps, &mdsc->caps_list);

	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
					 mdsc->caps_reserve_count +
					 mdsc->caps_avail_count);

	if (err)
		__ceph_unreserve_caps(mdsc, have + alloc);

	spin_unlock(&mdsc->caps_list_lock);

	dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
	     ctx, mdsc->caps_total_count, mdsc->caps_use_count,
	     mdsc->caps_reserve_count, mdsc->caps_avail_count);
	return err;
}

void ceph_unreserve_caps(struct ceph_mds_client *mdsc,
			 struct ceph_cap_reservation *ctx)
{
	dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
	spin_lock(&mdsc->caps_list_lock);
	__ceph_unreserve_caps(mdsc, ctx->count);
	ctx->count = 0;
	spin_unlock(&mdsc->caps_list_lock);
}

struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
			      struct ceph_cap_reservation *ctx)
{
	struct ceph_cap *cap = NULL;

	/* temporary, until we do something about cap import/export */
	if (!ctx) {
		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
		if (cap) {
			spin_lock(&mdsc->caps_list_lock);
			mdsc->caps_use_count++;
			mdsc->caps_total_count++;
			spin_unlock(&mdsc->caps_list_lock);
		} else {
			spin_lock(&mdsc->caps_list_lock);
			if (mdsc->caps_avail_count) {
				BUG_ON(list_empty(&mdsc->caps_list));

				mdsc->caps_avail_count--;
				mdsc->caps_use_count++;
				cap = list_first_entry(&mdsc->caps_list,
						struct ceph_cap, caps_item);
				list_del(&cap->caps_item);

				BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
				       mdsc->caps_reserve_count + mdsc->caps_avail_count);
			}
			spin_unlock(&mdsc->caps_list_lock);
		}

		return cap;
	}

	spin_lock(&mdsc->caps_list_lock);
	dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
	     ctx, ctx->count, mdsc->caps_total_count, mdsc->caps_use_count,
	     mdsc->caps_reserve_count, mdsc->caps_avail_count);
	BUG_ON(!ctx->count);
	BUG_ON(ctx->count > mdsc->caps_reserve_count);
	BUG_ON(list_empty(&mdsc->caps_list));

	ctx->count--;
	mdsc->caps_reserve_count--;
	mdsc->caps_use_count++;

	cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
	list_del(&cap->caps_item);

	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
	       mdsc->caps_reserve_count + mdsc->caps_avail_count);
	spin_unlock(&mdsc->caps_list_lock);
	return cap;
}

void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap)
{
	spin_lock(&mdsc->caps_list_lock);
	dout("put_cap %p %d = %d used + %d resv + %d avail\n",
	     cap, mdsc->caps_total_count, mdsc->caps_use_count,
	     mdsc->caps_reserve_count, mdsc->caps_avail_count);
	mdsc->caps_use_count--;
	/*
	 * Keep some preallocated caps around (ceph_min_count), to
	 * avoid lots of free/alloc churn.
	 */
	if (mdsc->caps_avail_count >= mdsc->caps_reserve_count +
				      mdsc->caps_min_count) {
		mdsc->caps_total_count--;
		kmem_cache_free(ceph_cap_cachep, cap);
	} else {
		mdsc->caps_avail_count++;
		list_add(&cap->caps_item, &mdsc->caps_list);
	}

	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
	       mdsc->caps_reserve_count + mdsc->caps_avail_count);
	spin_unlock(&mdsc->caps_list_lock);
}

void ceph_reservation_status(struct ceph_fs_client *fsc,
			     int *total, int *avail, int *used, int *reserved,
			     int *min)
{
	struct ceph_mds_client *mdsc = fsc->mdsc;

	spin_lock(&mdsc->caps_list_lock);

	if (total)
		*total = mdsc->caps_total_count;
	if (avail)
		*avail = mdsc->caps_avail_count;
	if (used)
		*used = mdsc->caps_use_count;
	if (reserved)
		*reserved = mdsc->caps_reserve_count;
	if (min)
		*min = mdsc->caps_min_count;

	spin_unlock(&mdsc->caps_list_lock);
}

/*
 * Find ceph_cap for given mds, if any.
 *
 * Called with i_ceph_lock held.
 */
static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds)
{
	struct ceph_cap *cap;
	struct rb_node *n = ci->i_caps.rb_node;

	while (n) {
		cap = rb_entry(n, struct ceph_cap, ci_node);
		if (mds < cap->mds)
			n = n->rb_left;
		else if (mds > cap->mds)
			n = n->rb_right;
		else
			return cap;
	}
	return NULL;
}

struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds)
{
	struct ceph_cap *cap;

	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	spin_unlock(&ci->i_ceph_lock);
	return cap;
}

/*
 * Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1.
 */
static int __ceph_get_cap_mds(struct ceph_inode_info *ci)
{
	struct ceph_cap *cap;
	int mds = -1;
	struct rb_node *p;

	/* prefer mds with WR|BUFFER|EXCL caps */
	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		mds = cap->mds;
		if (cap->issued & (CEPH_CAP_FILE_WR |
				   CEPH_CAP_FILE_BUFFER |
				   CEPH_CAP_FILE_EXCL))
			break;
	}
	return mds;
}

int ceph_get_cap_mds(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int mds;
	spin_lock(&ci->i_ceph_lock);
	mds = __ceph_get_cap_mds(ceph_inode(inode));
	spin_unlock(&ci->i_ceph_lock);
	return mds;
}

/*
 * Called under i_ceph_lock.
 */
static void __insert_cap_node(struct ceph_inode_info *ci,
			      struct ceph_cap *new)
{
	struct rb_node **p = &ci->i_caps.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_cap *cap = NULL;

	while (*p) {
		parent = *p;
		cap = rb_entry(parent, struct ceph_cap, ci_node);
		if (new->mds < cap->mds)
			p = &(*p)->rb_left;
		else if (new->mds > cap->mds)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->ci_node, parent, p);
	rb_insert_color(&new->ci_node, &ci->i_caps);
}

/*
 * (re)set cap hold timeouts, which control the delayed release
 * of unused caps back to the MDS.  Should be called on cap use.
 */
static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
			       struct ceph_inode_info *ci)
{
	struct ceph_mount_options *ma = mdsc->fsc->mount_options;

	ci->i_hold_caps_min = round_jiffies(jiffies +
					    ma->caps_wanted_delay_min * HZ);
	ci->i_hold_caps_max = round_jiffies(jiffies +
					    ma->caps_wanted_delay_max * HZ);
	dout("__cap_set_timeouts %p min %lu max %lu\n", &ci->vfs_inode,
	     ci->i_hold_caps_min - jiffies, ci->i_hold_caps_max - jiffies);
}

/*
 * (Re)queue cap at the end of the delayed cap release list.
 *
 * If I_FLUSH is set, leave the inode at the front of the list.
 *
 * Caller holds i_ceph_lock
 *    -> we take mdsc->cap_delay_lock
 */
static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
				struct ceph_inode_info *ci,
				bool set_timeout)
{
	dout("__cap_delay_requeue %p flags %d at %lu\n", &ci->vfs_inode,
	     ci->i_ceph_flags, ci->i_hold_caps_max);
	if (!mdsc->stopping) {
		spin_lock(&mdsc->cap_delay_lock);
		if (!list_empty(&ci->i_cap_delay_list)) {
			if (ci->i_ceph_flags & CEPH_I_FLUSH)
				goto no_change;
			list_del_init(&ci->i_cap_delay_list);
		}
		if (set_timeout)
			__cap_set_timeouts(mdsc, ci);
		list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
no_change:
		spin_unlock(&mdsc->cap_delay_lock);
	}
}

/*
 * Queue an inode for immediate writeback.  Mark inode with I_FLUSH,
 * indicating we should send a cap message to flush dirty metadata
 * asap, and move to the front of the delayed cap list.
 */
static void __cap_delay_requeue_front(struct ceph_mds_client *mdsc,
				      struct ceph_inode_info *ci)
{
	dout("__cap_delay_requeue_front %p\n", &ci->vfs_inode);
	spin_lock(&mdsc->cap_delay_lock);
	ci->i_ceph_flags |= CEPH_I_FLUSH;
	if (!list_empty(&ci->i_cap_delay_list))
		list_del_init(&ci->i_cap_delay_list);
	list_add(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
	spin_unlock(&mdsc->cap_delay_lock);
}

/*
 * Cancel delayed work on cap.
 *
 * Caller must hold i_ceph_lock.
 */
static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
			       struct ceph_inode_info *ci)
{
	dout("__cap_delay_cancel %p\n", &ci->vfs_inode);
	if (list_empty(&ci->i_cap_delay_list))
		return;
	spin_lock(&mdsc->cap_delay_lock);
	list_del_init(&ci->i_cap_delay_list);
	spin_unlock(&mdsc->cap_delay_lock);
}

/*
 * Common issue checks for add_cap, handle_cap_grant.
 */
static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
			      unsigned issued)
{
	unsigned had = __ceph_caps_issued(ci, NULL);

	/*
	 * Each time we receive FILE_CACHE anew, we increment
	 * i_rdcache_gen.
	 */
	if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
	    (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) {
		ci->i_rdcache_gen++;
	}

	/*
	 * If FILE_SHARED is newly issued, mark dir not complete. We don't
	 * know what happened to this directory while we didn't have the cap.
	 * If FILE_SHARED is being revoked, also mark dir not complete. It
	 * stops on-going cached readdir.
	 */
	if ((issued & CEPH_CAP_FILE_SHARED) != (had & CEPH_CAP_FILE_SHARED)) {
		if (issued & CEPH_CAP_FILE_SHARED)
			atomic_inc(&ci->i_shared_gen);
		if (S_ISDIR(ci->vfs_inode.i_mode)) {
			dout(" marking %p NOT complete\n", &ci->vfs_inode);
			__ceph_dir_clear_complete(ci);
		}
	}
}

/*
 * Add a capability under the given MDS session.
 *
 * Caller should hold session snap_rwsem (read) and s_mutex.
 *
 * @fmode is the open file mode, if we are opening a file, otherwise
 * it is < 0.  (This is so we can atomically add the cap and add an
 * open file reference to it.)
 */
void ceph_add_cap(struct inode *inode,
		  struct ceph_mds_session *session, u64 cap_id,
		  int fmode, unsigned issued, unsigned wanted,
		  unsigned seq, unsigned mseq, u64 realmino, int flags,
		  struct ceph_cap **new_cap)
{
	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_cap *cap;
	int mds = session->s_mds;
	int actual_wanted;

	dout("add_cap %p mds%d cap %llx %s seq %d\n", inode,
	     session->s_mds, cap_id, ceph_cap_string(issued), seq);

	/*
	 * If we are opening the file, include file mode wanted bits
	 * in wanted.
	 */
	if (fmode >= 0)
		wanted |= ceph_caps_for_mode(fmode);

	cap = __get_cap_for_mds(ci, mds);
	if (!cap) {
		cap = *new_cap;
		*new_cap = NULL;

		cap->issued = 0;
		cap->implemented = 0;
		cap->mds = mds;
		cap->mds_wanted = 0;
		cap->mseq = 0;

		cap->ci = ci;
		__insert_cap_node(ci, cap);

		/* add to session cap list */
		cap->session = session;
		spin_lock(&session->s_cap_lock);
		list_add_tail(&cap->session_caps, &session->s_caps);
		session->s_nr_caps++;
		spin_unlock(&session->s_cap_lock);
	} else {
		/*
		 * auth mds of the inode changed. we received the cap export
		 * message, but still haven't received the cap import message.
		 * handle_cap_export() updated the new auth MDS' cap.
		 *
		 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing
		 * a message that was sent before the cap import message. So
		 * don't remove caps.
		 */
		if (ceph_seq_cmp(seq, cap->seq) <= 0) {
			WARN_ON(cap != ci->i_auth_cap);
			WARN_ON(cap->cap_id != cap_id);
			seq = cap->seq;
			mseq = cap->mseq;
			issued |= cap->issued;
			flags |= CEPH_CAP_FLAG_AUTH;
		}
	}

	if (!ci->i_snap_realm ||
	    ((flags & CEPH_CAP_FLAG_AUTH) &&
	     realmino != (u64)-1 && ci->i_snap_realm->ino != realmino)) {
		/*
		 * add this inode to the appropriate snap realm
		 */
		struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc,
							       realmino);
		if (realm) {
			struct ceph_snap_realm *oldrealm = ci->i_snap_realm;
			if (oldrealm) {
				spin_lock(&oldrealm->inodes_with_caps_lock);
				list_del_init(&ci->i_snap_realm_item);
				spin_unlock(&oldrealm->inodes_with_caps_lock);
			}

			spin_lock(&realm->inodes_with_caps_lock);
			list_add(&ci->i_snap_realm_item,
				 &realm->inodes_with_caps);
			ci->i_snap_realm = realm;
			if (realm->ino == ci->i_vino.ino)
				realm->inode = inode;
			spin_unlock(&realm->inodes_with_caps_lock);

			if (oldrealm)
				ceph_put_snap_realm(mdsc, oldrealm);
		} else {
			pr_err("ceph_add_cap: couldn't find snap realm %llx\n",
			       realmino);
			WARN_ON(!realm);
		}
	}

	__check_cap_issue(ci, cap, issued);

	/*
	 * If we are issued caps we don't want, or the mds' wanted
	 * value appears to be off, queue a check so we'll release
	 * later and/or update the mds wanted value.
	 */
	actual_wanted = __ceph_caps_wanted(ci);
	if ((wanted & ~actual_wanted) ||
	    (issued & ~actual_wanted & CEPH_CAP_ANY_WR)) {
		dout(" issued %s, mds wanted %s, actual %s, queueing\n",
		     ceph_cap_string(issued), ceph_cap_string(wanted),
		     ceph_cap_string(actual_wanted));
		__cap_delay_requeue(mdsc, ci, true);
	}

	if (flags & CEPH_CAP_FLAG_AUTH) {
		if (!ci->i_auth_cap ||
		    ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) {
			ci->i_auth_cap = cap;
			cap->mds_wanted = wanted;
		}
	} else {
		WARN_ON(ci->i_auth_cap == cap);
	}

	dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n",
	     inode, ceph_vinop(inode), cap, ceph_cap_string(issued),
	     ceph_cap_string(issued|cap->issued), seq, mds);
	cap->cap_id = cap_id;
	cap->issued = issued;
	cap->implemented |= issued;
	if (ceph_seq_cmp(mseq, cap->mseq) > 0)
		cap->mds_wanted = wanted;
	else
		cap->mds_wanted |= wanted;
	cap->seq = seq;
	cap->issue_seq = seq;
	cap->mseq = mseq;
	cap->cap_gen = session->s_cap_gen;

	if (fmode >= 0)
		__ceph_get_fmode(ci, fmode);
}

/*
 * Return true if cap has not timed out and belongs to the current
 * generation of the MDS session (i.e. has not gone 'stale' due to
 * us losing touch with the mds).
 */
static int __cap_is_valid(struct ceph_cap *cap)
{
	unsigned long ttl;
	u32 gen;

	spin_lock(&cap->session->s_gen_ttl_lock);
	gen = cap->session->s_cap_gen;
	ttl = cap->session->s_cap_ttl;
	spin_unlock(&cap->session->s_gen_ttl_lock);

	if (cap->cap_gen < gen || time_after_eq(jiffies, ttl)) {
		dout("__cap_is_valid %p cap %p issued %s "
		     "but STALE (gen %u vs %u)\n", &cap->ci->vfs_inode,
		     cap, ceph_cap_string(cap->issued), cap->cap_gen, gen);
		return 0;
	}

	return 1;
}

/*
 * Return set of valid cap bits issued to us.  Note that caps time
 * out, and may be invalidated in bulk if the client session times out
 * and session->s_cap_gen is bumped.
 */
int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
{
	int have = ci->i_snap_caps;
	struct ceph_cap *cap;
	struct rb_node *p;

	if (implemented)
		*implemented = 0;
	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (!__cap_is_valid(cap))
			continue;
		dout("__ceph_caps_issued %p cap %p issued %s\n",
		     &ci->vfs_inode, cap, ceph_cap_string(cap->issued));
		have |= cap->issued;
		if (implemented)
			*implemented |= cap->implemented;
	}
	/*
	 * exclude caps issued by non-auth MDS, but which are being revoked
	 * by the auth MDS. The non-auth MDS should be revoking/exporting
	 * these caps, but the message is delayed.
	 */
	if (ci->i_auth_cap) {
		cap = ci->i_auth_cap;
		have &= ~cap->implemented | cap->issued;
	}
	return have;
}

/*
 * Get cap bits issued by caps other than @ocap
 */
int __ceph_caps_issued_other(struct ceph_inode_info *ci, struct ceph_cap *ocap)
{
	int have = ci->i_snap_caps;
	struct ceph_cap *cap;
	struct rb_node *p;

	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (cap == ocap)
			continue;
		if (!__cap_is_valid(cap))
			continue;
		have |= cap->issued;
	}
	return have;
}

/*
 * Move a cap to the end of the LRU (oldest caps at list head, newest
 * at list tail).
 */
static void __touch_cap(struct ceph_cap *cap)
{
	struct ceph_mds_session *s = cap->session;

	spin_lock(&s->s_cap_lock);
	if (!s->s_cap_iterator) {
		dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap,
		     s->s_mds);
		list_move_tail(&cap->session_caps, &s->s_caps);
	} else {
		dout("__touch_cap %p cap %p mds%d NOP, iterating over caps\n",
		     &cap->ci->vfs_inode, cap, s->s_mds);
	}
	spin_unlock(&s->s_cap_lock);
}

/*
 * Check if we hold the given mask.  If so, move the cap(s) to the
 * front of their respective LRUs.  (This is the preferred way for
 * callers to check for caps they want.)
 */
int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
{
	struct ceph_cap *cap;
	struct rb_node *p;
	int have = ci->i_snap_caps;

	if ((have & mask) == mask) {
		dout("__ceph_caps_issued_mask %p snap issued %s"
		     " (mask %s)\n", &ci->vfs_inode,
		     ceph_cap_string(have),
		     ceph_cap_string(mask));
		return 1;
	}

	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (!__cap_is_valid(cap))
			continue;
		if ((cap->issued & mask) == mask) {
			dout("__ceph_caps_issued_mask %p cap %p issued %s"
			     " (mask %s)\n", &ci->vfs_inode, cap,
			     ceph_cap_string(cap->issued),
			     ceph_cap_string(mask));
			if (touch)
				__touch_cap(cap);
			return 1;
		}

		/* does a combination of caps satisfy mask? */
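		/* (bits from multiple MDS caps, plus any snap caps, are OR'd
		 * together and may jointly cover the mask even when no single
		 * cap does) */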
		have |= cap->issued;
		if ((have & mask) == mask) {
			dout("__ceph_caps_issued_mask %p combo issued %s"
			     " (mask %s)\n", &ci->vfs_inode,
			     ceph_cap_string(cap->issued),
			     ceph_cap_string(mask));
			if (touch) {
				struct rb_node *q;

				/* touch this + preceding caps */
				__touch_cap(cap);
				for (q = rb_first(&ci->i_caps); q != p;
				     q = rb_next(q)) {
					cap = rb_entry(q, struct ceph_cap,
						       ci_node);
					if (!__cap_is_valid(cap))
						continue;
					__touch_cap(cap);
				}
			}
			return 1;
		}
	}

	return 0;
}

/*
 * Return true if mask caps are currently being revoked by an MDS.
 */
int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
			       struct ceph_cap *ocap, int mask)
{
	struct ceph_cap *cap;
	struct rb_node *p;

	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (cap != ocap &&
		    (cap->implemented & ~cap->issued & mask))
			return 1;
	}
	return 0;
}

int ceph_caps_revoking(struct ceph_inode_info *ci, int mask)
{
	struct inode *inode = &ci->vfs_inode;
	int ret;

	spin_lock(&ci->i_ceph_lock);
	ret = __ceph_caps_revoking_other(ci, NULL, mask);
	spin_unlock(&ci->i_ceph_lock);
	dout("ceph_caps_revoking %p %s = %d\n", inode,
	     ceph_cap_string(mask), ret);
	return ret;
}

int __ceph_caps_used(struct ceph_inode_info *ci)
{
	int used = 0;
	if (ci->i_pin_ref)
		used |= CEPH_CAP_PIN;
	if (ci->i_rd_ref)
		used |= CEPH_CAP_FILE_RD;
	if (ci->i_rdcache_ref ||
	    (!S_ISDIR(ci->vfs_inode.i_mode) && /* ignore readdir cache */
	     ci->vfs_inode.i_data.nrpages))
		used |= CEPH_CAP_FILE_CACHE;
	if (ci->i_wr_ref)
		used |= CEPH_CAP_FILE_WR;
	if (ci->i_wb_ref || ci->i_wrbuffer_ref)
		used |= CEPH_CAP_FILE_BUFFER;
	return used;
}

/*
 * wanted, by virtue of open file modes
 */
int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
{
	int i, bits = 0;
	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
		if (ci->i_nr_by_mode[i])
			bits |= 1 << i;
	}
	if (bits == 0)
		return 0;
	return ceph_caps_for_mode(bits >> 1);
}

/*
 * Return caps we have registered with the MDS(s) as 'wanted'.
 */
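/*
 * (File-write bits registered with a non-auth MDS are ignored here; only
 * the auth MDS manages wanted file-write caps.)
 */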
int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check)
{
	struct ceph_cap *cap;
	struct rb_node *p;
	int mds_wanted = 0;

	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (check && !__cap_is_valid(cap))
			continue;
		if (cap == ci->i_auth_cap)
			mds_wanted |= cap->mds_wanted;
		else
			mds_wanted |= (cap->mds_wanted & ~CEPH_CAP_ANY_FILE_WR);
	}
	return mds_wanted;
}

/*
 * called under i_ceph_lock
 */
static int __ceph_is_single_caps(struct ceph_inode_info *ci)
{
	return rb_first(&ci->i_caps) == rb_last(&ci->i_caps);
}

static int __ceph_is_any_caps(struct ceph_inode_info *ci)
{
	return !RB_EMPTY_ROOT(&ci->i_caps);
}

int ceph_is_any_caps(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int ret;

	spin_lock(&ci->i_ceph_lock);
	ret = __ceph_is_any_caps(ci);
	spin_unlock(&ci->i_ceph_lock);

	return ret;
}

static void drop_inode_snap_realm(struct ceph_inode_info *ci)
{
	struct ceph_snap_realm *realm = ci->i_snap_realm;
	spin_lock(&realm->inodes_with_caps_lock);
	list_del_init(&ci->i_snap_realm_item);
	ci->i_snap_realm_counter++;
	ci->i_snap_realm = NULL;
	spin_unlock(&realm->inodes_with_caps_lock);
	ceph_put_snap_realm(ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc,
			    realm);
}

/*
 * Remove a cap.  Take steps to deal with a racing iterate_session_caps.
 *
 * caller should hold i_ceph_lock.
 * caller will not hold session s_mutex if called from destroy_inode.
 */
void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
{
	struct ceph_mds_session *session = cap->session;
	struct ceph_inode_info *ci = cap->ci;
	struct ceph_mds_client *mdsc =
		ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
	int removed = 0;

	dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);

	/* remove from session list */
	spin_lock(&session->s_cap_lock);
	if (session->s_cap_iterator == cap) {
		/* not yet, we are iterating over this very cap */
		dout("__ceph_remove_cap delaying %p removal from session %p\n",
		     cap, cap->session);
	} else {
		list_del_init(&cap->session_caps);
		session->s_nr_caps--;
		cap->session = NULL;
		removed = 1;
	}
	/* protect backpointer with s_cap_lock: see iterate_session_caps */
	cap->ci = NULL;

	/*
	 * s_cap_reconnect is protected by s_cap_lock. no one changes
	 * s_cap_gen while session is in the reconnect state.
	 */
	if (queue_release &&
	    (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
		cap->queue_release = 1;
		if (removed) {
			list_add_tail(&cap->session_caps,
				      &session->s_cap_releases);
			session->s_num_cap_releases++;
			removed = 0;
		}
	} else {
		cap->queue_release = 0;
	}
	cap->cap_ino = ci->i_vino.ino;

	spin_unlock(&session->s_cap_lock);

	/* remove from inode list */
	rb_erase(&cap->ci_node, &ci->i_caps);
	if (ci->i_auth_cap == cap)
		ci->i_auth_cap = NULL;

	if (removed)
		ceph_put_cap(mdsc, cap);

	/* when reconnect denied, we remove session caps forcibly,
	 * i_wr_ref can be non-zero. If there are ongoing writes,
	 * keep i_snap_realm.
	 */
	if (!__ceph_is_any_caps(ci) && ci->i_wr_ref == 0 && ci->i_snap_realm)
		drop_inode_snap_realm(ci);

	if (!__ceph_is_any_real_caps(ci))
		__cap_delay_cancel(mdsc, ci);
}

struct cap_msg_args {
	struct ceph_mds_session *session;
	u64 ino, cid, follows;
	u64 flush_tid, oldest_flush_tid, size, max_size;
	u64 xattr_version;
	struct ceph_buffer *xattr_buf;
	struct timespec64 atime, mtime, ctime;
	int op, caps, wanted, dirty;
	u32 seq, issue_seq, mseq, time_warp_seq;
	u32 flags;
	kuid_t uid;
	kgid_t gid;
	umode_t mode;
	bool inline_data;
};

/*
 * Build and send a cap message to the given MDS.
 *
 * Caller should be holding s_mutex.
 */
static int send_cap_msg(struct cap_msg_args *arg)
{
	struct ceph_mds_caps *fc;
	struct ceph_msg *msg;
	void *p;
	size_t extra_len;
	struct timespec64 zerotime = {0};
	struct ceph_osd_client *osdc = &arg->session->s_mdsc->fsc->client->osdc;

	dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
	     " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu"
	     " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(arg->op),
	     arg->cid, arg->ino, ceph_cap_string(arg->caps),
	     ceph_cap_string(arg->wanted), ceph_cap_string(arg->dirty),
	     arg->seq, arg->issue_seq, arg->flush_tid, arg->oldest_flush_tid,
	     arg->mseq, arg->follows, arg->size, arg->max_size,
	     arg->xattr_version,
	     arg->xattr_buf ? (int)arg->xattr_buf->vec.iov_len : 0);

	/* flock buffer size + inline version + inline data size +
	 * osd_epoch_barrier + oldest_flush_tid */
	extra_len = 4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4;
	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
			   GFP_NOFS, false);
	if (!msg)
		return -ENOMEM;

	msg->hdr.version = cpu_to_le16(10);
	msg->hdr.tid = cpu_to_le64(arg->flush_tid);

	fc = msg->front.iov_base;
	memset(fc, 0, sizeof(*fc));

	fc->cap_id = cpu_to_le64(arg->cid);
	fc->op = cpu_to_le32(arg->op);
	fc->seq = cpu_to_le32(arg->seq);
	fc->issue_seq = cpu_to_le32(arg->issue_seq);
	fc->migrate_seq = cpu_to_le32(arg->mseq);
	fc->caps = cpu_to_le32(arg->caps);
	fc->wanted = cpu_to_le32(arg->wanted);
	fc->dirty = cpu_to_le32(arg->dirty);
	fc->ino = cpu_to_le64(arg->ino);
	fc->snap_follows = cpu_to_le64(arg->follows);

	fc->size = cpu_to_le64(arg->size);
	fc->max_size = cpu_to_le64(arg->max_size);
	ceph_encode_timespec64(&fc->mtime, &arg->mtime);
	ceph_encode_timespec64(&fc->atime, &arg->atime);
	ceph_encode_timespec64(&fc->ctime, &arg->ctime);
	fc->time_warp_seq = cpu_to_le32(arg->time_warp_seq);

	fc->uid = cpu_to_le32(from_kuid(&init_user_ns, arg->uid));
	fc->gid = cpu_to_le32(from_kgid(&init_user_ns, arg->gid));
	fc->mode = cpu_to_le32(arg->mode);

	fc->xattr_version = cpu_to_le64(arg->xattr_version);
	if (arg->xattr_buf) {
		msg->middle = ceph_buffer_get(arg->xattr_buf);
		fc->xattr_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
		msg->hdr.middle_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
	}

	p = fc + 1;
	/* flock buffer size (version 2) */
	ceph_encode_32(&p, 0);
	/* inline version (version 4) */
	ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE);
	/* inline data size */
	ceph_encode_32(&p, 0);
	/*
	 * osd_epoch_barrier (version 5)
	 * The epoch_barrier is protected by osdc->lock, so READ_ONCE here in
	 * case it was recently changed
	 */
	ceph_encode_32(&p, READ_ONCE(osdc->epoch_barrier));
	/* oldest_flush_tid (version 6) */
	ceph_encode_64(&p, arg->oldest_flush_tid);

	/*
	 * caller_uid/caller_gid (version 7)
	 *
	 * Currently, we don't properly track which caller dirtied the caps
	 * last, and force a flush of them when there is a conflict. For now,
	 * just set this to 0:0, to emulate how the MDS has worked up to now.
	 */
	ceph_encode_32(&p, 0);
	ceph_encode_32(&p, 0);

	/* pool namespace (version 8) (mds always ignores this) */
	ceph_encode_32(&p, 0);

	/*
	 * btime and change_attr (version 9)
	 *
	 * We just zero these out for now, as the MDS ignores them unless
	 * the requisite feature flags are set (which we don't do yet).
	 */
	ceph_encode_timespec64(p, &zerotime);
	p += sizeof(struct ceph_timespec);
	ceph_encode_64(&p, 0);

	/* Advisory flags (version 10) */
	ceph_encode_32(&p, arg->flags);

	ceph_con_send(&arg->session->s_con, msg);
	return 0;
}

/*
 * Queue cap releases when an inode is dropped from our cache.  Since
 * inode is about to be destroyed, there is no need for i_ceph_lock.
 */
void ceph_queue_caps_release(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct rb_node *p;

	p = rb_first(&ci->i_caps);
	while (p) {
		struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
		p = rb_next(p);
		__ceph_remove_cap(cap, true);
	}
}

/*
 * Send a cap msg on the given inode.  Update our caps state, then
 * drop i_ceph_lock and send the message.
 *
 * Make note of max_size reported/requested from mds, revoked caps
 * that have now been implemented.
 *
 * Make half-hearted attempt to invalidate page cache if we are
 * dropping RDCACHE.  Note that this will leave behind locked pages
 * that we'll then need to deal with elsewhere.
 *
 * Return non-zero if delayed release, or we experienced an error
 * such that the caller should requeue + retry later.
 *
 * called with i_ceph_lock, then drops it.
 * caller should hold snap_rwsem (read), s_mutex.
 */
static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
		      int op, bool sync, int used, int want, int retain,
		      int flushing, u64 flush_tid, u64 oldest_flush_tid)
	__releases(cap->ci->i_ceph_lock)
{
	struct ceph_inode_info *ci = cap->ci;
	struct inode *inode = &ci->vfs_inode;
	struct cap_msg_args arg;
	int held, revoking;
	int wake = 0;
	int delayed = 0;
	int ret;

	held = cap->issued | cap->implemented;
	revoking = cap->implemented & ~cap->issued;
	retain &= ~revoking;

	dout("__send_cap %p cap %p session %p %s -> %s (revoking %s)\n",
	     inode, cap, cap->session,
	     ceph_cap_string(held), ceph_cap_string(held & retain),
	     ceph_cap_string(revoking));
	BUG_ON((retain & CEPH_CAP_PIN) == 0);

	arg.session = cap->session;

	/* don't release wanted unless we've waited a bit. */
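	/* (the cap hold timeouts give recently used caps a grace period
	 * before we tell the MDS we no longer want them) */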
	if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 &&
	    time_before(jiffies, ci->i_hold_caps_min)) {
		dout(" delaying issued %s -> %s, wanted %s -> %s on send\n",
		     ceph_cap_string(cap->issued),
		     ceph_cap_string(cap->issued & retain),
		     ceph_cap_string(cap->mds_wanted),
		     ceph_cap_string(want));
		want |= cap->mds_wanted;
		retain |= cap->issued;
		delayed = 1;
	}
	ci->i_ceph_flags &= ~(CEPH_I_NODELAY | CEPH_I_FLUSH);
	if (want & ~cap->mds_wanted) {
		/* user space may open/close single file frequently.
		 * This avoids dropping mds_wanted immediately after
		 * requesting new mds_wanted.
		 */
		__cap_set_timeouts(mdsc, ci);
	}

	cap->issued &= retain;  /* drop bits we don't want */
	if (cap->implemented & ~cap->issued) {
		/*
		 * Wake up any waiters on wanted -> needed transition.
		 * This is due to the weird transition from buffered
		 * to sync IO... we need to flush dirty pages _before_
		 * allowing sync writes to avoid reordering.
		 */
		wake = 1;
	}
	cap->implemented &= cap->issued | used;
	cap->mds_wanted = want;

	arg.ino = ceph_vino(inode).ino;
	arg.cid = cap->cap_id;
	arg.follows = flushing ? ci->i_head_snapc->seq : 0;
	arg.flush_tid = flush_tid;
	arg.oldest_flush_tid = oldest_flush_tid;

	arg.size = inode->i_size;
	ci->i_reported_size = arg.size;
	arg.max_size = ci->i_wanted_max_size;
	ci->i_requested_max_size = arg.max_size;

	if (flushing & CEPH_CAP_XATTR_EXCL) {
		__ceph_build_xattrs_blob(ci);
		arg.xattr_version = ci->i_xattrs.version;
		arg.xattr_buf = ci->i_xattrs.blob;
	} else {
		arg.xattr_buf = NULL;
	}

	arg.mtime = inode->i_mtime;
	arg.atime = inode->i_atime;
	arg.ctime = inode->i_ctime;

	arg.op = op;
	arg.caps = cap->implemented;
	arg.wanted = want;
	arg.dirty = flushing;

	arg.seq = cap->seq;
	arg.issue_seq = cap->issue_seq;
	arg.mseq = cap->mseq;
	arg.time_warp_seq = ci->i_time_warp_seq;

	arg.uid = inode->i_uid;
	arg.gid = inode->i_gid;
	arg.mode = inode->i_mode;

	arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
	if (list_empty(&ci->i_cap_snaps))
		arg.flags = CEPH_CLIENT_CAPS_NO_CAPSNAP;
	else
		arg.flags = CEPH_CLIENT_CAPS_PENDING_CAPSNAP;
	if (sync)
		arg.flags |= CEPH_CLIENT_CAPS_SYNC;

	spin_unlock(&ci->i_ceph_lock);

	ret = send_cap_msg(&arg);
	if (ret < 0) {
		dout("error sending cap msg, must requeue %p\n", inode);
		delayed = 1;
	}

	if (wake)
		wake_up_all(&ci->i_cap_wq);

	return delayed;
}

static inline int __send_flush_snap(struct inode *inode,
				    struct ceph_mds_session *session,
				    struct ceph_cap_snap *capsnap,
				    u32 mseq, u64 oldest_flush_tid)
{
	struct cap_msg_args arg;

	arg.session = session;
	arg.ino = ceph_vino(inode).ino;
	arg.cid = 0;
	arg.follows = capsnap->follows;
	arg.flush_tid = capsnap->cap_flush.tid;
	arg.oldest_flush_tid = oldest_flush_tid;

	arg.size = capsnap->size;
	arg.max_size = 0;
	arg.xattr_version = capsnap->xattr_version;
	arg.xattr_buf = capsnap->xattr_blob;

	arg.atime = capsnap->atime;
	arg.mtime = capsnap->mtime;
	arg.ctime = capsnap->ctime;

	arg.op = CEPH_CAP_OP_FLUSHSNAP;
	arg.caps = capsnap->issued;
	arg.wanted = 0;
	arg.dirty = capsnap->dirty;

	arg.seq = 0;
	arg.issue_seq = 0;
	arg.mseq = mseq;
	arg.time_warp_seq = capsnap->time_warp_seq;

	arg.uid = capsnap->uid;
	arg.gid = capsnap->gid;
	arg.mode = capsnap->mode;

	arg.inline_data = capsnap->inline_data;
	arg.flags = 0;

	return send_cap_msg(&arg);
}

/*
 * When a snapshot is taken, clients accumulate dirty metadata on
 * inodes with capabilities in ceph_cap_snaps to describe the file
 * state at the time the snapshot was taken.  This must be flushed
 * asynchronously back to the MDS once sync writes complete and dirty
 * data is written out.
 *
 * Called under i_ceph_lock.  Takes s_mutex as needed.
 */
static void __ceph_flush_snaps(struct ceph_inode_info *ci,
			       struct ceph_mds_session *session)
		__releases(ci->i_ceph_lock)
		__acquires(ci->i_ceph_lock)
{
	struct inode *inode = &ci->vfs_inode;
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_cap_snap *capsnap;
	u64 oldest_flush_tid = 0;
	u64 first_tid = 1, last_tid = 0;

	dout("__flush_snaps %p session %p\n", inode, session);

	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
		/*
		 * we need to wait for sync writes to complete and for dirty
		 * pages to be written out.
		 */
		if (capsnap->dirty_pages || capsnap->writing)
			break;

		/* should be removed by ceph_try_drop_cap_snap() */
		BUG_ON(!capsnap->need_flush);

		/* only flush each capsnap once */
		if (capsnap->cap_flush.tid > 0) {
			dout(" already flushed %p, skipping\n", capsnap);
			continue;
		}

		spin_lock(&mdsc->cap_dirty_lock);
		capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid;
		list_add_tail(&capsnap->cap_flush.g_list,
			      &mdsc->cap_flush_list);
		if (oldest_flush_tid == 0)
			oldest_flush_tid = __get_oldest_flush_tid(mdsc);
		if (list_empty(&ci->i_flushing_item)) {
			list_add_tail(&ci->i_flushing_item,
				      &session->s_cap_flushing);
		}
		spin_unlock(&mdsc->cap_dirty_lock);

		list_add_tail(&capsnap->cap_flush.i_list,
			      &ci->i_cap_flush_list);

		if (first_tid == 1)
			first_tid = capsnap->cap_flush.tid;
		last_tid = capsnap->cap_flush.tid;
	}

	ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS;

	while (first_tid <= last_tid) {
		struct ceph_cap *cap = ci->i_auth_cap;
		struct ceph_cap_flush *cf;
		int ret;

		if (!(cap && cap->session == session)) {
			dout("__flush_snaps %p auth cap %p not mds%d, "
			     "stop\n", inode, cap, session->s_mds);
			break;
		}

		ret = -ENOENT;
		list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
			if (cf->tid >= first_tid) {
				ret = 0;
				break;
			}
		}
		if (ret < 0)
			break;

		first_tid = cf->tid + 1;

		capsnap = container_of(cf, struct ceph_cap_snap, cap_flush);
		refcount_inc(&capsnap->nref);
		spin_unlock(&ci->i_ceph_lock);

		dout("__flush_snaps %p capsnap %p tid %llu %s\n",
		     inode, capsnap, cf->tid, ceph_cap_string(capsnap->dirty));

		ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
					oldest_flush_tid);
		if (ret < 0) {
			pr_err("__flush_snaps: error sending cap flushsnap, "
			       "ino (%llx.%llx) tid %llu follows %llu\n",
			       ceph_vinop(inode), cf->tid, capsnap->follows);
		}

		ceph_put_cap_snap(capsnap);
		spin_lock(&ci->i_ceph_lock);
	}
}

void ceph_flush_snaps(struct ceph_inode_info *ci,
		      struct ceph_mds_session **psession)
{
	struct inode *inode = &ci->vfs_inode;
	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
	struct ceph_mds_session *session = NULL;
	int mds;

	dout("ceph_flush_snaps %p\n", inode);
	if (psession)
		session = *psession;
retry:
	spin_lock(&ci->i_ceph_lock);
	if (!(ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)) {
		dout(" no capsnap needs flush, doing nothing\n");
		goto out;
	}
	if (!ci->i_auth_cap) {
		dout(" no auth cap (migrating?), doing nothing\n");
		goto out;
	}

	mds = ci->i_auth_cap->session->s_mds;
	if (session && session->s_mds != mds) {
		dout(" oops, wrong session %p mutex\n", session);
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
		session = NULL;
	}
	if (!session) {
		spin_unlock(&ci->i_ceph_lock);
		mutex_lock(&mdsc->mutex);
		session = __ceph_lookup_mds_session(mdsc, mds);
		mutex_unlock(&mdsc->mutex);
		if (session) {
			dout(" inverting session/ino locks on %p\n", session);
			mutex_lock(&session->s_mutex);
		}
		goto retry;
	}

	// make sure flushsnap messages are sent in proper order.
	if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
		__kick_flushing_caps(mdsc, session, ci, 0);
		ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
	}

	__ceph_flush_snaps(ci, session);
out:
	spin_unlock(&ci->i_ceph_lock);

	if (psession) {
		*psession = session;
	} else if (session) {
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
	}
	/* we flushed them all; remove this inode from the queue */
	spin_lock(&mdsc->snap_flush_lock);
	list_del_init(&ci->i_snap_flush_item);
	spin_unlock(&mdsc->snap_flush_lock);
}

/*
 * Mark caps dirty.  If inode is newly dirty, return the dirty flags.
 * Caller is then responsible for calling __mark_inode_dirty with the
 * returned flags value.
 */
int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
			   struct ceph_cap_flush **pcf)
{
	struct ceph_mds_client *mdsc =
		ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
	struct inode *inode = &ci->vfs_inode;
	int was = ci->i_dirty_caps;
	int dirty = 0;

	if (!ci->i_auth_cap) {
		pr_warn("__mark_dirty_caps %p %llx mask %s, "
			"but no auth cap (session was closed?)\n",
			inode, ceph_ino(inode), ceph_cap_string(mask));
		return 0;
	}

	dout("__mark_dirty_caps %p %s dirty %s -> %s\n", &ci->vfs_inode,
	     ceph_cap_string(mask), ceph_cap_string(was),
	     ceph_cap_string(was | mask));
	ci->i_dirty_caps |= mask;
	if (was == 0) {
		WARN_ON_ONCE(ci->i_prealloc_cap_flush);
		swap(ci->i_prealloc_cap_flush, *pcf);

		if (!ci->i_head_snapc) {
			WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem));
			ci->i_head_snapc = ceph_get_snap_context(
				ci->i_snap_realm->cached_context);
		}
		dout(" inode %p now dirty snapc %p auth cap %p\n",
		     &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
		BUG_ON(!list_empty(&ci->i_dirty_item));
		spin_lock(&mdsc->cap_dirty_lock);
		list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
		spin_unlock(&mdsc->cap_dirty_lock);
		if (ci->i_flushing_caps == 0) {
			ihold(inode);
			dirty |= I_DIRTY_SYNC;
		}
	} else {
		WARN_ON_ONCE(!ci->i_prealloc_cap_flush);
	}
	BUG_ON(list_empty(&ci->i_dirty_item));
	if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) &&
	    (mask & CEPH_CAP_FILE_BUFFER))
		dirty |= I_DIRTY_DATASYNC;
	__cap_delay_requeue(mdsc, ci, true);
	return dirty;
}

struct ceph_cap_flush *ceph_alloc_cap_flush(void)
{
	return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL);
}

void ceph_free_cap_flush(struct ceph_cap_flush *cf)
{
	if (cf)
		kmem_cache_free(ceph_cap_flush_cachep, cf);
}

static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc)
{
	if (!list_empty(&mdsc->cap_flush_list)) {
		struct ceph_cap_flush *cf =
			list_first_entry(&mdsc->cap_flush_list,
					 struct ceph_cap_flush, g_list);
		return cf->tid;
	}
	return 0;
}

/*
 * Remove cap_flush from the mdsc's or inode's flushing cap list.
 * Return true if caller needs to wake up flush waiters.
 */
static bool __finish_cap_flush(struct ceph_mds_client *mdsc,
			       struct ceph_inode_info *ci,
			       struct ceph_cap_flush *cf)
{
	struct ceph_cap_flush *prev;
	bool wake = cf->wake;
	if (mdsc) {
		/* are there older pending cap flushes? */
		if (wake && cf->g_list.prev != &mdsc->cap_flush_list) {
			prev = list_prev_entry(cf, g_list);
			prev->wake = true;
			wake = false;
		}
		list_del(&cf->g_list);
	} else if (ci) {
		if (wake && cf->i_list.prev != &ci->i_cap_flush_list) {
			prev = list_prev_entry(cf, i_list);
			prev->wake = true;
			wake = false;
		}
		list_del(&cf->i_list);
	} else {
		BUG_ON(1);
	}
	return wake;
}

/*
 * Add dirty inode to the flushing list.  Assign a seq number so we
 * can wait for caps to flush without starving.
 *
 * Called under i_ceph_lock.
 */
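/*
 * (Consumes the pre-allocated ci->i_prealloc_cap_flush, which becomes the
 * ceph_cap_flush tracking this flush.)
 */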
static int __mark_caps_flushing(struct inode *inode,
				struct ceph_mds_session *session, bool wake,
				u64 *flush_tid, u64 *oldest_flush_tid)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_cap_flush *cf = NULL;
	int flushing;

	BUG_ON(ci->i_dirty_caps == 0);
	BUG_ON(list_empty(&ci->i_dirty_item));
	BUG_ON(!ci->i_prealloc_cap_flush);

	flushing = ci->i_dirty_caps;
	dout("__mark_caps_flushing flushing %s, flushing_caps %s -> %s\n",
	     ceph_cap_string(flushing),
	     ceph_cap_string(ci->i_flushing_caps),
	     ceph_cap_string(ci->i_flushing_caps | flushing));
	ci->i_flushing_caps |= flushing;
	ci->i_dirty_caps = 0;
	dout(" inode %p now !dirty\n", inode);

	swap(cf, ci->i_prealloc_cap_flush);
	cf->caps = flushing;
	cf->wake = wake;

	spin_lock(&mdsc->cap_dirty_lock);
	list_del_init(&ci->i_dirty_item);

	cf->tid = ++mdsc->last_cap_flush_tid;
	list_add_tail(&cf->g_list, &mdsc->cap_flush_list);
	*oldest_flush_tid = __get_oldest_flush_tid(mdsc);

	if (list_empty(&ci->i_flushing_item)) {
		list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
		mdsc->num_cap_flushing++;
	}
	spin_unlock(&mdsc->cap_dirty_lock);

	list_add_tail(&cf->i_list, &ci->i_cap_flush_list);

	*flush_tid = cf->tid;
	return flushing;
}

/*
 * try to invalidate mapping pages without blocking.
 */
static int try_nonblocking_invalidate(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	u32 invalidating_gen = ci->i_rdcache_gen;

	spin_unlock(&ci->i_ceph_lock);
	invalidate_mapping_pages(&inode->i_data, 0, -1);
	spin_lock(&ci->i_ceph_lock);

	if (inode->i_data.nrpages == 0 &&
	    invalidating_gen == ci->i_rdcache_gen) {
		/* success. */
		dout("try_nonblocking_invalidate %p success\n", inode);
		/* save any racing async invalidate some trouble */
		ci->i_rdcache_revoking = ci->i_rdcache_gen - 1;
		return 0;
	}
	dout("try_nonblocking_invalidate %p failed\n", inode);
	return -1;
}

bool __ceph_should_report_size(struct ceph_inode_info *ci)
{
	loff_t size = ci->vfs_inode.i_size;
	/* mds will adjust max size according to the reported size */
	if (ci->i_flushing_caps & CEPH_CAP_FILE_WR)
		return false;
	if (size >= ci->i_max_size)
		return true;
	/* half of previous max_size increment has been used */
	if (ci->i_max_size > ci->i_reported_size &&
	    (size << 1) >= ci->i_max_size + ci->i_reported_size)
		return true;
	return false;
}

/*
 * Swiss army knife function to examine currently used and wanted
 * versus held caps.  Release, flush, ack revoked caps to mds as
 * appropriate.
 *
 *  CHECK_CAPS_NODELAY - caller is delayed work and we should not delay
 *    cap release further.
 *  CHECK_CAPS_AUTHONLY - we should only check the auth cap
 *  CHECK_CAPS_FLUSH - we should flush any dirty caps immediately, without
 *    further delay.
 */
void ceph_check_caps(struct ceph_inode_info *ci, int flags,
		     struct ceph_mds_session *session)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct inode *inode = &ci->vfs_inode;
	struct ceph_cap *cap;
	u64 flush_tid, oldest_flush_tid;
	int file_wanted, used, cap_used;
	int took_snap_rwsem = 0;     /* true if mdsc->snap_rwsem held */
	int issued, implemented, want, retain, revoking, flushing = 0;
	int mds = -1;   /* keep track of how far we've gone through i_caps list
			   to avoid an infinite loop on retry */
	struct rb_node *p;
	int delayed = 0, sent = 0;
	bool no_delay = flags & CHECK_CAPS_NODELAY;
	bool queue_invalidate = false;
	bool tried_invalidate = false;

	/* if we are unmounting, flush any unused caps immediately. */
	if (mdsc->stopping)
		no_delay = true;

	spin_lock(&ci->i_ceph_lock);

	if (ci->i_ceph_flags & CEPH_I_FLUSH)
		flags |= CHECK_CAPS_FLUSH;

	if (!(flags & CHECK_CAPS_AUTHONLY) ||
	    (ci->i_auth_cap && __ceph_is_single_caps(ci)))
		__cap_delay_cancel(mdsc, ci);

	goto retry_locked;
retry:
	spin_lock(&ci->i_ceph_lock);
retry_locked:
	file_wanted = __ceph_caps_file_wanted(ci);
	used = __ceph_caps_used(ci);
	issued = __ceph_caps_issued(ci, &implemented);
	revoking = implemented & ~issued;

	want = file_wanted;
	retain = file_wanted | used | CEPH_CAP_PIN;
	if (!mdsc->stopping && inode->i_nlink > 0) {
		if (file_wanted) {
			retain |= CEPH_CAP_ANY;       /* be greedy */
		} else if (S_ISDIR(inode->i_mode) &&
			   (issued & CEPH_CAP_FILE_SHARED) &&
			   __ceph_dir_is_complete(ci)) {
			/*
			 * If a directory is complete, we want to keep
			 * the exclusive cap. So that MDS does not end up
			 * revoking the shared cap on every create/unlink
			 * operation.
			 */
			want = CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
			retain |= want;
		} else {

			retain |= CEPH_CAP_ANY_SHARED;
			/*
			 * keep RD only if we didn't have the file open RW,
			 * because then the mds would revoke it anyway to
			 * journal max_size=0.
			 */
			if (ci->i_max_size == 0)
				retain |= CEPH_CAP_ANY_RD;
		}
	}

	dout("check_caps %p file_want %s used %s dirty %s flushing %s"
	     " issued %s revoking %s retain %s %s%s%s\n", inode,
	     ceph_cap_string(file_wanted),
	     ceph_cap_string(used), ceph_cap_string(ci->i_dirty_caps),
	     ceph_cap_string(ci->i_flushing_caps),
	     ceph_cap_string(issued), ceph_cap_string(revoking),
	     ceph_cap_string(retain),
	     (flags & CHECK_CAPS_AUTHONLY) ? " AUTHONLY" : "",
	     (flags & CHECK_CAPS_NODELAY) ? " NODELAY" : "",
	     (flags & CHECK_CAPS_FLUSH) ? " FLUSH" : "");

	/*
	 * If we no longer need to hold onto our old caps, and we may
	 * have cached pages, but don't want them, then try to invalidate.
	 * If we fail, it's because pages are locked.... try again later.
	 */
	if ((!no_delay || mdsc->stopping) &&
	    !S_ISDIR(inode->i_mode) &&		/* ignore readdir cache */
	    !(ci->i_wb_ref || ci->i_wrbuffer_ref) &&  /* no dirty pages... */
	    inode->i_data.nrpages &&		/* have cached pages */
	    (revoking & (CEPH_CAP_FILE_CACHE|
			 CEPH_CAP_FILE_LAZYIO)) && /* or revoking cache */
	    !tried_invalidate) {
		dout("check_caps trying to invalidate on %p\n", inode);
		if (try_nonblocking_invalidate(inode) < 0) {
			dout("check_caps queuing invalidate\n");
			queue_invalidate = true;
			ci->i_rdcache_revoking = ci->i_rdcache_gen;
		}
		tried_invalidate = true;
		goto retry_locked;
	}

	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);

		/* avoid looping forever */
		if (mds >= cap->mds ||
		    ((flags & CHECK_CAPS_AUTHONLY) && cap != ci->i_auth_cap))
			continue;

		/* NOTE: no side-effects allowed, until we take s_mutex */

		cap_used = used;
		if (ci->i_auth_cap && cap != ci->i_auth_cap)
			cap_used &= ~ci->i_auth_cap->issued;

		revoking = cap->implemented & ~cap->issued;
		dout(" mds%d cap %p used %s issued %s implemented %s revoking %s\n",
		     cap->mds, cap, ceph_cap_string(cap_used),
		     ceph_cap_string(cap->issued),
		     ceph_cap_string(cap->implemented),
		     ceph_cap_string(revoking));

		if (cap == ci->i_auth_cap &&
		    (cap->issued & CEPH_CAP_FILE_WR)) {
			/* request larger max_size from MDS? */
			if (ci->i_wanted_max_size > ci->i_max_size &&
			    ci->i_wanted_max_size > ci->i_requested_max_size) {
				dout("requesting new max_size\n");
				goto ack;
			}

			/* approaching file_max? */
			if (__ceph_should_report_size(ci)) {
				dout("i_size approaching max_size\n");
				goto ack;
			}
		}
		/* flush anything dirty? */
		if (cap == ci->i_auth_cap) {
			if ((flags & CHECK_CAPS_FLUSH) && ci->i_dirty_caps) {
				dout("flushing dirty caps\n");
				goto ack;
			}
			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) {
				dout("flushing snap caps\n");
				goto ack;
			}
		}

		/* completed revocation? going down and there are no caps? */
		if (revoking && (revoking & cap_used) == 0) {
			dout("completed revocation of %s\n",
			     ceph_cap_string(cap->implemented & ~cap->issued));
			goto ack;
		}

		/* want more caps from mds? */
		if (want & ~(cap->mds_wanted | cap->issued))
			goto ack;

		/* things we might delay */
		if ((cap->issued & ~retain) == 0 &&
		    cap->mds_wanted == want)
			continue;     /* nope, all good */

		if (no_delay)
			goto ack;

		/* delay? */
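		/* (the cap hasn't reached the end of its hold period yet,
		 * so keep it around in case it is needed again soon) */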
*/ 1981 if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 && 1982 time_before(jiffies, ci->i_hold_caps_max)) { 1983 dout(" delaying issued %s -> %s, wanted %s -> %s\n", 1984 ceph_cap_string(cap->issued), 1985 ceph_cap_string(cap->issued & retain), 1986 ceph_cap_string(cap->mds_wanted), 1987 ceph_cap_string(want)); 1988 delayed++; 1989 continue; 1990 } 1991 1992 ack: 1993 if (ci->i_ceph_flags & CEPH_I_NOFLUSH) { 1994 dout(" skipping %p I_NOFLUSH set\n", inode); 1995 continue; 1996 } 1997 1998 if (session && session != cap->session) { 1999 dout("oops, wrong session %p mutex\n", session); 2000 mutex_unlock(&session->s_mutex); 2001 session = NULL; 2002 } 2003 if (!session) { 2004 session = cap->session; 2005 if (mutex_trylock(&session->s_mutex) == 0) { 2006 dout("inverting session/ino locks on %p\n", 2007 session); 2008 spin_unlock(&ci->i_ceph_lock); 2009 if (took_snap_rwsem) { 2010 up_read(&mdsc->snap_rwsem); 2011 took_snap_rwsem = 0; 2012 } 2013 mutex_lock(&session->s_mutex); 2014 goto retry; 2015 } 2016 } 2017 2018 /* kick flushing and flush snaps before sending normal 2019 * cap message */ 2020 if (cap == ci->i_auth_cap && 2021 (ci->i_ceph_flags & 2022 (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) { 2023 if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) { 2024 __kick_flushing_caps(mdsc, session, ci, 0); 2025 ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; 2026 } 2027 if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) 2028 __ceph_flush_snaps(ci, session); 2029 2030 goto retry_locked; 2031 } 2032 2033 /* take snap_rwsem after session mutex */ 2034 if (!took_snap_rwsem) { 2035 if (down_read_trylock(&mdsc->snap_rwsem) == 0) { 2036 dout("inverting snap/in locks on %p\n", 2037 inode); 2038 spin_unlock(&ci->i_ceph_lock); 2039 down_read(&mdsc->snap_rwsem); 2040 took_snap_rwsem = 1; 2041 goto retry; 2042 } 2043 took_snap_rwsem = 1; 2044 } 2045 2046 if (cap == ci->i_auth_cap && ci->i_dirty_caps) { 2047 flushing = __mark_caps_flushing(inode, session, false, 2048 &flush_tid, 2049 &oldest_flush_tid); 2050 } else { 2051 flushing = 0; 2052 flush_tid = 0; 2053 spin_lock(&mdsc->cap_dirty_lock); 2054 oldest_flush_tid = __get_oldest_flush_tid(mdsc); 2055 spin_unlock(&mdsc->cap_dirty_lock); 2056 } 2057 2058 mds = cap->mds; /* remember mds, so we don't repeat */ 2059 sent++; 2060 2061 /* __send_cap drops i_ceph_lock */ 2062 delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, false, 2063 cap_used, want, retain, flushing, 2064 flush_tid, oldest_flush_tid); 2065 goto retry; /* retake i_ceph_lock and restart our cap scan. */ 2066 } 2067 2068 /* Reschedule delayed caps release if we delayed anything */ 2069 if (delayed) 2070 __cap_delay_requeue(mdsc, ci, false); 2071 2072 spin_unlock(&ci->i_ceph_lock); 2073 2074 if (queue_invalidate) 2075 ceph_queue_invalidate(inode); 2076 2077 if (session) 2078 mutex_unlock(&session->s_mutex); 2079 if (took_snap_rwsem) 2080 up_read(&mdsc->snap_rwsem); 2081 } 2082 2083 /* 2084 * Try to flush dirty caps back to the auth mds. 
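 *
 * Returns the mask of caps that were dirty (and are now being
 * flushed), and stores the flush tid in *ptid so the caller can wait
 * for the flush to be acked.  A sketch of the pattern used by the
 * callers below (ceph_fsync() and ceph_write_inode()):
 *
 *	dirty = try_flush_caps(inode, &flush_tid);
 *	if (dirty)
 *		wait_event_interruptible(ci->i_cap_wq,
 *				caps_are_flushed(inode, flush_tid));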
2085 */ 2086 static int try_flush_caps(struct inode *inode, u64 *ptid) 2087 { 2088 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 2089 struct ceph_inode_info *ci = ceph_inode(inode); 2090 struct ceph_mds_session *session = NULL; 2091 int flushing = 0; 2092 u64 flush_tid = 0, oldest_flush_tid = 0; 2093 2094 retry: 2095 spin_lock(&ci->i_ceph_lock); 2096 if (ci->i_ceph_flags & CEPH_I_NOFLUSH) { 2097 spin_unlock(&ci->i_ceph_lock); 2098 dout("try_flush_caps skipping %p I_NOFLUSH set\n", inode); 2099 goto out; 2100 } 2101 if (ci->i_dirty_caps && ci->i_auth_cap) { 2102 struct ceph_cap *cap = ci->i_auth_cap; 2103 int used = __ceph_caps_used(ci); 2104 int want = __ceph_caps_wanted(ci); 2105 int delayed; 2106 2107 if (!session || session != cap->session) { 2108 spin_unlock(&ci->i_ceph_lock); 2109 if (session) 2110 mutex_unlock(&session->s_mutex); 2111 session = cap->session; 2112 mutex_lock(&session->s_mutex); 2113 goto retry; 2114 } 2115 if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) { 2116 spin_unlock(&ci->i_ceph_lock); 2117 goto out; 2118 } 2119 2120 flushing = __mark_caps_flushing(inode, session, true, 2121 &flush_tid, &oldest_flush_tid); 2122 2123 /* __send_cap drops i_ceph_lock */ 2124 delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, true, 2125 used, want, (cap->issued | cap->implemented), 2126 flushing, flush_tid, oldest_flush_tid); 2127 2128 if (delayed) { 2129 spin_lock(&ci->i_ceph_lock); 2130 __cap_delay_requeue(mdsc, ci, true); 2131 spin_unlock(&ci->i_ceph_lock); 2132 } 2133 } else { 2134 if (!list_empty(&ci->i_cap_flush_list)) { 2135 struct ceph_cap_flush *cf = 2136 list_last_entry(&ci->i_cap_flush_list, 2137 struct ceph_cap_flush, i_list); 2138 cf->wake = true; 2139 flush_tid = cf->tid; 2140 } 2141 flushing = ci->i_flushing_caps; 2142 spin_unlock(&ci->i_ceph_lock); 2143 } 2144 out: 2145 if (session) 2146 mutex_unlock(&session->s_mutex); 2147 2148 *ptid = flush_tid; 2149 return flushing; 2150 } 2151 2152 /* 2153 * Return true if we've flushed caps through the given flush_tid. 2154 */ 2155 static int caps_are_flushed(struct inode *inode, u64 flush_tid) 2156 { 2157 struct ceph_inode_info *ci = ceph_inode(inode); 2158 int ret = 1; 2159 2160 spin_lock(&ci->i_ceph_lock); 2161 if (!list_empty(&ci->i_cap_flush_list)) { 2162 struct ceph_cap_flush * cf = 2163 list_first_entry(&ci->i_cap_flush_list, 2164 struct ceph_cap_flush, i_list); 2165 if (cf->tid <= flush_tid) 2166 ret = 0; 2167 } 2168 spin_unlock(&ci->i_ceph_lock); 2169 return ret; 2170 } 2171 2172 /* 2173 * wait for any unsafe requests to complete. 2174 */ 2175 static int unsafe_request_wait(struct inode *inode) 2176 { 2177 struct ceph_inode_info *ci = ceph_inode(inode); 2178 struct ceph_mds_request *req1 = NULL, *req2 = NULL; 2179 int ret, err = 0; 2180 2181 spin_lock(&ci->i_unsafe_lock); 2182 if (S_ISDIR(inode->i_mode) && !list_empty(&ci->i_unsafe_dirops)) { 2183 req1 = list_last_entry(&ci->i_unsafe_dirops, 2184 struct ceph_mds_request, 2185 r_unsafe_dir_item); 2186 ceph_mdsc_get_request(req1); 2187 } 2188 if (!list_empty(&ci->i_unsafe_iops)) { 2189 req2 = list_last_entry(&ci->i_unsafe_iops, 2190 struct ceph_mds_request, 2191 r_unsafe_target_item); 2192 ceph_mdsc_get_request(req2); 2193 } 2194 spin_unlock(&ci->i_unsafe_lock); 2195 2196 dout("unsafe_request_wait %p wait on tid %llu %llu\n", 2197 inode, req1 ? req1->r_tid : 0ULL, req2 ? 
req2->r_tid : 0ULL); 2198 if (req1) { 2199 ret = !wait_for_completion_timeout(&req1->r_safe_completion, 2200 ceph_timeout_jiffies(req1->r_timeout)); 2201 if (ret) 2202 err = -EIO; 2203 ceph_mdsc_put_request(req1); 2204 } 2205 if (req2) { 2206 ret = !wait_for_completion_timeout(&req2->r_safe_completion, 2207 ceph_timeout_jiffies(req2->r_timeout)); 2208 if (ret) 2209 err = -EIO; 2210 ceph_mdsc_put_request(req2); 2211 } 2212 return err; 2213 } 2214 2215 int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) 2216 { 2217 struct inode *inode = file->f_mapping->host; 2218 struct ceph_inode_info *ci = ceph_inode(inode); 2219 u64 flush_tid; 2220 int ret; 2221 int dirty; 2222 2223 dout("fsync %p%s\n", inode, datasync ? " datasync" : ""); 2224 2225 ret = file_write_and_wait_range(file, start, end); 2226 if (ret < 0) 2227 goto out; 2228 2229 if (datasync) 2230 goto out; 2231 2232 inode_lock(inode); 2233 2234 dirty = try_flush_caps(inode, &flush_tid); 2235 dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); 2236 2237 ret = unsafe_request_wait(inode); 2238 2239 /* 2240 * only wait on non-file metadata writeback (the mds 2241 * can recover size and mtime, so we don't need to 2242 * wait for that) 2243 */ 2244 if (!ret && (dirty & ~CEPH_CAP_ANY_FILE_WR)) { 2245 ret = wait_event_interruptible(ci->i_cap_wq, 2246 caps_are_flushed(inode, flush_tid)); 2247 } 2248 inode_unlock(inode); 2249 out: 2250 dout("fsync %p%s result=%d\n", inode, datasync ? " datasync" : "", ret); 2251 return ret; 2252 } 2253 2254 /* 2255 * Flush any dirty caps back to the mds. If we aren't asked to wait, 2256 * queue inode for flush but don't do so immediately, because we can 2257 * get by with fewer MDS messages if we wait for data writeback to 2258 * complete first. 2259 */ 2260 int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) 2261 { 2262 struct ceph_inode_info *ci = ceph_inode(inode); 2263 u64 flush_tid; 2264 int err = 0; 2265 int dirty; 2266 int wait = (wbc->sync_mode == WB_SYNC_ALL && !wbc->for_sync); 2267 2268 dout("write_inode %p wait=%d\n", inode, wait); 2269 if (wait) { 2270 dirty = try_flush_caps(inode, &flush_tid); 2271 if (dirty) 2272 err = wait_event_interruptible(ci->i_cap_wq, 2273 caps_are_flushed(inode, flush_tid)); 2274 } else { 2275 struct ceph_mds_client *mdsc = 2276 ceph_sb_to_client(inode->i_sb)->mdsc; 2277 2278 spin_lock(&ci->i_ceph_lock); 2279 if (__ceph_caps_dirty(ci)) 2280 __cap_delay_requeue_front(mdsc, ci); 2281 spin_unlock(&ci->i_ceph_lock); 2282 } 2283 return err; 2284 } 2285 2286 static void __kick_flushing_caps(struct ceph_mds_client *mdsc, 2287 struct ceph_mds_session *session, 2288 struct ceph_inode_info *ci, 2289 u64 oldest_flush_tid) 2290 __releases(ci->i_ceph_lock) 2291 __acquires(ci->i_ceph_lock) 2292 { 2293 struct inode *inode = &ci->vfs_inode; 2294 struct ceph_cap *cap; 2295 struct ceph_cap_flush *cf; 2296 int ret; 2297 u64 first_tid = 0; 2298 2299 list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) { 2300 if (cf->tid < first_tid) 2301 continue; 2302 2303 cap = ci->i_auth_cap; 2304 if (!(cap && cap->session == session)) { 2305 pr_err("%p auth cap %p not mds%d ???\n", 2306 inode, cap, session->s_mds); 2307 break; 2308 } 2309 2310 first_tid = cf->tid + 1; 2311 2312 if (cf->caps) { 2313 dout("kick_flushing_caps %p cap %p tid %llu %s\n", 2314 inode, cap, cf->tid, ceph_cap_string(cf->caps)); 2315 ci->i_ceph_flags |= CEPH_I_NODELAY; 2316 ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, 2317 false, __ceph_caps_used(ci), 2318 __ceph_caps_wanted(ci), 2319 
cap->issued | cap->implemented, 2320 cf->caps, cf->tid, oldest_flush_tid); 2321 if (ret) { 2322 pr_err("kick_flushing_caps: error sending " 2323 "cap flush, ino (%llx.%llx) " 2324 "tid %llu flushing %s\n", 2325 ceph_vinop(inode), cf->tid, 2326 ceph_cap_string(cf->caps)); 2327 } 2328 } else { 2329 struct ceph_cap_snap *capsnap = 2330 container_of(cf, struct ceph_cap_snap, 2331 cap_flush); 2332 dout("kick_flushing_caps %p capsnap %p tid %llu %s\n", 2333 inode, capsnap, cf->tid, 2334 ceph_cap_string(capsnap->dirty)); 2335 2336 refcount_inc(&capsnap->nref); 2337 spin_unlock(&ci->i_ceph_lock); 2338 2339 ret = __send_flush_snap(inode, session, capsnap, cap->mseq, 2340 oldest_flush_tid); 2341 if (ret < 0) { 2342 pr_err("kick_flushing_caps: error sending " 2343 "cap flushsnap, ino (%llx.%llx) " 2344 "tid %llu follows %llu\n", 2345 ceph_vinop(inode), cf->tid, 2346 capsnap->follows); 2347 } 2348 2349 ceph_put_cap_snap(capsnap); 2350 } 2351 2352 spin_lock(&ci->i_ceph_lock); 2353 } 2354 } 2355 2356 void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc, 2357 struct ceph_mds_session *session) 2358 { 2359 struct ceph_inode_info *ci; 2360 struct ceph_cap *cap; 2361 u64 oldest_flush_tid; 2362 2363 dout("early_kick_flushing_caps mds%d\n", session->s_mds); 2364 2365 spin_lock(&mdsc->cap_dirty_lock); 2366 oldest_flush_tid = __get_oldest_flush_tid(mdsc); 2367 spin_unlock(&mdsc->cap_dirty_lock); 2368 2369 list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { 2370 spin_lock(&ci->i_ceph_lock); 2371 cap = ci->i_auth_cap; 2372 if (!(cap && cap->session == session)) { 2373 pr_err("%p auth cap %p not mds%d ???\n", 2374 &ci->vfs_inode, cap, session->s_mds); 2375 spin_unlock(&ci->i_ceph_lock); 2376 continue; 2377 } 2378 2379 2380 /* 2381 * if flushing caps were revoked, we re-send the cap flush 2382 * in the client reconnect stage. This guarantees the MDS processes 2383 the cap flush message before issuing the flushing caps to 2384 other clients.
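 *
 * If (cap->issued & ci->i_flushing_caps) no longer covers all of the
 * flushing caps we re-send right here; otherwise we only set
 * CEPH_I_KICK_FLUSH so that ceph_kick_flushing_caps() re-sends the
 * flush later.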
2385 */ 2386 if ((cap->issued & ci->i_flushing_caps) != 2387 ci->i_flushing_caps) { 2388 ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; 2389 __kick_flushing_caps(mdsc, session, ci, 2390 oldest_flush_tid); 2391 } else { 2392 ci->i_ceph_flags |= CEPH_I_KICK_FLUSH; 2393 } 2394 2395 spin_unlock(&ci->i_ceph_lock); 2396 } 2397 } 2398 2399 void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, 2400 struct ceph_mds_session *session) 2401 { 2402 struct ceph_inode_info *ci; 2403 struct ceph_cap *cap; 2404 u64 oldest_flush_tid; 2405 2406 dout("kick_flushing_caps mds%d\n", session->s_mds); 2407 2408 spin_lock(&mdsc->cap_dirty_lock); 2409 oldest_flush_tid = __get_oldest_flush_tid(mdsc); 2410 spin_unlock(&mdsc->cap_dirty_lock); 2411 2412 list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { 2413 spin_lock(&ci->i_ceph_lock); 2414 cap = ci->i_auth_cap; 2415 if (!(cap && cap->session == session)) { 2416 pr_err("%p auth cap %p not mds%d ???\n", 2417 &ci->vfs_inode, cap, session->s_mds); 2418 spin_unlock(&ci->i_ceph_lock); 2419 continue; 2420 } 2421 if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) { 2422 ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; 2423 __kick_flushing_caps(mdsc, session, ci, 2424 oldest_flush_tid); 2425 } 2426 spin_unlock(&ci->i_ceph_lock); 2427 } 2428 } 2429 2430 static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, 2431 struct ceph_mds_session *session, 2432 struct inode *inode) 2433 __releases(ci->i_ceph_lock) 2434 { 2435 struct ceph_inode_info *ci = ceph_inode(inode); 2436 struct ceph_cap *cap; 2437 2438 cap = ci->i_auth_cap; 2439 dout("kick_flushing_inode_caps %p flushing %s\n", inode, 2440 ceph_cap_string(ci->i_flushing_caps)); 2441 2442 if (!list_empty(&ci->i_cap_flush_list)) { 2443 u64 oldest_flush_tid; 2444 spin_lock(&mdsc->cap_dirty_lock); 2445 list_move_tail(&ci->i_flushing_item, 2446 &cap->session->s_cap_flushing); 2447 oldest_flush_tid = __get_oldest_flush_tid(mdsc); 2448 spin_unlock(&mdsc->cap_dirty_lock); 2449 2450 ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; 2451 __kick_flushing_caps(mdsc, session, ci, oldest_flush_tid); 2452 spin_unlock(&ci->i_ceph_lock); 2453 } else { 2454 spin_unlock(&ci->i_ceph_lock); 2455 } 2456 } 2457 2458 2459 /* 2460 * Take references to capabilities we hold, so that we don't release 2461 * them to the MDS prematurely. 2462 * 2463 * Protected by i_ceph_lock. 2464 */ 2465 static void __take_cap_refs(struct ceph_inode_info *ci, int got, 2466 bool snap_rwsem_locked) 2467 { 2468 if (got & CEPH_CAP_PIN) 2469 ci->i_pin_ref++; 2470 if (got & CEPH_CAP_FILE_RD) 2471 ci->i_rd_ref++; 2472 if (got & CEPH_CAP_FILE_CACHE) 2473 ci->i_rdcache_ref++; 2474 if (got & CEPH_CAP_FILE_WR) { 2475 if (ci->i_wr_ref == 0 && !ci->i_head_snapc) { 2476 BUG_ON(!snap_rwsem_locked); 2477 ci->i_head_snapc = ceph_get_snap_context( 2478 ci->i_snap_realm->cached_context); 2479 } 2480 ci->i_wr_ref++; 2481 } 2482 if (got & CEPH_CAP_FILE_BUFFER) { 2483 if (ci->i_wb_ref == 0) 2484 ihold(&ci->vfs_inode); 2485 ci->i_wb_ref++; 2486 dout("__take_cap_refs %p wb %d -> %d (?)\n", 2487 &ci->vfs_inode, ci->i_wb_ref-1, ci->i_wb_ref); 2488 } 2489 } 2490 2491 /* 2492 * Try to grab cap references. Specify those refs we @want, and the 2493 * minimal set we @need. Also include the larger offset we are writing 2494 * to (when applicable), and check against max_size here as well. 2495 * Note that caller is responsible for ensuring max_size increases are 2496 * requested from the MDS. 
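 *
 * Returns 0 if the needed caps are not currently available (the
 * caller is expected to wait and retry), or 1 with either *got filled
 * in on success or *err set (e.g. -EAGAIN, -EBADF, -EROFS, -ESTALE,
 * -EIO) on failure.  With @nonblock set we never sleep on snap_rwsem.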
2497 */ 2498 static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, 2499 loff_t endoff, bool nonblock, int *got, int *err) 2500 { 2501 struct inode *inode = &ci->vfs_inode; 2502 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; 2503 int ret = 0; 2504 int have, implemented; 2505 int file_wanted; 2506 bool snap_rwsem_locked = false; 2507 2508 dout("get_cap_refs %p need %s want %s\n", inode, 2509 ceph_cap_string(need), ceph_cap_string(want)); 2510 2511 again: 2512 spin_lock(&ci->i_ceph_lock); 2513 2514 /* make sure file is actually open */ 2515 file_wanted = __ceph_caps_file_wanted(ci); 2516 if ((file_wanted & need) != need) { 2517 dout("try_get_cap_refs need %s file_wanted %s, EBADF\n", 2518 ceph_cap_string(need), ceph_cap_string(file_wanted)); 2519 *err = -EBADF; 2520 ret = 1; 2521 goto out_unlock; 2522 } 2523 2524 /* finish pending truncate */ 2525 while (ci->i_truncate_pending) { 2526 spin_unlock(&ci->i_ceph_lock); 2527 if (snap_rwsem_locked) { 2528 up_read(&mdsc->snap_rwsem); 2529 snap_rwsem_locked = false; 2530 } 2531 __ceph_do_pending_vmtruncate(inode); 2532 spin_lock(&ci->i_ceph_lock); 2533 } 2534 2535 have = __ceph_caps_issued(ci, &implemented); 2536 2537 if (have & need & CEPH_CAP_FILE_WR) { 2538 if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) { 2539 dout("get_cap_refs %p endoff %llu > maxsize %llu\n", 2540 inode, endoff, ci->i_max_size); 2541 if (endoff > ci->i_requested_max_size) { 2542 *err = -EAGAIN; 2543 ret = 1; 2544 } 2545 goto out_unlock; 2546 } 2547 /* 2548 * If a sync write is in progress, we must wait, so that we 2549 * can get a final snapshot value for size+mtime. 2550 */ 2551 if (__ceph_have_pending_cap_snap(ci)) { 2552 dout("get_cap_refs %p cap_snap_pending\n", inode); 2553 goto out_unlock; 2554 } 2555 } 2556 2557 if ((have & need) == need) { 2558 /* 2559 * Look at (implemented & ~have & not) so that we keep waiting 2560 * on transition from wanted -> needed caps. This is needed 2561 * for WRBUFFER|WR -> WR to avoid a new WR sync write from 2562 * going before a prior buffered writeback happens. 
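 *
 * For example, with need = WR and want = BUFFER: if BUFFER was issued
 * earlier and is now being revoked (still in 'implemented' but no
 * longer in 'have'), then 'not' contains BUFFER and 'revoking & not'
 * is non-zero, so we keep waiting instead of starting a sync write
 * that could overtake the buffered data still being written back.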
2563 */ 2564 int not = want & ~(have & need); 2565 int revoking = implemented & ~have; 2566 dout("get_cap_refs %p have %s but not %s (revoking %s)\n", 2567 inode, ceph_cap_string(have), ceph_cap_string(not), 2568 ceph_cap_string(revoking)); 2569 if ((revoking & not) == 0) { 2570 if (!snap_rwsem_locked && 2571 !ci->i_head_snapc && 2572 (need & CEPH_CAP_FILE_WR)) { 2573 if (!down_read_trylock(&mdsc->snap_rwsem)) { 2574 /* 2575 * we can not call down_read() when 2576 * task isn't in TASK_RUNNING state 2577 */ 2578 if (nonblock) { 2579 *err = -EAGAIN; 2580 ret = 1; 2581 goto out_unlock; 2582 } 2583 2584 spin_unlock(&ci->i_ceph_lock); 2585 down_read(&mdsc->snap_rwsem); 2586 snap_rwsem_locked = true; 2587 goto again; 2588 } 2589 snap_rwsem_locked = true; 2590 } 2591 *got = need | (have & want); 2592 if ((need & CEPH_CAP_FILE_RD) && 2593 !(*got & CEPH_CAP_FILE_CACHE)) 2594 ceph_disable_fscache_readpage(ci); 2595 __take_cap_refs(ci, *got, true); 2596 ret = 1; 2597 } 2598 } else { 2599 int session_readonly = false; 2600 if ((need & CEPH_CAP_FILE_WR) && ci->i_auth_cap) { 2601 struct ceph_mds_session *s = ci->i_auth_cap->session; 2602 spin_lock(&s->s_cap_lock); 2603 session_readonly = s->s_readonly; 2604 spin_unlock(&s->s_cap_lock); 2605 } 2606 if (session_readonly) { 2607 dout("get_cap_refs %p needed %s but mds%d readonly\n", 2608 inode, ceph_cap_string(need), ci->i_auth_cap->mds); 2609 *err = -EROFS; 2610 ret = 1; 2611 goto out_unlock; 2612 } 2613 2614 if (ci->i_ceph_flags & CEPH_I_CAP_DROPPED) { 2615 int mds_wanted; 2616 if (READ_ONCE(mdsc->fsc->mount_state) == 2617 CEPH_MOUNT_SHUTDOWN) { 2618 dout("get_cap_refs %p forced umount\n", inode); 2619 *err = -EIO; 2620 ret = 1; 2621 goto out_unlock; 2622 } 2623 mds_wanted = __ceph_caps_mds_wanted(ci, false); 2624 if (need & ~(mds_wanted & need)) { 2625 dout("get_cap_refs %p caps were dropped" 2626 " (session killed?)\n", inode); 2627 *err = -ESTALE; 2628 ret = 1; 2629 goto out_unlock; 2630 } 2631 if (!(file_wanted & ~mds_wanted)) 2632 ci->i_ceph_flags &= ~CEPH_I_CAP_DROPPED; 2633 } 2634 2635 dout("get_cap_refs %p have %s needed %s\n", inode, 2636 ceph_cap_string(have), ceph_cap_string(need)); 2637 } 2638 out_unlock: 2639 spin_unlock(&ci->i_ceph_lock); 2640 if (snap_rwsem_locked) 2641 up_read(&mdsc->snap_rwsem); 2642 2643 dout("get_cap_refs %p ret %d got %s\n", inode, 2644 ret, ceph_cap_string(*got)); 2645 return ret; 2646 } 2647 2648 /* 2649 * Check the offset we are writing up to against our current 2650 * max_size. If necessary, tell the MDS we want to write to 2651 * a larger offset. 2652 */ 2653 static void check_max_size(struct inode *inode, loff_t endoff) 2654 { 2655 struct ceph_inode_info *ci = ceph_inode(inode); 2656 int check = 0; 2657 2658 /* do we need to explicitly request a larger max_size? 
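 * (we only record the desired size in i_wanted_max_size here; the
 * request itself is sent to the auth MDS by the ceph_check_caps()
 * call below, using the same condition ceph_check_caps() applies)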
*/ 2659 spin_lock(&ci->i_ceph_lock); 2660 if (endoff >= ci->i_max_size && endoff > ci->i_wanted_max_size) { 2661 dout("write %p at large endoff %llu, req max_size\n", 2662 inode, endoff); 2663 ci->i_wanted_max_size = endoff; 2664 } 2665 /* duplicate ceph_check_caps()'s logic */ 2666 if (ci->i_auth_cap && 2667 (ci->i_auth_cap->issued & CEPH_CAP_FILE_WR) && 2668 ci->i_wanted_max_size > ci->i_max_size && 2669 ci->i_wanted_max_size > ci->i_requested_max_size) 2670 check = 1; 2671 spin_unlock(&ci->i_ceph_lock); 2672 if (check) 2673 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); 2674 } 2675 2676 int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, 2677 bool nonblock, int *got) 2678 { 2679 int ret, err = 0; 2680 2681 BUG_ON(need & ~CEPH_CAP_FILE_RD); 2682 BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO|CEPH_CAP_FILE_SHARED)); 2683 ret = ceph_pool_perm_check(ci, need); 2684 if (ret < 0) 2685 return ret; 2686 2687 ret = try_get_cap_refs(ci, need, want, 0, nonblock, got, &err); 2688 if (ret) { 2689 if (err == -EAGAIN) { 2690 ret = 0; 2691 } else if (err < 0) { 2692 ret = err; 2693 } 2694 } 2695 return ret; 2696 } 2697 2698 /* 2699 * Wait for caps, and take cap references. If we can't get a WR cap 2700 * due to a small max_size, make sure we check_max_size (and possibly 2701 * ask the mds) so we don't get hung up indefinitely. 2702 */ 2703 int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, 2704 loff_t endoff, int *got, struct page **pinned_page) 2705 { 2706 int _got, ret, err = 0; 2707 2708 ret = ceph_pool_perm_check(ci, need); 2709 if (ret < 0) 2710 return ret; 2711 2712 while (true) { 2713 if (endoff > 0) 2714 check_max_size(&ci->vfs_inode, endoff); 2715 2716 err = 0; 2717 _got = 0; 2718 ret = try_get_cap_refs(ci, need, want, endoff, 2719 false, &_got, &err); 2720 if (ret) { 2721 if (err == -EAGAIN) 2722 continue; 2723 if (err < 0) 2724 ret = err; 2725 } else { 2726 DEFINE_WAIT_FUNC(wait, woken_wake_function); 2727 add_wait_queue(&ci->i_cap_wq, &wait); 2728 2729 while (!try_get_cap_refs(ci, need, want, endoff, 2730 true, &_got, &err)) { 2731 if (signal_pending(current)) { 2732 ret = -ERESTARTSYS; 2733 break; 2734 } 2735 wait_woken(&wait, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT); 2736 } 2737 2738 remove_wait_queue(&ci->i_cap_wq, &wait); 2739 2740 if (err == -EAGAIN) 2741 continue; 2742 if (err < 0) 2743 ret = err; 2744 } 2745 if (ret < 0) { 2746 if (err == -ESTALE) { 2747 /* session was killed, try renew caps */ 2748 ret = ceph_renew_caps(&ci->vfs_inode); 2749 if (ret == 0) 2750 continue; 2751 } 2752 return ret; 2753 } 2754 2755 if (ci->i_inline_version != CEPH_INLINE_NONE && 2756 (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && 2757 i_size_read(&ci->vfs_inode) > 0) { 2758 struct page *page = 2759 find_get_page(ci->vfs_inode.i_mapping, 0); 2760 if (page) { 2761 if (PageUptodate(page)) { 2762 *pinned_page = page; 2763 break; 2764 } 2765 put_page(page); 2766 } 2767 /* 2768 * drop cap refs first because getattr while 2769 * holding * caps refs can cause deadlock. 
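 * (the getattr may require the MDS to revoke caps that we are still
 * holding references on, and those references would never be released
 * while we block waiting for the reply)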
2770 */ 2771 ceph_put_cap_refs(ci, _got); 2772 _got = 0; 2773 2774 /* 2775 * getattr request will bring inline data into 2776 * page cache 2777 */ 2778 ret = __ceph_do_getattr(&ci->vfs_inode, NULL, 2779 CEPH_STAT_CAP_INLINE_DATA, 2780 true); 2781 if (ret < 0) 2782 return ret; 2783 continue; 2784 } 2785 break; 2786 } 2787 2788 if ((_got & CEPH_CAP_FILE_RD) && (_got & CEPH_CAP_FILE_CACHE)) 2789 ceph_fscache_revalidate_cookie(ci); 2790 2791 *got = _got; 2792 return 0; 2793 } 2794 2795 /* 2796 * Take cap refs. Caller must already know we hold at least one ref 2797 * on the caps in question or we don't know this is safe. 2798 */ 2799 void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps) 2800 { 2801 spin_lock(&ci->i_ceph_lock); 2802 __take_cap_refs(ci, caps, false); 2803 spin_unlock(&ci->i_ceph_lock); 2804 } 2805 2806 2807 /* 2808 * drop cap_snap that is not associated with any snapshot. 2809 * we don't need to send FLUSHSNAP message for it. 2810 */ 2811 static int ceph_try_drop_cap_snap(struct ceph_inode_info *ci, 2812 struct ceph_cap_snap *capsnap) 2813 { 2814 if (!capsnap->need_flush && 2815 !capsnap->writing && !capsnap->dirty_pages) { 2816 dout("dropping cap_snap %p follows %llu\n", 2817 capsnap, capsnap->follows); 2818 BUG_ON(capsnap->cap_flush.tid > 0); 2819 ceph_put_snap_context(capsnap->context); 2820 if (!list_is_last(&capsnap->ci_item, &ci->i_cap_snaps)) 2821 ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS; 2822 2823 list_del(&capsnap->ci_item); 2824 ceph_put_cap_snap(capsnap); 2825 return 1; 2826 } 2827 return 0; 2828 } 2829 2830 /* 2831 * Release cap refs. 2832 * 2833 * If we released the last ref on any given cap, call ceph_check_caps 2834 * to release (or schedule a release). 2835 * 2836 * If we are releasing a WR cap (from a sync write), finalize any affected 2837 * cap_snap, and wake up any waiters. 2838 */ 2839 void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) 2840 { 2841 struct inode *inode = &ci->vfs_inode; 2842 int last = 0, put = 0, flushsnaps = 0, wake = 0; 2843 2844 spin_lock(&ci->i_ceph_lock); 2845 if (had & CEPH_CAP_PIN) 2846 --ci->i_pin_ref; 2847 if (had & CEPH_CAP_FILE_RD) 2848 if (--ci->i_rd_ref == 0) 2849 last++; 2850 if (had & CEPH_CAP_FILE_CACHE) 2851 if (--ci->i_rdcache_ref == 0) 2852 last++; 2853 if (had & CEPH_CAP_FILE_BUFFER) { 2854 if (--ci->i_wb_ref == 0) { 2855 last++; 2856 put++; 2857 } 2858 dout("put_cap_refs %p wb %d -> %d (?)\n", 2859 inode, ci->i_wb_ref+1, ci->i_wb_ref); 2860 } 2861 if (had & CEPH_CAP_FILE_WR) 2862 if (--ci->i_wr_ref == 0) { 2863 last++; 2864 if (__ceph_have_pending_cap_snap(ci)) { 2865 struct ceph_cap_snap *capsnap = 2866 list_last_entry(&ci->i_cap_snaps, 2867 struct ceph_cap_snap, 2868 ci_item); 2869 capsnap->writing = 0; 2870 if (ceph_try_drop_cap_snap(ci, capsnap)) 2871 put++; 2872 else if (__ceph_finish_cap_snap(ci, capsnap)) 2873 flushsnaps = 1; 2874 wake = 1; 2875 } 2876 if (ci->i_wrbuffer_ref_head == 0 && 2877 ci->i_dirty_caps == 0 && 2878 ci->i_flushing_caps == 0) { 2879 BUG_ON(!ci->i_head_snapc); 2880 ceph_put_snap_context(ci->i_head_snapc); 2881 ci->i_head_snapc = NULL; 2882 } 2883 /* see comment in __ceph_remove_cap() */ 2884 if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) 2885 drop_inode_snap_realm(ci); 2886 } 2887 spin_unlock(&ci->i_ceph_lock); 2888 2889 dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had), 2890 last ? " last" : "", put ? 
" put" : ""); 2891 2892 if (last && !flushsnaps) 2893 ceph_check_caps(ci, 0, NULL); 2894 else if (flushsnaps) 2895 ceph_flush_snaps(ci, NULL); 2896 if (wake) 2897 wake_up_all(&ci->i_cap_wq); 2898 while (put-- > 0) 2899 iput(inode); 2900 } 2901 2902 /* 2903 * Release @nr WRBUFFER refs on dirty pages for the given @snapc snap 2904 * context. Adjust per-snap dirty page accounting as appropriate. 2905 * Once all dirty data for a cap_snap is flushed, flush snapped file 2906 * metadata back to the MDS. If we dropped the last ref, call 2907 * ceph_check_caps. 2908 */ 2909 void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, 2910 struct ceph_snap_context *snapc) 2911 { 2912 struct inode *inode = &ci->vfs_inode; 2913 struct ceph_cap_snap *capsnap = NULL; 2914 int put = 0; 2915 bool last = false; 2916 bool found = false; 2917 bool flush_snaps = false; 2918 bool complete_capsnap = false; 2919 2920 spin_lock(&ci->i_ceph_lock); 2921 ci->i_wrbuffer_ref -= nr; 2922 if (ci->i_wrbuffer_ref == 0) { 2923 last = true; 2924 put++; 2925 } 2926 2927 if (ci->i_head_snapc == snapc) { 2928 ci->i_wrbuffer_ref_head -= nr; 2929 if (ci->i_wrbuffer_ref_head == 0 && 2930 ci->i_wr_ref == 0 && 2931 ci->i_dirty_caps == 0 && 2932 ci->i_flushing_caps == 0) { 2933 BUG_ON(!ci->i_head_snapc); 2934 ceph_put_snap_context(ci->i_head_snapc); 2935 ci->i_head_snapc = NULL; 2936 } 2937 dout("put_wrbuffer_cap_refs on %p head %d/%d -> %d/%d %s\n", 2938 inode, 2939 ci->i_wrbuffer_ref+nr, ci->i_wrbuffer_ref_head+nr, 2940 ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head, 2941 last ? " LAST" : ""); 2942 } else { 2943 list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { 2944 if (capsnap->context == snapc) { 2945 found = true; 2946 break; 2947 } 2948 } 2949 BUG_ON(!found); 2950 capsnap->dirty_pages -= nr; 2951 if (capsnap->dirty_pages == 0) { 2952 complete_capsnap = true; 2953 if (!capsnap->writing) { 2954 if (ceph_try_drop_cap_snap(ci, capsnap)) { 2955 put++; 2956 } else { 2957 ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS; 2958 flush_snaps = true; 2959 } 2960 } 2961 } 2962 dout("put_wrbuffer_cap_refs on %p cap_snap %p " 2963 " snap %lld %d/%d -> %d/%d %s%s\n", 2964 inode, capsnap, capsnap->context->seq, 2965 ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr, 2966 ci->i_wrbuffer_ref, capsnap->dirty_pages, 2967 last ? " (wrbuffer last)" : "", 2968 complete_capsnap ? " (complete capsnap)" : ""); 2969 } 2970 2971 spin_unlock(&ci->i_ceph_lock); 2972 2973 if (last) { 2974 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); 2975 } else if (flush_snaps) { 2976 ceph_flush_snaps(ci, NULL); 2977 } 2978 if (complete_capsnap) 2979 wake_up_all(&ci->i_cap_wq); 2980 while (put-- > 0) 2981 iput(inode); 2982 } 2983 2984 /* 2985 * Invalidate unlinked inode's aliases, so we can drop the inode ASAP. 2986 */ 2987 static void invalidate_aliases(struct inode *inode) 2988 { 2989 struct dentry *dn, *prev = NULL; 2990 2991 dout("invalidate_aliases inode %p\n", inode); 2992 d_prune_aliases(inode); 2993 /* 2994 * For non-directory inode, d_find_alias() only returns 2995 * hashed dentry. After calling d_invalidate(), the 2996 * dentry becomes unhashed. 2997 * 2998 * For directory inode, d_find_alias() can return 2999 * unhashed dentry. But directory inode should have 3000 * one alias at most. 
3001 */ 3002 while ((dn = d_find_alias(inode))) { 3003 if (dn == prev) { 3004 dput(dn); 3005 break; 3006 } 3007 d_invalidate(dn); 3008 if (prev) 3009 dput(prev); 3010 prev = dn; 3011 } 3012 if (prev) 3013 dput(prev); 3014 } 3015 3016 struct cap_extra_info { 3017 struct ceph_string *pool_ns; 3018 /* inline data */ 3019 u64 inline_version; 3020 void *inline_data; 3021 u32 inline_len; 3022 /* dirstat */ 3023 bool dirstat_valid; 3024 u64 nfiles; 3025 u64 nsubdirs; 3026 /* currently issued */ 3027 int issued; 3028 }; 3029 3030 /* 3031 * Handle a cap GRANT message from the MDS. (Note that a GRANT may 3032 * actually be a revocation if it specifies a smaller cap set.) 3033 * 3034 * caller holds s_mutex and i_ceph_lock, we drop both. 3035 */ 3036 static void handle_cap_grant(struct inode *inode, 3037 struct ceph_mds_session *session, 3038 struct ceph_cap *cap, 3039 struct ceph_mds_caps *grant, 3040 struct ceph_buffer *xattr_buf, 3041 struct cap_extra_info *extra_info) 3042 __releases(ci->i_ceph_lock) 3043 __releases(session->s_mdsc->snap_rwsem) 3044 { 3045 struct ceph_inode_info *ci = ceph_inode(inode); 3046 int seq = le32_to_cpu(grant->seq); 3047 int newcaps = le32_to_cpu(grant->caps); 3048 int used, wanted, dirty; 3049 u64 size = le64_to_cpu(grant->size); 3050 u64 max_size = le64_to_cpu(grant->max_size); 3051 int check_caps = 0; 3052 bool wake = false; 3053 bool writeback = false; 3054 bool queue_trunc = false; 3055 bool queue_invalidate = false; 3056 bool deleted_inode = false; 3057 bool fill_inline = false; 3058 3059 dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n", 3060 inode, cap, session->s_mds, seq, ceph_cap_string(newcaps)); 3061 dout(" size %llu max_size %llu, i_size %llu\n", size, max_size, 3062 inode->i_size); 3063 3064 3065 /* 3066 * auth mds of the inode changed. we received the cap export message, 3067 * but still haven't received the cap import message. handle_cap_export 3068 * updated the new auth MDS' cap. 3069 * 3070 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message 3071 * that was sent before the cap import message. So don't remove caps. 3072 */ 3073 if (ceph_seq_cmp(seq, cap->seq) <= 0) { 3074 WARN_ON(cap != ci->i_auth_cap); 3075 WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id)); 3076 seq = cap->seq; 3077 newcaps |= cap->issued; 3078 } 3079 3080 /* 3081 * If CACHE is being revoked, and we have no dirty buffers, 3082 * try to invalidate (once). (If there are dirty buffers, we 3083 * will invalidate _after_ writeback.) 3084 */ 3085 if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */ 3086 ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && 3087 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && 3088 !(ci->i_wrbuffer_ref || ci->i_wb_ref)) { 3089 if (try_nonblocking_invalidate(inode)) { 3090 /* there were locked pages.. invalidate later 3091 in a separate thread. 
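 * (we remember the current i_rdcache_gen in i_rdcache_revoking and
 * set queue_invalidate so that ceph_queue_invalidate() is kicked
 * once i_ceph_lock has been dropped at the end of this function)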
*/ 3092 if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { 3093 queue_invalidate = true; 3094 ci->i_rdcache_revoking = ci->i_rdcache_gen; 3095 } 3096 } 3097 } 3098 3099 /* side effects now are allowed */ 3100 cap->cap_gen = session->s_cap_gen; 3101 cap->seq = seq; 3102 3103 __check_cap_issue(ci, cap, newcaps); 3104 3105 if ((newcaps & CEPH_CAP_AUTH_SHARED) && 3106 (extra_info->issued & CEPH_CAP_AUTH_EXCL) == 0) { 3107 inode->i_mode = le32_to_cpu(grant->mode); 3108 inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid)); 3109 inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid)); 3110 dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode, 3111 from_kuid(&init_user_ns, inode->i_uid), 3112 from_kgid(&init_user_ns, inode->i_gid)); 3113 } 3114 3115 if ((newcaps & CEPH_CAP_LINK_SHARED) && 3116 (extra_info->issued & CEPH_CAP_LINK_EXCL) == 0) { 3117 set_nlink(inode, le32_to_cpu(grant->nlink)); 3118 if (inode->i_nlink == 0 && 3119 (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL))) 3120 deleted_inode = true; 3121 } 3122 3123 if ((extra_info->issued & CEPH_CAP_XATTR_EXCL) == 0 && 3124 grant->xattr_len) { 3125 int len = le32_to_cpu(grant->xattr_len); 3126 u64 version = le64_to_cpu(grant->xattr_version); 3127 3128 if (version > ci->i_xattrs.version) { 3129 dout(" got new xattrs v%llu on %p len %d\n", 3130 version, inode, len); 3131 if (ci->i_xattrs.blob) 3132 ceph_buffer_put(ci->i_xattrs.blob); 3133 ci->i_xattrs.blob = ceph_buffer_get(xattr_buf); 3134 ci->i_xattrs.version = version; 3135 ceph_forget_all_cached_acls(inode); 3136 } 3137 } 3138 3139 if (newcaps & CEPH_CAP_ANY_RD) { 3140 struct timespec64 mtime, atime, ctime; 3141 /* ctime/mtime/atime? */ 3142 ceph_decode_timespec64(&mtime, &grant->mtime); 3143 ceph_decode_timespec64(&atime, &grant->atime); 3144 ceph_decode_timespec64(&ctime, &grant->ctime); 3145 ceph_fill_file_time(inode, extra_info->issued, 3146 le32_to_cpu(grant->time_warp_seq), 3147 &ctime, &mtime, &atime); 3148 } 3149 3150 if ((newcaps & CEPH_CAP_FILE_SHARED) && extra_info->dirstat_valid) { 3151 ci->i_files = extra_info->nfiles; 3152 ci->i_subdirs = extra_info->nsubdirs; 3153 } 3154 3155 if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) { 3156 /* file layout may have changed */ 3157 s64 old_pool = ci->i_layout.pool_id; 3158 struct ceph_string *old_ns; 3159 3160 ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout); 3161 old_ns = rcu_dereference_protected(ci->i_layout.pool_ns, 3162 lockdep_is_held(&ci->i_ceph_lock)); 3163 rcu_assign_pointer(ci->i_layout.pool_ns, extra_info->pool_ns); 3164 3165 if (ci->i_layout.pool_id != old_pool || 3166 extra_info->pool_ns != old_ns) 3167 ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; 3168 3169 extra_info->pool_ns = old_ns; 3170 3171 /* size/truncate_seq? 
*/ 3172 queue_trunc = ceph_fill_file_size(inode, extra_info->issued, 3173 le32_to_cpu(grant->truncate_seq), 3174 le64_to_cpu(grant->truncate_size), 3175 size); 3176 } 3177 3178 if (ci->i_auth_cap == cap && (newcaps & CEPH_CAP_ANY_FILE_WR)) { 3179 if (max_size != ci->i_max_size) { 3180 dout("max_size %lld -> %llu\n", 3181 ci->i_max_size, max_size); 3182 ci->i_max_size = max_size; 3183 if (max_size >= ci->i_wanted_max_size) { 3184 ci->i_wanted_max_size = 0; /* reset */ 3185 ci->i_requested_max_size = 0; 3186 } 3187 wake = true; 3188 } else if (ci->i_wanted_max_size > ci->i_max_size && 3189 ci->i_wanted_max_size > ci->i_requested_max_size) { 3190 /* CEPH_CAP_OP_IMPORT */ 3191 wake = true; 3192 } 3193 } 3194 3195 /* check cap bits */ 3196 wanted = __ceph_caps_wanted(ci); 3197 used = __ceph_caps_used(ci); 3198 dirty = __ceph_caps_dirty(ci); 3199 dout(" my wanted = %s, used = %s, dirty %s\n", 3200 ceph_cap_string(wanted), 3201 ceph_cap_string(used), 3202 ceph_cap_string(dirty)); 3203 if (wanted != le32_to_cpu(grant->wanted)) { 3204 dout("mds wanted %s -> %s\n", 3205 ceph_cap_string(le32_to_cpu(grant->wanted)), 3206 ceph_cap_string(wanted)); 3207 /* imported cap may not have correct mds_wanted */ 3208 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) 3209 check_caps = 1; 3210 } 3211 3212 /* revocation, grant, or no-op? */ 3213 if (cap->issued & ~newcaps) { 3214 int revoking = cap->issued & ~newcaps; 3215 3216 dout("revocation: %s -> %s (revoking %s)\n", 3217 ceph_cap_string(cap->issued), 3218 ceph_cap_string(newcaps), 3219 ceph_cap_string(revoking)); 3220 if (revoking & used & CEPH_CAP_FILE_BUFFER) 3221 writeback = true; /* initiate writeback; will delay ack */ 3222 else if (revoking == CEPH_CAP_FILE_CACHE && 3223 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && 3224 queue_invalidate) 3225 ; /* do nothing yet, invalidation will be queued */ 3226 else if (cap == ci->i_auth_cap) 3227 check_caps = 1; /* check auth cap only */ 3228 else 3229 check_caps = 2; /* check all caps */ 3230 cap->issued = newcaps; 3231 cap->implemented |= newcaps; 3232 } else if (cap->issued == newcaps) { 3233 dout("caps unchanged: %s -> %s\n", 3234 ceph_cap_string(cap->issued), ceph_cap_string(newcaps)); 3235 } else { 3236 dout("grant: %s -> %s\n", ceph_cap_string(cap->issued), 3237 ceph_cap_string(newcaps)); 3238 /* is a non-auth MDS revoking the newly granted caps?
*/ 3239 if (cap == ci->i_auth_cap && 3240 __ceph_caps_revoking_other(ci, cap, newcaps)) 3241 check_caps = 2; 3242 3243 cap->issued = newcaps; 3244 cap->implemented |= newcaps; /* add bits only, to 3245 * avoid stepping on a 3246 * pending revocation */ 3247 wake = true; 3248 } 3249 BUG_ON(cap->issued & ~cap->implemented); 3250 3251 if (extra_info->inline_version > 0 && 3252 extra_info->inline_version >= ci->i_inline_version) { 3253 ci->i_inline_version = extra_info->inline_version; 3254 if (ci->i_inline_version != CEPH_INLINE_NONE && 3255 (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO))) 3256 fill_inline = true; 3257 } 3258 3259 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) { 3260 if (newcaps & ~extra_info->issued) 3261 wake = true; 3262 kick_flushing_inode_caps(session->s_mdsc, session, inode); 3263 up_read(&session->s_mdsc->snap_rwsem); 3264 } else { 3265 spin_unlock(&ci->i_ceph_lock); 3266 } 3267 3268 if (fill_inline) 3269 ceph_fill_inline_data(inode, NULL, extra_info->inline_data, 3270 extra_info->inline_len); 3271 3272 if (queue_trunc) 3273 ceph_queue_vmtruncate(inode); 3274 3275 if (writeback) 3276 /* 3277 * queue inode for writeback: we can't actually call 3278 * filemap_write_and_wait, etc. from message handler 3279 * context. 3280 */ 3281 ceph_queue_writeback(inode); 3282 if (queue_invalidate) 3283 ceph_queue_invalidate(inode); 3284 if (deleted_inode) 3285 invalidate_aliases(inode); 3286 if (wake) 3287 wake_up_all(&ci->i_cap_wq); 3288 3289 if (check_caps == 1) 3290 ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY, 3291 session); 3292 else if (check_caps == 2) 3293 ceph_check_caps(ci, CHECK_CAPS_NODELAY, session); 3294 else 3295 mutex_unlock(&session->s_mutex); 3296 } 3297 3298 /* 3299 * Handle FLUSH_ACK from MDS, indicating that metadata we sent to the 3300 * MDS has been safely committed. 
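 *
 * We walk i_cap_flush_list and retire every pending flush with a tid
 * at or below the acked flush_tid, clear the corresponding bits from
 * i_flushing_caps, and wake up anyone waiting in caps_are_flushed()
 * (fsync/write_inode) as well as the mdsc cap_flushing waitqueue.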
3301 */ 3302 static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, 3303 struct ceph_mds_caps *m, 3304 struct ceph_mds_session *session, 3305 struct ceph_cap *cap) 3306 __releases(ci->i_ceph_lock) 3307 { 3308 struct ceph_inode_info *ci = ceph_inode(inode); 3309 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 3310 struct ceph_cap_flush *cf, *tmp_cf; 3311 LIST_HEAD(to_remove); 3312 unsigned seq = le32_to_cpu(m->seq); 3313 int dirty = le32_to_cpu(m->dirty); 3314 int cleaned = 0; 3315 bool drop = false; 3316 bool wake_ci = false; 3317 bool wake_mdsc = false; 3318 3319 list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) { 3320 if (cf->tid == flush_tid) 3321 cleaned = cf->caps; 3322 if (cf->caps == 0) /* capsnap */ 3323 continue; 3324 if (cf->tid <= flush_tid) { 3325 if (__finish_cap_flush(NULL, ci, cf)) 3326 wake_ci = true; 3327 list_add_tail(&cf->i_list, &to_remove); 3328 } else { 3329 cleaned &= ~cf->caps; 3330 if (!cleaned) 3331 break; 3332 } 3333 } 3334 3335 dout("handle_cap_flush_ack inode %p mds%d seq %d on %s cleaned %s," 3336 " flushing %s -> %s\n", 3337 inode, session->s_mds, seq, ceph_cap_string(dirty), 3338 ceph_cap_string(cleaned), ceph_cap_string(ci->i_flushing_caps), 3339 ceph_cap_string(ci->i_flushing_caps & ~cleaned)); 3340 3341 if (list_empty(&to_remove) && !cleaned) 3342 goto out; 3343 3344 ci->i_flushing_caps &= ~cleaned; 3345 3346 spin_lock(&mdsc->cap_dirty_lock); 3347 3348 list_for_each_entry(cf, &to_remove, i_list) { 3349 if (__finish_cap_flush(mdsc, NULL, cf)) 3350 wake_mdsc = true; 3351 } 3352 3353 if (ci->i_flushing_caps == 0) { 3354 if (list_empty(&ci->i_cap_flush_list)) { 3355 list_del_init(&ci->i_flushing_item); 3356 if (!list_empty(&session->s_cap_flushing)) { 3357 dout(" mds%d still flushing cap on %p\n", 3358 session->s_mds, 3359 &list_first_entry(&session->s_cap_flushing, 3360 struct ceph_inode_info, 3361 i_flushing_item)->vfs_inode); 3362 } 3363 } 3364 mdsc->num_cap_flushing--; 3365 dout(" inode %p now !flushing\n", inode); 3366 3367 if (ci->i_dirty_caps == 0) { 3368 dout(" inode %p now clean\n", inode); 3369 BUG_ON(!list_empty(&ci->i_dirty_item)); 3370 drop = true; 3371 if (ci->i_wr_ref == 0 && 3372 ci->i_wrbuffer_ref_head == 0) { 3373 BUG_ON(!ci->i_head_snapc); 3374 ceph_put_snap_context(ci->i_head_snapc); 3375 ci->i_head_snapc = NULL; 3376 } 3377 } else { 3378 BUG_ON(list_empty(&ci->i_dirty_item)); 3379 } 3380 } 3381 spin_unlock(&mdsc->cap_dirty_lock); 3382 3383 out: 3384 spin_unlock(&ci->i_ceph_lock); 3385 3386 while (!list_empty(&to_remove)) { 3387 cf = list_first_entry(&to_remove, 3388 struct ceph_cap_flush, i_list); 3389 list_del(&cf->i_list); 3390 ceph_free_cap_flush(cf); 3391 } 3392 3393 if (wake_ci) 3394 wake_up_all(&ci->i_cap_wq); 3395 if (wake_mdsc) 3396 wake_up_all(&mdsc->cap_flushing_wq); 3397 if (drop) 3398 iput(inode); 3399 } 3400 3401 /* 3402 * Handle FLUSHSNAP_ACK. MDS has flushed snap data to disk and we can 3403 * throw away our cap_snap. 3404 * 3405 * Caller hold s_mutex. 
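 *
 * The matching cap_snap is located by the snap 'follows' value and the
 * flush tid carried in the ack; it is then unlinked from i_cap_snaps,
 * its cap_flush is completed, waiters are woken, and the inode
 * reference held for the flush is dropped.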
3406 */ 3407 static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, 3408 struct ceph_mds_caps *m, 3409 struct ceph_mds_session *session) 3410 { 3411 struct ceph_inode_info *ci = ceph_inode(inode); 3412 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 3413 u64 follows = le64_to_cpu(m->snap_follows); 3414 struct ceph_cap_snap *capsnap; 3415 bool flushed = false; 3416 bool wake_ci = false; 3417 bool wake_mdsc = false; 3418 3419 dout("handle_cap_flushsnap_ack inode %p ci %p mds%d follows %lld\n", 3420 inode, ci, session->s_mds, follows); 3421 3422 spin_lock(&ci->i_ceph_lock); 3423 list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { 3424 if (capsnap->follows == follows) { 3425 if (capsnap->cap_flush.tid != flush_tid) { 3426 dout(" cap_snap %p follows %lld tid %lld !=" 3427 " %lld\n", capsnap, follows, 3428 flush_tid, capsnap->cap_flush.tid); 3429 break; 3430 } 3431 flushed = true; 3432 break; 3433 } else { 3434 dout(" skipping cap_snap %p follows %lld\n", 3435 capsnap, capsnap->follows); 3436 } 3437 } 3438 if (flushed) { 3439 WARN_ON(capsnap->dirty_pages || capsnap->writing); 3440 dout(" removing %p cap_snap %p follows %lld\n", 3441 inode, capsnap, follows); 3442 list_del(&capsnap->ci_item); 3443 if (__finish_cap_flush(NULL, ci, &capsnap->cap_flush)) 3444 wake_ci = true; 3445 3446 spin_lock(&mdsc->cap_dirty_lock); 3447 3448 if (list_empty(&ci->i_cap_flush_list)) 3449 list_del_init(&ci->i_flushing_item); 3450 3451 if (__finish_cap_flush(mdsc, NULL, &capsnap->cap_flush)) 3452 wake_mdsc = true; 3453 3454 spin_unlock(&mdsc->cap_dirty_lock); 3455 } 3456 spin_unlock(&ci->i_ceph_lock); 3457 if (flushed) { 3458 ceph_put_snap_context(capsnap->context); 3459 ceph_put_cap_snap(capsnap); 3460 if (wake_ci) 3461 wake_up_all(&ci->i_cap_wq); 3462 if (wake_mdsc) 3463 wake_up_all(&mdsc->cap_flushing_wq); 3464 iput(inode); 3465 } 3466 } 3467 3468 /* 3469 * Handle TRUNC from MDS, indicating file truncation. 3470 * 3471 * caller hold s_mutex. 3472 */ 3473 static void handle_cap_trunc(struct inode *inode, 3474 struct ceph_mds_caps *trunc, 3475 struct ceph_mds_session *session) 3476 __releases(ci->i_ceph_lock) 3477 { 3478 struct ceph_inode_info *ci = ceph_inode(inode); 3479 int mds = session->s_mds; 3480 int seq = le32_to_cpu(trunc->seq); 3481 u32 truncate_seq = le32_to_cpu(trunc->truncate_seq); 3482 u64 truncate_size = le64_to_cpu(trunc->truncate_size); 3483 u64 size = le64_to_cpu(trunc->size); 3484 int implemented = 0; 3485 int dirty = __ceph_caps_dirty(ci); 3486 int issued = __ceph_caps_issued(ceph_inode(inode), &implemented); 3487 int queue_trunc = 0; 3488 3489 issued |= implemented | dirty; 3490 3491 dout("handle_cap_trunc inode %p mds%d seq %d to %lld seq %d\n", 3492 inode, mds, seq, truncate_size, truncate_seq); 3493 queue_trunc = ceph_fill_file_size(inode, issued, 3494 truncate_seq, truncate_size, size); 3495 spin_unlock(&ci->i_ceph_lock); 3496 3497 if (queue_trunc) 3498 ceph_queue_vmtruncate(inode); 3499 } 3500 3501 /* 3502 * Handle EXPORT from MDS. Cap is being migrated _from_ this mds to a 3503 * different one. If we are the most recent migration we've seen (as 3504 * indicated by mseq), make note of the migrating cap bits for the 3505 * duration (until we see the corresponding IMPORT). 
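 *
 * Three cases are handled below: if there is no target mds we simply
 * remove our cap; if we already have a cap from the target mds we fold
 * the exported bits into it and drop ours; otherwise we open a session
 * to the target mds and install a placeholder cap there until the
 * corresponding IMPORT message arrives.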
3506 * 3507 * caller holds s_mutex 3508 */ 3509 static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, 3510 struct ceph_mds_cap_peer *ph, 3511 struct ceph_mds_session *session) 3512 { 3513 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; 3514 struct ceph_mds_session *tsession = NULL; 3515 struct ceph_cap *cap, *tcap, *new_cap = NULL; 3516 struct ceph_inode_info *ci = ceph_inode(inode); 3517 u64 t_cap_id; 3518 unsigned mseq = le32_to_cpu(ex->migrate_seq); 3519 unsigned t_seq, t_mseq; 3520 int target, issued; 3521 int mds = session->s_mds; 3522 3523 if (ph) { 3524 t_cap_id = le64_to_cpu(ph->cap_id); 3525 t_seq = le32_to_cpu(ph->seq); 3526 t_mseq = le32_to_cpu(ph->mseq); 3527 target = le32_to_cpu(ph->mds); 3528 } else { 3529 t_cap_id = t_seq = t_mseq = 0; 3530 target = -1; 3531 } 3532 3533 dout("handle_cap_export inode %p ci %p mds%d mseq %d target %d\n", 3534 inode, ci, mds, mseq, target); 3535 retry: 3536 spin_lock(&ci->i_ceph_lock); 3537 cap = __get_cap_for_mds(ci, mds); 3538 if (!cap || cap->cap_id != le64_to_cpu(ex->cap_id)) 3539 goto out_unlock; 3540 3541 if (target < 0) { 3542 __ceph_remove_cap(cap, false); 3543 if (!ci->i_auth_cap) 3544 ci->i_ceph_flags |= CEPH_I_CAP_DROPPED; 3545 goto out_unlock; 3546 } 3547 3548 /* 3549 * now we know we haven't received the cap import message yet 3550 * because the exported cap still exists. 3551 */ 3552 3553 issued = cap->issued; 3554 if (issued != cap->implemented) 3555 pr_err_ratelimited("handle_cap_export: issued != implemented: " 3556 "ino (%llx.%llx) mds%d seq %d mseq %d " 3557 "issued %s implemented %s\n", 3558 ceph_vinop(inode), mds, cap->seq, cap->mseq, 3559 ceph_cap_string(issued), 3560 ceph_cap_string(cap->implemented)); 3561 3562 3563 tcap = __get_cap_for_mds(ci, target); 3564 if (tcap) { 3565 /* already have caps from the target */ 3566 if (tcap->cap_id == t_cap_id && 3567 ceph_seq_cmp(tcap->seq, t_seq) < 0) { 3568 dout(" updating import cap %p mds%d\n", tcap, target); 3569 tcap->cap_id = t_cap_id; 3570 tcap->seq = t_seq - 1; 3571 tcap->issue_seq = t_seq - 1; 3572 tcap->mseq = t_mseq; 3573 tcap->issued |= issued; 3574 tcap->implemented |= issued; 3575 if (cap == ci->i_auth_cap) 3576 ci->i_auth_cap = tcap; 3577 3578 if (!list_empty(&ci->i_cap_flush_list) && 3579 ci->i_auth_cap == tcap) { 3580 spin_lock(&mdsc->cap_dirty_lock); 3581 list_move_tail(&ci->i_flushing_item, 3582 &tcap->session->s_cap_flushing); 3583 spin_unlock(&mdsc->cap_dirty_lock); 3584 } 3585 } 3586 __ceph_remove_cap(cap, false); 3587 goto out_unlock; 3588 } else if (tsession) { 3589 /* add placeholder for the export target */ 3590 int flag = (cap == ci->i_auth_cap) ?
CEPH_CAP_FLAG_AUTH : 0; 3591 tcap = new_cap; 3592 ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0, 3593 t_seq - 1, t_mseq, (u64)-1, flag, &new_cap); 3594 3595 if (!list_empty(&ci->i_cap_flush_list) && 3596 ci->i_auth_cap == tcap) { 3597 spin_lock(&mdsc->cap_dirty_lock); 3598 list_move_tail(&ci->i_flushing_item, 3599 &tcap->session->s_cap_flushing); 3600 spin_unlock(&mdsc->cap_dirty_lock); 3601 } 3602 3603 __ceph_remove_cap(cap, false); 3604 goto out_unlock; 3605 } 3606 3607 spin_unlock(&ci->i_ceph_lock); 3608 mutex_unlock(&session->s_mutex); 3609 3610 /* open target session */ 3611 tsession = ceph_mdsc_open_export_target_session(mdsc, target); 3612 if (!IS_ERR(tsession)) { 3613 if (mds > target) { 3614 mutex_lock(&session->s_mutex); 3615 mutex_lock_nested(&tsession->s_mutex, 3616 SINGLE_DEPTH_NESTING); 3617 } else { 3618 mutex_lock(&tsession->s_mutex); 3619 mutex_lock_nested(&session->s_mutex, 3620 SINGLE_DEPTH_NESTING); 3621 } 3622 new_cap = ceph_get_cap(mdsc, NULL); 3623 } else { 3624 WARN_ON(1); 3625 tsession = NULL; 3626 target = -1; 3627 } 3628 goto retry; 3629 3630 out_unlock: 3631 spin_unlock(&ci->i_ceph_lock); 3632 mutex_unlock(&session->s_mutex); 3633 if (tsession) { 3634 mutex_unlock(&tsession->s_mutex); 3635 ceph_put_mds_session(tsession); 3636 } 3637 if (new_cap) 3638 ceph_put_cap(mdsc, new_cap); 3639 } 3640 3641 /* 3642 * Handle cap IMPORT. 3643 * 3644 * caller holds s_mutex. acquires i_ceph_lock 3645 */ 3646 static void handle_cap_import(struct ceph_mds_client *mdsc, 3647 struct inode *inode, struct ceph_mds_caps *im, 3648 struct ceph_mds_cap_peer *ph, 3649 struct ceph_mds_session *session, 3650 struct ceph_cap **target_cap, int *old_issued) 3651 __acquires(ci->i_ceph_lock) 3652 { 3653 struct ceph_inode_info *ci = ceph_inode(inode); 3654 struct ceph_cap *cap, *ocap, *new_cap = NULL; 3655 int mds = session->s_mds; 3656 int issued; 3657 unsigned caps = le32_to_cpu(im->caps); 3658 unsigned wanted = le32_to_cpu(im->wanted); 3659 unsigned seq = le32_to_cpu(im->seq); 3660 unsigned mseq = le32_to_cpu(im->migrate_seq); 3661 u64 realmino = le64_to_cpu(im->realm); 3662 u64 cap_id = le64_to_cpu(im->cap_id); 3663 u64 p_cap_id; 3664 int peer; 3665 3666 if (ph) { 3667 p_cap_id = le64_to_cpu(ph->cap_id); 3668 peer = le32_to_cpu(ph->mds); 3669 } else { 3670 p_cap_id = 0; 3671 peer = -1; 3672 } 3673 3674 dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n", 3675 inode, ci, mds, mseq, peer); 3676 3677 retry: 3678 spin_lock(&ci->i_ceph_lock); 3679 cap = __get_cap_for_mds(ci, mds); 3680 if (!cap) { 3681 if (!new_cap) { 3682 spin_unlock(&ci->i_ceph_lock); 3683 new_cap = ceph_get_cap(mdsc, NULL); 3684 goto retry; 3685 } 3686 cap = new_cap; 3687 } else { 3688 if (new_cap) { 3689 ceph_put_cap(mdsc, new_cap); 3690 new_cap = NULL; 3691 } 3692 } 3693 3694 __ceph_caps_issued(ci, &issued); 3695 issued |= __ceph_caps_dirty(ci); 3696 3697 ceph_add_cap(inode, session, cap_id, -1, caps, wanted, seq, mseq, 3698 realmino, CEPH_CAP_FLAG_AUTH, &new_cap); 3699 3700 ocap = peer >= 0 ? 
__get_cap_for_mds(ci, peer) : NULL; 3701 if (ocap && ocap->cap_id == p_cap_id) { 3702 dout(" remove export cap %p mds%d flags %d\n", 3703 ocap, peer, ph->flags); 3704 if ((ph->flags & CEPH_CAP_FLAG_AUTH) && 3705 (ocap->seq != le32_to_cpu(ph->seq) || 3706 ocap->mseq != le32_to_cpu(ph->mseq))) { 3707 pr_err_ratelimited("handle_cap_import: " 3708 "mismatched seq/mseq: ino (%llx.%llx) " 3709 "mds%d seq %d mseq %d importer mds%d " 3710 "has peer seq %d mseq %d\n", 3711 ceph_vinop(inode), peer, ocap->seq, 3712 ocap->mseq, mds, le32_to_cpu(ph->seq), 3713 le32_to_cpu(ph->mseq)); 3714 } 3715 __ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE)); 3716 } 3717 3718 /* make sure we re-request max_size, if necessary */ 3719 ci->i_requested_max_size = 0; 3720 3721 *old_issued = issued; 3722 *target_cap = cap; 3723 } 3724 3725 /* 3726 * Handle a caps message from the MDS. 3727 * 3728 * Identify the appropriate session, inode, and call the right handler 3729 * based on the cap op. 3730 */ 3731 void ceph_handle_caps(struct ceph_mds_session *session, 3732 struct ceph_msg *msg) 3733 { 3734 struct ceph_mds_client *mdsc = session->s_mdsc; 3735 struct inode *inode; 3736 struct ceph_inode_info *ci; 3737 struct ceph_cap *cap; 3738 struct ceph_mds_caps *h; 3739 struct ceph_mds_cap_peer *peer = NULL; 3740 struct ceph_snap_realm *realm = NULL; 3741 int op; 3742 int msg_version = le16_to_cpu(msg->hdr.version); 3743 u32 seq, mseq; 3744 struct ceph_vino vino; 3745 void *snaptrace; 3746 size_t snaptrace_len; 3747 void *p, *end; 3748 struct cap_extra_info extra_info = {}; 3749 3750 dout("handle_caps from mds%d\n", session->s_mds); 3751 3752 /* decode */ 3753 end = msg->front.iov_base + msg->front.iov_len; 3754 if (msg->front.iov_len < sizeof(*h)) 3755 goto bad; 3756 h = msg->front.iov_base; 3757 op = le32_to_cpu(h->op); 3758 vino.ino = le64_to_cpu(h->ino); 3759 vino.snap = CEPH_NOSNAP; 3760 seq = le32_to_cpu(h->seq); 3761 mseq = le32_to_cpu(h->migrate_seq); 3762 3763 snaptrace = h + 1; 3764 snaptrace_len = le32_to_cpu(h->snap_trace_len); 3765 p = snaptrace + snaptrace_len; 3766 3767 if (msg_version >= 2) { 3768 u32 flock_len; 3769 ceph_decode_32_safe(&p, end, flock_len, bad); 3770 if (p + flock_len > end) 3771 goto bad; 3772 p += flock_len; 3773 } 3774 3775 if (msg_version >= 3) { 3776 if (op == CEPH_CAP_OP_IMPORT) { 3777 if (p + sizeof(*peer) > end) 3778 goto bad; 3779 peer = p; 3780 p += sizeof(*peer); 3781 } else if (op == CEPH_CAP_OP_EXPORT) { 3782 /* recorded in unused fields */ 3783 peer = (void *)&h->size; 3784 } 3785 } 3786 3787 if (msg_version >= 4) { 3788 ceph_decode_64_safe(&p, end, extra_info.inline_version, bad); 3789 ceph_decode_32_safe(&p, end, extra_info.inline_len, bad); 3790 if (p + extra_info.inline_len > end) 3791 goto bad; 3792 extra_info.inline_data = p; 3793 p += extra_info.inline_len; 3794 } 3795 3796 if (msg_version >= 5) { 3797 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 3798 u32 epoch_barrier; 3799 3800 ceph_decode_32_safe(&p, end, epoch_barrier, bad); 3801 ceph_osdc_update_epoch_barrier(osdc, epoch_barrier); 3802 } 3803 3804 if (msg_version >= 8) { 3805 u64 flush_tid; 3806 u32 caller_uid, caller_gid; 3807 u32 pool_ns_len; 3808 3809 /* version >= 6 */ 3810 ceph_decode_64_safe(&p, end, flush_tid, bad); 3811 /* version >= 7 */ 3812 ceph_decode_32_safe(&p, end, caller_uid, bad); 3813 ceph_decode_32_safe(&p, end, caller_gid, bad); 3814 /* version >= 8 */ 3815 ceph_decode_32_safe(&p, end, pool_ns_len, bad); 3816 if (pool_ns_len > 0) { 3817 ceph_decode_need(&p, end, pool_ns_len, 
bad); 3818 extra_info.pool_ns = 3819 ceph_find_or_create_string(p, pool_ns_len); 3820 p += pool_ns_len; 3821 } 3822 } 3823 3824 if (msg_version >= 11) { 3825 struct ceph_timespec *btime; 3826 u64 change_attr; 3827 u32 flags; 3828 3829 /* version >= 9 */ 3830 if (p + sizeof(*btime) > end) 3831 goto bad; 3832 btime = p; 3833 p += sizeof(*btime); 3834 ceph_decode_64_safe(&p, end, change_attr, bad); 3835 /* version >= 10 */ 3836 ceph_decode_32_safe(&p, end, flags, bad); 3837 /* version >= 11 */ 3838 extra_info.dirstat_valid = true; 3839 ceph_decode_64_safe(&p, end, extra_info.nfiles, bad); 3840 ceph_decode_64_safe(&p, end, extra_info.nsubdirs, bad); 3841 } 3842 3843 /* lookup ino */ 3844 inode = ceph_find_inode(mdsc->fsc->sb, vino); 3845 ci = ceph_inode(inode); 3846 dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino, 3847 vino.snap, inode); 3848 3849 mutex_lock(&session->s_mutex); 3850 session->s_seq++; 3851 dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, 3852 (unsigned)seq); 3853 3854 if (!inode) { 3855 dout(" i don't have ino %llx\n", vino.ino); 3856 3857 if (op == CEPH_CAP_OP_IMPORT) { 3858 cap = ceph_get_cap(mdsc, NULL); 3859 cap->cap_ino = vino.ino; 3860 cap->queue_release = 1; 3861 cap->cap_id = le64_to_cpu(h->cap_id); 3862 cap->mseq = mseq; 3863 cap->seq = seq; 3864 cap->issue_seq = seq; 3865 spin_lock(&session->s_cap_lock); 3866 list_add_tail(&cap->session_caps, 3867 &session->s_cap_releases); 3868 session->s_num_cap_releases++; 3869 spin_unlock(&session->s_cap_lock); 3870 } 3871 goto flush_cap_releases; 3872 } 3873 3874 /* these will work even if we don't have a cap yet */ 3875 switch (op) { 3876 case CEPH_CAP_OP_FLUSHSNAP_ACK: 3877 handle_cap_flushsnap_ack(inode, le64_to_cpu(msg->hdr.tid), 3878 h, session); 3879 goto done; 3880 3881 case CEPH_CAP_OP_EXPORT: 3882 handle_cap_export(inode, h, peer, session); 3883 goto done_unlocked; 3884 3885 case CEPH_CAP_OP_IMPORT: 3886 realm = NULL; 3887 if (snaptrace_len) { 3888 down_write(&mdsc->snap_rwsem); 3889 ceph_update_snap_trace(mdsc, snaptrace, 3890 snaptrace + snaptrace_len, 3891 false, &realm); 3892 downgrade_write(&mdsc->snap_rwsem); 3893 } else { 3894 down_read(&mdsc->snap_rwsem); 3895 } 3896 handle_cap_import(mdsc, inode, h, peer, session, 3897 &cap, &extra_info.issued); 3898 handle_cap_grant(inode, session, cap, 3899 h, msg->middle, &extra_info); 3900 if (realm) 3901 ceph_put_snap_realm(mdsc, realm); 3902 goto done_unlocked; 3903 } 3904 3905 /* the rest require a cap */ 3906 spin_lock(&ci->i_ceph_lock); 3907 cap = __get_cap_for_mds(ceph_inode(inode), session->s_mds); 3908 if (!cap) { 3909 dout(" no cap on %p ino %llx.%llx from mds%d\n", 3910 inode, ceph_ino(inode), ceph_snap(inode), 3911 session->s_mds); 3912 spin_unlock(&ci->i_ceph_lock); 3913 goto flush_cap_releases; 3914 } 3915 3916 /* note that each of these drops i_ceph_lock for us */ 3917 switch (op) { 3918 case CEPH_CAP_OP_REVOKE: 3919 case CEPH_CAP_OP_GRANT: 3920 __ceph_caps_issued(ci, &extra_info.issued); 3921 extra_info.issued |= __ceph_caps_dirty(ci); 3922 handle_cap_grant(inode, session, cap, 3923 h, msg->middle, &extra_info); 3924 goto done_unlocked; 3925 3926 case CEPH_CAP_OP_FLUSH_ACK: 3927 handle_cap_flush_ack(inode, le64_to_cpu(msg->hdr.tid), 3928 h, session, cap); 3929 break; 3930 3931 case CEPH_CAP_OP_TRUNC: 3932 handle_cap_trunc(inode, h, session); 3933 break; 3934 3935 default: 3936 spin_unlock(&ci->i_ceph_lock); 3937 pr_err("ceph_handle_caps: unknown cap op %d %s\n", op, 3938 ceph_cap_op_name(op)); 3939 } 3940 3941 goto 

flush_cap_releases:
	/*
	 * send any cap release message to try to move things
	 * along for the mds (who clearly thinks we still have this
	 * cap).
	 */
	ceph_send_cap_releases(mdsc, session);

done:
	mutex_unlock(&session->s_mutex);
done_unlocked:
	iput(inode);
	ceph_put_string(extra_info.pool_ns);
	return;

bad:
	pr_err("ceph_handle_caps: corrupt message\n");
	ceph_msg_dump(msg);
	return;
}

/*
 * Delayed work handler to process the end of the delayed cap list:
 * check caps on each inode whose hold period has expired (or that is
 * flagged CEPH_I_FLUSH).
 */
void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	int flags = CHECK_CAPS_NODELAY;

	dout("check_delayed_caps\n");
	while (1) {
		spin_lock(&mdsc->cap_delay_lock);
		if (list_empty(&mdsc->cap_delay_list))
			break;
		ci = list_first_entry(&mdsc->cap_delay_list,
				      struct ceph_inode_info,
				      i_cap_delay_list);
		if ((ci->i_ceph_flags & CEPH_I_FLUSH) == 0 &&
		    time_before(jiffies, ci->i_hold_caps_max))
			break;
		list_del_init(&ci->i_cap_delay_list);

		inode = igrab(&ci->vfs_inode);
		spin_unlock(&mdsc->cap_delay_lock);

		if (inode) {
			dout("check_delayed_caps on %p\n", inode);
			ceph_check_caps(ci, flags, NULL);
			iput(inode);
		}
	}
	spin_unlock(&mdsc->cap_delay_lock);
}

/*
 * Flush all dirty caps back to the MDS.
 */
void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
{
	struct ceph_inode_info *ci;
	struct inode *inode;

	dout("flush_dirty_caps\n");
	spin_lock(&mdsc->cap_dirty_lock);
	while (!list_empty(&mdsc->cap_dirty)) {
		ci = list_first_entry(&mdsc->cap_dirty, struct ceph_inode_info,
				      i_dirty_item);
		inode = &ci->vfs_inode;
		ihold(inode);
		dout("flush_dirty_caps %p\n", inode);
		spin_unlock(&mdsc->cap_dirty_lock);
		ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_FLUSH, NULL);
		iput(inode);
		spin_lock(&mdsc->cap_dirty_lock);
	}
	spin_unlock(&mdsc->cap_dirty_lock);
	dout("flush_dirty_caps done\n");
}

/* Take open file mode references (counterpart of ceph_put_fmode below). */
void __ceph_get_fmode(struct ceph_inode_info *ci, int fmode)
{
	int i;
	int bits = (fmode << 1) | 1;

	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
		if (bits & (1 << i))
			ci->i_nr_by_mode[i]++;
	}
}

/*
 * Drop an open file reference.  If this was the last reference for a
 * given mode, we may need to release capabilities to the MDS (or
 * schedule their delayed release).
 */
void ceph_put_fmode(struct ceph_inode_info *ci, int fmode)
{
	int i, last = 0;
	int bits = (fmode << 1) | 1;

	spin_lock(&ci->i_ceph_lock);
	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
		if (bits & (1 << i)) {
			BUG_ON(ci->i_nr_by_mode[i] == 0);
			if (--ci->i_nr_by_mode[i] == 0)
				last++;
		}
	}
	dout("put_fmode %p fmode %d {%d,%d,%d,%d}\n",
	     &ci->vfs_inode, fmode,
	     ci->i_nr_by_mode[0], ci->i_nr_by_mode[1],
	     ci->i_nr_by_mode[2], ci->i_nr_by_mode[3]);
	spin_unlock(&ci->i_ceph_lock);

	if (last && ci->i_vino.snap == CEPH_NOSNAP)
		ceph_check_caps(ci, 0, NULL);
}
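
/*
 * Illustrative sketch only (the real call sites live in file.c and
 * inode.c): open and release of a ceph file are expected to balance
 * these mode references roughly like so, with the open side calling
 * __ceph_get_fmode() under i_ceph_lock and the release side calling
 * ceph_put_fmode():
 *
 *	int fmode = ceph_flags_to_mode(file->f_flags);
 *
 *	spin_lock(&ci->i_ceph_lock);
 *	__ceph_get_fmode(ci, fmode);		(at open)
 *	spin_unlock(&ci->i_ceph_lock);
 *	...
 *	ceph_put_fmode(ci, fmode);		(at release)
 */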

/*
 * For a soon-to-be unlinked file, drop the LINK caps.  If it looks
 * like the link count will hit 0, drop any other caps (other than
 * PIN) we don't specifically want (due to the file still being open).
 */
int ceph_drop_caps_for_unlink(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;

	spin_lock(&ci->i_ceph_lock);
	if (inode->i_nlink == 1) {
		drop |= ~(__ceph_caps_wanted(ci) | CEPH_CAP_PIN);

		ci->i_ceph_flags |= CEPH_I_NODELAY;
		if (__ceph_caps_dirty(ci)) {
			struct ceph_mds_client *mdsc =
				ceph_inode_to_client(inode)->mdsc;
			__cap_delay_requeue_front(mdsc, ci);
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	return drop;
}

/*
 * Helpers for embedding cap and dentry lease releases into MDS
 * requests.
 *
 * @force is used by dentry_release (below) to force inclusion of a
 * record for the directory inode, even when there aren't any caps to
 * drop.
 */
int ceph_encode_inode_release(void **p, struct inode *inode,
			      int mds, int drop, int unless, int force)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_cap *cap;
	struct ceph_mds_request_release *rel = *p;
	int used, dirty;
	int ret = 0;

	spin_lock(&ci->i_ceph_lock);
	used = __ceph_caps_used(ci);
	dirty = __ceph_caps_dirty(ci);

	dout("encode_inode_release %p mds%d used|dirty %s drop %s unless %s\n",
	     inode, mds, ceph_cap_string(used|dirty), ceph_cap_string(drop),
	     ceph_cap_string(unless));

	/* only drop unused, clean caps */
	drop &= ~(used | dirty);

	cap = __get_cap_for_mds(ci, mds);
	if (cap && __cap_is_valid(cap)) {
		unless &= cap->issued;
		if (unless) {
			if (unless & CEPH_CAP_AUTH_EXCL)
				drop &= ~CEPH_CAP_AUTH_SHARED;
			if (unless & CEPH_CAP_LINK_EXCL)
				drop &= ~CEPH_CAP_LINK_SHARED;
			if (unless & CEPH_CAP_XATTR_EXCL)
				drop &= ~CEPH_CAP_XATTR_SHARED;
			if (unless & CEPH_CAP_FILE_EXCL)
				drop &= ~CEPH_CAP_FILE_SHARED;
		}

		if (force || (cap->issued & drop)) {
			if (cap->issued & drop) {
				int wanted = __ceph_caps_wanted(ci);
				if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0)
					wanted |= cap->mds_wanted;
				dout("encode_inode_release %p cap %p "
				     "%s -> %s, wanted %s -> %s\n", inode, cap,
				     ceph_cap_string(cap->issued),
				     ceph_cap_string(cap->issued & ~drop),
				     ceph_cap_string(cap->mds_wanted),
				     ceph_cap_string(wanted));

				cap->issued &= ~drop;
				cap->implemented &= ~drop;
				cap->mds_wanted = wanted;
			} else {
				dout("encode_inode_release %p cap %p %s"
				     " (force)\n", inode, cap,
				     ceph_cap_string(cap->issued));
			}

			rel->ino = cpu_to_le64(ceph_ino(inode));
			rel->cap_id = cpu_to_le64(cap->cap_id);
			rel->seq = cpu_to_le32(cap->seq);
			rel->issue_seq = cpu_to_le32(cap->issue_seq);
			rel->mseq = cpu_to_le32(cap->mseq);
			rel->caps = cpu_to_le32(cap->implemented);
			rel->wanted = cpu_to_le32(cap->mds_wanted);
			rel->dname_len = 0;
			rel->dname_seq = 0;
			*p += sizeof(*rel);
			ret = 1;
		} else {
			dout("encode_inode_release %p cap %p %s (noop)\n",
			     inode, cap, ceph_cap_string(cap->issued));
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	return ret;
}
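
/*
 * Illustrative sketch only (assumed caller, see the request builder in
 * mds_client.c): when an MDS request wants to piggy-back a cap release,
 * the builder reserves room for a ceph_mds_request_release and lets
 * ceph_encode_inode_release() above fill it in, e.g. roughly:
 *
 *	if (req->r_inode_drop)
 *		releases += ceph_encode_inode_release(&p, req->r_inode,
 *				mds, req->r_inode_drop,
 *				req->r_inode_unless, 0);
 *
 * A return value of 1 means one release record was written and *p was
 * advanced past it.
 */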

int ceph_encode_dentry_release(void **p, struct dentry *dentry,
			       struct inode *dir,
			       int mds, int drop, int unless)
{
	struct dentry *parent = NULL;
	struct ceph_mds_request_release *rel = *p;
	struct ceph_dentry_info *di = ceph_dentry(dentry);
	int force = 0;
	int ret;

	/*
	 * Force a record for the directory caps if we have a dentry lease.
	 * This is racy (we can't take i_ceph_lock and d_lock together), but
	 * it doesn't have to be perfect; the MDS will revoke anything we
	 * don't release.
	 */
	spin_lock(&dentry->d_lock);
	if (di->lease_session && di->lease_session->s_mds == mds)
		force = 1;
	if (!dir) {
		parent = dget(dentry->d_parent);
		dir = d_inode(parent);
	}
	spin_unlock(&dentry->d_lock);

	ret = ceph_encode_inode_release(p, dir, mds, drop, unless, force);
	dput(parent);

	spin_lock(&dentry->d_lock);
	if (ret && di->lease_session && di->lease_session->s_mds == mds) {
		dout("encode_dentry_release %p mds%d seq %d\n",
		     dentry, mds, (int)di->lease_seq);
		rel->dname_len = cpu_to_le32(dentry->d_name.len);
		memcpy(*p, dentry->d_name.name, dentry->d_name.len);
		*p += dentry->d_name.len;
		rel->dname_seq = cpu_to_le32(di->lease_seq);
		__ceph_mdsc_drop_dentry_lease(dentry);
	}
	spin_unlock(&dentry->d_lock);
	return ret;
}
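
/*
 * Illustrative sketch only (assumed caller, again in mds_client.c's
 * request builder): dentry releases are embedded the same way as inode
 * releases, with the dentry's parent used as the directory inode when
 * none is supplied, e.g. roughly:
 *
 *	if (req->r_dentry_drop)
 *		releases += ceph_encode_dentry_release(&p, req->r_dentry,
 *				req->r_parent, mds, req->r_dentry_drop,
 *				req->r_dentry_unless);
 */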