1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/fs.h> 4 #include <linux/kernel.h> 5 #include <linux/sched/signal.h> 6 #include <linux/slab.h> 7 #include <linux/vmalloc.h> 8 #include <linux/wait.h> 9 #include <linux/writeback.h> 10 11 #include "super.h" 12 #include "mds_client.h" 13 #include "cache.h" 14 #include <linux/ceph/decode.h> 15 #include <linux/ceph/messenger.h> 16 17 /* 18 * Capability management 19 * 20 * The Ceph metadata servers control client access to inode metadata 21 * and file data by issuing capabilities, granting clients permission 22 * to read and/or write both inode field and file data to OSDs 23 * (storage nodes). Each capability consists of a set of bits 24 * indicating which operations are allowed. 25 * 26 * If the client holds a *_SHARED cap, the client has a coherent value 27 * that can be safely read from the cached inode. 28 * 29 * In the case of a *_EXCL (exclusive) or FILE_WR capabilities, the 30 * client is allowed to change inode attributes (e.g., file size, 31 * mtime), note its dirty state in the ceph_cap, and asynchronously 32 * flush that metadata change to the MDS. 33 * 34 * In the event of a conflicting operation (perhaps by another 35 * client), the MDS will revoke the conflicting client capabilities. 36 * 37 * In order for a client to cache an inode, it must hold a capability 38 * with at least one MDS server. When inodes are released, release 39 * notifications are batched and periodically sent en masse to the MDS 40 * cluster to release server state. 41 */ 42 43 static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc); 44 static void __kick_flushing_caps(struct ceph_mds_client *mdsc, 45 struct ceph_mds_session *session, 46 struct ceph_inode_info *ci, 47 u64 oldest_flush_tid); 48 49 /* 50 * Generate readable cap strings for debugging output. 
51 */ 52 #define MAX_CAP_STR 20 53 static char cap_str[MAX_CAP_STR][40]; 54 static DEFINE_SPINLOCK(cap_str_lock); 55 static int last_cap_str; 56 57 static char *gcap_string(char *s, int c) 58 { 59 if (c & CEPH_CAP_GSHARED) 60 *s++ = 's'; 61 if (c & CEPH_CAP_GEXCL) 62 *s++ = 'x'; 63 if (c & CEPH_CAP_GCACHE) 64 *s++ = 'c'; 65 if (c & CEPH_CAP_GRD) 66 *s++ = 'r'; 67 if (c & CEPH_CAP_GWR) 68 *s++ = 'w'; 69 if (c & CEPH_CAP_GBUFFER) 70 *s++ = 'b'; 71 if (c & CEPH_CAP_GLAZYIO) 72 *s++ = 'l'; 73 return s; 74 } 75 76 const char *ceph_cap_string(int caps) 77 { 78 int i; 79 char *s; 80 int c; 81 82 spin_lock(&cap_str_lock); 83 i = last_cap_str++; 84 if (last_cap_str == MAX_CAP_STR) 85 last_cap_str = 0; 86 spin_unlock(&cap_str_lock); 87 88 s = cap_str[i]; 89 90 if (caps & CEPH_CAP_PIN) 91 *s++ = 'p'; 92 93 c = (caps >> CEPH_CAP_SAUTH) & 3; 94 if (c) { 95 *s++ = 'A'; 96 s = gcap_string(s, c); 97 } 98 99 c = (caps >> CEPH_CAP_SLINK) & 3; 100 if (c) { 101 *s++ = 'L'; 102 s = gcap_string(s, c); 103 } 104 105 c = (caps >> CEPH_CAP_SXATTR) & 3; 106 if (c) { 107 *s++ = 'X'; 108 s = gcap_string(s, c); 109 } 110 111 c = caps >> CEPH_CAP_SFILE; 112 if (c) { 113 *s++ = 'F'; 114 s = gcap_string(s, c); 115 } 116 117 if (s == cap_str[i]) 118 *s++ = '-'; 119 *s = 0; 120 return cap_str[i]; 121 } 122 123 void ceph_caps_init(struct ceph_mds_client *mdsc) 124 { 125 INIT_LIST_HEAD(&mdsc->caps_list); 126 spin_lock_init(&mdsc->caps_list_lock); 127 } 128 129 void ceph_caps_finalize(struct ceph_mds_client *mdsc) 130 { 131 struct ceph_cap *cap; 132 133 spin_lock(&mdsc->caps_list_lock); 134 while (!list_empty(&mdsc->caps_list)) { 135 cap = list_first_entry(&mdsc->caps_list, 136 struct ceph_cap, caps_item); 137 list_del(&cap->caps_item); 138 kmem_cache_free(ceph_cap_cachep, cap); 139 } 140 mdsc->caps_total_count = 0; 141 mdsc->caps_avail_count = 0; 142 mdsc->caps_use_count = 0; 143 mdsc->caps_reserve_count = 0; 144 mdsc->caps_min_count = 0; 145 spin_unlock(&mdsc->caps_list_lock); 146 } 147 148 void 
ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta) 149 { 150 spin_lock(&mdsc->caps_list_lock); 151 mdsc->caps_min_count += delta; 152 BUG_ON(mdsc->caps_min_count < 0); 153 spin_unlock(&mdsc->caps_list_lock); 154 } 155 156 void ceph_reserve_caps(struct ceph_mds_client *mdsc, 157 struct ceph_cap_reservation *ctx, int need) 158 { 159 int i; 160 struct ceph_cap *cap; 161 int have; 162 int alloc = 0; 163 LIST_HEAD(newcaps); 164 165 dout("reserve caps ctx=%p need=%d\n", ctx, need); 166 167 /* first reserve any caps that are already allocated */ 168 spin_lock(&mdsc->caps_list_lock); 169 if (mdsc->caps_avail_count >= need) 170 have = need; 171 else 172 have = mdsc->caps_avail_count; 173 mdsc->caps_avail_count -= have; 174 mdsc->caps_reserve_count += have; 175 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + 176 mdsc->caps_reserve_count + 177 mdsc->caps_avail_count); 178 spin_unlock(&mdsc->caps_list_lock); 179 180 for (i = have; i < need; i++) { 181 cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); 182 if (!cap) 183 break; 184 list_add(&cap->caps_item, &newcaps); 185 alloc++; 186 } 187 /* we didn't manage to reserve as much as we needed */ 188 if (have + alloc != need) 189 pr_warn("reserve caps ctx=%p ENOMEM need=%d got=%d\n", 190 ctx, need, have + alloc); 191 192 spin_lock(&mdsc->caps_list_lock); 193 mdsc->caps_total_count += alloc; 194 mdsc->caps_reserve_count += alloc; 195 list_splice(&newcaps, &mdsc->caps_list); 196 197 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + 198 mdsc->caps_reserve_count + 199 mdsc->caps_avail_count); 200 spin_unlock(&mdsc->caps_list_lock); 201 202 ctx->count = need; 203 dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n", 204 ctx, mdsc->caps_total_count, mdsc->caps_use_count, 205 mdsc->caps_reserve_count, mdsc->caps_avail_count); 206 } 207 208 int ceph_unreserve_caps(struct ceph_mds_client *mdsc, 209 struct ceph_cap_reservation *ctx) 210 { 211 dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count); 212 
if (ctx->count) { 213 spin_lock(&mdsc->caps_list_lock); 214 BUG_ON(mdsc->caps_reserve_count < ctx->count); 215 mdsc->caps_reserve_count -= ctx->count; 216 mdsc->caps_avail_count += ctx->count; 217 ctx->count = 0; 218 dout("unreserve caps %d = %d used + %d resv + %d avail\n", 219 mdsc->caps_total_count, mdsc->caps_use_count, 220 mdsc->caps_reserve_count, mdsc->caps_avail_count); 221 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + 222 mdsc->caps_reserve_count + 223 mdsc->caps_avail_count); 224 spin_unlock(&mdsc->caps_list_lock); 225 } 226 return 0; 227 } 228 229 struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc, 230 struct ceph_cap_reservation *ctx) 231 { 232 struct ceph_cap *cap = NULL; 233 234 /* temporary, until we do something about cap import/export */ 235 if (!ctx) { 236 cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS); 237 if (cap) { 238 spin_lock(&mdsc->caps_list_lock); 239 mdsc->caps_use_count++; 240 mdsc->caps_total_count++; 241 spin_unlock(&mdsc->caps_list_lock); 242 } 243 return cap; 244 } 245 246 spin_lock(&mdsc->caps_list_lock); 247 dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n", 248 ctx, ctx->count, mdsc->caps_total_count, mdsc->caps_use_count, 249 mdsc->caps_reserve_count, mdsc->caps_avail_count); 250 BUG_ON(!ctx->count); 251 BUG_ON(ctx->count > mdsc->caps_reserve_count); 252 BUG_ON(list_empty(&mdsc->caps_list)); 253 254 ctx->count--; 255 mdsc->caps_reserve_count--; 256 mdsc->caps_use_count++; 257 258 cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item); 259 list_del(&cap->caps_item); 260 261 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + 262 mdsc->caps_reserve_count + mdsc->caps_avail_count); 263 spin_unlock(&mdsc->caps_list_lock); 264 return cap; 265 } 266 267 void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap) 268 { 269 spin_lock(&mdsc->caps_list_lock); 270 dout("put_cap %p %d = %d used + %d resv + %d avail\n", 271 cap, mdsc->caps_total_count, mdsc->caps_use_count, 272 
mdsc->caps_reserve_count, mdsc->caps_avail_count); 273 mdsc->caps_use_count--; 274 /* 275 * Keep some preallocated caps around (ceph_min_count), to 276 * avoid lots of free/alloc churn. 277 */ 278 if (mdsc->caps_avail_count >= mdsc->caps_reserve_count + 279 mdsc->caps_min_count) { 280 mdsc->caps_total_count--; 281 kmem_cache_free(ceph_cap_cachep, cap); 282 } else { 283 mdsc->caps_avail_count++; 284 list_add(&cap->caps_item, &mdsc->caps_list); 285 } 286 287 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count + 288 mdsc->caps_reserve_count + mdsc->caps_avail_count); 289 spin_unlock(&mdsc->caps_list_lock); 290 } 291 292 void ceph_reservation_status(struct ceph_fs_client *fsc, 293 int *total, int *avail, int *used, int *reserved, 294 int *min) 295 { 296 struct ceph_mds_client *mdsc = fsc->mdsc; 297 298 if (total) 299 *total = mdsc->caps_total_count; 300 if (avail) 301 *avail = mdsc->caps_avail_count; 302 if (used) 303 *used = mdsc->caps_use_count; 304 if (reserved) 305 *reserved = mdsc->caps_reserve_count; 306 if (min) 307 *min = mdsc->caps_min_count; 308 } 309 310 /* 311 * Find ceph_cap for given mds, if any. 312 * 313 * Called with i_ceph_lock held. 314 */ 315 static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds) 316 { 317 struct ceph_cap *cap; 318 struct rb_node *n = ci->i_caps.rb_node; 319 320 while (n) { 321 cap = rb_entry(n, struct ceph_cap, ci_node); 322 if (mds < cap->mds) 323 n = n->rb_left; 324 else if (mds > cap->mds) 325 n = n->rb_right; 326 else 327 return cap; 328 } 329 return NULL; 330 } 331 332 struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds) 333 { 334 struct ceph_cap *cap; 335 336 spin_lock(&ci->i_ceph_lock); 337 cap = __get_cap_for_mds(ci, mds); 338 spin_unlock(&ci->i_ceph_lock); 339 return cap; 340 } 341 342 /* 343 * Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1. 
344 */ 345 static int __ceph_get_cap_mds(struct ceph_inode_info *ci) 346 { 347 struct ceph_cap *cap; 348 int mds = -1; 349 struct rb_node *p; 350 351 /* prefer mds with WR|BUFFER|EXCL caps */ 352 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { 353 cap = rb_entry(p, struct ceph_cap, ci_node); 354 mds = cap->mds; 355 if (cap->issued & (CEPH_CAP_FILE_WR | 356 CEPH_CAP_FILE_BUFFER | 357 CEPH_CAP_FILE_EXCL)) 358 break; 359 } 360 return mds; 361 } 362 363 int ceph_get_cap_mds(struct inode *inode) 364 { 365 struct ceph_inode_info *ci = ceph_inode(inode); 366 int mds; 367 spin_lock(&ci->i_ceph_lock); 368 mds = __ceph_get_cap_mds(ceph_inode(inode)); 369 spin_unlock(&ci->i_ceph_lock); 370 return mds; 371 } 372 373 /* 374 * Called under i_ceph_lock. 375 */ 376 static void __insert_cap_node(struct ceph_inode_info *ci, 377 struct ceph_cap *new) 378 { 379 struct rb_node **p = &ci->i_caps.rb_node; 380 struct rb_node *parent = NULL; 381 struct ceph_cap *cap = NULL; 382 383 while (*p) { 384 parent = *p; 385 cap = rb_entry(parent, struct ceph_cap, ci_node); 386 if (new->mds < cap->mds) 387 p = &(*p)->rb_left; 388 else if (new->mds > cap->mds) 389 p = &(*p)->rb_right; 390 else 391 BUG(); 392 } 393 394 rb_link_node(&new->ci_node, parent, p); 395 rb_insert_color(&new->ci_node, &ci->i_caps); 396 } 397 398 /* 399 * (re)set cap hold timeouts, which control the delayed release 400 * of unused caps back to the MDS. Should be called on cap use. 
401 */ 402 static void __cap_set_timeouts(struct ceph_mds_client *mdsc, 403 struct ceph_inode_info *ci) 404 { 405 struct ceph_mount_options *ma = mdsc->fsc->mount_options; 406 407 ci->i_hold_caps_min = round_jiffies(jiffies + 408 ma->caps_wanted_delay_min * HZ); 409 ci->i_hold_caps_max = round_jiffies(jiffies + 410 ma->caps_wanted_delay_max * HZ); 411 dout("__cap_set_timeouts %p min %lu max %lu\n", &ci->vfs_inode, 412 ci->i_hold_caps_min - jiffies, ci->i_hold_caps_max - jiffies); 413 } 414 415 /* 416 * (Re)queue cap at the end of the delayed cap release list. 417 * 418 * If I_FLUSH is set, leave the inode at the front of the list. 419 * 420 * Caller holds i_ceph_lock 421 * -> we take mdsc->cap_delay_lock 422 */ 423 static void __cap_delay_requeue(struct ceph_mds_client *mdsc, 424 struct ceph_inode_info *ci) 425 { 426 __cap_set_timeouts(mdsc, ci); 427 dout("__cap_delay_requeue %p flags %d at %lu\n", &ci->vfs_inode, 428 ci->i_ceph_flags, ci->i_hold_caps_max); 429 if (!mdsc->stopping) { 430 spin_lock(&mdsc->cap_delay_lock); 431 if (!list_empty(&ci->i_cap_delay_list)) { 432 if (ci->i_ceph_flags & CEPH_I_FLUSH) 433 goto no_change; 434 list_del_init(&ci->i_cap_delay_list); 435 } 436 list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list); 437 no_change: 438 spin_unlock(&mdsc->cap_delay_lock); 439 } 440 } 441 442 /* 443 * Queue an inode for immediate writeback. Mark inode with I_FLUSH, 444 * indicating we should send a cap message to flush dirty metadata 445 * asap, and move to the front of the delayed cap list. 
446 */ 447 static void __cap_delay_requeue_front(struct ceph_mds_client *mdsc, 448 struct ceph_inode_info *ci) 449 { 450 dout("__cap_delay_requeue_front %p\n", &ci->vfs_inode); 451 spin_lock(&mdsc->cap_delay_lock); 452 ci->i_ceph_flags |= CEPH_I_FLUSH; 453 if (!list_empty(&ci->i_cap_delay_list)) 454 list_del_init(&ci->i_cap_delay_list); 455 list_add(&ci->i_cap_delay_list, &mdsc->cap_delay_list); 456 spin_unlock(&mdsc->cap_delay_lock); 457 } 458 459 /* 460 * Cancel delayed work on cap. 461 * 462 * Caller must hold i_ceph_lock. 463 */ 464 static void __cap_delay_cancel(struct ceph_mds_client *mdsc, 465 struct ceph_inode_info *ci) 466 { 467 dout("__cap_delay_cancel %p\n", &ci->vfs_inode); 468 if (list_empty(&ci->i_cap_delay_list)) 469 return; 470 spin_lock(&mdsc->cap_delay_lock); 471 list_del_init(&ci->i_cap_delay_list); 472 spin_unlock(&mdsc->cap_delay_lock); 473 } 474 475 /* 476 * Common issue checks for add_cap, handle_cap_grant. 477 */ 478 static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap, 479 unsigned issued) 480 { 481 unsigned had = __ceph_caps_issued(ci, NULL); 482 483 /* 484 * Each time we receive FILE_CACHE anew, we increment 485 * i_rdcache_gen. 486 */ 487 if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && 488 (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) { 489 ci->i_rdcache_gen++; 490 } 491 492 /* 493 * If FILE_SHARED is newly issued, mark dir not complete. We don't 494 * know what happened to this directory while we didn't have the cap. 495 * If FILE_SHARED is being revoked, also mark dir not complete. It 496 * stops on-going cached readdir. 497 */ 498 if ((issued & CEPH_CAP_FILE_SHARED) != (had & CEPH_CAP_FILE_SHARED)) { 499 if (issued & CEPH_CAP_FILE_SHARED) 500 ci->i_shared_gen++; 501 if (S_ISDIR(ci->vfs_inode.i_mode)) { 502 dout(" marking %p NOT complete\n", &ci->vfs_inode); 503 __ceph_dir_clear_complete(ci); 504 } 505 } 506 } 507 508 /* 509 * Add a capability under the given MDS session. 
510 * 511 * Caller should hold session snap_rwsem (read) and s_mutex. 512 * 513 * @fmode is the open file mode, if we are opening a file, otherwise 514 * it is < 0. (This is so we can atomically add the cap and add an 515 * open file reference to it.) 516 */ 517 void ceph_add_cap(struct inode *inode, 518 struct ceph_mds_session *session, u64 cap_id, 519 int fmode, unsigned issued, unsigned wanted, 520 unsigned seq, unsigned mseq, u64 realmino, int flags, 521 struct ceph_cap **new_cap) 522 { 523 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; 524 struct ceph_inode_info *ci = ceph_inode(inode); 525 struct ceph_cap *cap; 526 int mds = session->s_mds; 527 int actual_wanted; 528 529 dout("add_cap %p mds%d cap %llx %s seq %d\n", inode, 530 session->s_mds, cap_id, ceph_cap_string(issued), seq); 531 532 /* 533 * If we are opening the file, include file mode wanted bits 534 * in wanted. 535 */ 536 if (fmode >= 0) 537 wanted |= ceph_caps_for_mode(fmode); 538 539 cap = __get_cap_for_mds(ci, mds); 540 if (!cap) { 541 cap = *new_cap; 542 *new_cap = NULL; 543 544 cap->issued = 0; 545 cap->implemented = 0; 546 cap->mds = mds; 547 cap->mds_wanted = 0; 548 cap->mseq = 0; 549 550 cap->ci = ci; 551 __insert_cap_node(ci, cap); 552 553 /* add to session cap list */ 554 cap->session = session; 555 spin_lock(&session->s_cap_lock); 556 list_add_tail(&cap->session_caps, &session->s_caps); 557 session->s_nr_caps++; 558 spin_unlock(&session->s_cap_lock); 559 } else { 560 /* 561 * auth mds of the inode changed. we received the cap export 562 * message, but still haven't received the cap import message. 563 * handle_cap_export() updated the new auth MDS' cap. 564 * 565 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing 566 * a message that was send before the cap import message. So 567 * don't remove caps. 
568 */ 569 if (ceph_seq_cmp(seq, cap->seq) <= 0) { 570 WARN_ON(cap != ci->i_auth_cap); 571 WARN_ON(cap->cap_id != cap_id); 572 seq = cap->seq; 573 mseq = cap->mseq; 574 issued |= cap->issued; 575 flags |= CEPH_CAP_FLAG_AUTH; 576 } 577 } 578 579 if (!ci->i_snap_realm) { 580 /* 581 * add this inode to the appropriate snap realm 582 */ 583 struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc, 584 realmino); 585 if (realm) { 586 spin_lock(&realm->inodes_with_caps_lock); 587 ci->i_snap_realm = realm; 588 list_add(&ci->i_snap_realm_item, 589 &realm->inodes_with_caps); 590 spin_unlock(&realm->inodes_with_caps_lock); 591 } else { 592 pr_err("ceph_add_cap: couldn't find snap realm %llx\n", 593 realmino); 594 WARN_ON(!realm); 595 } 596 } 597 598 __check_cap_issue(ci, cap, issued); 599 600 /* 601 * If we are issued caps we don't want, or the mds' wanted 602 * value appears to be off, queue a check so we'll release 603 * later and/or update the mds wanted value. 604 */ 605 actual_wanted = __ceph_caps_wanted(ci); 606 if ((wanted & ~actual_wanted) || 607 (issued & ~actual_wanted & CEPH_CAP_ANY_WR)) { 608 dout(" issued %s, mds wanted %s, actual %s, queueing\n", 609 ceph_cap_string(issued), ceph_cap_string(wanted), 610 ceph_cap_string(actual_wanted)); 611 __cap_delay_requeue(mdsc, ci); 612 } 613 614 if (flags & CEPH_CAP_FLAG_AUTH) { 615 if (!ci->i_auth_cap || 616 ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) { 617 ci->i_auth_cap = cap; 618 cap->mds_wanted = wanted; 619 } 620 } else { 621 WARN_ON(ci->i_auth_cap == cap); 622 } 623 624 dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n", 625 inode, ceph_vinop(inode), cap, ceph_cap_string(issued), 626 ceph_cap_string(issued|cap->issued), seq, mds); 627 cap->cap_id = cap_id; 628 cap->issued = issued; 629 cap->implemented |= issued; 630 if (ceph_seq_cmp(mseq, cap->mseq) > 0) 631 cap->mds_wanted = wanted; 632 else 633 cap->mds_wanted |= wanted; 634 cap->seq = seq; 635 cap->issue_seq = seq; 636 cap->mseq = mseq; 637 
cap->cap_gen = session->s_cap_gen; 638 639 if (fmode >= 0) 640 __ceph_get_fmode(ci, fmode); 641 } 642 643 /* 644 * Return true if cap has not timed out and belongs to the current 645 * generation of the MDS session (i.e. has not gone 'stale' due to 646 * us losing touch with the mds). 647 */ 648 static int __cap_is_valid(struct ceph_cap *cap) 649 { 650 unsigned long ttl; 651 u32 gen; 652 653 spin_lock(&cap->session->s_gen_ttl_lock); 654 gen = cap->session->s_cap_gen; 655 ttl = cap->session->s_cap_ttl; 656 spin_unlock(&cap->session->s_gen_ttl_lock); 657 658 if (cap->cap_gen < gen || time_after_eq(jiffies, ttl)) { 659 dout("__cap_is_valid %p cap %p issued %s " 660 "but STALE (gen %u vs %u)\n", &cap->ci->vfs_inode, 661 cap, ceph_cap_string(cap->issued), cap->cap_gen, gen); 662 return 0; 663 } 664 665 return 1; 666 } 667 668 /* 669 * Return set of valid cap bits issued to us. Note that caps time 670 * out, and may be invalidated in bulk if the client session times out 671 * and session->s_cap_gen is bumped. 672 */ 673 int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented) 674 { 675 int have = ci->i_snap_caps; 676 struct ceph_cap *cap; 677 struct rb_node *p; 678 679 if (implemented) 680 *implemented = 0; 681 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { 682 cap = rb_entry(p, struct ceph_cap, ci_node); 683 if (!__cap_is_valid(cap)) 684 continue; 685 dout("__ceph_caps_issued %p cap %p issued %s\n", 686 &ci->vfs_inode, cap, ceph_cap_string(cap->issued)); 687 have |= cap->issued; 688 if (implemented) 689 *implemented |= cap->implemented; 690 } 691 /* 692 * exclude caps issued by non-auth MDS, but are been revoking 693 * by the auth MDS. The non-auth MDS should be revoking/exporting 694 * these caps, but the message is delayed. 
695 */ 696 if (ci->i_auth_cap) { 697 cap = ci->i_auth_cap; 698 have &= ~cap->implemented | cap->issued; 699 } 700 return have; 701 } 702 703 /* 704 * Get cap bits issued by caps other than @ocap 705 */ 706 int __ceph_caps_issued_other(struct ceph_inode_info *ci, struct ceph_cap *ocap) 707 { 708 int have = ci->i_snap_caps; 709 struct ceph_cap *cap; 710 struct rb_node *p; 711 712 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { 713 cap = rb_entry(p, struct ceph_cap, ci_node); 714 if (cap == ocap) 715 continue; 716 if (!__cap_is_valid(cap)) 717 continue; 718 have |= cap->issued; 719 } 720 return have; 721 } 722 723 /* 724 * Move a cap to the end of the LRU (oldest caps at list head, newest 725 * at list tail). 726 */ 727 static void __touch_cap(struct ceph_cap *cap) 728 { 729 struct ceph_mds_session *s = cap->session; 730 731 spin_lock(&s->s_cap_lock); 732 if (!s->s_cap_iterator) { 733 dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap, 734 s->s_mds); 735 list_move_tail(&cap->session_caps, &s->s_caps); 736 } else { 737 dout("__touch_cap %p cap %p mds%d NOP, iterating over caps\n", 738 &cap->ci->vfs_inode, cap, s->s_mds); 739 } 740 spin_unlock(&s->s_cap_lock); 741 } 742 743 /* 744 * Check if we hold the given mask. If so, move the cap(s) to the 745 * front of their respective LRUs. (This is the preferred way for 746 * callers to check for caps they want.) 
747 */ 748 int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch) 749 { 750 struct ceph_cap *cap; 751 struct rb_node *p; 752 int have = ci->i_snap_caps; 753 754 if ((have & mask) == mask) { 755 dout("__ceph_caps_issued_mask %p snap issued %s" 756 " (mask %s)\n", &ci->vfs_inode, 757 ceph_cap_string(have), 758 ceph_cap_string(mask)); 759 return 1; 760 } 761 762 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { 763 cap = rb_entry(p, struct ceph_cap, ci_node); 764 if (!__cap_is_valid(cap)) 765 continue; 766 if ((cap->issued & mask) == mask) { 767 dout("__ceph_caps_issued_mask %p cap %p issued %s" 768 " (mask %s)\n", &ci->vfs_inode, cap, 769 ceph_cap_string(cap->issued), 770 ceph_cap_string(mask)); 771 if (touch) 772 __touch_cap(cap); 773 return 1; 774 } 775 776 /* does a combination of caps satisfy mask? */ 777 have |= cap->issued; 778 if ((have & mask) == mask) { 779 dout("__ceph_caps_issued_mask %p combo issued %s" 780 " (mask %s)\n", &ci->vfs_inode, 781 ceph_cap_string(cap->issued), 782 ceph_cap_string(mask)); 783 if (touch) { 784 struct rb_node *q; 785 786 /* touch this + preceding caps */ 787 __touch_cap(cap); 788 for (q = rb_first(&ci->i_caps); q != p; 789 q = rb_next(q)) { 790 cap = rb_entry(q, struct ceph_cap, 791 ci_node); 792 if (!__cap_is_valid(cap)) 793 continue; 794 __touch_cap(cap); 795 } 796 } 797 return 1; 798 } 799 } 800 801 return 0; 802 } 803 804 /* 805 * Return true if mask caps are currently being revoked by an MDS. 
806 */ 807 int __ceph_caps_revoking_other(struct ceph_inode_info *ci, 808 struct ceph_cap *ocap, int mask) 809 { 810 struct ceph_cap *cap; 811 struct rb_node *p; 812 813 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { 814 cap = rb_entry(p, struct ceph_cap, ci_node); 815 if (cap != ocap && 816 (cap->implemented & ~cap->issued & mask)) 817 return 1; 818 } 819 return 0; 820 } 821 822 int ceph_caps_revoking(struct ceph_inode_info *ci, int mask) 823 { 824 struct inode *inode = &ci->vfs_inode; 825 int ret; 826 827 spin_lock(&ci->i_ceph_lock); 828 ret = __ceph_caps_revoking_other(ci, NULL, mask); 829 spin_unlock(&ci->i_ceph_lock); 830 dout("ceph_caps_revoking %p %s = %d\n", inode, 831 ceph_cap_string(mask), ret); 832 return ret; 833 } 834 835 int __ceph_caps_used(struct ceph_inode_info *ci) 836 { 837 int used = 0; 838 if (ci->i_pin_ref) 839 used |= CEPH_CAP_PIN; 840 if (ci->i_rd_ref) 841 used |= CEPH_CAP_FILE_RD; 842 if (ci->i_rdcache_ref || 843 (!S_ISDIR(ci->vfs_inode.i_mode) && /* ignore readdir cache */ 844 ci->vfs_inode.i_data.nrpages)) 845 used |= CEPH_CAP_FILE_CACHE; 846 if (ci->i_wr_ref) 847 used |= CEPH_CAP_FILE_WR; 848 if (ci->i_wb_ref || ci->i_wrbuffer_ref) 849 used |= CEPH_CAP_FILE_BUFFER; 850 return used; 851 } 852 853 /* 854 * wanted, by virtue of open file modes 855 */ 856 int __ceph_caps_file_wanted(struct ceph_inode_info *ci) 857 { 858 int i, bits = 0; 859 for (i = 0; i < CEPH_FILE_MODE_BITS; i++) { 860 if (ci->i_nr_by_mode[i]) 861 bits |= 1 << i; 862 } 863 if (bits == 0) 864 return 0; 865 return ceph_caps_for_mode(bits >> 1); 866 } 867 868 /* 869 * Return caps we have registered with the MDS(s) as 'wanted'. 
870 */ 871 int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check) 872 { 873 struct ceph_cap *cap; 874 struct rb_node *p; 875 int mds_wanted = 0; 876 877 for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { 878 cap = rb_entry(p, struct ceph_cap, ci_node); 879 if (check && !__cap_is_valid(cap)) 880 continue; 881 if (cap == ci->i_auth_cap) 882 mds_wanted |= cap->mds_wanted; 883 else 884 mds_wanted |= (cap->mds_wanted & ~CEPH_CAP_ANY_FILE_WR); 885 } 886 return mds_wanted; 887 } 888 889 /* 890 * called under i_ceph_lock 891 */ 892 static int __ceph_is_any_caps(struct ceph_inode_info *ci) 893 { 894 return !RB_EMPTY_ROOT(&ci->i_caps); 895 } 896 897 int ceph_is_any_caps(struct inode *inode) 898 { 899 struct ceph_inode_info *ci = ceph_inode(inode); 900 int ret; 901 902 spin_lock(&ci->i_ceph_lock); 903 ret = __ceph_is_any_caps(ci); 904 spin_unlock(&ci->i_ceph_lock); 905 906 return ret; 907 } 908 909 static void drop_inode_snap_realm(struct ceph_inode_info *ci) 910 { 911 struct ceph_snap_realm *realm = ci->i_snap_realm; 912 spin_lock(&realm->inodes_with_caps_lock); 913 list_del_init(&ci->i_snap_realm_item); 914 ci->i_snap_realm_counter++; 915 ci->i_snap_realm = NULL; 916 spin_unlock(&realm->inodes_with_caps_lock); 917 ceph_put_snap_realm(ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc, 918 realm); 919 } 920 921 /* 922 * Remove a cap. Take steps to deal with a racing iterate_session_caps. 923 * 924 * caller should hold i_ceph_lock. 925 * caller will not hold session s_mutex if called from destroy_inode. 
926 */ 927 void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) 928 { 929 struct ceph_mds_session *session = cap->session; 930 struct ceph_inode_info *ci = cap->ci; 931 struct ceph_mds_client *mdsc = 932 ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc; 933 int removed = 0; 934 935 dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode); 936 937 /* remove from session list */ 938 spin_lock(&session->s_cap_lock); 939 if (session->s_cap_iterator == cap) { 940 /* not yet, we are iterating over this very cap */ 941 dout("__ceph_remove_cap delaying %p removal from session %p\n", 942 cap, cap->session); 943 } else { 944 list_del_init(&cap->session_caps); 945 session->s_nr_caps--; 946 cap->session = NULL; 947 removed = 1; 948 } 949 /* protect backpointer with s_cap_lock: see iterate_session_caps */ 950 cap->ci = NULL; 951 952 /* 953 * s_cap_reconnect is protected by s_cap_lock. no one changes 954 * s_cap_gen while session is in the reconnect state. 955 */ 956 if (queue_release && 957 (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) { 958 cap->queue_release = 1; 959 if (removed) { 960 list_add_tail(&cap->session_caps, 961 &session->s_cap_releases); 962 session->s_num_cap_releases++; 963 removed = 0; 964 } 965 } else { 966 cap->queue_release = 0; 967 } 968 cap->cap_ino = ci->i_vino.ino; 969 970 spin_unlock(&session->s_cap_lock); 971 972 /* remove from inode list */ 973 rb_erase(&cap->ci_node, &ci->i_caps); 974 if (ci->i_auth_cap == cap) 975 ci->i_auth_cap = NULL; 976 977 if (removed) 978 ceph_put_cap(mdsc, cap); 979 980 /* when reconnect denied, we remove session caps forcibly, 981 * i_wr_ref can be non-zero. If there are ongoing write, 982 * keep i_snap_realm. 
983 */ 984 if (!__ceph_is_any_caps(ci) && ci->i_wr_ref == 0 && ci->i_snap_realm) 985 drop_inode_snap_realm(ci); 986 987 if (!__ceph_is_any_real_caps(ci)) 988 __cap_delay_cancel(mdsc, ci); 989 } 990 991 struct cap_msg_args { 992 struct ceph_mds_session *session; 993 u64 ino, cid, follows; 994 u64 flush_tid, oldest_flush_tid, size, max_size; 995 u64 xattr_version; 996 struct ceph_buffer *xattr_buf; 997 struct timespec atime, mtime, ctime; 998 int op, caps, wanted, dirty; 999 u32 seq, issue_seq, mseq, time_warp_seq; 1000 u32 flags; 1001 kuid_t uid; 1002 kgid_t gid; 1003 umode_t mode; 1004 bool inline_data; 1005 }; 1006 1007 /* 1008 * Build and send a cap message to the given MDS. 1009 * 1010 * Caller should be holding s_mutex. 1011 */ 1012 static int send_cap_msg(struct cap_msg_args *arg) 1013 { 1014 struct ceph_mds_caps *fc; 1015 struct ceph_msg *msg; 1016 void *p; 1017 size_t extra_len; 1018 struct timespec zerotime = {0}; 1019 struct ceph_osd_client *osdc = &arg->session->s_mdsc->fsc->client->osdc; 1020 1021 dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s" 1022 " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu" 1023 " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(arg->op), 1024 arg->cid, arg->ino, ceph_cap_string(arg->caps), 1025 ceph_cap_string(arg->wanted), ceph_cap_string(arg->dirty), 1026 arg->seq, arg->issue_seq, arg->flush_tid, arg->oldest_flush_tid, 1027 arg->mseq, arg->follows, arg->size, arg->max_size, 1028 arg->xattr_version, 1029 arg->xattr_buf ? 
(int)arg->xattr_buf->vec.iov_len : 0); 1030 1031 /* flock buffer size + inline version + inline data size + 1032 * osd_epoch_barrier + oldest_flush_tid */ 1033 extra_len = 4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4; 1034 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len, 1035 GFP_NOFS, false); 1036 if (!msg) 1037 return -ENOMEM; 1038 1039 msg->hdr.version = cpu_to_le16(10); 1040 msg->hdr.tid = cpu_to_le64(arg->flush_tid); 1041 1042 fc = msg->front.iov_base; 1043 memset(fc, 0, sizeof(*fc)); 1044 1045 fc->cap_id = cpu_to_le64(arg->cid); 1046 fc->op = cpu_to_le32(arg->op); 1047 fc->seq = cpu_to_le32(arg->seq); 1048 fc->issue_seq = cpu_to_le32(arg->issue_seq); 1049 fc->migrate_seq = cpu_to_le32(arg->mseq); 1050 fc->caps = cpu_to_le32(arg->caps); 1051 fc->wanted = cpu_to_le32(arg->wanted); 1052 fc->dirty = cpu_to_le32(arg->dirty); 1053 fc->ino = cpu_to_le64(arg->ino); 1054 fc->snap_follows = cpu_to_le64(arg->follows); 1055 1056 fc->size = cpu_to_le64(arg->size); 1057 fc->max_size = cpu_to_le64(arg->max_size); 1058 ceph_encode_timespec(&fc->mtime, &arg->mtime); 1059 ceph_encode_timespec(&fc->atime, &arg->atime); 1060 ceph_encode_timespec(&fc->ctime, &arg->ctime); 1061 fc->time_warp_seq = cpu_to_le32(arg->time_warp_seq); 1062 1063 fc->uid = cpu_to_le32(from_kuid(&init_user_ns, arg->uid)); 1064 fc->gid = cpu_to_le32(from_kgid(&init_user_ns, arg->gid)); 1065 fc->mode = cpu_to_le32(arg->mode); 1066 1067 fc->xattr_version = cpu_to_le64(arg->xattr_version); 1068 if (arg->xattr_buf) { 1069 msg->middle = ceph_buffer_get(arg->xattr_buf); 1070 fc->xattr_len = cpu_to_le32(arg->xattr_buf->vec.iov_len); 1071 msg->hdr.middle_len = cpu_to_le32(arg->xattr_buf->vec.iov_len); 1072 } 1073 1074 p = fc + 1; 1075 /* flock buffer size (version 2) */ 1076 ceph_encode_32(&p, 0); 1077 /* inline version (version 4) */ 1078 ceph_encode_64(&p, arg->inline_data ? 
0 : CEPH_INLINE_NONE); 1079 /* inline data size */ 1080 ceph_encode_32(&p, 0); 1081 /* 1082 * osd_epoch_barrier (version 5) 1083 * The epoch_barrier is protected osdc->lock, so READ_ONCE here in 1084 * case it was recently changed 1085 */ 1086 ceph_encode_32(&p, READ_ONCE(osdc->epoch_barrier)); 1087 /* oldest_flush_tid (version 6) */ 1088 ceph_encode_64(&p, arg->oldest_flush_tid); 1089 1090 /* 1091 * caller_uid/caller_gid (version 7) 1092 * 1093 * Currently, we don't properly track which caller dirtied the caps 1094 * last, and force a flush of them when there is a conflict. For now, 1095 * just set this to 0:0, to emulate how the MDS has worked up to now. 1096 */ 1097 ceph_encode_32(&p, 0); 1098 ceph_encode_32(&p, 0); 1099 1100 /* pool namespace (version 8) (mds always ignores this) */ 1101 ceph_encode_32(&p, 0); 1102 1103 /* 1104 * btime and change_attr (version 9) 1105 * 1106 * We just zero these out for now, as the MDS ignores them unless 1107 * the requisite feature flags are set (which we don't do yet). 1108 */ 1109 ceph_encode_timespec(p, &zerotime); 1110 p += sizeof(struct ceph_timespec); 1111 ceph_encode_64(&p, 0); 1112 1113 /* Advisory flags (version 10) */ 1114 ceph_encode_32(&p, arg->flags); 1115 1116 ceph_con_send(&arg->session->s_con, msg); 1117 return 0; 1118 } 1119 1120 /* 1121 * Queue cap releases when an inode is dropped from our cache. Since 1122 * inode is about to be destroyed, there is no need for i_ceph_lock. 1123 */ 1124 void ceph_queue_caps_release(struct inode *inode) 1125 { 1126 struct ceph_inode_info *ci = ceph_inode(inode); 1127 struct rb_node *p; 1128 1129 p = rb_first(&ci->i_caps); 1130 while (p) { 1131 struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node); 1132 p = rb_next(p); 1133 __ceph_remove_cap(cap, true); 1134 } 1135 } 1136 1137 /* 1138 * Send a cap msg on the given inode. Update our caps state, then 1139 * drop i_ceph_lock and send the message. 
1140 * 1141 * Make note of max_size reported/requested from mds, revoked caps 1142 * that have now been implemented. 1143 * 1144 * Make half-hearted attempt ot to invalidate page cache if we are 1145 * dropping RDCACHE. Note that this will leave behind locked pages 1146 * that we'll then need to deal with elsewhere. 1147 * 1148 * Return non-zero if delayed release, or we experienced an error 1149 * such that the caller should requeue + retry later. 1150 * 1151 * called with i_ceph_lock, then drops it. 1152 * caller should hold snap_rwsem (read), s_mutex. 1153 */ 1154 static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, 1155 int op, bool sync, int used, int want, int retain, 1156 int flushing, u64 flush_tid, u64 oldest_flush_tid) 1157 __releases(cap->ci->i_ceph_lock) 1158 { 1159 struct ceph_inode_info *ci = cap->ci; 1160 struct inode *inode = &ci->vfs_inode; 1161 struct cap_msg_args arg; 1162 int held, revoking, dropping; 1163 int wake = 0; 1164 int delayed = 0; 1165 int ret; 1166 1167 held = cap->issued | cap->implemented; 1168 revoking = cap->implemented & ~cap->issued; 1169 retain &= ~revoking; 1170 dropping = cap->issued & ~retain; 1171 1172 dout("__send_cap %p cap %p session %p %s -> %s (revoking %s)\n", 1173 inode, cap, cap->session, 1174 ceph_cap_string(held), ceph_cap_string(held & retain), 1175 ceph_cap_string(revoking)); 1176 BUG_ON((retain & CEPH_CAP_PIN) == 0); 1177 1178 arg.session = cap->session; 1179 1180 /* don't release wanted unless we've waited a bit. 
*/ 1181 if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 && 1182 time_before(jiffies, ci->i_hold_caps_min)) { 1183 dout(" delaying issued %s -> %s, wanted %s -> %s on send\n", 1184 ceph_cap_string(cap->issued), 1185 ceph_cap_string(cap->issued & retain), 1186 ceph_cap_string(cap->mds_wanted), 1187 ceph_cap_string(want)); 1188 want |= cap->mds_wanted; 1189 retain |= cap->issued; 1190 delayed = 1; 1191 } 1192 ci->i_ceph_flags &= ~(CEPH_I_NODELAY | CEPH_I_FLUSH); 1193 if (want & ~cap->mds_wanted) { 1194 /* user space may open/close single file frequently. 1195 * This avoids droping mds_wanted immediately after 1196 * requesting new mds_wanted. 1197 */ 1198 __cap_set_timeouts(mdsc, ci); 1199 } 1200 1201 cap->issued &= retain; /* drop bits we don't want */ 1202 if (cap->implemented & ~cap->issued) { 1203 /* 1204 * Wake up any waiters on wanted -> needed transition. 1205 * This is due to the weird transition from buffered 1206 * to sync IO... we need to flush dirty pages _before_ 1207 * allowing sync writes to avoid reordering. 1208 */ 1209 wake = 1; 1210 } 1211 cap->implemented &= cap->issued | used; 1212 cap->mds_wanted = want; 1213 1214 arg.ino = ceph_vino(inode).ino; 1215 arg.cid = cap->cap_id; 1216 arg.follows = flushing ? 
ci->i_head_snapc->seq : 0; 1217 arg.flush_tid = flush_tid; 1218 arg.oldest_flush_tid = oldest_flush_tid; 1219 1220 arg.size = inode->i_size; 1221 ci->i_reported_size = arg.size; 1222 arg.max_size = ci->i_wanted_max_size; 1223 ci->i_requested_max_size = arg.max_size; 1224 1225 if (flushing & CEPH_CAP_XATTR_EXCL) { 1226 __ceph_build_xattrs_blob(ci); 1227 arg.xattr_version = ci->i_xattrs.version; 1228 arg.xattr_buf = ci->i_xattrs.blob; 1229 } else { 1230 arg.xattr_buf = NULL; 1231 } 1232 1233 arg.mtime = inode->i_mtime; 1234 arg.atime = inode->i_atime; 1235 arg.ctime = inode->i_ctime; 1236 1237 arg.op = op; 1238 arg.caps = cap->implemented; 1239 arg.wanted = want; 1240 arg.dirty = flushing; 1241 1242 arg.seq = cap->seq; 1243 arg.issue_seq = cap->issue_seq; 1244 arg.mseq = cap->mseq; 1245 arg.time_warp_seq = ci->i_time_warp_seq; 1246 1247 arg.uid = inode->i_uid; 1248 arg.gid = inode->i_gid; 1249 arg.mode = inode->i_mode; 1250 1251 arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE; 1252 if (list_empty(&ci->i_cap_snaps)) 1253 arg.flags = CEPH_CLIENT_CAPS_NO_CAPSNAP; 1254 else 1255 arg.flags = CEPH_CLIENT_CAPS_PENDING_CAPSNAP; 1256 if (sync) 1257 arg.flags |= CEPH_CLIENT_CAPS_SYNC; 1258 1259 spin_unlock(&ci->i_ceph_lock); 1260 1261 ret = send_cap_msg(&arg); 1262 if (ret < 0) { 1263 dout("error sending cap msg, must requeue %p\n", inode); 1264 delayed = 1; 1265 } 1266 1267 if (wake) 1268 wake_up_all(&ci->i_cap_wq); 1269 1270 return delayed; 1271 } 1272 1273 static inline int __send_flush_snap(struct inode *inode, 1274 struct ceph_mds_session *session, 1275 struct ceph_cap_snap *capsnap, 1276 u32 mseq, u64 oldest_flush_tid) 1277 { 1278 struct cap_msg_args arg; 1279 1280 arg.session = session; 1281 arg.ino = ceph_vino(inode).ino; 1282 arg.cid = 0; 1283 arg.follows = capsnap->follows; 1284 arg.flush_tid = capsnap->cap_flush.tid; 1285 arg.oldest_flush_tid = oldest_flush_tid; 1286 1287 arg.size = capsnap->size; 1288 arg.max_size = 0; 1289 arg.xattr_version = 
capsnap->xattr_version; 1290 arg.xattr_buf = capsnap->xattr_blob; 1291 1292 arg.atime = capsnap->atime; 1293 arg.mtime = capsnap->mtime; 1294 arg.ctime = capsnap->ctime; 1295 1296 arg.op = CEPH_CAP_OP_FLUSHSNAP; 1297 arg.caps = capsnap->issued; 1298 arg.wanted = 0; 1299 arg.dirty = capsnap->dirty; 1300 1301 arg.seq = 0; 1302 arg.issue_seq = 0; 1303 arg.mseq = mseq; 1304 arg.time_warp_seq = capsnap->time_warp_seq; 1305 1306 arg.uid = capsnap->uid; 1307 arg.gid = capsnap->gid; 1308 arg.mode = capsnap->mode; 1309 1310 arg.inline_data = capsnap->inline_data; 1311 arg.flags = 0; 1312 1313 return send_cap_msg(&arg); 1314 } 1315 1316 /* 1317 * When a snapshot is taken, clients accumulate dirty metadata on 1318 * inodes with capabilities in ceph_cap_snaps to describe the file 1319 * state at the time the snapshot was taken. This must be flushed 1320 * asynchronously back to the MDS once sync writes complete and dirty 1321 * data is written out. 1322 * 1323 * Called under i_ceph_lock. Takes s_mutex as needed. 1324 */ 1325 static void __ceph_flush_snaps(struct ceph_inode_info *ci, 1326 struct ceph_mds_session *session) 1327 __releases(ci->i_ceph_lock) 1328 __acquires(ci->i_ceph_lock) 1329 { 1330 struct inode *inode = &ci->vfs_inode; 1331 struct ceph_mds_client *mdsc = session->s_mdsc; 1332 struct ceph_cap_snap *capsnap; 1333 u64 oldest_flush_tid = 0; 1334 u64 first_tid = 1, last_tid = 0; 1335 1336 dout("__flush_snaps %p session %p\n", inode, session); 1337 1338 list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { 1339 /* 1340 * we need to wait for sync writes to complete and for dirty 1341 * pages to be written out. 
*/
		if (capsnap->dirty_pages || capsnap->writing)
			break;

		/* should be removed by ceph_try_drop_cap_snap() */
		BUG_ON(!capsnap->need_flush);

		/* only flush each capsnap once */
		if (capsnap->cap_flush.tid > 0) {
			dout(" already flushed %p, skipping\n", capsnap);
			continue;
		}

		/*
		 * Assign a flush tid and put the capsnap on the global
		 * flush list; both are guarded by cap_dirty_lock.
		 */
		spin_lock(&mdsc->cap_dirty_lock);
		capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid;
		list_add_tail(&capsnap->cap_flush.g_list,
			      &mdsc->cap_flush_list);
		/* sample the oldest pending tid once, on the first capsnap */
		if (oldest_flush_tid == 0)
			oldest_flush_tid = __get_oldest_flush_tid(mdsc);
		if (list_empty(&ci->i_flushing_item)) {
			list_add_tail(&ci->i_flushing_item,
				      &session->s_cap_flushing);
		}
		spin_unlock(&mdsc->cap_dirty_lock);

		list_add_tail(&capsnap->cap_flush.i_list,
			      &ci->i_cap_flush_list);

		/* remember the [first_tid, last_tid] range assigned here */
		if (first_tid == 1)
			first_tid = capsnap->cap_flush.tid;
		last_tid = capsnap->cap_flush.tid;
	}

	ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS;

	/*
	 * Second pass: send one FLUSHSNAP per tid assigned above.  The
	 * loop is skipped when nothing was assigned (first_tid 1 >
	 * last_tid 0).  i_ceph_lock is dropped around each send, so the
	 * auth cap and the flush list are re-examined on every iteration.
	 */
	while (first_tid <= last_tid) {
		struct ceph_cap *cap = ci->i_auth_cap;
		struct ceph_cap_flush *cf;
		int ret;

		if (!(cap && cap->session == session)) {
			dout("__flush_snaps %p auth cap %p not mds%d, "
			     "stop\n", inode, cap, session->s_mds);
			break;
		}

		/* find the first queued flush with tid >= first_tid */
		ret = -ENOENT;
		list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
			if (cf->tid >= first_tid) {
				ret = 0;
				break;
			}
		}
		if (ret < 0)
			break;

		first_tid = cf->tid + 1;

		/* hold a capsnap reference across the unlocked send */
		capsnap = container_of(cf, struct ceph_cap_snap, cap_flush);
		refcount_inc(&capsnap->nref);
		spin_unlock(&ci->i_ceph_lock);

		dout("__flush_snaps %p capsnap %p tid %llu %s\n",
		     inode, capsnap, cf->tid, ceph_cap_string(capsnap->dirty));

		/* NOTE(review): cap->mseq is read after i_ceph_lock was dropped */
		ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
					oldest_flush_tid);
		if (ret < 0) {
			pr_err("__flush_snaps: error sending cap flushsnap, "
			       "ino (%llx.%llx) tid %llu follows %llu\n",
				ceph_vinop(inode), cf->tid, capsnap->follows);
		}

		ceph_put_cap_snap(capsnap);
		spin_lock(&ci->i_ceph_lock);
	}
}

