// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>
#include <linux/bitmap.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
	struct ceph_mds_session *session;
	int nr_caps, nr_realms;
	struct ceph_pagelist *pagelist;
	unsigned msg_version;
	bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

/*
 * parse per-inode quota info (max_bytes/max_files)
 */
static int parse_reply_info_quota(void **p, void *end,
				  struct ceph_mds_reply_info_in *info)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);
	/* struct_v is expected to be >= 1. we only
	 * understand encoding with struct_compat == 1. */
	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	end = *p + struct_len;
	ceph_decode_64_safe(p, end, info->max_bytes, bad);
	ceph_decode_64_safe(p, end, info->max_files, bad);
	*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = 0;
	u8 struct_v = 0;

	if (features == (u64)-1) {
		u32 struct_len;
		u8 struct_compat;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
	info->in = *p;
	*p += sizeof(struct ceph_mds_reply_inode) +
		sizeof(*info->in->fragtree.splits) *
		le32_to_cpu(info->in->fragtree.nsplits);

	ceph_decode_32_safe(p, end, info->symlink_len, bad);
	ceph_decode_need(p, end, info->symlink_len, bad);
	info->symlink = *p;
	*p += info->symlink_len;

	ceph_decode_copy_safe(p, end, &info->dir_layout,
			      sizeof(info->dir_layout), bad);
	ceph_decode_32_safe(p, end, info->xattr_len, bad);
	ceph_decode_need(p, end, info->xattr_len, bad);
	info->xattr_data = *p;
	*p += info->xattr_len;

	if (features == (u64)-1) {
		/* inline data */
		ceph_decode_64_safe(p, end, info->inline_version, bad);
		ceph_decode_32_safe(p, end, info->inline_len, bad);
		ceph_decode_need(p, end, info->inline_len, bad);
		info->inline_data = *p;
		*p += info->inline_len;
		/* quota */
		err = parse_reply_info_quota(p, end, info);
		if (err < 0)
			goto out_bad;
		/* pool namespace */
		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
		if (info->pool_ns_len > 0) {
			ceph_decode_need(p, end, info->pool_ns_len, bad);
			info->pool_ns_data = *p;
			*p += info->pool_ns_len;
		}

		/* btime */
		ceph_decode_need(p, end, sizeof(info->btime), bad);
		ceph_decode_copy(p, &info->btime, sizeof(info->btime));

		/* change attribute */
		ceph_decode_64_safe(p, end, info->change_attr, bad);

		/* dir pin */
		if (struct_v >= 2) {
			ceph_decode_32_safe(p, end, info->dir_pin, bad);
		} else {
			info->dir_pin = -ENODATA;
		}

		/* snapshot birth time, remains zero for v<=2 */
		if (struct_v >= 3) {
			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
			ceph_decode_copy(p, &info->snap_btime,
					 sizeof(info->snap_btime));
		} else {
			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
		}

		/* snapshot count, remains zero for v<=3 */
		if (struct_v >= 4) {
			ceph_decode_64_safe(p, end, info->rsnaps, bad);
		} else {
			info->rsnaps = 0;
		}

		*p = end;
	} else {
		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
			ceph_decode_64_safe(p, end, info->inline_version, bad);
			ceph_decode_32_safe(p, end, info->inline_len, bad);
			ceph_decode_need(p, end, info->inline_len, bad);
			info->inline_data = *p;
			*p += info->inline_len;
		} else
			info->inline_version = CEPH_INLINE_NONE;

		if (features & CEPH_FEATURE_MDS_QUOTA) {
			err = parse_reply_info_quota(p, end, info);
			if (err < 0)
				goto out_bad;
		} else {
			info->max_bytes = 0;
			info->max_files = 0;
		}

		info->pool_ns_len = 0;
		info->pool_ns_data = NULL;
		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
			if (info->pool_ns_len > 0) {
				ceph_decode_need(p, end, info->pool_ns_len, bad);
				info->pool_ns_data = *p;
				*p += info->pool_ns_len;
			}
		}

		if (features & CEPH_FEATURE_FS_BTIME) {
			ceph_decode_need(p, end, sizeof(info->btime), bad);
			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
			ceph_decode_64_safe(p, end, info->change_attr, bad);
		}

		info->dir_pin = -ENODATA;
		/* info->snap_btime and info->rsnaps remain zero */
	}
	return 0;
bad:
	err = -EIO;
out_bad:
	return err;
}

static int parse_reply_info_dir(void **p, void *end,
				struct ceph_mds_reply_dirfrag **dirfrag,
				u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
	*dirfrag = *p;
	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
	if (unlikely(*p > end))
		goto bad;
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}

static int parse_reply_info_lease(void **p, void *end,
				  struct ceph_mds_reply_lease **lease,
				  u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**lease), bad);
	*lease = *p;
	*p += sizeof(**lease);
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
295 */ 296 static int parse_reply_info_trace(void **p, void *end, 297 struct ceph_mds_reply_info_parsed *info, 298 u64 features) 299 { 300 int err; 301 302 if (info->head->is_dentry) { 303 err = parse_reply_info_in(p, end, &info->diri, features); 304 if (err < 0) 305 goto out_bad; 306 307 err = parse_reply_info_dir(p, end, &info->dirfrag, features); 308 if (err < 0) 309 goto out_bad; 310 311 ceph_decode_32_safe(p, end, info->dname_len, bad); 312 ceph_decode_need(p, end, info->dname_len, bad); 313 info->dname = *p; 314 *p += info->dname_len; 315 316 err = parse_reply_info_lease(p, end, &info->dlease, features); 317 if (err < 0) 318 goto out_bad; 319 } 320 321 if (info->head->is_target) { 322 err = parse_reply_info_in(p, end, &info->targeti, features); 323 if (err < 0) 324 goto out_bad; 325 } 326 327 if (unlikely(*p != end)) 328 goto bad; 329 return 0; 330 331 bad: 332 err = -EIO; 333 out_bad: 334 pr_err("problem parsing mds trace %d\n", err); 335 return err; 336 } 337 338 /* 339 * parse readdir results 340 */ 341 static int parse_reply_info_readdir(void **p, void *end, 342 struct ceph_mds_reply_info_parsed *info, 343 u64 features) 344 { 345 u32 num, i = 0; 346 int err; 347 348 err = parse_reply_info_dir(p, end, &info->dir_dir, features); 349 if (err < 0) 350 goto out_bad; 351 352 ceph_decode_need(p, end, sizeof(num) + 2, bad); 353 num = ceph_decode_32(p); 354 { 355 u16 flags = ceph_decode_16(p); 356 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); 357 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); 358 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); 359 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH); 360 } 361 if (num == 0) 362 goto done; 363 364 BUG_ON(!info->dir_entries); 365 if ((unsigned long)(info->dir_entries + num) > 366 (unsigned long)info->dir_entries + info->dir_buf_size) { 367 pr_err("dir contents are larger than expected\n"); 368 WARN_ON(1); 369 goto bad; 370 } 371 372 info->dir_nr = num; 373 while (num) { 374 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 375 /* dentry */ 376 ceph_decode_32_safe(p, end, rde->name_len, bad); 377 ceph_decode_need(p, end, rde->name_len, bad); 378 rde->name = *p; 379 *p += rde->name_len; 380 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name); 381 382 /* dentry lease */ 383 err = parse_reply_info_lease(p, end, &rde->lease, features); 384 if (err) 385 goto out_bad; 386 /* inode */ 387 err = parse_reply_info_in(p, end, &rde->inode, features); 388 if (err < 0) 389 goto out_bad; 390 /* ceph_readdir_prepopulate() will update it */ 391 rde->offset = 0; 392 i++; 393 num--; 394 } 395 396 done: 397 /* Skip over any unrecognized fields */ 398 *p = end; 399 return 0; 400 401 bad: 402 err = -EIO; 403 out_bad: 404 pr_err("problem parsing dir contents %d\n", err); 405 return err; 406 } 407 408 /* 409 * parse fcntl F_GETLK results 410 */ 411 static int parse_reply_info_filelock(void **p, void *end, 412 struct ceph_mds_reply_info_parsed *info, 413 u64 features) 414 { 415 if (*p + sizeof(*info->filelock_reply) > end) 416 goto bad; 417 418 info->filelock_reply = *p; 419 420 /* Skip over any unrecognized fields */ 421 *p = end; 422 return 0; 423 bad: 424 return -EIO; 425 } 426 427 428 #if BITS_PER_LONG == 64 429 430 #define DELEGATED_INO_AVAILABLE xa_mk_value(1) 431 432 static int ceph_parse_deleg_inos(void **p, void *end, 433 struct ceph_mds_session *s) 434 { 435 u32 sets; 436 437 ceph_decode_32_safe(p, end, sets, bad); 438 dout("got %u sets of delegated inodes\n", sets); 439 while (sets--) { 440 u64 start, 
len, ino; 441 442 ceph_decode_64_safe(p, end, start, bad); 443 ceph_decode_64_safe(p, end, len, bad); 444 445 /* Don't accept a delegation of system inodes */ 446 if (start < CEPH_INO_SYSTEM_BASE) { 447 pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n", 448 start, len); 449 continue; 450 } 451 while (len--) { 452 int err = xa_insert(&s->s_delegated_inos, ino = start++, 453 DELEGATED_INO_AVAILABLE, 454 GFP_KERNEL); 455 if (!err) { 456 dout("added delegated inode 0x%llx\n", 457 start - 1); 458 } else if (err == -EBUSY) { 459 pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n", 460 start - 1); 461 } else { 462 return err; 463 } 464 } 465 } 466 return 0; 467 bad: 468 return -EIO; 469 } 470 471 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 472 { 473 unsigned long ino; 474 void *val; 475 476 xa_for_each(&s->s_delegated_inos, ino, val) { 477 val = xa_erase(&s->s_delegated_inos, ino); 478 if (val == DELEGATED_INO_AVAILABLE) 479 return ino; 480 } 481 return 0; 482 } 483 484 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 485 { 486 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE, 487 GFP_KERNEL); 488 } 489 #else /* BITS_PER_LONG == 64 */ 490 /* 491 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just 492 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top 493 * and bottom words? 494 */ 495 static int ceph_parse_deleg_inos(void **p, void *end, 496 struct ceph_mds_session *s) 497 { 498 u32 sets; 499 500 ceph_decode_32_safe(p, end, sets, bad); 501 if (sets) 502 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad); 503 return 0; 504 bad: 505 return -EIO; 506 } 507 508 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 509 { 510 return 0; 511 } 512 513 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 514 { 515 return 0; 516 } 517 #endif /* BITS_PER_LONG == 64 */ 518 519 /* 520 * parse create results 521 */ 522 static int parse_reply_info_create(void **p, void *end, 523 struct ceph_mds_reply_info_parsed *info, 524 u64 features, struct ceph_mds_session *s) 525 { 526 int ret; 527 528 if (features == (u64)-1 || 529 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) { 530 if (*p == end) { 531 /* Malformed reply? 
*/ 532 info->has_create_ino = false; 533 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) { 534 info->has_create_ino = true; 535 /* struct_v, struct_compat, and len */ 536 ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad); 537 ceph_decode_64_safe(p, end, info->ino, bad); 538 ret = ceph_parse_deleg_inos(p, end, s); 539 if (ret) 540 return ret; 541 } else { 542 /* legacy */ 543 ceph_decode_64_safe(p, end, info->ino, bad); 544 info->has_create_ino = true; 545 } 546 } else { 547 if (*p != end) 548 goto bad; 549 } 550 551 /* Skip over any unrecognized fields */ 552 *p = end; 553 return 0; 554 bad: 555 return -EIO; 556 } 557 558 /* 559 * parse extra results 560 */ 561 static int parse_reply_info_extra(void **p, void *end, 562 struct ceph_mds_reply_info_parsed *info, 563 u64 features, struct ceph_mds_session *s) 564 { 565 u32 op = le32_to_cpu(info->head->op); 566 567 if (op == CEPH_MDS_OP_GETFILELOCK) 568 return parse_reply_info_filelock(p, end, info, features); 569 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) 570 return parse_reply_info_readdir(p, end, info, features); 571 else if (op == CEPH_MDS_OP_CREATE) 572 return parse_reply_info_create(p, end, info, features, s); 573 else 574 return -EIO; 575 } 576 577 /* 578 * parse entire mds reply 579 */ 580 static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, 581 struct ceph_mds_reply_info_parsed *info, 582 u64 features) 583 { 584 void *p, *end; 585 u32 len; 586 int err; 587 588 info->head = msg->front.iov_base; 589 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 590 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 591 592 /* trace */ 593 ceph_decode_32_safe(&p, end, len, bad); 594 if (len > 0) { 595 ceph_decode_need(&p, end, len, bad); 596 err = parse_reply_info_trace(&p, p+len, info, features); 597 if (err < 0) 598 goto out_bad; 599 } 600 601 /* extra */ 602 ceph_decode_32_safe(&p, end, len, bad); 603 if (len > 0) { 604 ceph_decode_need(&p, end, len, bad); 605 err = parse_reply_info_extra(&p, p+len, info, features, s); 606 if (err < 0) 607 goto out_bad; 608 } 609 610 /* snap blob */ 611 ceph_decode_32_safe(&p, end, len, bad); 612 info->snapblob_len = len; 613 info->snapblob = p; 614 p += len; 615 616 if (p != end) 617 goto bad; 618 return 0; 619 620 bad: 621 err = -EIO; 622 out_bad: 623 pr_err("mds parse_reply err %d\n", err); 624 return err; 625 } 626 627 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 628 { 629 if (!info->dir_entries) 630 return; 631 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size)); 632 } 633 634 635 /* 636 * sessions 637 */ 638 const char *ceph_session_state_name(int s) 639 { 640 switch (s) { 641 case CEPH_MDS_SESSION_NEW: return "new"; 642 case CEPH_MDS_SESSION_OPENING: return "opening"; 643 case CEPH_MDS_SESSION_OPEN: return "open"; 644 case CEPH_MDS_SESSION_HUNG: return "hung"; 645 case CEPH_MDS_SESSION_CLOSING: return "closing"; 646 case CEPH_MDS_SESSION_CLOSED: return "closed"; 647 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 648 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 649 case CEPH_MDS_SESSION_REJECTED: return "rejected"; 650 default: return "???"; 651 } 652 } 653 654 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s) 655 { 656 if (refcount_inc_not_zero(&s->s_ref)) 657 return s; 658 return NULL; 659 } 660 661 void ceph_put_mds_session(struct ceph_mds_session *s) 662 { 663 if (IS_ERR_OR_NULL(s)) 664 return; 665 666 if 
(refcount_dec_and_test(&s->s_ref)) {
		if (s->s_auth.authorizer)
			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
		WARN_ON(mutex_is_locked(&s->s_mutex));
		xa_destroy(&s->s_delegated_inos);
		kfree(s);
	}
}

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
						   int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return NULL;
	return ceph_get_mds_session(mdsc->sessions[mds]);
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return false;
	else
		return true;
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
				       struct ceph_mds_session *s)
{
	if (s->s_mds >= mdsc->max_sessions ||
	    mdsc->sessions[s->s_mds] != s)
		return -ENOENT;
	return 0;
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
						 int mds)
{
	struct ceph_mds_session *s;

	if (mds >= mdsc->mdsmap->possible_max_rank)
		return ERR_PTR(-EINVAL);

	s = kzalloc(sizeof(*s), GFP_NOFS);
	if (!s)
		return ERR_PTR(-ENOMEM);

	if (mds >= mdsc->max_sessions) {
		int newmax = 1 << get_count_order(mds + 1);
		struct ceph_mds_session **sa;

		dout("%s: realloc to %d\n", __func__, newmax);
		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
		if (!sa)
			goto fail_realloc;
		if (mdsc->sessions) {
			memcpy(sa, mdsc->sessions,
			       mdsc->max_sessions * sizeof(void *));
			kfree(mdsc->sessions);
		}
		mdsc->sessions = sa;
		mdsc->max_sessions = newmax;
	}

	dout("%s: mds%d\n", __func__, mds);
	s->s_mdsc = mdsc;
	s->s_mds = mds;
	s->s_state = CEPH_MDS_SESSION_NEW;
	mutex_init(&s->s_mutex);

	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

	atomic_set(&s->s_cap_gen, 1);
	s->s_cap_ttl = jiffies - 1;

	spin_lock_init(&s->s_cap_lock);
	INIT_LIST_HEAD(&s->s_caps);
	refcount_set(&s->s_ref, 1);
	INIT_LIST_HEAD(&s->s_waiting);
	INIT_LIST_HEAD(&s->s_unsafe);
	xa_init(&s->s_delegated_inos);
	INIT_LIST_HEAD(&s->s_cap_releases);
	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

	INIT_LIST_HEAD(&s->s_cap_dirty);
	INIT_LIST_HEAD(&s->s_cap_flushing);

	mdsc->sessions[mds] = s;
	atomic_inc(&mdsc->num_sessions);
	refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	return s;

fail_realloc:
	kfree(s);
	return ERR_PTR(-ENOMEM);
}

/*
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
				 struct ceph_mds_session *s)
{
	dout("__unregister_session mds%d %p\n", s->s_mds, s);
	BUG_ON(mdsc->sessions[s->s_mds] != s);
	mdsc->sessions[s->s_mds] = NULL;
	ceph_con_close(&s->s_con);
	ceph_put_mds_session(s);
	atomic_dec(&mdsc->num_sessions);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
	if (req->r_session) {
		ceph_put_mds_session(req->r_session);
		req->r_session = NULL;
	}
}

void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
				void (*cb)(struct ceph_mds_session *),
				bool check_state)
{
	int mds;

	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; ++mds) {
		struct ceph_mds_session *s;

		s = __ceph_lookup_mds_session(mdsc, mds);
		if (!s)
			continue;

		if (check_state && !check_session_state(s)) {
			ceph_put_mds_session(s);
			continue;
		}

		mutex_unlock(&mdsc->mutex);
		cb(s);
		ceph_put_mds_session(s);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);
}

void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	ceph_mdsc_release_dir_caps_no_check(req);
	destroy_reply_info(&req->r_reply_info);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_inode) {
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		iput(req->r_inode);
	}
	if (req->r_parent) {
		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		iput(req->r_parent);
	}
	iput(req->r_target_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry)
		dput(req->r_old_dentry);
	if (req->r_old_dentry_dir) {
		/*
		 * track (and drop pins for) r_old_dentry_dir
		 * separately, since r_old_dentry's d_parent may have
		 * changed between the dir mutex being dropped and
		 * this request being freed.
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	put_cred(req->r_cred);
	if (req->r_pagelist)
		ceph_pagelist_release(req->r_pagelist);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	WARN_ON_ONCE(!list_empty(&req->r_wait));
	kmem_cache_free(ceph_mds_request_cachep, req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
	struct ceph_mds_request *req;

	req = lookup_request(&mdsc->request_tree, tid);
	if (req)
		ceph_mdsc_get_request(req);

	return req;
}

/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
897 */ 898 static void __register_request(struct ceph_mds_client *mdsc, 899 struct ceph_mds_request *req, 900 struct inode *dir) 901 { 902 int ret = 0; 903 904 req->r_tid = ++mdsc->last_tid; 905 if (req->r_num_caps) { 906 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, 907 req->r_num_caps); 908 if (ret < 0) { 909 pr_err("__register_request %p " 910 "failed to reserve caps: %d\n", req, ret); 911 /* set req->r_err to fail early from __do_request */ 912 req->r_err = ret; 913 return; 914 } 915 } 916 dout("__register_request %p tid %lld\n", req, req->r_tid); 917 ceph_mdsc_get_request(req); 918 insert_request(&mdsc->request_tree, req); 919 920 req->r_cred = get_current_cred(); 921 922 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) 923 mdsc->oldest_tid = req->r_tid; 924 925 if (dir) { 926 struct ceph_inode_info *ci = ceph_inode(dir); 927 928 ihold(dir); 929 req->r_unsafe_dir = dir; 930 spin_lock(&ci->i_unsafe_lock); 931 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 932 spin_unlock(&ci->i_unsafe_lock); 933 } 934 } 935 936 static void __unregister_request(struct ceph_mds_client *mdsc, 937 struct ceph_mds_request *req) 938 { 939 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 940 941 /* Never leave an unregistered request on an unsafe list! */ 942 list_del_init(&req->r_unsafe_item); 943 944 if (req->r_tid == mdsc->oldest_tid) { 945 struct rb_node *p = rb_next(&req->r_node); 946 mdsc->oldest_tid = 0; 947 while (p) { 948 struct ceph_mds_request *next_req = 949 rb_entry(p, struct ceph_mds_request, r_node); 950 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 951 mdsc->oldest_tid = next_req->r_tid; 952 break; 953 } 954 p = rb_next(p); 955 } 956 } 957 958 erase_request(&mdsc->request_tree, req); 959 960 if (req->r_unsafe_dir) { 961 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 962 spin_lock(&ci->i_unsafe_lock); 963 list_del_init(&req->r_unsafe_dir_item); 964 spin_unlock(&ci->i_unsafe_lock); 965 } 966 if (req->r_target_inode && 967 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 968 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 969 spin_lock(&ci->i_unsafe_lock); 970 list_del_init(&req->r_unsafe_target_item); 971 spin_unlock(&ci->i_unsafe_lock); 972 } 973 974 if (req->r_unsafe_dir) { 975 iput(req->r_unsafe_dir); 976 req->r_unsafe_dir = NULL; 977 } 978 979 complete_all(&req->r_safe_completion); 980 981 ceph_mdsc_put_request(req); 982 } 983 984 /* 985 * Walk back up the dentry tree until we hit a dentry representing a 986 * non-snapshot inode. We do this using the rcu_read_lock (which must be held 987 * when calling this) to ensure that the objects won't disappear while we're 988 * working with them. Once we hit a candidate dentry, we attempt to take a 989 * reference to it, and return that as the result. 990 */ 991 static struct inode *get_nonsnap_parent(struct dentry *dentry) 992 { 993 struct inode *inode = NULL; 994 995 while (dentry && !IS_ROOT(dentry)) { 996 inode = d_inode_rcu(dentry); 997 if (!inode || ceph_snap(inode) == CEPH_NOSNAP) 998 break; 999 dentry = dentry->d_parent; 1000 } 1001 if (inode) 1002 inode = igrab(inode); 1003 return inode; 1004 } 1005 1006 /* 1007 * Choose mds to send request to next. If there is a hint set in the 1008 * request (e.g., due to a prior forward hint from the mds), use that. 1009 * Otherwise, consult frag tree and/or caps to identify the 1010 * appropriate mds. If all else fails, choose randomly. 1011 * 1012 * Called under mdsc->mutex. 
1013 */ 1014 static int __choose_mds(struct ceph_mds_client *mdsc, 1015 struct ceph_mds_request *req, 1016 bool *random) 1017 { 1018 struct inode *inode; 1019 struct ceph_inode_info *ci; 1020 struct ceph_cap *cap; 1021 int mode = req->r_direct_mode; 1022 int mds = -1; 1023 u32 hash = req->r_direct_hash; 1024 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); 1025 1026 if (random) 1027 *random = false; 1028 1029 /* 1030 * is there a specific mds we should try? ignore hint if we have 1031 * no session and the mds is not up (active or recovering). 1032 */ 1033 if (req->r_resend_mds >= 0 && 1034 (__have_session(mdsc, req->r_resend_mds) || 1035 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 1036 dout("%s using resend_mds mds%d\n", __func__, 1037 req->r_resend_mds); 1038 return req->r_resend_mds; 1039 } 1040 1041 if (mode == USE_RANDOM_MDS) 1042 goto random; 1043 1044 inode = NULL; 1045 if (req->r_inode) { 1046 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) { 1047 inode = req->r_inode; 1048 ihold(inode); 1049 } else { 1050 /* req->r_dentry is non-null for LSSNAP request */ 1051 rcu_read_lock(); 1052 inode = get_nonsnap_parent(req->r_dentry); 1053 rcu_read_unlock(); 1054 dout("%s using snapdir's parent %p\n", __func__, inode); 1055 } 1056 } else if (req->r_dentry) { 1057 /* ignore race with rename; old or new d_parent is okay */ 1058 struct dentry *parent; 1059 struct inode *dir; 1060 1061 rcu_read_lock(); 1062 parent = READ_ONCE(req->r_dentry->d_parent); 1063 dir = req->r_parent ? : d_inode_rcu(parent); 1064 1065 if (!dir || dir->i_sb != mdsc->fsc->sb) { 1066 /* not this fs or parent went negative */ 1067 inode = d_inode(req->r_dentry); 1068 if (inode) 1069 ihold(inode); 1070 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 1071 /* direct snapped/virtual snapdir requests 1072 * based on parent dir inode */ 1073 inode = get_nonsnap_parent(parent); 1074 dout("%s using nonsnap parent %p\n", __func__, inode); 1075 } else { 1076 /* dentry target */ 1077 inode = d_inode(req->r_dentry); 1078 if (!inode || mode == USE_AUTH_MDS) { 1079 /* dir + name */ 1080 inode = igrab(dir); 1081 hash = ceph_dentry_hash(dir, req->r_dentry); 1082 is_hash = true; 1083 } else { 1084 ihold(inode); 1085 } 1086 } 1087 rcu_read_unlock(); 1088 } 1089 1090 dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash, 1091 hash, mode); 1092 if (!inode) 1093 goto random; 1094 ci = ceph_inode(inode); 1095 1096 if (is_hash && S_ISDIR(inode->i_mode)) { 1097 struct ceph_inode_frag frag; 1098 int found; 1099 1100 ceph_choose_frag(ci, hash, &frag, &found); 1101 if (found) { 1102 if (mode == USE_ANY_MDS && frag.ndist > 0) { 1103 u8 r; 1104 1105 /* choose a random replica */ 1106 get_random_bytes(&r, 1); 1107 r %= frag.ndist; 1108 mds = frag.dist[r]; 1109 dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n", 1110 __func__, inode, ceph_vinop(inode), 1111 frag.frag, mds, (int)r, frag.ndist); 1112 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1113 CEPH_MDS_STATE_ACTIVE && 1114 !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds)) 1115 goto out; 1116 } 1117 1118 /* since this file/dir wasn't known to be 1119 * replicated, then we want to look for the 1120 * authoritative mds. 
*/ 1121 if (frag.mds >= 0) { 1122 /* choose auth mds */ 1123 mds = frag.mds; 1124 dout("%s %p %llx.%llx frag %u mds%d (auth)\n", 1125 __func__, inode, ceph_vinop(inode), 1126 frag.frag, mds); 1127 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1128 CEPH_MDS_STATE_ACTIVE) { 1129 if (!ceph_mdsmap_is_laggy(mdsc->mdsmap, 1130 mds)) 1131 goto out; 1132 } 1133 } 1134 mode = USE_AUTH_MDS; 1135 } 1136 } 1137 1138 spin_lock(&ci->i_ceph_lock); 1139 cap = NULL; 1140 if (mode == USE_AUTH_MDS) 1141 cap = ci->i_auth_cap; 1142 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 1143 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 1144 if (!cap) { 1145 spin_unlock(&ci->i_ceph_lock); 1146 iput(inode); 1147 goto random; 1148 } 1149 mds = cap->session->s_mds; 1150 dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__, 1151 inode, ceph_vinop(inode), mds, 1152 cap == ci->i_auth_cap ? "auth " : "", cap); 1153 spin_unlock(&ci->i_ceph_lock); 1154 out: 1155 iput(inode); 1156 return mds; 1157 1158 random: 1159 if (random) 1160 *random = true; 1161 1162 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 1163 dout("%s chose random mds%d\n", __func__, mds); 1164 return mds; 1165 } 1166 1167 1168 /* 1169 * session messages 1170 */ 1171 struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq) 1172 { 1173 struct ceph_msg *msg; 1174 struct ceph_mds_session_head *h; 1175 1176 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 1177 false); 1178 if (!msg) { 1179 pr_err("ENOMEM creating session %s msg\n", 1180 ceph_session_op_name(op)); 1181 return NULL; 1182 } 1183 h = msg->front.iov_base; 1184 h->op = cpu_to_le32(op); 1185 h->seq = cpu_to_le64(seq); 1186 1187 return msg; 1188 } 1189 1190 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED; 1191 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8) 1192 static int encode_supported_features(void **p, void *end) 1193 { 1194 static const size_t count = ARRAY_SIZE(feature_bits); 1195 1196 if (count > 0) { 1197 size_t i; 1198 size_t size = FEATURE_BYTES(count); 1199 1200 if (WARN_ON_ONCE(*p + 4 + size > end)) 1201 return -ERANGE; 1202 1203 ceph_encode_32(p, size); 1204 memset(*p, 0, size); 1205 for (i = 0; i < count; i++) 1206 ((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8); 1207 *p += size; 1208 } else { 1209 if (WARN_ON_ONCE(*p + 4 > end)) 1210 return -ERANGE; 1211 1212 ceph_encode_32(p, 0); 1213 } 1214 1215 return 0; 1216 } 1217 1218 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED; 1219 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8) 1220 static int encode_metric_spec(void **p, void *end) 1221 { 1222 static const size_t count = ARRAY_SIZE(metric_bits); 1223 1224 /* header */ 1225 if (WARN_ON_ONCE(*p + 2 > end)) 1226 return -ERANGE; 1227 1228 ceph_encode_8(p, 1); /* version */ 1229 ceph_encode_8(p, 1); /* compat */ 1230 1231 if (count > 0) { 1232 size_t i; 1233 size_t size = METRIC_BYTES(count); 1234 1235 if (WARN_ON_ONCE(*p + 4 + 4 + size > end)) 1236 return -ERANGE; 1237 1238 /* metric spec info length */ 1239 ceph_encode_32(p, 4 + size); 1240 1241 /* metric spec */ 1242 ceph_encode_32(p, size); 1243 memset(*p, 0, size); 1244 for (i = 0; i < count; i++) 1245 ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8); 1246 *p += size; 1247 } else { 1248 if (WARN_ON_ONCE(*p + 4 + 4 > end)) 1249 return -ERANGE; 1250 1251 /* metric spec info length */ 1252 ceph_encode_32(p, 4); 1253 /* metric spec */ 1254 ceph_encode_32(p, 0); 1255 } 
1256 1257 return 0; 1258 } 1259 1260 /* 1261 * session message, specialization for CEPH_SESSION_REQUEST_OPEN 1262 * to include additional client metadata fields. 1263 */ 1264 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq) 1265 { 1266 struct ceph_msg *msg; 1267 struct ceph_mds_session_head *h; 1268 int i; 1269 int extra_bytes = 0; 1270 int metadata_key_count = 0; 1271 struct ceph_options *opt = mdsc->fsc->client->options; 1272 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; 1273 size_t size, count; 1274 void *p, *end; 1275 int ret; 1276 1277 const char* metadata[][2] = { 1278 {"hostname", mdsc->nodename}, 1279 {"kernel_version", init_utsname()->release}, 1280 {"entity_id", opt->name ? : ""}, 1281 {"root", fsopt->server_path ? : "/"}, 1282 {NULL, NULL} 1283 }; 1284 1285 /* Calculate serialized length of metadata */ 1286 extra_bytes = 4; /* map length */ 1287 for (i = 0; metadata[i][0]; ++i) { 1288 extra_bytes += 8 + strlen(metadata[i][0]) + 1289 strlen(metadata[i][1]); 1290 metadata_key_count++; 1291 } 1292 1293 /* supported feature */ 1294 size = 0; 1295 count = ARRAY_SIZE(feature_bits); 1296 if (count > 0) 1297 size = FEATURE_BYTES(count); 1298 extra_bytes += 4 + size; 1299 1300 /* metric spec */ 1301 size = 0; 1302 count = ARRAY_SIZE(metric_bits); 1303 if (count > 0) 1304 size = METRIC_BYTES(count); 1305 extra_bytes += 2 + 4 + 4 + size; 1306 1307 /* Allocate the message */ 1308 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, 1309 GFP_NOFS, false); 1310 if (!msg) { 1311 pr_err("ENOMEM creating session open msg\n"); 1312 return ERR_PTR(-ENOMEM); 1313 } 1314 p = msg->front.iov_base; 1315 end = p + msg->front.iov_len; 1316 1317 h = p; 1318 h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN); 1319 h->seq = cpu_to_le64(seq); 1320 1321 /* 1322 * Serialize client metadata into waiting buffer space, using 1323 * the format that userspace expects for map<string, string> 1324 * 1325 * ClientSession messages with metadata are v4 1326 */ 1327 msg->hdr.version = cpu_to_le16(4); 1328 msg->hdr.compat_version = cpu_to_le16(1); 1329 1330 /* The write pointer, following the session_head structure */ 1331 p += sizeof(*h); 1332 1333 /* Number of entries in the map */ 1334 ceph_encode_32(&p, metadata_key_count); 1335 1336 /* Two length-prefixed strings for each entry in the map */ 1337 for (i = 0; metadata[i][0]; ++i) { 1338 size_t const key_len = strlen(metadata[i][0]); 1339 size_t const val_len = strlen(metadata[i][1]); 1340 1341 ceph_encode_32(&p, key_len); 1342 memcpy(p, metadata[i][0], key_len); 1343 p += key_len; 1344 ceph_encode_32(&p, val_len); 1345 memcpy(p, metadata[i][1], val_len); 1346 p += val_len; 1347 } 1348 1349 ret = encode_supported_features(&p, end); 1350 if (ret) { 1351 pr_err("encode_supported_features failed!\n"); 1352 ceph_msg_put(msg); 1353 return ERR_PTR(ret); 1354 } 1355 1356 ret = encode_metric_spec(&p, end); 1357 if (ret) { 1358 pr_err("encode_metric_spec failed!\n"); 1359 ceph_msg_put(msg); 1360 return ERR_PTR(ret); 1361 } 1362 1363 msg->front.iov_len = p - msg->front.iov_base; 1364 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1365 1366 return msg; 1367 } 1368 1369 /* 1370 * send session open request. 1371 * 1372 * called under mdsc->mutex 1373 */ 1374 static int __open_session(struct ceph_mds_client *mdsc, 1375 struct ceph_mds_session *session) 1376 { 1377 struct ceph_msg *msg; 1378 int mstate; 1379 int mds = session->s_mds; 1380 1381 /* wait for mds to go active? 
*/ 1382 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 1383 dout("open_session to mds%d (%s)\n", mds, 1384 ceph_mds_state_name(mstate)); 1385 session->s_state = CEPH_MDS_SESSION_OPENING; 1386 session->s_renew_requested = jiffies; 1387 1388 /* send connect message */ 1389 msg = create_session_open_msg(mdsc, session->s_seq); 1390 if (IS_ERR(msg)) 1391 return PTR_ERR(msg); 1392 ceph_con_send(&session->s_con, msg); 1393 return 0; 1394 } 1395 1396 /* 1397 * open sessions for any export targets for the given mds 1398 * 1399 * called under mdsc->mutex 1400 */ 1401 static struct ceph_mds_session * 1402 __open_export_target_session(struct ceph_mds_client *mdsc, int target) 1403 { 1404 struct ceph_mds_session *session; 1405 int ret; 1406 1407 session = __ceph_lookup_mds_session(mdsc, target); 1408 if (!session) { 1409 session = register_session(mdsc, target); 1410 if (IS_ERR(session)) 1411 return session; 1412 } 1413 if (session->s_state == CEPH_MDS_SESSION_NEW || 1414 session->s_state == CEPH_MDS_SESSION_CLOSING) { 1415 ret = __open_session(mdsc, session); 1416 if (ret) 1417 return ERR_PTR(ret); 1418 } 1419 1420 return session; 1421 } 1422 1423 struct ceph_mds_session * 1424 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) 1425 { 1426 struct ceph_mds_session *session; 1427 1428 dout("open_export_target_session to mds%d\n", target); 1429 1430 mutex_lock(&mdsc->mutex); 1431 session = __open_export_target_session(mdsc, target); 1432 mutex_unlock(&mdsc->mutex); 1433 1434 return session; 1435 } 1436 1437 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 1438 struct ceph_mds_session *session) 1439 { 1440 struct ceph_mds_info *mi; 1441 struct ceph_mds_session *ts; 1442 int i, mds = session->s_mds; 1443 1444 if (mds >= mdsc->mdsmap->possible_max_rank) 1445 return; 1446 1447 mi = &mdsc->mdsmap->m_info[mds]; 1448 dout("open_export_target_sessions for mds%d (%d targets)\n", 1449 session->s_mds, mi->num_export_targets); 1450 1451 for (i = 0; i < mi->num_export_targets; i++) { 1452 ts = __open_export_target_session(mdsc, mi->export_targets[i]); 1453 ceph_put_mds_session(ts); 1454 } 1455 } 1456 1457 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 1458 struct ceph_mds_session *session) 1459 { 1460 mutex_lock(&mdsc->mutex); 1461 __open_export_target_sessions(mdsc, session); 1462 mutex_unlock(&mdsc->mutex); 1463 } 1464 1465 /* 1466 * session caps 1467 */ 1468 1469 static void detach_cap_releases(struct ceph_mds_session *session, 1470 struct list_head *target) 1471 { 1472 lockdep_assert_held(&session->s_cap_lock); 1473 1474 list_splice_init(&session->s_cap_releases, target); 1475 session->s_num_cap_releases = 0; 1476 dout("dispose_cap_releases mds%d\n", session->s_mds); 1477 } 1478 1479 static void dispose_cap_releases(struct ceph_mds_client *mdsc, 1480 struct list_head *dispose) 1481 { 1482 while (!list_empty(dispose)) { 1483 struct ceph_cap *cap; 1484 /* zero out the in-progress message */ 1485 cap = list_first_entry(dispose, struct ceph_cap, session_caps); 1486 list_del(&cap->session_caps); 1487 ceph_put_cap(mdsc, cap); 1488 } 1489 } 1490 1491 static void cleanup_session_requests(struct ceph_mds_client *mdsc, 1492 struct ceph_mds_session *session) 1493 { 1494 struct ceph_mds_request *req; 1495 struct rb_node *p; 1496 struct ceph_inode_info *ci; 1497 1498 dout("cleanup_session_requests mds%d\n", session->s_mds); 1499 mutex_lock(&mdsc->mutex); 1500 while (!list_empty(&session->s_unsafe)) { 1501 req = 
list_first_entry(&session->s_unsafe, 1502 struct ceph_mds_request, r_unsafe_item); 1503 pr_warn_ratelimited(" dropping unsafe request %llu\n", 1504 req->r_tid); 1505 if (req->r_target_inode) { 1506 /* dropping unsafe change of inode's attributes */ 1507 ci = ceph_inode(req->r_target_inode); 1508 errseq_set(&ci->i_meta_err, -EIO); 1509 } 1510 if (req->r_unsafe_dir) { 1511 /* dropping unsafe directory operation */ 1512 ci = ceph_inode(req->r_unsafe_dir); 1513 errseq_set(&ci->i_meta_err, -EIO); 1514 } 1515 __unregister_request(mdsc, req); 1516 } 1517 /* zero r_attempts, so kick_requests() will re-send requests */ 1518 p = rb_first(&mdsc->request_tree); 1519 while (p) { 1520 req = rb_entry(p, struct ceph_mds_request, r_node); 1521 p = rb_next(p); 1522 if (req->r_session && 1523 req->r_session->s_mds == session->s_mds) 1524 req->r_attempts = 0; 1525 } 1526 mutex_unlock(&mdsc->mutex); 1527 } 1528 1529 /* 1530 * Helper to safely iterate over all caps associated with a session, with 1531 * special care taken to handle a racing __ceph_remove_cap(). 1532 * 1533 * Caller must hold session s_mutex. 1534 */ 1535 int ceph_iterate_session_caps(struct ceph_mds_session *session, 1536 int (*cb)(struct inode *, struct ceph_cap *, 1537 void *), void *arg) 1538 { 1539 struct list_head *p; 1540 struct ceph_cap *cap; 1541 struct inode *inode, *last_inode = NULL; 1542 struct ceph_cap *old_cap = NULL; 1543 int ret; 1544 1545 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 1546 spin_lock(&session->s_cap_lock); 1547 p = session->s_caps.next; 1548 while (p != &session->s_caps) { 1549 cap = list_entry(p, struct ceph_cap, session_caps); 1550 inode = igrab(&cap->ci->vfs_inode); 1551 if (!inode) { 1552 p = p->next; 1553 continue; 1554 } 1555 session->s_cap_iterator = cap; 1556 spin_unlock(&session->s_cap_lock); 1557 1558 if (last_inode) { 1559 iput(last_inode); 1560 last_inode = NULL; 1561 } 1562 if (old_cap) { 1563 ceph_put_cap(session->s_mdsc, old_cap); 1564 old_cap = NULL; 1565 } 1566 1567 ret = cb(inode, cap, arg); 1568 last_inode = inode; 1569 1570 spin_lock(&session->s_cap_lock); 1571 p = p->next; 1572 if (!cap->ci) { 1573 dout("iterate_session_caps finishing cap %p removal\n", 1574 cap); 1575 BUG_ON(cap->session != session); 1576 cap->session = NULL; 1577 list_del_init(&cap->session_caps); 1578 session->s_nr_caps--; 1579 atomic64_dec(&session->s_mdsc->metric.total_caps); 1580 if (cap->queue_release) 1581 __ceph_queue_cap_release(session, cap); 1582 else 1583 old_cap = cap; /* put_cap it w/o locks held */ 1584 } 1585 if (ret < 0) 1586 goto out; 1587 } 1588 ret = 0; 1589 out: 1590 session->s_cap_iterator = NULL; 1591 spin_unlock(&session->s_cap_lock); 1592 1593 iput(last_inode); 1594 if (old_cap) 1595 ceph_put_cap(session->s_mdsc, old_cap); 1596 1597 return ret; 1598 } 1599 1600 static int remove_capsnaps(struct ceph_mds_client *mdsc, struct inode *inode) 1601 { 1602 struct ceph_inode_info *ci = ceph_inode(inode); 1603 struct ceph_cap_snap *capsnap; 1604 int capsnap_release = 0; 1605 1606 lockdep_assert_held(&ci->i_ceph_lock); 1607 1608 dout("removing capsnaps, ci is %p, inode is %p\n", ci, inode); 1609 1610 while (!list_empty(&ci->i_cap_snaps)) { 1611 capsnap = list_first_entry(&ci->i_cap_snaps, 1612 struct ceph_cap_snap, ci_item); 1613 __ceph_remove_capsnap(inode, capsnap, NULL, NULL); 1614 ceph_put_snap_context(capsnap->context); 1615 ceph_put_cap_snap(capsnap); 1616 capsnap_release++; 1617 } 1618 wake_up_all(&ci->i_cap_wq); 1619 wake_up_all(&mdsc->cap_flushing_wq); 1620 return capsnap_release; 
1621 } 1622 1623 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 1624 void *arg) 1625 { 1626 struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg; 1627 struct ceph_mds_client *mdsc = fsc->mdsc; 1628 struct ceph_inode_info *ci = ceph_inode(inode); 1629 LIST_HEAD(to_remove); 1630 bool dirty_dropped = false; 1631 bool invalidate = false; 1632 int capsnap_release = 0; 1633 1634 dout("removing cap %p, ci is %p, inode is %p\n", 1635 cap, ci, &ci->vfs_inode); 1636 spin_lock(&ci->i_ceph_lock); 1637 __ceph_remove_cap(cap, false); 1638 if (!ci->i_auth_cap) { 1639 struct ceph_cap_flush *cf; 1640 1641 if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) { 1642 if (inode->i_data.nrpages > 0) 1643 invalidate = true; 1644 if (ci->i_wrbuffer_ref > 0) 1645 mapping_set_error(&inode->i_data, -EIO); 1646 } 1647 1648 while (!list_empty(&ci->i_cap_flush_list)) { 1649 cf = list_first_entry(&ci->i_cap_flush_list, 1650 struct ceph_cap_flush, i_list); 1651 list_move(&cf->i_list, &to_remove); 1652 } 1653 1654 spin_lock(&mdsc->cap_dirty_lock); 1655 1656 list_for_each_entry(cf, &to_remove, i_list) 1657 list_del_init(&cf->g_list); 1658 1659 if (!list_empty(&ci->i_dirty_item)) { 1660 pr_warn_ratelimited( 1661 " dropping dirty %s state for %p %lld\n", 1662 ceph_cap_string(ci->i_dirty_caps), 1663 inode, ceph_ino(inode)); 1664 ci->i_dirty_caps = 0; 1665 list_del_init(&ci->i_dirty_item); 1666 dirty_dropped = true; 1667 } 1668 if (!list_empty(&ci->i_flushing_item)) { 1669 pr_warn_ratelimited( 1670 " dropping dirty+flushing %s state for %p %lld\n", 1671 ceph_cap_string(ci->i_flushing_caps), 1672 inode, ceph_ino(inode)); 1673 ci->i_flushing_caps = 0; 1674 list_del_init(&ci->i_flushing_item); 1675 mdsc->num_cap_flushing--; 1676 dirty_dropped = true; 1677 } 1678 spin_unlock(&mdsc->cap_dirty_lock); 1679 1680 if (dirty_dropped) { 1681 errseq_set(&ci->i_meta_err, -EIO); 1682 1683 if (ci->i_wrbuffer_ref_head == 0 && 1684 ci->i_wr_ref == 0 && 1685 ci->i_dirty_caps == 0 && 1686 ci->i_flushing_caps == 0) { 1687 ceph_put_snap_context(ci->i_head_snapc); 1688 ci->i_head_snapc = NULL; 1689 } 1690 } 1691 1692 if (atomic_read(&ci->i_filelock_ref) > 0) { 1693 /* make further file lock syscall return -EIO */ 1694 ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK; 1695 pr_warn_ratelimited(" dropping file locks for %p %lld\n", 1696 inode, ceph_ino(inode)); 1697 } 1698 1699 if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { 1700 list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove); 1701 ci->i_prealloc_cap_flush = NULL; 1702 } 1703 1704 if (!list_empty(&ci->i_cap_snaps)) 1705 capsnap_release = remove_capsnaps(mdsc, inode); 1706 } 1707 spin_unlock(&ci->i_ceph_lock); 1708 while (!list_empty(&to_remove)) { 1709 struct ceph_cap_flush *cf; 1710 cf = list_first_entry(&to_remove, 1711 struct ceph_cap_flush, i_list); 1712 list_del_init(&cf->i_list); 1713 if (!cf->is_capsnap) 1714 ceph_free_cap_flush(cf); 1715 } 1716 1717 wake_up_all(&ci->i_cap_wq); 1718 if (invalidate) 1719 ceph_queue_invalidate(inode); 1720 if (dirty_dropped) 1721 iput(inode); 1722 while (capsnap_release--) 1723 iput(inode); 1724 return 0; 1725 } 1726 1727 /* 1728 * caller must hold session s_mutex 1729 */ 1730 static void remove_session_caps(struct ceph_mds_session *session) 1731 { 1732 struct ceph_fs_client *fsc = session->s_mdsc->fsc; 1733 struct super_block *sb = fsc->sb; 1734 LIST_HEAD(dispose); 1735 1736 dout("remove_session_caps on %p\n", session); 1737 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc); 1738 1739 
wake_up_all(&fsc->mdsc->cap_flushing_wq); 1740 1741 spin_lock(&session->s_cap_lock); 1742 if (session->s_nr_caps > 0) { 1743 struct inode *inode; 1744 struct ceph_cap *cap, *prev = NULL; 1745 struct ceph_vino vino; 1746 /* 1747 * iterate_session_caps() skips inodes that are being 1748 * deleted, we need to wait until deletions are complete. 1749 * __wait_on_freeing_inode() is designed for the job, 1750 * but it is not exported, so use lookup inode function 1751 * to access it. 1752 */ 1753 while (!list_empty(&session->s_caps)) { 1754 cap = list_entry(session->s_caps.next, 1755 struct ceph_cap, session_caps); 1756 if (cap == prev) 1757 break; 1758 prev = cap; 1759 vino = cap->ci->i_vino; 1760 spin_unlock(&session->s_cap_lock); 1761 1762 inode = ceph_find_inode(sb, vino); 1763 iput(inode); 1764 1765 spin_lock(&session->s_cap_lock); 1766 } 1767 } 1768 1769 // drop cap expires and unlock s_cap_lock 1770 detach_cap_releases(session, &dispose); 1771 1772 BUG_ON(session->s_nr_caps > 0); 1773 BUG_ON(!list_empty(&session->s_cap_flushing)); 1774 spin_unlock(&session->s_cap_lock); 1775 dispose_cap_releases(session->s_mdsc, &dispose); 1776 } 1777 1778 enum { 1779 RECONNECT, 1780 RENEWCAPS, 1781 FORCE_RO, 1782 }; 1783 1784 /* 1785 * wake up any threads waiting on this session's caps. if the cap is 1786 * old (didn't get renewed on the client reconnect), remove it now. 1787 * 1788 * caller must hold s_mutex. 1789 */ 1790 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, 1791 void *arg) 1792 { 1793 struct ceph_inode_info *ci = ceph_inode(inode); 1794 unsigned long ev = (unsigned long)arg; 1795 1796 if (ev == RECONNECT) { 1797 spin_lock(&ci->i_ceph_lock); 1798 ci->i_wanted_max_size = 0; 1799 ci->i_requested_max_size = 0; 1800 spin_unlock(&ci->i_ceph_lock); 1801 } else if (ev == RENEWCAPS) { 1802 if (cap->cap_gen < atomic_read(&cap->session->s_cap_gen)) { 1803 /* mds did not re-issue stale cap */ 1804 spin_lock(&ci->i_ceph_lock); 1805 cap->issued = cap->implemented = CEPH_CAP_PIN; 1806 spin_unlock(&ci->i_ceph_lock); 1807 } 1808 } else if (ev == FORCE_RO) { 1809 } 1810 wake_up_all(&ci->i_cap_wq); 1811 return 0; 1812 } 1813 1814 static void wake_up_session_caps(struct ceph_mds_session *session, int ev) 1815 { 1816 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 1817 ceph_iterate_session_caps(session, wake_up_session_cb, 1818 (void *)(unsigned long)ev); 1819 } 1820 1821 /* 1822 * Send periodic message to MDS renewing all currently held caps. The 1823 * ack will reset the expiration for all caps from this session. 1824 * 1825 * caller holds s_mutex 1826 */ 1827 static int send_renew_caps(struct ceph_mds_client *mdsc, 1828 struct ceph_mds_session *session) 1829 { 1830 struct ceph_msg *msg; 1831 int state; 1832 1833 if (time_after_eq(jiffies, session->s_cap_ttl) && 1834 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 1835 pr_info("mds%d caps stale\n", session->s_mds); 1836 session->s_renew_requested = jiffies; 1837 1838 /* do not try to renew caps until a recovering mds has reconnected 1839 * with its clients. 
 */
	state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
	if (state < CEPH_MDS_STATE_RECONNECT) {
		dout("send_renew_caps ignoring mds%d (%s)\n",
		     session->s_mds, ceph_mds_state_name(state));
		return 0;
	}

	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
	     ceph_mds_state_name(state));
	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
				      ++session->s_renew_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 0;
}

static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
			     struct ceph_mds_session *session, u64 seq)
{
	struct ceph_msg *msg;

	dout("send_flushmsg_ack to mds%d (%s) seq %lld\n",
	     session->s_mds, ceph_session_state_name(session->s_state), seq);
	msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 0;
}


/*
 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
 *
 * Called under session->s_mutex
 */
static void renewed_caps(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session, int is_renew)
{
	int was_stale;
	int wake = 0;

	spin_lock(&session->s_cap_lock);
	was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);

	session->s_cap_ttl = session->s_renew_requested +
		mdsc->mdsmap->m_session_timeout*HZ;

	if (was_stale) {
		if (time_before(jiffies, session->s_cap_ttl)) {
			pr_info("mds%d caps renewed\n", session->s_mds);
			wake = 1;
		} else {
			pr_info("mds%d caps still stale\n", session->s_mds);
		}
	}
	dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
	     session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
	     time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
	spin_unlock(&session->s_cap_lock);

	if (wake)
		wake_up_session_caps(session, RENEWCAPS);
}

/*
 * send a session close request
 */
static int request_close_session(struct ceph_mds_session *session)
{
	struct ceph_msg *msg;

	dout("request_close_session mds%d state %s seq %lld\n",
	     session->s_mds, ceph_session_state_name(session->s_state),
	     session->s_seq);
	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
				      session->s_seq);
	if (!msg)
		return -ENOMEM;
	ceph_con_send(&session->s_con, msg);
	return 1;
}

/*
 * Called with s_mutex held.
 */
static int __close_session(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session)
{
	if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
		return 0;
	session->s_state = CEPH_MDS_SESSION_CLOSING;
	return request_close_session(session);
}

static bool drop_negative_children(struct dentry *dentry)
{
	struct dentry *child;
	bool all_negative = true;

	if (!d_is_dir(dentry))
		goto out;

	spin_lock(&dentry->d_lock);
	list_for_each_entry(child, &dentry->d_subdirs, d_child) {
		if (d_really_is_positive(child)) {
			all_negative = false;
			break;
		}
	}
	spin_unlock(&dentry->d_lock);

	if (all_negative)
		shrink_dcache_parent(dentry);
out:
	return all_negative;
}

/*
 * Trim old(er) caps.
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped too.
 *
 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
 * memory pressure from the MDS, though, so it needn't be perfect.
 */
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
	int *remaining = arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int used, wanted, oissued, mine;

	if (*remaining <= 0)
		return -1;

	spin_lock(&ci->i_ceph_lock);
	mine = cap->issued | cap->implemented;
	used = __ceph_caps_used(ci);
	wanted = __ceph_caps_file_wanted(ci);
	oissued = __ceph_caps_issued_other(ci, cap);

	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
	     ceph_cap_string(used), ceph_cap_string(wanted));
	if (cap == ci->i_auth_cap) {
		if (ci->i_dirty_caps || ci->i_flushing_caps ||
		    !list_empty(&ci->i_cap_snaps))
			goto out;
		if ((used | wanted) & CEPH_CAP_ANY_WR)
			goto out;
		/* Note: it's possible that i_filelock_ref becomes non-zero
		 * after dropping auth caps. It doesn't hurt because reply
		 * of lock mds request will re-add auth caps. */
		if (atomic_read(&ci->i_filelock_ref) > 0)
			goto out;
	}
	/* The inode has cached pages, but it's no longer used.
	 * We can safely drop it */
	if (S_ISREG(inode->i_mode) &&
	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
	    !(oissued & CEPH_CAP_FILE_CACHE)) {
		used = 0;
		oissued = 0;
	}
	if ((used | wanted) & ~oissued & mine)
		goto out;   /* we need these caps */

	if (oissued) {
		/* we aren't the only cap.. just remove us */
		ceph_remove_cap(cap, true);
		(*remaining)--;
	} else {
		struct dentry *dentry;
		/* try dropping referring dentries */
		spin_unlock(&ci->i_ceph_lock);
		dentry = d_find_any_alias(inode);
		if (dentry && drop_negative_children(dentry)) {
			int count;
			dput(dentry);
			d_prune_aliases(inode);
			count = atomic_read(&inode->i_count);
			if (count == 1)
				(*remaining)--;
			dout("trim_caps_cb %p cap %p pruned, count now %d\n",
			     inode, cap, count);
		} else {
			dput(dentry);
		}
		return 0;
	}

out:
	spin_unlock(&ci->i_ceph_lock);
	return 0;
}

/*
 * Trim session cap count down to some max number.
2041 */ 2042 int ceph_trim_caps(struct ceph_mds_client *mdsc, 2043 struct ceph_mds_session *session, 2044 int max_caps) 2045 { 2046 int trim_caps = session->s_nr_caps - max_caps; 2047 2048 dout("trim_caps mds%d start: %d / %d, trim %d\n", 2049 session->s_mds, session->s_nr_caps, max_caps, trim_caps); 2050 if (trim_caps > 0) { 2051 int remaining = trim_caps; 2052 2053 ceph_iterate_session_caps(session, trim_caps_cb, &remaining); 2054 dout("trim_caps mds%d done: %d / %d, trimmed %d\n", 2055 session->s_mds, session->s_nr_caps, max_caps, 2056 trim_caps - remaining); 2057 } 2058 2059 ceph_flush_cap_releases(mdsc, session); 2060 return 0; 2061 } 2062 2063 static int check_caps_flush(struct ceph_mds_client *mdsc, 2064 u64 want_flush_tid) 2065 { 2066 int ret = 1; 2067 2068 spin_lock(&mdsc->cap_dirty_lock); 2069 if (!list_empty(&mdsc->cap_flush_list)) { 2070 struct ceph_cap_flush *cf = 2071 list_first_entry(&mdsc->cap_flush_list, 2072 struct ceph_cap_flush, g_list); 2073 if (cf->tid <= want_flush_tid) { 2074 dout("check_caps_flush still flushing tid " 2075 "%llu <= %llu\n", cf->tid, want_flush_tid); 2076 ret = 0; 2077 } 2078 } 2079 spin_unlock(&mdsc->cap_dirty_lock); 2080 return ret; 2081 } 2082 2083 /* 2084 * flush all dirty inode data to disk. 2085 * 2086 * returns true if we've flushed through want_flush_tid 2087 */ 2088 static void wait_caps_flush(struct ceph_mds_client *mdsc, 2089 u64 want_flush_tid) 2090 { 2091 dout("check_caps_flush want %llu\n", want_flush_tid); 2092 2093 wait_event(mdsc->cap_flushing_wq, 2094 check_caps_flush(mdsc, want_flush_tid)); 2095 2096 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); 2097 } 2098 2099 /* 2100 * called under s_mutex 2101 */ 2102 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 2103 struct ceph_mds_session *session) 2104 { 2105 struct ceph_msg *msg = NULL; 2106 struct ceph_mds_cap_release *head; 2107 struct ceph_mds_cap_item *item; 2108 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 2109 struct ceph_cap *cap; 2110 LIST_HEAD(tmp_list); 2111 int num_cap_releases; 2112 __le32 barrier, *cap_barrier; 2113 2114 down_read(&osdc->lock); 2115 barrier = cpu_to_le32(osdc->epoch_barrier); 2116 up_read(&osdc->lock); 2117 2118 spin_lock(&session->s_cap_lock); 2119 again: 2120 list_splice_init(&session->s_cap_releases, &tmp_list); 2121 num_cap_releases = session->s_num_cap_releases; 2122 session->s_num_cap_releases = 0; 2123 spin_unlock(&session->s_cap_lock); 2124 2125 while (!list_empty(&tmp_list)) { 2126 if (!msg) { 2127 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 2128 PAGE_SIZE, GFP_NOFS, false); 2129 if (!msg) 2130 goto out_err; 2131 head = msg->front.iov_base; 2132 head->num = cpu_to_le32(0); 2133 msg->front.iov_len = sizeof(*head); 2134 2135 msg->hdr.version = cpu_to_le16(2); 2136 msg->hdr.compat_version = cpu_to_le16(1); 2137 } 2138 2139 cap = list_first_entry(&tmp_list, struct ceph_cap, 2140 session_caps); 2141 list_del(&cap->session_caps); 2142 num_cap_releases--; 2143 2144 head = msg->front.iov_base; 2145 put_unaligned_le32(get_unaligned_le32(&head->num) + 1, 2146 &head->num); 2147 item = msg->front.iov_base + msg->front.iov_len; 2148 item->ino = cpu_to_le64(cap->cap_ino); 2149 item->cap_id = cpu_to_le64(cap->cap_id); 2150 item->migrate_seq = cpu_to_le32(cap->mseq); 2151 item->seq = cpu_to_le32(cap->issue_seq); 2152 msg->front.iov_len += sizeof(*item); 2153 2154 ceph_put_cap(mdsc, cap); 2155 2156 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 2157 // Append cap_barrier field 2158 cap_barrier = 
msg->front.iov_base + msg->front.iov_len; 2159 *cap_barrier = barrier; 2160 msg->front.iov_len += sizeof(*cap_barrier); 2161 2162 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2163 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2164 ceph_con_send(&session->s_con, msg); 2165 msg = NULL; 2166 } 2167 } 2168 2169 BUG_ON(num_cap_releases != 0); 2170 2171 spin_lock(&session->s_cap_lock); 2172 if (!list_empty(&session->s_cap_releases)) 2173 goto again; 2174 spin_unlock(&session->s_cap_lock); 2175 2176 if (msg) { 2177 // Append cap_barrier field 2178 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2179 *cap_barrier = barrier; 2180 msg->front.iov_len += sizeof(*cap_barrier); 2181 2182 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2183 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2184 ceph_con_send(&session->s_con, msg); 2185 } 2186 return; 2187 out_err: 2188 pr_err("send_cap_releases mds%d, failed to allocate message\n", 2189 session->s_mds); 2190 spin_lock(&session->s_cap_lock); 2191 list_splice(&tmp_list, &session->s_cap_releases); 2192 session->s_num_cap_releases += num_cap_releases; 2193 spin_unlock(&session->s_cap_lock); 2194 } 2195 2196 static void ceph_cap_release_work(struct work_struct *work) 2197 { 2198 struct ceph_mds_session *session = 2199 container_of(work, struct ceph_mds_session, s_cap_release_work); 2200 2201 mutex_lock(&session->s_mutex); 2202 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2203 session->s_state == CEPH_MDS_SESSION_HUNG) 2204 ceph_send_cap_releases(session->s_mdsc, session); 2205 mutex_unlock(&session->s_mutex); 2206 ceph_put_mds_session(session); 2207 } 2208 2209 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, 2210 struct ceph_mds_session *session) 2211 { 2212 if (mdsc->stopping) 2213 return; 2214 2215 ceph_get_mds_session(session); 2216 if (queue_work(mdsc->fsc->cap_wq, 2217 &session->s_cap_release_work)) { 2218 dout("cap release work queued\n"); 2219 } else { 2220 ceph_put_mds_session(session); 2221 dout("failed to queue cap release work\n"); 2222 } 2223 } 2224 2225 /* 2226 * caller holds session->s_cap_lock 2227 */ 2228 void __ceph_queue_cap_release(struct ceph_mds_session *session, 2229 struct ceph_cap *cap) 2230 { 2231 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2232 session->s_num_cap_releases++; 2233 2234 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2235 ceph_flush_cap_releases(session->s_mdsc, session); 2236 } 2237 2238 static void ceph_cap_reclaim_work(struct work_struct *work) 2239 { 2240 struct ceph_mds_client *mdsc = 2241 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2242 int ret = ceph_trim_dentries(mdsc); 2243 if (ret == -EAGAIN) 2244 ceph_queue_cap_reclaim_work(mdsc); 2245 } 2246 2247 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2248 { 2249 if (mdsc->stopping) 2250 return; 2251 2252 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2253 dout("caps reclaim work queued\n"); 2254 } else { 2255 dout("failed to queue caps release work\n"); 2256 } 2257 } 2258 2259 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) 2260 { 2261 int val; 2262 if (!nr) 2263 return; 2264 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2265 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2266 atomic_set(&mdsc->cap_reclaim_pending, 0); 2267 ceph_queue_cap_reclaim_work(mdsc); 2268 } 2269 } 2270 2271 /* 2272 * requests 2273 */ 2274 2275 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2276 struct inode *dir) 
2277 { 2278 struct ceph_inode_info *ci = ceph_inode(dir); 2279 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2280 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2281 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2282 unsigned int num_entries; 2283 int order; 2284 2285 spin_lock(&ci->i_ceph_lock); 2286 num_entries = ci->i_files + ci->i_subdirs; 2287 spin_unlock(&ci->i_ceph_lock); 2288 num_entries = max(num_entries, 1U); 2289 num_entries = min(num_entries, opt->max_readdir); 2290 2291 order = get_order(size * num_entries); 2292 while (order >= 0) { 2293 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2294 __GFP_NOWARN, 2295 order); 2296 if (rinfo->dir_entries) 2297 break; 2298 order--; 2299 } 2300 if (!rinfo->dir_entries) 2301 return -ENOMEM; 2302 2303 num_entries = (PAGE_SIZE << order) / size; 2304 num_entries = min(num_entries, opt->max_readdir); 2305 2306 rinfo->dir_buf_size = PAGE_SIZE << order; 2307 req->r_num_caps = num_entries + 1; 2308 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2309 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2310 return 0; 2311 } 2312 2313 /* 2314 * Create an mds request. 2315 */ 2316 struct ceph_mds_request * 2317 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2318 { 2319 struct ceph_mds_request *req; 2320 2321 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2322 if (!req) 2323 return ERR_PTR(-ENOMEM); 2324 2325 mutex_init(&req->r_fill_mutex); 2326 req->r_mdsc = mdsc; 2327 req->r_started = jiffies; 2328 req->r_start_latency = ktime_get(); 2329 req->r_resend_mds = -1; 2330 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2331 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2332 req->r_fmode = -1; 2333 kref_init(&req->r_kref); 2334 RB_CLEAR_NODE(&req->r_node); 2335 INIT_LIST_HEAD(&req->r_wait); 2336 init_completion(&req->r_completion); 2337 init_completion(&req->r_safe_completion); 2338 INIT_LIST_HEAD(&req->r_unsafe_item); 2339 2340 ktime_get_coarse_real_ts64(&req->r_stamp); 2341 2342 req->r_op = op; 2343 req->r_direct_mode = mode; 2344 return req; 2345 } 2346 2347 /* 2348 * return oldest (lowest) request, tid in request tree, 0 if none. 2349 * 2350 * called under mdsc->mutex. 2351 */ 2352 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2353 { 2354 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2355 return NULL; 2356 return rb_entry(rb_first(&mdsc->request_tree), 2357 struct ceph_mds_request, r_node); 2358 } 2359 2360 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2361 { 2362 return mdsc->oldest_tid; 2363 } 2364 2365 /* 2366 * Build a dentry's path. Allocate on heap; caller must kfree. Based 2367 * on build_path_from_dentry in fs/cifs/dir.c. 2368 * 2369 * If @stop_on_nosnap, generate path relative to the first non-snapped 2370 * inode. 2371 * 2372 * Encode hidden .snap dirs as a double /, i.e. 
2373 * foo/.snap/bar -> foo//bar 2374 */ 2375 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase, 2376 int stop_on_nosnap) 2377 { 2378 struct dentry *temp; 2379 char *path; 2380 int pos; 2381 unsigned seq; 2382 u64 base; 2383 2384 if (!dentry) 2385 return ERR_PTR(-EINVAL); 2386 2387 path = __getname(); 2388 if (!path) 2389 return ERR_PTR(-ENOMEM); 2390 retry: 2391 pos = PATH_MAX - 1; 2392 path[pos] = '\0'; 2393 2394 seq = read_seqbegin(&rename_lock); 2395 rcu_read_lock(); 2396 temp = dentry; 2397 for (;;) { 2398 struct inode *inode; 2399 2400 spin_lock(&temp->d_lock); 2401 inode = d_inode(temp); 2402 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2403 dout("build_path path+%d: %p SNAPDIR\n", 2404 pos, temp); 2405 } else if (stop_on_nosnap && inode && dentry != temp && 2406 ceph_snap(inode) == CEPH_NOSNAP) { 2407 spin_unlock(&temp->d_lock); 2408 pos++; /* get rid of any prepended '/' */ 2409 break; 2410 } else { 2411 pos -= temp->d_name.len; 2412 if (pos < 0) { 2413 spin_unlock(&temp->d_lock); 2414 break; 2415 } 2416 memcpy(path + pos, temp->d_name.name, temp->d_name.len); 2417 } 2418 spin_unlock(&temp->d_lock); 2419 temp = READ_ONCE(temp->d_parent); 2420 2421 /* Are we at the root? */ 2422 if (IS_ROOT(temp)) 2423 break; 2424 2425 /* Are we out of buffer? */ 2426 if (--pos < 0) 2427 break; 2428 2429 path[pos] = '/'; 2430 } 2431 base = ceph_ino(d_inode(temp)); 2432 rcu_read_unlock(); 2433 2434 if (read_seqretry(&rename_lock, seq)) 2435 goto retry; 2436 2437 if (pos < 0) { 2438 /* 2439 * A rename didn't occur, but somehow we didn't end up where 2440 * we thought we would. Throw a warning and try again. 2441 */ 2442 pr_warn("build_path did not end path lookup where " 2443 "expected, pos is %d\n", pos); 2444 goto retry; 2445 } 2446 2447 *pbase = base; 2448 *plen = PATH_MAX - 1 - pos; 2449 dout("build_path on %p %d built %llx '%.*s'\n", 2450 dentry, d_count(dentry), base, *plen, path + pos); 2451 return path + pos; 2452 } 2453 2454 static int build_dentry_path(struct dentry *dentry, struct inode *dir, 2455 const char **ppath, int *ppathlen, u64 *pino, 2456 bool *pfreepath, bool parent_locked) 2457 { 2458 char *path; 2459 2460 rcu_read_lock(); 2461 if (!dir) 2462 dir = d_inode_rcu(dentry->d_parent); 2463 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) { 2464 *pino = ceph_ino(dir); 2465 rcu_read_unlock(); 2466 *ppath = dentry->d_name.name; 2467 *ppathlen = dentry->d_name.len; 2468 return 0; 2469 } 2470 rcu_read_unlock(); 2471 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2472 if (IS_ERR(path)) 2473 return PTR_ERR(path); 2474 *ppath = path; 2475 *pfreepath = true; 2476 return 0; 2477 } 2478 2479 static int build_inode_path(struct inode *inode, 2480 const char **ppath, int *ppathlen, u64 *pino, 2481 bool *pfreepath) 2482 { 2483 struct dentry *dentry; 2484 char *path; 2485 2486 if (ceph_snap(inode) == CEPH_NOSNAP) { 2487 *pino = ceph_ino(inode); 2488 *ppathlen = 0; 2489 return 0; 2490 } 2491 dentry = d_find_alias(inode); 2492 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2493 dput(dentry); 2494 if (IS_ERR(path)) 2495 return PTR_ERR(path); 2496 *ppath = path; 2497 *pfreepath = true; 2498 return 0; 2499 } 2500 2501 /* 2502 * request arguments may be specified via an inode *, a dentry *, or 2503 * an explicit ino+path. 
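 *
 * set_request_path_attr() below tries them in that order: an inode pointer
 * wins over a dentry, and a dentry wins over a raw ino+path pair. When a
 * path has to be built from the dcache, *freepath is set so the caller
 * knows to release it with ceph_mdsc_free_path() once the message is built.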
2504 */ 2505 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, 2506 struct inode *rdiri, const char *rpath, 2507 u64 rino, const char **ppath, int *pathlen, 2508 u64 *ino, bool *freepath, bool parent_locked) 2509 { 2510 int r = 0; 2511 2512 if (rinode) { 2513 r = build_inode_path(rinode, ppath, pathlen, ino, freepath); 2514 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 2515 ceph_snap(rinode)); 2516 } else if (rdentry) { 2517 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino, 2518 freepath, parent_locked); 2519 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, 2520 *ppath); 2521 } else if (rpath || rino) { 2522 *ino = rino; 2523 *ppath = rpath; 2524 *pathlen = rpath ? strlen(rpath) : 0; 2525 dout(" path %.*s\n", *pathlen, rpath); 2526 } 2527 2528 return r; 2529 } 2530 2531 static void encode_timestamp_and_gids(void **p, 2532 const struct ceph_mds_request *req) 2533 { 2534 struct ceph_timespec ts; 2535 int i; 2536 2537 ceph_encode_timespec64(&ts, &req->r_stamp); 2538 ceph_encode_copy(p, &ts, sizeof(ts)); 2539 2540 /* gid_list */ 2541 ceph_encode_32(p, req->r_cred->group_info->ngroups); 2542 for (i = 0; i < req->r_cred->group_info->ngroups; i++) 2543 ceph_encode_64(p, from_kgid(&init_user_ns, 2544 req->r_cred->group_info->gid[i])); 2545 } 2546 2547 /* 2548 * called under mdsc->mutex 2549 */ 2550 static struct ceph_msg *create_request_message(struct ceph_mds_session *session, 2551 struct ceph_mds_request *req, 2552 bool drop_cap_releases) 2553 { 2554 int mds = session->s_mds; 2555 struct ceph_mds_client *mdsc = session->s_mdsc; 2556 struct ceph_msg *msg; 2557 struct ceph_mds_request_head_old *head; 2558 const char *path1 = NULL; 2559 const char *path2 = NULL; 2560 u64 ino1 = 0, ino2 = 0; 2561 int pathlen1 = 0, pathlen2 = 0; 2562 bool freepath1 = false, freepath2 = false; 2563 int len; 2564 u16 releases; 2565 void *p, *end; 2566 int ret; 2567 bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME); 2568 2569 ret = set_request_path_attr(req->r_inode, req->r_dentry, 2570 req->r_parent, req->r_path1, req->r_ino1.ino, 2571 &path1, &pathlen1, &ino1, &freepath1, 2572 test_bit(CEPH_MDS_R_PARENT_LOCKED, 2573 &req->r_req_flags)); 2574 if (ret < 0) { 2575 msg = ERR_PTR(ret); 2576 goto out; 2577 } 2578 2579 /* If r_old_dentry is set, then assume that its parent is locked */ 2580 ret = set_request_path_attr(NULL, req->r_old_dentry, 2581 req->r_old_dentry_dir, 2582 req->r_path2, req->r_ino2.ino, 2583 &path2, &pathlen2, &ino2, &freepath2, true); 2584 if (ret < 0) { 2585 msg = ERR_PTR(ret); 2586 goto out_free1; 2587 } 2588 2589 len = legacy ? 
sizeof(*head) : sizeof(struct ceph_mds_request_head); 2590 len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + 2591 sizeof(struct ceph_timespec); 2592 len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups); 2593 2594 /* calculate (max) length for cap releases */ 2595 len += sizeof(struct ceph_mds_request_release) * 2596 (!!req->r_inode_drop + !!req->r_dentry_drop + 2597 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 2598 2599 if (req->r_dentry_drop) 2600 len += pathlen1; 2601 if (req->r_old_dentry_drop) 2602 len += pathlen2; 2603 2604 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); 2605 if (!msg) { 2606 msg = ERR_PTR(-ENOMEM); 2607 goto out_free2; 2608 } 2609 2610 msg->hdr.tid = cpu_to_le64(req->r_tid); 2611 2612 /* 2613 * The old ceph_mds_request_head didn't contain a version field, and 2614 * one was added when we moved the message version from 3->4. 2615 */ 2616 if (legacy) { 2617 msg->hdr.version = cpu_to_le16(3); 2618 head = msg->front.iov_base; 2619 p = msg->front.iov_base + sizeof(*head); 2620 } else { 2621 struct ceph_mds_request_head *new_head = msg->front.iov_base; 2622 2623 msg->hdr.version = cpu_to_le16(4); 2624 new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION); 2625 head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; 2626 p = msg->front.iov_base + sizeof(*new_head); 2627 } 2628 2629 end = msg->front.iov_base + msg->front.iov_len; 2630 2631 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 2632 head->op = cpu_to_le32(req->r_op); 2633 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, 2634 req->r_cred->fsuid)); 2635 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, 2636 req->r_cred->fsgid)); 2637 head->ino = cpu_to_le64(req->r_deleg_ino); 2638 head->args = req->r_args; 2639 2640 ceph_encode_filepath(&p, end, ino1, path1); 2641 ceph_encode_filepath(&p, end, ino2, path2); 2642 2643 /* make note of release offset, in case we need to replay */ 2644 req->r_request_release_offset = p - msg->front.iov_base; 2645 2646 /* cap releases */ 2647 releases = 0; 2648 if (req->r_inode_drop) 2649 releases += ceph_encode_inode_release(&p, 2650 req->r_inode ? 
req->r_inode : d_inode(req->r_dentry), 2651 mds, req->r_inode_drop, req->r_inode_unless, 2652 req->r_op == CEPH_MDS_OP_READDIR); 2653 if (req->r_dentry_drop) 2654 releases += ceph_encode_dentry_release(&p, req->r_dentry, 2655 req->r_parent, mds, req->r_dentry_drop, 2656 req->r_dentry_unless); 2657 if (req->r_old_dentry_drop) 2658 releases += ceph_encode_dentry_release(&p, req->r_old_dentry, 2659 req->r_old_dentry_dir, mds, 2660 req->r_old_dentry_drop, 2661 req->r_old_dentry_unless); 2662 if (req->r_old_inode_drop) 2663 releases += ceph_encode_inode_release(&p, 2664 d_inode(req->r_old_dentry), 2665 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 2666 2667 if (drop_cap_releases) { 2668 releases = 0; 2669 p = msg->front.iov_base + req->r_request_release_offset; 2670 } 2671 2672 head->num_releases = cpu_to_le16(releases); 2673 2674 encode_timestamp_and_gids(&p, req); 2675 2676 if (WARN_ON_ONCE(p > end)) { 2677 ceph_msg_put(msg); 2678 msg = ERR_PTR(-ERANGE); 2679 goto out_free2; 2680 } 2681 2682 msg->front.iov_len = p - msg->front.iov_base; 2683 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2684 2685 if (req->r_pagelist) { 2686 struct ceph_pagelist *pagelist = req->r_pagelist; 2687 ceph_msg_data_add_pagelist(msg, pagelist); 2688 msg->hdr.data_len = cpu_to_le32(pagelist->length); 2689 } else { 2690 msg->hdr.data_len = 0; 2691 } 2692 2693 msg->hdr.data_off = cpu_to_le16(0); 2694 2695 out_free2: 2696 if (freepath2) 2697 ceph_mdsc_free_path((char *)path2, pathlen2); 2698 out_free1: 2699 if (freepath1) 2700 ceph_mdsc_free_path((char *)path1, pathlen1); 2701 out: 2702 return msg; 2703 } 2704 2705 /* 2706 * called under mdsc->mutex if error, under no mutex if 2707 * success. 2708 */ 2709 static void complete_request(struct ceph_mds_client *mdsc, 2710 struct ceph_mds_request *req) 2711 { 2712 req->r_end_latency = ktime_get(); 2713 2714 if (req->r_callback) 2715 req->r_callback(mdsc, req); 2716 complete_all(&req->r_completion); 2717 } 2718 2719 static struct ceph_mds_request_head_old * 2720 find_old_request_head(void *p, u64 features) 2721 { 2722 bool legacy = !(features & CEPH_FEATURE_FS_BTIME); 2723 struct ceph_mds_request_head *new_head; 2724 2725 if (legacy) 2726 return (struct ceph_mds_request_head_old *)p; 2727 new_head = (struct ceph_mds_request_head *)p; 2728 return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; 2729 } 2730 2731 /* 2732 * called under mdsc->mutex 2733 */ 2734 static int __prepare_send_request(struct ceph_mds_session *session, 2735 struct ceph_mds_request *req, 2736 bool drop_cap_releases) 2737 { 2738 int mds = session->s_mds; 2739 struct ceph_mds_client *mdsc = session->s_mdsc; 2740 struct ceph_mds_request_head_old *rhead; 2741 struct ceph_msg *msg; 2742 int flags = 0; 2743 2744 req->r_attempts++; 2745 if (req->r_inode) { 2746 struct ceph_cap *cap = 2747 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 2748 2749 if (cap) 2750 req->r_sent_on_mseq = cap->mseq; 2751 else 2752 req->r_sent_on_mseq = -1; 2753 } 2754 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, 2755 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 2756 2757 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 2758 void *p; 2759 2760 /* 2761 * Replay. Do not regenerate message (and rebuild 2762 * paths, etc.); just use the original message. 2763 * Rebuilding paths will break for renames because 2764 * d_move mangles the src name. 
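 *
 * Instead, patch the already-built message in place below: set
 * CEPH_MDS_FLAG_REPLAY, refresh num_retry, drop any encoded cap/dentry
 * releases, and re-encode the timestamp and gid list at the saved
 * r_request_release_offset.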
2765 */ 2766 msg = req->r_request; 2767 rhead = find_old_request_head(msg->front.iov_base, 2768 session->s_con.peer_features); 2769 2770 flags = le32_to_cpu(rhead->flags); 2771 flags |= CEPH_MDS_FLAG_REPLAY; 2772 rhead->flags = cpu_to_le32(flags); 2773 2774 if (req->r_target_inode) 2775 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 2776 2777 rhead->num_retry = req->r_attempts - 1; 2778 2779 /* remove cap/dentry releases from message */ 2780 rhead->num_releases = 0; 2781 2782 p = msg->front.iov_base + req->r_request_release_offset; 2783 encode_timestamp_and_gids(&p, req); 2784 2785 msg->front.iov_len = p - msg->front.iov_base; 2786 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2787 return 0; 2788 } 2789 2790 if (req->r_request) { 2791 ceph_msg_put(req->r_request); 2792 req->r_request = NULL; 2793 } 2794 msg = create_request_message(session, req, drop_cap_releases); 2795 if (IS_ERR(msg)) { 2796 req->r_err = PTR_ERR(msg); 2797 return PTR_ERR(msg); 2798 } 2799 req->r_request = msg; 2800 2801 rhead = find_old_request_head(msg->front.iov_base, 2802 session->s_con.peer_features); 2803 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 2804 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2805 flags |= CEPH_MDS_FLAG_REPLAY; 2806 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) 2807 flags |= CEPH_MDS_FLAG_ASYNC; 2808 if (req->r_parent) 2809 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 2810 rhead->flags = cpu_to_le32(flags); 2811 rhead->num_fwd = req->r_num_fwd; 2812 rhead->num_retry = req->r_attempts - 1; 2813 2814 dout(" r_parent = %p\n", req->r_parent); 2815 return 0; 2816 } 2817 2818 /* 2819 * called under mdsc->mutex 2820 */ 2821 static int __send_request(struct ceph_mds_session *session, 2822 struct ceph_mds_request *req, 2823 bool drop_cap_releases) 2824 { 2825 int err; 2826 2827 err = __prepare_send_request(session, req, drop_cap_releases); 2828 if (!err) { 2829 ceph_msg_get(req->r_request); 2830 ceph_con_send(&session->s_con, req->r_request); 2831 } 2832 2833 return err; 2834 } 2835 2836 /* 2837 * send request, or put it on the appropriate wait list. 
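 *
 * "Appropriate wait list" is either mdsc->waiting_for_map (no usable mdsmap
 * yet, or no active MDS to send to) or session->s_waiting (the target
 * session exists but is still being opened). Early hard failures instead
 * complete the request with the error stashed in r_err.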
2838 */ 2839 static void __do_request(struct ceph_mds_client *mdsc, 2840 struct ceph_mds_request *req) 2841 { 2842 struct ceph_mds_session *session = NULL; 2843 int mds = -1; 2844 int err = 0; 2845 bool random; 2846 2847 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 2848 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 2849 __unregister_request(mdsc, req); 2850 return; 2851 } 2852 2853 if (req->r_timeout && 2854 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 2855 dout("do_request timed out\n"); 2856 err = -ETIMEDOUT; 2857 goto finish; 2858 } 2859 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 2860 dout("do_request forced umount\n"); 2861 err = -EIO; 2862 goto finish; 2863 } 2864 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 2865 if (mdsc->mdsmap_err) { 2866 err = mdsc->mdsmap_err; 2867 dout("do_request mdsmap err %d\n", err); 2868 goto finish; 2869 } 2870 if (mdsc->mdsmap->m_epoch == 0) { 2871 dout("do_request no mdsmap, waiting for map\n"); 2872 list_add(&req->r_wait, &mdsc->waiting_for_map); 2873 return; 2874 } 2875 if (!(mdsc->fsc->mount_options->flags & 2876 CEPH_MOUNT_OPT_MOUNTWAIT) && 2877 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { 2878 err = -EHOSTUNREACH; 2879 goto finish; 2880 } 2881 } 2882 2883 put_request_session(req); 2884 2885 mds = __choose_mds(mdsc, req, &random); 2886 if (mds < 0 || 2887 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 2888 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2889 err = -EJUKEBOX; 2890 goto finish; 2891 } 2892 dout("do_request no mds or not active, waiting for map\n"); 2893 list_add(&req->r_wait, &mdsc->waiting_for_map); 2894 return; 2895 } 2896 2897 /* get, open session */ 2898 session = __ceph_lookup_mds_session(mdsc, mds); 2899 if (!session) { 2900 session = register_session(mdsc, mds); 2901 if (IS_ERR(session)) { 2902 err = PTR_ERR(session); 2903 goto finish; 2904 } 2905 } 2906 req->r_session = ceph_get_mds_session(session); 2907 2908 dout("do_request mds%d session %p state %s\n", mds, session, 2909 ceph_session_state_name(session->s_state)); 2910 if (session->s_state != CEPH_MDS_SESSION_OPEN && 2911 session->s_state != CEPH_MDS_SESSION_HUNG) { 2912 /* 2913 * We cannot queue async requests since the caps and delegated 2914 * inodes are bound to the session. Just return -EJUKEBOX and 2915 * let the caller retry a sync request in that case. 2916 */ 2917 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2918 err = -EJUKEBOX; 2919 goto out_session; 2920 } 2921 2922 /* 2923 * If the session has been REJECTED, then return a hard error, 2924 * unless it's a CLEANRECOVER mount, in which case we'll queue 2925 * it to the mdsc queue. 
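 * (The "mdsc queue" here is mdsc->waiting_for_map, so the request should
 * get another chance once a new mdsmap arrives.)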
2926 */ 2927 if (session->s_state == CEPH_MDS_SESSION_REJECTED) { 2928 if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER)) 2929 list_add(&req->r_wait, &mdsc->waiting_for_map); 2930 else 2931 err = -EACCES; 2932 goto out_session; 2933 } 2934 2935 if (session->s_state == CEPH_MDS_SESSION_NEW || 2936 session->s_state == CEPH_MDS_SESSION_CLOSING) { 2937 err = __open_session(mdsc, session); 2938 if (err) 2939 goto out_session; 2940 /* retry the same mds later */ 2941 if (random) 2942 req->r_resend_mds = mds; 2943 } 2944 list_add(&req->r_wait, &session->s_waiting); 2945 goto out_session; 2946 } 2947 2948 /* send request */ 2949 req->r_resend_mds = -1; /* forget any previous mds hint */ 2950 2951 if (req->r_request_started == 0) /* note request start time */ 2952 req->r_request_started = jiffies; 2953 2954 err = __send_request(session, req, false); 2955 2956 out_session: 2957 ceph_put_mds_session(session); 2958 finish: 2959 if (err) { 2960 dout("__do_request early error %d\n", err); 2961 req->r_err = err; 2962 complete_request(mdsc, req); 2963 __unregister_request(mdsc, req); 2964 } 2965 return; 2966 } 2967 2968 /* 2969 * called under mdsc->mutex 2970 */ 2971 static void __wake_requests(struct ceph_mds_client *mdsc, 2972 struct list_head *head) 2973 { 2974 struct ceph_mds_request *req; 2975 LIST_HEAD(tmp_list); 2976 2977 list_splice_init(head, &tmp_list); 2978 2979 while (!list_empty(&tmp_list)) { 2980 req = list_entry(tmp_list.next, 2981 struct ceph_mds_request, r_wait); 2982 list_del_init(&req->r_wait); 2983 dout(" wake request %p tid %llu\n", req, req->r_tid); 2984 __do_request(mdsc, req); 2985 } 2986 } 2987 2988 /* 2989 * Wake up threads with requests pending for @mds, so that they can 2990 * resubmit their requests to a possibly different mds. 2991 */ 2992 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 2993 { 2994 struct ceph_mds_request *req; 2995 struct rb_node *p = rb_first(&mdsc->request_tree); 2996 2997 dout("kick_requests mds%d\n", mds); 2998 while (p) { 2999 req = rb_entry(p, struct ceph_mds_request, r_node); 3000 p = rb_next(p); 3001 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3002 continue; 3003 if (req->r_attempts > 0) 3004 continue; /* only new requests */ 3005 if (req->r_session && 3006 req->r_session->s_mds == mds) { 3007 dout(" kicking tid %llu\n", req->r_tid); 3008 list_del_init(&req->r_wait); 3009 __do_request(mdsc, req); 3010 } 3011 } 3012 } 3013 3014 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, 3015 struct ceph_mds_request *req) 3016 { 3017 int err = 0; 3018 3019 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ 3020 if (req->r_inode) 3021 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 3022 if (req->r_parent) { 3023 struct ceph_inode_info *ci = ceph_inode(req->r_parent); 3024 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ? 
3025 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD; 3026 spin_lock(&ci->i_ceph_lock); 3027 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false); 3028 __ceph_touch_fmode(ci, mdsc, fmode); 3029 spin_unlock(&ci->i_ceph_lock); 3030 } 3031 if (req->r_old_dentry_dir) 3032 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 3033 CEPH_CAP_PIN); 3034 3035 if (req->r_inode) { 3036 err = ceph_wait_on_async_create(req->r_inode); 3037 if (err) { 3038 dout("%s: wait for async create returned: %d\n", 3039 __func__, err); 3040 return err; 3041 } 3042 } 3043 3044 if (!err && req->r_old_inode) { 3045 err = ceph_wait_on_async_create(req->r_old_inode); 3046 if (err) { 3047 dout("%s: wait for async create returned: %d\n", 3048 __func__, err); 3049 return err; 3050 } 3051 } 3052 3053 dout("submit_request on %p for inode %p\n", req, dir); 3054 mutex_lock(&mdsc->mutex); 3055 __register_request(mdsc, req, dir); 3056 __do_request(mdsc, req); 3057 err = req->r_err; 3058 mutex_unlock(&mdsc->mutex); 3059 return err; 3060 } 3061 3062 static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, 3063 struct ceph_mds_request *req) 3064 { 3065 int err; 3066 3067 /* wait */ 3068 dout("do_request waiting\n"); 3069 if (!req->r_timeout && req->r_wait_for_completion) { 3070 err = req->r_wait_for_completion(mdsc, req); 3071 } else { 3072 long timeleft = wait_for_completion_killable_timeout( 3073 &req->r_completion, 3074 ceph_timeout_jiffies(req->r_timeout)); 3075 if (timeleft > 0) 3076 err = 0; 3077 else if (!timeleft) 3078 err = -ETIMEDOUT; /* timed out */ 3079 else 3080 err = timeleft; /* killed */ 3081 } 3082 dout("do_request waited, got %d\n", err); 3083 mutex_lock(&mdsc->mutex); 3084 3085 /* only abort if we didn't race with a real reply */ 3086 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 3087 err = le32_to_cpu(req->r_reply_info.head->result); 3088 } else if (err < 0) { 3089 dout("aborted request %lld with %d\n", req->r_tid, err); 3090 3091 /* 3092 * ensure we aren't running concurrently with 3093 * ceph_fill_trace or ceph_readdir_prepopulate, which 3094 * rely on locks (dir mutex) held by our caller. 3095 */ 3096 mutex_lock(&req->r_fill_mutex); 3097 req->r_err = err; 3098 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3099 mutex_unlock(&req->r_fill_mutex); 3100 3101 if (req->r_parent && 3102 (req->r_op & CEPH_MDS_OP_WRITE)) 3103 ceph_invalidate_dir_request(req); 3104 } else { 3105 err = req->r_err; 3106 } 3107 3108 mutex_unlock(&mdsc->mutex); 3109 return err; 3110 } 3111 3112 /* 3113 * Synchrously perform an mds request. Take care of all of the 3114 * session setup, forwarding, retry details. 3115 */ 3116 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 3117 struct inode *dir, 3118 struct ceph_mds_request *req) 3119 { 3120 int err; 3121 3122 dout("do_request on %p\n", req); 3123 3124 /* issue */ 3125 err = ceph_mdsc_submit_request(mdsc, dir, req); 3126 if (!err) 3127 err = ceph_mdsc_wait_request(mdsc, req); 3128 dout("do_request %p done, result %d\n", req, err); 3129 return err; 3130 } 3131 3132 /* 3133 * Invalidate dir's completeness, dentry lease state on an aborted MDS 3134 * namespace request. 
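 *
 * After an abort we can no longer tell whether the MDS actually applied
 * the operation, so the directory's cached completeness and any related
 * dentry leases have to be treated as stale and dropped.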
3135 */ 3136 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 3137 { 3138 struct inode *dir = req->r_parent; 3139 struct inode *old_dir = req->r_old_dentry_dir; 3140 3141 dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir); 3142 3143 ceph_dir_clear_complete(dir); 3144 if (old_dir) 3145 ceph_dir_clear_complete(old_dir); 3146 if (req->r_dentry) 3147 ceph_invalidate_dentry_lease(req->r_dentry); 3148 if (req->r_old_dentry) 3149 ceph_invalidate_dentry_lease(req->r_old_dentry); 3150 } 3151 3152 /* 3153 * Handle mds reply. 3154 * 3155 * We take the session mutex and parse and process the reply immediately. 3156 * This preserves the logical ordering of replies, capabilities, etc., sent 3157 * by the MDS as they are applied to our local cache. 3158 */ 3159 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 3160 { 3161 struct ceph_mds_client *mdsc = session->s_mdsc; 3162 struct ceph_mds_request *req; 3163 struct ceph_mds_reply_head *head = msg->front.iov_base; 3164 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 3165 struct ceph_snap_realm *realm; 3166 u64 tid; 3167 int err, result; 3168 int mds = session->s_mds; 3169 3170 if (msg->front.iov_len < sizeof(*head)) { 3171 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 3172 ceph_msg_dump(msg); 3173 return; 3174 } 3175 3176 /* get request, session */ 3177 tid = le64_to_cpu(msg->hdr.tid); 3178 mutex_lock(&mdsc->mutex); 3179 req = lookup_get_request(mdsc, tid); 3180 if (!req) { 3181 dout("handle_reply on unknown tid %llu\n", tid); 3182 mutex_unlock(&mdsc->mutex); 3183 return; 3184 } 3185 dout("handle_reply %p\n", req); 3186 3187 /* correct session? */ 3188 if (req->r_session != session) { 3189 pr_err("mdsc_handle_reply got %llu on session mds%d" 3190 " not mds%d\n", tid, session->s_mds, 3191 req->r_session ? req->r_session->s_mds : -1); 3192 mutex_unlock(&mdsc->mutex); 3193 goto out; 3194 } 3195 3196 /* dup? */ 3197 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 3198 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 3199 pr_warn("got a dup %s reply on %llu from mds%d\n", 3200 head->safe ? 
"safe" : "unsafe", tid, mds); 3201 mutex_unlock(&mdsc->mutex); 3202 goto out; 3203 } 3204 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 3205 pr_warn("got unsafe after safe on %llu from mds%d\n", 3206 tid, mds); 3207 mutex_unlock(&mdsc->mutex); 3208 goto out; 3209 } 3210 3211 result = le32_to_cpu(head->result); 3212 3213 /* 3214 * Handle an ESTALE 3215 * if we're not talking to the authority, send to them 3216 * if the authority has changed while we weren't looking, 3217 * send to new authority 3218 * Otherwise we just have to return an ESTALE 3219 */ 3220 if (result == -ESTALE) { 3221 dout("got ESTALE on request %llu\n", req->r_tid); 3222 req->r_resend_mds = -1; 3223 if (req->r_direct_mode != USE_AUTH_MDS) { 3224 dout("not using auth, setting for that now\n"); 3225 req->r_direct_mode = USE_AUTH_MDS; 3226 __do_request(mdsc, req); 3227 mutex_unlock(&mdsc->mutex); 3228 goto out; 3229 } else { 3230 int mds = __choose_mds(mdsc, req, NULL); 3231 if (mds >= 0 && mds != req->r_session->s_mds) { 3232 dout("but auth changed, so resending\n"); 3233 __do_request(mdsc, req); 3234 mutex_unlock(&mdsc->mutex); 3235 goto out; 3236 } 3237 } 3238 dout("have to return ESTALE on request %llu\n", req->r_tid); 3239 } 3240 3241 3242 if (head->safe) { 3243 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 3244 __unregister_request(mdsc, req); 3245 3246 /* last request during umount? */ 3247 if (mdsc->stopping && !__get_oldest_req(mdsc)) 3248 complete_all(&mdsc->safe_umount_waiters); 3249 3250 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3251 /* 3252 * We already handled the unsafe response, now do the 3253 * cleanup. No need to examine the response; the MDS 3254 * doesn't include any result info in the safe 3255 * response. And even if it did, there is nothing 3256 * useful we could do with a revised return value. 
3257 */ 3258 dout("got safe reply %llu, mds%d\n", tid, mds); 3259 3260 mutex_unlock(&mdsc->mutex); 3261 goto out; 3262 } 3263 } else { 3264 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 3265 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 3266 } 3267 3268 dout("handle_reply tid %lld result %d\n", tid, result); 3269 rinfo = &req->r_reply_info; 3270 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 3271 err = parse_reply_info(session, msg, rinfo, (u64)-1); 3272 else 3273 err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features); 3274 mutex_unlock(&mdsc->mutex); 3275 3276 /* Must find target inode outside of mutexes to avoid deadlocks */ 3277 if ((err >= 0) && rinfo->head->is_target) { 3278 struct inode *in; 3279 struct ceph_vino tvino = { 3280 .ino = le64_to_cpu(rinfo->targeti.in->ino), 3281 .snap = le64_to_cpu(rinfo->targeti.in->snapid) 3282 }; 3283 3284 in = ceph_get_inode(mdsc->fsc->sb, tvino); 3285 if (IS_ERR(in)) { 3286 err = PTR_ERR(in); 3287 mutex_lock(&session->s_mutex); 3288 goto out_err; 3289 } 3290 req->r_target_inode = in; 3291 } 3292 3293 mutex_lock(&session->s_mutex); 3294 if (err < 0) { 3295 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 3296 ceph_msg_dump(msg); 3297 goto out_err; 3298 } 3299 3300 /* snap trace */ 3301 realm = NULL; 3302 if (rinfo->snapblob_len) { 3303 down_write(&mdsc->snap_rwsem); 3304 ceph_update_snap_trace(mdsc, rinfo->snapblob, 3305 rinfo->snapblob + rinfo->snapblob_len, 3306 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 3307 &realm); 3308 downgrade_write(&mdsc->snap_rwsem); 3309 } else { 3310 down_read(&mdsc->snap_rwsem); 3311 } 3312 3313 /* insert trace into our cache */ 3314 mutex_lock(&req->r_fill_mutex); 3315 current->journal_info = req; 3316 err = ceph_fill_trace(mdsc->fsc->sb, req); 3317 if (err == 0) { 3318 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 3319 req->r_op == CEPH_MDS_OP_LSSNAP)) 3320 ceph_readdir_prepopulate(req, req->r_session); 3321 } 3322 current->journal_info = NULL; 3323 mutex_unlock(&req->r_fill_mutex); 3324 3325 up_read(&mdsc->snap_rwsem); 3326 if (realm) 3327 ceph_put_snap_realm(mdsc, realm); 3328 3329 if (err == 0) { 3330 if (req->r_target_inode && 3331 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3332 struct ceph_inode_info *ci = 3333 ceph_inode(req->r_target_inode); 3334 spin_lock(&ci->i_unsafe_lock); 3335 list_add_tail(&req->r_unsafe_target_item, 3336 &ci->i_unsafe_iops); 3337 spin_unlock(&ci->i_unsafe_lock); 3338 } 3339 3340 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 3341 } 3342 out_err: 3343 mutex_lock(&mdsc->mutex); 3344 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3345 if (err) { 3346 req->r_err = err; 3347 } else { 3348 req->r_reply = ceph_msg_get(msg); 3349 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); 3350 } 3351 } else { 3352 dout("reply arrived after request %lld was aborted\n", tid); 3353 } 3354 mutex_unlock(&mdsc->mutex); 3355 3356 mutex_unlock(&session->s_mutex); 3357 3358 /* kick calling process */ 3359 complete_request(mdsc, req); 3360 3361 ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency, 3362 req->r_end_latency, err); 3363 out: 3364 ceph_mdsc_put_request(req); 3365 return; 3366 } 3367 3368 3369 3370 /* 3371 * handle mds notification that our request has been forwarded. 
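 *
 * The message body carries two u32s: the mds the request went to (next_mds)
 * and a forward sequence number (fwd_seq). A forward we have already seen
 * (fwd_seq <= r_num_fwd) is ignored; otherwise the request's attempt count
 * is reset and it is resent, pinned to next_mds via r_resend_mds.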
3372 */ 3373 static void handle_forward(struct ceph_mds_client *mdsc, 3374 struct ceph_mds_session *session, 3375 struct ceph_msg *msg) 3376 { 3377 struct ceph_mds_request *req; 3378 u64 tid = le64_to_cpu(msg->hdr.tid); 3379 u32 next_mds; 3380 u32 fwd_seq; 3381 int err = -EINVAL; 3382 void *p = msg->front.iov_base; 3383 void *end = p + msg->front.iov_len; 3384 3385 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3386 next_mds = ceph_decode_32(&p); 3387 fwd_seq = ceph_decode_32(&p); 3388 3389 mutex_lock(&mdsc->mutex); 3390 req = lookup_get_request(mdsc, tid); 3391 if (!req) { 3392 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 3393 goto out; /* dup reply? */ 3394 } 3395 3396 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3397 dout("forward tid %llu aborted, unregistering\n", tid); 3398 __unregister_request(mdsc, req); 3399 } else if (fwd_seq <= req->r_num_fwd) { 3400 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 3401 tid, next_mds, req->r_num_fwd, fwd_seq); 3402 } else { 3403 /* resend. forward race not possible; mds would drop */ 3404 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 3405 BUG_ON(req->r_err); 3406 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 3407 req->r_attempts = 0; 3408 req->r_num_fwd = fwd_seq; 3409 req->r_resend_mds = next_mds; 3410 put_request_session(req); 3411 __do_request(mdsc, req); 3412 } 3413 ceph_mdsc_put_request(req); 3414 out: 3415 mutex_unlock(&mdsc->mutex); 3416 return; 3417 3418 bad: 3419 pr_err("mdsc_handle_forward decode error err=%d\n", err); 3420 } 3421 3422 static int __decode_session_metadata(void **p, void *end, 3423 bool *blocklisted) 3424 { 3425 /* map<string,string> */ 3426 u32 n; 3427 bool err_str; 3428 ceph_decode_32_safe(p, end, n, bad); 3429 while (n-- > 0) { 3430 u32 len; 3431 ceph_decode_32_safe(p, end, len, bad); 3432 ceph_decode_need(p, end, len, bad); 3433 err_str = !strncmp(*p, "error_string", len); 3434 *p += len; 3435 ceph_decode_32_safe(p, end, len, bad); 3436 ceph_decode_need(p, end, len, bad); 3437 /* 3438 * Match "blocklisted (blacklisted)" from newer MDSes, 3439 * or "blacklisted" from older MDSes. 
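 * "blacklisted" is a substring of both spellings, so the single strnstr()
 * below matches either form.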
3440 */ 3441 if (err_str && strnstr(*p, "blacklisted", len)) 3442 *blocklisted = true; 3443 *p += len; 3444 } 3445 return 0; 3446 bad: 3447 return -1; 3448 } 3449 3450 /* 3451 * handle a mds session control message 3452 */ 3453 static void handle_session(struct ceph_mds_session *session, 3454 struct ceph_msg *msg) 3455 { 3456 struct ceph_mds_client *mdsc = session->s_mdsc; 3457 int mds = session->s_mds; 3458 int msg_version = le16_to_cpu(msg->hdr.version); 3459 void *p = msg->front.iov_base; 3460 void *end = p + msg->front.iov_len; 3461 struct ceph_mds_session_head *h; 3462 u32 op; 3463 u64 seq, features = 0; 3464 int wake = 0; 3465 bool blocklisted = false; 3466 3467 /* decode */ 3468 ceph_decode_need(&p, end, sizeof(*h), bad); 3469 h = p; 3470 p += sizeof(*h); 3471 3472 op = le32_to_cpu(h->op); 3473 seq = le64_to_cpu(h->seq); 3474 3475 if (msg_version >= 3) { 3476 u32 len; 3477 /* version >= 2, metadata */ 3478 if (__decode_session_metadata(&p, end, &blocklisted) < 0) 3479 goto bad; 3480 /* version >= 3, feature bits */ 3481 ceph_decode_32_safe(&p, end, len, bad); 3482 if (len) { 3483 ceph_decode_64_safe(&p, end, features, bad); 3484 p += len - sizeof(features); 3485 } 3486 } 3487 3488 mutex_lock(&mdsc->mutex); 3489 if (op == CEPH_SESSION_CLOSE) { 3490 ceph_get_mds_session(session); 3491 __unregister_session(mdsc, session); 3492 } 3493 /* FIXME: this ttl calculation is generous */ 3494 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 3495 mutex_unlock(&mdsc->mutex); 3496 3497 mutex_lock(&session->s_mutex); 3498 3499 dout("handle_session mds%d %s %p state %s seq %llu\n", 3500 mds, ceph_session_op_name(op), session, 3501 ceph_session_state_name(session->s_state), seq); 3502 3503 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 3504 session->s_state = CEPH_MDS_SESSION_OPEN; 3505 pr_info("mds%d came back\n", session->s_mds); 3506 } 3507 3508 switch (op) { 3509 case CEPH_SESSION_OPEN: 3510 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3511 pr_info("mds%d reconnect success\n", session->s_mds); 3512 session->s_state = CEPH_MDS_SESSION_OPEN; 3513 session->s_features = features; 3514 renewed_caps(mdsc, session, 0); 3515 if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features)) 3516 metric_schedule_delayed(&mdsc->metric); 3517 wake = 1; 3518 if (mdsc->stopping) 3519 __close_session(mdsc, session); 3520 break; 3521 3522 case CEPH_SESSION_RENEWCAPS: 3523 if (session->s_renew_seq == seq) 3524 renewed_caps(mdsc, session, 1); 3525 break; 3526 3527 case CEPH_SESSION_CLOSE: 3528 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3529 pr_info("mds%d reconnect denied\n", session->s_mds); 3530 session->s_state = CEPH_MDS_SESSION_CLOSED; 3531 cleanup_session_requests(mdsc, session); 3532 remove_session_caps(session); 3533 wake = 2; /* for good measure */ 3534 wake_up_all(&mdsc->session_close_wq); 3535 break; 3536 3537 case CEPH_SESSION_STALE: 3538 pr_info("mds%d caps went stale, renewing\n", 3539 session->s_mds); 3540 atomic_inc(&session->s_cap_gen); 3541 session->s_cap_ttl = jiffies - 1; 3542 send_renew_caps(mdsc, session); 3543 break; 3544 3545 case CEPH_SESSION_RECALL_STATE: 3546 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 3547 break; 3548 3549 case CEPH_SESSION_FLUSHMSG: 3550 send_flushmsg_ack(mdsc, session, seq); 3551 break; 3552 3553 case CEPH_SESSION_FORCE_RO: 3554 dout("force_session_readonly %p\n", session); 3555 spin_lock(&session->s_cap_lock); 3556 session->s_readonly = true; 3557 spin_unlock(&session->s_cap_lock); 3558 
wake_up_session_caps(session, FORCE_RO);
3559 break;
3560
3561 case CEPH_SESSION_REJECT:
3562 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
3563 pr_info("mds%d rejected session\n", session->s_mds);
3564 session->s_state = CEPH_MDS_SESSION_REJECTED;
3565 cleanup_session_requests(mdsc, session);
3566 remove_session_caps(session);
3567 if (blocklisted)
3568 mdsc->fsc->blocklisted = true;
3569 wake = 2; /* for good measure */
3570 break;
3571
3572 default:
3573 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
3574 WARN_ON(1);
3575 }
3576
3577 mutex_unlock(&session->s_mutex);
3578 if (wake) {
3579 mutex_lock(&mdsc->mutex);
3580 __wake_requests(mdsc, &session->s_waiting);
3581 if (wake == 2)
3582 kick_requests(mdsc, mds);
3583 mutex_unlock(&mdsc->mutex);
3584 }
3585 if (op == CEPH_SESSION_CLOSE)
3586 ceph_put_mds_session(session);
3587 return;
3588
3589 bad:
3590 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
3591 (int)msg->front.iov_len);
3592 ceph_msg_dump(msg);
3593 return;
3594 }
3595
3596 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
3597 {
3598 int dcaps;
3599
3600 dcaps = xchg(&req->r_dir_caps, 0);
3601 if (dcaps) {
3602 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3603 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
3604 }
3605 }
3606
3607 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
3608 {
3609 int dcaps;
3610
3611 dcaps = xchg(&req->r_dir_caps, 0);
3612 if (dcaps) {
3613 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3614 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
3615 dcaps);
3616 }
3617 }
3618
3619 /*
3620 * called under session->s_mutex.
3621 */
3622 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
3623 struct ceph_mds_session *session)
3624 {
3625 struct ceph_mds_request *req, *nreq;
3626 struct rb_node *p;
3627
3628 dout("replay_unsafe_requests mds%d\n", session->s_mds);
3629
3630 mutex_lock(&mdsc->mutex);
3631 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
3632 __send_request(session, req, true);
3633
3634 /*
3635 * Also re-send old requests when the MDS enters its reconnect stage, so
3636 * that the MDS can process completed requests during the clientreplay stage.
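 * The tree walk below skips requests that already got an unsafe reply
 * (those were replayed from s_unsafe just above), requests that were never
 * sent (r_attempts == 0), and requests bound to other sessions.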
3637 */ 3638 p = rb_first(&mdsc->request_tree); 3639 while (p) { 3640 req = rb_entry(p, struct ceph_mds_request, r_node); 3641 p = rb_next(p); 3642 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3643 continue; 3644 if (req->r_attempts == 0) 3645 continue; /* only old requests */ 3646 if (!req->r_session) 3647 continue; 3648 if (req->r_session->s_mds != session->s_mds) 3649 continue; 3650 3651 ceph_mdsc_release_dir_caps_no_check(req); 3652 3653 __send_request(session, req, true); 3654 } 3655 mutex_unlock(&mdsc->mutex); 3656 } 3657 3658 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state) 3659 { 3660 struct ceph_msg *reply; 3661 struct ceph_pagelist *_pagelist; 3662 struct page *page; 3663 __le32 *addr; 3664 int err = -ENOMEM; 3665 3666 if (!recon_state->allow_multi) 3667 return -ENOSPC; 3668 3669 /* can't handle message that contains both caps and realm */ 3670 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms); 3671 3672 /* pre-allocate new pagelist */ 3673 _pagelist = ceph_pagelist_alloc(GFP_NOFS); 3674 if (!_pagelist) 3675 return -ENOMEM; 3676 3677 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3678 if (!reply) 3679 goto fail_msg; 3680 3681 /* placeholder for nr_caps */ 3682 err = ceph_pagelist_encode_32(_pagelist, 0); 3683 if (err < 0) 3684 goto fail; 3685 3686 if (recon_state->nr_caps) { 3687 /* currently encoding caps */ 3688 err = ceph_pagelist_encode_32(recon_state->pagelist, 0); 3689 if (err) 3690 goto fail; 3691 } else { 3692 /* placeholder for nr_realms (currently encoding relams) */ 3693 err = ceph_pagelist_encode_32(_pagelist, 0); 3694 if (err < 0) 3695 goto fail; 3696 } 3697 3698 err = ceph_pagelist_encode_8(recon_state->pagelist, 1); 3699 if (err) 3700 goto fail; 3701 3702 page = list_first_entry(&recon_state->pagelist->head, struct page, lru); 3703 addr = kmap_atomic(page); 3704 if (recon_state->nr_caps) { 3705 /* currently encoding caps */ 3706 *addr = cpu_to_le32(recon_state->nr_caps); 3707 } else { 3708 /* currently encoding relams */ 3709 *(addr + 1) = cpu_to_le32(recon_state->nr_realms); 3710 } 3711 kunmap_atomic(addr); 3712 3713 reply->hdr.version = cpu_to_le16(5); 3714 reply->hdr.compat_version = cpu_to_le16(4); 3715 3716 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length); 3717 ceph_msg_data_add_pagelist(reply, recon_state->pagelist); 3718 3719 ceph_con_send(&recon_state->session->s_con, reply); 3720 ceph_pagelist_release(recon_state->pagelist); 3721 3722 recon_state->pagelist = _pagelist; 3723 recon_state->nr_caps = 0; 3724 recon_state->nr_realms = 0; 3725 recon_state->msg_version = 5; 3726 return 0; 3727 fail: 3728 ceph_msg_put(reply); 3729 fail_msg: 3730 ceph_pagelist_release(_pagelist); 3731 return err; 3732 } 3733 3734 static struct dentry* d_find_primary(struct inode *inode) 3735 { 3736 struct dentry *alias, *dn = NULL; 3737 3738 if (hlist_empty(&inode->i_dentry)) 3739 return NULL; 3740 3741 spin_lock(&inode->i_lock); 3742 if (hlist_empty(&inode->i_dentry)) 3743 goto out_unlock; 3744 3745 if (S_ISDIR(inode->i_mode)) { 3746 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias); 3747 if (!IS_ROOT(alias)) 3748 dn = dget(alias); 3749 goto out_unlock; 3750 } 3751 3752 hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) { 3753 spin_lock(&alias->d_lock); 3754 if (!d_unhashed(alias) && 3755 (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) { 3756 dn = dget_dlock(alias); 3757 } 3758 spin_unlock(&alias->d_lock); 3759 if (dn) 3760 break; 3761 } 3762 out_unlock: 3763 
spin_unlock(&inode->i_lock); 3764 return dn; 3765 } 3766 3767 /* 3768 * Encode information about a cap for a reconnect with the MDS. 3769 */ 3770 static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap, 3771 void *arg) 3772 { 3773 union { 3774 struct ceph_mds_cap_reconnect v2; 3775 struct ceph_mds_cap_reconnect_v1 v1; 3776 } rec; 3777 struct ceph_inode_info *ci = cap->ci; 3778 struct ceph_reconnect_state *recon_state = arg; 3779 struct ceph_pagelist *pagelist = recon_state->pagelist; 3780 struct dentry *dentry; 3781 char *path; 3782 int pathlen, err; 3783 u64 pathbase; 3784 u64 snap_follows; 3785 3786 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 3787 inode, ceph_vinop(inode), cap, cap->cap_id, 3788 ceph_cap_string(cap->issued)); 3789 3790 dentry = d_find_primary(inode); 3791 if (dentry) { 3792 /* set pathbase to parent dir when msg_version >= 2 */ 3793 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 3794 recon_state->msg_version >= 2); 3795 dput(dentry); 3796 if (IS_ERR(path)) { 3797 err = PTR_ERR(path); 3798 goto out_err; 3799 } 3800 } else { 3801 path = NULL; 3802 pathlen = 0; 3803 pathbase = 0; 3804 } 3805 3806 spin_lock(&ci->i_ceph_lock); 3807 cap->seq = 0; /* reset cap seq */ 3808 cap->issue_seq = 0; /* and issue_seq */ 3809 cap->mseq = 0; /* and migrate_seq */ 3810 cap->cap_gen = atomic_read(&cap->session->s_cap_gen); 3811 3812 /* These are lost when the session goes away */ 3813 if (S_ISDIR(inode->i_mode)) { 3814 if (cap->issued & CEPH_CAP_DIR_CREATE) { 3815 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); 3816 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); 3817 } 3818 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; 3819 } 3820 3821 if (recon_state->msg_version >= 2) { 3822 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 3823 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3824 rec.v2.issued = cpu_to_le32(cap->issued); 3825 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3826 rec.v2.pathbase = cpu_to_le64(pathbase); 3827 rec.v2.flock_len = (__force __le32) 3828 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); 3829 } else { 3830 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 3831 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3832 rec.v1.issued = cpu_to_le32(cap->issued); 3833 rec.v1.size = cpu_to_le64(i_size_read(inode)); 3834 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime); 3835 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime); 3836 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3837 rec.v1.pathbase = cpu_to_le64(pathbase); 3838 } 3839 3840 if (list_empty(&ci->i_cap_snaps)) { 3841 snap_follows = ci->i_head_snapc ? 
ci->i_head_snapc->seq : 0; 3842 } else { 3843 struct ceph_cap_snap *capsnap = 3844 list_first_entry(&ci->i_cap_snaps, 3845 struct ceph_cap_snap, ci_item); 3846 snap_follows = capsnap->follows; 3847 } 3848 spin_unlock(&ci->i_ceph_lock); 3849 3850 if (recon_state->msg_version >= 2) { 3851 int num_fcntl_locks, num_flock_locks; 3852 struct ceph_filelock *flocks = NULL; 3853 size_t struct_len, total_len = sizeof(u64); 3854 u8 struct_v = 0; 3855 3856 encode_again: 3857 if (rec.v2.flock_len) { 3858 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 3859 } else { 3860 num_fcntl_locks = 0; 3861 num_flock_locks = 0; 3862 } 3863 if (num_fcntl_locks + num_flock_locks > 0) { 3864 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks, 3865 sizeof(struct ceph_filelock), 3866 GFP_NOFS); 3867 if (!flocks) { 3868 err = -ENOMEM; 3869 goto out_err; 3870 } 3871 err = ceph_encode_locks_to_buffer(inode, flocks, 3872 num_fcntl_locks, 3873 num_flock_locks); 3874 if (err) { 3875 kfree(flocks); 3876 flocks = NULL; 3877 if (err == -ENOSPC) 3878 goto encode_again; 3879 goto out_err; 3880 } 3881 } else { 3882 kfree(flocks); 3883 flocks = NULL; 3884 } 3885 3886 if (recon_state->msg_version >= 3) { 3887 /* version, compat_version and struct_len */ 3888 total_len += 2 * sizeof(u8) + sizeof(u32); 3889 struct_v = 2; 3890 } 3891 /* 3892 * number of encoded locks is stable, so copy to pagelist 3893 */ 3894 struct_len = 2 * sizeof(u32) + 3895 (num_fcntl_locks + num_flock_locks) * 3896 sizeof(struct ceph_filelock); 3897 rec.v2.flock_len = cpu_to_le32(struct_len); 3898 3899 struct_len += sizeof(u32) + pathlen + sizeof(rec.v2); 3900 3901 if (struct_v >= 2) 3902 struct_len += sizeof(u64); /* snap_follows */ 3903 3904 total_len += struct_len; 3905 3906 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 3907 err = send_reconnect_partial(recon_state); 3908 if (err) 3909 goto out_freeflocks; 3910 pagelist = recon_state->pagelist; 3911 } 3912 3913 err = ceph_pagelist_reserve(pagelist, total_len); 3914 if (err) 3915 goto out_freeflocks; 3916 3917 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3918 if (recon_state->msg_version >= 3) { 3919 ceph_pagelist_encode_8(pagelist, struct_v); 3920 ceph_pagelist_encode_8(pagelist, 1); 3921 ceph_pagelist_encode_32(pagelist, struct_len); 3922 } 3923 ceph_pagelist_encode_string(pagelist, path, pathlen); 3924 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 3925 ceph_locks_to_pagelist(flocks, pagelist, 3926 num_fcntl_locks, num_flock_locks); 3927 if (struct_v >= 2) 3928 ceph_pagelist_encode_64(pagelist, snap_follows); 3929 out_freeflocks: 3930 kfree(flocks); 3931 } else { 3932 err = ceph_pagelist_reserve(pagelist, 3933 sizeof(u64) + sizeof(u32) + 3934 pathlen + sizeof(rec.v1)); 3935 if (err) 3936 goto out_err; 3937 3938 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3939 ceph_pagelist_encode_string(pagelist, path, pathlen); 3940 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 3941 } 3942 3943 out_err: 3944 ceph_mdsc_free_path(path, pathlen); 3945 if (!err) 3946 recon_state->nr_caps++; 3947 return err; 3948 } 3949 3950 static int encode_snap_realms(struct ceph_mds_client *mdsc, 3951 struct ceph_reconnect_state *recon_state) 3952 { 3953 struct rb_node *p; 3954 struct ceph_pagelist *pagelist = recon_state->pagelist; 3955 int err = 0; 3956 3957 if (recon_state->msg_version >= 4) { 3958 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); 3959 if (err < 0) 3960 goto fail; 3961 } 3962 3963 /* 3964 * snaprealms. 
we provide mds with the ino, seq (version), and 3965 * parent for all of our realms. If the mds has any newer info, 3966 * it will tell us. 3967 */ 3968 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 3969 struct ceph_snap_realm *realm = 3970 rb_entry(p, struct ceph_snap_realm, node); 3971 struct ceph_mds_snaprealm_reconnect sr_rec; 3972 3973 if (recon_state->msg_version >= 4) { 3974 size_t need = sizeof(u8) * 2 + sizeof(u32) + 3975 sizeof(sr_rec); 3976 3977 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 3978 err = send_reconnect_partial(recon_state); 3979 if (err) 3980 goto fail; 3981 pagelist = recon_state->pagelist; 3982 } 3983 3984 err = ceph_pagelist_reserve(pagelist, need); 3985 if (err) 3986 goto fail; 3987 3988 ceph_pagelist_encode_8(pagelist, 1); 3989 ceph_pagelist_encode_8(pagelist, 1); 3990 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 3991 } 3992 3993 dout(" adding snap realm %llx seq %lld parent %llx\n", 3994 realm->ino, realm->seq, realm->parent_ino); 3995 sr_rec.ino = cpu_to_le64(realm->ino); 3996 sr_rec.seq = cpu_to_le64(realm->seq); 3997 sr_rec.parent = cpu_to_le64(realm->parent_ino); 3998 3999 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 4000 if (err) 4001 goto fail; 4002 4003 recon_state->nr_realms++; 4004 } 4005 fail: 4006 return err; 4007 } 4008 4009 4010 /* 4011 * If an MDS fails and recovers, clients need to reconnect in order to 4012 * reestablish shared state. This includes all caps issued through 4013 * this session _and_ the snap_realm hierarchy. Because it's not 4014 * clear which snap realms the mds cares about, we send everything we 4015 * know about.. that ensures we'll then get any new info the 4016 * recovering MDS might have. 4017 * 4018 * This is a relatively heavyweight operation, but it's rare. 4019 */ 4020 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 4021 struct ceph_mds_session *session) 4022 { 4023 struct ceph_msg *reply; 4024 int mds = session->s_mds; 4025 int err = -ENOMEM; 4026 struct ceph_reconnect_state recon_state = { 4027 .session = session, 4028 }; 4029 LIST_HEAD(dispose); 4030 4031 pr_info("mds%d reconnect start\n", mds); 4032 4033 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 4034 if (!recon_state.pagelist) 4035 goto fail_nopagelist; 4036 4037 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 4038 if (!reply) 4039 goto fail_nomsg; 4040 4041 xa_destroy(&session->s_delegated_inos); 4042 4043 mutex_lock(&session->s_mutex); 4044 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 4045 session->s_seq = 0; 4046 4047 dout("session %p state %s\n", session, 4048 ceph_session_state_name(session->s_state)); 4049 4050 atomic_inc(&session->s_cap_gen); 4051 4052 spin_lock(&session->s_cap_lock); 4053 /* don't know if session is readonly */ 4054 session->s_readonly = 0; 4055 /* 4056 * notify __ceph_remove_cap() that we are composing cap reconnect. 4057 * If a cap get released before being added to the cap reconnect, 4058 * __ceph_remove_cap() should skip queuing cap release. 
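 * s_cap_reconnect is cleared again once ceph_iterate_session_caps()
 * below has walked all of this session's caps.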
4059 */ 4060 session->s_cap_reconnect = 1; 4061 /* drop old cap expires; we're about to reestablish that state */ 4062 detach_cap_releases(session, &dispose); 4063 spin_unlock(&session->s_cap_lock); 4064 dispose_cap_releases(mdsc, &dispose); 4065 4066 /* trim unused caps to reduce MDS's cache rejoin time */ 4067 if (mdsc->fsc->sb->s_root) 4068 shrink_dcache_parent(mdsc->fsc->sb->s_root); 4069 4070 ceph_con_close(&session->s_con); 4071 ceph_con_open(&session->s_con, 4072 CEPH_ENTITY_TYPE_MDS, mds, 4073 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 4074 4075 /* replay unsafe requests */ 4076 replay_unsafe_requests(mdsc, session); 4077 4078 ceph_early_kick_flushing_caps(mdsc, session); 4079 4080 down_read(&mdsc->snap_rwsem); 4081 4082 /* placeholder for nr_caps */ 4083 err = ceph_pagelist_encode_32(recon_state.pagelist, 0); 4084 if (err) 4085 goto fail; 4086 4087 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) { 4088 recon_state.msg_version = 3; 4089 recon_state.allow_multi = true; 4090 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) { 4091 recon_state.msg_version = 3; 4092 } else { 4093 recon_state.msg_version = 2; 4094 } 4095 /* traverse this session's caps */ 4096 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state); 4097 4098 spin_lock(&session->s_cap_lock); 4099 session->s_cap_reconnect = 0; 4100 spin_unlock(&session->s_cap_lock); 4101 4102 if (err < 0) 4103 goto fail; 4104 4105 /* check if all realms can be encoded into current message */ 4106 if (mdsc->num_snap_realms) { 4107 size_t total_len = 4108 recon_state.pagelist->length + 4109 mdsc->num_snap_realms * 4110 sizeof(struct ceph_mds_snaprealm_reconnect); 4111 if (recon_state.msg_version >= 4) { 4112 /* number of realms */ 4113 total_len += sizeof(u32); 4114 /* version, compat_version and struct_len */ 4115 total_len += mdsc->num_snap_realms * 4116 (2 * sizeof(u8) + sizeof(u32)); 4117 } 4118 if (total_len > RECONNECT_MAX_SIZE) { 4119 if (!recon_state.allow_multi) { 4120 err = -ENOSPC; 4121 goto fail; 4122 } 4123 if (recon_state.nr_caps) { 4124 err = send_reconnect_partial(&recon_state); 4125 if (err) 4126 goto fail; 4127 } 4128 recon_state.msg_version = 5; 4129 } 4130 } 4131 4132 err = encode_snap_realms(mdsc, &recon_state); 4133 if (err < 0) 4134 goto fail; 4135 4136 if (recon_state.msg_version >= 5) { 4137 err = ceph_pagelist_encode_8(recon_state.pagelist, 0); 4138 if (err < 0) 4139 goto fail; 4140 } 4141 4142 if (recon_state.nr_caps || recon_state.nr_realms) { 4143 struct page *page = 4144 list_first_entry(&recon_state.pagelist->head, 4145 struct page, lru); 4146 __le32 *addr = kmap_atomic(page); 4147 if (recon_state.nr_caps) { 4148 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms); 4149 *addr = cpu_to_le32(recon_state.nr_caps); 4150 } else if (recon_state.msg_version >= 4) { 4151 *(addr + 1) = cpu_to_le32(recon_state.nr_realms); 4152 } 4153 kunmap_atomic(addr); 4154 } 4155 4156 reply->hdr.version = cpu_to_le16(recon_state.msg_version); 4157 if (recon_state.msg_version >= 4) 4158 reply->hdr.compat_version = cpu_to_le16(4); 4159 4160 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length); 4161 ceph_msg_data_add_pagelist(reply, recon_state.pagelist); 4162 4163 ceph_con_send(&session->s_con, reply); 4164 4165 mutex_unlock(&session->s_mutex); 4166 4167 mutex_lock(&mdsc->mutex); 4168 __wake_requests(mdsc, &session->s_waiting); 4169 mutex_unlock(&mdsc->mutex); 4170 4171 up_read(&mdsc->snap_rwsem); 4172 ceph_pagelist_release(recon_state.pagelist); 4173 return; 4174 4175
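/*
 * Error unwind: drop the unsent reply, release snap_rwsem and the
 * session mutex, then free the pagelist and log which mds the
 * reconnect was being prepared for.
 */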
fail: 4176 ceph_msg_put(reply); 4177 up_read(&mdsc->snap_rwsem); 4178 mutex_unlock(&session->s_mutex); 4179 fail_nomsg: 4180 ceph_pagelist_release(recon_state.pagelist); 4181 fail_nopagelist: 4182 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 4183 return; 4184 } 4185 4186 4187 /* 4188 * compare old and new mdsmaps, kicking requests 4189 * and closing out old connections as necessary 4190 * 4191 * called under mdsc->mutex. 4192 */ 4193 static void check_new_map(struct ceph_mds_client *mdsc, 4194 struct ceph_mdsmap *newmap, 4195 struct ceph_mdsmap *oldmap) 4196 { 4197 int i, j, err; 4198 int oldstate, newstate; 4199 struct ceph_mds_session *s; 4200 unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0}; 4201 4202 dout("check_new_map new %u old %u\n", 4203 newmap->m_epoch, oldmap->m_epoch); 4204 4205 if (newmap->m_info) { 4206 for (i = 0; i < newmap->possible_max_rank; i++) { 4207 for (j = 0; j < newmap->m_info[i].num_export_targets; j++) 4208 set_bit(newmap->m_info[i].export_targets[j], targets); 4209 } 4210 } 4211 4212 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4213 if (!mdsc->sessions[i]) 4214 continue; 4215 s = mdsc->sessions[i]; 4216 oldstate = ceph_mdsmap_get_state(oldmap, i); 4217 newstate = ceph_mdsmap_get_state(newmap, i); 4218 4219 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 4220 i, ceph_mds_state_name(oldstate), 4221 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 4222 ceph_mds_state_name(newstate), 4223 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 4224 ceph_session_state_name(s->s_state)); 4225 4226 if (i >= newmap->possible_max_rank) { 4227 /* force close session for stopped mds */ 4228 ceph_get_mds_session(s); 4229 __unregister_session(mdsc, s); 4230 __wake_requests(mdsc, &s->s_waiting); 4231 mutex_unlock(&mdsc->mutex); 4232 4233 mutex_lock(&s->s_mutex); 4234 cleanup_session_requests(mdsc, s); 4235 remove_session_caps(s); 4236 mutex_unlock(&s->s_mutex); 4237 4238 ceph_put_mds_session(s); 4239 4240 mutex_lock(&mdsc->mutex); 4241 kick_requests(mdsc, i); 4242 continue; 4243 } 4244 4245 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 4246 ceph_mdsmap_get_addr(newmap, i), 4247 sizeof(struct ceph_entity_addr))) { 4248 /* just close it */ 4249 mutex_unlock(&mdsc->mutex); 4250 mutex_lock(&s->s_mutex); 4251 mutex_lock(&mdsc->mutex); 4252 ceph_con_close(&s->s_con); 4253 mutex_unlock(&s->s_mutex); 4254 s->s_state = CEPH_MDS_SESSION_RESTARTING; 4255 } else if (oldstate == newstate) { 4256 continue; /* nothing new with this mds */ 4257 } 4258 4259 /* 4260 * send reconnect? 4261 */ 4262 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 4263 newstate >= CEPH_MDS_STATE_RECONNECT) { 4264 mutex_unlock(&mdsc->mutex); 4265 clear_bit(i, targets); 4266 send_mds_reconnect(mdsc, s); 4267 mutex_lock(&mdsc->mutex); 4268 } 4269 4270 /* 4271 * kick request on any mds that has gone active. 4272 */ 4273 if (oldstate < CEPH_MDS_STATE_ACTIVE && 4274 newstate >= CEPH_MDS_STATE_ACTIVE) { 4275 if (oldstate != CEPH_MDS_STATE_CREATING && 4276 oldstate != CEPH_MDS_STATE_STARTING) 4277 pr_info("mds%d recovery completed\n", s->s_mds); 4278 kick_requests(mdsc, i); 4279 mutex_unlock(&mdsc->mutex); 4280 mutex_lock(&s->s_mutex); 4281 mutex_lock(&mdsc->mutex); 4282 ceph_kick_flushing_caps(mdsc, s); 4283 mutex_unlock(&s->s_mutex); 4284 wake_up_session_caps(s, RECONNECT); 4285 } 4286 } 4287 4288 /* 4289 * Only open and reconnect sessions that don't exist yet. 
4290 */ 4291 for (i = 0; i < newmap->possible_max_rank; i++) { 4292 /* 4293 * If the importing MDS crashes right after it flushes 4294 * the EImportStart journal event, the standby MDS that 4295 * takes over will, while replaying that event, wait 4296 * for the client to reconnect -- but the client may 4297 * never have registered or opened a session with it 4298 * at all. 4299 * 4300 * So try to reconnect to that MDS daemon if its rank 4301 * number is in the export-targets array and it is in 4302 * the up:reconnect state. 4303 */ 4304 newstate = ceph_mdsmap_get_state(newmap, i); 4305 if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT) 4306 continue; 4307 4308 /* 4309 * In rare cases the session may already have been 4310 * registered and opened by requests that picked a 4311 * random MDS during the mdsc->mutex unlock/lock gap 4312 * below. That MDS daemon will simply queue those 4313 * requests and keep waiting for the client's reconnect 4314 * request while in the up:reconnect state. 4315 */ 4316 s = __ceph_lookup_mds_session(mdsc, i); 4317 if (likely(!s)) { 4318 s = __open_export_target_session(mdsc, i); 4319 if (IS_ERR(s)) { 4320 err = PTR_ERR(s); 4321 pr_err("failed to open export target session, err %d\n", 4322 err); 4323 continue; 4324 } 4325 } 4326 dout("send reconnect to export target mds.%d\n", i); 4327 mutex_unlock(&mdsc->mutex); 4328 send_mds_reconnect(mdsc, s); 4329 ceph_put_mds_session(s); 4330 mutex_lock(&mdsc->mutex); 4331 } 4332 4333 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4334 s = mdsc->sessions[i]; 4335 if (!s) 4336 continue; 4337 if (!ceph_mdsmap_is_laggy(newmap, i)) 4338 continue; 4339 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4340 s->s_state == CEPH_MDS_SESSION_HUNG || 4341 s->s_state == CEPH_MDS_SESSION_CLOSING) { 4342 dout(" connecting to export targets of laggy mds%d\n", 4343 i); 4344 __open_export_target_sessions(mdsc, s); 4345 } 4346 } 4347 } 4348 4349 4350 4351 /* 4352 * leases 4353 */ 4354 4355 /* 4356 * caller must hold session s_mutex, dentry->d_lock 4357 */ 4358 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 4359 { 4360 struct ceph_dentry_info *di = ceph_dentry(dentry); 4361 4362 ceph_put_mds_session(di->lease_session); 4363 di->lease_session = NULL; 4364 } 4365 4366 static void handle_lease(struct ceph_mds_client *mdsc, 4367 struct ceph_mds_session *session, 4368 struct ceph_msg *msg) 4369 { 4370 struct super_block *sb = mdsc->fsc->sb; 4371 struct inode *inode; 4372 struct dentry *parent, *dentry; 4373 struct ceph_dentry_info *di; 4374 int mds = session->s_mds; 4375 struct ceph_mds_lease *h = msg->front.iov_base; 4376 u32 seq; 4377 struct ceph_vino vino; 4378 struct qstr dname; 4379 int release = 0; 4380 4381 dout("handle_lease from mds%d\n", mds); 4382 4383 /* decode */ 4384 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 4385 goto bad; 4386 vino.ino = le64_to_cpu(h->ino); 4387 vino.snap = CEPH_NOSNAP; 4388 seq = le32_to_cpu(h->seq); 4389 dname.len = get_unaligned_le32(h + 1); 4390 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 4391 goto bad; 4392 dname.name = (void *)(h + 1) + sizeof(u32); 4393 4394 /* lookup inode */ 4395 inode = ceph_find_inode(sb, vino); 4396 dout("handle_lease %s, ino %llx %p %.*s\n", 4397 ceph_lease_op_name(h->action), vino.ino, inode, 4398 dname.len, dname.name); 4399 4400 mutex_lock(&session->s_mutex); 4401 inc_session_sequence(session); 4402 4403 if (!inode) { 4404 dout("handle_lease no inode %llx\n", vino.ino); 4405 goto release; 4406 } 4407
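/*
 * The lease names a child of this (directory) inode: grab a dcache
 * alias of the directory, hash the child's name against it and see
 * whether we have that dentry cached.
 */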
4408 /* dentry */ 4409 parent = d_find_alias(inode); 4410 if (!parent) { 4411 dout("no parent dentry on inode %p\n", inode); 4412 WARN_ON(1); 4413 goto release; /* hrm... */ 4414 } 4415 dname.hash = full_name_hash(parent, dname.name, dname.len); 4416 dentry = d_lookup(parent, &dname); 4417 dput(parent); 4418 if (!dentry) 4419 goto release; 4420 4421 spin_lock(&dentry->d_lock); 4422 di = ceph_dentry(dentry); 4423 switch (h->action) { 4424 case CEPH_MDS_LEASE_REVOKE: 4425 if (di->lease_session == session) { 4426 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 4427 h->seq = cpu_to_le32(di->lease_seq); 4428 __ceph_mdsc_drop_dentry_lease(dentry); 4429 } 4430 release = 1; 4431 break; 4432 4433 case CEPH_MDS_LEASE_RENEW: 4434 if (di->lease_session == session && 4435 di->lease_gen == atomic_read(&session->s_cap_gen) && 4436 di->lease_renew_from && 4437 di->lease_renew_after == 0) { 4438 unsigned long duration = 4439 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 4440 4441 di->lease_seq = seq; 4442 di->time = di->lease_renew_from + duration; 4443 di->lease_renew_after = di->lease_renew_from + 4444 (duration >> 1); 4445 di->lease_renew_from = 0; 4446 } 4447 break; 4448 } 4449 spin_unlock(&dentry->d_lock); 4450 dput(dentry); 4451 4452 if (!release) 4453 goto out; 4454 4455 release: 4456 /* let's just reuse the same message */ 4457 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 4458 ceph_msg_get(msg); 4459 ceph_con_send(&session->s_con, msg); 4460 4461 out: 4462 mutex_unlock(&session->s_mutex); 4463 iput(inode); 4464 return; 4465 4466 bad: 4467 pr_err("corrupt lease message\n"); 4468 ceph_msg_dump(msg); 4469 } 4470 4471 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 4472 struct dentry *dentry, char action, 4473 u32 seq) 4474 { 4475 struct ceph_msg *msg; 4476 struct ceph_mds_lease *lease; 4477 struct inode *dir; 4478 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; 4479 4480 dout("lease_send_msg identry %p %s to mds%d\n", 4481 dentry, ceph_lease_op_name(action), session->s_mds); 4482 4483 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 4484 if (!msg) 4485 return; 4486 lease = msg->front.iov_base; 4487 lease->action = action; 4488 lease->seq = cpu_to_le32(seq); 4489 4490 spin_lock(&dentry->d_lock); 4491 dir = d_inode(dentry->d_parent); 4492 lease->ino = cpu_to_le64(ceph_ino(dir)); 4493 lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); 4494 4495 put_unaligned_le32(dentry->d_name.len, lease + 1); 4496 memcpy((void *)(lease + 1) + 4, 4497 dentry->d_name.name, dentry->d_name.len); 4498 spin_unlock(&dentry->d_lock); 4499 /* 4500 * if this is a preemptive lease RELEASE, no need to 4501 * flush request stream, since the actual request will 4502 * soon follow. 
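 * (the more_to_follow flag set below presumably just hints to the
 * messenger that another message is imminent, so this one need not
 * be pushed out on its own)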
4503 */ 4504 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE); 4505 4506 ceph_con_send(&session->s_con, msg); 4507 } 4508 4509 /* 4510 * lock unlock the session, to wait ongoing session activities 4511 */ 4512 static void lock_unlock_session(struct ceph_mds_session *s) 4513 { 4514 mutex_lock(&s->s_mutex); 4515 mutex_unlock(&s->s_mutex); 4516 } 4517 4518 static void maybe_recover_session(struct ceph_mds_client *mdsc) 4519 { 4520 struct ceph_fs_client *fsc = mdsc->fsc; 4521 4522 if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) 4523 return; 4524 4525 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED) 4526 return; 4527 4528 if (!READ_ONCE(fsc->blocklisted)) 4529 return; 4530 4531 pr_info("auto reconnect after blocklisted\n"); 4532 ceph_force_reconnect(fsc->sb); 4533 } 4534 4535 bool check_session_state(struct ceph_mds_session *s) 4536 { 4537 struct ceph_fs_client *fsc = s->s_mdsc->fsc; 4538 4539 switch (s->s_state) { 4540 case CEPH_MDS_SESSION_OPEN: 4541 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 4542 s->s_state = CEPH_MDS_SESSION_HUNG; 4543 pr_info("mds%d hung\n", s->s_mds); 4544 } 4545 break; 4546 case CEPH_MDS_SESSION_CLOSING: 4547 /* Should never reach this when not force unmounting */ 4548 WARN_ON_ONCE(s->s_ttl && 4549 READ_ONCE(fsc->mount_state) != CEPH_MOUNT_SHUTDOWN); 4550 fallthrough; 4551 case CEPH_MDS_SESSION_NEW: 4552 case CEPH_MDS_SESSION_RESTARTING: 4553 case CEPH_MDS_SESSION_CLOSED: 4554 case CEPH_MDS_SESSION_REJECTED: 4555 return false; 4556 } 4557 4558 return true; 4559 } 4560 4561 /* 4562 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply, 4563 * then we need to retransmit that request. 4564 */ 4565 void inc_session_sequence(struct ceph_mds_session *s) 4566 { 4567 lockdep_assert_held(&s->s_mutex); 4568 4569 s->s_seq++; 4570 4571 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 4572 int ret; 4573 4574 dout("resending session close request for mds%d\n", s->s_mds); 4575 ret = request_close_session(s); 4576 if (ret < 0) 4577 pr_err("unable to close session to mds%d: %d\n", 4578 s->s_mds, ret); 4579 } 4580 } 4581 4582 /* 4583 * delayed work -- periodically trim expired leases, renew caps with mds. If 4584 * the @delay parameter is set to 0 or if it's more than 5 secs, the default 4585 * workqueue delay value of 5 secs will be used. 
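 * e.g. schedule_delayed(mdsc, 0) and schedule_delayed(mdsc, 60 * HZ)
 * both end up re-arming the work roughly 5 seconds out (subject to
 * round_jiffies_relative()).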
4586 */ 4587 static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay) 4588 { 4589 unsigned long max_delay = HZ * 5; 4590 4591 /* 5 secs default delay */ 4592 if (!delay || (delay > max_delay)) 4593 delay = max_delay; 4594 schedule_delayed_work(&mdsc->delayed_work, 4595 round_jiffies_relative(delay)); 4596 } 4597 4598 static void delayed_work(struct work_struct *work) 4599 { 4600 struct ceph_mds_client *mdsc = 4601 container_of(work, struct ceph_mds_client, delayed_work.work); 4602 unsigned long delay; 4603 int renew_interval; 4604 int renew_caps; 4605 int i; 4606 4607 dout("mdsc delayed_work\n"); 4608 4609 if (mdsc->stopping) 4610 return; 4611 4612 mutex_lock(&mdsc->mutex); 4613 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 4614 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 4615 mdsc->last_renew_caps); 4616 if (renew_caps) 4617 mdsc->last_renew_caps = jiffies; 4618 4619 for (i = 0; i < mdsc->max_sessions; i++) { 4620 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4621 if (!s) 4622 continue; 4623 4624 if (!check_session_state(s)) { 4625 ceph_put_mds_session(s); 4626 continue; 4627 } 4628 mutex_unlock(&mdsc->mutex); 4629 4630 mutex_lock(&s->s_mutex); 4631 if (renew_caps) 4632 send_renew_caps(mdsc, s); 4633 else 4634 ceph_con_keepalive(&s->s_con); 4635 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4636 s->s_state == CEPH_MDS_SESSION_HUNG) 4637 ceph_send_cap_releases(mdsc, s); 4638 mutex_unlock(&s->s_mutex); 4639 ceph_put_mds_session(s); 4640 4641 mutex_lock(&mdsc->mutex); 4642 } 4643 mutex_unlock(&mdsc->mutex); 4644 4645 delay = ceph_check_delayed_caps(mdsc); 4646 4647 ceph_queue_cap_reclaim_work(mdsc); 4648 4649 ceph_trim_snapid_map(mdsc); 4650 4651 maybe_recover_session(mdsc); 4652 4653 schedule_delayed(mdsc, delay); 4654 } 4655 4656 int ceph_mdsc_init(struct ceph_fs_client *fsc) 4657 4658 { 4659 struct ceph_mds_client *mdsc; 4660 int err; 4661 4662 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 4663 if (!mdsc) 4664 return -ENOMEM; 4665 mdsc->fsc = fsc; 4666 mutex_init(&mdsc->mutex); 4667 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 4668 if (!mdsc->mdsmap) { 4669 err = -ENOMEM; 4670 goto err_mdsc; 4671 } 4672 4673 init_completion(&mdsc->safe_umount_waiters); 4674 init_waitqueue_head(&mdsc->session_close_wq); 4675 INIT_LIST_HEAD(&mdsc->waiting_for_map); 4676 mdsc->quotarealms_inodes = RB_ROOT; 4677 mutex_init(&mdsc->quotarealms_inodes_mutex); 4678 init_rwsem(&mdsc->snap_rwsem); 4679 mdsc->snap_realms = RB_ROOT; 4680 INIT_LIST_HEAD(&mdsc->snap_empty); 4681 spin_lock_init(&mdsc->snap_empty_lock); 4682 mdsc->request_tree = RB_ROOT; 4683 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 4684 mdsc->last_renew_caps = jiffies; 4685 INIT_LIST_HEAD(&mdsc->cap_delay_list); 4686 INIT_LIST_HEAD(&mdsc->cap_wait_list); 4687 spin_lock_init(&mdsc->cap_delay_lock); 4688 INIT_LIST_HEAD(&mdsc->snap_flush_list); 4689 spin_lock_init(&mdsc->snap_flush_lock); 4690 mdsc->last_cap_flush_tid = 1; 4691 INIT_LIST_HEAD(&mdsc->cap_flush_list); 4692 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 4693 spin_lock_init(&mdsc->cap_dirty_lock); 4694 init_waitqueue_head(&mdsc->cap_flushing_wq); 4695 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); 4696 err = ceph_metric_init(&mdsc->metric); 4697 if (err) 4698 goto err_mdsmap; 4699 4700 spin_lock_init(&mdsc->dentry_list_lock); 4701 INIT_LIST_HEAD(&mdsc->dentry_leases); 4702 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 4703 4704 ceph_caps_init(mdsc); 4705 ceph_adjust_caps_max_min(mdsc, 
fsc->mount_options); 4706 4707 spin_lock_init(&mdsc->snapid_map_lock); 4708 mdsc->snapid_map_tree = RB_ROOT; 4709 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 4710 4711 init_rwsem(&mdsc->pool_perm_rwsem); 4712 mdsc->pool_perm_tree = RB_ROOT; 4713 4714 strscpy(mdsc->nodename, utsname()->nodename, 4715 sizeof(mdsc->nodename)); 4716 4717 fsc->mdsc = mdsc; 4718 return 0; 4719 4720 err_mdsmap: 4721 kfree(mdsc->mdsmap); 4722 err_mdsc: 4723 kfree(mdsc); 4724 return err; 4725 } 4726 4727 /* 4728 * Wait for safe replies on open mds requests. If we time out, drop 4729 * all requests from the tree to avoid dangling dentry refs. 4730 */ 4731 static void wait_requests(struct ceph_mds_client *mdsc) 4732 { 4733 struct ceph_options *opts = mdsc->fsc->client->options; 4734 struct ceph_mds_request *req; 4735 4736 mutex_lock(&mdsc->mutex); 4737 if (__get_oldest_req(mdsc)) { 4738 mutex_unlock(&mdsc->mutex); 4739 4740 dout("wait_requests waiting for requests\n"); 4741 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 4742 ceph_timeout_jiffies(opts->mount_timeout)); 4743 4744 /* tear down remaining requests */ 4745 mutex_lock(&mdsc->mutex); 4746 while ((req = __get_oldest_req(mdsc))) { 4747 dout("wait_requests timed out on tid %llu\n", 4748 req->r_tid); 4749 list_del_init(&req->r_wait); 4750 __unregister_request(mdsc, req); 4751 } 4752 } 4753 mutex_unlock(&mdsc->mutex); 4754 dout("wait_requests done\n"); 4755 } 4756 4757 void send_flush_mdlog(struct ceph_mds_session *s) 4758 { 4759 struct ceph_msg *msg; 4760 4761 /* 4762 * Pre-luminous MDS crashes when it sees an unknown session request 4763 */ 4764 if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS)) 4765 return; 4766 4767 mutex_lock(&s->s_mutex); 4768 dout("request mdlog flush to mds%d (%s)s seq %lld\n", s->s_mds, 4769 ceph_session_state_name(s->s_state), s->s_seq); 4770 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG, 4771 s->s_seq); 4772 if (!msg) { 4773 pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n", 4774 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq); 4775 } else { 4776 ceph_con_send(&s->s_con, msg); 4777 } 4778 mutex_unlock(&s->s_mutex); 4779 } 4780 4781 /* 4782 * called before mount is ro, and before dentries are torn down. 4783 * (hmm, does this still race with new lookups?) 4784 */ 4785 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 4786 { 4787 dout("pre_umount\n"); 4788 mdsc->stopping = 1; 4789 4790 ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true); 4791 ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false); 4792 ceph_flush_dirty_caps(mdsc); 4793 wait_requests(mdsc); 4794 4795 /* 4796 * wait for reply handlers to drop their request refs and 4797 * their inode/dcache refs 4798 */ 4799 ceph_msgr_flush(); 4800 4801 ceph_cleanup_quotarealms_inodes(mdsc); 4802 } 4803 4804 /* 4805 * wait for all write mds requests to flush. 
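 * i.e. block until each write request with tid <= want_tid
 * (setfilelock aside) has been acked as safe via r_safe_completion.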
4806 */ 4807 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) 4808 { 4809 struct ceph_mds_request *req = NULL, *nextreq; 4810 struct rb_node *n; 4811 4812 mutex_lock(&mdsc->mutex); 4813 dout("wait_unsafe_requests want %lld\n", want_tid); 4814 restart: 4815 req = __get_oldest_req(mdsc); 4816 while (req && req->r_tid <= want_tid) { 4817 /* find next request */ 4818 n = rb_next(&req->r_node); 4819 if (n) 4820 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 4821 else 4822 nextreq = NULL; 4823 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 4824 (req->r_op & CEPH_MDS_OP_WRITE)) { 4825 /* write op */ 4826 ceph_mdsc_get_request(req); 4827 if (nextreq) 4828 ceph_mdsc_get_request(nextreq); 4829 mutex_unlock(&mdsc->mutex); 4830 dout("wait_unsafe_requests wait on %llu (want %llu)\n", 4831 req->r_tid, want_tid); 4832 wait_for_completion(&req->r_safe_completion); 4833 mutex_lock(&mdsc->mutex); 4834 ceph_mdsc_put_request(req); 4835 if (!nextreq) 4836 break; /* next dne before, so we're done! */ 4837 if (RB_EMPTY_NODE(&nextreq->r_node)) { 4838 /* next request was removed from tree */ 4839 ceph_mdsc_put_request(nextreq); 4840 goto restart; 4841 } 4842 ceph_mdsc_put_request(nextreq); /* won't go away */ 4843 } 4844 req = nextreq; 4845 } 4846 mutex_unlock(&mdsc->mutex); 4847 dout("wait_unsafe_requests done\n"); 4848 } 4849 4850 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 4851 { 4852 u64 want_tid, want_flush; 4853 4854 if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) 4855 return; 4856 4857 dout("sync\n"); 4858 mutex_lock(&mdsc->mutex); 4859 want_tid = mdsc->last_tid; 4860 mutex_unlock(&mdsc->mutex); 4861 4862 ceph_flush_dirty_caps(mdsc); 4863 spin_lock(&mdsc->cap_dirty_lock); 4864 want_flush = mdsc->last_cap_flush_tid; 4865 if (!list_empty(&mdsc->cap_flush_list)) { 4866 struct ceph_cap_flush *cf = 4867 list_last_entry(&mdsc->cap_flush_list, 4868 struct ceph_cap_flush, g_list); 4869 cf->wake = true; 4870 } 4871 spin_unlock(&mdsc->cap_dirty_lock); 4872 4873 dout("sync want tid %lld flush_seq %lld\n", 4874 want_tid, want_flush); 4875 4876 wait_unsafe_requests(mdsc, want_tid); 4877 wait_caps_flush(mdsc, want_flush); 4878 } 4879 4880 /* 4881 * true if all sessions are closed, or we force unmount 4882 */ 4883 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 4884 { 4885 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4886 return true; 4887 return atomic_read(&mdsc->num_sessions) <= skipped; 4888 } 4889 4890 /* 4891 * called after sb is ro. 
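 * Ask every MDS to close its session, wait up to the mount_timeout
 * for the sessions to go away, then tear down whatever is left.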
4892 */ 4893 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 4894 { 4895 struct ceph_options *opts = mdsc->fsc->client->options; 4896 struct ceph_mds_session *session; 4897 int i; 4898 int skipped = 0; 4899 4900 dout("close_sessions\n"); 4901 4902 /* close sessions */ 4903 mutex_lock(&mdsc->mutex); 4904 for (i = 0; i < mdsc->max_sessions; i++) { 4905 session = __ceph_lookup_mds_session(mdsc, i); 4906 if (!session) 4907 continue; 4908 mutex_unlock(&mdsc->mutex); 4909 mutex_lock(&session->s_mutex); 4910 if (__close_session(mdsc, session) <= 0) 4911 skipped++; 4912 mutex_unlock(&session->s_mutex); 4913 ceph_put_mds_session(session); 4914 mutex_lock(&mdsc->mutex); 4915 } 4916 mutex_unlock(&mdsc->mutex); 4917 4918 dout("waiting for sessions to close\n"); 4919 wait_event_timeout(mdsc->session_close_wq, 4920 done_closing_sessions(mdsc, skipped), 4921 ceph_timeout_jiffies(opts->mount_timeout)); 4922 4923 /* tear down remaining sessions */ 4924 mutex_lock(&mdsc->mutex); 4925 for (i = 0; i < mdsc->max_sessions; i++) { 4926 if (mdsc->sessions[i]) { 4927 session = ceph_get_mds_session(mdsc->sessions[i]); 4928 __unregister_session(mdsc, session); 4929 mutex_unlock(&mdsc->mutex); 4930 mutex_lock(&session->s_mutex); 4931 remove_session_caps(session); 4932 mutex_unlock(&session->s_mutex); 4933 ceph_put_mds_session(session); 4934 mutex_lock(&mdsc->mutex); 4935 } 4936 } 4937 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 4938 mutex_unlock(&mdsc->mutex); 4939 4940 ceph_cleanup_snapid_map(mdsc); 4941 ceph_cleanup_empty_realms(mdsc); 4942 4943 cancel_work_sync(&mdsc->cap_reclaim_work); 4944 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 4945 4946 dout("stopped\n"); 4947 } 4948 4949 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 4950 { 4951 struct ceph_mds_session *session; 4952 int mds; 4953 4954 dout("force umount\n"); 4955 4956 mutex_lock(&mdsc->mutex); 4957 for (mds = 0; mds < mdsc->max_sessions; mds++) { 4958 session = __ceph_lookup_mds_session(mdsc, mds); 4959 if (!session) 4960 continue; 4961 4962 if (session->s_state == CEPH_MDS_SESSION_REJECTED) 4963 __unregister_session(mdsc, session); 4964 __wake_requests(mdsc, &session->s_waiting); 4965 mutex_unlock(&mdsc->mutex); 4966 4967 mutex_lock(&session->s_mutex); 4968 __close_session(mdsc, session); 4969 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 4970 cleanup_session_requests(mdsc, session); 4971 remove_session_caps(session); 4972 } 4973 mutex_unlock(&session->s_mutex); 4974 ceph_put_mds_session(session); 4975 4976 mutex_lock(&mdsc->mutex); 4977 kick_requests(mdsc, mds); 4978 } 4979 __wake_requests(mdsc, &mdsc->waiting_for_map); 4980 mutex_unlock(&mdsc->mutex); 4981 } 4982 4983 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 4984 { 4985 dout("stop\n"); 4986 /* 4987 * Make sure the delayed work stopped before releasing 4988 * the resources. 4989 * 4990 * Because the cancel_delayed_work_sync() will only 4991 * guarantee that the work finishes executing. But the 4992 * delayed work will re-arm itself again after that. 
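 * (in the umount path mdsc->stopping has already been set by
 * ceph_mdsc_pre_umount(), so the run flushed below returns early
 * instead of re-arming)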
4993 */ 4994 flush_delayed_work(&mdsc->delayed_work); 4995 4996 if (mdsc->mdsmap) 4997 ceph_mdsmap_destroy(mdsc->mdsmap); 4998 kfree(mdsc->sessions); 4999 ceph_caps_finalize(mdsc); 5000 ceph_pool_perm_destroy(mdsc); 5001 } 5002 5003 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 5004 { 5005 struct ceph_mds_client *mdsc = fsc->mdsc; 5006 dout("mdsc_destroy %p\n", mdsc); 5007 5008 if (!mdsc) 5009 return; 5010 5011 /* flush out any connection work with references to us */ 5012 ceph_msgr_flush(); 5013 5014 ceph_mdsc_stop(mdsc); 5015 5016 ceph_metric_destroy(&mdsc->metric); 5017 5018 fsc->mdsc = NULL; 5019 kfree(mdsc); 5020 dout("mdsc_destroy %p done\n", mdsc); 5021 } 5022 5023 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 5024 { 5025 struct ceph_fs_client *fsc = mdsc->fsc; 5026 const char *mds_namespace = fsc->mount_options->mds_namespace; 5027 void *p = msg->front.iov_base; 5028 void *end = p + msg->front.iov_len; 5029 u32 epoch; 5030 u32 num_fs; 5031 u32 mount_fscid = (u32)-1; 5032 int err = -EINVAL; 5033 5034 ceph_decode_need(&p, end, sizeof(u32), bad); 5035 epoch = ceph_decode_32(&p); 5036 5037 dout("handle_fsmap epoch %u\n", epoch); 5038 5039 /* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */ 5040 ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad); 5041 5042 ceph_decode_32_safe(&p, end, num_fs, bad); 5043 while (num_fs-- > 0) { 5044 void *info_p, *info_end; 5045 u32 info_len; 5046 u32 fscid, namelen; 5047 5048 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 5049 p += 2; // info_v, info_cv 5050 info_len = ceph_decode_32(&p); 5051 ceph_decode_need(&p, end, info_len, bad); 5052 info_p = p; 5053 info_end = p + info_len; 5054 p = info_end; 5055 5056 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); 5057 fscid = ceph_decode_32(&info_p); 5058 namelen = ceph_decode_32(&info_p); 5059 ceph_decode_need(&info_p, info_end, namelen, bad); 5060 5061 if (mds_namespace && 5062 strlen(mds_namespace) == namelen && 5063 !strncmp(mds_namespace, (char *)info_p, namelen)) { 5064 mount_fscid = fscid; 5065 break; 5066 } 5067 } 5068 5069 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); 5070 if (mount_fscid != (u32)-1) { 5071 fsc->client->monc.fs_cluster_id = mount_fscid; 5072 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 5073 0, true); 5074 ceph_monc_renew_subs(&fsc->client->monc); 5075 } else { 5076 err = -ENOENT; 5077 goto err_out; 5078 } 5079 return; 5080 5081 bad: 5082 pr_err("error decoding fsmap\n"); 5083 err_out: 5084 mutex_lock(&mdsc->mutex); 5085 mdsc->mdsmap_err = err; 5086 __wake_requests(mdsc, &mdsc->waiting_for_map); 5087 mutex_unlock(&mdsc->mutex); 5088 } 5089 5090 /* 5091 * handle mds map update. 5092 */ 5093 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 5094 { 5095 u32 epoch; 5096 u32 maplen; 5097 void *p = msg->front.iov_base; 5098 void *end = p + msg->front.iov_len; 5099 struct ceph_mdsmap *newmap, *oldmap; 5100 struct ceph_fsid fsid; 5101 int err = -EINVAL; 5102 5103 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 5104 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 5105 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 5106 return; 5107 epoch = ceph_decode_32(&p); 5108 maplen = ceph_decode_32(&p); 5109 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 5110 5111 /* do we need it? 
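 * (only if the epoch is newer than the mdsmap we already have)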
*/ 5112 mutex_lock(&mdsc->mutex); 5113 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 5114 dout("handle_map epoch %u <= our %u\n", 5115 epoch, mdsc->mdsmap->m_epoch); 5116 mutex_unlock(&mdsc->mutex); 5117 return; 5118 } 5119 5120 newmap = ceph_mdsmap_decode(&p, end, ceph_msgr2(mdsc->fsc->client)); 5121 if (IS_ERR(newmap)) { 5122 err = PTR_ERR(newmap); 5123 goto bad_unlock; 5124 } 5125 5126 /* swap into place */ 5127 if (mdsc->mdsmap) { 5128 oldmap = mdsc->mdsmap; 5129 mdsc->mdsmap = newmap; 5130 check_new_map(mdsc, newmap, oldmap); 5131 ceph_mdsmap_destroy(oldmap); 5132 } else { 5133 mdsc->mdsmap = newmap; /* first mds map */ 5134 } 5135 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size, 5136 MAX_LFS_FILESIZE); 5137 5138 __wake_requests(mdsc, &mdsc->waiting_for_map); 5139 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP, 5140 mdsc->mdsmap->m_epoch); 5141 5142 mutex_unlock(&mdsc->mutex); 5143 schedule_delayed(mdsc, 0); 5144 return; 5145 5146 bad_unlock: 5147 mutex_unlock(&mdsc->mutex); 5148 bad: 5149 pr_err("error decoding mdsmap %d\n", err); 5150 return; 5151 } 5152 5153 static struct ceph_connection *mds_get_con(struct ceph_connection *con) 5154 { 5155 struct ceph_mds_session *s = con->private; 5156 5157 if (ceph_get_mds_session(s)) 5158 return con; 5159 return NULL; 5160 } 5161 5162 static void mds_put_con(struct ceph_connection *con) 5163 { 5164 struct ceph_mds_session *s = con->private; 5165 5166 ceph_put_mds_session(s); 5167 } 5168 5169 /* 5170 * if the client is unresponsive for long enough, the mds will kill 5171 * the session entirely. 5172 */ 5173 static void mds_peer_reset(struct ceph_connection *con) 5174 { 5175 struct ceph_mds_session *s = con->private; 5176 struct ceph_mds_client *mdsc = s->s_mdsc; 5177 5178 pr_warn("mds%d closed our session\n", s->s_mds); 5179 send_mds_reconnect(mdsc, s); 5180 } 5181 5182 static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg) 5183 { 5184 struct ceph_mds_session *s = con->private; 5185 struct ceph_mds_client *mdsc = s->s_mdsc; 5186 int type = le16_to_cpu(msg->hdr.type); 5187 5188 mutex_lock(&mdsc->mutex); 5189 if (__verify_registered_session(mdsc, s) < 0) { 5190 mutex_unlock(&mdsc->mutex); 5191 goto out; 5192 } 5193 mutex_unlock(&mdsc->mutex); 5194 5195 switch (type) { 5196 case CEPH_MSG_MDS_MAP: 5197 ceph_mdsc_handle_mdsmap(mdsc, msg); 5198 break; 5199 case CEPH_MSG_FS_MAP_USER: 5200 ceph_mdsc_handle_fsmap(mdsc, msg); 5201 break; 5202 case CEPH_MSG_CLIENT_SESSION: 5203 handle_session(s, msg); 5204 break; 5205 case CEPH_MSG_CLIENT_REPLY: 5206 handle_reply(s, msg); 5207 break; 5208 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 5209 handle_forward(mdsc, s, msg); 5210 break; 5211 case CEPH_MSG_CLIENT_CAPS: 5212 ceph_handle_caps(s, msg); 5213 break; 5214 case CEPH_MSG_CLIENT_SNAP: 5215 ceph_handle_snap(mdsc, s, msg); 5216 break; 5217 case CEPH_MSG_CLIENT_LEASE: 5218 handle_lease(mdsc, s, msg); 5219 break; 5220 case CEPH_MSG_CLIENT_QUOTA: 5221 ceph_handle_quota(mdsc, s, msg); 5222 break; 5223 5224 default: 5225 pr_err("received unknown message type %d %s\n", type, 5226 ceph_msg_type_name(type)); 5227 } 5228 out: 5229 ceph_msg_put(msg); 5230 } 5231 5232 /* 5233 * authentication 5234 */ 5235 5236 /* 5237 * Note: returned pointer is the address of a structure that's 5238 * managed separately. Caller must *not* attempt to free it. 
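 * (it points at the per-session handshake, s->s_auth)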
5239 */ 5240 static struct ceph_auth_handshake * 5241 mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new) 5242 { 5243 struct ceph_mds_session *s = con->private; 5244 struct ceph_mds_client *mdsc = s->s_mdsc; 5245 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5246 struct ceph_auth_handshake *auth = &s->s_auth; 5247 int ret; 5248 5249 ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 5250 force_new, proto, NULL, NULL); 5251 if (ret) 5252 return ERR_PTR(ret); 5253 5254 return auth; 5255 } 5256 5257 static int mds_add_authorizer_challenge(struct ceph_connection *con, 5258 void *challenge_buf, int challenge_buf_len) 5259 { 5260 struct ceph_mds_session *s = con->private; 5261 struct ceph_mds_client *mdsc = s->s_mdsc; 5262 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5263 5264 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer, 5265 challenge_buf, challenge_buf_len); 5266 } 5267 5268 static int mds_verify_authorizer_reply(struct ceph_connection *con) 5269 { 5270 struct ceph_mds_session *s = con->private; 5271 struct ceph_mds_client *mdsc = s->s_mdsc; 5272 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5273 struct ceph_auth_handshake *auth = &s->s_auth; 5274 5275 return ceph_auth_verify_authorizer_reply(ac, auth->authorizer, 5276 auth->authorizer_reply_buf, auth->authorizer_reply_buf_len, 5277 NULL, NULL, NULL, NULL); 5278 } 5279 5280 static int mds_invalidate_authorizer(struct ceph_connection *con) 5281 { 5282 struct ceph_mds_session *s = con->private; 5283 struct ceph_mds_client *mdsc = s->s_mdsc; 5284 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5285 5286 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 5287 5288 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 5289 } 5290 5291 static int mds_get_auth_request(struct ceph_connection *con, 5292 void *buf, int *buf_len, 5293 void **authorizer, int *authorizer_len) 5294 { 5295 struct ceph_mds_session *s = con->private; 5296 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5297 struct ceph_auth_handshake *auth = &s->s_auth; 5298 int ret; 5299 5300 ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 5301 buf, buf_len); 5302 if (ret) 5303 return ret; 5304 5305 *authorizer = auth->authorizer_buf; 5306 *authorizer_len = auth->authorizer_buf_len; 5307 return 0; 5308 } 5309 5310 static int mds_handle_auth_reply_more(struct ceph_connection *con, 5311 void *reply, int reply_len, 5312 void *buf, int *buf_len, 5313 void **authorizer, int *authorizer_len) 5314 { 5315 struct ceph_mds_session *s = con->private; 5316 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5317 struct ceph_auth_handshake *auth = &s->s_auth; 5318 int ret; 5319 5320 ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len, 5321 buf, buf_len); 5322 if (ret) 5323 return ret; 5324 5325 *authorizer = auth->authorizer_buf; 5326 *authorizer_len = auth->authorizer_buf_len; 5327 return 0; 5328 } 5329 5330 static int mds_handle_auth_done(struct ceph_connection *con, 5331 u64 global_id, void *reply, int reply_len, 5332 u8 *session_key, int *session_key_len, 5333 u8 *con_secret, int *con_secret_len) 5334 { 5335 struct ceph_mds_session *s = con->private; 5336 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5337 struct ceph_auth_handshake *auth = &s->s_auth; 5338 5339 return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len, 5340 session_key, session_key_len, 5341 con_secret, con_secret_len); 5342 } 
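/*
 * The *_authorizer callbacks above (mds_get_authorizer,
 * mds_add_authorizer_challenge, mds_verify_authorizer_reply,
 * mds_invalidate_authorizer) serve the original msgr1 handshake;
 * mds_get_auth_request(), mds_handle_auth_reply_more() and
 * mds_handle_auth_done() above, together with
 * mds_handle_auth_bad_method() below, are their msgr2 counterparts.
 * Both sets are wired into mds_con_ops at the end of the file.
 */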
5343 5344 static int mds_handle_auth_bad_method(struct ceph_connection *con, 5345 int used_proto, int result, 5346 const int *allowed_protos, int proto_cnt, 5347 const int *allowed_modes, int mode_cnt) 5348 { 5349 struct ceph_mds_session *s = con->private; 5350 struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc; 5351 int ret; 5352 5353 if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS, 5354 used_proto, result, 5355 allowed_protos, proto_cnt, 5356 allowed_modes, mode_cnt)) { 5357 ret = ceph_monc_validate_auth(monc); 5358 if (ret) 5359 return ret; 5360 } 5361 5362 return -EACCES; 5363 } 5364 5365 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, 5366 struct ceph_msg_header *hdr, int *skip) 5367 { 5368 struct ceph_msg *msg; 5369 int type = (int) le16_to_cpu(hdr->type); 5370 int front_len = (int) le32_to_cpu(hdr->front_len); 5371 5372 if (con->in_msg) 5373 return con->in_msg; 5374 5375 *skip = 0; 5376 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 5377 if (!msg) { 5378 pr_err("unable to allocate msg type %d len %d\n", 5379 type, front_len); 5380 return NULL; 5381 } 5382 5383 return msg; 5384 } 5385 5386 static int mds_sign_message(struct ceph_msg *msg) 5387 { 5388 struct ceph_mds_session *s = msg->con->private; 5389 struct ceph_auth_handshake *auth = &s->s_auth; 5390 5391 return ceph_auth_sign_message(auth, msg); 5392 } 5393 5394 static int mds_check_message_signature(struct ceph_msg *msg) 5395 { 5396 struct ceph_mds_session *s = msg->con->private; 5397 struct ceph_auth_handshake *auth = &s->s_auth; 5398 5399 return ceph_auth_check_message_signature(auth, msg); 5400 } 5401 5402 static const struct ceph_connection_operations mds_con_ops = { 5403 .get = mds_get_con, 5404 .put = mds_put_con, 5405 .alloc_msg = mds_alloc_msg, 5406 .dispatch = mds_dispatch, 5407 .peer_reset = mds_peer_reset, 5408 .get_authorizer = mds_get_authorizer, 5409 .add_authorizer_challenge = mds_add_authorizer_challenge, 5410 .verify_authorizer_reply = mds_verify_authorizer_reply, 5411 .invalidate_authorizer = mds_invalidate_authorizer, 5412 .sign_message = mds_sign_message, 5413 .check_message_signature = mds_check_message_signature, 5414 .get_auth_request = mds_get_auth_request, 5415 .handle_auth_reply_more = mds_handle_auth_reply_more, 5416 .handle_auth_done = mds_handle_auth_done, 5417 .handle_auth_bad_method = mds_handle_auth_bad_method, 5418 }; 5419 5420 /* eof */ 5421