1 // SPDX-License-Identifier: GPL-2.0 2 #include <linux/ceph/ceph_debug.h> 3 4 #include <linux/fs.h> 5 #include <linux/wait.h> 6 #include <linux/slab.h> 7 #include <linux/gfp.h> 8 #include <linux/sched.h> 9 #include <linux/debugfs.h> 10 #include <linux/seq_file.h> 11 #include <linux/ratelimit.h> 12 #include <linux/bits.h> 13 14 #include "super.h" 15 #include "mds_client.h" 16 17 #include <linux/ceph/ceph_features.h> 18 #include <linux/ceph/messenger.h> 19 #include <linux/ceph/decode.h> 20 #include <linux/ceph/pagelist.h> 21 #include <linux/ceph/auth.h> 22 #include <linux/ceph/debugfs.h> 23 24 #define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE) 25 26 /* 27 * A cluster of MDS (metadata server) daemons is responsible for 28 * managing the file system namespace (the directory hierarchy and 29 * inodes) and for coordinating shared access to storage. Metadata is 30 * partitioned hierarchically across a number of servers, and that 31 * partition varies over time as the cluster adjusts the distribution 32 * in order to balance load. 33 * 34 * The MDS client is primarily responsible for managing synchronous 35 * metadata requests for operations like open, unlink, and so forth. 36 * If there is an MDS failure, we find out about it when we (possibly 37 * request and) receive a new MDS map, and can resubmit affected 38 * requests. 39 * 40 * For the most part, though, we take advantage of a lossless 41 * communications channel to the MDS, and do not need to worry about 42 * timing out or resubmitting requests. 43 * 44 * We maintain a stateful "session" with each MDS we interact with. 45 * Within each session, we send periodic heartbeat messages to ensure 46 * any capabilities or leases we have been issued remain valid. If 47 * the session times out and goes stale, our leases and capabilities 48 * are no longer valid. 
49 */ 50 51 struct ceph_reconnect_state { 52 struct ceph_mds_session *session; 53 int nr_caps, nr_realms; 54 struct ceph_pagelist *pagelist; 55 unsigned msg_version; 56 bool allow_multi; 57 }; 58 59 static void __wake_requests(struct ceph_mds_client *mdsc, 60 struct list_head *head); 61 static void ceph_cap_release_work(struct work_struct *work); 62 static void ceph_cap_reclaim_work(struct work_struct *work); 63 64 static const struct ceph_connection_operations mds_con_ops; 65 66 67 /* 68 * mds reply parsing 69 */ 70 71 static int parse_reply_info_quota(void **p, void *end, 72 struct ceph_mds_reply_info_in *info) 73 { 74 u8 struct_v, struct_compat; 75 u32 struct_len; 76 77 ceph_decode_8_safe(p, end, struct_v, bad); 78 ceph_decode_8_safe(p, end, struct_compat, bad); 79 /* struct_v is expected to be >= 1. we only 80 * understand encoding with struct_compat == 1. */ 81 if (!struct_v || struct_compat != 1) 82 goto bad; 83 ceph_decode_32_safe(p, end, struct_len, bad); 84 ceph_decode_need(p, end, struct_len, bad); 85 end = *p + struct_len; 86 ceph_decode_64_safe(p, end, info->max_bytes, bad); 87 ceph_decode_64_safe(p, end, info->max_files, bad); 88 *p = end; 89 return 0; 90 bad: 91 return -EIO; 92 } 93 94 /* 95 * parse individual inode info 96 */ 97 static int parse_reply_info_in(void **p, void *end, 98 struct ceph_mds_reply_info_in *info, 99 u64 features) 100 { 101 int err = 0; 102 u8 struct_v = 0; 103 104 if (features == (u64)-1) { 105 u32 struct_len; 106 u8 struct_compat; 107 ceph_decode_8_safe(p, end, struct_v, bad); 108 ceph_decode_8_safe(p, end, struct_compat, bad); 109 /* struct_v is expected to be >= 1. we only understand 110 * encoding with struct_compat == 1. 
*/ 111 if (!struct_v || struct_compat != 1) 112 goto bad; 113 ceph_decode_32_safe(p, end, struct_len, bad); 114 ceph_decode_need(p, end, struct_len, bad); 115 end = *p + struct_len; 116 } 117 118 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad); 119 info->in = *p; 120 *p += sizeof(struct ceph_mds_reply_inode) + 121 sizeof(*info->in->fragtree.splits) * 122 le32_to_cpu(info->in->fragtree.nsplits); 123 124 ceph_decode_32_safe(p, end, info->symlink_len, bad); 125 ceph_decode_need(p, end, info->symlink_len, bad); 126 info->symlink = *p; 127 *p += info->symlink_len; 128 129 ceph_decode_copy_safe(p, end, &info->dir_layout, 130 sizeof(info->dir_layout), bad); 131 ceph_decode_32_safe(p, end, info->xattr_len, bad); 132 ceph_decode_need(p, end, info->xattr_len, bad); 133 info->xattr_data = *p; 134 *p += info->xattr_len; 135 136 if (features == (u64)-1) { 137 /* inline data */ 138 ceph_decode_64_safe(p, end, info->inline_version, bad); 139 ceph_decode_32_safe(p, end, info->inline_len, bad); 140 ceph_decode_need(p, end, info->inline_len, bad); 141 info->inline_data = *p; 142 *p += info->inline_len; 143 /* quota */ 144 err = parse_reply_info_quota(p, end, info); 145 if (err < 0) 146 goto out_bad; 147 /* pool namespace */ 148 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 149 if (info->pool_ns_len > 0) { 150 ceph_decode_need(p, end, info->pool_ns_len, bad); 151 info->pool_ns_data = *p; 152 *p += info->pool_ns_len; 153 } 154 155 /* btime */ 156 ceph_decode_need(p, end, sizeof(info->btime), bad); 157 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 158 159 /* change attribute */ 160 ceph_decode_64_safe(p, end, info->change_attr, bad); 161 162 /* dir pin */ 163 if (struct_v >= 2) { 164 ceph_decode_32_safe(p, end, info->dir_pin, bad); 165 } else { 166 info->dir_pin = -ENODATA; 167 } 168 169 /* snapshot birth time, remains zero for v<=2 */ 170 if (struct_v >= 3) { 171 ceph_decode_need(p, end, sizeof(info->snap_btime), bad); 172 ceph_decode_copy(p, 
&info->snap_btime, 173 sizeof(info->snap_btime)); 174 } else { 175 memset(&info->snap_btime, 0, sizeof(info->snap_btime)); 176 } 177 178 *p = end; 179 } else { 180 if (features & CEPH_FEATURE_MDS_INLINE_DATA) { 181 ceph_decode_64_safe(p, end, info->inline_version, bad); 182 ceph_decode_32_safe(p, end, info->inline_len, bad); 183 ceph_decode_need(p, end, info->inline_len, bad); 184 info->inline_data = *p; 185 *p += info->inline_len; 186 } else 187 info->inline_version = CEPH_INLINE_NONE; 188 189 if (features & CEPH_FEATURE_MDS_QUOTA) { 190 err = parse_reply_info_quota(p, end, info); 191 if (err < 0) 192 goto out_bad; 193 } else { 194 info->max_bytes = 0; 195 info->max_files = 0; 196 } 197 198 info->pool_ns_len = 0; 199 info->pool_ns_data = NULL; 200 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { 201 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 202 if (info->pool_ns_len > 0) { 203 ceph_decode_need(p, end, info->pool_ns_len, bad); 204 info->pool_ns_data = *p; 205 *p += info->pool_ns_len; 206 } 207 } 208 209 if (features & CEPH_FEATURE_FS_BTIME) { 210 ceph_decode_need(p, end, sizeof(info->btime), bad); 211 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 212 ceph_decode_64_safe(p, end, info->change_attr, bad); 213 } 214 215 info->dir_pin = -ENODATA; 216 /* info->snap_btime remains zero */ 217 } 218 return 0; 219 bad: 220 err = -EIO; 221 out_bad: 222 return err; 223 } 224 225 static int parse_reply_info_dir(void **p, void *end, 226 struct ceph_mds_reply_dirfrag **dirfrag, 227 u64 features) 228 { 229 if (features == (u64)-1) { 230 u8 struct_v, struct_compat; 231 u32 struct_len; 232 ceph_decode_8_safe(p, end, struct_v, bad); 233 ceph_decode_8_safe(p, end, struct_compat, bad); 234 /* struct_v is expected to be >= 1. we only understand 235 * encoding whose struct_compat == 1. 
*/ 236 if (!struct_v || struct_compat != 1) 237 goto bad; 238 ceph_decode_32_safe(p, end, struct_len, bad); 239 ceph_decode_need(p, end, struct_len, bad); 240 end = *p + struct_len; 241 } 242 243 ceph_decode_need(p, end, sizeof(**dirfrag), bad); 244 *dirfrag = *p; 245 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist); 246 if (unlikely(*p > end)) 247 goto bad; 248 if (features == (u64)-1) 249 *p = end; 250 return 0; 251 bad: 252 return -EIO; 253 } 254 255 static int parse_reply_info_lease(void **p, void *end, 256 struct ceph_mds_reply_lease **lease, 257 u64 features) 258 { 259 if (features == (u64)-1) { 260 u8 struct_v, struct_compat; 261 u32 struct_len; 262 ceph_decode_8_safe(p, end, struct_v, bad); 263 ceph_decode_8_safe(p, end, struct_compat, bad); 264 /* struct_v is expected to be >= 1. we only understand 265 * encoding whose struct_compat == 1. */ 266 if (!struct_v || struct_compat != 1) 267 goto bad; 268 ceph_decode_32_safe(p, end, struct_len, bad); 269 ceph_decode_need(p, end, struct_len, bad); 270 end = *p + struct_len; 271 } 272 273 ceph_decode_need(p, end, sizeof(**lease), bad); 274 *lease = *p; 275 *p += sizeof(**lease); 276 if (features == (u64)-1) 277 *p = end; 278 return 0; 279 bad: 280 return -EIO; 281 } 282 283 /* 284 * parse a normal reply, which may contain a (dir+)dentry and/or a 285 * target inode. 
286 */ 287 static int parse_reply_info_trace(void **p, void *end, 288 struct ceph_mds_reply_info_parsed *info, 289 u64 features) 290 { 291 int err; 292 293 if (info->head->is_dentry) { 294 err = parse_reply_info_in(p, end, &info->diri, features); 295 if (err < 0) 296 goto out_bad; 297 298 err = parse_reply_info_dir(p, end, &info->dirfrag, features); 299 if (err < 0) 300 goto out_bad; 301 302 ceph_decode_32_safe(p, end, info->dname_len, bad); 303 ceph_decode_need(p, end, info->dname_len, bad); 304 info->dname = *p; 305 *p += info->dname_len; 306 307 err = parse_reply_info_lease(p, end, &info->dlease, features); 308 if (err < 0) 309 goto out_bad; 310 } 311 312 if (info->head->is_target) { 313 err = parse_reply_info_in(p, end, &info->targeti, features); 314 if (err < 0) 315 goto out_bad; 316 } 317 318 if (unlikely(*p != end)) 319 goto bad; 320 return 0; 321 322 bad: 323 err = -EIO; 324 out_bad: 325 pr_err("problem parsing mds trace %d\n", err); 326 return err; 327 } 328 329 /* 330 * parse readdir results 331 */ 332 static int parse_reply_info_readdir(void **p, void *end, 333 struct ceph_mds_reply_info_parsed *info, 334 u64 features) 335 { 336 u32 num, i = 0; 337 int err; 338 339 err = parse_reply_info_dir(p, end, &info->dir_dir, features); 340 if (err < 0) 341 goto out_bad; 342 343 ceph_decode_need(p, end, sizeof(num) + 2, bad); 344 num = ceph_decode_32(p); 345 { 346 u16 flags = ceph_decode_16(p); 347 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); 348 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); 349 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); 350 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH); 351 } 352 if (num == 0) 353 goto done; 354 355 BUG_ON(!info->dir_entries); 356 if ((unsigned long)(info->dir_entries + num) > 357 (unsigned long)info->dir_entries + info->dir_buf_size) { 358 pr_err("dir contents are larger than expected\n"); 359 WARN_ON(1); 360 goto bad; 361 } 362 363 info->dir_nr = num; 364 while (num) { 365 struct 
ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 366 /* dentry */ 367 ceph_decode_32_safe(p, end, rde->name_len, bad); 368 ceph_decode_need(p, end, rde->name_len, bad); 369 rde->name = *p; 370 *p += rde->name_len; 371 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name); 372 373 /* dentry lease */ 374 err = parse_reply_info_lease(p, end, &rde->lease, features); 375 if (err) 376 goto out_bad; 377 /* inode */ 378 err = parse_reply_info_in(p, end, &rde->inode, features); 379 if (err < 0) 380 goto out_bad; 381 /* ceph_readdir_prepopulate() will update it */ 382 rde->offset = 0; 383 i++; 384 num--; 385 } 386 387 done: 388 /* Skip over any unrecognized fields */ 389 *p = end; 390 return 0; 391 392 bad: 393 err = -EIO; 394 out_bad: 395 pr_err("problem parsing dir contents %d\n", err); 396 return err; 397 } 398 399 /* 400 * parse fcntl F_GETLK results 401 */ 402 static int parse_reply_info_filelock(void **p, void *end, 403 struct ceph_mds_reply_info_parsed *info, 404 u64 features) 405 { 406 if (*p + sizeof(*info->filelock_reply) > end) 407 goto bad; 408 409 info->filelock_reply = *p; 410 411 /* Skip over any unrecognized fields */ 412 *p = end; 413 return 0; 414 bad: 415 return -EIO; 416 } 417 418 419 #if BITS_PER_LONG == 64 420 421 #define DELEGATED_INO_AVAILABLE xa_mk_value(1) 422 423 static int ceph_parse_deleg_inos(void **p, void *end, 424 struct ceph_mds_session *s) 425 { 426 u32 sets; 427 428 ceph_decode_32_safe(p, end, sets, bad); 429 dout("got %u sets of delegated inodes\n", sets); 430 while (sets--) { 431 u64 start, len, ino; 432 433 ceph_decode_64_safe(p, end, start, bad); 434 ceph_decode_64_safe(p, end, len, bad); 435 while (len--) { 436 int err = xa_insert(&s->s_delegated_inos, ino = start++, 437 DELEGATED_INO_AVAILABLE, 438 GFP_KERNEL); 439 if (!err) { 440 dout("added delegated inode 0x%llx\n", 441 start - 1); 442 } else if (err == -EBUSY) { 443 pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n", 444 start - 1); 445 } else { 446 return 
err; 447 } 448 } 449 } 450 return 0; 451 bad: 452 return -EIO; 453 } 454 455 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 456 { 457 unsigned long ino; 458 void *val; 459 460 xa_for_each(&s->s_delegated_inos, ino, val) { 461 val = xa_erase(&s->s_delegated_inos, ino); 462 if (val == DELEGATED_INO_AVAILABLE) 463 return ino; 464 } 465 return 0; 466 } 467 468 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 469 { 470 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE, 471 GFP_KERNEL); 472 } 473 #else /* BITS_PER_LONG == 64 */ 474 /* 475 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just 476 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top 477 * and bottom words? 478 */ 479 static int ceph_parse_deleg_inos(void **p, void *end, 480 struct ceph_mds_session *s) 481 { 482 u32 sets; 483 484 ceph_decode_32_safe(p, end, sets, bad); 485 if (sets) 486 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad); 487 return 0; 488 bad: 489 return -EIO; 490 } 491 492 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 493 { 494 return 0; 495 } 496 497 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 498 { 499 return 0; 500 } 501 #endif /* BITS_PER_LONG == 64 */ 502 503 /* 504 * parse create results 505 */ 506 static int parse_reply_info_create(void **p, void *end, 507 struct ceph_mds_reply_info_parsed *info, 508 u64 features, struct ceph_mds_session *s) 509 { 510 int ret; 511 512 if (features == (u64)-1 || 513 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) { 514 if (*p == end) { 515 /* Malformed reply? 
*/ 516 info->has_create_ino = false; 517 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) { 518 u8 struct_v, struct_compat; 519 u32 len; 520 521 info->has_create_ino = true; 522 ceph_decode_8_safe(p, end, struct_v, bad); 523 ceph_decode_8_safe(p, end, struct_compat, bad); 524 ceph_decode_32_safe(p, end, len, bad); 525 ceph_decode_64_safe(p, end, info->ino, bad); 526 ret = ceph_parse_deleg_inos(p, end, s); 527 if (ret) 528 return ret; 529 } else { 530 /* legacy */ 531 ceph_decode_64_safe(p, end, info->ino, bad); 532 info->has_create_ino = true; 533 } 534 } else { 535 if (*p != end) 536 goto bad; 537 } 538 539 /* Skip over any unrecognized fields */ 540 *p = end; 541 return 0; 542 bad: 543 return -EIO; 544 } 545 546 /* 547 * parse extra results 548 */ 549 static int parse_reply_info_extra(void **p, void *end, 550 struct ceph_mds_reply_info_parsed *info, 551 u64 features, struct ceph_mds_session *s) 552 { 553 u32 op = le32_to_cpu(info->head->op); 554 555 if (op == CEPH_MDS_OP_GETFILELOCK) 556 return parse_reply_info_filelock(p, end, info, features); 557 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) 558 return parse_reply_info_readdir(p, end, info, features); 559 else if (op == CEPH_MDS_OP_CREATE) 560 return parse_reply_info_create(p, end, info, features, s); 561 else 562 return -EIO; 563 } 564 565 /* 566 * parse entire mds reply 567 */ 568 static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, 569 struct ceph_mds_reply_info_parsed *info, 570 u64 features) 571 { 572 void *p, *end; 573 u32 len; 574 int err; 575 576 info->head = msg->front.iov_base; 577 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 578 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 579 580 /* trace */ 581 ceph_decode_32_safe(&p, end, len, bad); 582 if (len > 0) { 583 ceph_decode_need(&p, end, len, bad); 584 err = parse_reply_info_trace(&p, p+len, info, features); 585 if (err < 0) 586 goto out_bad; 587 } 588 589 /* 
extra */ 590 ceph_decode_32_safe(&p, end, len, bad); 591 if (len > 0) { 592 ceph_decode_need(&p, end, len, bad); 593 err = parse_reply_info_extra(&p, p+len, info, features, s); 594 if (err < 0) 595 goto out_bad; 596 } 597 598 /* snap blob */ 599 ceph_decode_32_safe(&p, end, len, bad); 600 info->snapblob_len = len; 601 info->snapblob = p; 602 p += len; 603 604 if (p != end) 605 goto bad; 606 return 0; 607 608 bad: 609 err = -EIO; 610 out_bad: 611 pr_err("mds parse_reply err %d\n", err); 612 return err; 613 } 614 615 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 616 { 617 if (!info->dir_entries) 618 return; 619 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size)); 620 } 621 622 623 /* 624 * sessions 625 */ 626 const char *ceph_session_state_name(int s) 627 { 628 switch (s) { 629 case CEPH_MDS_SESSION_NEW: return "new"; 630 case CEPH_MDS_SESSION_OPENING: return "opening"; 631 case CEPH_MDS_SESSION_OPEN: return "open"; 632 case CEPH_MDS_SESSION_HUNG: return "hung"; 633 case CEPH_MDS_SESSION_CLOSING: return "closing"; 634 case CEPH_MDS_SESSION_CLOSED: return "closed"; 635 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 636 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 637 case CEPH_MDS_SESSION_REJECTED: return "rejected"; 638 default: return "???"; 639 } 640 } 641 642 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s) 643 { 644 if (refcount_inc_not_zero(&s->s_ref)) { 645 dout("mdsc get_session %p %d -> %d\n", s, 646 refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref)); 647 return s; 648 } else { 649 dout("mdsc get_session %p 0 -- FAIL\n", s); 650 return NULL; 651 } 652 } 653 654 void ceph_put_mds_session(struct ceph_mds_session *s) 655 { 656 dout("mdsc put_session %p %d -> %d\n", s, 657 refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1); 658 if (refcount_dec_and_test(&s->s_ref)) { 659 if (s->s_auth.authorizer) 660 ceph_auth_destroy_authorizer(s->s_auth.authorizer); 661 
xa_destroy(&s->s_delegated_inos); 662 kfree(s); 663 } 664 } 665 666 /* 667 * called under mdsc->mutex 668 */ 669 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 670 int mds) 671 { 672 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 673 return NULL; 674 return ceph_get_mds_session(mdsc->sessions[mds]); 675 } 676 677 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 678 { 679 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 680 return false; 681 else 682 return true; 683 } 684 685 static int __verify_registered_session(struct ceph_mds_client *mdsc, 686 struct ceph_mds_session *s) 687 { 688 if (s->s_mds >= mdsc->max_sessions || 689 mdsc->sessions[s->s_mds] != s) 690 return -ENOENT; 691 return 0; 692 } 693 694 /* 695 * create+register a new session for given mds. 696 * called under mdsc->mutex. 697 */ 698 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 699 int mds) 700 { 701 struct ceph_mds_session *s; 702 703 if (mds >= mdsc->mdsmap->possible_max_rank) 704 return ERR_PTR(-EINVAL); 705 706 s = kzalloc(sizeof(*s), GFP_NOFS); 707 if (!s) 708 return ERR_PTR(-ENOMEM); 709 710 if (mds >= mdsc->max_sessions) { 711 int newmax = 1 << get_count_order(mds + 1); 712 struct ceph_mds_session **sa; 713 714 dout("%s: realloc to %d\n", __func__, newmax); 715 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); 716 if (!sa) 717 goto fail_realloc; 718 if (mdsc->sessions) { 719 memcpy(sa, mdsc->sessions, 720 mdsc->max_sessions * sizeof(void *)); 721 kfree(mdsc->sessions); 722 } 723 mdsc->sessions = sa; 724 mdsc->max_sessions = newmax; 725 } 726 727 dout("%s: mds%d\n", __func__, mds); 728 s->s_mdsc = mdsc; 729 s->s_mds = mds; 730 s->s_state = CEPH_MDS_SESSION_NEW; 731 s->s_ttl = 0; 732 s->s_seq = 0; 733 mutex_init(&s->s_mutex); 734 735 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); 736 737 spin_lock_init(&s->s_gen_ttl_lock); 738 s->s_cap_gen = 1; 739 s->s_cap_ttl = jiffies - 1; 
740 741 spin_lock_init(&s->s_cap_lock); 742 s->s_renew_requested = 0; 743 s->s_renew_seq = 0; 744 INIT_LIST_HEAD(&s->s_caps); 745 s->s_nr_caps = 0; 746 refcount_set(&s->s_ref, 1); 747 INIT_LIST_HEAD(&s->s_waiting); 748 INIT_LIST_HEAD(&s->s_unsafe); 749 xa_init(&s->s_delegated_inos); 750 s->s_num_cap_releases = 0; 751 s->s_cap_reconnect = 0; 752 s->s_cap_iterator = NULL; 753 INIT_LIST_HEAD(&s->s_cap_releases); 754 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work); 755 756 INIT_LIST_HEAD(&s->s_cap_flushing); 757 758 mdsc->sessions[mds] = s; 759 atomic_inc(&mdsc->num_sessions); 760 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 761 762 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 763 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 764 765 return s; 766 767 fail_realloc: 768 kfree(s); 769 return ERR_PTR(-ENOMEM); 770 } 771 772 /* 773 * called under mdsc->mutex 774 */ 775 static void __unregister_session(struct ceph_mds_client *mdsc, 776 struct ceph_mds_session *s) 777 { 778 dout("__unregister_session mds%d %p\n", s->s_mds, s); 779 BUG_ON(mdsc->sessions[s->s_mds] != s); 780 mdsc->sessions[s->s_mds] = NULL; 781 ceph_con_close(&s->s_con); 782 ceph_put_mds_session(s); 783 atomic_dec(&mdsc->num_sessions); 784 } 785 786 /* 787 * drop session refs in request. 
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
	if (req->r_session) {
		ceph_put_mds_session(req->r_session);
		/* clear the pointer so a second call is a no-op */
		req->r_session = NULL;
	}
}

