1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/module.h> 4 #include <linux/types.h> 5 #include <linux/slab.h> 6 #include <linux/random.h> 7 #include <linux/sched.h> 8 9 #include <linux/ceph/mon_client.h> 10 #include <linux/ceph/libceph.h> 11 #include <linux/ceph/debugfs.h> 12 #include <linux/ceph/decode.h> 13 #include <linux/ceph/auth.h> 14 15 /* 16 * Interact with Ceph monitor cluster. Handle requests for new map 17 * versions, and periodically resend as needed. Also implement 18 * statfs() and umount(). 19 * 20 * A small cluster of Ceph "monitors" are responsible for managing critical 21 * cluster configuration and state information. An odd number (e.g., 3, 5) 22 * of cmon daemons use a modified version of the Paxos part-time parliament 23 * algorithm to manage the MDS map (mds cluster membership), OSD map, and 24 * list of clients who have mounted the file system. 25 * 26 * We maintain an open, active session with a monitor at all times in order to 27 * receive timely MDSMap updates. We periodically send a keepalive byte on the 28 * TCP socket to ensure we detect a failure. If the connection does break, we 29 * randomly hunt for a new monitor. Once the connection is reestablished, we 30 * resend any outstanding requests. 31 */ 32 33 static const struct ceph_connection_operations mon_con_ops; 34 35 static int __validate_auth(struct ceph_mon_client *monc); 36 37 /* 38 * Decode a monmap blob (e.g., during mount). 
39 */ 40 struct ceph_monmap *ceph_monmap_decode(void *p, void *end) 41 { 42 struct ceph_monmap *m = NULL; 43 int i, err = -EINVAL; 44 struct ceph_fsid fsid; 45 u32 epoch, num_mon; 46 u32 len; 47 48 ceph_decode_32_safe(&p, end, len, bad); 49 ceph_decode_need(&p, end, len, bad); 50 51 dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p)); 52 p += sizeof(u16); /* skip version */ 53 54 ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad); 55 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 56 epoch = ceph_decode_32(&p); 57 58 num_mon = ceph_decode_32(&p); 59 ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad); 60 61 if (num_mon >= CEPH_MAX_MON) 62 goto bad; 63 m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS); 64 if (m == NULL) 65 return ERR_PTR(-ENOMEM); 66 m->fsid = fsid; 67 m->epoch = epoch; 68 m->num_mon = num_mon; 69 ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0])); 70 for (i = 0; i < num_mon; i++) 71 ceph_decode_addr(&m->mon_inst[i].addr); 72 73 dout("monmap_decode epoch %d, num_mon %d\n", m->epoch, 74 m->num_mon); 75 for (i = 0; i < m->num_mon; i++) 76 dout("monmap_decode mon%d is %s\n", i, 77 ceph_pr_addr(&m->mon_inst[i].addr.in_addr)); 78 return m; 79 80 bad: 81 dout("monmap_decode failed with %d\n", err); 82 kfree(m); 83 return ERR_PTR(err); 84 } 85 86 /* 87 * return true if *addr is included in the monmap. 88 */ 89 int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr) 90 { 91 int i; 92 93 for (i = 0; i < m->num_mon; i++) 94 if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0) 95 return 1; 96 return 0; 97 } 98 99 /* 100 * Send an auth request. 
 */
static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
{
	/* m_auth's front buffer was already filled in by the auth code;
	 * len is the number of valid bytes in it */
	monc->pending_auth = 1;
	monc->m_auth->front.iov_len = len;
	monc->m_auth->hdr.front_len = cpu_to_le32(len);
	ceph_msg_revoke(monc->m_auth);	/* requeue cleanly if still in flight */
	ceph_msg_get(monc->m_auth);  /* keep our ref */
	ceph_con_send(&monc->con, monc->m_auth);
}

/*
 * Close monitor session, if any.
 */
static void __close_session(struct ceph_mon_client *monc)
{
	dout("__close_session closing mon%d\n", monc->cur_mon);
	/* revoke anything queued on (or expected from) the old connection */
	ceph_msg_revoke(monc->m_auth);
	ceph_msg_revoke_incoming(monc->m_auth_reply);
	ceph_msg_revoke(monc->m_subscribe);
	ceph_msg_revoke_incoming(monc->m_subscribe_ack);
	ceph_con_close(&monc->con);

	monc->pending_auth = 0;
	ceph_auth_reset(monc->auth);
}

/*
 * Pick a new monitor at random and set cur_mon.  If we are repicking
 * (i.e. cur_mon is already set), be sure to pick a different one.
 */
