// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>
#include <linux/bitmap.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

/*
 * Bundle of a session pointer, cap/realm counters, a pagelist, a message
 * version and a multi-part flag.
 *
 * NOTE(review): no user of this struct is visible in this part of the
 * file; judging by the name it carries state while a reconnect message
 * is being built - confirm against the reconnect code.
 */
struct ceph_reconnect_state {
	struct ceph_mds_session *session;
	int nr_caps, nr_realms;
	struct ceph_pagelist *pagelist;
	unsigned msg_version;
	bool allow_multi;
};

/* forward declarations for helpers defined later in the file */
static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

/*
 * Decode a versioned quota record at *p into info->max_bytes and
 * info->max_files.
 *
 * The record starts with struct_v, struct_compat and a 32-bit length;
 * decoding is confined to that length and *p is left just past the
 * record, so any trailing fields we do not know about are skipped.
 *
 * Returns 0 on success, -EIO if the buffer is too short or the encoding
 * version is not one we understand.
 */
static int parse_reply_info_quota(void **p, void *end,
				  struct ceph_mds_reply_info_in *info)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);
	/* struct_v is expected to be >= 1. we only
	 * understand encoding with struct_compat == 1. */
	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	/* from here on, never read past the end of this record */
	end = *p + struct_len;
	ceph_decode_64_safe(p, end, info->max_bytes, bad);
	ceph_decode_64_safe(p, end, info->max_files, bad);
	*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse individual inode info
 *
 * Decodes one inode record at *p into @info.  info->in, info->symlink,
 * info->xattr_data, info->inline_data and info->pool_ns_data are set to
 * point into the message buffer; nothing is copied except dir_layout,
 * btime and snap_btime.
 *
 * When @features is (u64)-1 the record is wrapped in a
 * struct_v/struct_compat/length header, @end is narrowed to that length
 * and the optional fields are selected by struct_v.  Otherwise the
 * individual CEPH_FEATURE_* bits in @features select which optional
 * fields are present.
 *
 * Returns 0 on success, -EIO on a short or unsupported encoding, or the
 * error returned by parse_reply_info_quota().
 */
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = 0;
	u8 struct_v = 0;

	if (features == (u64)-1) {
		u32 struct_len;
		u8 struct_compat;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	/* fixed-size inode header followed by nsplits fragtree entries */
	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
	info->in = *p;
	*p += sizeof(struct ceph_mds_reply_inode) +
		sizeof(*info->in->fragtree.splits) *
		le32_to_cpu(info->in->fragtree.nsplits);

	/* the advance above is unchecked; the next _safe decode catches overrun */
	ceph_decode_32_safe(p, end, info->symlink_len, bad);
	ceph_decode_need(p, end, info->symlink_len, bad);
	info->symlink = *p;
	*p += info->symlink_len;

	ceph_decode_copy_safe(p, end, &info->dir_layout,
			      sizeof(info->dir_layout), bad);
	ceph_decode_32_safe(p, end, info->xattr_len, bad);
	ceph_decode_need(p, end, info->xattr_len, bad);
	info->xattr_data = *p;
	*p += info->xattr_len;

	if (features == (u64)-1) {
		/* inline data */
		ceph_decode_64_safe(p, end, info->inline_version, bad);
		ceph_decode_32_safe(p, end, info->inline_len, bad);
		ceph_decode_need(p, end, info->inline_len, bad);
		info->inline_data = *p;
		*p += info->inline_len;
		/* quota */
		err = parse_reply_info_quota(p, end, info);
		if (err < 0)
			goto out_bad;
		/*
		 * pool namespace
		 * NOTE(review): pool_ns_data is not written on this path
		 * when pool_ns_len is 0; presumably @info was zeroed by
		 * the caller - confirm.
		 */
		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
		if (info->pool_ns_len > 0) {
			ceph_decode_need(p, end, info->pool_ns_len, bad);
			info->pool_ns_data = *p;
			*p += info->pool_ns_len;
		}

		/* btime */
		ceph_decode_need(p, end, sizeof(info->btime), bad);
		ceph_decode_copy(p, &info->btime, sizeof(info->btime));

		/* change attribute */
		ceph_decode_64_safe(p, end, info->change_attr, bad);

		/* dir pin */
		if (struct_v >= 2) {
			ceph_decode_32_safe(p, end, info->dir_pin, bad);
		} else {
			info->dir_pin = -ENODATA;
		}

		/* snapshot birth time, remains zero for v<=2 */
		if (struct_v >= 3) {
			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
			ceph_decode_copy(p, &info->snap_btime,
					 sizeof(info->snap_btime));
		} else {
			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
		}

		/* snapshot count, remains zero for v<=3 */
		if (struct_v >= 4) {
			ceph_decode_64_safe(p, end, info->rsnaps, bad);
		} else {
			info->rsnaps = 0;
		}

		/* skip whatever newer fields follow inside the record */
		*p = end;
	} else {
		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
			ceph_decode_64_safe(p, end, info->inline_version, bad);
			ceph_decode_32_safe(p, end, info->inline_len, bad);
			ceph_decode_need(p, end, info->inline_len, bad);
			info->inline_data = *p;
			*p += info->inline_len;
		} else
			info->inline_version = CEPH_INLINE_NONE;

		if (features & CEPH_FEATURE_MDS_QUOTA) {
			err = parse_reply_info_quota(p, end, info);
			if (err < 0)
				goto out_bad;
		} else {
			info->max_bytes = 0;
			info->max_files = 0;
		}

		info->pool_ns_len = 0;
		info->pool_ns_data = NULL;
		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
			if (info->pool_ns_len > 0) {
				ceph_decode_need(p, end, info->pool_ns_len, bad);
				info->pool_ns_data = *p;
				*p += info->pool_ns_len;
			}
		}

		if (features & CEPH_FEATURE_FS_BTIME) {
			ceph_decode_need(p, end, sizeof(info->btime), bad);
			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
			ceph_decode_64_safe(p, end, info->change_attr, bad);
		}

		info->dir_pin = -ENODATA;
		/* info->snap_btime and info->rsnaps remain zero */
	}
	return 0;
bad:
	err = -EIO;
out_bad:
	return err;
}

/*
 * Point *dirfrag at the dirfrag record at *p and advance *p past it,
 * including its ndist trailing u32 entries.
 *
 * With @features == (u64)-1 the record carries a versioned header and
 * *p ends up at the end of the encoded length, skipping unknown fields.
 *
 * Returns 0 on success, -EIO on a short or unsupported encoding.
 */
static int parse_reply_info_dir(void **p, void *end,
				struct ceph_mds_reply_dirfrag **dirfrag,
				u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
	*dirfrag = *p;
	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
	/* the ndist array must fit in the buffer as well */
	if (unlikely(*p > end))
		goto bad;
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * Point *lease at the lease record at *p and advance *p past it.
 *
 * With @features == (u64)-1 the record carries a versioned header and
 * *p ends up at the end of the encoded length, skipping unknown fields.
 *
 * Returns 0 on success, -EIO on a short or unsupported encoding.
 */
static int parse_reply_info_lease(void **p, void *end,
				  struct ceph_mds_reply_lease **lease,
				  u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**lease), bad);
	*lease = *p;
	*p += sizeof(**lease);
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 *
 * If info->head->is_dentry is set, the trace holds the directory inode,
 * its dirfrag, the dentry name and the dentry lease, in that order.  If
 * info->head->is_target is set, a target inode follows.  The trace must
 * be consumed exactly: anything left over before @end is an error.
 *
 * Returns 0 on success or a negative error (-EIO for a malformed trace).
 */
static int parse_reply_info_trace(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features)
{
	int err;

	if (info->head->is_dentry) {
		err = parse_reply_info_in(p, end, &info->diri, features);
		if (err < 0)
			goto out_bad;

		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
		if (err < 0)
			goto out_bad;

		/* dentry name: points into the message, not NUL-terminated here */
		ceph_decode_32_safe(p, end, info->dname_len, bad);
		ceph_decode_need(p, end, info->dname_len, bad);
		info->dname = *p;
		*p += info->dname_len;

		err = parse_reply_info_lease(p, end, &info->dlease, features);
		if (err < 0)
			goto out_bad;
	}

	if (info->head->is_target) {
		err = parse_reply_info_in(p, end, &info->targeti, features);
		if (err < 0)
			goto out_bad;
	}

	if (unlikely(*p != end))
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing mds trace %d\n", err);
	return err;
}

/*
 * parse readdir results
 *
 * Decodes the dirfrag, the entry count and the 16-bit flags word, then
 * fills info->dir_entries[] with one name/lease/inode triple per entry.
 * info->dir_entries must already be allocated with info->dir_buf_size
 * bytes; a reply with more entries than fit is rejected.
 *
 * Returns 0 on success (with *p set to @end) or a negative error.
 */
static int parse_reply_info_readdir(void **p, void *end,
				struct ceph_mds_reply_info_parsed *info,
				u64 features)
{
	u32 num, i = 0;
	int err;

	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
	if (err < 0)
		goto out_bad;

	/* u32 entry count followed by u16 flags */
	ceph_decode_need(p, end, sizeof(num) + 2, bad);
	num = ceph_decode_32(p);
	{
		u16 flags = ceph_decode_16(p);
		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
	}
	if (num == 0)
		goto done;

	BUG_ON(!info->dir_entries);
	/* refuse replies that would overflow the preallocated entry buffer */
	if ((unsigned long)(info->dir_entries + num) >
	    (unsigned long)info->dir_entries + info->dir_buf_size) {
		pr_err("dir contents are larger than expected\n");
		WARN_ON(1);
		goto bad;
	}

	info->dir_nr = num;
	while (num) {
		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
		/* dentry */
		ceph_decode_32_safe(p, end, rde->name_len, bad);
		ceph_decode_need(p, end, rde->name_len, bad);
		rde->name = *p;
		*p += rde->name_len;
		dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);

		/* dentry lease */
		err = parse_reply_info_lease(p, end, &rde->lease, features);
		if (err)
			goto out_bad;
		/* inode */
		err = parse_reply_info_in(p, end, &rde->inode, features);
		if (err < 0)
			goto out_bad;
		/* ceph_readdir_prepopulate() will update it */
		rde->offset = 0;
		i++;
		num--;
	}

done:
	/* Skip over any unrecognized fields */
	*p = end;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing dir contents %d\n", err);
	return err;
}

/*
 * parse fcntl F_GETLK results
 *
 * Points info->filelock_reply at the payload after checking that it is
 * at least sizeof(*info->filelock_reply) bytes long.  @features is
 * unused.  Returns 0 on success, -EIO if the payload is too short.
 */
static int parse_reply_info_filelock(void **p, void *end,
				     struct ceph_mds_reply_info_parsed *info,
				     u64 features)
{
	if (*p + sizeof(*info->filelock_reply) > end)
		goto bad;

	info->filelock_reply = *p;

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}


#if BITS_PER_LONG == 64

/* marker value stored in s_delegated_inos for every delegated inode number */
#define DELEGATED_INO_AVAILABLE		xa_mk_value(1)

/*
 * Decode the delegated inode ranges (a count followed by start/len
 * pairs) and record every inode number in s->s_delegated_inos.
 *
 * Ranges starting below CEPH_INO_SYSTEM_BASE are ignored with a warning.
 * An inode number that is already present only triggers a warning.
 *
 * Returns 0 on success, -EIO on a short buffer, or the error from
 * xa_insert() (other than -EBUSY).
 */
static int ceph_parse_deleg_inos(void **p, void *end,
				 struct ceph_mds_session *s)
{
	u32 sets;

	ceph_decode_32_safe(p, end, sets, bad);
	dout("got %u sets of delegated inodes\n", sets);
	while (sets--) {
		u64 start, len;

		ceph_decode_64_safe(p, end, start, bad);
		ceph_decode_64_safe(p, end, len, bad);

		/* Don't accept a delegation of system inodes */
		if (start < CEPH_INO_SYSTEM_BASE) {
			pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
					start, len);
			continue;
		}
		while (len--) {
			/* start is post-incremented, hence "start - 1" in the messages */
			int err = xa_insert(&s->s_delegated_inos, start++,
					    DELEGATED_INO_AVAILABLE,
					    GFP_KERNEL);
			if (!err) {
				dout("added delegated inode 0x%llx\n",
				     start - 1);
			} else if (err == -EBUSY) {
				pr_warn("MDS delegated inode 0x%llx more than once.\n",
					start - 1);
			} else {
				return err;
			}
		}
	}
	return 0;
bad:
	return -EIO;
}

/*
 * Remove and return one delegated inode number from the session.
 *
 * An entry is only returned if the value we erased was
 * DELEGATED_INO_AVAILABLE; otherwise the walk continues.  Returns 0 when
 * no delegated inode number could be claimed.
 */
u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
	unsigned long ino;
	void *val;

	xa_for_each(&s->s_delegated_inos, ino, val) {
		val = xa_erase(&s->s_delegated_inos, ino);
		if (val == DELEGATED_INO_AVAILABLE)
			return ino;
	}
	return 0;
}

/*
 * Put a delegated inode number back into the session's set.
 * Returns the result of xa_insert().
 */
int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
	return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
			 GFP_KERNEL);
}
#else /* BITS_PER_LONG == 64 */
/*
 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
 * and bottom words?
 */
static int ceph_parse_deleg_inos(void **p, void *end,
				 struct ceph_mds_session *s)
{
	u32 sets;

	ceph_decode_32_safe(p, end, sets, bad);
	/* each set is a (start, len) pair of __le64; skip them unread */
	if (sets)
		ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
	return 0;
bad:
	return -EIO;
}

/* no delegated inodes on 32-bit: always reports "none available" */
u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
	return 0;
}

/* no delegated inodes on 32-bit: nothing to restore */
int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
	return 0;
}
#endif /* BITS_PER_LONG == 64 */

/*
 * parse create results
 *
 * When the reply may carry the created inode number (@features is
 * (u64)-1 or has CEPH_FEATURE_REPLY_CREATE_INODE), decode it into
 * info->ino and set info->has_create_ino.  If the session has
 * CEPHFS_FEATURE_DELEG_INO, the payload is versioned and is followed by
 * delegated inode ranges, which are handed to ceph_parse_deleg_inos().
 * Without the create-inode feature the payload must be empty.
 *
 * Returns 0 on success, -EIO on a malformed payload, or the error from
 * ceph_parse_deleg_inos().
 */
static int parse_reply_info_create(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features, struct ceph_mds_session *s)
{
	int ret;

	if (features == (u64)-1 ||
	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
		if (*p == end) {
			/* Malformed reply? */
			info->has_create_ino = false;
		} else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
			info->has_create_ino = true;
			/* struct_v, struct_compat, and len */
			ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
			ceph_decode_64_safe(p, end, info->ino, bad);
			ret = ceph_parse_deleg_inos(p, end, s);
			if (ret)
				return ret;
		} else {
			/* legacy */
			ceph_decode_64_safe(p, end, info->ino, bad);
			info->has_create_ino = true;
		}
	} else {
		if (*p != end)
			goto bad;
	}

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * Parse a getvxattr reply: skip the two version bytes and the payload
 * length, then read the value length.  The value must occupy exactly the
 * rest of the payload.
 *
 * On success info->xattr_info points at the value inside the message and
 * the value length is returned (note: not 0).  Returns -EIO on a short
 * buffer or a length mismatch.  @features is unused.
 */
static int parse_reply_info_getvxattr(void **p, void *end,
				      struct ceph_mds_reply_info_parsed *info,
				      u64 features)
{
	u32 value_len;

	ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */
	ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */
	ceph_decode_skip_32(p, end, bad); /* skip payload length */

	ceph_decode_32_safe(p, end, value_len, bad);

	if (value_len == end - *p) {
	  info->xattr_info.xattr_value = *p;
	  info->xattr_info.xattr_value_len = value_len;
	  *p = end;
	  return value_len;
	}
bad:
	return -EIO;
}

/*
 * parse extra results
 *
 * Dispatches on the request op recorded in the reply head to the
 * matching op-specific parser.  Any other op with a non-empty "extra"
 * section is rejected with -EIO.
 */
static int parse_reply_info_extra(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features, struct ceph_mds_session *s)
{
	u32 op = le32_to_cpu(info->head->op);

	if (op == CEPH_MDS_OP_GETFILELOCK)
		return parse_reply_info_filelock(p, end, info, features);
	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
		return parse_reply_info_readdir(p, end, info, features);
	else if (op == CEPH_MDS_OP_CREATE)
		return parse_reply_info_create(p, end, info, features, s);
	else if (op == CEPH_MDS_OP_GETVXATTR)
		return parse_reply_info_getvxattr(p, end, info, features);
	else
		return -EIO;
}

