1 // SPDX-License-Identifier: GPL-2.0 2 #include <linux/ceph/ceph_debug.h> 3 4 #include <linux/module.h> 5 #include <linux/types.h> 6 #include <linux/slab.h> 7 #include <linux/random.h> 8 #include <linux/sched.h> 9 10 #include <linux/ceph/ceph_features.h> 11 #include <linux/ceph/mon_client.h> 12 #include <linux/ceph/libceph.h> 13 #include <linux/ceph/debugfs.h> 14 #include <linux/ceph/decode.h> 15 #include <linux/ceph/auth.h> 16 17 /* 18 * Interact with Ceph monitor cluster. Handle requests for new map 19 * versions, and periodically resend as needed. Also implement 20 * statfs() and umount(). 21 * 22 * A small cluster of Ceph "monitors" are responsible for managing critical 23 * cluster configuration and state information. An odd number (e.g., 3, 5) 24 * of cmon daemons use a modified version of the Paxos part-time parliament 25 * algorithm to manage the MDS map (mds cluster membership), OSD map, and 26 * list of clients who have mounted the file system. 27 * 28 * We maintain an open, active session with a monitor at all times in order to 29 * receive timely MDSMap updates. We periodically send a keepalive byte on the 30 * TCP socket to ensure we detect a failure. If the connection does break, we 31 * randomly hunt for a new monitor. Once the connection is reestablished, we 32 * resend any outstanding requests. 33 */ 34 35 static const struct ceph_connection_operations mon_con_ops; 36 37 static int __validate_auth(struct ceph_mon_client *monc); 38 39 static int decode_mon_info(void **p, void *end, bool msgr2, 40 struct ceph_entity_addr *addr) 41 { 42 void *mon_info_end; 43 u32 struct_len; 44 u8 struct_v; 45 int ret; 46 47 ret = ceph_start_decoding(p, end, 1, "mon_info_t", &struct_v, 48 &struct_len); 49 if (ret) 50 return ret; 51 52 mon_info_end = *p + struct_len; 53 ceph_decode_skip_string(p, end, e_inval); /* skip mon name */ 54 ret = ceph_decode_entity_addrvec(p, end, msgr2, addr); 55 if (ret) 56 return ret; 57 58 *p = mon_info_end; 59 return 0; 60 61 e_inval: 62 return -EINVAL; 63 } 64 65 /* 66 * Decode a monmap blob (e.g., during mount). 67 * 68 * Assume MonMap v3 (i.e. encoding with MONNAMES and MONENC). 69 */ 70 static struct ceph_monmap *ceph_monmap_decode(void **p, void *end, bool msgr2) 71 { 72 struct ceph_monmap *monmap = NULL; 73 struct ceph_fsid fsid; 74 u32 struct_len; 75 int blob_len; 76 int num_mon; 77 u8 struct_v; 78 u32 epoch; 79 int ret; 80 int i; 81 82 ceph_decode_32_safe(p, end, blob_len, e_inval); 83 ceph_decode_need(p, end, blob_len, e_inval); 84 85 ret = ceph_start_decoding(p, end, 6, "monmap", &struct_v, &struct_len); 86 if (ret) 87 goto fail; 88 89 dout("%s struct_v %d\n", __func__, struct_v); 90 ceph_decode_copy_safe(p, end, &fsid, sizeof(fsid), e_inval); 91 ceph_decode_32_safe(p, end, epoch, e_inval); 92 if (struct_v >= 6) { 93 u32 feat_struct_len; 94 u8 feat_struct_v; 95 96 *p += sizeof(struct ceph_timespec); /* skip last_changed */ 97 *p += sizeof(struct ceph_timespec); /* skip created */ 98 99 ret = ceph_start_decoding(p, end, 1, "mon_feature_t", 100 &feat_struct_v, &feat_struct_len); 101 if (ret) 102 goto fail; 103 104 *p += feat_struct_len; /* skip persistent_features */ 105 106 ret = ceph_start_decoding(p, end, 1, "mon_feature_t", 107 &feat_struct_v, &feat_struct_len); 108 if (ret) 109 goto fail; 110 111 *p += feat_struct_len; /* skip optional_features */ 112 } 113 ceph_decode_32_safe(p, end, num_mon, e_inval); 114 115 dout("%s fsid %pU epoch %u num_mon %d\n", __func__, &fsid, epoch, 116 num_mon); 117 if (num_mon > CEPH_MAX_MON) 118 goto e_inval; 119 120 monmap = kmalloc(struct_size(monmap, mon_inst, num_mon), GFP_NOIO); 121 if (!monmap) { 122 ret = -ENOMEM; 123 goto fail; 124 } 125 monmap->fsid = fsid; 126 monmap->epoch = epoch; 127 monmap->num_mon = num_mon; 128 129 /* legacy_mon_addr map or mon_info map */ 130 for (i = 0; i < num_mon; i++) { 131 struct ceph_entity_inst *inst = &monmap->mon_inst[i]; 132 133 ceph_decode_skip_string(p, end, e_inval); /* skip mon name */ 134 inst->name.type = CEPH_ENTITY_TYPE_MON; 135 inst->name.num = cpu_to_le64(i); 136 137 if (struct_v >= 6) 138 ret = decode_mon_info(p, end, msgr2, &inst->addr); 139 else 140 ret = ceph_decode_entity_addr(p, end, &inst->addr); 141 if (ret) 142 goto fail; 143 144 dout("%s mon%d addr %s\n", __func__, i, 145 ceph_pr_addr(&inst->addr)); 146 } 147 148 return monmap; 149 150 e_inval: 151 ret = -EINVAL; 152 fail: 153 kfree(monmap); 154 return ERR_PTR(ret); 155 } 156 157 /* 158 * return true if *addr is included in the monmap. 159 */ 160 int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr) 161 { 162 int i; 163 164 for (i = 0; i < m->num_mon; i++) { 165 if (ceph_addr_equal_no_type(addr, &m->mon_inst[i].addr)) 166 return 1; 167 } 168 169 return 0; 170 } 171 172 /* 173 * Send an auth request. 174 */ 175 static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) 176 { 177 monc->pending_auth = 1; 178 monc->m_auth->front.iov_len = len; 179 monc->m_auth->hdr.front_len = cpu_to_le32(len); 180 ceph_msg_revoke(monc->m_auth); 181 ceph_msg_get(monc->m_auth); /* keep our ref */ 182 ceph_con_send(&monc->con, monc->m_auth); 183 } 184 185 /* 186 * Close monitor session, if any. 187 */ 188 static void __close_session(struct ceph_mon_client *monc) 189 { 190 dout("__close_session closing mon%d\n", monc->cur_mon); 191 ceph_msg_revoke(monc->m_auth); 192 ceph_msg_revoke_incoming(monc->m_auth_reply); 193 ceph_msg_revoke(monc->m_subscribe); 194 ceph_msg_revoke_incoming(monc->m_subscribe_ack); 195 ceph_con_close(&monc->con); 196 197 monc->pending_auth = 0; 198 ceph_auth_reset(monc->auth); 199 } 200 201 /* 202 * Pick a new monitor at random and set cur_mon. If we are repicking 203 * (i.e. cur_mon is already set), be sure to pick a different one. 204 */ 205 static void pick_new_mon(struct ceph_mon_client *monc) 206 { 207 int old_mon = monc->cur_mon; 208 209 BUG_ON(monc->monmap->num_mon < 1); 210 211 if (monc->monmap->num_mon == 1) { 212 monc->cur_mon = 0; 213 } else { 214 int max = monc->monmap->num_mon; 215 int o = -1; 216 int n; 217 218 if (monc->cur_mon >= 0) { 219 if (monc->cur_mon < monc->monmap->num_mon) 220 o = monc->cur_mon; 221 if (o >= 0) 222 max--; 223 } 224 225 n = get_random_u32_below(max); 226 if (o >= 0 && n >= o) 227 n++; 228 229 monc->cur_mon = n; 230 } 231 232 dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon, 233 monc->cur_mon, monc->monmap->num_mon); 234 } 235 236 /* 237 * Open a session with a new monitor. 238 */ 239 static void __open_session(struct ceph_mon_client *monc) 240 { 241 int ret; 242 243 pick_new_mon(monc); 244 245 monc->hunting = true; 246 if (monc->had_a_connection) { 247 monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF; 248 if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT) 249 monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT; 250 } 251 252 monc->sub_renew_after = jiffies; /* i.e., expired */ 253 monc->sub_renew_sent = 0; 254 255 dout("%s opening mon%d\n", __func__, monc->cur_mon); 256 ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon, 257 &monc->monmap->mon_inst[monc->cur_mon].addr); 258 259 /* 260 * Queue a keepalive to ensure that in case of an early fault 261 * the messenger doesn't put us into STANDBY state and instead 262 * retries. This also ensures that our timestamp is valid by 263 * the time we finish hunting and delayed_work() checks it. 264 */ 265 ceph_con_keepalive(&monc->con); 266 if (ceph_msgr2(monc->client)) { 267 monc->pending_auth = 1; 268 return; 269 } 270 271 /* initiate authentication handshake */ 272 ret = ceph_auth_build_hello(monc->auth, 273 monc->m_auth->front.iov_base, 274 monc->m_auth->front_alloc_len); 275 BUG_ON(ret <= 0); 276 __send_prepared_auth_request(monc, ret); 277 } 278 279 static void reopen_session(struct ceph_mon_client *monc) 280 { 281 if (!monc->hunting) 282 pr_info("mon%d %s session lost, hunting for new mon\n", 283 monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr)); 284 285 __close_session(monc); 286 __open_session(monc); 287 } 288 289 void ceph_monc_reopen_session(struct ceph_mon_client *monc) 290 { 291 mutex_lock(&monc->mutex); 292 reopen_session(monc); 293 mutex_unlock(&monc->mutex); 294 } 295 296 static void un_backoff(struct ceph_mon_client *monc) 297 { 298 monc->hunt_mult /= 2; /* reduce by 50% */ 299 if (monc->hunt_mult < 1) 300 monc->hunt_mult = 1; 301 dout("%s hunt_mult now %d\n", __func__, monc->hunt_mult); 302 } 303 304 /* 305 * Reschedule delayed work timer. 306 */ 307 static void __schedule_delayed(struct ceph_mon_client *monc) 308 { 309 unsigned long delay; 310 311 if (monc->hunting) 312 delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult; 313 else 314 delay = CEPH_MONC_PING_INTERVAL; 315 316 dout("__schedule_delayed after %lu\n", delay); 317 mod_delayed_work(system_wq, &monc->delayed_work, 318 round_jiffies_relative(delay)); 319 } 320 321 const char *ceph_sub_str[] = { 322 [CEPH_SUB_MONMAP] = "monmap", 323 [CEPH_SUB_OSDMAP] = "osdmap", 324 [CEPH_SUB_FSMAP] = "fsmap.user", 325 [CEPH_SUB_MDSMAP] = "mdsmap", 326 }; 327 328 /* 329 * Send subscribe request for one or more maps, according to 330 * monc->subs. 331 */ 332 static void __send_subscribe(struct ceph_mon_client *monc) 333 { 334 struct ceph_msg *msg = monc->m_subscribe; 335 void *p = msg->front.iov_base; 336 void *const end = p + msg->front_alloc_len; 337 int num = 0; 338 int i; 339 340 dout("%s sent %lu\n", __func__, monc->sub_renew_sent); 341 342 BUG_ON(monc->cur_mon < 0); 343 344 if (!monc->sub_renew_sent) 345 monc->sub_renew_sent = jiffies | 1; /* never 0 */ 346 347 msg->hdr.version = cpu_to_le16(2); 348 349 for (i = 0; i < ARRAY_SIZE(monc->subs); i++) { 350 if (monc->subs[i].want) 351 num++; 352 } 353 BUG_ON(num < 1); /* monmap sub is always there */ 354 ceph_encode_32(&p, num); 355 for (i = 0; i < ARRAY_SIZE(monc->subs); i++) { 356 char buf[32]; 357 int len; 358 359 if (!monc->subs[i].want) 360 continue; 361 362 len = sprintf(buf, "%s", ceph_sub_str[i]); 363 if (i == CEPH_SUB_MDSMAP && 364 monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE) 365 len += sprintf(buf + len, ".%d", monc->fs_cluster_id); 366 367 dout("%s %s start %llu flags 0x%x\n", __func__, buf, 368 le64_to_cpu(monc->subs[i].item.start), 369 monc->subs[i].item.flags); 370 ceph_encode_string(&p, end, buf, len); 371 memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item)); 372 p += sizeof(monc->subs[i].item); 373 } 374 375 BUG_ON(p > end); 376 msg->front.iov_len = p - msg->front.iov_base; 377 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 378 ceph_msg_revoke(msg); 379 ceph_con_send(&monc->con, ceph_msg_get(msg)); 380 } 381 382 static void handle_subscribe_ack(struct ceph_mon_client *monc, 383 struct ceph_msg *msg) 384 { 385 unsigned int seconds; 386 struct ceph_mon_subscribe_ack *h = msg->front.iov_base; 387 388 if (msg->front.iov_len < sizeof(*h)) 389 goto bad; 390 seconds = le32_to_cpu(h->duration); 391 392 mutex_lock(&monc->mutex); 393 if (monc->sub_renew_sent) { 394 /* 395 * This is only needed for legacy (infernalis or older) 396 * MONs -- see delayed_work(). 397 */ 398 monc->sub_renew_after = monc->sub_renew_sent + 399 (seconds >> 1) * HZ - 1; 400 dout("%s sent %lu duration %d renew after %lu\n", __func__, 401 monc->sub_renew_sent, seconds, monc->sub_renew_after); 402 monc->sub_renew_sent = 0; 403 } else { 404 dout("%s sent %lu renew after %lu, ignoring\n", __func__, 405 monc->sub_renew_sent, monc->sub_renew_after); 406 } 407 mutex_unlock(&monc->mutex); 408 return; 409 bad: 410 pr_err("got corrupt subscribe-ack msg\n"); 411 ceph_msg_dump(msg); 412 } 413 414 /* 415 * Register interest in a map 416 * 417 * @sub: one of CEPH_SUB_* 418 * @epoch: X for "every map since X", or 0 for "just the latest" 419 */ 420 static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub, 421 u32 epoch, bool continuous) 422 { 423 __le64 start = cpu_to_le64(epoch); 424 u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0; 425 426 dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub], 427 epoch, continuous); 428 429 if (monc->subs[sub].want && 430 monc->subs[sub].item.start == start && 431 monc->subs[sub].item.flags == flags) 432 return false; 433 434 monc->subs[sub].item.start = start; 435 monc->subs[sub].item.flags = flags; 436 monc->subs[sub].want = true; 437 438 return true; 439 } 440 441 bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch, 442 bool continuous) 443 { 444 bool need_request; 445 446 mutex_lock(&monc->mutex); 447 need_request = __ceph_monc_want_map(monc, sub, epoch, continuous); 448 mutex_unlock(&monc->mutex); 449 450 return need_request; 451 } 452 EXPORT_SYMBOL(ceph_monc_want_map); 453 454 /* 455 * Keep track of which maps we have 456 * 457 * @sub: one of CEPH_SUB_* 458 */ 459 static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub, 460 u32 epoch) 461 { 462 dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch); 463 464 if (monc->subs[sub].want) { 465 if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME) 466 monc->subs[sub].want = false; 467 else 468 monc->subs[sub].item.start = cpu_to_le64(epoch + 1); 469 } 470 471 monc->subs[sub].have = epoch; 472 } 473 474 void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch) 475 { 476 mutex_lock(&monc->mutex); 477 __ceph_monc_got_map(monc, sub, epoch); 478 mutex_unlock(&monc->mutex); 479 } 480 EXPORT_SYMBOL(ceph_monc_got_map); 481 482 void ceph_monc_renew_subs(struct ceph_mon_client *monc) 483 { 484 mutex_lock(&monc->mutex); 485 __send_subscribe(monc); 486 mutex_unlock(&monc->mutex); 487 } 488 EXPORT_SYMBOL(ceph_monc_renew_subs); 489 490 /* 491 * Wait for an osdmap with a given epoch. 492 * 493 * @epoch: epoch to wait for 494 * @timeout: in jiffies, 0 means "wait forever" 495 */ 496 int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, 497 unsigned long timeout) 498 { 499 unsigned long started = jiffies; 500 long ret; 501 502 mutex_lock(&monc->mutex); 503 while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) { 504 mutex_unlock(&monc->mutex); 505 506 if (timeout && time_after_eq(jiffies, started + timeout)) 507 return -ETIMEDOUT; 508 509 ret = wait_event_interruptible_timeout(monc->client->auth_wq, 510 monc->subs[CEPH_SUB_OSDMAP].have >= epoch, 511 ceph_timeout_jiffies(timeout)); 512 if (ret < 0) 513 return ret; 514 515 mutex_lock(&monc->mutex); 516 } 517 518 mutex_unlock(&monc->mutex); 519 return 0; 520 } 521 EXPORT_SYMBOL(ceph_monc_wait_osdmap); 522 523 /* 524 * Open a session with a random monitor. Request monmap and osdmap, 525 * which are waited upon in __ceph_open_session(). 526 */ 527 int ceph_monc_open_session(struct ceph_mon_client *monc) 528 { 529 mutex_lock(&monc->mutex); 530 __ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true); 531 __ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false); 532 __open_session(monc); 533 __schedule_delayed(monc); 534 mutex_unlock(&monc->mutex); 535 return 0; 536 } 537 EXPORT_SYMBOL(ceph_monc_open_session); 538 539 static void ceph_monc_handle_map(struct ceph_mon_client *monc, 540 struct ceph_msg *msg) 541 { 542 struct ceph_client *client = monc->client; 543 struct ceph_monmap *monmap; 544 void *p, *end; 545 546 mutex_lock(&monc->mutex); 547 548 dout("handle_monmap\n"); 549 p = msg->front.iov_base; 550 end = p + msg->front.iov_len; 551 552 monmap = ceph_monmap_decode(&p, end, ceph_msgr2(client)); 553 if (IS_ERR(monmap)) { 554 pr_err("problem decoding monmap, %d\n", 555 (int)PTR_ERR(monmap)); 556 ceph_msg_dump(msg); 557 goto out; 558 } 559 560 if (ceph_check_fsid(client, &monmap->fsid) < 0) { 561 kfree(monmap); 562 goto out; 563 } 564 565 kfree(monc->monmap); 566 monc->monmap = monmap; 567 568 __ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch); 569 client->have_fsid = true; 570 571 out: 572 mutex_unlock(&monc->mutex); 573 wake_up_all(&client->auth_wq); 574 } 575 576 /* 577 * generic requests (currently statfs, mon_get_version) 578 */ 579 DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node) 580 581 static void release_generic_request(struct kref *kref) 582 { 583 struct ceph_mon_generic_request *req = 584 container_of(kref, struct ceph_mon_generic_request, kref); 585 586 dout("%s greq %p request %p reply %p\n", __func__, req, req->request, 587 req->reply); 588 WARN_ON(!RB_EMPTY_NODE(&req->node)); 589 590 if (req->reply) 591 ceph_msg_put(req->reply); 592 if (req->request) 593 ceph_msg_put(req->request); 594 595 kfree(req); 596 } 597 598 static void put_generic_request(struct ceph_mon_generic_request *req) 599 { 600 if (req) 601 kref_put(&req->kref, release_generic_request); 602 } 603 604 static void get_generic_request(struct ceph_mon_generic_request *req) 605 { 606 kref_get(&req->kref); 607 } 608 609 static struct ceph_mon_generic_request * 610 alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp) 611 { 612 struct ceph_mon_generic_request *req; 613 614 req = kzalloc(sizeof(*req), gfp); 615 if (!req) 616 return NULL; 617 618 req->monc = monc; 619 kref_init(&req->kref); 620 RB_CLEAR_NODE(&req->node); 621 init_completion(&req->completion); 622 623 dout("%s greq %p\n", __func__, req); 624 return req; 625 } 626 627 static void register_generic_request(struct ceph_mon_generic_request *req) 628 { 629 struct ceph_mon_client *monc = req->monc; 630 631 WARN_ON(req->tid); 632 633 get_generic_request(req); 634 req->tid = ++monc->last_tid; 635 insert_generic_request(&monc->generic_request_tree, req); 636 } 637 638 static void send_generic_request(struct ceph_mon_client *monc, 639 struct ceph_mon_generic_request *req) 640 { 641 WARN_ON(!req->tid); 642 643 dout("%s greq %p tid %llu\n", __func__, req, req->tid); 644 req->request->hdr.tid = cpu_to_le64(req->tid); 645 ceph_con_send(&monc->con, ceph_msg_get(req->request)); 646 } 647 648 static void __finish_generic_request(struct ceph_mon_generic_request *req) 649 { 650 struct ceph_mon_client *monc = req->monc; 651 652 dout("%s greq %p tid %llu\n", __func__, req, req->tid); 653 erase_generic_request(&monc->generic_request_tree, req); 654 655 ceph_msg_revoke(req->request); 656 ceph_msg_revoke_incoming(req->reply); 657 } 658 659 static void finish_generic_request(struct ceph_mon_generic_request *req) 660 { 661 __finish_generic_request(req); 662 put_generic_request(req); 663 } 664 665 static void complete_generic_request(struct ceph_mon_generic_request *req) 666 { 667 if (req->complete_cb) 668 req->complete_cb(req); 669 else 670 complete_all(&req->completion); 671 put_generic_request(req); 672 } 673 674 static void cancel_generic_request(struct ceph_mon_generic_request *req) 675 { 676 struct ceph_mon_client *monc = req->monc; 677 struct ceph_mon_generic_request *lookup_req; 678 679 dout("%s greq %p tid %llu\n", __func__, req, req->tid); 680 681 mutex_lock(&monc->mutex); 682 lookup_req = lookup_generic_request(&monc->generic_request_tree, 683 req->tid); 684 if (lookup_req) { 685 WARN_ON(lookup_req != req); 686 finish_generic_request(req); 687 } 688 689 mutex_unlock(&monc->mutex); 690 } 691 692 static int wait_generic_request(struct ceph_mon_generic_request *req) 693 { 694 int ret; 695 696 dout("%s greq %p tid %llu\n", __func__, req, req->tid); 697 ret = wait_for_completion_interruptible(&req->completion); 698 if (ret) 699 cancel_generic_request(req); 700 else 701 ret = req->result; /* completed */ 702 703 return ret; 704 } 705 706 static struct ceph_msg *get_generic_reply(struct ceph_connection *con, 707 struct ceph_msg_header *hdr, 708 int *skip) 709 { 710 struct ceph_mon_client *monc = con->private; 711 struct ceph_mon_generic_request *req; 712 u64 tid = le64_to_cpu(hdr->tid); 713 struct ceph_msg *m; 714 715 mutex_lock(&monc->mutex); 716 req = lookup_generic_request(&monc->generic_request_tree, tid); 717 if (!req) { 718 dout("get_generic_reply %lld dne\n", tid); 719 *skip = 1; 720 m = NULL; 721 } else { 722 dout("get_generic_reply %lld got %p\n", tid, req->reply); 723 *skip = 0; 724 m = ceph_msg_get(req->reply); 725 /* 726 * we don't need to track the connection reading into 727 * this reply because we only have one open connection 728 * at a time, ever. 729 */ 730 } 731 mutex_unlock(&monc->mutex); 732 return m; 733 } 734 735 /* 736 * statfs 737 */ 738 static void handle_statfs_reply(struct ceph_mon_client *monc, 739 struct ceph_msg *msg) 740 { 741 struct ceph_mon_generic_request *req; 742 struct ceph_mon_statfs_reply *reply = msg->front.iov_base; 743 u64 tid = le64_to_cpu(msg->hdr.tid); 744 745 dout("%s msg %p tid %llu\n", __func__, msg, tid); 746 747 if (msg->front.iov_len != sizeof(*reply)) 748 goto bad; 749 750 mutex_lock(&monc->mutex); 751 req = lookup_generic_request(&monc->generic_request_tree, tid); 752 if (!req) { 753 mutex_unlock(&monc->mutex); 754 return; 755 } 756 757 req->result = 0; 758 *req->u.st = reply->st; /* struct */ 759 __finish_generic_request(req); 760 mutex_unlock(&monc->mutex); 761 762 complete_generic_request(req); 763 return; 764 765 bad: 766 pr_err("corrupt statfs reply, tid %llu\n", tid); 767 ceph_msg_dump(msg); 768 } 769 770 /* 771 * Do a synchronous statfs(). 772 */ 773 int ceph_monc_do_statfs(struct ceph_mon_client *monc, u64 data_pool, 774 struct ceph_statfs *buf) 775 { 776 struct ceph_mon_generic_request *req; 777 struct ceph_mon_statfs *h; 778 int ret = -ENOMEM; 779 780 req = alloc_generic_request(monc, GFP_NOFS); 781 if (!req) 782 goto out; 783 784 req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS, 785 true); 786 if (!req->request) 787 goto out; 788 789 req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true); 790 if (!req->reply) 791 goto out; 792 793 req->u.st = buf; 794 req->request->hdr.version = cpu_to_le16(2); 795 796 mutex_lock(&monc->mutex); 797 register_generic_request(req); 798 /* fill out request */ 799 h = req->request->front.iov_base; 800 h->monhdr.have_version = 0; 801 h->monhdr.session_mon = cpu_to_le16(-1); 802 h->monhdr.session_mon_tid = 0; 803 h->fsid = monc->monmap->fsid; 804 h->contains_data_pool = (data_pool != CEPH_NOPOOL); 805 h->data_pool = cpu_to_le64(data_pool); 806 send_generic_request(monc, req); 807 mutex_unlock(&monc->mutex); 808 809 ret = wait_generic_request(req); 810 out: 811 put_generic_request(req); 812 return ret; 813 } 814 EXPORT_SYMBOL(ceph_monc_do_statfs); 815 816 static void handle_get_version_reply(struct ceph_mon_client *monc, 817 struct ceph_msg *msg) 818 { 819 struct ceph_mon_generic_request *req; 820 u64 tid = le64_to_cpu(msg->hdr.tid); 821 void *p = msg->front.iov_base; 822 void *end = p + msg->front_alloc_len; 823 u64 handle; 824 825 dout("%s msg %p tid %llu\n", __func__, msg, tid); 826 827 ceph_decode_need(&p, end, 2*sizeof(u64), bad); 828 handle = ceph_decode_64(&p); 829 if (tid != 0 && tid != handle) 830 goto bad; 831 832 mutex_lock(&monc->mutex); 833 req = lookup_generic_request(&monc->generic_request_tree, handle); 834 if (!req) { 835 mutex_unlock(&monc->mutex); 836 return; 837 } 838 839 req->result = 0; 840 req->u.newest = ceph_decode_64(&p); 841 __finish_generic_request(req); 842 mutex_unlock(&monc->mutex); 843 844 complete_generic_request(req); 845 return; 846 847 bad: 848 pr_err("corrupt mon_get_version reply, tid %llu\n", tid); 849 ceph_msg_dump(msg); 850 } 851 852 static struct ceph_mon_generic_request * 853 __ceph_monc_get_version(struct ceph_mon_client *monc, const char *what, 854 ceph_monc_callback_t cb, u64 private_data) 855 { 856 struct ceph_mon_generic_request *req; 857 858 req = alloc_generic_request(monc, GFP_NOIO); 859 if (!req) 860 goto err_put_req; 861 862 req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION, 863 sizeof(u64) + sizeof(u32) + strlen(what), 864 GFP_NOIO, true); 865 if (!req->request) 866 goto err_put_req; 867 868 req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO, 869 true); 870 if (!req->reply) 871 goto err_put_req; 872 873 req->complete_cb = cb; 874 req->private_data = private_data; 875 876 mutex_lock(&monc->mutex); 877 register_generic_request(req); 878 { 879 void *p = req->request->front.iov_base; 880 void *const end = p + req->request->front_alloc_len; 881 882 ceph_encode_64(&p, req->tid); /* handle */ 883 ceph_encode_string(&p, end, what, strlen(what)); 884 WARN_ON(p != end); 885 } 886 send_generic_request(monc, req); 887 mutex_unlock(&monc->mutex); 888 889 return req; 890 891 err_put_req: 892 put_generic_request(req); 893 return ERR_PTR(-ENOMEM); 894 } 895 896 /* 897 * Send MMonGetVersion and wait for the reply. 898 * 899 * @what: one of "mdsmap", "osdmap" or "monmap" 900 */ 901 int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what, 902 u64 *newest) 903 { 904 struct ceph_mon_generic_request *req; 905 int ret; 906 907 req = __ceph_monc_get_version(monc, what, NULL, 0); 908 if (IS_ERR(req)) 909 return PTR_ERR(req); 910 911 ret = wait_generic_request(req); 912 if (!ret) 913 *newest = req->u.newest; 914 915 put_generic_request(req); 916 return ret; 917 } 918 EXPORT_SYMBOL(ceph_monc_get_version); 919 920 /* 921 * Send MMonGetVersion, 922 * 923 * @what: one of "mdsmap", "osdmap" or "monmap" 924 */ 925 int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what, 926 ceph_monc_callback_t cb, u64 private_data) 927 { 928 struct ceph_mon_generic_request *req; 929 930 req = __ceph_monc_get_version(monc, what, cb, private_data); 931 if (IS_ERR(req)) 932 return PTR_ERR(req); 933 934 put_generic_request(req); 935 return 0; 936 } 937 EXPORT_SYMBOL(ceph_monc_get_version_async); 938 939 static void handle_command_ack(struct ceph_mon_client *monc, 940 struct ceph_msg *msg) 941 { 942 struct ceph_mon_generic_request *req; 943 void *p = msg->front.iov_base; 944 void *const end = p + msg->front_alloc_len; 945 u64 tid = le64_to_cpu(msg->hdr.tid); 946 947 dout("%s msg %p tid %llu\n", __func__, msg, tid); 948 949 ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) + 950 sizeof(u32), bad); 951 p += sizeof(struct ceph_mon_request_header); 952 953 mutex_lock(&monc->mutex); 954 req = lookup_generic_request(&monc->generic_request_tree, tid); 955 if (!req) { 956 mutex_unlock(&monc->mutex); 957 return; 958 } 959 960 req->result = ceph_decode_32(&p); 961 __finish_generic_request(req); 962 mutex_unlock(&monc->mutex); 963 964 complete_generic_request(req); 965 return; 966 967 bad: 968 pr_err("corrupt mon_command ack, tid %llu\n", tid); 969 ceph_msg_dump(msg); 970 } 971 972 static __printf(2, 0) 973 int do_mon_command_vargs(struct ceph_mon_client *monc, const char *fmt, 974 va_list ap) 975 { 976 struct ceph_mon_generic_request *req; 977 struct ceph_mon_command *h; 978 int ret = -ENOMEM; 979 int len; 980 981 req = alloc_generic_request(monc, GFP_NOIO); 982 if (!req) 983 goto out; 984 985 req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true); 986 if (!req->request) 987 goto out; 988 989 req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO, 990 true); 991 if (!req->reply) 992 goto out; 993 994 mutex_lock(&monc->mutex); 995 register_generic_request(req); 996 h = req->request->front.iov_base; 997 h->monhdr.have_version = 0; 998 h->monhdr.session_mon = cpu_to_le16(-1); 999 h->monhdr.session_mon_tid = 0; 1000 h->fsid = monc->monmap->fsid; 1001 h->num_strs = cpu_to_le32(1); 1002 len = vsprintf(h->str, fmt, ap); 1003 h->str_len = cpu_to_le32(len); 1004 send_generic_request(monc, req); 1005 mutex_unlock(&monc->mutex); 1006 1007 ret = wait_generic_request(req); 1008 out: 1009 put_generic_request(req); 1010 return ret; 1011 } 1012 1013 static __printf(2, 3) 1014 int do_mon_command(struct ceph_mon_client *monc, const char *fmt, ...) 1015 { 1016 va_list ap; 1017 int ret; 1018 1019 va_start(ap, fmt); 1020 ret = do_mon_command_vargs(monc, fmt, ap); 1021 va_end(ap); 1022 return ret; 1023 } 1024 1025 int ceph_monc_blocklist_add(struct ceph_mon_client *monc, 1026 struct ceph_entity_addr *client_addr) 1027 { 1028 int ret; 1029 1030 ret = do_mon_command(monc, 1031 "{ \"prefix\": \"osd blocklist\", \ 1032 \"blocklistop\": \"add\", \ 1033 \"addr\": \"%pISpc/%u\" }", 1034 &client_addr->in_addr, 1035 le32_to_cpu(client_addr->nonce)); 1036 if (ret == -EINVAL) { 1037 /* 1038 * The monitor returns EINVAL on an unrecognized command. 1039 * Try the legacy command -- it is exactly the same except 1040 * for the name. 1041 */ 1042 ret = do_mon_command(monc, 1043 "{ \"prefix\": \"osd blacklist\", \ 1044 \"blacklistop\": \"add\", \ 1045 \"addr\": \"%pISpc/%u\" }", 1046 &client_addr->in_addr, 1047 le32_to_cpu(client_addr->nonce)); 1048 } 1049 if (ret) 1050 return ret; 1051 1052 /* 1053 * Make sure we have the osdmap that includes the blocklist 1054 * entry. This is needed to ensure that the OSDs pick up the 1055 * new blocklist before processing any future requests from 1056 * this client. 1057 */ 1058 return ceph_wait_for_latest_osdmap(monc->client, 0); 1059 } 1060 EXPORT_SYMBOL(ceph_monc_blocklist_add); 1061 1062 /* 1063 * Resend pending generic requests. 1064 */ 1065 static void __resend_generic_request(struct ceph_mon_client *monc) 1066 { 1067 struct ceph_mon_generic_request *req; 1068 struct rb_node *p; 1069 1070 for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) { 1071 req = rb_entry(p, struct ceph_mon_generic_request, node); 1072 ceph_msg_revoke(req->request); 1073 ceph_msg_revoke_incoming(req->reply); 1074 ceph_con_send(&monc->con, ceph_msg_get(req->request)); 1075 } 1076 } 1077 1078 /* 1079 * Delayed work. If we haven't mounted yet, retry. Otherwise, 1080 * renew/retry subscription as needed (in case it is timing out, or we 1081 * got an ENOMEM). And keep the monitor connection alive. 1082 */ 1083 static void delayed_work(struct work_struct *work) 1084 { 1085 struct ceph_mon_client *monc = 1086 container_of(work, struct ceph_mon_client, delayed_work.work); 1087 1088 mutex_lock(&monc->mutex); 1089 dout("%s mon%d\n", __func__, monc->cur_mon); 1090 if (monc->cur_mon < 0) { 1091 goto out; 1092 } 1093 1094 if (monc->hunting) { 1095 dout("%s continuing hunt\n", __func__); 1096 reopen_session(monc); 1097 } else { 1098 int is_auth = ceph_auth_is_authenticated(monc->auth); 1099 1100 dout("%s is_authed %d\n", __func__, is_auth); 1101 if (ceph_con_keepalive_expired(&monc->con, 1102 CEPH_MONC_PING_TIMEOUT)) { 1103 dout("monc keepalive timeout\n"); 1104 is_auth = 0; 1105 reopen_session(monc); 1106 } 1107 1108 if (!monc->hunting) { 1109 ceph_con_keepalive(&monc->con); 1110 __validate_auth(monc); 1111 un_backoff(monc); 1112 } 1113 1114 if (is_auth && 1115 !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) { 1116 unsigned long now = jiffies; 1117 1118 dout("%s renew subs? now %lu renew after %lu\n", 1119 __func__, now, monc->sub_renew_after); 1120 if (time_after_eq(now, monc->sub_renew_after)) 1121 __send_subscribe(monc); 1122 } 1123 } 1124 __schedule_delayed(monc); 1125 1126 out: 1127 mutex_unlock(&monc->mutex); 1128 } 1129 1130 /* 1131 * On startup, we build a temporary monmap populated with the IPs 1132 * provided by mount(2). 1133 */ 1134 static int build_initial_monmap(struct ceph_mon_client *monc) 1135 { 1136 __le32 my_type = ceph_msgr2(monc->client) ? 1137 CEPH_ENTITY_ADDR_TYPE_MSGR2 : CEPH_ENTITY_ADDR_TYPE_LEGACY; 1138 struct ceph_options *opt = monc->client->options; 1139 int num_mon = opt->num_mon; 1140 int i; 1141 1142 /* build initial monmap */ 1143 monc->monmap = kzalloc(struct_size(monc->monmap, mon_inst, num_mon), 1144 GFP_KERNEL); 1145 if (!monc->monmap) 1146 return -ENOMEM; 1147 1148 for (i = 0; i < num_mon; i++) { 1149 struct ceph_entity_inst *inst = &monc->monmap->mon_inst[i]; 1150 1151 memcpy(&inst->addr.in_addr, &opt->mon_addr[i].in_addr, 1152 sizeof(inst->addr.in_addr)); 1153 inst->addr.type = my_type; 1154 inst->addr.nonce = 0; 1155 inst->name.type = CEPH_ENTITY_TYPE_MON; 1156 inst->name.num = cpu_to_le64(i); 1157 } 1158 monc->monmap->num_mon = num_mon; 1159 return 0; 1160 } 1161 1162 int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl) 1163 { 1164 int err; 1165 1166 dout("init\n"); 1167 memset(monc, 0, sizeof(*monc)); 1168 monc->client = cl; 1169 mutex_init(&monc->mutex); 1170 1171 err = build_initial_monmap(monc); 1172 if (err) 1173 goto out; 1174 1175 /* connection */ 1176 /* authentication */ 1177 monc->auth = ceph_auth_init(cl->options->name, cl->options->key, 1178 cl->options->con_modes); 1179 if (IS_ERR(monc->auth)) { 1180 err = PTR_ERR(monc->auth); 1181 goto out_monmap; 1182 } 1183 monc->auth->want_keys = 1184 CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON | 1185 CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS; 1186 1187 /* msgs */ 1188 err = -ENOMEM; 1189 monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK, 1190 sizeof(struct ceph_mon_subscribe_ack), 1191 GFP_KERNEL, true); 1192 if (!monc->m_subscribe_ack) 1193 goto out_auth; 1194 1195 monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128, 1196 GFP_KERNEL, true); 1197 if (!monc->m_subscribe) 1198 goto out_subscribe_ack; 1199 1200 monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, 1201 GFP_KERNEL, true); 1202 if (!monc->m_auth_reply) 1203 goto out_subscribe; 1204 1205 monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true); 1206 monc->pending_auth = 0; 1207 if (!monc->m_auth) 1208 goto out_auth_reply; 1209 1210 ceph_con_init(&monc->con, monc, &mon_con_ops, 1211 &monc->client->msgr); 1212 1213 monc->cur_mon = -1; 1214 monc->had_a_connection = false; 1215 monc->hunt_mult = 1; 1216 1217 INIT_DELAYED_WORK(&monc->delayed_work, delayed_work); 1218 monc->generic_request_tree = RB_ROOT; 1219 monc->last_tid = 0; 1220 1221 monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE; 1222 1223 return 0; 1224 1225 out_auth_reply: 1226 ceph_msg_put(monc->m_auth_reply); 1227 out_subscribe: 1228 ceph_msg_put(monc->m_subscribe); 1229 out_subscribe_ack: 1230 ceph_msg_put(monc->m_subscribe_ack); 1231 out_auth: 1232 ceph_auth_destroy(monc->auth); 1233 out_monmap: 1234 kfree(monc->monmap); 1235 out: 1236 return err; 1237 } 1238 EXPORT_SYMBOL(ceph_monc_init); 1239 1240 void ceph_monc_stop(struct ceph_mon_client *monc) 1241 { 1242 dout("stop\n"); 1243 1244 mutex_lock(&monc->mutex); 1245 __close_session(monc); 1246 monc->hunting = false; 1247 monc->cur_mon = -1; 1248 mutex_unlock(&monc->mutex); 1249 1250 cancel_delayed_work_sync(&monc->delayed_work); 1251 1252 /* 1253 * flush msgr queue before we destroy ourselves to ensure that: 1254 * - any work that references our embedded con is finished. 1255 * - any osd_client or other work that may reference an authorizer 1256 * finishes before we shut down the auth subsystem. 1257 */ 1258 ceph_msgr_flush(); 1259 1260 ceph_auth_destroy(monc->auth); 1261 1262 WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree)); 1263 1264 ceph_msg_put(monc->m_auth); 1265 ceph_msg_put(monc->m_auth_reply); 1266 ceph_msg_put(monc->m_subscribe); 1267 ceph_msg_put(monc->m_subscribe_ack); 1268 1269 kfree(monc->monmap); 1270 } 1271 EXPORT_SYMBOL(ceph_monc_stop); 1272 1273 static void finish_hunting(struct ceph_mon_client *monc) 1274 { 1275 if (monc->hunting) { 1276 dout("%s found mon%d\n", __func__, monc->cur_mon); 1277 monc->hunting = false; 1278 monc->had_a_connection = true; 1279 un_backoff(monc); 1280 __schedule_delayed(monc); 1281 } 1282 } 1283 1284 static void finish_auth(struct ceph_mon_client *monc, int auth_err, 1285 bool was_authed) 1286 { 1287 dout("%s auth_err %d was_authed %d\n", __func__, auth_err, was_authed); 1288 WARN_ON(auth_err > 0); 1289 1290 monc->pending_auth = 0; 1291 if (auth_err) { 1292 monc->client->auth_err = auth_err; 1293 wake_up_all(&monc->client->auth_wq); 1294 return; 1295 } 1296 1297 if (!was_authed && ceph_auth_is_authenticated(monc->auth)) { 1298 dout("%s authenticated, starting session global_id %llu\n", 1299 __func__, monc->auth->global_id); 1300 1301 monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT; 1302 monc->client->msgr.inst.name.num = 1303 cpu_to_le64(monc->auth->global_id); 1304 1305 __send_subscribe(monc); 1306 __resend_generic_request(monc); 1307 1308 pr_info("mon%d %s session established\n", monc->cur_mon, 1309 ceph_pr_addr(&monc->con.peer_addr)); 1310 } 1311 } 1312 1313 static void handle_auth_reply(struct ceph_mon_client *monc, 1314 struct ceph_msg *msg) 1315 { 1316 bool was_authed; 1317 int ret; 1318 1319 mutex_lock(&monc->mutex); 1320 was_authed = ceph_auth_is_authenticated(monc->auth); 1321 ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base, 1322 msg->front.iov_len, 1323 monc->m_auth->front.iov_base, 1324 monc->m_auth->front_alloc_len); 1325 if (ret > 0) { 1326 __send_prepared_auth_request(monc, ret); 1327 } else { 1328 finish_auth(monc, ret, was_authed); 1329 finish_hunting(monc); 1330 } 1331 mutex_unlock(&monc->mutex); 1332 } 1333 1334 static int __validate_auth(struct ceph_mon_client *monc) 1335 { 1336 int ret; 1337 1338 if (monc->pending_auth) 1339 return 0; 1340 1341 ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base, 1342 monc->m_auth->front_alloc_len); 1343 if (ret <= 0) 1344 return ret; /* either an error, or no need to authenticate */ 1345 __send_prepared_auth_request(monc, ret); 1346 return 0; 1347 } 1348 1349 int ceph_monc_validate_auth(struct ceph_mon_client *monc) 1350 { 1351 int ret; 1352 1353 mutex_lock(&monc->mutex); 1354 ret = __validate_auth(monc); 1355 mutex_unlock(&monc->mutex); 1356 return ret; 1357 } 1358 EXPORT_SYMBOL(ceph_monc_validate_auth); 1359 1360 static int mon_get_auth_request(struct ceph_connection *con, 1361 void *buf, int *buf_len, 1362 void **authorizer, int *authorizer_len) 1363 { 1364 struct ceph_mon_client *monc = con->private; 1365 int ret; 1366 1367 mutex_lock(&monc->mutex); 1368 ret = ceph_auth_get_request(monc->auth, buf, *buf_len); 1369 mutex_unlock(&monc->mutex); 1370 if (ret < 0) 1371 return ret; 1372 1373 *buf_len = ret; 1374 *authorizer = NULL; 1375 *authorizer_len = 0; 1376 return 0; 1377 } 1378 1379 static int mon_handle_auth_reply_more(struct ceph_connection *con, 1380 void *reply, int reply_len, 1381 void *buf, int *buf_len, 1382 void **authorizer, int *authorizer_len) 1383 { 1384 struct ceph_mon_client *monc = con->private; 1385 int ret; 1386 1387 mutex_lock(&monc->mutex); 1388 ret = ceph_auth_handle_reply_more(monc->auth, reply, reply_len, 1389 buf, *buf_len); 1390 mutex_unlock(&monc->mutex); 1391 if (ret < 0) 1392 return ret; 1393 1394 *buf_len = ret; 1395 *authorizer = NULL; 1396 *authorizer_len = 0; 1397 return 0; 1398 } 1399 1400 static int mon_handle_auth_done(struct ceph_connection *con, 1401 u64 global_id, void *reply, int reply_len, 1402 u8 *session_key, int *session_key_len, 1403 u8 *con_secret, int *con_secret_len) 1404 { 1405 struct ceph_mon_client *monc = con->private; 1406 bool was_authed; 1407 int ret; 1408 1409 mutex_lock(&monc->mutex); 1410 WARN_ON(!monc->hunting); 1411 was_authed = ceph_auth_is_authenticated(monc->auth); 1412 ret = ceph_auth_handle_reply_done(monc->auth, global_id, 1413 reply, reply_len, 1414 session_key, session_key_len, 1415 con_secret, con_secret_len); 1416 finish_auth(monc, ret, was_authed); 1417 if (!ret) 1418 finish_hunting(monc); 1419 mutex_unlock(&monc->mutex); 1420 return 0; 1421 } 1422 1423 static int mon_handle_auth_bad_method(struct ceph_connection *con, 1424 int used_proto, int result, 1425 const int *allowed_protos, int proto_cnt, 1426 const int *allowed_modes, int mode_cnt) 1427 { 1428 struct ceph_mon_client *monc = con->private; 1429 bool was_authed; 1430 1431 mutex_lock(&monc->mutex); 1432 WARN_ON(!monc->hunting); 1433 was_authed = ceph_auth_is_authenticated(monc->auth); 1434 ceph_auth_handle_bad_method(monc->auth, used_proto, result, 1435 allowed_protos, proto_cnt, 1436 allowed_modes, mode_cnt); 1437 finish_auth(monc, -EACCES, was_authed); 1438 mutex_unlock(&monc->mutex); 1439 return 0; 1440 } 1441 1442 /* 1443 * handle incoming message 1444 */ 1445 static void mon_dispatch(struct ceph_connection *con, struct ceph_msg *msg) 1446 { 1447 struct ceph_mon_client *monc = con->private; 1448 int type = le16_to_cpu(msg->hdr.type); 1449 1450 switch (type) { 1451 case CEPH_MSG_AUTH_REPLY: 1452 handle_auth_reply(monc, msg); 1453 break; 1454 1455 case CEPH_MSG_MON_SUBSCRIBE_ACK: 1456 handle_subscribe_ack(monc, msg); 1457 break; 1458 1459 case CEPH_MSG_STATFS_REPLY: 1460 handle_statfs_reply(monc, msg); 1461 break; 1462 1463 case CEPH_MSG_MON_GET_VERSION_REPLY: 1464 handle_get_version_reply(monc, msg); 1465 break; 1466 1467 case CEPH_MSG_MON_COMMAND_ACK: 1468 handle_command_ack(monc, msg); 1469 break; 1470 1471 case CEPH_MSG_MON_MAP: 1472 ceph_monc_handle_map(monc, msg); 1473 break; 1474 1475 case CEPH_MSG_OSD_MAP: 1476 ceph_osdc_handle_map(&monc->client->osdc, msg); 1477 break; 1478 1479 default: 1480 /* can the chained handler handle it? */ 1481 if (monc->client->extra_mon_dispatch && 1482 monc->client->extra_mon_dispatch(monc->client, msg) == 0) 1483 break; 1484 1485 pr_err("received unknown message type %d %s\n", type, 1486 ceph_msg_type_name(type)); 1487 } 1488 ceph_msg_put(msg); 1489 } 1490 1491 /* 1492 * Allocate memory for incoming message 1493 */ 1494 static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con, 1495 struct ceph_msg_header *hdr, 1496 int *skip) 1497 { 1498 struct ceph_mon_client *monc = con->private; 1499 int type = le16_to_cpu(hdr->type); 1500 int front_len = le32_to_cpu(hdr->front_len); 1501 struct ceph_msg *m = NULL; 1502 1503 *skip = 0; 1504 1505 switch (type) { 1506 case CEPH_MSG_MON_SUBSCRIBE_ACK: 1507 m = ceph_msg_get(monc->m_subscribe_ack); 1508 break; 1509 case CEPH_MSG_STATFS_REPLY: 1510 case CEPH_MSG_MON_COMMAND_ACK: 1511 return get_generic_reply(con, hdr, skip); 1512 case CEPH_MSG_AUTH_REPLY: 1513 m = ceph_msg_get(monc->m_auth_reply); 1514 break; 1515 case CEPH_MSG_MON_GET_VERSION_REPLY: 1516 if (le64_to_cpu(hdr->tid) != 0) 1517 return get_generic_reply(con, hdr, skip); 1518 1519 /* 1520 * Older OSDs don't set reply tid even if the original 1521 * request had a non-zero tid. Work around this weirdness 1522 * by allocating a new message. 1523 */ 1524 fallthrough; 1525 case CEPH_MSG_MON_MAP: 1526 case CEPH_MSG_MDS_MAP: 1527 case CEPH_MSG_OSD_MAP: 1528 case CEPH_MSG_FS_MAP_USER: 1529 m = ceph_msg_new(type, front_len, GFP_NOFS, false); 1530 if (!m) 1531 return NULL; /* ENOMEM--return skip == 0 */ 1532 break; 1533 } 1534 1535 if (!m) { 1536 pr_info("alloc_msg unknown type %d\n", type); 1537 *skip = 1; 1538 } else if (front_len > m->front_alloc_len) { 1539 pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n", 1540 front_len, m->front_alloc_len, 1541 (unsigned int)con->peer_name.type, 1542 le64_to_cpu(con->peer_name.num)); 1543 ceph_msg_put(m); 1544 m = ceph_msg_new(type, front_len, GFP_NOFS, false); 1545 } 1546 1547 return m; 1548 } 1549 1550 /* 1551 * If the monitor connection resets, pick a new monitor and resubmit 1552 * any pending requests. 1553 */ 1554 static void mon_fault(struct ceph_connection *con) 1555 { 1556 struct ceph_mon_client *monc = con->private; 1557 1558 mutex_lock(&monc->mutex); 1559 dout("%s mon%d\n", __func__, monc->cur_mon); 1560 if (monc->cur_mon >= 0) { 1561 if (!monc->hunting) { 1562 dout("%s hunting for new mon\n", __func__); 1563 reopen_session(monc); 1564 __schedule_delayed(monc); 1565 } else { 1566 dout("%s already hunting\n", __func__); 1567 } 1568 } 1569 mutex_unlock(&monc->mutex); 1570 } 1571 1572 /* 1573 * We can ignore refcounting on the connection struct, as all references 1574 * will come from the messenger workqueue, which is drained prior to 1575 * mon_client destruction. 1576 */ 1577 static struct ceph_connection *mon_get_con(struct ceph_connection *con) 1578 { 1579 return con; 1580 } 1581 1582 static void mon_put_con(struct ceph_connection *con) 1583 { 1584 } 1585 1586 static const struct ceph_connection_operations mon_con_ops = { 1587 .get = mon_get_con, 1588 .put = mon_put_con, 1589 .alloc_msg = mon_alloc_msg, 1590 .dispatch = mon_dispatch, 1591 .fault = mon_fault, 1592 .get_auth_request = mon_get_auth_request, 1593 .handle_auth_reply_more = mon_handle_auth_reply_more, 1594 .handle_auth_done = mon_handle_auth_done, 1595 .handle_auth_bad_method = mon_handle_auth_bad_method, 1596 }; 1597