static void pick_new_mon(struct ceph_mon_client *monc)
{
	int old_mon = monc->cur_mon;

	BUG_ON(monc->monmap->num_mon < 1);

	if (monc->monmap->num_mon == 1) {
		monc->cur_mon = 0;
	} else {
		int max = monc->monmap->num_mon;
		int o = -1;	/* slot to exclude from the draw, if any */
		int n;

		if (monc->cur_mon >= 0) {
			if (monc->cur_mon < monc->monmap->num_mon)
				o = monc->cur_mon;
			if (o >= 0)
				max--;	/* draw from the remaining mons */
		}

		/* pick uniformly among max candidates, then shift past the
		 * excluded slot o so the old mon is never repicked */
		n = prandom_u32() % max;
		if (o >= 0 && n >= o)
			n++;

		monc->cur_mon = n;
	}

	dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
	     monc->cur_mon, monc->monmap->num_mon);
}

/*
 * Open a session with a new monitor.
 */
static void __open_session(struct ceph_mon_client *monc)
{
	int ret;

	pick_new_mon(monc);

	monc->hunting = true;
	if (monc->had_a_connection) {
		/* exponential backoff between reconnect attempts, capped */
		monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
		if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
			monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
	}

	monc->sub_renew_after = jiffies; /* i.e., expired */
	monc->sub_renew_sent = 0;

	dout("%s opening mon%d\n", __func__, monc->cur_mon);
	ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
		      &monc->monmap->mon_inst[monc->cur_mon].addr);

	/*
	 * send an initial keepalive to ensure our timestamp is valid
	 * by the time we are in an OPENED state
	 */
	ceph_con_keepalive(&monc->con);

	/* initiate authentication handshake */
	ret = ceph_auth_build_hello(monc->auth,
				    monc->m_auth->front.iov_base,
				    monc->m_auth->front_alloc_len);
	BUG_ON(ret <= 0);
	__send_prepared_auth_request(monc, ret);
}

/* Tear down the current session (if any) and open one with a new mon. */
static void reopen_session(struct ceph_mon_client *monc)
{
	if (!monc->hunting)
		pr_info("mon%d %s session lost, hunting for new mon\n",
		    monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr.in_addr));

	__close_session(monc);
	__open_session(monc);
}

/*
 * Reschedule delayed work timer.
 */
static void __schedule_delayed(struct ceph_mon_client *monc)
{
	unsigned long delay;

	/* retry aggressively (with backoff) while hunting; otherwise just
	 * wake up periodically to ping/renew */
	if (monc->hunting)
		delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
	else
		delay = CEPH_MONC_PING_INTERVAL;

	dout("__schedule_delayed after %lu\n", delay);
	mod_delayed_work(system_wq, &monc->delayed_work,
			 round_jiffies_relative(delay));
}

/* on-the-wire subscription names, indexed by CEPH_SUB_* */
const char *ceph_sub_str[] = {
	[CEPH_SUB_MONMAP] = "monmap",
	[CEPH_SUB_OSDMAP] = "osdmap",
	[CEPH_SUB_FSMAP]  = "fsmap.user",
	[CEPH_SUB_MDSMAP] = "mdsmap",
};

/*
 * Send subscribe request for one or more maps, according to
 * monc->subs.
 */
static void __send_subscribe(struct ceph_mon_client *monc)
{
	struct ceph_msg *msg = monc->m_subscribe;
	void *p = msg->front.iov_base;
	void *const end = p + msg->front_alloc_len;
	int num = 0;
	int i;

	dout("%s sent %lu\n", __func__, monc->sub_renew_sent);

	BUG_ON(monc->cur_mon < 0);

	if (!monc->sub_renew_sent)
		monc->sub_renew_sent = jiffies | 1; /* never 0 */

	msg->hdr.version = cpu_to_le16(2);

	/* count wanted subscriptions first so we can encode the count */
	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
		if (monc->subs[i].want)
			num++;
	}
	BUG_ON(num < 1); /* monmap sub is always there */
	ceph_encode_32(&p, num);
	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
		char buf[32];
		int len;

		if (!monc->subs[i].want)
			continue;

		len = sprintf(buf, "%s", ceph_sub_str[i]);
		/* a non-default fs cluster id is appended to "mdsmap" */
		if (i == CEPH_SUB_MDSMAP &&
		    monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
			len += sprintf(buf + len, ".%d", monc->fs_cluster_id);

		dout("%s %s start %llu flags 0x%x\n", __func__, buf,
		     le64_to_cpu(monc->subs[i].item.start),
		     monc->subs[i].item.flags);
		ceph_encode_string(&p, end, buf, len);
		memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
		p += sizeof(monc->subs[i].item);
	}

	BUG_ON(p > end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
	ceph_msg_revoke(msg);	/* requeue cleanly if still in flight */
	ceph_con_send(&monc->con, ceph_msg_get(msg));
}