/*
 * Flush pending cap snaps for @ci to the session of its auth cap.
 *
 * @psession: optional in/out session pointer.  When non-NULL, *psession
 *            is used as the initial session and receives the session in
 *            use on return; when NULL, any session obtained here is
 *            unlocked and put before returning.  A session whose mds
 *            does not match the auth cap is unlocked and put.
 *
 * Does nothing if CEPH_I_FLUSH_SNAPS is not set or there is no auth cap.
 */
void ceph_flush_snaps(struct ceph_inode_info *ci,
		      struct ceph_mds_session **psession)
{
	struct inode *inode = &ci->vfs_inode;
	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
	struct ceph_mds_session *session = NULL;
	int mds;

	dout("ceph_flush_snaps %p\n", inode);
	if (psession)
		session = *psession;
retry:
	spin_lock(&ci->i_ceph_lock);
	if (!(ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)) {
		dout(" no capsnap needs flush, doing nothing\n");
		goto out;
	}
	if (!ci->i_auth_cap) {
		dout(" no auth cap (migrating?), doing nothing\n");
		goto out;
	}

	mds = ci->i_auth_cap->session->s_mds;
	if (session && session->s_mds != mds) {
		dout(" oops, wrong session %p mutex\n", session);
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
		session = NULL;
	}
	if (!session) {
		/*
		 * s_mutex is taken with i_ceph_lock dropped, then all of
		 * the checks above are redone from retry.
		 */
		spin_unlock(&ci->i_ceph_lock);
		mutex_lock(&mdsc->mutex);
		session = __ceph_lookup_mds_session(mdsc, mds);
		mutex_unlock(&mdsc->mutex);
		if (session) {
			dout(" inverting session/ino locks on %p\n", session);
			mutex_lock(&session->s_mutex);
		}
		goto retry;
	}

