#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
	struct ceph_pagelist *pagelist;
	bool flock;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       int features)
{
	int err = -EIO;

	info->in = *p;
	*p += sizeof(struct ceph_mds_reply_inode) +
		sizeof(*info->in->fragtree.splits) *
		le32_to_cpu(info->in->fragtree.nsplits);

	ceph_decode_32_safe(p, end, info->symlink_len, bad);
	ceph_decode_need(p, end, info->symlink_len, bad);
	info->symlink = *p;
	*p += info->symlink_len;

	if (features & CEPH_FEATURE_DIRLAYOUTHASH)
		ceph_decode_copy_safe(p, end, &info->dir_layout,
				      sizeof(info->dir_layout), bad);
	else
		memset(&info->dir_layout, 0, sizeof(info->dir_layout));

	ceph_decode_32_safe(p, end, info->xattr_len, bad);
	ceph_decode_need(p, end, info->xattr_len, bad);
	info->xattr_data = *p;
	*p += info->xattr_len;
	return 0;
bad:
	return err;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
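 *
 * As a rough sketch (parse_reply_info_trace() below is the
 * authoritative decoder), the trace section is laid out as:
 *
 *   if head->is_dentry:  inode (diri) + dirfrag + dname + dlease
 *   if head->is_target:  inode (targeti)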
97 */ 98 static int parse_reply_info_trace(void **p, void *end, 99 struct ceph_mds_reply_info_parsed *info, 100 int features) 101 { 102 int err; 103 104 if (info->head->is_dentry) { 105 err = parse_reply_info_in(p, end, &info->diri, features); 106 if (err < 0) 107 goto out_bad; 108 109 if (unlikely(*p + sizeof(*info->dirfrag) > end)) 110 goto bad; 111 info->dirfrag = *p; 112 *p += sizeof(*info->dirfrag) + 113 sizeof(u32)*le32_to_cpu(info->dirfrag->ndist); 114 if (unlikely(*p > end)) 115 goto bad; 116 117 ceph_decode_32_safe(p, end, info->dname_len, bad); 118 ceph_decode_need(p, end, info->dname_len, bad); 119 info->dname = *p; 120 *p += info->dname_len; 121 info->dlease = *p; 122 *p += sizeof(*info->dlease); 123 } 124 125 if (info->head->is_target) { 126 err = parse_reply_info_in(p, end, &info->targeti, features); 127 if (err < 0) 128 goto out_bad; 129 } 130 131 if (unlikely(*p != end)) 132 goto bad; 133 return 0; 134 135 bad: 136 err = -EIO; 137 out_bad: 138 pr_err("problem parsing mds trace %d\n", err); 139 return err; 140 } 141 142 /* 143 * parse readdir results 144 */ 145 static int parse_reply_info_dir(void **p, void *end, 146 struct ceph_mds_reply_info_parsed *info, 147 int features) 148 { 149 u32 num, i = 0; 150 int err; 151 152 info->dir_dir = *p; 153 if (*p + sizeof(*info->dir_dir) > end) 154 goto bad; 155 *p += sizeof(*info->dir_dir) + 156 sizeof(u32)*le32_to_cpu(info->dir_dir->ndist); 157 if (*p > end) 158 goto bad; 159 160 ceph_decode_need(p, end, sizeof(num) + 2, bad); 161 num = ceph_decode_32(p); 162 info->dir_end = ceph_decode_8(p); 163 info->dir_complete = ceph_decode_8(p); 164 if (num == 0) 165 goto done; 166 167 /* alloc large array */ 168 info->dir_nr = num; 169 info->dir_in = kcalloc(num, sizeof(*info->dir_in) + 170 sizeof(*info->dir_dname) + 171 sizeof(*info->dir_dname_len) + 172 sizeof(*info->dir_dlease), 173 GFP_NOFS); 174 if (info->dir_in == NULL) { 175 err = -ENOMEM; 176 goto out_bad; 177 } 178 info->dir_dname = (void *)(info->dir_in + num); 179 info->dir_dname_len = (void *)(info->dir_dname + num); 180 info->dir_dlease = (void *)(info->dir_dname_len + num); 181 182 while (num) { 183 /* dentry */ 184 ceph_decode_need(p, end, sizeof(u32)*2, bad); 185 info->dir_dname_len[i] = ceph_decode_32(p); 186 ceph_decode_need(p, end, info->dir_dname_len[i], bad); 187 info->dir_dname[i] = *p; 188 *p += info->dir_dname_len[i]; 189 dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i], 190 info->dir_dname[i]); 191 info->dir_dlease[i] = *p; 192 *p += sizeof(struct ceph_mds_reply_lease); 193 194 /* inode */ 195 err = parse_reply_info_in(p, end, &info->dir_in[i], features); 196 if (err < 0) 197 goto out_bad; 198 i++; 199 num--; 200 } 201 202 done: 203 if (*p != end) 204 goto bad; 205 return 0; 206 207 bad: 208 err = -EIO; 209 out_bad: 210 pr_err("problem parsing dir contents %d\n", err); 211 return err; 212 } 213 214 /* 215 * parse fcntl F_GETLK results 216 */ 217 static int parse_reply_info_filelock(void **p, void *end, 218 struct ceph_mds_reply_info_parsed *info, 219 int features) 220 { 221 if (*p + sizeof(*info->filelock_reply) > end) 222 goto bad; 223 224 info->filelock_reply = *p; 225 *p += sizeof(*info->filelock_reply); 226 227 if (unlikely(*p != end)) 228 goto bad; 229 return 0; 230 231 bad: 232 return -EIO; 233 } 234 235 /* 236 * parse create results 237 */ 238 static int parse_reply_info_create(void **p, void *end, 239 struct ceph_mds_reply_info_parsed *info, 240 int features) 241 { 242 if (features & CEPH_FEATURE_REPLY_CREATE_INODE) { 243 if (*p == end) { 244 
info->has_create_ino = false; 245 } else { 246 info->has_create_ino = true; 247 info->ino = ceph_decode_64(p); 248 } 249 } 250 251 if (unlikely(*p != end)) 252 goto bad; 253 return 0; 254 255 bad: 256 return -EIO; 257 } 258 259 /* 260 * parse extra results 261 */ 262 static int parse_reply_info_extra(void **p, void *end, 263 struct ceph_mds_reply_info_parsed *info, 264 int features) 265 { 266 if (info->head->op == CEPH_MDS_OP_GETFILELOCK) 267 return parse_reply_info_filelock(p, end, info, features); 268 else if (info->head->op == CEPH_MDS_OP_READDIR) 269 return parse_reply_info_dir(p, end, info, features); 270 else if (info->head->op == CEPH_MDS_OP_CREATE) 271 return parse_reply_info_create(p, end, info, features); 272 else 273 return -EIO; 274 } 275 276 /* 277 * parse entire mds reply 278 */ 279 static int parse_reply_info(struct ceph_msg *msg, 280 struct ceph_mds_reply_info_parsed *info, 281 int features) 282 { 283 void *p, *end; 284 u32 len; 285 int err; 286 287 info->head = msg->front.iov_base; 288 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 289 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 290 291 /* trace */ 292 ceph_decode_32_safe(&p, end, len, bad); 293 if (len > 0) { 294 ceph_decode_need(&p, end, len, bad); 295 err = parse_reply_info_trace(&p, p+len, info, features); 296 if (err < 0) 297 goto out_bad; 298 } 299 300 /* extra */ 301 ceph_decode_32_safe(&p, end, len, bad); 302 if (len > 0) { 303 ceph_decode_need(&p, end, len, bad); 304 err = parse_reply_info_extra(&p, p+len, info, features); 305 if (err < 0) 306 goto out_bad; 307 } 308 309 /* snap blob */ 310 ceph_decode_32_safe(&p, end, len, bad); 311 info->snapblob_len = len; 312 info->snapblob = p; 313 p += len; 314 315 if (p != end) 316 goto bad; 317 return 0; 318 319 bad: 320 err = -EIO; 321 out_bad: 322 pr_err("mds parse_reply err %d\n", err); 323 return err; 324 } 325 326 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 327 { 328 kfree(info->dir_in); 329 } 330 331 332 /* 333 * sessions 334 */ 335 static const char *session_state_name(int s) 336 { 337 switch (s) { 338 case CEPH_MDS_SESSION_NEW: return "new"; 339 case CEPH_MDS_SESSION_OPENING: return "opening"; 340 case CEPH_MDS_SESSION_OPEN: return "open"; 341 case CEPH_MDS_SESSION_HUNG: return "hung"; 342 case CEPH_MDS_SESSION_CLOSING: return "closing"; 343 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 344 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 345 default: return "???"; 346 } 347 } 348 349 static struct ceph_mds_session *get_session(struct ceph_mds_session *s) 350 { 351 if (atomic_inc_not_zero(&s->s_ref)) { 352 dout("mdsc get_session %p %d -> %d\n", s, 353 atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref)); 354 return s; 355 } else { 356 dout("mdsc get_session %p 0 -- FAIL", s); 357 return NULL; 358 } 359 } 360 361 void ceph_put_mds_session(struct ceph_mds_session *s) 362 { 363 dout("mdsc put_session %p %d -> %d\n", s, 364 atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1); 365 if (atomic_dec_and_test(&s->s_ref)) { 366 if (s->s_auth.authorizer) 367 s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer( 368 s->s_mdsc->fsc->client->monc.auth, 369 s->s_auth.authorizer); 370 kfree(s); 371 } 372 } 373 374 /* 375 * called under mdsc->mutex 376 */ 377 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 378 int mds) 379 { 380 struct ceph_mds_session *session; 381 382 if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL) 383 return NULL; 384 session = 
mdsc->sessions[mds]; 385 dout("lookup_mds_session %p %d\n", session, 386 atomic_read(&session->s_ref)); 387 get_session(session); 388 return session; 389 } 390 391 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 392 { 393 if (mds >= mdsc->max_sessions) 394 return false; 395 return mdsc->sessions[mds]; 396 } 397 398 static int __verify_registered_session(struct ceph_mds_client *mdsc, 399 struct ceph_mds_session *s) 400 { 401 if (s->s_mds >= mdsc->max_sessions || 402 mdsc->sessions[s->s_mds] != s) 403 return -ENOENT; 404 return 0; 405 } 406 407 /* 408 * create+register a new session for given mds. 409 * called under mdsc->mutex. 410 */ 411 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 412 int mds) 413 { 414 struct ceph_mds_session *s; 415 416 s = kzalloc(sizeof(*s), GFP_NOFS); 417 if (!s) 418 return ERR_PTR(-ENOMEM); 419 s->s_mdsc = mdsc; 420 s->s_mds = mds; 421 s->s_state = CEPH_MDS_SESSION_NEW; 422 s->s_ttl = 0; 423 s->s_seq = 0; 424 mutex_init(&s->s_mutex); 425 426 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); 427 428 spin_lock_init(&s->s_gen_ttl_lock); 429 s->s_cap_gen = 0; 430 s->s_cap_ttl = jiffies - 1; 431 432 spin_lock_init(&s->s_cap_lock); 433 s->s_renew_requested = 0; 434 s->s_renew_seq = 0; 435 INIT_LIST_HEAD(&s->s_caps); 436 s->s_nr_caps = 0; 437 s->s_trim_caps = 0; 438 atomic_set(&s->s_ref, 1); 439 INIT_LIST_HEAD(&s->s_waiting); 440 INIT_LIST_HEAD(&s->s_unsafe); 441 s->s_num_cap_releases = 0; 442 s->s_cap_iterator = NULL; 443 INIT_LIST_HEAD(&s->s_cap_releases); 444 INIT_LIST_HEAD(&s->s_cap_releases_done); 445 INIT_LIST_HEAD(&s->s_cap_flushing); 446 INIT_LIST_HEAD(&s->s_cap_snaps_flushing); 447 448 dout("register_session mds%d\n", mds); 449 if (mds >= mdsc->max_sessions) { 450 int newmax = 1 << get_count_order(mds+1); 451 struct ceph_mds_session **sa; 452 453 dout("register_session realloc to %d\n", newmax); 454 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); 455 if (sa == NULL) 456 goto fail_realloc; 457 if (mdsc->sessions) { 458 memcpy(sa, mdsc->sessions, 459 mdsc->max_sessions * sizeof(void *)); 460 kfree(mdsc->sessions); 461 } 462 mdsc->sessions = sa; 463 mdsc->max_sessions = newmax; 464 } 465 mdsc->sessions[mds] = s; 466 atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 467 468 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 469 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 470 471 return s; 472 473 fail_realloc: 474 kfree(s); 475 return ERR_PTR(-ENOMEM); 476 } 477 478 /* 479 * called under mdsc->mutex 480 */ 481 static void __unregister_session(struct ceph_mds_client *mdsc, 482 struct ceph_mds_session *s) 483 { 484 dout("__unregister_session mds%d %p\n", s->s_mds, s); 485 BUG_ON(mdsc->sessions[s->s_mds] != s); 486 mdsc->sessions[s->s_mds] = NULL; 487 ceph_con_close(&s->s_con); 488 ceph_put_mds_session(s); 489 } 490 491 /* 492 * drop session refs in request. 
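 *
 * (For reference, the rough request lifecycle in this file is:
 * ceph_mdsc_create_request() allocates a request,
 * ceph_mdsc_submit_request() or ceph_mdsc_do_request() registers and
 * sends it, and ceph_mdsc_put_request() drops the final kref, ending
 * up in ceph_mdsc_release_request() below.)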
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
	if (req->r_session) {
		ceph_put_mds_session(req->r_session);
		req->r_session = NULL;
	}
}

void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply) {
		ceph_msg_put(req->r_reply);
		destroy_reply_info(&req->r_reply_info);
	}
	if (req->r_inode) {
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		iput(req->r_inode);
	}
	if (req->r_locked_dir)
		ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
	if (req->r_target_inode)
		iput(req->r_target_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry) {
		/*
		 * track (and drop pins for) r_old_dentry_dir
		 * separately, since r_old_dentry's d_parent may have
		 * changed between the dir mutex being dropped and
		 * this request being freed.
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		dput(req->r_old_dentry);
		iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	kfree(req);
}

/*
 * lookup a request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
						 u64 tid)
{
	struct ceph_mds_request *req;
	struct rb_node *n = mdsc->request_tree.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_mds_request, r_node);
		if (tid < req->r_tid)
			n = n->rb_left;
		else if (tid > req->r_tid)
			n = n->rb_right;
		else {
			ceph_mdsc_get_request(req);
			return req;
		}
	}
	return NULL;
}

static void __insert_request(struct ceph_mds_client *mdsc,
			     struct ceph_mds_request *new)
{
	struct rb_node **p = &mdsc->request_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_mds_request *req = NULL;

	while (*p) {
		parent = *p;
		req = rb_entry(parent, struct ceph_mds_request, r_node);
		if (new->r_tid < req->r_tid)
			p = &(*p)->rb_left;
		else if (new->r_tid > req->r_tid)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->r_node, parent, p);
	rb_insert_color(&new->r_node, &mdsc->request_tree);
}

/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
596 */ 597 static void __register_request(struct ceph_mds_client *mdsc, 598 struct ceph_mds_request *req, 599 struct inode *dir) 600 { 601 req->r_tid = ++mdsc->last_tid; 602 if (req->r_num_caps) 603 ceph_reserve_caps(mdsc, &req->r_caps_reservation, 604 req->r_num_caps); 605 dout("__register_request %p tid %lld\n", req, req->r_tid); 606 ceph_mdsc_get_request(req); 607 __insert_request(mdsc, req); 608 609 req->r_uid = current_fsuid(); 610 req->r_gid = current_fsgid(); 611 612 if (dir) { 613 struct ceph_inode_info *ci = ceph_inode(dir); 614 615 ihold(dir); 616 spin_lock(&ci->i_unsafe_lock); 617 req->r_unsafe_dir = dir; 618 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 619 spin_unlock(&ci->i_unsafe_lock); 620 } 621 } 622 623 static void __unregister_request(struct ceph_mds_client *mdsc, 624 struct ceph_mds_request *req) 625 { 626 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 627 rb_erase(&req->r_node, &mdsc->request_tree); 628 RB_CLEAR_NODE(&req->r_node); 629 630 if (req->r_unsafe_dir) { 631 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 632 633 spin_lock(&ci->i_unsafe_lock); 634 list_del_init(&req->r_unsafe_dir_item); 635 spin_unlock(&ci->i_unsafe_lock); 636 637 iput(req->r_unsafe_dir); 638 req->r_unsafe_dir = NULL; 639 } 640 641 ceph_mdsc_put_request(req); 642 } 643 644 /* 645 * Choose mds to send request to next. If there is a hint set in the 646 * request (e.g., due to a prior forward hint from the mds), use that. 647 * Otherwise, consult frag tree and/or caps to identify the 648 * appropriate mds. If all else fails, choose randomly. 649 * 650 * Called under mdsc->mutex. 651 */ 652 static struct dentry *get_nonsnap_parent(struct dentry *dentry) 653 { 654 /* 655 * we don't need to worry about protecting the d_parent access 656 * here because we never renaming inside the snapped namespace 657 * except to resplice to another snapdir, and either the old or new 658 * result is a valid result. 659 */ 660 while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP) 661 dentry = dentry->d_parent; 662 return dentry; 663 } 664 665 static int __choose_mds(struct ceph_mds_client *mdsc, 666 struct ceph_mds_request *req) 667 { 668 struct inode *inode; 669 struct ceph_inode_info *ci; 670 struct ceph_cap *cap; 671 int mode = req->r_direct_mode; 672 int mds = -1; 673 u32 hash = req->r_direct_hash; 674 bool is_hash = req->r_direct_is_hash; 675 676 /* 677 * is there a specific mds we should try? ignore hint if we have 678 * no session and the mds is not up (active or recovering). 679 */ 680 if (req->r_resend_mds >= 0 && 681 (__have_session(mdsc, req->r_resend_mds) || 682 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 683 dout("choose_mds using resend_mds mds%d\n", 684 req->r_resend_mds); 685 return req->r_resend_mds; 686 } 687 688 if (mode == USE_RANDOM_MDS) 689 goto random; 690 691 inode = NULL; 692 if (req->r_inode) { 693 inode = req->r_inode; 694 } else if (req->r_dentry) { 695 /* ignore race with rename; old or new d_parent is okay */ 696 struct dentry *parent = req->r_dentry->d_parent; 697 struct inode *dir = parent->d_inode; 698 699 if (dir->i_sb != mdsc->fsc->sb) { 700 /* not this fs! 
*/ 701 inode = req->r_dentry->d_inode; 702 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 703 /* direct snapped/virtual snapdir requests 704 * based on parent dir inode */ 705 struct dentry *dn = get_nonsnap_parent(parent); 706 inode = dn->d_inode; 707 dout("__choose_mds using nonsnap parent %p\n", inode); 708 } else if (req->r_dentry->d_inode) { 709 /* dentry target */ 710 inode = req->r_dentry->d_inode; 711 } else { 712 /* dir + name */ 713 inode = dir; 714 hash = ceph_dentry_hash(dir, req->r_dentry); 715 is_hash = true; 716 } 717 } 718 719 dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash, 720 (int)hash, mode); 721 if (!inode) 722 goto random; 723 ci = ceph_inode(inode); 724 725 if (is_hash && S_ISDIR(inode->i_mode)) { 726 struct ceph_inode_frag frag; 727 int found; 728 729 ceph_choose_frag(ci, hash, &frag, &found); 730 if (found) { 731 if (mode == USE_ANY_MDS && frag.ndist > 0) { 732 u8 r; 733 734 /* choose a random replica */ 735 get_random_bytes(&r, 1); 736 r %= frag.ndist; 737 mds = frag.dist[r]; 738 dout("choose_mds %p %llx.%llx " 739 "frag %u mds%d (%d/%d)\n", 740 inode, ceph_vinop(inode), 741 frag.frag, mds, 742 (int)r, frag.ndist); 743 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 744 CEPH_MDS_STATE_ACTIVE) 745 return mds; 746 } 747 748 /* since this file/dir wasn't known to be 749 * replicated, then we want to look for the 750 * authoritative mds. */ 751 mode = USE_AUTH_MDS; 752 if (frag.mds >= 0) { 753 /* choose auth mds */ 754 mds = frag.mds; 755 dout("choose_mds %p %llx.%llx " 756 "frag %u mds%d (auth)\n", 757 inode, ceph_vinop(inode), frag.frag, mds); 758 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 759 CEPH_MDS_STATE_ACTIVE) 760 return mds; 761 } 762 } 763 } 764 765 spin_lock(&ci->i_ceph_lock); 766 cap = NULL; 767 if (mode == USE_AUTH_MDS) 768 cap = ci->i_auth_cap; 769 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 770 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 771 if (!cap) { 772 spin_unlock(&ci->i_ceph_lock); 773 goto random; 774 } 775 mds = cap->session->s_mds; 776 dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n", 777 inode, ceph_vinop(inode), mds, 778 cap == ci->i_auth_cap ? "auth " : "", cap); 779 spin_unlock(&ci->i_ceph_lock); 780 return mds; 781 782 random: 783 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 784 dout("choose_mds chose random mds%d\n", mds); 785 return mds; 786 } 787 788 789 /* 790 * session messages 791 */ 792 static struct ceph_msg *create_session_msg(u32 op, u64 seq) 793 { 794 struct ceph_msg *msg; 795 struct ceph_mds_session_head *h; 796 797 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 798 false); 799 if (!msg) { 800 pr_err("create_session_msg ENOMEM creating msg\n"); 801 return NULL; 802 } 803 h = msg->front.iov_base; 804 h->op = cpu_to_le32(op); 805 h->seq = cpu_to_le64(seq); 806 return msg; 807 } 808 809 /* 810 * send session open request. 811 * 812 * called under mdsc->mutex 813 */ 814 static int __open_session(struct ceph_mds_client *mdsc, 815 struct ceph_mds_session *session) 816 { 817 struct ceph_msg *msg; 818 int mstate; 819 int mds = session->s_mds; 820 821 /* wait for mds to go active? 
*/ 822 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 823 dout("open_session to mds%d (%s)\n", mds, 824 ceph_mds_state_name(mstate)); 825 session->s_state = CEPH_MDS_SESSION_OPENING; 826 session->s_renew_requested = jiffies; 827 828 /* send connect message */ 829 msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq); 830 if (!msg) 831 return -ENOMEM; 832 ceph_con_send(&session->s_con, msg); 833 return 0; 834 } 835 836 /* 837 * open sessions for any export targets for the given mds 838 * 839 * called under mdsc->mutex 840 */ 841 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 842 struct ceph_mds_session *session) 843 { 844 struct ceph_mds_info *mi; 845 struct ceph_mds_session *ts; 846 int i, mds = session->s_mds; 847 int target; 848 849 if (mds >= mdsc->mdsmap->m_max_mds) 850 return; 851 mi = &mdsc->mdsmap->m_info[mds]; 852 dout("open_export_target_sessions for mds%d (%d targets)\n", 853 session->s_mds, mi->num_export_targets); 854 855 for (i = 0; i < mi->num_export_targets; i++) { 856 target = mi->export_targets[i]; 857 ts = __ceph_lookup_mds_session(mdsc, target); 858 if (!ts) { 859 ts = register_session(mdsc, target); 860 if (IS_ERR(ts)) 861 return; 862 } 863 if (session->s_state == CEPH_MDS_SESSION_NEW || 864 session->s_state == CEPH_MDS_SESSION_CLOSING) 865 __open_session(mdsc, session); 866 else 867 dout(" mds%d target mds%d %p is %s\n", session->s_mds, 868 i, ts, session_state_name(ts->s_state)); 869 ceph_put_mds_session(ts); 870 } 871 } 872 873 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 874 struct ceph_mds_session *session) 875 { 876 mutex_lock(&mdsc->mutex); 877 __open_export_target_sessions(mdsc, session); 878 mutex_unlock(&mdsc->mutex); 879 } 880 881 /* 882 * session caps 883 */ 884 885 /* 886 * Free preallocated cap messages assigned to this session 887 */ 888 static void cleanup_cap_releases(struct ceph_mds_session *session) 889 { 890 struct ceph_msg *msg; 891 892 spin_lock(&session->s_cap_lock); 893 while (!list_empty(&session->s_cap_releases)) { 894 msg = list_first_entry(&session->s_cap_releases, 895 struct ceph_msg, list_head); 896 list_del_init(&msg->list_head); 897 ceph_msg_put(msg); 898 } 899 while (!list_empty(&session->s_cap_releases_done)) { 900 msg = list_first_entry(&session->s_cap_releases_done, 901 struct ceph_msg, list_head); 902 list_del_init(&msg->list_head); 903 ceph_msg_put(msg); 904 } 905 spin_unlock(&session->s_cap_lock); 906 } 907 908 /* 909 * Helper to safely iterate over all caps associated with a session, with 910 * special care taken to handle a racing __ceph_remove_cap(). 911 * 912 * Caller must hold session s_mutex. 
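 *
 * The callback runs with s_cap_lock dropped; existing users in this
 * file are invoked like, e.g.:
 *
 *   iterate_session_caps(session, remove_session_caps_cb, NULL);
 *   iterate_session_caps(session, trim_caps_cb, session);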
913 */ 914 static int iterate_session_caps(struct ceph_mds_session *session, 915 int (*cb)(struct inode *, struct ceph_cap *, 916 void *), void *arg) 917 { 918 struct list_head *p; 919 struct ceph_cap *cap; 920 struct inode *inode, *last_inode = NULL; 921 struct ceph_cap *old_cap = NULL; 922 int ret; 923 924 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 925 spin_lock(&session->s_cap_lock); 926 p = session->s_caps.next; 927 while (p != &session->s_caps) { 928 cap = list_entry(p, struct ceph_cap, session_caps); 929 inode = igrab(&cap->ci->vfs_inode); 930 if (!inode) { 931 p = p->next; 932 continue; 933 } 934 session->s_cap_iterator = cap; 935 spin_unlock(&session->s_cap_lock); 936 937 if (last_inode) { 938 iput(last_inode); 939 last_inode = NULL; 940 } 941 if (old_cap) { 942 ceph_put_cap(session->s_mdsc, old_cap); 943 old_cap = NULL; 944 } 945 946 ret = cb(inode, cap, arg); 947 last_inode = inode; 948 949 spin_lock(&session->s_cap_lock); 950 p = p->next; 951 if (cap->ci == NULL) { 952 dout("iterate_session_caps finishing cap %p removal\n", 953 cap); 954 BUG_ON(cap->session != session); 955 list_del_init(&cap->session_caps); 956 session->s_nr_caps--; 957 cap->session = NULL; 958 old_cap = cap; /* put_cap it w/o locks held */ 959 } 960 if (ret < 0) 961 goto out; 962 } 963 ret = 0; 964 out: 965 session->s_cap_iterator = NULL; 966 spin_unlock(&session->s_cap_lock); 967 968 if (last_inode) 969 iput(last_inode); 970 if (old_cap) 971 ceph_put_cap(session->s_mdsc, old_cap); 972 973 return ret; 974 } 975 976 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 977 void *arg) 978 { 979 struct ceph_inode_info *ci = ceph_inode(inode); 980 int drop = 0; 981 982 dout("removing cap %p, ci is %p, inode is %p\n", 983 cap, ci, &ci->vfs_inode); 984 spin_lock(&ci->i_ceph_lock); 985 __ceph_remove_cap(cap); 986 if (!__ceph_is_any_real_caps(ci)) { 987 struct ceph_mds_client *mdsc = 988 ceph_sb_to_client(inode->i_sb)->mdsc; 989 990 spin_lock(&mdsc->cap_dirty_lock); 991 if (!list_empty(&ci->i_dirty_item)) { 992 pr_info(" dropping dirty %s state for %p %lld\n", 993 ceph_cap_string(ci->i_dirty_caps), 994 inode, ceph_ino(inode)); 995 ci->i_dirty_caps = 0; 996 list_del_init(&ci->i_dirty_item); 997 drop = 1; 998 } 999 if (!list_empty(&ci->i_flushing_item)) { 1000 pr_info(" dropping dirty+flushing %s state for %p %lld\n", 1001 ceph_cap_string(ci->i_flushing_caps), 1002 inode, ceph_ino(inode)); 1003 ci->i_flushing_caps = 0; 1004 list_del_init(&ci->i_flushing_item); 1005 mdsc->num_cap_flushing--; 1006 drop = 1; 1007 } 1008 if (drop && ci->i_wrbuffer_ref) { 1009 pr_info(" dropping dirty data for %p %lld\n", 1010 inode, ceph_ino(inode)); 1011 ci->i_wrbuffer_ref = 0; 1012 ci->i_wrbuffer_ref_head = 0; 1013 drop++; 1014 } 1015 spin_unlock(&mdsc->cap_dirty_lock); 1016 } 1017 spin_unlock(&ci->i_ceph_lock); 1018 while (drop--) 1019 iput(inode); 1020 return 0; 1021 } 1022 1023 /* 1024 * caller must hold session s_mutex 1025 */ 1026 static void remove_session_caps(struct ceph_mds_session *session) 1027 { 1028 dout("remove_session_caps on %p\n", session); 1029 iterate_session_caps(session, remove_session_caps_cb, NULL); 1030 BUG_ON(session->s_nr_caps > 0); 1031 BUG_ON(!list_empty(&session->s_cap_flushing)); 1032 cleanup_cap_releases(session); 1033 } 1034 1035 /* 1036 * wake up any threads waiting on this session's caps. if the cap is 1037 * old (didn't get renewed on the client reconnect), remove it now. 1038 * 1039 * caller must hold s_mutex. 
1040 */ 1041 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, 1042 void *arg) 1043 { 1044 struct ceph_inode_info *ci = ceph_inode(inode); 1045 1046 wake_up_all(&ci->i_cap_wq); 1047 if (arg) { 1048 spin_lock(&ci->i_ceph_lock); 1049 ci->i_wanted_max_size = 0; 1050 ci->i_requested_max_size = 0; 1051 spin_unlock(&ci->i_ceph_lock); 1052 } 1053 return 0; 1054 } 1055 1056 static void wake_up_session_caps(struct ceph_mds_session *session, 1057 int reconnect) 1058 { 1059 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 1060 iterate_session_caps(session, wake_up_session_cb, 1061 (void *)(unsigned long)reconnect); 1062 } 1063 1064 /* 1065 * Send periodic message to MDS renewing all currently held caps. The 1066 * ack will reset the expiration for all caps from this session. 1067 * 1068 * caller holds s_mutex 1069 */ 1070 static int send_renew_caps(struct ceph_mds_client *mdsc, 1071 struct ceph_mds_session *session) 1072 { 1073 struct ceph_msg *msg; 1074 int state; 1075 1076 if (time_after_eq(jiffies, session->s_cap_ttl) && 1077 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 1078 pr_info("mds%d caps stale\n", session->s_mds); 1079 session->s_renew_requested = jiffies; 1080 1081 /* do not try to renew caps until a recovering mds has reconnected 1082 * with its clients. */ 1083 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 1084 if (state < CEPH_MDS_STATE_RECONNECT) { 1085 dout("send_renew_caps ignoring mds%d (%s)\n", 1086 session->s_mds, ceph_mds_state_name(state)); 1087 return 0; 1088 } 1089 1090 dout("send_renew_caps to mds%d (%s)\n", session->s_mds, 1091 ceph_mds_state_name(state)); 1092 msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 1093 ++session->s_renew_seq); 1094 if (!msg) 1095 return -ENOMEM; 1096 ceph_con_send(&session->s_con, msg); 1097 return 0; 1098 } 1099 1100 /* 1101 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 1102 * 1103 * Called under session->s_mutex 1104 */ 1105 static void renewed_caps(struct ceph_mds_client *mdsc, 1106 struct ceph_mds_session *session, int is_renew) 1107 { 1108 int was_stale; 1109 int wake = 0; 1110 1111 spin_lock(&session->s_cap_lock); 1112 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 1113 1114 session->s_cap_ttl = session->s_renew_requested + 1115 mdsc->mdsmap->m_session_timeout*HZ; 1116 1117 if (was_stale) { 1118 if (time_before(jiffies, session->s_cap_ttl)) { 1119 pr_info("mds%d caps renewed\n", session->s_mds); 1120 wake = 1; 1121 } else { 1122 pr_info("mds%d caps still stale\n", session->s_mds); 1123 } 1124 } 1125 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n", 1126 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh", 1127 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 1128 spin_unlock(&session->s_cap_lock); 1129 1130 if (wake) 1131 wake_up_session_caps(session, 0); 1132 } 1133 1134 /* 1135 * send a session close request 1136 */ 1137 static int request_close_session(struct ceph_mds_client *mdsc, 1138 struct ceph_mds_session *session) 1139 { 1140 struct ceph_msg *msg; 1141 1142 dout("request_close_session mds%d state %s seq %lld\n", 1143 session->s_mds, session_state_name(session->s_state), 1144 session->s_seq); 1145 msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); 1146 if (!msg) 1147 return -ENOMEM; 1148 ceph_con_send(&session->s_con, msg); 1149 return 0; 1150 } 1151 1152 /* 1153 * Called with s_mutex held. 
 */
static int __close_session(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
		return 0;
	session->s_state = CEPH_MDS_SESSION_CLOSING;
	return request_close_session(mdsc, session);
}

/*
 * Trim old(er) caps.
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped too.
 *
 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
 * memory pressure from the MDS, though, so it needn't be perfect.
 */
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
	struct ceph_mds_session *session = arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int used, oissued, mine;

	if (session->s_trim_caps <= 0)
		return -1;

	spin_lock(&ci->i_ceph_lock);
	mine = cap->issued | cap->implemented;
	used = __ceph_caps_used(ci);
	oissued = __ceph_caps_issued_other(ci, cap);

	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
	     ceph_cap_string(used));
	if (ci->i_dirty_caps)
		goto out;	/* dirty caps */
	if ((used & ~oissued) & mine)
		goto out;	/* we need these caps */

	session->s_trim_caps--;
	if (oissued) {
		/* we aren't the only cap.. just remove us */
		__ceph_remove_cap(cap);
	} else {
		/* try to drop referring dentries */
		spin_unlock(&ci->i_ceph_lock);
		d_prune_aliases(inode);
		dout("trim_caps_cb %p cap %p pruned, count now %d\n",
		     inode, cap, atomic_read(&inode->i_count));
		return 0;
	}

out:
	spin_unlock(&ci->i_ceph_lock);
	return 0;
}

/*
 * Trim session cap count down to some max number.
 */
static int trim_caps(struct ceph_mds_client *mdsc,
		     struct ceph_mds_session *session,
		     int max_caps)
{
	int trim_caps = session->s_nr_caps - max_caps;

	dout("trim_caps mds%d start: %d / %d, trim %d\n",
	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
	if (trim_caps > 0) {
		session->s_trim_caps = trim_caps;
		iterate_session_caps(session, trim_caps_cb, session);
		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
		     session->s_mds, session->s_nr_caps, max_caps,
		     trim_caps - session->s_trim_caps);
		session->s_trim_caps = 0;
	}
	return 0;
}

/*
 * Allocate cap_release messages.  If there is a partially full message
 * in the queue, try to allocate enough to cover its remainder, so that
 * we can send it immediately.
 *
 * Called under s_mutex.
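 *
 * Roughly, messages keep being allocated until
 *
 *   session->s_num_cap_releases >= session->s_nr_caps + extra
 *
 * where each message accounts for CEPH_CAPS_PER_RELEASE entries and
 * "extra" starts from the cap_release_safety mount option (plus the
 * unused slots of any partial message already queued).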
1242 */ 1243 int ceph_add_cap_releases(struct ceph_mds_client *mdsc, 1244 struct ceph_mds_session *session) 1245 { 1246 struct ceph_msg *msg, *partial = NULL; 1247 struct ceph_mds_cap_release *head; 1248 int err = -ENOMEM; 1249 int extra = mdsc->fsc->mount_options->cap_release_safety; 1250 int num; 1251 1252 dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds, 1253 extra); 1254 1255 spin_lock(&session->s_cap_lock); 1256 1257 if (!list_empty(&session->s_cap_releases)) { 1258 msg = list_first_entry(&session->s_cap_releases, 1259 struct ceph_msg, 1260 list_head); 1261 head = msg->front.iov_base; 1262 num = le32_to_cpu(head->num); 1263 if (num) { 1264 dout(" partial %p with (%d/%d)\n", msg, num, 1265 (int)CEPH_CAPS_PER_RELEASE); 1266 extra += CEPH_CAPS_PER_RELEASE - num; 1267 partial = msg; 1268 } 1269 } 1270 while (session->s_num_cap_releases < session->s_nr_caps + extra) { 1271 spin_unlock(&session->s_cap_lock); 1272 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE, 1273 GFP_NOFS, false); 1274 if (!msg) 1275 goto out_unlocked; 1276 dout("add_cap_releases %p msg %p now %d\n", session, msg, 1277 (int)msg->front.iov_len); 1278 head = msg->front.iov_base; 1279 head->num = cpu_to_le32(0); 1280 msg->front.iov_len = sizeof(*head); 1281 spin_lock(&session->s_cap_lock); 1282 list_add(&msg->list_head, &session->s_cap_releases); 1283 session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE; 1284 } 1285 1286 if (partial) { 1287 head = partial->front.iov_base; 1288 num = le32_to_cpu(head->num); 1289 dout(" queueing partial %p with %d/%d\n", partial, num, 1290 (int)CEPH_CAPS_PER_RELEASE); 1291 list_move_tail(&partial->list_head, 1292 &session->s_cap_releases_done); 1293 session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num; 1294 } 1295 err = 0; 1296 spin_unlock(&session->s_cap_lock); 1297 out_unlocked: 1298 return err; 1299 } 1300 1301 /* 1302 * flush all dirty inode data to disk. 
1303 * 1304 * returns true if we've flushed through want_flush_seq 1305 */ 1306 static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) 1307 { 1308 int mds, ret = 1; 1309 1310 dout("check_cap_flush want %lld\n", want_flush_seq); 1311 mutex_lock(&mdsc->mutex); 1312 for (mds = 0; ret && mds < mdsc->max_sessions; mds++) { 1313 struct ceph_mds_session *session = mdsc->sessions[mds]; 1314 1315 if (!session) 1316 continue; 1317 get_session(session); 1318 mutex_unlock(&mdsc->mutex); 1319 1320 mutex_lock(&session->s_mutex); 1321 if (!list_empty(&session->s_cap_flushing)) { 1322 struct ceph_inode_info *ci = 1323 list_entry(session->s_cap_flushing.next, 1324 struct ceph_inode_info, 1325 i_flushing_item); 1326 struct inode *inode = &ci->vfs_inode; 1327 1328 spin_lock(&ci->i_ceph_lock); 1329 if (ci->i_cap_flush_seq <= want_flush_seq) { 1330 dout("check_cap_flush still flushing %p " 1331 "seq %lld <= %lld to mds%d\n", inode, 1332 ci->i_cap_flush_seq, want_flush_seq, 1333 session->s_mds); 1334 ret = 0; 1335 } 1336 spin_unlock(&ci->i_ceph_lock); 1337 } 1338 mutex_unlock(&session->s_mutex); 1339 ceph_put_mds_session(session); 1340 1341 if (!ret) 1342 return ret; 1343 mutex_lock(&mdsc->mutex); 1344 } 1345 1346 mutex_unlock(&mdsc->mutex); 1347 dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq); 1348 return ret; 1349 } 1350 1351 /* 1352 * called under s_mutex 1353 */ 1354 void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 1355 struct ceph_mds_session *session) 1356 { 1357 struct ceph_msg *msg; 1358 1359 dout("send_cap_releases mds%d\n", session->s_mds); 1360 spin_lock(&session->s_cap_lock); 1361 while (!list_empty(&session->s_cap_releases_done)) { 1362 msg = list_first_entry(&session->s_cap_releases_done, 1363 struct ceph_msg, list_head); 1364 list_del_init(&msg->list_head); 1365 spin_unlock(&session->s_cap_lock); 1366 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1367 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 1368 ceph_con_send(&session->s_con, msg); 1369 spin_lock(&session->s_cap_lock); 1370 } 1371 spin_unlock(&session->s_cap_lock); 1372 } 1373 1374 static void discard_cap_releases(struct ceph_mds_client *mdsc, 1375 struct ceph_mds_session *session) 1376 { 1377 struct ceph_msg *msg; 1378 struct ceph_mds_cap_release *head; 1379 unsigned num; 1380 1381 dout("discard_cap_releases mds%d\n", session->s_mds); 1382 spin_lock(&session->s_cap_lock); 1383 1384 /* zero out the in-progress message */ 1385 msg = list_first_entry(&session->s_cap_releases, 1386 struct ceph_msg, list_head); 1387 head = msg->front.iov_base; 1388 num = le32_to_cpu(head->num); 1389 dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num); 1390 head->num = cpu_to_le32(0); 1391 session->s_num_cap_releases += num; 1392 1393 /* requeue completed messages */ 1394 while (!list_empty(&session->s_cap_releases_done)) { 1395 msg = list_first_entry(&session->s_cap_releases_done, 1396 struct ceph_msg, list_head); 1397 list_del_init(&msg->list_head); 1398 1399 head = msg->front.iov_base; 1400 num = le32_to_cpu(head->num); 1401 dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, 1402 num); 1403 session->s_num_cap_releases += num; 1404 head->num = cpu_to_le32(0); 1405 msg->front.iov_len = sizeof(*head); 1406 list_add(&msg->list_head, &session->s_cap_releases); 1407 } 1408 1409 spin_unlock(&session->s_cap_lock); 1410 } 1411 1412 /* 1413 * requests 1414 */ 1415 1416 /* 1417 * Create an mds request. 
1418 */ 1419 struct ceph_mds_request * 1420 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 1421 { 1422 struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS); 1423 1424 if (!req) 1425 return ERR_PTR(-ENOMEM); 1426 1427 mutex_init(&req->r_fill_mutex); 1428 req->r_mdsc = mdsc; 1429 req->r_started = jiffies; 1430 req->r_resend_mds = -1; 1431 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 1432 req->r_fmode = -1; 1433 kref_init(&req->r_kref); 1434 INIT_LIST_HEAD(&req->r_wait); 1435 init_completion(&req->r_completion); 1436 init_completion(&req->r_safe_completion); 1437 INIT_LIST_HEAD(&req->r_unsafe_item); 1438 1439 req->r_op = op; 1440 req->r_direct_mode = mode; 1441 return req; 1442 } 1443 1444 /* 1445 * return oldest (lowest) request, tid in request tree, 0 if none. 1446 * 1447 * called under mdsc->mutex. 1448 */ 1449 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 1450 { 1451 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 1452 return NULL; 1453 return rb_entry(rb_first(&mdsc->request_tree), 1454 struct ceph_mds_request, r_node); 1455 } 1456 1457 static u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 1458 { 1459 struct ceph_mds_request *req = __get_oldest_req(mdsc); 1460 1461 if (req) 1462 return req->r_tid; 1463 return 0; 1464 } 1465 1466 /* 1467 * Build a dentry's path. Allocate on heap; caller must kfree. Based 1468 * on build_path_from_dentry in fs/cifs/dir.c. 1469 * 1470 * If @stop_on_nosnap, generate path relative to the first non-snapped 1471 * inode. 1472 * 1473 * Encode hidden .snap dirs as a double /, i.e. 1474 * foo/.snap/bar -> foo//bar 1475 */ 1476 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base, 1477 int stop_on_nosnap) 1478 { 1479 struct dentry *temp; 1480 char *path; 1481 int len, pos; 1482 unsigned seq; 1483 1484 if (dentry == NULL) 1485 return ERR_PTR(-EINVAL); 1486 1487 retry: 1488 len = 0; 1489 seq = read_seqbegin(&rename_lock); 1490 rcu_read_lock(); 1491 for (temp = dentry; !IS_ROOT(temp);) { 1492 struct inode *inode = temp->d_inode; 1493 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) 1494 len++; /* slash only */ 1495 else if (stop_on_nosnap && inode && 1496 ceph_snap(inode) == CEPH_NOSNAP) 1497 break; 1498 else 1499 len += 1 + temp->d_name.len; 1500 temp = temp->d_parent; 1501 } 1502 rcu_read_unlock(); 1503 if (len) 1504 len--; /* no leading '/' */ 1505 1506 path = kmalloc(len+1, GFP_NOFS); 1507 if (path == NULL) 1508 return ERR_PTR(-ENOMEM); 1509 pos = len; 1510 path[pos] = 0; /* trailing null */ 1511 rcu_read_lock(); 1512 for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) { 1513 struct inode *inode; 1514 1515 spin_lock(&temp->d_lock); 1516 inode = temp->d_inode; 1517 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 1518 dout("build_path path+%d: %p SNAPDIR\n", 1519 pos, temp); 1520 } else if (stop_on_nosnap && inode && 1521 ceph_snap(inode) == CEPH_NOSNAP) { 1522 spin_unlock(&temp->d_lock); 1523 break; 1524 } else { 1525 pos -= temp->d_name.len; 1526 if (pos < 0) { 1527 spin_unlock(&temp->d_lock); 1528 break; 1529 } 1530 strncpy(path + pos, temp->d_name.name, 1531 temp->d_name.len); 1532 } 1533 spin_unlock(&temp->d_lock); 1534 if (pos) 1535 path[--pos] = '/'; 1536 temp = temp->d_parent; 1537 } 1538 rcu_read_unlock(); 1539 if (pos != 0 || read_seqretry(&rename_lock, seq)) { 1540 pr_err("build_path did not end path lookup where " 1541 "expected, namelen is %d, pos is %d\n", len, pos); 1542 /* presumably this is only possible if racing with a 1543 rename of one of the parent directories (we can 
not 1544 lock the dentries above us to prevent this, but 1545 retrying should be harmless) */ 1546 kfree(path); 1547 goto retry; 1548 } 1549 1550 *base = ceph_ino(temp->d_inode); 1551 *plen = len; 1552 dout("build_path on %p %d built %llx '%.*s'\n", 1553 dentry, dentry->d_count, *base, len, path); 1554 return path; 1555 } 1556 1557 static int build_dentry_path(struct dentry *dentry, 1558 const char **ppath, int *ppathlen, u64 *pino, 1559 int *pfreepath) 1560 { 1561 char *path; 1562 1563 if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) { 1564 *pino = ceph_ino(dentry->d_parent->d_inode); 1565 *ppath = dentry->d_name.name; 1566 *ppathlen = dentry->d_name.len; 1567 return 0; 1568 } 1569 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 1570 if (IS_ERR(path)) 1571 return PTR_ERR(path); 1572 *ppath = path; 1573 *pfreepath = 1; 1574 return 0; 1575 } 1576 1577 static int build_inode_path(struct inode *inode, 1578 const char **ppath, int *ppathlen, u64 *pino, 1579 int *pfreepath) 1580 { 1581 struct dentry *dentry; 1582 char *path; 1583 1584 if (ceph_snap(inode) == CEPH_NOSNAP) { 1585 *pino = ceph_ino(inode); 1586 *ppathlen = 0; 1587 return 0; 1588 } 1589 dentry = d_find_alias(inode); 1590 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 1591 dput(dentry); 1592 if (IS_ERR(path)) 1593 return PTR_ERR(path); 1594 *ppath = path; 1595 *pfreepath = 1; 1596 return 0; 1597 } 1598 1599 /* 1600 * request arguments may be specified via an inode *, a dentry *, or 1601 * an explicit ino+path. 1602 */ 1603 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, 1604 const char *rpath, u64 rino, 1605 const char **ppath, int *pathlen, 1606 u64 *ino, int *freepath) 1607 { 1608 int r = 0; 1609 1610 if (rinode) { 1611 r = build_inode_path(rinode, ppath, pathlen, ino, freepath); 1612 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 1613 ceph_snap(rinode)); 1614 } else if (rdentry) { 1615 r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath); 1616 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, 1617 *ppath); 1618 } else if (rpath || rino) { 1619 *ino = rino; 1620 *ppath = rpath; 1621 *pathlen = rpath ? 
strlen(rpath) : 0; 1622 dout(" path %.*s\n", *pathlen, rpath); 1623 } 1624 1625 return r; 1626 } 1627 1628 /* 1629 * called under mdsc->mutex 1630 */ 1631 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, 1632 struct ceph_mds_request *req, 1633 int mds) 1634 { 1635 struct ceph_msg *msg; 1636 struct ceph_mds_request_head *head; 1637 const char *path1 = NULL; 1638 const char *path2 = NULL; 1639 u64 ino1 = 0, ino2 = 0; 1640 int pathlen1 = 0, pathlen2 = 0; 1641 int freepath1 = 0, freepath2 = 0; 1642 int len; 1643 u16 releases; 1644 void *p, *end; 1645 int ret; 1646 1647 ret = set_request_path_attr(req->r_inode, req->r_dentry, 1648 req->r_path1, req->r_ino1.ino, 1649 &path1, &pathlen1, &ino1, &freepath1); 1650 if (ret < 0) { 1651 msg = ERR_PTR(ret); 1652 goto out; 1653 } 1654 1655 ret = set_request_path_attr(NULL, req->r_old_dentry, 1656 req->r_path2, req->r_ino2.ino, 1657 &path2, &pathlen2, &ino2, &freepath2); 1658 if (ret < 0) { 1659 msg = ERR_PTR(ret); 1660 goto out_free1; 1661 } 1662 1663 len = sizeof(*head) + 1664 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)); 1665 1666 /* calculate (max) length for cap releases */ 1667 len += sizeof(struct ceph_mds_request_release) * 1668 (!!req->r_inode_drop + !!req->r_dentry_drop + 1669 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 1670 if (req->r_dentry_drop) 1671 len += req->r_dentry->d_name.len; 1672 if (req->r_old_dentry_drop) 1673 len += req->r_old_dentry->d_name.len; 1674 1675 msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false); 1676 if (!msg) { 1677 msg = ERR_PTR(-ENOMEM); 1678 goto out_free2; 1679 } 1680 1681 msg->hdr.tid = cpu_to_le64(req->r_tid); 1682 1683 head = msg->front.iov_base; 1684 p = msg->front.iov_base + sizeof(*head); 1685 end = msg->front.iov_base + msg->front.iov_len; 1686 1687 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 1688 head->op = cpu_to_le32(req->r_op); 1689 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid)); 1690 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid)); 1691 head->args = req->r_args; 1692 1693 ceph_encode_filepath(&p, end, ino1, path1); 1694 ceph_encode_filepath(&p, end, ino2, path2); 1695 1696 /* make note of release offset, in case we need to replay */ 1697 req->r_request_release_offset = p - msg->front.iov_base; 1698 1699 /* cap releases */ 1700 releases = 0; 1701 if (req->r_inode_drop) 1702 releases += ceph_encode_inode_release(&p, 1703 req->r_inode ? 
req->r_inode : req->r_dentry->d_inode, 1704 mds, req->r_inode_drop, req->r_inode_unless, 0); 1705 if (req->r_dentry_drop) 1706 releases += ceph_encode_dentry_release(&p, req->r_dentry, 1707 mds, req->r_dentry_drop, req->r_dentry_unless); 1708 if (req->r_old_dentry_drop) 1709 releases += ceph_encode_dentry_release(&p, req->r_old_dentry, 1710 mds, req->r_old_dentry_drop, req->r_old_dentry_unless); 1711 if (req->r_old_inode_drop) 1712 releases += ceph_encode_inode_release(&p, 1713 req->r_old_dentry->d_inode, 1714 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 1715 head->num_releases = cpu_to_le16(releases); 1716 1717 BUG_ON(p > end); 1718 msg->front.iov_len = p - msg->front.iov_base; 1719 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1720 1721 msg->pages = req->r_pages; 1722 msg->nr_pages = req->r_num_pages; 1723 msg->hdr.data_len = cpu_to_le32(req->r_data_len); 1724 msg->hdr.data_off = cpu_to_le16(0); 1725 1726 out_free2: 1727 if (freepath2) 1728 kfree((char *)path2); 1729 out_free1: 1730 if (freepath1) 1731 kfree((char *)path1); 1732 out: 1733 return msg; 1734 } 1735 1736 /* 1737 * called under mdsc->mutex if error, under no mutex if 1738 * success. 1739 */ 1740 static void complete_request(struct ceph_mds_client *mdsc, 1741 struct ceph_mds_request *req) 1742 { 1743 if (req->r_callback) 1744 req->r_callback(mdsc, req); 1745 else 1746 complete_all(&req->r_completion); 1747 } 1748 1749 /* 1750 * called under mdsc->mutex 1751 */ 1752 static int __prepare_send_request(struct ceph_mds_client *mdsc, 1753 struct ceph_mds_request *req, 1754 int mds) 1755 { 1756 struct ceph_mds_request_head *rhead; 1757 struct ceph_msg *msg; 1758 int flags = 0; 1759 1760 req->r_attempts++; 1761 if (req->r_inode) { 1762 struct ceph_cap *cap = 1763 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 1764 1765 if (cap) 1766 req->r_sent_on_mseq = cap->mseq; 1767 else 1768 req->r_sent_on_mseq = -1; 1769 } 1770 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, 1771 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 1772 1773 if (req->r_got_unsafe) { 1774 /* 1775 * Replay. Do not regenerate message (and rebuild 1776 * paths, etc.); just use the original message. 1777 * Rebuilding paths will break for renames because 1778 * d_move mangles the src name. 
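 *
 * (The replay path below only patches the existing message: it sets
 * CEPH_MDS_FLAG_REPLAY, points rhead->ino at the target inode if
 * there is one, and truncates the front at r_request_release_offset
 * so no stale cap/dentry releases are resent.)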
1779 */ 1780 msg = req->r_request; 1781 rhead = msg->front.iov_base; 1782 1783 flags = le32_to_cpu(rhead->flags); 1784 flags |= CEPH_MDS_FLAG_REPLAY; 1785 rhead->flags = cpu_to_le32(flags); 1786 1787 if (req->r_target_inode) 1788 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 1789 1790 rhead->num_retry = req->r_attempts - 1; 1791 1792 /* remove cap/dentry releases from message */ 1793 rhead->num_releases = 0; 1794 msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset); 1795 msg->front.iov_len = req->r_request_release_offset; 1796 return 0; 1797 } 1798 1799 if (req->r_request) { 1800 ceph_msg_put(req->r_request); 1801 req->r_request = NULL; 1802 } 1803 msg = create_request_message(mdsc, req, mds); 1804 if (IS_ERR(msg)) { 1805 req->r_err = PTR_ERR(msg); 1806 complete_request(mdsc, req); 1807 return PTR_ERR(msg); 1808 } 1809 req->r_request = msg; 1810 1811 rhead = msg->front.iov_base; 1812 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 1813 if (req->r_got_unsafe) 1814 flags |= CEPH_MDS_FLAG_REPLAY; 1815 if (req->r_locked_dir) 1816 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 1817 rhead->flags = cpu_to_le32(flags); 1818 rhead->num_fwd = req->r_num_fwd; 1819 rhead->num_retry = req->r_attempts - 1; 1820 rhead->ino = 0; 1821 1822 dout(" r_locked_dir = %p\n", req->r_locked_dir); 1823 return 0; 1824 } 1825 1826 /* 1827 * send request, or put it on the appropriate wait list. 1828 */ 1829 static int __do_request(struct ceph_mds_client *mdsc, 1830 struct ceph_mds_request *req) 1831 { 1832 struct ceph_mds_session *session = NULL; 1833 int mds = -1; 1834 int err = -EAGAIN; 1835 1836 if (req->r_err || req->r_got_result) 1837 goto out; 1838 1839 if (req->r_timeout && 1840 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 1841 dout("do_request timed out\n"); 1842 err = -EIO; 1843 goto finish; 1844 } 1845 1846 put_request_session(req); 1847 1848 mds = __choose_mds(mdsc, req); 1849 if (mds < 0 || 1850 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 1851 dout("do_request no mds or not active, waiting for map\n"); 1852 list_add(&req->r_wait, &mdsc->waiting_for_map); 1853 goto out; 1854 } 1855 1856 /* get, open session */ 1857 session = __ceph_lookup_mds_session(mdsc, mds); 1858 if (!session) { 1859 session = register_session(mdsc, mds); 1860 if (IS_ERR(session)) { 1861 err = PTR_ERR(session); 1862 goto finish; 1863 } 1864 } 1865 req->r_session = get_session(session); 1866 1867 dout("do_request mds%d session %p state %s\n", mds, session, 1868 session_state_name(session->s_state)); 1869 if (session->s_state != CEPH_MDS_SESSION_OPEN && 1870 session->s_state != CEPH_MDS_SESSION_HUNG) { 1871 if (session->s_state == CEPH_MDS_SESSION_NEW || 1872 session->s_state == CEPH_MDS_SESSION_CLOSING) 1873 __open_session(mdsc, session); 1874 list_add(&req->r_wait, &session->s_waiting); 1875 goto out_session; 1876 } 1877 1878 /* send request */ 1879 req->r_resend_mds = -1; /* forget any previous mds hint */ 1880 1881 if (req->r_request_started == 0) /* note request start time */ 1882 req->r_request_started = jiffies; 1883 1884 err = __prepare_send_request(mdsc, req, mds); 1885 if (!err) { 1886 ceph_msg_get(req->r_request); 1887 ceph_con_send(&session->s_con, req->r_request); 1888 } 1889 1890 out_session: 1891 ceph_put_mds_session(session); 1892 out: 1893 return err; 1894 1895 finish: 1896 req->r_err = err; 1897 complete_request(mdsc, req); 1898 goto out; 1899 } 1900 1901 /* 1902 * called under mdsc->mutex 1903 */ 1904 static void __wake_requests(struct ceph_mds_client 
*mdsc,
			    struct list_head *head)
{
	struct ceph_mds_request *req;
	LIST_HEAD(tmp_list);

	list_splice_init(head, &tmp_list);

	while (!list_empty(&tmp_list)) {
		req = list_entry(tmp_list.next,
				 struct ceph_mds_request, r_wait);
		list_del_init(&req->r_wait);
		__do_request(mdsc, req);
	}
}

/*
 * Wake up threads with requests pending for @mds, so that they can
 * resubmit their requests to a possibly different mds.
 */
static void kick_requests(struct ceph_mds_client *mdsc, int mds)
{
	struct ceph_mds_request *req;
	struct rb_node *p;

	dout("kick_requests mds%d\n", mds);
	for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		if (req->r_got_unsafe)
			continue;
		if (req->r_session &&
		    req->r_session->s_mds == mds) {
			dout(" kicking tid %llu\n", req->r_tid);
			__do_request(mdsc, req);
		}
	}
}

void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
			      struct ceph_mds_request *req)
{
	dout("submit_request on %p\n", req);
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, NULL);
	__do_request(mdsc, req);
	mutex_unlock(&mdsc->mutex);
}

/*
 * Synchronously perform an mds request.  Take care of all of the
 * session setup, forwarding, retry details.
 */
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
			 struct inode *dir,
			 struct ceph_mds_request *req)
{
	int err;

	dout("do_request on %p\n", req);

	/* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
	if (req->r_inode)
		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
	if (req->r_locked_dir)
		ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
	if (req->r_old_dentry)
		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);

	/* issue */
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, dir);
	__do_request(mdsc, req);

	if (req->r_err) {
		err = req->r_err;
		__unregister_request(mdsc, req);
		dout("do_request early error %d\n", err);
		goto out;
	}

	/* wait */
	mutex_unlock(&mdsc->mutex);
	dout("do_request waiting\n");
	if (req->r_timeout) {
		err = (long)wait_for_completion_killable_timeout(
			&req->r_completion, req->r_timeout);
		if (err == 0)
			err = -EIO;
	} else {
		err = wait_for_completion_killable(&req->r_completion);
	}
	dout("do_request waited, got %d\n", err);
	mutex_lock(&mdsc->mutex);

	/* only abort if we didn't race with a real reply */
	if (req->r_got_result) {
		err = le32_to_cpu(req->r_reply_info.head->result);
	} else if (err < 0) {
		dout("aborted request %lld with %d\n", req->r_tid, err);

		/*
		 * ensure we aren't running concurrently with
		 * ceph_fill_trace or ceph_readdir_prepopulate, which
		 * rely on locks (dir mutex) held by our caller.
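		 *
		 * (The idea: taking r_fill_mutex here serializes with
		 * the reply-fill path, and setting r_aborted marks the
		 * request so later reply processing can skip filling
		 * in the trace.)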
2009 */ 2010 mutex_lock(&req->r_fill_mutex); 2011 req->r_err = err; 2012 req->r_aborted = true; 2013 mutex_unlock(&req->r_fill_mutex); 2014 2015 if (req->r_locked_dir && 2016 (req->r_op & CEPH_MDS_OP_WRITE)) 2017 ceph_invalidate_dir_request(req); 2018 } else { 2019 err = req->r_err; 2020 } 2021 2022 out: 2023 mutex_unlock(&mdsc->mutex); 2024 dout("do_request %p done, result %d\n", req, err); 2025 return err; 2026 } 2027 2028 /* 2029 * Invalidate dir D_COMPLETE, dentry lease state on an aborted MDS 2030 * namespace request. 2031 */ 2032 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 2033 { 2034 struct inode *inode = req->r_locked_dir; 2035 struct ceph_inode_info *ci = ceph_inode(inode); 2036 2037 dout("invalidate_dir_request %p (D_COMPLETE, lease(s))\n", inode); 2038 spin_lock(&ci->i_ceph_lock); 2039 ceph_dir_clear_complete(inode); 2040 ci->i_release_count++; 2041 spin_unlock(&ci->i_ceph_lock); 2042 2043 if (req->r_dentry) 2044 ceph_invalidate_dentry_lease(req->r_dentry); 2045 if (req->r_old_dentry) 2046 ceph_invalidate_dentry_lease(req->r_old_dentry); 2047 } 2048 2049 /* 2050 * Handle mds reply. 2051 * 2052 * We take the session mutex and parse and process the reply immediately. 2053 * This preserves the logical ordering of replies, capabilities, etc., sent 2054 * by the MDS as they are applied to our local cache. 2055 */ 2056 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 2057 { 2058 struct ceph_mds_client *mdsc = session->s_mdsc; 2059 struct ceph_mds_request *req; 2060 struct ceph_mds_reply_head *head = msg->front.iov_base; 2061 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 2062 u64 tid; 2063 int err, result; 2064 int mds = session->s_mds; 2065 2066 if (msg->front.iov_len < sizeof(*head)) { 2067 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 2068 ceph_msg_dump(msg); 2069 return; 2070 } 2071 2072 /* get request, session */ 2073 tid = le64_to_cpu(msg->hdr.tid); 2074 mutex_lock(&mdsc->mutex); 2075 req = __lookup_request(mdsc, tid); 2076 if (!req) { 2077 dout("handle_reply on unknown tid %llu\n", tid); 2078 mutex_unlock(&mdsc->mutex); 2079 return; 2080 } 2081 dout("handle_reply %p\n", req); 2082 2083 /* correct session? */ 2084 if (req->r_session != session) { 2085 pr_err("mdsc_handle_reply got %llu on session mds%d" 2086 " not mds%d\n", tid, session->s_mds, 2087 req->r_session ? req->r_session->s_mds : -1); 2088 mutex_unlock(&mdsc->mutex); 2089 goto out; 2090 } 2091 2092 /* dup? */ 2093 if ((req->r_got_unsafe && !head->safe) || 2094 (req->r_got_safe && head->safe)) { 2095 pr_warning("got a dup %s reply on %llu from mds%d\n", 2096 head->safe ? 
"safe" : "unsafe", tid, mds); 2097 mutex_unlock(&mdsc->mutex); 2098 goto out; 2099 } 2100 if (req->r_got_safe && !head->safe) { 2101 pr_warning("got unsafe after safe on %llu from mds%d\n", 2102 tid, mds); 2103 mutex_unlock(&mdsc->mutex); 2104 goto out; 2105 } 2106 2107 result = le32_to_cpu(head->result); 2108 2109 /* 2110 * Handle an ESTALE 2111 * if we're not talking to the authority, send to them 2112 * if the authority has changed while we weren't looking, 2113 * send to new authority 2114 * Otherwise we just have to return an ESTALE 2115 */ 2116 if (result == -ESTALE) { 2117 dout("got ESTALE on request %llu", req->r_tid); 2118 if (!req->r_inode) { 2119 /* do nothing; not an authority problem */ 2120 } else if (req->r_direct_mode != USE_AUTH_MDS) { 2121 dout("not using auth, setting for that now"); 2122 req->r_direct_mode = USE_AUTH_MDS; 2123 __do_request(mdsc, req); 2124 mutex_unlock(&mdsc->mutex); 2125 goto out; 2126 } else { 2127 struct ceph_inode_info *ci = ceph_inode(req->r_inode); 2128 struct ceph_cap *cap = NULL; 2129 2130 if (req->r_session) 2131 cap = ceph_get_cap_for_mds(ci, 2132 req->r_session->s_mds); 2133 2134 dout("already using auth"); 2135 if ((!cap || cap != ci->i_auth_cap) || 2136 (cap->mseq != req->r_sent_on_mseq)) { 2137 dout("but cap changed, so resending"); 2138 __do_request(mdsc, req); 2139 mutex_unlock(&mdsc->mutex); 2140 goto out; 2141 } 2142 } 2143 dout("have to return ESTALE on request %llu", req->r_tid); 2144 } 2145 2146 2147 if (head->safe) { 2148 req->r_got_safe = true; 2149 __unregister_request(mdsc, req); 2150 complete_all(&req->r_safe_completion); 2151 2152 if (req->r_got_unsafe) { 2153 /* 2154 * We already handled the unsafe response, now do the 2155 * cleanup. No need to examine the response; the MDS 2156 * doesn't include any result info in the safe 2157 * response. And even if it did, there is nothing 2158 * useful we could do with a revised return value. 2159 */ 2160 dout("got safe reply %llu, mds%d\n", tid, mds); 2161 list_del_init(&req->r_unsafe_item); 2162 2163 /* last unsafe request during umount? 
*/ 2164 if (mdsc->stopping && !__get_oldest_req(mdsc)) 2165 complete_all(&mdsc->safe_umount_waiters); 2166 mutex_unlock(&mdsc->mutex); 2167 goto out; 2168 } 2169 } else { 2170 req->r_got_unsafe = true; 2171 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 2172 } 2173 2174 dout("handle_reply tid %lld result %d\n", tid, result); 2175 rinfo = &req->r_reply_info; 2176 err = parse_reply_info(msg, rinfo, session->s_con.peer_features); 2177 mutex_unlock(&mdsc->mutex); 2178 2179 mutex_lock(&session->s_mutex); 2180 if (err < 0) { 2181 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 2182 ceph_msg_dump(msg); 2183 goto out_err; 2184 } 2185 2186 /* snap trace */ 2187 if (rinfo->snapblob_len) { 2188 down_write(&mdsc->snap_rwsem); 2189 ceph_update_snap_trace(mdsc, rinfo->snapblob, 2190 rinfo->snapblob + rinfo->snapblob_len, 2191 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP); 2192 downgrade_write(&mdsc->snap_rwsem); 2193 } else { 2194 down_read(&mdsc->snap_rwsem); 2195 } 2196 2197 /* insert trace into our cache */ 2198 mutex_lock(&req->r_fill_mutex); 2199 err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session); 2200 if (err == 0) { 2201 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 2202 req->r_op == CEPH_MDS_OP_LSSNAP) && 2203 rinfo->dir_nr) 2204 ceph_readdir_prepopulate(req, req->r_session); 2205 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 2206 } 2207 mutex_unlock(&req->r_fill_mutex); 2208 2209 up_read(&mdsc->snap_rwsem); 2210 out_err: 2211 mutex_lock(&mdsc->mutex); 2212 if (!req->r_aborted) { 2213 if (err) { 2214 req->r_err = err; 2215 } else { 2216 req->r_reply = msg; 2217 ceph_msg_get(msg); 2218 req->r_got_result = true; 2219 } 2220 } else { 2221 dout("reply arrived after request %lld was aborted\n", tid); 2222 } 2223 mutex_unlock(&mdsc->mutex); 2224 2225 ceph_add_cap_releases(mdsc, req->r_session); 2226 mutex_unlock(&session->s_mutex); 2227 2228 /* kick calling process */ 2229 complete_request(mdsc, req); 2230 out: 2231 ceph_mdsc_put_request(req); 2232 return; 2233 } 2234 2235 2236 2237 /* 2238 * handle mds notification that our request has been forwarded. 2239 */ 2240 static void handle_forward(struct ceph_mds_client *mdsc, 2241 struct ceph_mds_session *session, 2242 struct ceph_msg *msg) 2243 { 2244 struct ceph_mds_request *req; 2245 u64 tid = le64_to_cpu(msg->hdr.tid); 2246 u32 next_mds; 2247 u32 fwd_seq; 2248 int err = -EINVAL; 2249 void *p = msg->front.iov_base; 2250 void *end = p + msg->front.iov_len; 2251 2252 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 2253 next_mds = ceph_decode_32(&p); 2254 fwd_seq = ceph_decode_32(&p); 2255 2256 mutex_lock(&mdsc->mutex); 2257 req = __lookup_request(mdsc, tid); 2258 if (!req) { 2259 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 2260 goto out; /* dup reply? */ 2261 } 2262 2263 if (req->r_aborted) { 2264 dout("forward tid %llu aborted, unregistering\n", tid); 2265 __unregister_request(mdsc, req); 2266 } else if (fwd_seq <= req->r_num_fwd) { 2267 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 2268 tid, next_mds, req->r_num_fwd, fwd_seq); 2269 } else { 2270 /* resend. 
forward race not possible; mds would drop */ 2271 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 2272 BUG_ON(req->r_err); 2273 BUG_ON(req->r_got_result); 2274 req->r_num_fwd = fwd_seq; 2275 req->r_resend_mds = next_mds; 2276 put_request_session(req); 2277 __do_request(mdsc, req); 2278 } 2279 ceph_mdsc_put_request(req); 2280 out: 2281 mutex_unlock(&mdsc->mutex); 2282 return; 2283 2284 bad: 2285 pr_err("mdsc_handle_forward decode error err=%d\n", err); 2286 } 2287 2288 /* 2289 * handle a mds session control message 2290 */ 2291 static void handle_session(struct ceph_mds_session *session, 2292 struct ceph_msg *msg) 2293 { 2294 struct ceph_mds_client *mdsc = session->s_mdsc; 2295 u32 op; 2296 u64 seq; 2297 int mds = session->s_mds; 2298 struct ceph_mds_session_head *h = msg->front.iov_base; 2299 int wake = 0; 2300 2301 /* decode */ 2302 if (msg->front.iov_len != sizeof(*h)) 2303 goto bad; 2304 op = le32_to_cpu(h->op); 2305 seq = le64_to_cpu(h->seq); 2306 2307 mutex_lock(&mdsc->mutex); 2308 if (op == CEPH_SESSION_CLOSE) 2309 __unregister_session(mdsc, session); 2310 /* FIXME: this ttl calculation is generous */ 2311 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 2312 mutex_unlock(&mdsc->mutex); 2313 2314 mutex_lock(&session->s_mutex); 2315 2316 dout("handle_session mds%d %s %p state %s seq %llu\n", 2317 mds, ceph_session_op_name(op), session, 2318 session_state_name(session->s_state), seq); 2319 2320 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 2321 session->s_state = CEPH_MDS_SESSION_OPEN; 2322 pr_info("mds%d came back\n", session->s_mds); 2323 } 2324 2325 switch (op) { 2326 case CEPH_SESSION_OPEN: 2327 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 2328 pr_info("mds%d reconnect success\n", session->s_mds); 2329 session->s_state = CEPH_MDS_SESSION_OPEN; 2330 renewed_caps(mdsc, session, 0); 2331 wake = 1; 2332 if (mdsc->stopping) 2333 __close_session(mdsc, session); 2334 break; 2335 2336 case CEPH_SESSION_RENEWCAPS: 2337 if (session->s_renew_seq == seq) 2338 renewed_caps(mdsc, session, 1); 2339 break; 2340 2341 case CEPH_SESSION_CLOSE: 2342 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 2343 pr_info("mds%d reconnect denied\n", session->s_mds); 2344 remove_session_caps(session); 2345 wake = 1; /* for good measure */ 2346 wake_up_all(&mdsc->session_close_wq); 2347 kick_requests(mdsc, mds); 2348 break; 2349 2350 case CEPH_SESSION_STALE: 2351 pr_info("mds%d caps went stale, renewing\n", 2352 session->s_mds); 2353 spin_lock(&session->s_gen_ttl_lock); 2354 session->s_cap_gen++; 2355 session->s_cap_ttl = jiffies - 1; 2356 spin_unlock(&session->s_gen_ttl_lock); 2357 send_renew_caps(mdsc, session); 2358 break; 2359 2360 case CEPH_SESSION_RECALL_STATE: 2361 trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 2362 break; 2363 2364 default: 2365 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 2366 WARN_ON(1); 2367 } 2368 2369 mutex_unlock(&session->s_mutex); 2370 if (wake) { 2371 mutex_lock(&mdsc->mutex); 2372 __wake_requests(mdsc, &session->s_waiting); 2373 mutex_unlock(&mdsc->mutex); 2374 } 2375 return; 2376 2377 bad: 2378 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 2379 (int)msg->front.iov_len); 2380 ceph_msg_dump(msg); 2381 return; 2382 } 2383 2384 2385 /* 2386 * called under session->mutex. 
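 * Each unsafe request is simply re-sent on the new connection;
 * __prepare_send_request() marks it as a REPLAY because r_got_unsafe
 * is already set.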
2387 */ 2388 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 2389 struct ceph_mds_session *session) 2390 { 2391 struct ceph_mds_request *req, *nreq; 2392 int err; 2393 2394 dout("replay_unsafe_requests mds%d\n", session->s_mds); 2395 2396 mutex_lock(&mdsc->mutex); 2397 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) { 2398 err = __prepare_send_request(mdsc, req, session->s_mds); 2399 if (!err) { 2400 ceph_msg_get(req->r_request); 2401 ceph_con_send(&session->s_con, req->r_request); 2402 } 2403 } 2404 mutex_unlock(&mdsc->mutex); 2405 } 2406 2407 /* 2408 * Encode information about a cap for a reconnect with the MDS. 2409 */ 2410 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, 2411 void *arg) 2412 { 2413 union { 2414 struct ceph_mds_cap_reconnect v2; 2415 struct ceph_mds_cap_reconnect_v1 v1; 2416 } rec; 2417 size_t reclen; 2418 struct ceph_inode_info *ci; 2419 struct ceph_reconnect_state *recon_state = arg; 2420 struct ceph_pagelist *pagelist = recon_state->pagelist; 2421 char *path; 2422 int pathlen, err; 2423 u64 pathbase; 2424 struct dentry *dentry; 2425 2426 ci = cap->ci; 2427 2428 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 2429 inode, ceph_vinop(inode), cap, cap->cap_id, 2430 ceph_cap_string(cap->issued)); 2431 err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 2432 if (err) 2433 return err; 2434 2435 dentry = d_find_alias(inode); 2436 if (dentry) { 2437 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0); 2438 if (IS_ERR(path)) { 2439 err = PTR_ERR(path); 2440 goto out_dput; 2441 } 2442 } else { 2443 path = NULL; 2444 pathlen = 0; 2445 } 2446 err = ceph_pagelist_encode_string(pagelist, path, pathlen); 2447 if (err) 2448 goto out_free; 2449 2450 spin_lock(&ci->i_ceph_lock); 2451 cap->seq = 0; /* reset cap seq */ 2452 cap->issue_seq = 0; /* and issue_seq */ 2453 2454 if (recon_state->flock) { 2455 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 2456 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 2457 rec.v2.issued = cpu_to_le32(cap->issued); 2458 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 2459 rec.v2.pathbase = cpu_to_le64(pathbase); 2460 rec.v2.flock_len = 0; 2461 reclen = sizeof(rec.v2); 2462 } else { 2463 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 2464 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 2465 rec.v1.issued = cpu_to_le32(cap->issued); 2466 rec.v1.size = cpu_to_le64(inode->i_size); 2467 ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime); 2468 ceph_encode_timespec(&rec.v1.atime, &inode->i_atime); 2469 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 2470 rec.v1.pathbase = cpu_to_le64(pathbase); 2471 reclen = sizeof(rec.v1); 2472 } 2473 spin_unlock(&ci->i_ceph_lock); 2474 2475 if (recon_state->flock) { 2476 int num_fcntl_locks, num_flock_locks; 2477 struct ceph_pagelist_cursor trunc_point; 2478 2479 ceph_pagelist_set_cursor(pagelist, &trunc_point); 2480 do { 2481 lock_flocks(); 2482 ceph_count_locks(inode, &num_fcntl_locks, 2483 &num_flock_locks); 2484 rec.v2.flock_len = (2*sizeof(u32) + 2485 (num_fcntl_locks+num_flock_locks) * 2486 sizeof(struct ceph_filelock)); 2487 unlock_flocks(); 2488 2489 /* pre-alloc pagelist */ 2490 ceph_pagelist_truncate(pagelist, &trunc_point); 2491 err = ceph_pagelist_append(pagelist, &rec, reclen); 2492 if (!err) 2493 err = ceph_pagelist_reserve(pagelist, 2494 rec.v2.flock_len); 2495 2496 /* encode locks */ 2497 if (!err) { 2498 lock_flocks(); 2499 err = ceph_encode_locks(inode, 2500 pagelist, 2501 num_fcntl_locks, 2502 num_flock_locks); 2503 
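				/* an -ENOSPC here usually means locks changed
				 * after we counted and reserved space for
				 * them; the enclosing do/while simply retries */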
unlock_flocks(); 2504 } 2505 } while (err == -ENOSPC); 2506 } else { 2507 err = ceph_pagelist_append(pagelist, &rec, reclen); 2508 } 2509 2510 out_free: 2511 kfree(path); 2512 out_dput: 2513 dput(dentry); 2514 return err; 2515 } 2516 2517 2518 /* 2519 * If an MDS fails and recovers, clients need to reconnect in order to 2520 * reestablish shared state. This includes all caps issued through 2521 * this session _and_ the snap_realm hierarchy. Because it's not 2522 * clear which snap realms the mds cares about, we send everything we 2523 * know about.. that ensures we'll then get any new info the 2524 * recovering MDS might have. 2525 * 2526 * This is a relatively heavyweight operation, but it's rare. 2527 * 2528 * called with mdsc->mutex held. 2529 */ 2530 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 2531 struct ceph_mds_session *session) 2532 { 2533 struct ceph_msg *reply; 2534 struct rb_node *p; 2535 int mds = session->s_mds; 2536 int err = -ENOMEM; 2537 struct ceph_pagelist *pagelist; 2538 struct ceph_reconnect_state recon_state; 2539 2540 pr_info("mds%d reconnect start\n", mds); 2541 2542 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); 2543 if (!pagelist) 2544 goto fail_nopagelist; 2545 ceph_pagelist_init(pagelist); 2546 2547 reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false); 2548 if (!reply) 2549 goto fail_nomsg; 2550 2551 mutex_lock(&session->s_mutex); 2552 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 2553 session->s_seq = 0; 2554 2555 ceph_con_close(&session->s_con); 2556 ceph_con_open(&session->s_con, 2557 CEPH_ENTITY_TYPE_MDS, mds, 2558 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 2559 2560 /* replay unsafe requests */ 2561 replay_unsafe_requests(mdsc, session); 2562 2563 down_read(&mdsc->snap_rwsem); 2564 2565 dout("session %p state %s\n", session, 2566 session_state_name(session->s_state)); 2567 2568 /* drop old cap expires; we're about to reestablish that state */ 2569 discard_cap_releases(mdsc, session); 2570 2571 /* traverse this session's caps */ 2572 err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps); 2573 if (err) 2574 goto fail; 2575 2576 recon_state.pagelist = pagelist; 2577 recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK; 2578 err = iterate_session_caps(session, encode_caps_cb, &recon_state); 2579 if (err < 0) 2580 goto fail; 2581 2582 /* 2583 * snaprealms. we provide mds with the ino, seq (version), and 2584 * parent for all of our realms. If the mds has any newer info, 2585 * it will tell us. 
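 * Each realm goes out as a struct ceph_mds_snaprealm_reconnect
 * (ino, seq, parent), walking our realm tree in ino order.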
2586 */ 2587 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 2588 struct ceph_snap_realm *realm = 2589 rb_entry(p, struct ceph_snap_realm, node); 2590 struct ceph_mds_snaprealm_reconnect sr_rec; 2591 2592 dout(" adding snap realm %llx seq %lld parent %llx\n", 2593 realm->ino, realm->seq, realm->parent_ino); 2594 sr_rec.ino = cpu_to_le64(realm->ino); 2595 sr_rec.seq = cpu_to_le64(realm->seq); 2596 sr_rec.parent = cpu_to_le64(realm->parent_ino); 2597 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 2598 if (err) 2599 goto fail; 2600 } 2601 2602 reply->pagelist = pagelist; 2603 if (recon_state.flock) 2604 reply->hdr.version = cpu_to_le16(2); 2605 reply->hdr.data_len = cpu_to_le32(pagelist->length); 2606 reply->nr_pages = calc_pages_for(0, pagelist->length); 2607 ceph_con_send(&session->s_con, reply); 2608 2609 mutex_unlock(&session->s_mutex); 2610 2611 mutex_lock(&mdsc->mutex); 2612 __wake_requests(mdsc, &session->s_waiting); 2613 mutex_unlock(&mdsc->mutex); 2614 2615 up_read(&mdsc->snap_rwsem); 2616 return; 2617 2618 fail: 2619 ceph_msg_put(reply); 2620 up_read(&mdsc->snap_rwsem); 2621 mutex_unlock(&session->s_mutex); 2622 fail_nomsg: 2623 ceph_pagelist_release(pagelist); 2624 kfree(pagelist); 2625 fail_nopagelist: 2626 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 2627 return; 2628 } 2629 2630 2631 /* 2632 * compare old and new mdsmaps, kicking requests 2633 * and closing out old connections as necessary 2634 * 2635 * called under mdsc->mutex. 2636 */ 2637 static void check_new_map(struct ceph_mds_client *mdsc, 2638 struct ceph_mdsmap *newmap, 2639 struct ceph_mdsmap *oldmap) 2640 { 2641 int i; 2642 int oldstate, newstate; 2643 struct ceph_mds_session *s; 2644 2645 dout("check_new_map new %u old %u\n", 2646 newmap->m_epoch, oldmap->m_epoch); 2647 2648 for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) { 2649 if (mdsc->sessions[i] == NULL) 2650 continue; 2651 s = mdsc->sessions[i]; 2652 oldstate = ceph_mdsmap_get_state(oldmap, i); 2653 newstate = ceph_mdsmap_get_state(newmap, i); 2654 2655 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 2656 i, ceph_mds_state_name(oldstate), 2657 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 2658 ceph_mds_state_name(newstate), 2659 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 2660 session_state_name(s->s_state)); 2661 2662 if (i >= newmap->m_max_mds || 2663 memcmp(ceph_mdsmap_get_addr(oldmap, i), 2664 ceph_mdsmap_get_addr(newmap, i), 2665 sizeof(struct ceph_entity_addr))) { 2666 if (s->s_state == CEPH_MDS_SESSION_OPENING) { 2667 /* the session never opened, just close it 2668 * out now */ 2669 __wake_requests(mdsc, &s->s_waiting); 2670 __unregister_session(mdsc, s); 2671 } else { 2672 /* just close it */ 2673 mutex_unlock(&mdsc->mutex); 2674 mutex_lock(&s->s_mutex); 2675 mutex_lock(&mdsc->mutex); 2676 ceph_con_close(&s->s_con); 2677 mutex_unlock(&s->s_mutex); 2678 s->s_state = CEPH_MDS_SESSION_RESTARTING; 2679 } 2680 2681 /* kick any requests waiting on the recovering mds */ 2682 kick_requests(mdsc, i); 2683 } else if (oldstate == newstate) { 2684 continue; /* nothing new with this mds */ 2685 } 2686 2687 /* 2688 * send reconnect? 2689 */ 2690 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 2691 newstate >= CEPH_MDS_STATE_RECONNECT) { 2692 mutex_unlock(&mdsc->mutex); 2693 send_mds_reconnect(mdsc, s); 2694 mutex_lock(&mdsc->mutex); 2695 } 2696 2697 /* 2698 * kick request on any mds that has gone active. 
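	 * Also nudge any cap flushes that were in flight and wake cap
	 * waiters, since the recovered MDS can service them now.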
2699 */ 2700 if (oldstate < CEPH_MDS_STATE_ACTIVE && 2701 newstate >= CEPH_MDS_STATE_ACTIVE) { 2702 if (oldstate != CEPH_MDS_STATE_CREATING && 2703 oldstate != CEPH_MDS_STATE_STARTING) 2704 pr_info("mds%d recovery completed\n", s->s_mds); 2705 kick_requests(mdsc, i); 2706 ceph_kick_flushing_caps(mdsc, s); 2707 wake_up_session_caps(s, 1); 2708 } 2709 } 2710 2711 for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) { 2712 s = mdsc->sessions[i]; 2713 if (!s) 2714 continue; 2715 if (!ceph_mdsmap_is_laggy(newmap, i)) 2716 continue; 2717 if (s->s_state == CEPH_MDS_SESSION_OPEN || 2718 s->s_state == CEPH_MDS_SESSION_HUNG || 2719 s->s_state == CEPH_MDS_SESSION_CLOSING) { 2720 dout(" connecting to export targets of laggy mds%d\n", 2721 i); 2722 __open_export_target_sessions(mdsc, s); 2723 } 2724 } 2725 } 2726 2727 2728 2729 /* 2730 * leases 2731 */ 2732 2733 /* 2734 * caller must hold session s_mutex, dentry->d_lock 2735 */ 2736 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 2737 { 2738 struct ceph_dentry_info *di = ceph_dentry(dentry); 2739 2740 ceph_put_mds_session(di->lease_session); 2741 di->lease_session = NULL; 2742 } 2743 2744 static void handle_lease(struct ceph_mds_client *mdsc, 2745 struct ceph_mds_session *session, 2746 struct ceph_msg *msg) 2747 { 2748 struct super_block *sb = mdsc->fsc->sb; 2749 struct inode *inode; 2750 struct dentry *parent, *dentry; 2751 struct ceph_dentry_info *di; 2752 int mds = session->s_mds; 2753 struct ceph_mds_lease *h = msg->front.iov_base; 2754 u32 seq; 2755 struct ceph_vino vino; 2756 struct qstr dname; 2757 int release = 0; 2758 2759 dout("handle_lease from mds%d\n", mds); 2760 2761 /* decode */ 2762 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 2763 goto bad; 2764 vino.ino = le64_to_cpu(h->ino); 2765 vino.snap = CEPH_NOSNAP; 2766 seq = le32_to_cpu(h->seq); 2767 dname.name = (void *)h + sizeof(*h) + sizeof(u32); 2768 dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32); 2769 if (dname.len != get_unaligned_le32(h+1)) 2770 goto bad; 2771 2772 mutex_lock(&session->s_mutex); 2773 session->s_seq++; 2774 2775 /* lookup inode */ 2776 inode = ceph_find_inode(sb, vino); 2777 dout("handle_lease %s, ino %llx %p %.*s\n", 2778 ceph_lease_op_name(h->action), vino.ino, inode, 2779 dname.len, dname.name); 2780 if (inode == NULL) { 2781 dout("handle_lease no inode %llx\n", vino.ino); 2782 goto release; 2783 } 2784 2785 /* dentry */ 2786 parent = d_find_alias(inode); 2787 if (!parent) { 2788 dout("no parent dentry on inode %p\n", inode); 2789 WARN_ON(1); 2790 goto release; /* hrm... 
*/ 2791 } 2792 dname.hash = full_name_hash(dname.name, dname.len); 2793 dentry = d_lookup(parent, &dname); 2794 dput(parent); 2795 if (!dentry) 2796 goto release; 2797 2798 spin_lock(&dentry->d_lock); 2799 di = ceph_dentry(dentry); 2800 switch (h->action) { 2801 case CEPH_MDS_LEASE_REVOKE: 2802 if (di->lease_session == session) { 2803 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 2804 h->seq = cpu_to_le32(di->lease_seq); 2805 __ceph_mdsc_drop_dentry_lease(dentry); 2806 } 2807 release = 1; 2808 break; 2809 2810 case CEPH_MDS_LEASE_RENEW: 2811 if (di->lease_session == session && 2812 di->lease_gen == session->s_cap_gen && 2813 di->lease_renew_from && 2814 di->lease_renew_after == 0) { 2815 unsigned long duration = 2816 le32_to_cpu(h->duration_ms) * HZ / 1000; 2817 2818 di->lease_seq = seq; 2819 dentry->d_time = di->lease_renew_from + duration; 2820 di->lease_renew_after = di->lease_renew_from + 2821 (duration >> 1); 2822 di->lease_renew_from = 0; 2823 } 2824 break; 2825 } 2826 spin_unlock(&dentry->d_lock); 2827 dput(dentry); 2828 2829 if (!release) 2830 goto out; 2831 2832 release: 2833 /* let's just reuse the same message */ 2834 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 2835 ceph_msg_get(msg); 2836 ceph_con_send(&session->s_con, msg); 2837 2838 out: 2839 iput(inode); 2840 mutex_unlock(&session->s_mutex); 2841 return; 2842 2843 bad: 2844 pr_err("corrupt lease message\n"); 2845 ceph_msg_dump(msg); 2846 } 2847 2848 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 2849 struct inode *inode, 2850 struct dentry *dentry, char action, 2851 u32 seq) 2852 { 2853 struct ceph_msg *msg; 2854 struct ceph_mds_lease *lease; 2855 int len = sizeof(*lease) + sizeof(u32); 2856 int dnamelen = 0; 2857 2858 dout("lease_send_msg inode %p dentry %p %s to mds%d\n", 2859 inode, dentry, ceph_lease_op_name(action), session->s_mds); 2860 dnamelen = dentry->d_name.len; 2861 len += dnamelen; 2862 2863 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 2864 if (!msg) 2865 return; 2866 lease = msg->front.iov_base; 2867 lease->action = action; 2868 lease->ino = cpu_to_le64(ceph_vino(inode).ino); 2869 lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap); 2870 lease->seq = cpu_to_le32(seq); 2871 put_unaligned_le32(dnamelen, lease + 1); 2872 memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen); 2873 2874 /* 2875 * if this is a preemptive lease RELEASE, no need to 2876 * flush request stream, since the actual request will 2877 * soon follow. 2878 */ 2879 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE); 2880 2881 ceph_con_send(&session->s_con, msg); 2882 } 2883 2884 /* 2885 * Preemptively release a lease we expect to invalidate anyway. 2886 * Pass @inode always, @dentry is optional. 2887 */ 2888 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, 2889 struct dentry *dentry) 2890 { 2891 struct ceph_dentry_info *di; 2892 struct ceph_mds_session *session; 2893 u32 seq; 2894 2895 BUG_ON(inode == NULL); 2896 BUG_ON(dentry == NULL); 2897 2898 /* is dentry lease valid? 
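	 * A lease counts as valid only if it came from a live session, its
	 * generation still matches the session's cap generation, and the
	 * dentry's d_time has not yet expired.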
*/ 2899 spin_lock(&dentry->d_lock); 2900 di = ceph_dentry(dentry); 2901 if (!di || !di->lease_session || 2902 di->lease_session->s_mds < 0 || 2903 di->lease_gen != di->lease_session->s_cap_gen || 2904 !time_before(jiffies, dentry->d_time)) { 2905 dout("lease_release inode %p dentry %p -- " 2906 "no lease\n", 2907 inode, dentry); 2908 spin_unlock(&dentry->d_lock); 2909 return; 2910 } 2911 2912 /* we do have a lease on this dentry; note mds and seq */ 2913 session = ceph_get_mds_session(di->lease_session); 2914 seq = di->lease_seq; 2915 __ceph_mdsc_drop_dentry_lease(dentry); 2916 spin_unlock(&dentry->d_lock); 2917 2918 dout("lease_release inode %p dentry %p to mds%d\n", 2919 inode, dentry, session->s_mds); 2920 ceph_mdsc_lease_send_msg(session, inode, dentry, 2921 CEPH_MDS_LEASE_RELEASE, seq); 2922 ceph_put_mds_session(session); 2923 } 2924 2925 /* 2926 * drop all leases (and dentry refs) in preparation for umount 2927 */ 2928 static void drop_leases(struct ceph_mds_client *mdsc) 2929 { 2930 int i; 2931 2932 dout("drop_leases\n"); 2933 mutex_lock(&mdsc->mutex); 2934 for (i = 0; i < mdsc->max_sessions; i++) { 2935 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 2936 if (!s) 2937 continue; 2938 mutex_unlock(&mdsc->mutex); 2939 mutex_lock(&s->s_mutex); 2940 mutex_unlock(&s->s_mutex); 2941 ceph_put_mds_session(s); 2942 mutex_lock(&mdsc->mutex); 2943 } 2944 mutex_unlock(&mdsc->mutex); 2945 } 2946 2947 2948 2949 /* 2950 * delayed work -- periodically trim expired leases, renew caps with mds 2951 */ 2952 static void schedule_delayed(struct ceph_mds_client *mdsc) 2953 { 2954 int delay = 5; 2955 unsigned hz = round_jiffies_relative(HZ * delay); 2956 schedule_delayed_work(&mdsc->delayed_work, hz); 2957 } 2958 2959 static void delayed_work(struct work_struct *work) 2960 { 2961 int i; 2962 struct ceph_mds_client *mdsc = 2963 container_of(work, struct ceph_mds_client, delayed_work.work); 2964 int renew_interval; 2965 int renew_caps; 2966 2967 dout("mdsc delayed_work\n"); 2968 ceph_check_delayed_caps(mdsc); 2969 2970 mutex_lock(&mdsc->mutex); 2971 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 2972 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 2973 mdsc->last_renew_caps); 2974 if (renew_caps) 2975 mdsc->last_renew_caps = jiffies; 2976 2977 for (i = 0; i < mdsc->max_sessions; i++) { 2978 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 2979 if (s == NULL) 2980 continue; 2981 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 2982 dout("resending session close request for mds%d\n", 2983 s->s_mds); 2984 request_close_session(mdsc, s); 2985 ceph_put_mds_session(s); 2986 continue; 2987 } 2988 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 2989 if (s->s_state == CEPH_MDS_SESSION_OPEN) { 2990 s->s_state = CEPH_MDS_SESSION_HUNG; 2991 pr_info("mds%d hung\n", s->s_mds); 2992 } 2993 } 2994 if (s->s_state < CEPH_MDS_SESSION_OPEN) { 2995 /* this mds is failed or recovering, just wait */ 2996 ceph_put_mds_session(s); 2997 continue; 2998 } 2999 mutex_unlock(&mdsc->mutex); 3000 3001 mutex_lock(&s->s_mutex); 3002 if (renew_caps) 3003 send_renew_caps(mdsc, s); 3004 else 3005 ceph_con_keepalive(&s->s_con); 3006 ceph_add_cap_releases(mdsc, s); 3007 if (s->s_state == CEPH_MDS_SESSION_OPEN || 3008 s->s_state == CEPH_MDS_SESSION_HUNG) 3009 ceph_send_cap_releases(mdsc, s); 3010 mutex_unlock(&s->s_mutex); 3011 ceph_put_mds_session(s); 3012 3013 mutex_lock(&mdsc->mutex); 3014 } 3015 mutex_unlock(&mdsc->mutex); 3016 3017 schedule_delayed(mdsc); 3018 } 3019 3020 int 
ceph_mdsc_init(struct ceph_fs_client *fsc) 3021 3022 { 3023 struct ceph_mds_client *mdsc; 3024 3025 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 3026 if (!mdsc) 3027 return -ENOMEM; 3028 mdsc->fsc = fsc; 3029 fsc->mdsc = mdsc; 3030 mutex_init(&mdsc->mutex); 3031 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 3032 if (mdsc->mdsmap == NULL) 3033 return -ENOMEM; 3034 3035 init_completion(&mdsc->safe_umount_waiters); 3036 init_waitqueue_head(&mdsc->session_close_wq); 3037 INIT_LIST_HEAD(&mdsc->waiting_for_map); 3038 mdsc->sessions = NULL; 3039 mdsc->max_sessions = 0; 3040 mdsc->stopping = 0; 3041 init_rwsem(&mdsc->snap_rwsem); 3042 mdsc->snap_realms = RB_ROOT; 3043 INIT_LIST_HEAD(&mdsc->snap_empty); 3044 spin_lock_init(&mdsc->snap_empty_lock); 3045 mdsc->last_tid = 0; 3046 mdsc->request_tree = RB_ROOT; 3047 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 3048 mdsc->last_renew_caps = jiffies; 3049 INIT_LIST_HEAD(&mdsc->cap_delay_list); 3050 spin_lock_init(&mdsc->cap_delay_lock); 3051 INIT_LIST_HEAD(&mdsc->snap_flush_list); 3052 spin_lock_init(&mdsc->snap_flush_lock); 3053 mdsc->cap_flush_seq = 0; 3054 INIT_LIST_HEAD(&mdsc->cap_dirty); 3055 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 3056 mdsc->num_cap_flushing = 0; 3057 spin_lock_init(&mdsc->cap_dirty_lock); 3058 init_waitqueue_head(&mdsc->cap_flushing_wq); 3059 spin_lock_init(&mdsc->dentry_lru_lock); 3060 INIT_LIST_HEAD(&mdsc->dentry_lru); 3061 3062 ceph_caps_init(mdsc); 3063 ceph_adjust_min_caps(mdsc, fsc->min_caps); 3064 3065 return 0; 3066 } 3067 3068 /* 3069 * Wait for safe replies on open mds requests. If we time out, drop 3070 * all requests from the tree to avoid dangling dentry refs. 3071 */ 3072 static void wait_requests(struct ceph_mds_client *mdsc) 3073 { 3074 struct ceph_mds_request *req; 3075 struct ceph_fs_client *fsc = mdsc->fsc; 3076 3077 mutex_lock(&mdsc->mutex); 3078 if (__get_oldest_req(mdsc)) { 3079 mutex_unlock(&mdsc->mutex); 3080 3081 dout("wait_requests waiting for requests\n"); 3082 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 3083 fsc->client->options->mount_timeout * HZ); 3084 3085 /* tear down remaining requests */ 3086 mutex_lock(&mdsc->mutex); 3087 while ((req = __get_oldest_req(mdsc))) { 3088 dout("wait_requests timed out on tid %llu\n", 3089 req->r_tid); 3090 __unregister_request(mdsc, req); 3091 } 3092 } 3093 mutex_unlock(&mdsc->mutex); 3094 dout("wait_requests done\n"); 3095 } 3096 3097 /* 3098 * called before mount is ro, and before dentries are torn down. 3099 * (hmm, does this still race with new lookups?) 3100 */ 3101 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 3102 { 3103 dout("pre_umount\n"); 3104 mdsc->stopping = 1; 3105 3106 drop_leases(mdsc); 3107 ceph_flush_dirty_caps(mdsc); 3108 wait_requests(mdsc); 3109 3110 /* 3111 * wait for reply handlers to drop their request refs and 3112 * their inode/dcache refs 3113 */ 3114 ceph_msgr_flush(); 3115 } 3116 3117 /* 3118 * wait for all write mds requests to flush. 
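 * That is, wait for r_safe_completion on every CEPH_MDS_OP_WRITE
 * request with a tid at or below want_tid.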
3119 */ 3120 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) 3121 { 3122 struct ceph_mds_request *req = NULL, *nextreq; 3123 struct rb_node *n; 3124 3125 mutex_lock(&mdsc->mutex); 3126 dout("wait_unsafe_requests want %lld\n", want_tid); 3127 restart: 3128 req = __get_oldest_req(mdsc); 3129 while (req && req->r_tid <= want_tid) { 3130 /* find next request */ 3131 n = rb_next(&req->r_node); 3132 if (n) 3133 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 3134 else 3135 nextreq = NULL; 3136 if ((req->r_op & CEPH_MDS_OP_WRITE)) { 3137 /* write op */ 3138 ceph_mdsc_get_request(req); 3139 if (nextreq) 3140 ceph_mdsc_get_request(nextreq); 3141 mutex_unlock(&mdsc->mutex); 3142 dout("wait_unsafe_requests wait on %llu (want %llu)\n", 3143 req->r_tid, want_tid); 3144 wait_for_completion(&req->r_safe_completion); 3145 mutex_lock(&mdsc->mutex); 3146 ceph_mdsc_put_request(req); 3147 if (!nextreq) 3148 break; /* next dne before, so we're done! */ 3149 if (RB_EMPTY_NODE(&nextreq->r_node)) { 3150 /* next request was removed from tree */ 3151 ceph_mdsc_put_request(nextreq); 3152 goto restart; 3153 } 3154 ceph_mdsc_put_request(nextreq); /* won't go away */ 3155 } 3156 req = nextreq; 3157 } 3158 mutex_unlock(&mdsc->mutex); 3159 dout("wait_unsafe_requests done\n"); 3160 } 3161 3162 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 3163 { 3164 u64 want_tid, want_flush; 3165 3166 if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN) 3167 return; 3168 3169 dout("sync\n"); 3170 mutex_lock(&mdsc->mutex); 3171 want_tid = mdsc->last_tid; 3172 want_flush = mdsc->cap_flush_seq; 3173 mutex_unlock(&mdsc->mutex); 3174 dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush); 3175 3176 ceph_flush_dirty_caps(mdsc); 3177 3178 wait_unsafe_requests(mdsc, want_tid); 3179 wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush)); 3180 } 3181 3182 /* 3183 * true if all sessions are closed, or we force unmount 3184 */ 3185 static bool done_closing_sessions(struct ceph_mds_client *mdsc) 3186 { 3187 int i, n = 0; 3188 3189 if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN) 3190 return true; 3191 3192 mutex_lock(&mdsc->mutex); 3193 for (i = 0; i < mdsc->max_sessions; i++) 3194 if (mdsc->sessions[i]) 3195 n++; 3196 mutex_unlock(&mdsc->mutex); 3197 return n == 0; 3198 } 3199 3200 /* 3201 * called after sb is ro. 
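 * Politely close every open session, wait up to mount_timeout for
 * the MDSes to confirm, then forcibly tear down whatever remains.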
3202 */ 3203 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 3204 { 3205 struct ceph_mds_session *session; 3206 int i; 3207 struct ceph_fs_client *fsc = mdsc->fsc; 3208 unsigned long timeout = fsc->client->options->mount_timeout * HZ; 3209 3210 dout("close_sessions\n"); 3211 3212 /* close sessions */ 3213 mutex_lock(&mdsc->mutex); 3214 for (i = 0; i < mdsc->max_sessions; i++) { 3215 session = __ceph_lookup_mds_session(mdsc, i); 3216 if (!session) 3217 continue; 3218 mutex_unlock(&mdsc->mutex); 3219 mutex_lock(&session->s_mutex); 3220 __close_session(mdsc, session); 3221 mutex_unlock(&session->s_mutex); 3222 ceph_put_mds_session(session); 3223 mutex_lock(&mdsc->mutex); 3224 } 3225 mutex_unlock(&mdsc->mutex); 3226 3227 dout("waiting for sessions to close\n"); 3228 wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc), 3229 timeout); 3230 3231 /* tear down remaining sessions */ 3232 mutex_lock(&mdsc->mutex); 3233 for (i = 0; i < mdsc->max_sessions; i++) { 3234 if (mdsc->sessions[i]) { 3235 session = get_session(mdsc->sessions[i]); 3236 __unregister_session(mdsc, session); 3237 mutex_unlock(&mdsc->mutex); 3238 mutex_lock(&session->s_mutex); 3239 remove_session_caps(session); 3240 mutex_unlock(&session->s_mutex); 3241 ceph_put_mds_session(session); 3242 mutex_lock(&mdsc->mutex); 3243 } 3244 } 3245 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 3246 mutex_unlock(&mdsc->mutex); 3247 3248 ceph_cleanup_empty_realms(mdsc); 3249 3250 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 3251 3252 dout("stopped\n"); 3253 } 3254 3255 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 3256 { 3257 dout("stop\n"); 3258 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 3259 if (mdsc->mdsmap) 3260 ceph_mdsmap_destroy(mdsc->mdsmap); 3261 kfree(mdsc->sessions); 3262 ceph_caps_finalize(mdsc); 3263 } 3264 3265 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 3266 { 3267 struct ceph_mds_client *mdsc = fsc->mdsc; 3268 3269 dout("mdsc_destroy %p\n", mdsc); 3270 ceph_mdsc_stop(mdsc); 3271 3272 /* flush out any connection work with references to us */ 3273 ceph_msgr_flush(); 3274 3275 fsc->mdsc = NULL; 3276 kfree(mdsc); 3277 dout("mdsc_destroy %p done\n", mdsc); 3278 } 3279 3280 3281 /* 3282 * handle mds map update. 3283 */ 3284 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 3285 { 3286 u32 epoch; 3287 u32 maplen; 3288 void *p = msg->front.iov_base; 3289 void *end = p + msg->front.iov_len; 3290 struct ceph_mdsmap *newmap, *oldmap; 3291 struct ceph_fsid fsid; 3292 int err = -EINVAL; 3293 3294 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 3295 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 3296 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 3297 return; 3298 epoch = ceph_decode_32(&p); 3299 maplen = ceph_decode_32(&p); 3300 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 3301 3302 /* do we need it? 
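	 * (maps with an epoch at or below the one we already have are ignored)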
*/ 3303 ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch); 3304 mutex_lock(&mdsc->mutex); 3305 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 3306 dout("handle_map epoch %u <= our %u\n", 3307 epoch, mdsc->mdsmap->m_epoch); 3308 mutex_unlock(&mdsc->mutex); 3309 return; 3310 } 3311 3312 newmap = ceph_mdsmap_decode(&p, end); 3313 if (IS_ERR(newmap)) { 3314 err = PTR_ERR(newmap); 3315 goto bad_unlock; 3316 } 3317 3318 /* swap into place */ 3319 if (mdsc->mdsmap) { 3320 oldmap = mdsc->mdsmap; 3321 mdsc->mdsmap = newmap; 3322 check_new_map(mdsc, newmap, oldmap); 3323 ceph_mdsmap_destroy(oldmap); 3324 } else { 3325 mdsc->mdsmap = newmap; /* first mds map */ 3326 } 3327 mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size; 3328 3329 __wake_requests(mdsc, &mdsc->waiting_for_map); 3330 3331 mutex_unlock(&mdsc->mutex); 3332 schedule_delayed(mdsc); 3333 return; 3334 3335 bad_unlock: 3336 mutex_unlock(&mdsc->mutex); 3337 bad: 3338 pr_err("error decoding mdsmap %d\n", err); 3339 return; 3340 } 3341 3342 static struct ceph_connection *con_get(struct ceph_connection *con) 3343 { 3344 struct ceph_mds_session *s = con->private; 3345 3346 if (get_session(s)) { 3347 dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref)); 3348 return con; 3349 } 3350 dout("mdsc con_get %p FAIL\n", s); 3351 return NULL; 3352 } 3353 3354 static void con_put(struct ceph_connection *con) 3355 { 3356 struct ceph_mds_session *s = con->private; 3357 3358 dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1); 3359 ceph_put_mds_session(s); 3360 } 3361 3362 /* 3363 * if the client is unresponsive for long enough, the mds will kill 3364 * the session entirely. 3365 */ 3366 static void peer_reset(struct ceph_connection *con) 3367 { 3368 struct ceph_mds_session *s = con->private; 3369 struct ceph_mds_client *mdsc = s->s_mdsc; 3370 3371 pr_warning("mds%d closed our session\n", s->s_mds); 3372 send_mds_reconnect(mdsc, s); 3373 } 3374 3375 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 3376 { 3377 struct ceph_mds_session *s = con->private; 3378 struct ceph_mds_client *mdsc = s->s_mdsc; 3379 int type = le16_to_cpu(msg->hdr.type); 3380 3381 mutex_lock(&mdsc->mutex); 3382 if (__verify_registered_session(mdsc, s) < 0) { 3383 mutex_unlock(&mdsc->mutex); 3384 goto out; 3385 } 3386 mutex_unlock(&mdsc->mutex); 3387 3388 switch (type) { 3389 case CEPH_MSG_MDS_MAP: 3390 ceph_mdsc_handle_map(mdsc, msg); 3391 break; 3392 case CEPH_MSG_CLIENT_SESSION: 3393 handle_session(s, msg); 3394 break; 3395 case CEPH_MSG_CLIENT_REPLY: 3396 handle_reply(s, msg); 3397 break; 3398 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 3399 handle_forward(mdsc, s, msg); 3400 break; 3401 case CEPH_MSG_CLIENT_CAPS: 3402 ceph_handle_caps(s, msg); 3403 break; 3404 case CEPH_MSG_CLIENT_SNAP: 3405 ceph_handle_snap(mdsc, s, msg); 3406 break; 3407 case CEPH_MSG_CLIENT_LEASE: 3408 handle_lease(mdsc, s, msg); 3409 break; 3410 3411 default: 3412 pr_err("received unknown message type %d %s\n", type, 3413 ceph_msg_type_name(type)); 3414 } 3415 out: 3416 ceph_msg_put(msg); 3417 } 3418 3419 /* 3420 * authentication 3421 */ 3422 3423 /* 3424 * Note: returned pointer is the address of a structure that's 3425 * managed separately. Caller must *not* attempt to free it. 
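 * (it is the handshake embedded in the session's s_auth, which is
 * reused or rebuilt across reconnects)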
3426 */ 3427 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 3428 int *proto, int force_new) 3429 { 3430 struct ceph_mds_session *s = con->private; 3431 struct ceph_mds_client *mdsc = s->s_mdsc; 3432 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3433 struct ceph_auth_handshake *auth = &s->s_auth; 3434 3435 if (force_new && auth->authorizer) { 3436 if (ac->ops && ac->ops->destroy_authorizer) 3437 ac->ops->destroy_authorizer(ac, auth->authorizer); 3438 auth->authorizer = NULL; 3439 } 3440 if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) { 3441 int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 3442 auth); 3443 if (ret) 3444 return ERR_PTR(ret); 3445 } 3446 *proto = ac->protocol; 3447 3448 return auth; 3449 } 3450 3451 3452 static int verify_authorizer_reply(struct ceph_connection *con, int len) 3453 { 3454 struct ceph_mds_session *s = con->private; 3455 struct ceph_mds_client *mdsc = s->s_mdsc; 3456 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3457 3458 return ac->ops->verify_authorizer_reply(ac, s->s_auth.authorizer, len); 3459 } 3460 3461 static int invalidate_authorizer(struct ceph_connection *con) 3462 { 3463 struct ceph_mds_session *s = con->private; 3464 struct ceph_mds_client *mdsc = s->s_mdsc; 3465 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3466 3467 if (ac->ops->invalidate_authorizer) 3468 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 3469 3470 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 3471 } 3472 3473 static const struct ceph_connection_operations mds_con_ops = { 3474 .get = con_get, 3475 .put = con_put, 3476 .dispatch = dispatch, 3477 .get_authorizer = get_authorizer, 3478 .verify_authorizer_reply = verify_authorizer_reply, 3479 .invalidate_authorizer = invalidate_authorizer, 3480 .peer_reset = peer_reset, 3481 }; 3482 3483 /* eof */ 3484
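/*
 * Illustrative sketch (appended for documentation only, compiled out):
 * roughly how a fs/ceph caller drives the synchronous request API
 * above.  The ceph_mdsc_create_request() helper and the getattr
 * argument fields are assumptions taken from mds_client.h/ceph_fs.h;
 * see the real callers in inode.c/dir.c for authoritative usage.
 */
#if 0
static int example_sync_getattr(struct ceph_fs_client *fsc,
				struct inode *inode)
{
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_mds_request *req;
	int err;

	/* USE_ANY_MDS lets __choose_mds() pick a suitable server */
	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* reference the inode this op applies to; do_request pins its caps */
	req->r_inode = inode;
	ihold(inode);
	req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INODE_ALL);

	/* blocks until a reply arrives, the timeout fires, or we are killed */
	err = ceph_mdsc_do_request(mdsc, NULL, req);

	/* drops the request, its reply, and the inode/cap refs taken above */
	ceph_mdsc_put_request(req);
	return err;
}
#endif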