#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
43 */ 44 45 struct ceph_reconnect_state { 46 struct ceph_pagelist *pagelist; 47 bool flock; 48 }; 49 50 static void __wake_requests(struct ceph_mds_client *mdsc, 51 struct list_head *head); 52 53 static const struct ceph_connection_operations mds_con_ops; 54 55 56 /* 57 * mds reply parsing 58 */ 59 60 /* 61 * parse individual inode info 62 */ 63 static int parse_reply_info_in(void **p, void *end, 64 struct ceph_mds_reply_info_in *info, 65 int features) 66 { 67 int err = -EIO; 68 69 info->in = *p; 70 *p += sizeof(struct ceph_mds_reply_inode) + 71 sizeof(*info->in->fragtree.splits) * 72 le32_to_cpu(info->in->fragtree.nsplits); 73 74 ceph_decode_32_safe(p, end, info->symlink_len, bad); 75 ceph_decode_need(p, end, info->symlink_len, bad); 76 info->symlink = *p; 77 *p += info->symlink_len; 78 79 if (features & CEPH_FEATURE_DIRLAYOUTHASH) 80 ceph_decode_copy_safe(p, end, &info->dir_layout, 81 sizeof(info->dir_layout), bad); 82 else 83 memset(&info->dir_layout, 0, sizeof(info->dir_layout)); 84 85 ceph_decode_32_safe(p, end, info->xattr_len, bad); 86 ceph_decode_need(p, end, info->xattr_len, bad); 87 info->xattr_data = *p; 88 *p += info->xattr_len; 89 return 0; 90 bad: 91 return err; 92 } 93 94 /* 95 * parse a normal reply, which may contain a (dir+)dentry and/or a 96 * target inode. 
97 */ 98 static int parse_reply_info_trace(void **p, void *end, 99 struct ceph_mds_reply_info_parsed *info, 100 int features) 101 { 102 int err; 103 104 if (info->head->is_dentry) { 105 err = parse_reply_info_in(p, end, &info->diri, features); 106 if (err < 0) 107 goto out_bad; 108 109 if (unlikely(*p + sizeof(*info->dirfrag) > end)) 110 goto bad; 111 info->dirfrag = *p; 112 *p += sizeof(*info->dirfrag) + 113 sizeof(u32)*le32_to_cpu(info->dirfrag->ndist); 114 if (unlikely(*p > end)) 115 goto bad; 116 117 ceph_decode_32_safe(p, end, info->dname_len, bad); 118 ceph_decode_need(p, end, info->dname_len, bad); 119 info->dname = *p; 120 *p += info->dname_len; 121 info->dlease = *p; 122 *p += sizeof(*info->dlease); 123 } 124 125 if (info->head->is_target) { 126 err = parse_reply_info_in(p, end, &info->targeti, features); 127 if (err < 0) 128 goto out_bad; 129 } 130 131 if (unlikely(*p != end)) 132 goto bad; 133 return 0; 134 135 bad: 136 err = -EIO; 137 out_bad: 138 pr_err("problem parsing mds trace %d\n", err); 139 return err; 140 } 141 142 /* 143 * parse readdir results 144 */ 145 static int parse_reply_info_dir(void **p, void *end, 146 struct ceph_mds_reply_info_parsed *info, 147 int features) 148 { 149 u32 num, i = 0; 150 int err; 151 152 info->dir_dir = *p; 153 if (*p + sizeof(*info->dir_dir) > end) 154 goto bad; 155 *p += sizeof(*info->dir_dir) + 156 sizeof(u32)*le32_to_cpu(info->dir_dir->ndist); 157 if (*p > end) 158 goto bad; 159 160 ceph_decode_need(p, end, sizeof(num) + 2, bad); 161 num = ceph_decode_32(p); 162 info->dir_end = ceph_decode_8(p); 163 info->dir_complete = ceph_decode_8(p); 164 if (num == 0) 165 goto done; 166 167 /* alloc large array */ 168 info->dir_nr = num; 169 info->dir_in = kcalloc(num, sizeof(*info->dir_in) + 170 sizeof(*info->dir_dname) + 171 sizeof(*info->dir_dname_len) + 172 sizeof(*info->dir_dlease), 173 GFP_NOFS); 174 if (info->dir_in == NULL) { 175 err = -ENOMEM; 176 goto out_bad; 177 } 178 info->dir_dname = (void *)(info->dir_in + 
num); 179 info->dir_dname_len = (void *)(info->dir_dname + num); 180 info->dir_dlease = (void *)(info->dir_dname_len + num); 181 182 while (num) { 183 /* dentry */ 184 ceph_decode_need(p, end, sizeof(u32)*2, bad); 185 info->dir_dname_len[i] = ceph_decode_32(p); 186 ceph_decode_need(p, end, info->dir_dname_len[i], bad); 187 info->dir_dname[i] = *p; 188 *p += info->dir_dname_len[i]; 189 dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i], 190 info->dir_dname[i]); 191 info->dir_dlease[i] = *p; 192 *p += sizeof(struct ceph_mds_reply_lease); 193 194 /* inode */ 195 err = parse_reply_info_in(p, end, &info->dir_in[i], features); 196 if (err < 0) 197 goto out_bad; 198 i++; 199 num--; 200 } 201 202 done: 203 if (*p != end) 204 goto bad; 205 return 0; 206 207 bad: 208 err = -EIO; 209 out_bad: 210 pr_err("problem parsing dir contents %d\n", err); 211 return err; 212 } 213 214 /* 215 * parse fcntl F_GETLK results 216 */ 217 static int parse_reply_info_filelock(void **p, void *end, 218 struct ceph_mds_reply_info_parsed *info, 219 int features) 220 { 221 if (*p + sizeof(*info->filelock_reply) > end) 222 goto bad; 223 224 info->filelock_reply = *p; 225 *p += sizeof(*info->filelock_reply); 226 227 if (unlikely(*p != end)) 228 goto bad; 229 return 0; 230 231 bad: 232 return -EIO; 233 } 234 235 /* 236 * parse create results 237 */ 238 static int parse_reply_info_create(void **p, void *end, 239 struct ceph_mds_reply_info_parsed *info, 240 int features) 241 { 242 if (features & CEPH_FEATURE_REPLY_CREATE_INODE) { 243 if (*p == end) { 244 info->has_create_ino = false; 245 } else { 246 info->has_create_ino = true; 247 info->ino = ceph_decode_64(p); 248 } 249 } 250 251 if (unlikely(*p != end)) 252 goto bad; 253 return 0; 254 255 bad: 256 return -EIO; 257 } 258 259 /* 260 * parse extra results 261 */ 262 static int parse_reply_info_extra(void **p, void *end, 263 struct ceph_mds_reply_info_parsed *info, 264 int features) 265 { 266 if (info->head->op == CEPH_MDS_OP_GETFILELOCK) 267 
return parse_reply_info_filelock(p, end, info, features); 268 else if (info->head->op == CEPH_MDS_OP_READDIR || 269 info->head->op == CEPH_MDS_OP_LSSNAP) 270 return parse_reply_info_dir(p, end, info, features); 271 else if (info->head->op == CEPH_MDS_OP_CREATE) 272 return parse_reply_info_create(p, end, info, features); 273 else 274 return -EIO; 275 } 276 277 /* 278 * parse entire mds reply 279 */ 280 static int parse_reply_info(struct ceph_msg *msg, 281 struct ceph_mds_reply_info_parsed *info, 282 int features) 283 { 284 void *p, *end; 285 u32 len; 286 int err; 287 288 info->head = msg->front.iov_base; 289 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 290 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 291 292 /* trace */ 293 ceph_decode_32_safe(&p, end, len, bad); 294 if (len > 0) { 295 ceph_decode_need(&p, end, len, bad); 296 err = parse_reply_info_trace(&p, p+len, info, features); 297 if (err < 0) 298 goto out_bad; 299 } 300 301 /* extra */ 302 ceph_decode_32_safe(&p, end, len, bad); 303 if (len > 0) { 304 ceph_decode_need(&p, end, len, bad); 305 err = parse_reply_info_extra(&p, p+len, info, features); 306 if (err < 0) 307 goto out_bad; 308 } 309 310 /* snap blob */ 311 ceph_decode_32_safe(&p, end, len, bad); 312 info->snapblob_len = len; 313 info->snapblob = p; 314 p += len; 315 316 if (p != end) 317 goto bad; 318 return 0; 319 320 bad: 321 err = -EIO; 322 out_bad: 323 pr_err("mds parse_reply err %d\n", err); 324 return err; 325 } 326 327 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 328 { 329 kfree(info->dir_in); 330 } 331 332 333 /* 334 * sessions 335 */ 336 static const char *session_state_name(int s) 337 { 338 switch (s) { 339 case CEPH_MDS_SESSION_NEW: return "new"; 340 case CEPH_MDS_SESSION_OPENING: return "opening"; 341 case CEPH_MDS_SESSION_OPEN: return "open"; 342 case CEPH_MDS_SESSION_HUNG: return "hung"; 343 case CEPH_MDS_SESSION_CLOSING: return "closing"; 344 case 
CEPH_MDS_SESSION_RESTARTING: return "restarting"; 345 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 346 default: return "???"; 347 } 348 } 349 350 static struct ceph_mds_session *get_session(struct ceph_mds_session *s) 351 { 352 if (atomic_inc_not_zero(&s->s_ref)) { 353 dout("mdsc get_session %p %d -> %d\n", s, 354 atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref)); 355 return s; 356 } else { 357 dout("mdsc get_session %p 0 -- FAIL", s); 358 return NULL; 359 } 360 } 361 362 void ceph_put_mds_session(struct ceph_mds_session *s) 363 { 364 dout("mdsc put_session %p %d -> %d\n", s, 365 atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1); 366 if (atomic_dec_and_test(&s->s_ref)) { 367 if (s->s_auth.authorizer) 368 ceph_auth_destroy_authorizer( 369 s->s_mdsc->fsc->client->monc.auth, 370 s->s_auth.authorizer); 371 kfree(s); 372 } 373 } 374 375 /* 376 * called under mdsc->mutex 377 */ 378 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 379 int mds) 380 { 381 struct ceph_mds_session *session; 382 383 if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL) 384 return NULL; 385 session = mdsc->sessions[mds]; 386 dout("lookup_mds_session %p %d\n", session, 387 atomic_read(&session->s_ref)); 388 get_session(session); 389 return session; 390 } 391 392 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 393 { 394 if (mds >= mdsc->max_sessions) 395 return false; 396 return mdsc->sessions[mds]; 397 } 398 399 static int __verify_registered_session(struct ceph_mds_client *mdsc, 400 struct ceph_mds_session *s) 401 { 402 if (s->s_mds >= mdsc->max_sessions || 403 mdsc->sessions[s->s_mds] != s) 404 return -ENOENT; 405 return 0; 406 } 407 408 /* 409 * create+register a new session for given mds. 410 * called under mdsc->mutex. 
411 */ 412 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 413 int mds) 414 { 415 struct ceph_mds_session *s; 416 417 if (mds >= mdsc->mdsmap->m_max_mds) 418 return ERR_PTR(-EINVAL); 419 420 s = kzalloc(sizeof(*s), GFP_NOFS); 421 if (!s) 422 return ERR_PTR(-ENOMEM); 423 s->s_mdsc = mdsc; 424 s->s_mds = mds; 425 s->s_state = CEPH_MDS_SESSION_NEW; 426 s->s_ttl = 0; 427 s->s_seq = 0; 428 mutex_init(&s->s_mutex); 429 430 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); 431 432 spin_lock_init(&s->s_gen_ttl_lock); 433 s->s_cap_gen = 0; 434 s->s_cap_ttl = jiffies - 1; 435 436 spin_lock_init(&s->s_cap_lock); 437 s->s_renew_requested = 0; 438 s->s_renew_seq = 0; 439 INIT_LIST_HEAD(&s->s_caps); 440 s->s_nr_caps = 0; 441 s->s_trim_caps = 0; 442 atomic_set(&s->s_ref, 1); 443 INIT_LIST_HEAD(&s->s_waiting); 444 INIT_LIST_HEAD(&s->s_unsafe); 445 s->s_num_cap_releases = 0; 446 s->s_cap_iterator = NULL; 447 INIT_LIST_HEAD(&s->s_cap_releases); 448 INIT_LIST_HEAD(&s->s_cap_releases_done); 449 INIT_LIST_HEAD(&s->s_cap_flushing); 450 INIT_LIST_HEAD(&s->s_cap_snaps_flushing); 451 452 dout("register_session mds%d\n", mds); 453 if (mds >= mdsc->max_sessions) { 454 int newmax = 1 << get_count_order(mds+1); 455 struct ceph_mds_session **sa; 456 457 dout("register_session realloc to %d\n", newmax); 458 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); 459 if (sa == NULL) 460 goto fail_realloc; 461 if (mdsc->sessions) { 462 memcpy(sa, mdsc->sessions, 463 mdsc->max_sessions * sizeof(void *)); 464 kfree(mdsc->sessions); 465 } 466 mdsc->sessions = sa; 467 mdsc->max_sessions = newmax; 468 } 469 mdsc->sessions[mds] = s; 470 atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 471 472 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 473 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 474 475 return s; 476 477 fail_realloc: 478 kfree(s); 479 return ERR_PTR(-ENOMEM); 480 } 481 482 /* 483 * called under mdsc->mutex 484 */ 485 static void 
__unregister_session(struct ceph_mds_client *mdsc, 486 struct ceph_mds_session *s) 487 { 488 dout("__unregister_session mds%d %p\n", s->s_mds, s); 489 BUG_ON(mdsc->sessions[s->s_mds] != s); 490 mdsc->sessions[s->s_mds] = NULL; 491 ceph_con_close(&s->s_con); 492 ceph_put_mds_session(s); 493 } 494 495 /* 496 * drop session refs in request. 497 * 498 * should be last request ref, or hold mdsc->mutex 499 */ 500 static void put_request_session(struct ceph_mds_request *req) 501 { 502 if (req->r_session) { 503 ceph_put_mds_session(req->r_session); 504 req->r_session = NULL; 505 } 506 } 507 508 void ceph_mdsc_release_request(struct kref *kref) 509 { 510 struct ceph_mds_request *req = container_of(kref, 511 struct ceph_mds_request, 512 r_kref); 513 if (req->r_request) 514 ceph_msg_put(req->r_request); 515 if (req->r_reply) { 516 ceph_msg_put(req->r_reply); 517 destroy_reply_info(&req->r_reply_info); 518 } 519 if (req->r_inode) { 520 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 521 iput(req->r_inode); 522 } 523 if (req->r_locked_dir) 524 ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); 525 if (req->r_target_inode) 526 iput(req->r_target_inode); 527 if (req->r_dentry) 528 dput(req->r_dentry); 529 if (req->r_old_dentry) { 530 /* 531 * track (and drop pins for) r_old_dentry_dir 532 * separately, since r_old_dentry's d_parent may have 533 * changed between the dir mutex being dropped and 534 * this request being freed. 535 */ 536 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), 537 CEPH_CAP_PIN); 538 dput(req->r_old_dentry); 539 iput(req->r_old_dentry_dir); 540 } 541 kfree(req->r_path1); 542 kfree(req->r_path2); 543 put_request_session(req); 544 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); 545 kfree(req); 546 } 547 548 /* 549 * lookup session, bump ref if found. 550 * 551 * called under mdsc->mutex. 
552 */ 553 static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc, 554 u64 tid) 555 { 556 struct ceph_mds_request *req; 557 struct rb_node *n = mdsc->request_tree.rb_node; 558 559 while (n) { 560 req = rb_entry(n, struct ceph_mds_request, r_node); 561 if (tid < req->r_tid) 562 n = n->rb_left; 563 else if (tid > req->r_tid) 564 n = n->rb_right; 565 else { 566 ceph_mdsc_get_request(req); 567 return req; 568 } 569 } 570 return NULL; 571 } 572 573 static void __insert_request(struct ceph_mds_client *mdsc, 574 struct ceph_mds_request *new) 575 { 576 struct rb_node **p = &mdsc->request_tree.rb_node; 577 struct rb_node *parent = NULL; 578 struct ceph_mds_request *req = NULL; 579 580 while (*p) { 581 parent = *p; 582 req = rb_entry(parent, struct ceph_mds_request, r_node); 583 if (new->r_tid < req->r_tid) 584 p = &(*p)->rb_left; 585 else if (new->r_tid > req->r_tid) 586 p = &(*p)->rb_right; 587 else 588 BUG(); 589 } 590 591 rb_link_node(&new->r_node, parent, p); 592 rb_insert_color(&new->r_node, &mdsc->request_tree); 593 } 594 595 /* 596 * Register an in-flight request, and assign a tid. Link to directory 597 * are modifying (if any). 598 * 599 * Called under mdsc->mutex. 
600 */ 601 static void __register_request(struct ceph_mds_client *mdsc, 602 struct ceph_mds_request *req, 603 struct inode *dir) 604 { 605 req->r_tid = ++mdsc->last_tid; 606 if (req->r_num_caps) 607 ceph_reserve_caps(mdsc, &req->r_caps_reservation, 608 req->r_num_caps); 609 dout("__register_request %p tid %lld\n", req, req->r_tid); 610 ceph_mdsc_get_request(req); 611 __insert_request(mdsc, req); 612 613 req->r_uid = current_fsuid(); 614 req->r_gid = current_fsgid(); 615 616 if (dir) { 617 struct ceph_inode_info *ci = ceph_inode(dir); 618 619 ihold(dir); 620 spin_lock(&ci->i_unsafe_lock); 621 req->r_unsafe_dir = dir; 622 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 623 spin_unlock(&ci->i_unsafe_lock); 624 } 625 } 626 627 static void __unregister_request(struct ceph_mds_client *mdsc, 628 struct ceph_mds_request *req) 629 { 630 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 631 rb_erase(&req->r_node, &mdsc->request_tree); 632 RB_CLEAR_NODE(&req->r_node); 633 634 if (req->r_unsafe_dir) { 635 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 636 637 spin_lock(&ci->i_unsafe_lock); 638 list_del_init(&req->r_unsafe_dir_item); 639 spin_unlock(&ci->i_unsafe_lock); 640 641 iput(req->r_unsafe_dir); 642 req->r_unsafe_dir = NULL; 643 } 644 645 ceph_mdsc_put_request(req); 646 } 647 648 /* 649 * Choose mds to send request to next. If there is a hint set in the 650 * request (e.g., due to a prior forward hint from the mds), use that. 651 * Otherwise, consult frag tree and/or caps to identify the 652 * appropriate mds. If all else fails, choose randomly. 653 * 654 * Called under mdsc->mutex. 655 */ 656 static struct dentry *get_nonsnap_parent(struct dentry *dentry) 657 { 658 /* 659 * we don't need to worry about protecting the d_parent access 660 * here because we never renaming inside the snapped namespace 661 * except to resplice to another snapdir, and either the old or new 662 * result is a valid result. 
663 */ 664 while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP) 665 dentry = dentry->d_parent; 666 return dentry; 667 } 668 669 static int __choose_mds(struct ceph_mds_client *mdsc, 670 struct ceph_mds_request *req) 671 { 672 struct inode *inode; 673 struct ceph_inode_info *ci; 674 struct ceph_cap *cap; 675 int mode = req->r_direct_mode; 676 int mds = -1; 677 u32 hash = req->r_direct_hash; 678 bool is_hash = req->r_direct_is_hash; 679 680 /* 681 * is there a specific mds we should try? ignore hint if we have 682 * no session and the mds is not up (active or recovering). 683 */ 684 if (req->r_resend_mds >= 0 && 685 (__have_session(mdsc, req->r_resend_mds) || 686 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 687 dout("choose_mds using resend_mds mds%d\n", 688 req->r_resend_mds); 689 return req->r_resend_mds; 690 } 691 692 if (mode == USE_RANDOM_MDS) 693 goto random; 694 695 inode = NULL; 696 if (req->r_inode) { 697 inode = req->r_inode; 698 } else if (req->r_dentry) { 699 /* ignore race with rename; old or new d_parent is okay */ 700 struct dentry *parent = req->r_dentry->d_parent; 701 struct inode *dir = parent->d_inode; 702 703 if (dir->i_sb != mdsc->fsc->sb) { 704 /* not this fs! 
*/ 705 inode = req->r_dentry->d_inode; 706 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 707 /* direct snapped/virtual snapdir requests 708 * based on parent dir inode */ 709 struct dentry *dn = get_nonsnap_parent(parent); 710 inode = dn->d_inode; 711 dout("__choose_mds using nonsnap parent %p\n", inode); 712 } else if (req->r_dentry->d_inode) { 713 /* dentry target */ 714 inode = req->r_dentry->d_inode; 715 } else { 716 /* dir + name */ 717 inode = dir; 718 hash = ceph_dentry_hash(dir, req->r_dentry); 719 is_hash = true; 720 } 721 } 722 723 dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash, 724 (int)hash, mode); 725 if (!inode) 726 goto random; 727 ci = ceph_inode(inode); 728 729 if (is_hash && S_ISDIR(inode->i_mode)) { 730 struct ceph_inode_frag frag; 731 int found; 732 733 ceph_choose_frag(ci, hash, &frag, &found); 734 if (found) { 735 if (mode == USE_ANY_MDS && frag.ndist > 0) { 736 u8 r; 737 738 /* choose a random replica */ 739 get_random_bytes(&r, 1); 740 r %= frag.ndist; 741 mds = frag.dist[r]; 742 dout("choose_mds %p %llx.%llx " 743 "frag %u mds%d (%d/%d)\n", 744 inode, ceph_vinop(inode), 745 frag.frag, mds, 746 (int)r, frag.ndist); 747 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 748 CEPH_MDS_STATE_ACTIVE) 749 return mds; 750 } 751 752 /* since this file/dir wasn't known to be 753 * replicated, then we want to look for the 754 * authoritative mds. 
*/ 755 mode = USE_AUTH_MDS; 756 if (frag.mds >= 0) { 757 /* choose auth mds */ 758 mds = frag.mds; 759 dout("choose_mds %p %llx.%llx " 760 "frag %u mds%d (auth)\n", 761 inode, ceph_vinop(inode), frag.frag, mds); 762 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 763 CEPH_MDS_STATE_ACTIVE) 764 return mds; 765 } 766 } 767 } 768 769 spin_lock(&ci->i_ceph_lock); 770 cap = NULL; 771 if (mode == USE_AUTH_MDS) 772 cap = ci->i_auth_cap; 773 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 774 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 775 if (!cap) { 776 spin_unlock(&ci->i_ceph_lock); 777 goto random; 778 } 779 mds = cap->session->s_mds; 780 dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n", 781 inode, ceph_vinop(inode), mds, 782 cap == ci->i_auth_cap ? "auth " : "", cap); 783 spin_unlock(&ci->i_ceph_lock); 784 return mds; 785 786 random: 787 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 788 dout("choose_mds chose random mds%d\n", mds); 789 return mds; 790 } 791 792 793 /* 794 * session messages 795 */ 796 static struct ceph_msg *create_session_msg(u32 op, u64 seq) 797 { 798 struct ceph_msg *msg; 799 struct ceph_mds_session_head *h; 800 801 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 802 false); 803 if (!msg) { 804 pr_err("create_session_msg ENOMEM creating msg\n"); 805 return NULL; 806 } 807 h = msg->front.iov_base; 808 h->op = cpu_to_le32(op); 809 h->seq = cpu_to_le64(seq); 810 return msg; 811 } 812 813 /* 814 * send session open request. 815 * 816 * called under mdsc->mutex 817 */ 818 static int __open_session(struct ceph_mds_client *mdsc, 819 struct ceph_mds_session *session) 820 { 821 struct ceph_msg *msg; 822 int mstate; 823 int mds = session->s_mds; 824 825 /* wait for mds to go active? 
*/ 826 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 827 dout("open_session to mds%d (%s)\n", mds, 828 ceph_mds_state_name(mstate)); 829 session->s_state = CEPH_MDS_SESSION_OPENING; 830 session->s_renew_requested = jiffies; 831 832 /* send connect message */ 833 msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq); 834 if (!msg) 835 return -ENOMEM; 836 ceph_con_send(&session->s_con, msg); 837 return 0; 838 } 839 840 /* 841 * open sessions for any export targets for the given mds 842 * 843 * called under mdsc->mutex 844 */ 845 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 846 struct ceph_mds_session *session) 847 { 848 struct ceph_mds_info *mi; 849 struct ceph_mds_session *ts; 850 int i, mds = session->s_mds; 851 int target; 852 853 if (mds >= mdsc->mdsmap->m_max_mds) 854 return; 855 mi = &mdsc->mdsmap->m_info[mds]; 856 dout("open_export_target_sessions for mds%d (%d targets)\n", 857 session->s_mds, mi->num_export_targets); 858 859 for (i = 0; i < mi->num_export_targets; i++) { 860 target = mi->export_targets[i]; 861 ts = __ceph_lookup_mds_session(mdsc, target); 862 if (!ts) { 863 ts = register_session(mdsc, target); 864 if (IS_ERR(ts)) 865 return; 866 } 867 if (session->s_state == CEPH_MDS_SESSION_NEW || 868 session->s_state == CEPH_MDS_SESSION_CLOSING) 869 __open_session(mdsc, session); 870 else 871 dout(" mds%d target mds%d %p is %s\n", session->s_mds, 872 i, ts, session_state_name(ts->s_state)); 873 ceph_put_mds_session(ts); 874 } 875 } 876 877 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 878 struct ceph_mds_session *session) 879 { 880 mutex_lock(&mdsc->mutex); 881 __open_export_target_sessions(mdsc, session); 882 mutex_unlock(&mdsc->mutex); 883 } 884 885 /* 886 * session caps 887 */ 888 889 /* 890 * Free preallocated cap messages assigned to this session 891 */ 892 static void cleanup_cap_releases(struct ceph_mds_session *session) 893 { 894 struct ceph_msg *msg; 895 896 
spin_lock(&session->s_cap_lock); 897 while (!list_empty(&session->s_cap_releases)) { 898 msg = list_first_entry(&session->s_cap_releases, 899 struct ceph_msg, list_head); 900 list_del_init(&msg->list_head); 901 ceph_msg_put(msg); 902 } 903 while (!list_empty(&session->s_cap_releases_done)) { 904 msg = list_first_entry(&session->s_cap_releases_done, 905 struct ceph_msg, list_head); 906 list_del_init(&msg->list_head); 907 ceph_msg_put(msg); 908 } 909 spin_unlock(&session->s_cap_lock); 910 } 911 912 /* 913 * Helper to safely iterate over all caps associated with a session, with 914 * special care taken to handle a racing __ceph_remove_cap(). 915 * 916 * Caller must hold session s_mutex. 917 */ 918 static int iterate_session_caps(struct ceph_mds_session *session, 919 int (*cb)(struct inode *, struct ceph_cap *, 920 void *), void *arg) 921 { 922 struct list_head *p; 923 struct ceph_cap *cap; 924 struct inode *inode, *last_inode = NULL; 925 struct ceph_cap *old_cap = NULL; 926 int ret; 927 928 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 929 spin_lock(&session->s_cap_lock); 930 p = session->s_caps.next; 931 while (p != &session->s_caps) { 932 cap = list_entry(p, struct ceph_cap, session_caps); 933 inode = igrab(&cap->ci->vfs_inode); 934 if (!inode) { 935 p = p->next; 936 continue; 937 } 938 session->s_cap_iterator = cap; 939 spin_unlock(&session->s_cap_lock); 940 941 if (last_inode) { 942 iput(last_inode); 943 last_inode = NULL; 944 } 945 if (old_cap) { 946 ceph_put_cap(session->s_mdsc, old_cap); 947 old_cap = NULL; 948 } 949 950 ret = cb(inode, cap, arg); 951 last_inode = inode; 952 953 spin_lock(&session->s_cap_lock); 954 p = p->next; 955 if (cap->ci == NULL) { 956 dout("iterate_session_caps finishing cap %p removal\n", 957 cap); 958 BUG_ON(cap->session != session); 959 list_del_init(&cap->session_caps); 960 session->s_nr_caps--; 961 cap->session = NULL; 962 old_cap = cap; /* put_cap it w/o locks held */ 963 } 964 if (ret < 0) 965 goto out; 966 } 
967 ret = 0; 968 out: 969 session->s_cap_iterator = NULL; 970 spin_unlock(&session->s_cap_lock); 971 972 if (last_inode) 973 iput(last_inode); 974 if (old_cap) 975 ceph_put_cap(session->s_mdsc, old_cap); 976 977 return ret; 978 } 979 980 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 981 void *arg) 982 { 983 struct ceph_inode_info *ci = ceph_inode(inode); 984 int drop = 0; 985 986 dout("removing cap %p, ci is %p, inode is %p\n", 987 cap, ci, &ci->vfs_inode); 988 spin_lock(&ci->i_ceph_lock); 989 __ceph_remove_cap(cap); 990 if (!__ceph_is_any_real_caps(ci)) { 991 struct ceph_mds_client *mdsc = 992 ceph_sb_to_client(inode->i_sb)->mdsc; 993 994 spin_lock(&mdsc->cap_dirty_lock); 995 if (!list_empty(&ci->i_dirty_item)) { 996 pr_info(" dropping dirty %s state for %p %lld\n", 997 ceph_cap_string(ci->i_dirty_caps), 998 inode, ceph_ino(inode)); 999 ci->i_dirty_caps = 0; 1000 list_del_init(&ci->i_dirty_item); 1001 drop = 1; 1002 } 1003 if (!list_empty(&ci->i_flushing_item)) { 1004 pr_info(" dropping dirty+flushing %s state for %p %lld\n", 1005 ceph_cap_string(ci->i_flushing_caps), 1006 inode, ceph_ino(inode)); 1007 ci->i_flushing_caps = 0; 1008 list_del_init(&ci->i_flushing_item); 1009 mdsc->num_cap_flushing--; 1010 drop = 1; 1011 } 1012 if (drop && ci->i_wrbuffer_ref) { 1013 pr_info(" dropping dirty data for %p %lld\n", 1014 inode, ceph_ino(inode)); 1015 ci->i_wrbuffer_ref = 0; 1016 ci->i_wrbuffer_ref_head = 0; 1017 drop++; 1018 } 1019 spin_unlock(&mdsc->cap_dirty_lock); 1020 } 1021 spin_unlock(&ci->i_ceph_lock); 1022 while (drop--) 1023 iput(inode); 1024 return 0; 1025 } 1026 1027 /* 1028 * caller must hold session s_mutex 1029 */ 1030 static void remove_session_caps(struct ceph_mds_session *session) 1031 { 1032 dout("remove_session_caps on %p\n", session); 1033 iterate_session_caps(session, remove_session_caps_cb, NULL); 1034 1035 spin_lock(&session->s_cap_lock); 1036 if (session->s_nr_caps > 0) { 1037 struct super_block *sb = 
session->s_mdsc->fsc->sb; 1038 struct inode *inode; 1039 struct ceph_cap *cap, *prev = NULL; 1040 struct ceph_vino vino; 1041 /* 1042 * iterate_session_caps() skips inodes that are being 1043 * deleted, we need to wait until deletions are complete. 1044 * __wait_on_freeing_inode() is designed for the job, 1045 * but it is not exported, so use lookup inode function 1046 * to access it. 1047 */ 1048 while (!list_empty(&session->s_caps)) { 1049 cap = list_entry(session->s_caps.next, 1050 struct ceph_cap, session_caps); 1051 if (cap == prev) 1052 break; 1053 prev = cap; 1054 vino = cap->ci->i_vino; 1055 spin_unlock(&session->s_cap_lock); 1056 1057 inode = ceph_find_inode(sb, vino); 1058 iput(inode); 1059 1060 spin_lock(&session->s_cap_lock); 1061 } 1062 } 1063 spin_unlock(&session->s_cap_lock); 1064 1065 BUG_ON(session->s_nr_caps > 0); 1066 BUG_ON(!list_empty(&session->s_cap_flushing)); 1067 cleanup_cap_releases(session); 1068 } 1069 1070 /* 1071 * wake up any threads waiting on this session's caps. if the cap is 1072 * old (didn't get renewed on the client reconnect), remove it now. 1073 * 1074 * caller must hold s_mutex. 1075 */ 1076 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, 1077 void *arg) 1078 { 1079 struct ceph_inode_info *ci = ceph_inode(inode); 1080 1081 wake_up_all(&ci->i_cap_wq); 1082 if (arg) { 1083 spin_lock(&ci->i_ceph_lock); 1084 ci->i_wanted_max_size = 0; 1085 ci->i_requested_max_size = 0; 1086 spin_unlock(&ci->i_ceph_lock); 1087 } 1088 return 0; 1089 } 1090 1091 static void wake_up_session_caps(struct ceph_mds_session *session, 1092 int reconnect) 1093 { 1094 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 1095 iterate_session_caps(session, wake_up_session_cb, 1096 (void *)(unsigned long)reconnect); 1097 } 1098 1099 /* 1100 * Send periodic message to MDS renewing all currently held caps. The 1101 * ack will reset the expiration for all caps from this session. 
1102 * 1103 * caller holds s_mutex 1104 */ 1105 static int send_renew_caps(struct ceph_mds_client *mdsc, 1106 struct ceph_mds_session *session) 1107 { 1108 struct ceph_msg *msg; 1109 int state; 1110 1111 if (time_after_eq(jiffies, session->s_cap_ttl) && 1112 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 1113 pr_info("mds%d caps stale\n", session->s_mds); 1114 session->s_renew_requested = jiffies; 1115 1116 /* do not try to renew caps until a recovering mds has reconnected 1117 * with its clients. */ 1118 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 1119 if (state < CEPH_MDS_STATE_RECONNECT) { 1120 dout("send_renew_caps ignoring mds%d (%s)\n", 1121 session->s_mds, ceph_mds_state_name(state)); 1122 return 0; 1123 } 1124 1125 dout("send_renew_caps to mds%d (%s)\n", session->s_mds, 1126 ceph_mds_state_name(state)); 1127 msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 1128 ++session->s_renew_seq); 1129 if (!msg) 1130 return -ENOMEM; 1131 ceph_con_send(&session->s_con, msg); 1132 return 0; 1133 } 1134 1135 /* 1136 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 1137 * 1138 * Called under session->s_mutex 1139 */ 1140 static void renewed_caps(struct ceph_mds_client *mdsc, 1141 struct ceph_mds_session *session, int is_renew) 1142 { 1143 int was_stale; 1144 int wake = 0; 1145 1146 spin_lock(&session->s_cap_lock); 1147 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 1148 1149 session->s_cap_ttl = session->s_renew_requested + 1150 mdsc->mdsmap->m_session_timeout*HZ; 1151 1152 if (was_stale) { 1153 if (time_before(jiffies, session->s_cap_ttl)) { 1154 pr_info("mds%d caps renewed\n", session->s_mds); 1155 wake = 1; 1156 } else { 1157 pr_info("mds%d caps still stale\n", session->s_mds); 1158 } 1159 } 1160 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n", 1161 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh", 1162 time_before(jiffies, session->s_cap_ttl) ? 
"stale" : "fresh"); 1163 spin_unlock(&session->s_cap_lock); 1164 1165 if (wake) 1166 wake_up_session_caps(session, 0); 1167 } 1168 1169 /* 1170 * send a session close request 1171 */ 1172 static int request_close_session(struct ceph_mds_client *mdsc, 1173 struct ceph_mds_session *session) 1174 { 1175 struct ceph_msg *msg; 1176 1177 dout("request_close_session mds%d state %s seq %lld\n", 1178 session->s_mds, session_state_name(session->s_state), 1179 session->s_seq); 1180 msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); 1181 if (!msg) 1182 return -ENOMEM; 1183 ceph_con_send(&session->s_con, msg); 1184 return 0; 1185 } 1186 1187 /* 1188 * Called with s_mutex held. 1189 */ 1190 static int __close_session(struct ceph_mds_client *mdsc, 1191 struct ceph_mds_session *session) 1192 { 1193 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 1194 return 0; 1195 session->s_state = CEPH_MDS_SESSION_CLOSING; 1196 return request_close_session(mdsc, session); 1197 } 1198 1199 /* 1200 * Trim old(er) caps. 1201 * 1202 * Because we can't cache an inode without one or more caps, we do 1203 * this indirectly: if a cap is unused, we prune its aliases, at which 1204 * point the inode will hopefully get dropped to. 1205 * 1206 * Yes, this is a bit sloppy. Our only real goal here is to respond to 1207 * memory pressure from the MDS, though, so it needn't be perfect. 
1208 */ 1209 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) 1210 { 1211 struct ceph_mds_session *session = arg; 1212 struct ceph_inode_info *ci = ceph_inode(inode); 1213 int used, oissued, mine; 1214 1215 if (session->s_trim_caps <= 0) 1216 return -1; 1217 1218 spin_lock(&ci->i_ceph_lock); 1219 mine = cap->issued | cap->implemented; 1220 used = __ceph_caps_used(ci); 1221 oissued = __ceph_caps_issued_other(ci, cap); 1222 1223 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n", 1224 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), 1225 ceph_cap_string(used)); 1226 if (ci->i_dirty_caps) 1227 goto out; /* dirty caps */ 1228 if ((used & ~oissued) & mine) 1229 goto out; /* we need these caps */ 1230 1231 session->s_trim_caps--; 1232 if (oissued) { 1233 /* we aren't the only cap.. just remove us */ 1234 __queue_cap_release(session, ceph_ino(inode), cap->cap_id, 1235 cap->mseq, cap->issue_seq); 1236 __ceph_remove_cap(cap); 1237 } else { 1238 /* try to drop referring dentries */ 1239 spin_unlock(&ci->i_ceph_lock); 1240 d_prune_aliases(inode); 1241 dout("trim_caps_cb %p cap %p pruned, count now %d\n", 1242 inode, cap, atomic_read(&inode->i_count)); 1243 return 0; 1244 } 1245 1246 out: 1247 spin_unlock(&ci->i_ceph_lock); 1248 return 0; 1249 } 1250 1251 /* 1252 * Trim session cap count down to some max number. 
1253 */ 1254 static int trim_caps(struct ceph_mds_client *mdsc, 1255 struct ceph_mds_session *session, 1256 int max_caps) 1257 { 1258 int trim_caps = session->s_nr_caps - max_caps; 1259 1260 dout("trim_caps mds%d start: %d / %d, trim %d\n", 1261 session->s_mds, session->s_nr_caps, max_caps, trim_caps); 1262 if (trim_caps > 0) { 1263 session->s_trim_caps = trim_caps; 1264 iterate_session_caps(session, trim_caps_cb, session); 1265 dout("trim_caps mds%d done: %d / %d, trimmed %d\n", 1266 session->s_mds, session->s_nr_caps, max_caps, 1267 trim_caps - session->s_trim_caps); 1268 session->s_trim_caps = 0; 1269 } 1270 return 0; 1271 } 1272 1273 /* 1274 * Allocate cap_release messages. If there is a partially full message 1275 * in the queue, try to allocate enough to cover it's remainder, so that 1276 * we can send it immediately. 1277 * 1278 * Called under s_mutex. 1279 */ 1280 int ceph_add_cap_releases(struct ceph_mds_client *mdsc, 1281 struct ceph_mds_session *session) 1282 { 1283 struct ceph_msg *msg, *partial = NULL; 1284 struct ceph_mds_cap_release *head; 1285 int err = -ENOMEM; 1286 int extra = mdsc->fsc->mount_options->cap_release_safety; 1287 int num; 1288 1289 dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds, 1290 extra); 1291 1292 spin_lock(&session->s_cap_lock); 1293 1294 if (!list_empty(&session->s_cap_releases)) { 1295 msg = list_first_entry(&session->s_cap_releases, 1296 struct ceph_msg, 1297 list_head); 1298 head = msg->front.iov_base; 1299 num = le32_to_cpu(head->num); 1300 if (num) { 1301 dout(" partial %p with (%d/%d)\n", msg, num, 1302 (int)CEPH_CAPS_PER_RELEASE); 1303 extra += CEPH_CAPS_PER_RELEASE - num; 1304 partial = msg; 1305 } 1306 } 1307 while (session->s_num_cap_releases < session->s_nr_caps + extra) { 1308 spin_unlock(&session->s_cap_lock); 1309 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE, 1310 GFP_NOFS, false); 1311 if (!msg) 1312 goto out_unlocked; 1313 dout("add_cap_releases %p msg %p now %d\n", 
session, msg, 1314 (int)msg->front.iov_len); 1315 head = msg->front.iov_base; 1316 head->num = cpu_to_le32(0); 1317 msg->front.iov_len = sizeof(*head); 1318 spin_lock(&session->s_cap_lock); 1319 list_add(&msg->list_head, &session->s_cap_releases); 1320 session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE; 1321 } 1322 1323 if (partial) { 1324 head = partial->front.iov_base; 1325 num = le32_to_cpu(head->num); 1326 dout(" queueing partial %p with %d/%d\n", partial, num, 1327 (int)CEPH_CAPS_PER_RELEASE); 1328 list_move_tail(&partial->list_head, 1329 &session->s_cap_releases_done); 1330 session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num; 1331 } 1332 err = 0; 1333 spin_unlock(&session->s_cap_lock); 1334 out_unlocked: 1335 return err; 1336 } 1337 1338 /* 1339 * flush all dirty inode data to disk. 1340 * 1341 * returns true if we've flushed through want_flush_seq 1342 */ 1343 static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) 1344 { 1345 int mds, ret = 1; 1346 1347 dout("check_cap_flush want %lld\n", want_flush_seq); 1348 mutex_lock(&mdsc->mutex); 1349 for (mds = 0; ret && mds < mdsc->max_sessions; mds++) { 1350 struct ceph_mds_session *session = mdsc->sessions[mds]; 1351 1352 if (!session) 1353 continue; 1354 get_session(session); 1355 mutex_unlock(&mdsc->mutex); 1356 1357 mutex_lock(&session->s_mutex); 1358 if (!list_empty(&session->s_cap_flushing)) { 1359 struct ceph_inode_info *ci = 1360 list_entry(session->s_cap_flushing.next, 1361 struct ceph_inode_info, 1362 i_flushing_item); 1363 struct inode *inode = &ci->vfs_inode; 1364 1365 spin_lock(&ci->i_ceph_lock); 1366 if (ci->i_cap_flush_seq <= want_flush_seq) { 1367 dout("check_cap_flush still flushing %p " 1368 "seq %lld <= %lld to mds%d\n", inode, 1369 ci->i_cap_flush_seq, want_flush_seq, 1370 session->s_mds); 1371 ret = 0; 1372 } 1373 spin_unlock(&ci->i_ceph_lock); 1374 } 1375 mutex_unlock(&session->s_mutex); 1376 ceph_put_mds_session(session); 1377 1378 if (!ret) 1379 return ret; 1380 
mutex_lock(&mdsc->mutex); 1381 } 1382 1383 mutex_unlock(&mdsc->mutex); 1384 dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq); 1385 return ret; 1386 } 1387 1388 /* 1389 * called under s_mutex 1390 */ 1391 void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 1392 struct ceph_mds_session *session) 1393 { 1394 struct ceph_msg *msg; 1395 1396 dout("send_cap_releases mds%d\n", session->s_mds); 1397 spin_lock(&session->s_cap_lock); 1398 while (!list_empty(&session->s_cap_releases_done)) { 1399 msg = list_first_entry(&session->s_cap_releases_done, 1400 struct ceph_msg, list_head); 1401 list_del_init(&msg->list_head); 1402 spin_unlock(&session->s_cap_lock); 1403 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1404 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 1405 ceph_con_send(&session->s_con, msg); 1406 spin_lock(&session->s_cap_lock); 1407 } 1408 spin_unlock(&session->s_cap_lock); 1409 } 1410 1411 static void discard_cap_releases(struct ceph_mds_client *mdsc, 1412 struct ceph_mds_session *session) 1413 { 1414 struct ceph_msg *msg; 1415 struct ceph_mds_cap_release *head; 1416 unsigned num; 1417 1418 dout("discard_cap_releases mds%d\n", session->s_mds); 1419 spin_lock(&session->s_cap_lock); 1420 1421 /* zero out the in-progress message */ 1422 msg = list_first_entry(&session->s_cap_releases, 1423 struct ceph_msg, list_head); 1424 head = msg->front.iov_base; 1425 num = le32_to_cpu(head->num); 1426 dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num); 1427 head->num = cpu_to_le32(0); 1428 msg->front.iov_len = sizeof(*head); 1429 session->s_num_cap_releases += num; 1430 1431 /* requeue completed messages */ 1432 while (!list_empty(&session->s_cap_releases_done)) { 1433 msg = list_first_entry(&session->s_cap_releases_done, 1434 struct ceph_msg, list_head); 1435 list_del_init(&msg->list_head); 1436 1437 head = msg->front.iov_base; 1438 num = le32_to_cpu(head->num); 1439 dout("discard_cap_releases mds%d %p %u\n", 
session->s_mds, msg, 1440 num); 1441 session->s_num_cap_releases += num; 1442 head->num = cpu_to_le32(0); 1443 msg->front.iov_len = sizeof(*head); 1444 list_add(&msg->list_head, &session->s_cap_releases); 1445 } 1446 1447 spin_unlock(&session->s_cap_lock); 1448 } 1449 1450 /* 1451 * requests 1452 */ 1453 1454 /* 1455 * Create an mds request. 1456 */ 1457 struct ceph_mds_request * 1458 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 1459 { 1460 struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS); 1461 1462 if (!req) 1463 return ERR_PTR(-ENOMEM); 1464 1465 mutex_init(&req->r_fill_mutex); 1466 req->r_mdsc = mdsc; 1467 req->r_started = jiffies; 1468 req->r_resend_mds = -1; 1469 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 1470 req->r_fmode = -1; 1471 kref_init(&req->r_kref); 1472 INIT_LIST_HEAD(&req->r_wait); 1473 init_completion(&req->r_completion); 1474 init_completion(&req->r_safe_completion); 1475 INIT_LIST_HEAD(&req->r_unsafe_item); 1476 1477 req->r_op = op; 1478 req->r_direct_mode = mode; 1479 return req; 1480 } 1481 1482 /* 1483 * return oldest (lowest) request, tid in request tree, 0 if none. 1484 * 1485 * called under mdsc->mutex. 1486 */ 1487 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 1488 { 1489 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 1490 return NULL; 1491 return rb_entry(rb_first(&mdsc->request_tree), 1492 struct ceph_mds_request, r_node); 1493 } 1494 1495 static u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 1496 { 1497 struct ceph_mds_request *req = __get_oldest_req(mdsc); 1498 1499 if (req) 1500 return req->r_tid; 1501 return 0; 1502 } 1503 1504 /* 1505 * Build a dentry's path. Allocate on heap; caller must kfree. Based 1506 * on build_path_from_dentry in fs/cifs/dir.c. 1507 * 1508 * If @stop_on_nosnap, generate path relative to the first non-snapped 1509 * inode. 1510 * 1511 * Encode hidden .snap dirs as a double /, i.e. 
 *   foo/.snap/bar -> foo//bar
 *
 * On success returns the path (without a leading '/'), stores its
 * length in *plen and the ino of the dentry the walk stopped at in
 * *base.  Returns ERR_PTR(-EINVAL) for a NULL dentry and
 * ERR_PTR(-ENOMEM) on allocation failure.
 */
