1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/module.h> 4 #include <linux/types.h> 5 #include <linux/slab.h> 6 #include <linux/random.h> 7 #include <linux/sched.h> 8 9 #include <linux/ceph/mon_client.h> 10 #include <linux/ceph/libceph.h> 11 #include <linux/ceph/decode.h> 12 13 #include <linux/ceph/auth.h> 14 15 /* 16 * Interact with Ceph monitor cluster. Handle requests for new map 17 * versions, and periodically resend as needed. Also implement 18 * statfs() and umount(). 19 * 20 * A small cluster of Ceph "monitors" are responsible for managing critical 21 * cluster configuration and state information. An odd number (e.g., 3, 5) 22 * of cmon daemons use a modified version of the Paxos part-time parliament 23 * algorithm to manage the MDS map (mds cluster membership), OSD map, and 24 * list of clients who have mounted the file system. 25 * 26 * We maintain an open, active session with a monitor at all times in order to 27 * receive timely MDSMap updates. We periodically send a keepalive byte on the 28 * TCP socket to ensure we detect a failure. If the connection does break, we 29 * randomly hunt for a new monitor. Once the connection is reestablished, we 30 * resend any outstanding requests. 31 */ 32 33 static const struct ceph_connection_operations mon_con_ops; 34 35 static int __validate_auth(struct ceph_mon_client *monc); 36 37 /* 38 * Decode a monmap blob (e.g., during mount). 39 */ 40 struct ceph_monmap *ceph_monmap_decode(void *p, void *end) 41 { 42 struct ceph_monmap *m = NULL; 43 int i, err = -EINVAL; 44 struct ceph_fsid fsid; 45 u32 epoch, num_mon; 46 u16 version; 47 u32 len; 48 49 ceph_decode_32_safe(&p, end, len, bad); 50 ceph_decode_need(&p, end, len, bad); 51 52 dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p)); 53 54 ceph_decode_16_safe(&p, end, version, bad); 55 56 ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad); 57 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 58 epoch = ceph_decode_32(&p); 59 60 num_mon = ceph_decode_32(&p); 61 ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad); 62 63 if (num_mon >= CEPH_MAX_MON) 64 goto bad; 65 m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS); 66 if (m == NULL) 67 return ERR_PTR(-ENOMEM); 68 m->fsid = fsid; 69 m->epoch = epoch; 70 m->num_mon = num_mon; 71 ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0])); 72 for (i = 0; i < num_mon; i++) 73 ceph_decode_addr(&m->mon_inst[i].addr); 74 75 dout("monmap_decode epoch %d, num_mon %d\n", m->epoch, 76 m->num_mon); 77 for (i = 0; i < m->num_mon; i++) 78 dout("monmap_decode mon%d is %s\n", i, 79 ceph_pr_addr(&m->mon_inst[i].addr.in_addr)); 80 return m; 81 82 bad: 83 dout("monmap_decode failed with %d\n", err); 84 kfree(m); 85 return ERR_PTR(err); 86 } 87 88 /* 89 * return true if *addr is included in the monmap. 90 */ 91 int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr) 92 { 93 int i; 94 95 for (i = 0; i < m->num_mon; i++) 96 if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0) 97 return 1; 98 return 0; 99 } 100 101 /* 102 * Send an auth request. 103 */ 104 static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) 105 { 106 monc->pending_auth = 1; 107 monc->m_auth->front.iov_len = len; 108 monc->m_auth->hdr.front_len = cpu_to_le32(len); 109 ceph_con_revoke(monc->con, monc->m_auth); 110 ceph_msg_get(monc->m_auth); /* keep our ref */ 111 ceph_con_send(monc->con, monc->m_auth); 112 } 113 114 /* 115 * Close monitor session, if any. 116 */ 117 static void __close_session(struct ceph_mon_client *monc) 118 { 119 if (monc->con) { 120 dout("__close_session closing mon%d\n", monc->cur_mon); 121 ceph_con_revoke(monc->con, monc->m_auth); 122 ceph_con_close(monc->con); 123 monc->cur_mon = -1; 124 monc->pending_auth = 0; 125 ceph_auth_reset(monc->auth); 126 } 127 } 128 129 /* 130 * Open a session with a (new) monitor. 131 */ 132 static int __open_session(struct ceph_mon_client *monc) 133 { 134 char r; 135 int ret; 136 137 if (monc->cur_mon < 0) { 138 get_random_bytes(&r, 1); 139 monc->cur_mon = r % monc->monmap->num_mon; 140 dout("open_session num=%d r=%d -> mon%d\n", 141 monc->monmap->num_mon, r, monc->cur_mon); 142 monc->sub_sent = 0; 143 monc->sub_renew_after = jiffies; /* i.e., expired */ 144 monc->want_next_osdmap = !!monc->want_next_osdmap; 145 146 dout("open_session mon%d opening\n", monc->cur_mon); 147 monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON; 148 monc->con->peer_name.num = cpu_to_le64(monc->cur_mon); 149 ceph_con_open(monc->con, 150 &monc->monmap->mon_inst[monc->cur_mon].addr); 151 152 /* initiatiate authentication handshake */ 153 ret = ceph_auth_build_hello(monc->auth, 154 monc->m_auth->front.iov_base, 155 monc->m_auth->front_max); 156 __send_prepared_auth_request(monc, ret); 157 } else { 158 dout("open_session mon%d already open\n", monc->cur_mon); 159 } 160 return 0; 161 } 162 163 static bool __sub_expired(struct ceph_mon_client *monc) 164 { 165 return time_after_eq(jiffies, monc->sub_renew_after); 166 } 167 168 /* 169 * Reschedule delayed work timer. 170 */ 171 static void __schedule_delayed(struct ceph_mon_client *monc) 172 { 173 unsigned delay; 174 175 if (monc->cur_mon < 0 || __sub_expired(monc)) 176 delay = 10 * HZ; 177 else 178 delay = 20 * HZ; 179 dout("__schedule_delayed after %u\n", delay); 180 schedule_delayed_work(&monc->delayed_work, delay); 181 } 182 183 /* 184 * Send subscribe request for mdsmap and/or osdmap. 185 */ 186 static void __send_subscribe(struct ceph_mon_client *monc) 187 { 188 dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n", 189 (unsigned)monc->sub_sent, __sub_expired(monc), 190 monc->want_next_osdmap); 191 if ((__sub_expired(monc) && !monc->sub_sent) || 192 monc->want_next_osdmap == 1) { 193 struct ceph_msg *msg = monc->m_subscribe; 194 struct ceph_mon_subscribe_item *i; 195 void *p, *end; 196 int num; 197 198 p = msg->front.iov_base; 199 end = p + msg->front_max; 200 201 num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap; 202 ceph_encode_32(&p, num); 203 204 if (monc->want_next_osdmap) { 205 dout("__send_subscribe to 'osdmap' %u\n", 206 (unsigned)monc->have_osdmap); 207 ceph_encode_string(&p, end, "osdmap", 6); 208 i = p; 209 i->have = cpu_to_le64(monc->have_osdmap); 210 i->onetime = 1; 211 p += sizeof(*i); 212 monc->want_next_osdmap = 2; /* requested */ 213 } 214 if (monc->want_mdsmap) { 215 dout("__send_subscribe to 'mdsmap' %u+\n", 216 (unsigned)monc->have_mdsmap); 217 ceph_encode_string(&p, end, "mdsmap", 6); 218 i = p; 219 i->have = cpu_to_le64(monc->have_mdsmap); 220 i->onetime = 0; 221 p += sizeof(*i); 222 } 223 ceph_encode_string(&p, end, "monmap", 6); 224 i = p; 225 i->have = 0; 226 i->onetime = 0; 227 p += sizeof(*i); 228 229 msg->front.iov_len = p - msg->front.iov_base; 230 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 231 ceph_con_revoke(monc->con, msg); 232 ceph_con_send(monc->con, ceph_msg_get(msg)); 233 234 monc->sub_sent = jiffies | 1; /* never 0 */ 235 } 236 } 237 238 static void handle_subscribe_ack(struct ceph_mon_client *monc, 239 struct ceph_msg *msg) 240 { 241 unsigned seconds; 242 struct ceph_mon_subscribe_ack *h = msg->front.iov_base; 243 244 if (msg->front.iov_len < sizeof(*h)) 245 goto bad; 246 seconds = le32_to_cpu(h->duration); 247 248 mutex_lock(&monc->mutex); 249 if (monc->hunting) { 250 pr_info("mon%d %s session established\n", 251 monc->cur_mon, 252 ceph_pr_addr(&monc->con->peer_addr.in_addr)); 253 monc->hunting = false; 254 } 255 dout("handle_subscribe_ack after %d seconds\n", seconds); 256 monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1; 257 monc->sub_sent = 0; 258 mutex_unlock(&monc->mutex); 259 return; 260 bad: 261 pr_err("got corrupt subscribe-ack msg\n"); 262 ceph_msg_dump(msg); 263 } 264 265 /* 266 * Keep track of which maps we have 267 */ 268 int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got) 269 { 270 mutex_lock(&monc->mutex); 271 monc->have_mdsmap = got; 272 mutex_unlock(&monc->mutex); 273 return 0; 274 } 275 EXPORT_SYMBOL(ceph_monc_got_mdsmap); 276 277 int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got) 278 { 279 mutex_lock(&monc->mutex); 280 monc->have_osdmap = got; 281 monc->want_next_osdmap = 0; 282 mutex_unlock(&monc->mutex); 283 return 0; 284 } 285 286 /* 287 * Register interest in the next osdmap 288 */ 289 void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc) 290 { 291 dout("request_next_osdmap have %u\n", monc->have_osdmap); 292 mutex_lock(&monc->mutex); 293 if (!monc->want_next_osdmap) 294 monc->want_next_osdmap = 1; 295 if (monc->want_next_osdmap < 2) 296 __send_subscribe(monc); 297 mutex_unlock(&monc->mutex); 298 } 299 300 /* 301 * 302 */ 303 int ceph_monc_open_session(struct ceph_mon_client *monc) 304 { 305 if (!monc->con) { 306 monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL); 307 if (!monc->con) 308 return -ENOMEM; 309 ceph_con_init(monc->client->msgr, monc->con); 310 monc->con->private = monc; 311 monc->con->ops = &mon_con_ops; 312 } 313 314 mutex_lock(&monc->mutex); 315 __open_session(monc); 316 __schedule_delayed(monc); 317 mutex_unlock(&monc->mutex); 318 return 0; 319 } 320 EXPORT_SYMBOL(ceph_monc_open_session); 321 322 /* 323 * The monitor responds with mount ack indicate mount success. The 324 * included client ticket allows the client to talk to MDSs and OSDs. 325 */ 326 static void ceph_monc_handle_map(struct ceph_mon_client *monc, 327 struct ceph_msg *msg) 328 { 329 struct ceph_client *client = monc->client; 330 struct ceph_monmap *monmap = NULL, *old = monc->monmap; 331 void *p, *end; 332 333 mutex_lock(&monc->mutex); 334 335 dout("handle_monmap\n"); 336 p = msg->front.iov_base; 337 end = p + msg->front.iov_len; 338 339 monmap = ceph_monmap_decode(p, end); 340 if (IS_ERR(monmap)) { 341 pr_err("problem decoding monmap, %d\n", 342 (int)PTR_ERR(monmap)); 343 goto out; 344 } 345 346 if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) { 347 kfree(monmap); 348 goto out; 349 } 350 351 client->monc.monmap = monmap; 352 kfree(old); 353 354 out: 355 mutex_unlock(&monc->mutex); 356 wake_up_all(&client->auth_wq); 357 } 358 359 /* 360 * generic requests (e.g., statfs, poolop) 361 */ 362 static struct ceph_mon_generic_request *__lookup_generic_req( 363 struct ceph_mon_client *monc, u64 tid) 364 { 365 struct ceph_mon_generic_request *req; 366 struct rb_node *n = monc->generic_request_tree.rb_node; 367 368 while (n) { 369 req = rb_entry(n, struct ceph_mon_generic_request, node); 370 if (tid < req->tid) 371 n = n->rb_left; 372 else if (tid > req->tid) 373 n = n->rb_right; 374 else 375 return req; 376 } 377 return NULL; 378 } 379 380 static void __insert_generic_request(struct ceph_mon_client *monc, 381 struct ceph_mon_generic_request *new) 382 { 383 struct rb_node **p = &monc->generic_request_tree.rb_node; 384 struct rb_node *parent = NULL; 385 struct ceph_mon_generic_request *req = NULL; 386 387 while (*p) { 388 parent = *p; 389 req = rb_entry(parent, struct ceph_mon_generic_request, node); 390 if (new->tid < req->tid) 391 p = &(*p)->rb_left; 392 else if (new->tid > req->tid) 393 p = &(*p)->rb_right; 394 else 395 BUG(); 396 } 397 398 rb_link_node(&new->node, parent, p); 399 rb_insert_color(&new->node, &monc->generic_request_tree); 400 } 401 402 static void release_generic_request(struct kref *kref) 403 { 404 struct ceph_mon_generic_request *req = 405 container_of(kref, struct ceph_mon_generic_request, kref); 406 407 if (req->reply) 408 ceph_msg_put(req->reply); 409 if (req->request) 410 ceph_msg_put(req->request); 411 412 kfree(req); 413 } 414 415 static void put_generic_request(struct ceph_mon_generic_request *req) 416 { 417 kref_put(&req->kref, release_generic_request); 418 } 419 420 static void get_generic_request(struct ceph_mon_generic_request *req) 421 { 422 kref_get(&req->kref); 423 } 424 425 static struct ceph_msg *get_generic_reply(struct ceph_connection *con, 426 struct ceph_msg_header *hdr, 427 int *skip) 428 { 429 struct ceph_mon_client *monc = con->private; 430 struct ceph_mon_generic_request *req; 431 u64 tid = le64_to_cpu(hdr->tid); 432 struct ceph_msg *m; 433 434 mutex_lock(&monc->mutex); 435 req = __lookup_generic_req(monc, tid); 436 if (!req) { 437 dout("get_generic_reply %lld dne\n", tid); 438 *skip = 1; 439 m = NULL; 440 } else { 441 dout("get_generic_reply %lld got %p\n", tid, req->reply); 442 m = ceph_msg_get(req->reply); 443 /* 444 * we don't need to track the connection reading into 445 * this reply because we only have one open connection 446 * at a time, ever. 447 */ 448 } 449 mutex_unlock(&monc->mutex); 450 return m; 451 } 452 453 static int do_generic_request(struct ceph_mon_client *monc, 454 struct ceph_mon_generic_request *req) 455 { 456 int err; 457 458 /* register request */ 459 mutex_lock(&monc->mutex); 460 req->tid = ++monc->last_tid; 461 req->request->hdr.tid = cpu_to_le64(req->tid); 462 __insert_generic_request(monc, req); 463 monc->num_generic_requests++; 464 ceph_con_send(monc->con, ceph_msg_get(req->request)); 465 mutex_unlock(&monc->mutex); 466 467 err = wait_for_completion_interruptible(&req->completion); 468 469 mutex_lock(&monc->mutex); 470 rb_erase(&req->node, &monc->generic_request_tree); 471 monc->num_generic_requests--; 472 mutex_unlock(&monc->mutex); 473 474 if (!err) 475 err = req->result; 476 return err; 477 } 478 479 /* 480 * statfs 481 */ 482 static void handle_statfs_reply(struct ceph_mon_client *monc, 483 struct ceph_msg *msg) 484 { 485 struct ceph_mon_generic_request *req; 486 struct ceph_mon_statfs_reply *reply = msg->front.iov_base; 487 u64 tid = le64_to_cpu(msg->hdr.tid); 488 489 if (msg->front.iov_len != sizeof(*reply)) 490 goto bad; 491 dout("handle_statfs_reply %p tid %llu\n", msg, tid); 492 493 mutex_lock(&monc->mutex); 494 req = __lookup_generic_req(monc, tid); 495 if (req) { 496 *(struct ceph_statfs *)req->buf = reply->st; 497 req->result = 0; 498 get_generic_request(req); 499 } 500 mutex_unlock(&monc->mutex); 501 if (req) { 502 complete_all(&req->completion); 503 put_generic_request(req); 504 } 505 return; 506 507 bad: 508 pr_err("corrupt generic reply, tid %llu\n", tid); 509 ceph_msg_dump(msg); 510 } 511 512 /* 513 * Do a synchronous statfs(). 514 */ 515 int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) 516 { 517 struct ceph_mon_generic_request *req; 518 struct ceph_mon_statfs *h; 519 int err; 520 521 req = kzalloc(sizeof(*req), GFP_NOFS); 522 if (!req) 523 return -ENOMEM; 524 525 kref_init(&req->kref); 526 req->buf = buf; 527 req->buf_len = sizeof(*buf); 528 init_completion(&req->completion); 529 530 err = -ENOMEM; 531 req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS); 532 if (!req->request) 533 goto out; 534 req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS); 535 if (!req->reply) 536 goto out; 537 538 /* fill out request */ 539 h = req->request->front.iov_base; 540 h->monhdr.have_version = 0; 541 h->monhdr.session_mon = cpu_to_le16(-1); 542 h->monhdr.session_mon_tid = 0; 543 h->fsid = monc->monmap->fsid; 544 545 err = do_generic_request(monc, req); 546 547 out: 548 kref_put(&req->kref, release_generic_request); 549 return err; 550 } 551 EXPORT_SYMBOL(ceph_monc_do_statfs); 552 553 /* 554 * pool ops 555 */ 556 static int get_poolop_reply_buf(const char *src, size_t src_len, 557 char *dst, size_t dst_len) 558 { 559 u32 buf_len; 560 561 if (src_len != sizeof(u32) + dst_len) 562 return -EINVAL; 563 564 buf_len = le32_to_cpu(*(u32 *)src); 565 if (buf_len != dst_len) 566 return -EINVAL; 567 568 memcpy(dst, src + sizeof(u32), dst_len); 569 return 0; 570 } 571 572 static void handle_poolop_reply(struct ceph_mon_client *monc, 573 struct ceph_msg *msg) 574 { 575 struct ceph_mon_generic_request *req; 576 struct ceph_mon_poolop_reply *reply = msg->front.iov_base; 577 u64 tid = le64_to_cpu(msg->hdr.tid); 578 579 if (msg->front.iov_len < sizeof(*reply)) 580 goto bad; 581 dout("handle_poolop_reply %p tid %llu\n", msg, tid); 582 583 mutex_lock(&monc->mutex); 584 req = __lookup_generic_req(monc, tid); 585 if (req) { 586 if (req->buf_len && 587 get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply), 588 msg->front.iov_len - sizeof(*reply), 589 req->buf, req->buf_len) < 0) { 590 mutex_unlock(&monc->mutex); 591 goto bad; 592 } 593 req->result = le32_to_cpu(reply->reply_code); 594 get_generic_request(req); 595 } 596 mutex_unlock(&monc->mutex); 597 if (req) { 598 complete(&req->completion); 599 put_generic_request(req); 600 } 601 return; 602 603 bad: 604 pr_err("corrupt generic reply, tid %llu\n", tid); 605 ceph_msg_dump(msg); 606 } 607 608 /* 609 * Do a synchronous pool op. 610 */ 611 int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op, 612 u32 pool, u64 snapid, 613 char *buf, int len) 614 { 615 struct ceph_mon_generic_request *req; 616 struct ceph_mon_poolop *h; 617 int err; 618 619 req = kzalloc(sizeof(*req), GFP_NOFS); 620 if (!req) 621 return -ENOMEM; 622 623 kref_init(&req->kref); 624 req->buf = buf; 625 req->buf_len = len; 626 init_completion(&req->completion); 627 628 err = -ENOMEM; 629 req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS); 630 if (!req->request) 631 goto out; 632 req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS); 633 if (!req->reply) 634 goto out; 635 636 /* fill out request */ 637 req->request->hdr.version = cpu_to_le16(2); 638 h = req->request->front.iov_base; 639 h->monhdr.have_version = 0; 640 h->monhdr.session_mon = cpu_to_le16(-1); 641 h->monhdr.session_mon_tid = 0; 642 h->fsid = monc->monmap->fsid; 643 h->pool = cpu_to_le32(pool); 644 h->op = cpu_to_le32(op); 645 h->auid = 0; 646 h->snapid = cpu_to_le64(snapid); 647 h->name_len = 0; 648 649 err = do_generic_request(monc, req); 650 651 out: 652 kref_put(&req->kref, release_generic_request); 653 return err; 654 } 655 656 int ceph_monc_create_snapid(struct ceph_mon_client *monc, 657 u32 pool, u64 *snapid) 658 { 659 return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP, 660 pool, 0, (char *)snapid, sizeof(*snapid)); 661 662 } 663 EXPORT_SYMBOL(ceph_monc_create_snapid); 664 665 int ceph_monc_delete_snapid(struct ceph_mon_client *monc, 666 u32 pool, u64 snapid) 667 { 668 return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP, 669 pool, snapid, 0, 0); 670 671 } 672 673 /* 674 * Resend pending generic requests. 675 */ 676 static void __resend_generic_request(struct ceph_mon_client *monc) 677 { 678 struct ceph_mon_generic_request *req; 679 struct rb_node *p; 680 681 for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) { 682 req = rb_entry(p, struct ceph_mon_generic_request, node); 683 ceph_con_revoke(monc->con, req->request); 684 ceph_con_send(monc->con, ceph_msg_get(req->request)); 685 } 686 } 687 688 /* 689 * Delayed work. If we haven't mounted yet, retry. Otherwise, 690 * renew/retry subscription as needed (in case it is timing out, or we 691 * got an ENOMEM). And keep the monitor connection alive. 692 */ 693 static void delayed_work(struct work_struct *work) 694 { 695 struct ceph_mon_client *monc = 696 container_of(work, struct ceph_mon_client, delayed_work.work); 697 698 dout("monc delayed_work\n"); 699 mutex_lock(&monc->mutex); 700 if (monc->hunting) { 701 __close_session(monc); 702 __open_session(monc); /* continue hunting */ 703 } else { 704 ceph_con_keepalive(monc->con); 705 706 __validate_auth(monc); 707 708 if (monc->auth->ops->is_authenticated(monc->auth)) 709 __send_subscribe(monc); 710 } 711 __schedule_delayed(monc); 712 mutex_unlock(&monc->mutex); 713 } 714 715 /* 716 * On startup, we build a temporary monmap populated with the IPs 717 * provided by mount(2). 718 */ 719 static int build_initial_monmap(struct ceph_mon_client *monc) 720 { 721 struct ceph_options *opt = monc->client->options; 722 struct ceph_entity_addr *mon_addr = opt->mon_addr; 723 int num_mon = opt->num_mon; 724 int i; 725 726 /* build initial monmap */ 727 monc->monmap = kzalloc(sizeof(*monc->monmap) + 728 num_mon*sizeof(monc->monmap->mon_inst[0]), 729 GFP_KERNEL); 730 if (!monc->monmap) 731 return -ENOMEM; 732 for (i = 0; i < num_mon; i++) { 733 monc->monmap->mon_inst[i].addr = mon_addr[i]; 734 monc->monmap->mon_inst[i].addr.nonce = 0; 735 monc->monmap->mon_inst[i].name.type = 736 CEPH_ENTITY_TYPE_MON; 737 monc->monmap->mon_inst[i].name.num = cpu_to_le64(i); 738 } 739 monc->monmap->num_mon = num_mon; 740 monc->have_fsid = false; 741 return 0; 742 } 743 744 int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) 745 { 746 int err = 0; 747 748 dout("init\n"); 749 memset(monc, 0, sizeof(*monc)); 750 monc->client = cl; 751 monc->monmap = NULL; 752 mutex_init(&monc->mutex); 753 754 err = build_initial_monmap(monc); 755 if (err) 756 goto out; 757 758 monc->con = NULL; 759 760 /* authentication */ 761 monc->auth = ceph_auth_init(cl->options->name, 762 cl->options->secret); 763 if (IS_ERR(monc->auth)) 764 return PTR_ERR(monc->auth); 765 monc->auth->want_keys = 766 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON | 767 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS; 768 769 /* msgs */ 770 err = -ENOMEM; 771 monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK, 772 sizeof(struct ceph_mon_subscribe_ack), 773 GFP_NOFS); 774 if (!monc->m_subscribe_ack) 775 goto out_monmap; 776 777 monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS); 778 if (!monc->m_subscribe) 779 goto out_subscribe_ack; 780 781 monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS); 782 if (!monc->m_auth_reply) 783 goto out_subscribe; 784 785 monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS); 786 monc->pending_auth = 0; 787 if (!monc->m_auth) 788 goto out_auth_reply; 789 790 monc->cur_mon = -1; 791 monc->hunting = true; 792 monc->sub_renew_after = jiffies; 793 monc->sub_sent = 0; 794 795 INIT_DELAYED_WORK(&monc->delayed_work, delayed_work); 796 monc->generic_request_tree = RB_ROOT; 797 monc->num_generic_requests = 0; 798 monc->last_tid = 0; 799 800 monc->have_mdsmap = 0; 801 monc->have_osdmap = 0; 802 monc->want_next_osdmap = 1; 803 return 0; 804 805 out_auth_reply: 806 ceph_msg_put(monc->m_auth_reply); 807 out_subscribe: 808 ceph_msg_put(monc->m_subscribe); 809 out_subscribe_ack: 810 ceph_msg_put(monc->m_subscribe_ack); 811 out_monmap: 812 kfree(monc->monmap); 813 out: 814 return err; 815 } 816 EXPORT_SYMBOL(ceph_monc_init); 817 818 void ceph_monc_stop(struct ceph_mon_client *monc) 819 { 820 dout("stop\n"); 821 cancel_delayed_work_sync(&monc->delayed_work); 822 823 mutex_lock(&monc->mutex); 824 __close_session(monc); 825 if (monc->con) { 826 monc->con->private = NULL; 827 monc->con->ops->put(monc->con); 828 monc->con = NULL; 829 } 830 mutex_unlock(&monc->mutex); 831 832 ceph_auth_destroy(monc->auth); 833 834 ceph_msg_put(monc->m_auth); 835 ceph_msg_put(monc->m_auth_reply); 836 ceph_msg_put(monc->m_subscribe); 837 ceph_msg_put(monc->m_subscribe_ack); 838 839 kfree(monc->monmap); 840 } 841 EXPORT_SYMBOL(ceph_monc_stop); 842 843 static void handle_auth_reply(struct ceph_mon_client *monc, 844 struct ceph_msg *msg) 845 { 846 int ret; 847 int was_auth = 0; 848 849 mutex_lock(&monc->mutex); 850 if (monc->auth->ops) 851 was_auth = monc->auth->ops->is_authenticated(monc->auth); 852 monc->pending_auth = 0; 853 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, 854 msg->front.iov_len, 855 monc->m_auth->front.iov_base, 856 monc->m_auth->front_max); 857 if (ret < 0) { 858 monc->client->auth_err = ret; 859 wake_up_all(&monc->client->auth_wq); 860 } else if (ret > 0) { 861 __send_prepared_auth_request(monc, ret); 862 } else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) { 863 dout("authenticated, starting session\n"); 864 865 monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT; 866 monc->client->msgr->inst.name.num = 867 cpu_to_le64(monc->auth->global_id); 868 869 __send_subscribe(monc); 870 __resend_generic_request(monc); 871 } 872 mutex_unlock(&monc->mutex); 873 } 874 875 static int __validate_auth(struct ceph_mon_client *monc) 876 { 877 int ret; 878 879 if (monc->pending_auth) 880 return 0; 881 882 ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base, 883 monc->m_auth->front_max); 884 if (ret <= 0) 885 return ret; /* either an error, or no need to authenticate */ 886 __send_prepared_auth_request(monc, ret); 887 return 0; 888 } 889 890 int ceph_monc_validate_auth(struct ceph_mon_client *monc) 891 { 892 int ret; 893 894 mutex_lock(&monc->mutex); 895 ret = __validate_auth(monc); 896 mutex_unlock(&monc->mutex); 897 return ret; 898 } 899 EXPORT_SYMBOL(ceph_monc_validate_auth); 900 901 /* 902 * handle incoming message 903 */ 904 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 905 { 906 struct ceph_mon_client *monc = con->private; 907 int type = le16_to_cpu(msg->hdr.type); 908 909 if (!monc) 910 return; 911 912 switch (type) { 913 case CEPH_MSG_AUTH_REPLY: 914 handle_auth_reply(monc, msg); 915 break; 916 917 case CEPH_MSG_MON_SUBSCRIBE_ACK: 918 handle_subscribe_ack(monc, msg); 919 break; 920 921 case CEPH_MSG_STATFS_REPLY: 922 handle_statfs_reply(monc, msg); 923 break; 924 925 case CEPH_MSG_POOLOP_REPLY: 926 handle_poolop_reply(monc, msg); 927 break; 928 929 case CEPH_MSG_MON_MAP: 930 ceph_monc_handle_map(monc, msg); 931 break; 932 933 case CEPH_MSG_OSD_MAP: 934 ceph_osdc_handle_map(&monc->client->osdc, msg); 935 break; 936 937 default: 938 /* can the chained handler handle it? */ 939 if (monc->client->extra_mon_dispatch && 940 monc->client->extra_mon_dispatch(monc->client, msg) == 0) 941 break; 942 943 pr_err("received unknown message type %d %s\n", type, 944 ceph_msg_type_name(type)); 945 } 946 ceph_msg_put(msg); 947 } 948 949 /* 950 * Allocate memory for incoming message 951 */ 952 static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, 953 struct ceph_msg_header *hdr, 954 int *skip) 955 { 956 struct ceph_mon_client *monc = con->private; 957 int type = le16_to_cpu(hdr->type); 958 int front_len = le32_to_cpu(hdr->front_len); 959 struct ceph_msg *m = NULL; 960 961 *skip = 0; 962 963 switch (type) { 964 case CEPH_MSG_MON_SUBSCRIBE_ACK: 965 m = ceph_msg_get(monc->m_subscribe_ack); 966 break; 967 case CEPH_MSG_POOLOP_REPLY: 968 case CEPH_MSG_STATFS_REPLY: 969 return get_generic_reply(con, hdr, skip); 970 case CEPH_MSG_AUTH_REPLY: 971 m = ceph_msg_get(monc->m_auth_reply); 972 break; 973 case CEPH_MSG_MON_MAP: 974 case CEPH_MSG_MDS_MAP: 975 case CEPH_MSG_OSD_MAP: 976 m = ceph_msg_new(type, front_len, GFP_NOFS); 977 break; 978 } 979 980 if (!m) { 981 pr_info("alloc_msg unknown type %d\n", type); 982 *skip = 1; 983 } 984 return m; 985 } 986 987 /* 988 * If the monitor connection resets, pick a new monitor and resubmit 989 * any pending requests. 990 */ 991 static void mon_fault(struct ceph_connection *con) 992 { 993 struct ceph_mon_client *monc = con->private; 994 995 if (!monc) 996 return; 997 998 dout("mon_fault\n"); 999 mutex_lock(&monc->mutex); 1000 if (!con->private) 1001 goto out; 1002 1003 if (monc->con && !monc->hunting) 1004 pr_info("mon%d %s session lost, " 1005 "hunting for new mon\n", monc->cur_mon, 1006 ceph_pr_addr(&monc->con->peer_addr.in_addr)); 1007 1008 __close_session(monc); 1009 if (!monc->hunting) { 1010 /* start hunting */ 1011 monc->hunting = true; 1012 __open_session(monc); 1013 } else { 1014 /* already hunting, let's wait a bit */ 1015 __schedule_delayed(monc); 1016 } 1017 out: 1018 mutex_unlock(&monc->mutex); 1019 } 1020 1021 static const struct ceph_connection_operations mon_con_ops = { 1022 .get = ceph_con_get, 1023 .put = ceph_con_put, 1024 .dispatch = dispatch, 1025 .fault = mon_fault, 1026 .alloc_msg = mon_alloc_msg, 1027 }; 1028