/*
 * parse entire mds reply
603 */ 604 static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg, 605 struct ceph_mds_reply_info_parsed *info, 606 u64 features) 607 { 608 void *p, *end; 609 u32 len; 610 int err; 611 612 info->head = msg->front.iov_base; 613 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head); 614 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head); 615 616 /* trace */ 617 ceph_decode_32_safe(&p, end, len, bad); 618 if (len > 0) { 619 ceph_decode_need(&p, end, len, bad); 620 err = parse_reply_info_trace(&p, p+len, info, features); 621 if (err < 0) 622 goto out_bad; 623 } 624 625 /* extra */ 626 ceph_decode_32_safe(&p, end, len, bad); 627 if (len > 0) { 628 ceph_decode_need(&p, end, len, bad); 629 err = parse_reply_info_extra(&p, p+len, info, features, s); 630 if (err < 0) 631 goto out_bad; 632 } 633 634 /* snap blob */ 635 ceph_decode_32_safe(&p, end, len, bad); 636 info->snapblob_len = len; 637 info->snapblob = p; 638 p += len; 639 640 if (p != end) 641 goto bad; 642 return 0; 643 644 bad: 645 err = -EIO; 646 out_bad: 647 pr_err("mds parse_reply err %d\n", err); 648 ceph_msg_dump(msg); 649 return err; 650 } 651 652 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info) 653 { 654 if (!info->dir_entries) 655 return; 656 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size)); 657 } 658 659 /* 660 * In async unlink case the kclient won't wait for the first reply 661 * from MDS and just drop all the links and unhash the dentry and then 662 * succeeds immediately. 663 * 664 * For any new create/link/rename,etc requests followed by using the 665 * same file names we must wait for the first reply of the inflight 666 * unlink request, or the MDS possibly will fail these following 667 * requests with -EEXIST if the inflight async unlink request was 668 * delayed for some reasons. 
669 * 670 * And the worst case is that for the none async openc request it will 671 * successfully open the file if the CDentry hasn't been unlinked yet, 672 * but later the previous delayed async unlink request will remove the 673 * CDenty. That means the just created file is possiblly deleted later 674 * by accident. 675 * 676 * We need to wait for the inflight async unlink requests to finish 677 * when creating new files/directories by using the same file names. 678 */ 679 int ceph_wait_on_conflict_unlink(struct dentry *dentry) 680 { 681 struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); 682 struct dentry *pdentry = dentry->d_parent; 683 struct dentry *udentry, *found = NULL; 684 struct ceph_dentry_info *di; 685 struct qstr dname; 686 u32 hash = dentry->d_name.hash; 687 int err; 688 689 dname.name = dentry->d_name.name; 690 dname.len = dentry->d_name.len; 691 692 rcu_read_lock(); 693 hash_for_each_possible_rcu(fsc->async_unlink_conflict, di, 694 hnode, hash) { 695 udentry = di->dentry; 696 697 spin_lock(&udentry->d_lock); 698 if (udentry->d_name.hash != hash) 699 goto next; 700 if (unlikely(udentry->d_parent != pdentry)) 701 goto next; 702 if (!hash_hashed(&di->hnode)) 703 goto next; 704 705 if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags)) 706 pr_warn("%s dentry %p:%pd async unlink bit is not set\n", 707 __func__, dentry, dentry); 708 709 if (!d_same_name(udentry, pdentry, &dname)) 710 goto next; 711 712 spin_unlock(&udentry->d_lock); 713 found = dget(udentry); 714 break; 715 next: 716 spin_unlock(&udentry->d_lock); 717 } 718 rcu_read_unlock(); 719 720 if (likely(!found)) 721 return 0; 722 723 dout("%s dentry %p:%pd conflict with old %p:%pd\n", __func__, 724 dentry, dentry, found, found); 725 726 err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT, 727 TASK_KILLABLE); 728 dput(found); 729 return err; 730 } 731 732 733 /* 734 * sessions 735 */ 736 const char *ceph_session_state_name(int s) 737 { 738 switch (s) { 739 case 
CEPH_MDS_SESSION_NEW: return "new"; 740 case CEPH_MDS_SESSION_OPENING: return "opening"; 741 case CEPH_MDS_SESSION_OPEN: return "open"; 742 case CEPH_MDS_SESSION_HUNG: return "hung"; 743 case CEPH_MDS_SESSION_CLOSING: return "closing"; 744 case CEPH_MDS_SESSION_CLOSED: return "closed"; 745 case CEPH_MDS_SESSION_RESTARTING: return "restarting"; 746 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting"; 747 case CEPH_MDS_SESSION_REJECTED: return "rejected"; 748 default: return "???"; 749 } 750 } 751 752 struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s) 753 { 754 if (refcount_inc_not_zero(&s->s_ref)) 755 return s; 756 return NULL; 757 } 758 759 void ceph_put_mds_session(struct ceph_mds_session *s) 760 { 761 if (IS_ERR_OR_NULL(s)) 762 return; 763 764 if (refcount_dec_and_test(&s->s_ref)) { 765 if (s->s_auth.authorizer) 766 ceph_auth_destroy_authorizer(s->s_auth.authorizer); 767 WARN_ON(mutex_is_locked(&s->s_mutex)); 768 xa_destroy(&s->s_delegated_inos); 769 kfree(s); 770 } 771 } 772 773 /* 774 * called under mdsc->mutex 775 */ 776 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc, 777 int mds) 778 { 779 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 780 return NULL; 781 return ceph_get_mds_session(mdsc->sessions[mds]); 782 } 783 784 static bool __have_session(struct ceph_mds_client *mdsc, int mds) 785 { 786 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds]) 787 return false; 788 else 789 return true; 790 } 791 792 static int __verify_registered_session(struct ceph_mds_client *mdsc, 793 struct ceph_mds_session *s) 794 { 795 if (s->s_mds >= mdsc->max_sessions || 796 mdsc->sessions[s->s_mds] != s) 797 return -ENOENT; 798 return 0; 799 } 800 801 /* 802 * create+register a new session for given mds. 803 * called under mdsc->mutex. 
804 */ 805 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, 806 int mds) 807 { 808 struct ceph_mds_session *s; 809 810 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) 811 return ERR_PTR(-EIO); 812 813 if (mds >= mdsc->mdsmap->possible_max_rank) 814 return ERR_PTR(-EINVAL); 815 816 s = kzalloc(sizeof(*s), GFP_NOFS); 817 if (!s) 818 return ERR_PTR(-ENOMEM); 819 820 if (mds >= mdsc->max_sessions) { 821 int newmax = 1 << get_count_order(mds + 1); 822 struct ceph_mds_session **sa; 823 824 dout("%s: realloc to %d\n", __func__, newmax); 825 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); 826 if (!sa) 827 goto fail_realloc; 828 if (mdsc->sessions) { 829 memcpy(sa, mdsc->sessions, 830 mdsc->max_sessions * sizeof(void *)); 831 kfree(mdsc->sessions); 832 } 833 mdsc->sessions = sa; 834 mdsc->max_sessions = newmax; 835 } 836 837 dout("%s: mds%d\n", __func__, mds); 838 s->s_mdsc = mdsc; 839 s->s_mds = mds; 840 s->s_state = CEPH_MDS_SESSION_NEW; 841 mutex_init(&s->s_mutex); 842 843 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr); 844 845 atomic_set(&s->s_cap_gen, 1); 846 s->s_cap_ttl = jiffies - 1; 847 848 spin_lock_init(&s->s_cap_lock); 849 INIT_LIST_HEAD(&s->s_caps); 850 refcount_set(&s->s_ref, 1); 851 INIT_LIST_HEAD(&s->s_waiting); 852 INIT_LIST_HEAD(&s->s_unsafe); 853 xa_init(&s->s_delegated_inos); 854 INIT_LIST_HEAD(&s->s_cap_releases); 855 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work); 856 857 INIT_LIST_HEAD(&s->s_cap_dirty); 858 INIT_LIST_HEAD(&s->s_cap_flushing); 859 860 mdsc->sessions[mds] = s; 861 atomic_inc(&mdsc->num_sessions); 862 refcount_inc(&s->s_ref); /* one ref to sessions[], one to caller */ 863 864 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds, 865 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 866 867 return s; 868 869 fail_realloc: 870 kfree(s); 871 return ERR_PTR(-ENOMEM); 872 } 873 874 /* 875 * called under mdsc->mutex 876 */ 877 static void __unregister_session(struct 
ceph_mds_client *mdsc, 878 struct ceph_mds_session *s) 879 { 880 dout("__unregister_session mds%d %p\n", s->s_mds, s); 881 BUG_ON(mdsc->sessions[s->s_mds] != s); 882 mdsc->sessions[s->s_mds] = NULL; 883 ceph_con_close(&s->s_con); 884 ceph_put_mds_session(s); 885 atomic_dec(&mdsc->num_sessions); 886 } 887 888 /* 889 * drop session refs in request. 890 * 891 * should be last request ref, or hold mdsc->mutex 892 */ 893 static void put_request_session(struct ceph_mds_request *req) 894 { 895 if (req->r_session) { 896 ceph_put_mds_session(req->r_session); 897 req->r_session = NULL; 898 } 899 } 900 901 void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc, 902 void (*cb)(struct ceph_mds_session *), 903 bool check_state) 904 { 905 int mds; 906 907 mutex_lock(&mdsc->mutex); 908 for (mds = 0; mds < mdsc->max_sessions; ++mds) { 909 struct ceph_mds_session *s; 910 911 s = __ceph_lookup_mds_session(mdsc, mds); 912 if (!s) 913 continue; 914 915 if (check_state && !check_session_state(s)) { 916 ceph_put_mds_session(s); 917 continue; 918 } 919 920 mutex_unlock(&mdsc->mutex); 921 cb(s); 922 ceph_put_mds_session(s); 923 mutex_lock(&mdsc->mutex); 924 } 925 mutex_unlock(&mdsc->mutex); 926 } 927 928 void ceph_mdsc_release_request(struct kref *kref) 929 { 930 struct ceph_mds_request *req = container_of(kref, 931 struct ceph_mds_request, 932 r_kref); 933 ceph_mdsc_release_dir_caps_no_check(req); 934 destroy_reply_info(&req->r_reply_info); 935 if (req->r_request) 936 ceph_msg_put(req->r_request); 937 if (req->r_reply) 938 ceph_msg_put(req->r_reply); 939 if (req->r_inode) { 940 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 941 iput(req->r_inode); 942 } 943 if (req->r_parent) { 944 ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN); 945 iput(req->r_parent); 946 } 947 iput(req->r_target_inode); 948 if (req->r_dentry) 949 dput(req->r_dentry); 950 if (req->r_old_dentry) 951 dput(req->r_old_dentry); 952 if (req->r_old_dentry_dir) { 953 /* 954 * track (and drop pins 
for) r_old_dentry_dir 955 * separately, since r_old_dentry's d_parent may have 956 * changed between the dir mutex being dropped and 957 * this request being freed. 958 */ 959 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), 960 CEPH_CAP_PIN); 961 iput(req->r_old_dentry_dir); 962 } 963 kfree(req->r_path1); 964 kfree(req->r_path2); 965 put_cred(req->r_cred); 966 if (req->r_pagelist) 967 ceph_pagelist_release(req->r_pagelist); 968 put_request_session(req); 969 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); 970 WARN_ON_ONCE(!list_empty(&req->r_wait)); 971 kmem_cache_free(ceph_mds_request_cachep, req); 972 } 973 974 DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node) 975 976 /* 977 * lookup session, bump ref if found. 978 * 979 * called under mdsc->mutex. 980 */ 981 static struct ceph_mds_request * 982 lookup_get_request(struct ceph_mds_client *mdsc, u64 tid) 983 { 984 struct ceph_mds_request *req; 985 986 req = lookup_request(&mdsc->request_tree, tid); 987 if (req) 988 ceph_mdsc_get_request(req); 989 990 return req; 991 } 992 993 /* 994 * Register an in-flight request, and assign a tid. Link to directory 995 * are modifying (if any). 996 * 997 * Called under mdsc->mutex. 
998 */ 999 static void __register_request(struct ceph_mds_client *mdsc, 1000 struct ceph_mds_request *req, 1001 struct inode *dir) 1002 { 1003 int ret = 0; 1004 1005 req->r_tid = ++mdsc->last_tid; 1006 if (req->r_num_caps) { 1007 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, 1008 req->r_num_caps); 1009 if (ret < 0) { 1010 pr_err("__register_request %p " 1011 "failed to reserve caps: %d\n", req, ret); 1012 /* set req->r_err to fail early from __do_request */ 1013 req->r_err = ret; 1014 return; 1015 } 1016 } 1017 dout("__register_request %p tid %lld\n", req, req->r_tid); 1018 ceph_mdsc_get_request(req); 1019 insert_request(&mdsc->request_tree, req); 1020 1021 req->r_cred = get_current_cred(); 1022 1023 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) 1024 mdsc->oldest_tid = req->r_tid; 1025 1026 if (dir) { 1027 struct ceph_inode_info *ci = ceph_inode(dir); 1028 1029 ihold(dir); 1030 req->r_unsafe_dir = dir; 1031 spin_lock(&ci->i_unsafe_lock); 1032 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 1033 spin_unlock(&ci->i_unsafe_lock); 1034 } 1035 } 1036 1037 static void __unregister_request(struct ceph_mds_client *mdsc, 1038 struct ceph_mds_request *req) 1039 { 1040 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 1041 1042 /* Never leave an unregistered request on an unsafe list! 
*/ 1043 list_del_init(&req->r_unsafe_item); 1044 1045 if (req->r_tid == mdsc->oldest_tid) { 1046 struct rb_node *p = rb_next(&req->r_node); 1047 mdsc->oldest_tid = 0; 1048 while (p) { 1049 struct ceph_mds_request *next_req = 1050 rb_entry(p, struct ceph_mds_request, r_node); 1051 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 1052 mdsc->oldest_tid = next_req->r_tid; 1053 break; 1054 } 1055 p = rb_next(p); 1056 } 1057 } 1058 1059 erase_request(&mdsc->request_tree, req); 1060 1061 if (req->r_unsafe_dir) { 1062 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 1063 spin_lock(&ci->i_unsafe_lock); 1064 list_del_init(&req->r_unsafe_dir_item); 1065 spin_unlock(&ci->i_unsafe_lock); 1066 } 1067 if (req->r_target_inode && 1068 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 1069 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 1070 spin_lock(&ci->i_unsafe_lock); 1071 list_del_init(&req->r_unsafe_target_item); 1072 spin_unlock(&ci->i_unsafe_lock); 1073 } 1074 1075 if (req->r_unsafe_dir) { 1076 iput(req->r_unsafe_dir); 1077 req->r_unsafe_dir = NULL; 1078 } 1079 1080 complete_all(&req->r_safe_completion); 1081 1082 ceph_mdsc_put_request(req); 1083 } 1084 1085 /* 1086 * Walk back up the dentry tree until we hit a dentry representing a 1087 * non-snapshot inode. We do this using the rcu_read_lock (which must be held 1088 * when calling this) to ensure that the objects won't disappear while we're 1089 * working with them. Once we hit a candidate dentry, we attempt to take a 1090 * reference to it, and return that as the result. 1091 */ 1092 static struct inode *get_nonsnap_parent(struct dentry *dentry) 1093 { 1094 struct inode *inode = NULL; 1095 1096 while (dentry && !IS_ROOT(dentry)) { 1097 inode = d_inode_rcu(dentry); 1098 if (!inode || ceph_snap(inode) == CEPH_NOSNAP) 1099 break; 1100 dentry = dentry->d_parent; 1101 } 1102 if (inode) 1103 inode = igrab(inode); 1104 return inode; 1105 } 1106 1107 /* 1108 * Choose mds to send request to next. 
 * If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * If @random is non-NULL, *random is set to true when the result came
 * from ceph_mdsmap_get_random_mds() and to false otherwise.
 *
 * Returns the chosen mds rank (whatever ceph_mdsmap_get_random_mds()
 * returns on the random path).
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req,
			bool *random)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_cap *cap;
	int mode = req->r_direct_mode;
	int mds = -1;
	u32 hash = req->r_direct_hash;
	bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);

	if (random)
		*random = false;

	/*
	 * is there a specific mds we should try?  ignore hint if we have
	 * no session and the mds is not up (active or recovering).
	 */
	if (req->r_resend_mds >= 0 &&
	    (__have_session(mdsc, req->r_resend_mds) ||
	     ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
		dout("%s using resend_mds mds%d\n", __func__,
		     req->r_resend_mds);
		return req->r_resend_mds;
	}

	if (mode == USE_RANDOM_MDS)
		goto random;

	/*
	 * Pick the inode whose frag tree / caps will steer the request.
	 * Every branch below that leaves @inode non-NULL also holds a
	 * reference on it, which is dropped before returning.
	 */
	inode = NULL;
	if (req->r_inode) {
		if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
			inode = req->r_inode;
			ihold(inode);
		} else {
			/* req->r_dentry is non-null for LSSNAP request */
			rcu_read_lock();
			inode = get_nonsnap_parent(req->r_dentry);
			rcu_read_unlock();
			dout("%s using snapdir's parent %p\n", __func__, inode);
		}
	} else if (req->r_dentry) {
		/* ignore race with rename; old or new d_parent is okay */
		struct dentry *parent;
		struct inode *dir;

		rcu_read_lock();
		parent = READ_ONCE(req->r_dentry->d_parent);
		/* prefer the parent recorded in the request, if there is one */
		dir = req->r_parent ? : d_inode_rcu(parent);

		if (!dir || dir->i_sb != mdsc->fsc->sb) {
			/*  not this fs or parent went negative */
			inode = d_inode(req->r_dentry);
			if (inode)
				ihold(inode);
		} else if (ceph_snap(dir) != CEPH_NOSNAP) {
			/* direct snapped/virtual snapdir requests
			 * based on parent dir inode */
			inode = get_nonsnap_parent(parent);
			dout("%s using nonsnap parent %p\n", __func__, inode);
		} else {
			/* dentry target */
			inode = d_inode(req->r_dentry);
			if (!inode || mode == USE_AUTH_MDS) {
				/* dir + name */
				inode = igrab(dir);
				hash = ceph_dentry_hash(dir, req->r_dentry);
				is_hash = true;
			} else {
				ihold(inode);
			}
		}
		rcu_read_unlock();
	}

	dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
	     hash, mode);
	if (!inode)
		goto random;
	ci = ceph_inode(inode);

	/* for a hashed lookup in a directory, consult the frag tree first */
	if (is_hash && S_ISDIR(inode->i_mode)) {
		struct ceph_inode_frag frag;
		int found;

		ceph_choose_frag(ci, hash, &frag, &found);
		if (found) {
			if (mode == USE_ANY_MDS && frag.ndist > 0) {
				u8 r;

				/* choose a random replica */
				get_random_bytes(&r, 1);
				r %= frag.ndist;
				mds = frag.dist[r];
				dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
				     __func__, inode, ceph_vinop(inode),
				     frag.frag, mds, (int)r, frag.ndist);
				/* only use it if the replica is active and not laggy */
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE &&
				    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
					goto out;
			}

			/* since this file/dir wasn't known to be
			 * replicated, then we want to look for the
			 * authoritative mds. */
			if (frag.mds >= 0) {
				/* choose auth mds */
				mds = frag.mds;
				dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
				     __func__, inode, ceph_vinop(inode),
				     frag.frag, mds);
				if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
				    CEPH_MDS_STATE_ACTIVE) {
					if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
								  mds))
						goto out;
				}
			}
			/* frag tree gave no usable mds: fall back to the auth cap */
			mode = USE_AUTH_MDS;
		}
	}

	/* otherwise go by caps: the auth cap if wanted, else the first cap */
	spin_lock(&ci->i_ceph_lock);
	cap = NULL;
	if (mode == USE_AUTH_MDS)
		cap = ci->i_auth_cap;
	if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
		cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
	if (!cap) {
		spin_unlock(&ci->i_ceph_lock);
		iput(inode);
		goto random;
	}
	mds = cap->session->s_mds;
	dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
	     inode, ceph_vinop(inode), mds,
	     cap == ci->i_auth_cap ? "auth " : "", cap);
	spin_unlock(&ci->i_ceph_lock);
out:
	iput(inode);
	return mds;

random:
	if (random)
		*random = true;

	mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
	dout("%s chose random mds%d\n", __func__, mds);
	return mds;
}


/*
 * session messages
 */

/*
 * Allocate a CEPH_MSG_CLIENT_SESSION message whose front is a
 * ceph_mds_session_head and set its op and seq fields (little-endian).
 * Returns the message, or NULL (after logging) if allocation fails.
 */
struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
{
	struct ceph_msg *msg;
	struct ceph_mds_session_head *h;

	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
			   false);
	if (!msg) {
		pr_err("ENOMEM creating session %s msg\n",
		       ceph_session_op_name(op));
		return NULL;
	}
	h = msg->front.iov_base;
	h->op = cpu_to_le32(op);
	h->seq = cpu_to_le64(seq);

	return msg;
}

static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
/*
 * Bytes needed for a bitmap covering bits 0..feature_bits[c - 1],
 * rounded up to a multiple of 8 bytes (64 bits).
 * NOTE(review): this is the full bitmap size only if the last entry of
 * feature_bits is the highest bit number - confirm the list is sorted.
 */