static void handle_subscribe_ack(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	unsigned int seconds;
	struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

	if (msg->front.iov_len < sizeof(*h))
		goto bad;
	seconds = le32_to_cpu(h->duration);

	mutex_lock(&monc->mutex);
	if (monc->sub_renew_sent) {
		/* schedule renewal at roughly half of the granted duration */
		monc->sub_renew_after = monc->sub_renew_sent +
					(seconds >> 1) * HZ - 1;
		dout("%s sent %lu duration %d renew after %lu\n", __func__,
		     monc->sub_renew_sent, seconds, monc->sub_renew_after);
		monc->sub_renew_sent = 0;
	} else {
		/* stale ack for a request we no longer track */
		dout("%s sent %lu renew after %lu, ignoring\n", __func__,
		     monc->sub_renew_sent, monc->sub_renew_after);
	}
	mutex_unlock(&monc->mutex);
	return;
bad:
	pr_err("got corrupt subscribe-ack msg\n");
	ceph_msg_dump(msg);
}

/*
 * Register interest in a map
 *
 * @sub: one of CEPH_SUB_*
 * @epoch: X for "every map since X", or 0 for "just the latest"
 *
 * Returns true if the want changed and a new subscribe request should
 * be sent.  Caller must hold monc->mutex.
 */
static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
				 u32 epoch, bool continuous)
{
	__le64 start = cpu_to_le64(epoch);
	u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;

	dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
	     epoch, continuous);

	/* no change -> no need to resend the subscription */
	if (monc->subs[sub].want &&
	    monc->subs[sub].item.start == start &&
	    monc->subs[sub].item.flags == flags)
		return false;

	monc->subs[sub].item.start = start;
	monc->subs[sub].item.flags = flags;
	monc->subs[sub].want = true;

	return true;
}

bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
			bool continuous)
{
	bool need_request;

	mutex_lock(&monc->mutex);
	need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
	mutex_unlock(&monc->mutex);

	return need_request;
}
EXPORT_SYMBOL(ceph_monc_want_map);

/*
 * Keep track of which maps we have
 *
 * @sub: one of CEPH_SUB_*
 */
static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
				u32 epoch)
{
	dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);

	if (monc->subs[sub].want) {
		/* a one-shot sub is satisfied; a continuous sub advances
		 * its start so we only get newer epochs from now on */
		if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
			monc->subs[sub].want = false;
		else
			monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
	}

	monc->subs[sub].have = epoch;
}

void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
{
	mutex_lock(&monc->mutex);
	__ceph_monc_got_map(monc, sub, epoch);
	mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_got_map);

void ceph_monc_renew_subs(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__send_subscribe(monc);
	mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_renew_subs);

/*
 * Wait for an osdmap with a given epoch.
 *
 * @epoch: epoch to wait for
 * @timeout: in jiffies, 0 means "wait forever"
 */
int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
			  unsigned long timeout)
{
	unsigned long started = jiffies;
	long ret;

	mutex_lock(&monc->mutex);
	while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
		mutex_unlock(&monc->mutex);

		if (timeout && time_after_eq(jiffies, started + timeout))
			return -ETIMEDOUT;

		/* NOTE: .have is read unlocked here; the locked loop
		 * condition above re-checks it authoritatively */
		ret = wait_event_interruptible_timeout(monc->client->auth_wq,
					monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
					ceph_timeout_jiffies(timeout));
		if (ret < 0)
			return ret;

		mutex_lock(&monc->mutex);
	}

	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_wait_osdmap);

/*
 * Open a session with a random monitor.  Request monmap and osdmap,
 * which are waited upon in __ceph_open_session().
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
	__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
	__open_session(monc);
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);

/*
 * Handle an incoming monmap: decode it and, if the fsid checks out,
 * swap it in for the old one.  Wakes auth_wq waiters in all cases.
 */
static void ceph_monc_handle_map(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	struct ceph_client *client = monc->client;
	struct ceph_monmap *monmap = NULL, *old = monc->monmap;
	void *p, *end;

	mutex_lock(&monc->mutex);

	dout("handle_monmap\n");
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	monmap = ceph_monmap_decode(p, end);
	if (IS_ERR(monmap)) {
		pr_err("problem decoding monmap, %d\n",
		       (int)PTR_ERR(monmap));
		goto out;
	}

	if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
		kfree(monmap);
		goto out;
	}

	/* install the new map and free the old one */
	client->monc.monmap = monmap;
	kfree(old);

	__ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
	client->have_fsid = true;

