/*
 * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/slab.h>
#include <net/sock.h>
#include <linux/in.h>
#include <linux/export.h>
#include <linux/time.h>
#include <linux/rds.h>

#include "rds.h"

void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
                  struct in6_addr *saddr)
{
        int i;

        refcount_set(&inc->i_refcount, 1);
        INIT_LIST_HEAD(&inc->i_item);
        inc->i_conn = conn;
        inc->i_saddr = *saddr;
        inc->i_rdma_cookie = 0;
        inc->i_rx_tstamp = ktime_set(0, 0);

        for (i = 0; i < RDS_RX_MAX_TRACES; i++)
                inc->i_rx_lat_trace[i] = 0;
}
EXPORT_SYMBOL_GPL(rds_inc_init);

void rds_inc_path_init(struct rds_incoming *inc, struct rds_conn_path *cp,
                       struct in6_addr *saddr)
{
        refcount_set(&inc->i_refcount, 1);
        INIT_LIST_HEAD(&inc->i_item);
        inc->i_conn = cp->cp_conn;
        inc->i_conn_path = cp;
        inc->i_saddr = *saddr;
        inc->i_rdma_cookie = 0;
        inc->i_rx_tstamp = ktime_set(0, 0);
}
EXPORT_SYMBOL_GPL(rds_inc_path_init);

static void rds_inc_addref(struct rds_incoming *inc)
{
        rdsdebug("addref inc %p ref %d\n", inc, refcount_read(&inc->i_refcount));
        refcount_inc(&inc->i_refcount);
}

void rds_inc_put(struct rds_incoming *inc)
{
        rdsdebug("put inc %p ref %d\n", inc, refcount_read(&inc->i_refcount));
        if (refcount_dec_and_test(&inc->i_refcount)) {
                BUG_ON(!list_empty(&inc->i_item));

                inc->i_conn->c_trans->inc_free(inc);
        }
}
EXPORT_SYMBOL_GPL(rds_inc_put);

static void rds_recv_rcvbuf_delta(struct rds_sock *rs, struct sock *sk,
                                  struct rds_cong_map *map,
                                  int delta, __be16 port)
{
        int now_congested;

        if (delta == 0)
                return;

        rs->rs_rcv_bytes += delta;
        if (delta > 0)
                rds_stats_add(s_recv_bytes_added_to_socket, delta);
        else
                rds_stats_add(s_recv_bytes_removed_from_socket, -delta);

        /* loop transport doesn't send/recv congestion updates */
        if (rs->rs_transport->t_type == RDS_TRANS_LOOP)
                return;

        now_congested = rs->rs_rcv_bytes > rds_sk_rcvbuf(rs);

        rdsdebug("rs %p (%pI6c:%u) recv bytes %d buf %d "
                 "now_cong %d delta %d\n",
                 rs, &rs->rs_bound_addr,
                 ntohs(rs->rs_bound_port), rs->rs_rcv_bytes,
                 rds_sk_rcvbuf(rs), now_congested, delta);

        /* wasn't -> am congested */
        if (!rs->rs_congested && now_congested) {
                rs->rs_congested = 1;
                rds_cong_set_bit(map, port);
                rds_cong_queue_updates(map);
        }
        /* was -> aren't congested */
        /* Require more free space before reporting uncongested to prevent
           bouncing cong/uncong state too often */
        else if (rs->rs_congested && (rs->rs_rcv_bytes < (rds_sk_rcvbuf(rs)/2))) {
                rs->rs_congested = 0;
                rds_cong_clear_bit(map, port);
                rds_cong_queue_updates(map);
        }

        /* do nothing if no change in cong state */
}

static void rds_conn_peer_gen_update(struct rds_connection *conn,
                                     u32 peer_gen_num)
{
        int i;
        struct rds_message *rm, *tmp;
        unsigned long flags;

        WARN_ON(conn->c_trans->t_type != RDS_TRANS_TCP);
        if (peer_gen_num != 0) {
                if (conn->c_peer_gen_num != 0 &&
                    peer_gen_num != conn->c_peer_gen_num) {
                        for (i = 0; i < RDS_MPATH_WORKERS; i++) {
                                struct rds_conn_path *cp;

                                cp = &conn->c_path[i];
                                spin_lock_irqsave(&cp->cp_lock, flags);
                                cp->cp_next_tx_seq = 1;
                                cp->cp_next_rx_seq = 0;
                                list_for_each_entry_safe(rm, tmp,
                                                         &cp->cp_retrans,
                                                         m_conn_item) {
                                        set_bit(RDS_MSG_FLUSH, &rm->m_flags);
                                }
                                spin_unlock_irqrestore(&cp->cp_lock, flags);
                        }
                }
                conn->c_peer_gen_num = peer_gen_num;
        }
}

/*
 * Process all extension headers that come with this message.
 */
static void rds_recv_incoming_exthdrs(struct rds_incoming *inc, struct rds_sock *rs)
{
        struct rds_header *hdr = &inc->i_hdr;
        unsigned int pos = 0, type, len;
        union {
                struct rds_ext_header_version version;
                struct rds_ext_header_rdma rdma;
                struct rds_ext_header_rdma_dest rdma_dest;
        } buffer;

        while (1) {
                len = sizeof(buffer);
                type = rds_message_next_extension(hdr, &pos, &buffer, &len);
                if (type == RDS_EXTHDR_NONE)
                        break;
                /* Process extension header here */
                switch (type) {
                case RDS_EXTHDR_RDMA:
                        rds_rdma_unuse(rs, be32_to_cpu(buffer.rdma.h_rdma_rkey), 0);
                        break;

                case RDS_EXTHDR_RDMA_DEST:
                        /* We ignore the size for now. We could stash it
                         * somewhere and use it for error checking.
                         */
                        inc->i_rdma_cookie = rds_rdma_make_cookie(
                                        be32_to_cpu(buffer.rdma_dest.h_rdma_rkey),
                                        be32_to_cpu(buffer.rdma_dest.h_rdma_offset));

                        break;
                }
        }
}

static void rds_recv_hs_exthdrs(struct rds_header *hdr,
                                struct rds_connection *conn)
{
        unsigned int pos = 0, type, len;
        union {
                struct rds_ext_header_version version;
                u16 rds_npaths;
                u32 rds_gen_num;
        } buffer;
        u32 new_peer_gen_num = 0;

        while (1) {
                len = sizeof(buffer);
                type = rds_message_next_extension(hdr, &pos, &buffer, &len);
                if (type == RDS_EXTHDR_NONE)
                        break;
                /* Process extension header here */
                switch (type) {
                case RDS_EXTHDR_NPATHS:
                        conn->c_npaths = min_t(int, RDS_MPATH_WORKERS,
                                               be16_to_cpu(buffer.rds_npaths));
                        break;
                case RDS_EXTHDR_GEN_NUM:
                        new_peer_gen_num = be32_to_cpu(buffer.rds_gen_num);
                        break;
                default:
                        pr_warn_ratelimited("ignoring unknown exthdr type 0x%x\n",
                                            type);
                }
        }
        /* if RDS_EXTHDR_NPATHS was not found, default to a single path */
        conn->c_npaths = max_t(int, conn->c_npaths, 1);
        conn->c_ping_triggered = 0;
        rds_conn_peer_gen_update(conn, new_peer_gen_num);
}

/* rds_start_mprds() will synchronously start multiple paths when appropriate.
 * The scheme is based on the following rules:
 *
 * 1. rds_sendmsg on the first connect attempt sends the probe ping, with the
 *    sender's npaths (s_npaths).
 * 2. The receiver of the probe ping knows mprds_paths = min(s_npaths, r_npaths).
 *    It sends back a probe pong with r_npaths. After that, if the receiver is
 *    the smaller IP address, it starts rds_conn_path_connect_if_down on all
 *    mprds_paths.
 * 3. The sender gets woken up and can move to rds_conn_path_connect_if_down.
 *    If it is the smaller IP address, rds_conn_path_connect_if_down can be
 *    called after reception of the probe pong on all mprds_paths.
 *    Otherwise (the sender of the probe ping is not the smaller IP address):
 *    just call rds_conn_path_connect_if_down on the hashed path (see rule 4).
 * 4. rds_connect_worker must only trigger a connection if laddr < faddr.
 * 5. The sender may end up queuing the packet on the cp; it will get sent out
 *    later, when the connection is completed.
 */
static void rds_start_mprds(struct rds_connection *conn)
{
        int i;
        struct rds_conn_path *cp;

        if (conn->c_npaths > 1 &&
            rds_addr_cmp(&conn->c_laddr, &conn->c_faddr) < 0) {
                for (i = 0; i < conn->c_npaths; i++) {
                        cp = &conn->c_path[i];
                        rds_conn_path_connect_if_down(cp);
                }
        }
}

/*
 * The transport must make sure that this is serialized against other
 * rx and conn reset on this specific conn.
 *
 * We currently assert that only one fragmented message will be sent
 * down a connection at a time.  This lets us reassemble in the conn
 * instead of per-flow, which means that we don't have to go digging through
 * flows to tear down partial reassembly progress on conn failure and
 * we save flow lookup and locking for each frag arrival.  It does mean
 * that small messages will wait behind large ones.  Fragmenting at all
 * is only to reduce the memory consumption of pre-posted buffers.
 *
 * The caller passes in saddr and daddr instead of us getting it from the
 * conn.  This lets loopback, which only has one conn for both directions,
 * tell us which roles the addrs in the conn are playing for this message.
 */
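/*
 * Rough flow below: drop retransmitted messages whose sequence number is
 * behind cp_next_rx_seq, answer probe pings and handshake pongs (parsing
 * the npaths/gen_num extension headers and kicking off the MP-RDS paths
 * where needed), then look up the socket bound to the destination port
 * and queue the incoming message on its receive queue.
 */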
void rds_recv_incoming(struct rds_connection *conn, struct in6_addr *saddr,
                       struct in6_addr *daddr,
                       struct rds_incoming *inc, gfp_t gfp)
{
        struct rds_sock *rs = NULL;
        struct sock *sk;
        unsigned long flags;
        struct rds_conn_path *cp;

        inc->i_conn = conn;
        inc->i_rx_jiffies = jiffies;
        if (conn->c_trans->t_mp_capable)
                cp = inc->i_conn_path;
        else
                cp = &conn->c_path[0];

        rdsdebug("conn %p next %llu inc %p seq %llu len %u sport %u dport %u "
                 "flags 0x%x rx_jiffies %lu\n", conn,
                 (unsigned long long)cp->cp_next_rx_seq,
                 inc,
                 (unsigned long long)be64_to_cpu(inc->i_hdr.h_sequence),
                 be32_to_cpu(inc->i_hdr.h_len),
                 be16_to_cpu(inc->i_hdr.h_sport),
                 be16_to_cpu(inc->i_hdr.h_dport),
                 inc->i_hdr.h_flags,
                 inc->i_rx_jiffies);

        /*
         * Sequence numbers should only increase.  Messages get their
         * sequence number as they're queued in a sending conn.  They
         * can be dropped, though, if the sending socket is closed before
         * they hit the wire.  So sequence numbers can skip forward
         * under normal operation.  They can also drop back in the conn
         * failover case as previously sent messages are resent down the
         * new instance of a conn.  We drop those, otherwise we have
         * to assume that the next valid seq does not come after a
         * hole in the fragment stream.
         *
         * The headers don't give us a way to realize if fragments of
         * a message have been dropped.  We assume that frags that arrive
         * to a flow are part of the current message on the flow that is
         * being reassembled.  This means that senders can't drop messages
         * from the sending conn until all their frags are sent.
         *
         * XXX we could spend more on the wire to get more robust failure
         * detection, arguably worth it to avoid data corruption.
         */
        if (be64_to_cpu(inc->i_hdr.h_sequence) < cp->cp_next_rx_seq &&
            (inc->i_hdr.h_flags & RDS_FLAG_RETRANSMITTED)) {
                rds_stats_inc(s_recv_drop_old_seq);
                goto out;
        }
        cp->cp_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1;

        if (rds_sysctl_ping_enable && inc->i_hdr.h_dport == 0) {
                if (inc->i_hdr.h_sport == 0) {
                        rdsdebug("ignore ping with 0 sport from %pI6c\n",
                                 saddr);
                        goto out;
                }
                rds_stats_inc(s_recv_ping);
                rds_send_pong(cp, inc->i_hdr.h_sport);
                /* if this is a handshake ping, start multipath if necessary */
                if (RDS_HS_PROBE(be16_to_cpu(inc->i_hdr.h_sport),
                                 be16_to_cpu(inc->i_hdr.h_dport))) {
                        rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
                        rds_start_mprds(cp->cp_conn);
                }
                goto out;
        }

        if (be16_to_cpu(inc->i_hdr.h_dport) == RDS_FLAG_PROBE_PORT &&
            inc->i_hdr.h_sport == 0) {
                rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
                /* if this is a handshake pong, start multipath if necessary */
                rds_start_mprds(cp->cp_conn);
                wake_up(&cp->cp_conn->c_hs_waitq);
                goto out;
        }

        rs = rds_find_bound(daddr, inc->i_hdr.h_dport, conn->c_bound_if);
        if (!rs) {
                rds_stats_inc(s_recv_drop_no_sock);
                goto out;
        }

        /* Process extension headers */
        rds_recv_incoming_exthdrs(inc, rs);

        /* We can be racing with rds_release() which marks the socket dead. */
        sk = rds_rs_to_sk(rs);

        /* serialize with rds_release -> sock_orphan */
        write_lock_irqsave(&rs->rs_recv_lock, flags);
        if (!sock_flag(sk, SOCK_DEAD)) {
                rdsdebug("adding inc %p to rs %p's recv queue\n", inc, rs);
                rds_stats_inc(s_recv_queued);
                rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
                                      be32_to_cpu(inc->i_hdr.h_len),
                                      inc->i_hdr.h_dport);
                if (sock_flag(sk, SOCK_RCVTSTAMP))
                        inc->i_rx_tstamp = ktime_get_real();
                rds_inc_addref(inc);
                inc->i_rx_lat_trace[RDS_MSG_RX_END] = local_clock();
                list_add_tail(&inc->i_item, &rs->rs_recv_queue);
                __rds_wake_sk_sleep(sk);
        } else {
                rds_stats_inc(s_recv_drop_dead_sock);
        }
        write_unlock_irqrestore(&rs->rs_recv_lock, flags);

out:
        if (rs)
                rds_sock_put(rs);
}
EXPORT_SYMBOL_GPL(rds_recv_incoming);

/*
 * be very careful here.  This is called as the condition in
 * wait_event_*() and needs to cope with being called many times.
 */
static int rds_next_incoming(struct rds_sock *rs, struct rds_incoming **inc)
{
        unsigned long flags;

        if (!*inc) {
                read_lock_irqsave(&rs->rs_recv_lock, flags);
                if (!list_empty(&rs->rs_recv_queue)) {
                        *inc = list_entry(rs->rs_recv_queue.next,
                                          struct rds_incoming,
                                          i_item);
                        rds_inc_addref(*inc);
                }
                read_unlock_irqrestore(&rs->rs_recv_lock, flags);
        }

        return *inc != NULL;
}

static int rds_still_queued(struct rds_sock *rs, struct rds_incoming *inc,
                            int drop)
{
        struct sock *sk = rds_rs_to_sk(rs);
        int ret = 0;
        unsigned long flags;

        write_lock_irqsave(&rs->rs_recv_lock, flags);
        if (!list_empty(&inc->i_item)) {
                ret = 1;
                if (drop) {
                        /* XXX make sure this i_conn is reliable */
                        rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
                                              -be32_to_cpu(inc->i_hdr.h_len),
                                              inc->i_hdr.h_dport);
                        list_del_init(&inc->i_item);
                        rds_inc_put(inc);
                }
        }
        write_unlock_irqrestore(&rs->rs_recv_lock, flags);

        rdsdebug("inc %p rs %p still %d dropped %d\n", inc, rs, ret, drop);
        return ret;
}

/*
 * Pull errors off the error queue.
 * If msghdr is NULL, we will just purge the error queue.
 */
int rds_notify_queue_get(struct rds_sock *rs, struct msghdr *msghdr)
{
        struct rds_notifier *notifier;
        struct rds_rdma_notify cmsg = { 0 }; /* fill holes with zero */
        unsigned int count = 0, max_messages = ~0U;
        unsigned long flags;
        LIST_HEAD(copy);
        int err = 0;

        /* put_cmsg copies to user space and thus may sleep.  We can't do this
         * with rs_lock held, so first grab as many notifications as we can
         * stuff in the user provided cmsg buffer.  We don't try to copy more,
         * to avoid losing notifications - except when the buffer is so small
         * that it wouldn't even hold a single notification.  Then we give as
         * much of this single msg as we can squeeze in, and set MSG_CTRUNC.
         */
        if (msghdr) {
                max_messages = msghdr->msg_controllen / CMSG_SPACE(sizeof(cmsg));
                if (!max_messages)
                        max_messages = 1;
        }

        spin_lock_irqsave(&rs->rs_lock, flags);
        while (!list_empty(&rs->rs_notify_queue) && count < max_messages) {
                notifier = list_entry(rs->rs_notify_queue.next,
                                      struct rds_notifier, n_list);
                list_move(&notifier->n_list, &copy);
                count++;
        }
        spin_unlock_irqrestore(&rs->rs_lock, flags);

        if (!count)
                return 0;

        while (!list_empty(&copy)) {
                notifier = list_entry(copy.next, struct rds_notifier, n_list);

                if (msghdr) {
                        cmsg.user_token = notifier->n_user_token;
                        cmsg.status = notifier->n_status;

                        err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_RDMA_STATUS,
                                       sizeof(cmsg), &cmsg);
                        if (err)
                                break;
                }

                list_del_init(&notifier->n_list);
                kfree(notifier);
        }

        /* If we bailed out because of an error in put_cmsg,
         * we may be left with one or more notifications that we
         * didn't process.  Return them to the head of the list.
         */
        if (!list_empty(&copy)) {
                spin_lock_irqsave(&rs->rs_lock, flags);
                list_splice(&copy, &rs->rs_notify_queue);
                spin_unlock_irqrestore(&rs->rs_lock, flags);
        }

        return err;
}

/*
 * Queue a congestion notification
 */
static int rds_notify_cong(struct rds_sock *rs, struct msghdr *msghdr)
{
        uint64_t notify = rs->rs_cong_notify;
        unsigned long flags;
        int err;

        err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_CONG_UPDATE,
                       sizeof(notify), &notify);
        if (err)
                return err;

        spin_lock_irqsave(&rs->rs_lock, flags);
        rs->rs_cong_notify &= ~notify;
        spin_unlock_irqrestore(&rs->rs_lock, flags);

        return 0;
}

/*
 * Receive any control messages.
 */
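/*
 * Control messages we may attach to a received message: the RDMA
 * destination cookie (RDS_CMSG_RDMA_DEST), a receive timestamp
 * (SCM_TIMESTAMP) when the socket has SOCK_RCVTSTAMP set, and the
 * per-stage receive latency trace (RDS_CMSG_RXPATH_LATENCY) when rx
 * tracing has been requested on the socket.
 */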
static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg,
                         struct rds_sock *rs)
{
        int ret = 0;

        if (inc->i_rdma_cookie) {
                ret = put_cmsg(msg, SOL_RDS, RDS_CMSG_RDMA_DEST,
                               sizeof(inc->i_rdma_cookie), &inc->i_rdma_cookie);
                if (ret)
                        goto out;
        }

        if ((inc->i_rx_tstamp != 0) &&
            sock_flag(rds_rs_to_sk(rs), SOCK_RCVTSTAMP)) {
                struct timeval tv = ktime_to_timeval(inc->i_rx_tstamp);

                ret = put_cmsg(msg, SOL_SOCKET, SCM_TIMESTAMP,
                               sizeof(tv), &tv);
                if (ret)
                        goto out;
        }

        if (rs->rs_rx_traces) {
                struct rds_cmsg_rx_trace t;
                int i, j;

                memset(&t, 0, sizeof(t));
                inc->i_rx_lat_trace[RDS_MSG_RX_CMSG] = local_clock();
                t.rx_traces = rs->rs_rx_traces;
                for (i = 0; i < rs->rs_rx_traces; i++) {
                        j = rs->rs_rx_trace[i];
                        t.rx_trace_pos[i] = j;
                        t.rx_trace[i] = inc->i_rx_lat_trace[j + 1] -
                                        inc->i_rx_lat_trace[j];
                }

                ret = put_cmsg(msg, SOL_RDS, RDS_CMSG_RXPATH_LATENCY,
                               sizeof(t), &t);
                if (ret)
                        goto out;
        }

out:
        return ret;
}

static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
{
        struct rds_msg_zcopy_queue *q = &rs->rs_zcookie_queue;
        struct rds_msg_zcopy_info *info = NULL;
        struct rds_zcopy_cookies *done;
        unsigned long flags;

        if (!msg->msg_control)
                return false;

        if (!sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY) ||
            msg->msg_controllen < CMSG_SPACE(sizeof(*done)))
                return false;

        spin_lock_irqsave(&q->lock, flags);
        if (!list_empty(&q->zcookie_head)) {
                info = list_entry(q->zcookie_head.next,
                                  struct rds_msg_zcopy_info, rs_zcookie_next);
                list_del(&info->rs_zcookie_next);
        }
        spin_unlock_irqrestore(&q->lock, flags);
        if (!info)
                return false;
        done = &info->zcookies;
        if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done),
                     done)) {
                spin_lock_irqsave(&q->lock, flags);
                list_add(&info->rs_zcookie_next, &q->zcookie_head);
                spin_unlock_irqrestore(&q->lock, flags);
                return false;
        }
        kfree(info);
        return true;
}

int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
                int msg_flags)
{
        struct sock *sk = sock->sk;
        struct rds_sock *rs = rds_sk_to_rs(sk);
        long timeo;
        int ret = 0, nonblock = msg_flags & MSG_DONTWAIT;
        DECLARE_SOCKADDR(struct sockaddr_in6 *, sin6, msg->msg_name);
        DECLARE_SOCKADDR(struct sockaddr_in *, sin, msg->msg_name);
        struct rds_incoming *inc = NULL;

        /* udp_recvmsg()->sock_recvtimeo() gets away without locking too.. */
        timeo = sock_rcvtimeo(sk, nonblock);

        rdsdebug("size %zu flags 0x%x timeo %ld\n", size, msg_flags, timeo);

        if (msg_flags & MSG_OOB)
                goto out;
        if (msg_flags & MSG_ERRQUEUE)
                return sock_recv_errqueue(sk, msg, size, SOL_IP, IP_RECVERR);

        while (1) {
                /* If there are pending notifications, do those - and nothing else */
                if (!list_empty(&rs->rs_notify_queue)) {
                        ret = rds_notify_queue_get(rs, msg);
                        break;
                }

                if (rs->rs_cong_notify) {
                        ret = rds_notify_cong(rs, msg);
                        break;
                }

                if (!rds_next_incoming(rs, &inc)) {
                        if (nonblock) {
                                bool reaped = rds_recvmsg_zcookie(rs, msg);

                                ret = reaped ? 0 : -EAGAIN;
                                break;
                        }

                        timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
                                        (!list_empty(&rs->rs_notify_queue) ||
                                         rs->rs_cong_notify ||
                                         rds_next_incoming(rs, &inc)), timeo);
                        rdsdebug("recvmsg woke inc %p timeo %ld\n", inc,
                                 timeo);
                        if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
                                continue;

                        ret = timeo;
                        if (ret == 0)
                                ret = -ETIMEDOUT;
                        break;
                }

                rdsdebug("copying inc %p from %pI6c:%u to user\n", inc,
                         &inc->i_conn->c_faddr,
                         ntohs(inc->i_hdr.h_sport));
                ret = inc->i_conn->c_trans->inc_copy_to_user(inc, &msg->msg_iter);
                if (ret < 0)
                        break;

                /*
                 * if the message we just copied isn't at the head of the
                 * recv queue then someone else raced us to return it, try
                 * to get the next message.
                 */
                if (!rds_still_queued(rs, inc, !(msg_flags & MSG_PEEK))) {
                        rds_inc_put(inc);
                        inc = NULL;
                        rds_stats_inc(s_recv_deliver_raced);
                        iov_iter_revert(&msg->msg_iter, ret);
                        continue;
                }

                if (ret < be32_to_cpu(inc->i_hdr.h_len)) {
                        if (msg_flags & MSG_TRUNC)
                                ret = be32_to_cpu(inc->i_hdr.h_len);
                        msg->msg_flags |= MSG_TRUNC;
                }

                if (rds_cmsg_recv(inc, msg, rs)) {
                        ret = -EFAULT;
                        goto out;
                }
                rds_recvmsg_zcookie(rs, msg);

                rds_stats_inc(s_recv_delivered);

                if (msg->msg_name) {
                        if (ipv6_addr_v4mapped(&inc->i_saddr)) {
                                sin = (struct sockaddr_in *)msg->msg_name;

                                sin->sin_family = AF_INET;
                                sin->sin_port = inc->i_hdr.h_sport;
                                sin->sin_addr.s_addr =
                                        inc->i_saddr.s6_addr32[3];
                                memset(sin->sin_zero, 0, sizeof(sin->sin_zero));
                                msg->msg_namelen = sizeof(*sin);
                        } else {
                                sin6 = (struct sockaddr_in6 *)msg->msg_name;

                                sin6->sin6_family = AF_INET6;
                                sin6->sin6_port = inc->i_hdr.h_sport;
                                sin6->sin6_addr = inc->i_saddr;
                                sin6->sin6_flowinfo = 0;
                                sin6->sin6_scope_id = rs->rs_bound_scope_id;
                                msg->msg_namelen = sizeof(*sin6);
                        }
                }
                break;
        }

        if (inc)
                rds_inc_put(inc);

out:
        return ret;
}

/*
 * The socket is being shut down and we're asked to drop messages that were
 * queued for recvmsg.  The caller has unbound the socket so the receive path
 * won't queue any more incoming fragments or messages on the socket.
 */
void rds_clear_recv_queue(struct rds_sock *rs)
{
        struct sock *sk = rds_rs_to_sk(rs);
        struct rds_incoming *inc, *tmp;
        unsigned long flags;

        write_lock_irqsave(&rs->rs_recv_lock, flags);
        list_for_each_entry_safe(inc, tmp, &rs->rs_recv_queue, i_item) {
                rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
                                      -be32_to_cpu(inc->i_hdr.h_len),
                                      inc->i_hdr.h_dport);
                list_del_init(&inc->i_item);
                rds_inc_put(inc);
        }
        write_unlock_irqrestore(&rs->rs_recv_lock, flags);
}

/*
 * inc->i_saddr isn't used here because it is only set in the receive
 * path.
 */
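/*
 * Fill in one struct rds_info_message for this incoming message; the
 * flip argument controls which of saddr/daddr (and sport/dport) are
 * reported as the local end and which as the foreign end.
 */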
void rds_inc_info_copy(struct rds_incoming *inc,
                       struct rds_info_iterator *iter,
                       __be32 saddr, __be32 daddr, int flip)
{
        struct rds_info_message minfo;

        minfo.seq = be64_to_cpu(inc->i_hdr.h_sequence);
        minfo.len = be32_to_cpu(inc->i_hdr.h_len);

        if (flip) {
                minfo.laddr = daddr;
                minfo.faddr = saddr;
                minfo.lport = inc->i_hdr.h_dport;
                minfo.fport = inc->i_hdr.h_sport;
        } else {
                minfo.laddr = saddr;
                minfo.faddr = daddr;
                minfo.lport = inc->i_hdr.h_sport;
                minfo.fport = inc->i_hdr.h_dport;
        }

        minfo.flags = 0;

        rds_info_copy(iter, &minfo, sizeof(minfo));
}

#if IS_ENABLED(CONFIG_IPV6)
void rds6_inc_info_copy(struct rds_incoming *inc,
                        struct rds_info_iterator *iter,
                        struct in6_addr *saddr, struct in6_addr *daddr,
                        int flip)
{
        struct rds6_info_message minfo6;

        minfo6.seq = be64_to_cpu(inc->i_hdr.h_sequence);
        minfo6.len = be32_to_cpu(inc->i_hdr.h_len);

        if (flip) {
                minfo6.laddr = *daddr;
                minfo6.faddr = *saddr;
                minfo6.lport = inc->i_hdr.h_dport;
                minfo6.fport = inc->i_hdr.h_sport;
        } else {
                minfo6.laddr = *saddr;
                minfo6.faddr = *daddr;
                minfo6.lport = inc->i_hdr.h_sport;
                minfo6.fport = inc->i_hdr.h_dport;
        }

        rds_info_copy(iter, &minfo6, sizeof(minfo6));
}
#endif