// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/sched.h>

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/debugfs.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>

/*
 * Interact with Ceph monitor cluster.  Handle requests for new map
 * versions, and periodically resend as needed.  Also implement
 * statfs() and umount().
 *
 * A small cluster of Ceph "monitors" is responsible for managing critical
 * cluster configuration and state information.  An odd number (e.g., 3, 5)
 * of ceph-mon daemons use a modified version of the Paxos part-time
 * parliament algorithm to manage the MDS map (mds cluster membership),
 * OSD map, and list of clients who have mounted the file system.
 *
 * We maintain an open, active session with a monitor at all times in order to
 * receive timely MDSMap updates.  We periodically send a keepalive byte on the
 * TCP socket to ensure we detect a failure.  If the connection does break, we
 * randomly hunt for a new monitor.  Once the connection is reestablished, we
 * resend any outstanding requests.
 */

static const struct ceph_connection_operations mon_con_ops;

static int __validate_auth(struct ceph_mon_client *monc);

/*
 * Decode a monmap blob (e.g., during mount).
 */
static struct ceph_monmap *ceph_monmap_decode(void *p, void *end)
{
	struct ceph_monmap *m = NULL;
	int i, err = -EINVAL;
	struct ceph_fsid fsid;
	u32 epoch, num_mon;
	u32 len;

	ceph_decode_32_safe(&p, end, len, bad);
	ceph_decode_need(&p, end, len, bad);

	dout("monmap_decode %p %p len %d (%d)\n", p, end, len, (int)(end-p));
	p += sizeof(u16);  /* skip version */

	ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	epoch = ceph_decode_32(&p);

	num_mon = ceph_decode_32(&p);

	if (num_mon > CEPH_MAX_MON)
		goto bad;
	m = kmalloc(struct_size(m, mon_inst, num_mon), GFP_NOFS);
	if (m == NULL)
		return ERR_PTR(-ENOMEM);
	m->fsid = fsid;
	m->epoch = epoch;
	m->num_mon = num_mon;
	for (i = 0; i < num_mon; ++i) {
		struct ceph_entity_inst *inst = &m->mon_inst[i];

		/* copy name portion */
		ceph_decode_copy_safe(&p, end, &inst->name,
				      sizeof(inst->name), bad);
		err = ceph_decode_entity_addr(&p, end, &inst->addr);
		if (err)
			goto bad;
	}
	dout("monmap_decode epoch %d, num_mon %d\n", m->epoch,
	     m->num_mon);
	for (i = 0; i < m->num_mon; i++)
		dout("monmap_decode mon%d is %s\n", i,
		     ceph_pr_addr(&m->mon_inst[i].addr));
	return m;
bad:
	dout("monmap_decode failed with %d\n", err);
	kfree(m);
	return ERR_PTR(err);
}

/*
 * return true if *addr is included in the monmap.
 */
int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
{
	int i;

	for (i = 0; i < m->num_mon; i++)
		if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0)
			return 1;
	return 0;
}
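
/*
 * For reference, a sketch of the monmap blob layout consumed by
 * ceph_monmap_decode() above -- reconstructed from the decode calls,
 * not from a separate wire spec:
 *
 *	__le32 len;                        blob length
 *	__le16 version;                    skipped
 *	struct ceph_fsid fsid;
 *	__le32 epoch;
 *	__le32 num_mon;
 *	num_mon x {
 *		struct ceph_entity_name name;
 *		entity addr (via ceph_decode_entity_addr())
 *	}
 */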

/*
 * Send an auth request.
 */
static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
{
	monc->pending_auth = 1;
	monc->m_auth->front.iov_len = len;
	monc->m_auth->hdr.front_len = cpu_to_le32(len);
	ceph_msg_revoke(monc->m_auth);
	ceph_msg_get(monc->m_auth);  /* keep our ref */
	ceph_con_send(&monc->con, monc->m_auth);
}

/*
 * Close monitor session, if any.
 */
static void __close_session(struct ceph_mon_client *monc)
{
	dout("__close_session closing mon%d\n", monc->cur_mon);
	ceph_msg_revoke(monc->m_auth);
	ceph_msg_revoke_incoming(monc->m_auth_reply);
	ceph_msg_revoke(monc->m_subscribe);
	ceph_msg_revoke_incoming(monc->m_subscribe_ack);
	ceph_con_close(&monc->con);

	monc->pending_auth = 0;
	ceph_auth_reset(monc->auth);
}

/*
 * Pick a new monitor at random and set cur_mon.  If we are repicking
 * (i.e. cur_mon is already set), be sure to pick a different one.
 */
static void pick_new_mon(struct ceph_mon_client *monc)
{
	int old_mon = monc->cur_mon;

	BUG_ON(monc->monmap->num_mon < 1);

	if (monc->monmap->num_mon == 1) {
		monc->cur_mon = 0;
	} else {
		int max = monc->monmap->num_mon;
		int o = -1;
		int n;

		if (monc->cur_mon >= 0) {
			if (monc->cur_mon < monc->monmap->num_mon)
				o = monc->cur_mon;
			if (o >= 0)
				max--;
		}

		n = prandom_u32() % max;
		if (o >= 0 && n >= o)
			n++;

		monc->cur_mon = n;
	}

	dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
	     monc->cur_mon, monc->monmap->num_mon);
}

/*
 * Open a session with a new monitor.
 */
static void __open_session(struct ceph_mon_client *monc)
{
	int ret;

	pick_new_mon(monc);

	monc->hunting = true;
	if (monc->had_a_connection) {
		monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
		if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
			monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
	}

	monc->sub_renew_after = jiffies;  /* i.e., expired */
	monc->sub_renew_sent = 0;

	dout("%s opening mon%d\n", __func__, monc->cur_mon);
	ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
		      &monc->monmap->mon_inst[monc->cur_mon].addr);

	/*
	 * send an initial keepalive to ensure our timestamp is valid
	 * by the time we are in an OPENED state
	 */
	ceph_con_keepalive(&monc->con);

	/* initiate authentication handshake */
	ret = ceph_auth_build_hello(monc->auth,
				    monc->m_auth->front.iov_base,
				    monc->m_auth->front_alloc_len);
	BUG_ON(ret <= 0);
	__send_prepared_auth_request(monc, ret);
}

static void reopen_session(struct ceph_mon_client *monc)
{
	if (!monc->hunting)
		pr_info("mon%d %s session lost, hunting for new mon\n",
			monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr));

	__close_session(monc);
	__open_session(monc);
}

void ceph_monc_reopen_session(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	reopen_session(monc);
	mutex_unlock(&monc->mutex);
}

static void un_backoff(struct ceph_mon_client *monc)
{
	monc->hunt_mult /= 2;  /* reduce by 50% */
	if (monc->hunt_mult < 1)
		monc->hunt_mult = 1;
	dout("%s hunt_mult now %d\n", __func__, monc->hunt_mult);
}
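
/*
 * Hunting backoff in a nutshell (an illustrative sketch; the constants
 * live in <linux/ceph/mon_client.h>): each reconnect attempt made after
 * had_a_connection is set multiplies hunt_mult by CEPH_MONC_HUNT_BACKOFF,
 * capped at CEPH_MONC_HUNT_MAX_MULT, while every healthy delayed_work()
 * tick halves it again via un_backoff().  Assuming the default factor of
 * 2 and cap of 10, the retry delay used by __schedule_delayed() grows as
 *
 *	CEPH_MONC_HUNT_INTERVAL * 1, 2, 4, 8, 10, 10, ...
 *
 * for as long as the hunt continues, and decays back toward 1x once a
 * session is re-established.
 */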

/*
 * Reschedule delayed work timer.
 */
static void __schedule_delayed(struct ceph_mon_client *monc)
{
	unsigned long delay;

	if (monc->hunting)
		delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
	else
		delay = CEPH_MONC_PING_INTERVAL;

	dout("__schedule_delayed after %lu\n", delay);
	mod_delayed_work(system_wq, &monc->delayed_work,
			 round_jiffies_relative(delay));
}

const char *ceph_sub_str[] = {
	[CEPH_SUB_MONMAP] = "monmap",
	[CEPH_SUB_OSDMAP] = "osdmap",
	[CEPH_SUB_FSMAP]  = "fsmap.user",
	[CEPH_SUB_MDSMAP] = "mdsmap",
};

/*
 * Send subscribe request for one or more maps, according to
 * monc->subs.
 */
static void __send_subscribe(struct ceph_mon_client *monc)
{
	struct ceph_msg *msg = monc->m_subscribe;
	void *p = msg->front.iov_base;
	void *const end = p + msg->front_alloc_len;
	int num = 0;
	int i;

	dout("%s sent %lu\n", __func__, monc->sub_renew_sent);

	BUG_ON(monc->cur_mon < 0);

	if (!monc->sub_renew_sent)
		monc->sub_renew_sent = jiffies | 1;  /* never 0 */

	msg->hdr.version = cpu_to_le16(2);

	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
		if (monc->subs[i].want)
			num++;
	}
	BUG_ON(num < 1);  /* monmap sub is always there */
	ceph_encode_32(&p, num);
	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
		char buf[32];
		int len;

		if (!monc->subs[i].want)
			continue;

		len = sprintf(buf, "%s", ceph_sub_str[i]);
		if (i == CEPH_SUB_MDSMAP &&
		    monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
			len += sprintf(buf + len, ".%d", monc->fs_cluster_id);

		dout("%s %s start %llu flags 0x%x\n", __func__, buf,
		     le64_to_cpu(monc->subs[i].item.start),
		     monc->subs[i].item.flags);
		ceph_encode_string(&p, end, buf, len);
		memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
		p += sizeof(monc->subs[i].item);
	}

	BUG_ON(p > end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
	ceph_msg_revoke(msg);
	ceph_con_send(&monc->con, ceph_msg_get(msg));
}

static void handle_subscribe_ack(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	unsigned int seconds;
	struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

	if (msg->front.iov_len < sizeof(*h))
		goto bad;
	seconds = le32_to_cpu(h->duration);

	mutex_lock(&monc->mutex);
	if (monc->sub_renew_sent) {
		/*
		 * This is only needed for legacy (infernalis or older)
		 * MONs -- see delayed_work().
		 */
		monc->sub_renew_after = monc->sub_renew_sent +
					(seconds >> 1) * HZ - 1;
		dout("%s sent %lu duration %d renew after %lu\n", __func__,
		     monc->sub_renew_sent, seconds, monc->sub_renew_after);
		monc->sub_renew_sent = 0;
	} else {
		dout("%s sent %lu renew after %lu, ignoring\n", __func__,
		     monc->sub_renew_sent, monc->sub_renew_after);
	}
	mutex_unlock(&monc->mutex);
	return;
bad:
	pr_err("got corrupt subscribe-ack msg\n");
	ceph_msg_dump(msg);
}
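
/*
 * The payload built by __send_subscribe() is, in effect (layout inferred
 * from the encode calls above and struct ceph_mon_subscribe_item):
 *
 *	__le32 num;                        number of subscriptions
 *	num x {
 *		string name;               "osdmap", "mdsmap.<fscid>", ...
 *		__le64 start;              first epoch of interest
 *		__u8 flags;                CEPH_SUBSCRIBE_ONETIME or 0
 *	}
 */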

/*
 * Register interest in a map
 *
 * @sub: one of CEPH_SUB_*
 * @epoch: X for "every map since X", or 0 for "just the latest"
 */
static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
				 u32 epoch, bool continuous)
{
	__le64 start = cpu_to_le64(epoch);
	u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;

	dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
	     epoch, continuous);

	if (monc->subs[sub].want &&
	    monc->subs[sub].item.start == start &&
	    monc->subs[sub].item.flags == flags)
		return false;

	monc->subs[sub].item.start = start;
	monc->subs[sub].item.flags = flags;
	monc->subs[sub].want = true;

	return true;
}

bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
			bool continuous)
{
	bool need_request;

	mutex_lock(&monc->mutex);
	need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
	mutex_unlock(&monc->mutex);

	return need_request;
}
EXPORT_SYMBOL(ceph_monc_want_map);

/*
 * Keep track of which maps we have
 *
 * @sub: one of CEPH_SUB_*
 */
static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
				u32 epoch)
{
	dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);

	if (monc->subs[sub].want) {
		if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
			monc->subs[sub].want = false;
		else
			monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
	}

	monc->subs[sub].have = epoch;
}

void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
{
	mutex_lock(&monc->mutex);
	__ceph_monc_got_map(monc, sub, epoch);
	mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_got_map);

void ceph_monc_renew_subs(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__send_subscribe(monc);
	mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_renew_subs);

/*
 * Wait for an osdmap with a given epoch.
 *
 * @epoch: epoch to wait for
 * @timeout: in jiffies, 0 means "wait forever"
 */
int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
			  unsigned long timeout)
{
	unsigned long started = jiffies;
	long ret;

	mutex_lock(&monc->mutex);
	while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
		mutex_unlock(&monc->mutex);

		if (timeout && time_after_eq(jiffies, started + timeout))
			return -ETIMEDOUT;

		ret = wait_event_interruptible_timeout(monc->client->auth_wq,
				monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
				ceph_timeout_jiffies(timeout));
		if (ret < 0)
			return ret;

		mutex_lock(&monc->mutex);
	}

	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_wait_osdmap);
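
/*
 * A minimal usage sketch (hypothetical caller, roughly what the osd
 * client does): ask for every osdmap from epoch 'want' onwards and
 * block until it arrives:
 *
 *	if (ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, want, false))
 *		ceph_monc_renew_subs(monc);
 *	err = ceph_monc_wait_osdmap(monc, want, timeout);
 *
 * The map handler then calls ceph_monc_got_map() to record the epoch we
 * have, which either drops a one-time subscription or advances its start.
 */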

/*
 * Open a session with a random monitor.  Request monmap and osdmap,
 * which are waited upon in __ceph_open_session().
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
	__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
	__open_session(monc);
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);

static void ceph_monc_handle_map(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	struct ceph_client *client = monc->client;
	struct ceph_monmap *monmap;
	void *p, *end;

	mutex_lock(&monc->mutex);

	dout("handle_monmap\n");
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	monmap = ceph_monmap_decode(p, end);
	if (IS_ERR(monmap)) {
		pr_err("problem decoding monmap, %d\n",
		       (int)PTR_ERR(monmap));
		ceph_msg_dump(msg);
		goto out;
	}

	if (ceph_check_fsid(client, &monmap->fsid) < 0) {
		kfree(monmap);
		goto out;
	}

	kfree(monc->monmap);
	monc->monmap = monmap;

	__ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
	client->have_fsid = true;

out:
	mutex_unlock(&monc->mutex);
	wake_up_all(&client->auth_wq);
}

/*
 * generic requests (currently statfs, mon_get_version)
 */
DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)

static void release_generic_request(struct kref *kref)
{
	struct ceph_mon_generic_request *req =
		container_of(kref, struct ceph_mon_generic_request, kref);

	dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
	     req->reply);
	WARN_ON(!RB_EMPTY_NODE(&req->node));

	if (req->reply)
		ceph_msg_put(req->reply);
	if (req->request)
		ceph_msg_put(req->request);

	kfree(req);
}

static void put_generic_request(struct ceph_mon_generic_request *req)
{
	if (req)
		kref_put(&req->kref, release_generic_request);
}

static void get_generic_request(struct ceph_mon_generic_request *req)
{
	kref_get(&req->kref);
}

static struct ceph_mon_generic_request *
alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
{
	struct ceph_mon_generic_request *req;

	req = kzalloc(sizeof(*req), gfp);
	if (!req)
		return NULL;

	req->monc = monc;
	kref_init(&req->kref);
	RB_CLEAR_NODE(&req->node);
	init_completion(&req->completion);

	dout("%s greq %p\n", __func__, req);
	return req;
}

static void register_generic_request(struct ceph_mon_generic_request *req)
{
	struct ceph_mon_client *monc = req->monc;

	WARN_ON(req->tid);

	get_generic_request(req);
	req->tid = ++monc->last_tid;
	insert_generic_request(&monc->generic_request_tree, req);
}

static void send_generic_request(struct ceph_mon_client *monc,
				 struct ceph_mon_generic_request *req)
{
	WARN_ON(!req->tid);

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
	req->request->hdr.tid = cpu_to_le64(req->tid);
	ceph_con_send(&monc->con, ceph_msg_get(req->request));
}

static void __finish_generic_request(struct ceph_mon_generic_request *req)
{
	struct ceph_mon_client *monc = req->monc;

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
	erase_generic_request(&monc->generic_request_tree, req);

	ceph_msg_revoke(req->request);
	ceph_msg_revoke_incoming(req->reply);
}

static void finish_generic_request(struct ceph_mon_generic_request *req)
{
	__finish_generic_request(req);
	put_generic_request(req);
}

static void complete_generic_request(struct ceph_mon_generic_request *req)
{
	if (req->complete_cb)
		req->complete_cb(req);
	else
		complete_all(&req->completion);
	put_generic_request(req);
}

static void cancel_generic_request(struct ceph_mon_generic_request *req)
{
	struct ceph_mon_client *monc = req->monc;
	struct ceph_mon_generic_request *lookup_req;

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);

	mutex_lock(&monc->mutex);
	lookup_req = lookup_generic_request(&monc->generic_request_tree,
					    req->tid);
	if (lookup_req) {
		WARN_ON(lookup_req != req);
		finish_generic_request(req);
	}

	mutex_unlock(&monc->mutex);
}

static int wait_generic_request(struct ceph_mon_generic_request *req)
{
	int ret;

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
	ret = wait_for_completion_interruptible(&req->completion);
	if (ret)
		cancel_generic_request(req);
	else
		ret = req->result;  /* completed */

	return ret;
}

static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
					  struct ceph_msg_header *hdr,
					  int *skip)
{
	struct ceph_mon_client *monc = con->private;
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(hdr->tid);
	struct ceph_msg *m;

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, tid);
	if (!req) {
		dout("get_generic_reply %lld dne\n", tid);
		*skip = 1;
		m = NULL;
	} else {
		dout("get_generic_reply %lld got %p\n", tid, req->reply);
		*skip = 0;
		m = ceph_msg_get(req->reply);
		/*
		 * we don't need to track the connection reading into
		 * this reply because we only have one open connection
		 * at a time, ever.
		 */
	}
	mutex_unlock(&monc->mutex);
	return m;
}

/*
 * statfs
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
				struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	if (msg->front.iov_len != sizeof(*reply))
		goto bad;

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, tid);
	if (!req) {
		mutex_unlock(&monc->mutex);
		return;
	}

	req->result = 0;
	*req->u.st = reply->st;  /* struct */
	__finish_generic_request(req);
	mutex_unlock(&monc->mutex);

	complete_generic_request(req);
	return;

bad:
	pr_err("corrupt statfs reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}
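
/*
 * Generic request lifecycle, summarized (derived from the helpers above):
 *
 *	req = alloc_generic_request(monc, ...);    kref == 1 (caller's ref)
 *	register_generic_request(req);             kref == 2 (tree's ref),
 *	                                           tid assigned
 *	send_generic_request(monc, req);
 *	...reply handler: __finish_generic_request() unlinks from the tree,
 *	   complete_generic_request() wakes the waiter (or runs complete_cb)
 *	   and drops the tree's ref...
 *	ret = wait_generic_request(req);           cancels on interruption
 *	put_generic_request(req);                  drops the caller's ref
 */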

/*
 * Do a synchronous statfs().
 */
int ceph_monc_do_statfs(struct ceph_mon_client *monc, u64 data_pool,
			struct ceph_statfs *buf)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs *h;
	int ret = -ENOMEM;

	req = alloc_generic_request(monc, GFP_NOFS);
	if (!req)
		goto out;

	req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
				    true);
	if (!req->request)
		goto out;

	req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
	if (!req->reply)
		goto out;

	req->u.st = buf;
	req->request->hdr.version = cpu_to_le16(2);

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	/* fill out request */
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;
	h->contains_data_pool = (data_pool != CEPH_NOPOOL);
	h->data_pool = cpu_to_le64(data_pool);
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	ret = wait_generic_request(req);
out:
	put_generic_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_do_statfs);

static void handle_get_version_reply(struct ceph_mon_client *monc,
				     struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	void *p = msg->front.iov_base;
	void *end = p + msg->front_alloc_len;
	u64 handle;

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	ceph_decode_need(&p, end, 2*sizeof(u64), bad);
	handle = ceph_decode_64(&p);
	if (tid != 0 && tid != handle)
		goto bad;

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, handle);
	if (!req) {
		mutex_unlock(&monc->mutex);
		return;
	}

	req->result = 0;
	req->u.newest = ceph_decode_64(&p);
	__finish_generic_request(req);
	mutex_unlock(&monc->mutex);

	complete_generic_request(req);
	return;

bad:
	pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

static struct ceph_mon_generic_request *
__ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
			ceph_monc_callback_t cb, u64 private_data)
{
	struct ceph_mon_generic_request *req;

	req = alloc_generic_request(monc, GFP_NOIO);
	if (!req)
		goto err_put_req;

	req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
				    sizeof(u64) + sizeof(u32) + strlen(what),
				    GFP_NOIO, true);
	if (!req->request)
		goto err_put_req;

	req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
				  true);
	if (!req->reply)
		goto err_put_req;

	req->complete_cb = cb;
	req->private_data = private_data;

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	{
		void *p = req->request->front.iov_base;
		void *const end = p + req->request->front_alloc_len;

		ceph_encode_64(&p, req->tid);  /* handle */
		ceph_encode_string(&p, end, what, strlen(what));
		WARN_ON(p != end);
	}
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	return req;

err_put_req:
	put_generic_request(req);
	return ERR_PTR(-ENOMEM);
}

/*
 * Send MMonGetVersion and wait for the reply.
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 */
int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
			  u64 *newest)
{
	struct ceph_mon_generic_request *req;
	int ret;

	req = __ceph_monc_get_version(monc, what, NULL, 0);
	if (IS_ERR(req))
		return PTR_ERR(req);

	ret = wait_generic_request(req);
	if (!ret)
		*newest = req->u.newest;

	put_generic_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_get_version);

/*
 * Send MMonGetVersion; the reply is delivered to @cb.
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 */
int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
				ceph_monc_callback_t cb, u64 private_data)
{
	struct ceph_mon_generic_request *req;

	req = __ceph_monc_get_version(monc, what, cb, private_data);
	if (IS_ERR(req))
		return PTR_ERR(req);

	put_generic_request(req);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_get_version_async);

static void handle_command_ack(struct ceph_mon_client *monc,
			       struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	void *p = msg->front.iov_base;
	void *const end = p + msg->front_alloc_len;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
			 sizeof(u32), bad);
	p += sizeof(struct ceph_mon_request_header);

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, tid);
	if (!req) {
		mutex_unlock(&monc->mutex);
		return;
	}

	req->result = ceph_decode_32(&p);
	__finish_generic_request(req);
	mutex_unlock(&monc->mutex);

	complete_generic_request(req);
	return;

bad:
	pr_err("corrupt mon_command ack, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

static __printf(2, 0)
int do_mon_command_vargs(struct ceph_mon_client *monc, const char *fmt,
			 va_list ap)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_command *h;
	int ret = -ENOMEM;
	int len;

	req = alloc_generic_request(monc, GFP_NOIO);
	if (!req)
		goto out;

	req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
	if (!req->request)
		goto out;

	req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
				  true);
	if (!req->reply)
		goto out;

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;
	h->num_strs = cpu_to_le32(1);
	len = vsprintf(h->str, fmt, ap);
	h->str_len = cpu_to_le32(len);
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	ret = wait_generic_request(req);
out:
	put_generic_request(req);
	return ret;
}
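
/*
 * Note on sizing (an observation, not original commentary): the command
 * string is formatted with vsprintf() straight into the 256-byte front
 * buffer allocated for CEPH_MSG_MON_COMMAND above, with no bounds check,
 * so callers must keep their JSON comfortably under that limit.
 */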

static __printf(2, 3)
int do_mon_command(struct ceph_mon_client *monc, const char *fmt, ...)
{
	va_list ap;
	int ret;

	va_start(ap, fmt);
	ret = do_mon_command_vargs(monc, fmt, ap);
	va_end(ap);
	return ret;
}

int ceph_monc_blocklist_add(struct ceph_mon_client *monc,
			    struct ceph_entity_addr *client_addr)
{
	int ret;

	ret = do_mon_command(monc,
			     "{ \"prefix\": \"osd blocklist\", \
				\"blocklistop\": \"add\", \
				\"addr\": \"%pISpc/%u\" }",
			     &client_addr->in_addr,
			     le32_to_cpu(client_addr->nonce));
	if (ret == -EINVAL) {
		/*
		 * The monitor returns EINVAL on an unrecognized command.
		 * Try the legacy command -- it is exactly the same except
		 * for the name.
		 */
		ret = do_mon_command(monc,
				     "{ \"prefix\": \"osd blacklist\", \
					\"blacklistop\": \"add\", \
					\"addr\": \"%pISpc/%u\" }",
				     &client_addr->in_addr,
				     le32_to_cpu(client_addr->nonce));
	}
	if (ret)
		return ret;

	/*
	 * Make sure we have the osdmap that includes the blocklist
	 * entry.  This is needed to ensure that the OSDs pick up the
	 * new blocklist before processing any future requests from
	 * this client.
	 */
	return ceph_wait_for_latest_osdmap(monc->client, 0);
}
EXPORT_SYMBOL(ceph_monc_blocklist_add);

/*
 * Resend pending generic requests.
 */
static void __resend_generic_request(struct ceph_mon_client *monc)
{
	struct ceph_mon_generic_request *req;
	struct rb_node *p;

	for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_mon_generic_request, node);
		ceph_msg_revoke(req->request);
		ceph_msg_revoke_incoming(req->reply);
		ceph_con_send(&monc->con, ceph_msg_get(req->request));
	}
}

/*
 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
 * renew/retry subscription as needed (in case it is timing out, or we
 * got an ENOMEM).  And keep the monitor connection alive.
 */
static void delayed_work(struct work_struct *work)
{
	struct ceph_mon_client *monc =
		container_of(work, struct ceph_mon_client, delayed_work.work);

	dout("monc delayed_work\n");
	mutex_lock(&monc->mutex);
	if (monc->hunting) {
		dout("%s continuing hunt\n", __func__);
		reopen_session(monc);
	} else {
		int is_auth = ceph_auth_is_authenticated(monc->auth);

		if (ceph_con_keepalive_expired(&monc->con,
					       CEPH_MONC_PING_TIMEOUT)) {
			dout("monc keepalive timeout\n");
			is_auth = 0;
			reopen_session(monc);
		}

		if (!monc->hunting) {
			ceph_con_keepalive(&monc->con);
			__validate_auth(monc);
			un_backoff(monc);
		}

		if (is_auth &&
		    !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
			unsigned long now = jiffies;

			dout("%s renew subs? now %lu renew after %lu\n",
			     __func__, now, monc->sub_renew_after);
			if (time_after_eq(now, monc->sub_renew_after))
				__send_subscribe(monc);
		}
	}
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
}
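
/*
 * Timing sketch (assuming the defaults in <linux/ceph/mon_client.h>,
 * i.e. a CEPH_MONC_PING_INTERVAL of 10s and CEPH_MONC_PING_TIMEOUT of
 * 30s): outside of hunting, delayed_work() fires every ping interval,
 * sends a keepalive and checks ceph_con_keepalive_expired(); a monitor
 * that has been silent for the full timeout is abandoned and a new one
 * is hunted within roughly 30-40 seconds of the failure.
 */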

/*
 * On startup, we build a temporary monmap populated with the IPs
 * provided by mount(2).
 */
static int build_initial_monmap(struct ceph_mon_client *monc)
{
	struct ceph_options *opt = monc->client->options;
	struct ceph_entity_addr *mon_addr = opt->mon_addr;
	int num_mon = opt->num_mon;
	int i;

	/* build initial monmap */
	monc->monmap = kzalloc(struct_size(monc->monmap, mon_inst, num_mon),
			       GFP_KERNEL);
	if (!monc->monmap)
		return -ENOMEM;
	for (i = 0; i < num_mon; i++) {
		monc->monmap->mon_inst[i].addr = mon_addr[i];
		monc->monmap->mon_inst[i].addr.nonce = 0;
		monc->monmap->mon_inst[i].name.type =
			CEPH_ENTITY_TYPE_MON;
		monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
	}
	monc->monmap->num_mon = num_mon;
	return 0;
}

int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
	int err = 0;

	dout("init\n");
	memset(monc, 0, sizeof(*monc));
	monc->client = cl;
	monc->monmap = NULL;
	mutex_init(&monc->mutex);

	err = build_initial_monmap(monc);
	if (err)
		goto out;

	/* connection */
	/* authentication */
	monc->auth = ceph_auth_init(cl->options->name,
				    cl->options->key);
	if (IS_ERR(monc->auth)) {
		err = PTR_ERR(monc->auth);
		goto out_monmap;
	}
	monc->auth->want_keys =
		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
		CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;

	/* msgs */
	err = -ENOMEM;
	monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
				     sizeof(struct ceph_mon_subscribe_ack),
				     GFP_KERNEL, true);
	if (!monc->m_subscribe_ack)
		goto out_auth;

	monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
					 GFP_KERNEL, true);
	if (!monc->m_subscribe)
		goto out_subscribe_ack;

	monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
					  GFP_KERNEL, true);
	if (!monc->m_auth_reply)
		goto out_subscribe;

	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
	monc->pending_auth = 0;
	if (!monc->m_auth)
		goto out_auth_reply;

	ceph_con_init(&monc->con, monc, &mon_con_ops,
		      &monc->client->msgr);

	monc->cur_mon = -1;
	monc->had_a_connection = false;
	monc->hunt_mult = 1;

	INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
	monc->generic_request_tree = RB_ROOT;
	monc->last_tid = 0;

	monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;

	return 0;

out_auth_reply:
	ceph_msg_put(monc->m_auth_reply);
out_subscribe:
	ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
	ceph_msg_put(monc->m_subscribe_ack);
out_auth:
	ceph_auth_destroy(monc->auth);
out_monmap:
	kfree(monc->monmap);
out:
	return err;
}
EXPORT_SYMBOL(ceph_monc_init);
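
/*
 * A minimal init/teardown sketch (hypothetical caller; in-tree users go
 * through ceph_create_client() and ceph_destroy_client() instead):
 *
 *	err = ceph_monc_init(&client->monc, client);
 *	if (!err)
 *		err = ceph_monc_open_session(&client->monc);
 *	...
 *	ceph_monc_stop(&client->monc);
 */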

void ceph_monc_stop(struct ceph_mon_client *monc)
{
	dout("stop\n");
	cancel_delayed_work_sync(&monc->delayed_work);

	mutex_lock(&monc->mutex);
	__close_session(monc);
	monc->cur_mon = -1;
	mutex_unlock(&monc->mutex);

	/*
	 * flush msgr queue before we destroy ourselves to ensure that:
	 *  - any work that references our embedded con is finished.
	 *  - any osd_client or other work that may reference an authorizer
	 *    finishes before we shut down the auth subsystem.
	 */
	ceph_msgr_flush();

	ceph_auth_destroy(monc->auth);

	WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));

	ceph_msg_put(monc->m_auth);
	ceph_msg_put(monc->m_auth_reply);
	ceph_msg_put(monc->m_subscribe);
	ceph_msg_put(monc->m_subscribe_ack);

	kfree(monc->monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);

static void finish_hunting(struct ceph_mon_client *monc)
{
	if (monc->hunting) {
		dout("%s found mon%d\n", __func__, monc->cur_mon);
		monc->hunting = false;
		monc->had_a_connection = true;
		un_backoff(monc);
		__schedule_delayed(monc);
	}
}

static void handle_auth_reply(struct ceph_mon_client *monc,
			      struct ceph_msg *msg)
{
	int ret;
	int was_auth = 0;

	mutex_lock(&monc->mutex);
	was_auth = ceph_auth_is_authenticated(monc->auth);
	monc->pending_auth = 0;
	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
				     msg->front.iov_len,
				     monc->m_auth->front.iov_base,
				     monc->m_auth->front_alloc_len);
	if (ret > 0) {
		__send_prepared_auth_request(monc, ret);
		goto out;
	}

	finish_hunting(monc);

	if (ret < 0) {
		monc->client->auth_err = ret;
	} else if (!was_auth && ceph_auth_is_authenticated(monc->auth)) {
		dout("authenticated, starting session\n");

		monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
		monc->client->msgr.inst.name.num =
					cpu_to_le64(monc->auth->global_id);

		__send_subscribe(monc);
		__resend_generic_request(monc);

		pr_info("mon%d %s session established\n", monc->cur_mon,
			ceph_pr_addr(&monc->con.peer_addr));
	}

out:
	mutex_unlock(&monc->mutex);
	if (monc->client->auth_err < 0)
		wake_up_all(&monc->client->auth_wq);
}

static int __validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	if (monc->pending_auth)
		return 0;

	ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
			      monc->m_auth->front_alloc_len);
	if (ret <= 0)
		return ret;  /* either an error, or no need to authenticate */
	__send_prepared_auth_request(monc, ret);
	return 0;
}

int ceph_monc_validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	mutex_lock(&monc->mutex);
	ret = __validate_auth(monc);
	mutex_unlock(&monc->mutex);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_validate_auth);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(msg->hdr.type);

	switch (type) {
	case CEPH_MSG_AUTH_REPLY:
		handle_auth_reply(monc, msg);
		break;

	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		handle_subscribe_ack(monc, msg);
		break;

	case CEPH_MSG_STATFS_REPLY:
		handle_statfs_reply(monc, msg);
		break;

	case CEPH_MSG_MON_GET_VERSION_REPLY:
		handle_get_version_reply(monc, msg);
		break;

	case CEPH_MSG_MON_COMMAND_ACK:
		handle_command_ack(monc, msg);
		break;

	case CEPH_MSG_MON_MAP:
		ceph_monc_handle_map(monc, msg);
		break;

	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(&monc->client->osdc, msg);
		break;

	default:
		/* can the chained handler handle it? */
		if (monc->client->extra_mon_dispatch &&
		    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
			break;

		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
	ceph_msg_put(msg);
}

/*
 * Allocate memory for incoming message
 */
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
				      struct ceph_msg_header *hdr,
				      int *skip)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(hdr->type);
	int front_len = le32_to_cpu(hdr->front_len);
	struct ceph_msg *m = NULL;

	*skip = 0;

	switch (type) {
	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		m = ceph_msg_get(monc->m_subscribe_ack);
		break;
	case CEPH_MSG_STATFS_REPLY:
	case CEPH_MSG_MON_COMMAND_ACK:
		return get_generic_reply(con, hdr, skip);
	case CEPH_MSG_AUTH_REPLY:
		m = ceph_msg_get(monc->m_auth_reply);
		break;
	case CEPH_MSG_MON_GET_VERSION_REPLY:
		if (le64_to_cpu(hdr->tid) != 0)
			return get_generic_reply(con, hdr, skip);

		/*
		 * Older OSDs don't set reply tid even if the original
		 * request had a non-zero tid.  Work around this weirdness
		 * by allocating a new message.
		 */
		fallthrough;
	case CEPH_MSG_MON_MAP:
	case CEPH_MSG_MDS_MAP:
	case CEPH_MSG_OSD_MAP:
	case CEPH_MSG_FS_MAP_USER:
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
		if (!m)
			return NULL;	/* ENOMEM--return skip == 0 */
		break;
	}

	if (!m) {
		pr_info("alloc_msg unknown type %d\n", type);
		*skip = 1;
	} else if (front_len > m->front_alloc_len) {
		pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
			front_len, m->front_alloc_len,
			(unsigned int)con->peer_name.type,
			le64_to_cpu(con->peer_name.num));
		ceph_msg_put(m);
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
	}

	return m;
}

/*
 * If the monitor connection resets, pick a new monitor and resubmit
 * any pending requests.
 */
static void mon_fault(struct ceph_connection *con)
{
	struct ceph_mon_client *monc = con->private;

	mutex_lock(&monc->mutex);
	dout("%s mon%d\n", __func__, monc->cur_mon);
	if (monc->cur_mon >= 0) {
		if (!monc->hunting) {
			dout("%s hunting for new mon\n", __func__);
			reopen_session(monc);
			__schedule_delayed(monc);
		} else {
			dout("%s already hunting\n", __func__);
		}
	}
	mutex_unlock(&monc->mutex);
}

/*
 * We can ignore refcounting on the connection struct, as all references
 * will come from the messenger workqueue, which is drained prior to
 * mon_client destruction.
 */
static struct ceph_connection *con_get(struct ceph_connection *con)
{
	return con;
}

static void con_put(struct ceph_connection *con)
{
}

static const struct ceph_connection_operations mon_con_ops = {
	.get = con_get,
	.put = con_put,
	.dispatch = dispatch,
	.fault = mon_fault,
	.alloc_msg = mon_alloc_msg,
};