out:
	mutex_unlock(&monc->mutex);
	wake_up_all(&client->auth_wq);
}

/*
 * generic requests (currently statfs, mon_get_version)
 */
DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)

/* kref release: request must already be out of the lookup tree */
static void release_generic_request(struct kref *kref)
{
	struct ceph_mon_generic_request *req =
		container_of(kref, struct ceph_mon_generic_request, kref);

	dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
	     req->reply);
	WARN_ON(!RB_EMPTY_NODE(&req->node));

	if (req->reply)
		ceph_msg_put(req->reply);
	if (req->request)
		ceph_msg_put(req->request);

	kfree(req);
}

static void put_generic_request(struct ceph_mon_generic_request *req)
{
	if (req)
		kref_put(&req->kref, release_generic_request);
}

static void get_generic_request(struct ceph_mon_generic_request *req)
{
	kref_get(&req->kref);
}

/*
 * Allocate and initialize a generic request.  Returns NULL on
 * allocation failure; the caller owns the initial kref.
 */
static struct ceph_mon_generic_request *
alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
{
	struct ceph_mon_generic_request *req;

	req = kzalloc(sizeof(*req), gfp);
	if (!req)
		return NULL;

	req->monc = monc;
	kref_init(&req->kref);
	RB_CLEAR_NODE(&req->node);
	init_completion(&req->completion);

	dout("%s greq %p\n", __func__, req);
	return req;
}

/*
 * Assign a tid and insert the request into the lookup tree; the tree
 * holds its own reference.  Caller must hold monc->mutex.
 */
static void register_generic_request(struct ceph_mon_generic_request *req)
{
	struct ceph_mon_client *monc = req->monc;

	WARN_ON(req->tid);

	get_generic_request(req);
	req->tid = ++monc->last_tid;
	insert_generic_request(&monc->generic_request_tree, req);
}

static void send_generic_request(struct ceph_mon_client *monc,
				 struct ceph_mon_generic_request *req)
{
	WARN_ON(!req->tid);

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
	req->request->hdr.tid = cpu_to_le64(req->tid);
	ceph_con_send(&monc->con, ceph_msg_get(req->request));
}

/*
 * Unlink the request from the tree and revoke its in-flight messages.
 * Caller must hold monc->mutex and still owes the tree's reference.
 */
static void __finish_generic_request(struct ceph_mon_generic_request *req)
{
	struct ceph_mon_client *monc = req->monc;

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
	erase_generic_request(&monc->generic_request_tree, req);

	ceph_msg_revoke(req->request);
	ceph_msg_revoke_incoming(req->reply);
}

static void finish_generic_request(struct ceph_mon_generic_request *req)
{
	__finish_generic_request(req);
	put_generic_request(req);	/* drop the tree's ref */
}

/*
 * Deliver the result: invoke the async callback if one was set,
 * otherwise wake the synchronous waiter.  Drops the tree's ref.
 */
static void complete_generic_request(struct ceph_mon_generic_request *req)
{
	if (req->complete_cb)
		req->complete_cb(req);
	else
		complete_all(&req->completion);
	put_generic_request(req);
}

/* Abort a request that is still registered (e.g. wait interrupted). */
static void cancel_generic_request(struct ceph_mon_generic_request *req)
{
	struct ceph_mon_client *monc = req->monc;
	struct ceph_mon_generic_request *lookup_req;

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);

	mutex_lock(&monc->mutex);
	lookup_req = lookup_generic_request(&monc->generic_request_tree,
					    req->tid);
	if (lookup_req) {
		WARN_ON(lookup_req != req);
		finish_generic_request(req);
	}

	mutex_unlock(&monc->mutex);
}

/*
 * Wait for a request to complete.  Returns the request result, or a
 * negative error if the wait was interrupted (the request is then
 * cancelled).
 */
static int wait_generic_request(struct ceph_mon_generic_request *req)
{
	int ret;

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
	ret = wait_for_completion_interruptible(&req->completion);
	if (ret)
		cancel_generic_request(req);
	else
		ret = req->result; /* completed */

	return ret;
}

/*
 * Messenger alloc_msg hook: hand back the preallocated reply for the
 * matching registered request, or set *skip for unknown tids.
 */
