// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is a MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

/*
 * Bookkeeping for a session reconnect.
 * NOTE(review): not referenced in this part of the file; the field
 * meanings are presumably "caps/realms encoded so far" and the
 * negotiated reconnect message format -- confirm against the users.
 */
struct ceph_reconnect_state {
	struct ceph_mds_session *session;
	int nr_caps, nr_realms;
	struct ceph_pagelist *pagelist;
	unsigned msg_version;
	bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */

/*
 * Decode a versioned quota blob into info->max_bytes / info->max_files.
 *
 * Layout consumed: u8 struct_v, u8 struct_compat, u32 struct_len,
 * followed by struct_len bytes holding two 64-bit values.
 *
 * On success *p is moved to the end of the encoded struct, so any
 * trailing fields we do not know about are skipped.  Returns 0, or
 * -EIO when the header is not acceptable or a decode helper bails out
 * to the 'bad' label.
 */
static int parse_reply_info_quota(void **p, void *end,
				  struct ceph_mds_reply_info_in *info)
{
	u8 struct_v, struct_compat;
	u32 struct_len;

	ceph_decode_8_safe(p, end, struct_v, bad);
	ceph_decode_8_safe(p, end, struct_compat, bad);
	/* struct_v is expected to be >= 1. we only
	 * understand encoding with struct_compat == 1. */
	if (!struct_v || struct_compat != 1)
		goto bad;
	ceph_decode_32_safe(p, end, struct_len, bad);
	ceph_decode_need(p, end, struct_len, bad);
	/* from here on, bound all decoding by the struct's own length */
	end = *p + struct_len;
	ceph_decode_64_safe(p, end, info->max_bytes, bad);
	ceph_decode_64_safe(p, end, info->max_files, bad);
	*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse individual inode info
 *
 * Two encodings are handled, selected by @features:
 *  - features == (u64)-1: the record is wrapped in a
 *    struct_v/struct_compat/struct_len header and every optional field
 *    is present (dir_pin only for struct_v >= 2, snap_btime only for
 *    struct_v >= 3);
 *  - otherwise: no header, and each optional field is present only when
 *    the matching CEPH_FEATURE_* bit is set in @features.
 *
 * Most fields of @info end up pointing into the message buffer rather
 * than being copied.  Returns 0 on success, -EIO on a malformed record,
 * or the error returned by parse_reply_info_quota().
 */
static int parse_reply_info_in(void **p, void *end,
			       struct ceph_mds_reply_info_in *info,
			       u64 features)
{
	int err = 0;
	u8 struct_v = 0;

	if (features == (u64)-1) {
		u32 struct_len;
		u8 struct_compat;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding with struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		/* restrict decoding to this record */
		end = *p + struct_len;
	}

	/* fixed-size inode header followed by nsplits fragtree entries */
	ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
	info->in = *p;
	*p += sizeof(struct ceph_mds_reply_inode) +
		sizeof(*info->in->fragtree.splits) *
		le32_to_cpu(info->in->fragtree.nsplits);

	/* symlink target: length-prefixed, referenced in place */
	ceph_decode_32_safe(p, end, info->symlink_len, bad);
	ceph_decode_need(p, end, info->symlink_len, bad);
	info->symlink = *p;
	*p += info->symlink_len;

	ceph_decode_copy_safe(p, end, &info->dir_layout,
			      sizeof(info->dir_layout), bad);
	/* xattr blob: length-prefixed, referenced in place */
	ceph_decode_32_safe(p, end, info->xattr_len, bad);
	ceph_decode_need(p, end, info->xattr_len, bad);
	info->xattr_data = *p;
	*p += info->xattr_len;

	if (features == (u64)-1) {
		/* inline data */
		ceph_decode_64_safe(p, end, info->inline_version, bad);
		ceph_decode_32_safe(p, end, info->inline_len, bad);
		ceph_decode_need(p, end, info->inline_len, bad);
		info->inline_data = *p;
		*p += info->inline_len;
		/* quota */
		err = parse_reply_info_quota(p, end, info);
		if (err < 0)
			goto out_bad;
		/* pool namespace */
		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
		if (info->pool_ns_len > 0) {
			ceph_decode_need(p, end, info->pool_ns_len, bad);
			info->pool_ns_data = *p;
			*p += info->pool_ns_len;
		}

		/* btime */
		ceph_decode_need(p, end, sizeof(info->btime), bad);
		ceph_decode_copy(p, &info->btime, sizeof(info->btime));

		/* change attribute */
		ceph_decode_64_safe(p, end, info->change_attr, bad);

		/* dir pin */
		if (struct_v >= 2) {
			ceph_decode_32_safe(p, end, info->dir_pin, bad);
		} else {
			info->dir_pin = -ENODATA;
		}

		/* snapshot birth time, remains zero for v<=2 */
		if (struct_v >= 3) {
			ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
			ceph_decode_copy(p, &info->snap_btime,
					 sizeof(info->snap_btime));
		} else {
			memset(&info->snap_btime, 0, sizeof(info->snap_btime));
		}

		/* skip whatever a newer struct_v appended */
		*p = end;
	} else {
		if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
			ceph_decode_64_safe(p, end, info->inline_version, bad);
			ceph_decode_32_safe(p, end, info->inline_len, bad);
			ceph_decode_need(p, end, info->inline_len, bad);
			info->inline_data = *p;
			*p += info->inline_len;
		} else
			info->inline_version = CEPH_INLINE_NONE;

		if (features & CEPH_FEATURE_MDS_QUOTA) {
			err = parse_reply_info_quota(p, end, info);
			if (err < 0)
				goto out_bad;
		} else {
			info->max_bytes = 0;
			info->max_files = 0;
		}

		info->pool_ns_len = 0;
		info->pool_ns_data = NULL;
		if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
			ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
			if (info->pool_ns_len > 0) {
				ceph_decode_need(p, end, info->pool_ns_len, bad);
				info->pool_ns_data = *p;
				*p += info->pool_ns_len;
			}
		}

		if (features & CEPH_FEATURE_FS_BTIME) {
			ceph_decode_need(p, end, sizeof(info->btime), bad);
			ceph_decode_copy(p, &info->btime, sizeof(info->btime));
			ceph_decode_64_safe(p, end, info->change_attr, bad);
		}

		info->dir_pin = -ENODATA;
		/* info->snap_btime remains zero */
	}
	return 0;
bad:
	err = -EIO;
out_bad:
	return err;
}

/*
 * Locate a dirfrag record in the reply and point *dirfrag at it (no
 * copy is made).  The record is the fixed struct followed by ndist
 * 32-bit entries.  With features == (u64)-1 the record carries a
 * struct_v/struct_compat/struct_len header and *p is left at the end of
 * that versioned struct.  Returns 0 or -EIO.
 */
static int parse_reply_info_dir(void **p, void *end,
				struct ceph_mds_reply_dirfrag **dirfrag,
				u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**dirfrag), bad);
	*dirfrag = *p;
	*p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
	/* the ndist array must not run past the end of the record */
	if (unlikely(*p > end))
		goto bad;
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * Locate a lease record in the reply and point *lease at it (no copy
 * is made).  With features == (u64)-1 the record carries a
 * struct_v/struct_compat/struct_len header and *p is left at the end of
 * that versioned struct.  Returns 0 or -EIO.
 */
static int parse_reply_info_lease(void **p, void *end,
				  struct ceph_mds_reply_lease **lease,
				  u64 features)
{
	if (features == (u64)-1) {
		u8 struct_v, struct_compat;
		u32 struct_len;
		ceph_decode_8_safe(p, end, struct_v, bad);
		ceph_decode_8_safe(p, end, struct_compat, bad);
		/* struct_v is expected to be >= 1. we only understand
		 * encoding whose struct_compat == 1. */
		if (!struct_v || struct_compat != 1)
			goto bad;
		ceph_decode_32_safe(p, end, struct_len, bad);
		ceph_decode_need(p, end, struct_len, bad);
		end = *p + struct_len;
	}

	ceph_decode_need(p, end, sizeof(**lease), bad);
	*lease = *p;
	*p += sizeof(**lease);
	if (features == (u64)-1)
		*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 *
 * When head->is_dentry is set the trace holds, in order: the directory
 * inode, its dirfrag, the dentry name and the dentry lease.  When
 * head->is_target is set the target inode follows.  The trace must be
 * consumed exactly (*p == end), otherwise -EIO is returned.
 */
static int parse_reply_info_trace(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features)
{
	int err;

	if (info->head->is_dentry) {
		err = parse_reply_info_in(p, end, &info->diri, features);
		if (err < 0)
			goto out_bad;

		err = parse_reply_info_dir(p, end, &info->dirfrag, features);
		if (err < 0)
			goto out_bad;

		/* dentry name: length-prefixed, referenced in place */
		ceph_decode_32_safe(p, end, info->dname_len, bad);
		ceph_decode_need(p, end, info->dname_len, bad);
		info->dname = *p;
		*p += info->dname_len;

		err = parse_reply_info_lease(p, end, &info->dlease, features);
		if (err < 0)
			goto out_bad;
	}

	if (info->head->is_target) {
		err = parse_reply_info_in(p, end, &info->targeti, features);
		if (err < 0)
			goto out_bad;
	}

	if (unlikely(*p != end))
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing mds trace %d\n", err);
	return err;
}

/*
 * parse readdir results
 *
 * Decodes the dirfrag, the entry count and the flag word, then fills
 * info->dir_entries[0..num-1] with name, lease and inode for each
 * entry.  info->dir_entries must already be allocated by the caller
 * (BUG_ON otherwise) and info->dir_buf_size must be large enough for
 * the number of entries the MDS sent, else the reply is rejected.
 */
static int parse_reply_info_readdir(void **p, void *end,
				struct ceph_mds_reply_info_parsed *info,
				u64 features)
{
	u32 num, i = 0;
	int err;

	err = parse_reply_info_dir(p, end, &info->dir_dir, features);
	if (err < 0)
		goto out_bad;

	/* u32 entry count + u16 flags */
	ceph_decode_need(p, end, sizeof(num) + 2, bad);
	num = ceph_decode_32(p);
	{
		u16 flags = ceph_decode_16(p);
		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
		info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
	}
	if (num == 0)
		goto done;

	BUG_ON(!info->dir_entries);
	/* refuse more entries than the preallocated buffer can hold */
	if ((unsigned long)(info->dir_entries + num) >
	    (unsigned long)info->dir_entries + info->dir_buf_size) {
		pr_err("dir contents are larger than expected\n");
		WARN_ON(1);
		goto bad;
	}

	info->dir_nr = num;
	while (num) {
		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
		/* dentry */
		ceph_decode_32_safe(p, end, rde->name_len, bad);
		ceph_decode_need(p, end, rde->name_len, bad);
		rde->name = *p;
		*p += rde->name_len;
		dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);

		/* dentry lease */
		err = parse_reply_info_lease(p, end, &rde->lease, features);
		if (err)
			goto out_bad;
		/* inode */
		err = parse_reply_info_in(p, end, &rde->inode, features);
		if (err < 0)
			goto out_bad;
		/* ceph_readdir_prepopulate() will update it */
		rde->offset = 0;
		i++;
		num--;
	}

done:
	/* Skip over any unrecognized fields */
	*p = end;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("problem parsing dir contents %d\n", err);
	return err;
}

/*
 * parse fcntl F_GETLK results
 *
 * The reply is referenced in place through info->filelock_reply; the
 * only validation is that the buffer is at least that large.
 */
static int parse_reply_info_filelock(void **p, void *end,
				     struct ceph_mds_reply_info_parsed *info,
				     u64 features)
{
	if (*p + sizeof(*info->filelock_reply) > end)
		goto bad;

	info->filelock_reply = *p;

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}


#if BITS_PER_LONG == 64

/* marker value stored in s_delegated_inos for an unused delegated ino */
#define DELEGATED_INO_AVAILABLE		xa_mk_value(1)

/*
 * Decode a list of (start, len) inode-number ranges and record every
 * inode number of every range in s->s_delegated_inos.
 *
 * A number that is already present (-EBUSY from xa_insert()) only
 * produces a warning; any other insertion error is returned to the
 * caller.  Returns -EIO if the buffer is too short.
 */
static int ceph_parse_deleg_inos(void **p, void *end,
				 struct ceph_mds_session *s)
{
	u32 sets;

	ceph_decode_32_safe(p, end, sets, bad);
	dout("got %u sets of delegated inodes\n", sets);
	while (sets--) {
		u64 start, len, ino;

		ceph_decode_64_safe(p, end, start, bad);
		ceph_decode_64_safe(p, end, len, bad);
		while (len--) {
			/* start is post-incremented, hence "start - 1" below */
			int err = xa_insert(&s->s_delegated_inos, ino = start++,
					    DELEGATED_INO_AVAILABLE,
					    GFP_KERNEL);
			if (!err) {
				dout("added delegated inode 0x%llx\n",
				     start - 1);
			} else if (err == -EBUSY) {
				pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
					start - 1);
			} else {
				return err;
			}
		}
	}
	return 0;
bad:
	return -EIO;
}

/*
 * Take one delegated inode number out of the session's pool.
 *
 * Entries are erased as they are visited; the first one whose erased
 * value is DELEGATED_INO_AVAILABLE is returned.  Returns 0 when no
 * such entry exists.
 */
u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
	unsigned long ino;
	void *val;

	xa_for_each(&s->s_delegated_inos, ino, val) {
		val = xa_erase(&s->s_delegated_inos, ino);
		if (val == DELEGATED_INO_AVAILABLE)
			return ino;
	}
	return 0;
}

/*
 * Put @ino back into the session's pool of delegated inode numbers.
 * Returns the result of xa_insert().
 */
int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
	return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
			 GFP_KERNEL);
}
#else /* BITS_PER_LONG == 64 */
/*
 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
 * and bottom words?
 */
static int ceph_parse_deleg_inos(void **p, void *end,
				 struct ceph_mds_session *s)
{
	u32 sets;

	ceph_decode_32_safe(p, end, sets, bad);
	/* each set is a pair of 64-bit values (start, len); skip them all */
	if (sets)
		ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
	return 0;
bad:
	return -EIO;
}

/* 32-bit stub: there is never a delegated inode number to hand out */
u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
	return 0;
}

/* 32-bit stub: nothing is tracked, so restoring always "succeeds" */
int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
	return 0;
}
#endif /* BITS_PER_LONG == 64 */

/*
 * parse create results
 *
 * Three layouts are accepted when the reply can carry a created inode
 * number (features == (u64)-1 or CEPH_FEATURE_REPLY_CREATE_INODE):
 *  - empty payload: has_create_ino is cleared;
 *  - session has CEPHFS_FEATURE_DELEG_INO: versioned header, the inode
 *    number, then the delegated inode ranges;
 *  - legacy: just the 64-bit inode number.
 * Without that capability the payload must be empty.
 */
static int parse_reply_info_create(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features, struct ceph_mds_session *s)
{
	int ret;

	if (features == (u64)-1 ||
	    (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
		if (*p == end) {
			/* Malformed reply? */
			info->has_create_ino = false;
		} else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
			u8 struct_v, struct_compat;
			u32 len;

			info->has_create_ino = true;
			/* header is decoded to advance *p; its values are not used */
			ceph_decode_8_safe(p, end, struct_v, bad);
			ceph_decode_8_safe(p, end, struct_compat, bad);
			ceph_decode_32_safe(p, end, len, bad);
			ceph_decode_64_safe(p, end, info->ino, bad);
			ret = ceph_parse_deleg_inos(p, end, s);
			if (ret)
				return ret;
		} else {
			/* legacy */
			ceph_decode_64_safe(p, end, info->ino, bad);
			info->has_create_ino = true;
		}
	} else {
		if (*p != end)
			goto bad;
	}

	/* Skip over any unrecognized fields */
	*p = end;
	return 0;
bad:
	return -EIO;
}

/*
 * parse extra results
 *
 * Dispatches on the operation recorded in the reply head.  Only
 * GETFILELOCK, READDIR/LSSNAP and CREATE replies are expected to carry
 * an extra section; anything else yields -EIO.
 */
static int parse_reply_info_extra(void **p, void *end,
				  struct ceph_mds_reply_info_parsed *info,
				  u64 features, struct ceph_mds_session *s)
{
	u32 op = le32_to_cpu(info->head->op);

	if (op == CEPH_MDS_OP_GETFILELOCK)
		return parse_reply_info_filelock(p, end, info, features);
	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
		return parse_reply_info_readdir(p, end, info, features);
	else if (op == CEPH_MDS_OP_CREATE)
		return parse_reply_info_create(p, end, info, features, s);
	else
		return -EIO;
}

/*
 * parse entire mds reply
 *
 * The front of @msg is the reply head followed by three length-prefixed
 * sections: trace, extra and snap blob.  Empty trace/extra sections are
 * skipped.  The snap blob is only referenced (info->snapblob), not
 * parsed.  The three sections must account for the whole front,
 * otherwise -EIO is returned.
 */
static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
			    struct ceph_mds_reply_info_parsed *info,
			    u64 features)
{
	void *p, *end;
	u32 len;
	int err;

	info->head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
	end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

	/* trace */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_trace(&p, p+len, info, features);
		if (err < 0)
			goto out_bad;
	}

	/* extra */
	ceph_decode_32_safe(&p, end, len, bad);
	if (len > 0) {
		ceph_decode_need(&p, end, len, bad);
		err = parse_reply_info_extra(&p, p+len, info, features, s);
		if (err < 0)
			goto out_bad;
	}

	/* snap blob */
	ceph_decode_32_safe(&p, end, len, bad);
	info->snapblob_len = len;
	info->snapblob = p;
	p += len;

	/* also rejects a snap blob length that overshoots the buffer */
	if (p != end)
		goto bad;
	return 0;

bad:
	err = -EIO;
out_bad:
	pr_err("mds parse_reply err %d\n", err);
	return err;
}

/*
 * Free the readdir entry buffer attached to a parsed reply, if any.
 * The buffer is released with free_pages() using the order derived
 * from info->dir_buf_size.
 */
static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
	if (!info->dir_entries)
		return;
	free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}


/*
 * sessions
 */

/* Map a CEPH_MDS_SESSION_* state to a printable name ("???" if unknown). */
const char *ceph_session_state_name(int s)
{
	switch (s) {
	case CEPH_MDS_SESSION_NEW: return "new";
	case CEPH_MDS_SESSION_OPENING: return "opening";
	case CEPH_MDS_SESSION_OPEN: return "open";
	case CEPH_MDS_SESSION_HUNG: return "hung";
	case CEPH_MDS_SESSION_CLOSING: return "closing";
	case CEPH_MDS_SESSION_CLOSED: return "closed";
	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
	case CEPH_MDS_SESSION_REJECTED: return "rejected";
	default: return "???";
	}
}

/*
 * Take a reference on @s unless its refcount has already dropped to
 * zero.  Returns @s on success, NULL if the session is going away.
 */
struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
{
	if (refcount_inc_not_zero(&s->s_ref)) {
		dout("mdsc get_session %p %d -> %d\n", s,
		     refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
		return s;
	} else {
		dout("mdsc get_session %p 0 -- FAIL\n", s);
		return NULL;
	}
}

/*
 * Drop a reference on @s.  On the final put the authorizer (if any) is
 * destroyed, the delegated-inode xarray is torn down and the session
 * is freed.  The session mutex is expected to be unlocked by then.
 */
void ceph_put_mds_session(struct ceph_mds_session *s)
{
	dout("mdsc put_session %p %d -> %d\n", s,
	     refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
	if (refcount_dec_and_test(&s->s_ref)) {
		if (s->s_auth.authorizer)
			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
		WARN_ON(mutex_is_locked(&s->s_mutex));
		xa_destroy(&s->s_delegated_inos);
		kfree(s);
	}
}

/*
 * called under mdsc->mutex
 *
 * Returns a referenced session for rank @mds, or NULL if there is no
 * registered session (or a reference could not be taken).
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
						   int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return NULL;
	return ceph_get_mds_session(mdsc->sessions[mds]);
}

/* True if a session is registered for rank @mds; takes no reference. */
static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
	if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
		return false;
	else
		return true;
}

/*
 * Check that @s is still the session registered for its rank.
 * Returns 0 if so, -ENOENT otherwise.
 */
static int __verify_registered_session(struct ceph_mds_client *mdsc,
				       struct ceph_mds_session *s)
{
	if (s->s_mds >= mdsc->max_sessions ||
	    mdsc->sessions[s->s_mds] != s)
		return -ENOENT;
	return 0;
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 *
 * Grows mdsc->sessions[] when @mds does not fit, initializes the
 * session, stores it in sessions[mds] and opens the connection.
 * The returned session carries two references: one owned by
 * sessions[] and one for the caller.
 *
 * Returns ERR_PTR(-EINVAL) if @mds is not below the mdsmap's
 * possible_max_rank, ERR_PTR(-ENOMEM) on allocation failure.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
						 int mds)
{
	struct ceph_mds_session *s;

	if (mds >= mdsc->mdsmap->possible_max_rank)
		return ERR_PTR(-EINVAL);

	s = kzalloc(sizeof(*s), GFP_NOFS);
	if (!s)
		return ERR_PTR(-ENOMEM);

	if (mds >= mdsc->max_sessions) {
		/* NOTE(review): presumably rounds mds + 1 up to a power of two */
		int newmax = 1 << get_count_order(mds + 1);
		struct ceph_mds_session **sa;

		dout("%s: realloc to %d\n", __func__, newmax);
		sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
		if (!sa)
			goto fail_realloc;
		if (mdsc->sessions) {
			memcpy(sa, mdsc->sessions,
			       mdsc->max_sessions * sizeof(void *));
			kfree(mdsc->sessions);
		}
		mdsc->sessions = sa;
		mdsc->max_sessions = newmax;
	}

	dout("%s: mds%d\n", __func__, mds);
	s->s_mdsc = mdsc;
	s->s_mds = mds;
	s->s_state = CEPH_MDS_SESSION_NEW;
	s->s_ttl = 0;
	s->s_seq = 0;
	mutex_init(&s->s_mutex);

	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

	spin_lock_init(&s->s_gen_ttl_lock);
	s->s_cap_gen = 1;
	s->s_cap_ttl = jiffies - 1;

	spin_lock_init(&s->s_cap_lock);
	s->s_renew_requested = 0;
	s->s_renew_seq = 0;
	INIT_LIST_HEAD(&s->s_caps);
	s->s_nr_caps = 0;
	refcount_set(&s->s_ref, 1);
	INIT_LIST_HEAD(&s->s_waiting);
	INIT_LIST_HEAD(&s->s_unsafe);
	xa_init(&s->s_delegated_inos);
	s->s_num_cap_releases = 0;
	s->s_cap_reconnect = 0;
	s->s_cap_iterator = NULL;
	INIT_LIST_HEAD(&s->s_cap_releases);
	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

	INIT_LIST_HEAD(&s->s_cap_dirty);
	INIT_LIST_HEAD(&s->s_cap_flushing);

	mdsc->sessions[mds] = s;
	atomic_inc(&mdsc->num_sessions);
	refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

	return s;

fail_realloc:
	kfree(s);
	return ERR_PTR(-ENOMEM);
}

/*
 * called under mdsc->mutex
 *
 * Remove @s from sessions[], close its connection and drop the
 * reference that sessions[] held.
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
			       struct ceph_mds_session *s)
{
	dout("__unregister_session mds%d %p\n", s->s_mds, s);
	BUG_ON(mdsc->sessions[s->s_mds] != s);
	mdsc->sessions[s->s_mds] = NULL;
	ceph_con_close(&s->s_con);
	ceph_put_mds_session(s);
	atomic_dec(&mdsc->num_sessions);
}

/*
 * drop session refs in request.
791 * 792 * should be last request ref, or hold mdsc->mutex 793 */ 794 static void put_request_session(struct ceph_mds_request *req) 795 { 796 if (req->r_session) { 797 ceph_put_mds_session(req->r_session); 798 req->r_session = NULL; 799 } 800 } 801 802 void ceph_mdsc_release_request(struct kref *kref) 803 { 804 struct ceph_mds_request *req = container_of(kref, 805 struct ceph_mds_request, 806 r_kref); 807 ceph_mdsc_release_dir_caps_no_check(req); 808 destroy_reply_info(&req->r_reply_info); 809 if (req->r_request) 810 ceph_msg_put(req->r_request); 811 if (req->r_reply) 812 ceph_msg_put(req->r_reply); 813 if (req->r_inode) { 814 ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); 815 /* avoid calling iput_final() in mds dispatch threads */ 816 ceph_async_iput(req->r_inode); 817 } 818 if (req->r_parent) { 819 ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN); 820 ceph_async_iput(req->r_parent); 821 } 822 ceph_async_iput(req->r_target_inode); 823 if (req->r_dentry) 824 dput(req->r_dentry); 825 if (req->r_old_dentry) 826 dput(req->r_old_dentry); 827 if (req->r_old_dentry_dir) { 828 /* 829 * track (and drop pins for) r_old_dentry_dir 830 * separately, since r_old_dentry's d_parent may have 831 * changed between the dir mutex being dropped and 832 * this request being freed. 833 */ 834 ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), 835 CEPH_CAP_PIN); 836 ceph_async_iput(req->r_old_dentry_dir); 837 } 838 kfree(req->r_path1); 839 kfree(req->r_path2); 840 if (req->r_pagelist) 841 ceph_pagelist_release(req->r_pagelist); 842 put_request_session(req); 843 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation); 844 WARN_ON_ONCE(!list_empty(&req->r_wait)); 845 kmem_cache_free(ceph_mds_request_cachep, req); 846 } 847 848 DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node) 849 850 /* 851 * lookup session, bump ref if found. 852 * 853 * called under mdsc->mutex. 
854 */ 855 static struct ceph_mds_request * 856 lookup_get_request(struct ceph_mds_client *mdsc, u64 tid) 857 { 858 struct ceph_mds_request *req; 859 860 req = lookup_request(&mdsc->request_tree, tid); 861 if (req) 862 ceph_mdsc_get_request(req); 863 864 return req; 865 } 866 867 /* 868 * Register an in-flight request, and assign a tid. Link to directory 869 * are modifying (if any). 870 * 871 * Called under mdsc->mutex. 872 */ 873 static void __register_request(struct ceph_mds_client *mdsc, 874 struct ceph_mds_request *req, 875 struct inode *dir) 876 { 877 int ret = 0; 878 879 req->r_tid = ++mdsc->last_tid; 880 if (req->r_num_caps) { 881 ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation, 882 req->r_num_caps); 883 if (ret < 0) { 884 pr_err("__register_request %p " 885 "failed to reserve caps: %d\n", req, ret); 886 /* set req->r_err to fail early from __do_request */ 887 req->r_err = ret; 888 return; 889 } 890 } 891 dout("__register_request %p tid %lld\n", req, req->r_tid); 892 ceph_mdsc_get_request(req); 893 insert_request(&mdsc->request_tree, req); 894 895 req->r_uid = current_fsuid(); 896 req->r_gid = current_fsgid(); 897 898 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) 899 mdsc->oldest_tid = req->r_tid; 900 901 if (dir) { 902 struct ceph_inode_info *ci = ceph_inode(dir); 903 904 ihold(dir); 905 req->r_unsafe_dir = dir; 906 spin_lock(&ci->i_unsafe_lock); 907 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops); 908 spin_unlock(&ci->i_unsafe_lock); 909 } 910 } 911 912 static void __unregister_request(struct ceph_mds_client *mdsc, 913 struct ceph_mds_request *req) 914 { 915 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 916 917 /* Never leave an unregistered request on an unsafe list! 
*/ 918 list_del_init(&req->r_unsafe_item); 919 920 if (req->r_tid == mdsc->oldest_tid) { 921 struct rb_node *p = rb_next(&req->r_node); 922 mdsc->oldest_tid = 0; 923 while (p) { 924 struct ceph_mds_request *next_req = 925 rb_entry(p, struct ceph_mds_request, r_node); 926 if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { 927 mdsc->oldest_tid = next_req->r_tid; 928 break; 929 } 930 p = rb_next(p); 931 } 932 } 933 934 erase_request(&mdsc->request_tree, req); 935 936 if (req->r_unsafe_dir) { 937 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); 938 spin_lock(&ci->i_unsafe_lock); 939 list_del_init(&req->r_unsafe_dir_item); 940 spin_unlock(&ci->i_unsafe_lock); 941 } 942 if (req->r_target_inode && 943 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 944 struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); 945 spin_lock(&ci->i_unsafe_lock); 946 list_del_init(&req->r_unsafe_target_item); 947 spin_unlock(&ci->i_unsafe_lock); 948 } 949 950 if (req->r_unsafe_dir) { 951 /* avoid calling iput_final() in mds dispatch threads */ 952 ceph_async_iput(req->r_unsafe_dir); 953 req->r_unsafe_dir = NULL; 954 } 955 956 complete_all(&req->r_safe_completion); 957 958 ceph_mdsc_put_request(req); 959 } 960 961 /* 962 * Walk back up the dentry tree until we hit a dentry representing a 963 * non-snapshot inode. We do this using the rcu_read_lock (which must be held 964 * when calling this) to ensure that the objects won't disappear while we're 965 * working with them. Once we hit a candidate dentry, we attempt to take a 966 * reference to it, and return that as the result. 967 */ 968 static struct inode *get_nonsnap_parent(struct dentry *dentry) 969 { 970 struct inode *inode = NULL; 971 972 while (dentry && !IS_ROOT(dentry)) { 973 inode = d_inode_rcu(dentry); 974 if (!inode || ceph_snap(inode) == CEPH_NOSNAP) 975 break; 976 dentry = dentry->d_parent; 977 } 978 if (inode) 979 inode = igrab(inode); 980 return inode; 981 } 982 983 /* 984 * Choose mds to send request to next. 
If there is a hint set in the 985 * request (e.g., due to a prior forward hint from the mds), use that. 986 * Otherwise, consult frag tree and/or caps to identify the 987 * appropriate mds. If all else fails, choose randomly. 988 * 989 * Called under mdsc->mutex. 990 */ 991 static int __choose_mds(struct ceph_mds_client *mdsc, 992 struct ceph_mds_request *req, 993 bool *random) 994 { 995 struct inode *inode; 996 struct ceph_inode_info *ci; 997 struct ceph_cap *cap; 998 int mode = req->r_direct_mode; 999 int mds = -1; 1000 u32 hash = req->r_direct_hash; 1001 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); 1002 1003 if (random) 1004 *random = false; 1005 1006 /* 1007 * is there a specific mds we should try? ignore hint if we have 1008 * no session and the mds is not up (active or recovering). 1009 */ 1010 if (req->r_resend_mds >= 0 && 1011 (__have_session(mdsc, req->r_resend_mds) || 1012 ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) { 1013 dout("%s using resend_mds mds%d\n", __func__, 1014 req->r_resend_mds); 1015 return req->r_resend_mds; 1016 } 1017 1018 if (mode == USE_RANDOM_MDS) 1019 goto random; 1020 1021 inode = NULL; 1022 if (req->r_inode) { 1023 if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) { 1024 inode = req->r_inode; 1025 ihold(inode); 1026 } else { 1027 /* req->r_dentry is non-null for LSSNAP request */ 1028 rcu_read_lock(); 1029 inode = get_nonsnap_parent(req->r_dentry); 1030 rcu_read_unlock(); 1031 dout("%s using snapdir's parent %p\n", __func__, inode); 1032 } 1033 } else if (req->r_dentry) { 1034 /* ignore race with rename; old or new d_parent is okay */ 1035 struct dentry *parent; 1036 struct inode *dir; 1037 1038 rcu_read_lock(); 1039 parent = READ_ONCE(req->r_dentry->d_parent); 1040 dir = req->r_parent ? 
: d_inode_rcu(parent); 1041 1042 if (!dir || dir->i_sb != mdsc->fsc->sb) { 1043 /* not this fs or parent went negative */ 1044 inode = d_inode(req->r_dentry); 1045 if (inode) 1046 ihold(inode); 1047 } else if (ceph_snap(dir) != CEPH_NOSNAP) { 1048 /* direct snapped/virtual snapdir requests 1049 * based on parent dir inode */ 1050 inode = get_nonsnap_parent(parent); 1051 dout("%s using nonsnap parent %p\n", __func__, inode); 1052 } else { 1053 /* dentry target */ 1054 inode = d_inode(req->r_dentry); 1055 if (!inode || mode == USE_AUTH_MDS) { 1056 /* dir + name */ 1057 inode = igrab(dir); 1058 hash = ceph_dentry_hash(dir, req->r_dentry); 1059 is_hash = true; 1060 } else { 1061 ihold(inode); 1062 } 1063 } 1064 rcu_read_unlock(); 1065 } 1066 1067 dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash, 1068 hash, mode); 1069 if (!inode) 1070 goto random; 1071 ci = ceph_inode(inode); 1072 1073 if (is_hash && S_ISDIR(inode->i_mode)) { 1074 struct ceph_inode_frag frag; 1075 int found; 1076 1077 ceph_choose_frag(ci, hash, &frag, &found); 1078 if (found) { 1079 if (mode == USE_ANY_MDS && frag.ndist > 0) { 1080 u8 r; 1081 1082 /* choose a random replica */ 1083 get_random_bytes(&r, 1); 1084 r %= frag.ndist; 1085 mds = frag.dist[r]; 1086 dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n", 1087 __func__, inode, ceph_vinop(inode), 1088 frag.frag, mds, (int)r, frag.ndist); 1089 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1090 CEPH_MDS_STATE_ACTIVE && 1091 !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds)) 1092 goto out; 1093 } 1094 1095 /* since this file/dir wasn't known to be 1096 * replicated, then we want to look for the 1097 * authoritative mds. 
*/ 1098 if (frag.mds >= 0) { 1099 /* choose auth mds */ 1100 mds = frag.mds; 1101 dout("%s %p %llx.%llx frag %u mds%d (auth)\n", 1102 __func__, inode, ceph_vinop(inode), 1103 frag.frag, mds); 1104 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= 1105 CEPH_MDS_STATE_ACTIVE) { 1106 if (!ceph_mdsmap_is_laggy(mdsc->mdsmap, 1107 mds)) 1108 goto out; 1109 } 1110 } 1111 mode = USE_AUTH_MDS; 1112 } 1113 } 1114 1115 spin_lock(&ci->i_ceph_lock); 1116 cap = NULL; 1117 if (mode == USE_AUTH_MDS) 1118 cap = ci->i_auth_cap; 1119 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps)) 1120 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); 1121 if (!cap) { 1122 spin_unlock(&ci->i_ceph_lock); 1123 ceph_async_iput(inode); 1124 goto random; 1125 } 1126 mds = cap->session->s_mds; 1127 dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__, 1128 inode, ceph_vinop(inode), mds, 1129 cap == ci->i_auth_cap ? "auth " : "", cap); 1130 spin_unlock(&ci->i_ceph_lock); 1131 out: 1132 /* avoid calling iput_final() while holding mdsc->mutex or 1133 * in mds dispatch threads */ 1134 ceph_async_iput(inode); 1135 return mds; 1136 1137 random: 1138 if (random) 1139 *random = true; 1140 1141 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap); 1142 dout("%s chose random mds%d\n", __func__, mds); 1143 return mds; 1144 } 1145 1146 1147 /* 1148 * session messages 1149 */ 1150 static struct ceph_msg *create_session_msg(u32 op, u64 seq) 1151 { 1152 struct ceph_msg *msg; 1153 struct ceph_mds_session_head *h; 1154 1155 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS, 1156 false); 1157 if (!msg) { 1158 pr_err("create_session_msg ENOMEM creating msg\n"); 1159 return NULL; 1160 } 1161 h = msg->front.iov_base; 1162 h->op = cpu_to_le32(op); 1163 h->seq = cpu_to_le64(seq); 1164 1165 return msg; 1166 } 1167 1168 static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED; 1169 #define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8) 1170 static int 
encode_supported_features(void **p, void *end) 1171 { 1172 static const size_t count = ARRAY_SIZE(feature_bits); 1173 1174 if (count > 0) { 1175 size_t i; 1176 size_t size = FEATURE_BYTES(count); 1177 1178 if (WARN_ON_ONCE(*p + 4 + size > end)) 1179 return -ERANGE; 1180 1181 ceph_encode_32(p, size); 1182 memset(*p, 0, size); 1183 for (i = 0; i < count; i++) 1184 ((unsigned char*)(*p))[i / 8] |= BIT(feature_bits[i] % 8); 1185 *p += size; 1186 } else { 1187 if (WARN_ON_ONCE(*p + 4 > end)) 1188 return -ERANGE; 1189 1190 ceph_encode_32(p, 0); 1191 } 1192 1193 return 0; 1194 } 1195 1196 static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED; 1197 #define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8) 1198 static int encode_metric_spec(void **p, void *end) 1199 { 1200 static const size_t count = ARRAY_SIZE(metric_bits); 1201 1202 /* header */ 1203 if (WARN_ON_ONCE(*p + 2 > end)) 1204 return -ERANGE; 1205 1206 ceph_encode_8(p, 1); /* version */ 1207 ceph_encode_8(p, 1); /* compat */ 1208 1209 if (count > 0) { 1210 size_t i; 1211 size_t size = METRIC_BYTES(count); 1212 1213 if (WARN_ON_ONCE(*p + 4 + 4 + size > end)) 1214 return -ERANGE; 1215 1216 /* metric spec info length */ 1217 ceph_encode_32(p, 4 + size); 1218 1219 /* metric spec */ 1220 ceph_encode_32(p, size); 1221 memset(*p, 0, size); 1222 for (i = 0; i < count; i++) 1223 ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8); 1224 *p += size; 1225 } else { 1226 if (WARN_ON_ONCE(*p + 4 + 4 > end)) 1227 return -ERANGE; 1228 1229 /* metric spec info length */ 1230 ceph_encode_32(p, 4); 1231 /* metric spec */ 1232 ceph_encode_32(p, 0); 1233 } 1234 1235 return 0; 1236 } 1237 1238 /* 1239 * session message, specialization for CEPH_SESSION_REQUEST_OPEN 1240 * to include additional client metadata fields. 
1241 */ 1242 static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq) 1243 { 1244 struct ceph_msg *msg; 1245 struct ceph_mds_session_head *h; 1246 int i = -1; 1247 int extra_bytes = 0; 1248 int metadata_key_count = 0; 1249 struct ceph_options *opt = mdsc->fsc->client->options; 1250 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options; 1251 size_t size, count; 1252 void *p, *end; 1253 int ret; 1254 1255 const char* metadata[][2] = { 1256 {"hostname", mdsc->nodename}, 1257 {"kernel_version", init_utsname()->release}, 1258 {"entity_id", opt->name ? : ""}, 1259 {"root", fsopt->server_path ? : "/"}, 1260 {NULL, NULL} 1261 }; 1262 1263 /* Calculate serialized length of metadata */ 1264 extra_bytes = 4; /* map length */ 1265 for (i = 0; metadata[i][0]; ++i) { 1266 extra_bytes += 8 + strlen(metadata[i][0]) + 1267 strlen(metadata[i][1]); 1268 metadata_key_count++; 1269 } 1270 1271 /* supported feature */ 1272 size = 0; 1273 count = ARRAY_SIZE(feature_bits); 1274 if (count > 0) 1275 size = FEATURE_BYTES(count); 1276 extra_bytes += 4 + size; 1277 1278 /* metric spec */ 1279 size = 0; 1280 count = ARRAY_SIZE(metric_bits); 1281 if (count > 0) 1282 size = METRIC_BYTES(count); 1283 extra_bytes += 2 + 4 + 4 + size; 1284 1285 /* Allocate the message */ 1286 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes, 1287 GFP_NOFS, false); 1288 if (!msg) { 1289 pr_err("create_session_msg ENOMEM creating msg\n"); 1290 return ERR_PTR(-ENOMEM); 1291 } 1292 p = msg->front.iov_base; 1293 end = p + msg->front.iov_len; 1294 1295 h = p; 1296 h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN); 1297 h->seq = cpu_to_le64(seq); 1298 1299 /* 1300 * Serialize client metadata into waiting buffer space, using 1301 * the format that userspace expects for map<string, string> 1302 * 1303 * ClientSession messages with metadata are v4 1304 */ 1305 msg->hdr.version = cpu_to_le16(4); 1306 msg->hdr.compat_version = cpu_to_le16(1); 1307 1308 /* The write pointer, 
following the session_head structure */ 1309 p += sizeof(*h); 1310 1311 /* Number of entries in the map */ 1312 ceph_encode_32(&p, metadata_key_count); 1313 1314 /* Two length-prefixed strings for each entry in the map */ 1315 for (i = 0; metadata[i][0]; ++i) { 1316 size_t const key_len = strlen(metadata[i][0]); 1317 size_t const val_len = strlen(metadata[i][1]); 1318 1319 ceph_encode_32(&p, key_len); 1320 memcpy(p, metadata[i][0], key_len); 1321 p += key_len; 1322 ceph_encode_32(&p, val_len); 1323 memcpy(p, metadata[i][1], val_len); 1324 p += val_len; 1325 } 1326 1327 ret = encode_supported_features(&p, end); 1328 if (ret) { 1329 pr_err("encode_supported_features failed!\n"); 1330 ceph_msg_put(msg); 1331 return ERR_PTR(ret); 1332 } 1333 1334 ret = encode_metric_spec(&p, end); 1335 if (ret) { 1336 pr_err("encode_metric_spec failed!\n"); 1337 ceph_msg_put(msg); 1338 return ERR_PTR(ret); 1339 } 1340 1341 msg->front.iov_len = p - msg->front.iov_base; 1342 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1343 1344 return msg; 1345 } 1346 1347 /* 1348 * send session open request. 1349 * 1350 * called under mdsc->mutex 1351 */ 1352 static int __open_session(struct ceph_mds_client *mdsc, 1353 struct ceph_mds_session *session) 1354 { 1355 struct ceph_msg *msg; 1356 int mstate; 1357 int mds = session->s_mds; 1358 1359 /* wait for mds to go active? 
*/ 1360 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds); 1361 dout("open_session to mds%d (%s)\n", mds, 1362 ceph_mds_state_name(mstate)); 1363 session->s_state = CEPH_MDS_SESSION_OPENING; 1364 session->s_renew_requested = jiffies; 1365 1366 /* send connect message */ 1367 msg = create_session_open_msg(mdsc, session->s_seq); 1368 if (IS_ERR(msg)) 1369 return PTR_ERR(msg); 1370 ceph_con_send(&session->s_con, msg); 1371 return 0; 1372 } 1373 1374 /* 1375 * open sessions for any export targets for the given mds 1376 * 1377 * called under mdsc->mutex 1378 */ 1379 static struct ceph_mds_session * 1380 __open_export_target_session(struct ceph_mds_client *mdsc, int target) 1381 { 1382 struct ceph_mds_session *session; 1383 int ret; 1384 1385 session = __ceph_lookup_mds_session(mdsc, target); 1386 if (!session) { 1387 session = register_session(mdsc, target); 1388 if (IS_ERR(session)) 1389 return session; 1390 } 1391 if (session->s_state == CEPH_MDS_SESSION_NEW || 1392 session->s_state == CEPH_MDS_SESSION_CLOSING) { 1393 ret = __open_session(mdsc, session); 1394 if (ret) 1395 return ERR_PTR(ret); 1396 } 1397 1398 return session; 1399 } 1400 1401 struct ceph_mds_session * 1402 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target) 1403 { 1404 struct ceph_mds_session *session; 1405 1406 dout("open_export_target_session to mds%d\n", target); 1407 1408 mutex_lock(&mdsc->mutex); 1409 session = __open_export_target_session(mdsc, target); 1410 mutex_unlock(&mdsc->mutex); 1411 1412 return session; 1413 } 1414 1415 static void __open_export_target_sessions(struct ceph_mds_client *mdsc, 1416 struct ceph_mds_session *session) 1417 { 1418 struct ceph_mds_info *mi; 1419 struct ceph_mds_session *ts; 1420 int i, mds = session->s_mds; 1421 1422 if (mds >= mdsc->mdsmap->possible_max_rank) 1423 return; 1424 1425 mi = &mdsc->mdsmap->m_info[mds]; 1426 dout("open_export_target_sessions for mds%d (%d targets)\n", 1427 session->s_mds, mi->num_export_targets); 1428 1429 
for (i = 0; i < mi->num_export_targets; i++) { 1430 ts = __open_export_target_session(mdsc, mi->export_targets[i]); 1431 if (!IS_ERR(ts)) 1432 ceph_put_mds_session(ts); 1433 } 1434 } 1435 1436 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, 1437 struct ceph_mds_session *session) 1438 { 1439 mutex_lock(&mdsc->mutex); 1440 __open_export_target_sessions(mdsc, session); 1441 mutex_unlock(&mdsc->mutex); 1442 } 1443 1444 /* 1445 * session caps 1446 */ 1447 1448 static void detach_cap_releases(struct ceph_mds_session *session, 1449 struct list_head *target) 1450 { 1451 lockdep_assert_held(&session->s_cap_lock); 1452 1453 list_splice_init(&session->s_cap_releases, target); 1454 session->s_num_cap_releases = 0; 1455 dout("dispose_cap_releases mds%d\n", session->s_mds); 1456 } 1457 1458 static void dispose_cap_releases(struct ceph_mds_client *mdsc, 1459 struct list_head *dispose) 1460 { 1461 while (!list_empty(dispose)) { 1462 struct ceph_cap *cap; 1463 /* zero out the in-progress message */ 1464 cap = list_first_entry(dispose, struct ceph_cap, session_caps); 1465 list_del(&cap->session_caps); 1466 ceph_put_cap(mdsc, cap); 1467 } 1468 } 1469 1470 static void cleanup_session_requests(struct ceph_mds_client *mdsc, 1471 struct ceph_mds_session *session) 1472 { 1473 struct ceph_mds_request *req; 1474 struct rb_node *p; 1475 struct ceph_inode_info *ci; 1476 1477 dout("cleanup_session_requests mds%d\n", session->s_mds); 1478 mutex_lock(&mdsc->mutex); 1479 while (!list_empty(&session->s_unsafe)) { 1480 req = list_first_entry(&session->s_unsafe, 1481 struct ceph_mds_request, r_unsafe_item); 1482 pr_warn_ratelimited(" dropping unsafe request %llu\n", 1483 req->r_tid); 1484 if (req->r_target_inode) { 1485 /* dropping unsafe change of inode's attributes */ 1486 ci = ceph_inode(req->r_target_inode); 1487 errseq_set(&ci->i_meta_err, -EIO); 1488 } 1489 if (req->r_unsafe_dir) { 1490 /* dropping unsafe directory operation */ 1491 ci = ceph_inode(req->r_unsafe_dir); 
1492 errseq_set(&ci->i_meta_err, -EIO); 1493 } 1494 __unregister_request(mdsc, req); 1495 } 1496 /* zero r_attempts, so kick_requests() will re-send requests */ 1497 p = rb_first(&mdsc->request_tree); 1498 while (p) { 1499 req = rb_entry(p, struct ceph_mds_request, r_node); 1500 p = rb_next(p); 1501 if (req->r_session && 1502 req->r_session->s_mds == session->s_mds) 1503 req->r_attempts = 0; 1504 } 1505 mutex_unlock(&mdsc->mutex); 1506 } 1507 1508 /* 1509 * Helper to safely iterate over all caps associated with a session, with 1510 * special care taken to handle a racing __ceph_remove_cap(). 1511 * 1512 * Caller must hold session s_mutex. 1513 */ 1514 int ceph_iterate_session_caps(struct ceph_mds_session *session, 1515 int (*cb)(struct inode *, struct ceph_cap *, 1516 void *), void *arg) 1517 { 1518 struct list_head *p; 1519 struct ceph_cap *cap; 1520 struct inode *inode, *last_inode = NULL; 1521 struct ceph_cap *old_cap = NULL; 1522 int ret; 1523 1524 dout("iterate_session_caps %p mds%d\n", session, session->s_mds); 1525 spin_lock(&session->s_cap_lock); 1526 p = session->s_caps.next; 1527 while (p != &session->s_caps) { 1528 cap = list_entry(p, struct ceph_cap, session_caps); 1529 inode = igrab(&cap->ci->vfs_inode); 1530 if (!inode) { 1531 p = p->next; 1532 continue; 1533 } 1534 session->s_cap_iterator = cap; 1535 spin_unlock(&session->s_cap_lock); 1536 1537 if (last_inode) { 1538 /* avoid calling iput_final() while holding 1539 * s_mutex or in mds dispatch threads */ 1540 ceph_async_iput(last_inode); 1541 last_inode = NULL; 1542 } 1543 if (old_cap) { 1544 ceph_put_cap(session->s_mdsc, old_cap); 1545 old_cap = NULL; 1546 } 1547 1548 ret = cb(inode, cap, arg); 1549 last_inode = inode; 1550 1551 spin_lock(&session->s_cap_lock); 1552 p = p->next; 1553 if (!cap->ci) { 1554 dout("iterate_session_caps finishing cap %p removal\n", 1555 cap); 1556 BUG_ON(cap->session != session); 1557 cap->session = NULL; 1558 list_del_init(&cap->session_caps); 1559 session->s_nr_caps--; 
1560 atomic64_dec(&session->s_mdsc->metric.total_caps); 1561 if (cap->queue_release) 1562 __ceph_queue_cap_release(session, cap); 1563 else 1564 old_cap = cap; /* put_cap it w/o locks held */ 1565 } 1566 if (ret < 0) 1567 goto out; 1568 } 1569 ret = 0; 1570 out: 1571 session->s_cap_iterator = NULL; 1572 spin_unlock(&session->s_cap_lock); 1573 1574 ceph_async_iput(last_inode); 1575 if (old_cap) 1576 ceph_put_cap(session->s_mdsc, old_cap); 1577 1578 return ret; 1579 } 1580 1581 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, 1582 void *arg) 1583 { 1584 struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg; 1585 struct ceph_inode_info *ci = ceph_inode(inode); 1586 LIST_HEAD(to_remove); 1587 bool dirty_dropped = false; 1588 bool invalidate = false; 1589 1590 dout("removing cap %p, ci is %p, inode is %p\n", 1591 cap, ci, &ci->vfs_inode); 1592 spin_lock(&ci->i_ceph_lock); 1593 __ceph_remove_cap(cap, false); 1594 if (!ci->i_auth_cap) { 1595 struct ceph_cap_flush *cf; 1596 struct ceph_mds_client *mdsc = fsc->mdsc; 1597 1598 if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { 1599 if (inode->i_data.nrpages > 0) 1600 invalidate = true; 1601 if (ci->i_wrbuffer_ref > 0) 1602 mapping_set_error(&inode->i_data, -EIO); 1603 } 1604 1605 while (!list_empty(&ci->i_cap_flush_list)) { 1606 cf = list_first_entry(&ci->i_cap_flush_list, 1607 struct ceph_cap_flush, i_list); 1608 list_move(&cf->i_list, &to_remove); 1609 } 1610 1611 spin_lock(&mdsc->cap_dirty_lock); 1612 1613 list_for_each_entry(cf, &to_remove, i_list) 1614 list_del(&cf->g_list); 1615 1616 if (!list_empty(&ci->i_dirty_item)) { 1617 pr_warn_ratelimited( 1618 " dropping dirty %s state for %p %lld\n", 1619 ceph_cap_string(ci->i_dirty_caps), 1620 inode, ceph_ino(inode)); 1621 ci->i_dirty_caps = 0; 1622 list_del_init(&ci->i_dirty_item); 1623 dirty_dropped = true; 1624 } 1625 if (!list_empty(&ci->i_flushing_item)) { 1626 pr_warn_ratelimited( 1627 " dropping dirty+flushing %s state for %p 
%lld\n", 1628 ceph_cap_string(ci->i_flushing_caps), 1629 inode, ceph_ino(inode)); 1630 ci->i_flushing_caps = 0; 1631 list_del_init(&ci->i_flushing_item); 1632 mdsc->num_cap_flushing--; 1633 dirty_dropped = true; 1634 } 1635 spin_unlock(&mdsc->cap_dirty_lock); 1636 1637 if (dirty_dropped) { 1638 errseq_set(&ci->i_meta_err, -EIO); 1639 1640 if (ci->i_wrbuffer_ref_head == 0 && 1641 ci->i_wr_ref == 0 && 1642 ci->i_dirty_caps == 0 && 1643 ci->i_flushing_caps == 0) { 1644 ceph_put_snap_context(ci->i_head_snapc); 1645 ci->i_head_snapc = NULL; 1646 } 1647 } 1648 1649 if (atomic_read(&ci->i_filelock_ref) > 0) { 1650 /* make further file lock syscall return -EIO */ 1651 ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK; 1652 pr_warn_ratelimited(" dropping file locks for %p %lld\n", 1653 inode, ceph_ino(inode)); 1654 } 1655 1656 if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { 1657 list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove); 1658 ci->i_prealloc_cap_flush = NULL; 1659 } 1660 } 1661 spin_unlock(&ci->i_ceph_lock); 1662 while (!list_empty(&to_remove)) { 1663 struct ceph_cap_flush *cf; 1664 cf = list_first_entry(&to_remove, 1665 struct ceph_cap_flush, i_list); 1666 list_del(&cf->i_list); 1667 ceph_free_cap_flush(cf); 1668 } 1669 1670 wake_up_all(&ci->i_cap_wq); 1671 if (invalidate) 1672 ceph_queue_invalidate(inode); 1673 if (dirty_dropped) 1674 iput(inode); 1675 return 0; 1676 } 1677 1678 /* 1679 * caller must hold session s_mutex 1680 */ 1681 static void remove_session_caps(struct ceph_mds_session *session) 1682 { 1683 struct ceph_fs_client *fsc = session->s_mdsc->fsc; 1684 struct super_block *sb = fsc->sb; 1685 LIST_HEAD(dispose); 1686 1687 dout("remove_session_caps on %p\n", session); 1688 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc); 1689 1690 wake_up_all(&fsc->mdsc->cap_flushing_wq); 1691 1692 spin_lock(&session->s_cap_lock); 1693 if (session->s_nr_caps > 0) { 1694 struct inode *inode; 1695 struct ceph_cap *cap, *prev = NULL; 1696 struct ceph_vino 
vino; 1697 /* 1698 * iterate_session_caps() skips inodes that are being 1699 * deleted, we need to wait until deletions are complete. 1700 * __wait_on_freeing_inode() is designed for the job, 1701 * but it is not exported, so use lookup inode function 1702 * to access it. 1703 */ 1704 while (!list_empty(&session->s_caps)) { 1705 cap = list_entry(session->s_caps.next, 1706 struct ceph_cap, session_caps); 1707 if (cap == prev) 1708 break; 1709 prev = cap; 1710 vino = cap->ci->i_vino; 1711 spin_unlock(&session->s_cap_lock); 1712 1713 inode = ceph_find_inode(sb, vino); 1714 /* avoid calling iput_final() while holding s_mutex */ 1715 ceph_async_iput(inode); 1716 1717 spin_lock(&session->s_cap_lock); 1718 } 1719 } 1720 1721 // drop cap expires and unlock s_cap_lock 1722 detach_cap_releases(session, &dispose); 1723 1724 BUG_ON(session->s_nr_caps > 0); 1725 BUG_ON(!list_empty(&session->s_cap_flushing)); 1726 spin_unlock(&session->s_cap_lock); 1727 dispose_cap_releases(session->s_mdsc, &dispose); 1728 } 1729 1730 enum { 1731 RECONNECT, 1732 RENEWCAPS, 1733 FORCE_RO, 1734 }; 1735 1736 /* 1737 * wake up any threads waiting on this session's caps. if the cap is 1738 * old (didn't get renewed on the client reconnect), remove it now. 1739 * 1740 * caller must hold s_mutex. 
1741 */ 1742 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, 1743 void *arg) 1744 { 1745 struct ceph_inode_info *ci = ceph_inode(inode); 1746 unsigned long ev = (unsigned long)arg; 1747 1748 if (ev == RECONNECT) { 1749 spin_lock(&ci->i_ceph_lock); 1750 ci->i_wanted_max_size = 0; 1751 ci->i_requested_max_size = 0; 1752 spin_unlock(&ci->i_ceph_lock); 1753 } else if (ev == RENEWCAPS) { 1754 if (cap->cap_gen < cap->session->s_cap_gen) { 1755 /* mds did not re-issue stale cap */ 1756 spin_lock(&ci->i_ceph_lock); 1757 cap->issued = cap->implemented = CEPH_CAP_PIN; 1758 spin_unlock(&ci->i_ceph_lock); 1759 } 1760 } else if (ev == FORCE_RO) { 1761 } 1762 wake_up_all(&ci->i_cap_wq); 1763 return 0; 1764 } 1765 1766 static void wake_up_session_caps(struct ceph_mds_session *session, int ev) 1767 { 1768 dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); 1769 ceph_iterate_session_caps(session, wake_up_session_cb, 1770 (void *)(unsigned long)ev); 1771 } 1772 1773 /* 1774 * Send periodic message to MDS renewing all currently held caps. The 1775 * ack will reset the expiration for all caps from this session. 1776 * 1777 * caller holds s_mutex 1778 */ 1779 static int send_renew_caps(struct ceph_mds_client *mdsc, 1780 struct ceph_mds_session *session) 1781 { 1782 struct ceph_msg *msg; 1783 int state; 1784 1785 if (time_after_eq(jiffies, session->s_cap_ttl) && 1786 time_after_eq(session->s_cap_ttl, session->s_renew_requested)) 1787 pr_info("mds%d caps stale\n", session->s_mds); 1788 session->s_renew_requested = jiffies; 1789 1790 /* do not try to renew caps until a recovering mds has reconnected 1791 * with its clients. 
*/ 1792 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); 1793 if (state < CEPH_MDS_STATE_RECONNECT) { 1794 dout("send_renew_caps ignoring mds%d (%s)\n", 1795 session->s_mds, ceph_mds_state_name(state)); 1796 return 0; 1797 } 1798 1799 dout("send_renew_caps to mds%d (%s)\n", session->s_mds, 1800 ceph_mds_state_name(state)); 1801 msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 1802 ++session->s_renew_seq); 1803 if (!msg) 1804 return -ENOMEM; 1805 ceph_con_send(&session->s_con, msg); 1806 return 0; 1807 } 1808 1809 static int send_flushmsg_ack(struct ceph_mds_client *mdsc, 1810 struct ceph_mds_session *session, u64 seq) 1811 { 1812 struct ceph_msg *msg; 1813 1814 dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n", 1815 session->s_mds, ceph_session_state_name(session->s_state), seq); 1816 msg = create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq); 1817 if (!msg) 1818 return -ENOMEM; 1819 ceph_con_send(&session->s_con, msg); 1820 return 0; 1821 } 1822 1823 1824 /* 1825 * Note new cap ttl, and any transition from stale -> not stale (fresh?). 1826 * 1827 * Called under session->s_mutex 1828 */ 1829 static void renewed_caps(struct ceph_mds_client *mdsc, 1830 struct ceph_mds_session *session, int is_renew) 1831 { 1832 int was_stale; 1833 int wake = 0; 1834 1835 spin_lock(&session->s_cap_lock); 1836 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl); 1837 1838 session->s_cap_ttl = session->s_renew_requested + 1839 mdsc->mdsmap->m_session_timeout*HZ; 1840 1841 if (was_stale) { 1842 if (time_before(jiffies, session->s_cap_ttl)) { 1843 pr_info("mds%d caps renewed\n", session->s_mds); 1844 wake = 1; 1845 } else { 1846 pr_info("mds%d caps still stale\n", session->s_mds); 1847 } 1848 } 1849 dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n", 1850 session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh", 1851 time_before(jiffies, session->s_cap_ttl) ? 
"stale" : "fresh"); 1852 spin_unlock(&session->s_cap_lock); 1853 1854 if (wake) 1855 wake_up_session_caps(session, RENEWCAPS); 1856 } 1857 1858 /* 1859 * send a session close request 1860 */ 1861 static int request_close_session(struct ceph_mds_session *session) 1862 { 1863 struct ceph_msg *msg; 1864 1865 dout("request_close_session mds%d state %s seq %lld\n", 1866 session->s_mds, ceph_session_state_name(session->s_state), 1867 session->s_seq); 1868 msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq); 1869 if (!msg) 1870 return -ENOMEM; 1871 ceph_con_send(&session->s_con, msg); 1872 return 1; 1873 } 1874 1875 /* 1876 * Called with s_mutex held. 1877 */ 1878 static int __close_session(struct ceph_mds_client *mdsc, 1879 struct ceph_mds_session *session) 1880 { 1881 if (session->s_state >= CEPH_MDS_SESSION_CLOSING) 1882 return 0; 1883 session->s_state = CEPH_MDS_SESSION_CLOSING; 1884 return request_close_session(session); 1885 } 1886 1887 static bool drop_negative_children(struct dentry *dentry) 1888 { 1889 struct dentry *child; 1890 bool all_negative = true; 1891 1892 if (!d_is_dir(dentry)) 1893 goto out; 1894 1895 spin_lock(&dentry->d_lock); 1896 list_for_each_entry(child, &dentry->d_subdirs, d_child) { 1897 if (d_really_is_positive(child)) { 1898 all_negative = false; 1899 break; 1900 } 1901 } 1902 spin_unlock(&dentry->d_lock); 1903 1904 if (all_negative) 1905 shrink_dcache_parent(dentry); 1906 out: 1907 return all_negative; 1908 } 1909 1910 /* 1911 * Trim old(er) caps. 1912 * 1913 * Because we can't cache an inode without one or more caps, we do 1914 * this indirectly: if a cap is unused, we prune its aliases, at which 1915 * point the inode will hopefully get dropped to. 1916 * 1917 * Yes, this is a bit sloppy. Our only real goal here is to respond to 1918 * memory pressure from the MDS, though, so it needn't be perfect. 
1919 */ 1920 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) 1921 { 1922 int *remaining = arg; 1923 struct ceph_inode_info *ci = ceph_inode(inode); 1924 int used, wanted, oissued, mine; 1925 1926 if (*remaining <= 0) 1927 return -1; 1928 1929 spin_lock(&ci->i_ceph_lock); 1930 mine = cap->issued | cap->implemented; 1931 used = __ceph_caps_used(ci); 1932 wanted = __ceph_caps_file_wanted(ci); 1933 oissued = __ceph_caps_issued_other(ci, cap); 1934 1935 dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n", 1936 inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), 1937 ceph_cap_string(used), ceph_cap_string(wanted)); 1938 if (cap == ci->i_auth_cap) { 1939 if (ci->i_dirty_caps || ci->i_flushing_caps || 1940 !list_empty(&ci->i_cap_snaps)) 1941 goto out; 1942 if ((used | wanted) & CEPH_CAP_ANY_WR) 1943 goto out; 1944 /* Note: it's possible that i_filelock_ref becomes non-zero 1945 * after dropping auth caps. It doesn't hurt because reply 1946 * of lock mds request will re-add auth caps. */ 1947 if (atomic_read(&ci->i_filelock_ref) > 0) 1948 goto out; 1949 } 1950 /* The inode has cached pages, but it's no longer used. 1951 * we can safely drop it */ 1952 if (S_ISREG(inode->i_mode) && 1953 wanted == 0 && used == CEPH_CAP_FILE_CACHE && 1954 !(oissued & CEPH_CAP_FILE_CACHE)) { 1955 used = 0; 1956 oissued = 0; 1957 } 1958 if ((used | wanted) & ~oissued & mine) 1959 goto out; /* we need these caps */ 1960 1961 if (oissued) { 1962 /* we aren't the only cap.. 
just remove us */ 1963 __ceph_remove_cap(cap, true); 1964 (*remaining)--; 1965 } else { 1966 struct dentry *dentry; 1967 /* try dropping referring dentries */ 1968 spin_unlock(&ci->i_ceph_lock); 1969 dentry = d_find_any_alias(inode); 1970 if (dentry && drop_negative_children(dentry)) { 1971 int count; 1972 dput(dentry); 1973 d_prune_aliases(inode); 1974 count = atomic_read(&inode->i_count); 1975 if (count == 1) 1976 (*remaining)--; 1977 dout("trim_caps_cb %p cap %p pruned, count now %d\n", 1978 inode, cap, count); 1979 } else { 1980 dput(dentry); 1981 } 1982 return 0; 1983 } 1984 1985 out: 1986 spin_unlock(&ci->i_ceph_lock); 1987 return 0; 1988 } 1989 1990 /* 1991 * Trim session cap count down to some max number. 1992 */ 1993 int ceph_trim_caps(struct ceph_mds_client *mdsc, 1994 struct ceph_mds_session *session, 1995 int max_caps) 1996 { 1997 int trim_caps = session->s_nr_caps - max_caps; 1998 1999 dout("trim_caps mds%d start: %d / %d, trim %d\n", 2000 session->s_mds, session->s_nr_caps, max_caps, trim_caps); 2001 if (trim_caps > 0) { 2002 int remaining = trim_caps; 2003 2004 ceph_iterate_session_caps(session, trim_caps_cb, &remaining); 2005 dout("trim_caps mds%d done: %d / %d, trimmed %d\n", 2006 session->s_mds, session->s_nr_caps, max_caps, 2007 trim_caps - remaining); 2008 } 2009 2010 ceph_flush_cap_releases(mdsc, session); 2011 return 0; 2012 } 2013 2014 static int check_caps_flush(struct ceph_mds_client *mdsc, 2015 u64 want_flush_tid) 2016 { 2017 int ret = 1; 2018 2019 spin_lock(&mdsc->cap_dirty_lock); 2020 if (!list_empty(&mdsc->cap_flush_list)) { 2021 struct ceph_cap_flush *cf = 2022 list_first_entry(&mdsc->cap_flush_list, 2023 struct ceph_cap_flush, g_list); 2024 if (cf->tid <= want_flush_tid) { 2025 dout("check_caps_flush still flushing tid " 2026 "%llu <= %llu\n", cf->tid, want_flush_tid); 2027 ret = 0; 2028 } 2029 } 2030 spin_unlock(&mdsc->cap_dirty_lock); 2031 return ret; 2032 } 2033 2034 /* 2035 * flush all dirty inode data to disk. 
2036 * 2037 * returns true if we've flushed through want_flush_tid 2038 */ 2039 static void wait_caps_flush(struct ceph_mds_client *mdsc, 2040 u64 want_flush_tid) 2041 { 2042 dout("check_caps_flush want %llu\n", want_flush_tid); 2043 2044 wait_event(mdsc->cap_flushing_wq, 2045 check_caps_flush(mdsc, want_flush_tid)); 2046 2047 dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); 2048 } 2049 2050 /* 2051 * called under s_mutex 2052 */ 2053 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc, 2054 struct ceph_mds_session *session) 2055 { 2056 struct ceph_msg *msg = NULL; 2057 struct ceph_mds_cap_release *head; 2058 struct ceph_mds_cap_item *item; 2059 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; 2060 struct ceph_cap *cap; 2061 LIST_HEAD(tmp_list); 2062 int num_cap_releases; 2063 __le32 barrier, *cap_barrier; 2064 2065 down_read(&osdc->lock); 2066 barrier = cpu_to_le32(osdc->epoch_barrier); 2067 up_read(&osdc->lock); 2068 2069 spin_lock(&session->s_cap_lock); 2070 again: 2071 list_splice_init(&session->s_cap_releases, &tmp_list); 2072 num_cap_releases = session->s_num_cap_releases; 2073 session->s_num_cap_releases = 0; 2074 spin_unlock(&session->s_cap_lock); 2075 2076 while (!list_empty(&tmp_list)) { 2077 if (!msg) { 2078 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, 2079 PAGE_SIZE, GFP_NOFS, false); 2080 if (!msg) 2081 goto out_err; 2082 head = msg->front.iov_base; 2083 head->num = cpu_to_le32(0); 2084 msg->front.iov_len = sizeof(*head); 2085 2086 msg->hdr.version = cpu_to_le16(2); 2087 msg->hdr.compat_version = cpu_to_le16(1); 2088 } 2089 2090 cap = list_first_entry(&tmp_list, struct ceph_cap, 2091 session_caps); 2092 list_del(&cap->session_caps); 2093 num_cap_releases--; 2094 2095 head = msg->front.iov_base; 2096 put_unaligned_le32(get_unaligned_le32(&head->num) + 1, 2097 &head->num); 2098 item = msg->front.iov_base + msg->front.iov_len; 2099 item->ino = cpu_to_le64(cap->cap_ino); 2100 item->cap_id = cpu_to_le64(cap->cap_id); 
2101 item->migrate_seq = cpu_to_le32(cap->mseq); 2102 item->seq = cpu_to_le32(cap->issue_seq); 2103 msg->front.iov_len += sizeof(*item); 2104 2105 ceph_put_cap(mdsc, cap); 2106 2107 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { 2108 // Append cap_barrier field 2109 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2110 *cap_barrier = barrier; 2111 msg->front.iov_len += sizeof(*cap_barrier); 2112 2113 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2114 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2115 ceph_con_send(&session->s_con, msg); 2116 msg = NULL; 2117 } 2118 } 2119 2120 BUG_ON(num_cap_releases != 0); 2121 2122 spin_lock(&session->s_cap_lock); 2123 if (!list_empty(&session->s_cap_releases)) 2124 goto again; 2125 spin_unlock(&session->s_cap_lock); 2126 2127 if (msg) { 2128 // Append cap_barrier field 2129 cap_barrier = msg->front.iov_base + msg->front.iov_len; 2130 *cap_barrier = barrier; 2131 msg->front.iov_len += sizeof(*cap_barrier); 2132 2133 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2134 dout("send_cap_releases mds%d %p\n", session->s_mds, msg); 2135 ceph_con_send(&session->s_con, msg); 2136 } 2137 return; 2138 out_err: 2139 pr_err("send_cap_releases mds%d, failed to allocate message\n", 2140 session->s_mds); 2141 spin_lock(&session->s_cap_lock); 2142 list_splice(&tmp_list, &session->s_cap_releases); 2143 session->s_num_cap_releases += num_cap_releases; 2144 spin_unlock(&session->s_cap_lock); 2145 } 2146 2147 static void ceph_cap_release_work(struct work_struct *work) 2148 { 2149 struct ceph_mds_session *session = 2150 container_of(work, struct ceph_mds_session, s_cap_release_work); 2151 2152 mutex_lock(&session->s_mutex); 2153 if (session->s_state == CEPH_MDS_SESSION_OPEN || 2154 session->s_state == CEPH_MDS_SESSION_HUNG) 2155 ceph_send_cap_releases(session->s_mdsc, session); 2156 mutex_unlock(&session->s_mutex); 2157 ceph_put_mds_session(session); 2158 } 2159 2160 void ceph_flush_cap_releases(struct 
ceph_mds_client *mdsc, 2161 struct ceph_mds_session *session) 2162 { 2163 if (mdsc->stopping) 2164 return; 2165 2166 ceph_get_mds_session(session); 2167 if (queue_work(mdsc->fsc->cap_wq, 2168 &session->s_cap_release_work)) { 2169 dout("cap release work queued\n"); 2170 } else { 2171 ceph_put_mds_session(session); 2172 dout("failed to queue cap release work\n"); 2173 } 2174 } 2175 2176 /* 2177 * caller holds session->s_cap_lock 2178 */ 2179 void __ceph_queue_cap_release(struct ceph_mds_session *session, 2180 struct ceph_cap *cap) 2181 { 2182 list_add_tail(&cap->session_caps, &session->s_cap_releases); 2183 session->s_num_cap_releases++; 2184 2185 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE)) 2186 ceph_flush_cap_releases(session->s_mdsc, session); 2187 } 2188 2189 static void ceph_cap_reclaim_work(struct work_struct *work) 2190 { 2191 struct ceph_mds_client *mdsc = 2192 container_of(work, struct ceph_mds_client, cap_reclaim_work); 2193 int ret = ceph_trim_dentries(mdsc); 2194 if (ret == -EAGAIN) 2195 ceph_queue_cap_reclaim_work(mdsc); 2196 } 2197 2198 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc) 2199 { 2200 if (mdsc->stopping) 2201 return; 2202 2203 if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) { 2204 dout("caps reclaim work queued\n"); 2205 } else { 2206 dout("failed to queue caps release work\n"); 2207 } 2208 } 2209 2210 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr) 2211 { 2212 int val; 2213 if (!nr) 2214 return; 2215 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending); 2216 if ((val % CEPH_CAPS_PER_RELEASE) < nr) { 2217 atomic_set(&mdsc->cap_reclaim_pending, 0); 2218 ceph_queue_cap_reclaim_work(mdsc); 2219 } 2220 } 2221 2222 /* 2223 * requests 2224 */ 2225 2226 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, 2227 struct inode *dir) 2228 { 2229 struct ceph_inode_info *ci = ceph_inode(dir); 2230 struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; 2231 struct 
ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; 2232 size_t size = sizeof(struct ceph_mds_reply_dir_entry); 2233 unsigned int num_entries; 2234 int order; 2235 2236 spin_lock(&ci->i_ceph_lock); 2237 num_entries = ci->i_files + ci->i_subdirs; 2238 spin_unlock(&ci->i_ceph_lock); 2239 num_entries = max(num_entries, 1U); 2240 num_entries = min(num_entries, opt->max_readdir); 2241 2242 order = get_order(size * num_entries); 2243 while (order >= 0) { 2244 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | 2245 __GFP_NOWARN, 2246 order); 2247 if (rinfo->dir_entries) 2248 break; 2249 order--; 2250 } 2251 if (!rinfo->dir_entries) 2252 return -ENOMEM; 2253 2254 num_entries = (PAGE_SIZE << order) / size; 2255 num_entries = min(num_entries, opt->max_readdir); 2256 2257 rinfo->dir_buf_size = PAGE_SIZE << order; 2258 req->r_num_caps = num_entries + 1; 2259 req->r_args.readdir.max_entries = cpu_to_le32(num_entries); 2260 req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes); 2261 return 0; 2262 } 2263 2264 /* 2265 * Create an mds request. 
2266 */ 2267 struct ceph_mds_request * 2268 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode) 2269 { 2270 struct ceph_mds_request *req; 2271 2272 req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS); 2273 if (!req) 2274 return ERR_PTR(-ENOMEM); 2275 2276 mutex_init(&req->r_fill_mutex); 2277 req->r_mdsc = mdsc; 2278 req->r_started = jiffies; 2279 req->r_start_latency = ktime_get(); 2280 req->r_resend_mds = -1; 2281 INIT_LIST_HEAD(&req->r_unsafe_dir_item); 2282 INIT_LIST_HEAD(&req->r_unsafe_target_item); 2283 req->r_fmode = -1; 2284 kref_init(&req->r_kref); 2285 RB_CLEAR_NODE(&req->r_node); 2286 INIT_LIST_HEAD(&req->r_wait); 2287 init_completion(&req->r_completion); 2288 init_completion(&req->r_safe_completion); 2289 INIT_LIST_HEAD(&req->r_unsafe_item); 2290 2291 ktime_get_coarse_real_ts64(&req->r_stamp); 2292 2293 req->r_op = op; 2294 req->r_direct_mode = mode; 2295 return req; 2296 } 2297 2298 /* 2299 * return oldest (lowest) request, tid in request tree, 0 if none. 2300 * 2301 * called under mdsc->mutex. 2302 */ 2303 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) 2304 { 2305 if (RB_EMPTY_ROOT(&mdsc->request_tree)) 2306 return NULL; 2307 return rb_entry(rb_first(&mdsc->request_tree), 2308 struct ceph_mds_request, r_node); 2309 } 2310 2311 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) 2312 { 2313 return mdsc->oldest_tid; 2314 } 2315 2316 /* 2317 * Build a dentry's path. Allocate on heap; caller must kfree. Based 2318 * on build_path_from_dentry in fs/cifs/dir.c. 2319 * 2320 * If @stop_on_nosnap, generate path relative to the first non-snapped 2321 * inode. 2322 * 2323 * Encode hidden .snap dirs as a double /, i.e. 
2324 * foo/.snap/bar -> foo//bar 2325 */ 2326 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase, 2327 int stop_on_nosnap) 2328 { 2329 struct dentry *temp; 2330 char *path; 2331 int pos; 2332 unsigned seq; 2333 u64 base; 2334 2335 if (!dentry) 2336 return ERR_PTR(-EINVAL); 2337 2338 path = __getname(); 2339 if (!path) 2340 return ERR_PTR(-ENOMEM); 2341 retry: 2342 pos = PATH_MAX - 1; 2343 path[pos] = '\0'; 2344 2345 seq = read_seqbegin(&rename_lock); 2346 rcu_read_lock(); 2347 temp = dentry; 2348 for (;;) { 2349 struct inode *inode; 2350 2351 spin_lock(&temp->d_lock); 2352 inode = d_inode(temp); 2353 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) { 2354 dout("build_path path+%d: %p SNAPDIR\n", 2355 pos, temp); 2356 } else if (stop_on_nosnap && inode && dentry != temp && 2357 ceph_snap(inode) == CEPH_NOSNAP) { 2358 spin_unlock(&temp->d_lock); 2359 pos++; /* get rid of any prepended '/' */ 2360 break; 2361 } else { 2362 pos -= temp->d_name.len; 2363 if (pos < 0) { 2364 spin_unlock(&temp->d_lock); 2365 break; 2366 } 2367 memcpy(path + pos, temp->d_name.name, temp->d_name.len); 2368 } 2369 spin_unlock(&temp->d_lock); 2370 temp = READ_ONCE(temp->d_parent); 2371 2372 /* Are we at the root? */ 2373 if (IS_ROOT(temp)) 2374 break; 2375 2376 /* Are we out of buffer? */ 2377 if (--pos < 0) 2378 break; 2379 2380 path[pos] = '/'; 2381 } 2382 base = ceph_ino(d_inode(temp)); 2383 rcu_read_unlock(); 2384 2385 if (read_seqretry(&rename_lock, seq)) 2386 goto retry; 2387 2388 if (pos < 0) { 2389 /* 2390 * A rename didn't occur, but somehow we didn't end up where 2391 * we thought we would. Throw a warning and try again. 
2392 */ 2393 pr_warn("build_path did not end path lookup where " 2394 "expected, pos is %d\n", pos); 2395 goto retry; 2396 } 2397 2398 *pbase = base; 2399 *plen = PATH_MAX - 1 - pos; 2400 dout("build_path on %p %d built %llx '%.*s'\n", 2401 dentry, d_count(dentry), base, *plen, path + pos); 2402 return path + pos; 2403 } 2404 2405 static int build_dentry_path(struct dentry *dentry, struct inode *dir, 2406 const char **ppath, int *ppathlen, u64 *pino, 2407 bool *pfreepath, bool parent_locked) 2408 { 2409 char *path; 2410 2411 rcu_read_lock(); 2412 if (!dir) 2413 dir = d_inode_rcu(dentry->d_parent); 2414 if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) { 2415 *pino = ceph_ino(dir); 2416 rcu_read_unlock(); 2417 *ppath = dentry->d_name.name; 2418 *ppathlen = dentry->d_name.len; 2419 return 0; 2420 } 2421 rcu_read_unlock(); 2422 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2423 if (IS_ERR(path)) 2424 return PTR_ERR(path); 2425 *ppath = path; 2426 *pfreepath = true; 2427 return 0; 2428 } 2429 2430 static int build_inode_path(struct inode *inode, 2431 const char **ppath, int *ppathlen, u64 *pino, 2432 bool *pfreepath) 2433 { 2434 struct dentry *dentry; 2435 char *path; 2436 2437 if (ceph_snap(inode) == CEPH_NOSNAP) { 2438 *pino = ceph_ino(inode); 2439 *ppathlen = 0; 2440 return 0; 2441 } 2442 dentry = d_find_alias(inode); 2443 path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); 2444 dput(dentry); 2445 if (IS_ERR(path)) 2446 return PTR_ERR(path); 2447 *ppath = path; 2448 *pfreepath = true; 2449 return 0; 2450 } 2451 2452 /* 2453 * request arguments may be specified via an inode *, a dentry *, or 2454 * an explicit ino+path. 
 *
 * The first of @rinode, @rdentry, or (@rpath / @rino) that is set
 * decides how *ino, *ppath and *pathlen are filled in.  *freepath is
 * set by the helpers when the path was allocated and must be released
 * by the caller.  Returns 0 or a negative errno from the helpers.
 */
