// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
	int nr_caps;
	struct ceph_pagelist *pagelist;
	unsigned msg_version;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = -EIO;

	info->in = *p;
	*p += sizeof(struct ceph_mds_reply_inode) +
		sizeof(*info->in->fragtree.splits) *
		le32_to_cpu(info->in->fragtree.nsplits);

	ceph_decode_32_safe(p, end, info->symlink_len, bad);
	ceph_decode_need(p, end, info->symlink_len, bad);
	info->symlink = *p;
	*p += info->symlink_len;

	ceph_decode_copy_safe(p, end, &info->dir_layout,
			      sizeof(info->dir_layout), bad);
	ceph_decode_32_safe(p, end, info->xattr_len, bad);
	ceph_decode_need(p, end, info->xattr_len, bad);
	info->xattr_data = *p;
	*p += info->xattr_len;

	if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
		ceph_decode_64_safe(p, end, info->inline_version, bad);
		ceph_decode_32_safe(p, end, info->inline_len, bad);
		ceph_decode_need(p, end, info->inline_len, bad);
		info->inline_data = *p;
		*p += info->inline_len;
	} else
		info->inline_version = CEPH_INLINE_NONE;

	if (features & CEPH_FEATURE_MDS_QUOTA) {
		u8 struct_v, struct_compat;
		u32 struct_len;

		/*
		 * both struct_v and struct_compat are expected to be >= 1
		 */
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		if (!struct_v || !struct_compat)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end,
struct_len, bad); 112 ceph_decode_64_safe(p, end, info->max_bytes, bad); 113 ceph_decode_64_safe(p, end, info->max_files, bad); 114 } else { 115 info->max_bytes = 0; 116 info->max_files = 0; 117 } 118 119 info->pool_ns_len = 0; 120 info->pool_ns_data = NULL; 121 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { 122 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 123 if (info->pool_ns_len > 0) { 124 ceph_decode_need(p, end, info->pool_ns_len, bad); 125 info->pool_ns_data = *p; 126 *p += info->pool_ns_len; 127 } 128 } 129 130 return 0; 131 bad: 132 return err; 133 } 134 135 /* 136 * parse a normal reply, which may contain a (dir+)dentry and/or a 137 * target inode. 138 */ 139 static int parse_reply_info_trace(void **p, void *end, 140 struct ceph_mds_reply_info_parsed *info, 141 u64 features) 142 { 143 int err; 144 145 if (info->head->is_dentry) { 146 err = parse_reply_info_in(p, end, &info->diri, features); 147 if (err < 0) 148 goto out_bad; 149 150 if (unlikely(*p + sizeof(*info->dirfrag) > end)) 151 goto bad; 152 info->dirfrag = *p; 153 *p += sizeof(*info->dirfrag) + 154 sizeof(u32)*le32_to_cpu(info->dirfrag->ndist); 155 if (unlikely(*p > end)) 156 goto bad; 157 158 ceph_decode_32_safe(p, end, info->dname_len, bad); 159 ceph_decode_need(p, end, info->dname_len, bad); 160 info->dname = *p; 161 *p += info->dname_len; 162 info->dlease = *p; 163 *p += sizeof(*info->dlease); 164 } 165 166 if (info->head->is_target) { 167 err = parse_reply_info_in(p, end, &info->targeti, features); 168 if (err < 0) 169 goto out_bad; 170 } 171 172 if (unlikely(*p != end)) 173 goto bad; 174 return 0; 175 176 bad: 177 err = -EIO; 178 out_bad: 179 pr_err("problem parsing mds trace %d\n", err); 180 return err; 181 } 182 183 /* 184 * parse readdir results 185 */ 186 static int parse_reply_info_dir(void **p, void *end, 187 struct ceph_mds_reply_info_parsed *info, 188 u64 features) 189 { 190 u32 num, i = 0; 191 int err; 192 193 info->dir_dir = *p; 194 if (*p + sizeof(*info->dir_dir) > end) 195 goto bad; 196 *p += sizeof(*info->dir_dir) + 197 sizeof(u32)*le32_to_cpu(info->dir_dir->ndist); 198 if (*p > end) 199 goto bad; 200 201 ceph_decode_need(p, end, sizeof(num) + 2, bad); 202 num = ceph_decode_32(p); 203 { 204 u16 flags = ceph_decode_16(p); 205 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); 206 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); 207 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); 208 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH); 209 } 210 if (num == 0) 211 goto done; 212 213 BUG_ON(!info->dir_entries); 214 if ((unsigned long)(info->dir_entries + num) > 215 (unsigned long)info->dir_entries + info->dir_buf_size) { 216 pr_err("dir contents are larger than expected\n"); 217 WARN_ON(1); 218 goto bad; 219 } 220 221 info->dir_nr = num; 222 while (num) { 223 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 224 /* dentry */ 225 ceph_decode_need(p, end, sizeof(u32)*2, bad); 226 rde->name_len = ceph_decode_32(p); 227 ceph_decode_need(p, end, rde->name_len, bad); 228 rde->name = *p; 229 *p += rde->name_len; 230 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name); 231 rde->lease = *p; 232 *p += sizeof(struct ceph_mds_reply_lease); 233 234 /* inode */ 235 err = parse_reply_info_in(p, end, &rde->inode, features); 236 if (err < 0) 237 goto out_bad; 238 /* ceph_readdir_prepopulate() will update it */ 239 rde->offset = 0; 240 i++; 241 num--; 242 } 243 244 done: 245 if (*p != end) 246 goto bad; 247 return 0; 248 249 bad: 250 err = -EIO; 251 out_bad: 252 
pr_err("problem parsing dir contents %d\n", err); 253 return err; 254 } 255 256 /* 257 * parse fcntl F_GETLK results 258 */ 259 static int parse_reply_info_filelock(void **p, void *end, 260 struct ceph_mds_reply_info_parsed *info, 261 u64 features) 262 { 263 if (*p + sizeof(*info->filelock_reply) > end) 264 goto bad; 265 266 info->filelock_reply = *p; 267 *p += sizeof(*info->filelock_reply); 268 269 if (unlikely(*p != end)) 270 goto bad; 271 return 0; 272 273 bad: 274 return -EIO; 275 } 276 277 /* 278 * parse create results 279 */ 280 static int parse_reply_info_create(void **p, void *end, 281 struct ceph_mds_reply_info_parsed *info, 282 u64 features) 283 { 284 if (features & CEPH_FEATURE_REPLY_CREATE_INODE) { 285 if (*p == end) { 286 info->has_create_ino = false; 287 } else { 288 info->has_create_ino = true; 289 info->ino = ceph_decode_64(p); 290 } 291 } 292 293 if (unlikely(*p != end)) 294 goto bad; 295 return 0; 296 297 bad: 298 return -EIO; 299 } 300 301 /* 302 * parse extra results 303 */ 304 static int parse_reply_info_extra(void **p, void *end, 305 struct ceph_mds_reply_info_parsed *info, 306 u64 features) 307 { 308 u32 op = le32_to_cpu(info->head->op); 309 310 if (op == CEPH_MDS_OP_GETFILELOCK) 311 return parse_reply_info_filelock(p, end, info, features); 312 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) 313 return parse_reply_info_dir(p, end, info, features); 314 else if (op == CEPH_MDS_OP_CREATE) 315 return parse_reply_info_create(p, end, info, features); 316 else 317 return -EIO; 318 } 319 320 /* 321 * parse entire mds reply 322 */ 323 static int parse_reply_info(struct ceph_msg *msg, 324 struct ceph_mds_reply_info_parsed *info, 325 u64 features) 326 { 327 void *p, *end; 328 u32 len; 329 int err; 330 331 info->head = msg->front.iov_base; 332 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 333 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 334 335 /* trace */ 336 ceph_decode_32_safe(&p, end, len, bad); 337 if (len > 0) { 338 ceph_decode_need(&p, end, len, bad); 339 err = parse_reply_info_trace(&p, p+len, info, features); 340 if (err < 0) 341 goto out_bad; 342 } 343 344 /* extra */ 345 ceph_decode_32_safe(&p, end, len, bad); 346 if (len > 0) { 347 ceph_decode_need(&p, end, len, bad); 348 err = parse_reply_info_extra(&p, p+len, info, features); 349 if (err < 0) 350 goto out_bad; 351 } 352 353 /* snap blob */ 354 ceph_decode_32_safe(&p, end, len, bad); 355 info->snapblob_len = len; 356 info->snapblob = p; 357 p += len; 358 359 if (p != end) 360 goto bad; 361 return 0; 362 363 bad: 364 err = -EIO; 365 out_bad: 366 pr_err("mds parse_reply err %d\n", err); 367 return err; 368 } 369 370 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 371 { 372 if (!info->dir_entries) 373 return; 374 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size)); 375 } 376 377 378 /* 379 * sessions 380 */ 381 const char *ceph_session_state_name(int s) 382 { 383 switch (s) { 384 case CEPH_MDS_SESSION_NEW: return "new"; 385 case CEPH_MDS_SESSION_OPENING: return "opening"; 386 case CEPH_MDS_SESSION_OPEN: return "open"; 387 case CEPH_MDS_SESSION_HUNG: return "hung"; 388 case CEPH_MDS_SESSION_CLOSING: return "closing"; 389 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 390 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 391 case CEPH_MDS_SESSION_REJECTED: return "rejected"; 392 default: return "???"; 393 } 394 } 395 396 static struct ceph_mds_session *get_session(struct ceph_mds_session *s) 397 { 398 if 
(refcount_inc_not_zero(&s->s_ref)) { 399 dout("mdsc get_session %p %d -> %d\n", s, 400 refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref)); 401 return s; 402 } else { 403 dout("mdsc get_session %p 0 -- FAIL\n", s); 404 return NULL; 405 } 406 } 407 408 void ceph_put_mds_session(struct ceph_mds_session *s) 409 { 410 dout("mdsc put_session %p %d -> %d\n", s, 411 refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1); 412 if (refcount_dec_and_test(&s->s_ref)) { 413 if (s->s_auth.authorizer) 414 ceph_auth_destroy_authorizer(s->s_auth.authorizer); 415 kfree(s); 416 } 417 } 418 419 /* 420 * called under mdsc->mutex 421 */ 422 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 423 int mds) 424 { 425 struct ceph_mds_session *session; 426 427 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 428 return NULL; 429 session = mdsc->sessions[mds]; 430 dout("lookup_mds_session %p %d\n", session, 431 refcount_read(&session->s_ref)); 432 get_session(session); 433 return session; 434 } 435 436 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 437 { 438 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 439 return false; 440 else 441 return true; 442 } 443 444 static int __verify_registered_session(struct ceph_mds_client *mdsc, 445 struct ceph_mds_session *s) 446 { 447 if (s->s_mds >= mdsc->max_sessions || 448 mdsc->sessions[s->s_mds] != s) 449 return -ENOENT; 450 return 0; 451 } 452 453 /* 454 * create+register a new session for given mds. 455 * called under mdsc->mutex. 456 */ 457 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 458 int mds) 459 { 460 struct ceph_mds_session *s; 461 462 if (mds >= mdsc->mdsmap->m_num_mds) 463 return ERR_PTR(-EINVAL); 464 465 s = kzalloc(sizeof(*s), GFP_NOFS); 466 if (!s) 467 return ERR_PTR(-ENOMEM); 468 469 if (mds >= mdsc->max_sessions) { 470 int newmax = 1 << get_count_order(mds + 1); 471 struct ceph_mds_session **sa; 472 473 dout("%s: realloc to %d\n", __func__, newmax); 474 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); 475 if (!sa) 476 goto fail_realloc; 477 if (mdsc->sessions) { 478 memcpy(sa, mdsc->sessions, 479 mdsc->max_sessions * sizeof(void *)); 480 kfree(mdsc->sessions); 481 } 482 mdsc->sessions = sa; 483 mdsc->max_sessions = newmax; 484 } 485 486 dout("%s: mds%d\n", __func__, mds); 487 s->s_mdsc = mdsc; 488 s->s_mds = mds; 489 s->s_state = CEPH_MDS_SESSION_NEW; 490 s->s_ttl = 0; 491 s->s_seq = 0; 492 mutex_init(&s->s_mutex); 493 494 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); 495 496 spin_lock_init(&s->s_gen_ttl_lock); 497 s->s_cap_gen = 0; 498 s->s_cap_ttl = jiffies - 1; 499 500 spin_lock_init(&s->s_cap_lock); 501 s->s_renew_requested = 0; 502 s->s_renew_seq = 0; 503 INIT_LIST_HEAD(&s->s_caps); 504 s->s_nr_caps = 0; 505 s->s_trim_caps = 0; 506 refcount_set(&s->s_ref, 1); 507 INIT_LIST_HEAD(&s->s_waiting); 508 INIT_LIST_HEAD(&s->s_unsafe); 509 s->s_num_cap_releases = 0; 510 s->s_cap_reconnect = 0; 511 s->s_cap_iterator = NULL; 512 INIT_LIST_HEAD(&s->s_cap_releases); 513 INIT_LIST_HEAD(&s->s_cap_flushing); 514 515 mdsc->sessions[mds] = s; 516 atomic_inc(&mdsc->num_sessions); 517 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 518 519 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 520 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 521 522 return s; 523 524 fail_realloc: 525 kfree(s); 526 return ERR_PTR(-ENOMEM); 527 } 528 529 /* 530 * called under mdsc->mutex 531 */ 532 static void __unregister_session(struct ceph_mds_client *mdsc, 
				 struct ceph_mds_session *s)
{
	dout("__unregister_session mds%d %p\n", s->s_mds, s);
	BUG_ON(mdsc->sessions[s->s_mds] != s);
	mdsc->sessions[s->s_mds] = NULL;
	ceph_con_close(&s->s_con);
	ceph_put_mds_session(s);
	atomic_dec(&mdsc->num_sessions);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
	if (req->r_session) {
		ceph_put_mds_session(req->r_session);
		req->r_session = NULL;
	}
}

void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	destroy_reply_info(&req->r_reply_info);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_inode) {
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		iput(req->r_inode);
	}
	if (req->r_parent)
		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
	iput(req->r_target_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry)
		dput(req->r_old_dentry);
	if (req->r_old_dentry_dir) {
		/*
		 * track (and drop pins for) r_old_dentry_dir
		 * separately, since r_old_dentry's d_parent may have
		 * changed between the dir mutex being dropped and
		 * this request being freed.
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	if (req->r_pagelist)
		ceph_pagelist_release(req->r_pagelist);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	kfree(req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
	struct ceph_mds_request *req;

	req = lookup_request(&mdsc->request_tree, tid);
	if (req)
		ceph_mdsc_get_request(req);

	return req;
}

/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
			       struct ceph_mds_request *req,
			       struct inode *dir)
{
	int ret = 0;

	req->r_tid = ++mdsc->last_tid;
	if (req->r_num_caps) {
		ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
					req->r_num_caps);
		if (ret < 0) {
			pr_err("__register_request %p "
			       "failed to reserve caps: %d\n", req, ret);
			/* set req->r_err to fail early from __do_request */
			req->r_err = ret;
			return;
		}
	}
	dout("__register_request %p tid %lld\n", req, req->r_tid);
	ceph_mdsc_get_request(req);
	insert_request(&mdsc->request_tree, req);

	req->r_uid = current_fsuid();
	req->r_gid = current_fsgid();

	if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
		mdsc->oldest_tid = req->r_tid;

	if (dir) {
		ihold(dir);
		req->r_unsafe_dir = dir;
	}
}

static void __unregister_request(struct ceph_mds_client *mdsc,
				 struct ceph_mds_request *req)
{
	dout("__unregister_request %p tid %lld\n", req, req->r_tid);

	/* Never leave an unregistered request on an unsafe list!
*/ 662 list_del_init(&req->r_unsafe_item); 663 664 if (req->r_tid == mdsc->oldest_tid) { 665 struct rb_node *p = rb_next(&req->r_node); 666 mdsc->oldest_tid = 0; 667 while (p) { 668 struct ceph_mds_request *next_req = 669 rb_entry(p, struct ceph_mds_request, r_node); 670 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 671 mdsc->oldest_tid = next_req->r_tid; 672 break; 673 } 674 p = rb_next(p); 675 } 676 } 677 678 erase_request(&mdsc->request_tree, req); 679 680 if (req->r_unsafe_dir && 681 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 682 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 683 spin_lock(&ci->i_unsafe_lock); 684 list_del_init(&req->r_unsafe_dir_item); 685 spin_unlock(&ci->i_unsafe_lock); 686 } 687 if (req->r_target_inode && 688 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 689 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 690 spin_lock(&ci->i_unsafe_lock); 691 list_del_init(&req->r_unsafe_target_item); 692 spin_unlock(&ci->i_unsafe_lock); 693 } 694 695 if (req->r_unsafe_dir) { 696 iput(req->r_unsafe_dir); 697 req->r_unsafe_dir = NULL; 698 } 699 700 complete_all(&req->r_safe_completion); 701 702 ceph_mdsc_put_request(req); 703 } 704 705 /* 706 * Walk back up the dentry tree until we hit a dentry representing a 707 * non-snapshot inode. We do this using the rcu_read_lock (which must be held 708 * when calling this) to ensure that the objects won't disappear while we're 709 * working with them. Once we hit a candidate dentry, we attempt to take a 710 * reference to it, and return that as the result. 711 */ 712 static struct inode *get_nonsnap_parent(struct dentry *dentry) 713 { 714 struct inode *inode = NULL; 715 716 while (dentry && !IS_ROOT(dentry)) { 717 inode = d_inode_rcu(dentry); 718 if (!inode || ceph_snap(inode) == CEPH_NOSNAP) 719 break; 720 dentry = dentry->d_parent; 721 } 722 if (inode) 723 inode = igrab(inode); 724 return inode; 725 } 726 727 /* 728 * Choose mds to send request to next. If there is a hint set in the 729 * request (e.g., due to a prior forward hint from the mds), use that. 730 * Otherwise, consult frag tree and/or caps to identify the 731 * appropriate mds. If all else fails, choose randomly. 732 * 733 * Called under mdsc->mutex. 734 */ 735 static int __choose_mds(struct ceph_mds_client *mdsc, 736 struct ceph_mds_request *req) 737 { 738 struct inode *inode; 739 struct ceph_inode_info *ci; 740 struct ceph_cap *cap; 741 int mode = req->r_direct_mode; 742 int mds = -1; 743 u32 hash = req->r_direct_hash; 744 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); 745 746 /* 747 * is there a specific mds we should try? ignore hint if we have 748 * no session and the mds is not up (active or recovering). 
749 */ 750 if (req->r_resend_mds >= 0 && 751 (__have_session(mdsc, req->r_resend_mds) || 752 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 753 dout("choose_mds using resend_mds mds%d\n", 754 req->r_resend_mds); 755 return req->r_resend_mds; 756 } 757 758 if (mode == USE_RANDOM_MDS) 759 goto random; 760 761 inode = NULL; 762 if (req->r_inode) { 763 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) { 764 inode = req->r_inode; 765 ihold(inode); 766 } else { 767 /* req->r_dentry is non-null for LSSNAP request */ 768 rcu_read_lock(); 769 inode = get_nonsnap_parent(req->r_dentry); 770 rcu_read_unlock(); 771 dout("__choose_mds using snapdir's parent %p\n", inode); 772 } 773 } else if (req->r_dentry) { 774 /* ignore race with rename; old or new d_parent is okay */ 775 struct dentry *parent; 776 struct inode *dir; 777 778 rcu_read_lock(); 779 parent = req->r_dentry->d_parent; 780 dir = req->r_parent ? : d_inode_rcu(parent); 781 782 if (!dir || dir->i_sb != mdsc->fsc->sb) { 783 /* not this fs or parent went negative */ 784 inode = d_inode(req->r_dentry); 785 if (inode) 786 ihold(inode); 787 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 788 /* direct snapped/virtual snapdir requests 789 * based on parent dir inode */ 790 inode = get_nonsnap_parent(parent); 791 dout("__choose_mds using nonsnap parent %p\n", inode); 792 } else { 793 /* dentry target */ 794 inode = d_inode(req->r_dentry); 795 if (!inode || mode == USE_AUTH_MDS) { 796 /* dir + name */ 797 inode = igrab(dir); 798 hash = ceph_dentry_hash(dir, req->r_dentry); 799 is_hash = true; 800 } else { 801 ihold(inode); 802 } 803 } 804 rcu_read_unlock(); 805 } 806 807 dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash, 808 (int)hash, mode); 809 if (!inode) 810 goto random; 811 ci = ceph_inode(inode); 812 813 if (is_hash && S_ISDIR(inode->i_mode)) { 814 struct ceph_inode_frag frag; 815 int found; 816 817 ceph_choose_frag(ci, hash, &frag, &found); 818 if (found) { 819 if (mode == USE_ANY_MDS && frag.ndist > 0) { 820 u8 r; 821 822 /* choose a random replica */ 823 get_random_bytes(&r, 1); 824 r %= frag.ndist; 825 mds = frag.dist[r]; 826 dout("choose_mds %p %llx.%llx " 827 "frag %u mds%d (%d/%d)\n", 828 inode, ceph_vinop(inode), 829 frag.frag, mds, 830 (int)r, frag.ndist); 831 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 832 CEPH_MDS_STATE_ACTIVE) 833 goto out; 834 } 835 836 /* since this file/dir wasn't known to be 837 * replicated, then we want to look for the 838 * authoritative mds. */ 839 mode = USE_AUTH_MDS; 840 if (frag.mds >= 0) { 841 /* choose auth mds */ 842 mds = frag.mds; 843 dout("choose_mds %p %llx.%llx " 844 "frag %u mds%d (auth)\n", 845 inode, ceph_vinop(inode), frag.frag, mds); 846 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 847 CEPH_MDS_STATE_ACTIVE) 848 goto out; 849 } 850 } 851 } 852 853 spin_lock(&ci->i_ceph_lock); 854 cap = NULL; 855 if (mode == USE_AUTH_MDS) 856 cap = ci->i_auth_cap; 857 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 858 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 859 if (!cap) { 860 spin_unlock(&ci->i_ceph_lock); 861 iput(inode); 862 goto random; 863 } 864 mds = cap->session->s_mds; 865 dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n", 866 inode, ceph_vinop(inode), mds, 867 cap == ci->i_auth_cap ? 
"auth " : "", cap); 868 spin_unlock(&ci->i_ceph_lock); 869 out: 870 iput(inode); 871 return mds; 872 873 random: 874 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 875 dout("choose_mds chose random mds%d\n", mds); 876 return mds; 877 } 878 879 880 /* 881 * session messages 882 */ 883 static struct ceph_msg *create_session_msg(u32 op, u64 seq) 884 { 885 struct ceph_msg *msg; 886 struct ceph_mds_session_head *h; 887 888 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 889 false); 890 if (!msg) { 891 pr_err("create_session_msg ENOMEM creating msg\n"); 892 return NULL; 893 } 894 h = msg->front.iov_base; 895 h->op = cpu_to_le32(op); 896 h->seq = cpu_to_le64(seq); 897 898 return msg; 899 } 900 901 static void encode_supported_features(void **p, void *end) 902 { 903 static const unsigned char bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED; 904 static const size_t count = ARRAY_SIZE(bits); 905 906 if (count > 0) { 907 size_t i; 908 size_t size = ((size_t)bits[count - 1] + 64) / 64 * 8; 909 910 BUG_ON(*p + 4 + size > end); 911 ceph_encode_32(p, size); 912 memset(*p, 0, size); 913 for (i = 0; i < count; i++) 914 ((unsigned char*)(*p))[i / 8] |= 1 << (bits[i] % 8); 915 *p += size; 916 } else { 917 BUG_ON(*p + 4 > end); 918 ceph_encode_32(p, 0); 919 } 920 } 921 922 /* 923 * session message, specialization for CEPH_SESSION_REQUEST_OPEN 924 * to include additional client metadata fields. 925 */ 926 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq) 927 { 928 struct ceph_msg *msg; 929 struct ceph_mds_session_head *h; 930 int i = -1; 931 int extra_bytes = 0; 932 int metadata_key_count = 0; 933 struct ceph_options *opt = mdsc->fsc->client->options; 934 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; 935 void *p, *end; 936 937 const char* metadata[][2] = { 938 {"hostname", mdsc->nodename}, 939 {"kernel_version", init_utsname()->release}, 940 {"entity_id", opt->name ? : ""}, 941 {"root", fsopt->server_path ? 
: "/"}, 942 {NULL, NULL} 943 }; 944 945 /* Calculate serialized length of metadata */ 946 extra_bytes = 4; /* map length */ 947 for (i = 0; metadata[i][0]; ++i) { 948 extra_bytes += 8 + strlen(metadata[i][0]) + 949 strlen(metadata[i][1]); 950 metadata_key_count++; 951 } 952 /* supported feature */ 953 extra_bytes += 4 + 8; 954 955 /* Allocate the message */ 956 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, 957 GFP_NOFS, false); 958 if (!msg) { 959 pr_err("create_session_msg ENOMEM creating msg\n"); 960 return NULL; 961 } 962 p = msg->front.iov_base; 963 end = p + msg->front.iov_len; 964 965 h = p; 966 h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN); 967 h->seq = cpu_to_le64(seq); 968 969 /* 970 * Serialize client metadata into waiting buffer space, using 971 * the format that userspace expects for map<string, string> 972 * 973 * ClientSession messages with metadata are v2 974 */ 975 msg->hdr.version = cpu_to_le16(3); 976 msg->hdr.compat_version = cpu_to_le16(1); 977 978 /* The write pointer, following the session_head structure */ 979 p += sizeof(*h); 980 981 /* Number of entries in the map */ 982 ceph_encode_32(&p, metadata_key_count); 983 984 /* Two length-prefixed strings for each entry in the map */ 985 for (i = 0; metadata[i][0]; ++i) { 986 size_t const key_len = strlen(metadata[i][0]); 987 size_t const val_len = strlen(metadata[i][1]); 988 989 ceph_encode_32(&p, key_len); 990 memcpy(p, metadata[i][0], key_len); 991 p += key_len; 992 ceph_encode_32(&p, val_len); 993 memcpy(p, metadata[i][1], val_len); 994 p += val_len; 995 } 996 997 encode_supported_features(&p, end); 998 msg->front.iov_len = p - msg->front.iov_base; 999 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1000 1001 return msg; 1002 } 1003 1004 /* 1005 * send session open request. 1006 * 1007 * called under mdsc->mutex 1008 */ 1009 static int __open_session(struct ceph_mds_client *mdsc, 1010 struct ceph_mds_session *session) 1011 { 1012 struct ceph_msg *msg; 1013 int mstate; 1014 int mds = session->s_mds; 1015 1016 /* wait for mds to go active? 
*/ 1017 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 1018 dout("open_session to mds%d (%s)\n", mds, 1019 ceph_mds_state_name(mstate)); 1020 session->s_state = CEPH_MDS_SESSION_OPENING; 1021 session->s_renew_requested = jiffies; 1022 1023 /* send connect message */ 1024 msg = create_session_open_msg(mdsc, session->s_seq); 1025 if (!msg) 1026 return -ENOMEM; 1027 ceph_con_send(&session->s_con, msg); 1028 return 0; 1029 } 1030 1031 /* 1032 * open sessions for any export targets for the given mds 1033 * 1034 * called under mdsc->mutex 1035 */ 1036 static struct ceph_mds_session * 1037 __open_export_target_session(struct ceph_mds_client *mdsc, int target) 1038 { 1039 struct ceph_mds_session *session; 1040 1041 session = __ceph_lookup_mds_session(mdsc, target); 1042 if (!session) { 1043 session = register_session(mdsc, target); 1044 if (IS_ERR(session)) 1045 return session; 1046 } 1047 if (session->s_state == CEPH_MDS_SESSION_NEW || 1048 session->s_state == CEPH_MDS_SESSION_CLOSING) 1049 __open_session(mdsc, session); 1050 1051 return session; 1052 } 1053 1054 struct ceph_mds_session * 1055 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) 1056 { 1057 struct ceph_mds_session *session; 1058 1059 dout("open_export_target_session to mds%d\n", target); 1060 1061 mutex_lock(&mdsc->mutex); 1062 session = __open_export_target_session(mdsc, target); 1063 mutex_unlock(&mdsc->mutex); 1064 1065 return session; 1066 } 1067 1068 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 1069 struct ceph_mds_session *session) 1070 { 1071 struct ceph_mds_info *mi; 1072 struct ceph_mds_session *ts; 1073 int i, mds = session->s_mds; 1074 1075 if (mds >= mdsc->mdsmap->m_num_mds) 1076 return; 1077 1078 mi = &mdsc->mdsmap->m_info[mds]; 1079 dout("open_export_target_sessions for mds%d (%d targets)\n", 1080 session->s_mds, mi->num_export_targets); 1081 1082 for (i = 0; i < mi->num_export_targets; i++) { 1083 ts = __open_export_target_session(mdsc, mi->export_targets[i]); 1084 if (!IS_ERR(ts)) 1085 ceph_put_mds_session(ts); 1086 } 1087 } 1088 1089 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 1090 struct ceph_mds_session *session) 1091 { 1092 mutex_lock(&mdsc->mutex); 1093 __open_export_target_sessions(mdsc, session); 1094 mutex_unlock(&mdsc->mutex); 1095 } 1096 1097 /* 1098 * session caps 1099 */ 1100 1101 static void detach_cap_releases(struct ceph_mds_session *session, 1102 struct list_head *target) 1103 { 1104 lockdep_assert_held(&session->s_cap_lock); 1105 1106 list_splice_init(&session->s_cap_releases, target); 1107 session->s_num_cap_releases = 0; 1108 dout("dispose_cap_releases mds%d\n", session->s_mds); 1109 } 1110 1111 static void dispose_cap_releases(struct ceph_mds_client *mdsc, 1112 struct list_head *dispose) 1113 { 1114 while (!list_empty(dispose)) { 1115 struct ceph_cap *cap; 1116 /* zero out the in-progress message */ 1117 cap = list_first_entry(dispose, struct ceph_cap, session_caps); 1118 list_del(&cap->session_caps); 1119 ceph_put_cap(mdsc, cap); 1120 } 1121 } 1122 1123 static void cleanup_session_requests(struct ceph_mds_client *mdsc, 1124 struct ceph_mds_session *session) 1125 { 1126 struct ceph_mds_request *req; 1127 struct rb_node *p; 1128 1129 dout("cleanup_session_requests mds%d\n", session->s_mds); 1130 mutex_lock(&mdsc->mutex); 1131 while (!list_empty(&session->s_unsafe)) { 1132 req = list_first_entry(&session->s_unsafe, 1133 struct ceph_mds_request, r_unsafe_item); 1134 pr_warn_ratelimited(" dropping unsafe request 
%llu\n", 1135 req->r_tid); 1136 __unregister_request(mdsc, req); 1137 } 1138 /* zero r_attempts, so kick_requests() will re-send requests */ 1139 p = rb_first(&mdsc->request_tree); 1140 while (p) { 1141 req = rb_entry(p, struct ceph_mds_request, r_node); 1142 p = rb_next(p); 1143 if (req->r_session && 1144 req->r_session->s_mds == session->s_mds) 1145 req->r_attempts = 0; 1146 } 1147 mutex_unlock(&mdsc->mutex); 1148 } 1149 1150 /* 1151 * Helper to safely iterate over all caps associated with a session, with 1152 * special care taken to handle a racing __ceph_remove_cap(). 1153 * 1154 * Caller must hold session s_mutex. 1155 */ 1156 static int iterate_session_caps(struct ceph_mds_session *session, 1157 int (*cb)(struct inode *, struct ceph_cap *, 1158 void *), void *arg) 1159 { 1160 struct list_head *p; 1161 struct ceph_cap *cap; 1162 struct inode *inode, *last_inode = NULL; 1163 struct ceph_cap *old_cap = NULL; 1164 int ret; 1165 1166 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 1167 spin_lock(&session->s_cap_lock); 1168 p = session->s_caps.next; 1169 while (p != &session->s_caps) { 1170 cap = list_entry(p, struct ceph_cap, session_caps); 1171 inode = igrab(&cap->ci->vfs_inode); 1172 if (!inode) { 1173 p = p->next; 1174 continue; 1175 } 1176 session->s_cap_iterator = cap; 1177 spin_unlock(&session->s_cap_lock); 1178 1179 if (last_inode) { 1180 iput(last_inode); 1181 last_inode = NULL; 1182 } 1183 if (old_cap) { 1184 ceph_put_cap(session->s_mdsc, old_cap); 1185 old_cap = NULL; 1186 } 1187 1188 ret = cb(inode, cap, arg); 1189 last_inode = inode; 1190 1191 spin_lock(&session->s_cap_lock); 1192 p = p->next; 1193 if (!cap->ci) { 1194 dout("iterate_session_caps finishing cap %p removal\n", 1195 cap); 1196 BUG_ON(cap->session != session); 1197 cap->session = NULL; 1198 list_del_init(&cap->session_caps); 1199 session->s_nr_caps--; 1200 if (cap->queue_release) { 1201 list_add_tail(&cap->session_caps, 1202 &session->s_cap_releases); 1203 session->s_num_cap_releases++; 1204 } else { 1205 old_cap = cap; /* put_cap it w/o locks held */ 1206 } 1207 } 1208 if (ret < 0) 1209 goto out; 1210 } 1211 ret = 0; 1212 out: 1213 session->s_cap_iterator = NULL; 1214 spin_unlock(&session->s_cap_lock); 1215 1216 iput(last_inode); 1217 if (old_cap) 1218 ceph_put_cap(session->s_mdsc, old_cap); 1219 1220 return ret; 1221 } 1222 1223 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 1224 void *arg) 1225 { 1226 struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg; 1227 struct ceph_inode_info *ci = ceph_inode(inode); 1228 LIST_HEAD(to_remove); 1229 bool drop = false; 1230 bool invalidate = false; 1231 1232 dout("removing cap %p, ci is %p, inode is %p\n", 1233 cap, ci, &ci->vfs_inode); 1234 spin_lock(&ci->i_ceph_lock); 1235 if (cap->mds_wanted | cap->issued) 1236 ci->i_ceph_flags |= CEPH_I_CAP_DROPPED; 1237 __ceph_remove_cap(cap, false); 1238 if (!ci->i_auth_cap) { 1239 struct ceph_cap_flush *cf; 1240 struct ceph_mds_client *mdsc = fsc->mdsc; 1241 1242 if (ci->i_wrbuffer_ref > 0 && 1243 READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 1244 invalidate = true; 1245 1246 while (!list_empty(&ci->i_cap_flush_list)) { 1247 cf = list_first_entry(&ci->i_cap_flush_list, 1248 struct ceph_cap_flush, i_list); 1249 list_move(&cf->i_list, &to_remove); 1250 } 1251 1252 spin_lock(&mdsc->cap_dirty_lock); 1253 1254 list_for_each_entry(cf, &to_remove, i_list) 1255 list_del(&cf->g_list); 1256 1257 if (!list_empty(&ci->i_dirty_item)) { 1258 pr_warn_ratelimited( 1259 " dropping dirty %s 
state for %p %lld\n", 1260 ceph_cap_string(ci->i_dirty_caps), 1261 inode, ceph_ino(inode)); 1262 ci->i_dirty_caps = 0; 1263 list_del_init(&ci->i_dirty_item); 1264 drop = true; 1265 } 1266 if (!list_empty(&ci->i_flushing_item)) { 1267 pr_warn_ratelimited( 1268 " dropping dirty+flushing %s state for %p %lld\n", 1269 ceph_cap_string(ci->i_flushing_caps), 1270 inode, ceph_ino(inode)); 1271 ci->i_flushing_caps = 0; 1272 list_del_init(&ci->i_flushing_item); 1273 mdsc->num_cap_flushing--; 1274 drop = true; 1275 } 1276 spin_unlock(&mdsc->cap_dirty_lock); 1277 1278 if (atomic_read(&ci->i_filelock_ref) > 0) { 1279 /* make further file lock syscall return -EIO */ 1280 ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK; 1281 pr_warn_ratelimited(" dropping file locks for %p %lld\n", 1282 inode, ceph_ino(inode)); 1283 } 1284 1285 if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { 1286 list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove); 1287 ci->i_prealloc_cap_flush = NULL; 1288 } 1289 } 1290 spin_unlock(&ci->i_ceph_lock); 1291 while (!list_empty(&to_remove)) { 1292 struct ceph_cap_flush *cf; 1293 cf = list_first_entry(&to_remove, 1294 struct ceph_cap_flush, i_list); 1295 list_del(&cf->i_list); 1296 ceph_free_cap_flush(cf); 1297 } 1298 1299 wake_up_all(&ci->i_cap_wq); 1300 if (invalidate) 1301 ceph_queue_invalidate(inode); 1302 if (drop) 1303 iput(inode); 1304 return 0; 1305 } 1306 1307 /* 1308 * caller must hold session s_mutex 1309 */ 1310 static void remove_session_caps(struct ceph_mds_session *session) 1311 { 1312 struct ceph_fs_client *fsc = session->s_mdsc->fsc; 1313 struct super_block *sb = fsc->sb; 1314 LIST_HEAD(dispose); 1315 1316 dout("remove_session_caps on %p\n", session); 1317 iterate_session_caps(session, remove_session_caps_cb, fsc); 1318 1319 wake_up_all(&fsc->mdsc->cap_flushing_wq); 1320 1321 spin_lock(&session->s_cap_lock); 1322 if (session->s_nr_caps > 0) { 1323 struct inode *inode; 1324 struct ceph_cap *cap, *prev = NULL; 1325 struct ceph_vino vino; 1326 /* 1327 * iterate_session_caps() skips inodes that are being 1328 * deleted, we need to wait until deletions are complete. 1329 * __wait_on_freeing_inode() is designed for the job, 1330 * but it is not exported, so use lookup inode function 1331 * to access it. 1332 */ 1333 while (!list_empty(&session->s_caps)) { 1334 cap = list_entry(session->s_caps.next, 1335 struct ceph_cap, session_caps); 1336 if (cap == prev) 1337 break; 1338 prev = cap; 1339 vino = cap->ci->i_vino; 1340 spin_unlock(&session->s_cap_lock); 1341 1342 inode = ceph_find_inode(sb, vino); 1343 iput(inode); 1344 1345 spin_lock(&session->s_cap_lock); 1346 } 1347 } 1348 1349 // drop cap expires and unlock s_cap_lock 1350 detach_cap_releases(session, &dispose); 1351 1352 BUG_ON(session->s_nr_caps > 0); 1353 BUG_ON(!list_empty(&session->s_cap_flushing)); 1354 spin_unlock(&session->s_cap_lock); 1355 dispose_cap_releases(session->s_mdsc, &dispose); 1356 } 1357 1358 enum { 1359 RECONNECT, 1360 RENEWCAPS, 1361 FORCE_RO, 1362 }; 1363 1364 /* 1365 * wake up any threads waiting on this session's caps. if the cap is 1366 * old (didn't get renewed on the client reconnect), remove it now. 1367 * 1368 * caller must hold s_mutex. 
1369 */ 1370 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, 1371 void *arg) 1372 { 1373 struct ceph_inode_info *ci = ceph_inode(inode); 1374 unsigned long ev = (unsigned long)arg; 1375 1376 if (ev == RECONNECT) { 1377 spin_lock(&ci->i_ceph_lock); 1378 ci->i_wanted_max_size = 0; 1379 ci->i_requested_max_size = 0; 1380 spin_unlock(&ci->i_ceph_lock); 1381 } else if (ev == RENEWCAPS) { 1382 if (cap->cap_gen < cap->session->s_cap_gen) { 1383 /* mds did not re-issue stale cap */ 1384 spin_lock(&ci->i_ceph_lock); 1385 cap->issued = cap->implemented = CEPH_CAP_PIN; 1386 /* make sure mds knows what we want */ 1387 if (__ceph_caps_file_wanted(ci) & ~cap->mds_wanted) 1388 ci->i_ceph_flags |= CEPH_I_CAP_DROPPED; 1389 spin_unlock(&ci->i_ceph_lock); 1390 } 1391 } else if (ev == FORCE_RO) { 1392 } 1393 wake_up_all(&ci->i_cap_wq); 1394 return 0; 1395 } 1396 1397 static void wake_up_session_caps(struct ceph_mds_session *session, int ev) 1398 { 1399 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 1400 iterate_session_caps(session, wake_up_session_cb, 1401 (void *)(unsigned long)ev); 1402 } 1403 1404 /* 1405 * Send periodic message to MDS renewing all currently held caps. The 1406 * ack will reset the expiration for all caps from this session. 1407 * 1408 * caller holds s_mutex 1409 */ 1410 static int send_renew_caps(struct ceph_mds_client *mdsc, 1411 struct ceph_mds_session *session) 1412 { 1413 struct ceph_msg *msg; 1414 int state; 1415 1416 if (time_after_eq(jiffies, session->s_cap_ttl) && 1417 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 1418 pr_info("mds%d caps stale\n", session->s_mds); 1419 session->s_renew_requested = jiffies; 1420 1421 /* do not try to renew caps until a recovering mds has reconnected 1422 * with its clients. */ 1423 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 1424 if (state < CEPH_MDS_STATE_RECONNECT) { 1425 dout("send_renew_caps ignoring mds%d (%s)\n", 1426 session->s_mds, ceph_mds_state_name(state)); 1427 return 0; 1428 } 1429 1430 dout("send_renew_caps to mds%d (%s)\n", session->s_mds, 1431 ceph_mds_state_name(state)); 1432 msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 1433 ++session->s_renew_seq); 1434 if (!msg) 1435 return -ENOMEM; 1436 ceph_con_send(&session->s_con, msg); 1437 return 0; 1438 } 1439 1440 static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 1441 struct ceph_mds_session *session, u64 seq) 1442 { 1443 struct ceph_msg *msg; 1444 1445 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", 1446 session->s_mds, ceph_session_state_name(session->s_state), seq); 1447 msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 1448 if (!msg) 1449 return -ENOMEM; 1450 ceph_con_send(&session->s_con, msg); 1451 return 0; 1452 } 1453 1454 1455 /* 1456 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 
 *
 * Called under session->s_mutex
 */
static void renewed_caps(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session, int is_renew)
{
	int was_stale;
	int wake = 0;

	spin_lock(&session->s_cap_lock);
	was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);

	session->s_cap_ttl = session->s_renew_requested +
		mdsc->mdsmap->m_session_timeout*HZ;

	if (was_stale) {
		if (time_before(jiffies, session->s_cap_ttl)) {
			pr_info("mds%d caps renewed\n", session->s_mds);
			wake = 1;
		} else {
			pr_info("mds%d caps still stale\n", session->s_mds);
		}
	}
	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
	     time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
	spin_unlock(&session->s_cap_lock);

	if (wake)
		wake_up_session_caps(session, RENEWCAPS);
}

/*
 * send a session close request
 */
static int request_close_session(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *session)
{
	struct ceph_msg *msg;

	dout("request_close_session mds%d state %s seq %lld\n",
	     session->s_mds, ceph_session_state_name(session->s_state),
	     session->s_seq);
	msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 1;
}

/*
 * Called with s_mutex held.
 */
static int __close_session(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
		return 0;
	session->s_state = CEPH_MDS_SESSION_CLOSING;
	return request_close_session(mdsc, session);
}

static bool drop_negative_children(struct dentry *dentry)
{
	struct dentry *child;
	bool all_negative = true;

	if (!d_is_dir(dentry))
		goto out;

	spin_lock(&dentry->d_lock);
	list_for_each_entry(child, &dentry->d_subdirs, d_child) {
		if (d_really_is_positive(child)) {
			all_negative = false;
			break;
		}
	}
	spin_unlock(&dentry->d_lock);

	if (all_negative)
		shrink_dcache_parent(dentry);
out:
	return all_negative;
}

/*
 * Trim old(er) caps.
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped too.
 *
 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
 * memory pressure from the MDS, though, so it needn't be perfect.
1551 */ 1552 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) 1553 { 1554 struct ceph_mds_session *session = arg; 1555 struct ceph_inode_info *ci = ceph_inode(inode); 1556 int used, wanted, oissued, mine; 1557 1558 if (session->s_trim_caps <= 0) 1559 return -1; 1560 1561 spin_lock(&ci->i_ceph_lock); 1562 mine = cap->issued | cap->implemented; 1563 used = __ceph_caps_used(ci); 1564 wanted = __ceph_caps_file_wanted(ci); 1565 oissued = __ceph_caps_issued_other(ci, cap); 1566 1567 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n", 1568 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), 1569 ceph_cap_string(used), ceph_cap_string(wanted)); 1570 if (cap == ci->i_auth_cap) { 1571 if (ci->i_dirty_caps || ci->i_flushing_caps || 1572 !list_empty(&ci->i_cap_snaps)) 1573 goto out; 1574 if ((used | wanted) & CEPH_CAP_ANY_WR) 1575 goto out; 1576 /* Note: it's possible that i_filelock_ref becomes non-zero 1577 * after dropping auth caps. It doesn't hurt because reply 1578 * of lock mds request will re-add auth caps. */ 1579 if (atomic_read(&ci->i_filelock_ref) > 0) 1580 goto out; 1581 } 1582 /* The inode has cached pages, but it's no longer used. 1583 * we can safely drop it */ 1584 if (wanted == 0 && used == CEPH_CAP_FILE_CACHE && 1585 !(oissued & CEPH_CAP_FILE_CACHE)) { 1586 used = 0; 1587 oissued = 0; 1588 } 1589 if ((used | wanted) & ~oissued & mine) 1590 goto out; /* we need these caps */ 1591 1592 if (oissued) { 1593 /* we aren't the only cap.. just remove us */ 1594 __ceph_remove_cap(cap, true); 1595 session->s_trim_caps--; 1596 } else { 1597 struct dentry *dentry; 1598 /* try dropping referring dentries */ 1599 spin_unlock(&ci->i_ceph_lock); 1600 dentry = d_find_any_alias(inode); 1601 if (dentry && drop_negative_children(dentry)) { 1602 int count; 1603 dput(dentry); 1604 d_prune_aliases(inode); 1605 count = atomic_read(&inode->i_count); 1606 if (count == 1) 1607 session->s_trim_caps--; 1608 dout("trim_caps_cb %p cap %p pruned, count now %d\n", 1609 inode, cap, count); 1610 } else { 1611 dput(dentry); 1612 } 1613 return 0; 1614 } 1615 1616 out: 1617 spin_unlock(&ci->i_ceph_lock); 1618 return 0; 1619 } 1620 1621 /* 1622 * Trim session cap count down to some max number. 1623 */ 1624 int ceph_trim_caps(struct ceph_mds_client *mdsc, 1625 struct ceph_mds_session *session, 1626 int max_caps) 1627 { 1628 int trim_caps = session->s_nr_caps - max_caps; 1629 1630 dout("trim_caps mds%d start: %d / %d, trim %d\n", 1631 session->s_mds, session->s_nr_caps, max_caps, trim_caps); 1632 if (trim_caps > 0) { 1633 session->s_trim_caps = trim_caps; 1634 iterate_session_caps(session, trim_caps_cb, session); 1635 dout("trim_caps mds%d done: %d / %d, trimmed %d\n", 1636 session->s_mds, session->s_nr_caps, max_caps, 1637 trim_caps - session->s_trim_caps); 1638 session->s_trim_caps = 0; 1639 } 1640 1641 ceph_send_cap_releases(mdsc, session); 1642 return 0; 1643 } 1644 1645 static int check_caps_flush(struct ceph_mds_client *mdsc, 1646 u64 want_flush_tid) 1647 { 1648 int ret = 1; 1649 1650 spin_lock(&mdsc->cap_dirty_lock); 1651 if (!list_empty(&mdsc->cap_flush_list)) { 1652 struct ceph_cap_flush *cf = 1653 list_first_entry(&mdsc->cap_flush_list, 1654 struct ceph_cap_flush, g_list); 1655 if (cf->tid <= want_flush_tid) { 1656 dout("check_caps_flush still flushing tid " 1657 "%llu <= %llu\n", cf->tid, want_flush_tid); 1658 ret = 0; 1659 } 1660 } 1661 spin_unlock(&mdsc->cap_dirty_lock); 1662 return ret; 1663 } 1664 1665 /* 1666 * flush all dirty inode data to disk. 
1667 * 1668 * returns true if we've flushed through want_flush_tid 1669 */ 1670 static void wait_caps_flush(struct ceph_mds_client *mdsc, 1671 u64 want_flush_tid) 1672 { 1673 dout("check_caps_flush want %llu\n", want_flush_tid); 1674 1675 wait_event(mdsc->cap_flushing_wq, 1676 check_caps_flush(mdsc, want_flush_tid)); 1677 1678 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); 1679 } 1680 1681 /* 1682 * called under s_mutex 1683 */ 1684 void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 1685 struct ceph_mds_session *session) 1686 { 1687 struct ceph_msg *msg = NULL; 1688 struct ceph_mds_cap_release *head; 1689 struct ceph_mds_cap_item *item; 1690 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 1691 struct ceph_cap *cap; 1692 LIST_HEAD(tmp_list); 1693 int num_cap_releases; 1694 __le32 barrier, *cap_barrier; 1695 1696 down_read(&osdc->lock); 1697 barrier = cpu_to_le32(osdc->epoch_barrier); 1698 up_read(&osdc->lock); 1699 1700 spin_lock(&session->s_cap_lock); 1701 again: 1702 list_splice_init(&session->s_cap_releases, &tmp_list); 1703 num_cap_releases = session->s_num_cap_releases; 1704 session->s_num_cap_releases = 0; 1705 spin_unlock(&session->s_cap_lock); 1706 1707 while (!list_empty(&tmp_list)) { 1708 if (!msg) { 1709 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 1710 PAGE_SIZE, GFP_NOFS, false); 1711 if (!msg) 1712 goto out_err; 1713 head = msg->front.iov_base; 1714 head->num = cpu_to_le32(0); 1715 msg->front.iov_len = sizeof(*head); 1716 1717 msg->hdr.version = cpu_to_le16(2); 1718 msg->hdr.compat_version = cpu_to_le16(1); 1719 } 1720 1721 cap = list_first_entry(&tmp_list, struct ceph_cap, 1722 session_caps); 1723 list_del(&cap->session_caps); 1724 num_cap_releases--; 1725 1726 head = msg->front.iov_base; 1727 le32_add_cpu(&head->num, 1); 1728 item = msg->front.iov_base + msg->front.iov_len; 1729 item->ino = cpu_to_le64(cap->cap_ino); 1730 item->cap_id = cpu_to_le64(cap->cap_id); 1731 item->migrate_seq = cpu_to_le32(cap->mseq); 1732 item->seq = cpu_to_le32(cap->issue_seq); 1733 msg->front.iov_len += sizeof(*item); 1734 1735 ceph_put_cap(mdsc, cap); 1736 1737 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 1738 // Append cap_barrier field 1739 cap_barrier = msg->front.iov_base + msg->front.iov_len; 1740 *cap_barrier = barrier; 1741 msg->front.iov_len += sizeof(*cap_barrier); 1742 1743 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1744 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 1745 ceph_con_send(&session->s_con, msg); 1746 msg = NULL; 1747 } 1748 } 1749 1750 BUG_ON(num_cap_releases != 0); 1751 1752 spin_lock(&session->s_cap_lock); 1753 if (!list_empty(&session->s_cap_releases)) 1754 goto again; 1755 spin_unlock(&session->s_cap_lock); 1756 1757 if (msg) { 1758 // Append cap_barrier field 1759 cap_barrier = msg->front.iov_base + msg->front.iov_len; 1760 *cap_barrier = barrier; 1761 msg->front.iov_len += sizeof(*cap_barrier); 1762 1763 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1764 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 1765 ceph_con_send(&session->s_con, msg); 1766 } 1767 return; 1768 out_err: 1769 pr_err("send_cap_releases mds%d, failed to allocate message\n", 1770 session->s_mds); 1771 spin_lock(&session->s_cap_lock); 1772 list_splice(&tmp_list, &session->s_cap_releases); 1773 session->s_num_cap_releases += num_cap_releases; 1774 spin_unlock(&session->s_cap_lock); 1775 } 1776 1777 /* 1778 * requests 1779 */ 1780 1781 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 1782 struct 
inode *dir) 1783 { 1784 struct ceph_inode_info *ci = ceph_inode(dir); 1785 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 1786 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 1787 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 1788 int order, num_entries; 1789 1790 spin_lock(&ci->i_ceph_lock); 1791 num_entries = ci->i_files + ci->i_subdirs; 1792 spin_unlock(&ci->i_ceph_lock); 1793 num_entries = max(num_entries, 1); 1794 num_entries = min(num_entries, opt->max_readdir); 1795 1796 order = get_order(size * num_entries); 1797 while (order >= 0) { 1798 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 1799 __GFP_NOWARN, 1800 order); 1801 if (rinfo->dir_entries) 1802 break; 1803 order--; 1804 } 1805 if (!rinfo->dir_entries) 1806 return -ENOMEM; 1807 1808 num_entries = (PAGE_SIZE << order) / size; 1809 num_entries = min(num_entries, opt->max_readdir); 1810 1811 rinfo->dir_buf_size = PAGE_SIZE << order; 1812 req->r_num_caps = num_entries + 1; 1813 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 1814 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 1815 return 0; 1816 } 1817 1818 /* 1819 * Create an mds request. 1820 */ 1821 struct ceph_mds_request * 1822 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 1823 { 1824 struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS); 1825 struct timespec64 ts; 1826 1827 if (!req) 1828 return ERR_PTR(-ENOMEM); 1829 1830 mutex_init(&req->r_fill_mutex); 1831 req->r_mdsc = mdsc; 1832 req->r_started = jiffies; 1833 req->r_resend_mds = -1; 1834 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 1835 INIT_LIST_HEAD(&req->r_unsafe_target_item); 1836 req->r_fmode = -1; 1837 kref_init(&req->r_kref); 1838 RB_CLEAR_NODE(&req->r_node); 1839 INIT_LIST_HEAD(&req->r_wait); 1840 init_completion(&req->r_completion); 1841 init_completion(&req->r_safe_completion); 1842 INIT_LIST_HEAD(&req->r_unsafe_item); 1843 1844 ktime_get_coarse_real_ts64(&ts); 1845 req->r_stamp = timespec64_trunc(ts, mdsc->fsc->sb->s_time_gran); 1846 1847 req->r_op = op; 1848 req->r_direct_mode = mode; 1849 return req; 1850 } 1851 1852 /* 1853 * return oldest (lowest) request, tid in request tree, 0 if none. 1854 * 1855 * called under mdsc->mutex. 1856 */ 1857 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 1858 { 1859 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 1860 return NULL; 1861 return rb_entry(rb_first(&mdsc->request_tree), 1862 struct ceph_mds_request, r_node); 1863 } 1864 1865 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 1866 { 1867 return mdsc->oldest_tid; 1868 } 1869 1870 /* 1871 * Build a dentry's path. Allocate on heap; caller must kfree. Based 1872 * on build_path_from_dentry in fs/cifs/dir.c. 1873 * 1874 * If @stop_on_nosnap, generate path relative to the first non-snapped 1875 * inode. 1876 * 1877 * Encode hidden .snap dirs as a double /, i.e. 
1878 * foo/.snap/bar -> foo//bar 1879 */ 1880 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base, 1881 int stop_on_nosnap) 1882 { 1883 struct dentry *temp; 1884 char *path; 1885 int len, pos; 1886 unsigned seq; 1887 1888 if (!dentry) 1889 return ERR_PTR(-EINVAL); 1890 1891 retry: 1892 len = 0; 1893 seq = read_seqbegin(&rename_lock); 1894 rcu_read_lock(); 1895 for (temp = dentry; !IS_ROOT(temp);) { 1896 struct inode *inode = d_inode(temp); 1897 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) 1898 len++; /* slash only */ 1899 else if (stop_on_nosnap && inode && 1900 ceph_snap(inode) == CEPH_NOSNAP) 1901 break; 1902 else 1903 len += 1 + temp->d_name.len; 1904 temp = temp->d_parent; 1905 } 1906 rcu_read_unlock(); 1907 if (len) 1908 len--; /* no leading '/' */ 1909 1910 path = kmalloc(len+1, GFP_NOFS); 1911 if (!path) 1912 return ERR_PTR(-ENOMEM); 1913 pos = len; 1914 path[pos] = 0; /* trailing null */ 1915 rcu_read_lock(); 1916 for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) { 1917 struct inode *inode; 1918 1919 spin_lock(&temp->d_lock); 1920 inode = d_inode(temp); 1921 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 1922 dout("build_path path+%d: %p SNAPDIR\n", 1923 pos, temp); 1924 } else if (stop_on_nosnap && inode && 1925 ceph_snap(inode) == CEPH_NOSNAP) { 1926 spin_unlock(&temp->d_lock); 1927 break; 1928 } else { 1929 pos -= temp->d_name.len; 1930 if (pos < 0) { 1931 spin_unlock(&temp->d_lock); 1932 break; 1933 } 1934 strncpy(path + pos, temp->d_name.name, 1935 temp->d_name.len); 1936 } 1937 spin_unlock(&temp->d_lock); 1938 if (pos) 1939 path[--pos] = '/'; 1940 temp = temp->d_parent; 1941 } 1942 rcu_read_unlock(); 1943 if (pos != 0 || read_seqretry(&rename_lock, seq)) { 1944 pr_err("build_path did not end path lookup where " 1945 "expected, namelen is %d, pos is %d\n", len, pos); 1946 /* presumably this is only possible if racing with a 1947 rename of one of the parent directories (we can not 1948 lock the dentries above us to prevent this, but 1949 retrying should be harmless) */ 1950 kfree(path); 1951 goto retry; 1952 } 1953 1954 *base = ceph_ino(d_inode(temp)); 1955 *plen = len; 1956 dout("build_path on %p %d built %llx '%.*s'\n", 1957 dentry, d_count(dentry), *base, len, path); 1958 return path; 1959 } 1960 1961 static int build_dentry_path(struct dentry *dentry, struct inode *dir, 1962 const char **ppath, int *ppathlen, u64 *pino, 1963 int *pfreepath) 1964 { 1965 char *path; 1966 1967 rcu_read_lock(); 1968 if (!dir) 1969 dir = d_inode_rcu(dentry->d_parent); 1970 if (dir && ceph_snap(dir) == CEPH_NOSNAP) { 1971 *pino = ceph_ino(dir); 1972 rcu_read_unlock(); 1973 *ppath = dentry->d_name.name; 1974 *ppathlen = dentry->d_name.len; 1975 return 0; 1976 } 1977 rcu_read_unlock(); 1978 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 1979 if (IS_ERR(path)) 1980 return PTR_ERR(path); 1981 *ppath = path; 1982 *pfreepath = 1; 1983 return 0; 1984 } 1985 1986 static int build_inode_path(struct inode *inode, 1987 const char **ppath, int *ppathlen, u64 *pino, 1988 int *pfreepath) 1989 { 1990 struct dentry *dentry; 1991 char *path; 1992 1993 if (ceph_snap(inode) == CEPH_NOSNAP) { 1994 *pino = ceph_ino(inode); 1995 *ppathlen = 0; 1996 return 0; 1997 } 1998 dentry = d_find_alias(inode); 1999 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2000 dput(dentry); 2001 if (IS_ERR(path)) 2002 return PTR_ERR(path); 2003 *ppath = path; 2004 *pfreepath = 1; 2005 return 0; 2006 } 2007 2008 /* 2009 * request arguments may be specified via an inode *, a dentry *, or 2010 * an 
explicit ino+path. 2011 */ 2012 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, 2013 struct inode *rdiri, const char *rpath, 2014 u64 rino, const char **ppath, int *pathlen, 2015 u64 *ino, int *freepath) 2016 { 2017 int r = 0; 2018 2019 if (rinode) { 2020 r = build_inode_path(rinode, ppath, pathlen, ino, freepath); 2021 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 2022 ceph_snap(rinode)); 2023 } else if (rdentry) { 2024 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino, 2025 freepath); 2026 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, 2027 *ppath); 2028 } else if (rpath || rino) { 2029 *ino = rino; 2030 *ppath = rpath; 2031 *pathlen = rpath ? strlen(rpath) : 0; 2032 dout(" path %.*s\n", *pathlen, rpath); 2033 } 2034 2035 return r; 2036 } 2037 2038 /* 2039 * called under mdsc->mutex 2040 */ 2041 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, 2042 struct ceph_mds_request *req, 2043 int mds, bool drop_cap_releases) 2044 { 2045 struct ceph_msg *msg; 2046 struct ceph_mds_request_head *head; 2047 const char *path1 = NULL; 2048 const char *path2 = NULL; 2049 u64 ino1 = 0, ino2 = 0; 2050 int pathlen1 = 0, pathlen2 = 0; 2051 int freepath1 = 0, freepath2 = 0; 2052 int len; 2053 u16 releases; 2054 void *p, *end; 2055 int ret; 2056 2057 ret = set_request_path_attr(req->r_inode, req->r_dentry, 2058 req->r_parent, req->r_path1, req->r_ino1.ino, 2059 &path1, &pathlen1, &ino1, &freepath1); 2060 if (ret < 0) { 2061 msg = ERR_PTR(ret); 2062 goto out; 2063 } 2064 2065 ret = set_request_path_attr(NULL, req->r_old_dentry, 2066 req->r_old_dentry_dir, 2067 req->r_path2, req->r_ino2.ino, 2068 &path2, &pathlen2, &ino2, &freepath2); 2069 if (ret < 0) { 2070 msg = ERR_PTR(ret); 2071 goto out_free1; 2072 } 2073 2074 len = sizeof(*head) + 2075 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + 2076 sizeof(struct ceph_timespec); 2077 2078 /* calculate (max) length for cap releases */ 2079 len += sizeof(struct ceph_mds_request_release) * 2080 (!!req->r_inode_drop + !!req->r_dentry_drop + 2081 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 2082 if (req->r_dentry_drop) 2083 len += req->r_dentry->d_name.len; 2084 if (req->r_old_dentry_drop) 2085 len += req->r_old_dentry->d_name.len; 2086 2087 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); 2088 if (!msg) { 2089 msg = ERR_PTR(-ENOMEM); 2090 goto out_free2; 2091 } 2092 2093 msg->hdr.version = cpu_to_le16(2); 2094 msg->hdr.tid = cpu_to_le64(req->r_tid); 2095 2096 head = msg->front.iov_base; 2097 p = msg->front.iov_base + sizeof(*head); 2098 end = msg->front.iov_base + msg->front.iov_len; 2099 2100 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 2101 head->op = cpu_to_le32(req->r_op); 2102 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid)); 2103 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid)); 2104 head->args = req->r_args; 2105 2106 ceph_encode_filepath(&p, end, ino1, path1); 2107 ceph_encode_filepath(&p, end, ino2, path2); 2108 2109 /* make note of release offset, in case we need to replay */ 2110 req->r_request_release_offset = p - msg->front.iov_base; 2111 2112 /* cap releases */ 2113 releases = 0; 2114 if (req->r_inode_drop) 2115 releases += ceph_encode_inode_release(&p, 2116 req->r_inode ? 
req->r_inode : d_inode(req->r_dentry), 2117 mds, req->r_inode_drop, req->r_inode_unless, 0); 2118 if (req->r_dentry_drop) 2119 releases += ceph_encode_dentry_release(&p, req->r_dentry, 2120 req->r_parent, mds, req->r_dentry_drop, 2121 req->r_dentry_unless); 2122 if (req->r_old_dentry_drop) 2123 releases += ceph_encode_dentry_release(&p, req->r_old_dentry, 2124 req->r_old_dentry_dir, mds, 2125 req->r_old_dentry_drop, 2126 req->r_old_dentry_unless); 2127 if (req->r_old_inode_drop) 2128 releases += ceph_encode_inode_release(&p, 2129 d_inode(req->r_old_dentry), 2130 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 2131 2132 if (drop_cap_releases) { 2133 releases = 0; 2134 p = msg->front.iov_base + req->r_request_release_offset; 2135 } 2136 2137 head->num_releases = cpu_to_le16(releases); 2138 2139 /* time stamp */ 2140 { 2141 struct ceph_timespec ts; 2142 ceph_encode_timespec64(&ts, &req->r_stamp); 2143 ceph_encode_copy(&p, &ts, sizeof(ts)); 2144 } 2145 2146 BUG_ON(p > end); 2147 msg->front.iov_len = p - msg->front.iov_base; 2148 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2149 2150 if (req->r_pagelist) { 2151 struct ceph_pagelist *pagelist = req->r_pagelist; 2152 ceph_msg_data_add_pagelist(msg, pagelist); 2153 msg->hdr.data_len = cpu_to_le32(pagelist->length); 2154 } else { 2155 msg->hdr.data_len = 0; 2156 } 2157 2158 msg->hdr.data_off = cpu_to_le16(0); 2159 2160 out_free2: 2161 if (freepath2) 2162 kfree((char *)path2); 2163 out_free1: 2164 if (freepath1) 2165 kfree((char *)path1); 2166 out: 2167 return msg; 2168 } 2169 2170 /* 2171 * called under mdsc->mutex if error, under no mutex if 2172 * success. 2173 */ 2174 static void complete_request(struct ceph_mds_client *mdsc, 2175 struct ceph_mds_request *req) 2176 { 2177 if (req->r_callback) 2178 req->r_callback(mdsc, req); 2179 else 2180 complete_all(&req->r_completion); 2181 } 2182 2183 /* 2184 * called under mdsc->mutex 2185 */ 2186 static int __prepare_send_request(struct ceph_mds_client *mdsc, 2187 struct ceph_mds_request *req, 2188 int mds, bool drop_cap_releases) 2189 { 2190 struct ceph_mds_request_head *rhead; 2191 struct ceph_msg *msg; 2192 int flags = 0; 2193 2194 req->r_attempts++; 2195 if (req->r_inode) { 2196 struct ceph_cap *cap = 2197 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 2198 2199 if (cap) 2200 req->r_sent_on_mseq = cap->mseq; 2201 else 2202 req->r_sent_on_mseq = -1; 2203 } 2204 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, 2205 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 2206 2207 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 2208 void *p; 2209 /* 2210 * Replay. Do not regenerate message (and rebuild 2211 * paths, etc.); just use the original message. 2212 * Rebuilding paths will break for renames because 2213 * d_move mangles the src name. 
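 *
 * Instead, the existing message is patched in place below: the REPLAY
 * flag is set, num_retry and the target ino are refreshed, the cap and
 * dentry releases are dropped, and the timestamp is re-encoded at
 * r_request_release_offset.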
2214 */ 2215 msg = req->r_request; 2216 rhead = msg->front.iov_base; 2217 2218 flags = le32_to_cpu(rhead->flags); 2219 flags |= CEPH_MDS_FLAG_REPLAY; 2220 rhead->flags = cpu_to_le32(flags); 2221 2222 if (req->r_target_inode) 2223 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 2224 2225 rhead->num_retry = req->r_attempts - 1; 2226 2227 /* remove cap/dentry releases from message */ 2228 rhead->num_releases = 0; 2229 2230 /* time stamp */ 2231 p = msg->front.iov_base + req->r_request_release_offset; 2232 { 2233 struct ceph_timespec ts; 2234 ceph_encode_timespec64(&ts, &req->r_stamp); 2235 ceph_encode_copy(&p, &ts, sizeof(ts)); 2236 } 2237 2238 msg->front.iov_len = p - msg->front.iov_base; 2239 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2240 return 0; 2241 } 2242 2243 if (req->r_request) { 2244 ceph_msg_put(req->r_request); 2245 req->r_request = NULL; 2246 } 2247 msg = create_request_message(mdsc, req, mds, drop_cap_releases); 2248 if (IS_ERR(msg)) { 2249 req->r_err = PTR_ERR(msg); 2250 return PTR_ERR(msg); 2251 } 2252 req->r_request = msg; 2253 2254 rhead = msg->front.iov_base; 2255 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 2256 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2257 flags |= CEPH_MDS_FLAG_REPLAY; 2258 if (req->r_parent) 2259 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 2260 rhead->flags = cpu_to_le32(flags); 2261 rhead->num_fwd = req->r_num_fwd; 2262 rhead->num_retry = req->r_attempts - 1; 2263 rhead->ino = 0; 2264 2265 dout(" r_parent = %p\n", req->r_parent); 2266 return 0; 2267 } 2268 2269 /* 2270 * send request, or put it on the appropriate wait list. 2271 */ 2272 static void __do_request(struct ceph_mds_client *mdsc, 2273 struct ceph_mds_request *req) 2274 { 2275 struct ceph_mds_session *session = NULL; 2276 int mds = -1; 2277 int err = 0; 2278 2279 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 2280 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 2281 __unregister_request(mdsc, req); 2282 return; 2283 } 2284 2285 if (req->r_timeout && 2286 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 2287 dout("do_request timed out\n"); 2288 err = -EIO; 2289 goto finish; 2290 } 2291 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 2292 dout("do_request forced umount\n"); 2293 err = -EIO; 2294 goto finish; 2295 } 2296 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 2297 if (mdsc->mdsmap_err) { 2298 err = mdsc->mdsmap_err; 2299 dout("do_request mdsmap err %d\n", err); 2300 goto finish; 2301 } 2302 if (mdsc->mdsmap->m_epoch == 0) { 2303 dout("do_request no mdsmap, waiting for map\n"); 2304 list_add(&req->r_wait, &mdsc->waiting_for_map); 2305 return; 2306 } 2307 if (!(mdsc->fsc->mount_options->flags & 2308 CEPH_MOUNT_OPT_MOUNTWAIT) && 2309 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { 2310 err = -ENOENT; 2311 pr_info("probably no mds server is up\n"); 2312 goto finish; 2313 } 2314 } 2315 2316 put_request_session(req); 2317 2318 mds = __choose_mds(mdsc, req); 2319 if (mds < 0 || 2320 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 2321 dout("do_request no mds or not active, waiting for map\n"); 2322 list_add(&req->r_wait, &mdsc->waiting_for_map); 2323 return; 2324 } 2325 2326 /* get, open session */ 2327 session = __ceph_lookup_mds_session(mdsc, mds); 2328 if (!session) { 2329 session = register_session(mdsc, mds); 2330 if (IS_ERR(session)) { 2331 err = PTR_ERR(session); 2332 goto finish; 2333 } 2334 } 2335 req->r_session = get_session(session); 2336 2337 
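	/*
	 * The session must be OPEN (or HUNG) before we can send.  Otherwise
	 * the request is parked on s_waiting -- and an open is issued for a
	 * NEW/CLOSING session -- so that __wake_requests() can resend it once
	 * the session comes up; a REJECTED session fails with -EACCES.
	 */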
dout("do_request mds%d session %p state %s\n", mds, session, 2338 ceph_session_state_name(session->s_state)); 2339 if (session->s_state != CEPH_MDS_SESSION_OPEN && 2340 session->s_state != CEPH_MDS_SESSION_HUNG) { 2341 if (session->s_state == CEPH_MDS_SESSION_REJECTED) { 2342 err = -EACCES; 2343 goto out_session; 2344 } 2345 if (session->s_state == CEPH_MDS_SESSION_NEW || 2346 session->s_state == CEPH_MDS_SESSION_CLOSING) 2347 __open_session(mdsc, session); 2348 list_add(&req->r_wait, &session->s_waiting); 2349 goto out_session; 2350 } 2351 2352 /* send request */ 2353 req->r_resend_mds = -1; /* forget any previous mds hint */ 2354 2355 if (req->r_request_started == 0) /* note request start time */ 2356 req->r_request_started = jiffies; 2357 2358 err = __prepare_send_request(mdsc, req, mds, false); 2359 if (!err) { 2360 ceph_msg_get(req->r_request); 2361 ceph_con_send(&session->s_con, req->r_request); 2362 } 2363 2364 out_session: 2365 ceph_put_mds_session(session); 2366 finish: 2367 if (err) { 2368 dout("__do_request early error %d\n", err); 2369 req->r_err = err; 2370 complete_request(mdsc, req); 2371 __unregister_request(mdsc, req); 2372 } 2373 return; 2374 } 2375 2376 /* 2377 * called under mdsc->mutex 2378 */ 2379 static void __wake_requests(struct ceph_mds_client *mdsc, 2380 struct list_head *head) 2381 { 2382 struct ceph_mds_request *req; 2383 LIST_HEAD(tmp_list); 2384 2385 list_splice_init(head, &tmp_list); 2386 2387 while (!list_empty(&tmp_list)) { 2388 req = list_entry(tmp_list.next, 2389 struct ceph_mds_request, r_wait); 2390 list_del_init(&req->r_wait); 2391 dout(" wake request %p tid %llu\n", req, req->r_tid); 2392 __do_request(mdsc, req); 2393 } 2394 } 2395 2396 /* 2397 * Wake up threads with requests pending for @mds, so that they can 2398 * resubmit their requests to a possibly different mds. 2399 */ 2400 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 2401 { 2402 struct ceph_mds_request *req; 2403 struct rb_node *p = rb_first(&mdsc->request_tree); 2404 2405 dout("kick_requests mds%d\n", mds); 2406 while (p) { 2407 req = rb_entry(p, struct ceph_mds_request, r_node); 2408 p = rb_next(p); 2409 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2410 continue; 2411 if (req->r_attempts > 0) 2412 continue; /* only new requests */ 2413 if (req->r_session && 2414 req->r_session->s_mds == mds) { 2415 dout(" kicking tid %llu\n", req->r_tid); 2416 list_del_init(&req->r_wait); 2417 __do_request(mdsc, req); 2418 } 2419 } 2420 } 2421 2422 void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, 2423 struct ceph_mds_request *req) 2424 { 2425 dout("submit_request on %p\n", req); 2426 mutex_lock(&mdsc->mutex); 2427 __register_request(mdsc, req, NULL); 2428 __do_request(mdsc, req); 2429 mutex_unlock(&mdsc->mutex); 2430 } 2431 2432 /* 2433 * Synchrously perform an mds request. Take care of all of the 2434 * session setup, forwarding, retry details. 
2435 */ 2436 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 2437 struct inode *dir, 2438 struct ceph_mds_request *req) 2439 { 2440 int err; 2441 2442 dout("do_request on %p\n", req); 2443 2444 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ 2445 if (req->r_inode) 2446 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 2447 if (req->r_parent) 2448 ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN); 2449 if (req->r_old_dentry_dir) 2450 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 2451 CEPH_CAP_PIN); 2452 2453 /* issue */ 2454 mutex_lock(&mdsc->mutex); 2455 __register_request(mdsc, req, dir); 2456 __do_request(mdsc, req); 2457 2458 if (req->r_err) { 2459 err = req->r_err; 2460 goto out; 2461 } 2462 2463 /* wait */ 2464 mutex_unlock(&mdsc->mutex); 2465 dout("do_request waiting\n"); 2466 if (!req->r_timeout && req->r_wait_for_completion) { 2467 err = req->r_wait_for_completion(mdsc, req); 2468 } else { 2469 long timeleft = wait_for_completion_killable_timeout( 2470 &req->r_completion, 2471 ceph_timeout_jiffies(req->r_timeout)); 2472 if (timeleft > 0) 2473 err = 0; 2474 else if (!timeleft) 2475 err = -EIO; /* timed out */ 2476 else 2477 err = timeleft; /* killed */ 2478 } 2479 dout("do_request waited, got %d\n", err); 2480 mutex_lock(&mdsc->mutex); 2481 2482 /* only abort if we didn't race with a real reply */ 2483 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 2484 err = le32_to_cpu(req->r_reply_info.head->result); 2485 } else if (err < 0) { 2486 dout("aborted request %lld with %d\n", req->r_tid, err); 2487 2488 /* 2489 * ensure we aren't running concurrently with 2490 * ceph_fill_trace or ceph_readdir_prepopulate, which 2491 * rely on locks (dir mutex) held by our caller. 2492 */ 2493 mutex_lock(&req->r_fill_mutex); 2494 req->r_err = err; 2495 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 2496 mutex_unlock(&req->r_fill_mutex); 2497 2498 if (req->r_parent && 2499 (req->r_op & CEPH_MDS_OP_WRITE)) 2500 ceph_invalidate_dir_request(req); 2501 } else { 2502 err = req->r_err; 2503 } 2504 2505 out: 2506 mutex_unlock(&mdsc->mutex); 2507 dout("do_request %p done, result %d\n", req, err); 2508 return err; 2509 } 2510 2511 /* 2512 * Invalidate dir's completeness, dentry lease state on an aborted MDS 2513 * namespace request. 2514 */ 2515 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 2516 { 2517 struct inode *dir = req->r_parent; 2518 struct inode *old_dir = req->r_old_dentry_dir; 2519 2520 dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir); 2521 2522 ceph_dir_clear_complete(dir); 2523 if (old_dir) 2524 ceph_dir_clear_complete(old_dir); 2525 if (req->r_dentry) 2526 ceph_invalidate_dentry_lease(req->r_dentry); 2527 if (req->r_old_dentry) 2528 ceph_invalidate_dentry_lease(req->r_old_dentry); 2529 } 2530 2531 /* 2532 * Handle mds reply. 2533 * 2534 * We take the session mutex and parse and process the reply immediately. 2535 * This preserves the logical ordering of replies, capabilities, etc., sent 2536 * by the MDS as they are applied to our local cache. 
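 *
 * Note that a request may receive two replies: an early "unsafe" reply
 * once the MDS has applied the update in memory, and a later "safe" reply
 * once it has been journaled.  The caller is completed on the first reply;
 * the request itself is only unregistered when the safe reply arrives.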
2537 */ 2538 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 2539 { 2540 struct ceph_mds_client *mdsc = session->s_mdsc; 2541 struct ceph_mds_request *req; 2542 struct ceph_mds_reply_head *head = msg->front.iov_base; 2543 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 2544 struct ceph_snap_realm *realm; 2545 u64 tid; 2546 int err, result; 2547 int mds = session->s_mds; 2548 2549 if (msg->front.iov_len < sizeof(*head)) { 2550 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 2551 ceph_msg_dump(msg); 2552 return; 2553 } 2554 2555 /* get request, session */ 2556 tid = le64_to_cpu(msg->hdr.tid); 2557 mutex_lock(&mdsc->mutex); 2558 req = lookup_get_request(mdsc, tid); 2559 if (!req) { 2560 dout("handle_reply on unknown tid %llu\n", tid); 2561 mutex_unlock(&mdsc->mutex); 2562 return; 2563 } 2564 dout("handle_reply %p\n", req); 2565 2566 /* correct session? */ 2567 if (req->r_session != session) { 2568 pr_err("mdsc_handle_reply got %llu on session mds%d" 2569 " not mds%d\n", tid, session->s_mds, 2570 req->r_session ? req->r_session->s_mds : -1); 2571 mutex_unlock(&mdsc->mutex); 2572 goto out; 2573 } 2574 2575 /* dup? */ 2576 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 2577 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 2578 pr_warn("got a dup %s reply on %llu from mds%d\n", 2579 head->safe ? "safe" : "unsafe", tid, mds); 2580 mutex_unlock(&mdsc->mutex); 2581 goto out; 2582 } 2583 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 2584 pr_warn("got unsafe after safe on %llu from mds%d\n", 2585 tid, mds); 2586 mutex_unlock(&mdsc->mutex); 2587 goto out; 2588 } 2589 2590 result = le32_to_cpu(head->result); 2591 2592 /* 2593 * Handle an ESTALE 2594 * if we're not talking to the authority, send to them 2595 * if the authority has changed while we weren't looking, 2596 * send to new authority 2597 * Otherwise we just have to return an ESTALE 2598 */ 2599 if (result == -ESTALE) { 2600 dout("got ESTALE on request %llu\n", req->r_tid); 2601 req->r_resend_mds = -1; 2602 if (req->r_direct_mode != USE_AUTH_MDS) { 2603 dout("not using auth, setting for that now\n"); 2604 req->r_direct_mode = USE_AUTH_MDS; 2605 __do_request(mdsc, req); 2606 mutex_unlock(&mdsc->mutex); 2607 goto out; 2608 } else { 2609 int mds = __choose_mds(mdsc, req); 2610 if (mds >= 0 && mds != req->r_session->s_mds) { 2611 dout("but auth changed, so resending\n"); 2612 __do_request(mdsc, req); 2613 mutex_unlock(&mdsc->mutex); 2614 goto out; 2615 } 2616 } 2617 dout("have to return ESTALE on request %llu\n", req->r_tid); 2618 } 2619 2620 2621 if (head->safe) { 2622 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 2623 __unregister_request(mdsc, req); 2624 2625 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 2626 /* 2627 * We already handled the unsafe response, now do the 2628 * cleanup. No need to examine the response; the MDS 2629 * doesn't include any result info in the safe 2630 * response. And even if it did, there is nothing 2631 * useful we could do with a revised return value. 2632 */ 2633 dout("got safe reply %llu, mds%d\n", tid, mds); 2634 2635 /* last unsafe request during umount? 
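(if so, wake the umount path blocked in wait_requests() on safe_umount_waiters)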
*/ 2636 if (mdsc->stopping && !__get_oldest_req(mdsc)) 2637 complete_all(&mdsc->safe_umount_waiters); 2638 mutex_unlock(&mdsc->mutex); 2639 goto out; 2640 } 2641 } else { 2642 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 2643 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 2644 if (req->r_unsafe_dir) { 2645 struct ceph_inode_info *ci = 2646 ceph_inode(req->r_unsafe_dir); 2647 spin_lock(&ci->i_unsafe_lock); 2648 list_add_tail(&req->r_unsafe_dir_item, 2649 &ci->i_unsafe_dirops); 2650 spin_unlock(&ci->i_unsafe_lock); 2651 } 2652 } 2653 2654 dout("handle_reply tid %lld result %d\n", tid, result); 2655 rinfo = &req->r_reply_info; 2656 err = parse_reply_info(msg, rinfo, session->s_con.peer_features); 2657 mutex_unlock(&mdsc->mutex); 2658 2659 mutex_lock(&session->s_mutex); 2660 if (err < 0) { 2661 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 2662 ceph_msg_dump(msg); 2663 goto out_err; 2664 } 2665 2666 /* snap trace */ 2667 realm = NULL; 2668 if (rinfo->snapblob_len) { 2669 down_write(&mdsc->snap_rwsem); 2670 ceph_update_snap_trace(mdsc, rinfo->snapblob, 2671 rinfo->snapblob + rinfo->snapblob_len, 2672 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 2673 &realm); 2674 downgrade_write(&mdsc->snap_rwsem); 2675 } else { 2676 down_read(&mdsc->snap_rwsem); 2677 } 2678 2679 /* insert trace into our cache */ 2680 mutex_lock(&req->r_fill_mutex); 2681 current->journal_info = req; 2682 err = ceph_fill_trace(mdsc->fsc->sb, req); 2683 if (err == 0) { 2684 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 2685 req->r_op == CEPH_MDS_OP_LSSNAP)) 2686 ceph_readdir_prepopulate(req, req->r_session); 2687 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 2688 } 2689 current->journal_info = NULL; 2690 mutex_unlock(&req->r_fill_mutex); 2691 2692 up_read(&mdsc->snap_rwsem); 2693 if (realm) 2694 ceph_put_snap_realm(mdsc, realm); 2695 2696 if (err == 0 && req->r_target_inode && 2697 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 2698 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 2699 spin_lock(&ci->i_unsafe_lock); 2700 list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops); 2701 spin_unlock(&ci->i_unsafe_lock); 2702 } 2703 out_err: 2704 mutex_lock(&mdsc->mutex); 2705 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 2706 if (err) { 2707 req->r_err = err; 2708 } else { 2709 req->r_reply = ceph_msg_get(msg); 2710 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); 2711 } 2712 } else { 2713 dout("reply arrived after request %lld was aborted\n", tid); 2714 } 2715 mutex_unlock(&mdsc->mutex); 2716 2717 mutex_unlock(&session->s_mutex); 2718 2719 /* kick calling process */ 2720 complete_request(mdsc, req); 2721 out: 2722 ceph_mdsc_put_request(req); 2723 return; 2724 } 2725 2726 2727 2728 /* 2729 * handle mds notification that our request has been forwarded. 2730 */ 2731 static void handle_forward(struct ceph_mds_client *mdsc, 2732 struct ceph_mds_session *session, 2733 struct ceph_msg *msg) 2734 { 2735 struct ceph_mds_request *req; 2736 u64 tid = le64_to_cpu(msg->hdr.tid); 2737 u32 next_mds; 2738 u32 fwd_seq; 2739 int err = -EINVAL; 2740 void *p = msg->front.iov_base; 2741 void *end = p + msg->front.iov_len; 2742 2743 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 2744 next_mds = ceph_decode_32(&p); 2745 fwd_seq = ceph_decode_32(&p); 2746 2747 mutex_lock(&mdsc->mutex); 2748 req = lookup_get_request(mdsc, tid); 2749 if (!req) { 2750 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 2751 goto out; /* dup reply? 
*/ 2752 } 2753 2754 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 2755 dout("forward tid %llu aborted, unregistering\n", tid); 2756 __unregister_request(mdsc, req); 2757 } else if (fwd_seq <= req->r_num_fwd) { 2758 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 2759 tid, next_mds, req->r_num_fwd, fwd_seq); 2760 } else { 2761 /* resend. forward race not possible; mds would drop */ 2762 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 2763 BUG_ON(req->r_err); 2764 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 2765 req->r_attempts = 0; 2766 req->r_num_fwd = fwd_seq; 2767 req->r_resend_mds = next_mds; 2768 put_request_session(req); 2769 __do_request(mdsc, req); 2770 } 2771 ceph_mdsc_put_request(req); 2772 out: 2773 mutex_unlock(&mdsc->mutex); 2774 return; 2775 2776 bad: 2777 pr_err("mdsc_handle_forward decode error err=%d\n", err); 2778 } 2779 2780 /* 2781 * handle a mds session control message 2782 */ 2783 static void handle_session(struct ceph_mds_session *session, 2784 struct ceph_msg *msg) 2785 { 2786 struct ceph_mds_client *mdsc = session->s_mdsc; 2787 u32 op; 2788 u64 seq; 2789 int mds = session->s_mds; 2790 struct ceph_mds_session_head *h = msg->front.iov_base; 2791 int wake = 0; 2792 2793 /* decode */ 2794 if (msg->front.iov_len < sizeof(*h)) 2795 goto bad; 2796 op = le32_to_cpu(h->op); 2797 seq = le64_to_cpu(h->seq); 2798 2799 mutex_lock(&mdsc->mutex); 2800 if (op == CEPH_SESSION_CLOSE) { 2801 get_session(session); 2802 __unregister_session(mdsc, session); 2803 } 2804 /* FIXME: this ttl calculation is generous */ 2805 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 2806 mutex_unlock(&mdsc->mutex); 2807 2808 mutex_lock(&session->s_mutex); 2809 2810 dout("handle_session mds%d %s %p state %s seq %llu\n", 2811 mds, ceph_session_op_name(op), session, 2812 ceph_session_state_name(session->s_state), seq); 2813 2814 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 2815 session->s_state = CEPH_MDS_SESSION_OPEN; 2816 pr_info("mds%d came back\n", session->s_mds); 2817 } 2818 2819 switch (op) { 2820 case CEPH_SESSION_OPEN: 2821 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 2822 pr_info("mds%d reconnect success\n", session->s_mds); 2823 session->s_state = CEPH_MDS_SESSION_OPEN; 2824 renewed_caps(mdsc, session, 0); 2825 wake = 1; 2826 if (mdsc->stopping) 2827 __close_session(mdsc, session); 2828 break; 2829 2830 case CEPH_SESSION_RENEWCAPS: 2831 if (session->s_renew_seq == seq) 2832 renewed_caps(mdsc, session, 1); 2833 break; 2834 2835 case CEPH_SESSION_CLOSE: 2836 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 2837 pr_info("mds%d reconnect denied\n", session->s_mds); 2838 cleanup_session_requests(mdsc, session); 2839 remove_session_caps(session); 2840 wake = 2; /* for good measure */ 2841 wake_up_all(&mdsc->session_close_wq); 2842 break; 2843 2844 case CEPH_SESSION_STALE: 2845 pr_info("mds%d caps went stale, renewing\n", 2846 session->s_mds); 2847 spin_lock(&session->s_gen_ttl_lock); 2848 session->s_cap_gen++; 2849 session->s_cap_ttl = jiffies - 1; 2850 spin_unlock(&session->s_gen_ttl_lock); 2851 send_renew_caps(mdsc, session); 2852 break; 2853 2854 case CEPH_SESSION_RECALL_STATE: 2855 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 2856 break; 2857 2858 case CEPH_SESSION_FLUSHMSG: 2859 send_flushmsg_ack(mdsc, session, seq); 2860 break; 2861 2862 case CEPH_SESSION_FORCE_RO: 2863 dout("force_session_readonly %p\n", session); 2864 spin_lock(&session->s_cap_lock); 2865 session->s_readonly = true; 2866 
spin_unlock(&session->s_cap_lock); 2867 wake_up_session_caps(session, FORCE_RO); 2868 break; 2869 2870 case CEPH_SESSION_REJECT: 2871 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 2872 pr_info("mds%d rejected session\n", session->s_mds); 2873 session->s_state = CEPH_MDS_SESSION_REJECTED; 2874 cleanup_session_requests(mdsc, session); 2875 remove_session_caps(session); 2876 wake = 2; /* for good measure */ 2877 break; 2878 2879 default: 2880 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 2881 WARN_ON(1); 2882 } 2883 2884 mutex_unlock(&session->s_mutex); 2885 if (wake) { 2886 mutex_lock(&mdsc->mutex); 2887 __wake_requests(mdsc, &session->s_waiting); 2888 if (wake == 2) 2889 kick_requests(mdsc, mds); 2890 mutex_unlock(&mdsc->mutex); 2891 } 2892 if (op == CEPH_SESSION_CLOSE) 2893 ceph_put_mds_session(session); 2894 return; 2895 2896 bad: 2897 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 2898 (int)msg->front.iov_len); 2899 ceph_msg_dump(msg); 2900 return; 2901 } 2902 2903 2904 /* 2905 * called under session->mutex. 2906 */ 2907 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 2908 struct ceph_mds_session *session) 2909 { 2910 struct ceph_mds_request *req, *nreq; 2911 struct rb_node *p; 2912 int err; 2913 2914 dout("replay_unsafe_requests mds%d\n", session->s_mds); 2915 2916 mutex_lock(&mdsc->mutex); 2917 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) { 2918 err = __prepare_send_request(mdsc, req, session->s_mds, true); 2919 if (!err) { 2920 ceph_msg_get(req->r_request); 2921 ceph_con_send(&session->s_con, req->r_request); 2922 } 2923 } 2924 2925 /* 2926 * also re-send old requests when MDS enters reconnect stage. So that MDS 2927 * can process completed request in clientreplay stage. 2928 */ 2929 p = rb_first(&mdsc->request_tree); 2930 while (p) { 2931 req = rb_entry(p, struct ceph_mds_request, r_node); 2932 p = rb_next(p); 2933 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2934 continue; 2935 if (req->r_attempts == 0) 2936 continue; /* only old requests */ 2937 if (req->r_session && 2938 req->r_session->s_mds == session->s_mds) { 2939 err = __prepare_send_request(mdsc, req, 2940 session->s_mds, true); 2941 if (!err) { 2942 ceph_msg_get(req->r_request); 2943 ceph_con_send(&session->s_con, req->r_request); 2944 } 2945 } 2946 } 2947 mutex_unlock(&mdsc->mutex); 2948 } 2949 2950 /* 2951 * Encode information about a cap for a reconnect with the MDS. 
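 *
 * This is the per-cap callback driven by iterate_session_caps() from
 * send_mds_reconnect().  For v2+ capable peers the cap record is encoded
 * together with any file locks (and, for v3, snap_follows); for old v1
 * peers we fall back to encoding the inode's full path instead.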
2952 */ 2953 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, 2954 void *arg) 2955 { 2956 union { 2957 struct ceph_mds_cap_reconnect v2; 2958 struct ceph_mds_cap_reconnect_v1 v1; 2959 } rec; 2960 struct ceph_inode_info *ci = cap->ci; 2961 struct ceph_reconnect_state *recon_state = arg; 2962 struct ceph_pagelist *pagelist = recon_state->pagelist; 2963 int err; 2964 u64 snap_follows; 2965 2966 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 2967 inode, ceph_vinop(inode), cap, cap->cap_id, 2968 ceph_cap_string(cap->issued)); 2969 err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 2970 if (err) 2971 return err; 2972 2973 spin_lock(&ci->i_ceph_lock); 2974 cap->seq = 0; /* reset cap seq */ 2975 cap->issue_seq = 0; /* and issue_seq */ 2976 cap->mseq = 0; /* and migrate_seq */ 2977 cap->cap_gen = cap->session->s_cap_gen; 2978 2979 if (recon_state->msg_version >= 2) { 2980 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 2981 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 2982 rec.v2.issued = cpu_to_le32(cap->issued); 2983 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 2984 rec.v2.pathbase = 0; 2985 rec.v2.flock_len = (__force __le32) 2986 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); 2987 } else { 2988 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 2989 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 2990 rec.v1.issued = cpu_to_le32(cap->issued); 2991 rec.v1.size = cpu_to_le64(inode->i_size); 2992 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime); 2993 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime); 2994 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 2995 rec.v1.pathbase = 0; 2996 } 2997 2998 if (list_empty(&ci->i_cap_snaps)) { 2999 snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0; 3000 } else { 3001 struct ceph_cap_snap *capsnap = 3002 list_first_entry(&ci->i_cap_snaps, 3003 struct ceph_cap_snap, ci_item); 3004 snap_follows = capsnap->follows; 3005 } 3006 spin_unlock(&ci->i_ceph_lock); 3007 3008 if (recon_state->msg_version >= 2) { 3009 int num_fcntl_locks, num_flock_locks; 3010 struct ceph_filelock *flocks = NULL; 3011 size_t struct_len, total_len = 0; 3012 u8 struct_v = 0; 3013 3014 encode_again: 3015 if (rec.v2.flock_len) { 3016 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 3017 } else { 3018 num_fcntl_locks = 0; 3019 num_flock_locks = 0; 3020 } 3021 if (num_fcntl_locks + num_flock_locks > 0) { 3022 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks, 3023 sizeof(struct ceph_filelock), 3024 GFP_NOFS); 3025 if (!flocks) { 3026 err = -ENOMEM; 3027 goto out_err; 3028 } 3029 err = ceph_encode_locks_to_buffer(inode, flocks, 3030 num_fcntl_locks, 3031 num_flock_locks); 3032 if (err) { 3033 kfree(flocks); 3034 flocks = NULL; 3035 if (err == -ENOSPC) 3036 goto encode_again; 3037 goto out_err; 3038 } 3039 } else { 3040 kfree(flocks); 3041 flocks = NULL; 3042 } 3043 3044 if (recon_state->msg_version >= 3) { 3045 /* version, compat_version and struct_len */ 3046 total_len = 2 * sizeof(u8) + sizeof(u32); 3047 struct_v = 2; 3048 } 3049 /* 3050 * number of encoded locks is stable, so copy to pagelist 3051 */ 3052 struct_len = 2 * sizeof(u32) + 3053 (num_fcntl_locks + num_flock_locks) * 3054 sizeof(struct ceph_filelock); 3055 rec.v2.flock_len = cpu_to_le32(struct_len); 3056 3057 struct_len += sizeof(u32) + sizeof(rec.v2); 3058 3059 if (struct_v >= 2) 3060 struct_len += sizeof(u64); /* snap_follows */ 3061 3062 total_len += struct_len; 3063 err = ceph_pagelist_reserve(pagelist, total_len); 3064 if (err) { 3065 
kfree(flocks); 3066 goto out_err; 3067 } 3068 3069 if (recon_state->msg_version >= 3) { 3070 ceph_pagelist_encode_8(pagelist, struct_v); 3071 ceph_pagelist_encode_8(pagelist, 1); 3072 ceph_pagelist_encode_32(pagelist, struct_len); 3073 } 3074 ceph_pagelist_encode_string(pagelist, NULL, 0); 3075 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 3076 ceph_locks_to_pagelist(flocks, pagelist, 3077 num_fcntl_locks, num_flock_locks); 3078 if (struct_v >= 2) 3079 ceph_pagelist_encode_64(pagelist, snap_follows); 3080 3081 kfree(flocks); 3082 } else { 3083 u64 pathbase = 0; 3084 int pathlen = 0; 3085 char *path = NULL; 3086 struct dentry *dentry; 3087 3088 dentry = d_find_alias(inode); 3089 if (dentry) { 3090 path = ceph_mdsc_build_path(dentry, 3091 &pathlen, &pathbase, 0); 3092 dput(dentry); 3093 if (IS_ERR(path)) { 3094 err = PTR_ERR(path); 3095 goto out_err; 3096 } 3097 rec.v1.pathbase = cpu_to_le64(pathbase); 3098 } 3099 3100 err = ceph_pagelist_reserve(pagelist, 3101 pathlen + sizeof(u32) + sizeof(rec.v1)); 3102 if (err) { 3103 kfree(path); 3104 goto out_err; 3105 } 3106 3107 ceph_pagelist_encode_string(pagelist, path, pathlen); 3108 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 3109 3110 kfree(path); 3111 } 3112 3113 recon_state->nr_caps++; 3114 out_err: 3115 return err; 3116 } 3117 3118 3119 /* 3120 * If an MDS fails and recovers, clients need to reconnect in order to 3121 * reestablish shared state. This includes all caps issued through 3122 * this session _and_ the snap_realm hierarchy. Because it's not 3123 * clear which snap realms the mds cares about, we send everything we 3124 * know about.. that ensures we'll then get any new info the 3125 * recovering MDS might have. 3126 * 3127 * This is a relatively heavyweight operation, but it's rare. 3128 * 3129 * called with mdsc->mutex held. 3130 */ 3131 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 3132 struct ceph_mds_session *session) 3133 { 3134 struct ceph_msg *reply; 3135 struct rb_node *p; 3136 int mds = session->s_mds; 3137 int err = -ENOMEM; 3138 int s_nr_caps; 3139 struct ceph_pagelist *pagelist; 3140 struct ceph_reconnect_state recon_state; 3141 LIST_HEAD(dispose); 3142 3143 pr_info("mds%d reconnect start\n", mds); 3144 3145 pagelist = ceph_pagelist_alloc(GFP_NOFS); 3146 if (!pagelist) 3147 goto fail_nopagelist; 3148 3149 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3150 if (!reply) 3151 goto fail_nomsg; 3152 3153 mutex_lock(&session->s_mutex); 3154 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 3155 session->s_seq = 0; 3156 3157 dout("session %p state %s\n", session, 3158 ceph_session_state_name(session->s_state)); 3159 3160 spin_lock(&session->s_gen_ttl_lock); 3161 session->s_cap_gen++; 3162 spin_unlock(&session->s_gen_ttl_lock); 3163 3164 spin_lock(&session->s_cap_lock); 3165 /* don't know if session is readonly */ 3166 session->s_readonly = 0; 3167 /* 3168 * notify __ceph_remove_cap() that we are composing cap reconnect. 3169 * If a cap get released before being added to the cap reconnect, 3170 * __ceph_remove_cap() should skip queuing cap release. 
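 * (s_cap_reconnect is cleared again below, once iterate_session_caps()
 * has visited every cap.)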
3171 */ 3172 session->s_cap_reconnect = 1; 3173 /* drop old cap expires; we're about to reestablish that state */ 3174 detach_cap_releases(session, &dispose); 3175 spin_unlock(&session->s_cap_lock); 3176 dispose_cap_releases(mdsc, &dispose); 3177 3178 /* trim unused caps to reduce MDS's cache rejoin time */ 3179 if (mdsc->fsc->sb->s_root) 3180 shrink_dcache_parent(mdsc->fsc->sb->s_root); 3181 3182 ceph_con_close(&session->s_con); 3183 ceph_con_open(&session->s_con, 3184 CEPH_ENTITY_TYPE_MDS, mds, 3185 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 3186 3187 /* replay unsafe requests */ 3188 replay_unsafe_requests(mdsc, session); 3189 3190 down_read(&mdsc->snap_rwsem); 3191 3192 /* traverse this session's caps */ 3193 s_nr_caps = session->s_nr_caps; 3194 err = ceph_pagelist_encode_32(pagelist, s_nr_caps); 3195 if (err) 3196 goto fail; 3197 3198 recon_state.nr_caps = 0; 3199 recon_state.pagelist = pagelist; 3200 if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) 3201 recon_state.msg_version = 3; 3202 else 3203 recon_state.msg_version = 2; 3204 err = iterate_session_caps(session, encode_caps_cb, &recon_state); 3205 if (err < 0) 3206 goto fail; 3207 3208 spin_lock(&session->s_cap_lock); 3209 session->s_cap_reconnect = 0; 3210 spin_unlock(&session->s_cap_lock); 3211 3212 /* 3213 * snaprealms. we provide mds with the ino, seq (version), and 3214 * parent for all of our realms. If the mds has any newer info, 3215 * it will tell us. 3216 */ 3217 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 3218 struct ceph_snap_realm *realm = 3219 rb_entry(p, struct ceph_snap_realm, node); 3220 struct ceph_mds_snaprealm_reconnect sr_rec; 3221 3222 dout(" adding snap realm %llx seq %lld parent %llx\n", 3223 realm->ino, realm->seq, realm->parent_ino); 3224 sr_rec.ino = cpu_to_le64(realm->ino); 3225 sr_rec.seq = cpu_to_le64(realm->seq); 3226 sr_rec.parent = cpu_to_le64(realm->parent_ino); 3227 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 3228 if (err) 3229 goto fail; 3230 } 3231 3232 reply->hdr.version = cpu_to_le16(recon_state.msg_version); 3233 3234 /* raced with cap release? */ 3235 if (s_nr_caps != recon_state.nr_caps) { 3236 struct page *page = list_first_entry(&pagelist->head, 3237 struct page, lru); 3238 __le32 *addr = kmap_atomic(page); 3239 *addr = cpu_to_le32(recon_state.nr_caps); 3240 kunmap_atomic(addr); 3241 } 3242 3243 reply->hdr.data_len = cpu_to_le32(pagelist->length); 3244 ceph_msg_data_add_pagelist(reply, pagelist); 3245 3246 ceph_early_kick_flushing_caps(mdsc, session); 3247 3248 ceph_con_send(&session->s_con, reply); 3249 3250 mutex_unlock(&session->s_mutex); 3251 3252 mutex_lock(&mdsc->mutex); 3253 __wake_requests(mdsc, &session->s_waiting); 3254 mutex_unlock(&mdsc->mutex); 3255 3256 up_read(&mdsc->snap_rwsem); 3257 ceph_pagelist_release(pagelist); 3258 return; 3259 3260 fail: 3261 ceph_msg_put(reply); 3262 up_read(&mdsc->snap_rwsem); 3263 mutex_unlock(&session->s_mutex); 3264 fail_nomsg: 3265 ceph_pagelist_release(pagelist); 3266 fail_nopagelist: 3267 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 3268 return; 3269 } 3270 3271 3272 /* 3273 * compare old and new mdsmaps, kicking requests 3274 * and closing out old connections as necessary 3275 * 3276 * called under mdsc->mutex. 
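 *
 * Sessions whose mds has dropped out of the map or changed address are
 * torn down; a RESTARTING session triggers send_mds_reconnect() once its
 * mds reaches the reconnect state; and requests are kicked whenever an
 * mds goes active.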
3277 */ 3278 static void check_new_map(struct ceph_mds_client *mdsc, 3279 struct ceph_mdsmap *newmap, 3280 struct ceph_mdsmap *oldmap) 3281 { 3282 int i; 3283 int oldstate, newstate; 3284 struct ceph_mds_session *s; 3285 3286 dout("check_new_map new %u old %u\n", 3287 newmap->m_epoch, oldmap->m_epoch); 3288 3289 for (i = 0; i < oldmap->m_num_mds && i < mdsc->max_sessions; i++) { 3290 if (!mdsc->sessions[i]) 3291 continue; 3292 s = mdsc->sessions[i]; 3293 oldstate = ceph_mdsmap_get_state(oldmap, i); 3294 newstate = ceph_mdsmap_get_state(newmap, i); 3295 3296 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 3297 i, ceph_mds_state_name(oldstate), 3298 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 3299 ceph_mds_state_name(newstate), 3300 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 3301 ceph_session_state_name(s->s_state)); 3302 3303 if (i >= newmap->m_num_mds || 3304 memcmp(ceph_mdsmap_get_addr(oldmap, i), 3305 ceph_mdsmap_get_addr(newmap, i), 3306 sizeof(struct ceph_entity_addr))) { 3307 if (s->s_state == CEPH_MDS_SESSION_OPENING) { 3308 /* the session never opened, just close it 3309 * out now */ 3310 get_session(s); 3311 __unregister_session(mdsc, s); 3312 __wake_requests(mdsc, &s->s_waiting); 3313 ceph_put_mds_session(s); 3314 } else if (i >= newmap->m_num_mds) { 3315 /* force close session for stopped mds */ 3316 get_session(s); 3317 __unregister_session(mdsc, s); 3318 __wake_requests(mdsc, &s->s_waiting); 3319 kick_requests(mdsc, i); 3320 mutex_unlock(&mdsc->mutex); 3321 3322 mutex_lock(&s->s_mutex); 3323 cleanup_session_requests(mdsc, s); 3324 remove_session_caps(s); 3325 mutex_unlock(&s->s_mutex); 3326 3327 ceph_put_mds_session(s); 3328 3329 mutex_lock(&mdsc->mutex); 3330 } else { 3331 /* just close it */ 3332 mutex_unlock(&mdsc->mutex); 3333 mutex_lock(&s->s_mutex); 3334 mutex_lock(&mdsc->mutex); 3335 ceph_con_close(&s->s_con); 3336 mutex_unlock(&s->s_mutex); 3337 s->s_state = CEPH_MDS_SESSION_RESTARTING; 3338 } 3339 } else if (oldstate == newstate) { 3340 continue; /* nothing new with this mds */ 3341 } 3342 3343 /* 3344 * send reconnect? 3345 */ 3346 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 3347 newstate >= CEPH_MDS_STATE_RECONNECT) { 3348 mutex_unlock(&mdsc->mutex); 3349 send_mds_reconnect(mdsc, s); 3350 mutex_lock(&mdsc->mutex); 3351 } 3352 3353 /* 3354 * kick request on any mds that has gone active. 
3355 */ 3356 if (oldstate < CEPH_MDS_STATE_ACTIVE && 3357 newstate >= CEPH_MDS_STATE_ACTIVE) { 3358 if (oldstate != CEPH_MDS_STATE_CREATING && 3359 oldstate != CEPH_MDS_STATE_STARTING) 3360 pr_info("mds%d recovery completed\n", s->s_mds); 3361 kick_requests(mdsc, i); 3362 ceph_kick_flushing_caps(mdsc, s); 3363 wake_up_session_caps(s, RECONNECT); 3364 } 3365 } 3366 3367 for (i = 0; i < newmap->m_num_mds && i < mdsc->max_sessions; i++) { 3368 s = mdsc->sessions[i]; 3369 if (!s) 3370 continue; 3371 if (!ceph_mdsmap_is_laggy(newmap, i)) 3372 continue; 3373 if (s->s_state == CEPH_MDS_SESSION_OPEN || 3374 s->s_state == CEPH_MDS_SESSION_HUNG || 3375 s->s_state == CEPH_MDS_SESSION_CLOSING) { 3376 dout(" connecting to export targets of laggy mds%d\n", 3377 i); 3378 __open_export_target_sessions(mdsc, s); 3379 } 3380 } 3381 } 3382 3383 3384 3385 /* 3386 * leases 3387 */ 3388 3389 /* 3390 * caller must hold session s_mutex, dentry->d_lock 3391 */ 3392 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 3393 { 3394 struct ceph_dentry_info *di = ceph_dentry(dentry); 3395 3396 ceph_put_mds_session(di->lease_session); 3397 di->lease_session = NULL; 3398 } 3399 3400 static void handle_lease(struct ceph_mds_client *mdsc, 3401 struct ceph_mds_session *session, 3402 struct ceph_msg *msg) 3403 { 3404 struct super_block *sb = mdsc->fsc->sb; 3405 struct inode *inode; 3406 struct dentry *parent, *dentry; 3407 struct ceph_dentry_info *di; 3408 int mds = session->s_mds; 3409 struct ceph_mds_lease *h = msg->front.iov_base; 3410 u32 seq; 3411 struct ceph_vino vino; 3412 struct qstr dname; 3413 int release = 0; 3414 3415 dout("handle_lease from mds%d\n", mds); 3416 3417 /* decode */ 3418 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 3419 goto bad; 3420 vino.ino = le64_to_cpu(h->ino); 3421 vino.snap = CEPH_NOSNAP; 3422 seq = le32_to_cpu(h->seq); 3423 dname.len = get_unaligned_le32(h + 1); 3424 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 3425 goto bad; 3426 dname.name = (void *)(h + 1) + sizeof(u32); 3427 3428 /* lookup inode */ 3429 inode = ceph_find_inode(sb, vino); 3430 dout("handle_lease %s, ino %llx %p %.*s\n", 3431 ceph_lease_op_name(h->action), vino.ino, inode, 3432 dname.len, dname.name); 3433 3434 mutex_lock(&session->s_mutex); 3435 session->s_seq++; 3436 3437 if (!inode) { 3438 dout("handle_lease no inode %llx\n", vino.ino); 3439 goto release; 3440 } 3441 3442 /* dentry */ 3443 parent = d_find_alias(inode); 3444 if (!parent) { 3445 dout("no parent dentry on inode %p\n", inode); 3446 WARN_ON(1); 3447 goto release; /* hrm... 
*/ 3448 } 3449 dname.hash = full_name_hash(parent, dname.name, dname.len); 3450 dentry = d_lookup(parent, &dname); 3451 dput(parent); 3452 if (!dentry) 3453 goto release; 3454 3455 spin_lock(&dentry->d_lock); 3456 di = ceph_dentry(dentry); 3457 switch (h->action) { 3458 case CEPH_MDS_LEASE_REVOKE: 3459 if (di->lease_session == session) { 3460 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 3461 h->seq = cpu_to_le32(di->lease_seq); 3462 __ceph_mdsc_drop_dentry_lease(dentry); 3463 } 3464 release = 1; 3465 break; 3466 3467 case CEPH_MDS_LEASE_RENEW: 3468 if (di->lease_session == session && 3469 di->lease_gen == session->s_cap_gen && 3470 di->lease_renew_from && 3471 di->lease_renew_after == 0) { 3472 unsigned long duration = 3473 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 3474 3475 di->lease_seq = seq; 3476 di->time = di->lease_renew_from + duration; 3477 di->lease_renew_after = di->lease_renew_from + 3478 (duration >> 1); 3479 di->lease_renew_from = 0; 3480 } 3481 break; 3482 } 3483 spin_unlock(&dentry->d_lock); 3484 dput(dentry); 3485 3486 if (!release) 3487 goto out; 3488 3489 release: 3490 /* let's just reuse the same message */ 3491 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 3492 ceph_msg_get(msg); 3493 ceph_con_send(&session->s_con, msg); 3494 3495 out: 3496 iput(inode); 3497 mutex_unlock(&session->s_mutex); 3498 return; 3499 3500 bad: 3501 pr_err("corrupt lease message\n"); 3502 ceph_msg_dump(msg); 3503 } 3504 3505 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 3506 struct inode *inode, 3507 struct dentry *dentry, char action, 3508 u32 seq) 3509 { 3510 struct ceph_msg *msg; 3511 struct ceph_mds_lease *lease; 3512 int len = sizeof(*lease) + sizeof(u32); 3513 int dnamelen = 0; 3514 3515 dout("lease_send_msg inode %p dentry %p %s to mds%d\n", 3516 inode, dentry, ceph_lease_op_name(action), session->s_mds); 3517 dnamelen = dentry->d_name.len; 3518 len += dnamelen; 3519 3520 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 3521 if (!msg) 3522 return; 3523 lease = msg->front.iov_base; 3524 lease->action = action; 3525 lease->ino = cpu_to_le64(ceph_vino(inode).ino); 3526 lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap); 3527 lease->seq = cpu_to_le32(seq); 3528 put_unaligned_le32(dnamelen, lease + 1); 3529 memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen); 3530 3531 /* 3532 * if this is a preemptive lease RELEASE, no need to 3533 * flush request stream, since the actual request will 3534 * soon follow. 
3535 */ 3536 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE); 3537 3538 ceph_con_send(&session->s_con, msg); 3539 } 3540 3541 /* 3542 * lock unlock sessions, to wait ongoing session activities 3543 */ 3544 static void lock_unlock_sessions(struct ceph_mds_client *mdsc) 3545 { 3546 int i; 3547 3548 mutex_lock(&mdsc->mutex); 3549 for (i = 0; i < mdsc->max_sessions; i++) { 3550 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 3551 if (!s) 3552 continue; 3553 mutex_unlock(&mdsc->mutex); 3554 mutex_lock(&s->s_mutex); 3555 mutex_unlock(&s->s_mutex); 3556 ceph_put_mds_session(s); 3557 mutex_lock(&mdsc->mutex); 3558 } 3559 mutex_unlock(&mdsc->mutex); 3560 } 3561 3562 3563 3564 /* 3565 * delayed work -- periodically trim expired leases, renew caps with mds 3566 */ 3567 static void schedule_delayed(struct ceph_mds_client *mdsc) 3568 { 3569 int delay = 5; 3570 unsigned hz = round_jiffies_relative(HZ * delay); 3571 schedule_delayed_work(&mdsc->delayed_work, hz); 3572 } 3573 3574 static void delayed_work(struct work_struct *work) 3575 { 3576 int i; 3577 struct ceph_mds_client *mdsc = 3578 container_of(work, struct ceph_mds_client, delayed_work.work); 3579 int renew_interval; 3580 int renew_caps; 3581 3582 dout("mdsc delayed_work\n"); 3583 ceph_check_delayed_caps(mdsc); 3584 3585 mutex_lock(&mdsc->mutex); 3586 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 3587 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 3588 mdsc->last_renew_caps); 3589 if (renew_caps) 3590 mdsc->last_renew_caps = jiffies; 3591 3592 for (i = 0; i < mdsc->max_sessions; i++) { 3593 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 3594 if (!s) 3595 continue; 3596 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 3597 dout("resending session close request for mds%d\n", 3598 s->s_mds); 3599 request_close_session(mdsc, s); 3600 ceph_put_mds_session(s); 3601 continue; 3602 } 3603 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 3604 if (s->s_state == CEPH_MDS_SESSION_OPEN) { 3605 s->s_state = CEPH_MDS_SESSION_HUNG; 3606 pr_info("mds%d hung\n", s->s_mds); 3607 } 3608 } 3609 if (s->s_state < CEPH_MDS_SESSION_OPEN) { 3610 /* this mds is failed or recovering, just wait */ 3611 ceph_put_mds_session(s); 3612 continue; 3613 } 3614 mutex_unlock(&mdsc->mutex); 3615 3616 mutex_lock(&s->s_mutex); 3617 if (renew_caps) 3618 send_renew_caps(mdsc, s); 3619 else 3620 ceph_con_keepalive(&s->s_con); 3621 if (s->s_state == CEPH_MDS_SESSION_OPEN || 3622 s->s_state == CEPH_MDS_SESSION_HUNG) 3623 ceph_send_cap_releases(mdsc, s); 3624 mutex_unlock(&s->s_mutex); 3625 ceph_put_mds_session(s); 3626 3627 mutex_lock(&mdsc->mutex); 3628 } 3629 mutex_unlock(&mdsc->mutex); 3630 3631 schedule_delayed(mdsc); 3632 } 3633 3634 int ceph_mdsc_init(struct ceph_fs_client *fsc) 3635 3636 { 3637 struct ceph_mds_client *mdsc; 3638 3639 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 3640 if (!mdsc) 3641 return -ENOMEM; 3642 mdsc->fsc = fsc; 3643 mutex_init(&mdsc->mutex); 3644 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 3645 if (!mdsc->mdsmap) { 3646 kfree(mdsc); 3647 return -ENOMEM; 3648 } 3649 3650 fsc->mdsc = mdsc; 3651 init_completion(&mdsc->safe_umount_waiters); 3652 init_waitqueue_head(&mdsc->session_close_wq); 3653 INIT_LIST_HEAD(&mdsc->waiting_for_map); 3654 mdsc->sessions = NULL; 3655 atomic_set(&mdsc->num_sessions, 0); 3656 mdsc->max_sessions = 0; 3657 mdsc->stopping = 0; 3658 atomic64_set(&mdsc->quotarealms_count, 0); 3659 mdsc->last_snap_seq = 0; 3660 init_rwsem(&mdsc->snap_rwsem); 3661 
mdsc->snap_realms = RB_ROOT; 3662 INIT_LIST_HEAD(&mdsc->snap_empty); 3663 spin_lock_init(&mdsc->snap_empty_lock); 3664 mdsc->last_tid = 0; 3665 mdsc->oldest_tid = 0; 3666 mdsc->request_tree = RB_ROOT; 3667 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 3668 mdsc->last_renew_caps = jiffies; 3669 INIT_LIST_HEAD(&mdsc->cap_delay_list); 3670 spin_lock_init(&mdsc->cap_delay_lock); 3671 INIT_LIST_HEAD(&mdsc->snap_flush_list); 3672 spin_lock_init(&mdsc->snap_flush_lock); 3673 mdsc->last_cap_flush_tid = 1; 3674 INIT_LIST_HEAD(&mdsc->cap_flush_list); 3675 INIT_LIST_HEAD(&mdsc->cap_dirty); 3676 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 3677 mdsc->num_cap_flushing = 0; 3678 spin_lock_init(&mdsc->cap_dirty_lock); 3679 init_waitqueue_head(&mdsc->cap_flushing_wq); 3680 spin_lock_init(&mdsc->dentry_lru_lock); 3681 INIT_LIST_HEAD(&mdsc->dentry_lru); 3682 3683 ceph_caps_init(mdsc); 3684 ceph_adjust_min_caps(mdsc, fsc->min_caps); 3685 3686 init_rwsem(&mdsc->pool_perm_rwsem); 3687 mdsc->pool_perm_tree = RB_ROOT; 3688 3689 strscpy(mdsc->nodename, utsname()->nodename, 3690 sizeof(mdsc->nodename)); 3691 return 0; 3692 } 3693 3694 /* 3695 * Wait for safe replies on open mds requests. If we time out, drop 3696 * all requests from the tree to avoid dangling dentry refs. 3697 */ 3698 static void wait_requests(struct ceph_mds_client *mdsc) 3699 { 3700 struct ceph_options *opts = mdsc->fsc->client->options; 3701 struct ceph_mds_request *req; 3702 3703 mutex_lock(&mdsc->mutex); 3704 if (__get_oldest_req(mdsc)) { 3705 mutex_unlock(&mdsc->mutex); 3706 3707 dout("wait_requests waiting for requests\n"); 3708 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 3709 ceph_timeout_jiffies(opts->mount_timeout)); 3710 3711 /* tear down remaining requests */ 3712 mutex_lock(&mdsc->mutex); 3713 while ((req = __get_oldest_req(mdsc))) { 3714 dout("wait_requests timed out on tid %llu\n", 3715 req->r_tid); 3716 __unregister_request(mdsc, req); 3717 } 3718 } 3719 mutex_unlock(&mdsc->mutex); 3720 dout("wait_requests done\n"); 3721 } 3722 3723 /* 3724 * called before mount is ro, and before dentries are torn down. 3725 * (hmm, does this still race with new lookups?) 3726 */ 3727 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 3728 { 3729 dout("pre_umount\n"); 3730 mdsc->stopping = 1; 3731 3732 lock_unlock_sessions(mdsc); 3733 ceph_flush_dirty_caps(mdsc); 3734 wait_requests(mdsc); 3735 3736 /* 3737 * wait for reply handlers to drop their request refs and 3738 * their inode/dcache refs 3739 */ 3740 ceph_msgr_flush(); 3741 } 3742 3743 /* 3744 * wait for all write mds requests to flush. 
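 *
 * Walks the request tree in tid order and blocks on r_safe_completion for
 * every write op with tid <= want_tid; used by ceph_mdsc_sync() so that
 * metadata changes are journaled on the MDS before a sync returns.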
3745 */ 3746 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) 3747 { 3748 struct ceph_mds_request *req = NULL, *nextreq; 3749 struct rb_node *n; 3750 3751 mutex_lock(&mdsc->mutex); 3752 dout("wait_unsafe_requests want %lld\n", want_tid); 3753 restart: 3754 req = __get_oldest_req(mdsc); 3755 while (req && req->r_tid <= want_tid) { 3756 /* find next request */ 3757 n = rb_next(&req->r_node); 3758 if (n) 3759 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 3760 else 3761 nextreq = NULL; 3762 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 3763 (req->r_op & CEPH_MDS_OP_WRITE)) { 3764 /* write op */ 3765 ceph_mdsc_get_request(req); 3766 if (nextreq) 3767 ceph_mdsc_get_request(nextreq); 3768 mutex_unlock(&mdsc->mutex); 3769 dout("wait_unsafe_requests wait on %llu (want %llu)\n", 3770 req->r_tid, want_tid); 3771 wait_for_completion(&req->r_safe_completion); 3772 mutex_lock(&mdsc->mutex); 3773 ceph_mdsc_put_request(req); 3774 if (!nextreq) 3775 break; /* next dne before, so we're done! */ 3776 if (RB_EMPTY_NODE(&nextreq->r_node)) { 3777 /* next request was removed from tree */ 3778 ceph_mdsc_put_request(nextreq); 3779 goto restart; 3780 } 3781 ceph_mdsc_put_request(nextreq); /* won't go away */ 3782 } 3783 req = nextreq; 3784 } 3785 mutex_unlock(&mdsc->mutex); 3786 dout("wait_unsafe_requests done\n"); 3787 } 3788 3789 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 3790 { 3791 u64 want_tid, want_flush; 3792 3793 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 3794 return; 3795 3796 dout("sync\n"); 3797 mutex_lock(&mdsc->mutex); 3798 want_tid = mdsc->last_tid; 3799 mutex_unlock(&mdsc->mutex); 3800 3801 ceph_flush_dirty_caps(mdsc); 3802 spin_lock(&mdsc->cap_dirty_lock); 3803 want_flush = mdsc->last_cap_flush_tid; 3804 if (!list_empty(&mdsc->cap_flush_list)) { 3805 struct ceph_cap_flush *cf = 3806 list_last_entry(&mdsc->cap_flush_list, 3807 struct ceph_cap_flush, g_list); 3808 cf->wake = true; 3809 } 3810 spin_unlock(&mdsc->cap_dirty_lock); 3811 3812 dout("sync want tid %lld flush_seq %lld\n", 3813 want_tid, want_flush); 3814 3815 wait_unsafe_requests(mdsc, want_tid); 3816 wait_caps_flush(mdsc, want_flush); 3817 } 3818 3819 /* 3820 * true if all sessions are closed, or we force unmount 3821 */ 3822 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 3823 { 3824 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 3825 return true; 3826 return atomic_read(&mdsc->num_sessions) <= skipped; 3827 } 3828 3829 /* 3830 * called after sb is ro. 
3831 */ 3832 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 3833 { 3834 struct ceph_options *opts = mdsc->fsc->client->options; 3835 struct ceph_mds_session *session; 3836 int i; 3837 int skipped = 0; 3838 3839 dout("close_sessions\n"); 3840 3841 /* close sessions */ 3842 mutex_lock(&mdsc->mutex); 3843 for (i = 0; i < mdsc->max_sessions; i++) { 3844 session = __ceph_lookup_mds_session(mdsc, i); 3845 if (!session) 3846 continue; 3847 mutex_unlock(&mdsc->mutex); 3848 mutex_lock(&session->s_mutex); 3849 if (__close_session(mdsc, session) <= 0) 3850 skipped++; 3851 mutex_unlock(&session->s_mutex); 3852 ceph_put_mds_session(session); 3853 mutex_lock(&mdsc->mutex); 3854 } 3855 mutex_unlock(&mdsc->mutex); 3856 3857 dout("waiting for sessions to close\n"); 3858 wait_event_timeout(mdsc->session_close_wq, 3859 done_closing_sessions(mdsc, skipped), 3860 ceph_timeout_jiffies(opts->mount_timeout)); 3861 3862 /* tear down remaining sessions */ 3863 mutex_lock(&mdsc->mutex); 3864 for (i = 0; i < mdsc->max_sessions; i++) { 3865 if (mdsc->sessions[i]) { 3866 session = get_session(mdsc->sessions[i]); 3867 __unregister_session(mdsc, session); 3868 mutex_unlock(&mdsc->mutex); 3869 mutex_lock(&session->s_mutex); 3870 remove_session_caps(session); 3871 mutex_unlock(&session->s_mutex); 3872 ceph_put_mds_session(session); 3873 mutex_lock(&mdsc->mutex); 3874 } 3875 } 3876 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 3877 mutex_unlock(&mdsc->mutex); 3878 3879 ceph_cleanup_empty_realms(mdsc); 3880 3881 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 3882 3883 dout("stopped\n"); 3884 } 3885 3886 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 3887 { 3888 struct ceph_mds_session *session; 3889 int mds; 3890 3891 dout("force umount\n"); 3892 3893 mutex_lock(&mdsc->mutex); 3894 for (mds = 0; mds < mdsc->max_sessions; mds++) { 3895 session = __ceph_lookup_mds_session(mdsc, mds); 3896 if (!session) 3897 continue; 3898 mutex_unlock(&mdsc->mutex); 3899 mutex_lock(&session->s_mutex); 3900 __close_session(mdsc, session); 3901 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 3902 cleanup_session_requests(mdsc, session); 3903 remove_session_caps(session); 3904 } 3905 mutex_unlock(&session->s_mutex); 3906 ceph_put_mds_session(session); 3907 mutex_lock(&mdsc->mutex); 3908 kick_requests(mdsc, mds); 3909 } 3910 __wake_requests(mdsc, &mdsc->waiting_for_map); 3911 mutex_unlock(&mdsc->mutex); 3912 } 3913 3914 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 3915 { 3916 dout("stop\n"); 3917 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 3918 if (mdsc->mdsmap) 3919 ceph_mdsmap_destroy(mdsc->mdsmap); 3920 kfree(mdsc->sessions); 3921 ceph_caps_finalize(mdsc); 3922 ceph_pool_perm_destroy(mdsc); 3923 } 3924 3925 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 3926 { 3927 struct ceph_mds_client *mdsc = fsc->mdsc; 3928 dout("mdsc_destroy %p\n", mdsc); 3929 3930 if (!mdsc) 3931 return; 3932 3933 /* flush out any connection work with references to us */ 3934 ceph_msgr_flush(); 3935 3936 ceph_mdsc_stop(mdsc); 3937 3938 fsc->mdsc = NULL; 3939 kfree(mdsc); 3940 dout("mdsc_destroy %p done\n", mdsc); 3941 } 3942 3943 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 3944 { 3945 struct ceph_fs_client *fsc = mdsc->fsc; 3946 const char *mds_namespace = fsc->mount_options->mds_namespace; 3947 void *p = msg->front.iov_base; 3948 void *end = p + msg->front.iov_len; 3949 u32 epoch; 3950 u32 map_len; 3951 u32 num_fs; 3952 u32 mount_fscid = (u32)-1; 3953 
void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	struct ceph_fs_client *fsc = mdsc->fsc;
	const char *mds_namespace = fsc->mount_options->mds_namespace;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	u32 epoch;
	u32 map_len;
	u32 num_fs;
	u32 mount_fscid = (u32)-1;
	u8 struct_v, struct_cv;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(u32), bad);
	epoch = ceph_decode_32(&p);

	dout("handle_fsmap epoch %u\n", epoch);

	ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
	struct_v = ceph_decode_8(&p);
	struct_cv = ceph_decode_8(&p);
	map_len = ceph_decode_32(&p);

	ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
	p += sizeof(u32) * 2;  /* skip epoch and legacy_client_fscid */

	num_fs = ceph_decode_32(&p);
	while (num_fs-- > 0) {
		void *info_p, *info_end;
		u32 info_len;
		u8 info_v, info_cv;
		u32 fscid, namelen;

		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
		info_v = ceph_decode_8(&p);
		info_cv = ceph_decode_8(&p);
		info_len = ceph_decode_32(&p);
		ceph_decode_need(&p, end, info_len, bad);
		info_p = p;
		info_end = p + info_len;
		p = info_end;

		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
		fscid = ceph_decode_32(&info_p);
		namelen = ceph_decode_32(&info_p);
		ceph_decode_need(&info_p, info_end, namelen, bad);

		if (mds_namespace &&
		    strlen(mds_namespace) == namelen &&
		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
			mount_fscid = fscid;
			break;
		}
	}

	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
	if (mount_fscid != (u32)-1) {
		fsc->client->monc.fs_cluster_id = mount_fscid;
		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
				   0, true);
		ceph_monc_renew_subs(&fsc->client->monc);
	} else {
		err = -ENOENT;
		goto err_out;
	}
	return;

bad:
	pr_err("error decoding fsmap\n");
err_out:
	mutex_lock(&mdsc->mutex);
	mdsc->mdsmap_err = err;
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}
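/*
 * The MDS_MAP message starts with the cluster fsid, the map epoch and
 * the encoded map length; the map body itself is decoded by
 * ceph_mdsmap_decode().  The epoch acts as a monotonic version: if we
 * currently hold a map with m_epoch == 5, incoming maps with epoch 4
 * or 5 are dropped ("handle_map epoch %u <= our %u"), while epoch 6
 * replaces the old map and wakes any requests waiting for a newer map.
 */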
/*
 * handle mds map update.
 */
void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	u32 epoch;
	u32 maplen;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	struct ceph_mdsmap *newmap, *oldmap;
	struct ceph_fsid fsid;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
		return;
	epoch = ceph_decode_32(&p);
	maplen = ceph_decode_32(&p);
	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);

	/* do we need it? */
	mutex_lock(&mdsc->mutex);
	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
		dout("handle_map epoch %u <= our %u\n",
		     epoch, mdsc->mdsmap->m_epoch);
		mutex_unlock(&mdsc->mutex);
		return;
	}

	newmap = ceph_mdsmap_decode(&p, end);
	if (IS_ERR(newmap)) {
		err = PTR_ERR(newmap);
		goto bad_unlock;
	}

	/* swap into place */
	if (mdsc->mdsmap) {
		oldmap = mdsc->mdsmap;
		mdsc->mdsmap = newmap;
		check_new_map(mdsc, newmap, oldmap);
		ceph_mdsmap_destroy(oldmap);
	} else {
		mdsc->mdsmap = newmap;  /* first mds map */
	}
	mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
				       MAX_LFS_FILESIZE);

	__wake_requests(mdsc, &mdsc->waiting_for_map);
	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
			  mdsc->mdsmap->m_epoch);

	mutex_unlock(&mdsc->mutex);
	schedule_delayed(mdsc);
	return;

bad_unlock:
	mutex_unlock(&mdsc->mutex);
bad:
	pr_err("error decoding mdsmap %d\n", err);
	return;
}

static struct ceph_connection *con_get(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	if (get_session(s)) {
		dout("mdsc con_get %p ok (%d)\n", s, refcount_read(&s->s_ref));
		return con;
	}
	dout("mdsc con_get %p FAIL\n", s);
	return NULL;
}

static void con_put(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	dout("mdsc con_put %p (%d)\n", s, refcount_read(&s->s_ref) - 1);
	ceph_put_mds_session(s);
}

/*
 * if the client is unresponsive for long enough, the mds will kill
 * the session entirely.
 */
static void peer_reset(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;

	pr_warn("mds%d closed our session\n", s->s_mds);
	send_mds_reconnect(mdsc, s);
}

static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	int type = le16_to_cpu(msg->hdr.type);

	mutex_lock(&mdsc->mutex);
	if (__verify_registered_session(mdsc, s) < 0) {
		mutex_unlock(&mdsc->mutex);
		goto out;
	}
	mutex_unlock(&mdsc->mutex);

	switch (type) {
	case CEPH_MSG_MDS_MAP:
		ceph_mdsc_handle_mdsmap(mdsc, msg);
		break;
	case CEPH_MSG_FS_MAP_USER:
		ceph_mdsc_handle_fsmap(mdsc, msg);
		break;
	case CEPH_MSG_CLIENT_SESSION:
		handle_session(s, msg);
		break;
	case CEPH_MSG_CLIENT_REPLY:
		handle_reply(s, msg);
		break;
	case CEPH_MSG_CLIENT_REQUEST_FORWARD:
		handle_forward(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_CAPS:
		ceph_handle_caps(s, msg);
		break;
	case CEPH_MSG_CLIENT_SNAP:
		ceph_handle_snap(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_LEASE:
		handle_lease(mdsc, s, msg);
		break;
	case CEPH_MSG_CLIENT_QUOTA:
		ceph_handle_quota(mdsc, s, msg);
		break;

	default:
		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
out:
	ceph_msg_put(msg);
}
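/*
 * The callbacks above and the authorizer hooks below are wired up when
 * a session's ceph_connection is initialised (elsewhere in this file),
 * roughly:
 *
 *	ceph_con_init(&s->s_con, s, &mds_con_ops,
 *		      &mdsc->fsc->client->msgr);
 *
 * after which the messenger drives them: con_get/con_put manage the
 * session reference held by the connection, dispatch() receives
 * incoming messages, and the authentication hooks service the
 * connection handshake.
 */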
/*
 * authentication
 */

/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
					int *proto, int force_new)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
	struct ceph_auth_handshake *auth = &s->s_auth;

	if (force_new && auth->authorizer) {
		ceph_auth_destroy_authorizer(auth->authorizer);
		auth->authorizer = NULL;
	}
	if (!auth->authorizer) {
		int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
						      auth);
		if (ret)
			return ERR_PTR(ret);
	} else {
		int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
						      auth);
		if (ret)
			return ERR_PTR(ret);
	}
	*proto = ac->protocol;

	return auth;
}

static int add_authorizer_challenge(struct ceph_connection *con,
				    void *challenge_buf, int challenge_buf_len)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
					    challenge_buf, challenge_buf_len);
}

static int verify_authorizer_reply(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
}

static int invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;
	struct ceph_mds_client *mdsc = s->s_mdsc;
	struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;

	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);

	return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
}

static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
				struct ceph_msg_header *hdr, int *skip)
{
	struct ceph_msg *msg;
	int type = (int) le16_to_cpu(hdr->type);
	int front_len = (int) le32_to_cpu(hdr->front_len);

	if (con->in_msg)
		return con->in_msg;

	*skip = 0;
	msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
	if (!msg) {
		pr_err("unable to allocate msg type %d len %d\n",
		       type, front_len);
		return NULL;
	}

	return msg;
}

static int mds_sign_message(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_sign_message(auth, msg);
}

static int mds_check_message_signature(struct ceph_msg *msg)
{
	struct ceph_mds_session *s = msg->con->private;
	struct ceph_auth_handshake *auth = &s->s_auth;

	return ceph_auth_check_message_signature(auth, msg);
}

static const struct ceph_connection_operations mds_con_ops = {
	.get = con_get,
	.put = con_put,
	.dispatch = dispatch,
	.get_authorizer = get_authorizer,
	.add_authorizer_challenge = add_authorizer_challenge,
	.verify_authorizer_reply = verify_authorizer_reply,
	.invalidate_authorizer = invalidate_authorizer,
	.peer_reset = peer_reset,
	.alloc_msg = mds_alloc_msg,
	.sign_message = mds_sign_message,
	.check_message_signature = mds_check_message_signature,
};

/* eof */