#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/sched.h>

#include <linux/ceph/mon_client.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/debugfs.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>

/*
 * Interact with Ceph monitor cluster.  Handle requests for new map
 * versions, and periodically resend as needed.  Also implement
 * statfs() and umount().
 *
 * A small cluster of Ceph "monitors" is responsible for managing critical
 * cluster configuration and state information.  An odd number (e.g., 3, 5)
 * of cmon daemons use a modified version of the Paxos part-time parliament
 * algorithm to manage the MDS map (mds cluster membership), OSD map, and
 * list of clients who have mounted the file system.
 *
 * We maintain an open, active session with a monitor at all times in order
 * to receive timely MDSMap updates.  We periodically send a keepalive byte
 * on the TCP socket to ensure we detect a failure.  If the connection does
 * break, we randomly hunt for a new monitor.  Once the connection is
 * reestablished, we resend any outstanding requests.
 */

static const struct ceph_connection_operations mon_con_ops;

static int __validate_auth(struct ceph_mon_client *monc);

/*
 * Decode a monmap blob (e.g., during mount).
 */
struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
{
	struct ceph_monmap *m = NULL;
	int i, err = -EINVAL;
	struct ceph_fsid fsid;
	u32 epoch, num_mon;
	u16 version;
	u32 len;

	ceph_decode_32_safe(&p, end, len, bad);
	ceph_decode_need(&p, end, len, bad);

	dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));

	ceph_decode_16_safe(&p, end, version, bad);

	ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	epoch = ceph_decode_32(&p);

	num_mon = ceph_decode_32(&p);
	ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);

	if (num_mon >= CEPH_MAX_MON)
		goto bad;
	m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
	if (m == NULL)
		return ERR_PTR(-ENOMEM);
	m->fsid = fsid;
	m->epoch = epoch;
	m->num_mon = num_mon;
	ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
	for (i = 0; i < num_mon; i++)
		ceph_decode_addr(&m->mon_inst[i].addr);

	dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
	     m->num_mon);
	for (i = 0; i < m->num_mon; i++)
		dout("monmap_decode mon%d is %s\n", i,
		     ceph_pr_addr(&m->mon_inst[i].addr.in_addr));
	return m;

bad:
	dout("monmap_decode failed with %d\n", err);
	kfree(m);
	return ERR_PTR(err);
}

/*
 * Return true if *addr is included in the monmap.
 */
int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
{
	int i;

	for (i = 0; i < m->num_mon; i++)
		if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
			return 1;
	return 0;
}

/*
 * Send an auth request.
 */
static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
{
	monc->pending_auth = 1;
	monc->m_auth->front.iov_len = len;
	monc->m_auth->hdr.front_len = cpu_to_le32(len);
	ceph_msg_revoke(monc->m_auth);
	ceph_msg_get(monc->m_auth);  /* keep our ref */
	ceph_con_send(&monc->con, monc->m_auth);
}
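/*
 * Note on the reference counting above: ceph_con_send() consumes one
 * reference to the message, but m_auth is preallocated and reused for
 * every round of the authentication exchange, so we take an extra ref
 * before handing it to the connection.  A hypothetical one-shot
 * message (illustrative sketch only) would skip the extra get and let
 * the connection own the sole reference:
 *
 *	struct ceph_msg *m = ceph_msg_new(type, len, GFP_NOFS, true);
 *	if (m)
 *		ceph_con_send(&monc->con, m);
 */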
/*
 * Close monitor session, if any.
 */
static void __close_session(struct ceph_mon_client *monc)
{
	dout("__close_session closing mon%d\n", monc->cur_mon);
	ceph_msg_revoke(monc->m_auth);
	ceph_msg_revoke_incoming(monc->m_auth_reply);
	ceph_msg_revoke(monc->m_subscribe);
	ceph_msg_revoke_incoming(monc->m_subscribe_ack);
	ceph_con_close(&monc->con);

	monc->pending_auth = 0;
	ceph_auth_reset(monc->auth);
}

/*
 * Pick a new monitor at random and set cur_mon.  If we are repicking
 * (i.e. cur_mon is already set), be sure to pick a different one.
 */
static void pick_new_mon(struct ceph_mon_client *monc)
{
	int old_mon = monc->cur_mon;

	BUG_ON(monc->monmap->num_mon < 1);

	if (monc->monmap->num_mon == 1) {
		monc->cur_mon = 0;
	} else {
		int max = monc->monmap->num_mon;
		int o = -1;
		int n;

		if (monc->cur_mon >= 0) {
			if (monc->cur_mon < monc->monmap->num_mon)
				o = monc->cur_mon;
			if (o >= 0)
				max--;
		}

		n = prandom_u32() % max;
		if (o >= 0 && n >= o)
			n++;

		monc->cur_mon = n;
	}

	dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
	     monc->cur_mon, monc->monmap->num_mon);
}

/*
 * Open a session with a new monitor.
 */
static void __open_session(struct ceph_mon_client *monc)
{
	int ret;

	pick_new_mon(monc);

	monc->hunting = true;
	if (monc->had_a_connection) {
		monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
		if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
			monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
	}

	monc->sub_renew_after = jiffies;  /* i.e., expired */
	monc->sub_renew_sent = 0;

	dout("%s opening mon%d\n", __func__, monc->cur_mon);
	ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
		      &monc->monmap->mon_inst[monc->cur_mon].addr);

	/*
	 * send an initial keepalive to ensure our timestamp is valid
	 * by the time we are in an OPENED state
	 */
	ceph_con_keepalive(&monc->con);

	/* initiate authentication handshake */
	ret = ceph_auth_build_hello(monc->auth,
				    monc->m_auth->front.iov_base,
				    monc->m_auth->front_alloc_len);
	BUG_ON(ret <= 0);
	__send_prepared_auth_request(monc, ret);
}

static void reopen_session(struct ceph_mon_client *monc)
{
	if (!monc->hunting)
		pr_info("mon%d %s session lost, hunting for new mon\n",
			monc->cur_mon,
			ceph_pr_addr(&monc->con.peer_addr.in_addr));

	__close_session(monc);
	__open_session(monc);
}

/*
 * Reschedule delayed work timer.
 */
static void __schedule_delayed(struct ceph_mon_client *monc)
{
	unsigned long delay;

	if (monc->hunting)
		delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
	else
		delay = CEPH_MONC_PING_INTERVAL;

	dout("__schedule_delayed after %lu\n", delay);
	mod_delayed_work(system_wq, &monc->delayed_work,
			 round_jiffies_relative(delay));
}

const char *ceph_sub_str[] = {
	[CEPH_SUB_MDSMAP] = "mdsmap",
	[CEPH_SUB_MONMAP] = "monmap",
	[CEPH_SUB_OSDMAP] = "osdmap",
};
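/*
 * On the wire, a (v2) subscribe request is a u32 entry count followed
 * by that many { string name, struct ceph_mon_subscribe_item } pairs.
 * The BUG_ON() size check in __send_subscribe() below assumes each
 * encoded entry is exactly 19 bytes (a 4-byte string length, a
 * 6-character name, and a 9-byte packed item), which holds because
 * every name in ceph_sub_str[] is 6 characters long.
 */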
/*
 * Send subscribe request for one or more maps, according to
 * monc->subs.
 */
static void __send_subscribe(struct ceph_mon_client *monc)
{
	struct ceph_msg *msg = monc->m_subscribe;
	void *p = msg->front.iov_base;
	void *const end = p + msg->front_alloc_len;
	int num = 0;
	int i;

	dout("%s sent %lu\n", __func__, monc->sub_renew_sent);

	BUG_ON(monc->cur_mon < 0);

	if (!monc->sub_renew_sent)
		monc->sub_renew_sent = jiffies | 1;  /* never 0 */

	msg->hdr.version = cpu_to_le16(2);

	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
		if (monc->subs[i].want)
			num++;
	}
	BUG_ON(num < 1);  /* monmap sub is always there */
	ceph_encode_32(&p, num);
	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
		const char *s = ceph_sub_str[i];

		if (!monc->subs[i].want)
			continue;

		dout("%s %s start %llu flags 0x%x\n", __func__, s,
		     le64_to_cpu(monc->subs[i].item.start),
		     monc->subs[i].item.flags);
		ceph_encode_string(&p, end, s, strlen(s));
		memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
		p += sizeof(monc->subs[i].item);
	}

	BUG_ON(p != (end - 35 - (ARRAY_SIZE(monc->subs) - num) * 19));
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
	ceph_msg_revoke(msg);
	ceph_con_send(&monc->con, ceph_msg_get(msg));
}

static void handle_subscribe_ack(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	unsigned int seconds;
	struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

	if (msg->front.iov_len < sizeof(*h))
		goto bad;
	seconds = le32_to_cpu(h->duration);

	mutex_lock(&monc->mutex);
	if (monc->sub_renew_sent) {
		monc->sub_renew_after = monc->sub_renew_sent +
					    (seconds >> 1) * HZ - 1;
		dout("%s sent %lu duration %d renew after %lu\n", __func__,
		     monc->sub_renew_sent, seconds, monc->sub_renew_after);
		monc->sub_renew_sent = 0;
	} else {
		dout("%s sent %lu renew after %lu, ignoring\n", __func__,
		     monc->sub_renew_sent, monc->sub_renew_after);
	}
	mutex_unlock(&monc->mutex);
	return;
bad:
	pr_err("got corrupt subscribe-ack msg\n");
	ceph_msg_dump(msg);
}
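/*
 * Renewal timing, for reference: if the monitor grants a subscription
 * for N seconds, handle_subscribe_ack() above arms sub_renew_after
 * roughly N/2 seconds after the request was *sent*.  A 300-second
 * grant is therefore renewed about 150 seconds in, leaving ample
 * slack before it can expire on the monitor side.
 */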
/*
 * Register interest in a map
 *
 * @sub: one of CEPH_SUB_*
 * @epoch: X for "every map since X", or 0 for "just the latest"
 */
static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
				 u32 epoch, bool continuous)
{
	__le64 start = cpu_to_le64(epoch);
	u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;

	dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
	     epoch, continuous);

	if (monc->subs[sub].want &&
	    monc->subs[sub].item.start == start &&
	    monc->subs[sub].item.flags == flags)
		return false;

	monc->subs[sub].item.start = start;
	monc->subs[sub].item.flags = flags;
	monc->subs[sub].want = true;

	return true;
}

bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
			bool continuous)
{
	bool need_request;

	mutex_lock(&monc->mutex);
	need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
	mutex_unlock(&monc->mutex);

	return need_request;
}
EXPORT_SYMBOL(ceph_monc_want_map);

/*
 * Keep track of which maps we have
 *
 * @sub: one of CEPH_SUB_*
 */
static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
				u32 epoch)
{
	dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);

	if (monc->subs[sub].want) {
		if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
			monc->subs[sub].want = false;
		else
			monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
	}

	monc->subs[sub].have = epoch;
}

void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
{
	mutex_lock(&monc->mutex);
	__ceph_monc_got_map(monc, sub, epoch);
	mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_got_map);

/*
 * Register interest in the next osdmap
 */
void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
{
	dout("%s have %u\n", __func__, monc->subs[CEPH_SUB_OSDMAP].have);
	mutex_lock(&monc->mutex);
	if (__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP,
				 monc->subs[CEPH_SUB_OSDMAP].have + 1, false))
		__send_subscribe(monc);
	mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_request_next_osdmap);

/*
 * Wait for an osdmap with a given epoch.
 *
 * @epoch: epoch to wait for
 * @timeout: in jiffies, 0 means "wait forever"
 */
int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
			  unsigned long timeout)
{
	unsigned long started = jiffies;
	long ret;

	mutex_lock(&monc->mutex);
	while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
		mutex_unlock(&monc->mutex);

		if (timeout && time_after_eq(jiffies, started + timeout))
			return -ETIMEDOUT;

		ret = wait_event_interruptible_timeout(monc->client->auth_wq,
				monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
				ceph_timeout_jiffies(timeout));
		if (ret < 0)
			return ret;

		mutex_lock(&monc->mutex);
	}

	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_wait_osdmap);
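/*
 * A minimal sketch of the intended calling pattern for the two
 * exported functions above (hypothetical caller that needs to observe
 * a newer osdmap, e.g. after a pool operation):
 *
 *	ceph_monc_request_next_osdmap(monc);
 *	err = ceph_monc_wait_osdmap(monc, want_epoch, 10 * HZ);
 *
 * err is 0 once the epoch is seen, -ETIMEDOUT on timeout, or the
 * negative value from wait_event_interruptible_timeout() (e.g.
 * -ERESTARTSYS) if the wait is interrupted by a signal.
 */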
/*
 * Open a session with a random monitor.  Request monmap and osdmap,
 * which are waited upon in __ceph_open_session().
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
	__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
	__open_session(monc);
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);

static void ceph_monc_handle_map(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	struct ceph_client *client = monc->client;
	struct ceph_monmap *monmap = NULL, *old = monc->monmap;
	void *p, *end;

	mutex_lock(&monc->mutex);

	dout("handle_monmap\n");
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	monmap = ceph_monmap_decode(p, end);
	if (IS_ERR(monmap)) {
		pr_err("problem decoding monmap, %d\n",
		       (int)PTR_ERR(monmap));
		goto out;
	}

	if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
		kfree(monmap);
		goto out;
	}

	client->monc.monmap = monmap;
	kfree(old);

	__ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
	client->have_fsid = true;

out:
	mutex_unlock(&monc->mutex);
	wake_up_all(&client->auth_wq);
}

/*
 * generic requests (currently statfs, mon_get_version)
 */
static struct ceph_mon_generic_request *__lookup_generic_req(
	struct ceph_mon_client *monc, u64 tid)
{
	struct ceph_mon_generic_request *req;
	struct rb_node *n = monc->generic_request_tree.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_mon_generic_request, node);
		if (tid < req->tid)
			n = n->rb_left;
		else if (tid > req->tid)
			n = n->rb_right;
		else
			return req;
	}
	return NULL;
}

static void __insert_generic_request(struct ceph_mon_client *monc,
				     struct ceph_mon_generic_request *new)
{
	struct rb_node **p = &monc->generic_request_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_mon_generic_request *req = NULL;

	while (*p) {
		parent = *p;
		req = rb_entry(parent, struct ceph_mon_generic_request, node);
		if (new->tid < req->tid)
			p = &(*p)->rb_left;
		else if (new->tid > req->tid)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->node, parent, p);
	rb_insert_color(&new->node, &monc->generic_request_tree);
}

static void release_generic_request(struct kref *kref)
{
	struct ceph_mon_generic_request *req =
		container_of(kref, struct ceph_mon_generic_request, kref);

	if (req->reply)
		ceph_msg_put(req->reply);
	if (req->request)
		ceph_msg_put(req->request);

	kfree(req);
}

static void put_generic_request(struct ceph_mon_generic_request *req)
{
	kref_put(&req->kref, release_generic_request);
}

static void get_generic_request(struct ceph_mon_generic_request *req)
{
	kref_get(&req->kref);
}
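/*
 * Lifecycle of a generic request, for reference: the submitter holds
 * the initial kref from kref_init() and drops it with
 * put_generic_request() when the synchronous call returns; the reply
 * handler takes a second ref around complete_all() so the request
 * cannot be freed out from under it while it is being completed.
 */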
static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
					  struct ceph_msg_header *hdr,
					  int *skip)
{
	struct ceph_mon_client *monc = con->private;
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(hdr->tid);
	struct ceph_msg *m;

	mutex_lock(&monc->mutex);
	req = __lookup_generic_req(monc, tid);
	if (!req) {
		dout("get_generic_reply %lld dne\n", tid);
		*skip = 1;
		m = NULL;
	} else {
		dout("get_generic_reply %lld got %p\n", tid, req->reply);
		*skip = 0;
		m = ceph_msg_get(req->reply);
		/*
		 * we don't need to track the connection reading into
		 * this reply because we only have one open connection
		 * at a time, ever.
		 */
	}
	mutex_unlock(&monc->mutex);
	return m;
}

static int __do_generic_request(struct ceph_mon_client *monc, u64 tid,
				struct ceph_mon_generic_request *req)
{
	int err;

	/* register request */
	req->tid = tid != 0 ? tid : ++monc->last_tid;
	req->request->hdr.tid = cpu_to_le64(req->tid);
	__insert_generic_request(monc, req);
	monc->num_generic_requests++;
	ceph_con_send(&monc->con, ceph_msg_get(req->request));
	mutex_unlock(&monc->mutex);

	err = wait_for_completion_interruptible(&req->completion);

	mutex_lock(&monc->mutex);
	rb_erase(&req->node, &monc->generic_request_tree);
	monc->num_generic_requests--;

	if (!err)
		err = req->result;
	return err;
}

static int do_generic_request(struct ceph_mon_client *monc,
			      struct ceph_mon_generic_request *req)
{
	int err;

	mutex_lock(&monc->mutex);
	err = __do_generic_request(monc, 0, req);
	mutex_unlock(&monc->mutex);

	return err;
}

/*
 * statfs
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
				struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	if (msg->front.iov_len != sizeof(*reply))
		goto bad;
	dout("handle_statfs_reply %p tid %llu\n", msg, tid);

	mutex_lock(&monc->mutex);
	req = __lookup_generic_req(monc, tid);
	if (req) {
		*(struct ceph_statfs *)req->buf = reply->st;
		req->result = 0;
		get_generic_request(req);
	}
	mutex_unlock(&monc->mutex);
	if (req) {
		complete_all(&req->completion);
		put_generic_request(req);
	}
	return;

bad:
	pr_err("corrupt statfs reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}
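/*
 * A minimal usage sketch for ceph_monc_do_statfs() below (hypothetical
 * caller, e.g. a filesystem's ->statfs handler); buf points at caller
 * storage that handle_statfs_reply() above fills in:
 *
 *	struct ceph_statfs st;
 *	int err = ceph_monc_do_statfs(&client->monc, &st);
 */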
/*
 * Do a synchronous statfs().
 */
int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs *h;
	int err;

	req = kzalloc(sizeof(*req), GFP_NOFS);
	if (!req)
		return -ENOMEM;

	kref_init(&req->kref);
	req->buf = buf;
	init_completion(&req->completion);

	err = -ENOMEM;
	req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
				    true);
	if (!req->request)
		goto out;
	req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS,
				  true);
	if (!req->reply)
		goto out;

	/* fill out request */
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;

	err = do_generic_request(monc, req);

out:
	put_generic_request(req);
	return err;
}
EXPORT_SYMBOL(ceph_monc_do_statfs);

static void handle_get_version_reply(struct ceph_mon_client *monc,
				     struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	void *p = msg->front.iov_base;
	void *end = p + msg->front_alloc_len;
	u64 handle;

	dout("%s %p tid %llu\n", __func__, msg, tid);

	ceph_decode_need(&p, end, 2*sizeof(u64), bad);
	handle = ceph_decode_64(&p);
	if (tid != 0 && tid != handle)
		goto bad;

	mutex_lock(&monc->mutex);
	req = __lookup_generic_req(monc, handle);
	if (req) {
		*(u64 *)req->buf = ceph_decode_64(&p);
		req->result = 0;
		get_generic_request(req);
	}
	mutex_unlock(&monc->mutex);
	if (req) {
		complete_all(&req->completion);
		put_generic_request(req);
	}

	return;
bad:
	pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

/*
 * Send MMonGetVersion and wait for the reply.
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 */
int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what,
			     u64 *newest)
{
	struct ceph_mon_generic_request *req;
	void *p, *end;
	u64 tid;
	int err;

	req = kzalloc(sizeof(*req), GFP_NOFS);
	if (!req)
		return -ENOMEM;

	kref_init(&req->kref);
	req->buf = newest;
	init_completion(&req->completion);

	req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
				    sizeof(u64) + sizeof(u32) + strlen(what),
				    GFP_NOFS, true);
	if (!req->request) {
		err = -ENOMEM;
		goto out;
	}

	req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 1024,
				  GFP_NOFS, true);
	if (!req->reply) {
		err = -ENOMEM;
		goto out;
	}

	p = req->request->front.iov_base;
	end = p + req->request->front_alloc_len;

	/* fill out request */
	mutex_lock(&monc->mutex);
	tid = ++monc->last_tid;
	ceph_encode_64(&p, tid);  /* handle */
	ceph_encode_string(&p, end, what, strlen(what));

	err = __do_generic_request(monc, tid, req);

	mutex_unlock(&monc->mutex);
out:
	put_generic_request(req);
	return err;
}
EXPORT_SYMBOL(ceph_monc_do_get_version);
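/*
 * Example (hypothetical caller): ask the monitors for the newest
 * osdmap epoch without subscribing to the map itself:
 *
 *	u64 newest;
 *	int err = ceph_monc_do_get_version(monc, "osdmap", &newest);
 */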
/*
 * Resend pending generic requests.
 */
static void __resend_generic_request(struct ceph_mon_client *monc)
{
	struct ceph_mon_generic_request *req;
	struct rb_node *p;

	for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_mon_generic_request, node);
		ceph_msg_revoke(req->request);
		ceph_msg_revoke_incoming(req->reply);
		ceph_con_send(&monc->con, ceph_msg_get(req->request));
	}
}

/*
 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
 * renew/retry subscription as needed (in case it is timing out, or we
 * got an ENOMEM).  And keep the monitor connection alive.
 */
static void delayed_work(struct work_struct *work)
{
	struct ceph_mon_client *monc =
		container_of(work, struct ceph_mon_client, delayed_work.work);

	dout("monc delayed_work\n");
	mutex_lock(&monc->mutex);
	if (monc->hunting) {
		dout("%s continuing hunt\n", __func__);
		reopen_session(monc);
	} else {
		int is_auth = ceph_auth_is_authenticated(monc->auth);

		if (ceph_con_keepalive_expired(&monc->con,
					       CEPH_MONC_PING_TIMEOUT)) {
			dout("monc keepalive timeout\n");
			is_auth = 0;
			reopen_session(monc);
		}

		if (!monc->hunting) {
			ceph_con_keepalive(&monc->con);
			__validate_auth(monc);
		}

		if (is_auth) {
			unsigned long now = jiffies;

			dout("%s renew subs? now %lu renew after %lu\n",
			     __func__, now, monc->sub_renew_after);
			if (time_after_eq(now, monc->sub_renew_after))
				__send_subscribe(monc);
		}
	}
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
}

/*
 * On startup, we build a temporary monmap populated with the IPs
 * provided by mount(2).
 */
static int build_initial_monmap(struct ceph_mon_client *monc)
{
	struct ceph_options *opt = monc->client->options;
	struct ceph_entity_addr *mon_addr = opt->mon_addr;
	int num_mon = opt->num_mon;
	int i;

	/* build initial monmap */
	monc->monmap = kzalloc(sizeof(*monc->monmap) +
			       num_mon*sizeof(monc->monmap->mon_inst[0]),
			       GFP_KERNEL);
	if (!monc->monmap)
		return -ENOMEM;
	for (i = 0; i < num_mon; i++) {
		monc->monmap->mon_inst[i].addr = mon_addr[i];
		monc->monmap->mon_inst[i].addr.nonce = 0;
		monc->monmap->mon_inst[i].name.type =
			CEPH_ENTITY_TYPE_MON;
		monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
	}
	monc->monmap->num_mon = num_mon;
	return 0;
}
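/*
 * Note that the temporary monmap built above has epoch 0 (kzalloc
 * zeroes it) and monitor names simply numbered by their position on
 * the mount command line.  The authoritative monmap, with its real
 * epoch and ordering, replaces it in ceph_monc_handle_map() once a
 * session is established.
 */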
int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
	int err = 0;

	dout("init\n");
	memset(monc, 0, sizeof(*monc));
	monc->client = cl;
	monc->monmap = NULL;
	mutex_init(&monc->mutex);

	err = build_initial_monmap(monc);
	if (err)
		goto out;

	/* connection */
	/* authentication */
	monc->auth = ceph_auth_init(cl->options->name,
				    cl->options->key);
	if (IS_ERR(monc->auth)) {
		err = PTR_ERR(monc->auth);
		goto out_monmap;
	}
	monc->auth->want_keys =
		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
		CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;

	/* msgs */
	err = -ENOMEM;
	monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
				     sizeof(struct ceph_mon_subscribe_ack),
				     GFP_NOFS, true);
	if (!monc->m_subscribe_ack)
		goto out_auth;

	monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS,
					 true);
	if (!monc->m_subscribe)
		goto out_subscribe_ack;

	monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
					  true);
	if (!monc->m_auth_reply)
		goto out_subscribe;

	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
	monc->pending_auth = 0;
	if (!monc->m_auth)
		goto out_auth_reply;

	ceph_con_init(&monc->con, monc, &mon_con_ops,
		      &monc->client->msgr);

	monc->cur_mon = -1;
	monc->had_a_connection = false;
	monc->hunt_mult = 1;

	INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
	monc->generic_request_tree = RB_ROOT;
	monc->num_generic_requests = 0;
	monc->last_tid = 0;

	return 0;

out_auth_reply:
	ceph_msg_put(monc->m_auth_reply);
out_subscribe:
	ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
	ceph_msg_put(monc->m_subscribe_ack);
out_auth:
	ceph_auth_destroy(monc->auth);
out_monmap:
	kfree(monc->monmap);
out:
	return err;
}
EXPORT_SYMBOL(ceph_monc_init);

void ceph_monc_stop(struct ceph_mon_client *monc)
{
	dout("stop\n");
	cancel_delayed_work_sync(&monc->delayed_work);

	mutex_lock(&monc->mutex);
	__close_session(monc);
	monc->cur_mon = -1;
	mutex_unlock(&monc->mutex);

	/*
	 * flush msgr queue before we destroy ourselves to ensure that:
	 *  - any work that references our embedded con is finished.
	 *  - any osd_client or other work that may reference an
	 *    authorizer finishes before we shut down the auth subsystem.
	 */
	ceph_msgr_flush();

	ceph_auth_destroy(monc->auth);

	ceph_msg_put(monc->m_auth);
	ceph_msg_put(monc->m_auth_reply);
	ceph_msg_put(monc->m_subscribe);
	ceph_msg_put(monc->m_subscribe_ack);

	kfree(monc->monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);

static void finish_hunting(struct ceph_mon_client *monc)
{
	if (monc->hunting) {
		dout("%s found mon%d\n", __func__, monc->cur_mon);
		monc->hunting = false;
		monc->had_a_connection = true;
		monc->hunt_mult /= 2;  /* reduce by 50% */
		if (monc->hunt_mult < 1)
			monc->hunt_mult = 1;
	}
}

static void handle_auth_reply(struct ceph_mon_client *monc,
			      struct ceph_msg *msg)
{
	int ret;
	int was_auth = 0;

	mutex_lock(&monc->mutex);
	was_auth = ceph_auth_is_authenticated(monc->auth);
	monc->pending_auth = 0;
	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
				     msg->front.iov_len,
				     monc->m_auth->front.iov_base,
				     monc->m_auth->front_alloc_len);
	if (ret > 0) {
		__send_prepared_auth_request(monc, ret);
		goto out;
	}

	finish_hunting(monc);

	if (ret < 0) {
		monc->client->auth_err = ret;
	} else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
		dout("authenticated, starting session\n");

		monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
		monc->client->msgr.inst.name.num =
					cpu_to_le64(monc->auth->global_id);

		__send_subscribe(monc);
		__resend_generic_request(monc);

		pr_info("mon%d %s session established\n", monc->cur_mon,
			ceph_pr_addr(&monc->con.peer_addr.in_addr));
	}

out:
	mutex_unlock(&monc->mutex);
	if (monc->client->auth_err < 0)
		wake_up_all(&monc->client->auth_wq);
}
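/*
 * Note on the auth state machine above: a positive return from
 * ceph_handle_auth_reply() means another request/reply round trip is
 * needed, and the next request has already been prepared in m_auth;
 * zero means the exchange is complete (ceph_auth_is_authenticated()
 * tells us whether it succeeded); a negative value is a hard
 * authentication error, propagated to waiters via client->auth_err.
 */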
static int __validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	if (monc->pending_auth)
		return 0;

	ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
			      monc->m_auth->front_alloc_len);
	if (ret <= 0)
		return ret;  /* either an error, or no need to authenticate */
	__send_prepared_auth_request(monc, ret);
	return 0;
}

int ceph_monc_validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	mutex_lock(&monc->mutex);
	ret = __validate_auth(monc);
	mutex_unlock(&monc->mutex);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_validate_auth);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(msg->hdr.type);

	if (!monc)
		return;

	switch (type) {
	case CEPH_MSG_AUTH_REPLY:
		handle_auth_reply(monc, msg);
		break;

	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		handle_subscribe_ack(monc, msg);
		break;

	case CEPH_MSG_STATFS_REPLY:
		handle_statfs_reply(monc, msg);
		break;

	case CEPH_MSG_MON_GET_VERSION_REPLY:
		handle_get_version_reply(monc, msg);
		break;

	case CEPH_MSG_MON_MAP:
		ceph_monc_handle_map(monc, msg);
		break;

	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(&monc->client->osdc, msg);
		break;

	default:
		/* can the chained handler handle it? */
		if (monc->client->extra_mon_dispatch &&
		    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
			break;

		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
	ceph_msg_put(msg);
}

/*
 * Allocate memory for incoming message
 */
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
				      struct ceph_msg_header *hdr,
				      int *skip)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(hdr->type);
	int front_len = le32_to_cpu(hdr->front_len);
	struct ceph_msg *m = NULL;

	*skip = 0;

	switch (type) {
	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		m = ceph_msg_get(monc->m_subscribe_ack);
		break;
	case CEPH_MSG_STATFS_REPLY:
		return get_generic_reply(con, hdr, skip);
	case CEPH_MSG_AUTH_REPLY:
		m = ceph_msg_get(monc->m_auth_reply);
		break;
	case CEPH_MSG_MON_GET_VERSION_REPLY:
		if (le64_to_cpu(hdr->tid) != 0)
			return get_generic_reply(con, hdr, skip);

		/*
		 * Older OSDs don't set reply tid even if the original
		 * request had a non-zero tid.  Work around this
		 * weirdness by falling through to the allocate case.
		 */
	case CEPH_MSG_MON_MAP:
	case CEPH_MSG_MDS_MAP:
	case CEPH_MSG_OSD_MAP:
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
		if (!m)
			return NULL;	/* ENOMEM--return skip == 0 */
		break;
	}

	if (!m) {
		pr_info("alloc_msg unknown type %d\n", type);
		*skip = 1;
	} else if (front_len > m->front_alloc_len) {
		pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
			front_len, m->front_alloc_len,
			(unsigned int)con->peer_name.type,
			le64_to_cpu(con->peer_name.num));
		ceph_msg_put(m);
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
	}

	return m;
}
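/*
 * In mon_alloc_msg() above, returning NULL with *skip == 0 reports an
 * allocation failure to the messenger, whereas *skip == 1 tells it to
 * consume and discard the incoming message entirely, e.g. for an
 * unknown type or a reply whose tid is no longer registered.
 */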
/*
 * If the monitor connection resets, pick a new monitor and resubmit
 * any pending requests.
 */
static void mon_fault(struct ceph_connection *con)
{
	struct ceph_mon_client *monc = con->private;

	mutex_lock(&monc->mutex);
	dout("%s mon%d\n", __func__, monc->cur_mon);
	if (monc->cur_mon >= 0) {
		if (!monc->hunting) {
			dout("%s hunting for new mon\n", __func__);
			reopen_session(monc);
			__schedule_delayed(monc);
		} else {
			dout("%s already hunting\n", __func__);
		}
	}
	mutex_unlock(&monc->mutex);
}

/*
 * We can ignore refcounting on the connection struct, as all references
 * will come from the messenger workqueue, which is drained prior to
 * mon_client destruction.
 */
static struct ceph_connection *con_get(struct ceph_connection *con)
{
	return con;
}

static void con_put(struct ceph_connection *con)
{
}

static const struct ceph_connection_operations mon_con_ops = {
	.get = con_get,
	.put = con_put,
	.dispatch = dispatch,
	.fault = mon_fault,
	.alloc_msg = mon_alloc_msg,
};