1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/module.h> 4 #include <linux/types.h> 5 #include <linux/slab.h> 6 #include <linux/random.h> 7 #include <linux/sched.h> 8 9 #include <linux/ceph/mon_client.h> 10 #include <linux/ceph/libceph.h> 11 #include <linux/ceph/debugfs.h> 12 #include <linux/ceph/decode.h> 13 #include <linux/ceph/auth.h> 14 15 /* 16 * Interact with Ceph monitor cluster. Handle requests for new map 17 * versions, and periodically resend as needed. Also implement 18 * statfs() and umount(). 19 * 20 * A small cluster of Ceph "monitors" are responsible for managing critical 21 * cluster configuration and state information. An odd number (e.g., 3, 5) 22 * of cmon daemons use a modified version of the Paxos part-time parliament 23 * algorithm to manage the MDS map (mds cluster membership), OSD map, and 24 * list of clients who have mounted the file system. 25 * 26 * We maintain an open, active session with a monitor at all times in order to 27 * receive timely MDSMap updates. We periodically send a keepalive byte on the 28 * TCP socket to ensure we detect a failure. If the connection does break, we 29 * randomly hunt for a new monitor. Once the connection is reestablished, we 30 * resend any outstanding requests. 31 */ 32 33 static const struct ceph_connection_operations mon_con_ops; 34 35 static int __validate_auth(struct ceph_mon_client *monc); 36 37 /* 38 * Decode a monmap blob (e.g., during mount). 39 */ 40 struct ceph_monmap *ceph_monmap_decode(void *p, void *end) 41 { 42 struct ceph_monmap *m = NULL; 43 int i, err = -EINVAL; 44 struct ceph_fsid fsid; 45 u32 epoch, num_mon; 46 u16 version; 47 u32 len; 48 49 ceph_decode_32_safe(&p, end, len, bad); 50 ceph_decode_need(&p, end, len, bad); 51 52 dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p)); 53 54 ceph_decode_16_safe(&p, end, version, bad); 55 56 ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad); 57 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 58 epoch = ceph_decode_32(&p); 59 60 num_mon = ceph_decode_32(&p); 61 ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad); 62 63 if (num_mon >= CEPH_MAX_MON) 64 goto bad; 65 m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS); 66 if (m == NULL) 67 return ERR_PTR(-ENOMEM); 68 m->fsid = fsid; 69 m->epoch = epoch; 70 m->num_mon = num_mon; 71 ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0])); 72 for (i = 0; i < num_mon; i++) 73 ceph_decode_addr(&m->mon_inst[i].addr); 74 75 dout("monmap_decode epoch %d, num_mon %d\n", m->epoch, 76 m->num_mon); 77 for (i = 0; i < m->num_mon; i++) 78 dout("monmap_decode mon%d is %s\n", i, 79 ceph_pr_addr(&m->mon_inst[i].addr.in_addr)); 80 return m; 81 82 bad: 83 dout("monmap_decode failed with %d\n", err); 84 kfree(m); 85 return ERR_PTR(err); 86 } 87 88 /* 89 * return true if *addr is included in the monmap. 90 */ 91 int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr) 92 { 93 int i; 94 95 for (i = 0; i < m->num_mon; i++) 96 if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0) 97 return 1; 98 return 0; 99 } 100 101 /* 102 * Send an auth request. 103 */ 104 static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) 105 { 106 monc->pending_auth = 1; 107 monc->m_auth->front.iov_len = len; 108 monc->m_auth->hdr.front_len = cpu_to_le32(len); 109 ceph_msg_revoke(monc->m_auth); 110 ceph_msg_get(monc->m_auth); /* keep our ref */ 111 ceph_con_send(&monc->con, monc->m_auth); 112 } 113 114 /* 115 * Close monitor session, if any. 116 */ 117 static void __close_session(struct ceph_mon_client *monc) 118 { 119 dout("__close_session closing mon%d\n", monc->cur_mon); 120 ceph_msg_revoke(monc->m_auth); 121 ceph_msg_revoke_incoming(monc->m_auth_reply); 122 ceph_msg_revoke(monc->m_subscribe); 123 ceph_msg_revoke_incoming(monc->m_subscribe_ack); 124 ceph_con_close(&monc->con); 125 monc->cur_mon = -1; 126 monc->pending_auth = 0; 127 ceph_auth_reset(monc->auth); 128 } 129 130 /* 131 * Open a session with a (new) monitor. 132 */ 133 static int __open_session(struct ceph_mon_client *monc) 134 { 135 char r; 136 int ret; 137 138 if (monc->cur_mon < 0) { 139 get_random_bytes(&r, 1); 140 monc->cur_mon = r % monc->monmap->num_mon; 141 dout("open_session num=%d r=%d -> mon%d\n", 142 monc->monmap->num_mon, r, monc->cur_mon); 143 monc->sub_sent = 0; 144 monc->sub_renew_after = jiffies; /* i.e., expired */ 145 monc->want_next_osdmap = !!monc->want_next_osdmap; 146 147 dout("open_session mon%d opening\n", monc->cur_mon); 148 ceph_con_open(&monc->con, 149 CEPH_ENTITY_TYPE_MON, monc->cur_mon, 150 &monc->monmap->mon_inst[monc->cur_mon].addr); 151 152 /* send an initial keepalive to ensure our timestamp is 153 * valid by the time we are in an OPENED state */ 154 ceph_con_keepalive(&monc->con); 155 156 /* initiatiate authentication handshake */ 157 ret = ceph_auth_build_hello(monc->auth, 158 monc->m_auth->front.iov_base, 159 monc->m_auth->front_alloc_len); 160 __send_prepared_auth_request(monc, ret); 161 } else { 162 dout("open_session mon%d already open\n", monc->cur_mon); 163 } 164 return 0; 165 } 166 167 static bool __sub_expired(struct ceph_mon_client *monc) 168 { 169 return time_after_eq(jiffies, monc->sub_renew_after); 170 } 171 172 /* 173 * Reschedule delayed work timer. 174 */ 175 static void __schedule_delayed(struct ceph_mon_client *monc) 176 { 177 struct ceph_options *opt = monc->client->options; 178 unsigned long delay; 179 180 if (monc->cur_mon < 0 || __sub_expired(monc)) { 181 delay = 10 * HZ; 182 } else { 183 delay = 20 * HZ; 184 if (opt->monc_ping_timeout > 0) 185 delay = min(delay, opt->monc_ping_timeout / 3); 186 } 187 dout("__schedule_delayed after %lu\n", delay); 188 schedule_delayed_work(&monc->delayed_work, 189 round_jiffies_relative(delay)); 190 } 191 192 /* 193 * Send subscribe request for mdsmap and/or osdmap. 194 */ 195 static void __send_subscribe(struct ceph_mon_client *monc) 196 { 197 dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n", 198 (unsigned int)monc->sub_sent, __sub_expired(monc), 199 monc->want_next_osdmap); 200 if ((__sub_expired(monc) && !monc->sub_sent) || 201 monc->want_next_osdmap == 1) { 202 struct ceph_msg *msg = monc->m_subscribe; 203 struct ceph_mon_subscribe_item *i; 204 void *p, *end; 205 int num; 206 207 p = msg->front.iov_base; 208 end = p + msg->front_alloc_len; 209 210 num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap; 211 ceph_encode_32(&p, num); 212 213 if (monc->want_next_osdmap) { 214 dout("__send_subscribe to 'osdmap' %u\n", 215 (unsigned int)monc->have_osdmap); 216 ceph_encode_string(&p, end, "osdmap", 6); 217 i = p; 218 i->have = cpu_to_le64(monc->have_osdmap); 219 i->onetime = 1; 220 p += sizeof(*i); 221 monc->want_next_osdmap = 2; /* requested */ 222 } 223 if (monc->want_mdsmap) { 224 dout("__send_subscribe to 'mdsmap' %u+\n", 225 (unsigned int)monc->have_mdsmap); 226 ceph_encode_string(&p, end, "mdsmap", 6); 227 i = p; 228 i->have = cpu_to_le64(monc->have_mdsmap); 229 i->onetime = 0; 230 p += sizeof(*i); 231 } 232 ceph_encode_string(&p, end, "monmap", 6); 233 i = p; 234 i->have = 0; 235 i->onetime = 0; 236 p += sizeof(*i); 237 238 msg->front.iov_len = p - msg->front.iov_base; 239 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 240 ceph_msg_revoke(msg); 241 ceph_con_send(&monc->con, ceph_msg_get(msg)); 242 243 monc->sub_sent = jiffies | 1; /* never 0 */ 244 } 245 } 246 247 static void handle_subscribe_ack(struct ceph_mon_client *monc, 248 struct ceph_msg *msg) 249 { 250 unsigned int seconds; 251 struct ceph_mon_subscribe_ack *h = msg->front.iov_base; 252 253 if (msg->front.iov_len < sizeof(*h)) 254 goto bad; 255 seconds = le32_to_cpu(h->duration); 256 257 mutex_lock(&monc->mutex); 258 if (monc->hunting) { 259 pr_info("mon%d %s session established\n", 260 monc->cur_mon, 261 ceph_pr_addr(&monc->con.peer_addr.in_addr)); 262 monc->hunting = false; 263 } 264 dout("handle_subscribe_ack after %d seconds\n", seconds); 265 monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1; 266 monc->sub_sent = 0; 267 mutex_unlock(&monc->mutex); 268 return; 269 bad: 270 pr_err("got corrupt subscribe-ack msg\n"); 271 ceph_msg_dump(msg); 272 } 273 274 /* 275 * Keep track of which maps we have 276 */ 277 int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got) 278 { 279 mutex_lock(&monc->mutex); 280 monc->have_mdsmap = got; 281 mutex_unlock(&monc->mutex); 282 return 0; 283 } 284 EXPORT_SYMBOL(ceph_monc_got_mdsmap); 285 286 int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got) 287 { 288 mutex_lock(&monc->mutex); 289 monc->have_osdmap = got; 290 monc->want_next_osdmap = 0; 291 mutex_unlock(&monc->mutex); 292 return 0; 293 } 294 295 /* 296 * Register interest in the next osdmap 297 */ 298 void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc) 299 { 300 dout("request_next_osdmap have %u\n", monc->have_osdmap); 301 mutex_lock(&monc->mutex); 302 if (!monc->want_next_osdmap) 303 monc->want_next_osdmap = 1; 304 if (monc->want_next_osdmap < 2) 305 __send_subscribe(monc); 306 mutex_unlock(&monc->mutex); 307 } 308 EXPORT_SYMBOL(ceph_monc_request_next_osdmap); 309 310 /* 311 * Wait for an osdmap with a given epoch. 312 * 313 * @epoch: epoch to wait for 314 * @timeout: in jiffies, 0 means "wait forever" 315 */ 316 int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, 317 unsigned long timeout) 318 { 319 unsigned long started = jiffies; 320 long ret; 321 322 mutex_lock(&monc->mutex); 323 while (monc->have_osdmap < epoch) { 324 mutex_unlock(&monc->mutex); 325 326 if (timeout && time_after_eq(jiffies, started + timeout)) 327 return -ETIMEDOUT; 328 329 ret = wait_event_interruptible_timeout(monc->client->auth_wq, 330 monc->have_osdmap >= epoch, 331 ceph_timeout_jiffies(timeout)); 332 if (ret < 0) 333 return ret; 334 335 mutex_lock(&monc->mutex); 336 } 337 338 mutex_unlock(&monc->mutex); 339 return 0; 340 } 341 EXPORT_SYMBOL(ceph_monc_wait_osdmap); 342 343 /* 344 * 345 */ 346 int ceph_monc_open_session(struct ceph_mon_client *monc) 347 { 348 mutex_lock(&monc->mutex); 349 __open_session(monc); 350 __schedule_delayed(monc); 351 mutex_unlock(&monc->mutex); 352 return 0; 353 } 354 EXPORT_SYMBOL(ceph_monc_open_session); 355 356 /* 357 * We require the fsid and global_id in order to initialize our 358 * debugfs dir. 359 */ 360 static bool have_debugfs_info(struct ceph_mon_client *monc) 361 { 362 dout("have_debugfs_info fsid %d globalid %lld\n", 363 (int)monc->client->have_fsid, monc->auth->global_id); 364 return monc->client->have_fsid && monc->auth->global_id > 0; 365 } 366 367 static void ceph_monc_handle_map(struct ceph_mon_client *monc, 368 struct ceph_msg *msg) 369 { 370 struct ceph_client *client = monc->client; 371 struct ceph_monmap *monmap = NULL, *old = monc->monmap; 372 void *p, *end; 373 int had_debugfs_info, init_debugfs = 0; 374 375 mutex_lock(&monc->mutex); 376 377 had_debugfs_info = have_debugfs_info(monc); 378 379 dout("handle_monmap\n"); 380 p = msg->front.iov_base; 381 end = p + msg->front.iov_len; 382 383 monmap = ceph_monmap_decode(p, end); 384 if (IS_ERR(monmap)) { 385 pr_err("problem decoding monmap, %d\n", 386 (int)PTR_ERR(monmap)); 387 goto out; 388 } 389 390 if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) { 391 kfree(monmap); 392 goto out; 393 } 394 395 client->monc.monmap = monmap; 396 kfree(old); 397 398 if (!client->have_fsid) { 399 client->have_fsid = true; 400 if (!had_debugfs_info && have_debugfs_info(monc)) { 401 pr_info("client%lld fsid %pU\n", 402 ceph_client_id(monc->client), 403 &monc->client->fsid); 404 init_debugfs = 1; 405 } 406 mutex_unlock(&monc->mutex); 407 408 if (init_debugfs) { 409 /* 410 * do debugfs initialization without mutex to avoid 411 * creating a locking dependency 412 */ 413 ceph_debugfs_client_init(monc->client); 414 } 415 416 goto out_unlocked; 417 } 418 out: 419 mutex_unlock(&monc->mutex); 420 out_unlocked: 421 wake_up_all(&client->auth_wq); 422 } 423 424 /* 425 * generic requests (currently statfs, mon_get_version) 426 */ 427 static struct ceph_mon_generic_request *__lookup_generic_req( 428 struct ceph_mon_client *monc, u64 tid) 429 { 430 struct ceph_mon_generic_request *req; 431 struct rb_node *n = monc->generic_request_tree.rb_node; 432 433 while (n) { 434 req = rb_entry(n, struct ceph_mon_generic_request, node); 435 if (tid < req->tid) 436 n = n->rb_left; 437 else if (tid > req->tid) 438 n = n->rb_right; 439 else 440 return req; 441 } 442 return NULL; 443 } 444 445 static void __insert_generic_request(struct ceph_mon_client *monc, 446 struct ceph_mon_generic_request *new) 447 { 448 struct rb_node **p = &monc->generic_request_tree.rb_node; 449 struct rb_node *parent = NULL; 450 struct ceph_mon_generic_request *req = NULL; 451 452 while (*p) { 453 parent = *p; 454 req = rb_entry(parent, struct ceph_mon_generic_request, node); 455 if (new->tid < req->tid) 456 p = &(*p)->rb_left; 457 else if (new->tid > req->tid) 458 p = &(*p)->rb_right; 459 else 460 BUG(); 461 } 462 463 rb_link_node(&new->node, parent, p); 464 rb_insert_color(&new->node, &monc->generic_request_tree); 465 } 466 467 static void release_generic_request(struct kref *kref) 468 { 469 struct ceph_mon_generic_request *req = 470 container_of(kref, struct ceph_mon_generic_request, kref); 471 472 if (req->reply) 473 ceph_msg_put(req->reply); 474 if (req->request) 475 ceph_msg_put(req->request); 476 477 kfree(req); 478 } 479 480 static void put_generic_request(struct ceph_mon_generic_request *req) 481 { 482 kref_put(&req->kref, release_generic_request); 483 } 484 485 static void get_generic_request(struct ceph_mon_generic_request *req) 486 { 487 kref_get(&req->kref); 488 } 489 490 static struct ceph_msg *get_generic_reply(struct ceph_connection *con, 491 struct ceph_msg_header *hdr, 492 int *skip) 493 { 494 struct ceph_mon_client *monc = con->private; 495 struct ceph_mon_generic_request *req; 496 u64 tid = le64_to_cpu(hdr->tid); 497 struct ceph_msg *m; 498 499 mutex_lock(&monc->mutex); 500 req = __lookup_generic_req(monc, tid); 501 if (!req) { 502 dout("get_generic_reply %lld dne\n", tid); 503 *skip = 1; 504 m = NULL; 505 } else { 506 dout("get_generic_reply %lld got %p\n", tid, req->reply); 507 *skip = 0; 508 m = ceph_msg_get(req->reply); 509 /* 510 * we don't need to track the connection reading into 511 * this reply because we only have one open connection 512 * at a time, ever. 513 */ 514 } 515 mutex_unlock(&monc->mutex); 516 return m; 517 } 518 519 static int __do_generic_request(struct ceph_mon_client *monc, u64 tid, 520 struct ceph_mon_generic_request *req) 521 { 522 int err; 523 524 /* register request */ 525 req->tid = tid != 0 ? tid : ++monc->last_tid; 526 req->request->hdr.tid = cpu_to_le64(req->tid); 527 __insert_generic_request(monc, req); 528 monc->num_generic_requests++; 529 ceph_con_send(&monc->con, ceph_msg_get(req->request)); 530 mutex_unlock(&monc->mutex); 531 532 err = wait_for_completion_interruptible(&req->completion); 533 534 mutex_lock(&monc->mutex); 535 rb_erase(&req->node, &monc->generic_request_tree); 536 monc->num_generic_requests--; 537 538 if (!err) 539 err = req->result; 540 return err; 541 } 542 543 static int do_generic_request(struct ceph_mon_client *monc, 544 struct ceph_mon_generic_request *req) 545 { 546 int err; 547 548 mutex_lock(&monc->mutex); 549 err = __do_generic_request(monc, 0, req); 550 mutex_unlock(&monc->mutex); 551 552 return err; 553 } 554 555 /* 556 * statfs 557 */ 558 static void handle_statfs_reply(struct ceph_mon_client *monc, 559 struct ceph_msg *msg) 560 { 561 struct ceph_mon_generic_request *req; 562 struct ceph_mon_statfs_reply *reply = msg->front.iov_base; 563 u64 tid = le64_to_cpu(msg->hdr.tid); 564 565 if (msg->front.iov_len != sizeof(*reply)) 566 goto bad; 567 dout("handle_statfs_reply %p tid %llu\n", msg, tid); 568 569 mutex_lock(&monc->mutex); 570 req = __lookup_generic_req(monc, tid); 571 if (req) { 572 *(struct ceph_statfs *)req->buf = reply->st; 573 req->result = 0; 574 get_generic_request(req); 575 } 576 mutex_unlock(&monc->mutex); 577 if (req) { 578 complete_all(&req->completion); 579 put_generic_request(req); 580 } 581 return; 582 583 bad: 584 pr_err("corrupt statfs reply, tid %llu\n", tid); 585 ceph_msg_dump(msg); 586 } 587 588 /* 589 * Do a synchronous statfs(). 590 */ 591 int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) 592 { 593 struct ceph_mon_generic_request *req; 594 struct ceph_mon_statfs *h; 595 int err; 596 597 req = kzalloc(sizeof(*req), GFP_NOFS); 598 if (!req) 599 return -ENOMEM; 600 601 kref_init(&req->kref); 602 req->buf = buf; 603 init_completion(&req->completion); 604 605 err = -ENOMEM; 606 req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS, 607 true); 608 if (!req->request) 609 goto out; 610 req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS, 611 true); 612 if (!req->reply) 613 goto out; 614 615 /* fill out request */ 616 h = req->request->front.iov_base; 617 h->monhdr.have_version = 0; 618 h->monhdr.session_mon = cpu_to_le16(-1); 619 h->monhdr.session_mon_tid = 0; 620 h->fsid = monc->monmap->fsid; 621 622 err = do_generic_request(monc, req); 623 624 out: 625 put_generic_request(req); 626 return err; 627 } 628 EXPORT_SYMBOL(ceph_monc_do_statfs); 629 630 static void handle_get_version_reply(struct ceph_mon_client *monc, 631 struct ceph_msg *msg) 632 { 633 struct ceph_mon_generic_request *req; 634 u64 tid = le64_to_cpu(msg->hdr.tid); 635 void *p = msg->front.iov_base; 636 void *end = p + msg->front_alloc_len; 637 u64 handle; 638 639 dout("%s %p tid %llu\n", __func__, msg, tid); 640 641 ceph_decode_need(&p, end, 2*sizeof(u64), bad); 642 handle = ceph_decode_64(&p); 643 if (tid != 0 && tid != handle) 644 goto bad; 645 646 mutex_lock(&monc->mutex); 647 req = __lookup_generic_req(monc, handle); 648 if (req) { 649 *(u64 *)req->buf = ceph_decode_64(&p); 650 req->result = 0; 651 get_generic_request(req); 652 } 653 mutex_unlock(&monc->mutex); 654 if (req) { 655 complete_all(&req->completion); 656 put_generic_request(req); 657 } 658 659 return; 660 bad: 661 pr_err("corrupt mon_get_version reply, tid %llu\n", tid); 662 ceph_msg_dump(msg); 663 } 664 665 /* 666 * Send MMonGetVersion and wait for the reply. 667 * 668 * @what: one of "mdsmap", "osdmap" or "monmap" 669 */ 670 int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what, 671 u64 *newest) 672 { 673 struct ceph_mon_generic_request *req; 674 void *p, *end; 675 u64 tid; 676 int err; 677 678 req = kzalloc(sizeof(*req), GFP_NOFS); 679 if (!req) 680 return -ENOMEM; 681 682 kref_init(&req->kref); 683 req->buf = newest; 684 init_completion(&req->completion); 685 686 req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION, 687 sizeof(u64) + sizeof(u32) + strlen(what), 688 GFP_NOFS, true); 689 if (!req->request) { 690 err = -ENOMEM; 691 goto out; 692 } 693 694 req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 1024, 695 GFP_NOFS, true); 696 if (!req->reply) { 697 err = -ENOMEM; 698 goto out; 699 } 700 701 p = req->request->front.iov_base; 702 end = p + req->request->front_alloc_len; 703 704 /* fill out request */ 705 mutex_lock(&monc->mutex); 706 tid = ++monc->last_tid; 707 ceph_encode_64(&p, tid); /* handle */ 708 ceph_encode_string(&p, end, what, strlen(what)); 709 710 err = __do_generic_request(monc, tid, req); 711 712 mutex_unlock(&monc->mutex); 713 out: 714 put_generic_request(req); 715 return err; 716 } 717 EXPORT_SYMBOL(ceph_monc_do_get_version); 718 719 /* 720 * Resend pending generic requests. 721 */ 722 static void __resend_generic_request(struct ceph_mon_client *monc) 723 { 724 struct ceph_mon_generic_request *req; 725 struct rb_node *p; 726 727 for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) { 728 req = rb_entry(p, struct ceph_mon_generic_request, node); 729 ceph_msg_revoke(req->request); 730 ceph_msg_revoke_incoming(req->reply); 731 ceph_con_send(&monc->con, ceph_msg_get(req->request)); 732 } 733 } 734 735 /* 736 * Delayed work. If we haven't mounted yet, retry. Otherwise, 737 * renew/retry subscription as needed (in case it is timing out, or we 738 * got an ENOMEM). And keep the monitor connection alive. 739 */ 740 static void delayed_work(struct work_struct *work) 741 { 742 struct ceph_mon_client *monc = 743 container_of(work, struct ceph_mon_client, delayed_work.work); 744 745 dout("monc delayed_work\n"); 746 mutex_lock(&monc->mutex); 747 if (monc->hunting) { 748 __close_session(monc); 749 __open_session(monc); /* continue hunting */ 750 } else { 751 struct ceph_options *opt = monc->client->options; 752 int is_auth = ceph_auth_is_authenticated(monc->auth); 753 if (ceph_con_keepalive_expired(&monc->con, 754 opt->monc_ping_timeout)) { 755 dout("monc keepalive timeout\n"); 756 is_auth = 0; 757 __close_session(monc); 758 monc->hunting = true; 759 __open_session(monc); 760 } 761 762 if (!monc->hunting) { 763 ceph_con_keepalive(&monc->con); 764 __validate_auth(monc); 765 } 766 767 if (is_auth) 768 __send_subscribe(monc); 769 } 770 __schedule_delayed(monc); 771 mutex_unlock(&monc->mutex); 772 } 773 774 /* 775 * On startup, we build a temporary monmap populated with the IPs 776 * provided by mount(2). 777 */ 778 static int build_initial_monmap(struct ceph_mon_client *monc) 779 { 780 struct ceph_options *opt = monc->client->options; 781 struct ceph_entity_addr *mon_addr = opt->mon_addr; 782 int num_mon = opt->num_mon; 783 int i; 784 785 /* build initial monmap */ 786 monc->monmap = kzalloc(sizeof(*monc->monmap) + 787 num_mon*sizeof(monc->monmap->mon_inst[0]), 788 GFP_KERNEL); 789 if (!monc->monmap) 790 return -ENOMEM; 791 for (i = 0; i < num_mon; i++) { 792 monc->monmap->mon_inst[i].addr = mon_addr[i]; 793 monc->monmap->mon_inst[i].addr.nonce = 0; 794 monc->monmap->mon_inst[i].name.type = 795 CEPH_ENTITY_TYPE_MON; 796 monc->monmap->mon_inst[i].name.num = cpu_to_le64(i); 797 } 798 monc->monmap->num_mon = num_mon; 799 return 0; 800 } 801 802 int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) 803 { 804 int err = 0; 805 806 dout("init\n"); 807 memset(monc, 0, sizeof(*monc)); 808 monc->client = cl; 809 monc->monmap = NULL; 810 mutex_init(&monc->mutex); 811 812 err = build_initial_monmap(monc); 813 if (err) 814 goto out; 815 816 /* connection */ 817 /* authentication */ 818 monc->auth = ceph_auth_init(cl->options->name, 819 cl->options->key); 820 if (IS_ERR(monc->auth)) { 821 err = PTR_ERR(monc->auth); 822 goto out_monmap; 823 } 824 monc->auth->want_keys = 825 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON | 826 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS; 827 828 /* msgs */ 829 err = -ENOMEM; 830 monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK, 831 sizeof(struct ceph_mon_subscribe_ack), 832 GFP_NOFS, true); 833 if (!monc->m_subscribe_ack) 834 goto out_auth; 835 836 monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS, 837 true); 838 if (!monc->m_subscribe) 839 goto out_subscribe_ack; 840 841 monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS, 842 true); 843 if (!monc->m_auth_reply) 844 goto out_subscribe; 845 846 monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true); 847 monc->pending_auth = 0; 848 if (!monc->m_auth) 849 goto out_auth_reply; 850 851 ceph_con_init(&monc->con, monc, &mon_con_ops, 852 &monc->client->msgr); 853 854 monc->cur_mon = -1; 855 monc->hunting = true; 856 monc->sub_renew_after = jiffies; 857 monc->sub_sent = 0; 858 859 INIT_DELAYED_WORK(&monc->delayed_work, delayed_work); 860 monc->generic_request_tree = RB_ROOT; 861 monc->num_generic_requests = 0; 862 monc->last_tid = 0; 863 864 monc->have_mdsmap = 0; 865 monc->have_osdmap = 0; 866 monc->want_next_osdmap = 1; 867 return 0; 868 869 out_auth_reply: 870 ceph_msg_put(monc->m_auth_reply); 871 out_subscribe: 872 ceph_msg_put(monc->m_subscribe); 873 out_subscribe_ack: 874 ceph_msg_put(monc->m_subscribe_ack); 875 out_auth: 876 ceph_auth_destroy(monc->auth); 877 out_monmap: 878 kfree(monc->monmap); 879 out: 880 return err; 881 } 882 EXPORT_SYMBOL(ceph_monc_init); 883 884 void ceph_monc_stop(struct ceph_mon_client *monc) 885 { 886 dout("stop\n"); 887 cancel_delayed_work_sync(&monc->delayed_work); 888 889 mutex_lock(&monc->mutex); 890 __close_session(monc); 891 892 mutex_unlock(&monc->mutex); 893 894 /* 895 * flush msgr queue before we destroy ourselves to ensure that: 896 * - any work that references our embedded con is finished. 897 * - any osd_client or other work that may reference an authorizer 898 * finishes before we shut down the auth subsystem. 899 */ 900 ceph_msgr_flush(); 901 902 ceph_auth_destroy(monc->auth); 903 904 ceph_msg_put(monc->m_auth); 905 ceph_msg_put(monc->m_auth_reply); 906 ceph_msg_put(monc->m_subscribe); 907 ceph_msg_put(monc->m_subscribe_ack); 908 909 kfree(monc->monmap); 910 } 911 EXPORT_SYMBOL(ceph_monc_stop); 912 913 static void handle_auth_reply(struct ceph_mon_client *monc, 914 struct ceph_msg *msg) 915 { 916 int ret; 917 int was_auth = 0; 918 int had_debugfs_info, init_debugfs = 0; 919 920 mutex_lock(&monc->mutex); 921 had_debugfs_info = have_debugfs_info(monc); 922 was_auth = ceph_auth_is_authenticated(monc->auth); 923 monc->pending_auth = 0; 924 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, 925 msg->front.iov_len, 926 monc->m_auth->front.iov_base, 927 monc->m_auth->front_alloc_len); 928 if (ret < 0) { 929 monc->client->auth_err = ret; 930 wake_up_all(&monc->client->auth_wq); 931 } else if (ret > 0) { 932 __send_prepared_auth_request(monc, ret); 933 } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) { 934 dout("authenticated, starting session\n"); 935 936 monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT; 937 monc->client->msgr.inst.name.num = 938 cpu_to_le64(monc->auth->global_id); 939 940 __send_subscribe(monc); 941 __resend_generic_request(monc); 942 } 943 944 if (!had_debugfs_info && have_debugfs_info(monc)) { 945 pr_info("client%lld fsid %pU\n", 946 ceph_client_id(monc->client), 947 &monc->client->fsid); 948 init_debugfs = 1; 949 } 950 mutex_unlock(&monc->mutex); 951 952 if (init_debugfs) { 953 /* 954 * do debugfs initialization without mutex to avoid 955 * creating a locking dependency 956 */ 957 ceph_debugfs_client_init(monc->client); 958 } 959 } 960 961 static int __validate_auth(struct ceph_mon_client *monc) 962 { 963 int ret; 964 965 if (monc->pending_auth) 966 return 0; 967 968 ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base, 969 monc->m_auth->front_alloc_len); 970 if (ret <= 0) 971 return ret; /* either an error, or no need to authenticate */ 972 __send_prepared_auth_request(monc, ret); 973 return 0; 974 } 975 976 int ceph_monc_validate_auth(struct ceph_mon_client *monc) 977 { 978 int ret; 979 980 mutex_lock(&monc->mutex); 981 ret = __validate_auth(monc); 982 mutex_unlock(&monc->mutex); 983 return ret; 984 } 985 EXPORT_SYMBOL(ceph_monc_validate_auth); 986 987 /* 988 * handle incoming message 989 */ 990 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 991 { 992 struct ceph_mon_client *monc = con->private; 993 int type = le16_to_cpu(msg->hdr.type); 994 995 if (!monc) 996 return; 997 998 switch (type) { 999 case CEPH_MSG_AUTH_REPLY: 1000 handle_auth_reply(monc, msg); 1001 break; 1002 1003 case CEPH_MSG_MON_SUBSCRIBE_ACK: 1004 handle_subscribe_ack(monc, msg); 1005 break; 1006 1007 case CEPH_MSG_STATFS_REPLY: 1008 handle_statfs_reply(monc, msg); 1009 break; 1010 1011 case CEPH_MSG_MON_GET_VERSION_REPLY: 1012 handle_get_version_reply(monc, msg); 1013 break; 1014 1015 case CEPH_MSG_MON_MAP: 1016 ceph_monc_handle_map(monc, msg); 1017 break; 1018 1019 case CEPH_MSG_OSD_MAP: 1020 ceph_osdc_handle_map(&monc->client->osdc, msg); 1021 break; 1022 1023 default: 1024 /* can the chained handler handle it? */ 1025 if (monc->client->extra_mon_dispatch && 1026 monc->client->extra_mon_dispatch(monc->client, msg) == 0) 1027 break; 1028 1029 pr_err("received unknown message type %d %s\n", type, 1030 ceph_msg_type_name(type)); 1031 } 1032 ceph_msg_put(msg); 1033 } 1034 1035 /* 1036 * Allocate memory for incoming message 1037 */ 1038 static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, 1039 struct ceph_msg_header *hdr, 1040 int *skip) 1041 { 1042 struct ceph_mon_client *monc = con->private; 1043 int type = le16_to_cpu(hdr->type); 1044 int front_len = le32_to_cpu(hdr->front_len); 1045 struct ceph_msg *m = NULL; 1046 1047 *skip = 0; 1048 1049 switch (type) { 1050 case CEPH_MSG_MON_SUBSCRIBE_ACK: 1051 m = ceph_msg_get(monc->m_subscribe_ack); 1052 break; 1053 case CEPH_MSG_STATFS_REPLY: 1054 return get_generic_reply(con, hdr, skip); 1055 case CEPH_MSG_AUTH_REPLY: 1056 m = ceph_msg_get(monc->m_auth_reply); 1057 break; 1058 case CEPH_MSG_MON_GET_VERSION_REPLY: 1059 if (le64_to_cpu(hdr->tid) != 0) 1060 return get_generic_reply(con, hdr, skip); 1061 1062 /* 1063 * Older OSDs don't set reply tid even if the orignal 1064 * request had a non-zero tid. Workaround this weirdness 1065 * by falling through to the allocate case. 1066 */ 1067 case CEPH_MSG_MON_MAP: 1068 case CEPH_MSG_MDS_MAP: 1069 case CEPH_MSG_OSD_MAP: 1070 m = ceph_msg_new(type, front_len, GFP_NOFS, false); 1071 if (!m) 1072 return NULL; /* ENOMEM--return skip == 0 */ 1073 break; 1074 } 1075 1076 if (!m) { 1077 pr_info("alloc_msg unknown type %d\n", type); 1078 *skip = 1; 1079 } else if (front_len > m->front_alloc_len) { 1080 pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n", 1081 front_len, m->front_alloc_len, 1082 (unsigned int)con->peer_name.type, 1083 le64_to_cpu(con->peer_name.num)); 1084 ceph_msg_put(m); 1085 m = ceph_msg_new(type, front_len, GFP_NOFS, false); 1086 } 1087 1088 return m; 1089 } 1090 1091 /* 1092 * If the monitor connection resets, pick a new monitor and resubmit 1093 * any pending requests. 1094 */ 1095 static void mon_fault(struct ceph_connection *con) 1096 { 1097 struct ceph_mon_client *monc = con->private; 1098 1099 if (!monc) 1100 return; 1101 1102 dout("mon_fault\n"); 1103 mutex_lock(&monc->mutex); 1104 if (!con->private) 1105 goto out; 1106 1107 if (!monc->hunting) 1108 pr_info("mon%d %s session lost, " 1109 "hunting for new mon\n", monc->cur_mon, 1110 ceph_pr_addr(&monc->con.peer_addr.in_addr)); 1111 1112 __close_session(monc); 1113 if (!monc->hunting) { 1114 /* start hunting */ 1115 monc->hunting = true; 1116 __open_session(monc); 1117 } else { 1118 /* already hunting, let's wait a bit */ 1119 __schedule_delayed(monc); 1120 } 1121 out: 1122 mutex_unlock(&monc->mutex); 1123 } 1124 1125 /* 1126 * We can ignore refcounting on the connection struct, as all references 1127 * will come from the messenger workqueue, which is drained prior to 1128 * mon_client destruction. 1129 */ 1130 static struct ceph_connection *con_get(struct ceph_connection *con) 1131 { 1132 return con; 1133 } 1134 1135 static void con_put(struct ceph_connection *con) 1136 { 1137 } 1138 1139 static const struct ceph_connection_operations mon_con_ops = { 1140 .get = con_get, 1141 .put = con_put, 1142 .dispatch = dispatch, 1143 .fault = mon_fault, 1144 .alloc_msg = mon_alloc_msg, 1145 }; 1146