	/* make sure flushsnap messages are sent in proper order. */
1462 if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) { 1463 __kick_flushing_caps(mdsc, session, ci, 0); 1464 ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; 1465 } 1466 1467 __ceph_flush_snaps(ci, session); 1468 out: 1469 spin_unlock(&ci->i_ceph_lock); 1470 1471 if (psession) { 1472 *psession = session; 1473 } else if (session) { 1474 mutex_unlock(&session->s_mutex); 1475 ceph_put_mds_session(session); 1476 } 1477 /* we flushed them all; remove this inode from the queue */ 1478 spin_lock(&mdsc->snap_flush_lock); 1479 list_del_init(&ci->i_snap_flush_item); 1480 spin_unlock(&mdsc->snap_flush_lock); 1481 } 1482 1483 /* 1484 * Mark caps dirty. If inode is newly dirty, return the dirty flags. 1485 * Caller is then responsible for calling __mark_inode_dirty with the 1486 * returned flags value. 1487 */ 1488 int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask, 1489 struct ceph_cap_flush **pcf) 1490 { 1491 struct ceph_mds_client *mdsc = 1492 ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc; 1493 struct inode *inode = &ci->vfs_inode; 1494 int was = ci->i_dirty_caps; 1495 int dirty = 0; 1496 1497 if (!ci->i_auth_cap) { 1498 pr_warn("__mark_dirty_caps %p %llx mask %s, " 1499 "but no auth cap (session was closed?)\n", 1500 inode, ceph_ino(inode), ceph_cap_string(mask)); 1501 return 0; 1502 } 1503 1504 dout("__mark_dirty_caps %p %s dirty %s -> %s\n", &ci->vfs_inode, 1505 ceph_cap_string(mask), ceph_cap_string(was), 1506 ceph_cap_string(was | mask)); 1507 ci->i_dirty_caps |= mask; 1508 if (was == 0) { 1509 WARN_ON_ONCE(ci->i_prealloc_cap_flush); 1510 swap(ci->i_prealloc_cap_flush, *pcf); 1511 1512 if (!ci->i_head_snapc) { 1513 WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem)); 1514 ci->i_head_snapc = ceph_get_snap_context( 1515 ci->i_snap_realm->cached_context); 1516 } 1517 dout(" inode %p now dirty snapc %p auth cap %p\n", 1518 &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap); 1519 BUG_ON(!list_empty(&ci->i_dirty_item)); 1520 spin_lock(&mdsc->cap_dirty_lock); 1521 
list_add(&ci->i_dirty_item, &mdsc->cap_dirty); 1522 spin_unlock(&mdsc->cap_dirty_lock); 1523 if (ci->i_flushing_caps == 0) { 1524 ihold(inode); 1525 dirty |= I_DIRTY_SYNC; 1526 } 1527 } else { 1528 WARN_ON_ONCE(!ci->i_prealloc_cap_flush); 1529 } 1530 BUG_ON(list_empty(&ci->i_dirty_item)); 1531 if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) && 1532 (mask & CEPH_CAP_FILE_BUFFER)) 1533 dirty |= I_DIRTY_DATASYNC; 1534 __cap_delay_requeue(mdsc, ci); 1535 return dirty; 1536 } 1537 1538 struct ceph_cap_flush *ceph_alloc_cap_flush(void) 1539 { 1540 return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL); 1541 } 1542 1543 void ceph_free_cap_flush(struct ceph_cap_flush *cf) 1544 { 1545 if (cf) 1546 kmem_cache_free(ceph_cap_flush_cachep, cf); 1547 } 1548 1549 static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc) 1550 { 1551 if (!list_empty(&mdsc->cap_flush_list)) { 1552 struct ceph_cap_flush *cf = 1553 list_first_entry(&mdsc->cap_flush_list, 1554 struct ceph_cap_flush, g_list); 1555 return cf->tid; 1556 } 1557 return 0; 1558 } 1559 1560 /* 1561 * Remove cap_flush from the mdsc's or inode's flushing cap list. 1562 * Return true if caller needs to wake up flush waiters. 1563 */ 1564 static bool __finish_cap_flush(struct ceph_mds_client *mdsc, 1565 struct ceph_inode_info *ci, 1566 struct ceph_cap_flush *cf) 1567 { 1568 struct ceph_cap_flush *prev; 1569 bool wake = cf->wake; 1570 if (mdsc) { 1571 /* are there older pending cap flushes? */ 1572 if (wake && cf->g_list.prev != &mdsc->cap_flush_list) { 1573 prev = list_prev_entry(cf, g_list); 1574 prev->wake = true; 1575 wake = false; 1576 } 1577 list_del(&cf->g_list); 1578 } else if (ci) { 1579 if (wake && cf->i_list.prev != &ci->i_cap_flush_list) { 1580 prev = list_prev_entry(cf, i_list); 1581 prev->wake = true; 1582 wake = false; 1583 } 1584 list_del(&cf->i_list); 1585 } else { 1586 BUG_ON(1); 1587 } 1588 return wake; 1589 } 1590 1591 /* 1592 * Add dirty inode to the flushing list. 
Assigned a seq number so we 1593 * can wait for caps to flush without starving. 1594 * 1595 * Called under i_ceph_lock. 1596 */ 1597 static int __mark_caps_flushing(struct inode *inode, 1598 struct ceph_mds_session *session, bool wake, 1599 u64 *flush_tid, u64 *oldest_flush_tid) 1600 { 1601 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 1602 struct ceph_inode_info *ci = ceph_inode(inode); 1603 struct ceph_cap_flush *cf = NULL; 1604 int flushing; 1605 1606 BUG_ON(ci->i_dirty_caps == 0); 1607 BUG_ON(list_empty(&ci->i_dirty_item)); 1608 BUG_ON(!ci->i_prealloc_cap_flush); 1609 1610 flushing = ci->i_dirty_caps; 1611 dout("__mark_caps_flushing flushing %s, flushing_caps %s -> %s\n", 1612 ceph_cap_string(flushing), 1613 ceph_cap_string(ci->i_flushing_caps), 1614 ceph_cap_string(ci->i_flushing_caps | flushing)); 1615 ci->i_flushing_caps |= flushing; 1616 ci->i_dirty_caps = 0; 1617 dout(" inode %p now !dirty\n", inode); 1618 1619 swap(cf, ci->i_prealloc_cap_flush); 1620 cf->caps = flushing; 1621 cf->wake = wake; 1622 1623 spin_lock(&mdsc->cap_dirty_lock); 1624 list_del_init(&ci->i_dirty_item); 1625 1626 cf->tid = ++mdsc->last_cap_flush_tid; 1627 list_add_tail(&cf->g_list, &mdsc->cap_flush_list); 1628 *oldest_flush_tid = __get_oldest_flush_tid(mdsc); 1629 1630 if (list_empty(&ci->i_flushing_item)) { 1631 list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing); 1632 mdsc->num_cap_flushing++; 1633 } 1634 spin_unlock(&mdsc->cap_dirty_lock); 1635 1636 list_add_tail(&cf->i_list, &ci->i_cap_flush_list); 1637 1638 *flush_tid = cf->tid; 1639 return flushing; 1640 } 1641 1642 /* 1643 * try to invalidate mapping pages without blocking. 
1644 */ 1645 static int try_nonblocking_invalidate(struct inode *inode) 1646 { 1647 struct ceph_inode_info *ci = ceph_inode(inode); 1648 u32 invalidating_gen = ci->i_rdcache_gen; 1649 1650 spin_unlock(&ci->i_ceph_lock); 1651 invalidate_mapping_pages(&inode->i_data, 0, -1); 1652 spin_lock(&ci->i_ceph_lock); 1653 1654 if (inode->i_data.nrpages == 0 && 1655 invalidating_gen == ci->i_rdcache_gen) { 1656 /* success. */ 1657 dout("try_nonblocking_invalidate %p success\n", inode); 1658 /* save any racing async invalidate some trouble */ 1659 ci->i_rdcache_revoking = ci->i_rdcache_gen - 1; 1660 return 0; 1661 } 1662 dout("try_nonblocking_invalidate %p failed\n", inode); 1663 return -1; 1664 } 1665 1666 bool __ceph_should_report_size(struct ceph_inode_info *ci) 1667 { 1668 loff_t size = ci->vfs_inode.i_size; 1669 /* mds will adjust max size according to the reported size */ 1670 if (ci->i_flushing_caps & CEPH_CAP_FILE_WR) 1671 return false; 1672 if (size >= ci->i_max_size) 1673 return true; 1674 /* half of previous max_size increment has been used */ 1675 if (ci->i_max_size > ci->i_reported_size && 1676 (size << 1) >= ci->i_max_size + ci->i_reported_size) 1677 return true; 1678 return false; 1679 } 1680 1681 /* 1682 * Swiss army knife function to examine currently used and wanted 1683 * versus held caps. Release, flush, ack revoked caps to mds as 1684 * appropriate. 1685 * 1686 * CHECK_CAPS_NODELAY - caller is delayed work and we should not delay 1687 * cap release further. 1688 * CHECK_CAPS_AUTHONLY - we should only check the auth cap 1689 * CHECK_CAPS_FLUSH - we should flush any dirty caps immediately, without 1690 * further delay. 
*
 * @session: session whose s_mutex the caller already holds, or NULL.
 *    Whichever session mutex is held when the function finishes is
 *    unlocked before returning.
 *
 * Takes i_ceph_lock itself.  Each message sent drops that lock, after
 * which the whole cap scan restarts from the "retry" label.
 */