char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
			   int stop_on_nosnap)
{
	struct dentry *temp;
	char *path;
	int len, pos;
	unsigned seq;

	if (dentry == NULL)
		return ERR_PTR(-EINVAL);

retry:
	/* pass 1: walk towards the root to size the buffer */
	len = 0;
	seq = read_seqbegin(&rename_lock);
	rcu_read_lock();
	for (temp = dentry; !IS_ROOT(temp);) {
		struct inode *inode = temp->d_inode;
		if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
			len++;  /* slash only */
		else if (stop_on_nosnap && inode &&
			 ceph_snap(inode) == CEPH_NOSNAP)
			break;
		else
			len += 1 + temp->d_name.len;
		temp = temp->d_parent;
	}
	rcu_read_unlock();
	if (len)
		len--;  /* no leading '/' */

	path = kmalloc(len+1, GFP_NOFS);
	if (path == NULL)
		return ERR_PTR(-ENOMEM);
	pos = len;
	path[pos] = 0;	/* trailing null */
	/* pass 2: fill the buffer back to front, one component per dentry */
	rcu_read_lock();
	for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
		struct inode *inode;

		spin_lock(&temp->d_lock);
		inode = temp->d_inode;
		if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
			/* no name copied; only the '/' below is emitted */
			dout("build_path path+%d: %p SNAPDIR\n",
			     pos, temp);
		} else if (stop_on_nosnap && inode &&
			   ceph_snap(inode) == CEPH_NOSNAP) {
			spin_unlock(&temp->d_lock);
			break;
		} else {
			pos -= temp->d_name.len;
			if (pos < 0) {
				/* name no longer fits the size computed in
				   pass 1; the pos != 0 check below retries */
				spin_unlock(&temp->d_lock);
				break;
			}
			/* copies exactly d_name.len bytes; no NUL is added */
			strncpy(path + pos, temp->d_name.name,
				temp->d_name.len);
		}
		spin_unlock(&temp->d_lock);
		if (pos)
			path[--pos] = '/';
		temp = temp->d_parent;
	}
	rcu_read_unlock();
	if (pos != 0 || read_seqretry(&rename_lock, seq)) {
		pr_err("build_path did not end path lookup where "
		       "expected, namelen is %d, pos is %d\n", len, pos);
		/* presumably this is only possible if racing with a
		   rename of one of the parent directories (we can not
		   lock the dentries above us to prevent this, but
		   retrying should be harmless) */
		kfree(path);
		goto retry;
	}

	*base = ceph_ino(temp->d_inode);
	*plen = len;
	dout("build_path on %p %d built %llx '%.*s'\n",
	     dentry, d_count(dentry), *base, len, path);
	return path;
}

