// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
	struct ceph_mds_session *session;
	int nr_caps, nr_realms;
	struct ceph_pagelist *pagelist;
	unsigned msg_version;
	bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

static int parse_reply_info_quota(void **p, void *end,
				  struct ceph_mds_reply_info_in *info)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);
	/* struct_v is expected to be >= 1. we only
	 * understand encoding with struct_compat == 1. */
	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	end = *p + struct_len;
	ceph_decode_64_safe(p, end, info->max_bytes, bad);
	ceph_decode_64_safe(p, end, info->max_files, bad);
	*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = 0;
	u8 struct_v = 0;

	if (features == (u64)-1) {
		u32 struct_len;
		u8 struct_compat;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
	info->in = *p;
	*p += sizeof(struct ceph_mds_reply_inode) +
		sizeof(*info->in->fragtree.splits) *
		le32_to_cpu(info->in->fragtree.nsplits);

	ceph_decode_32_safe(p, end, info->symlink_len, bad);
	ceph_decode_need(p, end, info->symlink_len, bad);
	info->symlink = *p;
	*p += info->symlink_len;

	ceph_decode_copy_safe(p, end, &info->dir_layout,
			      sizeof(info->dir_layout), bad);
	ceph_decode_32_safe(p, end, info->xattr_len, bad);
	ceph_decode_need(p, end, info->xattr_len, bad);
	info->xattr_data = *p;
	*p += info->xattr_len;

	if (features == (u64)-1) {
		/* inline data */
		ceph_decode_64_safe(p, end, info->inline_version, bad);
		ceph_decode_32_safe(p, end, info->inline_len, bad);
		ceph_decode_need(p, end, info->inline_len, bad);
		info->inline_data = *p;
		*p += info->inline_len;
		/* quota */
		err = parse_reply_info_quota(p, end, info);
		if (err < 0)
			goto out_bad;
		/* pool namespace */
		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
		if (info->pool_ns_len > 0) {
			ceph_decode_need(p, end, info->pool_ns_len, bad);
			info->pool_ns_data = *p;
			*p += info->pool_ns_len;
		}

		/* btime */
		ceph_decode_need(p, end, sizeof(info->btime), bad);
		ceph_decode_copy(p, &info->btime, sizeof(info->btime));

		/* change attribute */
		ceph_decode_64_safe(p, end, info->change_attr, bad);

		/* dir pin */
		if (struct_v >= 2) {
			ceph_decode_32_safe(p, end, info->dir_pin, bad);
		} else {
			info->dir_pin = -ENODATA;
		}

		/* snapshot birth time, remains zero for v<=2 */
		if (struct_v >= 3) {
			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
			ceph_decode_copy(p, &info->snap_btime,
					 sizeof(info->snap_btime));
		} else {
			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
		}

		*p = end;
	} else {
		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
			ceph_decode_64_safe(p, end, info->inline_version, bad);
			ceph_decode_32_safe(p, end, info->inline_len, bad);
			ceph_decode_need(p, end, info->inline_len, bad);
			info->inline_data = *p;
			*p += info->inline_len;
		} else
			info->inline_version = CEPH_INLINE_NONE;

		if (features & CEPH_FEATURE_MDS_QUOTA) {
			err = parse_reply_info_quota(p, end, info);
			if (err < 0)
				goto out_bad;
		} else {
			info->max_bytes = 0;
			info->max_files = 0;
		}

		info->pool_ns_len = 0;
		info->pool_ns_data = NULL;
		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
			if (info->pool_ns_len > 0) {
				ceph_decode_need(p, end, info->pool_ns_len, bad);
				info->pool_ns_data = *p;
				*p += info->pool_ns_len;
			}
		}

		if (features & CEPH_FEATURE_FS_BTIME) {
			ceph_decode_need(p, end, sizeof(info->btime), bad);
			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
			ceph_decode_64_safe(p, end, info->change_attr, bad);
		}

		info->dir_pin = -ENODATA;
		/* info->snap_btime remains zero */
	}
	return 0;
bad:
	err = -EIO;
out_bad:
	return err;
}
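
/*
 * For reference: the feature-bit-independent (features == (u64)-1)
 * encodings decoded above and below are wrapped in a small versioned
 * envelope, roughly:
 *
 *	u8  struct_v       (version, expected >= 1)
 *	u8  struct_compat  (oldest compatible version; we require 1)
 *	u32 struct_len     (length of the payload that follows)
 *	... struct_len bytes of payload ...
 *
 * Unknown trailing fields inside the payload are skipped by resetting
 * *p to the envelope end.  This is only a summary of the decode logic
 * in this file, not an authoritative description of the wire protocol.
 */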

static int parse_reply_info_dir(void **p, void *end,
				struct ceph_mds_reply_dirfrag **dirfrag,
				u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
	*dirfrag = *p;
	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
	if (unlikely(*p > end))
		goto bad;
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}

static int parse_reply_info_lease(void **p, void *end,
				  struct ceph_mds_reply_lease **lease,
				  u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**lease), bad);
	*lease = *p;
	*p += sizeof(**lease);
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */
static int parse_reply_info_trace(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features)
{
	int err;

	if (info->head->is_dentry) {
		err = parse_reply_info_in(p, end, &info->diri, features);
		if (err < 0)
			goto out_bad;

		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
		if (err < 0)
			goto out_bad;

		ceph_decode_32_safe(p, end, info->dname_len, bad);
		ceph_decode_need(p, end, info->dname_len, bad);
		info->dname = *p;
		*p += info->dname_len;

		err = parse_reply_info_lease(p, end, &info->dlease, features);
		if (err < 0)
			goto out_bad;
	}

	if (info->head->is_target) {
		err = parse_reply_info_in(p, end, &info->targeti, features);
		if (err < 0)
			goto out_bad;
	}

	if (unlikely(*p != end))
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing mds trace %d\n", err);
	return err;
}

/*
 * parse readdir results
 */
static int parse_reply_info_readdir(void **p, void *end,
				    struct ceph_mds_reply_info_parsed *info,
				    u64 features)
{
	u32 num, i = 0;
	int err;

	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
	if (err < 0)
		goto out_bad;

	ceph_decode_need(p, end, sizeof(num) + 2, bad);
	num = ceph_decode_32(p);
	{
		u16 flags = ceph_decode_16(p);
		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
	}
	if (num == 0)
		goto done;

	BUG_ON(!info->dir_entries);
	if ((unsigned long)(info->dir_entries + num) >
	    (unsigned long)info->dir_entries + info->dir_buf_size) {
		pr_err("dir contents are larger than expected\n");
		WARN_ON(1);
		goto bad;
	}

	info->dir_nr = num;
	while (num) {
		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
		/* dentry */
		ceph_decode_32_safe(p, end, rde->name_len, bad);
		ceph_decode_need(p, end, rde->name_len, bad);
		rde->name = *p;
		*p += rde->name_len;
		dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);

		/* dentry lease */
		err = parse_reply_info_lease(p, end, &rde->lease, features);
		if (err)
			goto out_bad;
		/* inode */
		err = parse_reply_info_in(p, end, &rde->inode, features);
		if (err < 0)
			goto out_bad;
		/* ceph_readdir_prepopulate() will update it */
		rde->offset = 0;
		i++;
		num--;
	}

done:
	/* Skip over any unrecognized fields */
	*p = end;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing dir contents %d\n", err);
	return err;
}

/*
 * parse fcntl F_GETLK results
 */
static int parse_reply_info_filelock(void **p, void *end,
				     struct ceph_mds_reply_info_parsed *info,
				     u64 features)
{
	if (*p + sizeof(*info->filelock_reply) > end)
		goto bad;

	info->filelock_reply = *p;

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse create results
 */
static int parse_reply_info_create(void **p, void *end,
				   struct ceph_mds_reply_info_parsed *info,
				   u64 features)
{
	if (features == (u64)-1 ||
	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
		/* Malformed reply? */
		if (*p == end) {
			info->has_create_ino = false;
		} else {
			info->has_create_ino = true;
			ceph_decode_64_safe(p, end, info->ino, bad);
		}
	} else {
		if (*p != end)
			goto bad;
	}

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse extra results
 */
static int parse_reply_info_extra(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features)
{
	u32 op = le32_to_cpu(info->head->op);

	if (op == CEPH_MDS_OP_GETFILELOCK)
		return parse_reply_info_filelock(p, end, info, features);
	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
		return parse_reply_info_readdir(p, end, info, features);
	else if (op == CEPH_MDS_OP_CREATE)
		return parse_reply_info_create(p, end, info, features);
	else
		return -EIO;
}

/*
 * parse entire mds reply
 */
static int parse_reply_info(struct ceph_msg *msg,
			    struct ceph_mds_reply_info_parsed *info,
			    u64 features)
{
	void *p, *end;
	u32 len;
	int err;

	info->head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

	/* trace */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_trace(&p, p+len, info, features);
		if (err < 0)
			goto out_bad;
	}

	/* extra */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_extra(&p, p+len, info, features);
		if (err < 0)
			goto out_bad;
	}

	/* snap blob */
	ceph_decode_32_safe(&p, end, len, bad);
	info->snapblob_len = len;
	info->snapblob = p;
	p += len;

	if (p != end)
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("mds parse_reply err %d\n", err);
	return err;
}
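
/*
 * For reference, the front of a reply message parsed above is laid out
 * roughly as:
 *
 *	struct ceph_mds_reply_head
 *	u32 trace_len,  trace_len bytes  (dir inode, dirfrag, dname,
 *					  dentry lease, target inode)
 *	u32 extra_len,  extra_len bytes  (op-specific: readdir, filelock,
 *					  or create results)
 *	u32 snap_len,   snap_len bytes   (opaque snap realm blob)
 *
 * This is a rough sketch of what the parsers in this file consume, not
 * an authoritative protocol definition.
 */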

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
	if (!info->dir_entries)
		return;
	free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}


/*
 * sessions
 */
const char *ceph_session_state_name(int s)
{
	switch (s) {
	case CEPH_MDS_SESSION_NEW: return "new";
	case CEPH_MDS_SESSION_OPENING: return "opening";
	case CEPH_MDS_SESSION_OPEN: return "open";
	case CEPH_MDS_SESSION_HUNG: return "hung";
	case CEPH_MDS_SESSION_CLOSING: return "closing";
	case CEPH_MDS_SESSION_CLOSED: return "closed";
	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
	case CEPH_MDS_SESSION_REJECTED: return "rejected";
	default: return "???";
	}
}

struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
{
	if (refcount_inc_not_zero(&s->s_ref)) {
		dout("mdsc get_session %p %d -> %d\n", s,
		     refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
		return s;
	} else {
		dout("mdsc get_session %p 0 -- FAIL\n", s);
		return NULL;
	}
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
	dout("mdsc put_session %p %d -> %d\n", s,
	     refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
	if (refcount_dec_and_test(&s->s_ref)) {
		if (s->s_auth.authorizer)
			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
		kfree(s);
	}
}

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
						   int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return NULL;
	return ceph_get_mds_session(mdsc->sessions[mds]);
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return false;
	else
		return true;
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
				       struct ceph_mds_session *s)
{
	if (s->s_mds >= mdsc->max_sessions ||
	    mdsc->sessions[s->s_mds] != s)
		return -ENOENT;
	return 0;
}
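
/*
 * Illustrative note: the sessions[] array below is grown to the next
 * power of two that can hold the new rank, e.g. opening a session to
 * mds5 with max_sessions == 4 reallocates the array to
 * 1 << get_count_order(5 + 1) == 8 slots.  This just restates the
 * arithmetic in register_session() for readers.
 */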

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
						 int mds)
{
	struct ceph_mds_session *s;

	if (mds >= mdsc->mdsmap->possible_max_rank)
		return ERR_PTR(-EINVAL);

	s = kzalloc(sizeof(*s), GFP_NOFS);
	if (!s)
		return ERR_PTR(-ENOMEM);

	if (mds >= mdsc->max_sessions) {
		int newmax = 1 << get_count_order(mds + 1);
		struct ceph_mds_session **sa;

		dout("%s: realloc to %d\n", __func__, newmax);
		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
		if (!sa)
			goto fail_realloc;
		if (mdsc->sessions) {
			memcpy(sa, mdsc->sessions,
			       mdsc->max_sessions * sizeof(void *));
			kfree(mdsc->sessions);
		}
		mdsc->sessions = sa;
		mdsc->max_sessions = newmax;
	}

	dout("%s: mds%d\n", __func__, mds);
	s->s_mdsc = mdsc;
	s->s_mds = mds;
	s->s_state = CEPH_MDS_SESSION_NEW;
	s->s_ttl = 0;
	s->s_seq = 0;
	mutex_init(&s->s_mutex);

	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

	spin_lock_init(&s->s_gen_ttl_lock);
	s->s_cap_gen = 1;
	s->s_cap_ttl = jiffies - 1;

	spin_lock_init(&s->s_cap_lock);
	s->s_renew_requested = 0;
	s->s_renew_seq = 0;
	INIT_LIST_HEAD(&s->s_caps);
	s->s_nr_caps = 0;
	refcount_set(&s->s_ref, 1);
	INIT_LIST_HEAD(&s->s_waiting);
	INIT_LIST_HEAD(&s->s_unsafe);
	s->s_num_cap_releases = 0;
	s->s_cap_reconnect = 0;
	s->s_cap_iterator = NULL;
	INIT_LIST_HEAD(&s->s_cap_releases);
	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

	INIT_LIST_HEAD(&s->s_cap_flushing);

	mdsc->sessions[mds] = s;
	atomic_inc(&mdsc->num_sessions);
	refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */

	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	return s;

fail_realloc:
	kfree(s);
	return ERR_PTR(-ENOMEM);
}

/*
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *s)
{
	dout("__unregister_session mds%d %p\n", s->s_mds, s);
	BUG_ON(mdsc->sessions[s->s_mds] != s);
	mdsc->sessions[s->s_mds] = NULL;
	ceph_con_close(&s->s_con);
	ceph_put_mds_session(s);
	atomic_dec(&mdsc->num_sessions);
}
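
/*
 * Session lifetime note (a summary, not new behaviour): register_session()
 * leaves one reference owned by mdsc->sessions[] and one owned by the
 * caller; __unregister_session() drops the sessions[] reference, and every
 * other holder balances ceph_get_mds_session() with ceph_put_mds_session().
 */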

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
	if (req->r_session) {
		ceph_put_mds_session(req->r_session);
		req->r_session = NULL;
	}
}

void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	destroy_reply_info(&req->r_reply_info);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_inode) {
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		/* avoid calling iput_final() in mds dispatch threads */
		ceph_async_iput(req->r_inode);
	}
	if (req->r_parent) {
		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		ceph_async_iput(req->r_parent);
	}
	ceph_async_iput(req->r_target_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry)
		dput(req->r_old_dentry);
	if (req->r_old_dentry_dir) {
		/*
		 * track (and drop pins for) r_old_dentry_dir
		 * separately, since r_old_dentry's d_parent may have
		 * changed between the dir mutex being dropped and
		 * this request being freed.
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		ceph_async_iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	if (req->r_pagelist)
		ceph_pagelist_release(req->r_pagelist);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	WARN_ON_ONCE(!list_empty(&req->r_wait));
	kfree(req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
	struct ceph_mds_request *req;

	req = lookup_request(&mdsc->request_tree, tid);
	if (req)
		ceph_mdsc_get_request(req);

	return req;
}

/*
 * Register an in-flight request, and assign a tid.  Link to directory
 * we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
			       struct ceph_mds_request *req,
			       struct inode *dir)
{
	int ret = 0;

	req->r_tid = ++mdsc->last_tid;
	if (req->r_num_caps) {
		ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
					req->r_num_caps);
		if (ret < 0) {
			pr_err("__register_request %p "
			       "failed to reserve caps: %d\n", req, ret);
			/* set req->r_err to fail early from __do_request */
			req->r_err = ret;
			return;
		}
	}
	dout("__register_request %p tid %lld\n", req, req->r_tid);
	ceph_mdsc_get_request(req);
	insert_request(&mdsc->request_tree, req);

	req->r_uid = current_fsuid();
	req->r_gid = current_fsgid();

	if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
		mdsc->oldest_tid = req->r_tid;

	if (dir) {
		ihold(dir);
		req->r_unsafe_dir = dir;
	}
}

static void __unregister_request(struct ceph_mds_client *mdsc,
				 struct ceph_mds_request *req)
{
	dout("__unregister_request %p tid %lld\n", req, req->r_tid);

	/* Never leave an unregistered request on an unsafe list! */
	list_del_init(&req->r_unsafe_item);

	if (req->r_tid == mdsc->oldest_tid) {
		struct rb_node *p = rb_next(&req->r_node);
		mdsc->oldest_tid = 0;
		while (p) {
			struct ceph_mds_request *next_req =
				rb_entry(p, struct ceph_mds_request, r_node);
			if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
				mdsc->oldest_tid = next_req->r_tid;
				break;
			}
			p = rb_next(p);
		}
	}

	erase_request(&mdsc->request_tree, req);

	if (req->r_unsafe_dir &&
	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_dir_item);
		spin_unlock(&ci->i_unsafe_lock);
	}
	if (req->r_target_inode &&
	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_target_item);
		spin_unlock(&ci->i_unsafe_lock);
	}

	if (req->r_unsafe_dir) {
		/* avoid calling iput_final() in mds dispatch threads */
		ceph_async_iput(req->r_unsafe_dir);
		req->r_unsafe_dir = NULL;
	}

	complete_all(&req->r_safe_completion);

	ceph_mdsc_put_request(req);
}

/*
 * Walk back up the dentry tree until we hit a dentry representing a
 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
 * when calling this) to ensure that the objects won't disappear while we're
 * working with them. Once we hit a candidate dentry, we attempt to take a
 * reference to it, and return that as the result.
 */
static struct inode *get_nonsnap_parent(struct dentry *dentry)
{
	struct inode *inode = NULL;

	while (dentry && !IS_ROOT(dentry)) {
		inode = d_inode_rcu(dentry);
		if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
			break;
		dentry = dentry->d_parent;
	}
	if (inode)
		inode = igrab(inode);
	return inode;
}

/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req,
			bool *random)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	int mode = req->r_direct_mode;
	int mds = -1;
	u32 hash = req->r_direct_hash;
	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);

	if (random)
		*random = false;

	/*
	 * is there a specific mds we should try?  ignore hint if we have
	 * no session and the mds is not up (active or recovering).
	 */
	if (req->r_resend_mds >= 0 &&
	    (__have_session(mdsc, req->r_resend_mds) ||
	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
		dout("%s using resend_mds mds%d\n", __func__,
		     req->r_resend_mds);
		return req->r_resend_mds;
	}

	if (mode == USE_RANDOM_MDS)
		goto random;

	inode = NULL;
	if (req->r_inode) {
		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
			inode = req->r_inode;
			ihold(inode);
		} else {
			/* req->r_dentry is non-null for LSSNAP request */
			rcu_read_lock();
			inode = get_nonsnap_parent(req->r_dentry);
			rcu_read_unlock();
			dout("%s using snapdir's parent %p\n", __func__, inode);
		}
	} else if (req->r_dentry) {
		/* ignore race with rename; old or new d_parent is okay */
		struct dentry *parent;
		struct inode *dir;

		rcu_read_lock();
		parent = READ_ONCE(req->r_dentry->d_parent);
		dir = req->r_parent ? : d_inode_rcu(parent);

		if (!dir || dir->i_sb != mdsc->fsc->sb) {
			/* not this fs or parent went negative */
			inode = d_inode(req->r_dentry);
			if (inode)
				ihold(inode);
		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
			/* direct snapped/virtual snapdir requests
			 * based on parent dir inode */
			inode = get_nonsnap_parent(parent);
			dout("%s using nonsnap parent %p\n", __func__, inode);
		} else {
			/* dentry target */
			inode = d_inode(req->r_dentry);
			if (!inode || mode == USE_AUTH_MDS) {
				/* dir + name */
				inode = igrab(dir);
				hash = ceph_dentry_hash(dir, req->r_dentry);
				is_hash = true;
			} else {
				ihold(inode);
			}
		}
		rcu_read_unlock();
	}

	dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
	     hash, mode);
	if (!inode)
		goto random;
	ci = ceph_inode(inode);

	if (is_hash && S_ISDIR(inode->i_mode)) {
		struct ceph_inode_frag frag;
		int found;

		ceph_choose_frag(ci, hash, &frag, &found);
		if (found) {
			if (mode == USE_ANY_MDS && frag.ndist > 0) {
				u8 r;

				/* choose a random replica */
				get_random_bytes(&r, 1);
				r %= frag.ndist;
				mds = frag.dist[r];
				dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
				     __func__, inode, ceph_vinop(inode),
				     frag.frag, mds, (int)r, frag.ndist);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE &&
				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
					goto out;
			}

			/* since this file/dir wasn't known to be
			 * replicated, then we want to look for the
			 * authoritative mds. */
			if (frag.mds >= 0) {
				/* choose auth mds */
				mds = frag.mds;
				dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
				     __func__, inode, ceph_vinop(inode),
				     frag.frag, mds);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE) {
					if (mode == USE_ANY_MDS &&
					    !ceph_mdsmap_is_laggy(mdsc->mdsmap,
								  mds))
						goto out;
				}
			}
			mode = USE_AUTH_MDS;
		}
	}

	spin_lock(&ci->i_ceph_lock);
	cap = NULL;
	if (mode == USE_AUTH_MDS)
		cap = ci->i_auth_cap;
	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		ceph_async_iput(inode);
		goto random;
	}
	mds = cap->session->s_mds;
	dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
	     inode, ceph_vinop(inode), mds,
	     cap == ci->i_auth_cap ? "auth " : "", cap);
	spin_unlock(&ci->i_ceph_lock);
out:
	/* avoid calling iput_final() while holding mdsc->mutex or
	 * in mds dispatch threads */
	ceph_async_iput(inode);
	return mds;

random:
	if (random)
		*random = true;

	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
	dout("%s chose random mds%d\n", __func__, mds);
	return mds;
}


/*
 * session messages
 */
static struct ceph_msg *create_session_msg(u32 op, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
			   false);
	if (!msg) {
		pr_err("create_session_msg ENOMEM creating msg\n");
		return NULL;
	}
	h = msg->front.iov_base;
	h->op = cpu_to_le32(op);
	h->seq = cpu_to_le64(seq);

	return msg;
}

static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
static void encode_supported_features(void **p, void *end)
{
	static const size_t count = ARRAY_SIZE(feature_bits);

	if (count > 0) {
		size_t i;
		size_t size = FEATURE_BYTES(count);

		BUG_ON(*p + 4 + size > end);
		ceph_encode_32(p, size);
		memset(*p, 0, size);
		for (i = 0; i < count; i++)
			/* set bit feature_bits[i] in the bitmap */
			((unsigned char *)(*p))[feature_bits[i] / 8] |=
				BIT(feature_bits[i] % 8);
		*p += size;
	} else {
		BUG_ON(*p + 4 > end);
		ceph_encode_32(p, 0);
	}
}

/*
 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
 * to include additional client metadata fields.
 */
static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;
	int i = -1;
	int extra_bytes = 0;
	int metadata_key_count = 0;
	struct ceph_options *opt = mdsc->fsc->client->options;
	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
	size_t size, count;
	void *p, *end;

	const char* metadata[][2] = {
		{"hostname", mdsc->nodename},
		{"kernel_version", init_utsname()->release},
		{"entity_id", opt->name ? : ""},
		{"root", fsopt->server_path ? : "/"},
		{NULL, NULL}
	};

	/* Calculate serialized length of metadata */
	extra_bytes = 4;  /* map length */
	for (i = 0; metadata[i][0]; ++i) {
		extra_bytes += 8 + strlen(metadata[i][0]) +
			strlen(metadata[i][1]);
		metadata_key_count++;
	}

	/* supported feature */
	size = 0;
	count = ARRAY_SIZE(feature_bits);
	if (count > 0)
		size = FEATURE_BYTES(count);
	extra_bytes += 4 + size;

	/* Allocate the message */
	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
			   GFP_NOFS, false);
	if (!msg) {
		pr_err("create_session_msg ENOMEM creating msg\n");
		return NULL;
	}
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	h = p;
	h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
	h->seq = cpu_to_le64(seq);

	/*
	 * Serialize client metadata into waiting buffer space, using
	 * the format that userspace expects for map<string, string>
	 *
	 * ClientSession messages with metadata are v3
	 */
	msg->hdr.version = cpu_to_le16(3);
	msg->hdr.compat_version = cpu_to_le16(1);

	/* The write pointer, following the session_head structure */
	p += sizeof(*h);

	/* Number of entries in the map */
	ceph_encode_32(&p, metadata_key_count);

	/* Two length-prefixed strings for each entry in the map */
	for (i = 0; metadata[i][0]; ++i) {
		size_t const key_len = strlen(metadata[i][0]);
		size_t const val_len = strlen(metadata[i][1]);

		ceph_encode_32(&p, key_len);
		memcpy(p, metadata[i][0], key_len);
		p += key_len;
		ceph_encode_32(&p, val_len);
		memcpy(p, metadata[i][1], val_len);
		p += val_len;
	}

	encode_supported_features(&p, end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	return msg;
}
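
/*
 * For reference, the CEPH_SESSION_REQUEST_OPEN payload assembled above
 * is, roughly:
 *
 *	struct ceph_mds_session_head
 *	u32 n                          (number of metadata entries)
 *	n x { u32 key_len, key bytes, u32 val_len, val bytes }
 *	u32 bitmap_len, bitmap bytes   (supported feature bits)
 *
 * which is why each map entry is accounted as 8 bytes of length fields
 * plus the two string lengths.  This is only a summary of the code
 * above, not an authoritative protocol description.
 */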

/*
 * send session open request.
 *
 * called under mdsc->mutex
 */
static int __open_session(struct ceph_mds_client *mdsc,
			  struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	int mstate;
	int mds = session->s_mds;

	/* wait for mds to go active? */
	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
	dout("open_session to mds%d (%s)\n", mds,
	     ceph_mds_state_name(mstate));
	session->s_state = CEPH_MDS_SESSION_OPENING;
	session->s_renew_requested = jiffies;

	/* send connect message */
	msg = create_session_open_msg(mdsc, session->s_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 0;
}

/*
 * open sessions for any export targets for the given mds
 *
 * called under mdsc->mutex
 */
static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
	struct ceph_mds_session *session;

	session = __ceph_lookup_mds_session(mdsc, target);
	if (!session) {
		session = register_session(mdsc, target);
		if (IS_ERR(session))
			return session;
	}
	if (session->s_state == CEPH_MDS_SESSION_NEW ||
	    session->s_state == CEPH_MDS_SESSION_CLOSING)
		__open_session(mdsc, session);

	return session;
}

struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
	struct ceph_mds_session *session;

	dout("open_export_target_session to mds%d\n", target);

	mutex_lock(&mdsc->mutex);
	session = __open_export_target_session(mdsc, target);
	mutex_unlock(&mdsc->mutex);

	return session;
}

static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
					  struct ceph_mds_session *session)
{
	struct ceph_mds_info *mi;
	struct ceph_mds_session *ts;
	int i, mds = session->s_mds;

	if (mds >= mdsc->mdsmap->possible_max_rank)
		return;

	mi = &mdsc->mdsmap->m_info[mds];
	dout("open_export_target_sessions for mds%d (%d targets)\n",
	     session->s_mds, mi->num_export_targets);

	for (i = 0; i < mi->num_export_targets; i++) {
		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
		if (!IS_ERR(ts))
			ceph_put_mds_session(ts);
	}
}

void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
					   struct ceph_mds_session *session)
{
	mutex_lock(&mdsc->mutex);
	__open_export_target_sessions(mdsc, session);
	mutex_unlock(&mdsc->mutex);
}

/*
 * session caps
 */

static void detach_cap_releases(struct ceph_mds_session *session,
				struct list_head *target)
{
	lockdep_assert_held(&session->s_cap_lock);

	list_splice_init(&session->s_cap_releases, target);
	session->s_num_cap_releases = 0;
	dout("dispose_cap_releases mds%d\n", session->s_mds);
}

static void dispose_cap_releases(struct ceph_mds_client *mdsc,
				 struct list_head *dispose)
{
	while (!list_empty(dispose)) {
		struct ceph_cap *cap;
		/* zero out the in-progress message */
		cap = list_first_entry(dispose, struct ceph_cap, session_caps);
		list_del(&cap->session_caps);
		ceph_put_cap(mdsc, cap);
	}
}

static void cleanup_session_requests(struct ceph_mds_client *mdsc,
				     struct ceph_mds_session *session)
{
	struct ceph_mds_request *req;
	struct rb_node *p;
	struct ceph_inode_info *ci;

	dout("cleanup_session_requests mds%d\n", session->s_mds);
	mutex_lock(&mdsc->mutex);
	while (!list_empty(&session->s_unsafe)) {
		req = list_first_entry(&session->s_unsafe,
				       struct ceph_mds_request, r_unsafe_item);
		pr_warn_ratelimited(" dropping unsafe request %llu\n",
				    req->r_tid);
		if (req->r_target_inode) {
			/* dropping unsafe change of inode's attributes */
			ci = ceph_inode(req->r_target_inode);
			errseq_set(&ci->i_meta_err, -EIO);
		}
		if (req->r_unsafe_dir) {
			/* dropping unsafe directory operation */
			ci = ceph_inode(req->r_unsafe_dir);
			errseq_set(&ci->i_meta_err, -EIO);
		}
		__unregister_request(mdsc, req);
	}
	/* zero r_attempts, so kick_requests() will re-send requests */
	p = rb_first(&mdsc->request_tree);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (req->r_session &&
		    req->r_session->s_mds == session->s_mds)
			req->r_attempts = 0;
	}
	mutex_unlock(&mdsc->mutex);
}

/*
 * Helper to safely iterate over all caps associated with a session, with
 * special care taken to handle a racing __ceph_remove_cap().
 *
 * Caller must hold session s_mutex.
 */
int ceph_iterate_session_caps(struct ceph_mds_session *session,
			      int (*cb)(struct inode *, struct ceph_cap *,
					void *), void *arg)
{
	struct list_head *p;
	struct ceph_cap *cap;
	struct inode *inode, *last_inode = NULL;
	struct ceph_cap *old_cap = NULL;
	int ret;

	dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
	spin_lock(&session->s_cap_lock);
	p = session->s_caps.next;
	while (p != &session->s_caps) {
		cap = list_entry(p, struct ceph_cap, session_caps);
		inode = igrab(&cap->ci->vfs_inode);
		if (!inode) {
			p = p->next;
			continue;
		}
		session->s_cap_iterator = cap;
		spin_unlock(&session->s_cap_lock);

		if (last_inode) {
			/* avoid calling iput_final() while holding
			 * s_mutex or in mds dispatch threads */
			ceph_async_iput(last_inode);
			last_inode = NULL;
		}
		if (old_cap) {
			ceph_put_cap(session->s_mdsc, old_cap);
			old_cap = NULL;
		}

		ret = cb(inode, cap, arg);
		last_inode = inode;

		spin_lock(&session->s_cap_lock);
		p = p->next;
		if (!cap->ci) {
			dout("iterate_session_caps finishing cap %p removal\n",
			     cap);
			BUG_ON(cap->session != session);
			cap->session = NULL;
			list_del_init(&cap->session_caps);
			session->s_nr_caps--;
			if (cap->queue_release)
				__ceph_queue_cap_release(session, cap);
			else
				old_cap = cap; /* put_cap it w/o locks held */
		}
		if (ret < 0)
			goto out;
	}
	ret = 0;
out:
	session->s_cap_iterator = NULL;
	spin_unlock(&session->s_cap_lock);

	ceph_async_iput(last_inode);
	if (old_cap)
		ceph_put_cap(session->s_mdsc, old_cap);

	return ret;
}

static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
				  void *arg)
{
	struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	LIST_HEAD(to_remove);
	bool dirty_dropped = false;
	bool invalidate = false;

	dout("removing cap %p, ci is %p, inode is %p\n",
	     cap, ci, &ci->vfs_inode);
	spin_lock(&ci->i_ceph_lock);
	if (cap->mds_wanted | cap->issued)
		ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
	__ceph_remove_cap(cap, false);
	if (!ci->i_auth_cap) {
		struct ceph_cap_flush *cf;
		struct ceph_mds_client *mdsc = fsc->mdsc;

		if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
			if (inode->i_data.nrpages > 0)
				invalidate = true;
			if (ci->i_wrbuffer_ref > 0)
				mapping_set_error(&inode->i_data, -EIO);
		}

		while (!list_empty(&ci->i_cap_flush_list)) {
			cf = list_first_entry(&ci->i_cap_flush_list,
					      struct ceph_cap_flush, i_list);
			list_move(&cf->i_list, &to_remove);
		}

		spin_lock(&mdsc->cap_dirty_lock);

		list_for_each_entry(cf, &to_remove, i_list)
			list_del(&cf->g_list);

		if (!list_empty(&ci->i_dirty_item)) {
			pr_warn_ratelimited(
				" dropping dirty %s state for %p %lld\n",
				ceph_cap_string(ci->i_dirty_caps),
				inode, ceph_ino(inode));
			ci->i_dirty_caps = 0;
			list_del_init(&ci->i_dirty_item);
			dirty_dropped = true;
		}
		if (!list_empty(&ci->i_flushing_item)) {
			pr_warn_ratelimited(
				" dropping dirty+flushing %s state for %p %lld\n",
				ceph_cap_string(ci->i_flushing_caps),
				inode, ceph_ino(inode));
			ci->i_flushing_caps = 0;
			list_del_init(&ci->i_flushing_item);
			mdsc->num_cap_flushing--;
			dirty_dropped = true;
		}
		spin_unlock(&mdsc->cap_dirty_lock);

		if (dirty_dropped) {
			errseq_set(&ci->i_meta_err, -EIO);

			if (ci->i_wrbuffer_ref_head == 0 &&
			    ci->i_wr_ref == 0 &&
			    ci->i_dirty_caps == 0 &&
			    ci->i_flushing_caps == 0) {
				ceph_put_snap_context(ci->i_head_snapc);
				ci->i_head_snapc = NULL;
			}
		}

		if (atomic_read(&ci->i_filelock_ref) > 0) {
			/* make further file lock syscall return -EIO */
			ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
			pr_warn_ratelimited(" dropping file locks for %p %lld\n",
					    inode, ceph_ino(inode));
		}

		if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
			list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
			ci->i_prealloc_cap_flush = NULL;
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	while (!list_empty(&to_remove)) {
		struct ceph_cap_flush *cf;
		cf = list_first_entry(&to_remove,
				      struct ceph_cap_flush, i_list);
		list_del(&cf->i_list);
		ceph_free_cap_flush(cf);
	}

	wake_up_all(&ci->i_cap_wq);
	if (invalidate)
		ceph_queue_invalidate(inode);
	if (dirty_dropped)
		iput(inode);
	return 0;
}

/*
 * caller must hold session s_mutex
 */
static void remove_session_caps(struct ceph_mds_session *session)
{
	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
	struct super_block *sb = fsc->sb;
	LIST_HEAD(dispose);

	dout("remove_session_caps on %p\n", session);
	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);

	wake_up_all(&fsc->mdsc->cap_flushing_wq);

	spin_lock(&session->s_cap_lock);
	if (session->s_nr_caps > 0) {
		struct inode *inode;
		struct ceph_cap *cap, *prev = NULL;
		struct ceph_vino vino;
		/*
		 * iterate_session_caps() skips inodes that are being
		 * deleted, we need to wait until deletions are complete.
		 * __wait_on_freeing_inode() is designed for the job,
		 * but it is not exported, so use lookup inode function
		 * to access it.
		 */
		while (!list_empty(&session->s_caps)) {
			cap = list_entry(session->s_caps.next,
					 struct ceph_cap, session_caps);
			if (cap == prev)
				break;
			prev = cap;
			vino = cap->ci->i_vino;
			spin_unlock(&session->s_cap_lock);

			inode = ceph_find_inode(sb, vino);
			/* avoid calling iput_final() while holding s_mutex */
			ceph_async_iput(inode);

			spin_lock(&session->s_cap_lock);
		}
	}

	// drop cap expires and unlock s_cap_lock
	detach_cap_releases(session, &dispose);

	BUG_ON(session->s_nr_caps > 0);
	BUG_ON(!list_empty(&session->s_cap_flushing));
	spin_unlock(&session->s_cap_lock);
	dispose_cap_releases(session->s_mdsc, &dispose);
}

enum {
	RECONNECT,
	RENEWCAPS,
	FORCE_RO,
};

/*
 * wake up any threads waiting on this session's caps.  if the cap is
 * old (didn't get renewed on the client reconnect), remove it now.
 *
 * caller must hold s_mutex.
 */
static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
			      void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	unsigned long ev = (unsigned long)arg;

	if (ev == RECONNECT) {
		spin_lock(&ci->i_ceph_lock);
		ci->i_wanted_max_size = 0;
		ci->i_requested_max_size = 0;
		spin_unlock(&ci->i_ceph_lock);
	} else if (ev == RENEWCAPS) {
		if (cap->cap_gen < cap->session->s_cap_gen) {
			/* mds did not re-issue stale cap */
			spin_lock(&ci->i_ceph_lock);
			cap->issued = cap->implemented = CEPH_CAP_PIN;
			/* make sure mds knows what we want */
			if (__ceph_caps_file_wanted(ci) & ~cap->mds_wanted)
				ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
			spin_unlock(&ci->i_ceph_lock);
		}
	} else if (ev == FORCE_RO) {
	}
	wake_up_all(&ci->i_cap_wq);
	return 0;
}

static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
{
	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
	ceph_iterate_session_caps(session, wake_up_session_cb,
				  (void *)(unsigned long)ev);
}

/*
 * Send periodic message to MDS renewing all currently held caps.  The
 * ack will reset the expiration for all caps from this session.
 *
 * caller holds s_mutex
 */
static int send_renew_caps(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	int state;

	if (time_after_eq(jiffies, session->s_cap_ttl) &&
	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
		pr_info("mds%d caps stale\n", session->s_mds);
	session->s_renew_requested = jiffies;

	/* do not try to renew caps until a recovering mds has reconnected
	 * with its clients. */
	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
	if (state < CEPH_MDS_STATE_RECONNECT) {
		dout("send_renew_caps ignoring mds%d (%s)\n",
		     session->s_mds, ceph_mds_state_name(state));
		return 0;
	}

	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
	     ceph_mds_state_name(state));
	msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
				 ++session->s_renew_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 0;
}

static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
			     struct ceph_mds_session *session, u64 seq)
{
	struct ceph_msg *msg;

	dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
	     session->s_mds, ceph_session_state_name(session->s_state), seq);
	msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 0;
}


/*
 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
 *
 * Called under session->s_mutex
 */
static void renewed_caps(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session, int is_renew)
{
	int was_stale;
	int wake = 0;

	spin_lock(&session->s_cap_lock);
	was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);

	session->s_cap_ttl = session->s_renew_requested +
		mdsc->mdsmap->m_session_timeout*HZ;

	if (was_stale) {
		if (time_before(jiffies, session->s_cap_ttl)) {
			pr_info("mds%d caps renewed\n", session->s_mds);
			wake = 1;
		} else {
			pr_info("mds%d caps still stale\n", session->s_mds);
		}
	}
	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
	     time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
	spin_unlock(&session->s_cap_lock);

	if (wake)
		wake_up_session_caps(session, RENEWCAPS);
}

/*
 * send a session close request
 */
static int request_close_session(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *session)
{
	struct ceph_msg *msg;

	dout("request_close_session mds%d state %s seq %lld\n",
	     session->s_mds, ceph_session_state_name(session->s_state),
	     session->s_seq);
	msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 1;
}

/*
 * Called with s_mutex held.
 */
static int __close_session(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
		return 0;
	session->s_state = CEPH_MDS_SESSION_CLOSING;
	return request_close_session(mdsc, session);
}

static bool drop_negative_children(struct dentry *dentry)
{
	struct dentry *child;
	bool all_negative = true;

	if (!d_is_dir(dentry))
		goto out;

	spin_lock(&dentry->d_lock);
	list_for_each_entry(child, &dentry->d_subdirs, d_child) {
		if (d_really_is_positive(child)) {
			all_negative = false;
			break;
		}
	}
	spin_unlock(&dentry->d_lock);

	if (all_negative)
		shrink_dcache_parent(dentry);
out:
	return all_negative;
}

/*
 * Trim old(er) caps.
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped too.
 *
 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
 * memory pressure from the MDS, though, so it needn't be perfect.
 */
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
	int *remaining = arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int used, wanted, oissued, mine;

	if (*remaining <= 0)
		return -1;

	spin_lock(&ci->i_ceph_lock);
	mine = cap->issued | cap->implemented;
	used = __ceph_caps_used(ci);
	wanted = __ceph_caps_file_wanted(ci);
	oissued = __ceph_caps_issued_other(ci, cap);

	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
	     ceph_cap_string(used), ceph_cap_string(wanted));
	if (cap == ci->i_auth_cap) {
		if (ci->i_dirty_caps || ci->i_flushing_caps ||
		    !list_empty(&ci->i_cap_snaps))
			goto out;
		if ((used | wanted) & CEPH_CAP_ANY_WR)
			goto out;
		/* Note: it's possible that i_filelock_ref becomes non-zero
		 * after dropping auth caps. It doesn't hurt because reply
		 * of lock mds request will re-add auth caps. */
		if (atomic_read(&ci->i_filelock_ref) > 0)
			goto out;
	}
	/* The inode has cached pages, but it's no longer used.
	 * we can safely drop it */
	if (wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
	    !(oissued & CEPH_CAP_FILE_CACHE)) {
		used = 0;
		oissued = 0;
	}
	if ((used | wanted) & ~oissued & mine)
		goto out;   /* we need these caps */

	if (oissued) {
		/* we aren't the only cap.. just remove us */
		__ceph_remove_cap(cap, true);
		(*remaining)--;
	} else {
		struct dentry *dentry;
		/* try dropping referring dentries */
		spin_unlock(&ci->i_ceph_lock);
		dentry = d_find_any_alias(inode);
		if (dentry && drop_negative_children(dentry)) {
			int count;
			dput(dentry);
			d_prune_aliases(inode);
			count = atomic_read(&inode->i_count);
			if (count == 1)
				(*remaining)--;
			dout("trim_caps_cb %p cap %p pruned, count now %d\n",
			     inode, cap, count);
		} else {
			dput(dentry);
		}
		return 0;
	}

out:
	spin_unlock(&ci->i_ceph_lock);
	return 0;
}

/*
 * Trim session cap count down to some max number.
 */
int ceph_trim_caps(struct ceph_mds_client *mdsc,
		   struct ceph_mds_session *session,
		   int max_caps)
{
	int trim_caps = session->s_nr_caps - max_caps;

	dout("trim_caps mds%d start: %d / %d, trim %d\n",
	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
	if (trim_caps > 0) {
		int remaining = trim_caps;

		ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
		     session->s_mds, session->s_nr_caps, max_caps,
		     trim_caps - remaining);
	}

	ceph_flush_cap_releases(mdsc, session);
	return 0;
}

static int check_caps_flush(struct ceph_mds_client *mdsc,
			    u64 want_flush_tid)
{
	int ret = 1;

	spin_lock(&mdsc->cap_dirty_lock);
	if (!list_empty(&mdsc->cap_flush_list)) {
		struct ceph_cap_flush *cf =
			list_first_entry(&mdsc->cap_flush_list,
					 struct ceph_cap_flush, g_list);
		if (cf->tid <= want_flush_tid) {
			dout("check_caps_flush still flushing tid "
			     "%llu <= %llu\n", cf->tid, want_flush_tid);
			ret = 0;
		}
	}
	spin_unlock(&mdsc->cap_dirty_lock);
	return ret;
}

/*
 * flush all dirty inode data to disk.
 *
 * returns once we've flushed through want_flush_tid
 */
static void wait_caps_flush(struct ceph_mds_client *mdsc,
			    u64 want_flush_tid)
{
	dout("check_caps_flush want %llu\n", want_flush_tid);

	wait_event(mdsc->cap_flushing_wq,
		   check_caps_flush(mdsc, want_flush_tid));

	dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
}

/*
 * called under s_mutex
 */
static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
{
	struct ceph_msg *msg = NULL;
	struct ceph_mds_cap_release *head;
	struct ceph_mds_cap_item *item;
	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
	struct ceph_cap *cap;
	LIST_HEAD(tmp_list);
	int num_cap_releases;
	__le32 barrier, *cap_barrier;

	down_read(&osdc->lock);
	barrier = cpu_to_le32(osdc->epoch_barrier);
	up_read(&osdc->lock);

	spin_lock(&session->s_cap_lock);
again:
	list_splice_init(&session->s_cap_releases, &tmp_list);
	num_cap_releases = session->s_num_cap_releases;
	session->s_num_cap_releases = 0;
	spin_unlock(&session->s_cap_lock);

	while (!list_empty(&tmp_list)) {
		if (!msg) {
			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
					   PAGE_SIZE, GFP_NOFS, false);
			if (!msg)
				goto out_err;
			head = msg->front.iov_base;
			head->num = cpu_to_le32(0);
			msg->front.iov_len = sizeof(*head);

			msg->hdr.version = cpu_to_le16(2);
			msg->hdr.compat_version = cpu_to_le16(1);
		}

		cap = list_first_entry(&tmp_list, struct ceph_cap,
				       session_caps);
		list_del(&cap->session_caps);
		num_cap_releases--;

		head = msg->front.iov_base;
		put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
				   &head->num);
		item = msg->front.iov_base + msg->front.iov_len;
		item->ino = cpu_to_le64(cap->cap_ino);
		item->cap_id = cpu_to_le64(cap->cap_id);
		item->migrate_seq = cpu_to_le32(cap->mseq);
		item->seq = cpu_to_le32(cap->issue_seq);
		msg->front.iov_len += sizeof(*item);

		ceph_put_cap(mdsc, cap);

		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
			// Append cap_barrier field
			cap_barrier = msg->front.iov_base + msg->front.iov_len;
			*cap_barrier = barrier;
			msg->front.iov_len += sizeof(*cap_barrier);

			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
			dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
			ceph_con_send(&session->s_con, msg);
			msg = NULL;
		}
	}

	BUG_ON(num_cap_releases != 0);

	spin_lock(&session->s_cap_lock);
	if (!list_empty(&session->s_cap_releases))
		goto again;
	spin_unlock(&session->s_cap_lock);

	if (msg) {
		// Append cap_barrier field
		cap_barrier = msg->front.iov_base + msg->front.iov_len;
		*cap_barrier = barrier;
		msg->front.iov_len += sizeof(*cap_barrier);

		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
		ceph_con_send(&session->s_con, msg);
	}
	return;
out_err:
	pr_err("send_cap_releases mds%d, failed to allocate message\n",
	       session->s_mds);
	spin_lock(&session->s_cap_lock);
	list_splice(&tmp_list, &session->s_cap_releases);
	session->s_num_cap_releases += num_cap_releases;
	spin_unlock(&session->s_cap_lock);
}

static void ceph_cap_release_work(struct work_struct *work)
{
	struct ceph_mds_session *session =
		container_of(work, struct ceph_mds_session, s_cap_release_work);

	mutex_lock(&session->s_mutex);
	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
	    session->s_state == CEPH_MDS_SESSION_HUNG)
		ceph_send_cap_releases(session->s_mdsc, session);
	mutex_unlock(&session->s_mutex);
	ceph_put_mds_session(session);
}

void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
			     struct ceph_mds_session *session)
{
	if (mdsc->stopping)
		return;

	ceph_get_mds_session(session);
	if (queue_work(mdsc->fsc->cap_wq,
		       &session->s_cap_release_work)) {
		dout("cap release work queued\n");
	} else {
		ceph_put_mds_session(session);
		dout("failed to queue cap release work\n");
	}
}

/*
 * caller holds session->s_cap_lock
 */
void __ceph_queue_cap_release(struct ceph_mds_session *session,
			      struct ceph_cap *cap)
{
	list_add_tail(&cap->session_caps, &session->s_cap_releases);
	session->s_num_cap_releases++;

	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
		ceph_flush_cap_releases(session->s_mdsc, session);
}

static void ceph_cap_reclaim_work(struct work_struct *work)
{
	struct ceph_mds_client *mdsc =
		container_of(work, struct ceph_mds_client, cap_reclaim_work);
	int ret = ceph_trim_dentries(mdsc);
	if (ret == -EAGAIN)
		ceph_queue_cap_reclaim_work(mdsc);
}

void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
{
	if (mdsc->stopping)
		return;

	if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
		dout("caps reclaim work queued\n");
	} else {
		dout("failed to queue caps reclaim work\n");
	}
}

void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
{
	int val;
	if (!nr)
		return;
	val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
	if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
		atomic_set(&mdsc->cap_reclaim_pending, 0);
		ceph_queue_cap_reclaim_work(mdsc);
	}
}

/*
 * requests
 */
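
/*
 * Illustrative sizing note for ceph_alloc_readdir_reply_buffer() below
 * (a restatement of its arithmetic, not a spec): the entry count starts
 * from the directory's i_files + i_subdirs, clamped to [1, max_readdir],
 * an allocation order is chosen via get_order(size * num_entries), and
 * after a (possibly smaller) allocation succeeds the count is recomputed
 * as (PAGE_SIZE << order) / sizeof(struct ceph_mds_reply_dir_entry) and
 * clamped to max_readdir again.
 */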
2050 { 2051 struct ceph_inode_info *ci = ceph_inode(dir); 2052 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2053 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2054 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2055 unsigned int num_entries; 2056 int order; 2057 2058 spin_lock(&ci->i_ceph_lock); 2059 num_entries = ci->i_files + ci->i_subdirs; 2060 spin_unlock(&ci->i_ceph_lock); 2061 num_entries = max(num_entries, 1U); 2062 num_entries = min(num_entries, opt->max_readdir); 2063 2064 order = get_order(size * num_entries); 2065 while (order >= 0) { 2066 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2067 __GFP_NOWARN, 2068 order); 2069 if (rinfo->dir_entries) 2070 break; 2071 order--; 2072 } 2073 if (!rinfo->dir_entries) 2074 return -ENOMEM; 2075 2076 num_entries = (PAGE_SIZE << order) / size; 2077 num_entries = min(num_entries, opt->max_readdir); 2078 2079 rinfo->dir_buf_size = PAGE_SIZE << order; 2080 req->r_num_caps = num_entries + 1; 2081 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2082 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2083 return 0; 2084 } 2085 2086 /* 2087 * Create an mds request. 2088 */ 2089 struct ceph_mds_request * 2090 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2091 { 2092 struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS); 2093 2094 if (!req) 2095 return ERR_PTR(-ENOMEM); 2096 2097 mutex_init(&req->r_fill_mutex); 2098 req->r_mdsc = mdsc; 2099 req->r_started = jiffies; 2100 req->r_resend_mds = -1; 2101 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2102 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2103 req->r_fmode = -1; 2104 kref_init(&req->r_kref); 2105 RB_CLEAR_NODE(&req->r_node); 2106 INIT_LIST_HEAD(&req->r_wait); 2107 init_completion(&req->r_completion); 2108 init_completion(&req->r_safe_completion); 2109 INIT_LIST_HEAD(&req->r_unsafe_item); 2110 2111 ktime_get_coarse_real_ts64(&req->r_stamp); 2112 2113 req->r_op = op; 2114 req->r_direct_mode = mode; 2115 return req; 2116 } 2117 2118 /* 2119 * return oldest (lowest) request, tid in request tree, 0 if none. 2120 * 2121 * called under mdsc->mutex. 2122 */ 2123 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2124 { 2125 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2126 return NULL; 2127 return rb_entry(rb_first(&mdsc->request_tree), 2128 struct ceph_mds_request, r_node); 2129 } 2130 2131 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2132 { 2133 return mdsc->oldest_tid; 2134 } 2135 2136 /* 2137 * Build a dentry's path. Allocate on heap; caller must kfree. Based 2138 * on build_path_from_dentry in fs/cifs/dir.c. 2139 * 2140 * If @stop_on_nosnap, generate path relative to the first non-snapped 2141 * inode. 2142 * 2143 * Encode hidden .snap dirs as a double /, i.e. 
2144 * foo/.snap/bar -> foo//bar 2145 */ 2146 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase, 2147 int stop_on_nosnap) 2148 { 2149 struct dentry *temp; 2150 char *path; 2151 int pos; 2152 unsigned seq; 2153 u64 base; 2154 2155 if (!dentry) 2156 return ERR_PTR(-EINVAL); 2157 2158 path = __getname(); 2159 if (!path) 2160 return ERR_PTR(-ENOMEM); 2161 retry: 2162 pos = PATH_MAX - 1; 2163 path[pos] = '\0'; 2164 2165 seq = read_seqbegin(&rename_lock); 2166 rcu_read_lock(); 2167 temp = dentry; 2168 for (;;) { 2169 struct inode *inode; 2170 2171 spin_lock(&temp->d_lock); 2172 inode = d_inode(temp); 2173 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2174 dout("build_path path+%d: %p SNAPDIR\n", 2175 pos, temp); 2176 } else if (stop_on_nosnap && inode && dentry != temp && 2177 ceph_snap(inode) == CEPH_NOSNAP) { 2178 spin_unlock(&temp->d_lock); 2179 pos++; /* get rid of any prepended '/' */ 2180 break; 2181 } else { 2182 pos -= temp->d_name.len; 2183 if (pos < 0) { 2184 spin_unlock(&temp->d_lock); 2185 break; 2186 } 2187 memcpy(path + pos, temp->d_name.name, temp->d_name.len); 2188 } 2189 spin_unlock(&temp->d_lock); 2190 temp = READ_ONCE(temp->d_parent); 2191 2192 /* Are we at the root? */ 2193 if (IS_ROOT(temp)) 2194 break; 2195 2196 /* Are we out of buffer? */ 2197 if (--pos < 0) 2198 break; 2199 2200 path[pos] = '/'; 2201 } 2202 base = ceph_ino(d_inode(temp)); 2203 rcu_read_unlock(); 2204 2205 if (read_seqretry(&rename_lock, seq)) 2206 goto retry; 2207 2208 if (pos < 0) { 2209 /* 2210 * A rename didn't occur, but somehow we didn't end up where 2211 * we thought we would. Throw a warning and try again. 2212 */ 2213 pr_warn("build_path did not end path lookup where " 2214 "expected, pos is %d\n", pos); 2215 goto retry; 2216 } 2217 2218 *pbase = base; 2219 *plen = PATH_MAX - 1 - pos; 2220 dout("build_path on %p %d built %llx '%.*s'\n", 2221 dentry, d_count(dentry), base, *plen, path + pos); 2222 return path + pos; 2223 } 2224 2225 static int build_dentry_path(struct dentry *dentry, struct inode *dir, 2226 const char **ppath, int *ppathlen, u64 *pino, 2227 bool *pfreepath, bool parent_locked) 2228 { 2229 char *path; 2230 2231 rcu_read_lock(); 2232 if (!dir) 2233 dir = d_inode_rcu(dentry->d_parent); 2234 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) { 2235 *pino = ceph_ino(dir); 2236 rcu_read_unlock(); 2237 *ppath = dentry->d_name.name; 2238 *ppathlen = dentry->d_name.len; 2239 return 0; 2240 } 2241 rcu_read_unlock(); 2242 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2243 if (IS_ERR(path)) 2244 return PTR_ERR(path); 2245 *ppath = path; 2246 *pfreepath = true; 2247 return 0; 2248 } 2249 2250 static int build_inode_path(struct inode *inode, 2251 const char **ppath, int *ppathlen, u64 *pino, 2252 bool *pfreepath) 2253 { 2254 struct dentry *dentry; 2255 char *path; 2256 2257 if (ceph_snap(inode) == CEPH_NOSNAP) { 2258 *pino = ceph_ino(inode); 2259 *ppathlen = 0; 2260 return 0; 2261 } 2262 dentry = d_find_alias(inode); 2263 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2264 dput(dentry); 2265 if (IS_ERR(path)) 2266 return PTR_ERR(path); 2267 *ppath = path; 2268 *pfreepath = true; 2269 return 0; 2270 } 2271 2272 /* 2273 * request arguments may be specified via an inode *, a dentry *, or 2274 * an explicit ino+path. 
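 *
 * Roughly, that means one of:
 *   - r_inode:               operate on this inode (ino encoded directly)
 *   - r_dentry (+ r_parent): build a path from the dentry on demand
 *   - r_path1 + r_ino1:      an explicit path relative to an inode number
 * which are the three cases handled by set_request_path_attr() below.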
2275 */ 2276 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, 2277 struct inode *rdiri, const char *rpath, 2278 u64 rino, const char **ppath, int *pathlen, 2279 u64 *ino, bool *freepath, bool parent_locked) 2280 { 2281 int r = 0; 2282 2283 if (rinode) { 2284 r = build_inode_path(rinode, ppath, pathlen, ino, freepath); 2285 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 2286 ceph_snap(rinode)); 2287 } else if (rdentry) { 2288 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino, 2289 freepath, parent_locked); 2290 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, 2291 *ppath); 2292 } else if (rpath || rino) { 2293 *ino = rino; 2294 *ppath = rpath; 2295 *pathlen = rpath ? strlen(rpath) : 0; 2296 dout(" path %.*s\n", *pathlen, rpath); 2297 } 2298 2299 return r; 2300 } 2301 2302 /* 2303 * called under mdsc->mutex 2304 */ 2305 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, 2306 struct ceph_mds_request *req, 2307 int mds, bool drop_cap_releases) 2308 { 2309 struct ceph_msg *msg; 2310 struct ceph_mds_request_head *head; 2311 const char *path1 = NULL; 2312 const char *path2 = NULL; 2313 u64 ino1 = 0, ino2 = 0; 2314 int pathlen1 = 0, pathlen2 = 0; 2315 bool freepath1 = false, freepath2 = false; 2316 int len; 2317 u16 releases; 2318 void *p, *end; 2319 int ret; 2320 2321 ret = set_request_path_attr(req->r_inode, req->r_dentry, 2322 req->r_parent, req->r_path1, req->r_ino1.ino, 2323 &path1, &pathlen1, &ino1, &freepath1, 2324 test_bit(CEPH_MDS_R_PARENT_LOCKED, 2325 &req->r_req_flags)); 2326 if (ret < 0) { 2327 msg = ERR_PTR(ret); 2328 goto out; 2329 } 2330 2331 /* If r_old_dentry is set, then assume that its parent is locked */ 2332 ret = set_request_path_attr(NULL, req->r_old_dentry, 2333 req->r_old_dentry_dir, 2334 req->r_path2, req->r_ino2.ino, 2335 &path2, &pathlen2, &ino2, &freepath2, true); 2336 if (ret < 0) { 2337 msg = ERR_PTR(ret); 2338 goto out_free1; 2339 } 2340 2341 len = sizeof(*head) + 2342 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + 2343 sizeof(struct ceph_timespec); 2344 2345 /* calculate (max) length for cap releases */ 2346 len += sizeof(struct ceph_mds_request_release) * 2347 (!!req->r_inode_drop + !!req->r_dentry_drop + 2348 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 2349 if (req->r_dentry_drop) 2350 len += pathlen1; 2351 if (req->r_old_dentry_drop) 2352 len += pathlen2; 2353 2354 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); 2355 if (!msg) { 2356 msg = ERR_PTR(-ENOMEM); 2357 goto out_free2; 2358 } 2359 2360 msg->hdr.version = cpu_to_le16(2); 2361 msg->hdr.tid = cpu_to_le64(req->r_tid); 2362 2363 head = msg->front.iov_base; 2364 p = msg->front.iov_base + sizeof(*head); 2365 end = msg->front.iov_base + msg->front.iov_len; 2366 2367 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 2368 head->op = cpu_to_le32(req->r_op); 2369 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid)); 2370 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid)); 2371 head->ino = 0; 2372 head->args = req->r_args; 2373 2374 ceph_encode_filepath(&p, end, ino1, path1); 2375 ceph_encode_filepath(&p, end, ino2, path2); 2376 2377 /* make note of release offset, in case we need to replay */ 2378 req->r_request_release_offset = p - msg->front.iov_base; 2379 2380 /* cap releases */ 2381 releases = 0; 2382 if (req->r_inode_drop) 2383 releases += ceph_encode_inode_release(&p, 2384 req->r_inode ? 
req->r_inode : d_inode(req->r_dentry), 2385 mds, req->r_inode_drop, req->r_inode_unless, 0); 2386 if (req->r_dentry_drop) 2387 releases += ceph_encode_dentry_release(&p, req->r_dentry, 2388 req->r_parent, mds, req->r_dentry_drop, 2389 req->r_dentry_unless); 2390 if (req->r_old_dentry_drop) 2391 releases += ceph_encode_dentry_release(&p, req->r_old_dentry, 2392 req->r_old_dentry_dir, mds, 2393 req->r_old_dentry_drop, 2394 req->r_old_dentry_unless); 2395 if (req->r_old_inode_drop) 2396 releases += ceph_encode_inode_release(&p, 2397 d_inode(req->r_old_dentry), 2398 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 2399 2400 if (drop_cap_releases) { 2401 releases = 0; 2402 p = msg->front.iov_base + req->r_request_release_offset; 2403 } 2404 2405 head->num_releases = cpu_to_le16(releases); 2406 2407 /* time stamp */ 2408 { 2409 struct ceph_timespec ts; 2410 ceph_encode_timespec64(&ts, &req->r_stamp); 2411 ceph_encode_copy(&p, &ts, sizeof(ts)); 2412 } 2413 2414 BUG_ON(p > end); 2415 msg->front.iov_len = p - msg->front.iov_base; 2416 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2417 2418 if (req->r_pagelist) { 2419 struct ceph_pagelist *pagelist = req->r_pagelist; 2420 ceph_msg_data_add_pagelist(msg, pagelist); 2421 msg->hdr.data_len = cpu_to_le32(pagelist->length); 2422 } else { 2423 msg->hdr.data_len = 0; 2424 } 2425 2426 msg->hdr.data_off = cpu_to_le16(0); 2427 2428 out_free2: 2429 if (freepath2) 2430 ceph_mdsc_free_path((char *)path2, pathlen2); 2431 out_free1: 2432 if (freepath1) 2433 ceph_mdsc_free_path((char *)path1, pathlen1); 2434 out: 2435 return msg; 2436 } 2437 2438 /* 2439 * called under mdsc->mutex if error, under no mutex if 2440 * success. 2441 */ 2442 static void complete_request(struct ceph_mds_client *mdsc, 2443 struct ceph_mds_request *req) 2444 { 2445 if (req->r_callback) 2446 req->r_callback(mdsc, req); 2447 complete_all(&req->r_completion); 2448 } 2449 2450 /* 2451 * called under mdsc->mutex 2452 */ 2453 static int __prepare_send_request(struct ceph_mds_client *mdsc, 2454 struct ceph_mds_request *req, 2455 int mds, bool drop_cap_releases) 2456 { 2457 struct ceph_mds_request_head *rhead; 2458 struct ceph_msg *msg; 2459 int flags = 0; 2460 2461 req->r_attempts++; 2462 if (req->r_inode) { 2463 struct ceph_cap *cap = 2464 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 2465 2466 if (cap) 2467 req->r_sent_on_mseq = cap->mseq; 2468 else 2469 req->r_sent_on_mseq = -1; 2470 } 2471 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, 2472 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 2473 2474 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 2475 void *p; 2476 /* 2477 * Replay. Do not regenerate message (and rebuild 2478 * paths, etc.); just use the original message. 2479 * Rebuilding paths will break for renames because 2480 * d_move mangles the src name. 
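 *
 * Only the REPLAY flag, target ino, retry count and the timestamp are
 * patched in place below; the cap/dentry releases are dropped by
 * re-encoding the timestamp at r_request_release_offset and truncating
 * the message front there.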
2481 */ 2482 msg = req->r_request; 2483 rhead = msg->front.iov_base; 2484 2485 flags = le32_to_cpu(rhead->flags); 2486 flags |= CEPH_MDS_FLAG_REPLAY; 2487 rhead->flags = cpu_to_le32(flags); 2488 2489 if (req->r_target_inode) 2490 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 2491 2492 rhead->num_retry = req->r_attempts - 1; 2493 2494 /* remove cap/dentry releases from message */ 2495 rhead->num_releases = 0; 2496 2497 /* time stamp */ 2498 p = msg->front.iov_base + req->r_request_release_offset; 2499 { 2500 struct ceph_timespec ts; 2501 ceph_encode_timespec64(&ts, &req->r_stamp); 2502 ceph_encode_copy(&p, &ts, sizeof(ts)); 2503 } 2504 2505 msg->front.iov_len = p - msg->front.iov_base; 2506 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2507 return 0; 2508 } 2509 2510 if (req->r_request) { 2511 ceph_msg_put(req->r_request); 2512 req->r_request = NULL; 2513 } 2514 msg = create_request_message(mdsc, req, mds, drop_cap_releases); 2515 if (IS_ERR(msg)) { 2516 req->r_err = PTR_ERR(msg); 2517 return PTR_ERR(msg); 2518 } 2519 req->r_request = msg; 2520 2521 rhead = msg->front.iov_base; 2522 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 2523 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2524 flags |= CEPH_MDS_FLAG_REPLAY; 2525 if (req->r_parent) 2526 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 2527 rhead->flags = cpu_to_le32(flags); 2528 rhead->num_fwd = req->r_num_fwd; 2529 rhead->num_retry = req->r_attempts - 1; 2530 rhead->ino = 0; 2531 2532 dout(" r_parent = %p\n", req->r_parent); 2533 return 0; 2534 } 2535 2536 /* 2537 * called under mdsc->mutex 2538 */ 2539 static int __send_request(struct ceph_mds_client *mdsc, 2540 struct ceph_mds_session *session, 2541 struct ceph_mds_request *req, 2542 bool drop_cap_releases) 2543 { 2544 int err; 2545 2546 err = __prepare_send_request(mdsc, req, session->s_mds, 2547 drop_cap_releases); 2548 if (!err) { 2549 ceph_msg_get(req->r_request); 2550 ceph_con_send(&session->s_con, req->r_request); 2551 } 2552 2553 return err; 2554 } 2555 2556 /* 2557 * send request, or put it on the appropriate wait list. 
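 *
 * If we have no usable mdsmap (or no active mds) the request is parked
 * on mdsc->waiting_for_map; if the chosen session is still being opened
 * it is parked on session->s_waiting; otherwise it is sent right away
 * via __send_request().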
2558 */ 2559 static void __do_request(struct ceph_mds_client *mdsc, 2560 struct ceph_mds_request *req) 2561 { 2562 struct ceph_mds_session *session = NULL; 2563 int mds = -1; 2564 int err = 0; 2565 bool random; 2566 2567 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 2568 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 2569 __unregister_request(mdsc, req); 2570 return; 2571 } 2572 2573 if (req->r_timeout && 2574 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 2575 dout("do_request timed out\n"); 2576 err = -EIO; 2577 goto finish; 2578 } 2579 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 2580 dout("do_request forced umount\n"); 2581 err = -EIO; 2582 goto finish; 2583 } 2584 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 2585 if (mdsc->mdsmap_err) { 2586 err = mdsc->mdsmap_err; 2587 dout("do_request mdsmap err %d\n", err); 2588 goto finish; 2589 } 2590 if (mdsc->mdsmap->m_epoch == 0) { 2591 dout("do_request no mdsmap, waiting for map\n"); 2592 list_add(&req->r_wait, &mdsc->waiting_for_map); 2593 return; 2594 } 2595 if (!(mdsc->fsc->mount_options->flags & 2596 CEPH_MOUNT_OPT_MOUNTWAIT) && 2597 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { 2598 err = -EHOSTUNREACH; 2599 goto finish; 2600 } 2601 } 2602 2603 put_request_session(req); 2604 2605 mds = __choose_mds(mdsc, req, &random); 2606 if (mds < 0 || 2607 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 2608 dout("do_request no mds or not active, waiting for map\n"); 2609 list_add(&req->r_wait, &mdsc->waiting_for_map); 2610 return; 2611 } 2612 2613 /* get, open session */ 2614 session = __ceph_lookup_mds_session(mdsc, mds); 2615 if (!session) { 2616 session = register_session(mdsc, mds); 2617 if (IS_ERR(session)) { 2618 err = PTR_ERR(session); 2619 goto finish; 2620 } 2621 } 2622 req->r_session = ceph_get_mds_session(session); 2623 2624 dout("do_request mds%d session %p state %s\n", mds, session, 2625 ceph_session_state_name(session->s_state)); 2626 if (session->s_state != CEPH_MDS_SESSION_OPEN && 2627 session->s_state != CEPH_MDS_SESSION_HUNG) { 2628 if (session->s_state == CEPH_MDS_SESSION_REJECTED) { 2629 err = -EACCES; 2630 goto out_session; 2631 } 2632 if (session->s_state == CEPH_MDS_SESSION_NEW || 2633 session->s_state == CEPH_MDS_SESSION_CLOSING) { 2634 __open_session(mdsc, session); 2635 /* retry the same mds later */ 2636 if (random) 2637 req->r_resend_mds = mds; 2638 } 2639 list_add(&req->r_wait, &session->s_waiting); 2640 goto out_session; 2641 } 2642 2643 /* send request */ 2644 req->r_resend_mds = -1; /* forget any previous mds hint */ 2645 2646 if (req->r_request_started == 0) /* note request start time */ 2647 req->r_request_started = jiffies; 2648 2649 err = __send_request(mdsc, session, req, false); 2650 2651 out_session: 2652 ceph_put_mds_session(session); 2653 finish: 2654 if (err) { 2655 dout("__do_request early error %d\n", err); 2656 req->r_err = err; 2657 complete_request(mdsc, req); 2658 __unregister_request(mdsc, req); 2659 } 2660 return; 2661 } 2662 2663 /* 2664 * called under mdsc->mutex 2665 */ 2666 static void __wake_requests(struct ceph_mds_client *mdsc, 2667 struct list_head *head) 2668 { 2669 struct ceph_mds_request *req; 2670 LIST_HEAD(tmp_list); 2671 2672 list_splice_init(head, &tmp_list); 2673 2674 while (!list_empty(&tmp_list)) { 2675 req = list_entry(tmp_list.next, 2676 struct ceph_mds_request, r_wait); 2677 list_del_init(&req->r_wait); 2678 dout(" wake request %p tid %llu\n", req, req->r_tid); 2679 
		__do_request(mdsc, req);
	}
}

/*
 * Wake up threads with requests pending for @mds, so that they can
 * resubmit their requests to a possibly different mds.
 */
static void kick_requests(struct ceph_mds_client *mdsc, int mds)
{
	struct ceph_mds_request *req;
	struct rb_node *p = rb_first(&mdsc->request_tree);

	dout("kick_requests mds%d\n", mds);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
			continue;
		if (req->r_attempts > 0)
			continue; /* only new requests */
		if (req->r_session &&
		    req->r_session->s_mds == mds) {
			dout(" kicking tid %llu\n", req->r_tid);
			list_del_init(&req->r_wait);
			__do_request(mdsc, req);
		}
	}
}

int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
			     struct ceph_mds_request *req)
{
	int err;

	/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
	if (req->r_inode)
		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
	if (req->r_parent) {
		ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		ihold(req->r_parent);
	}
	if (req->r_old_dentry_dir)
		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);

	dout("submit_request on %p for inode %p\n", req, dir);
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, dir);
	__do_request(mdsc, req);
	err = req->r_err;
	mutex_unlock(&mdsc->mutex);
	return err;
}

static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
				  struct ceph_mds_request *req)
{
	int err;

	/* wait */
	dout("do_request waiting\n");
	if (!req->r_timeout && req->r_wait_for_completion) {
		err = req->r_wait_for_completion(mdsc, req);
	} else {
		long timeleft = wait_for_completion_killable_timeout(
					&req->r_completion,
					ceph_timeout_jiffies(req->r_timeout));
		if (timeleft > 0)
			err = 0;
		else if (!timeleft)
			err = -EIO; /* timed out */
		else
			err = timeleft; /* killed */
	}
	dout("do_request waited, got %d\n", err);
	mutex_lock(&mdsc->mutex);

	/* only abort if we didn't race with a real reply */
	if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		err = le32_to_cpu(req->r_reply_info.head->result);
	} else if (err < 0) {
		dout("aborted request %lld with %d\n", req->r_tid, err);

		/*
		 * ensure we aren't running concurrently with
		 * ceph_fill_trace or ceph_readdir_prepopulate, which
		 * rely on locks (dir mutex) held by our caller.
		 */
		mutex_lock(&req->r_fill_mutex);
		req->r_err = err;
		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
		mutex_unlock(&req->r_fill_mutex);

		if (req->r_parent &&
		    (req->r_op & CEPH_MDS_OP_WRITE))
			ceph_invalidate_dir_request(req);
	} else {
		err = req->r_err;
	}

	mutex_unlock(&mdsc->mutex);
	return err;
}

/*
 * Synchronously perform an mds request.  Take care of all of the
 * session setup, forwarding, retry details.
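 *
 * A minimal caller sketch (illustrative only; error handling and the
 * op-specific r_args setup are elided):
 *
 *	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *	req->r_inode = inode;
 *	ihold(inode);
 *	req->r_num_caps = 1;
 *	err = ceph_mdsc_do_request(mdsc, NULL, req);
 *	ceph_mdsc_put_request(req);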
2787 */ 2788 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 2789 struct inode *dir, 2790 struct ceph_mds_request *req) 2791 { 2792 int err; 2793 2794 dout("do_request on %p\n", req); 2795 2796 /* issue */ 2797 err = ceph_mdsc_submit_request(mdsc, dir, req); 2798 if (!err) 2799 err = ceph_mdsc_wait_request(mdsc, req); 2800 dout("do_request %p done, result %d\n", req, err); 2801 return err; 2802 } 2803 2804 /* 2805 * Invalidate dir's completeness, dentry lease state on an aborted MDS 2806 * namespace request. 2807 */ 2808 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 2809 { 2810 struct inode *dir = req->r_parent; 2811 struct inode *old_dir = req->r_old_dentry_dir; 2812 2813 dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir); 2814 2815 ceph_dir_clear_complete(dir); 2816 if (old_dir) 2817 ceph_dir_clear_complete(old_dir); 2818 if (req->r_dentry) 2819 ceph_invalidate_dentry_lease(req->r_dentry); 2820 if (req->r_old_dentry) 2821 ceph_invalidate_dentry_lease(req->r_old_dentry); 2822 } 2823 2824 /* 2825 * Handle mds reply. 2826 * 2827 * We take the session mutex and parse and process the reply immediately. 2828 * This preserves the logical ordering of replies, capabilities, etc., sent 2829 * by the MDS as they are applied to our local cache. 2830 */ 2831 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 2832 { 2833 struct ceph_mds_client *mdsc = session->s_mdsc; 2834 struct ceph_mds_request *req; 2835 struct ceph_mds_reply_head *head = msg->front.iov_base; 2836 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 2837 struct ceph_snap_realm *realm; 2838 u64 tid; 2839 int err, result; 2840 int mds = session->s_mds; 2841 2842 if (msg->front.iov_len < sizeof(*head)) { 2843 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 2844 ceph_msg_dump(msg); 2845 return; 2846 } 2847 2848 /* get request, session */ 2849 tid = le64_to_cpu(msg->hdr.tid); 2850 mutex_lock(&mdsc->mutex); 2851 req = lookup_get_request(mdsc, tid); 2852 if (!req) { 2853 dout("handle_reply on unknown tid %llu\n", tid); 2854 mutex_unlock(&mdsc->mutex); 2855 return; 2856 } 2857 dout("handle_reply %p\n", req); 2858 2859 /* correct session? */ 2860 if (req->r_session != session) { 2861 pr_err("mdsc_handle_reply got %llu on session mds%d" 2862 " not mds%d\n", tid, session->s_mds, 2863 req->r_session ? req->r_session->s_mds : -1); 2864 mutex_unlock(&mdsc->mutex); 2865 goto out; 2866 } 2867 2868 /* dup? */ 2869 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 2870 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 2871 pr_warn("got a dup %s reply on %llu from mds%d\n", 2872 head->safe ? 
"safe" : "unsafe", tid, mds); 2873 mutex_unlock(&mdsc->mutex); 2874 goto out; 2875 } 2876 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 2877 pr_warn("got unsafe after safe on %llu from mds%d\n", 2878 tid, mds); 2879 mutex_unlock(&mdsc->mutex); 2880 goto out; 2881 } 2882 2883 result = le32_to_cpu(head->result); 2884 2885 /* 2886 * Handle an ESTALE 2887 * if we're not talking to the authority, send to them 2888 * if the authority has changed while we weren't looking, 2889 * send to new authority 2890 * Otherwise we just have to return an ESTALE 2891 */ 2892 if (result == -ESTALE) { 2893 dout("got ESTALE on request %llu\n", req->r_tid); 2894 req->r_resend_mds = -1; 2895 if (req->r_direct_mode != USE_AUTH_MDS) { 2896 dout("not using auth, setting for that now\n"); 2897 req->r_direct_mode = USE_AUTH_MDS; 2898 __do_request(mdsc, req); 2899 mutex_unlock(&mdsc->mutex); 2900 goto out; 2901 } else { 2902 int mds = __choose_mds(mdsc, req, NULL); 2903 if (mds >= 0 && mds != req->r_session->s_mds) { 2904 dout("but auth changed, so resending\n"); 2905 __do_request(mdsc, req); 2906 mutex_unlock(&mdsc->mutex); 2907 goto out; 2908 } 2909 } 2910 dout("have to return ESTALE on request %llu\n", req->r_tid); 2911 } 2912 2913 2914 if (head->safe) { 2915 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 2916 __unregister_request(mdsc, req); 2917 2918 /* last request during umount? */ 2919 if (mdsc->stopping && !__get_oldest_req(mdsc)) 2920 complete_all(&mdsc->safe_umount_waiters); 2921 2922 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 2923 /* 2924 * We already handled the unsafe response, now do the 2925 * cleanup. No need to examine the response; the MDS 2926 * doesn't include any result info in the safe 2927 * response. And even if it did, there is nothing 2928 * useful we could do with a revised return value. 
2929 */ 2930 dout("got safe reply %llu, mds%d\n", tid, mds); 2931 2932 mutex_unlock(&mdsc->mutex); 2933 goto out; 2934 } 2935 } else { 2936 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 2937 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 2938 if (req->r_unsafe_dir) { 2939 struct ceph_inode_info *ci = 2940 ceph_inode(req->r_unsafe_dir); 2941 spin_lock(&ci->i_unsafe_lock); 2942 list_add_tail(&req->r_unsafe_dir_item, 2943 &ci->i_unsafe_dirops); 2944 spin_unlock(&ci->i_unsafe_lock); 2945 } 2946 } 2947 2948 dout("handle_reply tid %lld result %d\n", tid, result); 2949 rinfo = &req->r_reply_info; 2950 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 2951 err = parse_reply_info(msg, rinfo, (u64)-1); 2952 else 2953 err = parse_reply_info(msg, rinfo, session->s_con.peer_features); 2954 mutex_unlock(&mdsc->mutex); 2955 2956 mutex_lock(&session->s_mutex); 2957 if (err < 0) { 2958 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 2959 ceph_msg_dump(msg); 2960 goto out_err; 2961 } 2962 2963 /* snap trace */ 2964 realm = NULL; 2965 if (rinfo->snapblob_len) { 2966 down_write(&mdsc->snap_rwsem); 2967 ceph_update_snap_trace(mdsc, rinfo->snapblob, 2968 rinfo->snapblob + rinfo->snapblob_len, 2969 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 2970 &realm); 2971 downgrade_write(&mdsc->snap_rwsem); 2972 } else { 2973 down_read(&mdsc->snap_rwsem); 2974 } 2975 2976 /* insert trace into our cache */ 2977 mutex_lock(&req->r_fill_mutex); 2978 current->journal_info = req; 2979 err = ceph_fill_trace(mdsc->fsc->sb, req); 2980 if (err == 0) { 2981 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 2982 req->r_op == CEPH_MDS_OP_LSSNAP)) 2983 ceph_readdir_prepopulate(req, req->r_session); 2984 } 2985 current->journal_info = NULL; 2986 mutex_unlock(&req->r_fill_mutex); 2987 2988 up_read(&mdsc->snap_rwsem); 2989 if (realm) 2990 ceph_put_snap_realm(mdsc, realm); 2991 2992 if (err == 0) { 2993 if (req->r_target_inode && 2994 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 2995 struct ceph_inode_info *ci = 2996 ceph_inode(req->r_target_inode); 2997 spin_lock(&ci->i_unsafe_lock); 2998 list_add_tail(&req->r_unsafe_target_item, 2999 &ci->i_unsafe_iops); 3000 spin_unlock(&ci->i_unsafe_lock); 3001 } 3002 3003 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 3004 } 3005 out_err: 3006 mutex_lock(&mdsc->mutex); 3007 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3008 if (err) { 3009 req->r_err = err; 3010 } else { 3011 req->r_reply = ceph_msg_get(msg); 3012 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); 3013 } 3014 } else { 3015 dout("reply arrived after request %lld was aborted\n", tid); 3016 } 3017 mutex_unlock(&mdsc->mutex); 3018 3019 mutex_unlock(&session->s_mutex); 3020 3021 /* kick calling process */ 3022 complete_request(mdsc, req); 3023 out: 3024 ceph_mdsc_put_request(req); 3025 return; 3026 } 3027 3028 3029 3030 /* 3031 * handle mds notification that our request has been forwarded. 
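 *
 * The message carries the new target mds and a forward sequence number;
 * a forward with a stale seq (<= r_num_fwd) is ignored, otherwise the
 * request is reset and resent to the new mds.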
3032 */ 3033 static void handle_forward(struct ceph_mds_client *mdsc, 3034 struct ceph_mds_session *session, 3035 struct ceph_msg *msg) 3036 { 3037 struct ceph_mds_request *req; 3038 u64 tid = le64_to_cpu(msg->hdr.tid); 3039 u32 next_mds; 3040 u32 fwd_seq; 3041 int err = -EINVAL; 3042 void *p = msg->front.iov_base; 3043 void *end = p + msg->front.iov_len; 3044 3045 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3046 next_mds = ceph_decode_32(&p); 3047 fwd_seq = ceph_decode_32(&p); 3048 3049 mutex_lock(&mdsc->mutex); 3050 req = lookup_get_request(mdsc, tid); 3051 if (!req) { 3052 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 3053 goto out; /* dup reply? */ 3054 } 3055 3056 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3057 dout("forward tid %llu aborted, unregistering\n", tid); 3058 __unregister_request(mdsc, req); 3059 } else if (fwd_seq <= req->r_num_fwd) { 3060 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 3061 tid, next_mds, req->r_num_fwd, fwd_seq); 3062 } else { 3063 /* resend. forward race not possible; mds would drop */ 3064 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 3065 BUG_ON(req->r_err); 3066 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 3067 req->r_attempts = 0; 3068 req->r_num_fwd = fwd_seq; 3069 req->r_resend_mds = next_mds; 3070 put_request_session(req); 3071 __do_request(mdsc, req); 3072 } 3073 ceph_mdsc_put_request(req); 3074 out: 3075 mutex_unlock(&mdsc->mutex); 3076 return; 3077 3078 bad: 3079 pr_err("mdsc_handle_forward decode error err=%d\n", err); 3080 } 3081 3082 static int __decode_session_metadata(void **p, void *end, 3083 bool *blacklisted) 3084 { 3085 /* map<string,string> */ 3086 u32 n; 3087 bool err_str; 3088 ceph_decode_32_safe(p, end, n, bad); 3089 while (n-- > 0) { 3090 u32 len; 3091 ceph_decode_32_safe(p, end, len, bad); 3092 ceph_decode_need(p, end, len, bad); 3093 err_str = !strncmp(*p, "error_string", len); 3094 *p += len; 3095 ceph_decode_32_safe(p, end, len, bad); 3096 ceph_decode_need(p, end, len, bad); 3097 if (err_str && strnstr(*p, "blacklisted", len)) 3098 *blacklisted = true; 3099 *p += len; 3100 } 3101 return 0; 3102 bad: 3103 return -1; 3104 } 3105 3106 /* 3107 * handle a mds session control message 3108 */ 3109 static void handle_session(struct ceph_mds_session *session, 3110 struct ceph_msg *msg) 3111 { 3112 struct ceph_mds_client *mdsc = session->s_mdsc; 3113 int mds = session->s_mds; 3114 int msg_version = le16_to_cpu(msg->hdr.version); 3115 void *p = msg->front.iov_base; 3116 void *end = p + msg->front.iov_len; 3117 struct ceph_mds_session_head *h; 3118 u32 op; 3119 u64 seq; 3120 unsigned long features = 0; 3121 int wake = 0; 3122 bool blacklisted = false; 3123 3124 /* decode */ 3125 ceph_decode_need(&p, end, sizeof(*h), bad); 3126 h = p; 3127 p += sizeof(*h); 3128 3129 op = le32_to_cpu(h->op); 3130 seq = le64_to_cpu(h->seq); 3131 3132 if (msg_version >= 3) { 3133 u32 len; 3134 /* version >= 2, metadata */ 3135 if (__decode_session_metadata(&p, end, &blacklisted) < 0) 3136 goto bad; 3137 /* version >= 3, feature bits */ 3138 ceph_decode_32_safe(&p, end, len, bad); 3139 ceph_decode_need(&p, end, len, bad); 3140 memcpy(&features, p, min_t(size_t, len, sizeof(features))); 3141 p += len; 3142 } 3143 3144 mutex_lock(&mdsc->mutex); 3145 if (op == CEPH_SESSION_CLOSE) { 3146 ceph_get_mds_session(session); 3147 __unregister_session(mdsc, session); 3148 } 3149 /* FIXME: this ttl calculation is generous */ 3150 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 3151 
mutex_unlock(&mdsc->mutex); 3152 3153 mutex_lock(&session->s_mutex); 3154 3155 dout("handle_session mds%d %s %p state %s seq %llu\n", 3156 mds, ceph_session_op_name(op), session, 3157 ceph_session_state_name(session->s_state), seq); 3158 3159 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 3160 session->s_state = CEPH_MDS_SESSION_OPEN; 3161 pr_info("mds%d came back\n", session->s_mds); 3162 } 3163 3164 switch (op) { 3165 case CEPH_SESSION_OPEN: 3166 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3167 pr_info("mds%d reconnect success\n", session->s_mds); 3168 session->s_state = CEPH_MDS_SESSION_OPEN; 3169 session->s_features = features; 3170 renewed_caps(mdsc, session, 0); 3171 wake = 1; 3172 if (mdsc->stopping) 3173 __close_session(mdsc, session); 3174 break; 3175 3176 case CEPH_SESSION_RENEWCAPS: 3177 if (session->s_renew_seq == seq) 3178 renewed_caps(mdsc, session, 1); 3179 break; 3180 3181 case CEPH_SESSION_CLOSE: 3182 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3183 pr_info("mds%d reconnect denied\n", session->s_mds); 3184 session->s_state = CEPH_MDS_SESSION_CLOSED; 3185 cleanup_session_requests(mdsc, session); 3186 remove_session_caps(session); 3187 wake = 2; /* for good measure */ 3188 wake_up_all(&mdsc->session_close_wq); 3189 break; 3190 3191 case CEPH_SESSION_STALE: 3192 pr_info("mds%d caps went stale, renewing\n", 3193 session->s_mds); 3194 spin_lock(&session->s_gen_ttl_lock); 3195 session->s_cap_gen++; 3196 session->s_cap_ttl = jiffies - 1; 3197 spin_unlock(&session->s_gen_ttl_lock); 3198 send_renew_caps(mdsc, session); 3199 break; 3200 3201 case CEPH_SESSION_RECALL_STATE: 3202 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 3203 break; 3204 3205 case CEPH_SESSION_FLUSHMSG: 3206 send_flushmsg_ack(mdsc, session, seq); 3207 break; 3208 3209 case CEPH_SESSION_FORCE_RO: 3210 dout("force_session_readonly %p\n", session); 3211 spin_lock(&session->s_cap_lock); 3212 session->s_readonly = true; 3213 spin_unlock(&session->s_cap_lock); 3214 wake_up_session_caps(session, FORCE_RO); 3215 break; 3216 3217 case CEPH_SESSION_REJECT: 3218 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 3219 pr_info("mds%d rejected session\n", session->s_mds); 3220 session->s_state = CEPH_MDS_SESSION_REJECTED; 3221 cleanup_session_requests(mdsc, session); 3222 remove_session_caps(session); 3223 if (blacklisted) 3224 mdsc->fsc->blacklisted = true; 3225 wake = 2; /* for good measure */ 3226 break; 3227 3228 default: 3229 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 3230 WARN_ON(1); 3231 } 3232 3233 mutex_unlock(&session->s_mutex); 3234 if (wake) { 3235 mutex_lock(&mdsc->mutex); 3236 __wake_requests(mdsc, &session->s_waiting); 3237 if (wake == 2) 3238 kick_requests(mdsc, mds); 3239 mutex_unlock(&mdsc->mutex); 3240 } 3241 if (op == CEPH_SESSION_CLOSE) 3242 ceph_put_mds_session(session); 3243 return; 3244 3245 bad: 3246 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 3247 (int)msg->front.iov_len); 3248 ceph_msg_dump(msg); 3249 return; 3250 } 3251 3252 /* 3253 * called under session->mutex. 
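 *
 * Re-sends this session's unsafe requests, plus any previously
 * attempted requests in the tree that targeted this mds, so the
 * recovering MDS can replay them.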
3254 */ 3255 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 3256 struct ceph_mds_session *session) 3257 { 3258 struct ceph_mds_request *req, *nreq; 3259 struct rb_node *p; 3260 3261 dout("replay_unsafe_requests mds%d\n", session->s_mds); 3262 3263 mutex_lock(&mdsc->mutex); 3264 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) 3265 __send_request(mdsc, session, req, true); 3266 3267 /* 3268 * also re-send old requests when MDS enters reconnect stage. So that MDS 3269 * can process completed request in clientreplay stage. 3270 */ 3271 p = rb_first(&mdsc->request_tree); 3272 while (p) { 3273 req = rb_entry(p, struct ceph_mds_request, r_node); 3274 p = rb_next(p); 3275 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3276 continue; 3277 if (req->r_attempts == 0) 3278 continue; /* only old requests */ 3279 if (req->r_session && 3280 req->r_session->s_mds == session->s_mds) 3281 __send_request(mdsc, session, req, true); 3282 } 3283 mutex_unlock(&mdsc->mutex); 3284 } 3285 3286 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state) 3287 { 3288 struct ceph_msg *reply; 3289 struct ceph_pagelist *_pagelist; 3290 struct page *page; 3291 __le32 *addr; 3292 int err = -ENOMEM; 3293 3294 if (!recon_state->allow_multi) 3295 return -ENOSPC; 3296 3297 /* can't handle message that contains both caps and realm */ 3298 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms); 3299 3300 /* pre-allocate new pagelist */ 3301 _pagelist = ceph_pagelist_alloc(GFP_NOFS); 3302 if (!_pagelist) 3303 return -ENOMEM; 3304 3305 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3306 if (!reply) 3307 goto fail_msg; 3308 3309 /* placeholder for nr_caps */ 3310 err = ceph_pagelist_encode_32(_pagelist, 0); 3311 if (err < 0) 3312 goto fail; 3313 3314 if (recon_state->nr_caps) { 3315 /* currently encoding caps */ 3316 err = ceph_pagelist_encode_32(recon_state->pagelist, 0); 3317 if (err) 3318 goto fail; 3319 } else { 3320 /* placeholder for nr_realms (currently encoding relams) */ 3321 err = ceph_pagelist_encode_32(_pagelist, 0); 3322 if (err < 0) 3323 goto fail; 3324 } 3325 3326 err = ceph_pagelist_encode_8(recon_state->pagelist, 1); 3327 if (err) 3328 goto fail; 3329 3330 page = list_first_entry(&recon_state->pagelist->head, struct page, lru); 3331 addr = kmap_atomic(page); 3332 if (recon_state->nr_caps) { 3333 /* currently encoding caps */ 3334 *addr = cpu_to_le32(recon_state->nr_caps); 3335 } else { 3336 /* currently encoding relams */ 3337 *(addr + 1) = cpu_to_le32(recon_state->nr_realms); 3338 } 3339 kunmap_atomic(addr); 3340 3341 reply->hdr.version = cpu_to_le16(5); 3342 reply->hdr.compat_version = cpu_to_le16(4); 3343 3344 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length); 3345 ceph_msg_data_add_pagelist(reply, recon_state->pagelist); 3346 3347 ceph_con_send(&recon_state->session->s_con, reply); 3348 ceph_pagelist_release(recon_state->pagelist); 3349 3350 recon_state->pagelist = _pagelist; 3351 recon_state->nr_caps = 0; 3352 recon_state->nr_realms = 0; 3353 recon_state->msg_version = 5; 3354 return 0; 3355 fail: 3356 ceph_msg_put(reply); 3357 fail_msg: 3358 ceph_pagelist_release(_pagelist); 3359 return err; 3360 } 3361 3362 /* 3363 * Encode information about a cap for a reconnect with the MDS. 
3364 */ 3365 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, 3366 void *arg) 3367 { 3368 union { 3369 struct ceph_mds_cap_reconnect v2; 3370 struct ceph_mds_cap_reconnect_v1 v1; 3371 } rec; 3372 struct ceph_inode_info *ci = cap->ci; 3373 struct ceph_reconnect_state *recon_state = arg; 3374 struct ceph_pagelist *pagelist = recon_state->pagelist; 3375 int err; 3376 u64 snap_follows; 3377 3378 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 3379 inode, ceph_vinop(inode), cap, cap->cap_id, 3380 ceph_cap_string(cap->issued)); 3381 3382 spin_lock(&ci->i_ceph_lock); 3383 cap->seq = 0; /* reset cap seq */ 3384 cap->issue_seq = 0; /* and issue_seq */ 3385 cap->mseq = 0; /* and migrate_seq */ 3386 cap->cap_gen = cap->session->s_cap_gen; 3387 3388 if (recon_state->msg_version >= 2) { 3389 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 3390 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3391 rec.v2.issued = cpu_to_le32(cap->issued); 3392 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3393 rec.v2.pathbase = 0; 3394 rec.v2.flock_len = (__force __le32) 3395 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); 3396 } else { 3397 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 3398 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3399 rec.v1.issued = cpu_to_le32(cap->issued); 3400 rec.v1.size = cpu_to_le64(inode->i_size); 3401 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime); 3402 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime); 3403 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3404 rec.v1.pathbase = 0; 3405 } 3406 3407 if (list_empty(&ci->i_cap_snaps)) { 3408 snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0; 3409 } else { 3410 struct ceph_cap_snap *capsnap = 3411 list_first_entry(&ci->i_cap_snaps, 3412 struct ceph_cap_snap, ci_item); 3413 snap_follows = capsnap->follows; 3414 } 3415 spin_unlock(&ci->i_ceph_lock); 3416 3417 if (recon_state->msg_version >= 2) { 3418 int num_fcntl_locks, num_flock_locks; 3419 struct ceph_filelock *flocks = NULL; 3420 size_t struct_len, total_len = sizeof(u64); 3421 u8 struct_v = 0; 3422 3423 encode_again: 3424 if (rec.v2.flock_len) { 3425 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 3426 } else { 3427 num_fcntl_locks = 0; 3428 num_flock_locks = 0; 3429 } 3430 if (num_fcntl_locks + num_flock_locks > 0) { 3431 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks, 3432 sizeof(struct ceph_filelock), 3433 GFP_NOFS); 3434 if (!flocks) { 3435 err = -ENOMEM; 3436 goto out_err; 3437 } 3438 err = ceph_encode_locks_to_buffer(inode, flocks, 3439 num_fcntl_locks, 3440 num_flock_locks); 3441 if (err) { 3442 kfree(flocks); 3443 flocks = NULL; 3444 if (err == -ENOSPC) 3445 goto encode_again; 3446 goto out_err; 3447 } 3448 } else { 3449 kfree(flocks); 3450 flocks = NULL; 3451 } 3452 3453 if (recon_state->msg_version >= 3) { 3454 /* version, compat_version and struct_len */ 3455 total_len += 2 * sizeof(u8) + sizeof(u32); 3456 struct_v = 2; 3457 } 3458 /* 3459 * number of encoded locks is stable, so copy to pagelist 3460 */ 3461 struct_len = 2 * sizeof(u32) + 3462 (num_fcntl_locks + num_flock_locks) * 3463 sizeof(struct ceph_filelock); 3464 rec.v2.flock_len = cpu_to_le32(struct_len); 3465 3466 struct_len += sizeof(u32) + sizeof(rec.v2); 3467 3468 if (struct_v >= 2) 3469 struct_len += sizeof(u64); /* snap_follows */ 3470 3471 total_len += struct_len; 3472 3473 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 3474 err = send_reconnect_partial(recon_state); 3475 if (err) 3476 goto out_freeflocks; 3477 
pagelist = recon_state->pagelist; 3478 } 3479 3480 err = ceph_pagelist_reserve(pagelist, total_len); 3481 if (err) 3482 goto out_freeflocks; 3483 3484 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3485 if (recon_state->msg_version >= 3) { 3486 ceph_pagelist_encode_8(pagelist, struct_v); 3487 ceph_pagelist_encode_8(pagelist, 1); 3488 ceph_pagelist_encode_32(pagelist, struct_len); 3489 } 3490 ceph_pagelist_encode_string(pagelist, NULL, 0); 3491 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 3492 ceph_locks_to_pagelist(flocks, pagelist, 3493 num_fcntl_locks, num_flock_locks); 3494 if (struct_v >= 2) 3495 ceph_pagelist_encode_64(pagelist, snap_follows); 3496 out_freeflocks: 3497 kfree(flocks); 3498 } else { 3499 u64 pathbase = 0; 3500 int pathlen = 0; 3501 char *path = NULL; 3502 struct dentry *dentry; 3503 3504 dentry = d_find_alias(inode); 3505 if (dentry) { 3506 path = ceph_mdsc_build_path(dentry, 3507 &pathlen, &pathbase, 0); 3508 dput(dentry); 3509 if (IS_ERR(path)) { 3510 err = PTR_ERR(path); 3511 goto out_err; 3512 } 3513 rec.v1.pathbase = cpu_to_le64(pathbase); 3514 } 3515 3516 err = ceph_pagelist_reserve(pagelist, 3517 sizeof(u64) + sizeof(u32) + 3518 pathlen + sizeof(rec.v1)); 3519 if (err) { 3520 goto out_freepath; 3521 } 3522 3523 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3524 ceph_pagelist_encode_string(pagelist, path, pathlen); 3525 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 3526 out_freepath: 3527 ceph_mdsc_free_path(path, pathlen); 3528 } 3529 3530 out_err: 3531 if (err >= 0) 3532 recon_state->nr_caps++; 3533 return err; 3534 } 3535 3536 static int encode_snap_realms(struct ceph_mds_client *mdsc, 3537 struct ceph_reconnect_state *recon_state) 3538 { 3539 struct rb_node *p; 3540 struct ceph_pagelist *pagelist = recon_state->pagelist; 3541 int err = 0; 3542 3543 if (recon_state->msg_version >= 4) { 3544 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); 3545 if (err < 0) 3546 goto fail; 3547 } 3548 3549 /* 3550 * snaprealms. we provide mds with the ino, seq (version), and 3551 * parent for all of our realms. If the mds has any newer info, 3552 * it will tell us. 3553 */ 3554 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 3555 struct ceph_snap_realm *realm = 3556 rb_entry(p, struct ceph_snap_realm, node); 3557 struct ceph_mds_snaprealm_reconnect sr_rec; 3558 3559 if (recon_state->msg_version >= 4) { 3560 size_t need = sizeof(u8) * 2 + sizeof(u32) + 3561 sizeof(sr_rec); 3562 3563 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 3564 err = send_reconnect_partial(recon_state); 3565 if (err) 3566 goto fail; 3567 pagelist = recon_state->pagelist; 3568 } 3569 3570 err = ceph_pagelist_reserve(pagelist, need); 3571 if (err) 3572 goto fail; 3573 3574 ceph_pagelist_encode_8(pagelist, 1); 3575 ceph_pagelist_encode_8(pagelist, 1); 3576 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 3577 } 3578 3579 dout(" adding snap realm %llx seq %lld parent %llx\n", 3580 realm->ino, realm->seq, realm->parent_ino); 3581 sr_rec.ino = cpu_to_le64(realm->ino); 3582 sr_rec.seq = cpu_to_le64(realm->seq); 3583 sr_rec.parent = cpu_to_le64(realm->parent_ino); 3584 3585 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 3586 if (err) 3587 goto fail; 3588 3589 recon_state->nr_realms++; 3590 } 3591 fail: 3592 return err; 3593 } 3594 3595 3596 /* 3597 * If an MDS fails and recovers, clients need to reconnect in order to 3598 * reestablish shared state. This includes all caps issued through 3599 * this session _and_ the snap_realm hierarchy. 
Because it's not 3600 * clear which snap realms the mds cares about, we send everything we 3601 * know about.. that ensures we'll then get any new info the 3602 * recovering MDS might have. 3603 * 3604 * This is a relatively heavyweight operation, but it's rare. 3605 * 3606 * called with mdsc->mutex held. 3607 */ 3608 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 3609 struct ceph_mds_session *session) 3610 { 3611 struct ceph_msg *reply; 3612 int mds = session->s_mds; 3613 int err = -ENOMEM; 3614 struct ceph_reconnect_state recon_state = { 3615 .session = session, 3616 }; 3617 LIST_HEAD(dispose); 3618 3619 pr_info("mds%d reconnect start\n", mds); 3620 3621 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 3622 if (!recon_state.pagelist) 3623 goto fail_nopagelist; 3624 3625 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3626 if (!reply) 3627 goto fail_nomsg; 3628 3629 mutex_lock(&session->s_mutex); 3630 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 3631 session->s_seq = 0; 3632 3633 dout("session %p state %s\n", session, 3634 ceph_session_state_name(session->s_state)); 3635 3636 spin_lock(&session->s_gen_ttl_lock); 3637 session->s_cap_gen++; 3638 spin_unlock(&session->s_gen_ttl_lock); 3639 3640 spin_lock(&session->s_cap_lock); 3641 /* don't know if session is readonly */ 3642 session->s_readonly = 0; 3643 /* 3644 * notify __ceph_remove_cap() that we are composing cap reconnect. 3645 * If a cap get released before being added to the cap reconnect, 3646 * __ceph_remove_cap() should skip queuing cap release. 3647 */ 3648 session->s_cap_reconnect = 1; 3649 /* drop old cap expires; we're about to reestablish that state */ 3650 detach_cap_releases(session, &dispose); 3651 spin_unlock(&session->s_cap_lock); 3652 dispose_cap_releases(mdsc, &dispose); 3653 3654 /* trim unused caps to reduce MDS's cache rejoin time */ 3655 if (mdsc->fsc->sb->s_root) 3656 shrink_dcache_parent(mdsc->fsc->sb->s_root); 3657 3658 ceph_con_close(&session->s_con); 3659 ceph_con_open(&session->s_con, 3660 CEPH_ENTITY_TYPE_MDS, mds, 3661 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 3662 3663 /* replay unsafe requests */ 3664 replay_unsafe_requests(mdsc, session); 3665 3666 ceph_early_kick_flushing_caps(mdsc, session); 3667 3668 down_read(&mdsc->snap_rwsem); 3669 3670 /* placeholder for nr_caps */ 3671 err = ceph_pagelist_encode_32(recon_state.pagelist, 0); 3672 if (err) 3673 goto fail; 3674 3675 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) { 3676 recon_state.msg_version = 3; 3677 recon_state.allow_multi = true; 3678 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) { 3679 recon_state.msg_version = 3; 3680 } else { 3681 recon_state.msg_version = 2; 3682 } 3683 /* trsaverse this session's caps */ 3684 err = ceph_iterate_session_caps(session, encode_caps_cb, &recon_state); 3685 3686 spin_lock(&session->s_cap_lock); 3687 session->s_cap_reconnect = 0; 3688 spin_unlock(&session->s_cap_lock); 3689 3690 if (err < 0) 3691 goto fail; 3692 3693 /* check if all realms can be encoded into current message */ 3694 if (mdsc->num_snap_realms) { 3695 size_t total_len = 3696 recon_state.pagelist->length + 3697 mdsc->num_snap_realms * 3698 sizeof(struct ceph_mds_snaprealm_reconnect); 3699 if (recon_state.msg_version >= 4) { 3700 /* number of realms */ 3701 total_len += sizeof(u32); 3702 /* version, compat_version and struct_len */ 3703 total_len += mdsc->num_snap_realms * 3704 (2 * sizeof(u8) + sizeof(u32)); 3705 } 3706 if (total_len > RECONNECT_MAX_SIZE) { 3707 if 
(!recon_state.allow_multi) { 3708 err = -ENOSPC; 3709 goto fail; 3710 } 3711 if (recon_state.nr_caps) { 3712 err = send_reconnect_partial(&recon_state); 3713 if (err) 3714 goto fail; 3715 } 3716 recon_state.msg_version = 5; 3717 } 3718 } 3719 3720 err = encode_snap_realms(mdsc, &recon_state); 3721 if (err < 0) 3722 goto fail; 3723 3724 if (recon_state.msg_version >= 5) { 3725 err = ceph_pagelist_encode_8(recon_state.pagelist, 0); 3726 if (err < 0) 3727 goto fail; 3728 } 3729 3730 if (recon_state.nr_caps || recon_state.nr_realms) { 3731 struct page *page = 3732 list_first_entry(&recon_state.pagelist->head, 3733 struct page, lru); 3734 __le32 *addr = kmap_atomic(page); 3735 if (recon_state.nr_caps) { 3736 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms); 3737 *addr = cpu_to_le32(recon_state.nr_caps); 3738 } else if (recon_state.msg_version >= 4) { 3739 *(addr + 1) = cpu_to_le32(recon_state.nr_realms); 3740 } 3741 kunmap_atomic(addr); 3742 } 3743 3744 reply->hdr.version = cpu_to_le16(recon_state.msg_version); 3745 if (recon_state.msg_version >= 4) 3746 reply->hdr.compat_version = cpu_to_le16(4); 3747 3748 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length); 3749 ceph_msg_data_add_pagelist(reply, recon_state.pagelist); 3750 3751 ceph_con_send(&session->s_con, reply); 3752 3753 mutex_unlock(&session->s_mutex); 3754 3755 mutex_lock(&mdsc->mutex); 3756 __wake_requests(mdsc, &session->s_waiting); 3757 mutex_unlock(&mdsc->mutex); 3758 3759 up_read(&mdsc->snap_rwsem); 3760 ceph_pagelist_release(recon_state.pagelist); 3761 return; 3762 3763 fail: 3764 ceph_msg_put(reply); 3765 up_read(&mdsc->snap_rwsem); 3766 mutex_unlock(&session->s_mutex); 3767 fail_nomsg: 3768 ceph_pagelist_release(recon_state.pagelist); 3769 fail_nopagelist: 3770 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 3771 return; 3772 } 3773 3774 3775 /* 3776 * compare old and new mdsmaps, kicking requests 3777 * and closing out old connections as necessary 3778 * 3779 * called under mdsc->mutex. 3780 */ 3781 static void check_new_map(struct ceph_mds_client *mdsc, 3782 struct ceph_mdsmap *newmap, 3783 struct ceph_mdsmap *oldmap) 3784 { 3785 int i; 3786 int oldstate, newstate; 3787 struct ceph_mds_session *s; 3788 3789 dout("check_new_map new %u old %u\n", 3790 newmap->m_epoch, oldmap->m_epoch); 3791 3792 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { 3793 if (!mdsc->sessions[i]) 3794 continue; 3795 s = mdsc->sessions[i]; 3796 oldstate = ceph_mdsmap_get_state(oldmap, i); 3797 newstate = ceph_mdsmap_get_state(newmap, i); 3798 3799 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 3800 i, ceph_mds_state_name(oldstate), 3801 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 3802 ceph_mds_state_name(newstate), 3803 ceph_mdsmap_is_laggy(newmap, i) ? 
" (laggy)" : "", 3804 ceph_session_state_name(s->s_state)); 3805 3806 if (i >= newmap->possible_max_rank) { 3807 /* force close session for stopped mds */ 3808 ceph_get_mds_session(s); 3809 __unregister_session(mdsc, s); 3810 __wake_requests(mdsc, &s->s_waiting); 3811 mutex_unlock(&mdsc->mutex); 3812 3813 mutex_lock(&s->s_mutex); 3814 cleanup_session_requests(mdsc, s); 3815 remove_session_caps(s); 3816 mutex_unlock(&s->s_mutex); 3817 3818 ceph_put_mds_session(s); 3819 3820 mutex_lock(&mdsc->mutex); 3821 kick_requests(mdsc, i); 3822 continue; 3823 } 3824 3825 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 3826 ceph_mdsmap_get_addr(newmap, i), 3827 sizeof(struct ceph_entity_addr))) { 3828 /* just close it */ 3829 mutex_unlock(&mdsc->mutex); 3830 mutex_lock(&s->s_mutex); 3831 mutex_lock(&mdsc->mutex); 3832 ceph_con_close(&s->s_con); 3833 mutex_unlock(&s->s_mutex); 3834 s->s_state = CEPH_MDS_SESSION_RESTARTING; 3835 } else if (oldstate == newstate) { 3836 continue; /* nothing new with this mds */ 3837 } 3838 3839 /* 3840 * send reconnect? 3841 */ 3842 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 3843 newstate >= CEPH_MDS_STATE_RECONNECT) { 3844 mutex_unlock(&mdsc->mutex); 3845 send_mds_reconnect(mdsc, s); 3846 mutex_lock(&mdsc->mutex); 3847 } 3848 3849 /* 3850 * kick request on any mds that has gone active. 3851 */ 3852 if (oldstate < CEPH_MDS_STATE_ACTIVE && 3853 newstate >= CEPH_MDS_STATE_ACTIVE) { 3854 if (oldstate != CEPH_MDS_STATE_CREATING && 3855 oldstate != CEPH_MDS_STATE_STARTING) 3856 pr_info("mds%d recovery completed\n", s->s_mds); 3857 kick_requests(mdsc, i); 3858 ceph_kick_flushing_caps(mdsc, s); 3859 wake_up_session_caps(s, RECONNECT); 3860 } 3861 } 3862 3863 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) { 3864 s = mdsc->sessions[i]; 3865 if (!s) 3866 continue; 3867 if (!ceph_mdsmap_is_laggy(newmap, i)) 3868 continue; 3869 if (s->s_state == CEPH_MDS_SESSION_OPEN || 3870 s->s_state == CEPH_MDS_SESSION_HUNG || 3871 s->s_state == CEPH_MDS_SESSION_CLOSING) { 3872 dout(" connecting to export targets of laggy mds%d\n", 3873 i); 3874 __open_export_target_sessions(mdsc, s); 3875 } 3876 } 3877 } 3878 3879 3880 3881 /* 3882 * leases 3883 */ 3884 3885 /* 3886 * caller must hold session s_mutex, dentry->d_lock 3887 */ 3888 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 3889 { 3890 struct ceph_dentry_info *di = ceph_dentry(dentry); 3891 3892 ceph_put_mds_session(di->lease_session); 3893 di->lease_session = NULL; 3894 } 3895 3896 static void handle_lease(struct ceph_mds_client *mdsc, 3897 struct ceph_mds_session *session, 3898 struct ceph_msg *msg) 3899 { 3900 struct super_block *sb = mdsc->fsc->sb; 3901 struct inode *inode; 3902 struct dentry *parent, *dentry; 3903 struct ceph_dentry_info *di; 3904 int mds = session->s_mds; 3905 struct ceph_mds_lease *h = msg->front.iov_base; 3906 u32 seq; 3907 struct ceph_vino vino; 3908 struct qstr dname; 3909 int release = 0; 3910 3911 dout("handle_lease from mds%d\n", mds); 3912 3913 /* decode */ 3914 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 3915 goto bad; 3916 vino.ino = le64_to_cpu(h->ino); 3917 vino.snap = CEPH_NOSNAP; 3918 seq = le32_to_cpu(h->seq); 3919 dname.len = get_unaligned_le32(h + 1); 3920 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 3921 goto bad; 3922 dname.name = (void *)(h + 1) + sizeof(u32); 3923 3924 /* lookup inode */ 3925 inode = ceph_find_inode(sb, vino); 3926 dout("handle_lease %s, ino %llx %p %.*s\n", 3927 ceph_lease_op_name(h->action), vino.ino, inode, 3928 
dname.len, dname.name); 3929 3930 mutex_lock(&session->s_mutex); 3931 session->s_seq++; 3932 3933 if (!inode) { 3934 dout("handle_lease no inode %llx\n", vino.ino); 3935 goto release; 3936 } 3937 3938 /* dentry */ 3939 parent = d_find_alias(inode); 3940 if (!parent) { 3941 dout("no parent dentry on inode %p\n", inode); 3942 WARN_ON(1); 3943 goto release; /* hrm... */ 3944 } 3945 dname.hash = full_name_hash(parent, dname.name, dname.len); 3946 dentry = d_lookup(parent, &dname); 3947 dput(parent); 3948 if (!dentry) 3949 goto release; 3950 3951 spin_lock(&dentry->d_lock); 3952 di = ceph_dentry(dentry); 3953 switch (h->action) { 3954 case CEPH_MDS_LEASE_REVOKE: 3955 if (di->lease_session == session) { 3956 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 3957 h->seq = cpu_to_le32(di->lease_seq); 3958 __ceph_mdsc_drop_dentry_lease(dentry); 3959 } 3960 release = 1; 3961 break; 3962 3963 case CEPH_MDS_LEASE_RENEW: 3964 if (di->lease_session == session && 3965 di->lease_gen == session->s_cap_gen && 3966 di->lease_renew_from && 3967 di->lease_renew_after == 0) { 3968 unsigned long duration = 3969 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 3970 3971 di->lease_seq = seq; 3972 di->time = di->lease_renew_from + duration; 3973 di->lease_renew_after = di->lease_renew_from + 3974 (duration >> 1); 3975 di->lease_renew_from = 0; 3976 } 3977 break; 3978 } 3979 spin_unlock(&dentry->d_lock); 3980 dput(dentry); 3981 3982 if (!release) 3983 goto out; 3984 3985 release: 3986 /* let's just reuse the same message */ 3987 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 3988 ceph_msg_get(msg); 3989 ceph_con_send(&session->s_con, msg); 3990 3991 out: 3992 mutex_unlock(&session->s_mutex); 3993 /* avoid calling iput_final() in mds dispatch threads */ 3994 ceph_async_iput(inode); 3995 return; 3996 3997 bad: 3998 pr_err("corrupt lease message\n"); 3999 ceph_msg_dump(msg); 4000 } 4001 4002 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 4003 struct dentry *dentry, char action, 4004 u32 seq) 4005 { 4006 struct ceph_msg *msg; 4007 struct ceph_mds_lease *lease; 4008 struct inode *dir; 4009 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; 4010 4011 dout("lease_send_msg identry %p %s to mds%d\n", 4012 dentry, ceph_lease_op_name(action), session->s_mds); 4013 4014 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 4015 if (!msg) 4016 return; 4017 lease = msg->front.iov_base; 4018 lease->action = action; 4019 lease->seq = cpu_to_le32(seq); 4020 4021 spin_lock(&dentry->d_lock); 4022 dir = d_inode(dentry->d_parent); 4023 lease->ino = cpu_to_le64(ceph_ino(dir)); 4024 lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); 4025 4026 put_unaligned_le32(dentry->d_name.len, lease + 1); 4027 memcpy((void *)(lease + 1) + 4, 4028 dentry->d_name.name, dentry->d_name.len); 4029 spin_unlock(&dentry->d_lock); 4030 /* 4031 * if this is a preemptive lease RELEASE, no need to 4032 * flush request stream, since the actual request will 4033 * soon follow. 
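 * (The more_to_follow flag set just below passes that hint on to the
 * messenger.)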
4034 */ 4035 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE); 4036 4037 ceph_con_send(&session->s_con, msg); 4038 } 4039 4040 /* 4041 * lock unlock sessions, to wait ongoing session activities 4042 */ 4043 static void lock_unlock_sessions(struct ceph_mds_client *mdsc) 4044 { 4045 int i; 4046 4047 mutex_lock(&mdsc->mutex); 4048 for (i = 0; i < mdsc->max_sessions; i++) { 4049 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4050 if (!s) 4051 continue; 4052 mutex_unlock(&mdsc->mutex); 4053 mutex_lock(&s->s_mutex); 4054 mutex_unlock(&s->s_mutex); 4055 ceph_put_mds_session(s); 4056 mutex_lock(&mdsc->mutex); 4057 } 4058 mutex_unlock(&mdsc->mutex); 4059 } 4060 4061 static void maybe_recover_session(struct ceph_mds_client *mdsc) 4062 { 4063 struct ceph_fs_client *fsc = mdsc->fsc; 4064 4065 if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) 4066 return; 4067 4068 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED) 4069 return; 4070 4071 if (!READ_ONCE(fsc->blacklisted)) 4072 return; 4073 4074 if (fsc->last_auto_reconnect && 4075 time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30)) 4076 return; 4077 4078 pr_info("auto reconnect after blacklisted\n"); 4079 fsc->last_auto_reconnect = jiffies; 4080 ceph_force_reconnect(fsc->sb); 4081 } 4082 4083 /* 4084 * delayed work -- periodically trim expired leases, renew caps with mds 4085 */ 4086 static void schedule_delayed(struct ceph_mds_client *mdsc) 4087 { 4088 int delay = 5; 4089 unsigned hz = round_jiffies_relative(HZ * delay); 4090 schedule_delayed_work(&mdsc->delayed_work, hz); 4091 } 4092 4093 static void delayed_work(struct work_struct *work) 4094 { 4095 int i; 4096 struct ceph_mds_client *mdsc = 4097 container_of(work, struct ceph_mds_client, delayed_work.work); 4098 int renew_interval; 4099 int renew_caps; 4100 4101 dout("mdsc delayed_work\n"); 4102 4103 mutex_lock(&mdsc->mutex); 4104 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 4105 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 4106 mdsc->last_renew_caps); 4107 if (renew_caps) 4108 mdsc->last_renew_caps = jiffies; 4109 4110 for (i = 0; i < mdsc->max_sessions; i++) { 4111 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4112 if (!s) 4113 continue; 4114 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 4115 dout("resending session close request for mds%d\n", 4116 s->s_mds); 4117 request_close_session(mdsc, s); 4118 ceph_put_mds_session(s); 4119 continue; 4120 } 4121 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 4122 if (s->s_state == CEPH_MDS_SESSION_OPEN) { 4123 s->s_state = CEPH_MDS_SESSION_HUNG; 4124 pr_info("mds%d hung\n", s->s_mds); 4125 } 4126 } 4127 if (s->s_state == CEPH_MDS_SESSION_NEW || 4128 s->s_state == CEPH_MDS_SESSION_RESTARTING || 4129 s->s_state == CEPH_MDS_SESSION_REJECTED) { 4130 /* this mds is failed or recovering, just wait */ 4131 ceph_put_mds_session(s); 4132 continue; 4133 } 4134 mutex_unlock(&mdsc->mutex); 4135 4136 mutex_lock(&s->s_mutex); 4137 if (renew_caps) 4138 send_renew_caps(mdsc, s); 4139 else 4140 ceph_con_keepalive(&s->s_con); 4141 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4142 s->s_state == CEPH_MDS_SESSION_HUNG) 4143 ceph_send_cap_releases(mdsc, s); 4144 mutex_unlock(&s->s_mutex); 4145 ceph_put_mds_session(s); 4146 4147 mutex_lock(&mdsc->mutex); 4148 } 4149 mutex_unlock(&mdsc->mutex); 4150 4151 ceph_check_delayed_caps(mdsc); 4152 4153 ceph_queue_cap_reclaim_work(mdsc); 4154 4155 ceph_trim_snapid_map(mdsc); 4156 4157 maybe_recover_session(mdsc); 4158 4159 schedule_delayed(mdsc); 4160 } 4161 4162 
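/*
 * The periodic housekeeping above follows the usual self-rearming
 * delayed_work pattern: delayed_work() does its work and then queues
 * itself again via schedule_delayed(), on a roughly 5 second tick.
 * A minimal, self-contained sketch of that generic pattern is shown
 * below; the example_* names are hypothetical and not part of this
 * file, and the sketch assumes only <linux/workqueue.h> and
 * <linux/timer.h>.
 */
static struct delayed_work example_dw;

static void example_tick(struct work_struct *work)
{
	struct delayed_work *dw = to_delayed_work(work);

	/* ... periodic housekeeping would go here ... */

	/* re-arm; round_jiffies_relative() batches nearby timer expiries */
	schedule_delayed_work(dw, round_jiffies_relative(5 * HZ));
}

static void __maybe_unused example_start(void)
{
	INIT_DELAYED_WORK(&example_dw, example_tick);
	schedule_delayed_work(&example_dw, round_jiffies_relative(5 * HZ));
}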
int ceph_mdsc_init(struct ceph_fs_client *fsc) 4163 4164 { 4165 struct ceph_mds_client *mdsc; 4166 4167 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 4168 if (!mdsc) 4169 return -ENOMEM; 4170 mdsc->fsc = fsc; 4171 mutex_init(&mdsc->mutex); 4172 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 4173 if (!mdsc->mdsmap) { 4174 kfree(mdsc); 4175 return -ENOMEM; 4176 } 4177 4178 fsc->mdsc = mdsc; 4179 init_completion(&mdsc->safe_umount_waiters); 4180 init_waitqueue_head(&mdsc->session_close_wq); 4181 INIT_LIST_HEAD(&mdsc->waiting_for_map); 4182 mdsc->sessions = NULL; 4183 atomic_set(&mdsc->num_sessions, 0); 4184 mdsc->max_sessions = 0; 4185 mdsc->stopping = 0; 4186 atomic64_set(&mdsc->quotarealms_count, 0); 4187 mdsc->quotarealms_inodes = RB_ROOT; 4188 mutex_init(&mdsc->quotarealms_inodes_mutex); 4189 mdsc->last_snap_seq = 0; 4190 init_rwsem(&mdsc->snap_rwsem); 4191 mdsc->snap_realms = RB_ROOT; 4192 INIT_LIST_HEAD(&mdsc->snap_empty); 4193 mdsc->num_snap_realms = 0; 4194 spin_lock_init(&mdsc->snap_empty_lock); 4195 mdsc->last_tid = 0; 4196 mdsc->oldest_tid = 0; 4197 mdsc->request_tree = RB_ROOT; 4198 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 4199 mdsc->last_renew_caps = jiffies; 4200 INIT_LIST_HEAD(&mdsc->cap_delay_list); 4201 INIT_LIST_HEAD(&mdsc->cap_wait_list); 4202 spin_lock_init(&mdsc->cap_delay_lock); 4203 INIT_LIST_HEAD(&mdsc->snap_flush_list); 4204 spin_lock_init(&mdsc->snap_flush_lock); 4205 mdsc->last_cap_flush_tid = 1; 4206 INIT_LIST_HEAD(&mdsc->cap_flush_list); 4207 INIT_LIST_HEAD(&mdsc->cap_dirty); 4208 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 4209 mdsc->num_cap_flushing = 0; 4210 spin_lock_init(&mdsc->cap_dirty_lock); 4211 init_waitqueue_head(&mdsc->cap_flushing_wq); 4212 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); 4213 atomic_set(&mdsc->cap_reclaim_pending, 0); 4214 4215 spin_lock_init(&mdsc->dentry_list_lock); 4216 INIT_LIST_HEAD(&mdsc->dentry_leases); 4217 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 4218 4219 ceph_caps_init(mdsc); 4220 ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 4221 4222 spin_lock_init(&mdsc->snapid_map_lock); 4223 mdsc->snapid_map_tree = RB_ROOT; 4224 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 4225 4226 init_rwsem(&mdsc->pool_perm_rwsem); 4227 mdsc->pool_perm_tree = RB_ROOT; 4228 4229 strscpy(mdsc->nodename, utsname()->nodename, 4230 sizeof(mdsc->nodename)); 4231 return 0; 4232 } 4233 4234 /* 4235 * Wait for safe replies on open mds requests. If we time out, drop 4236 * all requests from the tree to avoid dangling dentry refs. 4237 */ 4238 static void wait_requests(struct ceph_mds_client *mdsc) 4239 { 4240 struct ceph_options *opts = mdsc->fsc->client->options; 4241 struct ceph_mds_request *req; 4242 4243 mutex_lock(&mdsc->mutex); 4244 if (__get_oldest_req(mdsc)) { 4245 mutex_unlock(&mdsc->mutex); 4246 4247 dout("wait_requests waiting for requests\n"); 4248 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 4249 ceph_timeout_jiffies(opts->mount_timeout)); 4250 4251 /* tear down remaining requests */ 4252 mutex_lock(&mdsc->mutex); 4253 while ((req = __get_oldest_req(mdsc))) { 4254 dout("wait_requests timed out on tid %llu\n", 4255 req->r_tid); 4256 list_del_init(&req->r_wait); 4257 __unregister_request(mdsc, req); 4258 } 4259 } 4260 mutex_unlock(&mdsc->mutex); 4261 dout("wait_requests done\n"); 4262 } 4263 4264 /* 4265 * called before mount is ro, and before dentries are torn down. 4266 * (hmm, does this still race with new lookups?) 
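 * We flush dirty caps, wait for (or drop) in-flight MDS requests, and
 * flush the messenger so reply handlers release their request, inode
 * and dentry references before the unmount proceeds.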
4267 */ 4268 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 4269 { 4270 dout("pre_umount\n"); 4271 mdsc->stopping = 1; 4272 4273 lock_unlock_sessions(mdsc); 4274 ceph_flush_dirty_caps(mdsc); 4275 wait_requests(mdsc); 4276 4277 /* 4278 * wait for reply handlers to drop their request refs and 4279 * their inode/dcache refs 4280 */ 4281 ceph_msgr_flush(); 4282 4283 ceph_cleanup_quotarealms_inodes(mdsc); 4284 } 4285 4286 /* 4287 * wait for all write mds requests to flush. 4288 */ 4289 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) 4290 { 4291 struct ceph_mds_request *req = NULL, *nextreq; 4292 struct rb_node *n; 4293 4294 mutex_lock(&mdsc->mutex); 4295 dout("wait_unsafe_requests want %lld\n", want_tid); 4296 restart: 4297 req = __get_oldest_req(mdsc); 4298 while (req && req->r_tid <= want_tid) { 4299 /* find next request */ 4300 n = rb_next(&req->r_node); 4301 if (n) 4302 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 4303 else 4304 nextreq = NULL; 4305 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 4306 (req->r_op & CEPH_MDS_OP_WRITE)) { 4307 /* write op */ 4308 ceph_mdsc_get_request(req); 4309 if (nextreq) 4310 ceph_mdsc_get_request(nextreq); 4311 mutex_unlock(&mdsc->mutex); 4312 dout("wait_unsafe_requests wait on %llu (want %llu)\n", 4313 req->r_tid, want_tid); 4314 wait_for_completion(&req->r_safe_completion); 4315 mutex_lock(&mdsc->mutex); 4316 ceph_mdsc_put_request(req); 4317 if (!nextreq) 4318 break; /* next dne before, so we're done! */ 4319 if (RB_EMPTY_NODE(&nextreq->r_node)) { 4320 /* next request was removed from tree */ 4321 ceph_mdsc_put_request(nextreq); 4322 goto restart; 4323 } 4324 ceph_mdsc_put_request(nextreq); /* won't go away */ 4325 } 4326 req = nextreq; 4327 } 4328 mutex_unlock(&mdsc->mutex); 4329 dout("wait_unsafe_requests done\n"); 4330 } 4331 4332 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 4333 { 4334 u64 want_tid, want_flush; 4335 4336 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4337 return; 4338 4339 dout("sync\n"); 4340 mutex_lock(&mdsc->mutex); 4341 want_tid = mdsc->last_tid; 4342 mutex_unlock(&mdsc->mutex); 4343 4344 ceph_flush_dirty_caps(mdsc); 4345 spin_lock(&mdsc->cap_dirty_lock); 4346 want_flush = mdsc->last_cap_flush_tid; 4347 if (!list_empty(&mdsc->cap_flush_list)) { 4348 struct ceph_cap_flush *cf = 4349 list_last_entry(&mdsc->cap_flush_list, 4350 struct ceph_cap_flush, g_list); 4351 cf->wake = true; 4352 } 4353 spin_unlock(&mdsc->cap_dirty_lock); 4354 4355 dout("sync want tid %lld flush_seq %lld\n", 4356 want_tid, want_flush); 4357 4358 wait_unsafe_requests(mdsc, want_tid); 4359 wait_caps_flush(mdsc, want_flush); 4360 } 4361 4362 /* 4363 * true if all sessions are closed, or we force unmount 4364 */ 4365 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 4366 { 4367 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4368 return true; 4369 return atomic_read(&mdsc->num_sessions) <= skipped; 4370 } 4371 4372 /* 4373 * called after sb is ro. 
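 * Ask each MDS to close its session, wait up to the mount timeout for
 * the sessions to go away, then forcibly unregister and clean up
 * whatever remains.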
4374 */ 4375 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 4376 { 4377 struct ceph_options *opts = mdsc->fsc->client->options; 4378 struct ceph_mds_session *session; 4379 int i; 4380 int skipped = 0; 4381 4382 dout("close_sessions\n"); 4383 4384 /* close sessions */ 4385 mutex_lock(&mdsc->mutex); 4386 for (i = 0; i < mdsc->max_sessions; i++) { 4387 session = __ceph_lookup_mds_session(mdsc, i); 4388 if (!session) 4389 continue; 4390 mutex_unlock(&mdsc->mutex); 4391 mutex_lock(&session->s_mutex); 4392 if (__close_session(mdsc, session) <= 0) 4393 skipped++; 4394 mutex_unlock(&session->s_mutex); 4395 ceph_put_mds_session(session); 4396 mutex_lock(&mdsc->mutex); 4397 } 4398 mutex_unlock(&mdsc->mutex); 4399 4400 dout("waiting for sessions to close\n"); 4401 wait_event_timeout(mdsc->session_close_wq, 4402 done_closing_sessions(mdsc, skipped), 4403 ceph_timeout_jiffies(opts->mount_timeout)); 4404 4405 /* tear down remaining sessions */ 4406 mutex_lock(&mdsc->mutex); 4407 for (i = 0; i < mdsc->max_sessions; i++) { 4408 if (mdsc->sessions[i]) { 4409 session = ceph_get_mds_session(mdsc->sessions[i]); 4410 __unregister_session(mdsc, session); 4411 mutex_unlock(&mdsc->mutex); 4412 mutex_lock(&session->s_mutex); 4413 remove_session_caps(session); 4414 mutex_unlock(&session->s_mutex); 4415 ceph_put_mds_session(session); 4416 mutex_lock(&mdsc->mutex); 4417 } 4418 } 4419 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 4420 mutex_unlock(&mdsc->mutex); 4421 4422 ceph_cleanup_snapid_map(mdsc); 4423 ceph_cleanup_empty_realms(mdsc); 4424 4425 cancel_work_sync(&mdsc->cap_reclaim_work); 4426 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 4427 4428 dout("stopped\n"); 4429 } 4430 4431 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 4432 { 4433 struct ceph_mds_session *session; 4434 int mds; 4435 4436 dout("force umount\n"); 4437 4438 mutex_lock(&mdsc->mutex); 4439 for (mds = 0; mds < mdsc->max_sessions; mds++) { 4440 session = __ceph_lookup_mds_session(mdsc, mds); 4441 if (!session) 4442 continue; 4443 4444 if (session->s_state == CEPH_MDS_SESSION_REJECTED) 4445 __unregister_session(mdsc, session); 4446 __wake_requests(mdsc, &session->s_waiting); 4447 mutex_unlock(&mdsc->mutex); 4448 4449 mutex_lock(&session->s_mutex); 4450 __close_session(mdsc, session); 4451 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 4452 cleanup_session_requests(mdsc, session); 4453 remove_session_caps(session); 4454 } 4455 mutex_unlock(&session->s_mutex); 4456 ceph_put_mds_session(session); 4457 4458 mutex_lock(&mdsc->mutex); 4459 kick_requests(mdsc, mds); 4460 } 4461 __wake_requests(mdsc, &mdsc->waiting_for_map); 4462 mutex_unlock(&mdsc->mutex); 4463 } 4464 4465 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 4466 { 4467 dout("stop\n"); 4468 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 4469 if (mdsc->mdsmap) 4470 ceph_mdsmap_destroy(mdsc->mdsmap); 4471 kfree(mdsc->sessions); 4472 ceph_caps_finalize(mdsc); 4473 ceph_pool_perm_destroy(mdsc); 4474 } 4475 4476 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 4477 { 4478 struct ceph_mds_client *mdsc = fsc->mdsc; 4479 dout("mdsc_destroy %p\n", mdsc); 4480 4481 if (!mdsc) 4482 return; 4483 4484 /* flush out any connection work with references to us */ 4485 ceph_msgr_flush(); 4486 4487 ceph_mdsc_stop(mdsc); 4488 4489 fsc->mdsc = NULL; 4490 kfree(mdsc); 4491 dout("mdsc_destroy %p done\n", mdsc); 4492 } 4493 4494 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4495 { 4496 struct 
ceph_fs_client *fsc = mdsc->fsc; 4497 const char *mds_namespace = fsc->mount_options->mds_namespace; 4498 void *p = msg->front.iov_base; 4499 void *end = p + msg->front.iov_len; 4500 u32 epoch; 4501 u32 map_len; 4502 u32 num_fs; 4503 u32 mount_fscid = (u32)-1; 4504 u8 struct_v, struct_cv; 4505 int err = -EINVAL; 4506 4507 ceph_decode_need(&p, end, sizeof(u32), bad); 4508 epoch = ceph_decode_32(&p); 4509 4510 dout("handle_fsmap epoch %u\n", epoch); 4511 4512 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 4513 struct_v = ceph_decode_8(&p); 4514 struct_cv = ceph_decode_8(&p); 4515 map_len = ceph_decode_32(&p); 4516 4517 ceph_decode_need(&p, end, sizeof(u32) * 3, bad); 4518 p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */ 4519 4520 num_fs = ceph_decode_32(&p); 4521 while (num_fs-- > 0) { 4522 void *info_p, *info_end; 4523 u32 info_len; 4524 u8 info_v, info_cv; 4525 u32 fscid, namelen; 4526 4527 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 4528 info_v = ceph_decode_8(&p); 4529 info_cv = ceph_decode_8(&p); 4530 info_len = ceph_decode_32(&p); 4531 ceph_decode_need(&p, end, info_len, bad); 4532 info_p = p; 4533 info_end = p + info_len; 4534 p = info_end; 4535 4536 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); 4537 fscid = ceph_decode_32(&info_p); 4538 namelen = ceph_decode_32(&info_p); 4539 ceph_decode_need(&info_p, info_end, namelen, bad); 4540 4541 if (mds_namespace && 4542 strlen(mds_namespace) == namelen && 4543 !strncmp(mds_namespace, (char *)info_p, namelen)) { 4544 mount_fscid = fscid; 4545 break; 4546 } 4547 } 4548 4549 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); 4550 if (mount_fscid != (u32)-1) { 4551 fsc->client->monc.fs_cluster_id = mount_fscid; 4552 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 4553 0, true); 4554 ceph_monc_renew_subs(&fsc->client->monc); 4555 } else { 4556 err = -ENOENT; 4557 goto err_out; 4558 } 4559 return; 4560 4561 bad: 4562 pr_err("error decoding fsmap\n"); 4563 err_out: 4564 mutex_lock(&mdsc->mutex); 4565 mdsc->mdsmap_err = err; 4566 __wake_requests(mdsc, &mdsc->waiting_for_map); 4567 mutex_unlock(&mdsc->mutex); 4568 } 4569 4570 /* 4571 * handle mds map update. 4572 */ 4573 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4574 { 4575 u32 epoch; 4576 u32 maplen; 4577 void *p = msg->front.iov_base; 4578 void *end = p + msg->front.iov_len; 4579 struct ceph_mdsmap *newmap, *oldmap; 4580 struct ceph_fsid fsid; 4581 int err = -EINVAL; 4582 4583 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 4584 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 4585 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 4586 return; 4587 epoch = ceph_decode_32(&p); 4588 maplen = ceph_decode_32(&p); 4589 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 4590 4591 /* do we need it? 
*/ 4592 mutex_lock(&mdsc->mutex); 4593 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 4594 dout("handle_map epoch %u <= our %u\n", 4595 epoch, mdsc->mdsmap->m_epoch); 4596 mutex_unlock(&mdsc->mutex); 4597 return; 4598 } 4599 4600 newmap = ceph_mdsmap_decode(&p, end); 4601 if (IS_ERR(newmap)) { 4602 err = PTR_ERR(newmap); 4603 goto bad_unlock; 4604 } 4605 4606 /* swap into place */ 4607 if (mdsc->mdsmap) { 4608 oldmap = mdsc->mdsmap; 4609 mdsc->mdsmap = newmap; 4610 check_new_map(mdsc, newmap, oldmap); 4611 ceph_mdsmap_destroy(oldmap); 4612 } else { 4613 mdsc->mdsmap = newmap; /* first mds map */ 4614 } 4615 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size, 4616 MAX_LFS_FILESIZE); 4617 4618 __wake_requests(mdsc, &mdsc->waiting_for_map); 4619 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP, 4620 mdsc->mdsmap->m_epoch); 4621 4622 mutex_unlock(&mdsc->mutex); 4623 schedule_delayed(mdsc); 4624 return; 4625 4626 bad_unlock: 4627 mutex_unlock(&mdsc->mutex); 4628 bad: 4629 pr_err("error decoding mdsmap %d\n", err); 4630 return; 4631 } 4632 4633 static struct ceph_connection *con_get(struct ceph_connection *con) 4634 { 4635 struct ceph_mds_session *s = con->private; 4636 4637 if (ceph_get_mds_session(s)) 4638 return con; 4639 return NULL; 4640 } 4641 4642 static void con_put(struct ceph_connection *con) 4643 { 4644 struct ceph_mds_session *s = con->private; 4645 4646 ceph_put_mds_session(s); 4647 } 4648 4649 /* 4650 * if the client is unresponsive for long enough, the mds will kill 4651 * the session entirely. 4652 */ 4653 static void peer_reset(struct ceph_connection *con) 4654 { 4655 struct ceph_mds_session *s = con->private; 4656 struct ceph_mds_client *mdsc = s->s_mdsc; 4657 4658 pr_warn("mds%d closed our session\n", s->s_mds); 4659 send_mds_reconnect(mdsc, s); 4660 } 4661 4662 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 4663 { 4664 struct ceph_mds_session *s = con->private; 4665 struct ceph_mds_client *mdsc = s->s_mdsc; 4666 int type = le16_to_cpu(msg->hdr.type); 4667 4668 mutex_lock(&mdsc->mutex); 4669 if (__verify_registered_session(mdsc, s) < 0) { 4670 mutex_unlock(&mdsc->mutex); 4671 goto out; 4672 } 4673 mutex_unlock(&mdsc->mutex); 4674 4675 switch (type) { 4676 case CEPH_MSG_MDS_MAP: 4677 ceph_mdsc_handle_mdsmap(mdsc, msg); 4678 break; 4679 case CEPH_MSG_FS_MAP_USER: 4680 ceph_mdsc_handle_fsmap(mdsc, msg); 4681 break; 4682 case CEPH_MSG_CLIENT_SESSION: 4683 handle_session(s, msg); 4684 break; 4685 case CEPH_MSG_CLIENT_REPLY: 4686 handle_reply(s, msg); 4687 break; 4688 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 4689 handle_forward(mdsc, s, msg); 4690 break; 4691 case CEPH_MSG_CLIENT_CAPS: 4692 ceph_handle_caps(s, msg); 4693 break; 4694 case CEPH_MSG_CLIENT_SNAP: 4695 ceph_handle_snap(mdsc, s, msg); 4696 break; 4697 case CEPH_MSG_CLIENT_LEASE: 4698 handle_lease(mdsc, s, msg); 4699 break; 4700 case CEPH_MSG_CLIENT_QUOTA: 4701 ceph_handle_quota(mdsc, s, msg); 4702 break; 4703 4704 default: 4705 pr_err("received unknown message type %d %s\n", type, 4706 ceph_msg_type_name(type)); 4707 } 4708 out: 4709 ceph_msg_put(msg); 4710 } 4711 4712 /* 4713 * authentication 4714 */ 4715 4716 /* 4717 * Note: returned pointer is the address of a structure that's 4718 * managed separately. Caller must *not* attempt to free it. 
4719 */ 4720 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 4721 int *proto, int force_new) 4722 { 4723 struct ceph_mds_session *s = con->private; 4724 struct ceph_mds_client *mdsc = s->s_mdsc; 4725 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 4726 struct ceph_auth_handshake *auth = &s->s_auth; 4727 4728 if (force_new && auth->authorizer) { 4729 ceph_auth_destroy_authorizer(auth->authorizer); 4730 auth->authorizer = NULL; 4731 } 4732 if (!auth->authorizer) { 4733 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 4734 auth); 4735 if (ret) 4736 return ERR_PTR(ret); 4737 } else { 4738 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 4739 auth); 4740 if (ret) 4741 return ERR_PTR(ret); 4742 } 4743 *proto = ac->protocol; 4744 4745 return auth; 4746 } 4747 4748 static int add_authorizer_challenge(struct ceph_connection *con, 4749 void *challenge_buf, int challenge_buf_len) 4750 { 4751 struct ceph_mds_session *s = con->private; 4752 struct ceph_mds_client *mdsc = s->s_mdsc; 4753 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 4754 4755 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer, 4756 challenge_buf, challenge_buf_len); 4757 } 4758 4759 static int verify_authorizer_reply(struct ceph_connection *con) 4760 { 4761 struct ceph_mds_session *s = con->private; 4762 struct ceph_mds_client *mdsc = s->s_mdsc; 4763 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 4764 4765 return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer); 4766 } 4767 4768 static int invalidate_authorizer(struct ceph_connection *con) 4769 { 4770 struct ceph_mds_session *s = con->private; 4771 struct ceph_mds_client *mdsc = s->s_mdsc; 4772 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 4773 4774 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 4775 4776 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 4777 } 4778 4779 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, 4780 struct ceph_msg_header *hdr, int *skip) 4781 { 4782 struct ceph_msg *msg; 4783 int type = (int) le16_to_cpu(hdr->type); 4784 int front_len = (int) le32_to_cpu(hdr->front_len); 4785 4786 if (con->in_msg) 4787 return con->in_msg; 4788 4789 *skip = 0; 4790 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 4791 if (!msg) { 4792 pr_err("unable to allocate msg type %d len %d\n", 4793 type, front_len); 4794 return NULL; 4795 } 4796 4797 return msg; 4798 } 4799 4800 static int mds_sign_message(struct ceph_msg *msg) 4801 { 4802 struct ceph_mds_session *s = msg->con->private; 4803 struct ceph_auth_handshake *auth = &s->s_auth; 4804 4805 return ceph_auth_sign_message(auth, msg); 4806 } 4807 4808 static int mds_check_message_signature(struct ceph_msg *msg) 4809 { 4810 struct ceph_mds_session *s = msg->con->private; 4811 struct ceph_auth_handshake *auth = &s->s_auth; 4812 4813 return ceph_auth_check_message_signature(auth, msg); 4814 } 4815 4816 static const struct ceph_connection_operations mds_con_ops = { 4817 .get = con_get, 4818 .put = con_put, 4819 .dispatch = dispatch, 4820 .get_authorizer = get_authorizer, 4821 .add_authorizer_challenge = add_authorizer_challenge, 4822 .verify_authorizer_reply = verify_authorizer_reply, 4823 .invalidate_authorizer = invalidate_authorizer, 4824 .peer_reset = peer_reset, 4825 .alloc_msg = mds_alloc_msg, 4826 .sign_message = mds_sign_message, 4827 .check_message_signature = mds_check_message_signature, 4828 }; 4829 4830 /* eof */ 4831
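/*
 * Illustrative addendum (not part of the original file): the
 * ->get()/->put() callbacks in mds_con_ops above let the messenger pin
 * the MDS session for as long as it needs the connection, so the
 * session cannot go away while ->dispatch() runs.  A rough sketch of a
 * caller honoring that convention might look like the following;
 * example_deliver() is hypothetical and is not the real messenger code.
 */
static void __maybe_unused example_deliver(struct ceph_connection *con,
					   struct ceph_msg *msg)
{
	const struct ceph_connection_operations *ops = con->ops;

	if (!ops->get(con)) {
		/* session is already being torn down */
		ceph_msg_put(msg);
		return;
	}
	ops->dispatch(con, msg);	/* dispatch() above ends with ceph_msg_put() */
	ops->put(con);
}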