// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
	struct ceph_mds_session *session;
	int nr_caps, nr_realms;
	struct ceph_pagelist *pagelist;
	unsigned msg_version;
	bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

static int parse_reply_info_quota(void **p, void *end,
				  struct ceph_mds_reply_info_in *info)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);
	/* struct_v is expected to be >= 1. we only
	 * understand encoding with struct_compat == 1. */
	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	end = *p + struct_len;
	ceph_decode_64_safe(p, end, info->max_bytes, bad);
	ceph_decode_64_safe(p, end, info->max_files, bad);
	*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = 0;
	u8 struct_v = 0;

	if (features == (u64)-1) {
		u32 struct_len;
		u8 struct_compat;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1.
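		 *
		 * An illustrative sketch of this versioned encoding (not the
		 * authoritative wire spec):
		 *
		 *   u8  struct_v      - structure version (>= 1)
		 *   u8  struct_compat - oldest compatible version (must be 1)
		 *   u32 struct_len    - byte length of the payload that follows
		 *   ... struct_len bytes of versioned payload ...
		 *
		 * 'end' is clamped to the payload so that '*p = end' below can
		 * skip any trailing fields this client does not understand.
		 * features == (u64)-1 marks this new, self-describing reply
		 * encoding; otherwise individual fields are gated on the
		 * CEPH_FEATURE_* bits checked further down.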
*/ 112 if (!struct_v || struct_compat != 1) 113 goto bad; 114 ceph_decode_32_safe(p, end, struct_len, bad); 115 ceph_decode_need(p, end, struct_len, bad); 116 end = *p + struct_len; 117 } 118 119 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad); 120 info->in = *p; 121 *p += sizeof(struct ceph_mds_reply_inode) + 122 sizeof(*info->in->fragtree.splits) * 123 le32_to_cpu(info->in->fragtree.nsplits); 124 125 ceph_decode_32_safe(p, end, info->symlink_len, bad); 126 ceph_decode_need(p, end, info->symlink_len, bad); 127 info->symlink = *p; 128 *p += info->symlink_len; 129 130 ceph_decode_copy_safe(p, end, &info->dir_layout, 131 sizeof(info->dir_layout), bad); 132 ceph_decode_32_safe(p, end, info->xattr_len, bad); 133 ceph_decode_need(p, end, info->xattr_len, bad); 134 info->xattr_data = *p; 135 *p += info->xattr_len; 136 137 if (features == (u64)-1) { 138 /* inline data */ 139 ceph_decode_64_safe(p, end, info->inline_version, bad); 140 ceph_decode_32_safe(p, end, info->inline_len, bad); 141 ceph_decode_need(p, end, info->inline_len, bad); 142 info->inline_data = *p; 143 *p += info->inline_len; 144 /* quota */ 145 err = parse_reply_info_quota(p, end, info); 146 if (err < 0) 147 goto out_bad; 148 /* pool namespace */ 149 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 150 if (info->pool_ns_len > 0) { 151 ceph_decode_need(p, end, info->pool_ns_len, bad); 152 info->pool_ns_data = *p; 153 *p += info->pool_ns_len; 154 } 155 156 /* btime */ 157 ceph_decode_need(p, end, sizeof(info->btime), bad); 158 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 159 160 /* change attribute */ 161 ceph_decode_64_safe(p, end, info->change_attr, bad); 162 163 /* dir pin */ 164 if (struct_v >= 2) { 165 ceph_decode_32_safe(p, end, info->dir_pin, bad); 166 } else { 167 info->dir_pin = -ENODATA; 168 } 169 170 /* snapshot birth time, remains zero for v<=2 */ 171 if (struct_v >= 3) { 172 ceph_decode_need(p, end, sizeof(info->snap_btime), bad); 173 ceph_decode_copy(p, &info->snap_btime, 174 sizeof(info->snap_btime)); 175 } else { 176 memset(&info->snap_btime, 0, sizeof(info->snap_btime)); 177 } 178 179 *p = end; 180 } else { 181 if (features & CEPH_FEATURE_MDS_INLINE_DATA) { 182 ceph_decode_64_safe(p, end, info->inline_version, bad); 183 ceph_decode_32_safe(p, end, info->inline_len, bad); 184 ceph_decode_need(p, end, info->inline_len, bad); 185 info->inline_data = *p; 186 *p += info->inline_len; 187 } else 188 info->inline_version = CEPH_INLINE_NONE; 189 190 if (features & CEPH_FEATURE_MDS_QUOTA) { 191 err = parse_reply_info_quota(p, end, info); 192 if (err < 0) 193 goto out_bad; 194 } else { 195 info->max_bytes = 0; 196 info->max_files = 0; 197 } 198 199 info->pool_ns_len = 0; 200 info->pool_ns_data = NULL; 201 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { 202 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 203 if (info->pool_ns_len > 0) { 204 ceph_decode_need(p, end, info->pool_ns_len, bad); 205 info->pool_ns_data = *p; 206 *p += info->pool_ns_len; 207 } 208 } 209 210 if (features & CEPH_FEATURE_FS_BTIME) { 211 ceph_decode_need(p, end, sizeof(info->btime), bad); 212 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 213 ceph_decode_64_safe(p, end, info->change_attr, bad); 214 } 215 216 info->dir_pin = -ENODATA; 217 /* info->snap_btime remains zero */ 218 } 219 return 0; 220 bad: 221 err = -EIO; 222 out_bad: 223 return err; 224 } 225 226 static int parse_reply_info_dir(void **p, void *end, 227 struct ceph_mds_reply_dirfrag **dirfrag, 228 u64 features) 229 { 230 if (features == (u64)-1) 
{ 231 u8 struct_v, struct_compat; 232 u32 struct_len; 233 ceph_decode_8_safe(p, end, struct_v, bad); 234 ceph_decode_8_safe(p, end, struct_compat, bad); 235 /* struct_v is expected to be >= 1. we only understand 236 * encoding whose struct_compat == 1. */ 237 if (!struct_v || struct_compat != 1) 238 goto bad; 239 ceph_decode_32_safe(p, end, struct_len, bad); 240 ceph_decode_need(p, end, struct_len, bad); 241 end = *p + struct_len; 242 } 243 244 ceph_decode_need(p, end, sizeof(**dirfrag), bad); 245 *dirfrag = *p; 246 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist); 247 if (unlikely(*p > end)) 248 goto bad; 249 if (features == (u64)-1) 250 *p = end; 251 return 0; 252 bad: 253 return -EIO; 254 } 255 256 static int parse_reply_info_lease(void **p, void *end, 257 struct ceph_mds_reply_lease **lease, 258 u64 features) 259 { 260 if (features == (u64)-1) { 261 u8 struct_v, struct_compat; 262 u32 struct_len; 263 ceph_decode_8_safe(p, end, struct_v, bad); 264 ceph_decode_8_safe(p, end, struct_compat, bad); 265 /* struct_v is expected to be >= 1. we only understand 266 * encoding whose struct_compat == 1. */ 267 if (!struct_v || struct_compat != 1) 268 goto bad; 269 ceph_decode_32_safe(p, end, struct_len, bad); 270 ceph_decode_need(p, end, struct_len, bad); 271 end = *p + struct_len; 272 } 273 274 ceph_decode_need(p, end, sizeof(**lease), bad); 275 *lease = *p; 276 *p += sizeof(**lease); 277 if (features == (u64)-1) 278 *p = end; 279 return 0; 280 bad: 281 return -EIO; 282 } 283 284 /* 285 * parse a normal reply, which may contain a (dir+)dentry and/or a 286 * target inode. 287 */ 288 static int parse_reply_info_trace(void **p, void *end, 289 struct ceph_mds_reply_info_parsed *info, 290 u64 features) 291 { 292 int err; 293 294 if (info->head->is_dentry) { 295 err = parse_reply_info_in(p, end, &info->diri, features); 296 if (err < 0) 297 goto out_bad; 298 299 err = parse_reply_info_dir(p, end, &info->dirfrag, features); 300 if (err < 0) 301 goto out_bad; 302 303 ceph_decode_32_safe(p, end, info->dname_len, bad); 304 ceph_decode_need(p, end, info->dname_len, bad); 305 info->dname = *p; 306 *p += info->dname_len; 307 308 err = parse_reply_info_lease(p, end, &info->dlease, features); 309 if (err < 0) 310 goto out_bad; 311 } 312 313 if (info->head->is_target) { 314 err = parse_reply_info_in(p, end, &info->targeti, features); 315 if (err < 0) 316 goto out_bad; 317 } 318 319 if (unlikely(*p != end)) 320 goto bad; 321 return 0; 322 323 bad: 324 err = -EIO; 325 out_bad: 326 pr_err("problem parsing mds trace %d\n", err); 327 return err; 328 } 329 330 /* 331 * parse readdir results 332 */ 333 static int parse_reply_info_readdir(void **p, void *end, 334 struct ceph_mds_reply_info_parsed *info, 335 u64 features) 336 { 337 u32 num, i = 0; 338 int err; 339 340 err = parse_reply_info_dir(p, end, &info->dir_dir, features); 341 if (err < 0) 342 goto out_bad; 343 344 ceph_decode_need(p, end, sizeof(num) + 2, bad); 345 num = ceph_decode_32(p); 346 { 347 u16 flags = ceph_decode_16(p); 348 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); 349 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); 350 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); 351 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH); 352 } 353 if (num == 0) 354 goto done; 355 356 BUG_ON(!info->dir_entries); 357 if ((unsigned long)(info->dir_entries + num) > 358 (unsigned long)info->dir_entries + info->dir_buf_size) { 359 pr_err("dir contents are larger than expected\n"); 360 WARN_ON(1); 361 goto bad; 362 } 363 
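	/*
	 * Illustrative note (a sketch, not a spec): info->dir_entries is a
	 * page-backed array of struct ceph_mds_reply_dir_entry sized by
	 * ceph_alloc_readdir_reply_buffer(); the check above only verifies
	 * that 'num' entry structs fit in that buffer.  The names, leases
	 * and inode blobs decoded in the loop below are not copied --
	 * rde->name and friends point back into the reply message itself.
	 */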
364 info->dir_nr = num; 365 while (num) { 366 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 367 /* dentry */ 368 ceph_decode_32_safe(p, end, rde->name_len, bad); 369 ceph_decode_need(p, end, rde->name_len, bad); 370 rde->name = *p; 371 *p += rde->name_len; 372 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name); 373 374 /* dentry lease */ 375 err = parse_reply_info_lease(p, end, &rde->lease, features); 376 if (err) 377 goto out_bad; 378 /* inode */ 379 err = parse_reply_info_in(p, end, &rde->inode, features); 380 if (err < 0) 381 goto out_bad; 382 /* ceph_readdir_prepopulate() will update it */ 383 rde->offset = 0; 384 i++; 385 num--; 386 } 387 388 done: 389 /* Skip over any unrecognized fields */ 390 *p = end; 391 return 0; 392 393 bad: 394 err = -EIO; 395 out_bad: 396 pr_err("problem parsing dir contents %d\n", err); 397 return err; 398 } 399 400 /* 401 * parse fcntl F_GETLK results 402 */ 403 static int parse_reply_info_filelock(void **p, void *end, 404 struct ceph_mds_reply_info_parsed *info, 405 u64 features) 406 { 407 if (*p + sizeof(*info->filelock_reply) > end) 408 goto bad; 409 410 info->filelock_reply = *p; 411 412 /* Skip over any unrecognized fields */ 413 *p = end; 414 return 0; 415 bad: 416 return -EIO; 417 } 418 419 420 #if BITS_PER_LONG == 64 421 422 #define DELEGATED_INO_AVAILABLE xa_mk_value(1) 423 424 static int ceph_parse_deleg_inos(void **p, void *end, 425 struct ceph_mds_session *s) 426 { 427 u32 sets; 428 429 ceph_decode_32_safe(p, end, sets, bad); 430 dout("got %u sets of delegated inodes\n", sets); 431 while (sets--) { 432 u64 start, len, ino; 433 434 ceph_decode_64_safe(p, end, start, bad); 435 ceph_decode_64_safe(p, end, len, bad); 436 while (len--) { 437 int err = xa_insert(&s->s_delegated_inos, ino = start++, 438 DELEGATED_INO_AVAILABLE, 439 GFP_KERNEL); 440 if (!err) { 441 dout("added delegated inode 0x%llx\n", 442 start - 1); 443 } else if (err == -EBUSY) { 444 pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n", 445 start - 1); 446 } else { 447 return err; 448 } 449 } 450 } 451 return 0; 452 bad: 453 return -EIO; 454 } 455 456 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 457 { 458 unsigned long ino; 459 void *val; 460 461 xa_for_each(&s->s_delegated_inos, ino, val) { 462 val = xa_erase(&s->s_delegated_inos, ino); 463 if (val == DELEGATED_INO_AVAILABLE) 464 return ino; 465 } 466 return 0; 467 } 468 469 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 470 { 471 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE, 472 GFP_KERNEL); 473 } 474 #else /* BITS_PER_LONG == 64 */ 475 /* 476 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just 477 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top 478 * and bottom words? 
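 *
 * For reference, both the 64-bit parser above and the 32-bit stub below
 * consume the same wire format (a sketch, as implied by the decode calls):
 *
 *   u32 sets;                          - number of delegated inode ranges
 *   { __le64 start; __le64 len; } x sets - each range covers [start, start+len)
 *
 * so the 32-bit version can simply skip sets * 2 * sizeof(__le64) bytes.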
479 */ 480 static int ceph_parse_deleg_inos(void **p, void *end, 481 struct ceph_mds_session *s) 482 { 483 u32 sets; 484 485 ceph_decode_32_safe(p, end, sets, bad); 486 if (sets) 487 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad); 488 return 0; 489 bad: 490 return -EIO; 491 } 492 493 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 494 { 495 return 0; 496 } 497 498 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 499 { 500 return 0; 501 } 502 #endif /* BITS_PER_LONG == 64 */ 503 504 /* 505 * parse create results 506 */ 507 static int parse_reply_info_create(void **p, void *end, 508 struct ceph_mds_reply_info_parsed *info, 509 u64 features, struct ceph_mds_session *s) 510 { 511 int ret; 512 513 if (features == (u64)-1 || 514 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) { 515 if (*p == end) { 516 /* Malformed reply? */ 517 info->has_create_ino = false; 518 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) { 519 u8 struct_v, struct_compat; 520 u32 len; 521 522 info->has_create_ino = true; 523 ceph_decode_8_safe(p, end, struct_v, bad); 524 ceph_decode_8_safe(p, end, struct_compat, bad); 525 ceph_decode_32_safe(p, end, len, bad); 526 ceph_decode_64_safe(p, end, info->ino, bad); 527 ret = ceph_parse_deleg_inos(p, end, s); 528 if (ret) 529 return ret; 530 } else { 531 /* legacy */ 532 ceph_decode_64_safe(p, end, info->ino, bad); 533 info->has_create_ino = true; 534 } 535 } else { 536 if (*p != end) 537 goto bad; 538 } 539 540 /* Skip over any unrecognized fields */ 541 *p = end; 542 return 0; 543 bad: 544 return -EIO; 545 } 546 547 /* 548 * parse extra results 549 */ 550 static int parse_reply_info_extra(void **p, void *end, 551 struct ceph_mds_reply_info_parsed *info, 552 u64 features, struct ceph_mds_session *s) 553 { 554 u32 op = le32_to_cpu(info->head->op); 555 556 if (op == CEPH_MDS_OP_GETFILELOCK) 557 return parse_reply_info_filelock(p, end, info, features); 558 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) 559 return parse_reply_info_readdir(p, end, info, features); 560 else if (op == CEPH_MDS_OP_CREATE) 561 return parse_reply_info_create(p, end, info, features, s); 562 else 563 return -EIO; 564 } 565 566 /* 567 * parse entire mds reply 568 */ 569 static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, 570 struct ceph_mds_reply_info_parsed *info, 571 u64 features) 572 { 573 void *p, *end; 574 u32 len; 575 int err; 576 577 info->head = msg->front.iov_base; 578 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 579 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 580 581 /* trace */ 582 ceph_decode_32_safe(&p, end, len, bad); 583 if (len > 0) { 584 ceph_decode_need(&p, end, len, bad); 585 err = parse_reply_info_trace(&p, p+len, info, features); 586 if (err < 0) 587 goto out_bad; 588 } 589 590 /* extra */ 591 ceph_decode_32_safe(&p, end, len, bad); 592 if (len > 0) { 593 ceph_decode_need(&p, end, len, bad); 594 err = parse_reply_info_extra(&p, p+len, info, features, s); 595 if (err < 0) 596 goto out_bad; 597 } 598 599 /* snap blob */ 600 ceph_decode_32_safe(&p, end, len, bad); 601 info->snapblob_len = len; 602 info->snapblob = p; 603 p += len; 604 605 if (p != end) 606 goto bad; 607 return 0; 608 609 bad: 610 err = -EIO; 611 out_bad: 612 pr_err("mds parse_reply err %d\n", err); 613 return err; 614 } 615 616 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 617 { 618 if (!info->dir_entries) 619 return; 620 free_pages((unsigned long)info->dir_entries, 
get_order(info->dir_buf_size)); 621 } 622 623 624 /* 625 * sessions 626 */ 627 const char *ceph_session_state_name(int s) 628 { 629 switch (s) { 630 case CEPH_MDS_SESSION_NEW: return "new"; 631 case CEPH_MDS_SESSION_OPENING: return "opening"; 632 case CEPH_MDS_SESSION_OPEN: return "open"; 633 case CEPH_MDS_SESSION_HUNG: return "hung"; 634 case CEPH_MDS_SESSION_CLOSING: return "closing"; 635 case CEPH_MDS_SESSION_CLOSED: return "closed"; 636 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 637 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 638 case CEPH_MDS_SESSION_REJECTED: return "rejected"; 639 default: return "???"; 640 } 641 } 642 643 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s) 644 { 645 if (refcount_inc_not_zero(&s->s_ref)) { 646 dout("mdsc get_session %p %d -> %d\n", s, 647 refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref)); 648 return s; 649 } else { 650 dout("mdsc get_session %p 0 -- FAIL\n", s); 651 return NULL; 652 } 653 } 654 655 void ceph_put_mds_session(struct ceph_mds_session *s) 656 { 657 dout("mdsc put_session %p %d -> %d\n", s, 658 refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1); 659 if (refcount_dec_and_test(&s->s_ref)) { 660 if (s->s_auth.authorizer) 661 ceph_auth_destroy_authorizer(s->s_auth.authorizer); 662 WARN_ON(mutex_is_locked(&s->s_mutex)); 663 xa_destroy(&s->s_delegated_inos); 664 kfree(s); 665 } 666 } 667 668 /* 669 * called under mdsc->mutex 670 */ 671 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 672 int mds) 673 { 674 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 675 return NULL; 676 return ceph_get_mds_session(mdsc->sessions[mds]); 677 } 678 679 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 680 { 681 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 682 return false; 683 else 684 return true; 685 } 686 687 static int __verify_registered_session(struct ceph_mds_client *mdsc, 688 struct ceph_mds_session *s) 689 { 690 if (s->s_mds >= mdsc->max_sessions || 691 mdsc->sessions[s->s_mds] != s) 692 return -ENOENT; 693 return 0; 694 } 695 696 /* 697 * create+register a new session for given mds. 698 * called under mdsc->mutex. 
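 *
 * A worked example of the sessions[] growth below (a sketch): registering
 * mds 5 while max_sessions is 4 gives newmax = 1 << get_count_order(6) = 8,
 * so the array is reallocated to 8 slots and the old 4 pointers are copied
 * across before the new session is installed.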
699 */ 700 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 701 int mds) 702 { 703 struct ceph_mds_session *s; 704 705 if (mds >= mdsc->mdsmap->possible_max_rank) 706 return ERR_PTR(-EINVAL); 707 708 s = kzalloc(sizeof(*s), GFP_NOFS); 709 if (!s) 710 return ERR_PTR(-ENOMEM); 711 712 if (mds >= mdsc->max_sessions) { 713 int newmax = 1 << get_count_order(mds + 1); 714 struct ceph_mds_session **sa; 715 716 dout("%s: realloc to %d\n", __func__, newmax); 717 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); 718 if (!sa) 719 goto fail_realloc; 720 if (mdsc->sessions) { 721 memcpy(sa, mdsc->sessions, 722 mdsc->max_sessions * sizeof(void *)); 723 kfree(mdsc->sessions); 724 } 725 mdsc->sessions = sa; 726 mdsc->max_sessions = newmax; 727 } 728 729 dout("%s: mds%d\n", __func__, mds); 730 s->s_mdsc = mdsc; 731 s->s_mds = mds; 732 s->s_state = CEPH_MDS_SESSION_NEW; 733 s->s_ttl = 0; 734 s->s_seq = 0; 735 mutex_init(&s->s_mutex); 736 737 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); 738 739 spin_lock_init(&s->s_gen_ttl_lock); 740 s->s_cap_gen = 1; 741 s->s_cap_ttl = jiffies - 1; 742 743 spin_lock_init(&s->s_cap_lock); 744 s->s_renew_requested = 0; 745 s->s_renew_seq = 0; 746 INIT_LIST_HEAD(&s->s_caps); 747 s->s_nr_caps = 0; 748 refcount_set(&s->s_ref, 1); 749 INIT_LIST_HEAD(&s->s_waiting); 750 INIT_LIST_HEAD(&s->s_unsafe); 751 xa_init(&s->s_delegated_inos); 752 s->s_num_cap_releases = 0; 753 s->s_cap_reconnect = 0; 754 s->s_cap_iterator = NULL; 755 INIT_LIST_HEAD(&s->s_cap_releases); 756 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work); 757 758 INIT_LIST_HEAD(&s->s_cap_dirty); 759 INIT_LIST_HEAD(&s->s_cap_flushing); 760 761 mdsc->sessions[mds] = s; 762 atomic_inc(&mdsc->num_sessions); 763 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 764 765 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 766 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 767 768 return s; 769 770 fail_realloc: 771 kfree(s); 772 return ERR_PTR(-ENOMEM); 773 } 774 775 /* 776 * called under mdsc->mutex 777 */ 778 static void __unregister_session(struct ceph_mds_client *mdsc, 779 struct ceph_mds_session *s) 780 { 781 dout("__unregister_session mds%d %p\n", s->s_mds, s); 782 BUG_ON(mdsc->sessions[s->s_mds] != s); 783 mdsc->sessions[s->s_mds] = NULL; 784 ceph_con_close(&s->s_con); 785 ceph_put_mds_session(s); 786 atomic_dec(&mdsc->num_sessions); 787 } 788 789 /* 790 * drop session refs in request. 
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
	if (req->r_session) {
		ceph_put_mds_session(req->r_session);
		req->r_session = NULL;
	}
}

void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	ceph_mdsc_release_dir_caps_no_check(req);
	destroy_reply_info(&req->r_reply_info);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_inode) {
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		/* avoid calling iput_final() in mds dispatch threads */
		ceph_async_iput(req->r_inode);
	}
	if (req->r_parent) {
		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		ceph_async_iput(req->r_parent);
	}
	ceph_async_iput(req->r_target_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry)
		dput(req->r_old_dentry);
	if (req->r_old_dentry_dir) {
		/*
		 * track (and drop pins for) r_old_dentry_dir
		 * separately, since r_old_dentry's d_parent may have
		 * changed between the dir mutex being dropped and
		 * this request being freed.
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		ceph_async_iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	if (req->r_pagelist)
		ceph_pagelist_release(req->r_pagelist);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	WARN_ON_ONCE(!list_empty(&req->r_wait));
	kmem_cache_free(ceph_mds_request_cachep, req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
	struct ceph_mds_request *req;

	req = lookup_request(&mdsc->request_tree, tid);
	if (req)
		ceph_mdsc_get_request(req);

	return req;
}

/*
 * Register an in-flight request, and assign a tid.  Link to directory
 * inode we are modifying (if any).
 *
 * Called under mdsc->mutex.
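 *
 * (A rough sketch of the bookkeeping below: r_tid is simply ++last_tid, so
 * tids increase monotonically per client; oldest_tid tracks the oldest
 * still-registered tid, skipping SETFILELOCK requests, and is what gets
 * reported to the MDS so it can trim its completed-request state.)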
872 */ 873 static void __register_request(struct ceph_mds_client *mdsc, 874 struct ceph_mds_request *req, 875 struct inode *dir) 876 { 877 int ret = 0; 878 879 req->r_tid = ++mdsc->last_tid; 880 if (req->r_num_caps) { 881 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, 882 req->r_num_caps); 883 if (ret < 0) { 884 pr_err("__register_request %p " 885 "failed to reserve caps: %d\n", req, ret); 886 /* set req->r_err to fail early from __do_request */ 887 req->r_err = ret; 888 return; 889 } 890 } 891 dout("__register_request %p tid %lld\n", req, req->r_tid); 892 ceph_mdsc_get_request(req); 893 insert_request(&mdsc->request_tree, req); 894 895 req->r_uid = current_fsuid(); 896 req->r_gid = current_fsgid(); 897 898 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) 899 mdsc->oldest_tid = req->r_tid; 900 901 if (dir) { 902 struct ceph_inode_info *ci = ceph_inode(dir); 903 904 ihold(dir); 905 req->r_unsafe_dir = dir; 906 spin_lock(&ci->i_unsafe_lock); 907 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 908 spin_unlock(&ci->i_unsafe_lock); 909 } 910 } 911 912 static void __unregister_request(struct ceph_mds_client *mdsc, 913 struct ceph_mds_request *req) 914 { 915 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 916 917 /* Never leave an unregistered request on an unsafe list! */ 918 list_del_init(&req->r_unsafe_item); 919 920 if (req->r_tid == mdsc->oldest_tid) { 921 struct rb_node *p = rb_next(&req->r_node); 922 mdsc->oldest_tid = 0; 923 while (p) { 924 struct ceph_mds_request *next_req = 925 rb_entry(p, struct ceph_mds_request, r_node); 926 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 927 mdsc->oldest_tid = next_req->r_tid; 928 break; 929 } 930 p = rb_next(p); 931 } 932 } 933 934 erase_request(&mdsc->request_tree, req); 935 936 if (req->r_unsafe_dir) { 937 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 938 spin_lock(&ci->i_unsafe_lock); 939 list_del_init(&req->r_unsafe_dir_item); 940 spin_unlock(&ci->i_unsafe_lock); 941 } 942 if (req->r_target_inode && 943 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 944 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 945 spin_lock(&ci->i_unsafe_lock); 946 list_del_init(&req->r_unsafe_target_item); 947 spin_unlock(&ci->i_unsafe_lock); 948 } 949 950 if (req->r_unsafe_dir) { 951 /* avoid calling iput_final() in mds dispatch threads */ 952 ceph_async_iput(req->r_unsafe_dir); 953 req->r_unsafe_dir = NULL; 954 } 955 956 complete_all(&req->r_safe_completion); 957 958 ceph_mdsc_put_request(req); 959 } 960 961 /* 962 * Walk back up the dentry tree until we hit a dentry representing a 963 * non-snapshot inode. We do this using the rcu_read_lock (which must be held 964 * when calling this) to ensure that the objects won't disappear while we're 965 * working with them. Once we hit a candidate dentry, we attempt to take a 966 * reference to it, and return that as the result. 967 */ 968 static struct inode *get_nonsnap_parent(struct dentry *dentry) 969 { 970 struct inode *inode = NULL; 971 972 while (dentry && !IS_ROOT(dentry)) { 973 inode = d_inode_rcu(dentry); 974 if (!inode || ceph_snap(inode) == CEPH_NOSNAP) 975 break; 976 dentry = dentry->d_parent; 977 } 978 if (inode) 979 inode = igrab(inode); 980 return inode; 981 } 982 983 /* 984 * Choose mds to send request to next. If there is a hint set in the 985 * request (e.g., due to a prior forward hint from the mds), use that. 986 * Otherwise, consult frag tree and/or caps to identify the 987 * appropriate mds. 
If all else fails, choose randomly. 988 * 989 * Called under mdsc->mutex. 990 */ 991 static int __choose_mds(struct ceph_mds_client *mdsc, 992 struct ceph_mds_request *req, 993 bool *random) 994 { 995 struct inode *inode; 996 struct ceph_inode_info *ci; 997 struct ceph_cap *cap; 998 int mode = req->r_direct_mode; 999 int mds = -1; 1000 u32 hash = req->r_direct_hash; 1001 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); 1002 1003 if (random) 1004 *random = false; 1005 1006 /* 1007 * is there a specific mds we should try? ignore hint if we have 1008 * no session and the mds is not up (active or recovering). 1009 */ 1010 if (req->r_resend_mds >= 0 && 1011 (__have_session(mdsc, req->r_resend_mds) || 1012 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 1013 dout("%s using resend_mds mds%d\n", __func__, 1014 req->r_resend_mds); 1015 return req->r_resend_mds; 1016 } 1017 1018 if (mode == USE_RANDOM_MDS) 1019 goto random; 1020 1021 inode = NULL; 1022 if (req->r_inode) { 1023 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) { 1024 inode = req->r_inode; 1025 ihold(inode); 1026 } else { 1027 /* req->r_dentry is non-null for LSSNAP request */ 1028 rcu_read_lock(); 1029 inode = get_nonsnap_parent(req->r_dentry); 1030 rcu_read_unlock(); 1031 dout("%s using snapdir's parent %p\n", __func__, inode); 1032 } 1033 } else if (req->r_dentry) { 1034 /* ignore race with rename; old or new d_parent is okay */ 1035 struct dentry *parent; 1036 struct inode *dir; 1037 1038 rcu_read_lock(); 1039 parent = READ_ONCE(req->r_dentry->d_parent); 1040 dir = req->r_parent ? : d_inode_rcu(parent); 1041 1042 if (!dir || dir->i_sb != mdsc->fsc->sb) { 1043 /* not this fs or parent went negative */ 1044 inode = d_inode(req->r_dentry); 1045 if (inode) 1046 ihold(inode); 1047 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 1048 /* direct snapped/virtual snapdir requests 1049 * based on parent dir inode */ 1050 inode = get_nonsnap_parent(parent); 1051 dout("%s using nonsnap parent %p\n", __func__, inode); 1052 } else { 1053 /* dentry target */ 1054 inode = d_inode(req->r_dentry); 1055 if (!inode || mode == USE_AUTH_MDS) { 1056 /* dir + name */ 1057 inode = igrab(dir); 1058 hash = ceph_dentry_hash(dir, req->r_dentry); 1059 is_hash = true; 1060 } else { 1061 ihold(inode); 1062 } 1063 } 1064 rcu_read_unlock(); 1065 } 1066 1067 dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash, 1068 hash, mode); 1069 if (!inode) 1070 goto random; 1071 ci = ceph_inode(inode); 1072 1073 if (is_hash && S_ISDIR(inode->i_mode)) { 1074 struct ceph_inode_frag frag; 1075 int found; 1076 1077 ceph_choose_frag(ci, hash, &frag, &found); 1078 if (found) { 1079 if (mode == USE_ANY_MDS && frag.ndist > 0) { 1080 u8 r; 1081 1082 /* choose a random replica */ 1083 get_random_bytes(&r, 1); 1084 r %= frag.ndist; 1085 mds = frag.dist[r]; 1086 dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n", 1087 __func__, inode, ceph_vinop(inode), 1088 frag.frag, mds, (int)r, frag.ndist); 1089 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1090 CEPH_MDS_STATE_ACTIVE && 1091 !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds)) 1092 goto out; 1093 } 1094 1095 /* since this file/dir wasn't known to be 1096 * replicated, then we want to look for the 1097 * authoritative mds. 
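			 * (Note: the random-replica path above only applies
			 * to USE_ANY_MDS with a non-empty frag.dist[]; for
			 * USE_AUTH_MDS we fall through to frag.mds here and,
			 * failing that, to the inode's auth cap below.)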
*/ 1098 if (frag.mds >= 0) { 1099 /* choose auth mds */ 1100 mds = frag.mds; 1101 dout("%s %p %llx.%llx frag %u mds%d (auth)\n", 1102 __func__, inode, ceph_vinop(inode), 1103 frag.frag, mds); 1104 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1105 CEPH_MDS_STATE_ACTIVE) { 1106 if (!ceph_mdsmap_is_laggy(mdsc->mdsmap, 1107 mds)) 1108 goto out; 1109 } 1110 } 1111 mode = USE_AUTH_MDS; 1112 } 1113 } 1114 1115 spin_lock(&ci->i_ceph_lock); 1116 cap = NULL; 1117 if (mode == USE_AUTH_MDS) 1118 cap = ci->i_auth_cap; 1119 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 1120 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 1121 if (!cap) { 1122 spin_unlock(&ci->i_ceph_lock); 1123 ceph_async_iput(inode); 1124 goto random; 1125 } 1126 mds = cap->session->s_mds; 1127 dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__, 1128 inode, ceph_vinop(inode), mds, 1129 cap == ci->i_auth_cap ? "auth " : "", cap); 1130 spin_unlock(&ci->i_ceph_lock); 1131 out: 1132 /* avoid calling iput_final() while holding mdsc->mutex or 1133 * in mds dispatch threads */ 1134 ceph_async_iput(inode); 1135 return mds; 1136 1137 random: 1138 if (random) 1139 *random = true; 1140 1141 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 1142 dout("%s chose random mds%d\n", __func__, mds); 1143 return mds; 1144 } 1145 1146 1147 /* 1148 * session messages 1149 */ 1150 static struct ceph_msg *create_session_msg(u32 op, u64 seq) 1151 { 1152 struct ceph_msg *msg; 1153 struct ceph_mds_session_head *h; 1154 1155 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 1156 false); 1157 if (!msg) { 1158 pr_err("create_session_msg ENOMEM creating msg\n"); 1159 return NULL; 1160 } 1161 h = msg->front.iov_base; 1162 h->op = cpu_to_le32(op); 1163 h->seq = cpu_to_le64(seq); 1164 1165 return msg; 1166 } 1167 1168 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED; 1169 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8) 1170 static int encode_supported_features(void **p, void *end) 1171 { 1172 static const size_t count = ARRAY_SIZE(feature_bits); 1173 1174 if (count > 0) { 1175 size_t i; 1176 size_t size = FEATURE_BYTES(count); 1177 1178 if (WARN_ON_ONCE(*p + 4 + size > end)) 1179 return -ERANGE; 1180 1181 ceph_encode_32(p, size); 1182 memset(*p, 0, size); 1183 for (i = 0; i < count; i++) 1184 ((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8); 1185 *p += size; 1186 } else { 1187 if (WARN_ON_ONCE(*p + 4 > end)) 1188 return -ERANGE; 1189 1190 ceph_encode_32(p, 0); 1191 } 1192 1193 return 0; 1194 } 1195 1196 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED; 1197 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8) 1198 static int encode_metric_spec(void **p, void *end) 1199 { 1200 static const size_t count = ARRAY_SIZE(metric_bits); 1201 1202 /* header */ 1203 if (WARN_ON_ONCE(*p + 2 > end)) 1204 return -ERANGE; 1205 1206 ceph_encode_8(p, 1); /* version */ 1207 ceph_encode_8(p, 1); /* compat */ 1208 1209 if (count > 0) { 1210 size_t i; 1211 size_t size = METRIC_BYTES(count); 1212 1213 if (WARN_ON_ONCE(*p + 4 + 4 + size > end)) 1214 return -ERANGE; 1215 1216 /* metric spec info length */ 1217 ceph_encode_32(p, 4 + size); 1218 1219 /* metric spec */ 1220 ceph_encode_32(p, size); 1221 memset(*p, 0, size); 1222 for (i = 0; i < count; i++) 1223 ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8); 1224 *p += size; 1225 } else { 1226 if (WARN_ON_ONCE(*p + 4 + 4 > end)) 1227 return -ERANGE; 1228 1229 /* metric spec 
info length */ 1230 ceph_encode_32(p, 4); 1231 /* metric spec */ 1232 ceph_encode_32(p, 0); 1233 } 1234 1235 return 0; 1236 } 1237 1238 /* 1239 * session message, specialization for CEPH_SESSION_REQUEST_OPEN 1240 * to include additional client metadata fields. 1241 */ 1242 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq) 1243 { 1244 struct ceph_msg *msg; 1245 struct ceph_mds_session_head *h; 1246 int i = -1; 1247 int extra_bytes = 0; 1248 int metadata_key_count = 0; 1249 struct ceph_options *opt = mdsc->fsc->client->options; 1250 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; 1251 size_t size, count; 1252 void *p, *end; 1253 int ret; 1254 1255 const char* metadata[][2] = { 1256 {"hostname", mdsc->nodename}, 1257 {"kernel_version", init_utsname()->release}, 1258 {"entity_id", opt->name ? : ""}, 1259 {"root", fsopt->server_path ? : "/"}, 1260 {NULL, NULL} 1261 }; 1262 1263 /* Calculate serialized length of metadata */ 1264 extra_bytes = 4; /* map length */ 1265 for (i = 0; metadata[i][0]; ++i) { 1266 extra_bytes += 8 + strlen(metadata[i][0]) + 1267 strlen(metadata[i][1]); 1268 metadata_key_count++; 1269 } 1270 1271 /* supported feature */ 1272 size = 0; 1273 count = ARRAY_SIZE(feature_bits); 1274 if (count > 0) 1275 size = FEATURE_BYTES(count); 1276 extra_bytes += 4 + size; 1277 1278 /* metric spec */ 1279 size = 0; 1280 count = ARRAY_SIZE(metric_bits); 1281 if (count > 0) 1282 size = METRIC_BYTES(count); 1283 extra_bytes += 2 + 4 + 4 + size; 1284 1285 /* Allocate the message */ 1286 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, 1287 GFP_NOFS, false); 1288 if (!msg) { 1289 pr_err("create_session_msg ENOMEM creating msg\n"); 1290 return ERR_PTR(-ENOMEM); 1291 } 1292 p = msg->front.iov_base; 1293 end = p + msg->front.iov_len; 1294 1295 h = p; 1296 h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN); 1297 h->seq = cpu_to_le64(seq); 1298 1299 /* 1300 * Serialize client metadata into waiting buffer space, using 1301 * the format that userspace expects for map<string, string> 1302 * 1303 * ClientSession messages with metadata are v4 1304 */ 1305 msg->hdr.version = cpu_to_le16(4); 1306 msg->hdr.compat_version = cpu_to_le16(1); 1307 1308 /* The write pointer, following the session_head structure */ 1309 p += sizeof(*h); 1310 1311 /* Number of entries in the map */ 1312 ceph_encode_32(&p, metadata_key_count); 1313 1314 /* Two length-prefixed strings for each entry in the map */ 1315 for (i = 0; metadata[i][0]; ++i) { 1316 size_t const key_len = strlen(metadata[i][0]); 1317 size_t const val_len = strlen(metadata[i][1]); 1318 1319 ceph_encode_32(&p, key_len); 1320 memcpy(p, metadata[i][0], key_len); 1321 p += key_len; 1322 ceph_encode_32(&p, val_len); 1323 memcpy(p, metadata[i][1], val_len); 1324 p += val_len; 1325 } 1326 1327 ret = encode_supported_features(&p, end); 1328 if (ret) { 1329 pr_err("encode_supported_features failed!\n"); 1330 ceph_msg_put(msg); 1331 return ERR_PTR(ret); 1332 } 1333 1334 ret = encode_metric_spec(&p, end); 1335 if (ret) { 1336 pr_err("encode_metric_spec failed!\n"); 1337 ceph_msg_put(msg); 1338 return ERR_PTR(ret); 1339 } 1340 1341 msg->front.iov_len = p - msg->front.iov_base; 1342 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1343 1344 return msg; 1345 } 1346 1347 /* 1348 * send session open request. 
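 *
 * (As a summary of create_session_open_msg() above, not a full spec: the
 * open request is a v4 ClientSession message carrying the session head, a
 * map<string,string> of client metadata encoded as a u32 entry count plus
 * length-prefixed key/value strings, then the supported-feature bitmap and
 * the metric spec.)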
1349 * 1350 * called under mdsc->mutex 1351 */ 1352 static int __open_session(struct ceph_mds_client *mdsc, 1353 struct ceph_mds_session *session) 1354 { 1355 struct ceph_msg *msg; 1356 int mstate; 1357 int mds = session->s_mds; 1358 1359 /* wait for mds to go active? */ 1360 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 1361 dout("open_session to mds%d (%s)\n", mds, 1362 ceph_mds_state_name(mstate)); 1363 session->s_state = CEPH_MDS_SESSION_OPENING; 1364 session->s_renew_requested = jiffies; 1365 1366 /* send connect message */ 1367 msg = create_session_open_msg(mdsc, session->s_seq); 1368 if (IS_ERR(msg)) 1369 return PTR_ERR(msg); 1370 ceph_con_send(&session->s_con, msg); 1371 return 0; 1372 } 1373 1374 /* 1375 * open sessions for any export targets for the given mds 1376 * 1377 * called under mdsc->mutex 1378 */ 1379 static struct ceph_mds_session * 1380 __open_export_target_session(struct ceph_mds_client *mdsc, int target) 1381 { 1382 struct ceph_mds_session *session; 1383 int ret; 1384 1385 session = __ceph_lookup_mds_session(mdsc, target); 1386 if (!session) { 1387 session = register_session(mdsc, target); 1388 if (IS_ERR(session)) 1389 return session; 1390 } 1391 if (session->s_state == CEPH_MDS_SESSION_NEW || 1392 session->s_state == CEPH_MDS_SESSION_CLOSING) { 1393 ret = __open_session(mdsc, session); 1394 if (ret) 1395 return ERR_PTR(ret); 1396 } 1397 1398 return session; 1399 } 1400 1401 struct ceph_mds_session * 1402 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) 1403 { 1404 struct ceph_mds_session *session; 1405 1406 dout("open_export_target_session to mds%d\n", target); 1407 1408 mutex_lock(&mdsc->mutex); 1409 session = __open_export_target_session(mdsc, target); 1410 mutex_unlock(&mdsc->mutex); 1411 1412 return session; 1413 } 1414 1415 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 1416 struct ceph_mds_session *session) 1417 { 1418 struct ceph_mds_info *mi; 1419 struct ceph_mds_session *ts; 1420 int i, mds = session->s_mds; 1421 1422 if (mds >= mdsc->mdsmap->possible_max_rank) 1423 return; 1424 1425 mi = &mdsc->mdsmap->m_info[mds]; 1426 dout("open_export_target_sessions for mds%d (%d targets)\n", 1427 session->s_mds, mi->num_export_targets); 1428 1429 for (i = 0; i < mi->num_export_targets; i++) { 1430 ts = __open_export_target_session(mdsc, mi->export_targets[i]); 1431 if (!IS_ERR(ts)) 1432 ceph_put_mds_session(ts); 1433 } 1434 } 1435 1436 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 1437 struct ceph_mds_session *session) 1438 { 1439 mutex_lock(&mdsc->mutex); 1440 __open_export_target_sessions(mdsc, session); 1441 mutex_unlock(&mdsc->mutex); 1442 } 1443 1444 /* 1445 * session caps 1446 */ 1447 1448 static void detach_cap_releases(struct ceph_mds_session *session, 1449 struct list_head *target) 1450 { 1451 lockdep_assert_held(&session->s_cap_lock); 1452 1453 list_splice_init(&session->s_cap_releases, target); 1454 session->s_num_cap_releases = 0; 1455 dout("dispose_cap_releases mds%d\n", session->s_mds); 1456 } 1457 1458 static void dispose_cap_releases(struct ceph_mds_client *mdsc, 1459 struct list_head *dispose) 1460 { 1461 while (!list_empty(dispose)) { 1462 struct ceph_cap *cap; 1463 /* zero out the in-progress message */ 1464 cap = list_first_entry(dispose, struct ceph_cap, session_caps); 1465 list_del(&cap->session_caps); 1466 ceph_put_cap(mdsc, cap); 1467 } 1468 } 1469 1470 static void cleanup_session_requests(struct ceph_mds_client *mdsc, 1471 struct ceph_mds_session *session) 
1472 { 1473 struct ceph_mds_request *req; 1474 struct rb_node *p; 1475 struct ceph_inode_info *ci; 1476 1477 dout("cleanup_session_requests mds%d\n", session->s_mds); 1478 mutex_lock(&mdsc->mutex); 1479 while (!list_empty(&session->s_unsafe)) { 1480 req = list_first_entry(&session->s_unsafe, 1481 struct ceph_mds_request, r_unsafe_item); 1482 pr_warn_ratelimited(" dropping unsafe request %llu\n", 1483 req->r_tid); 1484 if (req->r_target_inode) { 1485 /* dropping unsafe change of inode's attributes */ 1486 ci = ceph_inode(req->r_target_inode); 1487 errseq_set(&ci->i_meta_err, -EIO); 1488 } 1489 if (req->r_unsafe_dir) { 1490 /* dropping unsafe directory operation */ 1491 ci = ceph_inode(req->r_unsafe_dir); 1492 errseq_set(&ci->i_meta_err, -EIO); 1493 } 1494 __unregister_request(mdsc, req); 1495 } 1496 /* zero r_attempts, so kick_requests() will re-send requests */ 1497 p = rb_first(&mdsc->request_tree); 1498 while (p) { 1499 req = rb_entry(p, struct ceph_mds_request, r_node); 1500 p = rb_next(p); 1501 if (req->r_session && 1502 req->r_session->s_mds == session->s_mds) 1503 req->r_attempts = 0; 1504 } 1505 mutex_unlock(&mdsc->mutex); 1506 } 1507 1508 /* 1509 * Helper to safely iterate over all caps associated with a session, with 1510 * special care taken to handle a racing __ceph_remove_cap(). 1511 * 1512 * Caller must hold session s_mutex. 1513 */ 1514 int ceph_iterate_session_caps(struct ceph_mds_session *session, 1515 int (*cb)(struct inode *, struct ceph_cap *, 1516 void *), void *arg) 1517 { 1518 struct list_head *p; 1519 struct ceph_cap *cap; 1520 struct inode *inode, *last_inode = NULL; 1521 struct ceph_cap *old_cap = NULL; 1522 int ret; 1523 1524 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 1525 spin_lock(&session->s_cap_lock); 1526 p = session->s_caps.next; 1527 while (p != &session->s_caps) { 1528 cap = list_entry(p, struct ceph_cap, session_caps); 1529 inode = igrab(&cap->ci->vfs_inode); 1530 if (!inode) { 1531 p = p->next; 1532 continue; 1533 } 1534 session->s_cap_iterator = cap; 1535 spin_unlock(&session->s_cap_lock); 1536 1537 if (last_inode) { 1538 /* avoid calling iput_final() while holding 1539 * s_mutex or in mds dispatch threads */ 1540 ceph_async_iput(last_inode); 1541 last_inode = NULL; 1542 } 1543 if (old_cap) { 1544 ceph_put_cap(session->s_mdsc, old_cap); 1545 old_cap = NULL; 1546 } 1547 1548 ret = cb(inode, cap, arg); 1549 last_inode = inode; 1550 1551 spin_lock(&session->s_cap_lock); 1552 p = p->next; 1553 if (!cap->ci) { 1554 dout("iterate_session_caps finishing cap %p removal\n", 1555 cap); 1556 BUG_ON(cap->session != session); 1557 cap->session = NULL; 1558 list_del_init(&cap->session_caps); 1559 session->s_nr_caps--; 1560 atomic64_dec(&session->s_mdsc->metric.total_caps); 1561 if (cap->queue_release) 1562 __ceph_queue_cap_release(session, cap); 1563 else 1564 old_cap = cap; /* put_cap it w/o locks held */ 1565 } 1566 if (ret < 0) 1567 goto out; 1568 } 1569 ret = 0; 1570 out: 1571 session->s_cap_iterator = NULL; 1572 spin_unlock(&session->s_cap_lock); 1573 1574 ceph_async_iput(last_inode); 1575 if (old_cap) 1576 ceph_put_cap(session->s_mdsc, old_cap); 1577 1578 return ret; 1579 } 1580 1581 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 1582 void *arg) 1583 { 1584 struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg; 1585 struct ceph_inode_info *ci = ceph_inode(inode); 1586 LIST_HEAD(to_remove); 1587 bool dirty_dropped = false; 1588 bool invalidate = false; 1589 1590 dout("removing cap %p, ci is %p, inode is 
%p\n", 1591 cap, ci, &ci->vfs_inode); 1592 spin_lock(&ci->i_ceph_lock); 1593 __ceph_remove_cap(cap, false); 1594 if (!ci->i_auth_cap) { 1595 struct ceph_cap_flush *cf; 1596 struct ceph_mds_client *mdsc = fsc->mdsc; 1597 1598 if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 1599 if (inode->i_data.nrpages > 0) 1600 invalidate = true; 1601 if (ci->i_wrbuffer_ref > 0) 1602 mapping_set_error(&inode->i_data, -EIO); 1603 } 1604 1605 while (!list_empty(&ci->i_cap_flush_list)) { 1606 cf = list_first_entry(&ci->i_cap_flush_list, 1607 struct ceph_cap_flush, i_list); 1608 list_move(&cf->i_list, &to_remove); 1609 } 1610 1611 spin_lock(&mdsc->cap_dirty_lock); 1612 1613 list_for_each_entry(cf, &to_remove, i_list) 1614 list_del(&cf->g_list); 1615 1616 if (!list_empty(&ci->i_dirty_item)) { 1617 pr_warn_ratelimited( 1618 " dropping dirty %s state for %p %lld\n", 1619 ceph_cap_string(ci->i_dirty_caps), 1620 inode, ceph_ino(inode)); 1621 ci->i_dirty_caps = 0; 1622 list_del_init(&ci->i_dirty_item); 1623 dirty_dropped = true; 1624 } 1625 if (!list_empty(&ci->i_flushing_item)) { 1626 pr_warn_ratelimited( 1627 " dropping dirty+flushing %s state for %p %lld\n", 1628 ceph_cap_string(ci->i_flushing_caps), 1629 inode, ceph_ino(inode)); 1630 ci->i_flushing_caps = 0; 1631 list_del_init(&ci->i_flushing_item); 1632 mdsc->num_cap_flushing--; 1633 dirty_dropped = true; 1634 } 1635 spin_unlock(&mdsc->cap_dirty_lock); 1636 1637 if (dirty_dropped) { 1638 errseq_set(&ci->i_meta_err, -EIO); 1639 1640 if (ci->i_wrbuffer_ref_head == 0 && 1641 ci->i_wr_ref == 0 && 1642 ci->i_dirty_caps == 0 && 1643 ci->i_flushing_caps == 0) { 1644 ceph_put_snap_context(ci->i_head_snapc); 1645 ci->i_head_snapc = NULL; 1646 } 1647 } 1648 1649 if (atomic_read(&ci->i_filelock_ref) > 0) { 1650 /* make further file lock syscall return -EIO */ 1651 ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK; 1652 pr_warn_ratelimited(" dropping file locks for %p %lld\n", 1653 inode, ceph_ino(inode)); 1654 } 1655 1656 if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { 1657 list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove); 1658 ci->i_prealloc_cap_flush = NULL; 1659 } 1660 } 1661 spin_unlock(&ci->i_ceph_lock); 1662 while (!list_empty(&to_remove)) { 1663 struct ceph_cap_flush *cf; 1664 cf = list_first_entry(&to_remove, 1665 struct ceph_cap_flush, i_list); 1666 list_del(&cf->i_list); 1667 ceph_free_cap_flush(cf); 1668 } 1669 1670 wake_up_all(&ci->i_cap_wq); 1671 if (invalidate) 1672 ceph_queue_invalidate(inode); 1673 if (dirty_dropped) 1674 iput(inode); 1675 return 0; 1676 } 1677 1678 /* 1679 * caller must hold session s_mutex 1680 */ 1681 static void remove_session_caps(struct ceph_mds_session *session) 1682 { 1683 struct ceph_fs_client *fsc = session->s_mdsc->fsc; 1684 struct super_block *sb = fsc->sb; 1685 LIST_HEAD(dispose); 1686 1687 dout("remove_session_caps on %p\n", session); 1688 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc); 1689 1690 wake_up_all(&fsc->mdsc->cap_flushing_wq); 1691 1692 spin_lock(&session->s_cap_lock); 1693 if (session->s_nr_caps > 0) { 1694 struct inode *inode; 1695 struct ceph_cap *cap, *prev = NULL; 1696 struct ceph_vino vino; 1697 /* 1698 * iterate_session_caps() skips inodes that are being 1699 * deleted, we need to wait until deletions are complete. 1700 * __wait_on_freeing_inode() is designed for the job, 1701 * but it is not exported, so use lookup inode function 1702 * to access it. 
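		 *
		 * (Sketch of the loop below: ceph_find_inode() blocks until
		 * a freeing inode is gone; if the cap at the head of s_caps
		 * is unchanged across an iteration we stop, since nothing is
		 * being deleted any more and we would otherwise spin.)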
1703 */ 1704 while (!list_empty(&session->s_caps)) { 1705 cap = list_entry(session->s_caps.next, 1706 struct ceph_cap, session_caps); 1707 if (cap == prev) 1708 break; 1709 prev = cap; 1710 vino = cap->ci->i_vino; 1711 spin_unlock(&session->s_cap_lock); 1712 1713 inode = ceph_find_inode(sb, vino); 1714 /* avoid calling iput_final() while holding s_mutex */ 1715 ceph_async_iput(inode); 1716 1717 spin_lock(&session->s_cap_lock); 1718 } 1719 } 1720 1721 // drop cap expires and unlock s_cap_lock 1722 detach_cap_releases(session, &dispose); 1723 1724 BUG_ON(session->s_nr_caps > 0); 1725 BUG_ON(!list_empty(&session->s_cap_flushing)); 1726 spin_unlock(&session->s_cap_lock); 1727 dispose_cap_releases(session->s_mdsc, &dispose); 1728 } 1729 1730 enum { 1731 RECONNECT, 1732 RENEWCAPS, 1733 FORCE_RO, 1734 }; 1735 1736 /* 1737 * wake up any threads waiting on this session's caps. if the cap is 1738 * old (didn't get renewed on the client reconnect), remove it now. 1739 * 1740 * caller must hold s_mutex. 1741 */ 1742 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, 1743 void *arg) 1744 { 1745 struct ceph_inode_info *ci = ceph_inode(inode); 1746 unsigned long ev = (unsigned long)arg; 1747 1748 if (ev == RECONNECT) { 1749 spin_lock(&ci->i_ceph_lock); 1750 ci->i_wanted_max_size = 0; 1751 ci->i_requested_max_size = 0; 1752 spin_unlock(&ci->i_ceph_lock); 1753 } else if (ev == RENEWCAPS) { 1754 if (cap->cap_gen < cap->session->s_cap_gen) { 1755 /* mds did not re-issue stale cap */ 1756 spin_lock(&ci->i_ceph_lock); 1757 cap->issued = cap->implemented = CEPH_CAP_PIN; 1758 spin_unlock(&ci->i_ceph_lock); 1759 } 1760 } else if (ev == FORCE_RO) { 1761 } 1762 wake_up_all(&ci->i_cap_wq); 1763 return 0; 1764 } 1765 1766 static void wake_up_session_caps(struct ceph_mds_session *session, int ev) 1767 { 1768 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 1769 ceph_iterate_session_caps(session, wake_up_session_cb, 1770 (void *)(unsigned long)ev); 1771 } 1772 1773 /* 1774 * Send periodic message to MDS renewing all currently held caps. The 1775 * ack will reset the expiration for all caps from this session. 1776 * 1777 * caller holds s_mutex 1778 */ 1779 static int send_renew_caps(struct ceph_mds_client *mdsc, 1780 struct ceph_mds_session *session) 1781 { 1782 struct ceph_msg *msg; 1783 int state; 1784 1785 if (time_after_eq(jiffies, session->s_cap_ttl) && 1786 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 1787 pr_info("mds%d caps stale\n", session->s_mds); 1788 session->s_renew_requested = jiffies; 1789 1790 /* do not try to renew caps until a recovering mds has reconnected 1791 * with its clients. 
*/ 1792 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 1793 if (state < CEPH_MDS_STATE_RECONNECT) { 1794 dout("send_renew_caps ignoring mds%d (%s)\n", 1795 session->s_mds, ceph_mds_state_name(state)); 1796 return 0; 1797 } 1798 1799 dout("send_renew_caps to mds%d (%s)\n", session->s_mds, 1800 ceph_mds_state_name(state)); 1801 msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 1802 ++session->s_renew_seq); 1803 if (!msg) 1804 return -ENOMEM; 1805 ceph_con_send(&session->s_con, msg); 1806 return 0; 1807 } 1808 1809 static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 1810 struct ceph_mds_session *session, u64 seq) 1811 { 1812 struct ceph_msg *msg; 1813 1814 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", 1815 session->s_mds, ceph_session_state_name(session->s_state), seq); 1816 msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 1817 if (!msg) 1818 return -ENOMEM; 1819 ceph_con_send(&session->s_con, msg); 1820 return 0; 1821 } 1822 1823 1824 /* 1825 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 1826 * 1827 * Called under session->s_mutex 1828 */ 1829 static void renewed_caps(struct ceph_mds_client *mdsc, 1830 struct ceph_mds_session *session, int is_renew) 1831 { 1832 int was_stale; 1833 int wake = 0; 1834 1835 spin_lock(&session->s_cap_lock); 1836 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 1837 1838 session->s_cap_ttl = session->s_renew_requested + 1839 mdsc->mdsmap->m_session_timeout*HZ; 1840 1841 if (was_stale) { 1842 if (time_before(jiffies, session->s_cap_ttl)) { 1843 pr_info("mds%d caps renewed\n", session->s_mds); 1844 wake = 1; 1845 } else { 1846 pr_info("mds%d caps still stale\n", session->s_mds); 1847 } 1848 } 1849 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n", 1850 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh", 1851 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 1852 spin_unlock(&session->s_cap_lock); 1853 1854 if (wake) 1855 wake_up_session_caps(session, RENEWCAPS); 1856 } 1857 1858 /* 1859 * send a session close request 1860 */ 1861 static int request_close_session(struct ceph_mds_session *session) 1862 { 1863 struct ceph_msg *msg; 1864 1865 dout("request_close_session mds%d state %s seq %lld\n", 1866 session->s_mds, ceph_session_state_name(session->s_state), 1867 session->s_seq); 1868 msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); 1869 if (!msg) 1870 return -ENOMEM; 1871 ceph_con_send(&session->s_con, msg); 1872 return 1; 1873 } 1874 1875 /* 1876 * Called with s_mutex held. 1877 */ 1878 static int __close_session(struct ceph_mds_client *mdsc, 1879 struct ceph_mds_session *session) 1880 { 1881 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 1882 return 0; 1883 session->s_state = CEPH_MDS_SESSION_CLOSING; 1884 return request_close_session(session); 1885 } 1886 1887 static bool drop_negative_children(struct dentry *dentry) 1888 { 1889 struct dentry *child; 1890 bool all_negative = true; 1891 1892 if (!d_is_dir(dentry)) 1893 goto out; 1894 1895 spin_lock(&dentry->d_lock); 1896 list_for_each_entry(child, &dentry->d_subdirs, d_child) { 1897 if (d_really_is_positive(child)) { 1898 all_negative = false; 1899 break; 1900 } 1901 } 1902 spin_unlock(&dentry->d_lock); 1903 1904 if (all_negative) 1905 shrink_dcache_parent(dentry); 1906 out: 1907 return all_negative; 1908 } 1909 1910 /* 1911 * Trim old(er) caps. 
1912 * 1913 * Because we can't cache an inode without one or more caps, we do 1914 * this indirectly: if a cap is unused, we prune its aliases, at which 1915 * point the inode will hopefully get dropped to. 1916 * 1917 * Yes, this is a bit sloppy. Our only real goal here is to respond to 1918 * memory pressure from the MDS, though, so it needn't be perfect. 1919 */ 1920 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) 1921 { 1922 int *remaining = arg; 1923 struct ceph_inode_info *ci = ceph_inode(inode); 1924 int used, wanted, oissued, mine; 1925 1926 if (*remaining <= 0) 1927 return -1; 1928 1929 spin_lock(&ci->i_ceph_lock); 1930 mine = cap->issued | cap->implemented; 1931 used = __ceph_caps_used(ci); 1932 wanted = __ceph_caps_file_wanted(ci); 1933 oissued = __ceph_caps_issued_other(ci, cap); 1934 1935 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n", 1936 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), 1937 ceph_cap_string(used), ceph_cap_string(wanted)); 1938 if (cap == ci->i_auth_cap) { 1939 if (ci->i_dirty_caps || ci->i_flushing_caps || 1940 !list_empty(&ci->i_cap_snaps)) 1941 goto out; 1942 if ((used | wanted) & CEPH_CAP_ANY_WR) 1943 goto out; 1944 /* Note: it's possible that i_filelock_ref becomes non-zero 1945 * after dropping auth caps. It doesn't hurt because reply 1946 * of lock mds request will re-add auth caps. */ 1947 if (atomic_read(&ci->i_filelock_ref) > 0) 1948 goto out; 1949 } 1950 /* The inode has cached pages, but it's no longer used. 1951 * we can safely drop it */ 1952 if (S_ISREG(inode->i_mode) && 1953 wanted == 0 && used == CEPH_CAP_FILE_CACHE && 1954 !(oissued & CEPH_CAP_FILE_CACHE)) { 1955 used = 0; 1956 oissued = 0; 1957 } 1958 if ((used | wanted) & ~oissued & mine) 1959 goto out; /* we need these caps */ 1960 1961 if (oissued) { 1962 /* we aren't the only cap.. just remove us */ 1963 __ceph_remove_cap(cap, true); 1964 (*remaining)--; 1965 } else { 1966 struct dentry *dentry; 1967 /* try dropping referring dentries */ 1968 spin_unlock(&ci->i_ceph_lock); 1969 dentry = d_find_any_alias(inode); 1970 if (dentry && drop_negative_children(dentry)) { 1971 int count; 1972 dput(dentry); 1973 d_prune_aliases(inode); 1974 count = atomic_read(&inode->i_count); 1975 if (count == 1) 1976 (*remaining)--; 1977 dout("trim_caps_cb %p cap %p pruned, count now %d\n", 1978 inode, cap, count); 1979 } else { 1980 dput(dentry); 1981 } 1982 return 0; 1983 } 1984 1985 out: 1986 spin_unlock(&ci->i_ceph_lock); 1987 return 0; 1988 } 1989 1990 /* 1991 * Trim session cap count down to some max number. 
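 *
 * (For example: if s_nr_caps is 15000 and the MDS asks for max_caps of
 * 10000, trim_caps below starts at 5000 and trim_caps_cb() runs across the
 * session's caps until roughly that many have been dropped or pruned.)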
 */
int ceph_trim_caps(struct ceph_mds_client *mdsc,
                   struct ceph_mds_session *session,
                   int max_caps)
{
        int trim_caps = session->s_nr_caps - max_caps;

        dout("trim_caps mds%d start: %d / %d, trim %d\n",
             session->s_mds, session->s_nr_caps, max_caps, trim_caps);
        if (trim_caps > 0) {
                int remaining = trim_caps;

                ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
                dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
                     session->s_mds, session->s_nr_caps, max_caps,
                     trim_caps - remaining);
        }

        ceph_flush_cap_releases(mdsc, session);
        return 0;
}

static int check_caps_flush(struct ceph_mds_client *mdsc,
                            u64 want_flush_tid)
{
        int ret = 1;

        spin_lock(&mdsc->cap_dirty_lock);
        if (!list_empty(&mdsc->cap_flush_list)) {
                struct ceph_cap_flush *cf =
                        list_first_entry(&mdsc->cap_flush_list,
                                         struct ceph_cap_flush, g_list);
                if (cf->tid <= want_flush_tid) {
                        dout("check_caps_flush still flushing tid "
                             "%llu <= %llu\n", cf->tid, want_flush_tid);
                        ret = 0;
                }
        }
        spin_unlock(&mdsc->cap_dirty_lock);
        return ret;
}

/*
 * Wait until all dirty cap data has been flushed through (at least)
 * want_flush_tid.
 */
static void wait_caps_flush(struct ceph_mds_client *mdsc,
                            u64 want_flush_tid)
{
        dout("check_caps_flush want %llu\n", want_flush_tid);

        wait_event(mdsc->cap_flushing_wq,
                   check_caps_flush(mdsc, want_flush_tid));

        dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
}

/*
 * called under s_mutex
 */
static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
                                   struct ceph_mds_session *session)
{
        struct ceph_msg *msg = NULL;
        struct ceph_mds_cap_release *head;
        struct ceph_mds_cap_item *item;
        struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
        struct ceph_cap *cap;
        LIST_HEAD(tmp_list);
        int num_cap_releases;
        __le32 barrier, *cap_barrier;

        down_read(&osdc->lock);
        barrier = cpu_to_le32(osdc->epoch_barrier);
        up_read(&osdc->lock);

        spin_lock(&session->s_cap_lock);
again:
        list_splice_init(&session->s_cap_releases, &tmp_list);
        num_cap_releases = session->s_num_cap_releases;
        session->s_num_cap_releases = 0;
        spin_unlock(&session->s_cap_lock);

        while (!list_empty(&tmp_list)) {
                if (!msg) {
                        msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
                                           PAGE_SIZE, GFP_NOFS, false);
                        if (!msg)
                                goto out_err;
                        head = msg->front.iov_base;
                        head->num = cpu_to_le32(0);
                        msg->front.iov_len = sizeof(*head);

                        msg->hdr.version = cpu_to_le16(2);
                        msg->hdr.compat_version = cpu_to_le16(1);
                }

                cap = list_first_entry(&tmp_list, struct ceph_cap,
                                       session_caps);
                list_del(&cap->session_caps);
                num_cap_releases--;

                head = msg->front.iov_base;
                put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
                                   &head->num);
                item = msg->front.iov_base + msg->front.iov_len;
                item->ino = cpu_to_le64(cap->cap_ino);
                item->cap_id = cpu_to_le64(cap->cap_id);
                item->migrate_seq = cpu_to_le32(cap->mseq);
                item->seq = cpu_to_le32(cap->issue_seq);
                msg->front.iov_len += sizeof(*item);

                ceph_put_cap(mdsc, cap);

                if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
                        // Append cap_barrier field
                        cap_barrier =
msg->front.iov_base + msg->front.iov_len; 2110 *cap_barrier = barrier; 2111 msg->front.iov_len += sizeof(*cap_barrier); 2112 2113 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2114 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2115 ceph_con_send(&session->s_con, msg); 2116 msg = NULL; 2117 } 2118 } 2119 2120 BUG_ON(num_cap_releases != 0); 2121 2122 spin_lock(&session->s_cap_lock); 2123 if (!list_empty(&session->s_cap_releases)) 2124 goto again; 2125 spin_unlock(&session->s_cap_lock); 2126 2127 if (msg) { 2128 // Append cap_barrier field 2129 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2130 *cap_barrier = barrier; 2131 msg->front.iov_len += sizeof(*cap_barrier); 2132 2133 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2134 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2135 ceph_con_send(&session->s_con, msg); 2136 } 2137 return; 2138 out_err: 2139 pr_err("send_cap_releases mds%d, failed to allocate message\n", 2140 session->s_mds); 2141 spin_lock(&session->s_cap_lock); 2142 list_splice(&tmp_list, &session->s_cap_releases); 2143 session->s_num_cap_releases += num_cap_releases; 2144 spin_unlock(&session->s_cap_lock); 2145 } 2146 2147 static void ceph_cap_release_work(struct work_struct *work) 2148 { 2149 struct ceph_mds_session *session = 2150 container_of(work, struct ceph_mds_session, s_cap_release_work); 2151 2152 mutex_lock(&session->s_mutex); 2153 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2154 session->s_state == CEPH_MDS_SESSION_HUNG) 2155 ceph_send_cap_releases(session->s_mdsc, session); 2156 mutex_unlock(&session->s_mutex); 2157 ceph_put_mds_session(session); 2158 } 2159 2160 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, 2161 struct ceph_mds_session *session) 2162 { 2163 if (mdsc->stopping) 2164 return; 2165 2166 ceph_get_mds_session(session); 2167 if (queue_work(mdsc->fsc->cap_wq, 2168 &session->s_cap_release_work)) { 2169 dout("cap release work queued\n"); 2170 } else { 2171 ceph_put_mds_session(session); 2172 dout("failed to queue cap release work\n"); 2173 } 2174 } 2175 2176 /* 2177 * caller holds session->s_cap_lock 2178 */ 2179 void __ceph_queue_cap_release(struct ceph_mds_session *session, 2180 struct ceph_cap *cap) 2181 { 2182 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2183 session->s_num_cap_releases++; 2184 2185 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2186 ceph_flush_cap_releases(session->s_mdsc, session); 2187 } 2188 2189 static void ceph_cap_reclaim_work(struct work_struct *work) 2190 { 2191 struct ceph_mds_client *mdsc = 2192 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2193 int ret = ceph_trim_dentries(mdsc); 2194 if (ret == -EAGAIN) 2195 ceph_queue_cap_reclaim_work(mdsc); 2196 } 2197 2198 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2199 { 2200 if (mdsc->stopping) 2201 return; 2202 2203 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2204 dout("caps reclaim work queued\n"); 2205 } else { 2206 dout("failed to queue caps release work\n"); 2207 } 2208 } 2209 2210 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) 2211 { 2212 int val; 2213 if (!nr) 2214 return; 2215 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2216 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2217 atomic_set(&mdsc->cap_reclaim_pending, 0); 2218 ceph_queue_cap_reclaim_work(mdsc); 2219 } 2220 } 2221 2222 /* 2223 * requests 2224 */ 2225 2226 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2227 struct inode *dir) 
2228 { 2229 struct ceph_inode_info *ci = ceph_inode(dir); 2230 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2231 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2232 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2233 unsigned int num_entries; 2234 int order; 2235 2236 spin_lock(&ci->i_ceph_lock); 2237 num_entries = ci->i_files + ci->i_subdirs; 2238 spin_unlock(&ci->i_ceph_lock); 2239 num_entries = max(num_entries, 1U); 2240 num_entries = min(num_entries, opt->max_readdir); 2241 2242 order = get_order(size * num_entries); 2243 while (order >= 0) { 2244 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2245 __GFP_NOWARN, 2246 order); 2247 if (rinfo->dir_entries) 2248 break; 2249 order--; 2250 } 2251 if (!rinfo->dir_entries) 2252 return -ENOMEM; 2253 2254 num_entries = (PAGE_SIZE << order) / size; 2255 num_entries = min(num_entries, opt->max_readdir); 2256 2257 rinfo->dir_buf_size = PAGE_SIZE << order; 2258 req->r_num_caps = num_entries + 1; 2259 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2260 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2261 return 0; 2262 } 2263 2264 /* 2265 * Create an mds request. 2266 */ 2267 struct ceph_mds_request * 2268 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2269 { 2270 struct ceph_mds_request *req; 2271 2272 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2273 if (!req) 2274 return ERR_PTR(-ENOMEM); 2275 2276 mutex_init(&req->r_fill_mutex); 2277 req->r_mdsc = mdsc; 2278 req->r_started = jiffies; 2279 req->r_start_latency = ktime_get(); 2280 req->r_resend_mds = -1; 2281 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2282 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2283 req->r_fmode = -1; 2284 kref_init(&req->r_kref); 2285 RB_CLEAR_NODE(&req->r_node); 2286 INIT_LIST_HEAD(&req->r_wait); 2287 init_completion(&req->r_completion); 2288 init_completion(&req->r_safe_completion); 2289 INIT_LIST_HEAD(&req->r_unsafe_item); 2290 2291 ktime_get_coarse_real_ts64(&req->r_stamp); 2292 2293 req->r_op = op; 2294 req->r_direct_mode = mode; 2295 return req; 2296 } 2297 2298 /* 2299 * return oldest (lowest) request, tid in request tree, 0 if none. 2300 * 2301 * called under mdsc->mutex. 2302 */ 2303 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2304 { 2305 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2306 return NULL; 2307 return rb_entry(rb_first(&mdsc->request_tree), 2308 struct ceph_mds_request, r_node); 2309 } 2310 2311 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2312 { 2313 return mdsc->oldest_tid; 2314 } 2315 2316 /* 2317 * Build a dentry's path. Allocate on heap; caller must kfree. Based 2318 * on build_path_from_dentry in fs/cifs/dir.c. 2319 * 2320 * If @stop_on_nosnap, generate path relative to the first non-snapped 2321 * inode. 2322 * 2323 * Encode hidden .snap dirs as a double /, i.e. 
2324 * foo/.snap/bar -> foo//bar 2325 */ 2326 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase, 2327 int stop_on_nosnap) 2328 { 2329 struct dentry *temp; 2330 char *path; 2331 int pos; 2332 unsigned seq; 2333 u64 base; 2334 2335 if (!dentry) 2336 return ERR_PTR(-EINVAL); 2337 2338 path = __getname(); 2339 if (!path) 2340 return ERR_PTR(-ENOMEM); 2341 retry: 2342 pos = PATH_MAX - 1; 2343 path[pos] = '\0'; 2344 2345 seq = read_seqbegin(&rename_lock); 2346 rcu_read_lock(); 2347 temp = dentry; 2348 for (;;) { 2349 struct inode *inode; 2350 2351 spin_lock(&temp->d_lock); 2352 inode = d_inode(temp); 2353 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2354 dout("build_path path+%d: %p SNAPDIR\n", 2355 pos, temp); 2356 } else if (stop_on_nosnap && inode && dentry != temp && 2357 ceph_snap(inode) == CEPH_NOSNAP) { 2358 spin_unlock(&temp->d_lock); 2359 pos++; /* get rid of any prepended '/' */ 2360 break; 2361 } else { 2362 pos -= temp->d_name.len; 2363 if (pos < 0) { 2364 spin_unlock(&temp->d_lock); 2365 break; 2366 } 2367 memcpy(path + pos, temp->d_name.name, temp->d_name.len); 2368 } 2369 spin_unlock(&temp->d_lock); 2370 temp = READ_ONCE(temp->d_parent); 2371 2372 /* Are we at the root? */ 2373 if (IS_ROOT(temp)) 2374 break; 2375 2376 /* Are we out of buffer? */ 2377 if (--pos < 0) 2378 break; 2379 2380 path[pos] = '/'; 2381 } 2382 base = ceph_ino(d_inode(temp)); 2383 rcu_read_unlock(); 2384 2385 if (read_seqretry(&rename_lock, seq)) 2386 goto retry; 2387 2388 if (pos < 0) { 2389 /* 2390 * A rename didn't occur, but somehow we didn't end up where 2391 * we thought we would. Throw a warning and try again. 2392 */ 2393 pr_warn("build_path did not end path lookup where " 2394 "expected, pos is %d\n", pos); 2395 goto retry; 2396 } 2397 2398 *pbase = base; 2399 *plen = PATH_MAX - 1 - pos; 2400 dout("build_path on %p %d built %llx '%.*s'\n", 2401 dentry, d_count(dentry), base, *plen, path + pos); 2402 return path + pos; 2403 } 2404 2405 static int build_dentry_path(struct dentry *dentry, struct inode *dir, 2406 const char **ppath, int *ppathlen, u64 *pino, 2407 bool *pfreepath, bool parent_locked) 2408 { 2409 char *path; 2410 2411 rcu_read_lock(); 2412 if (!dir) 2413 dir = d_inode_rcu(dentry->d_parent); 2414 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) { 2415 *pino = ceph_ino(dir); 2416 rcu_read_unlock(); 2417 *ppath = dentry->d_name.name; 2418 *ppathlen = dentry->d_name.len; 2419 return 0; 2420 } 2421 rcu_read_unlock(); 2422 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2423 if (IS_ERR(path)) 2424 return PTR_ERR(path); 2425 *ppath = path; 2426 *pfreepath = true; 2427 return 0; 2428 } 2429 2430 static int build_inode_path(struct inode *inode, 2431 const char **ppath, int *ppathlen, u64 *pino, 2432 bool *pfreepath) 2433 { 2434 struct dentry *dentry; 2435 char *path; 2436 2437 if (ceph_snap(inode) == CEPH_NOSNAP) { 2438 *pino = ceph_ino(inode); 2439 *ppathlen = 0; 2440 return 0; 2441 } 2442 dentry = d_find_alias(inode); 2443 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2444 dput(dentry); 2445 if (IS_ERR(path)) 2446 return PTR_ERR(path); 2447 *ppath = path; 2448 *pfreepath = true; 2449 return 0; 2450 } 2451 2452 /* 2453 * request arguments may be specified via an inode *, a dentry *, or 2454 * an explicit ino+path. 
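 *
 * Precedence follows the argument order below: an inode wins over a
 * dentry, which wins over an explicit ino+path.  For the inode case,
 * and usually for the dentry case, the path is built with
 * ceph_mdsc_build_path() and *freepath is set so the caller knows to
 * free it; an explicit path is used as-is.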
2455 */ 2456 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, 2457 struct inode *rdiri, const char *rpath, 2458 u64 rino, const char **ppath, int *pathlen, 2459 u64 *ino, bool *freepath, bool parent_locked) 2460 { 2461 int r = 0; 2462 2463 if (rinode) { 2464 r = build_inode_path(rinode, ppath, pathlen, ino, freepath); 2465 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 2466 ceph_snap(rinode)); 2467 } else if (rdentry) { 2468 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino, 2469 freepath, parent_locked); 2470 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, 2471 *ppath); 2472 } else if (rpath || rino) { 2473 *ino = rino; 2474 *ppath = rpath; 2475 *pathlen = rpath ? strlen(rpath) : 0; 2476 dout(" path %.*s\n", *pathlen, rpath); 2477 } 2478 2479 return r; 2480 } 2481 2482 /* 2483 * called under mdsc->mutex 2484 */ 2485 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, 2486 struct ceph_mds_request *req, 2487 int mds, bool drop_cap_releases) 2488 { 2489 struct ceph_msg *msg; 2490 struct ceph_mds_request_head *head; 2491 const char *path1 = NULL; 2492 const char *path2 = NULL; 2493 u64 ino1 = 0, ino2 = 0; 2494 int pathlen1 = 0, pathlen2 = 0; 2495 bool freepath1 = false, freepath2 = false; 2496 int len; 2497 u16 releases; 2498 void *p, *end; 2499 int ret; 2500 2501 ret = set_request_path_attr(req->r_inode, req->r_dentry, 2502 req->r_parent, req->r_path1, req->r_ino1.ino, 2503 &path1, &pathlen1, &ino1, &freepath1, 2504 test_bit(CEPH_MDS_R_PARENT_LOCKED, 2505 &req->r_req_flags)); 2506 if (ret < 0) { 2507 msg = ERR_PTR(ret); 2508 goto out; 2509 } 2510 2511 /* If r_old_dentry is set, then assume that its parent is locked */ 2512 ret = set_request_path_attr(NULL, req->r_old_dentry, 2513 req->r_old_dentry_dir, 2514 req->r_path2, req->r_ino2.ino, 2515 &path2, &pathlen2, &ino2, &freepath2, true); 2516 if (ret < 0) { 2517 msg = ERR_PTR(ret); 2518 goto out_free1; 2519 } 2520 2521 len = sizeof(*head) + 2522 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + 2523 sizeof(struct ceph_timespec); 2524 2525 /* calculate (max) length for cap releases */ 2526 len += sizeof(struct ceph_mds_request_release) * 2527 (!!req->r_inode_drop + !!req->r_dentry_drop + 2528 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 2529 if (req->r_dentry_drop) 2530 len += pathlen1; 2531 if (req->r_old_dentry_drop) 2532 len += pathlen2; 2533 2534 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); 2535 if (!msg) { 2536 msg = ERR_PTR(-ENOMEM); 2537 goto out_free2; 2538 } 2539 2540 msg->hdr.version = cpu_to_le16(2); 2541 msg->hdr.tid = cpu_to_le64(req->r_tid); 2542 2543 head = msg->front.iov_base; 2544 p = msg->front.iov_base + sizeof(*head); 2545 end = msg->front.iov_base + msg->front.iov_len; 2546 2547 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 2548 head->op = cpu_to_le32(req->r_op); 2549 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid)); 2550 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid)); 2551 head->ino = cpu_to_le64(req->r_deleg_ino); 2552 head->args = req->r_args; 2553 2554 ceph_encode_filepath(&p, end, ino1, path1); 2555 ceph_encode_filepath(&p, end, ino2, path2); 2556 2557 /* make note of release offset, in case we need to replay */ 2558 req->r_request_release_offset = p - msg->front.iov_base; 2559 2560 /* cap releases */ 2561 releases = 0; 2562 if (req->r_inode_drop) 2563 releases += ceph_encode_inode_release(&p, 2564 req->r_inode ? 
req->r_inode : d_inode(req->r_dentry), 2565 mds, req->r_inode_drop, req->r_inode_unless, 2566 req->r_op == CEPH_MDS_OP_READDIR); 2567 if (req->r_dentry_drop) 2568 releases += ceph_encode_dentry_release(&p, req->r_dentry, 2569 req->r_parent, mds, req->r_dentry_drop, 2570 req->r_dentry_unless); 2571 if (req->r_old_dentry_drop) 2572 releases += ceph_encode_dentry_release(&p, req->r_old_dentry, 2573 req->r_old_dentry_dir, mds, 2574 req->r_old_dentry_drop, 2575 req->r_old_dentry_unless); 2576 if (req->r_old_inode_drop) 2577 releases += ceph_encode_inode_release(&p, 2578 d_inode(req->r_old_dentry), 2579 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 2580 2581 if (drop_cap_releases) { 2582 releases = 0; 2583 p = msg->front.iov_base + req->r_request_release_offset; 2584 } 2585 2586 head->num_releases = cpu_to_le16(releases); 2587 2588 /* time stamp */ 2589 { 2590 struct ceph_timespec ts; 2591 ceph_encode_timespec64(&ts, &req->r_stamp); 2592 ceph_encode_copy(&p, &ts, sizeof(ts)); 2593 } 2594 2595 if (WARN_ON_ONCE(p > end)) { 2596 ceph_msg_put(msg); 2597 msg = ERR_PTR(-ERANGE); 2598 goto out_free2; 2599 } 2600 2601 msg->front.iov_len = p - msg->front.iov_base; 2602 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2603 2604 if (req->r_pagelist) { 2605 struct ceph_pagelist *pagelist = req->r_pagelist; 2606 ceph_msg_data_add_pagelist(msg, pagelist); 2607 msg->hdr.data_len = cpu_to_le32(pagelist->length); 2608 } else { 2609 msg->hdr.data_len = 0; 2610 } 2611 2612 msg->hdr.data_off = cpu_to_le16(0); 2613 2614 out_free2: 2615 if (freepath2) 2616 ceph_mdsc_free_path((char *)path2, pathlen2); 2617 out_free1: 2618 if (freepath1) 2619 ceph_mdsc_free_path((char *)path1, pathlen1); 2620 out: 2621 return msg; 2622 } 2623 2624 /* 2625 * called under mdsc->mutex if error, under no mutex if 2626 * success. 2627 */ 2628 static void complete_request(struct ceph_mds_client *mdsc, 2629 struct ceph_mds_request *req) 2630 { 2631 req->r_end_latency = ktime_get(); 2632 2633 if (req->r_callback) 2634 req->r_callback(mdsc, req); 2635 complete_all(&req->r_completion); 2636 } 2637 2638 /* 2639 * called under mdsc->mutex 2640 */ 2641 static int __prepare_send_request(struct ceph_mds_client *mdsc, 2642 struct ceph_mds_request *req, 2643 int mds, bool drop_cap_releases) 2644 { 2645 struct ceph_mds_request_head *rhead; 2646 struct ceph_msg *msg; 2647 int flags = 0; 2648 2649 req->r_attempts++; 2650 if (req->r_inode) { 2651 struct ceph_cap *cap = 2652 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 2653 2654 if (cap) 2655 req->r_sent_on_mseq = cap->mseq; 2656 else 2657 req->r_sent_on_mseq = -1; 2658 } 2659 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, 2660 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 2661 2662 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 2663 void *p; 2664 /* 2665 * Replay. Do not regenerate message (and rebuild 2666 * paths, etc.); just use the original message. 2667 * Rebuilding paths will break for renames because 2668 * d_move mangles the src name. 
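 *
 * All we do for a replay is reuse r_request as-is: set the REPLAY
 * flag, update the retry count, point the head at the target inode
 * (if any), drop the stale cap/dentry releases and re-encode the
 * timestamp.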
2669 */ 2670 msg = req->r_request; 2671 rhead = msg->front.iov_base; 2672 2673 flags = le32_to_cpu(rhead->flags); 2674 flags |= CEPH_MDS_FLAG_REPLAY; 2675 rhead->flags = cpu_to_le32(flags); 2676 2677 if (req->r_target_inode) 2678 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 2679 2680 rhead->num_retry = req->r_attempts - 1; 2681 2682 /* remove cap/dentry releases from message */ 2683 rhead->num_releases = 0; 2684 2685 /* time stamp */ 2686 p = msg->front.iov_base + req->r_request_release_offset; 2687 { 2688 struct ceph_timespec ts; 2689 ceph_encode_timespec64(&ts, &req->r_stamp); 2690 ceph_encode_copy(&p, &ts, sizeof(ts)); 2691 } 2692 2693 msg->front.iov_len = p - msg->front.iov_base; 2694 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2695 return 0; 2696 } 2697 2698 if (req->r_request) { 2699 ceph_msg_put(req->r_request); 2700 req->r_request = NULL; 2701 } 2702 msg = create_request_message(mdsc, req, mds, drop_cap_releases); 2703 if (IS_ERR(msg)) { 2704 req->r_err = PTR_ERR(msg); 2705 return PTR_ERR(msg); 2706 } 2707 req->r_request = msg; 2708 2709 rhead = msg->front.iov_base; 2710 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 2711 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2712 flags |= CEPH_MDS_FLAG_REPLAY; 2713 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) 2714 flags |= CEPH_MDS_FLAG_ASYNC; 2715 if (req->r_parent) 2716 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 2717 rhead->flags = cpu_to_le32(flags); 2718 rhead->num_fwd = req->r_num_fwd; 2719 rhead->num_retry = req->r_attempts - 1; 2720 2721 dout(" r_parent = %p\n", req->r_parent); 2722 return 0; 2723 } 2724 2725 /* 2726 * called under mdsc->mutex 2727 */ 2728 static int __send_request(struct ceph_mds_client *mdsc, 2729 struct ceph_mds_session *session, 2730 struct ceph_mds_request *req, 2731 bool drop_cap_releases) 2732 { 2733 int err; 2734 2735 err = __prepare_send_request(mdsc, req, session->s_mds, 2736 drop_cap_releases); 2737 if (!err) { 2738 ceph_msg_get(req->r_request); 2739 ceph_con_send(&session->s_con, req->r_request); 2740 } 2741 2742 return err; 2743 } 2744 2745 /* 2746 * send request, or put it on the appropriate wait list. 
2747 */ 2748 static void __do_request(struct ceph_mds_client *mdsc, 2749 struct ceph_mds_request *req) 2750 { 2751 struct ceph_mds_session *session = NULL; 2752 int mds = -1; 2753 int err = 0; 2754 bool random; 2755 2756 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 2757 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 2758 __unregister_request(mdsc, req); 2759 return; 2760 } 2761 2762 if (req->r_timeout && 2763 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 2764 dout("do_request timed out\n"); 2765 err = -ETIMEDOUT; 2766 goto finish; 2767 } 2768 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 2769 dout("do_request forced umount\n"); 2770 err = -EIO; 2771 goto finish; 2772 } 2773 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 2774 if (mdsc->mdsmap_err) { 2775 err = mdsc->mdsmap_err; 2776 dout("do_request mdsmap err %d\n", err); 2777 goto finish; 2778 } 2779 if (mdsc->mdsmap->m_epoch == 0) { 2780 dout("do_request no mdsmap, waiting for map\n"); 2781 list_add(&req->r_wait, &mdsc->waiting_for_map); 2782 return; 2783 } 2784 if (!(mdsc->fsc->mount_options->flags & 2785 CEPH_MOUNT_OPT_MOUNTWAIT) && 2786 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { 2787 err = -EHOSTUNREACH; 2788 goto finish; 2789 } 2790 } 2791 2792 put_request_session(req); 2793 2794 mds = __choose_mds(mdsc, req, &random); 2795 if (mds < 0 || 2796 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 2797 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2798 err = -EJUKEBOX; 2799 goto finish; 2800 } 2801 dout("do_request no mds or not active, waiting for map\n"); 2802 list_add(&req->r_wait, &mdsc->waiting_for_map); 2803 return; 2804 } 2805 2806 /* get, open session */ 2807 session = __ceph_lookup_mds_session(mdsc, mds); 2808 if (!session) { 2809 session = register_session(mdsc, mds); 2810 if (IS_ERR(session)) { 2811 err = PTR_ERR(session); 2812 goto finish; 2813 } 2814 } 2815 req->r_session = ceph_get_mds_session(session); 2816 2817 dout("do_request mds%d session %p state %s\n", mds, session, 2818 ceph_session_state_name(session->s_state)); 2819 if (session->s_state != CEPH_MDS_SESSION_OPEN && 2820 session->s_state != CEPH_MDS_SESSION_HUNG) { 2821 if (session->s_state == CEPH_MDS_SESSION_REJECTED) { 2822 err = -EACCES; 2823 goto out_session; 2824 } 2825 /* 2826 * We cannot queue async requests since the caps and delegated 2827 * inodes are bound to the session. Just return -EJUKEBOX and 2828 * let the caller retry a sync request in that case. 
2829 */ 2830 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2831 err = -EJUKEBOX; 2832 goto out_session; 2833 } 2834 if (session->s_state == CEPH_MDS_SESSION_NEW || 2835 session->s_state == CEPH_MDS_SESSION_CLOSING) { 2836 err = __open_session(mdsc, session); 2837 if (err) 2838 goto out_session; 2839 /* retry the same mds later */ 2840 if (random) 2841 req->r_resend_mds = mds; 2842 } 2843 list_add(&req->r_wait, &session->s_waiting); 2844 goto out_session; 2845 } 2846 2847 /* send request */ 2848 req->r_resend_mds = -1; /* forget any previous mds hint */ 2849 2850 if (req->r_request_started == 0) /* note request start time */ 2851 req->r_request_started = jiffies; 2852 2853 err = __send_request(mdsc, session, req, false); 2854 2855 out_session: 2856 ceph_put_mds_session(session); 2857 finish: 2858 if (err) { 2859 dout("__do_request early error %d\n", err); 2860 req->r_err = err; 2861 complete_request(mdsc, req); 2862 __unregister_request(mdsc, req); 2863 } 2864 return; 2865 } 2866 2867 /* 2868 * called under mdsc->mutex 2869 */ 2870 static void __wake_requests(struct ceph_mds_client *mdsc, 2871 struct list_head *head) 2872 { 2873 struct ceph_mds_request *req; 2874 LIST_HEAD(tmp_list); 2875 2876 list_splice_init(head, &tmp_list); 2877 2878 while (!list_empty(&tmp_list)) { 2879 req = list_entry(tmp_list.next, 2880 struct ceph_mds_request, r_wait); 2881 list_del_init(&req->r_wait); 2882 dout(" wake request %p tid %llu\n", req, req->r_tid); 2883 __do_request(mdsc, req); 2884 } 2885 } 2886 2887 /* 2888 * Wake up threads with requests pending for @mds, so that they can 2889 * resubmit their requests to a possibly different mds. 2890 */ 2891 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 2892 { 2893 struct ceph_mds_request *req; 2894 struct rb_node *p = rb_first(&mdsc->request_tree); 2895 2896 dout("kick_requests mds%d\n", mds); 2897 while (p) { 2898 req = rb_entry(p, struct ceph_mds_request, r_node); 2899 p = rb_next(p); 2900 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2901 continue; 2902 if (req->r_attempts > 0) 2903 continue; /* only new requests */ 2904 if (req->r_session && 2905 req->r_session->s_mds == mds) { 2906 dout(" kicking tid %llu\n", req->r_tid); 2907 list_del_init(&req->r_wait); 2908 __do_request(mdsc, req); 2909 } 2910 } 2911 } 2912 2913 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, 2914 struct ceph_mds_request *req) 2915 { 2916 int err = 0; 2917 2918 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ 2919 if (req->r_inode) 2920 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 2921 if (req->r_parent) { 2922 struct ceph_inode_info *ci = ceph_inode(req->r_parent); 2923 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ? 
2924 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD; 2925 spin_lock(&ci->i_ceph_lock); 2926 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false); 2927 __ceph_touch_fmode(ci, mdsc, fmode); 2928 spin_unlock(&ci->i_ceph_lock); 2929 ihold(req->r_parent); 2930 } 2931 if (req->r_old_dentry_dir) 2932 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 2933 CEPH_CAP_PIN); 2934 2935 if (req->r_inode) { 2936 err = ceph_wait_on_async_create(req->r_inode); 2937 if (err) { 2938 dout("%s: wait for async create returned: %d\n", 2939 __func__, err); 2940 return err; 2941 } 2942 } 2943 2944 if (!err && req->r_old_inode) { 2945 err = ceph_wait_on_async_create(req->r_old_inode); 2946 if (err) { 2947 dout("%s: wait for async create returned: %d\n", 2948 __func__, err); 2949 return err; 2950 } 2951 } 2952 2953 dout("submit_request on %p for inode %p\n", req, dir); 2954 mutex_lock(&mdsc->mutex); 2955 __register_request(mdsc, req, dir); 2956 __do_request(mdsc, req); 2957 err = req->r_err; 2958 mutex_unlock(&mdsc->mutex); 2959 return err; 2960 } 2961 2962 static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, 2963 struct ceph_mds_request *req) 2964 { 2965 int err; 2966 2967 /* wait */ 2968 dout("do_request waiting\n"); 2969 if (!req->r_timeout && req->r_wait_for_completion) { 2970 err = req->r_wait_for_completion(mdsc, req); 2971 } else { 2972 long timeleft = wait_for_completion_killable_timeout( 2973 &req->r_completion, 2974 ceph_timeout_jiffies(req->r_timeout)); 2975 if (timeleft > 0) 2976 err = 0; 2977 else if (!timeleft) 2978 err = -ETIMEDOUT; /* timed out */ 2979 else 2980 err = timeleft; /* killed */ 2981 } 2982 dout("do_request waited, got %d\n", err); 2983 mutex_lock(&mdsc->mutex); 2984 2985 /* only abort if we didn't race with a real reply */ 2986 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 2987 err = le32_to_cpu(req->r_reply_info.head->result); 2988 } else if (err < 0) { 2989 dout("aborted request %lld with %d\n", req->r_tid, err); 2990 2991 /* 2992 * ensure we aren't running concurrently with 2993 * ceph_fill_trace or ceph_readdir_prepopulate, which 2994 * rely on locks (dir mutex) held by our caller. 2995 */ 2996 mutex_lock(&req->r_fill_mutex); 2997 req->r_err = err; 2998 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 2999 mutex_unlock(&req->r_fill_mutex); 3000 3001 if (req->r_parent && 3002 (req->r_op & CEPH_MDS_OP_WRITE)) 3003 ceph_invalidate_dir_request(req); 3004 } else { 3005 err = req->r_err; 3006 } 3007 3008 mutex_unlock(&mdsc->mutex); 3009 return err; 3010 } 3011 3012 /* 3013 * Synchrously perform an mds request. Take care of all of the 3014 * session setup, forwarding, retry details. 3015 */ 3016 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 3017 struct inode *dir, 3018 struct ceph_mds_request *req) 3019 { 3020 int err; 3021 3022 dout("do_request on %p\n", req); 3023 3024 /* issue */ 3025 err = ceph_mdsc_submit_request(mdsc, dir, req); 3026 if (!err) 3027 err = ceph_mdsc_wait_request(mdsc, req); 3028 dout("do_request %p done, result %d\n", req, err); 3029 return err; 3030 } 3031 3032 /* 3033 * Invalidate dir's completeness, dentry lease state on an aborted MDS 3034 * namespace request. 
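 *
 * Once a request has been aborted we no longer know whether the MDS
 * applied the operation, so the cached readdir completeness and the
 * dentry leases on the affected directories can no longer be trusted
 * and must be dropped.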
3035 */ 3036 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 3037 { 3038 struct inode *dir = req->r_parent; 3039 struct inode *old_dir = req->r_old_dentry_dir; 3040 3041 dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir); 3042 3043 ceph_dir_clear_complete(dir); 3044 if (old_dir) 3045 ceph_dir_clear_complete(old_dir); 3046 if (req->r_dentry) 3047 ceph_invalidate_dentry_lease(req->r_dentry); 3048 if (req->r_old_dentry) 3049 ceph_invalidate_dentry_lease(req->r_old_dentry); 3050 } 3051 3052 /* 3053 * Handle mds reply. 3054 * 3055 * We take the session mutex and parse and process the reply immediately. 3056 * This preserves the logical ordering of replies, capabilities, etc., sent 3057 * by the MDS as they are applied to our local cache. 3058 */ 3059 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 3060 { 3061 struct ceph_mds_client *mdsc = session->s_mdsc; 3062 struct ceph_mds_request *req; 3063 struct ceph_mds_reply_head *head = msg->front.iov_base; 3064 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 3065 struct ceph_snap_realm *realm; 3066 u64 tid; 3067 int err, result; 3068 int mds = session->s_mds; 3069 3070 if (msg->front.iov_len < sizeof(*head)) { 3071 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 3072 ceph_msg_dump(msg); 3073 return; 3074 } 3075 3076 /* get request, session */ 3077 tid = le64_to_cpu(msg->hdr.tid); 3078 mutex_lock(&mdsc->mutex); 3079 req = lookup_get_request(mdsc, tid); 3080 if (!req) { 3081 dout("handle_reply on unknown tid %llu\n", tid); 3082 mutex_unlock(&mdsc->mutex); 3083 return; 3084 } 3085 dout("handle_reply %p\n", req); 3086 3087 /* correct session? */ 3088 if (req->r_session != session) { 3089 pr_err("mdsc_handle_reply got %llu on session mds%d" 3090 " not mds%d\n", tid, session->s_mds, 3091 req->r_session ? req->r_session->s_mds : -1); 3092 mutex_unlock(&mdsc->mutex); 3093 goto out; 3094 } 3095 3096 /* dup? */ 3097 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 3098 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 3099 pr_warn("got a dup %s reply on %llu from mds%d\n", 3100 head->safe ? 
"safe" : "unsafe", tid, mds); 3101 mutex_unlock(&mdsc->mutex); 3102 goto out; 3103 } 3104 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 3105 pr_warn("got unsafe after safe on %llu from mds%d\n", 3106 tid, mds); 3107 mutex_unlock(&mdsc->mutex); 3108 goto out; 3109 } 3110 3111 result = le32_to_cpu(head->result); 3112 3113 /* 3114 * Handle an ESTALE 3115 * if we're not talking to the authority, send to them 3116 * if the authority has changed while we weren't looking, 3117 * send to new authority 3118 * Otherwise we just have to return an ESTALE 3119 */ 3120 if (result == -ESTALE) { 3121 dout("got ESTALE on request %llu\n", req->r_tid); 3122 req->r_resend_mds = -1; 3123 if (req->r_direct_mode != USE_AUTH_MDS) { 3124 dout("not using auth, setting for that now\n"); 3125 req->r_direct_mode = USE_AUTH_MDS; 3126 __do_request(mdsc, req); 3127 mutex_unlock(&mdsc->mutex); 3128 goto out; 3129 } else { 3130 int mds = __choose_mds(mdsc, req, NULL); 3131 if (mds >= 0 && mds != req->r_session->s_mds) { 3132 dout("but auth changed, so resending\n"); 3133 __do_request(mdsc, req); 3134 mutex_unlock(&mdsc->mutex); 3135 goto out; 3136 } 3137 } 3138 dout("have to return ESTALE on request %llu\n", req->r_tid); 3139 } 3140 3141 3142 if (head->safe) { 3143 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 3144 __unregister_request(mdsc, req); 3145 3146 /* last request during umount? */ 3147 if (mdsc->stopping && !__get_oldest_req(mdsc)) 3148 complete_all(&mdsc->safe_umount_waiters); 3149 3150 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3151 /* 3152 * We already handled the unsafe response, now do the 3153 * cleanup. No need to examine the response; the MDS 3154 * doesn't include any result info in the safe 3155 * response. And even if it did, there is nothing 3156 * useful we could do with a revised return value. 
3157 */ 3158 dout("got safe reply %llu, mds%d\n", tid, mds); 3159 3160 mutex_unlock(&mdsc->mutex); 3161 goto out; 3162 } 3163 } else { 3164 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 3165 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 3166 } 3167 3168 dout("handle_reply tid %lld result %d\n", tid, result); 3169 rinfo = &req->r_reply_info; 3170 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 3171 err = parse_reply_info(session, msg, rinfo, (u64)-1); 3172 else 3173 err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features); 3174 mutex_unlock(&mdsc->mutex); 3175 3176 mutex_lock(&session->s_mutex); 3177 if (err < 0) { 3178 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 3179 ceph_msg_dump(msg); 3180 goto out_err; 3181 } 3182 3183 /* snap trace */ 3184 realm = NULL; 3185 if (rinfo->snapblob_len) { 3186 down_write(&mdsc->snap_rwsem); 3187 ceph_update_snap_trace(mdsc, rinfo->snapblob, 3188 rinfo->snapblob + rinfo->snapblob_len, 3189 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 3190 &realm); 3191 downgrade_write(&mdsc->snap_rwsem); 3192 } else { 3193 down_read(&mdsc->snap_rwsem); 3194 } 3195 3196 /* insert trace into our cache */ 3197 mutex_lock(&req->r_fill_mutex); 3198 current->journal_info = req; 3199 err = ceph_fill_trace(mdsc->fsc->sb, req); 3200 if (err == 0) { 3201 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 3202 req->r_op == CEPH_MDS_OP_LSSNAP)) 3203 ceph_readdir_prepopulate(req, req->r_session); 3204 } 3205 current->journal_info = NULL; 3206 mutex_unlock(&req->r_fill_mutex); 3207 3208 up_read(&mdsc->snap_rwsem); 3209 if (realm) 3210 ceph_put_snap_realm(mdsc, realm); 3211 3212 if (err == 0) { 3213 if (req->r_target_inode && 3214 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3215 struct ceph_inode_info *ci = 3216 ceph_inode(req->r_target_inode); 3217 spin_lock(&ci->i_unsafe_lock); 3218 list_add_tail(&req->r_unsafe_target_item, 3219 &ci->i_unsafe_iops); 3220 spin_unlock(&ci->i_unsafe_lock); 3221 } 3222 3223 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 3224 } 3225 out_err: 3226 mutex_lock(&mdsc->mutex); 3227 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3228 if (err) { 3229 req->r_err = err; 3230 } else { 3231 req->r_reply = ceph_msg_get(msg); 3232 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); 3233 } 3234 } else { 3235 dout("reply arrived after request %lld was aborted\n", tid); 3236 } 3237 mutex_unlock(&mdsc->mutex); 3238 3239 mutex_unlock(&session->s_mutex); 3240 3241 /* kick calling process */ 3242 complete_request(mdsc, req); 3243 3244 ceph_update_metadata_latency(&mdsc->metric, req->r_start_latency, 3245 req->r_end_latency, err); 3246 out: 3247 ceph_mdsc_put_request(req); 3248 return; 3249 } 3250 3251 3252 3253 /* 3254 * handle mds notification that our request has been forwarded. 3255 */ 3256 static void handle_forward(struct ceph_mds_client *mdsc, 3257 struct ceph_mds_session *session, 3258 struct ceph_msg *msg) 3259 { 3260 struct ceph_mds_request *req; 3261 u64 tid = le64_to_cpu(msg->hdr.tid); 3262 u32 next_mds; 3263 u32 fwd_seq; 3264 int err = -EINVAL; 3265 void *p = msg->front.iov_base; 3266 void *end = p + msg->front.iov_len; 3267 3268 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3269 next_mds = ceph_decode_32(&p); 3270 fwd_seq = ceph_decode_32(&p); 3271 3272 mutex_lock(&mdsc->mutex); 3273 req = lookup_get_request(mdsc, tid); 3274 if (!req) { 3275 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 3276 goto out; /* dup reply? 
*/ 3277 } 3278 3279 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3280 dout("forward tid %llu aborted, unregistering\n", tid); 3281 __unregister_request(mdsc, req); 3282 } else if (fwd_seq <= req->r_num_fwd) { 3283 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 3284 tid, next_mds, req->r_num_fwd, fwd_seq); 3285 } else { 3286 /* resend. forward race not possible; mds would drop */ 3287 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 3288 BUG_ON(req->r_err); 3289 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 3290 req->r_attempts = 0; 3291 req->r_num_fwd = fwd_seq; 3292 req->r_resend_mds = next_mds; 3293 put_request_session(req); 3294 __do_request(mdsc, req); 3295 } 3296 ceph_mdsc_put_request(req); 3297 out: 3298 mutex_unlock(&mdsc->mutex); 3299 return; 3300 3301 bad: 3302 pr_err("mdsc_handle_forward decode error err=%d\n", err); 3303 } 3304 3305 static int __decode_session_metadata(void **p, void *end, 3306 bool *blacklisted) 3307 { 3308 /* map<string,string> */ 3309 u32 n; 3310 bool err_str; 3311 ceph_decode_32_safe(p, end, n, bad); 3312 while (n-- > 0) { 3313 u32 len; 3314 ceph_decode_32_safe(p, end, len, bad); 3315 ceph_decode_need(p, end, len, bad); 3316 err_str = !strncmp(*p, "error_string", len); 3317 *p += len; 3318 ceph_decode_32_safe(p, end, len, bad); 3319 ceph_decode_need(p, end, len, bad); 3320 if (err_str && strnstr(*p, "blacklisted", len)) 3321 *blacklisted = true; 3322 *p += len; 3323 } 3324 return 0; 3325 bad: 3326 return -1; 3327 } 3328 3329 /* 3330 * handle a mds session control message 3331 */ 3332 static void handle_session(struct ceph_mds_session *session, 3333 struct ceph_msg *msg) 3334 { 3335 struct ceph_mds_client *mdsc = session->s_mdsc; 3336 int mds = session->s_mds; 3337 int msg_version = le16_to_cpu(msg->hdr.version); 3338 void *p = msg->front.iov_base; 3339 void *end = p + msg->front.iov_len; 3340 struct ceph_mds_session_head *h; 3341 u32 op; 3342 u64 seq, features = 0; 3343 int wake = 0; 3344 bool blacklisted = false; 3345 3346 /* decode */ 3347 ceph_decode_need(&p, end, sizeof(*h), bad); 3348 h = p; 3349 p += sizeof(*h); 3350 3351 op = le32_to_cpu(h->op); 3352 seq = le64_to_cpu(h->seq); 3353 3354 if (msg_version >= 3) { 3355 u32 len; 3356 /* version >= 2, metadata */ 3357 if (__decode_session_metadata(&p, end, &blacklisted) < 0) 3358 goto bad; 3359 /* version >= 3, feature bits */ 3360 ceph_decode_32_safe(&p, end, len, bad); 3361 if (len) { 3362 ceph_decode_64_safe(&p, end, features, bad); 3363 p += len - sizeof(features); 3364 } 3365 } 3366 3367 mutex_lock(&mdsc->mutex); 3368 if (op == CEPH_SESSION_CLOSE) { 3369 ceph_get_mds_session(session); 3370 __unregister_session(mdsc, session); 3371 } 3372 /* FIXME: this ttl calculation is generous */ 3373 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 3374 mutex_unlock(&mdsc->mutex); 3375 3376 mutex_lock(&session->s_mutex); 3377 3378 dout("handle_session mds%d %s %p state %s seq %llu\n", 3379 mds, ceph_session_op_name(op), session, 3380 ceph_session_state_name(session->s_state), seq); 3381 3382 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 3383 session->s_state = CEPH_MDS_SESSION_OPEN; 3384 pr_info("mds%d came back\n", session->s_mds); 3385 } 3386 3387 switch (op) { 3388 case CEPH_SESSION_OPEN: 3389 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3390 pr_info("mds%d reconnect success\n", session->s_mds); 3391 session->s_state = CEPH_MDS_SESSION_OPEN; 3392 session->s_features = features; 3393 renewed_caps(mdsc, session, 0); 3394 if 
(test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features)) 3395 metric_schedule_delayed(&mdsc->metric); 3396 wake = 1; 3397 if (mdsc->stopping) 3398 __close_session(mdsc, session); 3399 break; 3400 3401 case CEPH_SESSION_RENEWCAPS: 3402 if (session->s_renew_seq == seq) 3403 renewed_caps(mdsc, session, 1); 3404 break; 3405 3406 case CEPH_SESSION_CLOSE: 3407 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3408 pr_info("mds%d reconnect denied\n", session->s_mds); 3409 session->s_state = CEPH_MDS_SESSION_CLOSED; 3410 cleanup_session_requests(mdsc, session); 3411 remove_session_caps(session); 3412 wake = 2; /* for good measure */ 3413 wake_up_all(&mdsc->session_close_wq); 3414 break; 3415 3416 case CEPH_SESSION_STALE: 3417 pr_info("mds%d caps went stale, renewing\n", 3418 session->s_mds); 3419 spin_lock(&session->s_gen_ttl_lock); 3420 session->s_cap_gen++; 3421 session->s_cap_ttl = jiffies - 1; 3422 spin_unlock(&session->s_gen_ttl_lock); 3423 send_renew_caps(mdsc, session); 3424 break; 3425 3426 case CEPH_SESSION_RECALL_STATE: 3427 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 3428 break; 3429 3430 case CEPH_SESSION_FLUSHMSG: 3431 send_flushmsg_ack(mdsc, session, seq); 3432 break; 3433 3434 case CEPH_SESSION_FORCE_RO: 3435 dout("force_session_readonly %p\n", session); 3436 spin_lock(&session->s_cap_lock); 3437 session->s_readonly = true; 3438 spin_unlock(&session->s_cap_lock); 3439 wake_up_session_caps(session, FORCE_RO); 3440 break; 3441 3442 case CEPH_SESSION_REJECT: 3443 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 3444 pr_info("mds%d rejected session\n", session->s_mds); 3445 session->s_state = CEPH_MDS_SESSION_REJECTED; 3446 cleanup_session_requests(mdsc, session); 3447 remove_session_caps(session); 3448 if (blacklisted) 3449 mdsc->fsc->blacklisted = true; 3450 wake = 2; /* for good measure */ 3451 break; 3452 3453 default: 3454 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 3455 WARN_ON(1); 3456 } 3457 3458 mutex_unlock(&session->s_mutex); 3459 if (wake) { 3460 mutex_lock(&mdsc->mutex); 3461 __wake_requests(mdsc, &session->s_waiting); 3462 if (wake == 2) 3463 kick_requests(mdsc, mds); 3464 mutex_unlock(&mdsc->mutex); 3465 } 3466 if (op == CEPH_SESSION_CLOSE) 3467 ceph_put_mds_session(session); 3468 return; 3469 3470 bad: 3471 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 3472 (int)msg->front.iov_len); 3473 ceph_msg_dump(msg); 3474 return; 3475 } 3476 3477 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 3478 { 3479 int dcaps; 3480 3481 dcaps = xchg(&req->r_dir_caps, 0); 3482 if (dcaps) { 3483 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3484 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 3485 } 3486 } 3487 3488 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req) 3489 { 3490 int dcaps; 3491 3492 dcaps = xchg(&req->r_dir_caps, 0); 3493 if (dcaps) { 3494 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3495 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent), 3496 dcaps); 3497 } 3498 } 3499 3500 /* 3501 * called under session->mutex. 
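 *
 * Re-send requests that only got an unsafe reply, plus any older
 * (already attempted) requests targeted at this mds, so the recovering
 * MDS can replay them during its clientreplay stage.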
3502 */ 3503 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 3504 struct ceph_mds_session *session) 3505 { 3506 struct ceph_mds_request *req, *nreq; 3507 struct rb_node *p; 3508 3509 dout("replay_unsafe_requests mds%d\n", session->s_mds); 3510 3511 mutex_lock(&mdsc->mutex); 3512 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) 3513 __send_request(mdsc, session, req, true); 3514 3515 /* 3516 * also re-send old requests when MDS enters reconnect stage. So that MDS 3517 * can process completed request in clientreplay stage. 3518 */ 3519 p = rb_first(&mdsc->request_tree); 3520 while (p) { 3521 req = rb_entry(p, struct ceph_mds_request, r_node); 3522 p = rb_next(p); 3523 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3524 continue; 3525 if (req->r_attempts == 0) 3526 continue; /* only old requests */ 3527 if (!req->r_session) 3528 continue; 3529 if (req->r_session->s_mds != session->s_mds) 3530 continue; 3531 3532 ceph_mdsc_release_dir_caps_no_check(req); 3533 3534 __send_request(mdsc, session, req, true); 3535 } 3536 mutex_unlock(&mdsc->mutex); 3537 } 3538 3539 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state) 3540 { 3541 struct ceph_msg *reply; 3542 struct ceph_pagelist *_pagelist; 3543 struct page *page; 3544 __le32 *addr; 3545 int err = -ENOMEM; 3546 3547 if (!recon_state->allow_multi) 3548 return -ENOSPC; 3549 3550 /* can't handle message that contains both caps and realm */ 3551 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms); 3552 3553 /* pre-allocate new pagelist */ 3554 _pagelist = ceph_pagelist_alloc(GFP_NOFS); 3555 if (!_pagelist) 3556 return -ENOMEM; 3557 3558 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3559 if (!reply) 3560 goto fail_msg; 3561 3562 /* placeholder for nr_caps */ 3563 err = ceph_pagelist_encode_32(_pagelist, 0); 3564 if (err < 0) 3565 goto fail; 3566 3567 if (recon_state->nr_caps) { 3568 /* currently encoding caps */ 3569 err = ceph_pagelist_encode_32(recon_state->pagelist, 0); 3570 if (err) 3571 goto fail; 3572 } else { 3573 /* placeholder for nr_realms (currently encoding relams) */ 3574 err = ceph_pagelist_encode_32(_pagelist, 0); 3575 if (err < 0) 3576 goto fail; 3577 } 3578 3579 err = ceph_pagelist_encode_8(recon_state->pagelist, 1); 3580 if (err) 3581 goto fail; 3582 3583 page = list_first_entry(&recon_state->pagelist->head, struct page, lru); 3584 addr = kmap_atomic(page); 3585 if (recon_state->nr_caps) { 3586 /* currently encoding caps */ 3587 *addr = cpu_to_le32(recon_state->nr_caps); 3588 } else { 3589 /* currently encoding relams */ 3590 *(addr + 1) = cpu_to_le32(recon_state->nr_realms); 3591 } 3592 kunmap_atomic(addr); 3593 3594 reply->hdr.version = cpu_to_le16(5); 3595 reply->hdr.compat_version = cpu_to_le16(4); 3596 3597 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length); 3598 ceph_msg_data_add_pagelist(reply, recon_state->pagelist); 3599 3600 ceph_con_send(&recon_state->session->s_con, reply); 3601 ceph_pagelist_release(recon_state->pagelist); 3602 3603 recon_state->pagelist = _pagelist; 3604 recon_state->nr_caps = 0; 3605 recon_state->nr_realms = 0; 3606 recon_state->msg_version = 5; 3607 return 0; 3608 fail: 3609 ceph_msg_put(reply); 3610 fail_msg: 3611 ceph_pagelist_release(_pagelist); 3612 return err; 3613 } 3614 3615 /* 3616 * Encode information about a cap for a reconnect with the MDS. 
3617 */ 3618 static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap, 3619 void *arg) 3620 { 3621 union { 3622 struct ceph_mds_cap_reconnect v2; 3623 struct ceph_mds_cap_reconnect_v1 v1; 3624 } rec; 3625 struct ceph_inode_info *ci = cap->ci; 3626 struct ceph_reconnect_state *recon_state = arg; 3627 struct ceph_pagelist *pagelist = recon_state->pagelist; 3628 int err; 3629 u64 snap_follows; 3630 3631 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 3632 inode, ceph_vinop(inode), cap, cap->cap_id, 3633 ceph_cap_string(cap->issued)); 3634 3635 spin_lock(&ci->i_ceph_lock); 3636 cap->seq = 0; /* reset cap seq */ 3637 cap->issue_seq = 0; /* and issue_seq */ 3638 cap->mseq = 0; /* and migrate_seq */ 3639 cap->cap_gen = cap->session->s_cap_gen; 3640 3641 /* These are lost when the session goes away */ 3642 if (S_ISDIR(inode->i_mode)) { 3643 if (cap->issued & CEPH_CAP_DIR_CREATE) { 3644 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); 3645 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); 3646 } 3647 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; 3648 } 3649 3650 if (recon_state->msg_version >= 2) { 3651 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 3652 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3653 rec.v2.issued = cpu_to_le32(cap->issued); 3654 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3655 rec.v2.pathbase = 0; 3656 rec.v2.flock_len = (__force __le32) 3657 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); 3658 } else { 3659 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 3660 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3661 rec.v1.issued = cpu_to_le32(cap->issued); 3662 rec.v1.size = cpu_to_le64(inode->i_size); 3663 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime); 3664 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime); 3665 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3666 rec.v1.pathbase = 0; 3667 } 3668 3669 if (list_empty(&ci->i_cap_snaps)) { 3670 snap_follows = ci->i_head_snapc ? 
ci->i_head_snapc->seq : 0; 3671 } else { 3672 struct ceph_cap_snap *capsnap = 3673 list_first_entry(&ci->i_cap_snaps, 3674 struct ceph_cap_snap, ci_item); 3675 snap_follows = capsnap->follows; 3676 } 3677 spin_unlock(&ci->i_ceph_lock); 3678 3679 if (recon_state->msg_version >= 2) { 3680 int num_fcntl_locks, num_flock_locks; 3681 struct ceph_filelock *flocks = NULL; 3682 size_t struct_len, total_len = sizeof(u64); 3683 u8 struct_v = 0; 3684 3685 encode_again: 3686 if (rec.v2.flock_len) { 3687 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 3688 } else { 3689 num_fcntl_locks = 0; 3690 num_flock_locks = 0; 3691 } 3692 if (num_fcntl_locks + num_flock_locks > 0) { 3693 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks, 3694 sizeof(struct ceph_filelock), 3695 GFP_NOFS); 3696 if (!flocks) { 3697 err = -ENOMEM; 3698 goto out_err; 3699 } 3700 err = ceph_encode_locks_to_buffer(inode, flocks, 3701 num_fcntl_locks, 3702 num_flock_locks); 3703 if (err) { 3704 kfree(flocks); 3705 flocks = NULL; 3706 if (err == -ENOSPC) 3707 goto encode_again; 3708 goto out_err; 3709 } 3710 } else { 3711 kfree(flocks); 3712 flocks = NULL; 3713 } 3714 3715 if (recon_state->msg_version >= 3) { 3716 /* version, compat_version and struct_len */ 3717 total_len += 2 * sizeof(u8) + sizeof(u32); 3718 struct_v = 2; 3719 } 3720 /* 3721 * number of encoded locks is stable, so copy to pagelist 3722 */ 3723 struct_len = 2 * sizeof(u32) + 3724 (num_fcntl_locks + num_flock_locks) * 3725 sizeof(struct ceph_filelock); 3726 rec.v2.flock_len = cpu_to_le32(struct_len); 3727 3728 struct_len += sizeof(u32) + sizeof(rec.v2); 3729 3730 if (struct_v >= 2) 3731 struct_len += sizeof(u64); /* snap_follows */ 3732 3733 total_len += struct_len; 3734 3735 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 3736 err = send_reconnect_partial(recon_state); 3737 if (err) 3738 goto out_freeflocks; 3739 pagelist = recon_state->pagelist; 3740 } 3741 3742 err = ceph_pagelist_reserve(pagelist, total_len); 3743 if (err) 3744 goto out_freeflocks; 3745 3746 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3747 if (recon_state->msg_version >= 3) { 3748 ceph_pagelist_encode_8(pagelist, struct_v); 3749 ceph_pagelist_encode_8(pagelist, 1); 3750 ceph_pagelist_encode_32(pagelist, struct_len); 3751 } 3752 ceph_pagelist_encode_string(pagelist, NULL, 0); 3753 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 3754 ceph_locks_to_pagelist(flocks, pagelist, 3755 num_fcntl_locks, num_flock_locks); 3756 if (struct_v >= 2) 3757 ceph_pagelist_encode_64(pagelist, snap_follows); 3758 out_freeflocks: 3759 kfree(flocks); 3760 } else { 3761 u64 pathbase = 0; 3762 int pathlen = 0; 3763 char *path = NULL; 3764 struct dentry *dentry; 3765 3766 dentry = d_find_alias(inode); 3767 if (dentry) { 3768 path = ceph_mdsc_build_path(dentry, 3769 &pathlen, &pathbase, 0); 3770 dput(dentry); 3771 if (IS_ERR(path)) { 3772 err = PTR_ERR(path); 3773 goto out_err; 3774 } 3775 rec.v1.pathbase = cpu_to_le64(pathbase); 3776 } 3777 3778 err = ceph_pagelist_reserve(pagelist, 3779 sizeof(u64) + sizeof(u32) + 3780 pathlen + sizeof(rec.v1)); 3781 if (err) { 3782 goto out_freepath; 3783 } 3784 3785 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3786 ceph_pagelist_encode_string(pagelist, path, pathlen); 3787 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 3788 out_freepath: 3789 ceph_mdsc_free_path(path, pathlen); 3790 } 3791 3792 out_err: 3793 if (err >= 0) 3794 recon_state->nr_caps++; 3795 return err; 3796 } 3797 3798 static int encode_snap_realms(struct 
ceph_mds_client *mdsc, 3799 struct ceph_reconnect_state *recon_state) 3800 { 3801 struct rb_node *p; 3802 struct ceph_pagelist *pagelist = recon_state->pagelist; 3803 int err = 0; 3804 3805 if (recon_state->msg_version >= 4) { 3806 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); 3807 if (err < 0) 3808 goto fail; 3809 } 3810 3811 /* 3812 * snaprealms. we provide mds with the ino, seq (version), and 3813 * parent for all of our realms. If the mds has any newer info, 3814 * it will tell us. 3815 */ 3816 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 3817 struct ceph_snap_realm *realm = 3818 rb_entry(p, struct ceph_snap_realm, node); 3819 struct ceph_mds_snaprealm_reconnect sr_rec; 3820 3821 if (recon_state->msg_version >= 4) { 3822 size_t need = sizeof(u8) * 2 + sizeof(u32) + 3823 sizeof(sr_rec); 3824 3825 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 3826 err = send_reconnect_partial(recon_state); 3827 if (err) 3828 goto fail; 3829 pagelist = recon_state->pagelist; 3830 } 3831 3832 err = ceph_pagelist_reserve(pagelist, need); 3833 if (err) 3834 goto fail; 3835 3836 ceph_pagelist_encode_8(pagelist, 1); 3837 ceph_pagelist_encode_8(pagelist, 1); 3838 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 3839 } 3840 3841 dout(" adding snap realm %llx seq %lld parent %llx\n", 3842 realm->ino, realm->seq, realm->parent_ino); 3843 sr_rec.ino = cpu_to_le64(realm->ino); 3844 sr_rec.seq = cpu_to_le64(realm->seq); 3845 sr_rec.parent = cpu_to_le64(realm->parent_ino); 3846 3847 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 3848 if (err) 3849 goto fail; 3850 3851 recon_state->nr_realms++; 3852 } 3853 fail: 3854 return err; 3855 } 3856 3857 3858 /* 3859 * If an MDS fails and recovers, clients need to reconnect in order to 3860 * reestablish shared state. This includes all caps issued through 3861 * this session _and_ the snap_realm hierarchy. Because it's not 3862 * clear which snap realms the mds cares about, we send everything we 3863 * know about.. that ensures we'll then get any new info the 3864 * recovering MDS might have. 3865 * 3866 * This is a relatively heavyweight operation, but it's rare. 3867 */ 3868 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 3869 struct ceph_mds_session *session) 3870 { 3871 struct ceph_msg *reply; 3872 int mds = session->s_mds; 3873 int err = -ENOMEM; 3874 struct ceph_reconnect_state recon_state = { 3875 .session = session, 3876 }; 3877 LIST_HEAD(dispose); 3878 3879 pr_info("mds%d reconnect start\n", mds); 3880 3881 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 3882 if (!recon_state.pagelist) 3883 goto fail_nopagelist; 3884 3885 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3886 if (!reply) 3887 goto fail_nomsg; 3888 3889 xa_destroy(&session->s_delegated_inos); 3890 3891 mutex_lock(&session->s_mutex); 3892 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 3893 session->s_seq = 0; 3894 3895 dout("session %p state %s\n", session, 3896 ceph_session_state_name(session->s_state)); 3897 3898 spin_lock(&session->s_gen_ttl_lock); 3899 session->s_cap_gen++; 3900 spin_unlock(&session->s_gen_ttl_lock); 3901 3902 spin_lock(&session->s_cap_lock); 3903 /* don't know if session is readonly */ 3904 session->s_readonly = 0; 3905 /* 3906 * notify __ceph_remove_cap() that we are composing cap reconnect. 3907 * If a cap get released before being added to the cap reconnect, 3908 * __ceph_remove_cap() should skip queuing cap release. 
3909 */
3910 session->s_cap_reconnect = 1;
3911 /* drop old cap expires; we're about to reestablish that state */
3912 detach_cap_releases(session, &dispose);
3913 spin_unlock(&session->s_cap_lock);
3914 dispose_cap_releases(mdsc, &dispose);
3915
3916 /* trim unused caps to reduce MDS's cache rejoin time */
3917 if (mdsc->fsc->sb->s_root)
3918 shrink_dcache_parent(mdsc->fsc->sb->s_root);
3919
3920 ceph_con_close(&session->s_con);
3921 ceph_con_open(&session->s_con,
3922 CEPH_ENTITY_TYPE_MDS, mds,
3923 ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
3924
3925 /* replay unsafe requests */
3926 replay_unsafe_requests(mdsc, session);
3927
3928 ceph_early_kick_flushing_caps(mdsc, session);
3929
3930 down_read(&mdsc->snap_rwsem);
3931
3932 /* placeholder for nr_caps */
3933 err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
3934 if (err)
3935 goto fail;
3936
3937 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
3938 recon_state.msg_version = 3;
3939 recon_state.allow_multi = true;
3940 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
3941 recon_state.msg_version = 3;
3942 } else {
3943 recon_state.msg_version = 2;
3944 }
3945 /* traverse this session's caps */
3946 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
3947
3948 spin_lock(&session->s_cap_lock);
3949 session->s_cap_reconnect = 0;
3950 spin_unlock(&session->s_cap_lock);
3951
3952 if (err < 0)
3953 goto fail;
3954
3955 /* check if all realms can be encoded into current message */
3956 if (mdsc->num_snap_realms) {
3957 size_t total_len =
3958 recon_state.pagelist->length +
3959 mdsc->num_snap_realms *
3960 sizeof(struct ceph_mds_snaprealm_reconnect);
3961 if (recon_state.msg_version >= 4) {
3962 /* number of realms */
3963 total_len += sizeof(u32);
3964 /* version, compat_version and struct_len */
3965 total_len += mdsc->num_snap_realms *
3966 (2 * sizeof(u8) + sizeof(u32));
3967 }
3968 if (total_len > RECONNECT_MAX_SIZE) {
3969 if (!recon_state.allow_multi) {
3970 err = -ENOSPC;
3971 goto fail;
3972 }
3973 if (recon_state.nr_caps) {
3974 err = send_reconnect_partial(&recon_state);
3975 if (err)
3976 goto fail;
3977 }
3978 recon_state.msg_version = 5;
3979 }
3980 }
3981
3982 err = encode_snap_realms(mdsc, &recon_state);
3983 if (err < 0)
3984 goto fail;
3985
3986 if (recon_state.msg_version >= 5) {
3987 err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
3988 if (err < 0)
3989 goto fail;
3990 }
3991
3992 if (recon_state.nr_caps || recon_state.nr_realms) {
3993 struct page *page =
3994 list_first_entry(&recon_state.pagelist->head,
3995 struct page, lru);
3996 __le32 *addr = kmap_atomic(page);
3997 if (recon_state.nr_caps) {
3998 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
3999 *addr = cpu_to_le32(recon_state.nr_caps);
4000 } else if (recon_state.msg_version >= 4) {
4001 *(addr + 1) = cpu_to_le32(recon_state.nr_realms);
4002 }
4003 kunmap_atomic(addr);
4004 }
4005
4006 reply->hdr.version = cpu_to_le16(recon_state.msg_version);
4007 if (recon_state.msg_version >= 4)
4008 reply->hdr.compat_version = cpu_to_le16(4);
4009
4010 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
4011 ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
4012
4013 ceph_con_send(&session->s_con, reply);
4014
4015 mutex_unlock(&session->s_mutex);
4016
4017 mutex_lock(&mdsc->mutex);
4018 __wake_requests(mdsc, &session->s_waiting);
4019 mutex_unlock(&mdsc->mutex);
4020
4021 up_read(&mdsc->snap_rwsem);
4022 ceph_pagelist_release(recon_state.pagelist);
4023 return;
4024
4025
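/*
 * The error labels below unwind the setup above in reverse order:
 * drop the unsent reply, release the snap_rwsem read lock and the
 * session mutex, then release the pagelist.
 */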
fail: 4026 ceph_msg_put(reply); 4027 up_read(&mdsc->snap_rwsem); 4028 mutex_unlock(&session->s_mutex); 4029 fail_nomsg: 4030 ceph_pagelist_release(recon_state.pagelist); 4031 fail_nopagelist: 4032 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 4033 return; 4034 } 4035 4036 4037 /* 4038 * compare old and new mdsmaps, kicking requests 4039 * and closing out old connections as necessary 4040 * 4041 * called under mdsc->mutex. 4042 */ 4043 static void check_new_map(struct ceph_mds_client *mdsc, 4044 struct ceph_mdsmap *newmap, 4045 struct ceph_mdsmap *oldmap) 4046 { 4047 int i; 4048 int oldstate, newstate; 4049 struct ceph_mds_session *s; 4050 4051 dout("check_new_map new %u old %u\n", 4052 newmap->m_epoch, oldmap->m_epoch); 4053 4054 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4055 if (!mdsc->sessions[i]) 4056 continue; 4057 s = mdsc->sessions[i]; 4058 oldstate = ceph_mdsmap_get_state(oldmap, i); 4059 newstate = ceph_mdsmap_get_state(newmap, i); 4060 4061 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 4062 i, ceph_mds_state_name(oldstate), 4063 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 4064 ceph_mds_state_name(newstate), 4065 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 4066 ceph_session_state_name(s->s_state)); 4067 4068 if (i >= newmap->possible_max_rank) { 4069 /* force close session for stopped mds */ 4070 ceph_get_mds_session(s); 4071 __unregister_session(mdsc, s); 4072 __wake_requests(mdsc, &s->s_waiting); 4073 mutex_unlock(&mdsc->mutex); 4074 4075 mutex_lock(&s->s_mutex); 4076 cleanup_session_requests(mdsc, s); 4077 remove_session_caps(s); 4078 mutex_unlock(&s->s_mutex); 4079 4080 ceph_put_mds_session(s); 4081 4082 mutex_lock(&mdsc->mutex); 4083 kick_requests(mdsc, i); 4084 continue; 4085 } 4086 4087 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 4088 ceph_mdsmap_get_addr(newmap, i), 4089 sizeof(struct ceph_entity_addr))) { 4090 /* just close it */ 4091 mutex_unlock(&mdsc->mutex); 4092 mutex_lock(&s->s_mutex); 4093 mutex_lock(&mdsc->mutex); 4094 ceph_con_close(&s->s_con); 4095 mutex_unlock(&s->s_mutex); 4096 s->s_state = CEPH_MDS_SESSION_RESTARTING; 4097 } else if (oldstate == newstate) { 4098 continue; /* nothing new with this mds */ 4099 } 4100 4101 /* 4102 * send reconnect? 4103 */ 4104 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 4105 newstate >= CEPH_MDS_STATE_RECONNECT) { 4106 mutex_unlock(&mdsc->mutex); 4107 send_mds_reconnect(mdsc, s); 4108 mutex_lock(&mdsc->mutex); 4109 } 4110 4111 /* 4112 * kick request on any mds that has gone active. 
4113 */ 4114 if (oldstate < CEPH_MDS_STATE_ACTIVE && 4115 newstate >= CEPH_MDS_STATE_ACTIVE) { 4116 if (oldstate != CEPH_MDS_STATE_CREATING && 4117 oldstate != CEPH_MDS_STATE_STARTING) 4118 pr_info("mds%d recovery completed\n", s->s_mds); 4119 kick_requests(mdsc, i); 4120 mutex_unlock(&mdsc->mutex); 4121 mutex_lock(&s->s_mutex); 4122 mutex_lock(&mdsc->mutex); 4123 ceph_kick_flushing_caps(mdsc, s); 4124 mutex_unlock(&s->s_mutex); 4125 wake_up_session_caps(s, RECONNECT); 4126 } 4127 } 4128 4129 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4130 s = mdsc->sessions[i]; 4131 if (!s) 4132 continue; 4133 if (!ceph_mdsmap_is_laggy(newmap, i)) 4134 continue; 4135 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4136 s->s_state == CEPH_MDS_SESSION_HUNG || 4137 s->s_state == CEPH_MDS_SESSION_CLOSING) { 4138 dout(" connecting to export targets of laggy mds%d\n", 4139 i); 4140 __open_export_target_sessions(mdsc, s); 4141 } 4142 } 4143 } 4144 4145 4146 4147 /* 4148 * leases 4149 */ 4150 4151 /* 4152 * caller must hold session s_mutex, dentry->d_lock 4153 */ 4154 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 4155 { 4156 struct ceph_dentry_info *di = ceph_dentry(dentry); 4157 4158 ceph_put_mds_session(di->lease_session); 4159 di->lease_session = NULL; 4160 } 4161 4162 static void handle_lease(struct ceph_mds_client *mdsc, 4163 struct ceph_mds_session *session, 4164 struct ceph_msg *msg) 4165 { 4166 struct super_block *sb = mdsc->fsc->sb; 4167 struct inode *inode; 4168 struct dentry *parent, *dentry; 4169 struct ceph_dentry_info *di; 4170 int mds = session->s_mds; 4171 struct ceph_mds_lease *h = msg->front.iov_base; 4172 u32 seq; 4173 struct ceph_vino vino; 4174 struct qstr dname; 4175 int release = 0; 4176 4177 dout("handle_lease from mds%d\n", mds); 4178 4179 /* decode */ 4180 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 4181 goto bad; 4182 vino.ino = le64_to_cpu(h->ino); 4183 vino.snap = CEPH_NOSNAP; 4184 seq = le32_to_cpu(h->seq); 4185 dname.len = get_unaligned_le32(h + 1); 4186 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 4187 goto bad; 4188 dname.name = (void *)(h + 1) + sizeof(u32); 4189 4190 /* lookup inode */ 4191 inode = ceph_find_inode(sb, vino); 4192 dout("handle_lease %s, ino %llx %p %.*s\n", 4193 ceph_lease_op_name(h->action), vino.ino, inode, 4194 dname.len, dname.name); 4195 4196 mutex_lock(&session->s_mutex); 4197 session->s_seq++; 4198 4199 if (!inode) { 4200 dout("handle_lease no inode %llx\n", vino.ino); 4201 goto release; 4202 } 4203 4204 /* dentry */ 4205 parent = d_find_alias(inode); 4206 if (!parent) { 4207 dout("no parent dentry on inode %p\n", inode); 4208 WARN_ON(1); 4209 goto release; /* hrm... 
*/ 4210 } 4211 dname.hash = full_name_hash(parent, dname.name, dname.len); 4212 dentry = d_lookup(parent, &dname); 4213 dput(parent); 4214 if (!dentry) 4215 goto release; 4216 4217 spin_lock(&dentry->d_lock); 4218 di = ceph_dentry(dentry); 4219 switch (h->action) { 4220 case CEPH_MDS_LEASE_REVOKE: 4221 if (di->lease_session == session) { 4222 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 4223 h->seq = cpu_to_le32(di->lease_seq); 4224 __ceph_mdsc_drop_dentry_lease(dentry); 4225 } 4226 release = 1; 4227 break; 4228 4229 case CEPH_MDS_LEASE_RENEW: 4230 if (di->lease_session == session && 4231 di->lease_gen == session->s_cap_gen && 4232 di->lease_renew_from && 4233 di->lease_renew_after == 0) { 4234 unsigned long duration = 4235 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 4236 4237 di->lease_seq = seq; 4238 di->time = di->lease_renew_from + duration; 4239 di->lease_renew_after = di->lease_renew_from + 4240 (duration >> 1); 4241 di->lease_renew_from = 0; 4242 } 4243 break; 4244 } 4245 spin_unlock(&dentry->d_lock); 4246 dput(dentry); 4247 4248 if (!release) 4249 goto out; 4250 4251 release: 4252 /* let's just reuse the same message */ 4253 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 4254 ceph_msg_get(msg); 4255 ceph_con_send(&session->s_con, msg); 4256 4257 out: 4258 mutex_unlock(&session->s_mutex); 4259 /* avoid calling iput_final() in mds dispatch threads */ 4260 ceph_async_iput(inode); 4261 return; 4262 4263 bad: 4264 pr_err("corrupt lease message\n"); 4265 ceph_msg_dump(msg); 4266 } 4267 4268 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 4269 struct dentry *dentry, char action, 4270 u32 seq) 4271 { 4272 struct ceph_msg *msg; 4273 struct ceph_mds_lease *lease; 4274 struct inode *dir; 4275 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; 4276 4277 dout("lease_send_msg identry %p %s to mds%d\n", 4278 dentry, ceph_lease_op_name(action), session->s_mds); 4279 4280 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 4281 if (!msg) 4282 return; 4283 lease = msg->front.iov_base; 4284 lease->action = action; 4285 lease->seq = cpu_to_le32(seq); 4286 4287 spin_lock(&dentry->d_lock); 4288 dir = d_inode(dentry->d_parent); 4289 lease->ino = cpu_to_le64(ceph_ino(dir)); 4290 lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); 4291 4292 put_unaligned_le32(dentry->d_name.len, lease + 1); 4293 memcpy((void *)(lease + 1) + 4, 4294 dentry->d_name.name, dentry->d_name.len); 4295 spin_unlock(&dentry->d_lock); 4296 /* 4297 * if this is a preemptive lease RELEASE, no need to 4298 * flush request stream, since the actual request will 4299 * soon follow. 
4300 */ 4301 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE); 4302 4303 ceph_con_send(&session->s_con, msg); 4304 } 4305 4306 /* 4307 * lock unlock sessions, to wait ongoing session activities 4308 */ 4309 static void lock_unlock_sessions(struct ceph_mds_client *mdsc) 4310 { 4311 int i; 4312 4313 mutex_lock(&mdsc->mutex); 4314 for (i = 0; i < mdsc->max_sessions; i++) { 4315 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4316 if (!s) 4317 continue; 4318 mutex_unlock(&mdsc->mutex); 4319 mutex_lock(&s->s_mutex); 4320 mutex_unlock(&s->s_mutex); 4321 ceph_put_mds_session(s); 4322 mutex_lock(&mdsc->mutex); 4323 } 4324 mutex_unlock(&mdsc->mutex); 4325 } 4326 4327 static void maybe_recover_session(struct ceph_mds_client *mdsc) 4328 { 4329 struct ceph_fs_client *fsc = mdsc->fsc; 4330 4331 if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) 4332 return; 4333 4334 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED) 4335 return; 4336 4337 if (!READ_ONCE(fsc->blacklisted)) 4338 return; 4339 4340 if (fsc->last_auto_reconnect && 4341 time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30)) 4342 return; 4343 4344 pr_info("auto reconnect after blacklisted\n"); 4345 fsc->last_auto_reconnect = jiffies; 4346 ceph_force_reconnect(fsc->sb); 4347 } 4348 4349 bool check_session_state(struct ceph_mds_session *s) 4350 { 4351 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 4352 dout("resending session close request for mds%d\n", 4353 s->s_mds); 4354 request_close_session(s); 4355 return false; 4356 } 4357 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 4358 if (s->s_state == CEPH_MDS_SESSION_OPEN) { 4359 s->s_state = CEPH_MDS_SESSION_HUNG; 4360 pr_info("mds%d hung\n", s->s_mds); 4361 } 4362 } 4363 if (s->s_state == CEPH_MDS_SESSION_NEW || 4364 s->s_state == CEPH_MDS_SESSION_RESTARTING || 4365 s->s_state == CEPH_MDS_SESSION_CLOSED || 4366 s->s_state == CEPH_MDS_SESSION_REJECTED) 4367 /* this mds is failed or recovering, just wait */ 4368 return false; 4369 4370 return true; 4371 } 4372 4373 /* 4374 * delayed work -- periodically trim expired leases, renew caps with mds 4375 */ 4376 static void schedule_delayed(struct ceph_mds_client *mdsc) 4377 { 4378 int delay = 5; 4379 unsigned hz = round_jiffies_relative(HZ * delay); 4380 schedule_delayed_work(&mdsc->delayed_work, hz); 4381 } 4382 4383 static void delayed_work(struct work_struct *work) 4384 { 4385 int i; 4386 struct ceph_mds_client *mdsc = 4387 container_of(work, struct ceph_mds_client, delayed_work.work); 4388 int renew_interval; 4389 int renew_caps; 4390 4391 dout("mdsc delayed_work\n"); 4392 4393 if (mdsc->stopping) 4394 return; 4395 4396 mutex_lock(&mdsc->mutex); 4397 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 4398 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 4399 mdsc->last_renew_caps); 4400 if (renew_caps) 4401 mdsc->last_renew_caps = jiffies; 4402 4403 for (i = 0; i < mdsc->max_sessions; i++) { 4404 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4405 if (!s) 4406 continue; 4407 4408 if (!check_session_state(s)) { 4409 ceph_put_mds_session(s); 4410 continue; 4411 } 4412 mutex_unlock(&mdsc->mutex); 4413 4414 mutex_lock(&s->s_mutex); 4415 if (renew_caps) 4416 send_renew_caps(mdsc, s); 4417 else 4418 ceph_con_keepalive(&s->s_con); 4419 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4420 s->s_state == CEPH_MDS_SESSION_HUNG) 4421 ceph_send_cap_releases(mdsc, s); 4422 mutex_unlock(&s->s_mutex); 4423 ceph_put_mds_session(s); 4424 4425 mutex_lock(&mdsc->mutex); 4426 } 4427 mutex_unlock(&mdsc->mutex); 
4428 4429 ceph_check_delayed_caps(mdsc); 4430 4431 ceph_queue_cap_reclaim_work(mdsc); 4432 4433 ceph_trim_snapid_map(mdsc); 4434 4435 maybe_recover_session(mdsc); 4436 4437 schedule_delayed(mdsc); 4438 } 4439 4440 int ceph_mdsc_init(struct ceph_fs_client *fsc) 4441 4442 { 4443 struct ceph_mds_client *mdsc; 4444 int err; 4445 4446 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 4447 if (!mdsc) 4448 return -ENOMEM; 4449 mdsc->fsc = fsc; 4450 mutex_init(&mdsc->mutex); 4451 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 4452 if (!mdsc->mdsmap) { 4453 err = -ENOMEM; 4454 goto err_mdsc; 4455 } 4456 4457 init_completion(&mdsc->safe_umount_waiters); 4458 init_waitqueue_head(&mdsc->session_close_wq); 4459 INIT_LIST_HEAD(&mdsc->waiting_for_map); 4460 mdsc->sessions = NULL; 4461 atomic_set(&mdsc->num_sessions, 0); 4462 mdsc->max_sessions = 0; 4463 mdsc->stopping = 0; 4464 atomic64_set(&mdsc->quotarealms_count, 0); 4465 mdsc->quotarealms_inodes = RB_ROOT; 4466 mutex_init(&mdsc->quotarealms_inodes_mutex); 4467 mdsc->last_snap_seq = 0; 4468 init_rwsem(&mdsc->snap_rwsem); 4469 mdsc->snap_realms = RB_ROOT; 4470 INIT_LIST_HEAD(&mdsc->snap_empty); 4471 mdsc->num_snap_realms = 0; 4472 spin_lock_init(&mdsc->snap_empty_lock); 4473 mdsc->last_tid = 0; 4474 mdsc->oldest_tid = 0; 4475 mdsc->request_tree = RB_ROOT; 4476 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 4477 mdsc->last_renew_caps = jiffies; 4478 INIT_LIST_HEAD(&mdsc->cap_delay_list); 4479 INIT_LIST_HEAD(&mdsc->cap_wait_list); 4480 spin_lock_init(&mdsc->cap_delay_lock); 4481 INIT_LIST_HEAD(&mdsc->snap_flush_list); 4482 spin_lock_init(&mdsc->snap_flush_lock); 4483 mdsc->last_cap_flush_tid = 1; 4484 INIT_LIST_HEAD(&mdsc->cap_flush_list); 4485 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 4486 mdsc->num_cap_flushing = 0; 4487 spin_lock_init(&mdsc->cap_dirty_lock); 4488 init_waitqueue_head(&mdsc->cap_flushing_wq); 4489 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); 4490 atomic_set(&mdsc->cap_reclaim_pending, 0); 4491 err = ceph_metric_init(&mdsc->metric); 4492 if (err) 4493 goto err_mdsmap; 4494 4495 spin_lock_init(&mdsc->dentry_list_lock); 4496 INIT_LIST_HEAD(&mdsc->dentry_leases); 4497 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 4498 4499 ceph_caps_init(mdsc); 4500 ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 4501 4502 spin_lock_init(&mdsc->snapid_map_lock); 4503 mdsc->snapid_map_tree = RB_ROOT; 4504 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 4505 4506 init_rwsem(&mdsc->pool_perm_rwsem); 4507 mdsc->pool_perm_tree = RB_ROOT; 4508 4509 strscpy(mdsc->nodename, utsname()->nodename, 4510 sizeof(mdsc->nodename)); 4511 4512 fsc->mdsc = mdsc; 4513 return 0; 4514 4515 err_mdsmap: 4516 kfree(mdsc->mdsmap); 4517 err_mdsc: 4518 kfree(mdsc); 4519 return err; 4520 } 4521 4522 /* 4523 * Wait for safe replies on open mds requests. If we time out, drop 4524 * all requests from the tree to avoid dangling dentry refs. 
4525 */ 4526 static void wait_requests(struct ceph_mds_client *mdsc) 4527 { 4528 struct ceph_options *opts = mdsc->fsc->client->options; 4529 struct ceph_mds_request *req; 4530 4531 mutex_lock(&mdsc->mutex); 4532 if (__get_oldest_req(mdsc)) { 4533 mutex_unlock(&mdsc->mutex); 4534 4535 dout("wait_requests waiting for requests\n"); 4536 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 4537 ceph_timeout_jiffies(opts->mount_timeout)); 4538 4539 /* tear down remaining requests */ 4540 mutex_lock(&mdsc->mutex); 4541 while ((req = __get_oldest_req(mdsc))) { 4542 dout("wait_requests timed out on tid %llu\n", 4543 req->r_tid); 4544 list_del_init(&req->r_wait); 4545 __unregister_request(mdsc, req); 4546 } 4547 } 4548 mutex_unlock(&mdsc->mutex); 4549 dout("wait_requests done\n"); 4550 } 4551 4552 /* 4553 * called before mount is ro, and before dentries are torn down. 4554 * (hmm, does this still race with new lookups?) 4555 */ 4556 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 4557 { 4558 dout("pre_umount\n"); 4559 mdsc->stopping = 1; 4560 4561 lock_unlock_sessions(mdsc); 4562 ceph_flush_dirty_caps(mdsc); 4563 wait_requests(mdsc); 4564 4565 /* 4566 * wait for reply handlers to drop their request refs and 4567 * their inode/dcache refs 4568 */ 4569 ceph_msgr_flush(); 4570 4571 ceph_cleanup_quotarealms_inodes(mdsc); 4572 } 4573 4574 /* 4575 * wait for all write mds requests to flush. 4576 */ 4577 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) 4578 { 4579 struct ceph_mds_request *req = NULL, *nextreq; 4580 struct rb_node *n; 4581 4582 mutex_lock(&mdsc->mutex); 4583 dout("wait_unsafe_requests want %lld\n", want_tid); 4584 restart: 4585 req = __get_oldest_req(mdsc); 4586 while (req && req->r_tid <= want_tid) { 4587 /* find next request */ 4588 n = rb_next(&req->r_node); 4589 if (n) 4590 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 4591 else 4592 nextreq = NULL; 4593 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 4594 (req->r_op & CEPH_MDS_OP_WRITE)) { 4595 /* write op */ 4596 ceph_mdsc_get_request(req); 4597 if (nextreq) 4598 ceph_mdsc_get_request(nextreq); 4599 mutex_unlock(&mdsc->mutex); 4600 dout("wait_unsafe_requests wait on %llu (want %llu)\n", 4601 req->r_tid, want_tid); 4602 wait_for_completion(&req->r_safe_completion); 4603 mutex_lock(&mdsc->mutex); 4604 ceph_mdsc_put_request(req); 4605 if (!nextreq) 4606 break; /* next dne before, so we're done! 
*/ 4607 if (RB_EMPTY_NODE(&nextreq->r_node)) { 4608 /* next request was removed from tree */ 4609 ceph_mdsc_put_request(nextreq); 4610 goto restart; 4611 } 4612 ceph_mdsc_put_request(nextreq); /* won't go away */ 4613 } 4614 req = nextreq; 4615 } 4616 mutex_unlock(&mdsc->mutex); 4617 dout("wait_unsafe_requests done\n"); 4618 } 4619 4620 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 4621 { 4622 u64 want_tid, want_flush; 4623 4624 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4625 return; 4626 4627 dout("sync\n"); 4628 mutex_lock(&mdsc->mutex); 4629 want_tid = mdsc->last_tid; 4630 mutex_unlock(&mdsc->mutex); 4631 4632 ceph_flush_dirty_caps(mdsc); 4633 spin_lock(&mdsc->cap_dirty_lock); 4634 want_flush = mdsc->last_cap_flush_tid; 4635 if (!list_empty(&mdsc->cap_flush_list)) { 4636 struct ceph_cap_flush *cf = 4637 list_last_entry(&mdsc->cap_flush_list, 4638 struct ceph_cap_flush, g_list); 4639 cf->wake = true; 4640 } 4641 spin_unlock(&mdsc->cap_dirty_lock); 4642 4643 dout("sync want tid %lld flush_seq %lld\n", 4644 want_tid, want_flush); 4645 4646 wait_unsafe_requests(mdsc, want_tid); 4647 wait_caps_flush(mdsc, want_flush); 4648 } 4649 4650 /* 4651 * true if all sessions are closed, or we force unmount 4652 */ 4653 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 4654 { 4655 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4656 return true; 4657 return atomic_read(&mdsc->num_sessions) <= skipped; 4658 } 4659 4660 /* 4661 * called after sb is ro. 4662 */ 4663 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 4664 { 4665 struct ceph_options *opts = mdsc->fsc->client->options; 4666 struct ceph_mds_session *session; 4667 int i; 4668 int skipped = 0; 4669 4670 dout("close_sessions\n"); 4671 4672 /* close sessions */ 4673 mutex_lock(&mdsc->mutex); 4674 for (i = 0; i < mdsc->max_sessions; i++) { 4675 session = __ceph_lookup_mds_session(mdsc, i); 4676 if (!session) 4677 continue; 4678 mutex_unlock(&mdsc->mutex); 4679 mutex_lock(&session->s_mutex); 4680 if (__close_session(mdsc, session) <= 0) 4681 skipped++; 4682 mutex_unlock(&session->s_mutex); 4683 ceph_put_mds_session(session); 4684 mutex_lock(&mdsc->mutex); 4685 } 4686 mutex_unlock(&mdsc->mutex); 4687 4688 dout("waiting for sessions to close\n"); 4689 wait_event_timeout(mdsc->session_close_wq, 4690 done_closing_sessions(mdsc, skipped), 4691 ceph_timeout_jiffies(opts->mount_timeout)); 4692 4693 /* tear down remaining sessions */ 4694 mutex_lock(&mdsc->mutex); 4695 for (i = 0; i < mdsc->max_sessions; i++) { 4696 if (mdsc->sessions[i]) { 4697 session = ceph_get_mds_session(mdsc->sessions[i]); 4698 __unregister_session(mdsc, session); 4699 mutex_unlock(&mdsc->mutex); 4700 mutex_lock(&session->s_mutex); 4701 remove_session_caps(session); 4702 mutex_unlock(&session->s_mutex); 4703 ceph_put_mds_session(session); 4704 mutex_lock(&mdsc->mutex); 4705 } 4706 } 4707 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 4708 mutex_unlock(&mdsc->mutex); 4709 4710 ceph_cleanup_snapid_map(mdsc); 4711 ceph_cleanup_empty_realms(mdsc); 4712 4713 cancel_work_sync(&mdsc->cap_reclaim_work); 4714 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 4715 4716 dout("stopped\n"); 4717 } 4718 4719 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 4720 { 4721 struct ceph_mds_session *session; 4722 int mds; 4723 4724 dout("force umount\n"); 4725 4726 mutex_lock(&mdsc->mutex); 4727 for (mds = 0; mds < mdsc->max_sessions; mds++) { 4728 session = __ceph_lookup_mds_session(mdsc, mds); 4729 if 
(!session) 4730 continue; 4731 4732 if (session->s_state == CEPH_MDS_SESSION_REJECTED) 4733 __unregister_session(mdsc, session); 4734 __wake_requests(mdsc, &session->s_waiting); 4735 mutex_unlock(&mdsc->mutex); 4736 4737 mutex_lock(&session->s_mutex); 4738 __close_session(mdsc, session); 4739 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 4740 cleanup_session_requests(mdsc, session); 4741 remove_session_caps(session); 4742 } 4743 mutex_unlock(&session->s_mutex); 4744 ceph_put_mds_session(session); 4745 4746 mutex_lock(&mdsc->mutex); 4747 kick_requests(mdsc, mds); 4748 } 4749 __wake_requests(mdsc, &mdsc->waiting_for_map); 4750 mutex_unlock(&mdsc->mutex); 4751 } 4752 4753 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 4754 { 4755 dout("stop\n"); 4756 /* 4757 * Make sure the delayed work stopped before releasing 4758 * the resources. 4759 * 4760 * Because the cancel_delayed_work_sync() will only 4761 * guarantee that the work finishes executing. But the 4762 * delayed work will re-arm itself again after that. 4763 */ 4764 flush_delayed_work(&mdsc->delayed_work); 4765 4766 if (mdsc->mdsmap) 4767 ceph_mdsmap_destroy(mdsc->mdsmap); 4768 kfree(mdsc->sessions); 4769 ceph_caps_finalize(mdsc); 4770 ceph_pool_perm_destroy(mdsc); 4771 } 4772 4773 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 4774 { 4775 struct ceph_mds_client *mdsc = fsc->mdsc; 4776 dout("mdsc_destroy %p\n", mdsc); 4777 4778 if (!mdsc) 4779 return; 4780 4781 /* flush out any connection work with references to us */ 4782 ceph_msgr_flush(); 4783 4784 ceph_mdsc_stop(mdsc); 4785 4786 ceph_metric_destroy(&mdsc->metric); 4787 4788 flush_delayed_work(&mdsc->metric.delayed_work); 4789 fsc->mdsc = NULL; 4790 kfree(mdsc); 4791 dout("mdsc_destroy %p done\n", mdsc); 4792 } 4793 4794 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4795 { 4796 struct ceph_fs_client *fsc = mdsc->fsc; 4797 const char *mds_namespace = fsc->mount_options->mds_namespace; 4798 void *p = msg->front.iov_base; 4799 void *end = p + msg->front.iov_len; 4800 u32 epoch; 4801 u32 map_len; 4802 u32 num_fs; 4803 u32 mount_fscid = (u32)-1; 4804 u8 struct_v, struct_cv; 4805 int err = -EINVAL; 4806 4807 ceph_decode_need(&p, end, sizeof(u32), bad); 4808 epoch = ceph_decode_32(&p); 4809 4810 dout("handle_fsmap epoch %u\n", epoch); 4811 4812 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 4813 struct_v = ceph_decode_8(&p); 4814 struct_cv = ceph_decode_8(&p); 4815 map_len = ceph_decode_32(&p); 4816 4817 ceph_decode_need(&p, end, sizeof(u32) * 3, bad); 4818 p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */ 4819 4820 num_fs = ceph_decode_32(&p); 4821 while (num_fs-- > 0) { 4822 void *info_p, *info_end; 4823 u32 info_len; 4824 u8 info_v, info_cv; 4825 u32 fscid, namelen; 4826 4827 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 4828 info_v = ceph_decode_8(&p); 4829 info_cv = ceph_decode_8(&p); 4830 info_len = ceph_decode_32(&p); 4831 ceph_decode_need(&p, end, info_len, bad); 4832 info_p = p; 4833 info_end = p + info_len; 4834 p = info_end; 4835 4836 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); 4837 fscid = ceph_decode_32(&info_p); 4838 namelen = ceph_decode_32(&info_p); 4839 ceph_decode_need(&info_p, info_end, namelen, bad); 4840 4841 if (mds_namespace && 4842 strlen(mds_namespace) == namelen && 4843 !strncmp(mds_namespace, (char *)info_p, namelen)) { 4844 mount_fscid = fscid; 4845 break; 4846 } 4847 } 4848 4849 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); 4850 if (mount_fscid != (u32)-1) { 
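/*
 * Found the file system named by mds_namespace: remember its fscid
 * and (re)subscribe to the corresponding mdsmap.
 */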
4851 fsc->client->monc.fs_cluster_id = mount_fscid; 4852 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 4853 0, true); 4854 ceph_monc_renew_subs(&fsc->client->monc); 4855 } else { 4856 err = -ENOENT; 4857 goto err_out; 4858 } 4859 return; 4860 4861 bad: 4862 pr_err("error decoding fsmap\n"); 4863 err_out: 4864 mutex_lock(&mdsc->mutex); 4865 mdsc->mdsmap_err = err; 4866 __wake_requests(mdsc, &mdsc->waiting_for_map); 4867 mutex_unlock(&mdsc->mutex); 4868 } 4869 4870 /* 4871 * handle mds map update. 4872 */ 4873 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4874 { 4875 u32 epoch; 4876 u32 maplen; 4877 void *p = msg->front.iov_base; 4878 void *end = p + msg->front.iov_len; 4879 struct ceph_mdsmap *newmap, *oldmap; 4880 struct ceph_fsid fsid; 4881 int err = -EINVAL; 4882 4883 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 4884 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 4885 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 4886 return; 4887 epoch = ceph_decode_32(&p); 4888 maplen = ceph_decode_32(&p); 4889 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 4890 4891 /* do we need it? */ 4892 mutex_lock(&mdsc->mutex); 4893 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 4894 dout("handle_map epoch %u <= our %u\n", 4895 epoch, mdsc->mdsmap->m_epoch); 4896 mutex_unlock(&mdsc->mutex); 4897 return; 4898 } 4899 4900 newmap = ceph_mdsmap_decode(&p, end); 4901 if (IS_ERR(newmap)) { 4902 err = PTR_ERR(newmap); 4903 goto bad_unlock; 4904 } 4905 4906 /* swap into place */ 4907 if (mdsc->mdsmap) { 4908 oldmap = mdsc->mdsmap; 4909 mdsc->mdsmap = newmap; 4910 check_new_map(mdsc, newmap, oldmap); 4911 ceph_mdsmap_destroy(oldmap); 4912 } else { 4913 mdsc->mdsmap = newmap; /* first mds map */ 4914 } 4915 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size, 4916 MAX_LFS_FILESIZE); 4917 4918 __wake_requests(mdsc, &mdsc->waiting_for_map); 4919 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP, 4920 mdsc->mdsmap->m_epoch); 4921 4922 mutex_unlock(&mdsc->mutex); 4923 schedule_delayed(mdsc); 4924 return; 4925 4926 bad_unlock: 4927 mutex_unlock(&mdsc->mutex); 4928 bad: 4929 pr_err("error decoding mdsmap %d\n", err); 4930 return; 4931 } 4932 4933 static struct ceph_connection *con_get(struct ceph_connection *con) 4934 { 4935 struct ceph_mds_session *s = con->private; 4936 4937 if (ceph_get_mds_session(s)) 4938 return con; 4939 return NULL; 4940 } 4941 4942 static void con_put(struct ceph_connection *con) 4943 { 4944 struct ceph_mds_session *s = con->private; 4945 4946 ceph_put_mds_session(s); 4947 } 4948 4949 /* 4950 * if the client is unresponsive for long enough, the mds will kill 4951 * the session entirely. 
4952 */ 4953 static void peer_reset(struct ceph_connection *con) 4954 { 4955 struct ceph_mds_session *s = con->private; 4956 struct ceph_mds_client *mdsc = s->s_mdsc; 4957 4958 pr_warn("mds%d closed our session\n", s->s_mds); 4959 send_mds_reconnect(mdsc, s); 4960 } 4961 4962 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 4963 { 4964 struct ceph_mds_session *s = con->private; 4965 struct ceph_mds_client *mdsc = s->s_mdsc; 4966 int type = le16_to_cpu(msg->hdr.type); 4967 4968 mutex_lock(&mdsc->mutex); 4969 if (__verify_registered_session(mdsc, s) < 0) { 4970 mutex_unlock(&mdsc->mutex); 4971 goto out; 4972 } 4973 mutex_unlock(&mdsc->mutex); 4974 4975 switch (type) { 4976 case CEPH_MSG_MDS_MAP: 4977 ceph_mdsc_handle_mdsmap(mdsc, msg); 4978 break; 4979 case CEPH_MSG_FS_MAP_USER: 4980 ceph_mdsc_handle_fsmap(mdsc, msg); 4981 break; 4982 case CEPH_MSG_CLIENT_SESSION: 4983 handle_session(s, msg); 4984 break; 4985 case CEPH_MSG_CLIENT_REPLY: 4986 handle_reply(s, msg); 4987 break; 4988 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 4989 handle_forward(mdsc, s, msg); 4990 break; 4991 case CEPH_MSG_CLIENT_CAPS: 4992 ceph_handle_caps(s, msg); 4993 break; 4994 case CEPH_MSG_CLIENT_SNAP: 4995 ceph_handle_snap(mdsc, s, msg); 4996 break; 4997 case CEPH_MSG_CLIENT_LEASE: 4998 handle_lease(mdsc, s, msg); 4999 break; 5000 case CEPH_MSG_CLIENT_QUOTA: 5001 ceph_handle_quota(mdsc, s, msg); 5002 break; 5003 5004 default: 5005 pr_err("received unknown message type %d %s\n", type, 5006 ceph_msg_type_name(type)); 5007 } 5008 out: 5009 ceph_msg_put(msg); 5010 } 5011 5012 /* 5013 * authentication 5014 */ 5015 5016 /* 5017 * Note: returned pointer is the address of a structure that's 5018 * managed separately. Caller must *not* attempt to free it. 5019 */ 5020 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 5021 int *proto, int force_new) 5022 { 5023 struct ceph_mds_session *s = con->private; 5024 struct ceph_mds_client *mdsc = s->s_mdsc; 5025 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5026 struct ceph_auth_handshake *auth = &s->s_auth; 5027 5028 if (force_new && auth->authorizer) { 5029 ceph_auth_destroy_authorizer(auth->authorizer); 5030 auth->authorizer = NULL; 5031 } 5032 if (!auth->authorizer) { 5033 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 5034 auth); 5035 if (ret) 5036 return ERR_PTR(ret); 5037 } else { 5038 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 5039 auth); 5040 if (ret) 5041 return ERR_PTR(ret); 5042 } 5043 *proto = ac->protocol; 5044 5045 return auth; 5046 } 5047 5048 static int add_authorizer_challenge(struct ceph_connection *con, 5049 void *challenge_buf, int challenge_buf_len) 5050 { 5051 struct ceph_mds_session *s = con->private; 5052 struct ceph_mds_client *mdsc = s->s_mdsc; 5053 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5054 5055 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer, 5056 challenge_buf, challenge_buf_len); 5057 } 5058 5059 static int verify_authorizer_reply(struct ceph_connection *con) 5060 { 5061 struct ceph_mds_session *s = con->private; 5062 struct ceph_mds_client *mdsc = s->s_mdsc; 5063 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5064 5065 return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer); 5066 } 5067 5068 static int invalidate_authorizer(struct ceph_connection *con) 5069 { 5070 struct ceph_mds_session *s = con->private; 5071 struct ceph_mds_client *mdsc = s->s_mdsc; 5072 struct ceph_auth_client *ac = 
mdsc->fsc->client->monc.auth; 5073 5074 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 5075 5076 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 5077 } 5078 5079 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, 5080 struct ceph_msg_header *hdr, int *skip) 5081 { 5082 struct ceph_msg *msg; 5083 int type = (int) le16_to_cpu(hdr->type); 5084 int front_len = (int) le32_to_cpu(hdr->front_len); 5085 5086 if (con->in_msg) 5087 return con->in_msg; 5088 5089 *skip = 0; 5090 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 5091 if (!msg) { 5092 pr_err("unable to allocate msg type %d len %d\n", 5093 type, front_len); 5094 return NULL; 5095 } 5096 5097 return msg; 5098 } 5099 5100 static int mds_sign_message(struct ceph_msg *msg) 5101 { 5102 struct ceph_mds_session *s = msg->con->private; 5103 struct ceph_auth_handshake *auth = &s->s_auth; 5104 5105 return ceph_auth_sign_message(auth, msg); 5106 } 5107 5108 static int mds_check_message_signature(struct ceph_msg *msg) 5109 { 5110 struct ceph_mds_session *s = msg->con->private; 5111 struct ceph_auth_handshake *auth = &s->s_auth; 5112 5113 return ceph_auth_check_message_signature(auth, msg); 5114 } 5115 5116 static const struct ceph_connection_operations mds_con_ops = { 5117 .get = con_get, 5118 .put = con_put, 5119 .dispatch = dispatch, 5120 .get_authorizer = get_authorizer, 5121 .add_authorizer_challenge = add_authorizer_challenge, 5122 .verify_authorizer_reply = verify_authorizer_reply, 5123 .invalidate_authorizer = invalidate_authorizer, 5124 .peer_reset = peer_reset, 5125 .alloc_msg = mds_alloc_msg, 5126 .sign_message = mds_sign_message, 5127 .check_message_signature = mds_check_message_signature, 5128 }; 5129 5130 /* eof */ 5131