1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/module.h> 4 #include <linux/types.h> 5 #include <linux/slab.h> 6 #include <linux/random.h> 7 #include <linux/sched.h> 8 9 #include <linux/ceph/ceph_features.h> 10 #include <linux/ceph/mon_client.h> 11 #include <linux/ceph/libceph.h> 12 #include <linux/ceph/debugfs.h> 13 #include <linux/ceph/decode.h> 14 #include <linux/ceph/auth.h> 15 16 /* 17 * Interact with Ceph monitor cluster. Handle requests for new map 18 * versions, and periodically resend as needed. Also implement 19 * statfs() and umount(). 20 * 21 * A small cluster of Ceph "monitors" are responsible for managing critical 22 * cluster configuration and state information. An odd number (e.g., 3, 5) 23 * of cmon daemons use a modified version of the Paxos part-time parliament 24 * algorithm to manage the MDS map (mds cluster membership), OSD map, and 25 * list of clients who have mounted the file system. 26 * 27 * We maintain an open, active session with a monitor at all times in order to 28 * receive timely MDSMap updates. We periodically send a keepalive byte on the 29 * TCP socket to ensure we detect a failure. If the connection does break, we 30 * randomly hunt for a new monitor. Once the connection is reestablished, we 31 * resend any outstanding requests. 32 */ 33 34 static const struct ceph_connection_operations mon_con_ops; 35 36 static int __validate_auth(struct ceph_mon_client *monc); 37 38 /* 39 * Decode a monmap blob (e.g., during mount). 40 */ 41 struct ceph_monmap *ceph_monmap_decode(void *p, void *end) 42 { 43 struct ceph_monmap *m = NULL; 44 int i, err = -EINVAL; 45 struct ceph_fsid fsid; 46 u32 epoch, num_mon; 47 u32 len; 48 49 ceph_decode_32_safe(&p, end, len, bad); 50 ceph_decode_need(&p, end, len, bad); 51 52 dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p)); 53 p += sizeof(u16); /* skip version */ 54 55 ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad); 56 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 57 epoch = ceph_decode_32(&p); 58 59 num_mon = ceph_decode_32(&p); 60 ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad); 61 62 if (num_mon >= CEPH_MAX_MON) 63 goto bad; 64 m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS); 65 if (m == NULL) 66 return ERR_PTR(-ENOMEM); 67 m->fsid = fsid; 68 m->epoch = epoch; 69 m->num_mon = num_mon; 70 ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0])); 71 for (i = 0; i < num_mon; i++) 72 ceph_decode_addr(&m->mon_inst[i].addr); 73 74 dout("monmap_decode epoch %d, num_mon %d\n", m->epoch, 75 m->num_mon); 76 for (i = 0; i < m->num_mon; i++) 77 dout("monmap_decode mon%d is %s\n", i, 78 ceph_pr_addr(&m->mon_inst[i].addr.in_addr)); 79 return m; 80 81 bad: 82 dout("monmap_decode failed with %d\n", err); 83 kfree(m); 84 return ERR_PTR(err); 85 } 86 87 /* 88 * return true if *addr is included in the monmap. 89 */ 90 int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr) 91 { 92 int i; 93 94 for (i = 0; i < m->num_mon; i++) 95 if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0) 96 return 1; 97 return 0; 98 } 99 100 /* 101 * Send an auth request. 102 */ 103 static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) 104 { 105 monc->pending_auth = 1; 106 monc->m_auth->front.iov_len = len; 107 monc->m_auth->hdr.front_len = cpu_to_le32(len); 108 ceph_msg_revoke(monc->m_auth); 109 ceph_msg_get(monc->m_auth); /* keep our ref */ 110 ceph_con_send(&monc->con, monc->m_auth); 111 } 112 113 /* 114 * Close monitor session, if any. 115 */ 116 static void __close_session(struct ceph_mon_client *monc) 117 { 118 dout("__close_session closing mon%d\n", monc->cur_mon); 119 ceph_msg_revoke(monc->m_auth); 120 ceph_msg_revoke_incoming(monc->m_auth_reply); 121 ceph_msg_revoke(monc->m_subscribe); 122 ceph_msg_revoke_incoming(monc->m_subscribe_ack); 123 ceph_con_close(&monc->con); 124 125 monc->pending_auth = 0; 126 ceph_auth_reset(monc->auth); 127 } 128 129 /* 130 * Pick a new monitor at random and set cur_mon. If we are repicking 131 * (i.e. cur_mon is already set), be sure to pick a different one. 132 */ 133 static void pick_new_mon(struct ceph_mon_client *monc) 134 { 135 int old_mon = monc->cur_mon; 136 137 BUG_ON(monc->monmap->num_mon < 1); 138 139 if (monc->monmap->num_mon == 1) { 140 monc->cur_mon = 0; 141 } else { 142 int max = monc->monmap->num_mon; 143 int o = -1; 144 int n; 145 146 if (monc->cur_mon >= 0) { 147 if (monc->cur_mon < monc->monmap->num_mon) 148 o = monc->cur_mon; 149 if (o >= 0) 150 max--; 151 } 152 153 n = prandom_u32() % max; 154 if (o >= 0 && n >= o) 155 n++; 156 157 monc->cur_mon = n; 158 } 159 160 dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon, 161 monc->cur_mon, monc->monmap->num_mon); 162 } 163 164 /* 165 * Open a session with a new monitor. 166 */ 167 static void __open_session(struct ceph_mon_client *monc) 168 { 169 int ret; 170 171 pick_new_mon(monc); 172 173 monc->hunting = true; 174 if (monc->had_a_connection) { 175 monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF; 176 if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT) 177 monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT; 178 } 179 180 monc->sub_renew_after = jiffies; /* i.e., expired */ 181 monc->sub_renew_sent = 0; 182 183 dout("%s opening mon%d\n", __func__, monc->cur_mon); 184 ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon, 185 &monc->monmap->mon_inst[monc->cur_mon].addr); 186 187 /* 188 * send an initial keepalive to ensure our timestamp is valid 189 * by the time we are in an OPENED state 190 */ 191 ceph_con_keepalive(&monc->con); 192 193 /* initiate authentication handshake */ 194 ret = ceph_auth_build_hello(monc->auth, 195 monc->m_auth->front.iov_base, 196 monc->m_auth->front_alloc_len); 197 BUG_ON(ret <= 0); 198 __send_prepared_auth_request(monc, ret); 199 } 200 201 static void reopen_session(struct ceph_mon_client *monc) 202 { 203 if (!monc->hunting) 204 pr_info("mon%d %s session lost, hunting for new mon\n", 205 monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr.in_addr)); 206 207 __close_session(monc); 208 __open_session(monc); 209 } 210 211 /* 212 * Reschedule delayed work timer. 213 */ 214 static void __schedule_delayed(struct ceph_mon_client *monc) 215 { 216 unsigned long delay; 217 218 if (monc->hunting) 219 delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult; 220 else 221 delay = CEPH_MONC_PING_INTERVAL; 222 223 dout("__schedule_delayed after %lu\n", delay); 224 mod_delayed_work(system_wq, &monc->delayed_work, 225 round_jiffies_relative(delay)); 226 } 227 228 const char *ceph_sub_str[] = { 229 [CEPH_SUB_MONMAP] = "monmap", 230 [CEPH_SUB_OSDMAP] = "osdmap", 231 [CEPH_SUB_FSMAP] = "fsmap.user", 232 [CEPH_SUB_MDSMAP] = "mdsmap", 233 }; 234 235 /* 236 * Send subscribe request for one or more maps, according to 237 * monc->subs. 238 */ 239 static void __send_subscribe(struct ceph_mon_client *monc) 240 { 241 struct ceph_msg *msg = monc->m_subscribe; 242 void *p = msg->front.iov_base; 243 void *const end = p + msg->front_alloc_len; 244 int num = 0; 245 int i; 246 247 dout("%s sent %lu\n", __func__, monc->sub_renew_sent); 248 249 BUG_ON(monc->cur_mon < 0); 250 251 if (!monc->sub_renew_sent) 252 monc->sub_renew_sent = jiffies | 1; /* never 0 */ 253 254 msg->hdr.version = cpu_to_le16(2); 255 256 for (i = 0; i < ARRAY_SIZE(monc->subs); i++) { 257 if (monc->subs[i].want) 258 num++; 259 } 260 BUG_ON(num < 1); /* monmap sub is always there */ 261 ceph_encode_32(&p, num); 262 for (i = 0; i < ARRAY_SIZE(monc->subs); i++) { 263 char buf[32]; 264 int len; 265 266 if (!monc->subs[i].want) 267 continue; 268 269 len = sprintf(buf, "%s", ceph_sub_str[i]); 270 if (i == CEPH_SUB_MDSMAP && 271 monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE) 272 len += sprintf(buf + len, ".%d", monc->fs_cluster_id); 273 274 dout("%s %s start %llu flags 0x%x\n", __func__, buf, 275 le64_to_cpu(monc->subs[i].item.start), 276 monc->subs[i].item.flags); 277 ceph_encode_string(&p, end, buf, len); 278 memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item)); 279 p += sizeof(monc->subs[i].item); 280 } 281 282 BUG_ON(p > end); 283 msg->front.iov_len = p - msg->front.iov_base; 284 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 285 ceph_msg_revoke(msg); 286 ceph_con_send(&monc->con, ceph_msg_get(msg)); 287 } 288 289 static void handle_subscribe_ack(struct ceph_mon_client *monc, 290 struct ceph_msg *msg) 291 { 292 unsigned int seconds; 293 struct ceph_mon_subscribe_ack *h = msg->front.iov_base; 294 295 if (msg->front.iov_len < sizeof(*h)) 296 goto bad; 297 seconds = le32_to_cpu(h->duration); 298 299 mutex_lock(&monc->mutex); 300 if (monc->sub_renew_sent) { 301 /* 302 * This is only needed for legacy (infernalis or older) 303 * MONs -- see delayed_work(). 304 */ 305 monc->sub_renew_after = monc->sub_renew_sent + 306 (seconds >> 1) * HZ - 1; 307 dout("%s sent %lu duration %d renew after %lu\n", __func__, 308 monc->sub_renew_sent, seconds, monc->sub_renew_after); 309 monc->sub_renew_sent = 0; 310 } else { 311 dout("%s sent %lu renew after %lu, ignoring\n", __func__, 312 monc->sub_renew_sent, monc->sub_renew_after); 313 } 314 mutex_unlock(&monc->mutex); 315 return; 316 bad: 317 pr_err("got corrupt subscribe-ack msg\n"); 318 ceph_msg_dump(msg); 319 } 320 321 /* 322 * Register interest in a map 323 * 324 * @sub: one of CEPH_SUB_* 325 * @epoch: X for "every map since X", or 0 for "just the latest" 326 */ 327 static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub, 328 u32 epoch, bool continuous) 329 { 330 __le64 start = cpu_to_le64(epoch); 331 u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0; 332 333 dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub], 334 epoch, continuous); 335 336 if (monc->subs[sub].want && 337 monc->subs[sub].item.start == start && 338 monc->subs[sub].item.flags == flags) 339 return false; 340 341 monc->subs[sub].item.start = start; 342 monc->subs[sub].item.flags = flags; 343 monc->subs[sub].want = true; 344 345 return true; 346 } 347 348 bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch, 349 bool continuous) 350 { 351 bool need_request; 352 353 mutex_lock(&monc->mutex); 354 need_request = __ceph_monc_want_map(monc, sub, epoch, continuous); 355 mutex_unlock(&monc->mutex); 356 357 return need_request; 358 } 359 EXPORT_SYMBOL(ceph_monc_want_map); 360 361 /* 362 * Keep track of which maps we have 363 * 364 * @sub: one of CEPH_SUB_* 365 */ 366 static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub, 367 u32 epoch) 368 { 369 dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch); 370 371 if (monc->subs[sub].want) { 372 if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME) 373 monc->subs[sub].want = false; 374 else 375 monc->subs[sub].item.start = cpu_to_le64(epoch + 1); 376 } 377 378 monc->subs[sub].have = epoch; 379 } 380 381 void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch) 382 { 383 mutex_lock(&monc->mutex); 384 __ceph_monc_got_map(monc, sub, epoch); 385 mutex_unlock(&monc->mutex); 386 } 387 EXPORT_SYMBOL(ceph_monc_got_map); 388 389 void ceph_monc_renew_subs(struct ceph_mon_client *monc) 390 { 391 mutex_lock(&monc->mutex); 392 __send_subscribe(monc); 393 mutex_unlock(&monc->mutex); 394 } 395 EXPORT_SYMBOL(ceph_monc_renew_subs); 396 397 /* 398 * Wait for an osdmap with a given epoch. 399 * 400 * @epoch: epoch to wait for 401 * @timeout: in jiffies, 0 means "wait forever" 402 */ 403 int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, 404 unsigned long timeout) 405 { 406 unsigned long started = jiffies; 407 long ret; 408 409 mutex_lock(&monc->mutex); 410 while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) { 411 mutex_unlock(&monc->mutex); 412 413 if (timeout && time_after_eq(jiffies, started + timeout)) 414 return -ETIMEDOUT; 415 416 ret = wait_event_interruptible_timeout(monc->client->auth_wq, 417 monc->subs[CEPH_SUB_OSDMAP].have >= epoch, 418 ceph_timeout_jiffies(timeout)); 419 if (ret < 0) 420 return ret; 421 422 mutex_lock(&monc->mutex); 423 } 424 425 mutex_unlock(&monc->mutex); 426 return 0; 427 } 428 EXPORT_SYMBOL(ceph_monc_wait_osdmap); 429 430 /* 431 * Open a session with a random monitor. Request monmap and osdmap, 432 * which are waited upon in __ceph_open_session(). 433 */ 434 int ceph_monc_open_session(struct ceph_mon_client *monc) 435 { 436 mutex_lock(&monc->mutex); 437 __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true); 438 __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false); 439 __open_session(monc); 440 __schedule_delayed(monc); 441 mutex_unlock(&monc->mutex); 442 return 0; 443 } 444 EXPORT_SYMBOL(ceph_monc_open_session); 445 446 static void ceph_monc_handle_map(struct ceph_mon_client *monc, 447 struct ceph_msg *msg) 448 { 449 struct ceph_client *client = monc->client; 450 struct ceph_monmap *monmap = NULL, *old = monc->monmap; 451 void *p, *end; 452 453 mutex_lock(&monc->mutex); 454 455 dout("handle_monmap\n"); 456 p = msg->front.iov_base; 457 end = p + msg->front.iov_len; 458 459 monmap = ceph_monmap_decode(p, end); 460 if (IS_ERR(monmap)) { 461 pr_err("problem decoding monmap, %d\n", 462 (int)PTR_ERR(monmap)); 463 goto out; 464 } 465 466 if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) { 467 kfree(monmap); 468 goto out; 469 } 470 471 client->monc.monmap = monmap; 472 kfree(old); 473 474 __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch); 475 client->have_fsid = true; 476 477 out: 478 mutex_unlock(&monc->mutex); 479 wake_up_all(&client->auth_wq); 480 } 481 482 /* 483 * generic requests (currently statfs, mon_get_version) 484 */ 485 DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node) 486 487 static void release_generic_request(struct kref *kref) 488 { 489 struct ceph_mon_generic_request *req = 490 container_of(kref, struct ceph_mon_generic_request, kref); 491 492 dout("%s greq %p request %p reply %p\n", __func__, req, req->request, 493 req->reply); 494 WARN_ON(!RB_EMPTY_NODE(&req->node)); 495 496 if (req->reply) 497 ceph_msg_put(req->reply); 498 if (req->request) 499 ceph_msg_put(req->request); 500 501 kfree(req); 502 } 503 504 static void put_generic_request(struct ceph_mon_generic_request *req) 505 { 506 if (req) 507 kref_put(&req->kref, release_generic_request); 508 } 509 510 static void get_generic_request(struct ceph_mon_generic_request *req) 511 { 512 kref_get(&req->kref); 513 } 514 515 static struct ceph_mon_generic_request * 516 alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp) 517 { 518 struct ceph_mon_generic_request *req; 519 520 req = kzalloc(sizeof(*req), gfp); 521 if (!req) 522 return NULL; 523 524 req->monc = monc; 525 kref_init(&req->kref); 526 RB_CLEAR_NODE(&req->node); 527 init_completion(&req->completion); 528 529 dout("%s greq %p\n", __func__, req); 530 return req; 531 } 532 533 static void register_generic_request(struct ceph_mon_generic_request *req) 534 { 535 struct ceph_mon_client *monc = req->monc; 536 537 WARN_ON(req->tid); 538 539 get_generic_request(req); 540 req->tid = ++monc->last_tid; 541 insert_generic_request(&monc->generic_request_tree, req); 542 } 543 544 static void send_generic_request(struct ceph_mon_client *monc, 545 struct ceph_mon_generic_request *req) 546 { 547 WARN_ON(!req->tid); 548 549 dout("%s greq %p tid %llu\n", __func__, req, req->tid); 550 req->request->hdr.tid = cpu_to_le64(req->tid); 551 ceph_con_send(&monc->con, ceph_msg_get(req->request)); 552 } 553 554 static void __finish_generic_request(struct ceph_mon_generic_request *req) 555 { 556 struct ceph_mon_client *monc = req->monc; 557 558 dout("%s greq %p tid %llu\n", __func__, req, req->tid); 559 erase_generic_request(&monc->generic_request_tree, req); 560 561 ceph_msg_revoke(req->request); 562 ceph_msg_revoke_incoming(req->reply); 563 } 564 565 static void finish_generic_request(struct ceph_mon_generic_request *req) 566 { 567 __finish_generic_request(req); 568 put_generic_request(req); 569 } 570 571 static void complete_generic_request(struct ceph_mon_generic_request *req) 572 { 573 if (req->complete_cb) 574 req->complete_cb(req); 575 else 576 complete_all(&req->completion); 577 put_generic_request(req); 578 } 579 580 static void cancel_generic_request(struct ceph_mon_generic_request *req) 581 { 582 struct ceph_mon_client *monc = req->monc; 583 struct ceph_mon_generic_request *lookup_req; 584 585 dout("%s greq %p tid %llu\n", __func__, req, req->tid); 586 587 mutex_lock(&monc->mutex); 588 lookup_req = lookup_generic_request(&monc->generic_request_tree, 589 req->tid); 590 if (lookup_req) { 591 WARN_ON(lookup_req != req); 592 finish_generic_request(req); 593 } 594 595 mutex_unlock(&monc->mutex); 596 } 597 598 static int wait_generic_request(struct ceph_mon_generic_request *req) 599 { 600 int ret; 601 602 dout("%s greq %p tid %llu\n", __func__, req, req->tid); 603 ret = wait_for_completion_interruptible(&req->completion); 604 if (ret) 605 cancel_generic_request(req); 606 else 607 ret = req->result; /* completed */ 608 609 return ret; 610 } 611 612 static struct ceph_msg *get_generic_reply(struct ceph_connection *con, 613 struct ceph_msg_header *hdr, 614 int *skip) 615 { 616 struct ceph_mon_client *monc = con->private; 617 struct ceph_mon_generic_request *req; 618 u64 tid = le64_to_cpu(hdr->tid); 619 struct ceph_msg *m; 620 621 mutex_lock(&monc->mutex); 622 req = lookup_generic_request(&monc->generic_request_tree, tid); 623 if (!req) { 624 dout("get_generic_reply %lld dne\n", tid); 625 *skip = 1; 626 m = NULL; 627 } else { 628 dout("get_generic_reply %lld got %p\n", tid, req->reply); 629 *skip = 0; 630 m = ceph_msg_get(req->reply); 631 /* 632 * we don't need to track the connection reading into 633 * this reply because we only have one open connection 634 * at a time, ever. 635 */ 636 } 637 mutex_unlock(&monc->mutex); 638 return m; 639 } 640 641 /* 642 * statfs 643 */ 644 static void handle_statfs_reply(struct ceph_mon_client *monc, 645 struct ceph_msg *msg) 646 { 647 struct ceph_mon_generic_request *req; 648 struct ceph_mon_statfs_reply *reply = msg->front.iov_base; 649 u64 tid = le64_to_cpu(msg->hdr.tid); 650 651 dout("%s msg %p tid %llu\n", __func__, msg, tid); 652 653 if (msg->front.iov_len != sizeof(*reply)) 654 goto bad; 655 656 mutex_lock(&monc->mutex); 657 req = lookup_generic_request(&monc->generic_request_tree, tid); 658 if (!req) { 659 mutex_unlock(&monc->mutex); 660 return; 661 } 662 663 req->result = 0; 664 *req->u.st = reply->st; /* struct */ 665 __finish_generic_request(req); 666 mutex_unlock(&monc->mutex); 667 668 complete_generic_request(req); 669 return; 670 671 bad: 672 pr_err("corrupt statfs reply, tid %llu\n", tid); 673 ceph_msg_dump(msg); 674 } 675 676 /* 677 * Do a synchronous statfs(). 678 */ 679 int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) 680 { 681 struct ceph_mon_generic_request *req; 682 struct ceph_mon_statfs *h; 683 int ret = -ENOMEM; 684 685 req = alloc_generic_request(monc, GFP_NOFS); 686 if (!req) 687 goto out; 688 689 req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS, 690 true); 691 if (!req->request) 692 goto out; 693 694 req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true); 695 if (!req->reply) 696 goto out; 697 698 req->u.st = buf; 699 700 mutex_lock(&monc->mutex); 701 register_generic_request(req); 702 /* fill out request */ 703 h = req->request->front.iov_base; 704 h->monhdr.have_version = 0; 705 h->monhdr.session_mon = cpu_to_le16(-1); 706 h->monhdr.session_mon_tid = 0; 707 h->fsid = monc->monmap->fsid; 708 send_generic_request(monc, req); 709 mutex_unlock(&monc->mutex); 710 711 ret = wait_generic_request(req); 712 out: 713 put_generic_request(req); 714 return ret; 715 } 716 EXPORT_SYMBOL(ceph_monc_do_statfs); 717 718 static void handle_get_version_reply(struct ceph_mon_client *monc, 719 struct ceph_msg *msg) 720 { 721 struct ceph_mon_generic_request *req; 722 u64 tid = le64_to_cpu(msg->hdr.tid); 723 void *p = msg->front.iov_base; 724 void *end = p + msg->front_alloc_len; 725 u64 handle; 726 727 dout("%s msg %p tid %llu\n", __func__, msg, tid); 728 729 ceph_decode_need(&p, end, 2*sizeof(u64), bad); 730 handle = ceph_decode_64(&p); 731 if (tid != 0 && tid != handle) 732 goto bad; 733 734 mutex_lock(&monc->mutex); 735 req = lookup_generic_request(&monc->generic_request_tree, handle); 736 if (!req) { 737 mutex_unlock(&monc->mutex); 738 return; 739 } 740 741 req->result = 0; 742 req->u.newest = ceph_decode_64(&p); 743 __finish_generic_request(req); 744 mutex_unlock(&monc->mutex); 745 746 complete_generic_request(req); 747 return; 748 749 bad: 750 pr_err("corrupt mon_get_version reply, tid %llu\n", tid); 751 ceph_msg_dump(msg); 752 } 753 754 static struct ceph_mon_generic_request * 755 __ceph_monc_get_version(struct ceph_mon_client *monc, const char *what, 756 ceph_monc_callback_t cb, u64 private_data) 757 { 758 struct ceph_mon_generic_request *req; 759 760 req = alloc_generic_request(monc, GFP_NOIO); 761 if (!req) 762 goto err_put_req; 763 764 req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION, 765 sizeof(u64) + sizeof(u32) + strlen(what), 766 GFP_NOIO, true); 767 if (!req->request) 768 goto err_put_req; 769 770 req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO, 771 true); 772 if (!req->reply) 773 goto err_put_req; 774 775 req->complete_cb = cb; 776 req->private_data = private_data; 777 778 mutex_lock(&monc->mutex); 779 register_generic_request(req); 780 { 781 void *p = req->request->front.iov_base; 782 void *const end = p + req->request->front_alloc_len; 783 784 ceph_encode_64(&p, req->tid); /* handle */ 785 ceph_encode_string(&p, end, what, strlen(what)); 786 WARN_ON(p != end); 787 } 788 send_generic_request(monc, req); 789 mutex_unlock(&monc->mutex); 790 791 return req; 792 793 err_put_req: 794 put_generic_request(req); 795 return ERR_PTR(-ENOMEM); 796 } 797 798 /* 799 * Send MMonGetVersion and wait for the reply. 800 * 801 * @what: one of "mdsmap", "osdmap" or "monmap" 802 */ 803 int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what, 804 u64 *newest) 805 { 806 struct ceph_mon_generic_request *req; 807 int ret; 808 809 req = __ceph_monc_get_version(monc, what, NULL, 0); 810 if (IS_ERR(req)) 811 return PTR_ERR(req); 812 813 ret = wait_generic_request(req); 814 if (!ret) 815 *newest = req->u.newest; 816 817 put_generic_request(req); 818 return ret; 819 } 820 EXPORT_SYMBOL(ceph_monc_get_version); 821 822 /* 823 * Send MMonGetVersion, 824 * 825 * @what: one of "mdsmap", "osdmap" or "monmap" 826 */ 827 int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what, 828 ceph_monc_callback_t cb, u64 private_data) 829 { 830 struct ceph_mon_generic_request *req; 831 832 req = __ceph_monc_get_version(monc, what, cb, private_data); 833 if (IS_ERR(req)) 834 return PTR_ERR(req); 835 836 put_generic_request(req); 837 return 0; 838 } 839 EXPORT_SYMBOL(ceph_monc_get_version_async); 840 841 static void handle_command_ack(struct ceph_mon_client *monc, 842 struct ceph_msg *msg) 843 { 844 struct ceph_mon_generic_request *req; 845 void *p = msg->front.iov_base; 846 void *const end = p + msg->front_alloc_len; 847 u64 tid = le64_to_cpu(msg->hdr.tid); 848 849 dout("%s msg %p tid %llu\n", __func__, msg, tid); 850 851 ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) + 852 sizeof(u32), bad); 853 p += sizeof(struct ceph_mon_request_header); 854 855 mutex_lock(&monc->mutex); 856 req = lookup_generic_request(&monc->generic_request_tree, tid); 857 if (!req) { 858 mutex_unlock(&monc->mutex); 859 return; 860 } 861 862 req->result = ceph_decode_32(&p); 863 __finish_generic_request(req); 864 mutex_unlock(&monc->mutex); 865 866 complete_generic_request(req); 867 return; 868 869 bad: 870 pr_err("corrupt mon_command ack, tid %llu\n", tid); 871 ceph_msg_dump(msg); 872 } 873 874 int ceph_monc_blacklist_add(struct ceph_mon_client *monc, 875 struct ceph_entity_addr *client_addr) 876 { 877 struct ceph_mon_generic_request *req; 878 struct ceph_mon_command *h; 879 int ret = -ENOMEM; 880 int len; 881 882 req = alloc_generic_request(monc, GFP_NOIO); 883 if (!req) 884 goto out; 885 886 req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true); 887 if (!req->request) 888 goto out; 889 890 req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO, 891 true); 892 if (!req->reply) 893 goto out; 894 895 mutex_lock(&monc->mutex); 896 register_generic_request(req); 897 h = req->request->front.iov_base; 898 h->monhdr.have_version = 0; 899 h->monhdr.session_mon = cpu_to_le16(-1); 900 h->monhdr.session_mon_tid = 0; 901 h->fsid = monc->monmap->fsid; 902 h->num_strs = cpu_to_le32(1); 903 len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \ 904 \"blacklistop\": \"add\", \ 905 \"addr\": \"%pISpc/%u\" }", 906 &client_addr->in_addr, le32_to_cpu(client_addr->nonce)); 907 h->str_len = cpu_to_le32(len); 908 send_generic_request(monc, req); 909 mutex_unlock(&monc->mutex); 910 911 ret = wait_generic_request(req); 912 out: 913 put_generic_request(req); 914 return ret; 915 } 916 EXPORT_SYMBOL(ceph_monc_blacklist_add); 917 918 /* 919 * Resend pending generic requests. 920 */ 921 static void __resend_generic_request(struct ceph_mon_client *monc) 922 { 923 struct ceph_mon_generic_request *req; 924 struct rb_node *p; 925 926 for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) { 927 req = rb_entry(p, struct ceph_mon_generic_request, node); 928 ceph_msg_revoke(req->request); 929 ceph_msg_revoke_incoming(req->reply); 930 ceph_con_send(&monc->con, ceph_msg_get(req->request)); 931 } 932 } 933 934 /* 935 * Delayed work. If we haven't mounted yet, retry. Otherwise, 936 * renew/retry subscription as needed (in case it is timing out, or we 937 * got an ENOMEM). And keep the monitor connection alive. 938 */ 939 static void delayed_work(struct work_struct *work) 940 { 941 struct ceph_mon_client *monc = 942 container_of(work, struct ceph_mon_client, delayed_work.work); 943 944 dout("monc delayed_work\n"); 945 mutex_lock(&monc->mutex); 946 if (monc->hunting) { 947 dout("%s continuing hunt\n", __func__); 948 reopen_session(monc); 949 } else { 950 int is_auth = ceph_auth_is_authenticated(monc->auth); 951 if (ceph_con_keepalive_expired(&monc->con, 952 CEPH_MONC_PING_TIMEOUT)) { 953 dout("monc keepalive timeout\n"); 954 is_auth = 0; 955 reopen_session(monc); 956 } 957 958 if (!monc->hunting) { 959 ceph_con_keepalive(&monc->con); 960 __validate_auth(monc); 961 } 962 963 if (is_auth && 964 !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) { 965 unsigned long now = jiffies; 966 967 dout("%s renew subs? now %lu renew after %lu\n", 968 __func__, now, monc->sub_renew_after); 969 if (time_after_eq(now, monc->sub_renew_after)) 970 __send_subscribe(monc); 971 } 972 } 973 __schedule_delayed(monc); 974 mutex_unlock(&monc->mutex); 975 } 976 977 /* 978 * On startup, we build a temporary monmap populated with the IPs 979 * provided by mount(2). 980 */ 981 static int build_initial_monmap(struct ceph_mon_client *monc) 982 { 983 struct ceph_options *opt = monc->client->options; 984 struct ceph_entity_addr *mon_addr = opt->mon_addr; 985 int num_mon = opt->num_mon; 986 int i; 987 988 /* build initial monmap */ 989 monc->monmap = kzalloc(sizeof(*monc->monmap) + 990 num_mon*sizeof(monc->monmap->mon_inst[0]), 991 GFP_KERNEL); 992 if (!monc->monmap) 993 return -ENOMEM; 994 for (i = 0; i < num_mon; i++) { 995 monc->monmap->mon_inst[i].addr = mon_addr[i]; 996 monc->monmap->mon_inst[i].addr.nonce = 0; 997 monc->monmap->mon_inst[i].name.type = 998 CEPH_ENTITY_TYPE_MON; 999 monc->monmap->mon_inst[i].name.num = cpu_to_le64(i); 1000 } 1001 monc->monmap->num_mon = num_mon; 1002 return 0; 1003 } 1004 1005 int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) 1006 { 1007 int err = 0; 1008 1009 dout("init\n"); 1010 memset(monc, 0, sizeof(*monc)); 1011 monc->client = cl; 1012 monc->monmap = NULL; 1013 mutex_init(&monc->mutex); 1014 1015 err = build_initial_monmap(monc); 1016 if (err) 1017 goto out; 1018 1019 /* connection */ 1020 /* authentication */ 1021 monc->auth = ceph_auth_init(cl->options->name, 1022 cl->options->key); 1023 if (IS_ERR(monc->auth)) { 1024 err = PTR_ERR(monc->auth); 1025 goto out_monmap; 1026 } 1027 monc->auth->want_keys = 1028 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON | 1029 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS; 1030 1031 /* msgs */ 1032 err = -ENOMEM; 1033 monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK, 1034 sizeof(struct ceph_mon_subscribe_ack), 1035 GFP_KERNEL, true); 1036 if (!monc->m_subscribe_ack) 1037 goto out_auth; 1038 1039 monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128, 1040 GFP_KERNEL, true); 1041 if (!monc->m_subscribe) 1042 goto out_subscribe_ack; 1043 1044 monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, 1045 GFP_KERNEL, true); 1046 if (!monc->m_auth_reply) 1047 goto out_subscribe; 1048 1049 monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true); 1050 monc->pending_auth = 0; 1051 if (!monc->m_auth) 1052 goto out_auth_reply; 1053 1054 ceph_con_init(&monc->con, monc, &mon_con_ops, 1055 &monc->client->msgr); 1056 1057 monc->cur_mon = -1; 1058 monc->had_a_connection = false; 1059 monc->hunt_mult = 1; 1060 1061 INIT_DELAYED_WORK(&monc->delayed_work, delayed_work); 1062 monc->generic_request_tree = RB_ROOT; 1063 monc->last_tid = 0; 1064 1065 monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE; 1066 1067 return 0; 1068 1069 out_auth_reply: 1070 ceph_msg_put(monc->m_auth_reply); 1071 out_subscribe: 1072 ceph_msg_put(monc->m_subscribe); 1073 out_subscribe_ack: 1074 ceph_msg_put(monc->m_subscribe_ack); 1075 out_auth: 1076 ceph_auth_destroy(monc->auth); 1077 out_monmap: 1078 kfree(monc->monmap); 1079 out: 1080 return err; 1081 } 1082 EXPORT_SYMBOL(ceph_monc_init); 1083 1084 void ceph_monc_stop(struct ceph_mon_client *monc) 1085 { 1086 dout("stop\n"); 1087 cancel_delayed_work_sync(&monc->delayed_work); 1088 1089 mutex_lock(&monc->mutex); 1090 __close_session(monc); 1091 monc->cur_mon = -1; 1092 mutex_unlock(&monc->mutex); 1093 1094 /* 1095 * flush msgr queue before we destroy ourselves to ensure that: 1096 * - any work that references our embedded con is finished. 1097 * - any osd_client or other work that may reference an authorizer 1098 * finishes before we shut down the auth subsystem. 1099 */ 1100 ceph_msgr_flush(); 1101 1102 ceph_auth_destroy(monc->auth); 1103 1104 WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree)); 1105 1106 ceph_msg_put(monc->m_auth); 1107 ceph_msg_put(monc->m_auth_reply); 1108 ceph_msg_put(monc->m_subscribe); 1109 ceph_msg_put(monc->m_subscribe_ack); 1110 1111 kfree(monc->monmap); 1112 } 1113 EXPORT_SYMBOL(ceph_monc_stop); 1114 1115 static void finish_hunting(struct ceph_mon_client *monc) 1116 { 1117 if (monc->hunting) { 1118 dout("%s found mon%d\n", __func__, monc->cur_mon); 1119 monc->hunting = false; 1120 monc->had_a_connection = true; 1121 monc->hunt_mult /= 2; /* reduce by 50% */ 1122 if (monc->hunt_mult < 1) 1123 monc->hunt_mult = 1; 1124 } 1125 } 1126 1127 static void handle_auth_reply(struct ceph_mon_client *monc, 1128 struct ceph_msg *msg) 1129 { 1130 int ret; 1131 int was_auth = 0; 1132 1133 mutex_lock(&monc->mutex); 1134 was_auth = ceph_auth_is_authenticated(monc->auth); 1135 monc->pending_auth = 0; 1136 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, 1137 msg->front.iov_len, 1138 monc->m_auth->front.iov_base, 1139 monc->m_auth->front_alloc_len); 1140 if (ret > 0) { 1141 __send_prepared_auth_request(monc, ret); 1142 goto out; 1143 } 1144 1145 finish_hunting(monc); 1146 1147 if (ret < 0) { 1148 monc->client->auth_err = ret; 1149 } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) { 1150 dout("authenticated, starting session\n"); 1151 1152 monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT; 1153 monc->client->msgr.inst.name.num = 1154 cpu_to_le64(monc->auth->global_id); 1155 1156 __send_subscribe(monc); 1157 __resend_generic_request(monc); 1158 1159 pr_info("mon%d %s session established\n", monc->cur_mon, 1160 ceph_pr_addr(&monc->con.peer_addr.in_addr)); 1161 } 1162 1163 out: 1164 mutex_unlock(&monc->mutex); 1165 if (monc->client->auth_err < 0) 1166 wake_up_all(&monc->client->auth_wq); 1167 } 1168 1169 static int __validate_auth(struct ceph_mon_client *monc) 1170 { 1171 int ret; 1172 1173 if (monc->pending_auth) 1174 return 0; 1175 1176 ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base, 1177 monc->m_auth->front_alloc_len); 1178 if (ret <= 0) 1179 return ret; /* either an error, or no need to authenticate */ 1180 __send_prepared_auth_request(monc, ret); 1181 return 0; 1182 } 1183 1184 int ceph_monc_validate_auth(struct ceph_mon_client *monc) 1185 { 1186 int ret; 1187 1188 mutex_lock(&monc->mutex); 1189 ret = __validate_auth(monc); 1190 mutex_unlock(&monc->mutex); 1191 return ret; 1192 } 1193 EXPORT_SYMBOL(ceph_monc_validate_auth); 1194 1195 /* 1196 * handle incoming message 1197 */ 1198 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 1199 { 1200 struct ceph_mon_client *monc = con->private; 1201 int type = le16_to_cpu(msg->hdr.type); 1202 1203 if (!monc) 1204 return; 1205 1206 switch (type) { 1207 case CEPH_MSG_AUTH_REPLY: 1208 handle_auth_reply(monc, msg); 1209 break; 1210 1211 case CEPH_MSG_MON_SUBSCRIBE_ACK: 1212 handle_subscribe_ack(monc, msg); 1213 break; 1214 1215 case CEPH_MSG_STATFS_REPLY: 1216 handle_statfs_reply(monc, msg); 1217 break; 1218 1219 case CEPH_MSG_MON_GET_VERSION_REPLY: 1220 handle_get_version_reply(monc, msg); 1221 break; 1222 1223 case CEPH_MSG_MON_COMMAND_ACK: 1224 handle_command_ack(monc, msg); 1225 break; 1226 1227 case CEPH_MSG_MON_MAP: 1228 ceph_monc_handle_map(monc, msg); 1229 break; 1230 1231 case CEPH_MSG_OSD_MAP: 1232 ceph_osdc_handle_map(&monc->client->osdc, msg); 1233 break; 1234 1235 default: 1236 /* can the chained handler handle it? */ 1237 if (monc->client->extra_mon_dispatch && 1238 monc->client->extra_mon_dispatch(monc->client, msg) == 0) 1239 break; 1240 1241 pr_err("received unknown message type %d %s\n", type, 1242 ceph_msg_type_name(type)); 1243 } 1244 ceph_msg_put(msg); 1245 } 1246 1247 /* 1248 * Allocate memory for incoming message 1249 */ 1250 static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, 1251 struct ceph_msg_header *hdr, 1252 int *skip) 1253 { 1254 struct ceph_mon_client *monc = con->private; 1255 int type = le16_to_cpu(hdr->type); 1256 int front_len = le32_to_cpu(hdr->front_len); 1257 struct ceph_msg *m = NULL; 1258 1259 *skip = 0; 1260 1261 switch (type) { 1262 case CEPH_MSG_MON_SUBSCRIBE_ACK: 1263 m = ceph_msg_get(monc->m_subscribe_ack); 1264 break; 1265 case CEPH_MSG_STATFS_REPLY: 1266 case CEPH_MSG_MON_COMMAND_ACK: 1267 return get_generic_reply(con, hdr, skip); 1268 case CEPH_MSG_AUTH_REPLY: 1269 m = ceph_msg_get(monc->m_auth_reply); 1270 break; 1271 case CEPH_MSG_MON_GET_VERSION_REPLY: 1272 if (le64_to_cpu(hdr->tid) != 0) 1273 return get_generic_reply(con, hdr, skip); 1274 1275 /* 1276 * Older OSDs don't set reply tid even if the orignal 1277 * request had a non-zero tid. Workaround this weirdness 1278 * by falling through to the allocate case. 1279 */ 1280 case CEPH_MSG_MON_MAP: 1281 case CEPH_MSG_MDS_MAP: 1282 case CEPH_MSG_OSD_MAP: 1283 case CEPH_MSG_FS_MAP_USER: 1284 m = ceph_msg_new(type, front_len, GFP_NOFS, false); 1285 if (!m) 1286 return NULL; /* ENOMEM--return skip == 0 */ 1287 break; 1288 } 1289 1290 if (!m) { 1291 pr_info("alloc_msg unknown type %d\n", type); 1292 *skip = 1; 1293 } else if (front_len > m->front_alloc_len) { 1294 pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n", 1295 front_len, m->front_alloc_len, 1296 (unsigned int)con->peer_name.type, 1297 le64_to_cpu(con->peer_name.num)); 1298 ceph_msg_put(m); 1299 m = ceph_msg_new(type, front_len, GFP_NOFS, false); 1300 } 1301 1302 return m; 1303 } 1304 1305 /* 1306 * If the monitor connection resets, pick a new monitor and resubmit 1307 * any pending requests. 1308 */ 1309 static void mon_fault(struct ceph_connection *con) 1310 { 1311 struct ceph_mon_client *monc = con->private; 1312 1313 mutex_lock(&monc->mutex); 1314 dout("%s mon%d\n", __func__, monc->cur_mon); 1315 if (monc->cur_mon >= 0) { 1316 if (!monc->hunting) { 1317 dout("%s hunting for new mon\n", __func__); 1318 reopen_session(monc); 1319 __schedule_delayed(monc); 1320 } else { 1321 dout("%s already hunting\n", __func__); 1322 } 1323 } 1324 mutex_unlock(&monc->mutex); 1325 } 1326 1327 /* 1328 * We can ignore refcounting on the connection struct, as all references 1329 * will come from the messenger workqueue, which is drained prior to 1330 * mon_client destruction. 1331 */ 1332 static struct ceph_connection *con_get(struct ceph_connection *con) 1333 { 1334 return con; 1335 } 1336 1337 static void con_put(struct ceph_connection *con) 1338 { 1339 } 1340 1341 static const struct ceph_connection_operations mon_con_ops = { 1342 .get = con_get, 1343 .put = con_put, 1344 .dispatch = dispatch, 1345 .fault = mon_fault, 1346 .alloc_msg = mon_alloc_msg, 1347 }; 1348