#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
static int encode_supported_features(void **p, void *end)
{
	static const size_t count = ARRAY_SIZE(feature_bits);
1297 if (count > 0) { 1298 size_t i; 1299 size_t size = FEATURE_BYTES(count); 1300 unsigned long bit; 1301 1302 if (WARN_ON_ONCE(*p + 4 + size > end)) 1303 return -ERANGE; 1304 1305 ceph_encode_32(p, size); 1306 memset(*p, 0, size); 1307 for (i = 0; i < count; i++) { 1308 bit = feature_bits[i]; 1309 ((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8); 1310 } 1311 *p += size; 1312 } else { 1313 if (WARN_ON_ONCE(*p + 4 > end)) 1314 return -ERANGE; 1315 1316 ceph_encode_32(p, 0); 1317 } 1318 1319 return 0; 1320 } 1321 1322 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED; 1323 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8) 1324 static int encode_metric_spec(void **p, void *end) 1325 { 1326 static const size_t count = ARRAY_SIZE(metric_bits); 1327 1328 /* header */ 1329 if (WARN_ON_ONCE(*p + 2 > end)) 1330 return -ERANGE; 1331 1332 ceph_encode_8(p, 1); /* version */ 1333 ceph_encode_8(p, 1); /* compat */ 1334 1335 if (count > 0) { 1336 size_t i; 1337 size_t size = METRIC_BYTES(count); 1338 1339 if (WARN_ON_ONCE(*p + 4 + 4 + size > end)) 1340 return -ERANGE; 1341 1342 /* metric spec info length */ 1343 ceph_encode_32(p, 4 + size); 1344 1345 /* metric spec */ 1346 ceph_encode_32(p, size); 1347 memset(*p, 0, size); 1348 for (i = 0; i < count; i++) 1349 ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8); 1350 *p += size; 1351 } else { 1352 if (WARN_ON_ONCE(*p + 4 + 4 > end)) 1353 return -ERANGE; 1354 1355 /* metric spec info length */ 1356 ceph_encode_32(p, 4); 1357 /* metric spec */ 1358 ceph_encode_32(p, 0); 1359 } 1360 1361 return 0; 1362 } 1363 1364 /* 1365 * session message, specialization for CEPH_SESSION_REQUEST_OPEN 1366 * to include additional client metadata fields. 
1367 */ 1368 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq) 1369 { 1370 struct ceph_msg *msg; 1371 struct ceph_mds_session_head *h; 1372 int i; 1373 int extra_bytes = 0; 1374 int metadata_key_count = 0; 1375 struct ceph_options *opt = mdsc->fsc->client->options; 1376 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; 1377 size_t size, count; 1378 void *p, *end; 1379 int ret; 1380 1381 const char* metadata[][2] = { 1382 {"hostname", mdsc->nodename}, 1383 {"kernel_version", init_utsname()->release}, 1384 {"entity_id", opt->name ? : ""}, 1385 {"root", fsopt->server_path ? : "/"}, 1386 {NULL, NULL} 1387 }; 1388 1389 /* Calculate serialized length of metadata */ 1390 extra_bytes = 4; /* map length */ 1391 for (i = 0; metadata[i][0]; ++i) { 1392 extra_bytes += 8 + strlen(metadata[i][0]) + 1393 strlen(metadata[i][1]); 1394 metadata_key_count++; 1395 } 1396 1397 /* supported feature */ 1398 size = 0; 1399 count = ARRAY_SIZE(feature_bits); 1400 if (count > 0) 1401 size = FEATURE_BYTES(count); 1402 extra_bytes += 4 + size; 1403 1404 /* metric spec */ 1405 size = 0; 1406 count = ARRAY_SIZE(metric_bits); 1407 if (count > 0) 1408 size = METRIC_BYTES(count); 1409 extra_bytes += 2 + 4 + 4 + size; 1410 1411 /* Allocate the message */ 1412 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, 1413 GFP_NOFS, false); 1414 if (!msg) { 1415 pr_err("ENOMEM creating session open msg\n"); 1416 return ERR_PTR(-ENOMEM); 1417 } 1418 p = msg->front.iov_base; 1419 end = p + msg->front.iov_len; 1420 1421 h = p; 1422 h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN); 1423 h->seq = cpu_to_le64(seq); 1424 1425 /* 1426 * Serialize client metadata into waiting buffer space, using 1427 * the format that userspace expects for map<string, string> 1428 * 1429 * ClientSession messages with metadata are v4 1430 */ 1431 msg->hdr.version = cpu_to_le16(4); 1432 msg->hdr.compat_version = cpu_to_le16(1); 1433 1434 /* The write pointer, following 
the session_head structure */ 1435 p += sizeof(*h); 1436 1437 /* Number of entries in the map */ 1438 ceph_encode_32(&p, metadata_key_count); 1439 1440 /* Two length-prefixed strings for each entry in the map */ 1441 for (i = 0; metadata[i][0]; ++i) { 1442 size_t const key_len = strlen(metadata[i][0]); 1443 size_t const val_len = strlen(metadata[i][1]); 1444 1445 ceph_encode_32(&p, key_len); 1446 memcpy(p, metadata[i][0], key_len); 1447 p += key_len; 1448 ceph_encode_32(&p, val_len); 1449 memcpy(p, metadata[i][1], val_len); 1450 p += val_len; 1451 } 1452 1453 ret = encode_supported_features(&p, end); 1454 if (ret) { 1455 pr_err("encode_supported_features failed!\n"); 1456 ceph_msg_put(msg); 1457 return ERR_PTR(ret); 1458 } 1459 1460 ret = encode_metric_spec(&p, end); 1461 if (ret) { 1462 pr_err("encode_metric_spec failed!\n"); 1463 ceph_msg_put(msg); 1464 return ERR_PTR(ret); 1465 } 1466 1467 msg->front.iov_len = p - msg->front.iov_base; 1468 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1469 1470 return msg; 1471 } 1472 1473 /* 1474 * send session open request. 1475 * 1476 * called under mdsc->mutex 1477 */ 1478 static int __open_session(struct ceph_mds_client *mdsc, 1479 struct ceph_mds_session *session) 1480 { 1481 struct ceph_msg *msg; 1482 int mstate; 1483 int mds = session->s_mds; 1484 1485 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) 1486 return -EIO; 1487 1488 /* wait for mds to go active? 
*/ 1489 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 1490 dout("open_session to mds%d (%s)\n", mds, 1491 ceph_mds_state_name(mstate)); 1492 session->s_state = CEPH_MDS_SESSION_OPENING; 1493 session->s_renew_requested = jiffies; 1494 1495 /* send connect message */ 1496 msg = create_session_open_msg(mdsc, session->s_seq); 1497 if (IS_ERR(msg)) 1498 return PTR_ERR(msg); 1499 ceph_con_send(&session->s_con, msg); 1500 return 0; 1501 } 1502 1503 /* 1504 * open sessions for any export targets for the given mds 1505 * 1506 * called under mdsc->mutex 1507 */ 1508 static struct ceph_mds_session * 1509 __open_export_target_session(struct ceph_mds_client *mdsc, int target) 1510 { 1511 struct ceph_mds_session *session; 1512 int ret; 1513 1514 session = __ceph_lookup_mds_session(mdsc, target); 1515 if (!session) { 1516 session = register_session(mdsc, target); 1517 if (IS_ERR(session)) 1518 return session; 1519 } 1520 if (session->s_state == CEPH_MDS_SESSION_NEW || 1521 session->s_state == CEPH_MDS_SESSION_CLOSING) { 1522 ret = __open_session(mdsc, session); 1523 if (ret) 1524 return ERR_PTR(ret); 1525 } 1526 1527 return session; 1528 } 1529 1530 struct ceph_mds_session * 1531 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) 1532 { 1533 struct ceph_mds_session *session; 1534 1535 dout("open_export_target_session to mds%d\n", target); 1536 1537 mutex_lock(&mdsc->mutex); 1538 session = __open_export_target_session(mdsc, target); 1539 mutex_unlock(&mdsc->mutex); 1540 1541 return session; 1542 } 1543 1544 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 1545 struct ceph_mds_session *session) 1546 { 1547 struct ceph_mds_info *mi; 1548 struct ceph_mds_session *ts; 1549 int i, mds = session->s_mds; 1550 1551 if (mds >= mdsc->mdsmap->possible_max_rank) 1552 return; 1553 1554 mi = &mdsc->mdsmap->m_info[mds]; 1555 dout("open_export_target_sessions for mds%d (%d targets)\n", 1556 session->s_mds, mi->num_export_targets); 1557 1558 
for (i = 0; i < mi->num_export_targets; i++) { 1559 ts = __open_export_target_session(mdsc, mi->export_targets[i]); 1560 ceph_put_mds_session(ts); 1561 } 1562 } 1563 1564 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 1565 struct ceph_mds_session *session) 1566 { 1567 mutex_lock(&mdsc->mutex); 1568 __open_export_target_sessions(mdsc, session); 1569 mutex_unlock(&mdsc->mutex); 1570 } 1571 1572 /* 1573 * session caps 1574 */ 1575 1576 static void detach_cap_releases(struct ceph_mds_session *session, 1577 struct list_head *target) 1578 { 1579 lockdep_assert_held(&session->s_cap_lock); 1580 1581 list_splice_init(&session->s_cap_releases, target); 1582 session->s_num_cap_releases = 0; 1583 dout("dispose_cap_releases mds%d\n", session->s_mds); 1584 } 1585 1586 static void dispose_cap_releases(struct ceph_mds_client *mdsc, 1587 struct list_head *dispose) 1588 { 1589 while (!list_empty(dispose)) { 1590 struct ceph_cap *cap; 1591 /* zero out the in-progress message */ 1592 cap = list_first_entry(dispose, struct ceph_cap, session_caps); 1593 list_del(&cap->session_caps); 1594 ceph_put_cap(mdsc, cap); 1595 } 1596 } 1597 1598 static void cleanup_session_requests(struct ceph_mds_client *mdsc, 1599 struct ceph_mds_session *session) 1600 { 1601 struct ceph_mds_request *req; 1602 struct rb_node *p; 1603 1604 dout("cleanup_session_requests mds%d\n", session->s_mds); 1605 mutex_lock(&mdsc->mutex); 1606 while (!list_empty(&session->s_unsafe)) { 1607 req = list_first_entry(&session->s_unsafe, 1608 struct ceph_mds_request, r_unsafe_item); 1609 pr_warn_ratelimited(" dropping unsafe request %llu\n", 1610 req->r_tid); 1611 if (req->r_target_inode) 1612 mapping_set_error(req->r_target_inode->i_mapping, -EIO); 1613 if (req->r_unsafe_dir) 1614 mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO); 1615 __unregister_request(mdsc, req); 1616 } 1617 /* zero r_attempts, so kick_requests() will re-send requests */ 1618 p = rb_first(&mdsc->request_tree); 1619 while (p) { 
1620 req = rb_entry(p, struct ceph_mds_request, r_node); 1621 p = rb_next(p); 1622 if (req->r_session && 1623 req->r_session->s_mds == session->s_mds) 1624 req->r_attempts = 0; 1625 } 1626 mutex_unlock(&mdsc->mutex); 1627 } 1628 1629 /* 1630 * Helper to safely iterate over all caps associated with a session, with 1631 * special care taken to handle a racing __ceph_remove_cap(). 1632 * 1633 * Caller must hold session s_mutex. 1634 */ 1635 int ceph_iterate_session_caps(struct ceph_mds_session *session, 1636 int (*cb)(struct inode *, int mds, void *), 1637 void *arg) 1638 { 1639 struct list_head *p; 1640 struct ceph_cap *cap; 1641 struct inode *inode, *last_inode = NULL; 1642 struct ceph_cap *old_cap = NULL; 1643 int ret; 1644 1645 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 1646 spin_lock(&session->s_cap_lock); 1647 p = session->s_caps.next; 1648 while (p != &session->s_caps) { 1649 int mds; 1650 1651 cap = list_entry(p, struct ceph_cap, session_caps); 1652 inode = igrab(&cap->ci->netfs.inode); 1653 if (!inode) { 1654 p = p->next; 1655 continue; 1656 } 1657 session->s_cap_iterator = cap; 1658 mds = cap->mds; 1659 spin_unlock(&session->s_cap_lock); 1660 1661 if (last_inode) { 1662 iput(last_inode); 1663 last_inode = NULL; 1664 } 1665 if (old_cap) { 1666 ceph_put_cap(session->s_mdsc, old_cap); 1667 old_cap = NULL; 1668 } 1669 1670 ret = cb(inode, mds, arg); 1671 last_inode = inode; 1672 1673 spin_lock(&session->s_cap_lock); 1674 p = p->next; 1675 if (!cap->ci) { 1676 dout("iterate_session_caps finishing cap %p removal\n", 1677 cap); 1678 BUG_ON(cap->session != session); 1679 cap->session = NULL; 1680 list_del_init(&cap->session_caps); 1681 session->s_nr_caps--; 1682 atomic64_dec(&session->s_mdsc->metric.total_caps); 1683 if (cap->queue_release) 1684 __ceph_queue_cap_release(session, cap); 1685 else 1686 old_cap = cap; /* put_cap it w/o locks held */ 1687 } 1688 if (ret < 0) 1689 goto out; 1690 } 1691 ret = 0; 1692 out: 1693 session->s_cap_iterator 
= NULL; 1694 spin_unlock(&session->s_cap_lock); 1695 1696 iput(last_inode); 1697 if (old_cap) 1698 ceph_put_cap(session->s_mdsc, old_cap); 1699 1700 return ret; 1701 } 1702 1703 static int remove_session_caps_cb(struct inode *inode, int mds, void *arg) 1704 { 1705 struct ceph_inode_info *ci = ceph_inode(inode); 1706 bool invalidate = false; 1707 struct ceph_cap *cap; 1708 int iputs = 0; 1709 1710 spin_lock(&ci->i_ceph_lock); 1711 cap = __get_cap_for_mds(ci, mds); 1712 if (cap) { 1713 dout(" removing cap %p, ci is %p, inode is %p\n", 1714 cap, ci, &ci->netfs.inode); 1715 1716 iputs = ceph_purge_inode_cap(inode, cap, &invalidate); 1717 } 1718 spin_unlock(&ci->i_ceph_lock); 1719 1720 if (cap) 1721 wake_up_all(&ci->i_cap_wq); 1722 if (invalidate) 1723 ceph_queue_invalidate(inode); 1724 while (iputs--) 1725 iput(inode); 1726 return 0; 1727 } 1728 1729 /* 1730 * caller must hold session s_mutex 1731 */ 1732 static void remove_session_caps(struct ceph_mds_session *session) 1733 { 1734 struct ceph_fs_client *fsc = session->s_mdsc->fsc; 1735 struct super_block *sb = fsc->sb; 1736 LIST_HEAD(dispose); 1737 1738 dout("remove_session_caps on %p\n", session); 1739 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc); 1740 1741 wake_up_all(&fsc->mdsc->cap_flushing_wq); 1742 1743 spin_lock(&session->s_cap_lock); 1744 if (session->s_nr_caps > 0) { 1745 struct inode *inode; 1746 struct ceph_cap *cap, *prev = NULL; 1747 struct ceph_vino vino; 1748 /* 1749 * iterate_session_caps() skips inodes that are being 1750 * deleted, we need to wait until deletions are complete. 1751 * __wait_on_freeing_inode() is designed for the job, 1752 * but it is not exported, so use lookup inode function 1753 * to access it. 
1754 */ 1755 while (!list_empty(&session->s_caps)) { 1756 cap = list_entry(session->s_caps.next, 1757 struct ceph_cap, session_caps); 1758 if (cap == prev) 1759 break; 1760 prev = cap; 1761 vino = cap->ci->i_vino; 1762 spin_unlock(&session->s_cap_lock); 1763 1764 inode = ceph_find_inode(sb, vino); 1765 iput(inode); 1766 1767 spin_lock(&session->s_cap_lock); 1768 } 1769 } 1770 1771 // drop cap expires and unlock s_cap_lock 1772 detach_cap_releases(session, &dispose); 1773 1774 BUG_ON(session->s_nr_caps > 0); 1775 BUG_ON(!list_empty(&session->s_cap_flushing)); 1776 spin_unlock(&session->s_cap_lock); 1777 dispose_cap_releases(session->s_mdsc, &dispose); 1778 } 1779 1780 enum { 1781 RECONNECT, 1782 RENEWCAPS, 1783 FORCE_RO, 1784 }; 1785 1786 /* 1787 * wake up any threads waiting on this session's caps. if the cap is 1788 * old (didn't get renewed on the client reconnect), remove it now. 1789 * 1790 * caller must hold s_mutex. 1791 */ 1792 static int wake_up_session_cb(struct inode *inode, int mds, void *arg) 1793 { 1794 struct ceph_inode_info *ci = ceph_inode(inode); 1795 unsigned long ev = (unsigned long)arg; 1796 1797 if (ev == RECONNECT) { 1798 spin_lock(&ci->i_ceph_lock); 1799 ci->i_wanted_max_size = 0; 1800 ci->i_requested_max_size = 0; 1801 spin_unlock(&ci->i_ceph_lock); 1802 } else if (ev == RENEWCAPS) { 1803 struct ceph_cap *cap; 1804 1805 spin_lock(&ci->i_ceph_lock); 1806 cap = __get_cap_for_mds(ci, mds); 1807 /* mds did not re-issue stale cap */ 1808 if (cap && cap->cap_gen < atomic_read(&cap->session->s_cap_gen)) 1809 cap->issued = cap->implemented = CEPH_CAP_PIN; 1810 spin_unlock(&ci->i_ceph_lock); 1811 } else if (ev == FORCE_RO) { 1812 } 1813 wake_up_all(&ci->i_cap_wq); 1814 return 0; 1815 } 1816 1817 static void wake_up_session_caps(struct ceph_mds_session *session, int ev) 1818 { 1819 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 1820 ceph_iterate_session_caps(session, wake_up_session_cb, 1821 (void *)(unsigned long)ev); 1822 } 1823 
1824 /* 1825 * Send periodic message to MDS renewing all currently held caps. The 1826 * ack will reset the expiration for all caps from this session. 1827 * 1828 * caller holds s_mutex 1829 */ 1830 static int send_renew_caps(struct ceph_mds_client *mdsc, 1831 struct ceph_mds_session *session) 1832 { 1833 struct ceph_msg *msg; 1834 int state; 1835 1836 if (time_after_eq(jiffies, session->s_cap_ttl) && 1837 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 1838 pr_info("mds%d caps stale\n", session->s_mds); 1839 session->s_renew_requested = jiffies; 1840 1841 /* do not try to renew caps until a recovering mds has reconnected 1842 * with its clients. */ 1843 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 1844 if (state < CEPH_MDS_STATE_RECONNECT) { 1845 dout("send_renew_caps ignoring mds%d (%s)\n", 1846 session->s_mds, ceph_mds_state_name(state)); 1847 return 0; 1848 } 1849 1850 dout("send_renew_caps to mds%d (%s)\n", session->s_mds, 1851 ceph_mds_state_name(state)); 1852 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 1853 ++session->s_renew_seq); 1854 if (!msg) 1855 return -ENOMEM; 1856 ceph_con_send(&session->s_con, msg); 1857 return 0; 1858 } 1859 1860 static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 1861 struct ceph_mds_session *session, u64 seq) 1862 { 1863 struct ceph_msg *msg; 1864 1865 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", 1866 session->s_mds, ceph_session_state_name(session->s_state), seq); 1867 msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 1868 if (!msg) 1869 return -ENOMEM; 1870 ceph_con_send(&session->s_con, msg); 1871 return 0; 1872 } 1873 1874 1875 /* 1876 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 
1877 * 1878 * Called under session->s_mutex 1879 */ 1880 static void renewed_caps(struct ceph_mds_client *mdsc, 1881 struct ceph_mds_session *session, int is_renew) 1882 { 1883 int was_stale; 1884 int wake = 0; 1885 1886 spin_lock(&session->s_cap_lock); 1887 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 1888 1889 session->s_cap_ttl = session->s_renew_requested + 1890 mdsc->mdsmap->m_session_timeout*HZ; 1891 1892 if (was_stale) { 1893 if (time_before(jiffies, session->s_cap_ttl)) { 1894 pr_info("mds%d caps renewed\n", session->s_mds); 1895 wake = 1; 1896 } else { 1897 pr_info("mds%d caps still stale\n", session->s_mds); 1898 } 1899 } 1900 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n", 1901 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh", 1902 time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh"); 1903 spin_unlock(&session->s_cap_lock); 1904 1905 if (wake) 1906 wake_up_session_caps(session, RENEWCAPS); 1907 } 1908 1909 /* 1910 * send a session close request 1911 */ 1912 static int request_close_session(struct ceph_mds_session *session) 1913 { 1914 struct ceph_msg *msg; 1915 1916 dout("request_close_session mds%d state %s seq %lld\n", 1917 session->s_mds, ceph_session_state_name(session->s_state), 1918 session->s_seq); 1919 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE, 1920 session->s_seq); 1921 if (!msg) 1922 return -ENOMEM; 1923 ceph_con_send(&session->s_con, msg); 1924 return 1; 1925 } 1926 1927 /* 1928 * Called with s_mutex held. 
1929 */ 1930 static int __close_session(struct ceph_mds_client *mdsc, 1931 struct ceph_mds_session *session) 1932 { 1933 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 1934 return 0; 1935 session->s_state = CEPH_MDS_SESSION_CLOSING; 1936 return request_close_session(session); 1937 } 1938 1939 static bool drop_negative_children(struct dentry *dentry) 1940 { 1941 struct dentry *child; 1942 bool all_negative = true; 1943 1944 if (!d_is_dir(dentry)) 1945 goto out; 1946 1947 spin_lock(&dentry->d_lock); 1948 list_for_each_entry(child, &dentry->d_subdirs, d_child) { 1949 if (d_really_is_positive(child)) { 1950 all_negative = false; 1951 break; 1952 } 1953 } 1954 spin_unlock(&dentry->d_lock); 1955 1956 if (all_negative) 1957 shrink_dcache_parent(dentry); 1958 out: 1959 return all_negative; 1960 } 1961 1962 /* 1963 * Trim old(er) caps. 1964 * 1965 * Because we can't cache an inode without one or more caps, we do 1966 * this indirectly: if a cap is unused, we prune its aliases, at which 1967 * point the inode will hopefully get dropped to. 1968 * 1969 * Yes, this is a bit sloppy. Our only real goal here is to respond to 1970 * memory pressure from the MDS, though, so it needn't be perfect. 
1971 */ 1972 static int trim_caps_cb(struct inode *inode, int mds, void *arg) 1973 { 1974 int *remaining = arg; 1975 struct ceph_inode_info *ci = ceph_inode(inode); 1976 int used, wanted, oissued, mine; 1977 struct ceph_cap *cap; 1978 1979 if (*remaining <= 0) 1980 return -1; 1981 1982 spin_lock(&ci->i_ceph_lock); 1983 cap = __get_cap_for_mds(ci, mds); 1984 if (!cap) { 1985 spin_unlock(&ci->i_ceph_lock); 1986 return 0; 1987 } 1988 mine = cap->issued | cap->implemented; 1989 used = __ceph_caps_used(ci); 1990 wanted = __ceph_caps_file_wanted(ci); 1991 oissued = __ceph_caps_issued_other(ci, cap); 1992 1993 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n", 1994 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), 1995 ceph_cap_string(used), ceph_cap_string(wanted)); 1996 if (cap == ci->i_auth_cap) { 1997 if (ci->i_dirty_caps || ci->i_flushing_caps || 1998 !list_empty(&ci->i_cap_snaps)) 1999 goto out; 2000 if ((used | wanted) & CEPH_CAP_ANY_WR) 2001 goto out; 2002 /* Note: it's possible that i_filelock_ref becomes non-zero 2003 * after dropping auth caps. It doesn't hurt because reply 2004 * of lock mds request will re-add auth caps. */ 2005 if (atomic_read(&ci->i_filelock_ref) > 0) 2006 goto out; 2007 } 2008 /* The inode has cached pages, but it's no longer used. 2009 * we can safely drop it */ 2010 if (S_ISREG(inode->i_mode) && 2011 wanted == 0 && used == CEPH_CAP_FILE_CACHE && 2012 !(oissued & CEPH_CAP_FILE_CACHE)) { 2013 used = 0; 2014 oissued = 0; 2015 } 2016 if ((used | wanted) & ~oissued & mine) 2017 goto out; /* we need these caps */ 2018 2019 if (oissued) { 2020 /* we aren't the only cap.. 
just remove us */ 2021 ceph_remove_cap(cap, true); 2022 (*remaining)--; 2023 } else { 2024 struct dentry *dentry; 2025 /* try dropping referring dentries */ 2026 spin_unlock(&ci->i_ceph_lock); 2027 dentry = d_find_any_alias(inode); 2028 if (dentry && drop_negative_children(dentry)) { 2029 int count; 2030 dput(dentry); 2031 d_prune_aliases(inode); 2032 count = atomic_read(&inode->i_count); 2033 if (count == 1) 2034 (*remaining)--; 2035 dout("trim_caps_cb %p cap %p pruned, count now %d\n", 2036 inode, cap, count); 2037 } else { 2038 dput(dentry); 2039 } 2040 return 0; 2041 } 2042 2043 out: 2044 spin_unlock(&ci->i_ceph_lock); 2045 return 0; 2046 } 2047 2048 /* 2049 * Trim session cap count down to some max number. 2050 */ 2051 int ceph_trim_caps(struct ceph_mds_client *mdsc, 2052 struct ceph_mds_session *session, 2053 int max_caps) 2054 { 2055 int trim_caps = session->s_nr_caps - max_caps; 2056 2057 dout("trim_caps mds%d start: %d / %d, trim %d\n", 2058 session->s_mds, session->s_nr_caps, max_caps, trim_caps); 2059 if (trim_caps > 0) { 2060 int remaining = trim_caps; 2061 2062 ceph_iterate_session_caps(session, trim_caps_cb, &remaining); 2063 dout("trim_caps mds%d done: %d / %d, trimmed %d\n", 2064 session->s_mds, session->s_nr_caps, max_caps, 2065 trim_caps - remaining); 2066 } 2067 2068 ceph_flush_cap_releases(mdsc, session); 2069 return 0; 2070 } 2071 2072 static int check_caps_flush(struct ceph_mds_client *mdsc, 2073 u64 want_flush_tid) 2074 { 2075 int ret = 1; 2076 2077 spin_lock(&mdsc->cap_dirty_lock); 2078 if (!list_empty(&mdsc->cap_flush_list)) { 2079 struct ceph_cap_flush *cf = 2080 list_first_entry(&mdsc->cap_flush_list, 2081 struct ceph_cap_flush, g_list); 2082 if (cf->tid <= want_flush_tid) { 2083 dout("check_caps_flush still flushing tid " 2084 "%llu <= %llu\n", cf->tid, want_flush_tid); 2085 ret = 0; 2086 } 2087 } 2088 spin_unlock(&mdsc->cap_dirty_lock); 2089 return ret; 2090 } 2091 2092 /* 2093 * flush all dirty inode data to disk. 
2094 * 2095 * returns true if we've flushed through want_flush_tid 2096 */ 2097 static void wait_caps_flush(struct ceph_mds_client *mdsc, 2098 u64 want_flush_tid) 2099 { 2100 dout("check_caps_flush want %llu\n", want_flush_tid); 2101 2102 wait_event(mdsc->cap_flushing_wq, 2103 check_caps_flush(mdsc, want_flush_tid)); 2104 2105 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); 2106 } 2107 2108 /* 2109 * called under s_mutex 2110 */ 2111 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 2112 struct ceph_mds_session *session) 2113 { 2114 struct ceph_msg *msg = NULL; 2115 struct ceph_mds_cap_release *head; 2116 struct ceph_mds_cap_item *item; 2117 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 2118 struct ceph_cap *cap; 2119 LIST_HEAD(tmp_list); 2120 int num_cap_releases; 2121 __le32 barrier, *cap_barrier; 2122 2123 down_read(&osdc->lock); 2124 barrier = cpu_to_le32(osdc->epoch_barrier); 2125 up_read(&osdc->lock); 2126 2127 spin_lock(&session->s_cap_lock); 2128 again: 2129 list_splice_init(&session->s_cap_releases, &tmp_list); 2130 num_cap_releases = session->s_num_cap_releases; 2131 session->s_num_cap_releases = 0; 2132 spin_unlock(&session->s_cap_lock); 2133 2134 while (!list_empty(&tmp_list)) { 2135 if (!msg) { 2136 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 2137 PAGE_SIZE, GFP_NOFS, false); 2138 if (!msg) 2139 goto out_err; 2140 head = msg->front.iov_base; 2141 head->num = cpu_to_le32(0); 2142 msg->front.iov_len = sizeof(*head); 2143 2144 msg->hdr.version = cpu_to_le16(2); 2145 msg->hdr.compat_version = cpu_to_le16(1); 2146 } 2147 2148 cap = list_first_entry(&tmp_list, struct ceph_cap, 2149 session_caps); 2150 list_del(&cap->session_caps); 2151 num_cap_releases--; 2152 2153 head = msg->front.iov_base; 2154 put_unaligned_le32(get_unaligned_le32(&head->num) + 1, 2155 &head->num); 2156 item = msg->front.iov_base + msg->front.iov_len; 2157 item->ino = cpu_to_le64(cap->cap_ino); 2158 item->cap_id = cpu_to_le64(cap->cap_id); 
item->migrate_seq = cpu_to_le32(cap->mseq);
		item->seq = cpu_to_le32(cap->issue_seq);
		msg->front.iov_len += sizeof(*item);

		ceph_put_cap(mdsc, cap);

		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
			// Append cap_barrier field
			cap_barrier = msg->front.iov_base + msg->front.iov_len;
			*cap_barrier = barrier;
			msg->front.iov_len += sizeof(*cap_barrier);

			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
			dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
			ceph_con_send(&session->s_con, msg);
			msg = NULL;
		}
	}

	BUG_ON(num_cap_releases != 0);

	spin_lock(&session->s_cap_lock);
	if (!list_empty(&session->s_cap_releases))
		goto again;
	spin_unlock(&session->s_cap_lock);

	if (msg) {
		// Append cap_barrier field
		cap_barrier = msg->front.iov_base + msg->front.iov_len;
		*cap_barrier = barrier;
		msg->front.iov_len += sizeof(*cap_barrier);

		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
		ceph_con_send(&session->s_con, msg);
	}
	return;
out_err:
	pr_err("send_cap_releases mds%d, failed to allocate message\n",
		session->s_mds);
	spin_lock(&session->s_cap_lock);
	list_splice(&tmp_list, &session->s_cap_releases);
	session->s_num_cap_releases += num_cap_releases;
	spin_unlock(&session->s_cap_lock);
}

/*
 * Work handler for session->s_cap_release_work.
 *
 * Sends the queued cap releases while holding s_mutex, but only if the
 * session is OPEN or HUNG.  Always drops the session reference that
 * ceph_flush_cap_releases() took when it queued this work.
 */
static void ceph_cap_release_work(struct work_struct *work)
{
	struct ceph_mds_session *session =
		container_of(work, struct ceph_mds_session, s_cap_release_work);

	mutex_lock(&session->s_mutex);
	if (session->s_state == CEPH_MDS_SESSION_OPEN ||
	    session->s_state == CEPH_MDS_SESSION_HUNG)
		ceph_send_cap_releases(session->s_mdsc, session);
	mutex_unlock(&session->s_mutex);
	ceph_put_mds_session(session);
}

/*
 * Queue the session's cap release work on the cap workqueue.
 *
 * Does nothing once mdsc->stopping is set.  A session reference is
 * taken on behalf of the work item; if queue_work() reports that
 * nothing was queued, that reference is dropped again right away.
 */