void ceph_check_caps(struct ceph_inode_info *ci, int flags,
		     struct ceph_mds_session *session)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct inode *inode = &ci->vfs_inode;
	struct ceph_cap *cap;
	u64 flush_tid, oldest_flush_tid;
	int file_wanted, used, cap_used;
	int took_snap_rwsem = 0;             /* true if mdsc->snap_rwsem held */
	int issued, implemented, want, retain, revoking, flushing = 0;
	int mds = -1;   /* keep track of how far we've gone through i_caps list
			   to avoid an infinite loop on retry */
	struct rb_node *p;
	int delayed = 0, sent = 0, num;
	bool is_delayed = flags & CHECK_CAPS_NODELAY;
	bool queue_invalidate = false;
	bool force_requeue = false;
	bool tried_invalidate = false;

	/* if we are unmounting, flush any unused caps immediately. */
	if (mdsc->stopping)
		is_delayed = 1;

	spin_lock(&ci->i_ceph_lock);

	/* a pending CEPH_I_FLUSH request behaves like CHECK_CAPS_FLUSH */
	if (ci->i_ceph_flags & CEPH_I_FLUSH)
		flags |= CHECK_CAPS_FLUSH;

	goto retry_locked;
retry:
	spin_lock(&ci->i_ceph_lock);
retry_locked:
	/* recompute everything: i_ceph_lock may have been dropped */
	file_wanted = __ceph_caps_file_wanted(ci);
	used = __ceph_caps_used(ci);
	issued = __ceph_caps_issued(ci, &implemented);
	revoking = implemented & ~issued;

	want = file_wanted;
	retain = file_wanted | used | CEPH_CAP_PIN;
	if (!mdsc->stopping && inode->i_nlink > 0) {
		if (file_wanted) {
			retain |= CEPH_CAP_ANY;       /* be greedy */
		} else if (S_ISDIR(inode->i_mode) &&
			   (issued & CEPH_CAP_FILE_SHARED) &&
			    __ceph_dir_is_complete(ci)) {
			/*
			 * If a directory is complete, we want to keep
			 * the exclusive cap. So that MDS does not end up
			 * revoking the shared cap on every create/unlink
			 * operation.
			 */
			want = CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
			retain |= want;
		} else {

			retain |= CEPH_CAP_ANY_SHARED;
			/*
			 * keep RD only if we didn't have the file open RW,
			 * because then the mds would revoke it anyway to
			 * journal max_size=0.
			 */
			if (ci->i_max_size == 0)
				retain |= CEPH_CAP_ANY_RD;
		}
	}

	dout("check_caps %p file_want %s used %s dirty %s flushing %s"
	     " issued %s revoking %s retain %s %s%s%s\n", inode,
	     ceph_cap_string(file_wanted),
	     ceph_cap_string(used), ceph_cap_string(ci->i_dirty_caps),
	     ceph_cap_string(ci->i_flushing_caps),
	     ceph_cap_string(issued), ceph_cap_string(revoking),
	     ceph_cap_string(retain),
	     (flags & CHECK_CAPS_AUTHONLY) ? " AUTHONLY" : "",
	     (flags & CHECK_CAPS_NODELAY) ? " NODELAY" : "",
	     (flags & CHECK_CAPS_FLUSH) ? " FLUSH" : "");

	/*
	 * If we no longer need to hold onto our old caps, and we may
	 * have cached pages, but don't want them, then try to invalidate.
	 * If we fail, it's because pages are locked.... try again later.
	 *
	 * Attempted at most once per call (tried_invalidate).
	 */
	if ((!is_delayed || mdsc->stopping) &&
	    !S_ISDIR(inode->i_mode) &&		/* ignore readdir cache */
	    !(ci->i_wb_ref || ci->i_wrbuffer_ref) &&   /* no dirty pages... */
	    inode->i_data.nrpages &&		/* have cached pages */
	    (revoking & (CEPH_CAP_FILE_CACHE|
			 CEPH_CAP_FILE_LAZYIO)) && /*  or revoking cache */
	    !tried_invalidate) {
		dout("check_caps trying to invalidate on %p\n", inode);
		if (try_nonblocking_invalidate(inode) < 0) {
			/*
			 * NOTE(review): the enclosing condition already
			 * requires these revoking bits, so the else branch
			 * below looks unreachable.
			 */
			if (revoking & (CEPH_CAP_FILE_CACHE|
					CEPH_CAP_FILE_LAZYIO)) {
				dout("check_caps queuing invalidate\n");
				queue_invalidate = true;
				ci->i_rdcache_revoking = ci->i_rdcache_gen;
			} else {
				dout("check_caps failed to invalidate pages\n");
				/* we failed to invalidate pages.  check these
				   caps again later. */
				force_requeue = true;
				__cap_set_timeouts(mdsc, ci);
			}
		}
		tried_invalidate = true;
		goto retry_locked;
	}

	num = 0;
	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
		cap = rb_entry(p, struct ceph_cap, ci_node);
		num++;

		/* avoid looping forever */
		if (mds >= cap->mds ||
		    ((flags & CHECK_CAPS_AUTHONLY) && cap != ci->i_auth_cap))
			continue;

		/* NOTE: no side-effects allowed, until we take s_mutex */

		/* for a non-auth cap, ignore usage the auth cap covers */
		cap_used = used;
		if (ci->i_auth_cap && cap != ci->i_auth_cap)
			cap_used &= ~ci->i_auth_cap->issued;

		revoking = cap->implemented & ~cap->issued;
		dout(" mds%d cap %p used %s issued %s implemented %s revoking %s\n",
		     cap->mds, cap, ceph_cap_string(cap_used),
		     ceph_cap_string(cap->issued),
		     ceph_cap_string(cap->implemented),
		     ceph_cap_string(revoking));

		if (cap == ci->i_auth_cap &&
		    (cap->issued & CEPH_CAP_FILE_WR)) {
			/* request larger max_size from MDS? */
			if (ci->i_wanted_max_size > ci->i_max_size &&
			    ci->i_wanted_max_size > ci->i_requested_max_size) {
				dout("requesting new max_size\n");
				goto ack;
			}

			/* approaching file_max? */
			if (__ceph_should_report_size(ci)) {
				dout("i_size approaching max_size\n");
				goto ack;
			}
		}
		/* flush anything dirty? */
		if (cap == ci->i_auth_cap) {
			if ((flags & CHECK_CAPS_FLUSH) && ci->i_dirty_caps) {
				dout("flushing dirty caps\n");
				goto ack;
			}
			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) {
				dout("flushing snap caps\n");
				goto ack;
			}
		}

		/* completed revocation? going down and there are no caps? */
		if (revoking && (revoking & cap_used) == 0) {
			dout("completed revocation of %s\n",
			     ceph_cap_string(cap->implemented & ~cap->issued));
			goto ack;
		}

		/* want more caps from mds? */
		if (want & ~(cap->mds_wanted | cap->issued))
			goto ack;

		/* things we might delay */
		if ((cap->issued & ~retain) == 0 &&
		    cap->mds_wanted == want)
			continue;     /* nope, all good */

		if (is_delayed)
			goto ack;

		/* delay? */
		if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 &&
		    time_before(jiffies, ci->i_hold_caps_max)) {
			dout(" delaying issued %s -> %s, wanted %s -> %s\n",
			     ceph_cap_string(cap->issued),
			     ceph_cap_string(cap->issued & retain),
			     ceph_cap_string(cap->mds_wanted),
			     ceph_cap_string(want));
			delayed++;
			continue;
		}

ack:
		if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
			dout(" skipping %p I_NOFLUSH set\n", inode);
			continue;
		}

		/* we need this cap's session mutex; drop any other one */
		if (session && session != cap->session) {
			dout("oops, wrong session %p mutex\n", session);
			mutex_unlock(&session->s_mutex);
			session = NULL;
		}
		if (!session) {
			session = cap->session;
			/*
			 * Try for s_mutex without sleeping while i_ceph_lock
			 * is held; on failure drop the spinlock (and
			 * snap_rwsem), block on the mutex, then rescan.
			 */
			if (mutex_trylock(&session->s_mutex) == 0) {
				dout("inverting session/ino locks on %p\n",
				     session);
				spin_unlock(&ci->i_ceph_lock);
				if (took_snap_rwsem) {
					up_read(&mdsc->snap_rwsem);
					took_snap_rwsem = 0;
				}
				mutex_lock(&session->s_mutex);
				goto retry;
			}
		}

		/* kick flushing and flush snaps before sending normal
		 * cap message */
		if (cap == ci->i_auth_cap &&
		    (ci->i_ceph_flags &
		     (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) {
			if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
				__kick_flushing_caps(mdsc, session, ci, 0);
				ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
			}
			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
				__ceph_flush_snaps(ci, session);

			goto retry_locked;
		}

		/* take snap_rwsem after session mutex */
		if (!took_snap_rwsem) {
			if (down_read_trylock(&mdsc->snap_rwsem) == 0) {
				dout("inverting snap/in locks on %p\n",
				     inode);
				spin_unlock(&ci->i_ceph_lock);
				down_read(&mdsc->snap_rwsem);
				took_snap_rwsem = 1;
				goto retry;
			}
			took_snap_rwsem = 1;
		}

		/* only the auth cap carries a flush of dirty caps */
		if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
			flushing = __mark_caps_flushing(inode, session, false,
							&flush_tid,
							&oldest_flush_tid);
		} else {
			flushing = 0;
			flush_tid = 0;
			spin_lock(&mdsc->cap_dirty_lock);
			oldest_flush_tid = __get_oldest_flush_tid(mdsc);
			spin_unlock(&mdsc->cap_dirty_lock);
		}

		mds = cap->mds;  /* remember mds, so we don't repeat */
		sent++;

		/* __send_cap drops i_ceph_lock */
		delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, false,
				cap_used, want, retain, flushing,
				flush_tid, oldest_flush_tid);
		goto retry; /* retake i_ceph_lock and restart our cap scan. */
	}

	/*
	 * Reschedule delayed caps release if we delayed anything,
	 * otherwise cancel.
	 */
	if (delayed && is_delayed)
		force_requeue = true;   /* __send_cap delayed release; requeue */
	if (!delayed && !is_delayed)
		__cap_delay_cancel(mdsc, ci);
	else if (!is_delayed || force_requeue)
		__cap_delay_requeue(mdsc, ci);

	spin_unlock(&ci->i_ceph_lock);

	/* deferred until i_ceph_lock has been released */
	if (queue_invalidate)
		ceph_queue_invalidate(inode);

	if (session)
		mutex_unlock(&session->s_mutex);
	if (took_snap_rwsem)
		up_read(&mdsc->snap_rwsem);
}

/*
 * Try to flush dirty caps back to the auth mds.
*
 * @ptid: receives the tid to wait on: the tid of the flush started
 *        here, else the tid of the newest flush already queued on the
 *        inode, else 0.
 *
 * Returns the caps being flushed: those newly marked flushing here, or
 * the inode's existing i_flushing_caps when nothing new was started.
 * Returns 0 when CEPH_I_NOFLUSH is set or the auth session is not yet
 * open.
 */
static int try_flush_caps(struct inode *inode, u64 *ptid)
{
	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_mds_session *session = NULL;
	int flushing = 0;
	u64 flush_tid = 0, oldest_flush_tid = 0;

retry:
	spin_lock(&ci->i_ceph_lock);
	if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
		spin_unlock(&ci->i_ceph_lock);
		dout("try_flush_caps skipping %p I_NOFLUSH set\n", inode);
		goto out;
	}
	if (ci->i_dirty_caps && ci->i_auth_cap) {
		struct ceph_cap *cap = ci->i_auth_cap;
		int used = __ceph_caps_used(ci);
		int want = __ceph_caps_wanted(ci);
		int delayed;

		/*
		 * s_mutex of the auth cap's session is needed.  It is
		 * taken with i_ceph_lock dropped, then everything is
		 * re-checked from retry.
		 * NOTE(review): cap->session is read after the spinlock
		 * was released.
		 */
		if (!session || session != cap->session) {
			spin_unlock(&ci->i_ceph_lock);
			if (session)
				mutex_unlock(&session->s_mutex);
			session = cap->session;
			mutex_lock(&session->s_mutex);
			goto retry;
		}
		if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) {
			spin_unlock(&ci->i_ceph_lock);
			goto out;
		}

		/* wake=true: flush waiters are woken when this flush completes */
		flushing = __mark_caps_flushing(inode, session, true,
				&flush_tid, &oldest_flush_tid);

		/* __send_cap drops i_ceph_lock */
		delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, true,
				used, want, (cap->issued | cap->implemented),
				flushing, flush_tid, oldest_flush_tid);

		if (delayed) {
			spin_lock(&ci->i_ceph_lock);
			__cap_delay_requeue(mdsc, ci);
			spin_unlock(&ci->i_ceph_lock);
		}
	} else {
		/*
		 * Nothing new to flush: ask for a wakeup on the newest
		 * flush already in flight, if any, and report its tid.
		 */
		if (!list_empty(&ci->i_cap_flush_list)) {
			struct ceph_cap_flush *cf =
				list_last_entry(&ci->i_cap_flush_list,
						struct ceph_cap_flush, i_list);
			cf->wake = true;
			flush_tid = cf->tid;
		}
		flushing = ci->i_flushing_caps;
		spin_unlock(&ci->i_ceph_lock);
	}
out:
	if (session)
		mutex_unlock(&session->s_mutex);

	*ptid = flush_tid;
	return flushing;
}