/*
 * kref release callback for a ceph_mds_request: drops every reference
 * the request still holds (messages, inodes and their CEPH_CAP_PIN cap
 * refs, dentries, pagelist, session, cap reservation) and then returns
 * the request to ceph_mds_request_cachep.
 */
void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	ceph_mdsc_release_dir_caps(req);
	destroy_reply_info(&req->r_reply_info);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_inode) {
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		/* avoid calling iput_final() in mds dispatch threads */
		ceph_async_iput(req->r_inode);
	}
	if (req->r_parent) {
		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		ceph_async_iput(req->r_parent);
	}
	/* NOTE(review): called without a NULL check, unlike the inodes
	 * above; presumably ceph_async_iput() tolerates NULL - confirm */
	ceph_async_iput(req->r_target_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry)
		dput(req->r_old_dentry);
	if (req->r_old_dentry_dir) {
		/*
		 * track (and drop pins for) r_old_dentry_dir
		 * separately, since r_old_dentry's d_parent may have
		 * changed between the dir mutex being dropped and
		 * this request being freed.
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		ceph_async_iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	if (req->r_pagelist)
		ceph_pagelist_release(req->r_pagelist);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	/* a request being freed should no longer sit on a wait list */
	WARN_ON_ONCE(!list_empty(&req->r_wait));
	kmem_cache_free(ceph_mds_request_cachep, req);
}

/* rbtree helpers for requests keyed by r_tid; this file uses
 * lookup_request(), insert_request() and erase_request() */
DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
851 */ 852 static struct ceph_mds_request * 853 lookup_get_request(struct ceph_mds_client *mdsc, u64 tid) 854 { 855 struct ceph_mds_request *req; 856 857 req = lookup_request(&mdsc->request_tree, tid); 858 if (req) 859 ceph_mdsc_get_request(req); 860 861 return req; 862 } 863 864 /* 865 * Register an in-flight request, and assign a tid. Link to directory 866 * are modifying (if any). 867 * 868 * Called under mdsc->mutex. 869 */ 870 static void __register_request(struct ceph_mds_client *mdsc, 871 struct ceph_mds_request *req, 872 struct inode *dir) 873 { 874 int ret = 0; 875 876 req->r_tid = ++mdsc->last_tid; 877 if (req->r_num_caps) { 878 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, 879 req->r_num_caps); 880 if (ret < 0) { 881 pr_err("__register_request %p " 882 "failed to reserve caps: %d\n", req, ret); 883 /* set req->r_err to fail early from __do_request */ 884 req->r_err = ret; 885 return; 886 } 887 } 888 dout("__register_request %p tid %lld\n", req, req->r_tid); 889 ceph_mdsc_get_request(req); 890 insert_request(&mdsc->request_tree, req); 891 892 req->r_uid = current_fsuid(); 893 req->r_gid = current_fsgid(); 894 895 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) 896 mdsc->oldest_tid = req->r_tid; 897 898 if (dir) { 899 struct ceph_inode_info *ci = ceph_inode(dir); 900 901 ihold(dir); 902 req->r_unsafe_dir = dir; 903 spin_lock(&ci->i_unsafe_lock); 904 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 905 spin_unlock(&ci->i_unsafe_lock); 906 } 907 } 908 909 static void __unregister_request(struct ceph_mds_client *mdsc, 910 struct ceph_mds_request *req) 911 { 912 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 913 914 /* Never leave an unregistered request on an unsafe list! 
*/ 915 list_del_init(&req->r_unsafe_item); 916 917 if (req->r_tid == mdsc->oldest_tid) { 918 struct rb_node *p = rb_next(&req->r_node); 919 mdsc->oldest_tid = 0; 920 while (p) { 921 struct ceph_mds_request *next_req = 922 rb_entry(p, struct ceph_mds_request, r_node); 923 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 924 mdsc->oldest_tid = next_req->r_tid; 925 break; 926 } 927 p = rb_next(p); 928 } 929 } 930 931 erase_request(&mdsc->request_tree, req); 932 933 if (req->r_unsafe_dir) { 934 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 935 spin_lock(&ci->i_unsafe_lock); 936 list_del_init(&req->r_unsafe_dir_item); 937 spin_unlock(&ci->i_unsafe_lock); 938 } 939 if (req->r_target_inode && 940 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 941 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 942 spin_lock(&ci->i_unsafe_lock); 943 list_del_init(&req->r_unsafe_target_item); 944 spin_unlock(&ci->i_unsafe_lock); 945 } 946 947 if (req->r_unsafe_dir) { 948 /* avoid calling iput_final() in mds dispatch threads */ 949 ceph_async_iput(req->r_unsafe_dir); 950 req->r_unsafe_dir = NULL; 951 } 952 953 complete_all(&req->r_safe_completion); 954 955 ceph_mdsc_put_request(req); 956 } 957 958 /* 959 * Walk back up the dentry tree until we hit a dentry representing a 960 * non-snapshot inode. We do this using the rcu_read_lock (which must be held 961 * when calling this) to ensure that the objects won't disappear while we're 962 * working with them. Once we hit a candidate dentry, we attempt to take a 963 * reference to it, and return that as the result. 964 */ 965 static struct inode *get_nonsnap_parent(struct dentry *dentry) 966 { 967 struct inode *inode = NULL; 968 969 while (dentry && !IS_ROOT(dentry)) { 970 inode = d_inode_rcu(dentry); 971 if (!inode || ceph_snap(inode) == CEPH_NOSNAP) 972 break; 973 dentry = dentry->d_parent; 974 } 975 if (inode) 976 inode = igrab(inode); 977 return inode; 978 } 979 980 /* 981 * Choose mds to send request to next. 
 * If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * If @random is non-NULL, *random is set to true only when the result
 * came from ceph_mdsmap_get_random_mds(), and to false otherwise.
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req,
			bool *random)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	int mode = req->r_direct_mode;
	int mds = -1;
	u32 hash = req->r_direct_hash;
	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);

	if (random)
		*random = false;

	/*
	 * is there a specific mds we should try?  ignore hint if we have
	 * no session and the mds is not up (active or recovering).
	 */
	if (req->r_resend_mds >= 0 &&
	    (__have_session(mdsc, req->r_resend_mds) ||
	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
		dout("%s using resend_mds mds%d\n", __func__,
		     req->r_resend_mds);
		return req->r_resend_mds;
	}

	if (mode == USE_RANDOM_MDS)
		goto random;

	/*
	 * Pick the inode that steers the choice.  Every path below that
	 * leaves inode non-NULL has taken a reference on it, which is
	 * dropped again before returning.
	 */
	inode = NULL;
	if (req->r_inode) {
		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
			inode = req->r_inode;
			ihold(inode);
		} else {
			/* req->r_dentry is non-null for LSSNAP request */
			rcu_read_lock();
			inode = get_nonsnap_parent(req->r_dentry);
			rcu_read_unlock();
			dout("%s using snapdir's parent %p\n", __func__, inode);
		}
	} else if (req->r_dentry) {
		/* ignore race with rename; old or new d_parent is okay */
		struct dentry *parent;
		struct inode *dir;

		rcu_read_lock();
		parent = READ_ONCE(req->r_dentry->d_parent);
		dir = req->r_parent ? : d_inode_rcu(parent);

		if (!dir || dir->i_sb != mdsc->fsc->sb) {
			/*  not this fs or parent went negative */
			inode = d_inode(req->r_dentry);
			if (inode)
				ihold(inode);
		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
			/* direct snapped/virtual snapdir requests
			 * based on parent dir inode */
			inode = get_nonsnap_parent(parent);
			dout("%s using nonsnap parent %p\n", __func__, inode);
		} else {
			/* dentry target */
			inode = d_inode(req->r_dentry);
			if (!inode || mode == USE_AUTH_MDS) {
				/* dir + name */
				inode = igrab(dir);
				hash = ceph_dentry_hash(dir, req->r_dentry);
				is_hash = true;
			} else {
				ihold(inode);
			}
		}
		rcu_read_unlock();
	}

	dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
	     hash, mode);
	if (!inode)
		goto random;
	ci = ceph_inode(inode);

	if (is_hash && S_ISDIR(inode->i_mode)) {
		struct ceph_inode_frag frag;
		int found;

		ceph_choose_frag(ci, hash, &frag, &found);
		if (found) {
			if (mode == USE_ANY_MDS && frag.ndist > 0) {
				u8 r;

				/* choose a random replica */
				get_random_bytes(&r, 1);
				r %= frag.ndist;
				mds = frag.dist[r];
				dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
				     __func__, inode, ceph_vinop(inode),
				     frag.frag, mds, (int)r, frag.ndist);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE &&
				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
					goto out;
			}

			/* since this file/dir wasn't known to be
			 * replicated, then we want to look for the
			 * authoritative mds. */
			if (frag.mds >= 0) {
				/* choose auth mds */
				mds = frag.mds;
				dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
				     __func__, inode, ceph_vinop(inode),
				     frag.frag, mds);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE) {
					if (mode == USE_ANY_MDS &&
					    !ceph_mdsmap_is_laggy(mdsc->mdsmap,
								  mds))
						goto out;
				}
			}
			/* nothing usable from the frag: fall back to caps,
			 * preferring the auth cap */
			mode = USE_AUTH_MDS;
		}
	}

	spin_lock(&ci->i_ceph_lock);
	cap = NULL;
	if (mode == USE_AUTH_MDS)
		cap = ci->i_auth_cap;
	/* otherwise (or with no auth cap) take the first cap in the tree */
	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		ceph_async_iput(inode);
		goto random;
	}
	mds = cap->session->s_mds;
	dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
	     inode, ceph_vinop(inode), mds,
	     cap == ci->i_auth_cap ? "auth " : "", cap);
	spin_unlock(&ci->i_ceph_lock);
out:
	/* avoid calling iput_final() while holding mdsc->mutex or
	 * in mds dispatch threads */
	ceph_async_iput(inode);
	return mds;

random:
	if (random)
		*random = true;

	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
	dout("%s chose random mds%d\n", __func__, mds);
	return mds;
}


/*
 * session messages
 */

/*
 * Allocate a CEPH_MSG_CLIENT_SESSION message whose front is a bare
 * ceph_mds_session_head carrying @op and @seq.
 * Returns NULL if the allocation fails.
 */
static struct ceph_msg *create_session_msg(u32 op, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
			   false);
	if (!msg) {
		pr_err("create_session_msg ENOMEM creating msg\n");
		return NULL;
	}
	h = msg->front.iov_base;
	h->op = cpu_to_le32(op);
	h->seq = cpu_to_le64(seq);

	return msg;
}

/* feature bit numbers this client advertises to the MDS */
static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
/*
 * Size in bytes of a bitmap able to hold feature_bits[c - 1], rounded
 * up to a multiple of 8 bytes.  Callers pass the array length, so this
 * sizes the map by the LAST table entry.
 * NOTE(review): assumes the last entry is the largest bit number -
 * confirm against CEPHFS_FEATURES_CLIENT_SUPPORTED.
 * NOTE(review): the argument is used unparenthesised ("c - 1"); only
 * pass a simple expression.
 */