static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
				  struct inode *rdiri, const char *rpath,
				  u64 rino, const char **ppath, int *pathlen,
				  u64 *ino, bool *freepath, bool parent_locked)
{
	int r = 0;

	if (rinode) {
		r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
		dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
		     ceph_snap(rinode));
	} else if (rdentry) {
		r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
					freepath, parent_locked);
		dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
		     *ppath);
	} else if (rpath || rino) {
		*ino = rino;
		*ppath = rpath;
		*pathlen = rpath ? strlen(rpath) : 0;
		dout(" path %.*s\n", *pathlen, rpath);
	}

	return r;
}

/*
 * Build the CEPH_MSG_CLIENT_REQUEST message for @req addressed to @mds.
 *
 * The front section holds the request head, the two encoded file
 * paths, optional cap/dentry releases, and a time stamp.  When
 * @drop_cap_releases is set the releases are encoded and then
 * discarded by rewinding to the recorded release offset.
 *
 * Returns the new message, or an ERR_PTR: the error from path
 * resolution, -ENOMEM if the message cannot be allocated, or -ERANGE
 * if the encoded data overran the front buffer.
 *
 * called under mdsc->mutex
 */
static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
					       struct ceph_mds_request *req,
					       int mds, bool drop_cap_releases)
{
	struct ceph_msg *msg;
	struct ceph_mds_request_head *head;
	const char *path1 = NULL;
	const char *path2 = NULL;
	u64 ino1 = 0, ino2 = 0;
	int pathlen1 = 0, pathlen2 = 0;
	bool freepath1 = false, freepath2 = false;
	int len;
	u16 releases;
	void *p, *end;
	int ret;

	ret = set_request_path_attr(req->r_inode, req->r_dentry,
			      req->r_parent, req->r_path1, req->r_ino1.ino,
			      &path1, &pathlen1, &ino1, &freepath1,
			      test_bit(CEPH_MDS_R_PARENT_LOCKED,
					&req->r_req_flags));
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out;
	}

	/* If r_old_dentry is set, then assume that its parent is locked */
	ret = set_request_path_attr(NULL, req->r_old_dentry,
			      req->r_old_dentry_dir,
			      req->r_path2, req->r_ino2.ino,
			      &path2, &pathlen2, &ino2, &freepath2, true);
	if (ret < 0) {
		msg = ERR_PTR(ret);
		goto out_free1;
	}

	/* head + two filepaths (version byte, ino, length, name) + stamp */
	len = sizeof(*head) +
		pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
		sizeof(struct ceph_timespec);

	/* calculate (max) length for cap releases */
	len += sizeof(struct ceph_mds_request_release) *
		(!!req->r_inode_drop + !!req->r_dentry_drop +
		 !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
	if (req->r_dentry_drop)
		len += pathlen1;
	if (req->r_old_dentry_drop)
		len += pathlen2;

	msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
	if (!msg) {
		msg = ERR_PTR(-ENOMEM);
		goto out_free2;
	}

	msg->hdr.version = cpu_to_le16(2);
	msg->hdr.tid = cpu_to_le64(req->r_tid);

	head = msg->front.iov_base;
	p = msg->front.iov_base + sizeof(*head);
	end = msg->front.iov_base + msg->front.iov_len;

	head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
	head->op = cpu_to_le32(req->r_op);
	head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
	head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
	head->ino = cpu_to_le64(req->r_deleg_ino);
	head->args = req->r_args;

	ceph_encode_filepath(&p, end, ino1, path1);
	ceph_encode_filepath(&p, end, ino2, path2);

	/* make note of release offset, in case we need to replay */
	req->r_request_release_offset = p - msg->front.iov_base;

	/* cap releases */
	releases = 0;
	if (req->r_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      req->r_inode ? req->r_inode : d_inode(req->r_dentry),
		      mds, req->r_inode_drop, req->r_inode_unless,
		      req->r_op == CEPH_MDS_OP_READDIR);
	if (req->r_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_dentry,
				req->r_parent, mds, req->r_dentry_drop,
				req->r_dentry_unless);
	if (req->r_old_dentry_drop)
		releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
				req->r_old_dentry_dir, mds,
				req->r_old_dentry_drop,
				req->r_old_dentry_unless);
	if (req->r_old_inode_drop)
		releases += ceph_encode_inode_release(&p,
		      d_inode(req->r_old_dentry),
		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);

	/* rewind over the releases just encoded so they are not sent */
	if (drop_cap_releases) {
		releases = 0;
		p = msg->front.iov_base + req->r_request_release_offset;
	}

	head->num_releases = cpu_to_le16(releases);

	/* time stamp */
	{
		struct ceph_timespec ts;
		ceph_encode_timespec64(&ts, &req->r_stamp);
		ceph_encode_copy(&p, &ts, sizeof(ts));
	}

	/* the length estimate above must have covered everything encoded */
	if (WARN_ON_ONCE(p > end)) {
		ceph_msg_put(msg);
		msg = ERR_PTR(-ERANGE);
		goto out_free2;
	}

	/* shrink the front to what was actually encoded */
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	if (req->r_pagelist) {
		struct ceph_pagelist *pagelist = req->r_pagelist;
		ceph_msg_data_add_pagelist(msg, pagelist);
		msg->hdr.data_len = cpu_to_le32(pagelist->length);
	} else {
		msg->hdr.data_len = 0;
	}

	msg->hdr.data_off = cpu_to_le16(0);

	/* success falls through: built paths are released on every exit */