/*
 * Return true if we've
flushed caps through the given flush_tid. 2051 */ 2052 static int caps_are_flushed(struct inode *inode, u64 flush_tid) 2053 { 2054 struct ceph_inode_info *ci = ceph_inode(inode); 2055 int ret = 1; 2056 2057 spin_lock(&ci->i_ceph_lock); 2058 if (!list_empty(&ci->i_cap_flush_list)) { 2059 struct ceph_cap_flush * cf = 2060 list_first_entry(&ci->i_cap_flush_list, 2061 struct ceph_cap_flush, i_list); 2062 if (cf->tid <= flush_tid) 2063 ret = 0; 2064 } 2065 spin_unlock(&ci->i_ceph_lock); 2066 return ret; 2067 } 2068 2069 /* 2070 * wait for any unsafe requests to complete. 2071 */ 2072 static int unsafe_request_wait(struct inode *inode) 2073 { 2074 struct ceph_inode_info *ci = ceph_inode(inode); 2075 struct ceph_mds_request *req1 = NULL, *req2 = NULL; 2076 int ret, err = 0; 2077 2078 spin_lock(&ci->i_unsafe_lock); 2079 if (S_ISDIR(inode->i_mode) && !list_empty(&ci->i_unsafe_dirops)) { 2080 req1 = list_last_entry(&ci->i_unsafe_dirops, 2081 struct ceph_mds_request, 2082 r_unsafe_dir_item); 2083 ceph_mdsc_get_request(req1); 2084 } 2085 if (!list_empty(&ci->i_unsafe_iops)) { 2086 req2 = list_last_entry(&ci->i_unsafe_iops, 2087 struct ceph_mds_request, 2088 r_unsafe_target_item); 2089 ceph_mdsc_get_request(req2); 2090 } 2091 spin_unlock(&ci->i_unsafe_lock); 2092 2093 dout("unsafe_request_wait %p wait on tid %llu %llu\n", 2094 inode, req1 ? req1->r_tid : 0ULL, req2 ? 
req2->r_tid : 0ULL); 2095 if (req1) { 2096 ret = !wait_for_completion_timeout(&req1->r_safe_completion, 2097 ceph_timeout_jiffies(req1->r_timeout)); 2098 if (ret) 2099 err = -EIO; 2100 ceph_mdsc_put_request(req1); 2101 } 2102 if (req2) { 2103 ret = !wait_for_completion_timeout(&req2->r_safe_completion, 2104 ceph_timeout_jiffies(req2->r_timeout)); 2105 if (ret) 2106 err = -EIO; 2107 ceph_mdsc_put_request(req2); 2108 } 2109 return err; 2110 } 2111 2112 int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) 2113 { 2114 struct inode *inode = file->f_mapping->host; 2115 struct ceph_inode_info *ci = ceph_inode(inode); 2116 u64 flush_tid; 2117 int ret; 2118 int dirty; 2119 2120 dout("fsync %p%s\n", inode, datasync ? " datasync" : ""); 2121 2122 ret = file_write_and_wait_range(file, start, end); 2123 if (ret < 0) 2124 goto out; 2125 2126 if (datasync) 2127 goto out; 2128 2129 inode_lock(inode); 2130 2131 dirty = try_flush_caps(inode, &flush_tid); 2132 dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); 2133 2134 ret = unsafe_request_wait(inode); 2135 2136 /* 2137 * only wait on non-file metadata writeback (the mds 2138 * can recover size and mtime, so we don't need to 2139 * wait for that) 2140 */ 2141 if (!ret && (dirty & ~CEPH_CAP_ANY_FILE_WR)) { 2142 ret = wait_event_interruptible(ci->i_cap_wq, 2143 caps_are_flushed(inode, flush_tid)); 2144 } 2145 inode_unlock(inode); 2146 out: 2147 dout("fsync %p%s result=%d\n", inode, datasync ? " datasync" : "", ret); 2148 return ret; 2149 } 2150 2151 /* 2152 * Flush any dirty caps back to the mds. If we aren't asked to wait, 2153 * queue inode for flush but don't do so immediately, because we can 2154 * get by with fewer MDS messages if we wait for data writeback to 2155 * complete first. 
*
 * Returns 0, or the result of the interruptible wait when
 * wbc->sync_mode is WB_SYNC_ALL and caps were flushed.
 */
int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	u64 flush_tid;
	int err = 0;
	int dirty;
	int wait = wbc->sync_mode == WB_SYNC_ALL;

	dout("write_inode %p wait=%d\n", inode, wait);
	if (wait) {
		/* synchronous: flush now and wait for the flush to complete */
		dirty = try_flush_caps(inode, &flush_tid);
		if (dirty)
			err = wait_event_interruptible(ci->i_cap_wq,
				       caps_are_flushed(inode, flush_tid));
	} else {
		struct ceph_mds_client *mdsc =
			ceph_sb_to_client(inode->i_sb)->mdsc;

		/* asynchronous: just move the inode to the front of the delay queue */
		spin_lock(&ci->i_ceph_lock);
		if (__ceph_caps_dirty(ci))
			__cap_delay_requeue_front(mdsc, ci);
		spin_unlock(&ci->i_ceph_lock);
	}
	return err;
}

/*
 * Re-send every flush queued on ci->i_cap_flush_list to @session, in
 * list order: a cap flush (CEPH_CAP_OP_FLUSH) for entries with cf->caps
 * set, a FLUSHSNAP for the rest, which are embedded in a ceph_cap_snap.
 * Stops early if the inode's auth cap no longer belongs to @session.
 *
 * Called with i_ceph_lock held; the lock is dropped around each send
 * and held again on return.
 */
static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *session,
				 struct ceph_inode_info *ci,
				 u64 oldest_flush_tid)
	__releases(ci->i_ceph_lock)
	__acquires(ci->i_ceph_lock)
{
	struct inode *inode = &ci->vfs_inode;
	struct ceph_cap *cap;
	struct ceph_cap_flush *cf;
	int ret;
	u64 first_tid = 0;

	list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
		/* skip entries at or below the last tid already re-sent */
		if (cf->tid < first_tid)
			continue;

		/* re-read the auth cap: the lock was dropped by the last send */
		cap = ci->i_auth_cap;
		if (!(cap && cap->session == session)) {
			pr_err("%p auth cap %p not mds%d ???\n",
			       inode, cap, session->s_mds);
			break;
		}

		first_tid = cf->tid + 1;

		if (cf->caps) {
			dout("kick_flushing_caps %p cap %p tid %llu %s\n",
			     inode, cap, cf->tid, ceph_cap_string(cf->caps));
			/* set NODELAY so __send_cap does not hold back the release */
			ci->i_ceph_flags |= CEPH_I_NODELAY;
			/* __send_cap drops i_ceph_lock */
			ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
					  false, __ceph_caps_used(ci),
					  __ceph_caps_wanted(ci),
					  cap->issued | cap->implemented,
					  cf->caps, cf->tid, oldest_flush_tid);
			if (ret) {
				pr_err("kick_flushing_caps: error sending "
					"cap flush, ino (%llx.%llx) "
					"tid %llu flushing %s\n",
					ceph_vinop(inode), cf->tid,
					ceph_cap_string(cf->caps));
			}
		} else {
			struct ceph_cap_snap *capsnap =
					container_of(cf, struct ceph_cap_snap,
						    cap_flush);
			dout("kick_flushing_caps %p capsnap %p tid %llu %s\n",
			     inode, capsnap, cf->tid,
			     ceph_cap_string(capsnap->dirty));

			/* hold a capsnap reference across the unlocked send */
			refcount_inc(&capsnap->nref);
			spin_unlock(&ci->i_ceph_lock);

			ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
						oldest_flush_tid);
			if (ret < 0) {
				pr_err("kick_flushing_caps: error sending "
					"cap flushsnap, ino (%llx.%llx) "
					"tid %llu follows %llu\n",
					ceph_vinop(inode), cf->tid,
					capsnap->follows);
			}

			ceph_put_cap_snap(capsnap);
		}

		/* both branches leave the lock dropped; retake it */
		spin_lock(&ci->i_ceph_lock);
	}
}

/*
 * Walk the inodes on session->s_cap_flushing.  Flushes are re-sent
 * immediately for inodes whose flushing caps are no longer all issued
 * by the auth cap; the remaining inodes are tagged CEPH_I_KICK_FLUSH.
 * Inodes whose auth cap is not on @session are logged and skipped.
 */
void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
{
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	u64 oldest_flush_tid;

	dout("early_kick_flushing_caps mds%d\n", session->s_mds);

	spin_lock(&mdsc->cap_dirty_lock);
	oldest_flush_tid = __get_oldest_flush_tid(mdsc);
	spin_unlock(&mdsc->cap_dirty_lock);

	list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
		spin_lock(&ci->i_ceph_lock);
		cap = ci->i_auth_cap;
		if (!(cap && cap->session == session)) {
			pr_err("%p auth cap %p not mds%d ???\n",
				&ci->vfs_inode, cap, session->s_mds);
			spin_unlock(&ci->i_ceph_lock);
			continue;
		}


		/*
		 * if flushing caps were revoked, we re-send the cap flush
		 * in client reconnect stage. This guarantees MDS processes
		 * the cap flush message before issuing the flushing caps to
		 * other client.
2282 */ 2283 if ((cap->issued & ci->i_flushing_caps) != 2284 ci->i_flushing_caps) { 2285 ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; 2286 __kick_flushing_caps(mdsc, session, ci, 2287 oldest_flush_tid); 2288 } else { 2289 ci->i_ceph_flags |= CEPH_I_KICK_FLUSH; 2290 } 2291 2292 spin_unlock(&ci->i_ceph_lock); 2293 } 2294 } 2295 2296 void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, 2297 struct ceph_mds_session *session) 2298 { 2299 struct ceph_inode_info *ci; 2300 struct ceph_cap *cap; 2301 u64 oldest_flush_tid; 2302 2303 dout("kick_flushing_caps mds%d\n", session->s_mds); 2304 2305 spin_lock(&mdsc->cap_dirty_lock); 2306 oldest_flush_tid = __get_oldest_flush_tid(mdsc); 2307 spin_unlock(&mdsc->cap_dirty_lock); 2308 2309 list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { 2310 spin_lock(&ci->i_ceph_lock); 2311 cap = ci->i_auth_cap; 2312 if (!(cap && cap->session == session)) { 2313 pr_err("%p auth cap %p not mds%d ???\n", 2314 &ci->vfs_inode, cap, session->s_mds); 2315 spin_unlock(&ci->i_ceph_lock); 2316 continue; 2317 } 2318 if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) { 2319 ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH; 2320 __kick_flushing_caps(mdsc, session, ci, 2321 oldest_flush_tid); 2322 } 2323 spin_unlock(&ci->i_ceph_lock); 2324 } 2325 } 2326 2327 static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, 2328 struct ceph_mds_session *session, 2329 struct inode *inode) 2330 __releases(ci->i_ceph_lock) 2331 { 2332 struct ceph_inode_info *ci = ceph_inode(inode); 2333 struct ceph_cap *cap; 2334 2335 cap = ci->i_auth_cap; 2336 dout("kick_flushing_inode_caps %p flushing %s\n", inode, 2337 ceph_cap_string(ci->i_flushing_caps)); 2338 2339 if (!list_empty(&ci->i_cap_flush_list)) { 2340 u64 oldest_flush_tid; 2341 spin_lock(&mdsc->cap_dirty_lock); 2342 list_move_tail(&ci->i_flushing_item, 2343 &cap->session->s_cap_flushing); 2344 oldest_flush_tid = __get_oldest_flush_tid(mdsc); 2345 spin_unlock(&mdsc->cap_dirty_lock); 2346 2347 ci->i_ceph_flags &= 
~CEPH_I_KICK_FLUSH; 2348 __kick_flushing_caps(mdsc, session, ci, oldest_flush_tid); 2349 spin_unlock(&ci->i_ceph_lock); 2350 } else { 2351 spin_unlock(&ci->i_ceph_lock); 2352 } 2353 } 2354 2355 2356 /* 2357 * Take references to capabilities we hold, so that we don't release 2358 * them to the MDS prematurely. 2359 * 2360 * Protected by i_ceph_lock. 2361 */ 2362 static void __take_cap_refs(struct ceph_inode_info *ci, int got, 2363 bool snap_rwsem_locked) 2364 { 2365 if (got & CEPH_CAP_PIN) 2366 ci->i_pin_ref++; 2367 if (got & CEPH_CAP_FILE_RD) 2368 ci->i_rd_ref++; 2369 if (got & CEPH_CAP_FILE_CACHE) 2370 ci->i_rdcache_ref++; 2371 if (got & CEPH_CAP_FILE_WR) { 2372 if (ci->i_wr_ref == 0 && !ci->i_head_snapc) { 2373 BUG_ON(!snap_rwsem_locked); 2374 ci->i_head_snapc = ceph_get_snap_context( 2375 ci->i_snap_realm->cached_context); 2376 } 2377 ci->i_wr_ref++; 2378 } 2379 if (got & CEPH_CAP_FILE_BUFFER) { 2380 if (ci->i_wb_ref == 0) 2381 ihold(&ci->vfs_inode); 2382 ci->i_wb_ref++; 2383 dout("__take_cap_refs %p wb %d -> %d (?)\n", 2384 &ci->vfs_inode, ci->i_wb_ref-1, ci->i_wb_ref); 2385 } 2386 } 2387 2388 /* 2389 * Try to grab cap references. Specify those refs we @want, and the 2390 * minimal set we @need. Also include the larger offset we are writing 2391 * to (when applicable), and check against max_size here as well. 2392 * Note that caller is responsible for ensuring max_size increases are 2393 * requested from the MDS. 
 */
/*
 * Return value and out-parameters, as implemented below:
 *   - returns 0 when the caller should wait and retry; *err is left
 *     untouched in that case.
 *   - returns 1 when the attempt is finished: either references were
 *     taken (*got holds the cap bits referenced, *err untouched), or
 *     *err was set to a negative errno (-EBADF, -EAGAIN, -EROFS, -EIO
 *     or -ESTALE).
 *
 * @nonblock forbids sleeping in down_read(&snap_rwsem); -EAGAIN is
 * reported instead.
 *
 * NOTE(review): the final dout() reads *got even when this call did not
 * write it, so callers should pass an initialized value.
 */
static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
			    loff_t endoff, bool nonblock, int *got, int *err)
{
	struct inode *inode = &ci->vfs_inode;
	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
	int ret = 0;
	int have, implemented;
	int file_wanted;
	bool snap_rwsem_locked = false;

	dout("get_cap_refs %p need %s want %s\n", inode,
	     ceph_cap_string(need), ceph_cap_string(want));

again:
	spin_lock(&ci->i_ceph_lock);

	/* make sure file is actually open */
	file_wanted = __ceph_caps_file_wanted(ci);
	if ((file_wanted & need) != need) {
		dout("try_get_cap_refs need %s file_wanted %s, EBADF\n",
		     ceph_cap_string(need), ceph_cap_string(file_wanted));
		*err = -EBADF;
		ret = 1;
		goto out_unlock;
	}

	/* finish pending truncate */
	while (ci->i_truncate_pending) {
		/* both locks are dropped around the truncate itself */
		spin_unlock(&ci->i_ceph_lock);
		if (snap_rwsem_locked) {
			up_read(&mdsc->snap_rwsem);
			snap_rwsem_locked = false;
		}
		__ceph_do_pending_vmtruncate(inode);
		spin_lock(&ci->i_ceph_lock);
	}

	have = __ceph_caps_issued(ci, &implemented);

	if (have & need & CEPH_CAP_FILE_WR) {
		if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
			dout("get_cap_refs %p endoff %llu > maxsize %llu\n",
			     inode, endoff, ci->i_max_size);
			/*
			 * Beyond what was already requested: report -EAGAIN
			 * so the caller re-runs its max_size check;
			 * otherwise return 0 and let the caller wait.
			 */
			if (endoff > ci->i_requested_max_size) {
				*err = -EAGAIN;
				ret = 1;
			}
			goto out_unlock;
		}
		/*
		 * If a sync write is in progress, we must wait, so that we
		 * can get a final snapshot value for size+mtime.
		 */
		if (__ceph_have_pending_cap_snap(ci)) {
			dout("get_cap_refs %p cap_snap_pending\n", inode);
			goto out_unlock;
		}
	}

	if ((have & need) == need) {
		/*
		 * Look at (implemented & ~have & not) so that we keep waiting
		 * on transition from wanted -> needed caps.  This is needed
		 * for WRBUFFER|WR -> WR to avoid a new WR sync write from
		 * going before a prior buffered writeback happens.
		 */
		int not = want & ~(have & need);
		int revoking = implemented & ~have;
		dout("get_cap_refs %p have %s but not %s (revoking %s)\n",
		     inode, ceph_cap_string(have), ceph_cap_string(not),
		     ceph_cap_string(revoking));
		if ((revoking & not) == 0) {
			/*
			 * A WR reference with no head snap context yet makes
			 * __take_cap_refs() read the snap realm, so
			 * snap_rwsem must be held for read first.
			 */
			if (!snap_rwsem_locked &&
			    !ci->i_head_snapc &&
			    (need & CEPH_CAP_FILE_WR)) {
				if (!down_read_trylock(&mdsc->snap_rwsem)) {
					/*
					 * we can not call down_read() when
					 * task isn't in TASK_RUNNING state
					 */
					if (nonblock) {
						*err = -EAGAIN;
						ret = 1;
						goto out_unlock;
					}

					/*
					 * Take the rwsem with i_ceph_lock
					 * dropped, then redo every check.
					 */
					spin_unlock(&ci->i_ceph_lock);
					down_read(&mdsc->snap_rwsem);
					snap_rwsem_locked = true;
					goto again;
				}
				snap_rwsem_locked = true;
			}
			*got = need | (have & want);
			if ((need & CEPH_CAP_FILE_RD) &&
			    !(*got & CEPH_CAP_FILE_CACHE))
				ceph_disable_fscache_readpage(ci);
			__take_cap_refs(ci, *got, true);
			ret = 1;
		}
	} else {
		int session_readonly = false;
		if ((need & CEPH_CAP_FILE_WR) && ci->i_auth_cap) {
			struct ceph_mds_session *s = ci->i_auth_cap->session;
			spin_lock(&s->s_cap_lock);
			session_readonly = s->s_readonly;
			spin_unlock(&s->s_cap_lock);
		}
		if (session_readonly) {
			dout("get_cap_refs %p needed %s but mds%d readonly\n",
			     inode, ceph_cap_string(need), ci->i_auth_cap->mds);
			*err = -EROFS;
			ret = 1;
			goto out_unlock;
		}

		if (ci->i_ceph_flags & CEPH_I_CAP_DROPPED) {
			int mds_wanted;
			if (READ_ONCE(mdsc->fsc->mount_state) ==
			    CEPH_MOUNT_SHUTDOWN) {
				dout("get_cap_refs %p forced umount\n", inode);
				*err = -EIO;
				ret = 1;
				goto out_unlock;
			}
			mds_wanted = __ceph_caps_mds_wanted(ci, false);
			/* some needed bit is not wanted at any MDS */
			if (need & ~(mds_wanted & need)) {
				dout("get_cap_refs %p caps were dropped"
				     " (session killed?)\n", inode);
				*err = -ESTALE;
				ret = 1;
				goto out_unlock;
			}
			if (!(file_wanted & ~mds_wanted))
				ci->i_ceph_flags &= ~CEPH_I_CAP_DROPPED;
		}

		dout("get_cap_refs %p have %s needed %s\n", inode,
		     ceph_cap_string(have), ceph_cap_string(need));
	}
out_unlock:
	spin_unlock(&ci->i_ceph_lock);
	if (snap_rwsem_locked)
		up_read(&mdsc->snap_rwsem);

	dout("get_cap_refs %p ret %d got %s\n", inode,
	     ret, ceph_cap_string(*got));
	return ret;
}

/*
 * Check the offset we are writing up to against our current
 * max_size.  If necessary, tell the MDS we want to write to
 * a larger offset.
 */
