// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/sched.h>

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/debugfs.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>

/*
 * Interact with Ceph monitor cluster.  Handle requests for new map
 * versions, and periodically resend as needed.  Also implement
 * statfs() and umount().
 *
 * A small cluster of Ceph "monitors" are responsible for managing critical
 * cluster configuration and state information.  An odd number (e.g., 3, 5)
 * of ceph-mon daemons use a modified version of the Paxos part-time
 * parliament algorithm to manage the MDS map (mds cluster membership),
 * OSD map, and list of clients who have mounted the file system.
 *
 * We maintain an open, active session with a monitor at all times in order
 * to receive timely MDSMap updates.  We periodically send a keepalive byte
 * on the TCP socket to ensure we detect a failure.  If the connection does
 * break, we randomly hunt for a new monitor.  Once the connection is
 * reestablished, we resend any outstanding requests.
 */

static const struct ceph_connection_operations mon_con_ops;

static int __validate_auth(struct ceph_mon_client *monc);

/*
 * Decode a monmap blob (e.g., during mount).
 */
static struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
{
	struct ceph_monmap *m = NULL;
	int i, err = -EINVAL;
	struct ceph_fsid fsid;
	u32 epoch, num_mon;
	u32 len;

	ceph_decode_32_safe(&p, end, len, bad);
	ceph_decode_need(&p, end, len, bad);

	dout("monmap_decode %p %p len %d (%d)\n", p, end, len, (int)(end-p));
	p += sizeof(u16);  /* skip version */

	ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	epoch = ceph_decode_32(&p);

	num_mon = ceph_decode_32(&p);

	if (num_mon > CEPH_MAX_MON)
		goto bad;
	m = kmalloc(struct_size(m, mon_inst, num_mon), GFP_NOFS);
	if (m == NULL)
		return ERR_PTR(-ENOMEM);
	m->fsid = fsid;
	m->epoch = epoch;
	m->num_mon = num_mon;
	for (i = 0; i < num_mon; ++i) {
		struct ceph_entity_inst *inst = &m->mon_inst[i];

		/* copy name portion */
		ceph_decode_copy_safe(&p, end, &inst->name,
				      sizeof(inst->name), bad);
		err = ceph_decode_entity_addr(&p, end, &inst->addr);
		if (err)
			goto bad;
	}
	dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
	     m->num_mon);
	for (i = 0; i < m->num_mon; i++)
		dout("monmap_decode mon%d is %s\n", i,
		     ceph_pr_addr(&m->mon_inst[i].addr));
	return m;
bad:
	dout("monmap_decode failed with %d\n", err);
	kfree(m);
	return ERR_PTR(err);
}

/*
 * return true if *addr is included in the monmap.
 */
int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
{
	int i;

	for (i = 0; i < m->num_mon; i++)
		if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
			return 1;
	return 0;
}

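/*
 * For reference, the monmap blob as consumed by ceph_monmap_decode()
 * above is laid out roughly as follows (a sketch inferred from the
 * decode calls, not an authoritative wire-format spec):
 *
 *   __le32 len;                       length of the rest of the blob
 *   __le16 version;                   skipped
 *   struct ceph_fsid fsid;
 *   __le32 epoch;
 *   __le32 num_mon;
 *   num_mon x {
 *           struct ceph_entity_name name;
 *           entity_addr;              variable-length encoding
 *   }
 */
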
/*
 * Send an auth request.
 */
static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
{
	monc->pending_auth = 1;
	monc->m_auth->front.iov_len = len;
	monc->m_auth->hdr.front_len = cpu_to_le32(len);
	ceph_msg_revoke(monc->m_auth);
	ceph_msg_get(monc->m_auth);  /* keep our ref */
	ceph_con_send(&monc->con, monc->m_auth);
}

/*
 * Close monitor session, if any.
 */
static void __close_session(struct ceph_mon_client *monc)
{
	dout("__close_session closing mon%d\n", monc->cur_mon);
	ceph_msg_revoke(monc->m_auth);
	ceph_msg_revoke_incoming(monc->m_auth_reply);
	ceph_msg_revoke(monc->m_subscribe);
	ceph_msg_revoke_incoming(monc->m_subscribe_ack);
	ceph_con_close(&monc->con);

	monc->pending_auth = 0;
	ceph_auth_reset(monc->auth);
}

/*
 * Pick a new monitor at random and set cur_mon.  If we are repicking
 * (i.e. cur_mon is already set), be sure to pick a different one.
 */
static void pick_new_mon(struct ceph_mon_client *monc)
{
	int old_mon = monc->cur_mon;

	BUG_ON(monc->monmap->num_mon < 1);

	if (monc->monmap->num_mon == 1) {
		monc->cur_mon = 0;
	} else {
		int max = monc->monmap->num_mon;
		int o = -1;
		int n;

		if (monc->cur_mon >= 0) {
			if (monc->cur_mon < monc->monmap->num_mon)
				o = monc->cur_mon;
			if (o >= 0)
				max--;
		}

		n = prandom_u32() % max;
		if (o >= 0 && n >= o)
			n++;

		monc->cur_mon = n;
	}

	dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
	     monc->cur_mon, monc->monmap->num_mon);
}

/*
 * Open a session with a new monitor.
 */
static void __open_session(struct ceph_mon_client *monc)
{
	int ret;

	pick_new_mon(monc);

	monc->hunting = true;
	if (monc->had_a_connection) {
		monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
		if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
			monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
	}

	monc->sub_renew_after = jiffies;  /* i.e., expired */
	monc->sub_renew_sent = 0;

	dout("%s opening mon%d\n", __func__, monc->cur_mon);
	ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
		      &monc->monmap->mon_inst[monc->cur_mon].addr);

	/*
	 * send an initial keepalive to ensure our timestamp is valid
	 * by the time we are in an OPENED state
	 */
	ceph_con_keepalive(&monc->con);

	/* initiate authentication handshake */
	ret = ceph_auth_build_hello(monc->auth,
				    monc->m_auth->front.iov_base,
				    monc->m_auth->front_alloc_len);
	BUG_ON(ret <= 0);
	__send_prepared_auth_request(monc, ret);
}

static void reopen_session(struct ceph_mon_client *monc)
{
	if (!monc->hunting)
		pr_info("mon%d %s session lost, hunting for new mon\n",
			monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr));

	__close_session(monc);
	__open_session(monc);
}

void ceph_monc_reopen_session(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	reopen_session(monc);
	mutex_unlock(&monc->mutex);
}

static void un_backoff(struct ceph_mon_client *monc)
{
	monc->hunt_mult /= 2;  /* reduce by 50% */
	if (monc->hunt_mult < 1)
		monc->hunt_mult = 1;
	dout("%s hunt_mult now %d\n", __func__, monc->hunt_mult);
}

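/*
 * Illustrative hunt backoff timeline, assuming the defaults from
 * linux/ceph/mon_client.h (CEPH_MONC_HUNT_INTERVAL = 3s,
 * CEPH_MONC_HUNT_BACKOFF = 2, CEPH_MONC_HUNT_MAX_MULT = 10):
 *
 *   reopen #1:  hunt_mult = 2, next retry in  6s
 *   reopen #2:  hunt_mult = 4, next retry in 12s
 *   reopen #3:  hunt_mult = 8, next retry in 24s
 *   reopen #4+: hunt_mult capped at 10, retry every 30s
 *
 * Once a session is established, un_backoff() halves hunt_mult on each
 * successful ping interval until it is back down to 1.
 */
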
/*
 * Reschedule delayed work timer.
 */
static void __schedule_delayed(struct ceph_mon_client *monc)
{
	unsigned long delay;

	if (monc->hunting)
		delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
	else
		delay = CEPH_MONC_PING_INTERVAL;

	dout("__schedule_delayed after %lu\n", delay);
	mod_delayed_work(system_wq, &monc->delayed_work,
			 round_jiffies_relative(delay));
}

const char *ceph_sub_str[] = {
	[CEPH_SUB_MONMAP] = "monmap",
	[CEPH_SUB_OSDMAP] = "osdmap",
	[CEPH_SUB_FSMAP]  = "fsmap.user",
	[CEPH_SUB_MDSMAP] = "mdsmap",
};

/*
 * Send subscribe request for one or more maps, according to
 * monc->subs.
 */
static void __send_subscribe(struct ceph_mon_client *monc)
{
	struct ceph_msg *msg = monc->m_subscribe;
	void *p = msg->front.iov_base;
	void *const end = p + msg->front_alloc_len;
	int num = 0;
	int i;

	dout("%s sent %lu\n", __func__, monc->sub_renew_sent);

	BUG_ON(monc->cur_mon < 0);

	if (!monc->sub_renew_sent)
		monc->sub_renew_sent = jiffies | 1;  /* never 0 */

	msg->hdr.version = cpu_to_le16(2);

	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
		if (monc->subs[i].want)
			num++;
	}
	BUG_ON(num < 1);  /* monmap sub is always there */
	ceph_encode_32(&p, num);
	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
		char buf[32];
		int len;

		if (!monc->subs[i].want)
			continue;

		len = sprintf(buf, "%s", ceph_sub_str[i]);
		if (i == CEPH_SUB_MDSMAP &&
		    monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
			len += sprintf(buf + len, ".%d", monc->fs_cluster_id);

		dout("%s %s start %llu flags 0x%x\n", __func__, buf,
		     le64_to_cpu(monc->subs[i].item.start),
		     monc->subs[i].item.flags);
		ceph_encode_string(&p, end, buf, len);
		memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
		p += sizeof(monc->subs[i].item);
	}

	BUG_ON(p > end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
	ceph_msg_revoke(msg);
	ceph_con_send(&monc->con, ceph_msg_get(msg));
}

static void handle_subscribe_ack(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	unsigned int seconds;
	struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

	if (msg->front.iov_len < sizeof(*h))
		goto bad;
	seconds = le32_to_cpu(h->duration);

	mutex_lock(&monc->mutex);
	if (monc->sub_renew_sent) {
		/*
		 * This is only needed for legacy (infernalis or older)
		 * MONs -- see delayed_work().
		 */
		monc->sub_renew_after = monc->sub_renew_sent +
					(seconds >> 1) * HZ - 1;
		dout("%s sent %lu duration %d renew after %lu\n", __func__,
		     monc->sub_renew_sent, seconds, monc->sub_renew_after);
		monc->sub_renew_sent = 0;
	} else {
		dout("%s sent %lu renew after %lu, ignoring\n", __func__,
		     monc->sub_renew_sent, monc->sub_renew_after);
	}
	mutex_unlock(&monc->mutex);
	return;
bad:
	pr_err("got corrupt subscribe-ack msg\n");
	ceph_msg_dump(msg);
}

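/*
 * For reference, the subscribe payload built by __send_subscribe()
 * above looks roughly like this on the wire (a sketch derived from the
 * encode calls, not an authoritative wire-format spec):
 *
 *   __le32 num;                          number of subscriptions
 *   num x {
 *           __le32 len; char what[len];  e.g. "osdmap", "mdsmap.<id>"
 *           struct ceph_mon_subscribe_item {
 *                   __le64 start;        first epoch of interest
 *                   __u8 flags;          CEPH_SUBSCRIBE_ONETIME or 0
 *           } item;
 *   }
 */
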
/*
 * Register interest in a map
 *
 * @sub: one of CEPH_SUB_*
 * @epoch: X for "every map since X", or 0 for "just the latest"
 */
static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
				 u32 epoch, bool continuous)
{
	__le64 start = cpu_to_le64(epoch);
	u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;

	dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
	     epoch, continuous);

	if (monc->subs[sub].want &&
	    monc->subs[sub].item.start == start &&
	    monc->subs[sub].item.flags == flags)
		return false;

	monc->subs[sub].item.start = start;
	monc->subs[sub].item.flags = flags;
	monc->subs[sub].want = true;

	return true;
}

bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
			bool continuous)
{
	bool need_request;

	mutex_lock(&monc->mutex);
	need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
	mutex_unlock(&monc->mutex);

	return need_request;
}
EXPORT_SYMBOL(ceph_monc_want_map);

/*
 * Keep track of which maps we have
 *
 * @sub: one of CEPH_SUB_*
 */
static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
				u32 epoch)
{
	dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);

	if (monc->subs[sub].want) {
		if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
			monc->subs[sub].want = false;
		else
			monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
	}

	monc->subs[sub].have = epoch;
}

void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
{
	mutex_lock(&monc->mutex);
	__ceph_monc_got_map(monc, sub, epoch);
	mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_got_map);

void ceph_monc_renew_subs(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__send_subscribe(monc);
	mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_renew_subs);

/*
 * Wait for an osdmap with a given epoch.
 *
 * @epoch: epoch to wait for
 * @timeout: in jiffies, 0 means "wait forever"
 */
int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
			  unsigned long timeout)
{
	unsigned long started = jiffies;
	long ret;

	mutex_lock(&monc->mutex);
	while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
		mutex_unlock(&monc->mutex);

		if (timeout && time_after_eq(jiffies, started + timeout))
			return -ETIMEDOUT;

		ret = wait_event_interruptible_timeout(monc->client->auth_wq,
				monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
				ceph_timeout_jiffies(timeout));
		if (ret < 0)
			return ret;

		mutex_lock(&monc->mutex);
	}

	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_wait_osdmap);

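/*
 * Typical caller pattern for the subscription API above (an
 * illustrative sketch, not lifted from a specific caller): ask for
 * osdmap epochs starting at @epoch and wait for one to arrive.
 *
 *	if (ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, epoch, false))
 *		ceph_monc_renew_subs(monc);
 *	ret = ceph_monc_wait_osdmap(monc, epoch, timeout);
 */
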
/*
 * Open a session with a random monitor.  Request monmap and osdmap,
 * which are waited upon in __ceph_open_session().
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
	__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
	__open_session(monc);
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);

static void ceph_monc_handle_map(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	struct ceph_client *client = monc->client;
	struct ceph_monmap *monmap = NULL, *old = monc->monmap;
	void *p, *end;

	mutex_lock(&monc->mutex);

	dout("handle_monmap\n");
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	monmap = ceph_monmap_decode(p, end);
	if (IS_ERR(monmap)) {
		pr_err("problem decoding monmap, %d\n",
		       (int)PTR_ERR(monmap));
		ceph_msg_dump(msg);
		goto out;
	}

	if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
		kfree(monmap);
		goto out;
	}

	client->monc.monmap = monmap;
	kfree(old);

	__ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
	client->have_fsid = true;

out:
	mutex_unlock(&monc->mutex);
	wake_up_all(&client->auth_wq);
}

/*
 * generic requests (currently statfs, mon_get_version)
 */
DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)

static void release_generic_request(struct kref *kref)
{
	struct ceph_mon_generic_request *req =
		container_of(kref, struct ceph_mon_generic_request, kref);

	dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
	     req->reply);
	WARN_ON(!RB_EMPTY_NODE(&req->node));

	if (req->reply)
		ceph_msg_put(req->reply);
	if (req->request)
		ceph_msg_put(req->request);

	kfree(req);
}

static void put_generic_request(struct ceph_mon_generic_request *req)
{
	if (req)
		kref_put(&req->kref, release_generic_request);
}

static void get_generic_request(struct ceph_mon_generic_request *req)
{
	kref_get(&req->kref);
}

static struct ceph_mon_generic_request *
alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
{
	struct ceph_mon_generic_request *req;

	req = kzalloc(sizeof(*req), gfp);
	if (!req)
		return NULL;

	req->monc = monc;
	kref_init(&req->kref);
	RB_CLEAR_NODE(&req->node);
	init_completion(&req->completion);

	dout("%s greq %p\n", __func__, req);
	return req;
}

static void register_generic_request(struct ceph_mon_generic_request *req)
{
	struct ceph_mon_client *monc = req->monc;

	WARN_ON(req->tid);

	get_generic_request(req);
	req->tid = ++monc->last_tid;
	insert_generic_request(&monc->generic_request_tree, req);
}

static void send_generic_request(struct ceph_mon_client *monc,
				 struct ceph_mon_generic_request *req)
{
	WARN_ON(!req->tid);

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
	req->request->hdr.tid = cpu_to_le64(req->tid);
	ceph_con_send(&monc->con, ceph_msg_get(req->request));
}

static void __finish_generic_request(struct ceph_mon_generic_request *req)
{
	struct ceph_mon_client *monc = req->monc;

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
	erase_generic_request(&monc->generic_request_tree, req);

	ceph_msg_revoke(req->request);
	ceph_msg_revoke_incoming(req->reply);
}

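/*
 * Life cycle of a generic request, as implemented by the helpers above
 * and below:
 *
 *   alloc_generic_request()       refcount 1, no tid yet
 *   register_generic_request()    assign tid, insert into tree (+1 ref)
 *   send_generic_request()        queue the request message
 *   ... reply arrives ...
 *   __finish_generic_request()    erase from tree, revoke messages
 *   complete_generic_request()    run callback or completion (-1 ref)
 *
 * A waiter that is interrupted drops the tree's ref through
 * cancel_generic_request() instead.
 */
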
static void finish_generic_request(struct ceph_mon_generic_request *req)
{
	__finish_generic_request(req);
	put_generic_request(req);
}

static void complete_generic_request(struct ceph_mon_generic_request *req)
{
	if (req->complete_cb)
		req->complete_cb(req);
	else
		complete_all(&req->completion);
	put_generic_request(req);
}

static void cancel_generic_request(struct ceph_mon_generic_request *req)
{
	struct ceph_mon_client *monc = req->monc;
	struct ceph_mon_generic_request *lookup_req;

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);

	mutex_lock(&monc->mutex);
	lookup_req = lookup_generic_request(&monc->generic_request_tree,
					    req->tid);
	if (lookup_req) {
		WARN_ON(lookup_req != req);
		finish_generic_request(req);
	}

	mutex_unlock(&monc->mutex);
}

static int wait_generic_request(struct ceph_mon_generic_request *req)
{
	int ret;

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
	ret = wait_for_completion_interruptible(&req->completion);
	if (ret)
		cancel_generic_request(req);
	else
		ret = req->result;  /* completed */

	return ret;
}

static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
					  struct ceph_msg_header *hdr,
					  int *skip)
{
	struct ceph_mon_client *monc = con->private;
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(hdr->tid);
	struct ceph_msg *m;

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, tid);
	if (!req) {
		dout("get_generic_reply %lld dne\n", tid);
		*skip = 1;
		m = NULL;
	} else {
		dout("get_generic_reply %lld got %p\n", tid, req->reply);
		*skip = 0;
		m = ceph_msg_get(req->reply);
		/*
		 * we don't need to track the connection reading into
		 * this reply because we only have one open connection
		 * at a time, ever.
		 */
	}
	mutex_unlock(&monc->mutex);
	return m;
}

/*
 * statfs
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
				struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	if (msg->front.iov_len != sizeof(*reply))
		goto bad;

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, tid);
	if (!req) {
		mutex_unlock(&monc->mutex);
		return;
	}

	req->result = 0;
	*req->u.st = reply->st;  /* struct */
	__finish_generic_request(req);
	mutex_unlock(&monc->mutex);

	complete_generic_request(req);
	return;

bad:
	pr_err("corrupt statfs reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

/*
 * Do a synchronous statfs().
 */
int ceph_monc_do_statfs(struct ceph_mon_client *monc, u64 data_pool,
			struct ceph_statfs *buf)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs *h;
	int ret = -ENOMEM;

	req = alloc_generic_request(monc, GFP_NOFS);
	if (!req)
		goto out;

	req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
				    true);
	if (!req->request)
		goto out;

	req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
	if (!req->reply)
		goto out;

	req->u.st = buf;
	req->request->hdr.version = cpu_to_le16(2);

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	/* fill out request */
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;
	h->contains_data_pool = (data_pool != CEPH_NOPOOL);
	h->data_pool = cpu_to_le64(data_pool);
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	ret = wait_generic_request(req);
out:
	put_generic_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_do_statfs);

static void handle_get_version_reply(struct ceph_mon_client *monc,
				     struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	void *p = msg->front.iov_base;
	void *end = p + msg->front_alloc_len;
	u64 handle;

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	ceph_decode_need(&p, end, 2*sizeof(u64), bad);
	handle = ceph_decode_64(&p);
	if (tid != 0 && tid != handle)
		goto bad;

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, handle);
	if (!req) {
		mutex_unlock(&monc->mutex);
		return;
	}

	req->result = 0;
	req->u.newest = ceph_decode_64(&p);
	__finish_generic_request(req);
	mutex_unlock(&monc->mutex);

	complete_generic_request(req);
	return;

bad:
	pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

static struct ceph_mon_generic_request *
__ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
			ceph_monc_callback_t cb, u64 private_data)
{
	struct ceph_mon_generic_request *req;

	req = alloc_generic_request(monc, GFP_NOIO);
	if (!req)
		goto err_put_req;

	req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
				    sizeof(u64) + sizeof(u32) + strlen(what),
				    GFP_NOIO, true);
	if (!req->request)
		goto err_put_req;

	req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
				  true);
	if (!req->reply)
		goto err_put_req;

	req->complete_cb = cb;
	req->private_data = private_data;

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	{
		void *p = req->request->front.iov_base;
		void *const end = p + req->request->front_alloc_len;

		ceph_encode_64(&p, req->tid);  /* handle */
		ceph_encode_string(&p, end, what, strlen(what));
		WARN_ON(p != end);
	}
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	return req;

err_put_req:
	put_generic_request(req);
	return ERR_PTR(-ENOMEM);
}

/*
 * Send MMonGetVersion and wait for the reply.
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 */
int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
			  u64 *newest)
{
	struct ceph_mon_generic_request *req;
	int ret;

	req = __ceph_monc_get_version(monc, what, NULL, 0);
	if (IS_ERR(req))
		return PTR_ERR(req);

	ret = wait_generic_request(req);
	if (!ret)
		*newest = req->u.newest;

	put_generic_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_get_version);

/*
 * Send MMonGetVersion without waiting for the reply.
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 */
int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
				ceph_monc_callback_t cb, u64 private_data)
{
	struct ceph_mon_generic_request *req;

	req = __ceph_monc_get_version(monc, what, cb, private_data);
	if (IS_ERR(req))
		return PTR_ERR(req);

	put_generic_request(req);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_get_version_async);

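/*
 * Example use of the synchronous variant (an illustrative sketch):
 * query the newest osdmap epoch without subscribing to osdmap updates.
 *
 *	u64 newest;
 *	int ret = ceph_monc_get_version(monc, "osdmap", &newest);
 *
 *	if (!ret)
 *		dout("cluster is at osdmap epoch %llu\n", newest);
 */
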
static void handle_command_ack(struct ceph_mon_client *monc,
			       struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	void *p = msg->front.iov_base;
	void *const end = p + msg->front_alloc_len;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
			 sizeof(u32), bad);
	p += sizeof(struct ceph_mon_request_header);

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, tid);
	if (!req) {
		mutex_unlock(&monc->mutex);
		return;
	}

	req->result = ceph_decode_32(&p);
	__finish_generic_request(req);
	mutex_unlock(&monc->mutex);

	complete_generic_request(req);
	return;

bad:
	pr_err("corrupt mon_command ack, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
			    struct ceph_entity_addr *client_addr)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_command *h;
	int ret = -ENOMEM;
	int len;

	req = alloc_generic_request(monc, GFP_NOIO);
	if (!req)
		goto out;

	req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
	if (!req->request)
		goto out;

	req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
				  true);
	if (!req->reply)
		goto out;

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;
	h->num_strs = cpu_to_le32(1);
	len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \
		\"blacklistop\": \"add\", \
		\"addr\": \"%pISpc/%u\" }",
		&client_addr->in_addr, le32_to_cpu(client_addr->nonce));
	h->str_len = cpu_to_le32(len);
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	ret = wait_generic_request(req);
	if (!ret)
		/*
		 * Make sure we have the osdmap that includes the blacklist
		 * entry.  This is needed to ensure that the OSDs pick up the
		 * new blacklist before processing any future requests from
		 * this client.
		 */
		ret = ceph_wait_for_latest_osdmap(monc->client, 0);

out:
	put_generic_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_blacklist_add);

/*
 * Resend pending generic requests.
 */
static void __resend_generic_request(struct ceph_mon_client *monc)
{
	struct ceph_mon_generic_request *req;
	struct rb_node *p;

	for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_mon_generic_request, node);
		ceph_msg_revoke(req->request);
		ceph_msg_revoke_incoming(req->reply);
		ceph_con_send(&monc->con, ceph_msg_get(req->request));
	}
}

/*
 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
 * renew/retry subscription as needed (in case it is timing out, or we
 * got an ENOMEM).  And keep the monitor connection alive.
 */
static void delayed_work(struct work_struct *work)
{
	struct ceph_mon_client *monc =
		container_of(work, struct ceph_mon_client, delayed_work.work);

	dout("monc delayed_work\n");
	mutex_lock(&monc->mutex);
	if (monc->hunting) {
		dout("%s continuing hunt\n", __func__);
		reopen_session(monc);
	} else {
		int is_auth = ceph_auth_is_authenticated(monc->auth);

		if (ceph_con_keepalive_expired(&monc->con,
					       CEPH_MONC_PING_TIMEOUT)) {
			dout("monc keepalive timeout\n");
			is_auth = 0;
			reopen_session(monc);
		}

		if (!monc->hunting) {
			ceph_con_keepalive(&monc->con);
			__validate_auth(monc);
			un_backoff(monc);
		}

		if (is_auth &&
		    !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
			unsigned long now = jiffies;

			dout("%s renew subs? now %lu renew after %lu\n",
			     __func__, now, monc->sub_renew_after);
			if (time_after_eq(now, monc->sub_renew_after))
				__send_subscribe(monc);
		}
	}
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
}

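/*
 * Timing note for the legacy renew path in delayed_work() above: for
 * pre-STATEFUL_SUB monitors a subscription lapses after the duration
 * granted in the subscribe ack.  handle_subscribe_ack() arms
 * sub_renew_after at half that duration, and since delayed_work() runs
 * every CEPH_MONC_PING_INTERVAL while connected, a renewal goes out at
 * most one ping interval after that halfway point.
 */
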
/*
 * On startup, we build a temporary monmap populated with the IPs
 * provided by mount(2).
 */
static int build_initial_monmap(struct ceph_mon_client *monc)
{
	struct ceph_options *opt = monc->client->options;
	struct ceph_entity_addr *mon_addr = opt->mon_addr;
	int num_mon = opt->num_mon;
	int i;

	/* build initial monmap */
	monc->monmap = kzalloc(struct_size(monc->monmap, mon_inst, num_mon),
			       GFP_KERNEL);
	if (!monc->monmap)
		return -ENOMEM;
	for (i = 0; i < num_mon; i++) {
		monc->monmap->mon_inst[i].addr = mon_addr[i];
		monc->monmap->mon_inst[i].addr.nonce = 0;
		monc->monmap->mon_inst[i].name.type =
			CEPH_ENTITY_TYPE_MON;
		monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
	}
	monc->monmap->num_mon = num_mon;
	return 0;
}

int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
	int err = 0;

	dout("init\n");
	memset(monc, 0, sizeof(*monc));
	monc->client = cl;
	monc->monmap = NULL;
	mutex_init(&monc->mutex);

	err = build_initial_monmap(monc);
	if (err)
		goto out;

	/* connection */
	/* authentication */
	monc->auth = ceph_auth_init(cl->options->name,
				    cl->options->key);
	if (IS_ERR(monc->auth)) {
		err = PTR_ERR(monc->auth);
		goto out_monmap;
	}
	monc->auth->want_keys =
		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
		CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;

	/* msgs */
	err = -ENOMEM;
	monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
				     sizeof(struct ceph_mon_subscribe_ack),
				     GFP_KERNEL, true);
	if (!monc->m_subscribe_ack)
		goto out_auth;

	monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
					 GFP_KERNEL, true);
	if (!monc->m_subscribe)
		goto out_subscribe_ack;

	monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
					  GFP_KERNEL, true);
	if (!monc->m_auth_reply)
		goto out_subscribe;

	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
	monc->pending_auth = 0;
	if (!monc->m_auth)
		goto out_auth_reply;

	ceph_con_init(&monc->con, monc, &mon_con_ops,
		      &monc->client->msgr);

	monc->cur_mon = -1;
	monc->had_a_connection = false;
	monc->hunt_mult = 1;

	INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
	monc->generic_request_tree = RB_ROOT;
	monc->last_tid = 0;

	monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;

	return 0;

out_auth_reply:
	ceph_msg_put(monc->m_auth_reply);
out_subscribe:
	ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
	ceph_msg_put(monc->m_subscribe_ack);
out_auth:
	ceph_auth_destroy(monc->auth);
out_monmap:
	kfree(monc->monmap);
out:
	return err;
}
EXPORT_SYMBOL(ceph_monc_init);

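/*
 * Typical mon_client life cycle as driven by libceph (an illustrative
 * sketch of the call order, not a complete example):
 *
 *	err = ceph_monc_init(&client->monc, client);
 *	...
 *	err = ceph_monc_open_session(&client->monc);
 *	... maps arrive, client I/O proceeds ...
 *	ceph_monc_stop(&client->monc);
 */
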
void ceph_monc_stop(struct ceph_mon_client *monc)
{
	dout("stop\n");
	cancel_delayed_work_sync(&monc->delayed_work);

	mutex_lock(&monc->mutex);
	__close_session(monc);
	monc->cur_mon = -1;
	mutex_unlock(&monc->mutex);

	/*
	 * flush msgr queue before we destroy ourselves to ensure that:
	 *  - any work that references our embedded con is finished.
	 *  - any osd_client or other work that may reference an
	 *    authorizer finishes before we shut down the auth subsystem.
	 */
	ceph_msgr_flush();

	ceph_auth_destroy(monc->auth);

	WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));

	ceph_msg_put(monc->m_auth);
	ceph_msg_put(monc->m_auth_reply);
	ceph_msg_put(monc->m_subscribe);
	ceph_msg_put(monc->m_subscribe_ack);

	kfree(monc->monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);

static void finish_hunting(struct ceph_mon_client *monc)
{
	if (monc->hunting) {
		dout("%s found mon%d\n", __func__, monc->cur_mon);
		monc->hunting = false;
		monc->had_a_connection = true;
		un_backoff(monc);
		__schedule_delayed(monc);
	}
}

static void handle_auth_reply(struct ceph_mon_client *monc,
			      struct ceph_msg *msg)
{
	int ret;
	int was_auth = 0;

	mutex_lock(&monc->mutex);
	was_auth = ceph_auth_is_authenticated(monc->auth);
	monc->pending_auth = 0;
	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
				     msg->front.iov_len,
				     monc->m_auth->front.iov_base,
				     monc->m_auth->front_alloc_len);
	if (ret > 0) {
		__send_prepared_auth_request(monc, ret);
		goto out;
	}

	finish_hunting(monc);

	if (ret < 0) {
		monc->client->auth_err = ret;
	} else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
		dout("authenticated, starting session\n");

		monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
		monc->client->msgr.inst.name.num =
					cpu_to_le64(monc->auth->global_id);

		__send_subscribe(monc);
		__resend_generic_request(monc);

		pr_info("mon%d %s session established\n", monc->cur_mon,
			ceph_pr_addr(&monc->con.peer_addr));
	}

out:
	mutex_unlock(&monc->mutex);
	if (monc->client->auth_err < 0)
		wake_up_all(&monc->client->auth_wq);
}

static int __validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	if (monc->pending_auth)
		return 0;

	ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
			      monc->m_auth->front_alloc_len);
	if (ret <= 0)
		return ret;  /* either an error, or no need to authenticate */
	__send_prepared_auth_request(monc, ret);
	return 0;
}

int ceph_monc_validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	mutex_lock(&monc->mutex);
	ret = __validate_auth(monc);
	mutex_unlock(&monc->mutex);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_validate_auth);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(msg->hdr.type);

	if (!monc)
		return;

	switch (type) {
	case CEPH_MSG_AUTH_REPLY:
		handle_auth_reply(monc, msg);
		break;

	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		handle_subscribe_ack(monc, msg);
		break;

	case CEPH_MSG_STATFS_REPLY:
		handle_statfs_reply(monc, msg);
		break;

	case CEPH_MSG_MON_GET_VERSION_REPLY:
		handle_get_version_reply(monc, msg);
		break;

	case CEPH_MSG_MON_COMMAND_ACK:
		handle_command_ack(monc, msg);
		break;

	case CEPH_MSG_MON_MAP:
		ceph_monc_handle_map(monc, msg);
		break;

	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(&monc->client->osdc, msg);
		break;

	default:
		/* can the chained handler handle it? */
		if (monc->client->extra_mon_dispatch &&
		    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
			break;

		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
	ceph_msg_put(msg);
}

/*
 * Allocate memory for incoming message
 */
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
				      struct ceph_msg_header *hdr,
				      int *skip)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(hdr->type);
	int front_len = le32_to_cpu(hdr->front_len);
	struct ceph_msg *m = NULL;

	*skip = 0;

	switch (type) {
	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		m = ceph_msg_get(monc->m_subscribe_ack);
		break;
	case CEPH_MSG_STATFS_REPLY:
	case CEPH_MSG_MON_COMMAND_ACK:
		return get_generic_reply(con, hdr, skip);
	case CEPH_MSG_AUTH_REPLY:
		m = ceph_msg_get(monc->m_auth_reply);
		break;
	case CEPH_MSG_MON_GET_VERSION_REPLY:
		if (le64_to_cpu(hdr->tid) != 0)
			return get_generic_reply(con, hdr, skip);

		/*
		 * Older OSDs don't set reply tid even if the original
		 * request had a non-zero tid.  Work around this weirdness
		 * by allocating a new message.
		 */
		/* fall through */
	case CEPH_MSG_MON_MAP:
	case CEPH_MSG_MDS_MAP:
	case CEPH_MSG_OSD_MAP:
	case CEPH_MSG_FS_MAP_USER:
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
		if (!m)
			return NULL;  /* ENOMEM--return skip == 0 */
		break;
	}

	if (!m) {
		pr_info("alloc_msg unknown type %d\n", type);
		*skip = 1;
	} else if (front_len > m->front_alloc_len) {
		pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
			front_len, m->front_alloc_len,
			(unsigned int)con->peer_name.type,
			le64_to_cpu(con->peer_name.num));
		ceph_msg_put(m);
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
	}

	return m;
}

/*
 * If the monitor connection resets, pick a new monitor and resubmit
 * any pending requests.
 */
static void mon_fault(struct ceph_connection *con)
{
	struct ceph_mon_client *monc = con->private;

	mutex_lock(&monc->mutex);
	dout("%s mon%d\n", __func__, monc->cur_mon);
	if (monc->cur_mon >= 0) {
		if (!monc->hunting) {
			dout("%s hunting for new mon\n", __func__);
			reopen_session(monc);
			__schedule_delayed(monc);
		} else {
			dout("%s already hunting\n", __func__);
		}
	}
	mutex_unlock(&monc->mutex);
}

/*
 * We can ignore refcounting on the connection struct, as all references
 * will come from the messenger workqueue, which is drained prior to
 * mon_client destruction.
 */
static struct ceph_connection *con_get(struct ceph_connection *con)
{
	return con;
}

static void con_put(struct ceph_connection *con)
{
}

static const struct ceph_connection_operations mon_con_ops = {
	.get = con_get,
	.put = con_put,
	.dispatch = dispatch,
	.fault = mon_fault,
	.alloc_msg = mon_alloc_msg,
};