/*
 * Describe @dentry as (ino, path) for a request.
 *
 * If the parent directory is not a snapshot (CEPH_NOSNAP), use the
 * parent's ino plus the dentry's own name, with no allocation.
 * Otherwise build a full path with ceph_mdsc_build_path() and set
 * *pfreepath so the caller knows to kfree it.
 *
 * Returns 0 on success or the error from ceph_mdsc_build_path().
 */
static int build_dentry_path(struct dentry *dentry,
			     const char **ppath, int *ppathlen, u64 *pino,
			     int *pfreepath)
{
	char *path;

	if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
		*pino = ceph_ino(dentry->d_parent->d_inode);
		*ppath = dentry->d_name.name;
		*ppathlen = dentry->d_name.len;
		return 0;
	}
	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
	if (IS_ERR(path))
		return PTR_ERR(path);
	*ppath = path;
	*pfreepath = 1;
	return 0;
}

/*
 * Describe @inode as (ino, path) for a request.
 *
 * A non-snapshot inode is identified by its ino alone (empty path;
 * *ppath is left untouched).  Otherwise a path is built from one of
 * the inode's aliases and *pfreepath is set.
 *
 * Returns 0 on success or the error from ceph_mdsc_build_path().
 */
static int build_inode_path(struct inode *inode,
			    const char **ppath, int *ppathlen, u64 *pino,
			    int *pfreepath)
{
	struct dentry *dentry;
	char *path;

	if (ceph_snap(inode) == CEPH_NOSNAP) {
		*pino = ceph_ino(inode);
		*ppathlen = 0;
		return 0;
	}
	/* a NULL alias is rejected by ceph_mdsc_build_path() with -EINVAL */
	dentry = d_find_alias(inode);
	path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
	dput(dentry);
	if (IS_ERR(path))
		return PTR_ERR(path);
	*ppath = path;
	*pfreepath = 1;
	return 0;
}