void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
			     struct ceph_mds_session *session)
{
	if (mdsc->stopping)
		return;

	ceph_get_mds_session(session);
	if (queue_work(mdsc->fsc->cap_wq,
		       &session->s_cap_release_work)) {
		dout("cap release work queued\n");
	} else {
		ceph_put_mds_session(session);
		dout("failed to queue cap release work\n");
	}
}

/*
 * Append @cap to the session's pending release list and bump the
 * pending counter.  Each time the counter reaches a multiple of
 * CEPH_CAPS_PER_RELEASE, a flush of the release list is requested.
 *
 * caller holds session->s_cap_lock
 */
void __ceph_queue_cap_release(struct ceph_mds_session *session,
			      struct ceph_cap *cap)
{
	list_add_tail(&cap->session_caps, &session->s_cap_releases);
	session->s_num_cap_releases++;

	if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
		ceph_flush_cap_releases(session->s_mdsc, session);
}

/*
 * Work handler for mdsc->cap_reclaim_work: trim dentries, and requeue
 * itself when ceph_trim_dentries() asks for another pass (-EAGAIN).
 */
static void ceph_cap_reclaim_work(struct work_struct *work)
{
	struct ceph_mds_client *mdsc =
		container_of(work, struct ceph_mds_client, cap_reclaim_work);
	int ret = ceph_trim_dentries(mdsc);
	if (ret == -EAGAIN)
		ceph_queue_cap_reclaim_work(mdsc);
}

/*
 * Queue the cap reclaim work on the cap workqueue, unless the client
 * is stopping.  The outcome is only logged.
 */
void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
{
	if (mdsc->stopping)
		return;

        if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
                dout("caps reclaim work queued\n");
        } else {
                dout("failed to queue caps release work\n");
        }
}

/*
 * Account @nr more reclaimable caps.
 *
 * @nr is added to mdsc->cap_reclaim_pending; when that addition makes
 * the running total cross a multiple of CEPH_CAPS_PER_RELEASE, the
 * counter is reset to zero and the reclaim work is queued.
 */
void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
{
	int val;
	if (!nr)
		return;
	val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
	/* (val % N) < nr holds exactly when this add crossed a multiple of N */
	if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
		atomic_set(&mdsc->cap_reclaim_pending, 0);
		ceph_queue_cap_reclaim_work(mdsc);
	}
}

/*
 * requests
 */

/*
 * Allocate the buffer that holds the parsed entries of a readdir reply.
 *
 * The wanted entry count is the directory's i_files + i_subdirs, clamped
 * to [1, max_readdir].  If the page allocation for that size fails, ever
 * smaller orders are tried.  The entry count is then recomputed from the
 * size actually obtained and stored in the request arguments, together
 * with the cap reservation count (entries + 1).
 *
 * Returns 0 on success, -ENOMEM if not even a single page was available.
 */
int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
				    struct inode *dir)
{
	struct ceph_inode_info *ci = ceph_inode(dir);
	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
	struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
	size_t size = sizeof(struct ceph_mds_reply_dir_entry);
	unsigned int num_entries;
	int order;

	spin_lock(&ci->i_ceph_lock);
	num_entries = ci->i_files + ci->i_subdirs;
	spin_unlock(&ci->i_ceph_lock);
	num_entries = max(num_entries, 1U);
	num_entries = min(num_entries, opt->max_readdir);

	order = get_order(size * num_entries);
	/* fall back to smaller allocations rather than failing outright */
	while (order >= 0) {
		rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
							     __GFP_NOWARN |
							     __GFP_ZERO,
							     order);
		if (rinfo->dir_entries)
			break;
		order--;
	}
	if (!rinfo->dir_entries)
		return -ENOMEM;

	/* the buffer may be larger or smaller than first asked for */
	num_entries = (PAGE_SIZE << order) / size;
	num_entries = min(num_entries, opt->max_readdir);

	rinfo->dir_buf_size = PAGE_SIZE << order;
	req->r_num_caps = num_entries + 1;
	req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
	req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
	return 0;
}

/*
 * Create an mds request.
2325 */ 2326 struct ceph_mds_request * 2327 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2328 { 2329 struct ceph_mds_request *req; 2330 2331 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2332 if (!req) 2333 return ERR_PTR(-ENOMEM); 2334 2335 mutex_init(&req->r_fill_mutex); 2336 req->r_mdsc = mdsc; 2337 req->r_started = jiffies; 2338 req->r_start_latency = ktime_get(); 2339 req->r_resend_mds = -1; 2340 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2341 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2342 req->r_fmode = -1; 2343 req->r_feature_needed = -1; 2344 kref_init(&req->r_kref); 2345 RB_CLEAR_NODE(&req->r_node); 2346 INIT_LIST_HEAD(&req->r_wait); 2347 init_completion(&req->r_completion); 2348 init_completion(&req->r_safe_completion); 2349 INIT_LIST_HEAD(&req->r_unsafe_item); 2350 2351 ktime_get_coarse_real_ts64(&req->r_stamp); 2352 2353 req->r_op = op; 2354 req->r_direct_mode = mode; 2355 return req; 2356 } 2357 2358 /* 2359 * return oldest (lowest) request, tid in request tree, 0 if none. 2360 * 2361 * called under mdsc->mutex. 2362 */ 2363 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2364 { 2365 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2366 return NULL; 2367 return rb_entry(rb_first(&mdsc->request_tree), 2368 struct ceph_mds_request, r_node); 2369 } 2370 2371 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2372 { 2373 return mdsc->oldest_tid; 2374 } 2375 2376 /* 2377 * Build a dentry's path. Allocate on heap; caller must kfree. Based 2378 * on build_path_from_dentry in fs/cifs/dir.c. 2379 * 2380 * If @stop_on_nosnap, generate path relative to the first non-snapped 2381 * inode. 2382 * 2383 * Encode hidden .snap dirs as a double /, i.e. 
2384 * foo/.snap/bar -> foo//bar 2385 */ 2386 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase, 2387 int stop_on_nosnap) 2388 { 2389 struct dentry *temp; 2390 char *path; 2391 int pos; 2392 unsigned seq; 2393 u64 base; 2394 2395 if (!dentry) 2396 return ERR_PTR(-EINVAL); 2397 2398 path = __getname(); 2399 if (!path) 2400 return ERR_PTR(-ENOMEM); 2401 retry: 2402 pos = PATH_MAX - 1; 2403 path[pos] = '\0'; 2404 2405 seq = read_seqbegin(&rename_lock); 2406 rcu_read_lock(); 2407 temp = dentry; 2408 for (;;) { 2409 struct inode *inode; 2410 2411 spin_lock(&temp->d_lock); 2412 inode = d_inode(temp); 2413 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2414 dout("build_path path+%d: %p SNAPDIR\n", 2415 pos, temp); 2416 } else if (stop_on_nosnap && inode && dentry != temp && 2417 ceph_snap(inode) == CEPH_NOSNAP) { 2418 spin_unlock(&temp->d_lock); 2419 pos++; /* get rid of any prepended '/' */ 2420 break; 2421 } else { 2422 pos -= temp->d_name.len; 2423 if (pos < 0) { 2424 spin_unlock(&temp->d_lock); 2425 break; 2426 } 2427 memcpy(path + pos, temp->d_name.name, temp->d_name.len); 2428 } 2429 spin_unlock(&temp->d_lock); 2430 temp = READ_ONCE(temp->d_parent); 2431 2432 /* Are we at the root? */ 2433 if (IS_ROOT(temp)) 2434 break; 2435 2436 /* Are we out of buffer? */ 2437 if (--pos < 0) 2438 break; 2439 2440 path[pos] = '/'; 2441 } 2442 base = ceph_ino(d_inode(temp)); 2443 rcu_read_unlock(); 2444 2445 if (read_seqretry(&rename_lock, seq)) 2446 goto retry; 2447 2448 if (pos < 0) { 2449 /* 2450 * A rename didn't occur, but somehow we didn't end up where 2451 * we thought we would. Throw a warning and try again. 
2452 */ 2453 pr_warn("build_path did not end path lookup where " 2454 "expected, pos is %d\n", pos); 2455 goto retry; 2456 } 2457 2458 *pbase = base; 2459 *plen = PATH_MAX - 1 - pos; 2460 dout("build_path on %p %d built %llx '%.*s'\n", 2461 dentry, d_count(dentry), base, *plen, path + pos); 2462 return path + pos; 2463 } 2464 2465 static int build_dentry_path(struct dentry *dentry, struct inode *dir, 2466 const char **ppath, int *ppathlen, u64 *pino, 2467 bool *pfreepath, bool parent_locked) 2468 { 2469 char *path; 2470 2471 rcu_read_lock(); 2472 if (!dir) 2473 dir = d_inode_rcu(dentry->d_parent); 2474 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) { 2475 *pino = ceph_ino(dir); 2476 rcu_read_unlock(); 2477 *ppath = dentry->d_name.name; 2478 *ppathlen = dentry->d_name.len; 2479 return 0; 2480 } 2481 rcu_read_unlock(); 2482 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2483 if (IS_ERR(path)) 2484 return PTR_ERR(path); 2485 *ppath = path; 2486 *pfreepath = true; 2487 return 0; 2488 } 2489 2490 static int build_inode_path(struct inode *inode, 2491 const char **ppath, int *ppathlen, u64 *pino, 2492 bool *pfreepath) 2493 { 2494 struct dentry *dentry; 2495 char *path; 2496 2497 if (ceph_snap(inode) == CEPH_NOSNAP) { 2498 *pino = ceph_ino(inode); 2499 *ppathlen = 0; 2500 return 0; 2501 } 2502 dentry = d_find_alias(inode); 2503 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2504 dput(dentry); 2505 if (IS_ERR(path)) 2506 return PTR_ERR(path); 2507 *ppath = path; 2508 *pfreepath = true; 2509 return 0; 2510 } 2511 2512 /* 2513 * request arguments may be specified via an inode *, a dentry *, or 2514 * an explicit ino+path. 
*/
/*
 * Resolve one request target into *ino / *ppath / *pathlen.
 *
 * Exactly one source is used, in this order of preference: @rinode,
 * @rdentry (with optional parent @rdiri), or the explicit @rpath/@rino
 * pair.  If none is given the outputs are left untouched and 0 is
 * returned.  *freepath is set by the helpers when the path was
 * allocated and must be released by the caller.
 */
static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
				  struct inode *rdiri, const char *rpath,
				  u64 rino, const char **ppath, int *pathlen,
				  u64 *ino, bool *freepath, bool parent_locked)
{
	int r = 0;

	if (rinode) {
		r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
		dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
		     ceph_snap(rinode));
	} else if (rdentry) {
		r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
					freepath, parent_locked);
		dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
		     *ppath);
	} else if (rpath || rino) {
		*ino = rino;
		*ppath = rpath;
		*pathlen = rpath ? strlen(rpath) : 0;
		dout(" path %.*s\n", *pathlen, rpath);
	}

	return r;
}

/*
 * Encode the request trailer at *p: the request time stamp followed by
 * the caller's supplementary group list (a 32-bit count, then one
 * 64-bit gid per group).  *p is advanced past what was written; the
 * caller must have reserved enough room.
 */
static void encode_timestamp_and_gids(void **p,
				      const struct ceph_mds_request *req)
{
	struct ceph_timespec ts;
	int i;

	ceph_encode_timespec64(&ts, &req->r_stamp);
	ceph_encode_copy(p, &ts, sizeof(ts));

	/* gid_list */
	ceph_encode_32(p, req->r_cred->group_info->ngroups);
	for (i = 0; i < req->r_cred->group_info->ngroups; i++)
		ceph_encode_64(p, from_kgid(&init_user_ns,
					    req->r_cred->group_info->gid[i]));
}

/*
 * Build the CEPH_MSG_CLIENT_REQUEST message for @req.
 *
 * Layout of the front: request head (old or new format, chosen by
 * whether the peer advertises CEPH_FEATURE_FS_BTIME), the two file
 * paths, the cap/dentry releases, then the time stamp and gid list.
 * When @drop_cap_releases is set the releases are encoded and then
 * discarded again by rewinding to the release offset.
 *
 * Returns the message, or an ERR_PTR on failure.  Paths allocated
 * while resolving the targets are always freed before returning.
 *
 * called under mdsc->mutex
 */
static struct ceph_msg *create_request_message(struct ceph_mds_session *session,
					       struct ceph_mds_request *req,
					       bool drop_cap_releases)
{
	int mds = session->s_mds;
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_msg *msg;
	struct ceph_mds_request_head_old *head;
	const char *path1 = NULL;
	const char *path2 = NULL;
	u64 ino1 = 0, ino2 = 0;
	int pathlen1 = 0, pathlen2 = 0;
	bool freepath1 = false, freepath2 = false;
	struct dentry *old_dentry = NULL;
	int len;
	u16 releases;
	void *p, *end;
	int ret;
	bool legacy = !(session->s_con.peer_features & CEPH_FEATURE_FS_BTIME);

	ret = set_request_path_attr(req->r_inode, req->r_dentry,
			      req->r_parent, req->r_path1, req->r_ino1.ino,
			      &path1, &pathlen1, &ino1, &freepath1,
			      test_bit(CEPH_MDS_R_PARENT_LOCKED,
					&req->r_req_flags));
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out;
	}

	/* If r_old_dentry is set, then assume that its parent is locked */
	if (req->r_old_dentry &&
	    !(req->r_old_dentry->d_flags & DCACHE_DISCONNECTED))
		old_dentry = req->r_old_dentry;
	ret = set_request_path_attr(NULL, old_dentry,
			      req->r_old_dentry_dir,
			      req->r_path2, req->r_ino2.ino,
			      &path2, &pathlen2, &ino2, &freepath2, true);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out_free1;
	}

	/* head + two filepaths (version byte, length, ino each) + time stamp */
	len = legacy ? sizeof(*head) : sizeof(struct ceph_mds_request_head);
	len += pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
		sizeof(struct ceph_timespec);
	/* gid list: 32-bit count plus one 64-bit value per group */
	len += sizeof(u32) + (sizeof(u64) * req->r_cred->group_info->ngroups);

	/* calculate (max) length for cap releases */
	len += sizeof(struct ceph_mds_request_release) *
		(!!req->r_inode_drop + !!req->r_dentry_drop +
		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);

	/* dentry releases carry the dentry name as well */
	if (req->r_dentry_drop)
		len += pathlen1;
	if (req->r_old_dentry_drop)
		len += pathlen2;

	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
	if (!msg) {
		msg = ERR_PTR(-ENOMEM);
		goto out_free2;
	}

	msg->hdr.tid = cpu_to_le64(req->r_tid);

	/*
	 * The old ceph_mds_request_head didn't contain a version field, and
	 * one was added when we moved the message version from 3->4.
	 */
	if (legacy) {
		msg->hdr.version = cpu_to_le16(3);
		head = msg->front.iov_base;
		p = msg->front.iov_base + sizeof(*head);
	} else {
		struct ceph_mds_request_head *new_head = msg->front.iov_base;

		msg->hdr.version = cpu_to_le16(4);
		new_head->version = cpu_to_le16(CEPH_MDS_REQUEST_HEAD_VERSION);
		/* the old head layout starts at oldest_client_tid in the new one */
		head = (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid;
		p = msg->front.iov_base + sizeof(*new_head);
	}

	end = msg->front.iov_base + msg->front.iov_len;

	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
	head->op = cpu_to_le32(req->r_op);
	head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns,
						 req->r_cred->fsuid));
	head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns,
						 req->r_cred->fsgid));
	head->ino = cpu_to_le64(req->r_deleg_ino);
	head->args = req->r_args;

	ceph_encode_filepath(&p, end, ino1, path1);
	ceph_encode_filepath(&p, end, ino2, path2);

	/* make note of release offset, in case we need to replay */
	req->r_request_release_offset = p - msg->front.iov_base;

	/* cap releases */
	releases = 0;
	if (req->r_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
		      mds, req->r_inode_drop, req->r_inode_unless,
		      req->r_op == CEPH_MDS_OP_READDIR);
	if (req->r_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_dentry,
				req->r_parent, mds, req->r_dentry_drop,
				req->r_dentry_unless);
	if (req->r_old_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
				req->r_old_dentry_dir, mds,
				req->r_old_dentry_drop,
				req->r_old_dentry_unless);
	if (req->r_old_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      d_inode(req->r_old_dentry),
		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);

	/* throw the encoded releases away again by rewinding the cursor */
	if (drop_cap_releases) {
		releases = 0;
		p = msg->front.iov_base + req->r_request_release_offset;
	}

	head->num_releases = cpu_to_le16(releases);

	encode_timestamp_and_gids(&p, req);

	/* the length estimate above must have covered everything written */
	if (WARN_ON_ONCE(p > end)) {
		ceph_msg_put(msg);
		msg = ERR_PTR(-ERANGE);
		goto out_free2;
	}

	/* trim the front to what was actually used */
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	if (req->r_pagelist) {
		struct ceph_pagelist *pagelist = req->r_pagelist;
		ceph_msg_data_add_pagelist(msg, pagelist);
		msg->hdr.data_len = cpu_to_le32(pagelist->length);
	} else {
		msg->hdr.data_len = 0;
	}

	msg->hdr.data_off = cpu_to_le16(0);

	/* success falls through: the built paths are no longer needed */
out_free2:
	if (freepath2)
		ceph_mdsc_free_path((char *)path2, pathlen2);
out_free1:
	if (freepath1)
		ceph_mdsc_free_path((char *)path1, pathlen1);
out:
	return msg;
}

/*
 * called under mdsc->mutex if error, under no mutex if
 * success.
2723 */ 2724 static void complete_request(struct ceph_mds_client *mdsc, 2725 struct ceph_mds_request *req) 2726 { 2727 req->r_end_latency = ktime_get(); 2728 2729 if (req->r_callback) 2730 req->r_callback(mdsc, req); 2731 complete_all(&req->r_completion); 2732 } 2733 2734 static struct ceph_mds_request_head_old * 2735 find_old_request_head(void *p, u64 features) 2736 { 2737 bool legacy = !(features & CEPH_FEATURE_FS_BTIME); 2738 struct ceph_mds_request_head *new_head; 2739 2740 if (legacy) 2741 return (struct ceph_mds_request_head_old *)p; 2742 new_head = (struct ceph_mds_request_head *)p; 2743 return (struct ceph_mds_request_head_old *)&new_head->oldest_client_tid; 2744 } 2745 2746 /* 2747 * called under mdsc->mutex 2748 */ 2749 static int __prepare_send_request(struct ceph_mds_session *session, 2750 struct ceph_mds_request *req, 2751 bool drop_cap_releases) 2752 { 2753 int mds = session->s_mds; 2754 struct ceph_mds_client *mdsc = session->s_mdsc; 2755 struct ceph_mds_request_head_old *rhead; 2756 struct ceph_msg *msg; 2757 int flags = 0, max_retry; 2758 2759 /* 2760 * The type of 'r_attempts' in kernel 'ceph_mds_request' 2761 * is 'int', while in 'ceph_mds_request_head' the type of 2762 * 'num_retry' is '__u8'. So in case the request retries 2763 * exceeding 256 times, the MDS will receive a incorrect 2764 * retry seq. 2765 * 2766 * In this case it's ususally a bug in MDS and continue 2767 * retrying the request makes no sense. 2768 * 2769 * In future this could be fixed in ceph code, so avoid 2770 * using the hardcode here. 
2771 */ 2772 max_retry = sizeof_field(struct ceph_mds_request_head, num_retry); 2773 max_retry = 1 << (max_retry * BITS_PER_BYTE); 2774 if (req->r_attempts >= max_retry) { 2775 pr_warn_ratelimited("%s request tid %llu seq overflow\n", 2776 __func__, req->r_tid); 2777 return -EMULTIHOP; 2778 } 2779 2780 req->r_attempts++; 2781 if (req->r_inode) { 2782 struct ceph_cap *cap = 2783 ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds); 2784 2785 if (cap) 2786 req->r_sent_on_mseq = cap->mseq; 2787 else 2788 req->r_sent_on_mseq = -1; 2789 } 2790 dout("%s %p tid %lld %s (attempt %d)\n", __func__, req, 2791 req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); 2792 2793 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 2794 void *p; 2795 2796 /* 2797 * Replay. Do not regenerate message (and rebuild 2798 * paths, etc.); just use the original message. 2799 * Rebuilding paths will break for renames because 2800 * d_move mangles the src name. 2801 */ 2802 msg = req->r_request; 2803 rhead = find_old_request_head(msg->front.iov_base, 2804 session->s_con.peer_features); 2805 2806 flags = le32_to_cpu(rhead->flags); 2807 flags |= CEPH_MDS_FLAG_REPLAY; 2808 rhead->flags = cpu_to_le32(flags); 2809 2810 if (req->r_target_inode) 2811 rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode)); 2812 2813 rhead->num_retry = req->r_attempts - 1; 2814 2815 /* remove cap/dentry releases from message */ 2816 rhead->num_releases = 0; 2817 2818 p = msg->front.iov_base + req->r_request_release_offset; 2819 encode_timestamp_and_gids(&p, req); 2820 2821 msg->front.iov_len = p - msg->front.iov_base; 2822 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2823 return 0; 2824 } 2825 2826 if (req->r_request) { 2827 ceph_msg_put(req->r_request); 2828 req->r_request = NULL; 2829 } 2830 msg = create_request_message(session, req, drop_cap_releases); 2831 if (IS_ERR(msg)) { 2832 req->r_err = PTR_ERR(msg); 2833 return PTR_ERR(msg); 2834 } 2835 req->r_request = msg; 2836 2837 rhead = 
find_old_request_head(msg->front.iov_base, 2838 session->s_con.peer_features); 2839 rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); 2840 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 2841 flags |= CEPH_MDS_FLAG_REPLAY; 2842 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) 2843 flags |= CEPH_MDS_FLAG_ASYNC; 2844 if (req->r_parent) 2845 flags |= CEPH_MDS_FLAG_WANT_DENTRY; 2846 rhead->flags = cpu_to_le32(flags); 2847 rhead->num_fwd = req->r_num_fwd; 2848 rhead->num_retry = req->r_attempts - 1; 2849 2850 dout(" r_parent = %p\n", req->r_parent); 2851 return 0; 2852 } 2853 2854 /* 2855 * called under mdsc->mutex 2856 */ 2857 static int __send_request(struct ceph_mds_session *session, 2858 struct ceph_mds_request *req, 2859 bool drop_cap_releases) 2860 { 2861 int err; 2862 2863 err = __prepare_send_request(session, req, drop_cap_releases); 2864 if (!err) { 2865 ceph_msg_get(req->r_request); 2866 ceph_con_send(&session->s_con, req->r_request); 2867 } 2868 2869 return err; 2870 } 2871 2872 /* 2873 * send request, or put it on the appropriate wait list. 
*/
/*
 * Outcomes, as implemented below:
 *  - the request is sent on an OPEN or HUNG session;
 *  - the request is parked on mdsc->waiting_for_map or on the
 *    session's s_waiting list and picked up again later;
 *  - an error is recorded in req->r_err, the request is completed and
 *    unregistered.
 * The callers in this file invoke this with mdsc->mutex held.
 */