out_free2:
	if (freepath2)
		ceph_mdsc_free_path((char *)path2, pathlen2);
out_free1:
	if (freepath1)
		ceph_mdsc_free_path((char *)path1, pathlen1);
out:
	return msg;
}

/*
 * called under mdsc->mutex if error, under no mutex if
 * success.
 *
 * Record the end-of-request time, run the request's callback (if any)
 * and wake everyone waiting on r_completion.
 */
static void complete_request(struct ceph_mds_client *mdsc,
			     struct ceph_mds_request *req)
{
	req->r_end_latency = ktime_get();

	if (req->r_callback)
		req->r_callback(mdsc, req);
	complete_all(&req->r_completion);
}

/*
 * Prepare req->r_request for (re)transmission to @mds.
 *
 * For a request that already received an unsafe reply the original
 * message is patched in place for replay; otherwise any previous
 * message is dropped and a fresh one is built.  Returns 0, or the
 * error from create_request_message() (also stored in req->r_err).
 *
 * called under mdsc->mutex
 */
static int __prepare_send_request(struct ceph_mds_client *mdsc,
				  struct ceph_mds_request *req,
				  int mds, bool drop_cap_releases)
{
	struct ceph_mds_request_head *rhead;
	struct ceph_msg *msg;
	int flags = 0;

	req->r_attempts++;
	if (req->r_inode) {
		struct ceph_cap *cap =
			ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);

		if (cap)
			req->r_sent_on_mseq = cap->mseq;
		else
			req->r_sent_on_mseq = -1;
	}
	dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
	     req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);

	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
		void *p;
		/*
		 * Replay.  Do not regenerate message (and rebuild
		 * paths, etc.); just use the original message.
		 * Rebuilding paths will break for renames because
		 * d_move mangles the src name.
		 */
		msg = req->r_request;
		rhead = msg->front.iov_base;

		flags = le32_to_cpu(rhead->flags);
		flags |= CEPH_MDS_FLAG_REPLAY;
		rhead->flags = cpu_to_le32(flags);

		if (req->r_target_inode)
			rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));

		rhead->num_retry = req->r_attempts - 1;

		/* remove cap/dentry releases from message */
		rhead->num_releases = 0;

		/* time stamp */
		p = msg->front.iov_base + req->r_request_release_offset;
		{
			struct ceph_timespec ts;
			ceph_encode_timespec64(&ts, &req->r_stamp);
			ceph_encode_copy(&p, &ts, sizeof(ts));
		}

		/* front now ends right after the rewritten time stamp */
		msg->front.iov_len = p - msg->front.iov_base;
		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		return 0;
	}

	if (req->r_request) {
		ceph_msg_put(req->r_request);
		req->r_request = NULL;
	}
	msg = create_request_message(mdsc, req, mds, drop_cap_releases);
	if (IS_ERR(msg)) {
		req->r_err = PTR_ERR(msg);
		return PTR_ERR(msg);
	}
	req->r_request = msg;

	rhead = msg->front.iov_base;
	rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
	/* NOTE(review): GOT_UNSAFE already returned above, so this test looks unreachable */
	if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
		flags |= CEPH_MDS_FLAG_REPLAY;
	if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
		flags |= CEPH_MDS_FLAG_ASYNC;
	if (req->r_parent)
		flags |= CEPH_MDS_FLAG_WANT_DENTRY;
	rhead->flags = cpu_to_le32(flags);
	rhead->num_fwd = req->r_num_fwd;
	rhead->num_retry = req->r_attempts - 1;

	dout(" r_parent = %p\n", req->r_parent);
	return 0;
}