static void check_max_size(struct inode *inode, loff_t endoff)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	int check = 0;

	/* do we need to explicitly request a larger max_size? */
	spin_lock(&ci->i_ceph_lock);
	if (endoff >= ci->i_max_size && endoff > ci->i_wanted_max_size) {
		dout("write %p at large endoff %llu, req max_size\n",
		     inode, endoff);
		ci->i_wanted_max_size = endoff;
	}
	/* duplicate ceph_check_caps()'s logic */
	if (ci->i_auth_cap &&
	    (ci->i_auth_cap->issued & CEPH_CAP_FILE_WR) &&
	    ci->i_wanted_max_size > ci->i_max_size &&
	    ci->i_wanted_max_size > ci->i_requested_max_size)
		check = 1;
	spin_unlock(&ci->i_ceph_lock);
	/* ceph_check_caps() is called only after i_ceph_lock is dropped */
	if (check)
		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
}

/*
 * Non-blocking attempt to take read-side cap references.
 *
 * @need may contain only CEPH_CAP_FILE_RD, and @want only
 * CEPH_CAP_FILE_CACHE and/or CEPH_CAP_FILE_LAZYIO (enforced by BUG_ON).
 *
 * Returns 1 when references were taken (*got is set), 0 when they were
 * not available without waiting, or a negative errno.
 */
int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got)
{
	int ret, err = 0;

	BUG_ON(need & ~CEPH_CAP_FILE_RD);
	BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
	ret = ceph_pool_perm_check(ci, need);
	if (ret < 0)
		return ret;

	ret = try_get_cap_refs(ci, need, want, 0, true, got, &err);
	if (ret) {
		if (err == -EAGAIN) {
			/* would have had to block: report "not acquired" */
			ret = 0;
		} else if (err < 0) {
			ret = err;
		}
	}
	return ret;
}

/*
 * Wait for caps, and take cap references.  If we can't get a WR cap
 * due to a small max_size, make sure we check_max_size (and possibly
 * ask the mds) so we don't get hung up indefinitely.
 *
 * On success returns 0 with the referenced cap bits stored in *got.
 * *pinned_page is written only when an up-to-date page 0 is found for
 * an inode with inline data; that page is returned with a reference
 * held.  On failure returns a negative errno (-ERESTARTSYS if a signal
 * interrupted the wait).
 */
int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
		  loff_t endoff, int *got, struct page **pinned_page)
{
	int _got, ret, err = 0;

	ret = ceph_pool_perm_check(ci, need);
	if (ret < 0)
		return ret;

	while (true) {
		if (endoff > 0)
			check_max_size(&ci->vfs_inode, endoff);

		err = 0;
		_got = 0;
		/* first attempt may sleep on snap_rwsem (nonblock == false) */
		ret = try_get_cap_refs(ci, need, want, endoff,
				       false, &_got, &err);
		if (ret) {
			if (err == -EAGAIN)
				continue;
			if (err < 0)
				ret = err;
		} else {
			/*
			 * Not available yet: sleep on i_cap_wq and retry on
			 * every wakeup.  The retries pass nonblock == true.
			 */
			DEFINE_WAIT_FUNC(wait, woken_wake_function);
			add_wait_queue(&ci->i_cap_wq, &wait);

			while (!try_get_cap_refs(ci, need, want, endoff,
						 true, &_got, &err)) {
				if (signal_pending(current)) {
					ret = -ERESTARTSYS;
					break;
				}
				wait_woken(&wait, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
			}

			remove_wait_queue(&ci->i_cap_wq, &wait);

			if (err == -EAGAIN)
				continue;
			if (err < 0)
				ret = err;
		}
		if (ret < 0) {
			if (err == -ESTALE) {
				/* session was killed, try renew caps */
				ret = ceph_renew_caps(&ci->vfs_inode);
				if (ret == 0)
					continue;
			}
			return ret;
		}

		/*
		 * Inline data with CACHE or LAZYIO caps: make sure page 0 is
		 * up to date in the page cache before returning.
		 */
		if (ci->i_inline_version != CEPH_INLINE_NONE &&
		    (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
		    i_size_read(&ci->vfs_inode) > 0) {
			struct page *page =
				find_get_page(ci->vfs_inode.i_mapping, 0);
			if (page) {
				if (PageUptodate(page)) {
					*pinned_page = page;
					break;
				}
				put_page(page);
			}
			/*
			 * drop cap refs first because getattr while
			 * holding caps refs can cause deadlock.
			 */
			ceph_put_cap_refs(ci, _got);
			_got = 0;

			/*
			 * getattr request will bring inline data into
			 * page cache
			 */
			ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
						CEPH_STAT_CAP_INLINE_DATA,
						true);
			if (ret < 0)
				return ret;
			continue;
		}
		break;
	}

	if ((_got & CEPH_CAP_FILE_RD) && (_got & CEPH_CAP_FILE_CACHE))
		ceph_fscache_revalidate_cookie(ci);

	*got = _got;
	return 0;
}

/*
 * Take cap refs.  Caller must already know we hold at least one ref
 * on the caps in question or we don't know this is safe.
 */
void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps)
{
	spin_lock(&ci->i_ceph_lock);
	/* snap_rwsem is not held here, hence 'false' */
	__take_cap_refs(ci, caps, false);
	spin_unlock(&ci->i_ceph_lock);
}


/*
 * drop cap_snap that is not associated with any snapshot.
 * we don't need to send FLUSHSNAP message for it.
 *
 * Returns 1 if @capsnap was unlinked from ci->i_cap_snaps and its
 * references were put, 0 if it was left alone.  After a return of 1 the
 * caller must not dereference @capsnap again.
 */
static int ceph_try_drop_cap_snap(struct ceph_inode_info *ci,
				  struct ceph_cap_snap *capsnap)
{
	if (!capsnap->need_flush &&
	    !capsnap->writing && !capsnap->dirty_pages) {
		dout("dropping cap_snap %p follows %llu\n",
		     capsnap, capsnap->follows);
		BUG_ON(capsnap->cap_flush.tid > 0);
		ceph_put_snap_context(capsnap->context);
		/* newer cap_snaps are queued behind this one: flag a flush */
		if (!list_is_last(&capsnap->ci_item, &ci->i_cap_snaps))
			ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;

		list_del(&capsnap->ci_item);
		ceph_put_cap_snap(capsnap);
		return 1;
	}
	return 0;
}

/*
 * Release cap refs.
 *
 * If we released the last ref on any given cap, call ceph_check_caps
 * to release (or schedule a release).
 *
 * If we are releasing a WR cap (from a sync write), finalize any affected
 * cap_snap, and wake up any waiters.
2734 */ 2735 void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) 2736 { 2737 struct inode *inode = &ci->vfs_inode; 2738 int last = 0, put = 0, flushsnaps = 0, wake = 0; 2739 2740 spin_lock(&ci->i_ceph_lock); 2741 if (had & CEPH_CAP_PIN) 2742 --ci->i_pin_ref; 2743 if (had & CEPH_CAP_FILE_RD) 2744 if (--ci->i_rd_ref == 0) 2745 last++; 2746 if (had & CEPH_CAP_FILE_CACHE) 2747 if (--ci->i_rdcache_ref == 0) 2748 last++; 2749 if (had & CEPH_CAP_FILE_BUFFER) { 2750 if (--ci->i_wb_ref == 0) { 2751 last++; 2752 put++; 2753 } 2754 dout("put_cap_refs %p wb %d -> %d (?)\n", 2755 inode, ci->i_wb_ref+1, ci->i_wb_ref); 2756 } 2757 if (had & CEPH_CAP_FILE_WR) 2758 if (--ci->i_wr_ref == 0) { 2759 last++; 2760 if (__ceph_have_pending_cap_snap(ci)) { 2761 struct ceph_cap_snap *capsnap = 2762 list_last_entry(&ci->i_cap_snaps, 2763 struct ceph_cap_snap, 2764 ci_item); 2765 capsnap->writing = 0; 2766 if (ceph_try_drop_cap_snap(ci, capsnap)) 2767 put++; 2768 else if (__ceph_finish_cap_snap(ci, capsnap)) 2769 flushsnaps = 1; 2770 wake = 1; 2771 } 2772 if (ci->i_wrbuffer_ref_head == 0 && 2773 ci->i_dirty_caps == 0 && 2774 ci->i_flushing_caps == 0) { 2775 BUG_ON(!ci->i_head_snapc); 2776 ceph_put_snap_context(ci->i_head_snapc); 2777 ci->i_head_snapc = NULL; 2778 } 2779 /* see comment in __ceph_remove_cap() */ 2780 if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) 2781 drop_inode_snap_realm(ci); 2782 } 2783 spin_unlock(&ci->i_ceph_lock); 2784 2785 dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had), 2786 last ? " last" : "", put ? " put" : ""); 2787 2788 if (last && !flushsnaps) 2789 ceph_check_caps(ci, 0, NULL); 2790 else if (flushsnaps) 2791 ceph_flush_snaps(ci, NULL); 2792 if (wake) 2793 wake_up_all(&ci->i_cap_wq); 2794 while (put-- > 0) 2795 iput(inode); 2796 } 2797 2798 /* 2799 * Release @nr WRBUFFER refs on dirty pages for the given @snapc snap 2800 * context. Adjust per-snap dirty page accounting as appropriate. 
2801 * Once all dirty data for a cap_snap is flushed, flush snapped file 2802 * metadata back to the MDS. If we dropped the last ref, call 2803 * ceph_check_caps. 2804 */ 2805 void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, 2806 struct ceph_snap_context *snapc) 2807 { 2808 struct inode *inode = &ci->vfs_inode; 2809 struct ceph_cap_snap *capsnap = NULL; 2810 int put = 0; 2811 bool last = false; 2812 bool found = false; 2813 bool flush_snaps = false; 2814 bool complete_capsnap = false; 2815 2816 spin_lock(&ci->i_ceph_lock); 2817 ci->i_wrbuffer_ref -= nr; 2818 if (ci->i_wrbuffer_ref == 0) { 2819 last = true; 2820 put++; 2821 } 2822 2823 if (ci->i_head_snapc == snapc) { 2824 ci->i_wrbuffer_ref_head -= nr; 2825 if (ci->i_wrbuffer_ref_head == 0 && 2826 ci->i_wr_ref == 0 && 2827 ci->i_dirty_caps == 0 && 2828 ci->i_flushing_caps == 0) { 2829 BUG_ON(!ci->i_head_snapc); 2830 ceph_put_snap_context(ci->i_head_snapc); 2831 ci->i_head_snapc = NULL; 2832 } 2833 dout("put_wrbuffer_cap_refs on %p head %d/%d -> %d/%d %s\n", 2834 inode, 2835 ci->i_wrbuffer_ref+nr, ci->i_wrbuffer_ref_head+nr, 2836 ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head, 2837 last ? " LAST" : ""); 2838 } else { 2839 list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { 2840 if (capsnap->context == snapc) { 2841 found = true; 2842 break; 2843 } 2844 } 2845 BUG_ON(!found); 2846 capsnap->dirty_pages -= nr; 2847 if (capsnap->dirty_pages == 0) { 2848 complete_capsnap = true; 2849 if (!capsnap->writing) { 2850 if (ceph_try_drop_cap_snap(ci, capsnap)) { 2851 put++; 2852 } else { 2853 ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS; 2854 flush_snaps = true; 2855 } 2856 } 2857 } 2858 dout("put_wrbuffer_cap_refs on %p cap_snap %p " 2859 " snap %lld %d/%d -> %d/%d %s%s\n", 2860 inode, capsnap, capsnap->context->seq, 2861 ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr, 2862 ci->i_wrbuffer_ref, capsnap->dirty_pages, 2863 last ? " (wrbuffer last)" : "", 2864 complete_capsnap ? 
" (complete capsnap)" : ""); 2865 } 2866 2867 spin_unlock(&ci->i_ceph_lock); 2868 2869 if (last) { 2870 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); 2871 } else if (flush_snaps) { 2872 ceph_flush_snaps(ci, NULL); 2873 } 2874 if (complete_capsnap) 2875 wake_up_all(&ci->i_cap_wq); 2876 while (put-- > 0) 2877 iput(inode); 2878 } 2879 2880 /* 2881 * Invalidate unlinked inode's aliases, so we can drop the inode ASAP. 2882 */ 2883 static void invalidate_aliases(struct inode *inode) 2884 { 2885 struct dentry *dn, *prev = NULL; 2886 2887 dout("invalidate_aliases inode %p\n", inode); 2888 d_prune_aliases(inode); 2889 /* 2890 * For non-directory inode, d_find_alias() only returns 2891 * hashed dentry. After calling d_invalidate(), the 2892 * dentry becomes unhashed. 2893 * 2894 * For directory inode, d_find_alias() can return 2895 * unhashed dentry. But directory inode should have 2896 * one alias at most. 2897 */ 2898 while ((dn = d_find_alias(inode))) { 2899 if (dn == prev) { 2900 dput(dn); 2901 break; 2902 } 2903 d_invalidate(dn); 2904 if (prev) 2905 dput(prev); 2906 prev = dn; 2907 } 2908 if (prev) 2909 dput(prev); 2910 } 2911 2912 /* 2913 * Handle a cap GRANT message from the MDS. (Note that a GRANT may 2914 * actually be a revocation if it specifies a smaller cap set.) 2915 * 2916 * caller holds s_mutex and i_ceph_lock, we drop both. 
2917 */ 2918 static void handle_cap_grant(struct ceph_mds_client *mdsc, 2919 struct inode *inode, struct ceph_mds_caps *grant, 2920 struct ceph_string **pns, u64 inline_version, 2921 void *inline_data, u32 inline_len, 2922 struct ceph_buffer *xattr_buf, 2923 struct ceph_mds_session *session, 2924 struct ceph_cap *cap, int issued) 2925 __releases(ci->i_ceph_lock) 2926 __releases(mdsc->snap_rwsem) 2927 { 2928 struct ceph_inode_info *ci = ceph_inode(inode); 2929 int mds = session->s_mds; 2930 int seq = le32_to_cpu(grant->seq); 2931 int newcaps = le32_to_cpu(grant->caps); 2932 int used, wanted, dirty; 2933 u64 size = le64_to_cpu(grant->size); 2934 u64 max_size = le64_to_cpu(grant->max_size); 2935 struct timespec mtime, atime, ctime; 2936 int check_caps = 0; 2937 bool wake = false; 2938 bool writeback = false; 2939 bool queue_trunc = false; 2940 bool queue_invalidate = false; 2941 bool deleted_inode = false; 2942 bool fill_inline = false; 2943 2944 dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n", 2945 inode, cap, mds, seq, ceph_cap_string(newcaps)); 2946 dout(" size %llu max_size %llu, i_size %llu\n", size, max_size, 2947 inode->i_size); 2948 2949 2950 /* 2951 * auth mds of the inode changed. we received the cap export message, 2952 * but still haven't received the cap import message. handle_cap_export 2953 * updated the new auth MDS' cap. 2954 * 2955 * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message 2956 * that was sent before the cap import message. So don't remove caps. 2957 */ 2958 if (ceph_seq_cmp(seq, cap->seq) <= 0) { 2959 WARN_ON(cap != ci->i_auth_cap); 2960 WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id)); 2961 seq = cap->seq; 2962 newcaps |= cap->issued; 2963 } 2964 2965 /* 2966 * If CACHE is being revoked, and we have no dirty buffers, 2967 * try to invalidate (once). (If there are dirty buffers, we 2968 * will invalidate _after_ writeback.) 
2969 */ 2970 if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */ 2971 ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && 2972 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && 2973 !(ci->i_wrbuffer_ref || ci->i_wb_ref)) { 2974 if (try_nonblocking_invalidate(inode)) { 2975 /* there were locked pages.. invalidate later 2976 in a separate thread. */ 2977 if (ci->i_rdcache_revoking != ci->i_rdcache_gen) { 2978 queue_invalidate = true; 2979 ci->i_rdcache_revoking = ci->i_rdcache_gen; 2980 } 2981 } 2982 } 2983 2984 /* side effects now are allowed */ 2985 cap->cap_gen = session->s_cap_gen; 2986 cap->seq = seq; 2987 2988 __check_cap_issue(ci, cap, newcaps); 2989 2990 if ((newcaps & CEPH_CAP_AUTH_SHARED) && 2991 (issued & CEPH_CAP_AUTH_EXCL) == 0) { 2992 inode->i_mode = le32_to_cpu(grant->mode); 2993 inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid)); 2994 inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid)); 2995 dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode, 2996 from_kuid(&init_user_ns, inode->i_uid), 2997 from_kgid(&init_user_ns, inode->i_gid)); 2998 } 2999 3000 if ((newcaps & CEPH_CAP_AUTH_SHARED) && 3001 (issued & CEPH_CAP_LINK_EXCL) == 0) { 3002 set_nlink(inode, le32_to_cpu(grant->nlink)); 3003 if (inode->i_nlink == 0 && 3004 (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL))) 3005 deleted_inode = true; 3006 } 3007 3008 if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) { 3009 int len = le32_to_cpu(grant->xattr_len); 3010 u64 version = le64_to_cpu(grant->xattr_version); 3011 3012 if (version > ci->i_xattrs.version) { 3013 dout(" got new xattrs v%llu on %p len %d\n", 3014 version, inode, len); 3015 if (ci->i_xattrs.blob) 3016 ceph_buffer_put(ci->i_xattrs.blob); 3017 ci->i_xattrs.blob = ceph_buffer_get(xattr_buf); 3018 ci->i_xattrs.version = version; 3019 ceph_forget_all_cached_acls(inode); 3020 } 3021 } 3022 3023 if (newcaps & CEPH_CAP_ANY_RD) { 3024 /* ctime/mtime/atime? 
*/ 3025 ceph_decode_timespec(&mtime, &grant->mtime); 3026 ceph_decode_timespec(&atime, &grant->atime); 3027 ceph_decode_timespec(&ctime, &grant->ctime); 3028 ceph_fill_file_time(inode, issued, 3029 le32_to_cpu(grant->time_warp_seq), 3030 &ctime, &mtime, &atime); 3031 } 3032 3033 if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) { 3034 /* file layout may have changed */ 3035 s64 old_pool = ci->i_layout.pool_id; 3036 struct ceph_string *old_ns; 3037 3038 ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout); 3039 old_ns = rcu_dereference_protected(ci->i_layout.pool_ns, 3040 lockdep_is_held(&ci->i_ceph_lock)); 3041 rcu_assign_pointer(ci->i_layout.pool_ns, *pns); 3042 3043 if (ci->i_layout.pool_id != old_pool || *pns != old_ns) 3044 ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; 3045 3046 *pns = old_ns; 3047 3048 /* size/truncate_seq? */ 3049 queue_trunc = ceph_fill_file_size(inode, issued, 3050 le32_to_cpu(grant->truncate_seq), 3051 le64_to_cpu(grant->truncate_size), 3052 size); 3053 } 3054 3055 if (ci->i_auth_cap == cap && (newcaps & CEPH_CAP_ANY_FILE_WR)) { 3056 if (max_size != ci->i_max_size) { 3057 dout("max_size %lld -> %llu\n", 3058 ci->i_max_size, max_size); 3059 ci->i_max_size = max_size; 3060 if (max_size >= ci->i_wanted_max_size) { 3061 ci->i_wanted_max_size = 0; /* reset */ 3062 ci->i_requested_max_size = 0; 3063 } 3064 wake = true; 3065 } else if (ci->i_wanted_max_size > ci->i_max_size && 3066 ci->i_wanted_max_size > ci->i_requested_max_size) { 3067 /* CEPH_CAP_OP_IMPORT */ 3068 wake = true; 3069 } 3070 } 3071 3072 /* check cap bits */ 3073 wanted = __ceph_caps_wanted(ci); 3074 used = __ceph_caps_used(ci); 3075 dirty = __ceph_caps_dirty(ci); 3076 dout(" my wanted = %s, used = %s, dirty %s\n", 3077 ceph_cap_string(wanted), 3078 ceph_cap_string(used), 3079 ceph_cap_string(dirty)); 3080 if (wanted != le32_to_cpu(grant->wanted)) { 3081 dout("mds wanted %s -> %s\n", 3082 ceph_cap_string(le32_to_cpu(grant->wanted)), 3083 ceph_cap_string(wanted)); 3084 
/* imported cap may not have correct mds_wanted */ 3085 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) 3086 check_caps = 1; 3087 } 3088 3089 /* revocation, grant, or no-op? */ 3090 if (cap->issued & ~newcaps) { 3091 int revoking = cap->issued & ~newcaps; 3092 3093 dout("revocation: %s -> %s (revoking %s)\n", 3094 ceph_cap_string(cap->issued), 3095 ceph_cap_string(newcaps), 3096 ceph_cap_string(revoking)); 3097 if (revoking & used & CEPH_CAP_FILE_BUFFER) 3098 writeback = true; /* initiate writeback; will delay ack */ 3099 else if (revoking == CEPH_CAP_FILE_CACHE && 3100 (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && 3101 queue_invalidate) 3102 ; /* do nothing yet, invalidation will be queued */ 3103 else if (cap == ci->i_auth_cap) 3104 check_caps = 1; /* check auth cap only */ 3105 else 3106 check_caps = 2; /* check all caps */ 3107 cap->issued = newcaps; 3108 cap->implemented |= newcaps; 3109 } else if (cap->issued == newcaps) { 3110 dout("caps unchanged: %s -> %s\n", 3111 ceph_cap_string(cap->issued), ceph_cap_string(newcaps)); 3112 } else { 3113 dout("grant: %s -> %s\n", ceph_cap_string(cap->issued), 3114 ceph_cap_string(newcaps)); 3115 /* non-auth MDS is revoking the newly grant caps ? 
*/ 3116 if (cap == ci->i_auth_cap && 3117 __ceph_caps_revoking_other(ci, cap, newcaps)) 3118 check_caps = 2; 3119 3120 cap->issued = newcaps; 3121 cap->implemented |= newcaps; /* add bits only, to 3122 * avoid stepping on a 3123 * pending revocation */ 3124 wake = true; 3125 } 3126 BUG_ON(cap->issued & ~cap->implemented); 3127 3128 if (inline_version > 0 && inline_version >= ci->i_inline_version) { 3129 ci->i_inline_version = inline_version; 3130 if (ci->i_inline_version != CEPH_INLINE_NONE && 3131 (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO))) 3132 fill_inline = true; 3133 } 3134 3135 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) { 3136 if (newcaps & ~issued) 3137 wake = true; 3138 kick_flushing_inode_caps(mdsc, session, inode); 3139 up_read(&mdsc->snap_rwsem); 3140 } else { 3141 spin_unlock(&ci->i_ceph_lock); 3142 } 3143 3144 if (fill_inline) 3145 ceph_fill_inline_data(inode, NULL, inline_data, inline_len); 3146 3147 if (queue_trunc) 3148 ceph_queue_vmtruncate(inode); 3149 3150 if (writeback) 3151 /* 3152 * queue inode for writeback: we can't actually call 3153 * filemap_write_and_wait, etc. from message handler 3154 * context. 3155 */ 3156 ceph_queue_writeback(inode); 3157 if (queue_invalidate) 3158 ceph_queue_invalidate(inode); 3159 if (deleted_inode) 3160 invalidate_aliases(inode); 3161 if (wake) 3162 wake_up_all(&ci->i_cap_wq); 3163 3164 if (check_caps == 1) 3165 ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY, 3166 session); 3167 else if (check_caps == 2) 3168 ceph_check_caps(ci, CHECK_CAPS_NODELAY, session); 3169 else 3170 mutex_unlock(&session->s_mutex); 3171 } 3172 3173 /* 3174 * Handle FLUSH_ACK from MDS, indicating that metadata we sent to the 3175 * MDS has been safely committed. 
 */
