// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage. Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid. If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
	struct ceph_mds_session *session;
	int nr_caps, nr_realms;
	struct ceph_pagelist *pagelist;
	unsigned msg_version;
	bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

static int parse_reply_info_quota(void **p, void *end,
				  struct ceph_mds_reply_info_in *info)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);
	/* struct_v is expected to be >= 1. we only
	 * understand encoding with struct_compat == 1. */
	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	end = *p + struct_len;
	ceph_decode_64_safe(p, end, info->max_bytes, bad);
	ceph_decode_64_safe(p, end, info->max_files, bad);
	*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = 0;
	u8 struct_v = 0;

	if (features == (u64)-1) {
		u32 struct_len;
		u8 struct_compat;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1.
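		 * A zero struct_v or a struct_compat other than 1 means the
		 * MDS used an encoding this decoder cannot parse safely, so
		 * we bail out with -EIO below.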
*/ 112 if (!struct_v || struct_compat != 1) 113 goto bad; 114 ceph_decode_32_safe(p, end, struct_len, bad); 115 ceph_decode_need(p, end, struct_len, bad); 116 end = *p + struct_len; 117 } 118 119 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad); 120 info->in = *p; 121 *p += sizeof(struct ceph_mds_reply_inode) + 122 sizeof(*info->in->fragtree.splits) * 123 le32_to_cpu(info->in->fragtree.nsplits); 124 125 ceph_decode_32_safe(p, end, info->symlink_len, bad); 126 ceph_decode_need(p, end, info->symlink_len, bad); 127 info->symlink = *p; 128 *p += info->symlink_len; 129 130 ceph_decode_copy_safe(p, end, &info->dir_layout, 131 sizeof(info->dir_layout), bad); 132 ceph_decode_32_safe(p, end, info->xattr_len, bad); 133 ceph_decode_need(p, end, info->xattr_len, bad); 134 info->xattr_data = *p; 135 *p += info->xattr_len; 136 137 if (features == (u64)-1) { 138 /* inline data */ 139 ceph_decode_64_safe(p, end, info->inline_version, bad); 140 ceph_decode_32_safe(p, end, info->inline_len, bad); 141 ceph_decode_need(p, end, info->inline_len, bad); 142 info->inline_data = *p; 143 *p += info->inline_len; 144 /* quota */ 145 err = parse_reply_info_quota(p, end, info); 146 if (err < 0) 147 goto out_bad; 148 /* pool namespace */ 149 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 150 if (info->pool_ns_len > 0) { 151 ceph_decode_need(p, end, info->pool_ns_len, bad); 152 info->pool_ns_data = *p; 153 *p += info->pool_ns_len; 154 } 155 156 /* btime */ 157 ceph_decode_need(p, end, sizeof(info->btime), bad); 158 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 159 160 /* change attribute */ 161 ceph_decode_64_safe(p, end, info->change_attr, bad); 162 163 /* dir pin */ 164 if (struct_v >= 2) { 165 ceph_decode_32_safe(p, end, info->dir_pin, bad); 166 } else { 167 info->dir_pin = -ENODATA; 168 } 169 170 /* snapshot birth time, remains zero for v<=2 */ 171 if (struct_v >= 3) { 172 ceph_decode_need(p, end, sizeof(info->snap_btime), bad); 173 ceph_decode_copy(p, &info->snap_btime, 174 sizeof(info->snap_btime)); 175 } else { 176 memset(&info->snap_btime, 0, sizeof(info->snap_btime)); 177 } 178 179 *p = end; 180 } else { 181 if (features & CEPH_FEATURE_MDS_INLINE_DATA) { 182 ceph_decode_64_safe(p, end, info->inline_version, bad); 183 ceph_decode_32_safe(p, end, info->inline_len, bad); 184 ceph_decode_need(p, end, info->inline_len, bad); 185 info->inline_data = *p; 186 *p += info->inline_len; 187 } else 188 info->inline_version = CEPH_INLINE_NONE; 189 190 if (features & CEPH_FEATURE_MDS_QUOTA) { 191 err = parse_reply_info_quota(p, end, info); 192 if (err < 0) 193 goto out_bad; 194 } else { 195 info->max_bytes = 0; 196 info->max_files = 0; 197 } 198 199 info->pool_ns_len = 0; 200 info->pool_ns_data = NULL; 201 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { 202 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 203 if (info->pool_ns_len > 0) { 204 ceph_decode_need(p, end, info->pool_ns_len, bad); 205 info->pool_ns_data = *p; 206 *p += info->pool_ns_len; 207 } 208 } 209 210 if (features & CEPH_FEATURE_FS_BTIME) { 211 ceph_decode_need(p, end, sizeof(info->btime), bad); 212 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 213 ceph_decode_64_safe(p, end, info->change_attr, bad); 214 } 215 216 info->dir_pin = -ENODATA; 217 /* info->snap_btime remains zero */ 218 } 219 return 0; 220 bad: 221 err = -EIO; 222 out_bad: 223 return err; 224 } 225 226 static int parse_reply_info_dir(void **p, void *end, 227 struct ceph_mds_reply_dirfrag **dirfrag, 228 u64 features) 229 { 230 if (features == (u64)-1) 
{ 231 u8 struct_v, struct_compat; 232 u32 struct_len; 233 ceph_decode_8_safe(p, end, struct_v, bad); 234 ceph_decode_8_safe(p, end, struct_compat, bad); 235 /* struct_v is expected to be >= 1. we only understand 236 * encoding whose struct_compat == 1. */ 237 if (!struct_v || struct_compat != 1) 238 goto bad; 239 ceph_decode_32_safe(p, end, struct_len, bad); 240 ceph_decode_need(p, end, struct_len, bad); 241 end = *p + struct_len; 242 } 243 244 ceph_decode_need(p, end, sizeof(**dirfrag), bad); 245 *dirfrag = *p; 246 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist); 247 if (unlikely(*p > end)) 248 goto bad; 249 if (features == (u64)-1) 250 *p = end; 251 return 0; 252 bad: 253 return -EIO; 254 } 255 256 static int parse_reply_info_lease(void **p, void *end, 257 struct ceph_mds_reply_lease **lease, 258 u64 features) 259 { 260 if (features == (u64)-1) { 261 u8 struct_v, struct_compat; 262 u32 struct_len; 263 ceph_decode_8_safe(p, end, struct_v, bad); 264 ceph_decode_8_safe(p, end, struct_compat, bad); 265 /* struct_v is expected to be >= 1. we only understand 266 * encoding whose struct_compat == 1. */ 267 if (!struct_v || struct_compat != 1) 268 goto bad; 269 ceph_decode_32_safe(p, end, struct_len, bad); 270 ceph_decode_need(p, end, struct_len, bad); 271 end = *p + struct_len; 272 } 273 274 ceph_decode_need(p, end, sizeof(**lease), bad); 275 *lease = *p; 276 *p += sizeof(**lease); 277 if (features == (u64)-1) 278 *p = end; 279 return 0; 280 bad: 281 return -EIO; 282 } 283 284 /* 285 * parse a normal reply, which may contain a (dir+)dentry and/or a 286 * target inode. 287 */ 288 static int parse_reply_info_trace(void **p, void *end, 289 struct ceph_mds_reply_info_parsed *info, 290 u64 features) 291 { 292 int err; 293 294 if (info->head->is_dentry) { 295 err = parse_reply_info_in(p, end, &info->diri, features); 296 if (err < 0) 297 goto out_bad; 298 299 err = parse_reply_info_dir(p, end, &info->dirfrag, features); 300 if (err < 0) 301 goto out_bad; 302 303 ceph_decode_32_safe(p, end, info->dname_len, bad); 304 ceph_decode_need(p, end, info->dname_len, bad); 305 info->dname = *p; 306 *p += info->dname_len; 307 308 err = parse_reply_info_lease(p, end, &info->dlease, features); 309 if (err < 0) 310 goto out_bad; 311 } 312 313 if (info->head->is_target) { 314 err = parse_reply_info_in(p, end, &info->targeti, features); 315 if (err < 0) 316 goto out_bad; 317 } 318 319 if (unlikely(*p != end)) 320 goto bad; 321 return 0; 322 323 bad: 324 err = -EIO; 325 out_bad: 326 pr_err("problem parsing mds trace %d\n", err); 327 return err; 328 } 329 330 /* 331 * parse readdir results 332 */ 333 static int parse_reply_info_readdir(void **p, void *end, 334 struct ceph_mds_reply_info_parsed *info, 335 u64 features) 336 { 337 u32 num, i = 0; 338 int err; 339 340 err = parse_reply_info_dir(p, end, &info->dir_dir, features); 341 if (err < 0) 342 goto out_bad; 343 344 ceph_decode_need(p, end, sizeof(num) + 2, bad); 345 num = ceph_decode_32(p); 346 { 347 u16 flags = ceph_decode_16(p); 348 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); 349 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); 350 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); 351 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH); 352 } 353 if (num == 0) 354 goto done; 355 356 BUG_ON(!info->dir_entries); 357 if ((unsigned long)(info->dir_entries + num) > 358 (unsigned long)info->dir_entries + info->dir_buf_size) { 359 pr_err("dir contents are larger than expected\n"); 360 WARN_ON(1); 361 goto bad; 362 } 363 
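	/* decode each returned entry: dentry name, dentry lease, then the
	 * inode record itself */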
364 info->dir_nr = num; 365 while (num) { 366 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 367 /* dentry */ 368 ceph_decode_32_safe(p, end, rde->name_len, bad); 369 ceph_decode_need(p, end, rde->name_len, bad); 370 rde->name = *p; 371 *p += rde->name_len; 372 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name); 373 374 /* dentry lease */ 375 err = parse_reply_info_lease(p, end, &rde->lease, features); 376 if (err) 377 goto out_bad; 378 /* inode */ 379 err = parse_reply_info_in(p, end, &rde->inode, features); 380 if (err < 0) 381 goto out_bad; 382 /* ceph_readdir_prepopulate() will update it */ 383 rde->offset = 0; 384 i++; 385 num--; 386 } 387 388 done: 389 /* Skip over any unrecognized fields */ 390 *p = end; 391 return 0; 392 393 bad: 394 err = -EIO; 395 out_bad: 396 pr_err("problem parsing dir contents %d\n", err); 397 return err; 398 } 399 400 /* 401 * parse fcntl F_GETLK results 402 */ 403 static int parse_reply_info_filelock(void **p, void *end, 404 struct ceph_mds_reply_info_parsed *info, 405 u64 features) 406 { 407 if (*p + sizeof(*info->filelock_reply) > end) 408 goto bad; 409 410 info->filelock_reply = *p; 411 412 /* Skip over any unrecognized fields */ 413 *p = end; 414 return 0; 415 bad: 416 return -EIO; 417 } 418 419 420 #if BITS_PER_LONG == 64 421 422 #define DELEGATED_INO_AVAILABLE xa_mk_value(1) 423 424 static int ceph_parse_deleg_inos(void **p, void *end, 425 struct ceph_mds_session *s) 426 { 427 u32 sets; 428 429 ceph_decode_32_safe(p, end, sets, bad); 430 dout("got %u sets of delegated inodes\n", sets); 431 while (sets--) { 432 u64 start, len, ino; 433 434 ceph_decode_64_safe(p, end, start, bad); 435 ceph_decode_64_safe(p, end, len, bad); 436 while (len--) { 437 int err = xa_insert(&s->s_delegated_inos, ino = start++, 438 DELEGATED_INO_AVAILABLE, 439 GFP_KERNEL); 440 if (!err) { 441 dout("added delegated inode 0x%llx\n", 442 start - 1); 443 } else if (err == -EBUSY) { 444 pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n", 445 start - 1); 446 } else { 447 return err; 448 } 449 } 450 } 451 return 0; 452 bad: 453 return -EIO; 454 } 455 456 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 457 { 458 unsigned long ino; 459 void *val; 460 461 xa_for_each(&s->s_delegated_inos, ino, val) { 462 val = xa_erase(&s->s_delegated_inos, ino); 463 if (val == DELEGATED_INO_AVAILABLE) 464 return ino; 465 } 466 return 0; 467 } 468 469 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 470 { 471 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE, 472 GFP_KERNEL); 473 } 474 #else /* BITS_PER_LONG == 64 */ 475 /* 476 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just 477 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top 478 * and bottom words? 
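 * (xarray indexes are unsigned long, which is only 32 bits wide on those
 * architectures, so a full 64-bit inode number cannot be used as an index.)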
479 */ 480 static int ceph_parse_deleg_inos(void **p, void *end, 481 struct ceph_mds_session *s) 482 { 483 u32 sets; 484 485 ceph_decode_32_safe(p, end, sets, bad); 486 if (sets) 487 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad); 488 return 0; 489 bad: 490 return -EIO; 491 } 492 493 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 494 { 495 return 0; 496 } 497 498 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 499 { 500 return 0; 501 } 502 #endif /* BITS_PER_LONG == 64 */ 503 504 /* 505 * parse create results 506 */ 507 static int parse_reply_info_create(void **p, void *end, 508 struct ceph_mds_reply_info_parsed *info, 509 u64 features, struct ceph_mds_session *s) 510 { 511 int ret; 512 513 if (features == (u64)-1 || 514 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) { 515 if (*p == end) { 516 /* Malformed reply? */ 517 info->has_create_ino = false; 518 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) { 519 u8 struct_v, struct_compat; 520 u32 len; 521 522 info->has_create_ino = true; 523 ceph_decode_8_safe(p, end, struct_v, bad); 524 ceph_decode_8_safe(p, end, struct_compat, bad); 525 ceph_decode_32_safe(p, end, len, bad); 526 ceph_decode_64_safe(p, end, info->ino, bad); 527 ret = ceph_parse_deleg_inos(p, end, s); 528 if (ret) 529 return ret; 530 } else { 531 /* legacy */ 532 ceph_decode_64_safe(p, end, info->ino, bad); 533 info->has_create_ino = true; 534 } 535 } else { 536 if (*p != end) 537 goto bad; 538 } 539 540 /* Skip over any unrecognized fields */ 541 *p = end; 542 return 0; 543 bad: 544 return -EIO; 545 } 546 547 /* 548 * parse extra results 549 */ 550 static int parse_reply_info_extra(void **p, void *end, 551 struct ceph_mds_reply_info_parsed *info, 552 u64 features, struct ceph_mds_session *s) 553 { 554 u32 op = le32_to_cpu(info->head->op); 555 556 if (op == CEPH_MDS_OP_GETFILELOCK) 557 return parse_reply_info_filelock(p, end, info, features); 558 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) 559 return parse_reply_info_readdir(p, end, info, features); 560 else if (op == CEPH_MDS_OP_CREATE) 561 return parse_reply_info_create(p, end, info, features, s); 562 else 563 return -EIO; 564 } 565 566 /* 567 * parse entire mds reply 568 */ 569 static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, 570 struct ceph_mds_reply_info_parsed *info, 571 u64 features) 572 { 573 void *p, *end; 574 u32 len; 575 int err; 576 577 info->head = msg->front.iov_base; 578 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 579 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 580 581 /* trace */ 582 ceph_decode_32_safe(&p, end, len, bad); 583 if (len > 0) { 584 ceph_decode_need(&p, end, len, bad); 585 err = parse_reply_info_trace(&p, p+len, info, features); 586 if (err < 0) 587 goto out_bad; 588 } 589 590 /* extra */ 591 ceph_decode_32_safe(&p, end, len, bad); 592 if (len > 0) { 593 ceph_decode_need(&p, end, len, bad); 594 err = parse_reply_info_extra(&p, p+len, info, features, s); 595 if (err < 0) 596 goto out_bad; 597 } 598 599 /* snap blob */ 600 ceph_decode_32_safe(&p, end, len, bad); 601 info->snapblob_len = len; 602 info->snapblob = p; 603 p += len; 604 605 if (p != end) 606 goto bad; 607 return 0; 608 609 bad: 610 err = -EIO; 611 out_bad: 612 pr_err("mds parse_reply err %d\n", err); 613 return err; 614 } 615 616 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 617 { 618 if (!info->dir_entries) 619 return; 620 free_pages((unsigned long)info->dir_entries, 
get_order(info->dir_buf_size)); 621 } 622 623 624 /* 625 * sessions 626 */ 627 const char *ceph_session_state_name(int s) 628 { 629 switch (s) { 630 case CEPH_MDS_SESSION_NEW: return "new"; 631 case CEPH_MDS_SESSION_OPENING: return "opening"; 632 case CEPH_MDS_SESSION_OPEN: return "open"; 633 case CEPH_MDS_SESSION_HUNG: return "hung"; 634 case CEPH_MDS_SESSION_CLOSING: return "closing"; 635 case CEPH_MDS_SESSION_CLOSED: return "closed"; 636 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 637 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 638 case CEPH_MDS_SESSION_REJECTED: return "rejected"; 639 default: return "???"; 640 } 641 } 642 643 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s) 644 { 645 if (refcount_inc_not_zero(&s->s_ref)) { 646 dout("mdsc get_session %p %d -> %d\n", s, 647 refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref)); 648 return s; 649 } else { 650 dout("mdsc get_session %p 0 -- FAIL\n", s); 651 return NULL; 652 } 653 } 654 655 void ceph_put_mds_session(struct ceph_mds_session *s) 656 { 657 dout("mdsc put_session %p %d -> %d\n", s, 658 refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1); 659 if (refcount_dec_and_test(&s->s_ref)) { 660 if (s->s_auth.authorizer) 661 ceph_auth_destroy_authorizer(s->s_auth.authorizer); 662 WARN_ON(mutex_is_locked(&s->s_mutex)); 663 xa_destroy(&s->s_delegated_inos); 664 kfree(s); 665 } 666 } 667 668 /* 669 * called under mdsc->mutex 670 */ 671 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 672 int mds) 673 { 674 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 675 return NULL; 676 return ceph_get_mds_session(mdsc->sessions[mds]); 677 } 678 679 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 680 { 681 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 682 return false; 683 else 684 return true; 685 } 686 687 static int __verify_registered_session(struct ceph_mds_client *mdsc, 688 struct ceph_mds_session *s) 689 { 690 if (s->s_mds >= mdsc->max_sessions || 691 mdsc->sessions[s->s_mds] != s) 692 return -ENOENT; 693 return 0; 694 } 695 696 /* 697 * create+register a new session for given mds. 698 * called under mdsc->mutex. 
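 * Returns the new session with an extra reference held for the caller,
 * or an ERR_PTR on failure.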
699 */ 700 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 701 int mds) 702 { 703 struct ceph_mds_session *s; 704 705 if (mds >= mdsc->mdsmap->possible_max_rank) 706 return ERR_PTR(-EINVAL); 707 708 s = kzalloc(sizeof(*s), GFP_NOFS); 709 if (!s) 710 return ERR_PTR(-ENOMEM); 711 712 if (mds >= mdsc->max_sessions) { 713 int newmax = 1 << get_count_order(mds + 1); 714 struct ceph_mds_session **sa; 715 716 dout("%s: realloc to %d\n", __func__, newmax); 717 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); 718 if (!sa) 719 goto fail_realloc; 720 if (mdsc->sessions) { 721 memcpy(sa, mdsc->sessions, 722 mdsc->max_sessions * sizeof(void *)); 723 kfree(mdsc->sessions); 724 } 725 mdsc->sessions = sa; 726 mdsc->max_sessions = newmax; 727 } 728 729 dout("%s: mds%d\n", __func__, mds); 730 s->s_mdsc = mdsc; 731 s->s_mds = mds; 732 s->s_state = CEPH_MDS_SESSION_NEW; 733 s->s_ttl = 0; 734 s->s_seq = 0; 735 mutex_init(&s->s_mutex); 736 737 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); 738 739 spin_lock_init(&s->s_gen_ttl_lock); 740 s->s_cap_gen = 1; 741 s->s_cap_ttl = jiffies - 1; 742 743 spin_lock_init(&s->s_cap_lock); 744 s->s_renew_requested = 0; 745 s->s_renew_seq = 0; 746 INIT_LIST_HEAD(&s->s_caps); 747 s->s_nr_caps = 0; 748 refcount_set(&s->s_ref, 1); 749 INIT_LIST_HEAD(&s->s_waiting); 750 INIT_LIST_HEAD(&s->s_unsafe); 751 xa_init(&s->s_delegated_inos); 752 s->s_num_cap_releases = 0; 753 s->s_cap_reconnect = 0; 754 s->s_cap_iterator = NULL; 755 INIT_LIST_HEAD(&s->s_cap_releases); 756 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work); 757 758 INIT_LIST_HEAD(&s->s_cap_dirty); 759 INIT_LIST_HEAD(&s->s_cap_flushing); 760 761 mdsc->sessions[mds] = s; 762 atomic_inc(&mdsc->num_sessions); 763 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 764 765 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 766 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 767 768 return s; 769 770 fail_realloc: 771 kfree(s); 772 return ERR_PTR(-ENOMEM); 773 } 774 775 /* 776 * called under mdsc->mutex 777 */ 778 static void __unregister_session(struct ceph_mds_client *mdsc, 779 struct ceph_mds_session *s) 780 { 781 dout("__unregister_session mds%d %p\n", s->s_mds, s); 782 BUG_ON(mdsc->sessions[s->s_mds] != s); 783 mdsc->sessions[s->s_mds] = NULL; 784 ceph_con_close(&s->s_con); 785 ceph_put_mds_session(s); 786 atomic_dec(&mdsc->num_sessions); 787 } 788 789 /* 790 * drop session refs in request. 
791 * 792 * should be last request ref, or hold mdsc->mutex 793 */ 794 static void put_request_session(struct ceph_mds_request *req) 795 { 796 if (req->r_session) { 797 ceph_put_mds_session(req->r_session); 798 req->r_session = NULL; 799 } 800 } 801 802 void ceph_mdsc_release_request(struct kref *kref) 803 { 804 struct ceph_mds_request *req = container_of(kref, 805 struct ceph_mds_request, 806 r_kref); 807 ceph_mdsc_release_dir_caps_no_check(req); 808 destroy_reply_info(&req->r_reply_info); 809 if (req->r_request) 810 ceph_msg_put(req->r_request); 811 if (req->r_reply) 812 ceph_msg_put(req->r_reply); 813 if (req->r_inode) { 814 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 815 /* avoid calling iput_final() in mds dispatch threads */ 816 ceph_async_iput(req->r_inode); 817 } 818 if (req->r_parent) { 819 ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN); 820 ceph_async_iput(req->r_parent); 821 } 822 ceph_async_iput(req->r_target_inode); 823 if (req->r_dentry) 824 dput(req->r_dentry); 825 if (req->r_old_dentry) 826 dput(req->r_old_dentry); 827 if (req->r_old_dentry_dir) { 828 /* 829 * track (and drop pins for) r_old_dentry_dir 830 * separately, since r_old_dentry's d_parent may have 831 * changed between the dir mutex being dropped and 832 * this request being freed. 833 */ 834 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), 835 CEPH_CAP_PIN); 836 ceph_async_iput(req->r_old_dentry_dir); 837 } 838 kfree(req->r_path1); 839 kfree(req->r_path2); 840 if (req->r_pagelist) 841 ceph_pagelist_release(req->r_pagelist); 842 put_request_session(req); 843 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); 844 WARN_ON_ONCE(!list_empty(&req->r_wait)); 845 kmem_cache_free(ceph_mds_request_cachep, req); 846 } 847 848 DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node) 849 850 /* 851 * lookup session, bump ref if found. 852 * 853 * called under mdsc->mutex. 854 */ 855 static struct ceph_mds_request * 856 lookup_get_request(struct ceph_mds_client *mdsc, u64 tid) 857 { 858 struct ceph_mds_request *req; 859 860 req = lookup_request(&mdsc->request_tree, tid); 861 if (req) 862 ceph_mdsc_get_request(req); 863 864 return req; 865 } 866 867 /* 868 * Register an in-flight request, and assign a tid. Link to directory 869 * are modifying (if any). 870 * 871 * Called under mdsc->mutex. 
872 */ 873 static void __register_request(struct ceph_mds_client *mdsc, 874 struct ceph_mds_request *req, 875 struct inode *dir) 876 { 877 int ret = 0; 878 879 req->r_tid = ++mdsc->last_tid; 880 if (req->r_num_caps) { 881 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, 882 req->r_num_caps); 883 if (ret < 0) { 884 pr_err("__register_request %p " 885 "failed to reserve caps: %d\n", req, ret); 886 /* set req->r_err to fail early from __do_request */ 887 req->r_err = ret; 888 return; 889 } 890 } 891 dout("__register_request %p tid %lld\n", req, req->r_tid); 892 ceph_mdsc_get_request(req); 893 insert_request(&mdsc->request_tree, req); 894 895 req->r_uid = current_fsuid(); 896 req->r_gid = current_fsgid(); 897 898 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) 899 mdsc->oldest_tid = req->r_tid; 900 901 if (dir) { 902 struct ceph_inode_info *ci = ceph_inode(dir); 903 904 ihold(dir); 905 req->r_unsafe_dir = dir; 906 spin_lock(&ci->i_unsafe_lock); 907 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 908 spin_unlock(&ci->i_unsafe_lock); 909 } 910 } 911 912 static void __unregister_request(struct ceph_mds_client *mdsc, 913 struct ceph_mds_request *req) 914 { 915 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 916 917 /* Never leave an unregistered request on an unsafe list! */ 918 list_del_init(&req->r_unsafe_item); 919 920 if (req->r_tid == mdsc->oldest_tid) { 921 struct rb_node *p = rb_next(&req->r_node); 922 mdsc->oldest_tid = 0; 923 while (p) { 924 struct ceph_mds_request *next_req = 925 rb_entry(p, struct ceph_mds_request, r_node); 926 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 927 mdsc->oldest_tid = next_req->r_tid; 928 break; 929 } 930 p = rb_next(p); 931 } 932 } 933 934 erase_request(&mdsc->request_tree, req); 935 936 if (req->r_unsafe_dir) { 937 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 938 spin_lock(&ci->i_unsafe_lock); 939 list_del_init(&req->r_unsafe_dir_item); 940 spin_unlock(&ci->i_unsafe_lock); 941 } 942 if (req->r_target_inode && 943 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 944 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 945 spin_lock(&ci->i_unsafe_lock); 946 list_del_init(&req->r_unsafe_target_item); 947 spin_unlock(&ci->i_unsafe_lock); 948 } 949 950 if (req->r_unsafe_dir) { 951 /* avoid calling iput_final() in mds dispatch threads */ 952 ceph_async_iput(req->r_unsafe_dir); 953 req->r_unsafe_dir = NULL; 954 } 955 956 complete_all(&req->r_safe_completion); 957 958 ceph_mdsc_put_request(req); 959 } 960 961 /* 962 * Walk back up the dentry tree until we hit a dentry representing a 963 * non-snapshot inode. We do this using the rcu_read_lock (which must be held 964 * when calling this) to ensure that the objects won't disappear while we're 965 * working with them. Once we hit a candidate dentry, we attempt to take a 966 * reference to it, and return that as the result. 967 */ 968 static struct inode *get_nonsnap_parent(struct dentry *dentry) 969 { 970 struct inode *inode = NULL; 971 972 while (dentry && !IS_ROOT(dentry)) { 973 inode = d_inode_rcu(dentry); 974 if (!inode || ceph_snap(inode) == CEPH_NOSNAP) 975 break; 976 dentry = dentry->d_parent; 977 } 978 if (inode) 979 inode = igrab(inode); 980 return inode; 981 } 982 983 /* 984 * Choose mds to send request to next. If there is a hint set in the 985 * request (e.g., due to a prior forward hint from the mds), use that. 986 * Otherwise, consult frag tree and/or caps to identify the 987 * appropriate mds. 
If all else fails, choose randomly. 988 * 989 * Called under mdsc->mutex. 990 */ 991 static int __choose_mds(struct ceph_mds_client *mdsc, 992 struct ceph_mds_request *req, 993 bool *random) 994 { 995 struct inode *inode; 996 struct ceph_inode_info *ci; 997 struct ceph_cap *cap; 998 int mode = req->r_direct_mode; 999 int mds = -1; 1000 u32 hash = req->r_direct_hash; 1001 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); 1002 1003 if (random) 1004 *random = false; 1005 1006 /* 1007 * is there a specific mds we should try? ignore hint if we have 1008 * no session and the mds is not up (active or recovering). 1009 */ 1010 if (req->r_resend_mds >= 0 && 1011 (__have_session(mdsc, req->r_resend_mds) || 1012 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 1013 dout("%s using resend_mds mds%d\n", __func__, 1014 req->r_resend_mds); 1015 return req->r_resend_mds; 1016 } 1017 1018 if (mode == USE_RANDOM_MDS) 1019 goto random; 1020 1021 inode = NULL; 1022 if (req->r_inode) { 1023 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) { 1024 inode = req->r_inode; 1025 ihold(inode); 1026 } else { 1027 /* req->r_dentry is non-null for LSSNAP request */ 1028 rcu_read_lock(); 1029 inode = get_nonsnap_parent(req->r_dentry); 1030 rcu_read_unlock(); 1031 dout("%s using snapdir's parent %p\n", __func__, inode); 1032 } 1033 } else if (req->r_dentry) { 1034 /* ignore race with rename; old or new d_parent is okay */ 1035 struct dentry *parent; 1036 struct inode *dir; 1037 1038 rcu_read_lock(); 1039 parent = READ_ONCE(req->r_dentry->d_parent); 1040 dir = req->r_parent ? : d_inode_rcu(parent); 1041 1042 if (!dir || dir->i_sb != mdsc->fsc->sb) { 1043 /* not this fs or parent went negative */ 1044 inode = d_inode(req->r_dentry); 1045 if (inode) 1046 ihold(inode); 1047 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 1048 /* direct snapped/virtual snapdir requests 1049 * based on parent dir inode */ 1050 inode = get_nonsnap_parent(parent); 1051 dout("%s using nonsnap parent %p\n", __func__, inode); 1052 } else { 1053 /* dentry target */ 1054 inode = d_inode(req->r_dentry); 1055 if (!inode || mode == USE_AUTH_MDS) { 1056 /* dir + name */ 1057 inode = igrab(dir); 1058 hash = ceph_dentry_hash(dir, req->r_dentry); 1059 is_hash = true; 1060 } else { 1061 ihold(inode); 1062 } 1063 } 1064 rcu_read_unlock(); 1065 } 1066 1067 dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash, 1068 hash, mode); 1069 if (!inode) 1070 goto random; 1071 ci = ceph_inode(inode); 1072 1073 if (is_hash && S_ISDIR(inode->i_mode)) { 1074 struct ceph_inode_frag frag; 1075 int found; 1076 1077 ceph_choose_frag(ci, hash, &frag, &found); 1078 if (found) { 1079 if (mode == USE_ANY_MDS && frag.ndist > 0) { 1080 u8 r; 1081 1082 /* choose a random replica */ 1083 get_random_bytes(&r, 1); 1084 r %= frag.ndist; 1085 mds = frag.dist[r]; 1086 dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n", 1087 __func__, inode, ceph_vinop(inode), 1088 frag.frag, mds, (int)r, frag.ndist); 1089 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1090 CEPH_MDS_STATE_ACTIVE && 1091 !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds)) 1092 goto out; 1093 } 1094 1095 /* since this file/dir wasn't known to be 1096 * replicated, then we want to look for the 1097 * authoritative mds. 
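			 * frag.mds is the rank of the mds that is
			 * authoritative for this frag.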
*/ 1098 if (frag.mds >= 0) { 1099 /* choose auth mds */ 1100 mds = frag.mds; 1101 dout("%s %p %llx.%llx frag %u mds%d (auth)\n", 1102 __func__, inode, ceph_vinop(inode), 1103 frag.frag, mds); 1104 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1105 CEPH_MDS_STATE_ACTIVE) { 1106 if (mode == USE_ANY_MDS && 1107 !ceph_mdsmap_is_laggy(mdsc->mdsmap, 1108 mds)) 1109 goto out; 1110 } 1111 } 1112 mode = USE_AUTH_MDS; 1113 } 1114 } 1115 1116 spin_lock(&ci->i_ceph_lock); 1117 cap = NULL; 1118 if (mode == USE_AUTH_MDS) 1119 cap = ci->i_auth_cap; 1120 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 1121 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 1122 if (!cap) { 1123 spin_unlock(&ci->i_ceph_lock); 1124 ceph_async_iput(inode); 1125 goto random; 1126 } 1127 mds = cap->session->s_mds; 1128 dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__, 1129 inode, ceph_vinop(inode), mds, 1130 cap == ci->i_auth_cap ? "auth " : "", cap); 1131 spin_unlock(&ci->i_ceph_lock); 1132 out: 1133 /* avoid calling iput_final() while holding mdsc->mutex or 1134 * in mds dispatch threads */ 1135 ceph_async_iput(inode); 1136 return mds; 1137 1138 random: 1139 if (random) 1140 *random = true; 1141 1142 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 1143 dout("%s chose random mds%d\n", __func__, mds); 1144 return mds; 1145 } 1146 1147 1148 /* 1149 * session messages 1150 */ 1151 static struct ceph_msg *create_session_msg(u32 op, u64 seq) 1152 { 1153 struct ceph_msg *msg; 1154 struct ceph_mds_session_head *h; 1155 1156 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 1157 false); 1158 if (!msg) { 1159 pr_err("create_session_msg ENOMEM creating msg\n"); 1160 return NULL; 1161 } 1162 h = msg->front.iov_base; 1163 h->op = cpu_to_le32(op); 1164 h->seq = cpu_to_le64(seq); 1165 1166 return msg; 1167 } 1168 1169 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED; 1170 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8) 1171 static void encode_supported_features(void **p, void *end) 1172 { 1173 static const size_t count = ARRAY_SIZE(feature_bits); 1174 1175 if (count > 0) { 1176 size_t i; 1177 size_t size = FEATURE_BYTES(count); 1178 1179 BUG_ON(*p + 4 + size > end); 1180 ceph_encode_32(p, size); 1181 memset(*p, 0, size); 1182 for (i = 0; i < count; i++) 1183 ((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8); 1184 *p += size; 1185 } else { 1186 BUG_ON(*p + 4 > end); 1187 ceph_encode_32(p, 0); 1188 } 1189 } 1190 1191 /* 1192 * session message, specialization for CEPH_SESSION_REQUEST_OPEN 1193 * to include additional client metadata fields. 1194 */ 1195 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq) 1196 { 1197 struct ceph_msg *msg; 1198 struct ceph_mds_session_head *h; 1199 int i = -1; 1200 int extra_bytes = 0; 1201 int metadata_key_count = 0; 1202 struct ceph_options *opt = mdsc->fsc->client->options; 1203 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; 1204 size_t size, count; 1205 void *p, *end; 1206 1207 const char* metadata[][2] = { 1208 {"hostname", mdsc->nodename}, 1209 {"kernel_version", init_utsname()->release}, 1210 {"entity_id", opt->name ? : ""}, 1211 {"root", fsopt->server_path ? 
: "/"}, 1212 {NULL, NULL} 1213 }; 1214 1215 /* Calculate serialized length of metadata */ 1216 extra_bytes = 4; /* map length */ 1217 for (i = 0; metadata[i][0]; ++i) { 1218 extra_bytes += 8 + strlen(metadata[i][0]) + 1219 strlen(metadata[i][1]); 1220 metadata_key_count++; 1221 } 1222 1223 /* supported feature */ 1224 size = 0; 1225 count = ARRAY_SIZE(feature_bits); 1226 if (count > 0) 1227 size = FEATURE_BYTES(count); 1228 extra_bytes += 4 + size; 1229 1230 /* Allocate the message */ 1231 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, 1232 GFP_NOFS, false); 1233 if (!msg) { 1234 pr_err("create_session_msg ENOMEM creating msg\n"); 1235 return NULL; 1236 } 1237 p = msg->front.iov_base; 1238 end = p + msg->front.iov_len; 1239 1240 h = p; 1241 h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN); 1242 h->seq = cpu_to_le64(seq); 1243 1244 /* 1245 * Serialize client metadata into waiting buffer space, using 1246 * the format that userspace expects for map<string, string> 1247 * 1248 * ClientSession messages with metadata are v3 1249 */ 1250 msg->hdr.version = cpu_to_le16(3); 1251 msg->hdr.compat_version = cpu_to_le16(1); 1252 1253 /* The write pointer, following the session_head structure */ 1254 p += sizeof(*h); 1255 1256 /* Number of entries in the map */ 1257 ceph_encode_32(&p, metadata_key_count); 1258 1259 /* Two length-prefixed strings for each entry in the map */ 1260 for (i = 0; metadata[i][0]; ++i) { 1261 size_t const key_len = strlen(metadata[i][0]); 1262 size_t const val_len = strlen(metadata[i][1]); 1263 1264 ceph_encode_32(&p, key_len); 1265 memcpy(p, metadata[i][0], key_len); 1266 p += key_len; 1267 ceph_encode_32(&p, val_len); 1268 memcpy(p, metadata[i][1], val_len); 1269 p += val_len; 1270 } 1271 1272 encode_supported_features(&p, end); 1273 msg->front.iov_len = p - msg->front.iov_base; 1274 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1275 1276 return msg; 1277 } 1278 1279 /* 1280 * send session open request. 1281 * 1282 * called under mdsc->mutex 1283 */ 1284 static int __open_session(struct ceph_mds_client *mdsc, 1285 struct ceph_mds_session *session) 1286 { 1287 struct ceph_msg *msg; 1288 int mstate; 1289 int mds = session->s_mds; 1290 1291 /* wait for mds to go active? 
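	 * (we do not block here: the current state is only logged and the
	 *  open request is sent regardless)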
*/ 1292 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 1293 dout("open_session to mds%d (%s)\n", mds, 1294 ceph_mds_state_name(mstate)); 1295 session->s_state = CEPH_MDS_SESSION_OPENING; 1296 session->s_renew_requested = jiffies; 1297 1298 /* send connect message */ 1299 msg = create_session_open_msg(mdsc, session->s_seq); 1300 if (!msg) 1301 return -ENOMEM; 1302 ceph_con_send(&session->s_con, msg); 1303 return 0; 1304 } 1305 1306 /* 1307 * open sessions for any export targets for the given mds 1308 * 1309 * called under mdsc->mutex 1310 */ 1311 static struct ceph_mds_session * 1312 __open_export_target_session(struct ceph_mds_client *mdsc, int target) 1313 { 1314 struct ceph_mds_session *session; 1315 1316 session = __ceph_lookup_mds_session(mdsc, target); 1317 if (!session) { 1318 session = register_session(mdsc, target); 1319 if (IS_ERR(session)) 1320 return session; 1321 } 1322 if (session->s_state == CEPH_MDS_SESSION_NEW || 1323 session->s_state == CEPH_MDS_SESSION_CLOSING) 1324 __open_session(mdsc, session); 1325 1326 return session; 1327 } 1328 1329 struct ceph_mds_session * 1330 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) 1331 { 1332 struct ceph_mds_session *session; 1333 1334 dout("open_export_target_session to mds%d\n", target); 1335 1336 mutex_lock(&mdsc->mutex); 1337 session = __open_export_target_session(mdsc, target); 1338 mutex_unlock(&mdsc->mutex); 1339 1340 return session; 1341 } 1342 1343 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 1344 struct ceph_mds_session *session) 1345 { 1346 struct ceph_mds_info *mi; 1347 struct ceph_mds_session *ts; 1348 int i, mds = session->s_mds; 1349 1350 if (mds >= mdsc->mdsmap->possible_max_rank) 1351 return; 1352 1353 mi = &mdsc->mdsmap->m_info[mds]; 1354 dout("open_export_target_sessions for mds%d (%d targets)\n", 1355 session->s_mds, mi->num_export_targets); 1356 1357 for (i = 0; i < mi->num_export_targets; i++) { 1358 ts = __open_export_target_session(mdsc, mi->export_targets[i]); 1359 if (!IS_ERR(ts)) 1360 ceph_put_mds_session(ts); 1361 } 1362 } 1363 1364 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 1365 struct ceph_mds_session *session) 1366 { 1367 mutex_lock(&mdsc->mutex); 1368 __open_export_target_sessions(mdsc, session); 1369 mutex_unlock(&mdsc->mutex); 1370 } 1371 1372 /* 1373 * session caps 1374 */ 1375 1376 static void detach_cap_releases(struct ceph_mds_session *session, 1377 struct list_head *target) 1378 { 1379 lockdep_assert_held(&session->s_cap_lock); 1380 1381 list_splice_init(&session->s_cap_releases, target); 1382 session->s_num_cap_releases = 0; 1383 dout("dispose_cap_releases mds%d\n", session->s_mds); 1384 } 1385 1386 static void dispose_cap_releases(struct ceph_mds_client *mdsc, 1387 struct list_head *dispose) 1388 { 1389 while (!list_empty(dispose)) { 1390 struct ceph_cap *cap; 1391 /* zero out the in-progress message */ 1392 cap = list_first_entry(dispose, struct ceph_cap, session_caps); 1393 list_del(&cap->session_caps); 1394 ceph_put_cap(mdsc, cap); 1395 } 1396 } 1397 1398 static void cleanup_session_requests(struct ceph_mds_client *mdsc, 1399 struct ceph_mds_session *session) 1400 { 1401 struct ceph_mds_request *req; 1402 struct rb_node *p; 1403 struct ceph_inode_info *ci; 1404 1405 dout("cleanup_session_requests mds%d\n", session->s_mds); 1406 mutex_lock(&mdsc->mutex); 1407 while (!list_empty(&session->s_unsafe)) { 1408 req = list_first_entry(&session->s_unsafe, 1409 struct ceph_mds_request, r_unsafe_item); 1410 
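		/* this request never got a safe (committed) reply; warn and
		 * flag the affected inodes with -EIO via i_meta_err below */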
pr_warn_ratelimited(" dropping unsafe request %llu\n", 1411 req->r_tid); 1412 if (req->r_target_inode) { 1413 /* dropping unsafe change of inode's attributes */ 1414 ci = ceph_inode(req->r_target_inode); 1415 errseq_set(&ci->i_meta_err, -EIO); 1416 } 1417 if (req->r_unsafe_dir) { 1418 /* dropping unsafe directory operation */ 1419 ci = ceph_inode(req->r_unsafe_dir); 1420 errseq_set(&ci->i_meta_err, -EIO); 1421 } 1422 __unregister_request(mdsc, req); 1423 } 1424 /* zero r_attempts, so kick_requests() will re-send requests */ 1425 p = rb_first(&mdsc->request_tree); 1426 while (p) { 1427 req = rb_entry(p, struct ceph_mds_request, r_node); 1428 p = rb_next(p); 1429 if (req->r_session && 1430 req->r_session->s_mds == session->s_mds) 1431 req->r_attempts = 0; 1432 } 1433 mutex_unlock(&mdsc->mutex); 1434 } 1435 1436 /* 1437 * Helper to safely iterate over all caps associated with a session, with 1438 * special care taken to handle a racing __ceph_remove_cap(). 1439 * 1440 * Caller must hold session s_mutex. 1441 */ 1442 int ceph_iterate_session_caps(struct ceph_mds_session *session, 1443 int (*cb)(struct inode *, struct ceph_cap *, 1444 void *), void *arg) 1445 { 1446 struct list_head *p; 1447 struct ceph_cap *cap; 1448 struct inode *inode, *last_inode = NULL; 1449 struct ceph_cap *old_cap = NULL; 1450 int ret; 1451 1452 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 1453 spin_lock(&session->s_cap_lock); 1454 p = session->s_caps.next; 1455 while (p != &session->s_caps) { 1456 cap = list_entry(p, struct ceph_cap, session_caps); 1457 inode = igrab(&cap->ci->vfs_inode); 1458 if (!inode) { 1459 p = p->next; 1460 continue; 1461 } 1462 session->s_cap_iterator = cap; 1463 spin_unlock(&session->s_cap_lock); 1464 1465 if (last_inode) { 1466 /* avoid calling iput_final() while holding 1467 * s_mutex or in mds dispatch threads */ 1468 ceph_async_iput(last_inode); 1469 last_inode = NULL; 1470 } 1471 if (old_cap) { 1472 ceph_put_cap(session->s_mdsc, old_cap); 1473 old_cap = NULL; 1474 } 1475 1476 ret = cb(inode, cap, arg); 1477 last_inode = inode; 1478 1479 spin_lock(&session->s_cap_lock); 1480 p = p->next; 1481 if (!cap->ci) { 1482 dout("iterate_session_caps finishing cap %p removal\n", 1483 cap); 1484 BUG_ON(cap->session != session); 1485 cap->session = NULL; 1486 list_del_init(&cap->session_caps); 1487 session->s_nr_caps--; 1488 if (cap->queue_release) 1489 __ceph_queue_cap_release(session, cap); 1490 else 1491 old_cap = cap; /* put_cap it w/o locks held */ 1492 } 1493 if (ret < 0) 1494 goto out; 1495 } 1496 ret = 0; 1497 out: 1498 session->s_cap_iterator = NULL; 1499 spin_unlock(&session->s_cap_lock); 1500 1501 ceph_async_iput(last_inode); 1502 if (old_cap) 1503 ceph_put_cap(session->s_mdsc, old_cap); 1504 1505 return ret; 1506 } 1507 1508 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 1509 void *arg) 1510 { 1511 struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg; 1512 struct ceph_inode_info *ci = ceph_inode(inode); 1513 LIST_HEAD(to_remove); 1514 bool dirty_dropped = false; 1515 bool invalidate = false; 1516 1517 dout("removing cap %p, ci is %p, inode is %p\n", 1518 cap, ci, &ci->vfs_inode); 1519 spin_lock(&ci->i_ceph_lock); 1520 __ceph_remove_cap(cap, false); 1521 if (!ci->i_auth_cap) { 1522 struct ceph_cap_flush *cf; 1523 struct ceph_mds_client *mdsc = fsc->mdsc; 1524 1525 if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 1526 if (inode->i_data.nrpages > 0) 1527 invalidate = true; 1528 if (ci->i_wrbuffer_ref > 0) 1529 
mapping_set_error(&inode->i_data, -EIO); 1530 } 1531 1532 while (!list_empty(&ci->i_cap_flush_list)) { 1533 cf = list_first_entry(&ci->i_cap_flush_list, 1534 struct ceph_cap_flush, i_list); 1535 list_move(&cf->i_list, &to_remove); 1536 } 1537 1538 spin_lock(&mdsc->cap_dirty_lock); 1539 1540 list_for_each_entry(cf, &to_remove, i_list) 1541 list_del(&cf->g_list); 1542 1543 if (!list_empty(&ci->i_dirty_item)) { 1544 pr_warn_ratelimited( 1545 " dropping dirty %s state for %p %lld\n", 1546 ceph_cap_string(ci->i_dirty_caps), 1547 inode, ceph_ino(inode)); 1548 ci->i_dirty_caps = 0; 1549 list_del_init(&ci->i_dirty_item); 1550 dirty_dropped = true; 1551 } 1552 if (!list_empty(&ci->i_flushing_item)) { 1553 pr_warn_ratelimited( 1554 " dropping dirty+flushing %s state for %p %lld\n", 1555 ceph_cap_string(ci->i_flushing_caps), 1556 inode, ceph_ino(inode)); 1557 ci->i_flushing_caps = 0; 1558 list_del_init(&ci->i_flushing_item); 1559 mdsc->num_cap_flushing--; 1560 dirty_dropped = true; 1561 } 1562 spin_unlock(&mdsc->cap_dirty_lock); 1563 1564 if (dirty_dropped) { 1565 errseq_set(&ci->i_meta_err, -EIO); 1566 1567 if (ci->i_wrbuffer_ref_head == 0 && 1568 ci->i_wr_ref == 0 && 1569 ci->i_dirty_caps == 0 && 1570 ci->i_flushing_caps == 0) { 1571 ceph_put_snap_context(ci->i_head_snapc); 1572 ci->i_head_snapc = NULL; 1573 } 1574 } 1575 1576 if (atomic_read(&ci->i_filelock_ref) > 0) { 1577 /* make further file lock syscall return -EIO */ 1578 ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK; 1579 pr_warn_ratelimited(" dropping file locks for %p %lld\n", 1580 inode, ceph_ino(inode)); 1581 } 1582 1583 if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { 1584 list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove); 1585 ci->i_prealloc_cap_flush = NULL; 1586 } 1587 } 1588 spin_unlock(&ci->i_ceph_lock); 1589 while (!list_empty(&to_remove)) { 1590 struct ceph_cap_flush *cf; 1591 cf = list_first_entry(&to_remove, 1592 struct ceph_cap_flush, i_list); 1593 list_del(&cf->i_list); 1594 ceph_free_cap_flush(cf); 1595 } 1596 1597 wake_up_all(&ci->i_cap_wq); 1598 if (invalidate) 1599 ceph_queue_invalidate(inode); 1600 if (dirty_dropped) 1601 iput(inode); 1602 return 0; 1603 } 1604 1605 /* 1606 * caller must hold session s_mutex 1607 */ 1608 static void remove_session_caps(struct ceph_mds_session *session) 1609 { 1610 struct ceph_fs_client *fsc = session->s_mdsc->fsc; 1611 struct super_block *sb = fsc->sb; 1612 LIST_HEAD(dispose); 1613 1614 dout("remove_session_caps on %p\n", session); 1615 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc); 1616 1617 wake_up_all(&fsc->mdsc->cap_flushing_wq); 1618 1619 spin_lock(&session->s_cap_lock); 1620 if (session->s_nr_caps > 0) { 1621 struct inode *inode; 1622 struct ceph_cap *cap, *prev = NULL; 1623 struct ceph_vino vino; 1624 /* 1625 * iterate_session_caps() skips inodes that are being 1626 * deleted, we need to wait until deletions are complete. 1627 * __wait_on_freeing_inode() is designed for the job, 1628 * but it is not exported, so use lookup inode function 1629 * to access it. 
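		 * If the same cap is still at the head of the list after the
		 * lookup, we have made no progress, so stop rather than spin.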
1630 */ 1631 while (!list_empty(&session->s_caps)) { 1632 cap = list_entry(session->s_caps.next, 1633 struct ceph_cap, session_caps); 1634 if (cap == prev) 1635 break; 1636 prev = cap; 1637 vino = cap->ci->i_vino; 1638 spin_unlock(&session->s_cap_lock); 1639 1640 inode = ceph_find_inode(sb, vino); 1641 /* avoid calling iput_final() while holding s_mutex */ 1642 ceph_async_iput(inode); 1643 1644 spin_lock(&session->s_cap_lock); 1645 } 1646 } 1647 1648 // drop cap expires and unlock s_cap_lock 1649 detach_cap_releases(session, &dispose); 1650 1651 BUG_ON(session->s_nr_caps > 0); 1652 BUG_ON(!list_empty(&session->s_cap_flushing)); 1653 spin_unlock(&session->s_cap_lock); 1654 dispose_cap_releases(session->s_mdsc, &dispose); 1655 } 1656 1657 enum { 1658 RECONNECT, 1659 RENEWCAPS, 1660 FORCE_RO, 1661 }; 1662 1663 /* 1664 * wake up any threads waiting on this session's caps. if the cap is 1665 * old (didn't get renewed on the client reconnect), remove it now. 1666 * 1667 * caller must hold s_mutex. 1668 */ 1669 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, 1670 void *arg) 1671 { 1672 struct ceph_inode_info *ci = ceph_inode(inode); 1673 unsigned long ev = (unsigned long)arg; 1674 1675 if (ev == RECONNECT) { 1676 spin_lock(&ci->i_ceph_lock); 1677 ci->i_wanted_max_size = 0; 1678 ci->i_requested_max_size = 0; 1679 spin_unlock(&ci->i_ceph_lock); 1680 } else if (ev == RENEWCAPS) { 1681 if (cap->cap_gen < cap->session->s_cap_gen) { 1682 /* mds did not re-issue stale cap */ 1683 spin_lock(&ci->i_ceph_lock); 1684 cap->issued = cap->implemented = CEPH_CAP_PIN; 1685 spin_unlock(&ci->i_ceph_lock); 1686 } 1687 } else if (ev == FORCE_RO) { 1688 } 1689 wake_up_all(&ci->i_cap_wq); 1690 return 0; 1691 } 1692 1693 static void wake_up_session_caps(struct ceph_mds_session *session, int ev) 1694 { 1695 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 1696 ceph_iterate_session_caps(session, wake_up_session_cb, 1697 (void *)(unsigned long)ev); 1698 } 1699 1700 /* 1701 * Send periodic message to MDS renewing all currently held caps. The 1702 * ack will reset the expiration for all caps from this session. 1703 * 1704 * caller holds s_mutex 1705 */ 1706 static int send_renew_caps(struct ceph_mds_client *mdsc, 1707 struct ceph_mds_session *session) 1708 { 1709 struct ceph_msg *msg; 1710 int state; 1711 1712 if (time_after_eq(jiffies, session->s_cap_ttl) && 1713 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 1714 pr_info("mds%d caps stale\n", session->s_mds); 1715 session->s_renew_requested = jiffies; 1716 1717 /* do not try to renew caps until a recovering mds has reconnected 1718 * with its clients. 
*/ 1719 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 1720 if (state < CEPH_MDS_STATE_RECONNECT) { 1721 dout("send_renew_caps ignoring mds%d (%s)\n", 1722 session->s_mds, ceph_mds_state_name(state)); 1723 return 0; 1724 } 1725 1726 dout("send_renew_caps to mds%d (%s)\n", session->s_mds, 1727 ceph_mds_state_name(state)); 1728 msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 1729 ++session->s_renew_seq); 1730 if (!msg) 1731 return -ENOMEM; 1732 ceph_con_send(&session->s_con, msg); 1733 return 0; 1734 } 1735 1736 static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 1737 struct ceph_mds_session *session, u64 seq) 1738 { 1739 struct ceph_msg *msg; 1740 1741 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", 1742 session->s_mds, ceph_session_state_name(session->s_state), seq); 1743 msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 1744 if (!msg) 1745 return -ENOMEM; 1746 ceph_con_send(&session->s_con, msg); 1747 return 0; 1748 } 1749 1750 1751 /* 1752 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 1753 * 1754 * Called under session->s_mutex 1755 */ 1756 static void renewed_caps(struct ceph_mds_client *mdsc, 1757 struct ceph_mds_session *session, int is_renew) 1758 { 1759 int was_stale; 1760 int wake = 0; 1761 1762 spin_lock(&session->s_cap_lock); 1763 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 1764 1765 session->s_cap_ttl = session->s_renew_requested + 1766 mdsc->mdsmap->m_session_timeout*HZ; 1767 1768 if (was_stale) { 1769 if (time_before(jiffies, session->s_cap_ttl)) { 1770 pr_info("mds%d caps renewed\n", session->s_mds); 1771 wake = 1; 1772 } else { 1773 pr_info("mds%d caps still stale\n", session->s_mds); 1774 } 1775 } 1776 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n", 1777 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh", 1778 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 1779 spin_unlock(&session->s_cap_lock); 1780 1781 if (wake) 1782 wake_up_session_caps(session, RENEWCAPS); 1783 } 1784 1785 /* 1786 * send a session close request 1787 */ 1788 static int request_close_session(struct ceph_mds_client *mdsc, 1789 struct ceph_mds_session *session) 1790 { 1791 struct ceph_msg *msg; 1792 1793 dout("request_close_session mds%d state %s seq %lld\n", 1794 session->s_mds, ceph_session_state_name(session->s_state), 1795 session->s_seq); 1796 msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); 1797 if (!msg) 1798 return -ENOMEM; 1799 ceph_con_send(&session->s_con, msg); 1800 return 1; 1801 } 1802 1803 /* 1804 * Called with s_mutex held. 1805 */ 1806 static int __close_session(struct ceph_mds_client *mdsc, 1807 struct ceph_mds_session *session) 1808 { 1809 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 1810 return 0; 1811 session->s_state = CEPH_MDS_SESSION_CLOSING; 1812 return request_close_session(mdsc, session); 1813 } 1814 1815 static bool drop_negative_children(struct dentry *dentry) 1816 { 1817 struct dentry *child; 1818 bool all_negative = true; 1819 1820 if (!d_is_dir(dentry)) 1821 goto out; 1822 1823 spin_lock(&dentry->d_lock); 1824 list_for_each_entry(child, &dentry->d_subdirs, d_child) { 1825 if (d_really_is_positive(child)) { 1826 all_negative = false; 1827 break; 1828 } 1829 } 1830 spin_unlock(&dentry->d_lock); 1831 1832 if (all_negative) 1833 shrink_dcache_parent(dentry); 1834 out: 1835 return all_negative; 1836 } 1837 1838 /* 1839 * Trim old(er) caps. 
1840 * 1841 * Because we can't cache an inode without one or more caps, we do 1842 * this indirectly: if a cap is unused, we prune its aliases, at which 1843 * point the inode will hopefully get dropped to. 1844 * 1845 * Yes, this is a bit sloppy. Our only real goal here is to respond to 1846 * memory pressure from the MDS, though, so it needn't be perfect. 1847 */ 1848 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) 1849 { 1850 int *remaining = arg; 1851 struct ceph_inode_info *ci = ceph_inode(inode); 1852 int used, wanted, oissued, mine; 1853 1854 if (*remaining <= 0) 1855 return -1; 1856 1857 spin_lock(&ci->i_ceph_lock); 1858 mine = cap->issued | cap->implemented; 1859 used = __ceph_caps_used(ci); 1860 wanted = __ceph_caps_file_wanted(ci); 1861 oissued = __ceph_caps_issued_other(ci, cap); 1862 1863 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n", 1864 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), 1865 ceph_cap_string(used), ceph_cap_string(wanted)); 1866 if (cap == ci->i_auth_cap) { 1867 if (ci->i_dirty_caps || ci->i_flushing_caps || 1868 !list_empty(&ci->i_cap_snaps)) 1869 goto out; 1870 if ((used | wanted) & CEPH_CAP_ANY_WR) 1871 goto out; 1872 /* Note: it's possible that i_filelock_ref becomes non-zero 1873 * after dropping auth caps. It doesn't hurt because reply 1874 * of lock mds request will re-add auth caps. */ 1875 if (atomic_read(&ci->i_filelock_ref) > 0) 1876 goto out; 1877 } 1878 /* The inode has cached pages, but it's no longer used. 1879 * we can safely drop it */ 1880 if (S_ISREG(inode->i_mode) && 1881 wanted == 0 && used == CEPH_CAP_FILE_CACHE && 1882 !(oissued & CEPH_CAP_FILE_CACHE)) { 1883 used = 0; 1884 oissued = 0; 1885 } 1886 if ((used | wanted) & ~oissued & mine) 1887 goto out; /* we need these caps */ 1888 1889 if (oissued) { 1890 /* we aren't the only cap.. just remove us */ 1891 __ceph_remove_cap(cap, true); 1892 (*remaining)--; 1893 } else { 1894 struct dentry *dentry; 1895 /* try dropping referring dentries */ 1896 spin_unlock(&ci->i_ceph_lock); 1897 dentry = d_find_any_alias(inode); 1898 if (dentry && drop_negative_children(dentry)) { 1899 int count; 1900 dput(dentry); 1901 d_prune_aliases(inode); 1902 count = atomic_read(&inode->i_count); 1903 if (count == 1) 1904 (*remaining)--; 1905 dout("trim_caps_cb %p cap %p pruned, count now %d\n", 1906 inode, cap, count); 1907 } else { 1908 dput(dentry); 1909 } 1910 return 0; 1911 } 1912 1913 out: 1914 spin_unlock(&ci->i_ceph_lock); 1915 return 0; 1916 } 1917 1918 /* 1919 * Trim session cap count down to some max number. 
1920 */ 1921 int ceph_trim_caps(struct ceph_mds_client *mdsc, 1922 struct ceph_mds_session *session, 1923 int max_caps) 1924 { 1925 int trim_caps = session->s_nr_caps - max_caps; 1926 1927 dout("trim_caps mds%d start: %d / %d, trim %d\n", 1928 session->s_mds, session->s_nr_caps, max_caps, trim_caps); 1929 if (trim_caps > 0) { 1930 int remaining = trim_caps; 1931 1932 ceph_iterate_session_caps(session, trim_caps_cb, &remaining); 1933 dout("trim_caps mds%d done: %d / %d, trimmed %d\n", 1934 session->s_mds, session->s_nr_caps, max_caps, 1935 trim_caps - remaining); 1936 } 1937 1938 ceph_flush_cap_releases(mdsc, session); 1939 return 0; 1940 } 1941 1942 static int check_caps_flush(struct ceph_mds_client *mdsc, 1943 u64 want_flush_tid) 1944 { 1945 int ret = 1; 1946 1947 spin_lock(&mdsc->cap_dirty_lock); 1948 if (!list_empty(&mdsc->cap_flush_list)) { 1949 struct ceph_cap_flush *cf = 1950 list_first_entry(&mdsc->cap_flush_list, 1951 struct ceph_cap_flush, g_list); 1952 if (cf->tid <= want_flush_tid) { 1953 dout("check_caps_flush still flushing tid " 1954 "%llu <= %llu\n", cf->tid, want_flush_tid); 1955 ret = 0; 1956 } 1957 } 1958 spin_unlock(&mdsc->cap_dirty_lock); 1959 return ret; 1960 } 1961 1962 /* 1963 * flush all dirty inode data to disk. 1964 * 1965 * returns true if we've flushed through want_flush_tid 1966 */ 1967 static void wait_caps_flush(struct ceph_mds_client *mdsc, 1968 u64 want_flush_tid) 1969 { 1970 dout("check_caps_flush want %llu\n", want_flush_tid); 1971 1972 wait_event(mdsc->cap_flushing_wq, 1973 check_caps_flush(mdsc, want_flush_tid)); 1974 1975 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); 1976 } 1977 1978 /* 1979 * called under s_mutex 1980 */ 1981 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 1982 struct ceph_mds_session *session) 1983 { 1984 struct ceph_msg *msg = NULL; 1985 struct ceph_mds_cap_release *head; 1986 struct ceph_mds_cap_item *item; 1987 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 1988 struct ceph_cap *cap; 1989 LIST_HEAD(tmp_list); 1990 int num_cap_releases; 1991 __le32 barrier, *cap_barrier; 1992 1993 down_read(&osdc->lock); 1994 barrier = cpu_to_le32(osdc->epoch_barrier); 1995 up_read(&osdc->lock); 1996 1997 spin_lock(&session->s_cap_lock); 1998 again: 1999 list_splice_init(&session->s_cap_releases, &tmp_list); 2000 num_cap_releases = session->s_num_cap_releases; 2001 session->s_num_cap_releases = 0; 2002 spin_unlock(&session->s_cap_lock); 2003 2004 while (!list_empty(&tmp_list)) { 2005 if (!msg) { 2006 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 2007 PAGE_SIZE, GFP_NOFS, false); 2008 if (!msg) 2009 goto out_err; 2010 head = msg->front.iov_base; 2011 head->num = cpu_to_le32(0); 2012 msg->front.iov_len = sizeof(*head); 2013 2014 msg->hdr.version = cpu_to_le16(2); 2015 msg->hdr.compat_version = cpu_to_le16(1); 2016 } 2017 2018 cap = list_first_entry(&tmp_list, struct ceph_cap, 2019 session_caps); 2020 list_del(&cap->session_caps); 2021 num_cap_releases--; 2022 2023 head = msg->front.iov_base; 2024 put_unaligned_le32(get_unaligned_le32(&head->num) + 1, 2025 &head->num); 2026 item = msg->front.iov_base + msg->front.iov_len; 2027 item->ino = cpu_to_le64(cap->cap_ino); 2028 item->cap_id = cpu_to_le64(cap->cap_id); 2029 item->migrate_seq = cpu_to_le32(cap->mseq); 2030 item->seq = cpu_to_le32(cap->issue_seq); 2031 msg->front.iov_len += sizeof(*item); 2032 2033 ceph_put_cap(mdsc, cap); 2034 2035 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 2036 // Append cap_barrier field 2037 cap_barrier = 
msg->front.iov_base + msg->front.iov_len; 2038 *cap_barrier = barrier; 2039 msg->front.iov_len += sizeof(*cap_barrier); 2040 2041 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2042 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2043 ceph_con_send(&session->s_con, msg); 2044 msg = NULL; 2045 } 2046 } 2047 2048 BUG_ON(num_cap_releases != 0); 2049 2050 spin_lock(&session->s_cap_lock); 2051 if (!list_empty(&session->s_cap_releases)) 2052 goto again; 2053 spin_unlock(&session->s_cap_lock); 2054 2055 if (msg) { 2056 // Append cap_barrier field 2057 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2058 *cap_barrier = barrier; 2059 msg->front.iov_len += sizeof(*cap_barrier); 2060 2061 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2062 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2063 ceph_con_send(&session->s_con, msg); 2064 } 2065 return; 2066 out_err: 2067 pr_err("send_cap_releases mds%d, failed to allocate message\n", 2068 session->s_mds); 2069 spin_lock(&session->s_cap_lock); 2070 list_splice(&tmp_list, &session->s_cap_releases); 2071 session->s_num_cap_releases += num_cap_releases; 2072 spin_unlock(&session->s_cap_lock); 2073 } 2074 2075 static void ceph_cap_release_work(struct work_struct *work) 2076 { 2077 struct ceph_mds_session *session = 2078 container_of(work, struct ceph_mds_session, s_cap_release_work); 2079 2080 mutex_lock(&session->s_mutex); 2081 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2082 session->s_state == CEPH_MDS_SESSION_HUNG) 2083 ceph_send_cap_releases(session->s_mdsc, session); 2084 mutex_unlock(&session->s_mutex); 2085 ceph_put_mds_session(session); 2086 } 2087 2088 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, 2089 struct ceph_mds_session *session) 2090 { 2091 if (mdsc->stopping) 2092 return; 2093 2094 ceph_get_mds_session(session); 2095 if (queue_work(mdsc->fsc->cap_wq, 2096 &session->s_cap_release_work)) { 2097 dout("cap release work queued\n"); 2098 } else { 2099 ceph_put_mds_session(session); 2100 dout("failed to queue cap release work\n"); 2101 } 2102 } 2103 2104 /* 2105 * caller holds session->s_cap_lock 2106 */ 2107 void __ceph_queue_cap_release(struct ceph_mds_session *session, 2108 struct ceph_cap *cap) 2109 { 2110 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2111 session->s_num_cap_releases++; 2112 2113 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2114 ceph_flush_cap_releases(session->s_mdsc, session); 2115 } 2116 2117 static void ceph_cap_reclaim_work(struct work_struct *work) 2118 { 2119 struct ceph_mds_client *mdsc = 2120 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2121 int ret = ceph_trim_dentries(mdsc); 2122 if (ret == -EAGAIN) 2123 ceph_queue_cap_reclaim_work(mdsc); 2124 } 2125 2126 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2127 { 2128 if (mdsc->stopping) 2129 return; 2130 2131 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2132 dout("caps reclaim work queued\n"); 2133 } else { 2134 dout("failed to queue caps release work\n"); 2135 } 2136 } 2137 2138 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) 2139 { 2140 int val; 2141 if (!nr) 2142 return; 2143 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2144 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2145 atomic_set(&mdsc->cap_reclaim_pending, 0); 2146 ceph_queue_cap_reclaim_work(mdsc); 2147 } 2148 } 2149 2150 /* 2151 * requests 2152 */ 2153 2154 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2155 struct inode *dir) 
2156 { 2157 struct ceph_inode_info *ci = ceph_inode(dir); 2158 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2159 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2160 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2161 unsigned int num_entries; 2162 int order; 2163 2164 spin_lock(&ci->i_ceph_lock); 2165 num_entries = ci->i_files + ci->i_subdirs; 2166 spin_unlock(&ci->i_ceph_lock); 2167 num_entries = max(num_entries, 1U); 2168 num_entries = min(num_entries, opt->max_readdir); 2169 2170 order = get_order(size * num_entries); 2171 while (order >= 0) { 2172 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2173 __GFP_NOWARN, 2174 order); 2175 if (rinfo->dir_entries) 2176 break; 2177 order--; 2178 } 2179 if (!rinfo->dir_entries) 2180 return -ENOMEM; 2181 2182 num_entries = (PAGE_SIZE << order) / size; 2183 num_entries = min(num_entries, opt->max_readdir); 2184 2185 rinfo->dir_buf_size = PAGE_SIZE << order; 2186 req->r_num_caps = num_entries + 1; 2187 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2188 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2189 return 0; 2190 } 2191 2192 /* 2193 * Create an mds request. 2194 */ 2195 struct ceph_mds_request * 2196 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2197 { 2198 struct ceph_mds_request *req; 2199 2200 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2201 if (!req) 2202 return ERR_PTR(-ENOMEM); 2203 2204 mutex_init(&req->r_fill_mutex); 2205 req->r_mdsc = mdsc; 2206 req->r_started = jiffies; 2207 req->r_start_latency = ktime_get(); 2208 req->r_resend_mds = -1; 2209 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2210 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2211 req->r_fmode = -1; 2212 kref_init(&req->r_kref); 2213 RB_CLEAR_NODE(&req->r_node); 2214 INIT_LIST_HEAD(&req->r_wait); 2215 init_completion(&req->r_completion); 2216 init_completion(&req->r_safe_completion); 2217 INIT_LIST_HEAD(&req->r_unsafe_item); 2218 2219 ktime_get_coarse_real_ts64(&req->r_stamp); 2220 2221 req->r_op = op; 2222 req->r_direct_mode = mode; 2223 return req; 2224 } 2225 2226 /* 2227 * return oldest (lowest) request, tid in request tree, 0 if none. 2228 * 2229 * called under mdsc->mutex. 2230 */ 2231 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2232 { 2233 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2234 return NULL; 2235 return rb_entry(rb_first(&mdsc->request_tree), 2236 struct ceph_mds_request, r_node); 2237 } 2238 2239 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2240 { 2241 return mdsc->oldest_tid; 2242 } 2243 2244 /* 2245 * Build a dentry's path. Allocate on heap; caller must kfree. Based 2246 * on build_path_from_dentry in fs/cifs/dir.c. 2247 * 2248 * If @stop_on_nosnap, generate path relative to the first non-snapped 2249 * inode. 2250 * 2251 * Encode hidden .snap dirs as a double /, i.e. 
2252 * foo/.snap/bar -> foo//bar 2253 */ 2254 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase, 2255 int stop_on_nosnap) 2256 { 2257 struct dentry *temp; 2258 char *path; 2259 int pos; 2260 unsigned seq; 2261 u64 base; 2262 2263 if (!dentry) 2264 return ERR_PTR(-EINVAL); 2265 2266 path = __getname(); 2267 if (!path) 2268 return ERR_PTR(-ENOMEM); 2269 retry: 2270 pos = PATH_MAX - 1; 2271 path[pos] = '\0'; 2272 2273 seq = read_seqbegin(&rename_lock); 2274 rcu_read_lock(); 2275 temp = dentry; 2276 for (;;) { 2277 struct inode *inode; 2278 2279 spin_lock(&temp->d_lock); 2280 inode = d_inode(temp); 2281 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2282 dout("build_path path+%d: %p SNAPDIR\n", 2283 pos, temp); 2284 } else if (stop_on_nosnap && inode && dentry != temp && 2285 ceph_snap(inode) == CEPH_NOSNAP) { 2286 spin_unlock(&temp->d_lock); 2287 pos++; /* get rid of any prepended '/' */ 2288 break; 2289 } else { 2290 pos -= temp->d_name.len; 2291 if (pos < 0) { 2292 spin_unlock(&temp->d_lock); 2293 break; 2294 } 2295 memcpy(path + pos, temp->d_name.name, temp->d_name.len); 2296 } 2297 spin_unlock(&temp->d_lock); 2298 temp = READ_ONCE(temp->d_parent); 2299 2300 /* Are we at the root? */ 2301 if (IS_ROOT(temp)) 2302 break; 2303 2304 /* Are we out of buffer? */ 2305 if (--pos < 0) 2306 break; 2307 2308 path[pos] = '/'; 2309 } 2310 base = ceph_ino(d_inode(temp)); 2311 rcu_read_unlock(); 2312 2313 if (read_seqretry(&rename_lock, seq)) 2314 goto retry; 2315 2316 if (pos < 0) { 2317 /* 2318 * A rename didn't occur, but somehow we didn't end up where 2319 * we thought we would. Throw a warning and try again. 2320 */ 2321 pr_warn("build_path did not end path lookup where " 2322 "expected, pos is %d\n", pos); 2323 goto retry; 2324 } 2325 2326 *pbase = base; 2327 *plen = PATH_MAX - 1 - pos; 2328 dout("build_path on %p %d built %llx '%.*s'\n", 2329 dentry, d_count(dentry), base, *plen, path + pos); 2330 return path + pos; 2331 } 2332 2333 static int build_dentry_path(struct dentry *dentry, struct inode *dir, 2334 const char **ppath, int *ppathlen, u64 *pino, 2335 bool *pfreepath, bool parent_locked) 2336 { 2337 char *path; 2338 2339 rcu_read_lock(); 2340 if (!dir) 2341 dir = d_inode_rcu(dentry->d_parent); 2342 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) { 2343 *pino = ceph_ino(dir); 2344 rcu_read_unlock(); 2345 *ppath = dentry->d_name.name; 2346 *ppathlen = dentry->d_name.len; 2347 return 0; 2348 } 2349 rcu_read_unlock(); 2350 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2351 if (IS_ERR(path)) 2352 return PTR_ERR(path); 2353 *ppath = path; 2354 *pfreepath = true; 2355 return 0; 2356 } 2357 2358 static int build_inode_path(struct inode *inode, 2359 const char **ppath, int *ppathlen, u64 *pino, 2360 bool *pfreepath) 2361 { 2362 struct dentry *dentry; 2363 char *path; 2364 2365 if (ceph_snap(inode) == CEPH_NOSNAP) { 2366 *pino = ceph_ino(inode); 2367 *ppathlen = 0; 2368 return 0; 2369 } 2370 dentry = d_find_alias(inode); 2371 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2372 dput(dentry); 2373 if (IS_ERR(path)) 2374 return PTR_ERR(path); 2375 *ppath = path; 2376 *pfreepath = true; 2377 return 0; 2378 } 2379 2380 /* 2381 * request arguments may be specified via an inode *, a dentry *, or 2382 * an explicit ino+path. 
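 *
 * Only one form is used per call: an inode pointer takes precedence over a
 * dentry, and a dentry takes precedence over an explicit ino+path (see the
 * if/else chain below).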
2383 */ 2384 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, 2385 struct inode *rdiri, const char *rpath, 2386 u64 rino, const char **ppath, int *pathlen, 2387 u64 *ino, bool *freepath, bool parent_locked) 2388 { 2389 int r = 0; 2390 2391 if (rinode) { 2392 r = build_inode_path(rinode, ppath, pathlen, ino, freepath); 2393 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 2394 ceph_snap(rinode)); 2395 } else if (rdentry) { 2396 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino, 2397 freepath, parent_locked); 2398 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, 2399 *ppath); 2400 } else if (rpath || rino) { 2401 *ino = rino; 2402 *ppath = rpath; 2403 *pathlen = rpath ? strlen(rpath) : 0; 2404 dout(" path %.*s\n", *pathlen, rpath); 2405 } 2406 2407 return r; 2408 } 2409 2410 /* 2411 * called under mdsc->mutex 2412 */ 2413 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, 2414 struct ceph_mds_request *req, 2415 int mds, bool drop_cap_releases) 2416 { 2417 struct ceph_msg *msg; 2418 struct ceph_mds_request_head *head; 2419 const char *path1 = NULL; 2420 const char *path2 = NULL; 2421 u64 ino1 = 0, ino2 = 0; 2422 int pathlen1 = 0, pathlen2 = 0; 2423 bool freepath1 = false, freepath2 = false; 2424 int len; 2425 u16 releases; 2426 void *p, *end; 2427 int ret; 2428 2429 ret = set_request_path_attr(req->r_inode, req->r_dentry, 2430 req->r_parent, req->r_path1, req->r_ino1.ino, 2431 &path1, &pathlen1, &ino1, &freepath1, 2432 test_bit(CEPH_MDS_R_PARENT_LOCKED, 2433 &req->r_req_flags)); 2434 if (ret < 0) { 2435 msg = ERR_PTR(ret); 2436 goto out; 2437 } 2438 2439 /* If r_old_dentry is set, then assume that its parent is locked */ 2440 ret = set_request_path_attr(NULL, req->r_old_dentry, 2441 req->r_old_dentry_dir, 2442 req->r_path2, req->r_ino2.ino, 2443 &path2, &pathlen2, &ino2, &freepath2, true); 2444 if (ret < 0) { 2445 msg = ERR_PTR(ret); 2446 goto out_free1; 2447 } 2448 2449 len = sizeof(*head) + 2450 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + 2451 sizeof(struct ceph_timespec); 2452 2453 /* calculate (max) length for cap releases */ 2454 len += sizeof(struct ceph_mds_request_release) * 2455 (!!req->r_inode_drop + !!req->r_dentry_drop + 2456 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 2457 if (req->r_dentry_drop) 2458 len += pathlen1; 2459 if (req->r_old_dentry_drop) 2460 len += pathlen2; 2461 2462 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); 2463 if (!msg) { 2464 msg = ERR_PTR(-ENOMEM); 2465 goto out_free2; 2466 } 2467 2468 msg->hdr.version = cpu_to_le16(2); 2469 msg->hdr.tid = cpu_to_le64(req->r_tid); 2470 2471 head = msg->front.iov_base; 2472 p = msg->front.iov_base + sizeof(*head); 2473 end = msg->front.iov_base + msg->front.iov_len; 2474 2475 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 2476 head->op = cpu_to_le32(req->r_op); 2477 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid)); 2478 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid)); 2479 head->ino = cpu_to_le64(req->r_deleg_ino); 2480 head->args = req->r_args; 2481 2482 ceph_encode_filepath(&p, end, ino1, path1); 2483 ceph_encode_filepath(&p, end, ino2, path2); 2484 2485 /* make note of release offset, in case we need to replay */ 2486 req->r_request_release_offset = p - msg->front.iov_base; 2487 2488 /* cap releases */ 2489 releases = 0; 2490 if (req->r_inode_drop) 2491 releases += ceph_encode_inode_release(&p, 2492 req->r_inode ? 
req->r_inode : d_inode(req->r_dentry), 2493 mds, req->r_inode_drop, req->r_inode_unless, 2494 req->r_op == CEPH_MDS_OP_READDIR); 2495 if (req->r_dentry_drop) 2496 releases += ceph_encode_dentry_release(&p, req->r_dentry, 2497 req->r_parent, mds, req->r_dentry_drop, 2498 req->r_dentry_unless); 2499 if (req->r_old_dentry_drop) 2500 releases += ceph_encode_dentry_release(&p, req->r_old_dentry, 2501 req->r_old_dentry_dir, mds, 2502 req->r_old_dentry_drop, 2503 req->r_old_dentry_unless); 2504 if (req->r_old_inode_drop) 2505 releases += ceph_encode_inode_release(&p, 2506 d_inode(req->r_old_dentry), 2507 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 2508 2509 if (drop_cap_releases) { 2510 releases = 0; 2511 p = msg->front.iov_base + req->r_request_release_offset; 2512 } 2513 2514 head->num_releases = cpu_to_le16(releases); 2515 2516 /* time stamp */ 2517 { 2518 struct ceph_timespec ts; 2519 ceph_encode_timespec64(&ts, &req->r_stamp); 2520 ceph_encode_copy(&p, &ts, sizeof(ts)); 2521 } 2522 2523 BUG_ON(p > end); 2524 msg->front.iov_len = p - msg->front.iov_base; 2525 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2526 2527 if (req->r_pagelist) { 2528 struct ceph_pagelist *pagelist = req->r_pagelist; 2529 ceph_msg_data_add_pagelist(msg, pagelist); 2530 msg->hdr.data_len = cpu_to_le32(pagelist->length); 2531 } else { 2532 msg->hdr.data_len = 0; 2533 } 2534 2535 msg->hdr.data_off = cpu_to_le16(0); 2536 2537 out_free2: 2538 if (freepath2) 2539 ceph_mdsc_free_path((char *)path2, pathlen2); 2540 out_free1: 2541 if (freepath1) 2542 ceph_mdsc_free_path((char *)path1, pathlen1); 2543 out: 2544 return msg; 2545 } 2546 2547 /* 2548 * called under mdsc->mutex if error, under no mutex if 2549 * success. 2550 */ 2551 static void complete_request(struct ceph_mds_client *mdsc, 2552 struct ceph_mds_request *req) 2553 { 2554 req->r_end_latency = ktime_get(); 2555 2556 if (req->r_callback) 2557 req->r_callback(mdsc, req); 2558 complete_all(&req->r_completion); 2559 } 2560 2561 /* 2562 * called under mdsc->mutex 2563 */ 2564 static int __prepare_send_request(struct ceph_mds_client *mdsc, 2565 struct ceph_mds_request *req, 2566 int mds, bool drop_cap_releases) 2567 { 2568 struct ceph_mds_request_head *rhead; 2569 struct ceph_msg *msg; 2570 int flags = 0; 2571 2572 req->r_attempts++; 2573 if (req->r_inode) { 2574 struct ceph_cap *cap = 2575 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 2576 2577 if (cap) 2578 req->r_sent_on_mseq = cap->mseq; 2579 else 2580 req->r_sent_on_mseq = -1; 2581 } 2582 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, 2583 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 2584 2585 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 2586 void *p; 2587 /* 2588 * Replay. Do not regenerate message (and rebuild 2589 * paths, etc.); just use the original message. 2590 * Rebuilding paths will break for renames because 2591 * d_move mangles the src name. 
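 *
 * Only the fields that may legitimately change on a replay are patched
 * in place below: the REPLAY flag, the target ino, num_retry and the
 * timestamp, while any cap/dentry releases are dropped from the message.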
2592 */ 2593 msg = req->r_request; 2594 rhead = msg->front.iov_base; 2595 2596 flags = le32_to_cpu(rhead->flags); 2597 flags |= CEPH_MDS_FLAG_REPLAY; 2598 rhead->flags = cpu_to_le32(flags); 2599 2600 if (req->r_target_inode) 2601 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 2602 2603 rhead->num_retry = req->r_attempts - 1; 2604 2605 /* remove cap/dentry releases from message */ 2606 rhead->num_releases = 0; 2607 2608 /* time stamp */ 2609 p = msg->front.iov_base + req->r_request_release_offset; 2610 { 2611 struct ceph_timespec ts; 2612 ceph_encode_timespec64(&ts, &req->r_stamp); 2613 ceph_encode_copy(&p, &ts, sizeof(ts)); 2614 } 2615 2616 msg->front.iov_len = p - msg->front.iov_base; 2617 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2618 return 0; 2619 } 2620 2621 if (req->r_request) { 2622 ceph_msg_put(req->r_request); 2623 req->r_request = NULL; 2624 } 2625 msg = create_request_message(mdsc, req, mds, drop_cap_releases); 2626 if (IS_ERR(msg)) { 2627 req->r_err = PTR_ERR(msg); 2628 return PTR_ERR(msg); 2629 } 2630 req->r_request = msg; 2631 2632 rhead = msg->front.iov_base; 2633 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 2634 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2635 flags |= CEPH_MDS_FLAG_REPLAY; 2636 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) 2637 flags |= CEPH_MDS_FLAG_ASYNC; 2638 if (req->r_parent) 2639 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 2640 rhead->flags = cpu_to_le32(flags); 2641 rhead->num_fwd = req->r_num_fwd; 2642 rhead->num_retry = req->r_attempts - 1; 2643 2644 dout(" r_parent = %p\n", req->r_parent); 2645 return 0; 2646 } 2647 2648 /* 2649 * called under mdsc->mutex 2650 */ 2651 static int __send_request(struct ceph_mds_client *mdsc, 2652 struct ceph_mds_session *session, 2653 struct ceph_mds_request *req, 2654 bool drop_cap_releases) 2655 { 2656 int err; 2657 2658 err = __prepare_send_request(mdsc, req, session->s_mds, 2659 drop_cap_releases); 2660 if (!err) { 2661 ceph_msg_get(req->r_request); 2662 ceph_con_send(&session->s_con, req->r_request); 2663 } 2664 2665 return err; 2666 } 2667 2668 /* 2669 * send request, or put it on the appropriate wait list. 
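 *
 * Roughly: pick an mds with __choose_mds(); if there is no usable mds (or
 * no mdsmap yet), park the request on a waiting list; otherwise make sure a
 * session is open (opening one if needed) and hand the message to
 * __send_request().  Early failures complete the request with r_err set.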
2670 */ 2671 static void __do_request(struct ceph_mds_client *mdsc, 2672 struct ceph_mds_request *req) 2673 { 2674 struct ceph_mds_session *session = NULL; 2675 int mds = -1; 2676 int err = 0; 2677 bool random; 2678 2679 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 2680 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 2681 __unregister_request(mdsc, req); 2682 return; 2683 } 2684 2685 if (req->r_timeout && 2686 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 2687 dout("do_request timed out\n"); 2688 err = -ETIMEDOUT; 2689 goto finish; 2690 } 2691 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 2692 dout("do_request forced umount\n"); 2693 err = -EIO; 2694 goto finish; 2695 } 2696 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 2697 if (mdsc->mdsmap_err) { 2698 err = mdsc->mdsmap_err; 2699 dout("do_request mdsmap err %d\n", err); 2700 goto finish; 2701 } 2702 if (mdsc->mdsmap->m_epoch == 0) { 2703 dout("do_request no mdsmap, waiting for map\n"); 2704 list_add(&req->r_wait, &mdsc->waiting_for_map); 2705 return; 2706 } 2707 if (!(mdsc->fsc->mount_options->flags & 2708 CEPH_MOUNT_OPT_MOUNTWAIT) && 2709 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { 2710 err = -EHOSTUNREACH; 2711 goto finish; 2712 } 2713 } 2714 2715 put_request_session(req); 2716 2717 mds = __choose_mds(mdsc, req, &random); 2718 if (mds < 0 || 2719 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 2720 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2721 err = -EJUKEBOX; 2722 goto finish; 2723 } 2724 dout("do_request no mds or not active, waiting for map\n"); 2725 list_add(&req->r_wait, &mdsc->waiting_for_map); 2726 return; 2727 } 2728 2729 /* get, open session */ 2730 session = __ceph_lookup_mds_session(mdsc, mds); 2731 if (!session) { 2732 session = register_session(mdsc, mds); 2733 if (IS_ERR(session)) { 2734 err = PTR_ERR(session); 2735 goto finish; 2736 } 2737 } 2738 req->r_session = ceph_get_mds_session(session); 2739 2740 dout("do_request mds%d session %p state %s\n", mds, session, 2741 ceph_session_state_name(session->s_state)); 2742 if (session->s_state != CEPH_MDS_SESSION_OPEN && 2743 session->s_state != CEPH_MDS_SESSION_HUNG) { 2744 if (session->s_state == CEPH_MDS_SESSION_REJECTED) { 2745 err = -EACCES; 2746 goto out_session; 2747 } 2748 /* 2749 * We cannot queue async requests since the caps and delegated 2750 * inodes are bound to the session. Just return -EJUKEBOX and 2751 * let the caller retry a sync request in that case. 
2752 */ 2753 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2754 err = -EJUKEBOX; 2755 goto out_session; 2756 } 2757 if (session->s_state == CEPH_MDS_SESSION_NEW || 2758 session->s_state == CEPH_MDS_SESSION_CLOSING) { 2759 __open_session(mdsc, session); 2760 /* retry the same mds later */ 2761 if (random) 2762 req->r_resend_mds = mds; 2763 } 2764 list_add(&req->r_wait, &session->s_waiting); 2765 goto out_session; 2766 } 2767 2768 /* send request */ 2769 req->r_resend_mds = -1; /* forget any previous mds hint */ 2770 2771 if (req->r_request_started == 0) /* note request start time */ 2772 req->r_request_started = jiffies; 2773 2774 err = __send_request(mdsc, session, req, false); 2775 2776 out_session: 2777 ceph_put_mds_session(session); 2778 finish: 2779 if (err) { 2780 dout("__do_request early error %d\n", err); 2781 req->r_err = err; 2782 complete_request(mdsc, req); 2783 __unregister_request(mdsc, req); 2784 } 2785 return; 2786 } 2787 2788 /* 2789 * called under mdsc->mutex 2790 */ 2791 static void __wake_requests(struct ceph_mds_client *mdsc, 2792 struct list_head *head) 2793 { 2794 struct ceph_mds_request *req; 2795 LIST_HEAD(tmp_list); 2796 2797 list_splice_init(head, &tmp_list); 2798 2799 while (!list_empty(&tmp_list)) { 2800 req = list_entry(tmp_list.next, 2801 struct ceph_mds_request, r_wait); 2802 list_del_init(&req->r_wait); 2803 dout(" wake request %p tid %llu\n", req, req->r_tid); 2804 __do_request(mdsc, req); 2805 } 2806 } 2807 2808 /* 2809 * Wake up threads with requests pending for @mds, so that they can 2810 * resubmit their requests to a possibly different mds. 2811 */ 2812 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 2813 { 2814 struct ceph_mds_request *req; 2815 struct rb_node *p = rb_first(&mdsc->request_tree); 2816 2817 dout("kick_requests mds%d\n", mds); 2818 while (p) { 2819 req = rb_entry(p, struct ceph_mds_request, r_node); 2820 p = rb_next(p); 2821 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2822 continue; 2823 if (req->r_attempts > 0) 2824 continue; /* only new requests */ 2825 if (req->r_session && 2826 req->r_session->s_mds == mds) { 2827 dout(" kicking tid %llu\n", req->r_tid); 2828 list_del_init(&req->r_wait); 2829 __do_request(mdsc, req); 2830 } 2831 } 2832 } 2833 2834 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, 2835 struct ceph_mds_request *req) 2836 { 2837 int err = 0; 2838 2839 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ 2840 if (req->r_inode) 2841 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 2842 if (req->r_parent) { 2843 struct ceph_inode_info *ci = ceph_inode(req->r_parent); 2844 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ? 
2845 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
2846 spin_lock(&ci->i_ceph_lock);
2847 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
2848 __ceph_touch_fmode(ci, mdsc, fmode);
2849 spin_unlock(&ci->i_ceph_lock);
2850 ihold(req->r_parent);
2851 }
2852 if (req->r_old_dentry_dir)
2853 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2854 CEPH_CAP_PIN);
2855
2856 if (req->r_inode) {
2857 err = ceph_wait_on_async_create(req->r_inode);
2858 if (err) {
2859 dout("%s: wait for async create returned: %d\n",
2860 __func__, err);
2861 return err;
2862 }
2863 }
2864
2865 if (!err && req->r_old_inode) {
2866 err = ceph_wait_on_async_create(req->r_old_inode);
2867 if (err) {
2868 dout("%s: wait for async create returned: %d\n",
2869 __func__, err);
2870 return err;
2871 }
2872 }
2873
2874 dout("submit_request on %p for inode %p\n", req, dir);
2875 mutex_lock(&mdsc->mutex);
2876 __register_request(mdsc, req, dir);
2877 __do_request(mdsc, req);
2878 err = req->r_err;
2879 mutex_unlock(&mdsc->mutex);
2880 return err;
2881 }
2882
2883 static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
2884 struct ceph_mds_request *req)
2885 {
2886 int err;
2887
2888 /* wait */
2889 dout("do_request waiting\n");
2890 if (!req->r_timeout && req->r_wait_for_completion) {
2891 err = req->r_wait_for_completion(mdsc, req);
2892 } else {
2893 long timeleft = wait_for_completion_killable_timeout(
2894 &req->r_completion,
2895 ceph_timeout_jiffies(req->r_timeout));
2896 if (timeleft > 0)
2897 err = 0;
2898 else if (!timeleft)
2899 err = -ETIMEDOUT; /* timed out */
2900 else
2901 err = timeleft; /* killed */
2902 }
2903 dout("do_request waited, got %d\n", err);
2904 mutex_lock(&mdsc->mutex);
2905
2906 /* only abort if we didn't race with a real reply */
2907 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2908 err = le32_to_cpu(req->r_reply_info.head->result);
2909 } else if (err < 0) {
2910 dout("aborted request %lld with %d\n", req->r_tid, err);
2911
2912 /*
2913 * ensure we aren't running concurrently with
2914 * ceph_fill_trace or ceph_readdir_prepopulate, which
2915 * rely on locks (dir mutex) held by our caller.
2916 */
2917 mutex_lock(&req->r_fill_mutex);
2918 req->r_err = err;
2919 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
2920 mutex_unlock(&req->r_fill_mutex);
2921
2922 if (req->r_parent &&
2923 (req->r_op & CEPH_MDS_OP_WRITE))
2924 ceph_invalidate_dir_request(req);
2925 } else {
2926 err = req->r_err;
2927 }
2928
2929 mutex_unlock(&mdsc->mutex);
2930 return err;
2931 }
2932
2933 /*
2934 * Synchronously perform an mds request. Take care of all of the
2935 * session setup, forwarding, and retry details.
2936 */
2937 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
2938 struct inode *dir,
2939 struct ceph_mds_request *req)
2940 {
2941 int err;
2942
2943 dout("do_request on %p\n", req);
2944
2945 /* issue */
2946 err = ceph_mdsc_submit_request(mdsc, dir, req);
2947 if (!err)
2948 err = ceph_mdsc_wait_request(mdsc, req);
2949 dout("do_request %p done, result %d\n", req, err);
2950 return err;
2951 }
2952
2953 /*
2954 * Invalidate dir's completeness, dentry lease state on an aborted MDS
2955 * namespace request.
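 *
 * Called (under mdsc->mutex) from ceph_mdsc_wait_request() above when a
 * directory-modifying request is aborted, since we can no longer be sure
 * whether the MDS actually applied the change.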
2956 */ 2957 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 2958 { 2959 struct inode *dir = req->r_parent; 2960 struct inode *old_dir = req->r_old_dentry_dir; 2961 2962 dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir); 2963 2964 ceph_dir_clear_complete(dir); 2965 if (old_dir) 2966 ceph_dir_clear_complete(old_dir); 2967 if (req->r_dentry) 2968 ceph_invalidate_dentry_lease(req->r_dentry); 2969 if (req->r_old_dentry) 2970 ceph_invalidate_dentry_lease(req->r_old_dentry); 2971 } 2972 2973 /* 2974 * Handle mds reply. 2975 * 2976 * We take the session mutex and parse and process the reply immediately. 2977 * This preserves the logical ordering of replies, capabilities, etc., sent 2978 * by the MDS as they are applied to our local cache. 2979 */ 2980 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 2981 { 2982 struct ceph_mds_client *mdsc = session->s_mdsc; 2983 struct ceph_mds_request *req; 2984 struct ceph_mds_reply_head *head = msg->front.iov_base; 2985 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 2986 struct ceph_snap_realm *realm; 2987 u64 tid; 2988 int err, result; 2989 int mds = session->s_mds; 2990 2991 if (msg->front.iov_len < sizeof(*head)) { 2992 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 2993 ceph_msg_dump(msg); 2994 return; 2995 } 2996 2997 /* get request, session */ 2998 tid = le64_to_cpu(msg->hdr.tid); 2999 mutex_lock(&mdsc->mutex); 3000 req = lookup_get_request(mdsc, tid); 3001 if (!req) { 3002 dout("handle_reply on unknown tid %llu\n", tid); 3003 mutex_unlock(&mdsc->mutex); 3004 return; 3005 } 3006 dout("handle_reply %p\n", req); 3007 3008 /* correct session? */ 3009 if (req->r_session != session) { 3010 pr_err("mdsc_handle_reply got %llu on session mds%d" 3011 " not mds%d\n", tid, session->s_mds, 3012 req->r_session ? req->r_session->s_mds : -1); 3013 mutex_unlock(&mdsc->mutex); 3014 goto out; 3015 } 3016 3017 /* dup? */ 3018 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 3019 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 3020 pr_warn("got a dup %s reply on %llu from mds%d\n", 3021 head->safe ? 
"safe" : "unsafe", tid, mds); 3022 mutex_unlock(&mdsc->mutex); 3023 goto out; 3024 } 3025 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 3026 pr_warn("got unsafe after safe on %llu from mds%d\n", 3027 tid, mds); 3028 mutex_unlock(&mdsc->mutex); 3029 goto out; 3030 } 3031 3032 result = le32_to_cpu(head->result); 3033 3034 /* 3035 * Handle an ESTALE 3036 * if we're not talking to the authority, send to them 3037 * if the authority has changed while we weren't looking, 3038 * send to new authority 3039 * Otherwise we just have to return an ESTALE 3040 */ 3041 if (result == -ESTALE) { 3042 dout("got ESTALE on request %llu\n", req->r_tid); 3043 req->r_resend_mds = -1; 3044 if (req->r_direct_mode != USE_AUTH_MDS) { 3045 dout("not using auth, setting for that now\n"); 3046 req->r_direct_mode = USE_AUTH_MDS; 3047 __do_request(mdsc, req); 3048 mutex_unlock(&mdsc->mutex); 3049 goto out; 3050 } else { 3051 int mds = __choose_mds(mdsc, req, NULL); 3052 if (mds >= 0 && mds != req->r_session->s_mds) { 3053 dout("but auth changed, so resending\n"); 3054 __do_request(mdsc, req); 3055 mutex_unlock(&mdsc->mutex); 3056 goto out; 3057 } 3058 } 3059 dout("have to return ESTALE on request %llu\n", req->r_tid); 3060 } 3061 3062 3063 if (head->safe) { 3064 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 3065 __unregister_request(mdsc, req); 3066 3067 /* last request during umount? */ 3068 if (mdsc->stopping && !__get_oldest_req(mdsc)) 3069 complete_all(&mdsc->safe_umount_waiters); 3070 3071 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3072 /* 3073 * We already handled the unsafe response, now do the 3074 * cleanup. No need to examine the response; the MDS 3075 * doesn't include any result info in the safe 3076 * response. And even if it did, there is nothing 3077 * useful we could do with a revised return value. 
3078 */ 3079 dout("got safe reply %llu, mds%d\n", tid, mds); 3080 3081 mutex_unlock(&mdsc->mutex); 3082 goto out; 3083 } 3084 } else { 3085 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 3086 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 3087 } 3088 3089 dout("handle_reply tid %lld result %d\n", tid, result); 3090 rinfo = &req->r_reply_info; 3091 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 3092 err = parse_reply_info(session, msg, rinfo, (u64)-1); 3093 else 3094 err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features); 3095 mutex_unlock(&mdsc->mutex); 3096 3097 mutex_lock(&session->s_mutex); 3098 if (err < 0) { 3099 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 3100 ceph_msg_dump(msg); 3101 goto out_err; 3102 } 3103 3104 /* snap trace */ 3105 realm = NULL; 3106 if (rinfo->snapblob_len) { 3107 down_write(&mdsc->snap_rwsem); 3108 ceph_update_snap_trace(mdsc, rinfo->snapblob, 3109 rinfo->snapblob + rinfo->snapblob_len, 3110 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 3111 &realm); 3112 downgrade_write(&mdsc->snap_rwsem); 3113 } else { 3114 down_read(&mdsc->snap_rwsem); 3115 } 3116 3117 /* insert trace into our cache */ 3118 mutex_lock(&req->r_fill_mutex); 3119 current->journal_info = req; 3120 err = ceph_fill_trace(mdsc->fsc->sb, req); 3121 if (err == 0) { 3122 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 3123 req->r_op == CEPH_MDS_OP_LSSNAP)) 3124 ceph_readdir_prepopulate(req, req->r_session); 3125 } 3126 current->journal_info = NULL; 3127 mutex_unlock(&req->r_fill_mutex); 3128 3129 up_read(&mdsc->snap_rwsem); 3130 if (realm) 3131 ceph_put_snap_realm(mdsc, realm); 3132 3133 if (err == 0) { 3134 if (req->r_target_inode && 3135 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3136 struct ceph_inode_info *ci = 3137 ceph_inode(req->r_target_inode); 3138 spin_lock(&ci->i_unsafe_lock); 3139 list_add_tail(&req->r_unsafe_target_item, 3140 &ci->i_unsafe_iops); 3141 spin_unlock(&ci->i_unsafe_lock); 3142 } 3143 3144 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 3145 } 3146 out_err: 3147 mutex_lock(&mdsc->mutex); 3148 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3149 if (err) { 3150 req->r_err = err; 3151 } else { 3152 req->r_reply = ceph_msg_get(msg); 3153 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); 3154 } 3155 } else { 3156 dout("reply arrived after request %lld was aborted\n", tid); 3157 } 3158 mutex_unlock(&mdsc->mutex); 3159 3160 mutex_unlock(&session->s_mutex); 3161 3162 /* kick calling process */ 3163 complete_request(mdsc, req); 3164 3165 ceph_update_metadata_latency(&mdsc->metric, req->r_start_latency, 3166 req->r_end_latency, err); 3167 out: 3168 ceph_mdsc_put_request(req); 3169 return; 3170 } 3171 3172 3173 3174 /* 3175 * handle mds notification that our request has been forwarded. 3176 */ 3177 static void handle_forward(struct ceph_mds_client *mdsc, 3178 struct ceph_mds_session *session, 3179 struct ceph_msg *msg) 3180 { 3181 struct ceph_mds_request *req; 3182 u64 tid = le64_to_cpu(msg->hdr.tid); 3183 u32 next_mds; 3184 u32 fwd_seq; 3185 int err = -EINVAL; 3186 void *p = msg->front.iov_base; 3187 void *end = p + msg->front.iov_len; 3188 3189 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3190 next_mds = ceph_decode_32(&p); 3191 fwd_seq = ceph_decode_32(&p); 3192 3193 mutex_lock(&mdsc->mutex); 3194 req = lookup_get_request(mdsc, tid); 3195 if (!req) { 3196 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 3197 goto out; /* dup reply? 
*/ 3198 } 3199 3200 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3201 dout("forward tid %llu aborted, unregistering\n", tid); 3202 __unregister_request(mdsc, req); 3203 } else if (fwd_seq <= req->r_num_fwd) { 3204 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 3205 tid, next_mds, req->r_num_fwd, fwd_seq); 3206 } else { 3207 /* resend. forward race not possible; mds would drop */ 3208 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 3209 BUG_ON(req->r_err); 3210 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 3211 req->r_attempts = 0; 3212 req->r_num_fwd = fwd_seq; 3213 req->r_resend_mds = next_mds; 3214 put_request_session(req); 3215 __do_request(mdsc, req); 3216 } 3217 ceph_mdsc_put_request(req); 3218 out: 3219 mutex_unlock(&mdsc->mutex); 3220 return; 3221 3222 bad: 3223 pr_err("mdsc_handle_forward decode error err=%d\n", err); 3224 } 3225 3226 static int __decode_session_metadata(void **p, void *end, 3227 bool *blacklisted) 3228 { 3229 /* map<string,string> */ 3230 u32 n; 3231 bool err_str; 3232 ceph_decode_32_safe(p, end, n, bad); 3233 while (n-- > 0) { 3234 u32 len; 3235 ceph_decode_32_safe(p, end, len, bad); 3236 ceph_decode_need(p, end, len, bad); 3237 err_str = !strncmp(*p, "error_string", len); 3238 *p += len; 3239 ceph_decode_32_safe(p, end, len, bad); 3240 ceph_decode_need(p, end, len, bad); 3241 if (err_str && strnstr(*p, "blacklisted", len)) 3242 *blacklisted = true; 3243 *p += len; 3244 } 3245 return 0; 3246 bad: 3247 return -1; 3248 } 3249 3250 /* 3251 * handle a mds session control message 3252 */ 3253 static void handle_session(struct ceph_mds_session *session, 3254 struct ceph_msg *msg) 3255 { 3256 struct ceph_mds_client *mdsc = session->s_mdsc; 3257 int mds = session->s_mds; 3258 int msg_version = le16_to_cpu(msg->hdr.version); 3259 void *p = msg->front.iov_base; 3260 void *end = p + msg->front.iov_len; 3261 struct ceph_mds_session_head *h; 3262 u32 op; 3263 u64 seq, features = 0; 3264 int wake = 0; 3265 bool blacklisted = false; 3266 3267 /* decode */ 3268 ceph_decode_need(&p, end, sizeof(*h), bad); 3269 h = p; 3270 p += sizeof(*h); 3271 3272 op = le32_to_cpu(h->op); 3273 seq = le64_to_cpu(h->seq); 3274 3275 if (msg_version >= 3) { 3276 u32 len; 3277 /* version >= 2, metadata */ 3278 if (__decode_session_metadata(&p, end, &blacklisted) < 0) 3279 goto bad; 3280 /* version >= 3, feature bits */ 3281 ceph_decode_32_safe(&p, end, len, bad); 3282 ceph_decode_64_safe(&p, end, features, bad); 3283 p += len - sizeof(features); 3284 } 3285 3286 mutex_lock(&mdsc->mutex); 3287 if (op == CEPH_SESSION_CLOSE) { 3288 ceph_get_mds_session(session); 3289 __unregister_session(mdsc, session); 3290 } 3291 /* FIXME: this ttl calculation is generous */ 3292 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 3293 mutex_unlock(&mdsc->mutex); 3294 3295 mutex_lock(&session->s_mutex); 3296 3297 dout("handle_session mds%d %s %p state %s seq %llu\n", 3298 mds, ceph_session_op_name(op), session, 3299 ceph_session_state_name(session->s_state), seq); 3300 3301 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 3302 session->s_state = CEPH_MDS_SESSION_OPEN; 3303 pr_info("mds%d came back\n", session->s_mds); 3304 } 3305 3306 switch (op) { 3307 case CEPH_SESSION_OPEN: 3308 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3309 pr_info("mds%d reconnect success\n", session->s_mds); 3310 session->s_state = CEPH_MDS_SESSION_OPEN; 3311 session->s_features = features; 3312 renewed_caps(mdsc, session, 0); 3313 wake = 1; 3314 if (mdsc->stopping) 3315 
__close_session(mdsc, session); 3316 break; 3317 3318 case CEPH_SESSION_RENEWCAPS: 3319 if (session->s_renew_seq == seq) 3320 renewed_caps(mdsc, session, 1); 3321 break; 3322 3323 case CEPH_SESSION_CLOSE: 3324 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3325 pr_info("mds%d reconnect denied\n", session->s_mds); 3326 session->s_state = CEPH_MDS_SESSION_CLOSED; 3327 cleanup_session_requests(mdsc, session); 3328 remove_session_caps(session); 3329 wake = 2; /* for good measure */ 3330 wake_up_all(&mdsc->session_close_wq); 3331 break; 3332 3333 case CEPH_SESSION_STALE: 3334 pr_info("mds%d caps went stale, renewing\n", 3335 session->s_mds); 3336 spin_lock(&session->s_gen_ttl_lock); 3337 session->s_cap_gen++; 3338 session->s_cap_ttl = jiffies - 1; 3339 spin_unlock(&session->s_gen_ttl_lock); 3340 send_renew_caps(mdsc, session); 3341 break; 3342 3343 case CEPH_SESSION_RECALL_STATE: 3344 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 3345 break; 3346 3347 case CEPH_SESSION_FLUSHMSG: 3348 send_flushmsg_ack(mdsc, session, seq); 3349 break; 3350 3351 case CEPH_SESSION_FORCE_RO: 3352 dout("force_session_readonly %p\n", session); 3353 spin_lock(&session->s_cap_lock); 3354 session->s_readonly = true; 3355 spin_unlock(&session->s_cap_lock); 3356 wake_up_session_caps(session, FORCE_RO); 3357 break; 3358 3359 case CEPH_SESSION_REJECT: 3360 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 3361 pr_info("mds%d rejected session\n", session->s_mds); 3362 session->s_state = CEPH_MDS_SESSION_REJECTED; 3363 cleanup_session_requests(mdsc, session); 3364 remove_session_caps(session); 3365 if (blacklisted) 3366 mdsc->fsc->blacklisted = true; 3367 wake = 2; /* for good measure */ 3368 break; 3369 3370 default: 3371 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 3372 WARN_ON(1); 3373 } 3374 3375 mutex_unlock(&session->s_mutex); 3376 if (wake) { 3377 mutex_lock(&mdsc->mutex); 3378 __wake_requests(mdsc, &session->s_waiting); 3379 if (wake == 2) 3380 kick_requests(mdsc, mds); 3381 mutex_unlock(&mdsc->mutex); 3382 } 3383 if (op == CEPH_SESSION_CLOSE) 3384 ceph_put_mds_session(session); 3385 return; 3386 3387 bad: 3388 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 3389 (int)msg->front.iov_len); 3390 ceph_msg_dump(msg); 3391 return; 3392 } 3393 3394 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 3395 { 3396 int dcaps; 3397 3398 dcaps = xchg(&req->r_dir_caps, 0); 3399 if (dcaps) { 3400 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3401 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 3402 } 3403 } 3404 3405 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req) 3406 { 3407 int dcaps; 3408 3409 dcaps = xchg(&req->r_dir_caps, 0); 3410 if (dcaps) { 3411 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3412 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent), 3413 dcaps); 3414 } 3415 } 3416 3417 /* 3418 * called under session->mutex. 3419 */ 3420 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 3421 struct ceph_mds_session *session) 3422 { 3423 struct ceph_mds_request *req, *nreq; 3424 struct rb_node *p; 3425 3426 dout("replay_unsafe_requests mds%d\n", session->s_mds); 3427 3428 mutex_lock(&mdsc->mutex); 3429 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) 3430 __send_request(mdsc, session, req, true); 3431 3432 /* 3433 * also re-send old requests when MDS enters reconnect stage. So that MDS 3434 * can process completed request in clientreplay stage. 
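 * In other words: requests that already got an unsafe reply are replayed
 * from s_unsafe above, while requests that were sent earlier but have not
 * been answered yet are picked out of the request tree below and resent
 * as well.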
3435 */
3436 p = rb_first(&mdsc->request_tree);
3437 while (p) {
3438 req = rb_entry(p, struct ceph_mds_request, r_node);
3439 p = rb_next(p);
3440 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3441 continue;
3442 if (req->r_attempts == 0)
3443 continue; /* only old requests */
3444 if (!req->r_session)
3445 continue;
3446 if (req->r_session->s_mds != session->s_mds)
3447 continue;
3448
3449 ceph_mdsc_release_dir_caps_no_check(req);
3450
3451 __send_request(mdsc, session, req, true);
3452 }
3453 mutex_unlock(&mdsc->mutex);
3454 }
3455
3456 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
3457 {
3458 struct ceph_msg *reply;
3459 struct ceph_pagelist *_pagelist;
3460 struct page *page;
3461 __le32 *addr;
3462 int err = -ENOMEM;
3463
3464 if (!recon_state->allow_multi)
3465 return -ENOSPC;
3466
3467 /* can't handle a message that contains both caps and realms */
3468 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
3469
3470 /* pre-allocate new pagelist */
3471 _pagelist = ceph_pagelist_alloc(GFP_NOFS);
3472 if (!_pagelist)
3473 return -ENOMEM;
3474
3475 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3476 if (!reply)
3477 goto fail_msg;
3478
3479 /* placeholder for nr_caps */
3480 err = ceph_pagelist_encode_32(_pagelist, 0);
3481 if (err < 0)
3482 goto fail;
3483
3484 if (recon_state->nr_caps) {
3485 /* currently encoding caps */
3486 err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
3487 if (err)
3488 goto fail;
3489 } else {
3490 /* placeholder for nr_realms (currently encoding realms) */
3491 err = ceph_pagelist_encode_32(_pagelist, 0);
3492 if (err < 0)
3493 goto fail;
3494 }
3495
3496 err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
3497 if (err)
3498 goto fail;
3499
3500 page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
3501 addr = kmap_atomic(page);
3502 if (recon_state->nr_caps) {
3503 /* currently encoding caps */
3504 *addr = cpu_to_le32(recon_state->nr_caps);
3505 } else {
3506 /* currently encoding realms */
3507 *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
3508 }
3509 kunmap_atomic(addr);
3510
3511 reply->hdr.version = cpu_to_le16(5);
3512 reply->hdr.compat_version = cpu_to_le16(4);
3513
3514 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
3515 ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
3516
3517 ceph_con_send(&recon_state->session->s_con, reply);
3518 ceph_pagelist_release(recon_state->pagelist);
3519
3520 recon_state->pagelist = _pagelist;
3521 recon_state->nr_caps = 0;
3522 recon_state->nr_realms = 0;
3523 recon_state->msg_version = 5;
3524 return 0;
3525 fail:
3526 ceph_msg_put(reply);
3527 fail_msg:
3528 ceph_pagelist_release(_pagelist);
3529 return err;
3530 }
3531
3532 /*
3533 * Encode information about a cap for a reconnect with the MDS.
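 *
 * Two wire formats are produced depending on recon_state->msg_version:
 * v1 records carry an inode path string plus size/mtime/atime, while v2+
 * records instead carry the file-lock blob and (for struct_v >= 2) a
 * trailing snap_follows value.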
3534 */ 3535 static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap, 3536 void *arg) 3537 { 3538 union { 3539 struct ceph_mds_cap_reconnect v2; 3540 struct ceph_mds_cap_reconnect_v1 v1; 3541 } rec; 3542 struct ceph_inode_info *ci = cap->ci; 3543 struct ceph_reconnect_state *recon_state = arg; 3544 struct ceph_pagelist *pagelist = recon_state->pagelist; 3545 int err; 3546 u64 snap_follows; 3547 3548 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 3549 inode, ceph_vinop(inode), cap, cap->cap_id, 3550 ceph_cap_string(cap->issued)); 3551 3552 spin_lock(&ci->i_ceph_lock); 3553 cap->seq = 0; /* reset cap seq */ 3554 cap->issue_seq = 0; /* and issue_seq */ 3555 cap->mseq = 0; /* and migrate_seq */ 3556 cap->cap_gen = cap->session->s_cap_gen; 3557 3558 /* These are lost when the session goes away */ 3559 if (S_ISDIR(inode->i_mode)) { 3560 if (cap->issued & CEPH_CAP_DIR_CREATE) { 3561 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); 3562 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); 3563 } 3564 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; 3565 } 3566 3567 if (recon_state->msg_version >= 2) { 3568 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 3569 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3570 rec.v2.issued = cpu_to_le32(cap->issued); 3571 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3572 rec.v2.pathbase = 0; 3573 rec.v2.flock_len = (__force __le32) 3574 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); 3575 } else { 3576 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 3577 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3578 rec.v1.issued = cpu_to_le32(cap->issued); 3579 rec.v1.size = cpu_to_le64(inode->i_size); 3580 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime); 3581 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime); 3582 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3583 rec.v1.pathbase = 0; 3584 } 3585 3586 if (list_empty(&ci->i_cap_snaps)) { 3587 snap_follows = ci->i_head_snapc ? 
ci->i_head_snapc->seq : 0; 3588 } else { 3589 struct ceph_cap_snap *capsnap = 3590 list_first_entry(&ci->i_cap_snaps, 3591 struct ceph_cap_snap, ci_item); 3592 snap_follows = capsnap->follows; 3593 } 3594 spin_unlock(&ci->i_ceph_lock); 3595 3596 if (recon_state->msg_version >= 2) { 3597 int num_fcntl_locks, num_flock_locks; 3598 struct ceph_filelock *flocks = NULL; 3599 size_t struct_len, total_len = sizeof(u64); 3600 u8 struct_v = 0; 3601 3602 encode_again: 3603 if (rec.v2.flock_len) { 3604 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 3605 } else { 3606 num_fcntl_locks = 0; 3607 num_flock_locks = 0; 3608 } 3609 if (num_fcntl_locks + num_flock_locks > 0) { 3610 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks, 3611 sizeof(struct ceph_filelock), 3612 GFP_NOFS); 3613 if (!flocks) { 3614 err = -ENOMEM; 3615 goto out_err; 3616 } 3617 err = ceph_encode_locks_to_buffer(inode, flocks, 3618 num_fcntl_locks, 3619 num_flock_locks); 3620 if (err) { 3621 kfree(flocks); 3622 flocks = NULL; 3623 if (err == -ENOSPC) 3624 goto encode_again; 3625 goto out_err; 3626 } 3627 } else { 3628 kfree(flocks); 3629 flocks = NULL; 3630 } 3631 3632 if (recon_state->msg_version >= 3) { 3633 /* version, compat_version and struct_len */ 3634 total_len += 2 * sizeof(u8) + sizeof(u32); 3635 struct_v = 2; 3636 } 3637 /* 3638 * number of encoded locks is stable, so copy to pagelist 3639 */ 3640 struct_len = 2 * sizeof(u32) + 3641 (num_fcntl_locks + num_flock_locks) * 3642 sizeof(struct ceph_filelock); 3643 rec.v2.flock_len = cpu_to_le32(struct_len); 3644 3645 struct_len += sizeof(u32) + sizeof(rec.v2); 3646 3647 if (struct_v >= 2) 3648 struct_len += sizeof(u64); /* snap_follows */ 3649 3650 total_len += struct_len; 3651 3652 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 3653 err = send_reconnect_partial(recon_state); 3654 if (err) 3655 goto out_freeflocks; 3656 pagelist = recon_state->pagelist; 3657 } 3658 3659 err = ceph_pagelist_reserve(pagelist, total_len); 3660 if (err) 3661 goto out_freeflocks; 3662 3663 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3664 if (recon_state->msg_version >= 3) { 3665 ceph_pagelist_encode_8(pagelist, struct_v); 3666 ceph_pagelist_encode_8(pagelist, 1); 3667 ceph_pagelist_encode_32(pagelist, struct_len); 3668 } 3669 ceph_pagelist_encode_string(pagelist, NULL, 0); 3670 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 3671 ceph_locks_to_pagelist(flocks, pagelist, 3672 num_fcntl_locks, num_flock_locks); 3673 if (struct_v >= 2) 3674 ceph_pagelist_encode_64(pagelist, snap_follows); 3675 out_freeflocks: 3676 kfree(flocks); 3677 } else { 3678 u64 pathbase = 0; 3679 int pathlen = 0; 3680 char *path = NULL; 3681 struct dentry *dentry; 3682 3683 dentry = d_find_alias(inode); 3684 if (dentry) { 3685 path = ceph_mdsc_build_path(dentry, 3686 &pathlen, &pathbase, 0); 3687 dput(dentry); 3688 if (IS_ERR(path)) { 3689 err = PTR_ERR(path); 3690 goto out_err; 3691 } 3692 rec.v1.pathbase = cpu_to_le64(pathbase); 3693 } 3694 3695 err = ceph_pagelist_reserve(pagelist, 3696 sizeof(u64) + sizeof(u32) + 3697 pathlen + sizeof(rec.v1)); 3698 if (err) { 3699 goto out_freepath; 3700 } 3701 3702 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3703 ceph_pagelist_encode_string(pagelist, path, pathlen); 3704 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 3705 out_freepath: 3706 ceph_mdsc_free_path(path, pathlen); 3707 } 3708 3709 out_err: 3710 if (err >= 0) 3711 recon_state->nr_caps++; 3712 return err; 3713 } 3714 3715 static int encode_snap_realms(struct 
ceph_mds_client *mdsc, 3716 struct ceph_reconnect_state *recon_state) 3717 { 3718 struct rb_node *p; 3719 struct ceph_pagelist *pagelist = recon_state->pagelist; 3720 int err = 0; 3721 3722 if (recon_state->msg_version >= 4) { 3723 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); 3724 if (err < 0) 3725 goto fail; 3726 } 3727 3728 /* 3729 * snaprealms. we provide mds with the ino, seq (version), and 3730 * parent for all of our realms. If the mds has any newer info, 3731 * it will tell us. 3732 */ 3733 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 3734 struct ceph_snap_realm *realm = 3735 rb_entry(p, struct ceph_snap_realm, node); 3736 struct ceph_mds_snaprealm_reconnect sr_rec; 3737 3738 if (recon_state->msg_version >= 4) { 3739 size_t need = sizeof(u8) * 2 + sizeof(u32) + 3740 sizeof(sr_rec); 3741 3742 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 3743 err = send_reconnect_partial(recon_state); 3744 if (err) 3745 goto fail; 3746 pagelist = recon_state->pagelist; 3747 } 3748 3749 err = ceph_pagelist_reserve(pagelist, need); 3750 if (err) 3751 goto fail; 3752 3753 ceph_pagelist_encode_8(pagelist, 1); 3754 ceph_pagelist_encode_8(pagelist, 1); 3755 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 3756 } 3757 3758 dout(" adding snap realm %llx seq %lld parent %llx\n", 3759 realm->ino, realm->seq, realm->parent_ino); 3760 sr_rec.ino = cpu_to_le64(realm->ino); 3761 sr_rec.seq = cpu_to_le64(realm->seq); 3762 sr_rec.parent = cpu_to_le64(realm->parent_ino); 3763 3764 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 3765 if (err) 3766 goto fail; 3767 3768 recon_state->nr_realms++; 3769 } 3770 fail: 3771 return err; 3772 } 3773 3774 3775 /* 3776 * If an MDS fails and recovers, clients need to reconnect in order to 3777 * reestablish shared state. This includes all caps issued through 3778 * this session _and_ the snap_realm hierarchy. Because it's not 3779 * clear which snap realms the mds cares about, we send everything we 3780 * know about.. that ensures we'll then get any new info the 3781 * recovering MDS might have. 3782 * 3783 * This is a relatively heavyweight operation, but it's rare. 3784 */ 3785 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 3786 struct ceph_mds_session *session) 3787 { 3788 struct ceph_msg *reply; 3789 int mds = session->s_mds; 3790 int err = -ENOMEM; 3791 struct ceph_reconnect_state recon_state = { 3792 .session = session, 3793 }; 3794 LIST_HEAD(dispose); 3795 3796 pr_info("mds%d reconnect start\n", mds); 3797 3798 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 3799 if (!recon_state.pagelist) 3800 goto fail_nopagelist; 3801 3802 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3803 if (!reply) 3804 goto fail_nomsg; 3805 3806 xa_destroy(&session->s_delegated_inos); 3807 3808 mutex_lock(&session->s_mutex); 3809 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 3810 session->s_seq = 0; 3811 3812 dout("session %p state %s\n", session, 3813 ceph_session_state_name(session->s_state)); 3814 3815 spin_lock(&session->s_gen_ttl_lock); 3816 session->s_cap_gen++; 3817 spin_unlock(&session->s_gen_ttl_lock); 3818 3819 spin_lock(&session->s_cap_lock); 3820 /* don't know if session is readonly */ 3821 session->s_readonly = 0; 3822 /* 3823 * notify __ceph_remove_cap() that we are composing cap reconnect. 3824 * If a cap get released before being added to the cap reconnect, 3825 * __ceph_remove_cap() should skip queuing cap release. 
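 * The flag is dropped again (under s_cap_lock) once
 * ceph_iterate_session_caps() below has walked every cap on the session.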
3826 */
3827 session->s_cap_reconnect = 1;
3828 /* drop old cap expires; we're about to reestablish that state */
3829 detach_cap_releases(session, &dispose);
3830 spin_unlock(&session->s_cap_lock);
3831 dispose_cap_releases(mdsc, &dispose);
3832
3833 /* trim unused caps to reduce MDS's cache rejoin time */
3834 if (mdsc->fsc->sb->s_root)
3835 shrink_dcache_parent(mdsc->fsc->sb->s_root);
3836
3837 ceph_con_close(&session->s_con);
3838 ceph_con_open(&session->s_con,
3839 CEPH_ENTITY_TYPE_MDS, mds,
3840 ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
3841
3842 /* replay unsafe requests */
3843 replay_unsafe_requests(mdsc, session);
3844
3845 ceph_early_kick_flushing_caps(mdsc, session);
3846
3847 down_read(&mdsc->snap_rwsem);
3848
3849 /* placeholder for nr_caps */
3850 err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
3851 if (err)
3852 goto fail;
3853
3854 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
3855 recon_state.msg_version = 3;
3856 recon_state.allow_multi = true;
3857 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
3858 recon_state.msg_version = 3;
3859 } else {
3860 recon_state.msg_version = 2;
3861 }
3862 /* traverse this session's caps */
3863 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
3864
3865 spin_lock(&session->s_cap_lock);
3866 session->s_cap_reconnect = 0;
3867 spin_unlock(&session->s_cap_lock);
3868
3869 if (err < 0)
3870 goto fail;
3871
3872 /* check if all realms can be encoded into current message */
3873 if (mdsc->num_snap_realms) {
3874 size_t total_len =
3875 recon_state.pagelist->length +
3876 mdsc->num_snap_realms *
3877 sizeof(struct ceph_mds_snaprealm_reconnect);
3878 if (recon_state.msg_version >= 4) {
3879 /* number of realms */
3880 total_len += sizeof(u32);
3881 /* version, compat_version and struct_len */
3882 total_len += mdsc->num_snap_realms *
3883 (2 * sizeof(u8) + sizeof(u32));
3884 }
3885 if (total_len > RECONNECT_MAX_SIZE) {
3886 if (!recon_state.allow_multi) {
3887 err = -ENOSPC;
3888 goto fail;
3889 }
3890 if (recon_state.nr_caps) {
3891 err = send_reconnect_partial(&recon_state);
3892 if (err)
3893 goto fail;
3894 }
3895 recon_state.msg_version = 5;
3896 }
3897 }
3898
3899 err = encode_snap_realms(mdsc, &recon_state);
3900 if (err < 0)
3901 goto fail;
3902
3903 if (recon_state.msg_version >= 5) {
3904 err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
3905 if (err < 0)
3906 goto fail;
3907 }
3908
3909 if (recon_state.nr_caps || recon_state.nr_realms) {
3910 struct page *page =
3911 list_first_entry(&recon_state.pagelist->head,
3912 struct page, lru);
3913 __le32 *addr = kmap_atomic(page);
3914 if (recon_state.nr_caps) {
3915 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
3916 *addr = cpu_to_le32(recon_state.nr_caps);
3917 } else if (recon_state.msg_version >= 4) {
3918 *(addr + 1) = cpu_to_le32(recon_state.nr_realms);
3919 }
3920 kunmap_atomic(addr);
3921 }
3922
3923 reply->hdr.version = cpu_to_le16(recon_state.msg_version);
3924 if (recon_state.msg_version >= 4)
3925 reply->hdr.compat_version = cpu_to_le16(4);
3926
3927 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
3928 ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
3929
3930 ceph_con_send(&session->s_con, reply);
3931
3932 mutex_unlock(&session->s_mutex);
3933
3934 mutex_lock(&mdsc->mutex);
3935 __wake_requests(mdsc, &session->s_waiting);
3936 mutex_unlock(&mdsc->mutex);
3937
3938 up_read(&mdsc->snap_rwsem);
3939 ceph_pagelist_release(recon_state.pagelist);
3940 return;
3941
3942
fail: 3943 ceph_msg_put(reply); 3944 up_read(&mdsc->snap_rwsem); 3945 mutex_unlock(&session->s_mutex); 3946 fail_nomsg: 3947 ceph_pagelist_release(recon_state.pagelist); 3948 fail_nopagelist: 3949 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 3950 return; 3951 } 3952 3953 3954 /* 3955 * compare old and new mdsmaps, kicking requests 3956 * and closing out old connections as necessary 3957 * 3958 * called under mdsc->mutex. 3959 */ 3960 static void check_new_map(struct ceph_mds_client *mdsc, 3961 struct ceph_mdsmap *newmap, 3962 struct ceph_mdsmap *oldmap) 3963 { 3964 int i; 3965 int oldstate, newstate; 3966 struct ceph_mds_session *s; 3967 3968 dout("check_new_map new %u old %u\n", 3969 newmap->m_epoch, oldmap->m_epoch); 3970 3971 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { 3972 if (!mdsc->sessions[i]) 3973 continue; 3974 s = mdsc->sessions[i]; 3975 oldstate = ceph_mdsmap_get_state(oldmap, i); 3976 newstate = ceph_mdsmap_get_state(newmap, i); 3977 3978 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 3979 i, ceph_mds_state_name(oldstate), 3980 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 3981 ceph_mds_state_name(newstate), 3982 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 3983 ceph_session_state_name(s->s_state)); 3984 3985 if (i >= newmap->possible_max_rank) { 3986 /* force close session for stopped mds */ 3987 ceph_get_mds_session(s); 3988 __unregister_session(mdsc, s); 3989 __wake_requests(mdsc, &s->s_waiting); 3990 mutex_unlock(&mdsc->mutex); 3991 3992 mutex_lock(&s->s_mutex); 3993 cleanup_session_requests(mdsc, s); 3994 remove_session_caps(s); 3995 mutex_unlock(&s->s_mutex); 3996 3997 ceph_put_mds_session(s); 3998 3999 mutex_lock(&mdsc->mutex); 4000 kick_requests(mdsc, i); 4001 continue; 4002 } 4003 4004 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 4005 ceph_mdsmap_get_addr(newmap, i), 4006 sizeof(struct ceph_entity_addr))) { 4007 /* just close it */ 4008 mutex_unlock(&mdsc->mutex); 4009 mutex_lock(&s->s_mutex); 4010 mutex_lock(&mdsc->mutex); 4011 ceph_con_close(&s->s_con); 4012 mutex_unlock(&s->s_mutex); 4013 s->s_state = CEPH_MDS_SESSION_RESTARTING; 4014 } else if (oldstate == newstate) { 4015 continue; /* nothing new with this mds */ 4016 } 4017 4018 /* 4019 * send reconnect? 4020 */ 4021 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 4022 newstate >= CEPH_MDS_STATE_RECONNECT) { 4023 mutex_unlock(&mdsc->mutex); 4024 send_mds_reconnect(mdsc, s); 4025 mutex_lock(&mdsc->mutex); 4026 } 4027 4028 /* 4029 * kick request on any mds that has gone active. 
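 * (requests aimed at that mds may have been parked while it was
 * recovering; once it reports active it can service them again.)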
4030 */ 4031 if (oldstate < CEPH_MDS_STATE_ACTIVE && 4032 newstate >= CEPH_MDS_STATE_ACTIVE) { 4033 if (oldstate != CEPH_MDS_STATE_CREATING && 4034 oldstate != CEPH_MDS_STATE_STARTING) 4035 pr_info("mds%d recovery completed\n", s->s_mds); 4036 kick_requests(mdsc, i); 4037 mutex_unlock(&mdsc->mutex); 4038 mutex_lock(&s->s_mutex); 4039 mutex_lock(&mdsc->mutex); 4040 ceph_kick_flushing_caps(mdsc, s); 4041 mutex_unlock(&s->s_mutex); 4042 wake_up_session_caps(s, RECONNECT); 4043 } 4044 } 4045 4046 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4047 s = mdsc->sessions[i]; 4048 if (!s) 4049 continue; 4050 if (!ceph_mdsmap_is_laggy(newmap, i)) 4051 continue; 4052 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4053 s->s_state == CEPH_MDS_SESSION_HUNG || 4054 s->s_state == CEPH_MDS_SESSION_CLOSING) { 4055 dout(" connecting to export targets of laggy mds%d\n", 4056 i); 4057 __open_export_target_sessions(mdsc, s); 4058 } 4059 } 4060 } 4061 4062 4063 4064 /* 4065 * leases 4066 */ 4067 4068 /* 4069 * caller must hold session s_mutex, dentry->d_lock 4070 */ 4071 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 4072 { 4073 struct ceph_dentry_info *di = ceph_dentry(dentry); 4074 4075 ceph_put_mds_session(di->lease_session); 4076 di->lease_session = NULL; 4077 } 4078 4079 static void handle_lease(struct ceph_mds_client *mdsc, 4080 struct ceph_mds_session *session, 4081 struct ceph_msg *msg) 4082 { 4083 struct super_block *sb = mdsc->fsc->sb; 4084 struct inode *inode; 4085 struct dentry *parent, *dentry; 4086 struct ceph_dentry_info *di; 4087 int mds = session->s_mds; 4088 struct ceph_mds_lease *h = msg->front.iov_base; 4089 u32 seq; 4090 struct ceph_vino vino; 4091 struct qstr dname; 4092 int release = 0; 4093 4094 dout("handle_lease from mds%d\n", mds); 4095 4096 /* decode */ 4097 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 4098 goto bad; 4099 vino.ino = le64_to_cpu(h->ino); 4100 vino.snap = CEPH_NOSNAP; 4101 seq = le32_to_cpu(h->seq); 4102 dname.len = get_unaligned_le32(h + 1); 4103 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 4104 goto bad; 4105 dname.name = (void *)(h + 1) + sizeof(u32); 4106 4107 /* lookup inode */ 4108 inode = ceph_find_inode(sb, vino); 4109 dout("handle_lease %s, ino %llx %p %.*s\n", 4110 ceph_lease_op_name(h->action), vino.ino, inode, 4111 dname.len, dname.name); 4112 4113 mutex_lock(&session->s_mutex); 4114 session->s_seq++; 4115 4116 if (!inode) { 4117 dout("handle_lease no inode %llx\n", vino.ino); 4118 goto release; 4119 } 4120 4121 /* dentry */ 4122 parent = d_find_alias(inode); 4123 if (!parent) { 4124 dout("no parent dentry on inode %p\n", inode); 4125 WARN_ON(1); 4126 goto release; /* hrm... 
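an aliasless inode is unexpected here (hence the WARN_ON); presumably the dentry was pruned while something else still pinned the inode.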
*/ 4127 } 4128 dname.hash = full_name_hash(parent, dname.name, dname.len); 4129 dentry = d_lookup(parent, &dname); 4130 dput(parent); 4131 if (!dentry) 4132 goto release; 4133 4134 spin_lock(&dentry->d_lock); 4135 di = ceph_dentry(dentry); 4136 switch (h->action) { 4137 case CEPH_MDS_LEASE_REVOKE: 4138 if (di->lease_session == session) { 4139 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 4140 h->seq = cpu_to_le32(di->lease_seq); 4141 __ceph_mdsc_drop_dentry_lease(dentry); 4142 } 4143 release = 1; 4144 break; 4145 4146 case CEPH_MDS_LEASE_RENEW: 4147 if (di->lease_session == session && 4148 di->lease_gen == session->s_cap_gen && 4149 di->lease_renew_from && 4150 di->lease_renew_after == 0) { 4151 unsigned long duration = 4152 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 4153 4154 di->lease_seq = seq; 4155 di->time = di->lease_renew_from + duration; 4156 di->lease_renew_after = di->lease_renew_from + 4157 (duration >> 1); 4158 di->lease_renew_from = 0; 4159 } 4160 break; 4161 } 4162 spin_unlock(&dentry->d_lock); 4163 dput(dentry); 4164 4165 if (!release) 4166 goto out; 4167 4168 release: 4169 /* let's just reuse the same message */ 4170 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 4171 ceph_msg_get(msg); 4172 ceph_con_send(&session->s_con, msg); 4173 4174 out: 4175 mutex_unlock(&session->s_mutex); 4176 /* avoid calling iput_final() in mds dispatch threads */ 4177 ceph_async_iput(inode); 4178 return; 4179 4180 bad: 4181 pr_err("corrupt lease message\n"); 4182 ceph_msg_dump(msg); 4183 } 4184 4185 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 4186 struct dentry *dentry, char action, 4187 u32 seq) 4188 { 4189 struct ceph_msg *msg; 4190 struct ceph_mds_lease *lease; 4191 struct inode *dir; 4192 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; 4193 4194 dout("lease_send_msg identry %p %s to mds%d\n", 4195 dentry, ceph_lease_op_name(action), session->s_mds); 4196 4197 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 4198 if (!msg) 4199 return; 4200 lease = msg->front.iov_base; 4201 lease->action = action; 4202 lease->seq = cpu_to_le32(seq); 4203 4204 spin_lock(&dentry->d_lock); 4205 dir = d_inode(dentry->d_parent); 4206 lease->ino = cpu_to_le64(ceph_ino(dir)); 4207 lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); 4208 4209 put_unaligned_le32(dentry->d_name.len, lease + 1); 4210 memcpy((void *)(lease + 1) + 4, 4211 dentry->d_name.name, dentry->d_name.len); 4212 spin_unlock(&dentry->d_lock); 4213 /* 4214 * if this is a preemptive lease RELEASE, no need to 4215 * flush request stream, since the actual request will 4216 * soon follow. 
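 * (more_to_follow hints to the messenger that another message is coming
 * shortly, so the socket write need not be pushed right away.)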
4217 */ 4218 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE); 4219 4220 ceph_con_send(&session->s_con, msg); 4221 } 4222 4223 /* 4224 * lock unlock sessions, to wait ongoing session activities 4225 */ 4226 static void lock_unlock_sessions(struct ceph_mds_client *mdsc) 4227 { 4228 int i; 4229 4230 mutex_lock(&mdsc->mutex); 4231 for (i = 0; i < mdsc->max_sessions; i++) { 4232 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4233 if (!s) 4234 continue; 4235 mutex_unlock(&mdsc->mutex); 4236 mutex_lock(&s->s_mutex); 4237 mutex_unlock(&s->s_mutex); 4238 ceph_put_mds_session(s); 4239 mutex_lock(&mdsc->mutex); 4240 } 4241 mutex_unlock(&mdsc->mutex); 4242 } 4243 4244 static void maybe_recover_session(struct ceph_mds_client *mdsc) 4245 { 4246 struct ceph_fs_client *fsc = mdsc->fsc; 4247 4248 if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) 4249 return; 4250 4251 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED) 4252 return; 4253 4254 if (!READ_ONCE(fsc->blacklisted)) 4255 return; 4256 4257 if (fsc->last_auto_reconnect && 4258 time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30)) 4259 return; 4260 4261 pr_info("auto reconnect after blacklisted\n"); 4262 fsc->last_auto_reconnect = jiffies; 4263 ceph_force_reconnect(fsc->sb); 4264 } 4265 4266 /* 4267 * delayed work -- periodically trim expired leases, renew caps with mds 4268 */ 4269 static void schedule_delayed(struct ceph_mds_client *mdsc) 4270 { 4271 int delay = 5; 4272 unsigned hz = round_jiffies_relative(HZ * delay); 4273 schedule_delayed_work(&mdsc->delayed_work, hz); 4274 } 4275 4276 static void delayed_work(struct work_struct *work) 4277 { 4278 int i; 4279 struct ceph_mds_client *mdsc = 4280 container_of(work, struct ceph_mds_client, delayed_work.work); 4281 int renew_interval; 4282 int renew_caps; 4283 4284 dout("mdsc delayed_work\n"); 4285 4286 mutex_lock(&mdsc->mutex); 4287 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 4288 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 4289 mdsc->last_renew_caps); 4290 if (renew_caps) 4291 mdsc->last_renew_caps = jiffies; 4292 4293 for (i = 0; i < mdsc->max_sessions; i++) { 4294 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4295 if (!s) 4296 continue; 4297 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 4298 dout("resending session close request for mds%d\n", 4299 s->s_mds); 4300 request_close_session(mdsc, s); 4301 ceph_put_mds_session(s); 4302 continue; 4303 } 4304 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 4305 if (s->s_state == CEPH_MDS_SESSION_OPEN) { 4306 s->s_state = CEPH_MDS_SESSION_HUNG; 4307 pr_info("mds%d hung\n", s->s_mds); 4308 } 4309 } 4310 if (s->s_state == CEPH_MDS_SESSION_NEW || 4311 s->s_state == CEPH_MDS_SESSION_RESTARTING || 4312 s->s_state == CEPH_MDS_SESSION_REJECTED) { 4313 /* this mds is failed or recovering, just wait */ 4314 ceph_put_mds_session(s); 4315 continue; 4316 } 4317 mutex_unlock(&mdsc->mutex); 4318 4319 mutex_lock(&s->s_mutex); 4320 if (renew_caps) 4321 send_renew_caps(mdsc, s); 4322 else 4323 ceph_con_keepalive(&s->s_con); 4324 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4325 s->s_state == CEPH_MDS_SESSION_HUNG) 4326 ceph_send_cap_releases(mdsc, s); 4327 mutex_unlock(&s->s_mutex); 4328 ceph_put_mds_session(s); 4329 4330 mutex_lock(&mdsc->mutex); 4331 } 4332 mutex_unlock(&mdsc->mutex); 4333 4334 ceph_check_delayed_caps(mdsc); 4335 4336 ceph_queue_cap_reclaim_work(mdsc); 4337 4338 ceph_trim_snapid_map(mdsc); 4339 4340 maybe_recover_session(mdsc); 4341 4342 schedule_delayed(mdsc); 4343 } 4344 4345 
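/*
 * Allocate and initialize per-superblock mds client state.  Undone by
 * ceph_mdsc_stop()/ceph_mdsc_destroy().
 */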
int ceph_mdsc_init(struct ceph_fs_client *fsc) 4346 4347 { 4348 struct ceph_mds_client *mdsc; 4349 int err; 4350 4351 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 4352 if (!mdsc) 4353 return -ENOMEM; 4354 mdsc->fsc = fsc; 4355 mutex_init(&mdsc->mutex); 4356 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 4357 if (!mdsc->mdsmap) { 4358 err = -ENOMEM; 4359 goto err_mdsc; 4360 } 4361 4362 fsc->mdsc = mdsc; 4363 init_completion(&mdsc->safe_umount_waiters); 4364 init_waitqueue_head(&mdsc->session_close_wq); 4365 INIT_LIST_HEAD(&mdsc->waiting_for_map); 4366 mdsc->sessions = NULL; 4367 atomic_set(&mdsc->num_sessions, 0); 4368 mdsc->max_sessions = 0; 4369 mdsc->stopping = 0; 4370 atomic64_set(&mdsc->quotarealms_count, 0); 4371 mdsc->quotarealms_inodes = RB_ROOT; 4372 mutex_init(&mdsc->quotarealms_inodes_mutex); 4373 mdsc->last_snap_seq = 0; 4374 init_rwsem(&mdsc->snap_rwsem); 4375 mdsc->snap_realms = RB_ROOT; 4376 INIT_LIST_HEAD(&mdsc->snap_empty); 4377 mdsc->num_snap_realms = 0; 4378 spin_lock_init(&mdsc->snap_empty_lock); 4379 mdsc->last_tid = 0; 4380 mdsc->oldest_tid = 0; 4381 mdsc->request_tree = RB_ROOT; 4382 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 4383 mdsc->last_renew_caps = jiffies; 4384 INIT_LIST_HEAD(&mdsc->cap_delay_list); 4385 INIT_LIST_HEAD(&mdsc->cap_wait_list); 4386 spin_lock_init(&mdsc->cap_delay_lock); 4387 INIT_LIST_HEAD(&mdsc->snap_flush_list); 4388 spin_lock_init(&mdsc->snap_flush_lock); 4389 mdsc->last_cap_flush_tid = 1; 4390 INIT_LIST_HEAD(&mdsc->cap_flush_list); 4391 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 4392 mdsc->num_cap_flushing = 0; 4393 spin_lock_init(&mdsc->cap_dirty_lock); 4394 init_waitqueue_head(&mdsc->cap_flushing_wq); 4395 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); 4396 atomic_set(&mdsc->cap_reclaim_pending, 0); 4397 err = ceph_metric_init(&mdsc->metric); 4398 if (err) 4399 goto err_mdsmap; 4400 4401 spin_lock_init(&mdsc->dentry_list_lock); 4402 INIT_LIST_HEAD(&mdsc->dentry_leases); 4403 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 4404 4405 ceph_caps_init(mdsc); 4406 ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 4407 4408 spin_lock_init(&mdsc->snapid_map_lock); 4409 mdsc->snapid_map_tree = RB_ROOT; 4410 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 4411 4412 init_rwsem(&mdsc->pool_perm_rwsem); 4413 mdsc->pool_perm_tree = RB_ROOT; 4414 4415 strscpy(mdsc->nodename, utsname()->nodename, 4416 sizeof(mdsc->nodename)); 4417 return 0; 4418 4419 err_mdsmap: 4420 kfree(mdsc->mdsmap); 4421 err_mdsc: 4422 kfree(mdsc); 4423 return err; 4424 } 4425 4426 /* 4427 * Wait for safe replies on open mds requests. If we time out, drop 4428 * all requests from the tree to avoid dangling dentry refs. 
4429 */ 4430 static void wait_requests(struct ceph_mds_client *mdsc) 4431 { 4432 struct ceph_options *opts = mdsc->fsc->client->options; 4433 struct ceph_mds_request *req; 4434 4435 mutex_lock(&mdsc->mutex); 4436 if (__get_oldest_req(mdsc)) { 4437 mutex_unlock(&mdsc->mutex); 4438 4439 dout("wait_requests waiting for requests\n"); 4440 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 4441 ceph_timeout_jiffies(opts->mount_timeout)); 4442 4443 /* tear down remaining requests */ 4444 mutex_lock(&mdsc->mutex); 4445 while ((req = __get_oldest_req(mdsc))) { 4446 dout("wait_requests timed out on tid %llu\n", 4447 req->r_tid); 4448 list_del_init(&req->r_wait); 4449 __unregister_request(mdsc, req); 4450 } 4451 } 4452 mutex_unlock(&mdsc->mutex); 4453 dout("wait_requests done\n"); 4454 } 4455 4456 /* 4457 * called before mount is ro, and before dentries are torn down. 4458 * (hmm, does this still race with new lookups?) 4459 */ 4460 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 4461 { 4462 dout("pre_umount\n"); 4463 mdsc->stopping = 1; 4464 4465 lock_unlock_sessions(mdsc); 4466 ceph_flush_dirty_caps(mdsc); 4467 wait_requests(mdsc); 4468 4469 /* 4470 * wait for reply handlers to drop their request refs and 4471 * their inode/dcache refs 4472 */ 4473 ceph_msgr_flush(); 4474 4475 ceph_cleanup_quotarealms_inodes(mdsc); 4476 } 4477 4478 /* 4479 * wait for all write mds requests to flush. 4480 */ 4481 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) 4482 { 4483 struct ceph_mds_request *req = NULL, *nextreq; 4484 struct rb_node *n; 4485 4486 mutex_lock(&mdsc->mutex); 4487 dout("wait_unsafe_requests want %lld\n", want_tid); 4488 restart: 4489 req = __get_oldest_req(mdsc); 4490 while (req && req->r_tid <= want_tid) { 4491 /* find next request */ 4492 n = rb_next(&req->r_node); 4493 if (n) 4494 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 4495 else 4496 nextreq = NULL; 4497 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 4498 (req->r_op & CEPH_MDS_OP_WRITE)) { 4499 /* write op */ 4500 ceph_mdsc_get_request(req); 4501 if (nextreq) 4502 ceph_mdsc_get_request(nextreq); 4503 mutex_unlock(&mdsc->mutex); 4504 dout("wait_unsafe_requests wait on %llu (want %llu)\n", 4505 req->r_tid, want_tid); 4506 wait_for_completion(&req->r_safe_completion); 4507 mutex_lock(&mdsc->mutex); 4508 ceph_mdsc_put_request(req); 4509 if (!nextreq) 4510 break; /* next dne before, so we're done! 
*/ 4511 if (RB_EMPTY_NODE(&nextreq->r_node)) { 4512 /* next request was removed from tree */ 4513 ceph_mdsc_put_request(nextreq); 4514 goto restart; 4515 } 4516 ceph_mdsc_put_request(nextreq); /* won't go away */ 4517 } 4518 req = nextreq; 4519 } 4520 mutex_unlock(&mdsc->mutex); 4521 dout("wait_unsafe_requests done\n"); 4522 } 4523 4524 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 4525 { 4526 u64 want_tid, want_flush; 4527 4528 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4529 return; 4530 4531 dout("sync\n"); 4532 mutex_lock(&mdsc->mutex); 4533 want_tid = mdsc->last_tid; 4534 mutex_unlock(&mdsc->mutex); 4535 4536 ceph_flush_dirty_caps(mdsc); 4537 spin_lock(&mdsc->cap_dirty_lock); 4538 want_flush = mdsc->last_cap_flush_tid; 4539 if (!list_empty(&mdsc->cap_flush_list)) { 4540 struct ceph_cap_flush *cf = 4541 list_last_entry(&mdsc->cap_flush_list, 4542 struct ceph_cap_flush, g_list); 4543 cf->wake = true; 4544 } 4545 spin_unlock(&mdsc->cap_dirty_lock); 4546 4547 dout("sync want tid %lld flush_seq %lld\n", 4548 want_tid, want_flush); 4549 4550 wait_unsafe_requests(mdsc, want_tid); 4551 wait_caps_flush(mdsc, want_flush); 4552 } 4553 4554 /* 4555 * true if all sessions are closed, or we force unmount 4556 */ 4557 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 4558 { 4559 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4560 return true; 4561 return atomic_read(&mdsc->num_sessions) <= skipped; 4562 } 4563 4564 /* 4565 * called after sb is ro. 4566 */ 4567 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 4568 { 4569 struct ceph_options *opts = mdsc->fsc->client->options; 4570 struct ceph_mds_session *session; 4571 int i; 4572 int skipped = 0; 4573 4574 dout("close_sessions\n"); 4575 4576 /* close sessions */ 4577 mutex_lock(&mdsc->mutex); 4578 for (i = 0; i < mdsc->max_sessions; i++) { 4579 session = __ceph_lookup_mds_session(mdsc, i); 4580 if (!session) 4581 continue; 4582 mutex_unlock(&mdsc->mutex); 4583 mutex_lock(&session->s_mutex); 4584 if (__close_session(mdsc, session) <= 0) 4585 skipped++; 4586 mutex_unlock(&session->s_mutex); 4587 ceph_put_mds_session(session); 4588 mutex_lock(&mdsc->mutex); 4589 } 4590 mutex_unlock(&mdsc->mutex); 4591 4592 dout("waiting for sessions to close\n"); 4593 wait_event_timeout(mdsc->session_close_wq, 4594 done_closing_sessions(mdsc, skipped), 4595 ceph_timeout_jiffies(opts->mount_timeout)); 4596 4597 /* tear down remaining sessions */ 4598 mutex_lock(&mdsc->mutex); 4599 for (i = 0; i < mdsc->max_sessions; i++) { 4600 if (mdsc->sessions[i]) { 4601 session = ceph_get_mds_session(mdsc->sessions[i]); 4602 __unregister_session(mdsc, session); 4603 mutex_unlock(&mdsc->mutex); 4604 mutex_lock(&session->s_mutex); 4605 remove_session_caps(session); 4606 mutex_unlock(&session->s_mutex); 4607 ceph_put_mds_session(session); 4608 mutex_lock(&mdsc->mutex); 4609 } 4610 } 4611 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 4612 mutex_unlock(&mdsc->mutex); 4613 4614 ceph_cleanup_snapid_map(mdsc); 4615 ceph_cleanup_empty_realms(mdsc); 4616 4617 cancel_work_sync(&mdsc->cap_reclaim_work); 4618 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 4619 4620 dout("stopped\n"); 4621 } 4622 4623 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 4624 { 4625 struct ceph_mds_session *session; 4626 int mds; 4627 4628 dout("force umount\n"); 4629 4630 mutex_lock(&mdsc->mutex); 4631 for (mds = 0; mds < mdsc->max_sessions; mds++) { 4632 session = __ceph_lookup_mds_session(mdsc, mds); 4633 if 
(!session) 4634 continue; 4635 4636 if (session->s_state == CEPH_MDS_SESSION_REJECTED) 4637 __unregister_session(mdsc, session); 4638 __wake_requests(mdsc, &session->s_waiting); 4639 mutex_unlock(&mdsc->mutex); 4640 4641 mutex_lock(&session->s_mutex); 4642 __close_session(mdsc, session); 4643 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 4644 cleanup_session_requests(mdsc, session); 4645 remove_session_caps(session); 4646 } 4647 mutex_unlock(&session->s_mutex); 4648 ceph_put_mds_session(session); 4649 4650 mutex_lock(&mdsc->mutex); 4651 kick_requests(mdsc, mds); 4652 } 4653 __wake_requests(mdsc, &mdsc->waiting_for_map); 4654 mutex_unlock(&mdsc->mutex); 4655 } 4656 4657 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 4658 { 4659 dout("stop\n"); 4660 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 4661 if (mdsc->mdsmap) 4662 ceph_mdsmap_destroy(mdsc->mdsmap); 4663 kfree(mdsc->sessions); 4664 ceph_caps_finalize(mdsc); 4665 ceph_pool_perm_destroy(mdsc); 4666 } 4667 4668 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 4669 { 4670 struct ceph_mds_client *mdsc = fsc->mdsc; 4671 dout("mdsc_destroy %p\n", mdsc); 4672 4673 if (!mdsc) 4674 return; 4675 4676 /* flush out any connection work with references to us */ 4677 ceph_msgr_flush(); 4678 4679 ceph_mdsc_stop(mdsc); 4680 4681 ceph_metric_destroy(&mdsc->metric); 4682 4683 fsc->mdsc = NULL; 4684 kfree(mdsc); 4685 dout("mdsc_destroy %p done\n", mdsc); 4686 } 4687 4688 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4689 { 4690 struct ceph_fs_client *fsc = mdsc->fsc; 4691 const char *mds_namespace = fsc->mount_options->mds_namespace; 4692 void *p = msg->front.iov_base; 4693 void *end = p + msg->front.iov_len; 4694 u32 epoch; 4695 u32 map_len; 4696 u32 num_fs; 4697 u32 mount_fscid = (u32)-1; 4698 u8 struct_v, struct_cv; 4699 int err = -EINVAL; 4700 4701 ceph_decode_need(&p, end, sizeof(u32), bad); 4702 epoch = ceph_decode_32(&p); 4703 4704 dout("handle_fsmap epoch %u\n", epoch); 4705 4706 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 4707 struct_v = ceph_decode_8(&p); 4708 struct_cv = ceph_decode_8(&p); 4709 map_len = ceph_decode_32(&p); 4710 4711 ceph_decode_need(&p, end, sizeof(u32) * 3, bad); 4712 p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */ 4713 4714 num_fs = ceph_decode_32(&p); 4715 while (num_fs-- > 0) { 4716 void *info_p, *info_end; 4717 u32 info_len; 4718 u8 info_v, info_cv; 4719 u32 fscid, namelen; 4720 4721 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 4722 info_v = ceph_decode_8(&p); 4723 info_cv = ceph_decode_8(&p); 4724 info_len = ceph_decode_32(&p); 4725 ceph_decode_need(&p, end, info_len, bad); 4726 info_p = p; 4727 info_end = p + info_len; 4728 p = info_end; 4729 4730 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); 4731 fscid = ceph_decode_32(&info_p); 4732 namelen = ceph_decode_32(&info_p); 4733 ceph_decode_need(&info_p, info_end, namelen, bad); 4734 4735 if (mds_namespace && 4736 strlen(mds_namespace) == namelen && 4737 !strncmp(mds_namespace, (char *)info_p, namelen)) { 4738 mount_fscid = fscid; 4739 break; 4740 } 4741 } 4742 4743 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); 4744 if (mount_fscid != (u32)-1) { 4745 fsc->client->monc.fs_cluster_id = mount_fscid; 4746 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 4747 0, true); 4748 ceph_monc_renew_subs(&fsc->client->monc); 4749 } else { 4750 err = -ENOENT; 4751 goto err_out; 4752 } 4753 return; 4754 4755 bad: 4756 pr_err("error decoding fsmap\n"); 4757 
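/* fall through: record the error and wake anyone waiting for a map */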
err_out: 4758 mutex_lock(&mdsc->mutex); 4759 mdsc->mdsmap_err = err; 4760 __wake_requests(mdsc, &mdsc->waiting_for_map); 4761 mutex_unlock(&mdsc->mutex); 4762 } 4763 4764 /* 4765 * handle mds map update. 4766 */ 4767 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4768 { 4769 u32 epoch; 4770 u32 maplen; 4771 void *p = msg->front.iov_base; 4772 void *end = p + msg->front.iov_len; 4773 struct ceph_mdsmap *newmap, *oldmap; 4774 struct ceph_fsid fsid; 4775 int err = -EINVAL; 4776 4777 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 4778 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 4779 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 4780 return; 4781 epoch = ceph_decode_32(&p); 4782 maplen = ceph_decode_32(&p); 4783 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 4784 4785 /* do we need it? */ 4786 mutex_lock(&mdsc->mutex); 4787 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 4788 dout("handle_map epoch %u <= our %u\n", 4789 epoch, mdsc->mdsmap->m_epoch); 4790 mutex_unlock(&mdsc->mutex); 4791 return; 4792 } 4793 4794 newmap = ceph_mdsmap_decode(&p, end); 4795 if (IS_ERR(newmap)) { 4796 err = PTR_ERR(newmap); 4797 goto bad_unlock; 4798 } 4799 4800 /* swap into place */ 4801 if (mdsc->mdsmap) { 4802 oldmap = mdsc->mdsmap; 4803 mdsc->mdsmap = newmap; 4804 check_new_map(mdsc, newmap, oldmap); 4805 ceph_mdsmap_destroy(oldmap); 4806 } else { 4807 mdsc->mdsmap = newmap; /* first mds map */ 4808 } 4809 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size, 4810 MAX_LFS_FILESIZE); 4811 4812 __wake_requests(mdsc, &mdsc->waiting_for_map); 4813 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP, 4814 mdsc->mdsmap->m_epoch); 4815 4816 mutex_unlock(&mdsc->mutex); 4817 schedule_delayed(mdsc); 4818 return; 4819 4820 bad_unlock: 4821 mutex_unlock(&mdsc->mutex); 4822 bad: 4823 pr_err("error decoding mdsmap %d\n", err); 4824 return; 4825 } 4826 4827 static struct ceph_connection *con_get(struct ceph_connection *con) 4828 { 4829 struct ceph_mds_session *s = con->private; 4830 4831 if (ceph_get_mds_session(s)) 4832 return con; 4833 return NULL; 4834 } 4835 4836 static void con_put(struct ceph_connection *con) 4837 { 4838 struct ceph_mds_session *s = con->private; 4839 4840 ceph_put_mds_session(s); 4841 } 4842 4843 /* 4844 * if the client is unresponsive for long enough, the mds will kill 4845 * the session entirely. 
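 * When that happens the peer resets the connection; we treat it as a
 * dropped session and attempt to reconnect (see peer_reset() below).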
4846 */ 4847 static void peer_reset(struct ceph_connection *con) 4848 { 4849 struct ceph_mds_session *s = con->private; 4850 struct ceph_mds_client *mdsc = s->s_mdsc; 4851 4852 pr_warn("mds%d closed our session\n", s->s_mds); 4853 send_mds_reconnect(mdsc, s); 4854 } 4855 4856 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 4857 { 4858 struct ceph_mds_session *s = con->private; 4859 struct ceph_mds_client *mdsc = s->s_mdsc; 4860 int type = le16_to_cpu(msg->hdr.type); 4861 4862 mutex_lock(&mdsc->mutex); 4863 if (__verify_registered_session(mdsc, s) < 0) { 4864 mutex_unlock(&mdsc->mutex); 4865 goto out; 4866 } 4867 mutex_unlock(&mdsc->mutex); 4868 4869 switch (type) { 4870 case CEPH_MSG_MDS_MAP: 4871 ceph_mdsc_handle_mdsmap(mdsc, msg); 4872 break; 4873 case CEPH_MSG_FS_MAP_USER: 4874 ceph_mdsc_handle_fsmap(mdsc, msg); 4875 break; 4876 case CEPH_MSG_CLIENT_SESSION: 4877 handle_session(s, msg); 4878 break; 4879 case CEPH_MSG_CLIENT_REPLY: 4880 handle_reply(s, msg); 4881 break; 4882 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 4883 handle_forward(mdsc, s, msg); 4884 break; 4885 case CEPH_MSG_CLIENT_CAPS: 4886 ceph_handle_caps(s, msg); 4887 break; 4888 case CEPH_MSG_CLIENT_SNAP: 4889 ceph_handle_snap(mdsc, s, msg); 4890 break; 4891 case CEPH_MSG_CLIENT_LEASE: 4892 handle_lease(mdsc, s, msg); 4893 break; 4894 case CEPH_MSG_CLIENT_QUOTA: 4895 ceph_handle_quota(mdsc, s, msg); 4896 break; 4897 4898 default: 4899 pr_err("received unknown message type %d %s\n", type, 4900 ceph_msg_type_name(type)); 4901 } 4902 out: 4903 ceph_msg_put(msg); 4904 } 4905 4906 /* 4907 * authentication 4908 */ 4909 4910 /* 4911 * Note: returned pointer is the address of a structure that's 4912 * managed separately. Caller must *not* attempt to free it. 4913 */ 4914 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 4915 int *proto, int force_new) 4916 { 4917 struct ceph_mds_session *s = con->private; 4918 struct ceph_mds_client *mdsc = s->s_mdsc; 4919 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 4920 struct ceph_auth_handshake *auth = &s->s_auth; 4921 4922 if (force_new && auth->authorizer) { 4923 ceph_auth_destroy_authorizer(auth->authorizer); 4924 auth->authorizer = NULL; 4925 } 4926 if (!auth->authorizer) { 4927 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 4928 auth); 4929 if (ret) 4930 return ERR_PTR(ret); 4931 } else { 4932 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 4933 auth); 4934 if (ret) 4935 return ERR_PTR(ret); 4936 } 4937 *proto = ac->protocol; 4938 4939 return auth; 4940 } 4941 4942 static int add_authorizer_challenge(struct ceph_connection *con, 4943 void *challenge_buf, int challenge_buf_len) 4944 { 4945 struct ceph_mds_session *s = con->private; 4946 struct ceph_mds_client *mdsc = s->s_mdsc; 4947 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 4948 4949 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer, 4950 challenge_buf, challenge_buf_len); 4951 } 4952 4953 static int verify_authorizer_reply(struct ceph_connection *con) 4954 { 4955 struct ceph_mds_session *s = con->private; 4956 struct ceph_mds_client *mdsc = s->s_mdsc; 4957 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 4958 4959 return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer); 4960 } 4961 4962 static int invalidate_authorizer(struct ceph_connection *con) 4963 { 4964 struct ceph_mds_session *s = con->private; 4965 struct ceph_mds_client *mdsc = s->s_mdsc; 4966 struct ceph_auth_client *ac = 
mdsc->fsc->client->monc.auth; 4967 4968 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 4969 4970 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 4971 } 4972 4973 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, 4974 struct ceph_msg_header *hdr, int *skip) 4975 { 4976 struct ceph_msg *msg; 4977 int type = (int) le16_to_cpu(hdr->type); 4978 int front_len = (int) le32_to_cpu(hdr->front_len); 4979 4980 if (con->in_msg) 4981 return con->in_msg; 4982 4983 *skip = 0; 4984 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 4985 if (!msg) { 4986 pr_err("unable to allocate msg type %d len %d\n", 4987 type, front_len); 4988 return NULL; 4989 } 4990 4991 return msg; 4992 } 4993 4994 static int mds_sign_message(struct ceph_msg *msg) 4995 { 4996 struct ceph_mds_session *s = msg->con->private; 4997 struct ceph_auth_handshake *auth = &s->s_auth; 4998 4999 return ceph_auth_sign_message(auth, msg); 5000 } 5001 5002 static int mds_check_message_signature(struct ceph_msg *msg) 5003 { 5004 struct ceph_mds_session *s = msg->con->private; 5005 struct ceph_auth_handshake *auth = &s->s_auth; 5006 5007 return ceph_auth_check_message_signature(auth, msg); 5008 } 5009 5010 static const struct ceph_connection_operations mds_con_ops = { 5011 .get = con_get, 5012 .put = con_put, 5013 .dispatch = dispatch, 5014 .get_authorizer = get_authorizer, 5015 .add_authorizer_challenge = add_authorizer_challenge, 5016 .verify_authorizer_reply = verify_authorizer_reply, 5017 .invalidate_authorizer = invalidate_authorizer, 5018 .peer_reset = peer_reset, 5019 .alloc_msg = mds_alloc_msg, 5020 .sign_message = mds_sign_message, 5021 .check_message_signature = mds_check_message_signature, 5022 }; 5023 5024 /* eof */ 5025