1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/fs.h> 4 #include <linux/wait.h> 5 #include <linux/slab.h> 6 #include <linux/gfp.h> 7 #include <linux/sched.h> 8 #include <linux/debugfs.h> 9 #include <linux/seq_file.h> 10 #include <linux/utsname.h> 11 #include <linux/ratelimit.h> 12 13 #include "super.h" 14 #include "mds_client.h" 15 16 #include <linux/ceph/ceph_features.h> 17 #include <linux/ceph/messenger.h> 18 #include <linux/ceph/decode.h> 19 #include <linux/ceph/pagelist.h> 20 #include <linux/ceph/auth.h> 21 #include <linux/ceph/debugfs.h> 22 23 /* 24 * A cluster of MDS (metadata server) daemons is responsible for 25 * managing the file system namespace (the directory hierarchy and 26 * inodes) and for coordinating shared access to storage. Metadata is 27 * partitioned hierarchically across a number of servers, and that 28 * partition varies over time as the cluster adjusts the distribution 29 * in order to balance load. 30 * 31 * The MDS client is primarily responsible for managing synchronous 32 * metadata requests for operations like open, unlink, and so forth. 33 * If there is an MDS failure, we find out about it when we (possibly 34 * request and) receive a new MDS map, and can resubmit affected 35 * requests. 36 * 37 * For the most part, though, we take advantage of a lossless 38 * communications channel to the MDS, and do not need to worry about 39 * timing out or resubmitting requests. 40 * 41 * We maintain a stateful "session" with each MDS we interact with. 42 * Within each session, we send periodic heartbeat messages to ensure 43 * any capabilities or leases we have been issued remain valid. If 44 * the session times out and goes stale, our leases and capabilities 45 * are no longer valid. 46 */ 47 48 struct ceph_reconnect_state { 49 int nr_caps; 50 struct ceph_pagelist *pagelist; 51 bool flock; 52 }; 53 54 static void __wake_requests(struct ceph_mds_client *mdsc, 55 struct list_head *head); 56 57 static const struct ceph_connection_operations mds_con_ops; 58 59 60 /* 61 * mds reply parsing 62 */ 63 64 /* 65 * parse individual inode info 66 */ 67 static int parse_reply_info_in(void **p, void *end, 68 struct ceph_mds_reply_info_in *info, 69 u64 features) 70 { 71 int err = -EIO; 72 73 info->in = *p; 74 *p += sizeof(struct ceph_mds_reply_inode) + 75 sizeof(*info->in->fragtree.splits) * 76 le32_to_cpu(info->in->fragtree.nsplits); 77 78 ceph_decode_32_safe(p, end, info->symlink_len, bad); 79 ceph_decode_need(p, end, info->symlink_len, bad); 80 info->symlink = *p; 81 *p += info->symlink_len; 82 83 if (features & CEPH_FEATURE_DIRLAYOUTHASH) 84 ceph_decode_copy_safe(p, end, &info->dir_layout, 85 sizeof(info->dir_layout), bad); 86 else 87 memset(&info->dir_layout, 0, sizeof(info->dir_layout)); 88 89 ceph_decode_32_safe(p, end, info->xattr_len, bad); 90 ceph_decode_need(p, end, info->xattr_len, bad); 91 info->xattr_data = *p; 92 *p += info->xattr_len; 93 94 if (features & CEPH_FEATURE_MDS_INLINE_DATA) { 95 ceph_decode_64_safe(p, end, info->inline_version, bad); 96 ceph_decode_32_safe(p, end, info->inline_len, bad); 97 ceph_decode_need(p, end, info->inline_len, bad); 98 info->inline_data = *p; 99 *p += info->inline_len; 100 } else 101 info->inline_version = CEPH_INLINE_NONE; 102 103 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { 104 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 105 ceph_decode_need(p, end, info->pool_ns_len, bad); 106 *p += info->pool_ns_len; 107 } else { 108 info->pool_ns_len = 0; 109 } 110 111 return 0; 112 bad: 113 return err; 114 } 115 116 /* 117 *
parse a normal reply, which may contain a (dir+)dentry and/or a 118 * target inode. 119 */ 120 static int parse_reply_info_trace(void **p, void *end, 121 struct ceph_mds_reply_info_parsed *info, 122 u64 features) 123 { 124 int err; 125 126 if (info->head->is_dentry) { 127 err = parse_reply_info_in(p, end, &info->diri, features); 128 if (err < 0) 129 goto out_bad; 130 131 if (unlikely(*p + sizeof(*info->dirfrag) > end)) 132 goto bad; 133 info->dirfrag = *p; 134 *p += sizeof(*info->dirfrag) + 135 sizeof(u32)*le32_to_cpu(info->dirfrag->ndist); 136 if (unlikely(*p > end)) 137 goto bad; 138 139 ceph_decode_32_safe(p, end, info->dname_len, bad); 140 ceph_decode_need(p, end, info->dname_len, bad); 141 info->dname = *p; 142 *p += info->dname_len; 143 info->dlease = *p; 144 *p += sizeof(*info->dlease); 145 } 146 147 if (info->head->is_target) { 148 err = parse_reply_info_in(p, end, &info->targeti, features); 149 if (err < 0) 150 goto out_bad; 151 } 152 153 if (unlikely(*p != end)) 154 goto bad; 155 return 0; 156 157 bad: 158 err = -EIO; 159 out_bad: 160 pr_err("problem parsing mds trace %d\n", err); 161 return err; 162 } 163 164 /* 165 * parse readdir results 166 */ 167 static int parse_reply_info_dir(void **p, void *end, 168 struct ceph_mds_reply_info_parsed *info, 169 u64 features) 170 { 171 u32 num, i = 0; 172 int err; 173 174 info->dir_dir = *p; 175 if (*p + sizeof(*info->dir_dir) > end) 176 goto bad; 177 *p += sizeof(*info->dir_dir) + 178 sizeof(u32)*le32_to_cpu(info->dir_dir->ndist); 179 if (*p > end) 180 goto bad; 181 182 ceph_decode_need(p, end, sizeof(num) + 2, bad); 183 num = ceph_decode_32(p); 184 { 185 u16 flags = ceph_decode_16(p); 186 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); 187 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); 188 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); 189 } 190 if (num == 0) 191 goto done; 192 193 BUG_ON(!info->dir_entries); 194 if ((unsigned long)(info->dir_entries + num) > 195 (unsigned long)info->dir_entries + info->dir_buf_size) { 196 pr_err("dir contents are larger than expected\n"); 197 WARN_ON(1); 198 goto bad; 199 } 200 201 info->dir_nr = num; 202 while (num) { 203 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 204 /* dentry */ 205 ceph_decode_need(p, end, sizeof(u32)*2, bad); 206 rde->name_len = ceph_decode_32(p); 207 ceph_decode_need(p, end, rde->name_len, bad); 208 rde->name = *p; 209 *p += rde->name_len; 210 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name); 211 rde->lease = *p; 212 *p += sizeof(struct ceph_mds_reply_lease); 213 214 /* inode */ 215 err = parse_reply_info_in(p, end, &rde->inode, features); 216 if (err < 0) 217 goto out_bad; 218 /* ceph_readdir_prepopulate() will update it */ 219 rde->offset = 0; 220 i++; 221 num--; 222 } 223 224 done: 225 if (*p != end) 226 goto bad; 227 return 0; 228 229 bad: 230 err = -EIO; 231 out_bad: 232 pr_err("problem parsing dir contents %d\n", err); 233 return err; 234 } 235 236 /* 237 * parse fcntl F_GETLK results 238 */ 239 static int parse_reply_info_filelock(void **p, void *end, 240 struct ceph_mds_reply_info_parsed *info, 241 u64 features) 242 { 243 if (*p + sizeof(*info->filelock_reply) > end) 244 goto bad; 245 246 info->filelock_reply = *p; 247 *p += sizeof(*info->filelock_reply); 248 249 if (unlikely(*p != end)) 250 goto bad; 251 return 0; 252 253 bad: 254 return -EIO; 255 } 256 257 /* 258 * parse create results 259 */ 260 static int parse_reply_info_create(void **p, void *end, 261 struct ceph_mds_reply_info_parsed *info, 262 u64 features) 263 
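/*
 * The "extra" section of a create reply is either empty (older MDSes)
 * or, when the peer has CEPH_FEATURE_REPLY_CREATE_INODE, a single
 * 64-bit inode number for the newly created inode; has_create_ino
 * records which case was hit so callers know whether info->ino is
 * meaningful.
 */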
{ 264 if (features & CEPH_FEATURE_REPLY_CREATE_INODE) { 265 if (*p == end) { 266 info->has_create_ino = false; 267 } else { 268 info->has_create_ino = true; 269 info->ino = ceph_decode_64(p); 270 } 271 } 272 273 if (unlikely(*p != end)) 274 goto bad; 275 return 0; 276 277 bad: 278 return -EIO; 279 } 280 281 /* 282 * parse extra results 283 */ 284 static int parse_reply_info_extra(void **p, void *end, 285 struct ceph_mds_reply_info_parsed *info, 286 u64 features) 287 { 288 if (info->head->op == CEPH_MDS_OP_GETFILELOCK) 289 return parse_reply_info_filelock(p, end, info, features); 290 else if (info->head->op == CEPH_MDS_OP_READDIR || 291 info->head->op == CEPH_MDS_OP_LSSNAP) 292 return parse_reply_info_dir(p, end, info, features); 293 else if (info->head->op == CEPH_MDS_OP_CREATE) 294 return parse_reply_info_create(p, end, info, features); 295 else 296 return -EIO; 297 } 298 299 /* 300 * parse entire mds reply 301 */ 302 static int parse_reply_info(struct ceph_msg *msg, 303 struct ceph_mds_reply_info_parsed *info, 304 u64 features) 305 { 306 void *p, *end; 307 u32 len; 308 int err; 309 310 info->head = msg->front.iov_base; 311 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 312 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 313 314 /* trace */ 315 ceph_decode_32_safe(&p, end, len, bad); 316 if (len > 0) { 317 ceph_decode_need(&p, end, len, bad); 318 err = parse_reply_info_trace(&p, p+len, info, features); 319 if (err < 0) 320 goto out_bad; 321 } 322 323 /* extra */ 324 ceph_decode_32_safe(&p, end, len, bad); 325 if (len > 0) { 326 ceph_decode_need(&p, end, len, bad); 327 err = parse_reply_info_extra(&p, p+len, info, features); 328 if (err < 0) 329 goto out_bad; 330 } 331 332 /* snap blob */ 333 ceph_decode_32_safe(&p, end, len, bad); 334 info->snapblob_len = len; 335 info->snapblob = p; 336 p += len; 337 338 if (p != end) 339 goto bad; 340 return 0; 341 342 bad: 343 err = -EIO; 344 out_bad: 345 pr_err("mds parse_reply err %d\n", err); 346 return err; 347 } 348 349 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 350 { 351 if (!info->dir_entries) 352 return; 353 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size)); 354 } 355 356 357 /* 358 * sessions 359 */ 360 const char *ceph_session_state_name(int s) 361 { 362 switch (s) { 363 case CEPH_MDS_SESSION_NEW: return "new"; 364 case CEPH_MDS_SESSION_OPENING: return "opening"; 365 case CEPH_MDS_SESSION_OPEN: return "open"; 366 case CEPH_MDS_SESSION_HUNG: return "hung"; 367 case CEPH_MDS_SESSION_CLOSING: return "closing"; 368 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 369 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 370 default: return "???"; 371 } 372 } 373 374 static struct ceph_mds_session *get_session(struct ceph_mds_session *s) 375 { 376 if (atomic_inc_not_zero(&s->s_ref)) { 377 dout("mdsc get_session %p %d -> %d\n", s, 378 atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref)); 379 return s; 380 } else { 381 dout("mdsc get_session %p 0 -- FAIL", s); 382 return NULL; 383 } 384 } 385 386 void ceph_put_mds_session(struct ceph_mds_session *s) 387 { 388 dout("mdsc put_session %p %d -> %d\n", s, 389 atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1); 390 if (atomic_dec_and_test(&s->s_ref)) { 391 if (s->s_auth.authorizer) 392 ceph_auth_destroy_authorizer(s->s_auth.authorizer); 393 kfree(s); 394 } 395 } 396 397 /* 398 * called under mdsc->mutex 399 */ 400 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 401 int mds) 402 
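/*
 * The returned session carries an extra reference taken via
 * get_session(); callers are expected to drop it with
 * ceph_put_mds_session() when done (as __open_export_target_sessions()
 * does, for example).
 */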
{ 403 struct ceph_mds_session *session; 404 405 if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL) 406 return NULL; 407 session = mdsc->sessions[mds]; 408 dout("lookup_mds_session %p %d\n", session, 409 atomic_read(&session->s_ref)); 410 get_session(session); 411 return session; 412 } 413 414 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 415 { 416 if (mds >= mdsc->max_sessions) 417 return false; 418 return mdsc->sessions[mds]; 419 } 420 421 static int __verify_registered_session(struct ceph_mds_client *mdsc, 422 struct ceph_mds_session *s) 423 { 424 if (s->s_mds >= mdsc->max_sessions || 425 mdsc->sessions[s->s_mds] != s) 426 return -ENOENT; 427 return 0; 428 } 429 430 /* 431 * create+register a new session for given mds. 432 * called under mdsc->mutex. 433 */ 434 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 435 int mds) 436 { 437 struct ceph_mds_session *s; 438 439 if (mds >= mdsc->mdsmap->m_max_mds) 440 return ERR_PTR(-EINVAL); 441 442 s = kzalloc(sizeof(*s), GFP_NOFS); 443 if (!s) 444 return ERR_PTR(-ENOMEM); 445 s->s_mdsc = mdsc; 446 s->s_mds = mds; 447 s->s_state = CEPH_MDS_SESSION_NEW; 448 s->s_ttl = 0; 449 s->s_seq = 0; 450 mutex_init(&s->s_mutex); 451 452 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); 453 454 spin_lock_init(&s->s_gen_ttl_lock); 455 s->s_cap_gen = 0; 456 s->s_cap_ttl = jiffies - 1; 457 458 spin_lock_init(&s->s_cap_lock); 459 s->s_renew_requested = 0; 460 s->s_renew_seq = 0; 461 INIT_LIST_HEAD(&s->s_caps); 462 s->s_nr_caps = 0; 463 s->s_trim_caps = 0; 464 atomic_set(&s->s_ref, 1); 465 INIT_LIST_HEAD(&s->s_waiting); 466 INIT_LIST_HEAD(&s->s_unsafe); 467 s->s_num_cap_releases = 0; 468 s->s_cap_reconnect = 0; 469 s->s_cap_iterator = NULL; 470 INIT_LIST_HEAD(&s->s_cap_releases); 471 INIT_LIST_HEAD(&s->s_cap_flushing); 472 INIT_LIST_HEAD(&s->s_cap_snaps_flushing); 473 474 dout("register_session mds%d\n", mds); 475 if (mds >= mdsc->max_sessions) { 476 int newmax = 1 << get_count_order(mds+1); 477 struct ceph_mds_session **sa; 478 479 dout("register_session realloc to %d\n", newmax); 480 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); 481 if (sa == NULL) 482 goto fail_realloc; 483 if (mdsc->sessions) { 484 memcpy(sa, mdsc->sessions, 485 mdsc->max_sessions * sizeof(void *)); 486 kfree(mdsc->sessions); 487 } 488 mdsc->sessions = sa; 489 mdsc->max_sessions = newmax; 490 } 491 mdsc->sessions[mds] = s; 492 atomic_inc(&mdsc->num_sessions); 493 atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 494 495 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 496 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 497 498 return s; 499 500 fail_realloc: 501 kfree(s); 502 return ERR_PTR(-ENOMEM); 503 } 504 505 /* 506 * called under mdsc->mutex 507 */ 508 static void __unregister_session(struct ceph_mds_client *mdsc, 509 struct ceph_mds_session *s) 510 { 511 dout("__unregister_session mds%d %p\n", s->s_mds, s); 512 BUG_ON(mdsc->sessions[s->s_mds] != s); 513 mdsc->sessions[s->s_mds] = NULL; 514 ceph_con_close(&s->s_con); 515 ceph_put_mds_session(s); 516 atomic_dec(&mdsc->num_sessions); 517 } 518 519 /* 520 * drop session refs in request. 
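 * (the ref dropped here is the one taken in __do_request() when the
 * request was bound to a session)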
521 * 522 * should be last request ref, or hold mdsc->mutex 523 */ 524 static void put_request_session(struct ceph_mds_request *req) 525 { 526 if (req->r_session) { 527 ceph_put_mds_session(req->r_session); 528 req->r_session = NULL; 529 } 530 } 531 532 void ceph_mdsc_release_request(struct kref *kref) 533 { 534 struct ceph_mds_request *req = container_of(kref, 535 struct ceph_mds_request, 536 r_kref); 537 destroy_reply_info(&req->r_reply_info); 538 if (req->r_request) 539 ceph_msg_put(req->r_request); 540 if (req->r_reply) 541 ceph_msg_put(req->r_reply); 542 if (req->r_inode) { 543 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 544 iput(req->r_inode); 545 } 546 if (req->r_locked_dir) 547 ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); 548 iput(req->r_target_inode); 549 if (req->r_dentry) 550 dput(req->r_dentry); 551 if (req->r_old_dentry) 552 dput(req->r_old_dentry); 553 if (req->r_old_dentry_dir) { 554 /* 555 * track (and drop pins for) r_old_dentry_dir 556 * separately, since r_old_dentry's d_parent may have 557 * changed between the dir mutex being dropped and 558 * this request being freed. 559 */ 560 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), 561 CEPH_CAP_PIN); 562 iput(req->r_old_dentry_dir); 563 } 564 kfree(req->r_path1); 565 kfree(req->r_path2); 566 if (req->r_pagelist) 567 ceph_pagelist_release(req->r_pagelist); 568 put_request_session(req); 569 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); 570 kfree(req); 571 } 572 573 DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node) 574 575 /* 576 * lookup session, bump ref if found. 577 * 578 * called under mdsc->mutex. 579 */ 580 static struct ceph_mds_request * 581 lookup_get_request(struct ceph_mds_client *mdsc, u64 tid) 582 { 583 struct ceph_mds_request *req; 584 585 req = lookup_request(&mdsc->request_tree, tid); 586 if (req) 587 ceph_mdsc_get_request(req); 588 589 return req; 590 } 591 592 /* 593 * Register an in-flight request, and assign a tid. Link to directory 594 * are modifying (if any). 595 * 596 * Called under mdsc->mutex. 
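 *
 * Registration assigns the next tid from mdsc->last_tid, reserves any
 * requested caps, takes a reference on the request and inserts it into
 * mdsc->request_tree; the oldest non-SETFILELOCK tid is tracked in
 * mdsc->oldest_tid so it can be reported back to the MDS in each
 * request header.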
597 */ 598 static void __register_request(struct ceph_mds_client *mdsc, 599 struct ceph_mds_request *req, 600 struct inode *dir) 601 { 602 req->r_tid = ++mdsc->last_tid; 603 if (req->r_num_caps) 604 ceph_reserve_caps(mdsc, &req->r_caps_reservation, 605 req->r_num_caps); 606 dout("__register_request %p tid %lld\n", req, req->r_tid); 607 ceph_mdsc_get_request(req); 608 insert_request(&mdsc->request_tree, req); 609 610 req->r_uid = current_fsuid(); 611 req->r_gid = current_fsgid(); 612 613 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) 614 mdsc->oldest_tid = req->r_tid; 615 616 if (dir) { 617 ihold(dir); 618 req->r_unsafe_dir = dir; 619 } 620 } 621 622 static void __unregister_request(struct ceph_mds_client *mdsc, 623 struct ceph_mds_request *req) 624 { 625 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 626 627 if (req->r_tid == mdsc->oldest_tid) { 628 struct rb_node *p = rb_next(&req->r_node); 629 mdsc->oldest_tid = 0; 630 while (p) { 631 struct ceph_mds_request *next_req = 632 rb_entry(p, struct ceph_mds_request, r_node); 633 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 634 mdsc->oldest_tid = next_req->r_tid; 635 break; 636 } 637 p = rb_next(p); 638 } 639 } 640 641 erase_request(&mdsc->request_tree, req); 642 643 if (req->r_unsafe_dir && req->r_got_unsafe) { 644 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 645 spin_lock(&ci->i_unsafe_lock); 646 list_del_init(&req->r_unsafe_dir_item); 647 spin_unlock(&ci->i_unsafe_lock); 648 } 649 if (req->r_target_inode && req->r_got_unsafe) { 650 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 651 spin_lock(&ci->i_unsafe_lock); 652 list_del_init(&req->r_unsafe_target_item); 653 spin_unlock(&ci->i_unsafe_lock); 654 } 655 656 if (req->r_unsafe_dir) { 657 iput(req->r_unsafe_dir); 658 req->r_unsafe_dir = NULL; 659 } 660 661 complete_all(&req->r_safe_completion); 662 663 ceph_mdsc_put_request(req); 664 } 665 666 /* 667 * Choose mds to send request to next. If there is a hint set in the 668 * request (e.g., due to a prior forward hint from the mds), use that. 669 * Otherwise, consult frag tree and/or caps to identify the 670 * appropriate mds. If all else fails, choose randomly. 671 * 672 * Called under mdsc->mutex. 673 */ 674 static struct dentry *get_nonsnap_parent(struct dentry *dentry) 675 { 676 /* 677 * we don't need to worry about protecting the d_parent access 678 * here because we never renaming inside the snapped namespace 679 * except to resplice to another snapdir, and either the old or new 680 * result is a valid result. 681 */ 682 while (!IS_ROOT(dentry) && ceph_snap(d_inode(dentry)) != CEPH_NOSNAP) 683 dentry = dentry->d_parent; 684 return dentry; 685 } 686 687 static int __choose_mds(struct ceph_mds_client *mdsc, 688 struct ceph_mds_request *req) 689 { 690 struct inode *inode; 691 struct ceph_inode_info *ci; 692 struct ceph_cap *cap; 693 int mode = req->r_direct_mode; 694 int mds = -1; 695 u32 hash = req->r_direct_hash; 696 bool is_hash = req->r_direct_is_hash; 697 698 /* 699 * is there a specific mds we should try? ignore hint if we have 700 * no session and the mds is not up (active or recovering). 
701 */ 702 if (req->r_resend_mds >= 0 && 703 (__have_session(mdsc, req->r_resend_mds) || 704 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 705 dout("choose_mds using resend_mds mds%d\n", 706 req->r_resend_mds); 707 return req->r_resend_mds; 708 } 709 710 if (mode == USE_RANDOM_MDS) 711 goto random; 712 713 inode = NULL; 714 if (req->r_inode) { 715 inode = req->r_inode; 716 } else if (req->r_dentry) { 717 /* ignore race with rename; old or new d_parent is okay */ 718 struct dentry *parent = req->r_dentry->d_parent; 719 struct inode *dir = d_inode(parent); 720 721 if (dir->i_sb != mdsc->fsc->sb) { 722 /* not this fs! */ 723 inode = d_inode(req->r_dentry); 724 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 725 /* direct snapped/virtual snapdir requests 726 * based on parent dir inode */ 727 struct dentry *dn = get_nonsnap_parent(parent); 728 inode = d_inode(dn); 729 dout("__choose_mds using nonsnap parent %p\n", inode); 730 } else { 731 /* dentry target */ 732 inode = d_inode(req->r_dentry); 733 if (!inode || mode == USE_AUTH_MDS) { 734 /* dir + name */ 735 inode = dir; 736 hash = ceph_dentry_hash(dir, req->r_dentry); 737 is_hash = true; 738 } 739 } 740 } 741 742 dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash, 743 (int)hash, mode); 744 if (!inode) 745 goto random; 746 ci = ceph_inode(inode); 747 748 if (is_hash && S_ISDIR(inode->i_mode)) { 749 struct ceph_inode_frag frag; 750 int found; 751 752 ceph_choose_frag(ci, hash, &frag, &found); 753 if (found) { 754 if (mode == USE_ANY_MDS && frag.ndist > 0) { 755 u8 r; 756 757 /* choose a random replica */ 758 get_random_bytes(&r, 1); 759 r %= frag.ndist; 760 mds = frag.dist[r]; 761 dout("choose_mds %p %llx.%llx " 762 "frag %u mds%d (%d/%d)\n", 763 inode, ceph_vinop(inode), 764 frag.frag, mds, 765 (int)r, frag.ndist); 766 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 767 CEPH_MDS_STATE_ACTIVE) 768 return mds; 769 } 770 771 /* since this file/dir wasn't known to be 772 * replicated, then we want to look for the 773 * authoritative mds. */ 774 mode = USE_AUTH_MDS; 775 if (frag.mds >= 0) { 776 /* choose auth mds */ 777 mds = frag.mds; 778 dout("choose_mds %p %llx.%llx " 779 "frag %u mds%d (auth)\n", 780 inode, ceph_vinop(inode), frag.frag, mds); 781 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 782 CEPH_MDS_STATE_ACTIVE) 783 return mds; 784 } 785 } 786 } 787 788 spin_lock(&ci->i_ceph_lock); 789 cap = NULL; 790 if (mode == USE_AUTH_MDS) 791 cap = ci->i_auth_cap; 792 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 793 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 794 if (!cap) { 795 spin_unlock(&ci->i_ceph_lock); 796 goto random; 797 } 798 mds = cap->session->s_mds; 799 dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n", 800 inode, ceph_vinop(inode), mds, 801 cap == ci->i_auth_cap ? 
"auth " : "", cap); 802 spin_unlock(&ci->i_ceph_lock); 803 return mds; 804 805 random: 806 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 807 dout("choose_mds chose random mds%d\n", mds); 808 return mds; 809 } 810 811 812 /* 813 * session messages 814 */ 815 static struct ceph_msg *create_session_msg(u32 op, u64 seq) 816 { 817 struct ceph_msg *msg; 818 struct ceph_mds_session_head *h; 819 820 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 821 false); 822 if (!msg) { 823 pr_err("create_session_msg ENOMEM creating msg\n"); 824 return NULL; 825 } 826 h = msg->front.iov_base; 827 h->op = cpu_to_le32(op); 828 h->seq = cpu_to_le64(seq); 829 830 return msg; 831 } 832 833 /* 834 * session message, specialization for CEPH_SESSION_REQUEST_OPEN 835 * to include additional client metadata fields. 836 */ 837 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq) 838 { 839 struct ceph_msg *msg; 840 struct ceph_mds_session_head *h; 841 int i = -1; 842 int metadata_bytes = 0; 843 int metadata_key_count = 0; 844 struct ceph_options *opt = mdsc->fsc->client->options; 845 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; 846 void *p; 847 848 const char* metadata[][2] = { 849 {"hostname", utsname()->nodename}, 850 {"kernel_version", utsname()->release}, 851 {"entity_id", opt->name ? : ""}, 852 {"root", fsopt->server_path ? : "/"}, 853 {NULL, NULL} 854 }; 855 856 /* Calculate serialized length of metadata */ 857 metadata_bytes = 4; /* map length */ 858 for (i = 0; metadata[i][0] != NULL; ++i) { 859 metadata_bytes += 8 + strlen(metadata[i][0]) + 860 strlen(metadata[i][1]); 861 metadata_key_count++; 862 } 863 864 /* Allocate the message */ 865 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + metadata_bytes, 866 GFP_NOFS, false); 867 if (!msg) { 868 pr_err("create_session_msg ENOMEM creating msg\n"); 869 return NULL; 870 } 871 h = msg->front.iov_base; 872 h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN); 873 h->seq = cpu_to_le64(seq); 874 875 /* 876 * Serialize client metadata into waiting buffer space, using 877 * the format that userspace expects for map<string, string> 878 * 879 * ClientSession messages with metadata are v2 880 */ 881 msg->hdr.version = cpu_to_le16(2); 882 msg->hdr.compat_version = cpu_to_le16(1); 883 884 /* The write pointer, following the session_head structure */ 885 p = msg->front.iov_base + sizeof(*h); 886 887 /* Number of entries in the map */ 888 ceph_encode_32(&p, metadata_key_count); 889 890 /* Two length-prefixed strings for each entry in the map */ 891 for (i = 0; metadata[i][0] != NULL; ++i) { 892 size_t const key_len = strlen(metadata[i][0]); 893 size_t const val_len = strlen(metadata[i][1]); 894 895 ceph_encode_32(&p, key_len); 896 memcpy(p, metadata[i][0], key_len); 897 p += key_len; 898 ceph_encode_32(&p, val_len); 899 memcpy(p, metadata[i][1], val_len); 900 p += val_len; 901 } 902 903 return msg; 904 } 905 906 /* 907 * send session open request. 908 * 909 * called under mdsc->mutex 910 */ 911 static int __open_session(struct ceph_mds_client *mdsc, 912 struct ceph_mds_session *session) 913 { 914 struct ceph_msg *msg; 915 int mstate; 916 int mds = session->s_mds; 917 918 /* wait for mds to go active? 
*/ 919 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 920 dout("open_session to mds%d (%s)\n", mds, 921 ceph_mds_state_name(mstate)); 922 session->s_state = CEPH_MDS_SESSION_OPENING; 923 session->s_renew_requested = jiffies; 924 925 /* send connect message */ 926 msg = create_session_open_msg(mdsc, session->s_seq); 927 if (!msg) 928 return -ENOMEM; 929 ceph_con_send(&session->s_con, msg); 930 return 0; 931 } 932 933 /* 934 * open sessions for any export targets for the given mds 935 * 936 * called under mdsc->mutex 937 */ 938 static struct ceph_mds_session * 939 __open_export_target_session(struct ceph_mds_client *mdsc, int target) 940 { 941 struct ceph_mds_session *session; 942 943 session = __ceph_lookup_mds_session(mdsc, target); 944 if (!session) { 945 session = register_session(mdsc, target); 946 if (IS_ERR(session)) 947 return session; 948 } 949 if (session->s_state == CEPH_MDS_SESSION_NEW || 950 session->s_state == CEPH_MDS_SESSION_CLOSING) 951 __open_session(mdsc, session); 952 953 return session; 954 } 955 956 struct ceph_mds_session * 957 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) 958 { 959 struct ceph_mds_session *session; 960 961 dout("open_export_target_session to mds%d\n", target); 962 963 mutex_lock(&mdsc->mutex); 964 session = __open_export_target_session(mdsc, target); 965 mutex_unlock(&mdsc->mutex); 966 967 return session; 968 } 969 970 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 971 struct ceph_mds_session *session) 972 { 973 struct ceph_mds_info *mi; 974 struct ceph_mds_session *ts; 975 int i, mds = session->s_mds; 976 977 if (mds >= mdsc->mdsmap->m_max_mds) 978 return; 979 980 mi = &mdsc->mdsmap->m_info[mds]; 981 dout("open_export_target_sessions for mds%d (%d targets)\n", 982 session->s_mds, mi->num_export_targets); 983 984 for (i = 0; i < mi->num_export_targets; i++) { 985 ts = __open_export_target_session(mdsc, mi->export_targets[i]); 986 if (!IS_ERR(ts)) 987 ceph_put_mds_session(ts); 988 } 989 } 990 991 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 992 struct ceph_mds_session *session) 993 { 994 mutex_lock(&mdsc->mutex); 995 __open_export_target_sessions(mdsc, session); 996 mutex_unlock(&mdsc->mutex); 997 } 998 999 /* 1000 * session caps 1001 */ 1002 1003 /* caller holds s_cap_lock, we drop it */ 1004 static void cleanup_cap_releases(struct ceph_mds_client *mdsc, 1005 struct ceph_mds_session *session) 1006 __releases(session->s_cap_lock) 1007 { 1008 LIST_HEAD(tmp_list); 1009 list_splice_init(&session->s_cap_releases, &tmp_list); 1010 session->s_num_cap_releases = 0; 1011 spin_unlock(&session->s_cap_lock); 1012 1013 dout("cleanup_cap_releases mds%d\n", session->s_mds); 1014 while (!list_empty(&tmp_list)) { 1015 struct ceph_cap *cap; 1016 /* zero out the in-progress message */ 1017 cap = list_first_entry(&tmp_list, 1018 struct ceph_cap, session_caps); 1019 list_del(&cap->session_caps); 1020 ceph_put_cap(mdsc, cap); 1021 } 1022 } 1023 1024 static void cleanup_session_requests(struct ceph_mds_client *mdsc, 1025 struct ceph_mds_session *session) 1026 { 1027 struct ceph_mds_request *req; 1028 struct rb_node *p; 1029 1030 dout("cleanup_session_requests mds%d\n", session->s_mds); 1031 mutex_lock(&mdsc->mutex); 1032 while (!list_empty(&session->s_unsafe)) { 1033 req = list_first_entry(&session->s_unsafe, 1034 struct ceph_mds_request, r_unsafe_item); 1035 list_del_init(&req->r_unsafe_item); 1036 pr_warn_ratelimited(" dropping unsafe request %llu\n", 1037 req->r_tid); 1038 
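		/*
		 * Requests on s_unsafe only ever received an unsafe
		 * (uncommitted) reply from this session; unregistering
		 * them completes r_safe_completion so that waiters for
		 * the safe reply are not left hanging.
		 */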
__unregister_request(mdsc, req); 1039 } 1040 /* zero r_attempts, so kick_requests() will re-send requests */ 1041 p = rb_first(&mdsc->request_tree); 1042 while (p) { 1043 req = rb_entry(p, struct ceph_mds_request, r_node); 1044 p = rb_next(p); 1045 if (req->r_session && 1046 req->r_session->s_mds == session->s_mds) 1047 req->r_attempts = 0; 1048 } 1049 mutex_unlock(&mdsc->mutex); 1050 } 1051 1052 /* 1053 * Helper to safely iterate over all caps associated with a session, with 1054 * special care taken to handle a racing __ceph_remove_cap(). 1055 * 1056 * Caller must hold session s_mutex. 1057 */ 1058 static int iterate_session_caps(struct ceph_mds_session *session, 1059 int (*cb)(struct inode *, struct ceph_cap *, 1060 void *), void *arg) 1061 { 1062 struct list_head *p; 1063 struct ceph_cap *cap; 1064 struct inode *inode, *last_inode = NULL; 1065 struct ceph_cap *old_cap = NULL; 1066 int ret; 1067 1068 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 1069 spin_lock(&session->s_cap_lock); 1070 p = session->s_caps.next; 1071 while (p != &session->s_caps) { 1072 cap = list_entry(p, struct ceph_cap, session_caps); 1073 inode = igrab(&cap->ci->vfs_inode); 1074 if (!inode) { 1075 p = p->next; 1076 continue; 1077 } 1078 session->s_cap_iterator = cap; 1079 spin_unlock(&session->s_cap_lock); 1080 1081 if (last_inode) { 1082 iput(last_inode); 1083 last_inode = NULL; 1084 } 1085 if (old_cap) { 1086 ceph_put_cap(session->s_mdsc, old_cap); 1087 old_cap = NULL; 1088 } 1089 1090 ret = cb(inode, cap, arg); 1091 last_inode = inode; 1092 1093 spin_lock(&session->s_cap_lock); 1094 p = p->next; 1095 if (cap->ci == NULL) { 1096 dout("iterate_session_caps finishing cap %p removal\n", 1097 cap); 1098 BUG_ON(cap->session != session); 1099 cap->session = NULL; 1100 list_del_init(&cap->session_caps); 1101 session->s_nr_caps--; 1102 if (cap->queue_release) { 1103 list_add_tail(&cap->session_caps, 1104 &session->s_cap_releases); 1105 session->s_num_cap_releases++; 1106 } else { 1107 old_cap = cap; /* put_cap it w/o locks held */ 1108 } 1109 } 1110 if (ret < 0) 1111 goto out; 1112 } 1113 ret = 0; 1114 out: 1115 session->s_cap_iterator = NULL; 1116 spin_unlock(&session->s_cap_lock); 1117 1118 iput(last_inode); 1119 if (old_cap) 1120 ceph_put_cap(session->s_mdsc, old_cap); 1121 1122 return ret; 1123 } 1124 1125 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 1126 void *arg) 1127 { 1128 struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg; 1129 struct ceph_inode_info *ci = ceph_inode(inode); 1130 LIST_HEAD(to_remove); 1131 bool drop = false; 1132 bool invalidate = false; 1133 1134 dout("removing cap %p, ci is %p, inode is %p\n", 1135 cap, ci, &ci->vfs_inode); 1136 spin_lock(&ci->i_ceph_lock); 1137 __ceph_remove_cap(cap, false); 1138 if (!ci->i_auth_cap) { 1139 struct ceph_cap_flush *cf; 1140 struct ceph_mds_client *mdsc = fsc->mdsc; 1141 1142 ci->i_ceph_flags |= CEPH_I_CAP_DROPPED; 1143 1144 if (ci->i_wrbuffer_ref > 0 && 1145 ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 1146 invalidate = true; 1147 1148 while (true) { 1149 struct rb_node *n = rb_first(&ci->i_cap_flush_tree); 1150 if (!n) 1151 break; 1152 cf = rb_entry(n, struct ceph_cap_flush, i_node); 1153 rb_erase(&cf->i_node, &ci->i_cap_flush_tree); 1154 list_add(&cf->list, &to_remove); 1155 } 1156 1157 spin_lock(&mdsc->cap_dirty_lock); 1158 1159 list_for_each_entry(cf, &to_remove, list) 1160 rb_erase(&cf->g_node, &mdsc->cap_flush_tree); 1161 1162 if (!list_empty(&ci->i_dirty_item)) { 1163 
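			/*
			 * With the auth cap gone this dirty metadata can no
			 * longer be flushed back to the MDS, so discard it;
			 * the iput() done below when 'drop' is set releases
			 * the reference pinned by the dirty state.
			 */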
pr_warn_ratelimited( 1164 " dropping dirty %s state for %p %lld\n", 1165 ceph_cap_string(ci->i_dirty_caps), 1166 inode, ceph_ino(inode)); 1167 ci->i_dirty_caps = 0; 1168 list_del_init(&ci->i_dirty_item); 1169 drop = true; 1170 } 1171 if (!list_empty(&ci->i_flushing_item)) { 1172 pr_warn_ratelimited( 1173 " dropping dirty+flushing %s state for %p %lld\n", 1174 ceph_cap_string(ci->i_flushing_caps), 1175 inode, ceph_ino(inode)); 1176 ci->i_flushing_caps = 0; 1177 list_del_init(&ci->i_flushing_item); 1178 mdsc->num_cap_flushing--; 1179 drop = true; 1180 } 1181 spin_unlock(&mdsc->cap_dirty_lock); 1182 1183 if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { 1184 list_add(&ci->i_prealloc_cap_flush->list, &to_remove); 1185 ci->i_prealloc_cap_flush = NULL; 1186 } 1187 } 1188 spin_unlock(&ci->i_ceph_lock); 1189 while (!list_empty(&to_remove)) { 1190 struct ceph_cap_flush *cf; 1191 cf = list_first_entry(&to_remove, 1192 struct ceph_cap_flush, list); 1193 list_del(&cf->list); 1194 ceph_free_cap_flush(cf); 1195 } 1196 1197 wake_up_all(&ci->i_cap_wq); 1198 if (invalidate) 1199 ceph_queue_invalidate(inode); 1200 if (drop) 1201 iput(inode); 1202 return 0; 1203 } 1204 1205 /* 1206 * caller must hold session s_mutex 1207 */ 1208 static void remove_session_caps(struct ceph_mds_session *session) 1209 { 1210 struct ceph_fs_client *fsc = session->s_mdsc->fsc; 1211 struct super_block *sb = fsc->sb; 1212 dout("remove_session_caps on %p\n", session); 1213 iterate_session_caps(session, remove_session_caps_cb, fsc); 1214 1215 spin_lock(&session->s_cap_lock); 1216 if (session->s_nr_caps > 0) { 1217 struct inode *inode; 1218 struct ceph_cap *cap, *prev = NULL; 1219 struct ceph_vino vino; 1220 /* 1221 * iterate_session_caps() skips inodes that are being 1222 * deleted, we need to wait until deletions are complete. 1223 * __wait_on_freeing_inode() is designed for the job, 1224 * but it is not exported, so use lookup inode function 1225 * to access it. 1226 */ 1227 while (!list_empty(&session->s_caps)) { 1228 cap = list_entry(session->s_caps.next, 1229 struct ceph_cap, session_caps); 1230 if (cap == prev) 1231 break; 1232 prev = cap; 1233 vino = cap->ci->i_vino; 1234 spin_unlock(&session->s_cap_lock); 1235 1236 inode = ceph_find_inode(sb, vino); 1237 iput(inode); 1238 1239 spin_lock(&session->s_cap_lock); 1240 } 1241 } 1242 1243 // drop cap expires and unlock s_cap_lock 1244 cleanup_cap_releases(session->s_mdsc, session); 1245 1246 BUG_ON(session->s_nr_caps > 0); 1247 BUG_ON(!list_empty(&session->s_cap_flushing)); 1248 } 1249 1250 /* 1251 * wake up any threads waiting on this session's caps. if the cap is 1252 * old (didn't get renewed on the client reconnect), remove it now. 1253 * 1254 * caller must hold s_mutex. 1255 */ 1256 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, 1257 void *arg) 1258 { 1259 struct ceph_inode_info *ci = ceph_inode(inode); 1260 1261 if (arg) { 1262 spin_lock(&ci->i_ceph_lock); 1263 ci->i_wanted_max_size = 0; 1264 ci->i_requested_max_size = 0; 1265 spin_unlock(&ci->i_ceph_lock); 1266 } 1267 wake_up_all(&ci->i_cap_wq); 1268 return 0; 1269 } 1270 1271 static void wake_up_session_caps(struct ceph_mds_session *session, 1272 int reconnect) 1273 { 1274 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 1275 iterate_session_caps(session, wake_up_session_cb, 1276 (void *)(unsigned long)reconnect); 1277 } 1278 1279 /* 1280 * Send periodic message to MDS renewing all currently held caps. The 1281 * ack will reset the expiration for all caps from this session. 
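 *
 * On a successful renewal, renewed_caps() recomputes the ttl roughly as
 *
 *   session->s_cap_ttl = session->s_renew_requested +
 *                        mdsc->mdsmap->m_session_timeout * HZ;
 *
 * i.e. caps stay valid for one session-timeout period measured from the
 * time the renew request was sent, not from when the ack arrived.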
1282 * 1283 * caller holds s_mutex 1284 */ 1285 static int send_renew_caps(struct ceph_mds_client *mdsc, 1286 struct ceph_mds_session *session) 1287 { 1288 struct ceph_msg *msg; 1289 int state; 1290 1291 if (time_after_eq(jiffies, session->s_cap_ttl) && 1292 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 1293 pr_info("mds%d caps stale\n", session->s_mds); 1294 session->s_renew_requested = jiffies; 1295 1296 /* do not try to renew caps until a recovering mds has reconnected 1297 * with its clients. */ 1298 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 1299 if (state < CEPH_MDS_STATE_RECONNECT) { 1300 dout("send_renew_caps ignoring mds%d (%s)\n", 1301 session->s_mds, ceph_mds_state_name(state)); 1302 return 0; 1303 } 1304 1305 dout("send_renew_caps to mds%d (%s)\n", session->s_mds, 1306 ceph_mds_state_name(state)); 1307 msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 1308 ++session->s_renew_seq); 1309 if (!msg) 1310 return -ENOMEM; 1311 ceph_con_send(&session->s_con, msg); 1312 return 0; 1313 } 1314 1315 static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 1316 struct ceph_mds_session *session, u64 seq) 1317 { 1318 struct ceph_msg *msg; 1319 1320 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", 1321 session->s_mds, ceph_session_state_name(session->s_state), seq); 1322 msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 1323 if (!msg) 1324 return -ENOMEM; 1325 ceph_con_send(&session->s_con, msg); 1326 return 0; 1327 } 1328 1329 1330 /* 1331 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 1332 * 1333 * Called under session->s_mutex 1334 */ 1335 static void renewed_caps(struct ceph_mds_client *mdsc, 1336 struct ceph_mds_session *session, int is_renew) 1337 { 1338 int was_stale; 1339 int wake = 0; 1340 1341 spin_lock(&session->s_cap_lock); 1342 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 1343 1344 session->s_cap_ttl = session->s_renew_requested + 1345 mdsc->mdsmap->m_session_timeout*HZ; 1346 1347 if (was_stale) { 1348 if (time_before(jiffies, session->s_cap_ttl)) { 1349 pr_info("mds%d caps renewed\n", session->s_mds); 1350 wake = 1; 1351 } else { 1352 pr_info("mds%d caps still stale\n", session->s_mds); 1353 } 1354 } 1355 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n", 1356 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh", 1357 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 1358 spin_unlock(&session->s_cap_lock); 1359 1360 if (wake) 1361 wake_up_session_caps(session, 0); 1362 } 1363 1364 /* 1365 * send a session close request 1366 */ 1367 static int request_close_session(struct ceph_mds_client *mdsc, 1368 struct ceph_mds_session *session) 1369 { 1370 struct ceph_msg *msg; 1371 1372 dout("request_close_session mds%d state %s seq %lld\n", 1373 session->s_mds, ceph_session_state_name(session->s_state), 1374 session->s_seq); 1375 msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); 1376 if (!msg) 1377 return -ENOMEM; 1378 ceph_con_send(&session->s_con, msg); 1379 return 0; 1380 } 1381 1382 /* 1383 * Called with s_mutex held. 1384 */ 1385 static int __close_session(struct ceph_mds_client *mdsc, 1386 struct ceph_mds_session *session) 1387 { 1388 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 1389 return 0; 1390 session->s_state = CEPH_MDS_SESSION_CLOSING; 1391 return request_close_session(mdsc, session); 1392 } 1393 1394 /* 1395 * Trim old(er) caps. 
1396 * 1397 * Because we can't cache an inode without one or more caps, we do 1398 * this indirectly: if a cap is unused, we prune its aliases, at which 1399 * point the inode will hopefully get dropped to. 1400 * 1401 * Yes, this is a bit sloppy. Our only real goal here is to respond to 1402 * memory pressure from the MDS, though, so it needn't be perfect. 1403 */ 1404 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) 1405 { 1406 struct ceph_mds_session *session = arg; 1407 struct ceph_inode_info *ci = ceph_inode(inode); 1408 int used, wanted, oissued, mine; 1409 1410 if (session->s_trim_caps <= 0) 1411 return -1; 1412 1413 spin_lock(&ci->i_ceph_lock); 1414 mine = cap->issued | cap->implemented; 1415 used = __ceph_caps_used(ci); 1416 wanted = __ceph_caps_file_wanted(ci); 1417 oissued = __ceph_caps_issued_other(ci, cap); 1418 1419 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n", 1420 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), 1421 ceph_cap_string(used), ceph_cap_string(wanted)); 1422 if (cap == ci->i_auth_cap) { 1423 if (ci->i_dirty_caps || ci->i_flushing_caps || 1424 !list_empty(&ci->i_cap_snaps)) 1425 goto out; 1426 if ((used | wanted) & CEPH_CAP_ANY_WR) 1427 goto out; 1428 } 1429 /* The inode has cached pages, but it's no longer used. 1430 * we can safely drop it */ 1431 if (wanted == 0 && used == CEPH_CAP_FILE_CACHE && 1432 !(oissued & CEPH_CAP_FILE_CACHE)) { 1433 used = 0; 1434 oissued = 0; 1435 } 1436 if ((used | wanted) & ~oissued & mine) 1437 goto out; /* we need these caps */ 1438 1439 session->s_trim_caps--; 1440 if (oissued) { 1441 /* we aren't the only cap.. just remove us */ 1442 __ceph_remove_cap(cap, true); 1443 } else { 1444 /* try dropping referring dentries */ 1445 spin_unlock(&ci->i_ceph_lock); 1446 d_prune_aliases(inode); 1447 dout("trim_caps_cb %p cap %p pruned, count now %d\n", 1448 inode, cap, atomic_read(&inode->i_count)); 1449 return 0; 1450 } 1451 1452 out: 1453 spin_unlock(&ci->i_ceph_lock); 1454 return 0; 1455 } 1456 1457 /* 1458 * Trim session cap count down to some max number. 1459 */ 1460 static int trim_caps(struct ceph_mds_client *mdsc, 1461 struct ceph_mds_session *session, 1462 int max_caps) 1463 { 1464 int trim_caps = session->s_nr_caps - max_caps; 1465 1466 dout("trim_caps mds%d start: %d / %d, trim %d\n", 1467 session->s_mds, session->s_nr_caps, max_caps, trim_caps); 1468 if (trim_caps > 0) { 1469 session->s_trim_caps = trim_caps; 1470 iterate_session_caps(session, trim_caps_cb, session); 1471 dout("trim_caps mds%d done: %d / %d, trimmed %d\n", 1472 session->s_mds, session->s_nr_caps, max_caps, 1473 trim_caps - session->s_trim_caps); 1474 session->s_trim_caps = 0; 1475 } 1476 1477 ceph_send_cap_releases(mdsc, session); 1478 return 0; 1479 } 1480 1481 static int check_capsnap_flush(struct ceph_inode_info *ci, 1482 u64 want_snap_seq) 1483 { 1484 int ret = 1; 1485 spin_lock(&ci->i_ceph_lock); 1486 if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) { 1487 struct ceph_cap_snap *capsnap = 1488 list_first_entry(&ci->i_cap_snaps, 1489 struct ceph_cap_snap, ci_item); 1490 ret = capsnap->follows >= want_snap_seq; 1491 } 1492 spin_unlock(&ci->i_ceph_lock); 1493 return ret; 1494 } 1495 1496 static int check_caps_flush(struct ceph_mds_client *mdsc, 1497 u64 want_flush_tid) 1498 { 1499 struct rb_node *n; 1500 struct ceph_cap_flush *cf; 1501 int ret = 1; 1502 1503 spin_lock(&mdsc->cap_dirty_lock); 1504 n = rb_first(&mdsc->cap_flush_tree); 1505 cf = n ? 
rb_entry(n, struct ceph_cap_flush, g_node) : NULL; 1506 if (cf && cf->tid <= want_flush_tid) { 1507 dout("check_caps_flush still flushing tid %llu <= %llu\n", 1508 cf->tid, want_flush_tid); 1509 ret = 0; 1510 } 1511 spin_unlock(&mdsc->cap_dirty_lock); 1512 return ret; 1513 } 1514 1515 /* 1516 * flush all dirty inode data to disk. 1517 * 1518 * returns true if we've flushed through want_flush_tid 1519 */ 1520 static void wait_caps_flush(struct ceph_mds_client *mdsc, 1521 u64 want_flush_tid, u64 want_snap_seq) 1522 { 1523 int mds; 1524 1525 dout("check_caps_flush want %llu snap want %llu\n", 1526 want_flush_tid, want_snap_seq); 1527 mutex_lock(&mdsc->mutex); 1528 for (mds = 0; mds < mdsc->max_sessions; ) { 1529 struct ceph_mds_session *session = mdsc->sessions[mds]; 1530 struct inode *inode = NULL; 1531 1532 if (!session) { 1533 mds++; 1534 continue; 1535 } 1536 get_session(session); 1537 mutex_unlock(&mdsc->mutex); 1538 1539 mutex_lock(&session->s_mutex); 1540 if (!list_empty(&session->s_cap_snaps_flushing)) { 1541 struct ceph_cap_snap *capsnap = 1542 list_first_entry(&session->s_cap_snaps_flushing, 1543 struct ceph_cap_snap, 1544 flushing_item); 1545 struct ceph_inode_info *ci = capsnap->ci; 1546 if (!check_capsnap_flush(ci, want_snap_seq)) { 1547 dout("check_cap_flush still flushing snap %p " 1548 "follows %lld <= %lld to mds%d\n", 1549 &ci->vfs_inode, capsnap->follows, 1550 want_snap_seq, mds); 1551 inode = igrab(&ci->vfs_inode); 1552 } 1553 } 1554 mutex_unlock(&session->s_mutex); 1555 ceph_put_mds_session(session); 1556 1557 if (inode) { 1558 wait_event(mdsc->cap_flushing_wq, 1559 check_capsnap_flush(ceph_inode(inode), 1560 want_snap_seq)); 1561 iput(inode); 1562 } else { 1563 mds++; 1564 } 1565 1566 mutex_lock(&mdsc->mutex); 1567 } 1568 mutex_unlock(&mdsc->mutex); 1569 1570 wait_event(mdsc->cap_flushing_wq, 1571 check_caps_flush(mdsc, want_flush_tid)); 1572 1573 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); 1574 } 1575 1576 /* 1577 * called under s_mutex 1578 */ 1579 void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 1580 struct ceph_mds_session *session) 1581 { 1582 struct ceph_msg *msg = NULL; 1583 struct ceph_mds_cap_release *head; 1584 struct ceph_mds_cap_item *item; 1585 struct ceph_cap *cap; 1586 LIST_HEAD(tmp_list); 1587 int num_cap_releases; 1588 1589 spin_lock(&session->s_cap_lock); 1590 again: 1591 list_splice_init(&session->s_cap_releases, &tmp_list); 1592 num_cap_releases = session->s_num_cap_releases; 1593 session->s_num_cap_releases = 0; 1594 spin_unlock(&session->s_cap_lock); 1595 1596 while (!list_empty(&tmp_list)) { 1597 if (!msg) { 1598 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 1599 PAGE_SIZE, GFP_NOFS, false); 1600 if (!msg) 1601 goto out_err; 1602 head = msg->front.iov_base; 1603 head->num = cpu_to_le32(0); 1604 msg->front.iov_len = sizeof(*head); 1605 } 1606 cap = list_first_entry(&tmp_list, struct ceph_cap, 1607 session_caps); 1608 list_del(&cap->session_caps); 1609 num_cap_releases--; 1610 1611 head = msg->front.iov_base; 1612 le32_add_cpu(&head->num, 1); 1613 item = msg->front.iov_base + msg->front.iov_len; 1614 item->ino = cpu_to_le64(cap->cap_ino); 1615 item->cap_id = cpu_to_le64(cap->cap_id); 1616 item->migrate_seq = cpu_to_le32(cap->mseq); 1617 item->seq = cpu_to_le32(cap->issue_seq); 1618 msg->front.iov_len += sizeof(*item); 1619 1620 ceph_put_cap(mdsc, cap); 1621 1622 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 1623 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1624 dout("send_cap_releases mds%d %p\n", 
session->s_mds, msg); 1625 ceph_con_send(&session->s_con, msg); 1626 msg = NULL; 1627 } 1628 } 1629 1630 BUG_ON(num_cap_releases != 0); 1631 1632 spin_lock(&session->s_cap_lock); 1633 if (!list_empty(&session->s_cap_releases)) 1634 goto again; 1635 spin_unlock(&session->s_cap_lock); 1636 1637 if (msg) { 1638 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1639 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 1640 ceph_con_send(&session->s_con, msg); 1641 } 1642 return; 1643 out_err: 1644 pr_err("send_cap_releases mds%d, failed to allocate message\n", 1645 session->s_mds); 1646 spin_lock(&session->s_cap_lock); 1647 list_splice(&tmp_list, &session->s_cap_releases); 1648 session->s_num_cap_releases += num_cap_releases; 1649 spin_unlock(&session->s_cap_lock); 1650 } 1651 1652 /* 1653 * requests 1654 */ 1655 1656 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 1657 struct inode *dir) 1658 { 1659 struct ceph_inode_info *ci = ceph_inode(dir); 1660 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 1661 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 1662 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 1663 int order, num_entries; 1664 1665 spin_lock(&ci->i_ceph_lock); 1666 num_entries = ci->i_files + ci->i_subdirs; 1667 spin_unlock(&ci->i_ceph_lock); 1668 num_entries = max(num_entries, 1); 1669 num_entries = min(num_entries, opt->max_readdir); 1670 1671 order = get_order(size * num_entries); 1672 while (order >= 0) { 1673 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 1674 __GFP_NOWARN, 1675 order); 1676 if (rinfo->dir_entries) 1677 break; 1678 order--; 1679 } 1680 if (!rinfo->dir_entries) 1681 return -ENOMEM; 1682 1683 num_entries = (PAGE_SIZE << order) / size; 1684 num_entries = min(num_entries, opt->max_readdir); 1685 1686 rinfo->dir_buf_size = PAGE_SIZE << order; 1687 req->r_num_caps = num_entries + 1; 1688 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 1689 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 1690 return 0; 1691 } 1692 1693 /* 1694 * Create an mds request. 1695 */ 1696 struct ceph_mds_request * 1697 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 1698 { 1699 struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS); 1700 1701 if (!req) 1702 return ERR_PTR(-ENOMEM); 1703 1704 mutex_init(&req->r_fill_mutex); 1705 req->r_mdsc = mdsc; 1706 req->r_started = jiffies; 1707 req->r_resend_mds = -1; 1708 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 1709 INIT_LIST_HEAD(&req->r_unsafe_target_item); 1710 req->r_fmode = -1; 1711 kref_init(&req->r_kref); 1712 RB_CLEAR_NODE(&req->r_node); 1713 INIT_LIST_HEAD(&req->r_wait); 1714 init_completion(&req->r_completion); 1715 init_completion(&req->r_safe_completion); 1716 INIT_LIST_HEAD(&req->r_unsafe_item); 1717 1718 req->r_stamp = current_fs_time(mdsc->fsc->sb); 1719 1720 req->r_op = op; 1721 req->r_direct_mode = mode; 1722 return req; 1723 } 1724 1725 /* 1726 * return oldest (lowest) request, tid in request tree, 0 if none. 1727 * 1728 * called under mdsc->mutex. 1729 */ 1730 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 1731 { 1732 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 1733 return NULL; 1734 return rb_entry(rb_first(&mdsc->request_tree), 1735 struct ceph_mds_request, r_node); 1736 } 1737 1738 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 1739 { 1740 return mdsc->oldest_tid; 1741 } 1742 1743 /* 1744 * Build a dentry's path. 
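 * The path is assembled in reverse, walking d_parent from the dentry
 * towards the root under rcu_read_lock() and the rename_lock seqlock;
 * if a concurrent rename is detected, the whole walk is retried.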
Allocate on heap; caller must kfree. Based 1745 * on build_path_from_dentry in fs/cifs/dir.c. 1746 * 1747 * If @stop_on_nosnap, generate path relative to the first non-snapped 1748 * inode. 1749 * 1750 * Encode hidden .snap dirs as a double /, i.e. 1751 * foo/.snap/bar -> foo//bar 1752 */ 1753 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base, 1754 int stop_on_nosnap) 1755 { 1756 struct dentry *temp; 1757 char *path; 1758 int len, pos; 1759 unsigned seq; 1760 1761 if (dentry == NULL) 1762 return ERR_PTR(-EINVAL); 1763 1764 retry: 1765 len = 0; 1766 seq = read_seqbegin(&rename_lock); 1767 rcu_read_lock(); 1768 for (temp = dentry; !IS_ROOT(temp);) { 1769 struct inode *inode = d_inode(temp); 1770 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) 1771 len++; /* slash only */ 1772 else if (stop_on_nosnap && inode && 1773 ceph_snap(inode) == CEPH_NOSNAP) 1774 break; 1775 else 1776 len += 1 + temp->d_name.len; 1777 temp = temp->d_parent; 1778 } 1779 rcu_read_unlock(); 1780 if (len) 1781 len--; /* no leading '/' */ 1782 1783 path = kmalloc(len+1, GFP_NOFS); 1784 if (path == NULL) 1785 return ERR_PTR(-ENOMEM); 1786 pos = len; 1787 path[pos] = 0; /* trailing null */ 1788 rcu_read_lock(); 1789 for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) { 1790 struct inode *inode; 1791 1792 spin_lock(&temp->d_lock); 1793 inode = d_inode(temp); 1794 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 1795 dout("build_path path+%d: %p SNAPDIR\n", 1796 pos, temp); 1797 } else if (stop_on_nosnap && inode && 1798 ceph_snap(inode) == CEPH_NOSNAP) { 1799 spin_unlock(&temp->d_lock); 1800 break; 1801 } else { 1802 pos -= temp->d_name.len; 1803 if (pos < 0) { 1804 spin_unlock(&temp->d_lock); 1805 break; 1806 } 1807 strncpy(path + pos, temp->d_name.name, 1808 temp->d_name.len); 1809 } 1810 spin_unlock(&temp->d_lock); 1811 if (pos) 1812 path[--pos] = '/'; 1813 temp = temp->d_parent; 1814 } 1815 rcu_read_unlock(); 1816 if (pos != 0 || read_seqretry(&rename_lock, seq)) { 1817 pr_err("build_path did not end path lookup where " 1818 "expected, namelen is %d, pos is %d\n", len, pos); 1819 /* presumably this is only possible if racing with a 1820 rename of one of the parent directories (we can not 1821 lock the dentries above us to prevent this, but 1822 retrying should be harmless) */ 1823 kfree(path); 1824 goto retry; 1825 } 1826 1827 *base = ceph_ino(d_inode(temp)); 1828 *plen = len; 1829 dout("build_path on %p %d built %llx '%.*s'\n", 1830 dentry, d_count(dentry), *base, len, path); 1831 return path; 1832 } 1833 1834 static int build_dentry_path(struct dentry *dentry, 1835 const char **ppath, int *ppathlen, u64 *pino, 1836 int *pfreepath) 1837 { 1838 char *path; 1839 1840 if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_NOSNAP) { 1841 *pino = ceph_ino(d_inode(dentry->d_parent)); 1842 *ppath = dentry->d_name.name; 1843 *ppathlen = dentry->d_name.len; 1844 return 0; 1845 } 1846 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 1847 if (IS_ERR(path)) 1848 return PTR_ERR(path); 1849 *ppath = path; 1850 *pfreepath = 1; 1851 return 0; 1852 } 1853 1854 static int build_inode_path(struct inode *inode, 1855 const char **ppath, int *ppathlen, u64 *pino, 1856 int *pfreepath) 1857 { 1858 struct dentry *dentry; 1859 char *path; 1860 1861 if (ceph_snap(inode) == CEPH_NOSNAP) { 1862 *pino = ceph_ino(inode); 1863 *ppathlen = 0; 1864 return 0; 1865 } 1866 dentry = d_find_alias(inode); 1867 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 1868 dput(dentry); 1869 if (IS_ERR(path)) 1870 return PTR_ERR(path); 1871 
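	/*
	 * On success the caller owns the kmalloc'ed path; *pfreepath tells
	 * create_request_message() to kfree() it once the request message
	 * has been built.
	 */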
*ppath = path; 1872 *pfreepath = 1; 1873 return 0; 1874 } 1875 1876 /* 1877 * request arguments may be specified via an inode *, a dentry *, or 1878 * an explicit ino+path. 1879 */ 1880 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, 1881 const char *rpath, u64 rino, 1882 const char **ppath, int *pathlen, 1883 u64 *ino, int *freepath) 1884 { 1885 int r = 0; 1886 1887 if (rinode) { 1888 r = build_inode_path(rinode, ppath, pathlen, ino, freepath); 1889 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 1890 ceph_snap(rinode)); 1891 } else if (rdentry) { 1892 r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath); 1893 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, 1894 *ppath); 1895 } else if (rpath || rino) { 1896 *ino = rino; 1897 *ppath = rpath; 1898 *pathlen = rpath ? strlen(rpath) : 0; 1899 dout(" path %.*s\n", *pathlen, rpath); 1900 } 1901 1902 return r; 1903 } 1904 1905 /* 1906 * called under mdsc->mutex 1907 */ 1908 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, 1909 struct ceph_mds_request *req, 1910 int mds, bool drop_cap_releases) 1911 { 1912 struct ceph_msg *msg; 1913 struct ceph_mds_request_head *head; 1914 const char *path1 = NULL; 1915 const char *path2 = NULL; 1916 u64 ino1 = 0, ino2 = 0; 1917 int pathlen1 = 0, pathlen2 = 0; 1918 int freepath1 = 0, freepath2 = 0; 1919 int len; 1920 u16 releases; 1921 void *p, *end; 1922 int ret; 1923 1924 ret = set_request_path_attr(req->r_inode, req->r_dentry, 1925 req->r_path1, req->r_ino1.ino, 1926 &path1, &pathlen1, &ino1, &freepath1); 1927 if (ret < 0) { 1928 msg = ERR_PTR(ret); 1929 goto out; 1930 } 1931 1932 ret = set_request_path_attr(NULL, req->r_old_dentry, 1933 req->r_path2, req->r_ino2.ino, 1934 &path2, &pathlen2, &ino2, &freepath2); 1935 if (ret < 0) { 1936 msg = ERR_PTR(ret); 1937 goto out_free1; 1938 } 1939 1940 len = sizeof(*head) + 1941 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + 1942 sizeof(struct ceph_timespec); 1943 1944 /* calculate (max) length for cap releases */ 1945 len += sizeof(struct ceph_mds_request_release) * 1946 (!!req->r_inode_drop + !!req->r_dentry_drop + 1947 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 1948 if (req->r_dentry_drop) 1949 len += req->r_dentry->d_name.len; 1950 if (req->r_old_dentry_drop) 1951 len += req->r_old_dentry->d_name.len; 1952 1953 msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false); 1954 if (!msg) { 1955 msg = ERR_PTR(-ENOMEM); 1956 goto out_free2; 1957 } 1958 1959 msg->hdr.version = cpu_to_le16(2); 1960 msg->hdr.tid = cpu_to_le64(req->r_tid); 1961 1962 head = msg->front.iov_base; 1963 p = msg->front.iov_base + sizeof(*head); 1964 end = msg->front.iov_base + msg->front.iov_len; 1965 1966 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 1967 head->op = cpu_to_le32(req->r_op); 1968 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid)); 1969 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid)); 1970 head->args = req->r_args; 1971 1972 ceph_encode_filepath(&p, end, ino1, path1); 1973 ceph_encode_filepath(&p, end, ino2, path2); 1974 1975 /* make note of release offset, in case we need to replay */ 1976 req->r_request_release_offset = p - msg->front.iov_base; 1977 1978 /* cap releases */ 1979 releases = 0; 1980 if (req->r_inode_drop) 1981 releases += ceph_encode_inode_release(&p, 1982 req->r_inode ? 
req->r_inode : d_inode(req->r_dentry), 1983 mds, req->r_inode_drop, req->r_inode_unless, 0); 1984 if (req->r_dentry_drop) 1985 releases += ceph_encode_dentry_release(&p, req->r_dentry, 1986 mds, req->r_dentry_drop, req->r_dentry_unless); 1987 if (req->r_old_dentry_drop) 1988 releases += ceph_encode_dentry_release(&p, req->r_old_dentry, 1989 mds, req->r_old_dentry_drop, req->r_old_dentry_unless); 1990 if (req->r_old_inode_drop) 1991 releases += ceph_encode_inode_release(&p, 1992 d_inode(req->r_old_dentry), 1993 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 1994 1995 if (drop_cap_releases) { 1996 releases = 0; 1997 p = msg->front.iov_base + req->r_request_release_offset; 1998 } 1999 2000 head->num_releases = cpu_to_le16(releases); 2001 2002 /* time stamp */ 2003 { 2004 struct ceph_timespec ts; 2005 ceph_encode_timespec(&ts, &req->r_stamp); 2006 ceph_encode_copy(&p, &ts, sizeof(ts)); 2007 } 2008 2009 BUG_ON(p > end); 2010 msg->front.iov_len = p - msg->front.iov_base; 2011 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2012 2013 if (req->r_pagelist) { 2014 struct ceph_pagelist *pagelist = req->r_pagelist; 2015 atomic_inc(&pagelist->refcnt); 2016 ceph_msg_data_add_pagelist(msg, pagelist); 2017 msg->hdr.data_len = cpu_to_le32(pagelist->length); 2018 } else { 2019 msg->hdr.data_len = 0; 2020 } 2021 2022 msg->hdr.data_off = cpu_to_le16(0); 2023 2024 out_free2: 2025 if (freepath2) 2026 kfree((char *)path2); 2027 out_free1: 2028 if (freepath1) 2029 kfree((char *)path1); 2030 out: 2031 return msg; 2032 } 2033 2034 /* 2035 * called under mdsc->mutex if error, under no mutex if 2036 * success. 2037 */ 2038 static void complete_request(struct ceph_mds_client *mdsc, 2039 struct ceph_mds_request *req) 2040 { 2041 if (req->r_callback) 2042 req->r_callback(mdsc, req); 2043 else 2044 complete_all(&req->r_completion); 2045 } 2046 2047 /* 2048 * called under mdsc->mutex 2049 */ 2050 static int __prepare_send_request(struct ceph_mds_client *mdsc, 2051 struct ceph_mds_request *req, 2052 int mds, bool drop_cap_releases) 2053 { 2054 struct ceph_mds_request_head *rhead; 2055 struct ceph_msg *msg; 2056 int flags = 0; 2057 2058 req->r_attempts++; 2059 if (req->r_inode) { 2060 struct ceph_cap *cap = 2061 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 2062 2063 if (cap) 2064 req->r_sent_on_mseq = cap->mseq; 2065 else 2066 req->r_sent_on_mseq = -1; 2067 } 2068 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, 2069 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 2070 2071 if (req->r_got_unsafe) { 2072 void *p; 2073 /* 2074 * Replay. Do not regenerate message (and rebuild 2075 * paths, etc.); just use the original message. 2076 * Rebuilding paths will break for renames because 2077 * d_move mangles the src name. 
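	 *
	 * For a replay we just re-flag the original message: set
	 * CEPH_MDS_FLAG_REPLAY, bump num_retry, point ino at the target
	 * inode (if known) so the mds can match the original operation,
	 * strip the now-unneeded cap/dentry releases, and overwrite the
	 * release area with a fresh timestamp.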
2078 */ 2079 msg = req->r_request; 2080 rhead = msg->front.iov_base; 2081 2082 flags = le32_to_cpu(rhead->flags); 2083 flags |= CEPH_MDS_FLAG_REPLAY; 2084 rhead->flags = cpu_to_le32(flags); 2085 2086 if (req->r_target_inode) 2087 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 2088 2089 rhead->num_retry = req->r_attempts - 1; 2090 2091 /* remove cap/dentry releases from message */ 2092 rhead->num_releases = 0; 2093 2094 /* time stamp */ 2095 p = msg->front.iov_base + req->r_request_release_offset; 2096 { 2097 struct ceph_timespec ts; 2098 ceph_encode_timespec(&ts, &req->r_stamp); 2099 ceph_encode_copy(&p, &ts, sizeof(ts)); 2100 } 2101 2102 msg->front.iov_len = p - msg->front.iov_base; 2103 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2104 return 0; 2105 } 2106 2107 if (req->r_request) { 2108 ceph_msg_put(req->r_request); 2109 req->r_request = NULL; 2110 } 2111 msg = create_request_message(mdsc, req, mds, drop_cap_releases); 2112 if (IS_ERR(msg)) { 2113 req->r_err = PTR_ERR(msg); 2114 return PTR_ERR(msg); 2115 } 2116 req->r_request = msg; 2117 2118 rhead = msg->front.iov_base; 2119 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 2120 if (req->r_got_unsafe) 2121 flags |= CEPH_MDS_FLAG_REPLAY; 2122 if (req->r_locked_dir) 2123 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 2124 rhead->flags = cpu_to_le32(flags); 2125 rhead->num_fwd = req->r_num_fwd; 2126 rhead->num_retry = req->r_attempts - 1; 2127 rhead->ino = 0; 2128 2129 dout(" r_locked_dir = %p\n", req->r_locked_dir); 2130 return 0; 2131 } 2132 2133 /* 2134 * send request, or put it on the appropriate wait list. 2135 */ 2136 static int __do_request(struct ceph_mds_client *mdsc, 2137 struct ceph_mds_request *req) 2138 { 2139 struct ceph_mds_session *session = NULL; 2140 int mds = -1; 2141 int err = 0; 2142 2143 if (req->r_err || req->r_got_result) { 2144 if (req->r_aborted) 2145 __unregister_request(mdsc, req); 2146 goto out; 2147 } 2148 2149 if (req->r_timeout && 2150 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 2151 dout("do_request timed out\n"); 2152 err = -EIO; 2153 goto finish; 2154 } 2155 if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 2156 dout("do_request forced umount\n"); 2157 err = -EIO; 2158 goto finish; 2159 } 2160 2161 put_request_session(req); 2162 2163 mds = __choose_mds(mdsc, req); 2164 if (mds < 0 || 2165 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 2166 dout("do_request no mds or not active, waiting for map\n"); 2167 list_add(&req->r_wait, &mdsc->waiting_for_map); 2168 goto out; 2169 } 2170 2171 /* get, open session */ 2172 session = __ceph_lookup_mds_session(mdsc, mds); 2173 if (!session) { 2174 session = register_session(mdsc, mds); 2175 if (IS_ERR(session)) { 2176 err = PTR_ERR(session); 2177 goto finish; 2178 } 2179 } 2180 req->r_session = get_session(session); 2181 2182 dout("do_request mds%d session %p state %s\n", mds, session, 2183 ceph_session_state_name(session->s_state)); 2184 if (session->s_state != CEPH_MDS_SESSION_OPEN && 2185 session->s_state != CEPH_MDS_SESSION_HUNG) { 2186 if (session->s_state == CEPH_MDS_SESSION_NEW || 2187 session->s_state == CEPH_MDS_SESSION_CLOSING) 2188 __open_session(mdsc, session); 2189 list_add(&req->r_wait, &session->s_waiting); 2190 goto out_session; 2191 } 2192 2193 /* send request */ 2194 req->r_resend_mds = -1; /* forget any previous mds hint */ 2195 2196 if (req->r_request_started == 0) /* note request start time */ 2197 req->r_request_started = jiffies; 2198 2199 err = 
__prepare_send_request(mdsc, req, mds, false);
	if (!err) {
		ceph_msg_get(req->r_request);
		ceph_con_send(&session->s_con, req->r_request);
	}

out_session:
	ceph_put_mds_session(session);
finish:
	if (err) {
		dout("__do_request early error %d\n", err);
		req->r_err = err;
		complete_request(mdsc, req);
		__unregister_request(mdsc, req);
	}
out:
	return err;
}

/*
 * called under mdsc->mutex
 */
static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head)
{
	struct ceph_mds_request *req;
	LIST_HEAD(tmp_list);

	list_splice_init(head, &tmp_list);

	while (!list_empty(&tmp_list)) {
		req = list_entry(tmp_list.next,
				 struct ceph_mds_request, r_wait);
		list_del_init(&req->r_wait);
		dout(" wake request %p tid %llu\n", req, req->r_tid);
		__do_request(mdsc, req);
	}
}

/*
 * Wake up threads with requests pending for @mds, so that they can
 * resubmit their requests to a possibly different mds.
 */
static void kick_requests(struct ceph_mds_client *mdsc, int mds)
{
	struct ceph_mds_request *req;
	struct rb_node *p = rb_first(&mdsc->request_tree);

	dout("kick_requests mds%d\n", mds);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (req->r_got_unsafe)
			continue;
		if (req->r_attempts > 0)
			continue; /* only new requests */
		if (req->r_session &&
		    req->r_session->s_mds == mds) {
			dout(" kicking tid %llu\n", req->r_tid);
			list_del_init(&req->r_wait);
			__do_request(mdsc, req);
		}
	}
}

void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
			      struct ceph_mds_request *req)
{
	dout("submit_request on %p\n", req);
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, NULL);
	__do_request(mdsc, req);
	mutex_unlock(&mdsc->mutex);
}

/*
 * Synchronously perform an mds request.  Take care of all of the
 * session setup, forwarding, and retry details.
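 *
 * As a rough illustration of the usual calling pattern (hypothetical
 * caller, error handling elided; the real callers live in dir.c,
 * file.c and friends):
 *
 *	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUP, USE_ANY_MDS);
 *	req->r_dentry = dget(dentry);
 *	req->r_num_caps = 2;
 *	err = ceph_mdsc_do_request(mdsc, dir, req);
 *	ceph_mdsc_put_request(req);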
2277 */ 2278 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 2279 struct inode *dir, 2280 struct ceph_mds_request *req) 2281 { 2282 int err; 2283 2284 dout("do_request on %p\n", req); 2285 2286 /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */ 2287 if (req->r_inode) 2288 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 2289 if (req->r_locked_dir) 2290 ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); 2291 if (req->r_old_dentry_dir) 2292 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 2293 CEPH_CAP_PIN); 2294 2295 /* deny access to directories with pool_ns layouts */ 2296 if (req->r_inode && S_ISDIR(req->r_inode->i_mode) && 2297 ceph_inode(req->r_inode)->i_pool_ns_len) 2298 return -EIO; 2299 if (req->r_locked_dir && 2300 ceph_inode(req->r_locked_dir)->i_pool_ns_len) 2301 return -EIO; 2302 2303 /* issue */ 2304 mutex_lock(&mdsc->mutex); 2305 __register_request(mdsc, req, dir); 2306 __do_request(mdsc, req); 2307 2308 if (req->r_err) { 2309 err = req->r_err; 2310 goto out; 2311 } 2312 2313 /* wait */ 2314 mutex_unlock(&mdsc->mutex); 2315 dout("do_request waiting\n"); 2316 if (!req->r_timeout && req->r_wait_for_completion) { 2317 err = req->r_wait_for_completion(mdsc, req); 2318 } else { 2319 long timeleft = wait_for_completion_killable_timeout( 2320 &req->r_completion, 2321 ceph_timeout_jiffies(req->r_timeout)); 2322 if (timeleft > 0) 2323 err = 0; 2324 else if (!timeleft) 2325 err = -EIO; /* timed out */ 2326 else 2327 err = timeleft; /* killed */ 2328 } 2329 dout("do_request waited, got %d\n", err); 2330 mutex_lock(&mdsc->mutex); 2331 2332 /* only abort if we didn't race with a real reply */ 2333 if (req->r_got_result) { 2334 err = le32_to_cpu(req->r_reply_info.head->result); 2335 } else if (err < 0) { 2336 dout("aborted request %lld with %d\n", req->r_tid, err); 2337 2338 /* 2339 * ensure we aren't running concurrently with 2340 * ceph_fill_trace or ceph_readdir_prepopulate, which 2341 * rely on locks (dir mutex) held by our caller. 2342 */ 2343 mutex_lock(&req->r_fill_mutex); 2344 req->r_err = err; 2345 req->r_aborted = true; 2346 mutex_unlock(&req->r_fill_mutex); 2347 2348 if (req->r_locked_dir && 2349 (req->r_op & CEPH_MDS_OP_WRITE)) 2350 ceph_invalidate_dir_request(req); 2351 } else { 2352 err = req->r_err; 2353 } 2354 2355 out: 2356 mutex_unlock(&mdsc->mutex); 2357 dout("do_request %p done, result %d\n", req, err); 2358 return err; 2359 } 2360 2361 /* 2362 * Invalidate dir's completeness, dentry lease state on an aborted MDS 2363 * namespace request. 2364 */ 2365 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 2366 { 2367 struct inode *inode = req->r_locked_dir; 2368 2369 dout("invalidate_dir_request %p (complete, lease(s))\n", inode); 2370 2371 ceph_dir_clear_complete(inode); 2372 if (req->r_dentry) 2373 ceph_invalidate_dentry_lease(req->r_dentry); 2374 if (req->r_old_dentry) 2375 ceph_invalidate_dentry_lease(req->r_old_dentry); 2376 } 2377 2378 /* 2379 * Handle mds reply. 2380 * 2381 * We take the session mutex and parse and process the reply immediately. 2382 * This preserves the logical ordering of replies, capabilities, etc., sent 2383 * by the MDS as they are applied to our local cache. 
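 * A request may first receive an unsafe reply and later a safe one;
 * both are matched back to the original request by tid.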
2384 */ 2385 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 2386 { 2387 struct ceph_mds_client *mdsc = session->s_mdsc; 2388 struct ceph_mds_request *req; 2389 struct ceph_mds_reply_head *head = msg->front.iov_base; 2390 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 2391 struct ceph_snap_realm *realm; 2392 u64 tid; 2393 int err, result; 2394 int mds = session->s_mds; 2395 2396 if (msg->front.iov_len < sizeof(*head)) { 2397 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 2398 ceph_msg_dump(msg); 2399 return; 2400 } 2401 2402 /* get request, session */ 2403 tid = le64_to_cpu(msg->hdr.tid); 2404 mutex_lock(&mdsc->mutex); 2405 req = lookup_get_request(mdsc, tid); 2406 if (!req) { 2407 dout("handle_reply on unknown tid %llu\n", tid); 2408 mutex_unlock(&mdsc->mutex); 2409 return; 2410 } 2411 dout("handle_reply %p\n", req); 2412 2413 /* correct session? */ 2414 if (req->r_session != session) { 2415 pr_err("mdsc_handle_reply got %llu on session mds%d" 2416 " not mds%d\n", tid, session->s_mds, 2417 req->r_session ? req->r_session->s_mds : -1); 2418 mutex_unlock(&mdsc->mutex); 2419 goto out; 2420 } 2421 2422 /* dup? */ 2423 if ((req->r_got_unsafe && !head->safe) || 2424 (req->r_got_safe && head->safe)) { 2425 pr_warn("got a dup %s reply on %llu from mds%d\n", 2426 head->safe ? "safe" : "unsafe", tid, mds); 2427 mutex_unlock(&mdsc->mutex); 2428 goto out; 2429 } 2430 if (req->r_got_safe) { 2431 pr_warn("got unsafe after safe on %llu from mds%d\n", 2432 tid, mds); 2433 mutex_unlock(&mdsc->mutex); 2434 goto out; 2435 } 2436 2437 result = le32_to_cpu(head->result); 2438 2439 /* 2440 * Handle an ESTALE 2441 * if we're not talking to the authority, send to them 2442 * if the authority has changed while we weren't looking, 2443 * send to new authority 2444 * Otherwise we just have to return an ESTALE 2445 */ 2446 if (result == -ESTALE) { 2447 dout("got ESTALE on request %llu", req->r_tid); 2448 req->r_resend_mds = -1; 2449 if (req->r_direct_mode != USE_AUTH_MDS) { 2450 dout("not using auth, setting for that now"); 2451 req->r_direct_mode = USE_AUTH_MDS; 2452 __do_request(mdsc, req); 2453 mutex_unlock(&mdsc->mutex); 2454 goto out; 2455 } else { 2456 int mds = __choose_mds(mdsc, req); 2457 if (mds >= 0 && mds != req->r_session->s_mds) { 2458 dout("but auth changed, so resending"); 2459 __do_request(mdsc, req); 2460 mutex_unlock(&mdsc->mutex); 2461 goto out; 2462 } 2463 } 2464 dout("have to return ESTALE on request %llu", req->r_tid); 2465 } 2466 2467 2468 if (head->safe) { 2469 req->r_got_safe = true; 2470 __unregister_request(mdsc, req); 2471 2472 if (req->r_got_unsafe) { 2473 /* 2474 * We already handled the unsafe response, now do the 2475 * cleanup. No need to examine the response; the MDS 2476 * doesn't include any result info in the safe 2477 * response. And even if it did, there is nothing 2478 * useful we could do with a revised return value. 2479 */ 2480 dout("got safe reply %llu, mds%d\n", tid, mds); 2481 list_del_init(&req->r_unsafe_item); 2482 2483 /* last unsafe request during umount? 
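 * if so, wake up the umount waiter blocked in wait_requests()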
*/ 2484 if (mdsc->stopping && !__get_oldest_req(mdsc)) 2485 complete_all(&mdsc->safe_umount_waiters); 2486 mutex_unlock(&mdsc->mutex); 2487 goto out; 2488 } 2489 } else { 2490 req->r_got_unsafe = true; 2491 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 2492 if (req->r_unsafe_dir) { 2493 struct ceph_inode_info *ci = 2494 ceph_inode(req->r_unsafe_dir); 2495 spin_lock(&ci->i_unsafe_lock); 2496 list_add_tail(&req->r_unsafe_dir_item, 2497 &ci->i_unsafe_dirops); 2498 spin_unlock(&ci->i_unsafe_lock); 2499 } 2500 } 2501 2502 dout("handle_reply tid %lld result %d\n", tid, result); 2503 rinfo = &req->r_reply_info; 2504 err = parse_reply_info(msg, rinfo, session->s_con.peer_features); 2505 mutex_unlock(&mdsc->mutex); 2506 2507 mutex_lock(&session->s_mutex); 2508 if (err < 0) { 2509 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 2510 ceph_msg_dump(msg); 2511 goto out_err; 2512 } 2513 2514 /* snap trace */ 2515 realm = NULL; 2516 if (rinfo->snapblob_len) { 2517 down_write(&mdsc->snap_rwsem); 2518 ceph_update_snap_trace(mdsc, rinfo->snapblob, 2519 rinfo->snapblob + rinfo->snapblob_len, 2520 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 2521 &realm); 2522 downgrade_write(&mdsc->snap_rwsem); 2523 } else { 2524 down_read(&mdsc->snap_rwsem); 2525 } 2526 2527 /* insert trace into our cache */ 2528 mutex_lock(&req->r_fill_mutex); 2529 current->journal_info = req; 2530 err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session); 2531 if (err == 0) { 2532 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 2533 req->r_op == CEPH_MDS_OP_LSSNAP)) 2534 ceph_readdir_prepopulate(req, req->r_session); 2535 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 2536 } 2537 current->journal_info = NULL; 2538 mutex_unlock(&req->r_fill_mutex); 2539 2540 up_read(&mdsc->snap_rwsem); 2541 if (realm) 2542 ceph_put_snap_realm(mdsc, realm); 2543 2544 if (err == 0 && req->r_got_unsafe && req->r_target_inode) { 2545 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 2546 spin_lock(&ci->i_unsafe_lock); 2547 list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops); 2548 spin_unlock(&ci->i_unsafe_lock); 2549 } 2550 out_err: 2551 mutex_lock(&mdsc->mutex); 2552 if (!req->r_aborted) { 2553 if (err) { 2554 req->r_err = err; 2555 } else { 2556 req->r_reply = ceph_msg_get(msg); 2557 req->r_got_result = true; 2558 } 2559 } else { 2560 dout("reply arrived after request %lld was aborted\n", tid); 2561 } 2562 mutex_unlock(&mdsc->mutex); 2563 2564 mutex_unlock(&session->s_mutex); 2565 2566 /* kick calling process */ 2567 complete_request(mdsc, req); 2568 out: 2569 ceph_mdsc_put_request(req); 2570 return; 2571 } 2572 2573 2574 2575 /* 2576 * handle mds notification that our request has been forwarded. 2577 */ 2578 static void handle_forward(struct ceph_mds_client *mdsc, 2579 struct ceph_mds_session *session, 2580 struct ceph_msg *msg) 2581 { 2582 struct ceph_mds_request *req; 2583 u64 tid = le64_to_cpu(msg->hdr.tid); 2584 u32 next_mds; 2585 u32 fwd_seq; 2586 int err = -EINVAL; 2587 void *p = msg->front.iov_base; 2588 void *end = p + msg->front.iov_len; 2589 2590 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 2591 next_mds = ceph_decode_32(&p); 2592 fwd_seq = ceph_decode_32(&p); 2593 2594 mutex_lock(&mdsc->mutex); 2595 req = lookup_get_request(mdsc, tid); 2596 if (!req) { 2597 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 2598 goto out; /* dup reply? 
*/ 2599 } 2600 2601 if (req->r_aborted) { 2602 dout("forward tid %llu aborted, unregistering\n", tid); 2603 __unregister_request(mdsc, req); 2604 } else if (fwd_seq <= req->r_num_fwd) { 2605 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 2606 tid, next_mds, req->r_num_fwd, fwd_seq); 2607 } else { 2608 /* resend. forward race not possible; mds would drop */ 2609 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 2610 BUG_ON(req->r_err); 2611 BUG_ON(req->r_got_result); 2612 req->r_attempts = 0; 2613 req->r_num_fwd = fwd_seq; 2614 req->r_resend_mds = next_mds; 2615 put_request_session(req); 2616 __do_request(mdsc, req); 2617 } 2618 ceph_mdsc_put_request(req); 2619 out: 2620 mutex_unlock(&mdsc->mutex); 2621 return; 2622 2623 bad: 2624 pr_err("mdsc_handle_forward decode error err=%d\n", err); 2625 } 2626 2627 /* 2628 * handle a mds session control message 2629 */ 2630 static void handle_session(struct ceph_mds_session *session, 2631 struct ceph_msg *msg) 2632 { 2633 struct ceph_mds_client *mdsc = session->s_mdsc; 2634 u32 op; 2635 u64 seq; 2636 int mds = session->s_mds; 2637 struct ceph_mds_session_head *h = msg->front.iov_base; 2638 int wake = 0; 2639 2640 /* decode */ 2641 if (msg->front.iov_len != sizeof(*h)) 2642 goto bad; 2643 op = le32_to_cpu(h->op); 2644 seq = le64_to_cpu(h->seq); 2645 2646 mutex_lock(&mdsc->mutex); 2647 if (op == CEPH_SESSION_CLOSE) 2648 __unregister_session(mdsc, session); 2649 /* FIXME: this ttl calculation is generous */ 2650 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 2651 mutex_unlock(&mdsc->mutex); 2652 2653 mutex_lock(&session->s_mutex); 2654 2655 dout("handle_session mds%d %s %p state %s seq %llu\n", 2656 mds, ceph_session_op_name(op), session, 2657 ceph_session_state_name(session->s_state), seq); 2658 2659 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 2660 session->s_state = CEPH_MDS_SESSION_OPEN; 2661 pr_info("mds%d came back\n", session->s_mds); 2662 } 2663 2664 switch (op) { 2665 case CEPH_SESSION_OPEN: 2666 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 2667 pr_info("mds%d reconnect success\n", session->s_mds); 2668 session->s_state = CEPH_MDS_SESSION_OPEN; 2669 renewed_caps(mdsc, session, 0); 2670 wake = 1; 2671 if (mdsc->stopping) 2672 __close_session(mdsc, session); 2673 break; 2674 2675 case CEPH_SESSION_RENEWCAPS: 2676 if (session->s_renew_seq == seq) 2677 renewed_caps(mdsc, session, 1); 2678 break; 2679 2680 case CEPH_SESSION_CLOSE: 2681 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 2682 pr_info("mds%d reconnect denied\n", session->s_mds); 2683 cleanup_session_requests(mdsc, session); 2684 remove_session_caps(session); 2685 wake = 2; /* for good measure */ 2686 wake_up_all(&mdsc->session_close_wq); 2687 break; 2688 2689 case CEPH_SESSION_STALE: 2690 pr_info("mds%d caps went stale, renewing\n", 2691 session->s_mds); 2692 spin_lock(&session->s_gen_ttl_lock); 2693 session->s_cap_gen++; 2694 session->s_cap_ttl = jiffies - 1; 2695 spin_unlock(&session->s_gen_ttl_lock); 2696 send_renew_caps(mdsc, session); 2697 break; 2698 2699 case CEPH_SESSION_RECALL_STATE: 2700 trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 2701 break; 2702 2703 case CEPH_SESSION_FLUSHMSG: 2704 send_flushmsg_ack(mdsc, session, seq); 2705 break; 2706 2707 case CEPH_SESSION_FORCE_RO: 2708 dout("force_session_readonly %p\n", session); 2709 spin_lock(&session->s_cap_lock); 2710 session->s_readonly = true; 2711 spin_unlock(&session->s_cap_lock); 2712 wake_up_session_caps(session, 0); 2713 break; 2714 2715 default: 2716 
pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 2717 WARN_ON(1); 2718 } 2719 2720 mutex_unlock(&session->s_mutex); 2721 if (wake) { 2722 mutex_lock(&mdsc->mutex); 2723 __wake_requests(mdsc, &session->s_waiting); 2724 if (wake == 2) 2725 kick_requests(mdsc, mds); 2726 mutex_unlock(&mdsc->mutex); 2727 } 2728 return; 2729 2730 bad: 2731 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 2732 (int)msg->front.iov_len); 2733 ceph_msg_dump(msg); 2734 return; 2735 } 2736 2737 2738 /* 2739 * called under session->mutex. 2740 */ 2741 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 2742 struct ceph_mds_session *session) 2743 { 2744 struct ceph_mds_request *req, *nreq; 2745 struct rb_node *p; 2746 int err; 2747 2748 dout("replay_unsafe_requests mds%d\n", session->s_mds); 2749 2750 mutex_lock(&mdsc->mutex); 2751 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) { 2752 err = __prepare_send_request(mdsc, req, session->s_mds, true); 2753 if (!err) { 2754 ceph_msg_get(req->r_request); 2755 ceph_con_send(&session->s_con, req->r_request); 2756 } 2757 } 2758 2759 /* 2760 * also re-send old requests when MDS enters reconnect stage. So that MDS 2761 * can process completed request in clientreplay stage. 2762 */ 2763 p = rb_first(&mdsc->request_tree); 2764 while (p) { 2765 req = rb_entry(p, struct ceph_mds_request, r_node); 2766 p = rb_next(p); 2767 if (req->r_got_unsafe) 2768 continue; 2769 if (req->r_attempts == 0) 2770 continue; /* only old requests */ 2771 if (req->r_session && 2772 req->r_session->s_mds == session->s_mds) { 2773 err = __prepare_send_request(mdsc, req, 2774 session->s_mds, true); 2775 if (!err) { 2776 ceph_msg_get(req->r_request); 2777 ceph_con_send(&session->s_con, req->r_request); 2778 } 2779 } 2780 } 2781 mutex_unlock(&mdsc->mutex); 2782 } 2783 2784 /* 2785 * Encode information about a cap for a reconnect with the MDS. 
2786 */ 2787 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, 2788 void *arg) 2789 { 2790 union { 2791 struct ceph_mds_cap_reconnect v2; 2792 struct ceph_mds_cap_reconnect_v1 v1; 2793 } rec; 2794 size_t reclen; 2795 struct ceph_inode_info *ci; 2796 struct ceph_reconnect_state *recon_state = arg; 2797 struct ceph_pagelist *pagelist = recon_state->pagelist; 2798 char *path; 2799 int pathlen, err; 2800 u64 pathbase; 2801 struct dentry *dentry; 2802 2803 ci = cap->ci; 2804 2805 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 2806 inode, ceph_vinop(inode), cap, cap->cap_id, 2807 ceph_cap_string(cap->issued)); 2808 err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 2809 if (err) 2810 return err; 2811 2812 dentry = d_find_alias(inode); 2813 if (dentry) { 2814 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0); 2815 if (IS_ERR(path)) { 2816 err = PTR_ERR(path); 2817 goto out_dput; 2818 } 2819 } else { 2820 path = NULL; 2821 pathlen = 0; 2822 } 2823 err = ceph_pagelist_encode_string(pagelist, path, pathlen); 2824 if (err) 2825 goto out_free; 2826 2827 spin_lock(&ci->i_ceph_lock); 2828 cap->seq = 0; /* reset cap seq */ 2829 cap->issue_seq = 0; /* and issue_seq */ 2830 cap->mseq = 0; /* and migrate_seq */ 2831 cap->cap_gen = cap->session->s_cap_gen; 2832 2833 if (recon_state->flock) { 2834 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 2835 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 2836 rec.v2.issued = cpu_to_le32(cap->issued); 2837 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 2838 rec.v2.pathbase = cpu_to_le64(pathbase); 2839 rec.v2.flock_len = 0; 2840 reclen = sizeof(rec.v2); 2841 } else { 2842 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 2843 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 2844 rec.v1.issued = cpu_to_le32(cap->issued); 2845 rec.v1.size = cpu_to_le64(inode->i_size); 2846 ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime); 2847 ceph_encode_timespec(&rec.v1.atime, &inode->i_atime); 2848 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 2849 rec.v1.pathbase = cpu_to_le64(pathbase); 2850 reclen = sizeof(rec.v1); 2851 } 2852 spin_unlock(&ci->i_ceph_lock); 2853 2854 if (recon_state->flock) { 2855 int num_fcntl_locks, num_flock_locks; 2856 struct ceph_filelock *flocks; 2857 2858 encode_again: 2859 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 2860 flocks = kmalloc((num_fcntl_locks+num_flock_locks) * 2861 sizeof(struct ceph_filelock), GFP_NOFS); 2862 if (!flocks) { 2863 err = -ENOMEM; 2864 goto out_free; 2865 } 2866 err = ceph_encode_locks_to_buffer(inode, flocks, 2867 num_fcntl_locks, 2868 num_flock_locks); 2869 if (err) { 2870 kfree(flocks); 2871 if (err == -ENOSPC) 2872 goto encode_again; 2873 goto out_free; 2874 } 2875 /* 2876 * number of encoded locks is stable, so copy to pagelist 2877 */ 2878 rec.v2.flock_len = cpu_to_le32(2*sizeof(u32) + 2879 (num_fcntl_locks+num_flock_locks) * 2880 sizeof(struct ceph_filelock)); 2881 err = ceph_pagelist_append(pagelist, &rec, reclen); 2882 if (!err) 2883 err = ceph_locks_to_pagelist(flocks, pagelist, 2884 num_fcntl_locks, 2885 num_flock_locks); 2886 kfree(flocks); 2887 } else { 2888 err = ceph_pagelist_append(pagelist, &rec, reclen); 2889 } 2890 2891 recon_state->nr_caps++; 2892 out_free: 2893 kfree(path); 2894 out_dput: 2895 dput(dentry); 2896 return err; 2897 } 2898 2899 2900 /* 2901 * If an MDS fails and recovers, clients need to reconnect in order to 2902 * reestablish shared state. 
This includes all caps issued through
 * this session _and_ the snap_realm hierarchy.  Because it's not
 * clear which snap realms the mds cares about, we send everything we
 * know about; that ensures we'll then get any new info the
 * recovering MDS might have.
 *
 * This is a relatively heavyweight operation, but it's rare.
 *
 * called with mdsc->mutex held.
 */
static void send_mds_reconnect(struct ceph_mds_client *mdsc,
			       struct ceph_mds_session *session)
{
	struct ceph_msg *reply;
	struct rb_node *p;
	int mds = session->s_mds;
	int err = -ENOMEM;
	int s_nr_caps;
	struct ceph_pagelist *pagelist;
	struct ceph_reconnect_state recon_state;

	pr_info("mds%d reconnect start\n", mds);

	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
	if (!pagelist)
		goto fail_nopagelist;
	ceph_pagelist_init(pagelist);

	reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
	if (!reply)
		goto fail_nomsg;

	mutex_lock(&session->s_mutex);
	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
	session->s_seq = 0;

	dout("session %p state %s\n", session,
	     ceph_session_state_name(session->s_state));

	spin_lock(&session->s_gen_ttl_lock);
	session->s_cap_gen++;
	spin_unlock(&session->s_gen_ttl_lock);

	spin_lock(&session->s_cap_lock);
	/* don't know if session is readonly */
	session->s_readonly = 0;
	/*
	 * notify __ceph_remove_cap() that we are composing cap reconnect.
	 * If a cap gets released before being added to the cap reconnect,
	 * __ceph_remove_cap() should skip queuing the cap release.
	 */
	session->s_cap_reconnect = 1;
	/* drop old cap expires; we're about to reestablish that state */
	cleanup_cap_releases(mdsc, session);

	/* trim unused caps to reduce MDS's cache rejoin time */
	if (mdsc->fsc->sb->s_root)
		shrink_dcache_parent(mdsc->fsc->sb->s_root);

	ceph_con_close(&session->s_con);
	ceph_con_open(&session->s_con,
		      CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	/* replay unsafe requests */
	replay_unsafe_requests(mdsc, session);

	down_read(&mdsc->snap_rwsem);

	/* traverse this session's caps */
	s_nr_caps = session->s_nr_caps;
	err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
	if (err)
		goto fail;

	recon_state.nr_caps = 0;
	recon_state.pagelist = pagelist;
	recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
	err = iterate_session_caps(session, encode_caps_cb, &recon_state);
	if (err < 0)
		goto fail;

	spin_lock(&session->s_cap_lock);
	session->s_cap_reconnect = 0;
	spin_unlock(&session->s_cap_lock);

	/*
	 * snaprealms.  We provide the mds with the ino, seq (version), and
	 * parent for all of our realms.  If the mds has any newer info,
	 * it will tell us.
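 * Each realm is packed as a struct ceph_mds_snaprealm_reconnect
 * (ino, seq, parent) and appended to the same pagelist as the caps.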
2992 */ 2993 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 2994 struct ceph_snap_realm *realm = 2995 rb_entry(p, struct ceph_snap_realm, node); 2996 struct ceph_mds_snaprealm_reconnect sr_rec; 2997 2998 dout(" adding snap realm %llx seq %lld parent %llx\n", 2999 realm->ino, realm->seq, realm->parent_ino); 3000 sr_rec.ino = cpu_to_le64(realm->ino); 3001 sr_rec.seq = cpu_to_le64(realm->seq); 3002 sr_rec.parent = cpu_to_le64(realm->parent_ino); 3003 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 3004 if (err) 3005 goto fail; 3006 } 3007 3008 if (recon_state.flock) 3009 reply->hdr.version = cpu_to_le16(2); 3010 3011 /* raced with cap release? */ 3012 if (s_nr_caps != recon_state.nr_caps) { 3013 struct page *page = list_first_entry(&pagelist->head, 3014 struct page, lru); 3015 __le32 *addr = kmap_atomic(page); 3016 *addr = cpu_to_le32(recon_state.nr_caps); 3017 kunmap_atomic(addr); 3018 } 3019 3020 reply->hdr.data_len = cpu_to_le32(pagelist->length); 3021 ceph_msg_data_add_pagelist(reply, pagelist); 3022 3023 ceph_early_kick_flushing_caps(mdsc, session); 3024 3025 ceph_con_send(&session->s_con, reply); 3026 3027 mutex_unlock(&session->s_mutex); 3028 3029 mutex_lock(&mdsc->mutex); 3030 __wake_requests(mdsc, &session->s_waiting); 3031 mutex_unlock(&mdsc->mutex); 3032 3033 up_read(&mdsc->snap_rwsem); 3034 return; 3035 3036 fail: 3037 ceph_msg_put(reply); 3038 up_read(&mdsc->snap_rwsem); 3039 mutex_unlock(&session->s_mutex); 3040 fail_nomsg: 3041 ceph_pagelist_release(pagelist); 3042 fail_nopagelist: 3043 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 3044 return; 3045 } 3046 3047 3048 /* 3049 * compare old and new mdsmaps, kicking requests 3050 * and closing out old connections as necessary 3051 * 3052 * called under mdsc->mutex. 3053 */ 3054 static void check_new_map(struct ceph_mds_client *mdsc, 3055 struct ceph_mdsmap *newmap, 3056 struct ceph_mdsmap *oldmap) 3057 { 3058 int i; 3059 int oldstate, newstate; 3060 struct ceph_mds_session *s; 3061 3062 dout("check_new_map new %u old %u\n", 3063 newmap->m_epoch, oldmap->m_epoch); 3064 3065 for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) { 3066 if (mdsc->sessions[i] == NULL) 3067 continue; 3068 s = mdsc->sessions[i]; 3069 oldstate = ceph_mdsmap_get_state(oldmap, i); 3070 newstate = ceph_mdsmap_get_state(newmap, i); 3071 3072 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 3073 i, ceph_mds_state_name(oldstate), 3074 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 3075 ceph_mds_state_name(newstate), 3076 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 3077 ceph_session_state_name(s->s_state)); 3078 3079 if (i >= newmap->m_max_mds || 3080 memcmp(ceph_mdsmap_get_addr(oldmap, i), 3081 ceph_mdsmap_get_addr(newmap, i), 3082 sizeof(struct ceph_entity_addr))) { 3083 if (s->s_state == CEPH_MDS_SESSION_OPENING) { 3084 /* the session never opened, just close it 3085 * out now */ 3086 __wake_requests(mdsc, &s->s_waiting); 3087 __unregister_session(mdsc, s); 3088 } else { 3089 /* just close it */ 3090 mutex_unlock(&mdsc->mutex); 3091 mutex_lock(&s->s_mutex); 3092 mutex_lock(&mdsc->mutex); 3093 ceph_con_close(&s->s_con); 3094 mutex_unlock(&s->s_mutex); 3095 s->s_state = CEPH_MDS_SESSION_RESTARTING; 3096 } 3097 } else if (oldstate == newstate) { 3098 continue; /* nothing new with this mds */ 3099 } 3100 3101 /* 3102 * send reconnect? 
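 * (only if our session was restarting and the mds has reached at
 * least the reconnect state)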
3103 */ 3104 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 3105 newstate >= CEPH_MDS_STATE_RECONNECT) { 3106 mutex_unlock(&mdsc->mutex); 3107 send_mds_reconnect(mdsc, s); 3108 mutex_lock(&mdsc->mutex); 3109 } 3110 3111 /* 3112 * kick request on any mds that has gone active. 3113 */ 3114 if (oldstate < CEPH_MDS_STATE_ACTIVE && 3115 newstate >= CEPH_MDS_STATE_ACTIVE) { 3116 if (oldstate != CEPH_MDS_STATE_CREATING && 3117 oldstate != CEPH_MDS_STATE_STARTING) 3118 pr_info("mds%d recovery completed\n", s->s_mds); 3119 kick_requests(mdsc, i); 3120 ceph_kick_flushing_caps(mdsc, s); 3121 wake_up_session_caps(s, 1); 3122 } 3123 } 3124 3125 for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) { 3126 s = mdsc->sessions[i]; 3127 if (!s) 3128 continue; 3129 if (!ceph_mdsmap_is_laggy(newmap, i)) 3130 continue; 3131 if (s->s_state == CEPH_MDS_SESSION_OPEN || 3132 s->s_state == CEPH_MDS_SESSION_HUNG || 3133 s->s_state == CEPH_MDS_SESSION_CLOSING) { 3134 dout(" connecting to export targets of laggy mds%d\n", 3135 i); 3136 __open_export_target_sessions(mdsc, s); 3137 } 3138 } 3139 } 3140 3141 3142 3143 /* 3144 * leases 3145 */ 3146 3147 /* 3148 * caller must hold session s_mutex, dentry->d_lock 3149 */ 3150 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 3151 { 3152 struct ceph_dentry_info *di = ceph_dentry(dentry); 3153 3154 ceph_put_mds_session(di->lease_session); 3155 di->lease_session = NULL; 3156 } 3157 3158 static void handle_lease(struct ceph_mds_client *mdsc, 3159 struct ceph_mds_session *session, 3160 struct ceph_msg *msg) 3161 { 3162 struct super_block *sb = mdsc->fsc->sb; 3163 struct inode *inode; 3164 struct dentry *parent, *dentry; 3165 struct ceph_dentry_info *di; 3166 int mds = session->s_mds; 3167 struct ceph_mds_lease *h = msg->front.iov_base; 3168 u32 seq; 3169 struct ceph_vino vino; 3170 struct qstr dname; 3171 int release = 0; 3172 3173 dout("handle_lease from mds%d\n", mds); 3174 3175 /* decode */ 3176 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 3177 goto bad; 3178 vino.ino = le64_to_cpu(h->ino); 3179 vino.snap = CEPH_NOSNAP; 3180 seq = le32_to_cpu(h->seq); 3181 dname.name = (void *)h + sizeof(*h) + sizeof(u32); 3182 dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32); 3183 if (dname.len != get_unaligned_le32(h+1)) 3184 goto bad; 3185 3186 /* lookup inode */ 3187 inode = ceph_find_inode(sb, vino); 3188 dout("handle_lease %s, ino %llx %p %.*s\n", 3189 ceph_lease_op_name(h->action), vino.ino, inode, 3190 dname.len, dname.name); 3191 3192 mutex_lock(&session->s_mutex); 3193 session->s_seq++; 3194 3195 if (inode == NULL) { 3196 dout("handle_lease no inode %llx\n", vino.ino); 3197 goto release; 3198 } 3199 3200 /* dentry */ 3201 parent = d_find_alias(inode); 3202 if (!parent) { 3203 dout("no parent dentry on inode %p\n", inode); 3204 WARN_ON(1); 3205 goto release; /* hrm... 
*/ 3206 } 3207 dname.hash = full_name_hash(dname.name, dname.len); 3208 dentry = d_lookup(parent, &dname); 3209 dput(parent); 3210 if (!dentry) 3211 goto release; 3212 3213 spin_lock(&dentry->d_lock); 3214 di = ceph_dentry(dentry); 3215 switch (h->action) { 3216 case CEPH_MDS_LEASE_REVOKE: 3217 if (di->lease_session == session) { 3218 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 3219 h->seq = cpu_to_le32(di->lease_seq); 3220 __ceph_mdsc_drop_dentry_lease(dentry); 3221 } 3222 release = 1; 3223 break; 3224 3225 case CEPH_MDS_LEASE_RENEW: 3226 if (di->lease_session == session && 3227 di->lease_gen == session->s_cap_gen && 3228 di->lease_renew_from && 3229 di->lease_renew_after == 0) { 3230 unsigned long duration = 3231 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 3232 3233 di->lease_seq = seq; 3234 dentry->d_time = di->lease_renew_from + duration; 3235 di->lease_renew_after = di->lease_renew_from + 3236 (duration >> 1); 3237 di->lease_renew_from = 0; 3238 } 3239 break; 3240 } 3241 spin_unlock(&dentry->d_lock); 3242 dput(dentry); 3243 3244 if (!release) 3245 goto out; 3246 3247 release: 3248 /* let's just reuse the same message */ 3249 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 3250 ceph_msg_get(msg); 3251 ceph_con_send(&session->s_con, msg); 3252 3253 out: 3254 iput(inode); 3255 mutex_unlock(&session->s_mutex); 3256 return; 3257 3258 bad: 3259 pr_err("corrupt lease message\n"); 3260 ceph_msg_dump(msg); 3261 } 3262 3263 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 3264 struct inode *inode, 3265 struct dentry *dentry, char action, 3266 u32 seq) 3267 { 3268 struct ceph_msg *msg; 3269 struct ceph_mds_lease *lease; 3270 int len = sizeof(*lease) + sizeof(u32); 3271 int dnamelen = 0; 3272 3273 dout("lease_send_msg inode %p dentry %p %s to mds%d\n", 3274 inode, dentry, ceph_lease_op_name(action), session->s_mds); 3275 dnamelen = dentry->d_name.len; 3276 len += dnamelen; 3277 3278 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 3279 if (!msg) 3280 return; 3281 lease = msg->front.iov_base; 3282 lease->action = action; 3283 lease->ino = cpu_to_le64(ceph_vino(inode).ino); 3284 lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap); 3285 lease->seq = cpu_to_le32(seq); 3286 put_unaligned_le32(dnamelen, lease + 1); 3287 memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen); 3288 3289 /* 3290 * if this is a preemptive lease RELEASE, no need to 3291 * flush request stream, since the actual request will 3292 * soon follow. 3293 */ 3294 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE); 3295 3296 ceph_con_send(&session->s_con, msg); 3297 } 3298 3299 /* 3300 * Preemptively release a lease we expect to invalidate anyway. 3301 * Pass @inode always, @dentry is optional. 3302 */ 3303 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, 3304 struct dentry *dentry) 3305 { 3306 struct ceph_dentry_info *di; 3307 struct ceph_mds_session *session; 3308 u32 seq; 3309 3310 BUG_ON(inode == NULL); 3311 BUG_ON(dentry == NULL); 3312 3313 /* is dentry lease valid? 
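 * (the lease session, generation and expiry must all still check out)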
*/ 3314 spin_lock(&dentry->d_lock); 3315 di = ceph_dentry(dentry); 3316 if (!di || !di->lease_session || 3317 di->lease_session->s_mds < 0 || 3318 di->lease_gen != di->lease_session->s_cap_gen || 3319 !time_before(jiffies, dentry->d_time)) { 3320 dout("lease_release inode %p dentry %p -- " 3321 "no lease\n", 3322 inode, dentry); 3323 spin_unlock(&dentry->d_lock); 3324 return; 3325 } 3326 3327 /* we do have a lease on this dentry; note mds and seq */ 3328 session = ceph_get_mds_session(di->lease_session); 3329 seq = di->lease_seq; 3330 __ceph_mdsc_drop_dentry_lease(dentry); 3331 spin_unlock(&dentry->d_lock); 3332 3333 dout("lease_release inode %p dentry %p to mds%d\n", 3334 inode, dentry, session->s_mds); 3335 ceph_mdsc_lease_send_msg(session, inode, dentry, 3336 CEPH_MDS_LEASE_RELEASE, seq); 3337 ceph_put_mds_session(session); 3338 } 3339 3340 /* 3341 * drop all leases (and dentry refs) in preparation for umount 3342 */ 3343 static void drop_leases(struct ceph_mds_client *mdsc) 3344 { 3345 int i; 3346 3347 dout("drop_leases\n"); 3348 mutex_lock(&mdsc->mutex); 3349 for (i = 0; i < mdsc->max_sessions; i++) { 3350 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 3351 if (!s) 3352 continue; 3353 mutex_unlock(&mdsc->mutex); 3354 mutex_lock(&s->s_mutex); 3355 mutex_unlock(&s->s_mutex); 3356 ceph_put_mds_session(s); 3357 mutex_lock(&mdsc->mutex); 3358 } 3359 mutex_unlock(&mdsc->mutex); 3360 } 3361 3362 3363 3364 /* 3365 * delayed work -- periodically trim expired leases, renew caps with mds 3366 */ 3367 static void schedule_delayed(struct ceph_mds_client *mdsc) 3368 { 3369 int delay = 5; 3370 unsigned hz = round_jiffies_relative(HZ * delay); 3371 schedule_delayed_work(&mdsc->delayed_work, hz); 3372 } 3373 3374 static void delayed_work(struct work_struct *work) 3375 { 3376 int i; 3377 struct ceph_mds_client *mdsc = 3378 container_of(work, struct ceph_mds_client, delayed_work.work); 3379 int renew_interval; 3380 int renew_caps; 3381 3382 dout("mdsc delayed_work\n"); 3383 ceph_check_delayed_caps(mdsc); 3384 3385 mutex_lock(&mdsc->mutex); 3386 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 3387 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 3388 mdsc->last_renew_caps); 3389 if (renew_caps) 3390 mdsc->last_renew_caps = jiffies; 3391 3392 for (i = 0; i < mdsc->max_sessions; i++) { 3393 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 3394 if (s == NULL) 3395 continue; 3396 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 3397 dout("resending session close request for mds%d\n", 3398 s->s_mds); 3399 request_close_session(mdsc, s); 3400 ceph_put_mds_session(s); 3401 continue; 3402 } 3403 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 3404 if (s->s_state == CEPH_MDS_SESSION_OPEN) { 3405 s->s_state = CEPH_MDS_SESSION_HUNG; 3406 pr_info("mds%d hung\n", s->s_mds); 3407 } 3408 } 3409 if (s->s_state < CEPH_MDS_SESSION_OPEN) { 3410 /* this mds is failed or recovering, just wait */ 3411 ceph_put_mds_session(s); 3412 continue; 3413 } 3414 mutex_unlock(&mdsc->mutex); 3415 3416 mutex_lock(&s->s_mutex); 3417 if (renew_caps) 3418 send_renew_caps(mdsc, s); 3419 else 3420 ceph_con_keepalive(&s->s_con); 3421 if (s->s_state == CEPH_MDS_SESSION_OPEN || 3422 s->s_state == CEPH_MDS_SESSION_HUNG) 3423 ceph_send_cap_releases(mdsc, s); 3424 mutex_unlock(&s->s_mutex); 3425 ceph_put_mds_session(s); 3426 3427 mutex_lock(&mdsc->mutex); 3428 } 3429 mutex_unlock(&mdsc->mutex); 3430 3431 schedule_delayed(mdsc); 3432 } 3433 3434 int ceph_mdsc_init(struct ceph_fs_client *fsc) 3435 3436 { 
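	/*
	 * Allocate the client and an empty mdsmap placeholder up front;
	 * the real map is installed later by ceph_mdsc_handle_map().
	 */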
3437 struct ceph_mds_client *mdsc; 3438 3439 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 3440 if (!mdsc) 3441 return -ENOMEM; 3442 mdsc->fsc = fsc; 3443 fsc->mdsc = mdsc; 3444 mutex_init(&mdsc->mutex); 3445 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 3446 if (mdsc->mdsmap == NULL) { 3447 kfree(mdsc); 3448 return -ENOMEM; 3449 } 3450 3451 init_completion(&mdsc->safe_umount_waiters); 3452 init_waitqueue_head(&mdsc->session_close_wq); 3453 INIT_LIST_HEAD(&mdsc->waiting_for_map); 3454 mdsc->sessions = NULL; 3455 atomic_set(&mdsc->num_sessions, 0); 3456 mdsc->max_sessions = 0; 3457 mdsc->stopping = 0; 3458 mdsc->last_snap_seq = 0; 3459 init_rwsem(&mdsc->snap_rwsem); 3460 mdsc->snap_realms = RB_ROOT; 3461 INIT_LIST_HEAD(&mdsc->snap_empty); 3462 spin_lock_init(&mdsc->snap_empty_lock); 3463 mdsc->last_tid = 0; 3464 mdsc->oldest_tid = 0; 3465 mdsc->request_tree = RB_ROOT; 3466 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 3467 mdsc->last_renew_caps = jiffies; 3468 INIT_LIST_HEAD(&mdsc->cap_delay_list); 3469 spin_lock_init(&mdsc->cap_delay_lock); 3470 INIT_LIST_HEAD(&mdsc->snap_flush_list); 3471 spin_lock_init(&mdsc->snap_flush_lock); 3472 mdsc->last_cap_flush_tid = 1; 3473 mdsc->cap_flush_tree = RB_ROOT; 3474 INIT_LIST_HEAD(&mdsc->cap_dirty); 3475 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 3476 mdsc->num_cap_flushing = 0; 3477 spin_lock_init(&mdsc->cap_dirty_lock); 3478 init_waitqueue_head(&mdsc->cap_flushing_wq); 3479 spin_lock_init(&mdsc->dentry_lru_lock); 3480 INIT_LIST_HEAD(&mdsc->dentry_lru); 3481 3482 ceph_caps_init(mdsc); 3483 ceph_adjust_min_caps(mdsc, fsc->min_caps); 3484 3485 init_rwsem(&mdsc->pool_perm_rwsem); 3486 mdsc->pool_perm_tree = RB_ROOT; 3487 3488 return 0; 3489 } 3490 3491 /* 3492 * Wait for safe replies on open mds requests. If we time out, drop 3493 * all requests from the tree to avoid dangling dentry refs. 3494 */ 3495 static void wait_requests(struct ceph_mds_client *mdsc) 3496 { 3497 struct ceph_options *opts = mdsc->fsc->client->options; 3498 struct ceph_mds_request *req; 3499 3500 mutex_lock(&mdsc->mutex); 3501 if (__get_oldest_req(mdsc)) { 3502 mutex_unlock(&mdsc->mutex); 3503 3504 dout("wait_requests waiting for requests\n"); 3505 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 3506 ceph_timeout_jiffies(opts->mount_timeout)); 3507 3508 /* tear down remaining requests */ 3509 mutex_lock(&mdsc->mutex); 3510 while ((req = __get_oldest_req(mdsc))) { 3511 dout("wait_requests timed out on tid %llu\n", 3512 req->r_tid); 3513 __unregister_request(mdsc, req); 3514 } 3515 } 3516 mutex_unlock(&mdsc->mutex); 3517 dout("wait_requests done\n"); 3518 } 3519 3520 /* 3521 * called before mount is ro, and before dentries are torn down. 3522 * (hmm, does this still race with new lookups?) 3523 */ 3524 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 3525 { 3526 dout("pre_umount\n"); 3527 mdsc->stopping = 1; 3528 3529 drop_leases(mdsc); 3530 ceph_flush_dirty_caps(mdsc); 3531 wait_requests(mdsc); 3532 3533 /* 3534 * wait for reply handlers to drop their request refs and 3535 * their inode/dcache refs 3536 */ 3537 ceph_msgr_flush(); 3538 } 3539 3540 /* 3541 * wait for all write mds requests to flush. 
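 * That is, wait for the safe reply on every write request with a tid
 * at or below @want_tid.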
3542 */ 3543 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) 3544 { 3545 struct ceph_mds_request *req = NULL, *nextreq; 3546 struct rb_node *n; 3547 3548 mutex_lock(&mdsc->mutex); 3549 dout("wait_unsafe_requests want %lld\n", want_tid); 3550 restart: 3551 req = __get_oldest_req(mdsc); 3552 while (req && req->r_tid <= want_tid) { 3553 /* find next request */ 3554 n = rb_next(&req->r_node); 3555 if (n) 3556 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 3557 else 3558 nextreq = NULL; 3559 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 3560 (req->r_op & CEPH_MDS_OP_WRITE)) { 3561 /* write op */ 3562 ceph_mdsc_get_request(req); 3563 if (nextreq) 3564 ceph_mdsc_get_request(nextreq); 3565 mutex_unlock(&mdsc->mutex); 3566 dout("wait_unsafe_requests wait on %llu (want %llu)\n", 3567 req->r_tid, want_tid); 3568 wait_for_completion(&req->r_safe_completion); 3569 mutex_lock(&mdsc->mutex); 3570 ceph_mdsc_put_request(req); 3571 if (!nextreq) 3572 break; /* next dne before, so we're done! */ 3573 if (RB_EMPTY_NODE(&nextreq->r_node)) { 3574 /* next request was removed from tree */ 3575 ceph_mdsc_put_request(nextreq); 3576 goto restart; 3577 } 3578 ceph_mdsc_put_request(nextreq); /* won't go away */ 3579 } 3580 req = nextreq; 3581 } 3582 mutex_unlock(&mdsc->mutex); 3583 dout("wait_unsafe_requests done\n"); 3584 } 3585 3586 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 3587 { 3588 u64 want_tid, want_flush, want_snap; 3589 3590 if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 3591 return; 3592 3593 dout("sync\n"); 3594 mutex_lock(&mdsc->mutex); 3595 want_tid = mdsc->last_tid; 3596 mutex_unlock(&mdsc->mutex); 3597 3598 ceph_flush_dirty_caps(mdsc); 3599 spin_lock(&mdsc->cap_dirty_lock); 3600 want_flush = mdsc->last_cap_flush_tid; 3601 spin_unlock(&mdsc->cap_dirty_lock); 3602 3603 down_read(&mdsc->snap_rwsem); 3604 want_snap = mdsc->last_snap_seq; 3605 up_read(&mdsc->snap_rwsem); 3606 3607 dout("sync want tid %lld flush_seq %lld snap_seq %lld\n", 3608 want_tid, want_flush, want_snap); 3609 3610 wait_unsafe_requests(mdsc, want_tid); 3611 wait_caps_flush(mdsc, want_flush, want_snap); 3612 } 3613 3614 /* 3615 * true if all sessions are closed, or we force unmount 3616 */ 3617 static bool done_closing_sessions(struct ceph_mds_client *mdsc) 3618 { 3619 if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 3620 return true; 3621 return atomic_read(&mdsc->num_sessions) == 0; 3622 } 3623 3624 /* 3625 * called after sb is ro. 
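 * Ask each open session to close, wait (bounded by the mount timeout)
 * for them to go away, then tear down whatever remains.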
3626 */ 3627 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 3628 { 3629 struct ceph_options *opts = mdsc->fsc->client->options; 3630 struct ceph_mds_session *session; 3631 int i; 3632 3633 dout("close_sessions\n"); 3634 3635 /* close sessions */ 3636 mutex_lock(&mdsc->mutex); 3637 for (i = 0; i < mdsc->max_sessions; i++) { 3638 session = __ceph_lookup_mds_session(mdsc, i); 3639 if (!session) 3640 continue; 3641 mutex_unlock(&mdsc->mutex); 3642 mutex_lock(&session->s_mutex); 3643 __close_session(mdsc, session); 3644 mutex_unlock(&session->s_mutex); 3645 ceph_put_mds_session(session); 3646 mutex_lock(&mdsc->mutex); 3647 } 3648 mutex_unlock(&mdsc->mutex); 3649 3650 dout("waiting for sessions to close\n"); 3651 wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc), 3652 ceph_timeout_jiffies(opts->mount_timeout)); 3653 3654 /* tear down remaining sessions */ 3655 mutex_lock(&mdsc->mutex); 3656 for (i = 0; i < mdsc->max_sessions; i++) { 3657 if (mdsc->sessions[i]) { 3658 session = get_session(mdsc->sessions[i]); 3659 __unregister_session(mdsc, session); 3660 mutex_unlock(&mdsc->mutex); 3661 mutex_lock(&session->s_mutex); 3662 remove_session_caps(session); 3663 mutex_unlock(&session->s_mutex); 3664 ceph_put_mds_session(session); 3665 mutex_lock(&mdsc->mutex); 3666 } 3667 } 3668 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 3669 mutex_unlock(&mdsc->mutex); 3670 3671 ceph_cleanup_empty_realms(mdsc); 3672 3673 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 3674 3675 dout("stopped\n"); 3676 } 3677 3678 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 3679 { 3680 struct ceph_mds_session *session; 3681 int mds; 3682 3683 dout("force umount\n"); 3684 3685 mutex_lock(&mdsc->mutex); 3686 for (mds = 0; mds < mdsc->max_sessions; mds++) { 3687 session = __ceph_lookup_mds_session(mdsc, mds); 3688 if (!session) 3689 continue; 3690 mutex_unlock(&mdsc->mutex); 3691 mutex_lock(&session->s_mutex); 3692 __close_session(mdsc, session); 3693 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 3694 cleanup_session_requests(mdsc, session); 3695 remove_session_caps(session); 3696 } 3697 mutex_unlock(&session->s_mutex); 3698 ceph_put_mds_session(session); 3699 mutex_lock(&mdsc->mutex); 3700 kick_requests(mdsc, mds); 3701 } 3702 __wake_requests(mdsc, &mdsc->waiting_for_map); 3703 mutex_unlock(&mdsc->mutex); 3704 } 3705 3706 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 3707 { 3708 dout("stop\n"); 3709 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 3710 if (mdsc->mdsmap) 3711 ceph_mdsmap_destroy(mdsc->mdsmap); 3712 kfree(mdsc->sessions); 3713 ceph_caps_finalize(mdsc); 3714 ceph_pool_perm_destroy(mdsc); 3715 } 3716 3717 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 3718 { 3719 struct ceph_mds_client *mdsc = fsc->mdsc; 3720 3721 dout("mdsc_destroy %p\n", mdsc); 3722 ceph_mdsc_stop(mdsc); 3723 3724 /* flush out any connection work with references to us */ 3725 ceph_msgr_flush(); 3726 3727 fsc->mdsc = NULL; 3728 kfree(mdsc); 3729 dout("mdsc_destroy %p done\n", mdsc); 3730 } 3731 3732 3733 /* 3734 * handle mds map update. 
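 * Decode the incoming map, ignore it if its epoch is not newer than
 * the one we already have, otherwise swap it into place and let
 * check_new_map() react to any per-mds state changes.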
3735 */ 3736 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 3737 { 3738 u32 epoch; 3739 u32 maplen; 3740 void *p = msg->front.iov_base; 3741 void *end = p + msg->front.iov_len; 3742 struct ceph_mdsmap *newmap, *oldmap; 3743 struct ceph_fsid fsid; 3744 int err = -EINVAL; 3745 3746 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 3747 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 3748 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 3749 return; 3750 epoch = ceph_decode_32(&p); 3751 maplen = ceph_decode_32(&p); 3752 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 3753 3754 /* do we need it? */ 3755 mutex_lock(&mdsc->mutex); 3756 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 3757 dout("handle_map epoch %u <= our %u\n", 3758 epoch, mdsc->mdsmap->m_epoch); 3759 mutex_unlock(&mdsc->mutex); 3760 return; 3761 } 3762 3763 newmap = ceph_mdsmap_decode(&p, end); 3764 if (IS_ERR(newmap)) { 3765 err = PTR_ERR(newmap); 3766 goto bad_unlock; 3767 } 3768 3769 /* swap into place */ 3770 if (mdsc->mdsmap) { 3771 oldmap = mdsc->mdsmap; 3772 mdsc->mdsmap = newmap; 3773 check_new_map(mdsc, newmap, oldmap); 3774 ceph_mdsmap_destroy(oldmap); 3775 } else { 3776 mdsc->mdsmap = newmap; /* first mds map */ 3777 } 3778 mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size; 3779 3780 __wake_requests(mdsc, &mdsc->waiting_for_map); 3781 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP, 3782 mdsc->mdsmap->m_epoch); 3783 3784 mutex_unlock(&mdsc->mutex); 3785 schedule_delayed(mdsc); 3786 return; 3787 3788 bad_unlock: 3789 mutex_unlock(&mdsc->mutex); 3790 bad: 3791 pr_err("error decoding mdsmap %d\n", err); 3792 return; 3793 } 3794 3795 static struct ceph_connection *con_get(struct ceph_connection *con) 3796 { 3797 struct ceph_mds_session *s = con->private; 3798 3799 if (get_session(s)) { 3800 dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref)); 3801 return con; 3802 } 3803 dout("mdsc con_get %p FAIL\n", s); 3804 return NULL; 3805 } 3806 3807 static void con_put(struct ceph_connection *con) 3808 { 3809 struct ceph_mds_session *s = con->private; 3810 3811 dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1); 3812 ceph_put_mds_session(s); 3813 } 3814 3815 /* 3816 * if the client is unresponsive for long enough, the mds will kill 3817 * the session entirely. 
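 * When the messenger reports that the peer reset the connection, we
 * respond by attempting an mds reconnect.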
3818 */ 3819 static void peer_reset(struct ceph_connection *con) 3820 { 3821 struct ceph_mds_session *s = con->private; 3822 struct ceph_mds_client *mdsc = s->s_mdsc; 3823 3824 pr_warn("mds%d closed our session\n", s->s_mds); 3825 send_mds_reconnect(mdsc, s); 3826 } 3827 3828 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 3829 { 3830 struct ceph_mds_session *s = con->private; 3831 struct ceph_mds_client *mdsc = s->s_mdsc; 3832 int type = le16_to_cpu(msg->hdr.type); 3833 3834 mutex_lock(&mdsc->mutex); 3835 if (__verify_registered_session(mdsc, s) < 0) { 3836 mutex_unlock(&mdsc->mutex); 3837 goto out; 3838 } 3839 mutex_unlock(&mdsc->mutex); 3840 3841 switch (type) { 3842 case CEPH_MSG_MDS_MAP: 3843 ceph_mdsc_handle_map(mdsc, msg); 3844 break; 3845 case CEPH_MSG_CLIENT_SESSION: 3846 handle_session(s, msg); 3847 break; 3848 case CEPH_MSG_CLIENT_REPLY: 3849 handle_reply(s, msg); 3850 break; 3851 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 3852 handle_forward(mdsc, s, msg); 3853 break; 3854 case CEPH_MSG_CLIENT_CAPS: 3855 ceph_handle_caps(s, msg); 3856 break; 3857 case CEPH_MSG_CLIENT_SNAP: 3858 ceph_handle_snap(mdsc, s, msg); 3859 break; 3860 case CEPH_MSG_CLIENT_LEASE: 3861 handle_lease(mdsc, s, msg); 3862 break; 3863 3864 default: 3865 pr_err("received unknown message type %d %s\n", type, 3866 ceph_msg_type_name(type)); 3867 } 3868 out: 3869 ceph_msg_put(msg); 3870 } 3871 3872 /* 3873 * authentication 3874 */ 3875 3876 /* 3877 * Note: returned pointer is the address of a structure that's 3878 * managed separately. Caller must *not* attempt to free it. 3879 */ 3880 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 3881 int *proto, int force_new) 3882 { 3883 struct ceph_mds_session *s = con->private; 3884 struct ceph_mds_client *mdsc = s->s_mdsc; 3885 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3886 struct ceph_auth_handshake *auth = &s->s_auth; 3887 3888 if (force_new && auth->authorizer) { 3889 ceph_auth_destroy_authorizer(auth->authorizer); 3890 auth->authorizer = NULL; 3891 } 3892 if (!auth->authorizer) { 3893 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 3894 auth); 3895 if (ret) 3896 return ERR_PTR(ret); 3897 } else { 3898 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 3899 auth); 3900 if (ret) 3901 return ERR_PTR(ret); 3902 } 3903 *proto = ac->protocol; 3904 3905 return auth; 3906 } 3907 3908 3909 static int verify_authorizer_reply(struct ceph_connection *con, int len) 3910 { 3911 struct ceph_mds_session *s = con->private; 3912 struct ceph_mds_client *mdsc = s->s_mdsc; 3913 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3914 3915 return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer, len); 3916 } 3917 3918 static int invalidate_authorizer(struct ceph_connection *con) 3919 { 3920 struct ceph_mds_session *s = con->private; 3921 struct ceph_mds_client *mdsc = s->s_mdsc; 3922 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3923 3924 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 3925 3926 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 3927 } 3928 3929 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, 3930 struct ceph_msg_header *hdr, int *skip) 3931 { 3932 struct ceph_msg *msg; 3933 int type = (int) le16_to_cpu(hdr->type); 3934 int front_len = (int) le32_to_cpu(hdr->front_len); 3935 3936 if (con->in_msg) 3937 return con->in_msg; 3938 3939 *skip = 0; 3940 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 3941 if 
(!msg) { 3942 pr_err("unable to allocate msg type %d len %d\n", 3943 type, front_len); 3944 return NULL; 3945 } 3946 3947 return msg; 3948 } 3949 3950 static int mds_sign_message(struct ceph_msg *msg) 3951 { 3952 struct ceph_mds_session *s = msg->con->private; 3953 struct ceph_auth_handshake *auth = &s->s_auth; 3954 3955 return ceph_auth_sign_message(auth, msg); 3956 } 3957 3958 static int mds_check_message_signature(struct ceph_msg *msg) 3959 { 3960 struct ceph_mds_session *s = msg->con->private; 3961 struct ceph_auth_handshake *auth = &s->s_auth; 3962 3963 return ceph_auth_check_message_signature(auth, msg); 3964 } 3965 3966 static const struct ceph_connection_operations mds_con_ops = { 3967 .get = con_get, 3968 .put = con_put, 3969 .dispatch = dispatch, 3970 .get_authorizer = get_authorizer, 3971 .verify_authorizer_reply = verify_authorizer_reply, 3972 .invalidate_authorizer = invalidate_authorizer, 3973 .peer_reset = peer_reset, 3974 .alloc_msg = mds_alloc_msg, 3975 .sign_message = mds_sign_message, 3976 .check_message_signature = mds_check_message_signature, 3977 }; 3978 3979 /* eof */ 3980