/*
 * @flush_tid is the tid being acknowledged.  Every regular cap flush on
 * ci->i_cap_flush_list with a tid <= @flush_tid is completed; cap_snap
 * entries (cf->caps == 0) are skipped here.
 *
 * Called with i_ceph_lock held; the lock is released before the
 * completed flush records are freed and waiters are woken.  If the
 * inode ends up with no flushing and no dirty caps, one inode reference
 * is dropped with iput().
 */
static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
				 struct ceph_mds_caps *m,
				 struct ceph_mds_session *session,
				 struct ceph_cap *cap)
	__releases(ci->i_ceph_lock)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
	struct ceph_cap_flush *cf, *tmp_cf;
	LIST_HEAD(to_remove);	/* completed flushes, freed after unlock */
	unsigned seq = le32_to_cpu(m->seq);
	int dirty = le32_to_cpu(m->dirty);
	int cleaned = 0;	/* cap bits this ack actually cleans */
	bool drop = false;
	bool wake_ci = 0;
	bool wake_mdsc = 0;

	list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) {
		if (cf->tid == flush_tid)
			cleaned = cf->caps;
		if (cf->caps == 0) /* capsnap */
			continue;
		if (cf->tid <= flush_tid) {
			if (__finish_cap_flush(NULL, ci, cf))
				wake_ci = true;
			list_add_tail(&cf->i_list, &to_remove);
		} else {
			/*
			 * A later flush still covers these bits, so they
			 * are not clean yet.
			 */
			cleaned &= ~cf->caps;
			if (!cleaned)
				break;
		}
	}

	dout("handle_cap_flush_ack inode %p mds%d seq %d on %s cleaned %s,"
	     " flushing %s -> %s\n",
	     inode, session->s_mds, seq, ceph_cap_string(dirty),
	     ceph_cap_string(cleaned), ceph_cap_string(ci->i_flushing_caps),
	     ceph_cap_string(ci->i_flushing_caps & ~cleaned));

	/* nothing was completed and nothing was cleaned: just unlock */
	if (list_empty(&to_remove) && !cleaned)
		goto out;

	ci->i_flushing_caps &= ~cleaned;

	spin_lock(&mdsc->cap_dirty_lock);

	/* second stage of completion: the mdsc-wide bookkeeping */
	list_for_each_entry(cf, &to_remove, i_list) {
		if (__finish_cap_flush(mdsc, NULL, cf))
			wake_mdsc = true;
	}

	if (ci->i_flushing_caps == 0) {
		if (list_empty(&ci->i_cap_flush_list)) {
			list_del_init(&ci->i_flushing_item);
			if (!list_empty(&session->s_cap_flushing)) {
				dout(" mds%d still flushing cap on %p\n",
				     session->s_mds,
				     &list_first_entry(&session->s_cap_flushing,
						struct ceph_inode_info,
						i_flushing_item)->vfs_inode);
			}
		}
		mdsc->num_cap_flushing--;
		dout(" inode %p now !flushing\n", inode);

		if (ci->i_dirty_caps == 0) {
			dout(" inode %p now clean\n", inode);
			BUG_ON(!list_empty(&ci->i_dirty_item));
			drop = true;
			/* no writers left: release the head snap context */
			if (ci->i_wr_ref == 0 &&
			    ci->i_wrbuffer_ref_head == 0) {
				BUG_ON(!ci->i_head_snapc);
				ceph_put_snap_context(ci->i_head_snapc);
				ci->i_head_snapc = NULL;
			}
		} else {
			BUG_ON(list_empty(&ci->i_dirty_item));
		}
	}
	spin_unlock(&mdsc->cap_dirty_lock);

out:
	spin_unlock(&ci->i_ceph_lock);

	/* free the completed flush records outside the spinlocks */
	while (!list_empty(&to_remove)) {
		cf = list_first_entry(&to_remove,
				      struct ceph_cap_flush, i_list);
		list_del(&cf->i_list);
		ceph_free_cap_flush(cf);
	}

	if (wake_ci)
		wake_up_all(&ci->i_cap_wq);
	if (wake_mdsc)
		wake_up_all(&mdsc->cap_flushing_wq);
	if (drop)
		iput(inode);
}

