1 // SPDX-License-Identifier: GPL-2.0 2 #include <linux/ceph/ceph_debug.h> 3 4 #include <linux/fs.h> 5 #include <linux/wait.h> 6 #include <linux/slab.h> 7 #include <linux/gfp.h> 8 #include <linux/sched.h> 9 #include <linux/debugfs.h> 10 #include <linux/seq_file.h> 11 #include <linux/ratelimit.h> 12 #include <linux/bits.h> 13 #include <linux/ktime.h> 14 15 #include "super.h" 16 #include "mds_client.h" 17 18 #include <linux/ceph/ceph_features.h> 19 #include <linux/ceph/messenger.h> 20 #include <linux/ceph/decode.h> 21 #include <linux/ceph/pagelist.h> 22 #include <linux/ceph/auth.h> 23 #include <linux/ceph/debugfs.h> 24 25 #define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE) 26 27 /* 28 * A cluster of MDS (metadata server) daemons is responsible for 29 * managing the file system namespace (the directory hierarchy and 30 * inodes) and for coordinating shared access to storage. Metadata is 31 * partitioning hierarchically across a number of servers, and that 32 * partition varies over time as the cluster adjusts the distribution 33 * in order to balance load. 34 * 35 * The MDS client is primarily responsible to managing synchronous 36 * metadata requests for operations like open, unlink, and so forth. 37 * If there is a MDS failure, we find out about it when we (possibly 38 * request and) receive a new MDS map, and can resubmit affected 39 * requests. 40 * 41 * For the most part, though, we take advantage of a lossless 42 * communications channel to the MDS, and do not need to worry about 43 * timing out or resubmitting requests. 44 * 45 * We maintain a stateful "session" with each MDS we interact with. 46 * Within each session, we sent periodic heartbeat messages to ensure 47 * any capabilities or leases we have been issues remain valid. If 48 * the session times out and goes stale, our leases and capabilities 49 * are no longer valid. 50 */ 51 52 struct ceph_reconnect_state { 53 struct ceph_mds_session *session; 54 int nr_caps, nr_realms; 55 struct ceph_pagelist *pagelist; 56 unsigned msg_version; 57 bool allow_multi; 58 }; 59 60 static void __wake_requests(struct ceph_mds_client *mdsc, 61 struct list_head *head); 62 static void ceph_cap_release_work(struct work_struct *work); 63 static void ceph_cap_reclaim_work(struct work_struct *work); 64 65 static const struct ceph_connection_operations mds_con_ops; 66 67 68 /* 69 * mds reply parsing 70 */ 71 72 static int parse_reply_info_quota(void **p, void *end, 73 struct ceph_mds_reply_info_in *info) 74 { 75 u8 struct_v, struct_compat; 76 u32 struct_len; 77 78 ceph_decode_8_safe(p, end, struct_v, bad); 79 ceph_decode_8_safe(p, end, struct_compat, bad); 80 /* struct_v is expected to be >= 1. we only 81 * understand encoding with struct_compat == 1. */ 82 if (!struct_v || struct_compat != 1) 83 goto bad; 84 ceph_decode_32_safe(p, end, struct_len, bad); 85 ceph_decode_need(p, end, struct_len, bad); 86 end = *p + struct_len; 87 ceph_decode_64_safe(p, end, info->max_bytes, bad); 88 ceph_decode_64_safe(p, end, info->max_files, bad); 89 *p = end; 90 return 0; 91 bad: 92 return -EIO; 93 } 94 95 /* 96 * parse individual inode info 97 */ 98 static int parse_reply_info_in(void **p, void *end, 99 struct ceph_mds_reply_info_in *info, 100 u64 features) 101 { 102 int err = 0; 103 u8 struct_v = 0; 104 105 if (features == (u64)-1) { 106 u32 struct_len; 107 u8 struct_compat; 108 ceph_decode_8_safe(p, end, struct_v, bad); 109 ceph_decode_8_safe(p, end, struct_compat, bad); 110 /* struct_v is expected to be >= 1. we only understand 111 * encoding with struct_compat == 1. */ 112 if (!struct_v || struct_compat != 1) 113 goto bad; 114 ceph_decode_32_safe(p, end, struct_len, bad); 115 ceph_decode_need(p, end, struct_len, bad); 116 end = *p + struct_len; 117 } 118 119 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad); 120 info->in = *p; 121 *p += sizeof(struct ceph_mds_reply_inode) + 122 sizeof(*info->in->fragtree.splits) * 123 le32_to_cpu(info->in->fragtree.nsplits); 124 125 ceph_decode_32_safe(p, end, info->symlink_len, bad); 126 ceph_decode_need(p, end, info->symlink_len, bad); 127 info->symlink = *p; 128 *p += info->symlink_len; 129 130 ceph_decode_copy_safe(p, end, &info->dir_layout, 131 sizeof(info->dir_layout), bad); 132 ceph_decode_32_safe(p, end, info->xattr_len, bad); 133 ceph_decode_need(p, end, info->xattr_len, bad); 134 info->xattr_data = *p; 135 *p += info->xattr_len; 136 137 if (features == (u64)-1) { 138 /* inline data */ 139 ceph_decode_64_safe(p, end, info->inline_version, bad); 140 ceph_decode_32_safe(p, end, info->inline_len, bad); 141 ceph_decode_need(p, end, info->inline_len, bad); 142 info->inline_data = *p; 143 *p += info->inline_len; 144 /* quota */ 145 err = parse_reply_info_quota(p, end, info); 146 if (err < 0) 147 goto out_bad; 148 /* pool namespace */ 149 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 150 if (info->pool_ns_len > 0) { 151 ceph_decode_need(p, end, info->pool_ns_len, bad); 152 info->pool_ns_data = *p; 153 *p += info->pool_ns_len; 154 } 155 156 /* btime */ 157 ceph_decode_need(p, end, sizeof(info->btime), bad); 158 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 159 160 /* change attribute */ 161 ceph_decode_64_safe(p, end, info->change_attr, bad); 162 163 /* dir pin */ 164 if (struct_v >= 2) { 165 ceph_decode_32_safe(p, end, info->dir_pin, bad); 166 } else { 167 info->dir_pin = -ENODATA; 168 } 169 170 /* snapshot birth time, remains zero for v<=2 */ 171 if (struct_v >= 3) { 172 ceph_decode_need(p, end, sizeof(info->snap_btime), bad); 173 ceph_decode_copy(p, &info->snap_btime, 174 sizeof(info->snap_btime)); 175 } else { 176 memset(&info->snap_btime, 0, sizeof(info->snap_btime)); 177 } 178 179 /* snapshot count, remains zero for v<=3 */ 180 if (struct_v >= 4) { 181 ceph_decode_64_safe(p, end, info->rsnaps, bad); 182 } else { 183 info->rsnaps = 0; 184 } 185 186 *p = end; 187 } else { 188 if (features & CEPH_FEATURE_MDS_INLINE_DATA) { 189 ceph_decode_64_safe(p, end, info->inline_version, bad); 190 ceph_decode_32_safe(p, end, info->inline_len, bad); 191 ceph_decode_need(p, end, info->inline_len, bad); 192 info->inline_data = *p; 193 *p += info->inline_len; 194 } else 195 info->inline_version = CEPH_INLINE_NONE; 196 197 if (features & CEPH_FEATURE_MDS_QUOTA) { 198 err = parse_reply_info_quota(p, end, info); 199 if (err < 0) 200 goto out_bad; 201 } else { 202 info->max_bytes = 0; 203 info->max_files = 0; 204 } 205 206 info->pool_ns_len = 0; 207 info->pool_ns_data = NULL; 208 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { 209 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 210 if (info->pool_ns_len > 0) { 211 ceph_decode_need(p, end, info->pool_ns_len, bad); 212 info->pool_ns_data = *p; 213 *p += info->pool_ns_len; 214 } 215 } 216 217 if (features & CEPH_FEATURE_FS_BTIME) { 218 ceph_decode_need(p, end, sizeof(info->btime), bad); 219 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 220 ceph_decode_64_safe(p, end, info->change_attr, bad); 221 } 222 223 info->dir_pin = -ENODATA; 224 /* info->snap_btime and info->rsnaps remain zero */ 225 } 226 return 0; 227 bad: 228 err = -EIO; 229 out_bad: 230 return err; 231 } 232 233 static int parse_reply_info_dir(void **p, void *end, 234 struct ceph_mds_reply_dirfrag **dirfrag, 235 u64 features) 236 { 237 if (features == (u64)-1) { 238 u8 struct_v, struct_compat; 239 u32 struct_len; 240 ceph_decode_8_safe(p, end, struct_v, bad); 241 ceph_decode_8_safe(p, end, struct_compat, bad); 242 /* struct_v is expected to be >= 1. we only understand 243 * encoding whose struct_compat == 1. */ 244 if (!struct_v || struct_compat != 1) 245 goto bad; 246 ceph_decode_32_safe(p, end, struct_len, bad); 247 ceph_decode_need(p, end, struct_len, bad); 248 end = *p + struct_len; 249 } 250 251 ceph_decode_need(p, end, sizeof(**dirfrag), bad); 252 *dirfrag = *p; 253 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist); 254 if (unlikely(*p > end)) 255 goto bad; 256 if (features == (u64)-1) 257 *p = end; 258 return 0; 259 bad: 260 return -EIO; 261 } 262 263 static int parse_reply_info_lease(void **p, void *end, 264 struct ceph_mds_reply_lease **lease, 265 u64 features) 266 { 267 if (features == (u64)-1) { 268 u8 struct_v, struct_compat; 269 u32 struct_len; 270 ceph_decode_8_safe(p, end, struct_v, bad); 271 ceph_decode_8_safe(p, end, struct_compat, bad); 272 /* struct_v is expected to be >= 1. we only understand 273 * encoding whose struct_compat == 1. */ 274 if (!struct_v || struct_compat != 1) 275 goto bad; 276 ceph_decode_32_safe(p, end, struct_len, bad); 277 ceph_decode_need(p, end, struct_len, bad); 278 end = *p + struct_len; 279 } 280 281 ceph_decode_need(p, end, sizeof(**lease), bad); 282 *lease = *p; 283 *p += sizeof(**lease); 284 if (features == (u64)-1) 285 *p = end; 286 return 0; 287 bad: 288 return -EIO; 289 } 290 291 /* 292 * parse a normal reply, which may contain a (dir+)dentry and/or a 293 * target inode. 294 */ 295 static int parse_reply_info_trace(void **p, void *end, 296 struct ceph_mds_reply_info_parsed *info, 297 u64 features) 298 { 299 int err; 300 301 if (info->head->is_dentry) { 302 err = parse_reply_info_in(p, end, &info->diri, features); 303 if (err < 0) 304 goto out_bad; 305 306 err = parse_reply_info_dir(p, end, &info->dirfrag, features); 307 if (err < 0) 308 goto out_bad; 309 310 ceph_decode_32_safe(p, end, info->dname_len, bad); 311 ceph_decode_need(p, end, info->dname_len, bad); 312 info->dname = *p; 313 *p += info->dname_len; 314 315 err = parse_reply_info_lease(p, end, &info->dlease, features); 316 if (err < 0) 317 goto out_bad; 318 } 319 320 if (info->head->is_target) { 321 err = parse_reply_info_in(p, end, &info->targeti, features); 322 if (err < 0) 323 goto out_bad; 324 } 325 326 if (unlikely(*p != end)) 327 goto bad; 328 return 0; 329 330 bad: 331 err = -EIO; 332 out_bad: 333 pr_err("problem parsing mds trace %d\n", err); 334 return err; 335 } 336 337 /* 338 * parse readdir results 339 */ 340 static int parse_reply_info_readdir(void **p, void *end, 341 struct ceph_mds_reply_info_parsed *info, 342 u64 features) 343 { 344 u32 num, i = 0; 345 int err; 346 347 err = parse_reply_info_dir(p, end, &info->dir_dir, features); 348 if (err < 0) 349 goto out_bad; 350 351 ceph_decode_need(p, end, sizeof(num) + 2, bad); 352 num = ceph_decode_32(p); 353 { 354 u16 flags = ceph_decode_16(p); 355 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); 356 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); 357 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); 358 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH); 359 } 360 if (num == 0) 361 goto done; 362 363 BUG_ON(!info->dir_entries); 364 if ((unsigned long)(info->dir_entries + num) > 365 (unsigned long)info->dir_entries + info->dir_buf_size) { 366 pr_err("dir contents are larger than expected\n"); 367 WARN_ON(1); 368 goto bad; 369 } 370 371 info->dir_nr = num; 372 while (num) { 373 struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 374 /* dentry */ 375 ceph_decode_32_safe(p, end, rde->name_len, bad); 376 ceph_decode_need(p, end, rde->name_len, bad); 377 rde->name = *p; 378 *p += rde->name_len; 379 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name); 380 381 /* dentry lease */ 382 err = parse_reply_info_lease(p, end, &rde->lease, features); 383 if (err) 384 goto out_bad; 385 /* inode */ 386 err = parse_reply_info_in(p, end, &rde->inode, features); 387 if (err < 0) 388 goto out_bad; 389 /* ceph_readdir_prepopulate() will update it */ 390 rde->offset = 0; 391 i++; 392 num--; 393 } 394 395 done: 396 /* Skip over any unrecognized fields */ 397 *p = end; 398 return 0; 399 400 bad: 401 err = -EIO; 402 out_bad: 403 pr_err("problem parsing dir contents %d\n", err); 404 return err; 405 } 406 407 /* 408 * parse fcntl F_GETLK results 409 */ 410 static int parse_reply_info_filelock(void **p, void *end, 411 struct ceph_mds_reply_info_parsed *info, 412 u64 features) 413 { 414 if (*p + sizeof(*info->filelock_reply) > end) 415 goto bad; 416 417 info->filelock_reply = *p; 418 419 /* Skip over any unrecognized fields */ 420 *p = end; 421 return 0; 422 bad: 423 return -EIO; 424 } 425 426 427 #if BITS_PER_LONG == 64 428 429 #define DELEGATED_INO_AVAILABLE xa_mk_value(1) 430 431 static int ceph_parse_deleg_inos(void **p, void *end, 432 struct ceph_mds_session *s) 433 { 434 u32 sets; 435 436 ceph_decode_32_safe(p, end, sets, bad); 437 dout("got %u sets of delegated inodes\n", sets); 438 while (sets--) { 439 u64 start, len, ino; 440 441 ceph_decode_64_safe(p, end, start, bad); 442 ceph_decode_64_safe(p, end, len, bad); 443 444 /* Don't accept a delegation of system inodes */ 445 if (start < CEPH_INO_SYSTEM_BASE) { 446 pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n", 447 start, len); 448 continue; 449 } 450 while (len--) { 451 int err = xa_insert(&s->s_delegated_inos, ino = start++, 452 DELEGATED_INO_AVAILABLE, 453 GFP_KERNEL); 454 if (!err) { 455 dout("added delegated inode 0x%llx\n", 456 start - 1); 457 } else if (err == -EBUSY) { 458 pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n", 459 start - 1); 460 } else { 461 return err; 462 } 463 } 464 } 465 return 0; 466 bad: 467 return -EIO; 468 } 469 470 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 471 { 472 unsigned long ino; 473 void *val; 474 475 xa_for_each(&s->s_delegated_inos, ino, val) { 476 val = xa_erase(&s->s_delegated_inos, ino); 477 if (val == DELEGATED_INO_AVAILABLE) 478 return ino; 479 } 480 return 0; 481 } 482 483 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 484 { 485 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE, 486 GFP_KERNEL); 487 } 488 #else /* BITS_PER_LONG == 64 */ 489 /* 490 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just 491 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top 492 * and bottom words? 493 */ 494 static int ceph_parse_deleg_inos(void **p, void *end, 495 struct ceph_mds_session *s) 496 { 497 u32 sets; 498 499 ceph_decode_32_safe(p, end, sets, bad); 500 if (sets) 501 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad); 502 return 0; 503 bad: 504 return -EIO; 505 } 506 507 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 508 { 509 return 0; 510 } 511 512 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 513 { 514 return 0; 515 } 516 #endif /* BITS_PER_LONG == 64 */ 517 518 /* 519 * parse create results 520 */ 521 static int parse_reply_info_create(void **p, void *end, 522 struct ceph_mds_reply_info_parsed *info, 523 u64 features, struct ceph_mds_session *s) 524 { 525 int ret; 526 527 if (features == (u64)-1 || 528 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) { 529 if (*p == end) { 530 /* Malformed reply? */ 531 info->has_create_ino = false; 532 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) { 533 info->has_create_ino = true; 534 /* struct_v, struct_compat, and len */ 535 ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad); 536 ceph_decode_64_safe(p, end, info->ino, bad); 537 ret = ceph_parse_deleg_inos(p, end, s); 538 if (ret) 539 return ret; 540 } else { 541 /* legacy */ 542 ceph_decode_64_safe(p, end, info->ino, bad); 543 info->has_create_ino = true; 544 } 545 } else { 546 if (*p != end) 547 goto bad; 548 } 549 550 /* Skip over any unrecognized fields */ 551 *p = end; 552 return 0; 553 bad: 554 return -EIO; 555 } 556 557 /* 558 * parse extra results 559 */ 560 static int parse_reply_info_extra(void **p, void *end, 561 struct ceph_mds_reply_info_parsed *info, 562 u64 features, struct ceph_mds_session *s) 563 { 564 u32 op = le32_to_cpu(info->head->op); 565 566 if (op == CEPH_MDS_OP_GETFILELOCK) 567 return parse_reply_info_filelock(p, end, info, features); 568 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) 569 return parse_reply_info_readdir(p, end, info, features); 570 else if (op == CEPH_MDS_OP_CREATE) 571 return parse_reply_info_create(p, end, info, features, s); 572 else 573 return -EIO; 574 } 575 576 /* 577 * parse entire mds reply 578 */ 579 static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, 580 struct ceph_mds_reply_info_parsed *info, 581 u64 features) 582 { 583 void *p, *end; 584 u32 len; 585 int err; 586 587 info->head = msg->front.iov_base; 588 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 589 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 590 591 /* trace */ 592 ceph_decode_32_safe(&p, end, len, bad); 593 if (len > 0) { 594 ceph_decode_need(&p, end, len, bad); 595 err = parse_reply_info_trace(&p, p+len, info, features); 596 if (err < 0) 597 goto out_bad; 598 } 599 600 /* extra */ 601 ceph_decode_32_safe(&p, end, len, bad); 602 if (len > 0) { 603 ceph_decode_need(&p, end, len, bad); 604 err = parse_reply_info_extra(&p, p+len, info, features, s); 605 if (err < 0) 606 goto out_bad; 607 } 608 609 /* snap blob */ 610 ceph_decode_32_safe(&p, end, len, bad); 611 info->snapblob_len = len; 612 info->snapblob = p; 613 p += len; 614 615 if (p != end) 616 goto bad; 617 return 0; 618 619 bad: 620 err = -EIO; 621 out_bad: 622 pr_err("mds parse_reply err %d\n", err); 623 return err; 624 } 625 626 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 627 { 628 if (!info->dir_entries) 629 return; 630 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size)); 631 } 632 633 634 /* 635 * sessions 636 */ 637 const char *ceph_session_state_name(int s) 638 { 639 switch (s) { 640 case CEPH_MDS_SESSION_NEW: return "new"; 641 case CEPH_MDS_SESSION_OPENING: return "opening"; 642 case CEPH_MDS_SESSION_OPEN: return "open"; 643 case CEPH_MDS_SESSION_HUNG: return "hung"; 644 case CEPH_MDS_SESSION_CLOSING: return "closing"; 645 case CEPH_MDS_SESSION_CLOSED: return "closed"; 646 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 647 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 648 case CEPH_MDS_SESSION_REJECTED: return "rejected"; 649 default: return "???"; 650 } 651 } 652 653 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s) 654 { 655 if (refcount_inc_not_zero(&s->s_ref)) { 656 dout("mdsc get_session %p %d -> %d\n", s, 657 refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref)); 658 return s; 659 } else { 660 dout("mdsc get_session %p 0 -- FAIL\n", s); 661 return NULL; 662 } 663 } 664 665 void ceph_put_mds_session(struct ceph_mds_session *s) 666 { 667 dout("mdsc put_session %p %d -> %d\n", s, 668 refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1); 669 if (refcount_dec_and_test(&s->s_ref)) { 670 if (s->s_auth.authorizer) 671 ceph_auth_destroy_authorizer(s->s_auth.authorizer); 672 WARN_ON(mutex_is_locked(&s->s_mutex)); 673 xa_destroy(&s->s_delegated_inos); 674 kfree(s); 675 } 676 } 677 678 /* 679 * called under mdsc->mutex 680 */ 681 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 682 int mds) 683 { 684 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 685 return NULL; 686 return ceph_get_mds_session(mdsc->sessions[mds]); 687 } 688 689 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 690 { 691 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 692 return false; 693 else 694 return true; 695 } 696 697 static int __verify_registered_session(struct ceph_mds_client *mdsc, 698 struct ceph_mds_session *s) 699 { 700 if (s->s_mds >= mdsc->max_sessions || 701 mdsc->sessions[s->s_mds] != s) 702 return -ENOENT; 703 return 0; 704 } 705 706 /* 707 * create+register a new session for given mds. 708 * called under mdsc->mutex. 709 */ 710 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 711 int mds) 712 { 713 struct ceph_mds_session *s; 714 715 if (mds >= mdsc->mdsmap->possible_max_rank) 716 return ERR_PTR(-EINVAL); 717 718 s = kzalloc(sizeof(*s), GFP_NOFS); 719 if (!s) 720 return ERR_PTR(-ENOMEM); 721 722 if (mds >= mdsc->max_sessions) { 723 int newmax = 1 << get_count_order(mds + 1); 724 struct ceph_mds_session **sa; 725 726 dout("%s: realloc to %d\n", __func__, newmax); 727 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); 728 if (!sa) 729 goto fail_realloc; 730 if (mdsc->sessions) { 731 memcpy(sa, mdsc->sessions, 732 mdsc->max_sessions * sizeof(void *)); 733 kfree(mdsc->sessions); 734 } 735 mdsc->sessions = sa; 736 mdsc->max_sessions = newmax; 737 } 738 739 dout("%s: mds%d\n", __func__, mds); 740 s->s_mdsc = mdsc; 741 s->s_mds = mds; 742 s->s_state = CEPH_MDS_SESSION_NEW; 743 s->s_ttl = 0; 744 s->s_seq = 0; 745 mutex_init(&s->s_mutex); 746 747 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); 748 749 spin_lock_init(&s->s_gen_ttl_lock); 750 s->s_cap_gen = 1; 751 s->s_cap_ttl = jiffies - 1; 752 753 spin_lock_init(&s->s_cap_lock); 754 s->s_renew_requested = 0; 755 s->s_renew_seq = 0; 756 INIT_LIST_HEAD(&s->s_caps); 757 s->s_nr_caps = 0; 758 refcount_set(&s->s_ref, 1); 759 INIT_LIST_HEAD(&s->s_waiting); 760 INIT_LIST_HEAD(&s->s_unsafe); 761 xa_init(&s->s_delegated_inos); 762 s->s_num_cap_releases = 0; 763 s->s_cap_reconnect = 0; 764 s->s_cap_iterator = NULL; 765 INIT_LIST_HEAD(&s->s_cap_releases); 766 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work); 767 768 INIT_LIST_HEAD(&s->s_cap_dirty); 769 INIT_LIST_HEAD(&s->s_cap_flushing); 770 771 mdsc->sessions[mds] = s; 772 atomic_inc(&mdsc->num_sessions); 773 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 774 775 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 776 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 777 778 return s; 779 780 fail_realloc: 781 kfree(s); 782 return ERR_PTR(-ENOMEM); 783 } 784 785 /* 786 * called under mdsc->mutex 787 */ 788 static void __unregister_session(struct ceph_mds_client *mdsc, 789 struct ceph_mds_session *s) 790 { 791 dout("__unregister_session mds%d %p\n", s->s_mds, s); 792 BUG_ON(mdsc->sessions[s->s_mds] != s); 793 mdsc->sessions[s->s_mds] = NULL; 794 ceph_con_close(&s->s_con); 795 ceph_put_mds_session(s); 796 atomic_dec(&mdsc->num_sessions); 797 } 798 799 /* 800 * drop session refs in request. 801 * 802 * should be last request ref, or hold mdsc->mutex 803 */ 804 static void put_request_session(struct ceph_mds_request *req) 805 { 806 if (req->r_session) { 807 ceph_put_mds_session(req->r_session); 808 req->r_session = NULL; 809 } 810 } 811 812 void ceph_mdsc_release_request(struct kref *kref) 813 { 814 struct ceph_mds_request *req = container_of(kref, 815 struct ceph_mds_request, 816 r_kref); 817 ceph_mdsc_release_dir_caps_no_check(req); 818 destroy_reply_info(&req->r_reply_info); 819 if (req->r_request) 820 ceph_msg_put(req->r_request); 821 if (req->r_reply) 822 ceph_msg_put(req->r_reply); 823 if (req->r_inode) { 824 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 825 /* avoid calling iput_final() in mds dispatch threads */ 826 ceph_async_iput(req->r_inode); 827 } 828 if (req->r_parent) { 829 ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN); 830 ceph_async_iput(req->r_parent); 831 } 832 ceph_async_iput(req->r_target_inode); 833 if (req->r_dentry) 834 dput(req->r_dentry); 835 if (req->r_old_dentry) 836 dput(req->r_old_dentry); 837 if (req->r_old_dentry_dir) { 838 /* 839 * track (and drop pins for) r_old_dentry_dir 840 * separately, since r_old_dentry's d_parent may have 841 * changed between the dir mutex being dropped and 842 * this request being freed. 843 */ 844 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), 845 CEPH_CAP_PIN); 846 ceph_async_iput(req->r_old_dentry_dir); 847 } 848 kfree(req->r_path1); 849 kfree(req->r_path2); 850 put_cred(req->r_cred); 851 if (req->r_pagelist) 852 ceph_pagelist_release(req->r_pagelist); 853 put_request_session(req); 854 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); 855 WARN_ON_ONCE(!list_empty(&req->r_wait)); 856 kmem_cache_free(ceph_mds_request_cachep, req); 857 } 858 859 DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node) 860 861 /* 862 * lookup session, bump ref if found. 863 * 864 * called under mdsc->mutex. 865 */ 866 static struct ceph_mds_request * 867 lookup_get_request(struct ceph_mds_client *mdsc, u64 tid) 868 { 869 struct ceph_mds_request *req; 870 871 req = lookup_request(&mdsc->request_tree, tid); 872 if (req) 873 ceph_mdsc_get_request(req); 874 875 return req; 876 } 877 878 /* 879 * Register an in-flight request, and assign a tid. Link to directory 880 * are modifying (if any). 881 * 882 * Called under mdsc->mutex. 883 */ 884 static void __register_request(struct ceph_mds_client *mdsc, 885 struct ceph_mds_request *req, 886 struct inode *dir) 887 { 888 int ret = 0; 889 890 req->r_tid = ++mdsc->last_tid; 891 if (req->r_num_caps) { 892 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, 893 req->r_num_caps); 894 if (ret < 0) { 895 pr_err("__register_request %p " 896 "failed to reserve caps: %d\n", req, ret); 897 /* set req->r_err to fail early from __do_request */ 898 req->r_err = ret; 899 return; 900 } 901 } 902 dout("__register_request %p tid %lld\n", req, req->r_tid); 903 ceph_mdsc_get_request(req); 904 insert_request(&mdsc->request_tree, req); 905 906 req->r_cred = get_current_cred(); 907 908 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) 909 mdsc->oldest_tid = req->r_tid; 910 911 if (dir) { 912 struct ceph_inode_info *ci = ceph_inode(dir); 913 914 ihold(dir); 915 req->r_unsafe_dir = dir; 916 spin_lock(&ci->i_unsafe_lock); 917 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 918 spin_unlock(&ci->i_unsafe_lock); 919 } 920 } 921 922 static void __unregister_request(struct ceph_mds_client *mdsc, 923 struct ceph_mds_request *req) 924 { 925 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 926 927 /* Never leave an unregistered request on an unsafe list! */ 928 list_del_init(&req->r_unsafe_item); 929 930 if (req->r_tid == mdsc->oldest_tid) { 931 struct rb_node *p = rb_next(&req->r_node); 932 mdsc->oldest_tid = 0; 933 while (p) { 934 struct ceph_mds_request *next_req = 935 rb_entry(p, struct ceph_mds_request, r_node); 936 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 937 mdsc->oldest_tid = next_req->r_tid; 938 break; 939 } 940 p = rb_next(p); 941 } 942 } 943 944 erase_request(&mdsc->request_tree, req); 945 946 if (req->r_unsafe_dir) { 947 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 948 spin_lock(&ci->i_unsafe_lock); 949 list_del_init(&req->r_unsafe_dir_item); 950 spin_unlock(&ci->i_unsafe_lock); 951 } 952 if (req->r_target_inode && 953 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 954 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 955 spin_lock(&ci->i_unsafe_lock); 956 list_del_init(&req->r_unsafe_target_item); 957 spin_unlock(&ci->i_unsafe_lock); 958 } 959 960 if (req->r_unsafe_dir) { 961 /* avoid calling iput_final() in mds dispatch threads */ 962 ceph_async_iput(req->r_unsafe_dir); 963 req->r_unsafe_dir = NULL; 964 } 965 966 complete_all(&req->r_safe_completion); 967 968 ceph_mdsc_put_request(req); 969 } 970 971 /* 972 * Walk back up the dentry tree until we hit a dentry representing a 973 * non-snapshot inode. We do this using the rcu_read_lock (which must be held 974 * when calling this) to ensure that the objects won't disappear while we're 975 * working with them. Once we hit a candidate dentry, we attempt to take a 976 * reference to it, and return that as the result. 977 */ 978 static struct inode *get_nonsnap_parent(struct dentry *dentry) 979 { 980 struct inode *inode = NULL; 981 982 while (dentry && !IS_ROOT(dentry)) { 983 inode = d_inode_rcu(dentry); 984 if (!inode || ceph_snap(inode) == CEPH_NOSNAP) 985 break; 986 dentry = dentry->d_parent; 987 } 988 if (inode) 989 inode = igrab(inode); 990 return inode; 991 } 992 993 /* 994 * Choose mds to send request to next. If there is a hint set in the 995 * request (e.g., due to a prior forward hint from the mds), use that. 996 * Otherwise, consult frag tree and/or caps to identify the 997 * appropriate mds. If all else fails, choose randomly. 998 * 999 * Called under mdsc->mutex. 1000 */ 1001 static int __choose_mds(struct ceph_mds_client *mdsc, 1002 struct ceph_mds_request *req, 1003 bool *random) 1004 { 1005 struct inode *inode; 1006 struct ceph_inode_info *ci; 1007 struct ceph_cap *cap; 1008 int mode = req->r_direct_mode; 1009 int mds = -1; 1010 u32 hash = req->r_direct_hash; 1011 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); 1012 1013 if (random) 1014 *random = false; 1015 1016 /* 1017 * is there a specific mds we should try? ignore hint if we have 1018 * no session and the mds is not up (active or recovering). 1019 */ 1020 if (req->r_resend_mds >= 0 && 1021 (__have_session(mdsc, req->r_resend_mds) || 1022 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 1023 dout("%s using resend_mds mds%d\n", __func__, 1024 req->r_resend_mds); 1025 return req->r_resend_mds; 1026 } 1027 1028 if (mode == USE_RANDOM_MDS) 1029 goto random; 1030 1031 inode = NULL; 1032 if (req->r_inode) { 1033 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) { 1034 inode = req->r_inode; 1035 ihold(inode); 1036 } else { 1037 /* req->r_dentry is non-null for LSSNAP request */ 1038 rcu_read_lock(); 1039 inode = get_nonsnap_parent(req->r_dentry); 1040 rcu_read_unlock(); 1041 dout("%s using snapdir's parent %p\n", __func__, inode); 1042 } 1043 } else if (req->r_dentry) { 1044 /* ignore race with rename; old or new d_parent is okay */ 1045 struct dentry *parent; 1046 struct inode *dir; 1047 1048 rcu_read_lock(); 1049 parent = READ_ONCE(req->r_dentry->d_parent); 1050 dir = req->r_parent ? : d_inode_rcu(parent); 1051 1052 if (!dir || dir->i_sb != mdsc->fsc->sb) { 1053 /* not this fs or parent went negative */ 1054 inode = d_inode(req->r_dentry); 1055 if (inode) 1056 ihold(inode); 1057 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 1058 /* direct snapped/virtual snapdir requests 1059 * based on parent dir inode */ 1060 inode = get_nonsnap_parent(parent); 1061 dout("%s using nonsnap parent %p\n", __func__, inode); 1062 } else { 1063 /* dentry target */ 1064 inode = d_inode(req->r_dentry); 1065 if (!inode || mode == USE_AUTH_MDS) { 1066 /* dir + name */ 1067 inode = igrab(dir); 1068 hash = ceph_dentry_hash(dir, req->r_dentry); 1069 is_hash = true; 1070 } else { 1071 ihold(inode); 1072 } 1073 } 1074 rcu_read_unlock(); 1075 } 1076 1077 dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash, 1078 hash, mode); 1079 if (!inode) 1080 goto random; 1081 ci = ceph_inode(inode); 1082 1083 if (is_hash && S_ISDIR(inode->i_mode)) { 1084 struct ceph_inode_frag frag; 1085 int found; 1086 1087 ceph_choose_frag(ci, hash, &frag, &found); 1088 if (found) { 1089 if (mode == USE_ANY_MDS && frag.ndist > 0) { 1090 u8 r; 1091 1092 /* choose a random replica */ 1093 get_random_bytes(&r, 1); 1094 r %= frag.ndist; 1095 mds = frag.dist[r]; 1096 dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n", 1097 __func__, inode, ceph_vinop(inode), 1098 frag.frag, mds, (int)r, frag.ndist); 1099 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1100 CEPH_MDS_STATE_ACTIVE && 1101 !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds)) 1102 goto out; 1103 } 1104 1105 /* since this file/dir wasn't known to be 1106 * replicated, then we want to look for the 1107 * authoritative mds. */ 1108 if (frag.mds >= 0) { 1109 /* choose auth mds */ 1110 mds = frag.mds; 1111 dout("%s %p %llx.%llx frag %u mds%d (auth)\n", 1112 __func__, inode, ceph_vinop(inode), 1113 frag.frag, mds); 1114 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1115 CEPH_MDS_STATE_ACTIVE) { 1116 if (!ceph_mdsmap_is_laggy(mdsc->mdsmap, 1117 mds)) 1118 goto out; 1119 } 1120 } 1121 mode = USE_AUTH_MDS; 1122 } 1123 } 1124 1125 spin_lock(&ci->i_ceph_lock); 1126 cap = NULL; 1127 if (mode == USE_AUTH_MDS) 1128 cap = ci->i_auth_cap; 1129 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 1130 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 1131 if (!cap) { 1132 spin_unlock(&ci->i_ceph_lock); 1133 ceph_async_iput(inode); 1134 goto random; 1135 } 1136 mds = cap->session->s_mds; 1137 dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__, 1138 inode, ceph_vinop(inode), mds, 1139 cap == ci->i_auth_cap ? "auth " : "", cap); 1140 spin_unlock(&ci->i_ceph_lock); 1141 out: 1142 /* avoid calling iput_final() while holding mdsc->mutex or 1143 * in mds dispatch threads */ 1144 ceph_async_iput(inode); 1145 return mds; 1146 1147 random: 1148 if (random) 1149 *random = true; 1150 1151 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 1152 dout("%s chose random mds%d\n", __func__, mds); 1153 return mds; 1154 } 1155 1156 1157 /* 1158 * session messages 1159 */ 1160 static struct ceph_msg *create_session_msg(u32 op, u64 seq) 1161 { 1162 struct ceph_msg *msg; 1163 struct ceph_mds_session_head *h; 1164 1165 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 1166 false); 1167 if (!msg) { 1168 pr_err("create_session_msg ENOMEM creating msg\n"); 1169 return NULL; 1170 } 1171 h = msg->front.iov_base; 1172 h->op = cpu_to_le32(op); 1173 h->seq = cpu_to_le64(seq); 1174 1175 return msg; 1176 } 1177 1178 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED; 1179 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8) 1180 static int encode_supported_features(void **p, void *end) 1181 { 1182 static const size_t count = ARRAY_SIZE(feature_bits); 1183 1184 if (count > 0) { 1185 size_t i; 1186 size_t size = FEATURE_BYTES(count); 1187 1188 if (WARN_ON_ONCE(*p + 4 + size > end)) 1189 return -ERANGE; 1190 1191 ceph_encode_32(p, size); 1192 memset(*p, 0, size); 1193 for (i = 0; i < count; i++) 1194 ((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8); 1195 *p += size; 1196 } else { 1197 if (WARN_ON_ONCE(*p + 4 > end)) 1198 return -ERANGE; 1199 1200 ceph_encode_32(p, 0); 1201 } 1202 1203 return 0; 1204 } 1205 1206 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED; 1207 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8) 1208 static int encode_metric_spec(void **p, void *end) 1209 { 1210 static const size_t count = ARRAY_SIZE(metric_bits); 1211 1212 /* header */ 1213 if (WARN_ON_ONCE(*p + 2 > end)) 1214 return -ERANGE; 1215 1216 ceph_encode_8(p, 1); /* version */ 1217 ceph_encode_8(p, 1); /* compat */ 1218 1219 if (count > 0) { 1220 size_t i; 1221 size_t size = METRIC_BYTES(count); 1222 1223 if (WARN_ON_ONCE(*p + 4 + 4 + size > end)) 1224 return -ERANGE; 1225 1226 /* metric spec info length */ 1227 ceph_encode_32(p, 4 + size); 1228 1229 /* metric spec */ 1230 ceph_encode_32(p, size); 1231 memset(*p, 0, size); 1232 for (i = 0; i < count; i++) 1233 ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8); 1234 *p += size; 1235 } else { 1236 if (WARN_ON_ONCE(*p + 4 + 4 > end)) 1237 return -ERANGE; 1238 1239 /* metric spec info length */ 1240 ceph_encode_32(p, 4); 1241 /* metric spec */ 1242 ceph_encode_32(p, 0); 1243 } 1244 1245 return 0; 1246 } 1247 1248 /* 1249 * session message, specialization for CEPH_SESSION_REQUEST_OPEN 1250 * to include additional client metadata fields. 1251 */ 1252 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq) 1253 { 1254 struct ceph_msg *msg; 1255 struct ceph_mds_session_head *h; 1256 int i; 1257 int extra_bytes = 0; 1258 int metadata_key_count = 0; 1259 struct ceph_options *opt = mdsc->fsc->client->options; 1260 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; 1261 size_t size, count; 1262 void *p, *end; 1263 int ret; 1264 1265 const char* metadata[][2] = { 1266 {"hostname", mdsc->nodename}, 1267 {"kernel_version", init_utsname()->release}, 1268 {"entity_id", opt->name ? : ""}, 1269 {"root", fsopt->server_path ? : "/"}, 1270 {NULL, NULL} 1271 }; 1272 1273 /* Calculate serialized length of metadata */ 1274 extra_bytes = 4; /* map length */ 1275 for (i = 0; metadata[i][0]; ++i) { 1276 extra_bytes += 8 + strlen(metadata[i][0]) + 1277 strlen(metadata[i][1]); 1278 metadata_key_count++; 1279 } 1280 1281 /* supported feature */ 1282 size = 0; 1283 count = ARRAY_SIZE(feature_bits); 1284 if (count > 0) 1285 size = FEATURE_BYTES(count); 1286 extra_bytes += 4 + size; 1287 1288 /* metric spec */ 1289 size = 0; 1290 count = ARRAY_SIZE(metric_bits); 1291 if (count > 0) 1292 size = METRIC_BYTES(count); 1293 extra_bytes += 2 + 4 + 4 + size; 1294 1295 /* Allocate the message */ 1296 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, 1297 GFP_NOFS, false); 1298 if (!msg) { 1299 pr_err("create_session_msg ENOMEM creating msg\n"); 1300 return ERR_PTR(-ENOMEM); 1301 } 1302 p = msg->front.iov_base; 1303 end = p + msg->front.iov_len; 1304 1305 h = p; 1306 h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN); 1307 h->seq = cpu_to_le64(seq); 1308 1309 /* 1310 * Serialize client metadata into waiting buffer space, using 1311 * the format that userspace expects for map<string, string> 1312 * 1313 * ClientSession messages with metadata are v4 1314 */ 1315 msg->hdr.version = cpu_to_le16(4); 1316 msg->hdr.compat_version = cpu_to_le16(1); 1317 1318 /* The write pointer, following the session_head structure */ 1319 p += sizeof(*h); 1320 1321 /* Number of entries in the map */ 1322 ceph_encode_32(&p, metadata_key_count); 1323 1324 /* Two length-prefixed strings for each entry in the map */ 1325 for (i = 0; metadata[i][0]; ++i) { 1326 size_t const key_len = strlen(metadata[i][0]); 1327 size_t const val_len = strlen(metadata[i][1]); 1328 1329 ceph_encode_32(&p, key_len); 1330 memcpy(p, metadata[i][0], key_len); 1331 p += key_len; 1332 ceph_encode_32(&p, val_len); 1333 memcpy(p, metadata[i][1], val_len); 1334 p += val_len; 1335 } 1336 1337 ret = encode_supported_features(&p, end); 1338 if (ret) { 1339 pr_err("encode_supported_features failed!\n"); 1340 ceph_msg_put(msg); 1341 return ERR_PTR(ret); 1342 } 1343 1344 ret = encode_metric_spec(&p, end); 1345 if (ret) { 1346 pr_err("encode_metric_spec failed!\n"); 1347 ceph_msg_put(msg); 1348 return ERR_PTR(ret); 1349 } 1350 1351 msg->front.iov_len = p - msg->front.iov_base; 1352 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1353 1354 return msg; 1355 } 1356 1357 /* 1358 * send session open request. 1359 * 1360 * called under mdsc->mutex 1361 */ 1362 static int __open_session(struct ceph_mds_client *mdsc, 1363 struct ceph_mds_session *session) 1364 { 1365 struct ceph_msg *msg; 1366 int mstate; 1367 int mds = session->s_mds; 1368 1369 /* wait for mds to go active? */ 1370 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 1371 dout("open_session to mds%d (%s)\n", mds, 1372 ceph_mds_state_name(mstate)); 1373 session->s_state = CEPH_MDS_SESSION_OPENING; 1374 session->s_renew_requested = jiffies; 1375 1376 /* send connect message */ 1377 msg = create_session_open_msg(mdsc, session->s_seq); 1378 if (IS_ERR(msg)) 1379 return PTR_ERR(msg); 1380 ceph_con_send(&session->s_con, msg); 1381 return 0; 1382 } 1383 1384 /* 1385 * open sessions for any export targets for the given mds 1386 * 1387 * called under mdsc->mutex 1388 */ 1389 static struct ceph_mds_session * 1390 __open_export_target_session(struct ceph_mds_client *mdsc, int target) 1391 { 1392 struct ceph_mds_session *session; 1393 int ret; 1394 1395 session = __ceph_lookup_mds_session(mdsc, target); 1396 if (!session) { 1397 session = register_session(mdsc, target); 1398 if (IS_ERR(session)) 1399 return session; 1400 } 1401 if (session->s_state == CEPH_MDS_SESSION_NEW || 1402 session->s_state == CEPH_MDS_SESSION_CLOSING) { 1403 ret = __open_session(mdsc, session); 1404 if (ret) 1405 return ERR_PTR(ret); 1406 } 1407 1408 return session; 1409 } 1410 1411 struct ceph_mds_session * 1412 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) 1413 { 1414 struct ceph_mds_session *session; 1415 1416 dout("open_export_target_session to mds%d\n", target); 1417 1418 mutex_lock(&mdsc->mutex); 1419 session = __open_export_target_session(mdsc, target); 1420 mutex_unlock(&mdsc->mutex); 1421 1422 return session; 1423 } 1424 1425 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 1426 struct ceph_mds_session *session) 1427 { 1428 struct ceph_mds_info *mi; 1429 struct ceph_mds_session *ts; 1430 int i, mds = session->s_mds; 1431 1432 if (mds >= mdsc->mdsmap->possible_max_rank) 1433 return; 1434 1435 mi = &mdsc->mdsmap->m_info[mds]; 1436 dout("open_export_target_sessions for mds%d (%d targets)\n", 1437 session->s_mds, mi->num_export_targets); 1438 1439 for (i = 0; i < mi->num_export_targets; i++) { 1440 ts = __open_export_target_session(mdsc, mi->export_targets[i]); 1441 if (!IS_ERR(ts)) 1442 ceph_put_mds_session(ts); 1443 } 1444 } 1445 1446 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 1447 struct ceph_mds_session *session) 1448 { 1449 mutex_lock(&mdsc->mutex); 1450 __open_export_target_sessions(mdsc, session); 1451 mutex_unlock(&mdsc->mutex); 1452 } 1453 1454 /* 1455 * session caps 1456 */ 1457 1458 static void detach_cap_releases(struct ceph_mds_session *session, 1459 struct list_head *target) 1460 { 1461 lockdep_assert_held(&session->s_cap_lock); 1462 1463 list_splice_init(&session->s_cap_releases, target); 1464 session->s_num_cap_releases = 0; 1465 dout("dispose_cap_releases mds%d\n", session->s_mds); 1466 } 1467 1468 static void dispose_cap_releases(struct ceph_mds_client *mdsc, 1469 struct list_head *dispose) 1470 { 1471 while (!list_empty(dispose)) { 1472 struct ceph_cap *cap; 1473 /* zero out the in-progress message */ 1474 cap = list_first_entry(dispose, struct ceph_cap, session_caps); 1475 list_del(&cap->session_caps); 1476 ceph_put_cap(mdsc, cap); 1477 } 1478 } 1479 1480 static void cleanup_session_requests(struct ceph_mds_client *mdsc, 1481 struct ceph_mds_session *session) 1482 { 1483 struct ceph_mds_request *req; 1484 struct rb_node *p; 1485 struct ceph_inode_info *ci; 1486 1487 dout("cleanup_session_requests mds%d\n", session->s_mds); 1488 mutex_lock(&mdsc->mutex); 1489 while (!list_empty(&session->s_unsafe)) { 1490 req = list_first_entry(&session->s_unsafe, 1491 struct ceph_mds_request, r_unsafe_item); 1492 pr_warn_ratelimited(" dropping unsafe request %llu\n", 1493 req->r_tid); 1494 if (req->r_target_inode) { 1495 /* dropping unsafe change of inode's attributes */ 1496 ci = ceph_inode(req->r_target_inode); 1497 errseq_set(&ci->i_meta_err, -EIO); 1498 } 1499 if (req->r_unsafe_dir) { 1500 /* dropping unsafe directory operation */ 1501 ci = ceph_inode(req->r_unsafe_dir); 1502 errseq_set(&ci->i_meta_err, -EIO); 1503 } 1504 __unregister_request(mdsc, req); 1505 } 1506 /* zero r_attempts, so kick_requests() will re-send requests */ 1507 p = rb_first(&mdsc->request_tree); 1508 while (p) { 1509 req = rb_entry(p, struct ceph_mds_request, r_node); 1510 p = rb_next(p); 1511 if (req->r_session && 1512 req->r_session->s_mds == session->s_mds) 1513 req->r_attempts = 0; 1514 } 1515 mutex_unlock(&mdsc->mutex); 1516 } 1517 1518 /* 1519 * Helper to safely iterate over all caps associated with a session, with 1520 * special care taken to handle a racing __ceph_remove_cap(). 1521 * 1522 * Caller must hold session s_mutex. 1523 */ 1524 int ceph_iterate_session_caps(struct ceph_mds_session *session, 1525 int (*cb)(struct inode *, struct ceph_cap *, 1526 void *), void *arg) 1527 { 1528 struct list_head *p; 1529 struct ceph_cap *cap; 1530 struct inode *inode, *last_inode = NULL; 1531 struct ceph_cap *old_cap = NULL; 1532 int ret; 1533 1534 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 1535 spin_lock(&session->s_cap_lock); 1536 p = session->s_caps.next; 1537 while (p != &session->s_caps) { 1538 cap = list_entry(p, struct ceph_cap, session_caps); 1539 inode = igrab(&cap->ci->vfs_inode); 1540 if (!inode) { 1541 p = p->next; 1542 continue; 1543 } 1544 session->s_cap_iterator = cap; 1545 spin_unlock(&session->s_cap_lock); 1546 1547 if (last_inode) { 1548 /* avoid calling iput_final() while holding 1549 * s_mutex or in mds dispatch threads */ 1550 ceph_async_iput(last_inode); 1551 last_inode = NULL; 1552 } 1553 if (old_cap) { 1554 ceph_put_cap(session->s_mdsc, old_cap); 1555 old_cap = NULL; 1556 } 1557 1558 ret = cb(inode, cap, arg); 1559 last_inode = inode; 1560 1561 spin_lock(&session->s_cap_lock); 1562 p = p->next; 1563 if (!cap->ci) { 1564 dout("iterate_session_caps finishing cap %p removal\n", 1565 cap); 1566 BUG_ON(cap->session != session); 1567 cap->session = NULL; 1568 list_del_init(&cap->session_caps); 1569 session->s_nr_caps--; 1570 atomic64_dec(&session->s_mdsc->metric.total_caps); 1571 if (cap->queue_release) 1572 __ceph_queue_cap_release(session, cap); 1573 else 1574 old_cap = cap; /* put_cap it w/o locks held */ 1575 } 1576 if (ret < 0) 1577 goto out; 1578 } 1579 ret = 0; 1580 out: 1581 session->s_cap_iterator = NULL; 1582 spin_unlock(&session->s_cap_lock); 1583 1584 ceph_async_iput(last_inode); 1585 if (old_cap) 1586 ceph_put_cap(session->s_mdsc, old_cap); 1587 1588 return ret; 1589 } 1590 1591 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 1592 void *arg) 1593 { 1594 struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg; 1595 struct ceph_inode_info *ci = ceph_inode(inode); 1596 LIST_HEAD(to_remove); 1597 bool dirty_dropped = false; 1598 bool invalidate = false; 1599 1600 dout("removing cap %p, ci is %p, inode is %p\n", 1601 cap, ci, &ci->vfs_inode); 1602 spin_lock(&ci->i_ceph_lock); 1603 __ceph_remove_cap(cap, false); 1604 if (!ci->i_auth_cap) { 1605 struct ceph_cap_flush *cf; 1606 struct ceph_mds_client *mdsc = fsc->mdsc; 1607 1608 if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) { 1609 if (inode->i_data.nrpages > 0) 1610 invalidate = true; 1611 if (ci->i_wrbuffer_ref > 0) 1612 mapping_set_error(&inode->i_data, -EIO); 1613 } 1614 1615 while (!list_empty(&ci->i_cap_flush_list)) { 1616 cf = list_first_entry(&ci->i_cap_flush_list, 1617 struct ceph_cap_flush, i_list); 1618 list_move(&cf->i_list, &to_remove); 1619 } 1620 1621 spin_lock(&mdsc->cap_dirty_lock); 1622 1623 list_for_each_entry(cf, &to_remove, i_list) 1624 list_del(&cf->g_list); 1625 1626 if (!list_empty(&ci->i_dirty_item)) { 1627 pr_warn_ratelimited( 1628 " dropping dirty %s state for %p %lld\n", 1629 ceph_cap_string(ci->i_dirty_caps), 1630 inode, ceph_ino(inode)); 1631 ci->i_dirty_caps = 0; 1632 list_del_init(&ci->i_dirty_item); 1633 dirty_dropped = true; 1634 } 1635 if (!list_empty(&ci->i_flushing_item)) { 1636 pr_warn_ratelimited( 1637 " dropping dirty+flushing %s state for %p %lld\n", 1638 ceph_cap_string(ci->i_flushing_caps), 1639 inode, ceph_ino(inode)); 1640 ci->i_flushing_caps = 0; 1641 list_del_init(&ci->i_flushing_item); 1642 mdsc->num_cap_flushing--; 1643 dirty_dropped = true; 1644 } 1645 spin_unlock(&mdsc->cap_dirty_lock); 1646 1647 if (dirty_dropped) { 1648 errseq_set(&ci->i_meta_err, -EIO); 1649 1650 if (ci->i_wrbuffer_ref_head == 0 && 1651 ci->i_wr_ref == 0 && 1652 ci->i_dirty_caps == 0 && 1653 ci->i_flushing_caps == 0) { 1654 ceph_put_snap_context(ci->i_head_snapc); 1655 ci->i_head_snapc = NULL; 1656 } 1657 } 1658 1659 if (atomic_read(&ci->i_filelock_ref) > 0) { 1660 /* make further file lock syscall return -EIO */ 1661 ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK; 1662 pr_warn_ratelimited(" dropping file locks for %p %lld\n", 1663 inode, ceph_ino(inode)); 1664 } 1665 1666 if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { 1667 list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove); 1668 ci->i_prealloc_cap_flush = NULL; 1669 } 1670 } 1671 spin_unlock(&ci->i_ceph_lock); 1672 while (!list_empty(&to_remove)) { 1673 struct ceph_cap_flush *cf; 1674 cf = list_first_entry(&to_remove, 1675 struct ceph_cap_flush, i_list); 1676 list_del(&cf->i_list); 1677 ceph_free_cap_flush(cf); 1678 } 1679 1680 wake_up_all(&ci->i_cap_wq); 1681 if (invalidate) 1682 ceph_queue_invalidate(inode); 1683 if (dirty_dropped) 1684 iput(inode); 1685 return 0; 1686 } 1687 1688 /* 1689 * caller must hold session s_mutex 1690 */ 1691 static void remove_session_caps(struct ceph_mds_session *session) 1692 { 1693 struct ceph_fs_client *fsc = session->s_mdsc->fsc; 1694 struct super_block *sb = fsc->sb; 1695 LIST_HEAD(dispose); 1696 1697 dout("remove_session_caps on %p\n", session); 1698 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc); 1699 1700 wake_up_all(&fsc->mdsc->cap_flushing_wq); 1701 1702 spin_lock(&session->s_cap_lock); 1703 if (session->s_nr_caps > 0) { 1704 struct inode *inode; 1705 struct ceph_cap *cap, *prev = NULL; 1706 struct ceph_vino vino; 1707 /* 1708 * iterate_session_caps() skips inodes that are being 1709 * deleted, we need to wait until deletions are complete. 1710 * __wait_on_freeing_inode() is designed for the job, 1711 * but it is not exported, so use lookup inode function 1712 * to access it. 1713 */ 1714 while (!list_empty(&session->s_caps)) { 1715 cap = list_entry(session->s_caps.next, 1716 struct ceph_cap, session_caps); 1717 if (cap == prev) 1718 break; 1719 prev = cap; 1720 vino = cap->ci->i_vino; 1721 spin_unlock(&session->s_cap_lock); 1722 1723 inode = ceph_find_inode(sb, vino); 1724 /* avoid calling iput_final() while holding s_mutex */ 1725 ceph_async_iput(inode); 1726 1727 spin_lock(&session->s_cap_lock); 1728 } 1729 } 1730 1731 // drop cap expires and unlock s_cap_lock 1732 detach_cap_releases(session, &dispose); 1733 1734 BUG_ON(session->s_nr_caps > 0); 1735 BUG_ON(!list_empty(&session->s_cap_flushing)); 1736 spin_unlock(&session->s_cap_lock); 1737 dispose_cap_releases(session->s_mdsc, &dispose); 1738 } 1739 1740 enum { 1741 RECONNECT, 1742 RENEWCAPS, 1743 FORCE_RO, 1744 }; 1745 1746 /* 1747 * wake up any threads waiting on this session's caps. if the cap is 1748 * old (didn't get renewed on the client reconnect), remove it now. 1749 * 1750 * caller must hold s_mutex. 1751 */ 1752 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, 1753 void *arg) 1754 { 1755 struct ceph_inode_info *ci = ceph_inode(inode); 1756 unsigned long ev = (unsigned long)arg; 1757 1758 if (ev == RECONNECT) { 1759 spin_lock(&ci->i_ceph_lock); 1760 ci->i_wanted_max_size = 0; 1761 ci->i_requested_max_size = 0; 1762 spin_unlock(&ci->i_ceph_lock); 1763 } else if (ev == RENEWCAPS) { 1764 if (cap->cap_gen < cap->session->s_cap_gen) { 1765 /* mds did not re-issue stale cap */ 1766 spin_lock(&ci->i_ceph_lock); 1767 cap->issued = cap->implemented = CEPH_CAP_PIN; 1768 spin_unlock(&ci->i_ceph_lock); 1769 } 1770 } else if (ev == FORCE_RO) { 1771 } 1772 wake_up_all(&ci->i_cap_wq); 1773 return 0; 1774 } 1775 1776 static void wake_up_session_caps(struct ceph_mds_session *session, int ev) 1777 { 1778 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 1779 ceph_iterate_session_caps(session, wake_up_session_cb, 1780 (void *)(unsigned long)ev); 1781 } 1782 1783 /* 1784 * Send periodic message to MDS renewing all currently held caps. The 1785 * ack will reset the expiration for all caps from this session. 1786 * 1787 * caller holds s_mutex 1788 */ 1789 static int send_renew_caps(struct ceph_mds_client *mdsc, 1790 struct ceph_mds_session *session) 1791 { 1792 struct ceph_msg *msg; 1793 int state; 1794 1795 if (time_after_eq(jiffies, session->s_cap_ttl) && 1796 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 1797 pr_info("mds%d caps stale\n", session->s_mds); 1798 session->s_renew_requested = jiffies; 1799 1800 /* do not try to renew caps until a recovering mds has reconnected 1801 * with its clients. */ 1802 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 1803 if (state < CEPH_MDS_STATE_RECONNECT) { 1804 dout("send_renew_caps ignoring mds%d (%s)\n", 1805 session->s_mds, ceph_mds_state_name(state)); 1806 return 0; 1807 } 1808 1809 dout("send_renew_caps to mds%d (%s)\n", session->s_mds, 1810 ceph_mds_state_name(state)); 1811 msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 1812 ++session->s_renew_seq); 1813 if (!msg) 1814 return -ENOMEM; 1815 ceph_con_send(&session->s_con, msg); 1816 return 0; 1817 } 1818 1819 static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 1820 struct ceph_mds_session *session, u64 seq) 1821 { 1822 struct ceph_msg *msg; 1823 1824 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", 1825 session->s_mds, ceph_session_state_name(session->s_state), seq); 1826 msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 1827 if (!msg) 1828 return -ENOMEM; 1829 ceph_con_send(&session->s_con, msg); 1830 return 0; 1831 } 1832 1833 1834 /* 1835 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 1836 * 1837 * Called under session->s_mutex 1838 */ 1839 static void renewed_caps(struct ceph_mds_client *mdsc, 1840 struct ceph_mds_session *session, int is_renew) 1841 { 1842 int was_stale; 1843 int wake = 0; 1844 1845 spin_lock(&session->s_cap_lock); 1846 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 1847 1848 session->s_cap_ttl = session->s_renew_requested + 1849 mdsc->mdsmap->m_session_timeout*HZ; 1850 1851 if (was_stale) { 1852 if (time_before(jiffies, session->s_cap_ttl)) { 1853 pr_info("mds%d caps renewed\n", session->s_mds); 1854 wake = 1; 1855 } else { 1856 pr_info("mds%d caps still stale\n", session->s_mds); 1857 } 1858 } 1859 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n", 1860 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh", 1861 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 1862 spin_unlock(&session->s_cap_lock); 1863 1864 if (wake) 1865 wake_up_session_caps(session, RENEWCAPS); 1866 } 1867 1868 /* 1869 * send a session close request 1870 */ 1871 static int request_close_session(struct ceph_mds_session *session) 1872 { 1873 struct ceph_msg *msg; 1874 1875 dout("request_close_session mds%d state %s seq %lld\n", 1876 session->s_mds, ceph_session_state_name(session->s_state), 1877 session->s_seq); 1878 msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); 1879 if (!msg) 1880 return -ENOMEM; 1881 ceph_con_send(&session->s_con, msg); 1882 return 1; 1883 } 1884 1885 /* 1886 * Called with s_mutex held. 1887 */ 1888 static int __close_session(struct ceph_mds_client *mdsc, 1889 struct ceph_mds_session *session) 1890 { 1891 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 1892 return 0; 1893 session->s_state = CEPH_MDS_SESSION_CLOSING; 1894 return request_close_session(session); 1895 } 1896 1897 static bool drop_negative_children(struct dentry *dentry) 1898 { 1899 struct dentry *child; 1900 bool all_negative = true; 1901 1902 if (!d_is_dir(dentry)) 1903 goto out; 1904 1905 spin_lock(&dentry->d_lock); 1906 list_for_each_entry(child, &dentry->d_subdirs, d_child) { 1907 if (d_really_is_positive(child)) { 1908 all_negative = false; 1909 break; 1910 } 1911 } 1912 spin_unlock(&dentry->d_lock); 1913 1914 if (all_negative) 1915 shrink_dcache_parent(dentry); 1916 out: 1917 return all_negative; 1918 } 1919 1920 /* 1921 * Trim old(er) caps. 1922 * 1923 * Because we can't cache an inode without one or more caps, we do 1924 * this indirectly: if a cap is unused, we prune its aliases, at which 1925 * point the inode will hopefully get dropped to. 1926 * 1927 * Yes, this is a bit sloppy. Our only real goal here is to respond to 1928 * memory pressure from the MDS, though, so it needn't be perfect. 1929 */ 1930 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) 1931 { 1932 int *remaining = arg; 1933 struct ceph_inode_info *ci = ceph_inode(inode); 1934 int used, wanted, oissued, mine; 1935 1936 if (*remaining <= 0) 1937 return -1; 1938 1939 spin_lock(&ci->i_ceph_lock); 1940 mine = cap->issued | cap->implemented; 1941 used = __ceph_caps_used(ci); 1942 wanted = __ceph_caps_file_wanted(ci); 1943 oissued = __ceph_caps_issued_other(ci, cap); 1944 1945 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n", 1946 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), 1947 ceph_cap_string(used), ceph_cap_string(wanted)); 1948 if (cap == ci->i_auth_cap) { 1949 if (ci->i_dirty_caps || ci->i_flushing_caps || 1950 !list_empty(&ci->i_cap_snaps)) 1951 goto out; 1952 if ((used | wanted) & CEPH_CAP_ANY_WR) 1953 goto out; 1954 /* Note: it's possible that i_filelock_ref becomes non-zero 1955 * after dropping auth caps. It doesn't hurt because reply 1956 * of lock mds request will re-add auth caps. */ 1957 if (atomic_read(&ci->i_filelock_ref) > 0) 1958 goto out; 1959 } 1960 /* The inode has cached pages, but it's no longer used. 1961 * we can safely drop it */ 1962 if (S_ISREG(inode->i_mode) && 1963 wanted == 0 && used == CEPH_CAP_FILE_CACHE && 1964 !(oissued & CEPH_CAP_FILE_CACHE)) { 1965 used = 0; 1966 oissued = 0; 1967 } 1968 if ((used | wanted) & ~oissued & mine) 1969 goto out; /* we need these caps */ 1970 1971 if (oissued) { 1972 /* we aren't the only cap.. just remove us */ 1973 __ceph_remove_cap(cap, true); 1974 (*remaining)--; 1975 } else { 1976 struct dentry *dentry; 1977 /* try dropping referring dentries */ 1978 spin_unlock(&ci->i_ceph_lock); 1979 dentry = d_find_any_alias(inode); 1980 if (dentry && drop_negative_children(dentry)) { 1981 int count; 1982 dput(dentry); 1983 d_prune_aliases(inode); 1984 count = atomic_read(&inode->i_count); 1985 if (count == 1) 1986 (*remaining)--; 1987 dout("trim_caps_cb %p cap %p pruned, count now %d\n", 1988 inode, cap, count); 1989 } else { 1990 dput(dentry); 1991 } 1992 return 0; 1993 } 1994 1995 out: 1996 spin_unlock(&ci->i_ceph_lock); 1997 return 0; 1998 } 1999 2000 /* 2001 * Trim session cap count down to some max number. 2002 */ 2003 int ceph_trim_caps(struct ceph_mds_client *mdsc, 2004 struct ceph_mds_session *session, 2005 int max_caps) 2006 { 2007 int trim_caps = session->s_nr_caps - max_caps; 2008 2009 dout("trim_caps mds%d start: %d / %d, trim %d\n", 2010 session->s_mds, session->s_nr_caps, max_caps, trim_caps); 2011 if (trim_caps > 0) { 2012 int remaining = trim_caps; 2013 2014 ceph_iterate_session_caps(session, trim_caps_cb, &remaining); 2015 dout("trim_caps mds%d done: %d / %d, trimmed %d\n", 2016 session->s_mds, session->s_nr_caps, max_caps, 2017 trim_caps - remaining); 2018 } 2019 2020 ceph_flush_cap_releases(mdsc, session); 2021 return 0; 2022 } 2023 2024 static int check_caps_flush(struct ceph_mds_client *mdsc, 2025 u64 want_flush_tid) 2026 { 2027 int ret = 1; 2028 2029 spin_lock(&mdsc->cap_dirty_lock); 2030 if (!list_empty(&mdsc->cap_flush_list)) { 2031 struct ceph_cap_flush *cf = 2032 list_first_entry(&mdsc->cap_flush_list, 2033 struct ceph_cap_flush, g_list); 2034 if (cf->tid <= want_flush_tid) { 2035 dout("check_caps_flush still flushing tid " 2036 "%llu <= %llu\n", cf->tid, want_flush_tid); 2037 ret = 0; 2038 } 2039 } 2040 spin_unlock(&mdsc->cap_dirty_lock); 2041 return ret; 2042 } 2043 2044 /* 2045 * flush all dirty inode data to disk. 2046 * 2047 * returns true if we've flushed through want_flush_tid 2048 */ 2049 static void wait_caps_flush(struct ceph_mds_client *mdsc, 2050 u64 want_flush_tid) 2051 { 2052 dout("check_caps_flush want %llu\n", want_flush_tid); 2053 2054 wait_event(mdsc->cap_flushing_wq, 2055 check_caps_flush(mdsc, want_flush_tid)); 2056 2057 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); 2058 } 2059 2060 /* 2061 * called under s_mutex 2062 */ 2063 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 2064 struct ceph_mds_session *session) 2065 { 2066 struct ceph_msg *msg = NULL; 2067 struct ceph_mds_cap_release *head; 2068 struct ceph_mds_cap_item *item; 2069 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 2070 struct ceph_cap *cap; 2071 LIST_HEAD(tmp_list); 2072 int num_cap_releases; 2073 __le32 barrier, *cap_barrier; 2074 2075 down_read(&osdc->lock); 2076 barrier = cpu_to_le32(osdc->epoch_barrier); 2077 up_read(&osdc->lock); 2078 2079 spin_lock(&session->s_cap_lock); 2080 again: 2081 list_splice_init(&session->s_cap_releases, &tmp_list); 2082 num_cap_releases = session->s_num_cap_releases; 2083 session->s_num_cap_releases = 0; 2084 spin_unlock(&session->s_cap_lock); 2085 2086 while (!list_empty(&tmp_list)) { 2087 if (!msg) { 2088 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 2089 PAGE_SIZE, GFP_NOFS, false); 2090 if (!msg) 2091 goto out_err; 2092 head = msg->front.iov_base; 2093 head->num = cpu_to_le32(0); 2094 msg->front.iov_len = sizeof(*head); 2095 2096 msg->hdr.version = cpu_to_le16(2); 2097 msg->hdr.compat_version = cpu_to_le16(1); 2098 } 2099 2100 cap = list_first_entry(&tmp_list, struct ceph_cap, 2101 session_caps); 2102 list_del(&cap->session_caps); 2103 num_cap_releases--; 2104 2105 head = msg->front.iov_base; 2106 put_unaligned_le32(get_unaligned_le32(&head->num) + 1, 2107 &head->num); 2108 item = msg->front.iov_base + msg->front.iov_len; 2109 item->ino = cpu_to_le64(cap->cap_ino); 2110 item->cap_id = cpu_to_le64(cap->cap_id); 2111 item->migrate_seq = cpu_to_le32(cap->mseq); 2112 item->seq = cpu_to_le32(cap->issue_seq); 2113 msg->front.iov_len += sizeof(*item); 2114 2115 ceph_put_cap(mdsc, cap); 2116 2117 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 2118 // Append cap_barrier field 2119 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2120 *cap_barrier = barrier; 2121 msg->front.iov_len += sizeof(*cap_barrier); 2122 2123 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2124 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2125 ceph_con_send(&session->s_con, msg); 2126 msg = NULL; 2127 } 2128 } 2129 2130 BUG_ON(num_cap_releases != 0); 2131 2132 spin_lock(&session->s_cap_lock); 2133 if (!list_empty(&session->s_cap_releases)) 2134 goto again; 2135 spin_unlock(&session->s_cap_lock); 2136 2137 if (msg) { 2138 // Append cap_barrier field 2139 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2140 *cap_barrier = barrier; 2141 msg->front.iov_len += sizeof(*cap_barrier); 2142 2143 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2144 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2145 ceph_con_send(&session->s_con, msg); 2146 } 2147 return; 2148 out_err: 2149 pr_err("send_cap_releases mds%d, failed to allocate message\n", 2150 session->s_mds); 2151 spin_lock(&session->s_cap_lock); 2152 list_splice(&tmp_list, &session->s_cap_releases); 2153 session->s_num_cap_releases += num_cap_releases; 2154 spin_unlock(&session->s_cap_lock); 2155 } 2156 2157 static void ceph_cap_release_work(struct work_struct *work) 2158 { 2159 struct ceph_mds_session *session = 2160 container_of(work, struct ceph_mds_session, s_cap_release_work); 2161 2162 mutex_lock(&session->s_mutex); 2163 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2164 session->s_state == CEPH_MDS_SESSION_HUNG) 2165 ceph_send_cap_releases(session->s_mdsc, session); 2166 mutex_unlock(&session->s_mutex); 2167 ceph_put_mds_session(session); 2168 } 2169 2170 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc, 2171 struct ceph_mds_session *session) 2172 { 2173 if (mdsc->stopping) 2174 return; 2175 2176 ceph_get_mds_session(session); 2177 if (queue_work(mdsc->fsc->cap_wq, 2178 &session->s_cap_release_work)) { 2179 dout("cap release work queued\n"); 2180 } else { 2181 ceph_put_mds_session(session); 2182 dout("failed to queue cap release work\n"); 2183 } 2184 } 2185 2186 /* 2187 * caller holds session->s_cap_lock 2188 */ 2189 void __ceph_queue_cap_release(struct ceph_mds_session *session, 2190 struct ceph_cap *cap) 2191 { 2192 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2193 session->s_num_cap_releases++; 2194 2195 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2196 ceph_flush_cap_releases(session->s_mdsc, session); 2197 } 2198 2199 static void ceph_cap_reclaim_work(struct work_struct *work) 2200 { 2201 struct ceph_mds_client *mdsc = 2202 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2203 int ret = ceph_trim_dentries(mdsc); 2204 if (ret == -EAGAIN) 2205 ceph_queue_cap_reclaim_work(mdsc); 2206 } 2207 2208 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2209 { 2210 if (mdsc->stopping) 2211 return; 2212 2213 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2214 dout("caps reclaim work queued\n"); 2215 } else { 2216 dout("failed to queue caps release work\n"); 2217 } 2218 } 2219 2220 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) 2221 { 2222 int val; 2223 if (!nr) 2224 return; 2225 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2226 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2227 atomic_set(&mdsc->cap_reclaim_pending, 0); 2228 ceph_queue_cap_reclaim_work(mdsc); 2229 } 2230 } 2231 2232 /* 2233 * requests 2234 */ 2235 2236 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2237 struct inode *dir) 2238 { 2239 struct ceph_inode_info *ci = ceph_inode(dir); 2240 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2241 struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2242 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2243 unsigned int num_entries; 2244 int order; 2245 2246 spin_lock(&ci->i_ceph_lock); 2247 num_entries = ci->i_files + ci->i_subdirs; 2248 spin_unlock(&ci->i_ceph_lock); 2249 num_entries = max(num_entries, 1U); 2250 num_entries = min(num_entries, opt->max_readdir); 2251 2252 order = get_order(size * num_entries); 2253 while (order >= 0) { 2254 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2255 __GFP_NOWARN, 2256 order); 2257 if (rinfo->dir_entries) 2258 break; 2259 order--; 2260 } 2261 if (!rinfo->dir_entries) 2262 return -ENOMEM; 2263 2264 num_entries = (PAGE_SIZE << order) / size; 2265 num_entries = min(num_entries, opt->max_readdir); 2266 2267 rinfo->dir_buf_size = PAGE_SIZE << order; 2268 req->r_num_caps = num_entries + 1; 2269 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2270 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2271 return 0; 2272 } 2273 2274 /* 2275 * Create an mds request. 2276 */ 2277 struct ceph_mds_request * 2278 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2279 { 2280 struct ceph_mds_request *req; 2281 2282 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2283 if (!req) 2284 return ERR_PTR(-ENOMEM); 2285 2286 mutex_init(&req->r_fill_mutex); 2287 req->r_mdsc = mdsc; 2288 req->r_started = jiffies; 2289 req->r_start_latency = ktime_get(); 2290 req->r_resend_mds = -1; 2291 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2292 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2293 req->r_fmode = -1; 2294 kref_init(&req->r_kref); 2295 RB_CLEAR_NODE(&req->r_node); 2296 INIT_LIST_HEAD(&req->r_wait); 2297 init_completion(&req->r_completion); 2298 init_completion(&req->r_safe_completion); 2299 INIT_LIST_HEAD(&req->r_unsafe_item); 2300 2301 ktime_get_coarse_real_ts64(&req->r_stamp); 2302 2303 req->r_op = op; 2304 req->r_direct_mode = mode; 2305 return req; 2306 } 2307 2308 /* 2309 * return oldest (lowest) request, tid in request tree, 0 if none. 2310 * 2311 * called under mdsc->mutex. 2312 */ 2313 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2314 { 2315 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2316 return NULL; 2317 return rb_entry(rb_first(&mdsc->request_tree), 2318 struct ceph_mds_request, r_node); 2319 } 2320 2321 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2322 { 2323 return mdsc->oldest_tid; 2324 } 2325 2326 /* 2327 * Build a dentry's path. Allocate on heap; caller must kfree. Based 2328 * on build_path_from_dentry in fs/cifs/dir.c. 2329 * 2330 * If @stop_on_nosnap, generate path relative to the first non-snapped 2331 * inode. 2332 * 2333 * Encode hidden .snap dirs as a double /, i.e. 2334 * foo/.snap/bar -> foo//bar 2335 */ 2336 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase, 2337 int stop_on_nosnap) 2338 { 2339 struct dentry *temp; 2340 char *path; 2341 int pos; 2342 unsigned seq; 2343 u64 base; 2344 2345 if (!dentry) 2346 return ERR_PTR(-EINVAL); 2347 2348 path = __getname(); 2349 if (!path) 2350 return ERR_PTR(-ENOMEM); 2351 retry: 2352 pos = PATH_MAX - 1; 2353 path[pos] = '\0'; 2354 2355 seq = read_seqbegin(&rename_lock); 2356 rcu_read_lock(); 2357 temp = dentry; 2358 for (;;) { 2359 struct inode *inode; 2360 2361 spin_lock(&temp->d_lock); 2362 inode = d_inode(temp); 2363 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2364 dout("build_path path+%d: %p SNAPDIR\n", 2365 pos, temp); 2366 } else if (stop_on_nosnap && inode && dentry != temp && 2367 ceph_snap(inode) == CEPH_NOSNAP) { 2368 spin_unlock(&temp->d_lock); 2369 pos++; /* get rid of any prepended '/' */ 2370 break; 2371 } else { 2372 pos -= temp->d_name.len; 2373 if (pos < 0) { 2374 spin_unlock(&temp->d_lock); 2375 break; 2376 } 2377 memcpy(path + pos, temp->d_name.name, temp->d_name.len); 2378 } 2379 spin_unlock(&temp->d_lock); 2380 temp = READ_ONCE(temp->d_parent); 2381 2382 /* Are we at the root? */ 2383 if (IS_ROOT(temp)) 2384 break; 2385 2386 /* Are we out of buffer? */ 2387 if (--pos < 0) 2388 break; 2389 2390 path[pos] = '/'; 2391 } 2392 base = ceph_ino(d_inode(temp)); 2393 rcu_read_unlock(); 2394 2395 if (read_seqretry(&rename_lock, seq)) 2396 goto retry; 2397 2398 if (pos < 0) { 2399 /* 2400 * A rename didn't occur, but somehow we didn't end up where 2401 * we thought we would. Throw a warning and try again. 2402 */ 2403 pr_warn("build_path did not end path lookup where " 2404 "expected, pos is %d\n", pos); 2405 goto retry; 2406 } 2407 2408 *pbase = base; 2409 *plen = PATH_MAX - 1 - pos; 2410 dout("build_path on %p %d built %llx '%.*s'\n", 2411 dentry, d_count(dentry), base, *plen, path + pos); 2412 return path + pos; 2413 } 2414 2415 static int build_dentry_path(struct dentry *dentry, struct inode *dir, 2416 const char **ppath, int *ppathlen, u64 *pino, 2417 bool *pfreepath, bool parent_locked) 2418 { 2419 char *path; 2420 2421 rcu_read_lock(); 2422 if (!dir) 2423 dir = d_inode_rcu(dentry->d_parent); 2424 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) { 2425 *pino = ceph_ino(dir); 2426 rcu_read_unlock(); 2427 *ppath = dentry->d_name.name; 2428 *ppathlen = dentry->d_name.len; 2429 return 0; 2430 } 2431 rcu_read_unlock(); 2432 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2433 if (IS_ERR(path)) 2434 return PTR_ERR(path); 2435 *ppath = path; 2436 *pfreepath = true; 2437 return 0; 2438 } 2439 2440 static int build_inode_path(struct inode *inode, 2441 const char **ppath, int *ppathlen, u64 *pino, 2442 bool *pfreepath) 2443 { 2444 struct dentry *dentry; 2445 char *path; 2446 2447 if (ceph_snap(inode) == CEPH_NOSNAP) { 2448 *pino = ceph_ino(inode); 2449 *ppathlen = 0; 2450 return 0; 2451 } 2452 dentry = d_find_alias(inode); 2453 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2454 dput(dentry); 2455 if (IS_ERR(path)) 2456 return PTR_ERR(path); 2457 *ppath = path; 2458 *pfreepath = true; 2459 return 0; 2460 } 2461 2462 /* 2463 * request arguments may be specified via an inode *, a dentry *, or 2464 * an explicit ino+path. 2465 */ 2466 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, 2467 struct inode *rdiri, const char *rpath, 2468 u64 rino, const char **ppath, int *pathlen, 2469 u64 *ino, bool *freepath, bool parent_locked) 2470 { 2471 int r = 0; 2472 2473 if (rinode) { 2474 r = build_inode_path(rinode, ppath, pathlen, ino, freepath); 2475 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 2476 ceph_snap(rinode)); 2477 } else if (rdentry) { 2478 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino, 2479 freepath, parent_locked); 2480 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, 2481 *ppath); 2482 } else if (rpath || rino) { 2483 *ino = rino; 2484 *ppath = rpath; 2485 *pathlen = rpath ? strlen(rpath) : 0; 2486 dout(" path %.*s\n", *pathlen, rpath); 2487 } 2488 2489 return r; 2490 } 2491 2492 static void encode_timestamp_and_gids(void **p, 2493 const struct ceph_mds_request *req) 2494 { 2495 struct ceph_timespec ts; 2496 int i; 2497 2498 ceph_encode_timespec64(&ts, &req->r_stamp); 2499 ceph_encode_copy(p, &ts, sizeof(ts)); 2500 2501 /* gid_list */ 2502 ceph_encode_32(p, req->r_cred->group_info->ngroups); 2503 for (i = 0; i < req->r_cred->group_info->ngroups; i++) 2504 ceph_encode_64(p, from_kgid(&init_user_ns, 2505 req->r_cred->group_info->gid[i])); 2506 } 2507 2508 /* 2509 * called under mdsc->mutex 2510 */ 2511 static struct ceph_msg *create_request_message(struct ceph_mds_session *session, 2512 struct ceph_mds_request *req, 2513 bool drop_cap_releases) 2514 { 2515 int mds = session->s_mds; 2516 struct ceph_mds_client *mdsc = session->s_mdsc; 2517 struct ceph_msg *msg; 2518 struct ceph_mds_request_head_old *head; 2519 const char *path1 = NULL; 2520 const char *path2 = NULL; 2521 u64 ino1 = 0, ino2 = 0; 2522 int pathlen1 = 0, pathlen2 = 0; 2523 bool freepath1 = false, freepath2 = false; 2524 int len; 2525 u16 releases; 2526 void *p, *end; 2527 int ret; 2528 bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME); 2529 2530 ret = set_request_path_attr(req->r_inode, req->r_dentry, 2531 req->r_parent, req->r_path1, req->r_ino1.ino, 2532 &path1, &pathlen1, &ino1, &freepath1, 2533 test_bit(CEPH_MDS_R_PARENT_LOCKED, 2534 &req->r_req_flags)); 2535 if (ret < 0) { 2536 msg = ERR_PTR(ret); 2537 goto out; 2538 } 2539 2540 /* If r_old_dentry is set, then assume that its parent is locked */ 2541 ret = set_request_path_attr(NULL, req->r_old_dentry, 2542 req->r_old_dentry_dir, 2543 req->r_path2, req->r_ino2.ino, 2544 &path2, &pathlen2, &ino2, &freepath2, true); 2545 if (ret < 0) { 2546 msg = ERR_PTR(ret); 2547 goto out_free1; 2548 } 2549 2550 len = legacy ? sizeof(*head) : sizeof(struct ceph_mds_request_head); 2551 len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + 2552 sizeof(struct ceph_timespec); 2553 len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups); 2554 2555 /* calculate (max) length for cap releases */ 2556 len += sizeof(struct ceph_mds_request_release) * 2557 (!!req->r_inode_drop + !!req->r_dentry_drop + 2558 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 2559 2560 if (req->r_dentry_drop) 2561 len += pathlen1; 2562 if (req->r_old_dentry_drop) 2563 len += pathlen2; 2564 2565 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); 2566 if (!msg) { 2567 msg = ERR_PTR(-ENOMEM); 2568 goto out_free2; 2569 } 2570 2571 msg->hdr.tid = cpu_to_le64(req->r_tid); 2572 2573 /* 2574 * The old ceph_mds_request_head didn't contain a version field, and 2575 * one was added when we moved the message version from 3->4. 2576 */ 2577 if (legacy) { 2578 msg->hdr.version = cpu_to_le16(3); 2579 head = msg->front.iov_base; 2580 p = msg->front.iov_base + sizeof(*head); 2581 } else { 2582 struct ceph_mds_request_head *new_head = msg->front.iov_base; 2583 2584 msg->hdr.version = cpu_to_le16(4); 2585 new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION); 2586 head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; 2587 p = msg->front.iov_base + sizeof(*new_head); 2588 } 2589 2590 end = msg->front.iov_base + msg->front.iov_len; 2591 2592 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 2593 head->op = cpu_to_le32(req->r_op); 2594 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, 2595 req->r_cred->fsuid)); 2596 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, 2597 req->r_cred->fsgid)); 2598 head->ino = cpu_to_le64(req->r_deleg_ino); 2599 head->args = req->r_args; 2600 2601 ceph_encode_filepath(&p, end, ino1, path1); 2602 ceph_encode_filepath(&p, end, ino2, path2); 2603 2604 /* make note of release offset, in case we need to replay */ 2605 req->r_request_release_offset = p - msg->front.iov_base; 2606 2607 /* cap releases */ 2608 releases = 0; 2609 if (req->r_inode_drop) 2610 releases += ceph_encode_inode_release(&p, 2611 req->r_inode ? req->r_inode : d_inode(req->r_dentry), 2612 mds, req->r_inode_drop, req->r_inode_unless, 2613 req->r_op == CEPH_MDS_OP_READDIR); 2614 if (req->r_dentry_drop) 2615 releases += ceph_encode_dentry_release(&p, req->r_dentry, 2616 req->r_parent, mds, req->r_dentry_drop, 2617 req->r_dentry_unless); 2618 if (req->r_old_dentry_drop) 2619 releases += ceph_encode_dentry_release(&p, req->r_old_dentry, 2620 req->r_old_dentry_dir, mds, 2621 req->r_old_dentry_drop, 2622 req->r_old_dentry_unless); 2623 if (req->r_old_inode_drop) 2624 releases += ceph_encode_inode_release(&p, 2625 d_inode(req->r_old_dentry), 2626 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 2627 2628 if (drop_cap_releases) { 2629 releases = 0; 2630 p = msg->front.iov_base + req->r_request_release_offset; 2631 } 2632 2633 head->num_releases = cpu_to_le16(releases); 2634 2635 encode_timestamp_and_gids(&p, req); 2636 2637 if (WARN_ON_ONCE(p > end)) { 2638 ceph_msg_put(msg); 2639 msg = ERR_PTR(-ERANGE); 2640 goto out_free2; 2641 } 2642 2643 msg->front.iov_len = p - msg->front.iov_base; 2644 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2645 2646 if (req->r_pagelist) { 2647 struct ceph_pagelist *pagelist = req->r_pagelist; 2648 ceph_msg_data_add_pagelist(msg, pagelist); 2649 msg->hdr.data_len = cpu_to_le32(pagelist->length); 2650 } else { 2651 msg->hdr.data_len = 0; 2652 } 2653 2654 msg->hdr.data_off = cpu_to_le16(0); 2655 2656 out_free2: 2657 if (freepath2) 2658 ceph_mdsc_free_path((char *)path2, pathlen2); 2659 out_free1: 2660 if (freepath1) 2661 ceph_mdsc_free_path((char *)path1, pathlen1); 2662 out: 2663 return msg; 2664 } 2665 2666 /* 2667 * called under mdsc->mutex if error, under no mutex if 2668 * success. 2669 */ 2670 static void complete_request(struct ceph_mds_client *mdsc, 2671 struct ceph_mds_request *req) 2672 { 2673 req->r_end_latency = ktime_get(); 2674 2675 if (req->r_callback) 2676 req->r_callback(mdsc, req); 2677 complete_all(&req->r_completion); 2678 } 2679 2680 static struct ceph_mds_request_head_old * 2681 find_old_request_head(void *p, u64 features) 2682 { 2683 bool legacy = !(features & CEPH_FEATURE_FS_BTIME); 2684 struct ceph_mds_request_head *new_head; 2685 2686 if (legacy) 2687 return (struct ceph_mds_request_head_old *)p; 2688 new_head = (struct ceph_mds_request_head *)p; 2689 return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; 2690 } 2691 2692 /* 2693 * called under mdsc->mutex 2694 */ 2695 static int __prepare_send_request(struct ceph_mds_session *session, 2696 struct ceph_mds_request *req, 2697 bool drop_cap_releases) 2698 { 2699 int mds = session->s_mds; 2700 struct ceph_mds_client *mdsc = session->s_mdsc; 2701 struct ceph_mds_request_head_old *rhead; 2702 struct ceph_msg *msg; 2703 int flags = 0; 2704 2705 req->r_attempts++; 2706 if (req->r_inode) { 2707 struct ceph_cap *cap = 2708 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 2709 2710 if (cap) 2711 req->r_sent_on_mseq = cap->mseq; 2712 else 2713 req->r_sent_on_mseq = -1; 2714 } 2715 dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, 2716 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 2717 2718 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 2719 void *p; 2720 2721 /* 2722 * Replay. Do not regenerate message (and rebuild 2723 * paths, etc.); just use the original message. 2724 * Rebuilding paths will break for renames because 2725 * d_move mangles the src name. 2726 */ 2727 msg = req->r_request; 2728 rhead = find_old_request_head(msg->front.iov_base, 2729 session->s_con.peer_features); 2730 2731 flags = le32_to_cpu(rhead->flags); 2732 flags |= CEPH_MDS_FLAG_REPLAY; 2733 rhead->flags = cpu_to_le32(flags); 2734 2735 if (req->r_target_inode) 2736 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 2737 2738 rhead->num_retry = req->r_attempts - 1; 2739 2740 /* remove cap/dentry releases from message */ 2741 rhead->num_releases = 0; 2742 2743 p = msg->front.iov_base + req->r_request_release_offset; 2744 encode_timestamp_and_gids(&p, req); 2745 2746 msg->front.iov_len = p - msg->front.iov_base; 2747 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2748 return 0; 2749 } 2750 2751 if (req->r_request) { 2752 ceph_msg_put(req->r_request); 2753 req->r_request = NULL; 2754 } 2755 msg = create_request_message(session, req, drop_cap_releases); 2756 if (IS_ERR(msg)) { 2757 req->r_err = PTR_ERR(msg); 2758 return PTR_ERR(msg); 2759 } 2760 req->r_request = msg; 2761 2762 rhead = find_old_request_head(msg->front.iov_base, 2763 session->s_con.peer_features); 2764 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 2765 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2766 flags |= CEPH_MDS_FLAG_REPLAY; 2767 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) 2768 flags |= CEPH_MDS_FLAG_ASYNC; 2769 if (req->r_parent) 2770 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 2771 rhead->flags = cpu_to_le32(flags); 2772 rhead->num_fwd = req->r_num_fwd; 2773 rhead->num_retry = req->r_attempts - 1; 2774 2775 dout(" r_parent = %p\n", req->r_parent); 2776 return 0; 2777 } 2778 2779 /* 2780 * called under mdsc->mutex 2781 */ 2782 static int __send_request(struct ceph_mds_session *session, 2783 struct ceph_mds_request *req, 2784 bool drop_cap_releases) 2785 { 2786 int err; 2787 2788 err = __prepare_send_request(session, req, drop_cap_releases); 2789 if (!err) { 2790 ceph_msg_get(req->r_request); 2791 ceph_con_send(&session->s_con, req->r_request); 2792 } 2793 2794 return err; 2795 } 2796 2797 /* 2798 * send request, or put it on the appropriate wait list. 2799 */ 2800 static void __do_request(struct ceph_mds_client *mdsc, 2801 struct ceph_mds_request *req) 2802 { 2803 struct ceph_mds_session *session = NULL; 2804 int mds = -1; 2805 int err = 0; 2806 bool random; 2807 2808 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 2809 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 2810 __unregister_request(mdsc, req); 2811 return; 2812 } 2813 2814 if (req->r_timeout && 2815 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 2816 dout("do_request timed out\n"); 2817 err = -ETIMEDOUT; 2818 goto finish; 2819 } 2820 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 2821 dout("do_request forced umount\n"); 2822 err = -EIO; 2823 goto finish; 2824 } 2825 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 2826 if (mdsc->mdsmap_err) { 2827 err = mdsc->mdsmap_err; 2828 dout("do_request mdsmap err %d\n", err); 2829 goto finish; 2830 } 2831 if (mdsc->mdsmap->m_epoch == 0) { 2832 dout("do_request no mdsmap, waiting for map\n"); 2833 list_add(&req->r_wait, &mdsc->waiting_for_map); 2834 return; 2835 } 2836 if (!(mdsc->fsc->mount_options->flags & 2837 CEPH_MOUNT_OPT_MOUNTWAIT) && 2838 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { 2839 err = -EHOSTUNREACH; 2840 goto finish; 2841 } 2842 } 2843 2844 put_request_session(req); 2845 2846 mds = __choose_mds(mdsc, req, &random); 2847 if (mds < 0 || 2848 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 2849 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2850 err = -EJUKEBOX; 2851 goto finish; 2852 } 2853 dout("do_request no mds or not active, waiting for map\n"); 2854 list_add(&req->r_wait, &mdsc->waiting_for_map); 2855 return; 2856 } 2857 2858 /* get, open session */ 2859 session = __ceph_lookup_mds_session(mdsc, mds); 2860 if (!session) { 2861 session = register_session(mdsc, mds); 2862 if (IS_ERR(session)) { 2863 err = PTR_ERR(session); 2864 goto finish; 2865 } 2866 } 2867 req->r_session = ceph_get_mds_session(session); 2868 2869 dout("do_request mds%d session %p state %s\n", mds, session, 2870 ceph_session_state_name(session->s_state)); 2871 if (session->s_state != CEPH_MDS_SESSION_OPEN && 2872 session->s_state != CEPH_MDS_SESSION_HUNG) { 2873 /* 2874 * We cannot queue async requests since the caps and delegated 2875 * inodes are bound to the session. Just return -EJUKEBOX and 2876 * let the caller retry a sync request in that case. 2877 */ 2878 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2879 err = -EJUKEBOX; 2880 goto out_session; 2881 } 2882 2883 /* 2884 * If the session has been REJECTED, then return a hard error, 2885 * unless it's a CLEANRECOVER mount, in which case we'll queue 2886 * it to the mdsc queue. 2887 */ 2888 if (session->s_state == CEPH_MDS_SESSION_REJECTED) { 2889 if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER)) 2890 list_add(&req->r_wait, &mdsc->waiting_for_map); 2891 else 2892 err = -EACCES; 2893 goto out_session; 2894 } 2895 2896 if (session->s_state == CEPH_MDS_SESSION_NEW || 2897 session->s_state == CEPH_MDS_SESSION_CLOSING) { 2898 err = __open_session(mdsc, session); 2899 if (err) 2900 goto out_session; 2901 /* retry the same mds later */ 2902 if (random) 2903 req->r_resend_mds = mds; 2904 } 2905 list_add(&req->r_wait, &session->s_waiting); 2906 goto out_session; 2907 } 2908 2909 /* send request */ 2910 req->r_resend_mds = -1; /* forget any previous mds hint */ 2911 2912 if (req->r_request_started == 0) /* note request start time */ 2913 req->r_request_started = jiffies; 2914 2915 err = __send_request(session, req, false); 2916 2917 out_session: 2918 ceph_put_mds_session(session); 2919 finish: 2920 if (err) { 2921 dout("__do_request early error %d\n", err); 2922 req->r_err = err; 2923 complete_request(mdsc, req); 2924 __unregister_request(mdsc, req); 2925 } 2926 return; 2927 } 2928 2929 /* 2930 * called under mdsc->mutex 2931 */ 2932 static void __wake_requests(struct ceph_mds_client *mdsc, 2933 struct list_head *head) 2934 { 2935 struct ceph_mds_request *req; 2936 LIST_HEAD(tmp_list); 2937 2938 list_splice_init(head, &tmp_list); 2939 2940 while (!list_empty(&tmp_list)) { 2941 req = list_entry(tmp_list.next, 2942 struct ceph_mds_request, r_wait); 2943 list_del_init(&req->r_wait); 2944 dout(" wake request %p tid %llu\n", req, req->r_tid); 2945 __do_request(mdsc, req); 2946 } 2947 } 2948 2949 /* 2950 * Wake up threads with requests pending for @mds, so that they can 2951 * resubmit their requests to a possibly different mds. 2952 */ 2953 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 2954 { 2955 struct ceph_mds_request *req; 2956 struct rb_node *p = rb_first(&mdsc->request_tree); 2957 2958 dout("kick_requests mds%d\n", mds); 2959 while (p) { 2960 req = rb_entry(p, struct ceph_mds_request, r_node); 2961 p = rb_next(p); 2962 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2963 continue; 2964 if (req->r_attempts > 0) 2965 continue; /* only new requests */ 2966 if (req->r_session && 2967 req->r_session->s_mds == mds) { 2968 dout(" kicking tid %llu\n", req->r_tid); 2969 list_del_init(&req->r_wait); 2970 __do_request(mdsc, req); 2971 } 2972 } 2973 } 2974 2975 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, 2976 struct ceph_mds_request *req) 2977 { 2978 int err = 0; 2979 2980 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ 2981 if (req->r_inode) 2982 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 2983 if (req->r_parent) { 2984 struct ceph_inode_info *ci = ceph_inode(req->r_parent); 2985 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ? 2986 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD; 2987 spin_lock(&ci->i_ceph_lock); 2988 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false); 2989 __ceph_touch_fmode(ci, mdsc, fmode); 2990 spin_unlock(&ci->i_ceph_lock); 2991 ihold(req->r_parent); 2992 } 2993 if (req->r_old_dentry_dir) 2994 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 2995 CEPH_CAP_PIN); 2996 2997 if (req->r_inode) { 2998 err = ceph_wait_on_async_create(req->r_inode); 2999 if (err) { 3000 dout("%s: wait for async create returned: %d\n", 3001 __func__, err); 3002 return err; 3003 } 3004 } 3005 3006 if (!err && req->r_old_inode) { 3007 err = ceph_wait_on_async_create(req->r_old_inode); 3008 if (err) { 3009 dout("%s: wait for async create returned: %d\n", 3010 __func__, err); 3011 return err; 3012 } 3013 } 3014 3015 dout("submit_request on %p for inode %p\n", req, dir); 3016 mutex_lock(&mdsc->mutex); 3017 __register_request(mdsc, req, dir); 3018 __do_request(mdsc, req); 3019 err = req->r_err; 3020 mutex_unlock(&mdsc->mutex); 3021 return err; 3022 } 3023 3024 static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, 3025 struct ceph_mds_request *req) 3026 { 3027 int err; 3028 3029 /* wait */ 3030 dout("do_request waiting\n"); 3031 if (!req->r_timeout && req->r_wait_for_completion) { 3032 err = req->r_wait_for_completion(mdsc, req); 3033 } else { 3034 long timeleft = wait_for_completion_killable_timeout( 3035 &req->r_completion, 3036 ceph_timeout_jiffies(req->r_timeout)); 3037 if (timeleft > 0) 3038 err = 0; 3039 else if (!timeleft) 3040 err = -ETIMEDOUT; /* timed out */ 3041 else 3042 err = timeleft; /* killed */ 3043 } 3044 dout("do_request waited, got %d\n", err); 3045 mutex_lock(&mdsc->mutex); 3046 3047 /* only abort if we didn't race with a real reply */ 3048 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 3049 err = le32_to_cpu(req->r_reply_info.head->result); 3050 } else if (err < 0) { 3051 dout("aborted request %lld with %d\n", req->r_tid, err); 3052 3053 /* 3054 * ensure we aren't running concurrently with 3055 * ceph_fill_trace or ceph_readdir_prepopulate, which 3056 * rely on locks (dir mutex) held by our caller. 3057 */ 3058 mutex_lock(&req->r_fill_mutex); 3059 req->r_err = err; 3060 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3061 mutex_unlock(&req->r_fill_mutex); 3062 3063 if (req->r_parent && 3064 (req->r_op & CEPH_MDS_OP_WRITE)) 3065 ceph_invalidate_dir_request(req); 3066 } else { 3067 err = req->r_err; 3068 } 3069 3070 mutex_unlock(&mdsc->mutex); 3071 return err; 3072 } 3073 3074 /* 3075 * Synchrously perform an mds request. Take care of all of the 3076 * session setup, forwarding, retry details. 3077 */ 3078 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 3079 struct inode *dir, 3080 struct ceph_mds_request *req) 3081 { 3082 int err; 3083 3084 dout("do_request on %p\n", req); 3085 3086 /* issue */ 3087 err = ceph_mdsc_submit_request(mdsc, dir, req); 3088 if (!err) 3089 err = ceph_mdsc_wait_request(mdsc, req); 3090 dout("do_request %p done, result %d\n", req, err); 3091 return err; 3092 } 3093 3094 /* 3095 * Invalidate dir's completeness, dentry lease state on an aborted MDS 3096 * namespace request. 3097 */ 3098 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 3099 { 3100 struct inode *dir = req->r_parent; 3101 struct inode *old_dir = req->r_old_dentry_dir; 3102 3103 dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir); 3104 3105 ceph_dir_clear_complete(dir); 3106 if (old_dir) 3107 ceph_dir_clear_complete(old_dir); 3108 if (req->r_dentry) 3109 ceph_invalidate_dentry_lease(req->r_dentry); 3110 if (req->r_old_dentry) 3111 ceph_invalidate_dentry_lease(req->r_old_dentry); 3112 } 3113 3114 /* 3115 * Handle mds reply. 3116 * 3117 * We take the session mutex and parse and process the reply immediately. 3118 * This preserves the logical ordering of replies, capabilities, etc., sent 3119 * by the MDS as they are applied to our local cache. 3120 */ 3121 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 3122 { 3123 struct ceph_mds_client *mdsc = session->s_mdsc; 3124 struct ceph_mds_request *req; 3125 struct ceph_mds_reply_head *head = msg->front.iov_base; 3126 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 3127 struct ceph_snap_realm *realm; 3128 u64 tid; 3129 int err, result; 3130 int mds = session->s_mds; 3131 3132 if (msg->front.iov_len < sizeof(*head)) { 3133 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 3134 ceph_msg_dump(msg); 3135 return; 3136 } 3137 3138 /* get request, session */ 3139 tid = le64_to_cpu(msg->hdr.tid); 3140 mutex_lock(&mdsc->mutex); 3141 req = lookup_get_request(mdsc, tid); 3142 if (!req) { 3143 dout("handle_reply on unknown tid %llu\n", tid); 3144 mutex_unlock(&mdsc->mutex); 3145 return; 3146 } 3147 dout("handle_reply %p\n", req); 3148 3149 /* correct session? */ 3150 if (req->r_session != session) { 3151 pr_err("mdsc_handle_reply got %llu on session mds%d" 3152 " not mds%d\n", tid, session->s_mds, 3153 req->r_session ? req->r_session->s_mds : -1); 3154 mutex_unlock(&mdsc->mutex); 3155 goto out; 3156 } 3157 3158 /* dup? */ 3159 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 3160 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 3161 pr_warn("got a dup %s reply on %llu from mds%d\n", 3162 head->safe ? "safe" : "unsafe", tid, mds); 3163 mutex_unlock(&mdsc->mutex); 3164 goto out; 3165 } 3166 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 3167 pr_warn("got unsafe after safe on %llu from mds%d\n", 3168 tid, mds); 3169 mutex_unlock(&mdsc->mutex); 3170 goto out; 3171 } 3172 3173 result = le32_to_cpu(head->result); 3174 3175 /* 3176 * Handle an ESTALE 3177 * if we're not talking to the authority, send to them 3178 * if the authority has changed while we weren't looking, 3179 * send to new authority 3180 * Otherwise we just have to return an ESTALE 3181 */ 3182 if (result == -ESTALE) { 3183 dout("got ESTALE on request %llu\n", req->r_tid); 3184 req->r_resend_mds = -1; 3185 if (req->r_direct_mode != USE_AUTH_MDS) { 3186 dout("not using auth, setting for that now\n"); 3187 req->r_direct_mode = USE_AUTH_MDS; 3188 __do_request(mdsc, req); 3189 mutex_unlock(&mdsc->mutex); 3190 goto out; 3191 } else { 3192 int mds = __choose_mds(mdsc, req, NULL); 3193 if (mds >= 0 && mds != req->r_session->s_mds) { 3194 dout("but auth changed, so resending\n"); 3195 __do_request(mdsc, req); 3196 mutex_unlock(&mdsc->mutex); 3197 goto out; 3198 } 3199 } 3200 dout("have to return ESTALE on request %llu\n", req->r_tid); 3201 } 3202 3203 3204 if (head->safe) { 3205 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 3206 __unregister_request(mdsc, req); 3207 3208 /* last request during umount? */ 3209 if (mdsc->stopping && !__get_oldest_req(mdsc)) 3210 complete_all(&mdsc->safe_umount_waiters); 3211 3212 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3213 /* 3214 * We already handled the unsafe response, now do the 3215 * cleanup. No need to examine the response; the MDS 3216 * doesn't include any result info in the safe 3217 * response. And even if it did, there is nothing 3218 * useful we could do with a revised return value. 3219 */ 3220 dout("got safe reply %llu, mds%d\n", tid, mds); 3221 3222 mutex_unlock(&mdsc->mutex); 3223 goto out; 3224 } 3225 } else { 3226 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 3227 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 3228 } 3229 3230 dout("handle_reply tid %lld result %d\n", tid, result); 3231 rinfo = &req->r_reply_info; 3232 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 3233 err = parse_reply_info(session, msg, rinfo, (u64)-1); 3234 else 3235 err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features); 3236 mutex_unlock(&mdsc->mutex); 3237 3238 /* Must find target inode outside of mutexes to avoid deadlocks */ 3239 if ((err >= 0) && rinfo->head->is_target) { 3240 struct inode *in; 3241 struct ceph_vino tvino = { 3242 .ino = le64_to_cpu(rinfo->targeti.in->ino), 3243 .snap = le64_to_cpu(rinfo->targeti.in->snapid) 3244 }; 3245 3246 in = ceph_get_inode(mdsc->fsc->sb, tvino); 3247 if (IS_ERR(in)) { 3248 err = PTR_ERR(in); 3249 mutex_lock(&session->s_mutex); 3250 goto out_err; 3251 } 3252 req->r_target_inode = in; 3253 } 3254 3255 mutex_lock(&session->s_mutex); 3256 if (err < 0) { 3257 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 3258 ceph_msg_dump(msg); 3259 goto out_err; 3260 } 3261 3262 /* snap trace */ 3263 realm = NULL; 3264 if (rinfo->snapblob_len) { 3265 down_write(&mdsc->snap_rwsem); 3266 ceph_update_snap_trace(mdsc, rinfo->snapblob, 3267 rinfo->snapblob + rinfo->snapblob_len, 3268 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 3269 &realm); 3270 downgrade_write(&mdsc->snap_rwsem); 3271 } else { 3272 down_read(&mdsc->snap_rwsem); 3273 } 3274 3275 /* insert trace into our cache */ 3276 mutex_lock(&req->r_fill_mutex); 3277 current->journal_info = req; 3278 err = ceph_fill_trace(mdsc->fsc->sb, req); 3279 if (err == 0) { 3280 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 3281 req->r_op == CEPH_MDS_OP_LSSNAP)) 3282 ceph_readdir_prepopulate(req, req->r_session); 3283 } 3284 current->journal_info = NULL; 3285 mutex_unlock(&req->r_fill_mutex); 3286 3287 up_read(&mdsc->snap_rwsem); 3288 if (realm) 3289 ceph_put_snap_realm(mdsc, realm); 3290 3291 if (err == 0) { 3292 if (req->r_target_inode && 3293 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3294 struct ceph_inode_info *ci = 3295 ceph_inode(req->r_target_inode); 3296 spin_lock(&ci->i_unsafe_lock); 3297 list_add_tail(&req->r_unsafe_target_item, 3298 &ci->i_unsafe_iops); 3299 spin_unlock(&ci->i_unsafe_lock); 3300 } 3301 3302 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 3303 } 3304 out_err: 3305 mutex_lock(&mdsc->mutex); 3306 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3307 if (err) { 3308 req->r_err = err; 3309 } else { 3310 req->r_reply = ceph_msg_get(msg); 3311 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); 3312 } 3313 } else { 3314 dout("reply arrived after request %lld was aborted\n", tid); 3315 } 3316 mutex_unlock(&mdsc->mutex); 3317 3318 mutex_unlock(&session->s_mutex); 3319 3320 /* kick calling process */ 3321 complete_request(mdsc, req); 3322 3323 ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency, 3324 req->r_end_latency, err); 3325 out: 3326 ceph_mdsc_put_request(req); 3327 return; 3328 } 3329 3330 3331 3332 /* 3333 * handle mds notification that our request has been forwarded. 3334 */ 3335 static void handle_forward(struct ceph_mds_client *mdsc, 3336 struct ceph_mds_session *session, 3337 struct ceph_msg *msg) 3338 { 3339 struct ceph_mds_request *req; 3340 u64 tid = le64_to_cpu(msg->hdr.tid); 3341 u32 next_mds; 3342 u32 fwd_seq; 3343 int err = -EINVAL; 3344 void *p = msg->front.iov_base; 3345 void *end = p + msg->front.iov_len; 3346 3347 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3348 next_mds = ceph_decode_32(&p); 3349 fwd_seq = ceph_decode_32(&p); 3350 3351 mutex_lock(&mdsc->mutex); 3352 req = lookup_get_request(mdsc, tid); 3353 if (!req) { 3354 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 3355 goto out; /* dup reply? */ 3356 } 3357 3358 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3359 dout("forward tid %llu aborted, unregistering\n", tid); 3360 __unregister_request(mdsc, req); 3361 } else if (fwd_seq <= req->r_num_fwd) { 3362 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 3363 tid, next_mds, req->r_num_fwd, fwd_seq); 3364 } else { 3365 /* resend. forward race not possible; mds would drop */ 3366 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 3367 BUG_ON(req->r_err); 3368 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 3369 req->r_attempts = 0; 3370 req->r_num_fwd = fwd_seq; 3371 req->r_resend_mds = next_mds; 3372 put_request_session(req); 3373 __do_request(mdsc, req); 3374 } 3375 ceph_mdsc_put_request(req); 3376 out: 3377 mutex_unlock(&mdsc->mutex); 3378 return; 3379 3380 bad: 3381 pr_err("mdsc_handle_forward decode error err=%d\n", err); 3382 } 3383 3384 static int __decode_session_metadata(void **p, void *end, 3385 bool *blocklisted) 3386 { 3387 /* map<string,string> */ 3388 u32 n; 3389 bool err_str; 3390 ceph_decode_32_safe(p, end, n, bad); 3391 while (n-- > 0) { 3392 u32 len; 3393 ceph_decode_32_safe(p, end, len, bad); 3394 ceph_decode_need(p, end, len, bad); 3395 err_str = !strncmp(*p, "error_string", len); 3396 *p += len; 3397 ceph_decode_32_safe(p, end, len, bad); 3398 ceph_decode_need(p, end, len, bad); 3399 /* 3400 * Match "blocklisted (blacklisted)" from newer MDSes, 3401 * or "blacklisted" from older MDSes. 3402 */ 3403 if (err_str && strnstr(*p, "blacklisted", len)) 3404 *blocklisted = true; 3405 *p += len; 3406 } 3407 return 0; 3408 bad: 3409 return -1; 3410 } 3411 3412 /* 3413 * handle a mds session control message 3414 */ 3415 static void handle_session(struct ceph_mds_session *session, 3416 struct ceph_msg *msg) 3417 { 3418 struct ceph_mds_client *mdsc = session->s_mdsc; 3419 int mds = session->s_mds; 3420 int msg_version = le16_to_cpu(msg->hdr.version); 3421 void *p = msg->front.iov_base; 3422 void *end = p + msg->front.iov_len; 3423 struct ceph_mds_session_head *h; 3424 u32 op; 3425 u64 seq, features = 0; 3426 int wake = 0; 3427 bool blocklisted = false; 3428 3429 /* decode */ 3430 ceph_decode_need(&p, end, sizeof(*h), bad); 3431 h = p; 3432 p += sizeof(*h); 3433 3434 op = le32_to_cpu(h->op); 3435 seq = le64_to_cpu(h->seq); 3436 3437 if (msg_version >= 3) { 3438 u32 len; 3439 /* version >= 2, metadata */ 3440 if (__decode_session_metadata(&p, end, &blocklisted) < 0) 3441 goto bad; 3442 /* version >= 3, feature bits */ 3443 ceph_decode_32_safe(&p, end, len, bad); 3444 if (len) { 3445 ceph_decode_64_safe(&p, end, features, bad); 3446 p += len - sizeof(features); 3447 } 3448 } 3449 3450 mutex_lock(&mdsc->mutex); 3451 if (op == CEPH_SESSION_CLOSE) { 3452 ceph_get_mds_session(session); 3453 __unregister_session(mdsc, session); 3454 } 3455 /* FIXME: this ttl calculation is generous */ 3456 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 3457 mutex_unlock(&mdsc->mutex); 3458 3459 mutex_lock(&session->s_mutex); 3460 3461 dout("handle_session mds%d %s %p state %s seq %llu\n", 3462 mds, ceph_session_op_name(op), session, 3463 ceph_session_state_name(session->s_state), seq); 3464 3465 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 3466 session->s_state = CEPH_MDS_SESSION_OPEN; 3467 pr_info("mds%d came back\n", session->s_mds); 3468 } 3469 3470 switch (op) { 3471 case CEPH_SESSION_OPEN: 3472 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3473 pr_info("mds%d reconnect success\n", session->s_mds); 3474 session->s_state = CEPH_MDS_SESSION_OPEN; 3475 session->s_features = features; 3476 renewed_caps(mdsc, session, 0); 3477 if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features)) 3478 metric_schedule_delayed(&mdsc->metric); 3479 wake = 1; 3480 if (mdsc->stopping) 3481 __close_session(mdsc, session); 3482 break; 3483 3484 case CEPH_SESSION_RENEWCAPS: 3485 if (session->s_renew_seq == seq) 3486 renewed_caps(mdsc, session, 1); 3487 break; 3488 3489 case CEPH_SESSION_CLOSE: 3490 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3491 pr_info("mds%d reconnect denied\n", session->s_mds); 3492 session->s_state = CEPH_MDS_SESSION_CLOSED; 3493 cleanup_session_requests(mdsc, session); 3494 remove_session_caps(session); 3495 wake = 2; /* for good measure */ 3496 wake_up_all(&mdsc->session_close_wq); 3497 break; 3498 3499 case CEPH_SESSION_STALE: 3500 pr_info("mds%d caps went stale, renewing\n", 3501 session->s_mds); 3502 spin_lock(&session->s_gen_ttl_lock); 3503 session->s_cap_gen++; 3504 session->s_cap_ttl = jiffies - 1; 3505 spin_unlock(&session->s_gen_ttl_lock); 3506 send_renew_caps(mdsc, session); 3507 break; 3508 3509 case CEPH_SESSION_RECALL_STATE: 3510 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 3511 break; 3512 3513 case CEPH_SESSION_FLUSHMSG: 3514 send_flushmsg_ack(mdsc, session, seq); 3515 break; 3516 3517 case CEPH_SESSION_FORCE_RO: 3518 dout("force_session_readonly %p\n", session); 3519 spin_lock(&session->s_cap_lock); 3520 session->s_readonly = true; 3521 spin_unlock(&session->s_cap_lock); 3522 wake_up_session_caps(session, FORCE_RO); 3523 break; 3524 3525 case CEPH_SESSION_REJECT: 3526 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 3527 pr_info("mds%d rejected session\n", session->s_mds); 3528 session->s_state = CEPH_MDS_SESSION_REJECTED; 3529 cleanup_session_requests(mdsc, session); 3530 remove_session_caps(session); 3531 if (blocklisted) 3532 mdsc->fsc->blocklisted = true; 3533 wake = 2; /* for good measure */ 3534 break; 3535 3536 default: 3537 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 3538 WARN_ON(1); 3539 } 3540 3541 mutex_unlock(&session->s_mutex); 3542 if (wake) { 3543 mutex_lock(&mdsc->mutex); 3544 __wake_requests(mdsc, &session->s_waiting); 3545 if (wake == 2) 3546 kick_requests(mdsc, mds); 3547 mutex_unlock(&mdsc->mutex); 3548 } 3549 if (op == CEPH_SESSION_CLOSE) 3550 ceph_put_mds_session(session); 3551 return; 3552 3553 bad: 3554 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 3555 (int)msg->front.iov_len); 3556 ceph_msg_dump(msg); 3557 return; 3558 } 3559 3560 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 3561 { 3562 int dcaps; 3563 3564 dcaps = xchg(&req->r_dir_caps, 0); 3565 if (dcaps) { 3566 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3567 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 3568 } 3569 } 3570 3571 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req) 3572 { 3573 int dcaps; 3574 3575 dcaps = xchg(&req->r_dir_caps, 0); 3576 if (dcaps) { 3577 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3578 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent), 3579 dcaps); 3580 } 3581 } 3582 3583 /* 3584 * called under session->mutex. 3585 */ 3586 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 3587 struct ceph_mds_session *session) 3588 { 3589 struct ceph_mds_request *req, *nreq; 3590 struct rb_node *p; 3591 3592 dout("replay_unsafe_requests mds%d\n", session->s_mds); 3593 3594 mutex_lock(&mdsc->mutex); 3595 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) 3596 __send_request(session, req, true); 3597 3598 /* 3599 * also re-send old requests when MDS enters reconnect stage. So that MDS 3600 * can process completed request in clientreplay stage. 3601 */ 3602 p = rb_first(&mdsc->request_tree); 3603 while (p) { 3604 req = rb_entry(p, struct ceph_mds_request, r_node); 3605 p = rb_next(p); 3606 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3607 continue; 3608 if (req->r_attempts == 0) 3609 continue; /* only old requests */ 3610 if (!req->r_session) 3611 continue; 3612 if (req->r_session->s_mds != session->s_mds) 3613 continue; 3614 3615 ceph_mdsc_release_dir_caps_no_check(req); 3616 3617 __send_request(session, req, true); 3618 } 3619 mutex_unlock(&mdsc->mutex); 3620 } 3621 3622 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state) 3623 { 3624 struct ceph_msg *reply; 3625 struct ceph_pagelist *_pagelist; 3626 struct page *page; 3627 __le32 *addr; 3628 int err = -ENOMEM; 3629 3630 if (!recon_state->allow_multi) 3631 return -ENOSPC; 3632 3633 /* can't handle message that contains both caps and realm */ 3634 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms); 3635 3636 /* pre-allocate new pagelist */ 3637 _pagelist = ceph_pagelist_alloc(GFP_NOFS); 3638 if (!_pagelist) 3639 return -ENOMEM; 3640 3641 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3642 if (!reply) 3643 goto fail_msg; 3644 3645 /* placeholder for nr_caps */ 3646 err = ceph_pagelist_encode_32(_pagelist, 0); 3647 if (err < 0) 3648 goto fail; 3649 3650 if (recon_state->nr_caps) { 3651 /* currently encoding caps */ 3652 err = ceph_pagelist_encode_32(recon_state->pagelist, 0); 3653 if (err) 3654 goto fail; 3655 } else { 3656 /* placeholder for nr_realms (currently encoding relams) */ 3657 err = ceph_pagelist_encode_32(_pagelist, 0); 3658 if (err < 0) 3659 goto fail; 3660 } 3661 3662 err = ceph_pagelist_encode_8(recon_state->pagelist, 1); 3663 if (err) 3664 goto fail; 3665 3666 page = list_first_entry(&recon_state->pagelist->head, struct page, lru); 3667 addr = kmap_atomic(page); 3668 if (recon_state->nr_caps) { 3669 /* currently encoding caps */ 3670 *addr = cpu_to_le32(recon_state->nr_caps); 3671 } else { 3672 /* currently encoding relams */ 3673 *(addr + 1) = cpu_to_le32(recon_state->nr_realms); 3674 } 3675 kunmap_atomic(addr); 3676 3677 reply->hdr.version = cpu_to_le16(5); 3678 reply->hdr.compat_version = cpu_to_le16(4); 3679 3680 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length); 3681 ceph_msg_data_add_pagelist(reply, recon_state->pagelist); 3682 3683 ceph_con_send(&recon_state->session->s_con, reply); 3684 ceph_pagelist_release(recon_state->pagelist); 3685 3686 recon_state->pagelist = _pagelist; 3687 recon_state->nr_caps = 0; 3688 recon_state->nr_realms = 0; 3689 recon_state->msg_version = 5; 3690 return 0; 3691 fail: 3692 ceph_msg_put(reply); 3693 fail_msg: 3694 ceph_pagelist_release(_pagelist); 3695 return err; 3696 } 3697 3698 static struct dentry* d_find_primary(struct inode *inode) 3699 { 3700 struct dentry *alias, *dn = NULL; 3701 3702 if (hlist_empty(&inode->i_dentry)) 3703 return NULL; 3704 3705 spin_lock(&inode->i_lock); 3706 if (hlist_empty(&inode->i_dentry)) 3707 goto out_unlock; 3708 3709 if (S_ISDIR(inode->i_mode)) { 3710 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias); 3711 if (!IS_ROOT(alias)) 3712 dn = dget(alias); 3713 goto out_unlock; 3714 } 3715 3716 hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) { 3717 spin_lock(&alias->d_lock); 3718 if (!d_unhashed(alias) && 3719 (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) { 3720 dn = dget_dlock(alias); 3721 } 3722 spin_unlock(&alias->d_lock); 3723 if (dn) 3724 break; 3725 } 3726 out_unlock: 3727 spin_unlock(&inode->i_lock); 3728 return dn; 3729 } 3730 3731 /* 3732 * Encode information about a cap for a reconnect with the MDS. 3733 */ 3734 static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap, 3735 void *arg) 3736 { 3737 union { 3738 struct ceph_mds_cap_reconnect v2; 3739 struct ceph_mds_cap_reconnect_v1 v1; 3740 } rec; 3741 struct ceph_inode_info *ci = cap->ci; 3742 struct ceph_reconnect_state *recon_state = arg; 3743 struct ceph_pagelist *pagelist = recon_state->pagelist; 3744 struct dentry *dentry; 3745 char *path; 3746 int pathlen, err; 3747 u64 pathbase; 3748 u64 snap_follows; 3749 3750 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 3751 inode, ceph_vinop(inode), cap, cap->cap_id, 3752 ceph_cap_string(cap->issued)); 3753 3754 dentry = d_find_primary(inode); 3755 if (dentry) { 3756 /* set pathbase to parent dir when msg_version >= 2 */ 3757 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 3758 recon_state->msg_version >= 2); 3759 dput(dentry); 3760 if (IS_ERR(path)) { 3761 err = PTR_ERR(path); 3762 goto out_err; 3763 } 3764 } else { 3765 path = NULL; 3766 pathlen = 0; 3767 pathbase = 0; 3768 } 3769 3770 spin_lock(&ci->i_ceph_lock); 3771 cap->seq = 0; /* reset cap seq */ 3772 cap->issue_seq = 0; /* and issue_seq */ 3773 cap->mseq = 0; /* and migrate_seq */ 3774 cap->cap_gen = cap->session->s_cap_gen; 3775 3776 /* These are lost when the session goes away */ 3777 if (S_ISDIR(inode->i_mode)) { 3778 if (cap->issued & CEPH_CAP_DIR_CREATE) { 3779 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); 3780 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); 3781 } 3782 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; 3783 } 3784 3785 if (recon_state->msg_version >= 2) { 3786 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 3787 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3788 rec.v2.issued = cpu_to_le32(cap->issued); 3789 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3790 rec.v2.pathbase = cpu_to_le64(pathbase); 3791 rec.v2.flock_len = (__force __le32) 3792 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); 3793 } else { 3794 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 3795 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3796 rec.v1.issued = cpu_to_le32(cap->issued); 3797 rec.v1.size = cpu_to_le64(i_size_read(inode)); 3798 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime); 3799 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime); 3800 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3801 rec.v1.pathbase = cpu_to_le64(pathbase); 3802 } 3803 3804 if (list_empty(&ci->i_cap_snaps)) { 3805 snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0; 3806 } else { 3807 struct ceph_cap_snap *capsnap = 3808 list_first_entry(&ci->i_cap_snaps, 3809 struct ceph_cap_snap, ci_item); 3810 snap_follows = capsnap->follows; 3811 } 3812 spin_unlock(&ci->i_ceph_lock); 3813 3814 if (recon_state->msg_version >= 2) { 3815 int num_fcntl_locks, num_flock_locks; 3816 struct ceph_filelock *flocks = NULL; 3817 size_t struct_len, total_len = sizeof(u64); 3818 u8 struct_v = 0; 3819 3820 encode_again: 3821 if (rec.v2.flock_len) { 3822 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 3823 } else { 3824 num_fcntl_locks = 0; 3825 num_flock_locks = 0; 3826 } 3827 if (num_fcntl_locks + num_flock_locks > 0) { 3828 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks, 3829 sizeof(struct ceph_filelock), 3830 GFP_NOFS); 3831 if (!flocks) { 3832 err = -ENOMEM; 3833 goto out_err; 3834 } 3835 err = ceph_encode_locks_to_buffer(inode, flocks, 3836 num_fcntl_locks, 3837 num_flock_locks); 3838 if (err) { 3839 kfree(flocks); 3840 flocks = NULL; 3841 if (err == -ENOSPC) 3842 goto encode_again; 3843 goto out_err; 3844 } 3845 } else { 3846 kfree(flocks); 3847 flocks = NULL; 3848 } 3849 3850 if (recon_state->msg_version >= 3) { 3851 /* version, compat_version and struct_len */ 3852 total_len += 2 * sizeof(u8) + sizeof(u32); 3853 struct_v = 2; 3854 } 3855 /* 3856 * number of encoded locks is stable, so copy to pagelist 3857 */ 3858 struct_len = 2 * sizeof(u32) + 3859 (num_fcntl_locks + num_flock_locks) * 3860 sizeof(struct ceph_filelock); 3861 rec.v2.flock_len = cpu_to_le32(struct_len); 3862 3863 struct_len += sizeof(u32) + pathlen + sizeof(rec.v2); 3864 3865 if (struct_v >= 2) 3866 struct_len += sizeof(u64); /* snap_follows */ 3867 3868 total_len += struct_len; 3869 3870 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 3871 err = send_reconnect_partial(recon_state); 3872 if (err) 3873 goto out_freeflocks; 3874 pagelist = recon_state->pagelist; 3875 } 3876 3877 err = ceph_pagelist_reserve(pagelist, total_len); 3878 if (err) 3879 goto out_freeflocks; 3880 3881 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3882 if (recon_state->msg_version >= 3) { 3883 ceph_pagelist_encode_8(pagelist, struct_v); 3884 ceph_pagelist_encode_8(pagelist, 1); 3885 ceph_pagelist_encode_32(pagelist, struct_len); 3886 } 3887 ceph_pagelist_encode_string(pagelist, path, pathlen); 3888 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 3889 ceph_locks_to_pagelist(flocks, pagelist, 3890 num_fcntl_locks, num_flock_locks); 3891 if (struct_v >= 2) 3892 ceph_pagelist_encode_64(pagelist, snap_follows); 3893 out_freeflocks: 3894 kfree(flocks); 3895 } else { 3896 err = ceph_pagelist_reserve(pagelist, 3897 sizeof(u64) + sizeof(u32) + 3898 pathlen + sizeof(rec.v1)); 3899 if (err) 3900 goto out_err; 3901 3902 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 3903 ceph_pagelist_encode_string(pagelist, path, pathlen); 3904 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 3905 } 3906 3907 out_err: 3908 ceph_mdsc_free_path(path, pathlen); 3909 if (!err) 3910 recon_state->nr_caps++; 3911 return err; 3912 } 3913 3914 static int encode_snap_realms(struct ceph_mds_client *mdsc, 3915 struct ceph_reconnect_state *recon_state) 3916 { 3917 struct rb_node *p; 3918 struct ceph_pagelist *pagelist = recon_state->pagelist; 3919 int err = 0; 3920 3921 if (recon_state->msg_version >= 4) { 3922 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); 3923 if (err < 0) 3924 goto fail; 3925 } 3926 3927 /* 3928 * snaprealms. we provide mds with the ino, seq (version), and 3929 * parent for all of our realms. If the mds has any newer info, 3930 * it will tell us. 3931 */ 3932 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 3933 struct ceph_snap_realm *realm = 3934 rb_entry(p, struct ceph_snap_realm, node); 3935 struct ceph_mds_snaprealm_reconnect sr_rec; 3936 3937 if (recon_state->msg_version >= 4) { 3938 size_t need = sizeof(u8) * 2 + sizeof(u32) + 3939 sizeof(sr_rec); 3940 3941 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 3942 err = send_reconnect_partial(recon_state); 3943 if (err) 3944 goto fail; 3945 pagelist = recon_state->pagelist; 3946 } 3947 3948 err = ceph_pagelist_reserve(pagelist, need); 3949 if (err) 3950 goto fail; 3951 3952 ceph_pagelist_encode_8(pagelist, 1); 3953 ceph_pagelist_encode_8(pagelist, 1); 3954 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 3955 } 3956 3957 dout(" adding snap realm %llx seq %lld parent %llx\n", 3958 realm->ino, realm->seq, realm->parent_ino); 3959 sr_rec.ino = cpu_to_le64(realm->ino); 3960 sr_rec.seq = cpu_to_le64(realm->seq); 3961 sr_rec.parent = cpu_to_le64(realm->parent_ino); 3962 3963 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 3964 if (err) 3965 goto fail; 3966 3967 recon_state->nr_realms++; 3968 } 3969 fail: 3970 return err; 3971 } 3972 3973 3974 /* 3975 * If an MDS fails and recovers, clients need to reconnect in order to 3976 * reestablish shared state. This includes all caps issued through 3977 * this session _and_ the snap_realm hierarchy. Because it's not 3978 * clear which snap realms the mds cares about, we send everything we 3979 * know about.. that ensures we'll then get any new info the 3980 * recovering MDS might have. 3981 * 3982 * This is a relatively heavyweight operation, but it's rare. 3983 */ 3984 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 3985 struct ceph_mds_session *session) 3986 { 3987 struct ceph_msg *reply; 3988 int mds = session->s_mds; 3989 int err = -ENOMEM; 3990 struct ceph_reconnect_state recon_state = { 3991 .session = session, 3992 }; 3993 LIST_HEAD(dispose); 3994 3995 pr_info("mds%d reconnect start\n", mds); 3996 3997 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 3998 if (!recon_state.pagelist) 3999 goto fail_nopagelist; 4000 4001 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 4002 if (!reply) 4003 goto fail_nomsg; 4004 4005 xa_destroy(&session->s_delegated_inos); 4006 4007 mutex_lock(&session->s_mutex); 4008 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 4009 session->s_seq = 0; 4010 4011 dout("session %p state %s\n", session, 4012 ceph_session_state_name(session->s_state)); 4013 4014 spin_lock(&session->s_gen_ttl_lock); 4015 session->s_cap_gen++; 4016 spin_unlock(&session->s_gen_ttl_lock); 4017 4018 spin_lock(&session->s_cap_lock); 4019 /* don't know if session is readonly */ 4020 session->s_readonly = 0; 4021 /* 4022 * notify __ceph_remove_cap() that we are composing cap reconnect. 4023 * If a cap get released before being added to the cap reconnect, 4024 * __ceph_remove_cap() should skip queuing cap release. 4025 */ 4026 session->s_cap_reconnect = 1; 4027 /* drop old cap expires; we're about to reestablish that state */ 4028 detach_cap_releases(session, &dispose); 4029 spin_unlock(&session->s_cap_lock); 4030 dispose_cap_releases(mdsc, &dispose); 4031 4032 /* trim unused caps to reduce MDS's cache rejoin time */ 4033 if (mdsc->fsc->sb->s_root) 4034 shrink_dcache_parent(mdsc->fsc->sb->s_root); 4035 4036 ceph_con_close(&session->s_con); 4037 ceph_con_open(&session->s_con, 4038 CEPH_ENTITY_TYPE_MDS, mds, 4039 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 4040 4041 /* replay unsafe requests */ 4042 replay_unsafe_requests(mdsc, session); 4043 4044 ceph_early_kick_flushing_caps(mdsc, session); 4045 4046 down_read(&mdsc->snap_rwsem); 4047 4048 /* placeholder for nr_caps */ 4049 err = ceph_pagelist_encode_32(recon_state.pagelist, 0); 4050 if (err) 4051 goto fail; 4052 4053 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) { 4054 recon_state.msg_version = 3; 4055 recon_state.allow_multi = true; 4056 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) { 4057 recon_state.msg_version = 3; 4058 } else { 4059 recon_state.msg_version = 2; 4060 } 4061 /* trsaverse this session's caps */ 4062 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state); 4063 4064 spin_lock(&session->s_cap_lock); 4065 session->s_cap_reconnect = 0; 4066 spin_unlock(&session->s_cap_lock); 4067 4068 if (err < 0) 4069 goto fail; 4070 4071 /* check if all realms can be encoded into current message */ 4072 if (mdsc->num_snap_realms) { 4073 size_t total_len = 4074 recon_state.pagelist->length + 4075 mdsc->num_snap_realms * 4076 sizeof(struct ceph_mds_snaprealm_reconnect); 4077 if (recon_state.msg_version >= 4) { 4078 /* number of realms */ 4079 total_len += sizeof(u32); 4080 /* version, compat_version and struct_len */ 4081 total_len += mdsc->num_snap_realms * 4082 (2 * sizeof(u8) + sizeof(u32)); 4083 } 4084 if (total_len > RECONNECT_MAX_SIZE) { 4085 if (!recon_state.allow_multi) { 4086 err = -ENOSPC; 4087 goto fail; 4088 } 4089 if (recon_state.nr_caps) { 4090 err = send_reconnect_partial(&recon_state); 4091 if (err) 4092 goto fail; 4093 } 4094 recon_state.msg_version = 5; 4095 } 4096 } 4097 4098 err = encode_snap_realms(mdsc, &recon_state); 4099 if (err < 0) 4100 goto fail; 4101 4102 if (recon_state.msg_version >= 5) { 4103 err = ceph_pagelist_encode_8(recon_state.pagelist, 0); 4104 if (err < 0) 4105 goto fail; 4106 } 4107 4108 if (recon_state.nr_caps || recon_state.nr_realms) { 4109 struct page *page = 4110 list_first_entry(&recon_state.pagelist->head, 4111 struct page, lru); 4112 __le32 *addr = kmap_atomic(page); 4113 if (recon_state.nr_caps) { 4114 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms); 4115 *addr = cpu_to_le32(recon_state.nr_caps); 4116 } else if (recon_state.msg_version >= 4) { 4117 *(addr + 1) = cpu_to_le32(recon_state.nr_realms); 4118 } 4119 kunmap_atomic(addr); 4120 } 4121 4122 reply->hdr.version = cpu_to_le16(recon_state.msg_version); 4123 if (recon_state.msg_version >= 4) 4124 reply->hdr.compat_version = cpu_to_le16(4); 4125 4126 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length); 4127 ceph_msg_data_add_pagelist(reply, recon_state.pagelist); 4128 4129 ceph_con_send(&session->s_con, reply); 4130 4131 mutex_unlock(&session->s_mutex); 4132 4133 mutex_lock(&mdsc->mutex); 4134 __wake_requests(mdsc, &session->s_waiting); 4135 mutex_unlock(&mdsc->mutex); 4136 4137 up_read(&mdsc->snap_rwsem); 4138 ceph_pagelist_release(recon_state.pagelist); 4139 return; 4140 4141 fail: 4142 ceph_msg_put(reply); 4143 up_read(&mdsc->snap_rwsem); 4144 mutex_unlock(&session->s_mutex); 4145 fail_nomsg: 4146 ceph_pagelist_release(recon_state.pagelist); 4147 fail_nopagelist: 4148 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 4149 return; 4150 } 4151 4152 4153 /* 4154 * compare old and new mdsmaps, kicking requests 4155 * and closing out old connections as necessary 4156 * 4157 * called under mdsc->mutex. 4158 */ 4159 static void check_new_map(struct ceph_mds_client *mdsc, 4160 struct ceph_mdsmap *newmap, 4161 struct ceph_mdsmap *oldmap) 4162 { 4163 int i; 4164 int oldstate, newstate; 4165 struct ceph_mds_session *s; 4166 4167 dout("check_new_map new %u old %u\n", 4168 newmap->m_epoch, oldmap->m_epoch); 4169 4170 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4171 if (!mdsc->sessions[i]) 4172 continue; 4173 s = mdsc->sessions[i]; 4174 oldstate = ceph_mdsmap_get_state(oldmap, i); 4175 newstate = ceph_mdsmap_get_state(newmap, i); 4176 4177 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 4178 i, ceph_mds_state_name(oldstate), 4179 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 4180 ceph_mds_state_name(newstate), 4181 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 4182 ceph_session_state_name(s->s_state)); 4183 4184 if (i >= newmap->possible_max_rank) { 4185 /* force close session for stopped mds */ 4186 ceph_get_mds_session(s); 4187 __unregister_session(mdsc, s); 4188 __wake_requests(mdsc, &s->s_waiting); 4189 mutex_unlock(&mdsc->mutex); 4190 4191 mutex_lock(&s->s_mutex); 4192 cleanup_session_requests(mdsc, s); 4193 remove_session_caps(s); 4194 mutex_unlock(&s->s_mutex); 4195 4196 ceph_put_mds_session(s); 4197 4198 mutex_lock(&mdsc->mutex); 4199 kick_requests(mdsc, i); 4200 continue; 4201 } 4202 4203 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 4204 ceph_mdsmap_get_addr(newmap, i), 4205 sizeof(struct ceph_entity_addr))) { 4206 /* just close it */ 4207 mutex_unlock(&mdsc->mutex); 4208 mutex_lock(&s->s_mutex); 4209 mutex_lock(&mdsc->mutex); 4210 ceph_con_close(&s->s_con); 4211 mutex_unlock(&s->s_mutex); 4212 s->s_state = CEPH_MDS_SESSION_RESTARTING; 4213 } else if (oldstate == newstate) { 4214 continue; /* nothing new with this mds */ 4215 } 4216 4217 /* 4218 * send reconnect? 4219 */ 4220 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 4221 newstate >= CEPH_MDS_STATE_RECONNECT) { 4222 mutex_unlock(&mdsc->mutex); 4223 send_mds_reconnect(mdsc, s); 4224 mutex_lock(&mdsc->mutex); 4225 } 4226 4227 /* 4228 * kick request on any mds that has gone active. 4229 */ 4230 if (oldstate < CEPH_MDS_STATE_ACTIVE && 4231 newstate >= CEPH_MDS_STATE_ACTIVE) { 4232 if (oldstate != CEPH_MDS_STATE_CREATING && 4233 oldstate != CEPH_MDS_STATE_STARTING) 4234 pr_info("mds%d recovery completed\n", s->s_mds); 4235 kick_requests(mdsc, i); 4236 mutex_unlock(&mdsc->mutex); 4237 mutex_lock(&s->s_mutex); 4238 mutex_lock(&mdsc->mutex); 4239 ceph_kick_flushing_caps(mdsc, s); 4240 mutex_unlock(&s->s_mutex); 4241 wake_up_session_caps(s, RECONNECT); 4242 } 4243 } 4244 4245 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4246 s = mdsc->sessions[i]; 4247 if (!s) 4248 continue; 4249 if (!ceph_mdsmap_is_laggy(newmap, i)) 4250 continue; 4251 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4252 s->s_state == CEPH_MDS_SESSION_HUNG || 4253 s->s_state == CEPH_MDS_SESSION_CLOSING) { 4254 dout(" connecting to export targets of laggy mds%d\n", 4255 i); 4256 __open_export_target_sessions(mdsc, s); 4257 } 4258 } 4259 } 4260 4261 4262 4263 /* 4264 * leases 4265 */ 4266 4267 /* 4268 * caller must hold session s_mutex, dentry->d_lock 4269 */ 4270 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 4271 { 4272 struct ceph_dentry_info *di = ceph_dentry(dentry); 4273 4274 ceph_put_mds_session(di->lease_session); 4275 di->lease_session = NULL; 4276 } 4277 4278 static void handle_lease(struct ceph_mds_client *mdsc, 4279 struct ceph_mds_session *session, 4280 struct ceph_msg *msg) 4281 { 4282 struct super_block *sb = mdsc->fsc->sb; 4283 struct inode *inode; 4284 struct dentry *parent, *dentry; 4285 struct ceph_dentry_info *di; 4286 int mds = session->s_mds; 4287 struct ceph_mds_lease *h = msg->front.iov_base; 4288 u32 seq; 4289 struct ceph_vino vino; 4290 struct qstr dname; 4291 int release = 0; 4292 4293 dout("handle_lease from mds%d\n", mds); 4294 4295 /* decode */ 4296 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 4297 goto bad; 4298 vino.ino = le64_to_cpu(h->ino); 4299 vino.snap = CEPH_NOSNAP; 4300 seq = le32_to_cpu(h->seq); 4301 dname.len = get_unaligned_le32(h + 1); 4302 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 4303 goto bad; 4304 dname.name = (void *)(h + 1) + sizeof(u32); 4305 4306 /* lookup inode */ 4307 inode = ceph_find_inode(sb, vino); 4308 dout("handle_lease %s, ino %llx %p %.*s\n", 4309 ceph_lease_op_name(h->action), vino.ino, inode, 4310 dname.len, dname.name); 4311 4312 mutex_lock(&session->s_mutex); 4313 inc_session_sequence(session); 4314 4315 if (!inode) { 4316 dout("handle_lease no inode %llx\n", vino.ino); 4317 goto release; 4318 } 4319 4320 /* dentry */ 4321 parent = d_find_alias(inode); 4322 if (!parent) { 4323 dout("no parent dentry on inode %p\n", inode); 4324 WARN_ON(1); 4325 goto release; /* hrm... */ 4326 } 4327 dname.hash = full_name_hash(parent, dname.name, dname.len); 4328 dentry = d_lookup(parent, &dname); 4329 dput(parent); 4330 if (!dentry) 4331 goto release; 4332 4333 spin_lock(&dentry->d_lock); 4334 di = ceph_dentry(dentry); 4335 switch (h->action) { 4336 case CEPH_MDS_LEASE_REVOKE: 4337 if (di->lease_session == session) { 4338 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 4339 h->seq = cpu_to_le32(di->lease_seq); 4340 __ceph_mdsc_drop_dentry_lease(dentry); 4341 } 4342 release = 1; 4343 break; 4344 4345 case CEPH_MDS_LEASE_RENEW: 4346 if (di->lease_session == session && 4347 di->lease_gen == session->s_cap_gen && 4348 di->lease_renew_from && 4349 di->lease_renew_after == 0) { 4350 unsigned long duration = 4351 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 4352 4353 di->lease_seq = seq; 4354 di->time = di->lease_renew_from + duration; 4355 di->lease_renew_after = di->lease_renew_from + 4356 (duration >> 1); 4357 di->lease_renew_from = 0; 4358 } 4359 break; 4360 } 4361 spin_unlock(&dentry->d_lock); 4362 dput(dentry); 4363 4364 if (!release) 4365 goto out; 4366 4367 release: 4368 /* let's just reuse the same message */ 4369 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 4370 ceph_msg_get(msg); 4371 ceph_con_send(&session->s_con, msg); 4372 4373 out: 4374 mutex_unlock(&session->s_mutex); 4375 /* avoid calling iput_final() in mds dispatch threads */ 4376 ceph_async_iput(inode); 4377 return; 4378 4379 bad: 4380 pr_err("corrupt lease message\n"); 4381 ceph_msg_dump(msg); 4382 } 4383 4384 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 4385 struct dentry *dentry, char action, 4386 u32 seq) 4387 { 4388 struct ceph_msg *msg; 4389 struct ceph_mds_lease *lease; 4390 struct inode *dir; 4391 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; 4392 4393 dout("lease_send_msg identry %p %s to mds%d\n", 4394 dentry, ceph_lease_op_name(action), session->s_mds); 4395 4396 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 4397 if (!msg) 4398 return; 4399 lease = msg->front.iov_base; 4400 lease->action = action; 4401 lease->seq = cpu_to_le32(seq); 4402 4403 spin_lock(&dentry->d_lock); 4404 dir = d_inode(dentry->d_parent); 4405 lease->ino = cpu_to_le64(ceph_ino(dir)); 4406 lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); 4407 4408 put_unaligned_le32(dentry->d_name.len, lease + 1); 4409 memcpy((void *)(lease + 1) + 4, 4410 dentry->d_name.name, dentry->d_name.len); 4411 spin_unlock(&dentry->d_lock); 4412 /* 4413 * if this is a preemptive lease RELEASE, no need to 4414 * flush request stream, since the actual request will 4415 * soon follow. 4416 */ 4417 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE); 4418 4419 ceph_con_send(&session->s_con, msg); 4420 } 4421 4422 /* 4423 * lock unlock sessions, to wait ongoing session activities 4424 */ 4425 static void lock_unlock_sessions(struct ceph_mds_client *mdsc) 4426 { 4427 int i; 4428 4429 mutex_lock(&mdsc->mutex); 4430 for (i = 0; i < mdsc->max_sessions; i++) { 4431 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4432 if (!s) 4433 continue; 4434 mutex_unlock(&mdsc->mutex); 4435 mutex_lock(&s->s_mutex); 4436 mutex_unlock(&s->s_mutex); 4437 ceph_put_mds_session(s); 4438 mutex_lock(&mdsc->mutex); 4439 } 4440 mutex_unlock(&mdsc->mutex); 4441 } 4442 4443 static void maybe_recover_session(struct ceph_mds_client *mdsc) 4444 { 4445 struct ceph_fs_client *fsc = mdsc->fsc; 4446 4447 if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) 4448 return; 4449 4450 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED) 4451 return; 4452 4453 if (!READ_ONCE(fsc->blocklisted)) 4454 return; 4455 4456 pr_info("auto reconnect after blocklisted\n"); 4457 ceph_force_reconnect(fsc->sb); 4458 } 4459 4460 bool check_session_state(struct ceph_mds_session *s) 4461 { 4462 switch (s->s_state) { 4463 case CEPH_MDS_SESSION_OPEN: 4464 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 4465 s->s_state = CEPH_MDS_SESSION_HUNG; 4466 pr_info("mds%d hung\n", s->s_mds); 4467 } 4468 break; 4469 case CEPH_MDS_SESSION_CLOSING: 4470 /* Should never reach this when we're unmounting */ 4471 WARN_ON_ONCE(true); 4472 fallthrough; 4473 case CEPH_MDS_SESSION_NEW: 4474 case CEPH_MDS_SESSION_RESTARTING: 4475 case CEPH_MDS_SESSION_CLOSED: 4476 case CEPH_MDS_SESSION_REJECTED: 4477 return false; 4478 } 4479 4480 return true; 4481 } 4482 4483 /* 4484 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply, 4485 * then we need to retransmit that request. 4486 */ 4487 void inc_session_sequence(struct ceph_mds_session *s) 4488 { 4489 lockdep_assert_held(&s->s_mutex); 4490 4491 s->s_seq++; 4492 4493 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 4494 int ret; 4495 4496 dout("resending session close request for mds%d\n", s->s_mds); 4497 ret = request_close_session(s); 4498 if (ret < 0) 4499 pr_err("unable to close session to mds%d: %d\n", 4500 s->s_mds, ret); 4501 } 4502 } 4503 4504 /* 4505 * delayed work -- periodically trim expired leases, renew caps with mds 4506 */ 4507 static void schedule_delayed(struct ceph_mds_client *mdsc) 4508 { 4509 int delay = 5; 4510 unsigned hz = round_jiffies_relative(HZ * delay); 4511 schedule_delayed_work(&mdsc->delayed_work, hz); 4512 } 4513 4514 static void delayed_work(struct work_struct *work) 4515 { 4516 int i; 4517 struct ceph_mds_client *mdsc = 4518 container_of(work, struct ceph_mds_client, delayed_work.work); 4519 int renew_interval; 4520 int renew_caps; 4521 4522 dout("mdsc delayed_work\n"); 4523 4524 if (mdsc->stopping) 4525 return; 4526 4527 mutex_lock(&mdsc->mutex); 4528 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 4529 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 4530 mdsc->last_renew_caps); 4531 if (renew_caps) 4532 mdsc->last_renew_caps = jiffies; 4533 4534 for (i = 0; i < mdsc->max_sessions; i++) { 4535 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4536 if (!s) 4537 continue; 4538 4539 if (!check_session_state(s)) { 4540 ceph_put_mds_session(s); 4541 continue; 4542 } 4543 mutex_unlock(&mdsc->mutex); 4544 4545 mutex_lock(&s->s_mutex); 4546 if (renew_caps) 4547 send_renew_caps(mdsc, s); 4548 else 4549 ceph_con_keepalive(&s->s_con); 4550 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4551 s->s_state == CEPH_MDS_SESSION_HUNG) 4552 ceph_send_cap_releases(mdsc, s); 4553 mutex_unlock(&s->s_mutex); 4554 ceph_put_mds_session(s); 4555 4556 mutex_lock(&mdsc->mutex); 4557 } 4558 mutex_unlock(&mdsc->mutex); 4559 4560 ceph_check_delayed_caps(mdsc); 4561 4562 ceph_queue_cap_reclaim_work(mdsc); 4563 4564 ceph_trim_snapid_map(mdsc); 4565 4566 maybe_recover_session(mdsc); 4567 4568 schedule_delayed(mdsc); 4569 } 4570 4571 int ceph_mdsc_init(struct ceph_fs_client *fsc) 4572 4573 { 4574 struct ceph_mds_client *mdsc; 4575 int err; 4576 4577 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 4578 if (!mdsc) 4579 return -ENOMEM; 4580 mdsc->fsc = fsc; 4581 mutex_init(&mdsc->mutex); 4582 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 4583 if (!mdsc->mdsmap) { 4584 err = -ENOMEM; 4585 goto err_mdsc; 4586 } 4587 4588 init_completion(&mdsc->safe_umount_waiters); 4589 init_waitqueue_head(&mdsc->session_close_wq); 4590 INIT_LIST_HEAD(&mdsc->waiting_for_map); 4591 mdsc->sessions = NULL; 4592 atomic_set(&mdsc->num_sessions, 0); 4593 mdsc->max_sessions = 0; 4594 mdsc->stopping = 0; 4595 atomic64_set(&mdsc->quotarealms_count, 0); 4596 mdsc->quotarealms_inodes = RB_ROOT; 4597 mutex_init(&mdsc->quotarealms_inodes_mutex); 4598 mdsc->last_snap_seq = 0; 4599 init_rwsem(&mdsc->snap_rwsem); 4600 mdsc->snap_realms = RB_ROOT; 4601 INIT_LIST_HEAD(&mdsc->snap_empty); 4602 mdsc->num_snap_realms = 0; 4603 spin_lock_init(&mdsc->snap_empty_lock); 4604 mdsc->last_tid = 0; 4605 mdsc->oldest_tid = 0; 4606 mdsc->request_tree = RB_ROOT; 4607 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 4608 mdsc->last_renew_caps = jiffies; 4609 INIT_LIST_HEAD(&mdsc->cap_delay_list); 4610 INIT_LIST_HEAD(&mdsc->cap_wait_list); 4611 spin_lock_init(&mdsc->cap_delay_lock); 4612 INIT_LIST_HEAD(&mdsc->snap_flush_list); 4613 spin_lock_init(&mdsc->snap_flush_lock); 4614 mdsc->last_cap_flush_tid = 1; 4615 INIT_LIST_HEAD(&mdsc->cap_flush_list); 4616 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 4617 mdsc->num_cap_flushing = 0; 4618 spin_lock_init(&mdsc->cap_dirty_lock); 4619 init_waitqueue_head(&mdsc->cap_flushing_wq); 4620 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); 4621 atomic_set(&mdsc->cap_reclaim_pending, 0); 4622 err = ceph_metric_init(&mdsc->metric); 4623 if (err) 4624 goto err_mdsmap; 4625 4626 spin_lock_init(&mdsc->dentry_list_lock); 4627 INIT_LIST_HEAD(&mdsc->dentry_leases); 4628 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 4629 4630 ceph_caps_init(mdsc); 4631 ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 4632 4633 spin_lock_init(&mdsc->snapid_map_lock); 4634 mdsc->snapid_map_tree = RB_ROOT; 4635 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 4636 4637 init_rwsem(&mdsc->pool_perm_rwsem); 4638 mdsc->pool_perm_tree = RB_ROOT; 4639 4640 strscpy(mdsc->nodename, utsname()->nodename, 4641 sizeof(mdsc->nodename)); 4642 4643 fsc->mdsc = mdsc; 4644 return 0; 4645 4646 err_mdsmap: 4647 kfree(mdsc->mdsmap); 4648 err_mdsc: 4649 kfree(mdsc); 4650 return err; 4651 } 4652 4653 /* 4654 * Wait for safe replies on open mds requests. If we time out, drop 4655 * all requests from the tree to avoid dangling dentry refs. 4656 */ 4657 static void wait_requests(struct ceph_mds_client *mdsc) 4658 { 4659 struct ceph_options *opts = mdsc->fsc->client->options; 4660 struct ceph_mds_request *req; 4661 4662 mutex_lock(&mdsc->mutex); 4663 if (__get_oldest_req(mdsc)) { 4664 mutex_unlock(&mdsc->mutex); 4665 4666 dout("wait_requests waiting for requests\n"); 4667 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 4668 ceph_timeout_jiffies(opts->mount_timeout)); 4669 4670 /* tear down remaining requests */ 4671 mutex_lock(&mdsc->mutex); 4672 while ((req = __get_oldest_req(mdsc))) { 4673 dout("wait_requests timed out on tid %llu\n", 4674 req->r_tid); 4675 list_del_init(&req->r_wait); 4676 __unregister_request(mdsc, req); 4677 } 4678 } 4679 mutex_unlock(&mdsc->mutex); 4680 dout("wait_requests done\n"); 4681 } 4682 4683 /* 4684 * called before mount is ro, and before dentries are torn down. 4685 * (hmm, does this still race with new lookups?) 4686 */ 4687 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 4688 { 4689 dout("pre_umount\n"); 4690 mdsc->stopping = 1; 4691 4692 lock_unlock_sessions(mdsc); 4693 ceph_flush_dirty_caps(mdsc); 4694 wait_requests(mdsc); 4695 4696 /* 4697 * wait for reply handlers to drop their request refs and 4698 * their inode/dcache refs 4699 */ 4700 ceph_msgr_flush(); 4701 4702 ceph_cleanup_quotarealms_inodes(mdsc); 4703 } 4704 4705 /* 4706 * wait for all write mds requests to flush. 4707 */ 4708 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) 4709 { 4710 struct ceph_mds_request *req = NULL, *nextreq; 4711 struct rb_node *n; 4712 4713 mutex_lock(&mdsc->mutex); 4714 dout("wait_unsafe_requests want %lld\n", want_tid); 4715 restart: 4716 req = __get_oldest_req(mdsc); 4717 while (req && req->r_tid <= want_tid) { 4718 /* find next request */ 4719 n = rb_next(&req->r_node); 4720 if (n) 4721 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 4722 else 4723 nextreq = NULL; 4724 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 4725 (req->r_op & CEPH_MDS_OP_WRITE)) { 4726 /* write op */ 4727 ceph_mdsc_get_request(req); 4728 if (nextreq) 4729 ceph_mdsc_get_request(nextreq); 4730 mutex_unlock(&mdsc->mutex); 4731 dout("wait_unsafe_requests wait on %llu (want %llu)\n", 4732 req->r_tid, want_tid); 4733 wait_for_completion(&req->r_safe_completion); 4734 mutex_lock(&mdsc->mutex); 4735 ceph_mdsc_put_request(req); 4736 if (!nextreq) 4737 break; /* next dne before, so we're done! */ 4738 if (RB_EMPTY_NODE(&nextreq->r_node)) { 4739 /* next request was removed from tree */ 4740 ceph_mdsc_put_request(nextreq); 4741 goto restart; 4742 } 4743 ceph_mdsc_put_request(nextreq); /* won't go away */ 4744 } 4745 req = nextreq; 4746 } 4747 mutex_unlock(&mdsc->mutex); 4748 dout("wait_unsafe_requests done\n"); 4749 } 4750 4751 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 4752 { 4753 u64 want_tid, want_flush; 4754 4755 if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) 4756 return; 4757 4758 dout("sync\n"); 4759 mutex_lock(&mdsc->mutex); 4760 want_tid = mdsc->last_tid; 4761 mutex_unlock(&mdsc->mutex); 4762 4763 ceph_flush_dirty_caps(mdsc); 4764 spin_lock(&mdsc->cap_dirty_lock); 4765 want_flush = mdsc->last_cap_flush_tid; 4766 if (!list_empty(&mdsc->cap_flush_list)) { 4767 struct ceph_cap_flush *cf = 4768 list_last_entry(&mdsc->cap_flush_list, 4769 struct ceph_cap_flush, g_list); 4770 cf->wake = true; 4771 } 4772 spin_unlock(&mdsc->cap_dirty_lock); 4773 4774 dout("sync want tid %lld flush_seq %lld\n", 4775 want_tid, want_flush); 4776 4777 wait_unsafe_requests(mdsc, want_tid); 4778 wait_caps_flush(mdsc, want_flush); 4779 } 4780 4781 /* 4782 * true if all sessions are closed, or we force unmount 4783 */ 4784 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 4785 { 4786 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4787 return true; 4788 return atomic_read(&mdsc->num_sessions) <= skipped; 4789 } 4790 4791 /* 4792 * called after sb is ro. 4793 */ 4794 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 4795 { 4796 struct ceph_options *opts = mdsc->fsc->client->options; 4797 struct ceph_mds_session *session; 4798 int i; 4799 int skipped = 0; 4800 4801 dout("close_sessions\n"); 4802 4803 /* close sessions */ 4804 mutex_lock(&mdsc->mutex); 4805 for (i = 0; i < mdsc->max_sessions; i++) { 4806 session = __ceph_lookup_mds_session(mdsc, i); 4807 if (!session) 4808 continue; 4809 mutex_unlock(&mdsc->mutex); 4810 mutex_lock(&session->s_mutex); 4811 if (__close_session(mdsc, session) <= 0) 4812 skipped++; 4813 mutex_unlock(&session->s_mutex); 4814 ceph_put_mds_session(session); 4815 mutex_lock(&mdsc->mutex); 4816 } 4817 mutex_unlock(&mdsc->mutex); 4818 4819 dout("waiting for sessions to close\n"); 4820 wait_event_timeout(mdsc->session_close_wq, 4821 done_closing_sessions(mdsc, skipped), 4822 ceph_timeout_jiffies(opts->mount_timeout)); 4823 4824 /* tear down remaining sessions */ 4825 mutex_lock(&mdsc->mutex); 4826 for (i = 0; i < mdsc->max_sessions; i++) { 4827 if (mdsc->sessions[i]) { 4828 session = ceph_get_mds_session(mdsc->sessions[i]); 4829 __unregister_session(mdsc, session); 4830 mutex_unlock(&mdsc->mutex); 4831 mutex_lock(&session->s_mutex); 4832 remove_session_caps(session); 4833 mutex_unlock(&session->s_mutex); 4834 ceph_put_mds_session(session); 4835 mutex_lock(&mdsc->mutex); 4836 } 4837 } 4838 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 4839 mutex_unlock(&mdsc->mutex); 4840 4841 ceph_cleanup_snapid_map(mdsc); 4842 ceph_cleanup_empty_realms(mdsc); 4843 4844 cancel_work_sync(&mdsc->cap_reclaim_work); 4845 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 4846 4847 dout("stopped\n"); 4848 } 4849 4850 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 4851 { 4852 struct ceph_mds_session *session; 4853 int mds; 4854 4855 dout("force umount\n"); 4856 4857 mutex_lock(&mdsc->mutex); 4858 for (mds = 0; mds < mdsc->max_sessions; mds++) { 4859 session = __ceph_lookup_mds_session(mdsc, mds); 4860 if (!session) 4861 continue; 4862 4863 if (session->s_state == CEPH_MDS_SESSION_REJECTED) 4864 __unregister_session(mdsc, session); 4865 __wake_requests(mdsc, &session->s_waiting); 4866 mutex_unlock(&mdsc->mutex); 4867 4868 mutex_lock(&session->s_mutex); 4869 __close_session(mdsc, session); 4870 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 4871 cleanup_session_requests(mdsc, session); 4872 remove_session_caps(session); 4873 } 4874 mutex_unlock(&session->s_mutex); 4875 ceph_put_mds_session(session); 4876 4877 mutex_lock(&mdsc->mutex); 4878 kick_requests(mdsc, mds); 4879 } 4880 __wake_requests(mdsc, &mdsc->waiting_for_map); 4881 mutex_unlock(&mdsc->mutex); 4882 } 4883 4884 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 4885 { 4886 dout("stop\n"); 4887 /* 4888 * Make sure the delayed work stopped before releasing 4889 * the resources. 4890 * 4891 * Because the cancel_delayed_work_sync() will only 4892 * guarantee that the work finishes executing. But the 4893 * delayed work will re-arm itself again after that. 4894 */ 4895 flush_delayed_work(&mdsc->delayed_work); 4896 4897 if (mdsc->mdsmap) 4898 ceph_mdsmap_destroy(mdsc->mdsmap); 4899 kfree(mdsc->sessions); 4900 ceph_caps_finalize(mdsc); 4901 ceph_pool_perm_destroy(mdsc); 4902 } 4903 4904 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 4905 { 4906 struct ceph_mds_client *mdsc = fsc->mdsc; 4907 dout("mdsc_destroy %p\n", mdsc); 4908 4909 if (!mdsc) 4910 return; 4911 4912 /* flush out any connection work with references to us */ 4913 ceph_msgr_flush(); 4914 4915 ceph_mdsc_stop(mdsc); 4916 4917 ceph_metric_destroy(&mdsc->metric); 4918 4919 flush_delayed_work(&mdsc->metric.delayed_work); 4920 fsc->mdsc = NULL; 4921 kfree(mdsc); 4922 dout("mdsc_destroy %p done\n", mdsc); 4923 } 4924 4925 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4926 { 4927 struct ceph_fs_client *fsc = mdsc->fsc; 4928 const char *mds_namespace = fsc->mount_options->mds_namespace; 4929 void *p = msg->front.iov_base; 4930 void *end = p + msg->front.iov_len; 4931 u32 epoch; 4932 u32 num_fs; 4933 u32 mount_fscid = (u32)-1; 4934 int err = -EINVAL; 4935 4936 ceph_decode_need(&p, end, sizeof(u32), bad); 4937 epoch = ceph_decode_32(&p); 4938 4939 dout("handle_fsmap epoch %u\n", epoch); 4940 4941 /* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */ 4942 ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad); 4943 4944 ceph_decode_32_safe(&p, end, num_fs, bad); 4945 while (num_fs-- > 0) { 4946 void *info_p, *info_end; 4947 u32 info_len; 4948 u32 fscid, namelen; 4949 4950 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 4951 p += 2; // info_v, info_cv 4952 info_len = ceph_decode_32(&p); 4953 ceph_decode_need(&p, end, info_len, bad); 4954 info_p = p; 4955 info_end = p + info_len; 4956 p = info_end; 4957 4958 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); 4959 fscid = ceph_decode_32(&info_p); 4960 namelen = ceph_decode_32(&info_p); 4961 ceph_decode_need(&info_p, info_end, namelen, bad); 4962 4963 if (mds_namespace && 4964 strlen(mds_namespace) == namelen && 4965 !strncmp(mds_namespace, (char *)info_p, namelen)) { 4966 mount_fscid = fscid; 4967 break; 4968 } 4969 } 4970 4971 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); 4972 if (mount_fscid != (u32)-1) { 4973 fsc->client->monc.fs_cluster_id = mount_fscid; 4974 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 4975 0, true); 4976 ceph_monc_renew_subs(&fsc->client->monc); 4977 } else { 4978 err = -ENOENT; 4979 goto err_out; 4980 } 4981 return; 4982 4983 bad: 4984 pr_err("error decoding fsmap\n"); 4985 err_out: 4986 mutex_lock(&mdsc->mutex); 4987 mdsc->mdsmap_err = err; 4988 __wake_requests(mdsc, &mdsc->waiting_for_map); 4989 mutex_unlock(&mdsc->mutex); 4990 } 4991 4992 /* 4993 * handle mds map update. 4994 */ 4995 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4996 { 4997 u32 epoch; 4998 u32 maplen; 4999 void *p = msg->front.iov_base; 5000 void *end = p + msg->front.iov_len; 5001 struct ceph_mdsmap *newmap, *oldmap; 5002 struct ceph_fsid fsid; 5003 int err = -EINVAL; 5004 5005 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 5006 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 5007 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 5008 return; 5009 epoch = ceph_decode_32(&p); 5010 maplen = ceph_decode_32(&p); 5011 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 5012 5013 /* do we need it? */ 5014 mutex_lock(&mdsc->mutex); 5015 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 5016 dout("handle_map epoch %u <= our %u\n", 5017 epoch, mdsc->mdsmap->m_epoch); 5018 mutex_unlock(&mdsc->mutex); 5019 return; 5020 } 5021 5022 newmap = ceph_mdsmap_decode(&p, end, ceph_msgr2(mdsc->fsc->client)); 5023 if (IS_ERR(newmap)) { 5024 err = PTR_ERR(newmap); 5025 goto bad_unlock; 5026 } 5027 5028 /* swap into place */ 5029 if (mdsc->mdsmap) { 5030 oldmap = mdsc->mdsmap; 5031 mdsc->mdsmap = newmap; 5032 check_new_map(mdsc, newmap, oldmap); 5033 ceph_mdsmap_destroy(oldmap); 5034 } else { 5035 mdsc->mdsmap = newmap; /* first mds map */ 5036 } 5037 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size, 5038 MAX_LFS_FILESIZE); 5039 5040 __wake_requests(mdsc, &mdsc->waiting_for_map); 5041 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP, 5042 mdsc->mdsmap->m_epoch); 5043 5044 mutex_unlock(&mdsc->mutex); 5045 schedule_delayed(mdsc); 5046 return; 5047 5048 bad_unlock: 5049 mutex_unlock(&mdsc->mutex); 5050 bad: 5051 pr_err("error decoding mdsmap %d\n", err); 5052 return; 5053 } 5054 5055 static struct ceph_connection *mds_get_con(struct ceph_connection *con) 5056 { 5057 struct ceph_mds_session *s = con->private; 5058 5059 if (ceph_get_mds_session(s)) 5060 return con; 5061 return NULL; 5062 } 5063 5064 static void mds_put_con(struct ceph_connection *con) 5065 { 5066 struct ceph_mds_session *s = con->private; 5067 5068 ceph_put_mds_session(s); 5069 } 5070 5071 /* 5072 * if the client is unresponsive for long enough, the mds will kill 5073 * the session entirely. 5074 */ 5075 static void mds_peer_reset(struct ceph_connection *con) 5076 { 5077 struct ceph_mds_session *s = con->private; 5078 struct ceph_mds_client *mdsc = s->s_mdsc; 5079 5080 pr_warn("mds%d closed our session\n", s->s_mds); 5081 send_mds_reconnect(mdsc, s); 5082 } 5083 5084 static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg) 5085 { 5086 struct ceph_mds_session *s = con->private; 5087 struct ceph_mds_client *mdsc = s->s_mdsc; 5088 int type = le16_to_cpu(msg->hdr.type); 5089 5090 mutex_lock(&mdsc->mutex); 5091 if (__verify_registered_session(mdsc, s) < 0) { 5092 mutex_unlock(&mdsc->mutex); 5093 goto out; 5094 } 5095 mutex_unlock(&mdsc->mutex); 5096 5097 switch (type) { 5098 case CEPH_MSG_MDS_MAP: 5099 ceph_mdsc_handle_mdsmap(mdsc, msg); 5100 break; 5101 case CEPH_MSG_FS_MAP_USER: 5102 ceph_mdsc_handle_fsmap(mdsc, msg); 5103 break; 5104 case CEPH_MSG_CLIENT_SESSION: 5105 handle_session(s, msg); 5106 break; 5107 case CEPH_MSG_CLIENT_REPLY: 5108 handle_reply(s, msg); 5109 break; 5110 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 5111 handle_forward(mdsc, s, msg); 5112 break; 5113 case CEPH_MSG_CLIENT_CAPS: 5114 ceph_handle_caps(s, msg); 5115 break; 5116 case CEPH_MSG_CLIENT_SNAP: 5117 ceph_handle_snap(mdsc, s, msg); 5118 break; 5119 case CEPH_MSG_CLIENT_LEASE: 5120 handle_lease(mdsc, s, msg); 5121 break; 5122 case CEPH_MSG_CLIENT_QUOTA: 5123 ceph_handle_quota(mdsc, s, msg); 5124 break; 5125 5126 default: 5127 pr_err("received unknown message type %d %s\n", type, 5128 ceph_msg_type_name(type)); 5129 } 5130 out: 5131 ceph_msg_put(msg); 5132 } 5133 5134 /* 5135 * authentication 5136 */ 5137 5138 /* 5139 * Note: returned pointer is the address of a structure that's 5140 * managed separately. Caller must *not* attempt to free it. 5141 */ 5142 static struct ceph_auth_handshake * 5143 mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new) 5144 { 5145 struct ceph_mds_session *s = con->private; 5146 struct ceph_mds_client *mdsc = s->s_mdsc; 5147 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5148 struct ceph_auth_handshake *auth = &s->s_auth; 5149 int ret; 5150 5151 ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 5152 force_new, proto, NULL, NULL); 5153 if (ret) 5154 return ERR_PTR(ret); 5155 5156 return auth; 5157 } 5158 5159 static int mds_add_authorizer_challenge(struct ceph_connection *con, 5160 void *challenge_buf, int challenge_buf_len) 5161 { 5162 struct ceph_mds_session *s = con->private; 5163 struct ceph_mds_client *mdsc = s->s_mdsc; 5164 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5165 5166 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer, 5167 challenge_buf, challenge_buf_len); 5168 } 5169 5170 static int mds_verify_authorizer_reply(struct ceph_connection *con) 5171 { 5172 struct ceph_mds_session *s = con->private; 5173 struct ceph_mds_client *mdsc = s->s_mdsc; 5174 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5175 struct ceph_auth_handshake *auth = &s->s_auth; 5176 5177 return ceph_auth_verify_authorizer_reply(ac, auth->authorizer, 5178 auth->authorizer_reply_buf, auth->authorizer_reply_buf_len, 5179 NULL, NULL, NULL, NULL); 5180 } 5181 5182 static int mds_invalidate_authorizer(struct ceph_connection *con) 5183 { 5184 struct ceph_mds_session *s = con->private; 5185 struct ceph_mds_client *mdsc = s->s_mdsc; 5186 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5187 5188 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 5189 5190 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 5191 } 5192 5193 static int mds_get_auth_request(struct ceph_connection *con, 5194 void *buf, int *buf_len, 5195 void **authorizer, int *authorizer_len) 5196 { 5197 struct ceph_mds_session *s = con->private; 5198 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5199 struct ceph_auth_handshake *auth = &s->s_auth; 5200 int ret; 5201 5202 ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 5203 buf, buf_len); 5204 if (ret) 5205 return ret; 5206 5207 *authorizer = auth->authorizer_buf; 5208 *authorizer_len = auth->authorizer_buf_len; 5209 return 0; 5210 } 5211 5212 static int mds_handle_auth_reply_more(struct ceph_connection *con, 5213 void *reply, int reply_len, 5214 void *buf, int *buf_len, 5215 void **authorizer, int *authorizer_len) 5216 { 5217 struct ceph_mds_session *s = con->private; 5218 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5219 struct ceph_auth_handshake *auth = &s->s_auth; 5220 int ret; 5221 5222 ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len, 5223 buf, buf_len); 5224 if (ret) 5225 return ret; 5226 5227 *authorizer = auth->authorizer_buf; 5228 *authorizer_len = auth->authorizer_buf_len; 5229 return 0; 5230 } 5231 5232 static int mds_handle_auth_done(struct ceph_connection *con, 5233 u64 global_id, void *reply, int reply_len, 5234 u8 *session_key, int *session_key_len, 5235 u8 *con_secret, int *con_secret_len) 5236 { 5237 struct ceph_mds_session *s = con->private; 5238 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5239 struct ceph_auth_handshake *auth = &s->s_auth; 5240 5241 return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len, 5242 session_key, session_key_len, 5243 con_secret, con_secret_len); 5244 } 5245 5246 static int mds_handle_auth_bad_method(struct ceph_connection *con, 5247 int used_proto, int result, 5248 const int *allowed_protos, int proto_cnt, 5249 const int *allowed_modes, int mode_cnt) 5250 { 5251 struct ceph_mds_session *s = con->private; 5252 struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc; 5253 int ret; 5254 5255 if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS, 5256 used_proto, result, 5257 allowed_protos, proto_cnt, 5258 allowed_modes, mode_cnt)) { 5259 ret = ceph_monc_validate_auth(monc); 5260 if (ret) 5261 return ret; 5262 } 5263 5264 return -EACCES; 5265 } 5266 5267 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, 5268 struct ceph_msg_header *hdr, int *skip) 5269 { 5270 struct ceph_msg *msg; 5271 int type = (int) le16_to_cpu(hdr->type); 5272 int front_len = (int) le32_to_cpu(hdr->front_len); 5273 5274 if (con->in_msg) 5275 return con->in_msg; 5276 5277 *skip = 0; 5278 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 5279 if (!msg) { 5280 pr_err("unable to allocate msg type %d len %d\n", 5281 type, front_len); 5282 return NULL; 5283 } 5284 5285 return msg; 5286 } 5287 5288 static int mds_sign_message(struct ceph_msg *msg) 5289 { 5290 struct ceph_mds_session *s = msg->con->private; 5291 struct ceph_auth_handshake *auth = &s->s_auth; 5292 5293 return ceph_auth_sign_message(auth, msg); 5294 } 5295 5296 static int mds_check_message_signature(struct ceph_msg *msg) 5297 { 5298 struct ceph_mds_session *s = msg->con->private; 5299 struct ceph_auth_handshake *auth = &s->s_auth; 5300 5301 return ceph_auth_check_message_signature(auth, msg); 5302 } 5303 5304 static const struct ceph_connection_operations mds_con_ops = { 5305 .get = mds_get_con, 5306 .put = mds_put_con, 5307 .alloc_msg = mds_alloc_msg, 5308 .dispatch = mds_dispatch, 5309 .peer_reset = mds_peer_reset, 5310 .get_authorizer = mds_get_authorizer, 5311 .add_authorizer_challenge = mds_add_authorizer_challenge, 5312 .verify_authorizer_reply = mds_verify_authorizer_reply, 5313 .invalidate_authorizer = mds_invalidate_authorizer, 5314 .sign_message = mds_sign_message, 5315 .check_message_signature = mds_check_message_signature, 5316 .get_auth_request = mds_get_auth_request, 5317 .handle_auth_reply_more = mds_handle_auth_reply_more, 5318 .handle_auth_done = mds_handle_auth_done, 5319 .handle_auth_bad_method = mds_handle_auth_bad_method, 5320 }; 5321 5322 /* eof */ 5323