// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>
#include <linux/bitmap.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
51 */ 52 53 struct ceph_reconnect_state { 54 struct ceph_mds_session *session; 55 int nr_caps, nr_realms; 56 struct ceph_pagelist *pagelist; 57 unsigned msg_version; 58 bool allow_multi; 59 }; 60 61 static void __wake_requests(struct ceph_mds_client *mdsc, 62 struct list_head *head); 63 static void ceph_cap_release_work(struct work_struct *work); 64 static void ceph_cap_reclaim_work(struct work_struct *work); 65 66 static const struct ceph_connection_operations mds_con_ops; 67 68 69 /* 70 * mds reply parsing 71 */ 72 73 static int parse_reply_info_quota(void **p, void *end, 74 struct ceph_mds_reply_info_in *info) 75 { 76 u8 struct_v, struct_compat; 77 u32 struct_len; 78 79 ceph_decode_8_safe(p, end, struct_v, bad); 80 ceph_decode_8_safe(p, end, struct_compat, bad); 81 /* struct_v is expected to be >= 1. we only 82 * understand encoding with struct_compat == 1. */ 83 if (!struct_v || struct_compat != 1) 84 goto bad; 85 ceph_decode_32_safe(p, end, struct_len, bad); 86 ceph_decode_need(p, end, struct_len, bad); 87 end = *p + struct_len; 88 ceph_decode_64_safe(p, end, info->max_bytes, bad); 89 ceph_decode_64_safe(p, end, info->max_files, bad); 90 *p = end; 91 return 0; 92 bad: 93 return -EIO; 94 } 95 96 /* 97 * parse individual inode info 98 */ 99 static int parse_reply_info_in(void **p, void *end, 100 struct ceph_mds_reply_info_in *info, 101 u64 features) 102 { 103 int err = 0; 104 u8 struct_v = 0; 105 106 if (features == (u64)-1) { 107 u32 struct_len; 108 u8 struct_compat; 109 ceph_decode_8_safe(p, end, struct_v, bad); 110 ceph_decode_8_safe(p, end, struct_compat, bad); 111 /* struct_v is expected to be >= 1. we only understand 112 * encoding with struct_compat == 1. 
*/ 113 if (!struct_v || struct_compat != 1) 114 goto bad; 115 ceph_decode_32_safe(p, end, struct_len, bad); 116 ceph_decode_need(p, end, struct_len, bad); 117 end = *p + struct_len; 118 } 119 120 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad); 121 info->in = *p; 122 *p += sizeof(struct ceph_mds_reply_inode) + 123 sizeof(*info->in->fragtree.splits) * 124 le32_to_cpu(info->in->fragtree.nsplits); 125 126 ceph_decode_32_safe(p, end, info->symlink_len, bad); 127 ceph_decode_need(p, end, info->symlink_len, bad); 128 info->symlink = *p; 129 *p += info->symlink_len; 130 131 ceph_decode_copy_safe(p, end, &info->dir_layout, 132 sizeof(info->dir_layout), bad); 133 ceph_decode_32_safe(p, end, info->xattr_len, bad); 134 ceph_decode_need(p, end, info->xattr_len, bad); 135 info->xattr_data = *p; 136 *p += info->xattr_len; 137 138 if (features == (u64)-1) { 139 /* inline data */ 140 ceph_decode_64_safe(p, end, info->inline_version, bad); 141 ceph_decode_32_safe(p, end, info->inline_len, bad); 142 ceph_decode_need(p, end, info->inline_len, bad); 143 info->inline_data = *p; 144 *p += info->inline_len; 145 /* quota */ 146 err = parse_reply_info_quota(p, end, info); 147 if (err < 0) 148 goto out_bad; 149 /* pool namespace */ 150 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 151 if (info->pool_ns_len > 0) { 152 ceph_decode_need(p, end, info->pool_ns_len, bad); 153 info->pool_ns_data = *p; 154 *p += info->pool_ns_len; 155 } 156 157 /* btime */ 158 ceph_decode_need(p, end, sizeof(info->btime), bad); 159 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 160 161 /* change attribute */ 162 ceph_decode_64_safe(p, end, info->change_attr, bad); 163 164 /* dir pin */ 165 if (struct_v >= 2) { 166 ceph_decode_32_safe(p, end, info->dir_pin, bad); 167 } else { 168 info->dir_pin = -ENODATA; 169 } 170 171 /* snapshot birth time, remains zero for v<=2 */ 172 if (struct_v >= 3) { 173 ceph_decode_need(p, end, sizeof(info->snap_btime), bad); 174 ceph_decode_copy(p, 
&info->snap_btime, 175 sizeof(info->snap_btime)); 176 } else { 177 memset(&info->snap_btime, 0, sizeof(info->snap_btime)); 178 } 179 180 /* snapshot count, remains zero for v<=3 */ 181 if (struct_v >= 4) { 182 ceph_decode_64_safe(p, end, info->rsnaps, bad); 183 } else { 184 info->rsnaps = 0; 185 } 186 187 *p = end; 188 } else { 189 if (features & CEPH_FEATURE_MDS_INLINE_DATA) { 190 ceph_decode_64_safe(p, end, info->inline_version, bad); 191 ceph_decode_32_safe(p, end, info->inline_len, bad); 192 ceph_decode_need(p, end, info->inline_len, bad); 193 info->inline_data = *p; 194 *p += info->inline_len; 195 } else 196 info->inline_version = CEPH_INLINE_NONE; 197 198 if (features & CEPH_FEATURE_MDS_QUOTA) { 199 err = parse_reply_info_quota(p, end, info); 200 if (err < 0) 201 goto out_bad; 202 } else { 203 info->max_bytes = 0; 204 info->max_files = 0; 205 } 206 207 info->pool_ns_len = 0; 208 info->pool_ns_data = NULL; 209 if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) { 210 ceph_decode_32_safe(p, end, info->pool_ns_len, bad); 211 if (info->pool_ns_len > 0) { 212 ceph_decode_need(p, end, info->pool_ns_len, bad); 213 info->pool_ns_data = *p; 214 *p += info->pool_ns_len; 215 } 216 } 217 218 if (features & CEPH_FEATURE_FS_BTIME) { 219 ceph_decode_need(p, end, sizeof(info->btime), bad); 220 ceph_decode_copy(p, &info->btime, sizeof(info->btime)); 221 ceph_decode_64_safe(p, end, info->change_attr, bad); 222 } 223 224 info->dir_pin = -ENODATA; 225 /* info->snap_btime and info->rsnaps remain zero */ 226 } 227 return 0; 228 bad: 229 err = -EIO; 230 out_bad: 231 return err; 232 } 233 234 static int parse_reply_info_dir(void **p, void *end, 235 struct ceph_mds_reply_dirfrag **dirfrag, 236 u64 features) 237 { 238 if (features == (u64)-1) { 239 u8 struct_v, struct_compat; 240 u32 struct_len; 241 ceph_decode_8_safe(p, end, struct_v, bad); 242 ceph_decode_8_safe(p, end, struct_compat, bad); 243 /* struct_v is expected to be >= 1. 
we only understand 244 * encoding whose struct_compat == 1. */ 245 if (!struct_v || struct_compat != 1) 246 goto bad; 247 ceph_decode_32_safe(p, end, struct_len, bad); 248 ceph_decode_need(p, end, struct_len, bad); 249 end = *p + struct_len; 250 } 251 252 ceph_decode_need(p, end, sizeof(**dirfrag), bad); 253 *dirfrag = *p; 254 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist); 255 if (unlikely(*p > end)) 256 goto bad; 257 if (features == (u64)-1) 258 *p = end; 259 return 0; 260 bad: 261 return -EIO; 262 } 263 264 static int parse_reply_info_lease(void **p, void *end, 265 struct ceph_mds_reply_lease **lease, 266 u64 features) 267 { 268 if (features == (u64)-1) { 269 u8 struct_v, struct_compat; 270 u32 struct_len; 271 ceph_decode_8_safe(p, end, struct_v, bad); 272 ceph_decode_8_safe(p, end, struct_compat, bad); 273 /* struct_v is expected to be >= 1. we only understand 274 * encoding whose struct_compat == 1. */ 275 if (!struct_v || struct_compat != 1) 276 goto bad; 277 ceph_decode_32_safe(p, end, struct_len, bad); 278 ceph_decode_need(p, end, struct_len, bad); 279 end = *p + struct_len; 280 } 281 282 ceph_decode_need(p, end, sizeof(**lease), bad); 283 *lease = *p; 284 *p += sizeof(**lease); 285 if (features == (u64)-1) 286 *p = end; 287 return 0; 288 bad: 289 return -EIO; 290 } 291 292 /* 293 * parse a normal reply, which may contain a (dir+)dentry and/or a 294 * target inode. 
295 */ 296 static int parse_reply_info_trace(void **p, void *end, 297 struct ceph_mds_reply_info_parsed *info, 298 u64 features) 299 { 300 int err; 301 302 if (info->head->is_dentry) { 303 err = parse_reply_info_in(p, end, &info->diri, features); 304 if (err < 0) 305 goto out_bad; 306 307 err = parse_reply_info_dir(p, end, &info->dirfrag, features); 308 if (err < 0) 309 goto out_bad; 310 311 ceph_decode_32_safe(p, end, info->dname_len, bad); 312 ceph_decode_need(p, end, info->dname_len, bad); 313 info->dname = *p; 314 *p += info->dname_len; 315 316 err = parse_reply_info_lease(p, end, &info->dlease, features); 317 if (err < 0) 318 goto out_bad; 319 } 320 321 if (info->head->is_target) { 322 err = parse_reply_info_in(p, end, &info->targeti, features); 323 if (err < 0) 324 goto out_bad; 325 } 326 327 if (unlikely(*p != end)) 328 goto bad; 329 return 0; 330 331 bad: 332 err = -EIO; 333 out_bad: 334 pr_err("problem parsing mds trace %d\n", err); 335 return err; 336 } 337 338 /* 339 * parse readdir results 340 */ 341 static int parse_reply_info_readdir(void **p, void *end, 342 struct ceph_mds_reply_info_parsed *info, 343 u64 features) 344 { 345 u32 num, i = 0; 346 int err; 347 348 err = parse_reply_info_dir(p, end, &info->dir_dir, features); 349 if (err < 0) 350 goto out_bad; 351 352 ceph_decode_need(p, end, sizeof(num) + 2, bad); 353 num = ceph_decode_32(p); 354 { 355 u16 flags = ceph_decode_16(p); 356 info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); 357 info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); 358 info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); 359 info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH); 360 } 361 if (num == 0) 362 goto done; 363 364 BUG_ON(!info->dir_entries); 365 if ((unsigned long)(info->dir_entries + num) > 366 (unsigned long)info->dir_entries + info->dir_buf_size) { 367 pr_err("dir contents are larger than expected\n"); 368 WARN_ON(1); 369 goto bad; 370 } 371 372 info->dir_nr = num; 373 while (num) { 374 struct 
ceph_mds_reply_dir_entry *rde = info->dir_entries + i; 375 /* dentry */ 376 ceph_decode_32_safe(p, end, rde->name_len, bad); 377 ceph_decode_need(p, end, rde->name_len, bad); 378 rde->name = *p; 379 *p += rde->name_len; 380 dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name); 381 382 /* dentry lease */ 383 err = parse_reply_info_lease(p, end, &rde->lease, features); 384 if (err) 385 goto out_bad; 386 /* inode */ 387 err = parse_reply_info_in(p, end, &rde->inode, features); 388 if (err < 0) 389 goto out_bad; 390 /* ceph_readdir_prepopulate() will update it */ 391 rde->offset = 0; 392 i++; 393 num--; 394 } 395 396 done: 397 /* Skip over any unrecognized fields */ 398 *p = end; 399 return 0; 400 401 bad: 402 err = -EIO; 403 out_bad: 404 pr_err("problem parsing dir contents %d\n", err); 405 return err; 406 } 407 408 /* 409 * parse fcntl F_GETLK results 410 */ 411 static int parse_reply_info_filelock(void **p, void *end, 412 struct ceph_mds_reply_info_parsed *info, 413 u64 features) 414 { 415 if (*p + sizeof(*info->filelock_reply) > end) 416 goto bad; 417 418 info->filelock_reply = *p; 419 420 /* Skip over any unrecognized fields */ 421 *p = end; 422 return 0; 423 bad: 424 return -EIO; 425 } 426 427 428 #if BITS_PER_LONG == 64 429 430 #define DELEGATED_INO_AVAILABLE xa_mk_value(1) 431 432 static int ceph_parse_deleg_inos(void **p, void *end, 433 struct ceph_mds_session *s) 434 { 435 u32 sets; 436 437 ceph_decode_32_safe(p, end, sets, bad); 438 dout("got %u sets of delegated inodes\n", sets); 439 while (sets--) { 440 u64 start, len; 441 442 ceph_decode_64_safe(p, end, start, bad); 443 ceph_decode_64_safe(p, end, len, bad); 444 445 /* Don't accept a delegation of system inodes */ 446 if (start < CEPH_INO_SYSTEM_BASE) { 447 pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n", 448 start, len); 449 continue; 450 } 451 while (len--) { 452 int err = xa_insert(&s->s_delegated_inos, start++, 453 DELEGATED_INO_AVAILABLE, 454 
GFP_KERNEL); 455 if (!err) { 456 dout("added delegated inode 0x%llx\n", 457 start - 1); 458 } else if (err == -EBUSY) { 459 pr_warn("MDS delegated inode 0x%llx more than once.\n", 460 start - 1); 461 } else { 462 return err; 463 } 464 } 465 } 466 return 0; 467 bad: 468 return -EIO; 469 } 470 471 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 472 { 473 unsigned long ino; 474 void *val; 475 476 xa_for_each(&s->s_delegated_inos, ino, val) { 477 val = xa_erase(&s->s_delegated_inos, ino); 478 if (val == DELEGATED_INO_AVAILABLE) 479 return ino; 480 } 481 return 0; 482 } 483 484 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 485 { 486 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE, 487 GFP_KERNEL); 488 } 489 #else /* BITS_PER_LONG == 64 */ 490 /* 491 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just 492 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top 493 * and bottom words? 494 */ 495 static int ceph_parse_deleg_inos(void **p, void *end, 496 struct ceph_mds_session *s) 497 { 498 u32 sets; 499 500 ceph_decode_32_safe(p, end, sets, bad); 501 if (sets) 502 ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad); 503 return 0; 504 bad: 505 return -EIO; 506 } 507 508 u64 ceph_get_deleg_ino(struct ceph_mds_session *s) 509 { 510 return 0; 511 } 512 513 int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino) 514 { 515 return 0; 516 } 517 #endif /* BITS_PER_LONG == 64 */ 518 519 /* 520 * parse create results 521 */ 522 static int parse_reply_info_create(void **p, void *end, 523 struct ceph_mds_reply_info_parsed *info, 524 u64 features, struct ceph_mds_session *s) 525 { 526 int ret; 527 528 if (features == (u64)-1 || 529 (features & CEPH_FEATURE_REPLY_CREATE_INODE)) { 530 if (*p == end) { 531 /* Malformed reply? 
*/ 532 info->has_create_ino = false; 533 } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) { 534 info->has_create_ino = true; 535 /* struct_v, struct_compat, and len */ 536 ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad); 537 ceph_decode_64_safe(p, end, info->ino, bad); 538 ret = ceph_parse_deleg_inos(p, end, s); 539 if (ret) 540 return ret; 541 } else { 542 /* legacy */ 543 ceph_decode_64_safe(p, end, info->ino, bad); 544 info->has_create_ino = true; 545 } 546 } else { 547 if (*p != end) 548 goto bad; 549 } 550 551 /* Skip over any unrecognized fields */ 552 *p = end; 553 return 0; 554 bad: 555 return -EIO; 556 } 557 558 static int parse_reply_info_getvxattr(void **p, void *end, 559 struct ceph_mds_reply_info_parsed *info, 560 u64 features) 561 { 562 u32 value_len; 563 564 ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */ 565 ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */ 566 ceph_decode_skip_32(p, end, bad); /* skip payload length */ 567 568 ceph_decode_32_safe(p, end, value_len, bad); 569 570 if (value_len == end - *p) { 571 info->xattr_info.xattr_value = *p; 572 info->xattr_info.xattr_value_len = value_len; 573 *p = end; 574 return value_len; 575 } 576 bad: 577 return -EIO; 578 } 579 580 /* 581 * parse extra results 582 */ 583 static int parse_reply_info_extra(void **p, void *end, 584 struct ceph_mds_reply_info_parsed *info, 585 u64 features, struct ceph_mds_session *s) 586 { 587 u32 op = le32_to_cpu(info->head->op); 588 589 if (op == CEPH_MDS_OP_GETFILELOCK) 590 return parse_reply_info_filelock(p, end, info, features); 591 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP) 592 return parse_reply_info_readdir(p, end, info, features); 593 else if (op == CEPH_MDS_OP_CREATE) 594 return parse_reply_info_create(p, end, info, features, s); 595 else if (op == CEPH_MDS_OP_GETVXATTR) 596 return parse_reply_info_getvxattr(p, end, info, features); 597 else 598 return -EIO; 599 } 600 601 /* 602 * parse entire mds reply 
603 */ 604 static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, 605 struct ceph_mds_reply_info_parsed *info, 606 u64 features) 607 { 608 void *p, *end; 609 u32 len; 610 int err; 611 612 info->head = msg->front.iov_base; 613 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 614 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 615 616 /* trace */ 617 ceph_decode_32_safe(&p, end, len, bad); 618 if (len > 0) { 619 ceph_decode_need(&p, end, len, bad); 620 err = parse_reply_info_trace(&p, p+len, info, features); 621 if (err < 0) 622 goto out_bad; 623 } 624 625 /* extra */ 626 ceph_decode_32_safe(&p, end, len, bad); 627 if (len > 0) { 628 ceph_decode_need(&p, end, len, bad); 629 err = parse_reply_info_extra(&p, p+len, info, features, s); 630 if (err < 0) 631 goto out_bad; 632 } 633 634 /* snap blob */ 635 ceph_decode_32_safe(&p, end, len, bad); 636 info->snapblob_len = len; 637 info->snapblob = p; 638 p += len; 639 640 if (p != end) 641 goto bad; 642 return 0; 643 644 bad: 645 err = -EIO; 646 out_bad: 647 pr_err("mds parse_reply err %d\n", err); 648 return err; 649 } 650 651 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 652 { 653 if (!info->dir_entries) 654 return; 655 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size)); 656 } 657 658 /* 659 * In async unlink case the kclient won't wait for the first reply 660 * from MDS and just drop all the links and unhash the dentry and then 661 * succeeds immediately. 662 * 663 * For any new create/link/rename,etc requests followed by using the 664 * same file names we must wait for the first reply of the inflight 665 * unlink request, or the MDS possibly will fail these following 666 * requests with -EEXIST if the inflight async unlink request was 667 * delayed for some reasons. 
668 * 669 * And the worst case is that for the none async openc request it will 670 * successfully open the file if the CDentry hasn't been unlinked yet, 671 * but later the previous delayed async unlink request will remove the 672 * CDenty. That means the just created file is possiblly deleted later 673 * by accident. 674 * 675 * We need to wait for the inflight async unlink requests to finish 676 * when creating new files/directories by using the same file names. 677 */ 678 int ceph_wait_on_conflict_unlink(struct dentry *dentry) 679 { 680 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); 681 struct dentry *pdentry = dentry->d_parent; 682 struct dentry *udentry, *found = NULL; 683 struct ceph_dentry_info *di; 684 struct qstr dname; 685 u32 hash = dentry->d_name.hash; 686 int err; 687 688 dname.name = dentry->d_name.name; 689 dname.len = dentry->d_name.len; 690 691 rcu_read_lock(); 692 hash_for_each_possible_rcu(fsc->async_unlink_conflict, di, 693 hnode, hash) { 694 udentry = di->dentry; 695 696 spin_lock(&udentry->d_lock); 697 if (udentry->d_name.hash != hash) 698 goto next; 699 if (unlikely(udentry->d_parent != pdentry)) 700 goto next; 701 if (!hash_hashed(&di->hnode)) 702 goto next; 703 704 if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags)) 705 pr_warn("%s dentry %p:%pd async unlink bit is not set\n", 706 __func__, dentry, dentry); 707 708 if (!d_same_name(udentry, pdentry, &dname)) 709 goto next; 710 711 spin_unlock(&udentry->d_lock); 712 found = dget(udentry); 713 break; 714 next: 715 spin_unlock(&udentry->d_lock); 716 } 717 rcu_read_unlock(); 718 719 if (likely(!found)) 720 return 0; 721 722 dout("%s dentry %p:%pd conflict with old %p:%pd\n", __func__, 723 dentry, dentry, found, found); 724 725 err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT, 726 TASK_KILLABLE); 727 dput(found); 728 return err; 729 } 730 731 732 /* 733 * sessions 734 */ 735 const char *ceph_session_state_name(int s) 736 { 737 switch (s) { 738 case 
CEPH_MDS_SESSION_NEW: return "new"; 739 case CEPH_MDS_SESSION_OPENING: return "opening"; 740 case CEPH_MDS_SESSION_OPEN: return "open"; 741 case CEPH_MDS_SESSION_HUNG: return "hung"; 742 case CEPH_MDS_SESSION_CLOSING: return "closing"; 743 case CEPH_MDS_SESSION_CLOSED: return "closed"; 744 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 745 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 746 case CEPH_MDS_SESSION_REJECTED: return "rejected"; 747 default: return "???"; 748 } 749 } 750 751 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s) 752 { 753 if (refcount_inc_not_zero(&s->s_ref)) 754 return s; 755 return NULL; 756 } 757 758 void ceph_put_mds_session(struct ceph_mds_session *s) 759 { 760 if (IS_ERR_OR_NULL(s)) 761 return; 762 763 if (refcount_dec_and_test(&s->s_ref)) { 764 if (s->s_auth.authorizer) 765 ceph_auth_destroy_authorizer(s->s_auth.authorizer); 766 WARN_ON(mutex_is_locked(&s->s_mutex)); 767 xa_destroy(&s->s_delegated_inos); 768 kfree(s); 769 } 770 } 771 772 /* 773 * called under mdsc->mutex 774 */ 775 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 776 int mds) 777 { 778 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 779 return NULL; 780 return ceph_get_mds_session(mdsc->sessions[mds]); 781 } 782 783 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 784 { 785 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 786 return false; 787 else 788 return true; 789 } 790 791 static int __verify_registered_session(struct ceph_mds_client *mdsc, 792 struct ceph_mds_session *s) 793 { 794 if (s->s_mds >= mdsc->max_sessions || 795 mdsc->sessions[s->s_mds] != s) 796 return -ENOENT; 797 return 0; 798 } 799 800 /* 801 * create+register a new session for given mds. 802 * called under mdsc->mutex. 
803 */ 804 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 805 int mds) 806 { 807 struct ceph_mds_session *s; 808 809 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) 810 return ERR_PTR(-EIO); 811 812 if (mds >= mdsc->mdsmap->possible_max_rank) 813 return ERR_PTR(-EINVAL); 814 815 s = kzalloc(sizeof(*s), GFP_NOFS); 816 if (!s) 817 return ERR_PTR(-ENOMEM); 818 819 if (mds >= mdsc->max_sessions) { 820 int newmax = 1 << get_count_order(mds + 1); 821 struct ceph_mds_session **sa; 822 823 dout("%s: realloc to %d\n", __func__, newmax); 824 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); 825 if (!sa) 826 goto fail_realloc; 827 if (mdsc->sessions) { 828 memcpy(sa, mdsc->sessions, 829 mdsc->max_sessions * sizeof(void *)); 830 kfree(mdsc->sessions); 831 } 832 mdsc->sessions = sa; 833 mdsc->max_sessions = newmax; 834 } 835 836 dout("%s: mds%d\n", __func__, mds); 837 s->s_mdsc = mdsc; 838 s->s_mds = mds; 839 s->s_state = CEPH_MDS_SESSION_NEW; 840 mutex_init(&s->s_mutex); 841 842 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); 843 844 atomic_set(&s->s_cap_gen, 1); 845 s->s_cap_ttl = jiffies - 1; 846 847 spin_lock_init(&s->s_cap_lock); 848 INIT_LIST_HEAD(&s->s_caps); 849 refcount_set(&s->s_ref, 1); 850 INIT_LIST_HEAD(&s->s_waiting); 851 INIT_LIST_HEAD(&s->s_unsafe); 852 xa_init(&s->s_delegated_inos); 853 INIT_LIST_HEAD(&s->s_cap_releases); 854 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work); 855 856 INIT_LIST_HEAD(&s->s_cap_dirty); 857 INIT_LIST_HEAD(&s->s_cap_flushing); 858 859 mdsc->sessions[mds] = s; 860 atomic_inc(&mdsc->num_sessions); 861 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 862 863 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 864 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 865 866 return s; 867 868 fail_realloc: 869 kfree(s); 870 return ERR_PTR(-ENOMEM); 871 } 872 873 /* 874 * called under mdsc->mutex 875 */ 876 static void __unregister_session(struct 
ceph_mds_client *mdsc, 877 struct ceph_mds_session *s) 878 { 879 dout("__unregister_session mds%d %p\n", s->s_mds, s); 880 BUG_ON(mdsc->sessions[s->s_mds] != s); 881 mdsc->sessions[s->s_mds] = NULL; 882 ceph_con_close(&s->s_con); 883 ceph_put_mds_session(s); 884 atomic_dec(&mdsc->num_sessions); 885 } 886 887 /* 888 * drop session refs in request. 889 * 890 * should be last request ref, or hold mdsc->mutex 891 */ 892 static void put_request_session(struct ceph_mds_request *req) 893 { 894 if (req->r_session) { 895 ceph_put_mds_session(req->r_session); 896 req->r_session = NULL; 897 } 898 } 899 900 void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc, 901 void (*cb)(struct ceph_mds_session *), 902 bool check_state) 903 { 904 int mds; 905 906 mutex_lock(&mdsc->mutex); 907 for (mds = 0; mds < mdsc->max_sessions; ++mds) { 908 struct ceph_mds_session *s; 909 910 s = __ceph_lookup_mds_session(mdsc, mds); 911 if (!s) 912 continue; 913 914 if (check_state && !check_session_state(s)) { 915 ceph_put_mds_session(s); 916 continue; 917 } 918 919 mutex_unlock(&mdsc->mutex); 920 cb(s); 921 ceph_put_mds_session(s); 922 mutex_lock(&mdsc->mutex); 923 } 924 mutex_unlock(&mdsc->mutex); 925 } 926 927 void ceph_mdsc_release_request(struct kref *kref) 928 { 929 struct ceph_mds_request *req = container_of(kref, 930 struct ceph_mds_request, 931 r_kref); 932 ceph_mdsc_release_dir_caps_no_check(req); 933 destroy_reply_info(&req->r_reply_info); 934 if (req->r_request) 935 ceph_msg_put(req->r_request); 936 if (req->r_reply) 937 ceph_msg_put(req->r_reply); 938 if (req->r_inode) { 939 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 940 iput(req->r_inode); 941 } 942 if (req->r_parent) { 943 ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN); 944 iput(req->r_parent); 945 } 946 iput(req->r_target_inode); 947 if (req->r_dentry) 948 dput(req->r_dentry); 949 if (req->r_old_dentry) 950 dput(req->r_old_dentry); 951 if (req->r_old_dentry_dir) { 952 /* 953 * track (and drop pins 
for) r_old_dentry_dir 954 * separately, since r_old_dentry's d_parent may have 955 * changed between the dir mutex being dropped and 956 * this request being freed. 957 */ 958 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), 959 CEPH_CAP_PIN); 960 iput(req->r_old_dentry_dir); 961 } 962 kfree(req->r_path1); 963 kfree(req->r_path2); 964 put_cred(req->r_cred); 965 if (req->r_pagelist) 966 ceph_pagelist_release(req->r_pagelist); 967 put_request_session(req); 968 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); 969 WARN_ON_ONCE(!list_empty(&req->r_wait)); 970 kmem_cache_free(ceph_mds_request_cachep, req); 971 } 972 973 DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node) 974 975 /* 976 * lookup session, bump ref if found. 977 * 978 * called under mdsc->mutex. 979 */ 980 static struct ceph_mds_request * 981 lookup_get_request(struct ceph_mds_client *mdsc, u64 tid) 982 { 983 struct ceph_mds_request *req; 984 985 req = lookup_request(&mdsc->request_tree, tid); 986 if (req) 987 ceph_mdsc_get_request(req); 988 989 return req; 990 } 991 992 /* 993 * Register an in-flight request, and assign a tid. Link to directory 994 * are modifying (if any). 995 * 996 * Called under mdsc->mutex. 
997 */ 998 static void __register_request(struct ceph_mds_client *mdsc, 999 struct ceph_mds_request *req, 1000 struct inode *dir) 1001 { 1002 int ret = 0; 1003 1004 req->r_tid = ++mdsc->last_tid; 1005 if (req->r_num_caps) { 1006 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, 1007 req->r_num_caps); 1008 if (ret < 0) { 1009 pr_err("__register_request %p " 1010 "failed to reserve caps: %d\n", req, ret); 1011 /* set req->r_err to fail early from __do_request */ 1012 req->r_err = ret; 1013 return; 1014 } 1015 } 1016 dout("__register_request %p tid %lld\n", req, req->r_tid); 1017 ceph_mdsc_get_request(req); 1018 insert_request(&mdsc->request_tree, req); 1019 1020 req->r_cred = get_current_cred(); 1021 1022 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) 1023 mdsc->oldest_tid = req->r_tid; 1024 1025 if (dir) { 1026 struct ceph_inode_info *ci = ceph_inode(dir); 1027 1028 ihold(dir); 1029 req->r_unsafe_dir = dir; 1030 spin_lock(&ci->i_unsafe_lock); 1031 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 1032 spin_unlock(&ci->i_unsafe_lock); 1033 } 1034 } 1035 1036 static void __unregister_request(struct ceph_mds_client *mdsc, 1037 struct ceph_mds_request *req) 1038 { 1039 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 1040 1041 /* Never leave an unregistered request on an unsafe list! 
*/ 1042 list_del_init(&req->r_unsafe_item); 1043 1044 if (req->r_tid == mdsc->oldest_tid) { 1045 struct rb_node *p = rb_next(&req->r_node); 1046 mdsc->oldest_tid = 0; 1047 while (p) { 1048 struct ceph_mds_request *next_req = 1049 rb_entry(p, struct ceph_mds_request, r_node); 1050 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 1051 mdsc->oldest_tid = next_req->r_tid; 1052 break; 1053 } 1054 p = rb_next(p); 1055 } 1056 } 1057 1058 erase_request(&mdsc->request_tree, req); 1059 1060 if (req->r_unsafe_dir) { 1061 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 1062 spin_lock(&ci->i_unsafe_lock); 1063 list_del_init(&req->r_unsafe_dir_item); 1064 spin_unlock(&ci->i_unsafe_lock); 1065 } 1066 if (req->r_target_inode && 1067 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 1068 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 1069 spin_lock(&ci->i_unsafe_lock); 1070 list_del_init(&req->r_unsafe_target_item); 1071 spin_unlock(&ci->i_unsafe_lock); 1072 } 1073 1074 if (req->r_unsafe_dir) { 1075 iput(req->r_unsafe_dir); 1076 req->r_unsafe_dir = NULL; 1077 } 1078 1079 complete_all(&req->r_safe_completion); 1080 1081 ceph_mdsc_put_request(req); 1082 } 1083 1084 /* 1085 * Walk back up the dentry tree until we hit a dentry representing a 1086 * non-snapshot inode. We do this using the rcu_read_lock (which must be held 1087 * when calling this) to ensure that the objects won't disappear while we're 1088 * working with them. Once we hit a candidate dentry, we attempt to take a 1089 * reference to it, and return that as the result. 1090 */ 1091 static struct inode *get_nonsnap_parent(struct dentry *dentry) 1092 { 1093 struct inode *inode = NULL; 1094 1095 while (dentry && !IS_ROOT(dentry)) { 1096 inode = d_inode_rcu(dentry); 1097 if (!inode || ceph_snap(inode) == CEPH_NOSNAP) 1098 break; 1099 dentry = dentry->d_parent; 1100 } 1101 if (inode) 1102 inode = igrab(inode); 1103 return inode; 1104 } 1105 1106 /* 1107 * Choose mds to send request to next. 
If there is a hint set in the 1108 * request (e.g., due to a prior forward hint from the mds), use that. 1109 * Otherwise, consult frag tree and/or caps to identify the 1110 * appropriate mds. If all else fails, choose randomly. 1111 * 1112 * Called under mdsc->mutex. 1113 */ 1114 static int __choose_mds(struct ceph_mds_client *mdsc, 1115 struct ceph_mds_request *req, 1116 bool *random) 1117 { 1118 struct inode *inode; 1119 struct ceph_inode_info *ci; 1120 struct ceph_cap *cap; 1121 int mode = req->r_direct_mode; 1122 int mds = -1; 1123 u32 hash = req->r_direct_hash; 1124 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); 1125 1126 if (random) 1127 *random = false; 1128 1129 /* 1130 * is there a specific mds we should try? ignore hint if we have 1131 * no session and the mds is not up (active or recovering). 1132 */ 1133 if (req->r_resend_mds >= 0 && 1134 (__have_session(mdsc, req->r_resend_mds) || 1135 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 1136 dout("%s using resend_mds mds%d\n", __func__, 1137 req->r_resend_mds); 1138 return req->r_resend_mds; 1139 } 1140 1141 if (mode == USE_RANDOM_MDS) 1142 goto random; 1143 1144 inode = NULL; 1145 if (req->r_inode) { 1146 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) { 1147 inode = req->r_inode; 1148 ihold(inode); 1149 } else { 1150 /* req->r_dentry is non-null for LSSNAP request */ 1151 rcu_read_lock(); 1152 inode = get_nonsnap_parent(req->r_dentry); 1153 rcu_read_unlock(); 1154 dout("%s using snapdir's parent %p\n", __func__, inode); 1155 } 1156 } else if (req->r_dentry) { 1157 /* ignore race with rename; old or new d_parent is okay */ 1158 struct dentry *parent; 1159 struct inode *dir; 1160 1161 rcu_read_lock(); 1162 parent = READ_ONCE(req->r_dentry->d_parent); 1163 dir = req->r_parent ? 
: d_inode_rcu(parent); 1164 1165 if (!dir || dir->i_sb != mdsc->fsc->sb) { 1166 /* not this fs or parent went negative */ 1167 inode = d_inode(req->r_dentry); 1168 if (inode) 1169 ihold(inode); 1170 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 1171 /* direct snapped/virtual snapdir requests 1172 * based on parent dir inode */ 1173 inode = get_nonsnap_parent(parent); 1174 dout("%s using nonsnap parent %p\n", __func__, inode); 1175 } else { 1176 /* dentry target */ 1177 inode = d_inode(req->r_dentry); 1178 if (!inode || mode == USE_AUTH_MDS) { 1179 /* dir + name */ 1180 inode = igrab(dir); 1181 hash = ceph_dentry_hash(dir, req->r_dentry); 1182 is_hash = true; 1183 } else { 1184 ihold(inode); 1185 } 1186 } 1187 rcu_read_unlock(); 1188 } 1189 1190 dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash, 1191 hash, mode); 1192 if (!inode) 1193 goto random; 1194 ci = ceph_inode(inode); 1195 1196 if (is_hash && S_ISDIR(inode->i_mode)) { 1197 struct ceph_inode_frag frag; 1198 int found; 1199 1200 ceph_choose_frag(ci, hash, &frag, &found); 1201 if (found) { 1202 if (mode == USE_ANY_MDS && frag.ndist > 0) { 1203 u8 r; 1204 1205 /* choose a random replica */ 1206 get_random_bytes(&r, 1); 1207 r %= frag.ndist; 1208 mds = frag.dist[r]; 1209 dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n", 1210 __func__, inode, ceph_vinop(inode), 1211 frag.frag, mds, (int)r, frag.ndist); 1212 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1213 CEPH_MDS_STATE_ACTIVE && 1214 !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds)) 1215 goto out; 1216 } 1217 1218 /* since this file/dir wasn't known to be 1219 * replicated, then we want to look for the 1220 * authoritative mds. 
*/ 1221 if (frag.mds >= 0) { 1222 /* choose auth mds */ 1223 mds = frag.mds; 1224 dout("%s %p %llx.%llx frag %u mds%d (auth)\n", 1225 __func__, inode, ceph_vinop(inode), 1226 frag.frag, mds); 1227 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1228 CEPH_MDS_STATE_ACTIVE) { 1229 if (!ceph_mdsmap_is_laggy(mdsc->mdsmap, 1230 mds)) 1231 goto out; 1232 } 1233 } 1234 mode = USE_AUTH_MDS; 1235 } 1236 } 1237 1238 spin_lock(&ci->i_ceph_lock); 1239 cap = NULL; 1240 if (mode == USE_AUTH_MDS) 1241 cap = ci->i_auth_cap; 1242 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 1243 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 1244 if (!cap) { 1245 spin_unlock(&ci->i_ceph_lock); 1246 iput(inode); 1247 goto random; 1248 } 1249 mds = cap->session->s_mds; 1250 dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__, 1251 inode, ceph_vinop(inode), mds, 1252 cap == ci->i_auth_cap ? "auth " : "", cap); 1253 spin_unlock(&ci->i_ceph_lock); 1254 out: 1255 iput(inode); 1256 return mds; 1257 1258 random: 1259 if (random) 1260 *random = true; 1261 1262 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 1263 dout("%s chose random mds%d\n", __func__, mds); 1264 return mds; 1265 } 1266 1267 1268 /* 1269 * session messages 1270 */ 1271 struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq) 1272 { 1273 struct ceph_msg *msg; 1274 struct ceph_mds_session_head *h; 1275 1276 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 1277 false); 1278 if (!msg) { 1279 pr_err("ENOMEM creating session %s msg\n", 1280 ceph_session_op_name(op)); 1281 return NULL; 1282 } 1283 h = msg->front.iov_base; 1284 h->op = cpu_to_le32(op); 1285 h->seq = cpu_to_le64(seq); 1286 1287 return msg; 1288 } 1289 1290 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED; 1291 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8) 1292 static int encode_supported_features(void **p, void *end) 1293 { 1294 static const size_t count = ARRAY_SIZE(feature_bits); 1295 
1296 if (count > 0) { 1297 size_t i; 1298 size_t size = FEATURE_BYTES(count); 1299 unsigned long bit; 1300 1301 if (WARN_ON_ONCE(*p + 4 + size > end)) 1302 return -ERANGE; 1303 1304 ceph_encode_32(p, size); 1305 memset(*p, 0, size); 1306 for (i = 0; i < count; i++) { 1307 bit = feature_bits[i]; 1308 ((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8); 1309 } 1310 *p += size; 1311 } else { 1312 if (WARN_ON_ONCE(*p + 4 > end)) 1313 return -ERANGE; 1314 1315 ceph_encode_32(p, 0); 1316 } 1317 1318 return 0; 1319 } 1320 1321 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED; 1322 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8) 1323 static int encode_metric_spec(void **p, void *end) 1324 { 1325 static const size_t count = ARRAY_SIZE(metric_bits); 1326 1327 /* header */ 1328 if (WARN_ON_ONCE(*p + 2 > end)) 1329 return -ERANGE; 1330 1331 ceph_encode_8(p, 1); /* version */ 1332 ceph_encode_8(p, 1); /* compat */ 1333 1334 if (count > 0) { 1335 size_t i; 1336 size_t size = METRIC_BYTES(count); 1337 1338 if (WARN_ON_ONCE(*p + 4 + 4 + size > end)) 1339 return -ERANGE; 1340 1341 /* metric spec info length */ 1342 ceph_encode_32(p, 4 + size); 1343 1344 /* metric spec */ 1345 ceph_encode_32(p, size); 1346 memset(*p, 0, size); 1347 for (i = 0; i < count; i++) 1348 ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8); 1349 *p += size; 1350 } else { 1351 if (WARN_ON_ONCE(*p + 4 + 4 > end)) 1352 return -ERANGE; 1353 1354 /* metric spec info length */ 1355 ceph_encode_32(p, 4); 1356 /* metric spec */ 1357 ceph_encode_32(p, 0); 1358 } 1359 1360 return 0; 1361 } 1362 1363 /* 1364 * session message, specialization for CEPH_SESSION_REQUEST_OPEN 1365 * to include additional client metadata fields. 
1366 */ 1367 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq) 1368 { 1369 struct ceph_msg *msg; 1370 struct ceph_mds_session_head *h; 1371 int i; 1372 int extra_bytes = 0; 1373 int metadata_key_count = 0; 1374 struct ceph_options *opt = mdsc->fsc->client->options; 1375 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; 1376 size_t size, count; 1377 void *p, *end; 1378 int ret; 1379 1380 const char* metadata[][2] = { 1381 {"hostname", mdsc->nodename}, 1382 {"kernel_version", init_utsname()->release}, 1383 {"entity_id", opt->name ? : ""}, 1384 {"root", fsopt->server_path ? : "/"}, 1385 {NULL, NULL} 1386 }; 1387 1388 /* Calculate serialized length of metadata */ 1389 extra_bytes = 4; /* map length */ 1390 for (i = 0; metadata[i][0]; ++i) { 1391 extra_bytes += 8 + strlen(metadata[i][0]) + 1392 strlen(metadata[i][1]); 1393 metadata_key_count++; 1394 } 1395 1396 /* supported feature */ 1397 size = 0; 1398 count = ARRAY_SIZE(feature_bits); 1399 if (count > 0) 1400 size = FEATURE_BYTES(count); 1401 extra_bytes += 4 + size; 1402 1403 /* metric spec */ 1404 size = 0; 1405 count = ARRAY_SIZE(metric_bits); 1406 if (count > 0) 1407 size = METRIC_BYTES(count); 1408 extra_bytes += 2 + 4 + 4 + size; 1409 1410 /* Allocate the message */ 1411 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, 1412 GFP_NOFS, false); 1413 if (!msg) { 1414 pr_err("ENOMEM creating session open msg\n"); 1415 return ERR_PTR(-ENOMEM); 1416 } 1417 p = msg->front.iov_base; 1418 end = p + msg->front.iov_len; 1419 1420 h = p; 1421 h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN); 1422 h->seq = cpu_to_le64(seq); 1423 1424 /* 1425 * Serialize client metadata into waiting buffer space, using 1426 * the format that userspace expects for map<string, string> 1427 * 1428 * ClientSession messages with metadata are v4 1429 */ 1430 msg->hdr.version = cpu_to_le16(4); 1431 msg->hdr.compat_version = cpu_to_le16(1); 1432 1433 /* The write pointer, following 
the session_head structure */ 1434 p += sizeof(*h); 1435 1436 /* Number of entries in the map */ 1437 ceph_encode_32(&p, metadata_key_count); 1438 1439 /* Two length-prefixed strings for each entry in the map */ 1440 for (i = 0; metadata[i][0]; ++i) { 1441 size_t const key_len = strlen(metadata[i][0]); 1442 size_t const val_len = strlen(metadata[i][1]); 1443 1444 ceph_encode_32(&p, key_len); 1445 memcpy(p, metadata[i][0], key_len); 1446 p += key_len; 1447 ceph_encode_32(&p, val_len); 1448 memcpy(p, metadata[i][1], val_len); 1449 p += val_len; 1450 } 1451 1452 ret = encode_supported_features(&p, end); 1453 if (ret) { 1454 pr_err("encode_supported_features failed!\n"); 1455 ceph_msg_put(msg); 1456 return ERR_PTR(ret); 1457 } 1458 1459 ret = encode_metric_spec(&p, end); 1460 if (ret) { 1461 pr_err("encode_metric_spec failed!\n"); 1462 ceph_msg_put(msg); 1463 return ERR_PTR(ret); 1464 } 1465 1466 msg->front.iov_len = p - msg->front.iov_base; 1467 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1468 1469 return msg; 1470 } 1471 1472 /* 1473 * send session open request. 1474 * 1475 * called under mdsc->mutex 1476 */ 1477 static int __open_session(struct ceph_mds_client *mdsc, 1478 struct ceph_mds_session *session) 1479 { 1480 struct ceph_msg *msg; 1481 int mstate; 1482 int mds = session->s_mds; 1483 1484 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) 1485 return -EIO; 1486 1487 /* wait for mds to go active? 
*/ 1488 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 1489 dout("open_session to mds%d (%s)\n", mds, 1490 ceph_mds_state_name(mstate)); 1491 session->s_state = CEPH_MDS_SESSION_OPENING; 1492 session->s_renew_requested = jiffies; 1493 1494 /* send connect message */ 1495 msg = create_session_open_msg(mdsc, session->s_seq); 1496 if (IS_ERR(msg)) 1497 return PTR_ERR(msg); 1498 ceph_con_send(&session->s_con, msg); 1499 return 0; 1500 } 1501 1502 /* 1503 * open sessions for any export targets for the given mds 1504 * 1505 * called under mdsc->mutex 1506 */ 1507 static struct ceph_mds_session * 1508 __open_export_target_session(struct ceph_mds_client *mdsc, int target) 1509 { 1510 struct ceph_mds_session *session; 1511 int ret; 1512 1513 session = __ceph_lookup_mds_session(mdsc, target); 1514 if (!session) { 1515 session = register_session(mdsc, target); 1516 if (IS_ERR(session)) 1517 return session; 1518 } 1519 if (session->s_state == CEPH_MDS_SESSION_NEW || 1520 session->s_state == CEPH_MDS_SESSION_CLOSING) { 1521 ret = __open_session(mdsc, session); 1522 if (ret) 1523 return ERR_PTR(ret); 1524 } 1525 1526 return session; 1527 } 1528 1529 struct ceph_mds_session * 1530 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) 1531 { 1532 struct ceph_mds_session *session; 1533 1534 dout("open_export_target_session to mds%d\n", target); 1535 1536 mutex_lock(&mdsc->mutex); 1537 session = __open_export_target_session(mdsc, target); 1538 mutex_unlock(&mdsc->mutex); 1539 1540 return session; 1541 } 1542 1543 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 1544 struct ceph_mds_session *session) 1545 { 1546 struct ceph_mds_info *mi; 1547 struct ceph_mds_session *ts; 1548 int i, mds = session->s_mds; 1549 1550 if (mds >= mdsc->mdsmap->possible_max_rank) 1551 return; 1552 1553 mi = &mdsc->mdsmap->m_info[mds]; 1554 dout("open_export_target_sessions for mds%d (%d targets)\n", 1555 session->s_mds, mi->num_export_targets); 1556 1557 
for (i = 0; i < mi->num_export_targets; i++) { 1558 ts = __open_export_target_session(mdsc, mi->export_targets[i]); 1559 ceph_put_mds_session(ts); 1560 } 1561 } 1562 1563 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 1564 struct ceph_mds_session *session) 1565 { 1566 mutex_lock(&mdsc->mutex); 1567 __open_export_target_sessions(mdsc, session); 1568 mutex_unlock(&mdsc->mutex); 1569 } 1570 1571 /* 1572 * session caps 1573 */ 1574 1575 static void detach_cap_releases(struct ceph_mds_session *session, 1576 struct list_head *target) 1577 { 1578 lockdep_assert_held(&session->s_cap_lock); 1579 1580 list_splice_init(&session->s_cap_releases, target); 1581 session->s_num_cap_releases = 0; 1582 dout("dispose_cap_releases mds%d\n", session->s_mds); 1583 } 1584 1585 static void dispose_cap_releases(struct ceph_mds_client *mdsc, 1586 struct list_head *dispose) 1587 { 1588 while (!list_empty(dispose)) { 1589 struct ceph_cap *cap; 1590 /* zero out the in-progress message */ 1591 cap = list_first_entry(dispose, struct ceph_cap, session_caps); 1592 list_del(&cap->session_caps); 1593 ceph_put_cap(mdsc, cap); 1594 } 1595 } 1596 1597 static void cleanup_session_requests(struct ceph_mds_client *mdsc, 1598 struct ceph_mds_session *session) 1599 { 1600 struct ceph_mds_request *req; 1601 struct rb_node *p; 1602 1603 dout("cleanup_session_requests mds%d\n", session->s_mds); 1604 mutex_lock(&mdsc->mutex); 1605 while (!list_empty(&session->s_unsafe)) { 1606 req = list_first_entry(&session->s_unsafe, 1607 struct ceph_mds_request, r_unsafe_item); 1608 pr_warn_ratelimited(" dropping unsafe request %llu\n", 1609 req->r_tid); 1610 if (req->r_target_inode) 1611 mapping_set_error(req->r_target_inode->i_mapping, -EIO); 1612 if (req->r_unsafe_dir) 1613 mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO); 1614 __unregister_request(mdsc, req); 1615 } 1616 /* zero r_attempts, so kick_requests() will re-send requests */ 1617 p = rb_first(&mdsc->request_tree); 1618 while (p) { 
1619 req = rb_entry(p, struct ceph_mds_request, r_node); 1620 p = rb_next(p); 1621 if (req->r_session && 1622 req->r_session->s_mds == session->s_mds) 1623 req->r_attempts = 0; 1624 } 1625 mutex_unlock(&mdsc->mutex); 1626 } 1627 1628 /* 1629 * Helper to safely iterate over all caps associated with a session, with 1630 * special care taken to handle a racing __ceph_remove_cap(). 1631 * 1632 * Caller must hold session s_mutex. 1633 */ 1634 int ceph_iterate_session_caps(struct ceph_mds_session *session, 1635 int (*cb)(struct inode *, int mds, void *), 1636 void *arg) 1637 { 1638 struct list_head *p; 1639 struct ceph_cap *cap; 1640 struct inode *inode, *last_inode = NULL; 1641 struct ceph_cap *old_cap = NULL; 1642 int ret; 1643 1644 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 1645 spin_lock(&session->s_cap_lock); 1646 p = session->s_caps.next; 1647 while (p != &session->s_caps) { 1648 int mds; 1649 1650 cap = list_entry(p, struct ceph_cap, session_caps); 1651 inode = igrab(&cap->ci->netfs.inode); 1652 if (!inode) { 1653 p = p->next; 1654 continue; 1655 } 1656 session->s_cap_iterator = cap; 1657 mds = cap->mds; 1658 spin_unlock(&session->s_cap_lock); 1659 1660 if (last_inode) { 1661 iput(last_inode); 1662 last_inode = NULL; 1663 } 1664 if (old_cap) { 1665 ceph_put_cap(session->s_mdsc, old_cap); 1666 old_cap = NULL; 1667 } 1668 1669 ret = cb(inode, mds, arg); 1670 last_inode = inode; 1671 1672 spin_lock(&session->s_cap_lock); 1673 p = p->next; 1674 if (!cap->ci) { 1675 dout("iterate_session_caps finishing cap %p removal\n", 1676 cap); 1677 BUG_ON(cap->session != session); 1678 cap->session = NULL; 1679 list_del_init(&cap->session_caps); 1680 session->s_nr_caps--; 1681 atomic64_dec(&session->s_mdsc->metric.total_caps); 1682 if (cap->queue_release) 1683 __ceph_queue_cap_release(session, cap); 1684 else 1685 old_cap = cap; /* put_cap it w/o locks held */ 1686 } 1687 if (ret < 0) 1688 goto out; 1689 } 1690 ret = 0; 1691 out: 1692 session->s_cap_iterator 
= NULL; 1693 spin_unlock(&session->s_cap_lock); 1694 1695 iput(last_inode); 1696 if (old_cap) 1697 ceph_put_cap(session->s_mdsc, old_cap); 1698 1699 return ret; 1700 } 1701 1702 static int remove_session_caps_cb(struct inode *inode, int mds, void *arg) 1703 { 1704 struct ceph_inode_info *ci = ceph_inode(inode); 1705 bool invalidate = false; 1706 struct ceph_cap *cap; 1707 int iputs = 0; 1708 1709 spin_lock(&ci->i_ceph_lock); 1710 cap = __get_cap_for_mds(ci, mds); 1711 if (cap) { 1712 dout(" removing cap %p, ci is %p, inode is %p\n", 1713 cap, ci, &ci->netfs.inode); 1714 1715 iputs = ceph_purge_inode_cap(inode, cap, &invalidate); 1716 } 1717 spin_unlock(&ci->i_ceph_lock); 1718 1719 if (cap) 1720 wake_up_all(&ci->i_cap_wq); 1721 if (invalidate) 1722 ceph_queue_invalidate(inode); 1723 while (iputs--) 1724 iput(inode); 1725 return 0; 1726 } 1727 1728 /* 1729 * caller must hold session s_mutex 1730 */ 1731 static void remove_session_caps(struct ceph_mds_session *session) 1732 { 1733 struct ceph_fs_client *fsc = session->s_mdsc->fsc; 1734 struct super_block *sb = fsc->sb; 1735 LIST_HEAD(dispose); 1736 1737 dout("remove_session_caps on %p\n", session); 1738 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc); 1739 1740 wake_up_all(&fsc->mdsc->cap_flushing_wq); 1741 1742 spin_lock(&session->s_cap_lock); 1743 if (session->s_nr_caps > 0) { 1744 struct inode *inode; 1745 struct ceph_cap *cap, *prev = NULL; 1746 struct ceph_vino vino; 1747 /* 1748 * iterate_session_caps() skips inodes that are being 1749 * deleted, we need to wait until deletions are complete. 1750 * __wait_on_freeing_inode() is designed for the job, 1751 * but it is not exported, so use lookup inode function 1752 * to access it. 
1753 */ 1754 while (!list_empty(&session->s_caps)) { 1755 cap = list_entry(session->s_caps.next, 1756 struct ceph_cap, session_caps); 1757 if (cap == prev) 1758 break; 1759 prev = cap; 1760 vino = cap->ci->i_vino; 1761 spin_unlock(&session->s_cap_lock); 1762 1763 inode = ceph_find_inode(sb, vino); 1764 iput(inode); 1765 1766 spin_lock(&session->s_cap_lock); 1767 } 1768 } 1769 1770 // drop cap expires and unlock s_cap_lock 1771 detach_cap_releases(session, &dispose); 1772 1773 BUG_ON(session->s_nr_caps > 0); 1774 BUG_ON(!list_empty(&session->s_cap_flushing)); 1775 spin_unlock(&session->s_cap_lock); 1776 dispose_cap_releases(session->s_mdsc, &dispose); 1777 } 1778 1779 enum { 1780 RECONNECT, 1781 RENEWCAPS, 1782 FORCE_RO, 1783 }; 1784 1785 /* 1786 * wake up any threads waiting on this session's caps. if the cap is 1787 * old (didn't get renewed on the client reconnect), remove it now. 1788 * 1789 * caller must hold s_mutex. 1790 */ 1791 static int wake_up_session_cb(struct inode *inode, int mds, void *arg) 1792 { 1793 struct ceph_inode_info *ci = ceph_inode(inode); 1794 unsigned long ev = (unsigned long)arg; 1795 1796 if (ev == RECONNECT) { 1797 spin_lock(&ci->i_ceph_lock); 1798 ci->i_wanted_max_size = 0; 1799 ci->i_requested_max_size = 0; 1800 spin_unlock(&ci->i_ceph_lock); 1801 } else if (ev == RENEWCAPS) { 1802 struct ceph_cap *cap; 1803 1804 spin_lock(&ci->i_ceph_lock); 1805 cap = __get_cap_for_mds(ci, mds); 1806 /* mds did not re-issue stale cap */ 1807 if (cap && cap->cap_gen < atomic_read(&cap->session->s_cap_gen)) 1808 cap->issued = cap->implemented = CEPH_CAP_PIN; 1809 spin_unlock(&ci->i_ceph_lock); 1810 } else if (ev == FORCE_RO) { 1811 } 1812 wake_up_all(&ci->i_cap_wq); 1813 return 0; 1814 } 1815 1816 static void wake_up_session_caps(struct ceph_mds_session *session, int ev) 1817 { 1818 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 1819 ceph_iterate_session_caps(session, wake_up_session_cb, 1820 (void *)(unsigned long)ev); 1821 } 1822 
1823 /* 1824 * Send periodic message to MDS renewing all currently held caps. The 1825 * ack will reset the expiration for all caps from this session. 1826 * 1827 * caller holds s_mutex 1828 */ 1829 static int send_renew_caps(struct ceph_mds_client *mdsc, 1830 struct ceph_mds_session *session) 1831 { 1832 struct ceph_msg *msg; 1833 int state; 1834 1835 if (time_after_eq(jiffies, session->s_cap_ttl) && 1836 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 1837 pr_info("mds%d caps stale\n", session->s_mds); 1838 session->s_renew_requested = jiffies; 1839 1840 /* do not try to renew caps until a recovering mds has reconnected 1841 * with its clients. */ 1842 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 1843 if (state < CEPH_MDS_STATE_RECONNECT) { 1844 dout("send_renew_caps ignoring mds%d (%s)\n", 1845 session->s_mds, ceph_mds_state_name(state)); 1846 return 0; 1847 } 1848 1849 dout("send_renew_caps to mds%d (%s)\n", session->s_mds, 1850 ceph_mds_state_name(state)); 1851 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 1852 ++session->s_renew_seq); 1853 if (!msg) 1854 return -ENOMEM; 1855 ceph_con_send(&session->s_con, msg); 1856 return 0; 1857 } 1858 1859 static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 1860 struct ceph_mds_session *session, u64 seq) 1861 { 1862 struct ceph_msg *msg; 1863 1864 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", 1865 session->s_mds, ceph_session_state_name(session->s_state), seq); 1866 msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 1867 if (!msg) 1868 return -ENOMEM; 1869 ceph_con_send(&session->s_con, msg); 1870 return 0; 1871 } 1872 1873 1874 /* 1875 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 
1876 * 1877 * Called under session->s_mutex 1878 */ 1879 static void renewed_caps(struct ceph_mds_client *mdsc, 1880 struct ceph_mds_session *session, int is_renew) 1881 { 1882 int was_stale; 1883 int wake = 0; 1884 1885 spin_lock(&session->s_cap_lock); 1886 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 1887 1888 session->s_cap_ttl = session->s_renew_requested + 1889 mdsc->mdsmap->m_session_timeout*HZ; 1890 1891 if (was_stale) { 1892 if (time_before(jiffies, session->s_cap_ttl)) { 1893 pr_info("mds%d caps renewed\n", session->s_mds); 1894 wake = 1; 1895 } else { 1896 pr_info("mds%d caps still stale\n", session->s_mds); 1897 } 1898 } 1899 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n", 1900 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh", 1901 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 1902 spin_unlock(&session->s_cap_lock); 1903 1904 if (wake) 1905 wake_up_session_caps(session, RENEWCAPS); 1906 } 1907 1908 /* 1909 * send a session close request 1910 */ 1911 static int request_close_session(struct ceph_mds_session *session) 1912 { 1913 struct ceph_msg *msg; 1914 1915 dout("request_close_session mds%d state %s seq %lld\n", 1916 session->s_mds, ceph_session_state_name(session->s_state), 1917 session->s_seq); 1918 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE, 1919 session->s_seq); 1920 if (!msg) 1921 return -ENOMEM; 1922 ceph_con_send(&session->s_con, msg); 1923 return 1; 1924 } 1925 1926 /* 1927 * Called with s_mutex held. 
1928 */ 1929 static int __close_session(struct ceph_mds_client *mdsc, 1930 struct ceph_mds_session *session) 1931 { 1932 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 1933 return 0; 1934 session->s_state = CEPH_MDS_SESSION_CLOSING; 1935 return request_close_session(session); 1936 } 1937 1938 static bool drop_negative_children(struct dentry *dentry) 1939 { 1940 struct dentry *child; 1941 bool all_negative = true; 1942 1943 if (!d_is_dir(dentry)) 1944 goto out; 1945 1946 spin_lock(&dentry->d_lock); 1947 list_for_each_entry(child, &dentry->d_subdirs, d_child) { 1948 if (d_really_is_positive(child)) { 1949 all_negative = false; 1950 break; 1951 } 1952 } 1953 spin_unlock(&dentry->d_lock); 1954 1955 if (all_negative) 1956 shrink_dcache_parent(dentry); 1957 out: 1958 return all_negative; 1959 } 1960 1961 /* 1962 * Trim old(er) caps. 1963 * 1964 * Because we can't cache an inode without one or more caps, we do 1965 * this indirectly: if a cap is unused, we prune its aliases, at which 1966 * point the inode will hopefully get dropped to. 1967 * 1968 * Yes, this is a bit sloppy. Our only real goal here is to respond to 1969 * memory pressure from the MDS, though, so it needn't be perfect. 
1970 */ 1971 static int trim_caps_cb(struct inode *inode, int mds, void *arg) 1972 { 1973 int *remaining = arg; 1974 struct ceph_inode_info *ci = ceph_inode(inode); 1975 int used, wanted, oissued, mine; 1976 struct ceph_cap *cap; 1977 1978 if (*remaining <= 0) 1979 return -1; 1980 1981 spin_lock(&ci->i_ceph_lock); 1982 cap = __get_cap_for_mds(ci, mds); 1983 if (!cap) { 1984 spin_unlock(&ci->i_ceph_lock); 1985 return 0; 1986 } 1987 mine = cap->issued | cap->implemented; 1988 used = __ceph_caps_used(ci); 1989 wanted = __ceph_caps_file_wanted(ci); 1990 oissued = __ceph_caps_issued_other(ci, cap); 1991 1992 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n", 1993 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), 1994 ceph_cap_string(used), ceph_cap_string(wanted)); 1995 if (cap == ci->i_auth_cap) { 1996 if (ci->i_dirty_caps || ci->i_flushing_caps || 1997 !list_empty(&ci->i_cap_snaps)) 1998 goto out; 1999 if ((used | wanted) & CEPH_CAP_ANY_WR) 2000 goto out; 2001 /* Note: it's possible that i_filelock_ref becomes non-zero 2002 * after dropping auth caps. It doesn't hurt because reply 2003 * of lock mds request will re-add auth caps. */ 2004 if (atomic_read(&ci->i_filelock_ref) > 0) 2005 goto out; 2006 } 2007 /* The inode has cached pages, but it's no longer used. 2008 * we can safely drop it */ 2009 if (S_ISREG(inode->i_mode) && 2010 wanted == 0 && used == CEPH_CAP_FILE_CACHE && 2011 !(oissued & CEPH_CAP_FILE_CACHE)) { 2012 used = 0; 2013 oissued = 0; 2014 } 2015 if ((used | wanted) & ~oissued & mine) 2016 goto out; /* we need these caps */ 2017 2018 if (oissued) { 2019 /* we aren't the only cap.. 
just remove us */ 2020 ceph_remove_cap(cap, true); 2021 (*remaining)--; 2022 } else { 2023 struct dentry *dentry; 2024 /* try dropping referring dentries */ 2025 spin_unlock(&ci->i_ceph_lock); 2026 dentry = d_find_any_alias(inode); 2027 if (dentry && drop_negative_children(dentry)) { 2028 int count; 2029 dput(dentry); 2030 d_prune_aliases(inode); 2031 count = atomic_read(&inode->i_count); 2032 if (count == 1) 2033 (*remaining)--; 2034 dout("trim_caps_cb %p cap %p pruned, count now %d\n", 2035 inode, cap, count); 2036 } else { 2037 dput(dentry); 2038 } 2039 return 0; 2040 } 2041 2042 out: 2043 spin_unlock(&ci->i_ceph_lock); 2044 return 0; 2045 } 2046 2047 /* 2048 * Trim session cap count down to some max number. 2049 */ 2050 int ceph_trim_caps(struct ceph_mds_client *mdsc, 2051 struct ceph_mds_session *session, 2052 int max_caps) 2053 { 2054 int trim_caps = session->s_nr_caps - max_caps; 2055 2056 dout("trim_caps mds%d start: %d / %d, trim %d\n", 2057 session->s_mds, session->s_nr_caps, max_caps, trim_caps); 2058 if (trim_caps > 0) { 2059 int remaining = trim_caps; 2060 2061 ceph_iterate_session_caps(session, trim_caps_cb, &remaining); 2062 dout("trim_caps mds%d done: %d / %d, trimmed %d\n", 2063 session->s_mds, session->s_nr_caps, max_caps, 2064 trim_caps - remaining); 2065 } 2066 2067 ceph_flush_cap_releases(mdsc, session); 2068 return 0; 2069 } 2070 2071 static int check_caps_flush(struct ceph_mds_client *mdsc, 2072 u64 want_flush_tid) 2073 { 2074 int ret = 1; 2075 2076 spin_lock(&mdsc->cap_dirty_lock); 2077 if (!list_empty(&mdsc->cap_flush_list)) { 2078 struct ceph_cap_flush *cf = 2079 list_first_entry(&mdsc->cap_flush_list, 2080 struct ceph_cap_flush, g_list); 2081 if (cf->tid <= want_flush_tid) { 2082 dout("check_caps_flush still flushing tid " 2083 "%llu <= %llu\n", cf->tid, want_flush_tid); 2084 ret = 0; 2085 } 2086 } 2087 spin_unlock(&mdsc->cap_dirty_lock); 2088 return ret; 2089 } 2090 2091 /* 2092 * flush all dirty inode data to disk. 
2093 * 2094 * returns true if we've flushed through want_flush_tid 2095 */ 2096 static void wait_caps_flush(struct ceph_mds_client *mdsc, 2097 u64 want_flush_tid) 2098 { 2099 dout("check_caps_flush want %llu\n", want_flush_tid); 2100 2101 wait_event(mdsc->cap_flushing_wq, 2102 check_caps_flush(mdsc, want_flush_tid)); 2103 2104 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); 2105 } 2106 2107 /* 2108 * called under s_mutex 2109 */ 2110 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 2111 struct ceph_mds_session *session) 2112 { 2113 struct ceph_msg *msg = NULL; 2114 struct ceph_mds_cap_release *head; 2115 struct ceph_mds_cap_item *item; 2116 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 2117 struct ceph_cap *cap; 2118 LIST_HEAD(tmp_list); 2119 int num_cap_releases; 2120 __le32 barrier, *cap_barrier; 2121 2122 down_read(&osdc->lock); 2123 barrier = cpu_to_le32(osdc->epoch_barrier); 2124 up_read(&osdc->lock); 2125 2126 spin_lock(&session->s_cap_lock); 2127 again: 2128 list_splice_init(&session->s_cap_releases, &tmp_list); 2129 num_cap_releases = session->s_num_cap_releases; 2130 session->s_num_cap_releases = 0; 2131 spin_unlock(&session->s_cap_lock); 2132 2133 while (!list_empty(&tmp_list)) { 2134 if (!msg) { 2135 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 2136 PAGE_SIZE, GFP_NOFS, false); 2137 if (!msg) 2138 goto out_err; 2139 head = msg->front.iov_base; 2140 head->num = cpu_to_le32(0); 2141 msg->front.iov_len = sizeof(*head); 2142 2143 msg->hdr.version = cpu_to_le16(2); 2144 msg->hdr.compat_version = cpu_to_le16(1); 2145 } 2146 2147 cap = list_first_entry(&tmp_list, struct ceph_cap, 2148 session_caps); 2149 list_del(&cap->session_caps); 2150 num_cap_releases--; 2151 2152 head = msg->front.iov_base; 2153 put_unaligned_le32(get_unaligned_le32(&head->num) + 1, 2154 &head->num); 2155 item = msg->front.iov_base + msg->front.iov_len; 2156 item->ino = cpu_to_le64(cap->cap_ino); 2157 item->cap_id = cpu_to_le64(cap->cap_id); 
2158 item->migrate_seq = cpu_to_le32(cap->mseq); 2159 item->seq = cpu_to_le32(cap->issue_seq); 2160 msg->front.iov_len += sizeof(*item); 2161 2162 ceph_put_cap(mdsc, cap); 2163 2164 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 2165 // Append cap_barrier field 2166 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2167 *cap_barrier = barrier; 2168 msg->front.iov_len += sizeof(*cap_barrier); 2169 2170 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2171 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2172 ceph_con_send(&session->s_con, msg); 2173 msg = NULL; 2174 } 2175 } 2176 2177 BUG_ON(num_cap_releases != 0); 2178 2179 spin_lock(&session->s_cap_lock); 2180 if (!list_empty(&session->s_cap_releases)) 2181 goto again; 2182 spin_unlock(&session->s_cap_lock); 2183 2184 if (msg) { 2185 // Append cap_barrier field 2186 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2187 *cap_barrier = barrier; 2188 msg->front.iov_len += sizeof(*cap_barrier); 2189 2190 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2191 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2192 ceph_con_send(&session->s_con, msg); 2193 } 2194 return; 2195 out_err: 2196 pr_err("send_cap_releases mds%d, failed to allocate message\n", 2197 session->s_mds); 2198 spin_lock(&session->s_cap_lock); 2199 list_splice(&tmp_list, &session->s_cap_releases); 2200 session->s_num_cap_releases += num_cap_releases; 2201 spin_unlock(&session->s_cap_lock); 2202 } 2203 2204 static void ceph_cap_release_work(struct work_struct *work) 2205 { 2206 struct ceph_mds_session *session = 2207 container_of(work, struct ceph_mds_session, s_cap_release_work); 2208 2209 mutex_lock(&session->s_mutex); 2210 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2211 session->s_state == CEPH_MDS_SESSION_HUNG) 2212 ceph_send_cap_releases(session->s_mdsc, session); 2213 mutex_unlock(&session->s_mutex); 2214 ceph_put_mds_session(session); 2215 } 2216 2217 void ceph_flush_cap_releases(struct 
ceph_mds_client *mdsc, 2218 struct ceph_mds_session *session) 2219 { 2220 if (mdsc->stopping) 2221 return; 2222 2223 ceph_get_mds_session(session); 2224 if (queue_work(mdsc->fsc->cap_wq, 2225 &session->s_cap_release_work)) { 2226 dout("cap release work queued\n"); 2227 } else { 2228 ceph_put_mds_session(session); 2229 dout("failed to queue cap release work\n"); 2230 } 2231 } 2232 2233 /* 2234 * caller holds session->s_cap_lock 2235 */ 2236 void __ceph_queue_cap_release(struct ceph_mds_session *session, 2237 struct ceph_cap *cap) 2238 { 2239 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2240 session->s_num_cap_releases++; 2241 2242 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2243 ceph_flush_cap_releases(session->s_mdsc, session); 2244 } 2245 2246 static void ceph_cap_reclaim_work(struct work_struct *work) 2247 { 2248 struct ceph_mds_client *mdsc = 2249 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2250 int ret = ceph_trim_dentries(mdsc); 2251 if (ret == -EAGAIN) 2252 ceph_queue_cap_reclaim_work(mdsc); 2253 } 2254 2255 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2256 { 2257 if (mdsc->stopping) 2258 return; 2259 2260 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2261 dout("caps reclaim work queued\n"); 2262 } else { 2263 dout("failed to queue caps release work\n"); 2264 } 2265 } 2266 2267 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) 2268 { 2269 int val; 2270 if (!nr) 2271 return; 2272 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2273 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2274 atomic_set(&mdsc->cap_reclaim_pending, 0); 2275 ceph_queue_cap_reclaim_work(mdsc); 2276 } 2277 } 2278 2279 /* 2280 * requests 2281 */ 2282 2283 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2284 struct inode *dir) 2285 { 2286 struct ceph_inode_info *ci = ceph_inode(dir); 2287 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2288 struct 
ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2289 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2290 unsigned int num_entries; 2291 int order; 2292 2293 spin_lock(&ci->i_ceph_lock); 2294 num_entries = ci->i_files + ci->i_subdirs; 2295 spin_unlock(&ci->i_ceph_lock); 2296 num_entries = max(num_entries, 1U); 2297 num_entries = min(num_entries, opt->max_readdir); 2298 2299 order = get_order(size * num_entries); 2300 while (order >= 0) { 2301 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2302 __GFP_NOWARN | 2303 __GFP_ZERO, 2304 order); 2305 if (rinfo->dir_entries) 2306 break; 2307 order--; 2308 } 2309 if (!rinfo->dir_entries) 2310 return -ENOMEM; 2311 2312 num_entries = (PAGE_SIZE << order) / size; 2313 num_entries = min(num_entries, opt->max_readdir); 2314 2315 rinfo->dir_buf_size = PAGE_SIZE << order; 2316 req->r_num_caps = num_entries + 1; 2317 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2318 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2319 return 0; 2320 } 2321 2322 /* 2323 * Create an mds request. 
2324 */ 2325 struct ceph_mds_request * 2326 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2327 { 2328 struct ceph_mds_request *req; 2329 2330 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2331 if (!req) 2332 return ERR_PTR(-ENOMEM); 2333 2334 mutex_init(&req->r_fill_mutex); 2335 req->r_mdsc = mdsc; 2336 req->r_started = jiffies; 2337 req->r_start_latency = ktime_get(); 2338 req->r_resend_mds = -1; 2339 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2340 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2341 req->r_fmode = -1; 2342 req->r_feature_needed = -1; 2343 kref_init(&req->r_kref); 2344 RB_CLEAR_NODE(&req->r_node); 2345 INIT_LIST_HEAD(&req->r_wait); 2346 init_completion(&req->r_completion); 2347 init_completion(&req->r_safe_completion); 2348 INIT_LIST_HEAD(&req->r_unsafe_item); 2349 2350 ktime_get_coarse_real_ts64(&req->r_stamp); 2351 2352 req->r_op = op; 2353 req->r_direct_mode = mode; 2354 return req; 2355 } 2356 2357 /* 2358 * return oldest (lowest) request, tid in request tree, 0 if none. 2359 * 2360 * called under mdsc->mutex. 2361 */ 2362 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2363 { 2364 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2365 return NULL; 2366 return rb_entry(rb_first(&mdsc->request_tree), 2367 struct ceph_mds_request, r_node); 2368 } 2369 2370 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2371 { 2372 return mdsc->oldest_tid; 2373 } 2374 2375 /* 2376 * Build a dentry's path. Allocate on heap; caller must kfree. Based 2377 * on build_path_from_dentry in fs/cifs/dir.c. 2378 * 2379 * If @stop_on_nosnap, generate path relative to the first non-snapped 2380 * inode. 2381 * 2382 * Encode hidden .snap dirs as a double /, i.e. 
2383 * foo/.snap/bar -> foo//bar 2384 */ 2385 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase, 2386 int stop_on_nosnap) 2387 { 2388 struct dentry *temp; 2389 char *path; 2390 int pos; 2391 unsigned seq; 2392 u64 base; 2393 2394 if (!dentry) 2395 return ERR_PTR(-EINVAL); 2396 2397 path = __getname(); 2398 if (!path) 2399 return ERR_PTR(-ENOMEM); 2400 retry: 2401 pos = PATH_MAX - 1; 2402 path[pos] = '\0'; 2403 2404 seq = read_seqbegin(&rename_lock); 2405 rcu_read_lock(); 2406 temp = dentry; 2407 for (;;) { 2408 struct inode *inode; 2409 2410 spin_lock(&temp->d_lock); 2411 inode = d_inode(temp); 2412 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2413 dout("build_path path+%d: %p SNAPDIR\n", 2414 pos, temp); 2415 } else if (stop_on_nosnap && inode && dentry != temp && 2416 ceph_snap(inode) == CEPH_NOSNAP) { 2417 spin_unlock(&temp->d_lock); 2418 pos++; /* get rid of any prepended '/' */ 2419 break; 2420 } else { 2421 pos -= temp->d_name.len; 2422 if (pos < 0) { 2423 spin_unlock(&temp->d_lock); 2424 break; 2425 } 2426 memcpy(path + pos, temp->d_name.name, temp->d_name.len); 2427 } 2428 spin_unlock(&temp->d_lock); 2429 temp = READ_ONCE(temp->d_parent); 2430 2431 /* Are we at the root? */ 2432 if (IS_ROOT(temp)) 2433 break; 2434 2435 /* Are we out of buffer? */ 2436 if (--pos < 0) 2437 break; 2438 2439 path[pos] = '/'; 2440 } 2441 base = ceph_ino(d_inode(temp)); 2442 rcu_read_unlock(); 2443 2444 if (read_seqretry(&rename_lock, seq)) 2445 goto retry; 2446 2447 if (pos < 0) { 2448 /* 2449 * A rename didn't occur, but somehow we didn't end up where 2450 * we thought we would. Throw a warning and try again. 
2451 */ 2452 pr_warn("build_path did not end path lookup where " 2453 "expected, pos is %d\n", pos); 2454 goto retry; 2455 } 2456 2457 *pbase = base; 2458 *plen = PATH_MAX - 1 - pos; 2459 dout("build_path on %p %d built %llx '%.*s'\n", 2460 dentry, d_count(dentry), base, *plen, path + pos); 2461 return path + pos; 2462 } 2463 2464 static int build_dentry_path(struct dentry *dentry, struct inode *dir, 2465 const char **ppath, int *ppathlen, u64 *pino, 2466 bool *pfreepath, bool parent_locked) 2467 { 2468 char *path; 2469 2470 rcu_read_lock(); 2471 if (!dir) 2472 dir = d_inode_rcu(dentry->d_parent); 2473 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) { 2474 *pino = ceph_ino(dir); 2475 rcu_read_unlock(); 2476 *ppath = dentry->d_name.name; 2477 *ppathlen = dentry->d_name.len; 2478 return 0; 2479 } 2480 rcu_read_unlock(); 2481 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2482 if (IS_ERR(path)) 2483 return PTR_ERR(path); 2484 *ppath = path; 2485 *pfreepath = true; 2486 return 0; 2487 } 2488 2489 static int build_inode_path(struct inode *inode, 2490 const char **ppath, int *ppathlen, u64 *pino, 2491 bool *pfreepath) 2492 { 2493 struct dentry *dentry; 2494 char *path; 2495 2496 if (ceph_snap(inode) == CEPH_NOSNAP) { 2497 *pino = ceph_ino(inode); 2498 *ppathlen = 0; 2499 return 0; 2500 } 2501 dentry = d_find_alias(inode); 2502 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2503 dput(dentry); 2504 if (IS_ERR(path)) 2505 return PTR_ERR(path); 2506 *ppath = path; 2507 *pfreepath = true; 2508 return 0; 2509 } 2510 2511 /* 2512 * request arguments may be specified via an inode *, a dentry *, or 2513 * an explicit ino+path. 
2514 */ 2515 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, 2516 struct inode *rdiri, const char *rpath, 2517 u64 rino, const char **ppath, int *pathlen, 2518 u64 *ino, bool *freepath, bool parent_locked) 2519 { 2520 int r = 0; 2521 2522 if (rinode) { 2523 r = build_inode_path(rinode, ppath, pathlen, ino, freepath); 2524 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), 2525 ceph_snap(rinode)); 2526 } else if (rdentry) { 2527 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino, 2528 freepath, parent_locked); 2529 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, 2530 *ppath); 2531 } else if (rpath || rino) { 2532 *ino = rino; 2533 *ppath = rpath; 2534 *pathlen = rpath ? strlen(rpath) : 0; 2535 dout(" path %.*s\n", *pathlen, rpath); 2536 } 2537 2538 return r; 2539 } 2540 2541 static void encode_timestamp_and_gids(void **p, 2542 const struct ceph_mds_request *req) 2543 { 2544 struct ceph_timespec ts; 2545 int i; 2546 2547 ceph_encode_timespec64(&ts, &req->r_stamp); 2548 ceph_encode_copy(p, &ts, sizeof(ts)); 2549 2550 /* gid_list */ 2551 ceph_encode_32(p, req->r_cred->group_info->ngroups); 2552 for (i = 0; i < req->r_cred->group_info->ngroups; i++) 2553 ceph_encode_64(p, from_kgid(&init_user_ns, 2554 req->r_cred->group_info->gid[i])); 2555 } 2556 2557 /* 2558 * called under mdsc->mutex 2559 */ 2560 static struct ceph_msg *create_request_message(struct ceph_mds_session *session, 2561 struct ceph_mds_request *req, 2562 bool drop_cap_releases) 2563 { 2564 int mds = session->s_mds; 2565 struct ceph_mds_client *mdsc = session->s_mdsc; 2566 struct ceph_msg *msg; 2567 struct ceph_mds_request_head_old *head; 2568 const char *path1 = NULL; 2569 const char *path2 = NULL; 2570 u64 ino1 = 0, ino2 = 0; 2571 int pathlen1 = 0, pathlen2 = 0; 2572 bool freepath1 = false, freepath2 = false; 2573 struct dentry *old_dentry = NULL; 2574 int len; 2575 u16 releases; 2576 void *p, *end; 2577 int ret; 2578 bool legacy = 
!(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME); 2579 2580 ret = set_request_path_attr(req->r_inode, req->r_dentry, 2581 req->r_parent, req->r_path1, req->r_ino1.ino, 2582 &path1, &pathlen1, &ino1, &freepath1, 2583 test_bit(CEPH_MDS_R_PARENT_LOCKED, 2584 &req->r_req_flags)); 2585 if (ret < 0) { 2586 msg = ERR_PTR(ret); 2587 goto out; 2588 } 2589 2590 /* If r_old_dentry is set, then assume that its parent is locked */ 2591 if (req->r_old_dentry && 2592 !(req->r_old_dentry->d_flags & DCACHE_DISCONNECTED)) 2593 old_dentry = req->r_old_dentry; 2594 ret = set_request_path_attr(NULL, old_dentry, 2595 req->r_old_dentry_dir, 2596 req->r_path2, req->r_ino2.ino, 2597 &path2, &pathlen2, &ino2, &freepath2, true); 2598 if (ret < 0) { 2599 msg = ERR_PTR(ret); 2600 goto out_free1; 2601 } 2602 2603 len = legacy ? sizeof(*head) : sizeof(struct ceph_mds_request_head); 2604 len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) + 2605 sizeof(struct ceph_timespec); 2606 len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups); 2607 2608 /* calculate (max) length for cap releases */ 2609 len += sizeof(struct ceph_mds_request_release) * 2610 (!!req->r_inode_drop + !!req->r_dentry_drop + 2611 !!req->r_old_inode_drop + !!req->r_old_dentry_drop); 2612 2613 if (req->r_dentry_drop) 2614 len += pathlen1; 2615 if (req->r_old_dentry_drop) 2616 len += pathlen2; 2617 2618 msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false); 2619 if (!msg) { 2620 msg = ERR_PTR(-ENOMEM); 2621 goto out_free2; 2622 } 2623 2624 msg->hdr.tid = cpu_to_le64(req->r_tid); 2625 2626 /* 2627 * The old ceph_mds_request_head didn't contain a version field, and 2628 * one was added when we moved the message version from 3->4. 
2629 */ 2630 if (legacy) { 2631 msg->hdr.version = cpu_to_le16(3); 2632 head = msg->front.iov_base; 2633 p = msg->front.iov_base + sizeof(*head); 2634 } else { 2635 struct ceph_mds_request_head *new_head = msg->front.iov_base; 2636 2637 msg->hdr.version = cpu_to_le16(4); 2638 new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION); 2639 head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; 2640 p = msg->front.iov_base + sizeof(*new_head); 2641 } 2642 2643 end = msg->front.iov_base + msg->front.iov_len; 2644 2645 head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch); 2646 head->op = cpu_to_le32(req->r_op); 2647 head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, 2648 req->r_cred->fsuid)); 2649 head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, 2650 req->r_cred->fsgid)); 2651 head->ino = cpu_to_le64(req->r_deleg_ino); 2652 head->args = req->r_args; 2653 2654 ceph_encode_filepath(&p, end, ino1, path1); 2655 ceph_encode_filepath(&p, end, ino2, path2); 2656 2657 /* make note of release offset, in case we need to replay */ 2658 req->r_request_release_offset = p - msg->front.iov_base; 2659 2660 /* cap releases */ 2661 releases = 0; 2662 if (req->r_inode_drop) 2663 releases += ceph_encode_inode_release(&p, 2664 req->r_inode ? 
req->r_inode : d_inode(req->r_dentry), 2665 mds, req->r_inode_drop, req->r_inode_unless, 2666 req->r_op == CEPH_MDS_OP_READDIR); 2667 if (req->r_dentry_drop) 2668 releases += ceph_encode_dentry_release(&p, req->r_dentry, 2669 req->r_parent, mds, req->r_dentry_drop, 2670 req->r_dentry_unless); 2671 if (req->r_old_dentry_drop) 2672 releases += ceph_encode_dentry_release(&p, req->r_old_dentry, 2673 req->r_old_dentry_dir, mds, 2674 req->r_old_dentry_drop, 2675 req->r_old_dentry_unless); 2676 if (req->r_old_inode_drop) 2677 releases += ceph_encode_inode_release(&p, 2678 d_inode(req->r_old_dentry), 2679 mds, req->r_old_inode_drop, req->r_old_inode_unless, 0); 2680 2681 if (drop_cap_releases) { 2682 releases = 0; 2683 p = msg->front.iov_base + req->r_request_release_offset; 2684 } 2685 2686 head->num_releases = cpu_to_le16(releases); 2687 2688 encode_timestamp_and_gids(&p, req); 2689 2690 if (WARN_ON_ONCE(p > end)) { 2691 ceph_msg_put(msg); 2692 msg = ERR_PTR(-ERANGE); 2693 goto out_free2; 2694 } 2695 2696 msg->front.iov_len = p - msg->front.iov_base; 2697 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2698 2699 if (req->r_pagelist) { 2700 struct ceph_pagelist *pagelist = req->r_pagelist; 2701 ceph_msg_data_add_pagelist(msg, pagelist); 2702 msg->hdr.data_len = cpu_to_le32(pagelist->length); 2703 } else { 2704 msg->hdr.data_len = 0; 2705 } 2706 2707 msg->hdr.data_off = cpu_to_le16(0); 2708 2709 out_free2: 2710 if (freepath2) 2711 ceph_mdsc_free_path((char *)path2, pathlen2); 2712 out_free1: 2713 if (freepath1) 2714 ceph_mdsc_free_path((char *)path1, pathlen1); 2715 out: 2716 return msg; 2717 } 2718 2719 /* 2720 * called under mdsc->mutex if error, under no mutex if 2721 * success. 
2722 */ 2723 static void complete_request(struct ceph_mds_client *mdsc, 2724 struct ceph_mds_request *req) 2725 { 2726 req->r_end_latency = ktime_get(); 2727 2728 if (req->r_callback) 2729 req->r_callback(mdsc, req); 2730 complete_all(&req->r_completion); 2731 } 2732 2733 static struct ceph_mds_request_head_old * 2734 find_old_request_head(void *p, u64 features) 2735 { 2736 bool legacy = !(features & CEPH_FEATURE_FS_BTIME); 2737 struct ceph_mds_request_head *new_head; 2738 2739 if (legacy) 2740 return (struct ceph_mds_request_head_old *)p; 2741 new_head = (struct ceph_mds_request_head *)p; 2742 return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; 2743 } 2744 2745 /* 2746 * called under mdsc->mutex 2747 */ 2748 static int __prepare_send_request(struct ceph_mds_session *session, 2749 struct ceph_mds_request *req, 2750 bool drop_cap_releases) 2751 { 2752 int mds = session->s_mds; 2753 struct ceph_mds_client *mdsc = session->s_mdsc; 2754 struct ceph_mds_request_head_old *rhead; 2755 struct ceph_msg *msg; 2756 int flags = 0, max_retry; 2757 2758 /* 2759 * The type of 'r_attempts' in kernel 'ceph_mds_request' 2760 * is 'int', while in 'ceph_mds_request_head' the type of 2761 * 'num_retry' is '__u8'. So in case the request retries 2762 * exceeding 256 times, the MDS will receive a incorrect 2763 * retry seq. 2764 * 2765 * In this case it's ususally a bug in MDS and continue 2766 * retrying the request makes no sense. 2767 * 2768 * In future this could be fixed in ceph code, so avoid 2769 * using the hardcode here. 
2770 */ 2771 max_retry = sizeof_field(struct ceph_mds_request_head, num_retry); 2772 max_retry = 1 << (max_retry * BITS_PER_BYTE); 2773 if (req->r_attempts >= max_retry) { 2774 pr_warn_ratelimited("%s request tid %llu seq overflow\n", 2775 __func__, req->r_tid); 2776 return -EMULTIHOP; 2777 } 2778 2779 req->r_attempts++; 2780 if (req->r_inode) { 2781 struct ceph_cap *cap = 2782 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 2783 2784 if (cap) 2785 req->r_sent_on_mseq = cap->mseq; 2786 else 2787 req->r_sent_on_mseq = -1; 2788 } 2789 dout("%s %p tid %lld %s (attempt %d)\n", __func__, req, 2790 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 2791 2792 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 2793 void *p; 2794 2795 /* 2796 * Replay. Do not regenerate message (and rebuild 2797 * paths, etc.); just use the original message. 2798 * Rebuilding paths will break for renames because 2799 * d_move mangles the src name. 2800 */ 2801 msg = req->r_request; 2802 rhead = find_old_request_head(msg->front.iov_base, 2803 session->s_con.peer_features); 2804 2805 flags = le32_to_cpu(rhead->flags); 2806 flags |= CEPH_MDS_FLAG_REPLAY; 2807 rhead->flags = cpu_to_le32(flags); 2808 2809 if (req->r_target_inode) 2810 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 2811 2812 rhead->num_retry = req->r_attempts - 1; 2813 2814 /* remove cap/dentry releases from message */ 2815 rhead->num_releases = 0; 2816 2817 p = msg->front.iov_base + req->r_request_release_offset; 2818 encode_timestamp_and_gids(&p, req); 2819 2820 msg->front.iov_len = p - msg->front.iov_base; 2821 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2822 return 0; 2823 } 2824 2825 if (req->r_request) { 2826 ceph_msg_put(req->r_request); 2827 req->r_request = NULL; 2828 } 2829 msg = create_request_message(session, req, drop_cap_releases); 2830 if (IS_ERR(msg)) { 2831 req->r_err = PTR_ERR(msg); 2832 return PTR_ERR(msg); 2833 } 2834 req->r_request = msg; 2835 2836 rhead = 
find_old_request_head(msg->front.iov_base, 2837 session->s_con.peer_features); 2838 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 2839 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2840 flags |= CEPH_MDS_FLAG_REPLAY; 2841 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) 2842 flags |= CEPH_MDS_FLAG_ASYNC; 2843 if (req->r_parent) 2844 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 2845 rhead->flags = cpu_to_le32(flags); 2846 rhead->num_fwd = req->r_num_fwd; 2847 rhead->num_retry = req->r_attempts - 1; 2848 2849 dout(" r_parent = %p\n", req->r_parent); 2850 return 0; 2851 } 2852 2853 /* 2854 * called under mdsc->mutex 2855 */ 2856 static int __send_request(struct ceph_mds_session *session, 2857 struct ceph_mds_request *req, 2858 bool drop_cap_releases) 2859 { 2860 int err; 2861 2862 err = __prepare_send_request(session, req, drop_cap_releases); 2863 if (!err) { 2864 ceph_msg_get(req->r_request); 2865 ceph_con_send(&session->s_con, req->r_request); 2866 } 2867 2868 return err; 2869 } 2870 2871 /* 2872 * send request, or put it on the appropriate wait list. 
2873 */ 2874 static void __do_request(struct ceph_mds_client *mdsc, 2875 struct ceph_mds_request *req) 2876 { 2877 struct ceph_mds_session *session = NULL; 2878 int mds = -1; 2879 int err = 0; 2880 bool random; 2881 2882 if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 2883 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) 2884 __unregister_request(mdsc, req); 2885 return; 2886 } 2887 2888 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) { 2889 dout("do_request metadata corrupted\n"); 2890 err = -EIO; 2891 goto finish; 2892 } 2893 if (req->r_timeout && 2894 time_after_eq(jiffies, req->r_started + req->r_timeout)) { 2895 dout("do_request timed out\n"); 2896 err = -ETIMEDOUT; 2897 goto finish; 2898 } 2899 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 2900 dout("do_request forced umount\n"); 2901 err = -EIO; 2902 goto finish; 2903 } 2904 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { 2905 if (mdsc->mdsmap_err) { 2906 err = mdsc->mdsmap_err; 2907 dout("do_request mdsmap err %d\n", err); 2908 goto finish; 2909 } 2910 if (mdsc->mdsmap->m_epoch == 0) { 2911 dout("do_request no mdsmap, waiting for map\n"); 2912 list_add(&req->r_wait, &mdsc->waiting_for_map); 2913 return; 2914 } 2915 if (!(mdsc->fsc->mount_options->flags & 2916 CEPH_MOUNT_OPT_MOUNTWAIT) && 2917 !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) { 2918 err = -EHOSTUNREACH; 2919 goto finish; 2920 } 2921 } 2922 2923 put_request_session(req); 2924 2925 mds = __choose_mds(mdsc, req, &random); 2926 if (mds < 0 || 2927 ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) { 2928 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2929 err = -EJUKEBOX; 2930 goto finish; 2931 } 2932 dout("do_request no mds or not active, waiting for map\n"); 2933 list_add(&req->r_wait, &mdsc->waiting_for_map); 2934 return; 2935 } 2936 2937 /* get, open session */ 2938 session = __ceph_lookup_mds_session(mdsc, mds); 2939 if (!session) { 2940 session 
= register_session(mdsc, mds); 2941 if (IS_ERR(session)) { 2942 err = PTR_ERR(session); 2943 goto finish; 2944 } 2945 } 2946 req->r_session = ceph_get_mds_session(session); 2947 2948 dout("do_request mds%d session %p state %s\n", mds, session, 2949 ceph_session_state_name(session->s_state)); 2950 2951 /* 2952 * The old ceph will crash the MDSs when see unknown OPs 2953 */ 2954 if (req->r_feature_needed > 0 && 2955 !test_bit(req->r_feature_needed, &session->s_features)) { 2956 err = -EOPNOTSUPP; 2957 goto out_session; 2958 } 2959 2960 if (session->s_state != CEPH_MDS_SESSION_OPEN && 2961 session->s_state != CEPH_MDS_SESSION_HUNG) { 2962 /* 2963 * We cannot queue async requests since the caps and delegated 2964 * inodes are bound to the session. Just return -EJUKEBOX and 2965 * let the caller retry a sync request in that case. 2966 */ 2967 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) { 2968 err = -EJUKEBOX; 2969 goto out_session; 2970 } 2971 2972 /* 2973 * If the session has been REJECTED, then return a hard error, 2974 * unless it's a CLEANRECOVER mount, in which case we'll queue 2975 * it to the mdsc queue. 
2976 */ 2977 if (session->s_state == CEPH_MDS_SESSION_REJECTED) { 2978 if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER)) 2979 list_add(&req->r_wait, &mdsc->waiting_for_map); 2980 else 2981 err = -EACCES; 2982 goto out_session; 2983 } 2984 2985 if (session->s_state == CEPH_MDS_SESSION_NEW || 2986 session->s_state == CEPH_MDS_SESSION_CLOSING) { 2987 err = __open_session(mdsc, session); 2988 if (err) 2989 goto out_session; 2990 /* retry the same mds later */ 2991 if (random) 2992 req->r_resend_mds = mds; 2993 } 2994 list_add(&req->r_wait, &session->s_waiting); 2995 goto out_session; 2996 } 2997 2998 /* send request */ 2999 req->r_resend_mds = -1; /* forget any previous mds hint */ 3000 3001 if (req->r_request_started == 0) /* note request start time */ 3002 req->r_request_started = jiffies; 3003 3004 /* 3005 * For async create we will choose the auth MDS of frag in parent 3006 * directory to send the request and ususally this works fine, but 3007 * if the migrated the dirtory to another MDS before it could handle 3008 * it the request will be forwarded. 3009 * 3010 * And then the auth cap will be changed. 3011 */ 3012 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) && req->r_num_fwd) { 3013 struct ceph_dentry_info *di = ceph_dentry(req->r_dentry); 3014 struct ceph_inode_info *ci; 3015 struct ceph_cap *cap; 3016 3017 /* 3018 * The request maybe handled very fast and the new inode 3019 * hasn't been linked to the dentry yet. We need to wait 3020 * for the ceph_finish_async_create(), which shouldn't be 3021 * stuck too long or fail in thoery, to finish when forwarding 3022 * the request. 
3023 */ 3024 if (!d_inode(req->r_dentry)) { 3025 err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT, 3026 TASK_KILLABLE); 3027 if (err) { 3028 mutex_lock(&req->r_fill_mutex); 3029 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3030 mutex_unlock(&req->r_fill_mutex); 3031 goto out_session; 3032 } 3033 } 3034 3035 ci = ceph_inode(d_inode(req->r_dentry)); 3036 3037 spin_lock(&ci->i_ceph_lock); 3038 cap = ci->i_auth_cap; 3039 if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE && mds != cap->mds) { 3040 dout("do_request session changed for auth cap %d -> %d\n", 3041 cap->session->s_mds, session->s_mds); 3042 3043 /* Remove the auth cap from old session */ 3044 spin_lock(&cap->session->s_cap_lock); 3045 cap->session->s_nr_caps--; 3046 list_del_init(&cap->session_caps); 3047 spin_unlock(&cap->session->s_cap_lock); 3048 3049 /* Add the auth cap to the new session */ 3050 cap->mds = mds; 3051 cap->session = session; 3052 spin_lock(&session->s_cap_lock); 3053 session->s_nr_caps++; 3054 list_add_tail(&cap->session_caps, &session->s_caps); 3055 spin_unlock(&session->s_cap_lock); 3056 3057 change_auth_cap_ses(ci, session); 3058 } 3059 spin_unlock(&ci->i_ceph_lock); 3060 } 3061 3062 err = __send_request(session, req, false); 3063 3064 out_session: 3065 ceph_put_mds_session(session); 3066 finish: 3067 if (err) { 3068 dout("__do_request early error %d\n", err); 3069 req->r_err = err; 3070 complete_request(mdsc, req); 3071 __unregister_request(mdsc, req); 3072 } 3073 return; 3074 } 3075 3076 /* 3077 * called under mdsc->mutex 3078 */ 3079 static void __wake_requests(struct ceph_mds_client *mdsc, 3080 struct list_head *head) 3081 { 3082 struct ceph_mds_request *req; 3083 LIST_HEAD(tmp_list); 3084 3085 list_splice_init(head, &tmp_list); 3086 3087 while (!list_empty(&tmp_list)) { 3088 req = list_entry(tmp_list.next, 3089 struct ceph_mds_request, r_wait); 3090 list_del_init(&req->r_wait); 3091 dout(" wake request %p tid %llu\n", req, req->r_tid); 3092 __do_request(mdsc, req); 3093 } 
3094 } 3095 3096 /* 3097 * Wake up threads with requests pending for @mds, so that they can 3098 * resubmit their requests to a possibly different mds. 3099 */ 3100 static void kick_requests(struct ceph_mds_client *mdsc, int mds) 3101 { 3102 struct ceph_mds_request *req; 3103 struct rb_node *p = rb_first(&mdsc->request_tree); 3104 3105 dout("kick_requests mds%d\n", mds); 3106 while (p) { 3107 req = rb_entry(p, struct ceph_mds_request, r_node); 3108 p = rb_next(p); 3109 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3110 continue; 3111 if (req->r_attempts > 0) 3112 continue; /* only new requests */ 3113 if (req->r_session && 3114 req->r_session->s_mds == mds) { 3115 dout(" kicking tid %llu\n", req->r_tid); 3116 list_del_init(&req->r_wait); 3117 __do_request(mdsc, req); 3118 } 3119 } 3120 } 3121 3122 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir, 3123 struct ceph_mds_request *req) 3124 { 3125 int err = 0; 3126 3127 /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ 3128 if (req->r_inode) 3129 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 3130 if (req->r_parent) { 3131 struct ceph_inode_info *ci = ceph_inode(req->r_parent); 3132 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ? 
3133 CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD; 3134 spin_lock(&ci->i_ceph_lock); 3135 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false); 3136 __ceph_touch_fmode(ci, mdsc, fmode); 3137 spin_unlock(&ci->i_ceph_lock); 3138 } 3139 if (req->r_old_dentry_dir) 3140 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), 3141 CEPH_CAP_PIN); 3142 3143 if (req->r_inode) { 3144 err = ceph_wait_on_async_create(req->r_inode); 3145 if (err) { 3146 dout("%s: wait for async create returned: %d\n", 3147 __func__, err); 3148 return err; 3149 } 3150 } 3151 3152 if (!err && req->r_old_inode) { 3153 err = ceph_wait_on_async_create(req->r_old_inode); 3154 if (err) { 3155 dout("%s: wait for async create returned: %d\n", 3156 __func__, err); 3157 return err; 3158 } 3159 } 3160 3161 dout("submit_request on %p for inode %p\n", req, dir); 3162 mutex_lock(&mdsc->mutex); 3163 __register_request(mdsc, req, dir); 3164 __do_request(mdsc, req); 3165 err = req->r_err; 3166 mutex_unlock(&mdsc->mutex); 3167 return err; 3168 } 3169 3170 int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc, 3171 struct ceph_mds_request *req, 3172 ceph_mds_request_wait_callback_t wait_func) 3173 { 3174 int err; 3175 3176 /* wait */ 3177 dout("do_request waiting\n"); 3178 if (wait_func) { 3179 err = wait_func(mdsc, req); 3180 } else { 3181 long timeleft = wait_for_completion_killable_timeout( 3182 &req->r_completion, 3183 ceph_timeout_jiffies(req->r_timeout)); 3184 if (timeleft > 0) 3185 err = 0; 3186 else if (!timeleft) 3187 err = -ETIMEDOUT; /* timed out */ 3188 else 3189 err = timeleft; /* killed */ 3190 } 3191 dout("do_request waited, got %d\n", err); 3192 mutex_lock(&mdsc->mutex); 3193 3194 /* only abort if we didn't race with a real reply */ 3195 if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { 3196 err = le32_to_cpu(req->r_reply_info.head->result); 3197 } else if (err < 0) { 3198 dout("aborted request %lld with %d\n", req->r_tid, err); 3199 3200 /* 3201 * ensure we aren't running concurrently with 3202 * 
ceph_fill_trace or ceph_readdir_prepopulate, which 3203 * rely on locks (dir mutex) held by our caller. 3204 */ 3205 mutex_lock(&req->r_fill_mutex); 3206 req->r_err = err; 3207 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3208 mutex_unlock(&req->r_fill_mutex); 3209 3210 if (req->r_parent && 3211 (req->r_op & CEPH_MDS_OP_WRITE)) 3212 ceph_invalidate_dir_request(req); 3213 } else { 3214 err = req->r_err; 3215 } 3216 3217 mutex_unlock(&mdsc->mutex); 3218 return err; 3219 } 3220 3221 /* 3222 * Synchrously perform an mds request. Take care of all of the 3223 * session setup, forwarding, retry details. 3224 */ 3225 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, 3226 struct inode *dir, 3227 struct ceph_mds_request *req) 3228 { 3229 int err; 3230 3231 dout("do_request on %p\n", req); 3232 3233 /* issue */ 3234 err = ceph_mdsc_submit_request(mdsc, dir, req); 3235 if (!err) 3236 err = ceph_mdsc_wait_request(mdsc, req, NULL); 3237 dout("do_request %p done, result %d\n", req, err); 3238 return err; 3239 } 3240 3241 /* 3242 * Invalidate dir's completeness, dentry lease state on an aborted MDS 3243 * namespace request. 3244 */ 3245 void ceph_invalidate_dir_request(struct ceph_mds_request *req) 3246 { 3247 struct inode *dir = req->r_parent; 3248 struct inode *old_dir = req->r_old_dentry_dir; 3249 3250 dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir); 3251 3252 ceph_dir_clear_complete(dir); 3253 if (old_dir) 3254 ceph_dir_clear_complete(old_dir); 3255 if (req->r_dentry) 3256 ceph_invalidate_dentry_lease(req->r_dentry); 3257 if (req->r_old_dentry) 3258 ceph_invalidate_dentry_lease(req->r_old_dentry); 3259 } 3260 3261 /* 3262 * Handle mds reply. 3263 * 3264 * We take the session mutex and parse and process the reply immediately. 3265 * This preserves the logical ordering of replies, capabilities, etc., sent 3266 * by the MDS as they are applied to our local cache. 
3267 */ 3268 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 3269 { 3270 struct ceph_mds_client *mdsc = session->s_mdsc; 3271 struct ceph_mds_request *req; 3272 struct ceph_mds_reply_head *head = msg->front.iov_base; 3273 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 3274 struct ceph_snap_realm *realm; 3275 u64 tid; 3276 int err, result; 3277 int mds = session->s_mds; 3278 bool close_sessions = false; 3279 3280 if (msg->front.iov_len < sizeof(*head)) { 3281 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 3282 ceph_msg_dump(msg); 3283 return; 3284 } 3285 3286 /* get request, session */ 3287 tid = le64_to_cpu(msg->hdr.tid); 3288 mutex_lock(&mdsc->mutex); 3289 req = lookup_get_request(mdsc, tid); 3290 if (!req) { 3291 dout("handle_reply on unknown tid %llu\n", tid); 3292 mutex_unlock(&mdsc->mutex); 3293 return; 3294 } 3295 dout("handle_reply %p\n", req); 3296 3297 /* correct session? */ 3298 if (req->r_session != session) { 3299 pr_err("mdsc_handle_reply got %llu on session mds%d" 3300 " not mds%d\n", tid, session->s_mds, 3301 req->r_session ? req->r_session->s_mds : -1); 3302 mutex_unlock(&mdsc->mutex); 3303 goto out; 3304 } 3305 3306 /* dup? */ 3307 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 3308 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 3309 pr_warn("got a dup %s reply on %llu from mds%d\n", 3310 head->safe ? "safe" : "unsafe", tid, mds); 3311 mutex_unlock(&mdsc->mutex); 3312 goto out; 3313 } 3314 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 3315 pr_warn("got unsafe after safe on %llu from mds%d\n", 3316 tid, mds); 3317 mutex_unlock(&mdsc->mutex); 3318 goto out; 3319 } 3320 3321 result = le32_to_cpu(head->result); 3322 3323 if (head->safe) { 3324 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 3325 __unregister_request(mdsc, req); 3326 3327 /* last request during umount? 
*/ 3328 if (mdsc->stopping && !__get_oldest_req(mdsc)) 3329 complete_all(&mdsc->safe_umount_waiters); 3330 3331 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3332 /* 3333 * We already handled the unsafe response, now do the 3334 * cleanup. No need to examine the response; the MDS 3335 * doesn't include any result info in the safe 3336 * response. And even if it did, there is nothing 3337 * useful we could do with a revised return value. 3338 */ 3339 dout("got safe reply %llu, mds%d\n", tid, mds); 3340 3341 mutex_unlock(&mdsc->mutex); 3342 goto out; 3343 } 3344 } else { 3345 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 3346 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 3347 } 3348 3349 dout("handle_reply tid %lld result %d\n", tid, result); 3350 rinfo = &req->r_reply_info; 3351 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 3352 err = parse_reply_info(session, msg, rinfo, (u64)-1); 3353 else 3354 err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features); 3355 mutex_unlock(&mdsc->mutex); 3356 3357 /* Must find target inode outside of mutexes to avoid deadlocks */ 3358 if ((err >= 0) && rinfo->head->is_target) { 3359 struct inode *in; 3360 struct ceph_vino tvino = { 3361 .ino = le64_to_cpu(rinfo->targeti.in->ino), 3362 .snap = le64_to_cpu(rinfo->targeti.in->snapid) 3363 }; 3364 3365 in = ceph_get_inode(mdsc->fsc->sb, tvino); 3366 if (IS_ERR(in)) { 3367 err = PTR_ERR(in); 3368 mutex_lock(&session->s_mutex); 3369 goto out_err; 3370 } 3371 req->r_target_inode = in; 3372 } 3373 3374 mutex_lock(&session->s_mutex); 3375 if (err < 0) { 3376 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 3377 ceph_msg_dump(msg); 3378 goto out_err; 3379 } 3380 3381 /* snap trace */ 3382 realm = NULL; 3383 if (rinfo->snapblob_len) { 3384 down_write(&mdsc->snap_rwsem); 3385 err = ceph_update_snap_trace(mdsc, rinfo->snapblob, 3386 rinfo->snapblob + rinfo->snapblob_len, 3387 le32_to_cpu(head->op) == 
CEPH_MDS_OP_RMSNAP, 3388 &realm); 3389 if (err) { 3390 up_write(&mdsc->snap_rwsem); 3391 close_sessions = true; 3392 if (err == -EIO) 3393 ceph_msg_dump(msg); 3394 goto out_err; 3395 } 3396 downgrade_write(&mdsc->snap_rwsem); 3397 } else { 3398 down_read(&mdsc->snap_rwsem); 3399 } 3400 3401 /* insert trace into our cache */ 3402 mutex_lock(&req->r_fill_mutex); 3403 current->journal_info = req; 3404 err = ceph_fill_trace(mdsc->fsc->sb, req); 3405 if (err == 0) { 3406 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 3407 req->r_op == CEPH_MDS_OP_LSSNAP)) 3408 ceph_readdir_prepopulate(req, req->r_session); 3409 } 3410 current->journal_info = NULL; 3411 mutex_unlock(&req->r_fill_mutex); 3412 3413 up_read(&mdsc->snap_rwsem); 3414 if (realm) 3415 ceph_put_snap_realm(mdsc, realm); 3416 3417 if (err == 0) { 3418 if (req->r_target_inode && 3419 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3420 struct ceph_inode_info *ci = 3421 ceph_inode(req->r_target_inode); 3422 spin_lock(&ci->i_unsafe_lock); 3423 list_add_tail(&req->r_unsafe_target_item, 3424 &ci->i_unsafe_iops); 3425 spin_unlock(&ci->i_unsafe_lock); 3426 } 3427 3428 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 3429 } 3430 out_err: 3431 mutex_lock(&mdsc->mutex); 3432 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3433 if (err) { 3434 req->r_err = err; 3435 } else { 3436 req->r_reply = ceph_msg_get(msg); 3437 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); 3438 } 3439 } else { 3440 dout("reply arrived after request %lld was aborted\n", tid); 3441 } 3442 mutex_unlock(&mdsc->mutex); 3443 3444 mutex_unlock(&session->s_mutex); 3445 3446 /* kick calling process */ 3447 complete_request(mdsc, req); 3448 3449 ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency, 3450 req->r_end_latency, err); 3451 out: 3452 ceph_mdsc_put_request(req); 3453 3454 /* Defer closing the sessions after s_mutex lock being released */ 3455 if (close_sessions) 3456 ceph_mdsc_close_sessions(mdsc); 3457 
return; 3458 } 3459 3460 3461 3462 /* 3463 * handle mds notification that our request has been forwarded. 3464 */ 3465 static void handle_forward(struct ceph_mds_client *mdsc, 3466 struct ceph_mds_session *session, 3467 struct ceph_msg *msg) 3468 { 3469 struct ceph_mds_request *req; 3470 u64 tid = le64_to_cpu(msg->hdr.tid); 3471 u32 next_mds; 3472 u32 fwd_seq; 3473 int err = -EINVAL; 3474 void *p = msg->front.iov_base; 3475 void *end = p + msg->front.iov_len; 3476 bool aborted = false; 3477 3478 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3479 next_mds = ceph_decode_32(&p); 3480 fwd_seq = ceph_decode_32(&p); 3481 3482 mutex_lock(&mdsc->mutex); 3483 req = lookup_get_request(mdsc, tid); 3484 if (!req) { 3485 mutex_unlock(&mdsc->mutex); 3486 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 3487 return; /* dup reply? */ 3488 } 3489 3490 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3491 dout("forward tid %llu aborted, unregistering\n", tid); 3492 __unregister_request(mdsc, req); 3493 } else if (fwd_seq <= req->r_num_fwd) { 3494 /* 3495 * The type of 'num_fwd' in ceph 'MClientRequestForward' 3496 * is 'int32_t', while in 'ceph_mds_request_head' the 3497 * type is '__u8'. So in case the request bounces between 3498 * MDSes exceeding 256 times, the client will get stuck. 3499 * 3500 * In this case it's ususally a bug in MDS and continue 3501 * bouncing the request makes no sense. 3502 * 3503 * In future this could be fixed in ceph code, so avoid 3504 * using the hardcode here. 
3505 */ 3506 int max = sizeof_field(struct ceph_mds_request_head, num_fwd); 3507 max = 1 << (max * BITS_PER_BYTE); 3508 if (req->r_num_fwd >= max) { 3509 mutex_lock(&req->r_fill_mutex); 3510 req->r_err = -EMULTIHOP; 3511 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3512 mutex_unlock(&req->r_fill_mutex); 3513 aborted = true; 3514 pr_warn_ratelimited("forward tid %llu seq overflow\n", 3515 tid); 3516 } else { 3517 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 3518 tid, next_mds, req->r_num_fwd, fwd_seq); 3519 } 3520 } else { 3521 /* resend. forward race not possible; mds would drop */ 3522 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 3523 BUG_ON(req->r_err); 3524 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 3525 req->r_attempts = 0; 3526 req->r_num_fwd = fwd_seq; 3527 req->r_resend_mds = next_mds; 3528 put_request_session(req); 3529 __do_request(mdsc, req); 3530 } 3531 mutex_unlock(&mdsc->mutex); 3532 3533 /* kick calling process */ 3534 if (aborted) 3535 complete_request(mdsc, req); 3536 ceph_mdsc_put_request(req); 3537 return; 3538 3539 bad: 3540 pr_err("mdsc_handle_forward decode error err=%d\n", err); 3541 } 3542 3543 static int __decode_session_metadata(void **p, void *end, 3544 bool *blocklisted) 3545 { 3546 /* map<string,string> */ 3547 u32 n; 3548 bool err_str; 3549 ceph_decode_32_safe(p, end, n, bad); 3550 while (n-- > 0) { 3551 u32 len; 3552 ceph_decode_32_safe(p, end, len, bad); 3553 ceph_decode_need(p, end, len, bad); 3554 err_str = !strncmp(*p, "error_string", len); 3555 *p += len; 3556 ceph_decode_32_safe(p, end, len, bad); 3557 ceph_decode_need(p, end, len, bad); 3558 /* 3559 * Match "blocklisted (blacklisted)" from newer MDSes, 3560 * or "blacklisted" from older MDSes. 
3561 */ 3562 if (err_str && strnstr(*p, "blacklisted", len)) 3563 *blocklisted = true; 3564 *p += len; 3565 } 3566 return 0; 3567 bad: 3568 return -1; 3569 } 3570 3571 /* 3572 * handle a mds session control message 3573 */ 3574 static void handle_session(struct ceph_mds_session *session, 3575 struct ceph_msg *msg) 3576 { 3577 struct ceph_mds_client *mdsc = session->s_mdsc; 3578 int mds = session->s_mds; 3579 int msg_version = le16_to_cpu(msg->hdr.version); 3580 void *p = msg->front.iov_base; 3581 void *end = p + msg->front.iov_len; 3582 struct ceph_mds_session_head *h; 3583 u32 op; 3584 u64 seq, features = 0; 3585 int wake = 0; 3586 bool blocklisted = false; 3587 3588 /* decode */ 3589 ceph_decode_need(&p, end, sizeof(*h), bad); 3590 h = p; 3591 p += sizeof(*h); 3592 3593 op = le32_to_cpu(h->op); 3594 seq = le64_to_cpu(h->seq); 3595 3596 if (msg_version >= 3) { 3597 u32 len; 3598 /* version >= 2 and < 5, decode metadata, skip otherwise 3599 * as it's handled via flags. 3600 */ 3601 if (msg_version >= 5) 3602 ceph_decode_skip_map(&p, end, string, string, bad); 3603 else if (__decode_session_metadata(&p, end, &blocklisted) < 0) 3604 goto bad; 3605 3606 /* version >= 3, feature bits */ 3607 ceph_decode_32_safe(&p, end, len, bad); 3608 if (len) { 3609 ceph_decode_64_safe(&p, end, features, bad); 3610 p += len - sizeof(features); 3611 } 3612 } 3613 3614 if (msg_version >= 5) { 3615 u32 flags, len; 3616 3617 /* version >= 4 */ 3618 ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */ 3619 ceph_decode_32_safe(&p, end, len, bad); /* len */ 3620 ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */ 3621 3622 /* version >= 5, flags */ 3623 ceph_decode_32_safe(&p, end, flags, bad); 3624 if (flags & CEPH_SESSION_BLOCKLISTED) { 3625 pr_warn("mds%d session blocklisted\n", session->s_mds); 3626 blocklisted = true; 3627 } 3628 } 3629 3630 mutex_lock(&mdsc->mutex); 3631 if (op == CEPH_SESSION_CLOSE) { 3632 ceph_get_mds_session(session); 3633 __unregister_session(mdsc, 
session); 3634 } 3635 /* FIXME: this ttl calculation is generous */ 3636 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 3637 mutex_unlock(&mdsc->mutex); 3638 3639 mutex_lock(&session->s_mutex); 3640 3641 dout("handle_session mds%d %s %p state %s seq %llu\n", 3642 mds, ceph_session_op_name(op), session, 3643 ceph_session_state_name(session->s_state), seq); 3644 3645 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 3646 session->s_state = CEPH_MDS_SESSION_OPEN; 3647 pr_info("mds%d came back\n", session->s_mds); 3648 } 3649 3650 switch (op) { 3651 case CEPH_SESSION_OPEN: 3652 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3653 pr_info("mds%d reconnect success\n", session->s_mds); 3654 3655 if (session->s_state == CEPH_MDS_SESSION_OPEN) { 3656 pr_notice("mds%d is already opened\n", session->s_mds); 3657 } else { 3658 session->s_state = CEPH_MDS_SESSION_OPEN; 3659 session->s_features = features; 3660 renewed_caps(mdsc, session, 0); 3661 if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, 3662 &session->s_features)) 3663 metric_schedule_delayed(&mdsc->metric); 3664 } 3665 3666 /* 3667 * The connection maybe broken and the session in client 3668 * side has been reinitialized, need to update the seq 3669 * anyway. 
3670 */ 3671 if (!session->s_seq && seq) 3672 session->s_seq = seq; 3673 3674 wake = 1; 3675 if (mdsc->stopping) 3676 __close_session(mdsc, session); 3677 break; 3678 3679 case CEPH_SESSION_RENEWCAPS: 3680 if (session->s_renew_seq == seq) 3681 renewed_caps(mdsc, session, 1); 3682 break; 3683 3684 case CEPH_SESSION_CLOSE: 3685 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3686 pr_info("mds%d reconnect denied\n", session->s_mds); 3687 session->s_state = CEPH_MDS_SESSION_CLOSED; 3688 cleanup_session_requests(mdsc, session); 3689 remove_session_caps(session); 3690 wake = 2; /* for good measure */ 3691 wake_up_all(&mdsc->session_close_wq); 3692 break; 3693 3694 case CEPH_SESSION_STALE: 3695 pr_info("mds%d caps went stale, renewing\n", 3696 session->s_mds); 3697 atomic_inc(&session->s_cap_gen); 3698 session->s_cap_ttl = jiffies - 1; 3699 send_renew_caps(mdsc, session); 3700 break; 3701 3702 case CEPH_SESSION_RECALL_STATE: 3703 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 3704 break; 3705 3706 case CEPH_SESSION_FLUSHMSG: 3707 /* flush cap releases */ 3708 spin_lock(&session->s_cap_lock); 3709 if (session->s_num_cap_releases) 3710 ceph_flush_cap_releases(mdsc, session); 3711 spin_unlock(&session->s_cap_lock); 3712 3713 send_flushmsg_ack(mdsc, session, seq); 3714 break; 3715 3716 case CEPH_SESSION_FORCE_RO: 3717 dout("force_session_readonly %p\n", session); 3718 spin_lock(&session->s_cap_lock); 3719 session->s_readonly = true; 3720 spin_unlock(&session->s_cap_lock); 3721 wake_up_session_caps(session, FORCE_RO); 3722 break; 3723 3724 case CEPH_SESSION_REJECT: 3725 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 3726 pr_info("mds%d rejected session\n", session->s_mds); 3727 session->s_state = CEPH_MDS_SESSION_REJECTED; 3728 cleanup_session_requests(mdsc, session); 3729 remove_session_caps(session); 3730 if (blocklisted) 3731 mdsc->fsc->blocklisted = true; 3732 wake = 2; /* for good measure */ 3733 break; 3734 3735 default: 3736 
pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 3737 WARN_ON(1); 3738 } 3739 3740 mutex_unlock(&session->s_mutex); 3741 if (wake) { 3742 mutex_lock(&mdsc->mutex); 3743 __wake_requests(mdsc, &session->s_waiting); 3744 if (wake == 2) 3745 kick_requests(mdsc, mds); 3746 mutex_unlock(&mdsc->mutex); 3747 } 3748 if (op == CEPH_SESSION_CLOSE) 3749 ceph_put_mds_session(session); 3750 return; 3751 3752 bad: 3753 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 3754 (int)msg->front.iov_len); 3755 ceph_msg_dump(msg); 3756 return; 3757 } 3758 3759 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 3760 { 3761 int dcaps; 3762 3763 dcaps = xchg(&req->r_dir_caps, 0); 3764 if (dcaps) { 3765 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3766 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 3767 } 3768 } 3769 3770 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req) 3771 { 3772 int dcaps; 3773 3774 dcaps = xchg(&req->r_dir_caps, 0); 3775 if (dcaps) { 3776 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3777 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent), 3778 dcaps); 3779 } 3780 } 3781 3782 /* 3783 * called under session->mutex. 3784 */ 3785 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 3786 struct ceph_mds_session *session) 3787 { 3788 struct ceph_mds_request *req, *nreq; 3789 struct rb_node *p; 3790 3791 dout("replay_unsafe_requests mds%d\n", session->s_mds); 3792 3793 mutex_lock(&mdsc->mutex); 3794 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) 3795 __send_request(session, req, true); 3796 3797 /* 3798 * also re-send old requests when MDS enters reconnect stage. So that MDS 3799 * can process completed request in clientreplay stage. 
3800 */ 3801 p = rb_first(&mdsc->request_tree); 3802 while (p) { 3803 req = rb_entry(p, struct ceph_mds_request, r_node); 3804 p = rb_next(p); 3805 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3806 continue; 3807 if (req->r_attempts == 0) 3808 continue; /* only old requests */ 3809 if (!req->r_session) 3810 continue; 3811 if (req->r_session->s_mds != session->s_mds) 3812 continue; 3813 3814 ceph_mdsc_release_dir_caps_no_check(req); 3815 3816 __send_request(session, req, true); 3817 } 3818 mutex_unlock(&mdsc->mutex); 3819 } 3820 3821 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state) 3822 { 3823 struct ceph_msg *reply; 3824 struct ceph_pagelist *_pagelist; 3825 struct page *page; 3826 __le32 *addr; 3827 int err = -ENOMEM; 3828 3829 if (!recon_state->allow_multi) 3830 return -ENOSPC; 3831 3832 /* can't handle message that contains both caps and realm */ 3833 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms); 3834 3835 /* pre-allocate new pagelist */ 3836 _pagelist = ceph_pagelist_alloc(GFP_NOFS); 3837 if (!_pagelist) 3838 return -ENOMEM; 3839 3840 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3841 if (!reply) 3842 goto fail_msg; 3843 3844 /* placeholder for nr_caps */ 3845 err = ceph_pagelist_encode_32(_pagelist, 0); 3846 if (err < 0) 3847 goto fail; 3848 3849 if (recon_state->nr_caps) { 3850 /* currently encoding caps */ 3851 err = ceph_pagelist_encode_32(recon_state->pagelist, 0); 3852 if (err) 3853 goto fail; 3854 } else { 3855 /* placeholder for nr_realms (currently encoding relams) */ 3856 err = ceph_pagelist_encode_32(_pagelist, 0); 3857 if (err < 0) 3858 goto fail; 3859 } 3860 3861 err = ceph_pagelist_encode_8(recon_state->pagelist, 1); 3862 if (err) 3863 goto fail; 3864 3865 page = list_first_entry(&recon_state->pagelist->head, struct page, lru); 3866 addr = kmap_atomic(page); 3867 if (recon_state->nr_caps) { 3868 /* currently encoding caps */ 3869 *addr = cpu_to_le32(recon_state->nr_caps); 
3870 } else { 3871 /* currently encoding relams */ 3872 *(addr + 1) = cpu_to_le32(recon_state->nr_realms); 3873 } 3874 kunmap_atomic(addr); 3875 3876 reply->hdr.version = cpu_to_le16(5); 3877 reply->hdr.compat_version = cpu_to_le16(4); 3878 3879 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length); 3880 ceph_msg_data_add_pagelist(reply, recon_state->pagelist); 3881 3882 ceph_con_send(&recon_state->session->s_con, reply); 3883 ceph_pagelist_release(recon_state->pagelist); 3884 3885 recon_state->pagelist = _pagelist; 3886 recon_state->nr_caps = 0; 3887 recon_state->nr_realms = 0; 3888 recon_state->msg_version = 5; 3889 return 0; 3890 fail: 3891 ceph_msg_put(reply); 3892 fail_msg: 3893 ceph_pagelist_release(_pagelist); 3894 return err; 3895 } 3896 3897 static struct dentry* d_find_primary(struct inode *inode) 3898 { 3899 struct dentry *alias, *dn = NULL; 3900 3901 if (hlist_empty(&inode->i_dentry)) 3902 return NULL; 3903 3904 spin_lock(&inode->i_lock); 3905 if (hlist_empty(&inode->i_dentry)) 3906 goto out_unlock; 3907 3908 if (S_ISDIR(inode->i_mode)) { 3909 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias); 3910 if (!IS_ROOT(alias)) 3911 dn = dget(alias); 3912 goto out_unlock; 3913 } 3914 3915 hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) { 3916 spin_lock(&alias->d_lock); 3917 if (!d_unhashed(alias) && 3918 (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) { 3919 dn = dget_dlock(alias); 3920 } 3921 spin_unlock(&alias->d_lock); 3922 if (dn) 3923 break; 3924 } 3925 out_unlock: 3926 spin_unlock(&inode->i_lock); 3927 return dn; 3928 } 3929 3930 /* 3931 * Encode information about a cap for a reconnect with the MDS. 
/*
 * Encode information about a cap for a reconnect with the MDS.
 *
 * Callback handed to ceph_iterate_session_caps() by send_mds_reconnect().
 *
 * @inode: inode whose cap is being encoded
 * @mds:   rank of the MDS being reconnected to
 * @arg:   the struct ceph_reconnect_state being filled in
 *
 * Appends one cap record to recon_state->pagelist.  For msg_version >= 2
 * the record is a ceph_mds_cap_reconnect followed by the inode's file
 * locks (and, for struct version 2, snap_follows); otherwise it is the
 * older ceph_mds_cap_reconnect_v1.  If the record would push the message
 * past RECONNECT_MAX_SIZE, the pending payload is sent first via
 * send_reconnect_partial().
 *
 * Returns 0 on success or a negative errno.  recon_state->nr_caps is
 * incremented whenever 0 is returned.
 */
