1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/module.h> 4 #include <linux/types.h> 5 #include <linux/slab.h> 6 #include <linux/random.h> 7 #include <linux/sched.h> 8 9 #include <linux/ceph/mon_client.h> 10 #include <linux/ceph/libceph.h> 11 #include <linux/ceph/debugfs.h> 12 #include <linux/ceph/decode.h> 13 #include <linux/ceph/auth.h> 14 15 /* 16 * Interact with Ceph monitor cluster. Handle requests for new map 17 * versions, and periodically resend as needed. Also implement 18 * statfs() and umount(). 19 * 20 * A small cluster of Ceph "monitors" are responsible for managing critical 21 * cluster configuration and state information. An odd number (e.g., 3, 5) 22 * of cmon daemons use a modified version of the Paxos part-time parliament 23 * algorithm to manage the MDS map (mds cluster membership), OSD map, and 24 * list of clients who have mounted the file system. 25 * 26 * We maintain an open, active session with a monitor at all times in order to 27 * receive timely MDSMap updates. We periodically send a keepalive byte on the 28 * TCP socket to ensure we detect a failure. If the connection does break, we 29 * randomly hunt for a new monitor. Once the connection is reestablished, we 30 * resend any outstanding requests. 31 */ 32 33 static const struct ceph_connection_operations mon_con_ops; 34 35 static int __validate_auth(struct ceph_mon_client *monc); 36 37 /* 38 * Decode a monmap blob (e.g., during mount). 39 */ 40 struct ceph_monmap *ceph_monmap_decode(void *p, void *end) 41 { 42 struct ceph_monmap *m = NULL; 43 int i, err = -EINVAL; 44 struct ceph_fsid fsid; 45 u32 epoch, num_mon; 46 u16 version; 47 u32 len; 48 49 ceph_decode_32_safe(&p, end, len, bad); 50 ceph_decode_need(&p, end, len, bad); 51 52 dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p)); 53 54 ceph_decode_16_safe(&p, end, version, bad); 55 56 ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad); 57 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 58 epoch = ceph_decode_32(&p); 59 60 num_mon = ceph_decode_32(&p); 61 ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad); 62 63 if (num_mon >= CEPH_MAX_MON) 64 goto bad; 65 m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS); 66 if (m == NULL) 67 return ERR_PTR(-ENOMEM); 68 m->fsid = fsid; 69 m->epoch = epoch; 70 m->num_mon = num_mon; 71 ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0])); 72 for (i = 0; i < num_mon; i++) 73 ceph_decode_addr(&m->mon_inst[i].addr); 74 75 dout("monmap_decode epoch %d, num_mon %d\n", m->epoch, 76 m->num_mon); 77 for (i = 0; i < m->num_mon; i++) 78 dout("monmap_decode mon%d is %s\n", i, 79 ceph_pr_addr(&m->mon_inst[i].addr.in_addr)); 80 return m; 81 82 bad: 83 dout("monmap_decode failed with %d\n", err); 84 kfree(m); 85 return ERR_PTR(err); 86 } 87 88 /* 89 * return true if *addr is included in the monmap. 90 */ 91 int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr) 92 { 93 int i; 94 95 for (i = 0; i < m->num_mon; i++) 96 if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0) 97 return 1; 98 return 0; 99 } 100 101 /* 102 * Send an auth request. 103 */ 104 static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) 105 { 106 monc->pending_auth = 1; 107 monc->m_auth->front.iov_len = len; 108 monc->m_auth->hdr.front_len = cpu_to_le32(len); 109 ceph_msg_revoke(monc->m_auth); 110 ceph_msg_get(monc->m_auth); /* keep our ref */ 111 ceph_con_send(&monc->con, monc->m_auth); 112 } 113 114 /* 115 * Close monitor session, if any. 116 */ 117 static void __close_session(struct ceph_mon_client *monc) 118 { 119 dout("__close_session closing mon%d\n", monc->cur_mon); 120 ceph_msg_revoke(monc->m_auth); 121 ceph_msg_revoke_incoming(monc->m_auth_reply); 122 ceph_msg_revoke(monc->m_subscribe); 123 ceph_msg_revoke_incoming(monc->m_subscribe_ack); 124 ceph_con_close(&monc->con); 125 monc->cur_mon = -1; 126 monc->pending_auth = 0; 127 ceph_auth_reset(monc->auth); 128 } 129 130 /* 131 * Open a session with a (new) monitor. 132 */ 133 static int __open_session(struct ceph_mon_client *monc) 134 { 135 char r; 136 int ret; 137 138 if (monc->cur_mon < 0) { 139 get_random_bytes(&r, 1); 140 monc->cur_mon = r % monc->monmap->num_mon; 141 dout("open_session num=%d r=%d -> mon%d\n", 142 monc->monmap->num_mon, r, monc->cur_mon); 143 monc->sub_sent = 0; 144 monc->sub_renew_after = jiffies; /* i.e., expired */ 145 monc->want_next_osdmap = !!monc->want_next_osdmap; 146 147 dout("open_session mon%d opening\n", monc->cur_mon); 148 ceph_con_open(&monc->con, 149 CEPH_ENTITY_TYPE_MON, monc->cur_mon, 150 &monc->monmap->mon_inst[monc->cur_mon].addr); 151 152 /* send an initial keepalive to ensure our timestamp is 153 * valid by the time we are in an OPENED state */ 154 ceph_con_keepalive(&monc->con); 155 156 /* initiatiate authentication handshake */ 157 ret = ceph_auth_build_hello(monc->auth, 158 monc->m_auth->front.iov_base, 159 monc->m_auth->front_alloc_len); 160 __send_prepared_auth_request(monc, ret); 161 } else { 162 dout("open_session mon%d already open\n", monc->cur_mon); 163 } 164 return 0; 165 } 166 167 static bool __sub_expired(struct ceph_mon_client *monc) 168 { 169 return time_after_eq(jiffies, monc->sub_renew_after); 170 } 171 172 /* 173 * Reschedule delayed work timer. 174 */ 175 static void __schedule_delayed(struct ceph_mon_client *monc) 176 { 177 struct ceph_options *opt = monc->client->options; 178 unsigned long delay; 179 180 if (monc->cur_mon < 0 || __sub_expired(monc)) { 181 delay = 10 * HZ; 182 } else { 183 delay = 20 * HZ; 184 if (opt->monc_ping_timeout > 0) 185 delay = min(delay, opt->monc_ping_timeout / 3); 186 } 187 dout("__schedule_delayed after %lu\n", delay); 188 schedule_delayed_work(&monc->delayed_work, 189 round_jiffies_relative(delay)); 190 } 191 192 /* 193 * Send subscribe request for mdsmap and/or osdmap. 194 */ 195 static void __send_subscribe(struct ceph_mon_client *monc) 196 { 197 dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n", 198 (unsigned int)monc->sub_sent, __sub_expired(monc), 199 monc->want_next_osdmap); 200 if ((__sub_expired(monc) && !monc->sub_sent) || 201 monc->want_next_osdmap == 1) { 202 struct ceph_msg *msg = monc->m_subscribe; 203 struct ceph_mon_subscribe_item *i; 204 void *p, *end; 205 int num; 206 207 p = msg->front.iov_base; 208 end = p + msg->front_alloc_len; 209 210 num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap; 211 ceph_encode_32(&p, num); 212 213 if (monc->want_next_osdmap) { 214 dout("__send_subscribe to 'osdmap' %u\n", 215 (unsigned int)monc->have_osdmap); 216 ceph_encode_string(&p, end, "osdmap", 6); 217 i = p; 218 i->have = cpu_to_le64(monc->have_osdmap); 219 i->onetime = 1; 220 p += sizeof(*i); 221 monc->want_next_osdmap = 2; /* requested */ 222 } 223 if (monc->want_mdsmap) { 224 dout("__send_subscribe to 'mdsmap' %u+\n", 225 (unsigned int)monc->have_mdsmap); 226 ceph_encode_string(&p, end, "mdsmap", 6); 227 i = p; 228 i->have = cpu_to_le64(monc->have_mdsmap); 229 i->onetime = 0; 230 p += sizeof(*i); 231 } 232 ceph_encode_string(&p, end, "monmap", 6); 233 i = p; 234 i->have = 0; 235 i->onetime = 0; 236 p += sizeof(*i); 237 238 msg->front.iov_len = p - msg->front.iov_base; 239 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 240 ceph_msg_revoke(msg); 241 ceph_con_send(&monc->con, ceph_msg_get(msg)); 242 243 monc->sub_sent = jiffies | 1; /* never 0 */ 244 } 245 } 246 247 static void handle_subscribe_ack(struct ceph_mon_client *monc, 248 struct ceph_msg *msg) 249 { 250 unsigned int seconds; 251 struct ceph_mon_subscribe_ack *h = msg->front.iov_base; 252 253 if (msg->front.iov_len < sizeof(*h)) 254 goto bad; 255 seconds = le32_to_cpu(h->duration); 256 257 mutex_lock(&monc->mutex); 258 if (monc->hunting) { 259 pr_info("mon%d %s session established\n", 260 monc->cur_mon, 261 ceph_pr_addr(&monc->con.peer_addr.in_addr)); 262 monc->hunting = false; 263 } 264 dout("handle_subscribe_ack after %d seconds\n", seconds); 265 monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1; 266 monc->sub_sent = 0; 267 mutex_unlock(&monc->mutex); 268 return; 269 bad: 270 pr_err("got corrupt subscribe-ack msg\n"); 271 ceph_msg_dump(msg); 272 } 273 274 /* 275 * Keep track of which maps we have 276 */ 277 int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got) 278 { 279 mutex_lock(&monc->mutex); 280 monc->have_mdsmap = got; 281 mutex_unlock(&monc->mutex); 282 return 0; 283 } 284 EXPORT_SYMBOL(ceph_monc_got_mdsmap); 285 286 int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got) 287 { 288 mutex_lock(&monc->mutex); 289 monc->have_osdmap = got; 290 monc->want_next_osdmap = 0; 291 mutex_unlock(&monc->mutex); 292 return 0; 293 } 294 295 /* 296 * Register interest in the next osdmap 297 */ 298 void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc) 299 { 300 dout("request_next_osdmap have %u\n", monc->have_osdmap); 301 mutex_lock(&monc->mutex); 302 if (!monc->want_next_osdmap) 303 monc->want_next_osdmap = 1; 304 if (monc->want_next_osdmap < 2) 305 __send_subscribe(monc); 306 mutex_unlock(&monc->mutex); 307 } 308 EXPORT_SYMBOL(ceph_monc_request_next_osdmap); 309 310 /* 311 * Wait for an osdmap with a given epoch. 312 * 313 * @epoch: epoch to wait for 314 * @timeout: in jiffies, 0 means "wait forever" 315 */ 316 int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, 317 unsigned long timeout) 318 { 319 unsigned long started = jiffies; 320 long ret; 321 322 mutex_lock(&monc->mutex); 323 while (monc->have_osdmap < epoch) { 324 mutex_unlock(&monc->mutex); 325 326 if (timeout && time_after_eq(jiffies, started + timeout)) 327 return -ETIMEDOUT; 328 329 ret = wait_event_interruptible_timeout(monc->client->auth_wq, 330 monc->have_osdmap >= epoch, 331 ceph_timeout_jiffies(timeout)); 332 if (ret < 0) 333 return ret; 334 335 mutex_lock(&monc->mutex); 336 } 337 338 mutex_unlock(&monc->mutex); 339 return 0; 340 } 341 EXPORT_SYMBOL(ceph_monc_wait_osdmap); 342 343 /* 344 * 345 */ 346 int ceph_monc_open_session(struct ceph_mon_client *monc) 347 { 348 mutex_lock(&monc->mutex); 349 __open_session(monc); 350 __schedule_delayed(monc); 351 mutex_unlock(&monc->mutex); 352 return 0; 353 } 354 EXPORT_SYMBOL(ceph_monc_open_session); 355 356 /* 357 * We require the fsid and global_id in order to initialize our 358 * debugfs dir. 359 */ 360 static bool have_debugfs_info(struct ceph_mon_client *monc) 361 { 362 dout("have_debugfs_info fsid %d globalid %lld\n", 363 (int)monc->client->have_fsid, monc->auth->global_id); 364 return monc->client->have_fsid && monc->auth->global_id > 0; 365 } 366 367 /* 368 * The monitor responds with mount ack indicate mount success. The 369 * included client ticket allows the client to talk to MDSs and OSDs. 370 */ 371 static void ceph_monc_handle_map(struct ceph_mon_client *monc, 372 struct ceph_msg *msg) 373 { 374 struct ceph_client *client = monc->client; 375 struct ceph_monmap *monmap = NULL, *old = monc->monmap; 376 void *p, *end; 377 int had_debugfs_info, init_debugfs = 0; 378 379 mutex_lock(&monc->mutex); 380 381 had_debugfs_info = have_debugfs_info(monc); 382 383 dout("handle_monmap\n"); 384 p = msg->front.iov_base; 385 end = p + msg->front.iov_len; 386 387 monmap = ceph_monmap_decode(p, end); 388 if (IS_ERR(monmap)) { 389 pr_err("problem decoding monmap, %d\n", 390 (int)PTR_ERR(monmap)); 391 goto out; 392 } 393 394 if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) { 395 kfree(monmap); 396 goto out; 397 } 398 399 client->monc.monmap = monmap; 400 kfree(old); 401 402 if (!client->have_fsid) { 403 client->have_fsid = true; 404 if (!had_debugfs_info && have_debugfs_info(monc)) { 405 pr_info("client%lld fsid %pU\n", 406 ceph_client_id(monc->client), 407 &monc->client->fsid); 408 init_debugfs = 1; 409 } 410 mutex_unlock(&monc->mutex); 411 412 if (init_debugfs) { 413 /* 414 * do debugfs initialization without mutex to avoid 415 * creating a locking dependency 416 */ 417 ceph_debugfs_client_init(monc->client); 418 } 419 420 goto out_unlocked; 421 } 422 out: 423 mutex_unlock(&monc->mutex); 424 out_unlocked: 425 wake_up_all(&client->auth_wq); 426 } 427 428 /* 429 * generic requests (currently statfs, mon_get_version) 430 */ 431 static struct ceph_mon_generic_request *__lookup_generic_req( 432 struct ceph_mon_client *monc, u64 tid) 433 { 434 struct ceph_mon_generic_request *req; 435 struct rb_node *n = monc->generic_request_tree.rb_node; 436 437 while (n) { 438 req = rb_entry(n, struct ceph_mon_generic_request, node); 439 if (tid < req->tid) 440 n = n->rb_left; 441 else if (tid > req->tid) 442 n = n->rb_right; 443 else 444 return req; 445 } 446 return NULL; 447 } 448 449 static void __insert_generic_request(struct ceph_mon_client *monc, 450 struct ceph_mon_generic_request *new) 451 { 452 struct rb_node **p = &monc->generic_request_tree.rb_node; 453 struct rb_node *parent = NULL; 454 struct ceph_mon_generic_request *req = NULL; 455 456 while (*p) { 457 parent = *p; 458 req = rb_entry(parent, struct ceph_mon_generic_request, node); 459 if (new->tid < req->tid) 460 p = &(*p)->rb_left; 461 else if (new->tid > req->tid) 462 p = &(*p)->rb_right; 463 else 464 BUG(); 465 } 466 467 rb_link_node(&new->node, parent, p); 468 rb_insert_color(&new->node, &monc->generic_request_tree); 469 } 470 471 static void release_generic_request(struct kref *kref) 472 { 473 struct ceph_mon_generic_request *req = 474 container_of(kref, struct ceph_mon_generic_request, kref); 475 476 if (req->reply) 477 ceph_msg_put(req->reply); 478 if (req->request) 479 ceph_msg_put(req->request); 480 481 kfree(req); 482 } 483 484 static void put_generic_request(struct ceph_mon_generic_request *req) 485 { 486 kref_put(&req->kref, release_generic_request); 487 } 488 489 static void get_generic_request(struct ceph_mon_generic_request *req) 490 { 491 kref_get(&req->kref); 492 } 493 494 static struct ceph_msg *get_generic_reply(struct ceph_connection *con, 495 struct ceph_msg_header *hdr, 496 int *skip) 497 { 498 struct ceph_mon_client *monc = con->private; 499 struct ceph_mon_generic_request *req; 500 u64 tid = le64_to_cpu(hdr->tid); 501 struct ceph_msg *m; 502 503 mutex_lock(&monc->mutex); 504 req = __lookup_generic_req(monc, tid); 505 if (!req) { 506 dout("get_generic_reply %lld dne\n", tid); 507 *skip = 1; 508 m = NULL; 509 } else { 510 dout("get_generic_reply %lld got %p\n", tid, req->reply); 511 *skip = 0; 512 m = ceph_msg_get(req->reply); 513 /* 514 * we don't need to track the connection reading into 515 * this reply because we only have one open connection 516 * at a time, ever. 517 */ 518 } 519 mutex_unlock(&monc->mutex); 520 return m; 521 } 522 523 static int __do_generic_request(struct ceph_mon_client *monc, u64 tid, 524 struct ceph_mon_generic_request *req) 525 { 526 int err; 527 528 /* register request */ 529 req->tid = tid != 0 ? tid : ++monc->last_tid; 530 req->request->hdr.tid = cpu_to_le64(req->tid); 531 __insert_generic_request(monc, req); 532 monc->num_generic_requests++; 533 ceph_con_send(&monc->con, ceph_msg_get(req->request)); 534 mutex_unlock(&monc->mutex); 535 536 err = wait_for_completion_interruptible(&req->completion); 537 538 mutex_lock(&monc->mutex); 539 rb_erase(&req->node, &monc->generic_request_tree); 540 monc->num_generic_requests--; 541 542 if (!err) 543 err = req->result; 544 return err; 545 } 546 547 static int do_generic_request(struct ceph_mon_client *monc, 548 struct ceph_mon_generic_request *req) 549 { 550 int err; 551 552 mutex_lock(&monc->mutex); 553 err = __do_generic_request(monc, 0, req); 554 mutex_unlock(&monc->mutex); 555 556 return err; 557 } 558 559 /* 560 * statfs 561 */ 562 static void handle_statfs_reply(struct ceph_mon_client *monc, 563 struct ceph_msg *msg) 564 { 565 struct ceph_mon_generic_request *req; 566 struct ceph_mon_statfs_reply *reply = msg->front.iov_base; 567 u64 tid = le64_to_cpu(msg->hdr.tid); 568 569 if (msg->front.iov_len != sizeof(*reply)) 570 goto bad; 571 dout("handle_statfs_reply %p tid %llu\n", msg, tid); 572 573 mutex_lock(&monc->mutex); 574 req = __lookup_generic_req(monc, tid); 575 if (req) { 576 *(struct ceph_statfs *)req->buf = reply->st; 577 req->result = 0; 578 get_generic_request(req); 579 } 580 mutex_unlock(&monc->mutex); 581 if (req) { 582 complete_all(&req->completion); 583 put_generic_request(req); 584 } 585 return; 586 587 bad: 588 pr_err("corrupt statfs reply, tid %llu\n", tid); 589 ceph_msg_dump(msg); 590 } 591 592 /* 593 * Do a synchronous statfs(). 594 */ 595 int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) 596 { 597 struct ceph_mon_generic_request *req; 598 struct ceph_mon_statfs *h; 599 int err; 600 601 req = kzalloc(sizeof(*req), GFP_NOFS); 602 if (!req) 603 return -ENOMEM; 604 605 kref_init(&req->kref); 606 req->buf = buf; 607 init_completion(&req->completion); 608 609 err = -ENOMEM; 610 req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS, 611 true); 612 if (!req->request) 613 goto out; 614 req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS, 615 true); 616 if (!req->reply) 617 goto out; 618 619 /* fill out request */ 620 h = req->request->front.iov_base; 621 h->monhdr.have_version = 0; 622 h->monhdr.session_mon = cpu_to_le16(-1); 623 h->monhdr.session_mon_tid = 0; 624 h->fsid = monc->monmap->fsid; 625 626 err = do_generic_request(monc, req); 627 628 out: 629 put_generic_request(req); 630 return err; 631 } 632 EXPORT_SYMBOL(ceph_monc_do_statfs); 633 634 static void handle_get_version_reply(struct ceph_mon_client *monc, 635 struct ceph_msg *msg) 636 { 637 struct ceph_mon_generic_request *req; 638 u64 tid = le64_to_cpu(msg->hdr.tid); 639 void *p = msg->front.iov_base; 640 void *end = p + msg->front_alloc_len; 641 u64 handle; 642 643 dout("%s %p tid %llu\n", __func__, msg, tid); 644 645 ceph_decode_need(&p, end, 2*sizeof(u64), bad); 646 handle = ceph_decode_64(&p); 647 if (tid != 0 && tid != handle) 648 goto bad; 649 650 mutex_lock(&monc->mutex); 651 req = __lookup_generic_req(monc, handle); 652 if (req) { 653 *(u64 *)req->buf = ceph_decode_64(&p); 654 req->result = 0; 655 get_generic_request(req); 656 } 657 mutex_unlock(&monc->mutex); 658 if (req) { 659 complete_all(&req->completion); 660 put_generic_request(req); 661 } 662 663 return; 664 bad: 665 pr_err("corrupt mon_get_version reply, tid %llu\n", tid); 666 ceph_msg_dump(msg); 667 } 668 669 /* 670 * Send MMonGetVersion and wait for the reply. 671 * 672 * @what: one of "mdsmap", "osdmap" or "monmap" 673 */ 674 int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what, 675 u64 *newest) 676 { 677 struct ceph_mon_generic_request *req; 678 void *p, *end; 679 u64 tid; 680 int err; 681 682 req = kzalloc(sizeof(*req), GFP_NOFS); 683 if (!req) 684 return -ENOMEM; 685 686 kref_init(&req->kref); 687 req->buf = newest; 688 init_completion(&req->completion); 689 690 req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION, 691 sizeof(u64) + sizeof(u32) + strlen(what), 692 GFP_NOFS, true); 693 if (!req->request) { 694 err = -ENOMEM; 695 goto out; 696 } 697 698 req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 1024, 699 GFP_NOFS, true); 700 if (!req->reply) { 701 err = -ENOMEM; 702 goto out; 703 } 704 705 p = req->request->front.iov_base; 706 end = p + req->request->front_alloc_len; 707 708 /* fill out request */ 709 mutex_lock(&monc->mutex); 710 tid = ++monc->last_tid; 711 ceph_encode_64(&p, tid); /* handle */ 712 ceph_encode_string(&p, end, what, strlen(what)); 713 714 err = __do_generic_request(monc, tid, req); 715 716 mutex_unlock(&monc->mutex); 717 out: 718 put_generic_request(req); 719 return err; 720 } 721 EXPORT_SYMBOL(ceph_monc_do_get_version); 722 723 /* 724 * Resend pending generic requests. 725 */ 726 static void __resend_generic_request(struct ceph_mon_client *monc) 727 { 728 struct ceph_mon_generic_request *req; 729 struct rb_node *p; 730 731 for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) { 732 req = rb_entry(p, struct ceph_mon_generic_request, node); 733 ceph_msg_revoke(req->request); 734 ceph_msg_revoke_incoming(req->reply); 735 ceph_con_send(&monc->con, ceph_msg_get(req->request)); 736 } 737 } 738 739 /* 740 * Delayed work. If we haven't mounted yet, retry. Otherwise, 741 * renew/retry subscription as needed (in case it is timing out, or we 742 * got an ENOMEM). And keep the monitor connection alive. 743 */ 744 static void delayed_work(struct work_struct *work) 745 { 746 struct ceph_mon_client *monc = 747 container_of(work, struct ceph_mon_client, delayed_work.work); 748 749 dout("monc delayed_work\n"); 750 mutex_lock(&monc->mutex); 751 if (monc->hunting) { 752 __close_session(monc); 753 __open_session(monc); /* continue hunting */ 754 } else { 755 struct ceph_options *opt = monc->client->options; 756 int is_auth = ceph_auth_is_authenticated(monc->auth); 757 if (ceph_con_keepalive_expired(&monc->con, 758 opt->monc_ping_timeout)) { 759 dout("monc keepalive timeout\n"); 760 is_auth = 0; 761 __close_session(monc); 762 monc->hunting = true; 763 __open_session(monc); 764 } 765 766 if (!monc->hunting) { 767 ceph_con_keepalive(&monc->con); 768 __validate_auth(monc); 769 } 770 771 if (is_auth) 772 __send_subscribe(monc); 773 } 774 __schedule_delayed(monc); 775 mutex_unlock(&monc->mutex); 776 } 777 778 /* 779 * On startup, we build a temporary monmap populated with the IPs 780 * provided by mount(2). 781 */ 782 static int build_initial_monmap(struct ceph_mon_client *monc) 783 { 784 struct ceph_options *opt = monc->client->options; 785 struct ceph_entity_addr *mon_addr = opt->mon_addr; 786 int num_mon = opt->num_mon; 787 int i; 788 789 /* build initial monmap */ 790 monc->monmap = kzalloc(sizeof(*monc->monmap) + 791 num_mon*sizeof(monc->monmap->mon_inst[0]), 792 GFP_KERNEL); 793 if (!monc->monmap) 794 return -ENOMEM; 795 for (i = 0; i < num_mon; i++) { 796 monc->monmap->mon_inst[i].addr = mon_addr[i]; 797 monc->monmap->mon_inst[i].addr.nonce = 0; 798 monc->monmap->mon_inst[i].name.type = 799 CEPH_ENTITY_TYPE_MON; 800 monc->monmap->mon_inst[i].name.num = cpu_to_le64(i); 801 } 802 monc->monmap->num_mon = num_mon; 803 return 0; 804 } 805 806 int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) 807 { 808 int err = 0; 809 810 dout("init\n"); 811 memset(monc, 0, sizeof(*monc)); 812 monc->client = cl; 813 monc->monmap = NULL; 814 mutex_init(&monc->mutex); 815 816 err = build_initial_monmap(monc); 817 if (err) 818 goto out; 819 820 /* connection */ 821 /* authentication */ 822 monc->auth = ceph_auth_init(cl->options->name, 823 cl->options->key); 824 if (IS_ERR(monc->auth)) { 825 err = PTR_ERR(monc->auth); 826 goto out_monmap; 827 } 828 monc->auth->want_keys = 829 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON | 830 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS; 831 832 /* msgs */ 833 err = -ENOMEM; 834 monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK, 835 sizeof(struct ceph_mon_subscribe_ack), 836 GFP_NOFS, true); 837 if (!monc->m_subscribe_ack) 838 goto out_auth; 839 840 monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS, 841 true); 842 if (!monc->m_subscribe) 843 goto out_subscribe_ack; 844 845 monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS, 846 true); 847 if (!monc->m_auth_reply) 848 goto out_subscribe; 849 850 monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true); 851 monc->pending_auth = 0; 852 if (!monc->m_auth) 853 goto out_auth_reply; 854 855 ceph_con_init(&monc->con, monc, &mon_con_ops, 856 &monc->client->msgr); 857 858 monc->cur_mon = -1; 859 monc->hunting = true; 860 monc->sub_renew_after = jiffies; 861 monc->sub_sent = 0; 862 863 INIT_DELAYED_WORK(&monc->delayed_work, delayed_work); 864 monc->generic_request_tree = RB_ROOT; 865 monc->num_generic_requests = 0; 866 monc->last_tid = 0; 867 868 monc->have_mdsmap = 0; 869 monc->have_osdmap = 0; 870 monc->want_next_osdmap = 1; 871 return 0; 872 873 out_auth_reply: 874 ceph_msg_put(monc->m_auth_reply); 875 out_subscribe: 876 ceph_msg_put(monc->m_subscribe); 877 out_subscribe_ack: 878 ceph_msg_put(monc->m_subscribe_ack); 879 out_auth: 880 ceph_auth_destroy(monc->auth); 881 out_monmap: 882 kfree(monc->monmap); 883 out: 884 return err; 885 } 886 EXPORT_SYMBOL(ceph_monc_init); 887 888 void ceph_monc_stop(struct ceph_mon_client *monc) 889 { 890 dout("stop\n"); 891 cancel_delayed_work_sync(&monc->delayed_work); 892 893 mutex_lock(&monc->mutex); 894 __close_session(monc); 895 896 mutex_unlock(&monc->mutex); 897 898 /* 899 * flush msgr queue before we destroy ourselves to ensure that: 900 * - any work that references our embedded con is finished. 901 * - any osd_client or other work that may reference an authorizer 902 * finishes before we shut down the auth subsystem. 903 */ 904 ceph_msgr_flush(); 905 906 ceph_auth_destroy(monc->auth); 907 908 ceph_msg_put(monc->m_auth); 909 ceph_msg_put(monc->m_auth_reply); 910 ceph_msg_put(monc->m_subscribe); 911 ceph_msg_put(monc->m_subscribe_ack); 912 913 kfree(monc->monmap); 914 } 915 EXPORT_SYMBOL(ceph_monc_stop); 916 917 static void handle_auth_reply(struct ceph_mon_client *monc, 918 struct ceph_msg *msg) 919 { 920 int ret; 921 int was_auth = 0; 922 int had_debugfs_info, init_debugfs = 0; 923 924 mutex_lock(&monc->mutex); 925 had_debugfs_info = have_debugfs_info(monc); 926 was_auth = ceph_auth_is_authenticated(monc->auth); 927 monc->pending_auth = 0; 928 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, 929 msg->front.iov_len, 930 monc->m_auth->front.iov_base, 931 monc->m_auth->front_alloc_len); 932 if (ret < 0) { 933 monc->client->auth_err = ret; 934 wake_up_all(&monc->client->auth_wq); 935 } else if (ret > 0) { 936 __send_prepared_auth_request(monc, ret); 937 } else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) { 938 dout("authenticated, starting session\n"); 939 940 monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT; 941 monc->client->msgr.inst.name.num = 942 cpu_to_le64(monc->auth->global_id); 943 944 __send_subscribe(monc); 945 __resend_generic_request(monc); 946 } 947 948 if (!had_debugfs_info && have_debugfs_info(monc)) { 949 pr_info("client%lld fsid %pU\n", 950 ceph_client_id(monc->client), 951 &monc->client->fsid); 952 init_debugfs = 1; 953 } 954 mutex_unlock(&monc->mutex); 955 956 if (init_debugfs) { 957 /* 958 * do debugfs initialization without mutex to avoid 959 * creating a locking dependency 960 */ 961 ceph_debugfs_client_init(monc->client); 962 } 963 } 964 965 static int __validate_auth(struct ceph_mon_client *monc) 966 { 967 int ret; 968 969 if (monc->pending_auth) 970 return 0; 971 972 ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base, 973 monc->m_auth->front_alloc_len); 974 if (ret <= 0) 975 return ret; /* either an error, or no need to authenticate */ 976 __send_prepared_auth_request(monc, ret); 977 return 0; 978 } 979 980 int ceph_monc_validate_auth(struct ceph_mon_client *monc) 981 { 982 int ret; 983 984 mutex_lock(&monc->mutex); 985 ret = __validate_auth(monc); 986 mutex_unlock(&monc->mutex); 987 return ret; 988 } 989 EXPORT_SYMBOL(ceph_monc_validate_auth); 990 991 /* 992 * handle incoming message 993 */ 994 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 995 { 996 struct ceph_mon_client *monc = con->private; 997 int type = le16_to_cpu(msg->hdr.type); 998 999 if (!monc) 1000 return; 1001 1002 switch (type) { 1003 case CEPH_MSG_AUTH_REPLY: 1004 handle_auth_reply(monc, msg); 1005 break; 1006 1007 case CEPH_MSG_MON_SUBSCRIBE_ACK: 1008 handle_subscribe_ack(monc, msg); 1009 break; 1010 1011 case CEPH_MSG_STATFS_REPLY: 1012 handle_statfs_reply(monc, msg); 1013 break; 1014 1015 case CEPH_MSG_MON_GET_VERSION_REPLY: 1016 handle_get_version_reply(monc, msg); 1017 break; 1018 1019 case CEPH_MSG_MON_MAP: 1020 ceph_monc_handle_map(monc, msg); 1021 break; 1022 1023 case CEPH_MSG_OSD_MAP: 1024 ceph_osdc_handle_map(&monc->client->osdc, msg); 1025 break; 1026 1027 default: 1028 /* can the chained handler handle it? */ 1029 if (monc->client->extra_mon_dispatch && 1030 monc->client->extra_mon_dispatch(monc->client, msg) == 0) 1031 break; 1032 1033 pr_err("received unknown message type %d %s\n", type, 1034 ceph_msg_type_name(type)); 1035 } 1036 ceph_msg_put(msg); 1037 } 1038 1039 /* 1040 * Allocate memory for incoming message 1041 */ 1042 static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, 1043 struct ceph_msg_header *hdr, 1044 int *skip) 1045 { 1046 struct ceph_mon_client *monc = con->private; 1047 int type = le16_to_cpu(hdr->type); 1048 int front_len = le32_to_cpu(hdr->front_len); 1049 struct ceph_msg *m = NULL; 1050 1051 *skip = 0; 1052 1053 switch (type) { 1054 case CEPH_MSG_MON_SUBSCRIBE_ACK: 1055 m = ceph_msg_get(monc->m_subscribe_ack); 1056 break; 1057 case CEPH_MSG_STATFS_REPLY: 1058 return get_generic_reply(con, hdr, skip); 1059 case CEPH_MSG_AUTH_REPLY: 1060 m = ceph_msg_get(monc->m_auth_reply); 1061 break; 1062 case CEPH_MSG_MON_GET_VERSION_REPLY: 1063 if (le64_to_cpu(hdr->tid) != 0) 1064 return get_generic_reply(con, hdr, skip); 1065 1066 /* 1067 * Older OSDs don't set reply tid even if the orignal 1068 * request had a non-zero tid. Workaround this weirdness 1069 * by falling through to the allocate case. 1070 */ 1071 case CEPH_MSG_MON_MAP: 1072 case CEPH_MSG_MDS_MAP: 1073 case CEPH_MSG_OSD_MAP: 1074 m = ceph_msg_new(type, front_len, GFP_NOFS, false); 1075 if (!m) 1076 return NULL; /* ENOMEM--return skip == 0 */ 1077 break; 1078 } 1079 1080 if (!m) { 1081 pr_info("alloc_msg unknown type %d\n", type); 1082 *skip = 1; 1083 } else if (front_len > m->front_alloc_len) { 1084 pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n", 1085 front_len, m->front_alloc_len, 1086 (unsigned int)con->peer_name.type, 1087 le64_to_cpu(con->peer_name.num)); 1088 ceph_msg_put(m); 1089 m = ceph_msg_new(type, front_len, GFP_NOFS, false); 1090 } 1091 1092 return m; 1093 } 1094 1095 /* 1096 * If the monitor connection resets, pick a new monitor and resubmit 1097 * any pending requests. 1098 */ 1099 static void mon_fault(struct ceph_connection *con) 1100 { 1101 struct ceph_mon_client *monc = con->private; 1102 1103 if (!monc) 1104 return; 1105 1106 dout("mon_fault\n"); 1107 mutex_lock(&monc->mutex); 1108 if (!con->private) 1109 goto out; 1110 1111 if (!monc->hunting) 1112 pr_info("mon%d %s session lost, " 1113 "hunting for new mon\n", monc->cur_mon, 1114 ceph_pr_addr(&monc->con.peer_addr.in_addr)); 1115 1116 __close_session(monc); 1117 if (!monc->hunting) { 1118 /* start hunting */ 1119 monc->hunting = true; 1120 __open_session(monc); 1121 } else { 1122 /* already hunting, let's wait a bit */ 1123 __schedule_delayed(monc); 1124 } 1125 out: 1126 mutex_unlock(&monc->mutex); 1127 } 1128 1129 /* 1130 * We can ignore refcounting on the connection struct, as all references 1131 * will come from the messenger workqueue, which is drained prior to 1132 * mon_client destruction. 1133 */ 1134 static struct ceph_connection *con_get(struct ceph_connection *con) 1135 { 1136 return con; 1137 } 1138 1139 static void con_put(struct ceph_connection *con) 1140 { 1141 } 1142 1143 static const struct ceph_connection_operations mon_con_ops = { 1144 .get = con_get, 1145 .put = con_put, 1146 .dispatch = dispatch, 1147 .fault = mon_fault, 1148 .alloc_msg = mon_alloc_msg, 1149 }; 1150