static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
					  struct ceph_msg_header *hdr,
					  int *skip)
{
	struct ceph_mon_client *monc = con->private;
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(hdr->tid);
	struct ceph_msg *m;

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, tid);
	if (!req) {
		dout("get_generic_reply %lld dne\n", tid);
		*skip = 1;
		m = NULL;
	} else {
		dout("get_generic_reply %lld got %p\n", tid, req->reply);
		*skip = 0;
		m = ceph_msg_get(req->reply);
		/*
		 * we don't need to track the connection reading into
		 * this reply because we only have one open connection
		 * at a time, ever.
		 */
	}
	mutex_unlock(&monc->mutex);
	return m;
}

/*
 * statfs
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
				struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	if (msg->front.iov_len != sizeof(*reply))
		goto bad;

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, tid);
	if (!req) {
		mutex_unlock(&monc->mutex);
		return;
	}

	req->result = 0;
	*req->u.st = reply->st; /* struct */
	__finish_generic_request(req);
	mutex_unlock(&monc->mutex);

	complete_generic_request(req);
	return;

bad:
	pr_err("corrupt statfs reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

/*
 * Do a synchronous statfs().
 */
int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs *h;
	int ret = -ENOMEM;

	req = alloc_generic_request(monc, GFP_NOFS);
	if (!req)
		goto out;

	req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
				    true);
	if (!req->request)
		goto out;

	req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
	if (!req->reply)
		goto out;

	req->u.st = buf;	/* reply handler copies the result here */

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	/* fill out request */
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	ret = wait_generic_request(req);
out:
	put_generic_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_do_statfs);

static void handle_get_version_reply(struct ceph_mon_client *monc,
				     struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	void *p = msg->front.iov_base;
	void *end = p + msg->front_alloc_len;
	u64 handle;

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	ceph_decode_need(&p, end, 2*sizeof(u64), bad);
	handle = ceph_decode_64(&p);
	/* tid 0 happens with older mons that don't echo the tid (see
	 * mon_alloc_msg); otherwise header tid and handle must agree */
	if (tid != 0 && tid != handle)
		goto bad;

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, handle);
	if (!req) {
		mutex_unlock(&monc->mutex);
		return;
	}

	req->result = 0;
	req->u.newest = ceph_decode_64(&p);
	__finish_generic_request(req);
	mutex_unlock(&monc->mutex);

	complete_generic_request(req);
	return;

bad:
	pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

/*
 * Build, register and send an MMonGetVersion request.  Returns the
 * registered request (caller owns one ref) or ERR_PTR(-ENOMEM).
 */
static struct ceph_mon_generic_request *
__ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
			ceph_monc_callback_t cb, u64 private_data)
{
	struct ceph_mon_generic_request *req;

	req = alloc_generic_request(monc, GFP_NOIO);
	if (!req)
		goto err_put_req;

	req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
				    sizeof(u64) + sizeof(u32) + strlen(what),
				    GFP_NOIO, true);
	if (!req->request)
		goto err_put_req;

	req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
				  true);
	if (!req->reply)
		goto err_put_req;

	req->complete_cb = cb;
	req->private_data = private_data;

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	{
		void *p = req->request->front.iov_base;
		void *const end = p + req->request->front_alloc_len;

		ceph_encode_64(&p, req->tid); /* handle */
		ceph_encode_string(&p, end, what, strlen(what));
		WARN_ON(p != end);
	}
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	return req;

err_put_req:
	put_generic_request(req);	/* free(NULL)-safe via put */
	return ERR_PTR(-ENOMEM);
}

/*
 * Send MMonGetVersion and wait for the reply.
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 */
int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
			  u64 *newest)
{
	struct ceph_mon_generic_request *req;
	int ret;

	req = __ceph_monc_get_version(monc, what, NULL, 0);
	if (IS_ERR(req))
		return PTR_ERR(req);

	ret = wait_generic_request(req);
	if (!ret)
		*newest = req->u.newest;

	put_generic_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_get_version);

/*
 * Send MMonGetVersion,
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 */
int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
				ceph_monc_callback_t cb, u64 private_data)
{
	struct ceph_mon_generic_request *req;

	req = __ceph_monc_get_version(monc, what, cb, private_data);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* the tree's ref keeps the request alive until the callback */
	put_generic_request(req);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_get_version_async);

static void handle_command_ack(struct ceph_mon_client *monc,
			       struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	void *p = msg->front.iov_base;
	void *const end = p + msg->front_alloc_len;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
				  sizeof(u32), bad);
	p += sizeof(struct ceph_mon_request_header);

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, tid);
	if (!req) {
		mutex_unlock(&monc->mutex);
		return;
	}

	req->result = ceph_decode_32(&p);
	__finish_generic_request(req);
	mutex_unlock(&monc->mutex);

	complete_generic_request(req);
	return;