static int reconnect_caps_cb(struct inode *inode, int mds, void *arg)
{
	/* both layouts share storage; only the one matching msg_version is used */
	union {
		struct ceph_mds_cap_reconnect v2;
		struct ceph_mds_cap_reconnect_v1 v1;
	} rec;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_reconnect_state *recon_state = arg;
	struct ceph_pagelist *pagelist = recon_state->pagelist;
	struct dentry *dentry;
	struct ceph_cap *cap;
	char *path;
	int pathlen = 0, err = 0;
	u64 pathbase;
	u64 snap_follows;

	dentry = d_find_primary(inode);
	if (dentry) {
		/* set pathbase to parent dir when msg_version >= 2 */
		path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase,
					    recon_state->msg_version >= 2);
		dput(dentry);
		if (IS_ERR(path)) {
			err = PTR_ERR(path);
			goto out_err;
		}
	} else {
		/* no usable dentry: encode an empty path */
		path = NULL;
		pathbase = 0;
	}

	spin_lock(&ci->i_ceph_lock);
	cap = __get_cap_for_mds(ci, mds);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		/*
		 * NOTE(review): err is still 0 here, so out_err below bumps
		 * nr_caps although nothing was appended to the pagelist.
		 * Confirm this is intended.
		 */
		goto out_err;
	}
	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
	     inode, ceph_vinop(inode), cap, cap->cap_id,
	     ceph_cap_string(cap->issued));

	cap->seq = 0;        /* reset cap seq */
	cap->issue_seq = 0;  /* and issue_seq */
	cap->mseq = 0;       /* and migrate_seq */
	cap->cap_gen = atomic_read(&cap->session->s_cap_gen);

	/* These are lost when the session goes away */
	if (S_ISDIR(inode->i_mode)) {
		if (cap->issued & CEPH_CAP_DIR_CREATE) {
			ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
			memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
		}
		cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
	}

	if (recon_state->msg_version >= 2) {
		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
		rec.v2.issued = cpu_to_le32(cap->issued);
		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
		rec.v2.pathbase = cpu_to_le64(pathbase);
		/*
		 * Temporarily used as a boolean "encode file locks?" flag;
		 * overwritten with the real length further down.
		 */
		rec.v2.flock_len = (__force __le32)
			((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
	} else {
		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
		rec.v1.issued = cpu_to_le32(cap->issued);
		rec.v1.size = cpu_to_le64(i_size_read(inode));
		ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
		ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
		rec.v1.pathbase = cpu_to_le64(pathbase);
	}

	/* taken from the first cap snap if any, else from the head snap context */
	if (list_empty(&ci->i_cap_snaps)) {
		snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
	} else {
		struct ceph_cap_snap *capsnap =
			list_first_entry(&ci->i_cap_snaps,
					 struct ceph_cap_snap, ci_item);
		snap_follows = capsnap->follows;
	}
	spin_unlock(&ci->i_ceph_lock);

	if (recon_state->msg_version >= 2) {
		int num_fcntl_locks, num_flock_locks;
		struct ceph_filelock *flocks = NULL;
		/* total_len starts with the u64 inode number */
		size_t struct_len, total_len = sizeof(u64);
		u8 struct_v = 0;

		/*
		 * Count the locks, allocate a buffer for that many and fill
		 * it.  On -ENOSPC from the fill, start over with a new count.
		 */
encode_again:
		if (rec.v2.flock_len) {
			ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
		} else {
			num_fcntl_locks = 0;
			num_flock_locks = 0;
		}
		if (num_fcntl_locks + num_flock_locks > 0) {
			flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
					       sizeof(struct ceph_filelock),
					       GFP_NOFS);
			if (!flocks) {
				err = -ENOMEM;
				goto out_err;
			}
			err = ceph_encode_locks_to_buffer(inode, flocks,
							  num_fcntl_locks,
							  num_flock_locks);
			if (err) {
				kfree(flocks);
				flocks = NULL;
				if (err == -ENOSPC)
					goto encode_again;
				goto out_err;
			}
		} else {
			kfree(flocks);
			flocks = NULL;
		}

		if (recon_state->msg_version >= 3) {
			/* version, compat_version and struct_len */
			total_len += 2 * sizeof(u8) + sizeof(u32);
			struct_v = 2;
		}
		/*
		 * number of encoded locks is stable, so copy to pagelist
		 */
		struct_len = 2 * sizeof(u32) +
			    (num_fcntl_locks + num_flock_locks) *
			    sizeof(struct ceph_filelock);
		rec.v2.flock_len = cpu_to_le32(struct_len);

		/* add the length-prefixed path and the fixed-size record */
		struct_len += sizeof(u32) + pathlen + sizeof(rec.v2);

		if (struct_v >= 2)
			struct_len += sizeof(u64); /* snap_follows */

		total_len += struct_len;

		/* flush what we have if this record would overflow the message */
		if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
			err = send_reconnect_partial(recon_state);
			if (err)
				goto out_freeflocks;
			pagelist = recon_state->pagelist;
		}

		/* reserve up front; the appends below are not error-checked */
		err = ceph_pagelist_reserve(pagelist, total_len);
		if (err)
			goto out_freeflocks;

		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
		if (recon_state->msg_version >= 3) {
			ceph_pagelist_encode_8(pagelist, struct_v);
			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_32(pagelist, struct_len);
		}
		ceph_pagelist_encode_string(pagelist, path, pathlen);
		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
		ceph_locks_to_pagelist(flocks, pagelist,
				       num_fcntl_locks, num_flock_locks);
		if (struct_v >= 2)
			ceph_pagelist_encode_64(pagelist, snap_follows);
out_freeflocks:
		kfree(flocks);
	} else {
		/* v1: inode number, length-prefixed path, fixed-size record */
		err = ceph_pagelist_reserve(pagelist,
					    sizeof(u64) + sizeof(u32) +
					    pathlen + sizeof(rec.v1));
		if (err)
			goto out_err;

		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
		ceph_pagelist_encode_string(pagelist, path, pathlen);
		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
	}