static void __do_request(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req)
{
	struct ceph_mds_session *session = NULL;
	int mds = -1;
	int err = 0;
	bool random;

	/* already failed or answered: nothing to send */
	if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
			__unregister_request(mdsc, req);
		return;
	}

	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) {
		dout("do_request metadata corrupted\n");
		err = -EIO;
		goto finish;
	}
	if (req->r_timeout &&
	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
		dout("do_request timed out\n");
		err = -ETIMEDOUT;
		goto finish;
	}
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
		dout("do_request forced umount\n");
		err = -EIO;
		goto finish;
	}
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
		if (mdsc->mdsmap_err) {
			err = mdsc->mdsmap_err;
			dout("do_request mdsmap err %d\n", err);
			goto finish;
		}
		if (mdsc->mdsmap->m_epoch == 0) {
			dout("do_request no mdsmap, waiting for map\n");
			list_add(&req->r_wait, &mdsc->waiting_for_map);
			return;
		}
		if (!(mdsc->fsc->mount_options->flags &
		      CEPH_MOUNT_OPT_MOUNTWAIT) &&
		    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
			err = -EHOSTUNREACH;
			goto finish;
		}
	}

	/* drop the session of any previous attempt before choosing anew */
	put_request_session(req);

	mds = __choose_mds(mdsc, req, &random);
	if (mds < 0 ||
	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
			err = -EJUKEBOX;
			goto finish;
		}
		dout("do_request no mds or not active, waiting for map\n");
		list_add(&req->r_wait, &mdsc->waiting_for_map);
		return;
	}

	/* get, open session */
	session = __ceph_lookup_mds_session(mdsc, mds);
	if (!session) {
		session = register_session(mdsc, mds);
		if (IS_ERR(session)) {
			err = PTR_ERR(session);
			goto finish;
		}
	}
	/* the request holds its own session reference from here on */
	req->r_session = ceph_get_mds_session(session);

	dout("do_request mds%d session %p state %s\n", mds, session,
	     ceph_session_state_name(session->s_state));

	/*
	 * Old ceph releases crash the MDS when it sees an unknown op
	 */
	if (req->r_feature_needed > 0 &&
	    !test_bit(req->r_feature_needed, &session->s_features)) {
		err = -EOPNOTSUPP;
		goto out_session;
	}

	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
	    session->s_state != CEPH_MDS_SESSION_HUNG) {
		/*
		 * We cannot queue async requests since the caps and delegated
		 * inodes are bound to the session. Just return -EJUKEBOX and
		 * let the caller retry a sync request in that case.
		 */
		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
			err = -EJUKEBOX;
			goto out_session;
		}

		/*
		 * If the session has been REJECTED, then return a hard error,
		 * unless it's a CLEANRECOVER mount, in which case we'll queue
		 * it to the mdsc queue.
		 */
		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
			if (ceph_test_mount_opt(mdsc->fsc, CLEANRECOVER))
				list_add(&req->r_wait, &mdsc->waiting_for_map);
			else
				err = -EACCES;
			goto out_session;
		}

		if (session->s_state == CEPH_MDS_SESSION_NEW ||
		    session->s_state == CEPH_MDS_SESSION_CLOSING) {
			err = __open_session(mdsc, session);
			if (err)
				goto out_session;
			/* retry the same mds later */
			if (random)
				req->r_resend_mds = mds;
		}
		/* wait on the session until it becomes usable */
		list_add(&req->r_wait, &session->s_waiting);
		goto out_session;
	}

	/* send request */
	req->r_resend_mds = -1;   /* forget any previous mds hint */

	if (req->r_request_started == 0)   /* note request start time */
		req->r_request_started = jiffies;

	/*
	 * For async create we will choose the auth MDS of frag in parent
	 * directory to send the request and usually this works fine, but
	 * if the MDS migrated the directory to another MDS before it could
	 * handle it, the request will be forwarded.
	 *
	 * And then the auth cap will be changed.
	 */
	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags) && req->r_num_fwd) {
		struct ceph_dentry_info *di = ceph_dentry(req->r_dentry);
		struct ceph_inode_info *ci;
		struct ceph_cap *cap;

		/*
		 * The request may be handled very fast and the new inode
		 * hasn't been linked to the dentry yet. We need to wait
		 * for the ceph_finish_async_create(), which shouldn't be
		 * stuck too long or fail in theory, to finish when forwarding
		 * the request.
		 */
		if (!d_inode(req->r_dentry)) {
			err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_CREATE_BIT,
					  TASK_KILLABLE);
			if (err) {
				mutex_lock(&req->r_fill_mutex);
				set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
				mutex_unlock(&req->r_fill_mutex);
				goto out_session;
			}
		}

		ci = ceph_inode(d_inode(req->r_dentry));

		spin_lock(&ci->i_ceph_lock);
		cap = ci->i_auth_cap;
		if (ci->i_ceph_flags & CEPH_I_ASYNC_CREATE && mds != cap->mds) {
			dout("do_request session changed for auth cap %d -> %d\n",
			     cap->session->s_mds, session->s_mds);

			/* Remove the auth cap from old session */
			spin_lock(&cap->session->s_cap_lock);
			cap->session->s_nr_caps--;
			list_del_init(&cap->session_caps);
			spin_unlock(&cap->session->s_cap_lock);

			/* Add the auth cap to the new session */
			cap->mds = mds;
			cap->session = session;
			spin_lock(&session->s_cap_lock);
			session->s_nr_caps++;
			list_add_tail(&cap->session_caps, &session->s_caps);
			spin_unlock(&session->s_cap_lock);

			change_auth_cap_ses(ci, session);
		}
		spin_unlock(&ci->i_ceph_lock);
	}

	err = __send_request(session, req, false);

out_session:
	/* drop the lookup/register reference; req->r_session keeps its own */
	ceph_put_mds_session(session);
finish:
	if (err) {
		dout("__do_request early error %d\n", err);
		req->r_err = err;
		complete_request(mdsc, req);
		__unregister_request(mdsc, req);
	}
	return;
}

/*
 * Re-drive every request parked on @head.  The list is first moved to
 * a private list, so requests that get parked on @head again while
 * being processed are not looked at a second time in this call.
 *
 * called under mdsc->mutex
 */
static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head)
{
	struct ceph_mds_request *req;
	LIST_HEAD(tmp_list);

	list_splice_init(head, &tmp_list);

	while (!list_empty(&tmp_list)) {
		req = list_entry(tmp_list.next,
				 struct ceph_mds_request, r_wait);
		list_del_init(&req->r_wait);
		dout(" wake request %p tid %llu\n", req, req->r_tid);
		__do_request(mdsc, req);
	}
}

/*
 * Wake up threads with requests pending for @mds, so that they can
 * resubmit their requests to a possibly different mds.
 *
 * Requests that already got an unsafe reply, or that have been sent at
 * least once (r_attempts > 0), are skipped.
 */
static void kick_requests(struct ceph_mds_client *mdsc, int mds)
{
	struct ceph_mds_request *req;
	struct rb_node *p = rb_first(&mdsc->request_tree);

	dout("kick_requests mds%d\n", mds);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		/* advance first: __do_request() may unregister req */
		p = rb_next(p);
		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
			continue;
		if (req->r_attempts > 0)
			continue; /* only new requests */
		if (req->r_session &&
		    req->r_session->s_mds == mds) {
			dout(" kicking tid %llu\n", req->r_tid);
			list_del_init(&req->r_wait);
			__do_request(mdsc, req);
		}
	}
}

/*
 * Register @req and start it, without waiting for the reply.
 *
 * Pins the caps of the inodes involved, waits for any async create
 * still pending on r_inode / r_old_inode, then registers the request
 * and tries to send it under mdsc->mutex.
 *
 * Returns req->r_err as it stands after the first send attempt, or the
 * error from waiting on an async create.
 */
int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
			      struct ceph_mds_request *req)
{
	int err = 0;

	/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
	if (req->r_inode)
		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
	if (req->r_parent) {
		struct ceph_inode_info *ci = ceph_inode(req->r_parent);
		int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
			    CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
		spin_lock(&ci->i_ceph_lock);
		ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
		__ceph_touch_fmode(ci, mdsc, fmode);
		spin_unlock(&ci->i_ceph_lock);
	}
	if (req->r_old_dentry_dir)
		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);

	if (req->r_inode) {
		err = ceph_wait_on_async_create(req->r_inode);
		if (err) {
			dout("%s: wait for async create returned: %d\n",
			     __func__, err);
			return err;
		}
	}

	if (!err && req->r_old_inode) {
		err = ceph_wait_on_async_create(req->r_old_inode);
		if (err) {
			dout("%s: wait for async create returned: %d\n",
			     __func__, err);
			return err;
		}
	}

	dout("submit_request on %p for inode %p\n", req, dir);
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, dir);
	__do_request(mdsc, req);
	err = req->r_err;
	mutex_unlock(&mdsc->mutex);
	return err;
}

/*
 * Wait for a submitted request to complete.
 *
 * Uses @wait_func if given, otherwise a killable wait bounded by
 * req->r_timeout.  If a reply was received, its result code is
 * returned even when the wait itself failed.  If the wait failed
 * without a reply, the request is marked aborted and, for write
 * operations with a parent directory, the directory state is
 * invalidated.  Otherwise req->r_err is returned.
 */
int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
			   struct ceph_mds_request *req,
			   ceph_mds_request_wait_callback_t wait_func)
{
	int err;

	/* wait */
	dout("do_request waiting\n");
	if (wait_func) {
		err = wait_func(mdsc, req);
	} else {
		long timeleft = wait_for_completion_killable_timeout(
					&req->r_completion,
					ceph_timeout_jiffies(req->r_timeout));
		if (timeleft > 0)
			err = 0;
		else if (!timeleft)
			err = -ETIMEDOUT;  /* timed out */
		else
			err = timeleft;  /* killed */
	}
	dout("do_request waited, got %d\n", err);
	mutex_lock(&mdsc->mutex);

	/* only abort if we didn't race with a real reply */
	if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		err = le32_to_cpu(req->r_reply_info.head->result);
	} else if (err < 0) {
		dout("aborted request %lld with %d\n", req->r_tid, err);

		/*
		 * ensure we aren't running concurrently with
		 * ceph_fill_trace or ceph_readdir_prepopulate, which
		 * rely on locks (dir mutex) held by our caller.
		 */
		mutex_lock(&req->r_fill_mutex);
		req->r_err = err;
		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
		mutex_unlock(&req->r_fill_mutex);

		if (req->r_parent &&
		    (req->r_op & CEPH_MDS_OP_WRITE))
			ceph_invalidate_dir_request(req);
	} else {
		err = req->r_err;
	}

	mutex_unlock(&mdsc->mutex);
	return err;
}

/*
 * Synchronously perform an mds request.  Take care of all of the
 * session setup, forwarding, retry details.
 *
 * This is submit followed by wait; the wait is skipped when submitting
 * already produced an error.
 */
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
			 struct inode *dir,
			 struct ceph_mds_request *req)
{
	int err;

	dout("do_request on %p\n", req);

	/* issue */
	err = ceph_mdsc_submit_request(mdsc, dir, req);
	if (!err)
		err = ceph_mdsc_wait_request(mdsc, req, NULL);
	dout("do_request %p done, result %d\n", req, err);
	return err;
}

/*
 * Invalidate dir's completeness, dentry lease state on an aborted MDS
 * namespace request.
 *
 * req->r_parent is used unconditionally; the caller in this file only
 * calls this when r_parent is set.
 */
void ceph_invalidate_dir_request(struct ceph_mds_request *req)
{
	struct inode *dir = req->r_parent;
	struct inode *old_dir = req->r_old_dentry_dir;

	dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);

	ceph_dir_clear_complete(dir);
	if (old_dir)
		ceph_dir_clear_complete(old_dir);
	if (req->r_dentry)
		ceph_invalidate_dentry_lease(req->r_dentry);
	if (req->r_old_dentry)
		ceph_invalidate_dentry_lease(req->r_old_dentry);
}

/*
 * Handle mds reply.
 *
 * We take the session mutex and parse and process the reply immediately.
 * This preserves the logical ordering of replies, capabilities, etc., sent
 * by the MDS as they are applied to our local cache.
*/
/*
 * Steps, in order:
 *  1. look up the request by tid and reject replies that arrive on the
 *     wrong session or are duplicates (under mdsc->mutex);
 *  2. record safe/unsafe state and parse the reply (under mdsc->mutex);
 *  3. look up the target inode with no mutex held;
 *  4. under session->s_mutex: apply the snap trace, fill the trace into
 *     the local cache and prepopulate readdir results;
 *  5. publish the result or error on the request and wake the waiter.
 */
static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
{
	struct ceph_mds_client *mdsc = session->s_mdsc;
	struct ceph_mds_request *req;
	struct ceph_mds_reply_head *head = msg->front.iov_base;
	struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
	struct ceph_snap_realm *realm;
	u64 tid;
	int err, result;
	int mds = session->s_mds;
	bool close_sessions = false;

	if (msg->front.iov_len < sizeof(*head)) {
		pr_err("mdsc_handle_reply got corrupt (short) reply\n");
		ceph_msg_dump(msg);
		return;
	}

	/* get request, session */
	tid = le64_to_cpu(msg->hdr.tid);
	mutex_lock(&mdsc->mutex);
	req = lookup_get_request(mdsc, tid);
	if (!req) {
		dout("handle_reply on unknown tid %llu\n", tid);
		mutex_unlock(&mdsc->mutex);
		return;
	}
	dout("handle_reply %p\n", req);

	/* correct session? */
	if (req->r_session != session) {
		pr_err("mdsc_handle_reply got %llu on session mds%d"
		       " not mds%d\n", tid, session->s_mds,
		       req->r_session ? req->r_session->s_mds : -1);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}

	/* dup? */
	if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
	    (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
		pr_warn("got a dup %s reply on %llu from mds%d\n",
			   head->safe ? "safe" : "unsafe", tid, mds);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}
	if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
		pr_warn("got unsafe after safe on %llu from mds%d\n",
			   tid, mds);
		mutex_unlock(&mdsc->mutex);
		goto out;
	}

	result = le32_to_cpu(head->result);

	if (head->safe) {
		set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
		__unregister_request(mdsc, req);

		/* last request during umount? */
		if (mdsc->stopping && !__get_oldest_req(mdsc))
			complete_all(&mdsc->safe_umount_waiters);

		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
			/*
			 * We already handled the unsafe response, now do the
			 * cleanup.  No need to examine the response; the MDS
			 * doesn't include any result info in the safe
			 * response.  And even if it did, there is nothing
			 * useful we could do with a revised return value.
			 */
			dout("got safe reply %llu, mds%d\n", tid, mds);

			mutex_unlock(&mdsc->mutex);
			goto out;
		}
	} else {
		/* keep track of the request until the safe reply arrives */
		set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
		list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
	}

	dout("handle_reply tid %lld result %d\n", tid, result);
	rinfo = &req->r_reply_info;
	/* (u64)-1 selects the versioned reply encoding in the parsers */
	if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
		err = parse_reply_info(session, msg, rinfo, (u64)-1);
	else
		err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
	mutex_unlock(&mdsc->mutex);

	/* Must find target inode outside of mutexes to avoid deadlocks */
	if ((err >= 0) && rinfo->head->is_target) {
		struct inode *in;
		struct ceph_vino tvino = {
			.ino  = le64_to_cpu(rinfo->targeti.in->ino),
			.snap = le64_to_cpu(rinfo->targeti.in->snapid)
		};

		in = ceph_get_inode(mdsc->fsc->sb, tvino);
		if (IS_ERR(in)) {
			err = PTR_ERR(in);
			/* out_err expects s_mutex to be held */
			mutex_lock(&session->s_mutex);
			goto out_err;
		}
		req->r_target_inode = in;
	}

	mutex_lock(&session->s_mutex);
	if (err < 0) {
		pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
		ceph_msg_dump(msg);
		goto out_err;
	}

	/* snap trace */
	realm = NULL;
	if (rinfo->snapblob_len) {
		down_write(&mdsc->snap_rwsem);
		err = ceph_update_snap_trace(mdsc, rinfo->snapblob,
				rinfo->snapblob + rinfo->snapblob_len,
				le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
				&realm);
		if (err) {
			up_write(&mdsc->snap_rwsem);
			close_sessions = true;
			if (err == -EIO)
				ceph_msg_dump(msg);
			goto out_err;
		}
		/* the remaining work only needs the lock for reading */
		downgrade_write(&mdsc->snap_rwsem);
	} else {
		down_read(&mdsc->snap_rwsem);
	}

	/* insert trace into our cache */
	mutex_lock(&req->r_fill_mutex);
	current->journal_info = req;
	err = ceph_fill_trace(mdsc->fsc->sb, req);
	if (err == 0) {
		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
				    req->r_op == CEPH_MDS_OP_LSSNAP))
			ceph_readdir_prepopulate(req, req->r_session);
	}
	current->journal_info = NULL;
	mutex_unlock(&req->r_fill_mutex);

	up_read(&mdsc->snap_rwsem);
	if (realm)
		ceph_put_snap_realm(mdsc, realm);

	if (err == 0) {
		/* link an unsafe request to its target inode */
		if (req->r_target_inode &&
		    test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
			struct ceph_inode_info *ci =
				ceph_inode(req->r_target_inode);
			spin_lock(&ci->i_unsafe_lock);
			list_add_tail(&req->r_unsafe_target_item,
				      &ci->i_unsafe_iops);
			spin_unlock(&ci->i_unsafe_lock);
		}

		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
	}
out_err:
	/* reached with session->s_mutex held */
	mutex_lock(&mdsc->mutex);
	if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
		if (err) {
			req->r_err = err;
		} else {
			req->r_reply = ceph_msg_get(msg);
			set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
		}
	} else {
		dout("reply arrived after request %lld was aborted\n", tid);
	}
	mutex_unlock(&mdsc->mutex);

	mutex_unlock(&session->s_mutex);

	/* kick calling process */
	complete_request(mdsc, req);

	ceph_update_metadata_metrics(&mdsc->metric, req->r_start_latency,
				     req->r_end_latency, err);
out:
	/* drop the reference taken by lookup_get_request() */
	ceph_mdsc_put_request(req);

	/* Defer closing the sessions after s_mutex lock being released */
	if (close_sessions)
		ceph_mdsc_close_sessions(mdsc);
	return;
}



/*
 * handle mds notification that our request has been forwarded.
 */
static void handle_forward(struct ceph_mds_client *mdsc,
			   struct ceph_mds_session *session,
			   struct ceph_msg *msg)
{
	struct ceph_mds_request *req;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	u32 next_mds;
	u32 fwd_seq;
	int err = -EINVAL;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	bool aborted = false;

	ceph_decode_need(&p, end, 2*sizeof(u32), bad);
	next_mds = ceph_decode_32(&p);
	fwd_seq = ceph_decode_32(&p);

	mutex_lock(&mdsc->mutex);
	req = lookup_get_request(mdsc, tid);
	if (!req) {
		mutex_unlock(&mdsc->mutex);
		dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
		return;  /* dup reply? */
	}