#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
static void encode_supported_features(void **p, void *end) 1169 { 1170 static const size_t count = ARRAY_SIZE(feature_bits); 1171 1172 if (count > 0) { 1173 size_t i; 1174 size_t size = FEATURE_BYTES(count); 1175 1176 BUG_ON(*p + 4 + size > end); 1177 ceph_encode_32(p, size); 1178 memset(*p, 0, size); 1179 for (i = 0; i < count; i++) 1180 ((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8); 1181 *p += size; 1182 } else { 1183 BUG_ON(*p + 4 > end); 1184 ceph_encode_32(p, 0); 1185 } 1186 } 1187 1188 /* 1189 * session message, specialization for CEPH_SESSION_REQUEST_OPEN 1190 * to include additional client metadata fields. 1191 */ 1192 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq) 1193 { 1194 struct ceph_msg *msg; 1195 struct ceph_mds_session_head *h; 1196 int i = -1; 1197 int extra_bytes = 0; 1198 int metadata_key_count = 0; 1199 struct ceph_options *opt = mdsc->fsc->client->options; 1200 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; 1201 size_t size, count; 1202 void *p, *end; 1203 1204 const char* metadata[][2] = { 1205 {"hostname", mdsc->nodename}, 1206 {"kernel_version", init_utsname()->release}, 1207 {"entity_id", opt->name ? : ""}, 1208 {"root", fsopt->server_path ? 
: "/"}, 1209 {NULL, NULL} 1210 }; 1211 1212 /* Calculate serialized length of metadata */ 1213 extra_bytes = 4; /* map length */ 1214 for (i = 0; metadata[i][0]; ++i) { 1215 extra_bytes += 8 + strlen(metadata[i][0]) + 1216 strlen(metadata[i][1]); 1217 metadata_key_count++; 1218 } 1219 1220 /* supported feature */ 1221 size = 0; 1222 count = ARRAY_SIZE(feature_bits); 1223 if (count > 0) 1224 size = FEATURE_BYTES(count); 1225 extra_bytes += 4 + size; 1226 1227 /* Allocate the message */ 1228 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, 1229 GFP_NOFS, false); 1230 if (!msg) { 1231 pr_err("create_session_msg ENOMEM creating msg\n"); 1232 return NULL; 1233 } 1234 p = msg->front.iov_base; 1235 end = p + msg->front.iov_len; 1236 1237 h = p; 1238 h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN); 1239 h->seq = cpu_to_le64(seq); 1240 1241 /* 1242 * Serialize client metadata into waiting buffer space, using 1243 * the format that userspace expects for map<string, string> 1244 * 1245 * ClientSession messages with metadata are v3 1246 */ 1247 msg->hdr.version = cpu_to_le16(3); 1248 msg->hdr.compat_version = cpu_to_le16(1); 1249 1250 /* The write pointer, following the session_head structure */ 1251 p += sizeof(*h); 1252 1253 /* Number of entries in the map */ 1254 ceph_encode_32(&p, metadata_key_count); 1255 1256 /* Two length-prefixed strings for each entry in the map */ 1257 for (i = 0; metadata[i][0]; ++i) { 1258 size_t const key_len = strlen(metadata[i][0]); 1259 size_t const val_len = strlen(metadata[i][1]); 1260 1261 ceph_encode_32(&p, key_len); 1262 memcpy(p, metadata[i][0], key_len); 1263 p += key_len; 1264 ceph_encode_32(&p, val_len); 1265 memcpy(p, metadata[i][1], val_len); 1266 p += val_len; 1267 } 1268 1269 encode_supported_features(&p, end); 1270 msg->front.iov_len = p - msg->front.iov_base; 1271 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1272 1273 return msg; 1274 } 1275 1276 /* 1277 * send session open request. 
1278 * 1279 * called under mdsc->mutex 1280 */ 1281 static int __open_session(struct ceph_mds_client *mdsc, 1282 struct ceph_mds_session *session) 1283 { 1284 struct ceph_msg *msg; 1285 int mstate; 1286 int mds = session->s_mds; 1287 1288 /* wait for mds to go active? */ 1289 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 1290 dout("open_session to mds%d (%s)\n", mds, 1291 ceph_mds_state_name(mstate)); 1292 session->s_state = CEPH_MDS_SESSION_OPENING; 1293 session->s_renew_requested = jiffies; 1294 1295 /* send connect message */ 1296 msg = create_session_open_msg(mdsc, session->s_seq); 1297 if (!msg) 1298 return -ENOMEM; 1299 ceph_con_send(&session->s_con, msg); 1300 return 0; 1301 } 1302 1303 /* 1304 * open sessions for any export targets for the given mds 1305 * 1306 * called under mdsc->mutex 1307 */ 1308 static struct ceph_mds_session * 1309 __open_export_target_session(struct ceph_mds_client *mdsc, int target) 1310 { 1311 struct ceph_mds_session *session; 1312 1313 session = __ceph_lookup_mds_session(mdsc, target); 1314 if (!session) { 1315 session = register_session(mdsc, target); 1316 if (IS_ERR(session)) 1317 return session; 1318 } 1319 if (session->s_state == CEPH_MDS_SESSION_NEW || 1320 session->s_state == CEPH_MDS_SESSION_CLOSING) 1321 __open_session(mdsc, session); 1322 1323 return session; 1324 } 1325 1326 struct ceph_mds_session * 1327 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) 1328 { 1329 struct ceph_mds_session *session; 1330 1331 dout("open_export_target_session to mds%d\n", target); 1332 1333 mutex_lock(&mdsc->mutex); 1334 session = __open_export_target_session(mdsc, target); 1335 mutex_unlock(&mdsc->mutex); 1336 1337 return session; 1338 } 1339 1340 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 1341 struct ceph_mds_session *session) 1342 { 1343 struct ceph_mds_info *mi; 1344 struct ceph_mds_session *ts; 1345 int i, mds = session->s_mds; 1346 1347 if (mds >= 
mdsc->mdsmap->possible_max_rank) 1348 return; 1349 1350 mi = &mdsc->mdsmap->m_info[mds]; 1351 dout("open_export_target_sessions for mds%d (%d targets)\n", 1352 session->s_mds, mi->num_export_targets); 1353 1354 for (i = 0; i < mi->num_export_targets; i++) { 1355 ts = __open_export_target_session(mdsc, mi->export_targets[i]); 1356 if (!IS_ERR(ts)) 1357 ceph_put_mds_session(ts); 1358 } 1359 } 1360 1361 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 1362 struct ceph_mds_session *session) 1363 { 1364 mutex_lock(&mdsc->mutex); 1365 __open_export_target_sessions(mdsc, session); 1366 mutex_unlock(&mdsc->mutex); 1367 } 1368 1369 /* 1370 * session caps 1371 */ 1372 1373 static void detach_cap_releases(struct ceph_mds_session *session, 1374 struct list_head *target) 1375 { 1376 lockdep_assert_held(&session->s_cap_lock); 1377 1378 list_splice_init(&session->s_cap_releases, target); 1379 session->s_num_cap_releases = 0; 1380 dout("dispose_cap_releases mds%d\n", session->s_mds); 1381 } 1382 1383 static void dispose_cap_releases(struct ceph_mds_client *mdsc, 1384 struct list_head *dispose) 1385 { 1386 while (!list_empty(dispose)) { 1387 struct ceph_cap *cap; 1388 /* zero out the in-progress message */ 1389 cap = list_first_entry(dispose, struct ceph_cap, session_caps); 1390 list_del(&cap->session_caps); 1391 ceph_put_cap(mdsc, cap); 1392 } 1393 } 1394 1395 static void cleanup_session_requests(struct ceph_mds_client *mdsc, 1396 struct ceph_mds_session *session) 1397 { 1398 struct ceph_mds_request *req; 1399 struct rb_node *p; 1400 struct ceph_inode_info *ci; 1401 1402 dout("cleanup_session_requests mds%d\n", session->s_mds); 1403 mutex_lock(&mdsc->mutex); 1404 while (!list_empty(&session->s_unsafe)) { 1405 req = list_first_entry(&session->s_unsafe, 1406 struct ceph_mds_request, r_unsafe_item); 1407 pr_warn_ratelimited(" dropping unsafe request %llu\n", 1408 req->r_tid); 1409 if (req->r_target_inode) { 1410 /* dropping unsafe change of inode's attributes 
*/ 1411 ci = ceph_inode(req->r_target_inode); 1412 errseq_set(&ci->i_meta_err, -EIO); 1413 } 1414 if (req->r_unsafe_dir) { 1415 /* dropping unsafe directory operation */ 1416 ci = ceph_inode(req->r_unsafe_dir); 1417 errseq_set(&ci->i_meta_err, -EIO); 1418 } 1419 __unregister_request(mdsc, req); 1420 } 1421 /* zero r_attempts, so kick_requests() will re-send requests */ 1422 p = rb_first(&mdsc->request_tree); 1423 while (p) { 1424 req = rb_entry(p, struct ceph_mds_request, r_node); 1425 p = rb_next(p); 1426 if (req->r_session && 1427 req->r_session->s_mds == session->s_mds) 1428 req->r_attempts = 0; 1429 } 1430 mutex_unlock(&mdsc->mutex); 1431 } 1432 1433 /* 1434 * Helper to safely iterate over all caps associated with a session, with 1435 * special care taken to handle a racing __ceph_remove_cap(). 1436 * 1437 * Caller must hold session s_mutex. 1438 */ 1439 int ceph_iterate_session_caps(struct ceph_mds_session *session, 1440 int (*cb)(struct inode *, struct ceph_cap *, 1441 void *), void *arg) 1442 { 1443 struct list_head *p; 1444 struct ceph_cap *cap; 1445 struct inode *inode, *last_inode = NULL; 1446 struct ceph_cap *old_cap = NULL; 1447 int ret; 1448 1449 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 1450 spin_lock(&session->s_cap_lock); 1451 p = session->s_caps.next; 1452 while (p != &session->s_caps) { 1453 cap = list_entry(p, struct ceph_cap, session_caps); 1454 inode = igrab(&cap->ci->vfs_inode); 1455 if (!inode) { 1456 p = p->next; 1457 continue; 1458 } 1459 session->s_cap_iterator = cap; 1460 spin_unlock(&session->s_cap_lock); 1461 1462 if (last_inode) { 1463 /* avoid calling iput_final() while holding 1464 * s_mutex or in mds dispatch threads */ 1465 ceph_async_iput(last_inode); 1466 last_inode = NULL; 1467 } 1468 if (old_cap) { 1469 ceph_put_cap(session->s_mdsc, old_cap); 1470 old_cap = NULL; 1471 } 1472 1473 ret = cb(inode, cap, arg); 1474 last_inode = inode; 1475 1476 spin_lock(&session->s_cap_lock); 1477 p = p->next; 1478 if 
(!cap->ci) { 1479 dout("iterate_session_caps finishing cap %p removal\n", 1480 cap); 1481 BUG_ON(cap->session != session); 1482 cap->session = NULL; 1483 list_del_init(&cap->session_caps); 1484 session->s_nr_caps--; 1485 if (cap->queue_release) 1486 __ceph_queue_cap_release(session, cap); 1487 else 1488 old_cap = cap; /* put_cap it w/o locks held */ 1489 } 1490 if (ret < 0) 1491 goto out; 1492 } 1493 ret = 0; 1494 out: 1495 session->s_cap_iterator = NULL; 1496 spin_unlock(&session->s_cap_lock); 1497 1498 ceph_async_iput(last_inode); 1499 if (old_cap) 1500 ceph_put_cap(session->s_mdsc, old_cap); 1501 1502 return ret; 1503 } 1504 1505 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 1506 void *arg) 1507 { 1508 struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg; 1509 struct ceph_inode_info *ci = ceph_inode(inode); 1510 LIST_HEAD(to_remove); 1511 bool dirty_dropped = false; 1512 bool invalidate = false; 1513 1514 dout("removing cap %p, ci is %p, inode is %p\n", 1515 cap, ci, &ci->vfs_inode); 1516 spin_lock(&ci->i_ceph_lock); 1517 __ceph_remove_cap(cap, false); 1518 if (!ci->i_auth_cap) { 1519 struct ceph_cap_flush *cf; 1520 struct ceph_mds_client *mdsc = fsc->mdsc; 1521 1522 if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 1523 if (inode->i_data.nrpages > 0) 1524 invalidate = true; 1525 if (ci->i_wrbuffer_ref > 0) 1526 mapping_set_error(&inode->i_data, -EIO); 1527 } 1528 1529 while (!list_empty(&ci->i_cap_flush_list)) { 1530 cf = list_first_entry(&ci->i_cap_flush_list, 1531 struct ceph_cap_flush, i_list); 1532 list_move(&cf->i_list, &to_remove); 1533 } 1534 1535 spin_lock(&mdsc->cap_dirty_lock); 1536 1537 list_for_each_entry(cf, &to_remove, i_list) 1538 list_del(&cf->g_list); 1539 1540 if (!list_empty(&ci->i_dirty_item)) { 1541 pr_warn_ratelimited( 1542 " dropping dirty %s state for %p %lld\n", 1543 ceph_cap_string(ci->i_dirty_caps), 1544 inode, ceph_ino(inode)); 1545 ci->i_dirty_caps = 0; 1546 
list_del_init(&ci->i_dirty_item); 1547 dirty_dropped = true; 1548 } 1549 if (!list_empty(&ci->i_flushing_item)) { 1550 pr_warn_ratelimited( 1551 " dropping dirty+flushing %s state for %p %lld\n", 1552 ceph_cap_string(ci->i_flushing_caps), 1553 inode, ceph_ino(inode)); 1554 ci->i_flushing_caps = 0; 1555 list_del_init(&ci->i_flushing_item); 1556 mdsc->num_cap_flushing--; 1557 dirty_dropped = true; 1558 } 1559 spin_unlock(&mdsc->cap_dirty_lock); 1560 1561 if (dirty_dropped) { 1562 errseq_set(&ci->i_meta_err, -EIO); 1563 1564 if (ci->i_wrbuffer_ref_head == 0 && 1565 ci->i_wr_ref == 0 && 1566 ci->i_dirty_caps == 0 && 1567 ci->i_flushing_caps == 0) { 1568 ceph_put_snap_context(ci->i_head_snapc); 1569 ci->i_head_snapc = NULL; 1570 } 1571 } 1572 1573 if (atomic_read(&ci->i_filelock_ref) > 0) { 1574 /* make further file lock syscall return -EIO */ 1575 ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK; 1576 pr_warn_ratelimited(" dropping file locks for %p %lld\n", 1577 inode, ceph_ino(inode)); 1578 } 1579 1580 if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { 1581 list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove); 1582 ci->i_prealloc_cap_flush = NULL; 1583 } 1584 } 1585 spin_unlock(&ci->i_ceph_lock); 1586 while (!list_empty(&to_remove)) { 1587 struct ceph_cap_flush *cf; 1588 cf = list_first_entry(&to_remove, 1589 struct ceph_cap_flush, i_list); 1590 list_del(&cf->i_list); 1591 ceph_free_cap_flush(cf); 1592 } 1593 1594 wake_up_all(&ci->i_cap_wq); 1595 if (invalidate) 1596 ceph_queue_invalidate(inode); 1597 if (dirty_dropped) 1598 iput(inode); 1599 return 0; 1600 } 1601 1602 /* 1603 * caller must hold session s_mutex 1604 */ 1605 static void remove_session_caps(struct ceph_mds_session *session) 1606 { 1607 struct ceph_fs_client *fsc = session->s_mdsc->fsc; 1608 struct super_block *sb = fsc->sb; 1609 LIST_HEAD(dispose); 1610 1611 dout("remove_session_caps on %p\n", session); 1612 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc); 1613 1614 
wake_up_all(&fsc->mdsc->cap_flushing_wq); 1615 1616 spin_lock(&session->s_cap_lock); 1617 if (session->s_nr_caps > 0) { 1618 struct inode *inode; 1619 struct ceph_cap *cap, *prev = NULL; 1620 struct ceph_vino vino; 1621 /* 1622 * iterate_session_caps() skips inodes that are being 1623 * deleted, we need to wait until deletions are complete. 1624 * __wait_on_freeing_inode() is designed for the job, 1625 * but it is not exported, so use lookup inode function 1626 * to access it. 1627 */ 1628 while (!list_empty(&session->s_caps)) { 1629 cap = list_entry(session->s_caps.next, 1630 struct ceph_cap, session_caps); 1631 if (cap == prev) 1632 break; 1633 prev = cap; 1634 vino = cap->ci->i_vino; 1635 spin_unlock(&session->s_cap_lock); 1636 1637 inode = ceph_find_inode(sb, vino); 1638 /* avoid calling iput_final() while holding s_mutex */ 1639 ceph_async_iput(inode); 1640 1641 spin_lock(&session->s_cap_lock); 1642 } 1643 } 1644 1645 // drop cap expires and unlock s_cap_lock 1646 detach_cap_releases(session, &dispose); 1647 1648 BUG_ON(session->s_nr_caps > 0); 1649 BUG_ON(!list_empty(&session->s_cap_flushing)); 1650 spin_unlock(&session->s_cap_lock); 1651 dispose_cap_releases(session->s_mdsc, &dispose); 1652 } 1653 1654 enum { 1655 RECONNECT, 1656 RENEWCAPS, 1657 FORCE_RO, 1658 }; 1659 1660 /* 1661 * wake up any threads waiting on this session's caps. if the cap is 1662 * old (didn't get renewed on the client reconnect), remove it now. 1663 * 1664 * caller must hold s_mutex. 
1665 */ 1666 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, 1667 void *arg) 1668 { 1669 struct ceph_inode_info *ci = ceph_inode(inode); 1670 unsigned long ev = (unsigned long)arg; 1671 1672 if (ev == RECONNECT) { 1673 spin_lock(&ci->i_ceph_lock); 1674 ci->i_wanted_max_size = 0; 1675 ci->i_requested_max_size = 0; 1676 spin_unlock(&ci->i_ceph_lock); 1677 } else if (ev == RENEWCAPS) { 1678 if (cap->cap_gen < cap->session->s_cap_gen) { 1679 /* mds did not re-issue stale cap */ 1680 spin_lock(&ci->i_ceph_lock); 1681 cap->issued = cap->implemented = CEPH_CAP_PIN; 1682 spin_unlock(&ci->i_ceph_lock); 1683 } 1684 } else if (ev == FORCE_RO) { 1685 } 1686 wake_up_all(&ci->i_cap_wq); 1687 return 0; 1688 } 1689 1690 static void wake_up_session_caps(struct ceph_mds_session *session, int ev) 1691 { 1692 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 1693 ceph_iterate_session_caps(session, wake_up_session_cb, 1694 (void *)(unsigned long)ev); 1695 } 1696 1697 /* 1698 * Send periodic message to MDS renewing all currently held caps. The 1699 * ack will reset the expiration for all caps from this session. 1700 * 1701 * caller holds s_mutex 1702 */ 1703 static int send_renew_caps(struct ceph_mds_client *mdsc, 1704 struct ceph_mds_session *session) 1705 { 1706 struct ceph_msg *msg; 1707 int state; 1708 1709 if (time_after_eq(jiffies, session->s_cap_ttl) && 1710 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 1711 pr_info("mds%d caps stale\n", session->s_mds); 1712 session->s_renew_requested = jiffies; 1713 1714 /* do not try to renew caps until a recovering mds has reconnected 1715 * with its clients. 
*/ 1716 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 1717 if (state < CEPH_MDS_STATE_RECONNECT) { 1718 dout("send_renew_caps ignoring mds%d (%s)\n", 1719 session->s_mds, ceph_mds_state_name(state)); 1720 return 0; 1721 } 1722 1723 dout("send_renew_caps to mds%d (%s)\n", session->s_mds, 1724 ceph_mds_state_name(state)); 1725 msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 1726 ++session->s_renew_seq); 1727 if (!msg) 1728 return -ENOMEM; 1729 ceph_con_send(&session->s_con, msg); 1730 return 0; 1731 } 1732 1733 static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 1734 struct ceph_mds_session *session, u64 seq) 1735 { 1736 struct ceph_msg *msg; 1737 1738 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", 1739 session->s_mds, ceph_session_state_name(session->s_state), seq); 1740 msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 1741 if (!msg) 1742 return -ENOMEM; 1743 ceph_con_send(&session->s_con, msg); 1744 return 0; 1745 } 1746 1747 1748 /* 1749 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 1750 * 1751 * Called under session->s_mutex 1752 */ 1753 static void renewed_caps(struct ceph_mds_client *mdsc, 1754 struct ceph_mds_session *session, int is_renew) 1755 { 1756 int was_stale; 1757 int wake = 0; 1758 1759 spin_lock(&session->s_cap_lock); 1760 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 1761 1762 session->s_cap_ttl = session->s_renew_requested + 1763 mdsc->mdsmap->m_session_timeout*HZ; 1764 1765 if (was_stale) { 1766 if (time_before(jiffies, session->s_cap_ttl)) { 1767 pr_info("mds%d caps renewed\n", session->s_mds); 1768 wake = 1; 1769 } else { 1770 pr_info("mds%d caps still stale\n", session->s_mds); 1771 } 1772 } 1773 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n", 1774 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh", 1775 time_before(jiffies, session->s_cap_ttl) ? 
"stale" : "fresh"); 1776 spin_unlock(&session->s_cap_lock); 1777 1778 if (wake) 1779 wake_up_session_caps(session, RENEWCAPS); 1780 } 1781 1782 /* 1783 * send a session close request 1784 */ 1785 static int request_close_session(struct ceph_mds_client *mdsc, 1786 struct ceph_mds_session *session) 1787 { 1788 struct ceph_msg *msg; 1789 1790 dout("request_close_session mds%d state %s seq %lld\n", 1791 session->s_mds, ceph_session_state_name(session->s_state), 1792 session->s_seq); 1793 msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); 1794 if (!msg) 1795 return -ENOMEM; 1796 ceph_con_send(&session->s_con, msg); 1797 return 1; 1798 } 1799 1800 /* 1801 * Called with s_mutex held. 1802 */ 1803 static int __close_session(struct ceph_mds_client *mdsc, 1804 struct ceph_mds_session *session) 1805 { 1806 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 1807 return 0; 1808 session->s_state = CEPH_MDS_SESSION_CLOSING; 1809 return request_close_session(mdsc, session); 1810 } 1811 1812 static bool drop_negative_children(struct dentry *dentry) 1813 { 1814 struct dentry *child; 1815 bool all_negative = true; 1816 1817 if (!d_is_dir(dentry)) 1818 goto out; 1819 1820 spin_lock(&dentry->d_lock); 1821 list_for_each_entry(child, &dentry->d_subdirs, d_child) { 1822 if (d_really_is_positive(child)) { 1823 all_negative = false; 1824 break; 1825 } 1826 } 1827 spin_unlock(&dentry->d_lock); 1828 1829 if (all_negative) 1830 shrink_dcache_parent(dentry); 1831 out: 1832 return all_negative; 1833 } 1834 1835 /* 1836 * Trim old(er) caps. 1837 * 1838 * Because we can't cache an inode without one or more caps, we do 1839 * this indirectly: if a cap is unused, we prune its aliases, at which 1840 * point the inode will hopefully get dropped to. 1841 * 1842 * Yes, this is a bit sloppy. Our only real goal here is to respond to 1843 * memory pressure from the MDS, though, so it needn't be perfect. 
1844 */ 1845 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) 1846 { 1847 int *remaining = arg; 1848 struct ceph_inode_info *ci = ceph_inode(inode); 1849 int used, wanted, oissued, mine; 1850 1851 if (*remaining <= 0) 1852 return -1; 1853 1854 spin_lock(&ci->i_ceph_lock); 1855 mine = cap->issued | cap->implemented; 1856 used = __ceph_caps_used(ci); 1857 wanted = __ceph_caps_file_wanted(ci); 1858 oissued = __ceph_caps_issued_other(ci, cap); 1859 1860 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n", 1861 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), 1862 ceph_cap_string(used), ceph_cap_string(wanted)); 1863 if (cap == ci->i_auth_cap) { 1864 if (ci->i_dirty_caps || ci->i_flushing_caps || 1865 !list_empty(&ci->i_cap_snaps)) 1866 goto out; 1867 if ((used | wanted) & CEPH_CAP_ANY_WR) 1868 goto out; 1869 /* Note: it's possible that i_filelock_ref becomes non-zero 1870 * after dropping auth caps. It doesn't hurt because reply 1871 * of lock mds request will re-add auth caps. */ 1872 if (atomic_read(&ci->i_filelock_ref) > 0) 1873 goto out; 1874 } 1875 /* The inode has cached pages, but it's no longer used. 1876 * we can safely drop it */ 1877 if (S_ISREG(inode->i_mode) && 1878 wanted == 0 && used == CEPH_CAP_FILE_CACHE && 1879 !(oissued & CEPH_CAP_FILE_CACHE)) { 1880 used = 0; 1881 oissued = 0; 1882 } 1883 if ((used | wanted) & ~oissued & mine) 1884 goto out; /* we need these caps */ 1885 1886 if (oissued) { 1887 /* we aren't the only cap.. 
just remove us */ 1888 __ceph_remove_cap(cap, true); 1889 (*remaining)--; 1890 } else { 1891 struct dentry *dentry; 1892 /* try dropping referring dentries */ 1893 spin_unlock(&ci->i_ceph_lock); 1894 dentry = d_find_any_alias(inode); 1895 if (dentry && drop_negative_children(dentry)) { 1896 int count; 1897 dput(dentry); 1898 d_prune_aliases(inode); 1899 count = atomic_read(&inode->i_count); 1900 if (count == 1) 1901 (*remaining)--; 1902 dout("trim_caps_cb %p cap %p pruned, count now %d\n", 1903 inode, cap, count); 1904 } else { 1905 dput(dentry); 1906 } 1907 return 0; 1908 } 1909 1910 out: 1911 spin_unlock(&ci->i_ceph_lock); 1912 return 0; 1913 } 1914 1915 /* 1916 * Trim session cap count down to some max number. 1917 */ 1918 int ceph_trim_caps(struct ceph_mds_client *mdsc, 1919 struct ceph_mds_session *session, 1920 int max_caps) 1921 { 1922 int trim_caps = session->s_nr_caps - max_caps; 1923 1924 dout("trim_caps mds%d start: %d / %d, trim %d\n", 1925 session->s_mds, session->s_nr_caps, max_caps, trim_caps); 1926 if (trim_caps > 0) { 1927 int remaining = trim_caps; 1928 1929 ceph_iterate_session_caps(session, trim_caps_cb, &remaining); 1930 dout("trim_caps mds%d done: %d / %d, trimmed %d\n", 1931 session->s_mds, session->s_nr_caps, max_caps, 1932 trim_caps - remaining); 1933 } 1934 1935 ceph_flush_cap_releases(mdsc, session); 1936 return 0; 1937 } 1938 1939 static int check_caps_flush(struct ceph_mds_client *mdsc, 1940 u64 want_flush_tid) 1941 { 1942 int ret = 1; 1943 1944 spin_lock(&mdsc->cap_dirty_lock); 1945 if (!list_empty(&mdsc->cap_flush_list)) { 1946 struct ceph_cap_flush *cf = 1947 list_first_entry(&mdsc->cap_flush_list, 1948 struct ceph_cap_flush, g_list); 1949 if (cf->tid <= want_flush_tid) { 1950 dout("check_caps_flush still flushing tid " 1951 "%llu <= %llu\n", cf->tid, want_flush_tid); 1952 ret = 0; 1953 } 1954 } 1955 spin_unlock(&mdsc->cap_dirty_lock); 1956 return ret; 1957 } 1958 1959 /* 1960 * flush all dirty inode data to disk. 
1961 * 1962 * returns true if we've flushed through want_flush_tid 1963 */ 1964 static void wait_caps_flush(struct ceph_mds_client *mdsc, 1965 u64 want_flush_tid) 1966 { 1967 dout("check_caps_flush want %llu\n", want_flush_tid); 1968 1969 wait_event(mdsc->cap_flushing_wq, 1970 check_caps_flush(mdsc, want_flush_tid)); 1971 1972 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); 1973 } 1974 1975 /* 1976 * called under s_mutex 1977 */ 1978 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 1979 struct ceph_mds_session *session) 1980 { 1981 struct ceph_msg *msg = NULL; 1982 struct ceph_mds_cap_release *head; 1983 struct ceph_mds_cap_item *item; 1984 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 1985 struct ceph_cap *cap; 1986 LIST_HEAD(tmp_list); 1987 int num_cap_releases; 1988 __le32 barrier, *cap_barrier; 1989 1990 down_read(&osdc->lock); 1991 barrier = cpu_to_le32(osdc->epoch_barrier); 1992 up_read(&osdc->lock); 1993 1994 spin_lock(&session->s_cap_lock); 1995 again: 1996 list_splice_init(&session->s_cap_releases, &tmp_list); 1997 num_cap_releases = session->s_num_cap_releases; 1998 session->s_num_cap_releases = 0; 1999 spin_unlock(&session->s_cap_lock); 2000 2001 while (!list_empty(&tmp_list)) { 2002 if (!msg) { 2003 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 2004 PAGE_SIZE, GFP_NOFS, false); 2005 if (!msg) 2006 goto out_err; 2007 head = msg->front.iov_base; 2008 head->num = cpu_to_le32(0); 2009 msg->front.iov_len = sizeof(*head); 2010 2011 msg->hdr.version = cpu_to_le16(2); 2012 msg->hdr.compat_version = cpu_to_le16(1); 2013 } 2014 2015 cap = list_first_entry(&tmp_list, struct ceph_cap, 2016 session_caps); 2017 list_del(&cap->session_caps); 2018 num_cap_releases--; 2019 2020 head = msg->front.iov_base; 2021 put_unaligned_le32(get_unaligned_le32(&head->num) + 1, 2022 &head->num); 2023 item = msg->front.iov_base + msg->front.iov_len; 2024 item->ino = cpu_to_le64(cap->cap_ino); 2025 item->cap_id = cpu_to_le64(cap->cap_id); 
2026 item->migrate_seq = cpu_to_le32(cap->mseq); 2027 item->seq = cpu_to_le32(cap->issue_seq); 2028 msg->front.iov_len += sizeof(*item); 2029 2030 ceph_put_cap(mdsc, cap); 2031 2032 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 2033 // Append cap_barrier field 2034 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2035 *cap_barrier = barrier; 2036 msg->front.iov_len += sizeof(*cap_barrier); 2037 2038 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2039 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2040 ceph_con_send(&session->s_con, msg); 2041 msg = NULL; 2042 } 2043 } 2044 2045 BUG_ON(num_cap_releases != 0); 2046 2047 spin_lock(&session->s_cap_lock); 2048 if (!list_empty(&session->s_cap_releases)) 2049 goto again; 2050 spin_unlock(&session->s_cap_lock); 2051 2052 if (msg) { 2053 // Append cap_barrier field 2054 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2055 *cap_barrier = barrier; 2056 msg->front.iov_len += sizeof(*cap_barrier); 2057 2058 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2059 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2060 ceph_con_send(&session->s_con, msg); 2061 } 2062 return; 2063 out_err: 2064 pr_err("send_cap_releases mds%d, failed to allocate message\n", 2065 session->s_mds); 2066 spin_lock(&session->s_cap_lock); 2067 list_splice(&tmp_list, &session->s_cap_releases); 2068 session->s_num_cap_releases += num_cap_releases; 2069 spin_unlock(&session->s_cap_lock); 2070 } 2071 2072 static void ceph_cap_release_work(struct work_struct *work) 2073 { 2074 struct ceph_mds_session *session = 2075 container_of(work, struct ceph_mds_session, s_cap_release_work); 2076 2077 mutex_lock(&session->s_mutex); 2078 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2079 session->s_state == CEPH_MDS_SESSION_HUNG) 2080 ceph_send_cap_releases(session->s_mdsc, session); 2081 mutex_unlock(&session->s_mutex); 2082 ceph_put_mds_session(session); 2083 } 2084 2085 void ceph_flush_cap_releases(struct 
ceph_mds_client *mdsc, 2086 struct ceph_mds_session *session) 2087 { 2088 if (mdsc->stopping) 2089 return; 2090 2091 ceph_get_mds_session(session); 2092 if (queue_work(mdsc->fsc->cap_wq, 2093 &session->s_cap_release_work)) { 2094 dout("cap release work queued\n"); 2095 } else { 2096 ceph_put_mds_session(session); 2097 dout("failed to queue cap release work\n"); 2098 } 2099 } 2100 2101 /* 2102 * caller holds session->s_cap_lock 2103 */ 2104 void __ceph_queue_cap_release(struct ceph_mds_session *session, 2105 struct ceph_cap *cap) 2106 { 2107 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2108 session->s_num_cap_releases++; 2109 2110 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2111 ceph_flush_cap_releases(session->s_mdsc, session); 2112 } 2113 2114 static void ceph_cap_reclaim_work(struct work_struct *work) 2115 { 2116 struct ceph_mds_client *mdsc = 2117 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2118 int ret = ceph_trim_dentries(mdsc); 2119 if (ret == -EAGAIN) 2120 ceph_queue_cap_reclaim_work(mdsc); 2121 } 2122 2123 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2124 { 2125 if (mdsc->stopping) 2126 return; 2127 2128 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2129 dout("caps reclaim work queued\n"); 2130 } else { 2131 dout("failed to queue caps release work\n"); 2132 } 2133 } 2134 2135 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) 2136 { 2137 int val; 2138 if (!nr) 2139 return; 2140 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2141 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2142 atomic_set(&mdsc->cap_reclaim_pending, 0); 2143 ceph_queue_cap_reclaim_work(mdsc); 2144 } 2145 } 2146 2147 /* 2148 * requests 2149 */ 2150 2151 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2152 struct inode *dir) 2153 { 2154 struct ceph_inode_info *ci = ceph_inode(dir); 2155 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2156 struct 
ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2157 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2158 unsigned int num_entries; 2159 int order; 2160 2161 spin_lock(&ci->i_ceph_lock); 2162 num_entries = ci->i_files + ci->i_subdirs; 2163 spin_unlock(&ci->i_ceph_lock); 2164 num_entries = max(num_entries, 1U); 2165 num_entries = min(num_entries, opt->max_readdir); 2166 2167 order = get_order(size * num_entries); 2168 while (order >= 0) { 2169 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2170 __GFP_NOWARN, 2171 order); 2172 if (rinfo->dir_entries) 2173 break; 2174 order--; 2175 } 2176 if (!rinfo->dir_entries) 2177 return -ENOMEM; 2178 2179 num_entries = (PAGE_SIZE << order) / size; 2180 num_entries = min(num_entries, opt->max_readdir); 2181 2182 rinfo->dir_buf_size = PAGE_SIZE << order; 2183 req->r_num_caps = num_entries + 1; 2184 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2185 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2186 return 0; 2187 } 2188 2189 /* 2190 * Create an mds request. 
2191 */ 2192 struct ceph_mds_request * 2193 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2194 { 2195 struct ceph_mds_request *req; 2196 2197 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2198 if (!req) 2199 return ERR_PTR(-ENOMEM); 2200 2201 mutex_init(&req->r_fill_mutex); 2202 req->r_mdsc = mdsc; 2203 req->r_started = jiffies; 2204 req->r_resend_mds = -1; 2205 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2206 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2207 req->r_fmode = -1; 2208 kref_init(&req->r_kref); 2209 RB_CLEAR_NODE(&req->r_node); 2210 INIT_LIST_HEAD(&req->r_wait); 2211 init_completion(&req->r_completion); 2212 init_completion(&req->r_safe_completion); 2213 INIT_LIST_HEAD(&req->r_unsafe_item); 2214 2215 ktime_get_coarse_real_ts64(&req->r_stamp); 2216 2217 req->r_op = op; 2218 req->r_direct_mode = mode; 2219 return req; 2220 } 2221 2222 /* 2223 * return oldest (lowest) request, tid in request tree, 0 if none. 2224 * 2225 * called under mdsc->mutex. 2226 */ 2227 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2228 { 2229 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2230 return NULL; 2231 return rb_entry(rb_first(&mdsc->request_tree), 2232 struct ceph_mds_request, r_node); 2233 } 2234 2235 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2236 { 2237 return mdsc->oldest_tid; 2238 } 2239 2240 /* 2241 * Build a dentry's path. Allocate on heap; caller must kfree. Based 2242 * on build_path_from_dentry in fs/cifs/dir.c. 2243 * 2244 * If @stop_on_nosnap, generate path relative to the first non-snapped 2245 * inode. 2246 * 2247 * Encode hidden .snap dirs as a double /, i.e. 
2248 * foo/.snap/bar -> foo//bar 2249 */ 2250 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase, 2251 int stop_on_nosnap) 2252 { 2253 struct dentry *temp; 2254 char *path; 2255 int pos; 2256 unsigned seq; 2257 u64 base; 2258 2259 if (!dentry) 2260 return ERR_PTR(-EINVAL); 2261 2262 path = __getname(); 2263 if (!path) 2264 return ERR_PTR(-ENOMEM); 2265 retry: 2266 pos = PATH_MAX - 1; 2267 path[pos] = '\0'; 2268 2269 seq = read_seqbegin(&rename_lock); 2270 rcu_read_lock(); 2271 temp = dentry; 2272 for (;;) { 2273 struct inode *inode; 2274 2275 spin_lock(&temp->d_lock); 2276 inode = d_inode(temp); 2277 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2278 dout("build_path path+%d: %p SNAPDIR\n", 2279 pos, temp); 2280 } else if (stop_on_nosnap && inode && dentry != temp && 2281 ceph_snap(inode) == CEPH_NOSNAP) { 2282 spin_unlock(&temp->d_lock); 2283 pos++; /* get rid of any prepended '/' */ 2284 break; 2285 } else { 2286 pos -= temp->d_name.len; 2287 if (pos < 0) { 2288 spin_unlock(&temp->d_lock); 2289 break; 2290 } 2291 memcpy(path + pos, temp->d_name.name, temp->d_name.len); 2292 } 2293 spin_unlock(&temp->d_lock); 2294 temp = READ_ONCE(temp->d_parent); 2295 2296 /* Are we at the root? */ 2297 if (IS_ROOT(temp)) 2298 break; 2299 2300 /* Are we out of buffer? */ 2301 if (--pos < 0) 2302 break; 2303 2304 path[pos] = '/'; 2305 } 2306 base = ceph_ino(d_inode(temp)); 2307 rcu_read_unlock(); 2308 2309 if (read_seqretry(&rename_lock, seq)) 2310 goto retry; 2311 2312 if (pos < 0) { 2313 /* 2314 * A rename didn't occur, but somehow we didn't end up where 2315 * we thought we would. Throw a warning and try again. 
2316 */ 2317 pr_warn("build_path did not end path lookup where " 2318 "expected, pos is %d\n", pos); 2319 goto retry; 2320 } 2321 2322 *pbase = base; 2323 *plen = PATH_MAX - 1 - pos; 2324 dout("build_path on %p %d built %llx '%.*s'\n", 2325 dentry, d_count(dentry), base, *plen, path + pos); 2326 return path + pos; 2327 } 2328 2329 static int build_dentry_path(struct dentry *dentry, struct inode *dir, 2330 const char **ppath, int *ppathlen, u64 *pino, 2331 bool *pfreepath, bool parent_locked) 2332 { 2333 char *path; 2334 2335 rcu_read_lock(); 2336 if (!dir) 2337 dir = d_inode_rcu(dentry->d_parent); 2338 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) { 2339 *pino = ceph_ino(dir); 2340 rcu_read_unlock(); 2341 *ppath = dentry->d_name.name; 2342 *ppathlen = dentry->d_name.len; 2343 return 0; 2344 } 2345 rcu_read_unlock(); 2346 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2347 if (IS_ERR(path)) 2348 return PTR_ERR(path); 2349 *ppath = path; 2350 *pfreepath = true; 2351 return 0; 2352 } 2353 2354 static int build_inode_path(struct inode *inode, 2355 const char **ppath, int *ppathlen, u64 *pino, 2356 bool *pfreepath) 2357 { 2358 struct dentry *dentry; 2359 char *path; 2360 2361 if (ceph_snap(inode) == CEPH_NOSNAP) { 2362 *pino = ceph_ino(inode); 2363 *ppathlen = 0; 2364 return 0; 2365 } 2366 dentry = d_find_alias(inode); 2367 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2368 dput(dentry); 2369 if (IS_ERR(path)) 2370 return PTR_ERR(path); 2371 *ppath = path; 2372 *pfreepath = true; 2373 return 0; 2374 } 2375 2376 /* 2377 * request arguments may be specified via an inode *, a dentry *, or 2378 * an explicit ino+path. 
 *
 * Exactly one source is used, tried in this order: @rinode, then
 * @rdentry (with @rdiri as its directory), then the explicit
 * @rpath/@rino pair.  The result is stored through @ppath, @pathlen and
 * @ino; @freepath is set by the build helpers when the path was
 * allocated.  If none of the sources is given the outputs are left
 * untouched.
 *
 * Returns 0, or the negative errno from the path build helper.
 */