bad:
	pr_err("corrupt mon_command ack, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

/*
 * Ask the monitors to blacklist @client_addr, synchronously, via a
 * mon command.
 */
int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
			    struct ceph_entity_addr *client_addr)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_command *h;
	int ret = -ENOMEM;
	int len;

	req = alloc_generic_request(monc, GFP_NOIO);
	if (!req)
		goto out;

	req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
	if (!req->request)
		goto out;

	req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
				  true);
	if (!req->reply)
		goto out;

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;
	h->num_strs = cpu_to_le32(1);
	len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \
		\"blacklistop\": \"add\", \
		\"addr\": \"%pISpc/%u\" }",
		      &client_addr->in_addr, le32_to_cpu(client_addr->nonce));
	h->str_len = cpu_to_le32(len);
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	ret = wait_generic_request(req);
out:
	put_generic_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_blacklist_add);

/*
 * Resend pending generic requests.
 */
static void __resend_generic_request(struct ceph_mon_client *monc)
{
	struct ceph_mon_generic_request *req;
	struct rb_node *p;

	for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_mon_generic_request, node);
		ceph_msg_revoke(req->request);
		ceph_msg_revoke_incoming(req->reply);
		ceph_con_send(&monc->con, ceph_msg_get(req->request));
	}
}

/*
 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
 * renew/retry subscription as needed (in case it is timing out, or we
 * got an ENOMEM).
 * And keep the monitor connection alive.
 */
static void delayed_work(struct work_struct *work)
{
	struct ceph_mon_client *monc =
		container_of(work, struct ceph_mon_client, delayed_work.work);

	dout("monc delayed_work\n");
	mutex_lock(&monc->mutex);
	if (monc->hunting) {
		dout("%s continuing hunt\n", __func__);
		reopen_session(monc);
	} else {
		int is_auth = ceph_auth_is_authenticated(monc->auth);
		/* a stale keepalive means the session is dead; start hunting */
		if (ceph_con_keepalive_expired(&monc->con,
					       CEPH_MONC_PING_TIMEOUT)) {
			dout("monc keepalive timeout\n");
			is_auth = 0;
			reopen_session(monc);
		}

		/* reopen_session() above may have set hunting again */
		if (!monc->hunting) {
			ceph_con_keepalive(&monc->con);
			__validate_auth(monc);
		}

		if (is_auth) {
			unsigned long now = jiffies;

			dout("%s renew subs? now %lu renew after %lu\n",
			     __func__, now, monc->sub_renew_after);
			if (time_after_eq(now, monc->sub_renew_after))
				__send_subscribe(monc);
		}
	}
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
}

/*
 * On startup, we build a temporary monmap populated with the IPs
 * provided by mount(2).
 */
static int build_initial_monmap(struct ceph_mon_client *monc)
{
	struct ceph_options *opt = monc->client->options;
	struct ceph_entity_addr *mon_addr = opt->mon_addr;
	int num_mon = opt->num_mon;
	int i;

	/* build initial monmap */
	monc->monmap = kzalloc(sizeof(*monc->monmap) +
			       num_mon*sizeof(monc->monmap->mon_inst[0]),
			       GFP_KERNEL);
	if (!monc->monmap)
		return -ENOMEM;
	for (i = 0; i < num_mon; i++) {
		monc->monmap->mon_inst[i].addr = mon_addr[i];
		monc->monmap->mon_inst[i].addr.nonce = 0;
		monc->monmap->mon_inst[i].name.type =
			CEPH_ENTITY_TYPE_MON;
		monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
	}
	monc->monmap->num_mon = num_mon;
	return 0;
}

/*
 * Initialize the mon client: build the initial monmap, set up auth,
 * preallocate the fixed messages and init the embedded connection.
 * Returns 0 or a negative error; on error nothing is left allocated.
 */
