1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/fs.h> 4 #include <linux/wait.h> 5 #include <linux/slab.h> 6 #include <linux/sched.h> 7 #include <linux/debugfs.h> 8 #include <linux/seq_file.h> 9 10 #include "super.h" 11 #include "mds_client.h" 12 13 #include <linux/ceph/messenger.h> 14 #include <linux/ceph/decode.h> 15 #include <linux/ceph/pagelist.h> 16 #include <linux/ceph/auth.h> 17 #include <linux/ceph/debugfs.h> 18 19 /* 20 * A cluster of MDS (metadata server) daemons is responsible for 21 * managing the file system namespace (the directory hierarchy and 22 * inodes) and for coordinating shared access to storage. Metadata is 23 * partitioning hierarchically across a number of servers, and that 24 * partition varies over time as the cluster adjusts the distribution 25 * in order to balance load. 26 * 27 * The MDS client is primarily responsible to managing synchronous 28 * metadata requests for operations like open, unlink, and so forth. 29 * If there is a MDS failure, we find out about it when we (possibly 30 * request and) receive a new MDS map, and can resubmit affected 31 * requests. 32 * 33 * For the most part, though, we take advantage of a lossless 34 * communications channel to the MDS, and do not need to worry about 35 * timing out or resubmitting requests. 36 * 37 * We maintain a stateful "session" with each MDS we interact with. 38 * Within each session, we sent periodic heartbeat messages to ensure 39 * any capabilities or leases we have been issues remain valid. If 40 * the session times out and goes stale, our leases and capabilities 41 * are no longer valid. 42 */ 43 44 struct ceph_reconnect_state { 45 struct ceph_pagelist *pagelist; 46 bool flock; 47 }; 48 49 static void __wake_requests(struct ceph_mds_client *mdsc, 50 struct list_head *head); 51 52 static const struct ceph_connection_operations mds_con_ops; 53 54 55 /* 56 * mds reply parsing 57 */ 58 59 /* 60 * parse individual inode info 61 */ 62 static int parse_reply_info_in(void **p, void *end, 63 struct ceph_mds_reply_info_in *info, 64 int features) 65 { 66 int err = -EIO; 67 68 info->in = *p; 69 *p += sizeof(struct ceph_mds_reply_inode) + 70 sizeof(*info->in->fragtree.splits) * 71 le32_to_cpu(info->in->fragtree.nsplits); 72 73 ceph_decode_32_safe(p, end, info->symlink_len, bad); 74 ceph_decode_need(p, end, info->symlink_len, bad); 75 info->symlink = *p; 76 *p += info->symlink_len; 77 78 if (features & CEPH_FEATURE_DIRLAYOUTHASH) 79 ceph_decode_copy_safe(p, end, &info->dir_layout, 80 sizeof(info->dir_layout), bad); 81 else 82 memset(&info->dir_layout, 0, sizeof(info->dir_layout)); 83 84 ceph_decode_32_safe(p, end, info->xattr_len, bad); 85 ceph_decode_need(p, end, info->xattr_len, bad); 86 info->xattr_data = *p; 87 *p += info->xattr_len; 88 return 0; 89 bad: 90 return err; 91 } 92 93 /* 94 * parse a normal reply, which may contain a (dir+)dentry and/or a 95 * target inode. 
96 */ 97 static int parse_reply_info_trace(void **p, void *end, 98 struct ceph_mds_reply_info_parsed *info, 99 int features) 100 { 101 int err; 102 103 if (info->head->is_dentry) { 104 err = parse_reply_info_in(p, end, &info->diri, features); 105 if (err < 0) 106 goto out_bad; 107 108 if (unlikely(*p + sizeof(*info->dirfrag) > end)) 109 goto bad; 110 info->dirfrag = *p; 111 *p += sizeof(*info->dirfrag) + 112 sizeof(u32)*le32_to_cpu(info->dirfrag->ndist); 113 if (unlikely(*p > end)) 114 goto bad; 115 116 ceph_decode_32_safe(p, end, info->dname_len, bad); 117 ceph_decode_need(p, end, info->dname_len, bad); 118 info->dname = *p; 119 *p += info->dname_len; 120 info->dlease = *p; 121 *p += sizeof(*info->dlease); 122 } 123 124 if (info->head->is_target) { 125 err = parse_reply_info_in(p, end, &info->targeti, features); 126 if (err < 0) 127 goto out_bad; 128 } 129 130 if (unlikely(*p != end)) 131 goto bad; 132 return 0; 133 134 bad: 135 err = -EIO; 136 out_bad: 137 pr_err("problem parsing mds trace %d\n", err); 138 return err; 139 } 140 141 /* 142 * parse readdir results 143 */ 144 static int parse_reply_info_dir(void **p, void *end, 145 struct ceph_mds_reply_info_parsed *info, 146 int features) 147 { 148 u32 num, i = 0; 149 int err; 150 151 info->dir_dir = *p; 152 if (*p + sizeof(*info->dir_dir) > end) 153 goto bad; 154 *p += sizeof(*info->dir_dir) + 155 sizeof(u32)*le32_to_cpu(info->dir_dir->ndist); 156 if (*p > end) 157 goto bad; 158 159 ceph_decode_need(p, end, sizeof(num) + 2, bad); 160 num = ceph_decode_32(p); 161 info->dir_end = ceph_decode_8(p); 162 info->dir_complete = ceph_decode_8(p); 163 if (num == 0) 164 goto done; 165 166 /* alloc large array */ 167 info->dir_nr = num; 168 info->dir_in = kcalloc(num, sizeof(*info->dir_in) + 169 sizeof(*info->dir_dname) + 170 sizeof(*info->dir_dname_len) + 171 sizeof(*info->dir_dlease), 172 GFP_NOFS); 173 if (info->dir_in == NULL) { 174 err = -ENOMEM; 175 goto out_bad; 176 } 177 info->dir_dname = (void *)(info->dir_in + num); 178 info->dir_dname_len = (void *)(info->dir_dname + num); 179 info->dir_dlease = (void *)(info->dir_dname_len + num); 180 181 while (num) { 182 /* dentry */ 183 ceph_decode_need(p, end, sizeof(u32)*2, bad); 184 info->dir_dname_len[i] = ceph_decode_32(p); 185 ceph_decode_need(p, end, info->dir_dname_len[i], bad); 186 info->dir_dname[i] = *p; 187 *p += info->dir_dname_len[i]; 188 dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i], 189 info->dir_dname[i]); 190 info->dir_dlease[i] = *p; 191 *p += sizeof(struct ceph_mds_reply_lease); 192 193 /* inode */ 194 err = parse_reply_info_in(p, end, &info->dir_in[i], features); 195 if (err < 0) 196 goto out_bad; 197 i++; 198 num--; 199 } 200 201 done: 202 if (*p != end) 203 goto bad; 204 return 0; 205 206 bad: 207 err = -EIO; 208 out_bad: 209 pr_err("problem parsing dir contents %d\n", err); 210 return err; 211 } 212 213 /* 214 * parse fcntl F_GETLK results 215 */ 216 static int parse_reply_info_filelock(void **p, void *end, 217 struct ceph_mds_reply_info_parsed *info, 218 int features) 219 { 220 if (*p + sizeof(*info->filelock_reply) > end) 221 goto bad; 222 223 info->filelock_reply = *p; 224 *p += sizeof(*info->filelock_reply); 225 226 if (unlikely(*p != end)) 227 goto bad; 228 return 0; 229 230 bad: 231 return -EIO; 232 } 233 234 /* 235 * parse extra results 236 */ 237 static int parse_reply_info_extra(void **p, void *end, 238 struct ceph_mds_reply_info_parsed *info, 239 int features) 240 { 241 if (info->head->op == CEPH_MDS_OP_GETFILELOCK) 242 return parse_reply_info_filelock(p, 
end, info, features); 243 else 244 return parse_reply_info_dir(p, end, info, features); 245 } 246 247 /* 248 * parse entire mds reply 249 */ 250 static int parse_reply_info(struct ceph_msg *msg, 251 struct ceph_mds_reply_info_parsed *info, 252 int features) 253 { 254 void *p, *end; 255 u32 len; 256 int err; 257 258 info->head = msg->front.iov_base; 259 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 260 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 261 262 /* trace */ 263 ceph_decode_32_safe(&p, end, len, bad); 264 if (len > 0) { 265 ceph_decode_need(&p, end, len, bad); 266 err = parse_reply_info_trace(&p, p+len, info, features); 267 if (err < 0) 268 goto out_bad; 269 } 270 271 /* extra */ 272 ceph_decode_32_safe(&p, end, len, bad); 273 if (len > 0) { 274 ceph_decode_need(&p, end, len, bad); 275 err = parse_reply_info_extra(&p, p+len, info, features); 276 if (err < 0) 277 goto out_bad; 278 } 279 280 /* snap blob */ 281 ceph_decode_32_safe(&p, end, len, bad); 282 info->snapblob_len = len; 283 info->snapblob = p; 284 p += len; 285 286 if (p != end) 287 goto bad; 288 return 0; 289 290 bad: 291 err = -EIO; 292 out_bad: 293 pr_err("mds parse_reply err %d\n", err); 294 return err; 295 } 296 297 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 298 { 299 kfree(info->dir_in); 300 } 301 302 303 /* 304 * sessions 305 */ 306 static const char *session_state_name(int s) 307 { 308 switch (s) { 309 case CEPH_MDS_SESSION_NEW: return "new"; 310 case CEPH_MDS_SESSION_OPENING: return "opening"; 311 case CEPH_MDS_SESSION_OPEN: return "open"; 312 case CEPH_MDS_SESSION_HUNG: return "hung"; 313 case CEPH_MDS_SESSION_CLOSING: return "closing"; 314 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 315 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 316 default: return "???"; 317 } 318 } 319 320 static struct ceph_mds_session *get_session(struct ceph_mds_session *s) 321 { 322 if (atomic_inc_not_zero(&s->s_ref)) { 323 dout("mdsc get_session %p %d -> %d\n", s, 324 atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref)); 325 return s; 326 } else { 327 dout("mdsc get_session %p 0 -- FAIL", s); 328 return NULL; 329 } 330 } 331 332 void ceph_put_mds_session(struct ceph_mds_session *s) 333 { 334 dout("mdsc put_session %p %d -> %d\n", s, 335 atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1); 336 if (atomic_dec_and_test(&s->s_ref)) { 337 if (s->s_authorizer) 338 s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer( 339 s->s_mdsc->fsc->client->monc.auth, 340 s->s_authorizer); 341 kfree(s); 342 } 343 } 344 345 /* 346 * called under mdsc->mutex 347 */ 348 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 349 int mds) 350 { 351 struct ceph_mds_session *session; 352 353 if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL) 354 return NULL; 355 session = mdsc->sessions[mds]; 356 dout("lookup_mds_session %p %d\n", session, 357 atomic_read(&session->s_ref)); 358 get_session(session); 359 return session; 360 } 361 362 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 363 { 364 if (mds >= mdsc->max_sessions) 365 return false; 366 return mdsc->sessions[mds]; 367 } 368 369 static int __verify_registered_session(struct ceph_mds_client *mdsc, 370 struct ceph_mds_session *s) 371 { 372 if (s->s_mds >= mdsc->max_sessions || 373 mdsc->sessions[s->s_mds] != s) 374 return -ENOENT; 375 return 0; 376 } 377 378 /* 379 * create+register a new session for given mds. 380 * called under mdsc->mutex. 
381 */ 382 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 383 int mds) 384 { 385 struct ceph_mds_session *s; 386 387 s = kzalloc(sizeof(*s), GFP_NOFS); 388 if (!s) 389 return ERR_PTR(-ENOMEM); 390 s->s_mdsc = mdsc; 391 s->s_mds = mds; 392 s->s_state = CEPH_MDS_SESSION_NEW; 393 s->s_ttl = 0; 394 s->s_seq = 0; 395 mutex_init(&s->s_mutex); 396 397 ceph_con_init(mdsc->fsc->client->msgr, &s->s_con); 398 s->s_con.private = s; 399 s->s_con.ops = &mds_con_ops; 400 s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS; 401 s->s_con.peer_name.num = cpu_to_le64(mds); 402 403 spin_lock_init(&s->s_gen_ttl_lock); 404 s->s_cap_gen = 0; 405 s->s_cap_ttl = jiffies - 1; 406 407 spin_lock_init(&s->s_cap_lock); 408 s->s_renew_requested = 0; 409 s->s_renew_seq = 0; 410 INIT_LIST_HEAD(&s->s_caps); 411 s->s_nr_caps = 0; 412 s->s_trim_caps = 0; 413 atomic_set(&s->s_ref, 1); 414 INIT_LIST_HEAD(&s->s_waiting); 415 INIT_LIST_HEAD(&s->s_unsafe); 416 s->s_num_cap_releases = 0; 417 s->s_cap_iterator = NULL; 418 INIT_LIST_HEAD(&s->s_cap_releases); 419 INIT_LIST_HEAD(&s->s_cap_releases_done); 420 INIT_LIST_HEAD(&s->s_cap_flushing); 421 INIT_LIST_HEAD(&s->s_cap_snaps_flushing); 422 423 dout("register_session mds%d\n", mds); 424 if (mds >= mdsc->max_sessions) { 425 int newmax = 1 << get_count_order(mds+1); 426 struct ceph_mds_session **sa; 427 428 dout("register_session realloc to %d\n", newmax); 429 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); 430 if (sa == NULL) 431 goto fail_realloc; 432 if (mdsc->sessions) { 433 memcpy(sa, mdsc->sessions, 434 mdsc->max_sessions * sizeof(void *)); 435 kfree(mdsc->sessions); 436 } 437 mdsc->sessions = sa; 438 mdsc->max_sessions = newmax; 439 } 440 mdsc->sessions[mds] = s; 441 atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 442 443 ceph_con_open(&s->s_con, ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 444 445 return s; 446 447 fail_realloc: 448 kfree(s); 449 return ERR_PTR(-ENOMEM); 450 } 451 452 /* 453 * called under mdsc->mutex 454 */ 455 static void __unregister_session(struct ceph_mds_client *mdsc, 456 struct ceph_mds_session *s) 457 { 458 dout("__unregister_session mds%d %p\n", s->s_mds, s); 459 BUG_ON(mdsc->sessions[s->s_mds] != s); 460 mdsc->sessions[s->s_mds] = NULL; 461 ceph_con_close(&s->s_con); 462 ceph_put_mds_session(s); 463 } 464 465 /* 466 * drop session refs in request. 467 * 468 * should be last request ref, or hold mdsc->mutex 469 */ 470 static void put_request_session(struct ceph_mds_request *req) 471 { 472 if (req->r_session) { 473 ceph_put_mds_session(req->r_session); 474 req->r_session = NULL; 475 } 476 } 477 478 void ceph_mdsc_release_request(struct kref *kref) 479 { 480 struct ceph_mds_request *req = container_of(kref, 481 struct ceph_mds_request, 482 r_kref); 483 if (req->r_request) 484 ceph_msg_put(req->r_request); 485 if (req->r_reply) { 486 ceph_msg_put(req->r_reply); 487 destroy_reply_info(&req->r_reply_info); 488 } 489 if (req->r_inode) { 490 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 491 iput(req->r_inode); 492 } 493 if (req->r_locked_dir) 494 ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); 495 if (req->r_target_inode) 496 iput(req->r_target_inode); 497 if (req->r_dentry) 498 dput(req->r_dentry); 499 if (req->r_old_dentry) { 500 /* 501 * track (and drop pins for) r_old_dentry_dir 502 * separately, since r_old_dentry's d_parent may have 503 * changed between the dir mutex being dropped and 504 * this request being freed. 
505 */ 506 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), 507 CEPH_CAP_PIN); 508 dput(req->r_old_dentry); 509 iput(req->r_old_dentry_dir); 510 } 511 kfree(req->r_path1); 512 kfree(req->r_path2); 513 put_request_session(req); 514 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); 515 kfree(req); 516 } 517 518 /* 519 * lookup session, bump ref if found. 520 * 521 * called under mdsc->mutex. 522 */ 523 static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc, 524 u64 tid) 525 { 526 struct ceph_mds_request *req; 527 struct rb_node *n = mdsc->request_tree.rb_node; 528 529 while (n) { 530 req = rb_entry(n, struct ceph_mds_request, r_node); 531 if (tid < req->r_tid) 532 n = n->rb_left; 533 else if (tid > req->r_tid) 534 n = n->rb_right; 535 else { 536 ceph_mdsc_get_request(req); 537 return req; 538 } 539 } 540 return NULL; 541 } 542 543 static void __insert_request(struct ceph_mds_client *mdsc, 544 struct ceph_mds_request *new) 545 { 546 struct rb_node **p = &mdsc->request_tree.rb_node; 547 struct rb_node *parent = NULL; 548 struct ceph_mds_request *req = NULL; 549 550 while (*p) { 551 parent = *p; 552 req = rb_entry(parent, struct ceph_mds_request, r_node); 553 if (new->r_tid < req->r_tid) 554 p = &(*p)->rb_left; 555 else if (new->r_tid > req->r_tid) 556 p = &(*p)->rb_right; 557 else 558 BUG(); 559 } 560 561 rb_link_node(&new->r_node, parent, p); 562 rb_insert_color(&new->r_node, &mdsc->request_tree); 563 } 564 565 /* 566 * Register an in-flight request, and assign a tid. Link to directory 567 * are modifying (if any). 568 * 569 * Called under mdsc->mutex. 570 */ 571 static void __register_request(struct ceph_mds_client *mdsc, 572 struct ceph_mds_request *req, 573 struct inode *dir) 574 { 575 req->r_tid = ++mdsc->last_tid; 576 if (req->r_num_caps) 577 ceph_reserve_caps(mdsc, &req->r_caps_reservation, 578 req->r_num_caps); 579 dout("__register_request %p tid %lld\n", req, req->r_tid); 580 ceph_mdsc_get_request(req); 581 __insert_request(mdsc, req); 582 583 req->r_uid = current_fsuid(); 584 req->r_gid = current_fsgid(); 585 586 if (dir) { 587 struct ceph_inode_info *ci = ceph_inode(dir); 588 589 ihold(dir); 590 spin_lock(&ci->i_unsafe_lock); 591 req->r_unsafe_dir = dir; 592 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 593 spin_unlock(&ci->i_unsafe_lock); 594 } 595 } 596 597 static void __unregister_request(struct ceph_mds_client *mdsc, 598 struct ceph_mds_request *req) 599 { 600 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 601 rb_erase(&req->r_node, &mdsc->request_tree); 602 RB_CLEAR_NODE(&req->r_node); 603 604 if (req->r_unsafe_dir) { 605 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 606 607 spin_lock(&ci->i_unsafe_lock); 608 list_del_init(&req->r_unsafe_dir_item); 609 spin_unlock(&ci->i_unsafe_lock); 610 611 iput(req->r_unsafe_dir); 612 req->r_unsafe_dir = NULL; 613 } 614 615 ceph_mdsc_put_request(req); 616 } 617 618 /* 619 * Choose mds to send request to next. If there is a hint set in the 620 * request (e.g., due to a prior forward hint from the mds), use that. 621 * Otherwise, consult frag tree and/or caps to identify the 622 * appropriate mds. If all else fails, choose randomly. 623 * 624 * Called under mdsc->mutex. 
625 */ 626 static struct dentry *get_nonsnap_parent(struct dentry *dentry) 627 { 628 /* 629 * we don't need to worry about protecting the d_parent access 630 * here because we never renaming inside the snapped namespace 631 * except to resplice to another snapdir, and either the old or new 632 * result is a valid result. 633 */ 634 while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP) 635 dentry = dentry->d_parent; 636 return dentry; 637 } 638 639 static int __choose_mds(struct ceph_mds_client *mdsc, 640 struct ceph_mds_request *req) 641 { 642 struct inode *inode; 643 struct ceph_inode_info *ci; 644 struct ceph_cap *cap; 645 int mode = req->r_direct_mode; 646 int mds = -1; 647 u32 hash = req->r_direct_hash; 648 bool is_hash = req->r_direct_is_hash; 649 650 /* 651 * is there a specific mds we should try? ignore hint if we have 652 * no session and the mds is not up (active or recovering). 653 */ 654 if (req->r_resend_mds >= 0 && 655 (__have_session(mdsc, req->r_resend_mds) || 656 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 657 dout("choose_mds using resend_mds mds%d\n", 658 req->r_resend_mds); 659 return req->r_resend_mds; 660 } 661 662 if (mode == USE_RANDOM_MDS) 663 goto random; 664 665 inode = NULL; 666 if (req->r_inode) { 667 inode = req->r_inode; 668 } else if (req->r_dentry) { 669 /* ignore race with rename; old or new d_parent is okay */ 670 struct dentry *parent = req->r_dentry->d_parent; 671 struct inode *dir = parent->d_inode; 672 673 if (dir->i_sb != mdsc->fsc->sb) { 674 /* not this fs! */ 675 inode = req->r_dentry->d_inode; 676 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 677 /* direct snapped/virtual snapdir requests 678 * based on parent dir inode */ 679 struct dentry *dn = get_nonsnap_parent(parent); 680 inode = dn->d_inode; 681 dout("__choose_mds using nonsnap parent %p\n", inode); 682 } else if (req->r_dentry->d_inode) { 683 /* dentry target */ 684 inode = req->r_dentry->d_inode; 685 } else { 686 /* dir + name */ 687 inode = dir; 688 hash = ceph_dentry_hash(dir, req->r_dentry); 689 is_hash = true; 690 } 691 } 692 693 dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash, 694 (int)hash, mode); 695 if (!inode) 696 goto random; 697 ci = ceph_inode(inode); 698 699 if (is_hash && S_ISDIR(inode->i_mode)) { 700 struct ceph_inode_frag frag; 701 int found; 702 703 ceph_choose_frag(ci, hash, &frag, &found); 704 if (found) { 705 if (mode == USE_ANY_MDS && frag.ndist > 0) { 706 u8 r; 707 708 /* choose a random replica */ 709 get_random_bytes(&r, 1); 710 r %= frag.ndist; 711 mds = frag.dist[r]; 712 dout("choose_mds %p %llx.%llx " 713 "frag %u mds%d (%d/%d)\n", 714 inode, ceph_vinop(inode), 715 frag.frag, mds, 716 (int)r, frag.ndist); 717 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 718 CEPH_MDS_STATE_ACTIVE) 719 return mds; 720 } 721 722 /* since this file/dir wasn't known to be 723 * replicated, then we want to look for the 724 * authoritative mds. 
*/ 725 mode = USE_AUTH_MDS; 726 if (frag.mds >= 0) { 727 /* choose auth mds */ 728 mds = frag.mds; 729 dout("choose_mds %p %llx.%llx " 730 "frag %u mds%d (auth)\n", 731 inode, ceph_vinop(inode), frag.frag, mds); 732 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 733 CEPH_MDS_STATE_ACTIVE) 734 return mds; 735 } 736 } 737 } 738 739 spin_lock(&ci->i_ceph_lock); 740 cap = NULL; 741 if (mode == USE_AUTH_MDS) 742 cap = ci->i_auth_cap; 743 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 744 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 745 if (!cap) { 746 spin_unlock(&ci->i_ceph_lock); 747 goto random; 748 } 749 mds = cap->session->s_mds; 750 dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n", 751 inode, ceph_vinop(inode), mds, 752 cap == ci->i_auth_cap ? "auth " : "", cap); 753 spin_unlock(&ci->i_ceph_lock); 754 return mds; 755 756 random: 757 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 758 dout("choose_mds chose random mds%d\n", mds); 759 return mds; 760 } 761 762 763 /* 764 * session messages 765 */ 766 static struct ceph_msg *create_session_msg(u32 op, u64 seq) 767 { 768 struct ceph_msg *msg; 769 struct ceph_mds_session_head *h; 770 771 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 772 false); 773 if (!msg) { 774 pr_err("create_session_msg ENOMEM creating msg\n"); 775 return NULL; 776 } 777 h = msg->front.iov_base; 778 h->op = cpu_to_le32(op); 779 h->seq = cpu_to_le64(seq); 780 return msg; 781 } 782 783 /* 784 * send session open request. 785 * 786 * called under mdsc->mutex 787 */ 788 static int __open_session(struct ceph_mds_client *mdsc, 789 struct ceph_mds_session *session) 790 { 791 struct ceph_msg *msg; 792 int mstate; 793 int mds = session->s_mds; 794 795 /* wait for mds to go active? */ 796 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 797 dout("open_session to mds%d (%s)\n", mds, 798 ceph_mds_state_name(mstate)); 799 session->s_state = CEPH_MDS_SESSION_OPENING; 800 session->s_renew_requested = jiffies; 801 802 /* send connect message */ 803 msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq); 804 if (!msg) 805 return -ENOMEM; 806 ceph_con_send(&session->s_con, msg); 807 return 0; 808 } 809 810 /* 811 * open sessions for any export targets for the given mds 812 * 813 * called under mdsc->mutex 814 */ 815 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 816 struct ceph_mds_session *session) 817 { 818 struct ceph_mds_info *mi; 819 struct ceph_mds_session *ts; 820 int i, mds = session->s_mds; 821 int target; 822 823 if (mds >= mdsc->mdsmap->m_max_mds) 824 return; 825 mi = &mdsc->mdsmap->m_info[mds]; 826 dout("open_export_target_sessions for mds%d (%d targets)\n", 827 session->s_mds, mi->num_export_targets); 828 829 for (i = 0; i < mi->num_export_targets; i++) { 830 target = mi->export_targets[i]; 831 ts = __ceph_lookup_mds_session(mdsc, target); 832 if (!ts) { 833 ts = register_session(mdsc, target); 834 if (IS_ERR(ts)) 835 return; 836 } 837 if (session->s_state == CEPH_MDS_SESSION_NEW || 838 session->s_state == CEPH_MDS_SESSION_CLOSING) 839 __open_session(mdsc, session); 840 else 841 dout(" mds%d target mds%d %p is %s\n", session->s_mds, 842 i, ts, session_state_name(ts->s_state)); 843 ceph_put_mds_session(ts); 844 } 845 } 846 847 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 848 struct ceph_mds_session *session) 849 { 850 mutex_lock(&mdsc->mutex); 851 __open_export_target_sessions(mdsc, session); 852 mutex_unlock(&mdsc->mutex); 853 } 854 855 /* 856 * session caps 857 */ 858 859 /* 
860 * Free preallocated cap messages assigned to this session 861 */ 862 static void cleanup_cap_releases(struct ceph_mds_session *session) 863 { 864 struct ceph_msg *msg; 865 866 spin_lock(&session->s_cap_lock); 867 while (!list_empty(&session->s_cap_releases)) { 868 msg = list_first_entry(&session->s_cap_releases, 869 struct ceph_msg, list_head); 870 list_del_init(&msg->list_head); 871 ceph_msg_put(msg); 872 } 873 while (!list_empty(&session->s_cap_releases_done)) { 874 msg = list_first_entry(&session->s_cap_releases_done, 875 struct ceph_msg, list_head); 876 list_del_init(&msg->list_head); 877 ceph_msg_put(msg); 878 } 879 spin_unlock(&session->s_cap_lock); 880 } 881 882 /* 883 * Helper to safely iterate over all caps associated with a session, with 884 * special care taken to handle a racing __ceph_remove_cap(). 885 * 886 * Caller must hold session s_mutex. 887 */ 888 static int iterate_session_caps(struct ceph_mds_session *session, 889 int (*cb)(struct inode *, struct ceph_cap *, 890 void *), void *arg) 891 { 892 struct list_head *p; 893 struct ceph_cap *cap; 894 struct inode *inode, *last_inode = NULL; 895 struct ceph_cap *old_cap = NULL; 896 int ret; 897 898 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 899 spin_lock(&session->s_cap_lock); 900 p = session->s_caps.next; 901 while (p != &session->s_caps) { 902 cap = list_entry(p, struct ceph_cap, session_caps); 903 inode = igrab(&cap->ci->vfs_inode); 904 if (!inode) { 905 p = p->next; 906 continue; 907 } 908 session->s_cap_iterator = cap; 909 spin_unlock(&session->s_cap_lock); 910 911 if (last_inode) { 912 iput(last_inode); 913 last_inode = NULL; 914 } 915 if (old_cap) { 916 ceph_put_cap(session->s_mdsc, old_cap); 917 old_cap = NULL; 918 } 919 920 ret = cb(inode, cap, arg); 921 last_inode = inode; 922 923 spin_lock(&session->s_cap_lock); 924 p = p->next; 925 if (cap->ci == NULL) { 926 dout("iterate_session_caps finishing cap %p removal\n", 927 cap); 928 BUG_ON(cap->session != session); 929 list_del_init(&cap->session_caps); 930 session->s_nr_caps--; 931 cap->session = NULL; 932 old_cap = cap; /* put_cap it w/o locks held */ 933 } 934 if (ret < 0) 935 goto out; 936 } 937 ret = 0; 938 out: 939 session->s_cap_iterator = NULL; 940 spin_unlock(&session->s_cap_lock); 941 942 if (last_inode) 943 iput(last_inode); 944 if (old_cap) 945 ceph_put_cap(session->s_mdsc, old_cap); 946 947 return ret; 948 } 949 950 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 951 void *arg) 952 { 953 struct ceph_inode_info *ci = ceph_inode(inode); 954 int drop = 0; 955 956 dout("removing cap %p, ci is %p, inode is %p\n", 957 cap, ci, &ci->vfs_inode); 958 spin_lock(&ci->i_ceph_lock); 959 __ceph_remove_cap(cap); 960 if (!__ceph_is_any_real_caps(ci)) { 961 struct ceph_mds_client *mdsc = 962 ceph_sb_to_client(inode->i_sb)->mdsc; 963 964 spin_lock(&mdsc->cap_dirty_lock); 965 if (!list_empty(&ci->i_dirty_item)) { 966 pr_info(" dropping dirty %s state for %p %lld\n", 967 ceph_cap_string(ci->i_dirty_caps), 968 inode, ceph_ino(inode)); 969 ci->i_dirty_caps = 0; 970 list_del_init(&ci->i_dirty_item); 971 drop = 1; 972 } 973 if (!list_empty(&ci->i_flushing_item)) { 974 pr_info(" dropping dirty+flushing %s state for %p %lld\n", 975 ceph_cap_string(ci->i_flushing_caps), 976 inode, ceph_ino(inode)); 977 ci->i_flushing_caps = 0; 978 list_del_init(&ci->i_flushing_item); 979 mdsc->num_cap_flushing--; 980 drop = 1; 981 } 982 if (drop && ci->i_wrbuffer_ref) { 983 pr_info(" dropping dirty data for %p %lld\n", 984 inode, ceph_ino(inode)); 
985 ci->i_wrbuffer_ref = 0; 986 ci->i_wrbuffer_ref_head = 0; 987 drop++; 988 } 989 spin_unlock(&mdsc->cap_dirty_lock); 990 } 991 spin_unlock(&ci->i_ceph_lock); 992 while (drop--) 993 iput(inode); 994 return 0; 995 } 996 997 /* 998 * caller must hold session s_mutex 999 */ 1000 static void remove_session_caps(struct ceph_mds_session *session) 1001 { 1002 dout("remove_session_caps on %p\n", session); 1003 iterate_session_caps(session, remove_session_caps_cb, NULL); 1004 BUG_ON(session->s_nr_caps > 0); 1005 BUG_ON(!list_empty(&session->s_cap_flushing)); 1006 cleanup_cap_releases(session); 1007 } 1008 1009 /* 1010 * wake up any threads waiting on this session's caps. if the cap is 1011 * old (didn't get renewed on the client reconnect), remove it now. 1012 * 1013 * caller must hold s_mutex. 1014 */ 1015 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, 1016 void *arg) 1017 { 1018 struct ceph_inode_info *ci = ceph_inode(inode); 1019 1020 wake_up_all(&ci->i_cap_wq); 1021 if (arg) { 1022 spin_lock(&ci->i_ceph_lock); 1023 ci->i_wanted_max_size = 0; 1024 ci->i_requested_max_size = 0; 1025 spin_unlock(&ci->i_ceph_lock); 1026 } 1027 return 0; 1028 } 1029 1030 static void wake_up_session_caps(struct ceph_mds_session *session, 1031 int reconnect) 1032 { 1033 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 1034 iterate_session_caps(session, wake_up_session_cb, 1035 (void *)(unsigned long)reconnect); 1036 } 1037 1038 /* 1039 * Send periodic message to MDS renewing all currently held caps. The 1040 * ack will reset the expiration for all caps from this session. 1041 * 1042 * caller holds s_mutex 1043 */ 1044 static int send_renew_caps(struct ceph_mds_client *mdsc, 1045 struct ceph_mds_session *session) 1046 { 1047 struct ceph_msg *msg; 1048 int state; 1049 1050 if (time_after_eq(jiffies, session->s_cap_ttl) && 1051 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 1052 pr_info("mds%d caps stale\n", session->s_mds); 1053 session->s_renew_requested = jiffies; 1054 1055 /* do not try to renew caps until a recovering mds has reconnected 1056 * with its clients. */ 1057 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 1058 if (state < CEPH_MDS_STATE_RECONNECT) { 1059 dout("send_renew_caps ignoring mds%d (%s)\n", 1060 session->s_mds, ceph_mds_state_name(state)); 1061 return 0; 1062 } 1063 1064 dout("send_renew_caps to mds%d (%s)\n", session->s_mds, 1065 ceph_mds_state_name(state)); 1066 msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 1067 ++session->s_renew_seq); 1068 if (!msg) 1069 return -ENOMEM; 1070 ceph_con_send(&session->s_con, msg); 1071 return 0; 1072 } 1073 1074 /* 1075 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 1076 * 1077 * Called under session->s_mutex 1078 */ 1079 static void renewed_caps(struct ceph_mds_client *mdsc, 1080 struct ceph_mds_session *session, int is_renew) 1081 { 1082 int was_stale; 1083 int wake = 0; 1084 1085 spin_lock(&session->s_cap_lock); 1086 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 1087 1088 session->s_cap_ttl = session->s_renew_requested + 1089 mdsc->mdsmap->m_session_timeout*HZ; 1090 1091 if (was_stale) { 1092 if (time_before(jiffies, session->s_cap_ttl)) { 1093 pr_info("mds%d caps renewed\n", session->s_mds); 1094 wake = 1; 1095 } else { 1096 pr_info("mds%d caps still stale\n", session->s_mds); 1097 } 1098 } 1099 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n", 1100 session->s_mds, session->s_cap_ttl, was_stale ? 
"stale" : "fresh", 1101 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 1102 spin_unlock(&session->s_cap_lock); 1103 1104 if (wake) 1105 wake_up_session_caps(session, 0); 1106 } 1107 1108 /* 1109 * send a session close request 1110 */ 1111 static int request_close_session(struct ceph_mds_client *mdsc, 1112 struct ceph_mds_session *session) 1113 { 1114 struct ceph_msg *msg; 1115 1116 dout("request_close_session mds%d state %s seq %lld\n", 1117 session->s_mds, session_state_name(session->s_state), 1118 session->s_seq); 1119 msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); 1120 if (!msg) 1121 return -ENOMEM; 1122 ceph_con_send(&session->s_con, msg); 1123 return 0; 1124 } 1125 1126 /* 1127 * Called with s_mutex held. 1128 */ 1129 static int __close_session(struct ceph_mds_client *mdsc, 1130 struct ceph_mds_session *session) 1131 { 1132 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 1133 return 0; 1134 session->s_state = CEPH_MDS_SESSION_CLOSING; 1135 return request_close_session(mdsc, session); 1136 } 1137 1138 /* 1139 * Trim old(er) caps. 1140 * 1141 * Because we can't cache an inode without one or more caps, we do 1142 * this indirectly: if a cap is unused, we prune its aliases, at which 1143 * point the inode will hopefully get dropped to. 1144 * 1145 * Yes, this is a bit sloppy. Our only real goal here is to respond to 1146 * memory pressure from the MDS, though, so it needn't be perfect. 1147 */ 1148 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) 1149 { 1150 struct ceph_mds_session *session = arg; 1151 struct ceph_inode_info *ci = ceph_inode(inode); 1152 int used, oissued, mine; 1153 1154 if (session->s_trim_caps <= 0) 1155 return -1; 1156 1157 spin_lock(&ci->i_ceph_lock); 1158 mine = cap->issued | cap->implemented; 1159 used = __ceph_caps_used(ci); 1160 oissued = __ceph_caps_issued_other(ci, cap); 1161 1162 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n", 1163 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), 1164 ceph_cap_string(used)); 1165 if (ci->i_dirty_caps) 1166 goto out; /* dirty caps */ 1167 if ((used & ~oissued) & mine) 1168 goto out; /* we need these caps */ 1169 1170 session->s_trim_caps--; 1171 if (oissued) { 1172 /* we aren't the only cap.. just remove us */ 1173 __ceph_remove_cap(cap); 1174 } else { 1175 /* try to drop referring dentries */ 1176 spin_unlock(&ci->i_ceph_lock); 1177 d_prune_aliases(inode); 1178 dout("trim_caps_cb %p cap %p pruned, count now %d\n", 1179 inode, cap, atomic_read(&inode->i_count)); 1180 return 0; 1181 } 1182 1183 out: 1184 spin_unlock(&ci->i_ceph_lock); 1185 return 0; 1186 } 1187 1188 /* 1189 * Trim session cap count down to some max number. 1190 */ 1191 static int trim_caps(struct ceph_mds_client *mdsc, 1192 struct ceph_mds_session *session, 1193 int max_caps) 1194 { 1195 int trim_caps = session->s_nr_caps - max_caps; 1196 1197 dout("trim_caps mds%d start: %d / %d, trim %d\n", 1198 session->s_mds, session->s_nr_caps, max_caps, trim_caps); 1199 if (trim_caps > 0) { 1200 session->s_trim_caps = trim_caps; 1201 iterate_session_caps(session, trim_caps_cb, session); 1202 dout("trim_caps mds%d done: %d / %d, trimmed %d\n", 1203 session->s_mds, session->s_nr_caps, max_caps, 1204 trim_caps - session->s_trim_caps); 1205 session->s_trim_caps = 0; 1206 } 1207 return 0; 1208 } 1209 1210 /* 1211 * Allocate cap_release messages. 
If there is a partially full message 1212 * in the queue, try to allocate enough to cover it's remainder, so that 1213 * we can send it immediately. 1214 * 1215 * Called under s_mutex. 1216 */ 1217 int ceph_add_cap_releases(struct ceph_mds_client *mdsc, 1218 struct ceph_mds_session *session) 1219 { 1220 struct ceph_msg *msg, *partial = NULL; 1221 struct ceph_mds_cap_release *head; 1222 int err = -ENOMEM; 1223 int extra = mdsc->fsc->mount_options->cap_release_safety; 1224 int num; 1225 1226 dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds, 1227 extra); 1228 1229 spin_lock(&session->s_cap_lock); 1230 1231 if (!list_empty(&session->s_cap_releases)) { 1232 msg = list_first_entry(&session->s_cap_releases, 1233 struct ceph_msg, 1234 list_head); 1235 head = msg->front.iov_base; 1236 num = le32_to_cpu(head->num); 1237 if (num) { 1238 dout(" partial %p with (%d/%d)\n", msg, num, 1239 (int)CEPH_CAPS_PER_RELEASE); 1240 extra += CEPH_CAPS_PER_RELEASE - num; 1241 partial = msg; 1242 } 1243 } 1244 while (session->s_num_cap_releases < session->s_nr_caps + extra) { 1245 spin_unlock(&session->s_cap_lock); 1246 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE, 1247 GFP_NOFS, false); 1248 if (!msg) 1249 goto out_unlocked; 1250 dout("add_cap_releases %p msg %p now %d\n", session, msg, 1251 (int)msg->front.iov_len); 1252 head = msg->front.iov_base; 1253 head->num = cpu_to_le32(0); 1254 msg->front.iov_len = sizeof(*head); 1255 spin_lock(&session->s_cap_lock); 1256 list_add(&msg->list_head, &session->s_cap_releases); 1257 session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE; 1258 } 1259 1260 if (partial) { 1261 head = partial->front.iov_base; 1262 num = le32_to_cpu(head->num); 1263 dout(" queueing partial %p with %d/%d\n", partial, num, 1264 (int)CEPH_CAPS_PER_RELEASE); 1265 list_move_tail(&partial->list_head, 1266 &session->s_cap_releases_done); 1267 session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num; 1268 } 1269 err = 0; 1270 spin_unlock(&session->s_cap_lock); 1271 out_unlocked: 1272 return err; 1273 } 1274 1275 /* 1276 * flush all dirty inode data to disk. 
1277 * 1278 * returns true if we've flushed through want_flush_seq 1279 */ 1280 static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) 1281 { 1282 int mds, ret = 1; 1283 1284 dout("check_cap_flush want %lld\n", want_flush_seq); 1285 mutex_lock(&mdsc->mutex); 1286 for (mds = 0; ret && mds < mdsc->max_sessions; mds++) { 1287 struct ceph_mds_session *session = mdsc->sessions[mds]; 1288 1289 if (!session) 1290 continue; 1291 get_session(session); 1292 mutex_unlock(&mdsc->mutex); 1293 1294 mutex_lock(&session->s_mutex); 1295 if (!list_empty(&session->s_cap_flushing)) { 1296 struct ceph_inode_info *ci = 1297 list_entry(session->s_cap_flushing.next, 1298 struct ceph_inode_info, 1299 i_flushing_item); 1300 struct inode *inode = &ci->vfs_inode; 1301 1302 spin_lock(&ci->i_ceph_lock); 1303 if (ci->i_cap_flush_seq <= want_flush_seq) { 1304 dout("check_cap_flush still flushing %p " 1305 "seq %lld <= %lld to mds%d\n", inode, 1306 ci->i_cap_flush_seq, want_flush_seq, 1307 session->s_mds); 1308 ret = 0; 1309 } 1310 spin_unlock(&ci->i_ceph_lock); 1311 } 1312 mutex_unlock(&session->s_mutex); 1313 ceph_put_mds_session(session); 1314 1315 if (!ret) 1316 return ret; 1317 mutex_lock(&mdsc->mutex); 1318 } 1319 1320 mutex_unlock(&mdsc->mutex); 1321 dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq); 1322 return ret; 1323 } 1324 1325 /* 1326 * called under s_mutex 1327 */ 1328 void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 1329 struct ceph_mds_session *session) 1330 { 1331 struct ceph_msg *msg; 1332 1333 dout("send_cap_releases mds%d\n", session->s_mds); 1334 spin_lock(&session->s_cap_lock); 1335 while (!list_empty(&session->s_cap_releases_done)) { 1336 msg = list_first_entry(&session->s_cap_releases_done, 1337 struct ceph_msg, list_head); 1338 list_del_init(&msg->list_head); 1339 spin_unlock(&session->s_cap_lock); 1340 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1341 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 1342 ceph_con_send(&session->s_con, msg); 1343 spin_lock(&session->s_cap_lock); 1344 } 1345 spin_unlock(&session->s_cap_lock); 1346 } 1347 1348 static void discard_cap_releases(struct ceph_mds_client *mdsc, 1349 struct ceph_mds_session *session) 1350 { 1351 struct ceph_msg *msg; 1352 struct ceph_mds_cap_release *head; 1353 unsigned num; 1354 1355 dout("discard_cap_releases mds%d\n", session->s_mds); 1356 spin_lock(&session->s_cap_lock); 1357 1358 /* zero out the in-progress message */ 1359 msg = list_first_entry(&session->s_cap_releases, 1360 struct ceph_msg, list_head); 1361 head = msg->front.iov_base; 1362 num = le32_to_cpu(head->num); 1363 dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num); 1364 head->num = cpu_to_le32(0); 1365 session->s_num_cap_releases += num; 1366 1367 /* requeue completed messages */ 1368 while (!list_empty(&session->s_cap_releases_done)) { 1369 msg = list_first_entry(&session->s_cap_releases_done, 1370 struct ceph_msg, list_head); 1371 list_del_init(&msg->list_head); 1372 1373 head = msg->front.iov_base; 1374 num = le32_to_cpu(head->num); 1375 dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, 1376 num); 1377 session->s_num_cap_releases += num; 1378 head->num = cpu_to_le32(0); 1379 msg->front.iov_len = sizeof(*head); 1380 list_add(&msg->list_head, &session->s_cap_releases); 1381 } 1382 1383 spin_unlock(&session->s_cap_lock); 1384 } 1385 1386 /* 1387 * requests 1388 */ 1389 1390 /* 1391 * Create an mds request. 
1392 */ 1393 struct ceph_mds_request * 1394 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 1395 { 1396 struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS); 1397 1398 if (!req) 1399 return ERR_PTR(-ENOMEM); 1400 1401 mutex_init(&req->r_fill_mutex); 1402 req->r_mdsc = mdsc; 1403 req->r_started = jiffies; 1404 req->r_resend_mds = -1; 1405 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 1406 req->r_fmode = -1; 1407 kref_init(&req->r_kref); 1408 INIT_LIST_HEAD(&req->r_wait); 1409 init_completion(&req->r_completion); 1410 init_completion(&req->r_safe_completion); 1411 INIT_LIST_HEAD(&req->r_unsafe_item); 1412 1413 req->r_op = op; 1414 req->r_direct_mode = mode; 1415 return req; 1416 } 1417 1418 /* 1419 * return oldest (lowest) request, tid in request tree, 0 if none. 1420 * 1421 * called under mdsc->mutex. 1422 */ 1423 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 1424 { 1425 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 1426 return NULL; 1427 return rb_entry(rb_first(&mdsc->request_tree), 1428 struct ceph_mds_request, r_node); 1429 } 1430 1431 static u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 1432 { 1433 struct ceph_mds_request *req = __get_oldest_req(mdsc); 1434 1435 if (req) 1436 return req->r_tid; 1437 return 0; 1438 } 1439 1440 /* 1441 * Build a dentry's path. Allocate on heap; caller must kfree. Based 1442 * on build_path_from_dentry in fs/cifs/dir.c. 1443 * 1444 * If @stop_on_nosnap, generate path relative to the first non-snapped 1445 * inode. 1446 * 1447 * Encode hidden .snap dirs as a double /, i.e. 1448 * foo/.snap/bar -> foo//bar 1449 */ 1450 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base, 1451 int stop_on_nosnap) 1452 { 1453 struct dentry *temp; 1454 char *path; 1455 int len, pos; 1456 unsigned seq; 1457 1458 if (dentry == NULL) 1459 return ERR_PTR(-EINVAL); 1460 1461 retry: 1462 len = 0; 1463 seq = read_seqbegin(&rename_lock); 1464 rcu_read_lock(); 1465 for (temp = dentry; !IS_ROOT(temp);) { 1466 struct inode *inode = temp->d_inode; 1467 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) 1468 len++; /* slash only */ 1469 else if (stop_on_nosnap && inode && 1470 ceph_snap(inode) == CEPH_NOSNAP) 1471 break; 1472 else 1473 len += 1 + temp->d_name.len; 1474 temp = temp->d_parent; 1475 if (temp == NULL) { 1476 rcu_read_unlock(); 1477 pr_err("build_path corrupt dentry %p\n", dentry); 1478 return ERR_PTR(-EINVAL); 1479 } 1480 } 1481 rcu_read_unlock(); 1482 if (len) 1483 len--; /* no leading '/' */ 1484 1485 path = kmalloc(len+1, GFP_NOFS); 1486 if (path == NULL) 1487 return ERR_PTR(-ENOMEM); 1488 pos = len; 1489 path[pos] = 0; /* trailing null */ 1490 rcu_read_lock(); 1491 for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) { 1492 struct inode *inode; 1493 1494 spin_lock(&temp->d_lock); 1495 inode = temp->d_inode; 1496 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 1497 dout("build_path path+%d: %p SNAPDIR\n", 1498 pos, temp); 1499 } else if (stop_on_nosnap && inode && 1500 ceph_snap(inode) == CEPH_NOSNAP) { 1501 spin_unlock(&temp->d_lock); 1502 break; 1503 } else { 1504 pos -= temp->d_name.len; 1505 if (pos < 0) { 1506 spin_unlock(&temp->d_lock); 1507 break; 1508 } 1509 strncpy(path + pos, temp->d_name.name, 1510 temp->d_name.len); 1511 } 1512 spin_unlock(&temp->d_lock); 1513 if (pos) 1514 path[--pos] = '/'; 1515 temp = temp->d_parent; 1516 if (temp == NULL) { 1517 rcu_read_unlock(); 1518 pr_err("build_path corrupt dentry\n"); 1519 kfree(path); 1520 return ERR_PTR(-EINVAL); 1521 } 1522 } 1523 
rcu_read_unlock(); 1524 if (pos != 0 || read_seqretry(&rename_lock, seq)) { 1525 pr_err("build_path did not end path lookup where " 1526 "expected, namelen is %d, pos is %d\n", len, pos); 1527 /* presumably this is only possible if racing with a 1528 rename of one of the parent directories (we can not 1529 lock the dentries above us to prevent this, but 1530 retrying should be harmless) */ 1531 kfree(path); 1532 goto retry; 1533 } 1534 1535 *base = ceph_ino(temp->d_inode); 1536 *plen = len; 1537 dout("build_path on %p %d built %llx '%.*s'\n", 1538 dentry, dentry->d_count, *base, len, path); 1539 return path; 1540 } 1541 1542 static int build_dentry_path(struct dentry *dentry, 1543 const char **ppath, int *ppathlen, u64 *pino, 1544 int *pfreepath) 1545 { 1546 char *path; 1547 1548 if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) { 1549 *pino = ceph_ino(dentry->d_parent->d_inode); 1550 *ppath = dentry->d_name.name; 1551 *ppathlen = dentry->d_name.len; 1552 return 0; 1553 } 1554 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 1555 if (IS_ERR(path)) 1556 return PTR_ERR(path); 1557 *ppath = path; 1558 *pfreepath = 1; 1559 return 0; 1560 } 1561 1562 static int build_inode_path(struct inode *inode, 1563 const char **ppath, int *ppathlen, u64 *pino, 1564 int *pfreepath) 1565 { 1566 struct dentry *dentry; 1567 char *path; 1568 1569 if (ceph_snap(inode) == CEPH_NOSNAP) { 1570 *pino = ceph_ino(inode); 1571 *ppathlen = 0; 1572 return 0; 1573 } 1574 dentry = d_find_alias(inode); 1575 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 1576 dput(dentry); 1577 if (IS_ERR(path)) 1578 return PTR_ERR(path); 1579 *ppath = path; 1580 *pfreepath = 1; 1581 return 0; 1582 } 1583 1584 /* 1585 * request arguments may be specified via an inode *, a dentry *, or 1586 * an explicit ino+path. 
1587 */ 1588 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, 1589 const char *rpath, u64 rino, 1590 const char **ppath, int *pathlen, 1591 u64 *ino, int *freepath) 1592 { 1593 int r = 0; 1594 1595 if (rinode) { 1596 r = build_inode_path(rinode, ppath, pathlen, ino, freepath); 1597 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 1598 ceph_snap(rinode)); 1599 } else if (rdentry) { 1600 r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath); 1601 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, 1602 *ppath); 1603 } else if (rpath || rino) { 1604 *ino = rino; 1605 *ppath = rpath; 1606 *pathlen = strlen(rpath); 1607 dout(" path %.*s\n", *pathlen, rpath); 1608 } 1609 1610 return r; 1611 } 1612 1613 /* 1614 * called under mdsc->mutex 1615 */ 1616 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, 1617 struct ceph_mds_request *req, 1618 int mds) 1619 { 1620 struct ceph_msg *msg; 1621 struct ceph_mds_request_head *head; 1622 const char *path1 = NULL; 1623 const char *path2 = NULL; 1624 u64 ino1 = 0, ino2 = 0; 1625 int pathlen1 = 0, pathlen2 = 0; 1626 int freepath1 = 0, freepath2 = 0; 1627 int len; 1628 u16 releases; 1629 void *p, *end; 1630 int ret; 1631 1632 ret = set_request_path_attr(req->r_inode, req->r_dentry, 1633 req->r_path1, req->r_ino1.ino, 1634 &path1, &pathlen1, &ino1, &freepath1); 1635 if (ret < 0) { 1636 msg = ERR_PTR(ret); 1637 goto out; 1638 } 1639 1640 ret = set_request_path_attr(NULL, req->r_old_dentry, 1641 req->r_path2, req->r_ino2.ino, 1642 &path2, &pathlen2, &ino2, &freepath2); 1643 if (ret < 0) { 1644 msg = ERR_PTR(ret); 1645 goto out_free1; 1646 } 1647 1648 len = sizeof(*head) + 1649 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)); 1650 1651 /* calculate (max) length for cap releases */ 1652 len += sizeof(struct ceph_mds_request_release) * 1653 (!!req->r_inode_drop + !!req->r_dentry_drop + 1654 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 1655 if (req->r_dentry_drop) 1656 len += req->r_dentry->d_name.len; 1657 if (req->r_old_dentry_drop) 1658 len += req->r_old_dentry->d_name.len; 1659 1660 msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false); 1661 if (!msg) { 1662 msg = ERR_PTR(-ENOMEM); 1663 goto out_free2; 1664 } 1665 1666 msg->hdr.tid = cpu_to_le64(req->r_tid); 1667 1668 head = msg->front.iov_base; 1669 p = msg->front.iov_base + sizeof(*head); 1670 end = msg->front.iov_base + msg->front.iov_len; 1671 1672 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 1673 head->op = cpu_to_le32(req->r_op); 1674 head->caller_uid = cpu_to_le32(req->r_uid); 1675 head->caller_gid = cpu_to_le32(req->r_gid); 1676 head->args = req->r_args; 1677 1678 ceph_encode_filepath(&p, end, ino1, path1); 1679 ceph_encode_filepath(&p, end, ino2, path2); 1680 1681 /* make note of release offset, in case we need to replay */ 1682 req->r_request_release_offset = p - msg->front.iov_base; 1683 1684 /* cap releases */ 1685 releases = 0; 1686 if (req->r_inode_drop) 1687 releases += ceph_encode_inode_release(&p, 1688 req->r_inode ? 
req->r_inode : req->r_dentry->d_inode, 1689 mds, req->r_inode_drop, req->r_inode_unless, 0); 1690 if (req->r_dentry_drop) 1691 releases += ceph_encode_dentry_release(&p, req->r_dentry, 1692 mds, req->r_dentry_drop, req->r_dentry_unless); 1693 if (req->r_old_dentry_drop) 1694 releases += ceph_encode_dentry_release(&p, req->r_old_dentry, 1695 mds, req->r_old_dentry_drop, req->r_old_dentry_unless); 1696 if (req->r_old_inode_drop) 1697 releases += ceph_encode_inode_release(&p, 1698 req->r_old_dentry->d_inode, 1699 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 1700 head->num_releases = cpu_to_le16(releases); 1701 1702 BUG_ON(p > end); 1703 msg->front.iov_len = p - msg->front.iov_base; 1704 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1705 1706 msg->pages = req->r_pages; 1707 msg->nr_pages = req->r_num_pages; 1708 msg->hdr.data_len = cpu_to_le32(req->r_data_len); 1709 msg->hdr.data_off = cpu_to_le16(0); 1710 1711 out_free2: 1712 if (freepath2) 1713 kfree((char *)path2); 1714 out_free1: 1715 if (freepath1) 1716 kfree((char *)path1); 1717 out: 1718 return msg; 1719 } 1720 1721 /* 1722 * called under mdsc->mutex if error, under no mutex if 1723 * success. 1724 */ 1725 static void complete_request(struct ceph_mds_client *mdsc, 1726 struct ceph_mds_request *req) 1727 { 1728 if (req->r_callback) 1729 req->r_callback(mdsc, req); 1730 else 1731 complete_all(&req->r_completion); 1732 } 1733 1734 /* 1735 * called under mdsc->mutex 1736 */ 1737 static int __prepare_send_request(struct ceph_mds_client *mdsc, 1738 struct ceph_mds_request *req, 1739 int mds) 1740 { 1741 struct ceph_mds_request_head *rhead; 1742 struct ceph_msg *msg; 1743 int flags = 0; 1744 1745 req->r_attempts++; 1746 if (req->r_inode) { 1747 struct ceph_cap *cap = 1748 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 1749 1750 if (cap) 1751 req->r_sent_on_mseq = cap->mseq; 1752 else 1753 req->r_sent_on_mseq = -1; 1754 } 1755 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, 1756 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 1757 1758 if (req->r_got_unsafe) { 1759 /* 1760 * Replay. Do not regenerate message (and rebuild 1761 * paths, etc.); just use the original message. 1762 * Rebuilding paths will break for renames because 1763 * d_move mangles the src name. 
1764 */ 1765 msg = req->r_request; 1766 rhead = msg->front.iov_base; 1767 1768 flags = le32_to_cpu(rhead->flags); 1769 flags |= CEPH_MDS_FLAG_REPLAY; 1770 rhead->flags = cpu_to_le32(flags); 1771 1772 if (req->r_target_inode) 1773 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 1774 1775 rhead->num_retry = req->r_attempts - 1; 1776 1777 /* remove cap/dentry releases from message */ 1778 rhead->num_releases = 0; 1779 msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset); 1780 msg->front.iov_len = req->r_request_release_offset; 1781 return 0; 1782 } 1783 1784 if (req->r_request) { 1785 ceph_msg_put(req->r_request); 1786 req->r_request = NULL; 1787 } 1788 msg = create_request_message(mdsc, req, mds); 1789 if (IS_ERR(msg)) { 1790 req->r_err = PTR_ERR(msg); 1791 complete_request(mdsc, req); 1792 return PTR_ERR(msg); 1793 } 1794 req->r_request = msg; 1795 1796 rhead = msg->front.iov_base; 1797 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 1798 if (req->r_got_unsafe) 1799 flags |= CEPH_MDS_FLAG_REPLAY; 1800 if (req->r_locked_dir) 1801 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 1802 rhead->flags = cpu_to_le32(flags); 1803 rhead->num_fwd = req->r_num_fwd; 1804 rhead->num_retry = req->r_attempts - 1; 1805 rhead->ino = 0; 1806 1807 dout(" r_locked_dir = %p\n", req->r_locked_dir); 1808 return 0; 1809 } 1810 1811 /* 1812 * send request, or put it on the appropriate wait list. 1813 */ 1814 static int __do_request(struct ceph_mds_client *mdsc, 1815 struct ceph_mds_request *req) 1816 { 1817 struct ceph_mds_session *session = NULL; 1818 int mds = -1; 1819 int err = -EAGAIN; 1820 1821 if (req->r_err || req->r_got_result) 1822 goto out; 1823 1824 if (req->r_timeout && 1825 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 1826 dout("do_request timed out\n"); 1827 err = -EIO; 1828 goto finish; 1829 } 1830 1831 put_request_session(req); 1832 1833 mds = __choose_mds(mdsc, req); 1834 if (mds < 0 || 1835 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 1836 dout("do_request no mds or not active, waiting for map\n"); 1837 list_add(&req->r_wait, &mdsc->waiting_for_map); 1838 goto out; 1839 } 1840 1841 /* get, open session */ 1842 session = __ceph_lookup_mds_session(mdsc, mds); 1843 if (!session) { 1844 session = register_session(mdsc, mds); 1845 if (IS_ERR(session)) { 1846 err = PTR_ERR(session); 1847 goto finish; 1848 } 1849 } 1850 req->r_session = get_session(session); 1851 1852 dout("do_request mds%d session %p state %s\n", mds, session, 1853 session_state_name(session->s_state)); 1854 if (session->s_state != CEPH_MDS_SESSION_OPEN && 1855 session->s_state != CEPH_MDS_SESSION_HUNG) { 1856 if (session->s_state == CEPH_MDS_SESSION_NEW || 1857 session->s_state == CEPH_MDS_SESSION_CLOSING) 1858 __open_session(mdsc, session); 1859 list_add(&req->r_wait, &session->s_waiting); 1860 goto out_session; 1861 } 1862 1863 /* send request */ 1864 req->r_resend_mds = -1; /* forget any previous mds hint */ 1865 1866 if (req->r_request_started == 0) /* note request start time */ 1867 req->r_request_started = jiffies; 1868 1869 err = __prepare_send_request(mdsc, req, mds); 1870 if (!err) { 1871 ceph_msg_get(req->r_request); 1872 ceph_con_send(&session->s_con, req->r_request); 1873 } 1874 1875 out_session: 1876 ceph_put_mds_session(session); 1877 out: 1878 return err; 1879 1880 finish: 1881 req->r_err = err; 1882 complete_request(mdsc, req); 1883 goto out; 1884 } 1885 1886 /* 1887 * called under mdsc->mutex 1888 */ 1889 static void __wake_requests(struct ceph_mds_client 
*mdsc, 1890 struct list_head *head) 1891 { 1892 struct ceph_mds_request *req, *nreq; 1893 1894 list_for_each_entry_safe(req, nreq, head, r_wait) { 1895 list_del_init(&req->r_wait); 1896 __do_request(mdsc, req); 1897 } 1898 } 1899 1900 /* 1901 * Wake up threads with requests pending for @mds, so that they can 1902 * resubmit their requests to a possibly different mds. 1903 */ 1904 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 1905 { 1906 struct ceph_mds_request *req; 1907 struct rb_node *p; 1908 1909 dout("kick_requests mds%d\n", mds); 1910 for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) { 1911 req = rb_entry(p, struct ceph_mds_request, r_node); 1912 if (req->r_got_unsafe) 1913 continue; 1914 if (req->r_session && 1915 req->r_session->s_mds == mds) { 1916 dout(" kicking tid %llu\n", req->r_tid); 1917 __do_request(mdsc, req); 1918 } 1919 } 1920 } 1921 1922 void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, 1923 struct ceph_mds_request *req) 1924 { 1925 dout("submit_request on %p\n", req); 1926 mutex_lock(&mdsc->mutex); 1927 __register_request(mdsc, req, NULL); 1928 __do_request(mdsc, req); 1929 mutex_unlock(&mdsc->mutex); 1930 } 1931 1932 /* 1933 * Synchrously perform an mds request. Take care of all of the 1934 * session setup, forwarding, retry details. 1935 */ 1936 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 1937 struct inode *dir, 1938 struct ceph_mds_request *req) 1939 { 1940 int err; 1941 1942 dout("do_request on %p\n", req); 1943 1944 /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */ 1945 if (req->r_inode) 1946 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 1947 if (req->r_locked_dir) 1948 ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); 1949 if (req->r_old_dentry) 1950 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 1951 CEPH_CAP_PIN); 1952 1953 /* issue */ 1954 mutex_lock(&mdsc->mutex); 1955 __register_request(mdsc, req, dir); 1956 __do_request(mdsc, req); 1957 1958 if (req->r_err) { 1959 err = req->r_err; 1960 __unregister_request(mdsc, req); 1961 dout("do_request early error %d\n", err); 1962 goto out; 1963 } 1964 1965 /* wait */ 1966 mutex_unlock(&mdsc->mutex); 1967 dout("do_request waiting\n"); 1968 if (req->r_timeout) { 1969 err = (long)wait_for_completion_killable_timeout( 1970 &req->r_completion, req->r_timeout); 1971 if (err == 0) 1972 err = -EIO; 1973 } else { 1974 err = wait_for_completion_killable(&req->r_completion); 1975 } 1976 dout("do_request waited, got %d\n", err); 1977 mutex_lock(&mdsc->mutex); 1978 1979 /* only abort if we didn't race with a real reply */ 1980 if (req->r_got_result) { 1981 err = le32_to_cpu(req->r_reply_info.head->result); 1982 } else if (err < 0) { 1983 dout("aborted request %lld with %d\n", req->r_tid, err); 1984 1985 /* 1986 * ensure we aren't running concurrently with 1987 * ceph_fill_trace or ceph_readdir_prepopulate, which 1988 * rely on locks (dir mutex) held by our caller. 1989 */ 1990 mutex_lock(&req->r_fill_mutex); 1991 req->r_err = err; 1992 req->r_aborted = true; 1993 mutex_unlock(&req->r_fill_mutex); 1994 1995 if (req->r_locked_dir && 1996 (req->r_op & CEPH_MDS_OP_WRITE)) 1997 ceph_invalidate_dir_request(req); 1998 } else { 1999 err = req->r_err; 2000 } 2001 2002 out: 2003 mutex_unlock(&mdsc->mutex); 2004 dout("do_request %p done, result %d\n", req, err); 2005 return err; 2006 } 2007 2008 /* 2009 * Invalidate dir D_COMPLETE, dentry lease state on an aborted MDS 2010 * namespace request. 
2011 */ 2012 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 2013 { 2014 struct inode *inode = req->r_locked_dir; 2015 struct ceph_inode_info *ci = ceph_inode(inode); 2016 2017 dout("invalidate_dir_request %p (D_COMPLETE, lease(s))\n", inode); 2018 spin_lock(&ci->i_ceph_lock); 2019 ceph_dir_clear_complete(inode); 2020 ci->i_release_count++; 2021 spin_unlock(&ci->i_ceph_lock); 2022 2023 if (req->r_dentry) 2024 ceph_invalidate_dentry_lease(req->r_dentry); 2025 if (req->r_old_dentry) 2026 ceph_invalidate_dentry_lease(req->r_old_dentry); 2027 } 2028 2029 /* 2030 * Handle mds reply. 2031 * 2032 * We take the session mutex and parse and process the reply immediately. 2033 * This preserves the logical ordering of replies, capabilities, etc., sent 2034 * by the MDS as they are applied to our local cache. 2035 */ 2036 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 2037 { 2038 struct ceph_mds_client *mdsc = session->s_mdsc; 2039 struct ceph_mds_request *req; 2040 struct ceph_mds_reply_head *head = msg->front.iov_base; 2041 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 2042 u64 tid; 2043 int err, result; 2044 int mds = session->s_mds; 2045 2046 if (msg->front.iov_len < sizeof(*head)) { 2047 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 2048 ceph_msg_dump(msg); 2049 return; 2050 } 2051 2052 /* get request, session */ 2053 tid = le64_to_cpu(msg->hdr.tid); 2054 mutex_lock(&mdsc->mutex); 2055 req = __lookup_request(mdsc, tid); 2056 if (!req) { 2057 dout("handle_reply on unknown tid %llu\n", tid); 2058 mutex_unlock(&mdsc->mutex); 2059 return; 2060 } 2061 dout("handle_reply %p\n", req); 2062 2063 /* correct session? */ 2064 if (req->r_session != session) { 2065 pr_err("mdsc_handle_reply got %llu on session mds%d" 2066 " not mds%d\n", tid, session->s_mds, 2067 req->r_session ? req->r_session->s_mds : -1); 2068 mutex_unlock(&mdsc->mutex); 2069 goto out; 2070 } 2071 2072 /* dup? */ 2073 if ((req->r_got_unsafe && !head->safe) || 2074 (req->r_got_safe && head->safe)) { 2075 pr_warning("got a dup %s reply on %llu from mds%d\n", 2076 head->safe ? 
"safe" : "unsafe", tid, mds); 2077 mutex_unlock(&mdsc->mutex); 2078 goto out; 2079 } 2080 if (req->r_got_safe && !head->safe) { 2081 pr_warning("got unsafe after safe on %llu from mds%d\n", 2082 tid, mds); 2083 mutex_unlock(&mdsc->mutex); 2084 goto out; 2085 } 2086 2087 result = le32_to_cpu(head->result); 2088 2089 /* 2090 * Handle an ESTALE 2091 * if we're not talking to the authority, send to them 2092 * if the authority has changed while we weren't looking, 2093 * send to new authority 2094 * Otherwise we just have to return an ESTALE 2095 */ 2096 if (result == -ESTALE) { 2097 dout("got ESTALE on request %llu", req->r_tid); 2098 if (!req->r_inode) { 2099 /* do nothing; not an authority problem */ 2100 } else if (req->r_direct_mode != USE_AUTH_MDS) { 2101 dout("not using auth, setting for that now"); 2102 req->r_direct_mode = USE_AUTH_MDS; 2103 __do_request(mdsc, req); 2104 mutex_unlock(&mdsc->mutex); 2105 goto out; 2106 } else { 2107 struct ceph_inode_info *ci = ceph_inode(req->r_inode); 2108 struct ceph_cap *cap = NULL; 2109 2110 if (req->r_session) 2111 cap = ceph_get_cap_for_mds(ci, 2112 req->r_session->s_mds); 2113 2114 dout("already using auth"); 2115 if ((!cap || cap != ci->i_auth_cap) || 2116 (cap->mseq != req->r_sent_on_mseq)) { 2117 dout("but cap changed, so resending"); 2118 __do_request(mdsc, req); 2119 mutex_unlock(&mdsc->mutex); 2120 goto out; 2121 } 2122 } 2123 dout("have to return ESTALE on request %llu", req->r_tid); 2124 } 2125 2126 2127 if (head->safe) { 2128 req->r_got_safe = true; 2129 __unregister_request(mdsc, req); 2130 complete_all(&req->r_safe_completion); 2131 2132 if (req->r_got_unsafe) { 2133 /* 2134 * We already handled the unsafe response, now do the 2135 * cleanup. No need to examine the response; the MDS 2136 * doesn't include any result info in the safe 2137 * response. And even if it did, there is nothing 2138 * useful we could do with a revised return value. 2139 */ 2140 dout("got safe reply %llu, mds%d\n", tid, mds); 2141 list_del_init(&req->r_unsafe_item); 2142 2143 /* last unsafe request during umount? 
*/ 2144 if (mdsc->stopping && !__get_oldest_req(mdsc)) 2145 complete_all(&mdsc->safe_umount_waiters); 2146 mutex_unlock(&mdsc->mutex); 2147 goto out; 2148 } 2149 } else { 2150 req->r_got_unsafe = true; 2151 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 2152 } 2153 2154 dout("handle_reply tid %lld result %d\n", tid, result); 2155 rinfo = &req->r_reply_info; 2156 err = parse_reply_info(msg, rinfo, session->s_con.peer_features); 2157 mutex_unlock(&mdsc->mutex); 2158 2159 mutex_lock(&session->s_mutex); 2160 if (err < 0) { 2161 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 2162 ceph_msg_dump(msg); 2163 goto out_err; 2164 } 2165 2166 /* snap trace */ 2167 if (rinfo->snapblob_len) { 2168 down_write(&mdsc->snap_rwsem); 2169 ceph_update_snap_trace(mdsc, rinfo->snapblob, 2170 rinfo->snapblob + rinfo->snapblob_len, 2171 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP); 2172 downgrade_write(&mdsc->snap_rwsem); 2173 } else { 2174 down_read(&mdsc->snap_rwsem); 2175 } 2176 2177 /* insert trace into our cache */ 2178 mutex_lock(&req->r_fill_mutex); 2179 err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session); 2180 if (err == 0) { 2181 if (result == 0 && req->r_op != CEPH_MDS_OP_GETFILELOCK && 2182 rinfo->dir_nr) 2183 ceph_readdir_prepopulate(req, req->r_session); 2184 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 2185 } 2186 mutex_unlock(&req->r_fill_mutex); 2187 2188 up_read(&mdsc->snap_rwsem); 2189 out_err: 2190 mutex_lock(&mdsc->mutex); 2191 if (!req->r_aborted) { 2192 if (err) { 2193 req->r_err = err; 2194 } else { 2195 req->r_reply = msg; 2196 ceph_msg_get(msg); 2197 req->r_got_result = true; 2198 } 2199 } else { 2200 dout("reply arrived after request %lld was aborted\n", tid); 2201 } 2202 mutex_unlock(&mdsc->mutex); 2203 2204 ceph_add_cap_releases(mdsc, req->r_session); 2205 mutex_unlock(&session->s_mutex); 2206 2207 /* kick calling process */ 2208 complete_request(mdsc, req); 2209 out: 2210 ceph_mdsc_put_request(req); 2211 return; 2212 } 2213 2214 2215 2216 /* 2217 * handle mds notification that our request has been forwarded. 2218 */ 2219 static void handle_forward(struct ceph_mds_client *mdsc, 2220 struct ceph_mds_session *session, 2221 struct ceph_msg *msg) 2222 { 2223 struct ceph_mds_request *req; 2224 u64 tid = le64_to_cpu(msg->hdr.tid); 2225 u32 next_mds; 2226 u32 fwd_seq; 2227 int err = -EINVAL; 2228 void *p = msg->front.iov_base; 2229 void *end = p + msg->front.iov_len; 2230 2231 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 2232 next_mds = ceph_decode_32(&p); 2233 fwd_seq = ceph_decode_32(&p); 2234 2235 mutex_lock(&mdsc->mutex); 2236 req = __lookup_request(mdsc, tid); 2237 if (!req) { 2238 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 2239 goto out; /* dup reply? */ 2240 } 2241 2242 if (req->r_aborted) { 2243 dout("forward tid %llu aborted, unregistering\n", tid); 2244 __unregister_request(mdsc, req); 2245 } else if (fwd_seq <= req->r_num_fwd) { 2246 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 2247 tid, next_mds, req->r_num_fwd, fwd_seq); 2248 } else { 2249 /* resend. 
forward race not possible; mds would drop */ 2250 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 2251 BUG_ON(req->r_err); 2252 BUG_ON(req->r_got_result); 2253 req->r_num_fwd = fwd_seq; 2254 req->r_resend_mds = next_mds; 2255 put_request_session(req); 2256 __do_request(mdsc, req); 2257 } 2258 ceph_mdsc_put_request(req); 2259 out: 2260 mutex_unlock(&mdsc->mutex); 2261 return; 2262 2263 bad: 2264 pr_err("mdsc_handle_forward decode error err=%d\n", err); 2265 } 2266 2267 /* 2268 * handle a mds session control message 2269 */ 2270 static void handle_session(struct ceph_mds_session *session, 2271 struct ceph_msg *msg) 2272 { 2273 struct ceph_mds_client *mdsc = session->s_mdsc; 2274 u32 op; 2275 u64 seq; 2276 int mds = session->s_mds; 2277 struct ceph_mds_session_head *h = msg->front.iov_base; 2278 int wake = 0; 2279 2280 /* decode */ 2281 if (msg->front.iov_len != sizeof(*h)) 2282 goto bad; 2283 op = le32_to_cpu(h->op); 2284 seq = le64_to_cpu(h->seq); 2285 2286 mutex_lock(&mdsc->mutex); 2287 if (op == CEPH_SESSION_CLOSE) 2288 __unregister_session(mdsc, session); 2289 /* FIXME: this ttl calculation is generous */ 2290 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 2291 mutex_unlock(&mdsc->mutex); 2292 2293 mutex_lock(&session->s_mutex); 2294 2295 dout("handle_session mds%d %s %p state %s seq %llu\n", 2296 mds, ceph_session_op_name(op), session, 2297 session_state_name(session->s_state), seq); 2298 2299 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 2300 session->s_state = CEPH_MDS_SESSION_OPEN; 2301 pr_info("mds%d came back\n", session->s_mds); 2302 } 2303 2304 switch (op) { 2305 case CEPH_SESSION_OPEN: 2306 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 2307 pr_info("mds%d reconnect success\n", session->s_mds); 2308 session->s_state = CEPH_MDS_SESSION_OPEN; 2309 renewed_caps(mdsc, session, 0); 2310 wake = 1; 2311 if (mdsc->stopping) 2312 __close_session(mdsc, session); 2313 break; 2314 2315 case CEPH_SESSION_RENEWCAPS: 2316 if (session->s_renew_seq == seq) 2317 renewed_caps(mdsc, session, 1); 2318 break; 2319 2320 case CEPH_SESSION_CLOSE: 2321 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 2322 pr_info("mds%d reconnect denied\n", session->s_mds); 2323 remove_session_caps(session); 2324 wake = 1; /* for good measure */ 2325 wake_up_all(&mdsc->session_close_wq); 2326 kick_requests(mdsc, mds); 2327 break; 2328 2329 case CEPH_SESSION_STALE: 2330 pr_info("mds%d caps went stale, renewing\n", 2331 session->s_mds); 2332 spin_lock(&session->s_gen_ttl_lock); 2333 session->s_cap_gen++; 2334 session->s_cap_ttl = jiffies - 1; 2335 spin_unlock(&session->s_gen_ttl_lock); 2336 send_renew_caps(mdsc, session); 2337 break; 2338 2339 case CEPH_SESSION_RECALL_STATE: 2340 trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 2341 break; 2342 2343 default: 2344 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 2345 WARN_ON(1); 2346 } 2347 2348 mutex_unlock(&session->s_mutex); 2349 if (wake) { 2350 mutex_lock(&mdsc->mutex); 2351 __wake_requests(mdsc, &session->s_waiting); 2352 mutex_unlock(&mdsc->mutex); 2353 } 2354 return; 2355 2356 bad: 2357 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 2358 (int)msg->front.iov_len); 2359 ceph_msg_dump(msg); 2360 return; 2361 } 2362 2363 2364 /* 2365 * called under session->mutex. 
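* Re-send any requests that received an unsafe reply but are still waiting
* for the safe (committed) reply, so the recovering mds can replay them.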
2366 */ 2367 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 2368 struct ceph_mds_session *session) 2369 { 2370 struct ceph_mds_request *req, *nreq; 2371 int err; 2372 2373 dout("replay_unsafe_requests mds%d\n", session->s_mds); 2374 2375 mutex_lock(&mdsc->mutex); 2376 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) { 2377 err = __prepare_send_request(mdsc, req, session->s_mds); 2378 if (!err) { 2379 ceph_msg_get(req->r_request); 2380 ceph_con_send(&session->s_con, req->r_request); 2381 } 2382 } 2383 mutex_unlock(&mdsc->mutex); 2384 } 2385 2386 /* 2387 * Encode information about a cap for a reconnect with the MDS. 2388 */ 2389 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, 2390 void *arg) 2391 { 2392 union { 2393 struct ceph_mds_cap_reconnect v2; 2394 struct ceph_mds_cap_reconnect_v1 v1; 2395 } rec; 2396 size_t reclen; 2397 struct ceph_inode_info *ci; 2398 struct ceph_reconnect_state *recon_state = arg; 2399 struct ceph_pagelist *pagelist = recon_state->pagelist; 2400 char *path; 2401 int pathlen, err; 2402 u64 pathbase; 2403 struct dentry *dentry; 2404 2405 ci = cap->ci; 2406 2407 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 2408 inode, ceph_vinop(inode), cap, cap->cap_id, 2409 ceph_cap_string(cap->issued)); 2410 err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 2411 if (err) 2412 return err; 2413 2414 dentry = d_find_alias(inode); 2415 if (dentry) { 2416 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0); 2417 if (IS_ERR(path)) { 2418 err = PTR_ERR(path); 2419 goto out_dput; 2420 } 2421 } else { 2422 path = NULL; 2423 pathlen = 0; 2424 } 2425 err = ceph_pagelist_encode_string(pagelist, path, pathlen); 2426 if (err) 2427 goto out_free; 2428 2429 spin_lock(&ci->i_ceph_lock); 2430 cap->seq = 0; /* reset cap seq */ 2431 cap->issue_seq = 0; /* and issue_seq */ 2432 2433 if (recon_state->flock) { 2434 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 2435 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 2436 rec.v2.issued = cpu_to_le32(cap->issued); 2437 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 2438 rec.v2.pathbase = cpu_to_le64(pathbase); 2439 rec.v2.flock_len = 0; 2440 reclen = sizeof(rec.v2); 2441 } else { 2442 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 2443 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 2444 rec.v1.issued = cpu_to_le32(cap->issued); 2445 rec.v1.size = cpu_to_le64(inode->i_size); 2446 ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime); 2447 ceph_encode_timespec(&rec.v1.atime, &inode->i_atime); 2448 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 2449 rec.v1.pathbase = cpu_to_le64(pathbase); 2450 reclen = sizeof(rec.v1); 2451 } 2452 spin_unlock(&ci->i_ceph_lock); 2453 2454 if (recon_state->flock) { 2455 int num_fcntl_locks, num_flock_locks; 2456 struct ceph_pagelist_cursor trunc_point; 2457 2458 ceph_pagelist_set_cursor(pagelist, &trunc_point); 2459 do { 2460 lock_flocks(); 2461 ceph_count_locks(inode, &num_fcntl_locks, 2462 &num_flock_locks); 2463 rec.v2.flock_len = (2*sizeof(u32) + 2464 (num_fcntl_locks+num_flock_locks) * 2465 sizeof(struct ceph_filelock)); 2466 unlock_flocks(); 2467 2468 /* pre-alloc pagelist */ 2469 ceph_pagelist_truncate(pagelist, &trunc_point); 2470 err = ceph_pagelist_append(pagelist, &rec, reclen); 2471 if (!err) 2472 err = ceph_pagelist_reserve(pagelist, 2473 rec.v2.flock_len); 2474 2475 /* encode locks */ 2476 if (!err) { 2477 lock_flocks(); 2478 err = ceph_encode_locks(inode, 2479 pagelist, 2480 num_fcntl_locks, 2481 num_flock_locks); 2482 
unlock_flocks(); 2483 } 2484 } while (err == -ENOSPC); 2485 } else { 2486 err = ceph_pagelist_append(pagelist, &rec, reclen); 2487 } 2488 2489 out_free: 2490 kfree(path); 2491 out_dput: 2492 dput(dentry); 2493 return err; 2494 } 2495 2496 2497 /* 2498 * If an MDS fails and recovers, clients need to reconnect in order to 2499 * reestablish shared state. This includes all caps issued through 2500 * this session _and_ the snap_realm hierarchy. Because it's not 2501 * clear which snap realms the mds cares about, we send everything we 2502 * know about.. that ensures we'll then get any new info the 2503 * recovering MDS might have. 2504 * 2505 * This is a relatively heavyweight operation, but it's rare. 2506 * 2507 * called with mdsc->mutex held. 2508 */ 2509 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 2510 struct ceph_mds_session *session) 2511 { 2512 struct ceph_msg *reply; 2513 struct rb_node *p; 2514 int mds = session->s_mds; 2515 int err = -ENOMEM; 2516 struct ceph_pagelist *pagelist; 2517 struct ceph_reconnect_state recon_state; 2518 2519 pr_info("mds%d reconnect start\n", mds); 2520 2521 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); 2522 if (!pagelist) 2523 goto fail_nopagelist; 2524 ceph_pagelist_init(pagelist); 2525 2526 reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false); 2527 if (!reply) 2528 goto fail_nomsg; 2529 2530 mutex_lock(&session->s_mutex); 2531 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 2532 session->s_seq = 0; 2533 2534 ceph_con_open(&session->s_con, 2535 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 2536 2537 /* replay unsafe requests */ 2538 replay_unsafe_requests(mdsc, session); 2539 2540 down_read(&mdsc->snap_rwsem); 2541 2542 dout("session %p state %s\n", session, 2543 session_state_name(session->s_state)); 2544 2545 /* drop old cap expires; we're about to reestablish that state */ 2546 discard_cap_releases(mdsc, session); 2547 2548 /* traverse this session's caps */ 2549 err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps); 2550 if (err) 2551 goto fail; 2552 2553 recon_state.pagelist = pagelist; 2554 recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK; 2555 err = iterate_session_caps(session, encode_caps_cb, &recon_state); 2556 if (err < 0) 2557 goto fail; 2558 2559 /* 2560 * snaprealms. we provide mds with the ino, seq (version), and 2561 * parent for all of our realms. If the mds has any newer info, 2562 * it will tell us. 
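* Each realm is appended to the reconnect pagelist as a
* ceph_mds_snaprealm_reconnect record, after the per-cap records written
* by encode_caps_cb above.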
2563 */ 2564 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 2565 struct ceph_snap_realm *realm = 2566 rb_entry(p, struct ceph_snap_realm, node); 2567 struct ceph_mds_snaprealm_reconnect sr_rec; 2568 2569 dout(" adding snap realm %llx seq %lld parent %llx\n", 2570 realm->ino, realm->seq, realm->parent_ino); 2571 sr_rec.ino = cpu_to_le64(realm->ino); 2572 sr_rec.seq = cpu_to_le64(realm->seq); 2573 sr_rec.parent = cpu_to_le64(realm->parent_ino); 2574 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 2575 if (err) 2576 goto fail; 2577 } 2578 2579 reply->pagelist = pagelist; 2580 if (recon_state.flock) 2581 reply->hdr.version = cpu_to_le16(2); 2582 reply->hdr.data_len = cpu_to_le32(pagelist->length); 2583 reply->nr_pages = calc_pages_for(0, pagelist->length); 2584 ceph_con_send(&session->s_con, reply); 2585 2586 mutex_unlock(&session->s_mutex); 2587 2588 mutex_lock(&mdsc->mutex); 2589 __wake_requests(mdsc, &session->s_waiting); 2590 mutex_unlock(&mdsc->mutex); 2591 2592 up_read(&mdsc->snap_rwsem); 2593 return; 2594 2595 fail: 2596 ceph_msg_put(reply); 2597 up_read(&mdsc->snap_rwsem); 2598 mutex_unlock(&session->s_mutex); 2599 fail_nomsg: 2600 ceph_pagelist_release(pagelist); 2601 kfree(pagelist); 2602 fail_nopagelist: 2603 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 2604 return; 2605 } 2606 2607 2608 /* 2609 * compare old and new mdsmaps, kicking requests 2610 * and closing out old connections as necessary 2611 * 2612 * called under mdsc->mutex. 2613 */ 2614 static void check_new_map(struct ceph_mds_client *mdsc, 2615 struct ceph_mdsmap *newmap, 2616 struct ceph_mdsmap *oldmap) 2617 { 2618 int i; 2619 int oldstate, newstate; 2620 struct ceph_mds_session *s; 2621 2622 dout("check_new_map new %u old %u\n", 2623 newmap->m_epoch, oldmap->m_epoch); 2624 2625 for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) { 2626 if (mdsc->sessions[i] == NULL) 2627 continue; 2628 s = mdsc->sessions[i]; 2629 oldstate = ceph_mdsmap_get_state(oldmap, i); 2630 newstate = ceph_mdsmap_get_state(newmap, i); 2631 2632 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 2633 i, ceph_mds_state_name(oldstate), 2634 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 2635 ceph_mds_state_name(newstate), 2636 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 2637 session_state_name(s->s_state)); 2638 2639 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 2640 ceph_mdsmap_get_addr(newmap, i), 2641 sizeof(struct ceph_entity_addr))) { 2642 if (s->s_state == CEPH_MDS_SESSION_OPENING) { 2643 /* the session never opened, just close it 2644 * out now */ 2645 __wake_requests(mdsc, &s->s_waiting); 2646 __unregister_session(mdsc, s); 2647 } else { 2648 /* just close it */ 2649 mutex_unlock(&mdsc->mutex); 2650 mutex_lock(&s->s_mutex); 2651 mutex_lock(&mdsc->mutex); 2652 ceph_con_close(&s->s_con); 2653 mutex_unlock(&s->s_mutex); 2654 s->s_state = CEPH_MDS_SESSION_RESTARTING; 2655 } 2656 2657 /* kick any requests waiting on the recovering mds */ 2658 kick_requests(mdsc, i); 2659 } else if (oldstate == newstate) { 2660 continue; /* nothing new with this mds */ 2661 } 2662 2663 /* 2664 * send reconnect? 2665 */ 2666 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 2667 newstate >= CEPH_MDS_STATE_RECONNECT) { 2668 mutex_unlock(&mdsc->mutex); 2669 send_mds_reconnect(mdsc, s); 2670 mutex_lock(&mdsc->mutex); 2671 } 2672 2673 /* 2674 * kick request on any mds that has gone active. 
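* It may now be the authority for metadata we have requests or flushing
* caps outstanding against, so resend those and wake any cap waiters.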
2675 */ 2676 if (oldstate < CEPH_MDS_STATE_ACTIVE && 2677 newstate >= CEPH_MDS_STATE_ACTIVE) { 2678 if (oldstate != CEPH_MDS_STATE_CREATING && 2679 oldstate != CEPH_MDS_STATE_STARTING) 2680 pr_info("mds%d recovery completed\n", s->s_mds); 2681 kick_requests(mdsc, i); 2682 ceph_kick_flushing_caps(mdsc, s); 2683 wake_up_session_caps(s, 1); 2684 } 2685 } 2686 2687 for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) { 2688 s = mdsc->sessions[i]; 2689 if (!s) 2690 continue; 2691 if (!ceph_mdsmap_is_laggy(newmap, i)) 2692 continue; 2693 if (s->s_state == CEPH_MDS_SESSION_OPEN || 2694 s->s_state == CEPH_MDS_SESSION_HUNG || 2695 s->s_state == CEPH_MDS_SESSION_CLOSING) { 2696 dout(" connecting to export targets of laggy mds%d\n", 2697 i); 2698 __open_export_target_sessions(mdsc, s); 2699 } 2700 } 2701 } 2702 2703 2704 2705 /* 2706 * leases 2707 */ 2708 2709 /* 2710 * caller must hold session s_mutex, dentry->d_lock 2711 */ 2712 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 2713 { 2714 struct ceph_dentry_info *di = ceph_dentry(dentry); 2715 2716 ceph_put_mds_session(di->lease_session); 2717 di->lease_session = NULL; 2718 } 2719 2720 static void handle_lease(struct ceph_mds_client *mdsc, 2721 struct ceph_mds_session *session, 2722 struct ceph_msg *msg) 2723 { 2724 struct super_block *sb = mdsc->fsc->sb; 2725 struct inode *inode; 2726 struct dentry *parent, *dentry; 2727 struct ceph_dentry_info *di; 2728 int mds = session->s_mds; 2729 struct ceph_mds_lease *h = msg->front.iov_base; 2730 u32 seq; 2731 struct ceph_vino vino; 2732 struct qstr dname; 2733 int release = 0; 2734 2735 dout("handle_lease from mds%d\n", mds); 2736 2737 /* decode */ 2738 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 2739 goto bad; 2740 vino.ino = le64_to_cpu(h->ino); 2741 vino.snap = CEPH_NOSNAP; 2742 seq = le32_to_cpu(h->seq); 2743 dname.name = (void *)h + sizeof(*h) + sizeof(u32); 2744 dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32); 2745 if (dname.len != get_unaligned_le32(h+1)) 2746 goto bad; 2747 2748 mutex_lock(&session->s_mutex); 2749 session->s_seq++; 2750 2751 /* lookup inode */ 2752 inode = ceph_find_inode(sb, vino); 2753 dout("handle_lease %s, ino %llx %p %.*s\n", 2754 ceph_lease_op_name(h->action), vino.ino, inode, 2755 dname.len, dname.name); 2756 if (inode == NULL) { 2757 dout("handle_lease no inode %llx\n", vino.ino); 2758 goto release; 2759 } 2760 2761 /* dentry */ 2762 parent = d_find_alias(inode); 2763 if (!parent) { 2764 dout("no parent dentry on inode %p\n", inode); 2765 WARN_ON(1); 2766 goto release; /* hrm... 
*/ 2767 } 2768 dname.hash = full_name_hash(dname.name, dname.len); 2769 dentry = d_lookup(parent, &dname); 2770 dput(parent); 2771 if (!dentry) 2772 goto release; 2773 2774 spin_lock(&dentry->d_lock); 2775 di = ceph_dentry(dentry); 2776 switch (h->action) { 2777 case CEPH_MDS_LEASE_REVOKE: 2778 if (di->lease_session == session) { 2779 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 2780 h->seq = cpu_to_le32(di->lease_seq); 2781 __ceph_mdsc_drop_dentry_lease(dentry); 2782 } 2783 release = 1; 2784 break; 2785 2786 case CEPH_MDS_LEASE_RENEW: 2787 if (di->lease_session == session && 2788 di->lease_gen == session->s_cap_gen && 2789 di->lease_renew_from && 2790 di->lease_renew_after == 0) { 2791 unsigned long duration = 2792 le32_to_cpu(h->duration_ms) * HZ / 1000; 2793 2794 di->lease_seq = seq; 2795 dentry->d_time = di->lease_renew_from + duration; 2796 di->lease_renew_after = di->lease_renew_from + 2797 (duration >> 1); 2798 di->lease_renew_from = 0; 2799 } 2800 break; 2801 } 2802 spin_unlock(&dentry->d_lock); 2803 dput(dentry); 2804 2805 if (!release) 2806 goto out; 2807 2808 release: 2809 /* let's just reuse the same message */ 2810 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 2811 ceph_msg_get(msg); 2812 ceph_con_send(&session->s_con, msg); 2813 2814 out: 2815 iput(inode); 2816 mutex_unlock(&session->s_mutex); 2817 return; 2818 2819 bad: 2820 pr_err("corrupt lease message\n"); 2821 ceph_msg_dump(msg); 2822 } 2823 2824 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 2825 struct inode *inode, 2826 struct dentry *dentry, char action, 2827 u32 seq) 2828 { 2829 struct ceph_msg *msg; 2830 struct ceph_mds_lease *lease; 2831 int len = sizeof(*lease) + sizeof(u32); 2832 int dnamelen = 0; 2833 2834 dout("lease_send_msg inode %p dentry %p %s to mds%d\n", 2835 inode, dentry, ceph_lease_op_name(action), session->s_mds); 2836 dnamelen = dentry->d_name.len; 2837 len += dnamelen; 2838 2839 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 2840 if (!msg) 2841 return; 2842 lease = msg->front.iov_base; 2843 lease->action = action; 2844 lease->ino = cpu_to_le64(ceph_vino(inode).ino); 2845 lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap); 2846 lease->seq = cpu_to_le32(seq); 2847 put_unaligned_le32(dnamelen, lease + 1); 2848 memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen); 2849 2850 /* 2851 * if this is a preemptive lease RELEASE, no need to 2852 * flush request stream, since the actual request will 2853 * soon follow. 2854 */ 2855 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE); 2856 2857 ceph_con_send(&session->s_con, msg); 2858 } 2859 2860 /* 2861 * Preemptively release a lease we expect to invalidate anyway. 2862 * Pass @inode always, @dentry is optional. 2863 */ 2864 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode, 2865 struct dentry *dentry) 2866 { 2867 struct ceph_dentry_info *di; 2868 struct ceph_mds_session *session; 2869 u32 seq; 2870 2871 BUG_ON(inode == NULL); 2872 BUG_ON(dentry == NULL); 2873 2874 /* is dentry lease valid? 
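A usable lease must come from a live session, match that session's current cap generation, and not be past the d_time expiry.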
*/ 2875 spin_lock(&dentry->d_lock); 2876 di = ceph_dentry(dentry); 2877 if (!di || !di->lease_session || 2878 di->lease_session->s_mds < 0 || 2879 di->lease_gen != di->lease_session->s_cap_gen || 2880 !time_before(jiffies, dentry->d_time)) { 2881 dout("lease_release inode %p dentry %p -- " 2882 "no lease\n", 2883 inode, dentry); 2884 spin_unlock(&dentry->d_lock); 2885 return; 2886 } 2887 2888 /* we do have a lease on this dentry; note mds and seq */ 2889 session = ceph_get_mds_session(di->lease_session); 2890 seq = di->lease_seq; 2891 __ceph_mdsc_drop_dentry_lease(dentry); 2892 spin_unlock(&dentry->d_lock); 2893 2894 dout("lease_release inode %p dentry %p to mds%d\n", 2895 inode, dentry, session->s_mds); 2896 ceph_mdsc_lease_send_msg(session, inode, dentry, 2897 CEPH_MDS_LEASE_RELEASE, seq); 2898 ceph_put_mds_session(session); 2899 } 2900 2901 /* 2902 * drop all leases (and dentry refs) in preparation for umount 2903 */ 2904 static void drop_leases(struct ceph_mds_client *mdsc) 2905 { 2906 int i; 2907 2908 dout("drop_leases\n"); 2909 mutex_lock(&mdsc->mutex); 2910 for (i = 0; i < mdsc->max_sessions; i++) { 2911 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 2912 if (!s) 2913 continue; 2914 mutex_unlock(&mdsc->mutex); 2915 mutex_lock(&s->s_mutex); 2916 mutex_unlock(&s->s_mutex); 2917 ceph_put_mds_session(s); 2918 mutex_lock(&mdsc->mutex); 2919 } 2920 mutex_unlock(&mdsc->mutex); 2921 } 2922 2923 2924 2925 /* 2926 * delayed work -- periodically trim expired leases, renew caps with mds 2927 */ 2928 static void schedule_delayed(struct ceph_mds_client *mdsc) 2929 { 2930 int delay = 5; 2931 unsigned hz = round_jiffies_relative(HZ * delay); 2932 schedule_delayed_work(&mdsc->delayed_work, hz); 2933 } 2934 2935 static void delayed_work(struct work_struct *work) 2936 { 2937 int i; 2938 struct ceph_mds_client *mdsc = 2939 container_of(work, struct ceph_mds_client, delayed_work.work); 2940 int renew_interval; 2941 int renew_caps; 2942 2943 dout("mdsc delayed_work\n"); 2944 ceph_check_delayed_caps(mdsc); 2945 2946 mutex_lock(&mdsc->mutex); 2947 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 2948 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 2949 mdsc->last_renew_caps); 2950 if (renew_caps) 2951 mdsc->last_renew_caps = jiffies; 2952 2953 for (i = 0; i < mdsc->max_sessions; i++) { 2954 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 2955 if (s == NULL) 2956 continue; 2957 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 2958 dout("resending session close request for mds%d\n", 2959 s->s_mds); 2960 request_close_session(mdsc, s); 2961 ceph_put_mds_session(s); 2962 continue; 2963 } 2964 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 2965 if (s->s_state == CEPH_MDS_SESSION_OPEN) { 2966 s->s_state = CEPH_MDS_SESSION_HUNG; 2967 pr_info("mds%d hung\n", s->s_mds); 2968 } 2969 } 2970 if (s->s_state < CEPH_MDS_SESSION_OPEN) { 2971 /* this mds is failed or recovering, just wait */ 2972 ceph_put_mds_session(s); 2973 continue; 2974 } 2975 mutex_unlock(&mdsc->mutex); 2976 2977 mutex_lock(&s->s_mutex); 2978 if (renew_caps) 2979 send_renew_caps(mdsc, s); 2980 else 2981 ceph_con_keepalive(&s->s_con); 2982 ceph_add_cap_releases(mdsc, s); 2983 if (s->s_state == CEPH_MDS_SESSION_OPEN || 2984 s->s_state == CEPH_MDS_SESSION_HUNG) 2985 ceph_send_cap_releases(mdsc, s); 2986 mutex_unlock(&s->s_mutex); 2987 ceph_put_mds_session(s); 2988 2989 mutex_lock(&mdsc->mutex); 2990 } 2991 mutex_unlock(&mdsc->mutex); 2992 2993 schedule_delayed(mdsc); 2994 } 2995 2996 int 
ceph_mdsc_init(struct ceph_fs_client *fsc) 2997 2998 { 2999 struct ceph_mds_client *mdsc; 3000 3001 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 3002 if (!mdsc) 3003 return -ENOMEM; 3004 mdsc->fsc = fsc; 3005 fsc->mdsc = mdsc; 3006 mutex_init(&mdsc->mutex); 3007 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 3008 if (mdsc->mdsmap == NULL) 3009 return -ENOMEM; 3010 3011 init_completion(&mdsc->safe_umount_waiters); 3012 init_waitqueue_head(&mdsc->session_close_wq); 3013 INIT_LIST_HEAD(&mdsc->waiting_for_map); 3014 mdsc->sessions = NULL; 3015 mdsc->max_sessions = 0; 3016 mdsc->stopping = 0; 3017 init_rwsem(&mdsc->snap_rwsem); 3018 mdsc->snap_realms = RB_ROOT; 3019 INIT_LIST_HEAD(&mdsc->snap_empty); 3020 spin_lock_init(&mdsc->snap_empty_lock); 3021 mdsc->last_tid = 0; 3022 mdsc->request_tree = RB_ROOT; 3023 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 3024 mdsc->last_renew_caps = jiffies; 3025 INIT_LIST_HEAD(&mdsc->cap_delay_list); 3026 spin_lock_init(&mdsc->cap_delay_lock); 3027 INIT_LIST_HEAD(&mdsc->snap_flush_list); 3028 spin_lock_init(&mdsc->snap_flush_lock); 3029 mdsc->cap_flush_seq = 0; 3030 INIT_LIST_HEAD(&mdsc->cap_dirty); 3031 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 3032 mdsc->num_cap_flushing = 0; 3033 spin_lock_init(&mdsc->cap_dirty_lock); 3034 init_waitqueue_head(&mdsc->cap_flushing_wq); 3035 spin_lock_init(&mdsc->dentry_lru_lock); 3036 INIT_LIST_HEAD(&mdsc->dentry_lru); 3037 3038 ceph_caps_init(mdsc); 3039 ceph_adjust_min_caps(mdsc, fsc->min_caps); 3040 3041 return 0; 3042 } 3043 3044 /* 3045 * Wait for safe replies on open mds requests. If we time out, drop 3046 * all requests from the tree to avoid dangling dentry refs. 3047 */ 3048 static void wait_requests(struct ceph_mds_client *mdsc) 3049 { 3050 struct ceph_mds_request *req; 3051 struct ceph_fs_client *fsc = mdsc->fsc; 3052 3053 mutex_lock(&mdsc->mutex); 3054 if (__get_oldest_req(mdsc)) { 3055 mutex_unlock(&mdsc->mutex); 3056 3057 dout("wait_requests waiting for requests\n"); 3058 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 3059 fsc->client->options->mount_timeout * HZ); 3060 3061 /* tear down remaining requests */ 3062 mutex_lock(&mdsc->mutex); 3063 while ((req = __get_oldest_req(mdsc))) { 3064 dout("wait_requests timed out on tid %llu\n", 3065 req->r_tid); 3066 __unregister_request(mdsc, req); 3067 } 3068 } 3069 mutex_unlock(&mdsc->mutex); 3070 dout("wait_requests done\n"); 3071 } 3072 3073 /* 3074 * called before mount is ro, and before dentries are torn down. 3075 * (hmm, does this still race with new lookups?) 3076 */ 3077 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 3078 { 3079 dout("pre_umount\n"); 3080 mdsc->stopping = 1; 3081 3082 drop_leases(mdsc); 3083 ceph_flush_dirty_caps(mdsc); 3084 wait_requests(mdsc); 3085 3086 /* 3087 * wait for reply handlers to drop their request refs and 3088 * their inode/dcache refs 3089 */ 3090 ceph_msgr_flush(); 3091 } 3092 3093 /* 3094 * wait for all write mds requests to flush. 
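* i.e. wait until every write op with a tid at or below want_tid has
* received its safe (committed) reply from the MDS.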
3095 */ 3096 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) 3097 { 3098 struct ceph_mds_request *req = NULL, *nextreq; 3099 struct rb_node *n; 3100 3101 mutex_lock(&mdsc->mutex); 3102 dout("wait_unsafe_requests want %lld\n", want_tid); 3103 restart: 3104 req = __get_oldest_req(mdsc); 3105 while (req && req->r_tid <= want_tid) { 3106 /* find next request */ 3107 n = rb_next(&req->r_node); 3108 if (n) 3109 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 3110 else 3111 nextreq = NULL; 3112 if ((req->r_op & CEPH_MDS_OP_WRITE)) { 3113 /* write op */ 3114 ceph_mdsc_get_request(req); 3115 if (nextreq) 3116 ceph_mdsc_get_request(nextreq); 3117 mutex_unlock(&mdsc->mutex); 3118 dout("wait_unsafe_requests wait on %llu (want %llu)\n", 3119 req->r_tid, want_tid); 3120 wait_for_completion(&req->r_safe_completion); 3121 mutex_lock(&mdsc->mutex); 3122 ceph_mdsc_put_request(req); 3123 if (!nextreq) 3124 break; /* next dne before, so we're done! */ 3125 if (RB_EMPTY_NODE(&nextreq->r_node)) { 3126 /* next request was removed from tree */ 3127 ceph_mdsc_put_request(nextreq); 3128 goto restart; 3129 } 3130 ceph_mdsc_put_request(nextreq); /* won't go away */ 3131 } 3132 req = nextreq; 3133 } 3134 mutex_unlock(&mdsc->mutex); 3135 dout("wait_unsafe_requests done\n"); 3136 } 3137 3138 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 3139 { 3140 u64 want_tid, want_flush; 3141 3142 if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN) 3143 return; 3144 3145 dout("sync\n"); 3146 mutex_lock(&mdsc->mutex); 3147 want_tid = mdsc->last_tid; 3148 want_flush = mdsc->cap_flush_seq; 3149 mutex_unlock(&mdsc->mutex); 3150 dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush); 3151 3152 ceph_flush_dirty_caps(mdsc); 3153 3154 wait_unsafe_requests(mdsc, want_tid); 3155 wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush)); 3156 } 3157 3158 /* 3159 * true if all sessions are closed, or we force unmount 3160 */ 3161 static bool done_closing_sessions(struct ceph_mds_client *mdsc) 3162 { 3163 int i, n = 0; 3164 3165 if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN) 3166 return true; 3167 3168 mutex_lock(&mdsc->mutex); 3169 for (i = 0; i < mdsc->max_sessions; i++) 3170 if (mdsc->sessions[i]) 3171 n++; 3172 mutex_unlock(&mdsc->mutex); 3173 return n == 0; 3174 } 3175 3176 /* 3177 * called after sb is ro. 
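* Ask each mds to close our session, wait (bounded by the mount timeout)
* for the sessions to close, then forcibly tear down whatever remains.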
3178 */ 3179 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 3180 { 3181 struct ceph_mds_session *session; 3182 int i; 3183 struct ceph_fs_client *fsc = mdsc->fsc; 3184 unsigned long timeout = fsc->client->options->mount_timeout * HZ; 3185 3186 dout("close_sessions\n"); 3187 3188 /* close sessions */ 3189 mutex_lock(&mdsc->mutex); 3190 for (i = 0; i < mdsc->max_sessions; i++) { 3191 session = __ceph_lookup_mds_session(mdsc, i); 3192 if (!session) 3193 continue; 3194 mutex_unlock(&mdsc->mutex); 3195 mutex_lock(&session->s_mutex); 3196 __close_session(mdsc, session); 3197 mutex_unlock(&session->s_mutex); 3198 ceph_put_mds_session(session); 3199 mutex_lock(&mdsc->mutex); 3200 } 3201 mutex_unlock(&mdsc->mutex); 3202 3203 dout("waiting for sessions to close\n"); 3204 wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc), 3205 timeout); 3206 3207 /* tear down remaining sessions */ 3208 mutex_lock(&mdsc->mutex); 3209 for (i = 0; i < mdsc->max_sessions; i++) { 3210 if (mdsc->sessions[i]) { 3211 session = get_session(mdsc->sessions[i]); 3212 __unregister_session(mdsc, session); 3213 mutex_unlock(&mdsc->mutex); 3214 mutex_lock(&session->s_mutex); 3215 remove_session_caps(session); 3216 mutex_unlock(&session->s_mutex); 3217 ceph_put_mds_session(session); 3218 mutex_lock(&mdsc->mutex); 3219 } 3220 } 3221 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 3222 mutex_unlock(&mdsc->mutex); 3223 3224 ceph_cleanup_empty_realms(mdsc); 3225 3226 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 3227 3228 dout("stopped\n"); 3229 } 3230 3231 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 3232 { 3233 dout("stop\n"); 3234 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 3235 if (mdsc->mdsmap) 3236 ceph_mdsmap_destroy(mdsc->mdsmap); 3237 kfree(mdsc->sessions); 3238 ceph_caps_finalize(mdsc); 3239 } 3240 3241 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 3242 { 3243 struct ceph_mds_client *mdsc = fsc->mdsc; 3244 3245 dout("mdsc_destroy %p\n", mdsc); 3246 ceph_mdsc_stop(mdsc); 3247 3248 /* flush out any connection work with references to us */ 3249 ceph_msgr_flush(); 3250 3251 fsc->mdsc = NULL; 3252 kfree(mdsc); 3253 dout("mdsc_destroy %p done\n", mdsc); 3254 } 3255 3256 3257 /* 3258 * handle mds map update. 3259 */ 3260 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 3261 { 3262 u32 epoch; 3263 u32 maplen; 3264 void *p = msg->front.iov_base; 3265 void *end = p + msg->front.iov_len; 3266 struct ceph_mdsmap *newmap, *oldmap; 3267 struct ceph_fsid fsid; 3268 int err = -EINVAL; 3269 3270 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 3271 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 3272 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 3273 return; 3274 epoch = ceph_decode_32(&p); 3275 maplen = ceph_decode_32(&p); 3276 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 3277 3278 /* do we need it? 
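Tell the monitor client we have seen this epoch, and ignore the map unless it is newer than the one we already have.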
*/ 3279 ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch); 3280 mutex_lock(&mdsc->mutex); 3281 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 3282 dout("handle_map epoch %u <= our %u\n", 3283 epoch, mdsc->mdsmap->m_epoch); 3284 mutex_unlock(&mdsc->mutex); 3285 return; 3286 } 3287 3288 newmap = ceph_mdsmap_decode(&p, end); 3289 if (IS_ERR(newmap)) { 3290 err = PTR_ERR(newmap); 3291 goto bad_unlock; 3292 } 3293 3294 /* swap into place */ 3295 if (mdsc->mdsmap) { 3296 oldmap = mdsc->mdsmap; 3297 mdsc->mdsmap = newmap; 3298 check_new_map(mdsc, newmap, oldmap); 3299 ceph_mdsmap_destroy(oldmap); 3300 } else { 3301 mdsc->mdsmap = newmap; /* first mds map */ 3302 } 3303 mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size; 3304 3305 __wake_requests(mdsc, &mdsc->waiting_for_map); 3306 3307 mutex_unlock(&mdsc->mutex); 3308 schedule_delayed(mdsc); 3309 return; 3310 3311 bad_unlock: 3312 mutex_unlock(&mdsc->mutex); 3313 bad: 3314 pr_err("error decoding mdsmap %d\n", err); 3315 return; 3316 } 3317 3318 static struct ceph_connection *con_get(struct ceph_connection *con) 3319 { 3320 struct ceph_mds_session *s = con->private; 3321 3322 if (get_session(s)) { 3323 dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref)); 3324 return con; 3325 } 3326 dout("mdsc con_get %p FAIL\n", s); 3327 return NULL; 3328 } 3329 3330 static void con_put(struct ceph_connection *con) 3331 { 3332 struct ceph_mds_session *s = con->private; 3333 3334 dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref) - 1); 3335 ceph_put_mds_session(s); 3336 } 3337 3338 /* 3339 * if the client is unresponsive for long enough, the mds will kill 3340 * the session entirely. 3341 */ 3342 static void peer_reset(struct ceph_connection *con) 3343 { 3344 struct ceph_mds_session *s = con->private; 3345 struct ceph_mds_client *mdsc = s->s_mdsc; 3346 3347 pr_warning("mds%d closed our session\n", s->s_mds); 3348 send_mds_reconnect(mdsc, s); 3349 } 3350 3351 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 3352 { 3353 struct ceph_mds_session *s = con->private; 3354 struct ceph_mds_client *mdsc = s->s_mdsc; 3355 int type = le16_to_cpu(msg->hdr.type); 3356 3357 mutex_lock(&mdsc->mutex); 3358 if (__verify_registered_session(mdsc, s) < 0) { 3359 mutex_unlock(&mdsc->mutex); 3360 goto out; 3361 } 3362 mutex_unlock(&mdsc->mutex); 3363 3364 switch (type) { 3365 case CEPH_MSG_MDS_MAP: 3366 ceph_mdsc_handle_map(mdsc, msg); 3367 break; 3368 case CEPH_MSG_CLIENT_SESSION: 3369 handle_session(s, msg); 3370 break; 3371 case CEPH_MSG_CLIENT_REPLY: 3372 handle_reply(s, msg); 3373 break; 3374 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 3375 handle_forward(mdsc, s, msg); 3376 break; 3377 case CEPH_MSG_CLIENT_CAPS: 3378 ceph_handle_caps(s, msg); 3379 break; 3380 case CEPH_MSG_CLIENT_SNAP: 3381 ceph_handle_snap(mdsc, s, msg); 3382 break; 3383 case CEPH_MSG_CLIENT_LEASE: 3384 handle_lease(mdsc, s, msg); 3385 break; 3386 3387 default: 3388 pr_err("received unknown message type %d %s\n", type, 3389 ceph_msg_type_name(type)); 3390 } 3391 out: 3392 ceph_msg_put(msg); 3393 } 3394 3395 /* 3396 * authentication 3397 */ 3398 static int get_authorizer(struct ceph_connection *con, 3399 void **buf, int *len, int *proto, 3400 void **reply_buf, int *reply_len, int force_new) 3401 { 3402 struct ceph_mds_session *s = con->private; 3403 struct ceph_mds_client *mdsc = s->s_mdsc; 3404 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3405 int ret = 0; 3406 3407 if (force_new && s->s_authorizer) { 3408 ac->ops->destroy_authorizer(ac, 
s->s_authorizer); 3409 s->s_authorizer = NULL; 3410 } 3411 if (s->s_authorizer == NULL) { 3412 if (ac->ops->create_authorizer) { 3413 ret = ac->ops->create_authorizer( 3414 ac, CEPH_ENTITY_TYPE_MDS, 3415 &s->s_authorizer, 3416 &s->s_authorizer_buf, 3417 &s->s_authorizer_buf_len, 3418 &s->s_authorizer_reply_buf, 3419 &s->s_authorizer_reply_buf_len); 3420 if (ret) 3421 return ret; 3422 } 3423 } 3424 3425 *proto = ac->protocol; 3426 *buf = s->s_authorizer_buf; 3427 *len = s->s_authorizer_buf_len; 3428 *reply_buf = s->s_authorizer_reply_buf; 3429 *reply_len = s->s_authorizer_reply_buf_len; 3430 return 0; 3431 } 3432 3433 3434 static int verify_authorizer_reply(struct ceph_connection *con, int len) 3435 { 3436 struct ceph_mds_session *s = con->private; 3437 struct ceph_mds_client *mdsc = s->s_mdsc; 3438 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3439 3440 return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len); 3441 } 3442 3443 static int invalidate_authorizer(struct ceph_connection *con) 3444 { 3445 struct ceph_mds_session *s = con->private; 3446 struct ceph_mds_client *mdsc = s->s_mdsc; 3447 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 3448 3449 if (ac->ops->invalidate_authorizer) 3450 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 3451 3452 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 3453 } 3454 3455 static const struct ceph_connection_operations mds_con_ops = { 3456 .get = con_get, 3457 .put = con_put, 3458 .dispatch = dispatch, 3459 .get_authorizer = get_authorizer, 3460 .verify_authorizer_reply = verify_authorizer_reply, 3461 .invalidate_authorizer = invalidate_authorizer, 3462 .peer_reset = peer_reset, 3463 }; 3464 3465 /* eof */ 3466