// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/kernel.h>
#include <linux/sched/signal.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/wait.h>
#include <linux/writeback.h>

#include "super.h"
#include "mds_client.h"
#include "cache.h"
#include <linux/ceph/decode.h>
#include <linux/ceph/messenger.h>

/*
 * Capability management
 *
 * The Ceph metadata servers control client access to inode metadata
 * and file data by issuing capabilities, granting clients permission
 * to read and/or write both inode fields and file data to OSDs
 * (storage nodes).  Each capability consists of a set of bits
 * indicating which operations are allowed.
 *
 * If the client holds a *_SHARED cap, the client has a coherent value
 * that can be safely read from the cached inode.
 *
 * In the case of *_EXCL (exclusive) or FILE_WR capabilities, the
 * client is allowed to change inode attributes (e.g., file size,
 * mtime), note its dirty state in the ceph_cap, and asynchronously
 * flush that metadata change to the MDS.
 *
 * In the event of a conflicting operation (perhaps by another
 * client), the MDS will revoke the conflicting client capabilities.
 *
 * In order for a client to cache an inode, it must hold a capability
 * with at least one MDS server.  When inodes are released, release
 * notifications are batched and periodically sent en masse to the MDS
 * cluster to release server state.
 */

static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc);
static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *session,
				 struct ceph_inode_info *ci,
				 u64 oldest_flush_tid);

/*
 * Generate readable cap strings for debugging output.
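 *
 * For example, a cap carrying PIN, AUTH_SHARED, FILE_SHARED, FILE_RD
 * and FILE_CACHE renders as "pAsFscr" (see gcap_string() below for
 * the per-bit letters).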
 */
#define MAX_CAP_STR 20
static char cap_str[MAX_CAP_STR][40];
static DEFINE_SPINLOCK(cap_str_lock);
static int last_cap_str;

static char *gcap_string(char *s, int c)
{
	if (c & CEPH_CAP_GSHARED)
		*s++ = 's';
	if (c & CEPH_CAP_GEXCL)
		*s++ = 'x';
	if (c & CEPH_CAP_GCACHE)
		*s++ = 'c';
	if (c & CEPH_CAP_GRD)
		*s++ = 'r';
	if (c & CEPH_CAP_GWR)
		*s++ = 'w';
	if (c & CEPH_CAP_GBUFFER)
		*s++ = 'b';
	if (c & CEPH_CAP_GLAZYIO)
		*s++ = 'l';
	return s;
}

const char *ceph_cap_string(int caps)
{
	int i;
	char *s;
	int c;

	spin_lock(&cap_str_lock);
	i = last_cap_str++;
	if (last_cap_str == MAX_CAP_STR)
		last_cap_str = 0;
	spin_unlock(&cap_str_lock);

	s = cap_str[i];

	if (caps & CEPH_CAP_PIN)
		*s++ = 'p';

	c = (caps >> CEPH_CAP_SAUTH) & 3;
	if (c) {
		*s++ = 'A';
		s = gcap_string(s, c);
	}

	c = (caps >> CEPH_CAP_SLINK) & 3;
	if (c) {
		*s++ = 'L';
		s = gcap_string(s, c);
	}

	c = (caps >> CEPH_CAP_SXATTR) & 3;
	if (c) {
		*s++ = 'X';
		s = gcap_string(s, c);
	}

	c = caps >> CEPH_CAP_SFILE;
	if (c) {
		*s++ = 'F';
		s = gcap_string(s, c);
	}

	if (s == cap_str[i])
		*s++ = '-';
	*s = 0;
	return cap_str[i];
}

void ceph_caps_init(struct ceph_mds_client *mdsc)
{
	INIT_LIST_HEAD(&mdsc->caps_list);
	spin_lock_init(&mdsc->caps_list_lock);
}

void ceph_caps_finalize(struct ceph_mds_client *mdsc)
{
	struct ceph_cap *cap;

	spin_lock(&mdsc->caps_list_lock);
	while (!list_empty(&mdsc->caps_list)) {
		cap = list_first_entry(&mdsc->caps_list,
				       struct ceph_cap, caps_item);
		list_del(&cap->caps_item);
		kmem_cache_free(ceph_cap_cachep, cap);
	}
	mdsc->caps_total_count = 0;
	mdsc->caps_avail_count = 0;
	mdsc->caps_use_count = 0;
	mdsc->caps_reserve_count = 0;
	mdsc->caps_min_count = 0;
	spin_unlock(&mdsc->caps_list_lock);
}

void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
{
	spin_lock(&mdsc->caps_list_lock);
	mdsc->caps_min_count += delta;
	BUG_ON(mdsc->caps_min_count < 0);
	spin_unlock(&mdsc->caps_list_lock);
}

/*
 * Called under mdsc->mutex.
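 *
 * Preallocates enough cap structs for the given reservation context so
 * that a later ceph_get_cap() against this context can take one from
 * the caps_list without allocating.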
 */
int ceph_reserve_caps(struct ceph_mds_client *mdsc,
		      struct ceph_cap_reservation *ctx, int need)
{
	int i, j;
	struct ceph_cap *cap;
	int have;
	int alloc = 0;
	int max_caps;
	bool trimmed = false;
	struct ceph_mds_session *s;
	LIST_HEAD(newcaps);

	dout("reserve caps ctx=%p need=%d\n", ctx, need);

	/* first reserve any caps that are already allocated */
	spin_lock(&mdsc->caps_list_lock);
	if (mdsc->caps_avail_count >= need)
		have = need;
	else
		have = mdsc->caps_avail_count;
	mdsc->caps_avail_count -= have;
	mdsc->caps_reserve_count += have;
	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
					 mdsc->caps_reserve_count +
					 mdsc->caps_avail_count);
	spin_unlock(&mdsc->caps_list_lock);

	for (i = have; i < need; ) {
		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
		if (cap) {
			list_add(&cap->caps_item, &newcaps);
			alloc++;
			i++;
			continue;
		}

		if (!trimmed) {
			for (j = 0; j < mdsc->max_sessions; j++) {
				s = __ceph_lookup_mds_session(mdsc, j);
				if (!s)
					continue;
				mutex_unlock(&mdsc->mutex);

				mutex_lock(&s->s_mutex);
				max_caps = s->s_nr_caps - (need - i);
				ceph_trim_caps(mdsc, s, max_caps);
				mutex_unlock(&s->s_mutex);

				ceph_put_mds_session(s);
				mutex_lock(&mdsc->mutex);
			}
			trimmed = true;

			spin_lock(&mdsc->caps_list_lock);
			if (mdsc->caps_avail_count) {
				int more_have;
				if (mdsc->caps_avail_count >= need - i)
					more_have = need - i;
				else
					more_have = mdsc->caps_avail_count;

				i += more_have;
				have += more_have;
				mdsc->caps_avail_count -= more_have;
				mdsc->caps_reserve_count += more_have;
			}
			spin_unlock(&mdsc->caps_list_lock);

			continue;
		}

		pr_warn("reserve caps ctx=%p ENOMEM need=%d got=%d\n",
			ctx, need, have + alloc);
		goto out_nomem;
	}
	BUG_ON(have + alloc != need);

	spin_lock(&mdsc->caps_list_lock);
	mdsc->caps_total_count += alloc;
	mdsc->caps_reserve_count += alloc;
	list_splice(&newcaps, &mdsc->caps_list);

	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
					 mdsc->caps_reserve_count +
					 mdsc->caps_avail_count);
	spin_unlock(&mdsc->caps_list_lock);

	ctx->count = need;
	dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
	     ctx, mdsc->caps_total_count, mdsc->caps_use_count,
	     mdsc->caps_reserve_count, mdsc->caps_avail_count);
	return 0;

out_nomem:

	spin_lock(&mdsc->caps_list_lock);
	mdsc->caps_avail_count += have;
	mdsc->caps_reserve_count -= have;

	while (!list_empty(&newcaps)) {
		cap = list_first_entry(&newcaps,
				struct ceph_cap, caps_item);
		list_del(&cap->caps_item);

		/* Keep some preallocated caps around (ceph_min_count), to
		 * avoid lots of free/alloc churn.
		 */
		if (mdsc->caps_avail_count >=
		    mdsc->caps_reserve_count + mdsc->caps_min_count) {
			kmem_cache_free(ceph_cap_cachep, cap);
		} else {
			mdsc->caps_avail_count++;
			mdsc->caps_total_count++;
			list_add(&cap->caps_item, &mdsc->caps_list);
		}
	}

	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
					 mdsc->caps_reserve_count +
					 mdsc->caps_avail_count);
	spin_unlock(&mdsc->caps_list_lock);
	return -ENOMEM;
}

int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
			struct ceph_cap_reservation *ctx)
{
	int i;
	struct ceph_cap *cap;

	dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
	if (ctx->count) {
		spin_lock(&mdsc->caps_list_lock);
		BUG_ON(mdsc->caps_reserve_count < ctx->count);
		mdsc->caps_reserve_count -= ctx->count;
		if (mdsc->caps_avail_count >=
		    mdsc->caps_reserve_count + mdsc->caps_min_count) {
			mdsc->caps_total_count -= ctx->count;
			for (i = 0; i < ctx->count; i++) {
				cap = list_first_entry(&mdsc->caps_list,
					struct ceph_cap, caps_item);
				list_del(&cap->caps_item);
				kmem_cache_free(ceph_cap_cachep, cap);
			}
		} else {
			mdsc->caps_avail_count += ctx->count;
		}
		ctx->count = 0;
		dout("unreserve caps %d = %d used + %d resv + %d avail\n",
		     mdsc->caps_total_count, mdsc->caps_use_count,
		     mdsc->caps_reserve_count, mdsc->caps_avail_count);
		BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
						 mdsc->caps_reserve_count +
						 mdsc->caps_avail_count);
		spin_unlock(&mdsc->caps_list_lock);
	}
	return 0;
}

struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
			      struct ceph_cap_reservation *ctx)
{
	struct ceph_cap *cap = NULL;

	/* temporary, until we do something about cap import/export */
	if (!ctx) {
		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
		if (cap) {
			spin_lock(&mdsc->caps_list_lock);
			mdsc->caps_use_count++;
			mdsc->caps_total_count++;
			spin_unlock(&mdsc->caps_list_lock);
		} else {
			spin_lock(&mdsc->caps_list_lock);
			if (mdsc->caps_avail_count) {
				BUG_ON(list_empty(&mdsc->caps_list));

				mdsc->caps_avail_count--;
				mdsc->caps_use_count++;
				cap = list_first_entry(&mdsc->caps_list,
						struct ceph_cap, caps_item);
				list_del(&cap->caps_item);

				BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
				       mdsc->caps_reserve_count + mdsc->caps_avail_count);
			}
			spin_unlock(&mdsc->caps_list_lock);
		}

		return cap;
	}

	spin_lock(&mdsc->caps_list_lock);
	dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
	     ctx, ctx->count, mdsc->caps_total_count, mdsc->caps_use_count,
	     mdsc->caps_reserve_count, mdsc->caps_avail_count);
	BUG_ON(!ctx->count);
	BUG_ON(ctx->count > mdsc->caps_reserve_count);
	BUG_ON(list_empty(&mdsc->caps_list));

	ctx->count--;
	mdsc->caps_reserve_count--;
	mdsc->caps_use_count++;

	cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
	list_del(&cap->caps_item);

	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
	       mdsc->caps_reserve_count + mdsc->caps_avail_count);
	spin_unlock(&mdsc->caps_list_lock);
	return cap;
}

void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap)
{
	spin_lock(&mdsc->caps_list_lock);
	dout("put_cap %p %d = %d used + %d resv + %d avail\n",
	     cap, mdsc->caps_total_count, mdsc->caps_use_count,
	     mdsc->caps_reserve_count, mdsc->caps_avail_count);
	mdsc->caps_use_count--;
	/*
	 * Keep some preallocated caps around (ceph_min_count), to
	 * avoid lots of free/alloc churn.
	 */
	if (mdsc->caps_avail_count >= mdsc->caps_reserve_count +
				      mdsc->caps_min_count) {
		mdsc->caps_total_count--;
		kmem_cache_free(ceph_cap_cachep, cap);
	} else {
		mdsc->caps_avail_count++;
		list_add(&cap->caps_item, &mdsc->caps_list);
	}

	BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
	       mdsc->caps_reserve_count + mdsc->caps_avail_count);
	spin_unlock(&mdsc->caps_list_lock);
}

void ceph_reservation_status(struct ceph_fs_client *fsc,
			     int *total, int *avail, int *used, int *reserved,
			     int *min)
{
	struct ceph_mds_client *mdsc = fsc->mdsc;

	spin_lock(&mdsc->caps_list_lock);

	if (total)
		*total = mdsc->caps_total_count;
	if (avail)
		*avail = mdsc->caps_avail_count;
	if (used)
		*used = mdsc->caps_use_count;
	if (reserved)
		*reserved = mdsc->caps_reserve_count;
	if (min)
		*min = mdsc->caps_min_count;

	spin_unlock(&mdsc->caps_list_lock);
}

/*
 * Find ceph_cap for given mds, if any.
 *
 * Called with i_ceph_lock held.
 */
static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds)
{
	struct ceph_cap *cap;
	struct rb_node *n = ci->i_caps.rb_node;

	while (n) {
		cap = rb_entry(n, struct ceph_cap, ci_node);
		if (mds < cap->mds)
			n = n->rb_left;
		else if (mds > cap->mds)
			n = n->rb_right;
		else
			return cap;
	}
	return NULL;
}

struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds)
{
	struct ceph_cap *cap;

	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	spin_unlock(&ci->i_ceph_lock);
	return cap;
}

/*
 * Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1.
 */
static int __ceph_get_cap_mds(struct ceph_inode_info *ci)
{
	struct ceph_cap *cap;
	int mds = -1;
	struct rb_node *p;

	/* prefer mds with WR|BUFFER|EXCL caps */
	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		mds = cap->mds;
		if (cap->issued & (CEPH_CAP_FILE_WR |
				   CEPH_CAP_FILE_BUFFER |
				   CEPH_CAP_FILE_EXCL))
			break;
	}
	return mds;
}

int ceph_get_cap_mds(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int mds;
	spin_lock(&ci->i_ceph_lock);
	mds = __ceph_get_cap_mds(ceph_inode(inode));
	spin_unlock(&ci->i_ceph_lock);
	return mds;
}

/*
 * Called under i_ceph_lock.
 */
static void __insert_cap_node(struct ceph_inode_info *ci,
			      struct ceph_cap *new)
{
	struct rb_node **p = &ci->i_caps.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_cap *cap = NULL;

	while (*p) {
		parent = *p;
		cap = rb_entry(parent, struct ceph_cap, ci_node);
		if (new->mds < cap->mds)
			p = &(*p)->rb_left;
		else if (new->mds > cap->mds)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->ci_node, parent, p);
	rb_insert_color(&new->ci_node, &ci->i_caps);
}

/*
 * (re)set cap hold timeouts, which control the delayed release
 * of unused caps back to the MDS.  Should be called on cap use.
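 * The min/max hold times come from the caps_wanted_delay_min/max
 * mount options.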
 */
static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
			       struct ceph_inode_info *ci)
{
	struct ceph_mount_options *ma = mdsc->fsc->mount_options;

	ci->i_hold_caps_min = round_jiffies(jiffies +
					    ma->caps_wanted_delay_min * HZ);
	ci->i_hold_caps_max = round_jiffies(jiffies +
					    ma->caps_wanted_delay_max * HZ);
	dout("__cap_set_timeouts %p min %lu max %lu\n", &ci->vfs_inode,
	     ci->i_hold_caps_min - jiffies, ci->i_hold_caps_max - jiffies);
}

/*
 * (Re)queue cap at the end of the delayed cap release list.
 *
 * If I_FLUSH is set, leave the inode at the front of the list.
 *
 * Caller holds i_ceph_lock
 *    -> we take mdsc->cap_delay_lock
 */
static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
				struct ceph_inode_info *ci)
{
	__cap_set_timeouts(mdsc, ci);
	dout("__cap_delay_requeue %p flags %d at %lu\n", &ci->vfs_inode,
	     ci->i_ceph_flags, ci->i_hold_caps_max);
	if (!mdsc->stopping) {
		spin_lock(&mdsc->cap_delay_lock);
		if (!list_empty(&ci->i_cap_delay_list)) {
			if (ci->i_ceph_flags & CEPH_I_FLUSH)
				goto no_change;
			list_del_init(&ci->i_cap_delay_list);
		}
		list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
no_change:
		spin_unlock(&mdsc->cap_delay_lock);
	}
}

/*
 * Queue an inode for immediate writeback.  Mark inode with I_FLUSH,
 * indicating we should send a cap message to flush dirty metadata
 * asap, and move to the front of the delayed cap list.
 */
static void __cap_delay_requeue_front(struct ceph_mds_client *mdsc,
				      struct ceph_inode_info *ci)
{
	dout("__cap_delay_requeue_front %p\n", &ci->vfs_inode);
	spin_lock(&mdsc->cap_delay_lock);
	ci->i_ceph_flags |= CEPH_I_FLUSH;
	if (!list_empty(&ci->i_cap_delay_list))
		list_del_init(&ci->i_cap_delay_list);
	list_add(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
	spin_unlock(&mdsc->cap_delay_lock);
}

/*
 * Cancel delayed work on cap.
 *
 * Caller must hold i_ceph_lock.
 */
static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
			       struct ceph_inode_info *ci)
{
	dout("__cap_delay_cancel %p\n", &ci->vfs_inode);
	if (list_empty(&ci->i_cap_delay_list))
		return;
	spin_lock(&mdsc->cap_delay_lock);
	list_del_init(&ci->i_cap_delay_list);
	spin_unlock(&mdsc->cap_delay_lock);
}

/*
 * Common issue checks for add_cap, handle_cap_grant.
 */
static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
			      unsigned issued)
{
	unsigned had = __ceph_caps_issued(ci, NULL);

	/*
	 * Each time we receive FILE_CACHE anew, we increment
	 * i_rdcache_gen.
	 */
	if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
	    (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) {
		ci->i_rdcache_gen++;
	}

	/*
	 * If FILE_SHARED is newly issued, mark dir not complete.  We don't
	 * know what happened to this directory while we didn't have the cap.
	 * If FILE_SHARED is being revoked, also mark dir not complete.  It
	 * stops on-going cached readdir.
	 */
	if ((issued & CEPH_CAP_FILE_SHARED) != (had & CEPH_CAP_FILE_SHARED)) {
		if (issued & CEPH_CAP_FILE_SHARED)
			atomic_inc(&ci->i_shared_gen);
		if (S_ISDIR(ci->vfs_inode.i_mode)) {
			dout(" marking %p NOT complete\n", &ci->vfs_inode);
			__ceph_dir_clear_complete(ci);
		}
	}
}

/*
 * Add a capability under the given MDS session.
 *
 * Caller should hold session snap_rwsem (read) and s_mutex.
 *
 * @fmode is the open file mode, if we are opening a file, otherwise
 * it is < 0.  (This is so we can atomically add the cap and add an
 * open file reference to it.)
 */
void ceph_add_cap(struct inode *inode,
		  struct ceph_mds_session *session, u64 cap_id,
		  int fmode, unsigned issued, unsigned wanted,
		  unsigned seq, unsigned mseq, u64 realmino, int flags,
		  struct ceph_cap **new_cap)
{
	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_cap *cap;
	int mds = session->s_mds;
	int actual_wanted;

	dout("add_cap %p mds%d cap %llx %s seq %d\n", inode,
	     session->s_mds, cap_id, ceph_cap_string(issued), seq);

	/*
	 * If we are opening the file, include file mode wanted bits
	 * in wanted.
	 */
	if (fmode >= 0)
		wanted |= ceph_caps_for_mode(fmode);

	cap = __get_cap_for_mds(ci, mds);
	if (!cap) {
		cap = *new_cap;
		*new_cap = NULL;

		cap->issued = 0;
		cap->implemented = 0;
		cap->mds = mds;
		cap->mds_wanted = 0;
		cap->mseq = 0;

		cap->ci = ci;
		__insert_cap_node(ci, cap);

		/* add to session cap list */
		cap->session = session;
		spin_lock(&session->s_cap_lock);
		list_add_tail(&cap->session_caps, &session->s_caps);
		session->s_nr_caps++;
		spin_unlock(&session->s_cap_lock);
	} else {
		/*
		 * auth mds of the inode changed. we received the cap export
		 * message, but still haven't received the cap import message.
		 * handle_cap_export() updated the new auth MDS' cap.
		 *
		 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing
		 * a message that was sent before the cap import message. So
		 * don't remove caps.
		 */
		if (ceph_seq_cmp(seq, cap->seq) <= 0) {
			WARN_ON(cap != ci->i_auth_cap);
			WARN_ON(cap->cap_id != cap_id);
			seq = cap->seq;
			mseq = cap->mseq;
			issued |= cap->issued;
			flags |= CEPH_CAP_FLAG_AUTH;
		}
	}

	if (!ci->i_snap_realm ||
	    ((flags & CEPH_CAP_FLAG_AUTH) &&
	     realmino != (u64)-1 && ci->i_snap_realm->ino != realmino)) {
		/*
		 * add this inode to the appropriate snap realm
		 */
		struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc,
							       realmino);
		if (realm) {
			struct ceph_snap_realm *oldrealm = ci->i_snap_realm;
			if (oldrealm) {
				spin_lock(&oldrealm->inodes_with_caps_lock);
				list_del_init(&ci->i_snap_realm_item);
				spin_unlock(&oldrealm->inodes_with_caps_lock);
			}

			spin_lock(&realm->inodes_with_caps_lock);
			list_add(&ci->i_snap_realm_item,
				 &realm->inodes_with_caps);
			ci->i_snap_realm = realm;
			if (realm->ino == ci->i_vino.ino)
				realm->inode = inode;
			spin_unlock(&realm->inodes_with_caps_lock);

			if (oldrealm)
				ceph_put_snap_realm(mdsc, oldrealm);
		} else {
			pr_err("ceph_add_cap: couldn't find snap realm %llx\n",
			       realmino);
			WARN_ON(!realm);
		}
	}

	__check_cap_issue(ci, cap, issued);

	/*
	 * If we are issued caps we don't want, or the mds' wanted
	 * value appears to be off, queue a check so we'll release
	 * later and/or update the mds wanted value.
	 */
	actual_wanted = __ceph_caps_wanted(ci);
	if ((wanted & ~actual_wanted) ||
	    (issued & ~actual_wanted & CEPH_CAP_ANY_WR)) {
		dout(" issued %s, mds wanted %s, actual %s, queueing\n",
		     ceph_cap_string(issued), ceph_cap_string(wanted),
		     ceph_cap_string(actual_wanted));
		__cap_delay_requeue(mdsc, ci);
	}

	if (flags & CEPH_CAP_FLAG_AUTH) {
		if (!ci->i_auth_cap ||
		    ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) {
			ci->i_auth_cap = cap;
			cap->mds_wanted = wanted;
		}
	} else {
		WARN_ON(ci->i_auth_cap == cap);
	}

	dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n",
	     inode, ceph_vinop(inode), cap, ceph_cap_string(issued),
	     ceph_cap_string(issued|cap->issued), seq, mds);
	cap->cap_id = cap_id;
	cap->issued = issued;
	cap->implemented |= issued;
	if (ceph_seq_cmp(mseq, cap->mseq) > 0)
		cap->mds_wanted = wanted;
	else
		cap->mds_wanted |= wanted;
	cap->seq = seq;
	cap->issue_seq = seq;
	cap->mseq = mseq;
	cap->cap_gen = session->s_cap_gen;

	if (fmode >= 0)
		__ceph_get_fmode(ci, fmode);
}

/*
 * Return true if cap has not timed out and belongs to the current
 * generation of the MDS session (i.e. has not gone 'stale' due to
 * us losing touch with the mds).
 */
static int __cap_is_valid(struct ceph_cap *cap)
{
	unsigned long ttl;
	u32 gen;

	spin_lock(&cap->session->s_gen_ttl_lock);
	gen = cap->session->s_cap_gen;
	ttl = cap->session->s_cap_ttl;
	spin_unlock(&cap->session->s_gen_ttl_lock);

	if (cap->cap_gen < gen || time_after_eq(jiffies, ttl)) {
		dout("__cap_is_valid %p cap %p issued %s "
		     "but STALE (gen %u vs %u)\n", &cap->ci->vfs_inode,
		     cap, ceph_cap_string(cap->issued), cap->cap_gen, gen);
		return 0;
	}

	return 1;
}

/*
 * Return set of valid cap bits issued to us.  Note that caps time
 * out, and may be invalidated in bulk if the client session times out
 * and session->s_cap_gen is bumped.
 */
int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
{
	int have = ci->i_snap_caps;
	struct ceph_cap *cap;
	struct rb_node *p;

	if (implemented)
		*implemented = 0;
	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (!__cap_is_valid(cap))
			continue;
		dout("__ceph_caps_issued %p cap %p issued %s\n",
		     &ci->vfs_inode, cap, ceph_cap_string(cap->issued));
		have |= cap->issued;
		if (implemented)
			*implemented |= cap->implemented;
	}
	/*
	 * exclude caps issued by a non-auth MDS that are being revoked
	 * by the auth MDS. The non-auth MDS should be revoking/exporting
	 * these caps, but the message is delayed.
	 */
	if (ci->i_auth_cap) {
		cap = ci->i_auth_cap;
		have &= ~cap->implemented | cap->issued;
	}
	return have;
}

/*
 * Get cap bits issued by caps other than @ocap
 */
int __ceph_caps_issued_other(struct ceph_inode_info *ci, struct ceph_cap *ocap)
{
	int have = ci->i_snap_caps;
	struct ceph_cap *cap;
	struct rb_node *p;

	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (cap == ocap)
			continue;
		if (!__cap_is_valid(cap))
			continue;
		have |= cap->issued;
	}
	return have;
}

/*
 * Move a cap to the end of the LRU (oldest caps at list head, newest
 * at list tail).
 */
static void __touch_cap(struct ceph_cap *cap)
{
	struct ceph_mds_session *s = cap->session;

	spin_lock(&s->s_cap_lock);
	if (!s->s_cap_iterator) {
		dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap,
		     s->s_mds);
		list_move_tail(&cap->session_caps, &s->s_caps);
	} else {
		dout("__touch_cap %p cap %p mds%d NOP, iterating over caps\n",
		     &cap->ci->vfs_inode, cap, s->s_mds);
	}
	spin_unlock(&s->s_cap_lock);
}

/*
 * Check if we hold the given mask.  If so, move the cap(s) to the
 * front of their respective LRUs.  (This is the preferred way for
 * callers to check for caps they want.)
 */
int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
{
	struct ceph_cap *cap;
	struct rb_node *p;
	int have = ci->i_snap_caps;

	if ((have & mask) == mask) {
		dout("__ceph_caps_issued_mask %p snap issued %s"
		     " (mask %s)\n", &ci->vfs_inode,
		     ceph_cap_string(have),
		     ceph_cap_string(mask));
		return 1;
	}

	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (!__cap_is_valid(cap))
			continue;
		if ((cap->issued & mask) == mask) {
			dout("__ceph_caps_issued_mask %p cap %p issued %s"
			     " (mask %s)\n", &ci->vfs_inode, cap,
			     ceph_cap_string(cap->issued),
			     ceph_cap_string(mask));
			if (touch)
				__touch_cap(cap);
			return 1;
		}

		/* does a combination of caps satisfy mask? */
		have |= cap->issued;
		if ((have & mask) == mask) {
			dout("__ceph_caps_issued_mask %p combo issued %s"
			     " (mask %s)\n", &ci->vfs_inode,
			     ceph_cap_string(cap->issued),
			     ceph_cap_string(mask));
			if (touch) {
				struct rb_node *q;

				/* touch this + preceding caps */
				__touch_cap(cap);
				for (q = rb_first(&ci->i_caps); q != p;
				     q = rb_next(q)) {
					cap = rb_entry(q, struct ceph_cap,
						       ci_node);
					if (!__cap_is_valid(cap))
						continue;
					__touch_cap(cap);
				}
			}
			return 1;
		}
	}

	return 0;
}

/*
 * Return true if mask caps are currently being revoked by an MDS.
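 * (A cap bit is being revoked when it is still implemented but no
 * longer issued.)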
 */
int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
			       struct ceph_cap *ocap, int mask)
{
	struct ceph_cap *cap;
	struct rb_node *p;

	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (cap != ocap &&
		    (cap->implemented & ~cap->issued & mask))
			return 1;
	}
	return 0;
}

int ceph_caps_revoking(struct ceph_inode_info *ci, int mask)
{
	struct inode *inode = &ci->vfs_inode;
	int ret;

	spin_lock(&ci->i_ceph_lock);
	ret = __ceph_caps_revoking_other(ci, NULL, mask);
	spin_unlock(&ci->i_ceph_lock);
	dout("ceph_caps_revoking %p %s = %d\n", inode,
	     ceph_cap_string(mask), ret);
	return ret;
}

int __ceph_caps_used(struct ceph_inode_info *ci)
{
	int used = 0;
	if (ci->i_pin_ref)
		used |= CEPH_CAP_PIN;
	if (ci->i_rd_ref)
		used |= CEPH_CAP_FILE_RD;
	if (ci->i_rdcache_ref ||
	    (!S_ISDIR(ci->vfs_inode.i_mode) && /* ignore readdir cache */
	     ci->vfs_inode.i_data.nrpages))
		used |= CEPH_CAP_FILE_CACHE;
	if (ci->i_wr_ref)
		used |= CEPH_CAP_FILE_WR;
	if (ci->i_wb_ref || ci->i_wrbuffer_ref)
		used |= CEPH_CAP_FILE_BUFFER;
	return used;
}

/*
 * wanted, by virtue of open file modes
 */
int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
{
	int i, bits = 0;
	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
		if (ci->i_nr_by_mode[i])
			bits |= 1 << i;
	}
	if (bits == 0)
		return 0;
	return ceph_caps_for_mode(bits >> 1);
}

/*
 * Return caps we have registered with the MDS(s) as 'wanted'.
 */
int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check)
{
	struct ceph_cap *cap;
	struct rb_node *p;
	int mds_wanted = 0;

	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		if (check && !__cap_is_valid(cap))
			continue;
		if (cap == ci->i_auth_cap)
			mds_wanted |= cap->mds_wanted;
		else
			mds_wanted |= (cap->mds_wanted & ~CEPH_CAP_ANY_FILE_WR);
	}
	return mds_wanted;
}

/*
 * called under i_ceph_lock
 */
static int __ceph_is_single_caps(struct ceph_inode_info *ci)
{
	return rb_first(&ci->i_caps) == rb_last(&ci->i_caps);
}

static int __ceph_is_any_caps(struct ceph_inode_info *ci)
{
	return !RB_EMPTY_ROOT(&ci->i_caps);
}

int ceph_is_any_caps(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int ret;

	spin_lock(&ci->i_ceph_lock);
	ret = __ceph_is_any_caps(ci);
	spin_unlock(&ci->i_ceph_lock);

	return ret;
}

static void drop_inode_snap_realm(struct ceph_inode_info *ci)
{
	struct ceph_snap_realm *realm = ci->i_snap_realm;
	spin_lock(&realm->inodes_with_caps_lock);
	list_del_init(&ci->i_snap_realm_item);
	ci->i_snap_realm_counter++;
	ci->i_snap_realm = NULL;
	spin_unlock(&realm->inodes_with_caps_lock);
	ceph_put_snap_realm(ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc,
			    realm);
}

/*
 * Remove a cap.  Take steps to deal with a racing iterate_session_caps.
 *
 * caller should hold i_ceph_lock.
 * caller will not hold session s_mutex if called from destroy_inode.
 */
void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
{
	struct ceph_mds_session *session = cap->session;
	struct ceph_inode_info *ci = cap->ci;
	struct ceph_mds_client *mdsc =
		ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
	int removed = 0;

	dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);

	/* remove from session list */
	spin_lock(&session->s_cap_lock);
	if (session->s_cap_iterator == cap) {
		/* not yet, we are iterating over this very cap */
		dout("__ceph_remove_cap delaying %p removal from session %p\n",
		     cap, cap->session);
	} else {
		list_del_init(&cap->session_caps);
		session->s_nr_caps--;
		cap->session = NULL;
		removed = 1;
	}
	/* protect backpointer with s_cap_lock: see iterate_session_caps */
	cap->ci = NULL;

	/*
	 * s_cap_reconnect is protected by s_cap_lock. no one changes
	 * s_cap_gen while session is in the reconnect state.
	 */
	if (queue_release &&
	    (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
		cap->queue_release = 1;
		if (removed) {
			list_add_tail(&cap->session_caps,
				      &session->s_cap_releases);
			session->s_num_cap_releases++;
			removed = 0;
		}
	} else {
		cap->queue_release = 0;
	}
	cap->cap_ino = ci->i_vino.ino;

	spin_unlock(&session->s_cap_lock);

	/* remove from inode list */
	rb_erase(&cap->ci_node, &ci->i_caps);
	if (ci->i_auth_cap == cap)
		ci->i_auth_cap = NULL;

	if (removed)
		ceph_put_cap(mdsc, cap);

	/* when reconnect denied, we remove session caps forcibly,
	 * i_wr_ref can be non-zero. If there are ongoing writes,
	 * keep i_snap_realm.
	 */
	if (!__ceph_is_any_caps(ci) && ci->i_wr_ref == 0 && ci->i_snap_realm)
		drop_inode_snap_realm(ci);

	if (!__ceph_is_any_real_caps(ci))
		__cap_delay_cancel(mdsc, ci);
}

struct cap_msg_args {
	struct ceph_mds_session	*session;
	u64			ino, cid, follows;
	u64			flush_tid, oldest_flush_tid, size, max_size;
	u64			xattr_version;
	struct ceph_buffer	*xattr_buf;
	struct timespec		atime, mtime, ctime;
	int			op, caps, wanted, dirty;
	u32			seq, issue_seq, mseq, time_warp_seq;
	u32			flags;
	kuid_t			uid;
	kgid_t			gid;
	umode_t			mode;
	bool			inline_data;
};

/*
 * Build and send a cap message to the given MDS.
 *
 * Caller should be holding s_mutex.
 */
static int send_cap_msg(struct cap_msg_args *arg)
{
	struct ceph_mds_caps *fc;
	struct ceph_msg *msg;
	void *p;
	size_t extra_len;
	struct timespec zerotime = {0};
	struct ceph_osd_client *osdc = &arg->session->s_mdsc->fsc->client->osdc;

	dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
	     " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu"
	     " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(arg->op),
	     arg->cid, arg->ino, ceph_cap_string(arg->caps),
	     ceph_cap_string(arg->wanted), ceph_cap_string(arg->dirty),
	     arg->seq, arg->issue_seq, arg->flush_tid, arg->oldest_flush_tid,
	     arg->mseq, arg->follows, arg->size, arg->max_size,
	     arg->xattr_version,
	     arg->xattr_buf ?
	     (int)arg->xattr_buf->vec.iov_len : 0);

	/* flock buffer size + inline version + inline data size +
	 * osd_epoch_barrier + oldest_flush_tid */
	extra_len = 4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4;
	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
			   GFP_NOFS, false);
	if (!msg)
		return -ENOMEM;

	msg->hdr.version = cpu_to_le16(10);
	msg->hdr.tid = cpu_to_le64(arg->flush_tid);

	fc = msg->front.iov_base;
	memset(fc, 0, sizeof(*fc));

	fc->cap_id = cpu_to_le64(arg->cid);
	fc->op = cpu_to_le32(arg->op);
	fc->seq = cpu_to_le32(arg->seq);
	fc->issue_seq = cpu_to_le32(arg->issue_seq);
	fc->migrate_seq = cpu_to_le32(arg->mseq);
	fc->caps = cpu_to_le32(arg->caps);
	fc->wanted = cpu_to_le32(arg->wanted);
	fc->dirty = cpu_to_le32(arg->dirty);
	fc->ino = cpu_to_le64(arg->ino);
	fc->snap_follows = cpu_to_le64(arg->follows);

	fc->size = cpu_to_le64(arg->size);
	fc->max_size = cpu_to_le64(arg->max_size);
	ceph_encode_timespec(&fc->mtime, &arg->mtime);
	ceph_encode_timespec(&fc->atime, &arg->atime);
	ceph_encode_timespec(&fc->ctime, &arg->ctime);
	fc->time_warp_seq = cpu_to_le32(arg->time_warp_seq);

	fc->uid = cpu_to_le32(from_kuid(&init_user_ns, arg->uid));
	fc->gid = cpu_to_le32(from_kgid(&init_user_ns, arg->gid));
	fc->mode = cpu_to_le32(arg->mode);

	fc->xattr_version = cpu_to_le64(arg->xattr_version);
	if (arg->xattr_buf) {
		msg->middle = ceph_buffer_get(arg->xattr_buf);
		fc->xattr_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
		msg->hdr.middle_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
	}

	p = fc + 1;
	/* flock buffer size (version 2) */
	ceph_encode_32(&p, 0);
	/* inline version (version 4) */
	ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE);
	/* inline data size */
	ceph_encode_32(&p, 0);
	/*
	 * osd_epoch_barrier (version 5)
	 * The epoch_barrier is protected by osdc->lock, so READ_ONCE here in
	 * case it was recently changed
	 */
	ceph_encode_32(&p, READ_ONCE(osdc->epoch_barrier));
	/* oldest_flush_tid (version 6) */
	ceph_encode_64(&p, arg->oldest_flush_tid);

	/*
	 * caller_uid/caller_gid (version 7)
	 *
	 * Currently, we don't properly track which caller dirtied the caps
	 * last, and force a flush of them when there is a conflict. For now,
	 * just set this to 0:0, to emulate how the MDS has worked up to now.
	 */
	ceph_encode_32(&p, 0);
	ceph_encode_32(&p, 0);

	/* pool namespace (version 8) (mds always ignores this) */
	ceph_encode_32(&p, 0);

	/*
	 * btime and change_attr (version 9)
	 *
	 * We just zero these out for now, as the MDS ignores them unless
	 * the requisite feature flags are set (which we don't do yet).
	 */
	ceph_encode_timespec(p, &zerotime);
	p += sizeof(struct ceph_timespec);
	ceph_encode_64(&p, 0);

	/* Advisory flags (version 10) */
	ceph_encode_32(&p, arg->flags);

	ceph_con_send(&arg->session->s_con, msg);
	return 0;
}

/*
 * Queue cap releases when an inode is dropped from our cache.  Since
 * inode is about to be destroyed, there is no need for i_ceph_lock.
 */
void ceph_queue_caps_release(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct rb_node *p;

	p = rb_first(&ci->i_caps);
	while (p) {
		struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
		p = rb_next(p);
		__ceph_remove_cap(cap, true);
	}
}

/*
 * Send a cap msg on the given inode.  Update our caps state, then
 * drop i_ceph_lock and send the message.
 *
 * Make note of max_size reported/requested from mds, revoked caps
 * that have now been implemented.
 *
 * Make a half-hearted attempt to invalidate page cache if we are
 * dropping RDCACHE.  Note that this will leave behind locked pages
 * that we'll then need to deal with elsewhere.
 *
 * Return non-zero if delayed release, or we experienced an error
 * such that the caller should requeue + retry later.
 *
 * called with i_ceph_lock, then drops it.
 * caller should hold snap_rwsem (read), s_mutex.
 */
static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
		      int op, bool sync, int used, int want, int retain,
		      int flushing, u64 flush_tid, u64 oldest_flush_tid)
	__releases(cap->ci->i_ceph_lock)
{
	struct ceph_inode_info *ci = cap->ci;
	struct inode *inode = &ci->vfs_inode;
	struct cap_msg_args arg;
	int held, revoking;
	int wake = 0;
	int delayed = 0;
	int ret;

	held = cap->issued | cap->implemented;
	revoking = cap->implemented & ~cap->issued;
	retain &= ~revoking;

	dout("__send_cap %p cap %p session %p %s -> %s (revoking %s)\n",
	     inode, cap, cap->session,
	     ceph_cap_string(held), ceph_cap_string(held & retain),
	     ceph_cap_string(revoking));
	BUG_ON((retain & CEPH_CAP_PIN) == 0);

	arg.session = cap->session;

	/* don't release wanted unless we've waited a bit. */
	if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 &&
	    time_before(jiffies, ci->i_hold_caps_min)) {
		dout(" delaying issued %s -> %s, wanted %s -> %s on send\n",
		     ceph_cap_string(cap->issued),
		     ceph_cap_string(cap->issued & retain),
		     ceph_cap_string(cap->mds_wanted),
		     ceph_cap_string(want));
		want |= cap->mds_wanted;
		retain |= cap->issued;
		delayed = 1;
	}
	ci->i_ceph_flags &= ~(CEPH_I_NODELAY | CEPH_I_FLUSH);
	if (want & ~cap->mds_wanted) {
		/* user space may open/close single file frequently.
		 * This avoids dropping mds_wanted immediately after
		 * requesting new mds_wanted.
		 */
		__cap_set_timeouts(mdsc, ci);
	}

	cap->issued &= retain;  /* drop bits we don't want */
	if (cap->implemented & ~cap->issued) {
		/*
		 * Wake up any waiters on wanted -> needed transition.
		 * This is due to the weird transition from buffered
		 * to sync IO... we need to flush dirty pages _before_
		 * allowing sync writes to avoid reordering.
		 */
		wake = 1;
	}
	cap->implemented &= cap->issued | used;
	cap->mds_wanted = want;

	arg.ino = ceph_vino(inode).ino;
	arg.cid = cap->cap_id;
	arg.follows = flushing ?
			ci->i_head_snapc->seq : 0;
	arg.flush_tid = flush_tid;
	arg.oldest_flush_tid = oldest_flush_tid;

	arg.size = inode->i_size;
	ci->i_reported_size = arg.size;
	arg.max_size = ci->i_wanted_max_size;
	ci->i_requested_max_size = arg.max_size;

	if (flushing & CEPH_CAP_XATTR_EXCL) {
		__ceph_build_xattrs_blob(ci);
		arg.xattr_version = ci->i_xattrs.version;
		arg.xattr_buf = ci->i_xattrs.blob;
	} else {
		arg.xattr_buf = NULL;
	}

	arg.mtime = inode->i_mtime;
	arg.atime = inode->i_atime;
	arg.ctime = inode->i_ctime;

	arg.op = op;
	arg.caps = cap->implemented;
	arg.wanted = want;
	arg.dirty = flushing;

	arg.seq = cap->seq;
	arg.issue_seq = cap->issue_seq;
	arg.mseq = cap->mseq;
	arg.time_warp_seq = ci->i_time_warp_seq;

	arg.uid = inode->i_uid;
	arg.gid = inode->i_gid;
	arg.mode = inode->i_mode;

	arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
	if (list_empty(&ci->i_cap_snaps))
		arg.flags = CEPH_CLIENT_CAPS_NO_CAPSNAP;
	else
		arg.flags = CEPH_CLIENT_CAPS_PENDING_CAPSNAP;
	if (sync)
		arg.flags |= CEPH_CLIENT_CAPS_SYNC;

	spin_unlock(&ci->i_ceph_lock);

	ret = send_cap_msg(&arg);
	if (ret < 0) {
		dout("error sending cap msg, must requeue %p\n", inode);
		delayed = 1;
	}

	if (wake)
		wake_up_all(&ci->i_cap_wq);

	return delayed;
}

static inline int __send_flush_snap(struct inode *inode,
				    struct ceph_mds_session *session,
				    struct ceph_cap_snap *capsnap,
				    u32 mseq, u64 oldest_flush_tid)
{
	struct cap_msg_args arg;

	arg.session = session;
	arg.ino = ceph_vino(inode).ino;
	arg.cid = 0;
	arg.follows = capsnap->follows;
	arg.flush_tid = capsnap->cap_flush.tid;
	arg.oldest_flush_tid = oldest_flush_tid;

	arg.size = capsnap->size;
	arg.max_size = 0;
	arg.xattr_version = capsnap->xattr_version;
	arg.xattr_buf = capsnap->xattr_blob;

	arg.atime = capsnap->atime;
	arg.mtime = capsnap->mtime;
	arg.ctime = capsnap->ctime;

	arg.op = CEPH_CAP_OP_FLUSHSNAP;
	arg.caps = capsnap->issued;
	arg.wanted = 0;
	arg.dirty = capsnap->dirty;

	arg.seq = 0;
	arg.issue_seq = 0;
	arg.mseq = mseq;
	arg.time_warp_seq = capsnap->time_warp_seq;

	arg.uid = capsnap->uid;
	arg.gid = capsnap->gid;
	arg.mode = capsnap->mode;

	arg.inline_data = capsnap->inline_data;
	arg.flags = 0;

	return send_cap_msg(&arg);
}

/*
 * When a snapshot is taken, clients accumulate dirty metadata on
 * inodes with capabilities in ceph_cap_snaps to describe the file
 * state at the time the snapshot was taken.  This must be flushed
 * asynchronously back to the MDS once sync writes complete and dirty
 * data is written out.
 *
 * Called under i_ceph_lock.  Takes s_mutex as needed.
 */
static void __ceph_flush_snaps(struct ceph_inode_info *ci,
			       struct ceph_mds_session *session)
		__releases(ci->i_ceph_lock)
		__acquires(ci->i_ceph_lock)
{
	struct inode *inode = &ci->vfs_inode;
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_cap_snap *capsnap;
	u64 oldest_flush_tid = 0;
	u64 first_tid = 1, last_tid = 0;

	dout("__flush_snaps %p session %p\n", inode, session);

	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
		/*
		 * we need to wait for sync writes to complete and for dirty
		 * pages to be written out.
		 */
		if (capsnap->dirty_pages || capsnap->writing)
			break;

		/* should be removed by ceph_try_drop_cap_snap() */
		BUG_ON(!capsnap->need_flush);

		/* only flush each capsnap once */
		if (capsnap->cap_flush.tid > 0) {
			dout(" already flushed %p, skipping\n", capsnap);
			continue;
		}

		spin_lock(&mdsc->cap_dirty_lock);
		capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid;
		list_add_tail(&capsnap->cap_flush.g_list,
			      &mdsc->cap_flush_list);
		if (oldest_flush_tid == 0)
			oldest_flush_tid = __get_oldest_flush_tid(mdsc);
		if (list_empty(&ci->i_flushing_item)) {
			list_add_tail(&ci->i_flushing_item,
				      &session->s_cap_flushing);
		}
		spin_unlock(&mdsc->cap_dirty_lock);

		list_add_tail(&capsnap->cap_flush.i_list,
			      &ci->i_cap_flush_list);

		if (first_tid == 1)
			first_tid = capsnap->cap_flush.tid;
		last_tid = capsnap->cap_flush.tid;
	}

	ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS;

	while (first_tid <= last_tid) {
		struct ceph_cap *cap = ci->i_auth_cap;
		struct ceph_cap_flush *cf;
		int ret;

		if (!(cap && cap->session == session)) {
			dout("__flush_snaps %p auth cap %p not mds%d, "
			     "stop\n", inode, cap, session->s_mds);
			break;
		}

		ret = -ENOENT;
		list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
			if (cf->tid >= first_tid) {
				ret = 0;
				break;
			}
		}
		if (ret < 0)
			break;

		first_tid = cf->tid + 1;

		capsnap = container_of(cf, struct ceph_cap_snap, cap_flush);
		refcount_inc(&capsnap->nref);
		spin_unlock(&ci->i_ceph_lock);

		dout("__flush_snaps %p capsnap %p tid %llu %s\n",
		     inode, capsnap, cf->tid, ceph_cap_string(capsnap->dirty));

		ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
					oldest_flush_tid);
		if (ret < 0) {
			pr_err("__flush_snaps: error sending cap flushsnap, "
			       "ino (%llx.%llx) tid %llu follows %llu\n",
			       ceph_vinop(inode), cf->tid, capsnap->follows);
		}

		ceph_put_cap_snap(capsnap);
		spin_lock(&ci->i_ceph_lock);
	}
}

void ceph_flush_snaps(struct ceph_inode_info *ci,
		      struct ceph_mds_session **psession)
{
	struct inode *inode = &ci->vfs_inode;
	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
	struct ceph_mds_session *session = NULL;
	int mds;

	dout("ceph_flush_snaps %p\n", inode);
	if (psession)
		session = *psession;
retry:
	spin_lock(&ci->i_ceph_lock);
	if (!(ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)) {
		dout(" no capsnap needs flush, doing nothing\n");
		goto out;
	}
	if (!ci->i_auth_cap) {
		dout(" no auth cap (migrating?), doing nothing\n");
		goto out;
	}

	mds = ci->i_auth_cap->session->s_mds;
	if (session && session->s_mds != mds) {
		dout(" oops, wrong session %p mutex\n", session);
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
		session = NULL;
	}
	if (!session) {
		spin_unlock(&ci->i_ceph_lock);
		mutex_lock(&mdsc->mutex);
		session = __ceph_lookup_mds_session(mdsc, mds);
		mutex_unlock(&mdsc->mutex);
		if (session) {
			dout(" inverting session/ino locks on %p\n", session);
			mutex_lock(&session->s_mutex);
		}
		goto retry;
	}

	// make sure flushsnap messages are sent in proper order.
	if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
		__kick_flushing_caps(mdsc, session, ci, 0);
		ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
	}

	__ceph_flush_snaps(ci, session);
out:
	spin_unlock(&ci->i_ceph_lock);

	if (psession) {
		*psession = session;
	} else if (session) {
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
	}
	/* we flushed them all; remove this inode from the queue */
	spin_lock(&mdsc->snap_flush_lock);
	list_del_init(&ci->i_snap_flush_item);
	spin_unlock(&mdsc->snap_flush_lock);
}

/*
 * Mark caps dirty.  If inode is newly dirty, return the dirty flags.
 * Caller is then responsible for calling __mark_inode_dirty with the
 * returned flags value.
 */
int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
			   struct ceph_cap_flush **pcf)
{
	struct ceph_mds_client *mdsc =
		ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
	struct inode *inode = &ci->vfs_inode;
	int was = ci->i_dirty_caps;
	int dirty = 0;

	if (!ci->i_auth_cap) {
		pr_warn("__mark_dirty_caps %p %llx mask %s, "
			"but no auth cap (session was closed?)\n",
			inode, ceph_ino(inode), ceph_cap_string(mask));
		return 0;
	}

	dout("__mark_dirty_caps %p %s dirty %s -> %s\n", &ci->vfs_inode,
	     ceph_cap_string(mask), ceph_cap_string(was),
	     ceph_cap_string(was | mask));
	ci->i_dirty_caps |= mask;
	if (was == 0) {
		WARN_ON_ONCE(ci->i_prealloc_cap_flush);
		swap(ci->i_prealloc_cap_flush, *pcf);

		if (!ci->i_head_snapc) {
			WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem));
			ci->i_head_snapc = ceph_get_snap_context(
				ci->i_snap_realm->cached_context);
		}
		dout(" inode %p now dirty snapc %p auth cap %p\n",
		     &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
		BUG_ON(!list_empty(&ci->i_dirty_item));
		spin_lock(&mdsc->cap_dirty_lock);
		list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
		spin_unlock(&mdsc->cap_dirty_lock);
		if (ci->i_flushing_caps == 0) {
			ihold(inode);
			dirty |= I_DIRTY_SYNC;
		}
	} else {
		WARN_ON_ONCE(!ci->i_prealloc_cap_flush);
	}
	BUG_ON(list_empty(&ci->i_dirty_item));
	if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) &&
	    (mask & CEPH_CAP_FILE_BUFFER))
		dirty |= I_DIRTY_DATASYNC;
	__cap_delay_requeue(mdsc, ci);
	return dirty;
}

struct ceph_cap_flush *ceph_alloc_cap_flush(void)
{
	return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL);
}

void ceph_free_cap_flush(struct ceph_cap_flush *cf)
{
	if (cf)
		kmem_cache_free(ceph_cap_flush_cachep, cf);
}

static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc)
{
	if (!list_empty(&mdsc->cap_flush_list)) {
		struct ceph_cap_flush *cf =
			list_first_entry(&mdsc->cap_flush_list,
					 struct ceph_cap_flush, g_list);
		return cf->tid;
	}
	return 0;
}

/*
 * Remove cap_flush from the mdsc's or inode's flushing cap list.
 * Return true if caller needs to wake up flush waiters.
 */
static bool __finish_cap_flush(struct ceph_mds_client *mdsc,
			       struct ceph_inode_info *ci,
			       struct ceph_cap_flush *cf)
{
	struct ceph_cap_flush *prev;
	bool wake = cf->wake;
	if (mdsc) {
		/* are there older pending cap flushes? */
		if (wake && cf->g_list.prev != &mdsc->cap_flush_list) {
			prev = list_prev_entry(cf, g_list);
			prev->wake = true;
			wake = false;
		}
		list_del(&cf->g_list);
	} else if (ci) {
		if (wake && cf->i_list.prev != &ci->i_cap_flush_list) {
			prev = list_prev_entry(cf, i_list);
			prev->wake = true;
			wake = false;
		}
		list_del(&cf->i_list);
	} else {
		BUG_ON(1);
	}
	return wake;
}

/*
 * Add dirty inode to the flushing list.  Assign a seq number so we
 * can wait for caps to flush without starving.
 *
 * Called under i_ceph_lock.
 */
static int __mark_caps_flushing(struct inode *inode,
				struct ceph_mds_session *session, bool wake,
				u64 *flush_tid, u64 *oldest_flush_tid)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_cap_flush *cf = NULL;
	int flushing;

	BUG_ON(ci->i_dirty_caps == 0);
	BUG_ON(list_empty(&ci->i_dirty_item));
	BUG_ON(!ci->i_prealloc_cap_flush);

	flushing = ci->i_dirty_caps;
	dout("__mark_caps_flushing flushing %s, flushing_caps %s -> %s\n",
	     ceph_cap_string(flushing),
	     ceph_cap_string(ci->i_flushing_caps),
	     ceph_cap_string(ci->i_flushing_caps | flushing));
	ci->i_flushing_caps |= flushing;
	ci->i_dirty_caps = 0;
	dout(" inode %p now !dirty\n", inode);

	swap(cf, ci->i_prealloc_cap_flush);
	cf->caps = flushing;
	cf->wake = wake;

	spin_lock(&mdsc->cap_dirty_lock);
	list_del_init(&ci->i_dirty_item);

	cf->tid = ++mdsc->last_cap_flush_tid;
	list_add_tail(&cf->g_list, &mdsc->cap_flush_list);
	*oldest_flush_tid = __get_oldest_flush_tid(mdsc);

	if (list_empty(&ci->i_flushing_item)) {
		list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
		mdsc->num_cap_flushing++;
	}
	spin_unlock(&mdsc->cap_dirty_lock);

	list_add_tail(&cf->i_list, &ci->i_cap_flush_list);

	*flush_tid = cf->tid;
	return flushing;
}

/*
 * try to invalidate mapping pages without blocking.
 */
static int try_nonblocking_invalidate(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	u32 invalidating_gen = ci->i_rdcache_gen;

	spin_unlock(&ci->i_ceph_lock);
	invalidate_mapping_pages(&inode->i_data, 0, -1);
	spin_lock(&ci->i_ceph_lock);

	if (inode->i_data.nrpages == 0 &&
	    invalidating_gen == ci->i_rdcache_gen) {
		/* success. */
		dout("try_nonblocking_invalidate %p success\n", inode);
		/* save any racing async invalidate some trouble */
		ci->i_rdcache_revoking = ci->i_rdcache_gen - 1;
		return 0;
	}
	dout("try_nonblocking_invalidate %p failed\n", inode);
	return -1;
}

bool __ceph_should_report_size(struct ceph_inode_info *ci)
{
	loff_t size = ci->vfs_inode.i_size;
	/* mds will adjust max size according to the reported size */
	if (ci->i_flushing_caps & CEPH_CAP_FILE_WR)
		return false;
	if (size >= ci->i_max_size)
		return true;
	/* half of previous max_size increment has been used */
	if (ci->i_max_size > ci->i_reported_size &&
	    (size << 1) >= ci->i_max_size + ci->i_reported_size)
		return true;
	return false;
}

/*
 * Swiss army knife function to examine currently used and wanted
 * versus held caps.  Release, flush, ack revoked caps to mds as
 * appropriate.
 *
 * CHECK_CAPS_NODELAY - caller is delayed work and we should not delay
 *    cap release further.
 * CHECK_CAPS_AUTHONLY - we should only check the auth cap
 * CHECK_CAPS_FLUSH - we should flush any dirty caps immediately, without
 *    further delay.
 */
void ceph_check_caps(struct ceph_inode_info *ci, int flags,
		     struct ceph_mds_session *session)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct inode *inode = &ci->vfs_inode;
	struct ceph_cap *cap;
	u64 flush_tid, oldest_flush_tid;
	int file_wanted, used, cap_used;
	int took_snap_rwsem = 0;     /* true if mdsc->snap_rwsem held */
	int issued, implemented, want, retain, revoking, flushing = 0;
	int mds = -1;   /* keep track of how far we've gone through i_caps list
			   to avoid an infinite loop on retry */
	struct rb_node *p;
	int delayed = 0, sent = 0;
	bool no_delay = flags & CHECK_CAPS_NODELAY;
	bool queue_invalidate = false;
	bool tried_invalidate = false;

	/* if we are unmounting, flush any unused caps immediately. */
	if (mdsc->stopping)
		no_delay = true;

	spin_lock(&ci->i_ceph_lock);

	if (ci->i_ceph_flags & CEPH_I_FLUSH)
		flags |= CHECK_CAPS_FLUSH;

	if (!(flags & CHECK_CAPS_AUTHONLY) ||
	    (ci->i_auth_cap && __ceph_is_single_caps(ci)))
		__cap_delay_cancel(mdsc, ci);

	goto retry_locked;
retry:
	spin_lock(&ci->i_ceph_lock);
retry_locked:
	file_wanted = __ceph_caps_file_wanted(ci);
	used = __ceph_caps_used(ci);
	issued = __ceph_caps_issued(ci, &implemented);
	revoking = implemented & ~issued;

	want = file_wanted;
	retain = file_wanted | used | CEPH_CAP_PIN;
	if (!mdsc->stopping && inode->i_nlink > 0) {
		if (file_wanted) {
			retain |= CEPH_CAP_ANY;       /* be greedy */
		} else if (S_ISDIR(inode->i_mode) &&
			   (issued & CEPH_CAP_FILE_SHARED) &&
			   __ceph_dir_is_complete(ci)) {
			/*
			 * If a directory is complete, we want to keep
			 * the exclusive cap. So that MDS does not end up
			 * revoking the shared cap on every create/unlink
			 * operation.
			 */
			want = CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
			retain |= want;
		} else {

			retain |= CEPH_CAP_ANY_SHARED;
			/*
			 * keep RD only if we didn't have the file open RW,
			 * because then the mds would revoke it anyway to
			 * journal max_size=0.
			 */
			if (ci->i_max_size == 0)
				retain |= CEPH_CAP_ANY_RD;
		}
	}

	dout("check_caps %p file_want %s used %s dirty %s flushing %s"
	     " issued %s revoking %s retain %s %s%s%s\n", inode,
	     ceph_cap_string(file_wanted),
	     ceph_cap_string(used), ceph_cap_string(ci->i_dirty_caps),
	     ceph_cap_string(ci->i_flushing_caps),
	     ceph_cap_string(issued), ceph_cap_string(revoking),
	     ceph_cap_string(retain),
	     (flags & CHECK_CAPS_AUTHONLY) ? " AUTHONLY" : "",
	     (flags & CHECK_CAPS_NODELAY) ? " NODELAY" : "",
	     (flags & CHECK_CAPS_FLUSH) ? " FLUSH" : "");

	/*
	 * If we no longer need to hold onto our old caps, and we may
	 * have cached pages, but don't want them, then try to invalidate.
	 * If we fail, it's because pages are locked.... try again later.
	 */
	if ((!no_delay || mdsc->stopping) &&
	    !S_ISDIR(inode->i_mode) &&		/* ignore readdir cache */
	    !(ci->i_wb_ref || ci->i_wrbuffer_ref) &&   /* no dirty pages... */
	    inode->i_data.nrpages &&		/* have cached pages */
	    (revoking & (CEPH_CAP_FILE_CACHE|
			 CEPH_CAP_FILE_LAZYIO)) && /* or revoking cache */
	    !tried_invalidate) {
		dout("check_caps trying to invalidate on %p\n", inode);
		if (try_nonblocking_invalidate(inode) < 0) {
			dout("check_caps queuing invalidate\n");
			queue_invalidate = true;
			ci->i_rdcache_revoking = ci->i_rdcache_gen;
		}
		tried_invalidate = true;
		goto retry_locked;
	}

	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);

		/* avoid looping forever */
		if (mds >= cap->mds ||
		    ((flags & CHECK_CAPS_AUTHONLY) && cap != ci->i_auth_cap))
			continue;

		/* NOTE: no side-effects allowed, until we take s_mutex */

		cap_used = used;
		if (ci->i_auth_cap && cap != ci->i_auth_cap)
			cap_used &= ~ci->i_auth_cap->issued;

		revoking = cap->implemented & ~cap->issued;
		dout(" mds%d cap %p used %s issued %s implemented %s revoking %s\n",
		     cap->mds, cap, ceph_cap_string(cap_used),
		     ceph_cap_string(cap->issued),
		     ceph_cap_string(cap->implemented),
		     ceph_cap_string(revoking));

		if (cap == ci->i_auth_cap &&
		    (cap->issued & CEPH_CAP_FILE_WR)) {
			/* request larger max_size from MDS? */
			if (ci->i_wanted_max_size > ci->i_max_size &&
			    ci->i_wanted_max_size > ci->i_requested_max_size) {
				dout("requesting new max_size\n");
				goto ack;
			}

			/* approaching file_max? */
			if (__ceph_should_report_size(ci)) {
				dout("i_size approaching max_size\n");
				goto ack;
			}
		}
		/* flush anything dirty? */
		if (cap == ci->i_auth_cap) {
			if ((flags & CHECK_CAPS_FLUSH) && ci->i_dirty_caps) {
				dout("flushing dirty caps\n");
				goto ack;
			}
			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) {
				dout("flushing snap caps\n");
				goto ack;
			}
		}

		/* completed revocation? going down and there are no caps? */
		if (revoking && (revoking & cap_used) == 0) {
			dout("completed revocation of %s\n",
			     ceph_cap_string(cap->implemented & ~cap->issued));
			goto ack;
		}

		/* want more caps from mds? */
		if (want & ~(cap->mds_wanted | cap->issued))
			goto ack;

		/* things we might delay */
		if ((cap->issued & ~retain) == 0 &&
		    cap->mds_wanted == want)
			continue;     /* nope, all good */

		if (no_delay)
			goto ack;

		/* delay? */
*/ 1991 if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 && 1992 time_before(jiffies, ci->i_hold_caps_max)) { 1993 dout(" delaying issued %s -> %s, wanted %s -> %s\n", 1994 ceph_cap_string(cap->issued), 1995 ceph_cap_string(cap->issued & retain), 1996 ceph_cap_string(cap->mds_wanted), 1997 ceph_cap_string(want)); 1998 delayed++; 1999 continue; 2000 } 2001 2002 ack: 2003 if (ci->i_ceph_flags & CEPH_I_NOFLUSH) { 2004 dout(" skipping %p I_NOFLUSH set\n", inode); 2005 continue; 2006 } 2007 2008 if (session && session != cap->session) { 2009 dout("oops, wrong session %p mutex\n", session); 2010 mutex_unlock(&session->s_mutex); 2011 session = NULL; 2012 } 2013 if (!session) { 2014 session = cap->session; 2015 if (mutex_trylock(&session->s_mutex) == 0) { 2016 dout("inverting session/ino locks on %p\n", 2017 session); 2018 spin_unlock(&ci->i_ceph_lock); 2019 if (took_snap_rwsem) { 2020 up_read(&mdsc->snap_rwsem); 2021 took_snap_rwsem = 0; 2022 } 2023 mutex_lock(&session->s_mutex); 2024 goto retry; 2025 } 2026 } 2027 2028 /* kick flushing and flush snaps before sending normal 2029 * cap message */ 2030 if (cap == ci->i_auth_cap && 2031 (ci->i_ceph_flags & 2032 (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) { 2033 if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) { 2034 __kick_flushing_caps(mdsc, session, ci, 0); 2035 ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; 2036 } 2037 if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) 2038 __ceph_flush_snaps(ci, session); 2039 2040 goto retry_locked; 2041 } 2042 2043 /* take snap_rwsem after session mutex */ 2044 if (!took_snap_rwsem) { 2045 if (down_read_trylock(&mdsc->snap_rwsem) == 0) { 2046 dout("inverting snap/in locks on %p\n", 2047 inode); 2048 spin_unlock(&ci->i_ceph_lock); 2049 down_read(&mdsc->snap_rwsem); 2050 took_snap_rwsem = 1; 2051 goto retry; 2052 } 2053 took_snap_rwsem = 1; 2054 } 2055 2056 if (cap == ci->i_auth_cap && ci->i_dirty_caps) { 2057 flushing = __mark_caps_flushing(inode, session, false, 2058 &flush_tid, 2059 &oldest_flush_tid); 2060 } else { 2061 flushing = 0; 2062 flush_tid = 0; 2063 spin_lock(&mdsc->cap_dirty_lock); 2064 oldest_flush_tid = __get_oldest_flush_tid(mdsc); 2065 spin_unlock(&mdsc->cap_dirty_lock); 2066 } 2067 2068 mds = cap->mds; /* remember mds, so we don't repeat */ 2069 sent++; 2070 2071 /* __send_cap drops i_ceph_lock */ 2072 delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, false, 2073 cap_used, want, retain, flushing, 2074 flush_tid, oldest_flush_tid); 2075 goto retry; /* retake i_ceph_lock and restart our cap scan. */ 2076 } 2077 2078 /* Reschedule delayed caps release if we delayed anything */ 2079 if (delayed) 2080 __cap_delay_requeue(mdsc, ci); 2081 2082 spin_unlock(&ci->i_ceph_lock); 2083 2084 if (queue_invalidate) 2085 ceph_queue_invalidate(inode); 2086 2087 if (session) 2088 mutex_unlock(&session->s_mutex); 2089 if (took_snap_rwsem) 2090 up_read(&mdsc->snap_rwsem); 2091 } 2092 2093 /* 2094 * Try to flush dirty caps back to the auth mds. 
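Returns the set of caps being flushed, and reports via @ptid the flush tid to wait for.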
2095 */ 2096 static int try_flush_caps(struct inode *inode, u64 *ptid) 2097 { 2098 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 2099 struct ceph_inode_info *ci = ceph_inode(inode); 2100 struct ceph_mds_session *session = NULL; 2101 int flushing = 0; 2102 u64 flush_tid = 0, oldest_flush_tid = 0; 2103 2104 retry: 2105 spin_lock(&ci->i_ceph_lock); 2106 if (ci->i_ceph_flags & CEPH_I_NOFLUSH) { 2107 spin_unlock(&ci->i_ceph_lock); 2108 dout("try_flush_caps skipping %p I_NOFLUSH set\n", inode); 2109 goto out; 2110 } 2111 if (ci->i_dirty_caps && ci->i_auth_cap) { 2112 struct ceph_cap *cap = ci->i_auth_cap; 2113 int used = __ceph_caps_used(ci); 2114 int want = __ceph_caps_wanted(ci); 2115 int delayed; 2116 2117 if (!session || session != cap->session) { 2118 spin_unlock(&ci->i_ceph_lock); 2119 if (session) 2120 mutex_unlock(&session->s_mutex); 2121 session = cap->session; 2122 mutex_lock(&session->s_mutex); 2123 goto retry; 2124 } 2125 if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) { 2126 spin_unlock(&ci->i_ceph_lock); 2127 goto out; 2128 } 2129 2130 flushing = __mark_caps_flushing(inode, session, true, 2131 &flush_tid, &oldest_flush_tid); 2132 2133 /* __send_cap drops i_ceph_lock */ 2134 delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, true, 2135 used, want, (cap->issued | cap->implemented), 2136 flushing, flush_tid, oldest_flush_tid); 2137 2138 if (delayed) { 2139 spin_lock(&ci->i_ceph_lock); 2140 __cap_delay_requeue(mdsc, ci); 2141 spin_unlock(&ci->i_ceph_lock); 2142 } 2143 } else { 2144 if (!list_empty(&ci->i_cap_flush_list)) { 2145 struct ceph_cap_flush *cf = 2146 list_last_entry(&ci->i_cap_flush_list, 2147 struct ceph_cap_flush, i_list); 2148 cf->wake = true; 2149 flush_tid = cf->tid; 2150 } 2151 flushing = ci->i_flushing_caps; 2152 spin_unlock(&ci->i_ceph_lock); 2153 } 2154 out: 2155 if (session) 2156 mutex_unlock(&session->s_mutex); 2157 2158 *ptid = flush_tid; 2159 return flushing; 2160 } 2161 2162 /* 2163 * Return true if we've flushed caps through the given flush_tid. 2164 */ 2165 static int caps_are_flushed(struct inode *inode, u64 flush_tid) 2166 { 2167 struct ceph_inode_info *ci = ceph_inode(inode); 2168 int ret = 1; 2169 2170 spin_lock(&ci->i_ceph_lock); 2171 if (!list_empty(&ci->i_cap_flush_list)) { 2172 struct ceph_cap_flush * cf = 2173 list_first_entry(&ci->i_cap_flush_list, 2174 struct ceph_cap_flush, i_list); 2175 if (cf->tid <= flush_tid) 2176 ret = 0; 2177 } 2178 spin_unlock(&ci->i_ceph_lock); 2179 return ret; 2180 } 2181 2182 /* 2183 * wait for any unsafe requests to complete. 2184 */ 2185 static int unsafe_request_wait(struct inode *inode) 2186 { 2187 struct ceph_inode_info *ci = ceph_inode(inode); 2188 struct ceph_mds_request *req1 = NULL, *req2 = NULL; 2189 int ret, err = 0; 2190 2191 spin_lock(&ci->i_unsafe_lock); 2192 if (S_ISDIR(inode->i_mode) && !list_empty(&ci->i_unsafe_dirops)) { 2193 req1 = list_last_entry(&ci->i_unsafe_dirops, 2194 struct ceph_mds_request, 2195 r_unsafe_dir_item); 2196 ceph_mdsc_get_request(req1); 2197 } 2198 if (!list_empty(&ci->i_unsafe_iops)) { 2199 req2 = list_last_entry(&ci->i_unsafe_iops, 2200 struct ceph_mds_request, 2201 r_unsafe_target_item); 2202 ceph_mdsc_get_request(req2); 2203 } 2204 spin_unlock(&ci->i_unsafe_lock); 2205 2206 dout("unsafe_request_wait %p wait on tid %llu %llu\n", 2207 inode, req1 ? req1->r_tid : 0ULL, req2 ? 
req2->r_tid : 0ULL); 2208 if (req1) { 2209 ret = !wait_for_completion_timeout(&req1->r_safe_completion, 2210 ceph_timeout_jiffies(req1->r_timeout)); 2211 if (ret) 2212 err = -EIO; 2213 ceph_mdsc_put_request(req1); 2214 } 2215 if (req2) { 2216 ret = !wait_for_completion_timeout(&req2->r_safe_completion, 2217 ceph_timeout_jiffies(req2->r_timeout)); 2218 if (ret) 2219 err = -EIO; 2220 ceph_mdsc_put_request(req2); 2221 } 2222 return err; 2223 } 2224 2225 int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) 2226 { 2227 struct inode *inode = file->f_mapping->host; 2228 struct ceph_inode_info *ci = ceph_inode(inode); 2229 u64 flush_tid; 2230 int ret; 2231 int dirty; 2232 2233 dout("fsync %p%s\n", inode, datasync ? " datasync" : ""); 2234 2235 ret = file_write_and_wait_range(file, start, end); 2236 if (ret < 0) 2237 goto out; 2238 2239 if (datasync) 2240 goto out; 2241 2242 inode_lock(inode); 2243 2244 dirty = try_flush_caps(inode, &flush_tid); 2245 dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); 2246 2247 ret = unsafe_request_wait(inode); 2248 2249 /* 2250 * only wait on non-file metadata writeback (the mds 2251 * can recover size and mtime, so we don't need to 2252 * wait for that) 2253 */ 2254 if (!ret && (dirty & ~CEPH_CAP_ANY_FILE_WR)) { 2255 ret = wait_event_interruptible(ci->i_cap_wq, 2256 caps_are_flushed(inode, flush_tid)); 2257 } 2258 inode_unlock(inode); 2259 out: 2260 dout("fsync %p%s result=%d\n", inode, datasync ? " datasync" : "", ret); 2261 return ret; 2262 } 2263 2264 /* 2265 * Flush any dirty caps back to the mds. If we aren't asked to wait, 2266 * queue inode for flush but don't do so immediately, because we can 2267 * get by with fewer MDS messages if we wait for data writeback to 2268 * complete first. 2269 */ 2270 int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) 2271 { 2272 struct ceph_inode_info *ci = ceph_inode(inode); 2273 u64 flush_tid; 2274 int err = 0; 2275 int dirty; 2276 int wait = (wbc->sync_mode == WB_SYNC_ALL && !wbc->for_sync); 2277 2278 dout("write_inode %p wait=%d\n", inode, wait); 2279 if (wait) { 2280 dirty = try_flush_caps(inode, &flush_tid); 2281 if (dirty) 2282 err = wait_event_interruptible(ci->i_cap_wq, 2283 caps_are_flushed(inode, flush_tid)); 2284 } else { 2285 struct ceph_mds_client *mdsc = 2286 ceph_sb_to_client(inode->i_sb)->mdsc; 2287 2288 spin_lock(&ci->i_ceph_lock); 2289 if (__ceph_caps_dirty(ci)) 2290 __cap_delay_requeue_front(mdsc, ci); 2291 spin_unlock(&ci->i_ceph_lock); 2292 } 2293 return err; 2294 } 2295 2296 static void __kick_flushing_caps(struct ceph_mds_client *mdsc, 2297 struct ceph_mds_session *session, 2298 struct ceph_inode_info *ci, 2299 u64 oldest_flush_tid) 2300 __releases(ci->i_ceph_lock) 2301 __acquires(ci->i_ceph_lock) 2302 { 2303 struct inode *inode = &ci->vfs_inode; 2304 struct ceph_cap *cap; 2305 struct ceph_cap_flush *cf; 2306 int ret; 2307 u64 first_tid = 0; 2308 2309 list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) { 2310 if (cf->tid < first_tid) 2311 continue; 2312 2313 cap = ci->i_auth_cap; 2314 if (!(cap && cap->session == session)) { 2315 pr_err("%p auth cap %p not mds%d ???\n", 2316 inode, cap, session->s_mds); 2317 break; 2318 } 2319 2320 first_tid = cf->tid + 1; 2321 2322 if (cf->caps) { 2323 dout("kick_flushing_caps %p cap %p tid %llu %s\n", 2324 inode, cap, cf->tid, ceph_cap_string(cf->caps)); 2325 ci->i_ceph_flags |= CEPH_I_NODELAY; 2326 ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, 2327 false, __ceph_caps_used(ci), 2328 __ceph_caps_wanted(ci), 2329 
cap->issued | cap->implemented, 2330 cf->caps, cf->tid, oldest_flush_tid); 2331 if (ret) { 2332 pr_err("kick_flushing_caps: error sending " 2333 "cap flush, ino (%llx.%llx) " 2334 "tid %llu flushing %s\n", 2335 ceph_vinop(inode), cf->tid, 2336 ceph_cap_string(cf->caps)); 2337 } 2338 } else { 2339 struct ceph_cap_snap *capsnap = 2340 container_of(cf, struct ceph_cap_snap, 2341 cap_flush); 2342 dout("kick_flushing_caps %p capsnap %p tid %llu %s\n", 2343 inode, capsnap, cf->tid, 2344 ceph_cap_string(capsnap->dirty)); 2345 2346 refcount_inc(&capsnap->nref); 2347 spin_unlock(&ci->i_ceph_lock); 2348 2349 ret = __send_flush_snap(inode, session, capsnap, cap->mseq, 2350 oldest_flush_tid); 2351 if (ret < 0) { 2352 pr_err("kick_flushing_caps: error sending " 2353 "cap flushsnap, ino (%llx.%llx) " 2354 "tid %llu follows %llu\n", 2355 ceph_vinop(inode), cf->tid, 2356 capsnap->follows); 2357 } 2358 2359 ceph_put_cap_snap(capsnap); 2360 } 2361 2362 spin_lock(&ci->i_ceph_lock); 2363 } 2364 } 2365 2366 void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc, 2367 struct ceph_mds_session *session) 2368 { 2369 struct ceph_inode_info *ci; 2370 struct ceph_cap *cap; 2371 u64 oldest_flush_tid; 2372 2373 dout("early_kick_flushing_caps mds%d\n", session->s_mds); 2374 2375 spin_lock(&mdsc->cap_dirty_lock); 2376 oldest_flush_tid = __get_oldest_flush_tid(mdsc); 2377 spin_unlock(&mdsc->cap_dirty_lock); 2378 2379 list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { 2380 spin_lock(&ci->i_ceph_lock); 2381 cap = ci->i_auth_cap; 2382 if (!(cap && cap->session == session)) { 2383 pr_err("%p auth cap %p not mds%d ???\n", 2384 &ci->vfs_inode, cap, session->s_mds); 2385 spin_unlock(&ci->i_ceph_lock); 2386 continue; 2387 } 2388 2389 2390 /* 2391 * if flushing caps were revoked, we re-send the cap flush 2392 * during the client reconnect stage. This guarantees the MDS processes 2393 * the cap flush message before issuing the flushing caps to 2394 * another client.
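Flushes whose caps were not revoked are only flagged with CEPH_I_KICK_FLUSH here; ceph_kick_flushing_caps() re-sends them later.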
2395 */ 2396 if ((cap->issued & ci->i_flushing_caps) != 2397 ci->i_flushing_caps) { 2398 ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; 2399 __kick_flushing_caps(mdsc, session, ci, 2400 oldest_flush_tid); 2401 } else { 2402 ci->i_ceph_flags |= CEPH_I_KICK_FLUSH; 2403 } 2404 2405 spin_unlock(&ci->i_ceph_lock); 2406 } 2407 } 2408 2409 void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, 2410 struct ceph_mds_session *session) 2411 { 2412 struct ceph_inode_info *ci; 2413 struct ceph_cap *cap; 2414 u64 oldest_flush_tid; 2415 2416 dout("kick_flushing_caps mds%d\n", session->s_mds); 2417 2418 spin_lock(&mdsc->cap_dirty_lock); 2419 oldest_flush_tid = __get_oldest_flush_tid(mdsc); 2420 spin_unlock(&mdsc->cap_dirty_lock); 2421 2422 list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { 2423 spin_lock(&ci->i_ceph_lock); 2424 cap = ci->i_auth_cap; 2425 if (!(cap && cap->session == session)) { 2426 pr_err("%p auth cap %p not mds%d ???\n", 2427 &ci->vfs_inode, cap, session->s_mds); 2428 spin_unlock(&ci->i_ceph_lock); 2429 continue; 2430 } 2431 if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) { 2432 ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; 2433 __kick_flushing_caps(mdsc, session, ci, 2434 oldest_flush_tid); 2435 } 2436 spin_unlock(&ci->i_ceph_lock); 2437 } 2438 } 2439 2440 static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, 2441 struct ceph_mds_session *session, 2442 struct inode *inode) 2443 __releases(ci->i_ceph_lock) 2444 { 2445 struct ceph_inode_info *ci = ceph_inode(inode); 2446 struct ceph_cap *cap; 2447 2448 cap = ci->i_auth_cap; 2449 dout("kick_flushing_inode_caps %p flushing %s\n", inode, 2450 ceph_cap_string(ci->i_flushing_caps)); 2451 2452 if (!list_empty(&ci->i_cap_flush_list)) { 2453 u64 oldest_flush_tid; 2454 spin_lock(&mdsc->cap_dirty_lock); 2455 list_move_tail(&ci->i_flushing_item, 2456 &cap->session->s_cap_flushing); 2457 oldest_flush_tid = __get_oldest_flush_tid(mdsc); 2458 spin_unlock(&mdsc->cap_dirty_lock); 2459 2460 ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; 2461 __kick_flushing_caps(mdsc, session, ci, oldest_flush_tid); 2462 spin_unlock(&ci->i_ceph_lock); 2463 } else { 2464 spin_unlock(&ci->i_ceph_lock); 2465 } 2466 } 2467 2468 2469 /* 2470 * Take references to capabilities we hold, so that we don't release 2471 * them to the MDS prematurely. 2472 * 2473 * Protected by i_ceph_lock. 2474 */ 2475 static void __take_cap_refs(struct ceph_inode_info *ci, int got, 2476 bool snap_rwsem_locked) 2477 { 2478 if (got & CEPH_CAP_PIN) 2479 ci->i_pin_ref++; 2480 if (got & CEPH_CAP_FILE_RD) 2481 ci->i_rd_ref++; 2482 if (got & CEPH_CAP_FILE_CACHE) 2483 ci->i_rdcache_ref++; 2484 if (got & CEPH_CAP_FILE_WR) { 2485 if (ci->i_wr_ref == 0 && !ci->i_head_snapc) { 2486 BUG_ON(!snap_rwsem_locked); 2487 ci->i_head_snapc = ceph_get_snap_context( 2488 ci->i_snap_realm->cached_context); 2489 } 2490 ci->i_wr_ref++; 2491 } 2492 if (got & CEPH_CAP_FILE_BUFFER) { 2493 if (ci->i_wb_ref == 0) 2494 ihold(&ci->vfs_inode); 2495 ci->i_wb_ref++; 2496 dout("__take_cap_refs %p wb %d -> %d (?)\n", 2497 &ci->vfs_inode, ci->i_wb_ref-1, ci->i_wb_ref); 2498 } 2499 } 2500 2501 /* 2502 * Try to grab cap references. Specify those refs we @want, and the 2503 * minimal set we @need. Also include the larger offset we are writing 2504 * to (when applicable), and check against max_size here as well. 2505 * Note that caller is responsible for ensuring max_size increases are 2506 * requested from the MDS. 
2507 */ 2508 static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, 2509 loff_t endoff, bool nonblock, int *got, int *err) 2510 { 2511 struct inode *inode = &ci->vfs_inode; 2512 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; 2513 int ret = 0; 2514 int have, implemented; 2515 int file_wanted; 2516 bool snap_rwsem_locked = false; 2517 2518 dout("get_cap_refs %p need %s want %s\n", inode, 2519 ceph_cap_string(need), ceph_cap_string(want)); 2520 2521 again: 2522 spin_lock(&ci->i_ceph_lock); 2523 2524 /* make sure file is actually open */ 2525 file_wanted = __ceph_caps_file_wanted(ci); 2526 if ((file_wanted & need) != need) { 2527 dout("try_get_cap_refs need %s file_wanted %s, EBADF\n", 2528 ceph_cap_string(need), ceph_cap_string(file_wanted)); 2529 *err = -EBADF; 2530 ret = 1; 2531 goto out_unlock; 2532 } 2533 2534 /* finish pending truncate */ 2535 while (ci->i_truncate_pending) { 2536 spin_unlock(&ci->i_ceph_lock); 2537 if (snap_rwsem_locked) { 2538 up_read(&mdsc->snap_rwsem); 2539 snap_rwsem_locked = false; 2540 } 2541 __ceph_do_pending_vmtruncate(inode); 2542 spin_lock(&ci->i_ceph_lock); 2543 } 2544 2545 have = __ceph_caps_issued(ci, &implemented); 2546 2547 if (have & need & CEPH_CAP_FILE_WR) { 2548 if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) { 2549 dout("get_cap_refs %p endoff %llu > maxsize %llu\n", 2550 inode, endoff, ci->i_max_size); 2551 if (endoff > ci->i_requested_max_size) { 2552 *err = -EAGAIN; 2553 ret = 1; 2554 } 2555 goto out_unlock; 2556 } 2557 /* 2558 * If a sync write is in progress, we must wait, so that we 2559 * can get a final snapshot value for size+mtime. 2560 */ 2561 if (__ceph_have_pending_cap_snap(ci)) { 2562 dout("get_cap_refs %p cap_snap_pending\n", inode); 2563 goto out_unlock; 2564 } 2565 } 2566 2567 if ((have & need) == need) { 2568 /* 2569 * Look at (implemented & ~have & not) so that we keep waiting 2570 * on transition from wanted -> needed caps. This is needed 2571 * for WRBUFFER|WR -> WR to avoid a new WR sync write from 2572 * going before a prior buffered writeback happens. 
2573 */ 2574 int not = want & ~(have & need); 2575 int revoking = implemented & ~have; 2576 dout("get_cap_refs %p have %s but not %s (revoking %s)\n", 2577 inode, ceph_cap_string(have), ceph_cap_string(not), 2578 ceph_cap_string(revoking)); 2579 if ((revoking & not) == 0) { 2580 if (!snap_rwsem_locked && 2581 !ci->i_head_snapc && 2582 (need & CEPH_CAP_FILE_WR)) { 2583 if (!down_read_trylock(&mdsc->snap_rwsem)) { 2584 /* 2585 * we can not call down_read() when 2586 * task isn't in TASK_RUNNING state 2587 */ 2588 if (nonblock) { 2589 *err = -EAGAIN; 2590 ret = 1; 2591 goto out_unlock; 2592 } 2593 2594 spin_unlock(&ci->i_ceph_lock); 2595 down_read(&mdsc->snap_rwsem); 2596 snap_rwsem_locked = true; 2597 goto again; 2598 } 2599 snap_rwsem_locked = true; 2600 } 2601 *got = need | (have & want); 2602 if ((need & CEPH_CAP_FILE_RD) && 2603 !(*got & CEPH_CAP_FILE_CACHE)) 2604 ceph_disable_fscache_readpage(ci); 2605 __take_cap_refs(ci, *got, true); 2606 ret = 1; 2607 } 2608 } else { 2609 int session_readonly = false; 2610 if ((need & CEPH_CAP_FILE_WR) && ci->i_auth_cap) { 2611 struct ceph_mds_session *s = ci->i_auth_cap->session; 2612 spin_lock(&s->s_cap_lock); 2613 session_readonly = s->s_readonly; 2614 spin_unlock(&s->s_cap_lock); 2615 } 2616 if (session_readonly) { 2617 dout("get_cap_refs %p needed %s but mds%d readonly\n", 2618 inode, ceph_cap_string(need), ci->i_auth_cap->mds); 2619 *err = -EROFS; 2620 ret = 1; 2621 goto out_unlock; 2622 } 2623 2624 if (ci->i_ceph_flags & CEPH_I_CAP_DROPPED) { 2625 int mds_wanted; 2626 if (READ_ONCE(mdsc->fsc->mount_state) == 2627 CEPH_MOUNT_SHUTDOWN) { 2628 dout("get_cap_refs %p forced umount\n", inode); 2629 *err = -EIO; 2630 ret = 1; 2631 goto out_unlock; 2632 } 2633 mds_wanted = __ceph_caps_mds_wanted(ci, false); 2634 if (need & ~(mds_wanted & need)) { 2635 dout("get_cap_refs %p caps were dropped" 2636 " (session killed?)\n", inode); 2637 *err = -ESTALE; 2638 ret = 1; 2639 goto out_unlock; 2640 } 2641 if (!(file_wanted & ~mds_wanted)) 2642 ci->i_ceph_flags &= ~CEPH_I_CAP_DROPPED; 2643 } 2644 2645 dout("get_cap_refs %p have %s needed %s\n", inode, 2646 ceph_cap_string(have), ceph_cap_string(need)); 2647 } 2648 out_unlock: 2649 spin_unlock(&ci->i_ceph_lock); 2650 if (snap_rwsem_locked) 2651 up_read(&mdsc->snap_rwsem); 2652 2653 dout("get_cap_refs %p ret %d got %s\n", inode, 2654 ret, ceph_cap_string(*got)); 2655 return ret; 2656 } 2657 2658 /* 2659 * Check the offset we are writing up to against our current 2660 * max_size. If necessary, tell the MDS we want to write to 2661 * a larger offset. 2662 */ 2663 static void check_max_size(struct inode *inode, loff_t endoff) 2664 { 2665 struct ceph_inode_info *ci = ceph_inode(inode); 2666 int check = 0; 2667 2668 /* do we need to explicitly request a larger max_size? 
*/ 2669 spin_lock(&ci->i_ceph_lock); 2670 if (endoff >= ci->i_max_size && endoff > ci->i_wanted_max_size) { 2671 dout("write %p at large endoff %llu, req max_size\n", 2672 inode, endoff); 2673 ci->i_wanted_max_size = endoff; 2674 } 2675 /* duplicate ceph_check_caps()'s logic */ 2676 if (ci->i_auth_cap && 2677 (ci->i_auth_cap->issued & CEPH_CAP_FILE_WR) && 2678 ci->i_wanted_max_size > ci->i_max_size && 2679 ci->i_wanted_max_size > ci->i_requested_max_size) 2680 check = 1; 2681 spin_unlock(&ci->i_ceph_lock); 2682 if (check) 2683 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); 2684 } 2685 2686 int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got) 2687 { 2688 int ret, err = 0; 2689 2690 BUG_ON(need & ~CEPH_CAP_FILE_RD); 2691 BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)); 2692 ret = ceph_pool_perm_check(ci, need); 2693 if (ret < 0) 2694 return ret; 2695 2696 ret = try_get_cap_refs(ci, need, want, 0, true, got, &err); 2697 if (ret) { 2698 if (err == -EAGAIN) { 2699 ret = 0; 2700 } else if (err < 0) { 2701 ret = err; 2702 } 2703 } 2704 return ret; 2705 } 2706 2707 /* 2708 * Wait for caps, and take cap references. If we can't get a WR cap 2709 * due to a small max_size, make sure we check_max_size (and possibly 2710 * ask the mds) so we don't get hung up indefinitely. 2711 */ 2712 int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, 2713 loff_t endoff, int *got, struct page **pinned_page) 2714 { 2715 int _got, ret, err = 0; 2716 2717 ret = ceph_pool_perm_check(ci, need); 2718 if (ret < 0) 2719 return ret; 2720 2721 while (true) { 2722 if (endoff > 0) 2723 check_max_size(&ci->vfs_inode, endoff); 2724 2725 err = 0; 2726 _got = 0; 2727 ret = try_get_cap_refs(ci, need, want, endoff, 2728 false, &_got, &err); 2729 if (ret) { 2730 if (err == -EAGAIN) 2731 continue; 2732 if (err < 0) 2733 ret = err; 2734 } else { 2735 DEFINE_WAIT_FUNC(wait, woken_wake_function); 2736 add_wait_queue(&ci->i_cap_wq, &wait); 2737 2738 while (!try_get_cap_refs(ci, need, want, endoff, 2739 true, &_got, &err)) { 2740 if (signal_pending(current)) { 2741 ret = -ERESTARTSYS; 2742 break; 2743 } 2744 wait_woken(&wait, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT); 2745 } 2746 2747 remove_wait_queue(&ci->i_cap_wq, &wait); 2748 2749 if (err == -EAGAIN) 2750 continue; 2751 if (err < 0) 2752 ret = err; 2753 } 2754 if (ret < 0) { 2755 if (err == -ESTALE) { 2756 /* session was killed, try renew caps */ 2757 ret = ceph_renew_caps(&ci->vfs_inode); 2758 if (ret == 0) 2759 continue; 2760 } 2761 return ret; 2762 } 2763 2764 if (ci->i_inline_version != CEPH_INLINE_NONE && 2765 (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && 2766 i_size_read(&ci->vfs_inode) > 0) { 2767 struct page *page = 2768 find_get_page(ci->vfs_inode.i_mapping, 0); 2769 if (page) { 2770 if (PageUptodate(page)) { 2771 *pinned_page = page; 2772 break; 2773 } 2774 put_page(page); 2775 } 2776 /* 2777 * drop cap refs first because getattr while 2778 * holding * caps refs can cause deadlock. 2779 */ 2780 ceph_put_cap_refs(ci, _got); 2781 _got = 0; 2782 2783 /* 2784 * getattr request will bring inline data into 2785 * page cache 2786 */ 2787 ret = __ceph_do_getattr(&ci->vfs_inode, NULL, 2788 CEPH_STAT_CAP_INLINE_DATA, 2789 true); 2790 if (ret < 0) 2791 return ret; 2792 continue; 2793 } 2794 break; 2795 } 2796 2797 if ((_got & CEPH_CAP_FILE_RD) && (_got & CEPH_CAP_FILE_CACHE)) 2798 ceph_fscache_revalidate_cookie(ci); 2799 2800 *got = _got; 2801 return 0; 2802 } 2803 2804 /* 2805 * Take cap refs. 
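This simply bumps the in-core per-cap reference counts (via __take_cap_refs) under i_ceph_lock.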
Caller must already know we hold at least one ref 2806 * on the caps in question or we don't know this is safe. 2807 */ 2808 void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps) 2809 { 2810 spin_lock(&ci->i_ceph_lock); 2811 __take_cap_refs(ci, caps, false); 2812 spin_unlock(&ci->i_ceph_lock); 2813 } 2814 2815 2816 /* 2817 * drop cap_snap that is not associated with any snapshot. 2818 * we don't need to send FLUSHSNAP message for it. 2819 */ 2820 static int ceph_try_drop_cap_snap(struct ceph_inode_info *ci, 2821 struct ceph_cap_snap *capsnap) 2822 { 2823 if (!capsnap->need_flush && 2824 !capsnap->writing && !capsnap->dirty_pages) { 2825 dout("dropping cap_snap %p follows %llu\n", 2826 capsnap, capsnap->follows); 2827 BUG_ON(capsnap->cap_flush.tid > 0); 2828 ceph_put_snap_context(capsnap->context); 2829 if (!list_is_last(&capsnap->ci_item, &ci->i_cap_snaps)) 2830 ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS; 2831 2832 list_del(&capsnap->ci_item); 2833 ceph_put_cap_snap(capsnap); 2834 return 1; 2835 } 2836 return 0; 2837 } 2838 2839 /* 2840 * Release cap refs. 2841 * 2842 * If we released the last ref on any given cap, call ceph_check_caps 2843 * to release (or schedule a release). 2844 * 2845 * If we are releasing a WR cap (from a sync write), finalize any affected 2846 * cap_snap, and wake up any waiters. 2847 */ 2848 void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) 2849 { 2850 struct inode *inode = &ci->vfs_inode; 2851 int last = 0, put = 0, flushsnaps = 0, wake = 0; 2852 2853 spin_lock(&ci->i_ceph_lock); 2854 if (had & CEPH_CAP_PIN) 2855 --ci->i_pin_ref; 2856 if (had & CEPH_CAP_FILE_RD) 2857 if (--ci->i_rd_ref == 0) 2858 last++; 2859 if (had & CEPH_CAP_FILE_CACHE) 2860 if (--ci->i_rdcache_ref == 0) 2861 last++; 2862 if (had & CEPH_CAP_FILE_BUFFER) { 2863 if (--ci->i_wb_ref == 0) { 2864 last++; 2865 put++; 2866 } 2867 dout("put_cap_refs %p wb %d -> %d (?)\n", 2868 inode, ci->i_wb_ref+1, ci->i_wb_ref); 2869 } 2870 if (had & CEPH_CAP_FILE_WR) 2871 if (--ci->i_wr_ref == 0) { 2872 last++; 2873 if (__ceph_have_pending_cap_snap(ci)) { 2874 struct ceph_cap_snap *capsnap = 2875 list_last_entry(&ci->i_cap_snaps, 2876 struct ceph_cap_snap, 2877 ci_item); 2878 capsnap->writing = 0; 2879 if (ceph_try_drop_cap_snap(ci, capsnap)) 2880 put++; 2881 else if (__ceph_finish_cap_snap(ci, capsnap)) 2882 flushsnaps = 1; 2883 wake = 1; 2884 } 2885 if (ci->i_wrbuffer_ref_head == 0 && 2886 ci->i_dirty_caps == 0 && 2887 ci->i_flushing_caps == 0) { 2888 BUG_ON(!ci->i_head_snapc); 2889 ceph_put_snap_context(ci->i_head_snapc); 2890 ci->i_head_snapc = NULL; 2891 } 2892 /* see comment in __ceph_remove_cap() */ 2893 if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) 2894 drop_inode_snap_realm(ci); 2895 } 2896 spin_unlock(&ci->i_ceph_lock); 2897 2898 dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had), 2899 last ? " last" : "", put ? " put" : ""); 2900 2901 if (last && !flushsnaps) 2902 ceph_check_caps(ci, 0, NULL); 2903 else if (flushsnaps) 2904 ceph_flush_snaps(ci, NULL); 2905 if (wake) 2906 wake_up_all(&ci->i_cap_wq); 2907 while (put-- > 0) 2908 iput(inode); 2909 } 2910 2911 /* 2912 * Release @nr WRBUFFER refs on dirty pages for the given @snapc snap 2913 * context. Adjust per-snap dirty page accounting as appropriate. 2914 * Once all dirty data for a cap_snap is flushed, flush snapped file 2915 * metadata back to the MDS. If we dropped the last ref, call 2916 * ceph_check_caps. 
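If that was the last wrbuffer reference, an inode reference is dropped as well (iput).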
2917 */ 2918 void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, 2919 struct ceph_snap_context *snapc) 2920 { 2921 struct inode *inode = &ci->vfs_inode; 2922 struct ceph_cap_snap *capsnap = NULL; 2923 int put = 0; 2924 bool last = false; 2925 bool found = false; 2926 bool flush_snaps = false; 2927 bool complete_capsnap = false; 2928 2929 spin_lock(&ci->i_ceph_lock); 2930 ci->i_wrbuffer_ref -= nr; 2931 if (ci->i_wrbuffer_ref == 0) { 2932 last = true; 2933 put++; 2934 } 2935 2936 if (ci->i_head_snapc == snapc) { 2937 ci->i_wrbuffer_ref_head -= nr; 2938 if (ci->i_wrbuffer_ref_head == 0 && 2939 ci->i_wr_ref == 0 && 2940 ci->i_dirty_caps == 0 && 2941 ci->i_flushing_caps == 0) { 2942 BUG_ON(!ci->i_head_snapc); 2943 ceph_put_snap_context(ci->i_head_snapc); 2944 ci->i_head_snapc = NULL; 2945 } 2946 dout("put_wrbuffer_cap_refs on %p head %d/%d -> %d/%d %s\n", 2947 inode, 2948 ci->i_wrbuffer_ref+nr, ci->i_wrbuffer_ref_head+nr, 2949 ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head, 2950 last ? " LAST" : ""); 2951 } else { 2952 list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { 2953 if (capsnap->context == snapc) { 2954 found = true; 2955 break; 2956 } 2957 } 2958 BUG_ON(!found); 2959 capsnap->dirty_pages -= nr; 2960 if (capsnap->dirty_pages == 0) { 2961 complete_capsnap = true; 2962 if (!capsnap->writing) { 2963 if (ceph_try_drop_cap_snap(ci, capsnap)) { 2964 put++; 2965 } else { 2966 ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS; 2967 flush_snaps = true; 2968 } 2969 } 2970 } 2971 dout("put_wrbuffer_cap_refs on %p cap_snap %p " 2972 " snap %lld %d/%d -> %d/%d %s%s\n", 2973 inode, capsnap, capsnap->context->seq, 2974 ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr, 2975 ci->i_wrbuffer_ref, capsnap->dirty_pages, 2976 last ? " (wrbuffer last)" : "", 2977 complete_capsnap ? " (complete capsnap)" : ""); 2978 } 2979 2980 spin_unlock(&ci->i_ceph_lock); 2981 2982 if (last) { 2983 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); 2984 } else if (flush_snaps) { 2985 ceph_flush_snaps(ci, NULL); 2986 } 2987 if (complete_capsnap) 2988 wake_up_all(&ci->i_cap_wq); 2989 while (put-- > 0) 2990 iput(inode); 2991 } 2992 2993 /* 2994 * Invalidate unlinked inode's aliases, so we can drop the inode ASAP. 2995 */ 2996 static void invalidate_aliases(struct inode *inode) 2997 { 2998 struct dentry *dn, *prev = NULL; 2999 3000 dout("invalidate_aliases inode %p\n", inode); 3001 d_prune_aliases(inode); 3002 /* 3003 * For non-directory inode, d_find_alias() only returns 3004 * hashed dentry. After calling d_invalidate(), the 3005 * dentry becomes unhashed. 3006 * 3007 * For directory inode, d_find_alias() can return 3008 * unhashed dentry. But directory inode should have 3009 * one alias at most. 3010 */ 3011 while ((dn = d_find_alias(inode))) { 3012 if (dn == prev) { 3013 dput(dn); 3014 break; 3015 } 3016 d_invalidate(dn); 3017 if (prev) 3018 dput(prev); 3019 prev = dn; 3020 } 3021 if (prev) 3022 dput(prev); 3023 } 3024 3025 /* 3026 * Handle a cap GRANT message from the MDS. (Note that a GRANT may 3027 * actually be a revocation if it specifies a smaller cap set.) 3028 * 3029 * caller holds s_mutex and i_ceph_lock, we drop both. 
3030 */ 3031 static void handle_cap_grant(struct ceph_mds_client *mdsc, 3032 struct inode *inode, struct ceph_mds_caps *grant, 3033 struct ceph_string **pns, u64 inline_version, 3034 void *inline_data, u32 inline_len, 3035 struct ceph_buffer *xattr_buf, 3036 struct ceph_mds_session *session, 3037 struct ceph_cap *cap, int issued) 3038 __releases(ci->i_ceph_lock) 3039 __releases(mdsc->snap_rwsem) 3040 { 3041 struct ceph_inode_info *ci = ceph_inode(inode); 3042 int mds = session->s_mds; 3043 int seq = le32_to_cpu(grant->seq); 3044 int newcaps = le32_to_cpu(grant->caps); 3045 int used, wanted, dirty; 3046 u64 size = le64_to_cpu(grant->size); 3047 u64 max_size = le64_to_cpu(grant->max_size); 3048 struct timespec mtime, atime, ctime; 3049 int check_caps = 0; 3050 bool wake = false; 3051 bool writeback = false; 3052 bool queue_trunc = false; 3053 bool queue_invalidate = false; 3054 bool deleted_inode = false; 3055 bool fill_inline = false; 3056 3057 dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n", 3058 inode, cap, mds, seq, ceph_cap_string(newcaps)); 3059 dout(" size %llu max_size %llu, i_size %llu\n", size, max_size, 3060 inode->i_size); 3061 3062 3063 /* 3064 * auth mds of the inode changed. we received the cap export message, 3065 * but still haven't received the cap import message. handle_cap_export 3066 * updated the new auth MDS' cap. 3067 * 3068 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message 3069 * that was sent before the cap import message. So don't remove caps. 3070 */ 3071 if (ceph_seq_cmp(seq, cap->seq) <= 0) { 3072 WARN_ON(cap != ci->i_auth_cap); 3073 WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id)); 3074 seq = cap->seq; 3075 newcaps |= cap->issued; 3076 } 3077 3078 /* 3079 * If CACHE is being revoked, and we have no dirty buffers, 3080 * try to invalidate (once). (If there are dirty buffers, we 3081 * will invalidate _after_ writeback.) 3082 */ 3083 if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */ 3084 ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && 3085 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && 3086 !(ci->i_wrbuffer_ref || ci->i_wb_ref)) { 3087 if (try_nonblocking_invalidate(inode)) { 3088 /* there were locked pages.. invalidate later 3089 in a separate thread. 
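Only queue the async invalidate work once per rdcache generation.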
*/ 3090 if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { 3091 queue_invalidate = true; 3092 ci->i_rdcache_revoking = ci->i_rdcache_gen; 3093 } 3094 } 3095 } 3096 3097 /* side effects now are allowed */ 3098 cap->cap_gen = session->s_cap_gen; 3099 cap->seq = seq; 3100 3101 __check_cap_issue(ci, cap, newcaps); 3102 3103 if ((newcaps & CEPH_CAP_AUTH_SHARED) && 3104 (issued & CEPH_CAP_AUTH_EXCL) == 0) { 3105 inode->i_mode = le32_to_cpu(grant->mode); 3106 inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid)); 3107 inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid)); 3108 dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode, 3109 from_kuid(&init_user_ns, inode->i_uid), 3110 from_kgid(&init_user_ns, inode->i_gid)); 3111 } 3112 3113 if ((newcaps & CEPH_CAP_AUTH_SHARED) && 3114 (issued & CEPH_CAP_LINK_EXCL) == 0) { 3115 set_nlink(inode, le32_to_cpu(grant->nlink)); 3116 if (inode->i_nlink == 0 && 3117 (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL))) 3118 deleted_inode = true; 3119 } 3120 3121 if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) { 3122 int len = le32_to_cpu(grant->xattr_len); 3123 u64 version = le64_to_cpu(grant->xattr_version); 3124 3125 if (version > ci->i_xattrs.version) { 3126 dout(" got new xattrs v%llu on %p len %d\n", 3127 version, inode, len); 3128 if (ci->i_xattrs.blob) 3129 ceph_buffer_put(ci->i_xattrs.blob); 3130 ci->i_xattrs.blob = ceph_buffer_get(xattr_buf); 3131 ci->i_xattrs.version = version; 3132 ceph_forget_all_cached_acls(inode); 3133 } 3134 } 3135 3136 if (newcaps & CEPH_CAP_ANY_RD) { 3137 /* ctime/mtime/atime? */ 3138 ceph_decode_timespec(&mtime, &grant->mtime); 3139 ceph_decode_timespec(&atime, &grant->atime); 3140 ceph_decode_timespec(&ctime, &grant->ctime); 3141 ceph_fill_file_time(inode, issued, 3142 le32_to_cpu(grant->time_warp_seq), 3143 &ctime, &mtime, &atime); 3144 } 3145 3146 if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) { 3147 /* file layout may have changed */ 3148 s64 old_pool = ci->i_layout.pool_id; 3149 struct ceph_string *old_ns; 3150 3151 ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout); 3152 old_ns = rcu_dereference_protected(ci->i_layout.pool_ns, 3153 lockdep_is_held(&ci->i_ceph_lock)); 3154 rcu_assign_pointer(ci->i_layout.pool_ns, *pns); 3155 3156 if (ci->i_layout.pool_id != old_pool || *pns != old_ns) 3157 ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; 3158 3159 *pns = old_ns; 3160 3161 /* size/truncate_seq? 
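let ceph_fill_file_size() apply them and tell us whether a vmtruncate needs to be queued.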
*/ 3162 queue_trunc = ceph_fill_file_size(inode, issued, 3163 le32_to_cpu(grant->truncate_seq), 3164 le64_to_cpu(grant->truncate_size), 3165 size); 3166 } 3167 3168 if (ci->i_auth_cap == cap && (newcaps & CEPH_CAP_ANY_FILE_WR)) { 3169 if (max_size != ci->i_max_size) { 3170 dout("max_size %lld -> %llu\n", 3171 ci->i_max_size, max_size); 3172 ci->i_max_size = max_size; 3173 if (max_size >= ci->i_wanted_max_size) { 3174 ci->i_wanted_max_size = 0; /* reset */ 3175 ci->i_requested_max_size = 0; 3176 } 3177 wake = true; 3178 } else if (ci->i_wanted_max_size > ci->i_max_size && 3179 ci->i_wanted_max_size > ci->i_requested_max_size) { 3180 /* CEPH_CAP_OP_IMPORT */ 3181 wake = true; 3182 } 3183 } 3184 3185 /* check cap bits */ 3186 wanted = __ceph_caps_wanted(ci); 3187 used = __ceph_caps_used(ci); 3188 dirty = __ceph_caps_dirty(ci); 3189 dout(" my wanted = %s, used = %s, dirty %s\n", 3190 ceph_cap_string(wanted), 3191 ceph_cap_string(used), 3192 ceph_cap_string(dirty)); 3193 if (wanted != le32_to_cpu(grant->wanted)) { 3194 dout("mds wanted %s -> %s\n", 3195 ceph_cap_string(le32_to_cpu(grant->wanted)), 3196 ceph_cap_string(wanted)); 3197 /* imported cap may not have correct mds_wanted */ 3198 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) 3199 check_caps = 1; 3200 } 3201 3202 /* revocation, grant, or no-op? */ 3203 if (cap->issued & ~newcaps) { 3204 int revoking = cap->issued & ~newcaps; 3205 3206 dout("revocation: %s -> %s (revoking %s)\n", 3207 ceph_cap_string(cap->issued), 3208 ceph_cap_string(newcaps), 3209 ceph_cap_string(revoking)); 3210 if (revoking & used & CEPH_CAP_FILE_BUFFER) 3211 writeback = true; /* initiate writeback; will delay ack */ 3212 else if (revoking == CEPH_CAP_FILE_CACHE && 3213 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && 3214 queue_invalidate) 3215 ; /* do nothing yet, invalidation will be queued */ 3216 else if (cap == ci->i_auth_cap) 3217 check_caps = 1; /* check auth cap only */ 3218 else 3219 check_caps = 2; /* check all caps */ 3220 cap->issued = newcaps; 3221 cap->implemented |= newcaps; 3222 } else if (cap->issued == newcaps) { 3223 dout("caps unchanged: %s -> %s\n", 3224 ceph_cap_string(cap->issued), ceph_cap_string(newcaps)); 3225 } else { 3226 dout("grant: %s -> %s\n", ceph_cap_string(cap->issued), 3227 ceph_cap_string(newcaps)); 3228 /* non-auth MDS is revoking the newly grant caps ? */ 3229 if (cap == ci->i_auth_cap && 3230 __ceph_caps_revoking_other(ci, cap, newcaps)) 3231 check_caps = 2; 3232 3233 cap->issued = newcaps; 3234 cap->implemented |= newcaps; /* add bits only, to 3235 * avoid stepping on a 3236 * pending revocation */ 3237 wake = true; 3238 } 3239 BUG_ON(cap->issued & ~cap->implemented); 3240 3241 if (inline_version > 0 && inline_version >= ci->i_inline_version) { 3242 ci->i_inline_version = inline_version; 3243 if (ci->i_inline_version != CEPH_INLINE_NONE && 3244 (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO))) 3245 fill_inline = true; 3246 } 3247 3248 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) { 3249 if (newcaps & ~issued) 3250 wake = true; 3251 kick_flushing_inode_caps(mdsc, session, inode); 3252 up_read(&mdsc->snap_rwsem); 3253 } else { 3254 spin_unlock(&ci->i_ceph_lock); 3255 } 3256 3257 if (fill_inline) 3258 ceph_fill_inline_data(inode, NULL, inline_data, inline_len); 3259 3260 if (queue_trunc) 3261 ceph_queue_vmtruncate(inode); 3262 3263 if (writeback) 3264 /* 3265 * queue inode for writeback: we can't actually call 3266 * filemap_write_and_wait, etc. from message handler 3267 * context. 
3268 */ 3269 ceph_queue_writeback(inode); 3270 if (queue_invalidate) 3271 ceph_queue_invalidate(inode); 3272 if (deleted_inode) 3273 invalidate_aliases(inode); 3274 if (wake) 3275 wake_up_all(&ci->i_cap_wq); 3276 3277 if (check_caps == 1) 3278 ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY, 3279 session); 3280 else if (check_caps == 2) 3281 ceph_check_caps(ci, CHECK_CAPS_NODELAY, session); 3282 else 3283 mutex_unlock(&session->s_mutex); 3284 } 3285 3286 /* 3287 * Handle FLUSH_ACK from MDS, indicating that metadata we sent to the 3288 * MDS has been safely committed. 3289 */ 3290 static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, 3291 struct ceph_mds_caps *m, 3292 struct ceph_mds_session *session, 3293 struct ceph_cap *cap) 3294 __releases(ci->i_ceph_lock) 3295 { 3296 struct ceph_inode_info *ci = ceph_inode(inode); 3297 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 3298 struct ceph_cap_flush *cf, *tmp_cf; 3299 LIST_HEAD(to_remove); 3300 unsigned seq = le32_to_cpu(m->seq); 3301 int dirty = le32_to_cpu(m->dirty); 3302 int cleaned = 0; 3303 bool drop = false; 3304 bool wake_ci = false; 3305 bool wake_mdsc = false; 3306 3307 list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) { 3308 if (cf->tid == flush_tid) 3309 cleaned = cf->caps; 3310 if (cf->caps == 0) /* capsnap */ 3311 continue; 3312 if (cf->tid <= flush_tid) { 3313 if (__finish_cap_flush(NULL, ci, cf)) 3314 wake_ci = true; 3315 list_add_tail(&cf->i_list, &to_remove); 3316 } else { 3317 cleaned &= ~cf->caps; 3318 if (!cleaned) 3319 break; 3320 } 3321 } 3322 3323 dout("handle_cap_flush_ack inode %p mds%d seq %d on %s cleaned %s," 3324 " flushing %s -> %s\n", 3325 inode, session->s_mds, seq, ceph_cap_string(dirty), 3326 ceph_cap_string(cleaned), ceph_cap_string(ci->i_flushing_caps), 3327 ceph_cap_string(ci->i_flushing_caps & ~cleaned)); 3328 3329 if (list_empty(&to_remove) && !cleaned) 3330 goto out; 3331 3332 ci->i_flushing_caps &= ~cleaned; 3333 3334 spin_lock(&mdsc->cap_dirty_lock); 3335 3336 list_for_each_entry(cf, &to_remove, i_list) { 3337 if (__finish_cap_flush(mdsc, NULL, cf)) 3338 wake_mdsc = true; 3339 } 3340 3341 if (ci->i_flushing_caps == 0) { 3342 if (list_empty(&ci->i_cap_flush_list)) { 3343 list_del_init(&ci->i_flushing_item); 3344 if (!list_empty(&session->s_cap_flushing)) { 3345 dout(" mds%d still flushing cap on %p\n", 3346 session->s_mds, 3347 &list_first_entry(&session->s_cap_flushing, 3348 struct ceph_inode_info, 3349 i_flushing_item)->vfs_inode); 3350 } 3351 } 3352 mdsc->num_cap_flushing--; 3353 dout(" inode %p now !flushing\n", inode); 3354 3355 if (ci->i_dirty_caps == 0) { 3356 dout(" inode %p now clean\n", inode); 3357 BUG_ON(!list_empty(&ci->i_dirty_item)); 3358 drop = true; 3359 if (ci->i_wr_ref == 0 && 3360 ci->i_wrbuffer_ref_head == 0) { 3361 BUG_ON(!ci->i_head_snapc); 3362 ceph_put_snap_context(ci->i_head_snapc); 3363 ci->i_head_snapc = NULL; 3364 } 3365 } else { 3366 BUG_ON(list_empty(&ci->i_dirty_item)); 3367 } 3368 } 3369 spin_unlock(&mdsc->cap_dirty_lock); 3370 3371 out: 3372 spin_unlock(&ci->i_ceph_lock); 3373 3374 while (!list_empty(&to_remove)) { 3375 cf = list_first_entry(&to_remove, 3376 struct ceph_cap_flush, i_list); 3377 list_del(&cf->i_list); 3378 ceph_free_cap_flush(cf); 3379 } 3380 3381 if (wake_ci) 3382 wake_up_all(&ci->i_cap_wq); 3383 if (wake_mdsc) 3384 wake_up_all(&mdsc->cap_flushing_wq); 3385 if (drop) 3386 iput(inode); 3387 } 3388 3389 /* 3390 * Handle FLUSHSNAP_ACK. 
MDS has flushed snap data to disk and we can 3391 * throw away our cap_snap. 3392 * 3393 * Caller hold s_mutex. 3394 */ 3395 static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, 3396 struct ceph_mds_caps *m, 3397 struct ceph_mds_session *session) 3398 { 3399 struct ceph_inode_info *ci = ceph_inode(inode); 3400 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 3401 u64 follows = le64_to_cpu(m->snap_follows); 3402 struct ceph_cap_snap *capsnap; 3403 bool flushed = false; 3404 bool wake_ci = false; 3405 bool wake_mdsc = false; 3406 3407 dout("handle_cap_flushsnap_ack inode %p ci %p mds%d follows %lld\n", 3408 inode, ci, session->s_mds, follows); 3409 3410 spin_lock(&ci->i_ceph_lock); 3411 list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { 3412 if (capsnap->follows == follows) { 3413 if (capsnap->cap_flush.tid != flush_tid) { 3414 dout(" cap_snap %p follows %lld tid %lld !=" 3415 " %lld\n", capsnap, follows, 3416 flush_tid, capsnap->cap_flush.tid); 3417 break; 3418 } 3419 flushed = true; 3420 break; 3421 } else { 3422 dout(" skipping cap_snap %p follows %lld\n", 3423 capsnap, capsnap->follows); 3424 } 3425 } 3426 if (flushed) { 3427 WARN_ON(capsnap->dirty_pages || capsnap->writing); 3428 dout(" removing %p cap_snap %p follows %lld\n", 3429 inode, capsnap, follows); 3430 list_del(&capsnap->ci_item); 3431 if (__finish_cap_flush(NULL, ci, &capsnap->cap_flush)) 3432 wake_ci = true; 3433 3434 spin_lock(&mdsc->cap_dirty_lock); 3435 3436 if (list_empty(&ci->i_cap_flush_list)) 3437 list_del_init(&ci->i_flushing_item); 3438 3439 if (__finish_cap_flush(mdsc, NULL, &capsnap->cap_flush)) 3440 wake_mdsc = true; 3441 3442 spin_unlock(&mdsc->cap_dirty_lock); 3443 } 3444 spin_unlock(&ci->i_ceph_lock); 3445 if (flushed) { 3446 ceph_put_snap_context(capsnap->context); 3447 ceph_put_cap_snap(capsnap); 3448 if (wake_ci) 3449 wake_up_all(&ci->i_cap_wq); 3450 if (wake_mdsc) 3451 wake_up_all(&mdsc->cap_flushing_wq); 3452 iput(inode); 3453 } 3454 } 3455 3456 /* 3457 * Handle TRUNC from MDS, indicating file truncation. 3458 * 3459 * caller hold s_mutex. 3460 */ 3461 static void handle_cap_trunc(struct inode *inode, 3462 struct ceph_mds_caps *trunc, 3463 struct ceph_mds_session *session) 3464 __releases(ci->i_ceph_lock) 3465 { 3466 struct ceph_inode_info *ci = ceph_inode(inode); 3467 int mds = session->s_mds; 3468 int seq = le32_to_cpu(trunc->seq); 3469 u32 truncate_seq = le32_to_cpu(trunc->truncate_seq); 3470 u64 truncate_size = le64_to_cpu(trunc->truncate_size); 3471 u64 size = le64_to_cpu(trunc->size); 3472 int implemented = 0; 3473 int dirty = __ceph_caps_dirty(ci); 3474 int issued = __ceph_caps_issued(ceph_inode(inode), &implemented); 3475 int queue_trunc = 0; 3476 3477 issued |= implemented | dirty; 3478 3479 dout("handle_cap_trunc inode %p mds%d seq %d to %lld seq %d\n", 3480 inode, mds, seq, truncate_size, truncate_seq); 3481 queue_trunc = ceph_fill_file_size(inode, issued, 3482 truncate_seq, truncate_size, size); 3483 spin_unlock(&ci->i_ceph_lock); 3484 3485 if (queue_trunc) 3486 ceph_queue_vmtruncate(inode); 3487 } 3488 3489 /* 3490 * Handle EXPORT from MDS. Cap is being migrated _from_ this mds to a 3491 * different one. If we are the most recent migration we've seen (as 3492 * indicated by mseq), make note of the migrating cap bits for the 3493 * duration (until we see the corresponding IMPORT). 
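If we already have a cap from the target MDS, the migrating cap bits are folded into it; otherwise a placeholder cap is added for the target so nothing is lost in the interim.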
3494 * 3495 * caller holds s_mutex 3496 */ 3497 static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, 3498 struct ceph_mds_cap_peer *ph, 3499 struct ceph_mds_session *session) 3500 { 3501 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; 3502 struct ceph_mds_session *tsession = NULL; 3503 struct ceph_cap *cap, *tcap, *new_cap = NULL; 3504 struct ceph_inode_info *ci = ceph_inode(inode); 3505 u64 t_cap_id; 3506 unsigned mseq = le32_to_cpu(ex->migrate_seq); 3507 unsigned t_seq, t_mseq; 3508 int target, issued; 3509 int mds = session->s_mds; 3510 3511 if (ph) { 3512 t_cap_id = le64_to_cpu(ph->cap_id); 3513 t_seq = le32_to_cpu(ph->seq); 3514 t_mseq = le32_to_cpu(ph->mseq); 3515 target = le32_to_cpu(ph->mds); 3516 } else { 3517 t_cap_id = t_seq = t_mseq = 0; 3518 target = -1; 3519 } 3520 3521 dout("handle_cap_export inode %p ci %p mds%d mseq %d target %d\n", 3522 inode, ci, mds, mseq, target); 3523 retry: 3524 spin_lock(&ci->i_ceph_lock); 3525 cap = __get_cap_for_mds(ci, mds); 3526 if (!cap || cap->cap_id != le64_to_cpu(ex->cap_id)) 3527 goto out_unlock; 3528 3529 if (target < 0) { 3530 __ceph_remove_cap(cap, false); 3531 if (!ci->i_auth_cap) 3532 ci->i_ceph_flags |= CEPH_I_CAP_DROPPED; 3533 goto out_unlock; 3534 } 3535 3536 /* 3537 * now we know we haven't received the cap import message yet 3538 * because the exported cap still exists. 3539 */ 3540 3541 issued = cap->issued; 3542 if (issued != cap->implemented) 3543 pr_err_ratelimited("handle_cap_export: issued != implemented: " 3544 "ino (%llx.%llx) mds%d seq %d mseq %d " 3545 "issued %s implemented %s\n", 3546 ceph_vinop(inode), mds, cap->seq, cap->mseq, 3547 ceph_cap_string(issued), 3548 ceph_cap_string(cap->implemented)); 3549 3550 3551 tcap = __get_cap_for_mds(ci, target); 3552 if (tcap) { 3553 /* already have caps from the target */ 3554 if (tcap->cap_id == t_cap_id && 3555 ceph_seq_cmp(tcap->seq, t_seq) < 0) { 3556 dout(" updating import cap %p mds%d\n", tcap, target); 3557 tcap->cap_id = t_cap_id; 3558 tcap->seq = t_seq - 1; 3559 tcap->issue_seq = t_seq - 1; 3560 tcap->mseq = t_mseq; 3561 tcap->issued |= issued; 3562 tcap->implemented |= issued; 3563 if (cap == ci->i_auth_cap) 3564 ci->i_auth_cap = tcap; 3565 3566 if (!list_empty(&ci->i_cap_flush_list) && 3567 ci->i_auth_cap == tcap) { 3568 spin_lock(&mdsc->cap_dirty_lock); 3569 list_move_tail(&ci->i_flushing_item, 3570 &tcap->session->s_cap_flushing); 3571 spin_unlock(&mdsc->cap_dirty_lock); 3572 } 3573 } 3574 __ceph_remove_cap(cap, false); 3575 goto out_unlock; 3576 } else if (tsession) { 3577 /* add placeholder for the export target */ 3578 int flag = (cap == ci->i_auth_cap) ?
CEPH_CAP_FLAG_AUTH : 0; 3579 tcap = new_cap; 3580 ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0, 3581 t_seq - 1, t_mseq, (u64)-1, flag, &new_cap); 3582 3583 if (!list_empty(&ci->i_cap_flush_list) && 3584 ci->i_auth_cap == tcap) { 3585 spin_lock(&mdsc->cap_dirty_lock); 3586 list_move_tail(&ci->i_flushing_item, 3587 &tcap->session->s_cap_flushing); 3588 spin_unlock(&mdsc->cap_dirty_lock); 3589 } 3590 3591 __ceph_remove_cap(cap, false); 3592 goto out_unlock; 3593 } 3594 3595 spin_unlock(&ci->i_ceph_lock); 3596 mutex_unlock(&session->s_mutex); 3597 3598 /* open target session */ 3599 tsession = ceph_mdsc_open_export_target_session(mdsc, target); 3600 if (!IS_ERR(tsession)) { 3601 if (mds > target) { 3602 mutex_lock(&session->s_mutex); 3603 mutex_lock_nested(&tsession->s_mutex, 3604 SINGLE_DEPTH_NESTING); 3605 } else { 3606 mutex_lock(&tsession->s_mutex); 3607 mutex_lock_nested(&session->s_mutex, 3608 SINGLE_DEPTH_NESTING); 3609 } 3610 new_cap = ceph_get_cap(mdsc, NULL); 3611 } else { 3612 WARN_ON(1); 3613 tsession = NULL; 3614 target = -1; 3615 } 3616 goto retry; 3617 3618 out_unlock: 3619 spin_unlock(&ci->i_ceph_lock); 3620 mutex_unlock(&session->s_mutex); 3621 if (tsession) { 3622 mutex_unlock(&tsession->s_mutex); 3623 ceph_put_mds_session(tsession); 3624 } 3625 if (new_cap) 3626 ceph_put_cap(mdsc, new_cap); 3627 } 3628 3629 /* 3630 * Handle cap IMPORT. 3631 * 3632 * caller holds s_mutex. acquires i_ceph_lock 3633 */ 3634 static void handle_cap_import(struct ceph_mds_client *mdsc, 3635 struct inode *inode, struct ceph_mds_caps *im, 3636 struct ceph_mds_cap_peer *ph, 3637 struct ceph_mds_session *session, 3638 struct ceph_cap **target_cap, int *old_issued) 3639 __acquires(ci->i_ceph_lock) 3640 { 3641 struct ceph_inode_info *ci = ceph_inode(inode); 3642 struct ceph_cap *cap, *ocap, *new_cap = NULL; 3643 int mds = session->s_mds; 3644 int issued; 3645 unsigned caps = le32_to_cpu(im->caps); 3646 unsigned wanted = le32_to_cpu(im->wanted); 3647 unsigned seq = le32_to_cpu(im->seq); 3648 unsigned mseq = le32_to_cpu(im->migrate_seq); 3649 u64 realmino = le64_to_cpu(im->realm); 3650 u64 cap_id = le64_to_cpu(im->cap_id); 3651 u64 p_cap_id; 3652 int peer; 3653 3654 if (ph) { 3655 p_cap_id = le64_to_cpu(ph->cap_id); 3656 peer = le32_to_cpu(ph->mds); 3657 } else { 3658 p_cap_id = 0; 3659 peer = -1; 3660 } 3661 3662 dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n", 3663 inode, ci, mds, mseq, peer); 3664 3665 retry: 3666 spin_lock(&ci->i_ceph_lock); 3667 cap = __get_cap_for_mds(ci, mds); 3668 if (!cap) { 3669 if (!new_cap) { 3670 spin_unlock(&ci->i_ceph_lock); 3671 new_cap = ceph_get_cap(mdsc, NULL); 3672 goto retry; 3673 } 3674 cap = new_cap; 3675 } else { 3676 if (new_cap) { 3677 ceph_put_cap(mdsc, new_cap); 3678 new_cap = NULL; 3679 } 3680 } 3681 3682 __ceph_caps_issued(ci, &issued); 3683 issued |= __ceph_caps_dirty(ci); 3684 3685 ceph_add_cap(inode, session, cap_id, -1, caps, wanted, seq, mseq, 3686 realmino, CEPH_CAP_FLAG_AUTH, &new_cap); 3687 3688 ocap = peer >= 0 ? 
__get_cap_for_mds(ci, peer) : NULL; 3689 if (ocap && ocap->cap_id == p_cap_id) { 3690 dout(" remove export cap %p mds%d flags %d\n", 3691 ocap, peer, ph->flags); 3692 if ((ph->flags & CEPH_CAP_FLAG_AUTH) && 3693 (ocap->seq != le32_to_cpu(ph->seq) || 3694 ocap->mseq != le32_to_cpu(ph->mseq))) { 3695 pr_err_ratelimited("handle_cap_import: " 3696 "mismatched seq/mseq: ino (%llx.%llx) " 3697 "mds%d seq %d mseq %d importer mds%d " 3698 "has peer seq %d mseq %d\n", 3699 ceph_vinop(inode), peer, ocap->seq, 3700 ocap->mseq, mds, le32_to_cpu(ph->seq), 3701 le32_to_cpu(ph->mseq)); 3702 } 3703 __ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE)); 3704 } 3705 3706 /* make sure we re-request max_size, if necessary */ 3707 ci->i_requested_max_size = 0; 3708 3709 *old_issued = issued; 3710 *target_cap = cap; 3711 } 3712 3713 /* 3714 * Handle a caps message from the MDS. 3715 * 3716 * Identify the appropriate session, inode, and call the right handler 3717 * based on the cap op. 3718 */ 3719 void ceph_handle_caps(struct ceph_mds_session *session, 3720 struct ceph_msg *msg) 3721 { 3722 struct ceph_mds_client *mdsc = session->s_mdsc; 3723 struct super_block *sb = mdsc->fsc->sb; 3724 struct inode *inode; 3725 struct ceph_inode_info *ci; 3726 struct ceph_cap *cap; 3727 struct ceph_mds_caps *h; 3728 struct ceph_mds_cap_peer *peer = NULL; 3729 struct ceph_snap_realm *realm = NULL; 3730 struct ceph_string *pool_ns = NULL; 3731 int mds = session->s_mds; 3732 int op, issued; 3733 u32 seq, mseq; 3734 struct ceph_vino vino; 3735 u64 tid; 3736 u64 inline_version = 0; 3737 void *inline_data = NULL; 3738 u32 inline_len = 0; 3739 void *snaptrace; 3740 size_t snaptrace_len; 3741 void *p, *end; 3742 3743 dout("handle_caps from mds%d\n", mds); 3744 3745 /* decode */ 3746 end = msg->front.iov_base + msg->front.iov_len; 3747 tid = le64_to_cpu(msg->hdr.tid); 3748 if (msg->front.iov_len < sizeof(*h)) 3749 goto bad; 3750 h = msg->front.iov_base; 3751 op = le32_to_cpu(h->op); 3752 vino.ino = le64_to_cpu(h->ino); 3753 vino.snap = CEPH_NOSNAP; 3754 seq = le32_to_cpu(h->seq); 3755 mseq = le32_to_cpu(h->migrate_seq); 3756 3757 snaptrace = h + 1; 3758 snaptrace_len = le32_to_cpu(h->snap_trace_len); 3759 p = snaptrace + snaptrace_len; 3760 3761 if (le16_to_cpu(msg->hdr.version) >= 2) { 3762 u32 flock_len; 3763 ceph_decode_32_safe(&p, end, flock_len, bad); 3764 if (p + flock_len > end) 3765 goto bad; 3766 p += flock_len; 3767 } 3768 3769 if (le16_to_cpu(msg->hdr.version) >= 3) { 3770 if (op == CEPH_CAP_OP_IMPORT) { 3771 if (p + sizeof(*peer) > end) 3772 goto bad; 3773 peer = p; 3774 p += sizeof(*peer); 3775 } else if (op == CEPH_CAP_OP_EXPORT) { 3776 /* recorded in unused fields */ 3777 peer = (void *)&h->size; 3778 } 3779 } 3780 3781 if (le16_to_cpu(msg->hdr.version) >= 4) { 3782 ceph_decode_64_safe(&p, end, inline_version, bad); 3783 ceph_decode_32_safe(&p, end, inline_len, bad); 3784 if (p + inline_len > end) 3785 goto bad; 3786 inline_data = p; 3787 p += inline_len; 3788 } 3789 3790 if (le16_to_cpu(msg->hdr.version) >= 5) { 3791 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 3792 u32 epoch_barrier; 3793 3794 ceph_decode_32_safe(&p, end, epoch_barrier, bad); 3795 ceph_osdc_update_epoch_barrier(osdc, epoch_barrier); 3796 } 3797 3798 if (le16_to_cpu(msg->hdr.version) >= 8) { 3799 u64 flush_tid; 3800 u32 caller_uid, caller_gid; 3801 u32 pool_ns_len; 3802 3803 /* version >= 6 */ 3804 ceph_decode_64_safe(&p, end, flush_tid, bad); 3805 /* version >= 7 */ 3806 ceph_decode_32_safe(&p, end, caller_uid, bad); 3807 
ceph_decode_32_safe(&p, end, caller_gid, bad); 3808 /* version >= 8 */ 3809 ceph_decode_32_safe(&p, end, pool_ns_len, bad); 3810 if (pool_ns_len > 0) { 3811 ceph_decode_need(&p, end, pool_ns_len, bad); 3812 pool_ns = ceph_find_or_create_string(p, pool_ns_len); 3813 p += pool_ns_len; 3814 } 3815 } 3816 3817 /* lookup ino */ 3818 inode = ceph_find_inode(sb, vino); 3819 ci = ceph_inode(inode); 3820 dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino, 3821 vino.snap, inode); 3822 3823 mutex_lock(&session->s_mutex); 3824 session->s_seq++; 3825 dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, 3826 (unsigned)seq); 3827 3828 if (!inode) { 3829 dout(" i don't have ino %llx\n", vino.ino); 3830 3831 if (op == CEPH_CAP_OP_IMPORT) { 3832 cap = ceph_get_cap(mdsc, NULL); 3833 cap->cap_ino = vino.ino; 3834 cap->queue_release = 1; 3835 cap->cap_id = le64_to_cpu(h->cap_id); 3836 cap->mseq = mseq; 3837 cap->seq = seq; 3838 cap->issue_seq = seq; 3839 spin_lock(&session->s_cap_lock); 3840 list_add_tail(&cap->session_caps, 3841 &session->s_cap_releases); 3842 session->s_num_cap_releases++; 3843 spin_unlock(&session->s_cap_lock); 3844 } 3845 goto flush_cap_releases; 3846 } 3847 3848 /* these will work even if we don't have a cap yet */ 3849 switch (op) { 3850 case CEPH_CAP_OP_FLUSHSNAP_ACK: 3851 handle_cap_flushsnap_ack(inode, tid, h, session); 3852 goto done; 3853 3854 case CEPH_CAP_OP_EXPORT: 3855 handle_cap_export(inode, h, peer, session); 3856 goto done_unlocked; 3857 3858 case CEPH_CAP_OP_IMPORT: 3859 realm = NULL; 3860 if (snaptrace_len) { 3861 down_write(&mdsc->snap_rwsem); 3862 ceph_update_snap_trace(mdsc, snaptrace, 3863 snaptrace + snaptrace_len, 3864 false, &realm); 3865 downgrade_write(&mdsc->snap_rwsem); 3866 } else { 3867 down_read(&mdsc->snap_rwsem); 3868 } 3869 handle_cap_import(mdsc, inode, h, peer, session, 3870 &cap, &issued); 3871 handle_cap_grant(mdsc, inode, h, &pool_ns, 3872 inline_version, inline_data, inline_len, 3873 msg->middle, session, cap, issued); 3874 if (realm) 3875 ceph_put_snap_realm(mdsc, realm); 3876 goto done_unlocked; 3877 } 3878 3879 /* the rest require a cap */ 3880 spin_lock(&ci->i_ceph_lock); 3881 cap = __get_cap_for_mds(ceph_inode(inode), mds); 3882 if (!cap) { 3883 dout(" no cap on %p ino %llx.%llx from mds%d\n", 3884 inode, ceph_ino(inode), ceph_snap(inode), mds); 3885 spin_unlock(&ci->i_ceph_lock); 3886 goto flush_cap_releases; 3887 } 3888 3889 /* note that each of these drops i_ceph_lock for us */ 3890 switch (op) { 3891 case CEPH_CAP_OP_REVOKE: 3892 case CEPH_CAP_OP_GRANT: 3893 __ceph_caps_issued(ci, &issued); 3894 issued |= __ceph_caps_dirty(ci); 3895 handle_cap_grant(mdsc, inode, h, &pool_ns, 3896 inline_version, inline_data, inline_len, 3897 msg->middle, session, cap, issued); 3898 goto done_unlocked; 3899 3900 case CEPH_CAP_OP_FLUSH_ACK: 3901 handle_cap_flush_ack(inode, tid, h, session, cap); 3902 break; 3903 3904 case CEPH_CAP_OP_TRUNC: 3905 handle_cap_trunc(inode, h, session); 3906 break; 3907 3908 default: 3909 spin_unlock(&ci->i_ceph_lock); 3910 pr_err("ceph_handle_caps: unknown cap op %d %s\n", op, 3911 ceph_cap_op_name(op)); 3912 } 3913 3914 goto done; 3915 3916 flush_cap_releases: 3917 /* 3918 * send any cap release message to try to move things 3919 * along for the mds (who clearly thinks we still have this 3920 * cap). 
		if (op == CEPH_CAP_OP_IMPORT) {
			cap = ceph_get_cap(mdsc, NULL);
			cap->cap_ino = vino.ino;
			cap->queue_release = 1;
			cap->cap_id = le64_to_cpu(h->cap_id);
			cap->mseq = mseq;
			cap->seq = seq;
			cap->issue_seq = seq;
			spin_lock(&session->s_cap_lock);
			list_add_tail(&cap->session_caps,
				      &session->s_cap_releases);
			session->s_num_cap_releases++;
			spin_unlock(&session->s_cap_lock);
		}
		goto flush_cap_releases;
	}

	/* these will work even if we don't have a cap yet */
	switch (op) {
	case CEPH_CAP_OP_FLUSHSNAP_ACK:
		handle_cap_flushsnap_ack(inode, tid, h, session);
		goto done;

	case CEPH_CAP_OP_EXPORT:
		handle_cap_export(inode, h, peer, session);
		goto done_unlocked;

	case CEPH_CAP_OP_IMPORT:
		realm = NULL;
		if (snaptrace_len) {
			down_write(&mdsc->snap_rwsem);
			ceph_update_snap_trace(mdsc, snaptrace,
					       snaptrace + snaptrace_len,
					       false, &realm);
			downgrade_write(&mdsc->snap_rwsem);
		} else {
			down_read(&mdsc->snap_rwsem);
		}
		handle_cap_import(mdsc, inode, h, peer, session,
				  &cap, &issued);
		handle_cap_grant(mdsc, inode, h, &pool_ns,
				 inline_version, inline_data, inline_len,
				 msg->middle, session, cap, issued);
		if (realm)
			ceph_put_snap_realm(mdsc, realm);
		goto done_unlocked;
	}

	/* the rest require a cap */
	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ceph_inode(inode), mds);
	if (!cap) {
		dout(" no cap on %p ino %llx.%llx from mds%d\n",
		     inode, ceph_ino(inode), ceph_snap(inode), mds);
		spin_unlock(&ci->i_ceph_lock);
		goto flush_cap_releases;
	}

	/* note that each of these drops i_ceph_lock for us */
	switch (op) {
	case CEPH_CAP_OP_REVOKE:
	case CEPH_CAP_OP_GRANT:
		__ceph_caps_issued(ci, &issued);
		issued |= __ceph_caps_dirty(ci);
		handle_cap_grant(mdsc, inode, h, &pool_ns,
				 inline_version, inline_data, inline_len,
				 msg->middle, session, cap, issued);
		goto done_unlocked;

	case CEPH_CAP_OP_FLUSH_ACK:
		handle_cap_flush_ack(inode, tid, h, session, cap);
		break;

	case CEPH_CAP_OP_TRUNC:
		handle_cap_trunc(inode, h, session);
		break;

	default:
		spin_unlock(&ci->i_ceph_lock);
		pr_err("ceph_handle_caps: unknown cap op %d %s\n", op,
		       ceph_cap_op_name(op));
	}

	goto done;

flush_cap_releases:
	/*
	 * send any cap release message to try to move things
	 * along for the mds (who clearly thinks we still have this
	 * cap).
	 */
	ceph_send_cap_releases(mdsc, session);

done:
	mutex_unlock(&session->s_mutex);
done_unlocked:
	iput(inode);
	ceph_put_string(pool_ns);
	return;

bad:
	pr_err("ceph_handle_caps: corrupt message\n");
	ceph_msg_dump(msg);
	return;
}

/*
 * Delayed work handler to process the end of the delayed cap release
 * LRU list.
 */
void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	int flags = CHECK_CAPS_NODELAY;

	dout("check_delayed_caps\n");
	while (1) {
		spin_lock(&mdsc->cap_delay_lock);
		if (list_empty(&mdsc->cap_delay_list))
			break;
		ci = list_first_entry(&mdsc->cap_delay_list,
				      struct ceph_inode_info,
				      i_cap_delay_list);
		if ((ci->i_ceph_flags & CEPH_I_FLUSH) == 0 &&
		    time_before(jiffies, ci->i_hold_caps_max))
			break;
		list_del_init(&ci->i_cap_delay_list);

		inode = igrab(&ci->vfs_inode);
		spin_unlock(&mdsc->cap_delay_lock);

		if (inode) {
			dout("check_delayed_caps on %p\n", inode);
			ceph_check_caps(ci, flags, NULL);
			iput(inode);
		}
	}
	spin_unlock(&mdsc->cap_delay_lock);
}

/*
 * Flush all dirty caps to the mds
 */
void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
{
	struct ceph_inode_info *ci;
	struct inode *inode;

	dout("flush_dirty_caps\n");
	spin_lock(&mdsc->cap_dirty_lock);
	while (!list_empty(&mdsc->cap_dirty)) {
		ci = list_first_entry(&mdsc->cap_dirty, struct ceph_inode_info,
				      i_dirty_item);
		inode = &ci->vfs_inode;
		ihold(inode);
		dout("flush_dirty_caps %p\n", inode);
		spin_unlock(&mdsc->cap_dirty_lock);
		ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_FLUSH, NULL);
		iput(inode);
		spin_lock(&mdsc->cap_dirty_lock);
	}
	spin_unlock(&mdsc->cap_dirty_lock);
	dout("flush_dirty_caps done\n");
}

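/*
 * Take an open-file reference for the given file mode.  Shifting fmode
 * left by one and setting the low bit folds in an implicit pin, so
 * i_nr_by_mode[0] effectively counts every open file while the higher
 * slots count RD, WR and LAZY opens.  Callers appear to hold
 * i_ceph_lock, per the __-prefix convention used in this file.
 */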
void __ceph_get_fmode(struct ceph_inode_info *ci, int fmode)
{
	int i;
	int bits = (fmode << 1) | 1;
	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
		if (bits & (1 << i))
			ci->i_nr_by_mode[i]++;
	}
}

/*
 * Drop open file reference. If we were the last open file,
 * we may need to release capabilities to the MDS (or schedule
 * their delayed release).
 */
void ceph_put_fmode(struct ceph_inode_info *ci, int fmode)
{
	int i, last = 0;
	int bits = (fmode << 1) | 1;
	spin_lock(&ci->i_ceph_lock);
	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
		if (bits & (1 << i)) {
			BUG_ON(ci->i_nr_by_mode[i] == 0);
			if (--ci->i_nr_by_mode[i] == 0)
				last++;
		}
	}
	dout("put_fmode %p fmode %d {%d,%d,%d,%d}\n",
	     &ci->vfs_inode, fmode,
	     ci->i_nr_by_mode[0], ci->i_nr_by_mode[1],
	     ci->i_nr_by_mode[2], ci->i_nr_by_mode[3]);
	spin_unlock(&ci->i_ceph_lock);

	if (last && ci->i_vino.snap == CEPH_NOSNAP)
		ceph_check_caps(ci, 0, NULL);
}

/*
 * For a soon-to-be unlinked file, drop the LINK caps. If it looks
 * like the link count will hit 0, drop any other caps (other than
 * PIN) we don't specifically want (due to the file still being
 * open).
 */
int ceph_drop_caps_for_unlink(struct inode *inode)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;

	spin_lock(&ci->i_ceph_lock);
	if (inode->i_nlink == 1) {
		drop |= ~(__ceph_caps_wanted(ci) | CEPH_CAP_PIN);

		ci->i_ceph_flags |= CEPH_I_NODELAY;
		if (__ceph_caps_dirty(ci)) {
			struct ceph_mds_client *mdsc =
				ceph_inode_to_client(inode)->mdsc;
			__cap_delay_requeue_front(mdsc, ci);
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	return drop;
}

/*
 * Helpers for embedding cap and dentry lease releases into mds
 * requests.
 *
 * @force is used by dentry_release (below) to force inclusion of a
 * record for the directory inode, even when there aren't any caps to
 * drop.
 */
int ceph_encode_inode_release(void **p, struct inode *inode,
			      int mds, int drop, int unless, int force)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_cap *cap;
	struct ceph_mds_request_release *rel = *p;
	int used, dirty;
	int ret = 0;

	spin_lock(&ci->i_ceph_lock);
	used = __ceph_caps_used(ci);
	dirty = __ceph_caps_dirty(ci);

	dout("encode_inode_release %p mds%d used|dirty %s drop %s unless %s\n",
	     inode, mds, ceph_cap_string(used|dirty), ceph_cap_string(drop),
	     ceph_cap_string(unless));

	/* only drop unused, clean caps */
	drop &= ~(used | dirty);

	cap = __get_cap_for_mds(ci, mds);
	if (cap && __cap_is_valid(cap)) {
		unless &= cap->issued;
		if (unless) {
			if (unless & CEPH_CAP_AUTH_EXCL)
				drop &= ~CEPH_CAP_AUTH_SHARED;
			if (unless & CEPH_CAP_LINK_EXCL)
				drop &= ~CEPH_CAP_LINK_SHARED;
			if (unless & CEPH_CAP_XATTR_EXCL)
				drop &= ~CEPH_CAP_XATTR_SHARED;
			if (unless & CEPH_CAP_FILE_EXCL)
				drop &= ~CEPH_CAP_FILE_SHARED;
		}

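		/*
		 * Only emit a release record when we are actually giving
		 * caps up, or when the caller has forced one (e.g. the
		 * directory record emitted by ceph_encode_dentry_release
		 * below).
		 */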
		if (force || (cap->issued & drop)) {
			if (cap->issued & drop) {
				int wanted = __ceph_caps_wanted(ci);
				if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0)
					wanted |= cap->mds_wanted;
				dout("encode_inode_release %p cap %p "
				     "%s -> %s, wanted %s -> %s\n", inode, cap,
				     ceph_cap_string(cap->issued),
				     ceph_cap_string(cap->issued & ~drop),
				     ceph_cap_string(cap->mds_wanted),
				     ceph_cap_string(wanted));

				cap->issued &= ~drop;
				cap->implemented &= ~drop;
				cap->mds_wanted = wanted;
			} else {
				dout("encode_inode_release %p cap %p %s"
				     " (force)\n", inode, cap,
				     ceph_cap_string(cap->issued));
			}

			rel->ino = cpu_to_le64(ceph_ino(inode));
			rel->cap_id = cpu_to_le64(cap->cap_id);
			rel->seq = cpu_to_le32(cap->seq);
			rel->issue_seq = cpu_to_le32(cap->issue_seq);
			rel->mseq = cpu_to_le32(cap->mseq);
			rel->caps = cpu_to_le32(cap->implemented);
			rel->wanted = cpu_to_le32(cap->mds_wanted);
			rel->dname_len = 0;
			rel->dname_seq = 0;
			*p += sizeof(*rel);
			ret = 1;
		} else {
			dout("encode_inode_release %p cap %p %s (noop)\n",
			     inode, cap, ceph_cap_string(cap->issued));
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	return ret;
}

int ceph_encode_dentry_release(void **p, struct dentry *dentry,
			       struct inode *dir,
			       int mds, int drop, int unless)
{
	struct dentry *parent = NULL;
	struct ceph_mds_request_release *rel = *p;
	struct ceph_dentry_info *di = ceph_dentry(dentry);
	int force = 0;
	int ret;

	/*
	 * force a record for the directory caps if we have a dentry lease.
	 * this is racy (can't take i_ceph_lock and d_lock together), but it
	 * doesn't have to be perfect; the mds will revoke anything we don't
	 * release.
	 */
	spin_lock(&dentry->d_lock);
	if (di->lease_session && di->lease_session->s_mds == mds)
		force = 1;
	if (!dir) {
		parent = dget(dentry->d_parent);
		dir = d_inode(parent);
	}
	spin_unlock(&dentry->d_lock);

	ret = ceph_encode_inode_release(p, dir, mds, drop, unless, force);
	dput(parent);

	spin_lock(&dentry->d_lock);
	if (ret && di->lease_session && di->lease_session->s_mds == mds) {
		dout("encode_dentry_release %p mds%d seq %d\n",
		     dentry, mds, (int)di->lease_seq);
		rel->dname_len = cpu_to_le32(dentry->d_name.len);
		memcpy(*p, dentry->d_name.name, dentry->d_name.len);
		*p += dentry->d_name.len;
		rel->dname_seq = cpu_to_le32(di->lease_seq);
		__ceph_mdsc_drop_dentry_lease(dentry);
	}
	spin_unlock(&dentry->d_lock);
	return ret;
}