	if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
		dout("forward tid %llu aborted, unregistering\n", tid);
		__unregister_request(mdsc, req);
	} else if (fwd_seq <= req->r_num_fwd) {
		/*
		 * The type of 'num_fwd' in ceph 'MClientRequestForward'
		 * is 'int32_t', while in 'ceph_mds_request_head' the
		 * type is '__u8'. So in case the request bounces between
		 * MDSes exceeding 256 times, the client will get stuck.
		 *
		 * In this case it's ususally a bug in MDS and continue
		 * bouncing the request makes no sense.
		 *
		 * In future this could be fixed in ceph code, so avoid
		 * using the hardcode here.
3506 */ 3507 int max = sizeof_field(struct ceph_mds_request_head, num_fwd); 3508 max = 1 << (max * BITS_PER_BYTE); 3509 if (req->r_num_fwd >= max) { 3510 mutex_lock(&req->r_fill_mutex); 3511 req->r_err = -EMULTIHOP; 3512 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); 3513 mutex_unlock(&req->r_fill_mutex); 3514 aborted = true; 3515 pr_warn_ratelimited("forward tid %llu seq overflow\n", 3516 tid); 3517 } else { 3518 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 3519 tid, next_mds, req->r_num_fwd, fwd_seq); 3520 } 3521 } else { 3522 /* resend. forward race not possible; mds would drop */ 3523 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 3524 BUG_ON(req->r_err); 3525 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 3526 req->r_attempts = 0; 3527 req->r_num_fwd = fwd_seq; 3528 req->r_resend_mds = next_mds; 3529 put_request_session(req); 3530 __do_request(mdsc, req); 3531 } 3532 mutex_unlock(&mdsc->mutex); 3533 3534 /* kick calling process */ 3535 if (aborted) 3536 complete_request(mdsc, req); 3537 ceph_mdsc_put_request(req); 3538 return; 3539 3540 bad: 3541 pr_err("mdsc_handle_forward decode error err=%d\n", err); 3542 ceph_msg_dump(msg); 3543 } 3544 3545 static int __decode_session_metadata(void **p, void *end, 3546 bool *blocklisted) 3547 { 3548 /* map<string,string> */ 3549 u32 n; 3550 bool err_str; 3551 ceph_decode_32_safe(p, end, n, bad); 3552 while (n-- > 0) { 3553 u32 len; 3554 ceph_decode_32_safe(p, end, len, bad); 3555 ceph_decode_need(p, end, len, bad); 3556 err_str = !strncmp(*p, "error_string", len); 3557 *p += len; 3558 ceph_decode_32_safe(p, end, len, bad); 3559 ceph_decode_need(p, end, len, bad); 3560 /* 3561 * Match "blocklisted (blacklisted)" from newer MDSes, 3562 * or "blacklisted" from older MDSes. 
3563 */ 3564 if (err_str && strnstr(*p, "blacklisted", len)) 3565 *blocklisted = true; 3566 *p += len; 3567 } 3568 return 0; 3569 bad: 3570 return -1; 3571 } 3572 3573 /* 3574 * handle a mds session control message 3575 */ 3576 static void handle_session(struct ceph_mds_session *session, 3577 struct ceph_msg *msg) 3578 { 3579 struct ceph_mds_client *mdsc = session->s_mdsc; 3580 int mds = session->s_mds; 3581 int msg_version = le16_to_cpu(msg->hdr.version); 3582 void *p = msg->front.iov_base; 3583 void *end = p + msg->front.iov_len; 3584 struct ceph_mds_session_head *h; 3585 u32 op; 3586 u64 seq, features = 0; 3587 int wake = 0; 3588 bool blocklisted = false; 3589 3590 /* decode */ 3591 ceph_decode_need(&p, end, sizeof(*h), bad); 3592 h = p; 3593 p += sizeof(*h); 3594 3595 op = le32_to_cpu(h->op); 3596 seq = le64_to_cpu(h->seq); 3597 3598 if (msg_version >= 3) { 3599 u32 len; 3600 /* version >= 2 and < 5, decode metadata, skip otherwise 3601 * as it's handled via flags. 3602 */ 3603 if (msg_version >= 5) 3604 ceph_decode_skip_map(&p, end, string, string, bad); 3605 else if (__decode_session_metadata(&p, end, &blocklisted) < 0) 3606 goto bad; 3607 3608 /* version >= 3, feature bits */ 3609 ceph_decode_32_safe(&p, end, len, bad); 3610 if (len) { 3611 ceph_decode_64_safe(&p, end, features, bad); 3612 p += len - sizeof(features); 3613 } 3614 } 3615 3616 if (msg_version >= 5) { 3617 u32 flags, len; 3618 3619 /* version >= 4 */ 3620 ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */ 3621 ceph_decode_32_safe(&p, end, len, bad); /* len */ 3622 ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */ 3623 3624 /* version >= 5, flags */ 3625 ceph_decode_32_safe(&p, end, flags, bad); 3626 if (flags & CEPH_SESSION_BLOCKLISTED) { 3627 pr_warn("mds%d session blocklisted\n", session->s_mds); 3628 blocklisted = true; 3629 } 3630 } 3631 3632 mutex_lock(&mdsc->mutex); 3633 if (op == CEPH_SESSION_CLOSE) { 3634 ceph_get_mds_session(session); 3635 __unregister_session(mdsc, 
session); 3636 } 3637 /* FIXME: this ttl calculation is generous */ 3638 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 3639 mutex_unlock(&mdsc->mutex); 3640 3641 mutex_lock(&session->s_mutex); 3642 3643 dout("handle_session mds%d %s %p state %s seq %llu\n", 3644 mds, ceph_session_op_name(op), session, 3645 ceph_session_state_name(session->s_state), seq); 3646 3647 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 3648 session->s_state = CEPH_MDS_SESSION_OPEN; 3649 pr_info("mds%d came back\n", session->s_mds); 3650 } 3651 3652 switch (op) { 3653 case CEPH_SESSION_OPEN: 3654 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3655 pr_info("mds%d reconnect success\n", session->s_mds); 3656 3657 if (session->s_state == CEPH_MDS_SESSION_OPEN) { 3658 pr_notice("mds%d is already opened\n", session->s_mds); 3659 } else { 3660 session->s_state = CEPH_MDS_SESSION_OPEN; 3661 session->s_features = features; 3662 renewed_caps(mdsc, session, 0); 3663 if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, 3664 &session->s_features)) 3665 metric_schedule_delayed(&mdsc->metric); 3666 } 3667 3668 /* 3669 * The connection maybe broken and the session in client 3670 * side has been reinitialized, need to update the seq 3671 * anyway. 
3672 */ 3673 if (!session->s_seq && seq) 3674 session->s_seq = seq; 3675 3676 wake = 1; 3677 if (mdsc->stopping) 3678 __close_session(mdsc, session); 3679 break; 3680 3681 case CEPH_SESSION_RENEWCAPS: 3682 if (session->s_renew_seq == seq) 3683 renewed_caps(mdsc, session, 1); 3684 break; 3685 3686 case CEPH_SESSION_CLOSE: 3687 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3688 pr_info("mds%d reconnect denied\n", session->s_mds); 3689 session->s_state = CEPH_MDS_SESSION_CLOSED; 3690 cleanup_session_requests(mdsc, session); 3691 remove_session_caps(session); 3692 wake = 2; /* for good measure */ 3693 wake_up_all(&mdsc->session_close_wq); 3694 break; 3695 3696 case CEPH_SESSION_STALE: 3697 pr_info("mds%d caps went stale, renewing\n", 3698 session->s_mds); 3699 atomic_inc(&session->s_cap_gen); 3700 session->s_cap_ttl = jiffies - 1; 3701 send_renew_caps(mdsc, session); 3702 break; 3703 3704 case CEPH_SESSION_RECALL_STATE: 3705 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 3706 break; 3707 3708 case CEPH_SESSION_FLUSHMSG: 3709 /* flush cap releases */ 3710 spin_lock(&session->s_cap_lock); 3711 if (session->s_num_cap_releases) 3712 ceph_flush_cap_releases(mdsc, session); 3713 spin_unlock(&session->s_cap_lock); 3714 3715 send_flushmsg_ack(mdsc, session, seq); 3716 break; 3717 3718 case CEPH_SESSION_FORCE_RO: 3719 dout("force_session_readonly %p\n", session); 3720 spin_lock(&session->s_cap_lock); 3721 session->s_readonly = true; 3722 spin_unlock(&session->s_cap_lock); 3723 wake_up_session_caps(session, FORCE_RO); 3724 break; 3725 3726 case CEPH_SESSION_REJECT: 3727 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 3728 pr_info("mds%d rejected session\n", session->s_mds); 3729 session->s_state = CEPH_MDS_SESSION_REJECTED; 3730 cleanup_session_requests(mdsc, session); 3731 remove_session_caps(session); 3732 if (blocklisted) 3733 mdsc->fsc->blocklisted = true; 3734 wake = 2; /* for good measure */ 3735 break; 3736 3737 default: 3738 
pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 3739 WARN_ON(1); 3740 } 3741 3742 mutex_unlock(&session->s_mutex); 3743 if (wake) { 3744 mutex_lock(&mdsc->mutex); 3745 __wake_requests(mdsc, &session->s_waiting); 3746 if (wake == 2) 3747 kick_requests(mdsc, mds); 3748 mutex_unlock(&mdsc->mutex); 3749 } 3750 if (op == CEPH_SESSION_CLOSE) 3751 ceph_put_mds_session(session); 3752 return; 3753 3754 bad: 3755 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 3756 (int)msg->front.iov_len); 3757 ceph_msg_dump(msg); 3758 return; 3759 } 3760 3761 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 3762 { 3763 int dcaps; 3764 3765 dcaps = xchg(&req->r_dir_caps, 0); 3766 if (dcaps) { 3767 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3768 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 3769 } 3770 } 3771 3772 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req) 3773 { 3774 int dcaps; 3775 3776 dcaps = xchg(&req->r_dir_caps, 0); 3777 if (dcaps) { 3778 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3779 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent), 3780 dcaps); 3781 } 3782 } 3783 3784 /* 3785 * called under session->mutex. 3786 */ 3787 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 3788 struct ceph_mds_session *session) 3789 { 3790 struct ceph_mds_request *req, *nreq; 3791 struct rb_node *p; 3792 3793 dout("replay_unsafe_requests mds%d\n", session->s_mds); 3794 3795 mutex_lock(&mdsc->mutex); 3796 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) 3797 __send_request(session, req, true); 3798 3799 /* 3800 * also re-send old requests when MDS enters reconnect stage. So that MDS 3801 * can process completed request in clientreplay stage. 
3802 */ 3803 p = rb_first(&mdsc->request_tree); 3804 while (p) { 3805 req = rb_entry(p, struct ceph_mds_request, r_node); 3806 p = rb_next(p); 3807 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3808 continue; 3809 if (req->r_attempts == 0) 3810 continue; /* only old requests */ 3811 if (!req->r_session) 3812 continue; 3813 if (req->r_session->s_mds != session->s_mds) 3814 continue; 3815 3816 ceph_mdsc_release_dir_caps_no_check(req); 3817 3818 __send_request(session, req, true); 3819 } 3820 mutex_unlock(&mdsc->mutex); 3821 } 3822 3823 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state) 3824 { 3825 struct ceph_msg *reply; 3826 struct ceph_pagelist *_pagelist; 3827 struct page *page; 3828 __le32 *addr; 3829 int err = -ENOMEM; 3830 3831 if (!recon_state->allow_multi) 3832 return -ENOSPC; 3833 3834 /* can't handle message that contains both caps and realm */ 3835 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms); 3836 3837 /* pre-allocate new pagelist */ 3838 _pagelist = ceph_pagelist_alloc(GFP_NOFS); 3839 if (!_pagelist) 3840 return -ENOMEM; 3841 3842 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3843 if (!reply) 3844 goto fail_msg; 3845 3846 /* placeholder for nr_caps */ 3847 err = ceph_pagelist_encode_32(_pagelist, 0); 3848 if (err < 0) 3849 goto fail; 3850 3851 if (recon_state->nr_caps) { 3852 /* currently encoding caps */ 3853 err = ceph_pagelist_encode_32(recon_state->pagelist, 0); 3854 if (err) 3855 goto fail; 3856 } else { 3857 /* placeholder for nr_realms (currently encoding relams) */ 3858 err = ceph_pagelist_encode_32(_pagelist, 0); 3859 if (err < 0) 3860 goto fail; 3861 } 3862 3863 err = ceph_pagelist_encode_8(recon_state->pagelist, 1); 3864 if (err) 3865 goto fail; 3866 3867 page = list_first_entry(&recon_state->pagelist->head, struct page, lru); 3868 addr = kmap_atomic(page); 3869 if (recon_state->nr_caps) { 3870 /* currently encoding caps */ 3871 *addr = cpu_to_le32(recon_state->nr_caps); 
3872 } else { 3873 /* currently encoding relams */ 3874 *(addr + 1) = cpu_to_le32(recon_state->nr_realms); 3875 } 3876 kunmap_atomic(addr); 3877 3878 reply->hdr.version = cpu_to_le16(5); 3879 reply->hdr.compat_version = cpu_to_le16(4); 3880 3881 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length); 3882 ceph_msg_data_add_pagelist(reply, recon_state->pagelist); 3883 3884 ceph_con_send(&recon_state->session->s_con, reply); 3885 ceph_pagelist_release(recon_state->pagelist); 3886 3887 recon_state->pagelist = _pagelist; 3888 recon_state->nr_caps = 0; 3889 recon_state->nr_realms = 0; 3890 recon_state->msg_version = 5; 3891 return 0; 3892 fail: 3893 ceph_msg_put(reply); 3894 fail_msg: 3895 ceph_pagelist_release(_pagelist); 3896 return err; 3897 } 3898 3899 static struct dentry* d_find_primary(struct inode *inode) 3900 { 3901 struct dentry *alias, *dn = NULL; 3902 3903 if (hlist_empty(&inode->i_dentry)) 3904 return NULL; 3905 3906 spin_lock(&inode->i_lock); 3907 if (hlist_empty(&inode->i_dentry)) 3908 goto out_unlock; 3909 3910 if (S_ISDIR(inode->i_mode)) { 3911 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias); 3912 if (!IS_ROOT(alias)) 3913 dn = dget(alias); 3914 goto out_unlock; 3915 } 3916 3917 hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) { 3918 spin_lock(&alias->d_lock); 3919 if (!d_unhashed(alias) && 3920 (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) { 3921 dn = dget_dlock(alias); 3922 } 3923 spin_unlock(&alias->d_lock); 3924 if (dn) 3925 break; 3926 } 3927 out_unlock: 3928 spin_unlock(&inode->i_lock); 3929 return dn; 3930 } 3931 3932 /* 3933 * Encode information about a cap for a reconnect with the MDS. 
3934 */ 3935 static int reconnect_caps_cb(struct inode *inode, int mds, void *arg) 3936 { 3937 union { 3938 struct ceph_mds_cap_reconnect v2; 3939 struct ceph_mds_cap_reconnect_v1 v1; 3940 } rec; 3941 struct ceph_inode_info *ci = ceph_inode(inode); 3942 struct ceph_reconnect_state *recon_state = arg; 3943 struct ceph_pagelist *pagelist = recon_state->pagelist; 3944 struct dentry *dentry; 3945 struct ceph_cap *cap; 3946 char *path; 3947 int pathlen = 0, err; 3948 u64 pathbase; 3949 u64 snap_follows; 3950 3951 dentry = d_find_primary(inode); 3952 if (dentry) { 3953 /* set pathbase to parent dir when msg_version >= 2 */ 3954 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 3955 recon_state->msg_version >= 2); 3956 dput(dentry); 3957 if (IS_ERR(path)) { 3958 err = PTR_ERR(path); 3959 goto out_err; 3960 } 3961 } else { 3962 path = NULL; 3963 pathbase = 0; 3964 } 3965 3966 spin_lock(&ci->i_ceph_lock); 3967 cap = __get_cap_for_mds(ci, mds); 3968 if (!cap) { 3969 spin_unlock(&ci->i_ceph_lock); 3970 err = 0; 3971 goto out_err; 3972 } 3973 dout(" adding %p ino %llx.%llx cap %p %lld %s\n", 3974 inode, ceph_vinop(inode), cap, cap->cap_id, 3975 ceph_cap_string(cap->issued)); 3976 3977 cap->seq = 0; /* reset cap seq */ 3978 cap->issue_seq = 0; /* and issue_seq */ 3979 cap->mseq = 0; /* and migrate_seq */ 3980 cap->cap_gen = atomic_read(&cap->session->s_cap_gen); 3981 3982 /* These are lost when the session goes away */ 3983 if (S_ISDIR(inode->i_mode)) { 3984 if (cap->issued & CEPH_CAP_DIR_CREATE) { 3985 ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns)); 3986 memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout)); 3987 } 3988 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS; 3989 } 3990 3991 if (recon_state->msg_version >= 2) { 3992 rec.v2.cap_id = cpu_to_le64(cap->cap_id); 3993 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 3994 rec.v2.issued = cpu_to_le32(cap->issued); 3995 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 3996 rec.v2.pathbase = 
cpu_to_le64(pathbase); 3997 rec.v2.flock_len = (__force __le32) 3998 ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); 3999 } else { 4000 rec.v1.cap_id = cpu_to_le64(cap->cap_id); 4001 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); 4002 rec.v1.issued = cpu_to_le32(cap->issued); 4003 rec.v1.size = cpu_to_le64(i_size_read(inode)); 4004 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime); 4005 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime); 4006 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); 4007 rec.v1.pathbase = cpu_to_le64(pathbase); 4008 } 4009 4010 if (list_empty(&ci->i_cap_snaps)) { 4011 snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0; 4012 } else { 4013 struct ceph_cap_snap *capsnap = 4014 list_first_entry(&ci->i_cap_snaps, 4015 struct ceph_cap_snap, ci_item); 4016 snap_follows = capsnap->follows; 4017 } 4018 spin_unlock(&ci->i_ceph_lock); 4019 4020 if (recon_state->msg_version >= 2) { 4021 int num_fcntl_locks, num_flock_locks; 4022 struct ceph_filelock *flocks = NULL; 4023 size_t struct_len, total_len = sizeof(u64); 4024 u8 struct_v = 0; 4025 4026 encode_again: 4027 if (rec.v2.flock_len) { 4028 ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks); 4029 } else { 4030 num_fcntl_locks = 0; 4031 num_flock_locks = 0; 4032 } 4033 if (num_fcntl_locks + num_flock_locks > 0) { 4034 flocks = kmalloc_array(num_fcntl_locks + num_flock_locks, 4035 sizeof(struct ceph_filelock), 4036 GFP_NOFS); 4037 if (!flocks) { 4038 err = -ENOMEM; 4039 goto out_err; 4040 } 4041 err = ceph_encode_locks_to_buffer(inode, flocks, 4042 num_fcntl_locks, 4043 num_flock_locks); 4044 if (err) { 4045 kfree(flocks); 4046 flocks = NULL; 4047 if (err == -ENOSPC) 4048 goto encode_again; 4049 goto out_err; 4050 } 4051 } else { 4052 kfree(flocks); 4053 flocks = NULL; 4054 } 4055 4056 if (recon_state->msg_version >= 3) { 4057 /* version, compat_version and struct_len */ 4058 total_len += 2 * sizeof(u8) + sizeof(u32); 4059 struct_v = 2; 4060 } 4061 /* 4062 * 
number of encoded locks is stable, so copy to pagelist 4063 */ 4064 struct_len = 2 * sizeof(u32) + 4065 (num_fcntl_locks + num_flock_locks) * 4066 sizeof(struct ceph_filelock); 4067 rec.v2.flock_len = cpu_to_le32(struct_len); 4068 4069 struct_len += sizeof(u32) + pathlen + sizeof(rec.v2); 4070 4071 if (struct_v >= 2) 4072 struct_len += sizeof(u64); /* snap_follows */ 4073 4074 total_len += struct_len; 4075 4076 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) { 4077 err = send_reconnect_partial(recon_state); 4078 if (err) 4079 goto out_freeflocks; 4080 pagelist = recon_state->pagelist; 4081 } 4082 4083 err = ceph_pagelist_reserve(pagelist, total_len); 4084 if (err) 4085 goto out_freeflocks; 4086 4087 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 4088 if (recon_state->msg_version >= 3) { 4089 ceph_pagelist_encode_8(pagelist, struct_v); 4090 ceph_pagelist_encode_8(pagelist, 1); 4091 ceph_pagelist_encode_32(pagelist, struct_len); 4092 } 4093 ceph_pagelist_encode_string(pagelist, path, pathlen); 4094 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); 4095 ceph_locks_to_pagelist(flocks, pagelist, 4096 num_fcntl_locks, num_flock_locks); 4097 if (struct_v >= 2) 4098 ceph_pagelist_encode_64(pagelist, snap_follows); 4099 out_freeflocks: 4100 kfree(flocks); 4101 } else { 4102 err = ceph_pagelist_reserve(pagelist, 4103 sizeof(u64) + sizeof(u32) + 4104 pathlen + sizeof(rec.v1)); 4105 if (err) 4106 goto out_err; 4107 4108 ceph_pagelist_encode_64(pagelist, ceph_ino(inode)); 4109 ceph_pagelist_encode_string(pagelist, path, pathlen); 4110 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); 4111 } 4112 4113 out_err: 4114 ceph_mdsc_free_path(path, pathlen); 4115 if (!err) 4116 recon_state->nr_caps++; 4117 return err; 4118 } 4119 4120 static int encode_snap_realms(struct ceph_mds_client *mdsc, 4121 struct ceph_reconnect_state *recon_state) 4122 { 4123 struct rb_node *p; 4124 struct ceph_pagelist *pagelist = recon_state->pagelist; 4125 int err = 0; 4126 4127 if 
(recon_state->msg_version >= 4) { 4128 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms); 4129 if (err < 0) 4130 goto fail; 4131 } 4132 4133 /* 4134 * snaprealms. we provide mds with the ino, seq (version), and 4135 * parent for all of our realms. If the mds has any newer info, 4136 * it will tell us. 4137 */ 4138 for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) { 4139 struct ceph_snap_realm *realm = 4140 rb_entry(p, struct ceph_snap_realm, node); 4141 struct ceph_mds_snaprealm_reconnect sr_rec; 4142 4143 if (recon_state->msg_version >= 4) { 4144 size_t need = sizeof(u8) * 2 + sizeof(u32) + 4145 sizeof(sr_rec); 4146 4147 if (pagelist->length + need > RECONNECT_MAX_SIZE) { 4148 err = send_reconnect_partial(recon_state); 4149 if (err) 4150 goto fail; 4151 pagelist = recon_state->pagelist; 4152 } 4153 4154 err = ceph_pagelist_reserve(pagelist, need); 4155 if (err) 4156 goto fail; 4157 4158 ceph_pagelist_encode_8(pagelist, 1); 4159 ceph_pagelist_encode_8(pagelist, 1); 4160 ceph_pagelist_encode_32(pagelist, sizeof(sr_rec)); 4161 } 4162 4163 dout(" adding snap realm %llx seq %lld parent %llx\n", 4164 realm->ino, realm->seq, realm->parent_ino); 4165 sr_rec.ino = cpu_to_le64(realm->ino); 4166 sr_rec.seq = cpu_to_le64(realm->seq); 4167 sr_rec.parent = cpu_to_le64(realm->parent_ino); 4168 4169 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec)); 4170 if (err) 4171 goto fail; 4172 4173 recon_state->nr_realms++; 4174 } 4175 fail: 4176 return err; 4177 } 4178 4179 4180 /* 4181 * If an MDS fails and recovers, clients need to reconnect in order to 4182 * reestablish shared state. This includes all caps issued through 4183 * this session _and_ the snap_realm hierarchy. Because it's not 4184 * clear which snap realms the mds cares about, we send everything we 4185 * know about.. that ensures we'll then get any new info the 4186 * recovering MDS might have. 4187 * 4188 * This is a relatively heavyweight operation, but it's rare. 
4189 */ 4190 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 4191 struct ceph_mds_session *session) 4192 { 4193 struct ceph_msg *reply; 4194 int mds = session->s_mds; 4195 int err = -ENOMEM; 4196 struct ceph_reconnect_state recon_state = { 4197 .session = session, 4198 }; 4199 LIST_HEAD(dispose); 4200 4201 pr_info("mds%d reconnect start\n", mds); 4202 4203 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 4204 if (!recon_state.pagelist) 4205 goto fail_nopagelist; 4206 4207 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 4208 if (!reply) 4209 goto fail_nomsg; 4210 4211 xa_destroy(&session->s_delegated_inos); 4212 4213 mutex_lock(&session->s_mutex); 4214 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 4215 session->s_seq = 0; 4216 4217 dout("session %p state %s\n", session, 4218 ceph_session_state_name(session->s_state)); 4219 4220 atomic_inc(&session->s_cap_gen); 4221 4222 spin_lock(&session->s_cap_lock); 4223 /* don't know if session is readonly */ 4224 session->s_readonly = 0; 4225 /* 4226 * notify __ceph_remove_cap() that we are composing cap reconnect. 4227 * If a cap get released before being added to the cap reconnect, 4228 * __ceph_remove_cap() should skip queuing cap release. 
4229 */ 4230 session->s_cap_reconnect = 1; 4231 /* drop old cap expires; we're about to reestablish that state */ 4232 detach_cap_releases(session, &dispose); 4233 spin_unlock(&session->s_cap_lock); 4234 dispose_cap_releases(mdsc, &dispose); 4235 4236 /* trim unused caps to reduce MDS's cache rejoin time */ 4237 if (mdsc->fsc->sb->s_root) 4238 shrink_dcache_parent(mdsc->fsc->sb->s_root); 4239 4240 ceph_con_close(&session->s_con); 4241 ceph_con_open(&session->s_con, 4242 CEPH_ENTITY_TYPE_MDS, mds, 4243 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 4244 4245 /* replay unsafe requests */ 4246 replay_unsafe_requests(mdsc, session); 4247 4248 ceph_early_kick_flushing_caps(mdsc, session); 4249 4250 down_read(&mdsc->snap_rwsem); 4251 4252 /* placeholder for nr_caps */ 4253 err = ceph_pagelist_encode_32(recon_state.pagelist, 0); 4254 if (err) 4255 goto fail; 4256 4257 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) { 4258 recon_state.msg_version = 3; 4259 recon_state.allow_multi = true; 4260 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) { 4261 recon_state.msg_version = 3; 4262 } else { 4263 recon_state.msg_version = 2; 4264 } 4265 /* trsaverse this session's caps */ 4266 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state); 4267 4268 spin_lock(&session->s_cap_lock); 4269 session->s_cap_reconnect = 0; 4270 spin_unlock(&session->s_cap_lock); 4271 4272 if (err < 0) 4273 goto fail; 4274 4275 /* check if all realms can be encoded into current message */ 4276 if (mdsc->num_snap_realms) { 4277 size_t total_len = 4278 recon_state.pagelist->length + 4279 mdsc->num_snap_realms * 4280 sizeof(struct ceph_mds_snaprealm_reconnect); 4281 if (recon_state.msg_version >= 4) { 4282 /* number of realms */ 4283 total_len += sizeof(u32); 4284 /* version, compat_version and struct_len */ 4285 total_len += mdsc->num_snap_realms * 4286 (2 * sizeof(u8) + sizeof(u32)); 4287 } 4288 if (total_len > RECONNECT_MAX_SIZE) { 4289 if 
(!recon_state.allow_multi) { 4290 err = -ENOSPC; 4291 goto fail; 4292 } 4293 if (recon_state.nr_caps) { 4294 err = send_reconnect_partial(&recon_state); 4295 if (err) 4296 goto fail; 4297 } 4298 recon_state.msg_version = 5; 4299 } 4300 } 4301 4302 err = encode_snap_realms(mdsc, &recon_state); 4303 if (err < 0) 4304 goto fail; 4305 4306 if (recon_state.msg_version >= 5) { 4307 err = ceph_pagelist_encode_8(recon_state.pagelist, 0); 4308 if (err < 0) 4309 goto fail; 4310 } 4311 4312 if (recon_state.nr_caps || recon_state.nr_realms) { 4313 struct page *page = 4314 list_first_entry(&recon_state.pagelist->head, 4315 struct page, lru); 4316 __le32 *addr = kmap_atomic(page); 4317 if (recon_state.nr_caps) { 4318 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms); 4319 *addr = cpu_to_le32(recon_state.nr_caps); 4320 } else if (recon_state.msg_version >= 4) { 4321 *(addr + 1) = cpu_to_le32(recon_state.nr_realms); 4322 } 4323 kunmap_atomic(addr); 4324 } 4325 4326 reply->hdr.version = cpu_to_le16(recon_state.msg_version); 4327 if (recon_state.msg_version >= 4) 4328 reply->hdr.compat_version = cpu_to_le16(4); 4329 4330 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length); 4331 ceph_msg_data_add_pagelist(reply, recon_state.pagelist); 4332 4333 ceph_con_send(&session->s_con, reply); 4334 4335 mutex_unlock(&session->s_mutex); 4336 4337 mutex_lock(&mdsc->mutex); 4338 __wake_requests(mdsc, &session->s_waiting); 4339 mutex_unlock(&mdsc->mutex); 4340 4341 up_read(&mdsc->snap_rwsem); 4342 ceph_pagelist_release(recon_state.pagelist); 4343 return; 4344 4345 fail: 4346 ceph_msg_put(reply); 4347 up_read(&mdsc->snap_rwsem); 4348 mutex_unlock(&session->s_mutex); 4349 fail_nomsg: 4350 ceph_pagelist_release(recon_state.pagelist); 4351 fail_nopagelist: 4352 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 4353 return; 4354 } 4355 4356 4357 /* 4358 * compare old and new mdsmaps, kicking requests 4359 * and closing out old connections as necessary 4360 * 4361 * called 
under mdsc->mutex.
 */