/*
 * request arguments may be specified via an inode *, a dentry *, or
 * an explicit ino+path.
 *
 * The first of @rinode, @rdentry, or @rpath/@rino that is set wins.
 * If none is set the outputs are left untouched and 0 is returned.
 * *freepath is set when *ppath was allocated and must be kfree'd.
 */
static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
				  const char *rpath, u64 rino,
				  const char **ppath, int *pathlen,
				  u64 *ino, int *freepath)
{
	int r = 0;

	if (rinode) {
		r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
		dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
		     ceph_snap(rinode));
	} else if (rdentry) {
		r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
		dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
		     *ppath);
	} else if (rpath || rino) {
		*ino = rino;
		*ppath = rpath;
		*pathlen = rpath ? strlen(rpath) : 0;
		dout(" path %.*s\n", *pathlen, rpath);
	}

	return r;
}

/*
 * Build the CEPH_MSG_CLIENT_REQUEST message for @req: request head,
 * two encoded filepaths, then any cap/dentry releases.
 *
 * Returns the new message, or an ERR_PTR on path-building or
 * allocation failure.
 *
 * called under mdsc->mutex
 */
static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
					       struct ceph_mds_request *req,
					       int mds)
{
	struct ceph_msg *msg;
	struct ceph_mds_request_head *head;
	const char *path1 = NULL;
	const char *path2 = NULL;
	u64 ino1 = 0, ino2 = 0;
	int pathlen1 = 0, pathlen2 = 0;
	int freepath1 = 0, freepath2 = 0;
	int len;
	u16 releases;
	void *p, *end;
	int ret;

	/* primary target: r_inode, else r_dentry, else r_path1/r_ino1 */
	ret = set_request_path_attr(req->r_inode, req->r_dentry,
			      req->r_path1, req->r_ino1.ino,
			      &path1, &pathlen1, &ino1, &freepath1);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out;
	}

	/* secondary target: r_old_dentry, else r_path2/r_ino2 */
	ret = set_request_path_attr(NULL, req->r_old_dentry,
			      req->r_path2, req->r_ino2.ino,
			      &path2, &pathlen2, &ino2, &freepath2);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out_free1;
	}

	/*
	 * NOTE(review): the 2*(1 + u32 + u64) term presumably covers the
	 * fixed per-filepath encoding overhead written by
	 * ceph_encode_filepath() -- confirm against that helper.
	 */
	len = sizeof(*head) +
		pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64));

	/* calculate (max) length for cap releases */
	len += sizeof(struct ceph_mds_request_release) *
		(!!req->r_inode_drop + !!req->r_dentry_drop +
		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
	if (req->r_dentry_drop)
		len += req->r_dentry->d_name.len;
	if (req->r_old_dentry_drop)
		len += req->r_old_dentry->d_name.len;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
	if (!msg) {
		msg = ERR_PTR(-ENOMEM);
		goto out_free2;
	}

	msg->hdr.tid = cpu_to_le64(req->r_tid);

	head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(*head);
	end = msg->front.iov_base + msg->front.iov_len;

	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
	head->op = cpu_to_le32(req->r_op);
	head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
	head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
	head->args = req->r_args;

	ceph_encode_filepath(&p, end, ino1, path1);
	ceph_encode_filepath(&p, end, ino2, path2);

	/* make note of release offset, in case we need to replay */
	req->r_request_release_offset = p - msg->front.iov_base;

	/* cap releases */
	releases = 0;
	if (req->r_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_inode ? req->r_inode : req->r_dentry->d_inode,
		      mds, req->r_inode_drop, req->r_inode_unless, 0);
	if (req->r_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_dentry,
		       mds, req->r_dentry_drop, req->r_dentry_unless);
	if (req->r_old_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
		       mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
	if (req->r_old_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_old_dentry->d_inode,
		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
	head->num_releases = cpu_to_le16(releases);

	/* the front was sized for the worst case; shrink to what was used */
	BUG_ON(p > end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	if (req->r_data_len) {
		/* outbound data set only by ceph_sync_setxattr() */
		BUG_ON(!req->r_pages);
		ceph_msg_data_add_pages(msg, req->r_pages, req->r_data_len, 0);
	}

	msg->hdr.data_len = cpu_to_le32(req->r_data_len);
	msg->hdr.data_off = cpu_to_le16(0);

	/* success falls through: built paths are freed on every exit */
out_free2:
	if (freepath2)
		kfree((char *)path2);
out_free1:
	if (freepath1)
		kfree((char *)path1);
out:
	return msg;
}

/*
 * called under mdsc->mutex if error, under no mutex if
 * success.
1781 */ 1782 static void complete_request(struct ceph_mds_client *mdsc, 1783 struct ceph_mds_request *req) 1784 { 1785 if (req->r_callback) 1786 req->r_callback(mdsc, req); 1787 else 1788 complete_all(&req->r_completion); 1789 } 1790 1791 /* 1792 * called under mdsc->mutex 1793 */ 1794 static int __prepare_send_request(struct ceph_mds_client *mdsc, 1795 struct ceph_mds_request *req, 1796 int mds) 1797 { 1798 struct ceph_mds_request_head *rhead; 1799 struct ceph_msg *msg; 1800 int flags = 0; 1801 1802 req->r_attempts++; 1803 if (req->r_inode) { 1804 struct ceph_cap *cap = 1805 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 1806 1807 if (cap) 1808 req->r_sent_on_mseq = cap->mseq; 1809 else 1810 req->r_sent_on_mseq = -1; 1811 } 1812 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, 1813 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 1814 1815 if (req->r_got_unsafe) { 1816 /* 1817 * Replay. Do not regenerate message (and rebuild 1818 * paths, etc.); just use the original message. 1819 * Rebuilding paths will break for renames because 1820 * d_move mangles the src name. 
1821 */ 1822 msg = req->r_request; 1823 rhead = msg->front.iov_base; 1824 1825 flags = le32_to_cpu(rhead->flags); 1826 flags |= CEPH_MDS_FLAG_REPLAY; 1827 rhead->flags = cpu_to_le32(flags); 1828 1829 if (req->r_target_inode) 1830 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 1831 1832 rhead->num_retry = req->r_attempts - 1; 1833 1834 /* remove cap/dentry releases from message */ 1835 rhead->num_releases = 0; 1836 msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset); 1837 msg->front.iov_len = req->r_request_release_offset; 1838 return 0; 1839 } 1840 1841 if (req->r_request) { 1842 ceph_msg_put(req->r_request); 1843 req->r_request = NULL; 1844 } 1845 msg = create_request_message(mdsc, req, mds); 1846 if (IS_ERR(msg)) { 1847 req->r_err = PTR_ERR(msg); 1848 complete_request(mdsc, req); 1849 return PTR_ERR(msg); 1850 } 1851 req->r_request = msg; 1852 1853 rhead = msg->front.iov_base; 1854 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 1855 if (req->r_got_unsafe) 1856 flags |= CEPH_MDS_FLAG_REPLAY; 1857 if (req->r_locked_dir) 1858 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 1859 rhead->flags = cpu_to_le32(flags); 1860 rhead->num_fwd = req->r_num_fwd; 1861 rhead->num_retry = req->r_attempts - 1; 1862 rhead->ino = 0; 1863 1864 dout(" r_locked_dir = %p\n", req->r_locked_dir); 1865 return 0; 1866 } 1867 1868 /* 1869 * send request, or put it on the appropriate wait list. 
1870 */ 1871 static int __do_request(struct ceph_mds_client *mdsc, 1872 struct ceph_mds_request *req) 1873 { 1874 struct ceph_mds_session *session = NULL; 1875 int mds = -1; 1876 int err = -EAGAIN; 1877 1878 if (req->r_err || req->r_got_result) 1879 goto out; 1880 1881 if (req->r_timeout && 1882 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 1883 dout("do_request timed out\n"); 1884 err = -EIO; 1885 goto finish; 1886 } 1887 1888 put_request_session(req); 1889 1890 mds = __choose_mds(mdsc, req); 1891 if (mds < 0 || 1892 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 1893 dout("do_request no mds or not active, waiting for map\n"); 1894 list_add(&req->r_wait, &mdsc->waiting_for_map); 1895 goto out; 1896 } 1897 1898 /* get, open session */ 1899 session = __ceph_lookup_mds_session(mdsc, mds); 1900 if (!session) { 1901 session = register_session(mdsc, mds); 1902 if (IS_ERR(session)) { 1903 err = PTR_ERR(session); 1904 goto finish; 1905 } 1906 } 1907 req->r_session = get_session(session); 1908 1909 dout("do_request mds%d session %p state %s\n", mds, session, 1910 session_state_name(session->s_state)); 1911 if (session->s_state != CEPH_MDS_SESSION_OPEN && 1912 session->s_state != CEPH_MDS_SESSION_HUNG) { 1913 if (session->s_state == CEPH_MDS_SESSION_NEW || 1914 session->s_state == CEPH_MDS_SESSION_CLOSING) 1915 __open_session(mdsc, session); 1916 list_add(&req->r_wait, &session->s_waiting); 1917 goto out_session; 1918 } 1919 1920 /* send request */ 1921 req->r_resend_mds = -1; /* forget any previous mds hint */ 1922 1923 if (req->r_request_started == 0) /* note request start time */ 1924 req->r_request_started = jiffies; 1925 1926 err = __prepare_send_request(mdsc, req, mds); 1927 if (!err) { 1928 ceph_msg_get(req->r_request); 1929 ceph_con_send(&session->s_con, req->r_request); 1930 } 1931 1932 out_session: 1933 ceph_put_mds_session(session); 1934 out: 1935 return err; 1936 1937 finish: 1938 req->r_err = err; 1939 complete_request(mdsc, 
req); 1940 goto out; 1941 } 1942 1943 /* 1944 * called under mdsc->mutex 1945 */ 1946 static void __wake_requests(struct ceph_mds_client *mdsc, 1947 struct list_head *head) 1948 { 1949 struct ceph_mds_request *req; 1950 LIST_HEAD(tmp_list); 1951 1952 list_splice_init(head, &tmp_list); 1953 1954 while (!list_empty(&tmp_list)) { 1955 req = list_entry(tmp_list.next, 1956 struct ceph_mds_request, r_wait); 1957 list_del_init(&req->r_wait); 1958 dout(" wake request %p tid %llu\n", req, req->r_tid); 1959 __do_request(mdsc, req); 1960 } 1961 } 1962 1963 /* 1964 * Wake up threads with requests pending for @mds, so that they can 1965 * resubmit their requests to a possibly different mds. 1966 */ 1967 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 1968 { 1969 struct ceph_mds_request *req; 1970 struct rb_node *p; 1971 1972 dout("kick_requests mds%d\n", mds); 1973 for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) { 1974 req = rb_entry(p, struct ceph_mds_request, r_node); 1975 if (req->r_got_unsafe) 1976 continue; 1977 if (req->r_session && 1978 req->r_session->s_mds == mds) { 1979 dout(" kicking tid %llu\n", req->r_tid); 1980 __do_request(mdsc, req); 1981 } 1982 } 1983 } 1984 1985 void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, 1986 struct ceph_mds_request *req) 1987 { 1988 dout("submit_request on %p\n", req); 1989 mutex_lock(&mdsc->mutex); 1990 __register_request(mdsc, req, NULL); 1991 __do_request(mdsc, req); 1992 mutex_unlock(&mdsc->mutex); 1993 } 1994 1995 /* 1996 * Synchrously perform an mds request. Take care of all of the 1997 * session setup, forwarding, retry details. 
1998 */ 1999 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 2000 struct inode *dir, 2001 struct ceph_mds_request *req) 2002 { 2003 int err; 2004 2005 dout("do_request on %p\n", req); 2006 2007 /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */ 2008 if (req->r_inode) 2009 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 2010 if (req->r_locked_dir) 2011 ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); 2012 if (req->r_old_dentry) 2013 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 2014 CEPH_CAP_PIN); 2015 2016 /* issue */ 2017 mutex_lock(&mdsc->mutex); 2018 __register_request(mdsc, req, dir); 2019 __do_request(mdsc, req); 2020 2021 if (req->r_err) { 2022 err = req->r_err; 2023 __unregister_request(mdsc, req); 2024 dout("do_request early error %d\n", err); 2025 goto out; 2026 } 2027 2028 /* wait */ 2029 mutex_unlock(&mdsc->mutex); 2030 dout("do_request waiting\n"); 2031 if (req->r_timeout) { 2032 err = (long)wait_for_completion_killable_timeout( 2033 &req->r_completion, req->r_timeout); 2034 if (err == 0) 2035 err = -EIO; 2036 } else { 2037 err = wait_for_completion_killable(&req->r_completion); 2038 } 2039 dout("do_request waited, got %d\n", err); 2040 mutex_lock(&mdsc->mutex); 2041 2042 /* only abort if we didn't race with a real reply */ 2043 if (req->r_got_result) { 2044 err = le32_to_cpu(req->r_reply_info.head->result); 2045 } else if (err < 0) { 2046 dout("aborted request %lld with %d\n", req->r_tid, err); 2047 2048 /* 2049 * ensure we aren't running concurrently with 2050 * ceph_fill_trace or ceph_readdir_prepopulate, which 2051 * rely on locks (dir mutex) held by our caller. 
2052 */ 2053 mutex_lock(&req->r_fill_mutex); 2054 req->r_err = err; 2055 req->r_aborted = true; 2056 mutex_unlock(&req->r_fill_mutex); 2057 2058 if (req->r_locked_dir && 2059 (req->r_op & CEPH_MDS_OP_WRITE)) 2060 ceph_invalidate_dir_request(req); 2061 } else { 2062 err = req->r_err; 2063 } 2064 2065 out: 2066 mutex_unlock(&mdsc->mutex); 2067 dout("do_request %p done, result %d\n", req, err); 2068 return err; 2069 } 2070 2071 /* 2072 * Invalidate dir's completeness, dentry lease state on an aborted MDS 2073 * namespace request. 2074 */ 2075 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 2076 { 2077 struct inode *inode = req->r_locked_dir; 2078 2079 dout("invalidate_dir_request %p (complete, lease(s))\n", inode); 2080 2081 ceph_dir_clear_complete(inode); 2082 if (req->r_dentry) 2083 ceph_invalidate_dentry_lease(req->r_dentry); 2084 if (req->r_old_dentry) 2085 ceph_invalidate_dentry_lease(req->r_old_dentry); 2086 } 2087 2088 /* 2089 * Handle mds reply. 2090 * 2091 * We take the session mutex and parse and process the reply immediately. 2092 * This preserves the logical ordering of replies, capabilities, etc., sent 2093 * by the MDS as they are applied to our local cache. 
2094 */ 2095 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 2096 { 2097 struct ceph_mds_client *mdsc = session->s_mdsc; 2098 struct ceph_mds_request *req; 2099 struct ceph_mds_reply_head *head = msg->front.iov_base; 2100 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 2101 u64 tid; 2102 int err, result; 2103 int mds = session->s_mds; 2104 2105 if (msg->front.iov_len < sizeof(*head)) { 2106 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 2107 ceph_msg_dump(msg); 2108 return; 2109 } 2110 2111 /* get request, session */ 2112 tid = le64_to_cpu(msg->hdr.tid); 2113 mutex_lock(&mdsc->mutex); 2114 req = __lookup_request(mdsc, tid); 2115 if (!req) { 2116 dout("handle_reply on unknown tid %llu\n", tid); 2117 mutex_unlock(&mdsc->mutex); 2118 return; 2119 } 2120 dout("handle_reply %p\n", req); 2121 2122 /* correct session? */ 2123 if (req->r_session != session) { 2124 pr_err("mdsc_handle_reply got %llu on session mds%d" 2125 " not mds%d\n", tid, session->s_mds, 2126 req->r_session ? req->r_session->s_mds : -1); 2127 mutex_unlock(&mdsc->mutex); 2128 goto out; 2129 } 2130 2131 /* dup? */ 2132 if ((req->r_got_unsafe && !head->safe) || 2133 (req->r_got_safe && head->safe)) { 2134 pr_warning("got a dup %s reply on %llu from mds%d\n", 2135 head->safe ? 
"safe" : "unsafe", tid, mds); 2136 mutex_unlock(&mdsc->mutex); 2137 goto out; 2138 } 2139 if (req->r_got_safe && !head->safe) { 2140 pr_warning("got unsafe after safe on %llu from mds%d\n", 2141 tid, mds); 2142 mutex_unlock(&mdsc->mutex); 2143 goto out; 2144 } 2145 2146 result = le32_to_cpu(head->result); 2147 2148 /* 2149 * Handle an ESTALE 2150 * if we're not talking to the authority, send to them 2151 * if the authority has changed while we weren't looking, 2152 * send to new authority 2153 * Otherwise we just have to return an ESTALE 2154 */ 2155 if (result == -ESTALE) { 2156 dout("got ESTALE on request %llu", req->r_tid); 2157 if (!req->r_inode) { 2158 /* do nothing; not an authority problem */ 2159 } else if (req->r_direct_mode != USE_AUTH_MDS) { 2160 dout("not using auth, setting for that now"); 2161 req->r_direct_mode = USE_AUTH_MDS; 2162 __do_request(mdsc, req); 2163 mutex_unlock(&mdsc->mutex); 2164 goto out; 2165 } else { 2166 struct ceph_inode_info *ci = ceph_inode(req->r_inode); 2167 struct ceph_cap *cap = NULL; 2168 2169 if (req->r_session) 2170 cap = ceph_get_cap_for_mds(ci, 2171 req->r_session->s_mds); 2172 2173 dout("already using auth"); 2174 if ((!cap || cap != ci->i_auth_cap) || 2175 (cap->mseq != req->r_sent_on_mseq)) { 2176 dout("but cap changed, so resending"); 2177 __do_request(mdsc, req); 2178 mutex_unlock(&mdsc->mutex); 2179 goto out; 2180 } 2181 } 2182 dout("have to return ESTALE on request %llu", req->r_tid); 2183 } 2184 2185 2186 if (head->safe) { 2187 req->r_got_safe = true; 2188 __unregister_request(mdsc, req); 2189 complete_all(&req->r_safe_completion); 2190 2191 if (req->r_got_unsafe) { 2192 /* 2193 * We already handled the unsafe response, now do the 2194 * cleanup. No need to examine the response; the MDS 2195 * doesn't include any result info in the safe 2196 * response. And even if it did, there is nothing 2197 * useful we could do with a revised return value. 
2198 */ 2199 dout("got safe reply %llu, mds%d\n", tid, mds); 2200 list_del_init(&req->r_unsafe_item); 2201 2202 /* last unsafe request during umount? */ 2203 if (mdsc->stopping && !__get_oldest_req(mdsc)) 2204 complete_all(&mdsc->safe_umount_waiters); 2205 mutex_unlock(&mdsc->mutex); 2206 goto out; 2207 } 2208 } else { 2209 req->r_got_unsafe = true; 2210 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 2211 } 2212 2213 dout("handle_reply tid %lld result %d\n", tid, result); 2214 rinfo = &req->r_reply_info; 2215 err = parse_reply_info(msg, rinfo, session->s_con.peer_features); 2216 mutex_unlock(&mdsc->mutex); 2217 2218 mutex_lock(&session->s_mutex); 2219 if (err < 0) { 2220 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 2221 ceph_msg_dump(msg); 2222 goto out_err; 2223 } 2224 2225 /* snap trace */ 2226 if (rinfo->snapblob_len) { 2227 down_write(&mdsc->snap_rwsem); 2228 ceph_update_snap_trace(mdsc, rinfo->snapblob, 2229 rinfo->snapblob + rinfo->snapblob_len, 2230 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP); 2231 downgrade_write(&mdsc->snap_rwsem); 2232 } else { 2233 down_read(&mdsc->snap_rwsem); 2234 } 2235 2236 /* insert trace into our cache */ 2237 mutex_lock(&req->r_fill_mutex); 2238 err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session); 2239 if (err == 0) { 2240 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 2241 req->r_op == CEPH_MDS_OP_LSSNAP) && 2242 rinfo->dir_nr) 2243 ceph_readdir_prepopulate(req, req->r_session); 2244 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 2245 } 2246 mutex_unlock(&req->r_fill_mutex); 2247 2248 up_read(&mdsc->snap_rwsem); 2249 out_err: 2250 mutex_lock(&mdsc->mutex); 2251 if (!req->r_aborted) { 2252 if (err) { 2253 req->r_err = err; 2254 } else { 2255 req->r_reply = msg; 2256 ceph_msg_get(msg); 2257 req->r_got_result = true; 2258 } 2259 } else { 2260 dout("reply arrived after request %lld was aborted\n", tid); 2261 } 2262 mutex_unlock(&mdsc->mutex); 2263 2264 
ceph_add_cap_releases(mdsc, req->r_session); 2265 mutex_unlock(&session->s_mutex); 2266 2267 /* kick calling process */ 2268 complete_request(mdsc, req); 2269 out: 2270 ceph_mdsc_put_request(req); 2271 return; 2272 } 2273 2274 2275 2276 /* 2277 * handle mds notification that our request has been forwarded. 2278 */ 2279 static void handle_forward(struct ceph_mds_client *mdsc, 2280 struct ceph_mds_session *session, 2281 struct ceph_msg *msg) 2282 { 2283 struct ceph_mds_request *req; 2284 u64 tid = le64_to_cpu(msg->hdr.tid); 2285 u32 next_mds; 2286 u32 fwd_seq; 2287 int err = -EINVAL; 2288 void *p = msg->front.iov_base; 2289 void *end = p + msg->front.iov_len; 2290 2291 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 2292 next_mds = ceph_decode_32(&p); 2293 fwd_seq = ceph_decode_32(&p); 2294 2295 mutex_lock(&mdsc->mutex); 2296 req = __lookup_request(mdsc, tid); 2297 if (!req) { 2298 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 2299 goto out; /* dup reply? */ 2300 } 2301 2302 if (req->r_aborted) { 2303 dout("forward tid %llu aborted, unregistering\n", tid); 2304 __unregister_request(mdsc, req); 2305 } else if (fwd_seq <= req->r_num_fwd) { 2306 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 2307 tid, next_mds, req->r_num_fwd, fwd_seq); 2308 } else { 2309 /* resend. 
forward race not possible; mds would drop */ 2310 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 2311 BUG_ON(req->r_err); 2312 BUG_ON(req->r_got_result); 2313 req->r_num_fwd = fwd_seq; 2314 req->r_resend_mds = next_mds; 2315 put_request_session(req); 2316 __do_request(mdsc, req); 2317 } 2318 ceph_mdsc_put_request(req); 2319 out: 2320 mutex_unlock(&mdsc->mutex); 2321 return; 2322 2323 bad: 2324 pr_err("mdsc_handle_forward decode error err=%d\n", err); 2325 } 2326 2327 /* 2328 * handle a mds session control message 2329 */ 2330 static void handle_session(struct ceph_mds_session *session, 2331 struct ceph_msg *msg) 2332 { 2333 struct ceph_mds_client *mdsc = session->s_mdsc; 2334 u32 op; 2335 u64 seq; 2336 int mds = session->s_mds; 2337 struct ceph_mds_session_head *h = msg->front.iov_base; 2338 int wake = 0; 2339 2340 /* decode */ 2341 if (msg->front.iov_len != sizeof(*h)) 2342 goto bad; 2343 op = le32_to_cpu(h->op); 2344 seq = le64_to_cpu(h->seq); 2345 2346 mutex_lock(&mdsc->mutex); 2347 if (op == CEPH_SESSION_CLOSE) 2348 __unregister_session(mdsc, session); 2349 /* FIXME: this ttl calculation is generous */ 2350 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 2351 mutex_unlock(&mdsc->mutex); 2352 2353 mutex_lock(&session->s_mutex); 2354 2355 dout("handle_session mds%d %s %p state %s seq %llu\n", 2356 mds, ceph_session_op_name(op), session, 2357 session_state_name(session->s_state), seq); 2358 2359 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 2360 session->s_state = CEPH_MDS_SESSION_OPEN; 2361 pr_info("mds%d came back\n", session->s_mds); 2362 } 2363 2364 switch (op) { 2365 case CEPH_SESSION_OPEN: 2366 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 2367 pr_info("mds%d reconnect success\n", session->s_mds); 2368 session->s_state = CEPH_MDS_SESSION_OPEN; 2369 renewed_caps(mdsc, session, 0); 2370 wake = 1; 2371 if (mdsc->stopping) 2372 __close_session(mdsc, session); 2373 break; 2374 2375 case CEPH_SESSION_RENEWCAPS: 2376 if 
(session->s_renew_seq == seq) 2377 renewed_caps(mdsc, session, 1); 2378 break; 2379 2380 case CEPH_SESSION_CLOSE: 2381 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 2382 pr_info("mds%d reconnect denied\n", session->s_mds); 2383 remove_session_caps(session); 2384 wake = 1; /* for good measure */ 2385 wake_up_all(&mdsc->session_close_wq); 2386 kick_requests(mdsc, mds); 2387 break; 2388 2389 case CEPH_SESSION_STALE: 2390 pr_info("mds%d caps went stale, renewing\n", 2391 session->s_mds); 2392 spin_lock(&session->s_gen_ttl_lock); 2393 session->s_cap_gen++; 2394 session->s_cap_ttl = jiffies - 1; 2395 spin_unlock(&session->s_gen_ttl_lock); 2396 send_renew_caps(mdsc, session); 2397 break; 2398 2399 case CEPH_SESSION_RECALL_STATE: 2400 trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 2401 break; 2402 2403 default: 2404 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 2405 WARN_ON(1); 2406 } 2407 2408 mutex_unlock(&session->s_mutex); 2409 if (wake) { 2410 mutex_lock(&mdsc->mutex); 2411 __wake_requests(mdsc, &session->s_waiting); 2412 mutex_unlock(&mdsc->mutex); 2413 } 2414 return; 2415 2416 bad: 2417 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 2418 (int)msg->front.iov_len); 2419 ceph_msg_dump(msg); 2420 return; 2421 } 2422 2423 2424 /* 2425 * called under session->mutex. 2426 */ 2427 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 2428 struct ceph_mds_session *session) 2429 { 2430 struct ceph_mds_request *req, *nreq; 2431 int err; 2432 2433 dout("replay_unsafe_requests mds%d\n", session->s_mds); 2434 2435 mutex_lock(&mdsc->mutex); 2436 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) { 2437 err = __prepare_send_request(mdsc, req, session->s_mds); 2438 if (!err) { 2439 ceph_msg_get(req->r_request); 2440 ceph_con_send(&session->s_con, req->r_request); 2441 } 2442 } 2443 mutex_unlock(&mdsc->mutex); 2444 } 2445 2446 /* 2447 * Encode information about a cap for a reconnect with the MDS. 
2448 */ 2449 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, 2450 void *arg) 2451 { 2452 union { 2453 struct ceph_mds_cap_reconnect v2; 2454 struct ceph_mds_cap_reconnect_v1 v1; 2455 } rec; 2456 size_t reclen; 2457 struct ceph_inode_info *ci; 2458 struct ceph_reconnect_state *recon_state = arg; 2459 struct ceph_pagelist *pagelist = recon_state->pagelist; 2460 char *path; 2461 int pathlen, err; 2462 u64 pathbase; 2463 struct dentry *dentry; 2464 2465 ci = cap->ci; 2466 2467 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 2468 inode, ceph_vinop(inode), cap, cap->cap_id, 2469 ceph_cap_string(cap->issued)); 2470 err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 2471 if (err) 2472 return err; 2473 2474 dentry = d_find_alias(inode); 2475 if (dentry) { 2476 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0); 2477 if (IS_ERR(path)) { 2478 err = PTR_ERR(path); 2479 goto out_dput; 2480 } 2481 } else { 2482 path = NULL; 2483 pathlen = 0; 2484 } 2485 err = ceph_pagelist_encode_string(pagelist, path, pathlen); 2486 if (err) 2487 goto out_free; 2488 2489 spin_lock(&ci->i_ceph_lock); 2490 cap->seq = 0; /* reset cap seq */ 2491 cap->issue_seq = 0; /* and issue_seq */ 2492 cap->mseq = 0; /* and migrate_seq */ 2493 2494 if (recon_state->flock) { 2495 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 2496 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 2497 rec.v2.issued = cpu_to_le32(cap->issued); 2498 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 2499 rec.v2.pathbase = cpu_to_le64(pathbase); 2500 rec.v2.flock_len = 0; 2501 reclen = sizeof(rec.v2); 2502 } else { 2503 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 2504 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 2505 rec.v1.issued = cpu_to_le32(cap->issued); 2506 rec.v1.size = cpu_to_le64(inode->i_size); 2507 ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime); 2508 ceph_encode_timespec(&rec.v1.atime, &inode->i_atime); 2509 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 2510 
rec.v1.pathbase = cpu_to_le64(pathbase); 2511 reclen = sizeof(rec.v1); 2512 } 2513 spin_unlock(&ci->i_ceph_lock); 2514 2515 if (recon_state->flock) { 2516 int num_fcntl_locks, num_flock_locks; 2517 struct ceph_filelock *flocks; 2518 2519 encode_again: 2520 spin_lock(&inode->i_lock); 2521 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 2522 spin_unlock(&inode->i_lock); 2523 flocks = kmalloc((num_fcntl_locks+num_flock_locks) * 2524 sizeof(struct ceph_filelock), GFP_NOFS); 2525 if (!flocks) { 2526 err = -ENOMEM; 2527 goto out_free; 2528 } 2529 spin_lock(&inode->i_lock); 2530 err = ceph_encode_locks_to_buffer(inode, flocks, 2531 num_fcntl_locks, 2532 num_flock_locks); 2533 spin_unlock(&inode->i_lock); 2534 if (err) { 2535 kfree(flocks); 2536 if (err == -ENOSPC) 2537 goto encode_again; 2538 goto out_free; 2539 } 2540 /* 2541 * number of encoded locks is stable, so copy to pagelist 2542 */ 2543 rec.v2.flock_len = cpu_to_le32(2*sizeof(u32) + 2544 (num_fcntl_locks+num_flock_locks) * 2545 sizeof(struct ceph_filelock)); 2546 err = ceph_pagelist_append(pagelist, &rec, reclen); 2547 if (!err) 2548 err = ceph_locks_to_pagelist(flocks, pagelist, 2549 num_fcntl_locks, 2550 num_flock_locks); 2551 kfree(flocks); 2552 } else { 2553 err = ceph_pagelist_append(pagelist, &rec, reclen); 2554 } 2555 out_free: 2556 kfree(path); 2557 out_dput: 2558 dput(dentry); 2559 return err; 2560 } 2561 2562 2563 /* 2564 * If an MDS fails and recovers, clients need to reconnect in order to 2565 * reestablish shared state. This includes all caps issued through 2566 * this session _and_ the snap_realm hierarchy. Because it's not 2567 * clear which snap realms the mds cares about, we send everything we 2568 * know about.. that ensures we'll then get any new info the 2569 * recovering MDS might have. 2570 * 2571 * This is a relatively heavyweight operation, but it's rare. 2572 * 2573 * called with mdsc->mutex held. 
2574 */ 2575 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 2576 struct ceph_mds_session *session) 2577 { 2578 struct ceph_msg *reply; 2579 struct rb_node *p; 2580 int mds = session->s_mds; 2581 int err = -ENOMEM; 2582 struct ceph_pagelist *pagelist; 2583 struct ceph_reconnect_state recon_state; 2584 2585 pr_info("mds%d reconnect start\n", mds); 2586 2587 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); 2588 if (!pagelist) 2589 goto fail_nopagelist; 2590 ceph_pagelist_init(pagelist); 2591 2592 reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false); 2593 if (!reply) 2594 goto fail_nomsg; 2595 2596 mutex_lock(&session->s_mutex); 2597 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 2598 session->s_seq = 0; 2599 2600 ceph_con_close(&session->s_con); 2601 ceph_con_open(&session->s_con, 2602 CEPH_ENTITY_TYPE_MDS, mds, 2603 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 2604 2605 /* replay unsafe requests */ 2606 replay_unsafe_requests(mdsc, session); 2607 2608 down_read(&mdsc->snap_rwsem); 2609 2610 dout("session %p state %s\n", session, 2611 session_state_name(session->s_state)); 2612 2613 /* drop old cap expires; we're about to reestablish that state */ 2614 discard_cap_releases(mdsc, session); 2615 2616 /* traverse this session's caps */ 2617 err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps); 2618 if (err) 2619 goto fail; 2620 2621 recon_state.pagelist = pagelist; 2622 recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK; 2623 err = iterate_session_caps(session, encode_caps_cb, &recon_state); 2624 if (err < 0) 2625 goto fail; 2626 2627 /* 2628 * snaprealms. we provide mds with the ino, seq (version), and 2629 * parent for all of our realms. If the mds has any newer info, 2630 * it will tell us. 
2631 */ 2632 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 2633 struct ceph_snap_realm *realm = 2634 rb_entry(p, struct ceph_snap_realm, node); 2635 struct ceph_mds_snaprealm_reconnect sr_rec; 2636 2637 dout(" adding snap realm %llx seq %lld parent %llx\n", 2638 realm->ino, realm->seq, realm->parent_ino); 2639 sr_rec.ino = cpu_to_le64(realm->ino); 2640 sr_rec.seq = cpu_to_le64(realm->seq); 2641 sr_rec.parent = cpu_to_le64(realm->parent_ino); 2642 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 2643 if (err) 2644 goto fail; 2645 } 2646 2647 if (recon_state.flock) 2648 reply->hdr.version = cpu_to_le16(2); 2649 if (pagelist->length) { 2650 /* set up outbound data if we have any */ 2651 reply->hdr.data_len = cpu_to_le32(pagelist->length); 2652 ceph_msg_data_add_pagelist(reply, pagelist); 2653 } 2654 ceph_con_send(&session->s_con, reply); 2655 2656 mutex_unlock(&session->s_mutex); 2657 2658 mutex_lock(&mdsc->mutex); 2659 __wake_requests(mdsc, &session->s_waiting); 2660 mutex_unlock(&mdsc->mutex); 2661 2662 up_read(&mdsc->snap_rwsem); 2663 return; 2664 2665 fail: 2666 ceph_msg_put(reply); 2667 up_read(&mdsc->snap_rwsem); 2668 mutex_unlock(&session->s_mutex); 2669 fail_nomsg: 2670 ceph_pagelist_release(pagelist); 2671 kfree(pagelist); 2672 fail_nopagelist: 2673 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 2674 return; 2675 } 2676 2677 2678 /* 2679 * compare old and new mdsmaps, kicking requests 2680 * and closing out old connections as necessary 2681 * 2682 * called under mdsc->mutex. 
2683 */ 2684 static void check_new_map(struct ceph_mds_client *mdsc, 2685 struct ceph_mdsmap *newmap, 2686 struct ceph_mdsmap *oldmap) 2687 { 2688 int i; 2689 int oldstate, newstate; 2690 struct ceph_mds_session *s; 2691 2692 dout("check_new_map new %u old %u\n", 2693 newmap->m_epoch, oldmap->m_epoch); 2694 2695 for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) { 2696 if (mdsc->sessions[i] == NULL) 2697 continue; 2698 s = mdsc->sessions[i]; 2699 oldstate = ceph_mdsmap_get_state(oldmap, i); 2700 newstate = ceph_mdsmap_get_state(newmap, i); 2701 2702 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 2703 i, ceph_mds_state_name(oldstate), 2704 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 2705 ceph_mds_state_name(newstate), 2706 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 2707 session_state_name(s->s_state)); 2708 2709 if (i >= newmap->m_max_mds || 2710 memcmp(ceph_mdsmap_get_addr(oldmap, i), 2711 ceph_mdsmap_get_addr(newmap, i), 2712 sizeof(struct ceph_entity_addr))) { 2713 if (s->s_state == CEPH_MDS_SESSION_OPENING) { 2714 /* the session never opened, just close it 2715 * out now */ 2716 __wake_requests(mdsc, &s->s_waiting); 2717 __unregister_session(mdsc, s); 2718 } else { 2719 /* just close it */ 2720 mutex_unlock(&mdsc->mutex); 2721 mutex_lock(&s->s_mutex); 2722 mutex_lock(&mdsc->mutex); 2723 ceph_con_close(&s->s_con); 2724 mutex_unlock(&s->s_mutex); 2725 s->s_state = CEPH_MDS_SESSION_RESTARTING; 2726 } 2727 2728 /* kick any requests waiting on the recovering mds */ 2729 kick_requests(mdsc, i); 2730 } else if (oldstate == newstate) { 2731 continue; /* nothing new with this mds */ 2732 } 2733 2734 /* 2735 * send reconnect? 2736 */ 2737 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 2738 newstate >= CEPH_MDS_STATE_RECONNECT) { 2739 mutex_unlock(&mdsc->mutex); 2740 send_mds_reconnect(mdsc, s); 2741 mutex_lock(&mdsc->mutex); 2742 } 2743 2744 /* 2745 * kick request on any mds that has gone active. 
2746 */ 2747 if (oldstate < CEPH_MDS_STATE_ACTIVE && 2748 newstate >= CEPH_MDS_STATE_ACTIVE) { 2749 if (oldstate != CEPH_MDS_STATE_CREATING && 2750 oldstate != CEPH_MDS_STATE_STARTING) 2751 pr_info("mds%d recovery completed\n", s->s_mds); 2752 kick_requests(mdsc, i); 2753 ceph_kick_flushing_caps(mdsc, s); 2754 wake_up_session_caps(s, 1); 2755 } 2756 } 2757 2758 for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) { 2759 s = mdsc->sessions[i]; 2760 if (!s) 2761 continue; 2762 if (!ceph_mdsmap_is_laggy(newmap, i)) 2763 continue; 2764 if (s->s_state == CEPH_MDS_SESSION_OPEN || 2765 s->s_state == CEPH_MDS_SESSION_HUNG || 2766 s->s_state == CEPH_MDS_SESSION_CLOSING) { 2767 dout(" connecting to export targets of laggy mds%d\n", 2768 i); 2769 __open_export_target_sessions(mdsc, s); 2770 } 2771 } 2772 } 2773 2774 2775 2776 /* 2777 * leases 2778 */ 2779 2780 /* 2781 * caller must hold session s_mutex, dentry->d_lock 2782 */ 2783 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 2784 { 2785 struct ceph_dentry_info *di = ceph_dentry(dentry); 2786 2787 ceph_put_mds_session(di->lease_session); 2788 di->lease_session = NULL; 2789 } 2790 2791 static void handle_lease(struct ceph_mds_client *mdsc, 2792 struct ceph_mds_session *session, 2793 struct ceph_msg *msg) 2794 { 2795 struct super_block *sb = mdsc->fsc->sb; 2796 struct inode *inode; 2797 struct dentry *parent, *dentry; 2798 struct ceph_dentry_info *di; 2799 int mds = session->s_mds; 2800 struct ceph_mds_lease *h = msg->front.iov_base; 2801 u32 seq; 2802 struct ceph_vino vino; 2803 struct qstr dname; 2804 int release = 0; 2805 2806 dout("handle_lease from mds%d\n", mds); 2807 2808 /* decode */ 2809 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 2810 goto bad; 2811 vino.ino = le64_to_cpu(h->ino); 2812 vino.snap = CEPH_NOSNAP; 2813 seq = le32_to_cpu(h->seq); 2814 dname.name = (void *)h + sizeof(*h) + sizeof(u32); 2815 dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32); 2816 if (dname.len != 
get_unaligned_le32(h+1)) 2817 goto bad; 2818 2819 mutex_lock(&session->s_mutex); 2820 session->s_seq++; 2821 2822 /* lookup inode */ 2823 inode = ceph_find_inode(sb, vino); 2824 dout("handle_lease %s, ino %llx %p %.*s\n", 2825 ceph_lease_op_name(h->action), vino.ino, inode, 2826 dname.len, dname.name); 2827 if (inode == NULL) { 2828 dout("handle_lease no inode %llx\n", vino.ino); 2829 goto release; 2830 } 2831 2832 /* dentry */ 2833 parent = d_find_alias(inode); 2834 if (!parent) { 2835 dout("no parent dentry on inode %p\n", inode); 2836 WARN_ON(1); 2837 goto release; /* hrm... */ 2838 } 2839 dname.hash = full_name_hash(dname.name, dname.len); 2840 dentry = d_lookup(parent, &dname); 2841 dput(parent); 2842 if (!dentry) 2843 goto release; 2844 2845 spin_lock(&dentry->d_lock); 2846 di = ceph_dentry(dentry); 2847 switch (h->action) { 2848 case CEPH_MDS_LEASE_REVOKE: 2849 if (di->lease_session == session) { 2850 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 2851 h->seq = cpu_to_le32(di->lease_seq); 2852 __ceph_mdsc_drop_dentry_lease(dentry); 2853 } 2854 release = 1; 2855 break; 2856 2857 case CEPH_MDS_LEASE_RENEW: 2858 if (di->lease_session == session && 2859 di->lease_gen == session->s_cap_gen && 2860 di->lease_renew_from && 2861 di->lease_renew_after == 0) { 2862 unsigned long duration = 2863 le32_to_cpu(h->duration_ms) * HZ / 1000; 2864 2865 di->lease_seq = seq; 2866 dentry->d_time = di->lease_renew_from + duration; 2867 di->lease_renew_after = di->lease_renew_from + 2868 (duration >> 1); 2869 di->lease_renew_from = 0; 2870 } 2871 break; 2872 } 2873 spin_unlock(&dentry->d_lock); 2874 dput(dentry); 2875 2876 if (!release) 2877 goto out; 2878 2879 release: 2880 /* let's just reuse the same message */ 2881 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 2882 ceph_msg_get(msg); 2883 ceph_con_send(&session->s_con, msg); 2884 2885 out: 2886 iput(inode); 2887 mutex_unlock(&session->s_mutex); 2888 return; 2889 2890 bad: 2891 pr_err("corrupt lease message\n"); 2892 ceph_msg_dump(msg); 
2893 } 2894 2895 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 2896 struct inode *inode, 2897 struct dentry *dentry, char action, 2898 u32 seq) 2899 { 2900 struct ceph_msg *msg; 2901 struct ceph_mds_lease *lease; 2902 int len = sizeof(*lease) + sizeof(u32); 2903 int dnamelen = 0; 2904 2905 dout("lease_send_msg inode %p dentry %p %s to mds%d\n", 2906 inode, dentry, ceph_lease_op_name(action), session->s_mds); 2907 dnamelen = dentry->d_name.len; 2908 len += dnamelen; 2909 2910 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 2911 if (!msg) 2912 return; 2913 lease = msg->front.iov_base; 2914 lease->action = action; 2915 lease->ino = cpu_to_le64(ceph_vino(inode).ino); 2916 lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap); 2917 lease->seq = cpu_to_le32(seq); 2918 put_unaligned_le32(dnamelen, lease + 1); 2919 memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen); 2920 2921 /* 2922 * if this is a preemptive lease RELEASE, no need to 2923 * flush request stream, since the actual request will 2924 * soon follow. 2925 */ 2926 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE); 2927 2928 ceph_con_send(&session->s_con, msg); 2929 } 2930 2931 /* 2932 * Preemptively release a lease we expect to invalidate anyway. 2933 * Pass @inode always, @dentry is optional. 2934 */ 2935 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, 2936 struct dentry *dentry) 2937 { 2938 struct ceph_dentry_info *di; 2939 struct ceph_mds_session *session; 2940 u32 seq; 2941 2942 BUG_ON(inode == NULL); 2943 BUG_ON(dentry == NULL); 2944 2945 /* is dentry lease valid? 
*/ 2946 spin_lock(&dentry->d_lock); 2947 di = ceph_dentry(dentry); 2948 if (!di || !di->lease_session || 2949 di->lease_session->s_mds < 0 || 2950 di->lease_gen != di->lease_session->s_cap_gen || 2951 !time_before(jiffies, dentry->d_time)) { 2952 dout("lease_release inode %p dentry %p -- " 2953 "no lease\n", 2954 inode, dentry); 2955 spin_unlock(&dentry->d_lock); 2956 return; 2957 } 2958 2959 /* we do have a lease on this dentry; note mds and seq */ 2960 session = ceph_get_mds_session(di->lease_session); 2961 seq = di->lease_seq; 2962 __ceph_mdsc_drop_dentry_lease(dentry); 2963 spin_unlock(&dentry->d_lock); 2964 2965 dout("lease_release inode %p dentry %p to mds%d\n", 2966 inode, dentry, session->s_mds); 2967 ceph_mdsc_lease_send_msg(session, inode, dentry, 2968 CEPH_MDS_LEASE_RELEASE, seq); 2969 ceph_put_mds_session(session); 2970 } 2971 2972 /* 2973 * drop all leases (and dentry refs) in preparation for umount 2974 */ 2975 static void drop_leases(struct ceph_mds_client *mdsc) 2976 { 2977 int i; 2978 2979 dout("drop_leases\n"); 2980 mutex_lock(&mdsc->mutex); 2981 for (i = 0; i < mdsc->max_sessions; i++) { 2982 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 2983 if (!s) 2984 continue; 2985 mutex_unlock(&mdsc->mutex); 2986 mutex_lock(&s->s_mutex); 2987 mutex_unlock(&s->s_mutex); 2988 ceph_put_mds_session(s); 2989 mutex_lock(&mdsc->mutex); 2990 } 2991 mutex_unlock(&mdsc->mutex); 2992 } 2993 2994 2995 2996 /* 2997 * delayed work -- periodically trim expired leases, renew caps with mds 2998 */ 2999 static void schedule_delayed(struct ceph_mds_client *mdsc) 3000 { 3001 int delay = 5; 3002 unsigned hz = round_jiffies_relative(HZ * delay); 3003 schedule_delayed_work(&mdsc->delayed_work, hz); 3004 } 3005 3006 static void delayed_work(struct work_struct *work) 3007 { 3008 int i; 3009 struct ceph_mds_client *mdsc = 3010 container_of(work, struct ceph_mds_client, delayed_work.work); 3011 int renew_interval; 3012 int renew_caps; 3013 3014 dout("mdsc 
delayed_work\n"); 3015 ceph_check_delayed_caps(mdsc); 3016 3017 mutex_lock(&mdsc->mutex); 3018 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 3019 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 3020 mdsc->last_renew_caps); 3021 if (renew_caps) 3022 mdsc->last_renew_caps = jiffies; 3023 3024 for (i = 0; i < mdsc->max_sessions; i++) { 3025 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 3026 if (s == NULL) 3027 continue; 3028 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 3029 dout("resending session close request for mds%d\n", 3030 s->s_mds); 3031 request_close_session(mdsc, s); 3032 ceph_put_mds_session(s); 3033 continue; 3034 } 3035 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 3036 if (s->s_state == CEPH_MDS_SESSION_OPEN) { 3037 s->s_state = CEPH_MDS_SESSION_HUNG; 3038 pr_info("mds%d hung\n", s->s_mds); 3039 } 3040 } 3041 if (s->s_state < CEPH_MDS_SESSION_OPEN) { 3042 /* this mds is failed or recovering, just wait */ 3043 ceph_put_mds_session(s); 3044 continue; 3045 } 3046 mutex_unlock(&mdsc->mutex); 3047 3048 mutex_lock(&s->s_mutex); 3049 if (renew_caps) 3050 send_renew_caps(mdsc, s); 3051 else 3052 ceph_con_keepalive(&s->s_con); 3053 ceph_add_cap_releases(mdsc, s); 3054 if (s->s_state == CEPH_MDS_SESSION_OPEN || 3055 s->s_state == CEPH_MDS_SESSION_HUNG) 3056 ceph_send_cap_releases(mdsc, s); 3057 mutex_unlock(&s->s_mutex); 3058 ceph_put_mds_session(s); 3059 3060 mutex_lock(&mdsc->mutex); 3061 } 3062 mutex_unlock(&mdsc->mutex); 3063 3064 schedule_delayed(mdsc); 3065 } 3066 3067 int ceph_mdsc_init(struct ceph_fs_client *fsc) 3068 3069 { 3070 struct ceph_mds_client *mdsc; 3071 3072 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 3073 if (!mdsc) 3074 return -ENOMEM; 3075 mdsc->fsc = fsc; 3076 fsc->mdsc = mdsc; 3077 mutex_init(&mdsc->mutex); 3078 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 3079 if (mdsc->mdsmap == NULL) { 3080 kfree(mdsc); 3081 return -ENOMEM; 3082 } 3083 3084 
init_completion(&mdsc->safe_umount_waiters); 3085 init_waitqueue_head(&mdsc->session_close_wq); 3086 INIT_LIST_HEAD(&mdsc->waiting_for_map); 3087 mdsc->sessions = NULL; 3088 mdsc->max_sessions = 0; 3089 mdsc->stopping = 0; 3090 init_rwsem(&mdsc->snap_rwsem); 3091 mdsc->snap_realms = RB_ROOT; 3092 INIT_LIST_HEAD(&mdsc->snap_empty); 3093 spin_lock_init(&mdsc->snap_empty_lock); 3094 mdsc->last_tid = 0; 3095 mdsc->request_tree = RB_ROOT; 3096 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 3097 mdsc->last_renew_caps = jiffies; 3098 INIT_LIST_HEAD(&mdsc->cap_delay_list); 3099 spin_lock_init(&mdsc->cap_delay_lock); 3100 INIT_LIST_HEAD(&mdsc->snap_flush_list); 3101 spin_lock_init(&mdsc->snap_flush_lock); 3102 mdsc->cap_flush_seq = 0; 3103 INIT_LIST_HEAD(&mdsc->cap_dirty); 3104 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 3105 mdsc->num_cap_flushing = 0; 3106 spin_lock_init(&mdsc->cap_dirty_lock); 3107 init_waitqueue_head(&mdsc->cap_flushing_wq); 3108 spin_lock_init(&mdsc->dentry_lru_lock); 3109 INIT_LIST_HEAD(&mdsc->dentry_lru); 3110 3111 ceph_caps_init(mdsc); 3112 ceph_adjust_min_caps(mdsc, fsc->min_caps); 3113 3114 return 0; 3115 } 3116 3117 /* 3118 * Wait for safe replies on open mds requests. If we time out, drop 3119 * all requests from the tree to avoid dangling dentry refs. 
3120 */ 3121 static void wait_requests(struct ceph_mds_client *mdsc) 3122 { 3123 struct ceph_mds_request *req; 3124 struct ceph_fs_client *fsc = mdsc->fsc; 3125 3126 mutex_lock(&mdsc->mutex); 3127 if (__get_oldest_req(mdsc)) { 3128 mutex_unlock(&mdsc->mutex); 3129 3130 dout("wait_requests waiting for requests\n"); 3131 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 3132 fsc->client->options->mount_timeout * HZ); 3133 3134 /* tear down remaining requests */ 3135 mutex_lock(&mdsc->mutex); 3136 while ((req = __get_oldest_req(mdsc))) { 3137 dout("wait_requests timed out on tid %llu\n", 3138 req->r_tid); 3139 __unregister_request(mdsc, req); 3140 } 3141 } 3142 mutex_unlock(&mdsc->mutex); 3143 dout("wait_requests done\n"); 3144 } 3145 3146 /* 3147 * called before mount is ro, and before dentries are torn down. 3148 * (hmm, does this still race with new lookups?) 3149 */ 3150 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 3151 { 3152 dout("pre_umount\n"); 3153 mdsc->stopping = 1; 3154 3155 drop_leases(mdsc); 3156 ceph_flush_dirty_caps(mdsc); 3157 wait_requests(mdsc); 3158 3159 /* 3160 * wait for reply handlers to drop their request refs and 3161 * their inode/dcache refs 3162 */ 3163 ceph_msgr_flush(); 3164 } 3165 3166 /* 3167 * wait for all write mds requests to flush. 
3168 */ 3169 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) 3170 { 3171 struct ceph_mds_request *req = NULL, *nextreq; 3172 struct rb_node *n; 3173 3174 mutex_lock(&mdsc->mutex); 3175 dout("wait_unsafe_requests want %lld\n", want_tid); 3176 restart: 3177 req = __get_oldest_req(mdsc); 3178 while (req && req->r_tid <= want_tid) { 3179 /* find next request */ 3180 n = rb_next(&req->r_node); 3181 if (n) 3182 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 3183 else 3184 nextreq = NULL; 3185 if ((req->r_op & CEPH_MDS_OP_WRITE)) { 3186 /* write op */ 3187 ceph_mdsc_get_request(req); 3188 if (nextreq) 3189 ceph_mdsc_get_request(nextreq); 3190 mutex_unlock(&mdsc->mutex); 3191 dout("wait_unsafe_requests wait on %llu (want %llu)\n", 3192 req->r_tid, want_tid); 3193 wait_for_completion(&req->r_safe_completion); 3194 mutex_lock(&mdsc->mutex); 3195 ceph_mdsc_put_request(req); 3196 if (!nextreq) 3197 break; /* next dne before, so we're done! */ 3198 if (RB_EMPTY_NODE(&nextreq->r_node)) { 3199 /* next request was removed from tree */ 3200 ceph_mdsc_put_request(nextreq); 3201 goto restart; 3202 } 3203 ceph_mdsc_put_request(nextreq); /* won't go away */ 3204 } 3205 req = nextreq; 3206 } 3207 mutex_unlock(&mdsc->mutex); 3208 dout("wait_unsafe_requests done\n"); 3209 } 3210 3211 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 3212 { 3213 u64 want_tid, want_flush; 3214 3215 if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN) 3216 return; 3217 3218 dout("sync\n"); 3219 mutex_lock(&mdsc->mutex); 3220 want_tid = mdsc->last_tid; 3221 want_flush = mdsc->cap_flush_seq; 3222 mutex_unlock(&mdsc->mutex); 3223 dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush); 3224 3225 ceph_flush_dirty_caps(mdsc); 3226 3227 wait_unsafe_requests(mdsc, want_tid); 3228 wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush)); 3229 } 3230 3231 /* 3232 * true if all sessions are closed, or we force unmount 3233 */ 3234 static bool 
done_closing_sessions(struct ceph_mds_client *mdsc) 3235 { 3236 int i, n = 0; 3237 3238 if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN) 3239 return true; 3240 3241 mutex_lock(&mdsc->mutex); 3242 for (i = 0; i < mdsc->max_sessions; i++) 3243 if (mdsc->sessions[i]) 3244 n++; 3245 mutex_unlock(&mdsc->mutex); 3246 return n == 0; 3247 } 3248 3249 /* 3250 * called after sb is ro. 3251 */ 3252 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 3253 { 3254 struct ceph_mds_session *session; 3255 int i; 3256 struct ceph_fs_client *fsc = mdsc->fsc; 3257 unsigned long timeout = fsc->client->options->mount_timeout * HZ; 3258 3259 dout("close_sessions\n"); 3260 3261 /* close sessions */ 3262 mutex_lock(&mdsc->mutex); 3263 for (i = 0; i < mdsc->max_sessions; i++) { 3264 session = __ceph_lookup_mds_session(mdsc, i); 3265 if (!session) 3266 continue; 3267 mutex_unlock(&mdsc->mutex); 3268 mutex_lock(&session->s_mutex); 3269 __close_session(mdsc, session); 3270 mutex_unlock(&session->s_mutex); 3271 ceph_put_mds_session(session); 3272 mutex_lock(&mdsc->mutex); 3273 } 3274 mutex_unlock(&mdsc->mutex); 3275 3276 dout("waiting for sessions to close\n"); 3277 wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc), 3278 timeout); 3279 3280 /* tear down remaining sessions */ 3281 mutex_lock(&mdsc->mutex); 3282 for (i = 0; i < mdsc->max_sessions; i++) { 3283 if (mdsc->sessions[i]) { 3284 session = get_session(mdsc->sessions[i]); 3285 __unregister_session(mdsc, session); 3286 mutex_unlock(&mdsc->mutex); 3287 mutex_lock(&session->s_mutex); 3288 remove_session_caps(session); 3289 mutex_unlock(&session->s_mutex); 3290 ceph_put_mds_session(session); 3291 mutex_lock(&mdsc->mutex); 3292 } 3293 } 3294 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 3295 mutex_unlock(&mdsc->mutex); 3296 3297 ceph_cleanup_empty_realms(mdsc); 3298 3299 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 3300 3301 dout("stopped\n"); 3302 } 3303 3304 static void ceph_mdsc_stop(struct 
ceph_mds_client *mdsc) 3305 { 3306 dout("stop\n"); 3307 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 3308 if (mdsc->mdsmap) 3309 ceph_mdsmap_destroy(mdsc->mdsmap); 3310 kfree(mdsc->sessions); 3311 ceph_caps_finalize(mdsc); 3312 } 3313 3314 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 3315 { 3316 struct ceph_mds_client *mdsc = fsc->mdsc; 3317 3318 dout("mdsc_destroy %p\n", mdsc); 3319 ceph_mdsc_stop(mdsc); 3320 3321 /* flush out any connection work with references to us */ 3322 ceph_msgr_flush(); 3323 3324 fsc->mdsc = NULL; 3325 kfree(mdsc); 3326 dout("mdsc_destroy %p done\n", mdsc); 3327 } 3328 3329 3330 /* 3331 * handle mds map update. 3332 */ 3333 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 3334 { 3335 u32 epoch; 3336 u32 maplen; 3337 void *p = msg->front.iov_base; 3338 void *end = p + msg->front.iov_len; 3339 struct ceph_mdsmap *newmap, *oldmap; 3340 struct ceph_fsid fsid; 3341 int err = -EINVAL; 3342 3343 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 3344 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 3345 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 3346 return; 3347 epoch = ceph_decode_32(&p); 3348 maplen = ceph_decode_32(&p); 3349 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 3350 3351 /* do we need it? 
*/ 3352 ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch); 3353 mutex_lock(&mdsc->mutex); 3354 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 3355 dout("handle_map epoch %u <= our %u\n", 3356 epoch, mdsc->mdsmap->m_epoch); 3357 mutex_unlock(&mdsc->mutex); 3358 return; 3359 } 3360 3361 newmap = ceph_mdsmap_decode(&p, end); 3362 if (IS_ERR(newmap)) { 3363 err = PTR_ERR(newmap); 3364 goto bad_unlock; 3365 } 3366 3367 /* swap into place */ 3368 if (mdsc->mdsmap) { 3369 oldmap = mdsc->mdsmap; 3370 mdsc->mdsmap = newmap; 3371 check_new_map(mdsc, newmap, oldmap); 3372 ceph_mdsmap_destroy(oldmap); 3373 } else { 3374 mdsc->mdsmap = newmap; /* first mds map */ 3375 } 3376 mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size; 3377 3378 __wake_requests(mdsc, &mdsc->waiting_for_map); 3379 3380 mutex_unlock(&mdsc->mutex); 3381 schedule_delayed(mdsc); 3382 return; 3383 3384 bad_unlock: 3385 mutex_unlock(&mdsc->mutex); 3386 bad: 3387 pr_err("error decoding mdsmap %d\n", err); 3388 return; 3389 } 3390 3391 static struct ceph_connection *con_get(struct ceph_connection *con) 3392 { 3393 struct ceph_mds_session *s = con->private; 3394 3395 if (get_session(s)) { 3396 dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref)); 3397 return con; 3398 } 3399 dout("mdsc con_get %p FAIL\n", s); 3400 return NULL; 3401 } 3402 3403 static void con_put(struct ceph_connection *con) 3404 { 3405 struct ceph_mds_session *s = con->private; 3406 3407 dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1); 3408 ceph_put_mds_session(s); 3409 } 3410 3411 /* 3412 * if the client is unresponsive for long enough, the mds will kill 3413 * the session entirely. 
3414 */ 3415 static void peer_reset(struct ceph_connection *con) 3416 { 3417 struct ceph_mds_session *s = con->private; 3418 struct ceph_mds_client *mdsc = s->s_mdsc; 3419 3420 pr_warning("mds%d closed our session\n", s->s_mds); 3421 send_mds_reconnect(mdsc, s); 3422 } 3423 3424 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 3425 { 3426 struct ceph_mds_session *s = con->private; 3427 struct ceph_mds_client *mdsc = s->s_mdsc; 3428 int type = le16_to_cpu(msg->hdr.type); 3429 3430 mutex_lock(&mdsc->mutex); 3431 if (__verify_registered_session(mdsc, s) < 0) { 3432 mutex_unlock(&mdsc->mutex); 3433 goto out; 3434 } 3435 mutex_unlock(&mdsc->mutex); 3436 3437 switch (type) { 3438 case CEPH_MSG_MDS_MAP: 3439 ceph_mdsc_handle_map(mdsc, msg); 3440 break; 3441 case CEPH_MSG_CLIENT_SESSION: 3442 handle_session(s, msg); 3443 break; 3444 case CEPH_MSG_CLIENT_REPLY: 3445 handle_reply(s, msg); 3446 break; 3447 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 3448 handle_forward(mdsc, s, msg); 3449 break; 3450 case CEPH_MSG_CLIENT_CAPS: 3451 ceph_handle_caps(s, msg); 3452 break; 3453 case CEPH_MSG_CLIENT_SNAP: 3454 ceph_handle_snap(mdsc, s, msg); 3455 break; 3456 case CEPH_MSG_CLIENT_LEASE: 3457 handle_lease(mdsc, s, msg); 3458 break; 3459 3460 default: 3461 pr_err("received unknown message type %d %s\n", type, 3462 ceph_msg_type_name(type)); 3463 } 3464 out: 3465 ceph_msg_put(msg); 3466 } 3467 3468 /* 3469 * authentication 3470 */ 3471 3472 /* 3473 * Note: returned pointer is the address of a structure that's 3474 * managed separately. Caller must *not* attempt to free it. 
3475 */ 3476 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 3477 int *proto, int force_new) 3478 { 3479 struct ceph_mds_session *s = con->private; 3480 struct ceph_mds_client *mdsc = s->s_mdsc; 3481 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3482 struct ceph_auth_handshake *auth = &s->s_auth; 3483 3484 if (force_new && auth->authorizer) { 3485 ceph_auth_destroy_authorizer(ac, auth->authorizer); 3486 auth->authorizer = NULL; 3487 } 3488 if (!auth->authorizer) { 3489 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 3490 auth); 3491 if (ret) 3492 return ERR_PTR(ret); 3493 } else { 3494 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 3495 auth); 3496 if (ret) 3497 return ERR_PTR(ret); 3498 } 3499 *proto = ac->protocol; 3500 3501 return auth; 3502 } 3503 3504 3505 static int verify_authorizer_reply(struct ceph_connection *con, int len) 3506 { 3507 struct ceph_mds_session *s = con->private; 3508 struct ceph_mds_client *mdsc = s->s_mdsc; 3509 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3510 3511 return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer, len); 3512 } 3513 3514 static int invalidate_authorizer(struct ceph_connection *con) 3515 { 3516 struct ceph_mds_session *s = con->private; 3517 struct ceph_mds_client *mdsc = s->s_mdsc; 3518 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3519 3520 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 3521 3522 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 3523 } 3524 3525 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, 3526 struct ceph_msg_header *hdr, int *skip) 3527 { 3528 struct ceph_msg *msg; 3529 int type = (int) le16_to_cpu(hdr->type); 3530 int front_len = (int) le32_to_cpu(hdr->front_len); 3531 3532 if (con->in_msg) 3533 return con->in_msg; 3534 3535 *skip = 0; 3536 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 3537 if (!msg) { 3538 pr_err("unable to 
allocate msg type %d len %d\n", 3539 type, front_len); 3540 return NULL; 3541 } 3542 3543 return msg; 3544 } 3545 3546 static const struct ceph_connection_operations mds_con_ops = { 3547 .get = con_get, 3548 .put = con_put, 3549 .dispatch = dispatch, 3550 .get_authorizer = get_authorizer, 3551 .verify_authorizer_reply = verify_authorizer_reply, 3552 .invalidate_authorizer = invalidate_authorizer, 3553 .peer_reset = peer_reset, 3554 .alloc_msg = mds_alloc_msg, 3555 }; 3556 3557 /* eof */ 3558