1 // SPDX-License-Identifier: GPL-2.0 2 #include <linux/ceph/ceph_debug.h> 3 4 #include <linux/fs.h> 5 #include <linux/wait.h> 6 #include <linux/slab.h> 7 #include <linux/gfp.h> 8 #include <linux/sched.h> 9 #include <linux/debugfs.h> 10 #include <linux/seq_file.h> 11 #include <linux/ratelimit.h> 12 #include <linux/bits.h> 13 #include <linux/ktime.h> 14 15 #include "super.h" 16 #include "mds_client.h" 17 18 #include <linux/ceph/ceph_features.h> 19 #include <linux/ceph/messenger.h> 20 #include <linux/ceph/decode.h> 21 #include <linux/ceph/pagelist.h> 22 #include <linux/ceph/auth.h> 23 #include <linux/ceph/debugfs.h> 24 25 #define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE) 26 27 /* 28 * A cluster of MDS (metadata server) daemons is responsible for 29 * managing the file system namespace (the directory hierarchy and 30 * inodes) and for coordinating shared access to storage. Metadata is 31 * partitioning hierarchically across a number of servers, and that 32 * partition varies over time as the cluster adjusts the distribution 33 * in order to balance load. 34 * 35 * The MDS client is primarily responsible to managing synchronous 36 * metadata requests for operations like open, unlink, and so forth. 37 * If there is a MDS failure, we find out about it when we (possibly 38 * request and) receive a new MDS map, and can resubmit affected 39 * requests. 40 * 41 * For the most part, though, we take advantage of a lossless 42 * communications channel to the MDS, and do not need to worry about 43 * timing out or resubmitting requests. 44 * 45 * We maintain a stateful "session" with each MDS we interact with. 46 * Within each session, we sent periodic heartbeat messages to ensure 47 * any capabilities or leases we have been issues remain valid. If 48 * the session times out and goes stale, our leases and capabilities 49 * are no longer valid. 50 */ 51 52 struct ceph_reconnect_state { 53 struct ceph_mds_session *session; 54 int nr_caps, nr_realms; 55 struct ceph_pagelist *pagelist; 56 unsigned msg_version; 57 bool allow_multi; 58 }; 59 60 static void __wake_requests(struct ceph_mds_client *mdsc, 61 struct list_head *head); 62 static void ceph_cap_release_work(struct work_struct *work); 63 static void ceph_cap_reclaim_work(struct work_struct *work); 64 65 static const struct ceph_connection_operations mds_con_ops; 66 67 68 /* 69 * mds reply parsing 70 */ 71 72 static int parse_reply_info_quota(void **p, void *end, 73 struct ceph_mds_reply_info_in *info) 74 { 75 u8 struct_v, struct_compat; 76 u32 struct_len; 77 78 ceph_decode_8_safe(p, end, struct_v, bad); 79 ceph_decode_8_safe(p, end, struct_compat, bad); 80 /* struct_v is expected to be >= 1. we only 81 * understand encoding with struct_compat == 1. */ 82 if (!struct_v || struct_compat != 1) 83 goto bad; 84 ceph_decode_32_safe(p, end, struct_len, bad); 85 ceph_decode_need(p, end, struct_len, bad); 86 end = *p + struct_len; 87 ceph_decode_64_safe(p, end, info->max_bytes, bad); 88 ceph_decode_64_safe(p, end, info->max_files, bad); 89 *p = end; 90 return 0; 91 bad: 92 return -EIO; 93 } 94 95 /* 96 * parse individual inode info 97 */ 98 static int parse_reply_info_in(void **p, void *end, 99 struct ceph_mds_reply_info_in *info, 100 u64 features) 101 { 102 int err = 0; 103 u8 struct_v = 0; 104 105 if (features == (u64)-1) { 106 u32 struct_len; 107 u8 struct_compat; 108 ceph_decode_8_safe(p, end, struct_v, bad); 109 ceph_decode_8_safe(p, end, struct_compat, bad); 110 /* struct_v is expected to be >= 1. we only understand 111 * encoding with struct_compat == 1. 
*/ 112 if (!struct_v || struct_compat != 1) 113 goto bad; 114 ceph_decode_32_safe(p, end, struct_len, bad); 115 ceph_decode_need(p, end, struct_len, bad); 116 end = *p + struct_len; 117 } 118 119 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad); 120 info->in = *p; 121 *p += sizeof(struct ceph_mds_reply_inode) + 122 sizeof(*info->in->fragtree.splits) * 123 le32_to_cpu(info->in->fragtree.nsplits); 124 125 ceph_decode_32_safe(p, end, info->symlink_len, bad); 126 ceph_decode_need(p, end, info->symlink_len, bad); 127 info->symlink = *p; 128 *p += info->symlink_len; 129 130 ceph_decode_copy_safe(p, end, &info->dir_layout, 131 sizeof(info->dir_layout), bad); 132 ceph_decode_32_safe(p, end, info->xattr_len, bad); 133 ceph_decode_need(p, end, info->xattr_len, bad); 134 info->xattr_data = *p; 135 *p += info->xattr_len; 136 137 if (features == (u64)-1) { 138 /* inline data */ 139 ceph_decode_64_safe(p, end, info->inline_version, bad); 140 ceph_decode_32_safe(p, end, info->inline_len, bad); 141 ceph_decode_need(p, end, info->inline_len, bad); 142 info->inline_data = *p; 143 *p += info->inline_len; 144 /* quota */ 145 err = parse_reply_info_quota(p, end, info); 146 if (err < 0) 147 goto out_bad; 148 /* pool namespace */ 149 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 150 if (info->pool_ns_len > 0) { 151 ceph_decode_need(p, end, info->pool_ns_len, bad); 152 info->pool_ns_data = *p; 153 *p += info->pool_ns_len; 154 } 155 156 /* btime */ 157 ceph_decode_need(p, end, sizeof(info->btime), bad); 158 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 159 160 /* change attribute */ 161 ceph_decode_64_safe(p, end, info->change_attr, bad); 162 163 /* dir pin */ 164 if (struct_v >= 2) { 165 ceph_decode_32_safe(p, end, info->dir_pin, bad); 166 } else { 167 info->dir_pin = -ENODATA; 168 } 169 170 /* snapshot birth time, remains zero for v<=2 */ 171 if (struct_v >= 3) { 172 ceph_decode_need(p, end, sizeof(info->snap_btime), bad); 173 ceph_decode_copy(p, &info->snap_btime, 174 sizeof(info->snap_btime)); 175 } else { 176 memset(&info->snap_btime, 0, sizeof(info->snap_btime)); 177 } 178 179 /* snapshot count, remains zero for v<=3 */ 180 if (struct_v >= 4) { 181 ceph_decode_64_safe(p, end, info->rsnaps, bad); 182 } else { 183 info->rsnaps = 0; 184 } 185 186 *p = end; 187 } else { 188 if (features & CEPH_FEATURE_MDS_INLINE_DATA) { 189 ceph_decode_64_safe(p, end, info->inline_version, bad); 190 ceph_decode_32_safe(p, end, info->inline_len, bad); 191 ceph_decode_need(p, end, info->inline_len, bad); 192 info->inline_data = *p; 193 *p += info->inline_len; 194 } else 195 info->inline_version = CEPH_INLINE_NONE; 196 197 if (features & CEPH_FEATURE_MDS_QUOTA) { 198 err = parse_reply_info_quota(p, end, info); 199 if (err < 0) 200 goto out_bad; 201 } else { 202 info->max_bytes = 0; 203 info->max_files = 0; 204 } 205 206 info->pool_ns_len = 0; 207 info->pool_ns_data = NULL; 208 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { 209 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 210 if (info->pool_ns_len > 0) { 211 ceph_decode_need(p, end, info->pool_ns_len, bad); 212 info->pool_ns_data = *p; 213 *p += info->pool_ns_len; 214 } 215 } 216 217 if (features & CEPH_FEATURE_FS_BTIME) { 218 ceph_decode_need(p, end, sizeof(info->btime), bad); 219 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 220 ceph_decode_64_safe(p, end, info->change_attr, bad); 221 } 222 223 info->dir_pin = -ENODATA; 224 /* info->snap_btime and info->rsnaps remain zero */ 225 } 226 return 0; 227 bad: 228 err = -EIO; 229 
out_bad: 230 return err; 231 } 232 233 static int parse_reply_info_dir(void **p, void *end, 234 struct ceph_mds_reply_dirfrag **dirfrag, 235 u64 features) 236 { 237 if (features == (u64)-1) { 238 u8 struct_v, struct_compat; 239 u32 struct_len; 240 ceph_decode_8_safe(p, end, struct_v, bad); 241 ceph_decode_8_safe(p, end, struct_compat, bad); 242 /* struct_v is expected to be >= 1. we only understand 243 * encoding whose struct_compat == 1. */ 244 if (!struct_v || struct_compat != 1) 245 goto bad; 246 ceph_decode_32_safe(p, end, struct_len, bad); 247 ceph_decode_need(p, end, struct_len, bad); 248 end = *p + struct_len; 249 } 250 251 ceph_decode_need(p, end, sizeof(**dirfrag), bad); 252 *dirfrag = *p; 253 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist); 254 if (unlikely(*p > end)) 255 goto bad; 256 if (features == (u64)-1) 257 *p = end; 258 return 0; 259 bad: 260 return -EIO; 261 } 262 263 static int parse_reply_info_lease(void **p, void *end, 264 struct ceph_mds_reply_lease **lease, 265 u64 features) 266 { 267 if (features == (u64)-1) { 268 u8 struct_v, struct_compat; 269 u32 struct_len; 270 ceph_decode_8_safe(p, end, struct_v, bad); 271 ceph_decode_8_safe(p, end, struct_compat, bad); 272 /* struct_v is expected to be >= 1. we only understand 273 * encoding whose struct_compat == 1. */ 274 if (!struct_v || struct_compat != 1) 275 goto bad; 276 ceph_decode_32_safe(p, end, struct_len, bad); 277 ceph_decode_need(p, end, struct_len, bad); 278 end = *p + struct_len; 279 } 280 281 ceph_decode_need(p, end, sizeof(**lease), bad); 282 *lease = *p; 283 *p += sizeof(**lease); 284 if (features == (u64)-1) 285 *p = end; 286 return 0; 287 bad: 288 return -EIO; 289 } 290 291 /* 292 * parse a normal reply, which may contain a (dir+)dentry and/or a 293 * target inode. 
294 */ 295 static int parse_reply_info_trace(void **p, void *end, 296 struct ceph_mds_reply_info_parsed *info, 297 u64 features) 298 { 299 int err; 300 301 if (info->head->is_dentry) { 302 err = parse_reply_info_in(p, end, &info->diri, features); 303 if (err < 0) 304 goto out_bad; 305 306 err = parse_reply_info_dir(p, end, &info->dirfrag, features); 307 if (err < 0) 308 goto out_bad; 309 310 ceph_decode_32_safe(p, end, info->dname_len, bad); 311 ceph_decode_need(p, end, info->dname_len, bad); 312 info->dname = *p; 313 *p += info->dname_len; 314 315 err = parse_reply_info_lease(p, end, &info->dlease, features); 316 if (err < 0) 317 goto out_bad; 318 } 319 320 if (info->head->is_target) { 321 err = parse_reply_info_in(p, end, &info->targeti, features); 322 if (err < 0) 323 goto out_bad; 324 } 325 326 if (unlikely(*p != end)) 327 goto bad; 328 return 0; 329 330 bad: 331 err = -EIO; 332 out_bad: 333 pr_err("problem parsing mds trace %d\n", err); 334 return err; 335 } 336 337 /* 338 * parse readdir results 339 */ 340 static int parse_reply_info_readdir(void **p, void *end, 341 struct ceph_mds_reply_info_parsed *info, 342 u64 features) 343 { 344 u32 num, i = 0; 345 int err; 346 347 err = parse_reply_info_dir(p, end, &info->dir_dir, features); 348 if (err < 0) 349 goto out_bad; 350 351 ceph_decode_need(p, end, sizeof(num) + 2, bad); 352 num = ceph_decode_32(p); 353 { 354 u16 flags = ceph_decode_16(p); 355 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); 356 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); 357 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); 358 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH); 359 } 360 if (num == 0) 361 goto done; 362 363 BUG_ON(!info->dir_entries); 364 if ((unsigned long)(info->dir_entries + num) > 365 (unsigned long)info->dir_entries + info->dir_buf_size) { 366 pr_err("dir contents are larger than expected\n"); 367 WARN_ON(1); 368 goto bad; 369 } 370 371 info->dir_nr = num; 372 while (num) { 373 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 374 /* dentry */ 375 ceph_decode_32_safe(p, end, rde->name_len, bad); 376 ceph_decode_need(p, end, rde->name_len, bad); 377 rde->name = *p; 378 *p += rde->name_len; 379 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name); 380 381 /* dentry lease */ 382 err = parse_reply_info_lease(p, end, &rde->lease, features); 383 if (err) 384 goto out_bad; 385 /* inode */ 386 err = parse_reply_info_in(p, end, &rde->inode, features); 387 if (err < 0) 388 goto out_bad; 389 /* ceph_readdir_prepopulate() will update it */ 390 rde->offset = 0; 391 i++; 392 num--; 393 } 394 395 done: 396 /* Skip over any unrecognized fields */ 397 *p = end; 398 return 0; 399 400 bad: 401 err = -EIO; 402 out_bad: 403 pr_err("problem parsing dir contents %d\n", err); 404 return err; 405 } 406 407 /* 408 * parse fcntl F_GETLK results 409 */ 410 static int parse_reply_info_filelock(void **p, void *end, 411 struct ceph_mds_reply_info_parsed *info, 412 u64 features) 413 { 414 if (*p + sizeof(*info->filelock_reply) > end) 415 goto bad; 416 417 info->filelock_reply = *p; 418 419 /* Skip over any unrecognized fields */ 420 *p = end; 421 return 0; 422 bad: 423 return -EIO; 424 } 425 426 427 #if BITS_PER_LONG == 64 428 429 #define DELEGATED_INO_AVAILABLE xa_mk_value(1) 430 431 static int ceph_parse_deleg_inos(void **p, void *end, 432 struct ceph_mds_session *s) 433 { 434 u32 sets; 435 436 ceph_decode_32_safe(p, end, sets, bad); 437 dout("got %u sets of delegated inodes\n", sets); 438 while (sets--) { 439 u64 start, 
len, ino; 440 441 ceph_decode_64_safe(p, end, start, bad); 442 ceph_decode_64_safe(p, end, len, bad); 443 444 /* Don't accept a delegation of system inodes */ 445 if (start < CEPH_INO_SYSTEM_BASE) { 446 pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n", 447 start, len); 448 continue; 449 } 450 while (len--) { 451 int err = xa_insert(&s->s_delegated_inos, ino = start++, 452 DELEGATED_INO_AVAILABLE, 453 GFP_KERNEL); 454 if (!err) { 455 dout("added delegated inode 0x%llx\n", 456 start - 1); 457 } else if (err == -EBUSY) { 458 pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n", 459 start - 1); 460 } else { 461 return err; 462 } 463 } 464 } 465 return 0; 466 bad: 467 return -EIO; 468 } 469 470 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 471 { 472 unsigned long ino; 473 void *val; 474 475 xa_for_each(&s->s_delegated_inos, ino, val) { 476 val = xa_erase(&s->s_delegated_inos, ino); 477 if (val == DELEGATED_INO_AVAILABLE) 478 return ino; 479 } 480 return 0; 481 } 482 483 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 484 { 485 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE, 486 GFP_KERNEL); 487 } 488 #else /* BITS_PER_LONG == 64 */ 489 /* 490 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just 491 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top 492 * and bottom words? 493 */ 494 static int ceph_parse_deleg_inos(void **p, void *end, 495 struct ceph_mds_session *s) 496 { 497 u32 sets; 498 499 ceph_decode_32_safe(p, end, sets, bad); 500 if (sets) 501 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad); 502 return 0; 503 bad: 504 return -EIO; 505 } 506 507 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 508 { 509 return 0; 510 } 511 512 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 513 { 514 return 0; 515 } 516 #endif /* BITS_PER_LONG == 64 */ 517 518 /* 519 * parse create results 520 */ 521 static int parse_reply_info_create(void **p, void *end, 522 struct ceph_mds_reply_info_parsed *info, 523 u64 features, struct ceph_mds_session *s) 524 { 525 int ret; 526 527 if (features == (u64)-1 || 528 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) { 529 if (*p == end) { 530 /* Malformed reply? 
*/ 531 info->has_create_ino = false; 532 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) { 533 info->has_create_ino = true; 534 /* struct_v, struct_compat, and len */ 535 ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad); 536 ceph_decode_64_safe(p, end, info->ino, bad); 537 ret = ceph_parse_deleg_inos(p, end, s); 538 if (ret) 539 return ret; 540 } else { 541 /* legacy */ 542 ceph_decode_64_safe(p, end, info->ino, bad); 543 info->has_create_ino = true; 544 } 545 } else { 546 if (*p != end) 547 goto bad; 548 } 549 550 /* Skip over any unrecognized fields */ 551 *p = end; 552 return 0; 553 bad: 554 return -EIO; 555 } 556 557 /* 558 * parse extra results 559 */ 560 static int parse_reply_info_extra(void **p, void *end, 561 struct ceph_mds_reply_info_parsed *info, 562 u64 features, struct ceph_mds_session *s) 563 { 564 u32 op = le32_to_cpu(info->head->op); 565 566 if (op == CEPH_MDS_OP_GETFILELOCK) 567 return parse_reply_info_filelock(p, end, info, features); 568 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) 569 return parse_reply_info_readdir(p, end, info, features); 570 else if (op == CEPH_MDS_OP_CREATE) 571 return parse_reply_info_create(p, end, info, features, s); 572 else 573 return -EIO; 574 } 575 576 /* 577 * parse entire mds reply 578 */ 579 static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, 580 struct ceph_mds_reply_info_parsed *info, 581 u64 features) 582 { 583 void *p, *end; 584 u32 len; 585 int err; 586 587 info->head = msg->front.iov_base; 588 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 589 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 590 591 /* trace */ 592 ceph_decode_32_safe(&p, end, len, bad); 593 if (len > 0) { 594 ceph_decode_need(&p, end, len, bad); 595 err = parse_reply_info_trace(&p, p+len, info, features); 596 if (err < 0) 597 goto out_bad; 598 } 599 600 /* extra */ 601 ceph_decode_32_safe(&p, end, len, bad); 602 if (len > 0) { 603 ceph_decode_need(&p, end, len, bad); 604 err = parse_reply_info_extra(&p, p+len, info, features, s); 605 if (err < 0) 606 goto out_bad; 607 } 608 609 /* snap blob */ 610 ceph_decode_32_safe(&p, end, len, bad); 611 info->snapblob_len = len; 612 info->snapblob = p; 613 p += len; 614 615 if (p != end) 616 goto bad; 617 return 0; 618 619 bad: 620 err = -EIO; 621 out_bad: 622 pr_err("mds parse_reply err %d\n", err); 623 return err; 624 } 625 626 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 627 { 628 if (!info->dir_entries) 629 return; 630 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size)); 631 } 632 633 634 /* 635 * sessions 636 */ 637 const char *ceph_session_state_name(int s) 638 { 639 switch (s) { 640 case CEPH_MDS_SESSION_NEW: return "new"; 641 case CEPH_MDS_SESSION_OPENING: return "opening"; 642 case CEPH_MDS_SESSION_OPEN: return "open"; 643 case CEPH_MDS_SESSION_HUNG: return "hung"; 644 case CEPH_MDS_SESSION_CLOSING: return "closing"; 645 case CEPH_MDS_SESSION_CLOSED: return "closed"; 646 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 647 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 648 case CEPH_MDS_SESSION_REJECTED: return "rejected"; 649 default: return "???"; 650 } 651 } 652 653 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s) 654 { 655 if (refcount_inc_not_zero(&s->s_ref)) { 656 dout("mdsc get_session %p %d -> %d\n", s, 657 refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref)); 658 return s; 659 } else { 660 dout("mdsc get_session %p 0 -- 
FAIL\n", s); 661 return NULL; 662 } 663 } 664 665 void ceph_put_mds_session(struct ceph_mds_session *s) 666 { 667 if (IS_ERR_OR_NULL(s)) 668 return; 669 670 dout("mdsc put_session %p %d -> %d\n", s, 671 refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1); 672 if (refcount_dec_and_test(&s->s_ref)) { 673 if (s->s_auth.authorizer) 674 ceph_auth_destroy_authorizer(s->s_auth.authorizer); 675 WARN_ON(mutex_is_locked(&s->s_mutex)); 676 xa_destroy(&s->s_delegated_inos); 677 kfree(s); 678 } 679 } 680 681 /* 682 * called under mdsc->mutex 683 */ 684 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 685 int mds) 686 { 687 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 688 return NULL; 689 return ceph_get_mds_session(mdsc->sessions[mds]); 690 } 691 692 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 693 { 694 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 695 return false; 696 else 697 return true; 698 } 699 700 static int __verify_registered_session(struct ceph_mds_client *mdsc, 701 struct ceph_mds_session *s) 702 { 703 if (s->s_mds >= mdsc->max_sessions || 704 mdsc->sessions[s->s_mds] != s) 705 return -ENOENT; 706 return 0; 707 } 708 709 /* 710 * create+register a new session for given mds. 711 * called under mdsc->mutex. 712 */ 713 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 714 int mds) 715 { 716 struct ceph_mds_session *s; 717 718 if (mds >= mdsc->mdsmap->possible_max_rank) 719 return ERR_PTR(-EINVAL); 720 721 s = kzalloc(sizeof(*s), GFP_NOFS); 722 if (!s) 723 return ERR_PTR(-ENOMEM); 724 725 if (mds >= mdsc->max_sessions) { 726 int newmax = 1 << get_count_order(mds + 1); 727 struct ceph_mds_session **sa; 728 729 dout("%s: realloc to %d\n", __func__, newmax); 730 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); 731 if (!sa) 732 goto fail_realloc; 733 if (mdsc->sessions) { 734 memcpy(sa, mdsc->sessions, 735 mdsc->max_sessions * sizeof(void *)); 736 kfree(mdsc->sessions); 737 } 738 mdsc->sessions = sa; 739 mdsc->max_sessions = newmax; 740 } 741 742 dout("%s: mds%d\n", __func__, mds); 743 s->s_mdsc = mdsc; 744 s->s_mds = mds; 745 s->s_state = CEPH_MDS_SESSION_NEW; 746 s->s_ttl = 0; 747 s->s_seq = 0; 748 mutex_init(&s->s_mutex); 749 750 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); 751 752 atomic_set(&s->s_cap_gen, 1); 753 s->s_cap_ttl = jiffies - 1; 754 755 spin_lock_init(&s->s_cap_lock); 756 s->s_renew_requested = 0; 757 s->s_renew_seq = 0; 758 INIT_LIST_HEAD(&s->s_caps); 759 s->s_nr_caps = 0; 760 refcount_set(&s->s_ref, 1); 761 INIT_LIST_HEAD(&s->s_waiting); 762 INIT_LIST_HEAD(&s->s_unsafe); 763 xa_init(&s->s_delegated_inos); 764 s->s_num_cap_releases = 0; 765 s->s_cap_reconnect = 0; 766 s->s_cap_iterator = NULL; 767 INIT_LIST_HEAD(&s->s_cap_releases); 768 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work); 769 770 INIT_LIST_HEAD(&s->s_cap_dirty); 771 INIT_LIST_HEAD(&s->s_cap_flushing); 772 773 mdsc->sessions[mds] = s; 774 atomic_inc(&mdsc->num_sessions); 775 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 776 777 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 778 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 779 780 return s; 781 782 fail_realloc: 783 kfree(s); 784 return ERR_PTR(-ENOMEM); 785 } 786 787 /* 788 * called under mdsc->mutex 789 */ 790 static void __unregister_session(struct ceph_mds_client *mdsc, 791 struct ceph_mds_session *s) 792 { 793 dout("__unregister_session mds%d %p\n", s->s_mds, s); 794 BUG_ON(mdsc->sessions[s->s_mds] != s); 
795 mdsc->sessions[s->s_mds] = NULL; 796 ceph_con_close(&s->s_con); 797 ceph_put_mds_session(s); 798 atomic_dec(&mdsc->num_sessions); 799 } 800 801 /* 802 * drop session refs in request. 803 * 804 * should be last request ref, or hold mdsc->mutex 805 */ 806 static void put_request_session(struct ceph_mds_request *req) 807 { 808 if (req->r_session) { 809 ceph_put_mds_session(req->r_session); 810 req->r_session = NULL; 811 } 812 } 813 814 void ceph_mdsc_release_request(struct kref *kref) 815 { 816 struct ceph_mds_request *req = container_of(kref, 817 struct ceph_mds_request, 818 r_kref); 819 ceph_mdsc_release_dir_caps_no_check(req); 820 destroy_reply_info(&req->r_reply_info); 821 if (req->r_request) 822 ceph_msg_put(req->r_request); 823 if (req->r_reply) 824 ceph_msg_put(req->r_reply); 825 if (req->r_inode) { 826 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 827 iput(req->r_inode); 828 } 829 if (req->r_parent) { 830 ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN); 831 iput(req->r_parent); 832 } 833 iput(req->r_target_inode); 834 if (req->r_dentry) 835 dput(req->r_dentry); 836 if (req->r_old_dentry) 837 dput(req->r_old_dentry); 838 if (req->r_old_dentry_dir) { 839 /* 840 * track (and drop pins for) r_old_dentry_dir 841 * separately, since r_old_dentry's d_parent may have 842 * changed between the dir mutex being dropped and 843 * this request being freed. 844 */ 845 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), 846 CEPH_CAP_PIN); 847 iput(req->r_old_dentry_dir); 848 } 849 kfree(req->r_path1); 850 kfree(req->r_path2); 851 put_cred(req->r_cred); 852 if (req->r_pagelist) 853 ceph_pagelist_release(req->r_pagelist); 854 put_request_session(req); 855 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); 856 WARN_ON_ONCE(!list_empty(&req->r_wait)); 857 kmem_cache_free(ceph_mds_request_cachep, req); 858 } 859 860 DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node) 861 862 /* 863 * lookup session, bump ref if found. 864 * 865 * called under mdsc->mutex. 866 */ 867 static struct ceph_mds_request * 868 lookup_get_request(struct ceph_mds_client *mdsc, u64 tid) 869 { 870 struct ceph_mds_request *req; 871 872 req = lookup_request(&mdsc->request_tree, tid); 873 if (req) 874 ceph_mdsc_get_request(req); 875 876 return req; 877 } 878 879 /* 880 * Register an in-flight request, and assign a tid. Link to directory 881 * are modifying (if any). 882 * 883 * Called under mdsc->mutex. 
884 */ 885 static void __register_request(struct ceph_mds_client *mdsc, 886 struct ceph_mds_request *req, 887 struct inode *dir) 888 { 889 int ret = 0; 890 891 req->r_tid = ++mdsc->last_tid; 892 if (req->r_num_caps) { 893 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, 894 req->r_num_caps); 895 if (ret < 0) { 896 pr_err("__register_request %p " 897 "failed to reserve caps: %d\n", req, ret); 898 /* set req->r_err to fail early from __do_request */ 899 req->r_err = ret; 900 return; 901 } 902 } 903 dout("__register_request %p tid %lld\n", req, req->r_tid); 904 ceph_mdsc_get_request(req); 905 insert_request(&mdsc->request_tree, req); 906 907 req->r_cred = get_current_cred(); 908 909 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) 910 mdsc->oldest_tid = req->r_tid; 911 912 if (dir) { 913 struct ceph_inode_info *ci = ceph_inode(dir); 914 915 ihold(dir); 916 req->r_unsafe_dir = dir; 917 spin_lock(&ci->i_unsafe_lock); 918 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 919 spin_unlock(&ci->i_unsafe_lock); 920 } 921 } 922 923 static void __unregister_request(struct ceph_mds_client *mdsc, 924 struct ceph_mds_request *req) 925 { 926 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 927 928 /* Never leave an unregistered request on an unsafe list! */ 929 list_del_init(&req->r_unsafe_item); 930 931 if (req->r_tid == mdsc->oldest_tid) { 932 struct rb_node *p = rb_next(&req->r_node); 933 mdsc->oldest_tid = 0; 934 while (p) { 935 struct ceph_mds_request *next_req = 936 rb_entry(p, struct ceph_mds_request, r_node); 937 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 938 mdsc->oldest_tid = next_req->r_tid; 939 break; 940 } 941 p = rb_next(p); 942 } 943 } 944 945 erase_request(&mdsc->request_tree, req); 946 947 if (req->r_unsafe_dir) { 948 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 949 spin_lock(&ci->i_unsafe_lock); 950 list_del_init(&req->r_unsafe_dir_item); 951 spin_unlock(&ci->i_unsafe_lock); 952 } 953 if (req->r_target_inode && 954 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 955 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 956 spin_lock(&ci->i_unsafe_lock); 957 list_del_init(&req->r_unsafe_target_item); 958 spin_unlock(&ci->i_unsafe_lock); 959 } 960 961 if (req->r_unsafe_dir) { 962 iput(req->r_unsafe_dir); 963 req->r_unsafe_dir = NULL; 964 } 965 966 complete_all(&req->r_safe_completion); 967 968 ceph_mdsc_put_request(req); 969 } 970 971 /* 972 * Walk back up the dentry tree until we hit a dentry representing a 973 * non-snapshot inode. We do this using the rcu_read_lock (which must be held 974 * when calling this) to ensure that the objects won't disappear while we're 975 * working with them. Once we hit a candidate dentry, we attempt to take a 976 * reference to it, and return that as the result. 977 */ 978 static struct inode *get_nonsnap_parent(struct dentry *dentry) 979 { 980 struct inode *inode = NULL; 981 982 while (dentry && !IS_ROOT(dentry)) { 983 inode = d_inode_rcu(dentry); 984 if (!inode || ceph_snap(inode) == CEPH_NOSNAP) 985 break; 986 dentry = dentry->d_parent; 987 } 988 if (inode) 989 inode = igrab(inode); 990 return inode; 991 } 992 993 /* 994 * Choose mds to send request to next. If there is a hint set in the 995 * request (e.g., due to a prior forward hint from the mds), use that. 996 * Otherwise, consult frag tree and/or caps to identify the 997 * appropriate mds. If all else fails, choose randomly. 998 * 999 * Called under mdsc->mutex. 
1000 */ 1001 static int __choose_mds(struct ceph_mds_client *mdsc, 1002 struct ceph_mds_request *req, 1003 bool *random) 1004 { 1005 struct inode *inode; 1006 struct ceph_inode_info *ci; 1007 struct ceph_cap *cap; 1008 int mode = req->r_direct_mode; 1009 int mds = -1; 1010 u32 hash = req->r_direct_hash; 1011 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); 1012 1013 if (random) 1014 *random = false; 1015 1016 /* 1017 * is there a specific mds we should try? ignore hint if we have 1018 * no session and the mds is not up (active or recovering). 1019 */ 1020 if (req->r_resend_mds >= 0 && 1021 (__have_session(mdsc, req->r_resend_mds) || 1022 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 1023 dout("%s using resend_mds mds%d\n", __func__, 1024 req->r_resend_mds); 1025 return req->r_resend_mds; 1026 } 1027 1028 if (mode == USE_RANDOM_MDS) 1029 goto random; 1030 1031 inode = NULL; 1032 if (req->r_inode) { 1033 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) { 1034 inode = req->r_inode; 1035 ihold(inode); 1036 } else { 1037 /* req->r_dentry is non-null for LSSNAP request */ 1038 rcu_read_lock(); 1039 inode = get_nonsnap_parent(req->r_dentry); 1040 rcu_read_unlock(); 1041 dout("%s using snapdir's parent %p\n", __func__, inode); 1042 } 1043 } else if (req->r_dentry) { 1044 /* ignore race with rename; old or new d_parent is okay */ 1045 struct dentry *parent; 1046 struct inode *dir; 1047 1048 rcu_read_lock(); 1049 parent = READ_ONCE(req->r_dentry->d_parent); 1050 dir = req->r_parent ? : d_inode_rcu(parent); 1051 1052 if (!dir || dir->i_sb != mdsc->fsc->sb) { 1053 /* not this fs or parent went negative */ 1054 inode = d_inode(req->r_dentry); 1055 if (inode) 1056 ihold(inode); 1057 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 1058 /* direct snapped/virtual snapdir requests 1059 * based on parent dir inode */ 1060 inode = get_nonsnap_parent(parent); 1061 dout("%s using nonsnap parent %p\n", __func__, inode); 1062 } else { 1063 /* dentry target */ 1064 inode = d_inode(req->r_dentry); 1065 if (!inode || mode == USE_AUTH_MDS) { 1066 /* dir + name */ 1067 inode = igrab(dir); 1068 hash = ceph_dentry_hash(dir, req->r_dentry); 1069 is_hash = true; 1070 } else { 1071 ihold(inode); 1072 } 1073 } 1074 rcu_read_unlock(); 1075 } 1076 1077 dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash, 1078 hash, mode); 1079 if (!inode) 1080 goto random; 1081 ci = ceph_inode(inode); 1082 1083 if (is_hash && S_ISDIR(inode->i_mode)) { 1084 struct ceph_inode_frag frag; 1085 int found; 1086 1087 ceph_choose_frag(ci, hash, &frag, &found); 1088 if (found) { 1089 if (mode == USE_ANY_MDS && frag.ndist > 0) { 1090 u8 r; 1091 1092 /* choose a random replica */ 1093 get_random_bytes(&r, 1); 1094 r %= frag.ndist; 1095 mds = frag.dist[r]; 1096 dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n", 1097 __func__, inode, ceph_vinop(inode), 1098 frag.frag, mds, (int)r, frag.ndist); 1099 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1100 CEPH_MDS_STATE_ACTIVE && 1101 !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds)) 1102 goto out; 1103 } 1104 1105 /* since this file/dir wasn't known to be 1106 * replicated, then we want to look for the 1107 * authoritative mds. 
*/ 1108 if (frag.mds >= 0) { 1109 /* choose auth mds */ 1110 mds = frag.mds; 1111 dout("%s %p %llx.%llx frag %u mds%d (auth)\n", 1112 __func__, inode, ceph_vinop(inode), 1113 frag.frag, mds); 1114 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1115 CEPH_MDS_STATE_ACTIVE) { 1116 if (!ceph_mdsmap_is_laggy(mdsc->mdsmap, 1117 mds)) 1118 goto out; 1119 } 1120 } 1121 mode = USE_AUTH_MDS; 1122 } 1123 } 1124 1125 spin_lock(&ci->i_ceph_lock); 1126 cap = NULL; 1127 if (mode == USE_AUTH_MDS) 1128 cap = ci->i_auth_cap; 1129 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 1130 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 1131 if (!cap) { 1132 spin_unlock(&ci->i_ceph_lock); 1133 iput(inode); 1134 goto random; 1135 } 1136 mds = cap->session->s_mds; 1137 dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__, 1138 inode, ceph_vinop(inode), mds, 1139 cap == ci->i_auth_cap ? "auth " : "", cap); 1140 spin_unlock(&ci->i_ceph_lock); 1141 out: 1142 iput(inode); 1143 return mds; 1144 1145 random: 1146 if (random) 1147 *random = true; 1148 1149 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 1150 dout("%s chose random mds%d\n", __func__, mds); 1151 return mds; 1152 } 1153 1154 1155 /* 1156 * session messages 1157 */ 1158 static struct ceph_msg *create_session_msg(u32 op, u64 seq) 1159 { 1160 struct ceph_msg *msg; 1161 struct ceph_mds_session_head *h; 1162 1163 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 1164 false); 1165 if (!msg) { 1166 pr_err("create_session_msg ENOMEM creating msg\n"); 1167 return NULL; 1168 } 1169 h = msg->front.iov_base; 1170 h->op = cpu_to_le32(op); 1171 h->seq = cpu_to_le64(seq); 1172 1173 return msg; 1174 } 1175 1176 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED; 1177 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8) 1178 static int encode_supported_features(void **p, void *end) 1179 { 1180 static const size_t count = ARRAY_SIZE(feature_bits); 1181 1182 if (count > 0) { 1183 size_t i; 1184 size_t size = FEATURE_BYTES(count); 1185 1186 if (WARN_ON_ONCE(*p + 4 + size > end)) 1187 return -ERANGE; 1188 1189 ceph_encode_32(p, size); 1190 memset(*p, 0, size); 1191 for (i = 0; i < count; i++) 1192 ((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8); 1193 *p += size; 1194 } else { 1195 if (WARN_ON_ONCE(*p + 4 > end)) 1196 return -ERANGE; 1197 1198 ceph_encode_32(p, 0); 1199 } 1200 1201 return 0; 1202 } 1203 1204 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED; 1205 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8) 1206 static int encode_metric_spec(void **p, void *end) 1207 { 1208 static const size_t count = ARRAY_SIZE(metric_bits); 1209 1210 /* header */ 1211 if (WARN_ON_ONCE(*p + 2 > end)) 1212 return -ERANGE; 1213 1214 ceph_encode_8(p, 1); /* version */ 1215 ceph_encode_8(p, 1); /* compat */ 1216 1217 if (count > 0) { 1218 size_t i; 1219 size_t size = METRIC_BYTES(count); 1220 1221 if (WARN_ON_ONCE(*p + 4 + 4 + size > end)) 1222 return -ERANGE; 1223 1224 /* metric spec info length */ 1225 ceph_encode_32(p, 4 + size); 1226 1227 /* metric spec */ 1228 ceph_encode_32(p, size); 1229 memset(*p, 0, size); 1230 for (i = 0; i < count; i++) 1231 ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8); 1232 *p += size; 1233 } else { 1234 if (WARN_ON_ONCE(*p + 4 + 4 > end)) 1235 return -ERANGE; 1236 1237 /* metric spec info length */ 1238 ceph_encode_32(p, 4); 1239 /* metric spec */ 1240 ceph_encode_32(p, 0); 1241 } 1242 1243 return 0; 1244 
} 1245 1246 /* 1247 * session message, specialization for CEPH_SESSION_REQUEST_OPEN 1248 * to include additional client metadata fields. 1249 */ 1250 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq) 1251 { 1252 struct ceph_msg *msg; 1253 struct ceph_mds_session_head *h; 1254 int i; 1255 int extra_bytes = 0; 1256 int metadata_key_count = 0; 1257 struct ceph_options *opt = mdsc->fsc->client->options; 1258 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; 1259 size_t size, count; 1260 void *p, *end; 1261 int ret; 1262 1263 const char* metadata[][2] = { 1264 {"hostname", mdsc->nodename}, 1265 {"kernel_version", init_utsname()->release}, 1266 {"entity_id", opt->name ? : ""}, 1267 {"root", fsopt->server_path ? : "/"}, 1268 {NULL, NULL} 1269 }; 1270 1271 /* Calculate serialized length of metadata */ 1272 extra_bytes = 4; /* map length */ 1273 for (i = 0; metadata[i][0]; ++i) { 1274 extra_bytes += 8 + strlen(metadata[i][0]) + 1275 strlen(metadata[i][1]); 1276 metadata_key_count++; 1277 } 1278 1279 /* supported feature */ 1280 size = 0; 1281 count = ARRAY_SIZE(feature_bits); 1282 if (count > 0) 1283 size = FEATURE_BYTES(count); 1284 extra_bytes += 4 + size; 1285 1286 /* metric spec */ 1287 size = 0; 1288 count = ARRAY_SIZE(metric_bits); 1289 if (count > 0) 1290 size = METRIC_BYTES(count); 1291 extra_bytes += 2 + 4 + 4 + size; 1292 1293 /* Allocate the message */ 1294 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, 1295 GFP_NOFS, false); 1296 if (!msg) { 1297 pr_err("create_session_msg ENOMEM creating msg\n"); 1298 return ERR_PTR(-ENOMEM); 1299 } 1300 p = msg->front.iov_base; 1301 end = p + msg->front.iov_len; 1302 1303 h = p; 1304 h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN); 1305 h->seq = cpu_to_le64(seq); 1306 1307 /* 1308 * Serialize client metadata into waiting buffer space, using 1309 * the format that userspace expects for map<string, string> 1310 * 1311 * ClientSession messages with metadata are v4 1312 */ 1313 msg->hdr.version = cpu_to_le16(4); 1314 msg->hdr.compat_version = cpu_to_le16(1); 1315 1316 /* The write pointer, following the session_head structure */ 1317 p += sizeof(*h); 1318 1319 /* Number of entries in the map */ 1320 ceph_encode_32(&p, metadata_key_count); 1321 1322 /* Two length-prefixed strings for each entry in the map */ 1323 for (i = 0; metadata[i][0]; ++i) { 1324 size_t const key_len = strlen(metadata[i][0]); 1325 size_t const val_len = strlen(metadata[i][1]); 1326 1327 ceph_encode_32(&p, key_len); 1328 memcpy(p, metadata[i][0], key_len); 1329 p += key_len; 1330 ceph_encode_32(&p, val_len); 1331 memcpy(p, metadata[i][1], val_len); 1332 p += val_len; 1333 } 1334 1335 ret = encode_supported_features(&p, end); 1336 if (ret) { 1337 pr_err("encode_supported_features failed!\n"); 1338 ceph_msg_put(msg); 1339 return ERR_PTR(ret); 1340 } 1341 1342 ret = encode_metric_spec(&p, end); 1343 if (ret) { 1344 pr_err("encode_metric_spec failed!\n"); 1345 ceph_msg_put(msg); 1346 return ERR_PTR(ret); 1347 } 1348 1349 msg->front.iov_len = p - msg->front.iov_base; 1350 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1351 1352 return msg; 1353 } 1354 1355 /* 1356 * send session open request. 1357 * 1358 * called under mdsc->mutex 1359 */ 1360 static int __open_session(struct ceph_mds_client *mdsc, 1361 struct ceph_mds_session *session) 1362 { 1363 struct ceph_msg *msg; 1364 int mstate; 1365 int mds = session->s_mds; 1366 1367 /* wait for mds to go active? 
*/ 1368 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 1369 dout("open_session to mds%d (%s)\n", mds, 1370 ceph_mds_state_name(mstate)); 1371 session->s_state = CEPH_MDS_SESSION_OPENING; 1372 session->s_renew_requested = jiffies; 1373 1374 /* send connect message */ 1375 msg = create_session_open_msg(mdsc, session->s_seq); 1376 if (IS_ERR(msg)) 1377 return PTR_ERR(msg); 1378 ceph_con_send(&session->s_con, msg); 1379 return 0; 1380 } 1381 1382 /* 1383 * open sessions for any export targets for the given mds 1384 * 1385 * called under mdsc->mutex 1386 */ 1387 static struct ceph_mds_session * 1388 __open_export_target_session(struct ceph_mds_client *mdsc, int target) 1389 { 1390 struct ceph_mds_session *session; 1391 int ret; 1392 1393 session = __ceph_lookup_mds_session(mdsc, target); 1394 if (!session) { 1395 session = register_session(mdsc, target); 1396 if (IS_ERR(session)) 1397 return session; 1398 } 1399 if (session->s_state == CEPH_MDS_SESSION_NEW || 1400 session->s_state == CEPH_MDS_SESSION_CLOSING) { 1401 ret = __open_session(mdsc, session); 1402 if (ret) 1403 return ERR_PTR(ret); 1404 } 1405 1406 return session; 1407 } 1408 1409 struct ceph_mds_session * 1410 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) 1411 { 1412 struct ceph_mds_session *session; 1413 1414 dout("open_export_target_session to mds%d\n", target); 1415 1416 mutex_lock(&mdsc->mutex); 1417 session = __open_export_target_session(mdsc, target); 1418 mutex_unlock(&mdsc->mutex); 1419 1420 return session; 1421 } 1422 1423 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 1424 struct ceph_mds_session *session) 1425 { 1426 struct ceph_mds_info *mi; 1427 struct ceph_mds_session *ts; 1428 int i, mds = session->s_mds; 1429 1430 if (mds >= mdsc->mdsmap->possible_max_rank) 1431 return; 1432 1433 mi = &mdsc->mdsmap->m_info[mds]; 1434 dout("open_export_target_sessions for mds%d (%d targets)\n", 1435 session->s_mds, mi->num_export_targets); 1436 1437 for (i = 0; i < mi->num_export_targets; i++) { 1438 ts = __open_export_target_session(mdsc, mi->export_targets[i]); 1439 ceph_put_mds_session(ts); 1440 } 1441 } 1442 1443 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 1444 struct ceph_mds_session *session) 1445 { 1446 mutex_lock(&mdsc->mutex); 1447 __open_export_target_sessions(mdsc, session); 1448 mutex_unlock(&mdsc->mutex); 1449 } 1450 1451 /* 1452 * session caps 1453 */ 1454 1455 static void detach_cap_releases(struct ceph_mds_session *session, 1456 struct list_head *target) 1457 { 1458 lockdep_assert_held(&session->s_cap_lock); 1459 1460 list_splice_init(&session->s_cap_releases, target); 1461 session->s_num_cap_releases = 0; 1462 dout("dispose_cap_releases mds%d\n", session->s_mds); 1463 } 1464 1465 static void dispose_cap_releases(struct ceph_mds_client *mdsc, 1466 struct list_head *dispose) 1467 { 1468 while (!list_empty(dispose)) { 1469 struct ceph_cap *cap; 1470 /* zero out the in-progress message */ 1471 cap = list_first_entry(dispose, struct ceph_cap, session_caps); 1472 list_del(&cap->session_caps); 1473 ceph_put_cap(mdsc, cap); 1474 } 1475 } 1476 1477 static void cleanup_session_requests(struct ceph_mds_client *mdsc, 1478 struct ceph_mds_session *session) 1479 { 1480 struct ceph_mds_request *req; 1481 struct rb_node *p; 1482 struct ceph_inode_info *ci; 1483 1484 dout("cleanup_session_requests mds%d\n", session->s_mds); 1485 mutex_lock(&mdsc->mutex); 1486 while (!list_empty(&session->s_unsafe)) { 1487 req = 
list_first_entry(&session->s_unsafe, 1488 struct ceph_mds_request, r_unsafe_item); 1489 pr_warn_ratelimited(" dropping unsafe request %llu\n", 1490 req->r_tid); 1491 if (req->r_target_inode) { 1492 /* dropping unsafe change of inode's attributes */ 1493 ci = ceph_inode(req->r_target_inode); 1494 errseq_set(&ci->i_meta_err, -EIO); 1495 } 1496 if (req->r_unsafe_dir) { 1497 /* dropping unsafe directory operation */ 1498 ci = ceph_inode(req->r_unsafe_dir); 1499 errseq_set(&ci->i_meta_err, -EIO); 1500 } 1501 __unregister_request(mdsc, req); 1502 } 1503 /* zero r_attempts, so kick_requests() will re-send requests */ 1504 p = rb_first(&mdsc->request_tree); 1505 while (p) { 1506 req = rb_entry(p, struct ceph_mds_request, r_node); 1507 p = rb_next(p); 1508 if (req->r_session && 1509 req->r_session->s_mds == session->s_mds) 1510 req->r_attempts = 0; 1511 } 1512 mutex_unlock(&mdsc->mutex); 1513 } 1514 1515 /* 1516 * Helper to safely iterate over all caps associated with a session, with 1517 * special care taken to handle a racing __ceph_remove_cap(). 1518 * 1519 * Caller must hold session s_mutex. 1520 */ 1521 int ceph_iterate_session_caps(struct ceph_mds_session *session, 1522 int (*cb)(struct inode *, struct ceph_cap *, 1523 void *), void *arg) 1524 { 1525 struct list_head *p; 1526 struct ceph_cap *cap; 1527 struct inode *inode, *last_inode = NULL; 1528 struct ceph_cap *old_cap = NULL; 1529 int ret; 1530 1531 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 1532 spin_lock(&session->s_cap_lock); 1533 p = session->s_caps.next; 1534 while (p != &session->s_caps) { 1535 cap = list_entry(p, struct ceph_cap, session_caps); 1536 inode = igrab(&cap->ci->vfs_inode); 1537 if (!inode) { 1538 p = p->next; 1539 continue; 1540 } 1541 session->s_cap_iterator = cap; 1542 spin_unlock(&session->s_cap_lock); 1543 1544 if (last_inode) { 1545 iput(last_inode); 1546 last_inode = NULL; 1547 } 1548 if (old_cap) { 1549 ceph_put_cap(session->s_mdsc, old_cap); 1550 old_cap = NULL; 1551 } 1552 1553 ret = cb(inode, cap, arg); 1554 last_inode = inode; 1555 1556 spin_lock(&session->s_cap_lock); 1557 p = p->next; 1558 if (!cap->ci) { 1559 dout("iterate_session_caps finishing cap %p removal\n", 1560 cap); 1561 BUG_ON(cap->session != session); 1562 cap->session = NULL; 1563 list_del_init(&cap->session_caps); 1564 session->s_nr_caps--; 1565 atomic64_dec(&session->s_mdsc->metric.total_caps); 1566 if (cap->queue_release) 1567 __ceph_queue_cap_release(session, cap); 1568 else 1569 old_cap = cap; /* put_cap it w/o locks held */ 1570 } 1571 if (ret < 0) 1572 goto out; 1573 } 1574 ret = 0; 1575 out: 1576 session->s_cap_iterator = NULL; 1577 spin_unlock(&session->s_cap_lock); 1578 1579 iput(last_inode); 1580 if (old_cap) 1581 ceph_put_cap(session->s_mdsc, old_cap); 1582 1583 return ret; 1584 } 1585 1586 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 1587 void *arg) 1588 { 1589 struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg; 1590 struct ceph_inode_info *ci = ceph_inode(inode); 1591 LIST_HEAD(to_remove); 1592 bool dirty_dropped = false; 1593 bool invalidate = false; 1594 1595 dout("removing cap %p, ci is %p, inode is %p\n", 1596 cap, ci, &ci->vfs_inode); 1597 spin_lock(&ci->i_ceph_lock); 1598 __ceph_remove_cap(cap, false); 1599 if (!ci->i_auth_cap) { 1600 struct ceph_cap_flush *cf; 1601 struct ceph_mds_client *mdsc = fsc->mdsc; 1602 1603 if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) { 1604 if (inode->i_data.nrpages > 0) 1605 invalidate = true; 1606 if (ci->i_wrbuffer_ref > 
0) 1607 mapping_set_error(&inode->i_data, -EIO); 1608 } 1609 1610 while (!list_empty(&ci->i_cap_flush_list)) { 1611 cf = list_first_entry(&ci->i_cap_flush_list, 1612 struct ceph_cap_flush, i_list); 1613 list_move(&cf->i_list, &to_remove); 1614 } 1615 1616 spin_lock(&mdsc->cap_dirty_lock); 1617 1618 list_for_each_entry(cf, &to_remove, i_list) 1619 list_del(&cf->g_list); 1620 1621 if (!list_empty(&ci->i_dirty_item)) { 1622 pr_warn_ratelimited( 1623 " dropping dirty %s state for %p %lld\n", 1624 ceph_cap_string(ci->i_dirty_caps), 1625 inode, ceph_ino(inode)); 1626 ci->i_dirty_caps = 0; 1627 list_del_init(&ci->i_dirty_item); 1628 dirty_dropped = true; 1629 } 1630 if (!list_empty(&ci->i_flushing_item)) { 1631 pr_warn_ratelimited( 1632 " dropping dirty+flushing %s state for %p %lld\n", 1633 ceph_cap_string(ci->i_flushing_caps), 1634 inode, ceph_ino(inode)); 1635 ci->i_flushing_caps = 0; 1636 list_del_init(&ci->i_flushing_item); 1637 mdsc->num_cap_flushing--; 1638 dirty_dropped = true; 1639 } 1640 spin_unlock(&mdsc->cap_dirty_lock); 1641 1642 if (dirty_dropped) { 1643 errseq_set(&ci->i_meta_err, -EIO); 1644 1645 if (ci->i_wrbuffer_ref_head == 0 && 1646 ci->i_wr_ref == 0 && 1647 ci->i_dirty_caps == 0 && 1648 ci->i_flushing_caps == 0) { 1649 ceph_put_snap_context(ci->i_head_snapc); 1650 ci->i_head_snapc = NULL; 1651 } 1652 } 1653 1654 if (atomic_read(&ci->i_filelock_ref) > 0) { 1655 /* make further file lock syscall return -EIO */ 1656 ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK; 1657 pr_warn_ratelimited(" dropping file locks for %p %lld\n", 1658 inode, ceph_ino(inode)); 1659 } 1660 1661 if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { 1662 list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove); 1663 ci->i_prealloc_cap_flush = NULL; 1664 } 1665 } 1666 spin_unlock(&ci->i_ceph_lock); 1667 while (!list_empty(&to_remove)) { 1668 struct ceph_cap_flush *cf; 1669 cf = list_first_entry(&to_remove, 1670 struct ceph_cap_flush, i_list); 1671 list_del(&cf->i_list); 1672 ceph_free_cap_flush(cf); 1673 } 1674 1675 wake_up_all(&ci->i_cap_wq); 1676 if (invalidate) 1677 ceph_queue_invalidate(inode); 1678 if (dirty_dropped) 1679 iput(inode); 1680 return 0; 1681 } 1682 1683 /* 1684 * caller must hold session s_mutex 1685 */ 1686 static void remove_session_caps(struct ceph_mds_session *session) 1687 { 1688 struct ceph_fs_client *fsc = session->s_mdsc->fsc; 1689 struct super_block *sb = fsc->sb; 1690 LIST_HEAD(dispose); 1691 1692 dout("remove_session_caps on %p\n", session); 1693 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc); 1694 1695 wake_up_all(&fsc->mdsc->cap_flushing_wq); 1696 1697 spin_lock(&session->s_cap_lock); 1698 if (session->s_nr_caps > 0) { 1699 struct inode *inode; 1700 struct ceph_cap *cap, *prev = NULL; 1701 struct ceph_vino vino; 1702 /* 1703 * iterate_session_caps() skips inodes that are being 1704 * deleted, we need to wait until deletions are complete. 1705 * __wait_on_freeing_inode() is designed for the job, 1706 * but it is not exported, so use lookup inode function 1707 * to access it. 
1708 */ 1709 while (!list_empty(&session->s_caps)) { 1710 cap = list_entry(session->s_caps.next, 1711 struct ceph_cap, session_caps); 1712 if (cap == prev) 1713 break; 1714 prev = cap; 1715 vino = cap->ci->i_vino; 1716 spin_unlock(&session->s_cap_lock); 1717 1718 inode = ceph_find_inode(sb, vino); 1719 iput(inode); 1720 1721 spin_lock(&session->s_cap_lock); 1722 } 1723 } 1724 1725 // drop cap expires and unlock s_cap_lock 1726 detach_cap_releases(session, &dispose); 1727 1728 BUG_ON(session->s_nr_caps > 0); 1729 BUG_ON(!list_empty(&session->s_cap_flushing)); 1730 spin_unlock(&session->s_cap_lock); 1731 dispose_cap_releases(session->s_mdsc, &dispose); 1732 } 1733 1734 enum { 1735 RECONNECT, 1736 RENEWCAPS, 1737 FORCE_RO, 1738 }; 1739 1740 /* 1741 * wake up any threads waiting on this session's caps. if the cap is 1742 * old (didn't get renewed on the client reconnect), remove it now. 1743 * 1744 * caller must hold s_mutex. 1745 */ 1746 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, 1747 void *arg) 1748 { 1749 struct ceph_inode_info *ci = ceph_inode(inode); 1750 unsigned long ev = (unsigned long)arg; 1751 1752 if (ev == RECONNECT) { 1753 spin_lock(&ci->i_ceph_lock); 1754 ci->i_wanted_max_size = 0; 1755 ci->i_requested_max_size = 0; 1756 spin_unlock(&ci->i_ceph_lock); 1757 } else if (ev == RENEWCAPS) { 1758 if (cap->cap_gen < atomic_read(&cap->session->s_cap_gen)) { 1759 /* mds did not re-issue stale cap */ 1760 spin_lock(&ci->i_ceph_lock); 1761 cap->issued = cap->implemented = CEPH_CAP_PIN; 1762 spin_unlock(&ci->i_ceph_lock); 1763 } 1764 } else if (ev == FORCE_RO) { 1765 } 1766 wake_up_all(&ci->i_cap_wq); 1767 return 0; 1768 } 1769 1770 static void wake_up_session_caps(struct ceph_mds_session *session, int ev) 1771 { 1772 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 1773 ceph_iterate_session_caps(session, wake_up_session_cb, 1774 (void *)(unsigned long)ev); 1775 } 1776 1777 /* 1778 * Send periodic message to MDS renewing all currently held caps. The 1779 * ack will reset the expiration for all caps from this session. 1780 * 1781 * caller holds s_mutex 1782 */ 1783 static int send_renew_caps(struct ceph_mds_client *mdsc, 1784 struct ceph_mds_session *session) 1785 { 1786 struct ceph_msg *msg; 1787 int state; 1788 1789 if (time_after_eq(jiffies, session->s_cap_ttl) && 1790 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 1791 pr_info("mds%d caps stale\n", session->s_mds); 1792 session->s_renew_requested = jiffies; 1793 1794 /* do not try to renew caps until a recovering mds has reconnected 1795 * with its clients. 
*/ 1796 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 1797 if (state < CEPH_MDS_STATE_RECONNECT) { 1798 dout("send_renew_caps ignoring mds%d (%s)\n", 1799 session->s_mds, ceph_mds_state_name(state)); 1800 return 0; 1801 } 1802 1803 dout("send_renew_caps to mds%d (%s)\n", session->s_mds, 1804 ceph_mds_state_name(state)); 1805 msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 1806 ++session->s_renew_seq); 1807 if (!msg) 1808 return -ENOMEM; 1809 ceph_con_send(&session->s_con, msg); 1810 return 0; 1811 } 1812 1813 static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 1814 struct ceph_mds_session *session, u64 seq) 1815 { 1816 struct ceph_msg *msg; 1817 1818 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", 1819 session->s_mds, ceph_session_state_name(session->s_state), seq); 1820 msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 1821 if (!msg) 1822 return -ENOMEM; 1823 ceph_con_send(&session->s_con, msg); 1824 return 0; 1825 } 1826 1827 1828 /* 1829 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 1830 * 1831 * Called under session->s_mutex 1832 */ 1833 static void renewed_caps(struct ceph_mds_client *mdsc, 1834 struct ceph_mds_session *session, int is_renew) 1835 { 1836 int was_stale; 1837 int wake = 0; 1838 1839 spin_lock(&session->s_cap_lock); 1840 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 1841 1842 session->s_cap_ttl = session->s_renew_requested + 1843 mdsc->mdsmap->m_session_timeout*HZ; 1844 1845 if (was_stale) { 1846 if (time_before(jiffies, session->s_cap_ttl)) { 1847 pr_info("mds%d caps renewed\n", session->s_mds); 1848 wake = 1; 1849 } else { 1850 pr_info("mds%d caps still stale\n", session->s_mds); 1851 } 1852 } 1853 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n", 1854 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh", 1855 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 1856 spin_unlock(&session->s_cap_lock); 1857 1858 if (wake) 1859 wake_up_session_caps(session, RENEWCAPS); 1860 } 1861 1862 /* 1863 * send a session close request 1864 */ 1865 static int request_close_session(struct ceph_mds_session *session) 1866 { 1867 struct ceph_msg *msg; 1868 1869 dout("request_close_session mds%d state %s seq %lld\n", 1870 session->s_mds, ceph_session_state_name(session->s_state), 1871 session->s_seq); 1872 msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); 1873 if (!msg) 1874 return -ENOMEM; 1875 ceph_con_send(&session->s_con, msg); 1876 return 1; 1877 } 1878 1879 /* 1880 * Called with s_mutex held. 1881 */ 1882 static int __close_session(struct ceph_mds_client *mdsc, 1883 struct ceph_mds_session *session) 1884 { 1885 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 1886 return 0; 1887 session->s_state = CEPH_MDS_SESSION_CLOSING; 1888 return request_close_session(session); 1889 } 1890 1891 static bool drop_negative_children(struct dentry *dentry) 1892 { 1893 struct dentry *child; 1894 bool all_negative = true; 1895 1896 if (!d_is_dir(dentry)) 1897 goto out; 1898 1899 spin_lock(&dentry->d_lock); 1900 list_for_each_entry(child, &dentry->d_subdirs, d_child) { 1901 if (d_really_is_positive(child)) { 1902 all_negative = false; 1903 break; 1904 } 1905 } 1906 spin_unlock(&dentry->d_lock); 1907 1908 if (all_negative) 1909 shrink_dcache_parent(dentry); 1910 out: 1911 return all_negative; 1912 } 1913 1914 /* 1915 * Trim old(er) caps. 
1916 * 1917 * Because we can't cache an inode without one or more caps, we do 1918 * this indirectly: if a cap is unused, we prune its aliases, at which 1919 * point the inode will hopefully get dropped to. 1920 * 1921 * Yes, this is a bit sloppy. Our only real goal here is to respond to 1922 * memory pressure from the MDS, though, so it needn't be perfect. 1923 */ 1924 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) 1925 { 1926 int *remaining = arg; 1927 struct ceph_inode_info *ci = ceph_inode(inode); 1928 int used, wanted, oissued, mine; 1929 1930 if (*remaining <= 0) 1931 return -1; 1932 1933 spin_lock(&ci->i_ceph_lock); 1934 mine = cap->issued | cap->implemented; 1935 used = __ceph_caps_used(ci); 1936 wanted = __ceph_caps_file_wanted(ci); 1937 oissued = __ceph_caps_issued_other(ci, cap); 1938 1939 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n", 1940 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), 1941 ceph_cap_string(used), ceph_cap_string(wanted)); 1942 if (cap == ci->i_auth_cap) { 1943 if (ci->i_dirty_caps || ci->i_flushing_caps || 1944 !list_empty(&ci->i_cap_snaps)) 1945 goto out; 1946 if ((used | wanted) & CEPH_CAP_ANY_WR) 1947 goto out; 1948 /* Note: it's possible that i_filelock_ref becomes non-zero 1949 * after dropping auth caps. It doesn't hurt because reply 1950 * of lock mds request will re-add auth caps. */ 1951 if (atomic_read(&ci->i_filelock_ref) > 0) 1952 goto out; 1953 } 1954 /* The inode has cached pages, but it's no longer used. 1955 * we can safely drop it */ 1956 if (S_ISREG(inode->i_mode) && 1957 wanted == 0 && used == CEPH_CAP_FILE_CACHE && 1958 !(oissued & CEPH_CAP_FILE_CACHE)) { 1959 used = 0; 1960 oissued = 0; 1961 } 1962 if ((used | wanted) & ~oissued & mine) 1963 goto out; /* we need these caps */ 1964 1965 if (oissued) { 1966 /* we aren't the only cap.. just remove us */ 1967 __ceph_remove_cap(cap, true); 1968 (*remaining)--; 1969 } else { 1970 struct dentry *dentry; 1971 /* try dropping referring dentries */ 1972 spin_unlock(&ci->i_ceph_lock); 1973 dentry = d_find_any_alias(inode); 1974 if (dentry && drop_negative_children(dentry)) { 1975 int count; 1976 dput(dentry); 1977 d_prune_aliases(inode); 1978 count = atomic_read(&inode->i_count); 1979 if (count == 1) 1980 (*remaining)--; 1981 dout("trim_caps_cb %p cap %p pruned, count now %d\n", 1982 inode, cap, count); 1983 } else { 1984 dput(dentry); 1985 } 1986 return 0; 1987 } 1988 1989 out: 1990 spin_unlock(&ci->i_ceph_lock); 1991 return 0; 1992 } 1993 1994 /* 1995 * Trim session cap count down to some max number. 
1996 */ 1997 int ceph_trim_caps(struct ceph_mds_client *mdsc, 1998 struct ceph_mds_session *session, 1999 int max_caps) 2000 { 2001 int trim_caps = session->s_nr_caps - max_caps; 2002 2003 dout("trim_caps mds%d start: %d / %d, trim %d\n", 2004 session->s_mds, session->s_nr_caps, max_caps, trim_caps); 2005 if (trim_caps > 0) { 2006 int remaining = trim_caps; 2007 2008 ceph_iterate_session_caps(session, trim_caps_cb, &remaining); 2009 dout("trim_caps mds%d done: %d / %d, trimmed %d\n", 2010 session->s_mds, session->s_nr_caps, max_caps, 2011 trim_caps - remaining); 2012 } 2013 2014 ceph_flush_cap_releases(mdsc, session); 2015 return 0; 2016 } 2017 2018 static int check_caps_flush(struct ceph_mds_client *mdsc, 2019 u64 want_flush_tid) 2020 { 2021 int ret = 1; 2022 2023 spin_lock(&mdsc->cap_dirty_lock); 2024 if (!list_empty(&mdsc->cap_flush_list)) { 2025 struct ceph_cap_flush *cf = 2026 list_first_entry(&mdsc->cap_flush_list, 2027 struct ceph_cap_flush, g_list); 2028 if (cf->tid <= want_flush_tid) { 2029 dout("check_caps_flush still flushing tid " 2030 "%llu <= %llu\n", cf->tid, want_flush_tid); 2031 ret = 0; 2032 } 2033 } 2034 spin_unlock(&mdsc->cap_dirty_lock); 2035 return ret; 2036 } 2037 2038 /* 2039 * flush all dirty inode data to disk. 2040 * 2041 * returns true if we've flushed through want_flush_tid 2042 */ 2043 static void wait_caps_flush(struct ceph_mds_client *mdsc, 2044 u64 want_flush_tid) 2045 { 2046 dout("check_caps_flush want %llu\n", want_flush_tid); 2047 2048 wait_event(mdsc->cap_flushing_wq, 2049 check_caps_flush(mdsc, want_flush_tid)); 2050 2051 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); 2052 } 2053 2054 /* 2055 * called under s_mutex 2056 */ 2057 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 2058 struct ceph_mds_session *session) 2059 { 2060 struct ceph_msg *msg = NULL; 2061 struct ceph_mds_cap_release *head; 2062 struct ceph_mds_cap_item *item; 2063 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 2064 struct ceph_cap *cap; 2065 LIST_HEAD(tmp_list); 2066 int num_cap_releases; 2067 __le32 barrier, *cap_barrier; 2068 2069 down_read(&osdc->lock); 2070 barrier = cpu_to_le32(osdc->epoch_barrier); 2071 up_read(&osdc->lock); 2072 2073 spin_lock(&session->s_cap_lock); 2074 again: 2075 list_splice_init(&session->s_cap_releases, &tmp_list); 2076 num_cap_releases = session->s_num_cap_releases; 2077 session->s_num_cap_releases = 0; 2078 spin_unlock(&session->s_cap_lock); 2079 2080 while (!list_empty(&tmp_list)) { 2081 if (!msg) { 2082 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 2083 PAGE_SIZE, GFP_NOFS, false); 2084 if (!msg) 2085 goto out_err; 2086 head = msg->front.iov_base; 2087 head->num = cpu_to_le32(0); 2088 msg->front.iov_len = sizeof(*head); 2089 2090 msg->hdr.version = cpu_to_le16(2); 2091 msg->hdr.compat_version = cpu_to_le16(1); 2092 } 2093 2094 cap = list_first_entry(&tmp_list, struct ceph_cap, 2095 session_caps); 2096 list_del(&cap->session_caps); 2097 num_cap_releases--; 2098 2099 head = msg->front.iov_base; 2100 put_unaligned_le32(get_unaligned_le32(&head->num) + 1, 2101 &head->num); 2102 item = msg->front.iov_base + msg->front.iov_len; 2103 item->ino = cpu_to_le64(cap->cap_ino); 2104 item->cap_id = cpu_to_le64(cap->cap_id); 2105 item->migrate_seq = cpu_to_le32(cap->mseq); 2106 item->seq = cpu_to_le32(cap->issue_seq); 2107 msg->front.iov_len += sizeof(*item); 2108 2109 ceph_put_cap(mdsc, cap); 2110 2111 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 2112 // Append cap_barrier field 2113 cap_barrier = 
msg->front.iov_base + msg->front.iov_len; 2114 *cap_barrier = barrier; 2115 msg->front.iov_len += sizeof(*cap_barrier); 2116 2117 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2118 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2119 ceph_con_send(&session->s_con, msg); 2120 msg = NULL; 2121 } 2122 } 2123 2124 BUG_ON(num_cap_releases != 0); 2125 2126 spin_lock(&session->s_cap_lock); 2127 if (!list_empty(&session->s_cap_releases)) 2128 goto again; 2129 spin_unlock(&session->s_cap_lock); 2130 2131 if (msg) { 2132 // Append cap_barrier field 2133 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2134 *cap_barrier = barrier; 2135 msg->front.iov_len += sizeof(*cap_barrier); 2136 2137 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2138 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2139 ceph_con_send(&session->s_con, msg); 2140 } 2141 return; 2142 out_err: 2143 pr_err("send_cap_releases mds%d, failed to allocate message\n", 2144 session->s_mds); 2145 spin_lock(&session->s_cap_lock); 2146 list_splice(&tmp_list, &session->s_cap_releases); 2147 session->s_num_cap_releases += num_cap_releases; 2148 spin_unlock(&session->s_cap_lock); 2149 } 2150 2151 static void ceph_cap_release_work(struct work_struct *work) 2152 { 2153 struct ceph_mds_session *session = 2154 container_of(work, struct ceph_mds_session, s_cap_release_work); 2155 2156 mutex_lock(&session->s_mutex); 2157 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2158 session->s_state == CEPH_MDS_SESSION_HUNG) 2159 ceph_send_cap_releases(session->s_mdsc, session); 2160 mutex_unlock(&session->s_mutex); 2161 ceph_put_mds_session(session); 2162 } 2163 2164 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, 2165 struct ceph_mds_session *session) 2166 { 2167 if (mdsc->stopping) 2168 return; 2169 2170 ceph_get_mds_session(session); 2171 if (queue_work(mdsc->fsc->cap_wq, 2172 &session->s_cap_release_work)) { 2173 dout("cap release work queued\n"); 2174 } else { 2175 ceph_put_mds_session(session); 2176 dout("failed to queue cap release work\n"); 2177 } 2178 } 2179 2180 /* 2181 * caller holds session->s_cap_lock 2182 */ 2183 void __ceph_queue_cap_release(struct ceph_mds_session *session, 2184 struct ceph_cap *cap) 2185 { 2186 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2187 session->s_num_cap_releases++; 2188 2189 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2190 ceph_flush_cap_releases(session->s_mdsc, session); 2191 } 2192 2193 static void ceph_cap_reclaim_work(struct work_struct *work) 2194 { 2195 struct ceph_mds_client *mdsc = 2196 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2197 int ret = ceph_trim_dentries(mdsc); 2198 if (ret == -EAGAIN) 2199 ceph_queue_cap_reclaim_work(mdsc); 2200 } 2201 2202 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2203 { 2204 if (mdsc->stopping) 2205 return; 2206 2207 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2208 dout("caps reclaim work queued\n"); 2209 } else { 2210 dout("failed to queue caps release work\n"); 2211 } 2212 } 2213 2214 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) 2215 { 2216 int val; 2217 if (!nr) 2218 return; 2219 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2220 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2221 atomic_set(&mdsc->cap_reclaim_pending, 0); 2222 ceph_queue_cap_reclaim_work(mdsc); 2223 } 2224 } 2225 2226 /* 2227 * requests 2228 */ 2229 2230 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2231 struct inode *dir) 
2232 { 2233 struct ceph_inode_info *ci = ceph_inode(dir); 2234 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2235 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2236 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2237 unsigned int num_entries; 2238 int order; 2239 2240 spin_lock(&ci->i_ceph_lock); 2241 num_entries = ci->i_files + ci->i_subdirs; 2242 spin_unlock(&ci->i_ceph_lock); 2243 num_entries = max(num_entries, 1U); 2244 num_entries = min(num_entries, opt->max_readdir); 2245 2246 order = get_order(size * num_entries); 2247 while (order >= 0) { 2248 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2249 __GFP_NOWARN, 2250 order); 2251 if (rinfo->dir_entries) 2252 break; 2253 order--; 2254 } 2255 if (!rinfo->dir_entries) 2256 return -ENOMEM; 2257 2258 num_entries = (PAGE_SIZE << order) / size; 2259 num_entries = min(num_entries, opt->max_readdir); 2260 2261 rinfo->dir_buf_size = PAGE_SIZE << order; 2262 req->r_num_caps = num_entries + 1; 2263 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2264 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2265 return 0; 2266 } 2267 2268 /* 2269 * Create an mds request. 2270 */ 2271 struct ceph_mds_request * 2272 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2273 { 2274 struct ceph_mds_request *req; 2275 2276 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2277 if (!req) 2278 return ERR_PTR(-ENOMEM); 2279 2280 mutex_init(&req->r_fill_mutex); 2281 req->r_mdsc = mdsc; 2282 req->r_started = jiffies; 2283 req->r_start_latency = ktime_get(); 2284 req->r_resend_mds = -1; 2285 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2286 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2287 req->r_fmode = -1; 2288 kref_init(&req->r_kref); 2289 RB_CLEAR_NODE(&req->r_node); 2290 INIT_LIST_HEAD(&req->r_wait); 2291 init_completion(&req->r_completion); 2292 init_completion(&req->r_safe_completion); 2293 INIT_LIST_HEAD(&req->r_unsafe_item); 2294 2295 ktime_get_coarse_real_ts64(&req->r_stamp); 2296 2297 req->r_op = op; 2298 req->r_direct_mode = mode; 2299 return req; 2300 } 2301 2302 /* 2303 * return oldest (lowest) request, tid in request tree, 0 if none. 2304 * 2305 * called under mdsc->mutex. 2306 */ 2307 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2308 { 2309 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2310 return NULL; 2311 return rb_entry(rb_first(&mdsc->request_tree), 2312 struct ceph_mds_request, r_node); 2313 } 2314 2315 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2316 { 2317 return mdsc->oldest_tid; 2318 } 2319 2320 /* 2321 * Build a dentry's path. Allocate on heap; caller must kfree. Based 2322 * on build_path_from_dentry in fs/cifs/dir.c. 2323 * 2324 * If @stop_on_nosnap, generate path relative to the first non-snapped 2325 * inode. 2326 * 2327 * Encode hidden .snap dirs as a double /, i.e. 
2328 * foo/.snap/bar -> foo//bar 2329 */ 2330 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase, 2331 int stop_on_nosnap) 2332 { 2333 struct dentry *temp; 2334 char *path; 2335 int pos; 2336 unsigned seq; 2337 u64 base; 2338 2339 if (!dentry) 2340 return ERR_PTR(-EINVAL); 2341 2342 path = __getname(); 2343 if (!path) 2344 return ERR_PTR(-ENOMEM); 2345 retry: 2346 pos = PATH_MAX - 1; 2347 path[pos] = '\0'; 2348 2349 seq = read_seqbegin(&rename_lock); 2350 rcu_read_lock(); 2351 temp = dentry; 2352 for (;;) { 2353 struct inode *inode; 2354 2355 spin_lock(&temp->d_lock); 2356 inode = d_inode(temp); 2357 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2358 dout("build_path path+%d: %p SNAPDIR\n", 2359 pos, temp); 2360 } else if (stop_on_nosnap && inode && dentry != temp && 2361 ceph_snap(inode) == CEPH_NOSNAP) { 2362 spin_unlock(&temp->d_lock); 2363 pos++; /* get rid of any prepended '/' */ 2364 break; 2365 } else { 2366 pos -= temp->d_name.len; 2367 if (pos < 0) { 2368 spin_unlock(&temp->d_lock); 2369 break; 2370 } 2371 memcpy(path + pos, temp->d_name.name, temp->d_name.len); 2372 } 2373 spin_unlock(&temp->d_lock); 2374 temp = READ_ONCE(temp->d_parent); 2375 2376 /* Are we at the root? */ 2377 if (IS_ROOT(temp)) 2378 break; 2379 2380 /* Are we out of buffer? */ 2381 if (--pos < 0) 2382 break; 2383 2384 path[pos] = '/'; 2385 } 2386 base = ceph_ino(d_inode(temp)); 2387 rcu_read_unlock(); 2388 2389 if (read_seqretry(&rename_lock, seq)) 2390 goto retry; 2391 2392 if (pos < 0) { 2393 /* 2394 * A rename didn't occur, but somehow we didn't end up where 2395 * we thought we would. Throw a warning and try again. 2396 */ 2397 pr_warn("build_path did not end path lookup where " 2398 "expected, pos is %d\n", pos); 2399 goto retry; 2400 } 2401 2402 *pbase = base; 2403 *plen = PATH_MAX - 1 - pos; 2404 dout("build_path on %p %d built %llx '%.*s'\n", 2405 dentry, d_count(dentry), base, *plen, path + pos); 2406 return path + pos; 2407 } 2408 2409 static int build_dentry_path(struct dentry *dentry, struct inode *dir, 2410 const char **ppath, int *ppathlen, u64 *pino, 2411 bool *pfreepath, bool parent_locked) 2412 { 2413 char *path; 2414 2415 rcu_read_lock(); 2416 if (!dir) 2417 dir = d_inode_rcu(dentry->d_parent); 2418 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) { 2419 *pino = ceph_ino(dir); 2420 rcu_read_unlock(); 2421 *ppath = dentry->d_name.name; 2422 *ppathlen = dentry->d_name.len; 2423 return 0; 2424 } 2425 rcu_read_unlock(); 2426 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2427 if (IS_ERR(path)) 2428 return PTR_ERR(path); 2429 *ppath = path; 2430 *pfreepath = true; 2431 return 0; 2432 } 2433 2434 static int build_inode_path(struct inode *inode, 2435 const char **ppath, int *ppathlen, u64 *pino, 2436 bool *pfreepath) 2437 { 2438 struct dentry *dentry; 2439 char *path; 2440 2441 if (ceph_snap(inode) == CEPH_NOSNAP) { 2442 *pino = ceph_ino(inode); 2443 *ppathlen = 0; 2444 return 0; 2445 } 2446 dentry = d_find_alias(inode); 2447 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2448 dput(dentry); 2449 if (IS_ERR(path)) 2450 return PTR_ERR(path); 2451 *ppath = path; 2452 *pfreepath = true; 2453 return 0; 2454 } 2455 2456 /* 2457 * request arguments may be specified via an inode *, a dentry *, or 2458 * an explicit ino+path. 
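 * The three forms are tried in that order: an inode takes precedence over a
 * dentry, which takes precedence over an explicit path/ino pair; only the
 * first one supplied is used to build the filepath encoded into the request.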
2459 */ 2460 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, 2461 struct inode *rdiri, const char *rpath, 2462 u64 rino, const char **ppath, int *pathlen, 2463 u64 *ino, bool *freepath, bool parent_locked) 2464 { 2465 int r = 0; 2466 2467 if (rinode) { 2468 r = build_inode_path(rinode, ppath, pathlen, ino, freepath); 2469 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 2470 ceph_snap(rinode)); 2471 } else if (rdentry) { 2472 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino, 2473 freepath, parent_locked); 2474 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, 2475 *ppath); 2476 } else if (rpath || rino) { 2477 *ino = rino; 2478 *ppath = rpath; 2479 *pathlen = rpath ? strlen(rpath) : 0; 2480 dout(" path %.*s\n", *pathlen, rpath); 2481 } 2482 2483 return r; 2484 } 2485 2486 static void encode_timestamp_and_gids(void **p, 2487 const struct ceph_mds_request *req) 2488 { 2489 struct ceph_timespec ts; 2490 int i; 2491 2492 ceph_encode_timespec64(&ts, &req->r_stamp); 2493 ceph_encode_copy(p, &ts, sizeof(ts)); 2494 2495 /* gid_list */ 2496 ceph_encode_32(p, req->r_cred->group_info->ngroups); 2497 for (i = 0; i < req->r_cred->group_info->ngroups; i++) 2498 ceph_encode_64(p, from_kgid(&init_user_ns, 2499 req->r_cred->group_info->gid[i])); 2500 } 2501 2502 /* 2503 * called under mdsc->mutex 2504 */ 2505 static struct ceph_msg *create_request_message(struct ceph_mds_session *session, 2506 struct ceph_mds_request *req, 2507 bool drop_cap_releases) 2508 { 2509 int mds = session->s_mds; 2510 struct ceph_mds_client *mdsc = session->s_mdsc; 2511 struct ceph_msg *msg; 2512 struct ceph_mds_request_head_old *head; 2513 const char *path1 = NULL; 2514 const char *path2 = NULL; 2515 u64 ino1 = 0, ino2 = 0; 2516 int pathlen1 = 0, pathlen2 = 0; 2517 bool freepath1 = false, freepath2 = false; 2518 int len; 2519 u16 releases; 2520 void *p, *end; 2521 int ret; 2522 bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME); 2523 2524 ret = set_request_path_attr(req->r_inode, req->r_dentry, 2525 req->r_parent, req->r_path1, req->r_ino1.ino, 2526 &path1, &pathlen1, &ino1, &freepath1, 2527 test_bit(CEPH_MDS_R_PARENT_LOCKED, 2528 &req->r_req_flags)); 2529 if (ret < 0) { 2530 msg = ERR_PTR(ret); 2531 goto out; 2532 } 2533 2534 /* If r_old_dentry is set, then assume that its parent is locked */ 2535 ret = set_request_path_attr(NULL, req->r_old_dentry, 2536 req->r_old_dentry_dir, 2537 req->r_path2, req->r_ino2.ino, 2538 &path2, &pathlen2, &ino2, &freepath2, true); 2539 if (ret < 0) { 2540 msg = ERR_PTR(ret); 2541 goto out_free1; 2542 } 2543 2544 len = legacy ? 
sizeof(*head) : sizeof(struct ceph_mds_request_head); 2545 len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + 2546 sizeof(struct ceph_timespec); 2547 len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups); 2548 2549 /* calculate (max) length for cap releases */ 2550 len += sizeof(struct ceph_mds_request_release) * 2551 (!!req->r_inode_drop + !!req->r_dentry_drop + 2552 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 2553 2554 if (req->r_dentry_drop) 2555 len += pathlen1; 2556 if (req->r_old_dentry_drop) 2557 len += pathlen2; 2558 2559 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); 2560 if (!msg) { 2561 msg = ERR_PTR(-ENOMEM); 2562 goto out_free2; 2563 } 2564 2565 msg->hdr.tid = cpu_to_le64(req->r_tid); 2566 2567 /* 2568 * The old ceph_mds_request_head didn't contain a version field, and 2569 * one was added when we moved the message version from 3->4. 2570 */ 2571 if (legacy) { 2572 msg->hdr.version = cpu_to_le16(3); 2573 head = msg->front.iov_base; 2574 p = msg->front.iov_base + sizeof(*head); 2575 } else { 2576 struct ceph_mds_request_head *new_head = msg->front.iov_base; 2577 2578 msg->hdr.version = cpu_to_le16(4); 2579 new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION); 2580 head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; 2581 p = msg->front.iov_base + sizeof(*new_head); 2582 } 2583 2584 end = msg->front.iov_base + msg->front.iov_len; 2585 2586 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 2587 head->op = cpu_to_le32(req->r_op); 2588 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, 2589 req->r_cred->fsuid)); 2590 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, 2591 req->r_cred->fsgid)); 2592 head->ino = cpu_to_le64(req->r_deleg_ino); 2593 head->args = req->r_args; 2594 2595 ceph_encode_filepath(&p, end, ino1, path1); 2596 ceph_encode_filepath(&p, end, ino2, path2); 2597 2598 /* make note of release offset, in case we need to replay */ 2599 req->r_request_release_offset = p - msg->front.iov_base; 2600 2601 /* cap releases */ 2602 releases = 0; 2603 if (req->r_inode_drop) 2604 releases += ceph_encode_inode_release(&p, 2605 req->r_inode ? 
req->r_inode : d_inode(req->r_dentry), 2606 mds, req->r_inode_drop, req->r_inode_unless, 2607 req->r_op == CEPH_MDS_OP_READDIR); 2608 if (req->r_dentry_drop) 2609 releases += ceph_encode_dentry_release(&p, req->r_dentry, 2610 req->r_parent, mds, req->r_dentry_drop, 2611 req->r_dentry_unless); 2612 if (req->r_old_dentry_drop) 2613 releases += ceph_encode_dentry_release(&p, req->r_old_dentry, 2614 req->r_old_dentry_dir, mds, 2615 req->r_old_dentry_drop, 2616 req->r_old_dentry_unless); 2617 if (req->r_old_inode_drop) 2618 releases += ceph_encode_inode_release(&p, 2619 d_inode(req->r_old_dentry), 2620 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 2621 2622 if (drop_cap_releases) { 2623 releases = 0; 2624 p = msg->front.iov_base + req->r_request_release_offset; 2625 } 2626 2627 head->num_releases = cpu_to_le16(releases); 2628 2629 encode_timestamp_and_gids(&p, req); 2630 2631 if (WARN_ON_ONCE(p > end)) { 2632 ceph_msg_put(msg); 2633 msg = ERR_PTR(-ERANGE); 2634 goto out_free2; 2635 } 2636 2637 msg->front.iov_len = p - msg->front.iov_base; 2638 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2639 2640 if (req->r_pagelist) { 2641 struct ceph_pagelist *pagelist = req->r_pagelist; 2642 ceph_msg_data_add_pagelist(msg, pagelist); 2643 msg->hdr.data_len = cpu_to_le32(pagelist->length); 2644 } else { 2645 msg->hdr.data_len = 0; 2646 } 2647 2648 msg->hdr.data_off = cpu_to_le16(0); 2649 2650 out_free2: 2651 if (freepath2) 2652 ceph_mdsc_free_path((char *)path2, pathlen2); 2653 out_free1: 2654 if (freepath1) 2655 ceph_mdsc_free_path((char *)path1, pathlen1); 2656 out: 2657 return msg; 2658 } 2659 2660 /* 2661 * called under mdsc->mutex if error, under no mutex if 2662 * success. 2663 */ 2664 static void complete_request(struct ceph_mds_client *mdsc, 2665 struct ceph_mds_request *req) 2666 { 2667 req->r_end_latency = ktime_get(); 2668 2669 if (req->r_callback) 2670 req->r_callback(mdsc, req); 2671 complete_all(&req->r_completion); 2672 } 2673 2674 static struct ceph_mds_request_head_old * 2675 find_old_request_head(void *p, u64 features) 2676 { 2677 bool legacy = !(features & CEPH_FEATURE_FS_BTIME); 2678 struct ceph_mds_request_head *new_head; 2679 2680 if (legacy) 2681 return (struct ceph_mds_request_head_old *)p; 2682 new_head = (struct ceph_mds_request_head *)p; 2683 return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; 2684 } 2685 2686 /* 2687 * called under mdsc->mutex 2688 */ 2689 static int __prepare_send_request(struct ceph_mds_session *session, 2690 struct ceph_mds_request *req, 2691 bool drop_cap_releases) 2692 { 2693 int mds = session->s_mds; 2694 struct ceph_mds_client *mdsc = session->s_mdsc; 2695 struct ceph_mds_request_head_old *rhead; 2696 struct ceph_msg *msg; 2697 int flags = 0; 2698 2699 req->r_attempts++; 2700 if (req->r_inode) { 2701 struct ceph_cap *cap = 2702 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 2703 2704 if (cap) 2705 req->r_sent_on_mseq = cap->mseq; 2706 else 2707 req->r_sent_on_mseq = -1; 2708 } 2709 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, 2710 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 2711 2712 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 2713 void *p; 2714 2715 /* 2716 * Replay. Do not regenerate message (and rebuild 2717 * paths, etc.); just use the original message. 2718 * Rebuilding paths will break for renames because 2719 * d_move mangles the src name. 
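 * Instead, patch the already-built message in place below: set the
 * CEPH_MDS_FLAG_REPLAY flag, refresh the target ino and retry count, drop
 * any cap/dentry releases, and re-encode the timestamp and gid list at the
 * recorded r_request_release_offset.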
2720 */ 2721 msg = req->r_request; 2722 rhead = find_old_request_head(msg->front.iov_base, 2723 session->s_con.peer_features); 2724 2725 flags = le32_to_cpu(rhead->flags); 2726 flags |= CEPH_MDS_FLAG_REPLAY; 2727 rhead->flags = cpu_to_le32(flags); 2728 2729 if (req->r_target_inode) 2730 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 2731 2732 rhead->num_retry = req->r_attempts - 1; 2733 2734 /* remove cap/dentry releases from message */ 2735 rhead->num_releases = 0; 2736 2737 p = msg->front.iov_base + req->r_request_release_offset; 2738 encode_timestamp_and_gids(&p, req); 2739 2740 msg->front.iov_len = p - msg->front.iov_base; 2741 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2742 return 0; 2743 } 2744 2745 if (req->r_request) { 2746 ceph_msg_put(req->r_request); 2747 req->r_request = NULL; 2748 } 2749 msg = create_request_message(session, req, drop_cap_releases); 2750 if (IS_ERR(msg)) { 2751 req->r_err = PTR_ERR(msg); 2752 return PTR_ERR(msg); 2753 } 2754 req->r_request = msg; 2755 2756 rhead = find_old_request_head(msg->front.iov_base, 2757 session->s_con.peer_features); 2758 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 2759 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2760 flags |= CEPH_MDS_FLAG_REPLAY; 2761 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) 2762 flags |= CEPH_MDS_FLAG_ASYNC; 2763 if (req->r_parent) 2764 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 2765 rhead->flags = cpu_to_le32(flags); 2766 rhead->num_fwd = req->r_num_fwd; 2767 rhead->num_retry = req->r_attempts - 1; 2768 2769 dout(" r_parent = %p\n", req->r_parent); 2770 return 0; 2771 } 2772 2773 /* 2774 * called under mdsc->mutex 2775 */ 2776 static int __send_request(struct ceph_mds_session *session, 2777 struct ceph_mds_request *req, 2778 bool drop_cap_releases) 2779 { 2780 int err; 2781 2782 err = __prepare_send_request(session, req, drop_cap_releases); 2783 if (!err) { 2784 ceph_msg_get(req->r_request); 2785 ceph_con_send(&session->s_con, req->r_request); 2786 } 2787 2788 return err; 2789 } 2790 2791 /* 2792 * send request, or put it on the appropriate wait list. 
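 * Requests that cannot be sent yet are parked: on mdsc->waiting_for_map when
 * there is no usable mdsmap or no active MDS, or on the session's s_waiting
 * list while the session is still being opened. They are re-driven later via
 * __wake_requests() once the map or the session state changes.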
2793 */ 2794 static void __do_request(struct ceph_mds_client *mdsc, 2795 struct ceph_mds_request *req) 2796 { 2797 struct ceph_mds_session *session = NULL; 2798 int mds = -1; 2799 int err = 0; 2800 bool random; 2801 2802 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 2803 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 2804 __unregister_request(mdsc, req); 2805 return; 2806 } 2807 2808 if (req->r_timeout && 2809 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 2810 dout("do_request timed out\n"); 2811 err = -ETIMEDOUT; 2812 goto finish; 2813 } 2814 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 2815 dout("do_request forced umount\n"); 2816 err = -EIO; 2817 goto finish; 2818 } 2819 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 2820 if (mdsc->mdsmap_err) { 2821 err = mdsc->mdsmap_err; 2822 dout("do_request mdsmap err %d\n", err); 2823 goto finish; 2824 } 2825 if (mdsc->mdsmap->m_epoch == 0) { 2826 dout("do_request no mdsmap, waiting for map\n"); 2827 list_add(&req->r_wait, &mdsc->waiting_for_map); 2828 return; 2829 } 2830 if (!(mdsc->fsc->mount_options->flags & 2831 CEPH_MOUNT_OPT_MOUNTWAIT) && 2832 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { 2833 err = -EHOSTUNREACH; 2834 goto finish; 2835 } 2836 } 2837 2838 put_request_session(req); 2839 2840 mds = __choose_mds(mdsc, req, &random); 2841 if (mds < 0 || 2842 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 2843 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2844 err = -EJUKEBOX; 2845 goto finish; 2846 } 2847 dout("do_request no mds or not active, waiting for map\n"); 2848 list_add(&req->r_wait, &mdsc->waiting_for_map); 2849 return; 2850 } 2851 2852 /* get, open session */ 2853 session = __ceph_lookup_mds_session(mdsc, mds); 2854 if (!session) { 2855 session = register_session(mdsc, mds); 2856 if (IS_ERR(session)) { 2857 err = PTR_ERR(session); 2858 goto finish; 2859 } 2860 } 2861 req->r_session = ceph_get_mds_session(session); 2862 2863 dout("do_request mds%d session %p state %s\n", mds, session, 2864 ceph_session_state_name(session->s_state)); 2865 if (session->s_state != CEPH_MDS_SESSION_OPEN && 2866 session->s_state != CEPH_MDS_SESSION_HUNG) { 2867 /* 2868 * We cannot queue async requests since the caps and delegated 2869 * inodes are bound to the session. Just return -EJUKEBOX and 2870 * let the caller retry a sync request in that case. 2871 */ 2872 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2873 err = -EJUKEBOX; 2874 goto out_session; 2875 } 2876 2877 /* 2878 * If the session has been REJECTED, then return a hard error, 2879 * unless it's a CLEANRECOVER mount, in which case we'll queue 2880 * it to the mdsc queue. 
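 * (i.e. back onto mdsc->waiting_for_map, so it can be retried once a new
 * mdsmap arrives; without CLEANRECOVER the caller simply gets -EACCES.)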
2881 */ 2882 if (session->s_state == CEPH_MDS_SESSION_REJECTED) { 2883 if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER)) 2884 list_add(&req->r_wait, &mdsc->waiting_for_map); 2885 else 2886 err = -EACCES; 2887 goto out_session; 2888 } 2889 2890 if (session->s_state == CEPH_MDS_SESSION_NEW || 2891 session->s_state == CEPH_MDS_SESSION_CLOSING) { 2892 err = __open_session(mdsc, session); 2893 if (err) 2894 goto out_session; 2895 /* retry the same mds later */ 2896 if (random) 2897 req->r_resend_mds = mds; 2898 } 2899 list_add(&req->r_wait, &session->s_waiting); 2900 goto out_session; 2901 } 2902 2903 /* send request */ 2904 req->r_resend_mds = -1; /* forget any previous mds hint */ 2905 2906 if (req->r_request_started == 0) /* note request start time */ 2907 req->r_request_started = jiffies; 2908 2909 err = __send_request(session, req, false); 2910 2911 out_session: 2912 ceph_put_mds_session(session); 2913 finish: 2914 if (err) { 2915 dout("__do_request early error %d\n", err); 2916 req->r_err = err; 2917 complete_request(mdsc, req); 2918 __unregister_request(mdsc, req); 2919 } 2920 return; 2921 } 2922 2923 /* 2924 * called under mdsc->mutex 2925 */ 2926 static void __wake_requests(struct ceph_mds_client *mdsc, 2927 struct list_head *head) 2928 { 2929 struct ceph_mds_request *req; 2930 LIST_HEAD(tmp_list); 2931 2932 list_splice_init(head, &tmp_list); 2933 2934 while (!list_empty(&tmp_list)) { 2935 req = list_entry(tmp_list.next, 2936 struct ceph_mds_request, r_wait); 2937 list_del_init(&req->r_wait); 2938 dout(" wake request %p tid %llu\n", req, req->r_tid); 2939 __do_request(mdsc, req); 2940 } 2941 } 2942 2943 /* 2944 * Wake up threads with requests pending for @mds, so that they can 2945 * resubmit their requests to a possibly different mds. 2946 */ 2947 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 2948 { 2949 struct ceph_mds_request *req; 2950 struct rb_node *p = rb_first(&mdsc->request_tree); 2951 2952 dout("kick_requests mds%d\n", mds); 2953 while (p) { 2954 req = rb_entry(p, struct ceph_mds_request, r_node); 2955 p = rb_next(p); 2956 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2957 continue; 2958 if (req->r_attempts > 0) 2959 continue; /* only new requests */ 2960 if (req->r_session && 2961 req->r_session->s_mds == mds) { 2962 dout(" kicking tid %llu\n", req->r_tid); 2963 list_del_init(&req->r_wait); 2964 __do_request(mdsc, req); 2965 } 2966 } 2967 } 2968 2969 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, 2970 struct ceph_mds_request *req) 2971 { 2972 int err = 0; 2973 2974 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ 2975 if (req->r_inode) 2976 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 2977 if (req->r_parent) { 2978 struct ceph_inode_info *ci = ceph_inode(req->r_parent); 2979 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ? 
2980 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD; 2981 spin_lock(&ci->i_ceph_lock); 2982 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false); 2983 __ceph_touch_fmode(ci, mdsc, fmode); 2984 spin_unlock(&ci->i_ceph_lock); 2985 } 2986 if (req->r_old_dentry_dir) 2987 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 2988 CEPH_CAP_PIN); 2989 2990 if (req->r_inode) { 2991 err = ceph_wait_on_async_create(req->r_inode); 2992 if (err) { 2993 dout("%s: wait for async create returned: %d\n", 2994 __func__, err); 2995 return err; 2996 } 2997 } 2998 2999 if (!err && req->r_old_inode) { 3000 err = ceph_wait_on_async_create(req->r_old_inode); 3001 if (err) { 3002 dout("%s: wait for async create returned: %d\n", 3003 __func__, err); 3004 return err; 3005 } 3006 } 3007 3008 dout("submit_request on %p for inode %p\n", req, dir); 3009 mutex_lock(&mdsc->mutex); 3010 __register_request(mdsc, req, dir); 3011 __do_request(mdsc, req); 3012 err = req->r_err; 3013 mutex_unlock(&mdsc->mutex); 3014 return err; 3015 } 3016 3017 static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, 3018 struct ceph_mds_request *req) 3019 { 3020 int err; 3021 3022 /* wait */ 3023 dout("do_request waiting\n"); 3024 if (!req->r_timeout && req->r_wait_for_completion) { 3025 err = req->r_wait_for_completion(mdsc, req); 3026 } else { 3027 long timeleft = wait_for_completion_killable_timeout( 3028 &req->r_completion, 3029 ceph_timeout_jiffies(req->r_timeout)); 3030 if (timeleft > 0) 3031 err = 0; 3032 else if (!timeleft) 3033 err = -ETIMEDOUT; /* timed out */ 3034 else 3035 err = timeleft; /* killed */ 3036 } 3037 dout("do_request waited, got %d\n", err); 3038 mutex_lock(&mdsc->mutex); 3039 3040 /* only abort if we didn't race with a real reply */ 3041 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 3042 err = le32_to_cpu(req->r_reply_info.head->result); 3043 } else if (err < 0) { 3044 dout("aborted request %lld with %d\n", req->r_tid, err); 3045 3046 /* 3047 * ensure we aren't running concurrently with 3048 * ceph_fill_trace or ceph_readdir_prepopulate, which 3049 * rely on locks (dir mutex) held by our caller. 3050 */ 3051 mutex_lock(&req->r_fill_mutex); 3052 req->r_err = err; 3053 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3054 mutex_unlock(&req->r_fill_mutex); 3055 3056 if (req->r_parent && 3057 (req->r_op & CEPH_MDS_OP_WRITE)) 3058 ceph_invalidate_dir_request(req); 3059 } else { 3060 err = req->r_err; 3061 } 3062 3063 mutex_unlock(&mdsc->mutex); 3064 return err; 3065 } 3066 3067 /* 3068 * Synchronously perform an mds request. Take care of all of the 3069 * session setup, forwarding, and retry details. 3070 */ 3071 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 3072 struct inode *dir, 3073 struct ceph_mds_request *req) 3074 { 3075 int err; 3076 3077 dout("do_request on %p\n", req); 3078 3079 /* issue */ 3080 err = ceph_mdsc_submit_request(mdsc, dir, req); 3081 if (!err) 3082 err = ceph_mdsc_wait_request(mdsc, req); 3083 dout("do_request %p done, result %d\n", req, err); 3084 return err; 3085 } 3086 3087 /* 3088 * Invalidate dir's completeness, dentry lease state on an aborted MDS 3089 * namespace request.
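 * This is called for aborted CEPH_MDS_OP_WRITE requests: since we cannot
 * tell whether the MDS applied the update before we gave up, the cached
 * directory completeness and any related dentry leases can no longer be
 * trusted.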
3090 */ 3091 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 3092 { 3093 struct inode *dir = req->r_parent; 3094 struct inode *old_dir = req->r_old_dentry_dir; 3095 3096 dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir); 3097 3098 ceph_dir_clear_complete(dir); 3099 if (old_dir) 3100 ceph_dir_clear_complete(old_dir); 3101 if (req->r_dentry) 3102 ceph_invalidate_dentry_lease(req->r_dentry); 3103 if (req->r_old_dentry) 3104 ceph_invalidate_dentry_lease(req->r_old_dentry); 3105 } 3106 3107 /* 3108 * Handle mds reply. 3109 * 3110 * We take the session mutex and parse and process the reply immediately. 3111 * This preserves the logical ordering of replies, capabilities, etc., sent 3112 * by the MDS as they are applied to our local cache. 3113 */ 3114 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 3115 { 3116 struct ceph_mds_client *mdsc = session->s_mdsc; 3117 struct ceph_mds_request *req; 3118 struct ceph_mds_reply_head *head = msg->front.iov_base; 3119 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 3120 struct ceph_snap_realm *realm; 3121 u64 tid; 3122 int err, result; 3123 int mds = session->s_mds; 3124 3125 if (msg->front.iov_len < sizeof(*head)) { 3126 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 3127 ceph_msg_dump(msg); 3128 return; 3129 } 3130 3131 /* get request, session */ 3132 tid = le64_to_cpu(msg->hdr.tid); 3133 mutex_lock(&mdsc->mutex); 3134 req = lookup_get_request(mdsc, tid); 3135 if (!req) { 3136 dout("handle_reply on unknown tid %llu\n", tid); 3137 mutex_unlock(&mdsc->mutex); 3138 return; 3139 } 3140 dout("handle_reply %p\n", req); 3141 3142 /* correct session? */ 3143 if (req->r_session != session) { 3144 pr_err("mdsc_handle_reply got %llu on session mds%d" 3145 " not mds%d\n", tid, session->s_mds, 3146 req->r_session ? req->r_session->s_mds : -1); 3147 mutex_unlock(&mdsc->mutex); 3148 goto out; 3149 } 3150 3151 /* dup? */ 3152 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 3153 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 3154 pr_warn("got a dup %s reply on %llu from mds%d\n", 3155 head->safe ? 
"safe" : "unsafe", tid, mds); 3156 mutex_unlock(&mdsc->mutex); 3157 goto out; 3158 } 3159 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 3160 pr_warn("got unsafe after safe on %llu from mds%d\n", 3161 tid, mds); 3162 mutex_unlock(&mdsc->mutex); 3163 goto out; 3164 } 3165 3166 result = le32_to_cpu(head->result); 3167 3168 /* 3169 * Handle an ESTALE 3170 * if we're not talking to the authority, send to them 3171 * if the authority has changed while we weren't looking, 3172 * send to new authority 3173 * Otherwise we just have to return an ESTALE 3174 */ 3175 if (result == -ESTALE) { 3176 dout("got ESTALE on request %llu\n", req->r_tid); 3177 req->r_resend_mds = -1; 3178 if (req->r_direct_mode != USE_AUTH_MDS) { 3179 dout("not using auth, setting for that now\n"); 3180 req->r_direct_mode = USE_AUTH_MDS; 3181 __do_request(mdsc, req); 3182 mutex_unlock(&mdsc->mutex); 3183 goto out; 3184 } else { 3185 int mds = __choose_mds(mdsc, req, NULL); 3186 if (mds >= 0 && mds != req->r_session->s_mds) { 3187 dout("but auth changed, so resending\n"); 3188 __do_request(mdsc, req); 3189 mutex_unlock(&mdsc->mutex); 3190 goto out; 3191 } 3192 } 3193 dout("have to return ESTALE on request %llu\n", req->r_tid); 3194 } 3195 3196 3197 if (head->safe) { 3198 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 3199 __unregister_request(mdsc, req); 3200 3201 /* last request during umount? */ 3202 if (mdsc->stopping && !__get_oldest_req(mdsc)) 3203 complete_all(&mdsc->safe_umount_waiters); 3204 3205 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3206 /* 3207 * We already handled the unsafe response, now do the 3208 * cleanup. No need to examine the response; the MDS 3209 * doesn't include any result info in the safe 3210 * response. And even if it did, there is nothing 3211 * useful we could do with a revised return value. 
3212 */ 3213 dout("got safe reply %llu, mds%d\n", tid, mds); 3214 3215 mutex_unlock(&mdsc->mutex); 3216 goto out; 3217 } 3218 } else { 3219 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 3220 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 3221 } 3222 3223 dout("handle_reply tid %lld result %d\n", tid, result); 3224 rinfo = &req->r_reply_info; 3225 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 3226 err = parse_reply_info(session, msg, rinfo, (u64)-1); 3227 else 3228 err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features); 3229 mutex_unlock(&mdsc->mutex); 3230 3231 /* Must find target inode outside of mutexes to avoid deadlocks */ 3232 if ((err >= 0) && rinfo->head->is_target) { 3233 struct inode *in; 3234 struct ceph_vino tvino = { 3235 .ino = le64_to_cpu(rinfo->targeti.in->ino), 3236 .snap = le64_to_cpu(rinfo->targeti.in->snapid) 3237 }; 3238 3239 in = ceph_get_inode(mdsc->fsc->sb, tvino); 3240 if (IS_ERR(in)) { 3241 err = PTR_ERR(in); 3242 mutex_lock(&session->s_mutex); 3243 goto out_err; 3244 } 3245 req->r_target_inode = in; 3246 } 3247 3248 mutex_lock(&session->s_mutex); 3249 if (err < 0) { 3250 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 3251 ceph_msg_dump(msg); 3252 goto out_err; 3253 } 3254 3255 /* snap trace */ 3256 realm = NULL; 3257 if (rinfo->snapblob_len) { 3258 down_write(&mdsc->snap_rwsem); 3259 ceph_update_snap_trace(mdsc, rinfo->snapblob, 3260 rinfo->snapblob + rinfo->snapblob_len, 3261 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 3262 &realm); 3263 downgrade_write(&mdsc->snap_rwsem); 3264 } else { 3265 down_read(&mdsc->snap_rwsem); 3266 } 3267 3268 /* insert trace into our cache */ 3269 mutex_lock(&req->r_fill_mutex); 3270 current->journal_info = req; 3271 err = ceph_fill_trace(mdsc->fsc->sb, req); 3272 if (err == 0) { 3273 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 3274 req->r_op == CEPH_MDS_OP_LSSNAP)) 3275 ceph_readdir_prepopulate(req, req->r_session); 3276 } 3277 current->journal_info = NULL; 3278 mutex_unlock(&req->r_fill_mutex); 3279 3280 up_read(&mdsc->snap_rwsem); 3281 if (realm) 3282 ceph_put_snap_realm(mdsc, realm); 3283 3284 if (err == 0) { 3285 if (req->r_target_inode && 3286 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3287 struct ceph_inode_info *ci = 3288 ceph_inode(req->r_target_inode); 3289 spin_lock(&ci->i_unsafe_lock); 3290 list_add_tail(&req->r_unsafe_target_item, 3291 &ci->i_unsafe_iops); 3292 spin_unlock(&ci->i_unsafe_lock); 3293 } 3294 3295 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 3296 } 3297 out_err: 3298 mutex_lock(&mdsc->mutex); 3299 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3300 if (err) { 3301 req->r_err = err; 3302 } else { 3303 req->r_reply = ceph_msg_get(msg); 3304 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); 3305 } 3306 } else { 3307 dout("reply arrived after request %lld was aborted\n", tid); 3308 } 3309 mutex_unlock(&mdsc->mutex); 3310 3311 mutex_unlock(&session->s_mutex); 3312 3313 /* kick calling process */ 3314 complete_request(mdsc, req); 3315 3316 ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency, 3317 req->r_end_latency, err); 3318 out: 3319 ceph_mdsc_put_request(req); 3320 return; 3321 } 3322 3323 3324 3325 /* 3326 * handle mds notification that our request has been forwarded. 
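 * The message carries the new rank (next_mds) and a forwarding seq. Stale
 * notifications (fwd_seq <= r_num_fwd) are ignored; otherwise the request
 * forgets its old session, records next_mds as r_resend_mds and is
 * resubmitted via __do_request().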
3327 */ 3328 static void handle_forward(struct ceph_mds_client *mdsc, 3329 struct ceph_mds_session *session, 3330 struct ceph_msg *msg) 3331 { 3332 struct ceph_mds_request *req; 3333 u64 tid = le64_to_cpu(msg->hdr.tid); 3334 u32 next_mds; 3335 u32 fwd_seq; 3336 int err = -EINVAL; 3337 void *p = msg->front.iov_base; 3338 void *end = p + msg->front.iov_len; 3339 3340 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3341 next_mds = ceph_decode_32(&p); 3342 fwd_seq = ceph_decode_32(&p); 3343 3344 mutex_lock(&mdsc->mutex); 3345 req = lookup_get_request(mdsc, tid); 3346 if (!req) { 3347 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 3348 goto out; /* dup reply? */ 3349 } 3350 3351 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3352 dout("forward tid %llu aborted, unregistering\n", tid); 3353 __unregister_request(mdsc, req); 3354 } else if (fwd_seq <= req->r_num_fwd) { 3355 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 3356 tid, next_mds, req->r_num_fwd, fwd_seq); 3357 } else { 3358 /* resend. forward race not possible; mds would drop */ 3359 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 3360 BUG_ON(req->r_err); 3361 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 3362 req->r_attempts = 0; 3363 req->r_num_fwd = fwd_seq; 3364 req->r_resend_mds = next_mds; 3365 put_request_session(req); 3366 __do_request(mdsc, req); 3367 } 3368 ceph_mdsc_put_request(req); 3369 out: 3370 mutex_unlock(&mdsc->mutex); 3371 return; 3372 3373 bad: 3374 pr_err("mdsc_handle_forward decode error err=%d\n", err); 3375 } 3376 3377 static int __decode_session_metadata(void **p, void *end, 3378 bool *blocklisted) 3379 { 3380 /* map<string,string> */ 3381 u32 n; 3382 bool err_str; 3383 ceph_decode_32_safe(p, end, n, bad); 3384 while (n-- > 0) { 3385 u32 len; 3386 ceph_decode_32_safe(p, end, len, bad); 3387 ceph_decode_need(p, end, len, bad); 3388 err_str = !strncmp(*p, "error_string", len); 3389 *p += len; 3390 ceph_decode_32_safe(p, end, len, bad); 3391 ceph_decode_need(p, end, len, bad); 3392 /* 3393 * Match "blocklisted (blacklisted)" from newer MDSes, 3394 * or "blacklisted" from older MDSes. 
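 * Searching for the "blacklisted" substring covers both spellings, so a
 * single strnstr() check below is enough to set *blocklisted.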
3395 */ 3396 if (err_str && strnstr(*p, "blacklisted", len)) 3397 *blocklisted = true; 3398 *p += len; 3399 } 3400 return 0; 3401 bad: 3402 return -1; 3403 } 3404 3405 /* 3406 * handle a mds session control message 3407 */ 3408 static void handle_session(struct ceph_mds_session *session, 3409 struct ceph_msg *msg) 3410 { 3411 struct ceph_mds_client *mdsc = session->s_mdsc; 3412 int mds = session->s_mds; 3413 int msg_version = le16_to_cpu(msg->hdr.version); 3414 void *p = msg->front.iov_base; 3415 void *end = p + msg->front.iov_len; 3416 struct ceph_mds_session_head *h; 3417 u32 op; 3418 u64 seq, features = 0; 3419 int wake = 0; 3420 bool blocklisted = false; 3421 3422 /* decode */ 3423 ceph_decode_need(&p, end, sizeof(*h), bad); 3424 h = p; 3425 p += sizeof(*h); 3426 3427 op = le32_to_cpu(h->op); 3428 seq = le64_to_cpu(h->seq); 3429 3430 if (msg_version >= 3) { 3431 u32 len; 3432 /* version >= 2, metadata */ 3433 if (__decode_session_metadata(&p, end, &blocklisted) < 0) 3434 goto bad; 3435 /* version >= 3, feature bits */ 3436 ceph_decode_32_safe(&p, end, len, bad); 3437 if (len) { 3438 ceph_decode_64_safe(&p, end, features, bad); 3439 p += len - sizeof(features); 3440 } 3441 } 3442 3443 mutex_lock(&mdsc->mutex); 3444 if (op == CEPH_SESSION_CLOSE) { 3445 ceph_get_mds_session(session); 3446 __unregister_session(mdsc, session); 3447 } 3448 /* FIXME: this ttl calculation is generous */ 3449 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 3450 mutex_unlock(&mdsc->mutex); 3451 3452 mutex_lock(&session->s_mutex); 3453 3454 dout("handle_session mds%d %s %p state %s seq %llu\n", 3455 mds, ceph_session_op_name(op), session, 3456 ceph_session_state_name(session->s_state), seq); 3457 3458 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 3459 session->s_state = CEPH_MDS_SESSION_OPEN; 3460 pr_info("mds%d came back\n", session->s_mds); 3461 } 3462 3463 switch (op) { 3464 case CEPH_SESSION_OPEN: 3465 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3466 pr_info("mds%d reconnect success\n", session->s_mds); 3467 session->s_state = CEPH_MDS_SESSION_OPEN; 3468 session->s_features = features; 3469 renewed_caps(mdsc, session, 0); 3470 if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features)) 3471 metric_schedule_delayed(&mdsc->metric); 3472 wake = 1; 3473 if (mdsc->stopping) 3474 __close_session(mdsc, session); 3475 break; 3476 3477 case CEPH_SESSION_RENEWCAPS: 3478 if (session->s_renew_seq == seq) 3479 renewed_caps(mdsc, session, 1); 3480 break; 3481 3482 case CEPH_SESSION_CLOSE: 3483 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3484 pr_info("mds%d reconnect denied\n", session->s_mds); 3485 session->s_state = CEPH_MDS_SESSION_CLOSED; 3486 cleanup_session_requests(mdsc, session); 3487 remove_session_caps(session); 3488 wake = 2; /* for good measure */ 3489 wake_up_all(&mdsc->session_close_wq); 3490 break; 3491 3492 case CEPH_SESSION_STALE: 3493 pr_info("mds%d caps went stale, renewing\n", 3494 session->s_mds); 3495 atomic_inc(&session->s_cap_gen); 3496 session->s_cap_ttl = jiffies - 1; 3497 send_renew_caps(mdsc, session); 3498 break; 3499 3500 case CEPH_SESSION_RECALL_STATE: 3501 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 3502 break; 3503 3504 case CEPH_SESSION_FLUSHMSG: 3505 send_flushmsg_ack(mdsc, session, seq); 3506 break; 3507 3508 case CEPH_SESSION_FORCE_RO: 3509 dout("force_session_readonly %p\n", session); 3510 spin_lock(&session->s_cap_lock); 3511 session->s_readonly = true; 3512 spin_unlock(&session->s_cap_lock); 3513 
wake_up_session_caps(session, FORCE_RO); 3514 break; 3515 3516 case CEPH_SESSION_REJECT: 3517 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 3518 pr_info("mds%d rejected session\n", session->s_mds); 3519 session->s_state = CEPH_MDS_SESSION_REJECTED; 3520 cleanup_session_requests(mdsc, session); 3521 remove_session_caps(session); 3522 if (blocklisted) 3523 mdsc->fsc->blocklisted = true; 3524 wake = 2; /* for good measure */ 3525 break; 3526 3527 default: 3528 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 3529 WARN_ON(1); 3530 } 3531 3532 mutex_unlock(&session->s_mutex); 3533 if (wake) { 3534 mutex_lock(&mdsc->mutex); 3535 __wake_requests(mdsc, &session->s_waiting); 3536 if (wake == 2) 3537 kick_requests(mdsc, mds); 3538 mutex_unlock(&mdsc->mutex); 3539 } 3540 if (op == CEPH_SESSION_CLOSE) 3541 ceph_put_mds_session(session); 3542 return; 3543 3544 bad: 3545 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 3546 (int)msg->front.iov_len); 3547 ceph_msg_dump(msg); 3548 return; 3549 } 3550 3551 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 3552 { 3553 int dcaps; 3554 3555 dcaps = xchg(&req->r_dir_caps, 0); 3556 if (dcaps) { 3557 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3558 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 3559 } 3560 } 3561 3562 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req) 3563 { 3564 int dcaps; 3565 3566 dcaps = xchg(&req->r_dir_caps, 0); 3567 if (dcaps) { 3568 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3569 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent), 3570 dcaps); 3571 } 3572 } 3573 3574 /* 3575 * called under session->mutex. 3576 */ 3577 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 3578 struct ceph_mds_session *session) 3579 { 3580 struct ceph_mds_request *req, *nreq; 3581 struct rb_node *p; 3582 3583 dout("replay_unsafe_requests mds%d\n", session->s_mds); 3584 3585 mutex_lock(&mdsc->mutex); 3586 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) 3587 __send_request(session, req, true); 3588 3589 /* 3590 * also re-send old requests when MDS enters reconnect stage. So that MDS 3591 * can process completed request in clientreplay stage. 
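 * Only requests that were actually sent before (r_attempts > 0) and that
 * belong to this session are replayed here; never-sent requests are left
 * to the normal kick/resend path once the session is open again.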
3592 */ 3593 p = rb_first(&mdsc->request_tree); 3594 while (p) { 3595 req = rb_entry(p, struct ceph_mds_request, r_node); 3596 p = rb_next(p); 3597 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3598 continue; 3599 if (req->r_attempts == 0) 3600 continue; /* only old requests */ 3601 if (!req->r_session) 3602 continue; 3603 if (req->r_session->s_mds != session->s_mds) 3604 continue; 3605 3606 ceph_mdsc_release_dir_caps_no_check(req); 3607 3608 __send_request(session, req, true); 3609 } 3610 mutex_unlock(&mdsc->mutex); 3611 } 3612 3613 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state) 3614 { 3615 struct ceph_msg *reply; 3616 struct ceph_pagelist *_pagelist; 3617 struct page *page; 3618 __le32 *addr; 3619 int err = -ENOMEM; 3620 3621 if (!recon_state->allow_multi) 3622 return -ENOSPC; 3623 3624 /* can't handle message that contains both caps and realms */ 3625 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms); 3626 3627 /* pre-allocate new pagelist */ 3628 _pagelist = ceph_pagelist_alloc(GFP_NOFS); 3629 if (!_pagelist) 3630 return -ENOMEM; 3631 3632 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3633 if (!reply) 3634 goto fail_msg; 3635 3636 /* placeholder for nr_caps */ 3637 err = ceph_pagelist_encode_32(_pagelist, 0); 3638 if (err < 0) 3639 goto fail; 3640 3641 if (recon_state->nr_caps) { 3642 /* currently encoding caps */ 3643 err = ceph_pagelist_encode_32(recon_state->pagelist, 0); 3644 if (err) 3645 goto fail; 3646 } else { 3647 /* placeholder for nr_realms (currently encoding realms) */ 3648 err = ceph_pagelist_encode_32(_pagelist, 0); 3649 if (err < 0) 3650 goto fail; 3651 } 3652 3653 err = ceph_pagelist_encode_8(recon_state->pagelist, 1); 3654 if (err) 3655 goto fail; 3656 3657 page = list_first_entry(&recon_state->pagelist->head, struct page, lru); 3658 addr = kmap_atomic(page); 3659 if (recon_state->nr_caps) { 3660 /* currently encoding caps */ 3661 *addr = cpu_to_le32(recon_state->nr_caps); 3662 } else { 3663 /* currently encoding realms */ 3664 *(addr + 1) = cpu_to_le32(recon_state->nr_realms); 3665 } 3666 kunmap_atomic(addr); 3667 3668 reply->hdr.version = cpu_to_le16(5); 3669 reply->hdr.compat_version = cpu_to_le16(4); 3670 3671 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length); 3672 ceph_msg_data_add_pagelist(reply, recon_state->pagelist); 3673 3674 ceph_con_send(&recon_state->session->s_con, reply); 3675 ceph_pagelist_release(recon_state->pagelist); 3676 3677 recon_state->pagelist = _pagelist; 3678 recon_state->nr_caps = 0; 3679 recon_state->nr_realms = 0; 3680 recon_state->msg_version = 5; 3681 return 0; 3682 fail: 3683 ceph_msg_put(reply); 3684 fail_msg: 3685 ceph_pagelist_release(_pagelist); 3686 return err; 3687 } 3688 3689 static struct dentry* d_find_primary(struct inode *inode) 3690 { 3691 struct dentry *alias, *dn = NULL; 3692 3693 if (hlist_empty(&inode->i_dentry)) 3694 return NULL; 3695 3696 spin_lock(&inode->i_lock); 3697 if (hlist_empty(&inode->i_dentry)) 3698 goto out_unlock; 3699 3700 if (S_ISDIR(inode->i_mode)) { 3701 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias); 3702 if (!IS_ROOT(alias)) 3703 dn = dget(alias); 3704 goto out_unlock; 3705 } 3706 3707 hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) { 3708 spin_lock(&alias->d_lock); 3709 if (!d_unhashed(alias) && 3710 (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) { 3711 dn = dget_dlock(alias); 3712 } 3713 spin_unlock(&alias->d_lock); 3714 if (dn) 3715 break; 3716 } 3717 out_unlock: 3718
spin_unlock(&inode->i_lock); 3719 return dn; 3720 } 3721 3722 /* 3723 * Encode information about a cap for a reconnect with the MDS. 3724 */ 3725 static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap, 3726 void *arg) 3727 { 3728 union { 3729 struct ceph_mds_cap_reconnect v2; 3730 struct ceph_mds_cap_reconnect_v1 v1; 3731 } rec; 3732 struct ceph_inode_info *ci = cap->ci; 3733 struct ceph_reconnect_state *recon_state = arg; 3734 struct ceph_pagelist *pagelist = recon_state->pagelist; 3735 struct dentry *dentry; 3736 char *path; 3737 int pathlen, err; 3738 u64 pathbase; 3739 u64 snap_follows; 3740 3741 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 3742 inode, ceph_vinop(inode), cap, cap->cap_id, 3743 ceph_cap_string(cap->issued)); 3744 3745 dentry = d_find_primary(inode); 3746 if (dentry) { 3747 /* set pathbase to parent dir when msg_version >= 2 */ 3748 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 3749 recon_state->msg_version >= 2); 3750 dput(dentry); 3751 if (IS_ERR(path)) { 3752 err = PTR_ERR(path); 3753 goto out_err; 3754 } 3755 } else { 3756 path = NULL; 3757 pathlen = 0; 3758 pathbase = 0; 3759 } 3760 3761 spin_lock(&ci->i_ceph_lock); 3762 cap->seq = 0; /* reset cap seq */ 3763 cap->issue_seq = 0; /* and issue_seq */ 3764 cap->mseq = 0; /* and migrate_seq */ 3765 cap->cap_gen = atomic_read(&cap->session->s_cap_gen); 3766 3767 /* These are lost when the session goes away */ 3768 if (S_ISDIR(inode->i_mode)) { 3769 if (cap->issued & CEPH_CAP_DIR_CREATE) { 3770 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); 3771 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); 3772 } 3773 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; 3774 } 3775 3776 if (recon_state->msg_version >= 2) { 3777 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 3778 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3779 rec.v2.issued = cpu_to_le32(cap->issued); 3780 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3781 rec.v2.pathbase = cpu_to_le64(pathbase); 3782 rec.v2.flock_len = (__force __le32) 3783 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); 3784 } else { 3785 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 3786 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3787 rec.v1.issued = cpu_to_le32(cap->issued); 3788 rec.v1.size = cpu_to_le64(i_size_read(inode)); 3789 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime); 3790 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime); 3791 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3792 rec.v1.pathbase = cpu_to_le64(pathbase); 3793 } 3794 3795 if (list_empty(&ci->i_cap_snaps)) { 3796 snap_follows = ci->i_head_snapc ? 
ci->i_head_snapc->seq : 0; 3797 } else { 3798 struct ceph_cap_snap *capsnap = 3799 list_first_entry(&ci->i_cap_snaps, 3800 struct ceph_cap_snap, ci_item); 3801 snap_follows = capsnap->follows; 3802 } 3803 spin_unlock(&ci->i_ceph_lock); 3804 3805 if (recon_state->msg_version >= 2) { 3806 int num_fcntl_locks, num_flock_locks; 3807 struct ceph_filelock *flocks = NULL; 3808 size_t struct_len, total_len = sizeof(u64); 3809 u8 struct_v = 0; 3810 3811 encode_again: 3812 if (rec.v2.flock_len) { 3813 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 3814 } else { 3815 num_fcntl_locks = 0; 3816 num_flock_locks = 0; 3817 } 3818 if (num_fcntl_locks + num_flock_locks > 0) { 3819 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks, 3820 sizeof(struct ceph_filelock), 3821 GFP_NOFS); 3822 if (!flocks) { 3823 err = -ENOMEM; 3824 goto out_err; 3825 } 3826 err = ceph_encode_locks_to_buffer(inode, flocks, 3827 num_fcntl_locks, 3828 num_flock_locks); 3829 if (err) { 3830 kfree(flocks); 3831 flocks = NULL; 3832 if (err == -ENOSPC) 3833 goto encode_again; 3834 goto out_err; 3835 } 3836 } else { 3837 kfree(flocks); 3838 flocks = NULL; 3839 } 3840 3841 if (recon_state->msg_version >= 3) { 3842 /* version, compat_version and struct_len */ 3843 total_len += 2 * sizeof(u8) + sizeof(u32); 3844 struct_v = 2; 3845 } 3846 /* 3847 * number of encoded locks is stable, so copy to pagelist 3848 */ 3849 struct_len = 2 * sizeof(u32) + 3850 (num_fcntl_locks + num_flock_locks) * 3851 sizeof(struct ceph_filelock); 3852 rec.v2.flock_len = cpu_to_le32(struct_len); 3853 3854 struct_len += sizeof(u32) + pathlen + sizeof(rec.v2); 3855 3856 if (struct_v >= 2) 3857 struct_len += sizeof(u64); /* snap_follows */ 3858 3859 total_len += struct_len; 3860 3861 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 3862 err = send_reconnect_partial(recon_state); 3863 if (err) 3864 goto out_freeflocks; 3865 pagelist = recon_state->pagelist; 3866 } 3867 3868 err = ceph_pagelist_reserve(pagelist, total_len); 3869 if (err) 3870 goto out_freeflocks; 3871 3872 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3873 if (recon_state->msg_version >= 3) { 3874 ceph_pagelist_encode_8(pagelist, struct_v); 3875 ceph_pagelist_encode_8(pagelist, 1); 3876 ceph_pagelist_encode_32(pagelist, struct_len); 3877 } 3878 ceph_pagelist_encode_string(pagelist, path, pathlen); 3879 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 3880 ceph_locks_to_pagelist(flocks, pagelist, 3881 num_fcntl_locks, num_flock_locks); 3882 if (struct_v >= 2) 3883 ceph_pagelist_encode_64(pagelist, snap_follows); 3884 out_freeflocks: 3885 kfree(flocks); 3886 } else { 3887 err = ceph_pagelist_reserve(pagelist, 3888 sizeof(u64) + sizeof(u32) + 3889 pathlen + sizeof(rec.v1)); 3890 if (err) 3891 goto out_err; 3892 3893 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3894 ceph_pagelist_encode_string(pagelist, path, pathlen); 3895 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 3896 } 3897 3898 out_err: 3899 ceph_mdsc_free_path(path, pathlen); 3900 if (!err) 3901 recon_state->nr_caps++; 3902 return err; 3903 } 3904 3905 static int encode_snap_realms(struct ceph_mds_client *mdsc, 3906 struct ceph_reconnect_state *recon_state) 3907 { 3908 struct rb_node *p; 3909 struct ceph_pagelist *pagelist = recon_state->pagelist; 3910 int err = 0; 3911 3912 if (recon_state->msg_version >= 4) { 3913 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); 3914 if (err < 0) 3915 goto fail; 3916 } 3917 3918 /* 3919 * snaprealms. 
we provide mds with the ino, seq (version), and 3920 * parent for all of our realms. If the mds has any newer info, 3921 * it will tell us. 3922 */ 3923 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 3924 struct ceph_snap_realm *realm = 3925 rb_entry(p, struct ceph_snap_realm, node); 3926 struct ceph_mds_snaprealm_reconnect sr_rec; 3927 3928 if (recon_state->msg_version >= 4) { 3929 size_t need = sizeof(u8) * 2 + sizeof(u32) + 3930 sizeof(sr_rec); 3931 3932 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 3933 err = send_reconnect_partial(recon_state); 3934 if (err) 3935 goto fail; 3936 pagelist = recon_state->pagelist; 3937 } 3938 3939 err = ceph_pagelist_reserve(pagelist, need); 3940 if (err) 3941 goto fail; 3942 3943 ceph_pagelist_encode_8(pagelist, 1); 3944 ceph_pagelist_encode_8(pagelist, 1); 3945 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 3946 } 3947 3948 dout(" adding snap realm %llx seq %lld parent %llx\n", 3949 realm->ino, realm->seq, realm->parent_ino); 3950 sr_rec.ino = cpu_to_le64(realm->ino); 3951 sr_rec.seq = cpu_to_le64(realm->seq); 3952 sr_rec.parent = cpu_to_le64(realm->parent_ino); 3953 3954 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 3955 if (err) 3956 goto fail; 3957 3958 recon_state->nr_realms++; 3959 } 3960 fail: 3961 return err; 3962 } 3963 3964 3965 /* 3966 * If an MDS fails and recovers, clients need to reconnect in order to 3967 * reestablish shared state. This includes all caps issued through 3968 * this session _and_ the snap_realm hierarchy. Because it's not 3969 * clear which snap realms the mds cares about, we send everything we 3970 * know about.. that ensures we'll then get any new info the 3971 * recovering MDS might have. 3972 * 3973 * This is a relatively heavyweight operation, but it's rare. 3974 */ 3975 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 3976 struct ceph_mds_session *session) 3977 { 3978 struct ceph_msg *reply; 3979 int mds = session->s_mds; 3980 int err = -ENOMEM; 3981 struct ceph_reconnect_state recon_state = { 3982 .session = session, 3983 }; 3984 LIST_HEAD(dispose); 3985 3986 pr_info("mds%d reconnect start\n", mds); 3987 3988 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 3989 if (!recon_state.pagelist) 3990 goto fail_nopagelist; 3991 3992 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3993 if (!reply) 3994 goto fail_nomsg; 3995 3996 xa_destroy(&session->s_delegated_inos); 3997 3998 mutex_lock(&session->s_mutex); 3999 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 4000 session->s_seq = 0; 4001 4002 dout("session %p state %s\n", session, 4003 ceph_session_state_name(session->s_state)); 4004 4005 atomic_inc(&session->s_cap_gen); 4006 4007 spin_lock(&session->s_cap_lock); 4008 /* don't know if session is readonly */ 4009 session->s_readonly = 0; 4010 /* 4011 * notify __ceph_remove_cap() that we are composing cap reconnect. 4012 * If a cap get released before being added to the cap reconnect, 4013 * __ceph_remove_cap() should skip queuing cap release. 
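 * s_cap_reconnect is cleared again right after the cap walk below, once
 * every cap has been added to the reconnect payload.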
4014 */ 4015 session->s_cap_reconnect = 1; 4016 /* drop old cap expires; we're about to reestablish that state */ 4017 detach_cap_releases(session, &dispose); 4018 spin_unlock(&session->s_cap_lock); 4019 dispose_cap_releases(mdsc, &dispose); 4020 4021 /* trim unused caps to reduce MDS's cache rejoin time */ 4022 if (mdsc->fsc->sb->s_root) 4023 shrink_dcache_parent(mdsc->fsc->sb->s_root); 4024 4025 ceph_con_close(&session->s_con); 4026 ceph_con_open(&session->s_con, 4027 CEPH_ENTITY_TYPE_MDS, mds, 4028 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 4029 4030 /* replay unsafe requests */ 4031 replay_unsafe_requests(mdsc, session); 4032 4033 ceph_early_kick_flushing_caps(mdsc, session); 4034 4035 down_read(&mdsc->snap_rwsem); 4036 4037 /* placeholder for nr_caps */ 4038 err = ceph_pagelist_encode_32(recon_state.pagelist, 0); 4039 if (err) 4040 goto fail; 4041 4042 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) { 4043 recon_state.msg_version = 3; 4044 recon_state.allow_multi = true; 4045 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) { 4046 recon_state.msg_version = 3; 4047 } else { 4048 recon_state.msg_version = 2; 4049 } 4050 /* traverse this session's caps */ 4051 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state); 4052 4053 spin_lock(&session->s_cap_lock); 4054 session->s_cap_reconnect = 0; 4055 spin_unlock(&session->s_cap_lock); 4056 4057 if (err < 0) 4058 goto fail; 4059 4060 /* check if all realms can be encoded into current message */ 4061 if (mdsc->num_snap_realms) { 4062 size_t total_len = 4063 recon_state.pagelist->length + 4064 mdsc->num_snap_realms * 4065 sizeof(struct ceph_mds_snaprealm_reconnect); 4066 if (recon_state.msg_version >= 4) { 4067 /* number of realms */ 4068 total_len += sizeof(u32); 4069 /* version, compat_version and struct_len */ 4070 total_len += mdsc->num_snap_realms * 4071 (2 * sizeof(u8) + sizeof(u32)); 4072 } 4073 if (total_len > RECONNECT_MAX_SIZE) { 4074 if (!recon_state.allow_multi) { 4075 err = -ENOSPC; 4076 goto fail; 4077 } 4078 if (recon_state.nr_caps) { 4079 err = send_reconnect_partial(&recon_state); 4080 if (err) 4081 goto fail; 4082 } 4083 recon_state.msg_version = 5; 4084 } 4085 } 4086 4087 err = encode_snap_realms(mdsc, &recon_state); 4088 if (err < 0) 4089 goto fail; 4090 4091 if (recon_state.msg_version >= 5) { 4092 err = ceph_pagelist_encode_8(recon_state.pagelist, 0); 4093 if (err < 0) 4094 goto fail; 4095 } 4096 4097 if (recon_state.nr_caps || recon_state.nr_realms) { 4098 struct page *page = 4099 list_first_entry(&recon_state.pagelist->head, 4100 struct page, lru); 4101 __le32 *addr = kmap_atomic(page); 4102 if (recon_state.nr_caps) { 4103 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms); 4104 *addr = cpu_to_le32(recon_state.nr_caps); 4105 } else if (recon_state.msg_version >= 4) { 4106 *(addr + 1) = cpu_to_le32(recon_state.nr_realms); 4107 } 4108 kunmap_atomic(addr); 4109 } 4110 4111 reply->hdr.version = cpu_to_le16(recon_state.msg_version); 4112 if (recon_state.msg_version >= 4) 4113 reply->hdr.compat_version = cpu_to_le16(4); 4114 4115 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length); 4116 ceph_msg_data_add_pagelist(reply, recon_state.pagelist); 4117 4118 ceph_con_send(&session->s_con, reply); 4119 4120 mutex_unlock(&session->s_mutex); 4121 4122 mutex_lock(&mdsc->mutex); 4123 __wake_requests(mdsc, &session->s_waiting); 4124 mutex_unlock(&mdsc->mutex); 4125 4126 up_read(&mdsc->snap_rwsem); 4127 ceph_pagelist_release(recon_state.pagelist); 4128 return; 4129 4130
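/*
 * Illustrative sketch (editor's addition, not part of the original source):
 * when msg_version >= 4, the reconnect payload assembled above is laid out
 * roughly as
 *
 *   le32 nr_caps        placeholder encoded first, patched via kmap_atomic()
 *                       once the real count is known (only if caps were
 *                       encoded into this message)
 *   per-cap records     emitted by reconnect_caps_cb()
 *   le32 nr_realms      written by encode_snap_realms(); when no caps were
 *                       encoded here, it is the second le32 of the first
 *                       page and is patched directly above
 *   per-realm records   emitted by encode_snap_realms()
 *   u8  trailer         appended only for msg_version >= 5
 *
 * The error labels below unwind in reverse order of the setup: drop the
 * reply, the snap_rwsem read lock and the session mutex, then release the
 * pagelist.
 */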
fail: 4131 ceph_msg_put(reply); 4132 up_read(&mdsc->snap_rwsem); 4133 mutex_unlock(&session->s_mutex); 4134 fail_nomsg: 4135 ceph_pagelist_release(recon_state.pagelist); 4136 fail_nopagelist: 4137 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 4138 return; 4139 } 4140 4141 4142 /* 4143 * compare old and new mdsmaps, kicking requests 4144 * and closing out old connections as necessary 4145 * 4146 * called under mdsc->mutex. 4147 */ 4148 static void check_new_map(struct ceph_mds_client *mdsc, 4149 struct ceph_mdsmap *newmap, 4150 struct ceph_mdsmap *oldmap) 4151 { 4152 int i; 4153 int oldstate, newstate; 4154 struct ceph_mds_session *s; 4155 4156 dout("check_new_map new %u old %u\n", 4157 newmap->m_epoch, oldmap->m_epoch); 4158 4159 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4160 if (!mdsc->sessions[i]) 4161 continue; 4162 s = mdsc->sessions[i]; 4163 oldstate = ceph_mdsmap_get_state(oldmap, i); 4164 newstate = ceph_mdsmap_get_state(newmap, i); 4165 4166 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 4167 i, ceph_mds_state_name(oldstate), 4168 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 4169 ceph_mds_state_name(newstate), 4170 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 4171 ceph_session_state_name(s->s_state)); 4172 4173 if (i >= newmap->possible_max_rank) { 4174 /* force close session for stopped mds */ 4175 ceph_get_mds_session(s); 4176 __unregister_session(mdsc, s); 4177 __wake_requests(mdsc, &s->s_waiting); 4178 mutex_unlock(&mdsc->mutex); 4179 4180 mutex_lock(&s->s_mutex); 4181 cleanup_session_requests(mdsc, s); 4182 remove_session_caps(s); 4183 mutex_unlock(&s->s_mutex); 4184 4185 ceph_put_mds_session(s); 4186 4187 mutex_lock(&mdsc->mutex); 4188 kick_requests(mdsc, i); 4189 continue; 4190 } 4191 4192 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 4193 ceph_mdsmap_get_addr(newmap, i), 4194 sizeof(struct ceph_entity_addr))) { 4195 /* just close it */ 4196 mutex_unlock(&mdsc->mutex); 4197 mutex_lock(&s->s_mutex); 4198 mutex_lock(&mdsc->mutex); 4199 ceph_con_close(&s->s_con); 4200 mutex_unlock(&s->s_mutex); 4201 s->s_state = CEPH_MDS_SESSION_RESTARTING; 4202 } else if (oldstate == newstate) { 4203 continue; /* nothing new with this mds */ 4204 } 4205 4206 /* 4207 * send reconnect? 4208 */ 4209 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 4210 newstate >= CEPH_MDS_STATE_RECONNECT) { 4211 mutex_unlock(&mdsc->mutex); 4212 send_mds_reconnect(mdsc, s); 4213 mutex_lock(&mdsc->mutex); 4214 } 4215 4216 /* 4217 * kick request on any mds that has gone active. 
4218 */ 4219 if (oldstate < CEPH_MDS_STATE_ACTIVE && 4220 newstate >= CEPH_MDS_STATE_ACTIVE) { 4221 if (oldstate != CEPH_MDS_STATE_CREATING && 4222 oldstate != CEPH_MDS_STATE_STARTING) 4223 pr_info("mds%d recovery completed\n", s->s_mds); 4224 kick_requests(mdsc, i); 4225 mutex_unlock(&mdsc->mutex); 4226 mutex_lock(&s->s_mutex); 4227 mutex_lock(&mdsc->mutex); 4228 ceph_kick_flushing_caps(mdsc, s); 4229 mutex_unlock(&s->s_mutex); 4230 wake_up_session_caps(s, RECONNECT); 4231 } 4232 } 4233 4234 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4235 s = mdsc->sessions[i]; 4236 if (!s) 4237 continue; 4238 if (!ceph_mdsmap_is_laggy(newmap, i)) 4239 continue; 4240 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4241 s->s_state == CEPH_MDS_SESSION_HUNG || 4242 s->s_state == CEPH_MDS_SESSION_CLOSING) { 4243 dout(" connecting to export targets of laggy mds%d\n", 4244 i); 4245 __open_export_target_sessions(mdsc, s); 4246 } 4247 } 4248 } 4249 4250 4251 4252 /* 4253 * leases 4254 */ 4255 4256 /* 4257 * caller must hold session s_mutex, dentry->d_lock 4258 */ 4259 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 4260 { 4261 struct ceph_dentry_info *di = ceph_dentry(dentry); 4262 4263 ceph_put_mds_session(di->lease_session); 4264 di->lease_session = NULL; 4265 } 4266 4267 static void handle_lease(struct ceph_mds_client *mdsc, 4268 struct ceph_mds_session *session, 4269 struct ceph_msg *msg) 4270 { 4271 struct super_block *sb = mdsc->fsc->sb; 4272 struct inode *inode; 4273 struct dentry *parent, *dentry; 4274 struct ceph_dentry_info *di; 4275 int mds = session->s_mds; 4276 struct ceph_mds_lease *h = msg->front.iov_base; 4277 u32 seq; 4278 struct ceph_vino vino; 4279 struct qstr dname; 4280 int release = 0; 4281 4282 dout("handle_lease from mds%d\n", mds); 4283 4284 /* decode */ 4285 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 4286 goto bad; 4287 vino.ino = le64_to_cpu(h->ino); 4288 vino.snap = CEPH_NOSNAP; 4289 seq = le32_to_cpu(h->seq); 4290 dname.len = get_unaligned_le32(h + 1); 4291 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 4292 goto bad; 4293 dname.name = (void *)(h + 1) + sizeof(u32); 4294 4295 /* lookup inode */ 4296 inode = ceph_find_inode(sb, vino); 4297 dout("handle_lease %s, ino %llx %p %.*s\n", 4298 ceph_lease_op_name(h->action), vino.ino, inode, 4299 dname.len, dname.name); 4300 4301 mutex_lock(&session->s_mutex); 4302 inc_session_sequence(session); 4303 4304 if (!inode) { 4305 dout("handle_lease no inode %llx\n", vino.ino); 4306 goto release; 4307 } 4308 4309 /* dentry */ 4310 parent = d_find_alias(inode); 4311 if (!parent) { 4312 dout("no parent dentry on inode %p\n", inode); 4313 WARN_ON(1); 4314 goto release; /* hrm... 
*/ 4315 } 4316 dname.hash = full_name_hash(parent, dname.name, dname.len); 4317 dentry = d_lookup(parent, &dname); 4318 dput(parent); 4319 if (!dentry) 4320 goto release; 4321 4322 spin_lock(&dentry->d_lock); 4323 di = ceph_dentry(dentry); 4324 switch (h->action) { 4325 case CEPH_MDS_LEASE_REVOKE: 4326 if (di->lease_session == session) { 4327 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 4328 h->seq = cpu_to_le32(di->lease_seq); 4329 __ceph_mdsc_drop_dentry_lease(dentry); 4330 } 4331 release = 1; 4332 break; 4333 4334 case CEPH_MDS_LEASE_RENEW: 4335 if (di->lease_session == session && 4336 di->lease_gen == atomic_read(&session->s_cap_gen) && 4337 di->lease_renew_from && 4338 di->lease_renew_after == 0) { 4339 unsigned long duration = 4340 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 4341 4342 di->lease_seq = seq; 4343 di->time = di->lease_renew_from + duration; 4344 di->lease_renew_after = di->lease_renew_from + 4345 (duration >> 1); 4346 di->lease_renew_from = 0; 4347 } 4348 break; 4349 } 4350 spin_unlock(&dentry->d_lock); 4351 dput(dentry); 4352 4353 if (!release) 4354 goto out; 4355 4356 release: 4357 /* let's just reuse the same message */ 4358 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 4359 ceph_msg_get(msg); 4360 ceph_con_send(&session->s_con, msg); 4361 4362 out: 4363 mutex_unlock(&session->s_mutex); 4364 iput(inode); 4365 return; 4366 4367 bad: 4368 pr_err("corrupt lease message\n"); 4369 ceph_msg_dump(msg); 4370 } 4371 4372 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 4373 struct dentry *dentry, char action, 4374 u32 seq) 4375 { 4376 struct ceph_msg *msg; 4377 struct ceph_mds_lease *lease; 4378 struct inode *dir; 4379 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; 4380 4381 dout("lease_send_msg dentry %p %s to mds%d\n", 4382 dentry, ceph_lease_op_name(action), session->s_mds); 4383 4384 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 4385 if (!msg) 4386 return; 4387 lease = msg->front.iov_base; 4388 lease->action = action; 4389 lease->seq = cpu_to_le32(seq); 4390 4391 spin_lock(&dentry->d_lock); 4392 dir = d_inode(dentry->d_parent); 4393 lease->ino = cpu_to_le64(ceph_ino(dir)); 4394 lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); 4395 4396 put_unaligned_le32(dentry->d_name.len, lease + 1); 4397 memcpy((void *)(lease + 1) + 4, 4398 dentry->d_name.name, dentry->d_name.len); 4399 spin_unlock(&dentry->d_lock); 4400 /* 4401 * if this is a preemptive lease RELEASE, no need to 4402 * flush request stream, since the actual request will 4403 * soon follow.
4404 */ 4405 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE); 4406 4407 ceph_con_send(&session->s_con, msg); 4408 } 4409 4410 /* 4411 * lock and unlock each session, to wait for ongoing session activities to finish 4412 */ 4413 static void lock_unlock_sessions(struct ceph_mds_client *mdsc) 4414 { 4415 int i; 4416 4417 mutex_lock(&mdsc->mutex); 4418 for (i = 0; i < mdsc->max_sessions; i++) { 4419 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4420 if (!s) 4421 continue; 4422 mutex_unlock(&mdsc->mutex); 4423 mutex_lock(&s->s_mutex); 4424 mutex_unlock(&s->s_mutex); 4425 ceph_put_mds_session(s); 4426 mutex_lock(&mdsc->mutex); 4427 } 4428 mutex_unlock(&mdsc->mutex); 4429 } 4430 4431 static void maybe_recover_session(struct ceph_mds_client *mdsc) 4432 { 4433 struct ceph_fs_client *fsc = mdsc->fsc; 4434 4435 if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) 4436 return; 4437 4438 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED) 4439 return; 4440 4441 if (!READ_ONCE(fsc->blocklisted)) 4442 return; 4443 4444 pr_info("auto reconnect after blocklisted\n"); 4445 ceph_force_reconnect(fsc->sb); 4446 } 4447 4448 bool check_session_state(struct ceph_mds_session *s) 4449 { 4450 switch (s->s_state) { 4451 case CEPH_MDS_SESSION_OPEN: 4452 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 4453 s->s_state = CEPH_MDS_SESSION_HUNG; 4454 pr_info("mds%d hung\n", s->s_mds); 4455 } 4456 break; 4457 case CEPH_MDS_SESSION_CLOSING: 4458 /* Should never reach this when we're unmounting */ 4459 WARN_ON_ONCE(s->s_ttl); 4460 fallthrough; 4461 case CEPH_MDS_SESSION_NEW: 4462 case CEPH_MDS_SESSION_RESTARTING: 4463 case CEPH_MDS_SESSION_CLOSED: 4464 case CEPH_MDS_SESSION_REJECTED: 4465 return false; 4466 } 4467 4468 return true; 4469 } 4470 4471 /* 4472 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply, 4473 * then we need to retransmit that request. 4474 */ 4475 void inc_session_sequence(struct ceph_mds_session *s) 4476 { 4477 lockdep_assert_held(&s->s_mutex); 4478 4479 s->s_seq++; 4480 4481 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 4482 int ret; 4483 4484 dout("resending session close request for mds%d\n", s->s_mds); 4485 ret = request_close_session(s); 4486 if (ret < 0) 4487 pr_err("unable to close session to mds%d: %d\n", 4488 s->s_mds, ret); 4489 } 4490 } 4491 4492 /* 4493 * delayed work -- periodically trim expired leases, renew caps with mds. If 4494 * the @delay parameter is set to 0 or if it's more than 5 secs, the default 4495 * workqueue delay value of 5 secs will be used.
4496 */ 4497 static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay) 4498 { 4499 unsigned long max_delay = HZ * 5; 4500 4501 /* 5 secs default delay */ 4502 if (!delay || (delay > max_delay)) 4503 delay = max_delay; 4504 schedule_delayed_work(&mdsc->delayed_work, 4505 round_jiffies_relative(delay)); 4506 } 4507 4508 static void delayed_work(struct work_struct *work) 4509 { 4510 struct ceph_mds_client *mdsc = 4511 container_of(work, struct ceph_mds_client, delayed_work.work); 4512 unsigned long delay; 4513 int renew_interval; 4514 int renew_caps; 4515 int i; 4516 4517 dout("mdsc delayed_work\n"); 4518 4519 if (mdsc->stopping) 4520 return; 4521 4522 mutex_lock(&mdsc->mutex); 4523 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 4524 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 4525 mdsc->last_renew_caps); 4526 if (renew_caps) 4527 mdsc->last_renew_caps = jiffies; 4528 4529 for (i = 0; i < mdsc->max_sessions; i++) { 4530 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4531 if (!s) 4532 continue; 4533 4534 if (!check_session_state(s)) { 4535 ceph_put_mds_session(s); 4536 continue; 4537 } 4538 mutex_unlock(&mdsc->mutex); 4539 4540 mutex_lock(&s->s_mutex); 4541 if (renew_caps) 4542 send_renew_caps(mdsc, s); 4543 else 4544 ceph_con_keepalive(&s->s_con); 4545 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4546 s->s_state == CEPH_MDS_SESSION_HUNG) 4547 ceph_send_cap_releases(mdsc, s); 4548 mutex_unlock(&s->s_mutex); 4549 ceph_put_mds_session(s); 4550 4551 mutex_lock(&mdsc->mutex); 4552 } 4553 mutex_unlock(&mdsc->mutex); 4554 4555 delay = ceph_check_delayed_caps(mdsc); 4556 4557 ceph_queue_cap_reclaim_work(mdsc); 4558 4559 ceph_trim_snapid_map(mdsc); 4560 4561 maybe_recover_session(mdsc); 4562 4563 schedule_delayed(mdsc, delay); 4564 } 4565 4566 int ceph_mdsc_init(struct ceph_fs_client *fsc) 4567 4568 { 4569 struct ceph_mds_client *mdsc; 4570 int err; 4571 4572 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 4573 if (!mdsc) 4574 return -ENOMEM; 4575 mdsc->fsc = fsc; 4576 mutex_init(&mdsc->mutex); 4577 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 4578 if (!mdsc->mdsmap) { 4579 err = -ENOMEM; 4580 goto err_mdsc; 4581 } 4582 4583 init_completion(&mdsc->safe_umount_waiters); 4584 init_waitqueue_head(&mdsc->session_close_wq); 4585 INIT_LIST_HEAD(&mdsc->waiting_for_map); 4586 mdsc->sessions = NULL; 4587 atomic_set(&mdsc->num_sessions, 0); 4588 mdsc->max_sessions = 0; 4589 mdsc->stopping = 0; 4590 atomic64_set(&mdsc->quotarealms_count, 0); 4591 mdsc->quotarealms_inodes = RB_ROOT; 4592 mutex_init(&mdsc->quotarealms_inodes_mutex); 4593 mdsc->last_snap_seq = 0; 4594 init_rwsem(&mdsc->snap_rwsem); 4595 mdsc->snap_realms = RB_ROOT; 4596 INIT_LIST_HEAD(&mdsc->snap_empty); 4597 mdsc->num_snap_realms = 0; 4598 spin_lock_init(&mdsc->snap_empty_lock); 4599 mdsc->last_tid = 0; 4600 mdsc->oldest_tid = 0; 4601 mdsc->request_tree = RB_ROOT; 4602 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 4603 mdsc->last_renew_caps = jiffies; 4604 INIT_LIST_HEAD(&mdsc->cap_delay_list); 4605 INIT_LIST_HEAD(&mdsc->cap_wait_list); 4606 spin_lock_init(&mdsc->cap_delay_lock); 4607 INIT_LIST_HEAD(&mdsc->snap_flush_list); 4608 spin_lock_init(&mdsc->snap_flush_lock); 4609 mdsc->last_cap_flush_tid = 1; 4610 INIT_LIST_HEAD(&mdsc->cap_flush_list); 4611 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 4612 mdsc->num_cap_flushing = 0; 4613 spin_lock_init(&mdsc->cap_dirty_lock); 4614 init_waitqueue_head(&mdsc->cap_flushing_wq); 4615 INIT_WORK(&mdsc->cap_reclaim_work, 
ceph_cap_reclaim_work); 4616 atomic_set(&mdsc->cap_reclaim_pending, 0); 4617 err = ceph_metric_init(&mdsc->metric); 4618 if (err) 4619 goto err_mdsmap; 4620 4621 spin_lock_init(&mdsc->dentry_list_lock); 4622 INIT_LIST_HEAD(&mdsc->dentry_leases); 4623 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 4624 4625 ceph_caps_init(mdsc); 4626 ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 4627 4628 spin_lock_init(&mdsc->snapid_map_lock); 4629 mdsc->snapid_map_tree = RB_ROOT; 4630 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 4631 4632 init_rwsem(&mdsc->pool_perm_rwsem); 4633 mdsc->pool_perm_tree = RB_ROOT; 4634 4635 strscpy(mdsc->nodename, utsname()->nodename, 4636 sizeof(mdsc->nodename)); 4637 4638 fsc->mdsc = mdsc; 4639 return 0; 4640 4641 err_mdsmap: 4642 kfree(mdsc->mdsmap); 4643 err_mdsc: 4644 kfree(mdsc); 4645 return err; 4646 } 4647 4648 /* 4649 * Wait for safe replies on open mds requests. If we time out, drop 4650 * all requests from the tree to avoid dangling dentry refs. 4651 */ 4652 static void wait_requests(struct ceph_mds_client *mdsc) 4653 { 4654 struct ceph_options *opts = mdsc->fsc->client->options; 4655 struct ceph_mds_request *req; 4656 4657 mutex_lock(&mdsc->mutex); 4658 if (__get_oldest_req(mdsc)) { 4659 mutex_unlock(&mdsc->mutex); 4660 4661 dout("wait_requests waiting for requests\n"); 4662 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 4663 ceph_timeout_jiffies(opts->mount_timeout)); 4664 4665 /* tear down remaining requests */ 4666 mutex_lock(&mdsc->mutex); 4667 while ((req = __get_oldest_req(mdsc))) { 4668 dout("wait_requests timed out on tid %llu\n", 4669 req->r_tid); 4670 list_del_init(&req->r_wait); 4671 __unregister_request(mdsc, req); 4672 } 4673 } 4674 mutex_unlock(&mdsc->mutex); 4675 dout("wait_requests done\n"); 4676 } 4677 4678 /* 4679 * called before mount is ro, and before dentries are torn down. 4680 * (hmm, does this still race with new lookups?) 4681 */ 4682 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 4683 { 4684 dout("pre_umount\n"); 4685 mdsc->stopping = 1; 4686 4687 lock_unlock_sessions(mdsc); 4688 ceph_flush_dirty_caps(mdsc); 4689 wait_requests(mdsc); 4690 4691 /* 4692 * wait for reply handlers to drop their request refs and 4693 * their inode/dcache refs 4694 */ 4695 ceph_msgr_flush(); 4696 4697 ceph_cleanup_quotarealms_inodes(mdsc); 4698 } 4699 4700 /* 4701 * wait for all write mds requests to flush. 4702 */ 4703 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) 4704 { 4705 struct ceph_mds_request *req = NULL, *nextreq; 4706 struct rb_node *n; 4707 4708 mutex_lock(&mdsc->mutex); 4709 dout("wait_unsafe_requests want %lld\n", want_tid); 4710 restart: 4711 req = __get_oldest_req(mdsc); 4712 while (req && req->r_tid <= want_tid) { 4713 /* find next request */ 4714 n = rb_next(&req->r_node); 4715 if (n) 4716 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 4717 else 4718 nextreq = NULL; 4719 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 4720 (req->r_op & CEPH_MDS_OP_WRITE)) { 4721 /* write op */ 4722 ceph_mdsc_get_request(req); 4723 if (nextreq) 4724 ceph_mdsc_get_request(nextreq); 4725 mutex_unlock(&mdsc->mutex); 4726 dout("wait_unsafe_requests wait on %llu (want %llu)\n", 4727 req->r_tid, want_tid); 4728 wait_for_completion(&req->r_safe_completion); 4729 mutex_lock(&mdsc->mutex); 4730 ceph_mdsc_put_request(req); 4731 if (!nextreq) 4732 break; /* next dne before, so we're done! 
*/ 4733 if (RB_EMPTY_NODE(&nextreq->r_node)) { 4734 /* next request was removed from tree */ 4735 ceph_mdsc_put_request(nextreq); 4736 goto restart; 4737 } 4738 ceph_mdsc_put_request(nextreq); /* won't go away */ 4739 } 4740 req = nextreq; 4741 } 4742 mutex_unlock(&mdsc->mutex); 4743 dout("wait_unsafe_requests done\n"); 4744 } 4745 4746 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 4747 { 4748 u64 want_tid, want_flush; 4749 4750 if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) 4751 return; 4752 4753 dout("sync\n"); 4754 mutex_lock(&mdsc->mutex); 4755 want_tid = mdsc->last_tid; 4756 mutex_unlock(&mdsc->mutex); 4757 4758 ceph_flush_dirty_caps(mdsc); 4759 spin_lock(&mdsc->cap_dirty_lock); 4760 want_flush = mdsc->last_cap_flush_tid; 4761 if (!list_empty(&mdsc->cap_flush_list)) { 4762 struct ceph_cap_flush *cf = 4763 list_last_entry(&mdsc->cap_flush_list, 4764 struct ceph_cap_flush, g_list); 4765 cf->wake = true; 4766 } 4767 spin_unlock(&mdsc->cap_dirty_lock); 4768 4769 dout("sync want tid %lld flush_seq %lld\n", 4770 want_tid, want_flush); 4771 4772 wait_unsafe_requests(mdsc, want_tid); 4773 wait_caps_flush(mdsc, want_flush); 4774 } 4775 4776 /* 4777 * true if all sessions are closed, or we force unmount 4778 */ 4779 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 4780 { 4781 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4782 return true; 4783 return atomic_read(&mdsc->num_sessions) <= skipped; 4784 } 4785 4786 /* 4787 * called after sb is ro. 4788 */ 4789 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 4790 { 4791 struct ceph_options *opts = mdsc->fsc->client->options; 4792 struct ceph_mds_session *session; 4793 int i; 4794 int skipped = 0; 4795 4796 dout("close_sessions\n"); 4797 4798 /* close sessions */ 4799 mutex_lock(&mdsc->mutex); 4800 for (i = 0; i < mdsc->max_sessions; i++) { 4801 session = __ceph_lookup_mds_session(mdsc, i); 4802 if (!session) 4803 continue; 4804 mutex_unlock(&mdsc->mutex); 4805 mutex_lock(&session->s_mutex); 4806 if (__close_session(mdsc, session) <= 0) 4807 skipped++; 4808 mutex_unlock(&session->s_mutex); 4809 ceph_put_mds_session(session); 4810 mutex_lock(&mdsc->mutex); 4811 } 4812 mutex_unlock(&mdsc->mutex); 4813 4814 dout("waiting for sessions to close\n"); 4815 wait_event_timeout(mdsc->session_close_wq, 4816 done_closing_sessions(mdsc, skipped), 4817 ceph_timeout_jiffies(opts->mount_timeout)); 4818 4819 /* tear down remaining sessions */ 4820 mutex_lock(&mdsc->mutex); 4821 for (i = 0; i < mdsc->max_sessions; i++) { 4822 if (mdsc->sessions[i]) { 4823 session = ceph_get_mds_session(mdsc->sessions[i]); 4824 __unregister_session(mdsc, session); 4825 mutex_unlock(&mdsc->mutex); 4826 mutex_lock(&session->s_mutex); 4827 remove_session_caps(session); 4828 mutex_unlock(&session->s_mutex); 4829 ceph_put_mds_session(session); 4830 mutex_lock(&mdsc->mutex); 4831 } 4832 } 4833 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 4834 mutex_unlock(&mdsc->mutex); 4835 4836 ceph_cleanup_snapid_map(mdsc); 4837 ceph_cleanup_empty_realms(mdsc); 4838 4839 cancel_work_sync(&mdsc->cap_reclaim_work); 4840 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 4841 4842 dout("stopped\n"); 4843 } 4844 4845 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 4846 { 4847 struct ceph_mds_session *session; 4848 int mds; 4849 4850 dout("force umount\n"); 4851 4852 mutex_lock(&mdsc->mutex); 4853 for (mds = 0; mds < mdsc->max_sessions; mds++) { 4854 session = __ceph_lookup_mds_session(mdsc, mds); 4855 if 
(!session) 4856 continue; 4857 4858 if (session->s_state == CEPH_MDS_SESSION_REJECTED) 4859 __unregister_session(mdsc, session); 4860 __wake_requests(mdsc, &session->s_waiting); 4861 mutex_unlock(&mdsc->mutex); 4862 4863 mutex_lock(&session->s_mutex); 4864 __close_session(mdsc, session); 4865 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 4866 cleanup_session_requests(mdsc, session); 4867 remove_session_caps(session); 4868 } 4869 mutex_unlock(&session->s_mutex); 4870 ceph_put_mds_session(session); 4871 4872 mutex_lock(&mdsc->mutex); 4873 kick_requests(mdsc, mds); 4874 } 4875 __wake_requests(mdsc, &mdsc->waiting_for_map); 4876 mutex_unlock(&mdsc->mutex); 4877 } 4878 4879 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 4880 { 4881 dout("stop\n"); 4882 /* 4883 * Make sure the delayed work has stopped before releasing 4884 * the resources. 4885 * 4886 * cancel_delayed_work_sync() would only guarantee that the 4887 * currently running work finishes executing; the delayed work 4888 * can re-arm itself after that, so flush it instead. 4889 */ 4890 flush_delayed_work(&mdsc->delayed_work); 4891 4892 if (mdsc->mdsmap) 4893 ceph_mdsmap_destroy(mdsc->mdsmap); 4894 kfree(mdsc->sessions); 4895 ceph_caps_finalize(mdsc); 4896 ceph_pool_perm_destroy(mdsc); 4897 } 4898 4899 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 4900 { 4901 struct ceph_mds_client *mdsc = fsc->mdsc; 4902 dout("mdsc_destroy %p\n", mdsc); 4903 4904 if (!mdsc) 4905 return; 4906 4907 /* flush out any connection work with references to us */ 4908 ceph_msgr_flush(); 4909 4910 ceph_mdsc_stop(mdsc); 4911 4912 ceph_metric_destroy(&mdsc->metric); 4913 4914 flush_delayed_work(&mdsc->metric.delayed_work); 4915 fsc->mdsc = NULL; 4916 kfree(mdsc); 4917 dout("mdsc_destroy %p done\n", mdsc); 4918 } 4919 4920 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4921 { 4922 struct ceph_fs_client *fsc = mdsc->fsc; 4923 const char *mds_namespace = fsc->mount_options->mds_namespace; 4924 void *p = msg->front.iov_base; 4925 void *end = p + msg->front.iov_len; 4926 u32 epoch; 4927 u32 num_fs; 4928 u32 mount_fscid = (u32)-1; 4929 int err = -EINVAL; 4930 4931 ceph_decode_need(&p, end, sizeof(u32), bad); 4932 epoch = ceph_decode_32(&p); 4933 4934 dout("handle_fsmap epoch %u\n", epoch); 4935 4936 /* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */ 4937 ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad); 4938 4939 ceph_decode_32_safe(&p, end, num_fs, bad); 4940 while (num_fs-- > 0) { 4941 void *info_p, *info_end; 4942 u32 info_len; 4943 u32 fscid, namelen; 4944 4945 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 4946 p += 2; // info_v, info_cv 4947 info_len = ceph_decode_32(&p); 4948 ceph_decode_need(&p, end, info_len, bad); 4949 info_p = p; 4950 info_end = p + info_len; 4951 p = info_end; 4952 4953 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); 4954 fscid = ceph_decode_32(&info_p); 4955 namelen = ceph_decode_32(&info_p); 4956 ceph_decode_need(&info_p, info_end, namelen, bad); 4957 4958 if (mds_namespace && 4959 strlen(mds_namespace) == namelen && 4960 !strncmp(mds_namespace, (char *)info_p, namelen)) { 4961 mount_fscid = fscid; 4962 break; 4963 } 4964 } 4965 4966 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); 4967 if (mount_fscid != (u32)-1) { 4968 fsc->client->monc.fs_cluster_id = mount_fscid; 4969 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 4970 0, true); 4971 ceph_monc_renew_subs(&fsc->client->monc); 4972 } else { 4973 err = -ENOENT; 4974 goto err_out; 4975 } 4976 return; 4977
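/*
 * Illustrative sketch (editor's addition, not part of the original source):
 * the FSMap fragment decoded above is assumed to be laid out roughly as
 *
 *   le32 epoch
 *   u8 struct_v, u8 struct_cv, le32 map_len, le32 epoch,
 *   le32 legacy_client_fscid                              (all skipped)
 *   le32 num_fs
 *   num_fs x { u8 info_v, u8 info_cv, le32 info_len,
 *              info: le32 fscid, le32 namelen, name bytes, ... }
 *
 * Only fscid and the file system name are consumed here, in order to map
 * the mds_namespace mount option onto the fscid that should be mounted.
 */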
4978 bad: 4979 pr_err("error decoding fsmap\n"); 4980 err_out: 4981 mutex_lock(&mdsc->mutex); 4982 mdsc->mdsmap_err = err; 4983 __wake_requests(mdsc, &mdsc->waiting_for_map); 4984 mutex_unlock(&mdsc->mutex); 4985 } 4986 4987 /* 4988 * handle mds map update. 4989 */ 4990 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4991 { 4992 u32 epoch; 4993 u32 maplen; 4994 void *p = msg->front.iov_base; 4995 void *end = p + msg->front.iov_len; 4996 struct ceph_mdsmap *newmap, *oldmap; 4997 struct ceph_fsid fsid; 4998 int err = -EINVAL; 4999 5000 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 5001 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 5002 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 5003 return; 5004 epoch = ceph_decode_32(&p); 5005 maplen = ceph_decode_32(&p); 5006 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 5007 5008 /* do we need it? */ 5009 mutex_lock(&mdsc->mutex); 5010 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 5011 dout("handle_map epoch %u <= our %u\n", 5012 epoch, mdsc->mdsmap->m_epoch); 5013 mutex_unlock(&mdsc->mutex); 5014 return; 5015 } 5016 5017 newmap = ceph_mdsmap_decode(&p, end, ceph_msgr2(mdsc->fsc->client)); 5018 if (IS_ERR(newmap)) { 5019 err = PTR_ERR(newmap); 5020 goto bad_unlock; 5021 } 5022 5023 /* swap into place */ 5024 if (mdsc->mdsmap) { 5025 oldmap = mdsc->mdsmap; 5026 mdsc->mdsmap = newmap; 5027 check_new_map(mdsc, newmap, oldmap); 5028 ceph_mdsmap_destroy(oldmap); 5029 } else { 5030 mdsc->mdsmap = newmap; /* first mds map */ 5031 } 5032 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size, 5033 MAX_LFS_FILESIZE); 5034 5035 __wake_requests(mdsc, &mdsc->waiting_for_map); 5036 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP, 5037 mdsc->mdsmap->m_epoch); 5038 5039 mutex_unlock(&mdsc->mutex); 5040 schedule_delayed(mdsc, 0); 5041 return; 5042 5043 bad_unlock: 5044 mutex_unlock(&mdsc->mutex); 5045 bad: 5046 pr_err("error decoding mdsmap %d\n", err); 5047 return; 5048 } 5049 5050 static struct ceph_connection *mds_get_con(struct ceph_connection *con) 5051 { 5052 struct ceph_mds_session *s = con->private; 5053 5054 if (ceph_get_mds_session(s)) 5055 return con; 5056 return NULL; 5057 } 5058 5059 static void mds_put_con(struct ceph_connection *con) 5060 { 5061 struct ceph_mds_session *s = con->private; 5062 5063 ceph_put_mds_session(s); 5064 } 5065 5066 /* 5067 * if the client is unresponsive for long enough, the mds will kill 5068 * the session entirely. 
5069 */ 5070 static void mds_peer_reset(struct ceph_connection *con) 5071 { 5072 struct ceph_mds_session *s = con->private; 5073 struct ceph_mds_client *mdsc = s->s_mdsc; 5074 5075 pr_warn("mds%d closed our session\n", s->s_mds); 5076 send_mds_reconnect(mdsc, s); 5077 } 5078 5079 static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg) 5080 { 5081 struct ceph_mds_session *s = con->private; 5082 struct ceph_mds_client *mdsc = s->s_mdsc; 5083 int type = le16_to_cpu(msg->hdr.type); 5084 5085 mutex_lock(&mdsc->mutex); 5086 if (__verify_registered_session(mdsc, s) < 0) { 5087 mutex_unlock(&mdsc->mutex); 5088 goto out; 5089 } 5090 mutex_unlock(&mdsc->mutex); 5091 5092 switch (type) { 5093 case CEPH_MSG_MDS_MAP: 5094 ceph_mdsc_handle_mdsmap(mdsc, msg); 5095 break; 5096 case CEPH_MSG_FS_MAP_USER: 5097 ceph_mdsc_handle_fsmap(mdsc, msg); 5098 break; 5099 case CEPH_MSG_CLIENT_SESSION: 5100 handle_session(s, msg); 5101 break; 5102 case CEPH_MSG_CLIENT_REPLY: 5103 handle_reply(s, msg); 5104 break; 5105 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 5106 handle_forward(mdsc, s, msg); 5107 break; 5108 case CEPH_MSG_CLIENT_CAPS: 5109 ceph_handle_caps(s, msg); 5110 break; 5111 case CEPH_MSG_CLIENT_SNAP: 5112 ceph_handle_snap(mdsc, s, msg); 5113 break; 5114 case CEPH_MSG_CLIENT_LEASE: 5115 handle_lease(mdsc, s, msg); 5116 break; 5117 case CEPH_MSG_CLIENT_QUOTA: 5118 ceph_handle_quota(mdsc, s, msg); 5119 break; 5120 5121 default: 5122 pr_err("received unknown message type %d %s\n", type, 5123 ceph_msg_type_name(type)); 5124 } 5125 out: 5126 ceph_msg_put(msg); 5127 } 5128 5129 /* 5130 * authentication 5131 */ 5132 5133 /* 5134 * Note: returned pointer is the address of a structure that's 5135 * managed separately. Caller must *not* attempt to free it. 5136 */ 5137 static struct ceph_auth_handshake * 5138 mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new) 5139 { 5140 struct ceph_mds_session *s = con->private; 5141 struct ceph_mds_client *mdsc = s->s_mdsc; 5142 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5143 struct ceph_auth_handshake *auth = &s->s_auth; 5144 int ret; 5145 5146 ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 5147 force_new, proto, NULL, NULL); 5148 if (ret) 5149 return ERR_PTR(ret); 5150 5151 return auth; 5152 } 5153 5154 static int mds_add_authorizer_challenge(struct ceph_connection *con, 5155 void *challenge_buf, int challenge_buf_len) 5156 { 5157 struct ceph_mds_session *s = con->private; 5158 struct ceph_mds_client *mdsc = s->s_mdsc; 5159 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5160 5161 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer, 5162 challenge_buf, challenge_buf_len); 5163 } 5164 5165 static int mds_verify_authorizer_reply(struct ceph_connection *con) 5166 { 5167 struct ceph_mds_session *s = con->private; 5168 struct ceph_mds_client *mdsc = s->s_mdsc; 5169 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5170 struct ceph_auth_handshake *auth = &s->s_auth; 5171 5172 return ceph_auth_verify_authorizer_reply(ac, auth->authorizer, 5173 auth->authorizer_reply_buf, auth->authorizer_reply_buf_len, 5174 NULL, NULL, NULL, NULL); 5175 } 5176 5177 static int mds_invalidate_authorizer(struct ceph_connection *con) 5178 { 5179 struct ceph_mds_session *s = con->private; 5180 struct ceph_mds_client *mdsc = s->s_mdsc; 5181 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5182 5183 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 5184 5185 return 
ceph_monc_validate_auth(&mdsc->fsc->client->monc); 5186 } 5187 5188 static int mds_get_auth_request(struct ceph_connection *con, 5189 void *buf, int *buf_len, 5190 void **authorizer, int *authorizer_len) 5191 { 5192 struct ceph_mds_session *s = con->private; 5193 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5194 struct ceph_auth_handshake *auth = &s->s_auth; 5195 int ret; 5196 5197 ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 5198 buf, buf_len); 5199 if (ret) 5200 return ret; 5201 5202 *authorizer = auth->authorizer_buf; 5203 *authorizer_len = auth->authorizer_buf_len; 5204 return 0; 5205 } 5206 5207 static int mds_handle_auth_reply_more(struct ceph_connection *con, 5208 void *reply, int reply_len, 5209 void *buf, int *buf_len, 5210 void **authorizer, int *authorizer_len) 5211 { 5212 struct ceph_mds_session *s = con->private; 5213 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5214 struct ceph_auth_handshake *auth = &s->s_auth; 5215 int ret; 5216 5217 ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len, 5218 buf, buf_len); 5219 if (ret) 5220 return ret; 5221 5222 *authorizer = auth->authorizer_buf; 5223 *authorizer_len = auth->authorizer_buf_len; 5224 return 0; 5225 } 5226 5227 static int mds_handle_auth_done(struct ceph_connection *con, 5228 u64 global_id, void *reply, int reply_len, 5229 u8 *session_key, int *session_key_len, 5230 u8 *con_secret, int *con_secret_len) 5231 { 5232 struct ceph_mds_session *s = con->private; 5233 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5234 struct ceph_auth_handshake *auth = &s->s_auth; 5235 5236 return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len, 5237 session_key, session_key_len, 5238 con_secret, con_secret_len); 5239 } 5240 5241 static int mds_handle_auth_bad_method(struct ceph_connection *con, 5242 int used_proto, int result, 5243 const int *allowed_protos, int proto_cnt, 5244 const int *allowed_modes, int mode_cnt) 5245 { 5246 struct ceph_mds_session *s = con->private; 5247 struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc; 5248 int ret; 5249 5250 if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS, 5251 used_proto, result, 5252 allowed_protos, proto_cnt, 5253 allowed_modes, mode_cnt)) { 5254 ret = ceph_monc_validate_auth(monc); 5255 if (ret) 5256 return ret; 5257 } 5258 5259 return -EACCES; 5260 } 5261 5262 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, 5263 struct ceph_msg_header *hdr, int *skip) 5264 { 5265 struct ceph_msg *msg; 5266 int type = (int) le16_to_cpu(hdr->type); 5267 int front_len = (int) le32_to_cpu(hdr->front_len); 5268 5269 if (con->in_msg) 5270 return con->in_msg; 5271 5272 *skip = 0; 5273 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 5274 if (!msg) { 5275 pr_err("unable to allocate msg type %d len %d\n", 5276 type, front_len); 5277 return NULL; 5278 } 5279 5280 return msg; 5281 } 5282 5283 static int mds_sign_message(struct ceph_msg *msg) 5284 { 5285 struct ceph_mds_session *s = msg->con->private; 5286 struct ceph_auth_handshake *auth = &s->s_auth; 5287 5288 return ceph_auth_sign_message(auth, msg); 5289 } 5290 5291 static int mds_check_message_signature(struct ceph_msg *msg) 5292 { 5293 struct ceph_mds_session *s = msg->con->private; 5294 struct ceph_auth_handshake *auth = &s->s_auth; 5295 5296 return ceph_auth_check_message_signature(auth, msg); 5297 } 5298 5299 static const struct ceph_connection_operations mds_con_ops = { 5300 .get = mds_get_con, 5301 .put = 
mds_put_con, 5302 .alloc_msg = mds_alloc_msg, 5303 .dispatch = mds_dispatch, 5304 .peer_reset = mds_peer_reset, 5305 .get_authorizer = mds_get_authorizer, 5306 .add_authorizer_challenge = mds_add_authorizer_challenge, 5307 .verify_authorizer_reply = mds_verify_authorizer_reply, 5308 .invalidate_authorizer = mds_invalidate_authorizer, 5309 .sign_message = mds_sign_message, 5310 .check_message_signature = mds_check_message_signature, 5311 .get_auth_request = mds_get_auth_request, 5312 .handle_auth_reply_more = mds_handle_auth_reply_more, 5313 .handle_auth_done = mds_handle_auth_done, 5314 .handle_auth_bad_method = mds_handle_auth_bad_method, 5315 }; 5316 5317 /* eof */ 5318
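/*
 * Editor's note (illustrative, not part of the original source): the
 * mds_con_ops table above is how the messenger calls back into the MDS
 * client. The per-session connection is presumably initialized with it
 * elsewhere in this file, roughly along the lines of
 *
 *   ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
 *
 * which is why con->private in each callback above resolves to the owning
 * struct ceph_mds_session.
 */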