1 // SPDX-License-Identifier: GPL-2.0 2 #include <linux/ceph/ceph_debug.h> 3 4 #include <linux/fs.h> 5 #include <linux/kernel.h> 6 #include <linux/sched/signal.h> 7 #include <linux/slab.h> 8 #include <linux/vmalloc.h> 9 #include <linux/wait.h> 10 #include <linux/writeback.h> 11 12 #include "super.h" 13 #include "mds_client.h" 14 #include "cache.h" 15 #include <linux/ceph/decode.h> 16 #include <linux/ceph/messenger.h> 17 18 /* 19 * Capability management 20 * 21 * The Ceph metadata servers control client access to inode metadata 22 * and file data by issuing capabilities, granting clients permission 23 * to read and/or write both inode field and file data to OSDs 24 * (storage nodes). Each capability consists of a set of bits 25 * indicating which operations are allowed. 26 * 27 * If the client holds a *_SHARED cap, the client has a coherent value 28 * that can be safely read from the cached inode. 29 * 30 * In the case of a *_EXCL (exclusive) or FILE_WR capabilities, the 31 * client is allowed to change inode attributes (e.g., file size, 32 * mtime), note its dirty state in the ceph_cap, and asynchronously 33 * flush that metadata change to the MDS. 34 * 35 * In the event of a conflicting operation (perhaps by another 36 * client), the MDS will revoke the conflicting client capabilities. 37 * 38 * In order for a client to cache an inode, it must hold a capability 39 * with at least one MDS server. When inodes are released, release 40 * notifications are batched and periodically sent en masse to the MDS 41 * cluster to release server state. 42 */ 43 44 static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc); 45 static void __kick_flushing_caps(struct ceph_mds_client *mdsc, 46 struct ceph_mds_session *session, 47 struct ceph_inode_info *ci, 48 u64 oldest_flush_tid); 49 50 /* 51 * Generate readable cap strings for debugging output. 
52 */ 53 #define MAX_CAP_STR 20 54 static char cap_str[MAX_CAP_STR][40]; 55 static DEFINE_SPINLOCK(cap_str_lock); 56 static int last_cap_str; 57 58 static char *gcap_string(char *s, int c) 59 { 60 if (c & CEPH_CAP_GSHARED) 61 *s++ = 's'; 62 if (c & CEPH_CAP_GEXCL) 63 *s++ = 'x'; 64 if (c & CEPH_CAP_GCACHE) 65 *s++ = 'c'; 66 if (c & CEPH_CAP_GRD) 67 *s++ = 'r'; 68 if (c & CEPH_CAP_GWR) 69 *s++ = 'w'; 70 if (c & CEPH_CAP_GBUFFER) 71 *s++ = 'b'; 72 if (c & CEPH_CAP_GWREXTEND) 73 *s++ = 'a'; 74 if (c & CEPH_CAP_GLAZYIO) 75 *s++ = 'l'; 76 return s; 77 } 78 79 const char *ceph_cap_string(int caps) 80 { 81 int i; 82 char *s; 83 int c; 84 85 spin_lock(&cap_str_lock); 86 i = last_cap_str++; 87 if (last_cap_str == MAX_CAP_STR) 88 last_cap_str = 0; 89 spin_unlock(&cap_str_lock); 90 91 s = cap_str[i]; 92 93 if (caps & CEPH_CAP_PIN) 94 *s++ = 'p'; 95 96 c = (caps >> CEPH_CAP_SAUTH) & 3; 97 if (c) { 98 *s++ = 'A'; 99 s = gcap_string(s, c); 100 } 101 102 c = (caps >> CEPH_CAP_SLINK) & 3; 103 if (c) { 104 *s++ = 'L'; 105 s = gcap_string(s, c); 106 } 107 108 c = (caps >> CEPH_CAP_SXATTR) & 3; 109 if (c) { 110 *s++ = 'X'; 111 s = gcap_string(s, c); 112 } 113 114 c = caps >> CEPH_CAP_SFILE; 115 if (c) { 116 *s++ = 'F'; 117 s = gcap_string(s, c); 118 } 119 120 if (s == cap_str[i]) 121 *s++ = '-'; 122 *s = 0; 123 return cap_str[i]; 124 } 125 126 void ceph_caps_init(struct ceph_mds_client *mdsc) 127 { 128 INIT_LIST_HEAD(&mdsc->caps_list); 129 spin_lock_init(&mdsc->caps_list_lock); 130 } 131 132 void ceph_caps_finalize(struct ceph_mds_client *mdsc) 133 { 134 struct ceph_cap *cap; 135 136 spin_lock(&mdsc->caps_list_lock); 137 while (!list_empty(&mdsc->caps_list)) { 138 cap = list_first_entry(&mdsc->caps_list, 139 struct ceph_cap, caps_item); 140 list_del(&cap->caps_item); 141 kmem_cache_free(ceph_cap_cachep, cap); 142 } 143 mdsc->caps_total_count = 0; 144 mdsc->caps_avail_count = 0; 145 mdsc->caps_use_count = 0; 146 mdsc->caps_reserve_count = 0; 147 mdsc->caps_min_count = 0; 148 spin_unlock(&mdsc->caps_list_lock); 149 } 150 151 void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta) 152 { 153 spin_lock(&mdsc->caps_list_lock); 154 mdsc->caps_min_count += delta; 155 BUG_ON(mdsc->caps_min_count < 0); 156 spin_unlock(&mdsc->caps_list_lock); 157 } 158 159 /* 160 * Called under mdsc->mutex. 
161 */ 162 int ceph_reserve_caps(struct ceph_mds_client *mdsc, 163 struct ceph_cap_reservation *ctx, int need) 164 { 165 int i, j; 166 struct ceph_cap *cap; 167 int have; 168 int alloc = 0; 169 int max_caps; 170 bool trimmed = false; 171 struct ceph_mds_session *s; 172 LIST_HEAD(newcaps); 173 174 dout("reserve caps ctx=%p need=%d\n", ctx, need); 175 176 /* first reserve any caps that are already allocated */ 177 spin_lock(&mdsc->caps_list_lock); 178 if (mdsc->caps_avail_count >= need) 179 have = need; 180 else 181 have = mdsc->caps_avail_count; 182 mdsc->caps_avail_count -= have; 183 mdsc->caps_reserve_count += have; 184 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + 185 mdsc->caps_reserve_count + 186 mdsc->caps_avail_count); 187 spin_unlock(&mdsc->caps_list_lock); 188 189 for (i = have; i < need; ) { 190 cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); 191 if (cap) { 192 list_add(&cap->caps_item, &newcaps); 193 alloc++; 194 i++; 195 continue; 196 } 197 198 if (!trimmed) { 199 for (j = 0; j < mdsc->max_sessions; j++) { 200 s = __ceph_lookup_mds_session(mdsc, j); 201 if (!s) 202 continue; 203 mutex_unlock(&mdsc->mutex); 204 205 mutex_lock(&s->s_mutex); 206 max_caps = s->s_nr_caps - (need - i); 207 ceph_trim_caps(mdsc, s, max_caps); 208 mutex_unlock(&s->s_mutex); 209 210 ceph_put_mds_session(s); 211 mutex_lock(&mdsc->mutex); 212 } 213 trimmed = true; 214 215 spin_lock(&mdsc->caps_list_lock); 216 if (mdsc->caps_avail_count) { 217 int more_have; 218 if (mdsc->caps_avail_count >= need - i) 219 more_have = need - i; 220 else 221 more_have = mdsc->caps_avail_count; 222 223 i += more_have; 224 have += more_have; 225 mdsc->caps_avail_count -= more_have; 226 mdsc->caps_reserve_count += more_have; 227 228 } 229 spin_unlock(&mdsc->caps_list_lock); 230 231 continue; 232 } 233 234 pr_warn("reserve caps ctx=%p ENOMEM need=%d got=%d\n", 235 ctx, need, have + alloc); 236 goto out_nomem; 237 } 238 BUG_ON(have + alloc != need); 239 240 spin_lock(&mdsc->caps_list_lock); 241 mdsc->caps_total_count += alloc; 242 mdsc->caps_reserve_count += alloc; 243 list_splice(&newcaps, &mdsc->caps_list); 244 245 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + 246 mdsc->caps_reserve_count + 247 mdsc->caps_avail_count); 248 spin_unlock(&mdsc->caps_list_lock); 249 250 ctx->count = need; 251 dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n", 252 ctx, mdsc->caps_total_count, mdsc->caps_use_count, 253 mdsc->caps_reserve_count, mdsc->caps_avail_count); 254 return 0; 255 256 out_nomem: 257 258 spin_lock(&mdsc->caps_list_lock); 259 mdsc->caps_avail_count += have; 260 mdsc->caps_reserve_count -= have; 261 262 while (!list_empty(&newcaps)) { 263 cap = list_first_entry(&newcaps, 264 struct ceph_cap, caps_item); 265 list_del(&cap->caps_item); 266 267 /* Keep some preallocated caps around (ceph_min_count), to 268 * avoid lots of free/alloc churn. 
*/ 269 if (mdsc->caps_avail_count >= 270 mdsc->caps_reserve_count + mdsc->caps_min_count) { 271 kmem_cache_free(ceph_cap_cachep, cap); 272 } else { 273 mdsc->caps_avail_count++; 274 mdsc->caps_total_count++; 275 list_add(&cap->caps_item, &mdsc->caps_list); 276 } 277 } 278 279 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + 280 mdsc->caps_reserve_count + 281 mdsc->caps_avail_count); 282 spin_unlock(&mdsc->caps_list_lock); 283 return -ENOMEM; 284 } 285 286 int ceph_unreserve_caps(struct ceph_mds_client *mdsc, 287 struct ceph_cap_reservation *ctx) 288 { 289 int i; 290 struct ceph_cap *cap; 291 292 dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count); 293 if (ctx->count) { 294 spin_lock(&mdsc->caps_list_lock); 295 BUG_ON(mdsc->caps_reserve_count < ctx->count); 296 mdsc->caps_reserve_count -= ctx->count; 297 if (mdsc->caps_avail_count >= 298 mdsc->caps_reserve_count + mdsc->caps_min_count) { 299 mdsc->caps_total_count -= ctx->count; 300 for (i = 0; i < ctx->count; i++) { 301 cap = list_first_entry(&mdsc->caps_list, 302 struct ceph_cap, caps_item); 303 list_del(&cap->caps_item); 304 kmem_cache_free(ceph_cap_cachep, cap); 305 } 306 } else { 307 mdsc->caps_avail_count += ctx->count; 308 } 309 ctx->count = 0; 310 dout("unreserve caps %d = %d used + %d resv + %d avail\n", 311 mdsc->caps_total_count, mdsc->caps_use_count, 312 mdsc->caps_reserve_count, mdsc->caps_avail_count); 313 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + 314 mdsc->caps_reserve_count + 315 mdsc->caps_avail_count); 316 spin_unlock(&mdsc->caps_list_lock); 317 } 318 return 0; 319 } 320 321 struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc, 322 struct ceph_cap_reservation *ctx) 323 { 324 struct ceph_cap *cap = NULL; 325 326 /* temporary, until we do something about cap import/export */ 327 if (!ctx) { 328 cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); 329 if (cap) { 330 spin_lock(&mdsc->caps_list_lock); 331 mdsc->caps_use_count++; 332 mdsc->caps_total_count++; 333 spin_unlock(&mdsc->caps_list_lock); 334 } else { 335 spin_lock(&mdsc->caps_list_lock); 336 if (mdsc->caps_avail_count) { 337 BUG_ON(list_empty(&mdsc->caps_list)); 338 339 mdsc->caps_avail_count--; 340 mdsc->caps_use_count++; 341 cap = list_first_entry(&mdsc->caps_list, 342 struct ceph_cap, caps_item); 343 list_del(&cap->caps_item); 344 345 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + 346 mdsc->caps_reserve_count + mdsc->caps_avail_count); 347 } 348 spin_unlock(&mdsc->caps_list_lock); 349 } 350 351 return cap; 352 } 353 354 spin_lock(&mdsc->caps_list_lock); 355 dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n", 356 ctx, ctx->count, mdsc->caps_total_count, mdsc->caps_use_count, 357 mdsc->caps_reserve_count, mdsc->caps_avail_count); 358 BUG_ON(!ctx->count); 359 BUG_ON(ctx->count > mdsc->caps_reserve_count); 360 BUG_ON(list_empty(&mdsc->caps_list)); 361 362 ctx->count--; 363 mdsc->caps_reserve_count--; 364 mdsc->caps_use_count++; 365 366 cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item); 367 list_del(&cap->caps_item); 368 369 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + 370 mdsc->caps_reserve_count + mdsc->caps_avail_count); 371 spin_unlock(&mdsc->caps_list_lock); 372 return cap; 373 } 374 375 void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap) 376 { 377 spin_lock(&mdsc->caps_list_lock); 378 dout("put_cap %p %d = %d used + %d resv + %d avail\n", 379 cap, mdsc->caps_total_count, mdsc->caps_use_count, 380 mdsc->caps_reserve_count, mdsc->caps_avail_count); 381 
mdsc->caps_use_count--; 382 /* 383 * Keep some preallocated caps around (ceph_min_count), to 384 * avoid lots of free/alloc churn. 385 */ 386 if (mdsc->caps_avail_count >= mdsc->caps_reserve_count + 387 mdsc->caps_min_count) { 388 mdsc->caps_total_count--; 389 kmem_cache_free(ceph_cap_cachep, cap); 390 } else { 391 mdsc->caps_avail_count++; 392 list_add(&cap->caps_item, &mdsc->caps_list); 393 } 394 395 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + 396 mdsc->caps_reserve_count + mdsc->caps_avail_count); 397 spin_unlock(&mdsc->caps_list_lock); 398 } 399 400 void ceph_reservation_status(struct ceph_fs_client *fsc, 401 int *total, int *avail, int *used, int *reserved, 402 int *min) 403 { 404 struct ceph_mds_client *mdsc = fsc->mdsc; 405 406 spin_lock(&mdsc->caps_list_lock); 407 408 if (total) 409 *total = mdsc->caps_total_count; 410 if (avail) 411 *avail = mdsc->caps_avail_count; 412 if (used) 413 *used = mdsc->caps_use_count; 414 if (reserved) 415 *reserved = mdsc->caps_reserve_count; 416 if (min) 417 *min = mdsc->caps_min_count; 418 419 spin_unlock(&mdsc->caps_list_lock); 420 } 421 422 /* 423 * Find ceph_cap for given mds, if any. 424 * 425 * Called with i_ceph_lock held. 426 */ 427 static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds) 428 { 429 struct ceph_cap *cap; 430 struct rb_node *n = ci->i_caps.rb_node; 431 432 while (n) { 433 cap = rb_entry(n, struct ceph_cap, ci_node); 434 if (mds < cap->mds) 435 n = n->rb_left; 436 else if (mds > cap->mds) 437 n = n->rb_right; 438 else 439 return cap; 440 } 441 return NULL; 442 } 443 444 struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds) 445 { 446 struct ceph_cap *cap; 447 448 spin_lock(&ci->i_ceph_lock); 449 cap = __get_cap_for_mds(ci, mds); 450 spin_unlock(&ci->i_ceph_lock); 451 return cap; 452 } 453 454 /* 455 * Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1. 456 */ 457 static int __ceph_get_cap_mds(struct ceph_inode_info *ci) 458 { 459 struct ceph_cap *cap; 460 int mds = -1; 461 struct rb_node *p; 462 463 /* prefer mds with WR|BUFFER|EXCL caps */ 464 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { 465 cap = rb_entry(p, struct ceph_cap, ci_node); 466 mds = cap->mds; 467 if (cap->issued & (CEPH_CAP_FILE_WR | 468 CEPH_CAP_FILE_BUFFER | 469 CEPH_CAP_FILE_EXCL)) 470 break; 471 } 472 return mds; 473 } 474 475 int ceph_get_cap_mds(struct inode *inode) 476 { 477 struct ceph_inode_info *ci = ceph_inode(inode); 478 int mds; 479 spin_lock(&ci->i_ceph_lock); 480 mds = __ceph_get_cap_mds(ceph_inode(inode)); 481 spin_unlock(&ci->i_ceph_lock); 482 return mds; 483 } 484 485 /* 486 * Called under i_ceph_lock. 487 */ 488 static void __insert_cap_node(struct ceph_inode_info *ci, 489 struct ceph_cap *new) 490 { 491 struct rb_node **p = &ci->i_caps.rb_node; 492 struct rb_node *parent = NULL; 493 struct ceph_cap *cap = NULL; 494 495 while (*p) { 496 parent = *p; 497 cap = rb_entry(parent, struct ceph_cap, ci_node); 498 if (new->mds < cap->mds) 499 p = &(*p)->rb_left; 500 else if (new->mds > cap->mds) 501 p = &(*p)->rb_right; 502 else 503 BUG(); 504 } 505 506 rb_link_node(&new->ci_node, parent, p); 507 rb_insert_color(&new->ci_node, &ci->i_caps); 508 } 509 510 /* 511 * (re)set cap hold timeouts, which control the delayed release 512 * of unused caps back to the MDS. Should be called on cap use. 
513 */ 514 static void __cap_set_timeouts(struct ceph_mds_client *mdsc, 515 struct ceph_inode_info *ci) 516 { 517 struct ceph_mount_options *ma = mdsc->fsc->mount_options; 518 519 ci->i_hold_caps_min = round_jiffies(jiffies + 520 ma->caps_wanted_delay_min * HZ); 521 ci->i_hold_caps_max = round_jiffies(jiffies + 522 ma->caps_wanted_delay_max * HZ); 523 dout("__cap_set_timeouts %p min %lu max %lu\n", &ci->vfs_inode, 524 ci->i_hold_caps_min - jiffies, ci->i_hold_caps_max - jiffies); 525 } 526 527 /* 528 * (Re)queue cap at the end of the delayed cap release list. 529 * 530 * If I_FLUSH is set, leave the inode at the front of the list. 531 * 532 * Caller holds i_ceph_lock 533 * -> we take mdsc->cap_delay_lock 534 */ 535 static void __cap_delay_requeue(struct ceph_mds_client *mdsc, 536 struct ceph_inode_info *ci) 537 { 538 __cap_set_timeouts(mdsc, ci); 539 dout("__cap_delay_requeue %p flags %d at %lu\n", &ci->vfs_inode, 540 ci->i_ceph_flags, ci->i_hold_caps_max); 541 if (!mdsc->stopping) { 542 spin_lock(&mdsc->cap_delay_lock); 543 if (!list_empty(&ci->i_cap_delay_list)) { 544 if (ci->i_ceph_flags & CEPH_I_FLUSH) 545 goto no_change; 546 list_del_init(&ci->i_cap_delay_list); 547 } 548 list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list); 549 no_change: 550 spin_unlock(&mdsc->cap_delay_lock); 551 } 552 } 553 554 /* 555 * Queue an inode for immediate writeback. Mark inode with I_FLUSH, 556 * indicating we should send a cap message to flush dirty metadata 557 * asap, and move to the front of the delayed cap list. 558 */ 559 static void __cap_delay_requeue_front(struct ceph_mds_client *mdsc, 560 struct ceph_inode_info *ci) 561 { 562 dout("__cap_delay_requeue_front %p\n", &ci->vfs_inode); 563 spin_lock(&mdsc->cap_delay_lock); 564 ci->i_ceph_flags |= CEPH_I_FLUSH; 565 if (!list_empty(&ci->i_cap_delay_list)) 566 list_del_init(&ci->i_cap_delay_list); 567 list_add(&ci->i_cap_delay_list, &mdsc->cap_delay_list); 568 spin_unlock(&mdsc->cap_delay_lock); 569 } 570 571 /* 572 * Cancel delayed work on cap. 573 * 574 * Caller must hold i_ceph_lock. 575 */ 576 static void __cap_delay_cancel(struct ceph_mds_client *mdsc, 577 struct ceph_inode_info *ci) 578 { 579 dout("__cap_delay_cancel %p\n", &ci->vfs_inode); 580 if (list_empty(&ci->i_cap_delay_list)) 581 return; 582 spin_lock(&mdsc->cap_delay_lock); 583 list_del_init(&ci->i_cap_delay_list); 584 spin_unlock(&mdsc->cap_delay_lock); 585 } 586 587 /* 588 * Common issue checks for add_cap, handle_cap_grant. 589 */ 590 static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap, 591 unsigned issued) 592 { 593 unsigned had = __ceph_caps_issued(ci, NULL); 594 595 /* 596 * Each time we receive FILE_CACHE anew, we increment 597 * i_rdcache_gen. 598 */ 599 if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && 600 (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) { 601 ci->i_rdcache_gen++; 602 } 603 604 /* 605 * If FILE_SHARED is newly issued, mark dir not complete. We don't 606 * know what happened to this directory while we didn't have the cap. 607 * If FILE_SHARED is being revoked, also mark dir not complete. It 608 * stops on-going cached readdir. 609 */ 610 if ((issued & CEPH_CAP_FILE_SHARED) != (had & CEPH_CAP_FILE_SHARED)) { 611 if (issued & CEPH_CAP_FILE_SHARED) 612 atomic_inc(&ci->i_shared_gen); 613 if (S_ISDIR(ci->vfs_inode.i_mode)) { 614 dout(" marking %p NOT complete\n", &ci->vfs_inode); 615 __ceph_dir_clear_complete(ci); 616 } 617 } 618 } 619 620 /* 621 * Add a capability under the given MDS session. 
622 * 623 * Caller should hold session snap_rwsem (read) and s_mutex. 624 * 625 * @fmode is the open file mode, if we are opening a file, otherwise 626 * it is < 0. (This is so we can atomically add the cap and add an 627 * open file reference to it.) 628 */ 629 void ceph_add_cap(struct inode *inode, 630 struct ceph_mds_session *session, u64 cap_id, 631 int fmode, unsigned issued, unsigned wanted, 632 unsigned seq, unsigned mseq, u64 realmino, int flags, 633 struct ceph_cap **new_cap) 634 { 635 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; 636 struct ceph_inode_info *ci = ceph_inode(inode); 637 struct ceph_cap *cap; 638 int mds = session->s_mds; 639 int actual_wanted; 640 641 dout("add_cap %p mds%d cap %llx %s seq %d\n", inode, 642 session->s_mds, cap_id, ceph_cap_string(issued), seq); 643 644 /* 645 * If we are opening the file, include file mode wanted bits 646 * in wanted. 647 */ 648 if (fmode >= 0) 649 wanted |= ceph_caps_for_mode(fmode); 650 651 cap = __get_cap_for_mds(ci, mds); 652 if (!cap) { 653 cap = *new_cap; 654 *new_cap = NULL; 655 656 cap->issued = 0; 657 cap->implemented = 0; 658 cap->mds = mds; 659 cap->mds_wanted = 0; 660 cap->mseq = 0; 661 662 cap->ci = ci; 663 __insert_cap_node(ci, cap); 664 665 /* add to session cap list */ 666 cap->session = session; 667 spin_lock(&session->s_cap_lock); 668 list_add_tail(&cap->session_caps, &session->s_caps); 669 session->s_nr_caps++; 670 spin_unlock(&session->s_cap_lock); 671 } else { 672 /* 673 * auth mds of the inode changed. we received the cap export 674 * message, but still haven't received the cap import message. 675 * handle_cap_export() updated the new auth MDS' cap. 676 * 677 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing 678 * a message that was send before the cap import message. So 679 * don't remove caps. 680 */ 681 if (ceph_seq_cmp(seq, cap->seq) <= 0) { 682 WARN_ON(cap != ci->i_auth_cap); 683 WARN_ON(cap->cap_id != cap_id); 684 seq = cap->seq; 685 mseq = cap->mseq; 686 issued |= cap->issued; 687 flags |= CEPH_CAP_FLAG_AUTH; 688 } 689 } 690 691 if (!ci->i_snap_realm || 692 ((flags & CEPH_CAP_FLAG_AUTH) && 693 realmino != (u64)-1 && ci->i_snap_realm->ino != realmino)) { 694 /* 695 * add this inode to the appropriate snap realm 696 */ 697 struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc, 698 realmino); 699 if (realm) { 700 struct ceph_snap_realm *oldrealm = ci->i_snap_realm; 701 if (oldrealm) { 702 spin_lock(&oldrealm->inodes_with_caps_lock); 703 list_del_init(&ci->i_snap_realm_item); 704 spin_unlock(&oldrealm->inodes_with_caps_lock); 705 } 706 707 spin_lock(&realm->inodes_with_caps_lock); 708 list_add(&ci->i_snap_realm_item, 709 &realm->inodes_with_caps); 710 ci->i_snap_realm = realm; 711 if (realm->ino == ci->i_vino.ino) 712 realm->inode = inode; 713 spin_unlock(&realm->inodes_with_caps_lock); 714 715 if (oldrealm) 716 ceph_put_snap_realm(mdsc, oldrealm); 717 } else { 718 pr_err("ceph_add_cap: couldn't find snap realm %llx\n", 719 realmino); 720 WARN_ON(!realm); 721 } 722 } 723 724 __check_cap_issue(ci, cap, issued); 725 726 /* 727 * If we are issued caps we don't want, or the mds' wanted 728 * value appears to be off, queue a check so we'll release 729 * later and/or update the mds wanted value. 
730 */ 731 actual_wanted = __ceph_caps_wanted(ci); 732 if ((wanted & ~actual_wanted) || 733 (issued & ~actual_wanted & CEPH_CAP_ANY_WR)) { 734 dout(" issued %s, mds wanted %s, actual %s, queueing\n", 735 ceph_cap_string(issued), ceph_cap_string(wanted), 736 ceph_cap_string(actual_wanted)); 737 __cap_delay_requeue(mdsc, ci); 738 } 739 740 if (flags & CEPH_CAP_FLAG_AUTH) { 741 if (!ci->i_auth_cap || 742 ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) { 743 ci->i_auth_cap = cap; 744 cap->mds_wanted = wanted; 745 } 746 } else { 747 WARN_ON(ci->i_auth_cap == cap); 748 } 749 750 dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n", 751 inode, ceph_vinop(inode), cap, ceph_cap_string(issued), 752 ceph_cap_string(issued|cap->issued), seq, mds); 753 cap->cap_id = cap_id; 754 cap->issued = issued; 755 cap->implemented |= issued; 756 if (ceph_seq_cmp(mseq, cap->mseq) > 0) 757 cap->mds_wanted = wanted; 758 else 759 cap->mds_wanted |= wanted; 760 cap->seq = seq; 761 cap->issue_seq = seq; 762 cap->mseq = mseq; 763 cap->cap_gen = session->s_cap_gen; 764 765 if (fmode >= 0) 766 __ceph_get_fmode(ci, fmode); 767 } 768 769 /* 770 * Return true if cap has not timed out and belongs to the current 771 * generation of the MDS session (i.e. has not gone 'stale' due to 772 * us losing touch with the mds). 773 */ 774 static int __cap_is_valid(struct ceph_cap *cap) 775 { 776 unsigned long ttl; 777 u32 gen; 778 779 spin_lock(&cap->session->s_gen_ttl_lock); 780 gen = cap->session->s_cap_gen; 781 ttl = cap->session->s_cap_ttl; 782 spin_unlock(&cap->session->s_gen_ttl_lock); 783 784 if (cap->cap_gen < gen || time_after_eq(jiffies, ttl)) { 785 dout("__cap_is_valid %p cap %p issued %s " 786 "but STALE (gen %u vs %u)\n", &cap->ci->vfs_inode, 787 cap, ceph_cap_string(cap->issued), cap->cap_gen, gen); 788 return 0; 789 } 790 791 return 1; 792 } 793 794 /* 795 * Return set of valid cap bits issued to us. Note that caps time 796 * out, and may be invalidated in bulk if the client session times out 797 * and session->s_cap_gen is bumped. 798 */ 799 int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented) 800 { 801 int have = ci->i_snap_caps; 802 struct ceph_cap *cap; 803 struct rb_node *p; 804 805 if (implemented) 806 *implemented = 0; 807 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { 808 cap = rb_entry(p, struct ceph_cap, ci_node); 809 if (!__cap_is_valid(cap)) 810 continue; 811 dout("__ceph_caps_issued %p cap %p issued %s\n", 812 &ci->vfs_inode, cap, ceph_cap_string(cap->issued)); 813 have |= cap->issued; 814 if (implemented) 815 *implemented |= cap->implemented; 816 } 817 /* 818 * exclude caps issued by non-auth MDS, but are been revoking 819 * by the auth MDS. The non-auth MDS should be revoking/exporting 820 * these caps, but the message is delayed. 821 */ 822 if (ci->i_auth_cap) { 823 cap = ci->i_auth_cap; 824 have &= ~cap->implemented | cap->issued; 825 } 826 return have; 827 } 828 829 /* 830 * Get cap bits issued by caps other than @ocap 831 */ 832 int __ceph_caps_issued_other(struct ceph_inode_info *ci, struct ceph_cap *ocap) 833 { 834 int have = ci->i_snap_caps; 835 struct ceph_cap *cap; 836 struct rb_node *p; 837 838 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { 839 cap = rb_entry(p, struct ceph_cap, ci_node); 840 if (cap == ocap) 841 continue; 842 if (!__cap_is_valid(cap)) 843 continue; 844 have |= cap->issued; 845 } 846 return have; 847 } 848 849 /* 850 * Move a cap to the end of the LRU (oldest caps at list head, newest 851 * at list tail). 
852 */ 853 static void __touch_cap(struct ceph_cap *cap) 854 { 855 struct ceph_mds_session *s = cap->session; 856 857 spin_lock(&s->s_cap_lock); 858 if (!s->s_cap_iterator) { 859 dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap, 860 s->s_mds); 861 list_move_tail(&cap->session_caps, &s->s_caps); 862 } else { 863 dout("__touch_cap %p cap %p mds%d NOP, iterating over caps\n", 864 &cap->ci->vfs_inode, cap, s->s_mds); 865 } 866 spin_unlock(&s->s_cap_lock); 867 } 868 869 /* 870 * Check if we hold the given mask. If so, move the cap(s) to the 871 * front of their respective LRUs. (This is the preferred way for 872 * callers to check for caps they want.) 873 */ 874 int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch) 875 { 876 struct ceph_cap *cap; 877 struct rb_node *p; 878 int have = ci->i_snap_caps; 879 880 if ((have & mask) == mask) { 881 dout("__ceph_caps_issued_mask %p snap issued %s" 882 " (mask %s)\n", &ci->vfs_inode, 883 ceph_cap_string(have), 884 ceph_cap_string(mask)); 885 return 1; 886 } 887 888 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { 889 cap = rb_entry(p, struct ceph_cap, ci_node); 890 if (!__cap_is_valid(cap)) 891 continue; 892 if ((cap->issued & mask) == mask) { 893 dout("__ceph_caps_issued_mask %p cap %p issued %s" 894 " (mask %s)\n", &ci->vfs_inode, cap, 895 ceph_cap_string(cap->issued), 896 ceph_cap_string(mask)); 897 if (touch) 898 __touch_cap(cap); 899 return 1; 900 } 901 902 /* does a combination of caps satisfy mask? */ 903 have |= cap->issued; 904 if ((have & mask) == mask) { 905 dout("__ceph_caps_issued_mask %p combo issued %s" 906 " (mask %s)\n", &ci->vfs_inode, 907 ceph_cap_string(cap->issued), 908 ceph_cap_string(mask)); 909 if (touch) { 910 struct rb_node *q; 911 912 /* touch this + preceding caps */ 913 __touch_cap(cap); 914 for (q = rb_first(&ci->i_caps); q != p; 915 q = rb_next(q)) { 916 cap = rb_entry(q, struct ceph_cap, 917 ci_node); 918 if (!__cap_is_valid(cap)) 919 continue; 920 __touch_cap(cap); 921 } 922 } 923 return 1; 924 } 925 } 926 927 return 0; 928 } 929 930 /* 931 * Return true if mask caps are currently being revoked by an MDS. 
932 */ 933 int __ceph_caps_revoking_other(struct ceph_inode_info *ci, 934 struct ceph_cap *ocap, int mask) 935 { 936 struct ceph_cap *cap; 937 struct rb_node *p; 938 939 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { 940 cap = rb_entry(p, struct ceph_cap, ci_node); 941 if (cap != ocap && 942 (cap->implemented & ~cap->issued & mask)) 943 return 1; 944 } 945 return 0; 946 } 947 948 int ceph_caps_revoking(struct ceph_inode_info *ci, int mask) 949 { 950 struct inode *inode = &ci->vfs_inode; 951 int ret; 952 953 spin_lock(&ci->i_ceph_lock); 954 ret = __ceph_caps_revoking_other(ci, NULL, mask); 955 spin_unlock(&ci->i_ceph_lock); 956 dout("ceph_caps_revoking %p %s = %d\n", inode, 957 ceph_cap_string(mask), ret); 958 return ret; 959 } 960 961 int __ceph_caps_used(struct ceph_inode_info *ci) 962 { 963 int used = 0; 964 if (ci->i_pin_ref) 965 used |= CEPH_CAP_PIN; 966 if (ci->i_rd_ref) 967 used |= CEPH_CAP_FILE_RD; 968 if (ci->i_rdcache_ref || 969 (!S_ISDIR(ci->vfs_inode.i_mode) && /* ignore readdir cache */ 970 ci->vfs_inode.i_data.nrpages)) 971 used |= CEPH_CAP_FILE_CACHE; 972 if (ci->i_wr_ref) 973 used |= CEPH_CAP_FILE_WR; 974 if (ci->i_wb_ref || ci->i_wrbuffer_ref) 975 used |= CEPH_CAP_FILE_BUFFER; 976 return used; 977 } 978 979 /* 980 * wanted, by virtue of open file modes 981 */ 982 int __ceph_caps_file_wanted(struct ceph_inode_info *ci) 983 { 984 int i, bits = 0; 985 for (i = 0; i < CEPH_FILE_MODE_BITS; i++) { 986 if (ci->i_nr_by_mode[i]) 987 bits |= 1 << i; 988 } 989 if (bits == 0) 990 return 0; 991 return ceph_caps_for_mode(bits >> 1); 992 } 993 994 /* 995 * Return caps we have registered with the MDS(s) as 'wanted'. 996 */ 997 int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check) 998 { 999 struct ceph_cap *cap; 1000 struct rb_node *p; 1001 int mds_wanted = 0; 1002 1003 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { 1004 cap = rb_entry(p, struct ceph_cap, ci_node); 1005 if (check && !__cap_is_valid(cap)) 1006 continue; 1007 if (cap == ci->i_auth_cap) 1008 mds_wanted |= cap->mds_wanted; 1009 else 1010 mds_wanted |= (cap->mds_wanted & ~CEPH_CAP_ANY_FILE_WR); 1011 } 1012 return mds_wanted; 1013 } 1014 1015 /* 1016 * called under i_ceph_lock 1017 */ 1018 static int __ceph_is_single_caps(struct ceph_inode_info *ci) 1019 { 1020 return rb_first(&ci->i_caps) == rb_last(&ci->i_caps); 1021 } 1022 1023 static int __ceph_is_any_caps(struct ceph_inode_info *ci) 1024 { 1025 return !RB_EMPTY_ROOT(&ci->i_caps); 1026 } 1027 1028 int ceph_is_any_caps(struct inode *inode) 1029 { 1030 struct ceph_inode_info *ci = ceph_inode(inode); 1031 int ret; 1032 1033 spin_lock(&ci->i_ceph_lock); 1034 ret = __ceph_is_any_caps(ci); 1035 spin_unlock(&ci->i_ceph_lock); 1036 1037 return ret; 1038 } 1039 1040 static void drop_inode_snap_realm(struct ceph_inode_info *ci) 1041 { 1042 struct ceph_snap_realm *realm = ci->i_snap_realm; 1043 spin_lock(&realm->inodes_with_caps_lock); 1044 list_del_init(&ci->i_snap_realm_item); 1045 ci->i_snap_realm_counter++; 1046 ci->i_snap_realm = NULL; 1047 spin_unlock(&realm->inodes_with_caps_lock); 1048 ceph_put_snap_realm(ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc, 1049 realm); 1050 } 1051 1052 /* 1053 * Remove a cap. Take steps to deal with a racing iterate_session_caps. 1054 * 1055 * caller should hold i_ceph_lock. 1056 * caller will not hold session s_mutex if called from destroy_inode. 
1057 */ 1058 void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) 1059 { 1060 struct ceph_mds_session *session = cap->session; 1061 struct ceph_inode_info *ci = cap->ci; 1062 struct ceph_mds_client *mdsc = 1063 ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc; 1064 int removed = 0; 1065 1066 dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode); 1067 1068 /* remove from session list */ 1069 spin_lock(&session->s_cap_lock); 1070 if (session->s_cap_iterator == cap) { 1071 /* not yet, we are iterating over this very cap */ 1072 dout("__ceph_remove_cap delaying %p removal from session %p\n", 1073 cap, cap->session); 1074 } else { 1075 list_del_init(&cap->session_caps); 1076 session->s_nr_caps--; 1077 cap->session = NULL; 1078 removed = 1; 1079 } 1080 /* protect backpointer with s_cap_lock: see iterate_session_caps */ 1081 cap->ci = NULL; 1082 1083 /* 1084 * s_cap_reconnect is protected by s_cap_lock. no one changes 1085 * s_cap_gen while session is in the reconnect state. 1086 */ 1087 if (queue_release && 1088 (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) { 1089 cap->queue_release = 1; 1090 if (removed) { 1091 list_add_tail(&cap->session_caps, 1092 &session->s_cap_releases); 1093 session->s_num_cap_releases++; 1094 removed = 0; 1095 } 1096 } else { 1097 cap->queue_release = 0; 1098 } 1099 cap->cap_ino = ci->i_vino.ino; 1100 1101 spin_unlock(&session->s_cap_lock); 1102 1103 /* remove from inode list */ 1104 rb_erase(&cap->ci_node, &ci->i_caps); 1105 if (ci->i_auth_cap == cap) 1106 ci->i_auth_cap = NULL; 1107 1108 if (removed) 1109 ceph_put_cap(mdsc, cap); 1110 1111 /* when reconnect denied, we remove session caps forcibly, 1112 * i_wr_ref can be non-zero. If there are ongoing write, 1113 * keep i_snap_realm. 1114 */ 1115 if (!__ceph_is_any_caps(ci) && ci->i_wr_ref == 0 && ci->i_snap_realm) 1116 drop_inode_snap_realm(ci); 1117 1118 if (!__ceph_is_any_real_caps(ci)) 1119 __cap_delay_cancel(mdsc, ci); 1120 } 1121 1122 struct cap_msg_args { 1123 struct ceph_mds_session *session; 1124 u64 ino, cid, follows; 1125 u64 flush_tid, oldest_flush_tid, size, max_size; 1126 u64 xattr_version; 1127 struct ceph_buffer *xattr_buf; 1128 struct timespec atime, mtime, ctime; 1129 int op, caps, wanted, dirty; 1130 u32 seq, issue_seq, mseq, time_warp_seq; 1131 u32 flags; 1132 kuid_t uid; 1133 kgid_t gid; 1134 umode_t mode; 1135 bool inline_data; 1136 }; 1137 1138 /* 1139 * Build and send a cap message to the given MDS. 1140 * 1141 * Caller should be holding s_mutex. 1142 */ 1143 static int send_cap_msg(struct cap_msg_args *arg) 1144 { 1145 struct ceph_mds_caps *fc; 1146 struct ceph_msg *msg; 1147 void *p; 1148 size_t extra_len; 1149 struct timespec zerotime = {0}; 1150 struct ceph_osd_client *osdc = &arg->session->s_mdsc->fsc->client->osdc; 1151 1152 dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s" 1153 " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu" 1154 " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(arg->op), 1155 arg->cid, arg->ino, ceph_cap_string(arg->caps), 1156 ceph_cap_string(arg->wanted), ceph_cap_string(arg->dirty), 1157 arg->seq, arg->issue_seq, arg->flush_tid, arg->oldest_flush_tid, 1158 arg->mseq, arg->follows, arg->size, arg->max_size, 1159 arg->xattr_version, 1160 arg->xattr_buf ? 
(int)arg->xattr_buf->vec.iov_len : 0); 1161 1162 /* flock buffer size + inline version + inline data size + 1163 * osd_epoch_barrier + oldest_flush_tid */ 1164 extra_len = 4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4; 1165 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len, 1166 GFP_NOFS, false); 1167 if (!msg) 1168 return -ENOMEM; 1169 1170 msg->hdr.version = cpu_to_le16(10); 1171 msg->hdr.tid = cpu_to_le64(arg->flush_tid); 1172 1173 fc = msg->front.iov_base; 1174 memset(fc, 0, sizeof(*fc)); 1175 1176 fc->cap_id = cpu_to_le64(arg->cid); 1177 fc->op = cpu_to_le32(arg->op); 1178 fc->seq = cpu_to_le32(arg->seq); 1179 fc->issue_seq = cpu_to_le32(arg->issue_seq); 1180 fc->migrate_seq = cpu_to_le32(arg->mseq); 1181 fc->caps = cpu_to_le32(arg->caps); 1182 fc->wanted = cpu_to_le32(arg->wanted); 1183 fc->dirty = cpu_to_le32(arg->dirty); 1184 fc->ino = cpu_to_le64(arg->ino); 1185 fc->snap_follows = cpu_to_le64(arg->follows); 1186 1187 fc->size = cpu_to_le64(arg->size); 1188 fc->max_size = cpu_to_le64(arg->max_size); 1189 ceph_encode_timespec(&fc->mtime, &arg->mtime); 1190 ceph_encode_timespec(&fc->atime, &arg->atime); 1191 ceph_encode_timespec(&fc->ctime, &arg->ctime); 1192 fc->time_warp_seq = cpu_to_le32(arg->time_warp_seq); 1193 1194 fc->uid = cpu_to_le32(from_kuid(&init_user_ns, arg->uid)); 1195 fc->gid = cpu_to_le32(from_kgid(&init_user_ns, arg->gid)); 1196 fc->mode = cpu_to_le32(arg->mode); 1197 1198 fc->xattr_version = cpu_to_le64(arg->xattr_version); 1199 if (arg->xattr_buf) { 1200 msg->middle = ceph_buffer_get(arg->xattr_buf); 1201 fc->xattr_len = cpu_to_le32(arg->xattr_buf->vec.iov_len); 1202 msg->hdr.middle_len = cpu_to_le32(arg->xattr_buf->vec.iov_len); 1203 } 1204 1205 p = fc + 1; 1206 /* flock buffer size (version 2) */ 1207 ceph_encode_32(&p, 0); 1208 /* inline version (version 4) */ 1209 ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE); 1210 /* inline data size */ 1211 ceph_encode_32(&p, 0); 1212 /* 1213 * osd_epoch_barrier (version 5) 1214 * The epoch_barrier is protected osdc->lock, so READ_ONCE here in 1215 * case it was recently changed 1216 */ 1217 ceph_encode_32(&p, READ_ONCE(osdc->epoch_barrier)); 1218 /* oldest_flush_tid (version 6) */ 1219 ceph_encode_64(&p, arg->oldest_flush_tid); 1220 1221 /* 1222 * caller_uid/caller_gid (version 7) 1223 * 1224 * Currently, we don't properly track which caller dirtied the caps 1225 * last, and force a flush of them when there is a conflict. For now, 1226 * just set this to 0:0, to emulate how the MDS has worked up to now. 1227 */ 1228 ceph_encode_32(&p, 0); 1229 ceph_encode_32(&p, 0); 1230 1231 /* pool namespace (version 8) (mds always ignores this) */ 1232 ceph_encode_32(&p, 0); 1233 1234 /* 1235 * btime and change_attr (version 9) 1236 * 1237 * We just zero these out for now, as the MDS ignores them unless 1238 * the requisite feature flags are set (which we don't do yet). 1239 */ 1240 ceph_encode_timespec(p, &zerotime); 1241 p += sizeof(struct ceph_timespec); 1242 ceph_encode_64(&p, 0); 1243 1244 /* Advisory flags (version 10) */ 1245 ceph_encode_32(&p, arg->flags); 1246 1247 ceph_con_send(&arg->session->s_con, msg); 1248 return 0; 1249 } 1250 1251 /* 1252 * Queue cap releases when an inode is dropped from our cache. Since 1253 * inode is about to be destroyed, there is no need for i_ceph_lock. 
1254 */ 1255 void ceph_queue_caps_release(struct inode *inode) 1256 { 1257 struct ceph_inode_info *ci = ceph_inode(inode); 1258 struct rb_node *p; 1259 1260 p = rb_first(&ci->i_caps); 1261 while (p) { 1262 struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node); 1263 p = rb_next(p); 1264 __ceph_remove_cap(cap, true); 1265 } 1266 } 1267 1268 /* 1269 * Send a cap msg on the given inode. Update our caps state, then 1270 * drop i_ceph_lock and send the message. 1271 * 1272 * Make note of max_size reported/requested from mds, revoked caps 1273 * that have now been implemented. 1274 * 1275 * Make half-hearted attempt ot to invalidate page cache if we are 1276 * dropping RDCACHE. Note that this will leave behind locked pages 1277 * that we'll then need to deal with elsewhere. 1278 * 1279 * Return non-zero if delayed release, or we experienced an error 1280 * such that the caller should requeue + retry later. 1281 * 1282 * called with i_ceph_lock, then drops it. 1283 * caller should hold snap_rwsem (read), s_mutex. 1284 */ 1285 static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, 1286 int op, bool sync, int used, int want, int retain, 1287 int flushing, u64 flush_tid, u64 oldest_flush_tid) 1288 __releases(cap->ci->i_ceph_lock) 1289 { 1290 struct ceph_inode_info *ci = cap->ci; 1291 struct inode *inode = &ci->vfs_inode; 1292 struct cap_msg_args arg; 1293 int held, revoking; 1294 int wake = 0; 1295 int delayed = 0; 1296 int ret; 1297 1298 held = cap->issued | cap->implemented; 1299 revoking = cap->implemented & ~cap->issued; 1300 retain &= ~revoking; 1301 1302 dout("__send_cap %p cap %p session %p %s -> %s (revoking %s)\n", 1303 inode, cap, cap->session, 1304 ceph_cap_string(held), ceph_cap_string(held & retain), 1305 ceph_cap_string(revoking)); 1306 BUG_ON((retain & CEPH_CAP_PIN) == 0); 1307 1308 arg.session = cap->session; 1309 1310 /* don't release wanted unless we've waited a bit. */ 1311 if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 && 1312 time_before(jiffies, ci->i_hold_caps_min)) { 1313 dout(" delaying issued %s -> %s, wanted %s -> %s on send\n", 1314 ceph_cap_string(cap->issued), 1315 ceph_cap_string(cap->issued & retain), 1316 ceph_cap_string(cap->mds_wanted), 1317 ceph_cap_string(want)); 1318 want |= cap->mds_wanted; 1319 retain |= cap->issued; 1320 delayed = 1; 1321 } 1322 ci->i_ceph_flags &= ~(CEPH_I_NODELAY | CEPH_I_FLUSH); 1323 if (want & ~cap->mds_wanted) { 1324 /* user space may open/close single file frequently. 1325 * This avoids droping mds_wanted immediately after 1326 * requesting new mds_wanted. 1327 */ 1328 __cap_set_timeouts(mdsc, ci); 1329 } 1330 1331 cap->issued &= retain; /* drop bits we don't want */ 1332 if (cap->implemented & ~cap->issued) { 1333 /* 1334 * Wake up any waiters on wanted -> needed transition. 1335 * This is due to the weird transition from buffered 1336 * to sync IO... we need to flush dirty pages _before_ 1337 * allowing sync writes to avoid reordering. 1338 */ 1339 wake = 1; 1340 } 1341 cap->implemented &= cap->issued | used; 1342 cap->mds_wanted = want; 1343 1344 arg.ino = ceph_vino(inode).ino; 1345 arg.cid = cap->cap_id; 1346 arg.follows = flushing ? 
ci->i_head_snapc->seq : 0; 1347 arg.flush_tid = flush_tid; 1348 arg.oldest_flush_tid = oldest_flush_tid; 1349 1350 arg.size = inode->i_size; 1351 ci->i_reported_size = arg.size; 1352 arg.max_size = ci->i_wanted_max_size; 1353 ci->i_requested_max_size = arg.max_size; 1354 1355 if (flushing & CEPH_CAP_XATTR_EXCL) { 1356 __ceph_build_xattrs_blob(ci); 1357 arg.xattr_version = ci->i_xattrs.version; 1358 arg.xattr_buf = ci->i_xattrs.blob; 1359 } else { 1360 arg.xattr_buf = NULL; 1361 } 1362 1363 arg.mtime = timespec64_to_timespec(inode->i_mtime); 1364 arg.atime = timespec64_to_timespec(inode->i_atime); 1365 arg.ctime = timespec64_to_timespec(inode->i_ctime); 1366 1367 arg.op = op; 1368 arg.caps = cap->implemented; 1369 arg.wanted = want; 1370 arg.dirty = flushing; 1371 1372 arg.seq = cap->seq; 1373 arg.issue_seq = cap->issue_seq; 1374 arg.mseq = cap->mseq; 1375 arg.time_warp_seq = ci->i_time_warp_seq; 1376 1377 arg.uid = inode->i_uid; 1378 arg.gid = inode->i_gid; 1379 arg.mode = inode->i_mode; 1380 1381 arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE; 1382 if (list_empty(&ci->i_cap_snaps)) 1383 arg.flags = CEPH_CLIENT_CAPS_NO_CAPSNAP; 1384 else 1385 arg.flags = CEPH_CLIENT_CAPS_PENDING_CAPSNAP; 1386 if (sync) 1387 arg.flags |= CEPH_CLIENT_CAPS_SYNC; 1388 1389 spin_unlock(&ci->i_ceph_lock); 1390 1391 ret = send_cap_msg(&arg); 1392 if (ret < 0) { 1393 dout("error sending cap msg, must requeue %p\n", inode); 1394 delayed = 1; 1395 } 1396 1397 if (wake) 1398 wake_up_all(&ci->i_cap_wq); 1399 1400 return delayed; 1401 } 1402 1403 static inline int __send_flush_snap(struct inode *inode, 1404 struct ceph_mds_session *session, 1405 struct ceph_cap_snap *capsnap, 1406 u32 mseq, u64 oldest_flush_tid) 1407 { 1408 struct cap_msg_args arg; 1409 1410 arg.session = session; 1411 arg.ino = ceph_vino(inode).ino; 1412 arg.cid = 0; 1413 arg.follows = capsnap->follows; 1414 arg.flush_tid = capsnap->cap_flush.tid; 1415 arg.oldest_flush_tid = oldest_flush_tid; 1416 1417 arg.size = capsnap->size; 1418 arg.max_size = 0; 1419 arg.xattr_version = capsnap->xattr_version; 1420 arg.xattr_buf = capsnap->xattr_blob; 1421 1422 arg.atime = capsnap->atime; 1423 arg.mtime = capsnap->mtime; 1424 arg.ctime = capsnap->ctime; 1425 1426 arg.op = CEPH_CAP_OP_FLUSHSNAP; 1427 arg.caps = capsnap->issued; 1428 arg.wanted = 0; 1429 arg.dirty = capsnap->dirty; 1430 1431 arg.seq = 0; 1432 arg.issue_seq = 0; 1433 arg.mseq = mseq; 1434 arg.time_warp_seq = capsnap->time_warp_seq; 1435 1436 arg.uid = capsnap->uid; 1437 arg.gid = capsnap->gid; 1438 arg.mode = capsnap->mode; 1439 1440 arg.inline_data = capsnap->inline_data; 1441 arg.flags = 0; 1442 1443 return send_cap_msg(&arg); 1444 } 1445 1446 /* 1447 * When a snapshot is taken, clients accumulate dirty metadata on 1448 * inodes with capabilities in ceph_cap_snaps to describe the file 1449 * state at the time the snapshot was taken. This must be flushed 1450 * asynchronously back to the MDS once sync writes complete and dirty 1451 * data is written out. 1452 * 1453 * Called under i_ceph_lock. Takes s_mutex as needed. 
1454 */ 1455 static void __ceph_flush_snaps(struct ceph_inode_info *ci, 1456 struct ceph_mds_session *session) 1457 __releases(ci->i_ceph_lock) 1458 __acquires(ci->i_ceph_lock) 1459 { 1460 struct inode *inode = &ci->vfs_inode; 1461 struct ceph_mds_client *mdsc = session->s_mdsc; 1462 struct ceph_cap_snap *capsnap; 1463 u64 oldest_flush_tid = 0; 1464 u64 first_tid = 1, last_tid = 0; 1465 1466 dout("__flush_snaps %p session %p\n", inode, session); 1467 1468 list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { 1469 /* 1470 * we need to wait for sync writes to complete and for dirty 1471 * pages to be written out. 1472 */ 1473 if (capsnap->dirty_pages || capsnap->writing) 1474 break; 1475 1476 /* should be removed by ceph_try_drop_cap_snap() */ 1477 BUG_ON(!capsnap->need_flush); 1478 1479 /* only flush each capsnap once */ 1480 if (capsnap->cap_flush.tid > 0) { 1481 dout(" already flushed %p, skipping\n", capsnap); 1482 continue; 1483 } 1484 1485 spin_lock(&mdsc->cap_dirty_lock); 1486 capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid; 1487 list_add_tail(&capsnap->cap_flush.g_list, 1488 &mdsc->cap_flush_list); 1489 if (oldest_flush_tid == 0) 1490 oldest_flush_tid = __get_oldest_flush_tid(mdsc); 1491 if (list_empty(&ci->i_flushing_item)) { 1492 list_add_tail(&ci->i_flushing_item, 1493 &session->s_cap_flushing); 1494 } 1495 spin_unlock(&mdsc->cap_dirty_lock); 1496 1497 list_add_tail(&capsnap->cap_flush.i_list, 1498 &ci->i_cap_flush_list); 1499 1500 if (first_tid == 1) 1501 first_tid = capsnap->cap_flush.tid; 1502 last_tid = capsnap->cap_flush.tid; 1503 } 1504 1505 ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS; 1506 1507 while (first_tid <= last_tid) { 1508 struct ceph_cap *cap = ci->i_auth_cap; 1509 struct ceph_cap_flush *cf; 1510 int ret; 1511 1512 if (!(cap && cap->session == session)) { 1513 dout("__flush_snaps %p auth cap %p not mds%d, " 1514 "stop\n", inode, cap, session->s_mds); 1515 break; 1516 } 1517 1518 ret = -ENOENT; 1519 list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) { 1520 if (cf->tid >= first_tid) { 1521 ret = 0; 1522 break; 1523 } 1524 } 1525 if (ret < 0) 1526 break; 1527 1528 first_tid = cf->tid + 1; 1529 1530 capsnap = container_of(cf, struct ceph_cap_snap, cap_flush); 1531 refcount_inc(&capsnap->nref); 1532 spin_unlock(&ci->i_ceph_lock); 1533 1534 dout("__flush_snaps %p capsnap %p tid %llu %s\n", 1535 inode, capsnap, cf->tid, ceph_cap_string(capsnap->dirty)); 1536 1537 ret = __send_flush_snap(inode, session, capsnap, cap->mseq, 1538 oldest_flush_tid); 1539 if (ret < 0) { 1540 pr_err("__flush_snaps: error sending cap flushsnap, " 1541 "ino (%llx.%llx) tid %llu follows %llu\n", 1542 ceph_vinop(inode), cf->tid, capsnap->follows); 1543 } 1544 1545 ceph_put_cap_snap(capsnap); 1546 spin_lock(&ci->i_ceph_lock); 1547 } 1548 } 1549 1550 void ceph_flush_snaps(struct ceph_inode_info *ci, 1551 struct ceph_mds_session **psession) 1552 { 1553 struct inode *inode = &ci->vfs_inode; 1554 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; 1555 struct ceph_mds_session *session = NULL; 1556 int mds; 1557 1558 dout("ceph_flush_snaps %p\n", inode); 1559 if (psession) 1560 session = *psession; 1561 retry: 1562 spin_lock(&ci->i_ceph_lock); 1563 if (!(ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)) { 1564 dout(" no capsnap needs flush, doing nothing\n"); 1565 goto out; 1566 } 1567 if (!ci->i_auth_cap) { 1568 dout(" no auth cap (migrating?), doing nothing\n"); 1569 goto out; 1570 } 1571 1572 mds = ci->i_auth_cap->session->s_mds; 1573 if (session && session->s_mds != mds) { 1574 dout(" 
oops, wrong session %p mutex\n", session); 1575 mutex_unlock(&session->s_mutex); 1576 ceph_put_mds_session(session); 1577 session = NULL; 1578 } 1579 if (!session) { 1580 spin_unlock(&ci->i_ceph_lock); 1581 mutex_lock(&mdsc->mutex); 1582 session = __ceph_lookup_mds_session(mdsc, mds); 1583 mutex_unlock(&mdsc->mutex); 1584 if (session) { 1585 dout(" inverting session/ino locks on %p\n", session); 1586 mutex_lock(&session->s_mutex); 1587 } 1588 goto retry; 1589 } 1590 1591 // make sure flushsnap messages are sent in proper order. 1592 if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) { 1593 __kick_flushing_caps(mdsc, session, ci, 0); 1594 ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; 1595 } 1596 1597 __ceph_flush_snaps(ci, session); 1598 out: 1599 spin_unlock(&ci->i_ceph_lock); 1600 1601 if (psession) { 1602 *psession = session; 1603 } else if (session) { 1604 mutex_unlock(&session->s_mutex); 1605 ceph_put_mds_session(session); 1606 } 1607 /* we flushed them all; remove this inode from the queue */ 1608 spin_lock(&mdsc->snap_flush_lock); 1609 list_del_init(&ci->i_snap_flush_item); 1610 spin_unlock(&mdsc->snap_flush_lock); 1611 } 1612 1613 /* 1614 * Mark caps dirty. If inode is newly dirty, return the dirty flags. 1615 * Caller is then responsible for calling __mark_inode_dirty with the 1616 * returned flags value. 1617 */ 1618 int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask, 1619 struct ceph_cap_flush **pcf) 1620 { 1621 struct ceph_mds_client *mdsc = 1622 ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc; 1623 struct inode *inode = &ci->vfs_inode; 1624 int was = ci->i_dirty_caps; 1625 int dirty = 0; 1626 1627 if (!ci->i_auth_cap) { 1628 pr_warn("__mark_dirty_caps %p %llx mask %s, " 1629 "but no auth cap (session was closed?)\n", 1630 inode, ceph_ino(inode), ceph_cap_string(mask)); 1631 return 0; 1632 } 1633 1634 dout("__mark_dirty_caps %p %s dirty %s -> %s\n", &ci->vfs_inode, 1635 ceph_cap_string(mask), ceph_cap_string(was), 1636 ceph_cap_string(was | mask)); 1637 ci->i_dirty_caps |= mask; 1638 if (was == 0) { 1639 WARN_ON_ONCE(ci->i_prealloc_cap_flush); 1640 swap(ci->i_prealloc_cap_flush, *pcf); 1641 1642 if (!ci->i_head_snapc) { 1643 WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem)); 1644 ci->i_head_snapc = ceph_get_snap_context( 1645 ci->i_snap_realm->cached_context); 1646 } 1647 dout(" inode %p now dirty snapc %p auth cap %p\n", 1648 &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap); 1649 BUG_ON(!list_empty(&ci->i_dirty_item)); 1650 spin_lock(&mdsc->cap_dirty_lock); 1651 list_add(&ci->i_dirty_item, &mdsc->cap_dirty); 1652 spin_unlock(&mdsc->cap_dirty_lock); 1653 if (ci->i_flushing_caps == 0) { 1654 ihold(inode); 1655 dirty |= I_DIRTY_SYNC; 1656 } 1657 } else { 1658 WARN_ON_ONCE(!ci->i_prealloc_cap_flush); 1659 } 1660 BUG_ON(list_empty(&ci->i_dirty_item)); 1661 if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) && 1662 (mask & CEPH_CAP_FILE_BUFFER)) 1663 dirty |= I_DIRTY_DATASYNC; 1664 __cap_delay_requeue(mdsc, ci); 1665 return dirty; 1666 } 1667 1668 struct ceph_cap_flush *ceph_alloc_cap_flush(void) 1669 { 1670 return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL); 1671 } 1672 1673 void ceph_free_cap_flush(struct ceph_cap_flush *cf) 1674 { 1675 if (cf) 1676 kmem_cache_free(ceph_cap_flush_cachep, cf); 1677 } 1678 1679 static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc) 1680 { 1681 if (!list_empty(&mdsc->cap_flush_list)) { 1682 struct ceph_cap_flush *cf = 1683 list_first_entry(&mdsc->cap_flush_list, 1684 struct ceph_cap_flush, g_list); 1685 return cf->tid; 1686 } 1687 
return 0; 1688 } 1689 1690 /* 1691 * Remove cap_flush from the mdsc's or inode's flushing cap list. 1692 * Return true if caller needs to wake up flush waiters. 1693 */ 1694 static bool __finish_cap_flush(struct ceph_mds_client *mdsc, 1695 struct ceph_inode_info *ci, 1696 struct ceph_cap_flush *cf) 1697 { 1698 struct ceph_cap_flush *prev; 1699 bool wake = cf->wake; 1700 if (mdsc) { 1701 /* are there older pending cap flushes? */ 1702 if (wake && cf->g_list.prev != &mdsc->cap_flush_list) { 1703 prev = list_prev_entry(cf, g_list); 1704 prev->wake = true; 1705 wake = false; 1706 } 1707 list_del(&cf->g_list); 1708 } else if (ci) { 1709 if (wake && cf->i_list.prev != &ci->i_cap_flush_list) { 1710 prev = list_prev_entry(cf, i_list); 1711 prev->wake = true; 1712 wake = false; 1713 } 1714 list_del(&cf->i_list); 1715 } else { 1716 BUG_ON(1); 1717 } 1718 return wake; 1719 } 1720 1721 /* 1722 * Add dirty inode to the flushing list. Assigned a seq number so we 1723 * can wait for caps to flush without starving. 1724 * 1725 * Called under i_ceph_lock. 1726 */ 1727 static int __mark_caps_flushing(struct inode *inode, 1728 struct ceph_mds_session *session, bool wake, 1729 u64 *flush_tid, u64 *oldest_flush_tid) 1730 { 1731 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 1732 struct ceph_inode_info *ci = ceph_inode(inode); 1733 struct ceph_cap_flush *cf = NULL; 1734 int flushing; 1735 1736 BUG_ON(ci->i_dirty_caps == 0); 1737 BUG_ON(list_empty(&ci->i_dirty_item)); 1738 BUG_ON(!ci->i_prealloc_cap_flush); 1739 1740 flushing = ci->i_dirty_caps; 1741 dout("__mark_caps_flushing flushing %s, flushing_caps %s -> %s\n", 1742 ceph_cap_string(flushing), 1743 ceph_cap_string(ci->i_flushing_caps), 1744 ceph_cap_string(ci->i_flushing_caps | flushing)); 1745 ci->i_flushing_caps |= flushing; 1746 ci->i_dirty_caps = 0; 1747 dout(" inode %p now !dirty\n", inode); 1748 1749 swap(cf, ci->i_prealloc_cap_flush); 1750 cf->caps = flushing; 1751 cf->wake = wake; 1752 1753 spin_lock(&mdsc->cap_dirty_lock); 1754 list_del_init(&ci->i_dirty_item); 1755 1756 cf->tid = ++mdsc->last_cap_flush_tid; 1757 list_add_tail(&cf->g_list, &mdsc->cap_flush_list); 1758 *oldest_flush_tid = __get_oldest_flush_tid(mdsc); 1759 1760 if (list_empty(&ci->i_flushing_item)) { 1761 list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing); 1762 mdsc->num_cap_flushing++; 1763 } 1764 spin_unlock(&mdsc->cap_dirty_lock); 1765 1766 list_add_tail(&cf->i_list, &ci->i_cap_flush_list); 1767 1768 *flush_tid = cf->tid; 1769 return flushing; 1770 } 1771 1772 /* 1773 * try to invalidate mapping pages without blocking. 1774 */ 1775 static int try_nonblocking_invalidate(struct inode *inode) 1776 { 1777 struct ceph_inode_info *ci = ceph_inode(inode); 1778 u32 invalidating_gen = ci->i_rdcache_gen; 1779 1780 spin_unlock(&ci->i_ceph_lock); 1781 invalidate_mapping_pages(&inode->i_data, 0, -1); 1782 spin_lock(&ci->i_ceph_lock); 1783 1784 if (inode->i_data.nrpages == 0 && 1785 invalidating_gen == ci->i_rdcache_gen) { 1786 /* success. 
*/ 1787 dout("try_nonblocking_invalidate %p success\n", inode); 1788 /* save any racing async invalidate some trouble */ 1789 ci->i_rdcache_revoking = ci->i_rdcache_gen - 1; 1790 return 0; 1791 } 1792 dout("try_nonblocking_invalidate %p failed\n", inode); 1793 return -1; 1794 } 1795 1796 bool __ceph_should_report_size(struct ceph_inode_info *ci) 1797 { 1798 loff_t size = ci->vfs_inode.i_size; 1799 /* mds will adjust max size according to the reported size */ 1800 if (ci->i_flushing_caps & CEPH_CAP_FILE_WR) 1801 return false; 1802 if (size >= ci->i_max_size) 1803 return true; 1804 /* half of previous max_size increment has been used */ 1805 if (ci->i_max_size > ci->i_reported_size && 1806 (size << 1) >= ci->i_max_size + ci->i_reported_size) 1807 return true; 1808 return false; 1809 } 1810 1811 /* 1812 * Swiss army knife function to examine currently used and wanted 1813 * versus held caps. Release, flush, ack revoked caps to mds as 1814 * appropriate. 1815 * 1816 * CHECK_CAPS_NODELAY - caller is delayed work and we should not delay 1817 * cap release further. 1818 * CHECK_CAPS_AUTHONLY - we should only check the auth cap 1819 * CHECK_CAPS_FLUSH - we should flush any dirty caps immediately, without 1820 * further delay. 1821 */ 1822 void ceph_check_caps(struct ceph_inode_info *ci, int flags, 1823 struct ceph_mds_session *session) 1824 { 1825 struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode); 1826 struct ceph_mds_client *mdsc = fsc->mdsc; 1827 struct inode *inode = &ci->vfs_inode; 1828 struct ceph_cap *cap; 1829 u64 flush_tid, oldest_flush_tid; 1830 int file_wanted, used, cap_used; 1831 int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */ 1832 int issued, implemented, want, retain, revoking, flushing = 0; 1833 int mds = -1; /* keep track of how far we've gone through i_caps list 1834 to avoid an infinite loop on retry */ 1835 struct rb_node *p; 1836 int delayed = 0, sent = 0; 1837 bool no_delay = flags & CHECK_CAPS_NODELAY; 1838 bool queue_invalidate = false; 1839 bool tried_invalidate = false; 1840 1841 /* if we are unmounting, flush any unused caps immediately. */ 1842 if (mdsc->stopping) 1843 no_delay = true; 1844 1845 spin_lock(&ci->i_ceph_lock); 1846 1847 if (ci->i_ceph_flags & CEPH_I_FLUSH) 1848 flags |= CHECK_CAPS_FLUSH; 1849 1850 if (!(flags & CHECK_CAPS_AUTHONLY) || 1851 (ci->i_auth_cap && __ceph_is_single_caps(ci))) 1852 __cap_delay_cancel(mdsc, ci); 1853 1854 goto retry_locked; 1855 retry: 1856 spin_lock(&ci->i_ceph_lock); 1857 retry_locked: 1858 file_wanted = __ceph_caps_file_wanted(ci); 1859 used = __ceph_caps_used(ci); 1860 issued = __ceph_caps_issued(ci, &implemented); 1861 revoking = implemented & ~issued; 1862 1863 want = file_wanted; 1864 retain = file_wanted | used | CEPH_CAP_PIN; 1865 if (!mdsc->stopping && inode->i_nlink > 0) { 1866 if (file_wanted) { 1867 retain |= CEPH_CAP_ANY; /* be greedy */ 1868 } else if (S_ISDIR(inode->i_mode) && 1869 (issued & CEPH_CAP_FILE_SHARED) && 1870 __ceph_dir_is_complete(ci)) { 1871 /* 1872 * If a directory is complete, we want to keep 1873 * the exclusive cap. So that MDS does not end up 1874 * revoking the shared cap on every create/unlink 1875 * operation. 1876 */ 1877 want = CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL; 1878 retain |= want; 1879 } else { 1880 1881 retain |= CEPH_CAP_ANY_SHARED; 1882 /* 1883 * keep RD only if we didn't have the file open RW, 1884 * because then the mds would revoke it anyway to 1885 * journal max_size=0. 
1886 */ 1887 if (ci->i_max_size == 0) 1888 retain |= CEPH_CAP_ANY_RD; 1889 } 1890 } 1891 1892 dout("check_caps %p file_want %s used %s dirty %s flushing %s" 1893 " issued %s revoking %s retain %s %s%s%s\n", inode, 1894 ceph_cap_string(file_wanted), 1895 ceph_cap_string(used), ceph_cap_string(ci->i_dirty_caps), 1896 ceph_cap_string(ci->i_flushing_caps), 1897 ceph_cap_string(issued), ceph_cap_string(revoking), 1898 ceph_cap_string(retain), 1899 (flags & CHECK_CAPS_AUTHONLY) ? " AUTHONLY" : "", 1900 (flags & CHECK_CAPS_NODELAY) ? " NODELAY" : "", 1901 (flags & CHECK_CAPS_FLUSH) ? " FLUSH" : ""); 1902 1903 /* 1904 * If we no longer need to hold onto old our caps, and we may 1905 * have cached pages, but don't want them, then try to invalidate. 1906 * If we fail, it's because pages are locked.... try again later. 1907 */ 1908 if ((!no_delay || mdsc->stopping) && 1909 !S_ISDIR(inode->i_mode) && /* ignore readdir cache */ 1910 !(ci->i_wb_ref || ci->i_wrbuffer_ref) && /* no dirty pages... */ 1911 inode->i_data.nrpages && /* have cached pages */ 1912 (revoking & (CEPH_CAP_FILE_CACHE| 1913 CEPH_CAP_FILE_LAZYIO)) && /* or revoking cache */ 1914 !tried_invalidate) { 1915 dout("check_caps trying to invalidate on %p\n", inode); 1916 if (try_nonblocking_invalidate(inode) < 0) { 1917 dout("check_caps queuing invalidate\n"); 1918 queue_invalidate = true; 1919 ci->i_rdcache_revoking = ci->i_rdcache_gen; 1920 } 1921 tried_invalidate = true; 1922 goto retry_locked; 1923 } 1924 1925 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { 1926 cap = rb_entry(p, struct ceph_cap, ci_node); 1927 1928 /* avoid looping forever */ 1929 if (mds >= cap->mds || 1930 ((flags & CHECK_CAPS_AUTHONLY) && cap != ci->i_auth_cap)) 1931 continue; 1932 1933 /* NOTE: no side-effects allowed, until we take s_mutex */ 1934 1935 cap_used = used; 1936 if (ci->i_auth_cap && cap != ci->i_auth_cap) 1937 cap_used &= ~ci->i_auth_cap->issued; 1938 1939 revoking = cap->implemented & ~cap->issued; 1940 dout(" mds%d cap %p used %s issued %s implemented %s revoking %s\n", 1941 cap->mds, cap, ceph_cap_string(cap_used), 1942 ceph_cap_string(cap->issued), 1943 ceph_cap_string(cap->implemented), 1944 ceph_cap_string(revoking)); 1945 1946 if (cap == ci->i_auth_cap && 1947 (cap->issued & CEPH_CAP_FILE_WR)) { 1948 /* request larger max_size from MDS? */ 1949 if (ci->i_wanted_max_size > ci->i_max_size && 1950 ci->i_wanted_max_size > ci->i_requested_max_size) { 1951 dout("requesting new max_size\n"); 1952 goto ack; 1953 } 1954 1955 /* approaching file_max? */ 1956 if (__ceph_should_report_size(ci)) { 1957 dout("i_size approaching max_size\n"); 1958 goto ack; 1959 } 1960 } 1961 /* flush anything dirty? */ 1962 if (cap == ci->i_auth_cap) { 1963 if ((flags & CHECK_CAPS_FLUSH) && ci->i_dirty_caps) { 1964 dout("flushing dirty caps\n"); 1965 goto ack; 1966 } 1967 if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) { 1968 dout("flushing snap caps\n"); 1969 goto ack; 1970 } 1971 } 1972 1973 /* completed revocation? going down and there are no caps? */ 1974 if (revoking && (revoking & cap_used) == 0) { 1975 dout("completed revocation of %s\n", 1976 ceph_cap_string(cap->implemented & ~cap->issued)); 1977 goto ack; 1978 } 1979 1980 /* want more caps from mds? */ 1981 if (want & ~(cap->mds_wanted | cap->issued)) 1982 goto ack; 1983 1984 /* things we might delay */ 1985 if ((cap->issued & ~retain) == 0 && 1986 cap->mds_wanted == want) 1987 continue; /* nope, all good */ 1988 1989 if (no_delay) 1990 goto ack; 1991 1992 /* delay? 
*/ 1993 if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 && 1994 time_before(jiffies, ci->i_hold_caps_max)) { 1995 dout(" delaying issued %s -> %s, wanted %s -> %s\n", 1996 ceph_cap_string(cap->issued), 1997 ceph_cap_string(cap->issued & retain), 1998 ceph_cap_string(cap->mds_wanted), 1999 ceph_cap_string(want)); 2000 delayed++; 2001 continue; 2002 } 2003 2004 ack: 2005 if (ci->i_ceph_flags & CEPH_I_NOFLUSH) { 2006 dout(" skipping %p I_NOFLUSH set\n", inode); 2007 continue; 2008 } 2009 2010 if (session && session != cap->session) { 2011 dout("oops, wrong session %p mutex\n", session); 2012 mutex_unlock(&session->s_mutex); 2013 session = NULL; 2014 } 2015 if (!session) { 2016 session = cap->session; 2017 if (mutex_trylock(&session->s_mutex) == 0) { 2018 dout("inverting session/ino locks on %p\n", 2019 session); 2020 spin_unlock(&ci->i_ceph_lock); 2021 if (took_snap_rwsem) { 2022 up_read(&mdsc->snap_rwsem); 2023 took_snap_rwsem = 0; 2024 } 2025 mutex_lock(&session->s_mutex); 2026 goto retry; 2027 } 2028 } 2029 2030 /* kick flushing and flush snaps before sending normal 2031 * cap message */ 2032 if (cap == ci->i_auth_cap && 2033 (ci->i_ceph_flags & 2034 (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) { 2035 if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) { 2036 __kick_flushing_caps(mdsc, session, ci, 0); 2037 ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; 2038 } 2039 if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) 2040 __ceph_flush_snaps(ci, session); 2041 2042 goto retry_locked; 2043 } 2044 2045 /* take snap_rwsem after session mutex */ 2046 if (!took_snap_rwsem) { 2047 if (down_read_trylock(&mdsc->snap_rwsem) == 0) { 2048 dout("inverting snap/in locks on %p\n", 2049 inode); 2050 spin_unlock(&ci->i_ceph_lock); 2051 down_read(&mdsc->snap_rwsem); 2052 took_snap_rwsem = 1; 2053 goto retry; 2054 } 2055 took_snap_rwsem = 1; 2056 } 2057 2058 if (cap == ci->i_auth_cap && ci->i_dirty_caps) { 2059 flushing = __mark_caps_flushing(inode, session, false, 2060 &flush_tid, 2061 &oldest_flush_tid); 2062 } else { 2063 flushing = 0; 2064 flush_tid = 0; 2065 spin_lock(&mdsc->cap_dirty_lock); 2066 oldest_flush_tid = __get_oldest_flush_tid(mdsc); 2067 spin_unlock(&mdsc->cap_dirty_lock); 2068 } 2069 2070 mds = cap->mds; /* remember mds, so we don't repeat */ 2071 sent++; 2072 2073 /* __send_cap drops i_ceph_lock */ 2074 delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, false, 2075 cap_used, want, retain, flushing, 2076 flush_tid, oldest_flush_tid); 2077 goto retry; /* retake i_ceph_lock and restart our cap scan. */ 2078 } 2079 2080 /* Reschedule delayed caps release if we delayed anything */ 2081 if (delayed) 2082 __cap_delay_requeue(mdsc, ci); 2083 2084 spin_unlock(&ci->i_ceph_lock); 2085 2086 if (queue_invalidate) 2087 ceph_queue_invalidate(inode); 2088 2089 if (session) 2090 mutex_unlock(&session->s_mutex); 2091 if (took_snap_rwsem) 2092 up_read(&mdsc->snap_rwsem); 2093 } 2094 2095 /* 2096 * Try to flush dirty caps back to the auth mds. 
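 * Returns the set of caps being flushed (possibly a flush that was already in flight) and stores the flush tid to wait for in *ptid.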
2097 */ 2098 static int try_flush_caps(struct inode *inode, u64 *ptid) 2099 { 2100 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 2101 struct ceph_inode_info *ci = ceph_inode(inode); 2102 struct ceph_mds_session *session = NULL; 2103 int flushing = 0; 2104 u64 flush_tid = 0, oldest_flush_tid = 0; 2105 2106 retry: 2107 spin_lock(&ci->i_ceph_lock); 2108 if (ci->i_ceph_flags & CEPH_I_NOFLUSH) { 2109 spin_unlock(&ci->i_ceph_lock); 2110 dout("try_flush_caps skipping %p I_NOFLUSH set\n", inode); 2111 goto out; 2112 } 2113 if (ci->i_dirty_caps && ci->i_auth_cap) { 2114 struct ceph_cap *cap = ci->i_auth_cap; 2115 int used = __ceph_caps_used(ci); 2116 int want = __ceph_caps_wanted(ci); 2117 int delayed; 2118 2119 if (!session || session != cap->session) { 2120 spin_unlock(&ci->i_ceph_lock); 2121 if (session) 2122 mutex_unlock(&session->s_mutex); 2123 session = cap->session; 2124 mutex_lock(&session->s_mutex); 2125 goto retry; 2126 } 2127 if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) { 2128 spin_unlock(&ci->i_ceph_lock); 2129 goto out; 2130 } 2131 2132 flushing = __mark_caps_flushing(inode, session, true, 2133 &flush_tid, &oldest_flush_tid); 2134 2135 /* __send_cap drops i_ceph_lock */ 2136 delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, true, 2137 used, want, (cap->issued | cap->implemented), 2138 flushing, flush_tid, oldest_flush_tid); 2139 2140 if (delayed) { 2141 spin_lock(&ci->i_ceph_lock); 2142 __cap_delay_requeue(mdsc, ci); 2143 spin_unlock(&ci->i_ceph_lock); 2144 } 2145 } else { 2146 if (!list_empty(&ci->i_cap_flush_list)) { 2147 struct ceph_cap_flush *cf = 2148 list_last_entry(&ci->i_cap_flush_list, 2149 struct ceph_cap_flush, i_list); 2150 cf->wake = true; 2151 flush_tid = cf->tid; 2152 } 2153 flushing = ci->i_flushing_caps; 2154 spin_unlock(&ci->i_ceph_lock); 2155 } 2156 out: 2157 if (session) 2158 mutex_unlock(&session->s_mutex); 2159 2160 *ptid = flush_tid; 2161 return flushing; 2162 } 2163 2164 /* 2165 * Return true if we've flushed caps through the given flush_tid. 2166 */ 2167 static int caps_are_flushed(struct inode *inode, u64 flush_tid) 2168 { 2169 struct ceph_inode_info *ci = ceph_inode(inode); 2170 int ret = 1; 2171 2172 spin_lock(&ci->i_ceph_lock); 2173 if (!list_empty(&ci->i_cap_flush_list)) { 2174 struct ceph_cap_flush * cf = 2175 list_first_entry(&ci->i_cap_flush_list, 2176 struct ceph_cap_flush, i_list); 2177 if (cf->tid <= flush_tid) 2178 ret = 0; 2179 } 2180 spin_unlock(&ci->i_ceph_lock); 2181 return ret; 2182 } 2183 2184 /* 2185 * wait for any unsafe requests to complete. 2186 */ 2187 static int unsafe_request_wait(struct inode *inode) 2188 { 2189 struct ceph_inode_info *ci = ceph_inode(inode); 2190 struct ceph_mds_request *req1 = NULL, *req2 = NULL; 2191 int ret, err = 0; 2192 2193 spin_lock(&ci->i_unsafe_lock); 2194 if (S_ISDIR(inode->i_mode) && !list_empty(&ci->i_unsafe_dirops)) { 2195 req1 = list_last_entry(&ci->i_unsafe_dirops, 2196 struct ceph_mds_request, 2197 r_unsafe_dir_item); 2198 ceph_mdsc_get_request(req1); 2199 } 2200 if (!list_empty(&ci->i_unsafe_iops)) { 2201 req2 = list_last_entry(&ci->i_unsafe_iops, 2202 struct ceph_mds_request, 2203 r_unsafe_target_item); 2204 ceph_mdsc_get_request(req2); 2205 } 2206 spin_unlock(&ci->i_unsafe_lock); 2207 2208 dout("unsafe_request_wait %p wait on tid %llu %llu\n", 2209 inode, req1 ? req1->r_tid : 0ULL, req2 ? 
req2->r_tid : 0ULL); 2210 if (req1) { 2211 ret = !wait_for_completion_timeout(&req1->r_safe_completion, 2212 ceph_timeout_jiffies(req1->r_timeout)); 2213 if (ret) 2214 err = -EIO; 2215 ceph_mdsc_put_request(req1); 2216 } 2217 if (req2) { 2218 ret = !wait_for_completion_timeout(&req2->r_safe_completion, 2219 ceph_timeout_jiffies(req2->r_timeout)); 2220 if (ret) 2221 err = -EIO; 2222 ceph_mdsc_put_request(req2); 2223 } 2224 return err; 2225 } 2226 2227 int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) 2228 { 2229 struct inode *inode = file->f_mapping->host; 2230 struct ceph_inode_info *ci = ceph_inode(inode); 2231 u64 flush_tid; 2232 int ret; 2233 int dirty; 2234 2235 dout("fsync %p%s\n", inode, datasync ? " datasync" : ""); 2236 2237 ret = file_write_and_wait_range(file, start, end); 2238 if (ret < 0) 2239 goto out; 2240 2241 if (datasync) 2242 goto out; 2243 2244 inode_lock(inode); 2245 2246 dirty = try_flush_caps(inode, &flush_tid); 2247 dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); 2248 2249 ret = unsafe_request_wait(inode); 2250 2251 /* 2252 * only wait on non-file metadata writeback (the mds 2253 * can recover size and mtime, so we don't need to 2254 * wait for that) 2255 */ 2256 if (!ret && (dirty & ~CEPH_CAP_ANY_FILE_WR)) { 2257 ret = wait_event_interruptible(ci->i_cap_wq, 2258 caps_are_flushed(inode, flush_tid)); 2259 } 2260 inode_unlock(inode); 2261 out: 2262 dout("fsync %p%s result=%d\n", inode, datasync ? " datasync" : "", ret); 2263 return ret; 2264 } 2265 2266 /* 2267 * Flush any dirty caps back to the mds. If we aren't asked to wait, 2268 * queue inode for flush but don't do so immediately, because we can 2269 * get by with fewer MDS messages if we wait for data writeback to 2270 * complete first. 2271 */ 2272 int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) 2273 { 2274 struct ceph_inode_info *ci = ceph_inode(inode); 2275 u64 flush_tid; 2276 int err = 0; 2277 int dirty; 2278 int wait = (wbc->sync_mode == WB_SYNC_ALL && !wbc->for_sync); 2279 2280 dout("write_inode %p wait=%d\n", inode, wait); 2281 if (wait) { 2282 dirty = try_flush_caps(inode, &flush_tid); 2283 if (dirty) 2284 err = wait_event_interruptible(ci->i_cap_wq, 2285 caps_are_flushed(inode, flush_tid)); 2286 } else { 2287 struct ceph_mds_client *mdsc = 2288 ceph_sb_to_client(inode->i_sb)->mdsc; 2289 2290 spin_lock(&ci->i_ceph_lock); 2291 if (__ceph_caps_dirty(ci)) 2292 __cap_delay_requeue_front(mdsc, ci); 2293 spin_unlock(&ci->i_ceph_lock); 2294 } 2295 return err; 2296 } 2297 2298 static void __kick_flushing_caps(struct ceph_mds_client *mdsc, 2299 struct ceph_mds_session *session, 2300 struct ceph_inode_info *ci, 2301 u64 oldest_flush_tid) 2302 __releases(ci->i_ceph_lock) 2303 __acquires(ci->i_ceph_lock) 2304 { 2305 struct inode *inode = &ci->vfs_inode; 2306 struct ceph_cap *cap; 2307 struct ceph_cap_flush *cf; 2308 int ret; 2309 u64 first_tid = 0; 2310 2311 list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) { 2312 if (cf->tid < first_tid) 2313 continue; 2314 2315 cap = ci->i_auth_cap; 2316 if (!(cap && cap->session == session)) { 2317 pr_err("%p auth cap %p not mds%d ???\n", 2318 inode, cap, session->s_mds); 2319 break; 2320 } 2321 2322 first_tid = cf->tid + 1; 2323 2324 if (cf->caps) { 2325 dout("kick_flushing_caps %p cap %p tid %llu %s\n", 2326 inode, cap, cf->tid, ceph_cap_string(cf->caps)); 2327 ci->i_ceph_flags |= CEPH_I_NODELAY; 2328 ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, 2329 false, __ceph_caps_used(ci), 2330 __ceph_caps_wanted(ci), 2331 
cap->issued | cap->implemented, 2332 cf->caps, cf->tid, oldest_flush_tid); 2333 if (ret) { 2334 pr_err("kick_flushing_caps: error sending " 2335 "cap flush, ino (%llx.%llx) " 2336 "tid %llu flushing %s\n", 2337 ceph_vinop(inode), cf->tid, 2338 ceph_cap_string(cf->caps)); 2339 } 2340 } else { 2341 struct ceph_cap_snap *capsnap = 2342 container_of(cf, struct ceph_cap_snap, 2343 cap_flush); 2344 dout("kick_flushing_caps %p capsnap %p tid %llu %s\n", 2345 inode, capsnap, cf->tid, 2346 ceph_cap_string(capsnap->dirty)); 2347 2348 refcount_inc(&capsnap->nref); 2349 spin_unlock(&ci->i_ceph_lock); 2350 2351 ret = __send_flush_snap(inode, session, capsnap, cap->mseq, 2352 oldest_flush_tid); 2353 if (ret < 0) { 2354 pr_err("kick_flushing_caps: error sending " 2355 "cap flushsnap, ino (%llx.%llx) " 2356 "tid %llu follows %llu\n", 2357 ceph_vinop(inode), cf->tid, 2358 capsnap->follows); 2359 } 2360 2361 ceph_put_cap_snap(capsnap); 2362 } 2363 2364 spin_lock(&ci->i_ceph_lock); 2365 } 2366 } 2367 2368 void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc, 2369 struct ceph_mds_session *session) 2370 { 2371 struct ceph_inode_info *ci; 2372 struct ceph_cap *cap; 2373 u64 oldest_flush_tid; 2374 2375 dout("early_kick_flushing_caps mds%d\n", session->s_mds); 2376 2377 spin_lock(&mdsc->cap_dirty_lock); 2378 oldest_flush_tid = __get_oldest_flush_tid(mdsc); 2379 spin_unlock(&mdsc->cap_dirty_lock); 2380 2381 list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { 2382 spin_lock(&ci->i_ceph_lock); 2383 cap = ci->i_auth_cap; 2384 if (!(cap && cap->session == session)) { 2385 pr_err("%p auth cap %p not mds%d ???\n", 2386 &ci->vfs_inode, cap, session->s_mds); 2387 spin_unlock(&ci->i_ceph_lock); 2388 continue; 2389 } 2390 2391 2392 /* 2393 * if flushing caps were revoked, we re-send the cap flush 2394 * in client reconnect stage. This guarantees MDS processes 2395 * the cap flush message before issuing the flushing caps to 2396 * another client.
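 * (If nothing we are flushing was revoked, just set CEPH_I_KICK_FLUSH and let the normal kick after reconnect resend the flush.)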
2397 */ 2398 if ((cap->issued & ci->i_flushing_caps) != 2399 ci->i_flushing_caps) { 2400 ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; 2401 __kick_flushing_caps(mdsc, session, ci, 2402 oldest_flush_tid); 2403 } else { 2404 ci->i_ceph_flags |= CEPH_I_KICK_FLUSH; 2405 } 2406 2407 spin_unlock(&ci->i_ceph_lock); 2408 } 2409 } 2410 2411 void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, 2412 struct ceph_mds_session *session) 2413 { 2414 struct ceph_inode_info *ci; 2415 struct ceph_cap *cap; 2416 u64 oldest_flush_tid; 2417 2418 dout("kick_flushing_caps mds%d\n", session->s_mds); 2419 2420 spin_lock(&mdsc->cap_dirty_lock); 2421 oldest_flush_tid = __get_oldest_flush_tid(mdsc); 2422 spin_unlock(&mdsc->cap_dirty_lock); 2423 2424 list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { 2425 spin_lock(&ci->i_ceph_lock); 2426 cap = ci->i_auth_cap; 2427 if (!(cap && cap->session == session)) { 2428 pr_err("%p auth cap %p not mds%d ???\n", 2429 &ci->vfs_inode, cap, session->s_mds); 2430 spin_unlock(&ci->i_ceph_lock); 2431 continue; 2432 } 2433 if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) { 2434 ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; 2435 __kick_flushing_caps(mdsc, session, ci, 2436 oldest_flush_tid); 2437 } 2438 spin_unlock(&ci->i_ceph_lock); 2439 } 2440 } 2441 2442 static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, 2443 struct ceph_mds_session *session, 2444 struct inode *inode) 2445 __releases(ci->i_ceph_lock) 2446 { 2447 struct ceph_inode_info *ci = ceph_inode(inode); 2448 struct ceph_cap *cap; 2449 2450 cap = ci->i_auth_cap; 2451 dout("kick_flushing_inode_caps %p flushing %s\n", inode, 2452 ceph_cap_string(ci->i_flushing_caps)); 2453 2454 if (!list_empty(&ci->i_cap_flush_list)) { 2455 u64 oldest_flush_tid; 2456 spin_lock(&mdsc->cap_dirty_lock); 2457 list_move_tail(&ci->i_flushing_item, 2458 &cap->session->s_cap_flushing); 2459 oldest_flush_tid = __get_oldest_flush_tid(mdsc); 2460 spin_unlock(&mdsc->cap_dirty_lock); 2461 2462 ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; 2463 __kick_flushing_caps(mdsc, session, ci, oldest_flush_tid); 2464 spin_unlock(&ci->i_ceph_lock); 2465 } else { 2466 spin_unlock(&ci->i_ceph_lock); 2467 } 2468 } 2469 2470 2471 /* 2472 * Take references to capabilities we hold, so that we don't release 2473 * them to the MDS prematurely. 2474 * 2475 * Protected by i_ceph_lock. 2476 */ 2477 static void __take_cap_refs(struct ceph_inode_info *ci, int got, 2478 bool snap_rwsem_locked) 2479 { 2480 if (got & CEPH_CAP_PIN) 2481 ci->i_pin_ref++; 2482 if (got & CEPH_CAP_FILE_RD) 2483 ci->i_rd_ref++; 2484 if (got & CEPH_CAP_FILE_CACHE) 2485 ci->i_rdcache_ref++; 2486 if (got & CEPH_CAP_FILE_WR) { 2487 if (ci->i_wr_ref == 0 && !ci->i_head_snapc) { 2488 BUG_ON(!snap_rwsem_locked); 2489 ci->i_head_snapc = ceph_get_snap_context( 2490 ci->i_snap_realm->cached_context); 2491 } 2492 ci->i_wr_ref++; 2493 } 2494 if (got & CEPH_CAP_FILE_BUFFER) { 2495 if (ci->i_wb_ref == 0) 2496 ihold(&ci->vfs_inode); 2497 ci->i_wb_ref++; 2498 dout("__take_cap_refs %p wb %d -> %d (?)\n", 2499 &ci->vfs_inode, ci->i_wb_ref-1, ci->i_wb_ref); 2500 } 2501 } 2502 2503 /* 2504 * Try to grab cap references. Specify those refs we @want, and the 2505 * minimal set we @need. Also include the larger offset we are writing 2506 * to (when applicable), and check against max_size here as well. 2507 * Note that caller is responsible for ensuring max_size increases are 2508 * requested from the MDS. 
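 * Returns 0 if the caps are not currently available (the caller should wait or retry), or 1 when we either took the references (*got is filled in) or hit a hard condition (*err is set, e.g. -EBADF, -EAGAIN, -EROFS).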
2509 */ 2510 static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, 2511 loff_t endoff, bool nonblock, int *got, int *err) 2512 { 2513 struct inode *inode = &ci->vfs_inode; 2514 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; 2515 int ret = 0; 2516 int have, implemented; 2517 int file_wanted; 2518 bool snap_rwsem_locked = false; 2519 2520 dout("get_cap_refs %p need %s want %s\n", inode, 2521 ceph_cap_string(need), ceph_cap_string(want)); 2522 2523 again: 2524 spin_lock(&ci->i_ceph_lock); 2525 2526 /* make sure file is actually open */ 2527 file_wanted = __ceph_caps_file_wanted(ci); 2528 if ((file_wanted & need) != need) { 2529 dout("try_get_cap_refs need %s file_wanted %s, EBADF\n", 2530 ceph_cap_string(need), ceph_cap_string(file_wanted)); 2531 *err = -EBADF; 2532 ret = 1; 2533 goto out_unlock; 2534 } 2535 2536 /* finish pending truncate */ 2537 while (ci->i_truncate_pending) { 2538 spin_unlock(&ci->i_ceph_lock); 2539 if (snap_rwsem_locked) { 2540 up_read(&mdsc->snap_rwsem); 2541 snap_rwsem_locked = false; 2542 } 2543 __ceph_do_pending_vmtruncate(inode); 2544 spin_lock(&ci->i_ceph_lock); 2545 } 2546 2547 have = __ceph_caps_issued(ci, &implemented); 2548 2549 if (have & need & CEPH_CAP_FILE_WR) { 2550 if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) { 2551 dout("get_cap_refs %p endoff %llu > maxsize %llu\n", 2552 inode, endoff, ci->i_max_size); 2553 if (endoff > ci->i_requested_max_size) { 2554 *err = -EAGAIN; 2555 ret = 1; 2556 } 2557 goto out_unlock; 2558 } 2559 /* 2560 * If a sync write is in progress, we must wait, so that we 2561 * can get a final snapshot value for size+mtime. 2562 */ 2563 if (__ceph_have_pending_cap_snap(ci)) { 2564 dout("get_cap_refs %p cap_snap_pending\n", inode); 2565 goto out_unlock; 2566 } 2567 } 2568 2569 if ((have & need) == need) { 2570 /* 2571 * Look at (implemented & ~have & not) so that we keep waiting 2572 * on transition from wanted -> needed caps. This is needed 2573 * for WRBUFFER|WR -> WR to avoid a new WR sync write from 2574 * going before a prior buffered writeback happens. 
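 * Here "not" is the wanted-but-not-needed bits; we only take the references if none of those bits are still being revoked.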
2575 */ 2576 int not = want & ~(have & need); 2577 int revoking = implemented & ~have; 2578 dout("get_cap_refs %p have %s but not %s (revoking %s)\n", 2579 inode, ceph_cap_string(have), ceph_cap_string(not), 2580 ceph_cap_string(revoking)); 2581 if ((revoking & not) == 0) { 2582 if (!snap_rwsem_locked && 2583 !ci->i_head_snapc && 2584 (need & CEPH_CAP_FILE_WR)) { 2585 if (!down_read_trylock(&mdsc->snap_rwsem)) { 2586 /* 2587 * we can not call down_read() when 2588 * task isn't in TASK_RUNNING state 2589 */ 2590 if (nonblock) { 2591 *err = -EAGAIN; 2592 ret = 1; 2593 goto out_unlock; 2594 } 2595 2596 spin_unlock(&ci->i_ceph_lock); 2597 down_read(&mdsc->snap_rwsem); 2598 snap_rwsem_locked = true; 2599 goto again; 2600 } 2601 snap_rwsem_locked = true; 2602 } 2603 *got = need | (have & want); 2604 if ((need & CEPH_CAP_FILE_RD) && 2605 !(*got & CEPH_CAP_FILE_CACHE)) 2606 ceph_disable_fscache_readpage(ci); 2607 __take_cap_refs(ci, *got, true); 2608 ret = 1; 2609 } 2610 } else { 2611 int session_readonly = false; 2612 if ((need & CEPH_CAP_FILE_WR) && ci->i_auth_cap) { 2613 struct ceph_mds_session *s = ci->i_auth_cap->session; 2614 spin_lock(&s->s_cap_lock); 2615 session_readonly = s->s_readonly; 2616 spin_unlock(&s->s_cap_lock); 2617 } 2618 if (session_readonly) { 2619 dout("get_cap_refs %p needed %s but mds%d readonly\n", 2620 inode, ceph_cap_string(need), ci->i_auth_cap->mds); 2621 *err = -EROFS; 2622 ret = 1; 2623 goto out_unlock; 2624 } 2625 2626 if (ci->i_ceph_flags & CEPH_I_CAP_DROPPED) { 2627 int mds_wanted; 2628 if (READ_ONCE(mdsc->fsc->mount_state) == 2629 CEPH_MOUNT_SHUTDOWN) { 2630 dout("get_cap_refs %p forced umount\n", inode); 2631 *err = -EIO; 2632 ret = 1; 2633 goto out_unlock; 2634 } 2635 mds_wanted = __ceph_caps_mds_wanted(ci, false); 2636 if (need & ~(mds_wanted & need)) { 2637 dout("get_cap_refs %p caps were dropped" 2638 " (session killed?)\n", inode); 2639 *err = -ESTALE; 2640 ret = 1; 2641 goto out_unlock; 2642 } 2643 if (!(file_wanted & ~mds_wanted)) 2644 ci->i_ceph_flags &= ~CEPH_I_CAP_DROPPED; 2645 } 2646 2647 dout("get_cap_refs %p have %s needed %s\n", inode, 2648 ceph_cap_string(have), ceph_cap_string(need)); 2649 } 2650 out_unlock: 2651 spin_unlock(&ci->i_ceph_lock); 2652 if (snap_rwsem_locked) 2653 up_read(&mdsc->snap_rwsem); 2654 2655 dout("get_cap_refs %p ret %d got %s\n", inode, 2656 ret, ceph_cap_string(*got)); 2657 return ret; 2658 } 2659 2660 /* 2661 * Check the offset we are writing up to against our current 2662 * max_size. If necessary, tell the MDS we want to write to 2663 * a larger offset. 2664 */ 2665 static void check_max_size(struct inode *inode, loff_t endoff) 2666 { 2667 struct ceph_inode_info *ci = ceph_inode(inode); 2668 int check = 0; 2669 2670 /* do we need to explicitly request a larger max_size? 
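 * If the write extends past the current max_size and past what we already want, bump wanted_max_size and, below, let ceph_check_caps() ask the auth MDS for more.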
*/ 2671 spin_lock(&ci->i_ceph_lock); 2672 if (endoff >= ci->i_max_size && endoff > ci->i_wanted_max_size) { 2673 dout("write %p at large endoff %llu, req max_size\n", 2674 inode, endoff); 2675 ci->i_wanted_max_size = endoff; 2676 } 2677 /* duplicate ceph_check_caps()'s logic */ 2678 if (ci->i_auth_cap && 2679 (ci->i_auth_cap->issued & CEPH_CAP_FILE_WR) && 2680 ci->i_wanted_max_size > ci->i_max_size && 2681 ci->i_wanted_max_size > ci->i_requested_max_size) 2682 check = 1; 2683 spin_unlock(&ci->i_ceph_lock); 2684 if (check) 2685 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); 2686 } 2687 2688 int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got) 2689 { 2690 int ret, err = 0; 2691 2692 BUG_ON(need & ~CEPH_CAP_FILE_RD); 2693 BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)); 2694 ret = ceph_pool_perm_check(ci, need); 2695 if (ret < 0) 2696 return ret; 2697 2698 ret = try_get_cap_refs(ci, need, want, 0, true, got, &err); 2699 if (ret) { 2700 if (err == -EAGAIN) { 2701 ret = 0; 2702 } else if (err < 0) { 2703 ret = err; 2704 } 2705 } 2706 return ret; 2707 } 2708 2709 /* 2710 * Wait for caps, and take cap references. If we can't get a WR cap 2711 * due to a small max_size, make sure we check_max_size (and possibly 2712 * ask the mds) so we don't get hung up indefinitely. 2713 */ 2714 int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, 2715 loff_t endoff, int *got, struct page **pinned_page) 2716 { 2717 int _got, ret, err = 0; 2718 2719 ret = ceph_pool_perm_check(ci, need); 2720 if (ret < 0) 2721 return ret; 2722 2723 while (true) { 2724 if (endoff > 0) 2725 check_max_size(&ci->vfs_inode, endoff); 2726 2727 err = 0; 2728 _got = 0; 2729 ret = try_get_cap_refs(ci, need, want, endoff, 2730 false, &_got, &err); 2731 if (ret) { 2732 if (err == -EAGAIN) 2733 continue; 2734 if (err < 0) 2735 ret = err; 2736 } else { 2737 DEFINE_WAIT_FUNC(wait, woken_wake_function); 2738 add_wait_queue(&ci->i_cap_wq, &wait); 2739 2740 while (!try_get_cap_refs(ci, need, want, endoff, 2741 true, &_got, &err)) { 2742 if (signal_pending(current)) { 2743 ret = -ERESTARTSYS; 2744 break; 2745 } 2746 wait_woken(&wait, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT); 2747 } 2748 2749 remove_wait_queue(&ci->i_cap_wq, &wait); 2750 2751 if (err == -EAGAIN) 2752 continue; 2753 if (err < 0) 2754 ret = err; 2755 } 2756 if (ret < 0) { 2757 if (err == -ESTALE) { 2758 /* session was killed, try renew caps */ 2759 ret = ceph_renew_caps(&ci->vfs_inode); 2760 if (ret == 0) 2761 continue; 2762 } 2763 return ret; 2764 } 2765 2766 if (ci->i_inline_version != CEPH_INLINE_NONE && 2767 (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && 2768 i_size_read(&ci->vfs_inode) > 0) { 2769 struct page *page = 2770 find_get_page(ci->vfs_inode.i_mapping, 0); 2771 if (page) { 2772 if (PageUptodate(page)) { 2773 *pinned_page = page; 2774 break; 2775 } 2776 put_page(page); 2777 } 2778 /* 2779 * drop cap refs first because getattr while 2780 * holding * caps refs can cause deadlock. 2781 */ 2782 ceph_put_cap_refs(ci, _got); 2783 _got = 0; 2784 2785 /* 2786 * getattr request will bring inline data into 2787 * page cache 2788 */ 2789 ret = __ceph_do_getattr(&ci->vfs_inode, NULL, 2790 CEPH_STAT_CAP_INLINE_DATA, 2791 true); 2792 if (ret < 0) 2793 return ret; 2794 continue; 2795 } 2796 break; 2797 } 2798 2799 if ((_got & CEPH_CAP_FILE_RD) && (_got & CEPH_CAP_FILE_CACHE)) 2800 ceph_fscache_revalidate_cookie(ci); 2801 2802 *got = _got; 2803 return 0; 2804 } 2805 2806 /* 2807 * Take cap refs. 
Caller must already know we hold at least one ref 2808 * on the caps in question or we don't know this is safe. 2809 */ 2810 void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps) 2811 { 2812 spin_lock(&ci->i_ceph_lock); 2813 __take_cap_refs(ci, caps, false); 2814 spin_unlock(&ci->i_ceph_lock); 2815 } 2816 2817 2818 /* 2819 * drop cap_snap that is not associated with any snapshot. 2820 * we don't need to send FLUSHSNAP message for it. 2821 */ 2822 static int ceph_try_drop_cap_snap(struct ceph_inode_info *ci, 2823 struct ceph_cap_snap *capsnap) 2824 { 2825 if (!capsnap->need_flush && 2826 !capsnap->writing && !capsnap->dirty_pages) { 2827 dout("dropping cap_snap %p follows %llu\n", 2828 capsnap, capsnap->follows); 2829 BUG_ON(capsnap->cap_flush.tid > 0); 2830 ceph_put_snap_context(capsnap->context); 2831 if (!list_is_last(&capsnap->ci_item, &ci->i_cap_snaps)) 2832 ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS; 2833 2834 list_del(&capsnap->ci_item); 2835 ceph_put_cap_snap(capsnap); 2836 return 1; 2837 } 2838 return 0; 2839 } 2840 2841 /* 2842 * Release cap refs. 2843 * 2844 * If we released the last ref on any given cap, call ceph_check_caps 2845 * to release (or schedule a release). 2846 * 2847 * If we are releasing a WR cap (from a sync write), finalize any affected 2848 * cap_snap, and wake up any waiters. 2849 */ 2850 void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) 2851 { 2852 struct inode *inode = &ci->vfs_inode; 2853 int last = 0, put = 0, flushsnaps = 0, wake = 0; 2854 2855 spin_lock(&ci->i_ceph_lock); 2856 if (had & CEPH_CAP_PIN) 2857 --ci->i_pin_ref; 2858 if (had & CEPH_CAP_FILE_RD) 2859 if (--ci->i_rd_ref == 0) 2860 last++; 2861 if (had & CEPH_CAP_FILE_CACHE) 2862 if (--ci->i_rdcache_ref == 0) 2863 last++; 2864 if (had & CEPH_CAP_FILE_BUFFER) { 2865 if (--ci->i_wb_ref == 0) { 2866 last++; 2867 put++; 2868 } 2869 dout("put_cap_refs %p wb %d -> %d (?)\n", 2870 inode, ci->i_wb_ref+1, ci->i_wb_ref); 2871 } 2872 if (had & CEPH_CAP_FILE_WR) 2873 if (--ci->i_wr_ref == 0) { 2874 last++; 2875 if (__ceph_have_pending_cap_snap(ci)) { 2876 struct ceph_cap_snap *capsnap = 2877 list_last_entry(&ci->i_cap_snaps, 2878 struct ceph_cap_snap, 2879 ci_item); 2880 capsnap->writing = 0; 2881 if (ceph_try_drop_cap_snap(ci, capsnap)) 2882 put++; 2883 else if (__ceph_finish_cap_snap(ci, capsnap)) 2884 flushsnaps = 1; 2885 wake = 1; 2886 } 2887 if (ci->i_wrbuffer_ref_head == 0 && 2888 ci->i_dirty_caps == 0 && 2889 ci->i_flushing_caps == 0) { 2890 BUG_ON(!ci->i_head_snapc); 2891 ceph_put_snap_context(ci->i_head_snapc); 2892 ci->i_head_snapc = NULL; 2893 } 2894 /* see comment in __ceph_remove_cap() */ 2895 if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) 2896 drop_inode_snap_realm(ci); 2897 } 2898 spin_unlock(&ci->i_ceph_lock); 2899 2900 dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had), 2901 last ? " last" : "", put ? " put" : ""); 2902 2903 if (last && !flushsnaps) 2904 ceph_check_caps(ci, 0, NULL); 2905 else if (flushsnaps) 2906 ceph_flush_snaps(ci, NULL); 2907 if (wake) 2908 wake_up_all(&ci->i_cap_wq); 2909 while (put-- > 0) 2910 iput(inode); 2911 } 2912 2913 /* 2914 * Release @nr WRBUFFER refs on dirty pages for the given @snapc snap 2915 * context. Adjust per-snap dirty page accounting as appropriate. 2916 * Once all dirty data for a cap_snap is flushed, flush snapped file 2917 * metadata back to the MDS. If we dropped the last ref, call 2918 * ceph_check_caps. 
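 * @snapc is either the live i_head_snapc or the context of one of the queued cap_snaps; we adjust whichever accounting matches.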
2919 */ 2920 void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, 2921 struct ceph_snap_context *snapc) 2922 { 2923 struct inode *inode = &ci->vfs_inode; 2924 struct ceph_cap_snap *capsnap = NULL; 2925 int put = 0; 2926 bool last = false; 2927 bool found = false; 2928 bool flush_snaps = false; 2929 bool complete_capsnap = false; 2930 2931 spin_lock(&ci->i_ceph_lock); 2932 ci->i_wrbuffer_ref -= nr; 2933 if (ci->i_wrbuffer_ref == 0) { 2934 last = true; 2935 put++; 2936 } 2937 2938 if (ci->i_head_snapc == snapc) { 2939 ci->i_wrbuffer_ref_head -= nr; 2940 if (ci->i_wrbuffer_ref_head == 0 && 2941 ci->i_wr_ref == 0 && 2942 ci->i_dirty_caps == 0 && 2943 ci->i_flushing_caps == 0) { 2944 BUG_ON(!ci->i_head_snapc); 2945 ceph_put_snap_context(ci->i_head_snapc); 2946 ci->i_head_snapc = NULL; 2947 } 2948 dout("put_wrbuffer_cap_refs on %p head %d/%d -> %d/%d %s\n", 2949 inode, 2950 ci->i_wrbuffer_ref+nr, ci->i_wrbuffer_ref_head+nr, 2951 ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head, 2952 last ? " LAST" : ""); 2953 } else { 2954 list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { 2955 if (capsnap->context == snapc) { 2956 found = true; 2957 break; 2958 } 2959 } 2960 BUG_ON(!found); 2961 capsnap->dirty_pages -= nr; 2962 if (capsnap->dirty_pages == 0) { 2963 complete_capsnap = true; 2964 if (!capsnap->writing) { 2965 if (ceph_try_drop_cap_snap(ci, capsnap)) { 2966 put++; 2967 } else { 2968 ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS; 2969 flush_snaps = true; 2970 } 2971 } 2972 } 2973 dout("put_wrbuffer_cap_refs on %p cap_snap %p " 2974 " snap %lld %d/%d -> %d/%d %s%s\n", 2975 inode, capsnap, capsnap->context->seq, 2976 ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr, 2977 ci->i_wrbuffer_ref, capsnap->dirty_pages, 2978 last ? " (wrbuffer last)" : "", 2979 complete_capsnap ? " (complete capsnap)" : ""); 2980 } 2981 2982 spin_unlock(&ci->i_ceph_lock); 2983 2984 if (last) { 2985 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); 2986 } else if (flush_snaps) { 2987 ceph_flush_snaps(ci, NULL); 2988 } 2989 if (complete_capsnap) 2990 wake_up_all(&ci->i_cap_wq); 2991 while (put-- > 0) 2992 iput(inode); 2993 } 2994 2995 /* 2996 * Invalidate unlinked inode's aliases, so we can drop the inode ASAP. 2997 */ 2998 static void invalidate_aliases(struct inode *inode) 2999 { 3000 struct dentry *dn, *prev = NULL; 3001 3002 dout("invalidate_aliases inode %p\n", inode); 3003 d_prune_aliases(inode); 3004 /* 3005 * For non-directory inode, d_find_alias() only returns 3006 * hashed dentry. After calling d_invalidate(), the 3007 * dentry becomes unhashed. 3008 * 3009 * For directory inode, d_find_alias() can return 3010 * unhashed dentry. But directory inode should have 3011 * one alias at most. 3012 */ 3013 while ((dn = d_find_alias(inode))) { 3014 if (dn == prev) { 3015 dput(dn); 3016 break; 3017 } 3018 d_invalidate(dn); 3019 if (prev) 3020 dput(prev); 3021 prev = dn; 3022 } 3023 if (prev) 3024 dput(prev); 3025 } 3026 3027 struct cap_extra_info { 3028 struct ceph_string *pool_ns; 3029 /* inline data */ 3030 u64 inline_version; 3031 void *inline_data; 3032 u32 inline_len; 3033 /* dirstat */ 3034 bool dirstat_valid; 3035 u64 nfiles; 3036 u64 nsubdirs; 3037 /* currently issued */ 3038 int issued; 3039 }; 3040 3041 /* 3042 * Handle a cap GRANT message from the MDS. (Note that a GRANT may 3043 * actually be a revocation if it specifies a smaller cap set.) 3044 * 3045 * caller holds s_mutex and i_ceph_lock, we drop both. 
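 * Besides updating the cap bits, we apply whatever metadata the grant carries (mode/uid/gid, nlink, xattrs, timestamps, layout, size, max_size) as permitted by the caps we already held.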
3046 */ 3047 static void handle_cap_grant(struct inode *inode, 3048 struct ceph_mds_session *session, 3049 struct ceph_cap *cap, 3050 struct ceph_mds_caps *grant, 3051 struct ceph_buffer *xattr_buf, 3052 struct cap_extra_info *extra_info) 3053 __releases(ci->i_ceph_lock) 3054 __releases(session->s_mdsc->snap_rwsem) 3055 { 3056 struct ceph_inode_info *ci = ceph_inode(inode); 3057 int seq = le32_to_cpu(grant->seq); 3058 int newcaps = le32_to_cpu(grant->caps); 3059 int used, wanted, dirty; 3060 u64 size = le64_to_cpu(grant->size); 3061 u64 max_size = le64_to_cpu(grant->max_size); 3062 int check_caps = 0; 3063 bool wake = false; 3064 bool writeback = false; 3065 bool queue_trunc = false; 3066 bool queue_invalidate = false; 3067 bool deleted_inode = false; 3068 bool fill_inline = false; 3069 3070 dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n", 3071 inode, cap, session->s_mds, seq, ceph_cap_string(newcaps)); 3072 dout(" size %llu max_size %llu, i_size %llu\n", size, max_size, 3073 inode->i_size); 3074 3075 3076 /* 3077 * auth mds of the inode changed. we received the cap export message, 3078 * but still haven't received the cap import message. handle_cap_export 3079 * updated the new auth MDS' cap. 3080 * 3081 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message 3082 * that was sent before the cap import message. So don't remove caps. 3083 */ 3084 if (ceph_seq_cmp(seq, cap->seq) <= 0) { 3085 WARN_ON(cap != ci->i_auth_cap); 3086 WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id)); 3087 seq = cap->seq; 3088 newcaps |= cap->issued; 3089 } 3090 3091 /* 3092 * If CACHE is being revoked, and we have no dirty buffers, 3093 * try to invalidate (once). (If there are dirty buffers, we 3094 * will invalidate _after_ writeback.) 3095 */ 3096 if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */ 3097 ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && 3098 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && 3099 !(ci->i_wrbuffer_ref || ci->i_wb_ref)) { 3100 if (try_nonblocking_invalidate(inode)) { 3101 /* there were locked pages.. invalidate later 3102 in a separate thread. 
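 * but only queue it if we haven't already queued an invalidate for this rdcache generation.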
*/ 3103 if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { 3104 queue_invalidate = true; 3105 ci->i_rdcache_revoking = ci->i_rdcache_gen; 3106 } 3107 } 3108 } 3109 3110 /* side effects now are allowed */ 3111 cap->cap_gen = session->s_cap_gen; 3112 cap->seq = seq; 3113 3114 __check_cap_issue(ci, cap, newcaps); 3115 3116 if ((newcaps & CEPH_CAP_AUTH_SHARED) && 3117 (extra_info->issued & CEPH_CAP_AUTH_EXCL) == 0) { 3118 inode->i_mode = le32_to_cpu(grant->mode); 3119 inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid)); 3120 inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid)); 3121 dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode, 3122 from_kuid(&init_user_ns, inode->i_uid), 3123 from_kgid(&init_user_ns, inode->i_gid)); 3124 } 3125 3126 if ((newcaps & CEPH_CAP_LINK_SHARED) && 3127 (extra_info->issued & CEPH_CAP_LINK_EXCL) == 0) { 3128 set_nlink(inode, le32_to_cpu(grant->nlink)); 3129 if (inode->i_nlink == 0 && 3130 (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL))) 3131 deleted_inode = true; 3132 } 3133 3134 if ((extra_info->issued & CEPH_CAP_XATTR_EXCL) == 0 && 3135 grant->xattr_len) { 3136 int len = le32_to_cpu(grant->xattr_len); 3137 u64 version = le64_to_cpu(grant->xattr_version); 3138 3139 if (version > ci->i_xattrs.version) { 3140 dout(" got new xattrs v%llu on %p len %d\n", 3141 version, inode, len); 3142 if (ci->i_xattrs.blob) 3143 ceph_buffer_put(ci->i_xattrs.blob); 3144 ci->i_xattrs.blob = ceph_buffer_get(xattr_buf); 3145 ci->i_xattrs.version = version; 3146 ceph_forget_all_cached_acls(inode); 3147 } 3148 } 3149 3150 if (newcaps & CEPH_CAP_ANY_RD) { 3151 struct timespec mtime, atime, ctime; 3152 /* ctime/mtime/atime? */ 3153 ceph_decode_timespec(&mtime, &grant->mtime); 3154 ceph_decode_timespec(&atime, &grant->atime); 3155 ceph_decode_timespec(&ctime, &grant->ctime); 3156 ceph_fill_file_time(inode, extra_info->issued, 3157 le32_to_cpu(grant->time_warp_seq), 3158 &ctime, &mtime, &atime); 3159 } 3160 3161 if ((newcaps & CEPH_CAP_FILE_SHARED) && extra_info->dirstat_valid) { 3162 ci->i_files = extra_info->nfiles; 3163 ci->i_subdirs = extra_info->nsubdirs; 3164 } 3165 3166 if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) { 3167 /* file layout may have changed */ 3168 s64 old_pool = ci->i_layout.pool_id; 3169 struct ceph_string *old_ns; 3170 3171 ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout); 3172 old_ns = rcu_dereference_protected(ci->i_layout.pool_ns, 3173 lockdep_is_held(&ci->i_ceph_lock)); 3174 rcu_assign_pointer(ci->i_layout.pool_ns, extra_info->pool_ns); 3175 3176 if (ci->i_layout.pool_id != old_pool || 3177 extra_info->pool_ns != old_ns) 3178 ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; 3179 3180 extra_info->pool_ns = old_ns; 3181 3182 /* size/truncate_seq? 
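 * ceph_fill_file_size() decides whether the new size/truncate_seq should be applied and whether a vmtruncate needs to be queued.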
*/ 3183 queue_trunc = ceph_fill_file_size(inode, extra_info->issued, 3184 le32_to_cpu(grant->truncate_seq), 3185 le64_to_cpu(grant->truncate_size), 3186 size); 3187 } 3188 3189 if (ci->i_auth_cap == cap && (newcaps & CEPH_CAP_ANY_FILE_WR)) { 3190 if (max_size != ci->i_max_size) { 3191 dout("max_size %lld -> %llu\n", 3192 ci->i_max_size, max_size); 3193 ci->i_max_size = max_size; 3194 if (max_size >= ci->i_wanted_max_size) { 3195 ci->i_wanted_max_size = 0; /* reset */ 3196 ci->i_requested_max_size = 0; 3197 } 3198 wake = true; 3199 } else if (ci->i_wanted_max_size > ci->i_max_size && 3200 ci->i_wanted_max_size > ci->i_requested_max_size) { 3201 /* CEPH_CAP_OP_IMPORT */ 3202 wake = true; 3203 } 3204 } 3205 3206 /* check cap bits */ 3207 wanted = __ceph_caps_wanted(ci); 3208 used = __ceph_caps_used(ci); 3209 dirty = __ceph_caps_dirty(ci); 3210 dout(" my wanted = %s, used = %s, dirty %s\n", 3211 ceph_cap_string(wanted), 3212 ceph_cap_string(used), 3213 ceph_cap_string(dirty)); 3214 if (wanted != le32_to_cpu(grant->wanted)) { 3215 dout("mds wanted %s -> %s\n", 3216 ceph_cap_string(le32_to_cpu(grant->wanted)), 3217 ceph_cap_string(wanted)); 3218 /* imported cap may not have correct mds_wanted */ 3219 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) 3220 check_caps = 1; 3221 } 3222 3223 /* revocation, grant, or no-op? */ 3224 if (cap->issued & ~newcaps) { 3225 int revoking = cap->issued & ~newcaps; 3226 3227 dout("revocation: %s -> %s (revoking %s)\n", 3228 ceph_cap_string(cap->issued), 3229 ceph_cap_string(newcaps), 3230 ceph_cap_string(revoking)); 3231 if (revoking & used & CEPH_CAP_FILE_BUFFER) 3232 writeback = true; /* initiate writeback; will delay ack */ 3233 else if (revoking == CEPH_CAP_FILE_CACHE && 3234 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && 3235 queue_invalidate) 3236 ; /* do nothing yet, invalidation will be queued */ 3237 else if (cap == ci->i_auth_cap) 3238 check_caps = 1; /* check auth cap only */ 3239 else 3240 check_caps = 2; /* check all caps */ 3241 cap->issued = newcaps; 3242 cap->implemented |= newcaps; 3243 } else if (cap->issued == newcaps) { 3244 dout("caps unchanged: %s -> %s\n", 3245 ceph_cap_string(cap->issued), ceph_cap_string(newcaps)); 3246 } else { 3247 dout("grant: %s -> %s\n", ceph_cap_string(cap->issued), 3248 ceph_cap_string(newcaps)); 3249 /* non-auth MDS is revoking the newly grant caps ? 
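 * (i.e. this grant is on the auth cap but another mds still implements some of the newly granted bits; recheck all caps so that revocation can be acked.)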
*/ 3250 if (cap == ci->i_auth_cap && 3251 __ceph_caps_revoking_other(ci, cap, newcaps)) 3252 check_caps = 2; 3253 3254 cap->issued = newcaps; 3255 cap->implemented |= newcaps; /* add bits only, to 3256 * avoid stepping on a 3257 * pending revocation */ 3258 wake = true; 3259 } 3260 BUG_ON(cap->issued & ~cap->implemented); 3261 3262 if (extra_info->inline_version > 0 && 3263 extra_info->inline_version >= ci->i_inline_version) { 3264 ci->i_inline_version = extra_info->inline_version; 3265 if (ci->i_inline_version != CEPH_INLINE_NONE && 3266 (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO))) 3267 fill_inline = true; 3268 } 3269 3270 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) { 3271 if (newcaps & ~extra_info->issued) 3272 wake = true; 3273 kick_flushing_inode_caps(session->s_mdsc, session, inode); 3274 up_read(&session->s_mdsc->snap_rwsem); 3275 } else { 3276 spin_unlock(&ci->i_ceph_lock); 3277 } 3278 3279 if (fill_inline) 3280 ceph_fill_inline_data(inode, NULL, extra_info->inline_data, 3281 extra_info->inline_len); 3282 3283 if (queue_trunc) 3284 ceph_queue_vmtruncate(inode); 3285 3286 if (writeback) 3287 /* 3288 * queue inode for writeback: we can't actually call 3289 * filemap_write_and_wait, etc. from message handler 3290 * context. 3291 */ 3292 ceph_queue_writeback(inode); 3293 if (queue_invalidate) 3294 ceph_queue_invalidate(inode); 3295 if (deleted_inode) 3296 invalidate_aliases(inode); 3297 if (wake) 3298 wake_up_all(&ci->i_cap_wq); 3299 3300 if (check_caps == 1) 3301 ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY, 3302 session); 3303 else if (check_caps == 2) 3304 ceph_check_caps(ci, CHECK_CAPS_NODELAY, session); 3305 else 3306 mutex_unlock(&session->s_mutex); 3307 } 3308 3309 /* 3310 * Handle FLUSH_ACK from MDS, indicating that metadata we sent to the 3311 * MDS has been safely committed. 
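 * Remove the acked entries from i_cap_flush_list, clear the cleaned bits from i_flushing_caps, and drop the inode reference if nothing is left dirty or flushing.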
3312 */ 3313 static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, 3314 struct ceph_mds_caps *m, 3315 struct ceph_mds_session *session, 3316 struct ceph_cap *cap) 3317 __releases(ci->i_ceph_lock) 3318 { 3319 struct ceph_inode_info *ci = ceph_inode(inode); 3320 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 3321 struct ceph_cap_flush *cf, *tmp_cf; 3322 LIST_HEAD(to_remove); 3323 unsigned seq = le32_to_cpu(m->seq); 3324 int dirty = le32_to_cpu(m->dirty); 3325 int cleaned = 0; 3326 bool drop = false; 3327 bool wake_ci = false; 3328 bool wake_mdsc = false; 3329 3330 list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) { 3331 if (cf->tid == flush_tid) 3332 cleaned = cf->caps; 3333 if (cf->caps == 0) /* capsnap */ 3334 continue; 3335 if (cf->tid <= flush_tid) { 3336 if (__finish_cap_flush(NULL, ci, cf)) 3337 wake_ci = true; 3338 list_add_tail(&cf->i_list, &to_remove); 3339 } else { 3340 cleaned &= ~cf->caps; 3341 if (!cleaned) 3342 break; 3343 } 3344 } 3345 3346 dout("handle_cap_flush_ack inode %p mds%d seq %d on %s cleaned %s," 3347 " flushing %s -> %s\n", 3348 inode, session->s_mds, seq, ceph_cap_string(dirty), 3349 ceph_cap_string(cleaned), ceph_cap_string(ci->i_flushing_caps), 3350 ceph_cap_string(ci->i_flushing_caps & ~cleaned)); 3351 3352 if (list_empty(&to_remove) && !cleaned) 3353 goto out; 3354 3355 ci->i_flushing_caps &= ~cleaned; 3356 3357 spin_lock(&mdsc->cap_dirty_lock); 3358 3359 list_for_each_entry(cf, &to_remove, i_list) { 3360 if (__finish_cap_flush(mdsc, NULL, cf)) 3361 wake_mdsc = true; 3362 } 3363 3364 if (ci->i_flushing_caps == 0) { 3365 if (list_empty(&ci->i_cap_flush_list)) { 3366 list_del_init(&ci->i_flushing_item); 3367 if (!list_empty(&session->s_cap_flushing)) { 3368 dout(" mds%d still flushing cap on %p\n", 3369 session->s_mds, 3370 &list_first_entry(&session->s_cap_flushing, 3371 struct ceph_inode_info, 3372 i_flushing_item)->vfs_inode); 3373 } 3374 } 3375 mdsc->num_cap_flushing--; 3376 dout(" inode %p now !flushing\n", inode); 3377 3378 if (ci->i_dirty_caps == 0) { 3379 dout(" inode %p now clean\n", inode); 3380 BUG_ON(!list_empty(&ci->i_dirty_item)); 3381 drop = true; 3382 if (ci->i_wr_ref == 0 && 3383 ci->i_wrbuffer_ref_head == 0) { 3384 BUG_ON(!ci->i_head_snapc); 3385 ceph_put_snap_context(ci->i_head_snapc); 3386 ci->i_head_snapc = NULL; 3387 } 3388 } else { 3389 BUG_ON(list_empty(&ci->i_dirty_item)); 3390 } 3391 } 3392 spin_unlock(&mdsc->cap_dirty_lock); 3393 3394 out: 3395 spin_unlock(&ci->i_ceph_lock); 3396 3397 while (!list_empty(&to_remove)) { 3398 cf = list_first_entry(&to_remove, 3399 struct ceph_cap_flush, i_list); 3400 list_del(&cf->i_list); 3401 ceph_free_cap_flush(cf); 3402 } 3403 3404 if (wake_ci) 3405 wake_up_all(&ci->i_cap_wq); 3406 if (wake_mdsc) 3407 wake_up_all(&mdsc->cap_flushing_wq); 3408 if (drop) 3409 iput(inode); 3410 } 3411 3412 /* 3413 * Handle FLUSHSNAP_ACK. MDS has flushed snap data to disk and we can 3414 * throw away our cap_snap. 3415 * 3416 * Caller hold s_mutex. 
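 * We locate the cap_snap by follows, double-check its flush tid, then unhook it, wake any waiters, and put the inode.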
3417 */ 3418 static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, 3419 struct ceph_mds_caps *m, 3420 struct ceph_mds_session *session) 3421 { 3422 struct ceph_inode_info *ci = ceph_inode(inode); 3423 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 3424 u64 follows = le64_to_cpu(m->snap_follows); 3425 struct ceph_cap_snap *capsnap; 3426 bool flushed = false; 3427 bool wake_ci = false; 3428 bool wake_mdsc = false; 3429 3430 dout("handle_cap_flushsnap_ack inode %p ci %p mds%d follows %lld\n", 3431 inode, ci, session->s_mds, follows); 3432 3433 spin_lock(&ci->i_ceph_lock); 3434 list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { 3435 if (capsnap->follows == follows) { 3436 if (capsnap->cap_flush.tid != flush_tid) { 3437 dout(" cap_snap %p follows %lld tid %lld !=" 3438 " %lld\n", capsnap, follows, 3439 flush_tid, capsnap->cap_flush.tid); 3440 break; 3441 } 3442 flushed = true; 3443 break; 3444 } else { 3445 dout(" skipping cap_snap %p follows %lld\n", 3446 capsnap, capsnap->follows); 3447 } 3448 } 3449 if (flushed) { 3450 WARN_ON(capsnap->dirty_pages || capsnap->writing); 3451 dout(" removing %p cap_snap %p follows %lld\n", 3452 inode, capsnap, follows); 3453 list_del(&capsnap->ci_item); 3454 if (__finish_cap_flush(NULL, ci, &capsnap->cap_flush)) 3455 wake_ci = true; 3456 3457 spin_lock(&mdsc->cap_dirty_lock); 3458 3459 if (list_empty(&ci->i_cap_flush_list)) 3460 list_del_init(&ci->i_flushing_item); 3461 3462 if (__finish_cap_flush(mdsc, NULL, &capsnap->cap_flush)) 3463 wake_mdsc = true; 3464 3465 spin_unlock(&mdsc->cap_dirty_lock); 3466 } 3467 spin_unlock(&ci->i_ceph_lock); 3468 if (flushed) { 3469 ceph_put_snap_context(capsnap->context); 3470 ceph_put_cap_snap(capsnap); 3471 if (wake_ci) 3472 wake_up_all(&ci->i_cap_wq); 3473 if (wake_mdsc) 3474 wake_up_all(&mdsc->cap_flushing_wq); 3475 iput(inode); 3476 } 3477 } 3478 3479 /* 3480 * Handle TRUNC from MDS, indicating file truncation. 3481 * 3482 * caller hold s_mutex. 3483 */ 3484 static void handle_cap_trunc(struct inode *inode, 3485 struct ceph_mds_caps *trunc, 3486 struct ceph_mds_session *session) 3487 __releases(ci->i_ceph_lock) 3488 { 3489 struct ceph_inode_info *ci = ceph_inode(inode); 3490 int mds = session->s_mds; 3491 int seq = le32_to_cpu(trunc->seq); 3492 u32 truncate_seq = le32_to_cpu(trunc->truncate_seq); 3493 u64 truncate_size = le64_to_cpu(trunc->truncate_size); 3494 u64 size = le64_to_cpu(trunc->size); 3495 int implemented = 0; 3496 int dirty = __ceph_caps_dirty(ci); 3497 int issued = __ceph_caps_issued(ceph_inode(inode), &implemented); 3498 int queue_trunc = 0; 3499 3500 issued |= implemented | dirty; 3501 3502 dout("handle_cap_trunc inode %p mds%d seq %d to %lld seq %d\n", 3503 inode, mds, seq, truncate_size, truncate_seq); 3504 queue_trunc = ceph_fill_file_size(inode, issued, 3505 truncate_seq, truncate_size, size); 3506 spin_unlock(&ci->i_ceph_lock); 3507 3508 if (queue_trunc) 3509 ceph_queue_vmtruncate(inode); 3510 } 3511 3512 /* 3513 * Handle EXPORT from MDS. Cap is being migrated _from_ this mds to a 3514 * different one. If we are the most recent migration we've seen (as 3515 * indicated by mseq), make note of the migrating cap bits for the 3516 * duration (until we see the corresponding IMPORT). 
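 * If there is no migration target (no peer info), we just drop the cap; otherwise we update or create the cap for the target mds, move any flushing state along with it, and remove the exported one.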
3517 * 3518 * caller holds s_mutex 3519 */ 3520 static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, 3521 struct ceph_mds_cap_peer *ph, 3522 struct ceph_mds_session *session) 3523 { 3524 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; 3525 struct ceph_mds_session *tsession = NULL; 3526 struct ceph_cap *cap, *tcap, *new_cap = NULL; 3527 struct ceph_inode_info *ci = ceph_inode(inode); 3528 u64 t_cap_id; 3529 unsigned mseq = le32_to_cpu(ex->migrate_seq); 3530 unsigned t_seq, t_mseq; 3531 int target, issued; 3532 int mds = session->s_mds; 3533 3534 if (ph) { 3535 t_cap_id = le64_to_cpu(ph->cap_id); 3536 t_seq = le32_to_cpu(ph->seq); 3537 t_mseq = le32_to_cpu(ph->mseq); 3538 target = le32_to_cpu(ph->mds); 3539 } else { 3540 t_cap_id = t_seq = t_mseq = 0; 3541 target = -1; 3542 } 3543 3544 dout("handle_cap_export inode %p ci %p mds%d mseq %d target %d\n", 3545 inode, ci, mds, mseq, target); 3546 retry: 3547 spin_lock(&ci->i_ceph_lock); 3548 cap = __get_cap_for_mds(ci, mds); 3549 if (!cap || cap->cap_id != le64_to_cpu(ex->cap_id)) 3550 goto out_unlock; 3551 3552 if (target < 0) { 3553 __ceph_remove_cap(cap, false); 3554 if (!ci->i_auth_cap) 3555 ci->i_ceph_flags |= CEPH_I_CAP_DROPPED; 3556 goto out_unlock; 3557 } 3558 3559 /* 3560 * now we know we haven't received the cap import message yet 3561 * because the exported cap still exist. 3562 */ 3563 3564 issued = cap->issued; 3565 if (issued != cap->implemented) 3566 pr_err_ratelimited("handle_cap_export: issued != implemented: " 3567 "ino (%llx.%llx) mds%d seq %d mseq %d " 3568 "issued %s implemented %s\n", 3569 ceph_vinop(inode), mds, cap->seq, cap->mseq, 3570 ceph_cap_string(issued), 3571 ceph_cap_string(cap->implemented)); 3572 3573 3574 tcap = __get_cap_for_mds(ci, target); 3575 if (tcap) { 3576 /* already have caps from the target */ 3577 if (tcap->cap_id == t_cap_id && 3578 ceph_seq_cmp(tcap->seq, t_seq) < 0) { 3579 dout(" updating import cap %p mds%d\n", tcap, target); 3580 tcap->cap_id = t_cap_id; 3581 tcap->seq = t_seq - 1; 3582 tcap->issue_seq = t_seq - 1; 3583 tcap->mseq = t_mseq; 3584 tcap->issued |= issued; 3585 tcap->implemented |= issued; 3586 if (cap == ci->i_auth_cap) 3587 ci->i_auth_cap = tcap; 3588 3589 if (!list_empty(&ci->i_cap_flush_list) && 3590 ci->i_auth_cap == tcap) { 3591 spin_lock(&mdsc->cap_dirty_lock); 3592 list_move_tail(&ci->i_flushing_item, 3593 &tcap->session->s_cap_flushing); 3594 spin_unlock(&mdsc->cap_dirty_lock); 3595 } 3596 } 3597 __ceph_remove_cap(cap, false); 3598 goto out_unlock; 3599 } else if (tsession) { 3600 /* add placeholder for the export tagert */ 3601 int flag = (cap == ci->i_auth_cap) ? 
CEPH_CAP_FLAG_AUTH : 0; 3602 tcap = new_cap; 3603 ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0, 3604 t_seq - 1, t_mseq, (u64)-1, flag, &new_cap); 3605 3606 if (!list_empty(&ci->i_cap_flush_list) && 3607 ci->i_auth_cap == tcap) { 3608 spin_lock(&mdsc->cap_dirty_lock); 3609 list_move_tail(&ci->i_flushing_item, 3610 &tcap->session->s_cap_flushing); 3611 spin_unlock(&mdsc->cap_dirty_lock); 3612 } 3613 3614 __ceph_remove_cap(cap, false); 3615 goto out_unlock; 3616 } 3617 3618 spin_unlock(&ci->i_ceph_lock); 3619 mutex_unlock(&session->s_mutex); 3620 3621 /* open target session */ 3622 tsession = ceph_mdsc_open_export_target_session(mdsc, target); 3623 if (!IS_ERR(tsession)) { 3624 if (mds > target) { 3625 mutex_lock(&session->s_mutex); 3626 mutex_lock_nested(&tsession->s_mutex, 3627 SINGLE_DEPTH_NESTING); 3628 } else { 3629 mutex_lock(&tsession->s_mutex); 3630 mutex_lock_nested(&session->s_mutex, 3631 SINGLE_DEPTH_NESTING); 3632 } 3633 new_cap = ceph_get_cap(mdsc, NULL); 3634 } else { 3635 WARN_ON(1); 3636 tsession = NULL; 3637 target = -1; 3638 } 3639 goto retry; 3640 3641 out_unlock: 3642 spin_unlock(&ci->i_ceph_lock); 3643 mutex_unlock(&session->s_mutex); 3644 if (tsession) { 3645 mutex_unlock(&tsession->s_mutex); 3646 ceph_put_mds_session(tsession); 3647 } 3648 if (new_cap) 3649 ceph_put_cap(mdsc, new_cap); 3650 } 3651 3652 /* 3653 * Handle cap IMPORT. 3654 * 3655 * caller holds s_mutex. acquires i_ceph_lock 3656 */ 3657 static void handle_cap_import(struct ceph_mds_client *mdsc, 3658 struct inode *inode, struct ceph_mds_caps *im, 3659 struct ceph_mds_cap_peer *ph, 3660 struct ceph_mds_session *session, 3661 struct ceph_cap **target_cap, int *old_issued) 3662 __acquires(ci->i_ceph_lock) 3663 { 3664 struct ceph_inode_info *ci = ceph_inode(inode); 3665 struct ceph_cap *cap, *ocap, *new_cap = NULL; 3666 int mds = session->s_mds; 3667 int issued; 3668 unsigned caps = le32_to_cpu(im->caps); 3669 unsigned wanted = le32_to_cpu(im->wanted); 3670 unsigned seq = le32_to_cpu(im->seq); 3671 unsigned mseq = le32_to_cpu(im->migrate_seq); 3672 u64 realmino = le64_to_cpu(im->realm); 3673 u64 cap_id = le64_to_cpu(im->cap_id); 3674 u64 p_cap_id; 3675 int peer; 3676 3677 if (ph) { 3678 p_cap_id = le64_to_cpu(ph->cap_id); 3679 peer = le32_to_cpu(ph->mds); 3680 } else { 3681 p_cap_id = 0; 3682 peer = -1; 3683 } 3684 3685 dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n", 3686 inode, ci, mds, mseq, peer); 3687 3688 retry: 3689 spin_lock(&ci->i_ceph_lock); 3690 cap = __get_cap_for_mds(ci, mds); 3691 if (!cap) { 3692 if (!new_cap) { 3693 spin_unlock(&ci->i_ceph_lock); 3694 new_cap = ceph_get_cap(mdsc, NULL); 3695 goto retry; 3696 } 3697 cap = new_cap; 3698 } else { 3699 if (new_cap) { 3700 ceph_put_cap(mdsc, new_cap); 3701 new_cap = NULL; 3702 } 3703 } 3704 3705 __ceph_caps_issued(ci, &issued); 3706 issued |= __ceph_caps_dirty(ci); 3707 3708 ceph_add_cap(inode, session, cap_id, -1, caps, wanted, seq, mseq, 3709 realmino, CEPH_CAP_FLAG_AUTH, &new_cap); 3710 3711 ocap = peer >= 0 ? 
__get_cap_for_mds(ci, peer) : NULL; 3712 if (ocap && ocap->cap_id == p_cap_id) { 3713 dout(" remove export cap %p mds%d flags %d\n", 3714 ocap, peer, ph->flags); 3715 if ((ph->flags & CEPH_CAP_FLAG_AUTH) && 3716 (ocap->seq != le32_to_cpu(ph->seq) || 3717 ocap->mseq != le32_to_cpu(ph->mseq))) { 3718 pr_err_ratelimited("handle_cap_import: " 3719 "mismatched seq/mseq: ino (%llx.%llx) " 3720 "mds%d seq %d mseq %d importer mds%d " 3721 "has peer seq %d mseq %d\n", 3722 ceph_vinop(inode), peer, ocap->seq, 3723 ocap->mseq, mds, le32_to_cpu(ph->seq), 3724 le32_to_cpu(ph->mseq)); 3725 } 3726 __ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE)); 3727 } 3728 3729 /* make sure we re-request max_size, if necessary */ 3730 ci->i_requested_max_size = 0; 3731 3732 *old_issued = issued; 3733 *target_cap = cap; 3734 } 3735 3736 /* 3737 * Handle a caps message from the MDS. 3738 * 3739 * Identify the appropriate session, inode, and call the right handler 3740 * based on the cap op. 3741 */ 3742 void ceph_handle_caps(struct ceph_mds_session *session, 3743 struct ceph_msg *msg) 3744 { 3745 struct ceph_mds_client *mdsc = session->s_mdsc; 3746 struct inode *inode; 3747 struct ceph_inode_info *ci; 3748 struct ceph_cap *cap; 3749 struct ceph_mds_caps *h; 3750 struct ceph_mds_cap_peer *peer = NULL; 3751 struct ceph_snap_realm *realm = NULL; 3752 int op; 3753 int msg_version = le16_to_cpu(msg->hdr.version); 3754 u32 seq, mseq; 3755 struct ceph_vino vino; 3756 void *snaptrace; 3757 size_t snaptrace_len; 3758 void *p, *end; 3759 struct cap_extra_info extra_info = {}; 3760 3761 dout("handle_caps from mds%d\n", session->s_mds); 3762 3763 /* decode */ 3764 end = msg->front.iov_base + msg->front.iov_len; 3765 if (msg->front.iov_len < sizeof(*h)) 3766 goto bad; 3767 h = msg->front.iov_base; 3768 op = le32_to_cpu(h->op); 3769 vino.ino = le64_to_cpu(h->ino); 3770 vino.snap = CEPH_NOSNAP; 3771 seq = le32_to_cpu(h->seq); 3772 mseq = le32_to_cpu(h->migrate_seq); 3773 3774 snaptrace = h + 1; 3775 snaptrace_len = le32_to_cpu(h->snap_trace_len); 3776 p = snaptrace + snaptrace_len; 3777 3778 if (msg_version >= 2) { 3779 u32 flock_len; 3780 ceph_decode_32_safe(&p, end, flock_len, bad); 3781 if (p + flock_len > end) 3782 goto bad; 3783 p += flock_len; 3784 } 3785 3786 if (msg_version >= 3) { 3787 if (op == CEPH_CAP_OP_IMPORT) { 3788 if (p + sizeof(*peer) > end) 3789 goto bad; 3790 peer = p; 3791 p += sizeof(*peer); 3792 } else if (op == CEPH_CAP_OP_EXPORT) { 3793 /* recorded in unused fields */ 3794 peer = (void *)&h->size; 3795 } 3796 } 3797 3798 if (msg_version >= 4) { 3799 ceph_decode_64_safe(&p, end, extra_info.inline_version, bad); 3800 ceph_decode_32_safe(&p, end, extra_info.inline_len, bad); 3801 if (p + extra_info.inline_len > end) 3802 goto bad; 3803 extra_info.inline_data = p; 3804 p += extra_info.inline_len; 3805 } 3806 3807 if (msg_version >= 5) { 3808 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 3809 u32 epoch_barrier; 3810 3811 ceph_decode_32_safe(&p, end, epoch_barrier, bad); 3812 ceph_osdc_update_epoch_barrier(osdc, epoch_barrier); 3813 } 3814 3815 if (msg_version >= 8) { 3816 u64 flush_tid; 3817 u32 caller_uid, caller_gid; 3818 u32 pool_ns_len; 3819 3820 /* version >= 6 */ 3821 ceph_decode_64_safe(&p, end, flush_tid, bad); 3822 /* version >= 7 */ 3823 ceph_decode_32_safe(&p, end, caller_uid, bad); 3824 ceph_decode_32_safe(&p, end, caller_gid, bad); 3825 /* version >= 8 */ 3826 ceph_decode_32_safe(&p, end, pool_ns_len, bad); 3827 if (pool_ns_len > 0) { 3828 ceph_decode_need(&p, end, pool_ns_len, 
bad); 3829 extra_info.pool_ns = 3830 ceph_find_or_create_string(p, pool_ns_len); 3831 p += pool_ns_len; 3832 } 3833 } 3834 3835 if (msg_version >= 11) { 3836 struct ceph_timespec *btime; 3837 u64 change_attr; 3838 u32 flags; 3839 3840 /* version >= 9 */ 3841 if (p + sizeof(*btime) > end) 3842 goto bad; 3843 btime = p; 3844 p += sizeof(*btime); 3845 ceph_decode_64_safe(&p, end, change_attr, bad); 3846 /* version >= 10 */ 3847 ceph_decode_32_safe(&p, end, flags, bad); 3848 /* version >= 11 */ 3849 extra_info.dirstat_valid = true; 3850 ceph_decode_64_safe(&p, end, extra_info.nfiles, bad); 3851 ceph_decode_64_safe(&p, end, extra_info.nsubdirs, bad); 3852 } 3853 3854 /* lookup ino */ 3855 inode = ceph_find_inode(mdsc->fsc->sb, vino); 3856 ci = ceph_inode(inode); 3857 dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino, 3858 vino.snap, inode); 3859 3860 mutex_lock(&session->s_mutex); 3861 session->s_seq++; 3862 dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, 3863 (unsigned)seq); 3864 3865 if (!inode) { 3866 dout(" i don't have ino %llx\n", vino.ino); 3867 3868 if (op == CEPH_CAP_OP_IMPORT) { 3869 cap = ceph_get_cap(mdsc, NULL); 3870 cap->cap_ino = vino.ino; 3871 cap->queue_release = 1; 3872 cap->cap_id = le64_to_cpu(h->cap_id); 3873 cap->mseq = mseq; 3874 cap->seq = seq; 3875 cap->issue_seq = seq; 3876 spin_lock(&session->s_cap_lock); 3877 list_add_tail(&cap->session_caps, 3878 &session->s_cap_releases); 3879 session->s_num_cap_releases++; 3880 spin_unlock(&session->s_cap_lock); 3881 } 3882 goto flush_cap_releases; 3883 } 3884 3885 /* these will work even if we don't have a cap yet */ 3886 switch (op) { 3887 case CEPH_CAP_OP_FLUSHSNAP_ACK: 3888 handle_cap_flushsnap_ack(inode, le64_to_cpu(msg->hdr.tid), 3889 h, session); 3890 goto done; 3891 3892 case CEPH_CAP_OP_EXPORT: 3893 handle_cap_export(inode, h, peer, session); 3894 goto done_unlocked; 3895 3896 case CEPH_CAP_OP_IMPORT: 3897 realm = NULL; 3898 if (snaptrace_len) { 3899 down_write(&mdsc->snap_rwsem); 3900 ceph_update_snap_trace(mdsc, snaptrace, 3901 snaptrace + snaptrace_len, 3902 false, &realm); 3903 downgrade_write(&mdsc->snap_rwsem); 3904 } else { 3905 down_read(&mdsc->snap_rwsem); 3906 } 3907 handle_cap_import(mdsc, inode, h, peer, session, 3908 &cap, &extra_info.issued); 3909 handle_cap_grant(inode, session, cap, 3910 h, msg->middle, &extra_info); 3911 if (realm) 3912 ceph_put_snap_realm(mdsc, realm); 3913 goto done_unlocked; 3914 } 3915 3916 /* the rest require a cap */ 3917 spin_lock(&ci->i_ceph_lock); 3918 cap = __get_cap_for_mds(ceph_inode(inode), session->s_mds); 3919 if (!cap) { 3920 dout(" no cap on %p ino %llx.%llx from mds%d\n", 3921 inode, ceph_ino(inode), ceph_snap(inode), 3922 session->s_mds); 3923 spin_unlock(&ci->i_ceph_lock); 3924 goto flush_cap_releases; 3925 } 3926 3927 /* note that each of these drops i_ceph_lock for us */ 3928 switch (op) { 3929 case CEPH_CAP_OP_REVOKE: 3930 case CEPH_CAP_OP_GRANT: 3931 __ceph_caps_issued(ci, &extra_info.issued); 3932 extra_info.issued |= __ceph_caps_dirty(ci); 3933 handle_cap_grant(inode, session, cap, 3934 h, msg->middle, &extra_info); 3935 goto done_unlocked; 3936 3937 case CEPH_CAP_OP_FLUSH_ACK: 3938 handle_cap_flush_ack(inode, le64_to_cpu(msg->hdr.tid), 3939 h, session, cap); 3940 break; 3941 3942 case CEPH_CAP_OP_TRUNC: 3943 handle_cap_trunc(inode, h, session); 3944 break; 3945 3946 default: 3947 spin_unlock(&ci->i_ceph_lock); 3948 pr_err("ceph_handle_caps: unknown cap op %d %s\n", op, 3949 ceph_cap_op_name(op)); 3950 } 3951 3952 goto 
	if (!inode) {
		dout(" i don't have ino %llx\n", vino.ino);

		if (op == CEPH_CAP_OP_IMPORT) {
			cap = ceph_get_cap(mdsc, NULL);
			cap->cap_ino = vino.ino;
			cap->queue_release = 1;
			cap->cap_id = le64_to_cpu(h->cap_id);
			cap->mseq = mseq;
			cap->seq = seq;
			cap->issue_seq = seq;
			spin_lock(&session->s_cap_lock);
			list_add_tail(&cap->session_caps,
				      &session->s_cap_releases);
			session->s_num_cap_releases++;
			spin_unlock(&session->s_cap_lock);
		}
		goto flush_cap_releases;
	}

	/* these will work even if we don't have a cap yet */
	switch (op) {
	case CEPH_CAP_OP_FLUSHSNAP_ACK:
		handle_cap_flushsnap_ack(inode, le64_to_cpu(msg->hdr.tid),
					 h, session);
		goto done;

	case CEPH_CAP_OP_EXPORT:
		handle_cap_export(inode, h, peer, session);
		goto done_unlocked;

	case CEPH_CAP_OP_IMPORT:
		realm = NULL;
		if (snaptrace_len) {
			down_write(&mdsc->snap_rwsem);
			ceph_update_snap_trace(mdsc, snaptrace,
					       snaptrace + snaptrace_len,
					       false, &realm);
			downgrade_write(&mdsc->snap_rwsem);
		} else {
			down_read(&mdsc->snap_rwsem);
		}
		handle_cap_import(mdsc, inode, h, peer, session,
				  &cap, &extra_info.issued);
		handle_cap_grant(inode, session, cap,
				 h, msg->middle, &extra_info);
		if (realm)
			ceph_put_snap_realm(mdsc, realm);
		goto done_unlocked;
	}

	/* the rest require a cap */
	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ceph_inode(inode), session->s_mds);
	if (!cap) {
		dout(" no cap on %p ino %llx.%llx from mds%d\n",
		     inode, ceph_ino(inode), ceph_snap(inode),
		     session->s_mds);
		spin_unlock(&ci->i_ceph_lock);
		goto flush_cap_releases;
	}

	/* note that each of these drops i_ceph_lock for us */
	switch (op) {
	case CEPH_CAP_OP_REVOKE:
	case CEPH_CAP_OP_GRANT:
		__ceph_caps_issued(ci, &extra_info.issued);
		extra_info.issued |= __ceph_caps_dirty(ci);
		handle_cap_grant(inode, session, cap,
				 h, msg->middle, &extra_info);
		goto done_unlocked;

	case CEPH_CAP_OP_FLUSH_ACK:
		handle_cap_flush_ack(inode, le64_to_cpu(msg->hdr.tid),
				     h, session, cap);
		break;

	case CEPH_CAP_OP_TRUNC:
		handle_cap_trunc(inode, h, session);
		break;

	default:
		spin_unlock(&ci->i_ceph_lock);
		pr_err("ceph_handle_caps: unknown cap op %d %s\n", op,
		       ceph_cap_op_name(op));
	}

	goto done;

flush_cap_releases:
	/*
	 * send any cap release message to try to move things
	 * along for the mds (who clearly thinks we still have this
	 * cap).
	 */
	ceph_send_cap_releases(mdsc, session);

done:
	mutex_unlock(&session->s_mutex);
done_unlocked:
	iput(inode);
	ceph_put_string(extra_info.pool_ns);
	return;

bad:
	pr_err("ceph_handle_caps: corrupt message\n");
	ceph_msg_dump(msg);
	return;
}

/*
 * Delayed work handler to process the end of the delayed cap release
 * LRU list.
 */
void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	int flags = CHECK_CAPS_NODELAY;

	dout("check_delayed_caps\n");
	while (1) {
		spin_lock(&mdsc->cap_delay_lock);
		if (list_empty(&mdsc->cap_delay_list))
			break;
		ci = list_first_entry(&mdsc->cap_delay_list,
				      struct ceph_inode_info,
				      i_cap_delay_list);
		if ((ci->i_ceph_flags & CEPH_I_FLUSH) == 0 &&
		    time_before(jiffies, ci->i_hold_caps_max))
			break;
		list_del_init(&ci->i_cap_delay_list);

		inode = igrab(&ci->vfs_inode);
		spin_unlock(&mdsc->cap_delay_lock);

		if (inode) {
			dout("check_delayed_caps on %p\n", inode);
			ceph_check_caps(ci, flags, NULL);
			iput(inode);
		}
	}
	spin_unlock(&mdsc->cap_delay_lock);
}

/*
 * Flush all dirty caps back to the MDS.
 */
void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
{
	struct ceph_inode_info *ci;
	struct inode *inode;

	dout("flush_dirty_caps\n");
	spin_lock(&mdsc->cap_dirty_lock);
	while (!list_empty(&mdsc->cap_dirty)) {
		ci = list_first_entry(&mdsc->cap_dirty, struct ceph_inode_info,
				      i_dirty_item);
		inode = &ci->vfs_inode;
		ihold(inode);
		dout("flush_dirty_caps %p\n", inode);
		spin_unlock(&mdsc->cap_dirty_lock);
		ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_FLUSH, NULL);
		iput(inode);
		spin_lock(&mdsc->cap_dirty_lock);
	}
	spin_unlock(&mdsc->cap_dirty_lock);
	dout("flush_dirty_caps done\n");
}

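/*
 * Take open-file mode references on an inode.  Each open is counted
 * once in slot 0 of i_nr_by_mode[] (the PIN count) and once for every
 * mode bit set in @fmode, so the array tracks how many open files
 * currently need each class of file capability.  ceph_put_fmode()
 * below drops the matching references.
 */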
void __ceph_get_fmode(struct ceph_inode_info *ci, int fmode)
{
	int i;
	int bits = (fmode << 1) | 1;
	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
		if (bits & (1 << i))
			ci->i_nr_by_mode[i]++;
	}
}

/*
 * Drop an open-file reference.  If this was the last open reference
 * for the inode, we may need to release capabilities to the MDS (or
 * schedule their delayed release).
 */
void ceph_put_fmode(struct ceph_inode_info *ci, int fmode)
{
	int i, last = 0;
	int bits = (fmode << 1) | 1;
	spin_lock(&ci->i_ceph_lock);
	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
		if (bits & (1 << i)) {
			BUG_ON(ci->i_nr_by_mode[i] == 0);
			if (--ci->i_nr_by_mode[i] == 0)
				last++;
		}
	}
	dout("put_fmode %p fmode %d {%d,%d,%d,%d}\n",
	     &ci->vfs_inode, fmode,
	     ci->i_nr_by_mode[0], ci->i_nr_by_mode[1],
	     ci->i_nr_by_mode[2], ci->i_nr_by_mode[3]);
	spin_unlock(&ci->i_ceph_lock);

	if (last && ci->i_vino.snap == CEPH_NOSNAP)
		ceph_check_caps(ci, 0, NULL);
}

/*
 * For a soon-to-be unlinked file, drop the LINK caps.  If it looks
 * like the link count will hit 0, drop any other caps (other than
 * PIN) we don't specifically want (due to the file still being open).
 */
int ceph_drop_caps_for_unlink(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;

	spin_lock(&ci->i_ceph_lock);
	if (inode->i_nlink == 1) {
		drop |= ~(__ceph_caps_wanted(ci) | CEPH_CAP_PIN);

		ci->i_ceph_flags |= CEPH_I_NODELAY;
		if (__ceph_caps_dirty(ci)) {
			struct ceph_mds_client *mdsc =
				ceph_inode_to_client(inode)->mdsc;
			__cap_delay_requeue_front(mdsc, ci);
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	return drop;
}

/*
 * Helpers for embedding cap and dentry lease releases into mds
 * requests.
 *
 * @force is used by dentry_release (below) to force inclusion of a
 * record for the directory inode, even when there aren't any caps to
 * drop.
 */
int ceph_encode_inode_release(void **p, struct inode *inode,
			      int mds, int drop, int unless, int force)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_cap *cap;
	struct ceph_mds_request_release *rel = *p;
	int used, dirty;
	int ret = 0;

	spin_lock(&ci->i_ceph_lock);
	used = __ceph_caps_used(ci);
	dirty = __ceph_caps_dirty(ci);

	dout("encode_inode_release %p mds%d used|dirty %s drop %s unless %s\n",
	     inode, mds, ceph_cap_string(used|dirty), ceph_cap_string(drop),
	     ceph_cap_string(unless));

	/* only drop unused, clean caps */
	drop &= ~(used | dirty);

	cap = __get_cap_for_mds(ci, mds);
	if (cap && __cap_is_valid(cap)) {
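		/*
		 * Restrict 'unless' to caps we actually hold; for each
		 * EXCL cap named there that we still have, keep the
		 * matching SHARED cap out of the drop mask rather than
		 * releasing it back to the MDS.
		 */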
		unless &= cap->issued;
		if (unless) {
			if (unless & CEPH_CAP_AUTH_EXCL)
				drop &= ~CEPH_CAP_AUTH_SHARED;
			if (unless & CEPH_CAP_LINK_EXCL)
				drop &= ~CEPH_CAP_LINK_SHARED;
			if (unless & CEPH_CAP_XATTR_EXCL)
				drop &= ~CEPH_CAP_XATTR_SHARED;
			if (unless & CEPH_CAP_FILE_EXCL)
				drop &= ~CEPH_CAP_FILE_SHARED;
		}

		if (force || (cap->issued & drop)) {
			if (cap->issued & drop) {
				int wanted = __ceph_caps_wanted(ci);
				if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0)
					wanted |= cap->mds_wanted;
				dout("encode_inode_release %p cap %p "
				     "%s -> %s, wanted %s -> %s\n", inode, cap,
				     ceph_cap_string(cap->issued),
				     ceph_cap_string(cap->issued & ~drop),
				     ceph_cap_string(cap->mds_wanted),
				     ceph_cap_string(wanted));

				cap->issued &= ~drop;
				cap->implemented &= ~drop;
				cap->mds_wanted = wanted;
			} else {
				dout("encode_inode_release %p cap %p %s"
				     " (force)\n", inode, cap,
				     ceph_cap_string(cap->issued));
			}

			rel->ino = cpu_to_le64(ceph_ino(inode));
			rel->cap_id = cpu_to_le64(cap->cap_id);
			rel->seq = cpu_to_le32(cap->seq);
			rel->issue_seq = cpu_to_le32(cap->issue_seq);
			rel->mseq = cpu_to_le32(cap->mseq);
			rel->caps = cpu_to_le32(cap->implemented);
			rel->wanted = cpu_to_le32(cap->mds_wanted);
			rel->dname_len = 0;
			rel->dname_seq = 0;
			*p += sizeof(*rel);
			ret = 1;
		} else {
			dout("encode_inode_release %p cap %p %s (noop)\n",
			     inode, cap, ceph_cap_string(cap->issued));
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	return ret;
}

int ceph_encode_dentry_release(void **p, struct dentry *dentry,
			       struct inode *dir,
			       int mds, int drop, int unless)
{
	struct dentry *parent = NULL;
	struct ceph_mds_request_release *rel = *p;
	struct ceph_dentry_info *di = ceph_dentry(dentry);
	int force = 0;
	int ret;

	/*
	 * force a record for the directory caps if we have a dentry lease.
	 * this is racy (can't take i_ceph_lock and d_lock together), but it
	 * doesn't have to be perfect; the mds will revoke anything we don't
	 * release.
	 */
	spin_lock(&dentry->d_lock);
	if (di->lease_session && di->lease_session->s_mds == mds)
		force = 1;
	if (!dir) {
		parent = dget(dentry->d_parent);
		dir = d_inode(parent);
	}
	spin_unlock(&dentry->d_lock);

	ret = ceph_encode_inode_release(p, dir, mds, drop, unless, force);
	dput(parent);

	spin_lock(&dentry->d_lock);
	if (ret && di->lease_session && di->lease_session->s_mds == mds) {
		dout("encode_dentry_release %p mds%d seq %d\n",
		     dentry, mds, (int)di->lease_seq);
		rel->dname_len = cpu_to_le32(dentry->d_name.len);
		memcpy(*p, dentry->d_name.name, dentry->d_name.len);
		*p += dentry->d_name.len;
		rel->dname_seq = cpu_to_le32(di->lease_seq);
		__ceph_mdsc_drop_dentry_lease(dentry);
	}
	spin_unlock(&dentry->d_lock);
	return ret;
}