static void check_new_map(struct ceph_mds_client *mdsc,
			  struct ceph_mdsmap *newmap,
			  struct ceph_mdsmap *oldmap)
{
	int i, j, err;
	int oldstate, newstate;
	struct ceph_mds_session *s;
	/*
	 * Bitmap of ranks that are an export target of some mds in newmap.
	 * NOTE(review): the element count divides by sizeof(unsigned long)
	 * (bytes) rather than the bits per long, so the array is larger
	 * than CEPH_MAX_MDS bits need -- confirm whether that is intended.
	 */
	unsigned long targets[DIV_ROUND_UP(CEPH_MAX_MDS, sizeof(unsigned long))] = {0};

	dout("check_new_map new %u old %u\n",
	     newmap->m_epoch, oldmap->m_epoch);

	/* collect every export target listed in the new map */
	if (newmap->m_info) {
		for (i = 0; i < newmap->possible_max_rank; i++) {
			for (j = 0; j < newmap->m_info[i].num_export_targets; j++)
				set_bit(newmap->m_info[i].export_targets[j], targets);
		}
	}

	/* pass 1: sessions we already have */
	for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
		if (!mdsc->sessions[i])
			continue;
		s = mdsc->sessions[i];
		oldstate = ceph_mdsmap_get_state(oldmap, i);
		newstate = ceph_mdsmap_get_state(newmap, i);

		dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
		     i, ceph_mds_state_name(oldstate),
		     ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
		     ceph_mds_state_name(newstate),
		     ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
		     ceph_session_state_name(s->s_state));

		if (i >= newmap->possible_max_rank) {
			/* force close session for stopped mds */
			ceph_get_mds_session(s);
			__unregister_session(mdsc, s);
			__wake_requests(mdsc, &s->s_waiting);
			/* mdsc->mutex is dropped while s_mutex is held */
			mutex_unlock(&mdsc->mutex);

			mutex_lock(&s->s_mutex);
			cleanup_session_requests(mdsc, s);
			remove_session_caps(s);
			mutex_unlock(&s->s_mutex);

			ceph_put_mds_session(s);

			mutex_lock(&mdsc->mutex);
			kick_requests(mdsc, i);
			continue;
		}

		if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
			   ceph_mdsmap_get_addr(newmap, i),
			   sizeof(struct ceph_entity_addr))) {
			/* just close it */
			/* re-take both locks with s_mutex first, then mdsc->mutex */
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&s->s_mutex);
			mutex_lock(&mdsc->mutex);
			ceph_con_close(&s->s_con);
			mutex_unlock(&s->s_mutex);
			s->s_state = CEPH_MDS_SESSION_RESTARTING;
		} else if (oldstate == newstate) {
			continue;  /* nothing new with this mds */
		}

		/*
		 * send reconnect?
		 */
		if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
		    newstate >= CEPH_MDS_STATE_RECONNECT) {
			mutex_unlock(&mdsc->mutex);
			/* handled here, so skip this rank in the export-target pass */
			clear_bit(i, targets);
			send_mds_reconnect(mdsc, s);
			mutex_lock(&mdsc->mutex);
		}

		/*
		 * kick request on any mds that has gone active.
		 */
		if (oldstate < CEPH_MDS_STATE_ACTIVE &&
		    newstate >= CEPH_MDS_STATE_ACTIVE) {
			if (oldstate != CEPH_MDS_STATE_CREATING &&
			    oldstate != CEPH_MDS_STATE_STARTING)
				pr_info("mds%d recovery completed\n", s->s_mds);
			kick_requests(mdsc, i);
			/* same s_mutex-before-mdsc->mutex dance as above */
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&s->s_mutex);
			mutex_lock(&mdsc->mutex);
			ceph_kick_flushing_caps(mdsc, s);
			mutex_unlock(&s->s_mutex);
			wake_up_session_caps(s, RECONNECT);
		}
	}

	/*
	 * Only open and reconnect sessions that don't exist yet.
	 */
	for (i = 0; i < newmap->possible_max_rank; i++) {
		/*
		 * In case the import MDS is crashed just after
		 * the EImportStart journal is flushed, so when
		 * a standby MDS takes over it and is replaying
		 * the EImportStart journal the new MDS daemon
		 * will wait the client to reconnect it, but the
		 * client may never register/open the session yet.
		 *
		 * Will try to reconnect that MDS daemon if the
		 * rank number is in the export targets array and
		 * is the up:reconnect state.
		 */
		newstate = ceph_mdsmap_get_state(newmap, i);
		if (!test_bit(i, targets) || newstate != CEPH_MDS_STATE_RECONNECT)
			continue;

		/*
		 * The session maybe registered and opened by some
		 * requests which were choosing random MDSes during
		 * the mdsc->mutex's unlock/lock gap below in rare
		 * case. But the related MDS daemon will just queue
		 * that requests and be still waiting for the client's
		 * reconnection request in up:reconnect state.
		 */
		s = __ceph_lookup_mds_session(mdsc, i);
		if (likely(!s)) {
			s = __open_export_target_session(mdsc, i);
			if (IS_ERR(s)) {
				err = PTR_ERR(s);
				pr_err("failed to open export target session, err %d\n",
				       err);
				continue;
			}
		}
		dout("send reconnect to export target mds.%d\n", i);
		mutex_unlock(&mdsc->mutex);
		send_mds_reconnect(mdsc, s);
		ceph_put_mds_session(s);
		mutex_lock(&mdsc->mutex);
	}

	/* pass 3: open sessions to the export targets of laggy ranks */
	for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
		s = mdsc->sessions[i];
		if (!s)
			continue;
		if (!ceph_mdsmap_is_laggy(newmap, i))
			continue;
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG ||
		    s->s_state == CEPH_MDS_SESSION_CLOSING) {
			dout(" connecting to export targets of laggy mds%d\n",
			     i);
			__open_export_target_sessions(mdsc, s);
		}
	}
}



/*
 * leases
 */

/*
 * Drop the session reference held by the dentry's lease and clear
 * di->lease_session.
 *
 * caller must hold session s_mutex, dentry->d_lock
 */
void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
{
	struct ceph_dentry_info *di = ceph_dentry(dentry);

	ceph_put_mds_session(di->lease_session);
	di->lease_session = NULL;
}

static void handle_lease(struct ceph_mds_client *mdsc,
			 struct ceph_mds_session *session,
			 struct ceph_msg *msg)
{
	struct super_block *sb = mdsc->fsc->sb;
	struct inode *inode;
	struct dentry *parent, *dentry;
	struct ceph_dentry_info *di;
	int mds = session->s_mds;
	struct ceph_mds_lease *h = msg->front.iov_base;
	u32 seq;
	struct ceph_vino vino;
	struct qstr dname;
	int release = 0;

	dout("handle_lease from mds%d\n", mds);

	/* decode */
	if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
		goto bad;
	vino.ino = le64_to_cpu(h->ino);
	vino.snap = CEPH_NOSNAP;
	seq = le32_to_cpu(h->seq);
	dname.len =
get_unaligned_le32(h + 1); 4560 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 4561 goto bad; 4562 dname.name = (void *)(h + 1) + sizeof(u32); 4563 4564 /* lookup inode */ 4565 inode = ceph_find_inode(sb, vino); 4566 dout("handle_lease %s, ino %llx %p %.*s\n", 4567 ceph_lease_op_name(h->action), vino.ino, inode, 4568 dname.len, dname.name); 4569 4570 mutex_lock(&session->s_mutex); 4571 inc_session_sequence(session); 4572 4573 if (!inode) { 4574 dout("handle_lease no inode %llx\n", vino.ino); 4575 goto release; 4576 } 4577 4578 /* dentry */ 4579 parent = d_find_alias(inode); 4580 if (!parent) { 4581 dout("no parent dentry on inode %p\n", inode); 4582 WARN_ON(1); 4583 goto release; /* hrm... */ 4584 } 4585 dname.hash = full_name_hash(parent, dname.name, dname.len); 4586 dentry = d_lookup(parent, &dname); 4587 dput(parent); 4588 if (!dentry) 4589 goto release; 4590 4591 spin_lock(&dentry->d_lock); 4592 di = ceph_dentry(dentry); 4593 switch (h->action) { 4594 case CEPH_MDS_LEASE_REVOKE: 4595 if (di->lease_session == session) { 4596 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 4597 h->seq = cpu_to_le32(di->lease_seq); 4598 __ceph_mdsc_drop_dentry_lease(dentry); 4599 } 4600 release = 1; 4601 break; 4602 4603 case CEPH_MDS_LEASE_RENEW: 4604 if (di->lease_session == session && 4605 di->lease_gen == atomic_read(&session->s_cap_gen) && 4606 di->lease_renew_from && 4607 di->lease_renew_after == 0) { 4608 unsigned long duration = 4609 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 4610 4611 di->lease_seq = seq; 4612 di->time = di->lease_renew_from + duration; 4613 di->lease_renew_after = di->lease_renew_from + 4614 (duration >> 1); 4615 di->lease_renew_from = 0; 4616 } 4617 break; 4618 } 4619 spin_unlock(&dentry->d_lock); 4620 dput(dentry); 4621 4622 if (!release) 4623 goto out; 4624 4625 release: 4626 /* let's just reuse the same message */ 4627 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 4628 ceph_msg_get(msg); 4629 ceph_con_send(&session->s_con, msg); 4630 4631 out: 
4632 mutex_unlock(&session->s_mutex); 4633 iput(inode); 4634 return; 4635 4636 bad: 4637 pr_err("corrupt lease message\n"); 4638 ceph_msg_dump(msg); 4639 } 4640 4641 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 4642 struct dentry *dentry, char action, 4643 u32 seq) 4644 { 4645 struct ceph_msg *msg; 4646 struct ceph_mds_lease *lease; 4647 struct inode *dir; 4648 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; 4649 4650 dout("lease_send_msg identry %p %s to mds%d\n", 4651 dentry, ceph_lease_op_name(action), session->s_mds); 4652 4653 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 4654 if (!msg) 4655 return; 4656 lease = msg->front.iov_base; 4657 lease->action = action; 4658 lease->seq = cpu_to_le32(seq); 4659 4660 spin_lock(&dentry->d_lock); 4661 dir = d_inode(dentry->d_parent); 4662 lease->ino = cpu_to_le64(ceph_ino(dir)); 4663 lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); 4664 4665 put_unaligned_le32(dentry->d_name.len, lease + 1); 4666 memcpy((void *)(lease + 1) + 4, 4667 dentry->d_name.name, dentry->d_name.len); 4668 spin_unlock(&dentry->d_lock); 4669 4670 ceph_con_send(&session->s_con, msg); 4671 } 4672 4673 /* 4674 * lock unlock the session, to wait ongoing session activities 4675 */ 4676 static void lock_unlock_session(struct ceph_mds_session *s) 4677 { 4678 mutex_lock(&s->s_mutex); 4679 mutex_unlock(&s->s_mutex); 4680 } 4681 4682 static void maybe_recover_session(struct ceph_mds_client *mdsc) 4683 { 4684 struct ceph_fs_client *fsc = mdsc->fsc; 4685 4686 if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) 4687 return; 4688 4689 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED) 4690 return; 4691 4692 if (!READ_ONCE(fsc->blocklisted)) 4693 return; 4694 4695 pr_info("auto reconnect after blocklisted\n"); 4696 ceph_force_reconnect(fsc->sb); 4697 } 4698 4699 bool check_session_state(struct ceph_mds_session *s) 4700 { 4701 switch (s->s_state) { 4702 case CEPH_MDS_SESSION_OPEN: 4703 if (s->s_ttl && time_after(jiffies, 
s->s_ttl)) { 4704 s->s_state = CEPH_MDS_SESSION_HUNG; 4705 pr_info("mds%d hung\n", s->s_mds); 4706 } 4707 break; 4708 case CEPH_MDS_SESSION_CLOSING: 4709 case CEPH_MDS_SESSION_NEW: 4710 case CEPH_MDS_SESSION_RESTARTING: 4711 case CEPH_MDS_SESSION_CLOSED: 4712 case CEPH_MDS_SESSION_REJECTED: 4713 return false; 4714 } 4715 4716 return true; 4717 } 4718 4719 /* 4720 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply, 4721 * then we need to retransmit that request. 4722 */ 4723 void inc_session_sequence(struct ceph_mds_session *s) 4724 { 4725 lockdep_assert_held(&s->s_mutex); 4726 4727 s->s_seq++; 4728 4729 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 4730 int ret; 4731 4732 dout("resending session close request for mds%d\n", s->s_mds); 4733 ret = request_close_session(s); 4734 if (ret < 0) 4735 pr_err("unable to close session to mds%d: %d\n", 4736 s->s_mds, ret); 4737 } 4738 } 4739 4740 /* 4741 * delayed work -- periodically trim expired leases, renew caps with mds. If 4742 * the @delay parameter is set to 0 or if it's more than 5 secs, the default 4743 * workqueue delay value of 5 secs will be used. 
*/
static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
{
	unsigned long max_delay = HZ * 5;

	/* 5 secs default delay */
	if (!delay || (delay > max_delay))
		delay = max_delay;
	schedule_delayed_work(&mdsc->delayed_work,
			      round_jiffies_relative(delay));
}

/*
 * Periodic work item for the MDS client.
 *
 * Does nothing (and does not re-arm itself) once mdsc->stopping is set.
 * Otherwise, for every session that passes check_session_state() it either
 * renews caps or sends a keepalive, and flushes cap releases on OPEN/HUNG
 * sessions.  It then runs the delayed-cap check, queues cap reclaim work,
 * trims the snapid map, tries blocklist recovery and re-arms itself.
 */
static void delayed_work(struct work_struct *work)
{
	struct ceph_mds_client *mdsc =
		container_of(work, struct ceph_mds_client, delayed_work.work);
	unsigned long delay;
	int renew_interval;
	int renew_caps;
	int i;

	dout("mdsc delayed_work\n");

	if (mdsc->stopping)
		return;

	mutex_lock(&mdsc->mutex);
	/* renew once a quarter of the session timeout has elapsed */
	renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
	renew_caps = time_after_eq(jiffies, HZ*renew_interval +
				   mdsc->last_renew_caps);
	if (renew_caps)
		mdsc->last_renew_caps = jiffies;

	for (i = 0; i < mdsc->max_sessions; i++) {
		/* the lookup's reference is put on both exits from this iteration */
		struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
		if (!s)
			continue;

		if (!check_session_state(s)) {
			ceph_put_mds_session(s);
			continue;
		}
		/* mdsc->mutex is not held while s_mutex is taken */
		mutex_unlock(&mdsc->mutex);

		mutex_lock(&s->s_mutex);
		if (renew_caps)
			send_renew_caps(mdsc, s);
		else
			ceph_con_keepalive(&s->s_con);
		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
		    s->s_state == CEPH_MDS_SESSION_HUNG)
			ceph_send_cap_releases(mdsc, s);
		mutex_unlock(&s->s_mutex);
		ceph_put_mds_session(s);

		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);

	/* the value returned here becomes the delay for the next run */
	delay = ceph_check_delayed_caps(mdsc);

	ceph_queue_cap_reclaim_work(mdsc);

	ceph_trim_snapid_map(mdsc);

	maybe_recover_session(mdsc);

	schedule_delayed(mdsc, delay);
}

/*
 * Allocate and initialise the MDS client for @fsc.
 *
 * On success the new client is stored in fsc->mdsc and 0 is returned.
 * Returns -ENOMEM if the client or its placeholder mdsmap cannot be
 * allocated, or the error from ceph_metric_init(); in both cases
 * everything allocated here is freed again.
 *
 * The client is allocated with kzalloc(), so fields not assigned below
 * start out as zero.
 */
int ceph_mdsc_init(struct ceph_fs_client *fsc)

{
	struct ceph_mds_client *mdsc;
	int err;

	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
	if (!mdsc)
		return -ENOMEM;
	mdsc->fsc = fsc;
	mutex_init(&mdsc->mutex);
	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
	if (!mdsc->mdsmap) {
		err = -ENOMEM;
		goto err_mdsc;
	}

	init_completion(&mdsc->safe_umount_waiters);
	init_waitqueue_head(&mdsc->session_close_wq);
	INIT_LIST_HEAD(&mdsc->waiting_for_map);
	mdsc->quotarealms_inodes = RB_ROOT;
	mutex_init(&mdsc->quotarealms_inodes_mutex);
	init_rwsem(&mdsc->snap_rwsem);
	mdsc->snap_realms = RB_ROOT;
	INIT_LIST_HEAD(&mdsc->snap_empty);
	spin_lock_init(&mdsc->snap_empty_lock);
	mdsc->request_tree = RB_ROOT;
	INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
	mdsc->last_renew_caps = jiffies;
	INIT_LIST_HEAD(&mdsc->cap_delay_list);
	INIT_LIST_HEAD(&mdsc->cap_wait_list);
	spin_lock_init(&mdsc->cap_delay_lock);
	INIT_LIST_HEAD(&mdsc->snap_flush_list);
	spin_lock_init(&mdsc->snap_flush_lock);
	mdsc->last_cap_flush_tid = 1;
	INIT_LIST_HEAD(&mdsc->cap_flush_list);
	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
	spin_lock_init(&mdsc->cap_dirty_lock);
	init_waitqueue_head(&mdsc->cap_flushing_wq);
	INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
	err = ceph_metric_init(&mdsc->metric);
	if (err)
		goto err_mdsmap;

	spin_lock_init(&mdsc->dentry_list_lock);
	INIT_LIST_HEAD(&mdsc->dentry_leases);
	INIT_LIST_HEAD(&mdsc->dentry_dir_leases);

	ceph_caps_init(mdsc);
	ceph_adjust_caps_max_min(mdsc, fsc->mount_options);

	spin_lock_init(&mdsc->snapid_map_lock);
	mdsc->snapid_map_tree = RB_ROOT;
	INIT_LIST_HEAD(&mdsc->snapid_map_lru);

	init_rwsem(&mdsc->pool_perm_rwsem);
	mdsc->pool_perm_tree = RB_ROOT;

	strscpy(mdsc->nodename, utsname()->nodename,
		sizeof(mdsc->nodename));

	/* publish the client only once it is fully set up */
	fsc->mdsc = mdsc;
	return 0;

err_mdsmap:
	kfree(mdsc->mdsmap);
err_mdsc:
	kfree(mdsc);
	return err;
}

/*
 * Wait for safe replies on open mds requests.  If we time out, drop
 * all requests from the tree to avoid dangling dentry refs.
 *
 * The wait is bounded by the mount_timeout option.  If no request is
 * outstanding, nothing is waited for.
 */
static void wait_requests(struct ceph_mds_client *mdsc)
{
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_mds_request *req;

	mutex_lock(&mdsc->mutex);
	if (__get_oldest_req(mdsc)) {
		mutex_unlock(&mdsc->mutex);

		dout("wait_requests waiting for requests\n");
		wait_for_completion_timeout(&mdsc->safe_umount_waiters,
				    ceph_timeout_jiffies(opts->mount_timeout));

		/* tear down remaining requests */
		mutex_lock(&mdsc->mutex);
		while ((req = __get_oldest_req(mdsc))) {
			dout("wait_requests timed out on tid %llu\n",
			     req->r_tid);
			list_del_init(&req->r_wait);
			__unregister_request(mdsc, req);
		}
	}
	mutex_unlock(&mdsc->mutex);
	dout("wait_requests done\n");
}

/*
 * Send a CEPH_SESSION_REQUEST_FLUSH_MDLOG session message on @s.
 *
 * Nothing is sent if the peer lacks the SERVER_LUMINOUS feature.  Takes
 * and releases s->s_mutex.  A message allocation failure is only logged.
 */
void send_flush_mdlog(struct ceph_mds_session *s)
{
	struct ceph_msg *msg;

	/*
	 * Pre-luminous MDS crashes when it sees an unknown session request
	 */
	if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
		return;

	mutex_lock(&s->s_mutex);
	dout("request mdlog flush to mds%d (%s)s seq %lld\n", s->s_mds,
	     ceph_session_state_name(s->s_state), s->s_seq);
	msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
				      s->s_seq);
	if (!msg) {
		pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n",
		       s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
	} else {
		ceph_con_send(&s->s_con, msg);
	}
	mutex_unlock(&s->s_mutex);
}

/*
 * called before mount is ro, and before dentries are torn down.
 * (hmm, does this still race with new lookups?)
*/
void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
{
	dout("pre_umount\n");
	/* delayed_work() checks this flag and stops re-arming itself */
	mdsc->stopping = 1;

	ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
	ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
	ceph_flush_dirty_caps(mdsc);
	wait_requests(mdsc);

	/*
	 * wait for reply handlers to drop their request refs and
	 * their inode/dcache refs
	 */
	ceph_msgr_flush();

	ceph_cleanup_quotarealms_inodes(mdsc);
}