out_err:
	ceph_mdsc_free_path(path, pathlen);
	if (!err)
		recon_state->nr_caps++;
	return err;
}
(recon_state->msg_version >= 4) { 4125 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); 4126 if (err < 0) 4127 goto fail; 4128 } 4129 4130 /* 4131 * snaprealms. we provide mds with the ino, seq (version), and 4132 * parent for all of our realms. If the mds has any newer info, 4133 * it will tell us. 4134 */ 4135 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 4136 struct ceph_snap_realm *realm = 4137 rb_entry(p, struct ceph_snap_realm, node); 4138 struct ceph_mds_snaprealm_reconnect sr_rec; 4139 4140 if (recon_state->msg_version >= 4) { 4141 size_t need = sizeof(u8) * 2 + sizeof(u32) + 4142 sizeof(sr_rec); 4143 4144 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 4145 err = send_reconnect_partial(recon_state); 4146 if (err) 4147 goto fail; 4148 pagelist = recon_state->pagelist; 4149 } 4150 4151 err = ceph_pagelist_reserve(pagelist, need); 4152 if (err) 4153 goto fail; 4154 4155 ceph_pagelist_encode_8(pagelist, 1); 4156 ceph_pagelist_encode_8(pagelist, 1); 4157 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 4158 } 4159 4160 dout(" adding snap realm %llx seq %lld parent %llx\n", 4161 realm->ino, realm->seq, realm->parent_ino); 4162 sr_rec.ino = cpu_to_le64(realm->ino); 4163 sr_rec.seq = cpu_to_le64(realm->seq); 4164 sr_rec.parent = cpu_to_le64(realm->parent_ino); 4165 4166 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 4167 if (err) 4168 goto fail; 4169 4170 recon_state->nr_realms++; 4171 } 4172 fail: 4173 return err; 4174 } 4175 4176 4177 /* 4178 * If an MDS fails and recovers, clients need to reconnect in order to 4179 * reestablish shared state. This includes all caps issued through 4180 * this session _and_ the snap_realm hierarchy. Because it's not 4181 * clear which snap realms the mds cares about, we send everything we 4182 * know about.. that ensures we'll then get any new info the 4183 * recovering MDS might have. 4184 * 4185 * This is a relatively heavyweight operation, but it's rare. 
4186 */ 4187 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 4188 struct ceph_mds_session *session) 4189 { 4190 struct ceph_msg *reply; 4191 int mds = session->s_mds; 4192 int err = -ENOMEM; 4193 struct ceph_reconnect_state recon_state = { 4194 .session = session, 4195 }; 4196 LIST_HEAD(dispose); 4197 4198 pr_info("mds%d reconnect start\n", mds); 4199 4200 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 4201 if (!recon_state.pagelist) 4202 goto fail_nopagelist; 4203 4204 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 4205 if (!reply) 4206 goto fail_nomsg; 4207 4208 xa_destroy(&session->s_delegated_inos); 4209 4210 mutex_lock(&session->s_mutex); 4211 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 4212 session->s_seq = 0; 4213 4214 dout("session %p state %s\n", session, 4215 ceph_session_state_name(session->s_state)); 4216 4217 atomic_inc(&session->s_cap_gen); 4218 4219 spin_lock(&session->s_cap_lock); 4220 /* don't know if session is readonly */ 4221 session->s_readonly = 0; 4222 /* 4223 * notify __ceph_remove_cap() that we are composing cap reconnect. 4224 * If a cap get released before being added to the cap reconnect, 4225 * __ceph_remove_cap() should skip queuing cap release. 
4226 */ 4227 session->s_cap_reconnect = 1; 4228 /* drop old cap expires; we're about to reestablish that state */ 4229 detach_cap_releases(session, &dispose); 4230 spin_unlock(&session->s_cap_lock); 4231 dispose_cap_releases(mdsc, &dispose); 4232 4233 /* trim unused caps to reduce MDS's cache rejoin time */ 4234 if (mdsc->fsc->sb->s_root) 4235 shrink_dcache_parent(mdsc->fsc->sb->s_root); 4236 4237 ceph_con_close(&session->s_con); 4238 ceph_con_open(&session->s_con, 4239 CEPH_ENTITY_TYPE_MDS, mds, 4240 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 4241 4242 /* replay unsafe requests */ 4243 replay_unsafe_requests(mdsc, session); 4244 4245 ceph_early_kick_flushing_caps(mdsc, session); 4246 4247 down_read(&mdsc->snap_rwsem); 4248 4249 /* placeholder for nr_caps */ 4250 err = ceph_pagelist_encode_32(recon_state.pagelist, 0); 4251 if (err) 4252 goto fail; 4253 4254 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) { 4255 recon_state.msg_version = 3; 4256 recon_state.allow_multi = true; 4257 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) { 4258 recon_state.msg_version = 3; 4259 } else { 4260 recon_state.msg_version = 2; 4261 } 4262 /* trsaverse this session's caps */ 4263 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state); 4264 4265 spin_lock(&session->s_cap_lock); 4266 session->s_cap_reconnect = 0; 4267 spin_unlock(&session->s_cap_lock); 4268 4269 if (err < 0) 4270 goto fail; 4271 4272 /* check if all realms can be encoded into current message */ 4273 if (mdsc->num_snap_realms) { 4274 size_t total_len = 4275 recon_state.pagelist->length + 4276 mdsc->num_snap_realms * 4277 sizeof(struct ceph_mds_snaprealm_reconnect); 4278 if (recon_state.msg_version >= 4) { 4279 /* number of realms */ 4280 total_len += sizeof(u32); 4281 /* version, compat_version and struct_len */ 4282 total_len += mdsc->num_snap_realms * 4283 (2 * sizeof(u8) + sizeof(u32)); 4284 } 4285 if (total_len > RECONNECT_MAX_SIZE) { 4286 if 
(!recon_state.allow_multi) { 4287 err = -ENOSPC; 4288 goto fail; 4289 } 4290 if (recon_state.nr_caps) { 4291 err = send_reconnect_partial(&recon_state); 4292 if (err) 4293 goto fail; 4294 } 4295 recon_state.msg_version = 5; 4296 } 4297 } 4298 4299 err = encode_snap_realms(mdsc, &recon_state); 4300 if (err < 0) 4301 goto fail; 4302 4303 if (recon_state.msg_version >= 5) { 4304 err = ceph_pagelist_encode_8(recon_state.pagelist, 0); 4305 if (err < 0) 4306 goto fail; 4307 } 4308 4309 if (recon_state.nr_caps || recon_state.nr_realms) { 4310 struct page *page = 4311 list_first_entry(&recon_state.pagelist->head, 4312 struct page, lru); 4313 __le32 *addr = kmap_atomic(page); 4314 if (recon_state.nr_caps) { 4315 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms); 4316 *addr = cpu_to_le32(recon_state.nr_caps); 4317 } else if (recon_state.msg_version >= 4) { 4318 *(addr + 1) = cpu_to_le32(recon_state.nr_realms); 4319 } 4320 kunmap_atomic(addr); 4321 } 4322 4323 reply->hdr.version = cpu_to_le16(recon_state.msg_version); 4324 if (recon_state.msg_version >= 4) 4325 reply->hdr.compat_version = cpu_to_le16(4); 4326 4327 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length); 4328 ceph_msg_data_add_pagelist(reply, recon_state.pagelist); 4329 4330 ceph_con_send(&session->s_con, reply); 4331 4332 mutex_unlock(&session->s_mutex); 4333 4334 mutex_lock(&mdsc->mutex); 4335 __wake_requests(mdsc, &session->s_waiting); 4336 mutex_unlock(&mdsc->mutex); 4337 4338 up_read(&mdsc->snap_rwsem); 4339 ceph_pagelist_release(recon_state.pagelist); 4340 return; 4341 4342 fail: 4343 ceph_msg_put(reply); 4344 up_read(&mdsc->snap_rwsem); 4345 mutex_unlock(&session->s_mutex); 4346 fail_nomsg: 4347 ceph_pagelist_release(recon_state.pagelist); 4348 fail_nopagelist: 4349 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 4350 return; 4351 } 4352 4353 4354 /* 4355 * compare old and new mdsmaps, kicking requests 4356 * and closing out old connections as necessary 4357 * 4358 * called 
/*
 * compare old and new mdsmaps, kicking requests
 * and closing out old connections as necessary
 *
 * called under mdsc->mutex.
 *
 * @mdsc:   MDS client
 * @newmap: the map that was just received
 * @oldmap: the map it replaces
 *
 * Note that mdsc->mutex is dropped and re-taken several times inside,
 * whenever a session's s_mutex has to be acquired or a reconnect is sent.
 * It is held again on return.
 */