/*
 * Prepare the request message and hand it to the session's connection.
 * An extra message reference is taken for the send.  Returns the
 * result of __prepare_send_request().
 *
 * called under mdsc->mutex
 */
static int __send_request(struct ceph_mds_client *mdsc,
			  struct ceph_mds_session *session,
			  struct ceph_mds_request *req,
			  bool drop_cap_releases)
{
	int err;

	err = __prepare_send_request(mdsc, req, session->s_mds,
				     drop_cap_releases);
	if (!err) {
		ceph_msg_get(req->r_request);
		ceph_con_send(&session->s_con, req->r_request);
	}

	return err;
}

/*
 * send request, or put it on the appropriate wait list.
 *
 * On any early error the request is completed with r_err set and
 * unregistered.
 */
static void __do_request(struct ceph_mds_client *mdsc,
			struct ceph_mds_request *req)
{
	struct ceph_mds_session *session = NULL;
	int mds = -1;
	int err = 0;
	bool random;

	/* already failed or answered: at most clean up an aborted request */
	if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
			__unregister_request(mdsc, req);
		return;
	}

	if (req->r_timeout &&
	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
		dout("do_request timed out\n");
		err = -ETIMEDOUT;
		goto finish;
	}
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
		dout("do_request forced umount\n");
		err = -EIO;
		goto finish;
	}
	if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
		if (mdsc->mdsmap_err) {
			err = mdsc->mdsmap_err;
			dout("do_request mdsmap err %d\n", err);
			goto finish;
		}
		if (mdsc->mdsmap->m_epoch == 0) {
			dout("do_request no mdsmap, waiting for map\n");
			list_add(&req->r_wait, &mdsc->waiting_for_map);
			return;
		}
		if (!(mdsc->fsc->mount_options->flags &
		      CEPH_MOUNT_OPT_MOUNTWAIT) &&
		    !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
			err = -EHOSTUNREACH;
			goto finish;
		}
	}

	/* drop any session left over from a previous attempt */
	put_request_session(req);

	mds = __choose_mds(mdsc, req, &random);
	if (mds < 0 ||
	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
			err = -EJUKEBOX;
			goto finish;
		}
		dout("do_request no mds or not active, waiting for map\n");
		list_add(&req->r_wait, &mdsc->waiting_for_map);
		return;
	}

	/* get, open session */
	session = __ceph_lookup_mds_session(mdsc, mds);
	if (!session) {
		session = register_session(mdsc, mds);
		if (IS_ERR(session)) {
			err = PTR_ERR(session);
			goto finish;
		}
	}
	req->r_session = ceph_get_mds_session(session);

	dout("do_request mds%d session %p state %s\n", mds, session,
	     ceph_session_state_name(session->s_state));
	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
	    session->s_state != CEPH_MDS_SESSION_HUNG) {
		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
			err = -EACCES;
			goto out_session;
		}
		/*
		 * We cannot queue async requests since the caps and delegated
		 * inodes are bound to the session. Just return -EJUKEBOX and
		 * let the caller retry a sync request in that case.
		 */
		if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
			err = -EJUKEBOX;
			goto out_session;
		}
		if (session->s_state == CEPH_MDS_SESSION_NEW ||
		    session->s_state == CEPH_MDS_SESSION_CLOSING) {
			err = __open_session(mdsc, session);
			if (err)
				goto out_session;
			/* retry the same mds later */
			if (random)
				req->r_resend_mds = mds;
		}
		/* wait on the session until it is usable */
		list_add(&req->r_wait, &session->s_waiting);
		goto out_session;
	}

	/* send request */
	req->r_resend_mds = -1;   /* forget any previous mds hint */

	if (req->r_request_started == 0)   /* note request start time */
		req->r_request_started = jiffies;

	err = __send_request(mdsc, session, req, false);

out_session:
	/* drop the lookup/register reference; req->r_session holds its own */
	ceph_put_mds_session(session);
finish:
	if (err) {
		dout("__do_request early error %d\n", err);
		req->r_err = err;
		complete_request(mdsc, req);
		__unregister_request(mdsc, req);
	}
	return;
}

/*
 * Re-drive every request waiting on @head.  The list is spliced onto
 * a private list first, so requests that __do_request() puts back on
 * @head are not processed again in this pass.
 *
 * called under mdsc->mutex
 */
static void __wake_requests(struct ceph_mds_client *mdsc,
			    struct list_head *head)
{
	struct ceph_mds_request *req;
	LIST_HEAD(tmp_list);

	list_splice_init(head, &tmp_list);

	while (!list_empty(&tmp_list)) {
		req = list_entry(tmp_list.next,
				 struct ceph_mds_request, r_wait);
		list_del_init(&req->r_wait);
		dout(" wake request %p tid %llu\n", req, req->r_tid);
		__do_request(mdsc, req);
	}
}

/*
 * Wake up threads with requests pending for @mds, so that they can
 * resubmit their requests to a possibly different mds.
 *
 * Requests that already got an unsafe reply, or that have been
 * attempted at least once, are left alone.
 */
static void kick_requests(struct ceph_mds_client *mdsc, int mds)
{
	struct ceph_mds_request *req;
	struct rb_node *p = rb_first(&mdsc->request_tree);

	dout("kick_requests mds%d\n", mds);
	while (p) {
		req = rb_entry(p, struct ceph_mds_request, r_node);
		/* advance before __do_request(), which may unregister req */
		p = rb_next(p);
		if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
			continue;
		if (req->r_attempts > 0)
			continue; /* only new requests */
		if (req->r_session &&
		    req->r_session->s_mds == mds) {
			dout(" kicking tid %llu\n", req->r_tid);
			list_del_init(&req->r_wait);
			__do_request(mdsc, req);
		}
	}
}

/*
 * Register @req with the mds client and try to send it.
 *
 * CAP_PIN references are taken on the inodes the request names, and
 * the function waits for any async create pending on r_inode /
 * r_old_inode before submitting.  Returns 0 if the request was issued
 * or queued, or a negative errno (from the async-create wait or from
 * req->r_err after the first send attempt).
 */
int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
			      struct ceph_mds_request *req)
{
	int err = 0;

	/* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
	if (req->r_inode)
		ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
	if (req->r_parent) {
		struct ceph_inode_info *ci = ceph_inode(req->r_parent);
		int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
			    CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
		spin_lock(&ci->i_ceph_lock);
		ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
		__ceph_touch_fmode(ci, mdsc, fmode);
		spin_unlock(&ci->i_ceph_lock);
		ihold(req->r_parent);
	}
	if (req->r_old_dentry_dir)
		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
				  CEPH_CAP_PIN);

	/*
	 * NOTE(review): the early returns below leave the references taken
	 * above in place; presumably they are dropped when the request is
	 * released -- confirm against the request release path.
	 */
	if (req->r_inode) {
		err = ceph_wait_on_async_create(req->r_inode);
		if (err) {
			dout("%s: wait for async create returned: %d\n",
			     __func__, err);
			return err;
		}
	}

	if (!err && req->r_old_inode) {
		err = ceph_wait_on_async_create(req->r_old_inode);
		if (err) {
			dout("%s: wait for async create returned: %d\n",
			     __func__, err);
			return err;
		}
	}

	dout("submit_request on %p for inode %p\n", req, dir);
	mutex_lock(&mdsc->mutex);
	__register_request(mdsc, req, dir);
	__do_request(mdsc, req);
	err = req->r_err;
	mutex_unlock(&mdsc->mutex);
	return err;
}

/*
 * Wait for @req to complete and return its result.
 *
 * If the request has no timeout and provides its own
 * r_wait_for_completion hook, that hook is used; otherwise this is a
 * killable wait bounded by r_timeout.  If the wait fails before a
 * real reply arrived, the request is marked aborted.
 */
static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
				  struct ceph_mds_request *req)
{
	int err;

	/* wait */
	dout("do_request waiting\n");
	if (!req->r_timeout && req->r_wait_for_completion) {
		err = req->r_wait_for_completion(mdsc, req);
	} else {
		long timeleft = wait_for_completion_killable_timeout(
					&req->r_completion,
					ceph_timeout_jiffies(req->r_timeout));
		if (timeleft > 0)
			err = 0;
		else if (!timeleft)
			err = -ETIMEDOUT;  /* timed out */
		else
			err = timeleft;  /* killed */
	}
	dout("do_request waited, got %d\n", err);
	mutex_lock(&mdsc->mutex);

	/* only abort if we didn't race with a real reply */
	if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
		err = le32_to_cpu(req->r_reply_info.head->result);
	} else if (err < 0) {
		dout("aborted request %lld with %d\n", req->r_tid, err);

		/*
		 * ensure we aren't running concurrently with
		 * ceph_fill_trace or ceph_readdir_prepopulate, which
		 * rely on locks (dir mutex) held by our caller.
		 */
		mutex_lock(&req->r_fill_mutex);
		req->r_err = err;
		set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
		mutex_unlock(&req->r_fill_mutex);

		if (req->r_parent &&
		    (req->r_op & CEPH_MDS_OP_WRITE))
			ceph_invalidate_dir_request(req);
	} else {
		err = req->r_err;
	}

	mutex_unlock(&mdsc->mutex);
	return err;
}

/*
 * Synchronously perform an mds request.  Take care of all of the
 * session setup, forwarding, retry details.
 *
 * Returns the result of submission if that fails, otherwise the
 * result of waiting for the request.
 */
int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
			 struct inode *dir,
			 struct ceph_mds_request *req)
{
	int err;

	dout("do_request on %p\n", req);

	/* issue */
	err = ceph_mdsc_submit_request(mdsc, dir, req);
	if (!err)
		err = ceph_mdsc_wait_request(mdsc, req);
	dout("do_request %p done, result %d\n", req, err);
	return err;
}

/*
 * Invalidate dir's completeness, dentry lease state on an aborted MDS
 * namespace request.
 *
 * r_parent is used unconditionally, so the caller must ensure it is
 * set; the old directory and both dentries are optional.
 */
void ceph_invalidate_dir_request(struct ceph_mds_request *req)
{
	struct inode *dir = req->r_parent;
	struct inode *old_dir = req->r_old_dentry_dir;

	dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);

	ceph_dir_clear_complete(dir);
	if (old_dir)
		ceph_dir_clear_complete(old_dir);
	if (req->r_dentry)
		ceph_invalidate_dentry_lease(req->r_dentry);
	if (req->r_old_dentry)
		ceph_invalidate_dentry_lease(req->r_old_dentry);
}

