1 // SPDX-License-Identifier: GPL-2.0 2 #include <linux/ceph/ceph_debug.h> 3 4 #include <linux/module.h> 5 #include <linux/types.h> 6 #include <linux/slab.h> 7 #include <linux/random.h> 8 #include <linux/sched.h> 9 10 #include <linux/ceph/ceph_features.h> 11 #include <linux/ceph/mon_client.h> 12 #include <linux/ceph/libceph.h> 13 #include <linux/ceph/debugfs.h> 14 #include <linux/ceph/decode.h> 15 #include <linux/ceph/auth.h> 16 17 /* 18 * Interact with Ceph monitor cluster. Handle requests for new map 19 * versions, and periodically resend as needed. Also implement 20 * statfs() and umount(). 21 * 22 * A small cluster of Ceph "monitors" are responsible for managing critical 23 * cluster configuration and state information. An odd number (e.g., 3, 5) 24 * of cmon daemons use a modified version of the Paxos part-time parliament 25 * algorithm to manage the MDS map (mds cluster membership), OSD map, and 26 * list of clients who have mounted the file system. 27 * 28 * We maintain an open, active session with a monitor at all times in order to 29 * receive timely MDSMap updates. We periodically send a keepalive byte on the 30 * TCP socket to ensure we detect a failure. If the connection does break, we 31 * randomly hunt for a new monitor. Once the connection is reestablished, we 32 * resend any outstanding requests. 33 */ 34 35 static const struct ceph_connection_operations mon_con_ops; 36 37 static int __validate_auth(struct ceph_mon_client *monc); 38 39 static int decode_mon_info(void **p, void *end, bool msgr2, 40 struct ceph_entity_addr *addr) 41 { 42 void *mon_info_end; 43 u32 struct_len; 44 u8 struct_v; 45 int ret; 46 47 ret = ceph_start_decoding(p, end, 1, "mon_info_t", &struct_v, 48 &struct_len); 49 if (ret) 50 return ret; 51 52 mon_info_end = *p + struct_len; 53 ceph_decode_skip_string(p, end, e_inval); /* skip mon name */ 54 ret = ceph_decode_entity_addrvec(p, end, msgr2, addr); 55 if (ret) 56 return ret; 57 58 *p = mon_info_end; 59 return 0; 60 61 e_inval: 62 return -EINVAL; 63 } 64 65 /* 66 * Decode a monmap blob (e.g., during mount). 67 * 68 * Assume MonMap v3 (i.e. encoding with MONNAMES and MONENC). 69 */ 70 static struct ceph_monmap *ceph_monmap_decode(void **p, void *end, bool msgr2) 71 { 72 struct ceph_monmap *monmap = NULL; 73 struct ceph_fsid fsid; 74 u32 struct_len; 75 int blob_len; 76 int num_mon; 77 u8 struct_v; 78 u32 epoch; 79 int ret; 80 int i; 81 82 ceph_decode_32_safe(p, end, blob_len, e_inval); 83 ceph_decode_need(p, end, blob_len, e_inval); 84 85 ret = ceph_start_decoding(p, end, 6, "monmap", &struct_v, &struct_len); 86 if (ret) 87 goto fail; 88 89 dout("%s struct_v %d\n", __func__, struct_v); 90 ceph_decode_copy_safe(p, end, &fsid, sizeof(fsid), e_inval); 91 ceph_decode_32_safe(p, end, epoch, e_inval); 92 if (struct_v >= 6) { 93 u32 feat_struct_len; 94 u8 feat_struct_v; 95 96 *p += sizeof(struct ceph_timespec); /* skip last_changed */ 97 *p += sizeof(struct ceph_timespec); /* skip created */ 98 99 ret = ceph_start_decoding(p, end, 1, "mon_feature_t", 100 &feat_struct_v, &feat_struct_len); 101 if (ret) 102 goto fail; 103 104 *p += feat_struct_len; /* skip persistent_features */ 105 106 ret = ceph_start_decoding(p, end, 1, "mon_feature_t", 107 &feat_struct_v, &feat_struct_len); 108 if (ret) 109 goto fail; 110 111 *p += feat_struct_len; /* skip optional_features */ 112 } 113 ceph_decode_32_safe(p, end, num_mon, e_inval); 114 115 dout("%s fsid %pU epoch %u num_mon %d\n", __func__, &fsid, epoch, 116 num_mon); 117 if (num_mon > CEPH_MAX_MON) 118 goto e_inval; 119 120 monmap = kmalloc(struct_size(monmap, mon_inst, num_mon), GFP_NOIO); 121 if (!monmap) { 122 ret = -ENOMEM; 123 goto fail; 124 } 125 monmap->fsid = fsid; 126 monmap->epoch = epoch; 127 monmap->num_mon = num_mon; 128 129 /* legacy_mon_addr map or mon_info map */ 130 for (i = 0; i < num_mon; i++) { 131 struct ceph_entity_inst *inst = &monmap->mon_inst[i]; 132 133 ceph_decode_skip_string(p, end, e_inval); /* skip mon name */ 134 inst->name.type = CEPH_ENTITY_TYPE_MON; 135 inst->name.num = cpu_to_le64(i); 136 137 if (struct_v >= 6) 138 ret = decode_mon_info(p, end, msgr2, &inst->addr); 139 else 140 ret = ceph_decode_entity_addr(p, end, &inst->addr); 141 if (ret) 142 goto fail; 143 144 dout("%s mon%d addr %s\n", __func__, i, 145 ceph_pr_addr(&inst->addr)); 146 } 147 148 return monmap; 149 150 e_inval: 151 ret = -EINVAL; 152 fail: 153 kfree(monmap); 154 return ERR_PTR(ret); 155 } 156 157 /* 158 * return true if *addr is included in the monmap. 159 */ 160 int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr) 161 { 162 int i; 163 164 for (i = 0; i < m->num_mon; i++) { 165 if (ceph_addr_equal_no_type(addr, &m->mon_inst[i].addr)) 166 return 1; 167 } 168 169 return 0; 170 } 171 172 /* 173 * Send an auth request. 174 */ 175 static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) 176 { 177 monc->pending_auth = 1; 178 monc->m_auth->front.iov_len = len; 179 monc->m_auth->hdr.front_len = cpu_to_le32(len); 180 ceph_msg_revoke(monc->m_auth); 181 ceph_msg_get(monc->m_auth); /* keep our ref */ 182 ceph_con_send(&monc->con, monc->m_auth); 183 } 184 185 /* 186 * Close monitor session, if any. 187 */ 188 static void __close_session(struct ceph_mon_client *monc) 189 { 190 dout("__close_session closing mon%d\n", monc->cur_mon); 191 ceph_msg_revoke(monc->m_auth); 192 ceph_msg_revoke_incoming(monc->m_auth_reply); 193 ceph_msg_revoke(monc->m_subscribe); 194 ceph_msg_revoke_incoming(monc->m_subscribe_ack); 195 ceph_con_close(&monc->con); 196 197 monc->pending_auth = 0; 198 ceph_auth_reset(monc->auth); 199 } 200 201 /* 202 * Pick a new monitor at random and set cur_mon. If we are repicking 203 * (i.e. cur_mon is already set), be sure to pick a different one. 204 */ 205 static void pick_new_mon(struct ceph_mon_client *monc) 206 { 207 int old_mon = monc->cur_mon; 208 209 BUG_ON(monc->monmap->num_mon < 1); 210 211 if (monc->monmap->num_mon == 1) { 212 monc->cur_mon = 0; 213 } else { 214 int max = monc->monmap->num_mon; 215 int o = -1; 216 int n; 217 218 if (monc->cur_mon >= 0) { 219 if (monc->cur_mon < monc->monmap->num_mon) 220 o = monc->cur_mon; 221 if (o >= 0) 222 max--; 223 } 224 225 n = prandom_u32() % max; 226 if (o >= 0 && n >= o) 227 n++; 228 229 monc->cur_mon = n; 230 } 231 232 dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon, 233 monc->cur_mon, monc->monmap->num_mon); 234 } 235 236 /* 237 * Open a session with a new monitor. 238 */ 239 static void __open_session(struct ceph_mon_client *monc) 240 { 241 int ret; 242 243 pick_new_mon(monc); 244 245 monc->hunting = true; 246 if (monc->had_a_connection) { 247 monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF; 248 if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT) 249 monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT; 250 } 251 252 monc->sub_renew_after = jiffies; /* i.e., expired */ 253 monc->sub_renew_sent = 0; 254 255 dout("%s opening mon%d\n", __func__, monc->cur_mon); 256 ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon, 257 &monc->monmap->mon_inst[monc->cur_mon].addr); 258 259 /* 260 * Queue a keepalive to ensure that in case of an early fault 261 * the messenger doesn't put us into STANDBY state and instead 262 * retries. This also ensures that our timestamp is valid by 263 * the time we finish hunting and delayed_work() checks it. 264 */ 265 ceph_con_keepalive(&monc->con); 266 if (ceph_msgr2(monc->client)) { 267 monc->pending_auth = 1; 268 return; 269 } 270 271 /* initiate authentication handshake */ 272 ret = ceph_auth_build_hello(monc->auth, 273 monc->m_auth->front.iov_base, 274 monc->m_auth->front_alloc_len); 275 BUG_ON(ret <= 0); 276 __send_prepared_auth_request(monc, ret); 277 } 278 279 static void reopen_session(struct ceph_mon_client *monc) 280 { 281 if (!monc->hunting) 282 pr_info("mon%d %s session lost, hunting for new mon\n", 283 monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr)); 284 285 __close_session(monc); 286 __open_session(monc); 287 } 288 289 void ceph_monc_reopen_session(struct ceph_mon_client *monc) 290 { 291 mutex_lock(&monc->mutex); 292 reopen_session(monc); 293 mutex_unlock(&monc->mutex); 294 } 295 296 static void un_backoff(struct ceph_mon_client *monc) 297 { 298 monc->hunt_mult /= 2; /* reduce by 50% */ 299 if (monc->hunt_mult < 1) 300 monc->hunt_mult = 1; 301 dout("%s hunt_mult now %d\n", __func__, monc->hunt_mult); 302 } 303 304 /* 305 * Reschedule delayed work timer. 306 */ 307 static void __schedule_delayed(struct ceph_mon_client *monc) 308 { 309 unsigned long delay; 310 311 if (monc->hunting) 312 delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult; 313 else 314 delay = CEPH_MONC_PING_INTERVAL; 315 316 dout("__schedule_delayed after %lu\n", delay); 317 mod_delayed_work(system_wq, &monc->delayed_work, 318 round_jiffies_relative(delay)); 319 } 320 321 const char *ceph_sub_str[] = { 322 [CEPH_SUB_MONMAP] = "monmap", 323 [CEPH_SUB_OSDMAP] = "osdmap", 324 [CEPH_SUB_FSMAP] = "fsmap.user", 325 [CEPH_SUB_MDSMAP] = "mdsmap", 326 }; 327 328 /* 329 * Send subscribe request for one or more maps, according to 330 * monc->subs. 331 */ 332 static void __send_subscribe(struct ceph_mon_client *monc) 333 { 334 struct ceph_msg *msg = monc->m_subscribe; 335 void *p = msg->front.iov_base; 336 void *const end = p + msg->front_alloc_len; 337 int num = 0; 338 int i; 339 340 dout("%s sent %lu\n", __func__, monc->sub_renew_sent); 341 342 BUG_ON(monc->cur_mon < 0); 343 344 if (!monc->sub_renew_sent) 345 monc->sub_renew_sent = jiffies | 1; /* never 0 */ 346 347 msg->hdr.version = cpu_to_le16(2); 348 349 for (i = 0; i < ARRAY_SIZE(monc->subs); i++) { 350 if (monc->subs[i].want) 351 num++; 352 } 353 BUG_ON(num < 1); /* monmap sub is always there */ 354 ceph_encode_32(&p, num); 355 for (i = 0; i < ARRAY_SIZE(monc->subs); i++) { 356 char buf[32]; 357 int len; 358 359 if (!monc->subs[i].want) 360 continue; 361 362 len = sprintf(buf, "%s", ceph_sub_str[i]); 363 if (i == CEPH_SUB_MDSMAP && 364 monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE) 365 len += sprintf(buf + len, ".%d", monc->fs_cluster_id); 366 367 dout("%s %s start %llu flags 0x%x\n", __func__, buf, 368 le64_to_cpu(monc->subs[i].item.start), 369 monc->subs[i].item.flags); 370 ceph_encode_string(&p, end, buf, len); 371 memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item)); 372 p += sizeof(monc->subs[i].item); 373 } 374 375 BUG_ON(p > end); 376 msg->front.iov_len = p - msg->front.iov_base; 377 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 378 ceph_msg_revoke(msg); 379 ceph_con_send(&monc->con, ceph_msg_get(msg)); 380 } 381 382 static void handle_subscribe_ack(struct ceph_mon_client *monc, 383 struct ceph_msg *msg) 384 { 385 unsigned int seconds; 386 struct ceph_mon_subscribe_ack *h = msg->front.iov_base; 387 388 if (msg->front.iov_len < sizeof(*h)) 389 goto bad; 390 seconds = le32_to_cpu(h->duration); 391 392 mutex_lock(&monc->mutex); 393 if (monc->sub_renew_sent) { 394 /* 395 * This is only needed for legacy (infernalis or older) 396 * MONs -- see delayed_work(). 397 */ 398 monc->sub_renew_after = monc->sub_renew_sent + 399 (seconds >> 1) * HZ - 1; 400 dout("%s sent %lu duration %d renew after %lu\n", __func__, 401 monc->sub_renew_sent, seconds, monc->sub_renew_after); 402 monc->sub_renew_sent = 0; 403 } else { 404 dout("%s sent %lu renew after %lu, ignoring\n", __func__, 405 monc->sub_renew_sent, monc->sub_renew_after); 406 } 407 mutex_unlock(&monc->mutex); 408 return; 409 bad: 410 pr_err("got corrupt subscribe-ack msg\n"); 411 ceph_msg_dump(msg); 412 } 413 414 /* 415 * Register interest in a map 416 * 417 * @sub: one of CEPH_SUB_* 418 * @epoch: X for "every map since X", or 0 for "just the latest" 419 */ 420 static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub, 421 u32 epoch, bool continuous) 422 { 423 __le64 start = cpu_to_le64(epoch); 424 u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0; 425 426 dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub], 427 epoch, continuous); 428 429 if (monc->subs[sub].want && 430 monc->subs[sub].item.start == start && 431 monc->subs[sub].item.flags == flags) 432 return false; 433 434 monc->subs[sub].item.start = start; 435 monc->subs[sub].item.flags = flags; 436 monc->subs[sub].want = true; 437 438 return true; 439 } 440 441 bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch, 442 bool continuous) 443 { 444 bool need_request; 445 446 mutex_lock(&monc->mutex); 447 need_request = __ceph_monc_want_map(monc, sub, epoch, continuous); 448 mutex_unlock(&monc->mutex); 449 450 return need_request; 451 } 452 EXPORT_SYMBOL(ceph_monc_want_map); 453 454 /* 455 * Keep track of which maps we have 456 * 457 * @sub: one of CEPH_SUB_* 458 */ 459 static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub, 460 u32 epoch) 461 { 462 dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch); 463 464 if (monc->subs[sub].want) { 465 if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME) 466 monc->subs[sub].want = false; 467 else 468 monc->subs[sub].item.start = cpu_to_le64(epoch + 1); 469 } 470 471 monc->subs[sub].have = epoch; 472 } 473 474 void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch) 475 { 476 mutex_lock(&monc->mutex); 477 __ceph_monc_got_map(monc, sub, epoch); 478 mutex_unlock(&monc->mutex); 479 } 480 EXPORT_SYMBOL(ceph_monc_got_map); 481 482 void ceph_monc_renew_subs(struct ceph_mon_client *monc) 483 { 484 mutex_lock(&monc->mutex); 485 __send_subscribe(monc); 486 mutex_unlock(&monc->mutex); 487 } 488 EXPORT_SYMBOL(ceph_monc_renew_subs); 489 490 /* 491 * Wait for an osdmap with a given epoch. 492 * 493 * @epoch: epoch to wait for 494 * @timeout: in jiffies, 0 means "wait forever" 495 */ 496 int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, 497 unsigned long timeout) 498 { 499 unsigned long started = jiffies; 500 long ret; 501 502 mutex_lock(&monc->mutex); 503 while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) { 504 mutex_unlock(&monc->mutex); 505 506 if (timeout && time_after_eq(jiffies, started + timeout)) 507 return -ETIMEDOUT; 508 509 ret = wait_event_interruptible_timeout(monc->client->auth_wq, 510 monc->subs[CEPH_SUB_OSDMAP].have >= epoch, 511 ceph_timeout_jiffies(timeout)); 512 if (ret < 0) 513 return ret; 514 515 mutex_lock(&monc->mutex); 516 } 517 518 mutex_unlock(&monc->mutex); 519 return 0; 520 } 521 EXPORT_SYMBOL(ceph_monc_wait_osdmap); 522 523 /* 524 * Open a session with a random monitor. Request monmap and osdmap, 525 * which are waited upon in __ceph_open_session(). 526 */ 527 int ceph_monc_open_session(struct ceph_mon_client *monc) 528 { 529 mutex_lock(&monc->mutex); 530 __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true); 531 __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false); 532 __open_session(monc); 533 __schedule_delayed(monc); 534 mutex_unlock(&monc->mutex); 535 return 0; 536 } 537 EXPORT_SYMBOL(ceph_monc_open_session); 538 539 static void ceph_monc_handle_map(struct ceph_mon_client *monc, 540 struct ceph_msg *msg) 541 { 542 struct ceph_client *client = monc->client; 543 struct ceph_monmap *monmap; 544 void *p, *end; 545 546 mutex_lock(&monc->mutex); 547 548 dout("handle_monmap\n"); 549 p = msg->front.iov_base; 550 end = p + msg->front.iov_len; 551 552 monmap = ceph_monmap_decode(&p, end, ceph_msgr2(client)); 553 if (IS_ERR(monmap)) { 554 pr_err("problem decoding monmap, %d\n", 555 (int)PTR_ERR(monmap)); 556 ceph_msg_dump(msg); 557 goto out; 558 } 559 560 if (ceph_check_fsid(client, &monmap->fsid) < 0) { 561 kfree(monmap); 562 goto out; 563 } 564 565 kfree(monc->monmap); 566 monc->monmap = monmap; 567 568 __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch); 569 client->have_fsid = true; 570 571 out: 572 mutex_unlock(&monc->mutex); 573 wake_up_all(&client->auth_wq); 574 } 575 576 /* 577 * generic requests (currently statfs, mon_get_version) 578 */ 579 DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node) 580 581 static void release_generic_request(struct kref *kref) 582 { 583 struct ceph_mon_generic_request *req = 584 container_of(kref, struct ceph_mon_generic_request, kref); 585 586 dout("%s greq %p request %p reply %p\n", __func__, req, req->request, 587 req->reply); 588 WARN_ON(!RB_EMPTY_NODE(&req->node)); 589 590 if (req->reply) 591 ceph_msg_put(req->reply); 592 if (req->request) 593 ceph_msg_put(req->request); 594 595 kfree(req); 596 } 597 598 static void put_generic_request(struct ceph_mon_generic_request *req) 599 { 600 if (req) 601 kref_put(&req->kref, release_generic_request); 602 } 603 604 static void get_generic_request(struct ceph_mon_generic_request *req) 605 { 606 kref_get(&req->kref); 607 } 608 609 static struct ceph_mon_generic_request * 610 alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp) 611 { 612 struct ceph_mon_generic_request *req; 613 614 req = kzalloc(sizeof(*req), gfp); 615 if (!req) 616 return NULL; 617 618 req->monc = monc; 619 kref_init(&req->kref); 620 RB_CLEAR_NODE(&req->node); 621 init_completion(&req->completion); 622 623 dout("%s greq %p\n", __func__, req); 624 return req; 625 } 626 627 static void register_generic_request(struct ceph_mon_generic_request *req) 628 { 629 struct ceph_mon_client *monc = req->monc; 630 631 WARN_ON(req->tid); 632 633 get_generic_request(req); 634 req->tid = ++monc->last_tid; 635 insert_generic_request(&monc->generic_request_tree, req); 636 } 637 638 static void send_generic_request(struct ceph_mon_client *monc, 639 struct ceph_mon_generic_request *req) 640 { 641 WARN_ON(!req->tid); 642 643 dout("%s greq %p tid %llu\n", __func__, req, req->tid); 644 req->request->hdr.tid = cpu_to_le64(req->tid); 645 ceph_con_send(&monc->con, ceph_msg_get(req->request)); 646 } 647 648 static void __finish_generic_request(struct ceph_mon_generic_request *req) 649 { 650 struct ceph_mon_client *monc = req->monc; 651 652 dout("%s greq %p tid %llu\n", __func__, req, req->tid); 653 erase_generic_request(&monc->generic_request_tree, req); 654 655 ceph_msg_revoke(req->request); 656 ceph_msg_revoke_incoming(req->reply); 657 } 658 659 static void finish_generic_request(struct ceph_mon_generic_request *req) 660 { 661 __finish_generic_request(req); 662 put_generic_request(req); 663 } 664 665 static void complete_generic_request(struct ceph_mon_generic_request *req) 666 { 667 if (req->complete_cb) 668 req->complete_cb(req); 669 else 670 complete_all(&req->completion); 671 put_generic_request(req); 672 } 673 674 static void cancel_generic_request(struct ceph_mon_generic_request *req) 675 { 676 struct ceph_mon_client *monc = req->monc; 677 struct ceph_mon_generic_request *lookup_req; 678 679 dout("%s greq %p tid %llu\n", __func__, req, req->tid); 680 681 mutex_lock(&monc->mutex); 682 lookup_req = lookup_generic_request(&monc->generic_request_tree, 683 req->tid); 684 if (lookup_req) { 685 WARN_ON(lookup_req != req); 686 finish_generic_request(req); 687 } 688 689 mutex_unlock(&monc->mutex); 690 } 691 692 static int wait_generic_request(struct ceph_mon_generic_request *req) 693 { 694 int ret; 695 696 dout("%s greq %p tid %llu\n", __func__, req, req->tid); 697 ret = wait_for_completion_interruptible(&req->completion); 698 if (ret) 699 cancel_generic_request(req); 700 else 701 ret = req->result; /* completed */ 702 703 return ret; 704 } 705 706 static struct ceph_msg *get_generic_reply(struct ceph_connection *con, 707 struct ceph_msg_header *hdr, 708 int *skip) 709 { 710 struct ceph_mon_client *monc = con->private; 711 struct ceph_mon_generic_request *req; 712 u64 tid = le64_to_cpu(hdr->tid); 713 struct ceph_msg *m; 714 715 mutex_lock(&monc->mutex); 716 req = lookup_generic_request(&monc->generic_request_tree, tid); 717 if (!req) { 718 dout("get_generic_reply %lld dne\n", tid); 719 *skip = 1; 720 m = NULL; 721 } else { 722 dout("get_generic_reply %lld got %p\n", tid, req->reply); 723 *skip = 0; 724 m = ceph_msg_get(req->reply); 725 /* 726 * we don't need to track the connection reading into 727 * this reply because we only have one open connection 728 * at a time, ever. 729 */ 730 } 731 mutex_unlock(&monc->mutex); 732 return m; 733 } 734 735 /* 736 * statfs 737 */ 738 static void handle_statfs_reply(struct ceph_mon_client *monc, 739 struct ceph_msg *msg) 740 { 741 struct ceph_mon_generic_request *req; 742 struct ceph_mon_statfs_reply *reply = msg->front.iov_base; 743 u64 tid = le64_to_cpu(msg->hdr.tid); 744 745 dout("%s msg %p tid %llu\n", __func__, msg, tid); 746 747 if (msg->front.iov_len != sizeof(*reply)) 748 goto bad; 749 750 mutex_lock(&monc->mutex); 751 req = lookup_generic_request(&monc->generic_request_tree, tid); 752 if (!req) { 753 mutex_unlock(&monc->mutex); 754 return; 755 } 756 757 req->result = 0; 758 *req->u.st = reply->st; /* struct */ 759 __finish_generic_request(req); 760 mutex_unlock(&monc->mutex); 761 762 complete_generic_request(req); 763 return; 764 765 bad: 766 pr_err("corrupt statfs reply, tid %llu\n", tid); 767 ceph_msg_dump(msg); 768 } 769 770 /* 771 * Do a synchronous statfs(). 772 */ 773 int ceph_monc_do_statfs(struct ceph_mon_client *monc, u64 data_pool, 774 struct ceph_statfs *buf) 775 { 776 struct ceph_mon_generic_request *req; 777 struct ceph_mon_statfs *h; 778 int ret = -ENOMEM; 779 780 req = alloc_generic_request(monc, GFP_NOFS); 781 if (!req) 782 goto out; 783 784 req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS, 785 true); 786 if (!req->request) 787 goto out; 788 789 req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true); 790 if (!req->reply) 791 goto out; 792 793 req->u.st = buf; 794 req->request->hdr.version = cpu_to_le16(2); 795 796 mutex_lock(&monc->mutex); 797 register_generic_request(req); 798 /* fill out request */ 799 h = req->request->front.iov_base; 800 h->monhdr.have_version = 0; 801 h->monhdr.session_mon = cpu_to_le16(-1); 802 h->monhdr.session_mon_tid = 0; 803 h->fsid = monc->monmap->fsid; 804 h->contains_data_pool = (data_pool != CEPH_NOPOOL); 805 h->data_pool = cpu_to_le64(data_pool); 806 send_generic_request(monc, req); 807 mutex_unlock(&monc->mutex); 808 809 ret = wait_generic_request(req); 810 out: 811 put_generic_request(req); 812 return ret; 813 } 814 EXPORT_SYMBOL(ceph_monc_do_statfs); 815 816 static void handle_get_version_reply(struct ceph_mon_client *monc, 817 struct ceph_msg *msg) 818 { 819 struct ceph_mon_generic_request *req; 820 u64 tid = le64_to_cpu(msg->hdr.tid); 821 void *p = msg->front.iov_base; 822 void *end = p + msg->front_alloc_len; 823 u64 handle; 824 825 dout("%s msg %p tid %llu\n", __func__, msg, tid); 826 827 ceph_decode_need(&p, end, 2*sizeof(u64), bad); 828 handle = ceph_decode_64(&p); 829 if (tid != 0 && tid != handle) 830 goto bad; 831 832 mutex_lock(&monc->mutex); 833 req = lookup_generic_request(&monc->generic_request_tree, handle); 834 if (!req) { 835 mutex_unlock(&monc->mutex); 836 return; 837 } 838 839 req->result = 0; 840 req->u.newest = ceph_decode_64(&p); 841 __finish_generic_request(req); 842 mutex_unlock(&monc->mutex); 843 844 complete_generic_request(req); 845 return; 846 847 bad: 848 pr_err("corrupt mon_get_version reply, tid %llu\n", tid); 849 ceph_msg_dump(msg); 850 } 851 852 static struct ceph_mon_generic_request * 853 __ceph_monc_get_version(struct ceph_mon_client *monc, const char *what, 854 ceph_monc_callback_t cb, u64 private_data) 855 { 856 struct ceph_mon_generic_request *req; 857 858 req = alloc_generic_request(monc, GFP_NOIO); 859 if (!req) 860 goto err_put_req; 861 862 req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION, 863 sizeof(u64) + sizeof(u32) + strlen(what), 864 GFP_NOIO, true); 865 if (!req->request) 866 goto err_put_req; 867 868 req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO, 869 true); 870 if (!req->reply) 871 goto err_put_req; 872 873 req->complete_cb = cb; 874 req->private_data = private_data; 875 876 mutex_lock(&monc->mutex); 877 register_generic_request(req); 878 { 879 void *p = req->request->front.iov_base; 880 void *const end = p + req->request->front_alloc_len; 881 882 ceph_encode_64(&p, req->tid); /* handle */ 883 ceph_encode_string(&p, end, what, strlen(what)); 884 WARN_ON(p != end); 885 } 886 send_generic_request(monc, req); 887 mutex_unlock(&monc->mutex); 888 889 return req; 890 891 err_put_req: 892 put_generic_request(req); 893 return ERR_PTR(-ENOMEM); 894 } 895 896 /* 897 * Send MMonGetVersion and wait for the reply. 898 * 899 * @what: one of "mdsmap", "osdmap" or "monmap" 900 */ 901 int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what, 902 u64 *newest) 903 { 904 struct ceph_mon_generic_request *req; 905 int ret; 906 907 req = __ceph_monc_get_version(monc, what, NULL, 0); 908 if (IS_ERR(req)) 909 return PTR_ERR(req); 910 911 ret = wait_generic_request(req); 912 if (!ret) 913 *newest = req->u.newest; 914 915 put_generic_request(req); 916 return ret; 917 } 918 EXPORT_SYMBOL(ceph_monc_get_version); 919 920 /* 921 * Send MMonGetVersion, 922 * 923 * @what: one of "mdsmap", "osdmap" or "monmap" 924 */ 925 int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what, 926 ceph_monc_callback_t cb, u64 private_data) 927 { 928 struct ceph_mon_generic_request *req; 929 930 req = __ceph_monc_get_version(monc, what, cb, private_data); 931 if (IS_ERR(req)) 932 return PTR_ERR(req); 933 934 put_generic_request(req); 935 return 0; 936 } 937 EXPORT_SYMBOL(ceph_monc_get_version_async); 938 939 static void handle_command_ack(struct ceph_mon_client *monc, 940 struct ceph_msg *msg) 941 { 942 struct ceph_mon_generic_request *req; 943 void *p = msg->front.iov_base; 944 void *const end = p + msg->front_alloc_len; 945 u64 tid = le64_to_cpu(msg->hdr.tid); 946 947 dout("%s msg %p tid %llu\n", __func__, msg, tid); 948 949 ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) + 950 sizeof(u32), bad); 951 p += sizeof(struct ceph_mon_request_header); 952 953 mutex_lock(&monc->mutex); 954 req = lookup_generic_request(&monc->generic_request_tree, tid); 955 if (!req) { 956 mutex_unlock(&monc->mutex); 957 return; 958 } 959 960 req->result = ceph_decode_32(&p); 961 __finish_generic_request(req); 962 mutex_unlock(&monc->mutex); 963 964 complete_generic_request(req); 965 return; 966 967 bad: 968 pr_err("corrupt mon_command ack, tid %llu\n", tid); 969 ceph_msg_dump(msg); 970 } 971 972 static __printf(2, 0) 973 int do_mon_command_vargs(struct ceph_mon_client *monc, const char *fmt, 974 va_list ap) 975 { 976 struct ceph_mon_generic_request *req; 977 struct ceph_mon_command *h; 978 int ret = -ENOMEM; 979 int len; 980 981 req = alloc_generic_request(monc, GFP_NOIO); 982 if (!req) 983 goto out; 984 985 req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true); 986 if (!req->request) 987 goto out; 988 989 req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO, 990 true); 991 if (!req->reply) 992 goto out; 993 994 mutex_lock(&monc->mutex); 995 register_generic_request(req); 996 h = req->request->front.iov_base; 997 h->monhdr.have_version = 0; 998 h->monhdr.session_mon = cpu_to_le16(-1); 999 h->monhdr.session_mon_tid = 0; 1000 h->fsid = monc->monmap->fsid; 1001 h->num_strs = cpu_to_le32(1); 1002 len = vsprintf(h->str, fmt, ap); 1003 h->str_len = cpu_to_le32(len); 1004 send_generic_request(monc, req); 1005 mutex_unlock(&monc->mutex); 1006 1007 ret = wait_generic_request(req); 1008 out: 1009 put_generic_request(req); 1010 return ret; 1011 } 1012 1013 static __printf(2, 3) 1014 int do_mon_command(struct ceph_mon_client *monc, const char *fmt, ...) 1015 { 1016 va_list ap; 1017 int ret; 1018 1019 va_start(ap, fmt); 1020 ret = do_mon_command_vargs(monc, fmt, ap); 1021 va_end(ap); 1022 return ret; 1023 } 1024 1025 int ceph_monc_blocklist_add(struct ceph_mon_client *monc, 1026 struct ceph_entity_addr *client_addr) 1027 { 1028 int ret; 1029 1030 ret = do_mon_command(monc, 1031 "{ \"prefix\": \"osd blocklist\", \ 1032 \"blocklistop\": \"add\", \ 1033 \"addr\": \"%pISpc/%u\" }", 1034 &client_addr->in_addr, 1035 le32_to_cpu(client_addr->nonce)); 1036 if (ret == -EINVAL) { 1037 /* 1038 * The monitor returns EINVAL on an unrecognized command. 1039 * Try the legacy command -- it is exactly the same except 1040 * for the name. 1041 */ 1042 ret = do_mon_command(monc, 1043 "{ \"prefix\": \"osd blacklist\", \ 1044 \"blacklistop\": \"add\", \ 1045 \"addr\": \"%pISpc/%u\" }", 1046 &client_addr->in_addr, 1047 le32_to_cpu(client_addr->nonce)); 1048 } 1049 if (ret) 1050 return ret; 1051 1052 /* 1053 * Make sure we have the osdmap that includes the blocklist 1054 * entry. This is needed to ensure that the OSDs pick up the 1055 * new blocklist before processing any future requests from 1056 * this client. 1057 */ 1058 return ceph_wait_for_latest_osdmap(monc->client, 0); 1059 } 1060 EXPORT_SYMBOL(ceph_monc_blocklist_add); 1061 1062 /* 1063 * Resend pending generic requests. 1064 */ 1065 static void __resend_generic_request(struct ceph_mon_client *monc) 1066 { 1067 struct ceph_mon_generic_request *req; 1068 struct rb_node *p; 1069 1070 for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) { 1071 req = rb_entry(p, struct ceph_mon_generic_request, node); 1072 ceph_msg_revoke(req->request); 1073 ceph_msg_revoke_incoming(req->reply); 1074 ceph_con_send(&monc->con, ceph_msg_get(req->request)); 1075 } 1076 } 1077 1078 /* 1079 * Delayed work. If we haven't mounted yet, retry. Otherwise, 1080 * renew/retry subscription as needed (in case it is timing out, or we 1081 * got an ENOMEM). And keep the monitor connection alive. 1082 */ 1083 static void delayed_work(struct work_struct *work) 1084 { 1085 struct ceph_mon_client *monc = 1086 container_of(work, struct ceph_mon_client, delayed_work.work); 1087 1088 dout("monc delayed_work\n"); 1089 mutex_lock(&monc->mutex); 1090 if (monc->hunting) { 1091 dout("%s continuing hunt\n", __func__); 1092 reopen_session(monc); 1093 } else { 1094 int is_auth = ceph_auth_is_authenticated(monc->auth); 1095 if (ceph_con_keepalive_expired(&monc->con, 1096 CEPH_MONC_PING_TIMEOUT)) { 1097 dout("monc keepalive timeout\n"); 1098 is_auth = 0; 1099 reopen_session(monc); 1100 } 1101 1102 if (!monc->hunting) { 1103 ceph_con_keepalive(&monc->con); 1104 __validate_auth(monc); 1105 un_backoff(monc); 1106 } 1107 1108 if (is_auth && 1109 !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) { 1110 unsigned long now = jiffies; 1111 1112 dout("%s renew subs? now %lu renew after %lu\n", 1113 __func__, now, monc->sub_renew_after); 1114 if (time_after_eq(now, monc->sub_renew_after)) 1115 __send_subscribe(monc); 1116 } 1117 } 1118 __schedule_delayed(monc); 1119 mutex_unlock(&monc->mutex); 1120 } 1121 1122 /* 1123 * On startup, we build a temporary monmap populated with the IPs 1124 * provided by mount(2). 1125 */ 1126 static int build_initial_monmap(struct ceph_mon_client *monc) 1127 { 1128 __le32 my_type = ceph_msgr2(monc->client) ? 1129 CEPH_ENTITY_ADDR_TYPE_MSGR2 : CEPH_ENTITY_ADDR_TYPE_LEGACY; 1130 struct ceph_options *opt = monc->client->options; 1131 int num_mon = opt->num_mon; 1132 int i; 1133 1134 /* build initial monmap */ 1135 monc->monmap = kzalloc(struct_size(monc->monmap, mon_inst, num_mon), 1136 GFP_KERNEL); 1137 if (!monc->monmap) 1138 return -ENOMEM; 1139 1140 for (i = 0; i < num_mon; i++) { 1141 struct ceph_entity_inst *inst = &monc->monmap->mon_inst[i]; 1142 1143 memcpy(&inst->addr.in_addr, &opt->mon_addr[i].in_addr, 1144 sizeof(inst->addr.in_addr)); 1145 inst->addr.type = my_type; 1146 inst->addr.nonce = 0; 1147 inst->name.type = CEPH_ENTITY_TYPE_MON; 1148 inst->name.num = cpu_to_le64(i); 1149 } 1150 monc->monmap->num_mon = num_mon; 1151 return 0; 1152 } 1153 1154 int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) 1155 { 1156 int err = 0; 1157 1158 dout("init\n"); 1159 memset(monc, 0, sizeof(*monc)); 1160 monc->client = cl; 1161 monc->monmap = NULL; 1162 mutex_init(&monc->mutex); 1163 1164 err = build_initial_monmap(monc); 1165 if (err) 1166 goto out; 1167 1168 /* connection */ 1169 /* authentication */ 1170 monc->auth = ceph_auth_init(cl->options->name, cl->options->key, 1171 cl->options->con_modes); 1172 if (IS_ERR(monc->auth)) { 1173 err = PTR_ERR(monc->auth); 1174 goto out_monmap; 1175 } 1176 monc->auth->want_keys = 1177 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON | 1178 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS; 1179 1180 /* msgs */ 1181 err = -ENOMEM; 1182 monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK, 1183 sizeof(struct ceph_mon_subscribe_ack), 1184 GFP_KERNEL, true); 1185 if (!monc->m_subscribe_ack) 1186 goto out_auth; 1187 1188 monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128, 1189 GFP_KERNEL, true); 1190 if (!monc->m_subscribe) 1191 goto out_subscribe_ack; 1192 1193 monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, 1194 GFP_KERNEL, true); 1195 if (!monc->m_auth_reply) 1196 goto out_subscribe; 1197 1198 monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true); 1199 monc->pending_auth = 0; 1200 if (!monc->m_auth) 1201 goto out_auth_reply; 1202 1203 ceph_con_init(&monc->con, monc, &mon_con_ops, 1204 &monc->client->msgr); 1205 1206 monc->cur_mon = -1; 1207 monc->had_a_connection = false; 1208 monc->hunt_mult = 1; 1209 1210 INIT_DELAYED_WORK(&monc->delayed_work, delayed_work); 1211 monc->generic_request_tree = RB_ROOT; 1212 monc->last_tid = 0; 1213 1214 monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE; 1215 1216 return 0; 1217 1218 out_auth_reply: 1219 ceph_msg_put(monc->m_auth_reply); 1220 out_subscribe: 1221 ceph_msg_put(monc->m_subscribe); 1222 out_subscribe_ack: 1223 ceph_msg_put(monc->m_subscribe_ack); 1224 out_auth: 1225 ceph_auth_destroy(monc->auth); 1226 out_monmap: 1227 kfree(monc->monmap); 1228 out: 1229 return err; 1230 } 1231 EXPORT_SYMBOL(ceph_monc_init); 1232 1233 void ceph_monc_stop(struct ceph_mon_client *monc) 1234 { 1235 dout("stop\n"); 1236 cancel_delayed_work_sync(&monc->delayed_work); 1237 1238 mutex_lock(&monc->mutex); 1239 __close_session(monc); 1240 monc->cur_mon = -1; 1241 mutex_unlock(&monc->mutex); 1242 1243 /* 1244 * flush msgr queue before we destroy ourselves to ensure that: 1245 * - any work that references our embedded con is finished. 1246 * - any osd_client or other work that may reference an authorizer 1247 * finishes before we shut down the auth subsystem. 1248 */ 1249 ceph_msgr_flush(); 1250 1251 ceph_auth_destroy(monc->auth); 1252 1253 WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree)); 1254 1255 ceph_msg_put(monc->m_auth); 1256 ceph_msg_put(monc->m_auth_reply); 1257 ceph_msg_put(monc->m_subscribe); 1258 ceph_msg_put(monc->m_subscribe_ack); 1259 1260 kfree(monc->monmap); 1261 } 1262 EXPORT_SYMBOL(ceph_monc_stop); 1263 1264 static void finish_hunting(struct ceph_mon_client *monc) 1265 { 1266 if (monc->hunting) { 1267 dout("%s found mon%d\n", __func__, monc->cur_mon); 1268 monc->hunting = false; 1269 monc->had_a_connection = true; 1270 un_backoff(monc); 1271 __schedule_delayed(monc); 1272 } 1273 } 1274 1275 static void finish_auth(struct ceph_mon_client *monc, int auth_err, 1276 bool was_authed) 1277 { 1278 dout("%s auth_err %d was_authed %d\n", __func__, auth_err, was_authed); 1279 WARN_ON(auth_err > 0); 1280 1281 monc->pending_auth = 0; 1282 if (auth_err) { 1283 monc->client->auth_err = auth_err; 1284 wake_up_all(&monc->client->auth_wq); 1285 return; 1286 } 1287 1288 if (!was_authed && ceph_auth_is_authenticated(monc->auth)) { 1289 dout("%s authenticated, starting session global_id %llu\n", 1290 __func__, monc->auth->global_id); 1291 1292 monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT; 1293 monc->client->msgr.inst.name.num = 1294 cpu_to_le64(monc->auth->global_id); 1295 1296 __send_subscribe(monc); 1297 __resend_generic_request(monc); 1298 1299 pr_info("mon%d %s session established\n", monc->cur_mon, 1300 ceph_pr_addr(&monc->con.peer_addr)); 1301 } 1302 } 1303 1304 static void handle_auth_reply(struct ceph_mon_client *monc, 1305 struct ceph_msg *msg) 1306 { 1307 bool was_authed; 1308 int ret; 1309 1310 mutex_lock(&monc->mutex); 1311 was_authed = ceph_auth_is_authenticated(monc->auth); 1312 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, 1313 msg->front.iov_len, 1314 monc->m_auth->front.iov_base, 1315 monc->m_auth->front_alloc_len); 1316 if (ret > 0) { 1317 __send_prepared_auth_request(monc, ret); 1318 } else { 1319 finish_auth(monc, ret, was_authed); 1320 finish_hunting(monc); 1321 } 1322 mutex_unlock(&monc->mutex); 1323 } 1324 1325 static int __validate_auth(struct ceph_mon_client *monc) 1326 { 1327 int ret; 1328 1329 if (monc->pending_auth) 1330 return 0; 1331 1332 ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base, 1333 monc->m_auth->front_alloc_len); 1334 if (ret <= 0) 1335 return ret; /* either an error, or no need to authenticate */ 1336 __send_prepared_auth_request(monc, ret); 1337 return 0; 1338 } 1339 1340 int ceph_monc_validate_auth(struct ceph_mon_client *monc) 1341 { 1342 int ret; 1343 1344 mutex_lock(&monc->mutex); 1345 ret = __validate_auth(monc); 1346 mutex_unlock(&monc->mutex); 1347 return ret; 1348 } 1349 EXPORT_SYMBOL(ceph_monc_validate_auth); 1350 1351 static int mon_get_auth_request(struct ceph_connection *con, 1352 void *buf, int *buf_len, 1353 void **authorizer, int *authorizer_len) 1354 { 1355 struct ceph_mon_client *monc = con->private; 1356 int ret; 1357 1358 mutex_lock(&monc->mutex); 1359 ret = ceph_auth_get_request(monc->auth, buf, *buf_len); 1360 mutex_unlock(&monc->mutex); 1361 if (ret < 0) 1362 return ret; 1363 1364 *buf_len = ret; 1365 *authorizer = NULL; 1366 *authorizer_len = 0; 1367 return 0; 1368 } 1369 1370 static int mon_handle_auth_reply_more(struct ceph_connection *con, 1371 void *reply, int reply_len, 1372 void *buf, int *buf_len, 1373 void **authorizer, int *authorizer_len) 1374 { 1375 struct ceph_mon_client *monc = con->private; 1376 int ret; 1377 1378 mutex_lock(&monc->mutex); 1379 ret = ceph_auth_handle_reply_more(monc->auth, reply, reply_len, 1380 buf, *buf_len); 1381 mutex_unlock(&monc->mutex); 1382 if (ret < 0) 1383 return ret; 1384 1385 *buf_len = ret; 1386 *authorizer = NULL; 1387 *authorizer_len = 0; 1388 return 0; 1389 } 1390 1391 static int mon_handle_auth_done(struct ceph_connection *con, 1392 u64 global_id, void *reply, int reply_len, 1393 u8 *session_key, int *session_key_len, 1394 u8 *con_secret, int *con_secret_len) 1395 { 1396 struct ceph_mon_client *monc = con->private; 1397 bool was_authed; 1398 int ret; 1399 1400 mutex_lock(&monc->mutex); 1401 WARN_ON(!monc->hunting); 1402 was_authed = ceph_auth_is_authenticated(monc->auth); 1403 ret = ceph_auth_handle_reply_done(monc->auth, global_id, 1404 reply, reply_len, 1405 session_key, session_key_len, 1406 con_secret, con_secret_len); 1407 finish_auth(monc, ret, was_authed); 1408 if (!ret) 1409 finish_hunting(monc); 1410 mutex_unlock(&monc->mutex); 1411 return 0; 1412 } 1413 1414 static int mon_handle_auth_bad_method(struct ceph_connection *con, 1415 int used_proto, int result, 1416 const int *allowed_protos, int proto_cnt, 1417 const int *allowed_modes, int mode_cnt) 1418 { 1419 struct ceph_mon_client *monc = con->private; 1420 bool was_authed; 1421 1422 mutex_lock(&monc->mutex); 1423 WARN_ON(!monc->hunting); 1424 was_authed = ceph_auth_is_authenticated(monc->auth); 1425 ceph_auth_handle_bad_method(monc->auth, used_proto, result, 1426 allowed_protos, proto_cnt, 1427 allowed_modes, mode_cnt); 1428 finish_auth(monc, -EACCES, was_authed); 1429 mutex_unlock(&monc->mutex); 1430 return 0; 1431 } 1432 1433 /* 1434 * handle incoming message 1435 */ 1436 static void mon_dispatch(struct ceph_connection *con, struct ceph_msg *msg) 1437 { 1438 struct ceph_mon_client *monc = con->private; 1439 int type = le16_to_cpu(msg->hdr.type); 1440 1441 switch (type) { 1442 case CEPH_MSG_AUTH_REPLY: 1443 handle_auth_reply(monc, msg); 1444 break; 1445 1446 case CEPH_MSG_MON_SUBSCRIBE_ACK: 1447 handle_subscribe_ack(monc, msg); 1448 break; 1449 1450 case CEPH_MSG_STATFS_REPLY: 1451 handle_statfs_reply(monc, msg); 1452 break; 1453 1454 case CEPH_MSG_MON_GET_VERSION_REPLY: 1455 handle_get_version_reply(monc, msg); 1456 break; 1457 1458 case CEPH_MSG_MON_COMMAND_ACK: 1459 handle_command_ack(monc, msg); 1460 break; 1461 1462 case CEPH_MSG_MON_MAP: 1463 ceph_monc_handle_map(monc, msg); 1464 break; 1465 1466 case CEPH_MSG_OSD_MAP: 1467 ceph_osdc_handle_map(&monc->client->osdc, msg); 1468 break; 1469 1470 default: 1471 /* can the chained handler handle it? */ 1472 if (monc->client->extra_mon_dispatch && 1473 monc->client->extra_mon_dispatch(monc->client, msg) == 0) 1474 break; 1475 1476 pr_err("received unknown message type %d %s\n", type, 1477 ceph_msg_type_name(type)); 1478 } 1479 ceph_msg_put(msg); 1480 } 1481 1482 /* 1483 * Allocate memory for incoming message 1484 */ 1485 static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, 1486 struct ceph_msg_header *hdr, 1487 int *skip) 1488 { 1489 struct ceph_mon_client *monc = con->private; 1490 int type = le16_to_cpu(hdr->type); 1491 int front_len = le32_to_cpu(hdr->front_len); 1492 struct ceph_msg *m = NULL; 1493 1494 *skip = 0; 1495 1496 switch (type) { 1497 case CEPH_MSG_MON_SUBSCRIBE_ACK: 1498 m = ceph_msg_get(monc->m_subscribe_ack); 1499 break; 1500 case CEPH_MSG_STATFS_REPLY: 1501 case CEPH_MSG_MON_COMMAND_ACK: 1502 return get_generic_reply(con, hdr, skip); 1503 case CEPH_MSG_AUTH_REPLY: 1504 m = ceph_msg_get(monc->m_auth_reply); 1505 break; 1506 case CEPH_MSG_MON_GET_VERSION_REPLY: 1507 if (le64_to_cpu(hdr->tid) != 0) 1508 return get_generic_reply(con, hdr, skip); 1509 1510 /* 1511 * Older OSDs don't set reply tid even if the original 1512 * request had a non-zero tid. Work around this weirdness 1513 * by allocating a new message. 1514 */ 1515 fallthrough; 1516 case CEPH_MSG_MON_MAP: 1517 case CEPH_MSG_MDS_MAP: 1518 case CEPH_MSG_OSD_MAP: 1519 case CEPH_MSG_FS_MAP_USER: 1520 m = ceph_msg_new(type, front_len, GFP_NOFS, false); 1521 if (!m) 1522 return NULL; /* ENOMEM--return skip == 0 */ 1523 break; 1524 } 1525 1526 if (!m) { 1527 pr_info("alloc_msg unknown type %d\n", type); 1528 *skip = 1; 1529 } else if (front_len > m->front_alloc_len) { 1530 pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n", 1531 front_len, m->front_alloc_len, 1532 (unsigned int)con->peer_name.type, 1533 le64_to_cpu(con->peer_name.num)); 1534 ceph_msg_put(m); 1535 m = ceph_msg_new(type, front_len, GFP_NOFS, false); 1536 } 1537 1538 return m; 1539 } 1540 1541 /* 1542 * If the monitor connection resets, pick a new monitor and resubmit 1543 * any pending requests. 1544 */ 1545 static void mon_fault(struct ceph_connection *con) 1546 { 1547 struct ceph_mon_client *monc = con->private; 1548 1549 mutex_lock(&monc->mutex); 1550 dout("%s mon%d\n", __func__, monc->cur_mon); 1551 if (monc->cur_mon >= 0) { 1552 if (!monc->hunting) { 1553 dout("%s hunting for new mon\n", __func__); 1554 reopen_session(monc); 1555 __schedule_delayed(monc); 1556 } else { 1557 dout("%s already hunting\n", __func__); 1558 } 1559 } 1560 mutex_unlock(&monc->mutex); 1561 } 1562 1563 /* 1564 * We can ignore refcounting on the connection struct, as all references 1565 * will come from the messenger workqueue, which is drained prior to 1566 * mon_client destruction. 1567 */ 1568 static struct ceph_connection *mon_get_con(struct ceph_connection *con) 1569 { 1570 return con; 1571 } 1572 1573 static void mon_put_con(struct ceph_connection *con) 1574 { 1575 } 1576 1577 static const struct ceph_connection_operations mon_con_ops = { 1578 .get = mon_get_con, 1579 .put = mon_put_con, 1580 .alloc_msg = mon_alloc_msg, 1581 .dispatch = mon_dispatch, 1582 .fault = mon_fault, 1583 .get_auth_request = mon_get_auth_request, 1584 .handle_auth_reply_more = mon_handle_auth_reply_more, 1585 .handle_auth_done = mon_handle_auth_done, 1586 .handle_auth_bad_method = mon_handle_auth_bad_method, 1587 }; 1588