static void check_new_map(struct ceph_mds_client *mdsc,
			  struct ceph_mdsmap *newmap,
			  struct ceph_mdsmap *oldmap)
{
	int i, j, err;
	int oldstate, newstate;
	struct ceph_mds_session *s;
	/*
	 * Bitmap of ranks that are an export target of some MDS in newmap.
	 *
	 * NOTE(review): the length divides CEPH_MAX_MDS by
	 * sizeof(unsigned long) (bytes) rather than by BITS_PER_LONG, so the
	 * array holds more bits than CEPH_MAX_MDS.  Whether anything relies
	 * on that headroom is not visible here -- confirm before shrinking.
	 */
	unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0};

	dout("check_new_map new %u old %u\n",
	     newmap->m_epoch, oldmap->m_epoch);

	/* collect every export target named in the new map */
	if (newmap->m_info) {
		for (i = 0; i < newmap->possible_max_rank; i++) {
			for (j = 0; j < newmap->m_info[i].num_export_targets; j++)
				set_bit(newmap->m_info[i].export_targets[j], targets);
		}
	}

	/* pass 1: react to state/address changes of MDSes we have sessions with */
	for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
		if (!mdsc->sessions[i])
			continue;
		s = mdsc->sessions[i];
		oldstate = ceph_mdsmap_get_state(oldmap, i);
		newstate = ceph_mdsmap_get_state(newmap, i);

		dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
		     i, ceph_mds_state_name(oldstate),
		     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
		     ceph_mds_state_name(newstate),
		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
		     ceph_session_state_name(s->s_state));

		if (i >= newmap->possible_max_rank) {
			/* force close session for stopped mds */
			ceph_get_mds_session(s);
			__unregister_session(mdsc, s);
			__wake_requests(mdsc, &s->s_waiting);
			mutex_unlock(&mdsc->mutex);

			/* clean up under s_mutex only, with mdsc->mutex dropped */
			mutex_lock(&s->s_mutex);
			cleanup_session_requests(mdsc, s);
			remove_session_caps(s);
			mutex_unlock(&s->s_mutex);

			ceph_put_mds_session(s);

			mutex_lock(&mdsc->mutex);
			kick_requests(mdsc, i);
			continue;
		}

		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
			   ceph_mdsmap_get_addr(newmap, i),
			   sizeof(struct ceph_entity_addr))) {
			/* just close it */
			/* drop mdsc->mutex so s_mutex is acquired first */
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&s->s_mutex);
			mutex_lock(&mdsc->mutex);
			ceph_con_close(&s->s_con);
			mutex_unlock(&s->s_mutex);
			s->s_state = CEPH_MDS_SESSION_RESTARTING;
		} else if (oldstate == newstate) {
			continue;  /* nothing new with this mds */
		}

		/*
		 * send reconnect?
		 */
		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
		    newstate >= CEPH_MDS_STATE_RECONNECT) {
			mutex_unlock(&mdsc->mutex);
			/* handled here, so pass 2 must not reconnect it again */
			clear_bit(i, targets);
			send_mds_reconnect(mdsc, s);
			mutex_lock(&mdsc->mutex);
		}

		/*
		 * kick request on any mds that has gone active.
		 */
		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
		    newstate >= CEPH_MDS_STATE_ACTIVE) {
			if (oldstate != CEPH_MDS_STATE_CREATING &&
			    oldstate != CEPH_MDS_STATE_STARTING)
				pr_info("mds%d recovery completed\n", s->s_mds);
			kick_requests(mdsc, i);
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&s->s_mutex);
			mutex_lock(&mdsc->mutex);
			ceph_kick_flushing_caps(mdsc, s);
			mutex_unlock(&s->s_mutex);
			wake_up_session_caps(s, RECONNECT);
		}
	}

	/*
	 * Only open and reconnect sessions that don't exist yet.
	 */
	for (i = 0; i < newmap->possible_max_rank; i++) {
		/*
		 * In case the import MDS is crashed just after
		 * the EImportStart journal is flushed, so when
		 * a standby MDS takes over it and is replaying
		 * the EImportStart journal the new MDS daemon
		 * will wait the client to reconnect it, but the
		 * client may never register/open the session yet.
		 *
		 * Will try to reconnect that MDS daemon if the
		 * rank number is in the export targets array and
		 * is the up:reconnect state.
		 */
		newstate = ceph_mdsmap_get_state(newmap, i);
		if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT)
			continue;

		/*
		 * The session maybe registered and opened by some
		 * requests which were choosing random MDSes during
		 * the mdsc->mutex's unlock/lock gap below in rare
		 * case. But the related MDS daemon will just queue
		 * that requests and be still waiting for the client's
		 * reconnection request in up:reconnect state.
		 */
		s = __ceph_lookup_mds_session(mdsc, i);
		if (likely(!s)) {
			s = __open_export_target_session(mdsc, i);
			if (IS_ERR(s)) {
				err = PTR_ERR(s);
				pr_err("failed to open export target session, err %d\n",
				       err);
				continue;
			}
		}
		dout("send reconnect to export target mds.%d\n", i);
		mutex_unlock(&mdsc->mutex);
		send_mds_reconnect(mdsc, s);
		ceph_put_mds_session(s);
		mutex_lock(&mdsc->mutex);
	}

	/* pass 3: for laggy MDSes we still talk to, open their export targets */
	for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
		s = mdsc->sessions[i];
		if (!s)
			continue;
		if (!ceph_mdsmap_is_laggy(newmap, i))
			continue;
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG ||
		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
			dout(" connecting to export targets of laggy mds%d\n",
			     i);
			__open_export_target_sessions(mdsc, s);
		}
	}
}