/*
 * Handle mds reply.
 *
 * We take the session mutex and parse and process the reply immediately.
 * This preserves the logical ordering of replies, capabilities, etc., sent
 * by the MDS as they are applied to our local cache.
3058 */ 3059 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) 3060 { 3061 struct ceph_mds_client *mdsc = session->s_mdsc; 3062 struct ceph_mds_request *req; 3063 struct ceph_mds_reply_head *head = msg->front.iov_base; 3064 struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */ 3065 struct ceph_snap_realm *realm; 3066 u64 tid; 3067 int err, result; 3068 int mds = session->s_mds; 3069 3070 if (msg->front.iov_len < sizeof(*head)) { 3071 pr_err("mdsc_handle_reply got corrupt (short) reply\n"); 3072 ceph_msg_dump(msg); 3073 return; 3074 } 3075 3076 /* get request, session */ 3077 tid = le64_to_cpu(msg->hdr.tid); 3078 mutex_lock(&mdsc->mutex); 3079 req = lookup_get_request(mdsc, tid); 3080 if (!req) { 3081 dout("handle_reply on unknown tid %llu\n", tid); 3082 mutex_unlock(&mdsc->mutex); 3083 return; 3084 } 3085 dout("handle_reply %p\n", req); 3086 3087 /* correct session? */ 3088 if (req->r_session != session) { 3089 pr_err("mdsc_handle_reply got %llu on session mds%d" 3090 " not mds%d\n", tid, session->s_mds, 3091 req->r_session ? req->r_session->s_mds : -1); 3092 mutex_unlock(&mdsc->mutex); 3093 goto out; 3094 } 3095 3096 /* dup? */ 3097 if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || 3098 (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { 3099 pr_warn("got a dup %s reply on %llu from mds%d\n", 3100 head->safe ? 
"safe" : "unsafe", tid, mds); 3101 mutex_unlock(&mdsc->mutex); 3102 goto out; 3103 } 3104 if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { 3105 pr_warn("got unsafe after safe on %llu from mds%d\n", 3106 tid, mds); 3107 mutex_unlock(&mdsc->mutex); 3108 goto out; 3109 } 3110 3111 result = le32_to_cpu(head->result); 3112 3113 /* 3114 * Handle an ESTALE 3115 * if we're not talking to the authority, send to them 3116 * if the authority has changed while we weren't looking, 3117 * send to new authority 3118 * Otherwise we just have to return an ESTALE 3119 */ 3120 if (result == -ESTALE) { 3121 dout("got ESTALE on request %llu\n", req->r_tid); 3122 req->r_resend_mds = -1; 3123 if (req->r_direct_mode != USE_AUTH_MDS) { 3124 dout("not using auth, setting for that now\n"); 3125 req->r_direct_mode = USE_AUTH_MDS; 3126 __do_request(mdsc, req); 3127 mutex_unlock(&mdsc->mutex); 3128 goto out; 3129 } else { 3130 int mds = __choose_mds(mdsc, req, NULL); 3131 if (mds >= 0 && mds != req->r_session->s_mds) { 3132 dout("but auth changed, so resending\n"); 3133 __do_request(mdsc, req); 3134 mutex_unlock(&mdsc->mutex); 3135 goto out; 3136 } 3137 } 3138 dout("have to return ESTALE on request %llu\n", req->r_tid); 3139 } 3140 3141 3142 if (head->safe) { 3143 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); 3144 __unregister_request(mdsc, req); 3145 3146 /* last request during umount? */ 3147 if (mdsc->stopping && !__get_oldest_req(mdsc)) 3148 complete_all(&mdsc->safe_umount_waiters); 3149 3150 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3151 /* 3152 * We already handled the unsafe response, now do the 3153 * cleanup. No need to examine the response; the MDS 3154 * doesn't include any result info in the safe 3155 * response. And even if it did, there is nothing 3156 * useful we could do with a revised return value. 
3157 */ 3158 dout("got safe reply %llu, mds%d\n", tid, mds); 3159 3160 mutex_unlock(&mdsc->mutex); 3161 goto out; 3162 } 3163 } else { 3164 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); 3165 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); 3166 } 3167 3168 dout("handle_reply tid %lld result %d\n", tid, result); 3169 rinfo = &req->r_reply_info; 3170 if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features)) 3171 err = parse_reply_info(session, msg, rinfo, (u64)-1); 3172 else 3173 err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features); 3174 mutex_unlock(&mdsc->mutex); 3175 3176 mutex_lock(&session->s_mutex); 3177 if (err < 0) { 3178 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid); 3179 ceph_msg_dump(msg); 3180 goto out_err; 3181 } 3182 3183 /* snap trace */ 3184 realm = NULL; 3185 if (rinfo->snapblob_len) { 3186 down_write(&mdsc->snap_rwsem); 3187 ceph_update_snap_trace(mdsc, rinfo->snapblob, 3188 rinfo->snapblob + rinfo->snapblob_len, 3189 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP, 3190 &realm); 3191 downgrade_write(&mdsc->snap_rwsem); 3192 } else { 3193 down_read(&mdsc->snap_rwsem); 3194 } 3195 3196 /* insert trace into our cache */ 3197 mutex_lock(&req->r_fill_mutex); 3198 current->journal_info = req; 3199 err = ceph_fill_trace(mdsc->fsc->sb, req); 3200 if (err == 0) { 3201 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || 3202 req->r_op == CEPH_MDS_OP_LSSNAP)) 3203 ceph_readdir_prepopulate(req, req->r_session); 3204 } 3205 current->journal_info = NULL; 3206 mutex_unlock(&req->r_fill_mutex); 3207 3208 up_read(&mdsc->snap_rwsem); 3209 if (realm) 3210 ceph_put_snap_realm(mdsc, realm); 3211 3212 if (err == 0) { 3213 if (req->r_target_inode && 3214 test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { 3215 struct ceph_inode_info *ci = 3216 ceph_inode(req->r_target_inode); 3217 spin_lock(&ci->i_unsafe_lock); 3218 list_add_tail(&req->r_unsafe_target_item, 3219 &ci->i_unsafe_iops); 3220 
spin_unlock(&ci->i_unsafe_lock); 3221 } 3222 3223 ceph_unreserve_caps(mdsc, &req->r_caps_reservation); 3224 } 3225 out_err: 3226 mutex_lock(&mdsc->mutex); 3227 if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3228 if (err) { 3229 req->r_err = err; 3230 } else { 3231 req->r_reply = ceph_msg_get(msg); 3232 set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); 3233 } 3234 } else { 3235 dout("reply arrived after request %lld was aborted\n", tid); 3236 } 3237 mutex_unlock(&mdsc->mutex); 3238 3239 mutex_unlock(&session->s_mutex); 3240 3241 /* kick calling process */ 3242 complete_request(mdsc, req); 3243 3244 ceph_update_metadata_latency(&mdsc->metric, req->r_start_latency, 3245 req->r_end_latency, err); 3246 out: 3247 ceph_mdsc_put_request(req); 3248 return; 3249 } 3250 3251 3252 3253 /* 3254 * handle mds notification that our request has been forwarded. 3255 */ 3256 static void handle_forward(struct ceph_mds_client *mdsc, 3257 struct ceph_mds_session *session, 3258 struct ceph_msg *msg) 3259 { 3260 struct ceph_mds_request *req; 3261 u64 tid = le64_to_cpu(msg->hdr.tid); 3262 u32 next_mds; 3263 u32 fwd_seq; 3264 int err = -EINVAL; 3265 void *p = msg->front.iov_base; 3266 void *end = p + msg->front.iov_len; 3267 3268 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3269 next_mds = ceph_decode_32(&p); 3270 fwd_seq = ceph_decode_32(&p); 3271 3272 mutex_lock(&mdsc->mutex); 3273 req = lookup_get_request(mdsc, tid); 3274 if (!req) { 3275 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds); 3276 goto out; /* dup reply? */ 3277 } 3278 3279 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { 3280 dout("forward tid %llu aborted, unregistering\n", tid); 3281 __unregister_request(mdsc, req); 3282 } else if (fwd_seq <= req->r_num_fwd) { 3283 dout("forward tid %llu to mds%d - old seq %d <= %d\n", 3284 tid, next_mds, req->r_num_fwd, fwd_seq); 3285 } else { 3286 /* resend. 
forward race not possible; mds would drop */ 3287 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); 3288 BUG_ON(req->r_err); 3289 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); 3290 req->r_attempts = 0; 3291 req->r_num_fwd = fwd_seq; 3292 req->r_resend_mds = next_mds; 3293 put_request_session(req); 3294 __do_request(mdsc, req); 3295 } 3296 ceph_mdsc_put_request(req); 3297 out: 3298 mutex_unlock(&mdsc->mutex); 3299 return; 3300 3301 bad: 3302 pr_err("mdsc_handle_forward decode error err=%d\n", err); 3303 } 3304 3305 static int __decode_session_metadata(void **p, void *end, 3306 bool *blocklisted) 3307 { 3308 /* map<string,string> */ 3309 u32 n; 3310 bool err_str; 3311 ceph_decode_32_safe(p, end, n, bad); 3312 while (n-- > 0) { 3313 u32 len; 3314 ceph_decode_32_safe(p, end, len, bad); 3315 ceph_decode_need(p, end, len, bad); 3316 err_str = !strncmp(*p, "error_string", len); 3317 *p += len; 3318 ceph_decode_32_safe(p, end, len, bad); 3319 ceph_decode_need(p, end, len, bad); 3320 /* 3321 * Match "blocklisted (blacklisted)" from newer MDSes, 3322 * or "blacklisted" from older MDSes. 
3323 */ 3324 if (err_str && strnstr(*p, "blacklisted", len)) 3325 *blocklisted = true; 3326 *p += len; 3327 } 3328 return 0; 3329 bad: 3330 return -1; 3331 } 3332 3333 /* 3334 * handle a mds session control message 3335 */ 3336 static void handle_session(struct ceph_mds_session *session, 3337 struct ceph_msg *msg) 3338 { 3339 struct ceph_mds_client *mdsc = session->s_mdsc; 3340 int mds = session->s_mds; 3341 int msg_version = le16_to_cpu(msg->hdr.version); 3342 void *p = msg->front.iov_base; 3343 void *end = p + msg->front.iov_len; 3344 struct ceph_mds_session_head *h; 3345 u32 op; 3346 u64 seq, features = 0; 3347 int wake = 0; 3348 bool blocklisted = false; 3349 3350 /* decode */ 3351 ceph_decode_need(&p, end, sizeof(*h), bad); 3352 h = p; 3353 p += sizeof(*h); 3354 3355 op = le32_to_cpu(h->op); 3356 seq = le64_to_cpu(h->seq); 3357 3358 if (msg_version >= 3) { 3359 u32 len; 3360 /* version >= 2, metadata */ 3361 if (__decode_session_metadata(&p, end, &blocklisted) < 0) 3362 goto bad; 3363 /* version >= 3, feature bits */ 3364 ceph_decode_32_safe(&p, end, len, bad); 3365 if (len) { 3366 ceph_decode_64_safe(&p, end, features, bad); 3367 p += len - sizeof(features); 3368 } 3369 } 3370 3371 mutex_lock(&mdsc->mutex); 3372 if (op == CEPH_SESSION_CLOSE) { 3373 ceph_get_mds_session(session); 3374 __unregister_session(mdsc, session); 3375 } 3376 /* FIXME: this ttl calculation is generous */ 3377 session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose; 3378 mutex_unlock(&mdsc->mutex); 3379 3380 mutex_lock(&session->s_mutex); 3381 3382 dout("handle_session mds%d %s %p state %s seq %llu\n", 3383 mds, ceph_session_op_name(op), session, 3384 ceph_session_state_name(session->s_state), seq); 3385 3386 if (session->s_state == CEPH_MDS_SESSION_HUNG) { 3387 session->s_state = CEPH_MDS_SESSION_OPEN; 3388 pr_info("mds%d came back\n", session->s_mds); 3389 } 3390 3391 switch (op) { 3392 case CEPH_SESSION_OPEN: 3393 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3394 
pr_info("mds%d reconnect success\n", session->s_mds); 3395 session->s_state = CEPH_MDS_SESSION_OPEN; 3396 session->s_features = features; 3397 renewed_caps(mdsc, session, 0); 3398 if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features)) 3399 metric_schedule_delayed(&mdsc->metric); 3400 wake = 1; 3401 if (mdsc->stopping) 3402 __close_session(mdsc, session); 3403 break; 3404 3405 case CEPH_SESSION_RENEWCAPS: 3406 if (session->s_renew_seq == seq) 3407 renewed_caps(mdsc, session, 1); 3408 break; 3409 3410 case CEPH_SESSION_CLOSE: 3411 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING) 3412 pr_info("mds%d reconnect denied\n", session->s_mds); 3413 session->s_state = CEPH_MDS_SESSION_CLOSED; 3414 cleanup_session_requests(mdsc, session); 3415 remove_session_caps(session); 3416 wake = 2; /* for good measure */ 3417 wake_up_all(&mdsc->session_close_wq); 3418 break; 3419 3420 case CEPH_SESSION_STALE: 3421 pr_info("mds%d caps went stale, renewing\n", 3422 session->s_mds); 3423 spin_lock(&session->s_gen_ttl_lock); 3424 session->s_cap_gen++; 3425 session->s_cap_ttl = jiffies - 1; 3426 spin_unlock(&session->s_gen_ttl_lock); 3427 send_renew_caps(mdsc, session); 3428 break; 3429 3430 case CEPH_SESSION_RECALL_STATE: 3431 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps)); 3432 break; 3433 3434 case CEPH_SESSION_FLUSHMSG: 3435 send_flushmsg_ack(mdsc, session, seq); 3436 break; 3437 3438 case CEPH_SESSION_FORCE_RO: 3439 dout("force_session_readonly %p\n", session); 3440 spin_lock(&session->s_cap_lock); 3441 session->s_readonly = true; 3442 spin_unlock(&session->s_cap_lock); 3443 wake_up_session_caps(session, FORCE_RO); 3444 break; 3445 3446 case CEPH_SESSION_REJECT: 3447 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING); 3448 pr_info("mds%d rejected session\n", session->s_mds); 3449 session->s_state = CEPH_MDS_SESSION_REJECTED; 3450 cleanup_session_requests(mdsc, session); 3451 remove_session_caps(session); 3452 if (blocklisted) 3453 mdsc->fsc->blocklisted = 
true; 3454 wake = 2; /* for good measure */ 3455 break; 3456 3457 default: 3458 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds); 3459 WARN_ON(1); 3460 } 3461 3462 mutex_unlock(&session->s_mutex); 3463 if (wake) { 3464 mutex_lock(&mdsc->mutex); 3465 __wake_requests(mdsc, &session->s_waiting); 3466 if (wake == 2) 3467 kick_requests(mdsc, mds); 3468 mutex_unlock(&mdsc->mutex); 3469 } 3470 if (op == CEPH_SESSION_CLOSE) 3471 ceph_put_mds_session(session); 3472 return; 3473 3474 bad: 3475 pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds, 3476 (int)msg->front.iov_len); 3477 ceph_msg_dump(msg); 3478 return; 3479 } 3480 3481 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req) 3482 { 3483 int dcaps; 3484 3485 dcaps = xchg(&req->r_dir_caps, 0); 3486 if (dcaps) { 3487 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3488 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps); 3489 } 3490 } 3491 3492 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req) 3493 { 3494 int dcaps; 3495 3496 dcaps = xchg(&req->r_dir_caps, 0); 3497 if (dcaps) { 3498 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps)); 3499 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent), 3500 dcaps); 3501 } 3502 } 3503 3504 /* 3505 * called under session->mutex. 3506 */ 3507 static void replay_unsafe_requests(struct ceph_mds_client *mdsc, 3508 struct ceph_mds_session *session) 3509 { 3510 struct ceph_mds_request *req, *nreq; 3511 struct rb_node *p; 3512 3513 dout("replay_unsafe_requests mds%d\n", session->s_mds); 3514 3515 mutex_lock(&mdsc->mutex); 3516 list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) 3517 __send_request(mdsc, session, req, true); 3518 3519 /* 3520 * also re-send old requests when MDS enters reconnect stage. So that MDS 3521 * can process completed request in clientreplay stage. 
3522 */ 3523 p = rb_first(&mdsc->request_tree); 3524 while (p) { 3525 req = rb_entry(p, struct ceph_mds_request, r_node); 3526 p = rb_next(p); 3527 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) 3528 continue; 3529 if (req->r_attempts == 0) 3530 continue; /* only old requests */ 3531 if (!req->r_session) 3532 continue; 3533 if (req->r_session->s_mds != session->s_mds) 3534 continue; 3535 3536 ceph_mdsc_release_dir_caps_no_check(req); 3537 3538 __send_request(mdsc, session, req, true); 3539 } 3540 mutex_unlock(&mdsc->mutex); 3541 } 3542 3543 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state) 3544 { 3545 struct ceph_msg *reply; 3546 struct ceph_pagelist *_pagelist; 3547 struct page *page; 3548 __le32 *addr; 3549 int err = -ENOMEM; 3550 3551 if (!recon_state->allow_multi) 3552 return -ENOSPC; 3553 3554 /* can't handle message that contains both caps and realm */ 3555 BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms); 3556 3557 /* pre-allocate new pagelist */ 3558 _pagelist = ceph_pagelist_alloc(GFP_NOFS); 3559 if (!_pagelist) 3560 return -ENOMEM; 3561 3562 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3563 if (!reply) 3564 goto fail_msg; 3565 3566 /* placeholder for nr_caps */ 3567 err = ceph_pagelist_encode_32(_pagelist, 0); 3568 if (err < 0) 3569 goto fail; 3570 3571 if (recon_state->nr_caps) { 3572 /* currently encoding caps */ 3573 err = ceph_pagelist_encode_32(recon_state->pagelist, 0); 3574 if (err) 3575 goto fail; 3576 } else { 3577 /* placeholder for nr_realms (currently encoding relams) */ 3578 err = ceph_pagelist_encode_32(_pagelist, 0); 3579 if (err < 0) 3580 goto fail; 3581 } 3582 3583 err = ceph_pagelist_encode_8(recon_state->pagelist, 1); 3584 if (err) 3585 goto fail; 3586 3587 page = list_first_entry(&recon_state->pagelist->head, struct page, lru); 3588 addr = kmap_atomic(page); 3589 if (recon_state->nr_caps) { 3590 /* currently encoding caps */ 3591 *addr = 
cpu_to_le32(recon_state->nr_caps); 3592 } else { 3593 /* currently encoding relams */ 3594 *(addr + 1) = cpu_to_le32(recon_state->nr_realms); 3595 } 3596 kunmap_atomic(addr); 3597 3598 reply->hdr.version = cpu_to_le16(5); 3599 reply->hdr.compat_version = cpu_to_le16(4); 3600 3601 reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length); 3602 ceph_msg_data_add_pagelist(reply, recon_state->pagelist); 3603 3604 ceph_con_send(&recon_state->session->s_con, reply); 3605 ceph_pagelist_release(recon_state->pagelist); 3606 3607 recon_state->pagelist = _pagelist; 3608 recon_state->nr_caps = 0; 3609 recon_state->nr_realms = 0; 3610 recon_state->msg_version = 5; 3611 return 0; 3612 fail: 3613 ceph_msg_put(reply); 3614 fail_msg: 3615 ceph_pagelist_release(_pagelist); 3616 return err; 3617 } 3618 3619 static struct dentry* d_find_primary(struct inode *inode) 3620 { 3621 struct dentry *alias, *dn = NULL; 3622 3623 if (hlist_empty(&inode->i_dentry)) 3624 return NULL; 3625 3626 spin_lock(&inode->i_lock); 3627 if (hlist_empty(&inode->i_dentry)) 3628 goto out_unlock; 3629 3630 if (S_ISDIR(inode->i_mode)) { 3631 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias); 3632 if (!IS_ROOT(alias)) 3633 dn = dget(alias); 3634 goto out_unlock; 3635 } 3636 3637 hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) { 3638 spin_lock(&alias->d_lock); 3639 if (!d_unhashed(alias) && 3640 (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) { 3641 dn = dget_dlock(alias); 3642 } 3643 spin_unlock(&alias->d_lock); 3644 if (dn) 3645 break; 3646 } 3647 out_unlock: 3648 spin_unlock(&inode->i_lock); 3649 return dn; 3650 } 3651 3652 /* 3653 * Encode information about a cap for a reconnect with the MDS. 
 *
 * @arg is the struct ceph_reconnect_state being filled in.  The record
 * layout depends on recon_state->msg_version: v1 for versions below 2,
 * v2 (optionally followed by file locks and, for version >= 3, a
 * versioned header and snap_follows) otherwise.  If the record would
 * push the pagelist past RECONNECT_MAX_SIZE, the data gathered so far
 * is sent first via send_reconnect_partial().
 *
 * Returns 0 and bumps recon_state->nr_caps on success, or a negative
 * errno.
 */
static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
			  void *arg)
{
	union {
		struct ceph_mds_cap_reconnect v2;
		struct ceph_mds_cap_reconnect_v1 v1;
	} rec;
	struct ceph_inode_info *ci = cap->ci;
	struct ceph_reconnect_state *recon_state = arg;
	struct ceph_pagelist *pagelist = recon_state->pagelist;
	struct dentry *dentry;
	char *path;
	int pathlen, err;
	u64 pathbase;
	u64 snap_follows;

	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
	     inode, ceph_vinop(inode), cap, cap->cap_id,
	     ceph_cap_string(cap->issued));

	dentry = d_find_primary(inode);
	if (dentry) {
		/* set pathbase to parent dir when msg_version >= 2 */
		path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase,
					    recon_state->msg_version >= 2);
		dput(dentry);
		if (IS_ERR(path)) {
			err = PTR_ERR(path);
			goto out_err;
		}
	} else {
		/* no usable dentry: send an empty path */
		path = NULL;
		pathlen = 0;
		pathbase = 0;
	}

	spin_lock(&ci->i_ceph_lock);
	cap->seq = 0;        /* reset cap seq */
	cap->issue_seq = 0;  /* and issue_seq */
	cap->mseq = 0;       /* and migrate_seq */
	cap->cap_gen = cap->session->s_cap_gen;

	/* These are lost when the session goes away */
	if (S_ISDIR(inode->i_mode)) {
		if (cap->issued & CEPH_CAP_DIR_CREATE) {
			ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
			memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
		}
		cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
	}

	if (recon_state->msg_version >= 2) {
		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
		rec.v2.issued = cpu_to_le32(cap->issued);
		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
		rec.v2.pathbase = cpu_to_le64(pathbase);
		/*
		 * Used only as a zero/non-zero flag until the real length
		 * is stored below: 0 means "do not encode file locks".
		 */
		rec.v2.flock_len = (__force __le32)
			((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
	} else {
		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
		rec.v1.issued = cpu_to_le32(cap->issued);
		rec.v1.size = cpu_to_le64(inode->i_size);
		ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
		ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
		rec.v1.pathbase = cpu_to_le64(pathbase);
	}

	/* oldest cap snap's 'follows', else the head snap context's seq */
	if (list_empty(&ci->i_cap_snaps)) {
		snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
	} else {
		struct ceph_cap_snap *capsnap =
			list_first_entry(&ci->i_cap_snaps,
					 struct ceph_cap_snap, ci_item);
		snap_follows = capsnap->follows;
	}
	spin_unlock(&ci->i_ceph_lock);

	if (recon_state->msg_version >= 2) {
		int num_fcntl_locks, num_flock_locks;
		struct ceph_filelock *flocks = NULL;
		size_t struct_len, total_len = sizeof(u64);
		u8 struct_v = 0;

encode_again:
		/* count, allocate, encode; start over if encoding hits -ENOSPC */
		if (rec.v2.flock_len) {
			ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
		} else {
			num_fcntl_locks = 0;
			num_flock_locks = 0;
		}
		if (num_fcntl_locks + num_flock_locks > 0) {
			flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
					       sizeof(struct ceph_filelock),
					       GFP_NOFS);
			if (!flocks) {
				err = -ENOMEM;
				goto out_err;
			}
			err = ceph_encode_locks_to_buffer(inode, flocks,
							  num_fcntl_locks,
							  num_flock_locks);
			if (err) {
				kfree(flocks);
				flocks = NULL;
				if (err == -ENOSPC)
					goto encode_again;
				goto out_err;
			}
		} else {
			kfree(flocks);
			flocks = NULL;
		}

		if (recon_state->msg_version >= 3) {
			/* version, compat_version and struct_len */
			total_len += 2 * sizeof(u8) + sizeof(u32);
			struct_v = 2;
		}
		/*
		 * number of encoded locks is stable, so copy to pagelist
		 */
		struct_len = 2 * sizeof(u32) +
			    (num_fcntl_locks + num_flock_locks) *
			    sizeof(struct ceph_filelock);
		rec.v2.flock_len = cpu_to_le32(struct_len);

		struct_len += sizeof(u32) + pathlen + sizeof(rec.v2);

		if (struct_v >= 2)
			struct_len += sizeof(u64); /* snap_follows */

		total_len += struct_len;

		if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
			err = send_reconnect_partial(recon_state);
			if (err)
				goto out_freeflocks;
			/* the partial send swapped in a fresh pagelist */
			pagelist = recon_state->pagelist;
		}

		/* reserve up front; the encode results below go unchecked */
		err = ceph_pagelist_reserve(pagelist, total_len);
		if (err)
			goto out_freeflocks;

		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
		if (recon_state->msg_version >= 3) {
			ceph_pagelist_encode_8(pagelist, struct_v);
			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_32(pagelist, struct_len);
		}
		ceph_pagelist_encode_string(pagelist, path, pathlen);
		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
		ceph_locks_to_pagelist(flocks, pagelist,
				       num_fcntl_locks, num_flock_locks);
		if (struct_v >= 2)
			ceph_pagelist_encode_64(pagelist, snap_follows);
out_freeflocks:
		kfree(flocks);
	} else {
		err = ceph_pagelist_reserve(pagelist,
					    sizeof(u64) + sizeof(u32) +
					    pathlen + sizeof(rec.v1));
		if (err)
			goto out_err;

		ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
		ceph_pagelist_encode_string(pagelist, path, pathlen);
		ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
	}

out_err:
	ceph_mdsc_free_path(path, pathlen);
	if (!err)
		recon_state->nr_caps++;
	return err;
}

/*
 * Append one record per known snap realm to the reconnect pagelist.
 *
 * For msg_version >= 4 the realm count is written first and every
 * record carries a (version, compat, length) header; if a record would
 * exceed RECONNECT_MAX_SIZE, the data so far is sent with
 * send_reconnect_partial() and encoding continues in the new pagelist.
 *
 * Returns 0 on success or a negative errno.  recon_state->nr_realms is
 * incremented for every realm encoded.
 */
static int encode_snap_realms(struct ceph_mds_client *mdsc,
			      struct ceph_reconnect_state *recon_state)
{
	struct rb_node *p;
	struct ceph_pagelist *pagelist = recon_state->pagelist;
	int err = 0;

	if (recon_state->msg_version >= 4) {
		err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
		if (err < 0)
			goto fail;
	}

	/*
	 * snaprealms.  we provide mds with the ino, seq (version), and
	 * parent for all of our realms.  If the mds has any newer info,
	 * it will tell us.
	 */
	for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
		struct ceph_snap_realm *realm =
		       rb_entry(p, struct ceph_snap_realm, node);
		struct ceph_mds_snaprealm_reconnect sr_rec;

		if (recon_state->msg_version >= 4) {
			size_t need = sizeof(u8) * 2 + sizeof(u32) +
				      sizeof(sr_rec);

			if (pagelist->length + need > RECONNECT_MAX_SIZE) {
				err = send_reconnect_partial(recon_state);
				if (err)
					goto fail;
				/* the partial send swapped in a fresh pagelist */
				pagelist = recon_state->pagelist;
			}

			err = ceph_pagelist_reserve(pagelist, need);
			if (err)
				goto fail;

			/* struct_v, struct_compat, struct_len */
			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_8(pagelist, 1);
			ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
		}

		dout(" adding snap realm %llx seq %lld parent %llx\n",
		     realm->ino, realm->seq, realm->parent_ino);
		sr_rec.ino = cpu_to_le64(realm->ino);
		sr_rec.seq = cpu_to_le64(realm->seq);
		sr_rec.parent = cpu_to_le64(realm->parent_ino);

		err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
		if (err)
			goto fail;

		recon_state->nr_realms++;
	}
fail:
	return err;
}


