// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage. Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid. If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

struct ceph_reconnect_state {
	struct ceph_mds_session *session;
	int nr_caps, nr_realms;
	struct ceph_pagelist *pagelist;
	unsigned msg_version;
	bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

static int parse_reply_info_quota(void **p, void *end,
				  struct ceph_mds_reply_info_in *info)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);
	/* struct_v is expected to be >= 1. we only
	 * understand encoding with struct_compat == 1. */
	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	end = *p + struct_len;
	ceph_decode_64_safe(p, end, info->max_bytes, bad);
	ceph_decode_64_safe(p, end, info->max_files, bad);
	*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = 0;
	u8 struct_v = 0;

	if (features == (u64)-1) {
		u32 struct_len;
		u8 struct_compat;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1.
*/ 112 if (!struct_v || struct_compat != 1) 113 goto bad; 114 ceph_decode_32_safe(p, end, struct_len, bad); 115 ceph_decode_need(p, end, struct_len, bad); 116 end = *p + struct_len; 117 } 118 119 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad); 120 info->in = *p; 121 *p += sizeof(struct ceph_mds_reply_inode) + 122 sizeof(*info->in->fragtree.splits) * 123 le32_to_cpu(info->in->fragtree.nsplits); 124 125 ceph_decode_32_safe(p, end, info->symlink_len, bad); 126 ceph_decode_need(p, end, info->symlink_len, bad); 127 info->symlink = *p; 128 *p += info->symlink_len; 129 130 ceph_decode_copy_safe(p, end, &info->dir_layout, 131 sizeof(info->dir_layout), bad); 132 ceph_decode_32_safe(p, end, info->xattr_len, bad); 133 ceph_decode_need(p, end, info->xattr_len, bad); 134 info->xattr_data = *p; 135 *p += info->xattr_len; 136 137 if (features == (u64)-1) { 138 /* inline data */ 139 ceph_decode_64_safe(p, end, info->inline_version, bad); 140 ceph_decode_32_safe(p, end, info->inline_len, bad); 141 ceph_decode_need(p, end, info->inline_len, bad); 142 info->inline_data = *p; 143 *p += info->inline_len; 144 /* quota */ 145 err = parse_reply_info_quota(p, end, info); 146 if (err < 0) 147 goto out_bad; 148 /* pool namespace */ 149 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 150 if (info->pool_ns_len > 0) { 151 ceph_decode_need(p, end, info->pool_ns_len, bad); 152 info->pool_ns_data = *p; 153 *p += info->pool_ns_len; 154 } 155 156 /* btime */ 157 ceph_decode_need(p, end, sizeof(info->btime), bad); 158 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 159 160 /* change attribute */ 161 ceph_decode_64_safe(p, end, info->change_attr, bad); 162 163 /* dir pin */ 164 if (struct_v >= 2) { 165 ceph_decode_32_safe(p, end, info->dir_pin, bad); 166 } else { 167 info->dir_pin = -ENODATA; 168 } 169 170 /* snapshot birth time, remains zero for v<=2 */ 171 if (struct_v >= 3) { 172 ceph_decode_need(p, end, sizeof(info->snap_btime), bad); 173 ceph_decode_copy(p, &info->snap_btime, 174 sizeof(info->snap_btime)); 175 } else { 176 memset(&info->snap_btime, 0, sizeof(info->snap_btime)); 177 } 178 179 /* snapshot count, remains zero for v<=3 */ 180 if (struct_v >= 4) { 181 ceph_decode_64_safe(p, end, info->rsnaps, bad); 182 } else { 183 info->rsnaps = 0; 184 } 185 186 *p = end; 187 } else { 188 if (features & CEPH_FEATURE_MDS_INLINE_DATA) { 189 ceph_decode_64_safe(p, end, info->inline_version, bad); 190 ceph_decode_32_safe(p, end, info->inline_len, bad); 191 ceph_decode_need(p, end, info->inline_len, bad); 192 info->inline_data = *p; 193 *p += info->inline_len; 194 } else 195 info->inline_version = CEPH_INLINE_NONE; 196 197 if (features & CEPH_FEATURE_MDS_QUOTA) { 198 err = parse_reply_info_quota(p, end, info); 199 if (err < 0) 200 goto out_bad; 201 } else { 202 info->max_bytes = 0; 203 info->max_files = 0; 204 } 205 206 info->pool_ns_len = 0; 207 info->pool_ns_data = NULL; 208 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { 209 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 210 if (info->pool_ns_len > 0) { 211 ceph_decode_need(p, end, info->pool_ns_len, bad); 212 info->pool_ns_data = *p; 213 *p += info->pool_ns_len; 214 } 215 } 216 217 if (features & CEPH_FEATURE_FS_BTIME) { 218 ceph_decode_need(p, end, sizeof(info->btime), bad); 219 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 220 ceph_decode_64_safe(p, end, info->change_attr, bad); 221 } 222 223 info->dir_pin = -ENODATA; 224 /* info->snap_btime and info->rsnaps remain zero */ 225 } 226 return 0; 227 bad: 228 err = -EIO; 229 
out_bad: 230 return err; 231 } 232 233 static int parse_reply_info_dir(void **p, void *end, 234 struct ceph_mds_reply_dirfrag **dirfrag, 235 u64 features) 236 { 237 if (features == (u64)-1) { 238 u8 struct_v, struct_compat; 239 u32 struct_len; 240 ceph_decode_8_safe(p, end, struct_v, bad); 241 ceph_decode_8_safe(p, end, struct_compat, bad); 242 /* struct_v is expected to be >= 1. we only understand 243 * encoding whose struct_compat == 1. */ 244 if (!struct_v || struct_compat != 1) 245 goto bad; 246 ceph_decode_32_safe(p, end, struct_len, bad); 247 ceph_decode_need(p, end, struct_len, bad); 248 end = *p + struct_len; 249 } 250 251 ceph_decode_need(p, end, sizeof(**dirfrag), bad); 252 *dirfrag = *p; 253 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist); 254 if (unlikely(*p > end)) 255 goto bad; 256 if (features == (u64)-1) 257 *p = end; 258 return 0; 259 bad: 260 return -EIO; 261 } 262 263 static int parse_reply_info_lease(void **p, void *end, 264 struct ceph_mds_reply_lease **lease, 265 u64 features) 266 { 267 if (features == (u64)-1) { 268 u8 struct_v, struct_compat; 269 u32 struct_len; 270 ceph_decode_8_safe(p, end, struct_v, bad); 271 ceph_decode_8_safe(p, end, struct_compat, bad); 272 /* struct_v is expected to be >= 1. we only understand 273 * encoding whose struct_compat == 1. */ 274 if (!struct_v || struct_compat != 1) 275 goto bad; 276 ceph_decode_32_safe(p, end, struct_len, bad); 277 ceph_decode_need(p, end, struct_len, bad); 278 end = *p + struct_len; 279 } 280 281 ceph_decode_need(p, end, sizeof(**lease), bad); 282 *lease = *p; 283 *p += sizeof(**lease); 284 if (features == (u64)-1) 285 *p = end; 286 return 0; 287 bad: 288 return -EIO; 289 } 290 291 /* 292 * parse a normal reply, which may contain a (dir+)dentry and/or a 293 * target inode. 
294 */ 295 static int parse_reply_info_trace(void **p, void *end, 296 struct ceph_mds_reply_info_parsed *info, 297 u64 features) 298 { 299 int err; 300 301 if (info->head->is_dentry) { 302 err = parse_reply_info_in(p, end, &info->diri, features); 303 if (err < 0) 304 goto out_bad; 305 306 err = parse_reply_info_dir(p, end, &info->dirfrag, features); 307 if (err < 0) 308 goto out_bad; 309 310 ceph_decode_32_safe(p, end, info->dname_len, bad); 311 ceph_decode_need(p, end, info->dname_len, bad); 312 info->dname = *p; 313 *p += info->dname_len; 314 315 err = parse_reply_info_lease(p, end, &info->dlease, features); 316 if (err < 0) 317 goto out_bad; 318 } 319 320 if (info->head->is_target) { 321 err = parse_reply_info_in(p, end, &info->targeti, features); 322 if (err < 0) 323 goto out_bad; 324 } 325 326 if (unlikely(*p != end)) 327 goto bad; 328 return 0; 329 330 bad: 331 err = -EIO; 332 out_bad: 333 pr_err("problem parsing mds trace %d\n", err); 334 return err; 335 } 336 337 /* 338 * parse readdir results 339 */ 340 static int parse_reply_info_readdir(void **p, void *end, 341 struct ceph_mds_reply_info_parsed *info, 342 u64 features) 343 { 344 u32 num, i = 0; 345 int err; 346 347 err = parse_reply_info_dir(p, end, &info->dir_dir, features); 348 if (err < 0) 349 goto out_bad; 350 351 ceph_decode_need(p, end, sizeof(num) + 2, bad); 352 num = ceph_decode_32(p); 353 { 354 u16 flags = ceph_decode_16(p); 355 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); 356 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); 357 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); 358 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH); 359 } 360 if (num == 0) 361 goto done; 362 363 BUG_ON(!info->dir_entries); 364 if ((unsigned long)(info->dir_entries + num) > 365 (unsigned long)info->dir_entries + info->dir_buf_size) { 366 pr_err("dir contents are larger than expected\n"); 367 WARN_ON(1); 368 goto bad; 369 } 370 371 info->dir_nr = num; 372 while (num) { 373 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 374 /* dentry */ 375 ceph_decode_32_safe(p, end, rde->name_len, bad); 376 ceph_decode_need(p, end, rde->name_len, bad); 377 rde->name = *p; 378 *p += rde->name_len; 379 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name); 380 381 /* dentry lease */ 382 err = parse_reply_info_lease(p, end, &rde->lease, features); 383 if (err) 384 goto out_bad; 385 /* inode */ 386 err = parse_reply_info_in(p, end, &rde->inode, features); 387 if (err < 0) 388 goto out_bad; 389 /* ceph_readdir_prepopulate() will update it */ 390 rde->offset = 0; 391 i++; 392 num--; 393 } 394 395 done: 396 /* Skip over any unrecognized fields */ 397 *p = end; 398 return 0; 399 400 bad: 401 err = -EIO; 402 out_bad: 403 pr_err("problem parsing dir contents %d\n", err); 404 return err; 405 } 406 407 /* 408 * parse fcntl F_GETLK results 409 */ 410 static int parse_reply_info_filelock(void **p, void *end, 411 struct ceph_mds_reply_info_parsed *info, 412 u64 features) 413 { 414 if (*p + sizeof(*info->filelock_reply) > end) 415 goto bad; 416 417 info->filelock_reply = *p; 418 419 /* Skip over any unrecognized fields */ 420 *p = end; 421 return 0; 422 bad: 423 return -EIO; 424 } 425 426 427 #if BITS_PER_LONG == 64 428 429 #define DELEGATED_INO_AVAILABLE xa_mk_value(1) 430 431 static int ceph_parse_deleg_inos(void **p, void *end, 432 struct ceph_mds_session *s) 433 { 434 u32 sets; 435 436 ceph_decode_32_safe(p, end, sets, bad); 437 dout("got %u sets of delegated inodes\n", sets); 438 while (sets--) { 439 u64 start, 
len, ino; 440 441 ceph_decode_64_safe(p, end, start, bad); 442 ceph_decode_64_safe(p, end, len, bad); 443 444 /* Don't accept a delegation of system inodes */ 445 if (start < CEPH_INO_SYSTEM_BASE) { 446 pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n", 447 start, len); 448 continue; 449 } 450 while (len--) { 451 int err = xa_insert(&s->s_delegated_inos, ino = start++, 452 DELEGATED_INO_AVAILABLE, 453 GFP_KERNEL); 454 if (!err) { 455 dout("added delegated inode 0x%llx\n", 456 start - 1); 457 } else if (err == -EBUSY) { 458 pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n", 459 start - 1); 460 } else { 461 return err; 462 } 463 } 464 } 465 return 0; 466 bad: 467 return -EIO; 468 } 469 470 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 471 { 472 unsigned long ino; 473 void *val; 474 475 xa_for_each(&s->s_delegated_inos, ino, val) { 476 val = xa_erase(&s->s_delegated_inos, ino); 477 if (val == DELEGATED_INO_AVAILABLE) 478 return ino; 479 } 480 return 0; 481 } 482 483 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 484 { 485 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE, 486 GFP_KERNEL); 487 } 488 #else /* BITS_PER_LONG == 64 */ 489 /* 490 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just 491 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top 492 * and bottom words? 493 */ 494 static int ceph_parse_deleg_inos(void **p, void *end, 495 struct ceph_mds_session *s) 496 { 497 u32 sets; 498 499 ceph_decode_32_safe(p, end, sets, bad); 500 if (sets) 501 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad); 502 return 0; 503 bad: 504 return -EIO; 505 } 506 507 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 508 { 509 return 0; 510 } 511 512 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 513 { 514 return 0; 515 } 516 #endif /* BITS_PER_LONG == 64 */ 517 518 /* 519 * parse create results 520 */ 521 static int parse_reply_info_create(void **p, void *end, 522 struct ceph_mds_reply_info_parsed *info, 523 u64 features, struct ceph_mds_session *s) 524 { 525 int ret; 526 527 if (features == (u64)-1 || 528 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) { 529 if (*p == end) { 530 /* Malformed reply? 
*/ 531 info->has_create_ino = false; 532 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) { 533 info->has_create_ino = true; 534 /* struct_v, struct_compat, and len */ 535 ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad); 536 ceph_decode_64_safe(p, end, info->ino, bad); 537 ret = ceph_parse_deleg_inos(p, end, s); 538 if (ret) 539 return ret; 540 } else { 541 /* legacy */ 542 ceph_decode_64_safe(p, end, info->ino, bad); 543 info->has_create_ino = true; 544 } 545 } else { 546 if (*p != end) 547 goto bad; 548 } 549 550 /* Skip over any unrecognized fields */ 551 *p = end; 552 return 0; 553 bad: 554 return -EIO; 555 } 556 557 /* 558 * parse extra results 559 */ 560 static int parse_reply_info_extra(void **p, void *end, 561 struct ceph_mds_reply_info_parsed *info, 562 u64 features, struct ceph_mds_session *s) 563 { 564 u32 op = le32_to_cpu(info->head->op); 565 566 if (op == CEPH_MDS_OP_GETFILELOCK) 567 return parse_reply_info_filelock(p, end, info, features); 568 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) 569 return parse_reply_info_readdir(p, end, info, features); 570 else if (op == CEPH_MDS_OP_CREATE) 571 return parse_reply_info_create(p, end, info, features, s); 572 else 573 return -EIO; 574 } 575 576 /* 577 * parse entire mds reply 578 */ 579 static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, 580 struct ceph_mds_reply_info_parsed *info, 581 u64 features) 582 { 583 void *p, *end; 584 u32 len; 585 int err; 586 587 info->head = msg->front.iov_base; 588 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 589 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 590 591 /* trace */ 592 ceph_decode_32_safe(&p, end, len, bad); 593 if (len > 0) { 594 ceph_decode_need(&p, end, len, bad); 595 err = parse_reply_info_trace(&p, p+len, info, features); 596 if (err < 0) 597 goto out_bad; 598 } 599 600 /* extra */ 601 ceph_decode_32_safe(&p, end, len, bad); 602 if (len > 0) { 603 ceph_decode_need(&p, end, len, bad); 604 err = parse_reply_info_extra(&p, p+len, info, features, s); 605 if (err < 0) 606 goto out_bad; 607 } 608 609 /* snap blob */ 610 ceph_decode_32_safe(&p, end, len, bad); 611 info->snapblob_len = len; 612 info->snapblob = p; 613 p += len; 614 615 if (p != end) 616 goto bad; 617 return 0; 618 619 bad: 620 err = -EIO; 621 out_bad: 622 pr_err("mds parse_reply err %d\n", err); 623 return err; 624 } 625 626 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 627 { 628 if (!info->dir_entries) 629 return; 630 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size)); 631 } 632 633 634 /* 635 * sessions 636 */ 637 const char *ceph_session_state_name(int s) 638 { 639 switch (s) { 640 case CEPH_MDS_SESSION_NEW: return "new"; 641 case CEPH_MDS_SESSION_OPENING: return "opening"; 642 case CEPH_MDS_SESSION_OPEN: return "open"; 643 case CEPH_MDS_SESSION_HUNG: return "hung"; 644 case CEPH_MDS_SESSION_CLOSING: return "closing"; 645 case CEPH_MDS_SESSION_CLOSED: return "closed"; 646 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 647 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 648 case CEPH_MDS_SESSION_REJECTED: return "rejected"; 649 default: return "???"; 650 } 651 } 652 653 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s) 654 { 655 if (refcount_inc_not_zero(&s->s_ref)) { 656 dout("mdsc get_session %p %d -> %d\n", s, 657 refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref)); 658 return s; 659 } else { 660 dout("mdsc get_session %p 0 -- 
FAIL\n", s); 661 return NULL; 662 } 663 } 664 665 void ceph_put_mds_session(struct ceph_mds_session *s) 666 { 667 if (IS_ERR_OR_NULL(s)) 668 return; 669 670 dout("mdsc put_session %p %d -> %d\n", s, 671 refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1); 672 if (refcount_dec_and_test(&s->s_ref)) { 673 if (s->s_auth.authorizer) 674 ceph_auth_destroy_authorizer(s->s_auth.authorizer); 675 WARN_ON(mutex_is_locked(&s->s_mutex)); 676 xa_destroy(&s->s_delegated_inos); 677 kfree(s); 678 } 679 } 680 681 /* 682 * called under mdsc->mutex 683 */ 684 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 685 int mds) 686 { 687 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 688 return NULL; 689 return ceph_get_mds_session(mdsc->sessions[mds]); 690 } 691 692 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 693 { 694 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 695 return false; 696 else 697 return true; 698 } 699 700 static int __verify_registered_session(struct ceph_mds_client *mdsc, 701 struct ceph_mds_session *s) 702 { 703 if (s->s_mds >= mdsc->max_sessions || 704 mdsc->sessions[s->s_mds] != s) 705 return -ENOENT; 706 return 0; 707 } 708 709 /* 710 * create+register a new session for given mds. 711 * called under mdsc->mutex. 712 */ 713 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 714 int mds) 715 { 716 struct ceph_mds_session *s; 717 718 if (mds >= mdsc->mdsmap->possible_max_rank) 719 return ERR_PTR(-EINVAL); 720 721 s = kzalloc(sizeof(*s), GFP_NOFS); 722 if (!s) 723 return ERR_PTR(-ENOMEM); 724 725 if (mds >= mdsc->max_sessions) { 726 int newmax = 1 << get_count_order(mds + 1); 727 struct ceph_mds_session **sa; 728 729 dout("%s: realloc to %d\n", __func__, newmax); 730 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); 731 if (!sa) 732 goto fail_realloc; 733 if (mdsc->sessions) { 734 memcpy(sa, mdsc->sessions, 735 mdsc->max_sessions * sizeof(void *)); 736 kfree(mdsc->sessions); 737 } 738 mdsc->sessions = sa; 739 mdsc->max_sessions = newmax; 740 } 741 742 dout("%s: mds%d\n", __func__, mds); 743 s->s_mdsc = mdsc; 744 s->s_mds = mds; 745 s->s_state = CEPH_MDS_SESSION_NEW; 746 s->s_ttl = 0; 747 s->s_seq = 0; 748 mutex_init(&s->s_mutex); 749 750 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); 751 752 atomic_set(&s->s_cap_gen, 1); 753 s->s_cap_ttl = jiffies - 1; 754 755 spin_lock_init(&s->s_cap_lock); 756 s->s_renew_requested = 0; 757 s->s_renew_seq = 0; 758 INIT_LIST_HEAD(&s->s_caps); 759 s->s_nr_caps = 0; 760 refcount_set(&s->s_ref, 1); 761 INIT_LIST_HEAD(&s->s_waiting); 762 INIT_LIST_HEAD(&s->s_unsafe); 763 xa_init(&s->s_delegated_inos); 764 s->s_num_cap_releases = 0; 765 s->s_cap_reconnect = 0; 766 s->s_cap_iterator = NULL; 767 INIT_LIST_HEAD(&s->s_cap_releases); 768 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work); 769 770 INIT_LIST_HEAD(&s->s_cap_dirty); 771 INIT_LIST_HEAD(&s->s_cap_flushing); 772 773 mdsc->sessions[mds] = s; 774 atomic_inc(&mdsc->num_sessions); 775 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 776 777 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 778 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 779 780 return s; 781 782 fail_realloc: 783 kfree(s); 784 return ERR_PTR(-ENOMEM); 785 } 786 787 /* 788 * called under mdsc->mutex 789 */ 790 static void __unregister_session(struct ceph_mds_client *mdsc, 791 struct ceph_mds_session *s) 792 { 793 dout("__unregister_session mds%d %p\n", s->s_mds, s); 794 BUG_ON(mdsc->sessions[s->s_mds] != s); 
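	/*
	 * Clear the sessions[] slot; the ceph_put_mds_session() below drops
	 * the reference that register_session() took for the array.
	 */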
	mdsc->sessions[s->s_mds] = NULL;
	ceph_con_close(&s->s_con);
	ceph_put_mds_session(s);
	atomic_dec(&mdsc->num_sessions);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
	if (req->r_session) {
		ceph_put_mds_session(req->r_session);
		req->r_session = NULL;
	}
}

void ceph_mdsc_release_request(struct kref *kref)
{
	struct ceph_mds_request *req = container_of(kref,
						    struct ceph_mds_request,
						    r_kref);
	ceph_mdsc_release_dir_caps_no_check(req);
	destroy_reply_info(&req->r_reply_info);
	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_inode) {
		ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
		iput(req->r_inode);
	}
	if (req->r_parent) {
		ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
		iput(req->r_parent);
	}
	iput(req->r_target_inode);
	if (req->r_dentry)
		dput(req->r_dentry);
	if (req->r_old_dentry)
		dput(req->r_old_dentry);
	if (req->r_old_dentry_dir) {
		/*
		 * track (and drop pins for) r_old_dentry_dir
		 * separately, since r_old_dentry's d_parent may have
		 * changed between the dir mutex being dropped and
		 * this request being freed.
		 */
		ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);
		iput(req->r_old_dentry_dir);
	}
	kfree(req->r_path1);
	kfree(req->r_path2);
	put_cred(req->r_cred);
	if (req->r_pagelist)
		ceph_pagelist_release(req->r_pagelist);
	put_request_session(req);
	ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
	WARN_ON_ONCE(!list_empty(&req->r_wait));
	kmem_cache_free(ceph_mds_request_cachep, req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
	struct ceph_mds_request *req;

	req = lookup_request(&mdsc->request_tree, tid);
	if (req)
		ceph_mdsc_get_request(req);

	return req;
}

/*
 * Register an in-flight request, and assign a tid. Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
884 */ 885 static void __register_request(struct ceph_mds_client *mdsc, 886 struct ceph_mds_request *req, 887 struct inode *dir) 888 { 889 int ret = 0; 890 891 req->r_tid = ++mdsc->last_tid; 892 if (req->r_num_caps) { 893 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, 894 req->r_num_caps); 895 if (ret < 0) { 896 pr_err("__register_request %p " 897 "failed to reserve caps: %d\n", req, ret); 898 /* set req->r_err to fail early from __do_request */ 899 req->r_err = ret; 900 return; 901 } 902 } 903 dout("__register_request %p tid %lld\n", req, req->r_tid); 904 ceph_mdsc_get_request(req); 905 insert_request(&mdsc->request_tree, req); 906 907 req->r_cred = get_current_cred(); 908 909 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) 910 mdsc->oldest_tid = req->r_tid; 911 912 if (dir) { 913 struct ceph_inode_info *ci = ceph_inode(dir); 914 915 ihold(dir); 916 req->r_unsafe_dir = dir; 917 spin_lock(&ci->i_unsafe_lock); 918 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 919 spin_unlock(&ci->i_unsafe_lock); 920 } 921 } 922 923 static void __unregister_request(struct ceph_mds_client *mdsc, 924 struct ceph_mds_request *req) 925 { 926 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 927 928 /* Never leave an unregistered request on an unsafe list! */ 929 list_del_init(&req->r_unsafe_item); 930 931 if (req->r_tid == mdsc->oldest_tid) { 932 struct rb_node *p = rb_next(&req->r_node); 933 mdsc->oldest_tid = 0; 934 while (p) { 935 struct ceph_mds_request *next_req = 936 rb_entry(p, struct ceph_mds_request, r_node); 937 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 938 mdsc->oldest_tid = next_req->r_tid; 939 break; 940 } 941 p = rb_next(p); 942 } 943 } 944 945 erase_request(&mdsc->request_tree, req); 946 947 if (req->r_unsafe_dir) { 948 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 949 spin_lock(&ci->i_unsafe_lock); 950 list_del_init(&req->r_unsafe_dir_item); 951 spin_unlock(&ci->i_unsafe_lock); 952 } 953 if (req->r_target_inode && 954 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 955 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 956 spin_lock(&ci->i_unsafe_lock); 957 list_del_init(&req->r_unsafe_target_item); 958 spin_unlock(&ci->i_unsafe_lock); 959 } 960 961 if (req->r_unsafe_dir) { 962 iput(req->r_unsafe_dir); 963 req->r_unsafe_dir = NULL; 964 } 965 966 complete_all(&req->r_safe_completion); 967 968 ceph_mdsc_put_request(req); 969 } 970 971 /* 972 * Walk back up the dentry tree until we hit a dentry representing a 973 * non-snapshot inode. We do this using the rcu_read_lock (which must be held 974 * when calling this) to ensure that the objects won't disappear while we're 975 * working with them. Once we hit a candidate dentry, we attempt to take a 976 * reference to it, and return that as the result. 977 */ 978 static struct inode *get_nonsnap_parent(struct dentry *dentry) 979 { 980 struct inode *inode = NULL; 981 982 while (dentry && !IS_ROOT(dentry)) { 983 inode = d_inode_rcu(dentry); 984 if (!inode || ceph_snap(inode) == CEPH_NOSNAP) 985 break; 986 dentry = dentry->d_parent; 987 } 988 if (inode) 989 inode = igrab(inode); 990 return inode; 991 } 992 993 /* 994 * Choose mds to send request to next. If there is a hint set in the 995 * request (e.g., due to a prior forward hint from the mds), use that. 996 * Otherwise, consult frag tree and/or caps to identify the 997 * appropriate mds. If all else fails, choose randomly. 998 * 999 * Called under mdsc->mutex. 
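 *
 * (req->r_direct_mode is one of USE_ANY_MDS, USE_AUTH_MDS or
 *  USE_RANDOM_MDS; see the mode checks below.)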
1000 */ 1001 static int __choose_mds(struct ceph_mds_client *mdsc, 1002 struct ceph_mds_request *req, 1003 bool *random) 1004 { 1005 struct inode *inode; 1006 struct ceph_inode_info *ci; 1007 struct ceph_cap *cap; 1008 int mode = req->r_direct_mode; 1009 int mds = -1; 1010 u32 hash = req->r_direct_hash; 1011 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); 1012 1013 if (random) 1014 *random = false; 1015 1016 /* 1017 * is there a specific mds we should try? ignore hint if we have 1018 * no session and the mds is not up (active or recovering). 1019 */ 1020 if (req->r_resend_mds >= 0 && 1021 (__have_session(mdsc, req->r_resend_mds) || 1022 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 1023 dout("%s using resend_mds mds%d\n", __func__, 1024 req->r_resend_mds); 1025 return req->r_resend_mds; 1026 } 1027 1028 if (mode == USE_RANDOM_MDS) 1029 goto random; 1030 1031 inode = NULL; 1032 if (req->r_inode) { 1033 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) { 1034 inode = req->r_inode; 1035 ihold(inode); 1036 } else { 1037 /* req->r_dentry is non-null for LSSNAP request */ 1038 rcu_read_lock(); 1039 inode = get_nonsnap_parent(req->r_dentry); 1040 rcu_read_unlock(); 1041 dout("%s using snapdir's parent %p\n", __func__, inode); 1042 } 1043 } else if (req->r_dentry) { 1044 /* ignore race with rename; old or new d_parent is okay */ 1045 struct dentry *parent; 1046 struct inode *dir; 1047 1048 rcu_read_lock(); 1049 parent = READ_ONCE(req->r_dentry->d_parent); 1050 dir = req->r_parent ? : d_inode_rcu(parent); 1051 1052 if (!dir || dir->i_sb != mdsc->fsc->sb) { 1053 /* not this fs or parent went negative */ 1054 inode = d_inode(req->r_dentry); 1055 if (inode) 1056 ihold(inode); 1057 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 1058 /* direct snapped/virtual snapdir requests 1059 * based on parent dir inode */ 1060 inode = get_nonsnap_parent(parent); 1061 dout("%s using nonsnap parent %p\n", __func__, inode); 1062 } else { 1063 /* dentry target */ 1064 inode = d_inode(req->r_dentry); 1065 if (!inode || mode == USE_AUTH_MDS) { 1066 /* dir + name */ 1067 inode = igrab(dir); 1068 hash = ceph_dentry_hash(dir, req->r_dentry); 1069 is_hash = true; 1070 } else { 1071 ihold(inode); 1072 } 1073 } 1074 rcu_read_unlock(); 1075 } 1076 1077 dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash, 1078 hash, mode); 1079 if (!inode) 1080 goto random; 1081 ci = ceph_inode(inode); 1082 1083 if (is_hash && S_ISDIR(inode->i_mode)) { 1084 struct ceph_inode_frag frag; 1085 int found; 1086 1087 ceph_choose_frag(ci, hash, &frag, &found); 1088 if (found) { 1089 if (mode == USE_ANY_MDS && frag.ndist > 0) { 1090 u8 r; 1091 1092 /* choose a random replica */ 1093 get_random_bytes(&r, 1); 1094 r %= frag.ndist; 1095 mds = frag.dist[r]; 1096 dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n", 1097 __func__, inode, ceph_vinop(inode), 1098 frag.frag, mds, (int)r, frag.ndist); 1099 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1100 CEPH_MDS_STATE_ACTIVE && 1101 !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds)) 1102 goto out; 1103 } 1104 1105 /* since this file/dir wasn't known to be 1106 * replicated, then we want to look for the 1107 * authoritative mds. 
*/ 1108 if (frag.mds >= 0) { 1109 /* choose auth mds */ 1110 mds = frag.mds; 1111 dout("%s %p %llx.%llx frag %u mds%d (auth)\n", 1112 __func__, inode, ceph_vinop(inode), 1113 frag.frag, mds); 1114 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1115 CEPH_MDS_STATE_ACTIVE) { 1116 if (!ceph_mdsmap_is_laggy(mdsc->mdsmap, 1117 mds)) 1118 goto out; 1119 } 1120 } 1121 mode = USE_AUTH_MDS; 1122 } 1123 } 1124 1125 spin_lock(&ci->i_ceph_lock); 1126 cap = NULL; 1127 if (mode == USE_AUTH_MDS) 1128 cap = ci->i_auth_cap; 1129 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 1130 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 1131 if (!cap) { 1132 spin_unlock(&ci->i_ceph_lock); 1133 iput(inode); 1134 goto random; 1135 } 1136 mds = cap->session->s_mds; 1137 dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__, 1138 inode, ceph_vinop(inode), mds, 1139 cap == ci->i_auth_cap ? "auth " : "", cap); 1140 spin_unlock(&ci->i_ceph_lock); 1141 out: 1142 iput(inode); 1143 return mds; 1144 1145 random: 1146 if (random) 1147 *random = true; 1148 1149 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 1150 dout("%s chose random mds%d\n", __func__, mds); 1151 return mds; 1152 } 1153 1154 1155 /* 1156 * session messages 1157 */ 1158 static struct ceph_msg *create_session_msg(u32 op, u64 seq) 1159 { 1160 struct ceph_msg *msg; 1161 struct ceph_mds_session_head *h; 1162 1163 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 1164 false); 1165 if (!msg) { 1166 pr_err("create_session_msg ENOMEM creating msg\n"); 1167 return NULL; 1168 } 1169 h = msg->front.iov_base; 1170 h->op = cpu_to_le32(op); 1171 h->seq = cpu_to_le64(seq); 1172 1173 return msg; 1174 } 1175 1176 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED; 1177 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8) 1178 static int encode_supported_features(void **p, void *end) 1179 { 1180 static const size_t count = ARRAY_SIZE(feature_bits); 1181 1182 if (count > 0) { 1183 size_t i; 1184 size_t size = FEATURE_BYTES(count); 1185 1186 if (WARN_ON_ONCE(*p + 4 + size > end)) 1187 return -ERANGE; 1188 1189 ceph_encode_32(p, size); 1190 memset(*p, 0, size); 1191 for (i = 0; i < count; i++) 1192 ((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8); 1193 *p += size; 1194 } else { 1195 if (WARN_ON_ONCE(*p + 4 > end)) 1196 return -ERANGE; 1197 1198 ceph_encode_32(p, 0); 1199 } 1200 1201 return 0; 1202 } 1203 1204 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED; 1205 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8) 1206 static int encode_metric_spec(void **p, void *end) 1207 { 1208 static const size_t count = ARRAY_SIZE(metric_bits); 1209 1210 /* header */ 1211 if (WARN_ON_ONCE(*p + 2 > end)) 1212 return -ERANGE; 1213 1214 ceph_encode_8(p, 1); /* version */ 1215 ceph_encode_8(p, 1); /* compat */ 1216 1217 if (count > 0) { 1218 size_t i; 1219 size_t size = METRIC_BYTES(count); 1220 1221 if (WARN_ON_ONCE(*p + 4 + 4 + size > end)) 1222 return -ERANGE; 1223 1224 /* metric spec info length */ 1225 ceph_encode_32(p, 4 + size); 1226 1227 /* metric spec */ 1228 ceph_encode_32(p, size); 1229 memset(*p, 0, size); 1230 for (i = 0; i < count; i++) 1231 ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8); 1232 *p += size; 1233 } else { 1234 if (WARN_ON_ONCE(*p + 4 + 4 > end)) 1235 return -ERANGE; 1236 1237 /* metric spec info length */ 1238 ceph_encode_32(p, 4); 1239 /* metric spec */ 1240 ceph_encode_32(p, 0); 1241 } 1242 1243 return 0; 1244 
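	/*
	 * Resulting layout (sketch): u8 version, u8 compat, u32 metric
	 * spec info length, u32 bitmap size, then the metric bitmap bytes
	 * (the bitmap size is 0 when no metrics are advertised).
	 */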
} 1245 1246 /* 1247 * session message, specialization for CEPH_SESSION_REQUEST_OPEN 1248 * to include additional client metadata fields. 1249 */ 1250 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq) 1251 { 1252 struct ceph_msg *msg; 1253 struct ceph_mds_session_head *h; 1254 int i; 1255 int extra_bytes = 0; 1256 int metadata_key_count = 0; 1257 struct ceph_options *opt = mdsc->fsc->client->options; 1258 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; 1259 size_t size, count; 1260 void *p, *end; 1261 int ret; 1262 1263 const char* metadata[][2] = { 1264 {"hostname", mdsc->nodename}, 1265 {"kernel_version", init_utsname()->release}, 1266 {"entity_id", opt->name ? : ""}, 1267 {"root", fsopt->server_path ? : "/"}, 1268 {NULL, NULL} 1269 }; 1270 1271 /* Calculate serialized length of metadata */ 1272 extra_bytes = 4; /* map length */ 1273 for (i = 0; metadata[i][0]; ++i) { 1274 extra_bytes += 8 + strlen(metadata[i][0]) + 1275 strlen(metadata[i][1]); 1276 metadata_key_count++; 1277 } 1278 1279 /* supported feature */ 1280 size = 0; 1281 count = ARRAY_SIZE(feature_bits); 1282 if (count > 0) 1283 size = FEATURE_BYTES(count); 1284 extra_bytes += 4 + size; 1285 1286 /* metric spec */ 1287 size = 0; 1288 count = ARRAY_SIZE(metric_bits); 1289 if (count > 0) 1290 size = METRIC_BYTES(count); 1291 extra_bytes += 2 + 4 + 4 + size; 1292 1293 /* Allocate the message */ 1294 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, 1295 GFP_NOFS, false); 1296 if (!msg) { 1297 pr_err("create_session_msg ENOMEM creating msg\n"); 1298 return ERR_PTR(-ENOMEM); 1299 } 1300 p = msg->front.iov_base; 1301 end = p + msg->front.iov_len; 1302 1303 h = p; 1304 h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN); 1305 h->seq = cpu_to_le64(seq); 1306 1307 /* 1308 * Serialize client metadata into waiting buffer space, using 1309 * the format that userspace expects for map<string, string> 1310 * 1311 * ClientSession messages with metadata are v4 1312 */ 1313 msg->hdr.version = cpu_to_le16(4); 1314 msg->hdr.compat_version = cpu_to_le16(1); 1315 1316 /* The write pointer, following the session_head structure */ 1317 p += sizeof(*h); 1318 1319 /* Number of entries in the map */ 1320 ceph_encode_32(&p, metadata_key_count); 1321 1322 /* Two length-prefixed strings for each entry in the map */ 1323 for (i = 0; metadata[i][0]; ++i) { 1324 size_t const key_len = strlen(metadata[i][0]); 1325 size_t const val_len = strlen(metadata[i][1]); 1326 1327 ceph_encode_32(&p, key_len); 1328 memcpy(p, metadata[i][0], key_len); 1329 p += key_len; 1330 ceph_encode_32(&p, val_len); 1331 memcpy(p, metadata[i][1], val_len); 1332 p += val_len; 1333 } 1334 1335 ret = encode_supported_features(&p, end); 1336 if (ret) { 1337 pr_err("encode_supported_features failed!\n"); 1338 ceph_msg_put(msg); 1339 return ERR_PTR(ret); 1340 } 1341 1342 ret = encode_metric_spec(&p, end); 1343 if (ret) { 1344 pr_err("encode_metric_spec failed!\n"); 1345 ceph_msg_put(msg); 1346 return ERR_PTR(ret); 1347 } 1348 1349 msg->front.iov_len = p - msg->front.iov_base; 1350 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1351 1352 return msg; 1353 } 1354 1355 /* 1356 * send session open request. 1357 * 1358 * called under mdsc->mutex 1359 */ 1360 static int __open_session(struct ceph_mds_client *mdsc, 1361 struct ceph_mds_session *session) 1362 { 1363 struct ceph_msg *msg; 1364 int mstate; 1365 int mds = session->s_mds; 1366 1367 /* wait for mds to go active? 
*/ 1368 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 1369 dout("open_session to mds%d (%s)\n", mds, 1370 ceph_mds_state_name(mstate)); 1371 session->s_state = CEPH_MDS_SESSION_OPENING; 1372 session->s_renew_requested = jiffies; 1373 1374 /* send connect message */ 1375 msg = create_session_open_msg(mdsc, session->s_seq); 1376 if (IS_ERR(msg)) 1377 return PTR_ERR(msg); 1378 ceph_con_send(&session->s_con, msg); 1379 return 0; 1380 } 1381 1382 /* 1383 * open sessions for any export targets for the given mds 1384 * 1385 * called under mdsc->mutex 1386 */ 1387 static struct ceph_mds_session * 1388 __open_export_target_session(struct ceph_mds_client *mdsc, int target) 1389 { 1390 struct ceph_mds_session *session; 1391 int ret; 1392 1393 session = __ceph_lookup_mds_session(mdsc, target); 1394 if (!session) { 1395 session = register_session(mdsc, target); 1396 if (IS_ERR(session)) 1397 return session; 1398 } 1399 if (session->s_state == CEPH_MDS_SESSION_NEW || 1400 session->s_state == CEPH_MDS_SESSION_CLOSING) { 1401 ret = __open_session(mdsc, session); 1402 if (ret) 1403 return ERR_PTR(ret); 1404 } 1405 1406 return session; 1407 } 1408 1409 struct ceph_mds_session * 1410 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) 1411 { 1412 struct ceph_mds_session *session; 1413 1414 dout("open_export_target_session to mds%d\n", target); 1415 1416 mutex_lock(&mdsc->mutex); 1417 session = __open_export_target_session(mdsc, target); 1418 mutex_unlock(&mdsc->mutex); 1419 1420 return session; 1421 } 1422 1423 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 1424 struct ceph_mds_session *session) 1425 { 1426 struct ceph_mds_info *mi; 1427 struct ceph_mds_session *ts; 1428 int i, mds = session->s_mds; 1429 1430 if (mds >= mdsc->mdsmap->possible_max_rank) 1431 return; 1432 1433 mi = &mdsc->mdsmap->m_info[mds]; 1434 dout("open_export_target_sessions for mds%d (%d targets)\n", 1435 session->s_mds, mi->num_export_targets); 1436 1437 for (i = 0; i < mi->num_export_targets; i++) { 1438 ts = __open_export_target_session(mdsc, mi->export_targets[i]); 1439 ceph_put_mds_session(ts); 1440 } 1441 } 1442 1443 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 1444 struct ceph_mds_session *session) 1445 { 1446 mutex_lock(&mdsc->mutex); 1447 __open_export_target_sessions(mdsc, session); 1448 mutex_unlock(&mdsc->mutex); 1449 } 1450 1451 /* 1452 * session caps 1453 */ 1454 1455 static void detach_cap_releases(struct ceph_mds_session *session, 1456 struct list_head *target) 1457 { 1458 lockdep_assert_held(&session->s_cap_lock); 1459 1460 list_splice_init(&session->s_cap_releases, target); 1461 session->s_num_cap_releases = 0; 1462 dout("dispose_cap_releases mds%d\n", session->s_mds); 1463 } 1464 1465 static void dispose_cap_releases(struct ceph_mds_client *mdsc, 1466 struct list_head *dispose) 1467 { 1468 while (!list_empty(dispose)) { 1469 struct ceph_cap *cap; 1470 /* zero out the in-progress message */ 1471 cap = list_first_entry(dispose, struct ceph_cap, session_caps); 1472 list_del(&cap->session_caps); 1473 ceph_put_cap(mdsc, cap); 1474 } 1475 } 1476 1477 static void cleanup_session_requests(struct ceph_mds_client *mdsc, 1478 struct ceph_mds_session *session) 1479 { 1480 struct ceph_mds_request *req; 1481 struct rb_node *p; 1482 struct ceph_inode_info *ci; 1483 1484 dout("cleanup_session_requests mds%d\n", session->s_mds); 1485 mutex_lock(&mdsc->mutex); 1486 while (!list_empty(&session->s_unsafe)) { 1487 req = 
list_first_entry(&session->s_unsafe, 1488 struct ceph_mds_request, r_unsafe_item); 1489 pr_warn_ratelimited(" dropping unsafe request %llu\n", 1490 req->r_tid); 1491 if (req->r_target_inode) { 1492 /* dropping unsafe change of inode's attributes */ 1493 ci = ceph_inode(req->r_target_inode); 1494 errseq_set(&ci->i_meta_err, -EIO); 1495 } 1496 if (req->r_unsafe_dir) { 1497 /* dropping unsafe directory operation */ 1498 ci = ceph_inode(req->r_unsafe_dir); 1499 errseq_set(&ci->i_meta_err, -EIO); 1500 } 1501 __unregister_request(mdsc, req); 1502 } 1503 /* zero r_attempts, so kick_requests() will re-send requests */ 1504 p = rb_first(&mdsc->request_tree); 1505 while (p) { 1506 req = rb_entry(p, struct ceph_mds_request, r_node); 1507 p = rb_next(p); 1508 if (req->r_session && 1509 req->r_session->s_mds == session->s_mds) 1510 req->r_attempts = 0; 1511 } 1512 mutex_unlock(&mdsc->mutex); 1513 } 1514 1515 /* 1516 * Helper to safely iterate over all caps associated with a session, with 1517 * special care taken to handle a racing __ceph_remove_cap(). 1518 * 1519 * Caller must hold session s_mutex. 1520 */ 1521 int ceph_iterate_session_caps(struct ceph_mds_session *session, 1522 int (*cb)(struct inode *, struct ceph_cap *, 1523 void *), void *arg) 1524 { 1525 struct list_head *p; 1526 struct ceph_cap *cap; 1527 struct inode *inode, *last_inode = NULL; 1528 struct ceph_cap *old_cap = NULL; 1529 int ret; 1530 1531 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 1532 spin_lock(&session->s_cap_lock); 1533 p = session->s_caps.next; 1534 while (p != &session->s_caps) { 1535 cap = list_entry(p, struct ceph_cap, session_caps); 1536 inode = igrab(&cap->ci->vfs_inode); 1537 if (!inode) { 1538 p = p->next; 1539 continue; 1540 } 1541 session->s_cap_iterator = cap; 1542 spin_unlock(&session->s_cap_lock); 1543 1544 if (last_inode) { 1545 iput(last_inode); 1546 last_inode = NULL; 1547 } 1548 if (old_cap) { 1549 ceph_put_cap(session->s_mdsc, old_cap); 1550 old_cap = NULL; 1551 } 1552 1553 ret = cb(inode, cap, arg); 1554 last_inode = inode; 1555 1556 spin_lock(&session->s_cap_lock); 1557 p = p->next; 1558 if (!cap->ci) { 1559 dout("iterate_session_caps finishing cap %p removal\n", 1560 cap); 1561 BUG_ON(cap->session != session); 1562 cap->session = NULL; 1563 list_del_init(&cap->session_caps); 1564 session->s_nr_caps--; 1565 atomic64_dec(&session->s_mdsc->metric.total_caps); 1566 if (cap->queue_release) 1567 __ceph_queue_cap_release(session, cap); 1568 else 1569 old_cap = cap; /* put_cap it w/o locks held */ 1570 } 1571 if (ret < 0) 1572 goto out; 1573 } 1574 ret = 0; 1575 out: 1576 session->s_cap_iterator = NULL; 1577 spin_unlock(&session->s_cap_lock); 1578 1579 iput(last_inode); 1580 if (old_cap) 1581 ceph_put_cap(session->s_mdsc, old_cap); 1582 1583 return ret; 1584 } 1585 1586 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 1587 void *arg) 1588 { 1589 struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg; 1590 struct ceph_inode_info *ci = ceph_inode(inode); 1591 LIST_HEAD(to_remove); 1592 bool dirty_dropped = false; 1593 bool invalidate = false; 1594 1595 dout("removing cap %p, ci is %p, inode is %p\n", 1596 cap, ci, &ci->vfs_inode); 1597 spin_lock(&ci->i_ceph_lock); 1598 __ceph_remove_cap(cap, false); 1599 if (!ci->i_auth_cap) { 1600 struct ceph_cap_flush *cf; 1601 struct ceph_mds_client *mdsc = fsc->mdsc; 1602 1603 if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) { 1604 if (inode->i_data.nrpages > 0) 1605 invalidate = true; 1606 if (ci->i_wrbuffer_ref > 
0) 1607 mapping_set_error(&inode->i_data, -EIO); 1608 } 1609 1610 while (!list_empty(&ci->i_cap_flush_list)) { 1611 cf = list_first_entry(&ci->i_cap_flush_list, 1612 struct ceph_cap_flush, i_list); 1613 list_move(&cf->i_list, &to_remove); 1614 } 1615 1616 spin_lock(&mdsc->cap_dirty_lock); 1617 1618 list_for_each_entry(cf, &to_remove, i_list) 1619 list_del_init(&cf->g_list); 1620 1621 if (!list_empty(&ci->i_dirty_item)) { 1622 pr_warn_ratelimited( 1623 " dropping dirty %s state for %p %lld\n", 1624 ceph_cap_string(ci->i_dirty_caps), 1625 inode, ceph_ino(inode)); 1626 ci->i_dirty_caps = 0; 1627 list_del_init(&ci->i_dirty_item); 1628 dirty_dropped = true; 1629 } 1630 if (!list_empty(&ci->i_flushing_item)) { 1631 pr_warn_ratelimited( 1632 " dropping dirty+flushing %s state for %p %lld\n", 1633 ceph_cap_string(ci->i_flushing_caps), 1634 inode, ceph_ino(inode)); 1635 ci->i_flushing_caps = 0; 1636 list_del_init(&ci->i_flushing_item); 1637 mdsc->num_cap_flushing--; 1638 dirty_dropped = true; 1639 } 1640 spin_unlock(&mdsc->cap_dirty_lock); 1641 1642 if (dirty_dropped) { 1643 errseq_set(&ci->i_meta_err, -EIO); 1644 1645 if (ci->i_wrbuffer_ref_head == 0 && 1646 ci->i_wr_ref == 0 && 1647 ci->i_dirty_caps == 0 && 1648 ci->i_flushing_caps == 0) { 1649 ceph_put_snap_context(ci->i_head_snapc); 1650 ci->i_head_snapc = NULL; 1651 } 1652 } 1653 1654 if (atomic_read(&ci->i_filelock_ref) > 0) { 1655 /* make further file lock syscall return -EIO */ 1656 ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK; 1657 pr_warn_ratelimited(" dropping file locks for %p %lld\n", 1658 inode, ceph_ino(inode)); 1659 } 1660 1661 if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { 1662 list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove); 1663 ci->i_prealloc_cap_flush = NULL; 1664 } 1665 } 1666 spin_unlock(&ci->i_ceph_lock); 1667 while (!list_empty(&to_remove)) { 1668 struct ceph_cap_flush *cf; 1669 cf = list_first_entry(&to_remove, 1670 struct ceph_cap_flush, i_list); 1671 list_del_init(&cf->i_list); 1672 if (!cf->is_capsnap) 1673 ceph_free_cap_flush(cf); 1674 } 1675 1676 wake_up_all(&ci->i_cap_wq); 1677 if (invalidate) 1678 ceph_queue_invalidate(inode); 1679 if (dirty_dropped) 1680 iput(inode); 1681 return 0; 1682 } 1683 1684 /* 1685 * caller must hold session s_mutex 1686 */ 1687 static void remove_session_caps(struct ceph_mds_session *session) 1688 { 1689 struct ceph_fs_client *fsc = session->s_mdsc->fsc; 1690 struct super_block *sb = fsc->sb; 1691 LIST_HEAD(dispose); 1692 1693 dout("remove_session_caps on %p\n", session); 1694 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc); 1695 1696 wake_up_all(&fsc->mdsc->cap_flushing_wq); 1697 1698 spin_lock(&session->s_cap_lock); 1699 if (session->s_nr_caps > 0) { 1700 struct inode *inode; 1701 struct ceph_cap *cap, *prev = NULL; 1702 struct ceph_vino vino; 1703 /* 1704 * iterate_session_caps() skips inodes that are being 1705 * deleted, we need to wait until deletions are complete. 1706 * __wait_on_freeing_inode() is designed for the job, 1707 * but it is not exported, so use lookup inode function 1708 * to access it. 
1709 */ 1710 while (!list_empty(&session->s_caps)) { 1711 cap = list_entry(session->s_caps.next, 1712 struct ceph_cap, session_caps); 1713 if (cap == prev) 1714 break; 1715 prev = cap; 1716 vino = cap->ci->i_vino; 1717 spin_unlock(&session->s_cap_lock); 1718 1719 inode = ceph_find_inode(sb, vino); 1720 iput(inode); 1721 1722 spin_lock(&session->s_cap_lock); 1723 } 1724 } 1725 1726 // drop cap expires and unlock s_cap_lock 1727 detach_cap_releases(session, &dispose); 1728 1729 BUG_ON(session->s_nr_caps > 0); 1730 BUG_ON(!list_empty(&session->s_cap_flushing)); 1731 spin_unlock(&session->s_cap_lock); 1732 dispose_cap_releases(session->s_mdsc, &dispose); 1733 } 1734 1735 enum { 1736 RECONNECT, 1737 RENEWCAPS, 1738 FORCE_RO, 1739 }; 1740 1741 /* 1742 * wake up any threads waiting on this session's caps. if the cap is 1743 * old (didn't get renewed on the client reconnect), remove it now. 1744 * 1745 * caller must hold s_mutex. 1746 */ 1747 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, 1748 void *arg) 1749 { 1750 struct ceph_inode_info *ci = ceph_inode(inode); 1751 unsigned long ev = (unsigned long)arg; 1752 1753 if (ev == RECONNECT) { 1754 spin_lock(&ci->i_ceph_lock); 1755 ci->i_wanted_max_size = 0; 1756 ci->i_requested_max_size = 0; 1757 spin_unlock(&ci->i_ceph_lock); 1758 } else if (ev == RENEWCAPS) { 1759 if (cap->cap_gen < atomic_read(&cap->session->s_cap_gen)) { 1760 /* mds did not re-issue stale cap */ 1761 spin_lock(&ci->i_ceph_lock); 1762 cap->issued = cap->implemented = CEPH_CAP_PIN; 1763 spin_unlock(&ci->i_ceph_lock); 1764 } 1765 } else if (ev == FORCE_RO) { 1766 } 1767 wake_up_all(&ci->i_cap_wq); 1768 return 0; 1769 } 1770 1771 static void wake_up_session_caps(struct ceph_mds_session *session, int ev) 1772 { 1773 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 1774 ceph_iterate_session_caps(session, wake_up_session_cb, 1775 (void *)(unsigned long)ev); 1776 } 1777 1778 /* 1779 * Send periodic message to MDS renewing all currently held caps. The 1780 * ack will reset the expiration for all caps from this session. 1781 * 1782 * caller holds s_mutex 1783 */ 1784 static int send_renew_caps(struct ceph_mds_client *mdsc, 1785 struct ceph_mds_session *session) 1786 { 1787 struct ceph_msg *msg; 1788 int state; 1789 1790 if (time_after_eq(jiffies, session->s_cap_ttl) && 1791 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 1792 pr_info("mds%d caps stale\n", session->s_mds); 1793 session->s_renew_requested = jiffies; 1794 1795 /* do not try to renew caps until a recovering mds has reconnected 1796 * with its clients. 
*/ 1797 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 1798 if (state < CEPH_MDS_STATE_RECONNECT) { 1799 dout("send_renew_caps ignoring mds%d (%s)\n", 1800 session->s_mds, ceph_mds_state_name(state)); 1801 return 0; 1802 } 1803 1804 dout("send_renew_caps to mds%d (%s)\n", session->s_mds, 1805 ceph_mds_state_name(state)); 1806 msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 1807 ++session->s_renew_seq); 1808 if (!msg) 1809 return -ENOMEM; 1810 ceph_con_send(&session->s_con, msg); 1811 return 0; 1812 } 1813 1814 static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 1815 struct ceph_mds_session *session, u64 seq) 1816 { 1817 struct ceph_msg *msg; 1818 1819 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", 1820 session->s_mds, ceph_session_state_name(session->s_state), seq); 1821 msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 1822 if (!msg) 1823 return -ENOMEM; 1824 ceph_con_send(&session->s_con, msg); 1825 return 0; 1826 } 1827 1828 1829 /* 1830 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 1831 * 1832 * Called under session->s_mutex 1833 */ 1834 static void renewed_caps(struct ceph_mds_client *mdsc, 1835 struct ceph_mds_session *session, int is_renew) 1836 { 1837 int was_stale; 1838 int wake = 0; 1839 1840 spin_lock(&session->s_cap_lock); 1841 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 1842 1843 session->s_cap_ttl = session->s_renew_requested + 1844 mdsc->mdsmap->m_session_timeout*HZ; 1845 1846 if (was_stale) { 1847 if (time_before(jiffies, session->s_cap_ttl)) { 1848 pr_info("mds%d caps renewed\n", session->s_mds); 1849 wake = 1; 1850 } else { 1851 pr_info("mds%d caps still stale\n", session->s_mds); 1852 } 1853 } 1854 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n", 1855 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh", 1856 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 1857 spin_unlock(&session->s_cap_lock); 1858 1859 if (wake) 1860 wake_up_session_caps(session, RENEWCAPS); 1861 } 1862 1863 /* 1864 * send a session close request 1865 */ 1866 static int request_close_session(struct ceph_mds_session *session) 1867 { 1868 struct ceph_msg *msg; 1869 1870 dout("request_close_session mds%d state %s seq %lld\n", 1871 session->s_mds, ceph_session_state_name(session->s_state), 1872 session->s_seq); 1873 msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); 1874 if (!msg) 1875 return -ENOMEM; 1876 ceph_con_send(&session->s_con, msg); 1877 return 1; 1878 } 1879 1880 /* 1881 * Called with s_mutex held. 1882 */ 1883 static int __close_session(struct ceph_mds_client *mdsc, 1884 struct ceph_mds_session *session) 1885 { 1886 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 1887 return 0; 1888 session->s_state = CEPH_MDS_SESSION_CLOSING; 1889 return request_close_session(session); 1890 } 1891 1892 static bool drop_negative_children(struct dentry *dentry) 1893 { 1894 struct dentry *child; 1895 bool all_negative = true; 1896 1897 if (!d_is_dir(dentry)) 1898 goto out; 1899 1900 spin_lock(&dentry->d_lock); 1901 list_for_each_entry(child, &dentry->d_subdirs, d_child) { 1902 if (d_really_is_positive(child)) { 1903 all_negative = false; 1904 break; 1905 } 1906 } 1907 spin_unlock(&dentry->d_lock); 1908 1909 if (all_negative) 1910 shrink_dcache_parent(dentry); 1911 out: 1912 return all_negative; 1913 } 1914 1915 /* 1916 * Trim old(er) caps. 
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped too.
 *
 * Yes, this is a bit sloppy. Our only real goal here is to respond to
 * memory pressure from the MDS, though, so it needn't be perfect.
 */
static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
{
	int *remaining = arg;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int used, wanted, oissued, mine;

	if (*remaining <= 0)
		return -1;

	spin_lock(&ci->i_ceph_lock);
	mine = cap->issued | cap->implemented;
	used = __ceph_caps_used(ci);
	wanted = __ceph_caps_file_wanted(ci);
	oissued = __ceph_caps_issued_other(ci, cap);

	dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
	     inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
	     ceph_cap_string(used), ceph_cap_string(wanted));
	if (cap == ci->i_auth_cap) {
		if (ci->i_dirty_caps || ci->i_flushing_caps ||
		    !list_empty(&ci->i_cap_snaps))
			goto out;
		if ((used | wanted) & CEPH_CAP_ANY_WR)
			goto out;
		/* Note: it's possible that i_filelock_ref becomes non-zero
		 * after dropping auth caps. It doesn't hurt because reply
		 * of lock mds request will re-add auth caps. */
		if (atomic_read(&ci->i_filelock_ref) > 0)
			goto out;
	}
	/* The inode has cached pages, but it's no longer used.
	 * we can safely drop it */
	if (S_ISREG(inode->i_mode) &&
	    wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
	    !(oissued & CEPH_CAP_FILE_CACHE)) {
		used = 0;
		oissued = 0;
	}
	if ((used | wanted) & ~oissued & mine)
		goto out;   /* we need these caps */

	if (oissued) {
		/* we aren't the only cap.. just remove us */
		__ceph_remove_cap(cap, true);
		(*remaining)--;
	} else {
		struct dentry *dentry;
		/* try dropping referring dentries */
		spin_unlock(&ci->i_ceph_lock);
		dentry = d_find_any_alias(inode);
		if (dentry && drop_negative_children(dentry)) {
			int count;
			dput(dentry);
			d_prune_aliases(inode);
			count = atomic_read(&inode->i_count);
			if (count == 1)
				(*remaining)--;
			dout("trim_caps_cb %p cap %p pruned, count now %d\n",
			     inode, cap, count);
		} else {
			dput(dentry);
		}
		return 0;
	}

out:
	spin_unlock(&ci->i_ceph_lock);
	return 0;
}

/*
 * Trim session cap count down to some max number.
1997 */ 1998 int ceph_trim_caps(struct ceph_mds_client *mdsc, 1999 struct ceph_mds_session *session, 2000 int max_caps) 2001 { 2002 int trim_caps = session->s_nr_caps - max_caps; 2003 2004 dout("trim_caps mds%d start: %d / %d, trim %d\n", 2005 session->s_mds, session->s_nr_caps, max_caps, trim_caps); 2006 if (trim_caps > 0) { 2007 int remaining = trim_caps; 2008 2009 ceph_iterate_session_caps(session, trim_caps_cb, &remaining); 2010 dout("trim_caps mds%d done: %d / %d, trimmed %d\n", 2011 session->s_mds, session->s_nr_caps, max_caps, 2012 trim_caps - remaining); 2013 } 2014 2015 ceph_flush_cap_releases(mdsc, session); 2016 return 0; 2017 } 2018 2019 static int check_caps_flush(struct ceph_mds_client *mdsc, 2020 u64 want_flush_tid) 2021 { 2022 int ret = 1; 2023 2024 spin_lock(&mdsc->cap_dirty_lock); 2025 if (!list_empty(&mdsc->cap_flush_list)) { 2026 struct ceph_cap_flush *cf = 2027 list_first_entry(&mdsc->cap_flush_list, 2028 struct ceph_cap_flush, g_list); 2029 if (cf->tid <= want_flush_tid) { 2030 dout("check_caps_flush still flushing tid " 2031 "%llu <= %llu\n", cf->tid, want_flush_tid); 2032 ret = 0; 2033 } 2034 } 2035 spin_unlock(&mdsc->cap_dirty_lock); 2036 return ret; 2037 } 2038 2039 /* 2040 * flush all dirty inode data to disk. 2041 * 2042 * returns true if we've flushed through want_flush_tid 2043 */ 2044 static void wait_caps_flush(struct ceph_mds_client *mdsc, 2045 u64 want_flush_tid) 2046 { 2047 dout("check_caps_flush want %llu\n", want_flush_tid); 2048 2049 wait_event(mdsc->cap_flushing_wq, 2050 check_caps_flush(mdsc, want_flush_tid)); 2051 2052 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); 2053 } 2054 2055 /* 2056 * called under s_mutex 2057 */ 2058 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 2059 struct ceph_mds_session *session) 2060 { 2061 struct ceph_msg *msg = NULL; 2062 struct ceph_mds_cap_release *head; 2063 struct ceph_mds_cap_item *item; 2064 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 2065 struct ceph_cap *cap; 2066 LIST_HEAD(tmp_list); 2067 int num_cap_releases; 2068 __le32 barrier, *cap_barrier; 2069 2070 down_read(&osdc->lock); 2071 barrier = cpu_to_le32(osdc->epoch_barrier); 2072 up_read(&osdc->lock); 2073 2074 spin_lock(&session->s_cap_lock); 2075 again: 2076 list_splice_init(&session->s_cap_releases, &tmp_list); 2077 num_cap_releases = session->s_num_cap_releases; 2078 session->s_num_cap_releases = 0; 2079 spin_unlock(&session->s_cap_lock); 2080 2081 while (!list_empty(&tmp_list)) { 2082 if (!msg) { 2083 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 2084 PAGE_SIZE, GFP_NOFS, false); 2085 if (!msg) 2086 goto out_err; 2087 head = msg->front.iov_base; 2088 head->num = cpu_to_le32(0); 2089 msg->front.iov_len = sizeof(*head); 2090 2091 msg->hdr.version = cpu_to_le16(2); 2092 msg->hdr.compat_version = cpu_to_le16(1); 2093 } 2094 2095 cap = list_first_entry(&tmp_list, struct ceph_cap, 2096 session_caps); 2097 list_del(&cap->session_caps); 2098 num_cap_releases--; 2099 2100 head = msg->front.iov_base; 2101 put_unaligned_le32(get_unaligned_le32(&head->num) + 1, 2102 &head->num); 2103 item = msg->front.iov_base + msg->front.iov_len; 2104 item->ino = cpu_to_le64(cap->cap_ino); 2105 item->cap_id = cpu_to_le64(cap->cap_id); 2106 item->migrate_seq = cpu_to_le32(cap->mseq); 2107 item->seq = cpu_to_le32(cap->issue_seq); 2108 msg->front.iov_len += sizeof(*item); 2109 2110 ceph_put_cap(mdsc, cap); 2111 2112 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 2113 // Append cap_barrier field 2114 cap_barrier = 
msg->front.iov_base + msg->front.iov_len; 2115 *cap_barrier = barrier; 2116 msg->front.iov_len += sizeof(*cap_barrier); 2117 2118 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2119 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2120 ceph_con_send(&session->s_con, msg); 2121 msg = NULL; 2122 } 2123 } 2124 2125 BUG_ON(num_cap_releases != 0); 2126 2127 spin_lock(&session->s_cap_lock); 2128 if (!list_empty(&session->s_cap_releases)) 2129 goto again; 2130 spin_unlock(&session->s_cap_lock); 2131 2132 if (msg) { 2133 // Append cap_barrier field 2134 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2135 *cap_barrier = barrier; 2136 msg->front.iov_len += sizeof(*cap_barrier); 2137 2138 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2139 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2140 ceph_con_send(&session->s_con, msg); 2141 } 2142 return; 2143 out_err: 2144 pr_err("send_cap_releases mds%d, failed to allocate message\n", 2145 session->s_mds); 2146 spin_lock(&session->s_cap_lock); 2147 list_splice(&tmp_list, &session->s_cap_releases); 2148 session->s_num_cap_releases += num_cap_releases; 2149 spin_unlock(&session->s_cap_lock); 2150 } 2151 2152 static void ceph_cap_release_work(struct work_struct *work) 2153 { 2154 struct ceph_mds_session *session = 2155 container_of(work, struct ceph_mds_session, s_cap_release_work); 2156 2157 mutex_lock(&session->s_mutex); 2158 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2159 session->s_state == CEPH_MDS_SESSION_HUNG) 2160 ceph_send_cap_releases(session->s_mdsc, session); 2161 mutex_unlock(&session->s_mutex); 2162 ceph_put_mds_session(session); 2163 } 2164 2165 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, 2166 struct ceph_mds_session *session) 2167 { 2168 if (mdsc->stopping) 2169 return; 2170 2171 ceph_get_mds_session(session); 2172 if (queue_work(mdsc->fsc->cap_wq, 2173 &session->s_cap_release_work)) { 2174 dout("cap release work queued\n"); 2175 } else { 2176 ceph_put_mds_session(session); 2177 dout("failed to queue cap release work\n"); 2178 } 2179 } 2180 2181 /* 2182 * caller holds session->s_cap_lock 2183 */ 2184 void __ceph_queue_cap_release(struct ceph_mds_session *session, 2185 struct ceph_cap *cap) 2186 { 2187 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2188 session->s_num_cap_releases++; 2189 2190 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2191 ceph_flush_cap_releases(session->s_mdsc, session); 2192 } 2193 2194 static void ceph_cap_reclaim_work(struct work_struct *work) 2195 { 2196 struct ceph_mds_client *mdsc = 2197 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2198 int ret = ceph_trim_dentries(mdsc); 2199 if (ret == -EAGAIN) 2200 ceph_queue_cap_reclaim_work(mdsc); 2201 } 2202 2203 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2204 { 2205 if (mdsc->stopping) 2206 return; 2207 2208 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2209 dout("caps reclaim work queued\n"); 2210 } else { 2211 dout("failed to queue caps release work\n"); 2212 } 2213 } 2214 2215 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) 2216 { 2217 int val; 2218 if (!nr) 2219 return; 2220 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2221 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2222 atomic_set(&mdsc->cap_reclaim_pending, 0); 2223 ceph_queue_cap_reclaim_work(mdsc); 2224 } 2225 } 2226 2227 /* 2228 * requests 2229 */ 2230 2231 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2232 struct inode *dir) 
2233 { 2234 struct ceph_inode_info *ci = ceph_inode(dir); 2235 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2236 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2237 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2238 unsigned int num_entries; 2239 int order; 2240 2241 spin_lock(&ci->i_ceph_lock); 2242 num_entries = ci->i_files + ci->i_subdirs; 2243 spin_unlock(&ci->i_ceph_lock); 2244 num_entries = max(num_entries, 1U); 2245 num_entries = min(num_entries, opt->max_readdir); 2246 2247 order = get_order(size * num_entries); 2248 while (order >= 0) { 2249 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2250 __GFP_NOWARN, 2251 order); 2252 if (rinfo->dir_entries) 2253 break; 2254 order--; 2255 } 2256 if (!rinfo->dir_entries) 2257 return -ENOMEM; 2258 2259 num_entries = (PAGE_SIZE << order) / size; 2260 num_entries = min(num_entries, opt->max_readdir); 2261 2262 rinfo->dir_buf_size = PAGE_SIZE << order; 2263 req->r_num_caps = num_entries + 1; 2264 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2265 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2266 return 0; 2267 } 2268 2269 /* 2270 * Create an mds request. 2271 */ 2272 struct ceph_mds_request * 2273 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2274 { 2275 struct ceph_mds_request *req; 2276 2277 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2278 if (!req) 2279 return ERR_PTR(-ENOMEM); 2280 2281 mutex_init(&req->r_fill_mutex); 2282 req->r_mdsc = mdsc; 2283 req->r_started = jiffies; 2284 req->r_start_latency = ktime_get(); 2285 req->r_resend_mds = -1; 2286 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2287 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2288 req->r_fmode = -1; 2289 kref_init(&req->r_kref); 2290 RB_CLEAR_NODE(&req->r_node); 2291 INIT_LIST_HEAD(&req->r_wait); 2292 init_completion(&req->r_completion); 2293 init_completion(&req->r_safe_completion); 2294 INIT_LIST_HEAD(&req->r_unsafe_item); 2295 2296 ktime_get_coarse_real_ts64(&req->r_stamp); 2297 2298 req->r_op = op; 2299 req->r_direct_mode = mode; 2300 return req; 2301 } 2302 2303 /* 2304 * return oldest (lowest) request, tid in request tree, 0 if none. 2305 * 2306 * called under mdsc->mutex. 2307 */ 2308 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2309 { 2310 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2311 return NULL; 2312 return rb_entry(rb_first(&mdsc->request_tree), 2313 struct ceph_mds_request, r_node); 2314 } 2315 2316 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2317 { 2318 return mdsc->oldest_tid; 2319 } 2320 2321 /* 2322 * Build a dentry's path. Allocate on heap; caller must kfree. Based 2323 * on build_path_from_dentry in fs/cifs/dir.c. 2324 * 2325 * If @stop_on_nosnap, generate path relative to the first non-snapped 2326 * inode. 2327 * 2328 * Encode hidden .snap dirs as a double /, i.e. 
2329 * foo/.snap/bar -> foo//bar 2330 */ 2331 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase, 2332 int stop_on_nosnap) 2333 { 2334 struct dentry *temp; 2335 char *path; 2336 int pos; 2337 unsigned seq; 2338 u64 base; 2339 2340 if (!dentry) 2341 return ERR_PTR(-EINVAL); 2342 2343 path = __getname(); 2344 if (!path) 2345 return ERR_PTR(-ENOMEM); 2346 retry: 2347 pos = PATH_MAX - 1; 2348 path[pos] = '\0'; 2349 2350 seq = read_seqbegin(&rename_lock); 2351 rcu_read_lock(); 2352 temp = dentry; 2353 for (;;) { 2354 struct inode *inode; 2355 2356 spin_lock(&temp->d_lock); 2357 inode = d_inode(temp); 2358 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2359 dout("build_path path+%d: %p SNAPDIR\n", 2360 pos, temp); 2361 } else if (stop_on_nosnap && inode && dentry != temp && 2362 ceph_snap(inode) == CEPH_NOSNAP) { 2363 spin_unlock(&temp->d_lock); 2364 pos++; /* get rid of any prepended '/' */ 2365 break; 2366 } else { 2367 pos -= temp->d_name.len; 2368 if (pos < 0) { 2369 spin_unlock(&temp->d_lock); 2370 break; 2371 } 2372 memcpy(path + pos, temp->d_name.name, temp->d_name.len); 2373 } 2374 spin_unlock(&temp->d_lock); 2375 temp = READ_ONCE(temp->d_parent); 2376 2377 /* Are we at the root? */ 2378 if (IS_ROOT(temp)) 2379 break; 2380 2381 /* Are we out of buffer? */ 2382 if (--pos < 0) 2383 break; 2384 2385 path[pos] = '/'; 2386 } 2387 base = ceph_ino(d_inode(temp)); 2388 rcu_read_unlock(); 2389 2390 if (read_seqretry(&rename_lock, seq)) 2391 goto retry; 2392 2393 if (pos < 0) { 2394 /* 2395 * A rename didn't occur, but somehow we didn't end up where 2396 * we thought we would. Throw a warning and try again. 2397 */ 2398 pr_warn("build_path did not end path lookup where " 2399 "expected, pos is %d\n", pos); 2400 goto retry; 2401 } 2402 2403 *pbase = base; 2404 *plen = PATH_MAX - 1 - pos; 2405 dout("build_path on %p %d built %llx '%.*s'\n", 2406 dentry, d_count(dentry), base, *plen, path + pos); 2407 return path + pos; 2408 } 2409 2410 static int build_dentry_path(struct dentry *dentry, struct inode *dir, 2411 const char **ppath, int *ppathlen, u64 *pino, 2412 bool *pfreepath, bool parent_locked) 2413 { 2414 char *path; 2415 2416 rcu_read_lock(); 2417 if (!dir) 2418 dir = d_inode_rcu(dentry->d_parent); 2419 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) { 2420 *pino = ceph_ino(dir); 2421 rcu_read_unlock(); 2422 *ppath = dentry->d_name.name; 2423 *ppathlen = dentry->d_name.len; 2424 return 0; 2425 } 2426 rcu_read_unlock(); 2427 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2428 if (IS_ERR(path)) 2429 return PTR_ERR(path); 2430 *ppath = path; 2431 *pfreepath = true; 2432 return 0; 2433 } 2434 2435 static int build_inode_path(struct inode *inode, 2436 const char **ppath, int *ppathlen, u64 *pino, 2437 bool *pfreepath) 2438 { 2439 struct dentry *dentry; 2440 char *path; 2441 2442 if (ceph_snap(inode) == CEPH_NOSNAP) { 2443 *pino = ceph_ino(inode); 2444 *ppathlen = 0; 2445 return 0; 2446 } 2447 dentry = d_find_alias(inode); 2448 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2449 dput(dentry); 2450 if (IS_ERR(path)) 2451 return PTR_ERR(path); 2452 *ppath = path; 2453 *pfreepath = true; 2454 return 0; 2455 } 2456 2457 /* 2458 * request arguments may be specified via an inode *, a dentry *, or 2459 * an explicit ino+path. 
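 *
 * Roughly: an inode argument is resolved via build_inode_path(), a dentry
 * via build_dentry_path() (which can get away with just the parent ino plus
 * the dentry name when the parent is locked and not snapped), and otherwise
 * the caller-supplied ino/path pair is passed through as-is.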
2460 */ 2461 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, 2462 struct inode *rdiri, const char *rpath, 2463 u64 rino, const char **ppath, int *pathlen, 2464 u64 *ino, bool *freepath, bool parent_locked) 2465 { 2466 int r = 0; 2467 2468 if (rinode) { 2469 r = build_inode_path(rinode, ppath, pathlen, ino, freepath); 2470 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 2471 ceph_snap(rinode)); 2472 } else if (rdentry) { 2473 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino, 2474 freepath, parent_locked); 2475 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, 2476 *ppath); 2477 } else if (rpath || rino) { 2478 *ino = rino; 2479 *ppath = rpath; 2480 *pathlen = rpath ? strlen(rpath) : 0; 2481 dout(" path %.*s\n", *pathlen, rpath); 2482 } 2483 2484 return r; 2485 } 2486 2487 static void encode_timestamp_and_gids(void **p, 2488 const struct ceph_mds_request *req) 2489 { 2490 struct ceph_timespec ts; 2491 int i; 2492 2493 ceph_encode_timespec64(&ts, &req->r_stamp); 2494 ceph_encode_copy(p, &ts, sizeof(ts)); 2495 2496 /* gid_list */ 2497 ceph_encode_32(p, req->r_cred->group_info->ngroups); 2498 for (i = 0; i < req->r_cred->group_info->ngroups; i++) 2499 ceph_encode_64(p, from_kgid(&init_user_ns, 2500 req->r_cred->group_info->gid[i])); 2501 } 2502 2503 /* 2504 * called under mdsc->mutex 2505 */ 2506 static struct ceph_msg *create_request_message(struct ceph_mds_session *session, 2507 struct ceph_mds_request *req, 2508 bool drop_cap_releases) 2509 { 2510 int mds = session->s_mds; 2511 struct ceph_mds_client *mdsc = session->s_mdsc; 2512 struct ceph_msg *msg; 2513 struct ceph_mds_request_head_old *head; 2514 const char *path1 = NULL; 2515 const char *path2 = NULL; 2516 u64 ino1 = 0, ino2 = 0; 2517 int pathlen1 = 0, pathlen2 = 0; 2518 bool freepath1 = false, freepath2 = false; 2519 int len; 2520 u16 releases; 2521 void *p, *end; 2522 int ret; 2523 bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME); 2524 2525 ret = set_request_path_attr(req->r_inode, req->r_dentry, 2526 req->r_parent, req->r_path1, req->r_ino1.ino, 2527 &path1, &pathlen1, &ino1, &freepath1, 2528 test_bit(CEPH_MDS_R_PARENT_LOCKED, 2529 &req->r_req_flags)); 2530 if (ret < 0) { 2531 msg = ERR_PTR(ret); 2532 goto out; 2533 } 2534 2535 /* If r_old_dentry is set, then assume that its parent is locked */ 2536 ret = set_request_path_attr(NULL, req->r_old_dentry, 2537 req->r_old_dentry_dir, 2538 req->r_path2, req->r_ino2.ino, 2539 &path2, &pathlen2, &ino2, &freepath2, true); 2540 if (ret < 0) { 2541 msg = ERR_PTR(ret); 2542 goto out_free1; 2543 } 2544 2545 len = legacy ? 
sizeof(*head) : sizeof(struct ceph_mds_request_head); 2546 len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + 2547 sizeof(struct ceph_timespec); 2548 len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups); 2549 2550 /* calculate (max) length for cap releases */ 2551 len += sizeof(struct ceph_mds_request_release) * 2552 (!!req->r_inode_drop + !!req->r_dentry_drop + 2553 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 2554 2555 if (req->r_dentry_drop) 2556 len += pathlen1; 2557 if (req->r_old_dentry_drop) 2558 len += pathlen2; 2559 2560 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); 2561 if (!msg) { 2562 msg = ERR_PTR(-ENOMEM); 2563 goto out_free2; 2564 } 2565 2566 msg->hdr.tid = cpu_to_le64(req->r_tid); 2567 2568 /* 2569 * The old ceph_mds_request_head didn't contain a version field, and 2570 * one was added when we moved the message version from 3->4. 2571 */ 2572 if (legacy) { 2573 msg->hdr.version = cpu_to_le16(3); 2574 head = msg->front.iov_base; 2575 p = msg->front.iov_base + sizeof(*head); 2576 } else { 2577 struct ceph_mds_request_head *new_head = msg->front.iov_base; 2578 2579 msg->hdr.version = cpu_to_le16(4); 2580 new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION); 2581 head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; 2582 p = msg->front.iov_base + sizeof(*new_head); 2583 } 2584 2585 end = msg->front.iov_base + msg->front.iov_len; 2586 2587 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 2588 head->op = cpu_to_le32(req->r_op); 2589 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, 2590 req->r_cred->fsuid)); 2591 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, 2592 req->r_cred->fsgid)); 2593 head->ino = cpu_to_le64(req->r_deleg_ino); 2594 head->args = req->r_args; 2595 2596 ceph_encode_filepath(&p, end, ino1, path1); 2597 ceph_encode_filepath(&p, end, ino2, path2); 2598 2599 /* make note of release offset, in case we need to replay */ 2600 req->r_request_release_offset = p - msg->front.iov_base; 2601 2602 /* cap releases */ 2603 releases = 0; 2604 if (req->r_inode_drop) 2605 releases += ceph_encode_inode_release(&p, 2606 req->r_inode ? 
req->r_inode : d_inode(req->r_dentry), 2607 mds, req->r_inode_drop, req->r_inode_unless, 2608 req->r_op == CEPH_MDS_OP_READDIR); 2609 if (req->r_dentry_drop) 2610 releases += ceph_encode_dentry_release(&p, req->r_dentry, 2611 req->r_parent, mds, req->r_dentry_drop, 2612 req->r_dentry_unless); 2613 if (req->r_old_dentry_drop) 2614 releases += ceph_encode_dentry_release(&p, req->r_old_dentry, 2615 req->r_old_dentry_dir, mds, 2616 req->r_old_dentry_drop, 2617 req->r_old_dentry_unless); 2618 if (req->r_old_inode_drop) 2619 releases += ceph_encode_inode_release(&p, 2620 d_inode(req->r_old_dentry), 2621 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 2622 2623 if (drop_cap_releases) { 2624 releases = 0; 2625 p = msg->front.iov_base + req->r_request_release_offset; 2626 } 2627 2628 head->num_releases = cpu_to_le16(releases); 2629 2630 encode_timestamp_and_gids(&p, req); 2631 2632 if (WARN_ON_ONCE(p > end)) { 2633 ceph_msg_put(msg); 2634 msg = ERR_PTR(-ERANGE); 2635 goto out_free2; 2636 } 2637 2638 msg->front.iov_len = p - msg->front.iov_base; 2639 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2640 2641 if (req->r_pagelist) { 2642 struct ceph_pagelist *pagelist = req->r_pagelist; 2643 ceph_msg_data_add_pagelist(msg, pagelist); 2644 msg->hdr.data_len = cpu_to_le32(pagelist->length); 2645 } else { 2646 msg->hdr.data_len = 0; 2647 } 2648 2649 msg->hdr.data_off = cpu_to_le16(0); 2650 2651 out_free2: 2652 if (freepath2) 2653 ceph_mdsc_free_path((char *)path2, pathlen2); 2654 out_free1: 2655 if (freepath1) 2656 ceph_mdsc_free_path((char *)path1, pathlen1); 2657 out: 2658 return msg; 2659 } 2660 2661 /* 2662 * called under mdsc->mutex if error, under no mutex if 2663 * success. 2664 */ 2665 static void complete_request(struct ceph_mds_client *mdsc, 2666 struct ceph_mds_request *req) 2667 { 2668 req->r_end_latency = ktime_get(); 2669 2670 if (req->r_callback) 2671 req->r_callback(mdsc, req); 2672 complete_all(&req->r_completion); 2673 } 2674 2675 static struct ceph_mds_request_head_old * 2676 find_old_request_head(void *p, u64 features) 2677 { 2678 bool legacy = !(features & CEPH_FEATURE_FS_BTIME); 2679 struct ceph_mds_request_head *new_head; 2680 2681 if (legacy) 2682 return (struct ceph_mds_request_head_old *)p; 2683 new_head = (struct ceph_mds_request_head *)p; 2684 return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; 2685 } 2686 2687 /* 2688 * called under mdsc->mutex 2689 */ 2690 static int __prepare_send_request(struct ceph_mds_session *session, 2691 struct ceph_mds_request *req, 2692 bool drop_cap_releases) 2693 { 2694 int mds = session->s_mds; 2695 struct ceph_mds_client *mdsc = session->s_mdsc; 2696 struct ceph_mds_request_head_old *rhead; 2697 struct ceph_msg *msg; 2698 int flags = 0; 2699 2700 req->r_attempts++; 2701 if (req->r_inode) { 2702 struct ceph_cap *cap = 2703 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 2704 2705 if (cap) 2706 req->r_sent_on_mseq = cap->mseq; 2707 else 2708 req->r_sent_on_mseq = -1; 2709 } 2710 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, 2711 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 2712 2713 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 2714 void *p; 2715 2716 /* 2717 * Replay. Do not regenerate message (and rebuild 2718 * paths, etc.); just use the original message. 2719 * Rebuilding paths will break for renames because 2720 * d_move mangles the src name. 
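 *
 * Instead the original message is patched in place below: the REPLAY flag
 * is set, any cap/dentry releases it carried are dropped, and only the
 * timestamp/gid blob at the recorded release offset is re-encoded.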
2721 */ 2722 msg = req->r_request; 2723 rhead = find_old_request_head(msg->front.iov_base, 2724 session->s_con.peer_features); 2725 2726 flags = le32_to_cpu(rhead->flags); 2727 flags |= CEPH_MDS_FLAG_REPLAY; 2728 rhead->flags = cpu_to_le32(flags); 2729 2730 if (req->r_target_inode) 2731 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 2732 2733 rhead->num_retry = req->r_attempts - 1; 2734 2735 /* remove cap/dentry releases from message */ 2736 rhead->num_releases = 0; 2737 2738 p = msg->front.iov_base + req->r_request_release_offset; 2739 encode_timestamp_and_gids(&p, req); 2740 2741 msg->front.iov_len = p - msg->front.iov_base; 2742 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2743 return 0; 2744 } 2745 2746 if (req->r_request) { 2747 ceph_msg_put(req->r_request); 2748 req->r_request = NULL; 2749 } 2750 msg = create_request_message(session, req, drop_cap_releases); 2751 if (IS_ERR(msg)) { 2752 req->r_err = PTR_ERR(msg); 2753 return PTR_ERR(msg); 2754 } 2755 req->r_request = msg; 2756 2757 rhead = find_old_request_head(msg->front.iov_base, 2758 session->s_con.peer_features); 2759 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 2760 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2761 flags |= CEPH_MDS_FLAG_REPLAY; 2762 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) 2763 flags |= CEPH_MDS_FLAG_ASYNC; 2764 if (req->r_parent) 2765 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 2766 rhead->flags = cpu_to_le32(flags); 2767 rhead->num_fwd = req->r_num_fwd; 2768 rhead->num_retry = req->r_attempts - 1; 2769 2770 dout(" r_parent = %p\n", req->r_parent); 2771 return 0; 2772 } 2773 2774 /* 2775 * called under mdsc->mutex 2776 */ 2777 static int __send_request(struct ceph_mds_session *session, 2778 struct ceph_mds_request *req, 2779 bool drop_cap_releases) 2780 { 2781 int err; 2782 2783 err = __prepare_send_request(session, req, drop_cap_releases); 2784 if (!err) { 2785 ceph_msg_get(req->r_request); 2786 ceph_con_send(&session->s_con, req->r_request); 2787 } 2788 2789 return err; 2790 } 2791 2792 /* 2793 * send request, or put it on the appropriate wait list. 
2794 */ 2795 static void __do_request(struct ceph_mds_client *mdsc, 2796 struct ceph_mds_request *req) 2797 { 2798 struct ceph_mds_session *session = NULL; 2799 int mds = -1; 2800 int err = 0; 2801 bool random; 2802 2803 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 2804 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 2805 __unregister_request(mdsc, req); 2806 return; 2807 } 2808 2809 if (req->r_timeout && 2810 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 2811 dout("do_request timed out\n"); 2812 err = -ETIMEDOUT; 2813 goto finish; 2814 } 2815 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 2816 dout("do_request forced umount\n"); 2817 err = -EIO; 2818 goto finish; 2819 } 2820 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 2821 if (mdsc->mdsmap_err) { 2822 err = mdsc->mdsmap_err; 2823 dout("do_request mdsmap err %d\n", err); 2824 goto finish; 2825 } 2826 if (mdsc->mdsmap->m_epoch == 0) { 2827 dout("do_request no mdsmap, waiting for map\n"); 2828 list_add(&req->r_wait, &mdsc->waiting_for_map); 2829 return; 2830 } 2831 if (!(mdsc->fsc->mount_options->flags & 2832 CEPH_MOUNT_OPT_MOUNTWAIT) && 2833 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { 2834 err = -EHOSTUNREACH; 2835 goto finish; 2836 } 2837 } 2838 2839 put_request_session(req); 2840 2841 mds = __choose_mds(mdsc, req, &random); 2842 if (mds < 0 || 2843 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 2844 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2845 err = -EJUKEBOX; 2846 goto finish; 2847 } 2848 dout("do_request no mds or not active, waiting for map\n"); 2849 list_add(&req->r_wait, &mdsc->waiting_for_map); 2850 return; 2851 } 2852 2853 /* get, open session */ 2854 session = __ceph_lookup_mds_session(mdsc, mds); 2855 if (!session) { 2856 session = register_session(mdsc, mds); 2857 if (IS_ERR(session)) { 2858 err = PTR_ERR(session); 2859 goto finish; 2860 } 2861 } 2862 req->r_session = ceph_get_mds_session(session); 2863 2864 dout("do_request mds%d session %p state %s\n", mds, session, 2865 ceph_session_state_name(session->s_state)); 2866 if (session->s_state != CEPH_MDS_SESSION_OPEN && 2867 session->s_state != CEPH_MDS_SESSION_HUNG) { 2868 /* 2869 * We cannot queue async requests since the caps and delegated 2870 * inodes are bound to the session. Just return -EJUKEBOX and 2871 * let the caller retry a sync request in that case. 2872 */ 2873 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2874 err = -EJUKEBOX; 2875 goto out_session; 2876 } 2877 2878 /* 2879 * If the session has been REJECTED, then return a hard error, 2880 * unless it's a CLEANRECOVER mount, in which case we'll queue 2881 * it to the mdsc queue. 
2882 */ 2883 if (session->s_state == CEPH_MDS_SESSION_REJECTED) { 2884 if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER)) 2885 list_add(&req->r_wait, &mdsc->waiting_for_map); 2886 else 2887 err = -EACCES; 2888 goto out_session; 2889 } 2890 2891 if (session->s_state == CEPH_MDS_SESSION_NEW || 2892 session->s_state == CEPH_MDS_SESSION_CLOSING) { 2893 err = __open_session(mdsc, session); 2894 if (err) 2895 goto out_session; 2896 /* retry the same mds later */ 2897 if (random) 2898 req->r_resend_mds = mds; 2899 } 2900 list_add(&req->r_wait, &session->s_waiting); 2901 goto out_session; 2902 } 2903 2904 /* send request */ 2905 req->r_resend_mds = -1; /* forget any previous mds hint */ 2906 2907 if (req->r_request_started == 0) /* note request start time */ 2908 req->r_request_started = jiffies; 2909 2910 err = __send_request(session, req, false); 2911 2912 out_session: 2913 ceph_put_mds_session(session); 2914 finish: 2915 if (err) { 2916 dout("__do_request early error %d\n", err); 2917 req->r_err = err; 2918 complete_request(mdsc, req); 2919 __unregister_request(mdsc, req); 2920 } 2921 return; 2922 } 2923 2924 /* 2925 * called under mdsc->mutex 2926 */ 2927 static void __wake_requests(struct ceph_mds_client *mdsc, 2928 struct list_head *head) 2929 { 2930 struct ceph_mds_request *req; 2931 LIST_HEAD(tmp_list); 2932 2933 list_splice_init(head, &tmp_list); 2934 2935 while (!list_empty(&tmp_list)) { 2936 req = list_entry(tmp_list.next, 2937 struct ceph_mds_request, r_wait); 2938 list_del_init(&req->r_wait); 2939 dout(" wake request %p tid %llu\n", req, req->r_tid); 2940 __do_request(mdsc, req); 2941 } 2942 } 2943 2944 /* 2945 * Wake up threads with requests pending for @mds, so that they can 2946 * resubmit their requests to a possibly different mds. 2947 */ 2948 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 2949 { 2950 struct ceph_mds_request *req; 2951 struct rb_node *p = rb_first(&mdsc->request_tree); 2952 2953 dout("kick_requests mds%d\n", mds); 2954 while (p) { 2955 req = rb_entry(p, struct ceph_mds_request, r_node); 2956 p = rb_next(p); 2957 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2958 continue; 2959 if (req->r_attempts > 0) 2960 continue; /* only new requests */ 2961 if (req->r_session && 2962 req->r_session->s_mds == mds) { 2963 dout(" kicking tid %llu\n", req->r_tid); 2964 list_del_init(&req->r_wait); 2965 __do_request(mdsc, req); 2966 } 2967 } 2968 } 2969 2970 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, 2971 struct ceph_mds_request *req) 2972 { 2973 int err = 0; 2974 2975 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ 2976 if (req->r_inode) 2977 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 2978 if (req->r_parent) { 2979 struct ceph_inode_info *ci = ceph_inode(req->r_parent); 2980 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ? 
2981 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD; 2982 spin_lock(&ci->i_ceph_lock); 2983 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false); 2984 __ceph_touch_fmode(ci, mdsc, fmode); 2985 spin_unlock(&ci->i_ceph_lock); 2986 } 2987 if (req->r_old_dentry_dir) 2988 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 2989 CEPH_CAP_PIN); 2990 2991 if (req->r_inode) { 2992 err = ceph_wait_on_async_create(req->r_inode); 2993 if (err) { 2994 dout("%s: wait for async create returned: %d\n", 2995 __func__, err); 2996 return err; 2997 } 2998 } 2999 3000 if (!err && req->r_old_inode) { 3001 err = ceph_wait_on_async_create(req->r_old_inode); 3002 if (err) { 3003 dout("%s: wait for async create returned: %d\n", 3004 __func__, err); 3005 return err; 3006 } 3007 } 3008 3009 dout("submit_request on %p for inode %p\n", req, dir); 3010 mutex_lock(&mdsc->mutex); 3011 __register_request(mdsc, req, dir); 3012 __do_request(mdsc, req); 3013 err = req->r_err; 3014 mutex_unlock(&mdsc->mutex); 3015 return err; 3016 } 3017 3018 static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, 3019 struct ceph_mds_request *req) 3020 { 3021 int err; 3022 3023 /* wait */ 3024 dout("do_request waiting\n"); 3025 if (!req->r_timeout && req->r_wait_for_completion) { 3026 err = req->r_wait_for_completion(mdsc, req); 3027 } else { 3028 long timeleft = wait_for_completion_killable_timeout( 3029 &req->r_completion, 3030 ceph_timeout_jiffies(req->r_timeout)); 3031 if (timeleft > 0) 3032 err = 0; 3033 else if (!timeleft) 3034 err = -ETIMEDOUT; /* timed out */ 3035 else 3036 err = timeleft; /* killed */ 3037 } 3038 dout("do_request waited, got %d\n", err); 3039 mutex_lock(&mdsc->mutex); 3040 3041 /* only abort if we didn't race with a real reply */ 3042 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 3043 err = le32_to_cpu(req->r_reply_info.head->result); 3044 } else if (err < 0) { 3045 dout("aborted request %lld with %d\n", req->r_tid, err); 3046 3047 /* 3048 * ensure we aren't running concurrently with 3049 * ceph_fill_trace or ceph_readdir_prepopulate, which 3050 * rely on locks (dir mutex) held by our caller. 3051 */ 3052 mutex_lock(&req->r_fill_mutex); 3053 req->r_err = err; 3054 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3055 mutex_unlock(&req->r_fill_mutex); 3056 3057 if (req->r_parent && 3058 (req->r_op & CEPH_MDS_OP_WRITE)) 3059 ceph_invalidate_dir_request(req); 3060 } else { 3061 err = req->r_err; 3062 } 3063 3064 mutex_unlock(&mdsc->mutex); 3065 return err; 3066 } 3067 3068 /* 3069 * Synchrously perform an mds request. Take care of all of the 3070 * session setup, forwarding, retry details. 3071 */ 3072 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 3073 struct inode *dir, 3074 struct ceph_mds_request *req) 3075 { 3076 int err; 3077 3078 dout("do_request on %p\n", req); 3079 3080 /* issue */ 3081 err = ceph_mdsc_submit_request(mdsc, dir, req); 3082 if (!err) 3083 err = ceph_mdsc_wait_request(mdsc, req); 3084 dout("do_request %p done, result %d\n", req, err); 3085 return err; 3086 } 3087 3088 /* 3089 * Invalidate dir's completeness, dentry lease state on an aborted MDS 3090 * namespace request. 
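 *
 * Once a directory-modifying request has been aborted we may never see its
 * reply, so the cached completeness of the affected directories and the
 * leases on the dentries involved can no longer be trusted; clear them all.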
3091 */ 3092 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 3093 { 3094 struct inode *dir = req->r_parent; 3095 struct inode *old_dir = req->r_old_dentry_dir; 3096 3097 dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir); 3098 3099 ceph_dir_clear_complete(dir); 3100 if (old_dir) 3101 ceph_dir_clear_complete(old_dir); 3102 if (req->r_dentry) 3103 ceph_invalidate_dentry_lease(req->r_dentry); 3104 if (req->r_old_dentry) 3105 ceph_invalidate_dentry_lease(req->r_old_dentry); 3106 } 3107 3108 /* 3109 * Handle mds reply. 3110 * 3111 * We take the session mutex and parse and process the reply immediately. 3112 * This preserves the logical ordering of replies, capabilities, etc., sent 3113 * by the MDS as they are applied to our local cache. 3114 */ 3115 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 3116 { 3117 struct ceph_mds_client *mdsc = session->s_mdsc; 3118 struct ceph_mds_request *req; 3119 struct ceph_mds_reply_head *head = msg->front.iov_base; 3120 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 3121 struct ceph_snap_realm *realm; 3122 u64 tid; 3123 int err, result; 3124 int mds = session->s_mds; 3125 3126 if (msg->front.iov_len < sizeof(*head)) { 3127 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 3128 ceph_msg_dump(msg); 3129 return; 3130 } 3131 3132 /* get request, session */ 3133 tid = le64_to_cpu(msg->hdr.tid); 3134 mutex_lock(&mdsc->mutex); 3135 req = lookup_get_request(mdsc, tid); 3136 if (!req) { 3137 dout("handle_reply on unknown tid %llu\n", tid); 3138 mutex_unlock(&mdsc->mutex); 3139 return; 3140 } 3141 dout("handle_reply %p\n", req); 3142 3143 /* correct session? */ 3144 if (req->r_session != session) { 3145 pr_err("mdsc_handle_reply got %llu on session mds%d" 3146 " not mds%d\n", tid, session->s_mds, 3147 req->r_session ? req->r_session->s_mds : -1); 3148 mutex_unlock(&mdsc->mutex); 3149 goto out; 3150 } 3151 3152 /* dup? */ 3153 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 3154 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 3155 pr_warn("got a dup %s reply on %llu from mds%d\n", 3156 head->safe ? 
"safe" : "unsafe", tid, mds); 3157 mutex_unlock(&mdsc->mutex); 3158 goto out; 3159 } 3160 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 3161 pr_warn("got unsafe after safe on %llu from mds%d\n", 3162 tid, mds); 3163 mutex_unlock(&mdsc->mutex); 3164 goto out; 3165 } 3166 3167 result = le32_to_cpu(head->result); 3168 3169 /* 3170 * Handle an ESTALE 3171 * if we're not talking to the authority, send to them 3172 * if the authority has changed while we weren't looking, 3173 * send to new authority 3174 * Otherwise we just have to return an ESTALE 3175 */ 3176 if (result == -ESTALE) { 3177 dout("got ESTALE on request %llu\n", req->r_tid); 3178 req->r_resend_mds = -1; 3179 if (req->r_direct_mode != USE_AUTH_MDS) { 3180 dout("not using auth, setting for that now\n"); 3181 req->r_direct_mode = USE_AUTH_MDS; 3182 __do_request(mdsc, req); 3183 mutex_unlock(&mdsc->mutex); 3184 goto out; 3185 } else { 3186 int mds = __choose_mds(mdsc, req, NULL); 3187 if (mds >= 0 && mds != req->r_session->s_mds) { 3188 dout("but auth changed, so resending\n"); 3189 __do_request(mdsc, req); 3190 mutex_unlock(&mdsc->mutex); 3191 goto out; 3192 } 3193 } 3194 dout("have to return ESTALE on request %llu\n", req->r_tid); 3195 } 3196 3197 3198 if (head->safe) { 3199 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 3200 __unregister_request(mdsc, req); 3201 3202 /* last request during umount? */ 3203 if (mdsc->stopping && !__get_oldest_req(mdsc)) 3204 complete_all(&mdsc->safe_umount_waiters); 3205 3206 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3207 /* 3208 * We already handled the unsafe response, now do the 3209 * cleanup. No need to examine the response; the MDS 3210 * doesn't include any result info in the safe 3211 * response. And even if it did, there is nothing 3212 * useful we could do with a revised return value. 
3213 */ 3214 dout("got safe reply %llu, mds%d\n", tid, mds); 3215 3216 mutex_unlock(&mdsc->mutex); 3217 goto out; 3218 } 3219 } else { 3220 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 3221 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 3222 } 3223 3224 dout("handle_reply tid %lld result %d\n", tid, result); 3225 rinfo = &req->r_reply_info; 3226 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 3227 err = parse_reply_info(session, msg, rinfo, (u64)-1); 3228 else 3229 err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features); 3230 mutex_unlock(&mdsc->mutex); 3231 3232 /* Must find target inode outside of mutexes to avoid deadlocks */ 3233 if ((err >= 0) && rinfo->head->is_target) { 3234 struct inode *in; 3235 struct ceph_vino tvino = { 3236 .ino = le64_to_cpu(rinfo->targeti.in->ino), 3237 .snap = le64_to_cpu(rinfo->targeti.in->snapid) 3238 }; 3239 3240 in = ceph_get_inode(mdsc->fsc->sb, tvino); 3241 if (IS_ERR(in)) { 3242 err = PTR_ERR(in); 3243 mutex_lock(&session->s_mutex); 3244 goto out_err; 3245 } 3246 req->r_target_inode = in; 3247 } 3248 3249 mutex_lock(&session->s_mutex); 3250 if (err < 0) { 3251 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 3252 ceph_msg_dump(msg); 3253 goto out_err; 3254 } 3255 3256 /* snap trace */ 3257 realm = NULL; 3258 if (rinfo->snapblob_len) { 3259 down_write(&mdsc->snap_rwsem); 3260 ceph_update_snap_trace(mdsc, rinfo->snapblob, 3261 rinfo->snapblob + rinfo->snapblob_len, 3262 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 3263 &realm); 3264 downgrade_write(&mdsc->snap_rwsem); 3265 } else { 3266 down_read(&mdsc->snap_rwsem); 3267 } 3268 3269 /* insert trace into our cache */ 3270 mutex_lock(&req->r_fill_mutex); 3271 current->journal_info = req; 3272 err = ceph_fill_trace(mdsc->fsc->sb, req); 3273 if (err == 0) { 3274 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 3275 req->r_op == CEPH_MDS_OP_LSSNAP)) 3276 ceph_readdir_prepopulate(req, req->r_session); 3277 } 3278 current->journal_info = NULL; 3279 mutex_unlock(&req->r_fill_mutex); 3280 3281 up_read(&mdsc->snap_rwsem); 3282 if (realm) 3283 ceph_put_snap_realm(mdsc, realm); 3284 3285 if (err == 0) { 3286 if (req->r_target_inode && 3287 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3288 struct ceph_inode_info *ci = 3289 ceph_inode(req->r_target_inode); 3290 spin_lock(&ci->i_unsafe_lock); 3291 list_add_tail(&req->r_unsafe_target_item, 3292 &ci->i_unsafe_iops); 3293 spin_unlock(&ci->i_unsafe_lock); 3294 } 3295 3296 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 3297 } 3298 out_err: 3299 mutex_lock(&mdsc->mutex); 3300 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3301 if (err) { 3302 req->r_err = err; 3303 } else { 3304 req->r_reply = ceph_msg_get(msg); 3305 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); 3306 } 3307 } else { 3308 dout("reply arrived after request %lld was aborted\n", tid); 3309 } 3310 mutex_unlock(&mdsc->mutex); 3311 3312 mutex_unlock(&session->s_mutex); 3313 3314 /* kick calling process */ 3315 complete_request(mdsc, req); 3316 3317 ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency, 3318 req->r_end_latency, err); 3319 out: 3320 ceph_mdsc_put_request(req); 3321 return; 3322 } 3323 3324 3325 3326 /* 3327 * handle mds notification that our request has been forwarded. 
3328 */ 3329 static void handle_forward(struct ceph_mds_client *mdsc, 3330 struct ceph_mds_session *session, 3331 struct ceph_msg *msg) 3332 { 3333 struct ceph_mds_request *req; 3334 u64 tid = le64_to_cpu(msg->hdr.tid); 3335 u32 next_mds; 3336 u32 fwd_seq; 3337 int err = -EINVAL; 3338 void *p = msg->front.iov_base; 3339 void *end = p + msg->front.iov_len; 3340 3341 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3342 next_mds = ceph_decode_32(&p); 3343 fwd_seq = ceph_decode_32(&p); 3344 3345 mutex_lock(&mdsc->mutex); 3346 req = lookup_get_request(mdsc, tid); 3347 if (!req) { 3348 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 3349 goto out; /* dup reply? */ 3350 } 3351 3352 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3353 dout("forward tid %llu aborted, unregistering\n", tid); 3354 __unregister_request(mdsc, req); 3355 } else if (fwd_seq <= req->r_num_fwd) { 3356 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 3357 tid, next_mds, req->r_num_fwd, fwd_seq); 3358 } else { 3359 /* resend. forward race not possible; mds would drop */ 3360 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 3361 BUG_ON(req->r_err); 3362 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 3363 req->r_attempts = 0; 3364 req->r_num_fwd = fwd_seq; 3365 req->r_resend_mds = next_mds; 3366 put_request_session(req); 3367 __do_request(mdsc, req); 3368 } 3369 ceph_mdsc_put_request(req); 3370 out: 3371 mutex_unlock(&mdsc->mutex); 3372 return; 3373 3374 bad: 3375 pr_err("mdsc_handle_forward decode error err=%d\n", err); 3376 } 3377 3378 static int __decode_session_metadata(void **p, void *end, 3379 bool *blocklisted) 3380 { 3381 /* map<string,string> */ 3382 u32 n; 3383 bool err_str; 3384 ceph_decode_32_safe(p, end, n, bad); 3385 while (n-- > 0) { 3386 u32 len; 3387 ceph_decode_32_safe(p, end, len, bad); 3388 ceph_decode_need(p, end, len, bad); 3389 err_str = !strncmp(*p, "error_string", len); 3390 *p += len; 3391 ceph_decode_32_safe(p, end, len, bad); 3392 ceph_decode_need(p, end, len, bad); 3393 /* 3394 * Match "blocklisted (blacklisted)" from newer MDSes, 3395 * or "blacklisted" from older MDSes. 
3396 */ 3397 if (err_str && strnstr(*p, "blacklisted", len)) 3398 *blocklisted = true; 3399 *p += len; 3400 } 3401 return 0; 3402 bad: 3403 return -1; 3404 } 3405 3406 /* 3407 * handle a mds session control message 3408 */ 3409 static void handle_session(struct ceph_mds_session *session, 3410 struct ceph_msg *msg) 3411 { 3412 struct ceph_mds_client *mdsc = session->s_mdsc; 3413 int mds = session->s_mds; 3414 int msg_version = le16_to_cpu(msg->hdr.version); 3415 void *p = msg->front.iov_base; 3416 void *end = p + msg->front.iov_len; 3417 struct ceph_mds_session_head *h; 3418 u32 op; 3419 u64 seq, features = 0; 3420 int wake = 0; 3421 bool blocklisted = false; 3422 3423 /* decode */ 3424 ceph_decode_need(&p, end, sizeof(*h), bad); 3425 h = p; 3426 p += sizeof(*h); 3427 3428 op = le32_to_cpu(h->op); 3429 seq = le64_to_cpu(h->seq); 3430 3431 if (msg_version >= 3) { 3432 u32 len; 3433 /* version >= 2, metadata */ 3434 if (__decode_session_metadata(&p, end, &blocklisted) < 0) 3435 goto bad; 3436 /* version >= 3, feature bits */ 3437 ceph_decode_32_safe(&p, end, len, bad); 3438 if (len) { 3439 ceph_decode_64_safe(&p, end, features, bad); 3440 p += len - sizeof(features); 3441 } 3442 } 3443 3444 mutex_lock(&mdsc->mutex); 3445 if (op == CEPH_SESSION_CLOSE) { 3446 ceph_get_mds_session(session); 3447 __unregister_session(mdsc, session); 3448 } 3449 /* FIXME: this ttl calculation is generous */ 3450 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 3451 mutex_unlock(&mdsc->mutex); 3452 3453 mutex_lock(&session->s_mutex); 3454 3455 dout("handle_session mds%d %s %p state %s seq %llu\n", 3456 mds, ceph_session_op_name(op), session, 3457 ceph_session_state_name(session->s_state), seq); 3458 3459 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 3460 session->s_state = CEPH_MDS_SESSION_OPEN; 3461 pr_info("mds%d came back\n", session->s_mds); 3462 } 3463 3464 switch (op) { 3465 case CEPH_SESSION_OPEN: 3466 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3467 pr_info("mds%d reconnect success\n", session->s_mds); 3468 session->s_state = CEPH_MDS_SESSION_OPEN; 3469 session->s_features = features; 3470 renewed_caps(mdsc, session, 0); 3471 if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features)) 3472 metric_schedule_delayed(&mdsc->metric); 3473 wake = 1; 3474 if (mdsc->stopping) 3475 __close_session(mdsc, session); 3476 break; 3477 3478 case CEPH_SESSION_RENEWCAPS: 3479 if (session->s_renew_seq == seq) 3480 renewed_caps(mdsc, session, 1); 3481 break; 3482 3483 case CEPH_SESSION_CLOSE: 3484 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3485 pr_info("mds%d reconnect denied\n", session->s_mds); 3486 session->s_state = CEPH_MDS_SESSION_CLOSED; 3487 cleanup_session_requests(mdsc, session); 3488 remove_session_caps(session); 3489 wake = 2; /* for good measure */ 3490 wake_up_all(&mdsc->session_close_wq); 3491 break; 3492 3493 case CEPH_SESSION_STALE: 3494 pr_info("mds%d caps went stale, renewing\n", 3495 session->s_mds); 3496 atomic_inc(&session->s_cap_gen); 3497 session->s_cap_ttl = jiffies - 1; 3498 send_renew_caps(mdsc, session); 3499 break; 3500 3501 case CEPH_SESSION_RECALL_STATE: 3502 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 3503 break; 3504 3505 case CEPH_SESSION_FLUSHMSG: 3506 send_flushmsg_ack(mdsc, session, seq); 3507 break; 3508 3509 case CEPH_SESSION_FORCE_RO: 3510 dout("force_session_readonly %p\n", session); 3511 spin_lock(&session->s_cap_lock); 3512 session->s_readonly = true; 3513 spin_unlock(&session->s_cap_lock); 3514 
wake_up_session_caps(session, FORCE_RO); 3515 break; 3516 3517 case CEPH_SESSION_REJECT: 3518 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 3519 pr_info("mds%d rejected session\n", session->s_mds); 3520 session->s_state = CEPH_MDS_SESSION_REJECTED; 3521 cleanup_session_requests(mdsc, session); 3522 remove_session_caps(session); 3523 if (blocklisted) 3524 mdsc->fsc->blocklisted = true; 3525 wake = 2; /* for good measure */ 3526 break; 3527 3528 default: 3529 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 3530 WARN_ON(1); 3531 } 3532 3533 mutex_unlock(&session->s_mutex); 3534 if (wake) { 3535 mutex_lock(&mdsc->mutex); 3536 __wake_requests(mdsc, &session->s_waiting); 3537 if (wake == 2) 3538 kick_requests(mdsc, mds); 3539 mutex_unlock(&mdsc->mutex); 3540 } 3541 if (op == CEPH_SESSION_CLOSE) 3542 ceph_put_mds_session(session); 3543 return; 3544 3545 bad: 3546 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 3547 (int)msg->front.iov_len); 3548 ceph_msg_dump(msg); 3549 return; 3550 } 3551 3552 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 3553 { 3554 int dcaps; 3555 3556 dcaps = xchg(&req->r_dir_caps, 0); 3557 if (dcaps) { 3558 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3559 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 3560 } 3561 } 3562 3563 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req) 3564 { 3565 int dcaps; 3566 3567 dcaps = xchg(&req->r_dir_caps, 0); 3568 if (dcaps) { 3569 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3570 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent), 3571 dcaps); 3572 } 3573 } 3574 3575 /* 3576 * called under session->mutex. 3577 */ 3578 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 3579 struct ceph_mds_session *session) 3580 { 3581 struct ceph_mds_request *req, *nreq; 3582 struct rb_node *p; 3583 3584 dout("replay_unsafe_requests mds%d\n", session->s_mds); 3585 3586 mutex_lock(&mdsc->mutex); 3587 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) 3588 __send_request(session, req, true); 3589 3590 /* 3591 * also re-send old requests when MDS enters reconnect stage. So that MDS 3592 * can process completed request in clientreplay stage. 
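 *
 * Only requests that were already sent to this mds (r_attempts > 0) and are
 * not on the unsafe list handled above are resent here; their directory cap
 * references are dropped first.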
3593 */ 3594 p = rb_first(&mdsc->request_tree); 3595 while (p) { 3596 req = rb_entry(p, struct ceph_mds_request, r_node); 3597 p = rb_next(p); 3598 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3599 continue; 3600 if (req->r_attempts == 0) 3601 continue; /* only old requests */ 3602 if (!req->r_session) 3603 continue; 3604 if (req->r_session->s_mds != session->s_mds) 3605 continue; 3606 3607 ceph_mdsc_release_dir_caps_no_check(req); 3608 3609 __send_request(session, req, true); 3610 } 3611 mutex_unlock(&mdsc->mutex); 3612 } 3613 3614 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state) 3615 { 3616 struct ceph_msg *reply; 3617 struct ceph_pagelist *_pagelist; 3618 struct page *page; 3619 __le32 *addr; 3620 int err = -ENOMEM; 3621 3622 if (!recon_state->allow_multi) 3623 return -ENOSPC; 3624 3625 /* can't handle message that contains both caps and realm */ 3626 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms); 3627 3628 /* pre-allocate new pagelist */ 3629 _pagelist = ceph_pagelist_alloc(GFP_NOFS); 3630 if (!_pagelist) 3631 return -ENOMEM; 3632 3633 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3634 if (!reply) 3635 goto fail_msg; 3636 3637 /* placeholder for nr_caps */ 3638 err = ceph_pagelist_encode_32(_pagelist, 0); 3639 if (err < 0) 3640 goto fail; 3641 3642 if (recon_state->nr_caps) { 3643 /* currently encoding caps */ 3644 err = ceph_pagelist_encode_32(recon_state->pagelist, 0); 3645 if (err) 3646 goto fail; 3647 } else { 3648 /* placeholder for nr_realms (currently encoding relams) */ 3649 err = ceph_pagelist_encode_32(_pagelist, 0); 3650 if (err < 0) 3651 goto fail; 3652 } 3653 3654 err = ceph_pagelist_encode_8(recon_state->pagelist, 1); 3655 if (err) 3656 goto fail; 3657 3658 page = list_first_entry(&recon_state->pagelist->head, struct page, lru); 3659 addr = kmap_atomic(page); 3660 if (recon_state->nr_caps) { 3661 /* currently encoding caps */ 3662 *addr = cpu_to_le32(recon_state->nr_caps); 3663 } else { 3664 /* currently encoding relams */ 3665 *(addr + 1) = cpu_to_le32(recon_state->nr_realms); 3666 } 3667 kunmap_atomic(addr); 3668 3669 reply->hdr.version = cpu_to_le16(5); 3670 reply->hdr.compat_version = cpu_to_le16(4); 3671 3672 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length); 3673 ceph_msg_data_add_pagelist(reply, recon_state->pagelist); 3674 3675 ceph_con_send(&recon_state->session->s_con, reply); 3676 ceph_pagelist_release(recon_state->pagelist); 3677 3678 recon_state->pagelist = _pagelist; 3679 recon_state->nr_caps = 0; 3680 recon_state->nr_realms = 0; 3681 recon_state->msg_version = 5; 3682 return 0; 3683 fail: 3684 ceph_msg_put(reply); 3685 fail_msg: 3686 ceph_pagelist_release(_pagelist); 3687 return err; 3688 } 3689 3690 static struct dentry* d_find_primary(struct inode *inode) 3691 { 3692 struct dentry *alias, *dn = NULL; 3693 3694 if (hlist_empty(&inode->i_dentry)) 3695 return NULL; 3696 3697 spin_lock(&inode->i_lock); 3698 if (hlist_empty(&inode->i_dentry)) 3699 goto out_unlock; 3700 3701 if (S_ISDIR(inode->i_mode)) { 3702 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias); 3703 if (!IS_ROOT(alias)) 3704 dn = dget(alias); 3705 goto out_unlock; 3706 } 3707 3708 hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) { 3709 spin_lock(&alias->d_lock); 3710 if (!d_unhashed(alias) && 3711 (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) { 3712 dn = dget_dlock(alias); 3713 } 3714 spin_unlock(&alias->d_lock); 3715 if (dn) 3716 break; 3717 } 3718 out_unlock: 3719 
spin_unlock(&inode->i_lock); 3720 return dn; 3721 } 3722 3723 /* 3724 * Encode information about a cap for a reconnect with the MDS. 3725 */ 3726 static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap, 3727 void *arg) 3728 { 3729 union { 3730 struct ceph_mds_cap_reconnect v2; 3731 struct ceph_mds_cap_reconnect_v1 v1; 3732 } rec; 3733 struct ceph_inode_info *ci = cap->ci; 3734 struct ceph_reconnect_state *recon_state = arg; 3735 struct ceph_pagelist *pagelist = recon_state->pagelist; 3736 struct dentry *dentry; 3737 char *path; 3738 int pathlen, err; 3739 u64 pathbase; 3740 u64 snap_follows; 3741 3742 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 3743 inode, ceph_vinop(inode), cap, cap->cap_id, 3744 ceph_cap_string(cap->issued)); 3745 3746 dentry = d_find_primary(inode); 3747 if (dentry) { 3748 /* set pathbase to parent dir when msg_version >= 2 */ 3749 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 3750 recon_state->msg_version >= 2); 3751 dput(dentry); 3752 if (IS_ERR(path)) { 3753 err = PTR_ERR(path); 3754 goto out_err; 3755 } 3756 } else { 3757 path = NULL; 3758 pathlen = 0; 3759 pathbase = 0; 3760 } 3761 3762 spin_lock(&ci->i_ceph_lock); 3763 cap->seq = 0; /* reset cap seq */ 3764 cap->issue_seq = 0; /* and issue_seq */ 3765 cap->mseq = 0; /* and migrate_seq */ 3766 cap->cap_gen = atomic_read(&cap->session->s_cap_gen); 3767 3768 /* These are lost when the session goes away */ 3769 if (S_ISDIR(inode->i_mode)) { 3770 if (cap->issued & CEPH_CAP_DIR_CREATE) { 3771 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); 3772 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); 3773 } 3774 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; 3775 } 3776 3777 if (recon_state->msg_version >= 2) { 3778 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 3779 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3780 rec.v2.issued = cpu_to_le32(cap->issued); 3781 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3782 rec.v2.pathbase = cpu_to_le64(pathbase); 3783 rec.v2.flock_len = (__force __le32) 3784 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); 3785 } else { 3786 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 3787 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3788 rec.v1.issued = cpu_to_le32(cap->issued); 3789 rec.v1.size = cpu_to_le64(i_size_read(inode)); 3790 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime); 3791 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime); 3792 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3793 rec.v1.pathbase = cpu_to_le64(pathbase); 3794 } 3795 3796 if (list_empty(&ci->i_cap_snaps)) { 3797 snap_follows = ci->i_head_snapc ? 
ci->i_head_snapc->seq : 0; 3798 } else { 3799 struct ceph_cap_snap *capsnap = 3800 list_first_entry(&ci->i_cap_snaps, 3801 struct ceph_cap_snap, ci_item); 3802 snap_follows = capsnap->follows; 3803 } 3804 spin_unlock(&ci->i_ceph_lock); 3805 3806 if (recon_state->msg_version >= 2) { 3807 int num_fcntl_locks, num_flock_locks; 3808 struct ceph_filelock *flocks = NULL; 3809 size_t struct_len, total_len = sizeof(u64); 3810 u8 struct_v = 0; 3811 3812 encode_again: 3813 if (rec.v2.flock_len) { 3814 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 3815 } else { 3816 num_fcntl_locks = 0; 3817 num_flock_locks = 0; 3818 } 3819 if (num_fcntl_locks + num_flock_locks > 0) { 3820 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks, 3821 sizeof(struct ceph_filelock), 3822 GFP_NOFS); 3823 if (!flocks) { 3824 err = -ENOMEM; 3825 goto out_err; 3826 } 3827 err = ceph_encode_locks_to_buffer(inode, flocks, 3828 num_fcntl_locks, 3829 num_flock_locks); 3830 if (err) { 3831 kfree(flocks); 3832 flocks = NULL; 3833 if (err == -ENOSPC) 3834 goto encode_again; 3835 goto out_err; 3836 } 3837 } else { 3838 kfree(flocks); 3839 flocks = NULL; 3840 } 3841 3842 if (recon_state->msg_version >= 3) { 3843 /* version, compat_version and struct_len */ 3844 total_len += 2 * sizeof(u8) + sizeof(u32); 3845 struct_v = 2; 3846 } 3847 /* 3848 * number of encoded locks is stable, so copy to pagelist 3849 */ 3850 struct_len = 2 * sizeof(u32) + 3851 (num_fcntl_locks + num_flock_locks) * 3852 sizeof(struct ceph_filelock); 3853 rec.v2.flock_len = cpu_to_le32(struct_len); 3854 3855 struct_len += sizeof(u32) + pathlen + sizeof(rec.v2); 3856 3857 if (struct_v >= 2) 3858 struct_len += sizeof(u64); /* snap_follows */ 3859 3860 total_len += struct_len; 3861 3862 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 3863 err = send_reconnect_partial(recon_state); 3864 if (err) 3865 goto out_freeflocks; 3866 pagelist = recon_state->pagelist; 3867 } 3868 3869 err = ceph_pagelist_reserve(pagelist, total_len); 3870 if (err) 3871 goto out_freeflocks; 3872 3873 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3874 if (recon_state->msg_version >= 3) { 3875 ceph_pagelist_encode_8(pagelist, struct_v); 3876 ceph_pagelist_encode_8(pagelist, 1); 3877 ceph_pagelist_encode_32(pagelist, struct_len); 3878 } 3879 ceph_pagelist_encode_string(pagelist, path, pathlen); 3880 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 3881 ceph_locks_to_pagelist(flocks, pagelist, 3882 num_fcntl_locks, num_flock_locks); 3883 if (struct_v >= 2) 3884 ceph_pagelist_encode_64(pagelist, snap_follows); 3885 out_freeflocks: 3886 kfree(flocks); 3887 } else { 3888 err = ceph_pagelist_reserve(pagelist, 3889 sizeof(u64) + sizeof(u32) + 3890 pathlen + sizeof(rec.v1)); 3891 if (err) 3892 goto out_err; 3893 3894 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3895 ceph_pagelist_encode_string(pagelist, path, pathlen); 3896 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 3897 } 3898 3899 out_err: 3900 ceph_mdsc_free_path(path, pathlen); 3901 if (!err) 3902 recon_state->nr_caps++; 3903 return err; 3904 } 3905 3906 static int encode_snap_realms(struct ceph_mds_client *mdsc, 3907 struct ceph_reconnect_state *recon_state) 3908 { 3909 struct rb_node *p; 3910 struct ceph_pagelist *pagelist = recon_state->pagelist; 3911 int err = 0; 3912 3913 if (recon_state->msg_version >= 4) { 3914 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); 3915 if (err < 0) 3916 goto fail; 3917 } 3918 3919 /* 3920 * snaprealms. 
we provide mds with the ino, seq (version), and 3921 * parent for all of our realms. If the mds has any newer info, 3922 * it will tell us. 3923 */ 3924 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 3925 struct ceph_snap_realm *realm = 3926 rb_entry(p, struct ceph_snap_realm, node); 3927 struct ceph_mds_snaprealm_reconnect sr_rec; 3928 3929 if (recon_state->msg_version >= 4) { 3930 size_t need = sizeof(u8) * 2 + sizeof(u32) + 3931 sizeof(sr_rec); 3932 3933 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 3934 err = send_reconnect_partial(recon_state); 3935 if (err) 3936 goto fail; 3937 pagelist = recon_state->pagelist; 3938 } 3939 3940 err = ceph_pagelist_reserve(pagelist, need); 3941 if (err) 3942 goto fail; 3943 3944 ceph_pagelist_encode_8(pagelist, 1); 3945 ceph_pagelist_encode_8(pagelist, 1); 3946 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 3947 } 3948 3949 dout(" adding snap realm %llx seq %lld parent %llx\n", 3950 realm->ino, realm->seq, realm->parent_ino); 3951 sr_rec.ino = cpu_to_le64(realm->ino); 3952 sr_rec.seq = cpu_to_le64(realm->seq); 3953 sr_rec.parent = cpu_to_le64(realm->parent_ino); 3954 3955 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 3956 if (err) 3957 goto fail; 3958 3959 recon_state->nr_realms++; 3960 } 3961 fail: 3962 return err; 3963 } 3964 3965 3966 /* 3967 * If an MDS fails and recovers, clients need to reconnect in order to 3968 * reestablish shared state. This includes all caps issued through 3969 * this session _and_ the snap_realm hierarchy. Because it's not 3970 * clear which snap realms the mds cares about, we send everything we 3971 * know about.. that ensures we'll then get any new info the 3972 * recovering MDS might have. 3973 * 3974 * This is a relatively heavyweight operation, but it's rare. 3975 */ 3976 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 3977 struct ceph_mds_session *session) 3978 { 3979 struct ceph_msg *reply; 3980 int mds = session->s_mds; 3981 int err = -ENOMEM; 3982 struct ceph_reconnect_state recon_state = { 3983 .session = session, 3984 }; 3985 LIST_HEAD(dispose); 3986 3987 pr_info("mds%d reconnect start\n", mds); 3988 3989 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 3990 if (!recon_state.pagelist) 3991 goto fail_nopagelist; 3992 3993 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3994 if (!reply) 3995 goto fail_nomsg; 3996 3997 xa_destroy(&session->s_delegated_inos); 3998 3999 mutex_lock(&session->s_mutex); 4000 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 4001 session->s_seq = 0; 4002 4003 dout("session %p state %s\n", session, 4004 ceph_session_state_name(session->s_state)); 4005 4006 atomic_inc(&session->s_cap_gen); 4007 4008 spin_lock(&session->s_cap_lock); 4009 /* don't know if session is readonly */ 4010 session->s_readonly = 0; 4011 /* 4012 * notify __ceph_remove_cap() that we are composing cap reconnect. 4013 * If a cap get released before being added to the cap reconnect, 4014 * __ceph_remove_cap() should skip queuing cap release. 
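 * The flag is cleared again, under s_cap_lock, once the cap walk in
 * ceph_iterate_session_caps() below has completed.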
4015 */ 4016 session->s_cap_reconnect = 1; 4017 /* drop old cap expires; we're about to reestablish that state */ 4018 detach_cap_releases(session, &dispose); 4019 spin_unlock(&session->s_cap_lock); 4020 dispose_cap_releases(mdsc, &dispose); 4021 4022 /* trim unused caps to reduce MDS's cache rejoin time */ 4023 if (mdsc->fsc->sb->s_root) 4024 shrink_dcache_parent(mdsc->fsc->sb->s_root); 4025 4026 ceph_con_close(&session->s_con); 4027 ceph_con_open(&session->s_con, 4028 CEPH_ENTITY_TYPE_MDS, mds, 4029 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 4030 4031 /* replay unsafe requests */ 4032 replay_unsafe_requests(mdsc, session); 4033 4034 ceph_early_kick_flushing_caps(mdsc, session); 4035 4036 down_read(&mdsc->snap_rwsem); 4037 4038 /* placeholder for nr_caps */ 4039 err = ceph_pagelist_encode_32(recon_state.pagelist, 0); 4040 if (err) 4041 goto fail; 4042 4043 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) { 4044 recon_state.msg_version = 3; 4045 recon_state.allow_multi = true; 4046 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) { 4047 recon_state.msg_version = 3; 4048 } else { 4049 recon_state.msg_version = 2; 4050 } 4051 /* traverse this session's caps */ 4052 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state); 4053 4054 spin_lock(&session->s_cap_lock); 4055 session->s_cap_reconnect = 0; 4056 spin_unlock(&session->s_cap_lock); 4057 4058 if (err < 0) 4059 goto fail; 4060 4061 /* check if all realms can be encoded into current message */ 4062 if (mdsc->num_snap_realms) { 4063 size_t total_len = 4064 recon_state.pagelist->length + 4065 mdsc->num_snap_realms * 4066 sizeof(struct ceph_mds_snaprealm_reconnect); 4067 if (recon_state.msg_version >= 4) { 4068 /* number of realms */ 4069 total_len += sizeof(u32); 4070 /* version, compat_version and struct_len */ 4071 total_len += mdsc->num_snap_realms * 4072 (2 * sizeof(u8) + sizeof(u32)); 4073 } 4074 if (total_len > RECONNECT_MAX_SIZE) { 4075 if (!recon_state.allow_multi) { 4076 err = -ENOSPC; 4077 goto fail; 4078 } 4079 if (recon_state.nr_caps) { 4080 err = send_reconnect_partial(&recon_state); 4081 if (err) 4082 goto fail; 4083 } 4084 recon_state.msg_version = 5; 4085 } 4086 } 4087 4088 err = encode_snap_realms(mdsc, &recon_state); 4089 if (err < 0) 4090 goto fail; 4091 4092 if (recon_state.msg_version >= 5) { 4093 err = ceph_pagelist_encode_8(recon_state.pagelist, 0); 4094 if (err < 0) 4095 goto fail; 4096 } 4097 4098 if (recon_state.nr_caps || recon_state.nr_realms) { 4099 struct page *page = 4100 list_first_entry(&recon_state.pagelist->head, 4101 struct page, lru); 4102 __le32 *addr = kmap_atomic(page); 4103 if (recon_state.nr_caps) { 4104 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms); 4105 *addr = cpu_to_le32(recon_state.nr_caps); 4106 } else if (recon_state.msg_version >= 4) { 4107 *(addr + 1) = cpu_to_le32(recon_state.nr_realms); 4108 } 4109 kunmap_atomic(addr); 4110 } 4111 4112 reply->hdr.version = cpu_to_le16(recon_state.msg_version); 4113 if (recon_state.msg_version >= 4) 4114 reply->hdr.compat_version = cpu_to_le16(4); 4115 4116 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length); 4117 ceph_msg_data_add_pagelist(reply, recon_state.pagelist); 4118 4119 ceph_con_send(&session->s_con, reply); 4120 4121 mutex_unlock(&session->s_mutex); 4122 4123 mutex_lock(&mdsc->mutex); 4124 __wake_requests(mdsc, &session->s_waiting); 4125 mutex_unlock(&mdsc->mutex); 4126 4127 up_read(&mdsc->snap_rwsem); 4128 ceph_pagelist_release(recon_state.pagelist); 4129 return; 4130 4131
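/*
 * Error paths: drop the unsent reply, the snap_rwsem read lock and the
 * session mutex, then release the pagelist before logging the failure.
 */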
fail: 4132 ceph_msg_put(reply); 4133 up_read(&mdsc->snap_rwsem); 4134 mutex_unlock(&session->s_mutex); 4135 fail_nomsg: 4136 ceph_pagelist_release(recon_state.pagelist); 4137 fail_nopagelist: 4138 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 4139 return; 4140 } 4141 4142 4143 /* 4144 * compare old and new mdsmaps, kicking requests 4145 * and closing out old connections as necessary 4146 * 4147 * called under mdsc->mutex. 4148 */ 4149 static void check_new_map(struct ceph_mds_client *mdsc, 4150 struct ceph_mdsmap *newmap, 4151 struct ceph_mdsmap *oldmap) 4152 { 4153 int i; 4154 int oldstate, newstate; 4155 struct ceph_mds_session *s; 4156 4157 dout("check_new_map new %u old %u\n", 4158 newmap->m_epoch, oldmap->m_epoch); 4159 4160 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4161 if (!mdsc->sessions[i]) 4162 continue; 4163 s = mdsc->sessions[i]; 4164 oldstate = ceph_mdsmap_get_state(oldmap, i); 4165 newstate = ceph_mdsmap_get_state(newmap, i); 4166 4167 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 4168 i, ceph_mds_state_name(oldstate), 4169 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 4170 ceph_mds_state_name(newstate), 4171 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 4172 ceph_session_state_name(s->s_state)); 4173 4174 if (i >= newmap->possible_max_rank) { 4175 /* force close session for stopped mds */ 4176 ceph_get_mds_session(s); 4177 __unregister_session(mdsc, s); 4178 __wake_requests(mdsc, &s->s_waiting); 4179 mutex_unlock(&mdsc->mutex); 4180 4181 mutex_lock(&s->s_mutex); 4182 cleanup_session_requests(mdsc, s); 4183 remove_session_caps(s); 4184 mutex_unlock(&s->s_mutex); 4185 4186 ceph_put_mds_session(s); 4187 4188 mutex_lock(&mdsc->mutex); 4189 kick_requests(mdsc, i); 4190 continue; 4191 } 4192 4193 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 4194 ceph_mdsmap_get_addr(newmap, i), 4195 sizeof(struct ceph_entity_addr))) { 4196 /* just close it */ 4197 mutex_unlock(&mdsc->mutex); 4198 mutex_lock(&s->s_mutex); 4199 mutex_lock(&mdsc->mutex); 4200 ceph_con_close(&s->s_con); 4201 mutex_unlock(&s->s_mutex); 4202 s->s_state = CEPH_MDS_SESSION_RESTARTING; 4203 } else if (oldstate == newstate) { 4204 continue; /* nothing new with this mds */ 4205 } 4206 4207 /* 4208 * send reconnect? 4209 */ 4210 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 4211 newstate >= CEPH_MDS_STATE_RECONNECT) { 4212 mutex_unlock(&mdsc->mutex); 4213 send_mds_reconnect(mdsc, s); 4214 mutex_lock(&mdsc->mutex); 4215 } 4216 4217 /* 4218 * kick request on any mds that has gone active. 
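 * A newly-active mds can service requests again, so resend the ones
 * assigned to it and kick any cap flushes that were in flight.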
4219 */ 4220 if (oldstate < CEPH_MDS_STATE_ACTIVE && 4221 newstate >= CEPH_MDS_STATE_ACTIVE) { 4222 if (oldstate != CEPH_MDS_STATE_CREATING && 4223 oldstate != CEPH_MDS_STATE_STARTING) 4224 pr_info("mds%d recovery completed\n", s->s_mds); 4225 kick_requests(mdsc, i); 4226 mutex_unlock(&mdsc->mutex); 4227 mutex_lock(&s->s_mutex); 4228 mutex_lock(&mdsc->mutex); 4229 ceph_kick_flushing_caps(mdsc, s); 4230 mutex_unlock(&s->s_mutex); 4231 wake_up_session_caps(s, RECONNECT); 4232 } 4233 } 4234 4235 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4236 s = mdsc->sessions[i]; 4237 if (!s) 4238 continue; 4239 if (!ceph_mdsmap_is_laggy(newmap, i)) 4240 continue; 4241 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4242 s->s_state == CEPH_MDS_SESSION_HUNG || 4243 s->s_state == CEPH_MDS_SESSION_CLOSING) { 4244 dout(" connecting to export targets of laggy mds%d\n", 4245 i); 4246 __open_export_target_sessions(mdsc, s); 4247 } 4248 } 4249 } 4250 4251 4252 4253 /* 4254 * leases 4255 */ 4256 4257 /* 4258 * caller must hold session s_mutex, dentry->d_lock 4259 */ 4260 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 4261 { 4262 struct ceph_dentry_info *di = ceph_dentry(dentry); 4263 4264 ceph_put_mds_session(di->lease_session); 4265 di->lease_session = NULL; 4266 } 4267 4268 static void handle_lease(struct ceph_mds_client *mdsc, 4269 struct ceph_mds_session *session, 4270 struct ceph_msg *msg) 4271 { 4272 struct super_block *sb = mdsc->fsc->sb; 4273 struct inode *inode; 4274 struct dentry *parent, *dentry; 4275 struct ceph_dentry_info *di; 4276 int mds = session->s_mds; 4277 struct ceph_mds_lease *h = msg->front.iov_base; 4278 u32 seq; 4279 struct ceph_vino vino; 4280 struct qstr dname; 4281 int release = 0; 4282 4283 dout("handle_lease from mds%d\n", mds); 4284 4285 /* decode */ 4286 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 4287 goto bad; 4288 vino.ino = le64_to_cpu(h->ino); 4289 vino.snap = CEPH_NOSNAP; 4290 seq = le32_to_cpu(h->seq); 4291 dname.len = get_unaligned_le32(h + 1); 4292 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 4293 goto bad; 4294 dname.name = (void *)(h + 1) + sizeof(u32); 4295 4296 /* lookup inode */ 4297 inode = ceph_find_inode(sb, vino); 4298 dout("handle_lease %s, ino %llx %p %.*s\n", 4299 ceph_lease_op_name(h->action), vino.ino, inode, 4300 dname.len, dname.name); 4301 4302 mutex_lock(&session->s_mutex); 4303 inc_session_sequence(session); 4304 4305 if (!inode) { 4306 dout("handle_lease no inode %llx\n", vino.ino); 4307 goto release; 4308 } 4309 4310 /* dentry */ 4311 parent = d_find_alias(inode); 4312 if (!parent) { 4313 dout("no parent dentry on inode %p\n", inode); 4314 WARN_ON(1); 4315 goto release; /* hrm... 
*/ 4316 } 4317 dname.hash = full_name_hash(parent, dname.name, dname.len); 4318 dentry = d_lookup(parent, &dname); 4319 dput(parent); 4320 if (!dentry) 4321 goto release; 4322 4323 spin_lock(&dentry->d_lock); 4324 di = ceph_dentry(dentry); 4325 switch (h->action) { 4326 case CEPH_MDS_LEASE_REVOKE: 4327 if (di->lease_session == session) { 4328 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 4329 h->seq = cpu_to_le32(di->lease_seq); 4330 __ceph_mdsc_drop_dentry_lease(dentry); 4331 } 4332 release = 1; 4333 break; 4334 4335 case CEPH_MDS_LEASE_RENEW: 4336 if (di->lease_session == session && 4337 di->lease_gen == atomic_read(&session->s_cap_gen) && 4338 di->lease_renew_from && 4339 di->lease_renew_after == 0) { 4340 unsigned long duration = 4341 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 4342 4343 di->lease_seq = seq; 4344 di->time = di->lease_renew_from + duration; 4345 di->lease_renew_after = di->lease_renew_from + 4346 (duration >> 1); 4347 di->lease_renew_from = 0; 4348 } 4349 break; 4350 } 4351 spin_unlock(&dentry->d_lock); 4352 dput(dentry); 4353 4354 if (!release) 4355 goto out; 4356 4357 release: 4358 /* let's just reuse the same message */ 4359 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 4360 ceph_msg_get(msg); 4361 ceph_con_send(&session->s_con, msg); 4362 4363 out: 4364 mutex_unlock(&session->s_mutex); 4365 iput(inode); 4366 return; 4367 4368 bad: 4369 pr_err("corrupt lease message\n"); 4370 ceph_msg_dump(msg); 4371 } 4372 4373 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 4374 struct dentry *dentry, char action, 4375 u32 seq) 4376 { 4377 struct ceph_msg *msg; 4378 struct ceph_mds_lease *lease; 4379 struct inode *dir; 4380 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; 4381 4382 dout("lease_send_msg identry %p %s to mds%d\n", 4383 dentry, ceph_lease_op_name(action), session->s_mds); 4384 4385 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 4386 if (!msg) 4387 return; 4388 lease = msg->front.iov_base; 4389 lease->action = action; 4390 lease->seq = cpu_to_le32(seq); 4391 4392 spin_lock(&dentry->d_lock); 4393 dir = d_inode(dentry->d_parent); 4394 lease->ino = cpu_to_le64(ceph_ino(dir)); 4395 lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); 4396 4397 put_unaligned_le32(dentry->d_name.len, lease + 1); 4398 memcpy((void *)(lease + 1) + 4, 4399 dentry->d_name.name, dentry->d_name.len); 4400 spin_unlock(&dentry->d_lock); 4401 /* 4402 * if this is a preemptive lease RELEASE, no need to 4403 * flush request stream, since the actual request will 4404 * soon follow. 
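 * (more_to_follow is presumably just a hint to the messenger that
 * another message is about to be queued, so the two can be batched on
 * the wire.)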
4405 */ 4406 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE); 4407 4408 ceph_con_send(&session->s_con, msg); 4409 } 4410 4411 /* 4412 * lock unlock sessions, to wait ongoing session activities 4413 */ 4414 static void lock_unlock_sessions(struct ceph_mds_client *mdsc) 4415 { 4416 int i; 4417 4418 mutex_lock(&mdsc->mutex); 4419 for (i = 0; i < mdsc->max_sessions; i++) { 4420 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4421 if (!s) 4422 continue; 4423 mutex_unlock(&mdsc->mutex); 4424 mutex_lock(&s->s_mutex); 4425 mutex_unlock(&s->s_mutex); 4426 ceph_put_mds_session(s); 4427 mutex_lock(&mdsc->mutex); 4428 } 4429 mutex_unlock(&mdsc->mutex); 4430 } 4431 4432 static void maybe_recover_session(struct ceph_mds_client *mdsc) 4433 { 4434 struct ceph_fs_client *fsc = mdsc->fsc; 4435 4436 if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) 4437 return; 4438 4439 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED) 4440 return; 4441 4442 if (!READ_ONCE(fsc->blocklisted)) 4443 return; 4444 4445 pr_info("auto reconnect after blocklisted\n"); 4446 ceph_force_reconnect(fsc->sb); 4447 } 4448 4449 bool check_session_state(struct ceph_mds_session *s) 4450 { 4451 switch (s->s_state) { 4452 case CEPH_MDS_SESSION_OPEN: 4453 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 4454 s->s_state = CEPH_MDS_SESSION_HUNG; 4455 pr_info("mds%d hung\n", s->s_mds); 4456 } 4457 break; 4458 case CEPH_MDS_SESSION_CLOSING: 4459 /* Should never reach this when we're unmounting */ 4460 WARN_ON_ONCE(s->s_ttl); 4461 fallthrough; 4462 case CEPH_MDS_SESSION_NEW: 4463 case CEPH_MDS_SESSION_RESTARTING: 4464 case CEPH_MDS_SESSION_CLOSED: 4465 case CEPH_MDS_SESSION_REJECTED: 4466 return false; 4467 } 4468 4469 return true; 4470 } 4471 4472 /* 4473 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply, 4474 * then we need to retransmit that request. 4475 */ 4476 void inc_session_sequence(struct ceph_mds_session *s) 4477 { 4478 lockdep_assert_held(&s->s_mutex); 4479 4480 s->s_seq++; 4481 4482 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 4483 int ret; 4484 4485 dout("resending session close request for mds%d\n", s->s_mds); 4486 ret = request_close_session(s); 4487 if (ret < 0) 4488 pr_err("unable to close session to mds%d: %d\n", 4489 s->s_mds, ret); 4490 } 4491 } 4492 4493 /* 4494 * delayed work -- periodically trim expired leases, renew caps with mds. If 4495 * the @delay parameter is set to 0 or if it's more than 5 secs, the default 4496 * workqueue delay value of 5 secs will be used. 
4497 */ 4498 static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay) 4499 { 4500 unsigned long max_delay = HZ * 5; 4501 4502 /* 5 secs default delay */ 4503 if (!delay || (delay > max_delay)) 4504 delay = max_delay; 4505 schedule_delayed_work(&mdsc->delayed_work, 4506 round_jiffies_relative(delay)); 4507 } 4508 4509 static void delayed_work(struct work_struct *work) 4510 { 4511 struct ceph_mds_client *mdsc = 4512 container_of(work, struct ceph_mds_client, delayed_work.work); 4513 unsigned long delay; 4514 int renew_interval; 4515 int renew_caps; 4516 int i; 4517 4518 dout("mdsc delayed_work\n"); 4519 4520 if (mdsc->stopping) 4521 return; 4522 4523 mutex_lock(&mdsc->mutex); 4524 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 4525 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 4526 mdsc->last_renew_caps); 4527 if (renew_caps) 4528 mdsc->last_renew_caps = jiffies; 4529 4530 for (i = 0; i < mdsc->max_sessions; i++) { 4531 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4532 if (!s) 4533 continue; 4534 4535 if (!check_session_state(s)) { 4536 ceph_put_mds_session(s); 4537 continue; 4538 } 4539 mutex_unlock(&mdsc->mutex); 4540 4541 mutex_lock(&s->s_mutex); 4542 if (renew_caps) 4543 send_renew_caps(mdsc, s); 4544 else 4545 ceph_con_keepalive(&s->s_con); 4546 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4547 s->s_state == CEPH_MDS_SESSION_HUNG) 4548 ceph_send_cap_releases(mdsc, s); 4549 mutex_unlock(&s->s_mutex); 4550 ceph_put_mds_session(s); 4551 4552 mutex_lock(&mdsc->mutex); 4553 } 4554 mutex_unlock(&mdsc->mutex); 4555 4556 delay = ceph_check_delayed_caps(mdsc); 4557 4558 ceph_queue_cap_reclaim_work(mdsc); 4559 4560 ceph_trim_snapid_map(mdsc); 4561 4562 maybe_recover_session(mdsc); 4563 4564 schedule_delayed(mdsc, delay); 4565 } 4566 4567 int ceph_mdsc_init(struct ceph_fs_client *fsc) 4568 4569 { 4570 struct ceph_mds_client *mdsc; 4571 int err; 4572 4573 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 4574 if (!mdsc) 4575 return -ENOMEM; 4576 mdsc->fsc = fsc; 4577 mutex_init(&mdsc->mutex); 4578 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 4579 if (!mdsc->mdsmap) { 4580 err = -ENOMEM; 4581 goto err_mdsc; 4582 } 4583 4584 init_completion(&mdsc->safe_umount_waiters); 4585 init_waitqueue_head(&mdsc->session_close_wq); 4586 INIT_LIST_HEAD(&mdsc->waiting_for_map); 4587 mdsc->sessions = NULL; 4588 atomic_set(&mdsc->num_sessions, 0); 4589 mdsc->max_sessions = 0; 4590 mdsc->stopping = 0; 4591 atomic64_set(&mdsc->quotarealms_count, 0); 4592 mdsc->quotarealms_inodes = RB_ROOT; 4593 mutex_init(&mdsc->quotarealms_inodes_mutex); 4594 mdsc->last_snap_seq = 0; 4595 init_rwsem(&mdsc->snap_rwsem); 4596 mdsc->snap_realms = RB_ROOT; 4597 INIT_LIST_HEAD(&mdsc->snap_empty); 4598 mdsc->num_snap_realms = 0; 4599 spin_lock_init(&mdsc->snap_empty_lock); 4600 mdsc->last_tid = 0; 4601 mdsc->oldest_tid = 0; 4602 mdsc->request_tree = RB_ROOT; 4603 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 4604 mdsc->last_renew_caps = jiffies; 4605 INIT_LIST_HEAD(&mdsc->cap_delay_list); 4606 INIT_LIST_HEAD(&mdsc->cap_wait_list); 4607 spin_lock_init(&mdsc->cap_delay_lock); 4608 INIT_LIST_HEAD(&mdsc->snap_flush_list); 4609 spin_lock_init(&mdsc->snap_flush_lock); 4610 mdsc->last_cap_flush_tid = 1; 4611 INIT_LIST_HEAD(&mdsc->cap_flush_list); 4612 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 4613 mdsc->num_cap_flushing = 0; 4614 spin_lock_init(&mdsc->cap_dirty_lock); 4615 init_waitqueue_head(&mdsc->cap_flushing_wq); 4616 INIT_WORK(&mdsc->cap_reclaim_work, 
ceph_cap_reclaim_work); 4617 atomic_set(&mdsc->cap_reclaim_pending, 0); 4618 err = ceph_metric_init(&mdsc->metric); 4619 if (err) 4620 goto err_mdsmap; 4621 4622 spin_lock_init(&mdsc->dentry_list_lock); 4623 INIT_LIST_HEAD(&mdsc->dentry_leases); 4624 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 4625 4626 ceph_caps_init(mdsc); 4627 ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 4628 4629 spin_lock_init(&mdsc->snapid_map_lock); 4630 mdsc->snapid_map_tree = RB_ROOT; 4631 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 4632 4633 init_rwsem(&mdsc->pool_perm_rwsem); 4634 mdsc->pool_perm_tree = RB_ROOT; 4635 4636 strscpy(mdsc->nodename, utsname()->nodename, 4637 sizeof(mdsc->nodename)); 4638 4639 fsc->mdsc = mdsc; 4640 return 0; 4641 4642 err_mdsmap: 4643 kfree(mdsc->mdsmap); 4644 err_mdsc: 4645 kfree(mdsc); 4646 return err; 4647 } 4648 4649 /* 4650 * Wait for safe replies on open mds requests. If we time out, drop 4651 * all requests from the tree to avoid dangling dentry refs. 4652 */ 4653 static void wait_requests(struct ceph_mds_client *mdsc) 4654 { 4655 struct ceph_options *opts = mdsc->fsc->client->options; 4656 struct ceph_mds_request *req; 4657 4658 mutex_lock(&mdsc->mutex); 4659 if (__get_oldest_req(mdsc)) { 4660 mutex_unlock(&mdsc->mutex); 4661 4662 dout("wait_requests waiting for requests\n"); 4663 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 4664 ceph_timeout_jiffies(opts->mount_timeout)); 4665 4666 /* tear down remaining requests */ 4667 mutex_lock(&mdsc->mutex); 4668 while ((req = __get_oldest_req(mdsc))) { 4669 dout("wait_requests timed out on tid %llu\n", 4670 req->r_tid); 4671 list_del_init(&req->r_wait); 4672 __unregister_request(mdsc, req); 4673 } 4674 } 4675 mutex_unlock(&mdsc->mutex); 4676 dout("wait_requests done\n"); 4677 } 4678 4679 /* 4680 * called before mount is ro, and before dentries are torn down. 4681 * (hmm, does this still race with new lookups?) 4682 */ 4683 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 4684 { 4685 dout("pre_umount\n"); 4686 mdsc->stopping = 1; 4687 4688 lock_unlock_sessions(mdsc); 4689 ceph_flush_dirty_caps(mdsc); 4690 wait_requests(mdsc); 4691 4692 /* 4693 * wait for reply handlers to drop their request refs and 4694 * their inode/dcache refs 4695 */ 4696 ceph_msgr_flush(); 4697 4698 ceph_cleanup_quotarealms_inodes(mdsc); 4699 } 4700 4701 /* 4702 * wait for all write mds requests to flush. 4703 */ 4704 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) 4705 { 4706 struct ceph_mds_request *req = NULL, *nextreq; 4707 struct rb_node *n; 4708 4709 mutex_lock(&mdsc->mutex); 4710 dout("wait_unsafe_requests want %lld\n", want_tid); 4711 restart: 4712 req = __get_oldest_req(mdsc); 4713 while (req && req->r_tid <= want_tid) { 4714 /* find next request */ 4715 n = rb_next(&req->r_node); 4716 if (n) 4717 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 4718 else 4719 nextreq = NULL; 4720 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 4721 (req->r_op & CEPH_MDS_OP_WRITE)) { 4722 /* write op */ 4723 ceph_mdsc_get_request(req); 4724 if (nextreq) 4725 ceph_mdsc_get_request(nextreq); 4726 mutex_unlock(&mdsc->mutex); 4727 dout("wait_unsafe_requests wait on %llu (want %llu)\n", 4728 req->r_tid, want_tid); 4729 wait_for_completion(&req->r_safe_completion); 4730 mutex_lock(&mdsc->mutex); 4731 ceph_mdsc_put_request(req); 4732 if (!nextreq) 4733 break; /* next dne before, so we're done! 
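 * ("dne" = did not exist: there was no later request in the tree
 * before we slept, and anything created since will have a tid above
 * want_tid.)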
*/ 4734 if (RB_EMPTY_NODE(&nextreq->r_node)) { 4735 /* next request was removed from tree */ 4736 ceph_mdsc_put_request(nextreq); 4737 goto restart; 4738 } 4739 ceph_mdsc_put_request(nextreq); /* won't go away */ 4740 } 4741 req = nextreq; 4742 } 4743 mutex_unlock(&mdsc->mutex); 4744 dout("wait_unsafe_requests done\n"); 4745 } 4746 4747 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 4748 { 4749 u64 want_tid, want_flush; 4750 4751 if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) 4752 return; 4753 4754 dout("sync\n"); 4755 mutex_lock(&mdsc->mutex); 4756 want_tid = mdsc->last_tid; 4757 mutex_unlock(&mdsc->mutex); 4758 4759 ceph_flush_dirty_caps(mdsc); 4760 spin_lock(&mdsc->cap_dirty_lock); 4761 want_flush = mdsc->last_cap_flush_tid; 4762 if (!list_empty(&mdsc->cap_flush_list)) { 4763 struct ceph_cap_flush *cf = 4764 list_last_entry(&mdsc->cap_flush_list, 4765 struct ceph_cap_flush, g_list); 4766 cf->wake = true; 4767 } 4768 spin_unlock(&mdsc->cap_dirty_lock); 4769 4770 dout("sync want tid %lld flush_seq %lld\n", 4771 want_tid, want_flush); 4772 4773 wait_unsafe_requests(mdsc, want_tid); 4774 wait_caps_flush(mdsc, want_flush); 4775 } 4776 4777 /* 4778 * true if all sessions are closed, or we force unmount 4779 */ 4780 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 4781 { 4782 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4783 return true; 4784 return atomic_read(&mdsc->num_sessions) <= skipped; 4785 } 4786 4787 /* 4788 * called after sb is ro. 4789 */ 4790 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 4791 { 4792 struct ceph_options *opts = mdsc->fsc->client->options; 4793 struct ceph_mds_session *session; 4794 int i; 4795 int skipped = 0; 4796 4797 dout("close_sessions\n"); 4798 4799 /* close sessions */ 4800 mutex_lock(&mdsc->mutex); 4801 for (i = 0; i < mdsc->max_sessions; i++) { 4802 session = __ceph_lookup_mds_session(mdsc, i); 4803 if (!session) 4804 continue; 4805 mutex_unlock(&mdsc->mutex); 4806 mutex_lock(&session->s_mutex); 4807 if (__close_session(mdsc, session) <= 0) 4808 skipped++; 4809 mutex_unlock(&session->s_mutex); 4810 ceph_put_mds_session(session); 4811 mutex_lock(&mdsc->mutex); 4812 } 4813 mutex_unlock(&mdsc->mutex); 4814 4815 dout("waiting for sessions to close\n"); 4816 wait_event_timeout(mdsc->session_close_wq, 4817 done_closing_sessions(mdsc, skipped), 4818 ceph_timeout_jiffies(opts->mount_timeout)); 4819 4820 /* tear down remaining sessions */ 4821 mutex_lock(&mdsc->mutex); 4822 for (i = 0; i < mdsc->max_sessions; i++) { 4823 if (mdsc->sessions[i]) { 4824 session = ceph_get_mds_session(mdsc->sessions[i]); 4825 __unregister_session(mdsc, session); 4826 mutex_unlock(&mdsc->mutex); 4827 mutex_lock(&session->s_mutex); 4828 remove_session_caps(session); 4829 mutex_unlock(&session->s_mutex); 4830 ceph_put_mds_session(session); 4831 mutex_lock(&mdsc->mutex); 4832 } 4833 } 4834 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 4835 mutex_unlock(&mdsc->mutex); 4836 4837 ceph_cleanup_snapid_map(mdsc); 4838 ceph_cleanup_empty_realms(mdsc); 4839 4840 cancel_work_sync(&mdsc->cap_reclaim_work); 4841 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 4842 4843 dout("stopped\n"); 4844 } 4845 4846 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 4847 { 4848 struct ceph_mds_session *session; 4849 int mds; 4850 4851 dout("force umount\n"); 4852 4853 mutex_lock(&mdsc->mutex); 4854 for (mds = 0; mds < mdsc->max_sessions; mds++) { 4855 session = __ceph_lookup_mds_session(mdsc, mds); 4856 if 
(!session) 4857 continue; 4858 4859 if (session->s_state == CEPH_MDS_SESSION_REJECTED) 4860 __unregister_session(mdsc, session); 4861 __wake_requests(mdsc, &session->s_waiting); 4862 mutex_unlock(&mdsc->mutex); 4863 4864 mutex_lock(&session->s_mutex); 4865 __close_session(mdsc, session); 4866 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 4867 cleanup_session_requests(mdsc, session); 4868 remove_session_caps(session); 4869 } 4870 mutex_unlock(&session->s_mutex); 4871 ceph_put_mds_session(session); 4872 4873 mutex_lock(&mdsc->mutex); 4874 kick_requests(mdsc, mds); 4875 } 4876 __wake_requests(mdsc, &mdsc->waiting_for_map); 4877 mutex_unlock(&mdsc->mutex); 4878 } 4879 4880 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 4881 { 4882 dout("stop\n"); 4883 /* 4884 * Make sure the delayed work stopped before releasing 4885 * the resources. 4886 * 4887 * Because the cancel_delayed_work_sync() will only 4888 * guarantee that the work finishes executing. But the 4889 * delayed work will re-arm itself again after that. 4890 */ 4891 flush_delayed_work(&mdsc->delayed_work); 4892 4893 if (mdsc->mdsmap) 4894 ceph_mdsmap_destroy(mdsc->mdsmap); 4895 kfree(mdsc->sessions); 4896 ceph_caps_finalize(mdsc); 4897 ceph_pool_perm_destroy(mdsc); 4898 } 4899 4900 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 4901 { 4902 struct ceph_mds_client *mdsc = fsc->mdsc; 4903 dout("mdsc_destroy %p\n", mdsc); 4904 4905 if (!mdsc) 4906 return; 4907 4908 /* flush out any connection work with references to us */ 4909 ceph_msgr_flush(); 4910 4911 ceph_mdsc_stop(mdsc); 4912 4913 ceph_metric_destroy(&mdsc->metric); 4914 4915 flush_delayed_work(&mdsc->metric.delayed_work); 4916 fsc->mdsc = NULL; 4917 kfree(mdsc); 4918 dout("mdsc_destroy %p done\n", mdsc); 4919 } 4920 4921 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4922 { 4923 struct ceph_fs_client *fsc = mdsc->fsc; 4924 const char *mds_namespace = fsc->mount_options->mds_namespace; 4925 void *p = msg->front.iov_base; 4926 void *end = p + msg->front.iov_len; 4927 u32 epoch; 4928 u32 num_fs; 4929 u32 mount_fscid = (u32)-1; 4930 int err = -EINVAL; 4931 4932 ceph_decode_need(&p, end, sizeof(u32), bad); 4933 epoch = ceph_decode_32(&p); 4934 4935 dout("handle_fsmap epoch %u\n", epoch); 4936 4937 /* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */ 4938 ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad); 4939 4940 ceph_decode_32_safe(&p, end, num_fs, bad); 4941 while (num_fs-- > 0) { 4942 void *info_p, *info_end; 4943 u32 info_len; 4944 u32 fscid, namelen; 4945 4946 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 4947 p += 2; // info_v, info_cv 4948 info_len = ceph_decode_32(&p); 4949 ceph_decode_need(&p, end, info_len, bad); 4950 info_p = p; 4951 info_end = p + info_len; 4952 p = info_end; 4953 4954 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); 4955 fscid = ceph_decode_32(&info_p); 4956 namelen = ceph_decode_32(&info_p); 4957 ceph_decode_need(&info_p, info_end, namelen, bad); 4958 4959 if (mds_namespace && 4960 strlen(mds_namespace) == namelen && 4961 !strncmp(mds_namespace, (char *)info_p, namelen)) { 4962 mount_fscid = fscid; 4963 break; 4964 } 4965 } 4966 4967 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); 4968 if (mount_fscid != (u32)-1) { 4969 fsc->client->monc.fs_cluster_id = mount_fscid; 4970 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 4971 0, true); 4972 ceph_monc_renew_subs(&fsc->client->monc); 4973 } else { 4974 err = -ENOENT; 4975 goto err_out; 4976 } 4977 return; 4978 
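/*
 * Decode failures land at "bad"; a map without the requested
 * mds_namespace takes the -ENOENT path above. Both end up at
 * "err_out", which records the error and wakes any map waiters.
 */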
4979 bad: 4980 pr_err("error decoding fsmap\n"); 4981 err_out: 4982 mutex_lock(&mdsc->mutex); 4983 mdsc->mdsmap_err = err; 4984 __wake_requests(mdsc, &mdsc->waiting_for_map); 4985 mutex_unlock(&mdsc->mutex); 4986 } 4987 4988 /* 4989 * handle mds map update. 4990 */ 4991 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4992 { 4993 u32 epoch; 4994 u32 maplen; 4995 void *p = msg->front.iov_base; 4996 void *end = p + msg->front.iov_len; 4997 struct ceph_mdsmap *newmap, *oldmap; 4998 struct ceph_fsid fsid; 4999 int err = -EINVAL; 5000 5001 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 5002 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 5003 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 5004 return; 5005 epoch = ceph_decode_32(&p); 5006 maplen = ceph_decode_32(&p); 5007 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 5008 5009 /* do we need it? */ 5010 mutex_lock(&mdsc->mutex); 5011 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 5012 dout("handle_map epoch %u <= our %u\n", 5013 epoch, mdsc->mdsmap->m_epoch); 5014 mutex_unlock(&mdsc->mutex); 5015 return; 5016 } 5017 5018 newmap = ceph_mdsmap_decode(&p, end, ceph_msgr2(mdsc->fsc->client)); 5019 if (IS_ERR(newmap)) { 5020 err = PTR_ERR(newmap); 5021 goto bad_unlock; 5022 } 5023 5024 /* swap into place */ 5025 if (mdsc->mdsmap) { 5026 oldmap = mdsc->mdsmap; 5027 mdsc->mdsmap = newmap; 5028 check_new_map(mdsc, newmap, oldmap); 5029 ceph_mdsmap_destroy(oldmap); 5030 } else { 5031 mdsc->mdsmap = newmap; /* first mds map */ 5032 } 5033 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size, 5034 MAX_LFS_FILESIZE); 5035 5036 __wake_requests(mdsc, &mdsc->waiting_for_map); 5037 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP, 5038 mdsc->mdsmap->m_epoch); 5039 5040 mutex_unlock(&mdsc->mutex); 5041 schedule_delayed(mdsc, 0); 5042 return; 5043 5044 bad_unlock: 5045 mutex_unlock(&mdsc->mutex); 5046 bad: 5047 pr_err("error decoding mdsmap %d\n", err); 5048 return; 5049 } 5050 5051 static struct ceph_connection *mds_get_con(struct ceph_connection *con) 5052 { 5053 struct ceph_mds_session *s = con->private; 5054 5055 if (ceph_get_mds_session(s)) 5056 return con; 5057 return NULL; 5058 } 5059 5060 static void mds_put_con(struct ceph_connection *con) 5061 { 5062 struct ceph_mds_session *s = con->private; 5063 5064 ceph_put_mds_session(s); 5065 } 5066 5067 /* 5068 * if the client is unresponsive for long enough, the mds will kill 5069 * the session entirely. 
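 * A peer reset therefore most likely means our session was dropped on
 * the mds side; warn and try to reconnect and reestablish our state.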
5070 */ 5071 static void mds_peer_reset(struct ceph_connection *con) 5072 { 5073 struct ceph_mds_session *s = con->private; 5074 struct ceph_mds_client *mdsc = s->s_mdsc; 5075 5076 pr_warn("mds%d closed our session\n", s->s_mds); 5077 send_mds_reconnect(mdsc, s); 5078 } 5079 5080 static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg) 5081 { 5082 struct ceph_mds_session *s = con->private; 5083 struct ceph_mds_client *mdsc = s->s_mdsc; 5084 int type = le16_to_cpu(msg->hdr.type); 5085 5086 mutex_lock(&mdsc->mutex); 5087 if (__verify_registered_session(mdsc, s) < 0) { 5088 mutex_unlock(&mdsc->mutex); 5089 goto out; 5090 } 5091 mutex_unlock(&mdsc->mutex); 5092 5093 switch (type) { 5094 case CEPH_MSG_MDS_MAP: 5095 ceph_mdsc_handle_mdsmap(mdsc, msg); 5096 break; 5097 case CEPH_MSG_FS_MAP_USER: 5098 ceph_mdsc_handle_fsmap(mdsc, msg); 5099 break; 5100 case CEPH_MSG_CLIENT_SESSION: 5101 handle_session(s, msg); 5102 break; 5103 case CEPH_MSG_CLIENT_REPLY: 5104 handle_reply(s, msg); 5105 break; 5106 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 5107 handle_forward(mdsc, s, msg); 5108 break; 5109 case CEPH_MSG_CLIENT_CAPS: 5110 ceph_handle_caps(s, msg); 5111 break; 5112 case CEPH_MSG_CLIENT_SNAP: 5113 ceph_handle_snap(mdsc, s, msg); 5114 break; 5115 case CEPH_MSG_CLIENT_LEASE: 5116 handle_lease(mdsc, s, msg); 5117 break; 5118 case CEPH_MSG_CLIENT_QUOTA: 5119 ceph_handle_quota(mdsc, s, msg); 5120 break; 5121 5122 default: 5123 pr_err("received unknown message type %d %s\n", type, 5124 ceph_msg_type_name(type)); 5125 } 5126 out: 5127 ceph_msg_put(msg); 5128 } 5129 5130 /* 5131 * authentication 5132 */ 5133 5134 /* 5135 * Note: returned pointer is the address of a structure that's 5136 * managed separately. Caller must *not* attempt to free it. 5137 */ 5138 static struct ceph_auth_handshake * 5139 mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new) 5140 { 5141 struct ceph_mds_session *s = con->private; 5142 struct ceph_mds_client *mdsc = s->s_mdsc; 5143 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5144 struct ceph_auth_handshake *auth = &s->s_auth; 5145 int ret; 5146 5147 ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 5148 force_new, proto, NULL, NULL); 5149 if (ret) 5150 return ERR_PTR(ret); 5151 5152 return auth; 5153 } 5154 5155 static int mds_add_authorizer_challenge(struct ceph_connection *con, 5156 void *challenge_buf, int challenge_buf_len) 5157 { 5158 struct ceph_mds_session *s = con->private; 5159 struct ceph_mds_client *mdsc = s->s_mdsc; 5160 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5161 5162 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer, 5163 challenge_buf, challenge_buf_len); 5164 } 5165 5166 static int mds_verify_authorizer_reply(struct ceph_connection *con) 5167 { 5168 struct ceph_mds_session *s = con->private; 5169 struct ceph_mds_client *mdsc = s->s_mdsc; 5170 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5171 struct ceph_auth_handshake *auth = &s->s_auth; 5172 5173 return ceph_auth_verify_authorizer_reply(ac, auth->authorizer, 5174 auth->authorizer_reply_buf, auth->authorizer_reply_buf_len, 5175 NULL, NULL, NULL, NULL); 5176 } 5177 5178 static int mds_invalidate_authorizer(struct ceph_connection *con) 5179 { 5180 struct ceph_mds_session *s = con->private; 5181 struct ceph_mds_client *mdsc = s->s_mdsc; 5182 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5183 5184 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 5185 5186 return 
ceph_monc_validate_auth(&mdsc->fsc->client->monc); 5187 } 5188 5189 static int mds_get_auth_request(struct ceph_connection *con, 5190 void *buf, int *buf_len, 5191 void **authorizer, int *authorizer_len) 5192 { 5193 struct ceph_mds_session *s = con->private; 5194 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5195 struct ceph_auth_handshake *auth = &s->s_auth; 5196 int ret; 5197 5198 ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 5199 buf, buf_len); 5200 if (ret) 5201 return ret; 5202 5203 *authorizer = auth->authorizer_buf; 5204 *authorizer_len = auth->authorizer_buf_len; 5205 return 0; 5206 } 5207 5208 static int mds_handle_auth_reply_more(struct ceph_connection *con, 5209 void *reply, int reply_len, 5210 void *buf, int *buf_len, 5211 void **authorizer, int *authorizer_len) 5212 { 5213 struct ceph_mds_session *s = con->private; 5214 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5215 struct ceph_auth_handshake *auth = &s->s_auth; 5216 int ret; 5217 5218 ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len, 5219 buf, buf_len); 5220 if (ret) 5221 return ret; 5222 5223 *authorizer = auth->authorizer_buf; 5224 *authorizer_len = auth->authorizer_buf_len; 5225 return 0; 5226 } 5227 5228 static int mds_handle_auth_done(struct ceph_connection *con, 5229 u64 global_id, void *reply, int reply_len, 5230 u8 *session_key, int *session_key_len, 5231 u8 *con_secret, int *con_secret_len) 5232 { 5233 struct ceph_mds_session *s = con->private; 5234 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5235 struct ceph_auth_handshake *auth = &s->s_auth; 5236 5237 return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len, 5238 session_key, session_key_len, 5239 con_secret, con_secret_len); 5240 } 5241 5242 static int mds_handle_auth_bad_method(struct ceph_connection *con, 5243 int used_proto, int result, 5244 const int *allowed_protos, int proto_cnt, 5245 const int *allowed_modes, int mode_cnt) 5246 { 5247 struct ceph_mds_session *s = con->private; 5248 struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc; 5249 int ret; 5250 5251 if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS, 5252 used_proto, result, 5253 allowed_protos, proto_cnt, 5254 allowed_modes, mode_cnt)) { 5255 ret = ceph_monc_validate_auth(monc); 5256 if (ret) 5257 return ret; 5258 } 5259 5260 return -EACCES; 5261 } 5262 5263 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, 5264 struct ceph_msg_header *hdr, int *skip) 5265 { 5266 struct ceph_msg *msg; 5267 int type = (int) le16_to_cpu(hdr->type); 5268 int front_len = (int) le32_to_cpu(hdr->front_len); 5269 5270 if (con->in_msg) 5271 return con->in_msg; 5272 5273 *skip = 0; 5274 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 5275 if (!msg) { 5276 pr_err("unable to allocate msg type %d len %d\n", 5277 type, front_len); 5278 return NULL; 5279 } 5280 5281 return msg; 5282 } 5283 5284 static int mds_sign_message(struct ceph_msg *msg) 5285 { 5286 struct ceph_mds_session *s = msg->con->private; 5287 struct ceph_auth_handshake *auth = &s->s_auth; 5288 5289 return ceph_auth_sign_message(auth, msg); 5290 } 5291 5292 static int mds_check_message_signature(struct ceph_msg *msg) 5293 { 5294 struct ceph_mds_session *s = msg->con->private; 5295 struct ceph_auth_handshake *auth = &s->s_auth; 5296 5297 return ceph_auth_check_message_signature(auth, msg); 5298 } 5299 5300 static const struct ceph_connection_operations mds_con_ops = { 5301 .get = mds_get_con, 5302 .put = 
mds_put_con, 5303 .alloc_msg = mds_alloc_msg, 5304 .dispatch = mds_dispatch, 5305 .peer_reset = mds_peer_reset, 5306 .get_authorizer = mds_get_authorizer, 5307 .add_authorizer_challenge = mds_add_authorizer_challenge, 5308 .verify_authorizer_reply = mds_verify_authorizer_reply, 5309 .invalidate_authorizer = mds_invalidate_authorizer, 5310 .sign_message = mds_sign_message, 5311 .check_message_signature = mds_check_message_signature, 5312 .get_auth_request = mds_get_auth_request, 5313 .handle_auth_reply_more = mds_handle_auth_reply_more, 5314 .handle_auth_done = mds_handle_auth_done, 5315 .handle_auth_bad_method = mds_handle_auth_bad_method, 5316 }; 5317 5318 /* eof */ 5319