// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
	struct ceph_mds_session *session;
	int nr_caps, nr_realms;
	struct ceph_pagelist *pagelist;
	unsigned msg_version;
	bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

static int parse_reply_info_quota(void **p, void *end,
				  struct ceph_mds_reply_info_in *info)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);
	/* struct_v is expected to be >= 1. we only
	 * understand encoding with struct_compat == 1. */
	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	end = *p + struct_len;
	ceph_decode_64_safe(p, end, info->max_bytes, bad);
	ceph_decode_64_safe(p, end, info->max_files, bad);
	*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = 0;
	u8 struct_v = 0;

	if (features == (u64)-1) {
		u32 struct_len;
		u8 struct_compat;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1.
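		 *
		 * As a reference for readers, each versioned struct in these
		 * replies is wrapped the same way (this only restates the
		 * decode calls above and below, it is not an extra field):
		 *
		 *	u8   struct_v;       version of the encoding
		 *	u8   struct_compat;  oldest compatible version
		 *	le32 struct_len;     length of the payload that follows
		 *
		 * 'end' is then clamped to the payload so any newer fields we
		 * do not understand are simply skipped.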
		 */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
	info->in = *p;
	*p += sizeof(struct ceph_mds_reply_inode) +
		sizeof(*info->in->fragtree.splits) *
		le32_to_cpu(info->in->fragtree.nsplits);

	ceph_decode_32_safe(p, end, info->symlink_len, bad);
	ceph_decode_need(p, end, info->symlink_len, bad);
	info->symlink = *p;
	*p += info->symlink_len;

	ceph_decode_copy_safe(p, end, &info->dir_layout,
			      sizeof(info->dir_layout), bad);
	ceph_decode_32_safe(p, end, info->xattr_len, bad);
	ceph_decode_need(p, end, info->xattr_len, bad);
	info->xattr_data = *p;
	*p += info->xattr_len;

	if (features == (u64)-1) {
		/* inline data */
		ceph_decode_64_safe(p, end, info->inline_version, bad);
		ceph_decode_32_safe(p, end, info->inline_len, bad);
		ceph_decode_need(p, end, info->inline_len, bad);
		info->inline_data = *p;
		*p += info->inline_len;
		/* quota */
		err = parse_reply_info_quota(p, end, info);
		if (err < 0)
			goto out_bad;
		/* pool namespace */
		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
		if (info->pool_ns_len > 0) {
			ceph_decode_need(p, end, info->pool_ns_len, bad);
			info->pool_ns_data = *p;
			*p += info->pool_ns_len;
		}

		/* btime */
		ceph_decode_need(p, end, sizeof(info->btime), bad);
		ceph_decode_copy(p, &info->btime, sizeof(info->btime));

		/* change attribute */
		ceph_decode_64_safe(p, end, info->change_attr, bad);

		/* dir pin */
		if (struct_v >= 2) {
			ceph_decode_32_safe(p, end, info->dir_pin, bad);
		} else {
			info->dir_pin = -ENODATA;
		}

		/* snapshot birth time, remains zero for v<=2 */
		if (struct_v >= 3) {
			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
			ceph_decode_copy(p, &info->snap_btime,
					 sizeof(info->snap_btime));
		} else {
			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
		}

		*p = end;
	} else {
		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
			ceph_decode_64_safe(p, end, info->inline_version, bad);
			ceph_decode_32_safe(p, end, info->inline_len, bad);
			ceph_decode_need(p, end, info->inline_len, bad);
			info->inline_data = *p;
			*p += info->inline_len;
		} else
			info->inline_version = CEPH_INLINE_NONE;

		if (features & CEPH_FEATURE_MDS_QUOTA) {
			err = parse_reply_info_quota(p, end, info);
			if (err < 0)
				goto out_bad;
		} else {
			info->max_bytes = 0;
			info->max_files = 0;
		}

		info->pool_ns_len = 0;
		info->pool_ns_data = NULL;
		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
			if (info->pool_ns_len > 0) {
				ceph_decode_need(p, end, info->pool_ns_len, bad);
				info->pool_ns_data = *p;
				*p += info->pool_ns_len;
			}
		}

		if (features & CEPH_FEATURE_FS_BTIME) {
			ceph_decode_need(p, end, sizeof(info->btime), bad);
			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
			ceph_decode_64_safe(p, end, info->change_attr, bad);
		}

		info->dir_pin = -ENODATA;
		/* info->snap_btime remains zero */
	}
	return 0;
bad:
	err = -EIO;
out_bad:
	return err;
}

static int parse_reply_info_dir(void **p, void *end,
				struct ceph_mds_reply_dirfrag **dirfrag,
				u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
	*dirfrag = *p;
	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
	if (unlikely(*p > end))
		goto bad;
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}

static int parse_reply_info_lease(void **p, void *end,
				  struct ceph_mds_reply_lease **lease,
				  u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**lease), bad);
	*lease = *p;
	*p += sizeof(**lease);
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */
static int parse_reply_info_trace(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features)
{
	int err;

	if (info->head->is_dentry) {
		err = parse_reply_info_in(p, end, &info->diri, features);
		if (err < 0)
			goto out_bad;

		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
		if (err < 0)
			goto out_bad;

		ceph_decode_32_safe(p, end, info->dname_len, bad);
		ceph_decode_need(p, end, info->dname_len, bad);
		info->dname = *p;
		*p += info->dname_len;

		err = parse_reply_info_lease(p, end, &info->dlease, features);
		if (err < 0)
			goto out_bad;
	}

	if (info->head->is_target) {
		err = parse_reply_info_in(p, end, &info->targeti, features);
		if (err < 0)
			goto out_bad;
	}

	if (unlikely(*p != end))
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing mds trace %d\n", err);
	return err;
}

/*
 * parse readdir results
 */
static int parse_reply_info_readdir(void **p, void *end,
				    struct ceph_mds_reply_info_parsed *info,
				    u64 features)
{
	u32 num, i = 0;
	int err;

	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
	if (err < 0)
		goto out_bad;

	ceph_decode_need(p, end, sizeof(num) + 2, bad);
	num = ceph_decode_32(p);
	{
		u16 flags = ceph_decode_16(p);
		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
	}
	if (num == 0)
		goto done;

	BUG_ON(!info->dir_entries);
	if ((unsigned long)(info->dir_entries + num) >
	    (unsigned long)info->dir_entries + info->dir_buf_size) {
		pr_err("dir contents are larger than expected\n");
		WARN_ON(1);
		goto bad;
	}

	info->dir_nr = num;
	while (num) {
		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
		/* dentry */
		ceph_decode_32_safe(p, end, rde->name_len, bad);
		ceph_decode_need(p, end, rde->name_len, bad);
		rde->name = *p;
		*p += rde->name_len;
		dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);

		/* dentry lease */
		err = parse_reply_info_lease(p, end, &rde->lease, features);
		if (err)
			goto out_bad;
		/* inode */
		err = parse_reply_info_in(p, end, &rde->inode, features);
		if (err < 0)
			goto out_bad;
		/* ceph_readdir_prepopulate() will update it */
		rde->offset = 0;
		i++;
		num--;
	}

done:
	/* Skip over any unrecognized fields */
	*p = end;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing dir contents %d\n", err);
	return err;
}

/*
 * parse fcntl F_GETLK results
 */
static int parse_reply_info_filelock(void **p, void *end,
				     struct ceph_mds_reply_info_parsed *info,
				     u64 features)
{
	if (*p + sizeof(*info->filelock_reply) > end)
		goto bad;

	info->filelock_reply = *p;

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}


#if BITS_PER_LONG == 64

#define DELEGATED_INO_AVAILABLE xa_mk_value(1)

static int ceph_parse_deleg_inos(void **p, void *end,
				 struct ceph_mds_session *s)
{
	u32 sets;

	ceph_decode_32_safe(p, end, sets, bad);
	dout("got %u sets of delegated inodes\n", sets);
	while (sets--) {
		u64 start, len, ino;

		ceph_decode_64_safe(p, end, start, bad);
		ceph_decode_64_safe(p, end, len, bad);
		while (len--) {
			int err = xa_insert(&s->s_delegated_inos, ino = start++,
					    DELEGATED_INO_AVAILABLE,
					    GFP_KERNEL);
			if (!err) {
				dout("added delegated inode 0x%llx\n",
				     start - 1);
			} else if (err == -EBUSY) {
				pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
					start - 1);
			} else {
				return err;
			}
		}
	}
	return 0;
bad:
	return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
	unsigned long ino;
	void *val;

	xa_for_each(&s->s_delegated_inos, ino, val) {
		val = xa_erase(&s->s_delegated_inos, ino);
		if (val == DELEGATED_INO_AVAILABLE)
			return ino;
	}
	return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
	return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
			 GFP_KERNEL);
}
#else /* BITS_PER_LONG == 64 */
/*
 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
 * and bottom words?
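 *
 * In the meantime, the 32-bit stub below just skips over the delegated
 * ino ranges (each set is a pair of __le64 values: start, length) so the
 * rest of the reply can still be decoded; no inode numbers are retained.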
 */
static int ceph_parse_deleg_inos(void **p, void *end,
				 struct ceph_mds_session *s)
{
	u32 sets;

	ceph_decode_32_safe(p, end, sets, bad);
	if (sets)
		ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
	return 0;
bad:
	return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
	return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
	return 0;
}
#endif /* BITS_PER_LONG == 64 */

/*
 * parse create results
 */
static int parse_reply_info_create(void **p, void *end,
				   struct ceph_mds_reply_info_parsed *info,
				   u64 features, struct ceph_mds_session *s)
{
	int ret;

	if (features == (u64)-1 ||
	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
		if (*p == end) {
			/* Malformed reply? */
			info->has_create_ino = false;
		} else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
			u8 struct_v, struct_compat;
			u32 len;

			info->has_create_ino = true;
			ceph_decode_8_safe(p, end, struct_v, bad);
			ceph_decode_8_safe(p, end, struct_compat, bad);
			ceph_decode_32_safe(p, end, len, bad);
			ceph_decode_64_safe(p, end, info->ino, bad);
			ret = ceph_parse_deleg_inos(p, end, s);
			if (ret)
				return ret;
		} else {
			/* legacy */
			ceph_decode_64_safe(p, end, info->ino, bad);
			info->has_create_ino = true;
		}
	} else {
		if (*p != end)
			goto bad;
	}

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse extra results
 */
static int parse_reply_info_extra(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features, struct ceph_mds_session *s)
{
	u32 op = le32_to_cpu(info->head->op);

	if (op == CEPH_MDS_OP_GETFILELOCK)
		return parse_reply_info_filelock(p, end, info, features);
	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
		return parse_reply_info_readdir(p, end, info, features);
	else if (op == CEPH_MDS_OP_CREATE)
		return parse_reply_info_create(p, end, info, features, s);
	else
		return -EIO;
}

/*
 * parse entire mds reply
 */
static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
			    struct ceph_mds_reply_info_parsed *info,
			    u64 features)
{
	void *p, *end;
	u32 len;
	int err;

	info->head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

	/* trace */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_trace(&p, p+len, info, features);
		if (err < 0)
			goto out_bad;
	}

	/* extra */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_extra(&p, p+len, info, features, s);
		if (err < 0)
			goto out_bad;
	}

	/* snap blob */
	ceph_decode_32_safe(&p, end, len, bad);
	info->snapblob_len = len;
	info->snapblob = p;
	p += len;

	if (p != end)
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("mds parse_reply err %d\n", err);
	return err;
}

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
	if (!info->dir_entries)
		return;
	free_pages((unsigned long)info->dir_entries,
		   get_order(info->dir_buf_size));
}


/*
 * sessions
 */
const char *ceph_session_state_name(int s)
{
	switch (s) {
	case CEPH_MDS_SESSION_NEW: return "new";
	case CEPH_MDS_SESSION_OPENING: return "opening";
	case CEPH_MDS_SESSION_OPEN: return "open";
	case CEPH_MDS_SESSION_HUNG: return "hung";
	case CEPH_MDS_SESSION_CLOSING: return "closing";
	case CEPH_MDS_SESSION_CLOSED: return "closed";
	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
	case CEPH_MDS_SESSION_REJECTED: return "rejected";
	default: return "???";
	}
}

struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
{
	if (refcount_inc_not_zero(&s->s_ref)) {
		dout("mdsc get_session %p %d -> %d\n", s,
		     refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
		return s;
	} else {
		dout("mdsc get_session %p 0 -- FAIL\n", s);
		return NULL;
	}
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
	dout("mdsc put_session %p %d -> %d\n", s,
	     refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
	if (refcount_dec_and_test(&s->s_ref)) {
		if (s->s_auth.authorizer)
			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
		xa_destroy(&s->s_delegated_inos);
		kfree(s);
	}
}

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
						   int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return NULL;
	return ceph_get_mds_session(mdsc->sessions[mds]);
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return false;
	else
		return true;
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
				       struct ceph_mds_session *s)
{
	if (s->s_mds >= mdsc->max_sessions ||
	    mdsc->sessions[s->s_mds] != s)
		return -ENOENT;
	return 0;
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
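 *
 * The sessions[] array is grown in power-of-two steps as higher ranks are
 * seen.  On success the new session is returned holding two references:
 * one for mdsc->sessions[] and one for the caller.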
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
						 int mds)
{
	struct ceph_mds_session *s;

	if (mds >= mdsc->mdsmap->possible_max_rank)
		return ERR_PTR(-EINVAL);

	s = kzalloc(sizeof(*s), GFP_NOFS);
	if (!s)
		return ERR_PTR(-ENOMEM);

	if (mds >= mdsc->max_sessions) {
		int newmax = 1 << get_count_order(mds + 1);
		struct ceph_mds_session **sa;

		dout("%s: realloc to %d\n", __func__, newmax);
		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
		if (!sa)
			goto fail_realloc;
		if (mdsc->sessions) {
			memcpy(sa, mdsc->sessions,
			       mdsc->max_sessions * sizeof(void *));
			kfree(mdsc->sessions);
		}
		mdsc->sessions = sa;
		mdsc->max_sessions = newmax;
	}

	dout("%s: mds%d\n", __func__, mds);
	s->s_mdsc = mdsc;
	s->s_mds = mds;
	s->s_state = CEPH_MDS_SESSION_NEW;
	s->s_ttl = 0;
	s->s_seq = 0;
	mutex_init(&s->s_mutex);

	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

	spin_lock_init(&s->s_gen_ttl_lock);
	s->s_cap_gen = 1;
	s->s_cap_ttl = jiffies - 1;

	spin_lock_init(&s->s_cap_lock);
	s->s_renew_requested = 0;
	s->s_renew_seq = 0;
	INIT_LIST_HEAD(&s->s_caps);
	s->s_nr_caps = 0;
	refcount_set(&s->s_ref, 1);
	INIT_LIST_HEAD(&s->s_waiting);
	INIT_LIST_HEAD(&s->s_unsafe);
	xa_init(&s->s_delegated_inos);
	s->s_num_cap_releases = 0;
	s->s_cap_reconnect = 0;
	s->s_cap_iterator = NULL;
	INIT_LIST_HEAD(&s->s_cap_releases);
	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

	INIT_LIST_HEAD(&s->s_cap_flushing);

	mdsc->sessions[mds] = s;
	atomic_inc(&mdsc->num_sessions);
	refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	return s;

fail_realloc:
	kfree(s);
	return ERR_PTR(-ENOMEM);
}

/*
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *s)
{
	dout("__unregister_session mds%d %p\n", s->s_mds, s);
	BUG_ON(mdsc->sessions[s->s_mds] != s);
	mdsc->sessions[s->s_mds] = NULL;
	ceph_con_close(&s->s_con);
	ceph_put_mds_session(s);
	atomic_dec(&mdsc->num_sessions);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
	if (req->r_session) {
		ceph_put_mds_session(req->r_session);
		req->r_session = NULL;
	}
}

void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	ceph_mdsc_release_dir_caps(req);
	destroy_reply_info(&req->r_reply_info);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_inode) {
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		/* avoid calling iput_final() in mds dispatch threads */
		ceph_async_iput(req->r_inode);
	}
	if (req->r_parent) {
		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		ceph_async_iput(req->r_parent);
	}
	ceph_async_iput(req->r_target_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry)
		dput(req->r_old_dentry);
	if (req->r_old_dentry_dir) {
		/*
		 * track (and drop pins for) r_old_dentry_dir
		 * separately, since r_old_dentry's d_parent may have
		 * changed between the dir mutex being dropped and
		 * this request being freed.
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		ceph_async_iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	if (req->r_pagelist)
		ceph_pagelist_release(req->r_pagelist);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	WARN_ON_ONCE(!list_empty(&req->r_wait));
	kmem_cache_free(ceph_mds_request_cachep, req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
	struct ceph_mds_request *req;

	req = lookup_request(&mdsc->request_tree, tid);
	if (req)
		ceph_mdsc_get_request(req);

	return req;
}

/*
 * Register an in-flight request, and assign a tid.  Link to the directory
 * we are modifying (if any).
 *
 * Called under mdsc->mutex.
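 *
 * If r_num_caps is set, a cap reservation is made up front; on failure
 * req->r_err is set so __do_request() can bail out early.  When a dir is
 * supplied, it is pinned and the request is linked onto the directory's
 * i_unsafe_dirops list.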
 */
static void __register_request(struct ceph_mds_client *mdsc,
			       struct ceph_mds_request *req,
			       struct inode *dir)
{
	int ret = 0;

	req->r_tid = ++mdsc->last_tid;
	if (req->r_num_caps) {
		ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
					req->r_num_caps);
		if (ret < 0) {
			pr_err("__register_request %p "
			       "failed to reserve caps: %d\n", req, ret);
			/* set req->r_err to fail early from __do_request */
			req->r_err = ret;
			return;
		}
	}
	dout("__register_request %p tid %lld\n", req, req->r_tid);
	ceph_mdsc_get_request(req);
	insert_request(&mdsc->request_tree, req);

	req->r_uid = current_fsuid();
	req->r_gid = current_fsgid();

	if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
		mdsc->oldest_tid = req->r_tid;

	if (dir) {
		struct ceph_inode_info *ci = ceph_inode(dir);

		ihold(dir);
		req->r_unsafe_dir = dir;
		spin_lock(&ci->i_unsafe_lock);
		list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
		spin_unlock(&ci->i_unsafe_lock);
	}
}

static void __unregister_request(struct ceph_mds_client *mdsc,
				 struct ceph_mds_request *req)
{
	dout("__unregister_request %p tid %lld\n", req, req->r_tid);

	/* Never leave an unregistered request on an unsafe list! */
	list_del_init(&req->r_unsafe_item);

	if (req->r_tid == mdsc->oldest_tid) {
		struct rb_node *p = rb_next(&req->r_node);
		mdsc->oldest_tid = 0;
		while (p) {
			struct ceph_mds_request *next_req =
				rb_entry(p, struct ceph_mds_request, r_node);
			if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
				mdsc->oldest_tid = next_req->r_tid;
				break;
			}
			p = rb_next(p);
		}
	}

	erase_request(&mdsc->request_tree, req);

	if (req->r_unsafe_dir) {
		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_dir_item);
		spin_unlock(&ci->i_unsafe_lock);
	}
	if (req->r_target_inode &&
	    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
		spin_lock(&ci->i_unsafe_lock);
		list_del_init(&req->r_unsafe_target_item);
		spin_unlock(&ci->i_unsafe_lock);
	}

	if (req->r_unsafe_dir) {
		/* avoid calling iput_final() in mds dispatch threads */
		ceph_async_iput(req->r_unsafe_dir);
		req->r_unsafe_dir = NULL;
	}

	complete_all(&req->r_safe_completion);

	ceph_mdsc_put_request(req);
}

/*
 * Walk back up the dentry tree until we hit a dentry representing a
 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
 * when calling this) to ensure that the objects won't disappear while we're
 * working with them. Once we hit a candidate dentry, we attempt to take a
 * reference to it, and return that as the result.
 */
static struct inode *get_nonsnap_parent(struct dentry *dentry)
{
	struct inode *inode = NULL;

	while (dentry && !IS_ROOT(dentry)) {
		inode = d_inode_rcu(dentry);
		if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
			break;
		dentry = dentry->d_parent;
	}
	if (inode)
		inode = igrab(inode);
	return inode;
}

/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.
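 * (USE_AUTH_MDS prefers the authoritative copy, USE_ANY_MDS will also
 * accept a replica from the frag's dist list, and USE_RANDOM_MDS skips
 * straight to the random fallback.)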
 * If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req,
			bool *random)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	int mode = req->r_direct_mode;
	int mds = -1;
	u32 hash = req->r_direct_hash;
	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);

	if (random)
		*random = false;

	/*
	 * is there a specific mds we should try?  ignore hint if we have
	 * no session and the mds is not up (active or recovering).
	 */
	if (req->r_resend_mds >= 0 &&
	    (__have_session(mdsc, req->r_resend_mds) ||
	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
		dout("%s using resend_mds mds%d\n", __func__,
		     req->r_resend_mds);
		return req->r_resend_mds;
	}

	if (mode == USE_RANDOM_MDS)
		goto random;

	inode = NULL;
	if (req->r_inode) {
		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
			inode = req->r_inode;
			ihold(inode);
		} else {
			/* req->r_dentry is non-null for LSSNAP request */
			rcu_read_lock();
			inode = get_nonsnap_parent(req->r_dentry);
			rcu_read_unlock();
			dout("%s using snapdir's parent %p\n", __func__, inode);
		}
	} else if (req->r_dentry) {
		/* ignore race with rename; old or new d_parent is okay */
		struct dentry *parent;
		struct inode *dir;

		rcu_read_lock();
		parent = READ_ONCE(req->r_dentry->d_parent);
		dir = req->r_parent ? : d_inode_rcu(parent);

		if (!dir || dir->i_sb != mdsc->fsc->sb) {
			/* not this fs or parent went negative */
			inode = d_inode(req->r_dentry);
			if (inode)
				ihold(inode);
		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
			/* direct snapped/virtual snapdir requests
			 * based on parent dir inode */
			inode = get_nonsnap_parent(parent);
			dout("%s using nonsnap parent %p\n", __func__, inode);
		} else {
			/* dentry target */
			inode = d_inode(req->r_dentry);
			if (!inode || mode == USE_AUTH_MDS) {
				/* dir + name */
				inode = igrab(dir);
				hash = ceph_dentry_hash(dir, req->r_dentry);
				is_hash = true;
			} else {
				ihold(inode);
			}
		}
		rcu_read_unlock();
	}

	dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
	     hash, mode);
	if (!inode)
		goto random;
	ci = ceph_inode(inode);

	if (is_hash && S_ISDIR(inode->i_mode)) {
		struct ceph_inode_frag frag;
		int found;

		ceph_choose_frag(ci, hash, &frag, &found);
		if (found) {
			if (mode == USE_ANY_MDS && frag.ndist > 0) {
				u8 r;

				/* choose a random replica */
				get_random_bytes(&r, 1);
				r %= frag.ndist;
				mds = frag.dist[r];
				dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
				     __func__, inode, ceph_vinop(inode),
				     frag.frag, mds, (int)r, frag.ndist);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE &&
				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
					goto out;
			}

			/* since this file/dir wasn't known to be
			 * replicated, look for the authoritative mds.
			 */
			if (frag.mds >= 0) {
				/* choose auth mds */
				mds = frag.mds;
				dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
				     __func__, inode, ceph_vinop(inode),
				     frag.frag, mds);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE) {
					if (mode == USE_ANY_MDS &&
					    !ceph_mdsmap_is_laggy(mdsc->mdsmap,
								  mds))
						goto out;
				}
			}
			mode = USE_AUTH_MDS;
		}
	}

	spin_lock(&ci->i_ceph_lock);
	cap = NULL;
	if (mode == USE_AUTH_MDS)
		cap = ci->i_auth_cap;
	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		ceph_async_iput(inode);
		goto random;
	}
	mds = cap->session->s_mds;
	dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
	     inode, ceph_vinop(inode), mds,
	     cap == ci->i_auth_cap ? "auth " : "", cap);
	spin_unlock(&ci->i_ceph_lock);
out:
	/* avoid calling iput_final() while holding mdsc->mutex or
	 * in mds dispatch threads */
	ceph_async_iput(inode);
	return mds;

random:
	if (random)
		*random = true;

	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
	dout("%s chose random mds%d\n", __func__, mds);
	return mds;
}


/*
 * session messages
 */
static struct ceph_msg *create_session_msg(u32 op, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
			   false);
	if (!msg) {
		pr_err("create_session_msg ENOMEM creating msg\n");
		return NULL;
	}
	h = msg->front.iov_base;
	h->op = cpu_to_le32(op);
	h->seq = cpu_to_le64(seq);

	return msg;
}

/*
 * Feature bits advertised to the MDS are encoded as a length-prefixed
 * bitmap; FEATURE_BYTES() sizes it to cover the highest bit in
 * feature_bits[], rounded up to a multiple of 64 bits.
 */
static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
static void encode_supported_features(void **p, void *end)
{
	static const size_t count = ARRAY_SIZE(feature_bits);

	if (count > 0) {
		size_t i;
		size_t size = FEATURE_BYTES(count);

		BUG_ON(*p + 4 + size > end);
		ceph_encode_32(p, size);
		memset(*p, 0, size);
		for (i = 0; i < count; i++)
			((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8);
		*p += size;
	} else {
		BUG_ON(*p + 4 > end);
		ceph_encode_32(p, 0);
	}
}

/*
 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
 * to include additional client metadata fields.
 */
static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;
	int i = -1;
	int extra_bytes = 0;
	int metadata_key_count = 0;
	struct ceph_options *opt = mdsc->fsc->client->options;
	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
	size_t size, count;
	void *p, *end;

	const char* metadata[][2] = {
		{"hostname", mdsc->nodename},
		{"kernel_version", init_utsname()->release},
		{"entity_id", opt->name ? : ""},
		{"root", fsopt->server_path ? : "/"},
		{NULL, NULL}
	};

	/* Calculate serialized length of metadata */
	extra_bytes = 4;  /* map length */
	for (i = 0; metadata[i][0]; ++i) {
		extra_bytes += 8 + strlen(metadata[i][0]) +
			strlen(metadata[i][1]);
		metadata_key_count++;
	}

	/* supported features */
	size = 0;
	count = ARRAY_SIZE(feature_bits);
	if (count > 0)
		size = FEATURE_BYTES(count);
	extra_bytes += 4 + size;

	/* Allocate the message */
	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
			   GFP_NOFS, false);
	if (!msg) {
		pr_err("create_session_msg ENOMEM creating msg\n");
		return NULL;
	}
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	h = p;
	h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
	h->seq = cpu_to_le64(seq);

	/*
	 * Serialize client metadata into waiting buffer space, using
	 * the format that userspace expects for map<string, string>
	 *
	 * ClientSession messages with metadata are v3
	 */
	msg->hdr.version = cpu_to_le16(3);
	msg->hdr.compat_version = cpu_to_le16(1);

	/* The write pointer, following the session_head structure */
	p += sizeof(*h);

	/* Number of entries in the map */
	ceph_encode_32(&p, metadata_key_count);

	/* Two length-prefixed strings for each entry in the map */
	for (i = 0; metadata[i][0]; ++i) {
		size_t const key_len = strlen(metadata[i][0]);
		size_t const val_len = strlen(metadata[i][1]);

		ceph_encode_32(&p, key_len);
		memcpy(p, metadata[i][0], key_len);
		p += key_len;
		ceph_encode_32(&p, val_len);
		memcpy(p, metadata[i][1], val_len);
		p += val_len;
	}

	encode_supported_features(&p, end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	return msg;
}

/*
 * send session open request.
 *
 * called under mdsc->mutex
 */
static int __open_session(struct ceph_mds_client *mdsc,
			  struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	int mstate;
	int mds = session->s_mds;

	/* wait for mds to go active?
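	 * (we do not actually block here; the current map state is only
	 * logged and the open request is sent regardless)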
	 */
	mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
	dout("open_session to mds%d (%s)\n", mds,
	     ceph_mds_state_name(mstate));
	session->s_state = CEPH_MDS_SESSION_OPENING;
	session->s_renew_requested = jiffies;

	/* send connect message */
	msg = create_session_open_msg(mdsc, session->s_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 0;
}

/*
 * open sessions for any export targets for the given mds
 *
 * called under mdsc->mutex
 */
static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
	struct ceph_mds_session *session;

	session = __ceph_lookup_mds_session(mdsc, target);
	if (!session) {
		session = register_session(mdsc, target);
		if (IS_ERR(session))
			return session;
	}
	if (session->s_state == CEPH_MDS_SESSION_NEW ||
	    session->s_state == CEPH_MDS_SESSION_CLOSING)
		__open_session(mdsc, session);

	return session;
}

struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
	struct ceph_mds_session *session;

	dout("open_export_target_session to mds%d\n", target);

	mutex_lock(&mdsc->mutex);
	session = __open_export_target_session(mdsc, target);
	mutex_unlock(&mdsc->mutex);

	return session;
}

static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
					  struct ceph_mds_session *session)
{
	struct ceph_mds_info *mi;
	struct ceph_mds_session *ts;
	int i, mds = session->s_mds;

	if (mds >= mdsc->mdsmap->possible_max_rank)
		return;

	mi = &mdsc->mdsmap->m_info[mds];
	dout("open_export_target_sessions for mds%d (%d targets)\n",
	     session->s_mds, mi->num_export_targets);

	for (i = 0; i < mi->num_export_targets; i++) {
		ts = __open_export_target_session(mdsc, mi->export_targets[i]);
		if (!IS_ERR(ts))
			ceph_put_mds_session(ts);
	}
}

void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
					   struct ceph_mds_session *session)
{
	mutex_lock(&mdsc->mutex);
	__open_export_target_sessions(mdsc, session);
	mutex_unlock(&mdsc->mutex);
}

/*
 * session caps
 */

static void detach_cap_releases(struct ceph_mds_session *session,
				struct list_head *target)
{
	lockdep_assert_held(&session->s_cap_lock);

	list_splice_init(&session->s_cap_releases, target);
	session->s_num_cap_releases = 0;
	dout("dispose_cap_releases mds%d\n", session->s_mds);
}

static void dispose_cap_releases(struct ceph_mds_client *mdsc,
				 struct list_head *dispose)
{
	while (!list_empty(dispose)) {
		struct ceph_cap *cap;
		/* zero out the in-progress message */
		cap = list_first_entry(dispose, struct ceph_cap, session_caps);
		list_del(&cap->session_caps);
		ceph_put_cap(mdsc, cap);
	}
}

static void cleanup_session_requests(struct ceph_mds_client *mdsc,
				     struct ceph_mds_session *session)
{
	struct ceph_mds_request *req;
	struct rb_node *p;
	struct ceph_inode_info *ci;

	dout("cleanup_session_requests mds%d\n", session->s_mds);
	mutex_lock(&mdsc->mutex);
	while (!list_empty(&session->s_unsafe)) {
		req = list_first_entry(&session->s_unsafe,
				       struct ceph_mds_request, r_unsafe_item);
		pr_warn_ratelimited(" dropping unsafe request %llu\n",
				    req->r_tid);
		if (req->r_target_inode) {
			/* dropping unsafe change of inode's attributes */
			ci = ceph_inode(req->r_target_inode);
			errseq_set(&ci->i_meta_err, -EIO);
		}
		if (req->r_unsafe_dir) {
			/* dropping unsafe directory operation */
			ci = ceph_inode(req->r_unsafe_dir);
			errseq_set(&ci->i_meta_err, -EIO);
		}
		__unregister_request(mdsc, req);
	}
	/* zero r_attempts, so kick_requests() will re-send requests */
	p = rb_first(&mdsc->request_tree);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		p = rb_next(p);
		if (req->r_session &&
		    req->r_session->s_mds == session->s_mds)
			req->r_attempts = 0;
	}
	mutex_unlock(&mdsc->mutex);
}

/*
 * Helper to safely iterate over all caps associated with a session, with
 * special care taken to handle a racing __ceph_remove_cap().
 *
 * Caller must hold session s_mutex.
 */
int ceph_iterate_session_caps(struct ceph_mds_session *session,
			      int (*cb)(struct inode *, struct ceph_cap *,
					void *), void *arg)
{
	struct list_head *p;
	struct ceph_cap *cap;
	struct inode *inode, *last_inode = NULL;
	struct ceph_cap *old_cap = NULL;
	int ret;

	dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
	spin_lock(&session->s_cap_lock);
	p = session->s_caps.next;
	while (p != &session->s_caps) {
		cap = list_entry(p, struct ceph_cap, session_caps);
		inode = igrab(&cap->ci->vfs_inode);
		if (!inode) {
			p = p->next;
			continue;
		}
		session->s_cap_iterator = cap;
		spin_unlock(&session->s_cap_lock);

		if (last_inode) {
			/* avoid calling iput_final() while holding
			 * s_mutex or in mds dispatch threads */
			ceph_async_iput(last_inode);
			last_inode = NULL;
		}
		if (old_cap) {
			ceph_put_cap(session->s_mdsc, old_cap);
			old_cap = NULL;
		}

		ret = cb(inode, cap, arg);
		last_inode = inode;

		spin_lock(&session->s_cap_lock);
		p = p->next;
		if (!cap->ci) {
			dout("iterate_session_caps finishing cap %p removal\n",
			     cap);
			BUG_ON(cap->session != session);
			cap->session = NULL;
			list_del_init(&cap->session_caps);
			session->s_nr_caps--;
			if (cap->queue_release)
				__ceph_queue_cap_release(session, cap);
			else
				old_cap = cap;  /* put_cap it w/o locks held */
		}
		if (ret < 0)
			goto out;
	}
	ret = 0;
out:
	session->s_cap_iterator = NULL;
	spin_unlock(&session->s_cap_lock);

	ceph_async_iput(last_inode);
	if (old_cap)
		ceph_put_cap(session->s_mdsc, old_cap);

	return ret;
}

static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
				  void *arg)
{
	struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	LIST_HEAD(to_remove);
	bool dirty_dropped = false;
	bool invalidate = false;

	dout("removing cap %p, ci is %p, inode is %p\n",
	     cap, ci, &ci->vfs_inode);
	spin_lock(&ci->i_ceph_lock);
	__ceph_remove_cap(cap, false);
	if (!ci->i_auth_cap) {
		struct ceph_cap_flush *cf;
		struct ceph_mds_client *mdsc = fsc->mdsc;

		if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
			if (inode->i_data.nrpages > 0)
				invalidate = true;
			if (ci->i_wrbuffer_ref > 0)
				mapping_set_error(&inode->i_data, -EIO);
		}

		while (!list_empty(&ci->i_cap_flush_list)) {
			cf = list_first_entry(&ci->i_cap_flush_list,
					      struct ceph_cap_flush, i_list);
			list_move(&cf->i_list, &to_remove);
		}

		spin_lock(&mdsc->cap_dirty_lock);

		list_for_each_entry(cf, &to_remove, i_list)
			list_del(&cf->g_list);

		if (!list_empty(&ci->i_dirty_item)) {
			pr_warn_ratelimited(
				" dropping dirty %s state for %p %lld\n",
				ceph_cap_string(ci->i_dirty_caps),
				inode, ceph_ino(inode));
			ci->i_dirty_caps = 0;
			list_del_init(&ci->i_dirty_item);
			dirty_dropped = true;
		}
		if (!list_empty(&ci->i_flushing_item)) {
			pr_warn_ratelimited(
				" dropping dirty+flushing %s state for %p %lld\n",
				ceph_cap_string(ci->i_flushing_caps),
				inode, ceph_ino(inode));
			ci->i_flushing_caps = 0;
			list_del_init(&ci->i_flushing_item);
			mdsc->num_cap_flushing--;
			dirty_dropped = true;
		}
		spin_unlock(&mdsc->cap_dirty_lock);

		if (dirty_dropped) {
			errseq_set(&ci->i_meta_err, -EIO);

			if (ci->i_wrbuffer_ref_head == 0 &&
			    ci->i_wr_ref == 0 &&
			    ci->i_dirty_caps == 0 &&
			    ci->i_flushing_caps == 0) {
				ceph_put_snap_context(ci->i_head_snapc);
				ci->i_head_snapc = NULL;
			}
		}

		if (atomic_read(&ci->i_filelock_ref) > 0) {
			/* make further file lock syscall return -EIO */
			ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
			pr_warn_ratelimited(" dropping file locks for %p %lld\n",
					    inode, ceph_ino(inode));
		}

		if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
			list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
			ci->i_prealloc_cap_flush = NULL;
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	while (!list_empty(&to_remove)) {
		struct ceph_cap_flush *cf;
		cf = list_first_entry(&to_remove,
				      struct ceph_cap_flush, i_list);
		list_del(&cf->i_list);
		ceph_free_cap_flush(cf);
	}

	wake_up_all(&ci->i_cap_wq);
	if (invalidate)
		ceph_queue_invalidate(inode);
	if (dirty_dropped)
		iput(inode);
	return 0;
}

/*
 * caller must hold session s_mutex
 */
static void remove_session_caps(struct ceph_mds_session *session)
{
	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
	struct super_block *sb = fsc->sb;
	LIST_HEAD(dispose);

	dout("remove_session_caps on %p\n", session);
	ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);

	wake_up_all(&fsc->mdsc->cap_flushing_wq);

	spin_lock(&session->s_cap_lock);
	if (session->s_nr_caps > 0) {
		struct inode *inode;
		struct ceph_cap *cap, *prev = NULL;
		struct ceph_vino vino;
		/*
		 * iterate_session_caps() skips inodes that are being
		 * deleted, so we need to wait until deletions are complete.
		 * __wait_on_freeing_inode() is designed for the job, but it
		 * is not exported, so use the lookup-inode function to
		 * access it.
		 */
		while (!list_empty(&session->s_caps)) {
			cap = list_entry(session->s_caps.next,
					 struct ceph_cap, session_caps);
			if (cap == prev)
				break;
			prev = cap;
			vino = cap->ci->i_vino;
			spin_unlock(&session->s_cap_lock);

			inode = ceph_find_inode(sb, vino);
			/* avoid calling iput_final() while holding s_mutex */
			ceph_async_iput(inode);

			spin_lock(&session->s_cap_lock);
		}
	}

	// drop cap expires and unlock s_cap_lock
	detach_cap_releases(session, &dispose);

	BUG_ON(session->s_nr_caps > 0);
	BUG_ON(!list_empty(&session->s_cap_flushing));
	spin_unlock(&session->s_cap_lock);
	dispose_cap_releases(session->s_mdsc, &dispose);
}

enum {
	RECONNECT,
	RENEWCAPS,
	FORCE_RO,
};

/*
 * wake up any threads waiting on this session's caps.  if the cap is
 * old (didn't get renewed on the client reconnect), remove it now.
 *
 * caller must hold s_mutex.
 */
static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
			      void *arg)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	unsigned long ev = (unsigned long)arg;

	if (ev == RECONNECT) {
		spin_lock(&ci->i_ceph_lock);
		ci->i_wanted_max_size = 0;
		ci->i_requested_max_size = 0;
		spin_unlock(&ci->i_ceph_lock);
	} else if (ev == RENEWCAPS) {
		if (cap->cap_gen < cap->session->s_cap_gen) {
			/* mds did not re-issue stale cap */
			spin_lock(&ci->i_ceph_lock);
			cap->issued = cap->implemented = CEPH_CAP_PIN;
			spin_unlock(&ci->i_ceph_lock);
		}
	} else if (ev == FORCE_RO) {
	}
	wake_up_all(&ci->i_cap_wq);
	return 0;
}

static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
{
	dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
	ceph_iterate_session_caps(session, wake_up_session_cb,
				  (void *)(unsigned long)ev);
}

/*
 * Send periodic message to MDS renewing all currently held caps.  The
 * ack will reset the expiration for all caps from this session.
 *
 * caller holds s_mutex
 */
static int send_renew_caps(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	struct ceph_msg *msg;
	int state;

	if (time_after_eq(jiffies, session->s_cap_ttl) &&
	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
		pr_info("mds%d caps stale\n", session->s_mds);
	session->s_renew_requested = jiffies;

	/* do not try to renew caps until a recovering mds has reconnected
	 * with its clients.
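	 * (i.e. its mdsmap state must be at least CEPH_MDS_STATE_RECONNECT,
	 * checked below)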
	 */
	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
	if (state < CEPH_MDS_STATE_RECONNECT) {
		dout("send_renew_caps ignoring mds%d (%s)\n",
		     session->s_mds, ceph_mds_state_name(state));
		return 0;
	}

	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
	     ceph_mds_state_name(state));
	msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
				 ++session->s_renew_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 0;
}

static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
			     struct ceph_mds_session *session, u64 seq)
{
	struct ceph_msg *msg;

	dout("send_flushmsg_ack to mds%d (%s) seq %lld\n",
	     session->s_mds, ceph_session_state_name(session->s_state), seq);
	msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 0;
}


/*
 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
 *
 * Called under session->s_mutex
 */
static void renewed_caps(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session, int is_renew)
{
	int was_stale;
	int wake = 0;

	spin_lock(&session->s_cap_lock);
	was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);

	session->s_cap_ttl = session->s_renew_requested +
		mdsc->mdsmap->m_session_timeout*HZ;

	if (was_stale) {
		if (time_before(jiffies, session->s_cap_ttl)) {
			pr_info("mds%d caps renewed\n", session->s_mds);
			wake = 1;
		} else {
			pr_info("mds%d caps still stale\n", session->s_mds);
		}
	}
	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
	     time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
	spin_unlock(&session->s_cap_lock);

	if (wake)
		wake_up_session_caps(session, RENEWCAPS);
}

/*
 * send a session close request
 */
static int request_close_session(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *session)
{
	struct ceph_msg *msg;

	dout("request_close_session mds%d state %s seq %lld\n",
	     session->s_mds, ceph_session_state_name(session->s_state),
	     session->s_seq);
	msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 1;
}

/*
 * Called with s_mutex held.
 */
static int __close_session(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
		return 0;
	session->s_state = CEPH_MDS_SESSION_CLOSING;
	return request_close_session(mdsc, session);
}

static bool drop_negative_children(struct dentry *dentry)
{
	struct dentry *child;
	bool all_negative = true;

	if (!d_is_dir(dentry))
		goto out;

	spin_lock(&dentry->d_lock);
	list_for_each_entry(child, &dentry->d_subdirs, d_child) {
		if (d_really_is_positive(child)) {
			all_negative = false;
			break;
		}
	}
	spin_unlock(&dentry->d_lock);

	if (all_negative)
		shrink_dcache_parent(dentry);
out:
	return all_negative;
}

/*
 * Trim old(er) caps.
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped too.
 *
 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
 * memory pressure from the MDS, though, so it needn't be perfect.
 */
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
	int *remaining = arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int used, wanted, oissued, mine;

	if (*remaining <= 0)
		return -1;

	spin_lock(&ci->i_ceph_lock);
	mine = cap->issued | cap->implemented;
	used = __ceph_caps_used(ci);
	wanted = __ceph_caps_file_wanted(ci);
	oissued = __ceph_caps_issued_other(ci, cap);

	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
	     ceph_cap_string(used), ceph_cap_string(wanted));
	if (cap == ci->i_auth_cap) {
		if (ci->i_dirty_caps || ci->i_flushing_caps ||
		    !list_empty(&ci->i_cap_snaps))
			goto out;
		if ((used | wanted) & CEPH_CAP_ANY_WR)
			goto out;
		/* Note: it's possible that i_filelock_ref becomes non-zero
		 * after dropping auth caps. It doesn't hurt because reply
		 * of lock mds request will re-add auth caps. */
		if (atomic_read(&ci->i_filelock_ref) > 0)
			goto out;
	}
	/* The inode has cached pages, but it's no longer used.
	 * We can safely drop it. */
	if (S_ISREG(inode->i_mode) &&
	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
	    !(oissued & CEPH_CAP_FILE_CACHE)) {
		used = 0;
		oissued = 0;
	}
	if ((used | wanted) & ~oissued & mine)
		goto out;   /* we need these caps */

	if (oissued) {
		/* we aren't the only cap.. just remove us */
		__ceph_remove_cap(cap, true);
		(*remaining)--;
	} else {
		struct dentry *dentry;
		/* try dropping referring dentries */
		spin_unlock(&ci->i_ceph_lock);
		dentry = d_find_any_alias(inode);
		if (dentry && drop_negative_children(dentry)) {
			int count;
			dput(dentry);
			d_prune_aliases(inode);
			count = atomic_read(&inode->i_count);
			if (count == 1)
				(*remaining)--;
			dout("trim_caps_cb %p cap %p pruned, count now %d\n",
			     inode, cap, count);
		} else {
			dput(dentry);
		}
		return 0;
	}

out:
	spin_unlock(&ci->i_ceph_lock);
	return 0;
}

/*
 * Trim session cap count down to some max number.
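 *
 * This walks the session's caps with trim_caps_cb() above and then flushes
 * whatever cap releases were queued as a result.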
 */
int ceph_trim_caps(struct ceph_mds_client *mdsc,
		   struct ceph_mds_session *session,
		   int max_caps)
{
	int trim_caps = session->s_nr_caps - max_caps;

	dout("trim_caps mds%d start: %d / %d, trim %d\n",
	     session->s_mds, session->s_nr_caps, max_caps, trim_caps);
	if (trim_caps > 0) {
		int remaining = trim_caps;

		ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
		dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
		     session->s_mds, session->s_nr_caps, max_caps,
		     trim_caps - remaining);
	}

	ceph_flush_cap_releases(mdsc, session);
	return 0;
}

static int check_caps_flush(struct ceph_mds_client *mdsc,
			    u64 want_flush_tid)
{
	int ret = 1;

	spin_lock(&mdsc->cap_dirty_lock);
	if (!list_empty(&mdsc->cap_flush_list)) {
		struct ceph_cap_flush *cf =
			list_first_entry(&mdsc->cap_flush_list,
					 struct ceph_cap_flush, g_list);
		if (cf->tid <= want_flush_tid) {
			dout("check_caps_flush still flushing tid "
			     "%llu <= %llu\n", cf->tid, want_flush_tid);
			ret = 0;
		}
	}
	spin_unlock(&mdsc->cap_dirty_lock);
	return ret;
}

/*
 * wait for all dirty inode data to be flushed to disk.
 *
 * returns when we've flushed through want_flush_tid
 */
static void wait_caps_flush(struct ceph_mds_client *mdsc,
			    u64 want_flush_tid)
{
	dout("check_caps_flush want %llu\n", want_flush_tid);

	wait_event(mdsc->cap_flushing_wq,
		   check_caps_flush(mdsc, want_flush_tid));

	dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
}

/*
 * called under s_mutex
 */
static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
{
	struct ceph_msg *msg = NULL;
	struct ceph_mds_cap_release *head;
	struct ceph_mds_cap_item *item;
	struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
	struct ceph_cap *cap;
	LIST_HEAD(tmp_list);
	int num_cap_releases;
	__le32 barrier, *cap_barrier;

	down_read(&osdc->lock);
	barrier = cpu_to_le32(osdc->epoch_barrier);
	up_read(&osdc->lock);

	spin_lock(&session->s_cap_lock);
again:
	list_splice_init(&session->s_cap_releases, &tmp_list);
	num_cap_releases = session->s_num_cap_releases;
	session->s_num_cap_releases = 0;
	spin_unlock(&session->s_cap_lock);

	while (!list_empty(&tmp_list)) {
		if (!msg) {
			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
					   PAGE_SIZE, GFP_NOFS, false);
			if (!msg)
				goto out_err;
			head = msg->front.iov_base;
			head->num = cpu_to_le32(0);
			msg->front.iov_len = sizeof(*head);

			msg->hdr.version = cpu_to_le16(2);
			msg->hdr.compat_version = cpu_to_le16(1);
		}

		cap = list_first_entry(&tmp_list, struct ceph_cap,
				       session_caps);
		list_del(&cap->session_caps);
		num_cap_releases--;

		head = msg->front.iov_base;
		put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
				   &head->num);
		item = msg->front.iov_base + msg->front.iov_len;
		item->ino = cpu_to_le64(cap->cap_ino);
		item->cap_id = cpu_to_le64(cap->cap_id);
		item->migrate_seq = cpu_to_le32(cap->mseq);
		item->seq = cpu_to_le32(cap->issue_seq);
		msg->front.iov_len += sizeof(*item);

		ceph_put_cap(mdsc, cap);

		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
			// Append cap_barrier field
msg->front.iov_base + msg->front.iov_len; 2035 *cap_barrier = barrier; 2036 msg->front.iov_len += sizeof(*cap_barrier); 2037 2038 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2039 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2040 ceph_con_send(&session->s_con, msg); 2041 msg = NULL; 2042 } 2043 } 2044 2045 BUG_ON(num_cap_releases != 0); 2046 2047 spin_lock(&session->s_cap_lock); 2048 if (!list_empty(&session->s_cap_releases)) 2049 goto again; 2050 spin_unlock(&session->s_cap_lock); 2051 2052 if (msg) { 2053 // Append cap_barrier field 2054 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2055 *cap_barrier = barrier; 2056 msg->front.iov_len += sizeof(*cap_barrier); 2057 2058 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2059 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2060 ceph_con_send(&session->s_con, msg); 2061 } 2062 return; 2063 out_err: 2064 pr_err("send_cap_releases mds%d, failed to allocate message\n", 2065 session->s_mds); 2066 spin_lock(&session->s_cap_lock); 2067 list_splice(&tmp_list, &session->s_cap_releases); 2068 session->s_num_cap_releases += num_cap_releases; 2069 spin_unlock(&session->s_cap_lock); 2070 } 2071 2072 static void ceph_cap_release_work(struct work_struct *work) 2073 { 2074 struct ceph_mds_session *session = 2075 container_of(work, struct ceph_mds_session, s_cap_release_work); 2076 2077 mutex_lock(&session->s_mutex); 2078 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2079 session->s_state == CEPH_MDS_SESSION_HUNG) 2080 ceph_send_cap_releases(session->s_mdsc, session); 2081 mutex_unlock(&session->s_mutex); 2082 ceph_put_mds_session(session); 2083 } 2084 2085 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, 2086 struct ceph_mds_session *session) 2087 { 2088 if (mdsc->stopping) 2089 return; 2090 2091 ceph_get_mds_session(session); 2092 if (queue_work(mdsc->fsc->cap_wq, 2093 &session->s_cap_release_work)) { 2094 dout("cap release work queued\n"); 2095 } else { 2096 ceph_put_mds_session(session); 2097 dout("failed to queue cap release work\n"); 2098 } 2099 } 2100 2101 /* 2102 * caller holds session->s_cap_lock 2103 */ 2104 void __ceph_queue_cap_release(struct ceph_mds_session *session, 2105 struct ceph_cap *cap) 2106 { 2107 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2108 session->s_num_cap_releases++; 2109 2110 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2111 ceph_flush_cap_releases(session->s_mdsc, session); 2112 } 2113 2114 static void ceph_cap_reclaim_work(struct work_struct *work) 2115 { 2116 struct ceph_mds_client *mdsc = 2117 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2118 int ret = ceph_trim_dentries(mdsc); 2119 if (ret == -EAGAIN) 2120 ceph_queue_cap_reclaim_work(mdsc); 2121 } 2122 2123 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2124 { 2125 if (mdsc->stopping) 2126 return; 2127 2128 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2129 dout("caps reclaim work queued\n"); 2130 } else { 2131 dout("failed to queue caps release work\n"); 2132 } 2133 } 2134 2135 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) 2136 { 2137 int val; 2138 if (!nr) 2139 return; 2140 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2141 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2142 atomic_set(&mdsc->cap_reclaim_pending, 0); 2143 ceph_queue_cap_reclaim_work(mdsc); 2144 } 2145 } 2146 2147 /* 2148 * requests 2149 */ 2150 2151 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2152 struct inode *dir) 
2153 { 2154 struct ceph_inode_info *ci = ceph_inode(dir); 2155 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2156 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2157 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2158 unsigned int num_entries; 2159 int order; 2160 2161 spin_lock(&ci->i_ceph_lock); 2162 num_entries = ci->i_files + ci->i_subdirs; 2163 spin_unlock(&ci->i_ceph_lock); 2164 num_entries = max(num_entries, 1U); 2165 num_entries = min(num_entries, opt->max_readdir); 2166 2167 order = get_order(size * num_entries); 2168 while (order >= 0) { 2169 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2170 __GFP_NOWARN, 2171 order); 2172 if (rinfo->dir_entries) 2173 break; 2174 order--; 2175 } 2176 if (!rinfo->dir_entries) 2177 return -ENOMEM; 2178 2179 num_entries = (PAGE_SIZE << order) / size; 2180 num_entries = min(num_entries, opt->max_readdir); 2181 2182 rinfo->dir_buf_size = PAGE_SIZE << order; 2183 req->r_num_caps = num_entries + 1; 2184 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2185 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2186 return 0; 2187 } 2188 2189 /* 2190 * Create an mds request. 2191 */ 2192 struct ceph_mds_request * 2193 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2194 { 2195 struct ceph_mds_request *req; 2196 2197 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2198 if (!req) 2199 return ERR_PTR(-ENOMEM); 2200 2201 mutex_init(&req->r_fill_mutex); 2202 req->r_mdsc = mdsc; 2203 req->r_started = jiffies; 2204 req->r_resend_mds = -1; 2205 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2206 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2207 req->r_fmode = -1; 2208 kref_init(&req->r_kref); 2209 RB_CLEAR_NODE(&req->r_node); 2210 INIT_LIST_HEAD(&req->r_wait); 2211 init_completion(&req->r_completion); 2212 init_completion(&req->r_safe_completion); 2213 INIT_LIST_HEAD(&req->r_unsafe_item); 2214 2215 ktime_get_coarse_real_ts64(&req->r_stamp); 2216 2217 req->r_op = op; 2218 req->r_direct_mode = mode; 2219 return req; 2220 } 2221 2222 /* 2223 * return oldest (lowest) request, tid in request tree, 0 if none. 2224 * 2225 * called under mdsc->mutex. 2226 */ 2227 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2228 { 2229 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2230 return NULL; 2231 return rb_entry(rb_first(&mdsc->request_tree), 2232 struct ceph_mds_request, r_node); 2233 } 2234 2235 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2236 { 2237 return mdsc->oldest_tid; 2238 } 2239 2240 /* 2241 * Build a dentry's path. Allocate on heap; caller must kfree. Based 2242 * on build_path_from_dentry in fs/cifs/dir.c. 2243 * 2244 * If @stop_on_nosnap, generate path relative to the first non-snapped 2245 * inode. 2246 * 2247 * Encode hidden .snap dirs as a double /, i.e. 
2248 * foo/.snap/bar -> foo//bar 2249 */ 2250 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase, 2251 int stop_on_nosnap) 2252 { 2253 struct dentry *temp; 2254 char *path; 2255 int pos; 2256 unsigned seq; 2257 u64 base; 2258 2259 if (!dentry) 2260 return ERR_PTR(-EINVAL); 2261 2262 path = __getname(); 2263 if (!path) 2264 return ERR_PTR(-ENOMEM); 2265 retry: 2266 pos = PATH_MAX - 1; 2267 path[pos] = '\0'; 2268 2269 seq = read_seqbegin(&rename_lock); 2270 rcu_read_lock(); 2271 temp = dentry; 2272 for (;;) { 2273 struct inode *inode; 2274 2275 spin_lock(&temp->d_lock); 2276 inode = d_inode(temp); 2277 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2278 dout("build_path path+%d: %p SNAPDIR\n", 2279 pos, temp); 2280 } else if (stop_on_nosnap && inode && dentry != temp && 2281 ceph_snap(inode) == CEPH_NOSNAP) { 2282 spin_unlock(&temp->d_lock); 2283 pos++; /* get rid of any prepended '/' */ 2284 break; 2285 } else { 2286 pos -= temp->d_name.len; 2287 if (pos < 0) { 2288 spin_unlock(&temp->d_lock); 2289 break; 2290 } 2291 memcpy(path + pos, temp->d_name.name, temp->d_name.len); 2292 } 2293 spin_unlock(&temp->d_lock); 2294 temp = READ_ONCE(temp->d_parent); 2295 2296 /* Are we at the root? */ 2297 if (IS_ROOT(temp)) 2298 break; 2299 2300 /* Are we out of buffer? */ 2301 if (--pos < 0) 2302 break; 2303 2304 path[pos] = '/'; 2305 } 2306 base = ceph_ino(d_inode(temp)); 2307 rcu_read_unlock(); 2308 2309 if (read_seqretry(&rename_lock, seq)) 2310 goto retry; 2311 2312 if (pos < 0) { 2313 /* 2314 * A rename didn't occur, but somehow we didn't end up where 2315 * we thought we would. Throw a warning and try again. 2316 */ 2317 pr_warn("build_path did not end path lookup where " 2318 "expected, pos is %d\n", pos); 2319 goto retry; 2320 } 2321 2322 *pbase = base; 2323 *plen = PATH_MAX - 1 - pos; 2324 dout("build_path on %p %d built %llx '%.*s'\n", 2325 dentry, d_count(dentry), base, *plen, path + pos); 2326 return path + pos; 2327 } 2328 2329 static int build_dentry_path(struct dentry *dentry, struct inode *dir, 2330 const char **ppath, int *ppathlen, u64 *pino, 2331 bool *pfreepath, bool parent_locked) 2332 { 2333 char *path; 2334 2335 rcu_read_lock(); 2336 if (!dir) 2337 dir = d_inode_rcu(dentry->d_parent); 2338 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) { 2339 *pino = ceph_ino(dir); 2340 rcu_read_unlock(); 2341 *ppath = dentry->d_name.name; 2342 *ppathlen = dentry->d_name.len; 2343 return 0; 2344 } 2345 rcu_read_unlock(); 2346 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2347 if (IS_ERR(path)) 2348 return PTR_ERR(path); 2349 *ppath = path; 2350 *pfreepath = true; 2351 return 0; 2352 } 2353 2354 static int build_inode_path(struct inode *inode, 2355 const char **ppath, int *ppathlen, u64 *pino, 2356 bool *pfreepath) 2357 { 2358 struct dentry *dentry; 2359 char *path; 2360 2361 if (ceph_snap(inode) == CEPH_NOSNAP) { 2362 *pino = ceph_ino(inode); 2363 *ppathlen = 0; 2364 return 0; 2365 } 2366 dentry = d_find_alias(inode); 2367 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2368 dput(dentry); 2369 if (IS_ERR(path)) 2370 return PTR_ERR(path); 2371 *ppath = path; 2372 *pfreepath = true; 2373 return 0; 2374 } 2375 2376 /* 2377 * request arguments may be specified via an inode *, a dentry *, or 2378 * an explicit ino+path. 
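 * set_request_path_attr() checks them in that order: an inode wins
 * over a dentry, which wins over a raw ino+path pair.  Whenever a
 * path has to be built from a dentry, *freepath is set so the caller
 * knows to release it with ceph_mdsc_free_path().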
2379 */ 2380 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, 2381 struct inode *rdiri, const char *rpath, 2382 u64 rino, const char **ppath, int *pathlen, 2383 u64 *ino, bool *freepath, bool parent_locked) 2384 { 2385 int r = 0; 2386 2387 if (rinode) { 2388 r = build_inode_path(rinode, ppath, pathlen, ino, freepath); 2389 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 2390 ceph_snap(rinode)); 2391 } else if (rdentry) { 2392 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino, 2393 freepath, parent_locked); 2394 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, 2395 *ppath); 2396 } else if (rpath || rino) { 2397 *ino = rino; 2398 *ppath = rpath; 2399 *pathlen = rpath ? strlen(rpath) : 0; 2400 dout(" path %.*s\n", *pathlen, rpath); 2401 } 2402 2403 return r; 2404 } 2405 2406 /* 2407 * called under mdsc->mutex 2408 */ 2409 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, 2410 struct ceph_mds_request *req, 2411 int mds, bool drop_cap_releases) 2412 { 2413 struct ceph_msg *msg; 2414 struct ceph_mds_request_head *head; 2415 const char *path1 = NULL; 2416 const char *path2 = NULL; 2417 u64 ino1 = 0, ino2 = 0; 2418 int pathlen1 = 0, pathlen2 = 0; 2419 bool freepath1 = false, freepath2 = false; 2420 int len; 2421 u16 releases; 2422 void *p, *end; 2423 int ret; 2424 2425 ret = set_request_path_attr(req->r_inode, req->r_dentry, 2426 req->r_parent, req->r_path1, req->r_ino1.ino, 2427 &path1, &pathlen1, &ino1, &freepath1, 2428 test_bit(CEPH_MDS_R_PARENT_LOCKED, 2429 &req->r_req_flags)); 2430 if (ret < 0) { 2431 msg = ERR_PTR(ret); 2432 goto out; 2433 } 2434 2435 /* If r_old_dentry is set, then assume that its parent is locked */ 2436 ret = set_request_path_attr(NULL, req->r_old_dentry, 2437 req->r_old_dentry_dir, 2438 req->r_path2, req->r_ino2.ino, 2439 &path2, &pathlen2, &ino2, &freepath2, true); 2440 if (ret < 0) { 2441 msg = ERR_PTR(ret); 2442 goto out_free1; 2443 } 2444 2445 len = sizeof(*head) + 2446 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + 2447 sizeof(struct ceph_timespec); 2448 2449 /* calculate (max) length for cap releases */ 2450 len += sizeof(struct ceph_mds_request_release) * 2451 (!!req->r_inode_drop + !!req->r_dentry_drop + 2452 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 2453 if (req->r_dentry_drop) 2454 len += pathlen1; 2455 if (req->r_old_dentry_drop) 2456 len += pathlen2; 2457 2458 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); 2459 if (!msg) { 2460 msg = ERR_PTR(-ENOMEM); 2461 goto out_free2; 2462 } 2463 2464 msg->hdr.version = cpu_to_le16(2); 2465 msg->hdr.tid = cpu_to_le64(req->r_tid); 2466 2467 head = msg->front.iov_base; 2468 p = msg->front.iov_base + sizeof(*head); 2469 end = msg->front.iov_base + msg->front.iov_len; 2470 2471 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 2472 head->op = cpu_to_le32(req->r_op); 2473 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid)); 2474 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid)); 2475 head->ino = cpu_to_le64(req->r_deleg_ino); 2476 head->args = req->r_args; 2477 2478 ceph_encode_filepath(&p, end, ino1, path1); 2479 ceph_encode_filepath(&p, end, ino2, path2); 2480 2481 /* make note of release offset, in case we need to replay */ 2482 req->r_request_release_offset = p - msg->front.iov_base; 2483 2484 /* cap releases */ 2485 releases = 0; 2486 if (req->r_inode_drop) 2487 releases += ceph_encode_inode_release(&p, 2488 req->r_inode ? 
req->r_inode : d_inode(req->r_dentry), 2489 mds, req->r_inode_drop, req->r_inode_unless, 2490 req->r_op == CEPH_MDS_OP_READDIR); 2491 if (req->r_dentry_drop) 2492 releases += ceph_encode_dentry_release(&p, req->r_dentry, 2493 req->r_parent, mds, req->r_dentry_drop, 2494 req->r_dentry_unless); 2495 if (req->r_old_dentry_drop) 2496 releases += ceph_encode_dentry_release(&p, req->r_old_dentry, 2497 req->r_old_dentry_dir, mds, 2498 req->r_old_dentry_drop, 2499 req->r_old_dentry_unless); 2500 if (req->r_old_inode_drop) 2501 releases += ceph_encode_inode_release(&p, 2502 d_inode(req->r_old_dentry), 2503 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 2504 2505 if (drop_cap_releases) { 2506 releases = 0; 2507 p = msg->front.iov_base + req->r_request_release_offset; 2508 } 2509 2510 head->num_releases = cpu_to_le16(releases); 2511 2512 /* time stamp */ 2513 { 2514 struct ceph_timespec ts; 2515 ceph_encode_timespec64(&ts, &req->r_stamp); 2516 ceph_encode_copy(&p, &ts, sizeof(ts)); 2517 } 2518 2519 BUG_ON(p > end); 2520 msg->front.iov_len = p - msg->front.iov_base; 2521 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2522 2523 if (req->r_pagelist) { 2524 struct ceph_pagelist *pagelist = req->r_pagelist; 2525 ceph_msg_data_add_pagelist(msg, pagelist); 2526 msg->hdr.data_len = cpu_to_le32(pagelist->length); 2527 } else { 2528 msg->hdr.data_len = 0; 2529 } 2530 2531 msg->hdr.data_off = cpu_to_le16(0); 2532 2533 out_free2: 2534 if (freepath2) 2535 ceph_mdsc_free_path((char *)path2, pathlen2); 2536 out_free1: 2537 if (freepath1) 2538 ceph_mdsc_free_path((char *)path1, pathlen1); 2539 out: 2540 return msg; 2541 } 2542 2543 /* 2544 * called under mdsc->mutex if error, under no mutex if 2545 * success. 2546 */ 2547 static void complete_request(struct ceph_mds_client *mdsc, 2548 struct ceph_mds_request *req) 2549 { 2550 if (req->r_callback) 2551 req->r_callback(mdsc, req); 2552 complete_all(&req->r_completion); 2553 } 2554 2555 /* 2556 * called under mdsc->mutex 2557 */ 2558 static int __prepare_send_request(struct ceph_mds_client *mdsc, 2559 struct ceph_mds_request *req, 2560 int mds, bool drop_cap_releases) 2561 { 2562 struct ceph_mds_request_head *rhead; 2563 struct ceph_msg *msg; 2564 int flags = 0; 2565 2566 req->r_attempts++; 2567 if (req->r_inode) { 2568 struct ceph_cap *cap = 2569 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 2570 2571 if (cap) 2572 req->r_sent_on_mseq = cap->mseq; 2573 else 2574 req->r_sent_on_mseq = -1; 2575 } 2576 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, 2577 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 2578 2579 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 2580 void *p; 2581 /* 2582 * Replay. Do not regenerate message (and rebuild 2583 * paths, etc.); just use the original message. 2584 * Rebuilding paths will break for renames because 2585 * d_move mangles the src name. 
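 * Instead we patch the original message in place: set the REPLAY
 * flag, refresh the target ino and retry count, and strip any
 * cap/dentry releases, re-encoding only the timestamp in their
 * place.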
2586 */ 2587 msg = req->r_request; 2588 rhead = msg->front.iov_base; 2589 2590 flags = le32_to_cpu(rhead->flags); 2591 flags |= CEPH_MDS_FLAG_REPLAY; 2592 rhead->flags = cpu_to_le32(flags); 2593 2594 if (req->r_target_inode) 2595 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 2596 2597 rhead->num_retry = req->r_attempts - 1; 2598 2599 /* remove cap/dentry releases from message */ 2600 rhead->num_releases = 0; 2601 2602 /* time stamp */ 2603 p = msg->front.iov_base + req->r_request_release_offset; 2604 { 2605 struct ceph_timespec ts; 2606 ceph_encode_timespec64(&ts, &req->r_stamp); 2607 ceph_encode_copy(&p, &ts, sizeof(ts)); 2608 } 2609 2610 msg->front.iov_len = p - msg->front.iov_base; 2611 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2612 return 0; 2613 } 2614 2615 if (req->r_request) { 2616 ceph_msg_put(req->r_request); 2617 req->r_request = NULL; 2618 } 2619 msg = create_request_message(mdsc, req, mds, drop_cap_releases); 2620 if (IS_ERR(msg)) { 2621 req->r_err = PTR_ERR(msg); 2622 return PTR_ERR(msg); 2623 } 2624 req->r_request = msg; 2625 2626 rhead = msg->front.iov_base; 2627 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 2628 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2629 flags |= CEPH_MDS_FLAG_REPLAY; 2630 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) 2631 flags |= CEPH_MDS_FLAG_ASYNC; 2632 if (req->r_parent) 2633 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 2634 rhead->flags = cpu_to_le32(flags); 2635 rhead->num_fwd = req->r_num_fwd; 2636 rhead->num_retry = req->r_attempts - 1; 2637 2638 dout(" r_parent = %p\n", req->r_parent); 2639 return 0; 2640 } 2641 2642 /* 2643 * called under mdsc->mutex 2644 */ 2645 static int __send_request(struct ceph_mds_client *mdsc, 2646 struct ceph_mds_session *session, 2647 struct ceph_mds_request *req, 2648 bool drop_cap_releases) 2649 { 2650 int err; 2651 2652 err = __prepare_send_request(mdsc, req, session->s_mds, 2653 drop_cap_releases); 2654 if (!err) { 2655 ceph_msg_get(req->r_request); 2656 ceph_con_send(&session->s_con, req->r_request); 2657 } 2658 2659 return err; 2660 } 2661 2662 /* 2663 * send request, or put it on the appropriate wait list. 
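 * __do_request() picks an mds via __choose_mds(), looks up or
 * registers a session for it, and either sends the request right
 * away or parks it on mdsc->waiting_for_map or session->s_waiting
 * until the mdsmap / session state lets it proceed.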
2664 */ 2665 static void __do_request(struct ceph_mds_client *mdsc, 2666 struct ceph_mds_request *req) 2667 { 2668 struct ceph_mds_session *session = NULL; 2669 int mds = -1; 2670 int err = 0; 2671 bool random; 2672 2673 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 2674 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 2675 __unregister_request(mdsc, req); 2676 return; 2677 } 2678 2679 if (req->r_timeout && 2680 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 2681 dout("do_request timed out\n"); 2682 err = -ETIMEDOUT; 2683 goto finish; 2684 } 2685 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 2686 dout("do_request forced umount\n"); 2687 err = -EIO; 2688 goto finish; 2689 } 2690 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 2691 if (mdsc->mdsmap_err) { 2692 err = mdsc->mdsmap_err; 2693 dout("do_request mdsmap err %d\n", err); 2694 goto finish; 2695 } 2696 if (mdsc->mdsmap->m_epoch == 0) { 2697 dout("do_request no mdsmap, waiting for map\n"); 2698 list_add(&req->r_wait, &mdsc->waiting_for_map); 2699 return; 2700 } 2701 if (!(mdsc->fsc->mount_options->flags & 2702 CEPH_MOUNT_OPT_MOUNTWAIT) && 2703 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { 2704 err = -EHOSTUNREACH; 2705 goto finish; 2706 } 2707 } 2708 2709 put_request_session(req); 2710 2711 mds = __choose_mds(mdsc, req, &random); 2712 if (mds < 0 || 2713 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 2714 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2715 err = -EJUKEBOX; 2716 goto finish; 2717 } 2718 dout("do_request no mds or not active, waiting for map\n"); 2719 list_add(&req->r_wait, &mdsc->waiting_for_map); 2720 return; 2721 } 2722 2723 /* get, open session */ 2724 session = __ceph_lookup_mds_session(mdsc, mds); 2725 if (!session) { 2726 session = register_session(mdsc, mds); 2727 if (IS_ERR(session)) { 2728 err = PTR_ERR(session); 2729 goto finish; 2730 } 2731 } 2732 req->r_session = ceph_get_mds_session(session); 2733 2734 dout("do_request mds%d session %p state %s\n", mds, session, 2735 ceph_session_state_name(session->s_state)); 2736 if (session->s_state != CEPH_MDS_SESSION_OPEN && 2737 session->s_state != CEPH_MDS_SESSION_HUNG) { 2738 if (session->s_state == CEPH_MDS_SESSION_REJECTED) { 2739 err = -EACCES; 2740 goto out_session; 2741 } 2742 /* 2743 * We cannot queue async requests since the caps and delegated 2744 * inodes are bound to the session. Just return -EJUKEBOX and 2745 * let the caller retry a sync request in that case. 
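 * (the async create/unlink paths are expected to treat -EJUKEBOX as
 * "redo this as a normal synchronous request"; it should never be
 * returned to userspace).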
2746 */ 2747 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2748 err = -EJUKEBOX; 2749 goto out_session; 2750 } 2751 if (session->s_state == CEPH_MDS_SESSION_NEW || 2752 session->s_state == CEPH_MDS_SESSION_CLOSING) { 2753 __open_session(mdsc, session); 2754 /* retry the same mds later */ 2755 if (random) 2756 req->r_resend_mds = mds; 2757 } 2758 list_add(&req->r_wait, &session->s_waiting); 2759 goto out_session; 2760 } 2761 2762 /* send request */ 2763 req->r_resend_mds = -1; /* forget any previous mds hint */ 2764 2765 if (req->r_request_started == 0) /* note request start time */ 2766 req->r_request_started = jiffies; 2767 2768 err = __send_request(mdsc, session, req, false); 2769 2770 out_session: 2771 ceph_put_mds_session(session); 2772 finish: 2773 if (err) { 2774 dout("__do_request early error %d\n", err); 2775 req->r_err = err; 2776 complete_request(mdsc, req); 2777 __unregister_request(mdsc, req); 2778 } 2779 return; 2780 } 2781 2782 /* 2783 * called under mdsc->mutex 2784 */ 2785 static void __wake_requests(struct ceph_mds_client *mdsc, 2786 struct list_head *head) 2787 { 2788 struct ceph_mds_request *req; 2789 LIST_HEAD(tmp_list); 2790 2791 list_splice_init(head, &tmp_list); 2792 2793 while (!list_empty(&tmp_list)) { 2794 req = list_entry(tmp_list.next, 2795 struct ceph_mds_request, r_wait); 2796 list_del_init(&req->r_wait); 2797 dout(" wake request %p tid %llu\n", req, req->r_tid); 2798 __do_request(mdsc, req); 2799 } 2800 } 2801 2802 /* 2803 * Wake up threads with requests pending for @mds, so that they can 2804 * resubmit their requests to a possibly different mds. 2805 */ 2806 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 2807 { 2808 struct ceph_mds_request *req; 2809 struct rb_node *p = rb_first(&mdsc->request_tree); 2810 2811 dout("kick_requests mds%d\n", mds); 2812 while (p) { 2813 req = rb_entry(p, struct ceph_mds_request, r_node); 2814 p = rb_next(p); 2815 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2816 continue; 2817 if (req->r_attempts > 0) 2818 continue; /* only new requests */ 2819 if (req->r_session && 2820 req->r_session->s_mds == mds) { 2821 dout(" kicking tid %llu\n", req->r_tid); 2822 list_del_init(&req->r_wait); 2823 __do_request(mdsc, req); 2824 } 2825 } 2826 } 2827 2828 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, 2829 struct ceph_mds_request *req) 2830 { 2831 int err = 0; 2832 2833 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ 2834 if (req->r_inode) 2835 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 2836 if (req->r_parent) { 2837 struct ceph_inode_info *ci = ceph_inode(req->r_parent); 2838 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ? 
2839 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD; 2840 spin_lock(&ci->i_ceph_lock); 2841 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false); 2842 __ceph_touch_fmode(ci, mdsc, fmode); 2843 spin_unlock(&ci->i_ceph_lock); 2844 ihold(req->r_parent); 2845 } 2846 if (req->r_old_dentry_dir) 2847 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 2848 CEPH_CAP_PIN); 2849 2850 if (req->r_inode) { 2851 err = ceph_wait_on_async_create(req->r_inode); 2852 if (err) { 2853 dout("%s: wait for async create returned: %d\n", 2854 __func__, err); 2855 return err; 2856 } 2857 } 2858 2859 if (!err && req->r_old_inode) { 2860 err = ceph_wait_on_async_create(req->r_old_inode); 2861 if (err) { 2862 dout("%s: wait for async create returned: %d\n", 2863 __func__, err); 2864 return err; 2865 } 2866 } 2867 2868 dout("submit_request on %p for inode %p\n", req, dir); 2869 mutex_lock(&mdsc->mutex); 2870 __register_request(mdsc, req, dir); 2871 __do_request(mdsc, req); 2872 err = req->r_err; 2873 mutex_unlock(&mdsc->mutex); 2874 return err; 2875 } 2876 2877 static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, 2878 struct ceph_mds_request *req) 2879 { 2880 int err; 2881 2882 /* wait */ 2883 dout("do_request waiting\n"); 2884 if (!req->r_timeout && req->r_wait_for_completion) { 2885 err = req->r_wait_for_completion(mdsc, req); 2886 } else { 2887 long timeleft = wait_for_completion_killable_timeout( 2888 &req->r_completion, 2889 ceph_timeout_jiffies(req->r_timeout)); 2890 if (timeleft > 0) 2891 err = 0; 2892 else if (!timeleft) 2893 err = -ETIMEDOUT; /* timed out */ 2894 else 2895 err = timeleft; /* killed */ 2896 } 2897 dout("do_request waited, got %d\n", err); 2898 mutex_lock(&mdsc->mutex); 2899 2900 /* only abort if we didn't race with a real reply */ 2901 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 2902 err = le32_to_cpu(req->r_reply_info.head->result); 2903 } else if (err < 0) { 2904 dout("aborted request %lld with %d\n", req->r_tid, err); 2905 2906 /* 2907 * ensure we aren't running concurrently with 2908 * ceph_fill_trace or ceph_readdir_prepopulate, which 2909 * rely on locks (dir mutex) held by our caller. 2910 */ 2911 mutex_lock(&req->r_fill_mutex); 2912 req->r_err = err; 2913 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 2914 mutex_unlock(&req->r_fill_mutex); 2915 2916 if (req->r_parent && 2917 (req->r_op & CEPH_MDS_OP_WRITE)) 2918 ceph_invalidate_dir_request(req); 2919 } else { 2920 err = req->r_err; 2921 } 2922 2923 mutex_unlock(&mdsc->mutex); 2924 return err; 2925 } 2926 2927 /* 2928 * Synchrously perform an mds request. Take care of all of the 2929 * session setup, forwarding, retry details. 2930 */ 2931 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 2932 struct inode *dir, 2933 struct ceph_mds_request *req) 2934 { 2935 int err; 2936 2937 dout("do_request on %p\n", req); 2938 2939 /* issue */ 2940 err = ceph_mdsc_submit_request(mdsc, dir, req); 2941 if (!err) 2942 err = ceph_mdsc_wait_request(mdsc, req); 2943 dout("do_request %p done, result %d\n", req, err); 2944 return err; 2945 } 2946 2947 /* 2948 * Invalidate dir's completeness, dentry lease state on an aborted MDS 2949 * namespace request. 
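 * We can't tell whether the MDS applied the aborted change or not,
 * so drop the directory-completeness flag on both directories
 * involved plus the leases on the affected dentries; later lookups
 * will refetch the authoritative state from the MDS.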
2950 */ 2951 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 2952 { 2953 struct inode *dir = req->r_parent; 2954 struct inode *old_dir = req->r_old_dentry_dir; 2955 2956 dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir); 2957 2958 ceph_dir_clear_complete(dir); 2959 if (old_dir) 2960 ceph_dir_clear_complete(old_dir); 2961 if (req->r_dentry) 2962 ceph_invalidate_dentry_lease(req->r_dentry); 2963 if (req->r_old_dentry) 2964 ceph_invalidate_dentry_lease(req->r_old_dentry); 2965 } 2966 2967 /* 2968 * Handle mds reply. 2969 * 2970 * We take the session mutex and parse and process the reply immediately. 2971 * This preserves the logical ordering of replies, capabilities, etc., sent 2972 * by the MDS as they are applied to our local cache. 2973 */ 2974 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 2975 { 2976 struct ceph_mds_client *mdsc = session->s_mdsc; 2977 struct ceph_mds_request *req; 2978 struct ceph_mds_reply_head *head = msg->front.iov_base; 2979 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 2980 struct ceph_snap_realm *realm; 2981 u64 tid; 2982 int err, result; 2983 int mds = session->s_mds; 2984 2985 if (msg->front.iov_len < sizeof(*head)) { 2986 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 2987 ceph_msg_dump(msg); 2988 return; 2989 } 2990 2991 /* get request, session */ 2992 tid = le64_to_cpu(msg->hdr.tid); 2993 mutex_lock(&mdsc->mutex); 2994 req = lookup_get_request(mdsc, tid); 2995 if (!req) { 2996 dout("handle_reply on unknown tid %llu\n", tid); 2997 mutex_unlock(&mdsc->mutex); 2998 return; 2999 } 3000 dout("handle_reply %p\n", req); 3001 3002 /* correct session? */ 3003 if (req->r_session != session) { 3004 pr_err("mdsc_handle_reply got %llu on session mds%d" 3005 " not mds%d\n", tid, session->s_mds, 3006 req->r_session ? req->r_session->s_mds : -1); 3007 mutex_unlock(&mdsc->mutex); 3008 goto out; 3009 } 3010 3011 /* dup? */ 3012 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 3013 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 3014 pr_warn("got a dup %s reply on %llu from mds%d\n", 3015 head->safe ? 
"safe" : "unsafe", tid, mds); 3016 mutex_unlock(&mdsc->mutex); 3017 goto out; 3018 } 3019 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 3020 pr_warn("got unsafe after safe on %llu from mds%d\n", 3021 tid, mds); 3022 mutex_unlock(&mdsc->mutex); 3023 goto out; 3024 } 3025 3026 result = le32_to_cpu(head->result); 3027 3028 /* 3029 * Handle an ESTALE 3030 * if we're not talking to the authority, send to them 3031 * if the authority has changed while we weren't looking, 3032 * send to new authority 3033 * Otherwise we just have to return an ESTALE 3034 */ 3035 if (result == -ESTALE) { 3036 dout("got ESTALE on request %llu\n", req->r_tid); 3037 req->r_resend_mds = -1; 3038 if (req->r_direct_mode != USE_AUTH_MDS) { 3039 dout("not using auth, setting for that now\n"); 3040 req->r_direct_mode = USE_AUTH_MDS; 3041 __do_request(mdsc, req); 3042 mutex_unlock(&mdsc->mutex); 3043 goto out; 3044 } else { 3045 int mds = __choose_mds(mdsc, req, NULL); 3046 if (mds >= 0 && mds != req->r_session->s_mds) { 3047 dout("but auth changed, so resending\n"); 3048 __do_request(mdsc, req); 3049 mutex_unlock(&mdsc->mutex); 3050 goto out; 3051 } 3052 } 3053 dout("have to return ESTALE on request %llu\n", req->r_tid); 3054 } 3055 3056 3057 if (head->safe) { 3058 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 3059 __unregister_request(mdsc, req); 3060 3061 /* last request during umount? */ 3062 if (mdsc->stopping && !__get_oldest_req(mdsc)) 3063 complete_all(&mdsc->safe_umount_waiters); 3064 3065 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3066 /* 3067 * We already handled the unsafe response, now do the 3068 * cleanup. No need to examine the response; the MDS 3069 * doesn't include any result info in the safe 3070 * response. And even if it did, there is nothing 3071 * useful we could do with a revised return value. 
3072 */ 3073 dout("got safe reply %llu, mds%d\n", tid, mds); 3074 3075 mutex_unlock(&mdsc->mutex); 3076 goto out; 3077 } 3078 } else { 3079 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 3080 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 3081 } 3082 3083 dout("handle_reply tid %lld result %d\n", tid, result); 3084 rinfo = &req->r_reply_info; 3085 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 3086 err = parse_reply_info(session, msg, rinfo, (u64)-1); 3087 else 3088 err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features); 3089 mutex_unlock(&mdsc->mutex); 3090 3091 mutex_lock(&session->s_mutex); 3092 if (err < 0) { 3093 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 3094 ceph_msg_dump(msg); 3095 goto out_err; 3096 } 3097 3098 /* snap trace */ 3099 realm = NULL; 3100 if (rinfo->snapblob_len) { 3101 down_write(&mdsc->snap_rwsem); 3102 ceph_update_snap_trace(mdsc, rinfo->snapblob, 3103 rinfo->snapblob + rinfo->snapblob_len, 3104 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 3105 &realm); 3106 downgrade_write(&mdsc->snap_rwsem); 3107 } else { 3108 down_read(&mdsc->snap_rwsem); 3109 } 3110 3111 /* insert trace into our cache */ 3112 mutex_lock(&req->r_fill_mutex); 3113 current->journal_info = req; 3114 err = ceph_fill_trace(mdsc->fsc->sb, req); 3115 if (err == 0) { 3116 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 3117 req->r_op == CEPH_MDS_OP_LSSNAP)) 3118 ceph_readdir_prepopulate(req, req->r_session); 3119 } 3120 current->journal_info = NULL; 3121 mutex_unlock(&req->r_fill_mutex); 3122 3123 up_read(&mdsc->snap_rwsem); 3124 if (realm) 3125 ceph_put_snap_realm(mdsc, realm); 3126 3127 if (err == 0) { 3128 if (req->r_target_inode && 3129 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3130 struct ceph_inode_info *ci = 3131 ceph_inode(req->r_target_inode); 3132 spin_lock(&ci->i_unsafe_lock); 3133 list_add_tail(&req->r_unsafe_target_item, 3134 &ci->i_unsafe_iops); 3135 spin_unlock(&ci->i_unsafe_lock); 3136 } 3137 3138 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 3139 } 3140 out_err: 3141 mutex_lock(&mdsc->mutex); 3142 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3143 if (err) { 3144 req->r_err = err; 3145 } else { 3146 req->r_reply = ceph_msg_get(msg); 3147 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); 3148 } 3149 } else { 3150 dout("reply arrived after request %lld was aborted\n", tid); 3151 } 3152 mutex_unlock(&mdsc->mutex); 3153 3154 mutex_unlock(&session->s_mutex); 3155 3156 /* kick calling process */ 3157 complete_request(mdsc, req); 3158 out: 3159 ceph_mdsc_put_request(req); 3160 return; 3161 } 3162 3163 3164 3165 /* 3166 * handle mds notification that our request has been forwarded. 3167 */ 3168 static void handle_forward(struct ceph_mds_client *mdsc, 3169 struct ceph_mds_session *session, 3170 struct ceph_msg *msg) 3171 { 3172 struct ceph_mds_request *req; 3173 u64 tid = le64_to_cpu(msg->hdr.tid); 3174 u32 next_mds; 3175 u32 fwd_seq; 3176 int err = -EINVAL; 3177 void *p = msg->front.iov_base; 3178 void *end = p + msg->front.iov_len; 3179 3180 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3181 next_mds = ceph_decode_32(&p); 3182 fwd_seq = ceph_decode_32(&p); 3183 3184 mutex_lock(&mdsc->mutex); 3185 req = lookup_get_request(mdsc, tid); 3186 if (!req) { 3187 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 3188 goto out; /* dup reply? 
*/ 3189 } 3190 3191 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3192 dout("forward tid %llu aborted, unregistering\n", tid); 3193 __unregister_request(mdsc, req); 3194 } else if (fwd_seq <= req->r_num_fwd) { 3195 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 3196 tid, next_mds, req->r_num_fwd, fwd_seq); 3197 } else { 3198 /* resend. forward race not possible; mds would drop */ 3199 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 3200 BUG_ON(req->r_err); 3201 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 3202 req->r_attempts = 0; 3203 req->r_num_fwd = fwd_seq; 3204 req->r_resend_mds = next_mds; 3205 put_request_session(req); 3206 __do_request(mdsc, req); 3207 } 3208 ceph_mdsc_put_request(req); 3209 out: 3210 mutex_unlock(&mdsc->mutex); 3211 return; 3212 3213 bad: 3214 pr_err("mdsc_handle_forward decode error err=%d\n", err); 3215 } 3216 3217 static int __decode_session_metadata(void **p, void *end, 3218 bool *blacklisted) 3219 { 3220 /* map<string,string> */ 3221 u32 n; 3222 bool err_str; 3223 ceph_decode_32_safe(p, end, n, bad); 3224 while (n-- > 0) { 3225 u32 len; 3226 ceph_decode_32_safe(p, end, len, bad); 3227 ceph_decode_need(p, end, len, bad); 3228 err_str = !strncmp(*p, "error_string", len); 3229 *p += len; 3230 ceph_decode_32_safe(p, end, len, bad); 3231 ceph_decode_need(p, end, len, bad); 3232 if (err_str && strnstr(*p, "blacklisted", len)) 3233 *blacklisted = true; 3234 *p += len; 3235 } 3236 return 0; 3237 bad: 3238 return -1; 3239 } 3240 3241 /* 3242 * handle a mds session control message 3243 */ 3244 static void handle_session(struct ceph_mds_session *session, 3245 struct ceph_msg *msg) 3246 { 3247 struct ceph_mds_client *mdsc = session->s_mdsc; 3248 int mds = session->s_mds; 3249 int msg_version = le16_to_cpu(msg->hdr.version); 3250 void *p = msg->front.iov_base; 3251 void *end = p + msg->front.iov_len; 3252 struct ceph_mds_session_head *h; 3253 u32 op; 3254 u64 seq; 3255 unsigned long features = 0; 3256 int wake = 0; 3257 bool blacklisted = false; 3258 3259 /* decode */ 3260 ceph_decode_need(&p, end, sizeof(*h), bad); 3261 h = p; 3262 p += sizeof(*h); 3263 3264 op = le32_to_cpu(h->op); 3265 seq = le64_to_cpu(h->seq); 3266 3267 if (msg_version >= 3) { 3268 u32 len; 3269 /* version >= 2, metadata */ 3270 if (__decode_session_metadata(&p, end, &blacklisted) < 0) 3271 goto bad; 3272 /* version >= 3, feature bits */ 3273 ceph_decode_32_safe(&p, end, len, bad); 3274 ceph_decode_need(&p, end, len, bad); 3275 memcpy(&features, p, min_t(size_t, len, sizeof(features))); 3276 p += len; 3277 } 3278 3279 mutex_lock(&mdsc->mutex); 3280 if (op == CEPH_SESSION_CLOSE) { 3281 ceph_get_mds_session(session); 3282 __unregister_session(mdsc, session); 3283 } 3284 /* FIXME: this ttl calculation is generous */ 3285 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 3286 mutex_unlock(&mdsc->mutex); 3287 3288 mutex_lock(&session->s_mutex); 3289 3290 dout("handle_session mds%d %s %p state %s seq %llu\n", 3291 mds, ceph_session_op_name(op), session, 3292 ceph_session_state_name(session->s_state), seq); 3293 3294 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 3295 session->s_state = CEPH_MDS_SESSION_OPEN; 3296 pr_info("mds%d came back\n", session->s_mds); 3297 } 3298 3299 switch (op) { 3300 case CEPH_SESSION_OPEN: 3301 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3302 pr_info("mds%d reconnect success\n", session->s_mds); 3303 session->s_state = CEPH_MDS_SESSION_OPEN; 3304 session->s_features = features; 3305 renewed_caps(mdsc, 
session, 0); 3306 wake = 1; 3307 if (mdsc->stopping) 3308 __close_session(mdsc, session); 3309 break; 3310 3311 case CEPH_SESSION_RENEWCAPS: 3312 if (session->s_renew_seq == seq) 3313 renewed_caps(mdsc, session, 1); 3314 break; 3315 3316 case CEPH_SESSION_CLOSE: 3317 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3318 pr_info("mds%d reconnect denied\n", session->s_mds); 3319 session->s_state = CEPH_MDS_SESSION_CLOSED; 3320 cleanup_session_requests(mdsc, session); 3321 remove_session_caps(session); 3322 wake = 2; /* for good measure */ 3323 wake_up_all(&mdsc->session_close_wq); 3324 break; 3325 3326 case CEPH_SESSION_STALE: 3327 pr_info("mds%d caps went stale, renewing\n", 3328 session->s_mds); 3329 spin_lock(&session->s_gen_ttl_lock); 3330 session->s_cap_gen++; 3331 session->s_cap_ttl = jiffies - 1; 3332 spin_unlock(&session->s_gen_ttl_lock); 3333 send_renew_caps(mdsc, session); 3334 break; 3335 3336 case CEPH_SESSION_RECALL_STATE: 3337 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 3338 break; 3339 3340 case CEPH_SESSION_FLUSHMSG: 3341 send_flushmsg_ack(mdsc, session, seq); 3342 break; 3343 3344 case CEPH_SESSION_FORCE_RO: 3345 dout("force_session_readonly %p\n", session); 3346 spin_lock(&session->s_cap_lock); 3347 session->s_readonly = true; 3348 spin_unlock(&session->s_cap_lock); 3349 wake_up_session_caps(session, FORCE_RO); 3350 break; 3351 3352 case CEPH_SESSION_REJECT: 3353 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 3354 pr_info("mds%d rejected session\n", session->s_mds); 3355 session->s_state = CEPH_MDS_SESSION_REJECTED; 3356 cleanup_session_requests(mdsc, session); 3357 remove_session_caps(session); 3358 if (blacklisted) 3359 mdsc->fsc->blacklisted = true; 3360 wake = 2; /* for good measure */ 3361 break; 3362 3363 default: 3364 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 3365 WARN_ON(1); 3366 } 3367 3368 mutex_unlock(&session->s_mutex); 3369 if (wake) { 3370 mutex_lock(&mdsc->mutex); 3371 __wake_requests(mdsc, &session->s_waiting); 3372 if (wake == 2) 3373 kick_requests(mdsc, mds); 3374 mutex_unlock(&mdsc->mutex); 3375 } 3376 if (op == CEPH_SESSION_CLOSE) 3377 ceph_put_mds_session(session); 3378 return; 3379 3380 bad: 3381 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 3382 (int)msg->front.iov_len); 3383 ceph_msg_dump(msg); 3384 return; 3385 } 3386 3387 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 3388 { 3389 int dcaps; 3390 3391 dcaps = xchg(&req->r_dir_caps, 0); 3392 if (dcaps) { 3393 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3394 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 3395 } 3396 } 3397 3398 /* 3399 * called under session->mutex. 3400 */ 3401 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 3402 struct ceph_mds_session *session) 3403 { 3404 struct ceph_mds_request *req, *nreq; 3405 struct rb_node *p; 3406 3407 dout("replay_unsafe_requests mds%d\n", session->s_mds); 3408 3409 mutex_lock(&mdsc->mutex); 3410 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) 3411 __send_request(mdsc, session, req, true); 3412 3413 /* 3414 * also re-send old requests when MDS enters reconnect stage. So that MDS 3415 * can process completed request in clientreplay stage. 
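 * (i.e. every request on this session that has been sent at least
 * once is retransmitted, in addition to the unsafe requests above;
 * cap releases are dropped from the resent copies).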
3416 */ 3417 p = rb_first(&mdsc->request_tree); 3418 while (p) { 3419 req = rb_entry(p, struct ceph_mds_request, r_node); 3420 p = rb_next(p); 3421 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3422 continue; 3423 if (req->r_attempts == 0) 3424 continue; /* only old requests */ 3425 if (!req->r_session) 3426 continue; 3427 if (req->r_session->s_mds != session->s_mds) 3428 continue; 3429 3430 ceph_mdsc_release_dir_caps(req); 3431 3432 __send_request(mdsc, session, req, true); 3433 } 3434 mutex_unlock(&mdsc->mutex); 3435 } 3436 3437 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state) 3438 { 3439 struct ceph_msg *reply; 3440 struct ceph_pagelist *_pagelist; 3441 struct page *page; 3442 __le32 *addr; 3443 int err = -ENOMEM; 3444 3445 if (!recon_state->allow_multi) 3446 return -ENOSPC; 3447 3448 /* can't handle message that contains both caps and realm */ 3449 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms); 3450 3451 /* pre-allocate new pagelist */ 3452 _pagelist = ceph_pagelist_alloc(GFP_NOFS); 3453 if (!_pagelist) 3454 return -ENOMEM; 3455 3456 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3457 if (!reply) 3458 goto fail_msg; 3459 3460 /* placeholder for nr_caps */ 3461 err = ceph_pagelist_encode_32(_pagelist, 0); 3462 if (err < 0) 3463 goto fail; 3464 3465 if (recon_state->nr_caps) { 3466 /* currently encoding caps */ 3467 err = ceph_pagelist_encode_32(recon_state->pagelist, 0); 3468 if (err) 3469 goto fail; 3470 } else { 3471 /* placeholder for nr_realms (currently encoding relams) */ 3472 err = ceph_pagelist_encode_32(_pagelist, 0); 3473 if (err < 0) 3474 goto fail; 3475 } 3476 3477 err = ceph_pagelist_encode_8(recon_state->pagelist, 1); 3478 if (err) 3479 goto fail; 3480 3481 page = list_first_entry(&recon_state->pagelist->head, struct page, lru); 3482 addr = kmap_atomic(page); 3483 if (recon_state->nr_caps) { 3484 /* currently encoding caps */ 3485 *addr = cpu_to_le32(recon_state->nr_caps); 3486 } else { 3487 /* currently encoding relams */ 3488 *(addr + 1) = cpu_to_le32(recon_state->nr_realms); 3489 } 3490 kunmap_atomic(addr); 3491 3492 reply->hdr.version = cpu_to_le16(5); 3493 reply->hdr.compat_version = cpu_to_le16(4); 3494 3495 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length); 3496 ceph_msg_data_add_pagelist(reply, recon_state->pagelist); 3497 3498 ceph_con_send(&recon_state->session->s_con, reply); 3499 ceph_pagelist_release(recon_state->pagelist); 3500 3501 recon_state->pagelist = _pagelist; 3502 recon_state->nr_caps = 0; 3503 recon_state->nr_realms = 0; 3504 recon_state->msg_version = 5; 3505 return 0; 3506 fail: 3507 ceph_msg_put(reply); 3508 fail_msg: 3509 ceph_pagelist_release(_pagelist); 3510 return err; 3511 } 3512 3513 /* 3514 * Encode information about a cap for a reconnect with the MDS. 
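 * For msg_version >= 2 we emit rec.v2 (cap_id, wanted, issued,
 * snaprealm, flock_len) followed by the encoded file locks and, when
 * struct_v >= 2, a snap_follows value; older MDSes get rec.v1, which
 * instead carries the dentry path plus size and mtime/atime.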
3515 */ 3516 static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap, 3517 void *arg) 3518 { 3519 union { 3520 struct ceph_mds_cap_reconnect v2; 3521 struct ceph_mds_cap_reconnect_v1 v1; 3522 } rec; 3523 struct ceph_inode_info *ci = cap->ci; 3524 struct ceph_reconnect_state *recon_state = arg; 3525 struct ceph_pagelist *pagelist = recon_state->pagelist; 3526 int err; 3527 u64 snap_follows; 3528 3529 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 3530 inode, ceph_vinop(inode), cap, cap->cap_id, 3531 ceph_cap_string(cap->issued)); 3532 3533 spin_lock(&ci->i_ceph_lock); 3534 cap->seq = 0; /* reset cap seq */ 3535 cap->issue_seq = 0; /* and issue_seq */ 3536 cap->mseq = 0; /* and migrate_seq */ 3537 cap->cap_gen = cap->session->s_cap_gen; 3538 3539 /* These are lost when the session goes away */ 3540 if (S_ISDIR(inode->i_mode)) { 3541 if (cap->issued & CEPH_CAP_DIR_CREATE) { 3542 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); 3543 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); 3544 } 3545 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; 3546 } 3547 3548 if (recon_state->msg_version >= 2) { 3549 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 3550 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3551 rec.v2.issued = cpu_to_le32(cap->issued); 3552 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3553 rec.v2.pathbase = 0; 3554 rec.v2.flock_len = (__force __le32) 3555 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); 3556 } else { 3557 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 3558 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3559 rec.v1.issued = cpu_to_le32(cap->issued); 3560 rec.v1.size = cpu_to_le64(inode->i_size); 3561 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime); 3562 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime); 3563 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3564 rec.v1.pathbase = 0; 3565 } 3566 3567 if (list_empty(&ci->i_cap_snaps)) { 3568 snap_follows = ci->i_head_snapc ? 
ci->i_head_snapc->seq : 0; 3569 } else { 3570 struct ceph_cap_snap *capsnap = 3571 list_first_entry(&ci->i_cap_snaps, 3572 struct ceph_cap_snap, ci_item); 3573 snap_follows = capsnap->follows; 3574 } 3575 spin_unlock(&ci->i_ceph_lock); 3576 3577 if (recon_state->msg_version >= 2) { 3578 int num_fcntl_locks, num_flock_locks; 3579 struct ceph_filelock *flocks = NULL; 3580 size_t struct_len, total_len = sizeof(u64); 3581 u8 struct_v = 0; 3582 3583 encode_again: 3584 if (rec.v2.flock_len) { 3585 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 3586 } else { 3587 num_fcntl_locks = 0; 3588 num_flock_locks = 0; 3589 } 3590 if (num_fcntl_locks + num_flock_locks > 0) { 3591 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks, 3592 sizeof(struct ceph_filelock), 3593 GFP_NOFS); 3594 if (!flocks) { 3595 err = -ENOMEM; 3596 goto out_err; 3597 } 3598 err = ceph_encode_locks_to_buffer(inode, flocks, 3599 num_fcntl_locks, 3600 num_flock_locks); 3601 if (err) { 3602 kfree(flocks); 3603 flocks = NULL; 3604 if (err == -ENOSPC) 3605 goto encode_again; 3606 goto out_err; 3607 } 3608 } else { 3609 kfree(flocks); 3610 flocks = NULL; 3611 } 3612 3613 if (recon_state->msg_version >= 3) { 3614 /* version, compat_version and struct_len */ 3615 total_len += 2 * sizeof(u8) + sizeof(u32); 3616 struct_v = 2; 3617 } 3618 /* 3619 * number of encoded locks is stable, so copy to pagelist 3620 */ 3621 struct_len = 2 * sizeof(u32) + 3622 (num_fcntl_locks + num_flock_locks) * 3623 sizeof(struct ceph_filelock); 3624 rec.v2.flock_len = cpu_to_le32(struct_len); 3625 3626 struct_len += sizeof(u32) + sizeof(rec.v2); 3627 3628 if (struct_v >= 2) 3629 struct_len += sizeof(u64); /* snap_follows */ 3630 3631 total_len += struct_len; 3632 3633 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 3634 err = send_reconnect_partial(recon_state); 3635 if (err) 3636 goto out_freeflocks; 3637 pagelist = recon_state->pagelist; 3638 } 3639 3640 err = ceph_pagelist_reserve(pagelist, total_len); 3641 if (err) 3642 goto out_freeflocks; 3643 3644 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3645 if (recon_state->msg_version >= 3) { 3646 ceph_pagelist_encode_8(pagelist, struct_v); 3647 ceph_pagelist_encode_8(pagelist, 1); 3648 ceph_pagelist_encode_32(pagelist, struct_len); 3649 } 3650 ceph_pagelist_encode_string(pagelist, NULL, 0); 3651 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 3652 ceph_locks_to_pagelist(flocks, pagelist, 3653 num_fcntl_locks, num_flock_locks); 3654 if (struct_v >= 2) 3655 ceph_pagelist_encode_64(pagelist, snap_follows); 3656 out_freeflocks: 3657 kfree(flocks); 3658 } else { 3659 u64 pathbase = 0; 3660 int pathlen = 0; 3661 char *path = NULL; 3662 struct dentry *dentry; 3663 3664 dentry = d_find_alias(inode); 3665 if (dentry) { 3666 path = ceph_mdsc_build_path(dentry, 3667 &pathlen, &pathbase, 0); 3668 dput(dentry); 3669 if (IS_ERR(path)) { 3670 err = PTR_ERR(path); 3671 goto out_err; 3672 } 3673 rec.v1.pathbase = cpu_to_le64(pathbase); 3674 } 3675 3676 err = ceph_pagelist_reserve(pagelist, 3677 sizeof(u64) + sizeof(u32) + 3678 pathlen + sizeof(rec.v1)); 3679 if (err) { 3680 goto out_freepath; 3681 } 3682 3683 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3684 ceph_pagelist_encode_string(pagelist, path, pathlen); 3685 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 3686 out_freepath: 3687 ceph_mdsc_free_path(path, pathlen); 3688 } 3689 3690 out_err: 3691 if (err >= 0) 3692 recon_state->nr_caps++; 3693 return err; 3694 } 3695 3696 static int encode_snap_realms(struct 
ceph_mds_client *mdsc, 3697 struct ceph_reconnect_state *recon_state) 3698 { 3699 struct rb_node *p; 3700 struct ceph_pagelist *pagelist = recon_state->pagelist; 3701 int err = 0; 3702 3703 if (recon_state->msg_version >= 4) { 3704 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); 3705 if (err < 0) 3706 goto fail; 3707 } 3708 3709 /* 3710 * snaprealms. we provide mds with the ino, seq (version), and 3711 * parent for all of our realms. If the mds has any newer info, 3712 * it will tell us. 3713 */ 3714 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 3715 struct ceph_snap_realm *realm = 3716 rb_entry(p, struct ceph_snap_realm, node); 3717 struct ceph_mds_snaprealm_reconnect sr_rec; 3718 3719 if (recon_state->msg_version >= 4) { 3720 size_t need = sizeof(u8) * 2 + sizeof(u32) + 3721 sizeof(sr_rec); 3722 3723 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 3724 err = send_reconnect_partial(recon_state); 3725 if (err) 3726 goto fail; 3727 pagelist = recon_state->pagelist; 3728 } 3729 3730 err = ceph_pagelist_reserve(pagelist, need); 3731 if (err) 3732 goto fail; 3733 3734 ceph_pagelist_encode_8(pagelist, 1); 3735 ceph_pagelist_encode_8(pagelist, 1); 3736 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 3737 } 3738 3739 dout(" adding snap realm %llx seq %lld parent %llx\n", 3740 realm->ino, realm->seq, realm->parent_ino); 3741 sr_rec.ino = cpu_to_le64(realm->ino); 3742 sr_rec.seq = cpu_to_le64(realm->seq); 3743 sr_rec.parent = cpu_to_le64(realm->parent_ino); 3744 3745 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 3746 if (err) 3747 goto fail; 3748 3749 recon_state->nr_realms++; 3750 } 3751 fail: 3752 return err; 3753 } 3754 3755 3756 /* 3757 * If an MDS fails and recovers, clients need to reconnect in order to 3758 * reestablish shared state. This includes all caps issued through 3759 * this session _and_ the snap_realm hierarchy. Because it's not 3760 * clear which snap realms the mds cares about, we send everything we 3761 * know about.. that ensures we'll then get any new info the 3762 * recovering MDS might have. 3763 * 3764 * This is a relatively heavyweight operation, but it's rare. 3765 * 3766 * called with mdsc->mutex held. 3767 */ 3768 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 3769 struct ceph_mds_session *session) 3770 { 3771 struct ceph_msg *reply; 3772 int mds = session->s_mds; 3773 int err = -ENOMEM; 3774 struct ceph_reconnect_state recon_state = { 3775 .session = session, 3776 }; 3777 LIST_HEAD(dispose); 3778 3779 pr_info("mds%d reconnect start\n", mds); 3780 3781 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 3782 if (!recon_state.pagelist) 3783 goto fail_nopagelist; 3784 3785 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3786 if (!reply) 3787 goto fail_nomsg; 3788 3789 xa_destroy(&session->s_delegated_inos); 3790 3791 mutex_lock(&session->s_mutex); 3792 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 3793 session->s_seq = 0; 3794 3795 dout("session %p state %s\n", session, 3796 ceph_session_state_name(session->s_state)); 3797 3798 spin_lock(&session->s_gen_ttl_lock); 3799 session->s_cap_gen++; 3800 spin_unlock(&session->s_gen_ttl_lock); 3801 3802 spin_lock(&session->s_cap_lock); 3803 /* don't know if session is readonly */ 3804 session->s_readonly = 0; 3805 /* 3806 * notify __ceph_remove_cap() that we are composing cap reconnect. 3807 * If a cap get released before being added to the cap reconnect, 3808 * __ceph_remove_cap() should skip queuing cap release. 
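 * s_cap_reconnect is cleared again once the cap iteration below has
 * finished.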
3809 */ 3810 session->s_cap_reconnect = 1; 3811 /* drop old cap expires; we're about to reestablish that state */ 3812 detach_cap_releases(session, &dispose); 3813 spin_unlock(&session->s_cap_lock); 3814 dispose_cap_releases(mdsc, &dispose); 3815 3816 /* trim unused caps to reduce MDS's cache rejoin time */ 3817 if (mdsc->fsc->sb->s_root) 3818 shrink_dcache_parent(mdsc->fsc->sb->s_root); 3819 3820 ceph_con_close(&session->s_con); 3821 ceph_con_open(&session->s_con, 3822 CEPH_ENTITY_TYPE_MDS, mds, 3823 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 3824 3825 /* replay unsafe requests */ 3826 replay_unsafe_requests(mdsc, session); 3827 3828 ceph_early_kick_flushing_caps(mdsc, session); 3829 3830 down_read(&mdsc->snap_rwsem); 3831 3832 /* placeholder for nr_caps */ 3833 err = ceph_pagelist_encode_32(recon_state.pagelist, 0); 3834 if (err) 3835 goto fail; 3836 3837 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) { 3838 recon_state.msg_version = 3; 3839 recon_state.allow_multi = true; 3840 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) { 3841 recon_state.msg_version = 3; 3842 } else { 3843 recon_state.msg_version = 2; 3844 } 3845 /* trsaverse this session's caps */ 3846 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state); 3847 3848 spin_lock(&session->s_cap_lock); 3849 session->s_cap_reconnect = 0; 3850 spin_unlock(&session->s_cap_lock); 3851 3852 if (err < 0) 3853 goto fail; 3854 3855 /* check if all realms can be encoded into current message */ 3856 if (mdsc->num_snap_realms) { 3857 size_t total_len = 3858 recon_state.pagelist->length + 3859 mdsc->num_snap_realms * 3860 sizeof(struct ceph_mds_snaprealm_reconnect); 3861 if (recon_state.msg_version >= 4) { 3862 /* number of realms */ 3863 total_len += sizeof(u32); 3864 /* version, compat_version and struct_len */ 3865 total_len += mdsc->num_snap_realms * 3866 (2 * sizeof(u8) + sizeof(u32)); 3867 } 3868 if (total_len > RECONNECT_MAX_SIZE) { 3869 if (!recon_state.allow_multi) { 3870 err = -ENOSPC; 3871 goto fail; 3872 } 3873 if (recon_state.nr_caps) { 3874 err = send_reconnect_partial(&recon_state); 3875 if (err) 3876 goto fail; 3877 } 3878 recon_state.msg_version = 5; 3879 } 3880 } 3881 3882 err = encode_snap_realms(mdsc, &recon_state); 3883 if (err < 0) 3884 goto fail; 3885 3886 if (recon_state.msg_version >= 5) { 3887 err = ceph_pagelist_encode_8(recon_state.pagelist, 0); 3888 if (err < 0) 3889 goto fail; 3890 } 3891 3892 if (recon_state.nr_caps || recon_state.nr_realms) { 3893 struct page *page = 3894 list_first_entry(&recon_state.pagelist->head, 3895 struct page, lru); 3896 __le32 *addr = kmap_atomic(page); 3897 if (recon_state.nr_caps) { 3898 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms); 3899 *addr = cpu_to_le32(recon_state.nr_caps); 3900 } else if (recon_state.msg_version >= 4) { 3901 *(addr + 1) = cpu_to_le32(recon_state.nr_realms); 3902 } 3903 kunmap_atomic(addr); 3904 } 3905 3906 reply->hdr.version = cpu_to_le16(recon_state.msg_version); 3907 if (recon_state.msg_version >= 4) 3908 reply->hdr.compat_version = cpu_to_le16(4); 3909 3910 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length); 3911 ceph_msg_data_add_pagelist(reply, recon_state.pagelist); 3912 3913 ceph_con_send(&session->s_con, reply); 3914 3915 mutex_unlock(&session->s_mutex); 3916 3917 mutex_lock(&mdsc->mutex); 3918 __wake_requests(mdsc, &session->s_waiting); 3919 mutex_unlock(&mdsc->mutex); 3920 3921 up_read(&mdsc->snap_rwsem); 3922 ceph_pagelist_release(recon_state.pagelist); 3923 return; 3924 3925 
fail: 3926 ceph_msg_put(reply); 3927 up_read(&mdsc->snap_rwsem); 3928 mutex_unlock(&session->s_mutex); 3929 fail_nomsg: 3930 ceph_pagelist_release(recon_state.pagelist); 3931 fail_nopagelist: 3932 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 3933 return; 3934 } 3935 3936 3937 /* 3938 * compare old and new mdsmaps, kicking requests 3939 * and closing out old connections as necessary 3940 * 3941 * called under mdsc->mutex. 3942 */ 3943 static void check_new_map(struct ceph_mds_client *mdsc, 3944 struct ceph_mdsmap *newmap, 3945 struct ceph_mdsmap *oldmap) 3946 { 3947 int i; 3948 int oldstate, newstate; 3949 struct ceph_mds_session *s; 3950 3951 dout("check_new_map new %u old %u\n", 3952 newmap->m_epoch, oldmap->m_epoch); 3953 3954 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { 3955 if (!mdsc->sessions[i]) 3956 continue; 3957 s = mdsc->sessions[i]; 3958 oldstate = ceph_mdsmap_get_state(oldmap, i); 3959 newstate = ceph_mdsmap_get_state(newmap, i); 3960 3961 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 3962 i, ceph_mds_state_name(oldstate), 3963 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 3964 ceph_mds_state_name(newstate), 3965 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 3966 ceph_session_state_name(s->s_state)); 3967 3968 if (i >= newmap->possible_max_rank) { 3969 /* force close session for stopped mds */ 3970 ceph_get_mds_session(s); 3971 __unregister_session(mdsc, s); 3972 __wake_requests(mdsc, &s->s_waiting); 3973 mutex_unlock(&mdsc->mutex); 3974 3975 mutex_lock(&s->s_mutex); 3976 cleanup_session_requests(mdsc, s); 3977 remove_session_caps(s); 3978 mutex_unlock(&s->s_mutex); 3979 3980 ceph_put_mds_session(s); 3981 3982 mutex_lock(&mdsc->mutex); 3983 kick_requests(mdsc, i); 3984 continue; 3985 } 3986 3987 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 3988 ceph_mdsmap_get_addr(newmap, i), 3989 sizeof(struct ceph_entity_addr))) { 3990 /* just close it */ 3991 mutex_unlock(&mdsc->mutex); 3992 mutex_lock(&s->s_mutex); 3993 mutex_lock(&mdsc->mutex); 3994 ceph_con_close(&s->s_con); 3995 mutex_unlock(&s->s_mutex); 3996 s->s_state = CEPH_MDS_SESSION_RESTARTING; 3997 } else if (oldstate == newstate) { 3998 continue; /* nothing new with this mds */ 3999 } 4000 4001 /* 4002 * send reconnect? 4003 */ 4004 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 4005 newstate >= CEPH_MDS_STATE_RECONNECT) { 4006 mutex_unlock(&mdsc->mutex); 4007 send_mds_reconnect(mdsc, s); 4008 mutex_lock(&mdsc->mutex); 4009 } 4010 4011 /* 4012 * kick request on any mds that has gone active. 
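 * This also restarts any pending cap flushes and wakes cap waiters,
 * since messages sent while the mds was recovering may have been
 * dropped.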
4013 */ 4014 if (oldstate < CEPH_MDS_STATE_ACTIVE && 4015 newstate >= CEPH_MDS_STATE_ACTIVE) { 4016 if (oldstate != CEPH_MDS_STATE_CREATING && 4017 oldstate != CEPH_MDS_STATE_STARTING) 4018 pr_info("mds%d recovery completed\n", s->s_mds); 4019 kick_requests(mdsc, i); 4020 ceph_kick_flushing_caps(mdsc, s); 4021 wake_up_session_caps(s, RECONNECT); 4022 } 4023 } 4024 4025 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4026 s = mdsc->sessions[i]; 4027 if (!s) 4028 continue; 4029 if (!ceph_mdsmap_is_laggy(newmap, i)) 4030 continue; 4031 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4032 s->s_state == CEPH_MDS_SESSION_HUNG || 4033 s->s_state == CEPH_MDS_SESSION_CLOSING) { 4034 dout(" connecting to export targets of laggy mds%d\n", 4035 i); 4036 __open_export_target_sessions(mdsc, s); 4037 } 4038 } 4039 } 4040 4041 4042 4043 /* 4044 * leases 4045 */ 4046 4047 /* 4048 * caller must hold session s_mutex, dentry->d_lock 4049 */ 4050 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 4051 { 4052 struct ceph_dentry_info *di = ceph_dentry(dentry); 4053 4054 ceph_put_mds_session(di->lease_session); 4055 di->lease_session = NULL; 4056 } 4057 4058 static void handle_lease(struct ceph_mds_client *mdsc, 4059 struct ceph_mds_session *session, 4060 struct ceph_msg *msg) 4061 { 4062 struct super_block *sb = mdsc->fsc->sb; 4063 struct inode *inode; 4064 struct dentry *parent, *dentry; 4065 struct ceph_dentry_info *di; 4066 int mds = session->s_mds; 4067 struct ceph_mds_lease *h = msg->front.iov_base; 4068 u32 seq; 4069 struct ceph_vino vino; 4070 struct qstr dname; 4071 int release = 0; 4072 4073 dout("handle_lease from mds%d\n", mds); 4074 4075 /* decode */ 4076 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 4077 goto bad; 4078 vino.ino = le64_to_cpu(h->ino); 4079 vino.snap = CEPH_NOSNAP; 4080 seq = le32_to_cpu(h->seq); 4081 dname.len = get_unaligned_le32(h + 1); 4082 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 4083 goto bad; 4084 dname.name = (void *)(h + 1) + sizeof(u32); 4085 4086 /* lookup inode */ 4087 inode = ceph_find_inode(sb, vino); 4088 dout("handle_lease %s, ino %llx %p %.*s\n", 4089 ceph_lease_op_name(h->action), vino.ino, inode, 4090 dname.len, dname.name); 4091 4092 mutex_lock(&session->s_mutex); 4093 session->s_seq++; 4094 4095 if (!inode) { 4096 dout("handle_lease no inode %llx\n", vino.ino); 4097 goto release; 4098 } 4099 4100 /* dentry */ 4101 parent = d_find_alias(inode); 4102 if (!parent) { 4103 dout("no parent dentry on inode %p\n", inode); 4104 WARN_ON(1); 4105 goto release; /* hrm... 
*/ 4106 } 4107 dname.hash = full_name_hash(parent, dname.name, dname.len); 4108 dentry = d_lookup(parent, &dname); 4109 dput(parent); 4110 if (!dentry) 4111 goto release; 4112 4113 spin_lock(&dentry->d_lock); 4114 di = ceph_dentry(dentry); 4115 switch (h->action) { 4116 case CEPH_MDS_LEASE_REVOKE: 4117 if (di->lease_session == session) { 4118 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 4119 h->seq = cpu_to_le32(di->lease_seq); 4120 __ceph_mdsc_drop_dentry_lease(dentry); 4121 } 4122 release = 1; 4123 break; 4124 4125 case CEPH_MDS_LEASE_RENEW: 4126 if (di->lease_session == session && 4127 di->lease_gen == session->s_cap_gen && 4128 di->lease_renew_from && 4129 di->lease_renew_after == 0) { 4130 unsigned long duration = 4131 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 4132 4133 di->lease_seq = seq; 4134 di->time = di->lease_renew_from + duration; 4135 di->lease_renew_after = di->lease_renew_from + 4136 (duration >> 1); 4137 di->lease_renew_from = 0; 4138 } 4139 break; 4140 } 4141 spin_unlock(&dentry->d_lock); 4142 dput(dentry); 4143 4144 if (!release) 4145 goto out; 4146 4147 release: 4148 /* let's just reuse the same message */ 4149 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 4150 ceph_msg_get(msg); 4151 ceph_con_send(&session->s_con, msg); 4152 4153 out: 4154 mutex_unlock(&session->s_mutex); 4155 /* avoid calling iput_final() in mds dispatch threads */ 4156 ceph_async_iput(inode); 4157 return; 4158 4159 bad: 4160 pr_err("corrupt lease message\n"); 4161 ceph_msg_dump(msg); 4162 } 4163 4164 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 4165 struct dentry *dentry, char action, 4166 u32 seq) 4167 { 4168 struct ceph_msg *msg; 4169 struct ceph_mds_lease *lease; 4170 struct inode *dir; 4171 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; 4172 4173 dout("lease_send_msg identry %p %s to mds%d\n", 4174 dentry, ceph_lease_op_name(action), session->s_mds); 4175 4176 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 4177 if (!msg) 4178 return; 4179 lease = msg->front.iov_base; 4180 lease->action = action; 4181 lease->seq = cpu_to_le32(seq); 4182 4183 spin_lock(&dentry->d_lock); 4184 dir = d_inode(dentry->d_parent); 4185 lease->ino = cpu_to_le64(ceph_ino(dir)); 4186 lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); 4187 4188 put_unaligned_le32(dentry->d_name.len, lease + 1); 4189 memcpy((void *)(lease + 1) + 4, 4190 dentry->d_name.name, dentry->d_name.len); 4191 spin_unlock(&dentry->d_lock); 4192 /* 4193 * if this is a preemptive lease RELEASE, no need to 4194 * flush request stream, since the actual request will 4195 * soon follow. 
*/ 4196 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE); 4197 4198 ceph_con_send(&session->s_con, msg); 4199 4200 } 4201 4202 /* 4203 * lock and unlock each session, to wait for ongoing session activity to finish 4204 */ 4205 static void lock_unlock_sessions(struct ceph_mds_client *mdsc) 4206 { 4207 int i; 4208 4209 mutex_lock(&mdsc->mutex); 4210 for (i = 0; i < mdsc->max_sessions; i++) { 4211 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4212 if (!s) 4213 continue; 4214 mutex_unlock(&mdsc->mutex); 4215 mutex_lock(&s->s_mutex); 4216 mutex_unlock(&s->s_mutex); 4217 ceph_put_mds_session(s); 4218 mutex_lock(&mdsc->mutex); 4219 } 4220 mutex_unlock(&mdsc->mutex); 4221 } 4222 4223 static void maybe_recover_session(struct ceph_mds_client *mdsc) 4224 { 4225 struct ceph_fs_client *fsc = mdsc->fsc; 4226 4227 if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) 4228 return; 4229 4230 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED) 4231 return; 4232 4233 if (!READ_ONCE(fsc->blacklisted)) 4234 return; 4235 4236 if (fsc->last_auto_reconnect && 4237 time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30)) 4238 return; 4239 4240 pr_info("auto reconnect after blacklisted\n"); 4241 fsc->last_auto_reconnect = jiffies; 4242 ceph_force_reconnect(fsc->sb); 4243 } 4244 4245 /* 4246 * delayed work -- periodically trim expired leases, renew caps with mds 4247 */ 4248 static void schedule_delayed(struct ceph_mds_client *mdsc) 4249 { 4250 int delay = 5; 4251 unsigned hz = round_jiffies_relative(HZ * delay); 4252 schedule_delayed_work(&mdsc->delayed_work, hz); 4253 } 4254 4255 static void delayed_work(struct work_struct *work) 4256 { 4257 int i; 4258 struct ceph_mds_client *mdsc = 4259 container_of(work, struct ceph_mds_client, delayed_work.work); 4260 int renew_interval; 4261 int renew_caps; 4262 4263 dout("mdsc delayed_work\n"); 4264 4265 mutex_lock(&mdsc->mutex); 4266 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 4267 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 4268 mdsc->last_renew_caps); 4269 if (renew_caps) 4270 mdsc->last_renew_caps = jiffies; 4271 4272 for (i = 0; i < mdsc->max_sessions; i++) { 4273 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4274 if (!s) 4275 continue; 4276 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 4277 dout("resending session close request for mds%d\n", 4278 s->s_mds); 4279 request_close_session(mdsc, s); 4280 ceph_put_mds_session(s); 4281 continue; 4282 } 4283 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 4284 if (s->s_state == CEPH_MDS_SESSION_OPEN) { 4285 s->s_state = CEPH_MDS_SESSION_HUNG; 4286 pr_info("mds%d hung\n", s->s_mds); 4287 } 4288 } 4289 if (s->s_state == CEPH_MDS_SESSION_NEW || 4290 s->s_state == CEPH_MDS_SESSION_RESTARTING || 4291 s->s_state == CEPH_MDS_SESSION_REJECTED) { 4292 /* this mds has failed or is recovering, just wait */ 4293 ceph_put_mds_session(s); 4294 continue; 4295 } 4296 mutex_unlock(&mdsc->mutex); 4297 4298 mutex_lock(&s->s_mutex); 4299 if (renew_caps) 4300 send_renew_caps(mdsc, s); 4301 else 4302 ceph_con_keepalive(&s->s_con); 4303 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4304 s->s_state == CEPH_MDS_SESSION_HUNG) 4305 ceph_send_cap_releases(mdsc, s); 4306 mutex_unlock(&s->s_mutex); 4307 ceph_put_mds_session(s); 4308 4309 mutex_lock(&mdsc->mutex); 4310 } 4311 mutex_unlock(&mdsc->mutex); 4312 4313 ceph_check_delayed_caps(mdsc); 4314 4315 ceph_queue_cap_reclaim_work(mdsc); 4316 4317 ceph_trim_snapid_map(mdsc); 4318 4319 maybe_recover_session(mdsc); 4320 4321 schedule_delayed(mdsc); 4322 } 4323 4324
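/*
 * mds client init/teardown
 *
 * Summary comment added for orientation, based on the code below:
 * ceph_mdsc_init() allocates the mds client for this fs client and sets
 * up its locks, session table, request and snap realm trees, cap
 * flushing/reclaim bookkeeping, dentry lease lists and the delayed work
 * timer; the matching teardown is ceph_mdsc_stop()/ceph_mdsc_destroy()
 * further below.
 */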
int ceph_mdsc_init(struct ceph_fs_client *fsc) 4325 4326 { 4327 struct ceph_mds_client *mdsc; 4328 4329 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 4330 if (!mdsc) 4331 return -ENOMEM; 4332 mdsc->fsc = fsc; 4333 mutex_init(&mdsc->mutex); 4334 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 4335 if (!mdsc->mdsmap) { 4336 kfree(mdsc); 4337 return -ENOMEM; 4338 } 4339 4340 fsc->mdsc = mdsc; 4341 init_completion(&mdsc->safe_umount_waiters); 4342 init_waitqueue_head(&mdsc->session_close_wq); 4343 INIT_LIST_HEAD(&mdsc->waiting_for_map); 4344 mdsc->sessions = NULL; 4345 atomic_set(&mdsc->num_sessions, 0); 4346 mdsc->max_sessions = 0; 4347 mdsc->stopping = 0; 4348 atomic64_set(&mdsc->quotarealms_count, 0); 4349 mdsc->quotarealms_inodes = RB_ROOT; 4350 mutex_init(&mdsc->quotarealms_inodes_mutex); 4351 mdsc->last_snap_seq = 0; 4352 init_rwsem(&mdsc->snap_rwsem); 4353 mdsc->snap_realms = RB_ROOT; 4354 INIT_LIST_HEAD(&mdsc->snap_empty); 4355 mdsc->num_snap_realms = 0; 4356 spin_lock_init(&mdsc->snap_empty_lock); 4357 mdsc->last_tid = 0; 4358 mdsc->oldest_tid = 0; 4359 mdsc->request_tree = RB_ROOT; 4360 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 4361 mdsc->last_renew_caps = jiffies; 4362 INIT_LIST_HEAD(&mdsc->cap_delay_list); 4363 INIT_LIST_HEAD(&mdsc->cap_wait_list); 4364 spin_lock_init(&mdsc->cap_delay_lock); 4365 INIT_LIST_HEAD(&mdsc->snap_flush_list); 4366 spin_lock_init(&mdsc->snap_flush_lock); 4367 mdsc->last_cap_flush_tid = 1; 4368 INIT_LIST_HEAD(&mdsc->cap_flush_list); 4369 INIT_LIST_HEAD(&mdsc->cap_dirty); 4370 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 4371 mdsc->num_cap_flushing = 0; 4372 spin_lock_init(&mdsc->cap_dirty_lock); 4373 init_waitqueue_head(&mdsc->cap_flushing_wq); 4374 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); 4375 atomic_set(&mdsc->cap_reclaim_pending, 0); 4376 4377 spin_lock_init(&mdsc->dentry_list_lock); 4378 INIT_LIST_HEAD(&mdsc->dentry_leases); 4379 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 4380 4381 ceph_caps_init(mdsc); 4382 ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 4383 4384 spin_lock_init(&mdsc->snapid_map_lock); 4385 mdsc->snapid_map_tree = RB_ROOT; 4386 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 4387 4388 init_rwsem(&mdsc->pool_perm_rwsem); 4389 mdsc->pool_perm_tree = RB_ROOT; 4390 4391 strscpy(mdsc->nodename, utsname()->nodename, 4392 sizeof(mdsc->nodename)); 4393 return 0; 4394 } 4395 4396 /* 4397 * Wait for safe replies on open mds requests. If we time out, drop 4398 * all requests from the tree to avoid dangling dentry refs. 4399 */ 4400 static void wait_requests(struct ceph_mds_client *mdsc) 4401 { 4402 struct ceph_options *opts = mdsc->fsc->client->options; 4403 struct ceph_mds_request *req; 4404 4405 mutex_lock(&mdsc->mutex); 4406 if (__get_oldest_req(mdsc)) { 4407 mutex_unlock(&mdsc->mutex); 4408 4409 dout("wait_requests waiting for requests\n"); 4410 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 4411 ceph_timeout_jiffies(opts->mount_timeout)); 4412 4413 /* tear down remaining requests */ 4414 mutex_lock(&mdsc->mutex); 4415 while ((req = __get_oldest_req(mdsc))) { 4416 dout("wait_requests timed out on tid %llu\n", 4417 req->r_tid); 4418 list_del_init(&req->r_wait); 4419 __unregister_request(mdsc, req); 4420 } 4421 } 4422 mutex_unlock(&mdsc->mutex); 4423 dout("wait_requests done\n"); 4424 } 4425 4426 /* 4427 * called before mount is ro, and before dentries are torn down. 4428 * (hmm, does this still race with new lookups?) 
4429 */ 4430 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 4431 { 4432 dout("pre_umount\n"); 4433 mdsc->stopping = 1; 4434 4435 lock_unlock_sessions(mdsc); 4436 ceph_flush_dirty_caps(mdsc); 4437 wait_requests(mdsc); 4438 4439 /* 4440 * wait for reply handlers to drop their request refs and 4441 * their inode/dcache refs 4442 */ 4443 ceph_msgr_flush(); 4444 4445 ceph_cleanup_quotarealms_inodes(mdsc); 4446 } 4447 4448 /* 4449 * wait for all write mds requests to flush. 4450 */ 4451 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) 4452 { 4453 struct ceph_mds_request *req = NULL, *nextreq; 4454 struct rb_node *n; 4455 4456 mutex_lock(&mdsc->mutex); 4457 dout("wait_unsafe_requests want %lld\n", want_tid); 4458 restart: 4459 req = __get_oldest_req(mdsc); 4460 while (req && req->r_tid <= want_tid) { 4461 /* find next request */ 4462 n = rb_next(&req->r_node); 4463 if (n) 4464 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 4465 else 4466 nextreq = NULL; 4467 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 4468 (req->r_op & CEPH_MDS_OP_WRITE)) { 4469 /* write op */ 4470 ceph_mdsc_get_request(req); 4471 if (nextreq) 4472 ceph_mdsc_get_request(nextreq); 4473 mutex_unlock(&mdsc->mutex); 4474 dout("wait_unsafe_requests wait on %llu (want %llu)\n", 4475 req->r_tid, want_tid); 4476 wait_for_completion(&req->r_safe_completion); 4477 mutex_lock(&mdsc->mutex); 4478 ceph_mdsc_put_request(req); 4479 if (!nextreq) 4480 break; /* next dne before, so we're done! */ 4481 if (RB_EMPTY_NODE(&nextreq->r_node)) { 4482 /* next request was removed from tree */ 4483 ceph_mdsc_put_request(nextreq); 4484 goto restart; 4485 } 4486 ceph_mdsc_put_request(nextreq); /* won't go away */ 4487 } 4488 req = nextreq; 4489 } 4490 mutex_unlock(&mdsc->mutex); 4491 dout("wait_unsafe_requests done\n"); 4492 } 4493 4494 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 4495 { 4496 u64 want_tid, want_flush; 4497 4498 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4499 return; 4500 4501 dout("sync\n"); 4502 mutex_lock(&mdsc->mutex); 4503 want_tid = mdsc->last_tid; 4504 mutex_unlock(&mdsc->mutex); 4505 4506 ceph_flush_dirty_caps(mdsc); 4507 spin_lock(&mdsc->cap_dirty_lock); 4508 want_flush = mdsc->last_cap_flush_tid; 4509 if (!list_empty(&mdsc->cap_flush_list)) { 4510 struct ceph_cap_flush *cf = 4511 list_last_entry(&mdsc->cap_flush_list, 4512 struct ceph_cap_flush, g_list); 4513 cf->wake = true; 4514 } 4515 spin_unlock(&mdsc->cap_dirty_lock); 4516 4517 dout("sync want tid %lld flush_seq %lld\n", 4518 want_tid, want_flush); 4519 4520 wait_unsafe_requests(mdsc, want_tid); 4521 wait_caps_flush(mdsc, want_flush); 4522 } 4523 4524 /* 4525 * true if all sessions are closed, or we force unmount 4526 */ 4527 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 4528 { 4529 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4530 return true; 4531 return atomic_read(&mdsc->num_sessions) <= skipped; 4532 } 4533 4534 /* 4535 * called after sb is ro. 
4536 */ 4537 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 4538 { 4539 struct ceph_options *opts = mdsc->fsc->client->options; 4540 struct ceph_mds_session *session; 4541 int i; 4542 int skipped = 0; 4543 4544 dout("close_sessions\n"); 4545 4546 /* close sessions */ 4547 mutex_lock(&mdsc->mutex); 4548 for (i = 0; i < mdsc->max_sessions; i++) { 4549 session = __ceph_lookup_mds_session(mdsc, i); 4550 if (!session) 4551 continue; 4552 mutex_unlock(&mdsc->mutex); 4553 mutex_lock(&session->s_mutex); 4554 if (__close_session(mdsc, session) <= 0) 4555 skipped++; 4556 mutex_unlock(&session->s_mutex); 4557 ceph_put_mds_session(session); 4558 mutex_lock(&mdsc->mutex); 4559 } 4560 mutex_unlock(&mdsc->mutex); 4561 4562 dout("waiting for sessions to close\n"); 4563 wait_event_timeout(mdsc->session_close_wq, 4564 done_closing_sessions(mdsc, skipped), 4565 ceph_timeout_jiffies(opts->mount_timeout)); 4566 4567 /* tear down remaining sessions */ 4568 mutex_lock(&mdsc->mutex); 4569 for (i = 0; i < mdsc->max_sessions; i++) { 4570 if (mdsc->sessions[i]) { 4571 session = ceph_get_mds_session(mdsc->sessions[i]); 4572 __unregister_session(mdsc, session); 4573 mutex_unlock(&mdsc->mutex); 4574 mutex_lock(&session->s_mutex); 4575 remove_session_caps(session); 4576 mutex_unlock(&session->s_mutex); 4577 ceph_put_mds_session(session); 4578 mutex_lock(&mdsc->mutex); 4579 } 4580 } 4581 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 4582 mutex_unlock(&mdsc->mutex); 4583 4584 ceph_cleanup_snapid_map(mdsc); 4585 ceph_cleanup_empty_realms(mdsc); 4586 4587 cancel_work_sync(&mdsc->cap_reclaim_work); 4588 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 4589 4590 dout("stopped\n"); 4591 } 4592 4593 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 4594 { 4595 struct ceph_mds_session *session; 4596 int mds; 4597 4598 dout("force umount\n"); 4599 4600 mutex_lock(&mdsc->mutex); 4601 for (mds = 0; mds < mdsc->max_sessions; mds++) { 4602 session = __ceph_lookup_mds_session(mdsc, mds); 4603 if (!session) 4604 continue; 4605 4606 if (session->s_state == CEPH_MDS_SESSION_REJECTED) 4607 __unregister_session(mdsc, session); 4608 __wake_requests(mdsc, &session->s_waiting); 4609 mutex_unlock(&mdsc->mutex); 4610 4611 mutex_lock(&session->s_mutex); 4612 __close_session(mdsc, session); 4613 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 4614 cleanup_session_requests(mdsc, session); 4615 remove_session_caps(session); 4616 } 4617 mutex_unlock(&session->s_mutex); 4618 ceph_put_mds_session(session); 4619 4620 mutex_lock(&mdsc->mutex); 4621 kick_requests(mdsc, mds); 4622 } 4623 __wake_requests(mdsc, &mdsc->waiting_for_map); 4624 mutex_unlock(&mdsc->mutex); 4625 } 4626 4627 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 4628 { 4629 dout("stop\n"); 4630 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 4631 if (mdsc->mdsmap) 4632 ceph_mdsmap_destroy(mdsc->mdsmap); 4633 kfree(mdsc->sessions); 4634 ceph_caps_finalize(mdsc); 4635 ceph_pool_perm_destroy(mdsc); 4636 } 4637 4638 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 4639 { 4640 struct ceph_mds_client *mdsc = fsc->mdsc; 4641 dout("mdsc_destroy %p\n", mdsc); 4642 4643 if (!mdsc) 4644 return; 4645 4646 /* flush out any connection work with references to us */ 4647 ceph_msgr_flush(); 4648 4649 ceph_mdsc_stop(mdsc); 4650 4651 fsc->mdsc = NULL; 4652 kfree(mdsc); 4653 dout("mdsc_destroy %p done\n", mdsc); 4654 } 4655 4656 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4657 { 4658 struct 
ceph_fs_client *fsc = mdsc->fsc; 4659 const char *mds_namespace = fsc->mount_options->mds_namespace; 4660 void *p = msg->front.iov_base; 4661 void *end = p + msg->front.iov_len; 4662 u32 epoch; 4663 u32 map_len; 4664 u32 num_fs; 4665 u32 mount_fscid = (u32)-1; 4666 u8 struct_v, struct_cv; 4667 int err = -EINVAL; 4668 4669 ceph_decode_need(&p, end, sizeof(u32), bad); 4670 epoch = ceph_decode_32(&p); 4671 4672 dout("handle_fsmap epoch %u\n", epoch); 4673 4674 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 4675 struct_v = ceph_decode_8(&p); 4676 struct_cv = ceph_decode_8(&p); 4677 map_len = ceph_decode_32(&p); 4678 4679 ceph_decode_need(&p, end, sizeof(u32) * 3, bad); 4680 p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */ 4681 4682 num_fs = ceph_decode_32(&p); 4683 while (num_fs-- > 0) { 4684 void *info_p, *info_end; 4685 u32 info_len; 4686 u8 info_v, info_cv; 4687 u32 fscid, namelen; 4688 4689 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 4690 info_v = ceph_decode_8(&p); 4691 info_cv = ceph_decode_8(&p); 4692 info_len = ceph_decode_32(&p); 4693 ceph_decode_need(&p, end, info_len, bad); 4694 info_p = p; 4695 info_end = p + info_len; 4696 p = info_end; 4697 4698 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); 4699 fscid = ceph_decode_32(&info_p); 4700 namelen = ceph_decode_32(&info_p); 4701 ceph_decode_need(&info_p, info_end, namelen, bad); 4702 4703 if (mds_namespace && 4704 strlen(mds_namespace) == namelen && 4705 !strncmp(mds_namespace, (char *)info_p, namelen)) { 4706 mount_fscid = fscid; 4707 break; 4708 } 4709 } 4710 4711 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); 4712 if (mount_fscid != (u32)-1) { 4713 fsc->client->monc.fs_cluster_id = mount_fscid; 4714 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 4715 0, true); 4716 ceph_monc_renew_subs(&fsc->client->monc); 4717 } else { 4718 err = -ENOENT; 4719 goto err_out; 4720 } 4721 return; 4722 4723 bad: 4724 pr_err("error decoding fsmap\n"); 4725 err_out: 4726 mutex_lock(&mdsc->mutex); 4727 mdsc->mdsmap_err = err; 4728 __wake_requests(mdsc, &mdsc->waiting_for_map); 4729 mutex_unlock(&mdsc->mutex); 4730 } 4731 4732 /* 4733 * handle mds map update. 4734 */ 4735 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4736 { 4737 u32 epoch; 4738 u32 maplen; 4739 void *p = msg->front.iov_base; 4740 void *end = p + msg->front.iov_len; 4741 struct ceph_mdsmap *newmap, *oldmap; 4742 struct ceph_fsid fsid; 4743 int err = -EINVAL; 4744 4745 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 4746 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 4747 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 4748 return; 4749 epoch = ceph_decode_32(&p); 4750 maplen = ceph_decode_32(&p); 4751 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 4752 4753 /* do we need it? 
*/ 4754 mutex_lock(&mdsc->mutex); 4755 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 4756 dout("handle_map epoch %u <= our %u\n", 4757 epoch, mdsc->mdsmap->m_epoch); 4758 mutex_unlock(&mdsc->mutex); 4759 return; 4760 } 4761 4762 newmap = ceph_mdsmap_decode(&p, end); 4763 if (IS_ERR(newmap)) { 4764 err = PTR_ERR(newmap); 4765 goto bad_unlock; 4766 } 4767 4768 /* swap into place */ 4769 if (mdsc->mdsmap) { 4770 oldmap = mdsc->mdsmap; 4771 mdsc->mdsmap = newmap; 4772 check_new_map(mdsc, newmap, oldmap); 4773 ceph_mdsmap_destroy(oldmap); 4774 } else { 4775 mdsc->mdsmap = newmap; /* first mds map */ 4776 } 4777 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size, 4778 MAX_LFS_FILESIZE); 4779 4780 __wake_requests(mdsc, &mdsc->waiting_for_map); 4781 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP, 4782 mdsc->mdsmap->m_epoch); 4783 4784 mutex_unlock(&mdsc->mutex); 4785 schedule_delayed(mdsc); 4786 return; 4787 4788 bad_unlock: 4789 mutex_unlock(&mdsc->mutex); 4790 bad: 4791 pr_err("error decoding mdsmap %d\n", err); 4792 return; 4793 } 4794 4795 static struct ceph_connection *con_get(struct ceph_connection *con) 4796 { 4797 struct ceph_mds_session *s = con->private; 4798 4799 if (ceph_get_mds_session(s)) 4800 return con; 4801 return NULL; 4802 } 4803 4804 static void con_put(struct ceph_connection *con) 4805 { 4806 struct ceph_mds_session *s = con->private; 4807 4808 ceph_put_mds_session(s); 4809 } 4810 4811 /* 4812 * if the client is unresponsive for long enough, the mds will kill 4813 * the session entirely. 4814 */ 4815 static void peer_reset(struct ceph_connection *con) 4816 { 4817 struct ceph_mds_session *s = con->private; 4818 struct ceph_mds_client *mdsc = s->s_mdsc; 4819 4820 pr_warn("mds%d closed our session\n", s->s_mds); 4821 send_mds_reconnect(mdsc, s); 4822 } 4823 4824 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 4825 { 4826 struct ceph_mds_session *s = con->private; 4827 struct ceph_mds_client *mdsc = s->s_mdsc; 4828 int type = le16_to_cpu(msg->hdr.type); 4829 4830 mutex_lock(&mdsc->mutex); 4831 if (__verify_registered_session(mdsc, s) < 0) { 4832 mutex_unlock(&mdsc->mutex); 4833 goto out; 4834 } 4835 mutex_unlock(&mdsc->mutex); 4836 4837 switch (type) { 4838 case CEPH_MSG_MDS_MAP: 4839 ceph_mdsc_handle_mdsmap(mdsc, msg); 4840 break; 4841 case CEPH_MSG_FS_MAP_USER: 4842 ceph_mdsc_handle_fsmap(mdsc, msg); 4843 break; 4844 case CEPH_MSG_CLIENT_SESSION: 4845 handle_session(s, msg); 4846 break; 4847 case CEPH_MSG_CLIENT_REPLY: 4848 handle_reply(s, msg); 4849 break; 4850 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 4851 handle_forward(mdsc, s, msg); 4852 break; 4853 case CEPH_MSG_CLIENT_CAPS: 4854 ceph_handle_caps(s, msg); 4855 break; 4856 case CEPH_MSG_CLIENT_SNAP: 4857 ceph_handle_snap(mdsc, s, msg); 4858 break; 4859 case CEPH_MSG_CLIENT_LEASE: 4860 handle_lease(mdsc, s, msg); 4861 break; 4862 case CEPH_MSG_CLIENT_QUOTA: 4863 ceph_handle_quota(mdsc, s, msg); 4864 break; 4865 4866 default: 4867 pr_err("received unknown message type %d %s\n", type, 4868 ceph_msg_type_name(type)); 4869 } 4870 out: 4871 ceph_msg_put(msg); 4872 } 4873 4874 /* 4875 * authentication 4876 */ 4877 4878 /* 4879 * Note: returned pointer is the address of a structure that's 4880 * managed separately. Caller must *not* attempt to free it. 
4881 */ 4882 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 4883 int *proto, int force_new) 4884 { 4885 struct ceph_mds_session *s = con->private; 4886 struct ceph_mds_client *mdsc = s->s_mdsc; 4887 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 4888 struct ceph_auth_handshake *auth = &s->s_auth; 4889 4890 if (force_new && auth->authorizer) { 4891 ceph_auth_destroy_authorizer(auth->authorizer); 4892 auth->authorizer = NULL; 4893 } 4894 if (!auth->authorizer) { 4895 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 4896 auth); 4897 if (ret) 4898 return ERR_PTR(ret); 4899 } else { 4900 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 4901 auth); 4902 if (ret) 4903 return ERR_PTR(ret); 4904 } 4905 *proto = ac->protocol; 4906 4907 return auth; 4908 } 4909 4910 static int add_authorizer_challenge(struct ceph_connection *con, 4911 void *challenge_buf, int challenge_buf_len) 4912 { 4913 struct ceph_mds_session *s = con->private; 4914 struct ceph_mds_client *mdsc = s->s_mdsc; 4915 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 4916 4917 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer, 4918 challenge_buf, challenge_buf_len); 4919 } 4920 4921 static int verify_authorizer_reply(struct ceph_connection *con) 4922 { 4923 struct ceph_mds_session *s = con->private; 4924 struct ceph_mds_client *mdsc = s->s_mdsc; 4925 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 4926 4927 return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer); 4928 } 4929 4930 static int invalidate_authorizer(struct ceph_connection *con) 4931 { 4932 struct ceph_mds_session *s = con->private; 4933 struct ceph_mds_client *mdsc = s->s_mdsc; 4934 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 4935 4936 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 4937 4938 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 4939 } 4940 4941 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, 4942 struct ceph_msg_header *hdr, int *skip) 4943 { 4944 struct ceph_msg *msg; 4945 int type = (int) le16_to_cpu(hdr->type); 4946 int front_len = (int) le32_to_cpu(hdr->front_len); 4947 4948 if (con->in_msg) 4949 return con->in_msg; 4950 4951 *skip = 0; 4952 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 4953 if (!msg) { 4954 pr_err("unable to allocate msg type %d len %d\n", 4955 type, front_len); 4956 return NULL; 4957 } 4958 4959 return msg; 4960 } 4961 4962 static int mds_sign_message(struct ceph_msg *msg) 4963 { 4964 struct ceph_mds_session *s = msg->con->private; 4965 struct ceph_auth_handshake *auth = &s->s_auth; 4966 4967 return ceph_auth_sign_message(auth, msg); 4968 } 4969 4970 static int mds_check_message_signature(struct ceph_msg *msg) 4971 { 4972 struct ceph_mds_session *s = msg->con->private; 4973 struct ceph_auth_handshake *auth = &s->s_auth; 4974 4975 return ceph_auth_check_message_signature(auth, msg); 4976 } 4977 4978 static const struct ceph_connection_operations mds_con_ops = { 4979 .get = con_get, 4980 .put = con_put, 4981 .dispatch = dispatch, 4982 .get_authorizer = get_authorizer, 4983 .add_authorizer_challenge = add_authorizer_challenge, 4984 .verify_authorizer_reply = verify_authorizer_reply, 4985 .invalidate_authorizer = invalidate_authorizer, 4986 .peer_reset = peer_reset, 4987 .alloc_msg = mds_alloc_msg, 4988 .sign_message = mds_sign_message, 4989 .check_message_signature = mds_check_message_signature, 4990 }; 4991 4992 /* eof */ 4993