/*
 * If an MDS fails and recovers, clients need to reconnect in order to
 * reestablish shared state.  This includes all caps issued through
 * this session _and_ the snap_realm hierarchy.  Because it's not
 * clear which snap realms the mds cares about, we send everything we
 * know about.. that ensures we'll then get any new info the
 * recovering MDS might have.
 *
 * This is a relatively heavyweight operation, but it's rare.
3904 */ 3905 static void send_mds_reconnect(struct ceph_mds_client *mdsc, 3906 struct ceph_mds_session *session) 3907 { 3908 struct ceph_msg *reply; 3909 int mds = session->s_mds; 3910 int err = -ENOMEM; 3911 struct ceph_reconnect_state recon_state = { 3912 .session = session, 3913 }; 3914 LIST_HEAD(dispose); 3915 3916 pr_info("mds%d reconnect start\n", mds); 3917 3918 recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS); 3919 if (!recon_state.pagelist) 3920 goto fail_nopagelist; 3921 3922 reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false); 3923 if (!reply) 3924 goto fail_nomsg; 3925 3926 xa_destroy(&session->s_delegated_inos); 3927 3928 mutex_lock(&session->s_mutex); 3929 session->s_state = CEPH_MDS_SESSION_RECONNECTING; 3930 session->s_seq = 0; 3931 3932 dout("session %p state %s\n", session, 3933 ceph_session_state_name(session->s_state)); 3934 3935 spin_lock(&session->s_gen_ttl_lock); 3936 session->s_cap_gen++; 3937 spin_unlock(&session->s_gen_ttl_lock); 3938 3939 spin_lock(&session->s_cap_lock); 3940 /* don't know if session is readonly */ 3941 session->s_readonly = 0; 3942 /* 3943 * notify __ceph_remove_cap() that we are composing cap reconnect. 3944 * If a cap get released before being added to the cap reconnect, 3945 * __ceph_remove_cap() should skip queuing cap release. 
3946 */ 3947 session->s_cap_reconnect = 1; 3948 /* drop old cap expires; we're about to reestablish that state */ 3949 detach_cap_releases(session, &dispose); 3950 spin_unlock(&session->s_cap_lock); 3951 dispose_cap_releases(mdsc, &dispose); 3952 3953 /* trim unused caps to reduce MDS's cache rejoin time */ 3954 if (mdsc->fsc->sb->s_root) 3955 shrink_dcache_parent(mdsc->fsc->sb->s_root); 3956 3957 ceph_con_close(&session->s_con); 3958 ceph_con_open(&session->s_con, 3959 CEPH_ENTITY_TYPE_MDS, mds, 3960 ceph_mdsmap_get_addr(mdsc->mdsmap, mds)); 3961 3962 /* replay unsafe requests */ 3963 replay_unsafe_requests(mdsc, session); 3964 3965 ceph_early_kick_flushing_caps(mdsc, session); 3966 3967 down_read(&mdsc->snap_rwsem); 3968 3969 /* placeholder for nr_caps */ 3970 err = ceph_pagelist_encode_32(recon_state.pagelist, 0); 3971 if (err) 3972 goto fail; 3973 3974 if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) { 3975 recon_state.msg_version = 3; 3976 recon_state.allow_multi = true; 3977 } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) { 3978 recon_state.msg_version = 3; 3979 } else { 3980 recon_state.msg_version = 2; 3981 } 3982 /* trsaverse this session's caps */ 3983 err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state); 3984 3985 spin_lock(&session->s_cap_lock); 3986 session->s_cap_reconnect = 0; 3987 spin_unlock(&session->s_cap_lock); 3988 3989 if (err < 0) 3990 goto fail; 3991 3992 /* check if all realms can be encoded into current message */ 3993 if (mdsc->num_snap_realms) { 3994 size_t total_len = 3995 recon_state.pagelist->length + 3996 mdsc->num_snap_realms * 3997 sizeof(struct ceph_mds_snaprealm_reconnect); 3998 if (recon_state.msg_version >= 4) { 3999 /* number of realms */ 4000 total_len += sizeof(u32); 4001 /* version, compat_version and struct_len */ 4002 total_len += mdsc->num_snap_realms * 4003 (2 * sizeof(u8) + sizeof(u32)); 4004 } 4005 if (total_len > RECONNECT_MAX_SIZE) { 4006 if 
(!recon_state.allow_multi) { 4007 err = -ENOSPC; 4008 goto fail; 4009 } 4010 if (recon_state.nr_caps) { 4011 err = send_reconnect_partial(&recon_state); 4012 if (err) 4013 goto fail; 4014 } 4015 recon_state.msg_version = 5; 4016 } 4017 } 4018 4019 err = encode_snap_realms(mdsc, &recon_state); 4020 if (err < 0) 4021 goto fail; 4022 4023 if (recon_state.msg_version >= 5) { 4024 err = ceph_pagelist_encode_8(recon_state.pagelist, 0); 4025 if (err < 0) 4026 goto fail; 4027 } 4028 4029 if (recon_state.nr_caps || recon_state.nr_realms) { 4030 struct page *page = 4031 list_first_entry(&recon_state.pagelist->head, 4032 struct page, lru); 4033 __le32 *addr = kmap_atomic(page); 4034 if (recon_state.nr_caps) { 4035 WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms); 4036 *addr = cpu_to_le32(recon_state.nr_caps); 4037 } else if (recon_state.msg_version >= 4) { 4038 *(addr + 1) = cpu_to_le32(recon_state.nr_realms); 4039 } 4040 kunmap_atomic(addr); 4041 } 4042 4043 reply->hdr.version = cpu_to_le16(recon_state.msg_version); 4044 if (recon_state.msg_version >= 4) 4045 reply->hdr.compat_version = cpu_to_le16(4); 4046 4047 reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length); 4048 ceph_msg_data_add_pagelist(reply, recon_state.pagelist); 4049 4050 ceph_con_send(&session->s_con, reply); 4051 4052 mutex_unlock(&session->s_mutex); 4053 4054 mutex_lock(&mdsc->mutex); 4055 __wake_requests(mdsc, &session->s_waiting); 4056 mutex_unlock(&mdsc->mutex); 4057 4058 up_read(&mdsc->snap_rwsem); 4059 ceph_pagelist_release(recon_state.pagelist); 4060 return; 4061 4062 fail: 4063 ceph_msg_put(reply); 4064 up_read(&mdsc->snap_rwsem); 4065 mutex_unlock(&session->s_mutex); 4066 fail_nomsg: 4067 ceph_pagelist_release(recon_state.pagelist); 4068 fail_nopagelist: 4069 pr_err("error %d preparing reconnect for mds%d\n", err, mds); 4070 return; 4071 } 4072 4073 4074 /* 4075 * compare old and new mdsmaps, kicking requests 4076 * and closing out old connections as necessary 4077 * 4078 * called 
under mdsc->mutex. 4079 */ 4080 static void check_new_map(struct ceph_mds_client *mdsc, 4081 struct ceph_mdsmap *newmap, 4082 struct ceph_mdsmap *oldmap) 4083 { 4084 int i; 4085 int oldstate, newstate; 4086 struct ceph_mds_session *s; 4087 4088 dout("check_new_map new %u old %u\n", 4089 newmap->m_epoch, oldmap->m_epoch); 4090 4091 for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4092 if (!mdsc->sessions[i]) 4093 continue; 4094 s = mdsc->sessions[i]; 4095 oldstate = ceph_mdsmap_get_state(oldmap, i); 4096 newstate = ceph_mdsmap_get_state(newmap, i); 4097 4098 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n", 4099 i, ceph_mds_state_name(oldstate), 4100 ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "", 4101 ceph_mds_state_name(newstate), 4102 ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "", 4103 ceph_session_state_name(s->s_state)); 4104 4105 if (i >= newmap->possible_max_rank) { 4106 /* force close session for stopped mds */ 4107 ceph_get_mds_session(s); 4108 __unregister_session(mdsc, s); 4109 __wake_requests(mdsc, &s->s_waiting); 4110 mutex_unlock(&mdsc->mutex); 4111 4112 mutex_lock(&s->s_mutex); 4113 cleanup_session_requests(mdsc, s); 4114 remove_session_caps(s); 4115 mutex_unlock(&s->s_mutex); 4116 4117 ceph_put_mds_session(s); 4118 4119 mutex_lock(&mdsc->mutex); 4120 kick_requests(mdsc, i); 4121 continue; 4122 } 4123 4124 if (memcmp(ceph_mdsmap_get_addr(oldmap, i), 4125 ceph_mdsmap_get_addr(newmap, i), 4126 sizeof(struct ceph_entity_addr))) { 4127 /* just close it */ 4128 mutex_unlock(&mdsc->mutex); 4129 mutex_lock(&s->s_mutex); 4130 mutex_lock(&mdsc->mutex); 4131 ceph_con_close(&s->s_con); 4132 mutex_unlock(&s->s_mutex); 4133 s->s_state = CEPH_MDS_SESSION_RESTARTING; 4134 } else if (oldstate == newstate) { 4135 continue; /* nothing new with this mds */ 4136 } 4137 4138 /* 4139 * send reconnect? 
4140 */ 4141 if (s->s_state == CEPH_MDS_SESSION_RESTARTING && 4142 newstate >= CEPH_MDS_STATE_RECONNECT) { 4143 mutex_unlock(&mdsc->mutex); 4144 send_mds_reconnect(mdsc, s); 4145 mutex_lock(&mdsc->mutex); 4146 } 4147 4148 /* 4149 * kick request on any mds that has gone active. 4150 */ 4151 if (oldstate < CEPH_MDS_STATE_ACTIVE && 4152 newstate >= CEPH_MDS_STATE_ACTIVE) { 4153 if (oldstate != CEPH_MDS_STATE_CREATING && 4154 oldstate != CEPH_MDS_STATE_STARTING) 4155 pr_info("mds%d recovery completed\n", s->s_mds); 4156 kick_requests(mdsc, i); 4157 mutex_unlock(&mdsc->mutex); 4158 mutex_lock(&s->s_mutex); 4159 mutex_lock(&mdsc->mutex); 4160 ceph_kick_flushing_caps(mdsc, s); 4161 mutex_unlock(&s->s_mutex); 4162 wake_up_session_caps(s, RECONNECT); 4163 } 4164 } 4165 4166 for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) { 4167 s = mdsc->sessions[i]; 4168 if (!s) 4169 continue; 4170 if (!ceph_mdsmap_is_laggy(newmap, i)) 4171 continue; 4172 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4173 s->s_state == CEPH_MDS_SESSION_HUNG || 4174 s->s_state == CEPH_MDS_SESSION_CLOSING) { 4175 dout(" connecting to export targets of laggy mds%d\n", 4176 i); 4177 __open_export_target_sessions(mdsc, s); 4178 } 4179 } 4180 } 4181 4182 4183 4184 /* 4185 * leases 4186 */ 4187 4188 /* 4189 * caller must hold session s_mutex, dentry->d_lock 4190 */ 4191 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry) 4192 { 4193 struct ceph_dentry_info *di = ceph_dentry(dentry); 4194 4195 ceph_put_mds_session(di->lease_session); 4196 di->lease_session = NULL; 4197 } 4198 4199 static void handle_lease(struct ceph_mds_client *mdsc, 4200 struct ceph_mds_session *session, 4201 struct ceph_msg *msg) 4202 { 4203 struct super_block *sb = mdsc->fsc->sb; 4204 struct inode *inode; 4205 struct dentry *parent, *dentry; 4206 struct ceph_dentry_info *di; 4207 int mds = session->s_mds; 4208 struct ceph_mds_lease *h = msg->front.iov_base; 4209 u32 seq; 4210 struct ceph_vino vino; 4211 struct qstr 
dname; 4212 int release = 0; 4213 4214 dout("handle_lease from mds%d\n", mds); 4215 4216 /* decode */ 4217 if (msg->front.iov_len < sizeof(*h) + sizeof(u32)) 4218 goto bad; 4219 vino.ino = le64_to_cpu(h->ino); 4220 vino.snap = CEPH_NOSNAP; 4221 seq = le32_to_cpu(h->seq); 4222 dname.len = get_unaligned_le32(h + 1); 4223 if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len) 4224 goto bad; 4225 dname.name = (void *)(h + 1) + sizeof(u32); 4226 4227 /* lookup inode */ 4228 inode = ceph_find_inode(sb, vino); 4229 dout("handle_lease %s, ino %llx %p %.*s\n", 4230 ceph_lease_op_name(h->action), vino.ino, inode, 4231 dname.len, dname.name); 4232 4233 mutex_lock(&session->s_mutex); 4234 inc_session_sequence(session); 4235 4236 if (!inode) { 4237 dout("handle_lease no inode %llx\n", vino.ino); 4238 goto release; 4239 } 4240 4241 /* dentry */ 4242 parent = d_find_alias(inode); 4243 if (!parent) { 4244 dout("no parent dentry on inode %p\n", inode); 4245 WARN_ON(1); 4246 goto release; /* hrm... 
*/ 4247 } 4248 dname.hash = full_name_hash(parent, dname.name, dname.len); 4249 dentry = d_lookup(parent, &dname); 4250 dput(parent); 4251 if (!dentry) 4252 goto release; 4253 4254 spin_lock(&dentry->d_lock); 4255 di = ceph_dentry(dentry); 4256 switch (h->action) { 4257 case CEPH_MDS_LEASE_REVOKE: 4258 if (di->lease_session == session) { 4259 if (ceph_seq_cmp(di->lease_seq, seq) > 0) 4260 h->seq = cpu_to_le32(di->lease_seq); 4261 __ceph_mdsc_drop_dentry_lease(dentry); 4262 } 4263 release = 1; 4264 break; 4265 4266 case CEPH_MDS_LEASE_RENEW: 4267 if (di->lease_session == session && 4268 di->lease_gen == session->s_cap_gen && 4269 di->lease_renew_from && 4270 di->lease_renew_after == 0) { 4271 unsigned long duration = 4272 msecs_to_jiffies(le32_to_cpu(h->duration_ms)); 4273 4274 di->lease_seq = seq; 4275 di->time = di->lease_renew_from + duration; 4276 di->lease_renew_after = di->lease_renew_from + 4277 (duration >> 1); 4278 di->lease_renew_from = 0; 4279 } 4280 break; 4281 } 4282 spin_unlock(&dentry->d_lock); 4283 dput(dentry); 4284 4285 if (!release) 4286 goto out; 4287 4288 release: 4289 /* let's just reuse the same message */ 4290 h->action = CEPH_MDS_LEASE_REVOKE_ACK; 4291 ceph_msg_get(msg); 4292 ceph_con_send(&session->s_con, msg); 4293 4294 out: 4295 mutex_unlock(&session->s_mutex); 4296 /* avoid calling iput_final() in mds dispatch threads */ 4297 ceph_async_iput(inode); 4298 return; 4299 4300 bad: 4301 pr_err("corrupt lease message\n"); 4302 ceph_msg_dump(msg); 4303 } 4304 4305 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session, 4306 struct dentry *dentry, char action, 4307 u32 seq) 4308 { 4309 struct ceph_msg *msg; 4310 struct ceph_mds_lease *lease; 4311 struct inode *dir; 4312 int len = sizeof(*lease) + sizeof(u32) + NAME_MAX; 4313 4314 dout("lease_send_msg identry %p %s to mds%d\n", 4315 dentry, ceph_lease_op_name(action), session->s_mds); 4316 4317 msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false); 4318 if (!msg) 4319 return; 
4320 lease = msg->front.iov_base; 4321 lease->action = action; 4322 lease->seq = cpu_to_le32(seq); 4323 4324 spin_lock(&dentry->d_lock); 4325 dir = d_inode(dentry->d_parent); 4326 lease->ino = cpu_to_le64(ceph_ino(dir)); 4327 lease->first = lease->last = cpu_to_le64(ceph_snap(dir)); 4328 4329 put_unaligned_le32(dentry->d_name.len, lease + 1); 4330 memcpy((void *)(lease + 1) + 4, 4331 dentry->d_name.name, dentry->d_name.len); 4332 spin_unlock(&dentry->d_lock); 4333 /* 4334 * if this is a preemptive lease RELEASE, no need to 4335 * flush request stream, since the actual request will 4336 * soon follow. 4337 */ 4338 msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE); 4339 4340 ceph_con_send(&session->s_con, msg); 4341 } 4342 4343 /* 4344 * lock unlock sessions, to wait ongoing session activities 4345 */ 4346 static void lock_unlock_sessions(struct ceph_mds_client *mdsc) 4347 { 4348 int i; 4349 4350 mutex_lock(&mdsc->mutex); 4351 for (i = 0; i < mdsc->max_sessions; i++) { 4352 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4353 if (!s) 4354 continue; 4355 mutex_unlock(&mdsc->mutex); 4356 mutex_lock(&s->s_mutex); 4357 mutex_unlock(&s->s_mutex); 4358 ceph_put_mds_session(s); 4359 mutex_lock(&mdsc->mutex); 4360 } 4361 mutex_unlock(&mdsc->mutex); 4362 } 4363 4364 static void maybe_recover_session(struct ceph_mds_client *mdsc) 4365 { 4366 struct ceph_fs_client *fsc = mdsc->fsc; 4367 4368 if (!ceph_test_mount_opt(fsc, CLEANRECOVER)) 4369 return; 4370 4371 if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED) 4372 return; 4373 4374 if (!READ_ONCE(fsc->blocklisted)) 4375 return; 4376 4377 if (fsc->last_auto_reconnect && 4378 time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30)) 4379 return; 4380 4381 pr_info("auto reconnect after blocklisted\n"); 4382 fsc->last_auto_reconnect = jiffies; 4383 ceph_force_reconnect(fsc->sb); 4384 } 4385 4386 bool check_session_state(struct ceph_mds_session *s) 4387 { 4388 switch (s->s_state) { 4389 case 
CEPH_MDS_SESSION_OPEN: 4390 if (s->s_ttl && time_after(jiffies, s->s_ttl)) { 4391 s->s_state = CEPH_MDS_SESSION_HUNG; 4392 pr_info("mds%d hung\n", s->s_mds); 4393 } 4394 break; 4395 case CEPH_MDS_SESSION_CLOSING: 4396 /* Should never reach this when we're unmounting */ 4397 WARN_ON_ONCE(true); 4398 fallthrough; 4399 case CEPH_MDS_SESSION_NEW: 4400 case CEPH_MDS_SESSION_RESTARTING: 4401 case CEPH_MDS_SESSION_CLOSED: 4402 case CEPH_MDS_SESSION_REJECTED: 4403 return false; 4404 } 4405 4406 return true; 4407 } 4408 4409 /* 4410 * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply, 4411 * then we need to retransmit that request. 4412 */ 4413 void inc_session_sequence(struct ceph_mds_session *s) 4414 { 4415 lockdep_assert_held(&s->s_mutex); 4416 4417 s->s_seq++; 4418 4419 if (s->s_state == CEPH_MDS_SESSION_CLOSING) { 4420 int ret; 4421 4422 dout("resending session close request for mds%d\n", s->s_mds); 4423 ret = request_close_session(s); 4424 if (ret < 0) 4425 pr_err("unable to close session to mds%d: %d\n", 4426 s->s_mds, ret); 4427 } 4428 } 4429 4430 /* 4431 * delayed work -- periodically trim expired leases, renew caps with mds 4432 */ 4433 static void schedule_delayed(struct ceph_mds_client *mdsc) 4434 { 4435 int delay = 5; 4436 unsigned hz = round_jiffies_relative(HZ * delay); 4437 schedule_delayed_work(&mdsc->delayed_work, hz); 4438 } 4439 4440 static void delayed_work(struct work_struct *work) 4441 { 4442 int i; 4443 struct ceph_mds_client *mdsc = 4444 container_of(work, struct ceph_mds_client, delayed_work.work); 4445 int renew_interval; 4446 int renew_caps; 4447 4448 dout("mdsc delayed_work\n"); 4449 4450 if (mdsc->stopping) 4451 return; 4452 4453 mutex_lock(&mdsc->mutex); 4454 renew_interval = mdsc->mdsmap->m_session_timeout >> 2; 4455 renew_caps = time_after_eq(jiffies, HZ*renew_interval + 4456 mdsc->last_renew_caps); 4457 if (renew_caps) 4458 mdsc->last_renew_caps = jiffies; 4459 4460 for (i = 0; i < mdsc->max_sessions; i++) { 4461 
struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i); 4462 if (!s) 4463 continue; 4464 4465 if (!check_session_state(s)) { 4466 ceph_put_mds_session(s); 4467 continue; 4468 } 4469 mutex_unlock(&mdsc->mutex); 4470 4471 mutex_lock(&s->s_mutex); 4472 if (renew_caps) 4473 send_renew_caps(mdsc, s); 4474 else 4475 ceph_con_keepalive(&s->s_con); 4476 if (s->s_state == CEPH_MDS_SESSION_OPEN || 4477 s->s_state == CEPH_MDS_SESSION_HUNG) 4478 ceph_send_cap_releases(mdsc, s); 4479 mutex_unlock(&s->s_mutex); 4480 ceph_put_mds_session(s); 4481 4482 mutex_lock(&mdsc->mutex); 4483 } 4484 mutex_unlock(&mdsc->mutex); 4485 4486 ceph_check_delayed_caps(mdsc); 4487 4488 ceph_queue_cap_reclaim_work(mdsc); 4489 4490 ceph_trim_snapid_map(mdsc); 4491 4492 maybe_recover_session(mdsc); 4493 4494 schedule_delayed(mdsc); 4495 } 4496 4497 int ceph_mdsc_init(struct ceph_fs_client *fsc) 4498 4499 { 4500 struct ceph_mds_client *mdsc; 4501 int err; 4502 4503 mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS); 4504 if (!mdsc) 4505 return -ENOMEM; 4506 mdsc->fsc = fsc; 4507 mutex_init(&mdsc->mutex); 4508 mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS); 4509 if (!mdsc->mdsmap) { 4510 err = -ENOMEM; 4511 goto err_mdsc; 4512 } 4513 4514 init_completion(&mdsc->safe_umount_waiters); 4515 init_waitqueue_head(&mdsc->session_close_wq); 4516 INIT_LIST_HEAD(&mdsc->waiting_for_map); 4517 mdsc->sessions = NULL; 4518 atomic_set(&mdsc->num_sessions, 0); 4519 mdsc->max_sessions = 0; 4520 mdsc->stopping = 0; 4521 atomic64_set(&mdsc->quotarealms_count, 0); 4522 mdsc->quotarealms_inodes = RB_ROOT; 4523 mutex_init(&mdsc->quotarealms_inodes_mutex); 4524 mdsc->last_snap_seq = 0; 4525 init_rwsem(&mdsc->snap_rwsem); 4526 mdsc->snap_realms = RB_ROOT; 4527 INIT_LIST_HEAD(&mdsc->snap_empty); 4528 mdsc->num_snap_realms = 0; 4529 spin_lock_init(&mdsc->snap_empty_lock); 4530 mdsc->last_tid = 0; 4531 mdsc->oldest_tid = 0; 4532 mdsc->request_tree = RB_ROOT; 4533 INIT_DELAYED_WORK(&mdsc->delayed_work, 
delayed_work); 4534 mdsc->last_renew_caps = jiffies; 4535 INIT_LIST_HEAD(&mdsc->cap_delay_list); 4536 INIT_LIST_HEAD(&mdsc->cap_wait_list); 4537 spin_lock_init(&mdsc->cap_delay_lock); 4538 INIT_LIST_HEAD(&mdsc->snap_flush_list); 4539 spin_lock_init(&mdsc->snap_flush_lock); 4540 mdsc->last_cap_flush_tid = 1; 4541 INIT_LIST_HEAD(&mdsc->cap_flush_list); 4542 INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); 4543 mdsc->num_cap_flushing = 0; 4544 spin_lock_init(&mdsc->cap_dirty_lock); 4545 init_waitqueue_head(&mdsc->cap_flushing_wq); 4546 INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work); 4547 atomic_set(&mdsc->cap_reclaim_pending, 0); 4548 err = ceph_metric_init(&mdsc->metric); 4549 if (err) 4550 goto err_mdsmap; 4551 4552 spin_lock_init(&mdsc->dentry_list_lock); 4553 INIT_LIST_HEAD(&mdsc->dentry_leases); 4554 INIT_LIST_HEAD(&mdsc->dentry_dir_leases); 4555 4556 ceph_caps_init(mdsc); 4557 ceph_adjust_caps_max_min(mdsc, fsc->mount_options); 4558 4559 spin_lock_init(&mdsc->snapid_map_lock); 4560 mdsc->snapid_map_tree = RB_ROOT; 4561 INIT_LIST_HEAD(&mdsc->snapid_map_lru); 4562 4563 init_rwsem(&mdsc->pool_perm_rwsem); 4564 mdsc->pool_perm_tree = RB_ROOT; 4565 4566 strscpy(mdsc->nodename, utsname()->nodename, 4567 sizeof(mdsc->nodename)); 4568 4569 fsc->mdsc = mdsc; 4570 return 0; 4571 4572 err_mdsmap: 4573 kfree(mdsc->mdsmap); 4574 err_mdsc: 4575 kfree(mdsc); 4576 return err; 4577 } 4578 4579 /* 4580 * Wait for safe replies on open mds requests. If we time out, drop 4581 * all requests from the tree to avoid dangling dentry refs. 
4582 */ 4583 static void wait_requests(struct ceph_mds_client *mdsc) 4584 { 4585 struct ceph_options *opts = mdsc->fsc->client->options; 4586 struct ceph_mds_request *req; 4587 4588 mutex_lock(&mdsc->mutex); 4589 if (__get_oldest_req(mdsc)) { 4590 mutex_unlock(&mdsc->mutex); 4591 4592 dout("wait_requests waiting for requests\n"); 4593 wait_for_completion_timeout(&mdsc->safe_umount_waiters, 4594 ceph_timeout_jiffies(opts->mount_timeout)); 4595 4596 /* tear down remaining requests */ 4597 mutex_lock(&mdsc->mutex); 4598 while ((req = __get_oldest_req(mdsc))) { 4599 dout("wait_requests timed out on tid %llu\n", 4600 req->r_tid); 4601 list_del_init(&req->r_wait); 4602 __unregister_request(mdsc, req); 4603 } 4604 } 4605 mutex_unlock(&mdsc->mutex); 4606 dout("wait_requests done\n"); 4607 } 4608 4609 /* 4610 * called before mount is ro, and before dentries are torn down. 4611 * (hmm, does this still race with new lookups?) 4612 */ 4613 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc) 4614 { 4615 dout("pre_umount\n"); 4616 mdsc->stopping = 1; 4617 4618 lock_unlock_sessions(mdsc); 4619 ceph_flush_dirty_caps(mdsc); 4620 wait_requests(mdsc); 4621 4622 /* 4623 * wait for reply handlers to drop their request refs and 4624 * their inode/dcache refs 4625 */ 4626 ceph_msgr_flush(); 4627 4628 ceph_cleanup_quotarealms_inodes(mdsc); 4629 } 4630 4631 /* 4632 * wait for all write mds requests to flush. 
4633 */ 4634 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid) 4635 { 4636 struct ceph_mds_request *req = NULL, *nextreq; 4637 struct rb_node *n; 4638 4639 mutex_lock(&mdsc->mutex); 4640 dout("wait_unsafe_requests want %lld\n", want_tid); 4641 restart: 4642 req = __get_oldest_req(mdsc); 4643 while (req && req->r_tid <= want_tid) { 4644 /* find next request */ 4645 n = rb_next(&req->r_node); 4646 if (n) 4647 nextreq = rb_entry(n, struct ceph_mds_request, r_node); 4648 else 4649 nextreq = NULL; 4650 if (req->r_op != CEPH_MDS_OP_SETFILELOCK && 4651 (req->r_op & CEPH_MDS_OP_WRITE)) { 4652 /* write op */ 4653 ceph_mdsc_get_request(req); 4654 if (nextreq) 4655 ceph_mdsc_get_request(nextreq); 4656 mutex_unlock(&mdsc->mutex); 4657 dout("wait_unsafe_requests wait on %llu (want %llu)\n", 4658 req->r_tid, want_tid); 4659 wait_for_completion(&req->r_safe_completion); 4660 mutex_lock(&mdsc->mutex); 4661 ceph_mdsc_put_request(req); 4662 if (!nextreq) 4663 break; /* next dne before, so we're done! 
*/ 4664 if (RB_EMPTY_NODE(&nextreq->r_node)) { 4665 /* next request was removed from tree */ 4666 ceph_mdsc_put_request(nextreq); 4667 goto restart; 4668 } 4669 ceph_mdsc_put_request(nextreq); /* won't go away */ 4670 } 4671 req = nextreq; 4672 } 4673 mutex_unlock(&mdsc->mutex); 4674 dout("wait_unsafe_requests done\n"); 4675 } 4676 4677 void ceph_mdsc_sync(struct ceph_mds_client *mdsc) 4678 { 4679 u64 want_tid, want_flush; 4680 4681 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4682 return; 4683 4684 dout("sync\n"); 4685 mutex_lock(&mdsc->mutex); 4686 want_tid = mdsc->last_tid; 4687 mutex_unlock(&mdsc->mutex); 4688 4689 ceph_flush_dirty_caps(mdsc); 4690 spin_lock(&mdsc->cap_dirty_lock); 4691 want_flush = mdsc->last_cap_flush_tid; 4692 if (!list_empty(&mdsc->cap_flush_list)) { 4693 struct ceph_cap_flush *cf = 4694 list_last_entry(&mdsc->cap_flush_list, 4695 struct ceph_cap_flush, g_list); 4696 cf->wake = true; 4697 } 4698 spin_unlock(&mdsc->cap_dirty_lock); 4699 4700 dout("sync want tid %lld flush_seq %lld\n", 4701 want_tid, want_flush); 4702 4703 wait_unsafe_requests(mdsc, want_tid); 4704 wait_caps_flush(mdsc, want_flush); 4705 } 4706 4707 /* 4708 * true if all sessions are closed, or we force unmount 4709 */ 4710 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) 4711 { 4712 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) 4713 return true; 4714 return atomic_read(&mdsc->num_sessions) <= skipped; 4715 } 4716 4717 /* 4718 * called after sb is ro. 
4719 */ 4720 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) 4721 { 4722 struct ceph_options *opts = mdsc->fsc->client->options; 4723 struct ceph_mds_session *session; 4724 int i; 4725 int skipped = 0; 4726 4727 dout("close_sessions\n"); 4728 4729 /* close sessions */ 4730 mutex_lock(&mdsc->mutex); 4731 for (i = 0; i < mdsc->max_sessions; i++) { 4732 session = __ceph_lookup_mds_session(mdsc, i); 4733 if (!session) 4734 continue; 4735 mutex_unlock(&mdsc->mutex); 4736 mutex_lock(&session->s_mutex); 4737 if (__close_session(mdsc, session) <= 0) 4738 skipped++; 4739 mutex_unlock(&session->s_mutex); 4740 ceph_put_mds_session(session); 4741 mutex_lock(&mdsc->mutex); 4742 } 4743 mutex_unlock(&mdsc->mutex); 4744 4745 dout("waiting for sessions to close\n"); 4746 wait_event_timeout(mdsc->session_close_wq, 4747 done_closing_sessions(mdsc, skipped), 4748 ceph_timeout_jiffies(opts->mount_timeout)); 4749 4750 /* tear down remaining sessions */ 4751 mutex_lock(&mdsc->mutex); 4752 for (i = 0; i < mdsc->max_sessions; i++) { 4753 if (mdsc->sessions[i]) { 4754 session = ceph_get_mds_session(mdsc->sessions[i]); 4755 __unregister_session(mdsc, session); 4756 mutex_unlock(&mdsc->mutex); 4757 mutex_lock(&session->s_mutex); 4758 remove_session_caps(session); 4759 mutex_unlock(&session->s_mutex); 4760 ceph_put_mds_session(session); 4761 mutex_lock(&mdsc->mutex); 4762 } 4763 } 4764 WARN_ON(!list_empty(&mdsc->cap_delay_list)); 4765 mutex_unlock(&mdsc->mutex); 4766 4767 ceph_cleanup_snapid_map(mdsc); 4768 ceph_cleanup_empty_realms(mdsc); 4769 4770 cancel_work_sync(&mdsc->cap_reclaim_work); 4771 cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */ 4772 4773 dout("stopped\n"); 4774 } 4775 4776 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc) 4777 { 4778 struct ceph_mds_session *session; 4779 int mds; 4780 4781 dout("force umount\n"); 4782 4783 mutex_lock(&mdsc->mutex); 4784 for (mds = 0; mds < mdsc->max_sessions; mds++) { 4785 session = 
__ceph_lookup_mds_session(mdsc, mds); 4786 if (!session) 4787 continue; 4788 4789 if (session->s_state == CEPH_MDS_SESSION_REJECTED) 4790 __unregister_session(mdsc, session); 4791 __wake_requests(mdsc, &session->s_waiting); 4792 mutex_unlock(&mdsc->mutex); 4793 4794 mutex_lock(&session->s_mutex); 4795 __close_session(mdsc, session); 4796 if (session->s_state == CEPH_MDS_SESSION_CLOSING) { 4797 cleanup_session_requests(mdsc, session); 4798 remove_session_caps(session); 4799 } 4800 mutex_unlock(&session->s_mutex); 4801 ceph_put_mds_session(session); 4802 4803 mutex_lock(&mdsc->mutex); 4804 kick_requests(mdsc, mds); 4805 } 4806 __wake_requests(mdsc, &mdsc->waiting_for_map); 4807 mutex_unlock(&mdsc->mutex); 4808 } 4809 4810 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) 4811 { 4812 dout("stop\n"); 4813 /* 4814 * Make sure the delayed work stopped before releasing 4815 * the resources. 4816 * 4817 * Because the cancel_delayed_work_sync() will only 4818 * guarantee that the work finishes executing. But the 4819 * delayed work will re-arm itself again after that. 
4820 */ 4821 flush_delayed_work(&mdsc->delayed_work); 4822 4823 if (mdsc->mdsmap) 4824 ceph_mdsmap_destroy(mdsc->mdsmap); 4825 kfree(mdsc->sessions); 4826 ceph_caps_finalize(mdsc); 4827 ceph_pool_perm_destroy(mdsc); 4828 } 4829 4830 void ceph_mdsc_destroy(struct ceph_fs_client *fsc) 4831 { 4832 struct ceph_mds_client *mdsc = fsc->mdsc; 4833 dout("mdsc_destroy %p\n", mdsc); 4834 4835 if (!mdsc) 4836 return; 4837 4838 /* flush out any connection work with references to us */ 4839 ceph_msgr_flush(); 4840 4841 ceph_mdsc_stop(mdsc); 4842 4843 ceph_metric_destroy(&mdsc->metric); 4844 4845 flush_delayed_work(&mdsc->metric.delayed_work); 4846 fsc->mdsc = NULL; 4847 kfree(mdsc); 4848 dout("mdsc_destroy %p done\n", mdsc); 4849 } 4850 4851 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4852 { 4853 struct ceph_fs_client *fsc = mdsc->fsc; 4854 const char *mds_namespace = fsc->mount_options->mds_namespace; 4855 void *p = msg->front.iov_base; 4856 void *end = p + msg->front.iov_len; 4857 u32 epoch; 4858 u32 map_len; 4859 u32 num_fs; 4860 u32 mount_fscid = (u32)-1; 4861 u8 struct_v, struct_cv; 4862 int err = -EINVAL; 4863 4864 ceph_decode_need(&p, end, sizeof(u32), bad); 4865 epoch = ceph_decode_32(&p); 4866 4867 dout("handle_fsmap epoch %u\n", epoch); 4868 4869 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 4870 struct_v = ceph_decode_8(&p); 4871 struct_cv = ceph_decode_8(&p); 4872 map_len = ceph_decode_32(&p); 4873 4874 ceph_decode_need(&p, end, sizeof(u32) * 3, bad); 4875 p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */ 4876 4877 num_fs = ceph_decode_32(&p); 4878 while (num_fs-- > 0) { 4879 void *info_p, *info_end; 4880 u32 info_len; 4881 u8 info_v, info_cv; 4882 u32 fscid, namelen; 4883 4884 ceph_decode_need(&p, end, 2 + sizeof(u32), bad); 4885 info_v = ceph_decode_8(&p); 4886 info_cv = ceph_decode_8(&p); 4887 info_len = ceph_decode_32(&p); 4888 ceph_decode_need(&p, end, info_len, bad); 4889 info_p = p; 4890 info_end = p + 
info_len; 4891 p = info_end; 4892 4893 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad); 4894 fscid = ceph_decode_32(&info_p); 4895 namelen = ceph_decode_32(&info_p); 4896 ceph_decode_need(&info_p, info_end, namelen, bad); 4897 4898 if (mds_namespace && 4899 strlen(mds_namespace) == namelen && 4900 !strncmp(mds_namespace, (char *)info_p, namelen)) { 4901 mount_fscid = fscid; 4902 break; 4903 } 4904 } 4905 4906 ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch); 4907 if (mount_fscid != (u32)-1) { 4908 fsc->client->monc.fs_cluster_id = mount_fscid; 4909 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 4910 0, true); 4911 ceph_monc_renew_subs(&fsc->client->monc); 4912 } else { 4913 err = -ENOENT; 4914 goto err_out; 4915 } 4916 return; 4917 4918 bad: 4919 pr_err("error decoding fsmap\n"); 4920 err_out: 4921 mutex_lock(&mdsc->mutex); 4922 mdsc->mdsmap_err = err; 4923 __wake_requests(mdsc, &mdsc->waiting_for_map); 4924 mutex_unlock(&mdsc->mutex); 4925 } 4926 4927 /* 4928 * handle mds map update. 4929 */ 4930 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg) 4931 { 4932 u32 epoch; 4933 u32 maplen; 4934 void *p = msg->front.iov_base; 4935 void *end = p + msg->front.iov_len; 4936 struct ceph_mdsmap *newmap, *oldmap; 4937 struct ceph_fsid fsid; 4938 int err = -EINVAL; 4939 4940 ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad); 4941 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 4942 if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0) 4943 return; 4944 epoch = ceph_decode_32(&p); 4945 maplen = ceph_decode_32(&p); 4946 dout("handle_map epoch %u len %d\n", epoch, (int)maplen); 4947 4948 /* do we need it? 
*/ 4949 mutex_lock(&mdsc->mutex); 4950 if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) { 4951 dout("handle_map epoch %u <= our %u\n", 4952 epoch, mdsc->mdsmap->m_epoch); 4953 mutex_unlock(&mdsc->mutex); 4954 return; 4955 } 4956 4957 newmap = ceph_mdsmap_decode(&p, end); 4958 if (IS_ERR(newmap)) { 4959 err = PTR_ERR(newmap); 4960 goto bad_unlock; 4961 } 4962 4963 /* swap into place */ 4964 if (mdsc->mdsmap) { 4965 oldmap = mdsc->mdsmap; 4966 mdsc->mdsmap = newmap; 4967 check_new_map(mdsc, newmap, oldmap); 4968 ceph_mdsmap_destroy(oldmap); 4969 } else { 4970 mdsc->mdsmap = newmap; /* first mds map */ 4971 } 4972 mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size, 4973 MAX_LFS_FILESIZE); 4974 4975 __wake_requests(mdsc, &mdsc->waiting_for_map); 4976 ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP, 4977 mdsc->mdsmap->m_epoch); 4978 4979 mutex_unlock(&mdsc->mutex); 4980 schedule_delayed(mdsc); 4981 return; 4982 4983 bad_unlock: 4984 mutex_unlock(&mdsc->mutex); 4985 bad: 4986 pr_err("error decoding mdsmap %d\n", err); 4987 return; 4988 } 4989 4990 static struct ceph_connection *con_get(struct ceph_connection *con) 4991 { 4992 struct ceph_mds_session *s = con->private; 4993 4994 if (ceph_get_mds_session(s)) 4995 return con; 4996 return NULL; 4997 } 4998 4999 static void con_put(struct ceph_connection *con) 5000 { 5001 struct ceph_mds_session *s = con->private; 5002 5003 ceph_put_mds_session(s); 5004 } 5005 5006 /* 5007 * if the client is unresponsive for long enough, the mds will kill 5008 * the session entirely. 
5009 */ 5010 static void peer_reset(struct ceph_connection *con) 5011 { 5012 struct ceph_mds_session *s = con->private; 5013 struct ceph_mds_client *mdsc = s->s_mdsc; 5014 5015 pr_warn("mds%d closed our session\n", s->s_mds); 5016 send_mds_reconnect(mdsc, s); 5017 } 5018 5019 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 5020 { 5021 struct ceph_mds_session *s = con->private; 5022 struct ceph_mds_client *mdsc = s->s_mdsc; 5023 int type = le16_to_cpu(msg->hdr.type); 5024 5025 mutex_lock(&mdsc->mutex); 5026 if (__verify_registered_session(mdsc, s) < 0) { 5027 mutex_unlock(&mdsc->mutex); 5028 goto out; 5029 } 5030 mutex_unlock(&mdsc->mutex); 5031 5032 switch (type) { 5033 case CEPH_MSG_MDS_MAP: 5034 ceph_mdsc_handle_mdsmap(mdsc, msg); 5035 break; 5036 case CEPH_MSG_FS_MAP_USER: 5037 ceph_mdsc_handle_fsmap(mdsc, msg); 5038 break; 5039 case CEPH_MSG_CLIENT_SESSION: 5040 handle_session(s, msg); 5041 break; 5042 case CEPH_MSG_CLIENT_REPLY: 5043 handle_reply(s, msg); 5044 break; 5045 case CEPH_MSG_CLIENT_REQUEST_FORWARD: 5046 handle_forward(mdsc, s, msg); 5047 break; 5048 case CEPH_MSG_CLIENT_CAPS: 5049 ceph_handle_caps(s, msg); 5050 break; 5051 case CEPH_MSG_CLIENT_SNAP: 5052 ceph_handle_snap(mdsc, s, msg); 5053 break; 5054 case CEPH_MSG_CLIENT_LEASE: 5055 handle_lease(mdsc, s, msg); 5056 break; 5057 case CEPH_MSG_CLIENT_QUOTA: 5058 ceph_handle_quota(mdsc, s, msg); 5059 break; 5060 5061 default: 5062 pr_err("received unknown message type %d %s\n", type, 5063 ceph_msg_type_name(type)); 5064 } 5065 out: 5066 ceph_msg_put(msg); 5067 } 5068 5069 /* 5070 * authentication 5071 */ 5072 5073 /* 5074 * Note: returned pointer is the address of a structure that's 5075 * managed separately. Caller must *not* attempt to free it. 
5076 */ 5077 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 5078 int *proto, int force_new) 5079 { 5080 struct ceph_mds_session *s = con->private; 5081 struct ceph_mds_client *mdsc = s->s_mdsc; 5082 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5083 struct ceph_auth_handshake *auth = &s->s_auth; 5084 5085 if (force_new && auth->authorizer) { 5086 ceph_auth_destroy_authorizer(auth->authorizer); 5087 auth->authorizer = NULL; 5088 } 5089 if (!auth->authorizer) { 5090 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 5091 auth); 5092 if (ret) 5093 return ERR_PTR(ret); 5094 } else { 5095 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS, 5096 auth); 5097 if (ret) 5098 return ERR_PTR(ret); 5099 } 5100 *proto = ac->protocol; 5101 5102 return auth; 5103 } 5104 5105 static int add_authorizer_challenge(struct ceph_connection *con, 5106 void *challenge_buf, int challenge_buf_len) 5107 { 5108 struct ceph_mds_session *s = con->private; 5109 struct ceph_mds_client *mdsc = s->s_mdsc; 5110 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5111 5112 return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer, 5113 challenge_buf, challenge_buf_len); 5114 } 5115 5116 static int verify_authorizer_reply(struct ceph_connection *con) 5117 { 5118 struct ceph_mds_session *s = con->private; 5119 struct ceph_mds_client *mdsc = s->s_mdsc; 5120 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5121 5122 return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer); 5123 } 5124 5125 static int invalidate_authorizer(struct ceph_connection *con) 5126 { 5127 struct ceph_mds_session *s = con->private; 5128 struct ceph_mds_client *mdsc = s->s_mdsc; 5129 struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth; 5130 5131 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS); 5132 5133 return ceph_monc_validate_auth(&mdsc->fsc->client->monc); 5134 } 5135 5136 static struct ceph_msg 
*mds_alloc_msg(struct ceph_connection *con, 5137 struct ceph_msg_header *hdr, int *skip) 5138 { 5139 struct ceph_msg *msg; 5140 int type = (int) le16_to_cpu(hdr->type); 5141 int front_len = (int) le32_to_cpu(hdr->front_len); 5142 5143 if (con->in_msg) 5144 return con->in_msg; 5145 5146 *skip = 0; 5147 msg = ceph_msg_new(type, front_len, GFP_NOFS, false); 5148 if (!msg) { 5149 pr_err("unable to allocate msg type %d len %d\n", 5150 type, front_len); 5151 return NULL; 5152 } 5153 5154 return msg; 5155 } 5156 5157 static int mds_sign_message(struct ceph_msg *msg) 5158 { 5159 struct ceph_mds_session *s = msg->con->private; 5160 struct ceph_auth_handshake *auth = &s->s_auth; 5161 5162 return ceph_auth_sign_message(auth, msg); 5163 } 5164 5165 static int mds_check_message_signature(struct ceph_msg *msg) 5166 { 5167 struct ceph_mds_session *s = msg->con->private; 5168 struct ceph_auth_handshake *auth = &s->s_auth; 5169 5170 return ceph_auth_check_message_signature(auth, msg); 5171 } 5172 5173 static const struct ceph_connection_operations mds_con_ops = { 5174 .get = con_get, 5175 .put = con_put, 5176 .dispatch = dispatch, 5177 .get_authorizer = get_authorizer, 5178 .add_authorizer_challenge = add_authorizer_challenge, 5179 .verify_authorizer_reply = verify_authorizer_reply, 5180 .invalidate_authorizer = invalidate_authorizer, 5181 .peer_reset = peer_reset, 5182 .alloc_msg = mds_alloc_msg, 5183 .sign_message = mds_sign_message, 5184 .check_message_signature = mds_check_message_signature, 5185 }; 5186 5187 /* eof */ 5188