1 // SPDX-License-Identifier: GPL-2.0 2 #include <linux/ceph/ceph_debug.h> 3 4 #include <linux/fs.h> 5 #include <linux/wait.h> 6 #include <linux/slab.h> 7 #include <linux/gfp.h> 8 #include <linux/sched.h> 9 #include <linux/debugfs.h> 10 #include <linux/seq_file.h> 11 #include <linux/ratelimit.h> 12 #include <linux/bits.h> 13 #include <linux/ktime.h> 14 15 #include "super.h" 16 #include "mds_client.h" 17 18 #include <linux/ceph/ceph_features.h> 19 #include <linux/ceph/messenger.h> 20 #include <linux/ceph/decode.h> 21 #include <linux/ceph/pagelist.h> 22 #include <linux/ceph/auth.h> 23 #include <linux/ceph/debugfs.h> 24 25 #define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE) 26 27 /* 28 * A cluster of MDS (metadata server) daemons is responsible for 29 * managing the file system namespace (the directory hierarchy and 30 * inodes) and for coordinating shared access to storage. Metadata is 31 * partitioning hierarchically across a number of servers, and that 32 * partition varies over time as the cluster adjusts the distribution 33 * in order to balance load. 34 * 35 * The MDS client is primarily responsible to managing synchronous 36 * metadata requests for operations like open, unlink, and so forth. 37 * If there is a MDS failure, we find out about it when we (possibly 38 * request and) receive a new MDS map, and can resubmit affected 39 * requests. 40 * 41 * For the most part, though, we take advantage of a lossless 42 * communications channel to the MDS, and do not need to worry about 43 * timing out or resubmitting requests. 44 * 45 * We maintain a stateful "session" with each MDS we interact with. 46 * Within each session, we sent periodic heartbeat messages to ensure 47 * any capabilities or leases we have been issues remain valid. If 48 * the session times out and goes stale, our leases and capabilities 49 * are no longer valid. 50 */ 51 52 struct ceph_reconnect_state { 53 struct ceph_mds_session *session; 54 int nr_caps, nr_realms; 55 struct ceph_pagelist *pagelist; 56 unsigned msg_version; 57 bool allow_multi; 58 }; 59 60 static void __wake_requests(struct ceph_mds_client *mdsc, 61 struct list_head *head); 62 static void ceph_cap_release_work(struct work_struct *work); 63 static void ceph_cap_reclaim_work(struct work_struct *work); 64 65 static const struct ceph_connection_operations mds_con_ops; 66 67 68 /* 69 * mds reply parsing 70 */ 71 72 static int parse_reply_info_quota(void **p, void *end, 73 struct ceph_mds_reply_info_in *info) 74 { 75 u8 struct_v, struct_compat; 76 u32 struct_len; 77 78 ceph_decode_8_safe(p, end, struct_v, bad); 79 ceph_decode_8_safe(p, end, struct_compat, bad); 80 /* struct_v is expected to be >= 1. we only 81 * understand encoding with struct_compat == 1. */ 82 if (!struct_v || struct_compat != 1) 83 goto bad; 84 ceph_decode_32_safe(p, end, struct_len, bad); 85 ceph_decode_need(p, end, struct_len, bad); 86 end = *p + struct_len; 87 ceph_decode_64_safe(p, end, info->max_bytes, bad); 88 ceph_decode_64_safe(p, end, info->max_files, bad); 89 *p = end; 90 return 0; 91 bad: 92 return -EIO; 93 } 94 95 /* 96 * parse individual inode info 97 */ 98 static int parse_reply_info_in(void **p, void *end, 99 struct ceph_mds_reply_info_in *info, 100 u64 features) 101 { 102 int err = 0; 103 u8 struct_v = 0; 104 105 if (features == (u64)-1) { 106 u32 struct_len; 107 u8 struct_compat; 108 ceph_decode_8_safe(p, end, struct_v, bad); 109 ceph_decode_8_safe(p, end, struct_compat, bad); 110 /* struct_v is expected to be >= 1. we only understand 111 * encoding with struct_compat == 1. 
*/ 112 if (!struct_v || struct_compat != 1) 113 goto bad; 114 ceph_decode_32_safe(p, end, struct_len, bad); 115 ceph_decode_need(p, end, struct_len, bad); 116 end = *p + struct_len; 117 } 118 119 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad); 120 info->in = *p; 121 *p += sizeof(struct ceph_mds_reply_inode) + 122 sizeof(*info->in->fragtree.splits) * 123 le32_to_cpu(info->in->fragtree.nsplits); 124 125 ceph_decode_32_safe(p, end, info->symlink_len, bad); 126 ceph_decode_need(p, end, info->symlink_len, bad); 127 info->symlink = *p; 128 *p += info->symlink_len; 129 130 ceph_decode_copy_safe(p, end, &info->dir_layout, 131 sizeof(info->dir_layout), bad); 132 ceph_decode_32_safe(p, end, info->xattr_len, bad); 133 ceph_decode_need(p, end, info->xattr_len, bad); 134 info->xattr_data = *p; 135 *p += info->xattr_len; 136 137 if (features == (u64)-1) { 138 /* inline data */ 139 ceph_decode_64_safe(p, end, info->inline_version, bad); 140 ceph_decode_32_safe(p, end, info->inline_len, bad); 141 ceph_decode_need(p, end, info->inline_len, bad); 142 info->inline_data = *p; 143 *p += info->inline_len; 144 /* quota */ 145 err = parse_reply_info_quota(p, end, info); 146 if (err < 0) 147 goto out_bad; 148 /* pool namespace */ 149 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 150 if (info->pool_ns_len > 0) { 151 ceph_decode_need(p, end, info->pool_ns_len, bad); 152 info->pool_ns_data = *p; 153 *p += info->pool_ns_len; 154 } 155 156 /* btime */ 157 ceph_decode_need(p, end, sizeof(info->btime), bad); 158 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 159 160 /* change attribute */ 161 ceph_decode_64_safe(p, end, info->change_attr, bad); 162 163 /* dir pin */ 164 if (struct_v >= 2) { 165 ceph_decode_32_safe(p, end, info->dir_pin, bad); 166 } else { 167 info->dir_pin = -ENODATA; 168 } 169 170 /* snapshot birth time, remains zero for v<=2 */ 171 if (struct_v >= 3) { 172 ceph_decode_need(p, end, sizeof(info->snap_btime), bad); 173 ceph_decode_copy(p, &info->snap_btime, 174 sizeof(info->snap_btime)); 175 } else { 176 memset(&info->snap_btime, 0, sizeof(info->snap_btime)); 177 } 178 179 *p = end; 180 } else { 181 if (features & CEPH_FEATURE_MDS_INLINE_DATA) { 182 ceph_decode_64_safe(p, end, info->inline_version, bad); 183 ceph_decode_32_safe(p, end, info->inline_len, bad); 184 ceph_decode_need(p, end, info->inline_len, bad); 185 info->inline_data = *p; 186 *p += info->inline_len; 187 } else 188 info->inline_version = CEPH_INLINE_NONE; 189 190 if (features & CEPH_FEATURE_MDS_QUOTA) { 191 err = parse_reply_info_quota(p, end, info); 192 if (err < 0) 193 goto out_bad; 194 } else { 195 info->max_bytes = 0; 196 info->max_files = 0; 197 } 198 199 info->pool_ns_len = 0; 200 info->pool_ns_data = NULL; 201 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { 202 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 203 if (info->pool_ns_len > 0) { 204 ceph_decode_need(p, end, info->pool_ns_len, bad); 205 info->pool_ns_data = *p; 206 *p += info->pool_ns_len; 207 } 208 } 209 210 if (features & CEPH_FEATURE_FS_BTIME) { 211 ceph_decode_need(p, end, sizeof(info->btime), bad); 212 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 213 ceph_decode_64_safe(p, end, info->change_attr, bad); 214 } 215 216 info->dir_pin = -ENODATA; 217 /* info->snap_btime remains zero */ 218 } 219 return 0; 220 bad: 221 err = -EIO; 222 out_bad: 223 return err; 224 } 225 226 static int parse_reply_info_dir(void **p, void *end, 227 struct ceph_mds_reply_dirfrag **dirfrag, 228 u64 features) 229 { 230 if (features == (u64)-1) 
{ 231 u8 struct_v, struct_compat; 232 u32 struct_len; 233 ceph_decode_8_safe(p, end, struct_v, bad); 234 ceph_decode_8_safe(p, end, struct_compat, bad); 235 /* struct_v is expected to be >= 1. we only understand 236 * encoding whose struct_compat == 1. */ 237 if (!struct_v || struct_compat != 1) 238 goto bad; 239 ceph_decode_32_safe(p, end, struct_len, bad); 240 ceph_decode_need(p, end, struct_len, bad); 241 end = *p + struct_len; 242 } 243 244 ceph_decode_need(p, end, sizeof(**dirfrag), bad); 245 *dirfrag = *p; 246 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist); 247 if (unlikely(*p > end)) 248 goto bad; 249 if (features == (u64)-1) 250 *p = end; 251 return 0; 252 bad: 253 return -EIO; 254 } 255 256 static int parse_reply_info_lease(void **p, void *end, 257 struct ceph_mds_reply_lease **lease, 258 u64 features) 259 { 260 if (features == (u64)-1) { 261 u8 struct_v, struct_compat; 262 u32 struct_len; 263 ceph_decode_8_safe(p, end, struct_v, bad); 264 ceph_decode_8_safe(p, end, struct_compat, bad); 265 /* struct_v is expected to be >= 1. we only understand 266 * encoding whose struct_compat == 1. */ 267 if (!struct_v || struct_compat != 1) 268 goto bad; 269 ceph_decode_32_safe(p, end, struct_len, bad); 270 ceph_decode_need(p, end, struct_len, bad); 271 end = *p + struct_len; 272 } 273 274 ceph_decode_need(p, end, sizeof(**lease), bad); 275 *lease = *p; 276 *p += sizeof(**lease); 277 if (features == (u64)-1) 278 *p = end; 279 return 0; 280 bad: 281 return -EIO; 282 } 283 284 /* 285 * parse a normal reply, which may contain a (dir+)dentry and/or a 286 * target inode. 287 */ 288 static int parse_reply_info_trace(void **p, void *end, 289 struct ceph_mds_reply_info_parsed *info, 290 u64 features) 291 { 292 int err; 293 294 if (info->head->is_dentry) { 295 err = parse_reply_info_in(p, end, &info->diri, features); 296 if (err < 0) 297 goto out_bad; 298 299 err = parse_reply_info_dir(p, end, &info->dirfrag, features); 300 if (err < 0) 301 goto out_bad; 302 303 ceph_decode_32_safe(p, end, info->dname_len, bad); 304 ceph_decode_need(p, end, info->dname_len, bad); 305 info->dname = *p; 306 *p += info->dname_len; 307 308 err = parse_reply_info_lease(p, end, &info->dlease, features); 309 if (err < 0) 310 goto out_bad; 311 } 312 313 if (info->head->is_target) { 314 err = parse_reply_info_in(p, end, &info->targeti, features); 315 if (err < 0) 316 goto out_bad; 317 } 318 319 if (unlikely(*p != end)) 320 goto bad; 321 return 0; 322 323 bad: 324 err = -EIO; 325 out_bad: 326 pr_err("problem parsing mds trace %d\n", err); 327 return err; 328 } 329 330 /* 331 * parse readdir results 332 */ 333 static int parse_reply_info_readdir(void **p, void *end, 334 struct ceph_mds_reply_info_parsed *info, 335 u64 features) 336 { 337 u32 num, i = 0; 338 int err; 339 340 err = parse_reply_info_dir(p, end, &info->dir_dir, features); 341 if (err < 0) 342 goto out_bad; 343 344 ceph_decode_need(p, end, sizeof(num) + 2, bad); 345 num = ceph_decode_32(p); 346 { 347 u16 flags = ceph_decode_16(p); 348 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); 349 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); 350 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); 351 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH); 352 } 353 if (num == 0) 354 goto done; 355 356 BUG_ON(!info->dir_entries); 357 if ((unsigned long)(info->dir_entries + num) > 358 (unsigned long)info->dir_entries + info->dir_buf_size) { 359 pr_err("dir contents are larger than expected\n"); 360 WARN_ON(1); 361 goto bad; 362 } 363 
364 info->dir_nr = num; 365 while (num) { 366 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 367 /* dentry */ 368 ceph_decode_32_safe(p, end, rde->name_len, bad); 369 ceph_decode_need(p, end, rde->name_len, bad); 370 rde->name = *p; 371 *p += rde->name_len; 372 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name); 373 374 /* dentry lease */ 375 err = parse_reply_info_lease(p, end, &rde->lease, features); 376 if (err) 377 goto out_bad; 378 /* inode */ 379 err = parse_reply_info_in(p, end, &rde->inode, features); 380 if (err < 0) 381 goto out_bad; 382 /* ceph_readdir_prepopulate() will update it */ 383 rde->offset = 0; 384 i++; 385 num--; 386 } 387 388 done: 389 /* Skip over any unrecognized fields */ 390 *p = end; 391 return 0; 392 393 bad: 394 err = -EIO; 395 out_bad: 396 pr_err("problem parsing dir contents %d\n", err); 397 return err; 398 } 399 400 /* 401 * parse fcntl F_GETLK results 402 */ 403 static int parse_reply_info_filelock(void **p, void *end, 404 struct ceph_mds_reply_info_parsed *info, 405 u64 features) 406 { 407 if (*p + sizeof(*info->filelock_reply) > end) 408 goto bad; 409 410 info->filelock_reply = *p; 411 412 /* Skip over any unrecognized fields */ 413 *p = end; 414 return 0; 415 bad: 416 return -EIO; 417 } 418 419 420 #if BITS_PER_LONG == 64 421 422 #define DELEGATED_INO_AVAILABLE xa_mk_value(1) 423 424 static int ceph_parse_deleg_inos(void **p, void *end, 425 struct ceph_mds_session *s) 426 { 427 u32 sets; 428 429 ceph_decode_32_safe(p, end, sets, bad); 430 dout("got %u sets of delegated inodes\n", sets); 431 while (sets--) { 432 u64 start, len, ino; 433 434 ceph_decode_64_safe(p, end, start, bad); 435 ceph_decode_64_safe(p, end, len, bad); 436 while (len--) { 437 int err = xa_insert(&s->s_delegated_inos, ino = start++, 438 DELEGATED_INO_AVAILABLE, 439 GFP_KERNEL); 440 if (!err) { 441 dout("added delegated inode 0x%llx\n", 442 start - 1); 443 } else if (err == -EBUSY) { 444 pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n", 445 start - 1); 446 } else { 447 return err; 448 } 449 } 450 } 451 return 0; 452 bad: 453 return -EIO; 454 } 455 456 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 457 { 458 unsigned long ino; 459 void *val; 460 461 xa_for_each(&s->s_delegated_inos, ino, val) { 462 val = xa_erase(&s->s_delegated_inos, ino); 463 if (val == DELEGATED_INO_AVAILABLE) 464 return ino; 465 } 466 return 0; 467 } 468 469 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 470 { 471 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE, 472 GFP_KERNEL); 473 } 474 #else /* BITS_PER_LONG == 64 */ 475 /* 476 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just 477 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top 478 * and bottom words? 
479 */ 480 static int ceph_parse_deleg_inos(void **p, void *end, 481 struct ceph_mds_session *s) 482 { 483 u32 sets; 484 485 ceph_decode_32_safe(p, end, sets, bad); 486 if (sets) 487 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad); 488 return 0; 489 bad: 490 return -EIO; 491 } 492 493 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 494 { 495 return 0; 496 } 497 498 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 499 { 500 return 0; 501 } 502 #endif /* BITS_PER_LONG == 64 */ 503 504 /* 505 * parse create results 506 */ 507 static int parse_reply_info_create(void **p, void *end, 508 struct ceph_mds_reply_info_parsed *info, 509 u64 features, struct ceph_mds_session *s) 510 { 511 int ret; 512 513 if (features == (u64)-1 || 514 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) { 515 if (*p == end) { 516 /* Malformed reply? */ 517 info->has_create_ino = false; 518 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) { 519 info->has_create_ino = true; 520 /* struct_v, struct_compat, and len */ 521 ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad); 522 ceph_decode_64_safe(p, end, info->ino, bad); 523 ret = ceph_parse_deleg_inos(p, end, s); 524 if (ret) 525 return ret; 526 } else { 527 /* legacy */ 528 ceph_decode_64_safe(p, end, info->ino, bad); 529 info->has_create_ino = true; 530 } 531 } else { 532 if (*p != end) 533 goto bad; 534 } 535 536 /* Skip over any unrecognized fields */ 537 *p = end; 538 return 0; 539 bad: 540 return -EIO; 541 } 542 543 /* 544 * parse extra results 545 */ 546 static int parse_reply_info_extra(void **p, void *end, 547 struct ceph_mds_reply_info_parsed *info, 548 u64 features, struct ceph_mds_session *s) 549 { 550 u32 op = le32_to_cpu(info->head->op); 551 552 if (op == CEPH_MDS_OP_GETFILELOCK) 553 return parse_reply_info_filelock(p, end, info, features); 554 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) 555 return parse_reply_info_readdir(p, end, info, features); 556 else if (op == CEPH_MDS_OP_CREATE) 557 return parse_reply_info_create(p, end, info, features, s); 558 else 559 return -EIO; 560 } 561 562 /* 563 * parse entire mds reply 564 */ 565 static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, 566 struct ceph_mds_reply_info_parsed *info, 567 u64 features) 568 { 569 void *p, *end; 570 u32 len; 571 int err; 572 573 info->head = msg->front.iov_base; 574 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 575 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 576 577 /* trace */ 578 ceph_decode_32_safe(&p, end, len, bad); 579 if (len > 0) { 580 ceph_decode_need(&p, end, len, bad); 581 err = parse_reply_info_trace(&p, p+len, info, features); 582 if (err < 0) 583 goto out_bad; 584 } 585 586 /* extra */ 587 ceph_decode_32_safe(&p, end, len, bad); 588 if (len > 0) { 589 ceph_decode_need(&p, end, len, bad); 590 err = parse_reply_info_extra(&p, p+len, info, features, s); 591 if (err < 0) 592 goto out_bad; 593 } 594 595 /* snap blob */ 596 ceph_decode_32_safe(&p, end, len, bad); 597 info->snapblob_len = len; 598 info->snapblob = p; 599 p += len; 600 601 if (p != end) 602 goto bad; 603 return 0; 604 605 bad: 606 err = -EIO; 607 out_bad: 608 pr_err("mds parse_reply err %d\n", err); 609 return err; 610 } 611 612 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 613 { 614 if (!info->dir_entries) 615 return; 616 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size)); 617 } 618 619 620 /* 621 * sessions 622 */ 623 const char 
*ceph_session_state_name(int s) 624 { 625 switch (s) { 626 case CEPH_MDS_SESSION_NEW: return "new"; 627 case CEPH_MDS_SESSION_OPENING: return "opening"; 628 case CEPH_MDS_SESSION_OPEN: return "open"; 629 case CEPH_MDS_SESSION_HUNG: return "hung"; 630 case CEPH_MDS_SESSION_CLOSING: return "closing"; 631 case CEPH_MDS_SESSION_CLOSED: return "closed"; 632 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 633 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 634 case CEPH_MDS_SESSION_REJECTED: return "rejected"; 635 default: return "???"; 636 } 637 } 638 639 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s) 640 { 641 if (refcount_inc_not_zero(&s->s_ref)) { 642 dout("mdsc get_session %p %d -> %d\n", s, 643 refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref)); 644 return s; 645 } else { 646 dout("mdsc get_session %p 0 -- FAIL\n", s); 647 return NULL; 648 } 649 } 650 651 void ceph_put_mds_session(struct ceph_mds_session *s) 652 { 653 dout("mdsc put_session %p %d -> %d\n", s, 654 refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1); 655 if (refcount_dec_and_test(&s->s_ref)) { 656 if (s->s_auth.authorizer) 657 ceph_auth_destroy_authorizer(s->s_auth.authorizer); 658 WARN_ON(mutex_is_locked(&s->s_mutex)); 659 xa_destroy(&s->s_delegated_inos); 660 kfree(s); 661 } 662 } 663 664 /* 665 * called under mdsc->mutex 666 */ 667 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 668 int mds) 669 { 670 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 671 return NULL; 672 return ceph_get_mds_session(mdsc->sessions[mds]); 673 } 674 675 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 676 { 677 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 678 return false; 679 else 680 return true; 681 } 682 683 static int __verify_registered_session(struct ceph_mds_client *mdsc, 684 struct ceph_mds_session *s) 685 { 686 if (s->s_mds >= mdsc->max_sessions || 687 mdsc->sessions[s->s_mds] != s) 688 return -ENOENT; 689 return 0; 690 } 691 692 /* 693 * create+register a new session for given mds. 694 * called under mdsc->mutex. 
695 */ 696 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 697 int mds) 698 { 699 struct ceph_mds_session *s; 700 701 if (mds >= mdsc->mdsmap->possible_max_rank) 702 return ERR_PTR(-EINVAL); 703 704 s = kzalloc(sizeof(*s), GFP_NOFS); 705 if (!s) 706 return ERR_PTR(-ENOMEM); 707 708 if (mds >= mdsc->max_sessions) { 709 int newmax = 1 << get_count_order(mds + 1); 710 struct ceph_mds_session **sa; 711 712 dout("%s: realloc to %d\n", __func__, newmax); 713 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); 714 if (!sa) 715 goto fail_realloc; 716 if (mdsc->sessions) { 717 memcpy(sa, mdsc->sessions, 718 mdsc->max_sessions * sizeof(void *)); 719 kfree(mdsc->sessions); 720 } 721 mdsc->sessions = sa; 722 mdsc->max_sessions = newmax; 723 } 724 725 dout("%s: mds%d\n", __func__, mds); 726 s->s_mdsc = mdsc; 727 s->s_mds = mds; 728 s->s_state = CEPH_MDS_SESSION_NEW; 729 s->s_ttl = 0; 730 s->s_seq = 0; 731 mutex_init(&s->s_mutex); 732 733 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); 734 735 spin_lock_init(&s->s_gen_ttl_lock); 736 s->s_cap_gen = 1; 737 s->s_cap_ttl = jiffies - 1; 738 739 spin_lock_init(&s->s_cap_lock); 740 s->s_renew_requested = 0; 741 s->s_renew_seq = 0; 742 INIT_LIST_HEAD(&s->s_caps); 743 s->s_nr_caps = 0; 744 refcount_set(&s->s_ref, 1); 745 INIT_LIST_HEAD(&s->s_waiting); 746 INIT_LIST_HEAD(&s->s_unsafe); 747 xa_init(&s->s_delegated_inos); 748 s->s_num_cap_releases = 0; 749 s->s_cap_reconnect = 0; 750 s->s_cap_iterator = NULL; 751 INIT_LIST_HEAD(&s->s_cap_releases); 752 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work); 753 754 INIT_LIST_HEAD(&s->s_cap_dirty); 755 INIT_LIST_HEAD(&s->s_cap_flushing); 756 757 mdsc->sessions[mds] = s; 758 atomic_inc(&mdsc->num_sessions); 759 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 760 761 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 762 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 763 764 return s; 765 766 fail_realloc: 767 kfree(s); 768 return ERR_PTR(-ENOMEM); 769 } 770 771 /* 772 * called under mdsc->mutex 773 */ 774 static void __unregister_session(struct ceph_mds_client *mdsc, 775 struct ceph_mds_session *s) 776 { 777 dout("__unregister_session mds%d %p\n", s->s_mds, s); 778 BUG_ON(mdsc->sessions[s->s_mds] != s); 779 mdsc->sessions[s->s_mds] = NULL; 780 ceph_con_close(&s->s_con); 781 ceph_put_mds_session(s); 782 atomic_dec(&mdsc->num_sessions); 783 } 784 785 /* 786 * drop session refs in request. 
787 * 788 * should be last request ref, or hold mdsc->mutex 789 */ 790 static void put_request_session(struct ceph_mds_request *req) 791 { 792 if (req->r_session) { 793 ceph_put_mds_session(req->r_session); 794 req->r_session = NULL; 795 } 796 } 797 798 void ceph_mdsc_release_request(struct kref *kref) 799 { 800 struct ceph_mds_request *req = container_of(kref, 801 struct ceph_mds_request, 802 r_kref); 803 ceph_mdsc_release_dir_caps_no_check(req); 804 destroy_reply_info(&req->r_reply_info); 805 if (req->r_request) 806 ceph_msg_put(req->r_request); 807 if (req->r_reply) 808 ceph_msg_put(req->r_reply); 809 if (req->r_inode) { 810 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 811 /* avoid calling iput_final() in mds dispatch threads */ 812 ceph_async_iput(req->r_inode); 813 } 814 if (req->r_parent) { 815 ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN); 816 ceph_async_iput(req->r_parent); 817 } 818 ceph_async_iput(req->r_target_inode); 819 if (req->r_dentry) 820 dput(req->r_dentry); 821 if (req->r_old_dentry) 822 dput(req->r_old_dentry); 823 if (req->r_old_dentry_dir) { 824 /* 825 * track (and drop pins for) r_old_dentry_dir 826 * separately, since r_old_dentry's d_parent may have 827 * changed between the dir mutex being dropped and 828 * this request being freed. 829 */ 830 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), 831 CEPH_CAP_PIN); 832 ceph_async_iput(req->r_old_dentry_dir); 833 } 834 kfree(req->r_path1); 835 kfree(req->r_path2); 836 put_cred(req->r_cred); 837 if (req->r_pagelist) 838 ceph_pagelist_release(req->r_pagelist); 839 put_request_session(req); 840 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); 841 WARN_ON_ONCE(!list_empty(&req->r_wait)); 842 kmem_cache_free(ceph_mds_request_cachep, req); 843 } 844 845 DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node) 846 847 /* 848 * lookup session, bump ref if found. 849 * 850 * called under mdsc->mutex. 851 */ 852 static struct ceph_mds_request * 853 lookup_get_request(struct ceph_mds_client *mdsc, u64 tid) 854 { 855 struct ceph_mds_request *req; 856 857 req = lookup_request(&mdsc->request_tree, tid); 858 if (req) 859 ceph_mdsc_get_request(req); 860 861 return req; 862 } 863 864 /* 865 * Register an in-flight request, and assign a tid. Link to directory 866 * are modifying (if any). 867 * 868 * Called under mdsc->mutex. 
869 */ 870 static void __register_request(struct ceph_mds_client *mdsc, 871 struct ceph_mds_request *req, 872 struct inode *dir) 873 { 874 int ret = 0; 875 876 req->r_tid = ++mdsc->last_tid; 877 if (req->r_num_caps) { 878 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, 879 req->r_num_caps); 880 if (ret < 0) { 881 pr_err("__register_request %p " 882 "failed to reserve caps: %d\n", req, ret); 883 /* set req->r_err to fail early from __do_request */ 884 req->r_err = ret; 885 return; 886 } 887 } 888 dout("__register_request %p tid %lld\n", req, req->r_tid); 889 ceph_mdsc_get_request(req); 890 insert_request(&mdsc->request_tree, req); 891 892 req->r_cred = get_current_cred(); 893 894 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) 895 mdsc->oldest_tid = req->r_tid; 896 897 if (dir) { 898 struct ceph_inode_info *ci = ceph_inode(dir); 899 900 ihold(dir); 901 req->r_unsafe_dir = dir; 902 spin_lock(&ci->i_unsafe_lock); 903 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 904 spin_unlock(&ci->i_unsafe_lock); 905 } 906 } 907 908 static void __unregister_request(struct ceph_mds_client *mdsc, 909 struct ceph_mds_request *req) 910 { 911 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 912 913 /* Never leave an unregistered request on an unsafe list! */ 914 list_del_init(&req->r_unsafe_item); 915 916 if (req->r_tid == mdsc->oldest_tid) { 917 struct rb_node *p = rb_next(&req->r_node); 918 mdsc->oldest_tid = 0; 919 while (p) { 920 struct ceph_mds_request *next_req = 921 rb_entry(p, struct ceph_mds_request, r_node); 922 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 923 mdsc->oldest_tid = next_req->r_tid; 924 break; 925 } 926 p = rb_next(p); 927 } 928 } 929 930 erase_request(&mdsc->request_tree, req); 931 932 if (req->r_unsafe_dir) { 933 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 934 spin_lock(&ci->i_unsafe_lock); 935 list_del_init(&req->r_unsafe_dir_item); 936 spin_unlock(&ci->i_unsafe_lock); 937 } 938 if (req->r_target_inode && 939 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 940 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 941 spin_lock(&ci->i_unsafe_lock); 942 list_del_init(&req->r_unsafe_target_item); 943 spin_unlock(&ci->i_unsafe_lock); 944 } 945 946 if (req->r_unsafe_dir) { 947 /* avoid calling iput_final() in mds dispatch threads */ 948 ceph_async_iput(req->r_unsafe_dir); 949 req->r_unsafe_dir = NULL; 950 } 951 952 complete_all(&req->r_safe_completion); 953 954 ceph_mdsc_put_request(req); 955 } 956 957 /* 958 * Walk back up the dentry tree until we hit a dentry representing a 959 * non-snapshot inode. We do this using the rcu_read_lock (which must be held 960 * when calling this) to ensure that the objects won't disappear while we're 961 * working with them. Once we hit a candidate dentry, we attempt to take a 962 * reference to it, and return that as the result. 963 */ 964 static struct inode *get_nonsnap_parent(struct dentry *dentry) 965 { 966 struct inode *inode = NULL; 967 968 while (dentry && !IS_ROOT(dentry)) { 969 inode = d_inode_rcu(dentry); 970 if (!inode || ceph_snap(inode) == CEPH_NOSNAP) 971 break; 972 dentry = dentry->d_parent; 973 } 974 if (inode) 975 inode = igrab(inode); 976 return inode; 977 } 978 979 /* 980 * Choose mds to send request to next. If there is a hint set in the 981 * request (e.g., due to a prior forward hint from the mds), use that. 982 * Otherwise, consult frag tree and/or caps to identify the 983 * appropriate mds. If all else fails, choose randomly. 
984 * 985 * Called under mdsc->mutex. 986 */ 987 static int __choose_mds(struct ceph_mds_client *mdsc, 988 struct ceph_mds_request *req, 989 bool *random) 990 { 991 struct inode *inode; 992 struct ceph_inode_info *ci; 993 struct ceph_cap *cap; 994 int mode = req->r_direct_mode; 995 int mds = -1; 996 u32 hash = req->r_direct_hash; 997 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); 998 999 if (random) 1000 *random = false; 1001 1002 /* 1003 * is there a specific mds we should try? ignore hint if we have 1004 * no session and the mds is not up (active or recovering). 1005 */ 1006 if (req->r_resend_mds >= 0 && 1007 (__have_session(mdsc, req->r_resend_mds) || 1008 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 1009 dout("%s using resend_mds mds%d\n", __func__, 1010 req->r_resend_mds); 1011 return req->r_resend_mds; 1012 } 1013 1014 if (mode == USE_RANDOM_MDS) 1015 goto random; 1016 1017 inode = NULL; 1018 if (req->r_inode) { 1019 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) { 1020 inode = req->r_inode; 1021 ihold(inode); 1022 } else { 1023 /* req->r_dentry is non-null for LSSNAP request */ 1024 rcu_read_lock(); 1025 inode = get_nonsnap_parent(req->r_dentry); 1026 rcu_read_unlock(); 1027 dout("%s using snapdir's parent %p\n", __func__, inode); 1028 } 1029 } else if (req->r_dentry) { 1030 /* ignore race with rename; old or new d_parent is okay */ 1031 struct dentry *parent; 1032 struct inode *dir; 1033 1034 rcu_read_lock(); 1035 parent = READ_ONCE(req->r_dentry->d_parent); 1036 dir = req->r_parent ? : d_inode_rcu(parent); 1037 1038 if (!dir || dir->i_sb != mdsc->fsc->sb) { 1039 /* not this fs or parent went negative */ 1040 inode = d_inode(req->r_dentry); 1041 if (inode) 1042 ihold(inode); 1043 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 1044 /* direct snapped/virtual snapdir requests 1045 * based on parent dir inode */ 1046 inode = get_nonsnap_parent(parent); 1047 dout("%s using nonsnap parent %p\n", __func__, inode); 1048 } else { 1049 /* dentry target */ 1050 inode = d_inode(req->r_dentry); 1051 if (!inode || mode == USE_AUTH_MDS) { 1052 /* dir + name */ 1053 inode = igrab(dir); 1054 hash = ceph_dentry_hash(dir, req->r_dentry); 1055 is_hash = true; 1056 } else { 1057 ihold(inode); 1058 } 1059 } 1060 rcu_read_unlock(); 1061 } 1062 1063 dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash, 1064 hash, mode); 1065 if (!inode) 1066 goto random; 1067 ci = ceph_inode(inode); 1068 1069 if (is_hash && S_ISDIR(inode->i_mode)) { 1070 struct ceph_inode_frag frag; 1071 int found; 1072 1073 ceph_choose_frag(ci, hash, &frag, &found); 1074 if (found) { 1075 if (mode == USE_ANY_MDS && frag.ndist > 0) { 1076 u8 r; 1077 1078 /* choose a random replica */ 1079 get_random_bytes(&r, 1); 1080 r %= frag.ndist; 1081 mds = frag.dist[r]; 1082 dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n", 1083 __func__, inode, ceph_vinop(inode), 1084 frag.frag, mds, (int)r, frag.ndist); 1085 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1086 CEPH_MDS_STATE_ACTIVE && 1087 !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds)) 1088 goto out; 1089 } 1090 1091 /* since this file/dir wasn't known to be 1092 * replicated, then we want to look for the 1093 * authoritative mds. 
*/ 1094 if (frag.mds >= 0) { 1095 /* choose auth mds */ 1096 mds = frag.mds; 1097 dout("%s %p %llx.%llx frag %u mds%d (auth)\n", 1098 __func__, inode, ceph_vinop(inode), 1099 frag.frag, mds); 1100 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1101 CEPH_MDS_STATE_ACTIVE) { 1102 if (!ceph_mdsmap_is_laggy(mdsc->mdsmap, 1103 mds)) 1104 goto out; 1105 } 1106 } 1107 mode = USE_AUTH_MDS; 1108 } 1109 } 1110 1111 spin_lock(&ci->i_ceph_lock); 1112 cap = NULL; 1113 if (mode == USE_AUTH_MDS) 1114 cap = ci->i_auth_cap; 1115 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 1116 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 1117 if (!cap) { 1118 spin_unlock(&ci->i_ceph_lock); 1119 ceph_async_iput(inode); 1120 goto random; 1121 } 1122 mds = cap->session->s_mds; 1123 dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__, 1124 inode, ceph_vinop(inode), mds, 1125 cap == ci->i_auth_cap ? "auth " : "", cap); 1126 spin_unlock(&ci->i_ceph_lock); 1127 out: 1128 /* avoid calling iput_final() while holding mdsc->mutex or 1129 * in mds dispatch threads */ 1130 ceph_async_iput(inode); 1131 return mds; 1132 1133 random: 1134 if (random) 1135 *random = true; 1136 1137 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 1138 dout("%s chose random mds%d\n", __func__, mds); 1139 return mds; 1140 } 1141 1142 1143 /* 1144 * session messages 1145 */ 1146 static struct ceph_msg *create_session_msg(u32 op, u64 seq) 1147 { 1148 struct ceph_msg *msg; 1149 struct ceph_mds_session_head *h; 1150 1151 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 1152 false); 1153 if (!msg) { 1154 pr_err("create_session_msg ENOMEM creating msg\n"); 1155 return NULL; 1156 } 1157 h = msg->front.iov_base; 1158 h->op = cpu_to_le32(op); 1159 h->seq = cpu_to_le64(seq); 1160 1161 return msg; 1162 } 1163 1164 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED; 1165 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8) 1166 static int encode_supported_features(void **p, void *end) 1167 { 1168 static const size_t count = ARRAY_SIZE(feature_bits); 1169 1170 if (count > 0) { 1171 size_t i; 1172 size_t size = FEATURE_BYTES(count); 1173 1174 if (WARN_ON_ONCE(*p + 4 + size > end)) 1175 return -ERANGE; 1176 1177 ceph_encode_32(p, size); 1178 memset(*p, 0, size); 1179 for (i = 0; i < count; i++) 1180 ((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8); 1181 *p += size; 1182 } else { 1183 if (WARN_ON_ONCE(*p + 4 > end)) 1184 return -ERANGE; 1185 1186 ceph_encode_32(p, 0); 1187 } 1188 1189 return 0; 1190 } 1191 1192 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED; 1193 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8) 1194 static int encode_metric_spec(void **p, void *end) 1195 { 1196 static const size_t count = ARRAY_SIZE(metric_bits); 1197 1198 /* header */ 1199 if (WARN_ON_ONCE(*p + 2 > end)) 1200 return -ERANGE; 1201 1202 ceph_encode_8(p, 1); /* version */ 1203 ceph_encode_8(p, 1); /* compat */ 1204 1205 if (count > 0) { 1206 size_t i; 1207 size_t size = METRIC_BYTES(count); 1208 1209 if (WARN_ON_ONCE(*p + 4 + 4 + size > end)) 1210 return -ERANGE; 1211 1212 /* metric spec info length */ 1213 ceph_encode_32(p, 4 + size); 1214 1215 /* metric spec */ 1216 ceph_encode_32(p, size); 1217 memset(*p, 0, size); 1218 for (i = 0; i < count; i++) 1219 ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8); 1220 *p += size; 1221 } else { 1222 if (WARN_ON_ONCE(*p + 4 + 4 > end)) 1223 return -ERANGE; 1224 1225 /* metric spec 
info length */ 1226 ceph_encode_32(p, 4); 1227 /* metric spec */ 1228 ceph_encode_32(p, 0); 1229 } 1230 1231 return 0; 1232 } 1233 1234 /* 1235 * session message, specialization for CEPH_SESSION_REQUEST_OPEN 1236 * to include additional client metadata fields. 1237 */ 1238 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq) 1239 { 1240 struct ceph_msg *msg; 1241 struct ceph_mds_session_head *h; 1242 int i; 1243 int extra_bytes = 0; 1244 int metadata_key_count = 0; 1245 struct ceph_options *opt = mdsc->fsc->client->options; 1246 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; 1247 size_t size, count; 1248 void *p, *end; 1249 int ret; 1250 1251 const char* metadata[][2] = { 1252 {"hostname", mdsc->nodename}, 1253 {"kernel_version", init_utsname()->release}, 1254 {"entity_id", opt->name ? : ""}, 1255 {"root", fsopt->server_path ? : "/"}, 1256 {NULL, NULL} 1257 }; 1258 1259 /* Calculate serialized length of metadata */ 1260 extra_bytes = 4; /* map length */ 1261 for (i = 0; metadata[i][0]; ++i) { 1262 extra_bytes += 8 + strlen(metadata[i][0]) + 1263 strlen(metadata[i][1]); 1264 metadata_key_count++; 1265 } 1266 1267 /* supported feature */ 1268 size = 0; 1269 count = ARRAY_SIZE(feature_bits); 1270 if (count > 0) 1271 size = FEATURE_BYTES(count); 1272 extra_bytes += 4 + size; 1273 1274 /* metric spec */ 1275 size = 0; 1276 count = ARRAY_SIZE(metric_bits); 1277 if (count > 0) 1278 size = METRIC_BYTES(count); 1279 extra_bytes += 2 + 4 + 4 + size; 1280 1281 /* Allocate the message */ 1282 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, 1283 GFP_NOFS, false); 1284 if (!msg) { 1285 pr_err("create_session_msg ENOMEM creating msg\n"); 1286 return ERR_PTR(-ENOMEM); 1287 } 1288 p = msg->front.iov_base; 1289 end = p + msg->front.iov_len; 1290 1291 h = p; 1292 h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN); 1293 h->seq = cpu_to_le64(seq); 1294 1295 /* 1296 * Serialize client metadata into waiting buffer space, using 1297 * the format that userspace expects for map<string, string> 1298 * 1299 * ClientSession messages with metadata are v4 1300 */ 1301 msg->hdr.version = cpu_to_le16(4); 1302 msg->hdr.compat_version = cpu_to_le16(1); 1303 1304 /* The write pointer, following the session_head structure */ 1305 p += sizeof(*h); 1306 1307 /* Number of entries in the map */ 1308 ceph_encode_32(&p, metadata_key_count); 1309 1310 /* Two length-prefixed strings for each entry in the map */ 1311 for (i = 0; metadata[i][0]; ++i) { 1312 size_t const key_len = strlen(metadata[i][0]); 1313 size_t const val_len = strlen(metadata[i][1]); 1314 1315 ceph_encode_32(&p, key_len); 1316 memcpy(p, metadata[i][0], key_len); 1317 p += key_len; 1318 ceph_encode_32(&p, val_len); 1319 memcpy(p, metadata[i][1], val_len); 1320 p += val_len; 1321 } 1322 1323 ret = encode_supported_features(&p, end); 1324 if (ret) { 1325 pr_err("encode_supported_features failed!\n"); 1326 ceph_msg_put(msg); 1327 return ERR_PTR(ret); 1328 } 1329 1330 ret = encode_metric_spec(&p, end); 1331 if (ret) { 1332 pr_err("encode_metric_spec failed!\n"); 1333 ceph_msg_put(msg); 1334 return ERR_PTR(ret); 1335 } 1336 1337 msg->front.iov_len = p - msg->front.iov_base; 1338 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1339 1340 return msg; 1341 } 1342 1343 /* 1344 * send session open request. 
1345 * 1346 * called under mdsc->mutex 1347 */ 1348 static int __open_session(struct ceph_mds_client *mdsc, 1349 struct ceph_mds_session *session) 1350 { 1351 struct ceph_msg *msg; 1352 int mstate; 1353 int mds = session->s_mds; 1354 1355 /* wait for mds to go active? */ 1356 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 1357 dout("open_session to mds%d (%s)\n", mds, 1358 ceph_mds_state_name(mstate)); 1359 session->s_state = CEPH_MDS_SESSION_OPENING; 1360 session->s_renew_requested = jiffies; 1361 1362 /* send connect message */ 1363 msg = create_session_open_msg(mdsc, session->s_seq); 1364 if (IS_ERR(msg)) 1365 return PTR_ERR(msg); 1366 ceph_con_send(&session->s_con, msg); 1367 return 0; 1368 } 1369 1370 /* 1371 * open sessions for any export targets for the given mds 1372 * 1373 * called under mdsc->mutex 1374 */ 1375 static struct ceph_mds_session * 1376 __open_export_target_session(struct ceph_mds_client *mdsc, int target) 1377 { 1378 struct ceph_mds_session *session; 1379 int ret; 1380 1381 session = __ceph_lookup_mds_session(mdsc, target); 1382 if (!session) { 1383 session = register_session(mdsc, target); 1384 if (IS_ERR(session)) 1385 return session; 1386 } 1387 if (session->s_state == CEPH_MDS_SESSION_NEW || 1388 session->s_state == CEPH_MDS_SESSION_CLOSING) { 1389 ret = __open_session(mdsc, session); 1390 if (ret) 1391 return ERR_PTR(ret); 1392 } 1393 1394 return session; 1395 } 1396 1397 struct ceph_mds_session * 1398 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) 1399 { 1400 struct ceph_mds_session *session; 1401 1402 dout("open_export_target_session to mds%d\n", target); 1403 1404 mutex_lock(&mdsc->mutex); 1405 session = __open_export_target_session(mdsc, target); 1406 mutex_unlock(&mdsc->mutex); 1407 1408 return session; 1409 } 1410 1411 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 1412 struct ceph_mds_session *session) 1413 { 1414 struct ceph_mds_info *mi; 1415 struct ceph_mds_session *ts; 1416 int i, mds = session->s_mds; 1417 1418 if (mds >= mdsc->mdsmap->possible_max_rank) 1419 return; 1420 1421 mi = &mdsc->mdsmap->m_info[mds]; 1422 dout("open_export_target_sessions for mds%d (%d targets)\n", 1423 session->s_mds, mi->num_export_targets); 1424 1425 for (i = 0; i < mi->num_export_targets; i++) { 1426 ts = __open_export_target_session(mdsc, mi->export_targets[i]); 1427 if (!IS_ERR(ts)) 1428 ceph_put_mds_session(ts); 1429 } 1430 } 1431 1432 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 1433 struct ceph_mds_session *session) 1434 { 1435 mutex_lock(&mdsc->mutex); 1436 __open_export_target_sessions(mdsc, session); 1437 mutex_unlock(&mdsc->mutex); 1438 } 1439 1440 /* 1441 * session caps 1442 */ 1443 1444 static void detach_cap_releases(struct ceph_mds_session *session, 1445 struct list_head *target) 1446 { 1447 lockdep_assert_held(&session->s_cap_lock); 1448 1449 list_splice_init(&session->s_cap_releases, target); 1450 session->s_num_cap_releases = 0; 1451 dout("dispose_cap_releases mds%d\n", session->s_mds); 1452 } 1453 1454 static void dispose_cap_releases(struct ceph_mds_client *mdsc, 1455 struct list_head *dispose) 1456 { 1457 while (!list_empty(dispose)) { 1458 struct ceph_cap *cap; 1459 /* zero out the in-progress message */ 1460 cap = list_first_entry(dispose, struct ceph_cap, session_caps); 1461 list_del(&cap->session_caps); 1462 ceph_put_cap(mdsc, cap); 1463 } 1464 } 1465 1466 static void cleanup_session_requests(struct ceph_mds_client *mdsc, 1467 struct ceph_mds_session *session) 
1468 { 1469 struct ceph_mds_request *req; 1470 struct rb_node *p; 1471 struct ceph_inode_info *ci; 1472 1473 dout("cleanup_session_requests mds%d\n", session->s_mds); 1474 mutex_lock(&mdsc->mutex); 1475 while (!list_empty(&session->s_unsafe)) { 1476 req = list_first_entry(&session->s_unsafe, 1477 struct ceph_mds_request, r_unsafe_item); 1478 pr_warn_ratelimited(" dropping unsafe request %llu\n", 1479 req->r_tid); 1480 if (req->r_target_inode) { 1481 /* dropping unsafe change of inode's attributes */ 1482 ci = ceph_inode(req->r_target_inode); 1483 errseq_set(&ci->i_meta_err, -EIO); 1484 } 1485 if (req->r_unsafe_dir) { 1486 /* dropping unsafe directory operation */ 1487 ci = ceph_inode(req->r_unsafe_dir); 1488 errseq_set(&ci->i_meta_err, -EIO); 1489 } 1490 __unregister_request(mdsc, req); 1491 } 1492 /* zero r_attempts, so kick_requests() will re-send requests */ 1493 p = rb_first(&mdsc->request_tree); 1494 while (p) { 1495 req = rb_entry(p, struct ceph_mds_request, r_node); 1496 p = rb_next(p); 1497 if (req->r_session && 1498 req->r_session->s_mds == session->s_mds) 1499 req->r_attempts = 0; 1500 } 1501 mutex_unlock(&mdsc->mutex); 1502 } 1503 1504 /* 1505 * Helper to safely iterate over all caps associated with a session, with 1506 * special care taken to handle a racing __ceph_remove_cap(). 1507 * 1508 * Caller must hold session s_mutex. 1509 */ 1510 int ceph_iterate_session_caps(struct ceph_mds_session *session, 1511 int (*cb)(struct inode *, struct ceph_cap *, 1512 void *), void *arg) 1513 { 1514 struct list_head *p; 1515 struct ceph_cap *cap; 1516 struct inode *inode, *last_inode = NULL; 1517 struct ceph_cap *old_cap = NULL; 1518 int ret; 1519 1520 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 1521 spin_lock(&session->s_cap_lock); 1522 p = session->s_caps.next; 1523 while (p != &session->s_caps) { 1524 cap = list_entry(p, struct ceph_cap, session_caps); 1525 inode = igrab(&cap->ci->vfs_inode); 1526 if (!inode) { 1527 p = p->next; 1528 continue; 1529 } 1530 session->s_cap_iterator = cap; 1531 spin_unlock(&session->s_cap_lock); 1532 1533 if (last_inode) { 1534 /* avoid calling iput_final() while holding 1535 * s_mutex or in mds dispatch threads */ 1536 ceph_async_iput(last_inode); 1537 last_inode = NULL; 1538 } 1539 if (old_cap) { 1540 ceph_put_cap(session->s_mdsc, old_cap); 1541 old_cap = NULL; 1542 } 1543 1544 ret = cb(inode, cap, arg); 1545 last_inode = inode; 1546 1547 spin_lock(&session->s_cap_lock); 1548 p = p->next; 1549 if (!cap->ci) { 1550 dout("iterate_session_caps finishing cap %p removal\n", 1551 cap); 1552 BUG_ON(cap->session != session); 1553 cap->session = NULL; 1554 list_del_init(&cap->session_caps); 1555 session->s_nr_caps--; 1556 atomic64_dec(&session->s_mdsc->metric.total_caps); 1557 if (cap->queue_release) 1558 __ceph_queue_cap_release(session, cap); 1559 else 1560 old_cap = cap; /* put_cap it w/o locks held */ 1561 } 1562 if (ret < 0) 1563 goto out; 1564 } 1565 ret = 0; 1566 out: 1567 session->s_cap_iterator = NULL; 1568 spin_unlock(&session->s_cap_lock); 1569 1570 ceph_async_iput(last_inode); 1571 if (old_cap) 1572 ceph_put_cap(session->s_mdsc, old_cap); 1573 1574 return ret; 1575 } 1576 1577 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 1578 void *arg) 1579 { 1580 struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg; 1581 struct ceph_inode_info *ci = ceph_inode(inode); 1582 LIST_HEAD(to_remove); 1583 bool dirty_dropped = false; 1584 bool invalidate = false; 1585 1586 dout("removing cap %p, ci is %p, inode is 
%p\n", 1587 cap, ci, &ci->vfs_inode); 1588 spin_lock(&ci->i_ceph_lock); 1589 __ceph_remove_cap(cap, false); 1590 if (!ci->i_auth_cap) { 1591 struct ceph_cap_flush *cf; 1592 struct ceph_mds_client *mdsc = fsc->mdsc; 1593 1594 if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) { 1595 if (inode->i_data.nrpages > 0) 1596 invalidate = true; 1597 if (ci->i_wrbuffer_ref > 0) 1598 mapping_set_error(&inode->i_data, -EIO); 1599 } 1600 1601 while (!list_empty(&ci->i_cap_flush_list)) { 1602 cf = list_first_entry(&ci->i_cap_flush_list, 1603 struct ceph_cap_flush, i_list); 1604 list_move(&cf->i_list, &to_remove); 1605 } 1606 1607 spin_lock(&mdsc->cap_dirty_lock); 1608 1609 list_for_each_entry(cf, &to_remove, i_list) 1610 list_del(&cf->g_list); 1611 1612 if (!list_empty(&ci->i_dirty_item)) { 1613 pr_warn_ratelimited( 1614 " dropping dirty %s state for %p %lld\n", 1615 ceph_cap_string(ci->i_dirty_caps), 1616 inode, ceph_ino(inode)); 1617 ci->i_dirty_caps = 0; 1618 list_del_init(&ci->i_dirty_item); 1619 dirty_dropped = true; 1620 } 1621 if (!list_empty(&ci->i_flushing_item)) { 1622 pr_warn_ratelimited( 1623 " dropping dirty+flushing %s state for %p %lld\n", 1624 ceph_cap_string(ci->i_flushing_caps), 1625 inode, ceph_ino(inode)); 1626 ci->i_flushing_caps = 0; 1627 list_del_init(&ci->i_flushing_item); 1628 mdsc->num_cap_flushing--; 1629 dirty_dropped = true; 1630 } 1631 spin_unlock(&mdsc->cap_dirty_lock); 1632 1633 if (dirty_dropped) { 1634 errseq_set(&ci->i_meta_err, -EIO); 1635 1636 if (ci->i_wrbuffer_ref_head == 0 && 1637 ci->i_wr_ref == 0 && 1638 ci->i_dirty_caps == 0 && 1639 ci->i_flushing_caps == 0) { 1640 ceph_put_snap_context(ci->i_head_snapc); 1641 ci->i_head_snapc = NULL; 1642 } 1643 } 1644 1645 if (atomic_read(&ci->i_filelock_ref) > 0) { 1646 /* make further file lock syscall return -EIO */ 1647 ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK; 1648 pr_warn_ratelimited(" dropping file locks for %p %lld\n", 1649 inode, ceph_ino(inode)); 1650 } 1651 1652 if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { 1653 list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove); 1654 ci->i_prealloc_cap_flush = NULL; 1655 } 1656 } 1657 spin_unlock(&ci->i_ceph_lock); 1658 while (!list_empty(&to_remove)) { 1659 struct ceph_cap_flush *cf; 1660 cf = list_first_entry(&to_remove, 1661 struct ceph_cap_flush, i_list); 1662 list_del(&cf->i_list); 1663 ceph_free_cap_flush(cf); 1664 } 1665 1666 wake_up_all(&ci->i_cap_wq); 1667 if (invalidate) 1668 ceph_queue_invalidate(inode); 1669 if (dirty_dropped) 1670 iput(inode); 1671 return 0; 1672 } 1673 1674 /* 1675 * caller must hold session s_mutex 1676 */ 1677 static void remove_session_caps(struct ceph_mds_session *session) 1678 { 1679 struct ceph_fs_client *fsc = session->s_mdsc->fsc; 1680 struct super_block *sb = fsc->sb; 1681 LIST_HEAD(dispose); 1682 1683 dout("remove_session_caps on %p\n", session); 1684 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc); 1685 1686 wake_up_all(&fsc->mdsc->cap_flushing_wq); 1687 1688 spin_lock(&session->s_cap_lock); 1689 if (session->s_nr_caps > 0) { 1690 struct inode *inode; 1691 struct ceph_cap *cap, *prev = NULL; 1692 struct ceph_vino vino; 1693 /* 1694 * iterate_session_caps() skips inodes that are being 1695 * deleted, we need to wait until deletions are complete. 1696 * __wait_on_freeing_inode() is designed for the job, 1697 * but it is not exported, so use lookup inode function 1698 * to access it. 
1699 */ 1700 while (!list_empty(&session->s_caps)) { 1701 cap = list_entry(session->s_caps.next, 1702 struct ceph_cap, session_caps); 1703 if (cap == prev) 1704 break; 1705 prev = cap; 1706 vino = cap->ci->i_vino; 1707 spin_unlock(&session->s_cap_lock); 1708 1709 inode = ceph_find_inode(sb, vino); 1710 /* avoid calling iput_final() while holding s_mutex */ 1711 ceph_async_iput(inode); 1712 1713 spin_lock(&session->s_cap_lock); 1714 } 1715 } 1716 1717 // drop cap expires and unlock s_cap_lock 1718 detach_cap_releases(session, &dispose); 1719 1720 BUG_ON(session->s_nr_caps > 0); 1721 BUG_ON(!list_empty(&session->s_cap_flushing)); 1722 spin_unlock(&session->s_cap_lock); 1723 dispose_cap_releases(session->s_mdsc, &dispose); 1724 } 1725 1726 enum { 1727 RECONNECT, 1728 RENEWCAPS, 1729 FORCE_RO, 1730 }; 1731 1732 /* 1733 * wake up any threads waiting on this session's caps. if the cap is 1734 * old (didn't get renewed on the client reconnect), remove it now. 1735 * 1736 * caller must hold s_mutex. 1737 */ 1738 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, 1739 void *arg) 1740 { 1741 struct ceph_inode_info *ci = ceph_inode(inode); 1742 unsigned long ev = (unsigned long)arg; 1743 1744 if (ev == RECONNECT) { 1745 spin_lock(&ci->i_ceph_lock); 1746 ci->i_wanted_max_size = 0; 1747 ci->i_requested_max_size = 0; 1748 spin_unlock(&ci->i_ceph_lock); 1749 } else if (ev == RENEWCAPS) { 1750 if (cap->cap_gen < cap->session->s_cap_gen) { 1751 /* mds did not re-issue stale cap */ 1752 spin_lock(&ci->i_ceph_lock); 1753 cap->issued = cap->implemented = CEPH_CAP_PIN; 1754 spin_unlock(&ci->i_ceph_lock); 1755 } 1756 } else if (ev == FORCE_RO) { 1757 } 1758 wake_up_all(&ci->i_cap_wq); 1759 return 0; 1760 } 1761 1762 static void wake_up_session_caps(struct ceph_mds_session *session, int ev) 1763 { 1764 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 1765 ceph_iterate_session_caps(session, wake_up_session_cb, 1766 (void *)(unsigned long)ev); 1767 } 1768 1769 /* 1770 * Send periodic message to MDS renewing all currently held caps. The 1771 * ack will reset the expiration for all caps from this session. 1772 * 1773 * caller holds s_mutex 1774 */ 1775 static int send_renew_caps(struct ceph_mds_client *mdsc, 1776 struct ceph_mds_session *session) 1777 { 1778 struct ceph_msg *msg; 1779 int state; 1780 1781 if (time_after_eq(jiffies, session->s_cap_ttl) && 1782 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 1783 pr_info("mds%d caps stale\n", session->s_mds); 1784 session->s_renew_requested = jiffies; 1785 1786 /* do not try to renew caps until a recovering mds has reconnected 1787 * with its clients. 
*/ 1788 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 1789 if (state < CEPH_MDS_STATE_RECONNECT) { 1790 dout("send_renew_caps ignoring mds%d (%s)\n", 1791 session->s_mds, ceph_mds_state_name(state)); 1792 return 0; 1793 } 1794 1795 dout("send_renew_caps to mds%d (%s)\n", session->s_mds, 1796 ceph_mds_state_name(state)); 1797 msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 1798 ++session->s_renew_seq); 1799 if (!msg) 1800 return -ENOMEM; 1801 ceph_con_send(&session->s_con, msg); 1802 return 0; 1803 } 1804 1805 static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 1806 struct ceph_mds_session *session, u64 seq) 1807 { 1808 struct ceph_msg *msg; 1809 1810 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", 1811 session->s_mds, ceph_session_state_name(session->s_state), seq); 1812 msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 1813 if (!msg) 1814 return -ENOMEM; 1815 ceph_con_send(&session->s_con, msg); 1816 return 0; 1817 } 1818 1819 1820 /* 1821 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 1822 * 1823 * Called under session->s_mutex 1824 */ 1825 static void renewed_caps(struct ceph_mds_client *mdsc, 1826 struct ceph_mds_session *session, int is_renew) 1827 { 1828 int was_stale; 1829 int wake = 0; 1830 1831 spin_lock(&session->s_cap_lock); 1832 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 1833 1834 session->s_cap_ttl = session->s_renew_requested + 1835 mdsc->mdsmap->m_session_timeout*HZ; 1836 1837 if (was_stale) { 1838 if (time_before(jiffies, session->s_cap_ttl)) { 1839 pr_info("mds%d caps renewed\n", session->s_mds); 1840 wake = 1; 1841 } else { 1842 pr_info("mds%d caps still stale\n", session->s_mds); 1843 } 1844 } 1845 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n", 1846 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh", 1847 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 1848 spin_unlock(&session->s_cap_lock); 1849 1850 if (wake) 1851 wake_up_session_caps(session, RENEWCAPS); 1852 } 1853 1854 /* 1855 * send a session close request 1856 */ 1857 static int request_close_session(struct ceph_mds_session *session) 1858 { 1859 struct ceph_msg *msg; 1860 1861 dout("request_close_session mds%d state %s seq %lld\n", 1862 session->s_mds, ceph_session_state_name(session->s_state), 1863 session->s_seq); 1864 msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); 1865 if (!msg) 1866 return -ENOMEM; 1867 ceph_con_send(&session->s_con, msg); 1868 return 1; 1869 } 1870 1871 /* 1872 * Called with s_mutex held. 1873 */ 1874 static int __close_session(struct ceph_mds_client *mdsc, 1875 struct ceph_mds_session *session) 1876 { 1877 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 1878 return 0; 1879 session->s_state = CEPH_MDS_SESSION_CLOSING; 1880 return request_close_session(session); 1881 } 1882 1883 static bool drop_negative_children(struct dentry *dentry) 1884 { 1885 struct dentry *child; 1886 bool all_negative = true; 1887 1888 if (!d_is_dir(dentry)) 1889 goto out; 1890 1891 spin_lock(&dentry->d_lock); 1892 list_for_each_entry(child, &dentry->d_subdirs, d_child) { 1893 if (d_really_is_positive(child)) { 1894 all_negative = false; 1895 break; 1896 } 1897 } 1898 spin_unlock(&dentry->d_lock); 1899 1900 if (all_negative) 1901 shrink_dcache_parent(dentry); 1902 out: 1903 return all_negative; 1904 } 1905 1906 /* 1907 * Trim old(er) caps. 
1908 * 1909 * Because we can't cache an inode without one or more caps, we do 1910 * this indirectly: if a cap is unused, we prune its aliases, at which 1911 * point the inode will hopefully get dropped to. 1912 * 1913 * Yes, this is a bit sloppy. Our only real goal here is to respond to 1914 * memory pressure from the MDS, though, so it needn't be perfect. 1915 */ 1916 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) 1917 { 1918 int *remaining = arg; 1919 struct ceph_inode_info *ci = ceph_inode(inode); 1920 int used, wanted, oissued, mine; 1921 1922 if (*remaining <= 0) 1923 return -1; 1924 1925 spin_lock(&ci->i_ceph_lock); 1926 mine = cap->issued | cap->implemented; 1927 used = __ceph_caps_used(ci); 1928 wanted = __ceph_caps_file_wanted(ci); 1929 oissued = __ceph_caps_issued_other(ci, cap); 1930 1931 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n", 1932 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), 1933 ceph_cap_string(used), ceph_cap_string(wanted)); 1934 if (cap == ci->i_auth_cap) { 1935 if (ci->i_dirty_caps || ci->i_flushing_caps || 1936 !list_empty(&ci->i_cap_snaps)) 1937 goto out; 1938 if ((used | wanted) & CEPH_CAP_ANY_WR) 1939 goto out; 1940 /* Note: it's possible that i_filelock_ref becomes non-zero 1941 * after dropping auth caps. It doesn't hurt because reply 1942 * of lock mds request will re-add auth caps. */ 1943 if (atomic_read(&ci->i_filelock_ref) > 0) 1944 goto out; 1945 } 1946 /* The inode has cached pages, but it's no longer used. 1947 * we can safely drop it */ 1948 if (S_ISREG(inode->i_mode) && 1949 wanted == 0 && used == CEPH_CAP_FILE_CACHE && 1950 !(oissued & CEPH_CAP_FILE_CACHE)) { 1951 used = 0; 1952 oissued = 0; 1953 } 1954 if ((used | wanted) & ~oissued & mine) 1955 goto out; /* we need these caps */ 1956 1957 if (oissued) { 1958 /* we aren't the only cap.. just remove us */ 1959 __ceph_remove_cap(cap, true); 1960 (*remaining)--; 1961 } else { 1962 struct dentry *dentry; 1963 /* try dropping referring dentries */ 1964 spin_unlock(&ci->i_ceph_lock); 1965 dentry = d_find_any_alias(inode); 1966 if (dentry && drop_negative_children(dentry)) { 1967 int count; 1968 dput(dentry); 1969 d_prune_aliases(inode); 1970 count = atomic_read(&inode->i_count); 1971 if (count == 1) 1972 (*remaining)--; 1973 dout("trim_caps_cb %p cap %p pruned, count now %d\n", 1974 inode, cap, count); 1975 } else { 1976 dput(dentry); 1977 } 1978 return 0; 1979 } 1980 1981 out: 1982 spin_unlock(&ci->i_ceph_lock); 1983 return 0; 1984 } 1985 1986 /* 1987 * Trim session cap count down to some max number. 
1988 */ 1989 int ceph_trim_caps(struct ceph_mds_client *mdsc, 1990 struct ceph_mds_session *session, 1991 int max_caps) 1992 { 1993 int trim_caps = session->s_nr_caps - max_caps; 1994 1995 dout("trim_caps mds%d start: %d / %d, trim %d\n", 1996 session->s_mds, session->s_nr_caps, max_caps, trim_caps); 1997 if (trim_caps > 0) { 1998 int remaining = trim_caps; 1999 2000 ceph_iterate_session_caps(session, trim_caps_cb, &remaining); 2001 dout("trim_caps mds%d done: %d / %d, trimmed %d\n", 2002 session->s_mds, session->s_nr_caps, max_caps, 2003 trim_caps - remaining); 2004 } 2005 2006 ceph_flush_cap_releases(mdsc, session); 2007 return 0; 2008 } 2009 2010 static int check_caps_flush(struct ceph_mds_client *mdsc, 2011 u64 want_flush_tid) 2012 { 2013 int ret = 1; 2014 2015 spin_lock(&mdsc->cap_dirty_lock); 2016 if (!list_empty(&mdsc->cap_flush_list)) { 2017 struct ceph_cap_flush *cf = 2018 list_first_entry(&mdsc->cap_flush_list, 2019 struct ceph_cap_flush, g_list); 2020 if (cf->tid <= want_flush_tid) { 2021 dout("check_caps_flush still flushing tid " 2022 "%llu <= %llu\n", cf->tid, want_flush_tid); 2023 ret = 0; 2024 } 2025 } 2026 spin_unlock(&mdsc->cap_dirty_lock); 2027 return ret; 2028 } 2029 2030 /* 2031 * flush all dirty inode data to disk. 2032 * 2033 * returns true if we've flushed through want_flush_tid 2034 */ 2035 static void wait_caps_flush(struct ceph_mds_client *mdsc, 2036 u64 want_flush_tid) 2037 { 2038 dout("check_caps_flush want %llu\n", want_flush_tid); 2039 2040 wait_event(mdsc->cap_flushing_wq, 2041 check_caps_flush(mdsc, want_flush_tid)); 2042 2043 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); 2044 } 2045 2046 /* 2047 * called under s_mutex 2048 */ 2049 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 2050 struct ceph_mds_session *session) 2051 { 2052 struct ceph_msg *msg = NULL; 2053 struct ceph_mds_cap_release *head; 2054 struct ceph_mds_cap_item *item; 2055 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 2056 struct ceph_cap *cap; 2057 LIST_HEAD(tmp_list); 2058 int num_cap_releases; 2059 __le32 barrier, *cap_barrier; 2060 2061 down_read(&osdc->lock); 2062 barrier = cpu_to_le32(osdc->epoch_barrier); 2063 up_read(&osdc->lock); 2064 2065 spin_lock(&session->s_cap_lock); 2066 again: 2067 list_splice_init(&session->s_cap_releases, &tmp_list); 2068 num_cap_releases = session->s_num_cap_releases; 2069 session->s_num_cap_releases = 0; 2070 spin_unlock(&session->s_cap_lock); 2071 2072 while (!list_empty(&tmp_list)) { 2073 if (!msg) { 2074 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 2075 PAGE_SIZE, GFP_NOFS, false); 2076 if (!msg) 2077 goto out_err; 2078 head = msg->front.iov_base; 2079 head->num = cpu_to_le32(0); 2080 msg->front.iov_len = sizeof(*head); 2081 2082 msg->hdr.version = cpu_to_le16(2); 2083 msg->hdr.compat_version = cpu_to_le16(1); 2084 } 2085 2086 cap = list_first_entry(&tmp_list, struct ceph_cap, 2087 session_caps); 2088 list_del(&cap->session_caps); 2089 num_cap_releases--; 2090 2091 head = msg->front.iov_base; 2092 put_unaligned_le32(get_unaligned_le32(&head->num) + 1, 2093 &head->num); 2094 item = msg->front.iov_base + msg->front.iov_len; 2095 item->ino = cpu_to_le64(cap->cap_ino); 2096 item->cap_id = cpu_to_le64(cap->cap_id); 2097 item->migrate_seq = cpu_to_le32(cap->mseq); 2098 item->seq = cpu_to_le32(cap->issue_seq); 2099 msg->front.iov_len += sizeof(*item); 2100 2101 ceph_put_cap(mdsc, cap); 2102 2103 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 2104 // Append cap_barrier field 2105 cap_barrier = 
msg->front.iov_base + msg->front.iov_len; 2106 *cap_barrier = barrier; 2107 msg->front.iov_len += sizeof(*cap_barrier); 2108 2109 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2110 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2111 ceph_con_send(&session->s_con, msg); 2112 msg = NULL; 2113 } 2114 } 2115 2116 BUG_ON(num_cap_releases != 0); 2117 2118 spin_lock(&session->s_cap_lock); 2119 if (!list_empty(&session->s_cap_releases)) 2120 goto again; 2121 spin_unlock(&session->s_cap_lock); 2122 2123 if (msg) { 2124 // Append cap_barrier field 2125 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2126 *cap_barrier = barrier; 2127 msg->front.iov_len += sizeof(*cap_barrier); 2128 2129 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2130 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2131 ceph_con_send(&session->s_con, msg); 2132 } 2133 return; 2134 out_err: 2135 pr_err("send_cap_releases mds%d, failed to allocate message\n", 2136 session->s_mds); 2137 spin_lock(&session->s_cap_lock); 2138 list_splice(&tmp_list, &session->s_cap_releases); 2139 session->s_num_cap_releases += num_cap_releases; 2140 spin_unlock(&session->s_cap_lock); 2141 } 2142 2143 static void ceph_cap_release_work(struct work_struct *work) 2144 { 2145 struct ceph_mds_session *session = 2146 container_of(work, struct ceph_mds_session, s_cap_release_work); 2147 2148 mutex_lock(&session->s_mutex); 2149 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2150 session->s_state == CEPH_MDS_SESSION_HUNG) 2151 ceph_send_cap_releases(session->s_mdsc, session); 2152 mutex_unlock(&session->s_mutex); 2153 ceph_put_mds_session(session); 2154 } 2155 2156 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, 2157 struct ceph_mds_session *session) 2158 { 2159 if (mdsc->stopping) 2160 return; 2161 2162 ceph_get_mds_session(session); 2163 if (queue_work(mdsc->fsc->cap_wq, 2164 &session->s_cap_release_work)) { 2165 dout("cap release work queued\n"); 2166 } else { 2167 ceph_put_mds_session(session); 2168 dout("failed to queue cap release work\n"); 2169 } 2170 } 2171 2172 /* 2173 * caller holds session->s_cap_lock 2174 */ 2175 void __ceph_queue_cap_release(struct ceph_mds_session *session, 2176 struct ceph_cap *cap) 2177 { 2178 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2179 session->s_num_cap_releases++; 2180 2181 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2182 ceph_flush_cap_releases(session->s_mdsc, session); 2183 } 2184 2185 static void ceph_cap_reclaim_work(struct work_struct *work) 2186 { 2187 struct ceph_mds_client *mdsc = 2188 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2189 int ret = ceph_trim_dentries(mdsc); 2190 if (ret == -EAGAIN) 2191 ceph_queue_cap_reclaim_work(mdsc); 2192 } 2193 2194 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2195 { 2196 if (mdsc->stopping) 2197 return; 2198 2199 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2200 dout("caps reclaim work queued\n"); 2201 } else { 2202 dout("failed to queue caps release work\n"); 2203 } 2204 } 2205 2206 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) 2207 { 2208 int val; 2209 if (!nr) 2210 return; 2211 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2212 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2213 atomic_set(&mdsc->cap_reclaim_pending, 0); 2214 ceph_queue_cap_reclaim_work(mdsc); 2215 } 2216 } 2217 2218 /* 2219 * requests 2220 */ 2221 2222 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2223 struct inode *dir) 
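/*
 * Size the readdir reply buffer from the directory's current entry count
 * (i_files + i_subdirs), clamped to [1, max_readdir], falling back to a
 * smaller allocation order when the page allocator cannot satisfy the
 * request.  A rough sketch of the sizing done below, assuming 4 KiB pages:
 *
 *	order       = get_order(sizeof(*rinfo->dir_entries) * num_entries);
 *	num_entries = (PAGE_SIZE << order) / sizeof(*rinfo->dir_entries);
 */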
2224 { 2225 struct ceph_inode_info *ci = ceph_inode(dir); 2226 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2227 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2228 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2229 unsigned int num_entries; 2230 int order; 2231 2232 spin_lock(&ci->i_ceph_lock); 2233 num_entries = ci->i_files + ci->i_subdirs; 2234 spin_unlock(&ci->i_ceph_lock); 2235 num_entries = max(num_entries, 1U); 2236 num_entries = min(num_entries, opt->max_readdir); 2237 2238 order = get_order(size * num_entries); 2239 while (order >= 0) { 2240 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2241 __GFP_NOWARN, 2242 order); 2243 if (rinfo->dir_entries) 2244 break; 2245 order--; 2246 } 2247 if (!rinfo->dir_entries) 2248 return -ENOMEM; 2249 2250 num_entries = (PAGE_SIZE << order) / size; 2251 num_entries = min(num_entries, opt->max_readdir); 2252 2253 rinfo->dir_buf_size = PAGE_SIZE << order; 2254 req->r_num_caps = num_entries + 1; 2255 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2256 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2257 return 0; 2258 } 2259 2260 /* 2261 * Create an mds request. 2262 */ 2263 struct ceph_mds_request * 2264 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2265 { 2266 struct ceph_mds_request *req; 2267 2268 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2269 if (!req) 2270 return ERR_PTR(-ENOMEM); 2271 2272 mutex_init(&req->r_fill_mutex); 2273 req->r_mdsc = mdsc; 2274 req->r_started = jiffies; 2275 req->r_start_latency = ktime_get(); 2276 req->r_resend_mds = -1; 2277 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2278 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2279 req->r_fmode = -1; 2280 kref_init(&req->r_kref); 2281 RB_CLEAR_NODE(&req->r_node); 2282 INIT_LIST_HEAD(&req->r_wait); 2283 init_completion(&req->r_completion); 2284 init_completion(&req->r_safe_completion); 2285 INIT_LIST_HEAD(&req->r_unsafe_item); 2286 2287 ktime_get_coarse_real_ts64(&req->r_stamp); 2288 2289 req->r_op = op; 2290 req->r_direct_mode = mode; 2291 return req; 2292 } 2293 2294 /* 2295 * return oldest (lowest) request, tid in request tree, 0 if none. 2296 * 2297 * called under mdsc->mutex. 2298 */ 2299 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2300 { 2301 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2302 return NULL; 2303 return rb_entry(rb_first(&mdsc->request_tree), 2304 struct ceph_mds_request, r_node); 2305 } 2306 2307 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2308 { 2309 return mdsc->oldest_tid; 2310 } 2311 2312 /* 2313 * Build a dentry's path. Allocate on heap; caller must kfree. Based 2314 * on build_path_from_dentry in fs/cifs/dir.c. 2315 * 2316 * If @stop_on_nosnap, generate path relative to the first non-snapped 2317 * inode. 2318 * 2319 * Encode hidden .snap dirs as a double /, i.e. 
2320 * foo/.snap/bar -> foo//bar 2321 */ 2322 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase, 2323 int stop_on_nosnap) 2324 { 2325 struct dentry *temp; 2326 char *path; 2327 int pos; 2328 unsigned seq; 2329 u64 base; 2330 2331 if (!dentry) 2332 return ERR_PTR(-EINVAL); 2333 2334 path = __getname(); 2335 if (!path) 2336 return ERR_PTR(-ENOMEM); 2337 retry: 2338 pos = PATH_MAX - 1; 2339 path[pos] = '\0'; 2340 2341 seq = read_seqbegin(&rename_lock); 2342 rcu_read_lock(); 2343 temp = dentry; 2344 for (;;) { 2345 struct inode *inode; 2346 2347 spin_lock(&temp->d_lock); 2348 inode = d_inode(temp); 2349 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2350 dout("build_path path+%d: %p SNAPDIR\n", 2351 pos, temp); 2352 } else if (stop_on_nosnap && inode && dentry != temp && 2353 ceph_snap(inode) == CEPH_NOSNAP) { 2354 spin_unlock(&temp->d_lock); 2355 pos++; /* get rid of any prepended '/' */ 2356 break; 2357 } else { 2358 pos -= temp->d_name.len; 2359 if (pos < 0) { 2360 spin_unlock(&temp->d_lock); 2361 break; 2362 } 2363 memcpy(path + pos, temp->d_name.name, temp->d_name.len); 2364 } 2365 spin_unlock(&temp->d_lock); 2366 temp = READ_ONCE(temp->d_parent); 2367 2368 /* Are we at the root? */ 2369 if (IS_ROOT(temp)) 2370 break; 2371 2372 /* Are we out of buffer? */ 2373 if (--pos < 0) 2374 break; 2375 2376 path[pos] = '/'; 2377 } 2378 base = ceph_ino(d_inode(temp)); 2379 rcu_read_unlock(); 2380 2381 if (read_seqretry(&rename_lock, seq)) 2382 goto retry; 2383 2384 if (pos < 0) { 2385 /* 2386 * A rename didn't occur, but somehow we didn't end up where 2387 * we thought we would. Throw a warning and try again. 2388 */ 2389 pr_warn("build_path did not end path lookup where " 2390 "expected, pos is %d\n", pos); 2391 goto retry; 2392 } 2393 2394 *pbase = base; 2395 *plen = PATH_MAX - 1 - pos; 2396 dout("build_path on %p %d built %llx '%.*s'\n", 2397 dentry, d_count(dentry), base, *plen, path + pos); 2398 return path + pos; 2399 } 2400 2401 static int build_dentry_path(struct dentry *dentry, struct inode *dir, 2402 const char **ppath, int *ppathlen, u64 *pino, 2403 bool *pfreepath, bool parent_locked) 2404 { 2405 char *path; 2406 2407 rcu_read_lock(); 2408 if (!dir) 2409 dir = d_inode_rcu(dentry->d_parent); 2410 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) { 2411 *pino = ceph_ino(dir); 2412 rcu_read_unlock(); 2413 *ppath = dentry->d_name.name; 2414 *ppathlen = dentry->d_name.len; 2415 return 0; 2416 } 2417 rcu_read_unlock(); 2418 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2419 if (IS_ERR(path)) 2420 return PTR_ERR(path); 2421 *ppath = path; 2422 *pfreepath = true; 2423 return 0; 2424 } 2425 2426 static int build_inode_path(struct inode *inode, 2427 const char **ppath, int *ppathlen, u64 *pino, 2428 bool *pfreepath) 2429 { 2430 struct dentry *dentry; 2431 char *path; 2432 2433 if (ceph_snap(inode) == CEPH_NOSNAP) { 2434 *pino = ceph_ino(inode); 2435 *ppathlen = 0; 2436 return 0; 2437 } 2438 dentry = d_find_alias(inode); 2439 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2440 dput(dentry); 2441 if (IS_ERR(path)) 2442 return PTR_ERR(path); 2443 *ppath = path; 2444 *pfreepath = true; 2445 return 0; 2446 } 2447 2448 /* 2449 * request arguments may be specified via an inode *, a dentry *, or 2450 * an explicit ino+path. 
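 *
 * The lookup order mirrors the argument order: an inode pointer wins,
 * then a dentry (optionally anchored at rdiri), and finally a raw
 * ino+path pair.  Whenever a path had to be built on the heap,
 * *freepath is set so the caller knows to release it with
 * ceph_mdsc_free_path() once the request message has been assembled.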
2451 */ 2452 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, 2453 struct inode *rdiri, const char *rpath, 2454 u64 rino, const char **ppath, int *pathlen, 2455 u64 *ino, bool *freepath, bool parent_locked) 2456 { 2457 int r = 0; 2458 2459 if (rinode) { 2460 r = build_inode_path(rinode, ppath, pathlen, ino, freepath); 2461 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 2462 ceph_snap(rinode)); 2463 } else if (rdentry) { 2464 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino, 2465 freepath, parent_locked); 2466 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, 2467 *ppath); 2468 } else if (rpath || rino) { 2469 *ino = rino; 2470 *ppath = rpath; 2471 *pathlen = rpath ? strlen(rpath) : 0; 2472 dout(" path %.*s\n", *pathlen, rpath); 2473 } 2474 2475 return r; 2476 } 2477 2478 /* 2479 * called under mdsc->mutex 2480 */ 2481 static struct ceph_msg *create_request_message(struct ceph_mds_session *session, 2482 struct ceph_mds_request *req, 2483 bool drop_cap_releases) 2484 { 2485 int mds = session->s_mds; 2486 struct ceph_mds_client *mdsc = session->s_mdsc; 2487 struct ceph_msg *msg; 2488 struct ceph_mds_request_head_old *head; 2489 const char *path1 = NULL; 2490 const char *path2 = NULL; 2491 u64 ino1 = 0, ino2 = 0; 2492 int pathlen1 = 0, pathlen2 = 0; 2493 bool freepath1 = false, freepath2 = false; 2494 int len, i; 2495 u16 releases; 2496 void *p, *end; 2497 int ret; 2498 bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME); 2499 2500 ret = set_request_path_attr(req->r_inode, req->r_dentry, 2501 req->r_parent, req->r_path1, req->r_ino1.ino, 2502 &path1, &pathlen1, &ino1, &freepath1, 2503 test_bit(CEPH_MDS_R_PARENT_LOCKED, 2504 &req->r_req_flags)); 2505 if (ret < 0) { 2506 msg = ERR_PTR(ret); 2507 goto out; 2508 } 2509 2510 /* If r_old_dentry is set, then assume that its parent is locked */ 2511 ret = set_request_path_attr(NULL, req->r_old_dentry, 2512 req->r_old_dentry_dir, 2513 req->r_path2, req->r_ino2.ino, 2514 &path2, &pathlen2, &ino2, &freepath2, true); 2515 if (ret < 0) { 2516 msg = ERR_PTR(ret); 2517 goto out_free1; 2518 } 2519 2520 if (legacy) { 2521 /* Old style */ 2522 len = sizeof(*head); 2523 } else { 2524 /* New style: add gid_list and any later fields */ 2525 len = sizeof(struct ceph_mds_request_head) + sizeof(u32) + 2526 (sizeof(u64) * req->r_cred->group_info->ngroups); 2527 } 2528 2529 len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + 2530 sizeof(struct ceph_timespec); 2531 2532 /* calculate (max) length for cap releases */ 2533 len += sizeof(struct ceph_mds_request_release) * 2534 (!!req->r_inode_drop + !!req->r_dentry_drop + 2535 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 2536 2537 if (req->r_dentry_drop) 2538 len += pathlen1; 2539 if (req->r_old_dentry_drop) 2540 len += pathlen2; 2541 2542 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); 2543 if (!msg) { 2544 msg = ERR_PTR(-ENOMEM); 2545 goto out_free2; 2546 } 2547 2548 msg->hdr.tid = cpu_to_le64(req->r_tid); 2549 2550 /* 2551 * The old ceph_mds_request_header didn't contain a version field, and 2552 * one was added when we moved the message version from 3->4. 
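 * The newer ceph_mds_request_head simply prepends that version field,
 * so the legacy layout can still be filled in by aliasing the old
 * struct at &new_head->oldest_client_tid; find_old_request_head()
 * relies on the same trick when a request is replayed.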
2553 */ 2554 if (legacy) { 2555 msg->hdr.version = cpu_to_le16(3); 2556 head = msg->front.iov_base; 2557 p = msg->front.iov_base + sizeof(*head); 2558 } else { 2559 struct ceph_mds_request_head *new_head = msg->front.iov_base; 2560 2561 msg->hdr.version = cpu_to_le16(4); 2562 new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION); 2563 head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; 2564 p = msg->front.iov_base + sizeof(*new_head); 2565 } 2566 2567 end = msg->front.iov_base + msg->front.iov_len; 2568 2569 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 2570 head->op = cpu_to_le32(req->r_op); 2571 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, 2572 req->r_cred->fsuid)); 2573 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, 2574 req->r_cred->fsgid)); 2575 head->ino = cpu_to_le64(req->r_deleg_ino); 2576 head->args = req->r_args; 2577 2578 ceph_encode_filepath(&p, end, ino1, path1); 2579 ceph_encode_filepath(&p, end, ino2, path2); 2580 2581 /* make note of release offset, in case we need to replay */ 2582 req->r_request_release_offset = p - msg->front.iov_base; 2583 2584 /* cap releases */ 2585 releases = 0; 2586 if (req->r_inode_drop) 2587 releases += ceph_encode_inode_release(&p, 2588 req->r_inode ? req->r_inode : d_inode(req->r_dentry), 2589 mds, req->r_inode_drop, req->r_inode_unless, 2590 req->r_op == CEPH_MDS_OP_READDIR); 2591 if (req->r_dentry_drop) 2592 releases += ceph_encode_dentry_release(&p, req->r_dentry, 2593 req->r_parent, mds, req->r_dentry_drop, 2594 req->r_dentry_unless); 2595 if (req->r_old_dentry_drop) 2596 releases += ceph_encode_dentry_release(&p, req->r_old_dentry, 2597 req->r_old_dentry_dir, mds, 2598 req->r_old_dentry_drop, 2599 req->r_old_dentry_unless); 2600 if (req->r_old_inode_drop) 2601 releases += ceph_encode_inode_release(&p, 2602 d_inode(req->r_old_dentry), 2603 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 2604 2605 if (drop_cap_releases) { 2606 releases = 0; 2607 p = msg->front.iov_base + req->r_request_release_offset; 2608 } 2609 2610 head->num_releases = cpu_to_le16(releases); 2611 2612 /* time stamp */ 2613 { 2614 struct ceph_timespec ts; 2615 ceph_encode_timespec64(&ts, &req->r_stamp); 2616 ceph_encode_copy(&p, &ts, sizeof(ts)); 2617 } 2618 2619 /* gid list */ 2620 if (!legacy) { 2621 ceph_encode_32(&p, req->r_cred->group_info->ngroups); 2622 for (i = 0; i < req->r_cred->group_info->ngroups; i++) 2623 ceph_encode_64(&p, from_kgid(&init_user_ns, 2624 req->r_cred->group_info->gid[i])); 2625 } 2626 2627 if (WARN_ON_ONCE(p > end)) { 2628 ceph_msg_put(msg); 2629 msg = ERR_PTR(-ERANGE); 2630 goto out_free2; 2631 } 2632 2633 msg->front.iov_len = p - msg->front.iov_base; 2634 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2635 2636 if (req->r_pagelist) { 2637 struct ceph_pagelist *pagelist = req->r_pagelist; 2638 ceph_msg_data_add_pagelist(msg, pagelist); 2639 msg->hdr.data_len = cpu_to_le32(pagelist->length); 2640 } else { 2641 msg->hdr.data_len = 0; 2642 } 2643 2644 msg->hdr.data_off = cpu_to_le16(0); 2645 2646 out_free2: 2647 if (freepath2) 2648 ceph_mdsc_free_path((char *)path2, pathlen2); 2649 out_free1: 2650 if (freepath1) 2651 ceph_mdsc_free_path((char *)path1, pathlen1); 2652 out: 2653 return msg; 2654 } 2655 2656 /* 2657 * called under mdsc->mutex if error, under no mutex if 2658 * success. 
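 *
 * complete_request() closes the request's latency window
 * (r_end_latency), runs the optional r_callback, and then wakes anyone
 * blocked in ceph_mdsc_wait_request() via r_completion.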
2659 */ 2660 static void complete_request(struct ceph_mds_client *mdsc, 2661 struct ceph_mds_request *req) 2662 { 2663 req->r_end_latency = ktime_get(); 2664 2665 if (req->r_callback) 2666 req->r_callback(mdsc, req); 2667 complete_all(&req->r_completion); 2668 } 2669 2670 static struct ceph_mds_request_head_old * 2671 find_old_request_head(void *p, u64 features) 2672 { 2673 bool legacy = !(features & CEPH_FEATURE_FS_BTIME); 2674 struct ceph_mds_request_head *new_head; 2675 2676 if (legacy) 2677 return (struct ceph_mds_request_head_old *)p; 2678 new_head = (struct ceph_mds_request_head *)p; 2679 return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; 2680 } 2681 2682 /* 2683 * called under mdsc->mutex 2684 */ 2685 static int __prepare_send_request(struct ceph_mds_session *session, 2686 struct ceph_mds_request *req, 2687 bool drop_cap_releases) 2688 { 2689 int mds = session->s_mds; 2690 struct ceph_mds_client *mdsc = session->s_mdsc; 2691 struct ceph_mds_request_head_old *rhead; 2692 struct ceph_msg *msg; 2693 int flags = 0; 2694 2695 req->r_attempts++; 2696 if (req->r_inode) { 2697 struct ceph_cap *cap = 2698 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 2699 2700 if (cap) 2701 req->r_sent_on_mseq = cap->mseq; 2702 else 2703 req->r_sent_on_mseq = -1; 2704 } 2705 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, 2706 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 2707 2708 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 2709 void *p; 2710 2711 /* 2712 * Replay. Do not regenerate message (and rebuild 2713 * paths, etc.); just use the original message. 2714 * Rebuilding paths will break for renames because 2715 * d_move mangles the src name. 2716 */ 2717 msg = req->r_request; 2718 rhead = find_old_request_head(msg->front.iov_base, 2719 session->s_con.peer_features); 2720 2721 flags = le32_to_cpu(rhead->flags); 2722 flags |= CEPH_MDS_FLAG_REPLAY; 2723 rhead->flags = cpu_to_le32(flags); 2724 2725 if (req->r_target_inode) 2726 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 2727 2728 rhead->num_retry = req->r_attempts - 1; 2729 2730 /* remove cap/dentry releases from message */ 2731 rhead->num_releases = 0; 2732 2733 /* time stamp */ 2734 p = msg->front.iov_base + req->r_request_release_offset; 2735 { 2736 struct ceph_timespec ts; 2737 ceph_encode_timespec64(&ts, &req->r_stamp); 2738 ceph_encode_copy(&p, &ts, sizeof(ts)); 2739 } 2740 2741 msg->front.iov_len = p - msg->front.iov_base; 2742 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2743 return 0; 2744 } 2745 2746 if (req->r_request) { 2747 ceph_msg_put(req->r_request); 2748 req->r_request = NULL; 2749 } 2750 msg = create_request_message(session, req, drop_cap_releases); 2751 if (IS_ERR(msg)) { 2752 req->r_err = PTR_ERR(msg); 2753 return PTR_ERR(msg); 2754 } 2755 req->r_request = msg; 2756 2757 rhead = find_old_request_head(msg->front.iov_base, 2758 session->s_con.peer_features); 2759 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 2760 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2761 flags |= CEPH_MDS_FLAG_REPLAY; 2762 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) 2763 flags |= CEPH_MDS_FLAG_ASYNC; 2764 if (req->r_parent) 2765 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 2766 rhead->flags = cpu_to_le32(flags); 2767 rhead->num_fwd = req->r_num_fwd; 2768 rhead->num_retry = req->r_attempts - 1; 2769 2770 dout(" r_parent = %p\n", req->r_parent); 2771 return 0; 2772 } 2773 2774 /* 2775 * called under mdsc->mutex 2776 */ 2777 static int 
__send_request(struct ceph_mds_session *session, 2778 struct ceph_mds_request *req, 2779 bool drop_cap_releases) 2780 { 2781 int err; 2782 2783 err = __prepare_send_request(session, req, drop_cap_releases); 2784 if (!err) { 2785 ceph_msg_get(req->r_request); 2786 ceph_con_send(&session->s_con, req->r_request); 2787 } 2788 2789 return err; 2790 } 2791 2792 /* 2793 * send request, or put it on the appropriate wait list. 2794 */ 2795 static void __do_request(struct ceph_mds_client *mdsc, 2796 struct ceph_mds_request *req) 2797 { 2798 struct ceph_mds_session *session = NULL; 2799 int mds = -1; 2800 int err = 0; 2801 bool random; 2802 2803 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 2804 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 2805 __unregister_request(mdsc, req); 2806 return; 2807 } 2808 2809 if (req->r_timeout && 2810 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 2811 dout("do_request timed out\n"); 2812 err = -ETIMEDOUT; 2813 goto finish; 2814 } 2815 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 2816 dout("do_request forced umount\n"); 2817 err = -EIO; 2818 goto finish; 2819 } 2820 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 2821 if (mdsc->mdsmap_err) { 2822 err = mdsc->mdsmap_err; 2823 dout("do_request mdsmap err %d\n", err); 2824 goto finish; 2825 } 2826 if (mdsc->mdsmap->m_epoch == 0) { 2827 dout("do_request no mdsmap, waiting for map\n"); 2828 list_add(&req->r_wait, &mdsc->waiting_for_map); 2829 return; 2830 } 2831 if (!(mdsc->fsc->mount_options->flags & 2832 CEPH_MOUNT_OPT_MOUNTWAIT) && 2833 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { 2834 err = -EHOSTUNREACH; 2835 goto finish; 2836 } 2837 } 2838 2839 put_request_session(req); 2840 2841 mds = __choose_mds(mdsc, req, &random); 2842 if (mds < 0 || 2843 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 2844 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2845 err = -EJUKEBOX; 2846 goto finish; 2847 } 2848 dout("do_request no mds or not active, waiting for map\n"); 2849 list_add(&req->r_wait, &mdsc->waiting_for_map); 2850 return; 2851 } 2852 2853 /* get, open session */ 2854 session = __ceph_lookup_mds_session(mdsc, mds); 2855 if (!session) { 2856 session = register_session(mdsc, mds); 2857 if (IS_ERR(session)) { 2858 err = PTR_ERR(session); 2859 goto finish; 2860 } 2861 } 2862 req->r_session = ceph_get_mds_session(session); 2863 2864 dout("do_request mds%d session %p state %s\n", mds, session, 2865 ceph_session_state_name(session->s_state)); 2866 if (session->s_state != CEPH_MDS_SESSION_OPEN && 2867 session->s_state != CEPH_MDS_SESSION_HUNG) { 2868 /* 2869 * We cannot queue async requests since the caps and delegated 2870 * inodes are bound to the session. Just return -EJUKEBOX and 2871 * let the caller retry a sync request in that case. 2872 */ 2873 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2874 err = -EJUKEBOX; 2875 goto out_session; 2876 } 2877 2878 /* 2879 * If the session has been REJECTED, then return a hard error, 2880 * unless it's a CLEANRECOVER mount, in which case we'll queue 2881 * it to the mdsc queue. 
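 * (Requests parked on mdsc->waiting_for_map are re-run the next time a
 * new mdsmap is processed, so the CLEANRECOVER case simply retries once
 * the map changes.)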
2882 */ 2883 if (session->s_state == CEPH_MDS_SESSION_REJECTED) { 2884 if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER)) 2885 list_add(&req->r_wait, &mdsc->waiting_for_map); 2886 else 2887 err = -EACCES; 2888 goto out_session; 2889 } 2890 2891 if (session->s_state == CEPH_MDS_SESSION_NEW || 2892 session->s_state == CEPH_MDS_SESSION_CLOSING) { 2893 err = __open_session(mdsc, session); 2894 if (err) 2895 goto out_session; 2896 /* retry the same mds later */ 2897 if (random) 2898 req->r_resend_mds = mds; 2899 } 2900 list_add(&req->r_wait, &session->s_waiting); 2901 goto out_session; 2902 } 2903 2904 /* send request */ 2905 req->r_resend_mds = -1; /* forget any previous mds hint */ 2906 2907 if (req->r_request_started == 0) /* note request start time */ 2908 req->r_request_started = jiffies; 2909 2910 err = __send_request(session, req, false); 2911 2912 out_session: 2913 ceph_put_mds_session(session); 2914 finish: 2915 if (err) { 2916 dout("__do_request early error %d\n", err); 2917 req->r_err = err; 2918 complete_request(mdsc, req); 2919 __unregister_request(mdsc, req); 2920 } 2921 return; 2922 } 2923 2924 /* 2925 * called under mdsc->mutex 2926 */ 2927 static void __wake_requests(struct ceph_mds_client *mdsc, 2928 struct list_head *head) 2929 { 2930 struct ceph_mds_request *req; 2931 LIST_HEAD(tmp_list); 2932 2933 list_splice_init(head, &tmp_list); 2934 2935 while (!list_empty(&tmp_list)) { 2936 req = list_entry(tmp_list.next, 2937 struct ceph_mds_request, r_wait); 2938 list_del_init(&req->r_wait); 2939 dout(" wake request %p tid %llu\n", req, req->r_tid); 2940 __do_request(mdsc, req); 2941 } 2942 } 2943 2944 /* 2945 * Wake up threads with requests pending for @mds, so that they can 2946 * resubmit their requests to a possibly different mds. 2947 */ 2948 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 2949 { 2950 struct ceph_mds_request *req; 2951 struct rb_node *p = rb_first(&mdsc->request_tree); 2952 2953 dout("kick_requests mds%d\n", mds); 2954 while (p) { 2955 req = rb_entry(p, struct ceph_mds_request, r_node); 2956 p = rb_next(p); 2957 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2958 continue; 2959 if (req->r_attempts > 0) 2960 continue; /* only new requests */ 2961 if (req->r_session && 2962 req->r_session->s_mds == mds) { 2963 dout(" kicking tid %llu\n", req->r_tid); 2964 list_del_init(&req->r_wait); 2965 __do_request(mdsc, req); 2966 } 2967 } 2968 } 2969 2970 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, 2971 struct ceph_mds_request *req) 2972 { 2973 int err = 0; 2974 2975 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ 2976 if (req->r_inode) 2977 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 2978 if (req->r_parent) { 2979 struct ceph_inode_info *ci = ceph_inode(req->r_parent); 2980 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ? 
CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD; 2982 spin_lock(&ci->i_ceph_lock); 2983 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false); 2984 __ceph_touch_fmode(ci, mdsc, fmode); 2985 spin_unlock(&ci->i_ceph_lock); 2986 ihold(req->r_parent); 2987 } 2988 if (req->r_old_dentry_dir) 2989 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 2990 CEPH_CAP_PIN); 2991 2992 if (req->r_inode) { 2993 err = ceph_wait_on_async_create(req->r_inode); 2994 if (err) { 2995 dout("%s: wait for async create returned: %d\n", 2996 __func__, err); 2997 return err; 2998 } 2999 } 3000 3001 if (!err && req->r_old_inode) { 3002 err = ceph_wait_on_async_create(req->r_old_inode); 3003 if (err) { 3004 dout("%s: wait for async create returned: %d\n", 3005 __func__, err); 3006 return err; 3007 } 3008 } 3009 3010 dout("submit_request on %p for inode %p\n", req, dir); 3011 mutex_lock(&mdsc->mutex); 3012 __register_request(mdsc, req, dir); 3013 __do_request(mdsc, req); 3014 err = req->r_err; 3015 mutex_unlock(&mdsc->mutex); 3016 return err; 3017 } 3018 3019 static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, 3020 struct ceph_mds_request *req) 3021 { 3022 int err; 3023 3024 /* wait */ 3025 dout("do_request waiting\n"); 3026 if (!req->r_timeout && req->r_wait_for_completion) { 3027 err = req->r_wait_for_completion(mdsc, req); 3028 } else { 3029 long timeleft = wait_for_completion_killable_timeout( 3030 &req->r_completion, 3031 ceph_timeout_jiffies(req->r_timeout)); 3032 if (timeleft > 0) 3033 err = 0; 3034 else if (!timeleft) 3035 err = -ETIMEDOUT; /* timed out */ 3036 else 3037 err = timeleft; /* killed */ 3038 } 3039 dout("do_request waited, got %d\n", err); 3040 mutex_lock(&mdsc->mutex); 3041 3042 /* only abort if we didn't race with a real reply */ 3043 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 3044 err = le32_to_cpu(req->r_reply_info.head->result); 3045 } else if (err < 0) { 3046 dout("aborted request %lld with %d\n", req->r_tid, err); 3047 3048 /* 3049 * ensure we aren't running concurrently with 3050 * ceph_fill_trace or ceph_readdir_prepopulate, which 3051 * rely on locks (dir mutex) held by our caller. 3052 */ 3053 mutex_lock(&req->r_fill_mutex); 3054 req->r_err = err; 3055 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3056 mutex_unlock(&req->r_fill_mutex); 3057 3058 if (req->r_parent && 3059 (req->r_op & CEPH_MDS_OP_WRITE)) 3060 ceph_invalidate_dir_request(req); 3061 } else { 3062 err = req->r_err; 3063 } 3064 3065 mutex_unlock(&mdsc->mutex); 3066 return err; 3067 } 3068 3069 /* 3070 * Synchronously perform an mds request. Take care of all of the 3071 * session setup, forwarding, and retry details. 3072 */ 3073 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 3074 struct inode *dir, 3075 struct ceph_mds_request *req) 3076 { 3077 int err; 3078 3079 dout("do_request on %p\n", req); 3080 3081 /* issue */ 3082 err = ceph_mdsc_submit_request(mdsc, dir, req); 3083 if (!err) 3084 err = ceph_mdsc_wait_request(mdsc, req); 3085 dout("do_request %p done, result %d\n", req, err); 3086 return err; 3087 } 3088 3089 /* 3090 * Invalidate dir's completeness, dentry lease state on an aborted MDS 3091 * namespace request.
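 *
 * Because the request was aborted on the client side, we cannot tell
 * whether the MDS actually applied the operation, so any cached
 * completeness or dentry lease state covering the affected directories
 * may be stale and has to be dropped.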
3092 */ 3093 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 3094 { 3095 struct inode *dir = req->r_parent; 3096 struct inode *old_dir = req->r_old_dentry_dir; 3097 3098 dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir); 3099 3100 ceph_dir_clear_complete(dir); 3101 if (old_dir) 3102 ceph_dir_clear_complete(old_dir); 3103 if (req->r_dentry) 3104 ceph_invalidate_dentry_lease(req->r_dentry); 3105 if (req->r_old_dentry) 3106 ceph_invalidate_dentry_lease(req->r_old_dentry); 3107 } 3108 3109 /* 3110 * Handle mds reply. 3111 * 3112 * We take the session mutex and parse and process the reply immediately. 3113 * This preserves the logical ordering of replies, capabilities, etc., sent 3114 * by the MDS as they are applied to our local cache. 3115 */ 3116 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 3117 { 3118 struct ceph_mds_client *mdsc = session->s_mdsc; 3119 struct ceph_mds_request *req; 3120 struct ceph_mds_reply_head *head = msg->front.iov_base; 3121 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 3122 struct ceph_snap_realm *realm; 3123 u64 tid; 3124 int err, result; 3125 int mds = session->s_mds; 3126 3127 if (msg->front.iov_len < sizeof(*head)) { 3128 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 3129 ceph_msg_dump(msg); 3130 return; 3131 } 3132 3133 /* get request, session */ 3134 tid = le64_to_cpu(msg->hdr.tid); 3135 mutex_lock(&mdsc->mutex); 3136 req = lookup_get_request(mdsc, tid); 3137 if (!req) { 3138 dout("handle_reply on unknown tid %llu\n", tid); 3139 mutex_unlock(&mdsc->mutex); 3140 return; 3141 } 3142 dout("handle_reply %p\n", req); 3143 3144 /* correct session? */ 3145 if (req->r_session != session) { 3146 pr_err("mdsc_handle_reply got %llu on session mds%d" 3147 " not mds%d\n", tid, session->s_mds, 3148 req->r_session ? req->r_session->s_mds : -1); 3149 mutex_unlock(&mdsc->mutex); 3150 goto out; 3151 } 3152 3153 /* dup? */ 3154 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 3155 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 3156 pr_warn("got a dup %s reply on %llu from mds%d\n", 3157 head->safe ? 
"safe" : "unsafe", tid, mds); 3158 mutex_unlock(&mdsc->mutex); 3159 goto out; 3160 } 3161 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 3162 pr_warn("got unsafe after safe on %llu from mds%d\n", 3163 tid, mds); 3164 mutex_unlock(&mdsc->mutex); 3165 goto out; 3166 } 3167 3168 result = le32_to_cpu(head->result); 3169 3170 /* 3171 * Handle an ESTALE 3172 * if we're not talking to the authority, send to them 3173 * if the authority has changed while we weren't looking, 3174 * send to new authority 3175 * Otherwise we just have to return an ESTALE 3176 */ 3177 if (result == -ESTALE) { 3178 dout("got ESTALE on request %llu\n", req->r_tid); 3179 req->r_resend_mds = -1; 3180 if (req->r_direct_mode != USE_AUTH_MDS) { 3181 dout("not using auth, setting for that now\n"); 3182 req->r_direct_mode = USE_AUTH_MDS; 3183 __do_request(mdsc, req); 3184 mutex_unlock(&mdsc->mutex); 3185 goto out; 3186 } else { 3187 int mds = __choose_mds(mdsc, req, NULL); 3188 if (mds >= 0 && mds != req->r_session->s_mds) { 3189 dout("but auth changed, so resending\n"); 3190 __do_request(mdsc, req); 3191 mutex_unlock(&mdsc->mutex); 3192 goto out; 3193 } 3194 } 3195 dout("have to return ESTALE on request %llu\n", req->r_tid); 3196 } 3197 3198 3199 if (head->safe) { 3200 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 3201 __unregister_request(mdsc, req); 3202 3203 /* last request during umount? */ 3204 if (mdsc->stopping && !__get_oldest_req(mdsc)) 3205 complete_all(&mdsc->safe_umount_waiters); 3206 3207 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3208 /* 3209 * We already handled the unsafe response, now do the 3210 * cleanup. No need to examine the response; the MDS 3211 * doesn't include any result info in the safe 3212 * response. And even if it did, there is nothing 3213 * useful we could do with a revised return value. 
3214 */ 3215 dout("got safe reply %llu, mds%d\n", tid, mds); 3216 3217 mutex_unlock(&mdsc->mutex); 3218 goto out; 3219 } 3220 } else { 3221 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 3222 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 3223 } 3224 3225 dout("handle_reply tid %lld result %d\n", tid, result); 3226 rinfo = &req->r_reply_info; 3227 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 3228 err = parse_reply_info(session, msg, rinfo, (u64)-1); 3229 else 3230 err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features); 3231 mutex_unlock(&mdsc->mutex); 3232 3233 /* Must find target inode outside of mutexes to avoid deadlocks */ 3234 if ((err >= 0) && rinfo->head->is_target) { 3235 struct inode *in; 3236 struct ceph_vino tvino = { 3237 .ino = le64_to_cpu(rinfo->targeti.in->ino), 3238 .snap = le64_to_cpu(rinfo->targeti.in->snapid) 3239 }; 3240 3241 in = ceph_get_inode(mdsc->fsc->sb, tvino); 3242 if (IS_ERR(in)) { 3243 err = PTR_ERR(in); 3244 mutex_lock(&session->s_mutex); 3245 goto out_err; 3246 } 3247 req->r_target_inode = in; 3248 } 3249 3250 mutex_lock(&session->s_mutex); 3251 if (err < 0) { 3252 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 3253 ceph_msg_dump(msg); 3254 goto out_err; 3255 } 3256 3257 /* snap trace */ 3258 realm = NULL; 3259 if (rinfo->snapblob_len) { 3260 down_write(&mdsc->snap_rwsem); 3261 ceph_update_snap_trace(mdsc, rinfo->snapblob, 3262 rinfo->snapblob + rinfo->snapblob_len, 3263 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 3264 &realm); 3265 downgrade_write(&mdsc->snap_rwsem); 3266 } else { 3267 down_read(&mdsc->snap_rwsem); 3268 } 3269 3270 /* insert trace into our cache */ 3271 mutex_lock(&req->r_fill_mutex); 3272 current->journal_info = req; 3273 err = ceph_fill_trace(mdsc->fsc->sb, req); 3274 if (err == 0) { 3275 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 3276 req->r_op == CEPH_MDS_OP_LSSNAP)) 3277 ceph_readdir_prepopulate(req, req->r_session); 3278 } 3279 current->journal_info = NULL; 3280 mutex_unlock(&req->r_fill_mutex); 3281 3282 up_read(&mdsc->snap_rwsem); 3283 if (realm) 3284 ceph_put_snap_realm(mdsc, realm); 3285 3286 if (err == 0) { 3287 if (req->r_target_inode && 3288 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3289 struct ceph_inode_info *ci = 3290 ceph_inode(req->r_target_inode); 3291 spin_lock(&ci->i_unsafe_lock); 3292 list_add_tail(&req->r_unsafe_target_item, 3293 &ci->i_unsafe_iops); 3294 spin_unlock(&ci->i_unsafe_lock); 3295 } 3296 3297 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 3298 } 3299 out_err: 3300 mutex_lock(&mdsc->mutex); 3301 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3302 if (err) { 3303 req->r_err = err; 3304 } else { 3305 req->r_reply = ceph_msg_get(msg); 3306 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); 3307 } 3308 } else { 3309 dout("reply arrived after request %lld was aborted\n", tid); 3310 } 3311 mutex_unlock(&mdsc->mutex); 3312 3313 mutex_unlock(&session->s_mutex); 3314 3315 /* kick calling process */ 3316 complete_request(mdsc, req); 3317 3318 ceph_update_metadata_latency(&mdsc->metric, req->r_start_latency, 3319 req->r_end_latency, err); 3320 out: 3321 ceph_mdsc_put_request(req); 3322 return; 3323 } 3324 3325 3326 3327 /* 3328 * handle mds notification that our request has been forwarded. 
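 *
 * The message carries the mds we should retry against (next_mds) and a
 * forward sequence number.  A forward for an already-aborted request
 * just unregisters it, a stale notification (fwd_seq <= r_num_fwd) is
 * ignored, and anything else resets the request and resubmits it to the
 * new mds.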
3329 */ 3330 static void handle_forward(struct ceph_mds_client *mdsc, 3331 struct ceph_mds_session *session, 3332 struct ceph_msg *msg) 3333 { 3334 struct ceph_mds_request *req; 3335 u64 tid = le64_to_cpu(msg->hdr.tid); 3336 u32 next_mds; 3337 u32 fwd_seq; 3338 int err = -EINVAL; 3339 void *p = msg->front.iov_base; 3340 void *end = p + msg->front.iov_len; 3341 3342 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3343 next_mds = ceph_decode_32(&p); 3344 fwd_seq = ceph_decode_32(&p); 3345 3346 mutex_lock(&mdsc->mutex); 3347 req = lookup_get_request(mdsc, tid); 3348 if (!req) { 3349 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 3350 goto out; /* dup reply? */ 3351 } 3352 3353 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3354 dout("forward tid %llu aborted, unregistering\n", tid); 3355 __unregister_request(mdsc, req); 3356 } else if (fwd_seq <= req->r_num_fwd) { 3357 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 3358 tid, next_mds, req->r_num_fwd, fwd_seq); 3359 } else { 3360 /* resend. forward race not possible; mds would drop */ 3361 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 3362 BUG_ON(req->r_err); 3363 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 3364 req->r_attempts = 0; 3365 req->r_num_fwd = fwd_seq; 3366 req->r_resend_mds = next_mds; 3367 put_request_session(req); 3368 __do_request(mdsc, req); 3369 } 3370 ceph_mdsc_put_request(req); 3371 out: 3372 mutex_unlock(&mdsc->mutex); 3373 return; 3374 3375 bad: 3376 pr_err("mdsc_handle_forward decode error err=%d\n", err); 3377 } 3378 3379 static int __decode_session_metadata(void **p, void *end, 3380 bool *blocklisted) 3381 { 3382 /* map<string,string> */ 3383 u32 n; 3384 bool err_str; 3385 ceph_decode_32_safe(p, end, n, bad); 3386 while (n-- > 0) { 3387 u32 len; 3388 ceph_decode_32_safe(p, end, len, bad); 3389 ceph_decode_need(p, end, len, bad); 3390 err_str = !strncmp(*p, "error_string", len); 3391 *p += len; 3392 ceph_decode_32_safe(p, end, len, bad); 3393 ceph_decode_need(p, end, len, bad); 3394 /* 3395 * Match "blocklisted (blacklisted)" from newer MDSes, 3396 * or "blacklisted" from older MDSes. 
3397 */ 3398 if (err_str && strnstr(*p, "blacklisted", len)) 3399 *blocklisted = true; 3400 *p += len; 3401 } 3402 return 0; 3403 bad: 3404 return -1; 3405 } 3406 3407 /* 3408 * handle a mds session control message 3409 */ 3410 static void handle_session(struct ceph_mds_session *session, 3411 struct ceph_msg *msg) 3412 { 3413 struct ceph_mds_client *mdsc = session->s_mdsc; 3414 int mds = session->s_mds; 3415 int msg_version = le16_to_cpu(msg->hdr.version); 3416 void *p = msg->front.iov_base; 3417 void *end = p + msg->front.iov_len; 3418 struct ceph_mds_session_head *h; 3419 u32 op; 3420 u64 seq, features = 0; 3421 int wake = 0; 3422 bool blocklisted = false; 3423 3424 /* decode */ 3425 ceph_decode_need(&p, end, sizeof(*h), bad); 3426 h = p; 3427 p += sizeof(*h); 3428 3429 op = le32_to_cpu(h->op); 3430 seq = le64_to_cpu(h->seq); 3431 3432 if (msg_version >= 3) { 3433 u32 len; 3434 /* version >= 2, metadata */ 3435 if (__decode_session_metadata(&p, end, &blocklisted) < 0) 3436 goto bad; 3437 /* version >= 3, feature bits */ 3438 ceph_decode_32_safe(&p, end, len, bad); 3439 if (len) { 3440 ceph_decode_64_safe(&p, end, features, bad); 3441 p += len - sizeof(features); 3442 } 3443 } 3444 3445 mutex_lock(&mdsc->mutex); 3446 if (op == CEPH_SESSION_CLOSE) { 3447 ceph_get_mds_session(session); 3448 __unregister_session(mdsc, session); 3449 } 3450 /* FIXME: this ttl calculation is generous */ 3451 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 3452 mutex_unlock(&mdsc->mutex); 3453 3454 mutex_lock(&session->s_mutex); 3455 3456 dout("handle_session mds%d %s %p state %s seq %llu\n", 3457 mds, ceph_session_op_name(op), session, 3458 ceph_session_state_name(session->s_state), seq); 3459 3460 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 3461 session->s_state = CEPH_MDS_SESSION_OPEN; 3462 pr_info("mds%d came back\n", session->s_mds); 3463 } 3464 3465 switch (op) { 3466 case CEPH_SESSION_OPEN: 3467 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3468 pr_info("mds%d reconnect success\n", session->s_mds); 3469 session->s_state = CEPH_MDS_SESSION_OPEN; 3470 session->s_features = features; 3471 renewed_caps(mdsc, session, 0); 3472 if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features)) 3473 metric_schedule_delayed(&mdsc->metric); 3474 wake = 1; 3475 if (mdsc->stopping) 3476 __close_session(mdsc, session); 3477 break; 3478 3479 case CEPH_SESSION_RENEWCAPS: 3480 if (session->s_renew_seq == seq) 3481 renewed_caps(mdsc, session, 1); 3482 break; 3483 3484 case CEPH_SESSION_CLOSE: 3485 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3486 pr_info("mds%d reconnect denied\n", session->s_mds); 3487 session->s_state = CEPH_MDS_SESSION_CLOSED; 3488 cleanup_session_requests(mdsc, session); 3489 remove_session_caps(session); 3490 wake = 2; /* for good measure */ 3491 wake_up_all(&mdsc->session_close_wq); 3492 break; 3493 3494 case CEPH_SESSION_STALE: 3495 pr_info("mds%d caps went stale, renewing\n", 3496 session->s_mds); 3497 spin_lock(&session->s_gen_ttl_lock); 3498 session->s_cap_gen++; 3499 session->s_cap_ttl = jiffies - 1; 3500 spin_unlock(&session->s_gen_ttl_lock); 3501 send_renew_caps(mdsc, session); 3502 break; 3503 3504 case CEPH_SESSION_RECALL_STATE: 3505 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 3506 break; 3507 3508 case CEPH_SESSION_FLUSHMSG: 3509 send_flushmsg_ack(mdsc, session, seq); 3510 break; 3511 3512 case CEPH_SESSION_FORCE_RO: 3513 dout("force_session_readonly %p\n", session); 3514 spin_lock(&session->s_cap_lock); 3515 session->s_readonly = 
true; 3516 spin_unlock(&session->s_cap_lock); 3517 wake_up_session_caps(session, FORCE_RO); 3518 break; 3519 3520 case CEPH_SESSION_REJECT: 3521 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 3522 pr_info("mds%d rejected session\n", session->s_mds); 3523 session->s_state = CEPH_MDS_SESSION_REJECTED; 3524 cleanup_session_requests(mdsc, session); 3525 remove_session_caps(session); 3526 if (blocklisted) 3527 mdsc->fsc->blocklisted = true; 3528 wake = 2; /* for good measure */ 3529 break; 3530 3531 default: 3532 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 3533 WARN_ON(1); 3534 } 3535 3536 mutex_unlock(&session->s_mutex); 3537 if (wake) { 3538 mutex_lock(&mdsc->mutex); 3539 __wake_requests(mdsc, &session->s_waiting); 3540 if (wake == 2) 3541 kick_requests(mdsc, mds); 3542 mutex_unlock(&mdsc->mutex); 3543 } 3544 if (op == CEPH_SESSION_CLOSE) 3545 ceph_put_mds_session(session); 3546 return; 3547 3548 bad: 3549 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 3550 (int)msg->front.iov_len); 3551 ceph_msg_dump(msg); 3552 return; 3553 } 3554 3555 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 3556 { 3557 int dcaps; 3558 3559 dcaps = xchg(&req->r_dir_caps, 0); 3560 if (dcaps) { 3561 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3562 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 3563 } 3564 } 3565 3566 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req) 3567 { 3568 int dcaps; 3569 3570 dcaps = xchg(&req->r_dir_caps, 0); 3571 if (dcaps) { 3572 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3573 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent), 3574 dcaps); 3575 } 3576 } 3577 3578 /* 3579 * called under session->mutex. 3580 */ 3581 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 3582 struct ceph_mds_session *session) 3583 { 3584 struct ceph_mds_request *req, *nreq; 3585 struct rb_node *p; 3586 3587 dout("replay_unsafe_requests mds%d\n", session->s_mds); 3588 3589 mutex_lock(&mdsc->mutex); 3590 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) 3591 __send_request(session, req, true); 3592 3593 /* 3594 * also re-send old requests when MDS enters reconnect stage. So that MDS 3595 * can process completed request in clientreplay stage. 
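 * Only requests that have been sent at least once (r_attempts > 0),
 * that have not already received an unsafe reply, and that belong to
 * this session are replayed by the loop below; requests that were never
 * sent are left for the normal wake-up path.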
3596 */ 3597 p = rb_first(&mdsc->request_tree); 3598 while (p) { 3599 req = rb_entry(p, struct ceph_mds_request, r_node); 3600 p = rb_next(p); 3601 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3602 continue; 3603 if (req->r_attempts == 0) 3604 continue; /* only old requests */ 3605 if (!req->r_session) 3606 continue; 3607 if (req->r_session->s_mds != session->s_mds) 3608 continue; 3609 3610 ceph_mdsc_release_dir_caps_no_check(req); 3611 3612 __send_request(session, req, true); 3613 } 3614 mutex_unlock(&mdsc->mutex); 3615 } 3616 3617 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state) 3618 { 3619 struct ceph_msg *reply; 3620 struct ceph_pagelist *_pagelist; 3621 struct page *page; 3622 __le32 *addr; 3623 int err = -ENOMEM; 3624 3625 if (!recon_state->allow_multi) 3626 return -ENOSPC; 3627 3628 /* can't handle message that contains both caps and realm */ 3629 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms); 3630 3631 /* pre-allocate new pagelist */ 3632 _pagelist = ceph_pagelist_alloc(GFP_NOFS); 3633 if (!_pagelist) 3634 return -ENOMEM; 3635 3636 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3637 if (!reply) 3638 goto fail_msg; 3639 3640 /* placeholder for nr_caps */ 3641 err = ceph_pagelist_encode_32(_pagelist, 0); 3642 if (err < 0) 3643 goto fail; 3644 3645 if (recon_state->nr_caps) { 3646 /* currently encoding caps */ 3647 err = ceph_pagelist_encode_32(recon_state->pagelist, 0); 3648 if (err) 3649 goto fail; 3650 } else { 3651 /* placeholder for nr_realms (currently encoding relams) */ 3652 err = ceph_pagelist_encode_32(_pagelist, 0); 3653 if (err < 0) 3654 goto fail; 3655 } 3656 3657 err = ceph_pagelist_encode_8(recon_state->pagelist, 1); 3658 if (err) 3659 goto fail; 3660 3661 page = list_first_entry(&recon_state->pagelist->head, struct page, lru); 3662 addr = kmap_atomic(page); 3663 if (recon_state->nr_caps) { 3664 /* currently encoding caps */ 3665 *addr = cpu_to_le32(recon_state->nr_caps); 3666 } else { 3667 /* currently encoding relams */ 3668 *(addr + 1) = cpu_to_le32(recon_state->nr_realms); 3669 } 3670 kunmap_atomic(addr); 3671 3672 reply->hdr.version = cpu_to_le16(5); 3673 reply->hdr.compat_version = cpu_to_le16(4); 3674 3675 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length); 3676 ceph_msg_data_add_pagelist(reply, recon_state->pagelist); 3677 3678 ceph_con_send(&recon_state->session->s_con, reply); 3679 ceph_pagelist_release(recon_state->pagelist); 3680 3681 recon_state->pagelist = _pagelist; 3682 recon_state->nr_caps = 0; 3683 recon_state->nr_realms = 0; 3684 recon_state->msg_version = 5; 3685 return 0; 3686 fail: 3687 ceph_msg_put(reply); 3688 fail_msg: 3689 ceph_pagelist_release(_pagelist); 3690 return err; 3691 } 3692 3693 static struct dentry* d_find_primary(struct inode *inode) 3694 { 3695 struct dentry *alias, *dn = NULL; 3696 3697 if (hlist_empty(&inode->i_dentry)) 3698 return NULL; 3699 3700 spin_lock(&inode->i_lock); 3701 if (hlist_empty(&inode->i_dentry)) 3702 goto out_unlock; 3703 3704 if (S_ISDIR(inode->i_mode)) { 3705 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias); 3706 if (!IS_ROOT(alias)) 3707 dn = dget(alias); 3708 goto out_unlock; 3709 } 3710 3711 hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) { 3712 spin_lock(&alias->d_lock); 3713 if (!d_unhashed(alias) && 3714 (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) { 3715 dn = dget_dlock(alias); 3716 } 3717 spin_unlock(&alias->d_lock); 3718 if (dn) 3719 break; 3720 } 3721 out_unlock: 3722 
spin_unlock(&inode->i_lock); 3723 return dn; 3724 } 3725 3726 /* 3727 * Encode information about a cap for a reconnect with the MDS. 3728 */ 3729 static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap, 3730 void *arg) 3731 { 3732 union { 3733 struct ceph_mds_cap_reconnect v2; 3734 struct ceph_mds_cap_reconnect_v1 v1; 3735 } rec; 3736 struct ceph_inode_info *ci = cap->ci; 3737 struct ceph_reconnect_state *recon_state = arg; 3738 struct ceph_pagelist *pagelist = recon_state->pagelist; 3739 struct dentry *dentry; 3740 char *path; 3741 int pathlen, err; 3742 u64 pathbase; 3743 u64 snap_follows; 3744 3745 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 3746 inode, ceph_vinop(inode), cap, cap->cap_id, 3747 ceph_cap_string(cap->issued)); 3748 3749 dentry = d_find_primary(inode); 3750 if (dentry) { 3751 /* set pathbase to parent dir when msg_version >= 2 */ 3752 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 3753 recon_state->msg_version >= 2); 3754 dput(dentry); 3755 if (IS_ERR(path)) { 3756 err = PTR_ERR(path); 3757 goto out_err; 3758 } 3759 } else { 3760 path = NULL; 3761 pathlen = 0; 3762 pathbase = 0; 3763 } 3764 3765 spin_lock(&ci->i_ceph_lock); 3766 cap->seq = 0; /* reset cap seq */ 3767 cap->issue_seq = 0; /* and issue_seq */ 3768 cap->mseq = 0; /* and migrate_seq */ 3769 cap->cap_gen = cap->session->s_cap_gen; 3770 3771 /* These are lost when the session goes away */ 3772 if (S_ISDIR(inode->i_mode)) { 3773 if (cap->issued & CEPH_CAP_DIR_CREATE) { 3774 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); 3775 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); 3776 } 3777 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; 3778 } 3779 3780 if (recon_state->msg_version >= 2) { 3781 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 3782 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3783 rec.v2.issued = cpu_to_le32(cap->issued); 3784 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3785 rec.v2.pathbase = cpu_to_le64(pathbase); 3786 rec.v2.flock_len = (__force __le32) 3787 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); 3788 } else { 3789 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 3790 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3791 rec.v1.issued = cpu_to_le32(cap->issued); 3792 rec.v1.size = cpu_to_le64(inode->i_size); 3793 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime); 3794 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime); 3795 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3796 rec.v1.pathbase = cpu_to_le64(pathbase); 3797 } 3798 3799 if (list_empty(&ci->i_cap_snaps)) { 3800 snap_follows = ci->i_head_snapc ? 
ci->i_head_snapc->seq : 0; 3801 } else { 3802 struct ceph_cap_snap *capsnap = 3803 list_first_entry(&ci->i_cap_snaps, 3804 struct ceph_cap_snap, ci_item); 3805 snap_follows = capsnap->follows; 3806 } 3807 spin_unlock(&ci->i_ceph_lock); 3808 3809 if (recon_state->msg_version >= 2) { 3810 int num_fcntl_locks, num_flock_locks; 3811 struct ceph_filelock *flocks = NULL; 3812 size_t struct_len, total_len = sizeof(u64); 3813 u8 struct_v = 0; 3814 3815 encode_again: 3816 if (rec.v2.flock_len) { 3817 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 3818 } else { 3819 num_fcntl_locks = 0; 3820 num_flock_locks = 0; 3821 } 3822 if (num_fcntl_locks + num_flock_locks > 0) { 3823 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks, 3824 sizeof(struct ceph_filelock), 3825 GFP_NOFS); 3826 if (!flocks) { 3827 err = -ENOMEM; 3828 goto out_err; 3829 } 3830 err = ceph_encode_locks_to_buffer(inode, flocks, 3831 num_fcntl_locks, 3832 num_flock_locks); 3833 if (err) { 3834 kfree(flocks); 3835 flocks = NULL; 3836 if (err == -ENOSPC) 3837 goto encode_again; 3838 goto out_err; 3839 } 3840 } else { 3841 kfree(flocks); 3842 flocks = NULL; 3843 } 3844 3845 if (recon_state->msg_version >= 3) { 3846 /* version, compat_version and struct_len */ 3847 total_len += 2 * sizeof(u8) + sizeof(u32); 3848 struct_v = 2; 3849 } 3850 /* 3851 * number of encoded locks is stable, so copy to pagelist 3852 */ 3853 struct_len = 2 * sizeof(u32) + 3854 (num_fcntl_locks + num_flock_locks) * 3855 sizeof(struct ceph_filelock); 3856 rec.v2.flock_len = cpu_to_le32(struct_len); 3857 3858 struct_len += sizeof(u32) + pathlen + sizeof(rec.v2); 3859 3860 if (struct_v >= 2) 3861 struct_len += sizeof(u64); /* snap_follows */ 3862 3863 total_len += struct_len; 3864 3865 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 3866 err = send_reconnect_partial(recon_state); 3867 if (err) 3868 goto out_freeflocks; 3869 pagelist = recon_state->pagelist; 3870 } 3871 3872 err = ceph_pagelist_reserve(pagelist, total_len); 3873 if (err) 3874 goto out_freeflocks; 3875 3876 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3877 if (recon_state->msg_version >= 3) { 3878 ceph_pagelist_encode_8(pagelist, struct_v); 3879 ceph_pagelist_encode_8(pagelist, 1); 3880 ceph_pagelist_encode_32(pagelist, struct_len); 3881 } 3882 ceph_pagelist_encode_string(pagelist, path, pathlen); 3883 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 3884 ceph_locks_to_pagelist(flocks, pagelist, 3885 num_fcntl_locks, num_flock_locks); 3886 if (struct_v >= 2) 3887 ceph_pagelist_encode_64(pagelist, snap_follows); 3888 out_freeflocks: 3889 kfree(flocks); 3890 } else { 3891 err = ceph_pagelist_reserve(pagelist, 3892 sizeof(u64) + sizeof(u32) + 3893 pathlen + sizeof(rec.v1)); 3894 if (err) 3895 goto out_err; 3896 3897 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3898 ceph_pagelist_encode_string(pagelist, path, pathlen); 3899 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 3900 } 3901 3902 out_err: 3903 ceph_mdsc_free_path(path, pathlen); 3904 if (!err) 3905 recon_state->nr_caps++; 3906 return err; 3907 } 3908 3909 static int encode_snap_realms(struct ceph_mds_client *mdsc, 3910 struct ceph_reconnect_state *recon_state) 3911 { 3912 struct rb_node *p; 3913 struct ceph_pagelist *pagelist = recon_state->pagelist; 3914 int err = 0; 3915 3916 if (recon_state->msg_version >= 4) { 3917 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); 3918 if (err < 0) 3919 goto fail; 3920 } 3921 3922 /* 3923 * snaprealms. 
we provide mds with the ino, seq (version), and 3924 * parent for all of our realms. If the mds has any newer info, 3925 * it will tell us. 3926 */ 3927 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 3928 struct ceph_snap_realm *realm = 3929 rb_entry(p, struct ceph_snap_realm, node); 3930 struct ceph_mds_snaprealm_reconnect sr_rec; 3931 3932 if (recon_state->msg_version >= 4) { 3933 size_t need = sizeof(u8) * 2 + sizeof(u32) + 3934 sizeof(sr_rec); 3935 3936 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 3937 err = send_reconnect_partial(recon_state); 3938 if (err) 3939 goto fail; 3940 pagelist = recon_state->pagelist; 3941 } 3942 3943 err = ceph_pagelist_reserve(pagelist, need); 3944 if (err) 3945 goto fail; 3946 3947 ceph_pagelist_encode_8(pagelist, 1); 3948 ceph_pagelist_encode_8(pagelist, 1); 3949 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 3950 } 3951 3952 dout(" adding snap realm %llx seq %lld parent %llx\n", 3953 realm->ino, realm->seq, realm->parent_ino); 3954 sr_rec.ino = cpu_to_le64(realm->ino); 3955 sr_rec.seq = cpu_to_le64(realm->seq); 3956 sr_rec.parent = cpu_to_le64(realm->parent_ino); 3957 3958 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 3959 if (err) 3960 goto fail; 3961 3962 recon_state->nr_realms++; 3963 } 3964 fail: 3965 return err; 3966 } 3967 3968 3969 /* 3970 * If an MDS fails and recovers, clients need to reconnect in order to 3971 * reestablish shared state. This includes all caps issued through 3972 * this session _and_ the snap_realm hierarchy. Because it's not 3973 * clear which snap realms the mds cares about, we send everything we 3974 * know about.. that ensures we'll then get any new info the 3975 * recovering MDS might have. 3976 * 3977 * This is a relatively heavyweight operation, but it's rare. 3978 */ 3979 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 3980 struct ceph_mds_session *session) 3981 { 3982 struct ceph_msg *reply; 3983 int mds = session->s_mds; 3984 int err = -ENOMEM; 3985 struct ceph_reconnect_state recon_state = { 3986 .session = session, 3987 }; 3988 LIST_HEAD(dispose); 3989 3990 pr_info("mds%d reconnect start\n", mds); 3991 3992 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 3993 if (!recon_state.pagelist) 3994 goto fail_nopagelist; 3995 3996 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3997 if (!reply) 3998 goto fail_nomsg; 3999 4000 xa_destroy(&session->s_delegated_inos); 4001 4002 mutex_lock(&session->s_mutex); 4003 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 4004 session->s_seq = 0; 4005 4006 dout("session %p state %s\n", session, 4007 ceph_session_state_name(session->s_state)); 4008 4009 spin_lock(&session->s_gen_ttl_lock); 4010 session->s_cap_gen++; 4011 spin_unlock(&session->s_gen_ttl_lock); 4012 4013 spin_lock(&session->s_cap_lock); 4014 /* don't know if session is readonly */ 4015 session->s_readonly = 0; 4016 /* 4017 * notify __ceph_remove_cap() that we are composing cap reconnect. 4018 * If a cap get released before being added to the cap reconnect, 4019 * __ceph_remove_cap() should skip queuing cap release. 
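* The flag is cleared again once ceph_iterate_session_caps() below has
* finished walking this session's caps.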
4020 */
4021 session->s_cap_reconnect = 1;
4022 /* drop old cap expires; we're about to reestablish that state */
4023 detach_cap_releases(session, &dispose);
4024 spin_unlock(&session->s_cap_lock);
4025 dispose_cap_releases(mdsc, &dispose);
4026
4027 /* trim unused caps to reduce MDS's cache rejoin time */
4028 if (mdsc->fsc->sb->s_root)
4029 shrink_dcache_parent(mdsc->fsc->sb->s_root);
4030
4031 ceph_con_close(&session->s_con);
4032 ceph_con_open(&session->s_con,
4033 CEPH_ENTITY_TYPE_MDS, mds,
4034 ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
4035
4036 /* replay unsafe requests */
4037 replay_unsafe_requests(mdsc, session);
4038
4039 ceph_early_kick_flushing_caps(mdsc, session);
4040
4041 down_read(&mdsc->snap_rwsem);
4042
4043 /* placeholder for nr_caps */
4044 err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
4045 if (err)
4046 goto fail;
4047
4048 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
4049 recon_state.msg_version = 3;
4050 recon_state.allow_multi = true;
4051 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
4052 recon_state.msg_version = 3;
4053 } else {
4054 recon_state.msg_version = 2;
4055 }
4056 /* traverse this session's caps */
4057 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
4058
4059 spin_lock(&session->s_cap_lock);
4060 session->s_cap_reconnect = 0;
4061 spin_unlock(&session->s_cap_lock);
4062
4063 if (err < 0)
4064 goto fail;
4065
4066 /* check if all realms can be encoded into current message */
4067 if (mdsc->num_snap_realms) {
4068 size_t total_len =
4069 recon_state.pagelist->length +
4070 mdsc->num_snap_realms *
4071 sizeof(struct ceph_mds_snaprealm_reconnect);
4072 if (recon_state.msg_version >= 4) {
4073 /* number of realms */
4074 total_len += sizeof(u32);
4075 /* version, compat_version and struct_len */
4076 total_len += mdsc->num_snap_realms *
4077 (2 * sizeof(u8) + sizeof(u32));
4078 }
4079 if (total_len > RECONNECT_MAX_SIZE) {
4080 if (!recon_state.allow_multi) {
4081 err = -ENOSPC;
4082 goto fail;
4083 }
4084 if (recon_state.nr_caps) {
4085 err = send_reconnect_partial(&recon_state);
4086 if (err)
4087 goto fail;
4088 }
4089 recon_state.msg_version = 5;
4090 }
4091 }
4092
4093 err = encode_snap_realms(mdsc, &recon_state);
4094 if (err < 0)
4095 goto fail;
4096
4097 if (recon_state.msg_version >= 5) {
4098 err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
4099 if (err < 0)
4100 goto fail;
4101 }
4102
4103 if (recon_state.nr_caps || recon_state.nr_realms) {
4104 struct page *page =
4105 list_first_entry(&recon_state.pagelist->head,
4106 struct page, lru);
4107 __le32 *addr = kmap_atomic(page);
4108 if (recon_state.nr_caps) {
4109 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
4110 *addr = cpu_to_le32(recon_state.nr_caps);
4111 } else if (recon_state.msg_version >= 4) {
4112 *(addr + 1) = cpu_to_le32(recon_state.nr_realms);
4113 }
4114 kunmap_atomic(addr);
4115 }
4116
4117 reply->hdr.version = cpu_to_le16(recon_state.msg_version);
4118 if (recon_state.msg_version >= 4)
4119 reply->hdr.compat_version = cpu_to_le16(4);
4120
4121 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
4122 ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
4123
4124 ceph_con_send(&session->s_con, reply);
4125
4126 mutex_unlock(&session->s_mutex);
4127
4128 mutex_lock(&mdsc->mutex);
4129 __wake_requests(mdsc, &session->s_waiting);
4130 mutex_unlock(&mdsc->mutex);
4131
4132 up_read(&mdsc->snap_rwsem);
4133 ceph_pagelist_release(recon_state.pagelist);
4134 return;
4135
4136
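/* error paths: drop the reply and the locks taken above, then release the pagelist */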
fail: 4137 ceph_msg_put(reply); 4138 up_read(&mdsc->snap_rwsem); 4139 mutex_unlock(&session->s_mutex); 4140 fail_nomsg: 4141 ceph_pagelist_release(recon_state.pagelist); 4142 fail_nopagelist: 4143 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 4144 return; 4145 } 4146 4147 4148 /* 4149 * compare old and new mdsmaps, kicking requests 4150 * and closing out old connections as necessary 4151 * 4152 * called under mdsc->mutex. 4153 */ 4154 static void check_new_map(struct ceph_mds_client *mdsc, 4155 struct ceph_mdsmap *newmap, 4156 struct ceph_mdsmap *oldmap) 4157 { 4158 int i; 4159 int oldstate, newstate; 4160 struct ceph_mds_session *s; 4161 4162 dout("check_new_map new %u old %u\n", 4163 newmap->m_epoch, oldmap->m_epoch); 4164 4165 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4166 if (!mdsc->sessions[i]) 4167 continue; 4168 s = mdsc->sessions[i]; 4169 oldstate = ceph_mdsmap_get_state(oldmap, i); 4170 newstate = ceph_mdsmap_get_state(newmap, i); 4171 4172 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 4173 i, ceph_mds_state_name(oldstate), 4174 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 4175 ceph_mds_state_name(newstate), 4176 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 4177 ceph_session_state_name(s->s_state)); 4178 4179 if (i >= newmap->possible_max_rank) { 4180 /* force close session for stopped mds */ 4181 ceph_get_mds_session(s); 4182 __unregister_session(mdsc, s); 4183 __wake_requests(mdsc, &s->s_waiting); 4184 mutex_unlock(&mdsc->mutex); 4185 4186 mutex_lock(&s->s_mutex); 4187 cleanup_session_requests(mdsc, s); 4188 remove_session_caps(s); 4189 mutex_unlock(&s->s_mutex); 4190 4191 ceph_put_mds_session(s); 4192 4193 mutex_lock(&mdsc->mutex); 4194 kick_requests(mdsc, i); 4195 continue; 4196 } 4197 4198 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 4199 ceph_mdsmap_get_addr(newmap, i), 4200 sizeof(struct ceph_entity_addr))) { 4201 /* just close it */ 4202 mutex_unlock(&mdsc->mutex); 4203 mutex_lock(&s->s_mutex); 4204 mutex_lock(&mdsc->mutex); 4205 ceph_con_close(&s->s_con); 4206 mutex_unlock(&s->s_mutex); 4207 s->s_state = CEPH_MDS_SESSION_RESTARTING; 4208 } else if (oldstate == newstate) { 4209 continue; /* nothing new with this mds */ 4210 } 4211 4212 /* 4213 * send reconnect? 4214 */ 4215 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 4216 newstate >= CEPH_MDS_STATE_RECONNECT) { 4217 mutex_unlock(&mdsc->mutex); 4218 send_mds_reconnect(mdsc, s); 4219 mutex_lock(&mdsc->mutex); 4220 } 4221 4222 /* 4223 * kick request on any mds that has gone active. 
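* Also kick this session's flushing caps and wake any cap waiters blocked
* on the reconnect (see ceph_kick_flushing_caps() and
* wake_up_session_caps() below).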
4224 */ 4225 if (oldstate < CEPH_MDS_STATE_ACTIVE && 4226 newstate >= CEPH_MDS_STATE_ACTIVE) { 4227 if (oldstate != CEPH_MDS_STATE_CREATING && 4228 oldstate != CEPH_MDS_STATE_STARTING) 4229 pr_info("mds%d recovery completed\n", s->s_mds); 4230 kick_requests(mdsc, i); 4231 mutex_unlock(&mdsc->mutex); 4232 mutex_lock(&s->s_mutex); 4233 mutex_lock(&mdsc->mutex); 4234 ceph_kick_flushing_caps(mdsc, s); 4235 mutex_unlock(&s->s_mutex); 4236 wake_up_session_caps(s, RECONNECT); 4237 } 4238 } 4239 4240 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4241 s = mdsc->sessions[i]; 4242 if (!s) 4243 continue; 4244 if (!ceph_mdsmap_is_laggy(newmap, i)) 4245 continue; 4246 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4247 s->s_state == CEPH_MDS_SESSION_HUNG || 4248 s->s_state == CEPH_MDS_SESSION_CLOSING) { 4249 dout(" connecting to export targets of laggy mds%d\n", 4250 i); 4251 __open_export_target_sessions(mdsc, s); 4252 } 4253 } 4254 } 4255 4256 4257 4258 /* 4259 * leases 4260 */ 4261 4262 /* 4263 * caller must hold session s_mutex, dentry->d_lock 4264 */ 4265 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 4266 { 4267 struct ceph_dentry_info *di = ceph_dentry(dentry); 4268 4269 ceph_put_mds_session(di->lease_session); 4270 di->lease_session = NULL; 4271 } 4272 4273 static void handle_lease(struct ceph_mds_client *mdsc, 4274 struct ceph_mds_session *session, 4275 struct ceph_msg *msg) 4276 { 4277 struct super_block *sb = mdsc->fsc->sb; 4278 struct inode *inode; 4279 struct dentry *parent, *dentry; 4280 struct ceph_dentry_info *di; 4281 int mds = session->s_mds; 4282 struct ceph_mds_lease *h = msg->front.iov_base; 4283 u32 seq; 4284 struct ceph_vino vino; 4285 struct qstr dname; 4286 int release = 0; 4287 4288 dout("handle_lease from mds%d\n", mds); 4289 4290 /* decode */ 4291 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 4292 goto bad; 4293 vino.ino = le64_to_cpu(h->ino); 4294 vino.snap = CEPH_NOSNAP; 4295 seq = le32_to_cpu(h->seq); 4296 dname.len = get_unaligned_le32(h + 1); 4297 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 4298 goto bad; 4299 dname.name = (void *)(h + 1) + sizeof(u32); 4300 4301 /* lookup inode */ 4302 inode = ceph_find_inode(sb, vino); 4303 dout("handle_lease %s, ino %llx %p %.*s\n", 4304 ceph_lease_op_name(h->action), vino.ino, inode, 4305 dname.len, dname.name); 4306 4307 mutex_lock(&session->s_mutex); 4308 inc_session_sequence(session); 4309 4310 if (!inode) { 4311 dout("handle_lease no inode %llx\n", vino.ino); 4312 goto release; 4313 } 4314 4315 /* dentry */ 4316 parent = d_find_alias(inode); 4317 if (!parent) { 4318 dout("no parent dentry on inode %p\n", inode); 4319 WARN_ON(1); 4320 goto release; /* hrm... 
*/ 4321 } 4322 dname.hash = full_name_hash(parent, dname.name, dname.len); 4323 dentry = d_lookup(parent, &dname); 4324 dput(parent); 4325 if (!dentry) 4326 goto release; 4327 4328 spin_lock(&dentry->d_lock); 4329 di = ceph_dentry(dentry); 4330 switch (h->action) { 4331 case CEPH_MDS_LEASE_REVOKE: 4332 if (di->lease_session == session) { 4333 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 4334 h->seq = cpu_to_le32(di->lease_seq); 4335 __ceph_mdsc_drop_dentry_lease(dentry); 4336 } 4337 release = 1; 4338 break; 4339 4340 case CEPH_MDS_LEASE_RENEW: 4341 if (di->lease_session == session && 4342 di->lease_gen == session->s_cap_gen && 4343 di->lease_renew_from && 4344 di->lease_renew_after == 0) { 4345 unsigned long duration = 4346 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 4347 4348 di->lease_seq = seq; 4349 di->time = di->lease_renew_from + duration; 4350 di->lease_renew_after = di->lease_renew_from + 4351 (duration >> 1); 4352 di->lease_renew_from = 0; 4353 } 4354 break; 4355 } 4356 spin_unlock(&dentry->d_lock); 4357 dput(dentry); 4358 4359 if (!release) 4360 goto out; 4361 4362 release: 4363 /* let's just reuse the same message */ 4364 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 4365 ceph_msg_get(msg); 4366 ceph_con_send(&session->s_con, msg); 4367 4368 out: 4369 mutex_unlock(&session->s_mutex); 4370 /* avoid calling iput_final() in mds dispatch threads */ 4371 ceph_async_iput(inode); 4372 return; 4373 4374 bad: 4375 pr_err("corrupt lease message\n"); 4376 ceph_msg_dump(msg); 4377 } 4378 4379 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 4380 struct dentry *dentry, char action, 4381 u32 seq) 4382 { 4383 struct ceph_msg *msg; 4384 struct ceph_mds_lease *lease; 4385 struct inode *dir; 4386 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; 4387 4388 dout("lease_send_msg identry %p %s to mds%d\n", 4389 dentry, ceph_lease_op_name(action), session->s_mds); 4390 4391 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 4392 if (!msg) 4393 return; 4394 lease = msg->front.iov_base; 4395 lease->action = action; 4396 lease->seq = cpu_to_le32(seq); 4397 4398 spin_lock(&dentry->d_lock); 4399 dir = d_inode(dentry->d_parent); 4400 lease->ino = cpu_to_le64(ceph_ino(dir)); 4401 lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); 4402 4403 put_unaligned_le32(dentry->d_name.len, lease + 1); 4404 memcpy((void *)(lease + 1) + 4, 4405 dentry->d_name.name, dentry->d_name.len); 4406 spin_unlock(&dentry->d_lock); 4407 /* 4408 * if this is a preemptive lease RELEASE, no need to 4409 * flush request stream, since the actual request will 4410 * soon follow. 
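* Setting more_to_follow below lets the messenger hold off flushing this
* message out to the socket.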
4411 */ 4412 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE); 4413 4414 ceph_con_send(&session->s_con, msg); 4415 } 4416 4417 /* 4418 * lock unlock sessions, to wait ongoing session activities 4419 */ 4420 static void lock_unlock_sessions(struct ceph_mds_client *mdsc) 4421 { 4422 int i; 4423 4424 mutex_lock(&mdsc->mutex); 4425 for (i = 0; i < mdsc->max_sessions; i++) { 4426 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4427 if (!s) 4428 continue; 4429 mutex_unlock(&mdsc->mutex); 4430 mutex_lock(&s->s_mutex); 4431 mutex_unlock(&s->s_mutex); 4432 ceph_put_mds_session(s); 4433 mutex_lock(&mdsc->mutex); 4434 } 4435 mutex_unlock(&mdsc->mutex); 4436 } 4437 4438 static void maybe_recover_session(struct ceph_mds_client *mdsc) 4439 { 4440 struct ceph_fs_client *fsc = mdsc->fsc; 4441 4442 if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) 4443 return; 4444 4445 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED) 4446 return; 4447 4448 if (!READ_ONCE(fsc->blocklisted)) 4449 return; 4450 4451 pr_info("auto reconnect after blocklisted\n"); 4452 ceph_force_reconnect(fsc->sb); 4453 } 4454 4455 bool check_session_state(struct ceph_mds_session *s) 4456 { 4457 switch (s->s_state) { 4458 case CEPH_MDS_SESSION_OPEN: 4459 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 4460 s->s_state = CEPH_MDS_SESSION_HUNG; 4461 pr_info("mds%d hung\n", s->s_mds); 4462 } 4463 break; 4464 case CEPH_MDS_SESSION_CLOSING: 4465 /* Should never reach this when we're unmounting */ 4466 WARN_ON_ONCE(true); 4467 fallthrough; 4468 case CEPH_MDS_SESSION_NEW: 4469 case CEPH_MDS_SESSION_RESTARTING: 4470 case CEPH_MDS_SESSION_CLOSED: 4471 case CEPH_MDS_SESSION_REJECTED: 4472 return false; 4473 } 4474 4475 return true; 4476 } 4477 4478 /* 4479 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply, 4480 * then we need to retransmit that request. 
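* Hence inc_session_sequence() below resends the close request when the
* session is still in the CLOSING state.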
4481 */ 4482 void inc_session_sequence(struct ceph_mds_session *s) 4483 { 4484 lockdep_assert_held(&s->s_mutex); 4485 4486 s->s_seq++; 4487 4488 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 4489 int ret; 4490 4491 dout("resending session close request for mds%d\n", s->s_mds); 4492 ret = request_close_session(s); 4493 if (ret < 0) 4494 pr_err("unable to close session to mds%d: %d\n", 4495 s->s_mds, ret); 4496 } 4497 } 4498 4499 /* 4500 * delayed work -- periodically trim expired leases, renew caps with mds 4501 */ 4502 static void schedule_delayed(struct ceph_mds_client *mdsc) 4503 { 4504 int delay = 5; 4505 unsigned hz = round_jiffies_relative(HZ * delay); 4506 schedule_delayed_work(&mdsc->delayed_work, hz); 4507 } 4508 4509 static void delayed_work(struct work_struct *work) 4510 { 4511 int i; 4512 struct ceph_mds_client *mdsc = 4513 container_of(work, struct ceph_mds_client, delayed_work.work); 4514 int renew_interval; 4515 int renew_caps; 4516 4517 dout("mdsc delayed_work\n"); 4518 4519 if (mdsc->stopping) 4520 return; 4521 4522 mutex_lock(&mdsc->mutex); 4523 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 4524 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 4525 mdsc->last_renew_caps); 4526 if (renew_caps) 4527 mdsc->last_renew_caps = jiffies; 4528 4529 for (i = 0; i < mdsc->max_sessions; i++) { 4530 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4531 if (!s) 4532 continue; 4533 4534 if (!check_session_state(s)) { 4535 ceph_put_mds_session(s); 4536 continue; 4537 } 4538 mutex_unlock(&mdsc->mutex); 4539 4540 mutex_lock(&s->s_mutex); 4541 if (renew_caps) 4542 send_renew_caps(mdsc, s); 4543 else 4544 ceph_con_keepalive(&s->s_con); 4545 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4546 s->s_state == CEPH_MDS_SESSION_HUNG) 4547 ceph_send_cap_releases(mdsc, s); 4548 mutex_unlock(&s->s_mutex); 4549 ceph_put_mds_session(s); 4550 4551 mutex_lock(&mdsc->mutex); 4552 } 4553 mutex_unlock(&mdsc->mutex); 4554 4555 ceph_check_delayed_caps(mdsc); 4556 4557 ceph_queue_cap_reclaim_work(mdsc); 4558 4559 ceph_trim_snapid_map(mdsc); 4560 4561 maybe_recover_session(mdsc); 4562 4563 schedule_delayed(mdsc); 4564 } 4565 4566 int ceph_mdsc_init(struct ceph_fs_client *fsc) 4567 4568 { 4569 struct ceph_mds_client *mdsc; 4570 int err; 4571 4572 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 4573 if (!mdsc) 4574 return -ENOMEM; 4575 mdsc->fsc = fsc; 4576 mutex_init(&mdsc->mutex); 4577 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 4578 if (!mdsc->mdsmap) { 4579 err = -ENOMEM; 4580 goto err_mdsc; 4581 } 4582 4583 init_completion(&mdsc->safe_umount_waiters); 4584 init_waitqueue_head(&mdsc->session_close_wq); 4585 INIT_LIST_HEAD(&mdsc->waiting_for_map); 4586 mdsc->sessions = NULL; 4587 atomic_set(&mdsc->num_sessions, 0); 4588 mdsc->max_sessions = 0; 4589 mdsc->stopping = 0; 4590 atomic64_set(&mdsc->quotarealms_count, 0); 4591 mdsc->quotarealms_inodes = RB_ROOT; 4592 mutex_init(&mdsc->quotarealms_inodes_mutex); 4593 mdsc->last_snap_seq = 0; 4594 init_rwsem(&mdsc->snap_rwsem); 4595 mdsc->snap_realms = RB_ROOT; 4596 INIT_LIST_HEAD(&mdsc->snap_empty); 4597 mdsc->num_snap_realms = 0; 4598 spin_lock_init(&mdsc->snap_empty_lock); 4599 mdsc->last_tid = 0; 4600 mdsc->oldest_tid = 0; 4601 mdsc->request_tree = RB_ROOT; 4602 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 4603 mdsc->last_renew_caps = jiffies; 4604 INIT_LIST_HEAD(&mdsc->cap_delay_list); 4605 INIT_LIST_HEAD(&mdsc->cap_wait_list); 4606 spin_lock_init(&mdsc->cap_delay_lock); 4607 
INIT_LIST_HEAD(&mdsc->snap_flush_list); 4608 spin_lock_init(&mdsc->snap_flush_lock); 4609 mdsc->last_cap_flush_tid = 1; 4610 INIT_LIST_HEAD(&mdsc->cap_flush_list); 4611 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 4612 mdsc->num_cap_flushing = 0; 4613 spin_lock_init(&mdsc->cap_dirty_lock); 4614 init_waitqueue_head(&mdsc->cap_flushing_wq); 4615 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); 4616 atomic_set(&mdsc->cap_reclaim_pending, 0); 4617 err = ceph_metric_init(&mdsc->metric); 4618 if (err) 4619 goto err_mdsmap; 4620 4621 spin_lock_init(&mdsc->dentry_list_lock); 4622 INIT_LIST_HEAD(&mdsc->dentry_leases); 4623 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 4624 4625 ceph_caps_init(mdsc); 4626 ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 4627 4628 spin_lock_init(&mdsc->snapid_map_lock); 4629 mdsc->snapid_map_tree = RB_ROOT; 4630 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 4631 4632 init_rwsem(&mdsc->pool_perm_rwsem); 4633 mdsc->pool_perm_tree = RB_ROOT; 4634 4635 strscpy(mdsc->nodename, utsname()->nodename, 4636 sizeof(mdsc->nodename)); 4637 4638 fsc->mdsc = mdsc; 4639 return 0; 4640 4641 err_mdsmap: 4642 kfree(mdsc->mdsmap); 4643 err_mdsc: 4644 kfree(mdsc); 4645 return err; 4646 } 4647 4648 /* 4649 * Wait for safe replies on open mds requests. If we time out, drop 4650 * all requests from the tree to avoid dangling dentry refs. 4651 */ 4652 static void wait_requests(struct ceph_mds_client *mdsc) 4653 { 4654 struct ceph_options *opts = mdsc->fsc->client->options; 4655 struct ceph_mds_request *req; 4656 4657 mutex_lock(&mdsc->mutex); 4658 if (__get_oldest_req(mdsc)) { 4659 mutex_unlock(&mdsc->mutex); 4660 4661 dout("wait_requests waiting for requests\n"); 4662 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 4663 ceph_timeout_jiffies(opts->mount_timeout)); 4664 4665 /* tear down remaining requests */ 4666 mutex_lock(&mdsc->mutex); 4667 while ((req = __get_oldest_req(mdsc))) { 4668 dout("wait_requests timed out on tid %llu\n", 4669 req->r_tid); 4670 list_del_init(&req->r_wait); 4671 __unregister_request(mdsc, req); 4672 } 4673 } 4674 mutex_unlock(&mdsc->mutex); 4675 dout("wait_requests done\n"); 4676 } 4677 4678 /* 4679 * called before mount is ro, and before dentries are torn down. 4680 * (hmm, does this still race with new lookups?) 4681 */ 4682 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 4683 { 4684 dout("pre_umount\n"); 4685 mdsc->stopping = 1; 4686 4687 lock_unlock_sessions(mdsc); 4688 ceph_flush_dirty_caps(mdsc); 4689 wait_requests(mdsc); 4690 4691 /* 4692 * wait for reply handlers to drop their request refs and 4693 * their inode/dcache refs 4694 */ 4695 ceph_msgr_flush(); 4696 4697 ceph_cleanup_quotarealms_inodes(mdsc); 4698 } 4699 4700 /* 4701 * wait for all write mds requests to flush. 
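* We walk the request tree in tid order, pinning the next request with a
* reference before dropping mdsc->mutex, so the walk can be restarted
* safely if that request is unregistered while we wait.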
4702 */ 4703 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) 4704 { 4705 struct ceph_mds_request *req = NULL, *nextreq; 4706 struct rb_node *n; 4707 4708 mutex_lock(&mdsc->mutex); 4709 dout("wait_unsafe_requests want %lld\n", want_tid); 4710 restart: 4711 req = __get_oldest_req(mdsc); 4712 while (req && req->r_tid <= want_tid) { 4713 /* find next request */ 4714 n = rb_next(&req->r_node); 4715 if (n) 4716 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 4717 else 4718 nextreq = NULL; 4719 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 4720 (req->r_op & CEPH_MDS_OP_WRITE)) { 4721 /* write op */ 4722 ceph_mdsc_get_request(req); 4723 if (nextreq) 4724 ceph_mdsc_get_request(nextreq); 4725 mutex_unlock(&mdsc->mutex); 4726 dout("wait_unsafe_requests wait on %llu (want %llu)\n", 4727 req->r_tid, want_tid); 4728 wait_for_completion(&req->r_safe_completion); 4729 mutex_lock(&mdsc->mutex); 4730 ceph_mdsc_put_request(req); 4731 if (!nextreq) 4732 break; /* next dne before, so we're done! */ 4733 if (RB_EMPTY_NODE(&nextreq->r_node)) { 4734 /* next request was removed from tree */ 4735 ceph_mdsc_put_request(nextreq); 4736 goto restart; 4737 } 4738 ceph_mdsc_put_request(nextreq); /* won't go away */ 4739 } 4740 req = nextreq; 4741 } 4742 mutex_unlock(&mdsc->mutex); 4743 dout("wait_unsafe_requests done\n"); 4744 } 4745 4746 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 4747 { 4748 u64 want_tid, want_flush; 4749 4750 if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) 4751 return; 4752 4753 dout("sync\n"); 4754 mutex_lock(&mdsc->mutex); 4755 want_tid = mdsc->last_tid; 4756 mutex_unlock(&mdsc->mutex); 4757 4758 ceph_flush_dirty_caps(mdsc); 4759 spin_lock(&mdsc->cap_dirty_lock); 4760 want_flush = mdsc->last_cap_flush_tid; 4761 if (!list_empty(&mdsc->cap_flush_list)) { 4762 struct ceph_cap_flush *cf = 4763 list_last_entry(&mdsc->cap_flush_list, 4764 struct ceph_cap_flush, g_list); 4765 cf->wake = true; 4766 } 4767 spin_unlock(&mdsc->cap_dirty_lock); 4768 4769 dout("sync want tid %lld flush_seq %lld\n", 4770 want_tid, want_flush); 4771 4772 wait_unsafe_requests(mdsc, want_tid); 4773 wait_caps_flush(mdsc, want_flush); 4774 } 4775 4776 /* 4777 * true if all sessions are closed, or we force unmount 4778 */ 4779 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 4780 { 4781 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4782 return true; 4783 return atomic_read(&mdsc->num_sessions) <= skipped; 4784 } 4785 4786 /* 4787 * called after sb is ro. 
4788 */ 4789 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 4790 { 4791 struct ceph_options *opts = mdsc->fsc->client->options; 4792 struct ceph_mds_session *session; 4793 int i; 4794 int skipped = 0; 4795 4796 dout("close_sessions\n"); 4797 4798 /* close sessions */ 4799 mutex_lock(&mdsc->mutex); 4800 for (i = 0; i < mdsc->max_sessions; i++) { 4801 session = __ceph_lookup_mds_session(mdsc, i); 4802 if (!session) 4803 continue; 4804 mutex_unlock(&mdsc->mutex); 4805 mutex_lock(&session->s_mutex); 4806 if (__close_session(mdsc, session) <= 0) 4807 skipped++; 4808 mutex_unlock(&session->s_mutex); 4809 ceph_put_mds_session(session); 4810 mutex_lock(&mdsc->mutex); 4811 } 4812 mutex_unlock(&mdsc->mutex); 4813 4814 dout("waiting for sessions to close\n"); 4815 wait_event_timeout(mdsc->session_close_wq, 4816 done_closing_sessions(mdsc, skipped), 4817 ceph_timeout_jiffies(opts->mount_timeout)); 4818 4819 /* tear down remaining sessions */ 4820 mutex_lock(&mdsc->mutex); 4821 for (i = 0; i < mdsc->max_sessions; i++) { 4822 if (mdsc->sessions[i]) { 4823 session = ceph_get_mds_session(mdsc->sessions[i]); 4824 __unregister_session(mdsc, session); 4825 mutex_unlock(&mdsc->mutex); 4826 mutex_lock(&session->s_mutex); 4827 remove_session_caps(session); 4828 mutex_unlock(&session->s_mutex); 4829 ceph_put_mds_session(session); 4830 mutex_lock(&mdsc->mutex); 4831 } 4832 } 4833 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 4834 mutex_unlock(&mdsc->mutex); 4835 4836 ceph_cleanup_snapid_map(mdsc); 4837 ceph_cleanup_empty_realms(mdsc); 4838 4839 cancel_work_sync(&mdsc->cap_reclaim_work); 4840 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 4841 4842 dout("stopped\n"); 4843 } 4844 4845 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 4846 { 4847 struct ceph_mds_session *session; 4848 int mds; 4849 4850 dout("force umount\n"); 4851 4852 mutex_lock(&mdsc->mutex); 4853 for (mds = 0; mds < mdsc->max_sessions; mds++) { 4854 session = __ceph_lookup_mds_session(mdsc, mds); 4855 if (!session) 4856 continue; 4857 4858 if (session->s_state == CEPH_MDS_SESSION_REJECTED) 4859 __unregister_session(mdsc, session); 4860 __wake_requests(mdsc, &session->s_waiting); 4861 mutex_unlock(&mdsc->mutex); 4862 4863 mutex_lock(&session->s_mutex); 4864 __close_session(mdsc, session); 4865 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 4866 cleanup_session_requests(mdsc, session); 4867 remove_session_caps(session); 4868 } 4869 mutex_unlock(&session->s_mutex); 4870 ceph_put_mds_session(session); 4871 4872 mutex_lock(&mdsc->mutex); 4873 kick_requests(mdsc, mds); 4874 } 4875 __wake_requests(mdsc, &mdsc->waiting_for_map); 4876 mutex_unlock(&mdsc->mutex); 4877 } 4878 4879 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 4880 { 4881 dout("stop\n"); 4882 /* 4883 * Make sure the delayed work stopped before releasing 4884 * the resources. 4885 * 4886 * Because the cancel_delayed_work_sync() will only 4887 * guarantee that the work finishes executing. But the 4888 * delayed work will re-arm itself again after that. 
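* delayed_work() bails out early once mdsc->stopping is set, so a final
* flush here is enough to quiesce it.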
4889 */ 4890 flush_delayed_work(&mdsc->delayed_work); 4891 4892 if (mdsc->mdsmap) 4893 ceph_mdsmap_destroy(mdsc->mdsmap); 4894 kfree(mdsc->sessions); 4895 ceph_caps_finalize(mdsc); 4896 ceph_pool_perm_destroy(mdsc); 4897 } 4898 4899 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 4900 { 4901 struct ceph_mds_client *mdsc = fsc->mdsc; 4902 dout("mdsc_destroy %p\n", mdsc); 4903 4904 if (!mdsc) 4905 return; 4906 4907 /* flush out any connection work with references to us */ 4908 ceph_msgr_flush(); 4909 4910 ceph_mdsc_stop(mdsc); 4911 4912 ceph_metric_destroy(&mdsc->metric); 4913 4914 flush_delayed_work(&mdsc->metric.delayed_work); 4915 fsc->mdsc = NULL; 4916 kfree(mdsc); 4917 dout("mdsc_destroy %p done\n", mdsc); 4918 } 4919 4920 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4921 { 4922 struct ceph_fs_client *fsc = mdsc->fsc; 4923 const char *mds_namespace = fsc->mount_options->mds_namespace; 4924 void *p = msg->front.iov_base; 4925 void *end = p + msg->front.iov_len; 4926 u32 epoch; 4927 u32 num_fs; 4928 u32 mount_fscid = (u32)-1; 4929 int err = -EINVAL; 4930 4931 ceph_decode_need(&p, end, sizeof(u32), bad); 4932 epoch = ceph_decode_32(&p); 4933 4934 dout("handle_fsmap epoch %u\n", epoch); 4935 4936 /* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */ 4937 ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad); 4938 4939 ceph_decode_32_safe(&p, end, num_fs, bad); 4940 while (num_fs-- > 0) { 4941 void *info_p, *info_end; 4942 u32 info_len; 4943 u32 fscid, namelen; 4944 4945 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 4946 p += 2; // info_v, info_cv 4947 info_len = ceph_decode_32(&p); 4948 ceph_decode_need(&p, end, info_len, bad); 4949 info_p = p; 4950 info_end = p + info_len; 4951 p = info_end; 4952 4953 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); 4954 fscid = ceph_decode_32(&info_p); 4955 namelen = ceph_decode_32(&info_p); 4956 ceph_decode_need(&info_p, info_end, namelen, bad); 4957 4958 if (mds_namespace && 4959 strlen(mds_namespace) == namelen && 4960 !strncmp(mds_namespace, (char *)info_p, namelen)) { 4961 mount_fscid = fscid; 4962 break; 4963 } 4964 } 4965 4966 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); 4967 if (mount_fscid != (u32)-1) { 4968 fsc->client->monc.fs_cluster_id = mount_fscid; 4969 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 4970 0, true); 4971 ceph_monc_renew_subs(&fsc->client->monc); 4972 } else { 4973 err = -ENOENT; 4974 goto err_out; 4975 } 4976 return; 4977 4978 bad: 4979 pr_err("error decoding fsmap\n"); 4980 err_out: 4981 mutex_lock(&mdsc->mutex); 4982 mdsc->mdsmap_err = err; 4983 __wake_requests(mdsc, &mdsc->waiting_for_map); 4984 mutex_unlock(&mdsc->mutex); 4985 } 4986 4987 /* 4988 * handle mds map update. 4989 */ 4990 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4991 { 4992 u32 epoch; 4993 u32 maplen; 4994 void *p = msg->front.iov_base; 4995 void *end = p + msg->front.iov_len; 4996 struct ceph_mdsmap *newmap, *oldmap; 4997 struct ceph_fsid fsid; 4998 int err = -EINVAL; 4999 5000 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 5001 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 5002 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 5003 return; 5004 epoch = ceph_decode_32(&p); 5005 maplen = ceph_decode_32(&p); 5006 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 5007 5008 /* do we need it? 
*/ 5009 mutex_lock(&mdsc->mutex); 5010 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 5011 dout("handle_map epoch %u <= our %u\n", 5012 epoch, mdsc->mdsmap->m_epoch); 5013 mutex_unlock(&mdsc->mutex); 5014 return; 5015 } 5016 5017 newmap = ceph_mdsmap_decode(&p, end, ceph_msgr2(mdsc->fsc->client)); 5018 if (IS_ERR(newmap)) { 5019 err = PTR_ERR(newmap); 5020 goto bad_unlock; 5021 } 5022 5023 /* swap into place */ 5024 if (mdsc->mdsmap) { 5025 oldmap = mdsc->mdsmap; 5026 mdsc->mdsmap = newmap; 5027 check_new_map(mdsc, newmap, oldmap); 5028 ceph_mdsmap_destroy(oldmap); 5029 } else { 5030 mdsc->mdsmap = newmap; /* first mds map */ 5031 } 5032 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size, 5033 MAX_LFS_FILESIZE); 5034 5035 __wake_requests(mdsc, &mdsc->waiting_for_map); 5036 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP, 5037 mdsc->mdsmap->m_epoch); 5038 5039 mutex_unlock(&mdsc->mutex); 5040 schedule_delayed(mdsc); 5041 return; 5042 5043 bad_unlock: 5044 mutex_unlock(&mdsc->mutex); 5045 bad: 5046 pr_err("error decoding mdsmap %d\n", err); 5047 return; 5048 } 5049 5050 static struct ceph_connection *con_get(struct ceph_connection *con) 5051 { 5052 struct ceph_mds_session *s = con->private; 5053 5054 if (ceph_get_mds_session(s)) 5055 return con; 5056 return NULL; 5057 } 5058 5059 static void con_put(struct ceph_connection *con) 5060 { 5061 struct ceph_mds_session *s = con->private; 5062 5063 ceph_put_mds_session(s); 5064 } 5065 5066 /* 5067 * if the client is unresponsive for long enough, the mds will kill 5068 * the session entirely. 5069 */ 5070 static void peer_reset(struct ceph_connection *con) 5071 { 5072 struct ceph_mds_session *s = con->private; 5073 struct ceph_mds_client *mdsc = s->s_mdsc; 5074 5075 pr_warn("mds%d closed our session\n", s->s_mds); 5076 send_mds_reconnect(mdsc, s); 5077 } 5078 5079 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 5080 { 5081 struct ceph_mds_session *s = con->private; 5082 struct ceph_mds_client *mdsc = s->s_mdsc; 5083 int type = le16_to_cpu(msg->hdr.type); 5084 5085 mutex_lock(&mdsc->mutex); 5086 if (__verify_registered_session(mdsc, s) < 0) { 5087 mutex_unlock(&mdsc->mutex); 5088 goto out; 5089 } 5090 mutex_unlock(&mdsc->mutex); 5091 5092 switch (type) { 5093 case CEPH_MSG_MDS_MAP: 5094 ceph_mdsc_handle_mdsmap(mdsc, msg); 5095 break; 5096 case CEPH_MSG_FS_MAP_USER: 5097 ceph_mdsc_handle_fsmap(mdsc, msg); 5098 break; 5099 case CEPH_MSG_CLIENT_SESSION: 5100 handle_session(s, msg); 5101 break; 5102 case CEPH_MSG_CLIENT_REPLY: 5103 handle_reply(s, msg); 5104 break; 5105 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 5106 handle_forward(mdsc, s, msg); 5107 break; 5108 case CEPH_MSG_CLIENT_CAPS: 5109 ceph_handle_caps(s, msg); 5110 break; 5111 case CEPH_MSG_CLIENT_SNAP: 5112 ceph_handle_snap(mdsc, s, msg); 5113 break; 5114 case CEPH_MSG_CLIENT_LEASE: 5115 handle_lease(mdsc, s, msg); 5116 break; 5117 case CEPH_MSG_CLIENT_QUOTA: 5118 ceph_handle_quota(mdsc, s, msg); 5119 break; 5120 5121 default: 5122 pr_err("received unknown message type %d %s\n", type, 5123 ceph_msg_type_name(type)); 5124 } 5125 out: 5126 ceph_msg_put(msg); 5127 } 5128 5129 /* 5130 * authentication 5131 */ 5132 5133 /* 5134 * Note: returned pointer is the address of a structure that's 5135 * managed separately. Caller must *not* attempt to free it. 
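* (It points at the session's embedded s_auth handshake state.)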
5136 */ 5137 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 5138 int *proto, int force_new) 5139 { 5140 struct ceph_mds_session *s = con->private; 5141 struct ceph_mds_client *mdsc = s->s_mdsc; 5142 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5143 struct ceph_auth_handshake *auth = &s->s_auth; 5144 int ret; 5145 5146 ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 5147 force_new, proto, NULL, NULL); 5148 if (ret) 5149 return ERR_PTR(ret); 5150 5151 return auth; 5152 } 5153 5154 static int add_authorizer_challenge(struct ceph_connection *con, 5155 void *challenge_buf, int challenge_buf_len) 5156 { 5157 struct ceph_mds_session *s = con->private; 5158 struct ceph_mds_client *mdsc = s->s_mdsc; 5159 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5160 5161 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer, 5162 challenge_buf, challenge_buf_len); 5163 } 5164 5165 static int verify_authorizer_reply(struct ceph_connection *con) 5166 { 5167 struct ceph_mds_session *s = con->private; 5168 struct ceph_mds_client *mdsc = s->s_mdsc; 5169 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5170 struct ceph_auth_handshake *auth = &s->s_auth; 5171 5172 return ceph_auth_verify_authorizer_reply(ac, auth->authorizer, 5173 auth->authorizer_reply_buf, auth->authorizer_reply_buf_len, 5174 NULL, NULL, NULL, NULL); 5175 } 5176 5177 static int invalidate_authorizer(struct ceph_connection *con) 5178 { 5179 struct ceph_mds_session *s = con->private; 5180 struct ceph_mds_client *mdsc = s->s_mdsc; 5181 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5182 5183 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 5184 5185 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 5186 } 5187 5188 static int mds_get_auth_request(struct ceph_connection *con, 5189 void *buf, int *buf_len, 5190 void **authorizer, int *authorizer_len) 5191 { 5192 struct ceph_mds_session *s = con->private; 5193 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5194 struct ceph_auth_handshake *auth = &s->s_auth; 5195 int ret; 5196 5197 ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 5198 buf, buf_len); 5199 if (ret) 5200 return ret; 5201 5202 *authorizer = auth->authorizer_buf; 5203 *authorizer_len = auth->authorizer_buf_len; 5204 return 0; 5205 } 5206 5207 static int mds_handle_auth_reply_more(struct ceph_connection *con, 5208 void *reply, int reply_len, 5209 void *buf, int *buf_len, 5210 void **authorizer, int *authorizer_len) 5211 { 5212 struct ceph_mds_session *s = con->private; 5213 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5214 struct ceph_auth_handshake *auth = &s->s_auth; 5215 int ret; 5216 5217 ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len, 5218 buf, buf_len); 5219 if (ret) 5220 return ret; 5221 5222 *authorizer = auth->authorizer_buf; 5223 *authorizer_len = auth->authorizer_buf_len; 5224 return 0; 5225 } 5226 5227 static int mds_handle_auth_done(struct ceph_connection *con, 5228 u64 global_id, void *reply, int reply_len, 5229 u8 *session_key, int *session_key_len, 5230 u8 *con_secret, int *con_secret_len) 5231 { 5232 struct ceph_mds_session *s = con->private; 5233 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5234 struct ceph_auth_handshake *auth = &s->s_auth; 5235 5236 return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len, 5237 session_key, session_key_len, 5238 con_secret, con_secret_len); 5239 } 5240 5241 static 
int mds_handle_auth_bad_method(struct ceph_connection *con, 5242 int used_proto, int result, 5243 const int *allowed_protos, int proto_cnt, 5244 const int *allowed_modes, int mode_cnt) 5245 { 5246 struct ceph_mds_session *s = con->private; 5247 struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc; 5248 int ret; 5249 5250 if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS, 5251 used_proto, result, 5252 allowed_protos, proto_cnt, 5253 allowed_modes, mode_cnt)) { 5254 ret = ceph_monc_validate_auth(monc); 5255 if (ret) 5256 return ret; 5257 } 5258 5259 return -EACCES; 5260 } 5261 5262 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, 5263 struct ceph_msg_header *hdr, int *skip) 5264 { 5265 struct ceph_msg *msg; 5266 int type = (int) le16_to_cpu(hdr->type); 5267 int front_len = (int) le32_to_cpu(hdr->front_len); 5268 5269 if (con->in_msg) 5270 return con->in_msg; 5271 5272 *skip = 0; 5273 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 5274 if (!msg) { 5275 pr_err("unable to allocate msg type %d len %d\n", 5276 type, front_len); 5277 return NULL; 5278 } 5279 5280 return msg; 5281 } 5282 5283 static int mds_sign_message(struct ceph_msg *msg) 5284 { 5285 struct ceph_mds_session *s = msg->con->private; 5286 struct ceph_auth_handshake *auth = &s->s_auth; 5287 5288 return ceph_auth_sign_message(auth, msg); 5289 } 5290 5291 static int mds_check_message_signature(struct ceph_msg *msg) 5292 { 5293 struct ceph_mds_session *s = msg->con->private; 5294 struct ceph_auth_handshake *auth = &s->s_auth; 5295 5296 return ceph_auth_check_message_signature(auth, msg); 5297 } 5298 5299 static const struct ceph_connection_operations mds_con_ops = { 5300 .get = con_get, 5301 .put = con_put, 5302 .dispatch = dispatch, 5303 .get_authorizer = get_authorizer, 5304 .add_authorizer_challenge = add_authorizer_challenge, 5305 .verify_authorizer_reply = verify_authorizer_reply, 5306 .invalidate_authorizer = invalidate_authorizer, 5307 .peer_reset = peer_reset, 5308 .alloc_msg = mds_alloc_msg, 5309 .sign_message = mds_sign_message, 5310 .check_message_signature = mds_check_message_signature, 5311 .get_auth_request = mds_get_auth_request, 5312 .handle_auth_reply_more = mds_handle_auth_reply_more, 5313 .handle_auth_done = mds_handle_auth_done, 5314 .handle_auth_bad_method = mds_handle_auth_bad_method, 5315 }; 5316 5317 /* eof */ 5318