// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage. Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid. If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
	struct ceph_mds_session *session;
	int nr_caps, nr_realms;
	struct ceph_pagelist *pagelist;
	unsigned msg_version;
	bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

static int parse_reply_info_quota(void **p, void *end,
				  struct ceph_mds_reply_info_in *info)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);
	/* struct_v is expected to be >= 1. we only
	 * understand encoding with struct_compat == 1. */
	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	end = *p + struct_len;
	ceph_decode_64_safe(p, end, info->max_bytes, bad);
	ceph_decode_64_safe(p, end, info->max_files, bad);
	*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = 0;
	u8 struct_v = 0;

	if (features == (u64)-1) {
		u32 struct_len;
		u8 struct_compat;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1.
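		 *
		 * (A note on this versioned encoding, for readers new to it:
		 *  each such blob starts with { u8 struct_v, u8 struct_compat,
		 *  u32 struct_len } followed by struct_len bytes of payload.
		 *  A decoder that only understands an older struct_v simply
		 *  stops at struct_len and skips the rest, which is how new
		 *  fields get added compatibly.)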
*/ 112 if (!struct_v || struct_compat != 1) 113 goto bad; 114 ceph_decode_32_safe(p, end, struct_len, bad); 115 ceph_decode_need(p, end, struct_len, bad); 116 end = *p + struct_len; 117 } 118 119 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad); 120 info->in = *p; 121 *p += sizeof(struct ceph_mds_reply_inode) + 122 sizeof(*info->in->fragtree.splits) * 123 le32_to_cpu(info->in->fragtree.nsplits); 124 125 ceph_decode_32_safe(p, end, info->symlink_len, bad); 126 ceph_decode_need(p, end, info->symlink_len, bad); 127 info->symlink = *p; 128 *p += info->symlink_len; 129 130 ceph_decode_copy_safe(p, end, &info->dir_layout, 131 sizeof(info->dir_layout), bad); 132 ceph_decode_32_safe(p, end, info->xattr_len, bad); 133 ceph_decode_need(p, end, info->xattr_len, bad); 134 info->xattr_data = *p; 135 *p += info->xattr_len; 136 137 if (features == (u64)-1) { 138 /* inline data */ 139 ceph_decode_64_safe(p, end, info->inline_version, bad); 140 ceph_decode_32_safe(p, end, info->inline_len, bad); 141 ceph_decode_need(p, end, info->inline_len, bad); 142 info->inline_data = *p; 143 *p += info->inline_len; 144 /* quota */ 145 err = parse_reply_info_quota(p, end, info); 146 if (err < 0) 147 goto out_bad; 148 /* pool namespace */ 149 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 150 if (info->pool_ns_len > 0) { 151 ceph_decode_need(p, end, info->pool_ns_len, bad); 152 info->pool_ns_data = *p; 153 *p += info->pool_ns_len; 154 } 155 156 /* btime */ 157 ceph_decode_need(p, end, sizeof(info->btime), bad); 158 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 159 160 /* change attribute */ 161 ceph_decode_64_safe(p, end, info->change_attr, bad); 162 163 /* dir pin */ 164 if (struct_v >= 2) { 165 ceph_decode_32_safe(p, end, info->dir_pin, bad); 166 } else { 167 info->dir_pin = -ENODATA; 168 } 169 170 /* snapshot birth time, remains zero for v<=2 */ 171 if (struct_v >= 3) { 172 ceph_decode_need(p, end, sizeof(info->snap_btime), bad); 173 ceph_decode_copy(p, &info->snap_btime, 174 sizeof(info->snap_btime)); 175 } else { 176 memset(&info->snap_btime, 0, sizeof(info->snap_btime)); 177 } 178 179 *p = end; 180 } else { 181 if (features & CEPH_FEATURE_MDS_INLINE_DATA) { 182 ceph_decode_64_safe(p, end, info->inline_version, bad); 183 ceph_decode_32_safe(p, end, info->inline_len, bad); 184 ceph_decode_need(p, end, info->inline_len, bad); 185 info->inline_data = *p; 186 *p += info->inline_len; 187 } else 188 info->inline_version = CEPH_INLINE_NONE; 189 190 if (features & CEPH_FEATURE_MDS_QUOTA) { 191 err = parse_reply_info_quota(p, end, info); 192 if (err < 0) 193 goto out_bad; 194 } else { 195 info->max_bytes = 0; 196 info->max_files = 0; 197 } 198 199 info->pool_ns_len = 0; 200 info->pool_ns_data = NULL; 201 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { 202 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 203 if (info->pool_ns_len > 0) { 204 ceph_decode_need(p, end, info->pool_ns_len, bad); 205 info->pool_ns_data = *p; 206 *p += info->pool_ns_len; 207 } 208 } 209 210 if (features & CEPH_FEATURE_FS_BTIME) { 211 ceph_decode_need(p, end, sizeof(info->btime), bad); 212 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 213 ceph_decode_64_safe(p, end, info->change_attr, bad); 214 } 215 216 info->dir_pin = -ENODATA; 217 /* info->snap_btime remains zero */ 218 } 219 return 0; 220 bad: 221 err = -EIO; 222 out_bad: 223 return err; 224 } 225 226 static int parse_reply_info_dir(void **p, void *end, 227 struct ceph_mds_reply_dirfrag **dirfrag, 228 u64 features) 229 { 230 if (features == (u64)-1) 
{ 231 u8 struct_v, struct_compat; 232 u32 struct_len; 233 ceph_decode_8_safe(p, end, struct_v, bad); 234 ceph_decode_8_safe(p, end, struct_compat, bad); 235 /* struct_v is expected to be >= 1. we only understand 236 * encoding whose struct_compat == 1. */ 237 if (!struct_v || struct_compat != 1) 238 goto bad; 239 ceph_decode_32_safe(p, end, struct_len, bad); 240 ceph_decode_need(p, end, struct_len, bad); 241 end = *p + struct_len; 242 } 243 244 ceph_decode_need(p, end, sizeof(**dirfrag), bad); 245 *dirfrag = *p; 246 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist); 247 if (unlikely(*p > end)) 248 goto bad; 249 if (features == (u64)-1) 250 *p = end; 251 return 0; 252 bad: 253 return -EIO; 254 } 255 256 static int parse_reply_info_lease(void **p, void *end, 257 struct ceph_mds_reply_lease **lease, 258 u64 features) 259 { 260 if (features == (u64)-1) { 261 u8 struct_v, struct_compat; 262 u32 struct_len; 263 ceph_decode_8_safe(p, end, struct_v, bad); 264 ceph_decode_8_safe(p, end, struct_compat, bad); 265 /* struct_v is expected to be >= 1. we only understand 266 * encoding whose struct_compat == 1. */ 267 if (!struct_v || struct_compat != 1) 268 goto bad; 269 ceph_decode_32_safe(p, end, struct_len, bad); 270 ceph_decode_need(p, end, struct_len, bad); 271 end = *p + struct_len; 272 } 273 274 ceph_decode_need(p, end, sizeof(**lease), bad); 275 *lease = *p; 276 *p += sizeof(**lease); 277 if (features == (u64)-1) 278 *p = end; 279 return 0; 280 bad: 281 return -EIO; 282 } 283 284 /* 285 * parse a normal reply, which may contain a (dir+)dentry and/or a 286 * target inode. 287 */ 288 static int parse_reply_info_trace(void **p, void *end, 289 struct ceph_mds_reply_info_parsed *info, 290 u64 features) 291 { 292 int err; 293 294 if (info->head->is_dentry) { 295 err = parse_reply_info_in(p, end, &info->diri, features); 296 if (err < 0) 297 goto out_bad; 298 299 err = parse_reply_info_dir(p, end, &info->dirfrag, features); 300 if (err < 0) 301 goto out_bad; 302 303 ceph_decode_32_safe(p, end, info->dname_len, bad); 304 ceph_decode_need(p, end, info->dname_len, bad); 305 info->dname = *p; 306 *p += info->dname_len; 307 308 err = parse_reply_info_lease(p, end, &info->dlease, features); 309 if (err < 0) 310 goto out_bad; 311 } 312 313 if (info->head->is_target) { 314 err = parse_reply_info_in(p, end, &info->targeti, features); 315 if (err < 0) 316 goto out_bad; 317 } 318 319 if (unlikely(*p != end)) 320 goto bad; 321 return 0; 322 323 bad: 324 err = -EIO; 325 out_bad: 326 pr_err("problem parsing mds trace %d\n", err); 327 return err; 328 } 329 330 /* 331 * parse readdir results 332 */ 333 static int parse_reply_info_readdir(void **p, void *end, 334 struct ceph_mds_reply_info_parsed *info, 335 u64 features) 336 { 337 u32 num, i = 0; 338 int err; 339 340 err = parse_reply_info_dir(p, end, &info->dir_dir, features); 341 if (err < 0) 342 goto out_bad; 343 344 ceph_decode_need(p, end, sizeof(num) + 2, bad); 345 num = ceph_decode_32(p); 346 { 347 u16 flags = ceph_decode_16(p); 348 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); 349 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); 350 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); 351 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH); 352 } 353 if (num == 0) 354 goto done; 355 356 BUG_ON(!info->dir_entries); 357 if ((unsigned long)(info->dir_entries + num) > 358 (unsigned long)info->dir_entries + info->dir_buf_size) { 359 pr_err("dir contents are larger than expected\n"); 360 WARN_ON(1); 361 goto bad; 362 } 363 
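	/*
	 * Each entry below is encoded as a length-prefixed dentry name,
	 * a dentry lease, and then the full inode record.  rde->offset is
	 * left zero here and filled in later by ceph_readdir_prepopulate().
	 */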
364 info->dir_nr = num; 365 while (num) { 366 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 367 /* dentry */ 368 ceph_decode_32_safe(p, end, rde->name_len, bad); 369 ceph_decode_need(p, end, rde->name_len, bad); 370 rde->name = *p; 371 *p += rde->name_len; 372 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name); 373 374 /* dentry lease */ 375 err = parse_reply_info_lease(p, end, &rde->lease, features); 376 if (err) 377 goto out_bad; 378 /* inode */ 379 err = parse_reply_info_in(p, end, &rde->inode, features); 380 if (err < 0) 381 goto out_bad; 382 /* ceph_readdir_prepopulate() will update it */ 383 rde->offset = 0; 384 i++; 385 num--; 386 } 387 388 done: 389 /* Skip over any unrecognized fields */ 390 *p = end; 391 return 0; 392 393 bad: 394 err = -EIO; 395 out_bad: 396 pr_err("problem parsing dir contents %d\n", err); 397 return err; 398 } 399 400 /* 401 * parse fcntl F_GETLK results 402 */ 403 static int parse_reply_info_filelock(void **p, void *end, 404 struct ceph_mds_reply_info_parsed *info, 405 u64 features) 406 { 407 if (*p + sizeof(*info->filelock_reply) > end) 408 goto bad; 409 410 info->filelock_reply = *p; 411 412 /* Skip over any unrecognized fields */ 413 *p = end; 414 return 0; 415 bad: 416 return -EIO; 417 } 418 419 420 #if BITS_PER_LONG == 64 421 422 #define DELEGATED_INO_AVAILABLE xa_mk_value(1) 423 424 static int ceph_parse_deleg_inos(void **p, void *end, 425 struct ceph_mds_session *s) 426 { 427 u32 sets; 428 429 ceph_decode_32_safe(p, end, sets, bad); 430 dout("got %u sets of delegated inodes\n", sets); 431 while (sets--) { 432 u64 start, len, ino; 433 434 ceph_decode_64_safe(p, end, start, bad); 435 ceph_decode_64_safe(p, end, len, bad); 436 while (len--) { 437 int err = xa_insert(&s->s_delegated_inos, ino = start++, 438 DELEGATED_INO_AVAILABLE, 439 GFP_KERNEL); 440 if (!err) { 441 dout("added delegated inode 0x%llx\n", 442 start - 1); 443 } else if (err == -EBUSY) { 444 pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n", 445 start - 1); 446 } else { 447 return err; 448 } 449 } 450 } 451 return 0; 452 bad: 453 return -EIO; 454 } 455 456 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 457 { 458 unsigned long ino; 459 void *val; 460 461 xa_for_each(&s->s_delegated_inos, ino, val) { 462 val = xa_erase(&s->s_delegated_inos, ino); 463 if (val == DELEGATED_INO_AVAILABLE) 464 return ino; 465 } 466 return 0; 467 } 468 469 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 470 { 471 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE, 472 GFP_KERNEL); 473 } 474 #else /* BITS_PER_LONG == 64 */ 475 /* 476 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just 477 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top 478 * and bottom words? 
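 *
 * (Until that happens, the 32-bit stubs below simply validate and skip the
 *  delegated ranges, and ceph_get_deleg_ino() always returns 0, so delegated
 *  inodes just go unused on these systems.)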
479 */ 480 static int ceph_parse_deleg_inos(void **p, void *end, 481 struct ceph_mds_session *s) 482 { 483 u32 sets; 484 485 ceph_decode_32_safe(p, end, sets, bad); 486 if (sets) 487 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad); 488 return 0; 489 bad: 490 return -EIO; 491 } 492 493 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 494 { 495 return 0; 496 } 497 498 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 499 { 500 return 0; 501 } 502 #endif /* BITS_PER_LONG == 64 */ 503 504 /* 505 * parse create results 506 */ 507 static int parse_reply_info_create(void **p, void *end, 508 struct ceph_mds_reply_info_parsed *info, 509 u64 features, struct ceph_mds_session *s) 510 { 511 int ret; 512 513 if (features == (u64)-1 || 514 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) { 515 if (*p == end) { 516 /* Malformed reply? */ 517 info->has_create_ino = false; 518 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) { 519 info->has_create_ino = true; 520 /* struct_v, struct_compat, and len */ 521 ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad); 522 ceph_decode_64_safe(p, end, info->ino, bad); 523 ret = ceph_parse_deleg_inos(p, end, s); 524 if (ret) 525 return ret; 526 } else { 527 /* legacy */ 528 ceph_decode_64_safe(p, end, info->ino, bad); 529 info->has_create_ino = true; 530 } 531 } else { 532 if (*p != end) 533 goto bad; 534 } 535 536 /* Skip over any unrecognized fields */ 537 *p = end; 538 return 0; 539 bad: 540 return -EIO; 541 } 542 543 /* 544 * parse extra results 545 */ 546 static int parse_reply_info_extra(void **p, void *end, 547 struct ceph_mds_reply_info_parsed *info, 548 u64 features, struct ceph_mds_session *s) 549 { 550 u32 op = le32_to_cpu(info->head->op); 551 552 if (op == CEPH_MDS_OP_GETFILELOCK) 553 return parse_reply_info_filelock(p, end, info, features); 554 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) 555 return parse_reply_info_readdir(p, end, info, features); 556 else if (op == CEPH_MDS_OP_CREATE) 557 return parse_reply_info_create(p, end, info, features, s); 558 else 559 return -EIO; 560 } 561 562 /* 563 * parse entire mds reply 564 */ 565 static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, 566 struct ceph_mds_reply_info_parsed *info, 567 u64 features) 568 { 569 void *p, *end; 570 u32 len; 571 int err; 572 573 info->head = msg->front.iov_base; 574 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 575 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 576 577 /* trace */ 578 ceph_decode_32_safe(&p, end, len, bad); 579 if (len > 0) { 580 ceph_decode_need(&p, end, len, bad); 581 err = parse_reply_info_trace(&p, p+len, info, features); 582 if (err < 0) 583 goto out_bad; 584 } 585 586 /* extra */ 587 ceph_decode_32_safe(&p, end, len, bad); 588 if (len > 0) { 589 ceph_decode_need(&p, end, len, bad); 590 err = parse_reply_info_extra(&p, p+len, info, features, s); 591 if (err < 0) 592 goto out_bad; 593 } 594 595 /* snap blob */ 596 ceph_decode_32_safe(&p, end, len, bad); 597 info->snapblob_len = len; 598 info->snapblob = p; 599 p += len; 600 601 if (p != end) 602 goto bad; 603 return 0; 604 605 bad: 606 err = -EIO; 607 out_bad: 608 pr_err("mds parse_reply err %d\n", err); 609 return err; 610 } 611 612 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 613 { 614 if (!info->dir_entries) 615 return; 616 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size)); 617 } 618 619 620 /* 621 * sessions 622 */ 623 const char 
*ceph_session_state_name(int s) 624 { 625 switch (s) { 626 case CEPH_MDS_SESSION_NEW: return "new"; 627 case CEPH_MDS_SESSION_OPENING: return "opening"; 628 case CEPH_MDS_SESSION_OPEN: return "open"; 629 case CEPH_MDS_SESSION_HUNG: return "hung"; 630 case CEPH_MDS_SESSION_CLOSING: return "closing"; 631 case CEPH_MDS_SESSION_CLOSED: return "closed"; 632 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 633 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 634 case CEPH_MDS_SESSION_REJECTED: return "rejected"; 635 default: return "???"; 636 } 637 } 638 639 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s) 640 { 641 if (refcount_inc_not_zero(&s->s_ref)) { 642 dout("mdsc get_session %p %d -> %d\n", s, 643 refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref)); 644 return s; 645 } else { 646 dout("mdsc get_session %p 0 -- FAIL\n", s); 647 return NULL; 648 } 649 } 650 651 void ceph_put_mds_session(struct ceph_mds_session *s) 652 { 653 dout("mdsc put_session %p %d -> %d\n", s, 654 refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1); 655 if (refcount_dec_and_test(&s->s_ref)) { 656 if (s->s_auth.authorizer) 657 ceph_auth_destroy_authorizer(s->s_auth.authorizer); 658 WARN_ON(mutex_is_locked(&s->s_mutex)); 659 xa_destroy(&s->s_delegated_inos); 660 kfree(s); 661 } 662 } 663 664 /* 665 * called under mdsc->mutex 666 */ 667 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 668 int mds) 669 { 670 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 671 return NULL; 672 return ceph_get_mds_session(mdsc->sessions[mds]); 673 } 674 675 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 676 { 677 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 678 return false; 679 else 680 return true; 681 } 682 683 static int __verify_registered_session(struct ceph_mds_client *mdsc, 684 struct ceph_mds_session *s) 685 { 686 if (s->s_mds >= mdsc->max_sessions || 687 mdsc->sessions[s->s_mds] != s) 688 return -ENOENT; 689 return 0; 690 } 691 692 /* 693 * create+register a new session for given mds. 694 * called under mdsc->mutex. 
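 *
 * In short: the sessions[] array is grown to the next power of two when
 * needed, the new session starts out in NEW state holding one ref for the
 * array slot and one for the caller, and the messenger connection to the
 * target MDS is opened before returning.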
695 */ 696 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 697 int mds) 698 { 699 struct ceph_mds_session *s; 700 701 if (mds >= mdsc->mdsmap->possible_max_rank) 702 return ERR_PTR(-EINVAL); 703 704 s = kzalloc(sizeof(*s), GFP_NOFS); 705 if (!s) 706 return ERR_PTR(-ENOMEM); 707 708 if (mds >= mdsc->max_sessions) { 709 int newmax = 1 << get_count_order(mds + 1); 710 struct ceph_mds_session **sa; 711 712 dout("%s: realloc to %d\n", __func__, newmax); 713 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); 714 if (!sa) 715 goto fail_realloc; 716 if (mdsc->sessions) { 717 memcpy(sa, mdsc->sessions, 718 mdsc->max_sessions * sizeof(void *)); 719 kfree(mdsc->sessions); 720 } 721 mdsc->sessions = sa; 722 mdsc->max_sessions = newmax; 723 } 724 725 dout("%s: mds%d\n", __func__, mds); 726 s->s_mdsc = mdsc; 727 s->s_mds = mds; 728 s->s_state = CEPH_MDS_SESSION_NEW; 729 s->s_ttl = 0; 730 s->s_seq = 0; 731 mutex_init(&s->s_mutex); 732 733 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); 734 735 spin_lock_init(&s->s_gen_ttl_lock); 736 s->s_cap_gen = 1; 737 s->s_cap_ttl = jiffies - 1; 738 739 spin_lock_init(&s->s_cap_lock); 740 s->s_renew_requested = 0; 741 s->s_renew_seq = 0; 742 INIT_LIST_HEAD(&s->s_caps); 743 s->s_nr_caps = 0; 744 refcount_set(&s->s_ref, 1); 745 INIT_LIST_HEAD(&s->s_waiting); 746 INIT_LIST_HEAD(&s->s_unsafe); 747 xa_init(&s->s_delegated_inos); 748 s->s_num_cap_releases = 0; 749 s->s_cap_reconnect = 0; 750 s->s_cap_iterator = NULL; 751 INIT_LIST_HEAD(&s->s_cap_releases); 752 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work); 753 754 INIT_LIST_HEAD(&s->s_cap_dirty); 755 INIT_LIST_HEAD(&s->s_cap_flushing); 756 757 mdsc->sessions[mds] = s; 758 atomic_inc(&mdsc->num_sessions); 759 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 760 761 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 762 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 763 764 return s; 765 766 fail_realloc: 767 kfree(s); 768 return ERR_PTR(-ENOMEM); 769 } 770 771 /* 772 * called under mdsc->mutex 773 */ 774 static void __unregister_session(struct ceph_mds_client *mdsc, 775 struct ceph_mds_session *s) 776 { 777 dout("__unregister_session mds%d %p\n", s->s_mds, s); 778 BUG_ON(mdsc->sessions[s->s_mds] != s); 779 mdsc->sessions[s->s_mds] = NULL; 780 ceph_con_close(&s->s_con); 781 ceph_put_mds_session(s); 782 atomic_dec(&mdsc->num_sessions); 783 } 784 785 /* 786 * drop session refs in request. 
787 * 788 * should be last request ref, or hold mdsc->mutex 789 */ 790 static void put_request_session(struct ceph_mds_request *req) 791 { 792 if (req->r_session) { 793 ceph_put_mds_session(req->r_session); 794 req->r_session = NULL; 795 } 796 } 797 798 void ceph_mdsc_release_request(struct kref *kref) 799 { 800 struct ceph_mds_request *req = container_of(kref, 801 struct ceph_mds_request, 802 r_kref); 803 ceph_mdsc_release_dir_caps_no_check(req); 804 destroy_reply_info(&req->r_reply_info); 805 if (req->r_request) 806 ceph_msg_put(req->r_request); 807 if (req->r_reply) 808 ceph_msg_put(req->r_reply); 809 if (req->r_inode) { 810 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 811 /* avoid calling iput_final() in mds dispatch threads */ 812 ceph_async_iput(req->r_inode); 813 } 814 if (req->r_parent) { 815 ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN); 816 ceph_async_iput(req->r_parent); 817 } 818 ceph_async_iput(req->r_target_inode); 819 if (req->r_dentry) 820 dput(req->r_dentry); 821 if (req->r_old_dentry) 822 dput(req->r_old_dentry); 823 if (req->r_old_dentry_dir) { 824 /* 825 * track (and drop pins for) r_old_dentry_dir 826 * separately, since r_old_dentry's d_parent may have 827 * changed between the dir mutex being dropped and 828 * this request being freed. 829 */ 830 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), 831 CEPH_CAP_PIN); 832 ceph_async_iput(req->r_old_dentry_dir); 833 } 834 kfree(req->r_path1); 835 kfree(req->r_path2); 836 put_cred(req->r_cred); 837 if (req->r_pagelist) 838 ceph_pagelist_release(req->r_pagelist); 839 put_request_session(req); 840 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); 841 WARN_ON_ONCE(!list_empty(&req->r_wait)); 842 kmem_cache_free(ceph_mds_request_cachep, req); 843 } 844 845 DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node) 846 847 /* 848 * lookup session, bump ref if found. 849 * 850 * called under mdsc->mutex. 851 */ 852 static struct ceph_mds_request * 853 lookup_get_request(struct ceph_mds_client *mdsc, u64 tid) 854 { 855 struct ceph_mds_request *req; 856 857 req = lookup_request(&mdsc->request_tree, tid); 858 if (req) 859 ceph_mdsc_get_request(req); 860 861 return req; 862 } 863 864 /* 865 * Register an in-flight request, and assign a tid. Link to directory 866 * are modifying (if any). 867 * 868 * Called under mdsc->mutex. 
869 */ 870 static void __register_request(struct ceph_mds_client *mdsc, 871 struct ceph_mds_request *req, 872 struct inode *dir) 873 { 874 int ret = 0; 875 876 req->r_tid = ++mdsc->last_tid; 877 if (req->r_num_caps) { 878 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, 879 req->r_num_caps); 880 if (ret < 0) { 881 pr_err("__register_request %p " 882 "failed to reserve caps: %d\n", req, ret); 883 /* set req->r_err to fail early from __do_request */ 884 req->r_err = ret; 885 return; 886 } 887 } 888 dout("__register_request %p tid %lld\n", req, req->r_tid); 889 ceph_mdsc_get_request(req); 890 insert_request(&mdsc->request_tree, req); 891 892 req->r_cred = get_current_cred(); 893 894 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) 895 mdsc->oldest_tid = req->r_tid; 896 897 if (dir) { 898 struct ceph_inode_info *ci = ceph_inode(dir); 899 900 ihold(dir); 901 req->r_unsafe_dir = dir; 902 spin_lock(&ci->i_unsafe_lock); 903 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 904 spin_unlock(&ci->i_unsafe_lock); 905 } 906 } 907 908 static void __unregister_request(struct ceph_mds_client *mdsc, 909 struct ceph_mds_request *req) 910 { 911 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 912 913 /* Never leave an unregistered request on an unsafe list! */ 914 list_del_init(&req->r_unsafe_item); 915 916 if (req->r_tid == mdsc->oldest_tid) { 917 struct rb_node *p = rb_next(&req->r_node); 918 mdsc->oldest_tid = 0; 919 while (p) { 920 struct ceph_mds_request *next_req = 921 rb_entry(p, struct ceph_mds_request, r_node); 922 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 923 mdsc->oldest_tid = next_req->r_tid; 924 break; 925 } 926 p = rb_next(p); 927 } 928 } 929 930 erase_request(&mdsc->request_tree, req); 931 932 if (req->r_unsafe_dir) { 933 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 934 spin_lock(&ci->i_unsafe_lock); 935 list_del_init(&req->r_unsafe_dir_item); 936 spin_unlock(&ci->i_unsafe_lock); 937 } 938 if (req->r_target_inode && 939 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 940 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 941 spin_lock(&ci->i_unsafe_lock); 942 list_del_init(&req->r_unsafe_target_item); 943 spin_unlock(&ci->i_unsafe_lock); 944 } 945 946 if (req->r_unsafe_dir) { 947 /* avoid calling iput_final() in mds dispatch threads */ 948 ceph_async_iput(req->r_unsafe_dir); 949 req->r_unsafe_dir = NULL; 950 } 951 952 complete_all(&req->r_safe_completion); 953 954 ceph_mdsc_put_request(req); 955 } 956 957 /* 958 * Walk back up the dentry tree until we hit a dentry representing a 959 * non-snapshot inode. We do this using the rcu_read_lock (which must be held 960 * when calling this) to ensure that the objects won't disappear while we're 961 * working with them. Once we hit a candidate dentry, we attempt to take a 962 * reference to it, and return that as the result. 963 */ 964 static struct inode *get_nonsnap_parent(struct dentry *dentry) 965 { 966 struct inode *inode = NULL; 967 968 while (dentry && !IS_ROOT(dentry)) { 969 inode = d_inode_rcu(dentry); 970 if (!inode || ceph_snap(inode) == CEPH_NOSNAP) 971 break; 972 dentry = dentry->d_parent; 973 } 974 if (inode) 975 inode = igrab(inode); 976 return inode; 977 } 978 979 /* 980 * Choose mds to send request to next. If there is a hint set in the 981 * request (e.g., due to a prior forward hint from the mds), use that. 982 * Otherwise, consult frag tree and/or caps to identify the 983 * appropriate mds. If all else fails, choose randomly. 
984 * 985 * Called under mdsc->mutex. 986 */ 987 static int __choose_mds(struct ceph_mds_client *mdsc, 988 struct ceph_mds_request *req, 989 bool *random) 990 { 991 struct inode *inode; 992 struct ceph_inode_info *ci; 993 struct ceph_cap *cap; 994 int mode = req->r_direct_mode; 995 int mds = -1; 996 u32 hash = req->r_direct_hash; 997 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); 998 999 if (random) 1000 *random = false; 1001 1002 /* 1003 * is there a specific mds we should try? ignore hint if we have 1004 * no session and the mds is not up (active or recovering). 1005 */ 1006 if (req->r_resend_mds >= 0 && 1007 (__have_session(mdsc, req->r_resend_mds) || 1008 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 1009 dout("%s using resend_mds mds%d\n", __func__, 1010 req->r_resend_mds); 1011 return req->r_resend_mds; 1012 } 1013 1014 if (mode == USE_RANDOM_MDS) 1015 goto random; 1016 1017 inode = NULL; 1018 if (req->r_inode) { 1019 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) { 1020 inode = req->r_inode; 1021 ihold(inode); 1022 } else { 1023 /* req->r_dentry is non-null for LSSNAP request */ 1024 rcu_read_lock(); 1025 inode = get_nonsnap_parent(req->r_dentry); 1026 rcu_read_unlock(); 1027 dout("%s using snapdir's parent %p\n", __func__, inode); 1028 } 1029 } else if (req->r_dentry) { 1030 /* ignore race with rename; old or new d_parent is okay */ 1031 struct dentry *parent; 1032 struct inode *dir; 1033 1034 rcu_read_lock(); 1035 parent = READ_ONCE(req->r_dentry->d_parent); 1036 dir = req->r_parent ? : d_inode_rcu(parent); 1037 1038 if (!dir || dir->i_sb != mdsc->fsc->sb) { 1039 /* not this fs or parent went negative */ 1040 inode = d_inode(req->r_dentry); 1041 if (inode) 1042 ihold(inode); 1043 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 1044 /* direct snapped/virtual snapdir requests 1045 * based on parent dir inode */ 1046 inode = get_nonsnap_parent(parent); 1047 dout("%s using nonsnap parent %p\n", __func__, inode); 1048 } else { 1049 /* dentry target */ 1050 inode = d_inode(req->r_dentry); 1051 if (!inode || mode == USE_AUTH_MDS) { 1052 /* dir + name */ 1053 inode = igrab(dir); 1054 hash = ceph_dentry_hash(dir, req->r_dentry); 1055 is_hash = true; 1056 } else { 1057 ihold(inode); 1058 } 1059 } 1060 rcu_read_unlock(); 1061 } 1062 1063 dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash, 1064 hash, mode); 1065 if (!inode) 1066 goto random; 1067 ci = ceph_inode(inode); 1068 1069 if (is_hash && S_ISDIR(inode->i_mode)) { 1070 struct ceph_inode_frag frag; 1071 int found; 1072 1073 ceph_choose_frag(ci, hash, &frag, &found); 1074 if (found) { 1075 if (mode == USE_ANY_MDS && frag.ndist > 0) { 1076 u8 r; 1077 1078 /* choose a random replica */ 1079 get_random_bytes(&r, 1); 1080 r %= frag.ndist; 1081 mds = frag.dist[r]; 1082 dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n", 1083 __func__, inode, ceph_vinop(inode), 1084 frag.frag, mds, (int)r, frag.ndist); 1085 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1086 CEPH_MDS_STATE_ACTIVE && 1087 !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds)) 1088 goto out; 1089 } 1090 1091 /* since this file/dir wasn't known to be 1092 * replicated, then we want to look for the 1093 * authoritative mds. 
*/ 1094 if (frag.mds >= 0) { 1095 /* choose auth mds */ 1096 mds = frag.mds; 1097 dout("%s %p %llx.%llx frag %u mds%d (auth)\n", 1098 __func__, inode, ceph_vinop(inode), 1099 frag.frag, mds); 1100 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1101 CEPH_MDS_STATE_ACTIVE) { 1102 if (!ceph_mdsmap_is_laggy(mdsc->mdsmap, 1103 mds)) 1104 goto out; 1105 } 1106 } 1107 mode = USE_AUTH_MDS; 1108 } 1109 } 1110 1111 spin_lock(&ci->i_ceph_lock); 1112 cap = NULL; 1113 if (mode == USE_AUTH_MDS) 1114 cap = ci->i_auth_cap; 1115 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 1116 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 1117 if (!cap) { 1118 spin_unlock(&ci->i_ceph_lock); 1119 ceph_async_iput(inode); 1120 goto random; 1121 } 1122 mds = cap->session->s_mds; 1123 dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__, 1124 inode, ceph_vinop(inode), mds, 1125 cap == ci->i_auth_cap ? "auth " : "", cap); 1126 spin_unlock(&ci->i_ceph_lock); 1127 out: 1128 /* avoid calling iput_final() while holding mdsc->mutex or 1129 * in mds dispatch threads */ 1130 ceph_async_iput(inode); 1131 return mds; 1132 1133 random: 1134 if (random) 1135 *random = true; 1136 1137 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 1138 dout("%s chose random mds%d\n", __func__, mds); 1139 return mds; 1140 } 1141 1142 1143 /* 1144 * session messages 1145 */ 1146 static struct ceph_msg *create_session_msg(u32 op, u64 seq) 1147 { 1148 struct ceph_msg *msg; 1149 struct ceph_mds_session_head *h; 1150 1151 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 1152 false); 1153 if (!msg) { 1154 pr_err("create_session_msg ENOMEM creating msg\n"); 1155 return NULL; 1156 } 1157 h = msg->front.iov_base; 1158 h->op = cpu_to_le32(op); 1159 h->seq = cpu_to_le64(seq); 1160 1161 return msg; 1162 } 1163 1164 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED; 1165 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8) 1166 static int encode_supported_features(void **p, void *end) 1167 { 1168 static const size_t count = ARRAY_SIZE(feature_bits); 1169 1170 if (count > 0) { 1171 size_t i; 1172 size_t size = FEATURE_BYTES(count); 1173 1174 if (WARN_ON_ONCE(*p + 4 + size > end)) 1175 return -ERANGE; 1176 1177 ceph_encode_32(p, size); 1178 memset(*p, 0, size); 1179 for (i = 0; i < count; i++) 1180 ((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8); 1181 *p += size; 1182 } else { 1183 if (WARN_ON_ONCE(*p + 4 > end)) 1184 return -ERANGE; 1185 1186 ceph_encode_32(p, 0); 1187 } 1188 1189 return 0; 1190 } 1191 1192 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED; 1193 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8) 1194 static int encode_metric_spec(void **p, void *end) 1195 { 1196 static const size_t count = ARRAY_SIZE(metric_bits); 1197 1198 /* header */ 1199 if (WARN_ON_ONCE(*p + 2 > end)) 1200 return -ERANGE; 1201 1202 ceph_encode_8(p, 1); /* version */ 1203 ceph_encode_8(p, 1); /* compat */ 1204 1205 if (count > 0) { 1206 size_t i; 1207 size_t size = METRIC_BYTES(count); 1208 1209 if (WARN_ON_ONCE(*p + 4 + 4 + size > end)) 1210 return -ERANGE; 1211 1212 /* metric spec info length */ 1213 ceph_encode_32(p, 4 + size); 1214 1215 /* metric spec */ 1216 ceph_encode_32(p, size); 1217 memset(*p, 0, size); 1218 for (i = 0; i < count; i++) 1219 ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8); 1220 *p += size; 1221 } else { 1222 if (WARN_ON_ONCE(*p + 4 + 4 > end)) 1223 return -ERANGE; 1224 1225 /* metric spec 
info length */ 1226 ceph_encode_32(p, 4); 1227 /* metric spec */ 1228 ceph_encode_32(p, 0); 1229 } 1230 1231 return 0; 1232 } 1233 1234 /* 1235 * session message, specialization for CEPH_SESSION_REQUEST_OPEN 1236 * to include additional client metadata fields. 1237 */ 1238 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq) 1239 { 1240 struct ceph_msg *msg; 1241 struct ceph_mds_session_head *h; 1242 int i; 1243 int extra_bytes = 0; 1244 int metadata_key_count = 0; 1245 struct ceph_options *opt = mdsc->fsc->client->options; 1246 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; 1247 size_t size, count; 1248 void *p, *end; 1249 int ret; 1250 1251 const char* metadata[][2] = { 1252 {"hostname", mdsc->nodename}, 1253 {"kernel_version", init_utsname()->release}, 1254 {"entity_id", opt->name ? : ""}, 1255 {"root", fsopt->server_path ? : "/"}, 1256 {NULL, NULL} 1257 }; 1258 1259 /* Calculate serialized length of metadata */ 1260 extra_bytes = 4; /* map length */ 1261 for (i = 0; metadata[i][0]; ++i) { 1262 extra_bytes += 8 + strlen(metadata[i][0]) + 1263 strlen(metadata[i][1]); 1264 metadata_key_count++; 1265 } 1266 1267 /* supported feature */ 1268 size = 0; 1269 count = ARRAY_SIZE(feature_bits); 1270 if (count > 0) 1271 size = FEATURE_BYTES(count); 1272 extra_bytes += 4 + size; 1273 1274 /* metric spec */ 1275 size = 0; 1276 count = ARRAY_SIZE(metric_bits); 1277 if (count > 0) 1278 size = METRIC_BYTES(count); 1279 extra_bytes += 2 + 4 + 4 + size; 1280 1281 /* Allocate the message */ 1282 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, 1283 GFP_NOFS, false); 1284 if (!msg) { 1285 pr_err("create_session_msg ENOMEM creating msg\n"); 1286 return ERR_PTR(-ENOMEM); 1287 } 1288 p = msg->front.iov_base; 1289 end = p + msg->front.iov_len; 1290 1291 h = p; 1292 h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN); 1293 h->seq = cpu_to_le64(seq); 1294 1295 /* 1296 * Serialize client metadata into waiting buffer space, using 1297 * the format that userspace expects for map<string, string> 1298 * 1299 * ClientSession messages with metadata are v4 1300 */ 1301 msg->hdr.version = cpu_to_le16(4); 1302 msg->hdr.compat_version = cpu_to_le16(1); 1303 1304 /* The write pointer, following the session_head structure */ 1305 p += sizeof(*h); 1306 1307 /* Number of entries in the map */ 1308 ceph_encode_32(&p, metadata_key_count); 1309 1310 /* Two length-prefixed strings for each entry in the map */ 1311 for (i = 0; metadata[i][0]; ++i) { 1312 size_t const key_len = strlen(metadata[i][0]); 1313 size_t const val_len = strlen(metadata[i][1]); 1314 1315 ceph_encode_32(&p, key_len); 1316 memcpy(p, metadata[i][0], key_len); 1317 p += key_len; 1318 ceph_encode_32(&p, val_len); 1319 memcpy(p, metadata[i][1], val_len); 1320 p += val_len; 1321 } 1322 1323 ret = encode_supported_features(&p, end); 1324 if (ret) { 1325 pr_err("encode_supported_features failed!\n"); 1326 ceph_msg_put(msg); 1327 return ERR_PTR(ret); 1328 } 1329 1330 ret = encode_metric_spec(&p, end); 1331 if (ret) { 1332 pr_err("encode_metric_spec failed!\n"); 1333 ceph_msg_put(msg); 1334 return ERR_PTR(ret); 1335 } 1336 1337 msg->front.iov_len = p - msg->front.iov_base; 1338 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1339 1340 return msg; 1341 } 1342 1343 /* 1344 * send session open request. 
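 *
 * The message is built by create_session_open_msg() above and carries the
 * client metadata map, the supported-feature bitmap and the metric spec
 * alongside the usual session head.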
1345 * 1346 * called under mdsc->mutex 1347 */ 1348 static int __open_session(struct ceph_mds_client *mdsc, 1349 struct ceph_mds_session *session) 1350 { 1351 struct ceph_msg *msg; 1352 int mstate; 1353 int mds = session->s_mds; 1354 1355 /* wait for mds to go active? */ 1356 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 1357 dout("open_session to mds%d (%s)\n", mds, 1358 ceph_mds_state_name(mstate)); 1359 session->s_state = CEPH_MDS_SESSION_OPENING; 1360 session->s_renew_requested = jiffies; 1361 1362 /* send connect message */ 1363 msg = create_session_open_msg(mdsc, session->s_seq); 1364 if (IS_ERR(msg)) 1365 return PTR_ERR(msg); 1366 ceph_con_send(&session->s_con, msg); 1367 return 0; 1368 } 1369 1370 /* 1371 * open sessions for any export targets for the given mds 1372 * 1373 * called under mdsc->mutex 1374 */ 1375 static struct ceph_mds_session * 1376 __open_export_target_session(struct ceph_mds_client *mdsc, int target) 1377 { 1378 struct ceph_mds_session *session; 1379 int ret; 1380 1381 session = __ceph_lookup_mds_session(mdsc, target); 1382 if (!session) { 1383 session = register_session(mdsc, target); 1384 if (IS_ERR(session)) 1385 return session; 1386 } 1387 if (session->s_state == CEPH_MDS_SESSION_NEW || 1388 session->s_state == CEPH_MDS_SESSION_CLOSING) { 1389 ret = __open_session(mdsc, session); 1390 if (ret) 1391 return ERR_PTR(ret); 1392 } 1393 1394 return session; 1395 } 1396 1397 struct ceph_mds_session * 1398 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) 1399 { 1400 struct ceph_mds_session *session; 1401 1402 dout("open_export_target_session to mds%d\n", target); 1403 1404 mutex_lock(&mdsc->mutex); 1405 session = __open_export_target_session(mdsc, target); 1406 mutex_unlock(&mdsc->mutex); 1407 1408 return session; 1409 } 1410 1411 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 1412 struct ceph_mds_session *session) 1413 { 1414 struct ceph_mds_info *mi; 1415 struct ceph_mds_session *ts; 1416 int i, mds = session->s_mds; 1417 1418 if (mds >= mdsc->mdsmap->possible_max_rank) 1419 return; 1420 1421 mi = &mdsc->mdsmap->m_info[mds]; 1422 dout("open_export_target_sessions for mds%d (%d targets)\n", 1423 session->s_mds, mi->num_export_targets); 1424 1425 for (i = 0; i < mi->num_export_targets; i++) { 1426 ts = __open_export_target_session(mdsc, mi->export_targets[i]); 1427 if (!IS_ERR(ts)) 1428 ceph_put_mds_session(ts); 1429 } 1430 } 1431 1432 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 1433 struct ceph_mds_session *session) 1434 { 1435 mutex_lock(&mdsc->mutex); 1436 __open_export_target_sessions(mdsc, session); 1437 mutex_unlock(&mdsc->mutex); 1438 } 1439 1440 /* 1441 * session caps 1442 */ 1443 1444 static void detach_cap_releases(struct ceph_mds_session *session, 1445 struct list_head *target) 1446 { 1447 lockdep_assert_held(&session->s_cap_lock); 1448 1449 list_splice_init(&session->s_cap_releases, target); 1450 session->s_num_cap_releases = 0; 1451 dout("dispose_cap_releases mds%d\n", session->s_mds); 1452 } 1453 1454 static void dispose_cap_releases(struct ceph_mds_client *mdsc, 1455 struct list_head *dispose) 1456 { 1457 while (!list_empty(dispose)) { 1458 struct ceph_cap *cap; 1459 /* zero out the in-progress message */ 1460 cap = list_first_entry(dispose, struct ceph_cap, session_caps); 1461 list_del(&cap->session_caps); 1462 ceph_put_cap(mdsc, cap); 1463 } 1464 } 1465 1466 static void cleanup_session_requests(struct ceph_mds_client *mdsc, 1467 struct ceph_mds_session *session) 
1468 { 1469 struct ceph_mds_request *req; 1470 struct rb_node *p; 1471 struct ceph_inode_info *ci; 1472 1473 dout("cleanup_session_requests mds%d\n", session->s_mds); 1474 mutex_lock(&mdsc->mutex); 1475 while (!list_empty(&session->s_unsafe)) { 1476 req = list_first_entry(&session->s_unsafe, 1477 struct ceph_mds_request, r_unsafe_item); 1478 pr_warn_ratelimited(" dropping unsafe request %llu\n", 1479 req->r_tid); 1480 if (req->r_target_inode) { 1481 /* dropping unsafe change of inode's attributes */ 1482 ci = ceph_inode(req->r_target_inode); 1483 errseq_set(&ci->i_meta_err, -EIO); 1484 } 1485 if (req->r_unsafe_dir) { 1486 /* dropping unsafe directory operation */ 1487 ci = ceph_inode(req->r_unsafe_dir); 1488 errseq_set(&ci->i_meta_err, -EIO); 1489 } 1490 __unregister_request(mdsc, req); 1491 } 1492 /* zero r_attempts, so kick_requests() will re-send requests */ 1493 p = rb_first(&mdsc->request_tree); 1494 while (p) { 1495 req = rb_entry(p, struct ceph_mds_request, r_node); 1496 p = rb_next(p); 1497 if (req->r_session && 1498 req->r_session->s_mds == session->s_mds) 1499 req->r_attempts = 0; 1500 } 1501 mutex_unlock(&mdsc->mutex); 1502 } 1503 1504 /* 1505 * Helper to safely iterate over all caps associated with a session, with 1506 * special care taken to handle a racing __ceph_remove_cap(). 1507 * 1508 * Caller must hold session s_mutex. 1509 */ 1510 int ceph_iterate_session_caps(struct ceph_mds_session *session, 1511 int (*cb)(struct inode *, struct ceph_cap *, 1512 void *), void *arg) 1513 { 1514 struct list_head *p; 1515 struct ceph_cap *cap; 1516 struct inode *inode, *last_inode = NULL; 1517 struct ceph_cap *old_cap = NULL; 1518 int ret; 1519 1520 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 1521 spin_lock(&session->s_cap_lock); 1522 p = session->s_caps.next; 1523 while (p != &session->s_caps) { 1524 cap = list_entry(p, struct ceph_cap, session_caps); 1525 inode = igrab(&cap->ci->vfs_inode); 1526 if (!inode) { 1527 p = p->next; 1528 continue; 1529 } 1530 session->s_cap_iterator = cap; 1531 spin_unlock(&session->s_cap_lock); 1532 1533 if (last_inode) { 1534 /* avoid calling iput_final() while holding 1535 * s_mutex or in mds dispatch threads */ 1536 ceph_async_iput(last_inode); 1537 last_inode = NULL; 1538 } 1539 if (old_cap) { 1540 ceph_put_cap(session->s_mdsc, old_cap); 1541 old_cap = NULL; 1542 } 1543 1544 ret = cb(inode, cap, arg); 1545 last_inode = inode; 1546 1547 spin_lock(&session->s_cap_lock); 1548 p = p->next; 1549 if (!cap->ci) { 1550 dout("iterate_session_caps finishing cap %p removal\n", 1551 cap); 1552 BUG_ON(cap->session != session); 1553 cap->session = NULL; 1554 list_del_init(&cap->session_caps); 1555 session->s_nr_caps--; 1556 atomic64_dec(&session->s_mdsc->metric.total_caps); 1557 if (cap->queue_release) 1558 __ceph_queue_cap_release(session, cap); 1559 else 1560 old_cap = cap; /* put_cap it w/o locks held */ 1561 } 1562 if (ret < 0) 1563 goto out; 1564 } 1565 ret = 0; 1566 out: 1567 session->s_cap_iterator = NULL; 1568 spin_unlock(&session->s_cap_lock); 1569 1570 ceph_async_iput(last_inode); 1571 if (old_cap) 1572 ceph_put_cap(session->s_mdsc, old_cap); 1573 1574 return ret; 1575 } 1576 1577 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 1578 void *arg) 1579 { 1580 struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg; 1581 struct ceph_inode_info *ci = ceph_inode(inode); 1582 LIST_HEAD(to_remove); 1583 bool dirty_dropped = false; 1584 bool invalidate = false; 1585 1586 dout("removing cap %p, ci is %p, inode is 
%p\n", 1587 cap, ci, &ci->vfs_inode); 1588 spin_lock(&ci->i_ceph_lock); 1589 __ceph_remove_cap(cap, false); 1590 if (!ci->i_auth_cap) { 1591 struct ceph_cap_flush *cf; 1592 struct ceph_mds_client *mdsc = fsc->mdsc; 1593 1594 if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) { 1595 if (inode->i_data.nrpages > 0) 1596 invalidate = true; 1597 if (ci->i_wrbuffer_ref > 0) 1598 mapping_set_error(&inode->i_data, -EIO); 1599 } 1600 1601 while (!list_empty(&ci->i_cap_flush_list)) { 1602 cf = list_first_entry(&ci->i_cap_flush_list, 1603 struct ceph_cap_flush, i_list); 1604 list_move(&cf->i_list, &to_remove); 1605 } 1606 1607 spin_lock(&mdsc->cap_dirty_lock); 1608 1609 list_for_each_entry(cf, &to_remove, i_list) 1610 list_del(&cf->g_list); 1611 1612 if (!list_empty(&ci->i_dirty_item)) { 1613 pr_warn_ratelimited( 1614 " dropping dirty %s state for %p %lld\n", 1615 ceph_cap_string(ci->i_dirty_caps), 1616 inode, ceph_ino(inode)); 1617 ci->i_dirty_caps = 0; 1618 list_del_init(&ci->i_dirty_item); 1619 dirty_dropped = true; 1620 } 1621 if (!list_empty(&ci->i_flushing_item)) { 1622 pr_warn_ratelimited( 1623 " dropping dirty+flushing %s state for %p %lld\n", 1624 ceph_cap_string(ci->i_flushing_caps), 1625 inode, ceph_ino(inode)); 1626 ci->i_flushing_caps = 0; 1627 list_del_init(&ci->i_flushing_item); 1628 mdsc->num_cap_flushing--; 1629 dirty_dropped = true; 1630 } 1631 spin_unlock(&mdsc->cap_dirty_lock); 1632 1633 if (dirty_dropped) { 1634 errseq_set(&ci->i_meta_err, -EIO); 1635 1636 if (ci->i_wrbuffer_ref_head == 0 && 1637 ci->i_wr_ref == 0 && 1638 ci->i_dirty_caps == 0 && 1639 ci->i_flushing_caps == 0) { 1640 ceph_put_snap_context(ci->i_head_snapc); 1641 ci->i_head_snapc = NULL; 1642 } 1643 } 1644 1645 if (atomic_read(&ci->i_filelock_ref) > 0) { 1646 /* make further file lock syscall return -EIO */ 1647 ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK; 1648 pr_warn_ratelimited(" dropping file locks for %p %lld\n", 1649 inode, ceph_ino(inode)); 1650 } 1651 1652 if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { 1653 list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove); 1654 ci->i_prealloc_cap_flush = NULL; 1655 } 1656 } 1657 spin_unlock(&ci->i_ceph_lock); 1658 while (!list_empty(&to_remove)) { 1659 struct ceph_cap_flush *cf; 1660 cf = list_first_entry(&to_remove, 1661 struct ceph_cap_flush, i_list); 1662 list_del(&cf->i_list); 1663 ceph_free_cap_flush(cf); 1664 } 1665 1666 wake_up_all(&ci->i_cap_wq); 1667 if (invalidate) 1668 ceph_queue_invalidate(inode); 1669 if (dirty_dropped) 1670 iput(inode); 1671 return 0; 1672 } 1673 1674 /* 1675 * caller must hold session s_mutex 1676 */ 1677 static void remove_session_caps(struct ceph_mds_session *session) 1678 { 1679 struct ceph_fs_client *fsc = session->s_mdsc->fsc; 1680 struct super_block *sb = fsc->sb; 1681 LIST_HEAD(dispose); 1682 1683 dout("remove_session_caps on %p\n", session); 1684 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc); 1685 1686 wake_up_all(&fsc->mdsc->cap_flushing_wq); 1687 1688 spin_lock(&session->s_cap_lock); 1689 if (session->s_nr_caps > 0) { 1690 struct inode *inode; 1691 struct ceph_cap *cap, *prev = NULL; 1692 struct ceph_vino vino; 1693 /* 1694 * iterate_session_caps() skips inodes that are being 1695 * deleted, we need to wait until deletions are complete. 1696 * __wait_on_freeing_inode() is designed for the job, 1697 * but it is not exported, so use lookup inode function 1698 * to access it. 
1699 */ 1700 while (!list_empty(&session->s_caps)) { 1701 cap = list_entry(session->s_caps.next, 1702 struct ceph_cap, session_caps); 1703 if (cap == prev) 1704 break; 1705 prev = cap; 1706 vino = cap->ci->i_vino; 1707 spin_unlock(&session->s_cap_lock); 1708 1709 inode = ceph_find_inode(sb, vino); 1710 /* avoid calling iput_final() while holding s_mutex */ 1711 ceph_async_iput(inode); 1712 1713 spin_lock(&session->s_cap_lock); 1714 } 1715 } 1716 1717 // drop cap expires and unlock s_cap_lock 1718 detach_cap_releases(session, &dispose); 1719 1720 BUG_ON(session->s_nr_caps > 0); 1721 BUG_ON(!list_empty(&session->s_cap_flushing)); 1722 spin_unlock(&session->s_cap_lock); 1723 dispose_cap_releases(session->s_mdsc, &dispose); 1724 } 1725 1726 enum { 1727 RECONNECT, 1728 RENEWCAPS, 1729 FORCE_RO, 1730 }; 1731 1732 /* 1733 * wake up any threads waiting on this session's caps. if the cap is 1734 * old (didn't get renewed on the client reconnect), remove it now. 1735 * 1736 * caller must hold s_mutex. 1737 */ 1738 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, 1739 void *arg) 1740 { 1741 struct ceph_inode_info *ci = ceph_inode(inode); 1742 unsigned long ev = (unsigned long)arg; 1743 1744 if (ev == RECONNECT) { 1745 spin_lock(&ci->i_ceph_lock); 1746 ci->i_wanted_max_size = 0; 1747 ci->i_requested_max_size = 0; 1748 spin_unlock(&ci->i_ceph_lock); 1749 } else if (ev == RENEWCAPS) { 1750 if (cap->cap_gen < cap->session->s_cap_gen) { 1751 /* mds did not re-issue stale cap */ 1752 spin_lock(&ci->i_ceph_lock); 1753 cap->issued = cap->implemented = CEPH_CAP_PIN; 1754 spin_unlock(&ci->i_ceph_lock); 1755 } 1756 } else if (ev == FORCE_RO) { 1757 } 1758 wake_up_all(&ci->i_cap_wq); 1759 return 0; 1760 } 1761 1762 static void wake_up_session_caps(struct ceph_mds_session *session, int ev) 1763 { 1764 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 1765 ceph_iterate_session_caps(session, wake_up_session_cb, 1766 (void *)(unsigned long)ev); 1767 } 1768 1769 /* 1770 * Send periodic message to MDS renewing all currently held caps. The 1771 * ack will reset the expiration for all caps from this session. 1772 * 1773 * caller holds s_mutex 1774 */ 1775 static int send_renew_caps(struct ceph_mds_client *mdsc, 1776 struct ceph_mds_session *session) 1777 { 1778 struct ceph_msg *msg; 1779 int state; 1780 1781 if (time_after_eq(jiffies, session->s_cap_ttl) && 1782 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 1783 pr_info("mds%d caps stale\n", session->s_mds); 1784 session->s_renew_requested = jiffies; 1785 1786 /* do not try to renew caps until a recovering mds has reconnected 1787 * with its clients. 
*/ 1788 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 1789 if (state < CEPH_MDS_STATE_RECONNECT) { 1790 dout("send_renew_caps ignoring mds%d (%s)\n", 1791 session->s_mds, ceph_mds_state_name(state)); 1792 return 0; 1793 } 1794 1795 dout("send_renew_caps to mds%d (%s)\n", session->s_mds, 1796 ceph_mds_state_name(state)); 1797 msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 1798 ++session->s_renew_seq); 1799 if (!msg) 1800 return -ENOMEM; 1801 ceph_con_send(&session->s_con, msg); 1802 return 0; 1803 } 1804 1805 static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 1806 struct ceph_mds_session *session, u64 seq) 1807 { 1808 struct ceph_msg *msg; 1809 1810 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", 1811 session->s_mds, ceph_session_state_name(session->s_state), seq); 1812 msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 1813 if (!msg) 1814 return -ENOMEM; 1815 ceph_con_send(&session->s_con, msg); 1816 return 0; 1817 } 1818 1819 1820 /* 1821 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 1822 * 1823 * Called under session->s_mutex 1824 */ 1825 static void renewed_caps(struct ceph_mds_client *mdsc, 1826 struct ceph_mds_session *session, int is_renew) 1827 { 1828 int was_stale; 1829 int wake = 0; 1830 1831 spin_lock(&session->s_cap_lock); 1832 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 1833 1834 session->s_cap_ttl = session->s_renew_requested + 1835 mdsc->mdsmap->m_session_timeout*HZ; 1836 1837 if (was_stale) { 1838 if (time_before(jiffies, session->s_cap_ttl)) { 1839 pr_info("mds%d caps renewed\n", session->s_mds); 1840 wake = 1; 1841 } else { 1842 pr_info("mds%d caps still stale\n", session->s_mds); 1843 } 1844 } 1845 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n", 1846 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh", 1847 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 1848 spin_unlock(&session->s_cap_lock); 1849 1850 if (wake) 1851 wake_up_session_caps(session, RENEWCAPS); 1852 } 1853 1854 /* 1855 * send a session close request 1856 */ 1857 static int request_close_session(struct ceph_mds_session *session) 1858 { 1859 struct ceph_msg *msg; 1860 1861 dout("request_close_session mds%d state %s seq %lld\n", 1862 session->s_mds, ceph_session_state_name(session->s_state), 1863 session->s_seq); 1864 msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); 1865 if (!msg) 1866 return -ENOMEM; 1867 ceph_con_send(&session->s_con, msg); 1868 return 1; 1869 } 1870 1871 /* 1872 * Called with s_mutex held. 1873 */ 1874 static int __close_session(struct ceph_mds_client *mdsc, 1875 struct ceph_mds_session *session) 1876 { 1877 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 1878 return 0; 1879 session->s_state = CEPH_MDS_SESSION_CLOSING; 1880 return request_close_session(session); 1881 } 1882 1883 static bool drop_negative_children(struct dentry *dentry) 1884 { 1885 struct dentry *child; 1886 bool all_negative = true; 1887 1888 if (!d_is_dir(dentry)) 1889 goto out; 1890 1891 spin_lock(&dentry->d_lock); 1892 list_for_each_entry(child, &dentry->d_subdirs, d_child) { 1893 if (d_really_is_positive(child)) { 1894 all_negative = false; 1895 break; 1896 } 1897 } 1898 spin_unlock(&dentry->d_lock); 1899 1900 if (all_negative) 1901 shrink_dcache_parent(dentry); 1902 out: 1903 return all_negative; 1904 } 1905 1906 /* 1907 * Trim old(er) caps. 
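 *
 * (In practice this path is driven by the MDS asking us to shed caps,
 *  e.g. a RECALL_STATE session message supplying the target count that
 *  gets passed to ceph_trim_caps() below.)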
1908 * 1909 * Because we can't cache an inode without one or more caps, we do 1910 * this indirectly: if a cap is unused, we prune its aliases, at which 1911 * point the inode will hopefully get dropped to. 1912 * 1913 * Yes, this is a bit sloppy. Our only real goal here is to respond to 1914 * memory pressure from the MDS, though, so it needn't be perfect. 1915 */ 1916 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) 1917 { 1918 int *remaining = arg; 1919 struct ceph_inode_info *ci = ceph_inode(inode); 1920 int used, wanted, oissued, mine; 1921 1922 if (*remaining <= 0) 1923 return -1; 1924 1925 spin_lock(&ci->i_ceph_lock); 1926 mine = cap->issued | cap->implemented; 1927 used = __ceph_caps_used(ci); 1928 wanted = __ceph_caps_file_wanted(ci); 1929 oissued = __ceph_caps_issued_other(ci, cap); 1930 1931 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n", 1932 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), 1933 ceph_cap_string(used), ceph_cap_string(wanted)); 1934 if (cap == ci->i_auth_cap) { 1935 if (ci->i_dirty_caps || ci->i_flushing_caps || 1936 !list_empty(&ci->i_cap_snaps)) 1937 goto out; 1938 if ((used | wanted) & CEPH_CAP_ANY_WR) 1939 goto out; 1940 /* Note: it's possible that i_filelock_ref becomes non-zero 1941 * after dropping auth caps. It doesn't hurt because reply 1942 * of lock mds request will re-add auth caps. */ 1943 if (atomic_read(&ci->i_filelock_ref) > 0) 1944 goto out; 1945 } 1946 /* The inode has cached pages, but it's no longer used. 1947 * we can safely drop it */ 1948 if (S_ISREG(inode->i_mode) && 1949 wanted == 0 && used == CEPH_CAP_FILE_CACHE && 1950 !(oissued & CEPH_CAP_FILE_CACHE)) { 1951 used = 0; 1952 oissued = 0; 1953 } 1954 if ((used | wanted) & ~oissued & mine) 1955 goto out; /* we need these caps */ 1956 1957 if (oissued) { 1958 /* we aren't the only cap.. just remove us */ 1959 __ceph_remove_cap(cap, true); 1960 (*remaining)--; 1961 } else { 1962 struct dentry *dentry; 1963 /* try dropping referring dentries */ 1964 spin_unlock(&ci->i_ceph_lock); 1965 dentry = d_find_any_alias(inode); 1966 if (dentry && drop_negative_children(dentry)) { 1967 int count; 1968 dput(dentry); 1969 d_prune_aliases(inode); 1970 count = atomic_read(&inode->i_count); 1971 if (count == 1) 1972 (*remaining)--; 1973 dout("trim_caps_cb %p cap %p pruned, count now %d\n", 1974 inode, cap, count); 1975 } else { 1976 dput(dentry); 1977 } 1978 return 0; 1979 } 1980 1981 out: 1982 spin_unlock(&ci->i_ceph_lock); 1983 return 0; 1984 } 1985 1986 /* 1987 * Trim session cap count down to some max number. 
1988 */ 1989 int ceph_trim_caps(struct ceph_mds_client *mdsc, 1990 struct ceph_mds_session *session, 1991 int max_caps) 1992 { 1993 int trim_caps = session->s_nr_caps - max_caps; 1994 1995 dout("trim_caps mds%d start: %d / %d, trim %d\n", 1996 session->s_mds, session->s_nr_caps, max_caps, trim_caps); 1997 if (trim_caps > 0) { 1998 int remaining = trim_caps; 1999 2000 ceph_iterate_session_caps(session, trim_caps_cb, &remaining); 2001 dout("trim_caps mds%d done: %d / %d, trimmed %d\n", 2002 session->s_mds, session->s_nr_caps, max_caps, 2003 trim_caps - remaining); 2004 } 2005 2006 ceph_flush_cap_releases(mdsc, session); 2007 return 0; 2008 } 2009 2010 static int check_caps_flush(struct ceph_mds_client *mdsc, 2011 u64 want_flush_tid) 2012 { 2013 int ret = 1; 2014 2015 spin_lock(&mdsc->cap_dirty_lock); 2016 if (!list_empty(&mdsc->cap_flush_list)) { 2017 struct ceph_cap_flush *cf = 2018 list_first_entry(&mdsc->cap_flush_list, 2019 struct ceph_cap_flush, g_list); 2020 if (cf->tid <= want_flush_tid) { 2021 dout("check_caps_flush still flushing tid " 2022 "%llu <= %llu\n", cf->tid, want_flush_tid); 2023 ret = 0; 2024 } 2025 } 2026 spin_unlock(&mdsc->cap_dirty_lock); 2027 return ret; 2028 } 2029 2030 /* 2031 * flush all dirty inode data to disk. 2032 * 2033 * returns true if we've flushed through want_flush_tid 2034 */ 2035 static void wait_caps_flush(struct ceph_mds_client *mdsc, 2036 u64 want_flush_tid) 2037 { 2038 dout("check_caps_flush want %llu\n", want_flush_tid); 2039 2040 wait_event(mdsc->cap_flushing_wq, 2041 check_caps_flush(mdsc, want_flush_tid)); 2042 2043 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); 2044 } 2045 2046 /* 2047 * called under s_mutex 2048 */ 2049 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 2050 struct ceph_mds_session *session) 2051 { 2052 struct ceph_msg *msg = NULL; 2053 struct ceph_mds_cap_release *head; 2054 struct ceph_mds_cap_item *item; 2055 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 2056 struct ceph_cap *cap; 2057 LIST_HEAD(tmp_list); 2058 int num_cap_releases; 2059 __le32 barrier, *cap_barrier; 2060 2061 down_read(&osdc->lock); 2062 barrier = cpu_to_le32(osdc->epoch_barrier); 2063 up_read(&osdc->lock); 2064 2065 spin_lock(&session->s_cap_lock); 2066 again: 2067 list_splice_init(&session->s_cap_releases, &tmp_list); 2068 num_cap_releases = session->s_num_cap_releases; 2069 session->s_num_cap_releases = 0; 2070 spin_unlock(&session->s_cap_lock); 2071 2072 while (!list_empty(&tmp_list)) { 2073 if (!msg) { 2074 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 2075 PAGE_SIZE, GFP_NOFS, false); 2076 if (!msg) 2077 goto out_err; 2078 head = msg->front.iov_base; 2079 head->num = cpu_to_le32(0); 2080 msg->front.iov_len = sizeof(*head); 2081 2082 msg->hdr.version = cpu_to_le16(2); 2083 msg->hdr.compat_version = cpu_to_le16(1); 2084 } 2085 2086 cap = list_first_entry(&tmp_list, struct ceph_cap, 2087 session_caps); 2088 list_del(&cap->session_caps); 2089 num_cap_releases--; 2090 2091 head = msg->front.iov_base; 2092 put_unaligned_le32(get_unaligned_le32(&head->num) + 1, 2093 &head->num); 2094 item = msg->front.iov_base + msg->front.iov_len; 2095 item->ino = cpu_to_le64(cap->cap_ino); 2096 item->cap_id = cpu_to_le64(cap->cap_id); 2097 item->migrate_seq = cpu_to_le32(cap->mseq); 2098 item->seq = cpu_to_le32(cap->issue_seq); 2099 msg->front.iov_len += sizeof(*item); 2100 2101 ceph_put_cap(mdsc, cap); 2102 2103 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 2104 // Append cap_barrier field 2105 cap_barrier = 
msg->front.iov_base + msg->front.iov_len; 2106 *cap_barrier = barrier; 2107 msg->front.iov_len += sizeof(*cap_barrier); 2108 2109 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2110 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2111 ceph_con_send(&session->s_con, msg); 2112 msg = NULL; 2113 } 2114 } 2115 2116 BUG_ON(num_cap_releases != 0); 2117 2118 spin_lock(&session->s_cap_lock); 2119 if (!list_empty(&session->s_cap_releases)) 2120 goto again; 2121 spin_unlock(&session->s_cap_lock); 2122 2123 if (msg) { 2124 // Append cap_barrier field 2125 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2126 *cap_barrier = barrier; 2127 msg->front.iov_len += sizeof(*cap_barrier); 2128 2129 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2130 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2131 ceph_con_send(&session->s_con, msg); 2132 } 2133 return; 2134 out_err: 2135 pr_err("send_cap_releases mds%d, failed to allocate message\n", 2136 session->s_mds); 2137 spin_lock(&session->s_cap_lock); 2138 list_splice(&tmp_list, &session->s_cap_releases); 2139 session->s_num_cap_releases += num_cap_releases; 2140 spin_unlock(&session->s_cap_lock); 2141 } 2142 2143 static void ceph_cap_release_work(struct work_struct *work) 2144 { 2145 struct ceph_mds_session *session = 2146 container_of(work, struct ceph_mds_session, s_cap_release_work); 2147 2148 mutex_lock(&session->s_mutex); 2149 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2150 session->s_state == CEPH_MDS_SESSION_HUNG) 2151 ceph_send_cap_releases(session->s_mdsc, session); 2152 mutex_unlock(&session->s_mutex); 2153 ceph_put_mds_session(session); 2154 } 2155 2156 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, 2157 struct ceph_mds_session *session) 2158 { 2159 if (mdsc->stopping) 2160 return; 2161 2162 ceph_get_mds_session(session); 2163 if (queue_work(mdsc->fsc->cap_wq, 2164 &session->s_cap_release_work)) { 2165 dout("cap release work queued\n"); 2166 } else { 2167 ceph_put_mds_session(session); 2168 dout("failed to queue cap release work\n"); 2169 } 2170 } 2171 2172 /* 2173 * caller holds session->s_cap_lock 2174 */ 2175 void __ceph_queue_cap_release(struct ceph_mds_session *session, 2176 struct ceph_cap *cap) 2177 { 2178 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2179 session->s_num_cap_releases++; 2180 2181 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2182 ceph_flush_cap_releases(session->s_mdsc, session); 2183 } 2184 2185 static void ceph_cap_reclaim_work(struct work_struct *work) 2186 { 2187 struct ceph_mds_client *mdsc = 2188 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2189 int ret = ceph_trim_dentries(mdsc); 2190 if (ret == -EAGAIN) 2191 ceph_queue_cap_reclaim_work(mdsc); 2192 } 2193 2194 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2195 { 2196 if (mdsc->stopping) 2197 return; 2198 2199 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2200 dout("caps reclaim work queued\n"); 2201 } else { 2202 dout("failed to queue caps release work\n"); 2203 } 2204 } 2205 2206 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) 2207 { 2208 int val; 2209 if (!nr) 2210 return; 2211 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2212 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2213 atomic_set(&mdsc->cap_reclaim_pending, 0); 2214 ceph_queue_cap_reclaim_work(mdsc); 2215 } 2216 } 2217 2218 /* 2219 * requests 2220 */ 2221 2222 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2223 struct inode *dir) 
2224 { 2225 struct ceph_inode_info *ci = ceph_inode(dir); 2226 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2227 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2228 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2229 unsigned int num_entries; 2230 int order; 2231 2232 spin_lock(&ci->i_ceph_lock); 2233 num_entries = ci->i_files + ci->i_subdirs; 2234 spin_unlock(&ci->i_ceph_lock); 2235 num_entries = max(num_entries, 1U); 2236 num_entries = min(num_entries, opt->max_readdir); 2237 2238 order = get_order(size * num_entries); 2239 while (order >= 0) { 2240 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2241 __GFP_NOWARN, 2242 order); 2243 if (rinfo->dir_entries) 2244 break; 2245 order--; 2246 } 2247 if (!rinfo->dir_entries) 2248 return -ENOMEM; 2249 2250 num_entries = (PAGE_SIZE << order) / size; 2251 num_entries = min(num_entries, opt->max_readdir); 2252 2253 rinfo->dir_buf_size = PAGE_SIZE << order; 2254 req->r_num_caps = num_entries + 1; 2255 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2256 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2257 return 0; 2258 } 2259 2260 /* 2261 * Create an mds request. 2262 */ 2263 struct ceph_mds_request * 2264 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2265 { 2266 struct ceph_mds_request *req; 2267 2268 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2269 if (!req) 2270 return ERR_PTR(-ENOMEM); 2271 2272 mutex_init(&req->r_fill_mutex); 2273 req->r_mdsc = mdsc; 2274 req->r_started = jiffies; 2275 req->r_start_latency = ktime_get(); 2276 req->r_resend_mds = -1; 2277 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2278 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2279 req->r_fmode = -1; 2280 kref_init(&req->r_kref); 2281 RB_CLEAR_NODE(&req->r_node); 2282 INIT_LIST_HEAD(&req->r_wait); 2283 init_completion(&req->r_completion); 2284 init_completion(&req->r_safe_completion); 2285 INIT_LIST_HEAD(&req->r_unsafe_item); 2286 2287 ktime_get_coarse_real_ts64(&req->r_stamp); 2288 2289 req->r_op = op; 2290 req->r_direct_mode = mode; 2291 return req; 2292 } 2293 2294 /* 2295 * return oldest (lowest) request, tid in request tree, 0 if none. 2296 * 2297 * called under mdsc->mutex. 2298 */ 2299 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2300 { 2301 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2302 return NULL; 2303 return rb_entry(rb_first(&mdsc->request_tree), 2304 struct ceph_mds_request, r_node); 2305 } 2306 2307 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2308 { 2309 return mdsc->oldest_tid; 2310 } 2311 2312 /* 2313 * Build a dentry's path. Allocate on heap; caller must kfree. Based 2314 * on build_path_from_dentry in fs/cifs/dir.c. 2315 * 2316 * If @stop_on_nosnap, generate path relative to the first non-snapped 2317 * inode. 2318 * 2319 * Encode hidden .snap dirs as a double /, i.e. 
2320 * foo/.snap/bar -> foo//bar 2321 */ 2322 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase, 2323 int stop_on_nosnap) 2324 { 2325 struct dentry *temp; 2326 char *path; 2327 int pos; 2328 unsigned seq; 2329 u64 base; 2330 2331 if (!dentry) 2332 return ERR_PTR(-EINVAL); 2333 2334 path = __getname(); 2335 if (!path) 2336 return ERR_PTR(-ENOMEM); 2337 retry: 2338 pos = PATH_MAX - 1; 2339 path[pos] = '\0'; 2340 2341 seq = read_seqbegin(&rename_lock); 2342 rcu_read_lock(); 2343 temp = dentry; 2344 for (;;) { 2345 struct inode *inode; 2346 2347 spin_lock(&temp->d_lock); 2348 inode = d_inode(temp); 2349 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2350 dout("build_path path+%d: %p SNAPDIR\n", 2351 pos, temp); 2352 } else if (stop_on_nosnap && inode && dentry != temp && 2353 ceph_snap(inode) == CEPH_NOSNAP) { 2354 spin_unlock(&temp->d_lock); 2355 pos++; /* get rid of any prepended '/' */ 2356 break; 2357 } else { 2358 pos -= temp->d_name.len; 2359 if (pos < 0) { 2360 spin_unlock(&temp->d_lock); 2361 break; 2362 } 2363 memcpy(path + pos, temp->d_name.name, temp->d_name.len); 2364 } 2365 spin_unlock(&temp->d_lock); 2366 temp = READ_ONCE(temp->d_parent); 2367 2368 /* Are we at the root? */ 2369 if (IS_ROOT(temp)) 2370 break; 2371 2372 /* Are we out of buffer? */ 2373 if (--pos < 0) 2374 break; 2375 2376 path[pos] = '/'; 2377 } 2378 base = ceph_ino(d_inode(temp)); 2379 rcu_read_unlock(); 2380 2381 if (read_seqretry(&rename_lock, seq)) 2382 goto retry; 2383 2384 if (pos < 0) { 2385 /* 2386 * A rename didn't occur, but somehow we didn't end up where 2387 * we thought we would. Throw a warning and try again. 2388 */ 2389 pr_warn("build_path did not end path lookup where " 2390 "expected, pos is %d\n", pos); 2391 goto retry; 2392 } 2393 2394 *pbase = base; 2395 *plen = PATH_MAX - 1 - pos; 2396 dout("build_path on %p %d built %llx '%.*s'\n", 2397 dentry, d_count(dentry), base, *plen, path + pos); 2398 return path + pos; 2399 } 2400 2401 static int build_dentry_path(struct dentry *dentry, struct inode *dir, 2402 const char **ppath, int *ppathlen, u64 *pino, 2403 bool *pfreepath, bool parent_locked) 2404 { 2405 char *path; 2406 2407 rcu_read_lock(); 2408 if (!dir) 2409 dir = d_inode_rcu(dentry->d_parent); 2410 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) { 2411 *pino = ceph_ino(dir); 2412 rcu_read_unlock(); 2413 *ppath = dentry->d_name.name; 2414 *ppathlen = dentry->d_name.len; 2415 return 0; 2416 } 2417 rcu_read_unlock(); 2418 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2419 if (IS_ERR(path)) 2420 return PTR_ERR(path); 2421 *ppath = path; 2422 *pfreepath = true; 2423 return 0; 2424 } 2425 2426 static int build_inode_path(struct inode *inode, 2427 const char **ppath, int *ppathlen, u64 *pino, 2428 bool *pfreepath) 2429 { 2430 struct dentry *dentry; 2431 char *path; 2432 2433 if (ceph_snap(inode) == CEPH_NOSNAP) { 2434 *pino = ceph_ino(inode); 2435 *ppathlen = 0; 2436 return 0; 2437 } 2438 dentry = d_find_alias(inode); 2439 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2440 dput(dentry); 2441 if (IS_ERR(path)) 2442 return PTR_ERR(path); 2443 *ppath = path; 2444 *pfreepath = true; 2445 return 0; 2446 } 2447 2448 /* 2449 * request arguments may be specified via an inode *, a dentry *, or 2450 * an explicit ino+path. 
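 * set_request_path_attr() normalizes all three forms into an (ino, path)
 * pair for the wire encoding; a dentry is encoded as just (parent ino,
 * name) when the parent is locked and not snapped, otherwise the full
 * path is rebuilt with ceph_mdsc_build_path().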
2451 */ 2452 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, 2453 struct inode *rdiri, const char *rpath, 2454 u64 rino, const char **ppath, int *pathlen, 2455 u64 *ino, bool *freepath, bool parent_locked) 2456 { 2457 int r = 0; 2458 2459 if (rinode) { 2460 r = build_inode_path(rinode, ppath, pathlen, ino, freepath); 2461 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 2462 ceph_snap(rinode)); 2463 } else if (rdentry) { 2464 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino, 2465 freepath, parent_locked); 2466 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, 2467 *ppath); 2468 } else if (rpath || rino) { 2469 *ino = rino; 2470 *ppath = rpath; 2471 *pathlen = rpath ? strlen(rpath) : 0; 2472 dout(" path %.*s\n", *pathlen, rpath); 2473 } 2474 2475 return r; 2476 } 2477 2478 static void encode_timestamp_and_gids(void **p, 2479 const struct ceph_mds_request *req) 2480 { 2481 struct ceph_timespec ts; 2482 int i; 2483 2484 ceph_encode_timespec64(&ts, &req->r_stamp); 2485 ceph_encode_copy(p, &ts, sizeof(ts)); 2486 2487 /* gid_list */ 2488 ceph_encode_32(p, req->r_cred->group_info->ngroups); 2489 for (i = 0; i < req->r_cred->group_info->ngroups; i++) 2490 ceph_encode_64(p, from_kgid(&init_user_ns, 2491 req->r_cred->group_info->gid[i])); 2492 } 2493 2494 /* 2495 * called under mdsc->mutex 2496 */ 2497 static struct ceph_msg *create_request_message(struct ceph_mds_session *session, 2498 struct ceph_mds_request *req, 2499 bool drop_cap_releases) 2500 { 2501 int mds = session->s_mds; 2502 struct ceph_mds_client *mdsc = session->s_mdsc; 2503 struct ceph_msg *msg; 2504 struct ceph_mds_request_head_old *head; 2505 const char *path1 = NULL; 2506 const char *path2 = NULL; 2507 u64 ino1 = 0, ino2 = 0; 2508 int pathlen1 = 0, pathlen2 = 0; 2509 bool freepath1 = false, freepath2 = false; 2510 int len; 2511 u16 releases; 2512 void *p, *end; 2513 int ret; 2514 bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME); 2515 2516 ret = set_request_path_attr(req->r_inode, req->r_dentry, 2517 req->r_parent, req->r_path1, req->r_ino1.ino, 2518 &path1, &pathlen1, &ino1, &freepath1, 2519 test_bit(CEPH_MDS_R_PARENT_LOCKED, 2520 &req->r_req_flags)); 2521 if (ret < 0) { 2522 msg = ERR_PTR(ret); 2523 goto out; 2524 } 2525 2526 /* If r_old_dentry is set, then assume that its parent is locked */ 2527 ret = set_request_path_attr(NULL, req->r_old_dentry, 2528 req->r_old_dentry_dir, 2529 req->r_path2, req->r_ino2.ino, 2530 &path2, &pathlen2, &ino2, &freepath2, true); 2531 if (ret < 0) { 2532 msg = ERR_PTR(ret); 2533 goto out_free1; 2534 } 2535 2536 len = legacy ? 
sizeof(*head) : sizeof(struct ceph_mds_request_head); 2537 len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + 2538 sizeof(struct ceph_timespec); 2539 len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups); 2540 2541 /* calculate (max) length for cap releases */ 2542 len += sizeof(struct ceph_mds_request_release) * 2543 (!!req->r_inode_drop + !!req->r_dentry_drop + 2544 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 2545 2546 if (req->r_dentry_drop) 2547 len += pathlen1; 2548 if (req->r_old_dentry_drop) 2549 len += pathlen2; 2550 2551 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); 2552 if (!msg) { 2553 msg = ERR_PTR(-ENOMEM); 2554 goto out_free2; 2555 } 2556 2557 msg->hdr.tid = cpu_to_le64(req->r_tid); 2558 2559 /* 2560 * The old ceph_mds_request_head didn't contain a version field, and 2561 * one was added when we moved the message version from 3->4. 2562 */ 2563 if (legacy) { 2564 msg->hdr.version = cpu_to_le16(3); 2565 head = msg->front.iov_base; 2566 p = msg->front.iov_base + sizeof(*head); 2567 } else { 2568 struct ceph_mds_request_head *new_head = msg->front.iov_base; 2569 2570 msg->hdr.version = cpu_to_le16(4); 2571 new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION); 2572 head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; 2573 p = msg->front.iov_base + sizeof(*new_head); 2574 } 2575 2576 end = msg->front.iov_base + msg->front.iov_len; 2577 2578 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 2579 head->op = cpu_to_le32(req->r_op); 2580 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, 2581 req->r_cred->fsuid)); 2582 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, 2583 req->r_cred->fsgid)); 2584 head->ino = cpu_to_le64(req->r_deleg_ino); 2585 head->args = req->r_args; 2586 2587 ceph_encode_filepath(&p, end, ino1, path1); 2588 ceph_encode_filepath(&p, end, ino2, path2); 2589 2590 /* make note of release offset, in case we need to replay */ 2591 req->r_request_release_offset = p - msg->front.iov_base; 2592 2593 /* cap releases */ 2594 releases = 0; 2595 if (req->r_inode_drop) 2596 releases += ceph_encode_inode_release(&p, 2597 req->r_inode ? 
req->r_inode : d_inode(req->r_dentry), 2598 mds, req->r_inode_drop, req->r_inode_unless, 2599 req->r_op == CEPH_MDS_OP_READDIR); 2600 if (req->r_dentry_drop) 2601 releases += ceph_encode_dentry_release(&p, req->r_dentry, 2602 req->r_parent, mds, req->r_dentry_drop, 2603 req->r_dentry_unless); 2604 if (req->r_old_dentry_drop) 2605 releases += ceph_encode_dentry_release(&p, req->r_old_dentry, 2606 req->r_old_dentry_dir, mds, 2607 req->r_old_dentry_drop, 2608 req->r_old_dentry_unless); 2609 if (req->r_old_inode_drop) 2610 releases += ceph_encode_inode_release(&p, 2611 d_inode(req->r_old_dentry), 2612 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 2613 2614 if (drop_cap_releases) { 2615 releases = 0; 2616 p = msg->front.iov_base + req->r_request_release_offset; 2617 } 2618 2619 head->num_releases = cpu_to_le16(releases); 2620 2621 encode_timestamp_and_gids(&p, req); 2622 2623 if (WARN_ON_ONCE(p > end)) { 2624 ceph_msg_put(msg); 2625 msg = ERR_PTR(-ERANGE); 2626 goto out_free2; 2627 } 2628 2629 msg->front.iov_len = p - msg->front.iov_base; 2630 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2631 2632 if (req->r_pagelist) { 2633 struct ceph_pagelist *pagelist = req->r_pagelist; 2634 ceph_msg_data_add_pagelist(msg, pagelist); 2635 msg->hdr.data_len = cpu_to_le32(pagelist->length); 2636 } else { 2637 msg->hdr.data_len = 0; 2638 } 2639 2640 msg->hdr.data_off = cpu_to_le16(0); 2641 2642 out_free2: 2643 if (freepath2) 2644 ceph_mdsc_free_path((char *)path2, pathlen2); 2645 out_free1: 2646 if (freepath1) 2647 ceph_mdsc_free_path((char *)path1, pathlen1); 2648 out: 2649 return msg; 2650 } 2651 2652 /* 2653 * called under mdsc->mutex if error, under no mutex if 2654 * success. 2655 */ 2656 static void complete_request(struct ceph_mds_client *mdsc, 2657 struct ceph_mds_request *req) 2658 { 2659 req->r_end_latency = ktime_get(); 2660 2661 if (req->r_callback) 2662 req->r_callback(mdsc, req); 2663 complete_all(&req->r_completion); 2664 } 2665 2666 static struct ceph_mds_request_head_old * 2667 find_old_request_head(void *p, u64 features) 2668 { 2669 bool legacy = !(features & CEPH_FEATURE_FS_BTIME); 2670 struct ceph_mds_request_head *new_head; 2671 2672 if (legacy) 2673 return (struct ceph_mds_request_head_old *)p; 2674 new_head = (struct ceph_mds_request_head *)p; 2675 return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; 2676 } 2677 2678 /* 2679 * called under mdsc->mutex 2680 */ 2681 static int __prepare_send_request(struct ceph_mds_session *session, 2682 struct ceph_mds_request *req, 2683 bool drop_cap_releases) 2684 { 2685 int mds = session->s_mds; 2686 struct ceph_mds_client *mdsc = session->s_mdsc; 2687 struct ceph_mds_request_head_old *rhead; 2688 struct ceph_msg *msg; 2689 int flags = 0; 2690 2691 req->r_attempts++; 2692 if (req->r_inode) { 2693 struct ceph_cap *cap = 2694 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 2695 2696 if (cap) 2697 req->r_sent_on_mseq = cap->mseq; 2698 else 2699 req->r_sent_on_mseq = -1; 2700 } 2701 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, 2702 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 2703 2704 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 2705 void *p; 2706 2707 /* 2708 * Replay. Do not regenerate message (and rebuild 2709 * paths, etc.); just use the original message. 2710 * Rebuilding paths will break for renames because 2711 * d_move mangles the src name. 
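 * Only the timestamp/gid block is re-encoded in place (at
 * r_request_release_offset), and any cap/dentry releases are dropped
 * from the replayed message.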
2712 */ 2713 msg = req->r_request; 2714 rhead = find_old_request_head(msg->front.iov_base, 2715 session->s_con.peer_features); 2716 2717 flags = le32_to_cpu(rhead->flags); 2718 flags |= CEPH_MDS_FLAG_REPLAY; 2719 rhead->flags = cpu_to_le32(flags); 2720 2721 if (req->r_target_inode) 2722 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 2723 2724 rhead->num_retry = req->r_attempts - 1; 2725 2726 /* remove cap/dentry releases from message */ 2727 rhead->num_releases = 0; 2728 2729 p = msg->front.iov_base + req->r_request_release_offset; 2730 encode_timestamp_and_gids(&p, req); 2731 2732 msg->front.iov_len = p - msg->front.iov_base; 2733 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2734 return 0; 2735 } 2736 2737 if (req->r_request) { 2738 ceph_msg_put(req->r_request); 2739 req->r_request = NULL; 2740 } 2741 msg = create_request_message(session, req, drop_cap_releases); 2742 if (IS_ERR(msg)) { 2743 req->r_err = PTR_ERR(msg); 2744 return PTR_ERR(msg); 2745 } 2746 req->r_request = msg; 2747 2748 rhead = find_old_request_head(msg->front.iov_base, 2749 session->s_con.peer_features); 2750 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 2751 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2752 flags |= CEPH_MDS_FLAG_REPLAY; 2753 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) 2754 flags |= CEPH_MDS_FLAG_ASYNC; 2755 if (req->r_parent) 2756 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 2757 rhead->flags = cpu_to_le32(flags); 2758 rhead->num_fwd = req->r_num_fwd; 2759 rhead->num_retry = req->r_attempts - 1; 2760 2761 dout(" r_parent = %p\n", req->r_parent); 2762 return 0; 2763 } 2764 2765 /* 2766 * called under mdsc->mutex 2767 */ 2768 static int __send_request(struct ceph_mds_session *session, 2769 struct ceph_mds_request *req, 2770 bool drop_cap_releases) 2771 { 2772 int err; 2773 2774 err = __prepare_send_request(session, req, drop_cap_releases); 2775 if (!err) { 2776 ceph_msg_get(req->r_request); 2777 ceph_con_send(&session->s_con, req->r_request); 2778 } 2779 2780 return err; 2781 } 2782 2783 /* 2784 * send request, or put it on the appropriate wait list. 
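 * A request may be parked on mdsc->waiting_for_map (no usable mdsmap or
 * no active MDS yet) or on session->s_waiting (session still opening);
 * __wake_requests() re-runs it once the relevant state changes.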
2785 */ 2786 static void __do_request(struct ceph_mds_client *mdsc, 2787 struct ceph_mds_request *req) 2788 { 2789 struct ceph_mds_session *session = NULL; 2790 int mds = -1; 2791 int err = 0; 2792 bool random; 2793 2794 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 2795 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 2796 __unregister_request(mdsc, req); 2797 return; 2798 } 2799 2800 if (req->r_timeout && 2801 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 2802 dout("do_request timed out\n"); 2803 err = -ETIMEDOUT; 2804 goto finish; 2805 } 2806 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 2807 dout("do_request forced umount\n"); 2808 err = -EIO; 2809 goto finish; 2810 } 2811 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 2812 if (mdsc->mdsmap_err) { 2813 err = mdsc->mdsmap_err; 2814 dout("do_request mdsmap err %d\n", err); 2815 goto finish; 2816 } 2817 if (mdsc->mdsmap->m_epoch == 0) { 2818 dout("do_request no mdsmap, waiting for map\n"); 2819 list_add(&req->r_wait, &mdsc->waiting_for_map); 2820 return; 2821 } 2822 if (!(mdsc->fsc->mount_options->flags & 2823 CEPH_MOUNT_OPT_MOUNTWAIT) && 2824 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { 2825 err = -EHOSTUNREACH; 2826 goto finish; 2827 } 2828 } 2829 2830 put_request_session(req); 2831 2832 mds = __choose_mds(mdsc, req, &random); 2833 if (mds < 0 || 2834 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 2835 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2836 err = -EJUKEBOX; 2837 goto finish; 2838 } 2839 dout("do_request no mds or not active, waiting for map\n"); 2840 list_add(&req->r_wait, &mdsc->waiting_for_map); 2841 return; 2842 } 2843 2844 /* get, open session */ 2845 session = __ceph_lookup_mds_session(mdsc, mds); 2846 if (!session) { 2847 session = register_session(mdsc, mds); 2848 if (IS_ERR(session)) { 2849 err = PTR_ERR(session); 2850 goto finish; 2851 } 2852 } 2853 req->r_session = ceph_get_mds_session(session); 2854 2855 dout("do_request mds%d session %p state %s\n", mds, session, 2856 ceph_session_state_name(session->s_state)); 2857 if (session->s_state != CEPH_MDS_SESSION_OPEN && 2858 session->s_state != CEPH_MDS_SESSION_HUNG) { 2859 /* 2860 * We cannot queue async requests since the caps and delegated 2861 * inodes are bound to the session. Just return -EJUKEBOX and 2862 * let the caller retry a sync request in that case. 2863 */ 2864 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2865 err = -EJUKEBOX; 2866 goto out_session; 2867 } 2868 2869 /* 2870 * If the session has been REJECTED, then return a hard error, 2871 * unless it's a CLEANRECOVER mount, in which case we'll queue 2872 * it to the mdsc queue. 
2873 */ 2874 if (session->s_state == CEPH_MDS_SESSION_REJECTED) { 2875 if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER)) 2876 list_add(&req->r_wait, &mdsc->waiting_for_map); 2877 else 2878 err = -EACCES; 2879 goto out_session; 2880 } 2881 2882 if (session->s_state == CEPH_MDS_SESSION_NEW || 2883 session->s_state == CEPH_MDS_SESSION_CLOSING) { 2884 err = __open_session(mdsc, session); 2885 if (err) 2886 goto out_session; 2887 /* retry the same mds later */ 2888 if (random) 2889 req->r_resend_mds = mds; 2890 } 2891 list_add(&req->r_wait, &session->s_waiting); 2892 goto out_session; 2893 } 2894 2895 /* send request */ 2896 req->r_resend_mds = -1; /* forget any previous mds hint */ 2897 2898 if (req->r_request_started == 0) /* note request start time */ 2899 req->r_request_started = jiffies; 2900 2901 err = __send_request(session, req, false); 2902 2903 out_session: 2904 ceph_put_mds_session(session); 2905 finish: 2906 if (err) { 2907 dout("__do_request early error %d\n", err); 2908 req->r_err = err; 2909 complete_request(mdsc, req); 2910 __unregister_request(mdsc, req); 2911 } 2912 return; 2913 } 2914 2915 /* 2916 * called under mdsc->mutex 2917 */ 2918 static void __wake_requests(struct ceph_mds_client *mdsc, 2919 struct list_head *head) 2920 { 2921 struct ceph_mds_request *req; 2922 LIST_HEAD(tmp_list); 2923 2924 list_splice_init(head, &tmp_list); 2925 2926 while (!list_empty(&tmp_list)) { 2927 req = list_entry(tmp_list.next, 2928 struct ceph_mds_request, r_wait); 2929 list_del_init(&req->r_wait); 2930 dout(" wake request %p tid %llu\n", req, req->r_tid); 2931 __do_request(mdsc, req); 2932 } 2933 } 2934 2935 /* 2936 * Wake up threads with requests pending for @mds, so that they can 2937 * resubmit their requests to a possibly different mds. 2938 */ 2939 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 2940 { 2941 struct ceph_mds_request *req; 2942 struct rb_node *p = rb_first(&mdsc->request_tree); 2943 2944 dout("kick_requests mds%d\n", mds); 2945 while (p) { 2946 req = rb_entry(p, struct ceph_mds_request, r_node); 2947 p = rb_next(p); 2948 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2949 continue; 2950 if (req->r_attempts > 0) 2951 continue; /* only new requests */ 2952 if (req->r_session && 2953 req->r_session->s_mds == mds) { 2954 dout(" kicking tid %llu\n", req->r_tid); 2955 list_del_init(&req->r_wait); 2956 __do_request(mdsc, req); 2957 } 2958 } 2959 } 2960 2961 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, 2962 struct ceph_mds_request *req) 2963 { 2964 int err = 0; 2965 2966 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ 2967 if (req->r_inode) 2968 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 2969 if (req->r_parent) { 2970 struct ceph_inode_info *ci = ceph_inode(req->r_parent); 2971 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ? 
2972 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD; 2973 spin_lock(&ci->i_ceph_lock); 2974 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false); 2975 __ceph_touch_fmode(ci, mdsc, fmode); 2976 spin_unlock(&ci->i_ceph_lock); 2977 ihold(req->r_parent); 2978 } 2979 if (req->r_old_dentry_dir) 2980 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 2981 CEPH_CAP_PIN); 2982 2983 if (req->r_inode) { 2984 err = ceph_wait_on_async_create(req->r_inode); 2985 if (err) { 2986 dout("%s: wait for async create returned: %d\n", 2987 __func__, err); 2988 return err; 2989 } 2990 } 2991 2992 if (!err && req->r_old_inode) { 2993 err = ceph_wait_on_async_create(req->r_old_inode); 2994 if (err) { 2995 dout("%s: wait for async create returned: %d\n", 2996 __func__, err); 2997 return err; 2998 } 2999 } 3000 3001 dout("submit_request on %p for inode %p\n", req, dir); 3002 mutex_lock(&mdsc->mutex); 3003 __register_request(mdsc, req, dir); 3004 __do_request(mdsc, req); 3005 err = req->r_err; 3006 mutex_unlock(&mdsc->mutex); 3007 return err; 3008 } 3009 3010 static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, 3011 struct ceph_mds_request *req) 3012 { 3013 int err; 3014 3015 /* wait */ 3016 dout("do_request waiting\n"); 3017 if (!req->r_timeout && req->r_wait_for_completion) { 3018 err = req->r_wait_for_completion(mdsc, req); 3019 } else { 3020 long timeleft = wait_for_completion_killable_timeout( 3021 &req->r_completion, 3022 ceph_timeout_jiffies(req->r_timeout)); 3023 if (timeleft > 0) 3024 err = 0; 3025 else if (!timeleft) 3026 err = -ETIMEDOUT; /* timed out */ 3027 else 3028 err = timeleft; /* killed */ 3029 } 3030 dout("do_request waited, got %d\n", err); 3031 mutex_lock(&mdsc->mutex); 3032 3033 /* only abort if we didn't race with a real reply */ 3034 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 3035 err = le32_to_cpu(req->r_reply_info.head->result); 3036 } else if (err < 0) { 3037 dout("aborted request %lld with %d\n", req->r_tid, err); 3038 3039 /* 3040 * ensure we aren't running concurrently with 3041 * ceph_fill_trace or ceph_readdir_prepopulate, which 3042 * rely on locks (dir mutex) held by our caller. 3043 */ 3044 mutex_lock(&req->r_fill_mutex); 3045 req->r_err = err; 3046 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3047 mutex_unlock(&req->r_fill_mutex); 3048 3049 if (req->r_parent && 3050 (req->r_op & CEPH_MDS_OP_WRITE)) 3051 ceph_invalidate_dir_request(req); 3052 } else { 3053 err = req->r_err; 3054 } 3055 3056 mutex_unlock(&mdsc->mutex); 3057 return err; 3058 } 3059 3060 /* 3061 * Synchrously perform an mds request. Take care of all of the 3062 * session setup, forwarding, retry details. 3063 */ 3064 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 3065 struct inode *dir, 3066 struct ceph_mds_request *req) 3067 { 3068 int err; 3069 3070 dout("do_request on %p\n", req); 3071 3072 /* issue */ 3073 err = ceph_mdsc_submit_request(mdsc, dir, req); 3074 if (!err) 3075 err = ceph_mdsc_wait_request(mdsc, req); 3076 dout("do_request %p done, result %d\n", req, err); 3077 return err; 3078 } 3079 3080 /* 3081 * Invalidate dir's completeness, dentry lease state on an aborted MDS 3082 * namespace request. 
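 * Once a request is aborted we no longer know whether the MDS applied it,
 * so cached directory completeness and dentry leases on the affected
 * parent(s) can no longer be trusted.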
3083 */ 3084 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 3085 { 3086 struct inode *dir = req->r_parent; 3087 struct inode *old_dir = req->r_old_dentry_dir; 3088 3089 dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir); 3090 3091 ceph_dir_clear_complete(dir); 3092 if (old_dir) 3093 ceph_dir_clear_complete(old_dir); 3094 if (req->r_dentry) 3095 ceph_invalidate_dentry_lease(req->r_dentry); 3096 if (req->r_old_dentry) 3097 ceph_invalidate_dentry_lease(req->r_old_dentry); 3098 } 3099 3100 /* 3101 * Handle mds reply. 3102 * 3103 * We take the session mutex and parse and process the reply immediately. 3104 * This preserves the logical ordering of replies, capabilities, etc., sent 3105 * by the MDS as they are applied to our local cache. 3106 */ 3107 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 3108 { 3109 struct ceph_mds_client *mdsc = session->s_mdsc; 3110 struct ceph_mds_request *req; 3111 struct ceph_mds_reply_head *head = msg->front.iov_base; 3112 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 3113 struct ceph_snap_realm *realm; 3114 u64 tid; 3115 int err, result; 3116 int mds = session->s_mds; 3117 3118 if (msg->front.iov_len < sizeof(*head)) { 3119 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 3120 ceph_msg_dump(msg); 3121 return; 3122 } 3123 3124 /* get request, session */ 3125 tid = le64_to_cpu(msg->hdr.tid); 3126 mutex_lock(&mdsc->mutex); 3127 req = lookup_get_request(mdsc, tid); 3128 if (!req) { 3129 dout("handle_reply on unknown tid %llu\n", tid); 3130 mutex_unlock(&mdsc->mutex); 3131 return; 3132 } 3133 dout("handle_reply %p\n", req); 3134 3135 /* correct session? */ 3136 if (req->r_session != session) { 3137 pr_err("mdsc_handle_reply got %llu on session mds%d" 3138 " not mds%d\n", tid, session->s_mds, 3139 req->r_session ? req->r_session->s_mds : -1); 3140 mutex_unlock(&mdsc->mutex); 3141 goto out; 3142 } 3143 3144 /* dup? */ 3145 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 3146 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 3147 pr_warn("got a dup %s reply on %llu from mds%d\n", 3148 head->safe ? 
"safe" : "unsafe", tid, mds); 3149 mutex_unlock(&mdsc->mutex); 3150 goto out; 3151 } 3152 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 3153 pr_warn("got unsafe after safe on %llu from mds%d\n", 3154 tid, mds); 3155 mutex_unlock(&mdsc->mutex); 3156 goto out; 3157 } 3158 3159 result = le32_to_cpu(head->result); 3160 3161 /* 3162 * Handle an ESTALE 3163 * if we're not talking to the authority, send to them 3164 * if the authority has changed while we weren't looking, 3165 * send to new authority 3166 * Otherwise we just have to return an ESTALE 3167 */ 3168 if (result == -ESTALE) { 3169 dout("got ESTALE on request %llu\n", req->r_tid); 3170 req->r_resend_mds = -1; 3171 if (req->r_direct_mode != USE_AUTH_MDS) { 3172 dout("not using auth, setting for that now\n"); 3173 req->r_direct_mode = USE_AUTH_MDS; 3174 __do_request(mdsc, req); 3175 mutex_unlock(&mdsc->mutex); 3176 goto out; 3177 } else { 3178 int mds = __choose_mds(mdsc, req, NULL); 3179 if (mds >= 0 && mds != req->r_session->s_mds) { 3180 dout("but auth changed, so resending\n"); 3181 __do_request(mdsc, req); 3182 mutex_unlock(&mdsc->mutex); 3183 goto out; 3184 } 3185 } 3186 dout("have to return ESTALE on request %llu\n", req->r_tid); 3187 } 3188 3189 3190 if (head->safe) { 3191 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 3192 __unregister_request(mdsc, req); 3193 3194 /* last request during umount? */ 3195 if (mdsc->stopping && !__get_oldest_req(mdsc)) 3196 complete_all(&mdsc->safe_umount_waiters); 3197 3198 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3199 /* 3200 * We already handled the unsafe response, now do the 3201 * cleanup. No need to examine the response; the MDS 3202 * doesn't include any result info in the safe 3203 * response. And even if it did, there is nothing 3204 * useful we could do with a revised return value. 
3205 */ 3206 dout("got safe reply %llu, mds%d\n", tid, mds); 3207 3208 mutex_unlock(&mdsc->mutex); 3209 goto out; 3210 } 3211 } else { 3212 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 3213 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 3214 } 3215 3216 dout("handle_reply tid %lld result %d\n", tid, result); 3217 rinfo = &req->r_reply_info; 3218 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 3219 err = parse_reply_info(session, msg, rinfo, (u64)-1); 3220 else 3221 err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features); 3222 mutex_unlock(&mdsc->mutex); 3223 3224 /* Must find target inode outside of mutexes to avoid deadlocks */ 3225 if ((err >= 0) && rinfo->head->is_target) { 3226 struct inode *in; 3227 struct ceph_vino tvino = { 3228 .ino = le64_to_cpu(rinfo->targeti.in->ino), 3229 .snap = le64_to_cpu(rinfo->targeti.in->snapid) 3230 }; 3231 3232 in = ceph_get_inode(mdsc->fsc->sb, tvino); 3233 if (IS_ERR(in)) { 3234 err = PTR_ERR(in); 3235 mutex_lock(&session->s_mutex); 3236 goto out_err; 3237 } 3238 req->r_target_inode = in; 3239 } 3240 3241 mutex_lock(&session->s_mutex); 3242 if (err < 0) { 3243 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 3244 ceph_msg_dump(msg); 3245 goto out_err; 3246 } 3247 3248 /* snap trace */ 3249 realm = NULL; 3250 if (rinfo->snapblob_len) { 3251 down_write(&mdsc->snap_rwsem); 3252 ceph_update_snap_trace(mdsc, rinfo->snapblob, 3253 rinfo->snapblob + rinfo->snapblob_len, 3254 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 3255 &realm); 3256 downgrade_write(&mdsc->snap_rwsem); 3257 } else { 3258 down_read(&mdsc->snap_rwsem); 3259 } 3260 3261 /* insert trace into our cache */ 3262 mutex_lock(&req->r_fill_mutex); 3263 current->journal_info = req; 3264 err = ceph_fill_trace(mdsc->fsc->sb, req); 3265 if (err == 0) { 3266 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 3267 req->r_op == CEPH_MDS_OP_LSSNAP)) 3268 ceph_readdir_prepopulate(req, req->r_session); 3269 } 3270 current->journal_info = NULL; 3271 mutex_unlock(&req->r_fill_mutex); 3272 3273 up_read(&mdsc->snap_rwsem); 3274 if (realm) 3275 ceph_put_snap_realm(mdsc, realm); 3276 3277 if (err == 0) { 3278 if (req->r_target_inode && 3279 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3280 struct ceph_inode_info *ci = 3281 ceph_inode(req->r_target_inode); 3282 spin_lock(&ci->i_unsafe_lock); 3283 list_add_tail(&req->r_unsafe_target_item, 3284 &ci->i_unsafe_iops); 3285 spin_unlock(&ci->i_unsafe_lock); 3286 } 3287 3288 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 3289 } 3290 out_err: 3291 mutex_lock(&mdsc->mutex); 3292 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3293 if (err) { 3294 req->r_err = err; 3295 } else { 3296 req->r_reply = ceph_msg_get(msg); 3297 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); 3298 } 3299 } else { 3300 dout("reply arrived after request %lld was aborted\n", tid); 3301 } 3302 mutex_unlock(&mdsc->mutex); 3303 3304 mutex_unlock(&session->s_mutex); 3305 3306 /* kick calling process */ 3307 complete_request(mdsc, req); 3308 3309 ceph_update_metadata_latency(&mdsc->metric, req->r_start_latency, 3310 req->r_end_latency, err); 3311 out: 3312 ceph_mdsc_put_request(req); 3313 return; 3314 } 3315 3316 3317 3318 /* 3319 * handle mds notification that our request has been forwarded. 
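 * The payload carries the new mds rank and a forward seq; stale
 * notifications (fwd_seq <= r_num_fwd) are ignored, otherwise the request
 * is reset and resubmitted to the new rank.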
3320 */ 3321 static void handle_forward(struct ceph_mds_client *mdsc, 3322 struct ceph_mds_session *session, 3323 struct ceph_msg *msg) 3324 { 3325 struct ceph_mds_request *req; 3326 u64 tid = le64_to_cpu(msg->hdr.tid); 3327 u32 next_mds; 3328 u32 fwd_seq; 3329 int err = -EINVAL; 3330 void *p = msg->front.iov_base; 3331 void *end = p + msg->front.iov_len; 3332 3333 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3334 next_mds = ceph_decode_32(&p); 3335 fwd_seq = ceph_decode_32(&p); 3336 3337 mutex_lock(&mdsc->mutex); 3338 req = lookup_get_request(mdsc, tid); 3339 if (!req) { 3340 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 3341 goto out; /* dup reply? */ 3342 } 3343 3344 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3345 dout("forward tid %llu aborted, unregistering\n", tid); 3346 __unregister_request(mdsc, req); 3347 } else if (fwd_seq <= req->r_num_fwd) { 3348 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 3349 tid, next_mds, req->r_num_fwd, fwd_seq); 3350 } else { 3351 /* resend. forward race not possible; mds would drop */ 3352 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 3353 BUG_ON(req->r_err); 3354 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 3355 req->r_attempts = 0; 3356 req->r_num_fwd = fwd_seq; 3357 req->r_resend_mds = next_mds; 3358 put_request_session(req); 3359 __do_request(mdsc, req); 3360 } 3361 ceph_mdsc_put_request(req); 3362 out: 3363 mutex_unlock(&mdsc->mutex); 3364 return; 3365 3366 bad: 3367 pr_err("mdsc_handle_forward decode error err=%d\n", err); 3368 } 3369 3370 static int __decode_session_metadata(void **p, void *end, 3371 bool *blocklisted) 3372 { 3373 /* map<string,string> */ 3374 u32 n; 3375 bool err_str; 3376 ceph_decode_32_safe(p, end, n, bad); 3377 while (n-- > 0) { 3378 u32 len; 3379 ceph_decode_32_safe(p, end, len, bad); 3380 ceph_decode_need(p, end, len, bad); 3381 err_str = !strncmp(*p, "error_string", len); 3382 *p += len; 3383 ceph_decode_32_safe(p, end, len, bad); 3384 ceph_decode_need(p, end, len, bad); 3385 /* 3386 * Match "blocklisted (blacklisted)" from newer MDSes, 3387 * or "blacklisted" from older MDSes. 
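 * ("blocklisted (blacklisted)" still contains the substring
 * "blacklisted", so a single strnstr() match covers both spellings.)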
3388 */ 3389 if (err_str && strnstr(*p, "blacklisted", len)) 3390 *blocklisted = true; 3391 *p += len; 3392 } 3393 return 0; 3394 bad: 3395 return -1; 3396 } 3397 3398 /* 3399 * handle a mds session control message 3400 */ 3401 static void handle_session(struct ceph_mds_session *session, 3402 struct ceph_msg *msg) 3403 { 3404 struct ceph_mds_client *mdsc = session->s_mdsc; 3405 int mds = session->s_mds; 3406 int msg_version = le16_to_cpu(msg->hdr.version); 3407 void *p = msg->front.iov_base; 3408 void *end = p + msg->front.iov_len; 3409 struct ceph_mds_session_head *h; 3410 u32 op; 3411 u64 seq, features = 0; 3412 int wake = 0; 3413 bool blocklisted = false; 3414 3415 /* decode */ 3416 ceph_decode_need(&p, end, sizeof(*h), bad); 3417 h = p; 3418 p += sizeof(*h); 3419 3420 op = le32_to_cpu(h->op); 3421 seq = le64_to_cpu(h->seq); 3422 3423 if (msg_version >= 3) { 3424 u32 len; 3425 /* version >= 2, metadata */ 3426 if (__decode_session_metadata(&p, end, &blocklisted) < 0) 3427 goto bad; 3428 /* version >= 3, feature bits */ 3429 ceph_decode_32_safe(&p, end, len, bad); 3430 if (len) { 3431 ceph_decode_64_safe(&p, end, features, bad); 3432 p += len - sizeof(features); 3433 } 3434 } 3435 3436 mutex_lock(&mdsc->mutex); 3437 if (op == CEPH_SESSION_CLOSE) { 3438 ceph_get_mds_session(session); 3439 __unregister_session(mdsc, session); 3440 } 3441 /* FIXME: this ttl calculation is generous */ 3442 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 3443 mutex_unlock(&mdsc->mutex); 3444 3445 mutex_lock(&session->s_mutex); 3446 3447 dout("handle_session mds%d %s %p state %s seq %llu\n", 3448 mds, ceph_session_op_name(op), session, 3449 ceph_session_state_name(session->s_state), seq); 3450 3451 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 3452 session->s_state = CEPH_MDS_SESSION_OPEN; 3453 pr_info("mds%d came back\n", session->s_mds); 3454 } 3455 3456 switch (op) { 3457 case CEPH_SESSION_OPEN: 3458 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3459 pr_info("mds%d reconnect success\n", session->s_mds); 3460 session->s_state = CEPH_MDS_SESSION_OPEN; 3461 session->s_features = features; 3462 renewed_caps(mdsc, session, 0); 3463 if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features)) 3464 metric_schedule_delayed(&mdsc->metric); 3465 wake = 1; 3466 if (mdsc->stopping) 3467 __close_session(mdsc, session); 3468 break; 3469 3470 case CEPH_SESSION_RENEWCAPS: 3471 if (session->s_renew_seq == seq) 3472 renewed_caps(mdsc, session, 1); 3473 break; 3474 3475 case CEPH_SESSION_CLOSE: 3476 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3477 pr_info("mds%d reconnect denied\n", session->s_mds); 3478 session->s_state = CEPH_MDS_SESSION_CLOSED; 3479 cleanup_session_requests(mdsc, session); 3480 remove_session_caps(session); 3481 wake = 2; /* for good measure */ 3482 wake_up_all(&mdsc->session_close_wq); 3483 break; 3484 3485 case CEPH_SESSION_STALE: 3486 pr_info("mds%d caps went stale, renewing\n", 3487 session->s_mds); 3488 spin_lock(&session->s_gen_ttl_lock); 3489 session->s_cap_gen++; 3490 session->s_cap_ttl = jiffies - 1; 3491 spin_unlock(&session->s_gen_ttl_lock); 3492 send_renew_caps(mdsc, session); 3493 break; 3494 3495 case CEPH_SESSION_RECALL_STATE: 3496 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 3497 break; 3498 3499 case CEPH_SESSION_FLUSHMSG: 3500 send_flushmsg_ack(mdsc, session, seq); 3501 break; 3502 3503 case CEPH_SESSION_FORCE_RO: 3504 dout("force_session_readonly %p\n", session); 3505 spin_lock(&session->s_cap_lock); 3506 session->s_readonly = 
true; 3507 spin_unlock(&session->s_cap_lock); 3508 wake_up_session_caps(session, FORCE_RO); 3509 break; 3510 3511 case CEPH_SESSION_REJECT: 3512 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 3513 pr_info("mds%d rejected session\n", session->s_mds); 3514 session->s_state = CEPH_MDS_SESSION_REJECTED; 3515 cleanup_session_requests(mdsc, session); 3516 remove_session_caps(session); 3517 if (blocklisted) 3518 mdsc->fsc->blocklisted = true; 3519 wake = 2; /* for good measure */ 3520 break; 3521 3522 default: 3523 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 3524 WARN_ON(1); 3525 } 3526 3527 mutex_unlock(&session->s_mutex); 3528 if (wake) { 3529 mutex_lock(&mdsc->mutex); 3530 __wake_requests(mdsc, &session->s_waiting); 3531 if (wake == 2) 3532 kick_requests(mdsc, mds); 3533 mutex_unlock(&mdsc->mutex); 3534 } 3535 if (op == CEPH_SESSION_CLOSE) 3536 ceph_put_mds_session(session); 3537 return; 3538 3539 bad: 3540 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 3541 (int)msg->front.iov_len); 3542 ceph_msg_dump(msg); 3543 return; 3544 } 3545 3546 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 3547 { 3548 int dcaps; 3549 3550 dcaps = xchg(&req->r_dir_caps, 0); 3551 if (dcaps) { 3552 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3553 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 3554 } 3555 } 3556 3557 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req) 3558 { 3559 int dcaps; 3560 3561 dcaps = xchg(&req->r_dir_caps, 0); 3562 if (dcaps) { 3563 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3564 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent), 3565 dcaps); 3566 } 3567 } 3568 3569 /* 3570 * called under session->mutex. 3571 */ 3572 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 3573 struct ceph_mds_session *session) 3574 { 3575 struct ceph_mds_request *req, *nreq; 3576 struct rb_node *p; 3577 3578 dout("replay_unsafe_requests mds%d\n", session->s_mds); 3579 3580 mutex_lock(&mdsc->mutex); 3581 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) 3582 __send_request(session, req, true); 3583 3584 /* 3585 * also re-send old requests when MDS enters reconnect stage. So that MDS 3586 * can process completed request in clientreplay stage. 
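 * These are resent with drop_cap_releases set, so stale cap/dentry
 * releases are stripped from the regenerated message.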
3587 */ 3588 p = rb_first(&mdsc->request_tree); 3589 while (p) { 3590 req = rb_entry(p, struct ceph_mds_request, r_node); 3591 p = rb_next(p); 3592 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3593 continue; 3594 if (req->r_attempts == 0) 3595 continue; /* only old requests */ 3596 if (!req->r_session) 3597 continue; 3598 if (req->r_session->s_mds != session->s_mds) 3599 continue; 3600 3601 ceph_mdsc_release_dir_caps_no_check(req); 3602 3603 __send_request(session, req, true); 3604 } 3605 mutex_unlock(&mdsc->mutex); 3606 } 3607 3608 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state) 3609 { 3610 struct ceph_msg *reply; 3611 struct ceph_pagelist *_pagelist; 3612 struct page *page; 3613 __le32 *addr; 3614 int err = -ENOMEM; 3615 3616 if (!recon_state->allow_multi) 3617 return -ENOSPC; 3618 3619 /* can't handle message that contains both caps and realm */ 3620 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms); 3621 3622 /* pre-allocate new pagelist */ 3623 _pagelist = ceph_pagelist_alloc(GFP_NOFS); 3624 if (!_pagelist) 3625 return -ENOMEM; 3626 3627 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3628 if (!reply) 3629 goto fail_msg; 3630 3631 /* placeholder for nr_caps */ 3632 err = ceph_pagelist_encode_32(_pagelist, 0); 3633 if (err < 0) 3634 goto fail; 3635 3636 if (recon_state->nr_caps) { 3637 /* currently encoding caps */ 3638 err = ceph_pagelist_encode_32(recon_state->pagelist, 0); 3639 if (err) 3640 goto fail; 3641 } else { 3642 /* placeholder for nr_realms (currently encoding relams) */ 3643 err = ceph_pagelist_encode_32(_pagelist, 0); 3644 if (err < 0) 3645 goto fail; 3646 } 3647 3648 err = ceph_pagelist_encode_8(recon_state->pagelist, 1); 3649 if (err) 3650 goto fail; 3651 3652 page = list_first_entry(&recon_state->pagelist->head, struct page, lru); 3653 addr = kmap_atomic(page); 3654 if (recon_state->nr_caps) { 3655 /* currently encoding caps */ 3656 *addr = cpu_to_le32(recon_state->nr_caps); 3657 } else { 3658 /* currently encoding relams */ 3659 *(addr + 1) = cpu_to_le32(recon_state->nr_realms); 3660 } 3661 kunmap_atomic(addr); 3662 3663 reply->hdr.version = cpu_to_le16(5); 3664 reply->hdr.compat_version = cpu_to_le16(4); 3665 3666 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length); 3667 ceph_msg_data_add_pagelist(reply, recon_state->pagelist); 3668 3669 ceph_con_send(&recon_state->session->s_con, reply); 3670 ceph_pagelist_release(recon_state->pagelist); 3671 3672 recon_state->pagelist = _pagelist; 3673 recon_state->nr_caps = 0; 3674 recon_state->nr_realms = 0; 3675 recon_state->msg_version = 5; 3676 return 0; 3677 fail: 3678 ceph_msg_put(reply); 3679 fail_msg: 3680 ceph_pagelist_release(_pagelist); 3681 return err; 3682 } 3683 3684 static struct dentry* d_find_primary(struct inode *inode) 3685 { 3686 struct dentry *alias, *dn = NULL; 3687 3688 if (hlist_empty(&inode->i_dentry)) 3689 return NULL; 3690 3691 spin_lock(&inode->i_lock); 3692 if (hlist_empty(&inode->i_dentry)) 3693 goto out_unlock; 3694 3695 if (S_ISDIR(inode->i_mode)) { 3696 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias); 3697 if (!IS_ROOT(alias)) 3698 dn = dget(alias); 3699 goto out_unlock; 3700 } 3701 3702 hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) { 3703 spin_lock(&alias->d_lock); 3704 if (!d_unhashed(alias) && 3705 (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) { 3706 dn = dget_dlock(alias); 3707 } 3708 spin_unlock(&alias->d_lock); 3709 if (dn) 3710 break; 3711 } 3712 out_unlock: 3713 
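	/* drop i_lock; dn (if set) holds its own reference from dget()/dget_dlock() */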
spin_unlock(&inode->i_lock); 3714 return dn; 3715 } 3716 3717 /* 3718 * Encode information about a cap for a reconnect with the MDS. 3719 */ 3720 static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap, 3721 void *arg) 3722 { 3723 union { 3724 struct ceph_mds_cap_reconnect v2; 3725 struct ceph_mds_cap_reconnect_v1 v1; 3726 } rec; 3727 struct ceph_inode_info *ci = cap->ci; 3728 struct ceph_reconnect_state *recon_state = arg; 3729 struct ceph_pagelist *pagelist = recon_state->pagelist; 3730 struct dentry *dentry; 3731 char *path; 3732 int pathlen, err; 3733 u64 pathbase; 3734 u64 snap_follows; 3735 3736 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 3737 inode, ceph_vinop(inode), cap, cap->cap_id, 3738 ceph_cap_string(cap->issued)); 3739 3740 dentry = d_find_primary(inode); 3741 if (dentry) { 3742 /* set pathbase to parent dir when msg_version >= 2 */ 3743 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 3744 recon_state->msg_version >= 2); 3745 dput(dentry); 3746 if (IS_ERR(path)) { 3747 err = PTR_ERR(path); 3748 goto out_err; 3749 } 3750 } else { 3751 path = NULL; 3752 pathlen = 0; 3753 pathbase = 0; 3754 } 3755 3756 spin_lock(&ci->i_ceph_lock); 3757 cap->seq = 0; /* reset cap seq */ 3758 cap->issue_seq = 0; /* and issue_seq */ 3759 cap->mseq = 0; /* and migrate_seq */ 3760 cap->cap_gen = cap->session->s_cap_gen; 3761 3762 /* These are lost when the session goes away */ 3763 if (S_ISDIR(inode->i_mode)) { 3764 if (cap->issued & CEPH_CAP_DIR_CREATE) { 3765 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); 3766 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); 3767 } 3768 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; 3769 } 3770 3771 if (recon_state->msg_version >= 2) { 3772 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 3773 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3774 rec.v2.issued = cpu_to_le32(cap->issued); 3775 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3776 rec.v2.pathbase = cpu_to_le64(pathbase); 3777 rec.v2.flock_len = (__force __le32) 3778 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); 3779 } else { 3780 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 3781 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3782 rec.v1.issued = cpu_to_le32(cap->issued); 3783 rec.v1.size = cpu_to_le64(inode->i_size); 3784 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime); 3785 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime); 3786 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3787 rec.v1.pathbase = cpu_to_le64(pathbase); 3788 } 3789 3790 if (list_empty(&ci->i_cap_snaps)) { 3791 snap_follows = ci->i_head_snapc ? 
ci->i_head_snapc->seq : 0; 3792 } else { 3793 struct ceph_cap_snap *capsnap = 3794 list_first_entry(&ci->i_cap_snaps, 3795 struct ceph_cap_snap, ci_item); 3796 snap_follows = capsnap->follows; 3797 } 3798 spin_unlock(&ci->i_ceph_lock); 3799 3800 if (recon_state->msg_version >= 2) { 3801 int num_fcntl_locks, num_flock_locks; 3802 struct ceph_filelock *flocks = NULL; 3803 size_t struct_len, total_len = sizeof(u64); 3804 u8 struct_v = 0; 3805 3806 encode_again: 3807 if (rec.v2.flock_len) { 3808 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 3809 } else { 3810 num_fcntl_locks = 0; 3811 num_flock_locks = 0; 3812 } 3813 if (num_fcntl_locks + num_flock_locks > 0) { 3814 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks, 3815 sizeof(struct ceph_filelock), 3816 GFP_NOFS); 3817 if (!flocks) { 3818 err = -ENOMEM; 3819 goto out_err; 3820 } 3821 err = ceph_encode_locks_to_buffer(inode, flocks, 3822 num_fcntl_locks, 3823 num_flock_locks); 3824 if (err) { 3825 kfree(flocks); 3826 flocks = NULL; 3827 if (err == -ENOSPC) 3828 goto encode_again; 3829 goto out_err; 3830 } 3831 } else { 3832 kfree(flocks); 3833 flocks = NULL; 3834 } 3835 3836 if (recon_state->msg_version >= 3) { 3837 /* version, compat_version and struct_len */ 3838 total_len += 2 * sizeof(u8) + sizeof(u32); 3839 struct_v = 2; 3840 } 3841 /* 3842 * number of encoded locks is stable, so copy to pagelist 3843 */ 3844 struct_len = 2 * sizeof(u32) + 3845 (num_fcntl_locks + num_flock_locks) * 3846 sizeof(struct ceph_filelock); 3847 rec.v2.flock_len = cpu_to_le32(struct_len); 3848 3849 struct_len += sizeof(u32) + pathlen + sizeof(rec.v2); 3850 3851 if (struct_v >= 2) 3852 struct_len += sizeof(u64); /* snap_follows */ 3853 3854 total_len += struct_len; 3855 3856 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 3857 err = send_reconnect_partial(recon_state); 3858 if (err) 3859 goto out_freeflocks; 3860 pagelist = recon_state->pagelist; 3861 } 3862 3863 err = ceph_pagelist_reserve(pagelist, total_len); 3864 if (err) 3865 goto out_freeflocks; 3866 3867 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3868 if (recon_state->msg_version >= 3) { 3869 ceph_pagelist_encode_8(pagelist, struct_v); 3870 ceph_pagelist_encode_8(pagelist, 1); 3871 ceph_pagelist_encode_32(pagelist, struct_len); 3872 } 3873 ceph_pagelist_encode_string(pagelist, path, pathlen); 3874 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 3875 ceph_locks_to_pagelist(flocks, pagelist, 3876 num_fcntl_locks, num_flock_locks); 3877 if (struct_v >= 2) 3878 ceph_pagelist_encode_64(pagelist, snap_follows); 3879 out_freeflocks: 3880 kfree(flocks); 3881 } else { 3882 err = ceph_pagelist_reserve(pagelist, 3883 sizeof(u64) + sizeof(u32) + 3884 pathlen + sizeof(rec.v1)); 3885 if (err) 3886 goto out_err; 3887 3888 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3889 ceph_pagelist_encode_string(pagelist, path, pathlen); 3890 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 3891 } 3892 3893 out_err: 3894 ceph_mdsc_free_path(path, pathlen); 3895 if (!err) 3896 recon_state->nr_caps++; 3897 return err; 3898 } 3899 3900 static int encode_snap_realms(struct ceph_mds_client *mdsc, 3901 struct ceph_reconnect_state *recon_state) 3902 { 3903 struct rb_node *p; 3904 struct ceph_pagelist *pagelist = recon_state->pagelist; 3905 int err = 0; 3906 3907 if (recon_state->msg_version >= 4) { 3908 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); 3909 if (err < 0) 3910 goto fail; 3911 } 3912 3913 /* 3914 * snaprealms. 
we provide mds with the ino, seq (version), and 3915 * parent for all of our realms. If the mds has any newer info, 3916 * it will tell us. 3917 */ 3918 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 3919 struct ceph_snap_realm *realm = 3920 rb_entry(p, struct ceph_snap_realm, node); 3921 struct ceph_mds_snaprealm_reconnect sr_rec; 3922 3923 if (recon_state->msg_version >= 4) { 3924 size_t need = sizeof(u8) * 2 + sizeof(u32) + 3925 sizeof(sr_rec); 3926 3927 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 3928 err = send_reconnect_partial(recon_state); 3929 if (err) 3930 goto fail; 3931 pagelist = recon_state->pagelist; 3932 } 3933 3934 err = ceph_pagelist_reserve(pagelist, need); 3935 if (err) 3936 goto fail; 3937 3938 ceph_pagelist_encode_8(pagelist, 1); 3939 ceph_pagelist_encode_8(pagelist, 1); 3940 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 3941 } 3942 3943 dout(" adding snap realm %llx seq %lld parent %llx\n", 3944 realm->ino, realm->seq, realm->parent_ino); 3945 sr_rec.ino = cpu_to_le64(realm->ino); 3946 sr_rec.seq = cpu_to_le64(realm->seq); 3947 sr_rec.parent = cpu_to_le64(realm->parent_ino); 3948 3949 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 3950 if (err) 3951 goto fail; 3952 3953 recon_state->nr_realms++; 3954 } 3955 fail: 3956 return err; 3957 } 3958 3959 3960 /* 3961 * If an MDS fails and recovers, clients need to reconnect in order to 3962 * reestablish shared state. This includes all caps issued through 3963 * this session _and_ the snap_realm hierarchy. Because it's not 3964 * clear which snap realms the mds cares about, we send everything we 3965 * know about.. that ensures we'll then get any new info the 3966 * recovering MDS might have. 3967 * 3968 * This is a relatively heavyweight operation, but it's rare. 3969 */ 3970 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 3971 struct ceph_mds_session *session) 3972 { 3973 struct ceph_msg *reply; 3974 int mds = session->s_mds; 3975 int err = -ENOMEM; 3976 struct ceph_reconnect_state recon_state = { 3977 .session = session, 3978 }; 3979 LIST_HEAD(dispose); 3980 3981 pr_info("mds%d reconnect start\n", mds); 3982 3983 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 3984 if (!recon_state.pagelist) 3985 goto fail_nopagelist; 3986 3987 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3988 if (!reply) 3989 goto fail_nomsg; 3990 3991 xa_destroy(&session->s_delegated_inos); 3992 3993 mutex_lock(&session->s_mutex); 3994 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 3995 session->s_seq = 0; 3996 3997 dout("session %p state %s\n", session, 3998 ceph_session_state_name(session->s_state)); 3999 4000 spin_lock(&session->s_gen_ttl_lock); 4001 session->s_cap_gen++; 4002 spin_unlock(&session->s_gen_ttl_lock); 4003 4004 spin_lock(&session->s_cap_lock); 4005 /* don't know if session is readonly */ 4006 session->s_readonly = 0; 4007 /* 4008 * notify __ceph_remove_cap() that we are composing cap reconnect. 4009 * If a cap get released before being added to the cap reconnect, 4010 * __ceph_remove_cap() should skip queuing cap release. 
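	 * The flag is set and cleared under s_cap_lock, and it stays set only
	 * while ceph_iterate_session_caps() below is walking this session's
	 * caps, so the window in which releases must be suppressed is well
	 * defined.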
	 */
	session->s_cap_reconnect = 1;
	/* drop old cap expires; we're about to reestablish that state */
	detach_cap_releases(session, &dispose);
	spin_unlock(&session->s_cap_lock);
	dispose_cap_releases(mdsc, &dispose);

	/* trim unused caps to reduce MDS's cache rejoin time */
	if (mdsc->fsc->sb->s_root)
		shrink_dcache_parent(mdsc->fsc->sb->s_root);

	ceph_con_close(&session->s_con);
	ceph_con_open(&session->s_con,
		      CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	/* replay unsafe requests */
	replay_unsafe_requests(mdsc, session);

	ceph_early_kick_flushing_caps(mdsc, session);

	down_read(&mdsc->snap_rwsem);

	/* placeholder for nr_caps */
	err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
	if (err)
		goto fail;

	if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
		recon_state.msg_version = 3;
		recon_state.allow_multi = true;
	} else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
		recon_state.msg_version = 3;
	} else {
		recon_state.msg_version = 2;
	}
	/* traverse this session's caps */
	err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);

	spin_lock(&session->s_cap_lock);
	session->s_cap_reconnect = 0;
	spin_unlock(&session->s_cap_lock);

	if (err < 0)
		goto fail;

	/* check if all realms can be encoded into current message */
	if (mdsc->num_snap_realms) {
		size_t total_len =
			recon_state.pagelist->length +
			mdsc->num_snap_realms *
			sizeof(struct ceph_mds_snaprealm_reconnect);
		if (recon_state.msg_version >= 4) {
			/* number of realms */
			total_len += sizeof(u32);
			/* version, compat_version and struct_len */
			total_len += mdsc->num_snap_realms *
				     (2 * sizeof(u8) + sizeof(u32));
		}
		if (total_len > RECONNECT_MAX_SIZE) {
			if (!recon_state.allow_multi) {
				err = -ENOSPC;
				goto fail;
			}
			if (recon_state.nr_caps) {
				err = send_reconnect_partial(&recon_state);
				if (err)
					goto fail;
			}
			recon_state.msg_version = 5;
		}
	}

	err = encode_snap_realms(mdsc, &recon_state);
	if (err < 0)
		goto fail;

	if (recon_state.msg_version >= 5) {
		err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
		if (err < 0)
			goto fail;
	}

	if (recon_state.nr_caps || recon_state.nr_realms) {
		struct page *page =
			list_first_entry(&recon_state.pagelist->head,
					 struct page, lru);
		__le32 *addr = kmap_atomic(page);
		if (recon_state.nr_caps) {
			WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
			*addr = cpu_to_le32(recon_state.nr_caps);
		} else if (recon_state.msg_version >= 4) {
			*(addr + 1) = cpu_to_le32(recon_state.nr_realms);
		}
		kunmap_atomic(addr);
	}

	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
	if (recon_state.msg_version >= 4)
		reply->hdr.compat_version = cpu_to_le16(4);

	reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
	ceph_msg_data_add_pagelist(reply, recon_state.pagelist);

	ceph_con_send(&session->s_con, reply);

	mutex_unlock(&session->s_mutex);

	mutex_lock(&mdsc->mutex);
	__wake_requests(mdsc, &session->s_waiting);
	mutex_unlock(&mdsc->mutex);

	up_read(&mdsc->snap_rwsem);
	ceph_pagelist_release(recon_state.pagelist);
	return;

fail: 4128 ceph_msg_put(reply); 4129 up_read(&mdsc->snap_rwsem); 4130 mutex_unlock(&session->s_mutex); 4131 fail_nomsg: 4132 ceph_pagelist_release(recon_state.pagelist); 4133 fail_nopagelist: 4134 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 4135 return; 4136 } 4137 4138 4139 /* 4140 * compare old and new mdsmaps, kicking requests 4141 * and closing out old connections as necessary 4142 * 4143 * called under mdsc->mutex. 4144 */ 4145 static void check_new_map(struct ceph_mds_client *mdsc, 4146 struct ceph_mdsmap *newmap, 4147 struct ceph_mdsmap *oldmap) 4148 { 4149 int i; 4150 int oldstate, newstate; 4151 struct ceph_mds_session *s; 4152 4153 dout("check_new_map new %u old %u\n", 4154 newmap->m_epoch, oldmap->m_epoch); 4155 4156 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4157 if (!mdsc->sessions[i]) 4158 continue; 4159 s = mdsc->sessions[i]; 4160 oldstate = ceph_mdsmap_get_state(oldmap, i); 4161 newstate = ceph_mdsmap_get_state(newmap, i); 4162 4163 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 4164 i, ceph_mds_state_name(oldstate), 4165 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 4166 ceph_mds_state_name(newstate), 4167 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 4168 ceph_session_state_name(s->s_state)); 4169 4170 if (i >= newmap->possible_max_rank) { 4171 /* force close session for stopped mds */ 4172 ceph_get_mds_session(s); 4173 __unregister_session(mdsc, s); 4174 __wake_requests(mdsc, &s->s_waiting); 4175 mutex_unlock(&mdsc->mutex); 4176 4177 mutex_lock(&s->s_mutex); 4178 cleanup_session_requests(mdsc, s); 4179 remove_session_caps(s); 4180 mutex_unlock(&s->s_mutex); 4181 4182 ceph_put_mds_session(s); 4183 4184 mutex_lock(&mdsc->mutex); 4185 kick_requests(mdsc, i); 4186 continue; 4187 } 4188 4189 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 4190 ceph_mdsmap_get_addr(newmap, i), 4191 sizeof(struct ceph_entity_addr))) { 4192 /* just close it */ 4193 mutex_unlock(&mdsc->mutex); 4194 mutex_lock(&s->s_mutex); 4195 mutex_lock(&mdsc->mutex); 4196 ceph_con_close(&s->s_con); 4197 mutex_unlock(&s->s_mutex); 4198 s->s_state = CEPH_MDS_SESSION_RESTARTING; 4199 } else if (oldstate == newstate) { 4200 continue; /* nothing new with this mds */ 4201 } 4202 4203 /* 4204 * send reconnect? 4205 */ 4206 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 4207 newstate >= CEPH_MDS_STATE_RECONNECT) { 4208 mutex_unlock(&mdsc->mutex); 4209 send_mds_reconnect(mdsc, s); 4210 mutex_lock(&mdsc->mutex); 4211 } 4212 4213 /* 4214 * kick request on any mds that has gone active. 
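		 * Requests that were parked while the rank was recovering get
		 * resubmitted, in-flight cap flushes are re-sent, and cap
		 * waiters are woken so they can re-request what they need.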
4215 */ 4216 if (oldstate < CEPH_MDS_STATE_ACTIVE && 4217 newstate >= CEPH_MDS_STATE_ACTIVE) { 4218 if (oldstate != CEPH_MDS_STATE_CREATING && 4219 oldstate != CEPH_MDS_STATE_STARTING) 4220 pr_info("mds%d recovery completed\n", s->s_mds); 4221 kick_requests(mdsc, i); 4222 mutex_unlock(&mdsc->mutex); 4223 mutex_lock(&s->s_mutex); 4224 mutex_lock(&mdsc->mutex); 4225 ceph_kick_flushing_caps(mdsc, s); 4226 mutex_unlock(&s->s_mutex); 4227 wake_up_session_caps(s, RECONNECT); 4228 } 4229 } 4230 4231 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4232 s = mdsc->sessions[i]; 4233 if (!s) 4234 continue; 4235 if (!ceph_mdsmap_is_laggy(newmap, i)) 4236 continue; 4237 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4238 s->s_state == CEPH_MDS_SESSION_HUNG || 4239 s->s_state == CEPH_MDS_SESSION_CLOSING) { 4240 dout(" connecting to export targets of laggy mds%d\n", 4241 i); 4242 __open_export_target_sessions(mdsc, s); 4243 } 4244 } 4245 } 4246 4247 4248 4249 /* 4250 * leases 4251 */ 4252 4253 /* 4254 * caller must hold session s_mutex, dentry->d_lock 4255 */ 4256 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 4257 { 4258 struct ceph_dentry_info *di = ceph_dentry(dentry); 4259 4260 ceph_put_mds_session(di->lease_session); 4261 di->lease_session = NULL; 4262 } 4263 4264 static void handle_lease(struct ceph_mds_client *mdsc, 4265 struct ceph_mds_session *session, 4266 struct ceph_msg *msg) 4267 { 4268 struct super_block *sb = mdsc->fsc->sb; 4269 struct inode *inode; 4270 struct dentry *parent, *dentry; 4271 struct ceph_dentry_info *di; 4272 int mds = session->s_mds; 4273 struct ceph_mds_lease *h = msg->front.iov_base; 4274 u32 seq; 4275 struct ceph_vino vino; 4276 struct qstr dname; 4277 int release = 0; 4278 4279 dout("handle_lease from mds%d\n", mds); 4280 4281 /* decode */ 4282 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 4283 goto bad; 4284 vino.ino = le64_to_cpu(h->ino); 4285 vino.snap = CEPH_NOSNAP; 4286 seq = le32_to_cpu(h->seq); 4287 dname.len = get_unaligned_le32(h + 1); 4288 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 4289 goto bad; 4290 dname.name = (void *)(h + 1) + sizeof(u32); 4291 4292 /* lookup inode */ 4293 inode = ceph_find_inode(sb, vino); 4294 dout("handle_lease %s, ino %llx %p %.*s\n", 4295 ceph_lease_op_name(h->action), vino.ino, inode, 4296 dname.len, dname.name); 4297 4298 mutex_lock(&session->s_mutex); 4299 inc_session_sequence(session); 4300 4301 if (!inode) { 4302 dout("handle_lease no inode %llx\n", vino.ino); 4303 goto release; 4304 } 4305 4306 /* dentry */ 4307 parent = d_find_alias(inode); 4308 if (!parent) { 4309 dout("no parent dentry on inode %p\n", inode); 4310 WARN_ON(1); 4311 goto release; /* hrm... 
 */
	}
	dname.hash = full_name_hash(parent, dname.name, dname.len);
	dentry = d_lookup(parent, &dname);
	dput(parent);
	if (!dentry)
		goto release;

	spin_lock(&dentry->d_lock);
	di = ceph_dentry(dentry);
	switch (h->action) {
	case CEPH_MDS_LEASE_REVOKE:
		if (di->lease_session == session) {
			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
				h->seq = cpu_to_le32(di->lease_seq);
			__ceph_mdsc_drop_dentry_lease(dentry);
		}
		release = 1;
		break;

	case CEPH_MDS_LEASE_RENEW:
		if (di->lease_session == session &&
		    di->lease_gen == session->s_cap_gen &&
		    di->lease_renew_from &&
		    di->lease_renew_after == 0) {
			unsigned long duration =
				msecs_to_jiffies(le32_to_cpu(h->duration_ms));

			di->lease_seq = seq;
			di->time = di->lease_renew_from + duration;
			di->lease_renew_after = di->lease_renew_from +
						(duration >> 1);
			di->lease_renew_from = 0;
		}
		break;
	}
	spin_unlock(&dentry->d_lock);
	dput(dentry);

	if (!release)
		goto out;

release:
	/* let's just reuse the same message */
	h->action = CEPH_MDS_LEASE_REVOKE_ACK;
	ceph_msg_get(msg);
	ceph_con_send(&session->s_con, msg);

out:
	mutex_unlock(&session->s_mutex);
	/* avoid calling iput_final() in mds dispatch threads */
	ceph_async_iput(inode);
	return;

bad:
	pr_err("corrupt lease message\n");
	ceph_msg_dump(msg);
}

void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
			      struct dentry *dentry, char action,
			      u32 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_lease *lease;
	struct inode *dir;
	int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;

	dout("lease_send_msg dentry %p %s to mds%d\n",
	     dentry, ceph_lease_op_name(action), session->s_mds);

	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
	if (!msg)
		return;
	lease = msg->front.iov_base;
	lease->action = action;
	lease->seq = cpu_to_le32(seq);

	spin_lock(&dentry->d_lock);
	dir = d_inode(dentry->d_parent);
	lease->ino = cpu_to_le64(ceph_ino(dir));
	lease->first = lease->last = cpu_to_le64(ceph_snap(dir));

	put_unaligned_le32(dentry->d_name.len, lease + 1);
	memcpy((void *)(lease + 1) + 4,
	       dentry->d_name.name, dentry->d_name.len);
	spin_unlock(&dentry->d_lock);
	/*
	 * if this is a preemptive lease RELEASE, no need to
	 * flush request stream, since the actual request will
	 * soon follow.
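	 * (more_to_follow is only a hint that another message for this
	 * connection is about to be queued, so the messenger need not
	 * push this one out eagerly)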
	 */
	msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);

	ceph_con_send(&session->s_con, msg);
}

/*
 * lock/unlock each session in turn, to wait for any in-flight session
 * activity to finish
 */
static void lock_unlock_sessions(struct ceph_mds_client *mdsc)
{
	int i;

	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
		if (!s)
			continue;
		mutex_unlock(&mdsc->mutex);
		mutex_lock(&s->s_mutex);
		mutex_unlock(&s->s_mutex);
		ceph_put_mds_session(s);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);
}

static void maybe_recover_session(struct ceph_mds_client *mdsc)
{
	struct ceph_fs_client *fsc = mdsc->fsc;

	if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
		return;

	if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
		return;

	if (!READ_ONCE(fsc->blocklisted))
		return;

	pr_info("auto reconnect after blocklisted\n");
	ceph_force_reconnect(fsc->sb);
}

bool check_session_state(struct ceph_mds_session *s)
{
	switch (s->s_state) {
	case CEPH_MDS_SESSION_OPEN:
		if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
			s->s_state = CEPH_MDS_SESSION_HUNG;
			pr_info("mds%d hung\n", s->s_mds);
		}
		break;
	case CEPH_MDS_SESSION_CLOSING:
		/* Should never reach this when we're unmounting */
		WARN_ON_ONCE(true);
		fallthrough;
	case CEPH_MDS_SESSION_NEW:
	case CEPH_MDS_SESSION_RESTARTING:
	case CEPH_MDS_SESSION_CLOSED:
	case CEPH_MDS_SESSION_REJECTED:
		return false;
	}

	return true;
}

/*
 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
 * then we need to retransmit that request.
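 * (presumably the MDS will not honour a close whose sequence number is
 * stale, so the close is re-sent with the bumped seq to let the session
 * actually finish closing)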
4472 */ 4473 void inc_session_sequence(struct ceph_mds_session *s) 4474 { 4475 lockdep_assert_held(&s->s_mutex); 4476 4477 s->s_seq++; 4478 4479 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 4480 int ret; 4481 4482 dout("resending session close request for mds%d\n", s->s_mds); 4483 ret = request_close_session(s); 4484 if (ret < 0) 4485 pr_err("unable to close session to mds%d: %d\n", 4486 s->s_mds, ret); 4487 } 4488 } 4489 4490 /* 4491 * delayed work -- periodically trim expired leases, renew caps with mds 4492 */ 4493 static void schedule_delayed(struct ceph_mds_client *mdsc) 4494 { 4495 int delay = 5; 4496 unsigned hz = round_jiffies_relative(HZ * delay); 4497 schedule_delayed_work(&mdsc->delayed_work, hz); 4498 } 4499 4500 static void delayed_work(struct work_struct *work) 4501 { 4502 int i; 4503 struct ceph_mds_client *mdsc = 4504 container_of(work, struct ceph_mds_client, delayed_work.work); 4505 int renew_interval; 4506 int renew_caps; 4507 4508 dout("mdsc delayed_work\n"); 4509 4510 if (mdsc->stopping) 4511 return; 4512 4513 mutex_lock(&mdsc->mutex); 4514 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 4515 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 4516 mdsc->last_renew_caps); 4517 if (renew_caps) 4518 mdsc->last_renew_caps = jiffies; 4519 4520 for (i = 0; i < mdsc->max_sessions; i++) { 4521 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4522 if (!s) 4523 continue; 4524 4525 if (!check_session_state(s)) { 4526 ceph_put_mds_session(s); 4527 continue; 4528 } 4529 mutex_unlock(&mdsc->mutex); 4530 4531 mutex_lock(&s->s_mutex); 4532 if (renew_caps) 4533 send_renew_caps(mdsc, s); 4534 else 4535 ceph_con_keepalive(&s->s_con); 4536 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4537 s->s_state == CEPH_MDS_SESSION_HUNG) 4538 ceph_send_cap_releases(mdsc, s); 4539 mutex_unlock(&s->s_mutex); 4540 ceph_put_mds_session(s); 4541 4542 mutex_lock(&mdsc->mutex); 4543 } 4544 mutex_unlock(&mdsc->mutex); 4545 4546 ceph_check_delayed_caps(mdsc); 4547 4548 ceph_queue_cap_reclaim_work(mdsc); 4549 4550 ceph_trim_snapid_map(mdsc); 4551 4552 maybe_recover_session(mdsc); 4553 4554 schedule_delayed(mdsc); 4555 } 4556 4557 int ceph_mdsc_init(struct ceph_fs_client *fsc) 4558 4559 { 4560 struct ceph_mds_client *mdsc; 4561 int err; 4562 4563 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 4564 if (!mdsc) 4565 return -ENOMEM; 4566 mdsc->fsc = fsc; 4567 mutex_init(&mdsc->mutex); 4568 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 4569 if (!mdsc->mdsmap) { 4570 err = -ENOMEM; 4571 goto err_mdsc; 4572 } 4573 4574 init_completion(&mdsc->safe_umount_waiters); 4575 init_waitqueue_head(&mdsc->session_close_wq); 4576 INIT_LIST_HEAD(&mdsc->waiting_for_map); 4577 mdsc->sessions = NULL; 4578 atomic_set(&mdsc->num_sessions, 0); 4579 mdsc->max_sessions = 0; 4580 mdsc->stopping = 0; 4581 atomic64_set(&mdsc->quotarealms_count, 0); 4582 mdsc->quotarealms_inodes = RB_ROOT; 4583 mutex_init(&mdsc->quotarealms_inodes_mutex); 4584 mdsc->last_snap_seq = 0; 4585 init_rwsem(&mdsc->snap_rwsem); 4586 mdsc->snap_realms = RB_ROOT; 4587 INIT_LIST_HEAD(&mdsc->snap_empty); 4588 mdsc->num_snap_realms = 0; 4589 spin_lock_init(&mdsc->snap_empty_lock); 4590 mdsc->last_tid = 0; 4591 mdsc->oldest_tid = 0; 4592 mdsc->request_tree = RB_ROOT; 4593 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 4594 mdsc->last_renew_caps = jiffies; 4595 INIT_LIST_HEAD(&mdsc->cap_delay_list); 4596 INIT_LIST_HEAD(&mdsc->cap_wait_list); 4597 spin_lock_init(&mdsc->cap_delay_lock); 4598 
INIT_LIST_HEAD(&mdsc->snap_flush_list); 4599 spin_lock_init(&mdsc->snap_flush_lock); 4600 mdsc->last_cap_flush_tid = 1; 4601 INIT_LIST_HEAD(&mdsc->cap_flush_list); 4602 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 4603 mdsc->num_cap_flushing = 0; 4604 spin_lock_init(&mdsc->cap_dirty_lock); 4605 init_waitqueue_head(&mdsc->cap_flushing_wq); 4606 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); 4607 atomic_set(&mdsc->cap_reclaim_pending, 0); 4608 err = ceph_metric_init(&mdsc->metric); 4609 if (err) 4610 goto err_mdsmap; 4611 4612 spin_lock_init(&mdsc->dentry_list_lock); 4613 INIT_LIST_HEAD(&mdsc->dentry_leases); 4614 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 4615 4616 ceph_caps_init(mdsc); 4617 ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 4618 4619 spin_lock_init(&mdsc->snapid_map_lock); 4620 mdsc->snapid_map_tree = RB_ROOT; 4621 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 4622 4623 init_rwsem(&mdsc->pool_perm_rwsem); 4624 mdsc->pool_perm_tree = RB_ROOT; 4625 4626 strscpy(mdsc->nodename, utsname()->nodename, 4627 sizeof(mdsc->nodename)); 4628 4629 fsc->mdsc = mdsc; 4630 return 0; 4631 4632 err_mdsmap: 4633 kfree(mdsc->mdsmap); 4634 err_mdsc: 4635 kfree(mdsc); 4636 return err; 4637 } 4638 4639 /* 4640 * Wait for safe replies on open mds requests. If we time out, drop 4641 * all requests from the tree to avoid dangling dentry refs. 4642 */ 4643 static void wait_requests(struct ceph_mds_client *mdsc) 4644 { 4645 struct ceph_options *opts = mdsc->fsc->client->options; 4646 struct ceph_mds_request *req; 4647 4648 mutex_lock(&mdsc->mutex); 4649 if (__get_oldest_req(mdsc)) { 4650 mutex_unlock(&mdsc->mutex); 4651 4652 dout("wait_requests waiting for requests\n"); 4653 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 4654 ceph_timeout_jiffies(opts->mount_timeout)); 4655 4656 /* tear down remaining requests */ 4657 mutex_lock(&mdsc->mutex); 4658 while ((req = __get_oldest_req(mdsc))) { 4659 dout("wait_requests timed out on tid %llu\n", 4660 req->r_tid); 4661 list_del_init(&req->r_wait); 4662 __unregister_request(mdsc, req); 4663 } 4664 } 4665 mutex_unlock(&mdsc->mutex); 4666 dout("wait_requests done\n"); 4667 } 4668 4669 /* 4670 * called before mount is ro, and before dentries are torn down. 4671 * (hmm, does this still race with new lookups?) 4672 */ 4673 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 4674 { 4675 dout("pre_umount\n"); 4676 mdsc->stopping = 1; 4677 4678 lock_unlock_sessions(mdsc); 4679 ceph_flush_dirty_caps(mdsc); 4680 wait_requests(mdsc); 4681 4682 /* 4683 * wait for reply handlers to drop their request refs and 4684 * their inode/dcache refs 4685 */ 4686 ceph_msgr_flush(); 4687 4688 ceph_cleanup_quotarealms_inodes(mdsc); 4689 } 4690 4691 /* 4692 * wait for all write mds requests to flush. 
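 * Walk the request tree in tid order up to want_tid and block on
 * r_safe_completion for each write op (SETFILELOCK is skipped); a
 * reference is held on the next request so the walk can continue safely
 * after mdsc->mutex is dropped for the wait.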
4693 */ 4694 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) 4695 { 4696 struct ceph_mds_request *req = NULL, *nextreq; 4697 struct rb_node *n; 4698 4699 mutex_lock(&mdsc->mutex); 4700 dout("wait_unsafe_requests want %lld\n", want_tid); 4701 restart: 4702 req = __get_oldest_req(mdsc); 4703 while (req && req->r_tid <= want_tid) { 4704 /* find next request */ 4705 n = rb_next(&req->r_node); 4706 if (n) 4707 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 4708 else 4709 nextreq = NULL; 4710 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 4711 (req->r_op & CEPH_MDS_OP_WRITE)) { 4712 /* write op */ 4713 ceph_mdsc_get_request(req); 4714 if (nextreq) 4715 ceph_mdsc_get_request(nextreq); 4716 mutex_unlock(&mdsc->mutex); 4717 dout("wait_unsafe_requests wait on %llu (want %llu)\n", 4718 req->r_tid, want_tid); 4719 wait_for_completion(&req->r_safe_completion); 4720 mutex_lock(&mdsc->mutex); 4721 ceph_mdsc_put_request(req); 4722 if (!nextreq) 4723 break; /* next dne before, so we're done! */ 4724 if (RB_EMPTY_NODE(&nextreq->r_node)) { 4725 /* next request was removed from tree */ 4726 ceph_mdsc_put_request(nextreq); 4727 goto restart; 4728 } 4729 ceph_mdsc_put_request(nextreq); /* won't go away */ 4730 } 4731 req = nextreq; 4732 } 4733 mutex_unlock(&mdsc->mutex); 4734 dout("wait_unsafe_requests done\n"); 4735 } 4736 4737 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 4738 { 4739 u64 want_tid, want_flush; 4740 4741 if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) 4742 return; 4743 4744 dout("sync\n"); 4745 mutex_lock(&mdsc->mutex); 4746 want_tid = mdsc->last_tid; 4747 mutex_unlock(&mdsc->mutex); 4748 4749 ceph_flush_dirty_caps(mdsc); 4750 spin_lock(&mdsc->cap_dirty_lock); 4751 want_flush = mdsc->last_cap_flush_tid; 4752 if (!list_empty(&mdsc->cap_flush_list)) { 4753 struct ceph_cap_flush *cf = 4754 list_last_entry(&mdsc->cap_flush_list, 4755 struct ceph_cap_flush, g_list); 4756 cf->wake = true; 4757 } 4758 spin_unlock(&mdsc->cap_dirty_lock); 4759 4760 dout("sync want tid %lld flush_seq %lld\n", 4761 want_tid, want_flush); 4762 4763 wait_unsafe_requests(mdsc, want_tid); 4764 wait_caps_flush(mdsc, want_flush); 4765 } 4766 4767 /* 4768 * true if all sessions are closed, or we force unmount 4769 */ 4770 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 4771 { 4772 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4773 return true; 4774 return atomic_read(&mdsc->num_sessions) <= skipped; 4775 } 4776 4777 /* 4778 * called after sb is ro. 
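 * Ask every session to close, wait (bounded by the mount timeout) until
 * done_closing_sessions() is satisfied, then forcibly unregister and tear
 * down whatever is left.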
4779 */ 4780 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 4781 { 4782 struct ceph_options *opts = mdsc->fsc->client->options; 4783 struct ceph_mds_session *session; 4784 int i; 4785 int skipped = 0; 4786 4787 dout("close_sessions\n"); 4788 4789 /* close sessions */ 4790 mutex_lock(&mdsc->mutex); 4791 for (i = 0; i < mdsc->max_sessions; i++) { 4792 session = __ceph_lookup_mds_session(mdsc, i); 4793 if (!session) 4794 continue; 4795 mutex_unlock(&mdsc->mutex); 4796 mutex_lock(&session->s_mutex); 4797 if (__close_session(mdsc, session) <= 0) 4798 skipped++; 4799 mutex_unlock(&session->s_mutex); 4800 ceph_put_mds_session(session); 4801 mutex_lock(&mdsc->mutex); 4802 } 4803 mutex_unlock(&mdsc->mutex); 4804 4805 dout("waiting for sessions to close\n"); 4806 wait_event_timeout(mdsc->session_close_wq, 4807 done_closing_sessions(mdsc, skipped), 4808 ceph_timeout_jiffies(opts->mount_timeout)); 4809 4810 /* tear down remaining sessions */ 4811 mutex_lock(&mdsc->mutex); 4812 for (i = 0; i < mdsc->max_sessions; i++) { 4813 if (mdsc->sessions[i]) { 4814 session = ceph_get_mds_session(mdsc->sessions[i]); 4815 __unregister_session(mdsc, session); 4816 mutex_unlock(&mdsc->mutex); 4817 mutex_lock(&session->s_mutex); 4818 remove_session_caps(session); 4819 mutex_unlock(&session->s_mutex); 4820 ceph_put_mds_session(session); 4821 mutex_lock(&mdsc->mutex); 4822 } 4823 } 4824 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 4825 mutex_unlock(&mdsc->mutex); 4826 4827 ceph_cleanup_snapid_map(mdsc); 4828 ceph_cleanup_empty_realms(mdsc); 4829 4830 cancel_work_sync(&mdsc->cap_reclaim_work); 4831 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 4832 4833 dout("stopped\n"); 4834 } 4835 4836 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 4837 { 4838 struct ceph_mds_session *session; 4839 int mds; 4840 4841 dout("force umount\n"); 4842 4843 mutex_lock(&mdsc->mutex); 4844 for (mds = 0; mds < mdsc->max_sessions; mds++) { 4845 session = __ceph_lookup_mds_session(mdsc, mds); 4846 if (!session) 4847 continue; 4848 4849 if (session->s_state == CEPH_MDS_SESSION_REJECTED) 4850 __unregister_session(mdsc, session); 4851 __wake_requests(mdsc, &session->s_waiting); 4852 mutex_unlock(&mdsc->mutex); 4853 4854 mutex_lock(&session->s_mutex); 4855 __close_session(mdsc, session); 4856 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 4857 cleanup_session_requests(mdsc, session); 4858 remove_session_caps(session); 4859 } 4860 mutex_unlock(&session->s_mutex); 4861 ceph_put_mds_session(session); 4862 4863 mutex_lock(&mdsc->mutex); 4864 kick_requests(mdsc, mds); 4865 } 4866 __wake_requests(mdsc, &mdsc->waiting_for_map); 4867 mutex_unlock(&mdsc->mutex); 4868 } 4869 4870 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 4871 { 4872 dout("stop\n"); 4873 /* 4874 * Make sure the delayed work stopped before releasing 4875 * the resources. 4876 * 4877 * Because the cancel_delayed_work_sync() will only 4878 * guarantee that the work finishes executing. But the 4879 * delayed work will re-arm itself again after that. 
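	 * flush_delayed_work() below also runs any such re-armed instance to
	 * completion, and since mdsc->stopping was set in
	 * ceph_mdsc_pre_umount() the handler returns early rather than
	 * arming itself yet again.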
4880 */ 4881 flush_delayed_work(&mdsc->delayed_work); 4882 4883 if (mdsc->mdsmap) 4884 ceph_mdsmap_destroy(mdsc->mdsmap); 4885 kfree(mdsc->sessions); 4886 ceph_caps_finalize(mdsc); 4887 ceph_pool_perm_destroy(mdsc); 4888 } 4889 4890 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 4891 { 4892 struct ceph_mds_client *mdsc = fsc->mdsc; 4893 dout("mdsc_destroy %p\n", mdsc); 4894 4895 if (!mdsc) 4896 return; 4897 4898 /* flush out any connection work with references to us */ 4899 ceph_msgr_flush(); 4900 4901 ceph_mdsc_stop(mdsc); 4902 4903 ceph_metric_destroy(&mdsc->metric); 4904 4905 flush_delayed_work(&mdsc->metric.delayed_work); 4906 fsc->mdsc = NULL; 4907 kfree(mdsc); 4908 dout("mdsc_destroy %p done\n", mdsc); 4909 } 4910 4911 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4912 { 4913 struct ceph_fs_client *fsc = mdsc->fsc; 4914 const char *mds_namespace = fsc->mount_options->mds_namespace; 4915 void *p = msg->front.iov_base; 4916 void *end = p + msg->front.iov_len; 4917 u32 epoch; 4918 u32 num_fs; 4919 u32 mount_fscid = (u32)-1; 4920 int err = -EINVAL; 4921 4922 ceph_decode_need(&p, end, sizeof(u32), bad); 4923 epoch = ceph_decode_32(&p); 4924 4925 dout("handle_fsmap epoch %u\n", epoch); 4926 4927 /* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */ 4928 ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad); 4929 4930 ceph_decode_32_safe(&p, end, num_fs, bad); 4931 while (num_fs-- > 0) { 4932 void *info_p, *info_end; 4933 u32 info_len; 4934 u32 fscid, namelen; 4935 4936 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 4937 p += 2; // info_v, info_cv 4938 info_len = ceph_decode_32(&p); 4939 ceph_decode_need(&p, end, info_len, bad); 4940 info_p = p; 4941 info_end = p + info_len; 4942 p = info_end; 4943 4944 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); 4945 fscid = ceph_decode_32(&info_p); 4946 namelen = ceph_decode_32(&info_p); 4947 ceph_decode_need(&info_p, info_end, namelen, bad); 4948 4949 if (mds_namespace && 4950 strlen(mds_namespace) == namelen && 4951 !strncmp(mds_namespace, (char *)info_p, namelen)) { 4952 mount_fscid = fscid; 4953 break; 4954 } 4955 } 4956 4957 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); 4958 if (mount_fscid != (u32)-1) { 4959 fsc->client->monc.fs_cluster_id = mount_fscid; 4960 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 4961 0, true); 4962 ceph_monc_renew_subs(&fsc->client->monc); 4963 } else { 4964 err = -ENOENT; 4965 goto err_out; 4966 } 4967 return; 4968 4969 bad: 4970 pr_err("error decoding fsmap\n"); 4971 err_out: 4972 mutex_lock(&mdsc->mutex); 4973 mdsc->mdsmap_err = err; 4974 __wake_requests(mdsc, &mdsc->waiting_for_map); 4975 mutex_unlock(&mdsc->mutex); 4976 } 4977 4978 /* 4979 * handle mds map update. 4980 */ 4981 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4982 { 4983 u32 epoch; 4984 u32 maplen; 4985 void *p = msg->front.iov_base; 4986 void *end = p + msg->front.iov_len; 4987 struct ceph_mdsmap *newmap, *oldmap; 4988 struct ceph_fsid fsid; 4989 int err = -EINVAL; 4990 4991 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 4992 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 4993 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 4994 return; 4995 epoch = ceph_decode_32(&p); 4996 maplen = ceph_decode_32(&p); 4997 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 4998 4999 /* do we need it? 
*/ 5000 mutex_lock(&mdsc->mutex); 5001 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 5002 dout("handle_map epoch %u <= our %u\n", 5003 epoch, mdsc->mdsmap->m_epoch); 5004 mutex_unlock(&mdsc->mutex); 5005 return; 5006 } 5007 5008 newmap = ceph_mdsmap_decode(&p, end, ceph_msgr2(mdsc->fsc->client)); 5009 if (IS_ERR(newmap)) { 5010 err = PTR_ERR(newmap); 5011 goto bad_unlock; 5012 } 5013 5014 /* swap into place */ 5015 if (mdsc->mdsmap) { 5016 oldmap = mdsc->mdsmap; 5017 mdsc->mdsmap = newmap; 5018 check_new_map(mdsc, newmap, oldmap); 5019 ceph_mdsmap_destroy(oldmap); 5020 } else { 5021 mdsc->mdsmap = newmap; /* first mds map */ 5022 } 5023 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size, 5024 MAX_LFS_FILESIZE); 5025 5026 __wake_requests(mdsc, &mdsc->waiting_for_map); 5027 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP, 5028 mdsc->mdsmap->m_epoch); 5029 5030 mutex_unlock(&mdsc->mutex); 5031 schedule_delayed(mdsc); 5032 return; 5033 5034 bad_unlock: 5035 mutex_unlock(&mdsc->mutex); 5036 bad: 5037 pr_err("error decoding mdsmap %d\n", err); 5038 return; 5039 } 5040 5041 static struct ceph_connection *mds_get_con(struct ceph_connection *con) 5042 { 5043 struct ceph_mds_session *s = con->private; 5044 5045 if (ceph_get_mds_session(s)) 5046 return con; 5047 return NULL; 5048 } 5049 5050 static void mds_put_con(struct ceph_connection *con) 5051 { 5052 struct ceph_mds_session *s = con->private; 5053 5054 ceph_put_mds_session(s); 5055 } 5056 5057 /* 5058 * if the client is unresponsive for long enough, the mds will kill 5059 * the session entirely. 5060 */ 5061 static void mds_peer_reset(struct ceph_connection *con) 5062 { 5063 struct ceph_mds_session *s = con->private; 5064 struct ceph_mds_client *mdsc = s->s_mdsc; 5065 5066 pr_warn("mds%d closed our session\n", s->s_mds); 5067 send_mds_reconnect(mdsc, s); 5068 } 5069 5070 static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg) 5071 { 5072 struct ceph_mds_session *s = con->private; 5073 struct ceph_mds_client *mdsc = s->s_mdsc; 5074 int type = le16_to_cpu(msg->hdr.type); 5075 5076 mutex_lock(&mdsc->mutex); 5077 if (__verify_registered_session(mdsc, s) < 0) { 5078 mutex_unlock(&mdsc->mutex); 5079 goto out; 5080 } 5081 mutex_unlock(&mdsc->mutex); 5082 5083 switch (type) { 5084 case CEPH_MSG_MDS_MAP: 5085 ceph_mdsc_handle_mdsmap(mdsc, msg); 5086 break; 5087 case CEPH_MSG_FS_MAP_USER: 5088 ceph_mdsc_handle_fsmap(mdsc, msg); 5089 break; 5090 case CEPH_MSG_CLIENT_SESSION: 5091 handle_session(s, msg); 5092 break; 5093 case CEPH_MSG_CLIENT_REPLY: 5094 handle_reply(s, msg); 5095 break; 5096 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 5097 handle_forward(mdsc, s, msg); 5098 break; 5099 case CEPH_MSG_CLIENT_CAPS: 5100 ceph_handle_caps(s, msg); 5101 break; 5102 case CEPH_MSG_CLIENT_SNAP: 5103 ceph_handle_snap(mdsc, s, msg); 5104 break; 5105 case CEPH_MSG_CLIENT_LEASE: 5106 handle_lease(mdsc, s, msg); 5107 break; 5108 case CEPH_MSG_CLIENT_QUOTA: 5109 ceph_handle_quota(mdsc, s, msg); 5110 break; 5111 5112 default: 5113 pr_err("received unknown message type %d %s\n", type, 5114 ceph_msg_type_name(type)); 5115 } 5116 out: 5117 ceph_msg_put(msg); 5118 } 5119 5120 /* 5121 * authentication 5122 */ 5123 5124 /* 5125 * Note: returned pointer is the address of a structure that's 5126 * managed separately. Caller must *not* attempt to free it. 
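 * (this get_authorizer/add_authorizer_challenge/verify_authorizer_reply
 * trio serves the original v1 messenger; the get_auth_request and
 * handle_auth_* callbacks further down provide the equivalent steps for
 * the msgr2 handshake)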
5127 */ 5128 static struct ceph_auth_handshake * 5129 mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new) 5130 { 5131 struct ceph_mds_session *s = con->private; 5132 struct ceph_mds_client *mdsc = s->s_mdsc; 5133 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5134 struct ceph_auth_handshake *auth = &s->s_auth; 5135 int ret; 5136 5137 ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 5138 force_new, proto, NULL, NULL); 5139 if (ret) 5140 return ERR_PTR(ret); 5141 5142 return auth; 5143 } 5144 5145 static int mds_add_authorizer_challenge(struct ceph_connection *con, 5146 void *challenge_buf, int challenge_buf_len) 5147 { 5148 struct ceph_mds_session *s = con->private; 5149 struct ceph_mds_client *mdsc = s->s_mdsc; 5150 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5151 5152 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer, 5153 challenge_buf, challenge_buf_len); 5154 } 5155 5156 static int mds_verify_authorizer_reply(struct ceph_connection *con) 5157 { 5158 struct ceph_mds_session *s = con->private; 5159 struct ceph_mds_client *mdsc = s->s_mdsc; 5160 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5161 struct ceph_auth_handshake *auth = &s->s_auth; 5162 5163 return ceph_auth_verify_authorizer_reply(ac, auth->authorizer, 5164 auth->authorizer_reply_buf, auth->authorizer_reply_buf_len, 5165 NULL, NULL, NULL, NULL); 5166 } 5167 5168 static int mds_invalidate_authorizer(struct ceph_connection *con) 5169 { 5170 struct ceph_mds_session *s = con->private; 5171 struct ceph_mds_client *mdsc = s->s_mdsc; 5172 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5173 5174 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 5175 5176 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 5177 } 5178 5179 static int mds_get_auth_request(struct ceph_connection *con, 5180 void *buf, int *buf_len, 5181 void **authorizer, int *authorizer_len) 5182 { 5183 struct ceph_mds_session *s = con->private; 5184 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5185 struct ceph_auth_handshake *auth = &s->s_auth; 5186 int ret; 5187 5188 ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 5189 buf, buf_len); 5190 if (ret) 5191 return ret; 5192 5193 *authorizer = auth->authorizer_buf; 5194 *authorizer_len = auth->authorizer_buf_len; 5195 return 0; 5196 } 5197 5198 static int mds_handle_auth_reply_more(struct ceph_connection *con, 5199 void *reply, int reply_len, 5200 void *buf, int *buf_len, 5201 void **authorizer, int *authorizer_len) 5202 { 5203 struct ceph_mds_session *s = con->private; 5204 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5205 struct ceph_auth_handshake *auth = &s->s_auth; 5206 int ret; 5207 5208 ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len, 5209 buf, buf_len); 5210 if (ret) 5211 return ret; 5212 5213 *authorizer = auth->authorizer_buf; 5214 *authorizer_len = auth->authorizer_buf_len; 5215 return 0; 5216 } 5217 5218 static int mds_handle_auth_done(struct ceph_connection *con, 5219 u64 global_id, void *reply, int reply_len, 5220 u8 *session_key, int *session_key_len, 5221 u8 *con_secret, int *con_secret_len) 5222 { 5223 struct ceph_mds_session *s = con->private; 5224 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5225 struct ceph_auth_handshake *auth = &s->s_auth; 5226 5227 return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len, 5228 session_key, session_key_len, 5229 con_secret, con_secret_len); 5230 } 
5231 5232 static int mds_handle_auth_bad_method(struct ceph_connection *con, 5233 int used_proto, int result, 5234 const int *allowed_protos, int proto_cnt, 5235 const int *allowed_modes, int mode_cnt) 5236 { 5237 struct ceph_mds_session *s = con->private; 5238 struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc; 5239 int ret; 5240 5241 if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS, 5242 used_proto, result, 5243 allowed_protos, proto_cnt, 5244 allowed_modes, mode_cnt)) { 5245 ret = ceph_monc_validate_auth(monc); 5246 if (ret) 5247 return ret; 5248 } 5249 5250 return -EACCES; 5251 } 5252 5253 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, 5254 struct ceph_msg_header *hdr, int *skip) 5255 { 5256 struct ceph_msg *msg; 5257 int type = (int) le16_to_cpu(hdr->type); 5258 int front_len = (int) le32_to_cpu(hdr->front_len); 5259 5260 if (con->in_msg) 5261 return con->in_msg; 5262 5263 *skip = 0; 5264 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 5265 if (!msg) { 5266 pr_err("unable to allocate msg type %d len %d\n", 5267 type, front_len); 5268 return NULL; 5269 } 5270 5271 return msg; 5272 } 5273 5274 static int mds_sign_message(struct ceph_msg *msg) 5275 { 5276 struct ceph_mds_session *s = msg->con->private; 5277 struct ceph_auth_handshake *auth = &s->s_auth; 5278 5279 return ceph_auth_sign_message(auth, msg); 5280 } 5281 5282 static int mds_check_message_signature(struct ceph_msg *msg) 5283 { 5284 struct ceph_mds_session *s = msg->con->private; 5285 struct ceph_auth_handshake *auth = &s->s_auth; 5286 5287 return ceph_auth_check_message_signature(auth, msg); 5288 } 5289 5290 static const struct ceph_connection_operations mds_con_ops = { 5291 .get = mds_get_con, 5292 .put = mds_put_con, 5293 .alloc_msg = mds_alloc_msg, 5294 .dispatch = mds_dispatch, 5295 .peer_reset = mds_peer_reset, 5296 .get_authorizer = mds_get_authorizer, 5297 .add_authorizer_challenge = mds_add_authorizer_challenge, 5298 .verify_authorizer_reply = mds_verify_authorizer_reply, 5299 .invalidate_authorizer = mds_invalidate_authorizer, 5300 .sign_message = mds_sign_message, 5301 .check_message_signature = mds_check_message_signature, 5302 .get_auth_request = mds_get_auth_request, 5303 .handle_auth_reply_more = mds_handle_auth_reply_more, 5304 .handle_auth_done = mds_handle_auth_done, 5305 .handle_auth_bad_method = mds_handle_auth_bad_method, 5306 }; 5307 5308 /* eof */ 5309