/*
 * Handle FLUSHSNAP_ACK.  MDS has flushed snap data to disk and we can
 * throw away our cap_snap.
 *
 * Caller hold s_mutex.
3281 */ 3282 static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, 3283 struct ceph_mds_caps *m, 3284 struct ceph_mds_session *session) 3285 { 3286 struct ceph_inode_info *ci = ceph_inode(inode); 3287 struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; 3288 u64 follows = le64_to_cpu(m->snap_follows); 3289 struct ceph_cap_snap *capsnap; 3290 bool flushed = false; 3291 bool wake_ci = false; 3292 bool wake_mdsc = false; 3293 3294 dout("handle_cap_flushsnap_ack inode %p ci %p mds%d follows %lld\n", 3295 inode, ci, session->s_mds, follows); 3296 3297 spin_lock(&ci->i_ceph_lock); 3298 list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) { 3299 if (capsnap->follows == follows) { 3300 if (capsnap->cap_flush.tid != flush_tid) { 3301 dout(" cap_snap %p follows %lld tid %lld !=" 3302 " %lld\n", capsnap, follows, 3303 flush_tid, capsnap->cap_flush.tid); 3304 break; 3305 } 3306 flushed = true; 3307 break; 3308 } else { 3309 dout(" skipping cap_snap %p follows %lld\n", 3310 capsnap, capsnap->follows); 3311 } 3312 } 3313 if (flushed) { 3314 WARN_ON(capsnap->dirty_pages || capsnap->writing); 3315 dout(" removing %p cap_snap %p follows %lld\n", 3316 inode, capsnap, follows); 3317 list_del(&capsnap->ci_item); 3318 if (__finish_cap_flush(NULL, ci, &capsnap->cap_flush)) 3319 wake_ci = true; 3320 3321 spin_lock(&mdsc->cap_dirty_lock); 3322 3323 if (list_empty(&ci->i_cap_flush_list)) 3324 list_del_init(&ci->i_flushing_item); 3325 3326 if (__finish_cap_flush(mdsc, NULL, &capsnap->cap_flush)) 3327 wake_mdsc = true; 3328 3329 spin_unlock(&mdsc->cap_dirty_lock); 3330 } 3331 spin_unlock(&ci->i_ceph_lock); 3332 if (flushed) { 3333 ceph_put_snap_context(capsnap->context); 3334 ceph_put_cap_snap(capsnap); 3335 if (wake_ci) 3336 wake_up_all(&ci->i_cap_wq); 3337 if (wake_mdsc) 3338 wake_up_all(&mdsc->cap_flushing_wq); 3339 iput(inode); 3340 } 3341 } 3342 3343 /* 3344 * Handle TRUNC from MDS, indicating file truncation. 3345 * 3346 * caller hold s_mutex. 
3347 */ 3348 static void handle_cap_trunc(struct inode *inode, 3349 struct ceph_mds_caps *trunc, 3350 struct ceph_mds_session *session) 3351 __releases(ci->i_ceph_lock) 3352 { 3353 struct ceph_inode_info *ci = ceph_inode(inode); 3354 int mds = session->s_mds; 3355 int seq = le32_to_cpu(trunc->seq); 3356 u32 truncate_seq = le32_to_cpu(trunc->truncate_seq); 3357 u64 truncate_size = le64_to_cpu(trunc->truncate_size); 3358 u64 size = le64_to_cpu(trunc->size); 3359 int implemented = 0; 3360 int dirty = __ceph_caps_dirty(ci); 3361 int issued = __ceph_caps_issued(ceph_inode(inode), &implemented); 3362 int queue_trunc = 0; 3363 3364 issued |= implemented | dirty; 3365 3366 dout("handle_cap_trunc inode %p mds%d seq %d to %lld seq %d\n", 3367 inode, mds, seq, truncate_size, truncate_seq); 3368 queue_trunc = ceph_fill_file_size(inode, issued, 3369 truncate_seq, truncate_size, size); 3370 spin_unlock(&ci->i_ceph_lock); 3371 3372 if (queue_trunc) 3373 ceph_queue_vmtruncate(inode); 3374 } 3375 3376 /* 3377 * Handle EXPORT from MDS. Cap is being migrated _from_ this mds to a 3378 * different one. If we are the most recent migration we've seen (as 3379 * indicated by mseq), make note of the migrating cap bits for the 3380 * duration (until we see the corresponding IMPORT). 
3381 * 3382 * caller holds s_mutex 3383 */ 3384 static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, 3385 struct ceph_mds_cap_peer *ph, 3386 struct ceph_mds_session *session) 3387 { 3388 struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; 3389 struct ceph_mds_session *tsession = NULL; 3390 struct ceph_cap *cap, *tcap, *new_cap = NULL; 3391 struct ceph_inode_info *ci = ceph_inode(inode); 3392 u64 t_cap_id; 3393 unsigned mseq = le32_to_cpu(ex->migrate_seq); 3394 unsigned t_seq, t_mseq; 3395 int target, issued; 3396 int mds = session->s_mds; 3397 3398 if (ph) { 3399 t_cap_id = le64_to_cpu(ph->cap_id); 3400 t_seq = le32_to_cpu(ph->seq); 3401 t_mseq = le32_to_cpu(ph->mseq); 3402 target = le32_to_cpu(ph->mds); 3403 } else { 3404 t_cap_id = t_seq = t_mseq = 0; 3405 target = -1; 3406 } 3407 3408 dout("handle_cap_export inode %p ci %p mds%d mseq %d target %d\n", 3409 inode, ci, mds, mseq, target); 3410 retry: 3411 spin_lock(&ci->i_ceph_lock); 3412 cap = __get_cap_for_mds(ci, mds); 3413 if (!cap || cap->cap_id != le64_to_cpu(ex->cap_id)) 3414 goto out_unlock; 3415 3416 if (target < 0) { 3417 __ceph_remove_cap(cap, false); 3418 if (!ci->i_auth_cap) 3419 ci->i_ceph_flags |= CEPH_I_CAP_DROPPED; 3420 goto out_unlock; 3421 } 3422 3423 /* 3424 * now we know we haven't received the cap import message yet 3425 * because the exported cap still exist. 
3426 */ 3427 3428 issued = cap->issued; 3429 WARN_ON(issued != cap->implemented); 3430 3431 tcap = __get_cap_for_mds(ci, target); 3432 if (tcap) { 3433 /* already have caps from the target */ 3434 if (tcap->cap_id == t_cap_id && 3435 ceph_seq_cmp(tcap->seq, t_seq) < 0) { 3436 dout(" updating import cap %p mds%d\n", tcap, target); 3437 tcap->cap_id = t_cap_id; 3438 tcap->seq = t_seq - 1; 3439 tcap->issue_seq = t_seq - 1; 3440 tcap->mseq = t_mseq; 3441 tcap->issued |= issued; 3442 tcap->implemented |= issued; 3443 if (cap == ci->i_auth_cap) 3444 ci->i_auth_cap = tcap; 3445 3446 if (!list_empty(&ci->i_cap_flush_list) && 3447 ci->i_auth_cap == tcap) { 3448 spin_lock(&mdsc->cap_dirty_lock); 3449 list_move_tail(&ci->i_flushing_item, 3450 &tcap->session->s_cap_flushing); 3451 spin_unlock(&mdsc->cap_dirty_lock); 3452 } 3453 } 3454 __ceph_remove_cap(cap, false); 3455 goto out_unlock; 3456 } else if (tsession) { 3457 /* add placeholder for the export tagert */ 3458 int flag = (cap == ci->i_auth_cap) ? 
CEPH_CAP_FLAG_AUTH : 0; 3459 tcap = new_cap; 3460 ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0, 3461 t_seq - 1, t_mseq, (u64)-1, flag, &new_cap); 3462 3463 if (!list_empty(&ci->i_cap_flush_list) && 3464 ci->i_auth_cap == tcap) { 3465 spin_lock(&mdsc->cap_dirty_lock); 3466 list_move_tail(&ci->i_flushing_item, 3467 &tcap->session->s_cap_flushing); 3468 spin_unlock(&mdsc->cap_dirty_lock); 3469 } 3470 3471 __ceph_remove_cap(cap, false); 3472 goto out_unlock; 3473 } 3474 3475 spin_unlock(&ci->i_ceph_lock); 3476 mutex_unlock(&session->s_mutex); 3477 3478 /* open target session */ 3479 tsession = ceph_mdsc_open_export_target_session(mdsc, target); 3480 if (!IS_ERR(tsession)) { 3481 if (mds > target) { 3482 mutex_lock(&session->s_mutex); 3483 mutex_lock_nested(&tsession->s_mutex, 3484 SINGLE_DEPTH_NESTING); 3485 } else { 3486 mutex_lock(&tsession->s_mutex); 3487 mutex_lock_nested(&session->s_mutex, 3488 SINGLE_DEPTH_NESTING); 3489 } 3490 new_cap = ceph_get_cap(mdsc, NULL); 3491 } else { 3492 WARN_ON(1); 3493 tsession = NULL; 3494 target = -1; 3495 } 3496 goto retry; 3497 3498 out_unlock: 3499 spin_unlock(&ci->i_ceph_lock); 3500 mutex_unlock(&session->s_mutex); 3501 if (tsession) { 3502 mutex_unlock(&tsession->s_mutex); 3503 ceph_put_mds_session(tsession); 3504 } 3505 if (new_cap) 3506 ceph_put_cap(mdsc, new_cap); 3507 } 3508 3509 /* 3510 * Handle cap IMPORT. 3511 * 3512 * caller holds s_mutex. 
acquires i_ceph_lock 3513 */ 3514 static void handle_cap_import(struct ceph_mds_client *mdsc, 3515 struct inode *inode, struct ceph_mds_caps *im, 3516 struct ceph_mds_cap_peer *ph, 3517 struct ceph_mds_session *session, 3518 struct ceph_cap **target_cap, int *old_issued) 3519 __acquires(ci->i_ceph_lock) 3520 { 3521 struct ceph_inode_info *ci = ceph_inode(inode); 3522 struct ceph_cap *cap, *ocap, *new_cap = NULL; 3523 int mds = session->s_mds; 3524 int issued; 3525 unsigned caps = le32_to_cpu(im->caps); 3526 unsigned wanted = le32_to_cpu(im->wanted); 3527 unsigned seq = le32_to_cpu(im->seq); 3528 unsigned mseq = le32_to_cpu(im->migrate_seq); 3529 u64 realmino = le64_to_cpu(im->realm); 3530 u64 cap_id = le64_to_cpu(im->cap_id); 3531 u64 p_cap_id; 3532 int peer; 3533 3534 if (ph) { 3535 p_cap_id = le64_to_cpu(ph->cap_id); 3536 peer = le32_to_cpu(ph->mds); 3537 } else { 3538 p_cap_id = 0; 3539 peer = -1; 3540 } 3541 3542 dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n", 3543 inode, ci, mds, mseq, peer); 3544 3545 retry: 3546 spin_lock(&ci->i_ceph_lock); 3547 cap = __get_cap_for_mds(ci, mds); 3548 if (!cap) { 3549 if (!new_cap) { 3550 spin_unlock(&ci->i_ceph_lock); 3551 new_cap = ceph_get_cap(mdsc, NULL); 3552 goto retry; 3553 } 3554 cap = new_cap; 3555 } else { 3556 if (new_cap) { 3557 ceph_put_cap(mdsc, new_cap); 3558 new_cap = NULL; 3559 } 3560 } 3561 3562 __ceph_caps_issued(ci, &issued); 3563 issued |= __ceph_caps_dirty(ci); 3564 3565 ceph_add_cap(inode, session, cap_id, -1, caps, wanted, seq, mseq, 3566 realmino, CEPH_CAP_FLAG_AUTH, &new_cap); 3567 3568 ocap = peer >= 0 ? 
__get_cap_for_mds(ci, peer) : NULL; 3569 if (ocap && ocap->cap_id == p_cap_id) { 3570 dout(" remove export cap %p mds%d flags %d\n", 3571 ocap, peer, ph->flags); 3572 if ((ph->flags & CEPH_CAP_FLAG_AUTH) && 3573 (ocap->seq != le32_to_cpu(ph->seq) || 3574 ocap->mseq != le32_to_cpu(ph->mseq))) { 3575 pr_err("handle_cap_import: mismatched seq/mseq: " 3576 "ino (%llx.%llx) mds%d seq %d mseq %d " 3577 "importer mds%d has peer seq %d mseq %d\n", 3578 ceph_vinop(inode), peer, ocap->seq, 3579 ocap->mseq, mds, le32_to_cpu(ph->seq), 3580 le32_to_cpu(ph->mseq)); 3581 } 3582 __ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE)); 3583 } 3584 3585 /* make sure we re-request max_size, if necessary */ 3586 ci->i_requested_max_size = 0; 3587 3588 *old_issued = issued; 3589 *target_cap = cap; 3590 } 3591 3592 /* 3593 * Handle a caps message from the MDS. 3594 * 3595 * Identify the appropriate session, inode, and call the right handler 3596 * based on the cap op. 3597 */ 3598 void ceph_handle_caps(struct ceph_mds_session *session, 3599 struct ceph_msg *msg) 3600 { 3601 struct ceph_mds_client *mdsc = session->s_mdsc; 3602 struct super_block *sb = mdsc->fsc->sb; 3603 struct inode *inode; 3604 struct ceph_inode_info *ci; 3605 struct ceph_cap *cap; 3606 struct ceph_mds_caps *h; 3607 struct ceph_mds_cap_peer *peer = NULL; 3608 struct ceph_snap_realm *realm = NULL; 3609 struct ceph_string *pool_ns = NULL; 3610 int mds = session->s_mds; 3611 int op, issued; 3612 u32 seq, mseq; 3613 struct ceph_vino vino; 3614 u64 tid; 3615 u64 inline_version = 0; 3616 void *inline_data = NULL; 3617 u32 inline_len = 0; 3618 void *snaptrace; 3619 size_t snaptrace_len; 3620 void *p, *end; 3621 3622 dout("handle_caps from mds%d\n", mds); 3623 3624 /* decode */ 3625 end = msg->front.iov_base + msg->front.iov_len; 3626 tid = le64_to_cpu(msg->hdr.tid); 3627 if (msg->front.iov_len < sizeof(*h)) 3628 goto bad; 3629 h = msg->front.iov_base; 3630 op = le32_to_cpu(h->op); 3631 vino.ino = le64_to_cpu(h->ino); 
3632 vino.snap = CEPH_NOSNAP; 3633 seq = le32_to_cpu(h->seq); 3634 mseq = le32_to_cpu(h->migrate_seq); 3635 3636 snaptrace = h + 1; 3637 snaptrace_len = le32_to_cpu(h->snap_trace_len); 3638 p = snaptrace + snaptrace_len; 3639 3640 if (le16_to_cpu(msg->hdr.version) >= 2) { 3641 u32 flock_len; 3642 ceph_decode_32_safe(&p, end, flock_len, bad); 3643 if (p + flock_len > end) 3644 goto bad; 3645 p += flock_len; 3646 } 3647 3648 if (le16_to_cpu(msg->hdr.version) >= 3) { 3649 if (op == CEPH_CAP_OP_IMPORT) { 3650 if (p + sizeof(*peer) > end) 3651 goto bad; 3652 peer = p; 3653 p += sizeof(*peer); 3654 } else if (op == CEPH_CAP_OP_EXPORT) { 3655 /* recorded in unused fields */ 3656 peer = (void *)&h->size; 3657 } 3658 } 3659 3660 if (le16_to_cpu(msg->hdr.version) >= 4) { 3661 ceph_decode_64_safe(&p, end, inline_version, bad); 3662 ceph_decode_32_safe(&p, end, inline_len, bad); 3663 if (p + inline_len > end) 3664 goto bad; 3665 inline_data = p; 3666 p += inline_len; 3667 } 3668 3669 if (le16_to_cpu(msg->hdr.version) >= 5) { 3670 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 3671 u32 epoch_barrier; 3672 3673 ceph_decode_32_safe(&p, end, epoch_barrier, bad); 3674 ceph_osdc_update_epoch_barrier(osdc, epoch_barrier); 3675 } 3676 3677 if (le16_to_cpu(msg->hdr.version) >= 8) { 3678 u64 flush_tid; 3679 u32 caller_uid, caller_gid; 3680 u32 pool_ns_len; 3681 3682 /* version >= 6 */ 3683 ceph_decode_64_safe(&p, end, flush_tid, bad); 3684 /* version >= 7 */ 3685 ceph_decode_32_safe(&p, end, caller_uid, bad); 3686 ceph_decode_32_safe(&p, end, caller_gid, bad); 3687 /* version >= 8 */ 3688 ceph_decode_32_safe(&p, end, pool_ns_len, bad); 3689 if (pool_ns_len > 0) { 3690 ceph_decode_need(&p, end, pool_ns_len, bad); 3691 pool_ns = ceph_find_or_create_string(p, pool_ns_len); 3692 p += pool_ns_len; 3693 } 3694 } 3695 3696 /* lookup ino */ 3697 inode = ceph_find_inode(sb, vino); 3698 ci = ceph_inode(inode); 3699 dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino, 
3700 vino.snap, inode); 3701 3702 mutex_lock(&session->s_mutex); 3703 session->s_seq++; 3704 dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, 3705 (unsigned)seq); 3706 3707 if (!inode) { 3708 dout(" i don't have ino %llx\n", vino.ino); 3709 3710 if (op == CEPH_CAP_OP_IMPORT) { 3711 cap = ceph_get_cap(mdsc, NULL); 3712 cap->cap_ino = vino.ino; 3713 cap->queue_release = 1; 3714 cap->cap_id = le64_to_cpu(h->cap_id); 3715 cap->mseq = mseq; 3716 cap->seq = seq; 3717 cap->issue_seq = seq; 3718 spin_lock(&session->s_cap_lock); 3719 list_add_tail(&cap->session_caps, 3720 &session->s_cap_releases); 3721 session->s_num_cap_releases++; 3722 spin_unlock(&session->s_cap_lock); 3723 } 3724 goto flush_cap_releases; 3725 } 3726 3727 /* these will work even if we don't have a cap yet */ 3728 switch (op) { 3729 case CEPH_CAP_OP_FLUSHSNAP_ACK: 3730 handle_cap_flushsnap_ack(inode, tid, h, session); 3731 goto done; 3732 3733 case CEPH_CAP_OP_EXPORT: 3734 handle_cap_export(inode, h, peer, session); 3735 goto done_unlocked; 3736 3737 case CEPH_CAP_OP_IMPORT: 3738 realm = NULL; 3739 if (snaptrace_len) { 3740 down_write(&mdsc->snap_rwsem); 3741 ceph_update_snap_trace(mdsc, snaptrace, 3742 snaptrace + snaptrace_len, 3743 false, &realm); 3744 downgrade_write(&mdsc->snap_rwsem); 3745 } else { 3746 down_read(&mdsc->snap_rwsem); 3747 } 3748 handle_cap_import(mdsc, inode, h, peer, session, 3749 &cap, &issued); 3750 handle_cap_grant(mdsc, inode, h, &pool_ns, 3751 inline_version, inline_data, inline_len, 3752 msg->middle, session, cap, issued); 3753 if (realm) 3754 ceph_put_snap_realm(mdsc, realm); 3755 goto done_unlocked; 3756 } 3757 3758 /* the rest require a cap */ 3759 spin_lock(&ci->i_ceph_lock); 3760 cap = __get_cap_for_mds(ceph_inode(inode), mds); 3761 if (!cap) { 3762 dout(" no cap on %p ino %llx.%llx from mds%d\n", 3763 inode, ceph_ino(inode), ceph_snap(inode), mds); 3764 spin_unlock(&ci->i_ceph_lock); 3765 goto flush_cap_releases; 3766 } 3767 3768 /* note that each of 
these drops i_ceph_lock for us */ 3769 switch (op) { 3770 case CEPH_CAP_OP_REVOKE: 3771 case CEPH_CAP_OP_GRANT: 3772 __ceph_caps_issued(ci, &issued); 3773 issued |= __ceph_caps_dirty(ci); 3774 handle_cap_grant(mdsc, inode, h, &pool_ns, 3775 inline_version, inline_data, inline_len, 3776 msg->middle, session, cap, issued); 3777 goto done_unlocked; 3778 3779 case CEPH_CAP_OP_FLUSH_ACK: 3780 handle_cap_flush_ack(inode, tid, h, session, cap); 3781 break; 3782 3783 case CEPH_CAP_OP_TRUNC: 3784 handle_cap_trunc(inode, h, session); 3785 break; 3786 3787 default: 3788 spin_unlock(&ci->i_ceph_lock); 3789 pr_err("ceph_handle_caps: unknown cap op %d %s\n", op, 3790 ceph_cap_op_name(op)); 3791 } 3792 3793 goto done; 3794 3795 flush_cap_releases: 3796 /* 3797 * send any cap release message to try to move things 3798 * along for the mds (who clearly thinks we still have this 3799 * cap). 3800 */ 3801 ceph_send_cap_releases(mdsc, session); 3802 3803 done: 3804 mutex_unlock(&session->s_mutex); 3805 done_unlocked: 3806 iput(inode); 3807 ceph_put_string(pool_ns); 3808 return; 3809 3810 bad: 3811 pr_err("ceph_handle_caps: corrupt message\n"); 3812 ceph_msg_dump(msg); 3813 return; 3814 } 3815 3816 /* 3817 * Delayed work handler to process end of delayed cap release LRU list. 
3818 */ 3819 void ceph_check_delayed_caps(struct ceph_mds_client *mdsc) 3820 { 3821 struct inode *inode; 3822 struct ceph_inode_info *ci; 3823 int flags = CHECK_CAPS_NODELAY; 3824 3825 dout("check_delayed_caps\n"); 3826 while (1) { 3827 spin_lock(&mdsc->cap_delay_lock); 3828 if (list_empty(&mdsc->cap_delay_list)) 3829 break; 3830 ci = list_first_entry(&mdsc->cap_delay_list, 3831 struct ceph_inode_info, 3832 i_cap_delay_list); 3833 if ((ci->i_ceph_flags & CEPH_I_FLUSH) == 0 && 3834 time_before(jiffies, ci->i_hold_caps_max)) 3835 break; 3836 list_del_init(&ci->i_cap_delay_list); 3837 3838 inode = igrab(&ci->vfs_inode); 3839 spin_unlock(&mdsc->cap_delay_lock); 3840 3841 if (inode) { 3842 dout("check_delayed_caps on %p\n", inode); 3843 ceph_check_caps(ci, flags, NULL); 3844 iput(inode); 3845 } 3846 } 3847 spin_unlock(&mdsc->cap_delay_lock); 3848 } 3849 3850 /* 3851 * Flush all dirty caps to the mds 3852 */ 3853 void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc) 3854 { 3855 struct ceph_inode_info *ci; 3856 struct inode *inode; 3857 3858 dout("flush_dirty_caps\n"); 3859 spin_lock(&mdsc->cap_dirty_lock); 3860 while (!list_empty(&mdsc->cap_dirty)) { 3861 ci = list_first_entry(&mdsc->cap_dirty, struct ceph_inode_info, 3862 i_dirty_item); 3863 inode = &ci->vfs_inode; 3864 ihold(inode); 3865 dout("flush_dirty_caps %p\n", inode); 3866 spin_unlock(&mdsc->cap_dirty_lock); 3867 ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_FLUSH, NULL); 3868 iput(inode); 3869 spin_lock(&mdsc->cap_dirty_lock); 3870 } 3871 spin_unlock(&mdsc->cap_dirty_lock); 3872 dout("flush_dirty_caps done\n"); 3873 } 3874 3875 void __ceph_get_fmode(struct ceph_inode_info *ci, int fmode) 3876 { 3877 int i; 3878 int bits = (fmode << 1) | 1; 3879 for (i = 0; i < CEPH_FILE_MODE_BITS; i++) { 3880 if (bits & (1 << i)) 3881 ci->i_nr_by_mode[i]++; 3882 } 3883 } 3884 3885 /* 3886 * Drop open file reference. 
If we were the last open file, 3887 * we may need to release capabilities to the MDS (or schedule 3888 * their delayed release). 3889 */ 3890 void ceph_put_fmode(struct ceph_inode_info *ci, int fmode) 3891 { 3892 int i, last = 0; 3893 int bits = (fmode << 1) | 1; 3894 spin_lock(&ci->i_ceph_lock); 3895 for (i = 0; i < CEPH_FILE_MODE_BITS; i++) { 3896 if (bits & (1 << i)) { 3897 BUG_ON(ci->i_nr_by_mode[i] == 0); 3898 if (--ci->i_nr_by_mode[i] == 0) 3899 last++; 3900 } 3901 } 3902 dout("put_fmode %p fmode %d {%d,%d,%d,%d}\n", 3903 &ci->vfs_inode, fmode, 3904 ci->i_nr_by_mode[0], ci->i_nr_by_mode[1], 3905 ci->i_nr_by_mode[2], ci->i_nr_by_mode[3]); 3906 spin_unlock(&ci->i_ceph_lock); 3907 3908 if (last && ci->i_vino.snap == CEPH_NOSNAP) 3909 ceph_check_caps(ci, 0, NULL); 3910 } 3911 3912 /* 3913 * Helpers for embedding cap and dentry lease releases into mds 3914 * requests. 3915 * 3916 * @force is used by dentry_release (below) to force inclusion of a 3917 * record for the directory inode, even when there aren't any caps to 3918 * drop. 
3919 */ 3920 int ceph_encode_inode_release(void **p, struct inode *inode, 3921 int mds, int drop, int unless, int force) 3922 { 3923 struct ceph_inode_info *ci = ceph_inode(inode); 3924 struct ceph_cap *cap; 3925 struct ceph_mds_request_release *rel = *p; 3926 int used, dirty; 3927 int ret = 0; 3928 3929 spin_lock(&ci->i_ceph_lock); 3930 used = __ceph_caps_used(ci); 3931 dirty = __ceph_caps_dirty(ci); 3932 3933 dout("encode_inode_release %p mds%d used|dirty %s drop %s unless %s\n", 3934 inode, mds, ceph_cap_string(used|dirty), ceph_cap_string(drop), 3935 ceph_cap_string(unless)); 3936 3937 /* only drop unused, clean caps */ 3938 drop &= ~(used | dirty); 3939 3940 cap = __get_cap_for_mds(ci, mds); 3941 if (cap && __cap_is_valid(cap)) { 3942 if (force || 3943 ((cap->issued & drop) && 3944 (cap->issued & unless) == 0)) { 3945 if ((cap->issued & drop) && 3946 (cap->issued & unless) == 0) { 3947 int wanted = __ceph_caps_wanted(ci); 3948 if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0) 3949 wanted |= cap->mds_wanted; 3950 dout("encode_inode_release %p cap %p " 3951 "%s -> %s, wanted %s -> %s\n", inode, cap, 3952 ceph_cap_string(cap->issued), 3953 ceph_cap_string(cap->issued & ~drop), 3954 ceph_cap_string(cap->mds_wanted), 3955 ceph_cap_string(wanted)); 3956 3957 cap->issued &= ~drop; 3958 cap->implemented &= ~drop; 3959 cap->mds_wanted = wanted; 3960 } else { 3961 dout("encode_inode_release %p cap %p %s" 3962 " (force)\n", inode, cap, 3963 ceph_cap_string(cap->issued)); 3964 } 3965 3966 rel->ino = cpu_to_le64(ceph_ino(inode)); 3967 rel->cap_id = cpu_to_le64(cap->cap_id); 3968 rel->seq = cpu_to_le32(cap->seq); 3969 rel->issue_seq = cpu_to_le32(cap->issue_seq); 3970 rel->mseq = cpu_to_le32(cap->mseq); 3971 rel->caps = cpu_to_le32(cap->implemented); 3972 rel->wanted = cpu_to_le32(cap->mds_wanted); 3973 rel->dname_len = 0; 3974 rel->dname_seq = 0; 3975 *p += sizeof(*rel); 3976 ret = 1; 3977 } else { 3978 dout("encode_inode_release %p cap %p %s\n", 3979 inode, cap, 
ceph_cap_string(cap->issued)); 3980 } 3981 } 3982 spin_unlock(&ci->i_ceph_lock); 3983 return ret; 3984 } 3985 3986 int ceph_encode_dentry_release(void **p, struct dentry *dentry, 3987 struct inode *dir, 3988 int mds, int drop, int unless) 3989 { 3990 struct dentry *parent = NULL; 3991 struct ceph_mds_request_release *rel = *p; 3992 struct ceph_dentry_info *di = ceph_dentry(dentry); 3993 int force = 0; 3994 int ret; 3995 3996 /* 3997 * force an record for the directory caps if we have a dentry lease. 3998 * this is racy (can't take i_ceph_lock and d_lock together), but it 3999 * doesn't have to be perfect; the mds will revoke anything we don't 4000 * release. 4001 */ 4002 spin_lock(&dentry->d_lock); 4003 if (di->lease_session && di->lease_session->s_mds == mds) 4004 force = 1; 4005 if (!dir) { 4006 parent = dget(dentry->d_parent); 4007 dir = d_inode(parent); 4008 } 4009 spin_unlock(&dentry->d_lock); 4010 4011 ret = ceph_encode_inode_release(p, dir, mds, drop, unless, force); 4012 dput(parent); 4013 4014 spin_lock(&dentry->d_lock); 4015 if (ret && di->lease_session && di->lease_session->s_mds == mds) { 4016 dout("encode_dentry_release %p mds%d seq %d\n", 4017 dentry, mds, (int)di->lease_seq); 4018 rel->dname_len = cpu_to_le32(dentry->d_name.len); 4019 memcpy(*p, dentry->d_name.name, dentry->d_name.len); 4020 *p += dentry->d_name.len; 4021 rel->dname_seq = cpu_to_le32(di->lease_seq); 4022 __ceph_mdsc_drop_dentry_lease(dentry); 4023 } 4024 spin_unlock(&dentry->d_lock); 4025 return ret; 4026 } 4027