#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/sched.h>

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/debugfs.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>

/*
 * Interact with the Ceph monitor cluster.  Handle requests for new map
 * versions, and periodically resend as needed.  Also implement
 * statfs() and umount().
 *
 * A small cluster of Ceph "monitors" is responsible for managing critical
 * cluster configuration and state information.  An odd number (e.g., 3, 5)
 * of ceph-mon daemons use a modified version of the Paxos part-time
 * parliament algorithm to manage the MDS map (mds cluster membership),
 * OSD map, and list of clients who have mounted the file system.
 *
 * We maintain an open, active session with a monitor at all times in order
 * to receive timely MDSMap updates.  We periodically send a keepalive byte
 * on the TCP socket to ensure we detect a failure.  If the connection does
 * break, we randomly hunt for a new monitor.  Once the connection is
 * reestablished, we resend any outstanding requests.
 */

static const struct ceph_connection_operations mon_con_ops;

static int __validate_auth(struct ceph_mon_client *monc);

/*
 * Decode a monmap blob (e.g., during mount).
 */
struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
{
	struct ceph_monmap *m = NULL;
	int i, err = -EINVAL;
	struct ceph_fsid fsid;
	u32 epoch, num_mon;
	u32 len;

	ceph_decode_32_safe(&p, end, len, bad);
	ceph_decode_need(&p, end, len, bad);

	dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p));
	p += sizeof(u16);  /* skip version */

	ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	epoch = ceph_decode_32(&p);

	num_mon = ceph_decode_32(&p);
	/* bound-check num_mon before it feeds the multiplies below */
	if (num_mon >= CEPH_MAX_MON)
		goto bad;
	ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad);

	m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS);
	if (m == NULL)
		return ERR_PTR(-ENOMEM);
	m->fsid = fsid;
	m->epoch = epoch;
	m->num_mon = num_mon;
	ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0]));
	for (i = 0; i < num_mon; i++)
		ceph_decode_addr(&m->mon_inst[i].addr);

	dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
	     m->num_mon);
	for (i = 0; i < m->num_mon; i++)
		dout("monmap_decode mon%d is %s\n", i,
		     ceph_pr_addr(&m->mon_inst[i].addr.in_addr));
	return m;

bad:
	dout("monmap_decode failed with %d\n", err);
	kfree(m);
	return ERR_PTR(err);
}

/*
 * Return true if *addr is included in the monmap.
 */
int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
{
	int i;

	for (i = 0; i < m->num_mon; i++)
		if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
			return 1;
	return 0;
}

/*
 * Send an auth request.
 */
static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
{
	monc->pending_auth = 1;
	monc->m_auth->front.iov_len = len;
	monc->m_auth->hdr.front_len = cpu_to_le32(len);
	ceph_msg_revoke(monc->m_auth);
	ceph_msg_get(monc->m_auth);  /* keep our ref */
	ceph_con_send(&monc->con, monc->m_auth);
}
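
/*
 * A note on message ownership (describing the existing convention, not
 * new mechanism): monc->m_auth is preallocated and reused across
 * attempts, so we revoke any copy still queued from a previous attempt
 * and take an extra reference before handing the message to the
 * messenger, since ceph_con_send() consumes a reference.
 */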

/*
 * Close monitor session, if any.
 */
static void __close_session(struct ceph_mon_client *monc)
{
	dout("__close_session closing mon%d\n", monc->cur_mon);
	ceph_msg_revoke(monc->m_auth);
	ceph_msg_revoke_incoming(monc->m_auth_reply);
	ceph_msg_revoke(monc->m_subscribe);
	ceph_msg_revoke_incoming(monc->m_subscribe_ack);
	ceph_con_close(&monc->con);

	monc->pending_auth = 0;
	ceph_auth_reset(monc->auth);
}

/*
 * Pick a new monitor at random and set cur_mon.  If we are repicking
 * (i.e. cur_mon is already set), be sure to pick a different one.
 */
static void pick_new_mon(struct ceph_mon_client *monc)
{
	int old_mon = monc->cur_mon;

	BUG_ON(monc->monmap->num_mon < 1);

	if (monc->monmap->num_mon == 1) {
		monc->cur_mon = 0;
	} else {
		int max = monc->monmap->num_mon;
		int o = -1;
		int n;

		if (monc->cur_mon >= 0) {
			if (monc->cur_mon < monc->monmap->num_mon)
				o = monc->cur_mon;
			if (o >= 0)
				max--;
		}

		n = prandom_u32() % max;
		if (o >= 0 && n >= o)
			n++;

		monc->cur_mon = n;
	}

	dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
	     monc->cur_mon, monc->monmap->num_mon);
}

/*
 * Open a session with a new monitor.
 */
static void __open_session(struct ceph_mon_client *monc)
{
	int ret;

	pick_new_mon(monc);

	monc->hunting = true;
	if (monc->had_a_connection) {
		monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
		if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
			monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
	}

	monc->sub_renew_after = jiffies;  /* i.e., expired */
	monc->sub_renew_sent = 0;

	dout("%s opening mon%d\n", __func__, monc->cur_mon);
	ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
		      &monc->monmap->mon_inst[monc->cur_mon].addr);

	/*
	 * send an initial keepalive to ensure our timestamp is valid
	 * by the time we are in an OPENED state
	 */
	ceph_con_keepalive(&monc->con);

	/* initiate authentication handshake */
	ret = ceph_auth_build_hello(monc->auth,
				    monc->m_auth->front.iov_base,
				    monc->m_auth->front_alloc_len);
	BUG_ON(ret <= 0);
	__send_prepared_auth_request(monc, ret);
}

static void reopen_session(struct ceph_mon_client *monc)
{
	if (!monc->hunting)
		pr_info("mon%d %s session lost, hunting for new mon\n",
			monc->cur_mon,
			ceph_pr_addr(&monc->con.peer_addr.in_addr));

	__close_session(monc);
	__open_session(monc);
}

/*
 * Reschedule delayed work timer.
 */
static void __schedule_delayed(struct ceph_mon_client *monc)
{
	unsigned long delay;

	if (monc->hunting)
		delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
	else
		delay = CEPH_MONC_PING_INTERVAL;

	dout("__schedule_delayed after %lu\n", delay);
	mod_delayed_work(system_wq, &monc->delayed_work,
			 round_jiffies_relative(delay));
}

const char *ceph_sub_str[] = {
	[CEPH_SUB_MONMAP] = "monmap",
	[CEPH_SUB_OSDMAP] = "osdmap",
	[CEPH_SUB_FSMAP]  = "fsmap.user",
	[CEPH_SUB_MDSMAP] = "mdsmap",
};
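
/*
 * The strings above are the subscription names that go over the wire
 * in MMonSubscribe.  A CephFS client with an explicit fs_cluster_id
 * subscribes to "mdsmap.<id>" rather than plain "mdsmap"; see
 * __send_subscribe() below.
 */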

/*
 * Send subscribe request for one or more maps, according to
 * monc->subs.
 */
static void __send_subscribe(struct ceph_mon_client *monc)
{
	struct ceph_msg *msg = monc->m_subscribe;
	void *p = msg->front.iov_base;
	void *const end = p + msg->front_alloc_len;
	int num = 0;
	int i;

	dout("%s sent %lu\n", __func__, monc->sub_renew_sent);

	BUG_ON(monc->cur_mon < 0);

	if (!monc->sub_renew_sent)
		monc->sub_renew_sent = jiffies | 1;  /* never 0 */

	msg->hdr.version = cpu_to_le16(2);

	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
		if (monc->subs[i].want)
			num++;
	}
	BUG_ON(num < 1);  /* monmap sub is always there */
	ceph_encode_32(&p, num);
	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
		char buf[32];
		int len;

		if (!monc->subs[i].want)
			continue;

		len = sprintf(buf, "%s", ceph_sub_str[i]);
		if (i == CEPH_SUB_MDSMAP &&
		    monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
			len += sprintf(buf + len, ".%d", monc->fs_cluster_id);

		dout("%s %s start %llu flags 0x%x\n", __func__, buf,
		     le64_to_cpu(monc->subs[i].item.start),
		     monc->subs[i].item.flags);
		ceph_encode_string(&p, end, buf, len);
		memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
		p += sizeof(monc->subs[i].item);
	}

	BUG_ON(p > end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
	ceph_msg_revoke(msg);
	ceph_con_send(&monc->con, ceph_msg_get(msg));
}

static void handle_subscribe_ack(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	unsigned int seconds;
	struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

	if (msg->front.iov_len < sizeof(*h))
		goto bad;
	seconds = le32_to_cpu(h->duration);

	mutex_lock(&monc->mutex);
	if (monc->sub_renew_sent) {
		/*
		 * This is only needed for legacy (infernalis or older)
		 * MONs -- see delayed_work().
		 */
		monc->sub_renew_after = monc->sub_renew_sent +
					    (seconds >> 1) * HZ - 1;
		dout("%s sent %lu duration %d renew after %lu\n", __func__,
		     monc->sub_renew_sent, seconds, monc->sub_renew_after);
		monc->sub_renew_sent = 0;
	} else {
		dout("%s sent %lu renew after %lu, ignoring\n", __func__,
		     monc->sub_renew_sent, monc->sub_renew_after);
	}
	mutex_unlock(&monc->mutex);
	return;
bad:
	pr_err("got corrupt subscribe-ack msg\n");
	ceph_msg_dump(msg);
}

/*
 * Register interest in a map
 *
 * @sub: one of CEPH_SUB_*
 * @epoch: X for "every map since X", or 0 for "just the latest"
 */
static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
				 u32 epoch, bool continuous)
{
	__le64 start = cpu_to_le64(epoch);
	u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;

	dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
	     epoch, continuous);

	if (monc->subs[sub].want &&
	    monc->subs[sub].item.start == start &&
	    monc->subs[sub].item.flags == flags)
		return false;

	monc->subs[sub].item.start = start;
	monc->subs[sub].item.flags = flags;
	monc->subs[sub].want = true;

	return true;
}

bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
			bool continuous)
{
	bool need_request;

	mutex_lock(&monc->mutex);
	need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
	mutex_unlock(&monc->mutex);

	return need_request;
}
EXPORT_SYMBOL(ceph_monc_want_map);

/*
 * Keep track of which maps we have
 *
 * @sub: one of CEPH_SUB_*
 */
static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
				u32 epoch)
{
	dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);

	if (monc->subs[sub].want) {
		if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
			monc->subs[sub].want = false;
		else
			monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
	}

	monc->subs[sub].have = epoch;
}

void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
{
	mutex_lock(&monc->mutex);
	__ceph_monc_got_map(monc, sub, epoch);
	mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_got_map);

void ceph_monc_renew_subs(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__send_subscribe(monc);
	mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_renew_subs);

/*
 * Wait for an osdmap with a given epoch.
 *
 * @epoch: epoch to wait for
 * @timeout: in jiffies, 0 means "wait forever"
 */
int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
			  unsigned long timeout)
{
	unsigned long started = jiffies;
	long ret;

	mutex_lock(&monc->mutex);
	while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
		mutex_unlock(&monc->mutex);

		if (timeout && time_after_eq(jiffies, started + timeout))
			return -ETIMEDOUT;

		ret = wait_event_interruptible_timeout(monc->client->auth_wq,
				monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
				ceph_timeout_jiffies(timeout));
		if (ret < 0)
			return ret;

		mutex_lock(&monc->mutex);
	}

	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_wait_osdmap);
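
/*
 * Typical caller pattern, as a sketch (this mirrors how osd_client
 * drives osdmap subscriptions; want_epoch stands in for whatever epoch
 * the caller needs):
 *
 *	if (ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, want_epoch, false))
 *		ceph_monc_renew_subs(monc);
 *	err = ceph_monc_wait_osdmap(monc, want_epoch, timeout);
 */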

/*
 * Open a session with a random monitor.  Request monmap and osdmap,
 * which are waited upon in __ceph_open_session().
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
	__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
	__open_session(monc);
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);

static void ceph_monc_handle_map(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	struct ceph_client *client = monc->client;
	struct ceph_monmap *monmap = NULL, *old = monc->monmap;
	void *p, *end;

	mutex_lock(&monc->mutex);

	dout("handle_monmap\n");
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	monmap = ceph_monmap_decode(p, end);
	if (IS_ERR(monmap)) {
		pr_err("problem decoding monmap, %d\n",
		       (int)PTR_ERR(monmap));
		goto out;
	}

	if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
		kfree(monmap);
		goto out;
	}

	client->monc.monmap = monmap;
	kfree(old);

	__ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
	client->have_fsid = true;

out:
	mutex_unlock(&monc->mutex);
	wake_up_all(&client->auth_wq);
}

/*
 * generic requests (currently statfs, mon_get_version)
 */
DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)

static void release_generic_request(struct kref *kref)
{
	struct ceph_mon_generic_request *req =
		container_of(kref, struct ceph_mon_generic_request, kref);

	dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
	     req->reply);
	WARN_ON(!RB_EMPTY_NODE(&req->node));

	if (req->reply)
		ceph_msg_put(req->reply);
	if (req->request)
		ceph_msg_put(req->request);

	kfree(req);
}

static void put_generic_request(struct ceph_mon_generic_request *req)
{
	if (req)
		kref_put(&req->kref, release_generic_request);
}

static void get_generic_request(struct ceph_mon_generic_request *req)
{
	kref_get(&req->kref);
}

static struct ceph_mon_generic_request *
alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
{
	struct ceph_mon_generic_request *req;

	req = kzalloc(sizeof(*req), gfp);
	if (!req)
		return NULL;

	req->monc = monc;
	kref_init(&req->kref);
	RB_CLEAR_NODE(&req->node);
	init_completion(&req->completion);

	dout("%s greq %p\n", __func__, req);
	return req;
}

static void register_generic_request(struct ceph_mon_generic_request *req)
{
	struct ceph_mon_client *monc = req->monc;

	WARN_ON(req->tid);

	get_generic_request(req);
	req->tid = ++monc->last_tid;
	insert_generic_request(&monc->generic_request_tree, req);
}

static void send_generic_request(struct ceph_mon_client *monc,
				 struct ceph_mon_generic_request *req)
{
	WARN_ON(!req->tid);

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
	req->request->hdr.tid = cpu_to_le64(req->tid);
	ceph_con_send(&monc->con, ceph_msg_get(req->request));
}

static void __finish_generic_request(struct ceph_mon_generic_request *req)
{
	struct ceph_mon_client *monc = req->monc;

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
	erase_generic_request(&monc->generic_request_tree, req);

	ceph_msg_revoke(req->request);
	ceph_msg_revoke_incoming(req->reply);
}

static void finish_generic_request(struct ceph_mon_generic_request *req)
{
	__finish_generic_request(req);
	put_generic_request(req);
}

static void complete_generic_request(struct ceph_mon_generic_request *req)
{
	if (req->complete_cb)
		req->complete_cb(req);
	else
		complete_all(&req->completion);
	put_generic_request(req);
}

static void cancel_generic_request(struct ceph_mon_generic_request *req)
{
	struct ceph_mon_client *monc = req->monc;
	struct ceph_mon_generic_request *lookup_req;

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);

	mutex_lock(&monc->mutex);
	lookup_req = lookup_generic_request(&monc->generic_request_tree,
					    req->tid);
	if (lookup_req) {
		WARN_ON(lookup_req != req);
		finish_generic_request(req);
	}
	mutex_unlock(&monc->mutex);
}

static int wait_generic_request(struct ceph_mon_generic_request *req)
{
	int ret;

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
	ret = wait_for_completion_interruptible(&req->completion);
	if (ret)
		cancel_generic_request(req);
	else
		ret = req->result;  /* completed */

	return ret;
}

static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
					  struct ceph_msg_header *hdr,
					  int *skip)
{
	struct ceph_mon_client *monc = con->private;
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(hdr->tid);
	struct ceph_msg *m;

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, tid);
	if (!req) {
		dout("get_generic_reply %lld dne\n", tid);
		*skip = 1;
		m = NULL;
	} else {
		dout("get_generic_reply %lld got %p\n", tid, req->reply);
		*skip = 0;
		m = ceph_msg_get(req->reply);
		/*
		 * we don't need to track the connection reading into
		 * this reply because we only have one open connection
		 * at a time, ever.
		 */
	}
	mutex_unlock(&monc->mutex);
	return m;
}

/*
 * statfs
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
				struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	if (msg->front.iov_len != sizeof(*reply))
		goto bad;

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, tid);
	if (!req) {
		mutex_unlock(&monc->mutex);
		return;
	}

	req->result = 0;
	*req->u.st = reply->st;  /* struct */
	__finish_generic_request(req);
	mutex_unlock(&monc->mutex);

	complete_generic_request(req);
	return;

bad:
	pr_err("corrupt statfs reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}
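
/*
 * Lifecycle of a generic request, for reference: alloc_generic_request()
 * -> register_generic_request() (assigns a tid and takes the ref owned
 * by the tree) -> send_generic_request() -> a reply handler or
 * cancel_generic_request() removes it from the tree via
 * __finish_generic_request() -> the final put frees it.
 * wait_generic_request() returns req->result only if the completion
 * actually fired.
 */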

/*
 * Do a synchronous statfs().
 */
int ceph_monc_do_statfs(struct ceph_mon_client *monc, u64 data_pool,
			struct ceph_statfs *buf)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs *h;
	int ret = -ENOMEM;

	req = alloc_generic_request(monc, GFP_NOFS);
	if (!req)
		goto out;

	req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
				    true);
	if (!req->request)
		goto out;

	req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
	if (!req->reply)
		goto out;

	req->u.st = buf;
	req->request->hdr.version = cpu_to_le16(2);

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	/* fill out request */
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;
	h->contains_data_pool = (data_pool != CEPH_NOPOOL);
	h->data_pool = cpu_to_le64(data_pool);
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	ret = wait_generic_request(req);
out:
	put_generic_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_do_statfs);

static void handle_get_version_reply(struct ceph_mon_client *monc,
				     struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	void *p = msg->front.iov_base;
	void *end = p + msg->front_alloc_len;
	u64 handle;

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	ceph_decode_need(&p, end, 2*sizeof(u64), bad);
	handle = ceph_decode_64(&p);
	if (tid != 0 && tid != handle)
		goto bad;

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, handle);
	if (!req) {
		mutex_unlock(&monc->mutex);
		return;
	}

	req->result = 0;
	req->u.newest = ceph_decode_64(&p);
	__finish_generic_request(req);
	mutex_unlock(&monc->mutex);

	complete_generic_request(req);
	return;

bad:
	pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

static struct ceph_mon_generic_request *
__ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
			ceph_monc_callback_t cb, u64 private_data)
{
	struct ceph_mon_generic_request *req;

	req = alloc_generic_request(monc, GFP_NOIO);
	if (!req)
		goto err_put_req;

	req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
				    sizeof(u64) + sizeof(u32) + strlen(what),
				    GFP_NOIO, true);
	if (!req->request)
		goto err_put_req;

	req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
				  true);
	if (!req->reply)
		goto err_put_req;

	req->complete_cb = cb;
	req->private_data = private_data;

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	{
		void *p = req->request->front.iov_base;
		void *const end = p + req->request->front_alloc_len;

		ceph_encode_64(&p, req->tid);  /* handle */
		ceph_encode_string(&p, end, what, strlen(what));
		WARN_ON(p != end);
	}
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	return req;

err_put_req:
	put_generic_request(req);
	return ERR_PTR(-ENOMEM);
}
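
/*
 * Note that the request body carries our tid as an explicit "handle"
 * in addition to the message header tid, and the reply echoes it back.
 * handle_get_version_reply() keys its lookup off the handle so that
 * replies whose hdr.tid was left at zero by older daemons are still
 * matched; see also the tid check in mon_alloc_msg().
 */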

/*
 * Send MMonGetVersion and wait for the reply.
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 */
int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
			  u64 *newest)
{
	struct ceph_mon_generic_request *req;
	int ret;

	req = __ceph_monc_get_version(monc, what, NULL, 0);
	if (IS_ERR(req))
		return PTR_ERR(req);

	ret = wait_generic_request(req);
	if (!ret)
		*newest = req->u.newest;

	put_generic_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_get_version);

/*
 * Send MMonGetVersion without waiting for the reply.
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 */
int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
				ceph_monc_callback_t cb, u64 private_data)
{
	struct ceph_mon_generic_request *req;

	req = __ceph_monc_get_version(monc, what, cb, private_data);
	if (IS_ERR(req))
		return PTR_ERR(req);

	put_generic_request(req);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_get_version_async);

static void handle_command_ack(struct ceph_mon_client *monc,
			       struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	void *p = msg->front.iov_base;
	void *const end = p + msg->front_alloc_len;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
			 sizeof(u32), bad);
	p += sizeof(struct ceph_mon_request_header);

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, tid);
	if (!req) {
		mutex_unlock(&monc->mutex);
		return;
	}

	req->result = ceph_decode_32(&p);
	__finish_generic_request(req);
	mutex_unlock(&monc->mutex);

	complete_generic_request(req);
	return;

bad:
	pr_err("corrupt mon_command ack, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
			    struct ceph_entity_addr *client_addr)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_command *h;
	int ret = -ENOMEM;
	int len;

	req = alloc_generic_request(monc, GFP_NOIO);
	if (!req)
		goto out;

	req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
	if (!req->request)
		goto out;

	req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
				  true);
	if (!req->reply)
		goto out;

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;
	h->num_strs = cpu_to_le32(1);
	len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \
		\"blacklistop\": \"add\", \
		\"addr\": \"%pISpc/%u\" }",
		      &client_addr->in_addr, le32_to_cpu(client_addr->nonce));
	h->str_len = cpu_to_le32(len);
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	ret = wait_generic_request(req);
out:
	put_generic_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_blacklist_add);
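
/*
 * For reference, the command built above expands to JSON along these
 * lines (address and nonce are illustrative):
 *
 *	{ "prefix": "osd blacklist", "blacklistop": "add",
 *	  "addr": "192.168.0.10:0/3131384023" }
 *
 * i.e. the client's entity address plus nonce, which is how OSDs
 * identify a stale client instance.
 */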

/*
 * Resend pending generic requests.
 */
static void __resend_generic_request(struct ceph_mon_client *monc)
{
	struct ceph_mon_generic_request *req;
	struct rb_node *p;

	for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_mon_generic_request, node);
		ceph_msg_revoke(req->request);
		ceph_msg_revoke_incoming(req->reply);
		ceph_con_send(&monc->con, ceph_msg_get(req->request));
	}
}

/*
 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
 * renew/retry subscription as needed (in case it is timing out, or we
 * got an ENOMEM).  And keep the monitor connection alive.
 */
static void delayed_work(struct work_struct *work)
{
	struct ceph_mon_client *monc =
		container_of(work, struct ceph_mon_client, delayed_work.work);

	dout("monc delayed_work\n");
	mutex_lock(&monc->mutex);
	if (monc->hunting) {
		dout("%s continuing hunt\n", __func__);
		reopen_session(monc);
	} else {
		int is_auth = ceph_auth_is_authenticated(monc->auth);

		if (ceph_con_keepalive_expired(&monc->con,
					       CEPH_MONC_PING_TIMEOUT)) {
			dout("monc keepalive timeout\n");
			is_auth = 0;
			reopen_session(monc);
		}

		if (!monc->hunting) {
			ceph_con_keepalive(&monc->con);
			__validate_auth(monc);
		}

		if (is_auth &&
		    !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
			unsigned long now = jiffies;

			dout("%s renew subs? now %lu renew after %lu\n",
			     __func__, now, monc->sub_renew_after);
			if (time_after_eq(now, monc->sub_renew_after))
				__send_subscribe(monc);
		}
	}
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
}
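
/*
 * Timing summary for the above: while hunting, the work fires every
 * CEPH_MONC_HUNT_INTERVAL * hunt_mult jiffies and retries with another
 * mon, hunt_mult growing by CEPH_MONC_HUNT_BACKOFF per failed attempt
 * up to CEPH_MONC_HUNT_MAX_MULT.  Once connected, it fires every
 * CEPH_MONC_PING_INTERVAL, and keepalive silence longer than
 * CEPH_MONC_PING_TIMEOUT is treated as a lost session.
 */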

/*
 * On startup, we build a temporary monmap populated with the IPs
 * provided by mount(2).
 */
static int build_initial_monmap(struct ceph_mon_client *monc)
{
	struct ceph_options *opt = monc->client->options;
	struct ceph_entity_addr *mon_addr = opt->mon_addr;
	int num_mon = opt->num_mon;
	int i;

	/* build initial monmap */
	monc->monmap = kzalloc(sizeof(*monc->monmap) +
			       num_mon*sizeof(monc->monmap->mon_inst[0]),
			       GFP_KERNEL);
	if (!monc->monmap)
		return -ENOMEM;
	for (i = 0; i < num_mon; i++) {
		monc->monmap->mon_inst[i].addr = mon_addr[i];
		monc->monmap->mon_inst[i].addr.nonce = 0;
		monc->monmap->mon_inst[i].name.type =
			CEPH_ENTITY_TYPE_MON;
		monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
	}
	monc->monmap->num_mon = num_mon;
	return 0;
}

int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
	int err = 0;

	dout("init\n");
	memset(monc, 0, sizeof(*monc));
	monc->client = cl;
	monc->monmap = NULL;
	mutex_init(&monc->mutex);

	err = build_initial_monmap(monc);
	if (err)
		goto out;

	/* connection */
	/* authentication */
	monc->auth = ceph_auth_init(cl->options->name,
				    cl->options->key);
	if (IS_ERR(monc->auth)) {
		err = PTR_ERR(monc->auth);
		goto out_monmap;
	}
	monc->auth->want_keys =
		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
		CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;

	/* msgs */
	err = -ENOMEM;
	monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
				     sizeof(struct ceph_mon_subscribe_ack),
				     GFP_KERNEL, true);
	if (!monc->m_subscribe_ack)
		goto out_auth;

	monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
					 GFP_KERNEL, true);
	if (!monc->m_subscribe)
		goto out_subscribe_ack;

	monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
					  GFP_KERNEL, true);
	if (!monc->m_auth_reply)
		goto out_subscribe;

	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
	monc->pending_auth = 0;
	if (!monc->m_auth)
		goto out_auth_reply;

	ceph_con_init(&monc->con, monc, &mon_con_ops,
		      &monc->client->msgr);

	monc->cur_mon = -1;
	monc->had_a_connection = false;
	monc->hunt_mult = 1;

	INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
	monc->generic_request_tree = RB_ROOT;
	monc->last_tid = 0;

	monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;

	return 0;

out_auth_reply:
	ceph_msg_put(monc->m_auth_reply);
out_subscribe:
	ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
	ceph_msg_put(monc->m_subscribe_ack);
out_auth:
	ceph_auth_destroy(monc->auth);
out_monmap:
	kfree(monc->monmap);
out:
	return err;
}
EXPORT_SYMBOL(ceph_monc_init);
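
/*
 * Teardown mirrors the error unwinding above: cancel the delayed work
 * first so it cannot rearm, close the session, then flush the
 * messenger before tearing down the auth subsystem.
 */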

void ceph_monc_stop(struct ceph_mon_client *monc)
{
	dout("stop\n");
	cancel_delayed_work_sync(&monc->delayed_work);

	mutex_lock(&monc->mutex);
	__close_session(monc);
	monc->cur_mon = -1;
	mutex_unlock(&monc->mutex);

	/*
	 * flush msgr queue before we destroy ourselves to ensure that:
	 *  - any work that references our embedded con is finished.
	 *  - any osd_client or other work that may reference an
	 *    authorizer finishes before we shut down the auth subsystem.
	 */
	ceph_msgr_flush();

	ceph_auth_destroy(monc->auth);

	WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));

	ceph_msg_put(monc->m_auth);
	ceph_msg_put(monc->m_auth_reply);
	ceph_msg_put(monc->m_subscribe);
	ceph_msg_put(monc->m_subscribe_ack);

	kfree(monc->monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);

static void finish_hunting(struct ceph_mon_client *monc)
{
	if (monc->hunting) {
		dout("%s found mon%d\n", __func__, monc->cur_mon);
		monc->hunting = false;
		monc->had_a_connection = true;
		monc->hunt_mult /= 2;  /* reduce by 50% */
		if (monc->hunt_mult < 1)
			monc->hunt_mult = 1;
	}
}

static void handle_auth_reply(struct ceph_mon_client *monc,
			      struct ceph_msg *msg)
{
	int ret;
	int was_auth = 0;

	mutex_lock(&monc->mutex);
	was_auth = ceph_auth_is_authenticated(monc->auth);
	monc->pending_auth = 0;
	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
				     msg->front.iov_len,
				     monc->m_auth->front.iov_base,
				     monc->m_auth->front_alloc_len);
	if (ret > 0) {
		__send_prepared_auth_request(monc, ret);
		goto out;
	}

	finish_hunting(monc);

	if (ret < 0) {
		monc->client->auth_err = ret;
	} else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
		dout("authenticated, starting session\n");

		monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
		monc->client->msgr.inst.name.num =
					cpu_to_le64(monc->auth->global_id);

		__send_subscribe(monc);
		__resend_generic_request(monc);

		pr_info("mon%d %s session established\n", monc->cur_mon,
			ceph_pr_addr(&monc->con.peer_addr.in_addr));
	}

out:
	mutex_unlock(&monc->mutex);
	if (monc->client->auth_err < 0)
		wake_up_all(&monc->client->auth_wq);
}

static int __validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	if (monc->pending_auth)
		return 0;

	ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
			      monc->m_auth->front_alloc_len);
	if (ret <= 0)
		return ret;  /* either an error, or no need to authenticate */
	__send_prepared_auth_request(monc, ret);
	return 0;
}

int ceph_monc_validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	mutex_lock(&monc->mutex);
	ret = __validate_auth(monc);
	mutex_unlock(&monc->mutex);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_validate_auth);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(msg->hdr.type);

	if (!monc)
		return;

	switch (type) {
	case CEPH_MSG_AUTH_REPLY:
		handle_auth_reply(monc, msg);
		break;

	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		handle_subscribe_ack(monc, msg);
		break;

	case CEPH_MSG_STATFS_REPLY:
		handle_statfs_reply(monc, msg);
		break;

	case CEPH_MSG_MON_GET_VERSION_REPLY:
		handle_get_version_reply(monc, msg);
		break;

	case CEPH_MSG_MON_COMMAND_ACK:
		handle_command_ack(monc, msg);
		break;

	case CEPH_MSG_MON_MAP:
		ceph_monc_handle_map(monc, msg);
		break;

	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(&monc->client->osdc, msg);
		break;

	default:
		/* can the chained handler handle it? */
		if (monc->client->extra_mon_dispatch &&
		    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
			break;

		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
	ceph_msg_put(msg);
}

/*
 * Allocate memory for incoming message
 */
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
				      struct ceph_msg_header *hdr,
				      int *skip)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(hdr->type);
	int front_len = le32_to_cpu(hdr->front_len);
	struct ceph_msg *m = NULL;

	*skip = 0;

	switch (type) {
	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		m = ceph_msg_get(monc->m_subscribe_ack);
		break;
	case CEPH_MSG_STATFS_REPLY:
	case CEPH_MSG_MON_COMMAND_ACK:
		return get_generic_reply(con, hdr, skip);
	case CEPH_MSG_AUTH_REPLY:
		m = ceph_msg_get(monc->m_auth_reply);
		break;
	case CEPH_MSG_MON_GET_VERSION_REPLY:
		if (le64_to_cpu(hdr->tid) != 0)
			return get_generic_reply(con, hdr, skip);

		/*
		 * Older OSDs don't set reply tid even if the original
		 * request had a non-zero tid.  Work around this weirdness
		 * by falling through to the allocate case.
		 */
		/* fall through */
	case CEPH_MSG_MON_MAP:
	case CEPH_MSG_MDS_MAP:
	case CEPH_MSG_OSD_MAP:
	case CEPH_MSG_FS_MAP_USER:
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
		if (!m)
			return NULL;	/* ENOMEM--return skip == 0 */
		break;
	}

	if (!m) {
		pr_info("alloc_msg unknown type %d\n", type);
		*skip = 1;
	} else if (front_len > m->front_alloc_len) {
		pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
			front_len, m->front_alloc_len,
			(unsigned int)con->peer_name.type,
			le64_to_cpu(con->peer_name.num));
		ceph_msg_put(m);
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
	}

	return m;
}

/*
 * If the monitor connection resets, pick a new monitor and resubmit
 * any pending requests.
 */
static void mon_fault(struct ceph_connection *con)
{
	struct ceph_mon_client *monc = con->private;

	mutex_lock(&monc->mutex);
	dout("%s mon%d\n", __func__, monc->cur_mon);
	if (monc->cur_mon >= 0) {
		if (!monc->hunting) {
			dout("%s hunting for new mon\n", __func__);
			reopen_session(monc);
			__schedule_delayed(monc);
		} else {
			dout("%s already hunting\n", __func__);
		}
	}
	mutex_unlock(&monc->mutex);
}

/*
 * We can ignore refcounting on the connection struct, as all references
 * will come from the messenger workqueue, which is drained prior to
 * mon_client destruction.
 */
static struct ceph_connection *con_get(struct ceph_connection *con)
{
	return con;
}

static void con_put(struct ceph_connection *con)
{
}

static const struct ceph_connection_operations mon_con_ops = {
	.get = con_get,
	.put = con_put,
	.dispatch = dispatch,
	.fault = mon_fault,
	.alloc_msg = mon_alloc_msg,
};
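
/*
 * Usage sketch, simplified from how libceph sequences this in
 * ceph_create_client() and __ceph_open_session():
 *
 *	err = ceph_monc_init(&client->monc, client);
 *	if (!err)
 *		err = ceph_monc_open_session(&client->monc);
 *	... wait for authentication and an initial monmap/osdmap ...
 *	ceph_monc_stop(&client->monc);
 */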