/*
 * leases
 */

/*
 * Drop the session reference held by a dentry lease and clear the
 * dentry's lease_session pointer.
 *
 * caller must hold session s_mutex, dentry->d_lock
 */
void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
{
	struct ceph_dentry_info *di = ceph_dentry(dentry);

	ceph_put_mds_session(di->lease_session);
	di->lease_session = NULL;
}
get_unaligned_le32(h + 1); 4557 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 4558 goto bad; 4559 dname.name = (void *)(h + 1) + sizeof(u32); 4560 4561 /* lookup inode */ 4562 inode = ceph_find_inode(sb, vino); 4563 dout("handle_lease %s, ino %llx %p %.*s\n", 4564 ceph_lease_op_name(h->action), vino.ino, inode, 4565 dname.len, dname.name); 4566 4567 mutex_lock(&session->s_mutex); 4568 inc_session_sequence(session); 4569 4570 if (!inode) { 4571 dout("handle_lease no inode %llx\n", vino.ino); 4572 goto release; 4573 } 4574 4575 /* dentry */ 4576 parent = d_find_alias(inode); 4577 if (!parent) { 4578 dout("no parent dentry on inode %p\n", inode); 4579 WARN_ON(1); 4580 goto release; /* hrm... */ 4581 } 4582 dname.hash = full_name_hash(parent, dname.name, dname.len); 4583 dentry = d_lookup(parent, &dname); 4584 dput(parent); 4585 if (!dentry) 4586 goto release; 4587 4588 spin_lock(&dentry->d_lock); 4589 di = ceph_dentry(dentry); 4590 switch (h->action) { 4591 case CEPH_MDS_LEASE_REVOKE: 4592 if (di->lease_session == session) { 4593 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 4594 h->seq = cpu_to_le32(di->lease_seq); 4595 __ceph_mdsc_drop_dentry_lease(dentry); 4596 } 4597 release = 1; 4598 break; 4599 4600 case CEPH_MDS_LEASE_RENEW: 4601 if (di->lease_session == session && 4602 di->lease_gen == atomic_read(&session->s_cap_gen) && 4603 di->lease_renew_from && 4604 di->lease_renew_after == 0) { 4605 unsigned long duration = 4606 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 4607 4608 di->lease_seq = seq; 4609 di->time = di->lease_renew_from + duration; 4610 di->lease_renew_after = di->lease_renew_from + 4611 (duration >> 1); 4612 di->lease_renew_from = 0; 4613 } 4614 break; 4615 } 4616 spin_unlock(&dentry->d_lock); 4617 dput(dentry); 4618 4619 if (!release) 4620 goto out; 4621 4622 release: 4623 /* let's just reuse the same message */ 4624 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 4625 ceph_msg_get(msg); 4626 ceph_con_send(&session->s_con, msg); 4627 4628 out: 
4629 mutex_unlock(&session->s_mutex); 4630 iput(inode); 4631 return; 4632 4633 bad: 4634 pr_err("corrupt lease message\n"); 4635 ceph_msg_dump(msg); 4636 } 4637 4638 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 4639 struct dentry *dentry, char action, 4640 u32 seq) 4641 { 4642 struct ceph_msg *msg; 4643 struct ceph_mds_lease *lease; 4644 struct inode *dir; 4645 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; 4646 4647 dout("lease_send_msg identry %p %s to mds%d\n", 4648 dentry, ceph_lease_op_name(action), session->s_mds); 4649 4650 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 4651 if (!msg) 4652 return; 4653 lease = msg->front.iov_base; 4654 lease->action = action; 4655 lease->seq = cpu_to_le32(seq); 4656 4657 spin_lock(&dentry->d_lock); 4658 dir = d_inode(dentry->d_parent); 4659 lease->ino = cpu_to_le64(ceph_ino(dir)); 4660 lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); 4661 4662 put_unaligned_le32(dentry->d_name.len, lease + 1); 4663 memcpy((void *)(lease + 1) + 4, 4664 dentry->d_name.name, dentry->d_name.len); 4665 spin_unlock(&dentry->d_lock); 4666 4667 ceph_con_send(&session->s_con, msg); 4668 } 4669 4670 /* 4671 * lock unlock the session, to wait ongoing session activities 4672 */ 4673 static void lock_unlock_session(struct ceph_mds_session *s) 4674 { 4675 mutex_lock(&s->s_mutex); 4676 mutex_unlock(&s->s_mutex); 4677 } 4678 4679 static void maybe_recover_session(struct ceph_mds_client *mdsc) 4680 { 4681 struct ceph_fs_client *fsc = mdsc->fsc; 4682 4683 if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) 4684 return; 4685 4686 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED) 4687 return; 4688 4689 if (!READ_ONCE(fsc->blocklisted)) 4690 return; 4691 4692 pr_info("auto reconnect after blocklisted\n"); 4693 ceph_force_reconnect(fsc->sb); 4694 } 4695 4696 bool check_session_state(struct ceph_mds_session *s) 4697 { 4698 switch (s->s_state) { 4699 case CEPH_MDS_SESSION_OPEN: 4700 if (s->s_ttl && time_after(jiffies, 
s->s_ttl)) { 4701 s->s_state = CEPH_MDS_SESSION_HUNG; 4702 pr_info("mds%d hung\n", s->s_mds); 4703 } 4704 break; 4705 case CEPH_MDS_SESSION_CLOSING: 4706 case CEPH_MDS_SESSION_NEW: 4707 case CEPH_MDS_SESSION_RESTARTING: 4708 case CEPH_MDS_SESSION_CLOSED: 4709 case CEPH_MDS_SESSION_REJECTED: 4710 return false; 4711 } 4712 4713 return true; 4714 } 4715 4716 /* 4717 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply, 4718 * then we need to retransmit that request. 4719 */ 4720 void inc_session_sequence(struct ceph_mds_session *s) 4721 { 4722 lockdep_assert_held(&s->s_mutex); 4723 4724 s->s_seq++; 4725 4726 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 4727 int ret; 4728 4729 dout("resending session close request for mds%d\n", s->s_mds); 4730 ret = request_close_session(s); 4731 if (ret < 0) 4732 pr_err("unable to close session to mds%d: %d\n", 4733 s->s_mds, ret); 4734 } 4735 } 4736 4737 /* 4738 * delayed work -- periodically trim expired leases, renew caps with mds. If 4739 * the @delay parameter is set to 0 or if it's more than 5 secs, the default 4740 * workqueue delay value of 5 secs will be used. 
4741 */ 4742 static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay) 4743 { 4744 unsigned long max_delay = HZ * 5; 4745 4746 /* 5 secs default delay */ 4747 if (!delay || (delay > max_delay)) 4748 delay = max_delay; 4749 schedule_delayed_work(&mdsc->delayed_work, 4750 round_jiffies_relative(delay)); 4751 } 4752 4753 static void delayed_work(struct work_struct *work) 4754 { 4755 struct ceph_mds_client *mdsc = 4756 container_of(work, struct ceph_mds_client, delayed_work.work); 4757 unsigned long delay; 4758 int renew_interval; 4759 int renew_caps; 4760 int i; 4761 4762 dout("mdsc delayed_work\n"); 4763 4764 if (mdsc->stopping) 4765 return; 4766 4767 mutex_lock(&mdsc->mutex); 4768 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 4769 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 4770 mdsc->last_renew_caps); 4771 if (renew_caps) 4772 mdsc->last_renew_caps = jiffies; 4773 4774 for (i = 0; i < mdsc->max_sessions; i++) { 4775 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4776 if (!s) 4777 continue; 4778 4779 if (!check_session_state(s)) { 4780 ceph_put_mds_session(s); 4781 continue; 4782 } 4783 mutex_unlock(&mdsc->mutex); 4784 4785 mutex_lock(&s->s_mutex); 4786 if (renew_caps) 4787 send_renew_caps(mdsc, s); 4788 else 4789 ceph_con_keepalive(&s->s_con); 4790 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4791 s->s_state == CEPH_MDS_SESSION_HUNG) 4792 ceph_send_cap_releases(mdsc, s); 4793 mutex_unlock(&s->s_mutex); 4794 ceph_put_mds_session(s); 4795 4796 mutex_lock(&mdsc->mutex); 4797 } 4798 mutex_unlock(&mdsc->mutex); 4799 4800 delay = ceph_check_delayed_caps(mdsc); 4801 4802 ceph_queue_cap_reclaim_work(mdsc); 4803 4804 ceph_trim_snapid_map(mdsc); 4805 4806 maybe_recover_session(mdsc); 4807 4808 schedule_delayed(mdsc, delay); 4809 } 4810 4811 int ceph_mdsc_init(struct ceph_fs_client *fsc) 4812 4813 { 4814 struct ceph_mds_client *mdsc; 4815 int err; 4816 4817 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 
4818 if (!mdsc) 4819 return -ENOMEM; 4820 mdsc->fsc = fsc; 4821 mutex_init(&mdsc->mutex); 4822 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 4823 if (!mdsc->mdsmap) { 4824 err = -ENOMEM; 4825 goto err_mdsc; 4826 } 4827 4828 init_completion(&mdsc->safe_umount_waiters); 4829 init_waitqueue_head(&mdsc->session_close_wq); 4830 INIT_LIST_HEAD(&mdsc->waiting_for_map); 4831 mdsc->quotarealms_inodes = RB_ROOT; 4832 mutex_init(&mdsc->quotarealms_inodes_mutex); 4833 init_rwsem(&mdsc->snap_rwsem); 4834 mdsc->snap_realms = RB_ROOT; 4835 INIT_LIST_HEAD(&mdsc->snap_empty); 4836 spin_lock_init(&mdsc->snap_empty_lock); 4837 mdsc->request_tree = RB_ROOT; 4838 INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); 4839 mdsc->last_renew_caps = jiffies; 4840 INIT_LIST_HEAD(&mdsc->cap_delay_list); 4841 INIT_LIST_HEAD(&mdsc->cap_wait_list); 4842 spin_lock_init(&mdsc->cap_delay_lock); 4843 INIT_LIST_HEAD(&mdsc->snap_flush_list); 4844 spin_lock_init(&mdsc->snap_flush_lock); 4845 mdsc->last_cap_flush_tid = 1; 4846 INIT_LIST_HEAD(&mdsc->cap_flush_list); 4847 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 4848 spin_lock_init(&mdsc->cap_dirty_lock); 4849 init_waitqueue_head(&mdsc->cap_flushing_wq); 4850 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); 4851 err = ceph_metric_init(&mdsc->metric); 4852 if (err) 4853 goto err_mdsmap; 4854 4855 spin_lock_init(&mdsc->dentry_list_lock); 4856 INIT_LIST_HEAD(&mdsc->dentry_leases); 4857 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 4858 4859 ceph_caps_init(mdsc); 4860 ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 4861 4862 spin_lock_init(&mdsc->snapid_map_lock); 4863 mdsc->snapid_map_tree = RB_ROOT; 4864 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 4865 4866 init_rwsem(&mdsc->pool_perm_rwsem); 4867 mdsc->pool_perm_tree = RB_ROOT; 4868 4869 strscpy(mdsc->nodename, utsname()->nodename, 4870 sizeof(mdsc->nodename)); 4871 4872 fsc->mdsc = mdsc; 4873 return 0; 4874 4875 err_mdsmap: 4876 kfree(mdsc->mdsmap); 4877 err_mdsc: 4878 kfree(mdsc); 4879 
return err; 4880 } 4881 4882 /* 4883 * Wait for safe replies on open mds requests. If we time out, drop 4884 * all requests from the tree to avoid dangling dentry refs. 4885 */ 4886 static void wait_requests(struct ceph_mds_client *mdsc) 4887 { 4888 struct ceph_options *opts = mdsc->fsc->client->options; 4889 struct ceph_mds_request *req; 4890 4891 mutex_lock(&mdsc->mutex); 4892 if (__get_oldest_req(mdsc)) { 4893 mutex_unlock(&mdsc->mutex); 4894 4895 dout("wait_requests waiting for requests\n"); 4896 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 4897 ceph_timeout_jiffies(opts->mount_timeout)); 4898 4899 /* tear down remaining requests */ 4900 mutex_lock(&mdsc->mutex); 4901 while ((req = __get_oldest_req(mdsc))) { 4902 dout("wait_requests timed out on tid %llu\n", 4903 req->r_tid); 4904 list_del_init(&req->r_wait); 4905 __unregister_request(mdsc, req); 4906 } 4907 } 4908 mutex_unlock(&mdsc->mutex); 4909 dout("wait_requests done\n"); 4910 } 4911 4912 void send_flush_mdlog(struct ceph_mds_session *s) 4913 { 4914 struct ceph_msg *msg; 4915 4916 /* 4917 * Pre-luminous MDS crashes when it sees an unknown session request 4918 */ 4919 if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS)) 4920 return; 4921 4922 mutex_lock(&s->s_mutex); 4923 dout("request mdlog flush to mds%d (%s)s seq %lld\n", s->s_mds, 4924 ceph_session_state_name(s->s_state), s->s_seq); 4925 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG, 4926 s->s_seq); 4927 if (!msg) { 4928 pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n", 4929 s->s_mds, ceph_session_state_name(s->s_state), s->s_seq); 4930 } else { 4931 ceph_con_send(&s->s_con, msg); 4932 } 4933 mutex_unlock(&s->s_mutex); 4934 } 4935 4936 /* 4937 * called before mount is ro, and before dentries are torn down. 4938 * (hmm, does this still race with new lookups?) 
4939 */ 4940 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 4941 { 4942 dout("pre_umount\n"); 4943 mdsc->stopping = 1; 4944 4945 ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true); 4946 ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false); 4947 ceph_flush_dirty_caps(mdsc); 4948 wait_requests(mdsc); 4949 4950 /* 4951 * wait for reply handlers to drop their request refs and 4952 * their inode/dcache refs 4953 */ 4954 ceph_msgr_flush(); 4955 4956 ceph_cleanup_quotarealms_inodes(mdsc); 4957 } 4958 4959 /* 4960 * flush the mdlog and wait for all write mds requests to flush. 4961 */ 4962 static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc, 4963 u64 want_tid) 4964 { 4965 struct ceph_mds_request *req = NULL, *nextreq; 4966 struct ceph_mds_session *last_session = NULL; 4967 struct rb_node *n; 4968 4969 mutex_lock(&mdsc->mutex); 4970 dout("%s want %lld\n", __func__, want_tid); 4971 restart: 4972 req = __get_oldest_req(mdsc); 4973 while (req && req->r_tid <= want_tid) { 4974 /* find next request */ 4975 n = rb_next(&req->r_node); 4976 if (n) 4977 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 4978 else 4979 nextreq = NULL; 4980 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 4981 (req->r_op & CEPH_MDS_OP_WRITE)) { 4982 struct ceph_mds_session *s = req->r_session; 4983 4984 if (!s) { 4985 req = nextreq; 4986 continue; 4987 } 4988 4989 /* write op */ 4990 ceph_mdsc_get_request(req); 4991 if (nextreq) 4992 ceph_mdsc_get_request(nextreq); 4993 s = ceph_get_mds_session(s); 4994 mutex_unlock(&mdsc->mutex); 4995 4996 /* send flush mdlog request to MDS */ 4997 if (last_session != s) { 4998 send_flush_mdlog(s); 4999 ceph_put_mds_session(last_session); 5000 last_session = s; 5001 } else { 5002 ceph_put_mds_session(s); 5003 } 5004 dout("%s wait on %llu (want %llu)\n", __func__, 5005 req->r_tid, want_tid); 5006 wait_for_completion(&req->r_safe_completion); 5007 5008 mutex_lock(&mdsc->mutex); 5009 ceph_mdsc_put_request(req); 5010 
if (!nextreq) 5011 break; /* next dne before, so we're done! */ 5012 if (RB_EMPTY_NODE(&nextreq->r_node)) { 5013 /* next request was removed from tree */ 5014 ceph_mdsc_put_request(nextreq); 5015 goto restart; 5016 } 5017 ceph_mdsc_put_request(nextreq); /* won't go away */ 5018 } 5019 req = nextreq; 5020 } 5021 mutex_unlock(&mdsc->mutex); 5022 ceph_put_mds_session(last_session); 5023 dout("%s done\n", __func__); 5024 } 5025 5026 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 5027 { 5028 u64 want_tid, want_flush; 5029 5030 if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) 5031 return; 5032 5033 dout("sync\n"); 5034 mutex_lock(&mdsc->mutex); 5035 want_tid = mdsc->last_tid; 5036 mutex_unlock(&mdsc->mutex); 5037 5038 ceph_flush_dirty_caps(mdsc); 5039 spin_lock(&mdsc->cap_dirty_lock); 5040 want_flush = mdsc->last_cap_flush_tid; 5041 if (!list_empty(&mdsc->cap_flush_list)) { 5042 struct ceph_cap_flush *cf = 5043 list_last_entry(&mdsc->cap_flush_list, 5044 struct ceph_cap_flush, g_list); 5045 cf->wake = true; 5046 } 5047 spin_unlock(&mdsc->cap_dirty_lock); 5048 5049 dout("sync want tid %lld flush_seq %lld\n", 5050 want_tid, want_flush); 5051 5052 flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid); 5053 wait_caps_flush(mdsc, want_flush); 5054 } 5055 5056 /* 5057 * true if all sessions are closed, or we force unmount 5058 */ 5059 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 5060 { 5061 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 5062 return true; 5063 return atomic_read(&mdsc->num_sessions) <= skipped; 5064 } 5065 5066 /* 5067 * called after sb is ro or when metadata corrupted. 
5068 */ 5069 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 5070 { 5071 struct ceph_options *opts = mdsc->fsc->client->options; 5072 struct ceph_mds_session *session; 5073 int i; 5074 int skipped = 0; 5075 5076 dout("close_sessions\n"); 5077 5078 /* close sessions */ 5079 mutex_lock(&mdsc->mutex); 5080 for (i = 0; i < mdsc->max_sessions; i++) { 5081 session = __ceph_lookup_mds_session(mdsc, i); 5082 if (!session) 5083 continue; 5084 mutex_unlock(&mdsc->mutex); 5085 mutex_lock(&session->s_mutex); 5086 if (__close_session(mdsc, session) <= 0) 5087 skipped++; 5088 mutex_unlock(&session->s_mutex); 5089 ceph_put_mds_session(session); 5090 mutex_lock(&mdsc->mutex); 5091 } 5092 mutex_unlock(&mdsc->mutex); 5093 5094 dout("waiting for sessions to close\n"); 5095 wait_event_timeout(mdsc->session_close_wq, 5096 done_closing_sessions(mdsc, skipped), 5097 ceph_timeout_jiffies(opts->mount_timeout)); 5098 5099 /* tear down remaining sessions */ 5100 mutex_lock(&mdsc->mutex); 5101 for (i = 0; i < mdsc->max_sessions; i++) { 5102 if (mdsc->sessions[i]) { 5103 session = ceph_get_mds_session(mdsc->sessions[i]); 5104 __unregister_session(mdsc, session); 5105 mutex_unlock(&mdsc->mutex); 5106 mutex_lock(&session->s_mutex); 5107 remove_session_caps(session); 5108 mutex_unlock(&session->s_mutex); 5109 ceph_put_mds_session(session); 5110 mutex_lock(&mdsc->mutex); 5111 } 5112 } 5113 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 5114 mutex_unlock(&mdsc->mutex); 5115 5116 ceph_cleanup_snapid_map(mdsc); 5117 ceph_cleanup_global_and_empty_realms(mdsc); 5118 5119 cancel_work_sync(&mdsc->cap_reclaim_work); 5120 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 5121 5122 dout("stopped\n"); 5123 } 5124 5125 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 5126 { 5127 struct ceph_mds_session *session; 5128 int mds; 5129 5130 dout("force umount\n"); 5131 5132 mutex_lock(&mdsc->mutex); 5133 for (mds = 0; mds < mdsc->max_sessions; mds++) { 5134 session = 
__ceph_lookup_mds_session(mdsc, mds); 5135 if (!session) 5136 continue; 5137 5138 if (session->s_state == CEPH_MDS_SESSION_REJECTED) 5139 __unregister_session(mdsc, session); 5140 __wake_requests(mdsc, &session->s_waiting); 5141 mutex_unlock(&mdsc->mutex); 5142 5143 mutex_lock(&session->s_mutex); 5144 __close_session(mdsc, session); 5145 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 5146 cleanup_session_requests(mdsc, session); 5147 remove_session_caps(session); 5148 } 5149 mutex_unlock(&session->s_mutex); 5150 ceph_put_mds_session(session); 5151 5152 mutex_lock(&mdsc->mutex); 5153 kick_requests(mdsc, mds); 5154 } 5155 __wake_requests(mdsc, &mdsc->waiting_for_map); 5156 mutex_unlock(&mdsc->mutex); 5157 } 5158 5159 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 5160 { 5161 dout("stop\n"); 5162 /* 5163 * Make sure the delayed work stopped before releasing 5164 * the resources. 5165 * 5166 * Because the cancel_delayed_work_sync() will only 5167 * guarantee that the work finishes executing. But the 5168 * delayed work will re-arm itself again after that. 
5169 */ 5170 flush_delayed_work(&mdsc->delayed_work); 5171 5172 if (mdsc->mdsmap) 5173 ceph_mdsmap_destroy(mdsc->mdsmap); 5174 kfree(mdsc->sessions); 5175 ceph_caps_finalize(mdsc); 5176 ceph_pool_perm_destroy(mdsc); 5177 } 5178 5179 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 5180 { 5181 struct ceph_mds_client *mdsc = fsc->mdsc; 5182 dout("mdsc_destroy %p\n", mdsc); 5183 5184 if (!mdsc) 5185 return; 5186 5187 /* flush out any connection work with references to us */ 5188 ceph_msgr_flush(); 5189 5190 ceph_mdsc_stop(mdsc); 5191 5192 ceph_metric_destroy(&mdsc->metric); 5193 5194 fsc->mdsc = NULL; 5195 kfree(mdsc); 5196 dout("mdsc_destroy %p done\n", mdsc); 5197 } 5198 5199 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 5200 { 5201 struct ceph_fs_client *fsc = mdsc->fsc; 5202 const char *mds_namespace = fsc->mount_options->mds_namespace; 5203 void *p = msg->front.iov_base; 5204 void *end = p + msg->front.iov_len; 5205 u32 epoch; 5206 u32 num_fs; 5207 u32 mount_fscid = (u32)-1; 5208 int err = -EINVAL; 5209 5210 ceph_decode_need(&p, end, sizeof(u32), bad); 5211 epoch = ceph_decode_32(&p); 5212 5213 dout("handle_fsmap epoch %u\n", epoch); 5214 5215 /* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */ 5216 ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad); 5217 5218 ceph_decode_32_safe(&p, end, num_fs, bad); 5219 while (num_fs-- > 0) { 5220 void *info_p, *info_end; 5221 u32 info_len; 5222 u32 fscid, namelen; 5223 5224 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 5225 p += 2; // info_v, info_cv 5226 info_len = ceph_decode_32(&p); 5227 ceph_decode_need(&p, end, info_len, bad); 5228 info_p = p; 5229 info_end = p + info_len; 5230 p = info_end; 5231 5232 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); 5233 fscid = ceph_decode_32(&info_p); 5234 namelen = ceph_decode_32(&info_p); 5235 ceph_decode_need(&info_p, info_end, namelen, bad); 5236 5237 if (mds_namespace && 5238 strlen(mds_namespace) == namelen && 5239 
!strncmp(mds_namespace, (char *)info_p, namelen)) { 5240 mount_fscid = fscid; 5241 break; 5242 } 5243 } 5244 5245 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); 5246 if (mount_fscid != (u32)-1) { 5247 fsc->client->monc.fs_cluster_id = mount_fscid; 5248 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 5249 0, true); 5250 ceph_monc_renew_subs(&fsc->client->monc); 5251 } else { 5252 err = -ENOENT; 5253 goto err_out; 5254 } 5255 return; 5256 5257 bad: 5258 pr_err("error decoding fsmap %d. Shutting down mount.\n", err); 5259 ceph_umount_begin(mdsc->fsc->sb); 5260 err_out: 5261 mutex_lock(&mdsc->mutex); 5262 mdsc->mdsmap_err = err; 5263 __wake_requests(mdsc, &mdsc->waiting_for_map); 5264 mutex_unlock(&mdsc->mutex); 5265 } 5266 5267 /* 5268 * handle mds map update. 5269 */ 5270 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 5271 { 5272 u32 epoch; 5273 u32 maplen; 5274 void *p = msg->front.iov_base; 5275 void *end = p + msg->front.iov_len; 5276 struct ceph_mdsmap *newmap, *oldmap; 5277 struct ceph_fsid fsid; 5278 int err = -EINVAL; 5279 5280 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 5281 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 5282 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 5283 return; 5284 epoch = ceph_decode_32(&p); 5285 maplen = ceph_decode_32(&p); 5286 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 5287 5288 /* do we need it? 
*/ 5289 mutex_lock(&mdsc->mutex); 5290 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 5291 dout("handle_map epoch %u <= our %u\n", 5292 epoch, mdsc->mdsmap->m_epoch); 5293 mutex_unlock(&mdsc->mutex); 5294 return; 5295 } 5296 5297 newmap = ceph_mdsmap_decode(&p, end, ceph_msgr2(mdsc->fsc->client)); 5298 if (IS_ERR(newmap)) { 5299 err = PTR_ERR(newmap); 5300 goto bad_unlock; 5301 } 5302 5303 /* swap into place */ 5304 if (mdsc->mdsmap) { 5305 oldmap = mdsc->mdsmap; 5306 mdsc->mdsmap = newmap; 5307 check_new_map(mdsc, newmap, oldmap); 5308 ceph_mdsmap_destroy(oldmap); 5309 } else { 5310 mdsc->mdsmap = newmap; /* first mds map */ 5311 } 5312 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size, 5313 MAX_LFS_FILESIZE); 5314 5315 __wake_requests(mdsc, &mdsc->waiting_for_map); 5316 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP, 5317 mdsc->mdsmap->m_epoch); 5318 5319 mutex_unlock(&mdsc->mutex); 5320 schedule_delayed(mdsc, 0); 5321 return; 5322 5323 bad_unlock: 5324 mutex_unlock(&mdsc->mutex); 5325 bad: 5326 pr_err("error decoding mdsmap %d. Shutting down mount.\n", err); 5327 ceph_umount_begin(mdsc->fsc->sb); 5328 return; 5329 } 5330 5331 static struct ceph_connection *mds_get_con(struct ceph_connection *con) 5332 { 5333 struct ceph_mds_session *s = con->private; 5334 5335 if (ceph_get_mds_session(s)) 5336 return con; 5337 return NULL; 5338 } 5339 5340 static void mds_put_con(struct ceph_connection *con) 5341 { 5342 struct ceph_mds_session *s = con->private; 5343 5344 ceph_put_mds_session(s); 5345 } 5346 5347 /* 5348 * if the client is unresponsive for long enough, the mds will kill 5349 * the session entirely. 
5350 */ 5351 static void mds_peer_reset(struct ceph_connection *con) 5352 { 5353 struct ceph_mds_session *s = con->private; 5354 struct ceph_mds_client *mdsc = s->s_mdsc; 5355 5356 pr_warn("mds%d closed our session\n", s->s_mds); 5357 if (READ_ONCE(mdsc->fsc->mount_state) != CEPH_MOUNT_FENCE_IO) 5358 send_mds_reconnect(mdsc, s); 5359 } 5360 5361 static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg) 5362 { 5363 struct ceph_mds_session *s = con->private; 5364 struct ceph_mds_client *mdsc = s->s_mdsc; 5365 int type = le16_to_cpu(msg->hdr.type); 5366 5367 mutex_lock(&mdsc->mutex); 5368 if (__verify_registered_session(mdsc, s) < 0) { 5369 mutex_unlock(&mdsc->mutex); 5370 goto out; 5371 } 5372 mutex_unlock(&mdsc->mutex); 5373 5374 switch (type) { 5375 case CEPH_MSG_MDS_MAP: 5376 ceph_mdsc_handle_mdsmap(mdsc, msg); 5377 break; 5378 case CEPH_MSG_FS_MAP_USER: 5379 ceph_mdsc_handle_fsmap(mdsc, msg); 5380 break; 5381 case CEPH_MSG_CLIENT_SESSION: 5382 handle_session(s, msg); 5383 break; 5384 case CEPH_MSG_CLIENT_REPLY: 5385 handle_reply(s, msg); 5386 break; 5387 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 5388 handle_forward(mdsc, s, msg); 5389 break; 5390 case CEPH_MSG_CLIENT_CAPS: 5391 ceph_handle_caps(s, msg); 5392 break; 5393 case CEPH_MSG_CLIENT_SNAP: 5394 ceph_handle_snap(mdsc, s, msg); 5395 break; 5396 case CEPH_MSG_CLIENT_LEASE: 5397 handle_lease(mdsc, s, msg); 5398 break; 5399 case CEPH_MSG_CLIENT_QUOTA: 5400 ceph_handle_quota(mdsc, s, msg); 5401 break; 5402 5403 default: 5404 pr_err("received unknown message type %d %s\n", type, 5405 ceph_msg_type_name(type)); 5406 } 5407 out: 5408 ceph_msg_put(msg); 5409 } 5410 5411 /* 5412 * authentication 5413 */ 5414 5415 /* 5416 * Note: returned pointer is the address of a structure that's 5417 * managed separately. Caller must *not* attempt to free it. 
5418 */ 5419 static struct ceph_auth_handshake * 5420 mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new) 5421 { 5422 struct ceph_mds_session *s = con->private; 5423 struct ceph_mds_client *mdsc = s->s_mdsc; 5424 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5425 struct ceph_auth_handshake *auth = &s->s_auth; 5426 int ret; 5427 5428 ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 5429 force_new, proto, NULL, NULL); 5430 if (ret) 5431 return ERR_PTR(ret); 5432 5433 return auth; 5434 } 5435 5436 static int mds_add_authorizer_challenge(struct ceph_connection *con, 5437 void *challenge_buf, int challenge_buf_len) 5438 { 5439 struct ceph_mds_session *s = con->private; 5440 struct ceph_mds_client *mdsc = s->s_mdsc; 5441 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5442 5443 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer, 5444 challenge_buf, challenge_buf_len); 5445 } 5446 5447 static int mds_verify_authorizer_reply(struct ceph_connection *con) 5448 { 5449 struct ceph_mds_session *s = con->private; 5450 struct ceph_mds_client *mdsc = s->s_mdsc; 5451 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5452 struct ceph_auth_handshake *auth = &s->s_auth; 5453 5454 return ceph_auth_verify_authorizer_reply(ac, auth->authorizer, 5455 auth->authorizer_reply_buf, auth->authorizer_reply_buf_len, 5456 NULL, NULL, NULL, NULL); 5457 } 5458 5459 static int mds_invalidate_authorizer(struct ceph_connection *con) 5460 { 5461 struct ceph_mds_session *s = con->private; 5462 struct ceph_mds_client *mdsc = s->s_mdsc; 5463 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5464 5465 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 5466 5467 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 5468 } 5469 5470 static int mds_get_auth_request(struct ceph_connection *con, 5471 void *buf, int *buf_len, 5472 void **authorizer, int *authorizer_len) 5473 { 5474 struct 
ceph_mds_session *s = con->private; 5475 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5476 struct ceph_auth_handshake *auth = &s->s_auth; 5477 int ret; 5478 5479 ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 5480 buf, buf_len); 5481 if (ret) 5482 return ret; 5483 5484 *authorizer = auth->authorizer_buf; 5485 *authorizer_len = auth->authorizer_buf_len; 5486 return 0; 5487 } 5488 5489 static int mds_handle_auth_reply_more(struct ceph_connection *con, 5490 void *reply, int reply_len, 5491 void *buf, int *buf_len, 5492 void **authorizer, int *authorizer_len) 5493 { 5494 struct ceph_mds_session *s = con->private; 5495 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5496 struct ceph_auth_handshake *auth = &s->s_auth; 5497 int ret; 5498 5499 ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len, 5500 buf, buf_len); 5501 if (ret) 5502 return ret; 5503 5504 *authorizer = auth->authorizer_buf; 5505 *authorizer_len = auth->authorizer_buf_len; 5506 return 0; 5507 } 5508 5509 static int mds_handle_auth_done(struct ceph_connection *con, 5510 u64 global_id, void *reply, int reply_len, 5511 u8 *session_key, int *session_key_len, 5512 u8 *con_secret, int *con_secret_len) 5513 { 5514 struct ceph_mds_session *s = con->private; 5515 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5516 struct ceph_auth_handshake *auth = &s->s_auth; 5517 5518 return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len, 5519 session_key, session_key_len, 5520 con_secret, con_secret_len); 5521 } 5522 5523 static int mds_handle_auth_bad_method(struct ceph_connection *con, 5524 int used_proto, int result, 5525 const int *allowed_protos, int proto_cnt, 5526 const int *allowed_modes, int mode_cnt) 5527 { 5528 struct ceph_mds_session *s = con->private; 5529 struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc; 5530 int ret; 5531 5532 if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS, 5533 
used_proto, result, 5534 allowed_protos, proto_cnt, 5535 allowed_modes, mode_cnt)) { 5536 ret = ceph_monc_validate_auth(monc); 5537 if (ret) 5538 return ret; 5539 } 5540 5541 return -EACCES; 5542 } 5543 5544 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, 5545 struct ceph_msg_header *hdr, int *skip) 5546 { 5547 struct ceph_msg *msg; 5548 int type = (int) le16_to_cpu(hdr->type); 5549 int front_len = (int) le32_to_cpu(hdr->front_len); 5550 5551 if (con->in_msg) 5552 return con->in_msg; 5553 5554 *skip = 0; 5555 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 5556 if (!msg) { 5557 pr_err("unable to allocate msg type %d len %d\n", 5558 type, front_len); 5559 return NULL; 5560 } 5561 5562 return msg; 5563 } 5564 5565 static int mds_sign_message(struct ceph_msg *msg) 5566 { 5567 struct ceph_mds_session *s = msg->con->private; 5568 struct ceph_auth_handshake *auth = &s->s_auth; 5569 5570 return ceph_auth_sign_message(auth, msg); 5571 } 5572 5573 static int mds_check_message_signature(struct ceph_msg *msg) 5574 { 5575 struct ceph_mds_session *s = msg->con->private; 5576 struct ceph_auth_handshake *auth = &s->s_auth; 5577 5578 return ceph_auth_check_message_signature(auth, msg); 5579 } 5580 5581 static const struct ceph_connection_operations mds_con_ops = { 5582 .get = mds_get_con, 5583 .put = mds_put_con, 5584 .alloc_msg = mds_alloc_msg, 5585 .dispatch = mds_dispatch, 5586 .peer_reset = mds_peer_reset, 5587 .get_authorizer = mds_get_authorizer, 5588 .add_authorizer_challenge = mds_add_authorizer_challenge, 5589 .verify_authorizer_reply = mds_verify_authorizer_reply, 5590 .invalidate_authorizer = mds_invalidate_authorizer, 5591 .sign_message = mds_sign_message, 5592 .check_message_signature = mds_check_message_signature, 5593 .get_auth_request = mds_get_auth_request, 5594 .handle_auth_reply_more = mds_handle_auth_reply_more, 5595 .handle_auth_done = mds_handle_auth_done, 5596 .handle_auth_bad_method = mds_handle_auth_bad_method, 5597 }; 5598 5599 /* 
eof */ 5600