#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
	struct ceph_pagelist *pagelist;
	bool flock;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       int features)
{
	int err = -EIO;

	info->in = *p;
	*p += sizeof(struct ceph_mds_reply_inode) +
		sizeof(*info->in->fragtree.splits) *
		le32_to_cpu(info->in->fragtree.nsplits);

	ceph_decode_32_safe(p, end, info->symlink_len, bad);
	ceph_decode_need(p, end, info->symlink_len, bad);
	info->symlink = *p;
	*p += info->symlink_len;

	if (features & CEPH_FEATURE_DIRLAYOUTHASH)
		ceph_decode_copy_safe(p, end, &info->dir_layout,
				      sizeof(info->dir_layout), bad);
	else
		memset(&info->dir_layout, 0, sizeof(info->dir_layout));

	ceph_decode_32_safe(p, end, info->xattr_len, bad);
	ceph_decode_need(p, end, info->xattr_len, bad);
	info->xattr_data = *p;
	*p += info->xattr_len;
	return 0;
bad:
	return err;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
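 *
 * A rough sketch of the trace layout this expects, inferred from the
 * decoding below (illustrative only, not an authoritative wire spec):
 *
 *	if (head->is_dentry):  diri inode, dirfrag, dname, dlease
 *	if (head->is_target):  targeti inode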
96 */ 97 static int parse_reply_info_trace(void **p, void *end, 98 struct ceph_mds_reply_info_parsed *info, 99 int features) 100 { 101 int err; 102 103 if (info->head->is_dentry) { 104 err = parse_reply_info_in(p, end, &info->diri, features); 105 if (err < 0) 106 goto out_bad; 107 108 if (unlikely(*p + sizeof(*info->dirfrag) > end)) 109 goto bad; 110 info->dirfrag = *p; 111 *p += sizeof(*info->dirfrag) + 112 sizeof(u32)*le32_to_cpu(info->dirfrag->ndist); 113 if (unlikely(*p > end)) 114 goto bad; 115 116 ceph_decode_32_safe(p, end, info->dname_len, bad); 117 ceph_decode_need(p, end, info->dname_len, bad); 118 info->dname = *p; 119 *p += info->dname_len; 120 info->dlease = *p; 121 *p += sizeof(*info->dlease); 122 } 123 124 if (info->head->is_target) { 125 err = parse_reply_info_in(p, end, &info->targeti, features); 126 if (err < 0) 127 goto out_bad; 128 } 129 130 if (unlikely(*p != end)) 131 goto bad; 132 return 0; 133 134 bad: 135 err = -EIO; 136 out_bad: 137 pr_err("problem parsing mds trace %d\n", err); 138 return err; 139 } 140 141 /* 142 * parse readdir results 143 */ 144 static int parse_reply_info_dir(void **p, void *end, 145 struct ceph_mds_reply_info_parsed *info, 146 int features) 147 { 148 u32 num, i = 0; 149 int err; 150 151 info->dir_dir = *p; 152 if (*p + sizeof(*info->dir_dir) > end) 153 goto bad; 154 *p += sizeof(*info->dir_dir) + 155 sizeof(u32)*le32_to_cpu(info->dir_dir->ndist); 156 if (*p > end) 157 goto bad; 158 159 ceph_decode_need(p, end, sizeof(num) + 2, bad); 160 num = ceph_decode_32(p); 161 info->dir_end = ceph_decode_8(p); 162 info->dir_complete = ceph_decode_8(p); 163 if (num == 0) 164 goto done; 165 166 /* alloc large array */ 167 info->dir_nr = num; 168 info->dir_in = kcalloc(num, sizeof(*info->dir_in) + 169 sizeof(*info->dir_dname) + 170 sizeof(*info->dir_dname_len) + 171 sizeof(*info->dir_dlease), 172 GFP_NOFS); 173 if (info->dir_in == NULL) { 174 err = -ENOMEM; 175 goto out_bad; 176 } 177 info->dir_dname = (void *)(info->dir_in + num); 178 info->dir_dname_len = (void *)(info->dir_dname + num); 179 info->dir_dlease = (void *)(info->dir_dname_len + num); 180 181 while (num) { 182 /* dentry */ 183 ceph_decode_need(p, end, sizeof(u32)*2, bad); 184 info->dir_dname_len[i] = ceph_decode_32(p); 185 ceph_decode_need(p, end, info->dir_dname_len[i], bad); 186 info->dir_dname[i] = *p; 187 *p += info->dir_dname_len[i]; 188 dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i], 189 info->dir_dname[i]); 190 info->dir_dlease[i] = *p; 191 *p += sizeof(struct ceph_mds_reply_lease); 192 193 /* inode */ 194 err = parse_reply_info_in(p, end, &info->dir_in[i], features); 195 if (err < 0) 196 goto out_bad; 197 i++; 198 num--; 199 } 200 201 done: 202 if (*p != end) 203 goto bad; 204 return 0; 205 206 bad: 207 err = -EIO; 208 out_bad: 209 pr_err("problem parsing dir contents %d\n", err); 210 return err; 211 } 212 213 /* 214 * parse fcntl F_GETLK results 215 */ 216 static int parse_reply_info_filelock(void **p, void *end, 217 struct ceph_mds_reply_info_parsed *info, 218 int features) 219 { 220 if (*p + sizeof(*info->filelock_reply) > end) 221 goto bad; 222 223 info->filelock_reply = *p; 224 *p += sizeof(*info->filelock_reply); 225 226 if (unlikely(*p != end)) 227 goto bad; 228 return 0; 229 230 bad: 231 return -EIO; 232 } 233 234 /* 235 * parse extra results 236 */ 237 static int parse_reply_info_extra(void **p, void *end, 238 struct ceph_mds_reply_info_parsed *info, 239 int features) 240 { 241 if (info->head->op == CEPH_MDS_OP_GETFILELOCK) 242 return parse_reply_info_filelock(p, 
end, info, features); 243 else 244 return parse_reply_info_dir(p, end, info, features); 245 } 246 247 /* 248 * parse entire mds reply 249 */ 250 static int parse_reply_info(struct ceph_msg *msg, 251 struct ceph_mds_reply_info_parsed *info, 252 int features) 253 { 254 void *p, *end; 255 u32 len; 256 int err; 257 258 info->head = msg->front.iov_base; 259 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 260 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 261 262 /* trace */ 263 ceph_decode_32_safe(&p, end, len, bad); 264 if (len > 0) { 265 err = parse_reply_info_trace(&p, p+len, info, features); 266 if (err < 0) 267 goto out_bad; 268 } 269 270 /* extra */ 271 ceph_decode_32_safe(&p, end, len, bad); 272 if (len > 0) { 273 err = parse_reply_info_extra(&p, p+len, info, features); 274 if (err < 0) 275 goto out_bad; 276 } 277 278 /* snap blob */ 279 ceph_decode_32_safe(&p, end, len, bad); 280 info->snapblob_len = len; 281 info->snapblob = p; 282 p += len; 283 284 if (p != end) 285 goto bad; 286 return 0; 287 288 bad: 289 err = -EIO; 290 out_bad: 291 pr_err("mds parse_reply err %d\n", err); 292 return err; 293 } 294 295 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 296 { 297 kfree(info->dir_in); 298 } 299 300 301 /* 302 * sessions 303 */ 304 static const char *session_state_name(int s) 305 { 306 switch (s) { 307 case CEPH_MDS_SESSION_NEW: return "new"; 308 case CEPH_MDS_SESSION_OPENING: return "opening"; 309 case CEPH_MDS_SESSION_OPEN: return "open"; 310 case CEPH_MDS_SESSION_HUNG: return "hung"; 311 case CEPH_MDS_SESSION_CLOSING: return "closing"; 312 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 313 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 314 default: return "???"; 315 } 316 } 317 318 static struct ceph_mds_session *get_session(struct ceph_mds_session *s) 319 { 320 if (atomic_inc_not_zero(&s->s_ref)) { 321 dout("mdsc get_session %p %d -> %d\n", s, 322 atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref)); 323 return s; 324 } else { 325 dout("mdsc get_session %p 0 -- FAIL", s); 326 return NULL; 327 } 328 } 329 330 void ceph_put_mds_session(struct ceph_mds_session *s) 331 { 332 dout("mdsc put_session %p %d -> %d\n", s, 333 atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1); 334 if (atomic_dec_and_test(&s->s_ref)) { 335 if (s->s_authorizer) 336 s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer( 337 s->s_mdsc->fsc->client->monc.auth, 338 s->s_authorizer); 339 kfree(s); 340 } 341 } 342 343 /* 344 * called under mdsc->mutex 345 */ 346 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 347 int mds) 348 { 349 struct ceph_mds_session *session; 350 351 if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL) 352 return NULL; 353 session = mdsc->sessions[mds]; 354 dout("lookup_mds_session %p %d\n", session, 355 atomic_read(&session->s_ref)); 356 get_session(session); 357 return session; 358 } 359 360 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 361 { 362 if (mds >= mdsc->max_sessions) 363 return false; 364 return mdsc->sessions[mds]; 365 } 366 367 static int __verify_registered_session(struct ceph_mds_client *mdsc, 368 struct ceph_mds_session *s) 369 { 370 if (s->s_mds >= mdsc->max_sessions || 371 mdsc->sessions[s->s_mds] != s) 372 return -ENOENT; 373 return 0; 374 } 375 376 /* 377 * create+register a new session for given mds. 378 * called under mdsc->mutex. 
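 *
 * A typical lookup-or-register pattern (sketch only; __do_request()
 * later in this file does essentially this when no session exists yet):
 *
 *	session = __ceph_lookup_mds_session(mdsc, mds);
 *	if (!session) {
 *		session = register_session(mdsc, mds);
 *		if (IS_ERR(session))
 *			return PTR_ERR(session);
 *	}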
379 */ 380 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 381 int mds) 382 { 383 struct ceph_mds_session *s; 384 385 s = kzalloc(sizeof(*s), GFP_NOFS); 386 if (!s) 387 return ERR_PTR(-ENOMEM); 388 s->s_mdsc = mdsc; 389 s->s_mds = mds; 390 s->s_state = CEPH_MDS_SESSION_NEW; 391 s->s_ttl = 0; 392 s->s_seq = 0; 393 mutex_init(&s->s_mutex); 394 395 ceph_con_init(mdsc->fsc->client->msgr, &s->s_con); 396 s->s_con.private = s; 397 s->s_con.ops = &mds_con_ops; 398 s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS; 399 s->s_con.peer_name.num = cpu_to_le64(mds); 400 401 spin_lock_init(&s->s_cap_lock); 402 s->s_cap_gen = 0; 403 s->s_cap_ttl = 0; 404 s->s_renew_requested = 0; 405 s->s_renew_seq = 0; 406 INIT_LIST_HEAD(&s->s_caps); 407 s->s_nr_caps = 0; 408 s->s_trim_caps = 0; 409 atomic_set(&s->s_ref, 1); 410 INIT_LIST_HEAD(&s->s_waiting); 411 INIT_LIST_HEAD(&s->s_unsafe); 412 s->s_num_cap_releases = 0; 413 s->s_cap_iterator = NULL; 414 INIT_LIST_HEAD(&s->s_cap_releases); 415 INIT_LIST_HEAD(&s->s_cap_releases_done); 416 INIT_LIST_HEAD(&s->s_cap_flushing); 417 INIT_LIST_HEAD(&s->s_cap_snaps_flushing); 418 419 dout("register_session mds%d\n", mds); 420 if (mds >= mdsc->max_sessions) { 421 int newmax = 1 << get_count_order(mds+1); 422 struct ceph_mds_session **sa; 423 424 dout("register_session realloc to %d\n", newmax); 425 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); 426 if (sa == NULL) 427 goto fail_realloc; 428 if (mdsc->sessions) { 429 memcpy(sa, mdsc->sessions, 430 mdsc->max_sessions * sizeof(void *)); 431 kfree(mdsc->sessions); 432 } 433 mdsc->sessions = sa; 434 mdsc->max_sessions = newmax; 435 } 436 mdsc->sessions[mds] = s; 437 atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 438 439 ceph_con_open(&s->s_con, ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 440 441 return s; 442 443 fail_realloc: 444 kfree(s); 445 return ERR_PTR(-ENOMEM); 446 } 447 448 /* 449 * called under mdsc->mutex 450 */ 451 static void __unregister_session(struct ceph_mds_client *mdsc, 452 struct ceph_mds_session *s) 453 { 454 dout("__unregister_session mds%d %p\n", s->s_mds, s); 455 BUG_ON(mdsc->sessions[s->s_mds] != s); 456 mdsc->sessions[s->s_mds] = NULL; 457 ceph_con_close(&s->s_con); 458 ceph_put_mds_session(s); 459 } 460 461 /* 462 * drop session refs in request. 463 * 464 * should be last request ref, or hold mdsc->mutex 465 */ 466 static void put_request_session(struct ceph_mds_request *req) 467 { 468 if (req->r_session) { 469 ceph_put_mds_session(req->r_session); 470 req->r_session = NULL; 471 } 472 } 473 474 void ceph_mdsc_release_request(struct kref *kref) 475 { 476 struct ceph_mds_request *req = container_of(kref, 477 struct ceph_mds_request, 478 r_kref); 479 if (req->r_request) 480 ceph_msg_put(req->r_request); 481 if (req->r_reply) { 482 ceph_msg_put(req->r_reply); 483 destroy_reply_info(&req->r_reply_info); 484 } 485 if (req->r_inode) { 486 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 487 iput(req->r_inode); 488 } 489 if (req->r_locked_dir) 490 ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); 491 if (req->r_target_inode) 492 iput(req->r_target_inode); 493 if (req->r_dentry) 494 dput(req->r_dentry); 495 if (req->r_old_dentry) { 496 /* 497 * track (and drop pins for) r_old_dentry_dir 498 * separately, since r_old_dentry's d_parent may have 499 * changed between the dir mutex being dropped and 500 * this request being freed. 
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		dput(req->r_old_dentry);
		iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	kfree(req);
}

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
						 u64 tid)
{
	struct ceph_mds_request *req;
	struct rb_node *n = mdsc->request_tree.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_mds_request, r_node);
		if (tid < req->r_tid)
			n = n->rb_left;
		else if (tid > req->r_tid)
			n = n->rb_right;
		else {
			ceph_mdsc_get_request(req);
			return req;
		}
	}
	return NULL;
}

static void __insert_request(struct ceph_mds_client *mdsc,
			     struct ceph_mds_request *new)
{
	struct rb_node **p = &mdsc->request_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_mds_request *req = NULL;

	while (*p) {
		parent = *p;
		req = rb_entry(parent, struct ceph_mds_request, r_node);
		if (new->r_tid < req->r_tid)
			p = &(*p)->rb_left;
		else if (new->r_tid > req->r_tid)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->r_node, parent, p);
	rb_insert_color(&new->r_node, &mdsc->request_tree);
}

/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
			       struct ceph_mds_request *req,
			       struct inode *dir)
{
	req->r_tid = ++mdsc->last_tid;
	if (req->r_num_caps)
		ceph_reserve_caps(mdsc, &req->r_caps_reservation,
				  req->r_num_caps);
	dout("__register_request %p tid %lld\n", req, req->r_tid);
	ceph_mdsc_get_request(req);
	__insert_request(mdsc, req);

	req->r_uid = current_fsuid();
	req->r_gid = current_fsgid();

	if (dir) {
		struct ceph_inode_info *ci = ceph_inode(dir);

		ihold(dir);
		spin_lock(&ci->i_unsafe_lock);
		req->r_unsafe_dir = dir;
		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
		spin_unlock(&ci->i_unsafe_lock);
	}
}

static void __unregister_request(struct ceph_mds_client *mdsc,
				 struct ceph_mds_request *req)
{
	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
	rb_erase(&req->r_node, &mdsc->request_tree);
	RB_CLEAR_NODE(&req->r_node);

	if (req->r_unsafe_dir) {
		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);

		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_dir_item);
		spin_unlock(&ci->i_unsafe_lock);

		iput(req->r_unsafe_dir);
		req->r_unsafe_dir = NULL;
	}

	ceph_mdsc_put_request(req);
}

/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
struct dentry *get_nonsnap_parent(struct dentry *dentry)
{
	/*
	 * we don't need to worry about protecting the d_parent access
	 * here because we never rename inside the snapped namespace
	 * except to resplice to another snapdir, and either the old or new
	 * result is a valid result.
	 */
	while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
		dentry = dentry->d_parent;
	return dentry;
}

static int __choose_mds(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	int mode = req->r_direct_mode;
	int mds = -1;
	u32 hash = req->r_direct_hash;
	bool is_hash = req->r_direct_is_hash;

	/*
	 * is there a specific mds we should try?  ignore hint if we have
	 * no session and the mds is not up (active or recovering).
	 */
	if (req->r_resend_mds >= 0 &&
	    (__have_session(mdsc, req->r_resend_mds) ||
	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
		dout("choose_mds using resend_mds mds%d\n",
		     req->r_resend_mds);
		return req->r_resend_mds;
	}

	if (mode == USE_RANDOM_MDS)
		goto random;

	inode = NULL;
	if (req->r_inode) {
		inode = req->r_inode;
	} else if (req->r_dentry) {
		/* ignore race with rename; old or new d_parent is okay */
		struct dentry *parent = req->r_dentry->d_parent;
		struct inode *dir = parent->d_inode;

		if (dir->i_sb != mdsc->fsc->sb) {
			/* not this fs! */
			inode = req->r_dentry->d_inode;
		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
			/* direct snapped/virtual snapdir requests
			 * based on parent dir inode */
			struct dentry *dn = get_nonsnap_parent(parent);
			inode = dn->d_inode;
			dout("__choose_mds using nonsnap parent %p\n", inode);
		} else if (req->r_dentry->d_inode) {
			/* dentry target */
			inode = req->r_dentry->d_inode;
		} else {
			/* dir + name */
			inode = dir;
			hash = ceph_dentry_hash(dir, req->r_dentry);
			is_hash = true;
		}
	}

	dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
	     (int)hash, mode);
	if (!inode)
		goto random;
	ci = ceph_inode(inode);

	if (is_hash && S_ISDIR(inode->i_mode)) {
		struct ceph_inode_frag frag;
		int found;

		ceph_choose_frag(ci, hash, &frag, &found);
		if (found) {
			if (mode == USE_ANY_MDS && frag.ndist > 0) {
				u8 r;

				/* choose a random replica */
				get_random_bytes(&r, 1);
				r %= frag.ndist;
				mds = frag.dist[r];
				dout("choose_mds %p %llx.%llx "
				     "frag %u mds%d (%d/%d)\n",
				     inode, ceph_vinop(inode),
				     frag.frag, mds,
				     (int)r, frag.ndist);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE)
					return mds;
			}

			/* since this file/dir wasn't known to be
			 * replicated, we want to look for the
			 * authoritative mds.
*/ 721 mode = USE_AUTH_MDS; 722 if (frag.mds >= 0) { 723 /* choose auth mds */ 724 mds = frag.mds; 725 dout("choose_mds %p %llx.%llx " 726 "frag %u mds%d (auth)\n", 727 inode, ceph_vinop(inode), frag.frag, mds); 728 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 729 CEPH_MDS_STATE_ACTIVE) 730 return mds; 731 } 732 } 733 } 734 735 spin_lock(&inode->i_lock); 736 cap = NULL; 737 if (mode == USE_AUTH_MDS) 738 cap = ci->i_auth_cap; 739 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 740 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 741 if (!cap) { 742 spin_unlock(&inode->i_lock); 743 goto random; 744 } 745 mds = cap->session->s_mds; 746 dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n", 747 inode, ceph_vinop(inode), mds, 748 cap == ci->i_auth_cap ? "auth " : "", cap); 749 spin_unlock(&inode->i_lock); 750 return mds; 751 752 random: 753 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 754 dout("choose_mds chose random mds%d\n", mds); 755 return mds; 756 } 757 758 759 /* 760 * session messages 761 */ 762 static struct ceph_msg *create_session_msg(u32 op, u64 seq) 763 { 764 struct ceph_msg *msg; 765 struct ceph_mds_session_head *h; 766 767 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS); 768 if (!msg) { 769 pr_err("create_session_msg ENOMEM creating msg\n"); 770 return NULL; 771 } 772 h = msg->front.iov_base; 773 h->op = cpu_to_le32(op); 774 h->seq = cpu_to_le64(seq); 775 return msg; 776 } 777 778 /* 779 * send session open request. 780 * 781 * called under mdsc->mutex 782 */ 783 static int __open_session(struct ceph_mds_client *mdsc, 784 struct ceph_mds_session *session) 785 { 786 struct ceph_msg *msg; 787 int mstate; 788 int mds = session->s_mds; 789 790 /* wait for mds to go active? */ 791 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 792 dout("open_session to mds%d (%s)\n", mds, 793 ceph_mds_state_name(mstate)); 794 session->s_state = CEPH_MDS_SESSION_OPENING; 795 session->s_renew_requested = jiffies; 796 797 /* send connect message */ 798 msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq); 799 if (!msg) 800 return -ENOMEM; 801 ceph_con_send(&session->s_con, msg); 802 return 0; 803 } 804 805 /* 806 * open sessions for any export targets for the given mds 807 * 808 * called under mdsc->mutex 809 */ 810 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 811 struct ceph_mds_session *session) 812 { 813 struct ceph_mds_info *mi; 814 struct ceph_mds_session *ts; 815 int i, mds = session->s_mds; 816 int target; 817 818 if (mds >= mdsc->mdsmap->m_max_mds) 819 return; 820 mi = &mdsc->mdsmap->m_info[mds]; 821 dout("open_export_target_sessions for mds%d (%d targets)\n", 822 session->s_mds, mi->num_export_targets); 823 824 for (i = 0; i < mi->num_export_targets; i++) { 825 target = mi->export_targets[i]; 826 ts = __ceph_lookup_mds_session(mdsc, target); 827 if (!ts) { 828 ts = register_session(mdsc, target); 829 if (IS_ERR(ts)) 830 return; 831 } 832 if (session->s_state == CEPH_MDS_SESSION_NEW || 833 session->s_state == CEPH_MDS_SESSION_CLOSING) 834 __open_session(mdsc, session); 835 else 836 dout(" mds%d target mds%d %p is %s\n", session->s_mds, 837 i, ts, session_state_name(ts->s_state)); 838 ceph_put_mds_session(ts); 839 } 840 } 841 842 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 843 struct ceph_mds_session *session) 844 { 845 mutex_lock(&mdsc->mutex); 846 __open_export_target_sessions(mdsc, session); 847 mutex_unlock(&mdsc->mutex); 848 } 849 850 /* 851 * session caps 852 */ 853 854 /* 855 * Free 
preallocated cap messages assigned to this session 856 */ 857 static void cleanup_cap_releases(struct ceph_mds_session *session) 858 { 859 struct ceph_msg *msg; 860 861 spin_lock(&session->s_cap_lock); 862 while (!list_empty(&session->s_cap_releases)) { 863 msg = list_first_entry(&session->s_cap_releases, 864 struct ceph_msg, list_head); 865 list_del_init(&msg->list_head); 866 ceph_msg_put(msg); 867 } 868 while (!list_empty(&session->s_cap_releases_done)) { 869 msg = list_first_entry(&session->s_cap_releases_done, 870 struct ceph_msg, list_head); 871 list_del_init(&msg->list_head); 872 ceph_msg_put(msg); 873 } 874 spin_unlock(&session->s_cap_lock); 875 } 876 877 /* 878 * Helper to safely iterate over all caps associated with a session, with 879 * special care taken to handle a racing __ceph_remove_cap(). 880 * 881 * Caller must hold session s_mutex. 882 */ 883 static int iterate_session_caps(struct ceph_mds_session *session, 884 int (*cb)(struct inode *, struct ceph_cap *, 885 void *), void *arg) 886 { 887 struct list_head *p; 888 struct ceph_cap *cap; 889 struct inode *inode, *last_inode = NULL; 890 struct ceph_cap *old_cap = NULL; 891 int ret; 892 893 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 894 spin_lock(&session->s_cap_lock); 895 p = session->s_caps.next; 896 while (p != &session->s_caps) { 897 cap = list_entry(p, struct ceph_cap, session_caps); 898 inode = igrab(&cap->ci->vfs_inode); 899 if (!inode) { 900 p = p->next; 901 continue; 902 } 903 session->s_cap_iterator = cap; 904 spin_unlock(&session->s_cap_lock); 905 906 if (last_inode) { 907 iput(last_inode); 908 last_inode = NULL; 909 } 910 if (old_cap) { 911 ceph_put_cap(session->s_mdsc, old_cap); 912 old_cap = NULL; 913 } 914 915 ret = cb(inode, cap, arg); 916 last_inode = inode; 917 918 spin_lock(&session->s_cap_lock); 919 p = p->next; 920 if (cap->ci == NULL) { 921 dout("iterate_session_caps finishing cap %p removal\n", 922 cap); 923 BUG_ON(cap->session != session); 924 list_del_init(&cap->session_caps); 925 session->s_nr_caps--; 926 cap->session = NULL; 927 old_cap = cap; /* put_cap it w/o locks held */ 928 } 929 if (ret < 0) 930 goto out; 931 } 932 ret = 0; 933 out: 934 session->s_cap_iterator = NULL; 935 spin_unlock(&session->s_cap_lock); 936 937 if (last_inode) 938 iput(last_inode); 939 if (old_cap) 940 ceph_put_cap(session->s_mdsc, old_cap); 941 942 return ret; 943 } 944 945 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 946 void *arg) 947 { 948 struct ceph_inode_info *ci = ceph_inode(inode); 949 int drop = 0; 950 951 dout("removing cap %p, ci is %p, inode is %p\n", 952 cap, ci, &ci->vfs_inode); 953 spin_lock(&inode->i_lock); 954 __ceph_remove_cap(cap); 955 if (!__ceph_is_any_real_caps(ci)) { 956 struct ceph_mds_client *mdsc = 957 ceph_sb_to_client(inode->i_sb)->mdsc; 958 959 spin_lock(&mdsc->cap_dirty_lock); 960 if (!list_empty(&ci->i_dirty_item)) { 961 pr_info(" dropping dirty %s state for %p %lld\n", 962 ceph_cap_string(ci->i_dirty_caps), 963 inode, ceph_ino(inode)); 964 ci->i_dirty_caps = 0; 965 list_del_init(&ci->i_dirty_item); 966 drop = 1; 967 } 968 if (!list_empty(&ci->i_flushing_item)) { 969 pr_info(" dropping dirty+flushing %s state for %p %lld\n", 970 ceph_cap_string(ci->i_flushing_caps), 971 inode, ceph_ino(inode)); 972 ci->i_flushing_caps = 0; 973 list_del_init(&ci->i_flushing_item); 974 mdsc->num_cap_flushing--; 975 drop = 1; 976 } 977 if (drop && ci->i_wrbuffer_ref) { 978 pr_info(" dropping dirty data for %p %lld\n", 979 inode, ceph_ino(inode)); 980 
ci->i_wrbuffer_ref = 0; 981 ci->i_wrbuffer_ref_head = 0; 982 drop++; 983 } 984 spin_unlock(&mdsc->cap_dirty_lock); 985 } 986 spin_unlock(&inode->i_lock); 987 while (drop--) 988 iput(inode); 989 return 0; 990 } 991 992 /* 993 * caller must hold session s_mutex 994 */ 995 static void remove_session_caps(struct ceph_mds_session *session) 996 { 997 dout("remove_session_caps on %p\n", session); 998 iterate_session_caps(session, remove_session_caps_cb, NULL); 999 BUG_ON(session->s_nr_caps > 0); 1000 BUG_ON(!list_empty(&session->s_cap_flushing)); 1001 cleanup_cap_releases(session); 1002 } 1003 1004 /* 1005 * wake up any threads waiting on this session's caps. if the cap is 1006 * old (didn't get renewed on the client reconnect), remove it now. 1007 * 1008 * caller must hold s_mutex. 1009 */ 1010 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, 1011 void *arg) 1012 { 1013 struct ceph_inode_info *ci = ceph_inode(inode); 1014 1015 wake_up_all(&ci->i_cap_wq); 1016 if (arg) { 1017 spin_lock(&inode->i_lock); 1018 ci->i_wanted_max_size = 0; 1019 ci->i_requested_max_size = 0; 1020 spin_unlock(&inode->i_lock); 1021 } 1022 return 0; 1023 } 1024 1025 static void wake_up_session_caps(struct ceph_mds_session *session, 1026 int reconnect) 1027 { 1028 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 1029 iterate_session_caps(session, wake_up_session_cb, 1030 (void *)(unsigned long)reconnect); 1031 } 1032 1033 /* 1034 * Send periodic message to MDS renewing all currently held caps. The 1035 * ack will reset the expiration for all caps from this session. 1036 * 1037 * caller holds s_mutex 1038 */ 1039 static int send_renew_caps(struct ceph_mds_client *mdsc, 1040 struct ceph_mds_session *session) 1041 { 1042 struct ceph_msg *msg; 1043 int state; 1044 1045 if (time_after_eq(jiffies, session->s_cap_ttl) && 1046 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 1047 pr_info("mds%d caps stale\n", session->s_mds); 1048 session->s_renew_requested = jiffies; 1049 1050 /* do not try to renew caps until a recovering mds has reconnected 1051 * with its clients. */ 1052 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 1053 if (state < CEPH_MDS_STATE_RECONNECT) { 1054 dout("send_renew_caps ignoring mds%d (%s)\n", 1055 session->s_mds, ceph_mds_state_name(state)); 1056 return 0; 1057 } 1058 1059 dout("send_renew_caps to mds%d (%s)\n", session->s_mds, 1060 ceph_mds_state_name(state)); 1061 msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 1062 ++session->s_renew_seq); 1063 if (!msg) 1064 return -ENOMEM; 1065 ceph_con_send(&session->s_con, msg); 1066 return 0; 1067 } 1068 1069 /* 1070 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 1071 * 1072 * Called under session->s_mutex 1073 */ 1074 static void renewed_caps(struct ceph_mds_client *mdsc, 1075 struct ceph_mds_session *session, int is_renew) 1076 { 1077 int was_stale; 1078 int wake = 0; 1079 1080 spin_lock(&session->s_cap_lock); 1081 was_stale = is_renew && (session->s_cap_ttl == 0 || 1082 time_after_eq(jiffies, session->s_cap_ttl)); 1083 1084 session->s_cap_ttl = session->s_renew_requested + 1085 mdsc->mdsmap->m_session_timeout*HZ; 1086 1087 if (was_stale) { 1088 if (time_before(jiffies, session->s_cap_ttl)) { 1089 pr_info("mds%d caps renewed\n", session->s_mds); 1090 wake = 1; 1091 } else { 1092 pr_info("mds%d caps still stale\n", session->s_mds); 1093 } 1094 } 1095 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n", 1096 session->s_mds, session->s_cap_ttl, was_stale ? 
"stale" : "fresh", 1097 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 1098 spin_unlock(&session->s_cap_lock); 1099 1100 if (wake) 1101 wake_up_session_caps(session, 0); 1102 } 1103 1104 /* 1105 * send a session close request 1106 */ 1107 static int request_close_session(struct ceph_mds_client *mdsc, 1108 struct ceph_mds_session *session) 1109 { 1110 struct ceph_msg *msg; 1111 1112 dout("request_close_session mds%d state %s seq %lld\n", 1113 session->s_mds, session_state_name(session->s_state), 1114 session->s_seq); 1115 msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); 1116 if (!msg) 1117 return -ENOMEM; 1118 ceph_con_send(&session->s_con, msg); 1119 return 0; 1120 } 1121 1122 /* 1123 * Called with s_mutex held. 1124 */ 1125 static int __close_session(struct ceph_mds_client *mdsc, 1126 struct ceph_mds_session *session) 1127 { 1128 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 1129 return 0; 1130 session->s_state = CEPH_MDS_SESSION_CLOSING; 1131 return request_close_session(mdsc, session); 1132 } 1133 1134 /* 1135 * Trim old(er) caps. 1136 * 1137 * Because we can't cache an inode without one or more caps, we do 1138 * this indirectly: if a cap is unused, we prune its aliases, at which 1139 * point the inode will hopefully get dropped to. 1140 * 1141 * Yes, this is a bit sloppy. Our only real goal here is to respond to 1142 * memory pressure from the MDS, though, so it needn't be perfect. 1143 */ 1144 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) 1145 { 1146 struct ceph_mds_session *session = arg; 1147 struct ceph_inode_info *ci = ceph_inode(inode); 1148 int used, oissued, mine; 1149 1150 if (session->s_trim_caps <= 0) 1151 return -1; 1152 1153 spin_lock(&inode->i_lock); 1154 mine = cap->issued | cap->implemented; 1155 used = __ceph_caps_used(ci); 1156 oissued = __ceph_caps_issued_other(ci, cap); 1157 1158 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n", 1159 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), 1160 ceph_cap_string(used)); 1161 if (ci->i_dirty_caps) 1162 goto out; /* dirty caps */ 1163 if ((used & ~oissued) & mine) 1164 goto out; /* we need these caps */ 1165 1166 session->s_trim_caps--; 1167 if (oissued) { 1168 /* we aren't the only cap.. just remove us */ 1169 __ceph_remove_cap(cap); 1170 } else { 1171 /* try to drop referring dentries */ 1172 spin_unlock(&inode->i_lock); 1173 d_prune_aliases(inode); 1174 dout("trim_caps_cb %p cap %p pruned, count now %d\n", 1175 inode, cap, atomic_read(&inode->i_count)); 1176 return 0; 1177 } 1178 1179 out: 1180 spin_unlock(&inode->i_lock); 1181 return 0; 1182 } 1183 1184 /* 1185 * Trim session cap count down to some max number. 1186 */ 1187 static int trim_caps(struct ceph_mds_client *mdsc, 1188 struct ceph_mds_session *session, 1189 int max_caps) 1190 { 1191 int trim_caps = session->s_nr_caps - max_caps; 1192 1193 dout("trim_caps mds%d start: %d / %d, trim %d\n", 1194 session->s_mds, session->s_nr_caps, max_caps, trim_caps); 1195 if (trim_caps > 0) { 1196 session->s_trim_caps = trim_caps; 1197 iterate_session_caps(session, trim_caps_cb, session); 1198 dout("trim_caps mds%d done: %d / %d, trimmed %d\n", 1199 session->s_mds, session->s_nr_caps, max_caps, 1200 trim_caps - session->s_trim_caps); 1201 session->s_trim_caps = 0; 1202 } 1203 return 0; 1204 } 1205 1206 /* 1207 * Allocate cap_release messages. If there is a partially full message 1208 * in the queue, try to allocate enough to cover it's remainder, so that 1209 * we can send it immediately. 
1210 * 1211 * Called under s_mutex. 1212 */ 1213 int ceph_add_cap_releases(struct ceph_mds_client *mdsc, 1214 struct ceph_mds_session *session) 1215 { 1216 struct ceph_msg *msg, *partial = NULL; 1217 struct ceph_mds_cap_release *head; 1218 int err = -ENOMEM; 1219 int extra = mdsc->fsc->mount_options->cap_release_safety; 1220 int num; 1221 1222 dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds, 1223 extra); 1224 1225 spin_lock(&session->s_cap_lock); 1226 1227 if (!list_empty(&session->s_cap_releases)) { 1228 msg = list_first_entry(&session->s_cap_releases, 1229 struct ceph_msg, 1230 list_head); 1231 head = msg->front.iov_base; 1232 num = le32_to_cpu(head->num); 1233 if (num) { 1234 dout(" partial %p with (%d/%d)\n", msg, num, 1235 (int)CEPH_CAPS_PER_RELEASE); 1236 extra += CEPH_CAPS_PER_RELEASE - num; 1237 partial = msg; 1238 } 1239 } 1240 while (session->s_num_cap_releases < session->s_nr_caps + extra) { 1241 spin_unlock(&session->s_cap_lock); 1242 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE, 1243 GFP_NOFS); 1244 if (!msg) 1245 goto out_unlocked; 1246 dout("add_cap_releases %p msg %p now %d\n", session, msg, 1247 (int)msg->front.iov_len); 1248 head = msg->front.iov_base; 1249 head->num = cpu_to_le32(0); 1250 msg->front.iov_len = sizeof(*head); 1251 spin_lock(&session->s_cap_lock); 1252 list_add(&msg->list_head, &session->s_cap_releases); 1253 session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE; 1254 } 1255 1256 if (partial) { 1257 head = partial->front.iov_base; 1258 num = le32_to_cpu(head->num); 1259 dout(" queueing partial %p with %d/%d\n", partial, num, 1260 (int)CEPH_CAPS_PER_RELEASE); 1261 list_move_tail(&partial->list_head, 1262 &session->s_cap_releases_done); 1263 session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num; 1264 } 1265 err = 0; 1266 spin_unlock(&session->s_cap_lock); 1267 out_unlocked: 1268 return err; 1269 } 1270 1271 /* 1272 * flush all dirty inode data to disk. 
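 * (The function below does not itself start any flushes; it only checks
 * whether the oldest flushing inode on each session has passed
 * want_flush_seq.)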
1273 * 1274 * returns true if we've flushed through want_flush_seq 1275 */ 1276 static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) 1277 { 1278 int mds, ret = 1; 1279 1280 dout("check_cap_flush want %lld\n", want_flush_seq); 1281 mutex_lock(&mdsc->mutex); 1282 for (mds = 0; ret && mds < mdsc->max_sessions; mds++) { 1283 struct ceph_mds_session *session = mdsc->sessions[mds]; 1284 1285 if (!session) 1286 continue; 1287 get_session(session); 1288 mutex_unlock(&mdsc->mutex); 1289 1290 mutex_lock(&session->s_mutex); 1291 if (!list_empty(&session->s_cap_flushing)) { 1292 struct ceph_inode_info *ci = 1293 list_entry(session->s_cap_flushing.next, 1294 struct ceph_inode_info, 1295 i_flushing_item); 1296 struct inode *inode = &ci->vfs_inode; 1297 1298 spin_lock(&inode->i_lock); 1299 if (ci->i_cap_flush_seq <= want_flush_seq) { 1300 dout("check_cap_flush still flushing %p " 1301 "seq %lld <= %lld to mds%d\n", inode, 1302 ci->i_cap_flush_seq, want_flush_seq, 1303 session->s_mds); 1304 ret = 0; 1305 } 1306 spin_unlock(&inode->i_lock); 1307 } 1308 mutex_unlock(&session->s_mutex); 1309 ceph_put_mds_session(session); 1310 1311 if (!ret) 1312 return ret; 1313 mutex_lock(&mdsc->mutex); 1314 } 1315 1316 mutex_unlock(&mdsc->mutex); 1317 dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq); 1318 return ret; 1319 } 1320 1321 /* 1322 * called under s_mutex 1323 */ 1324 void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 1325 struct ceph_mds_session *session) 1326 { 1327 struct ceph_msg *msg; 1328 1329 dout("send_cap_releases mds%d\n", session->s_mds); 1330 spin_lock(&session->s_cap_lock); 1331 while (!list_empty(&session->s_cap_releases_done)) { 1332 msg = list_first_entry(&session->s_cap_releases_done, 1333 struct ceph_msg, list_head); 1334 list_del_init(&msg->list_head); 1335 spin_unlock(&session->s_cap_lock); 1336 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1337 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 1338 ceph_con_send(&session->s_con, msg); 1339 spin_lock(&session->s_cap_lock); 1340 } 1341 spin_unlock(&session->s_cap_lock); 1342 } 1343 1344 static void discard_cap_releases(struct ceph_mds_client *mdsc, 1345 struct ceph_mds_session *session) 1346 { 1347 struct ceph_msg *msg; 1348 struct ceph_mds_cap_release *head; 1349 unsigned num; 1350 1351 dout("discard_cap_releases mds%d\n", session->s_mds); 1352 spin_lock(&session->s_cap_lock); 1353 1354 /* zero out the in-progress message */ 1355 msg = list_first_entry(&session->s_cap_releases, 1356 struct ceph_msg, list_head); 1357 head = msg->front.iov_base; 1358 num = le32_to_cpu(head->num); 1359 dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num); 1360 head->num = cpu_to_le32(0); 1361 session->s_num_cap_releases += num; 1362 1363 /* requeue completed messages */ 1364 while (!list_empty(&session->s_cap_releases_done)) { 1365 msg = list_first_entry(&session->s_cap_releases_done, 1366 struct ceph_msg, list_head); 1367 list_del_init(&msg->list_head); 1368 1369 head = msg->front.iov_base; 1370 num = le32_to_cpu(head->num); 1371 dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, 1372 num); 1373 session->s_num_cap_releases += num; 1374 head->num = cpu_to_le32(0); 1375 msg->front.iov_len = sizeof(*head); 1376 list_add(&msg->list_head, &session->s_cap_releases); 1377 } 1378 1379 spin_unlock(&session->s_cap_lock); 1380 } 1381 1382 /* 1383 * requests 1384 */ 1385 1386 /* 1387 * Create an mds request. 
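 *
 * Typical caller pattern (sketch only; the op code and fields shown are
 * the ones the fs/ceph callers, e.g. dir.c, are known to use):
 *
 *	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUP, USE_ANY_MDS);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *	req->r_dentry = dget(dentry);
 *	req->r_num_caps = 2;
 *	err = ceph_mdsc_do_request(mdsc, dir, req);
 *	ceph_mdsc_put_request(req);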
1388 */ 1389 struct ceph_mds_request * 1390 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 1391 { 1392 struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS); 1393 1394 if (!req) 1395 return ERR_PTR(-ENOMEM); 1396 1397 mutex_init(&req->r_fill_mutex); 1398 req->r_mdsc = mdsc; 1399 req->r_started = jiffies; 1400 req->r_resend_mds = -1; 1401 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 1402 req->r_fmode = -1; 1403 kref_init(&req->r_kref); 1404 INIT_LIST_HEAD(&req->r_wait); 1405 init_completion(&req->r_completion); 1406 init_completion(&req->r_safe_completion); 1407 INIT_LIST_HEAD(&req->r_unsafe_item); 1408 1409 req->r_op = op; 1410 req->r_direct_mode = mode; 1411 return req; 1412 } 1413 1414 /* 1415 * return oldest (lowest) request, tid in request tree, 0 if none. 1416 * 1417 * called under mdsc->mutex. 1418 */ 1419 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 1420 { 1421 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 1422 return NULL; 1423 return rb_entry(rb_first(&mdsc->request_tree), 1424 struct ceph_mds_request, r_node); 1425 } 1426 1427 static u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 1428 { 1429 struct ceph_mds_request *req = __get_oldest_req(mdsc); 1430 1431 if (req) 1432 return req->r_tid; 1433 return 0; 1434 } 1435 1436 /* 1437 * Build a dentry's path. Allocate on heap; caller must kfree. Based 1438 * on build_path_from_dentry in fs/cifs/dir.c. 1439 * 1440 * If @stop_on_nosnap, generate path relative to the first non-snapped 1441 * inode. 1442 * 1443 * Encode hidden .snap dirs as a double /, i.e. 1444 * foo/.snap/bar -> foo//bar 1445 */ 1446 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base, 1447 int stop_on_nosnap) 1448 { 1449 struct dentry *temp; 1450 char *path; 1451 int len, pos; 1452 unsigned seq; 1453 1454 if (dentry == NULL) 1455 return ERR_PTR(-EINVAL); 1456 1457 retry: 1458 len = 0; 1459 seq = read_seqbegin(&rename_lock); 1460 rcu_read_lock(); 1461 for (temp = dentry; !IS_ROOT(temp);) { 1462 struct inode *inode = temp->d_inode; 1463 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) 1464 len++; /* slash only */ 1465 else if (stop_on_nosnap && inode && 1466 ceph_snap(inode) == CEPH_NOSNAP) 1467 break; 1468 else 1469 len += 1 + temp->d_name.len; 1470 temp = temp->d_parent; 1471 if (temp == NULL) { 1472 rcu_read_unlock(); 1473 pr_err("build_path corrupt dentry %p\n", dentry); 1474 return ERR_PTR(-EINVAL); 1475 } 1476 } 1477 rcu_read_unlock(); 1478 if (len) 1479 len--; /* no leading '/' */ 1480 1481 path = kmalloc(len+1, GFP_NOFS); 1482 if (path == NULL) 1483 return ERR_PTR(-ENOMEM); 1484 pos = len; 1485 path[pos] = 0; /* trailing null */ 1486 rcu_read_lock(); 1487 for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) { 1488 struct inode *inode; 1489 1490 spin_lock(&temp->d_lock); 1491 inode = temp->d_inode; 1492 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 1493 dout("build_path path+%d: %p SNAPDIR\n", 1494 pos, temp); 1495 } else if (stop_on_nosnap && inode && 1496 ceph_snap(inode) == CEPH_NOSNAP) { 1497 break; 1498 } else { 1499 pos -= temp->d_name.len; 1500 if (pos < 0) { 1501 spin_unlock(&temp->d_lock); 1502 break; 1503 } 1504 strncpy(path + pos, temp->d_name.name, 1505 temp->d_name.len); 1506 } 1507 spin_unlock(&temp->d_lock); 1508 if (pos) 1509 path[--pos] = '/'; 1510 temp = temp->d_parent; 1511 if (temp == NULL) { 1512 rcu_read_unlock(); 1513 pr_err("build_path corrupt dentry\n"); 1514 kfree(path); 1515 return ERR_PTR(-EINVAL); 1516 } 1517 } 1518 rcu_read_unlock(); 1519 if (pos != 0 || 
read_seqretry(&rename_lock, seq)) { 1520 pr_err("build_path did not end path lookup where " 1521 "expected, namelen is %d, pos is %d\n", len, pos); 1522 /* presumably this is only possible if racing with a 1523 rename of one of the parent directories (we can not 1524 lock the dentries above us to prevent this, but 1525 retrying should be harmless) */ 1526 kfree(path); 1527 goto retry; 1528 } 1529 1530 *base = ceph_ino(temp->d_inode); 1531 *plen = len; 1532 dout("build_path on %p %d built %llx '%.*s'\n", 1533 dentry, dentry->d_count, *base, len, path); 1534 return path; 1535 } 1536 1537 static int build_dentry_path(struct dentry *dentry, 1538 const char **ppath, int *ppathlen, u64 *pino, 1539 int *pfreepath) 1540 { 1541 char *path; 1542 1543 if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) { 1544 *pino = ceph_ino(dentry->d_parent->d_inode); 1545 *ppath = dentry->d_name.name; 1546 *ppathlen = dentry->d_name.len; 1547 return 0; 1548 } 1549 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 1550 if (IS_ERR(path)) 1551 return PTR_ERR(path); 1552 *ppath = path; 1553 *pfreepath = 1; 1554 return 0; 1555 } 1556 1557 static int build_inode_path(struct inode *inode, 1558 const char **ppath, int *ppathlen, u64 *pino, 1559 int *pfreepath) 1560 { 1561 struct dentry *dentry; 1562 char *path; 1563 1564 if (ceph_snap(inode) == CEPH_NOSNAP) { 1565 *pino = ceph_ino(inode); 1566 *ppathlen = 0; 1567 return 0; 1568 } 1569 dentry = d_find_alias(inode); 1570 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 1571 dput(dentry); 1572 if (IS_ERR(path)) 1573 return PTR_ERR(path); 1574 *ppath = path; 1575 *pfreepath = 1; 1576 return 0; 1577 } 1578 1579 /* 1580 * request arguments may be specified via an inode *, a dentry *, or 1581 * an explicit ino+path. 1582 */ 1583 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, 1584 const char *rpath, u64 rino, 1585 const char **ppath, int *pathlen, 1586 u64 *ino, int *freepath) 1587 { 1588 int r = 0; 1589 1590 if (rinode) { 1591 r = build_inode_path(rinode, ppath, pathlen, ino, freepath); 1592 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 1593 ceph_snap(rinode)); 1594 } else if (rdentry) { 1595 r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath); 1596 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, 1597 *ppath); 1598 } else if (rpath || rino) { 1599 *ino = rino; 1600 *ppath = rpath; 1601 *pathlen = strlen(rpath); 1602 dout(" path %.*s\n", *pathlen, rpath); 1603 } 1604 1605 return r; 1606 } 1607 1608 /* 1609 * called under mdsc->mutex 1610 */ 1611 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, 1612 struct ceph_mds_request *req, 1613 int mds) 1614 { 1615 struct ceph_msg *msg; 1616 struct ceph_mds_request_head *head; 1617 const char *path1 = NULL; 1618 const char *path2 = NULL; 1619 u64 ino1 = 0, ino2 = 0; 1620 int pathlen1 = 0, pathlen2 = 0; 1621 int freepath1 = 0, freepath2 = 0; 1622 int len; 1623 u16 releases; 1624 void *p, *end; 1625 int ret; 1626 1627 ret = set_request_path_attr(req->r_inode, req->r_dentry, 1628 req->r_path1, req->r_ino1.ino, 1629 &path1, &pathlen1, &ino1, &freepath1); 1630 if (ret < 0) { 1631 msg = ERR_PTR(ret); 1632 goto out; 1633 } 1634 1635 ret = set_request_path_attr(NULL, req->r_old_dentry, 1636 req->r_path2, req->r_ino2.ino, 1637 &path2, &pathlen2, &ino2, &freepath2); 1638 if (ret < 0) { 1639 msg = ERR_PTR(ret); 1640 goto out_free1; 1641 } 1642 1643 len = sizeof(*head) + 1644 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)); 1645 1646 /* 
calculate (max) length for cap releases */ 1647 len += sizeof(struct ceph_mds_request_release) * 1648 (!!req->r_inode_drop + !!req->r_dentry_drop + 1649 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 1650 if (req->r_dentry_drop) 1651 len += req->r_dentry->d_name.len; 1652 if (req->r_old_dentry_drop) 1653 len += req->r_old_dentry->d_name.len; 1654 1655 msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS); 1656 if (!msg) { 1657 msg = ERR_PTR(-ENOMEM); 1658 goto out_free2; 1659 } 1660 1661 msg->hdr.tid = cpu_to_le64(req->r_tid); 1662 1663 head = msg->front.iov_base; 1664 p = msg->front.iov_base + sizeof(*head); 1665 end = msg->front.iov_base + msg->front.iov_len; 1666 1667 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 1668 head->op = cpu_to_le32(req->r_op); 1669 head->caller_uid = cpu_to_le32(req->r_uid); 1670 head->caller_gid = cpu_to_le32(req->r_gid); 1671 head->args = req->r_args; 1672 1673 ceph_encode_filepath(&p, end, ino1, path1); 1674 ceph_encode_filepath(&p, end, ino2, path2); 1675 1676 /* make note of release offset, in case we need to replay */ 1677 req->r_request_release_offset = p - msg->front.iov_base; 1678 1679 /* cap releases */ 1680 releases = 0; 1681 if (req->r_inode_drop) 1682 releases += ceph_encode_inode_release(&p, 1683 req->r_inode ? req->r_inode : req->r_dentry->d_inode, 1684 mds, req->r_inode_drop, req->r_inode_unless, 0); 1685 if (req->r_dentry_drop) 1686 releases += ceph_encode_dentry_release(&p, req->r_dentry, 1687 mds, req->r_dentry_drop, req->r_dentry_unless); 1688 if (req->r_old_dentry_drop) 1689 releases += ceph_encode_dentry_release(&p, req->r_old_dentry, 1690 mds, req->r_old_dentry_drop, req->r_old_dentry_unless); 1691 if (req->r_old_inode_drop) 1692 releases += ceph_encode_inode_release(&p, 1693 req->r_old_dentry->d_inode, 1694 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 1695 head->num_releases = cpu_to_le16(releases); 1696 1697 BUG_ON(p > end); 1698 msg->front.iov_len = p - msg->front.iov_base; 1699 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1700 1701 msg->pages = req->r_pages; 1702 msg->nr_pages = req->r_num_pages; 1703 msg->hdr.data_len = cpu_to_le32(req->r_data_len); 1704 msg->hdr.data_off = cpu_to_le16(0); 1705 1706 out_free2: 1707 if (freepath2) 1708 kfree((char *)path2); 1709 out_free1: 1710 if (freepath1) 1711 kfree((char *)path1); 1712 out: 1713 return msg; 1714 } 1715 1716 /* 1717 * called under mdsc->mutex if error, under no mutex if 1718 * success. 1719 */ 1720 static void complete_request(struct ceph_mds_client *mdsc, 1721 struct ceph_mds_request *req) 1722 { 1723 if (req->r_callback) 1724 req->r_callback(mdsc, req); 1725 else 1726 complete_all(&req->r_completion); 1727 } 1728 1729 /* 1730 * called under mdsc->mutex 1731 */ 1732 static int __prepare_send_request(struct ceph_mds_client *mdsc, 1733 struct ceph_mds_request *req, 1734 int mds) 1735 { 1736 struct ceph_mds_request_head *rhead; 1737 struct ceph_msg *msg; 1738 int flags = 0; 1739 1740 req->r_attempts++; 1741 if (req->r_inode) { 1742 struct ceph_cap *cap = 1743 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 1744 1745 if (cap) 1746 req->r_sent_on_mseq = cap->mseq; 1747 else 1748 req->r_sent_on_mseq = -1; 1749 } 1750 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, 1751 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 1752 1753 if (req->r_got_unsafe) { 1754 /* 1755 * Replay. Do not regenerate message (and rebuild 1756 * paths, etc.); just use the original message. 
1757 * Rebuilding paths will break for renames because 1758 * d_move mangles the src name. 1759 */ 1760 msg = req->r_request; 1761 rhead = msg->front.iov_base; 1762 1763 flags = le32_to_cpu(rhead->flags); 1764 flags |= CEPH_MDS_FLAG_REPLAY; 1765 rhead->flags = cpu_to_le32(flags); 1766 1767 if (req->r_target_inode) 1768 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 1769 1770 rhead->num_retry = req->r_attempts - 1; 1771 1772 /* remove cap/dentry releases from message */ 1773 rhead->num_releases = 0; 1774 msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset); 1775 msg->front.iov_len = req->r_request_release_offset; 1776 return 0; 1777 } 1778 1779 if (req->r_request) { 1780 ceph_msg_put(req->r_request); 1781 req->r_request = NULL; 1782 } 1783 msg = create_request_message(mdsc, req, mds); 1784 if (IS_ERR(msg)) { 1785 req->r_err = PTR_ERR(msg); 1786 complete_request(mdsc, req); 1787 return PTR_ERR(msg); 1788 } 1789 req->r_request = msg; 1790 1791 rhead = msg->front.iov_base; 1792 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 1793 if (req->r_got_unsafe) 1794 flags |= CEPH_MDS_FLAG_REPLAY; 1795 if (req->r_locked_dir) 1796 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 1797 rhead->flags = cpu_to_le32(flags); 1798 rhead->num_fwd = req->r_num_fwd; 1799 rhead->num_retry = req->r_attempts - 1; 1800 rhead->ino = 0; 1801 1802 dout(" r_locked_dir = %p\n", req->r_locked_dir); 1803 return 0; 1804 } 1805 1806 /* 1807 * send request, or put it on the appropriate wait list. 1808 */ 1809 static int __do_request(struct ceph_mds_client *mdsc, 1810 struct ceph_mds_request *req) 1811 { 1812 struct ceph_mds_session *session = NULL; 1813 int mds = -1; 1814 int err = -EAGAIN; 1815 1816 if (req->r_err || req->r_got_result) 1817 goto out; 1818 1819 if (req->r_timeout && 1820 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 1821 dout("do_request timed out\n"); 1822 err = -EIO; 1823 goto finish; 1824 } 1825 1826 put_request_session(req); 1827 1828 mds = __choose_mds(mdsc, req); 1829 if (mds < 0 || 1830 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 1831 dout("do_request no mds or not active, waiting for map\n"); 1832 list_add(&req->r_wait, &mdsc->waiting_for_map); 1833 goto out; 1834 } 1835 1836 /* get, open session */ 1837 session = __ceph_lookup_mds_session(mdsc, mds); 1838 if (!session) { 1839 session = register_session(mdsc, mds); 1840 if (IS_ERR(session)) { 1841 err = PTR_ERR(session); 1842 goto finish; 1843 } 1844 } 1845 req->r_session = get_session(session); 1846 1847 dout("do_request mds%d session %p state %s\n", mds, session, 1848 session_state_name(session->s_state)); 1849 if (session->s_state != CEPH_MDS_SESSION_OPEN && 1850 session->s_state != CEPH_MDS_SESSION_HUNG) { 1851 if (session->s_state == CEPH_MDS_SESSION_NEW || 1852 session->s_state == CEPH_MDS_SESSION_CLOSING) 1853 __open_session(mdsc, session); 1854 list_add(&req->r_wait, &session->s_waiting); 1855 goto out_session; 1856 } 1857 1858 /* send request */ 1859 req->r_resend_mds = -1; /* forget any previous mds hint */ 1860 1861 if (req->r_request_started == 0) /* note request start time */ 1862 req->r_request_started = jiffies; 1863 1864 err = __prepare_send_request(mdsc, req, mds); 1865 if (!err) { 1866 ceph_msg_get(req->r_request); 1867 ceph_con_send(&session->s_con, req->r_request); 1868 } 1869 1870 out_session: 1871 ceph_put_mds_session(session); 1872 out: 1873 return err; 1874 1875 finish: 1876 req->r_err = err; 1877 complete_request(mdsc, req); 1878 goto out; 1879 } 1880 1881 /* 1882 * 
called under mdsc->mutex
 */
static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head)
{
	struct ceph_mds_request *req, *nreq;

	list_for_each_entry_safe(req, nreq, head, r_wait) {
		list_del_init(&req->r_wait);
		__do_request(mdsc, req);
	}
}

/*
 * Wake up threads with requests pending for @mds, so that they can
 * resubmit their requests to a possibly different mds.
 */
static void kick_requests(struct ceph_mds_client *mdsc, int mds)
{
	struct ceph_mds_request *req;
	struct rb_node *p;

	dout("kick_requests mds%d\n", mds);
	for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		if (req->r_got_unsafe)
			continue;
		if (req->r_session &&
		    req->r_session->s_mds == mds) {
			dout(" kicking tid %llu\n", req->r_tid);
			__do_request(mdsc, req);
		}
	}
}

void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
			      struct ceph_mds_request *req)
{
	dout("submit_request on %p\n", req);
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, NULL);
	__do_request(mdsc, req);
	mutex_unlock(&mdsc->mutex);
}

/*
 * Synchronously perform an mds request.  Take care of all of the
 * session setup, forwarding, and retry details.
 */
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
			 struct inode *dir,
			 struct ceph_mds_request *req)
{
	int err;

	dout("do_request on %p\n", req);

	/* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
	if (req->r_inode)
		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
	if (req->r_locked_dir)
		ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
	if (req->r_old_dentry)
		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);

	/* issue */
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, dir);
	__do_request(mdsc, req);

	if (req->r_err) {
		err = req->r_err;
		__unregister_request(mdsc, req);
		dout("do_request early error %d\n", err);
		goto out;
	}

	/* wait */
	mutex_unlock(&mdsc->mutex);
	dout("do_request waiting\n");
	if (req->r_timeout) {
		err = (long)wait_for_completion_killable_timeout(
			&req->r_completion, req->r_timeout);
		if (err == 0)
			err = -EIO;
	} else {
		err = wait_for_completion_killable(&req->r_completion);
	}
	dout("do_request waited, got %d\n", err);
	mutex_lock(&mdsc->mutex);

	/* only abort if we didn't race with a real reply */
	if (req->r_got_result) {
		err = le32_to_cpu(req->r_reply_info.head->result);
	} else if (err < 0) {
		dout("aborted request %lld with %d\n", req->r_tid, err);

		/*
		 * ensure we aren't running concurrently with
		 * ceph_fill_trace or ceph_readdir_prepopulate, which
		 * rely on locks (dir mutex) held by our caller.
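		 *
		 * Setting r_aborted under r_fill_mutex here also means
		 * a reply that arrives later will see the flag and skip
		 * recording a result in handle_reply().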
1984 */ 1985 mutex_lock(&req->r_fill_mutex); 1986 req->r_err = err; 1987 req->r_aborted = true; 1988 mutex_unlock(&req->r_fill_mutex); 1989 1990 if (req->r_locked_dir && 1991 (req->r_op & CEPH_MDS_OP_WRITE)) 1992 ceph_invalidate_dir_request(req); 1993 } else { 1994 err = req->r_err; 1995 } 1996 1997 out: 1998 mutex_unlock(&mdsc->mutex); 1999 dout("do_request %p done, result %d\n", req, err); 2000 return err; 2001 } 2002 2003 /* 2004 * Invalidate dir I_COMPLETE, dentry lease state on an aborted MDS 2005 * namespace request. 2006 */ 2007 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 2008 { 2009 struct inode *inode = req->r_locked_dir; 2010 struct ceph_inode_info *ci = ceph_inode(inode); 2011 2012 dout("invalidate_dir_request %p (I_COMPLETE, lease(s))\n", inode); 2013 spin_lock(&inode->i_lock); 2014 ci->i_ceph_flags &= ~CEPH_I_COMPLETE; 2015 ci->i_release_count++; 2016 spin_unlock(&inode->i_lock); 2017 2018 if (req->r_dentry) 2019 ceph_invalidate_dentry_lease(req->r_dentry); 2020 if (req->r_old_dentry) 2021 ceph_invalidate_dentry_lease(req->r_old_dentry); 2022 } 2023 2024 /* 2025 * Handle mds reply. 2026 * 2027 * We take the session mutex and parse and process the reply immediately. 2028 * This preserves the logical ordering of replies, capabilities, etc., sent 2029 * by the MDS as they are applied to our local cache. 2030 */ 2031 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 2032 { 2033 struct ceph_mds_client *mdsc = session->s_mdsc; 2034 struct ceph_mds_request *req; 2035 struct ceph_mds_reply_head *head = msg->front.iov_base; 2036 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 2037 u64 tid; 2038 int err, result; 2039 int mds = session->s_mds; 2040 2041 if (msg->front.iov_len < sizeof(*head)) { 2042 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 2043 ceph_msg_dump(msg); 2044 return; 2045 } 2046 2047 /* get request, session */ 2048 tid = le64_to_cpu(msg->hdr.tid); 2049 mutex_lock(&mdsc->mutex); 2050 req = __lookup_request(mdsc, tid); 2051 if (!req) { 2052 dout("handle_reply on unknown tid %llu\n", tid); 2053 mutex_unlock(&mdsc->mutex); 2054 return; 2055 } 2056 dout("handle_reply %p\n", req); 2057 2058 /* correct session? */ 2059 if (req->r_session != session) { 2060 pr_err("mdsc_handle_reply got %llu on session mds%d" 2061 " not mds%d\n", tid, session->s_mds, 2062 req->r_session ? req->r_session->s_mds : -1); 2063 mutex_unlock(&mdsc->mutex); 2064 goto out; 2065 } 2066 2067 /* dup? */ 2068 if ((req->r_got_unsafe && !head->safe) || 2069 (req->r_got_safe && head->safe)) { 2070 pr_warning("got a dup %s reply on %llu from mds%d\n", 2071 head->safe ? 
"safe" : "unsafe", tid, mds); 2072 mutex_unlock(&mdsc->mutex); 2073 goto out; 2074 } 2075 if (req->r_got_safe && !head->safe) { 2076 pr_warning("got unsafe after safe on %llu from mds%d\n", 2077 tid, mds); 2078 mutex_unlock(&mdsc->mutex); 2079 goto out; 2080 } 2081 2082 result = le32_to_cpu(head->result); 2083 2084 /* 2085 * Handle an ESTALE 2086 * if we're not talking to the authority, send to them 2087 * if the authority has changed while we weren't looking, 2088 * send to new authority 2089 * Otherwise we just have to return an ESTALE 2090 */ 2091 if (result == -ESTALE) { 2092 dout("got ESTALE on request %llu", req->r_tid); 2093 if (!req->r_inode) { 2094 /* do nothing; not an authority problem */ 2095 } else if (req->r_direct_mode != USE_AUTH_MDS) { 2096 dout("not using auth, setting for that now"); 2097 req->r_direct_mode = USE_AUTH_MDS; 2098 __do_request(mdsc, req); 2099 mutex_unlock(&mdsc->mutex); 2100 goto out; 2101 } else { 2102 struct ceph_inode_info *ci = ceph_inode(req->r_inode); 2103 struct ceph_cap *cap = NULL; 2104 2105 if (req->r_session) 2106 cap = ceph_get_cap_for_mds(ci, 2107 req->r_session->s_mds); 2108 2109 dout("already using auth"); 2110 if ((!cap || cap != ci->i_auth_cap) || 2111 (cap->mseq != req->r_sent_on_mseq)) { 2112 dout("but cap changed, so resending"); 2113 __do_request(mdsc, req); 2114 mutex_unlock(&mdsc->mutex); 2115 goto out; 2116 } 2117 } 2118 dout("have to return ESTALE on request %llu", req->r_tid); 2119 } 2120 2121 2122 if (head->safe) { 2123 req->r_got_safe = true; 2124 __unregister_request(mdsc, req); 2125 complete_all(&req->r_safe_completion); 2126 2127 if (req->r_got_unsafe) { 2128 /* 2129 * We already handled the unsafe response, now do the 2130 * cleanup. No need to examine the response; the MDS 2131 * doesn't include any result info in the safe 2132 * response. And even if it did, there is nothing 2133 * useful we could do with a revised return value. 2134 */ 2135 dout("got safe reply %llu, mds%d\n", tid, mds); 2136 list_del_init(&req->r_unsafe_item); 2137 2138 /* last unsafe request during umount? 
*/ 2139 if (mdsc->stopping && !__get_oldest_req(mdsc)) 2140 complete_all(&mdsc->safe_umount_waiters); 2141 mutex_unlock(&mdsc->mutex); 2142 goto out; 2143 } 2144 } else { 2145 req->r_got_unsafe = true; 2146 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 2147 } 2148 2149 dout("handle_reply tid %lld result %d\n", tid, result); 2150 rinfo = &req->r_reply_info; 2151 err = parse_reply_info(msg, rinfo, session->s_con.peer_features); 2152 mutex_unlock(&mdsc->mutex); 2153 2154 mutex_lock(&session->s_mutex); 2155 if (err < 0) { 2156 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 2157 ceph_msg_dump(msg); 2158 goto out_err; 2159 } 2160 2161 /* snap trace */ 2162 if (rinfo->snapblob_len) { 2163 down_write(&mdsc->snap_rwsem); 2164 ceph_update_snap_trace(mdsc, rinfo->snapblob, 2165 rinfo->snapblob + rinfo->snapblob_len, 2166 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP); 2167 downgrade_write(&mdsc->snap_rwsem); 2168 } else { 2169 down_read(&mdsc->snap_rwsem); 2170 } 2171 2172 /* insert trace into our cache */ 2173 mutex_lock(&req->r_fill_mutex); 2174 err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session); 2175 if (err == 0) { 2176 if (result == 0 && req->r_op != CEPH_MDS_OP_GETFILELOCK && 2177 rinfo->dir_nr) 2178 ceph_readdir_prepopulate(req, req->r_session); 2179 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 2180 } 2181 mutex_unlock(&req->r_fill_mutex); 2182 2183 up_read(&mdsc->snap_rwsem); 2184 out_err: 2185 mutex_lock(&mdsc->mutex); 2186 if (!req->r_aborted) { 2187 if (err) { 2188 req->r_err = err; 2189 } else { 2190 req->r_reply = msg; 2191 ceph_msg_get(msg); 2192 req->r_got_result = true; 2193 } 2194 } else { 2195 dout("reply arrived after request %lld was aborted\n", tid); 2196 } 2197 mutex_unlock(&mdsc->mutex); 2198 2199 ceph_add_cap_releases(mdsc, req->r_session); 2200 mutex_unlock(&session->s_mutex); 2201 2202 /* kick calling process */ 2203 complete_request(mdsc, req); 2204 out: 2205 ceph_mdsc_put_request(req); 2206 return; 2207 } 2208 2209 2210 2211 /* 2212 * handle mds notification that our request has been forwarded. 2213 */ 2214 static void handle_forward(struct ceph_mds_client *mdsc, 2215 struct ceph_mds_session *session, 2216 struct ceph_msg *msg) 2217 { 2218 struct ceph_mds_request *req; 2219 u64 tid = le64_to_cpu(msg->hdr.tid); 2220 u32 next_mds; 2221 u32 fwd_seq; 2222 int err = -EINVAL; 2223 void *p = msg->front.iov_base; 2224 void *end = p + msg->front.iov_len; 2225 2226 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 2227 next_mds = ceph_decode_32(&p); 2228 fwd_seq = ceph_decode_32(&p); 2229 2230 mutex_lock(&mdsc->mutex); 2231 req = __lookup_request(mdsc, tid); 2232 if (!req) { 2233 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 2234 goto out; /* dup reply? */ 2235 } 2236 2237 if (req->r_aborted) { 2238 dout("forward tid %llu aborted, unregistering\n", tid); 2239 __unregister_request(mdsc, req); 2240 } else if (fwd_seq <= req->r_num_fwd) { 2241 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 2242 tid, next_mds, req->r_num_fwd, fwd_seq); 2243 } else { 2244 /* resend. 
forward race not possible; mds would drop */ 2245 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 2246 BUG_ON(req->r_err); 2247 BUG_ON(req->r_got_result); 2248 req->r_num_fwd = fwd_seq; 2249 req->r_resend_mds = next_mds; 2250 put_request_session(req); 2251 __do_request(mdsc, req); 2252 } 2253 ceph_mdsc_put_request(req); 2254 out: 2255 mutex_unlock(&mdsc->mutex); 2256 return; 2257 2258 bad: 2259 pr_err("mdsc_handle_forward decode error err=%d\n", err); 2260 } 2261 2262 /* 2263 * handle a mds session control message 2264 */ 2265 static void handle_session(struct ceph_mds_session *session, 2266 struct ceph_msg *msg) 2267 { 2268 struct ceph_mds_client *mdsc = session->s_mdsc; 2269 u32 op; 2270 u64 seq; 2271 int mds = session->s_mds; 2272 struct ceph_mds_session_head *h = msg->front.iov_base; 2273 int wake = 0; 2274 2275 /* decode */ 2276 if (msg->front.iov_len != sizeof(*h)) 2277 goto bad; 2278 op = le32_to_cpu(h->op); 2279 seq = le64_to_cpu(h->seq); 2280 2281 mutex_lock(&mdsc->mutex); 2282 if (op == CEPH_SESSION_CLOSE) 2283 __unregister_session(mdsc, session); 2284 /* FIXME: this ttl calculation is generous */ 2285 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 2286 mutex_unlock(&mdsc->mutex); 2287 2288 mutex_lock(&session->s_mutex); 2289 2290 dout("handle_session mds%d %s %p state %s seq %llu\n", 2291 mds, ceph_session_op_name(op), session, 2292 session_state_name(session->s_state), seq); 2293 2294 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 2295 session->s_state = CEPH_MDS_SESSION_OPEN; 2296 pr_info("mds%d came back\n", session->s_mds); 2297 } 2298 2299 switch (op) { 2300 case CEPH_SESSION_OPEN: 2301 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 2302 pr_info("mds%d reconnect success\n", session->s_mds); 2303 session->s_state = CEPH_MDS_SESSION_OPEN; 2304 renewed_caps(mdsc, session, 0); 2305 wake = 1; 2306 if (mdsc->stopping) 2307 __close_session(mdsc, session); 2308 break; 2309 2310 case CEPH_SESSION_RENEWCAPS: 2311 if (session->s_renew_seq == seq) 2312 renewed_caps(mdsc, session, 1); 2313 break; 2314 2315 case CEPH_SESSION_CLOSE: 2316 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 2317 pr_info("mds%d reconnect denied\n", session->s_mds); 2318 remove_session_caps(session); 2319 wake = 1; /* for good measure */ 2320 wake_up_all(&mdsc->session_close_wq); 2321 kick_requests(mdsc, mds); 2322 break; 2323 2324 case CEPH_SESSION_STALE: 2325 pr_info("mds%d caps went stale, renewing\n", 2326 session->s_mds); 2327 spin_lock(&session->s_cap_lock); 2328 session->s_cap_gen++; 2329 session->s_cap_ttl = 0; 2330 spin_unlock(&session->s_cap_lock); 2331 send_renew_caps(mdsc, session); 2332 break; 2333 2334 case CEPH_SESSION_RECALL_STATE: 2335 trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 2336 break; 2337 2338 default: 2339 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 2340 WARN_ON(1); 2341 } 2342 2343 mutex_unlock(&session->s_mutex); 2344 if (wake) { 2345 mutex_lock(&mdsc->mutex); 2346 __wake_requests(mdsc, &session->s_waiting); 2347 mutex_unlock(&mdsc->mutex); 2348 } 2349 return; 2350 2351 bad: 2352 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 2353 (int)msg->front.iov_len); 2354 ceph_msg_dump(msg); 2355 return; 2356 } 2357 2358 2359 /* 2360 * called under session->mutex. 
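 * We still take mdsc->mutex below while walking s_unsafe and
 * rebuilding each request message with __prepare_send_request().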
2361 */ 2362 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 2363 struct ceph_mds_session *session) 2364 { 2365 struct ceph_mds_request *req, *nreq; 2366 int err; 2367 2368 dout("replay_unsafe_requests mds%d\n", session->s_mds); 2369 2370 mutex_lock(&mdsc->mutex); 2371 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) { 2372 err = __prepare_send_request(mdsc, req, session->s_mds); 2373 if (!err) { 2374 ceph_msg_get(req->r_request); 2375 ceph_con_send(&session->s_con, req->r_request); 2376 } 2377 } 2378 mutex_unlock(&mdsc->mutex); 2379 } 2380 2381 /* 2382 * Encode information about a cap for a reconnect with the MDS. 2383 */ 2384 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, 2385 void *arg) 2386 { 2387 union { 2388 struct ceph_mds_cap_reconnect v2; 2389 struct ceph_mds_cap_reconnect_v1 v1; 2390 } rec; 2391 size_t reclen; 2392 struct ceph_inode_info *ci; 2393 struct ceph_reconnect_state *recon_state = arg; 2394 struct ceph_pagelist *pagelist = recon_state->pagelist; 2395 char *path; 2396 int pathlen, err; 2397 u64 pathbase; 2398 struct dentry *dentry; 2399 2400 ci = cap->ci; 2401 2402 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 2403 inode, ceph_vinop(inode), cap, cap->cap_id, 2404 ceph_cap_string(cap->issued)); 2405 err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 2406 if (err) 2407 return err; 2408 2409 dentry = d_find_alias(inode); 2410 if (dentry) { 2411 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0); 2412 if (IS_ERR(path)) { 2413 err = PTR_ERR(path); 2414 goto out_dput; 2415 } 2416 } else { 2417 path = NULL; 2418 pathlen = 0; 2419 } 2420 err = ceph_pagelist_encode_string(pagelist, path, pathlen); 2421 if (err) 2422 goto out_free; 2423 2424 spin_lock(&inode->i_lock); 2425 cap->seq = 0; /* reset cap seq */ 2426 cap->issue_seq = 0; /* and issue_seq */ 2427 2428 if (recon_state->flock) { 2429 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 2430 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 2431 rec.v2.issued = cpu_to_le32(cap->issued); 2432 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 2433 rec.v2.pathbase = cpu_to_le64(pathbase); 2434 rec.v2.flock_len = 0; 2435 reclen = sizeof(rec.v2); 2436 } else { 2437 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 2438 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 2439 rec.v1.issued = cpu_to_le32(cap->issued); 2440 rec.v1.size = cpu_to_le64(inode->i_size); 2441 ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime); 2442 ceph_encode_timespec(&rec.v1.atime, &inode->i_atime); 2443 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 2444 rec.v1.pathbase = cpu_to_le64(pathbase); 2445 reclen = sizeof(rec.v1); 2446 } 2447 spin_unlock(&inode->i_lock); 2448 2449 if (recon_state->flock) { 2450 int num_fcntl_locks, num_flock_locks; 2451 struct ceph_pagelist_cursor trunc_point; 2452 2453 ceph_pagelist_set_cursor(pagelist, &trunc_point); 2454 do { 2455 lock_flocks(); 2456 ceph_count_locks(inode, &num_fcntl_locks, 2457 &num_flock_locks); 2458 rec.v2.flock_len = (2*sizeof(u32) + 2459 (num_fcntl_locks+num_flock_locks) * 2460 sizeof(struct ceph_filelock)); 2461 unlock_flocks(); 2462 2463 /* pre-alloc pagelist */ 2464 ceph_pagelist_truncate(pagelist, &trunc_point); 2465 err = ceph_pagelist_append(pagelist, &rec, reclen); 2466 if (!err) 2467 err = ceph_pagelist_reserve(pagelist, 2468 rec.v2.flock_len); 2469 2470 /* encode locks */ 2471 if (!err) { 2472 lock_flocks(); 2473 err = ceph_encode_locks(inode, 2474 pagelist, 2475 num_fcntl_locks, 2476 num_flock_locks); 2477 
unlock_flocks(); 2478 } 2479 } while (err == -ENOSPC); 2480 } else { 2481 err = ceph_pagelist_append(pagelist, &rec, reclen); 2482 } 2483 2484 out_free: 2485 kfree(path); 2486 out_dput: 2487 dput(dentry); 2488 return err; 2489 } 2490 2491 2492 /* 2493 * If an MDS fails and recovers, clients need to reconnect in order to 2494 * reestablish shared state. This includes all caps issued through 2495 * this session _and_ the snap_realm hierarchy. Because it's not 2496 * clear which snap realms the mds cares about, we send everything we 2497 * know about.. that ensures we'll then get any new info the 2498 * recovering MDS might have. 2499 * 2500 * This is a relatively heavyweight operation, but it's rare. 2501 * 2502 * called with mdsc->mutex held. 2503 */ 2504 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 2505 struct ceph_mds_session *session) 2506 { 2507 struct ceph_msg *reply; 2508 struct rb_node *p; 2509 int mds = session->s_mds; 2510 int err = -ENOMEM; 2511 struct ceph_pagelist *pagelist; 2512 struct ceph_reconnect_state recon_state; 2513 2514 pr_info("mds%d reconnect start\n", mds); 2515 2516 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); 2517 if (!pagelist) 2518 goto fail_nopagelist; 2519 ceph_pagelist_init(pagelist); 2520 2521 reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS); 2522 if (!reply) 2523 goto fail_nomsg; 2524 2525 mutex_lock(&session->s_mutex); 2526 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 2527 session->s_seq = 0; 2528 2529 ceph_con_open(&session->s_con, 2530 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 2531 2532 /* replay unsafe requests */ 2533 replay_unsafe_requests(mdsc, session); 2534 2535 down_read(&mdsc->snap_rwsem); 2536 2537 dout("session %p state %s\n", session, 2538 session_state_name(session->s_state)); 2539 2540 /* drop old cap expires; we're about to reestablish that state */ 2541 discard_cap_releases(mdsc, session); 2542 2543 /* traverse this session's caps */ 2544 err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps); 2545 if (err) 2546 goto fail; 2547 2548 recon_state.pagelist = pagelist; 2549 recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK; 2550 err = iterate_session_caps(session, encode_caps_cb, &recon_state); 2551 if (err < 0) 2552 goto fail; 2553 2554 /* 2555 * snaprealms. we provide mds with the ino, seq (version), and 2556 * parent for all of our realms. If the mds has any newer info, 2557 * it will tell us. 
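 * The walk below appends one ceph_mds_snaprealm_reconnect record
 * for every realm in mdsc->snap_realms.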
2558 */ 2559 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 2560 struct ceph_snap_realm *realm = 2561 rb_entry(p, struct ceph_snap_realm, node); 2562 struct ceph_mds_snaprealm_reconnect sr_rec; 2563 2564 dout(" adding snap realm %llx seq %lld parent %llx\n", 2565 realm->ino, realm->seq, realm->parent_ino); 2566 sr_rec.ino = cpu_to_le64(realm->ino); 2567 sr_rec.seq = cpu_to_le64(realm->seq); 2568 sr_rec.parent = cpu_to_le64(realm->parent_ino); 2569 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 2570 if (err) 2571 goto fail; 2572 } 2573 2574 reply->pagelist = pagelist; 2575 if (recon_state.flock) 2576 reply->hdr.version = cpu_to_le16(2); 2577 reply->hdr.data_len = cpu_to_le32(pagelist->length); 2578 reply->nr_pages = calc_pages_for(0, pagelist->length); 2579 ceph_con_send(&session->s_con, reply); 2580 2581 mutex_unlock(&session->s_mutex); 2582 2583 mutex_lock(&mdsc->mutex); 2584 __wake_requests(mdsc, &session->s_waiting); 2585 mutex_unlock(&mdsc->mutex); 2586 2587 up_read(&mdsc->snap_rwsem); 2588 return; 2589 2590 fail: 2591 ceph_msg_put(reply); 2592 up_read(&mdsc->snap_rwsem); 2593 mutex_unlock(&session->s_mutex); 2594 fail_nomsg: 2595 ceph_pagelist_release(pagelist); 2596 kfree(pagelist); 2597 fail_nopagelist: 2598 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 2599 return; 2600 } 2601 2602 2603 /* 2604 * compare old and new mdsmaps, kicking requests 2605 * and closing out old connections as necessary 2606 * 2607 * called under mdsc->mutex. 2608 */ 2609 static void check_new_map(struct ceph_mds_client *mdsc, 2610 struct ceph_mdsmap *newmap, 2611 struct ceph_mdsmap *oldmap) 2612 { 2613 int i; 2614 int oldstate, newstate; 2615 struct ceph_mds_session *s; 2616 2617 dout("check_new_map new %u old %u\n", 2618 newmap->m_epoch, oldmap->m_epoch); 2619 2620 for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) { 2621 if (mdsc->sessions[i] == NULL) 2622 continue; 2623 s = mdsc->sessions[i]; 2624 oldstate = ceph_mdsmap_get_state(oldmap, i); 2625 newstate = ceph_mdsmap_get_state(newmap, i); 2626 2627 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 2628 i, ceph_mds_state_name(oldstate), 2629 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 2630 ceph_mds_state_name(newstate), 2631 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 2632 session_state_name(s->s_state)); 2633 2634 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 2635 ceph_mdsmap_get_addr(newmap, i), 2636 sizeof(struct ceph_entity_addr))) { 2637 if (s->s_state == CEPH_MDS_SESSION_OPENING) { 2638 /* the session never opened, just close it 2639 * out now */ 2640 __wake_requests(mdsc, &s->s_waiting); 2641 __unregister_session(mdsc, s); 2642 } else { 2643 /* just close it */ 2644 mutex_unlock(&mdsc->mutex); 2645 mutex_lock(&s->s_mutex); 2646 mutex_lock(&mdsc->mutex); 2647 ceph_con_close(&s->s_con); 2648 mutex_unlock(&s->s_mutex); 2649 s->s_state = CEPH_MDS_SESSION_RESTARTING; 2650 } 2651 2652 /* kick any requests waiting on the recovering mds */ 2653 kick_requests(mdsc, i); 2654 } else if (oldstate == newstate) { 2655 continue; /* nothing new with this mds */ 2656 } 2657 2658 /* 2659 * send reconnect? 2660 */ 2661 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 2662 newstate >= CEPH_MDS_STATE_RECONNECT) { 2663 mutex_unlock(&mdsc->mutex); 2664 send_mds_reconnect(mdsc, s); 2665 mutex_lock(&mdsc->mutex); 2666 } 2667 2668 /* 2669 * kick request on any mds that has gone active. 
2670 */ 2671 if (oldstate < CEPH_MDS_STATE_ACTIVE && 2672 newstate >= CEPH_MDS_STATE_ACTIVE) { 2673 if (oldstate != CEPH_MDS_STATE_CREATING && 2674 oldstate != CEPH_MDS_STATE_STARTING) 2675 pr_info("mds%d recovery completed\n", s->s_mds); 2676 kick_requests(mdsc, i); 2677 ceph_kick_flushing_caps(mdsc, s); 2678 wake_up_session_caps(s, 1); 2679 } 2680 } 2681 2682 for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) { 2683 s = mdsc->sessions[i]; 2684 if (!s) 2685 continue; 2686 if (!ceph_mdsmap_is_laggy(newmap, i)) 2687 continue; 2688 if (s->s_state == CEPH_MDS_SESSION_OPEN || 2689 s->s_state == CEPH_MDS_SESSION_HUNG || 2690 s->s_state == CEPH_MDS_SESSION_CLOSING) { 2691 dout(" connecting to export targets of laggy mds%d\n", 2692 i); 2693 __open_export_target_sessions(mdsc, s); 2694 } 2695 } 2696 } 2697 2698 2699 2700 /* 2701 * leases 2702 */ 2703 2704 /* 2705 * caller must hold session s_mutex, dentry->d_lock 2706 */ 2707 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 2708 { 2709 struct ceph_dentry_info *di = ceph_dentry(dentry); 2710 2711 ceph_put_mds_session(di->lease_session); 2712 di->lease_session = NULL; 2713 } 2714 2715 static void handle_lease(struct ceph_mds_client *mdsc, 2716 struct ceph_mds_session *session, 2717 struct ceph_msg *msg) 2718 { 2719 struct super_block *sb = mdsc->fsc->sb; 2720 struct inode *inode; 2721 struct dentry *parent, *dentry; 2722 struct ceph_dentry_info *di; 2723 int mds = session->s_mds; 2724 struct ceph_mds_lease *h = msg->front.iov_base; 2725 u32 seq; 2726 struct ceph_vino vino; 2727 struct qstr dname; 2728 int release = 0; 2729 2730 dout("handle_lease from mds%d\n", mds); 2731 2732 /* decode */ 2733 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 2734 goto bad; 2735 vino.ino = le64_to_cpu(h->ino); 2736 vino.snap = CEPH_NOSNAP; 2737 seq = le32_to_cpu(h->seq); 2738 dname.name = (void *)h + sizeof(*h) + sizeof(u32); 2739 dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32); 2740 if (dname.len != get_unaligned_le32(h+1)) 2741 goto bad; 2742 2743 mutex_lock(&session->s_mutex); 2744 session->s_seq++; 2745 2746 /* lookup inode */ 2747 inode = ceph_find_inode(sb, vino); 2748 dout("handle_lease %s, ino %llx %p %.*s\n", 2749 ceph_lease_op_name(h->action), vino.ino, inode, 2750 dname.len, dname.name); 2751 if (inode == NULL) { 2752 dout("handle_lease no inode %llx\n", vino.ino); 2753 goto release; 2754 } 2755 2756 /* dentry */ 2757 parent = d_find_alias(inode); 2758 if (!parent) { 2759 dout("no parent dentry on inode %p\n", inode); 2760 WARN_ON(1); 2761 goto release; /* hrm... 
*/ 2762 } 2763 dname.hash = full_name_hash(dname.name, dname.len); 2764 dentry = d_lookup(parent, &dname); 2765 dput(parent); 2766 if (!dentry) 2767 goto release; 2768 2769 spin_lock(&dentry->d_lock); 2770 di = ceph_dentry(dentry); 2771 switch (h->action) { 2772 case CEPH_MDS_LEASE_REVOKE: 2773 if (di && di->lease_session == session) { 2774 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 2775 h->seq = cpu_to_le32(di->lease_seq); 2776 __ceph_mdsc_drop_dentry_lease(dentry); 2777 } 2778 release = 1; 2779 break; 2780 2781 case CEPH_MDS_LEASE_RENEW: 2782 if (di && di->lease_session == session && 2783 di->lease_gen == session->s_cap_gen && 2784 di->lease_renew_from && 2785 di->lease_renew_after == 0) { 2786 unsigned long duration = 2787 le32_to_cpu(h->duration_ms) * HZ / 1000; 2788 2789 di->lease_seq = seq; 2790 dentry->d_time = di->lease_renew_from + duration; 2791 di->lease_renew_after = di->lease_renew_from + 2792 (duration >> 1); 2793 di->lease_renew_from = 0; 2794 } 2795 break; 2796 } 2797 spin_unlock(&dentry->d_lock); 2798 dput(dentry); 2799 2800 if (!release) 2801 goto out; 2802 2803 release: 2804 /* let's just reuse the same message */ 2805 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 2806 ceph_msg_get(msg); 2807 ceph_con_send(&session->s_con, msg); 2808 2809 out: 2810 iput(inode); 2811 mutex_unlock(&session->s_mutex); 2812 return; 2813 2814 bad: 2815 pr_err("corrupt lease message\n"); 2816 ceph_msg_dump(msg); 2817 } 2818 2819 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 2820 struct inode *inode, 2821 struct dentry *dentry, char action, 2822 u32 seq) 2823 { 2824 struct ceph_msg *msg; 2825 struct ceph_mds_lease *lease; 2826 int len = sizeof(*lease) + sizeof(u32); 2827 int dnamelen = 0; 2828 2829 dout("lease_send_msg inode %p dentry %p %s to mds%d\n", 2830 inode, dentry, ceph_lease_op_name(action), session->s_mds); 2831 dnamelen = dentry->d_name.len; 2832 len += dnamelen; 2833 2834 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS); 2835 if (!msg) 2836 return; 2837 lease = msg->front.iov_base; 2838 lease->action = action; 2839 lease->ino = cpu_to_le64(ceph_vino(inode).ino); 2840 lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap); 2841 lease->seq = cpu_to_le32(seq); 2842 put_unaligned_le32(dnamelen, lease + 1); 2843 memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen); 2844 2845 /* 2846 * if this is a preemptive lease RELEASE, no need to 2847 * flush request stream, since the actual request will 2848 * soon follow. 2849 */ 2850 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE); 2851 2852 ceph_con_send(&session->s_con, msg); 2853 } 2854 2855 /* 2856 * Preemptively release a lease we expect to invalidate anyway. 2857 * Pass @inode always, @dentry is optional. 2858 */ 2859 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, 2860 struct dentry *dentry) 2861 { 2862 struct ceph_dentry_info *di; 2863 struct ceph_mds_session *session; 2864 u32 seq; 2865 2866 BUG_ON(inode == NULL); 2867 BUG_ON(dentry == NULL); 2868 2869 /* is dentry lease valid? 
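 * check the lease session, generation and expiry under d_lock;
 * if any of these are stale there is nothing to tell the mds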
*/ 2870 spin_lock(&dentry->d_lock); 2871 di = ceph_dentry(dentry); 2872 if (!di || !di->lease_session || 2873 di->lease_session->s_mds < 0 || 2874 di->lease_gen != di->lease_session->s_cap_gen || 2875 !time_before(jiffies, dentry->d_time)) { 2876 dout("lease_release inode %p dentry %p -- " 2877 "no lease\n", 2878 inode, dentry); 2879 spin_unlock(&dentry->d_lock); 2880 return; 2881 } 2882 2883 /* we do have a lease on this dentry; note mds and seq */ 2884 session = ceph_get_mds_session(di->lease_session); 2885 seq = di->lease_seq; 2886 __ceph_mdsc_drop_dentry_lease(dentry); 2887 spin_unlock(&dentry->d_lock); 2888 2889 dout("lease_release inode %p dentry %p to mds%d\n", 2890 inode, dentry, session->s_mds); 2891 ceph_mdsc_lease_send_msg(session, inode, dentry, 2892 CEPH_MDS_LEASE_RELEASE, seq); 2893 ceph_put_mds_session(session); 2894 } 2895 2896 /* 2897 * drop all leases (and dentry refs) in preparation for umount 2898 */ 2899 static void drop_leases(struct ceph_mds_client *mdsc) 2900 { 2901 int i; 2902 2903 dout("drop_leases\n"); 2904 mutex_lock(&mdsc->mutex); 2905 for (i = 0; i < mdsc->max_sessions; i++) { 2906 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 2907 if (!s) 2908 continue; 2909 mutex_unlock(&mdsc->mutex); 2910 mutex_lock(&s->s_mutex); 2911 mutex_unlock(&s->s_mutex); 2912 ceph_put_mds_session(s); 2913 mutex_lock(&mdsc->mutex); 2914 } 2915 mutex_unlock(&mdsc->mutex); 2916 } 2917 2918 2919 2920 /* 2921 * delayed work -- periodically trim expired leases, renew caps with mds 2922 */ 2923 static void schedule_delayed(struct ceph_mds_client *mdsc) 2924 { 2925 int delay = 5; 2926 unsigned hz = round_jiffies_relative(HZ * delay); 2927 schedule_delayed_work(&mdsc->delayed_work, hz); 2928 } 2929 2930 static void delayed_work(struct work_struct *work) 2931 { 2932 int i; 2933 struct ceph_mds_client *mdsc = 2934 container_of(work, struct ceph_mds_client, delayed_work.work); 2935 int renew_interval; 2936 int renew_caps; 2937 2938 dout("mdsc delayed_work\n"); 2939 ceph_check_delayed_caps(mdsc); 2940 2941 mutex_lock(&mdsc->mutex); 2942 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 2943 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 2944 mdsc->last_renew_caps); 2945 if (renew_caps) 2946 mdsc->last_renew_caps = jiffies; 2947 2948 for (i = 0; i < mdsc->max_sessions; i++) { 2949 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 2950 if (s == NULL) 2951 continue; 2952 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 2953 dout("resending session close request for mds%d\n", 2954 s->s_mds); 2955 request_close_session(mdsc, s); 2956 ceph_put_mds_session(s); 2957 continue; 2958 } 2959 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 2960 if (s->s_state == CEPH_MDS_SESSION_OPEN) { 2961 s->s_state = CEPH_MDS_SESSION_HUNG; 2962 pr_info("mds%d hung\n", s->s_mds); 2963 } 2964 } 2965 if (s->s_state < CEPH_MDS_SESSION_OPEN) { 2966 /* this mds is failed or recovering, just wait */ 2967 ceph_put_mds_session(s); 2968 continue; 2969 } 2970 mutex_unlock(&mdsc->mutex); 2971 2972 mutex_lock(&s->s_mutex); 2973 if (renew_caps) 2974 send_renew_caps(mdsc, s); 2975 else 2976 ceph_con_keepalive(&s->s_con); 2977 ceph_add_cap_releases(mdsc, s); 2978 if (s->s_state == CEPH_MDS_SESSION_OPEN || 2979 s->s_state == CEPH_MDS_SESSION_HUNG) 2980 ceph_send_cap_releases(mdsc, s); 2981 mutex_unlock(&s->s_mutex); 2982 ceph_put_mds_session(s); 2983 2984 mutex_lock(&mdsc->mutex); 2985 } 2986 mutex_unlock(&mdsc->mutex); 2987 2988 schedule_delayed(mdsc); 2989 } 2990 2991 int 
ceph_mdsc_init(struct ceph_fs_client *fsc) 2992 2993 { 2994 struct ceph_mds_client *mdsc; 2995 2996 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 2997 if (!mdsc) 2998 return -ENOMEM; 2999 mdsc->fsc = fsc; 3000 fsc->mdsc = mdsc; 3001 mutex_init(&mdsc->mutex); 3002 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 3003 if (mdsc->mdsmap == NULL) 3004 return -ENOMEM; 3005 3006 init_completion(&mdsc->safe_umount_waiters); 3007 init_waitqueue_head(&mdsc->session_close_wq); 3008 INIT_LIST_HEAD(&mdsc->waiting_for_map); 3009 mdsc->sessions = NULL; 3010 mdsc->max_sessions = 0; 3011 mdsc->stopping = 0; 3012 init_rwsem(&mdsc->snap_rwsem); 3013 mdsc->snap_realms = RB_ROOT; 3014 INIT_LIST_HEAD(&mdsc->snap_empty); 3015 spin_lock_init(&mdsc->snap_empty_lock); 3016 mdsc->last_tid = 0; 3017 mdsc->request_tree = RB_ROOT; 3018 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 3019 mdsc->last_renew_caps = jiffies; 3020 INIT_LIST_HEAD(&mdsc->cap_delay_list); 3021 spin_lock_init(&mdsc->cap_delay_lock); 3022 INIT_LIST_HEAD(&mdsc->snap_flush_list); 3023 spin_lock_init(&mdsc->snap_flush_lock); 3024 mdsc->cap_flush_seq = 0; 3025 INIT_LIST_HEAD(&mdsc->cap_dirty); 3026 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 3027 mdsc->num_cap_flushing = 0; 3028 spin_lock_init(&mdsc->cap_dirty_lock); 3029 init_waitqueue_head(&mdsc->cap_flushing_wq); 3030 spin_lock_init(&mdsc->dentry_lru_lock); 3031 INIT_LIST_HEAD(&mdsc->dentry_lru); 3032 3033 ceph_caps_init(mdsc); 3034 ceph_adjust_min_caps(mdsc, fsc->min_caps); 3035 3036 return 0; 3037 } 3038 3039 /* 3040 * Wait for safe replies on open mds requests. If we time out, drop 3041 * all requests from the tree to avoid dangling dentry refs. 3042 */ 3043 static void wait_requests(struct ceph_mds_client *mdsc) 3044 { 3045 struct ceph_mds_request *req; 3046 struct ceph_fs_client *fsc = mdsc->fsc; 3047 3048 mutex_lock(&mdsc->mutex); 3049 if (__get_oldest_req(mdsc)) { 3050 mutex_unlock(&mdsc->mutex); 3051 3052 dout("wait_requests waiting for requests\n"); 3053 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 3054 fsc->client->options->mount_timeout * HZ); 3055 3056 /* tear down remaining requests */ 3057 mutex_lock(&mdsc->mutex); 3058 while ((req = __get_oldest_req(mdsc))) { 3059 dout("wait_requests timed out on tid %llu\n", 3060 req->r_tid); 3061 __unregister_request(mdsc, req); 3062 } 3063 } 3064 mutex_unlock(&mdsc->mutex); 3065 dout("wait_requests done\n"); 3066 } 3067 3068 /* 3069 * called before mount is ro, and before dentries are torn down. 3070 * (hmm, does this still race with new lookups?) 3071 */ 3072 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 3073 { 3074 dout("pre_umount\n"); 3075 mdsc->stopping = 1; 3076 3077 drop_leases(mdsc); 3078 ceph_flush_dirty_caps(mdsc); 3079 wait_requests(mdsc); 3080 3081 /* 3082 * wait for reply handlers to drop their request refs and 3083 * their inode/dcache refs 3084 */ 3085 ceph_msgr_flush(); 3086 } 3087 3088 /* 3089 * wait for all write mds requests to flush. 
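 * That is, walk the request tree in tid order and wait on
 * r_safe_completion for each CEPH_MDS_OP_WRITE request with a
 * tid at or below want_tid.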
3090 */ 3091 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) 3092 { 3093 struct ceph_mds_request *req = NULL, *nextreq; 3094 struct rb_node *n; 3095 3096 mutex_lock(&mdsc->mutex); 3097 dout("wait_unsafe_requests want %lld\n", want_tid); 3098 restart: 3099 req = __get_oldest_req(mdsc); 3100 while (req && req->r_tid <= want_tid) { 3101 /* find next request */ 3102 n = rb_next(&req->r_node); 3103 if (n) 3104 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 3105 else 3106 nextreq = NULL; 3107 if ((req->r_op & CEPH_MDS_OP_WRITE)) { 3108 /* write op */ 3109 ceph_mdsc_get_request(req); 3110 if (nextreq) 3111 ceph_mdsc_get_request(nextreq); 3112 mutex_unlock(&mdsc->mutex); 3113 dout("wait_unsafe_requests wait on %llu (want %llu)\n", 3114 req->r_tid, want_tid); 3115 wait_for_completion(&req->r_safe_completion); 3116 mutex_lock(&mdsc->mutex); 3117 ceph_mdsc_put_request(req); 3118 if (!nextreq) 3119 break; /* next dne before, so we're done! */ 3120 if (RB_EMPTY_NODE(&nextreq->r_node)) { 3121 /* next request was removed from tree */ 3122 ceph_mdsc_put_request(nextreq); 3123 goto restart; 3124 } 3125 ceph_mdsc_put_request(nextreq); /* won't go away */ 3126 } 3127 req = nextreq; 3128 } 3129 mutex_unlock(&mdsc->mutex); 3130 dout("wait_unsafe_requests done\n"); 3131 } 3132 3133 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 3134 { 3135 u64 want_tid, want_flush; 3136 3137 if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN) 3138 return; 3139 3140 dout("sync\n"); 3141 mutex_lock(&mdsc->mutex); 3142 want_tid = mdsc->last_tid; 3143 want_flush = mdsc->cap_flush_seq; 3144 mutex_unlock(&mdsc->mutex); 3145 dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush); 3146 3147 ceph_flush_dirty_caps(mdsc); 3148 3149 wait_unsafe_requests(mdsc, want_tid); 3150 wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush)); 3151 } 3152 3153 /* 3154 * true if all sessions are closed, or we force unmount 3155 */ 3156 bool done_closing_sessions(struct ceph_mds_client *mdsc) 3157 { 3158 int i, n = 0; 3159 3160 if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN) 3161 return true; 3162 3163 mutex_lock(&mdsc->mutex); 3164 for (i = 0; i < mdsc->max_sessions; i++) 3165 if (mdsc->sessions[i]) 3166 n++; 3167 mutex_unlock(&mdsc->mutex); 3168 return n == 0; 3169 } 3170 3171 /* 3172 * called after sb is ro. 
3173 */ 3174 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 3175 { 3176 struct ceph_mds_session *session; 3177 int i; 3178 struct ceph_fs_client *fsc = mdsc->fsc; 3179 unsigned long timeout = fsc->client->options->mount_timeout * HZ; 3180 3181 dout("close_sessions\n"); 3182 3183 /* close sessions */ 3184 mutex_lock(&mdsc->mutex); 3185 for (i = 0; i < mdsc->max_sessions; i++) { 3186 session = __ceph_lookup_mds_session(mdsc, i); 3187 if (!session) 3188 continue; 3189 mutex_unlock(&mdsc->mutex); 3190 mutex_lock(&session->s_mutex); 3191 __close_session(mdsc, session); 3192 mutex_unlock(&session->s_mutex); 3193 ceph_put_mds_session(session); 3194 mutex_lock(&mdsc->mutex); 3195 } 3196 mutex_unlock(&mdsc->mutex); 3197 3198 dout("waiting for sessions to close\n"); 3199 wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc), 3200 timeout); 3201 3202 /* tear down remaining sessions */ 3203 mutex_lock(&mdsc->mutex); 3204 for (i = 0; i < mdsc->max_sessions; i++) { 3205 if (mdsc->sessions[i]) { 3206 session = get_session(mdsc->sessions[i]); 3207 __unregister_session(mdsc, session); 3208 mutex_unlock(&mdsc->mutex); 3209 mutex_lock(&session->s_mutex); 3210 remove_session_caps(session); 3211 mutex_unlock(&session->s_mutex); 3212 ceph_put_mds_session(session); 3213 mutex_lock(&mdsc->mutex); 3214 } 3215 } 3216 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 3217 mutex_unlock(&mdsc->mutex); 3218 3219 ceph_cleanup_empty_realms(mdsc); 3220 3221 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 3222 3223 dout("stopped\n"); 3224 } 3225 3226 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 3227 { 3228 dout("stop\n"); 3229 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 3230 if (mdsc->mdsmap) 3231 ceph_mdsmap_destroy(mdsc->mdsmap); 3232 kfree(mdsc->sessions); 3233 ceph_caps_finalize(mdsc); 3234 } 3235 3236 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 3237 { 3238 struct ceph_mds_client *mdsc = fsc->mdsc; 3239 3240 dout("mdsc_destroy %p\n", mdsc); 3241 ceph_mdsc_stop(mdsc); 3242 3243 /* flush out any connection work with references to us */ 3244 ceph_msgr_flush(); 3245 3246 fsc->mdsc = NULL; 3247 kfree(mdsc); 3248 dout("mdsc_destroy %p done\n", mdsc); 3249 } 3250 3251 3252 /* 3253 * handle mds map update. 3254 */ 3255 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 3256 { 3257 u32 epoch; 3258 u32 maplen; 3259 void *p = msg->front.iov_base; 3260 void *end = p + msg->front.iov_len; 3261 struct ceph_mdsmap *newmap, *oldmap; 3262 struct ceph_fsid fsid; 3263 int err = -EINVAL; 3264 3265 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 3266 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 3267 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 3268 return; 3269 epoch = ceph_decode_32(&p); 3270 maplen = ceph_decode_32(&p); 3271 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 3272 3273 /* do we need it? 
*/ 3274 ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch); 3275 mutex_lock(&mdsc->mutex); 3276 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 3277 dout("handle_map epoch %u <= our %u\n", 3278 epoch, mdsc->mdsmap->m_epoch); 3279 mutex_unlock(&mdsc->mutex); 3280 return; 3281 } 3282 3283 newmap = ceph_mdsmap_decode(&p, end); 3284 if (IS_ERR(newmap)) { 3285 err = PTR_ERR(newmap); 3286 goto bad_unlock; 3287 } 3288 3289 /* swap into place */ 3290 if (mdsc->mdsmap) { 3291 oldmap = mdsc->mdsmap; 3292 mdsc->mdsmap = newmap; 3293 check_new_map(mdsc, newmap, oldmap); 3294 ceph_mdsmap_destroy(oldmap); 3295 } else { 3296 mdsc->mdsmap = newmap; /* first mds map */ 3297 } 3298 mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size; 3299 3300 __wake_requests(mdsc, &mdsc->waiting_for_map); 3301 3302 mutex_unlock(&mdsc->mutex); 3303 schedule_delayed(mdsc); 3304 return; 3305 3306 bad_unlock: 3307 mutex_unlock(&mdsc->mutex); 3308 bad: 3309 pr_err("error decoding mdsmap %d\n", err); 3310 return; 3311 } 3312 3313 static struct ceph_connection *con_get(struct ceph_connection *con) 3314 { 3315 struct ceph_mds_session *s = con->private; 3316 3317 if (get_session(s)) { 3318 dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref)); 3319 return con; 3320 } 3321 dout("mdsc con_get %p FAIL\n", s); 3322 return NULL; 3323 } 3324 3325 static void con_put(struct ceph_connection *con) 3326 { 3327 struct ceph_mds_session *s = con->private; 3328 3329 dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1); 3330 ceph_put_mds_session(s); 3331 } 3332 3333 /* 3334 * if the client is unresponsive for long enough, the mds will kill 3335 * the session entirely. 3336 */ 3337 static void peer_reset(struct ceph_connection *con) 3338 { 3339 struct ceph_mds_session *s = con->private; 3340 struct ceph_mds_client *mdsc = s->s_mdsc; 3341 3342 pr_warning("mds%d closed our session\n", s->s_mds); 3343 send_mds_reconnect(mdsc, s); 3344 } 3345 3346 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 3347 { 3348 struct ceph_mds_session *s = con->private; 3349 struct ceph_mds_client *mdsc = s->s_mdsc; 3350 int type = le16_to_cpu(msg->hdr.type); 3351 3352 mutex_lock(&mdsc->mutex); 3353 if (__verify_registered_session(mdsc, s) < 0) { 3354 mutex_unlock(&mdsc->mutex); 3355 goto out; 3356 } 3357 mutex_unlock(&mdsc->mutex); 3358 3359 switch (type) { 3360 case CEPH_MSG_MDS_MAP: 3361 ceph_mdsc_handle_map(mdsc, msg); 3362 break; 3363 case CEPH_MSG_CLIENT_SESSION: 3364 handle_session(s, msg); 3365 break; 3366 case CEPH_MSG_CLIENT_REPLY: 3367 handle_reply(s, msg); 3368 break; 3369 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 3370 handle_forward(mdsc, s, msg); 3371 break; 3372 case CEPH_MSG_CLIENT_CAPS: 3373 ceph_handle_caps(s, msg); 3374 break; 3375 case CEPH_MSG_CLIENT_SNAP: 3376 ceph_handle_snap(mdsc, s, msg); 3377 break; 3378 case CEPH_MSG_CLIENT_LEASE: 3379 handle_lease(mdsc, s, msg); 3380 break; 3381 3382 default: 3383 pr_err("received unknown message type %d %s\n", type, 3384 ceph_msg_type_name(type)); 3385 } 3386 out: 3387 ceph_msg_put(msg); 3388 } 3389 3390 /* 3391 * authentication 3392 */ 3393 static int get_authorizer(struct ceph_connection *con, 3394 void **buf, int *len, int *proto, 3395 void **reply_buf, int *reply_len, int force_new) 3396 { 3397 struct ceph_mds_session *s = con->private; 3398 struct ceph_mds_client *mdsc = s->s_mdsc; 3399 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3400 int ret = 0; 3401 3402 if (force_new && s->s_authorizer) { 3403 ac->ops->destroy_authorizer(ac, 
s->s_authorizer); 3404 s->s_authorizer = NULL; 3405 } 3406 if (s->s_authorizer == NULL) { 3407 if (ac->ops->create_authorizer) { 3408 ret = ac->ops->create_authorizer( 3409 ac, CEPH_ENTITY_TYPE_MDS, 3410 &s->s_authorizer, 3411 &s->s_authorizer_buf, 3412 &s->s_authorizer_buf_len, 3413 &s->s_authorizer_reply_buf, 3414 &s->s_authorizer_reply_buf_len); 3415 if (ret) 3416 return ret; 3417 } 3418 } 3419 3420 *proto = ac->protocol; 3421 *buf = s->s_authorizer_buf; 3422 *len = s->s_authorizer_buf_len; 3423 *reply_buf = s->s_authorizer_reply_buf; 3424 *reply_len = s->s_authorizer_reply_buf_len; 3425 return 0; 3426 } 3427 3428 3429 static int verify_authorizer_reply(struct ceph_connection *con, int len) 3430 { 3431 struct ceph_mds_session *s = con->private; 3432 struct ceph_mds_client *mdsc = s->s_mdsc; 3433 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3434 3435 return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len); 3436 } 3437 3438 static int invalidate_authorizer(struct ceph_connection *con) 3439 { 3440 struct ceph_mds_session *s = con->private; 3441 struct ceph_mds_client *mdsc = s->s_mdsc; 3442 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3443 3444 if (ac->ops->invalidate_authorizer) 3445 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 3446 3447 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 3448 } 3449 3450 static const struct ceph_connection_operations mds_con_ops = { 3451 .get = con_get, 3452 .put = con_put, 3453 .dispatch = dispatch, 3454 .get_authorizer = get_authorizer, 3455 .verify_authorizer_reply = verify_authorizer_reply, 3456 .invalidate_authorizer = invalidate_authorizer, 3457 .peer_reset = peer_reset, 3458 }; 3459 3460 /* eof */ 3461