int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
	int err = 0;

	dout("init\n");
	memset(monc, 0, sizeof(*monc));
	monc->client = cl;
	monc->monmap = NULL;
	mutex_init(&monc->mutex);

	err = build_initial_monmap(monc);
	if (err)
		goto out;

	/* connection */
	/* authentication */
	monc->auth = ceph_auth_init(cl->options->name,
				    cl->options->key);
	if (IS_ERR(monc->auth)) {
		err = PTR_ERR(monc->auth);
		goto out_monmap;
	}
	monc->auth->want_keys =
		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
		CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;

	/* msgs: preallocated so we can still talk to the monitors when
	 * memory is tight */
	err = -ENOMEM;
	monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
				     sizeof(struct ceph_mon_subscribe_ack),
				     GFP_KERNEL, true);
	if (!monc->m_subscribe_ack)
		goto out_auth;

	monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
					 GFP_KERNEL, true);
	if (!monc->m_subscribe)
		goto out_subscribe_ack;

	monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
					  GFP_KERNEL, true);
	if (!monc->m_auth_reply)
		goto out_subscribe;

	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
	monc->pending_auth = 0;
	if (!monc->m_auth)
		goto out_auth_reply;

	ceph_con_init(&monc->con, monc, &mon_con_ops,
		      &monc->client->msgr);

	monc->cur_mon = -1;	/* no monitor picked yet */
	monc->had_a_connection = false;
	monc->hunt_mult = 1;

	INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
	monc->generic_request_tree = RB_ROOT;
	monc->last_tid = 0;

	monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;

	return 0;

out_auth_reply:
	ceph_msg_put(monc->m_auth_reply);
out_subscribe:
	ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
	ceph_msg_put(monc->m_subscribe_ack);
out_auth:
	ceph_auth_destroy(monc->auth);
out_monmap:
	kfree(monc->monmap);
out:
	return err;
}
EXPORT_SYMBOL(ceph_monc_init);

void ceph_monc_stop(struct ceph_mon_client *monc)
{
	dout("stop\n");
	cancel_delayed_work_sync(&monc->delayed_work);

	mutex_lock(&monc->mutex);
	__close_session(monc);
	monc->cur_mon = -1;
	mutex_unlock(&monc->mutex);

	/*
	 * flush msgr queue before we destroy ourselves to ensure that:
	 *  - any work that references our embedded con is finished.
	 *  - any osd_client or other work that may reference an authorizer
	 *    finishes before we shut down the auth subsystem.
	 */
	ceph_msgr_flush();

	ceph_auth_destroy(monc->auth);

	WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));

	ceph_msg_put(monc->m_auth);
	ceph_msg_put(monc->m_auth_reply);
	ceph_msg_put(monc->m_subscribe);
	ceph_msg_put(monc->m_subscribe_ack);

	kfree(monc->monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);

/* The hunt succeeded: clear hunting state and ease off the backoff. */
static void finish_hunting(struct ceph_mon_client *monc)
{
	if (monc->hunting) {
		dout("%s found mon%d\n", __func__, monc->cur_mon);
		monc->hunting = false;
		monc->had_a_connection = true;
		monc->hunt_mult /= 2; /* reduce by 50% */
		if (monc->hunt_mult < 1)
			monc->hunt_mult = 1;
	}
}

static void handle_auth_reply(struct ceph_mon_client *monc,
			      struct ceph_msg *msg)
{
	int ret;
	int was_auth = 0;

	mutex_lock(&monc->mutex);
	was_auth = ceph_auth_is_authenticated(monc->auth);
	monc->pending_auth = 0;
	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
				     msg->front.iov_len,
				     monc->m_auth->front.iov_base,
				     monc->m_auth->front_alloc_len);
	if (ret > 0) {
		/* handshake not done yet; send the next auth request */
		__send_prepared_auth_request(monc, ret);
		goto out;
	}

	finish_hunting(monc);

	if (ret < 0) {
		monc->client->auth_err = ret;
	} else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
		dout("authenticated, starting session\n");

		monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
		monc->client->msgr.inst.name.num =
					cpu_to_le64(monc->auth->global_id);

		__send_subscribe(monc);
		__resend_generic_request(monc);

		pr_info("mon%d %s session established\n", monc->cur_mon,
			ceph_pr_addr(&monc->con.peer_addr.in_addr));
	}

out:
	mutex_unlock(&monc->mutex);
	if (monc->client->auth_err < 0)
		wake_up_all(&monc->client->auth_wq);
}

/*
 * Rebuild and send an auth request if the auth layer thinks one is
 * needed.  Caller must hold monc->mutex.
 */
static int __validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	if (monc->pending_auth)
		return 0;

	ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
			      monc->m_auth->front_alloc_len);
	if (ret <= 0)
		return ret; /* either an error, or no need to authenticate */
	__send_prepared_auth_request(monc, ret);
	return 0;
}