static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
				  struct inode *rdiri, const char *rpath,
				  u64 rino, const char **ppath, int *pathlen,
				  u64 *ino, bool *freepath, bool parent_locked)
{
	int r = 0;

	if (rinode) {
		r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
		dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
		     ceph_snap(rinode));
	} else if (rdentry) {
		r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
					freepath, parent_locked);
		dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
		     *ppath);
	} else if (rpath || rino) {
		*ino = rino;
		*ppath = rpath;
		*pathlen = rpath ? strlen(rpath) : 0;
		dout(" path %.*s\n", *pathlen, rpath);
	}

	return r;
}

/*
 * Allocate and encode the CEPH_MSG_CLIENT_REQUEST message for @req.
 *
 * The front section is laid out as: request head, filepath 1, filepath 2,
 * cap/dentry releases, time stamp.  The offset at which the releases
 * start is saved in req->r_request_release_offset.  When
 * @drop_cap_releases is set the encoded releases are discarded and the
 * time stamp is written at that offset instead.
 *
 * Returns the new message, or ERR_PTR() on path-building or allocation
 * failure.
 *
 * called under mdsc->mutex
 */
static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
					       struct ceph_mds_request *req,
					       int mds, bool drop_cap_releases)
{
	struct ceph_msg *msg;
	struct ceph_mds_request_head *head;
	const char *path1 = NULL;
	const char *path2 = NULL;
	u64 ino1 = 0, ino2 = 0;
	int pathlen1 = 0, pathlen2 = 0;
	bool freepath1 = false, freepath2 = false;
	int len;
	u16 releases;
	void *p, *end;
	int ret;

	/* first path: the request's inode, dentry, or explicit path1/ino1 */
	ret = set_request_path_attr(req->r_inode, req->r_dentry,
			      req->r_parent, req->r_path1, req->r_ino1.ino,
			      &path1, &pathlen1, &ino1, &freepath1,
			      test_bit(CEPH_MDS_R_PARENT_LOCKED,
					&req->r_req_flags));
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out;
	}

	/* If r_old_dentry is set, then assume that its parent is locked */
	ret = set_request_path_attr(NULL, req->r_old_dentry,
			      req->r_old_dentry_dir,
			      req->r_path2, req->r_ino2.ino,
			      &path2, &pathlen2, &ino2, &freepath2, true);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out_free1;
	}

	/* head + two filepaths (1 + u32 + u64 of overhead each) + stamp */
	len = sizeof(*head) +
		pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
		sizeof(struct ceph_timespec);

	/* calculate (max) length for cap releases */
	len += sizeof(struct ceph_mds_request_release) *
		(!!req->r_inode_drop + !!req->r_dentry_drop +
		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
	if (req->r_dentry_drop)
		len += pathlen1;
	if (req->r_old_dentry_drop)
		len += pathlen2;

	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
	if (!msg) {
		msg = ERR_PTR(-ENOMEM);
		goto out_free2;
	}

	msg->hdr.version = cpu_to_le16(2);
	msg->hdr.tid = cpu_to_le64(req->r_tid);

	/* p walks the front buffer just past the head; end bounds it */
	head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(*head);
	end = msg->front.iov_base + msg->front.iov_len;

	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
	head->op = cpu_to_le32(req->r_op);
	head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
	head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
	head->ino = cpu_to_le64(req->r_deleg_ino);
	head->args = req->r_args;

	ceph_encode_filepath(&p, end, ino1, path1);
	ceph_encode_filepath(&p, end, ino2, path2);

	/* make note of release offset, in case we need to replay */
	req->r_request_release_offset = p - msg->front.iov_base;

	/* cap releases */
	releases = 0;
	if (req->r_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
		      mds, req->r_inode_drop, req->r_inode_unless,
		      req->r_op == CEPH_MDS_OP_READDIR);
	if (req->r_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_dentry,
				req->r_parent, mds, req->r_dentry_drop,
				req->r_dentry_unless);
	if (req->r_old_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
				req->r_old_dentry_dir, mds,
				req->r_old_dentry_drop,
				req->r_old_dentry_unless);
	if (req->r_old_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      d_inode(req->r_old_dentry),
		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);

	/* caller wants no releases sent: rewind over what was just encoded */
	if (drop_cap_releases) {
		releases = 0;
		p = msg->front.iov_base + req->r_request_release_offset;
	}

	head->num_releases = cpu_to_le16(releases);

	/* time stamp */
	{
		struct ceph_timespec ts;
		ceph_encode_timespec64(&ts, &req->r_stamp);
		ceph_encode_copy(&p, &ts, sizeof(ts));
	}

	/* len above was a maximum; shrink the front to what was written */
	BUG_ON(p > end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	if (req->r_pagelist) {
		struct ceph_pagelist *pagelist = req->r_pagelist;
		ceph_msg_data_add_pagelist(msg, pagelist);
		msg->hdr.data_len = cpu_to_le32(pagelist->length);
	} else {
		msg->hdr.data_len = 0;
	}

	msg->hdr.data_off = cpu_to_le16(0);

	/* the success path falls through: built paths are freed either way */
out_free2:
	if (freepath2)
		ceph_mdsc_free_path((char *)path2, pathlen2);
out_free1:
	if (freepath1)
		ceph_mdsc_free_path((char *)path1, pathlen1);
out:
	return msg;
}

/*
 * called under mdsc->mutex if error, under no mutex if
 * success.
 *
 * Invoke the request's r_callback, if one is set, then wake everything
 * waiting on r_completion.
 */
static void complete_request(struct ceph_mds_client *mdsc,
			     struct ceph_mds_request *req)
{
	if (req->r_callback)
		req->r_callback(mdsc, req);
	complete_all(&req->r_completion);
}

/*
 * Get req->r_request ready to be sent to @mds.
 *
 * If an unsafe reply was already received (CEPH_MDS_R_GOT_UNSAFE) the
 * existing message is patched in place for replay.  Otherwise any old
 * message is dropped and a new one is built with
 * create_request_message().
 *
 * Returns 0 on success, or the negative errno from
 * create_request_message(), which is also stored in req->r_err.
 *
 * called under mdsc->mutex
 */
static int __prepare_send_request(struct ceph_mds_client *mdsc,
				  struct ceph_mds_request *req,
				  int mds, bool drop_cap_releases)
{
	struct ceph_mds_request_head *rhead;
	struct ceph_msg *msg;
	int flags = 0;

	req->r_attempts++;
	if (req->r_inode) {
		struct ceph_cap *cap =
			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);

		/* remember the cap's mseq at send time; -1 if there is no cap */
		if (cap)
			req->r_sent_on_mseq = cap->mseq;
		else
			req->r_sent_on_mseq = -1;
	}
	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);

	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		void *p;
		/*
		 * Replay.  Do not regenerate message (and rebuild
		 * paths, etc.); just use the original message.
		 * Rebuilding paths will break for renames because
		 * d_move mangles the src name.
		 */
		msg = req->r_request;
		rhead = msg->front.iov_base;

		flags = le32_to_cpu(rhead->flags);
		flags |= CEPH_MDS_FLAG_REPLAY;
		rhead->flags = cpu_to_le32(flags);

		if (req->r_target_inode)
			rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));

		rhead->num_retry = req->r_attempts - 1;

		/* remove cap/dentry releases from message */
		rhead->num_releases = 0;

		/* time stamp */
		/* written where the releases began, so it overwrites them */
		p = msg->front.iov_base + req->r_request_release_offset;
		{
			struct ceph_timespec ts;
			ceph_encode_timespec64(&ts, &req->r_stamp);
			ceph_encode_copy(&p, &ts, sizeof(ts));
		}

		msg->front.iov_len = p - msg->front.iov_base;
		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		return 0;
	}

	/* not a replay: drop any previous message and build a fresh one */
	if (req->r_request) {
		ceph_msg_put(req->r_request);
		req->r_request = NULL;
	}
	msg = create_request_message(mdsc, req, mds, drop_cap_releases);
	if (IS_ERR(msg)) {
		req->r_err = PTR_ERR(msg);
		return PTR_ERR(msg);
	}
	req->r_request = msg;

	rhead = msg->front.iov_base;
	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
		flags |= CEPH_MDS_FLAG_REPLAY;
	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
		flags |= CEPH_MDS_FLAG_ASYNC;
	if (req->r_parent)
		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
	rhead->flags = cpu_to_le32(flags);
	rhead->num_fwd = req->r_num_fwd;
	rhead->num_retry = req->r_attempts - 1;

	dout(" r_parent = %p\n", req->r_parent);
	return 0;
}