/*
 * flush the mdlog and wait for all write mds requests to flush.
 *
 * Walks the request tree in tid order up to and including @want_tid.  For
 * each write request (other than SETFILELOCK) that has a session, it asks
 * that session's MDS to flush its mdlog -- skipped when the previous
 * request used the same session -- and then waits for the request's safe
 * completion.
 *
 * mdsc->mutex is dropped while waiting.  References on the current and
 * the next request are held across the wait; if the next request was
 * removed from the tree in the meantime the walk restarts from the oldest
 * request.
 */
static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc,
						 u64 want_tid)
{
	struct ceph_mds_request *req = NULL, *nextreq;
	struct ceph_mds_session *last_session = NULL;
	struct rb_node *n;

	mutex_lock(&mdsc->mutex);
	dout("%s want %lld\n", __func__, want_tid);
restart:
	req = __get_oldest_req(mdsc);
	while (req && req->r_tid <= want_tid) {
		/* find next request */
		n = rb_next(&req->r_node);
		if (n)
			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
		else
			nextreq = NULL;
		if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
		    (req->r_op & CEPH_MDS_OP_WRITE)) {
			struct ceph_mds_session *s = req->r_session;

			/* no session on this request: nothing to flush or wait on */
			if (!s) {
				req = nextreq;
				continue;
			}

			/* write op */
			ceph_mdsc_get_request(req);
			if (nextreq)
				ceph_mdsc_get_request(nextreq);
			s = ceph_get_mds_session(s);
			mutex_unlock(&mdsc->mutex);

			/* send flush mdlog request to MDS */
			if (last_session != s) {
				send_flush_mdlog(s);
				/* keep the ref on s as the new last_session */
				ceph_put_mds_session(last_session);
				last_session = s;
			} else {
				/* already flushed this session; drop the extra ref */
				ceph_put_mds_session(s);
			}
			dout("%s wait on %llu (want %llu)\n", __func__,
			     req->r_tid, want_tid);
			wait_for_completion(&req->r_safe_completion);

			mutex_lock(&mdsc->mutex);
			ceph_mdsc_put_request(req);
			if (!nextreq)
				break;  /* next dne before, so we're done! */
			if (RB_EMPTY_NODE(&nextreq->r_node)) {
				/* next request was removed from tree */
				ceph_mdsc_put_request(nextreq);
				goto restart;
			}
			ceph_mdsc_put_request(nextreq);  /* won't go away */
		}
		req = nextreq;
	}
	mutex_unlock(&mdsc->mutex);
	ceph_put_mds_session(last_session);
	dout("%s done\n", __func__);
}

/*
 * Sync the MDS client: flush dirty caps, then wait for the write requests
 * and cap flushes that were outstanding when the sync started.
 *
 * Returns immediately if the mount state is CEPH_MOUNT_SHUTDOWN or later.
 */
void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
{
	u64 want_tid, want_flush;

	if (READ_ONCE(mdsc->fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN)
		return;

	dout("sync\n");
	/* snapshot the newest request tid; later requests are not waited for */
	mutex_lock(&mdsc->mutex);
	want_tid = mdsc->last_tid;
	mutex_unlock(&mdsc->mutex);

	ceph_flush_dirty_caps(mdsc);
	spin_lock(&mdsc->cap_dirty_lock);
	want_flush = mdsc->last_cap_flush_tid;
	if (!list_empty(&mdsc->cap_flush_list)) {
		/* mark the newest pending cap flush with ->wake */
		struct ceph_cap_flush *cf =
			list_last_entry(&mdsc->cap_flush_list,
					struct ceph_cap_flush, g_list);
		cf->wake = true;
	}
	spin_unlock(&mdsc->cap_dirty_lock);

	dout("sync want tid %lld flush_seq %lld\n",
	     want_tid, want_flush);

	flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid);
	wait_caps_flush(mdsc, want_flush);
}

/*
 * true if all sessions are closed, or we force unmount
 *
 * @skipped is the number of sessions that are not expected to close;
 * "all closed" means no more than that many sessions remain.
 */
static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
{
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
		return true;
	return atomic_read(&mdsc->num_sessions) <= skipped;
}

/*
 * called after sb is ro or when metadata corrupted.
*/
void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
{
	struct ceph_options *opts = mdsc->fsc->client->options;
	struct ceph_mds_session *session;
	int i;
	int skipped = 0;

	dout("close_sessions\n");

	/* close sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		session = __ceph_lookup_mds_session(mdsc, i);
		if (!session)
			continue;
		mutex_unlock(&mdsc->mutex);
		mutex_lock(&session->s_mutex);
		/* sessions with a non-positive result are excluded from the wait below */
		if (__close_session(mdsc, session) <= 0)
			skipped++;
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);
		mutex_lock(&mdsc->mutex);
	}
	mutex_unlock(&mdsc->mutex);

	dout("waiting for sessions to close\n");
	/* bounded by the mount_timeout option */
	wait_event_timeout(mdsc->session_close_wq,
			   done_closing_sessions(mdsc, skipped),
			   ceph_timeout_jiffies(opts->mount_timeout));

	/* tear down remaining sessions */
	mutex_lock(&mdsc->mutex);
	for (i = 0; i < mdsc->max_sessions; i++) {
		if (mdsc->sessions[i]) {
			/* hold a ref across unregister and the unlocked cap removal */
			session = ceph_get_mds_session(mdsc->sessions[i]);
			__unregister_session(mdsc, session);
			mutex_unlock(&mdsc->mutex);
			mutex_lock(&session->s_mutex);
			remove_session_caps(session);
			mutex_unlock(&session->s_mutex);
			ceph_put_mds_session(session);
			mutex_lock(&mdsc->mutex);
		}
	}
	WARN_ON(!list_empty(&mdsc->cap_delay_list));
	mutex_unlock(&mdsc->mutex);

	ceph_cleanup_snapid_map(mdsc);
	ceph_cleanup_global_and_empty_realms(mdsc);

	cancel_work_sync(&mdsc->cap_reclaim_work);
	cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */

	dout("stopped\n");
}

/*
 * Forced unmount: close every session without waiting for the MDS.
 *
 * For each session: unregister it if it is in the REJECTED state, wake
 * the requests waiting on it, start closing it and -- if it ended up in
 * the CLOSING state -- clean up its requests and caps, then kick the
 * requests for that rank.  Finally wake everything waiting for an mdsmap.
 */
void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
{
	struct ceph_mds_session *session;
	int mds;

	dout("force umount\n");

	mutex_lock(&mdsc->mutex);
	for (mds = 0; mds < mdsc->max_sessions; mds++) {
		session = __ceph_lookup_mds_session(mdsc, mds);
		if (!session)
			continue;

		if (session->s_state == CEPH_MDS_SESSION_REJECTED)
			__unregister_session(mdsc, session);
		__wake_requests(mdsc, &session->s_waiting);
		mutex_unlock(&mdsc->mutex);

		mutex_lock(&session->s_mutex);
		__close_session(mdsc, session);
		if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
			cleanup_session_requests(mdsc, session);
			remove_session_caps(session);
		}
		mutex_unlock(&session->s_mutex);
		ceph_put_mds_session(session);

		mutex_lock(&mdsc->mutex);
		kick_requests(mdsc, mds);
	}
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}

/*
 * Release the resources owned by @mdsc (mdsmap, session array, caps, pool
 * permission cache) after making sure the delayed work has run to
 * completion.  Does not free @mdsc itself.
 */
static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
{
	dout("stop\n");
	/*
	 * Make sure the delayed work stopped before releasing
	 * the resources.
	 *
	 * Because the cancel_delayed_work_sync() will only
	 * guarantee that the work finishes executing. But the
	 * delayed work will re-arm itself again after that.
	 */
	/*
	 * NOTE(review): delayed_work() returns without re-arming when
	 * mdsc->stopping is set, so this flush presumably relies on that
	 * flag having been set earlier -- confirm for every caller.
	 */
	flush_delayed_work(&mdsc->delayed_work);

	if (mdsc->mdsmap)
		ceph_mdsmap_destroy(mdsc->mdsmap);
	kfree(mdsc->sessions);
	ceph_caps_finalize(mdsc);
	ceph_pool_perm_destroy(mdsc);
}

/*
 * Tear down and free the MDS client of @fsc, and clear fsc->mdsc.
 * Does nothing if no client is attached.
 */
void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
{
	struct ceph_mds_client *mdsc = fsc->mdsc;
	dout("mdsc_destroy %p\n", mdsc);

	if (!mdsc)
		return;

	/* flush out any connection work with references to us */
	ceph_msgr_flush();

	ceph_mdsc_stop(mdsc);

	ceph_metric_destroy(&mdsc->metric);

	fsc->mdsc = NULL;
	kfree(mdsc);
	/* only the pointer value is printed; mdsc is not dereferenced */
	dout("mdsc_destroy %p done\n", mdsc);
}

/*
 * Handle an fsmap message.
 *
 * Scans the filesystem entries for one whose name equals the
 * mds_namespace mount option.  If found, its fscid is stored in the
 * monitor client and an MDSMAP subscription is requested.  If not found
 * (or no namespace was given), mdsmap_err is set to -ENOENT and the
 * requests waiting for a map are woken.
 *
 * On a decode error the mount is shut down, the message is dumped, and
 * mdsmap_err is set to -EINVAL.
 */
void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	struct ceph_fs_client *fsc = mdsc->fsc;
	const char *mds_namespace = fsc->mount_options->mds_namespace;
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	u32 epoch;
	u32 num_fs;
	u32 mount_fscid = (u32)-1;	/* (u32)-1 means "no match found" */
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(u32), bad);
	epoch = ceph_decode_32(&p);

	dout("handle_fsmap epoch %u\n", epoch);

	/* struct_v, struct_cv, map_len, epoch, legacy_client_fscid */
	ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 3, bad);

	ceph_decode_32_safe(&p, end, num_fs, bad);
	while (num_fs-- > 0) {
		void *info_p, *info_end;
		u32 info_len;
		u32 fscid, namelen;

		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
		p += 2;		// info_v, info_cv
		info_len = ceph_decode_32(&p);
		ceph_decode_need(&p, end, info_len, bad);
		/*
		 * Decode this entry through its own cursor, bounded by
		 * info_len, and move the outer cursor past the whole entry.
		 */
		info_p = p;
		info_end = p + info_len;
		p = info_end;

		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
		fscid = ceph_decode_32(&info_p);
		namelen = ceph_decode_32(&info_p);
		ceph_decode_need(&info_p, info_end, namelen, bad);

		/* the name in the message is not NUL-terminated */
		if (mds_namespace &&
		    strlen(mds_namespace) == namelen &&
		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
			mount_fscid = fscid;
			break;
		}
	}

	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
	if (mount_fscid != (u32)-1) {
		fsc->client->monc.fs_cluster_id = mount_fscid;
		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
				   0, true);
		ceph_monc_renew_subs(&fsc->client->monc);
	} else {
		err = -ENOENT;
		goto err_out;
	}
	return;

bad:
	pr_err("error decoding fsmap %d. Shutting down mount.\n", err);
	ceph_umount_begin(mdsc->fsc->sb);
	ceph_msg_dump(msg);
err_out:
	mutex_lock(&mdsc->mutex);
	mdsc->mdsmap_err = err;
	__wake_requests(mdsc, &mdsc->waiting_for_map);
	mutex_unlock(&mdsc->mutex);
}

/*
 * handle mds map update.
 *
 * Messages with a foreign fsid, or with an epoch not newer than the
 * current map, are ignored.  Otherwise the map is decoded and swapped in;
 * if there was a previous map, check_new_map() is run (with mdsc->mutex
 * held) before the old map is destroyed.  Afterwards max_file_size is
 * updated, requests waiting for a map are woken and the delayed work is
 * scheduled.
 *
 * On a decode error the mount is shut down and the message is dumped.
 */
void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
{
	u32 epoch;
	u32 maplen;	/* only used in the debug output below */
	void *p = msg->front.iov_base;
	void *end = p + msg->front.iov_len;
	struct ceph_mdsmap *newmap, *oldmap;
	struct ceph_fsid fsid;
	int err = -EINVAL;

	ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
		return;
	epoch = ceph_decode_32(&p);
	maplen = ceph_decode_32(&p);
	dout("handle_map epoch %u len %d\n", epoch, (int)maplen);

	/* do we need it? */
	mutex_lock(&mdsc->mutex);
	if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
		dout("handle_map epoch %u <= our %u\n",
		     epoch, mdsc->mdsmap->m_epoch);
		mutex_unlock(&mdsc->mutex);
		return;
	}

	newmap = ceph_mdsmap_decode(&p, end, ceph_msgr2(mdsc->fsc->client));
	if (IS_ERR(newmap)) {
		err = PTR_ERR(newmap);
		goto bad_unlock;
	}

	/* swap into place */
	if (mdsc->mdsmap) {
		oldmap = mdsc->mdsmap;
		mdsc->mdsmap = newmap;
		check_new_map(mdsc, newmap, oldmap);
		ceph_mdsmap_destroy(oldmap);
	} else {
		mdsc->mdsmap = newmap;  /* first mds map */
	}
	mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
					MAX_LFS_FILESIZE);

	__wake_requests(mdsc, &mdsc->waiting_for_map);
	ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
			  mdsc->mdsmap->m_epoch);

	mutex_unlock(&mdsc->mutex);
	schedule_delayed(mdsc, 0);
	return;

bad_unlock:
	mutex_unlock(&mdsc->mutex);
bad:
	pr_err("error decoding mdsmap %d. Shutting down mount.\n", err);
	ceph_umount_begin(mdsc->fsc->sb);
	ceph_msg_dump(msg);
	return;
}

/*
 * Connection "get" callback: take a reference on the session that owns
 * @con.  Returns @con on success, NULL if no reference could be taken.
 */
static struct ceph_connection *mds_get_con(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	if (ceph_get_mds_session(s))
		return con;
	return NULL;
}

/*
 * Connection "put" callback: drop a reference on the session that owns
 * @con.
 */
static void mds_put_con(struct ceph_connection *con)
{
	struct ceph_mds_session *s = con->private;

	ceph_put_mds_session(s);
}

/*
 * if the client is unresponsive for long enough, the mds will kill
 * the session entirely.
5355 */ 5356 static void mds_peer_reset(struct ceph_connection *con) 5357 { 5358 struct ceph_mds_session *s = con->private; 5359 struct ceph_mds_client *mdsc = s->s_mdsc; 5360 5361 pr_warn("mds%d closed our session\n", s->s_mds); 5362 if (READ_ONCE(mdsc->fsc->mount_state) != CEPH_MOUNT_FENCE_IO) 5363 send_mds_reconnect(mdsc, s); 5364 } 5365 5366 static void mds_dispatch(struct ceph_connection *con, struct ceph_msg *msg) 5367 { 5368 struct ceph_mds_session *s = con->private; 5369 struct ceph_mds_client *mdsc = s->s_mdsc; 5370 int type = le16_to_cpu(msg->hdr.type); 5371 5372 mutex_lock(&mdsc->mutex); 5373 if (__verify_registered_session(mdsc, s) < 0) { 5374 mutex_unlock(&mdsc->mutex); 5375 goto out; 5376 } 5377 mutex_unlock(&mdsc->mutex); 5378 5379 switch (type) { 5380 case CEPH_MSG_MDS_MAP: 5381 ceph_mdsc_handle_mdsmap(mdsc, msg); 5382 break; 5383 case CEPH_MSG_FS_MAP_USER: 5384 ceph_mdsc_handle_fsmap(mdsc, msg); 5385 break; 5386 case CEPH_MSG_CLIENT_SESSION: 5387 handle_session(s, msg); 5388 break; 5389 case CEPH_MSG_CLIENT_REPLY: 5390 handle_reply(s, msg); 5391 break; 5392 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 5393 handle_forward(mdsc, s, msg); 5394 break; 5395 case CEPH_MSG_CLIENT_CAPS: 5396 ceph_handle_caps(s, msg); 5397 break; 5398 case CEPH_MSG_CLIENT_SNAP: 5399 ceph_handle_snap(mdsc, s, msg); 5400 break; 5401 case CEPH_MSG_CLIENT_LEASE: 5402 handle_lease(mdsc, s, msg); 5403 break; 5404 case CEPH_MSG_CLIENT_QUOTA: 5405 ceph_handle_quota(mdsc, s, msg); 5406 break; 5407 5408 default: 5409 pr_err("received unknown message type %d %s\n", type, 5410 ceph_msg_type_name(type)); 5411 } 5412 out: 5413 ceph_msg_put(msg); 5414 } 5415 5416 /* 5417 * authentication 5418 */ 5419 5420 /* 5421 * Note: returned pointer is the address of a structure that's 5422 * managed separately. Caller must *not* attempt to free it. 
5423 */ 5424 static struct ceph_auth_handshake * 5425 mds_get_authorizer(struct ceph_connection *con, int *proto, int force_new) 5426 { 5427 struct ceph_mds_session *s = con->private; 5428 struct ceph_mds_client *mdsc = s->s_mdsc; 5429 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5430 struct ceph_auth_handshake *auth = &s->s_auth; 5431 int ret; 5432 5433 ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 5434 force_new, proto, NULL, NULL); 5435 if (ret) 5436 return ERR_PTR(ret); 5437 5438 return auth; 5439 } 5440 5441 static int mds_add_authorizer_challenge(struct ceph_connection *con, 5442 void *challenge_buf, int challenge_buf_len) 5443 { 5444 struct ceph_mds_session *s = con->private; 5445 struct ceph_mds_client *mdsc = s->s_mdsc; 5446 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5447 5448 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer, 5449 challenge_buf, challenge_buf_len); 5450 } 5451 5452 static int mds_verify_authorizer_reply(struct ceph_connection *con) 5453 { 5454 struct ceph_mds_session *s = con->private; 5455 struct ceph_mds_client *mdsc = s->s_mdsc; 5456 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5457 struct ceph_auth_handshake *auth = &s->s_auth; 5458 5459 return ceph_auth_verify_authorizer_reply(ac, auth->authorizer, 5460 auth->authorizer_reply_buf, auth->authorizer_reply_buf_len, 5461 NULL, NULL, NULL, NULL); 5462 } 5463 5464 static int mds_invalidate_authorizer(struct ceph_connection *con) 5465 { 5466 struct ceph_mds_session *s = con->private; 5467 struct ceph_mds_client *mdsc = s->s_mdsc; 5468 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5469 5470 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 5471 5472 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 5473 } 5474 5475 static int mds_get_auth_request(struct ceph_connection *con, 5476 void *buf, int *buf_len, 5477 void **authorizer, int *authorizer_len) 5478 { 5479 struct 
ceph_mds_session *s = con->private; 5480 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5481 struct ceph_auth_handshake *auth = &s->s_auth; 5482 int ret; 5483 5484 ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_MDS, 5485 buf, buf_len); 5486 if (ret) 5487 return ret; 5488 5489 *authorizer = auth->authorizer_buf; 5490 *authorizer_len = auth->authorizer_buf_len; 5491 return 0; 5492 } 5493 5494 static int mds_handle_auth_reply_more(struct ceph_connection *con, 5495 void *reply, int reply_len, 5496 void *buf, int *buf_len, 5497 void **authorizer, int *authorizer_len) 5498 { 5499 struct ceph_mds_session *s = con->private; 5500 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5501 struct ceph_auth_handshake *auth = &s->s_auth; 5502 int ret; 5503 5504 ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len, 5505 buf, buf_len); 5506 if (ret) 5507 return ret; 5508 5509 *authorizer = auth->authorizer_buf; 5510 *authorizer_len = auth->authorizer_buf_len; 5511 return 0; 5512 } 5513 5514 static int mds_handle_auth_done(struct ceph_connection *con, 5515 u64 global_id, void *reply, int reply_len, 5516 u8 *session_key, int *session_key_len, 5517 u8 *con_secret, int *con_secret_len) 5518 { 5519 struct ceph_mds_session *s = con->private; 5520 struct ceph_auth_client *ac = s->s_mdsc->fsc->client->monc.auth; 5521 struct ceph_auth_handshake *auth = &s->s_auth; 5522 5523 return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len, 5524 session_key, session_key_len, 5525 con_secret, con_secret_len); 5526 } 5527 5528 static int mds_handle_auth_bad_method(struct ceph_connection *con, 5529 int used_proto, int result, 5530 const int *allowed_protos, int proto_cnt, 5531 const int *allowed_modes, int mode_cnt) 5532 { 5533 struct ceph_mds_session *s = con->private; 5534 struct ceph_mon_client *monc = &s->s_mdsc->fsc->client->monc; 5535 int ret; 5536 5537 if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_MDS, 5538 
used_proto, result, 5539 allowed_protos, proto_cnt, 5540 allowed_modes, mode_cnt)) { 5541 ret = ceph_monc_validate_auth(monc); 5542 if (ret) 5543 return ret; 5544 } 5545 5546 return -EACCES; 5547 } 5548 5549 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, 5550 struct ceph_msg_header *hdr, int *skip) 5551 { 5552 struct ceph_msg *msg; 5553 int type = (int) le16_to_cpu(hdr->type); 5554 int front_len = (int) le32_to_cpu(hdr->front_len); 5555 5556 if (con->in_msg) 5557 return con->in_msg; 5558 5559 *skip = 0; 5560 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 5561 if (!msg) { 5562 pr_err("unable to allocate msg type %d len %d\n", 5563 type, front_len); 5564 return NULL; 5565 } 5566 5567 return msg; 5568 } 5569 5570 static int mds_sign_message(struct ceph_msg *msg) 5571 { 5572 struct ceph_mds_session *s = msg->con->private; 5573 struct ceph_auth_handshake *auth = &s->s_auth; 5574 5575 return ceph_auth_sign_message(auth, msg); 5576 } 5577 5578 static int mds_check_message_signature(struct ceph_msg *msg) 5579 { 5580 struct ceph_mds_session *s = msg->con->private; 5581 struct ceph_auth_handshake *auth = &s->s_auth; 5582 5583 return ceph_auth_check_message_signature(auth, msg); 5584 } 5585 5586 static const struct ceph_connection_operations mds_con_ops = { 5587 .get = mds_get_con, 5588 .put = mds_put_con, 5589 .alloc_msg = mds_alloc_msg, 5590 .dispatch = mds_dispatch, 5591 .peer_reset = mds_peer_reset, 5592 .get_authorizer = mds_get_authorizer, 5593 .add_authorizer_challenge = mds_add_authorizer_challenge, 5594 .verify_authorizer_reply = mds_verify_authorizer_reply, 5595 .invalidate_authorizer = mds_invalidate_authorizer, 5596 .sign_message = mds_sign_message, 5597 .check_message_signature = mds_check_message_signature, 5598 .get_auth_request = mds_get_auth_request, 5599 .handle_auth_reply_more = mds_handle_auth_reply_more, 5600 .handle_auth_done = mds_handle_auth_done, 5601 .handle_auth_bad_method = mds_handle_auth_bad_method, 5602 }; 5603 5604 /* 
eof */ 5605