int ceph_monc_validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	mutex_lock(&monc->mutex);
	ret = __validate_auth(monc);
	mutex_unlock(&monc->mutex);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_validate_auth);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(msg->hdr.type);

	if (!monc)
		return;

	switch (type) {
	case CEPH_MSG_AUTH_REPLY:
		handle_auth_reply(monc, msg);
		break;

	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		handle_subscribe_ack(monc, msg);
		break;

	case CEPH_MSG_STATFS_REPLY:
		handle_statfs_reply(monc, msg);
		break;

	case CEPH_MSG_MON_GET_VERSION_REPLY:
		handle_get_version_reply(monc, msg);
		break;

	case CEPH_MSG_MON_COMMAND_ACK:
		handle_command_ack(monc, msg);
		break;

	case CEPH_MSG_MON_MAP:
		ceph_monc_handle_map(monc, msg);
		break;

	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(&monc->client->osdc, msg);
		break;

	default:
		/* can the chained handler handle it?
*/ 1231 if (monc->client->extra_mon_dispatch && 1232 monc->client->extra_mon_dispatch(monc->client, msg) == 0) 1233 break; 1234 1235 pr_err("received unknown message type %d %s\n", type, 1236 ceph_msg_type_name(type)); 1237 } 1238 ceph_msg_put(msg); 1239 } 1240 1241 /* 1242 * Allocate memory for incoming message 1243 */ 1244 static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, 1245 struct ceph_msg_header *hdr, 1246 int *skip) 1247 { 1248 struct ceph_mon_client *monc = con->private; 1249 int type = le16_to_cpu(hdr->type); 1250 int front_len = le32_to_cpu(hdr->front_len); 1251 struct ceph_msg *m = NULL; 1252 1253 *skip = 0; 1254 1255 switch (type) { 1256 case CEPH_MSG_MON_SUBSCRIBE_ACK: 1257 m = ceph_msg_get(monc->m_subscribe_ack); 1258 break; 1259 case CEPH_MSG_STATFS_REPLY: 1260 case CEPH_MSG_MON_COMMAND_ACK: 1261 return get_generic_reply(con, hdr, skip); 1262 case CEPH_MSG_AUTH_REPLY: 1263 m = ceph_msg_get(monc->m_auth_reply); 1264 break; 1265 case CEPH_MSG_MON_GET_VERSION_REPLY: 1266 if (le64_to_cpu(hdr->tid) != 0) 1267 return get_generic_reply(con, hdr, skip); 1268 1269 /* 1270 * Older OSDs don't set reply tid even if the orignal 1271 * request had a non-zero tid. Workaround this weirdness 1272 * by falling through to the allocate case. 
1273 */ 1274 case CEPH_MSG_MON_MAP: 1275 case CEPH_MSG_MDS_MAP: 1276 case CEPH_MSG_OSD_MAP: 1277 case CEPH_MSG_FS_MAP_USER: 1278 m = ceph_msg_new(type, front_len, GFP_NOFS, false); 1279 if (!m) 1280 return NULL; /* ENOMEM--return skip == 0 */ 1281 break; 1282 } 1283 1284 if (!m) { 1285 pr_info("alloc_msg unknown type %d\n", type); 1286 *skip = 1; 1287 } else if (front_len > m->front_alloc_len) { 1288 pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n", 1289 front_len, m->front_alloc_len, 1290 (unsigned int)con->peer_name.type, 1291 le64_to_cpu(con->peer_name.num)); 1292 ceph_msg_put(m); 1293 m = ceph_msg_new(type, front_len, GFP_NOFS, false); 1294 } 1295 1296 return m; 1297 } 1298 1299 /* 1300 * If the monitor connection resets, pick a new monitor and resubmit 1301 * any pending requests. 1302 */ 1303 static void mon_fault(struct ceph_connection *con) 1304 { 1305 struct ceph_mon_client *monc = con->private; 1306 1307 mutex_lock(&monc->mutex); 1308 dout("%s mon%d\n", __func__, monc->cur_mon); 1309 if (monc->cur_mon >= 0) { 1310 if (!monc->hunting) { 1311 dout("%s hunting for new mon\n", __func__); 1312 reopen_session(monc); 1313 __schedule_delayed(monc); 1314 } else { 1315 dout("%s already hunting\n", __func__); 1316 } 1317 } 1318 mutex_unlock(&monc->mutex); 1319 } 1320 1321 /* 1322 * We can ignore refcounting on the connection struct, as all references 1323 * will come from the messenger workqueue, which is drained prior to 1324 * mon_client destruction. 1325 */ 1326 static struct ceph_connection *con_get(struct ceph_connection *con) 1327 { 1328 return con; 1329 } 1330 1331 static void con_put(struct ceph_connection *con) 1332 { 1333 } 1334 1335 static const struct ceph_connection_operations mon_con_ops = { 1336 .get = con_get, 1337 .put = con_put, 1338 .dispatch = dispatch, 1339 .fault = mon_fault, 1340 .alloc_msg = mon_alloc_msg, 1341 }; 1342