1 // SPDX-License-Identifier: GPL-2.0 2 #include <linux/ceph/ceph_debug.h> 3 4 #include <linux/bvec.h> 5 #include <linux/crc32c.h> 6 #include <linux/net.h> 7 #include <linux/socket.h> 8 #include <net/sock.h> 9 10 #include <linux/ceph/ceph_features.h> 11 #include <linux/ceph/decode.h> 12 #include <linux/ceph/libceph.h> 13 #include <linux/ceph/messenger.h> 14 15 /* static tag bytes (protocol control messages) */ 16 static char tag_msg = CEPH_MSGR_TAG_MSG; 17 static char tag_ack = CEPH_MSGR_TAG_ACK; 18 static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE; 19 static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2; 20 21 /* 22 * If @buf is NULL, discard up to @len bytes. 23 */ 24 static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) 25 { 26 struct kvec iov = {buf, len}; 27 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; 28 int r; 29 30 if (!buf) 31 msg.msg_flags |= MSG_TRUNC; 32 33 iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, len); 34 r = sock_recvmsg(sock, &msg, msg.msg_flags); 35 if (r == -EAGAIN) 36 r = 0; 37 return r; 38 } 39 40 static int ceph_tcp_recvpage(struct socket *sock, struct page *page, 41 int page_offset, size_t length) 42 { 43 struct bio_vec bvec = { 44 .bv_page = page, 45 .bv_offset = page_offset, 46 .bv_len = length 47 }; 48 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; 49 int r; 50 51 BUG_ON(page_offset + length > PAGE_SIZE); 52 iov_iter_bvec(&msg.msg_iter, READ, &bvec, 1, length); 53 r = sock_recvmsg(sock, &msg, msg.msg_flags); 54 if (r == -EAGAIN) 55 r = 0; 56 return r; 57 } 58 59 /* 60 * write something. @more is true if caller will be sending more data 61 * shortly. 62 */ 63 static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, 64 size_t kvlen, size_t len, bool more) 65 { 66 struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; 67 int r; 68 69 if (more) 70 msg.msg_flags |= MSG_MORE; 71 else 72 msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */ 73 74 r = kernel_sendmsg(sock, &msg, iov, kvlen, len); 75 if (r == -EAGAIN) 76 r = 0; 77 return r; 78 } 79 80 /* 81 * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST 82 */ 83 static int ceph_tcp_sendpage(struct socket *sock, struct page *page, 84 int offset, size_t size, int more) 85 { 86 ssize_t (*sendpage)(struct socket *sock, struct page *page, 87 int offset, size_t size, int flags); 88 int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more; 89 int ret; 90 91 /* 92 * sendpage cannot properly handle pages with page_count == 0, 93 * we need to fall back to sendmsg if that's the case. 94 * 95 * Same goes for slab pages: skb_can_coalesce() allows 96 * coalescing neighboring slab objects into a single frag which 97 * triggers one of hardened usercopy checks. 98 */ 99 if (sendpage_ok(page)) 100 sendpage = sock->ops->sendpage; 101 else 102 sendpage = sock_no_sendpage; 103 104 ret = sendpage(sock, page, offset, size, flags); 105 if (ret == -EAGAIN) 106 ret = 0; 107 108 return ret; 109 } 110 111 static void con_out_kvec_reset(struct ceph_connection *con) 112 { 113 BUG_ON(con->out_skip); 114 115 con->out_kvec_left = 0; 116 con->out_kvec_bytes = 0; 117 con->out_kvec_cur = &con->out_kvec[0]; 118 } 119 120 static void con_out_kvec_add(struct ceph_connection *con, 121 size_t size, void *data) 122 { 123 int index = con->out_kvec_left; 124 125 BUG_ON(con->out_skip); 126 BUG_ON(index >= ARRAY_SIZE(con->out_kvec)); 127 128 con->out_kvec[index].iov_len = size; 129 con->out_kvec[index].iov_base = data; 130 con->out_kvec_left++; 131 con->out_kvec_bytes += size; 132 } 133 134 /* 135 * Chop off a kvec from the end. Return residual number of bytes for 136 * that kvec, i.e. how many bytes would have been written if the kvec 137 * hadn't been nuked. 138 */ 139 static int con_out_kvec_skip(struct ceph_connection *con) 140 { 141 int off = con->out_kvec_cur - con->out_kvec; 142 int skip = 0; 143 144 if (con->out_kvec_bytes > 0) { 145 skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len; 146 BUG_ON(con->out_kvec_bytes < skip); 147 BUG_ON(!con->out_kvec_left); 148 con->out_kvec_bytes -= skip; 149 con->out_kvec_left--; 150 } 151 152 return skip; 153 } 154 155 static size_t sizeof_footer(struct ceph_connection *con) 156 { 157 return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ? 158 sizeof(struct ceph_msg_footer) : 159 sizeof(struct ceph_msg_footer_old); 160 } 161 162 static void prepare_message_data(struct ceph_msg *msg, u32 data_len) 163 { 164 /* Initialize data cursor */ 165 166 ceph_msg_data_cursor_init(&msg->cursor, msg, data_len); 167 } 168 169 /* 170 * Prepare footer for currently outgoing message, and finish things 171 * off. Assumes out_kvec* are already valid.. we just add on to the end. 172 */ 173 static void prepare_write_message_footer(struct ceph_connection *con) 174 { 175 struct ceph_msg *m = con->out_msg; 176 177 m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE; 178 179 dout("prepare_write_message_footer %p\n", con); 180 con_out_kvec_add(con, sizeof_footer(con), &m->footer); 181 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { 182 if (con->ops->sign_message) 183 con->ops->sign_message(m); 184 else 185 m->footer.sig = 0; 186 } else { 187 m->old_footer.flags = m->footer.flags; 188 } 189 con->out_more = m->more_to_follow; 190 con->out_msg_done = true; 191 } 192 193 /* 194 * Prepare headers for the next outgoing message. 195 */ 196 static void prepare_write_message(struct ceph_connection *con) 197 { 198 struct ceph_msg *m; 199 u32 crc; 200 201 con_out_kvec_reset(con); 202 con->out_msg_done = false; 203 204 /* Sneak an ack in there first? If we can get it into the same 205 * TCP packet that's a good thing. */ 206 if (con->in_seq > con->in_seq_acked) { 207 con->in_seq_acked = con->in_seq; 208 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); 209 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 210 con_out_kvec_add(con, sizeof (con->out_temp_ack), 211 &con->out_temp_ack); 212 } 213 214 ceph_con_get_out_msg(con); 215 m = con->out_msg; 216 217 dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n", 218 m, con->out_seq, le16_to_cpu(m->hdr.type), 219 le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len), 220 m->data_length); 221 WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len)); 222 WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len)); 223 224 /* tag + hdr + front + middle */ 225 con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); 226 con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr); 227 con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); 228 229 if (m->middle) 230 con_out_kvec_add(con, m->middle->vec.iov_len, 231 m->middle->vec.iov_base); 232 233 /* fill in hdr crc and finalize hdr */ 234 crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); 235 con->out_msg->hdr.crc = cpu_to_le32(crc); 236 memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr)); 237 238 /* fill in front and middle crc, footer */ 239 crc = crc32c(0, m->front.iov_base, m->front.iov_len); 240 con->out_msg->footer.front_crc = cpu_to_le32(crc); 241 if (m->middle) { 242 crc = crc32c(0, m->middle->vec.iov_base, 243 m->middle->vec.iov_len); 244 con->out_msg->footer.middle_crc = cpu_to_le32(crc); 245 } else 246 con->out_msg->footer.middle_crc = 0; 247 dout("%s front_crc %u middle_crc %u\n", __func__, 248 le32_to_cpu(con->out_msg->footer.front_crc), 249 le32_to_cpu(con->out_msg->footer.middle_crc)); 250 con->out_msg->footer.flags = 0; 251 252 /* is there a data payload? */ 253 con->out_msg->footer.data_crc = 0; 254 if (m->data_length) { 255 prepare_message_data(con->out_msg, m->data_length); 256 con->out_more = 1; /* data + footer will follow */ 257 } else { 258 /* no, queue up footer too and be done */ 259 prepare_write_message_footer(con); 260 } 261 262 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); 263 } 264 265 /* 266 * Prepare an ack. 267 */ 268 static void prepare_write_ack(struct ceph_connection *con) 269 { 270 dout("prepare_write_ack %p %llu -> %llu\n", con, 271 con->in_seq_acked, con->in_seq); 272 con->in_seq_acked = con->in_seq; 273 274 con_out_kvec_reset(con); 275 276 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); 277 278 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 279 con_out_kvec_add(con, sizeof (con->out_temp_ack), 280 &con->out_temp_ack); 281 282 con->out_more = 1; /* more will follow.. eventually.. */ 283 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); 284 } 285 286 /* 287 * Prepare to share the seq during handshake 288 */ 289 static void prepare_write_seq(struct ceph_connection *con) 290 { 291 dout("prepare_write_seq %p %llu -> %llu\n", con, 292 con->in_seq_acked, con->in_seq); 293 con->in_seq_acked = con->in_seq; 294 295 con_out_kvec_reset(con); 296 297 con->out_temp_ack = cpu_to_le64(con->in_seq_acked); 298 con_out_kvec_add(con, sizeof (con->out_temp_ack), 299 &con->out_temp_ack); 300 301 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); 302 } 303 304 /* 305 * Prepare to write keepalive byte. 306 */ 307 static void prepare_write_keepalive(struct ceph_connection *con) 308 { 309 dout("prepare_write_keepalive %p\n", con); 310 con_out_kvec_reset(con); 311 if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) { 312 struct timespec64 now; 313 314 ktime_get_real_ts64(&now); 315 con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2); 316 ceph_encode_timespec64(&con->out_temp_keepalive2, &now); 317 con_out_kvec_add(con, sizeof(con->out_temp_keepalive2), 318 &con->out_temp_keepalive2); 319 } else { 320 con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); 321 } 322 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); 323 } 324 325 /* 326 * Connection negotiation. 327 */ 328 329 static int get_connect_authorizer(struct ceph_connection *con) 330 { 331 struct ceph_auth_handshake *auth; 332 int auth_proto; 333 334 if (!con->ops->get_authorizer) { 335 con->auth = NULL; 336 con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; 337 con->out_connect.authorizer_len = 0; 338 return 0; 339 } 340 341 auth = con->ops->get_authorizer(con, &auth_proto, con->auth_retry); 342 if (IS_ERR(auth)) 343 return PTR_ERR(auth); 344 345 con->auth = auth; 346 con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto); 347 con->out_connect.authorizer_len = cpu_to_le32(auth->authorizer_buf_len); 348 return 0; 349 } 350 351 /* 352 * We connected to a peer and are saying hello. 353 */ 354 static void prepare_write_banner(struct ceph_connection *con) 355 { 356 con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); 357 con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), 358 &con->msgr->my_enc_addr); 359 360 con->out_more = 0; 361 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); 362 } 363 364 static void __prepare_write_connect(struct ceph_connection *con) 365 { 366 con_out_kvec_add(con, sizeof(con->out_connect), &con->out_connect); 367 if (con->auth) 368 con_out_kvec_add(con, con->auth->authorizer_buf_len, 369 con->auth->authorizer_buf); 370 371 con->out_more = 0; 372 ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); 373 } 374 375 static int prepare_write_connect(struct ceph_connection *con) 376 { 377 unsigned int global_seq = ceph_get_global_seq(con->msgr, 0); 378 int proto; 379 int ret; 380 381 switch (con->peer_name.type) { 382 case CEPH_ENTITY_TYPE_MON: 383 proto = CEPH_MONC_PROTOCOL; 384 break; 385 case CEPH_ENTITY_TYPE_OSD: 386 proto = CEPH_OSDC_PROTOCOL; 387 break; 388 case CEPH_ENTITY_TYPE_MDS: 389 proto = CEPH_MDSC_PROTOCOL; 390 break; 391 default: 392 BUG(); 393 } 394 395 dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, 396 con->connect_seq, global_seq, proto); 397 398 con->out_connect.features = 399 cpu_to_le64(from_msgr(con->msgr)->supported_features); 400 con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); 401 con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); 402 con->out_connect.global_seq = cpu_to_le32(global_seq); 403 con->out_connect.protocol_version = cpu_to_le32(proto); 404 con->out_connect.flags = 0; 405 406 ret = get_connect_authorizer(con); 407 if (ret) 408 return ret; 409 410 __prepare_write_connect(con); 411 return 0; 412 } 413 414 /* 415 * write as much of pending kvecs to the socket as we can. 416 * 1 -> done 417 * 0 -> socket full, but more to do 418 * <0 -> error 419 */ 420 static int write_partial_kvec(struct ceph_connection *con) 421 { 422 int ret; 423 424 dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes); 425 while (con->out_kvec_bytes > 0) { 426 ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, 427 con->out_kvec_left, con->out_kvec_bytes, 428 con->out_more); 429 if (ret <= 0) 430 goto out; 431 con->out_kvec_bytes -= ret; 432 if (con->out_kvec_bytes == 0) 433 break; /* done */ 434 435 /* account for full iov entries consumed */ 436 while (ret >= con->out_kvec_cur->iov_len) { 437 BUG_ON(!con->out_kvec_left); 438 ret -= con->out_kvec_cur->iov_len; 439 con->out_kvec_cur++; 440 con->out_kvec_left--; 441 } 442 /* and for a partially-consumed entry */ 443 if (ret) { 444 con->out_kvec_cur->iov_len -= ret; 445 con->out_kvec_cur->iov_base += ret; 446 } 447 } 448 con->out_kvec_left = 0; 449 ret = 1; 450 out: 451 dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, 452 con->out_kvec_bytes, con->out_kvec_left, ret); 453 return ret; /* done! */ 454 } 455 456 /* 457 * Write as much message data payload as we can. If we finish, queue 458 * up the footer. 459 * 1 -> done, footer is now queued in out_kvec[]. 460 * 0 -> socket full, but more to do 461 * <0 -> error 462 */ 463 static int write_partial_message_data(struct ceph_connection *con) 464 { 465 struct ceph_msg *msg = con->out_msg; 466 struct ceph_msg_data_cursor *cursor = &msg->cursor; 467 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); 468 int more = MSG_MORE | MSG_SENDPAGE_NOTLAST; 469 u32 crc; 470 471 dout("%s %p msg %p\n", __func__, con, msg); 472 473 if (!msg->num_data_items) 474 return -EINVAL; 475 476 /* 477 * Iterate through each page that contains data to be 478 * written, and send as much as possible for each. 479 * 480 * If we are calculating the data crc (the default), we will 481 * need to map the page. If we have no pages, they have 482 * been revoked, so use the zero page. 483 */ 484 crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0; 485 while (cursor->total_resid) { 486 struct page *page; 487 size_t page_offset; 488 size_t length; 489 int ret; 490 491 if (!cursor->resid) { 492 ceph_msg_data_advance(cursor, 0); 493 continue; 494 } 495 496 page = ceph_msg_data_next(cursor, &page_offset, &length, NULL); 497 if (length == cursor->total_resid) 498 more = MSG_MORE; 499 ret = ceph_tcp_sendpage(con->sock, page, page_offset, length, 500 more); 501 if (ret <= 0) { 502 if (do_datacrc) 503 msg->footer.data_crc = cpu_to_le32(crc); 504 505 return ret; 506 } 507 if (do_datacrc && cursor->need_crc) 508 crc = ceph_crc32c_page(crc, page, page_offset, length); 509 ceph_msg_data_advance(cursor, (size_t)ret); 510 } 511 512 dout("%s %p msg %p done\n", __func__, con, msg); 513 514 /* prepare and queue up footer, too */ 515 if (do_datacrc) 516 msg->footer.data_crc = cpu_to_le32(crc); 517 else 518 msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; 519 con_out_kvec_reset(con); 520 prepare_write_message_footer(con); 521 522 return 1; /* must return > 0 to indicate success */ 523 } 524 525 /* 526 * write some zeros 527 */ 528 static int write_partial_skip(struct ceph_connection *con) 529 { 530 int more = MSG_MORE | MSG_SENDPAGE_NOTLAST; 531 int ret; 532 533 dout("%s %p %d left\n", __func__, con, con->out_skip); 534 while (con->out_skip > 0) { 535 size_t size = min(con->out_skip, (int) PAGE_SIZE); 536 537 if (size == con->out_skip) 538 more = MSG_MORE; 539 ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size, 540 more); 541 if (ret <= 0) 542 goto out; 543 con->out_skip -= ret; 544 } 545 ret = 1; 546 out: 547 return ret; 548 } 549 550 /* 551 * Prepare to read connection handshake, or an ack. 552 */ 553 static void prepare_read_banner(struct ceph_connection *con) 554 { 555 dout("prepare_read_banner %p\n", con); 556 con->in_base_pos = 0; 557 } 558 559 static void prepare_read_connect(struct ceph_connection *con) 560 { 561 dout("prepare_read_connect %p\n", con); 562 con->in_base_pos = 0; 563 } 564 565 static void prepare_read_ack(struct ceph_connection *con) 566 { 567 dout("prepare_read_ack %p\n", con); 568 con->in_base_pos = 0; 569 } 570 571 static void prepare_read_seq(struct ceph_connection *con) 572 { 573 dout("prepare_read_seq %p\n", con); 574 con->in_base_pos = 0; 575 con->in_tag = CEPH_MSGR_TAG_SEQ; 576 } 577 578 static void prepare_read_tag(struct ceph_connection *con) 579 { 580 dout("prepare_read_tag %p\n", con); 581 con->in_base_pos = 0; 582 con->in_tag = CEPH_MSGR_TAG_READY; 583 } 584 585 static void prepare_read_keepalive_ack(struct ceph_connection *con) 586 { 587 dout("prepare_read_keepalive_ack %p\n", con); 588 con->in_base_pos = 0; 589 } 590 591 /* 592 * Prepare to read a message. 593 */ 594 static int prepare_read_message(struct ceph_connection *con) 595 { 596 dout("prepare_read_message %p\n", con); 597 BUG_ON(con->in_msg != NULL); 598 con->in_base_pos = 0; 599 con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0; 600 return 0; 601 } 602 603 static int read_partial(struct ceph_connection *con, 604 int end, int size, void *object) 605 { 606 while (con->in_base_pos < end) { 607 int left = end - con->in_base_pos; 608 int have = size - left; 609 int ret = ceph_tcp_recvmsg(con->sock, object + have, left); 610 if (ret <= 0) 611 return ret; 612 con->in_base_pos += ret; 613 } 614 return 1; 615 } 616 617 /* 618 * Read all or part of the connect-side handshake on a new connection 619 */ 620 static int read_partial_banner(struct ceph_connection *con) 621 { 622 int size; 623 int end; 624 int ret; 625 626 dout("read_partial_banner %p at %d\n", con, con->in_base_pos); 627 628 /* peer's banner */ 629 size = strlen(CEPH_BANNER); 630 end = size; 631 ret = read_partial(con, end, size, con->in_banner); 632 if (ret <= 0) 633 goto out; 634 635 size = sizeof (con->actual_peer_addr); 636 end += size; 637 ret = read_partial(con, end, size, &con->actual_peer_addr); 638 if (ret <= 0) 639 goto out; 640 ceph_decode_banner_addr(&con->actual_peer_addr); 641 642 size = sizeof (con->peer_addr_for_me); 643 end += size; 644 ret = read_partial(con, end, size, &con->peer_addr_for_me); 645 if (ret <= 0) 646 goto out; 647 ceph_decode_banner_addr(&con->peer_addr_for_me); 648 649 out: 650 return ret; 651 } 652 653 static int read_partial_connect(struct ceph_connection *con) 654 { 655 int size; 656 int end; 657 int ret; 658 659 dout("read_partial_connect %p at %d\n", con, con->in_base_pos); 660 661 size = sizeof (con->in_reply); 662 end = size; 663 ret = read_partial(con, end, size, &con->in_reply); 664 if (ret <= 0) 665 goto out; 666 667 if (con->auth) { 668 size = le32_to_cpu(con->in_reply.authorizer_len); 669 if (size > con->auth->authorizer_reply_buf_len) { 670 pr_err("authorizer reply too big: %d > %zu\n", size, 671 con->auth->authorizer_reply_buf_len); 672 ret = -EINVAL; 673 goto out; 674 } 675 676 end += size; 677 ret = read_partial(con, end, size, 678 con->auth->authorizer_reply_buf); 679 if (ret <= 0) 680 goto out; 681 } 682 683 dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n", 684 con, (int)con->in_reply.tag, 685 le32_to_cpu(con->in_reply.connect_seq), 686 le32_to_cpu(con->in_reply.global_seq)); 687 out: 688 return ret; 689 } 690 691 /* 692 * Verify the hello banner looks okay. 693 */ 694 static int verify_hello(struct ceph_connection *con) 695 { 696 if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { 697 pr_err("connect to %s got bad banner\n", 698 ceph_pr_addr(&con->peer_addr)); 699 con->error_msg = "protocol error, bad banner"; 700 return -1; 701 } 702 return 0; 703 } 704 705 static int process_banner(struct ceph_connection *con) 706 { 707 struct ceph_entity_addr *my_addr = &con->msgr->inst.addr; 708 709 dout("process_banner on %p\n", con); 710 711 if (verify_hello(con) < 0) 712 return -1; 713 714 /* 715 * Make sure the other end is who we wanted. note that the other 716 * end may not yet know their ip address, so if it's 0.0.0.0, give 717 * them the benefit of the doubt. 718 */ 719 if (memcmp(&con->peer_addr, &con->actual_peer_addr, 720 sizeof(con->peer_addr)) != 0 && 721 !(ceph_addr_is_blank(&con->actual_peer_addr) && 722 con->actual_peer_addr.nonce == con->peer_addr.nonce)) { 723 pr_warn("wrong peer, want %s/%u, got %s/%u\n", 724 ceph_pr_addr(&con->peer_addr), 725 le32_to_cpu(con->peer_addr.nonce), 726 ceph_pr_addr(&con->actual_peer_addr), 727 le32_to_cpu(con->actual_peer_addr.nonce)); 728 con->error_msg = "wrong peer at address"; 729 return -1; 730 } 731 732 /* 733 * did we learn our address? 734 */ 735 if (ceph_addr_is_blank(my_addr)) { 736 memcpy(&my_addr->in_addr, 737 &con->peer_addr_for_me.in_addr, 738 sizeof(con->peer_addr_for_me.in_addr)); 739 ceph_addr_set_port(my_addr, 0); 740 ceph_encode_my_addr(con->msgr); 741 dout("process_banner learned my addr is %s\n", 742 ceph_pr_addr(my_addr)); 743 } 744 745 return 0; 746 } 747 748 static int process_connect(struct ceph_connection *con) 749 { 750 u64 sup_feat = from_msgr(con->msgr)->supported_features; 751 u64 req_feat = from_msgr(con->msgr)->required_features; 752 u64 server_feat = le64_to_cpu(con->in_reply.features); 753 int ret; 754 755 dout("process_connect on %p tag %d\n", con, (int)con->in_tag); 756 757 if (con->auth) { 758 int len = le32_to_cpu(con->in_reply.authorizer_len); 759 760 /* 761 * Any connection that defines ->get_authorizer() 762 * should also define ->add_authorizer_challenge() and 763 * ->verify_authorizer_reply(). 764 * 765 * See get_connect_authorizer(). 766 */ 767 if (con->in_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) { 768 ret = con->ops->add_authorizer_challenge( 769 con, con->auth->authorizer_reply_buf, len); 770 if (ret < 0) 771 return ret; 772 773 con_out_kvec_reset(con); 774 __prepare_write_connect(con); 775 prepare_read_connect(con); 776 return 0; 777 } 778 779 if (len) { 780 ret = con->ops->verify_authorizer_reply(con); 781 if (ret < 0) { 782 con->error_msg = "bad authorize reply"; 783 return ret; 784 } 785 } 786 } 787 788 switch (con->in_reply.tag) { 789 case CEPH_MSGR_TAG_FEATURES: 790 pr_err("%s%lld %s feature set mismatch," 791 " my %llx < server's %llx, missing %llx\n", 792 ENTITY_NAME(con->peer_name), 793 ceph_pr_addr(&con->peer_addr), 794 sup_feat, server_feat, server_feat & ~sup_feat); 795 con->error_msg = "missing required protocol features"; 796 return -1; 797 798 case CEPH_MSGR_TAG_BADPROTOVER: 799 pr_err("%s%lld %s protocol version mismatch," 800 " my %d != server's %d\n", 801 ENTITY_NAME(con->peer_name), 802 ceph_pr_addr(&con->peer_addr), 803 le32_to_cpu(con->out_connect.protocol_version), 804 le32_to_cpu(con->in_reply.protocol_version)); 805 con->error_msg = "protocol version mismatch"; 806 return -1; 807 808 case CEPH_MSGR_TAG_BADAUTHORIZER: 809 con->auth_retry++; 810 dout("process_connect %p got BADAUTHORIZER attempt %d\n", con, 811 con->auth_retry); 812 if (con->auth_retry == 2) { 813 con->error_msg = "connect authorization failure"; 814 return -1; 815 } 816 con_out_kvec_reset(con); 817 ret = prepare_write_connect(con); 818 if (ret < 0) 819 return ret; 820 prepare_read_connect(con); 821 break; 822 823 case CEPH_MSGR_TAG_RESETSESSION: 824 /* 825 * If we connected with a large connect_seq but the peer 826 * has no record of a session with us (no connection, or 827 * connect_seq == 0), they will send RESETSESION to indicate 828 * that they must have reset their session, and may have 829 * dropped messages. 830 */ 831 dout("process_connect got RESET peer seq %u\n", 832 le32_to_cpu(con->in_reply.connect_seq)); 833 pr_info("%s%lld %s session reset\n", 834 ENTITY_NAME(con->peer_name), 835 ceph_pr_addr(&con->peer_addr)); 836 ceph_con_reset_session(con); 837 con_out_kvec_reset(con); 838 ret = prepare_write_connect(con); 839 if (ret < 0) 840 return ret; 841 prepare_read_connect(con); 842 843 /* Tell ceph about it. */ 844 mutex_unlock(&con->mutex); 845 if (con->ops->peer_reset) 846 con->ops->peer_reset(con); 847 mutex_lock(&con->mutex); 848 if (con->state != CEPH_CON_S_V1_CONNECT_MSG) 849 return -EAGAIN; 850 break; 851 852 case CEPH_MSGR_TAG_RETRY_SESSION: 853 /* 854 * If we sent a smaller connect_seq than the peer has, try 855 * again with a larger value. 856 */ 857 dout("process_connect got RETRY_SESSION my seq %u, peer %u\n", 858 le32_to_cpu(con->out_connect.connect_seq), 859 le32_to_cpu(con->in_reply.connect_seq)); 860 con->connect_seq = le32_to_cpu(con->in_reply.connect_seq); 861 con_out_kvec_reset(con); 862 ret = prepare_write_connect(con); 863 if (ret < 0) 864 return ret; 865 prepare_read_connect(con); 866 break; 867 868 case CEPH_MSGR_TAG_RETRY_GLOBAL: 869 /* 870 * If we sent a smaller global_seq than the peer has, try 871 * again with a larger value. 872 */ 873 dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n", 874 con->peer_global_seq, 875 le32_to_cpu(con->in_reply.global_seq)); 876 ceph_get_global_seq(con->msgr, 877 le32_to_cpu(con->in_reply.global_seq)); 878 con_out_kvec_reset(con); 879 ret = prepare_write_connect(con); 880 if (ret < 0) 881 return ret; 882 prepare_read_connect(con); 883 break; 884 885 case CEPH_MSGR_TAG_SEQ: 886 case CEPH_MSGR_TAG_READY: 887 if (req_feat & ~server_feat) { 888 pr_err("%s%lld %s protocol feature mismatch," 889 " my required %llx > server's %llx, need %llx\n", 890 ENTITY_NAME(con->peer_name), 891 ceph_pr_addr(&con->peer_addr), 892 req_feat, server_feat, req_feat & ~server_feat); 893 con->error_msg = "missing required protocol features"; 894 return -1; 895 } 896 897 WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG); 898 con->state = CEPH_CON_S_OPEN; 899 con->auth_retry = 0; /* we authenticated; clear flag */ 900 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); 901 con->connect_seq++; 902 con->peer_features = server_feat; 903 dout("process_connect got READY gseq %d cseq %d (%d)\n", 904 con->peer_global_seq, 905 le32_to_cpu(con->in_reply.connect_seq), 906 con->connect_seq); 907 WARN_ON(con->connect_seq != 908 le32_to_cpu(con->in_reply.connect_seq)); 909 910 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) 911 ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX); 912 913 con->delay = 0; /* reset backoff memory */ 914 915 if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) { 916 prepare_write_seq(con); 917 prepare_read_seq(con); 918 } else { 919 prepare_read_tag(con); 920 } 921 break; 922 923 case CEPH_MSGR_TAG_WAIT: 924 /* 925 * If there is a connection race (we are opening 926 * connections to each other), one of us may just have 927 * to WAIT. This shouldn't happen if we are the 928 * client. 929 */ 930 con->error_msg = "protocol error, got WAIT as client"; 931 return -1; 932 933 default: 934 con->error_msg = "protocol error, garbage tag during connect"; 935 return -1; 936 } 937 return 0; 938 } 939 940 /* 941 * read (part of) an ack 942 */ 943 static int read_partial_ack(struct ceph_connection *con) 944 { 945 int size = sizeof (con->in_temp_ack); 946 int end = size; 947 948 return read_partial(con, end, size, &con->in_temp_ack); 949 } 950 951 /* 952 * We can finally discard anything that's been acked. 953 */ 954 static void process_ack(struct ceph_connection *con) 955 { 956 u64 ack = le64_to_cpu(con->in_temp_ack); 957 958 if (con->in_tag == CEPH_MSGR_TAG_ACK) 959 ceph_con_discard_sent(con, ack); 960 else 961 ceph_con_discard_requeued(con, ack); 962 963 prepare_read_tag(con); 964 } 965 966 static int read_partial_message_section(struct ceph_connection *con, 967 struct kvec *section, 968 unsigned int sec_len, u32 *crc) 969 { 970 int ret, left; 971 972 BUG_ON(!section); 973 974 while (section->iov_len < sec_len) { 975 BUG_ON(section->iov_base == NULL); 976 left = sec_len - section->iov_len; 977 ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base + 978 section->iov_len, left); 979 if (ret <= 0) 980 return ret; 981 section->iov_len += ret; 982 } 983 if (section->iov_len == sec_len) 984 *crc = crc32c(0, section->iov_base, section->iov_len); 985 986 return 1; 987 } 988 989 static int read_partial_msg_data(struct ceph_connection *con) 990 { 991 struct ceph_msg *msg = con->in_msg; 992 struct ceph_msg_data_cursor *cursor = &msg->cursor; 993 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); 994 struct page *page; 995 size_t page_offset; 996 size_t length; 997 u32 crc = 0; 998 int ret; 999 1000 if (!msg->num_data_items) 1001 return -EIO; 1002 1003 if (do_datacrc) 1004 crc = con->in_data_crc; 1005 while (cursor->total_resid) { 1006 if (!cursor->resid) { 1007 ceph_msg_data_advance(cursor, 0); 1008 continue; 1009 } 1010 1011 page = ceph_msg_data_next(cursor, &page_offset, &length, NULL); 1012 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); 1013 if (ret <= 0) { 1014 if (do_datacrc) 1015 con->in_data_crc = crc; 1016 1017 return ret; 1018 } 1019 1020 if (do_datacrc) 1021 crc = ceph_crc32c_page(crc, page, page_offset, ret); 1022 ceph_msg_data_advance(cursor, (size_t)ret); 1023 } 1024 if (do_datacrc) 1025 con->in_data_crc = crc; 1026 1027 return 1; /* must return > 0 to indicate success */ 1028 } 1029 1030 /* 1031 * read (part of) a message. 1032 */ 1033 static int read_partial_message(struct ceph_connection *con) 1034 { 1035 struct ceph_msg *m = con->in_msg; 1036 int size; 1037 int end; 1038 int ret; 1039 unsigned int front_len, middle_len, data_len; 1040 bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); 1041 bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH); 1042 u64 seq; 1043 u32 crc; 1044 1045 dout("read_partial_message con %p msg %p\n", con, m); 1046 1047 /* header */ 1048 size = sizeof (con->in_hdr); 1049 end = size; 1050 ret = read_partial(con, end, size, &con->in_hdr); 1051 if (ret <= 0) 1052 return ret; 1053 1054 crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc)); 1055 if (cpu_to_le32(crc) != con->in_hdr.crc) { 1056 pr_err("read_partial_message bad hdr crc %u != expected %u\n", 1057 crc, con->in_hdr.crc); 1058 return -EBADMSG; 1059 } 1060 1061 front_len = le32_to_cpu(con->in_hdr.front_len); 1062 if (front_len > CEPH_MSG_MAX_FRONT_LEN) 1063 return -EIO; 1064 middle_len = le32_to_cpu(con->in_hdr.middle_len); 1065 if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN) 1066 return -EIO; 1067 data_len = le32_to_cpu(con->in_hdr.data_len); 1068 if (data_len > CEPH_MSG_MAX_DATA_LEN) 1069 return -EIO; 1070 1071 /* verify seq# */ 1072 seq = le64_to_cpu(con->in_hdr.seq); 1073 if ((s64)seq - (s64)con->in_seq < 1) { 1074 pr_info("skipping %s%lld %s seq %lld expected %lld\n", 1075 ENTITY_NAME(con->peer_name), 1076 ceph_pr_addr(&con->peer_addr), 1077 seq, con->in_seq + 1); 1078 con->in_base_pos = -front_len - middle_len - data_len - 1079 sizeof_footer(con); 1080 con->in_tag = CEPH_MSGR_TAG_READY; 1081 return 1; 1082 } else if ((s64)seq - (s64)con->in_seq > 1) { 1083 pr_err("read_partial_message bad seq %lld expected %lld\n", 1084 seq, con->in_seq + 1); 1085 con->error_msg = "bad message sequence # for incoming message"; 1086 return -EBADE; 1087 } 1088 1089 /* allocate message? */ 1090 if (!con->in_msg) { 1091 int skip = 0; 1092 1093 dout("got hdr type %d front %d data %d\n", con->in_hdr.type, 1094 front_len, data_len); 1095 ret = ceph_con_in_msg_alloc(con, &con->in_hdr, &skip); 1096 if (ret < 0) 1097 return ret; 1098 1099 BUG_ON(!con->in_msg ^ skip); 1100 if (skip) { 1101 /* skip this message */ 1102 dout("alloc_msg said skip message\n"); 1103 con->in_base_pos = -front_len - middle_len - data_len - 1104 sizeof_footer(con); 1105 con->in_tag = CEPH_MSGR_TAG_READY; 1106 con->in_seq++; 1107 return 1; 1108 } 1109 1110 BUG_ON(!con->in_msg); 1111 BUG_ON(con->in_msg->con != con); 1112 m = con->in_msg; 1113 m->front.iov_len = 0; /* haven't read it yet */ 1114 if (m->middle) 1115 m->middle->vec.iov_len = 0; 1116 1117 /* prepare for data payload, if any */ 1118 1119 if (data_len) 1120 prepare_message_data(con->in_msg, data_len); 1121 } 1122 1123 /* front */ 1124 ret = read_partial_message_section(con, &m->front, front_len, 1125 &con->in_front_crc); 1126 if (ret <= 0) 1127 return ret; 1128 1129 /* middle */ 1130 if (m->middle) { 1131 ret = read_partial_message_section(con, &m->middle->vec, 1132 middle_len, 1133 &con->in_middle_crc); 1134 if (ret <= 0) 1135 return ret; 1136 } 1137 1138 /* (page) data */ 1139 if (data_len) { 1140 ret = read_partial_msg_data(con); 1141 if (ret <= 0) 1142 return ret; 1143 } 1144 1145 /* footer */ 1146 size = sizeof_footer(con); 1147 end += size; 1148 ret = read_partial(con, end, size, &m->footer); 1149 if (ret <= 0) 1150 return ret; 1151 1152 if (!need_sign) { 1153 m->footer.flags = m->old_footer.flags; 1154 m->footer.sig = 0; 1155 } 1156 1157 dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n", 1158 m, front_len, m->footer.front_crc, middle_len, 1159 m->footer.middle_crc, data_len, m->footer.data_crc); 1160 1161 /* crc ok? */ 1162 if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { 1163 pr_err("read_partial_message %p front crc %u != exp. %u\n", 1164 m, con->in_front_crc, m->footer.front_crc); 1165 return -EBADMSG; 1166 } 1167 if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) { 1168 pr_err("read_partial_message %p middle crc %u != exp %u\n", 1169 m, con->in_middle_crc, m->footer.middle_crc); 1170 return -EBADMSG; 1171 } 1172 if (do_datacrc && 1173 (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 && 1174 con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { 1175 pr_err("read_partial_message %p data crc %u != exp. %u\n", m, 1176 con->in_data_crc, le32_to_cpu(m->footer.data_crc)); 1177 return -EBADMSG; 1178 } 1179 1180 if (need_sign && con->ops->check_message_signature && 1181 con->ops->check_message_signature(m)) { 1182 pr_err("read_partial_message %p signature check failed\n", m); 1183 return -EBADMSG; 1184 } 1185 1186 return 1; /* done! */ 1187 } 1188 1189 static int read_keepalive_ack(struct ceph_connection *con) 1190 { 1191 struct ceph_timespec ceph_ts; 1192 size_t size = sizeof(ceph_ts); 1193 int ret = read_partial(con, size, size, &ceph_ts); 1194 if (ret <= 0) 1195 return ret; 1196 ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts); 1197 prepare_read_tag(con); 1198 return 1; 1199 } 1200 1201 /* 1202 * Read what we can from the socket. 1203 */ 1204 int ceph_con_v1_try_read(struct ceph_connection *con) 1205 { 1206 int ret = -1; 1207 1208 more: 1209 dout("try_read start %p state %d\n", con, con->state); 1210 if (con->state != CEPH_CON_S_V1_BANNER && 1211 con->state != CEPH_CON_S_V1_CONNECT_MSG && 1212 con->state != CEPH_CON_S_OPEN) 1213 return 0; 1214 1215 BUG_ON(!con->sock); 1216 1217 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, 1218 con->in_base_pos); 1219 1220 if (con->state == CEPH_CON_S_V1_BANNER) { 1221 ret = read_partial_banner(con); 1222 if (ret <= 0) 1223 goto out; 1224 ret = process_banner(con); 1225 if (ret < 0) 1226 goto out; 1227 1228 con->state = CEPH_CON_S_V1_CONNECT_MSG; 1229 1230 /* 1231 * Received banner is good, exchange connection info. 1232 * Do not reset out_kvec, as sending our banner raced 1233 * with receiving peer banner after connect completed. 1234 */ 1235 ret = prepare_write_connect(con); 1236 if (ret < 0) 1237 goto out; 1238 prepare_read_connect(con); 1239 1240 /* Send connection info before awaiting response */ 1241 goto out; 1242 } 1243 1244 if (con->state == CEPH_CON_S_V1_CONNECT_MSG) { 1245 ret = read_partial_connect(con); 1246 if (ret <= 0) 1247 goto out; 1248 ret = process_connect(con); 1249 if (ret < 0) 1250 goto out; 1251 goto more; 1252 } 1253 1254 WARN_ON(con->state != CEPH_CON_S_OPEN); 1255 1256 if (con->in_base_pos < 0) { 1257 /* 1258 * skipping + discarding content. 1259 */ 1260 ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos); 1261 if (ret <= 0) 1262 goto out; 1263 dout("skipped %d / %d bytes\n", ret, -con->in_base_pos); 1264 con->in_base_pos += ret; 1265 if (con->in_base_pos) 1266 goto more; 1267 } 1268 if (con->in_tag == CEPH_MSGR_TAG_READY) { 1269 /* 1270 * what's next? 1271 */ 1272 ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); 1273 if (ret <= 0) 1274 goto out; 1275 dout("try_read got tag %d\n", (int)con->in_tag); 1276 switch (con->in_tag) { 1277 case CEPH_MSGR_TAG_MSG: 1278 prepare_read_message(con); 1279 break; 1280 case CEPH_MSGR_TAG_ACK: 1281 prepare_read_ack(con); 1282 break; 1283 case CEPH_MSGR_TAG_KEEPALIVE2_ACK: 1284 prepare_read_keepalive_ack(con); 1285 break; 1286 case CEPH_MSGR_TAG_CLOSE: 1287 ceph_con_close_socket(con); 1288 con->state = CEPH_CON_S_CLOSED; 1289 goto out; 1290 default: 1291 goto bad_tag; 1292 } 1293 } 1294 if (con->in_tag == CEPH_MSGR_TAG_MSG) { 1295 ret = read_partial_message(con); 1296 if (ret <= 0) { 1297 switch (ret) { 1298 case -EBADMSG: 1299 con->error_msg = "bad crc/signature"; 1300 fallthrough; 1301 case -EBADE: 1302 ret = -EIO; 1303 break; 1304 case -EIO: 1305 con->error_msg = "io error"; 1306 break; 1307 } 1308 goto out; 1309 } 1310 if (con->in_tag == CEPH_MSGR_TAG_READY) 1311 goto more; 1312 ceph_con_process_message(con); 1313 if (con->state == CEPH_CON_S_OPEN) 1314 prepare_read_tag(con); 1315 goto more; 1316 } 1317 if (con->in_tag == CEPH_MSGR_TAG_ACK || 1318 con->in_tag == CEPH_MSGR_TAG_SEQ) { 1319 /* 1320 * the final handshake seq exchange is semantically 1321 * equivalent to an ACK 1322 */ 1323 ret = read_partial_ack(con); 1324 if (ret <= 0) 1325 goto out; 1326 process_ack(con); 1327 goto more; 1328 } 1329 if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { 1330 ret = read_keepalive_ack(con); 1331 if (ret <= 0) 1332 goto out; 1333 goto more; 1334 } 1335 1336 out: 1337 dout("try_read done on %p ret %d\n", con, ret); 1338 return ret; 1339 1340 bad_tag: 1341 pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag); 1342 con->error_msg = "protocol error, garbage tag"; 1343 ret = -1; 1344 goto out; 1345 } 1346 1347 /* 1348 * Write something to the socket. Called in a worker thread when the 1349 * socket appears to be writeable and we have something ready to send. 1350 */ 1351 int ceph_con_v1_try_write(struct ceph_connection *con) 1352 { 1353 int ret = 1; 1354 1355 dout("try_write start %p state %d\n", con, con->state); 1356 if (con->state != CEPH_CON_S_PREOPEN && 1357 con->state != CEPH_CON_S_V1_BANNER && 1358 con->state != CEPH_CON_S_V1_CONNECT_MSG && 1359 con->state != CEPH_CON_S_OPEN) 1360 return 0; 1361 1362 /* open the socket first? */ 1363 if (con->state == CEPH_CON_S_PREOPEN) { 1364 BUG_ON(con->sock); 1365 con->state = CEPH_CON_S_V1_BANNER; 1366 1367 con_out_kvec_reset(con); 1368 prepare_write_banner(con); 1369 prepare_read_banner(con); 1370 1371 BUG_ON(con->in_msg); 1372 con->in_tag = CEPH_MSGR_TAG_READY; 1373 dout("try_write initiating connect on %p new state %d\n", 1374 con, con->state); 1375 ret = ceph_tcp_connect(con); 1376 if (ret < 0) { 1377 con->error_msg = "connect error"; 1378 goto out; 1379 } 1380 } 1381 1382 more: 1383 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); 1384 BUG_ON(!con->sock); 1385 1386 /* kvec data queued? */ 1387 if (con->out_kvec_left) { 1388 ret = write_partial_kvec(con); 1389 if (ret <= 0) 1390 goto out; 1391 } 1392 if (con->out_skip) { 1393 ret = write_partial_skip(con); 1394 if (ret <= 0) 1395 goto out; 1396 } 1397 1398 /* msg pages? */ 1399 if (con->out_msg) { 1400 if (con->out_msg_done) { 1401 ceph_msg_put(con->out_msg); 1402 con->out_msg = NULL; /* we're done with this one */ 1403 goto do_next; 1404 } 1405 1406 ret = write_partial_message_data(con); 1407 if (ret == 1) 1408 goto more; /* we need to send the footer, too! */ 1409 if (ret == 0) 1410 goto out; 1411 if (ret < 0) { 1412 dout("try_write write_partial_message_data err %d\n", 1413 ret); 1414 goto out; 1415 } 1416 } 1417 1418 do_next: 1419 if (con->state == CEPH_CON_S_OPEN) { 1420 if (ceph_con_flag_test_and_clear(con, 1421 CEPH_CON_F_KEEPALIVE_PENDING)) { 1422 prepare_write_keepalive(con); 1423 goto more; 1424 } 1425 /* is anything else pending? */ 1426 if (!list_empty(&con->out_queue)) { 1427 prepare_write_message(con); 1428 goto more; 1429 } 1430 if (con->in_seq > con->in_seq_acked) { 1431 prepare_write_ack(con); 1432 goto more; 1433 } 1434 } 1435 1436 /* Nothing to do! */ 1437 ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); 1438 dout("try_write nothing else to write.\n"); 1439 ret = 0; 1440 out: 1441 dout("try_write done on %p ret %d\n", con, ret); 1442 return ret; 1443 } 1444 1445 void ceph_con_v1_revoke(struct ceph_connection *con) 1446 { 1447 struct ceph_msg *msg = con->out_msg; 1448 1449 WARN_ON(con->out_skip); 1450 /* footer */ 1451 if (con->out_msg_done) { 1452 con->out_skip += con_out_kvec_skip(con); 1453 } else { 1454 WARN_ON(!msg->data_length); 1455 con->out_skip += sizeof_footer(con); 1456 } 1457 /* data, middle, front */ 1458 if (msg->data_length) 1459 con->out_skip += msg->cursor.total_resid; 1460 if (msg->middle) 1461 con->out_skip += con_out_kvec_skip(con); 1462 con->out_skip += con_out_kvec_skip(con); 1463 1464 dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con, 1465 con->out_kvec_bytes, con->out_skip); 1466 } 1467 1468 void ceph_con_v1_revoke_incoming(struct ceph_connection *con) 1469 { 1470 unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); 1471 unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); 1472 unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); 1473 1474 /* skip rest of message */ 1475 con->in_base_pos = con->in_base_pos - 1476 sizeof(struct ceph_msg_header) - 1477 front_len - 1478 middle_len - 1479 data_len - 1480 sizeof(struct ceph_msg_footer); 1481 1482 con->in_tag = CEPH_MSGR_TAG_READY; 1483 con->in_seq++; 1484 1485 dout("%s con %p in_base_pos %d\n", __func__, con, con->in_base_pos); 1486 } 1487 1488 bool ceph_con_v1_opened(struct ceph_connection *con) 1489 { 1490 return con->connect_seq; 1491 } 1492 1493 void ceph_con_v1_reset_session(struct ceph_connection *con) 1494 { 1495 con->connect_seq = 0; 1496 con->peer_global_seq = 0; 1497 } 1498 1499 void ceph_con_v1_reset_protocol(struct ceph_connection *con) 1500 { 1501 con->out_skip = 0; 1502 } 1503