/*
 * Prepare the request message and, if that succeeded, send it on the
 * session's connection.  Returns the result of __prepare_send_request().
 *
 * called under mdsc->mutex
 */
static int __send_request(struct ceph_mds_client *mdsc,
			  struct ceph_mds_session *session,
			  struct ceph_mds_request *req,
			  bool drop_cap_releases)
{
	int err;

	err = __prepare_send_request(mdsc, req, session->s_mds,
				     drop_cap_releases);
	if (!err) {
		/* extra ref taken before handing the message to the messenger */
		ceph_msg_get(req->r_request);
		ceph_con_send(&session->s_con, req->r_request);
	}

	return err;
}

/*
 * send request, or put it on the appropriate wait list.
 *
 * On an early error the error is stored in req->r_err, the request is
 * completed and then unregistered.
 */
static void __do_request(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req)
{
	struct ceph_mds_session *session = NULL;
	int mds = -1;
	int err = 0;
	bool random;

	/* already failed or answered: nothing to send */
	if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
			__unregister_request(mdsc, req);
		return;
	}

	if (req->r_timeout &&
	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
		dout("do_request timed out\n");
		err = -ETIMEDOUT;
		goto finish;
	}
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
		dout("do_request forced umount\n");
		err = -EIO;
		goto finish;
	}
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
		if (mdsc->mdsmap_err) {
			err = mdsc->mdsmap_err;
			dout("do_request mdsmap err %d\n", err);
			goto finish;
		}
		if (mdsc->mdsmap->m_epoch == 0) {
			dout("do_request no mdsmap, waiting for map\n");
			list_add(&req->r_wait, &mdsc->waiting_for_map);
			return;
		}
		if (!(mdsc->fsc->mount_options->flags &
		      CEPH_MOUNT_OPT_MOUNTWAIT) &&
		    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
			err = -EHOSTUNREACH;
			goto finish;
		}
	}

	/* drop any session left over from a previous attempt */
	put_request_session(req);

	mds = __choose_mds(mdsc, req, &random);
	if (mds < 0 ||
	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
			err = -EJUKEBOX;
			goto finish;
		}
		dout("do_request no mds or not active, waiting for map\n");
		list_add(&req->r_wait, &mdsc->waiting_for_map);
		return;
	}

	/* get, open session */
	session = __ceph_lookup_mds_session(mdsc, mds);
	if (!session) {
		session = register_session(mdsc, mds);
		if (IS_ERR(session)) {
			err = PTR_ERR(session);
			goto finish;
		}
	}
	/* the request holds its own session reference in r_session */
	req->r_session = ceph_get_mds_session(session);

	dout("do_request mds%d session %p state %s\n", mds, session,
	     ceph_session_state_name(session->s_state));
	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
	    session->s_state != CEPH_MDS_SESSION_HUNG) {
		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
			err = -EACCES;
			goto out_session;
		}
		/*
		 * We cannot queue async requests since the caps and delegated
		 * inodes are bound to the session. Just return -EJUKEBOX and
		 * let the caller retry a sync request in that case.
		 */
		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
			err = -EJUKEBOX;
			goto out_session;
		}
		if (session->s_state == CEPH_MDS_SESSION_NEW ||
		    session->s_state == CEPH_MDS_SESSION_CLOSING) {
			__open_session(mdsc, session);
			/* retry the same mds later */
			if (random)
				req->r_resend_mds = mds;
		}
		/* park the request until the session changes state */
		list_add(&req->r_wait, &session->s_waiting);
		goto out_session;
	}

	/* send request */
	req->r_resend_mds = -1;   /* forget any previous mds hint */

	if (req->r_request_started == 0)   /* note request start time */
		req->r_request_started = jiffies;

	err = __send_request(mdsc, session, req, false);

out_session:
	/* drop the local lookup/register reference */
	ceph_put_mds_session(session);
finish:
	if (err) {
		dout("__do_request early error %d\n", err);
		req->r_err = err;
		complete_request(mdsc, req);
		__unregister_request(mdsc, req);
	}
	return;
}

/*
 * Re-run __do_request() for every request queued on @head.  The list is
 * first moved to a private list, so requests that __do_request() queues
 * back onto @head are not visited again in this pass.
 *
 * called under mdsc->mutex
 */
static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head)
{
	struct ceph_mds_request *req;
	LIST_HEAD(tmp_list);

	list_splice_init(head, &tmp_list);

	while (!list_empty(&tmp_list)) {
		req = list_entry(tmp_list.next,
				 struct ceph_mds_request, r_wait);
		list_del_init(&req->r_wait);
		dout(" wake request %p tid %llu\n", req, req->r_tid);
		__do_request(mdsc, req);
	}
}

/*
 * Wake up threads with requests pending for @mds, so that they can
 * resubmit their requests to a possibly different mds.
 *
 * Requests that already got an unsafe reply, or that have been attempted
 * at least once, are left alone.
 */
static void kick_requests(struct ceph_mds_client *mdsc, int mds)
{
	struct ceph_mds_request *req;
	struct rb_node *p = rb_first(&mdsc->request_tree);

	dout("kick_requests mds%d\n", mds);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		/* advance before __do_request(), which may unregister req */
		p = rb_next(p);
		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
			continue;
		if (req->r_attempts > 0)
			continue; /* only new requests */
		if (req->r_session &&
		    req->r_session->s_mds == mds) {
			dout(" kicking tid %llu\n", req->r_tid);
			list_del_init(&req->r_wait);
			__do_request(mdsc, req);
		}
	}
}

/*
 * Register @req and start it without waiting for the reply.
 *
 * Takes CAP_PIN references on the inodes the request uses, waits for any
 * pending async create on r_inode and r_old_inode, then registers the
 * request and calls __do_request() under mdsc->mutex.
 *
 * Returns req->r_err as left by __do_request(), or the error from
 * ceph_wait_on_async_create().
 */
int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
			      struct ceph_mds_request *req)
{
	int err = 0;

	/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
	if (req->r_inode)
		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
	if (req->r_parent) {
		struct ceph_inode_info *ci = ceph_inode(req->r_parent);
		int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
			    CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
		spin_lock(&ci->i_ceph_lock);
		ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
		__ceph_touch_fmode(ci, mdsc, fmode);
		spin_unlock(&ci->i_ceph_lock);
		ihold(req->r_parent);
	}
	if (req->r_old_dentry_dir)
		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);

	/*
	 * NOTE(review): the early returns below leave the references taken
	 * above in place; presumably they are dropped when the request is
	 * released -- confirm against the request release path.
	 */
	if (req->r_inode) {
		err = ceph_wait_on_async_create(req->r_inode);
		if (err) {
			dout("%s: wait for async create returned: %d\n",
			     __func__, err);
			return err;
		}
	}

	if (!err && req->r_old_inode) {
		err = ceph_wait_on_async_create(req->r_old_inode);
		if (err) {
			dout("%s: wait for async create returned: %d\n",
			     __func__, err);
			return err;
		}
	}

	dout("submit_request on %p for inode %p\n", req, dir);
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, dir);
	__do_request(mdsc, req);
	err = req->r_err;
	mutex_unlock(&mdsc->mutex);
	return err;
}

/*
 * Wait for a submitted request to complete and work out its result.
 *
 * Returns the MDS result code if a reply was received, the wait error
 * (-ETIMEDOUT, or the negative value returned when the wait was killed)
 * if the request had to be aborted, and req->r_err otherwise.
 */
static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
				  struct ceph_mds_request *req)
{
	int err;

	/* wait */
	dout("do_request waiting\n");
	if (!req->r_timeout && req->r_wait_for_completion) {
		/* a custom wait hook is only used when there is no timeout */
		err = req->r_wait_for_completion(mdsc, req);
	} else {
		long timeleft = wait_for_completion_killable_timeout(
					&req->r_completion,
					ceph_timeout_jiffies(req->r_timeout));
		if (timeleft > 0)
			err = 0;
		else if (!timeleft)
			err = -ETIMEDOUT;  /* timed out */
		else
			err = timeleft;  /* killed */
	}
	dout("do_request waited, got %d\n", err);
	mutex_lock(&mdsc->mutex);

	/* only abort if we didn't race with a real reply */
	if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		err = le32_to_cpu(req->r_reply_info.head->result);
	} else if (err < 0) {
		dout("aborted request %lld with %d\n", req->r_tid, err);

		/*
		 * ensure we aren't running concurrently with
		 * ceph_fill_trace or ceph_readdir_prepopulate, which
		 * rely on locks (dir mutex) held by our caller.
		 */
		mutex_lock(&req->r_fill_mutex);
		req->r_err = err;
		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
		mutex_unlock(&req->r_fill_mutex);

		/* an aborted write op may still have changed the directory */
		if (req->r_parent &&
		    (req->r_op & CEPH_MDS_OP_WRITE))
			ceph_invalidate_dir_request(req);
	} else {
		err = req->r_err;
	}

	mutex_unlock(&mdsc->mutex);
	return err;
}

/*
 * Synchronously perform an mds request.  Take care of all of the
 * session setup, forwarding, retry details.
 *
 * Returns the submit error if submission failed, otherwise the result of
 * ceph_mdsc_wait_request().
 */
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
			 struct inode *dir,
			 struct ceph_mds_request *req)
{
	int err;

	dout("do_request on %p\n", req);

	/* issue */
	err = ceph_mdsc_submit_request(mdsc, dir, req);
	if (!err)
		err = ceph_mdsc_wait_request(mdsc, req);
	dout("do_request %p done, result %d\n", req, err);
	return err;
}

/*
 * Invalidate dir's completeness, dentry lease state on an aborted MDS
 * namespace request.
 *
 * req->r_parent is used unconditionally, so the caller must only pass
 * requests that have a parent directory set.
 */
void ceph_invalidate_dir_request(struct ceph_mds_request *req)
{
	struct inode *dir = req->r_parent;
	struct inode *old_dir = req->r_old_dentry_dir;

	dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);

	ceph_dir_clear_complete(dir);
	if (old_dir)
		ceph_dir_clear_complete(old_dir);
	if (req->r_dentry)
		ceph_invalidate_dentry_lease(req->r_dentry);
	if (req->r_old_dentry)
		ceph_invalidate_dentry_lease(req->r_old_dentry);
}

/*
 * Handle mds reply.
 *
 * We take the session mutex and parse and process the reply immediately.
 * This preserves the logical ordering of replies, capabilities, etc., sent
 * by the MDS as they are applied to our local cache.
2973 */ 2974 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 2975 { 2976 struct ceph_mds_client *mdsc = session->s_mdsc; 2977 struct ceph_mds_request *req; 2978 struct ceph_mds_reply_head *head = msg->front.iov_base; 2979 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 2980 struct ceph_snap_realm *realm; 2981 u64 tid; 2982 int err, result; 2983 int mds = session->s_mds; 2984 2985 if (msg->front.iov_len < sizeof(*head)) { 2986 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 2987 ceph_msg_dump(msg); 2988 return; 2989 } 2990 2991 /* get request, session */ 2992 tid = le64_to_cpu(msg->hdr.tid); 2993 mutex_lock(&mdsc->mutex); 2994 req = lookup_get_request(mdsc, tid); 2995 if (!req) { 2996 dout("handle_reply on unknown tid %llu\n", tid); 2997 mutex_unlock(&mdsc->mutex); 2998 return; 2999 } 3000 dout("handle_reply %p\n", req); 3001 3002 /* correct session? */ 3003 if (req->r_session != session) { 3004 pr_err("mdsc_handle_reply got %llu on session mds%d" 3005 " not mds%d\n", tid, session->s_mds, 3006 req->r_session ? req->r_session->s_mds : -1); 3007 mutex_unlock(&mdsc->mutex); 3008 goto out; 3009 } 3010 3011 /* dup? */ 3012 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 3013 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 3014 pr_warn("got a dup %s reply on %llu from mds%d\n", 3015 head->safe ? 
"safe" : "unsafe", tid, mds); 3016 mutex_unlock(&mdsc->mutex); 3017 goto out; 3018 } 3019 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 3020 pr_warn("got unsafe after safe on %llu from mds%d\n", 3021 tid, mds); 3022 mutex_unlock(&mdsc->mutex); 3023 goto out; 3024 } 3025 3026 result = le32_to_cpu(head->result); 3027 3028 /* 3029 * Handle an ESTALE 3030 * if we're not talking to the authority, send to them 3031 * if the authority has changed while we weren't looking, 3032 * send to new authority 3033 * Otherwise we just have to return an ESTALE 3034 */ 3035 if (result == -ESTALE) { 3036 dout("got ESTALE on request %llu\n", req->r_tid); 3037 req->r_resend_mds = -1; 3038 if (req->r_direct_mode != USE_AUTH_MDS) { 3039 dout("not using auth, setting for that now\n"); 3040 req->r_direct_mode = USE_AUTH_MDS; 3041 __do_request(mdsc, req); 3042 mutex_unlock(&mdsc->mutex); 3043 goto out; 3044 } else { 3045 int mds = __choose_mds(mdsc, req, NULL); 3046 if (mds >= 0 && mds != req->r_session->s_mds) { 3047 dout("but auth changed, so resending\n"); 3048 __do_request(mdsc, req); 3049 mutex_unlock(&mdsc->mutex); 3050 goto out; 3051 } 3052 } 3053 dout("have to return ESTALE on request %llu\n", req->r_tid); 3054 } 3055 3056 3057 if (head->safe) { 3058 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 3059 __unregister_request(mdsc, req); 3060 3061 /* last request during umount? */ 3062 if (mdsc->stopping && !__get_oldest_req(mdsc)) 3063 complete_all(&mdsc->safe_umount_waiters); 3064 3065 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3066 /* 3067 * We already handled the unsafe response, now do the 3068 * cleanup. No need to examine the response; the MDS 3069 * doesn't include any result info in the safe 3070 * response. And even if it did, there is nothing 3071 * useful we could do with a revised return value. 
3072 */ 3073 dout("got safe reply %llu, mds%d\n", tid, mds); 3074 3075 mutex_unlock(&mdsc->mutex); 3076 goto out; 3077 } 3078 } else { 3079 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 3080 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 3081 } 3082 3083 dout("handle_reply tid %lld result %d\n", tid, result); 3084 rinfo = &req->r_reply_info; 3085 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 3086 err = parse_reply_info(session, msg, rinfo, (u64)-1); 3087 else 3088 err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features); 3089 mutex_unlock(&mdsc->mutex); 3090 3091 mutex_lock(&session->s_mutex); 3092 if (err < 0) { 3093 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 3094 ceph_msg_dump(msg); 3095 goto out_err; 3096 } 3097 3098 /* snap trace */ 3099 realm = NULL; 3100 if (rinfo->snapblob_len) { 3101 down_write(&mdsc->snap_rwsem); 3102 ceph_update_snap_trace(mdsc, rinfo->snapblob, 3103 rinfo->snapblob + rinfo->snapblob_len, 3104 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 3105 &realm); 3106 downgrade_write(&mdsc->snap_rwsem); 3107 } else { 3108 down_read(&mdsc->snap_rwsem); 3109 } 3110 3111 /* insert trace into our cache */ 3112 mutex_lock(&req->r_fill_mutex); 3113 current->journal_info = req; 3114 err = ceph_fill_trace(mdsc->fsc->sb, req); 3115 if (err == 0) { 3116 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 3117 req->r_op == CEPH_MDS_OP_LSSNAP)) 3118 ceph_readdir_prepopulate(req, req->r_session); 3119 } 3120 current->journal_info = NULL; 3121 mutex_unlock(&req->r_fill_mutex); 3122 3123 up_read(&mdsc->snap_rwsem); 3124 if (realm) 3125 ceph_put_snap_realm(mdsc, realm); 3126 3127 if (err == 0) { 3128 if (req->r_target_inode && 3129 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3130 struct ceph_inode_info *ci = 3131 ceph_inode(req->r_target_inode); 3132 spin_lock(&ci->i_unsafe_lock); 3133 list_add_tail(&req->r_unsafe_target_item, 3134 &ci->i_unsafe_iops); 3135 
spin_unlock(&ci->i_unsafe_lock); 3136 } 3137 3138 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 3139 } 3140 out_err: 3141 mutex_lock(&mdsc->mutex); 3142 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3143 if (err) { 3144 req->r_err = err; 3145 } else { 3146 req->r_reply = ceph_msg_get(msg); 3147 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); 3148 } 3149 } else { 3150 dout("reply arrived after request %lld was aborted\n", tid); 3151 } 3152 mutex_unlock(&mdsc->mutex); 3153 3154 mutex_unlock(&session->s_mutex); 3155 3156 /* kick calling process */ 3157 complete_request(mdsc, req); 3158 out: 3159 ceph_mdsc_put_request(req); 3160 return; 3161 } 3162 3163 3164 3165 /* 3166 * handle mds notification that our request has been forwarded. 3167 */ 3168 static void handle_forward(struct ceph_mds_client *mdsc, 3169 struct ceph_mds_session *session, 3170 struct ceph_msg *msg) 3171 { 3172 struct ceph_mds_request *req; 3173 u64 tid = le64_to_cpu(msg->hdr.tid); 3174 u32 next_mds; 3175 u32 fwd_seq; 3176 int err = -EINVAL; 3177 void *p = msg->front.iov_base; 3178 void *end = p + msg->front.iov_len; 3179 3180 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3181 next_mds = ceph_decode_32(&p); 3182 fwd_seq = ceph_decode_32(&p); 3183 3184 mutex_lock(&mdsc->mutex); 3185 req = lookup_get_request(mdsc, tid); 3186 if (!req) { 3187 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 3188 goto out; /* dup reply? */ 3189 } 3190 3191 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3192 dout("forward tid %llu aborted, unregistering\n", tid); 3193 __unregister_request(mdsc, req); 3194 } else if (fwd_seq <= req->r_num_fwd) { 3195 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 3196 tid, next_mds, req->r_num_fwd, fwd_seq); 3197 } else { 3198 /* resend. 
forward race not possible; mds would drop */ 3199 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 3200 BUG_ON(req->r_err); 3201 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 3202 req->r_attempts = 0; 3203 req->r_num_fwd = fwd_seq; 3204 req->r_resend_mds = next_mds; 3205 put_request_session(req); 3206 __do_request(mdsc, req); 3207 } 3208 ceph_mdsc_put_request(req); 3209 out: 3210 mutex_unlock(&mdsc->mutex); 3211 return; 3212 3213 bad: 3214 pr_err("mdsc_handle_forward decode error err=%d\n", err); 3215 } 3216 3217 static int __decode_session_metadata(void **p, void *end, 3218 bool *blacklisted) 3219 { 3220 /* map<string,string> */ 3221 u32 n; 3222 bool err_str; 3223 ceph_decode_32_safe(p, end, n, bad); 3224 while (n-- > 0) { 3225 u32 len; 3226 ceph_decode_32_safe(p, end, len, bad); 3227 ceph_decode_need(p, end, len, bad); 3228 err_str = !strncmp(*p, "error_string", len); 3229 *p += len; 3230 ceph_decode_32_safe(p, end, len, bad); 3231 ceph_decode_need(p, end, len, bad); 3232 if (err_str && strnstr(*p, "blacklisted", len)) 3233 *blacklisted = true; 3234 *p += len; 3235 } 3236 return 0; 3237 bad: 3238 return -1; 3239 } 3240 3241 /* 3242 * handle a mds session control message 3243 */ 3244 static void handle_session(struct ceph_mds_session *session, 3245 struct ceph_msg *msg) 3246 { 3247 struct ceph_mds_client *mdsc = session->s_mdsc; 3248 int mds = session->s_mds; 3249 int msg_version = le16_to_cpu(msg->hdr.version); 3250 void *p = msg->front.iov_base; 3251 void *end = p + msg->front.iov_len; 3252 struct ceph_mds_session_head *h; 3253 u32 op; 3254 u64 seq, features = 0; 3255 int wake = 0; 3256 bool blacklisted = false; 3257 3258 /* decode */ 3259 ceph_decode_need(&p, end, sizeof(*h), bad); 3260 h = p; 3261 p += sizeof(*h); 3262 3263 op = le32_to_cpu(h->op); 3264 seq = le64_to_cpu(h->seq); 3265 3266 if (msg_version >= 3) { 3267 u32 len; 3268 /* version >= 2, metadata */ 3269 if (__decode_session_metadata(&p, end, &blacklisted) < 0) 3270 goto 
bad; 3271 /* version >= 3, feature bits */ 3272 ceph_decode_32_safe(&p, end, len, bad); 3273 ceph_decode_64_safe(&p, end, features, bad); 3274 p += len - sizeof(features); 3275 } 3276 3277 mutex_lock(&mdsc->mutex); 3278 if (op == CEPH_SESSION_CLOSE) { 3279 ceph_get_mds_session(session); 3280 __unregister_session(mdsc, session); 3281 } 3282 /* FIXME: this ttl calculation is generous */ 3283 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 3284 mutex_unlock(&mdsc->mutex); 3285 3286 mutex_lock(&session->s_mutex); 3287 3288 dout("handle_session mds%d %s %p state %s seq %llu\n", 3289 mds, ceph_session_op_name(op), session, 3290 ceph_session_state_name(session->s_state), seq); 3291 3292 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 3293 session->s_state = CEPH_MDS_SESSION_OPEN; 3294 pr_info("mds%d came back\n", session->s_mds); 3295 } 3296 3297 switch (op) { 3298 case CEPH_SESSION_OPEN: 3299 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3300 pr_info("mds%d reconnect success\n", session->s_mds); 3301 session->s_state = CEPH_MDS_SESSION_OPEN; 3302 session->s_features = features; 3303 renewed_caps(mdsc, session, 0); 3304 wake = 1; 3305 if (mdsc->stopping) 3306 __close_session(mdsc, session); 3307 break; 3308 3309 case CEPH_SESSION_RENEWCAPS: 3310 if (session->s_renew_seq == seq) 3311 renewed_caps(mdsc, session, 1); 3312 break; 3313 3314 case CEPH_SESSION_CLOSE: 3315 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3316 pr_info("mds%d reconnect denied\n", session->s_mds); 3317 session->s_state = CEPH_MDS_SESSION_CLOSED; 3318 cleanup_session_requests(mdsc, session); 3319 remove_session_caps(session); 3320 wake = 2; /* for good measure */ 3321 wake_up_all(&mdsc->session_close_wq); 3322 break; 3323 3324 case CEPH_SESSION_STALE: 3325 pr_info("mds%d caps went stale, renewing\n", 3326 session->s_mds); 3327 spin_lock(&session->s_gen_ttl_lock); 3328 session->s_cap_gen++; 3329 session->s_cap_ttl = jiffies - 1; 3330 
spin_unlock(&session->s_gen_ttl_lock); 3331 send_renew_caps(mdsc, session); 3332 break; 3333 3334 case CEPH_SESSION_RECALL_STATE: 3335 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 3336 break; 3337 3338 case CEPH_SESSION_FLUSHMSG: 3339 send_flushmsg_ack(mdsc, session, seq); 3340 break; 3341 3342 case CEPH_SESSION_FORCE_RO: 3343 dout("force_session_readonly %p\n", session); 3344 spin_lock(&session->s_cap_lock); 3345 session->s_readonly = true; 3346 spin_unlock(&session->s_cap_lock); 3347 wake_up_session_caps(session, FORCE_RO); 3348 break; 3349 3350 case CEPH_SESSION_REJECT: 3351 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 3352 pr_info("mds%d rejected session\n", session->s_mds); 3353 session->s_state = CEPH_MDS_SESSION_REJECTED; 3354 cleanup_session_requests(mdsc, session); 3355 remove_session_caps(session); 3356 if (blacklisted) 3357 mdsc->fsc->blacklisted = true; 3358 wake = 2; /* for good measure */ 3359 break; 3360 3361 default: 3362 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 3363 WARN_ON(1); 3364 } 3365 3366 mutex_unlock(&session->s_mutex); 3367 if (wake) { 3368 mutex_lock(&mdsc->mutex); 3369 __wake_requests(mdsc, &session->s_waiting); 3370 if (wake == 2) 3371 kick_requests(mdsc, mds); 3372 mutex_unlock(&mdsc->mutex); 3373 } 3374 if (op == CEPH_SESSION_CLOSE) 3375 ceph_put_mds_session(session); 3376 return; 3377 3378 bad: 3379 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 3380 (int)msg->front.iov_len); 3381 ceph_msg_dump(msg); 3382 return; 3383 } 3384 3385 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 3386 { 3387 int dcaps; 3388 3389 dcaps = xchg(&req->r_dir_caps, 0); 3390 if (dcaps) { 3391 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3392 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 3393 } 3394 } 3395 3396 /* 3397 * called under session->mutex. 
3398 */ 3399 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 3400 struct ceph_mds_session *session) 3401 { 3402 struct ceph_mds_request *req, *nreq; 3403 struct rb_node *p; 3404 3405 dout("replay_unsafe_requests mds%d\n", session->s_mds); 3406 3407 mutex_lock(&mdsc->mutex); 3408 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) 3409 __send_request(mdsc, session, req, true); 3410 3411 /* 3412 * also re-send old requests when MDS enters reconnect stage. So that MDS 3413 * can process completed request in clientreplay stage. 3414 */ 3415 p = rb_first(&mdsc->request_tree); 3416 while (p) { 3417 req = rb_entry(p, struct ceph_mds_request, r_node); 3418 p = rb_next(p); 3419 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3420 continue; 3421 if (req->r_attempts == 0) 3422 continue; /* only old requests */ 3423 if (!req->r_session) 3424 continue; 3425 if (req->r_session->s_mds != session->s_mds) 3426 continue; 3427 3428 ceph_mdsc_release_dir_caps(req); 3429 3430 __send_request(mdsc, session, req, true); 3431 } 3432 mutex_unlock(&mdsc->mutex); 3433 } 3434 3435 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state) 3436 { 3437 struct ceph_msg *reply; 3438 struct ceph_pagelist *_pagelist; 3439 struct page *page; 3440 __le32 *addr; 3441 int err = -ENOMEM; 3442 3443 if (!recon_state->allow_multi) 3444 return -ENOSPC; 3445 3446 /* can't handle message that contains both caps and realm */ 3447 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms); 3448 3449 /* pre-allocate new pagelist */ 3450 _pagelist = ceph_pagelist_alloc(GFP_NOFS); 3451 if (!_pagelist) 3452 return -ENOMEM; 3453 3454 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3455 if (!reply) 3456 goto fail_msg; 3457 3458 /* placeholder for nr_caps */ 3459 err = ceph_pagelist_encode_32(_pagelist, 0); 3460 if (err < 0) 3461 goto fail; 3462 3463 if (recon_state->nr_caps) { 3464 /* currently encoding caps */ 3465 err = 
ceph_pagelist_encode_32(recon_state->pagelist, 0); 3466 if (err) 3467 goto fail; 3468 } else { 3469 /* placeholder for nr_realms (currently encoding relams) */ 3470 err = ceph_pagelist_encode_32(_pagelist, 0); 3471 if (err < 0) 3472 goto fail; 3473 } 3474 3475 err = ceph_pagelist_encode_8(recon_state->pagelist, 1); 3476 if (err) 3477 goto fail; 3478 3479 page = list_first_entry(&recon_state->pagelist->head, struct page, lru); 3480 addr = kmap_atomic(page); 3481 if (recon_state->nr_caps) { 3482 /* currently encoding caps */ 3483 *addr = cpu_to_le32(recon_state->nr_caps); 3484 } else { 3485 /* currently encoding relams */ 3486 *(addr + 1) = cpu_to_le32(recon_state->nr_realms); 3487 } 3488 kunmap_atomic(addr); 3489 3490 reply->hdr.version = cpu_to_le16(5); 3491 reply->hdr.compat_version = cpu_to_le16(4); 3492 3493 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length); 3494 ceph_msg_data_add_pagelist(reply, recon_state->pagelist); 3495 3496 ceph_con_send(&recon_state->session->s_con, reply); 3497 ceph_pagelist_release(recon_state->pagelist); 3498 3499 recon_state->pagelist = _pagelist; 3500 recon_state->nr_caps = 0; 3501 recon_state->nr_realms = 0; 3502 recon_state->msg_version = 5; 3503 return 0; 3504 fail: 3505 ceph_msg_put(reply); 3506 fail_msg: 3507 ceph_pagelist_release(_pagelist); 3508 return err; 3509 } 3510 3511 /* 3512 * Encode information about a cap for a reconnect with the MDS. 
3513 */ 3514 static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap, 3515 void *arg) 3516 { 3517 union { 3518 struct ceph_mds_cap_reconnect v2; 3519 struct ceph_mds_cap_reconnect_v1 v1; 3520 } rec; 3521 struct ceph_inode_info *ci = cap->ci; 3522 struct ceph_reconnect_state *recon_state = arg; 3523 struct ceph_pagelist *pagelist = recon_state->pagelist; 3524 int err; 3525 u64 snap_follows; 3526 3527 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 3528 inode, ceph_vinop(inode), cap, cap->cap_id, 3529 ceph_cap_string(cap->issued)); 3530 3531 spin_lock(&ci->i_ceph_lock); 3532 cap->seq = 0; /* reset cap seq */ 3533 cap->issue_seq = 0; /* and issue_seq */ 3534 cap->mseq = 0; /* and migrate_seq */ 3535 cap->cap_gen = cap->session->s_cap_gen; 3536 3537 /* These are lost when the session goes away */ 3538 if (S_ISDIR(inode->i_mode)) { 3539 if (cap->issued & CEPH_CAP_DIR_CREATE) { 3540 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); 3541 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); 3542 } 3543 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; 3544 } 3545 3546 if (recon_state->msg_version >= 2) { 3547 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 3548 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3549 rec.v2.issued = cpu_to_le32(cap->issued); 3550 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3551 rec.v2.pathbase = 0; 3552 rec.v2.flock_len = (__force __le32) 3553 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); 3554 } else { 3555 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 3556 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3557 rec.v1.issued = cpu_to_le32(cap->issued); 3558 rec.v1.size = cpu_to_le64(inode->i_size); 3559 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime); 3560 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime); 3561 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3562 rec.v1.pathbase = 0; 3563 } 3564 3565 if (list_empty(&ci->i_cap_snaps)) { 3566 snap_follows = ci->i_head_snapc ? 
ci->i_head_snapc->seq : 0; 3567 } else { 3568 struct ceph_cap_snap *capsnap = 3569 list_first_entry(&ci->i_cap_snaps, 3570 struct ceph_cap_snap, ci_item); 3571 snap_follows = capsnap->follows; 3572 } 3573 spin_unlock(&ci->i_ceph_lock); 3574 3575 if (recon_state->msg_version >= 2) { 3576 int num_fcntl_locks, num_flock_locks; 3577 struct ceph_filelock *flocks = NULL; 3578 size_t struct_len, total_len = sizeof(u64); 3579 u8 struct_v = 0; 3580 3581 encode_again: 3582 if (rec.v2.flock_len) { 3583 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 3584 } else { 3585 num_fcntl_locks = 0; 3586 num_flock_locks = 0; 3587 } 3588 if (num_fcntl_locks + num_flock_locks > 0) { 3589 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks, 3590 sizeof(struct ceph_filelock), 3591 GFP_NOFS); 3592 if (!flocks) { 3593 err = -ENOMEM; 3594 goto out_err; 3595 } 3596 err = ceph_encode_locks_to_buffer(inode, flocks, 3597 num_fcntl_locks, 3598 num_flock_locks); 3599 if (err) { 3600 kfree(flocks); 3601 flocks = NULL; 3602 if (err == -ENOSPC) 3603 goto encode_again; 3604 goto out_err; 3605 } 3606 } else { 3607 kfree(flocks); 3608 flocks = NULL; 3609 } 3610 3611 if (recon_state->msg_version >= 3) { 3612 /* version, compat_version and struct_len */ 3613 total_len += 2 * sizeof(u8) + sizeof(u32); 3614 struct_v = 2; 3615 } 3616 /* 3617 * number of encoded locks is stable, so copy to pagelist 3618 */ 3619 struct_len = 2 * sizeof(u32) + 3620 (num_fcntl_locks + num_flock_locks) * 3621 sizeof(struct ceph_filelock); 3622 rec.v2.flock_len = cpu_to_le32(struct_len); 3623 3624 struct_len += sizeof(u32) + sizeof(rec.v2); 3625 3626 if (struct_v >= 2) 3627 struct_len += sizeof(u64); /* snap_follows */ 3628 3629 total_len += struct_len; 3630 3631 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 3632 err = send_reconnect_partial(recon_state); 3633 if (err) 3634 goto out_freeflocks; 3635 pagelist = recon_state->pagelist; 3636 } 3637 3638 err = ceph_pagelist_reserve(pagelist, total_len); 3639 
if (err) 3640 goto out_freeflocks; 3641 3642 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3643 if (recon_state->msg_version >= 3) { 3644 ceph_pagelist_encode_8(pagelist, struct_v); 3645 ceph_pagelist_encode_8(pagelist, 1); 3646 ceph_pagelist_encode_32(pagelist, struct_len); 3647 } 3648 ceph_pagelist_encode_string(pagelist, NULL, 0); 3649 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 3650 ceph_locks_to_pagelist(flocks, pagelist, 3651 num_fcntl_locks, num_flock_locks); 3652 if (struct_v >= 2) 3653 ceph_pagelist_encode_64(pagelist, snap_follows); 3654 out_freeflocks: 3655 kfree(flocks); 3656 } else { 3657 u64 pathbase = 0; 3658 int pathlen = 0; 3659 char *path = NULL; 3660 struct dentry *dentry; 3661 3662 dentry = d_find_alias(inode); 3663 if (dentry) { 3664 path = ceph_mdsc_build_path(dentry, 3665 &pathlen, &pathbase, 0); 3666 dput(dentry); 3667 if (IS_ERR(path)) { 3668 err = PTR_ERR(path); 3669 goto out_err; 3670 } 3671 rec.v1.pathbase = cpu_to_le64(pathbase); 3672 } 3673 3674 err = ceph_pagelist_reserve(pagelist, 3675 sizeof(u64) + sizeof(u32) + 3676 pathlen + sizeof(rec.v1)); 3677 if (err) { 3678 goto out_freepath; 3679 } 3680 3681 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3682 ceph_pagelist_encode_string(pagelist, path, pathlen); 3683 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 3684 out_freepath: 3685 ceph_mdsc_free_path(path, pathlen); 3686 } 3687 3688 out_err: 3689 if (err >= 0) 3690 recon_state->nr_caps++; 3691 return err; 3692 } 3693 3694 static int encode_snap_realms(struct ceph_mds_client *mdsc, 3695 struct ceph_reconnect_state *recon_state) 3696 { 3697 struct rb_node *p; 3698 struct ceph_pagelist *pagelist = recon_state->pagelist; 3699 int err = 0; 3700 3701 if (recon_state->msg_version >= 4) { 3702 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); 3703 if (err < 0) 3704 goto fail; 3705 } 3706 3707 /* 3708 * snaprealms. we provide mds with the ino, seq (version), and 3709 * parent for all of our realms. 
If the mds has any newer info, 3710 * it will tell us. 3711 */ 3712 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 3713 struct ceph_snap_realm *realm = 3714 rb_entry(p, struct ceph_snap_realm, node); 3715 struct ceph_mds_snaprealm_reconnect sr_rec; 3716 3717 if (recon_state->msg_version >= 4) { 3718 size_t need = sizeof(u8) * 2 + sizeof(u32) + 3719 sizeof(sr_rec); 3720 3721 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 3722 err = send_reconnect_partial(recon_state); 3723 if (err) 3724 goto fail; 3725 pagelist = recon_state->pagelist; 3726 } 3727 3728 err = ceph_pagelist_reserve(pagelist, need); 3729 if (err) 3730 goto fail; 3731 3732 ceph_pagelist_encode_8(pagelist, 1); 3733 ceph_pagelist_encode_8(pagelist, 1); 3734 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 3735 } 3736 3737 dout(" adding snap realm %llx seq %lld parent %llx\n", 3738 realm->ino, realm->seq, realm->parent_ino); 3739 sr_rec.ino = cpu_to_le64(realm->ino); 3740 sr_rec.seq = cpu_to_le64(realm->seq); 3741 sr_rec.parent = cpu_to_le64(realm->parent_ino); 3742 3743 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 3744 if (err) 3745 goto fail; 3746 3747 recon_state->nr_realms++; 3748 } 3749 fail: 3750 return err; 3751 } 3752 3753 3754 /* 3755 * If an MDS fails and recovers, clients need to reconnect in order to 3756 * reestablish shared state. This includes all caps issued through 3757 * this session _and_ the snap_realm hierarchy. Because it's not 3758 * clear which snap realms the mds cares about, we send everything we 3759 * know about.. that ensures we'll then get any new info the 3760 * recovering MDS might have. 3761 * 3762 * This is a relatively heavyweight operation, but it's rare. 3763 * 3764 * called with mdsc->mutex held. 
3765 */ 3766 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 3767 struct ceph_mds_session *session) 3768 { 3769 struct ceph_msg *reply; 3770 int mds = session->s_mds; 3771 int err = -ENOMEM; 3772 struct ceph_reconnect_state recon_state = { 3773 .session = session, 3774 }; 3775 LIST_HEAD(dispose); 3776 3777 pr_info("mds%d reconnect start\n", mds); 3778 3779 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 3780 if (!recon_state.pagelist) 3781 goto fail_nopagelist; 3782 3783 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3784 if (!reply) 3785 goto fail_nomsg; 3786 3787 xa_destroy(&session->s_delegated_inos); 3788 3789 mutex_lock(&session->s_mutex); 3790 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 3791 session->s_seq = 0; 3792 3793 dout("session %p state %s\n", session, 3794 ceph_session_state_name(session->s_state)); 3795 3796 spin_lock(&session->s_gen_ttl_lock); 3797 session->s_cap_gen++; 3798 spin_unlock(&session->s_gen_ttl_lock); 3799 3800 spin_lock(&session->s_cap_lock); 3801 /* don't know if session is readonly */ 3802 session->s_readonly = 0; 3803 /* 3804 * notify __ceph_remove_cap() that we are composing cap reconnect. 3805 * If a cap get released before being added to the cap reconnect, 3806 * __ceph_remove_cap() should skip queuing cap release. 
3807 */ 3808 session->s_cap_reconnect = 1; 3809 /* drop old cap expires; we're about to reestablish that state */ 3810 detach_cap_releases(session, &dispose); 3811 spin_unlock(&session->s_cap_lock); 3812 dispose_cap_releases(mdsc, &dispose); 3813 3814 /* trim unused caps to reduce MDS's cache rejoin time */ 3815 if (mdsc->fsc->sb->s_root) 3816 shrink_dcache_parent(mdsc->fsc->sb->s_root); 3817 3818 ceph_con_close(&session->s_con); 3819 ceph_con_open(&session->s_con, 3820 CEPH_ENTITY_TYPE_MDS, mds, 3821 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 3822 3823 /* replay unsafe requests */ 3824 replay_unsafe_requests(mdsc, session); 3825 3826 ceph_early_kick_flushing_caps(mdsc, session); 3827 3828 down_read(&mdsc->snap_rwsem); 3829 3830 /* placeholder for nr_caps */ 3831 err = ceph_pagelist_encode_32(recon_state.pagelist, 0); 3832 if (err) 3833 goto fail; 3834 3835 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) { 3836 recon_state.msg_version = 3; 3837 recon_state.allow_multi = true; 3838 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) { 3839 recon_state.msg_version = 3; 3840 } else { 3841 recon_state.msg_version = 2; 3842 } 3843 /* trsaverse this session's caps */ 3844 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state); 3845 3846 spin_lock(&session->s_cap_lock); 3847 session->s_cap_reconnect = 0; 3848 spin_unlock(&session->s_cap_lock); 3849 3850 if (err < 0) 3851 goto fail; 3852 3853 /* check if all realms can be encoded into current message */ 3854 if (mdsc->num_snap_realms) { 3855 size_t total_len = 3856 recon_state.pagelist->length + 3857 mdsc->num_snap_realms * 3858 sizeof(struct ceph_mds_snaprealm_reconnect); 3859 if (recon_state.msg_version >= 4) { 3860 /* number of realms */ 3861 total_len += sizeof(u32); 3862 /* version, compat_version and struct_len */ 3863 total_len += mdsc->num_snap_realms * 3864 (2 * sizeof(u8) + sizeof(u32)); 3865 } 3866 if (total_len > RECONNECT_MAX_SIZE) { 3867 if 
(!recon_state.allow_multi) { 3868 err = -ENOSPC; 3869 goto fail; 3870 } 3871 if (recon_state.nr_caps) { 3872 err = send_reconnect_partial(&recon_state); 3873 if (err) 3874 goto fail; 3875 } 3876 recon_state.msg_version = 5; 3877 } 3878 } 3879 3880 err = encode_snap_realms(mdsc, &recon_state); 3881 if (err < 0) 3882 goto fail; 3883 3884 if (recon_state.msg_version >= 5) { 3885 err = ceph_pagelist_encode_8(recon_state.pagelist, 0); 3886 if (err < 0) 3887 goto fail; 3888 } 3889 3890 if (recon_state.nr_caps || recon_state.nr_realms) { 3891 struct page *page = 3892 list_first_entry(&recon_state.pagelist->head, 3893 struct page, lru); 3894 __le32 *addr = kmap_atomic(page); 3895 if (recon_state.nr_caps) { 3896 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms); 3897 *addr = cpu_to_le32(recon_state.nr_caps); 3898 } else if (recon_state.msg_version >= 4) { 3899 *(addr + 1) = cpu_to_le32(recon_state.nr_realms); 3900 } 3901 kunmap_atomic(addr); 3902 } 3903 3904 reply->hdr.version = cpu_to_le16(recon_state.msg_version); 3905 if (recon_state.msg_version >= 4) 3906 reply->hdr.compat_version = cpu_to_le16(4); 3907 3908 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length); 3909 ceph_msg_data_add_pagelist(reply, recon_state.pagelist); 3910 3911 ceph_con_send(&session->s_con, reply); 3912 3913 mutex_unlock(&session->s_mutex); 3914 3915 mutex_lock(&mdsc->mutex); 3916 __wake_requests(mdsc, &session->s_waiting); 3917 mutex_unlock(&mdsc->mutex); 3918 3919 up_read(&mdsc->snap_rwsem); 3920 ceph_pagelist_release(recon_state.pagelist); 3921 return; 3922 3923 fail: 3924 ceph_msg_put(reply); 3925 up_read(&mdsc->snap_rwsem); 3926 mutex_unlock(&session->s_mutex); 3927 fail_nomsg: 3928 ceph_pagelist_release(recon_state.pagelist); 3929 fail_nopagelist: 3930 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 3931 return; 3932 } 3933 3934 3935 /* 3936 * compare old and new mdsmaps, kicking requests 3937 * and closing out old connections as necessary 3938 * 3939 * called 
under mdsc->mutex. 3940 */ 3941 static void check_new_map(struct ceph_mds_client *mdsc, 3942 struct ceph_mdsmap *newmap, 3943 struct ceph_mdsmap *oldmap) 3944 { 3945 int i; 3946 int oldstate, newstate; 3947 struct ceph_mds_session *s; 3948 3949 dout("check_new_map new %u old %u\n", 3950 newmap->m_epoch, oldmap->m_epoch); 3951 3952 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { 3953 if (!mdsc->sessions[i]) 3954 continue; 3955 s = mdsc->sessions[i]; 3956 oldstate = ceph_mdsmap_get_state(oldmap, i); 3957 newstate = ceph_mdsmap_get_state(newmap, i); 3958 3959 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 3960 i, ceph_mds_state_name(oldstate), 3961 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 3962 ceph_mds_state_name(newstate), 3963 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 3964 ceph_session_state_name(s->s_state)); 3965 3966 if (i >= newmap->possible_max_rank) { 3967 /* force close session for stopped mds */ 3968 ceph_get_mds_session(s); 3969 __unregister_session(mdsc, s); 3970 __wake_requests(mdsc, &s->s_waiting); 3971 mutex_unlock(&mdsc->mutex); 3972 3973 mutex_lock(&s->s_mutex); 3974 cleanup_session_requests(mdsc, s); 3975 remove_session_caps(s); 3976 mutex_unlock(&s->s_mutex); 3977 3978 ceph_put_mds_session(s); 3979 3980 mutex_lock(&mdsc->mutex); 3981 kick_requests(mdsc, i); 3982 continue; 3983 } 3984 3985 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 3986 ceph_mdsmap_get_addr(newmap, i), 3987 sizeof(struct ceph_entity_addr))) { 3988 /* just close it */ 3989 mutex_unlock(&mdsc->mutex); 3990 mutex_lock(&s->s_mutex); 3991 mutex_lock(&mdsc->mutex); 3992 ceph_con_close(&s->s_con); 3993 mutex_unlock(&s->s_mutex); 3994 s->s_state = CEPH_MDS_SESSION_RESTARTING; 3995 } else if (oldstate == newstate) { 3996 continue; /* nothing new with this mds */ 3997 } 3998 3999 /* 4000 * send reconnect? 
4001 */ 4002 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 4003 newstate >= CEPH_MDS_STATE_RECONNECT) { 4004 mutex_unlock(&mdsc->mutex); 4005 send_mds_reconnect(mdsc, s); 4006 mutex_lock(&mdsc->mutex); 4007 } 4008 4009 /* 4010 * kick request on any mds that has gone active. 4011 */ 4012 if (oldstate < CEPH_MDS_STATE_ACTIVE && 4013 newstate >= CEPH_MDS_STATE_ACTIVE) { 4014 if (oldstate != CEPH_MDS_STATE_CREATING && 4015 oldstate != CEPH_MDS_STATE_STARTING) 4016 pr_info("mds%d recovery completed\n", s->s_mds); 4017 kick_requests(mdsc, i); 4018 ceph_kick_flushing_caps(mdsc, s); 4019 wake_up_session_caps(s, RECONNECT); 4020 } 4021 } 4022 4023 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4024 s = mdsc->sessions[i]; 4025 if (!s) 4026 continue; 4027 if (!ceph_mdsmap_is_laggy(newmap, i)) 4028 continue; 4029 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4030 s->s_state == CEPH_MDS_SESSION_HUNG || 4031 s->s_state == CEPH_MDS_SESSION_CLOSING) { 4032 dout(" connecting to export targets of laggy mds%d\n", 4033 i); 4034 __open_export_target_sessions(mdsc, s); 4035 } 4036 } 4037 } 4038 4039 4040 4041 /* 4042 * leases 4043 */ 4044 4045 /* 4046 * caller must hold session s_mutex, dentry->d_lock 4047 */ 4048 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 4049 { 4050 struct ceph_dentry_info *di = ceph_dentry(dentry); 4051 4052 ceph_put_mds_session(di->lease_session); 4053 di->lease_session = NULL; 4054 } 4055 4056 static void handle_lease(struct ceph_mds_client *mdsc, 4057 struct ceph_mds_session *session, 4058 struct ceph_msg *msg) 4059 { 4060 struct super_block *sb = mdsc->fsc->sb; 4061 struct inode *inode; 4062 struct dentry *parent, *dentry; 4063 struct ceph_dentry_info *di; 4064 int mds = session->s_mds; 4065 struct ceph_mds_lease *h = msg->front.iov_base; 4066 u32 seq; 4067 struct ceph_vino vino; 4068 struct qstr dname; 4069 int release = 0; 4070 4071 dout("handle_lease from mds%d\n", mds); 4072 4073 /* decode */ 4074 if 
(msg->front.iov_len < sizeof(*h) + sizeof(u32)) 4075 goto bad; 4076 vino.ino = le64_to_cpu(h->ino); 4077 vino.snap = CEPH_NOSNAP; 4078 seq = le32_to_cpu(h->seq); 4079 dname.len = get_unaligned_le32(h + 1); 4080 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 4081 goto bad; 4082 dname.name = (void *)(h + 1) + sizeof(u32); 4083 4084 /* lookup inode */ 4085 inode = ceph_find_inode(sb, vino); 4086 dout("handle_lease %s, ino %llx %p %.*s\n", 4087 ceph_lease_op_name(h->action), vino.ino, inode, 4088 dname.len, dname.name); 4089 4090 mutex_lock(&session->s_mutex); 4091 session->s_seq++; 4092 4093 if (!inode) { 4094 dout("handle_lease no inode %llx\n", vino.ino); 4095 goto release; 4096 } 4097 4098 /* dentry */ 4099 parent = d_find_alias(inode); 4100 if (!parent) { 4101 dout("no parent dentry on inode %p\n", inode); 4102 WARN_ON(1); 4103 goto release; /* hrm... */ 4104 } 4105 dname.hash = full_name_hash(parent, dname.name, dname.len); 4106 dentry = d_lookup(parent, &dname); 4107 dput(parent); 4108 if (!dentry) 4109 goto release; 4110 4111 spin_lock(&dentry->d_lock); 4112 di = ceph_dentry(dentry); 4113 switch (h->action) { 4114 case CEPH_MDS_LEASE_REVOKE: 4115 if (di->lease_session == session) { 4116 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 4117 h->seq = cpu_to_le32(di->lease_seq); 4118 __ceph_mdsc_drop_dentry_lease(dentry); 4119 } 4120 release = 1; 4121 break; 4122 4123 case CEPH_MDS_LEASE_RENEW: 4124 if (di->lease_session == session && 4125 di->lease_gen == session->s_cap_gen && 4126 di->lease_renew_from && 4127 di->lease_renew_after == 0) { 4128 unsigned long duration = 4129 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 4130 4131 di->lease_seq = seq; 4132 di->time = di->lease_renew_from + duration; 4133 di->lease_renew_after = di->lease_renew_from + 4134 (duration >> 1); 4135 di->lease_renew_from = 0; 4136 } 4137 break; 4138 } 4139 spin_unlock(&dentry->d_lock); 4140 dput(dentry); 4141 4142 if (!release) 4143 goto out; 4144 4145 release: 4146 /* let's 
just reuse the same message */ 4147 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 4148 ceph_msg_get(msg); 4149 ceph_con_send(&session->s_con, msg); 4150 4151 out: 4152 mutex_unlock(&session->s_mutex); 4153 /* avoid calling iput_final() in mds dispatch threads */ 4154 ceph_async_iput(inode); 4155 return; 4156 4157 bad: 4158 pr_err("corrupt lease message\n"); 4159 ceph_msg_dump(msg); 4160 } 4161 4162 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 4163 struct dentry *dentry, char action, 4164 u32 seq) 4165 { 4166 struct ceph_msg *msg; 4167 struct ceph_mds_lease *lease; 4168 struct inode *dir; 4169 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; 4170 4171 dout("lease_send_msg identry %p %s to mds%d\n", 4172 dentry, ceph_lease_op_name(action), session->s_mds); 4173 4174 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 4175 if (!msg) 4176 return; 4177 lease = msg->front.iov_base; 4178 lease->action = action; 4179 lease->seq = cpu_to_le32(seq); 4180 4181 spin_lock(&dentry->d_lock); 4182 dir = d_inode(dentry->d_parent); 4183 lease->ino = cpu_to_le64(ceph_ino(dir)); 4184 lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); 4185 4186 put_unaligned_le32(dentry->d_name.len, lease + 1); 4187 memcpy((void *)(lease + 1) + 4, 4188 dentry->d_name.name, dentry->d_name.len); 4189 spin_unlock(&dentry->d_lock); 4190 /* 4191 * if this is a preemptive lease RELEASE, no need to 4192 * flush request stream, since the actual request will 4193 * soon follow. 
4194 */ 4195 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE); 4196 4197 ceph_con_send(&session->s_con, msg); 4198 } 4199 4200 /* 4201 * lock unlock sessions, to wait ongoing session activities 4202 */ 4203 static void lock_unlock_sessions(struct ceph_mds_client *mdsc) 4204 { 4205 int i; 4206 4207 mutex_lock(&mdsc->mutex); 4208 for (i = 0; i < mdsc->max_sessions; i++) { 4209 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4210 if (!s) 4211 continue; 4212 mutex_unlock(&mdsc->mutex); 4213 mutex_lock(&s->s_mutex); 4214 mutex_unlock(&s->s_mutex); 4215 ceph_put_mds_session(s); 4216 mutex_lock(&mdsc->mutex); 4217 } 4218 mutex_unlock(&mdsc->mutex); 4219 } 4220 4221 static void maybe_recover_session(struct ceph_mds_client *mdsc) 4222 { 4223 struct ceph_fs_client *fsc = mdsc->fsc; 4224 4225 if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) 4226 return; 4227 4228 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED) 4229 return; 4230 4231 if (!READ_ONCE(fsc->blacklisted)) 4232 return; 4233 4234 if (fsc->last_auto_reconnect && 4235 time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30)) 4236 return; 4237 4238 pr_info("auto reconnect after blacklisted\n"); 4239 fsc->last_auto_reconnect = jiffies; 4240 ceph_force_reconnect(fsc->sb); 4241 } 4242 4243 /* 4244 * delayed work -- periodically trim expired leases, renew caps with mds 4245 */ 4246 static void schedule_delayed(struct ceph_mds_client *mdsc) 4247 { 4248 int delay = 5; 4249 unsigned hz = round_jiffies_relative(HZ * delay); 4250 schedule_delayed_work(&mdsc->delayed_work, hz); 4251 } 4252 4253 static void delayed_work(struct work_struct *work) 4254 { 4255 int i; 4256 struct ceph_mds_client *mdsc = 4257 container_of(work, struct ceph_mds_client, delayed_work.work); 4258 int renew_interval; 4259 int renew_caps; 4260 4261 dout("mdsc delayed_work\n"); 4262 4263 mutex_lock(&mdsc->mutex); 4264 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 4265 renew_caps = time_after_eq(jiffies, 
HZ*renew_interval + 4266 mdsc->last_renew_caps); 4267 if (renew_caps) 4268 mdsc->last_renew_caps = jiffies; 4269 4270 for (i = 0; i < mdsc->max_sessions; i++) { 4271 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4272 if (!s) 4273 continue; 4274 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 4275 dout("resending session close request for mds%d\n", 4276 s->s_mds); 4277 request_close_session(mdsc, s); 4278 ceph_put_mds_session(s); 4279 continue; 4280 } 4281 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 4282 if (s->s_state == CEPH_MDS_SESSION_OPEN) { 4283 s->s_state = CEPH_MDS_SESSION_HUNG; 4284 pr_info("mds%d hung\n", s->s_mds); 4285 } 4286 } 4287 if (s->s_state == CEPH_MDS_SESSION_NEW || 4288 s->s_state == CEPH_MDS_SESSION_RESTARTING || 4289 s->s_state == CEPH_MDS_SESSION_REJECTED) { 4290 /* this mds is failed or recovering, just wait */ 4291 ceph_put_mds_session(s); 4292 continue; 4293 } 4294 mutex_unlock(&mdsc->mutex); 4295 4296 mutex_lock(&s->s_mutex); 4297 if (renew_caps) 4298 send_renew_caps(mdsc, s); 4299 else 4300 ceph_con_keepalive(&s->s_con); 4301 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4302 s->s_state == CEPH_MDS_SESSION_HUNG) 4303 ceph_send_cap_releases(mdsc, s); 4304 mutex_unlock(&s->s_mutex); 4305 ceph_put_mds_session(s); 4306 4307 mutex_lock(&mdsc->mutex); 4308 } 4309 mutex_unlock(&mdsc->mutex); 4310 4311 ceph_check_delayed_caps(mdsc); 4312 4313 ceph_queue_cap_reclaim_work(mdsc); 4314 4315 ceph_trim_snapid_map(mdsc); 4316 4317 maybe_recover_session(mdsc); 4318 4319 schedule_delayed(mdsc); 4320 } 4321 4322 int ceph_mdsc_init(struct ceph_fs_client *fsc) 4323 4324 { 4325 struct ceph_mds_client *mdsc; 4326 4327 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 4328 if (!mdsc) 4329 return -ENOMEM; 4330 mdsc->fsc = fsc; 4331 mutex_init(&mdsc->mutex); 4332 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 4333 if (!mdsc->mdsmap) { 4334 kfree(mdsc); 4335 return -ENOMEM; 4336 } 4337 4338 fsc->mdsc = mdsc; 4339 
init_completion(&mdsc->safe_umount_waiters); 4340 init_waitqueue_head(&mdsc->session_close_wq); 4341 INIT_LIST_HEAD(&mdsc->waiting_for_map); 4342 mdsc->sessions = NULL; 4343 atomic_set(&mdsc->num_sessions, 0); 4344 mdsc->max_sessions = 0; 4345 mdsc->stopping = 0; 4346 atomic64_set(&mdsc->quotarealms_count, 0); 4347 mdsc->quotarealms_inodes = RB_ROOT; 4348 mutex_init(&mdsc->quotarealms_inodes_mutex); 4349 mdsc->last_snap_seq = 0; 4350 init_rwsem(&mdsc->snap_rwsem); 4351 mdsc->snap_realms = RB_ROOT; 4352 INIT_LIST_HEAD(&mdsc->snap_empty); 4353 mdsc->num_snap_realms = 0; 4354 spin_lock_init(&mdsc->snap_empty_lock); 4355 mdsc->last_tid = 0; 4356 mdsc->oldest_tid = 0; 4357 mdsc->request_tree = RB_ROOT; 4358 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 4359 mdsc->last_renew_caps = jiffies; 4360 INIT_LIST_HEAD(&mdsc->cap_delay_list); 4361 INIT_LIST_HEAD(&mdsc->cap_wait_list); 4362 spin_lock_init(&mdsc->cap_delay_lock); 4363 INIT_LIST_HEAD(&mdsc->snap_flush_list); 4364 spin_lock_init(&mdsc->snap_flush_lock); 4365 mdsc->last_cap_flush_tid = 1; 4366 INIT_LIST_HEAD(&mdsc->cap_flush_list); 4367 INIT_LIST_HEAD(&mdsc->cap_dirty); 4368 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 4369 mdsc->num_cap_flushing = 0; 4370 spin_lock_init(&mdsc->cap_dirty_lock); 4371 init_waitqueue_head(&mdsc->cap_flushing_wq); 4372 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); 4373 atomic_set(&mdsc->cap_reclaim_pending, 0); 4374 4375 spin_lock_init(&mdsc->dentry_list_lock); 4376 INIT_LIST_HEAD(&mdsc->dentry_leases); 4377 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 4378 4379 ceph_caps_init(mdsc); 4380 ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 4381 4382 spin_lock_init(&mdsc->snapid_map_lock); 4383 mdsc->snapid_map_tree = RB_ROOT; 4384 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 4385 4386 init_rwsem(&mdsc->pool_perm_rwsem); 4387 mdsc->pool_perm_tree = RB_ROOT; 4388 4389 strscpy(mdsc->nodename, utsname()->nodename, 4390 sizeof(mdsc->nodename)); 4391 return 0; 4392 } 4393 4394 /* 
4395 * Wait for safe replies on open mds requests. If we time out, drop 4396 * all requests from the tree to avoid dangling dentry refs. 4397 */ 4398 static void wait_requests(struct ceph_mds_client *mdsc) 4399 { 4400 struct ceph_options *opts = mdsc->fsc->client->options; 4401 struct ceph_mds_request *req; 4402 4403 mutex_lock(&mdsc->mutex); 4404 if (__get_oldest_req(mdsc)) { 4405 mutex_unlock(&mdsc->mutex); 4406 4407 dout("wait_requests waiting for requests\n"); 4408 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 4409 ceph_timeout_jiffies(opts->mount_timeout)); 4410 4411 /* tear down remaining requests */ 4412 mutex_lock(&mdsc->mutex); 4413 while ((req = __get_oldest_req(mdsc))) { 4414 dout("wait_requests timed out on tid %llu\n", 4415 req->r_tid); 4416 list_del_init(&req->r_wait); 4417 __unregister_request(mdsc, req); 4418 } 4419 } 4420 mutex_unlock(&mdsc->mutex); 4421 dout("wait_requests done\n"); 4422 } 4423 4424 /* 4425 * called before mount is ro, and before dentries are torn down. 4426 * (hmm, does this still race with new lookups?) 4427 */ 4428 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 4429 { 4430 dout("pre_umount\n"); 4431 mdsc->stopping = 1; 4432 4433 lock_unlock_sessions(mdsc); 4434 ceph_flush_dirty_caps(mdsc); 4435 wait_requests(mdsc); 4436 4437 /* 4438 * wait for reply handlers to drop their request refs and 4439 * their inode/dcache refs 4440 */ 4441 ceph_msgr_flush(); 4442 4443 ceph_cleanup_quotarealms_inodes(mdsc); 4444 } 4445 4446 /* 4447 * wait for all write mds requests to flush. 
4448 */ 4449 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) 4450 { 4451 struct ceph_mds_request *req = NULL, *nextreq; 4452 struct rb_node *n; 4453 4454 mutex_lock(&mdsc->mutex); 4455 dout("wait_unsafe_requests want %lld\n", want_tid); 4456 restart: 4457 req = __get_oldest_req(mdsc); 4458 while (req && req->r_tid <= want_tid) { 4459 /* find next request */ 4460 n = rb_next(&req->r_node); 4461 if (n) 4462 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 4463 else 4464 nextreq = NULL; 4465 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 4466 (req->r_op & CEPH_MDS_OP_WRITE)) { 4467 /* write op */ 4468 ceph_mdsc_get_request(req); 4469 if (nextreq) 4470 ceph_mdsc_get_request(nextreq); 4471 mutex_unlock(&mdsc->mutex); 4472 dout("wait_unsafe_requests wait on %llu (want %llu)\n", 4473 req->r_tid, want_tid); 4474 wait_for_completion(&req->r_safe_completion); 4475 mutex_lock(&mdsc->mutex); 4476 ceph_mdsc_put_request(req); 4477 if (!nextreq) 4478 break; /* next dne before, so we're done! 
*/ 4479 if (RB_EMPTY_NODE(&nextreq->r_node)) { 4480 /* next request was removed from tree */ 4481 ceph_mdsc_put_request(nextreq); 4482 goto restart; 4483 } 4484 ceph_mdsc_put_request(nextreq); /* won't go away */ 4485 } 4486 req = nextreq; 4487 } 4488 mutex_unlock(&mdsc->mutex); 4489 dout("wait_unsafe_requests done\n"); 4490 } 4491 4492 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 4493 { 4494 u64 want_tid, want_flush; 4495 4496 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4497 return; 4498 4499 dout("sync\n"); 4500 mutex_lock(&mdsc->mutex); 4501 want_tid = mdsc->last_tid; 4502 mutex_unlock(&mdsc->mutex); 4503 4504 ceph_flush_dirty_caps(mdsc); 4505 spin_lock(&mdsc->cap_dirty_lock); 4506 want_flush = mdsc->last_cap_flush_tid; 4507 if (!list_empty(&mdsc->cap_flush_list)) { 4508 struct ceph_cap_flush *cf = 4509 list_last_entry(&mdsc->cap_flush_list, 4510 struct ceph_cap_flush, g_list); 4511 cf->wake = true; 4512 } 4513 spin_unlock(&mdsc->cap_dirty_lock); 4514 4515 dout("sync want tid %lld flush_seq %lld\n", 4516 want_tid, want_flush); 4517 4518 wait_unsafe_requests(mdsc, want_tid); 4519 wait_caps_flush(mdsc, want_flush); 4520 } 4521 4522 /* 4523 * true if all sessions are closed, or we force unmount 4524 */ 4525 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 4526 { 4527 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4528 return true; 4529 return atomic_read(&mdsc->num_sessions) <= skipped; 4530 } 4531 4532 /* 4533 * called after sb is ro. 
4534 */ 4535 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 4536 { 4537 struct ceph_options *opts = mdsc->fsc->client->options; 4538 struct ceph_mds_session *session; 4539 int i; 4540 int skipped = 0; 4541 4542 dout("close_sessions\n"); 4543 4544 /* close sessions */ 4545 mutex_lock(&mdsc->mutex); 4546 for (i = 0; i < mdsc->max_sessions; i++) { 4547 session = __ceph_lookup_mds_session(mdsc, i); 4548 if (!session) 4549 continue; 4550 mutex_unlock(&mdsc->mutex); 4551 mutex_lock(&session->s_mutex); 4552 if (__close_session(mdsc, session) <= 0) 4553 skipped++; 4554 mutex_unlock(&session->s_mutex); 4555 ceph_put_mds_session(session); 4556 mutex_lock(&mdsc->mutex); 4557 } 4558 mutex_unlock(&mdsc->mutex); 4559 4560 dout("waiting for sessions to close\n"); 4561 wait_event_timeout(mdsc->session_close_wq, 4562 done_closing_sessions(mdsc, skipped), 4563 ceph_timeout_jiffies(opts->mount_timeout)); 4564 4565 /* tear down remaining sessions */ 4566 mutex_lock(&mdsc->mutex); 4567 for (i = 0; i < mdsc->max_sessions; i++) { 4568 if (mdsc->sessions[i]) { 4569 session = ceph_get_mds_session(mdsc->sessions[i]); 4570 __unregister_session(mdsc, session); 4571 mutex_unlock(&mdsc->mutex); 4572 mutex_lock(&session->s_mutex); 4573 remove_session_caps(session); 4574 mutex_unlock(&session->s_mutex); 4575 ceph_put_mds_session(session); 4576 mutex_lock(&mdsc->mutex); 4577 } 4578 } 4579 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 4580 mutex_unlock(&mdsc->mutex); 4581 4582 ceph_cleanup_snapid_map(mdsc); 4583 ceph_cleanup_empty_realms(mdsc); 4584 4585 cancel_work_sync(&mdsc->cap_reclaim_work); 4586 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 4587 4588 dout("stopped\n"); 4589 } 4590 4591 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 4592 { 4593 struct ceph_mds_session *session; 4594 int mds; 4595 4596 dout("force umount\n"); 4597 4598 mutex_lock(&mdsc->mutex); 4599 for (mds = 0; mds < mdsc->max_sessions; mds++) { 4600 session = 
__ceph_lookup_mds_session(mdsc, mds); 4601 if (!session) 4602 continue; 4603 4604 if (session->s_state == CEPH_MDS_SESSION_REJECTED) 4605 __unregister_session(mdsc, session); 4606 __wake_requests(mdsc, &session->s_waiting); 4607 mutex_unlock(&mdsc->mutex); 4608 4609 mutex_lock(&session->s_mutex); 4610 __close_session(mdsc, session); 4611 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 4612 cleanup_session_requests(mdsc, session); 4613 remove_session_caps(session); 4614 } 4615 mutex_unlock(&session->s_mutex); 4616 ceph_put_mds_session(session); 4617 4618 mutex_lock(&mdsc->mutex); 4619 kick_requests(mdsc, mds); 4620 } 4621 __wake_requests(mdsc, &mdsc->waiting_for_map); 4622 mutex_unlock(&mdsc->mutex); 4623 } 4624 4625 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 4626 { 4627 dout("stop\n"); 4628 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 4629 if (mdsc->mdsmap) 4630 ceph_mdsmap_destroy(mdsc->mdsmap); 4631 kfree(mdsc->sessions); 4632 ceph_caps_finalize(mdsc); 4633 ceph_pool_perm_destroy(mdsc); 4634 } 4635 4636 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 4637 { 4638 struct ceph_mds_client *mdsc = fsc->mdsc; 4639 dout("mdsc_destroy %p\n", mdsc); 4640 4641 if (!mdsc) 4642 return; 4643 4644 /* flush out any connection work with references to us */ 4645 ceph_msgr_flush(); 4646 4647 ceph_mdsc_stop(mdsc); 4648 4649 fsc->mdsc = NULL; 4650 kfree(mdsc); 4651 dout("mdsc_destroy %p done\n", mdsc); 4652 } 4653 4654 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4655 { 4656 struct ceph_fs_client *fsc = mdsc->fsc; 4657 const char *mds_namespace = fsc->mount_options->mds_namespace; 4658 void *p = msg->front.iov_base; 4659 void *end = p + msg->front.iov_len; 4660 u32 epoch; 4661 u32 map_len; 4662 u32 num_fs; 4663 u32 mount_fscid = (u32)-1; 4664 u8 struct_v, struct_cv; 4665 int err = -EINVAL; 4666 4667 ceph_decode_need(&p, end, sizeof(u32), bad); 4668 epoch = ceph_decode_32(&p); 4669 4670 dout("handle_fsmap 
epoch %u\n", epoch); 4671 4672 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 4673 struct_v = ceph_decode_8(&p); 4674 struct_cv = ceph_decode_8(&p); 4675 map_len = ceph_decode_32(&p); 4676 4677 ceph_decode_need(&p, end, sizeof(u32) * 3, bad); 4678 p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */ 4679 4680 num_fs = ceph_decode_32(&p); 4681 while (num_fs-- > 0) { 4682 void *info_p, *info_end; 4683 u32 info_len; 4684 u8 info_v, info_cv; 4685 u32 fscid, namelen; 4686 4687 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 4688 info_v = ceph_decode_8(&p); 4689 info_cv = ceph_decode_8(&p); 4690 info_len = ceph_decode_32(&p); 4691 ceph_decode_need(&p, end, info_len, bad); 4692 info_p = p; 4693 info_end = p + info_len; 4694 p = info_end; 4695 4696 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); 4697 fscid = ceph_decode_32(&info_p); 4698 namelen = ceph_decode_32(&info_p); 4699 ceph_decode_need(&info_p, info_end, namelen, bad); 4700 4701 if (mds_namespace && 4702 strlen(mds_namespace) == namelen && 4703 !strncmp(mds_namespace, (char *)info_p, namelen)) { 4704 mount_fscid = fscid; 4705 break; 4706 } 4707 } 4708 4709 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); 4710 if (mount_fscid != (u32)-1) { 4711 fsc->client->monc.fs_cluster_id = mount_fscid; 4712 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 4713 0, true); 4714 ceph_monc_renew_subs(&fsc->client->monc); 4715 } else { 4716 err = -ENOENT; 4717 goto err_out; 4718 } 4719 return; 4720 4721 bad: 4722 pr_err("error decoding fsmap\n"); 4723 err_out: 4724 mutex_lock(&mdsc->mutex); 4725 mdsc->mdsmap_err = err; 4726 __wake_requests(mdsc, &mdsc->waiting_for_map); 4727 mutex_unlock(&mdsc->mutex); 4728 } 4729 4730 /* 4731 * handle mds map update. 
4732 */ 4733 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4734 { 4735 u32 epoch; 4736 u32 maplen; 4737 void *p = msg->front.iov_base; 4738 void *end = p + msg->front.iov_len; 4739 struct ceph_mdsmap *newmap, *oldmap; 4740 struct ceph_fsid fsid; 4741 int err = -EINVAL; 4742 4743 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 4744 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 4745 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 4746 return; 4747 epoch = ceph_decode_32(&p); 4748 maplen = ceph_decode_32(&p); 4749 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 4750 4751 /* do we need it? */ 4752 mutex_lock(&mdsc->mutex); 4753 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 4754 dout("handle_map epoch %u <= our %u\n", 4755 epoch, mdsc->mdsmap->m_epoch); 4756 mutex_unlock(&mdsc->mutex); 4757 return; 4758 } 4759 4760 newmap = ceph_mdsmap_decode(&p, end); 4761 if (IS_ERR(newmap)) { 4762 err = PTR_ERR(newmap); 4763 goto bad_unlock; 4764 } 4765 4766 /* swap into place */ 4767 if (mdsc->mdsmap) { 4768 oldmap = mdsc->mdsmap; 4769 mdsc->mdsmap = newmap; 4770 check_new_map(mdsc, newmap, oldmap); 4771 ceph_mdsmap_destroy(oldmap); 4772 } else { 4773 mdsc->mdsmap = newmap; /* first mds map */ 4774 } 4775 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size, 4776 MAX_LFS_FILESIZE); 4777 4778 __wake_requests(mdsc, &mdsc->waiting_for_map); 4779 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP, 4780 mdsc->mdsmap->m_epoch); 4781 4782 mutex_unlock(&mdsc->mutex); 4783 schedule_delayed(mdsc); 4784 return; 4785 4786 bad_unlock: 4787 mutex_unlock(&mdsc->mutex); 4788 bad: 4789 pr_err("error decoding mdsmap %d\n", err); 4790 return; 4791 } 4792 4793 static struct ceph_connection *con_get(struct ceph_connection *con) 4794 { 4795 struct ceph_mds_session *s = con->private; 4796 4797 if (ceph_get_mds_session(s)) 4798 return con; 4799 return NULL; 4800 } 4801 4802 static void con_put(struct ceph_connection 
*con) 4803 { 4804 struct ceph_mds_session *s = con->private; 4805 4806 ceph_put_mds_session(s); 4807 } 4808 4809 /* 4810 * if the client is unresponsive for long enough, the mds will kill 4811 * the session entirely. 4812 */ 4813 static void peer_reset(struct ceph_connection *con) 4814 { 4815 struct ceph_mds_session *s = con->private; 4816 struct ceph_mds_client *mdsc = s->s_mdsc; 4817 4818 pr_warn("mds%d closed our session\n", s->s_mds); 4819 send_mds_reconnect(mdsc, s); 4820 } 4821 4822 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 4823 { 4824 struct ceph_mds_session *s = con->private; 4825 struct ceph_mds_client *mdsc = s->s_mdsc; 4826 int type = le16_to_cpu(msg->hdr.type); 4827 4828 mutex_lock(&mdsc->mutex); 4829 if (__verify_registered_session(mdsc, s) < 0) { 4830 mutex_unlock(&mdsc->mutex); 4831 goto out; 4832 } 4833 mutex_unlock(&mdsc->mutex); 4834 4835 switch (type) { 4836 case CEPH_MSG_MDS_MAP: 4837 ceph_mdsc_handle_mdsmap(mdsc, msg); 4838 break; 4839 case CEPH_MSG_FS_MAP_USER: 4840 ceph_mdsc_handle_fsmap(mdsc, msg); 4841 break; 4842 case CEPH_MSG_CLIENT_SESSION: 4843 handle_session(s, msg); 4844 break; 4845 case CEPH_MSG_CLIENT_REPLY: 4846 handle_reply(s, msg); 4847 break; 4848 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 4849 handle_forward(mdsc, s, msg); 4850 break; 4851 case CEPH_MSG_CLIENT_CAPS: 4852 ceph_handle_caps(s, msg); 4853 break; 4854 case CEPH_MSG_CLIENT_SNAP: 4855 ceph_handle_snap(mdsc, s, msg); 4856 break; 4857 case CEPH_MSG_CLIENT_LEASE: 4858 handle_lease(mdsc, s, msg); 4859 break; 4860 case CEPH_MSG_CLIENT_QUOTA: 4861 ceph_handle_quota(mdsc, s, msg); 4862 break; 4863 4864 default: 4865 pr_err("received unknown message type %d %s\n", type, 4866 ceph_msg_type_name(type)); 4867 } 4868 out: 4869 ceph_msg_put(msg); 4870 } 4871 4872 /* 4873 * authentication 4874 */ 4875 4876 /* 4877 * Note: returned pointer is the address of a structure that's 4878 * managed separately. Caller must *not* attempt to free it. 
4879 */ 4880 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 4881 int *proto, int force_new) 4882 { 4883 struct ceph_mds_session *s = con->private; 4884 struct ceph_mds_client *mdsc = s->s_mdsc; 4885 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 4886 struct ceph_auth_handshake *auth = &s->s_auth; 4887 4888 if (force_new && auth->authorizer) { 4889 ceph_auth_destroy_authorizer(auth->authorizer); 4890 auth->authorizer = NULL; 4891 } 4892 if (!auth->authorizer) { 4893 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 4894 auth); 4895 if (ret) 4896 return ERR_PTR(ret); 4897 } else { 4898 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 4899 auth); 4900 if (ret) 4901 return ERR_PTR(ret); 4902 } 4903 *proto = ac->protocol; 4904 4905 return auth; 4906 } 4907 4908 static int add_authorizer_challenge(struct ceph_connection *con, 4909 void *challenge_buf, int challenge_buf_len) 4910 { 4911 struct ceph_mds_session *s = con->private; 4912 struct ceph_mds_client *mdsc = s->s_mdsc; 4913 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 4914 4915 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer, 4916 challenge_buf, challenge_buf_len); 4917 } 4918 4919 static int verify_authorizer_reply(struct ceph_connection *con) 4920 { 4921 struct ceph_mds_session *s = con->private; 4922 struct ceph_mds_client *mdsc = s->s_mdsc; 4923 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 4924 4925 return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer); 4926 } 4927 4928 static int invalidate_authorizer(struct ceph_connection *con) 4929 { 4930 struct ceph_mds_session *s = con->private; 4931 struct ceph_mds_client *mdsc = s->s_mdsc; 4932 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 4933 4934 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 4935 4936 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 4937 } 4938 4939 static struct ceph_msg 
*mds_alloc_msg(struct ceph_connection *con, 4940 struct ceph_msg_header *hdr, int *skip) 4941 { 4942 struct ceph_msg *msg; 4943 int type = (int) le16_to_cpu(hdr->type); 4944 int front_len = (int) le32_to_cpu(hdr->front_len); 4945 4946 if (con->in_msg) 4947 return con->in_msg; 4948 4949 *skip = 0; 4950 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 4951 if (!msg) { 4952 pr_err("unable to allocate msg type %d len %d\n", 4953 type, front_len); 4954 return NULL; 4955 } 4956 4957 return msg; 4958 } 4959 4960 static int mds_sign_message(struct ceph_msg *msg) 4961 { 4962 struct ceph_mds_session *s = msg->con->private; 4963 struct ceph_auth_handshake *auth = &s->s_auth; 4964 4965 return ceph_auth_sign_message(auth, msg); 4966 } 4967 4968 static int mds_check_message_signature(struct ceph_msg *msg) 4969 { 4970 struct ceph_mds_session *s = msg->con->private; 4971 struct ceph_auth_handshake *auth = &s->s_auth; 4972 4973 return ceph_auth_check_message_signature(auth, msg); 4974 } 4975 4976 static const struct ceph_connection_operations mds_con_ops = { 4977 .get = con_get, 4978 .put = con_put, 4979 .dispatch = dispatch, 4980 .get_authorizer = get_authorizer, 4981 .add_authorizer_challenge = add_authorizer_challenge, 4982 .verify_authorizer_reply = verify_authorizer_reply, 4983 .invalidate_authorizer = invalidate_authorizer, 4984 .peer_reset = peer_reset, 4985 .alloc_msg = mds_alloc_msg, 4986 .sign_message = mds_sign_message, 4987 .check_message_signature = mds_check_message_signature, 4988 }; 4989 4990 /* eof */ 4991