/*
 * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/moduleparam.h>
#include <linux/gfp.h>
#include <net/sock.h>
#include <linux/in.h>
#include <linux/list.h>
#include <linux/ratelimit.h>
#include <linux/export.h>
#include <linux/sizes.h>

#include "rds.h"

/* When transmitting messages in rds_send_xmit, we need to emerge from
 * time to time and briefly release the CPU. Otherwise the soft-lockup
 * watchdog will kick our shin.
 * Also, it seems fairer to not let one busy connection stall all the
 * others.
 *
 * send_batch_count is the number of times we'll loop in send_xmit. Setting
 * it to 0 will restore the old behavior (where we looped until we had
 * drained the queue).
 */
static int send_batch_count = SZ_1K;
module_param(send_batch_count, int, 0444);
MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
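/* Since the parameter is 0444 (read-only once loaded), it is set at module
 * load time; e.g., when RDS is built as a module (illustrative value):
 *
 *	modprobe rds send_batch_count=2048
 */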
static void rds_send_remove_from_sock(struct list_head *messages, int status);

/*
 * Reset the send state.  Callers must ensure that this doesn't race with
 * rds_send_xmit().
 */
void rds_send_path_reset(struct rds_conn_path *cp)
{
	struct rds_message *rm, *tmp;
	unsigned long flags;

	if (cp->cp_xmit_rm) {
		rm = cp->cp_xmit_rm;
		cp->cp_xmit_rm = NULL;
		/* Tell the user the RDMA op is no longer mapped by the
		 * transport. This isn't entirely true (it's flushed out
		 * independently) but as the connection is down, there's
		 * no ongoing RDMA to/from that memory */
		rds_message_unmapped(rm);
		rds_message_put(rm);
	}

	cp->cp_xmit_sg = 0;
	cp->cp_xmit_hdr_off = 0;
	cp->cp_xmit_data_off = 0;
	cp->cp_xmit_atomic_sent = 0;
	cp->cp_xmit_rdma_sent = 0;
	cp->cp_xmit_data_sent = 0;

	cp->cp_conn->c_map_queued = 0;

	cp->cp_unacked_packets = rds_sysctl_max_unacked_packets;
	cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes;

	/* Mark messages as retransmissions, and move them to the send q */
	spin_lock_irqsave(&cp->cp_lock, flags);
	list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
		set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
		set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
	}
	list_splice_init(&cp->cp_retrans, &cp->cp_send_queue);
	spin_unlock_irqrestore(&cp->cp_lock, flags);
}
EXPORT_SYMBOL_GPL(rds_send_path_reset);

static int acquire_in_xmit(struct rds_conn_path *cp)
{
	return test_and_set_bit_lock(RDS_IN_XMIT, &cp->cp_flags) == 0;
}

static void release_in_xmit(struct rds_conn_path *cp)
{
	clear_bit_unlock(RDS_IN_XMIT, &cp->cp_flags);
	/*
	 * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
	 * hot path and finding waiters is very rare.  We don't want to walk
	 * the system-wide hashed waitqueue buckets in the fast path only to
	 * almost never find waiters.
	 */
	if (waitqueue_active(&cp->cp_waitq))
		wake_up_all(&cp->cp_waitq);
}
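/* A sketch of the waiter these wakeups pair with: a sleeper (e.g. the
 * connection shutdown path) waits for RDS_IN_XMIT to clear with something
 * like
 *
 *	wait_event(cp->cp_waitq,
 *		   !test_bit(RDS_IN_XMIT, &cp->cp_flags));
 *
 * which is why release_in_xmit() only needs the cheap waitqueue_active()
 * test here.
 */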
/*
 * We're making the conscious trade-off here to only send one message
 * down the connection at a time.
 * Pro:
 *      - tx queueing is a simple fifo list
 *      - reassembly is optional and easily done by transports per conn
 *      - no per flow rx lookup at all, straight to the socket
 *      - less per-frag memory and wire overhead
 * Con:
 *      - queued acks can be delayed behind large messages
 * Depends:
 *      - small message latency is higher behind queued large messages
 *      - large message latency isn't starved by intervening small sends
 */
int rds_send_xmit(struct rds_conn_path *cp)
{
	struct rds_connection *conn = cp->cp_conn;
	struct rds_message *rm;
	unsigned long flags;
	unsigned int tmp;
	struct scatterlist *sg;
	int ret = 0;
	LIST_HEAD(to_be_dropped);
	int batch_count;
	unsigned long send_gen = 0;
	int same_rm = 0;

restart:
	batch_count = 0;

	/*
	 * sendmsg calls here after having queued its message on the send
	 * queue.  We only have one task feeding the connection at a time.  If
	 * another thread is already feeding the queue then we back off.  This
	 * avoids blocking the caller and trading per-connection data between
	 * caches per message.
	 */
	if (!acquire_in_xmit(cp)) {
		rds_stats_inc(s_send_lock_contention);
		ret = -ENOMEM;
		goto out;
	}

	if (rds_destroy_pending(cp->cp_conn)) {
		release_in_xmit(cp);
		ret = -ENETUNREACH; /* don't requeue send work */
		goto out;
	}

	/*
	 * We record the send generation after doing the xmit acquire.
	 * If someone else manages to jump in and do some work, we'll use
	 * this to avoid a goto restart farther down.
	 *
	 * The acquire_in_xmit() check above ensures that only one
	 * caller can increment c_send_gen at any time.
	 */
	send_gen = READ_ONCE(cp->cp_send_gen) + 1;
	WRITE_ONCE(cp->cp_send_gen, send_gen);

	/*
	 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
	 * we do the opposite to avoid races.
	 */
	if (!rds_conn_path_up(cp)) {
		release_in_xmit(cp);
		ret = 0;
		goto out;
	}

	if (conn->c_trans->xmit_path_prepare)
		conn->c_trans->xmit_path_prepare(cp);

	/*
	 * spin trying to push headers and data down the connection until
	 * the connection doesn't make forward progress.
	 */
	while (1) {

		rm = cp->cp_xmit_rm;

		if (!rm) {
			same_rm = 0;
		} else {
			same_rm++;
			if (same_rm >= 4096) {
				rds_stats_inc(s_send_stuck_rm);
				ret = -EAGAIN;
				break;
			}
		}

		/*
		 * If between sending messages, we can send a pending congestion
		 * map update.
		 */
		if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
			rm = rds_cong_update_alloc(conn);
			if (IS_ERR(rm)) {
				ret = PTR_ERR(rm);
				break;
			}
			rm->data.op_active = 1;
			rm->m_inc.i_conn_path = cp;
			rm->m_inc.i_conn = cp->cp_conn;

			cp->cp_xmit_rm = rm;
		}

		/*
		 * If not already working on one, grab the next message.
		 *
		 * cp_xmit_rm holds a ref while we're sending this message down
		 * the connection.  We can use this ref while holding the
		 * send_sem; rds_send_reset() is serialized with it.
		 */
		if (!rm) {
			unsigned int len;

			batch_count++;

			/* we want to process as big a batch as we can, but
			 * we also want to avoid softlockups.  If we've been
			 * through a lot of messages, let's back off and see
			 * if anyone else jumps in
			 */
			if (batch_count >= send_batch_count)
				goto over_batch;

			spin_lock_irqsave(&cp->cp_lock, flags);

			if (!list_empty(&cp->cp_send_queue)) {
				rm = list_entry(cp->cp_send_queue.next,
						struct rds_message,
						m_conn_item);
				rds_message_addref(rm);

				/*
				 * Move the message from the send queue to the retransmit
				 * list right away.
				 */
				list_move_tail(&rm->m_conn_item,
					       &cp->cp_retrans);
			}

			spin_unlock_irqrestore(&cp->cp_lock, flags);

			if (!rm)
				break;

			/* Unfortunately, the way Infiniband deals with
			 * RDMA to a bad MR key is by moving the entire
			 * queue pair to error state. We could possibly
			 * recover from that, but right now we drop the
			 * connection.
			 * Therefore, we never retransmit messages with RDMA ops.
			 */
			if (test_bit(RDS_MSG_FLUSH, &rm->m_flags) ||
			    (rm->rdma.op_active &&
			     test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))) {
				spin_lock_irqsave(&cp->cp_lock, flags);
				if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
					list_move(&rm->m_conn_item, &to_be_dropped);
				spin_unlock_irqrestore(&cp->cp_lock, flags);
				continue;
			}

			/* Require an ACK every once in a while */
			len = ntohl(rm->m_inc.i_hdr.h_len);
			if (cp->cp_unacked_packets == 0 ||
			    cp->cp_unacked_bytes < len) {
				set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);

				cp->cp_unacked_packets =
					rds_sysctl_max_unacked_packets;
				cp->cp_unacked_bytes =
					rds_sysctl_max_unacked_bytes;
				rds_stats_inc(s_send_ack_required);
			} else {
				cp->cp_unacked_bytes -= len;
				cp->cp_unacked_packets--;
			}

			cp->cp_xmit_rm = rm;
		}
		/* The transport either sends the whole rdma or none of it */
		if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) {
			rm->m_final_op = &rm->rdma;
			/* The transport owns the mapped memory for now.
			 * You can't unmap it while it's on the send queue
			 */
			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
			ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
			if (ret) {
				clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
				wake_up_interruptible(&rm->m_flush_wait);
				break;
			}
			cp->cp_xmit_rdma_sent = 1;

		}

		if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) {
			rm->m_final_op = &rm->atomic;
			/* The transport owns the mapped memory for now.
			 * You can't unmap it while it's on the send queue
			 */
			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
			ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
			if (ret) {
				clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
				wake_up_interruptible(&rm->m_flush_wait);
				break;
			}
			cp->cp_xmit_atomic_sent = 1;

		}

		/*
		 * A number of cases require an RDS header to be sent
		 * even if there is no data.
		 * We permit 0-byte sends; rds-ping depends on this.
		 * However, if there are exclusively attached silent ops,
		 * we skip the hdr/data send, to enable silent operation.
		 */
		if (rm->data.op_nents == 0) {
			int ops_present;
			int all_ops_are_silent = 1;

			ops_present = (rm->atomic.op_active || rm->rdma.op_active);
			if (rm->atomic.op_active && !rm->atomic.op_silent)
				all_ops_are_silent = 0;
			if (rm->rdma.op_active && !rm->rdma.op_silent)
				all_ops_are_silent = 0;

			if (ops_present && all_ops_are_silent
			    && !rm->m_rdma_cookie)
				rm->data.op_active = 0;
		}
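		/* c_trans->xmit() below returns the number of bytes it
		 * consumed.  Those bytes are credited first against the
		 * header (cp_xmit_hdr_off) and then against the data
		 * scatterlist (cp_xmit_sg/cp_xmit_data_off), so a partial
		 * send resumes exactly where it left off on the next pass.
		 */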
		if (rm->data.op_active && !cp->cp_xmit_data_sent) {
			rm->m_final_op = &rm->data;

			ret = conn->c_trans->xmit(conn, rm,
						  cp->cp_xmit_hdr_off,
						  cp->cp_xmit_sg,
						  cp->cp_xmit_data_off);
			if (ret <= 0)
				break;

			if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) {
				tmp = min_t(int, ret,
					    sizeof(struct rds_header) -
					    cp->cp_xmit_hdr_off);
				cp->cp_xmit_hdr_off += tmp;
				ret -= tmp;
			}

			sg = &rm->data.op_sg[cp->cp_xmit_sg];
			while (ret) {
				tmp = min_t(int, ret, sg->length -
						      cp->cp_xmit_data_off);
				cp->cp_xmit_data_off += tmp;
				ret -= tmp;
				if (cp->cp_xmit_data_off == sg->length) {
					cp->cp_xmit_data_off = 0;
					sg++;
					cp->cp_xmit_sg++;
					BUG_ON(ret != 0 && cp->cp_xmit_sg ==
					       rm->data.op_nents);
				}
			}

			if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) &&
			    (cp->cp_xmit_sg == rm->data.op_nents))
				cp->cp_xmit_data_sent = 1;
		}

		/*
		 * An rm will only make multiple passes through this loop
		 * if there is a data op.  Thus, if the data is sent (or there
		 * was none), then we're done with the rm.
		 */
		if (!rm->data.op_active || cp->cp_xmit_data_sent) {
			cp->cp_xmit_rm = NULL;
			cp->cp_xmit_sg = 0;
			cp->cp_xmit_hdr_off = 0;
			cp->cp_xmit_data_off = 0;
			cp->cp_xmit_rdma_sent = 0;
			cp->cp_xmit_atomic_sent = 0;
			cp->cp_xmit_data_sent = 0;

			rds_message_put(rm);
		}
	}

over_batch:
	if (conn->c_trans->xmit_path_complete)
		conn->c_trans->xmit_path_complete(cp);
	release_in_xmit(cp);

	/* Nuke any messages we decided not to retransmit. */
	if (!list_empty(&to_be_dropped)) {
		/* irqs on here, so we can put(), unlike above */
		list_for_each_entry(rm, &to_be_dropped, m_conn_item)
			rds_message_put(rm);
		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
	}
	/*
	 * Other senders can queue a message after we last test the send queue
	 * but before we clear RDS_IN_XMIT.  In that case they'd back off and
	 * not try and send their newly queued message.  We need to check the
	 * send queue after having cleared RDS_IN_XMIT so that their message
	 * doesn't get stuck on the send queue.
	 *
	 * If the transport cannot continue (i.e., ret != 0), then it must
	 * call us when more room is available, such as from the tx
	 * completion handler.
	 *
	 * We have an extra generation check here so that if someone manages
	 * to jump in after our release_in_xmit, we'll see that they have done
	 * some work and we will skip our goto.
	 */
	if (ret == 0) {
		bool raced;

		smp_mb();
		raced = send_gen != READ_ONCE(cp->cp_send_gen);

		if ((test_bit(0, &conn->c_map_queued) ||
		     !list_empty(&cp->cp_send_queue)) && !raced) {
			if (batch_count < send_batch_count)
				goto restart;
			rcu_read_lock();
			if (rds_destroy_pending(cp->cp_conn))
				ret = -ENETUNREACH;
			else
				queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
			rcu_read_unlock();
		} else if (raced) {
			rds_stats_inc(s_send_lock_queue_raced);
		}
	}
out:
	return ret;
}
EXPORT_SYMBOL_GPL(rds_send_xmit);
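/* A sketch of the caller side of that contract (see rds_sendmsg() later in
 * this file for the real thing): -ENOMEM and -EAGAIN both mean "try again
 * soon", so callers typically requeue the send worker rather than fail:
 *
 *	ret = rds_send_xmit(cp);
 *	if (ret == -ENOMEM || ret == -EAGAIN)
 *		queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
 */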
static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
{
	u32 len = be32_to_cpu(rm->m_inc.i_hdr.h_len);

	assert_spin_locked(&rs->rs_lock);

	BUG_ON(rs->rs_snd_bytes < len);
	rs->rs_snd_bytes -= len;

	if (rs->rs_snd_bytes == 0)
		rds_stats_inc(s_send_queue_empty);
}

static inline int rds_send_is_acked(struct rds_message *rm, u64 ack,
				    is_acked_func is_acked)
{
	if (is_acked)
		return is_acked(rm, ack);
	return be64_to_cpu(rm->m_inc.i_hdr.h_sequence) <= ack;
}

/*
 * This is pretty similar to what happens below in the ACK
 * handling code - except that we call here as soon as we get
 * the IB send completion on the RDMA op and the accompanying
 * message.
 */
void rds_rdma_send_complete(struct rds_message *rm, int status)
{
	struct rds_sock *rs = NULL;
	struct rm_rdma_op *ro;
	struct rds_notifier *notifier;
	unsigned long flags;

	spin_lock_irqsave(&rm->m_rs_lock, flags);

	ro = &rm->rdma;
	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
	    ro->op_active && ro->op_notify && ro->op_notifier) {
		notifier = ro->op_notifier;
		rs = rm->m_rs;
		sock_hold(rds_rs_to_sk(rs));

		notifier->n_status = status;
		spin_lock(&rs->rs_lock);
		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
		spin_unlock(&rs->rs_lock);

		ro->op_notifier = NULL;
	}

	spin_unlock_irqrestore(&rm->m_rs_lock, flags);

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}
EXPORT_SYMBOL_GPL(rds_rdma_send_complete);

/*
 * Just like above, except looks at atomic op
 */
void rds_atomic_send_complete(struct rds_message *rm, int status)
{
	struct rds_sock *rs = NULL;
	struct rm_atomic_op *ao;
	struct rds_notifier *notifier;
	unsigned long flags;

	spin_lock_irqsave(&rm->m_rs_lock, flags);

	ao = &rm->atomic;
	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
	    && ao->op_active && ao->op_notify && ao->op_notifier) {
		notifier = ao->op_notifier;
		rs = rm->m_rs;
		sock_hold(rds_rs_to_sk(rs));

		notifier->n_status = status;
		spin_lock(&rs->rs_lock);
		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
		spin_unlock(&rs->rs_lock);

		ao->op_notifier = NULL;
	}

	spin_unlock_irqrestore(&rm->m_rs_lock, flags);

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}
EXPORT_SYMBOL_GPL(rds_atomic_send_complete);

/*
 * This is the same as rds_rdma_send_complete except we
 * don't do any locking - we have all the ingredients (message,
 * socket, socket lock) and can just move the notifier.
 */
static inline void
__rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
{
	struct rm_rdma_op *ro;
	struct rm_atomic_op *ao;

	ro = &rm->rdma;
	if (ro->op_active && ro->op_notify && ro->op_notifier) {
		ro->op_notifier->n_status = status;
		list_add_tail(&ro->op_notifier->n_list, &rs->rs_notify_queue);
		ro->op_notifier = NULL;
	}

	ao = &rm->atomic;
	if (ao->op_active && ao->op_notify && ao->op_notifier) {
		ao->op_notifier->n_status = status;
		list_add_tail(&ao->op_notifier->n_list, &rs->rs_notify_queue);
		ao->op_notifier = NULL;
	}

	/* No need to wake the app - caller does this */
}
/*
 * This removes messages from the socket's list if they're on it.  The list
 * argument must be private to the caller, we must be able to modify it
 * without locks.  The messages must have a reference held for their
 * position on the list.  This function will drop that reference after
 * removing the messages from the 'messages' list regardless of if it found
 * the messages on the socket list or not.
 */
static void rds_send_remove_from_sock(struct list_head *messages, int status)
{
	unsigned long flags;
	struct rds_sock *rs = NULL;
	struct rds_message *rm;

	while (!list_empty(messages)) {
		int was_on_sock = 0;

		rm = list_entry(messages->next, struct rds_message,
				m_conn_item);
		list_del_init(&rm->m_conn_item);

		/*
		 * If we see this flag cleared then we're *sure* that someone
		 * else beat us to removing it from the sock.  If we race
		 * with their flag update we'll get the lock and then really
		 * see that the flag has been cleared.
		 *
		 * The message spinlock makes sure nobody clears rm->m_rs
		 * while we're messing with it. It does not prevent the
		 * message from being removed from the socket, though.
		 */
		spin_lock_irqsave(&rm->m_rs_lock, flags);
		if (!test_bit(RDS_MSG_ON_SOCK, &rm->m_flags))
			goto unlock_and_drop;

		if (rs != rm->m_rs) {
			if (rs) {
				rds_wake_sk_sleep(rs);
				sock_put(rds_rs_to_sk(rs));
			}
			rs = rm->m_rs;
			if (rs)
				sock_hold(rds_rs_to_sk(rs));
		}
		if (!rs)
			goto unlock_and_drop;
		spin_lock(&rs->rs_lock);

		if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
			struct rm_rdma_op *ro = &rm->rdma;
			struct rds_notifier *notifier;

			list_del_init(&rm->m_sock_item);
			rds_send_sndbuf_remove(rs, rm);

			if (ro->op_active && ro->op_notifier &&
			    (ro->op_notify || (ro->op_recverr && status))) {
				notifier = ro->op_notifier;
				list_add_tail(&notifier->n_list,
					      &rs->rs_notify_queue);
				if (!notifier->n_status)
					notifier->n_status = status;
				rm->rdma.op_notifier = NULL;
			}
			was_on_sock = 1;
		}
		spin_unlock(&rs->rs_lock);

unlock_and_drop:
		spin_unlock_irqrestore(&rm->m_rs_lock, flags);
		rds_message_put(rm);
		if (was_on_sock)
			rds_message_put(rm);
	}

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}

/*
 * Transports call here when they've determined that the receiver queued
 * messages up to, and including, the given sequence number.  Messages are
 * moved to the retrans queue when rds_send_xmit picks them off the send
 * queue. This means that in the TCP case, the message may not have been
 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
 * checks the RDS_MSG_HAS_ACK_SEQ bit.
 */
void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack,
			      is_acked_func is_acked)
{
	struct rds_message *rm, *tmp;
	unsigned long flags;
	LIST_HEAD(list);

	spin_lock_irqsave(&cp->cp_lock, flags);

	list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
		if (!rds_send_is_acked(rm, ack, is_acked))
			break;

		list_move(&rm->m_conn_item, &list);
		clear_bit(RDS_MSG_ON_CONN, &rm->m_flags);
	}

	/* order flag updates with spin locks */
	if (!list_empty(&list))
		smp_mb__after_atomic();

	spin_unlock_irqrestore(&cp->cp_lock, flags);

	/* now remove the messages from the sock list as needed */
	rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
}
EXPORT_SYMBOL_GPL(rds_send_path_drop_acked);

void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
			 is_acked_func is_acked)
{
	WARN_ON(conn->c_trans->t_mp_capable);
	rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked);
}
EXPORT_SYMBOL_GPL(rds_send_drop_acked);

void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
{
	struct rds_message *rm, *tmp;
	struct rds_connection *conn;
	struct rds_conn_path *cp;
	unsigned long flags;
	LIST_HEAD(list);

	/* get all the messages we're dropping under the rs lock */
	spin_lock_irqsave(&rs->rs_lock, flags);

	list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) {
		if (dest &&
		    (!ipv6_addr_equal(&dest->sin6_addr, &rm->m_daddr) ||
		     dest->sin6_port != rm->m_inc.i_hdr.h_dport))
			continue;

		list_move(&rm->m_sock_item, &list);
		rds_send_sndbuf_remove(rs, rm);
		clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
	}

	/* order flag updates with the rs lock */
	smp_mb__after_atomic();

	spin_unlock_irqrestore(&rs->rs_lock, flags);

	if (list_empty(&list))
		return;

	/* Remove the messages from the conn */
	list_for_each_entry(rm, &list, m_sock_item) {

		conn = rm->m_inc.i_conn;
		if (conn->c_trans->t_mp_capable)
			cp = rm->m_inc.i_conn_path;
		else
			cp = &conn->c_path[0];

		spin_lock_irqsave(&cp->cp_lock, flags);
		/*
		 * Maybe someone else beat us to removing rm from the conn.
		 * If we race with their flag update we'll get the lock and
		 * then really see that the flag has been cleared.
		 */
		if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
			spin_unlock_irqrestore(&cp->cp_lock, flags);
			continue;
		}
		list_del_init(&rm->m_conn_item);
		spin_unlock_irqrestore(&cp->cp_lock, flags);

		/*
		 * Couldn't grab m_rs_lock in top loop (lock ordering),
		 * but we can now.
		 */
		spin_lock_irqsave(&rm->m_rs_lock, flags);

		spin_lock(&rs->rs_lock);
		__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
		spin_unlock(&rs->rs_lock);

		spin_unlock_irqrestore(&rm->m_rs_lock, flags);

		rds_message_put(rm);
	}

	rds_wake_sk_sleep(rs);

	while (!list_empty(&list)) {
		rm = list_entry(list.next, struct rds_message, m_sock_item);
		list_del_init(&rm->m_sock_item);
		rds_message_wait(rm);

		/* just in case the code above skipped this message
		 * because RDS_MSG_ON_CONN wasn't set, run it again here.
		 * Taking m_rs_lock is the only thing that keeps us
		 * from racing with ack processing.
		 */
		spin_lock_irqsave(&rm->m_rs_lock, flags);

		spin_lock(&rs->rs_lock);
		__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
		spin_unlock(&rs->rs_lock);

		spin_unlock_irqrestore(&rm->m_rs_lock, flags);

		rds_message_put(rm);
	}
}
/*
 * we only want this to fire once so we use the callers 'queued'.  It's
 * possible that another thread can race with us and remove the
 * message from the flow with RDS_CANCEL_SENT_TO.
 */
static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
			     struct rds_conn_path *cp,
			     struct rds_message *rm, __be16 sport,
			     __be16 dport, int *queued)
{
	unsigned long flags;
	u32 len;

	if (*queued)
		goto out;

	len = be32_to_cpu(rm->m_inc.i_hdr.h_len);

	/* this is the only place which holds both the socket's rs_lock
	 * and the connection's c_lock */
	spin_lock_irqsave(&rs->rs_lock, flags);

	/*
	 * If there is only a little space left in sndbuf, we don't queue
	 * anything, and userspace gets -EAGAIN. But poll() indicates there's
	 * send room. This can lead to bad behavior (spinning) if snd_bytes
	 * isn't freed up by incoming acks. So we check the *old* value of
	 * rs_snd_bytes here to allow the last msg to exceed the buffer,
	 * and poll() now knows no more data can be sent.
	 */
	if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) {
		rs->rs_snd_bytes += len;

		/* let recv side know we are close to send space exhaustion.
		 * This is probably not the optimal way to do it, as this
		 * means we set the flag on *all* messages as soon as our
		 * throughput hits a certain threshold.
		 */
		if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2)
			set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);

		list_add_tail(&rm->m_sock_item, &rs->rs_send_queue);
		set_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
		rds_message_addref(rm);
		sock_hold(rds_rs_to_sk(rs));
		rm->m_rs = rs;

		/* The code ordering is a little weird, but we're
		   trying to minimize the time we hold c_lock */
		rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0);
		rm->m_inc.i_conn = conn;
		rm->m_inc.i_conn_path = cp;
		rds_message_addref(rm);

		spin_lock(&cp->cp_lock);
		rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++);
		list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
		set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
		spin_unlock(&cp->cp_lock);

		rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
			 rm, len, rs, rs->rs_snd_bytes,
			 (unsigned long long)be64_to_cpu(rm->m_inc.i_hdr.h_sequence));

		*queued = 1;
	}

	spin_unlock_irqrestore(&rs->rs_lock, flags);
out:
	return *queued;
}
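/* Note on the two rds_message_addref() calls above: one reference accounts
 * for the message's place on the socket's rs_send_queue and the other for
 * its place on the connection path's cp_send_queue; each is dropped as the
 * message leaves the corresponding list (see rds_send_remove_from_sock()
 * and rds_send_xmit()).
 */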
/*
 * rds_message is getting to be quite complicated, and we'd like to allocate
 * it all in one go. This figures out how big it needs to be up front.
 */
static int rds_rm_size(struct msghdr *msg, int num_sgs,
		       struct rds_iov_vector_arr *vct)
{
	struct cmsghdr *cmsg;
	int size = 0;
	int cmsg_groups = 0;
	int retval;
	bool zcopy_cookie = false;
	struct rds_iov_vector *iov, *tmp_iov;

	if (num_sgs < 0)
		return -EINVAL;

	for_each_cmsghdr(cmsg, msg) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		switch (cmsg->cmsg_type) {
		case RDS_CMSG_RDMA_ARGS:
			if (vct->indx >= vct->len) {
				vct->len += vct->incr;
				tmp_iov =
					krealloc(vct->vec,
						 vct->len *
						 sizeof(struct rds_iov_vector),
						 GFP_KERNEL);
				if (!tmp_iov) {
					vct->len -= vct->incr;
					return -ENOMEM;
				}
				vct->vec = tmp_iov;
			}
			iov = &vct->vec[vct->indx];
			memset(iov, 0, sizeof(struct rds_iov_vector));
			vct->indx++;
			cmsg_groups |= 1;
			retval = rds_rdma_extra_size(CMSG_DATA(cmsg), iov);
			if (retval < 0)
				return retval;
			size += retval;

			break;

		case RDS_CMSG_ZCOPY_COOKIE:
			zcopy_cookie = true;
			fallthrough;

		case RDS_CMSG_RDMA_DEST:
		case RDS_CMSG_RDMA_MAP:
			cmsg_groups |= 2;
			/* these are valid but do not add any size */
			break;

		case RDS_CMSG_ATOMIC_CSWP:
		case RDS_CMSG_ATOMIC_FADD:
		case RDS_CMSG_MASKED_ATOMIC_CSWP:
		case RDS_CMSG_MASKED_ATOMIC_FADD:
			cmsg_groups |= 1;
			size += sizeof(struct scatterlist);
			break;

		default:
			return -EINVAL;
		}

	}

	if ((msg->msg_flags & MSG_ZEROCOPY) && !zcopy_cookie)
		return -EINVAL;

	size += num_sgs * sizeof(struct scatterlist);

	/* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
	if (cmsg_groups == 3)
		return -EINVAL;

	return size;
}
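/* Worked example of the cmsg_groups check above: group 1 covers
 * RDS_CMSG_RDMA_ARGS and the atomic ops, group 2 covers RDS_CMSG_RDMA_DEST,
 * RDS_CMSG_RDMA_MAP and RDS_CMSG_ZCOPY_COOKIE.  A sendmsg() that combines,
 * say, RDS_CMSG_RDMA_ARGS with RDS_CMSG_RDMA_MAP sets cmsg_groups to 3 and
 * is rejected with -EINVAL.
 */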
static int rds_cmsg_zcopy(struct rds_sock *rs, struct rds_message *rm,
			  struct cmsghdr *cmsg)
{
	u32 *cookie;

	if (cmsg->cmsg_len < CMSG_LEN(sizeof(*cookie)) ||
	    !rm->data.op_mmp_znotifier)
		return -EINVAL;
	cookie = CMSG_DATA(cmsg);
	rm->data.op_mmp_znotifier->z_cookie = *cookie;
	return 0;
}

static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
			 struct msghdr *msg, int *allocated_mr,
			 struct rds_iov_vector_arr *vct)
{
	struct cmsghdr *cmsg;
	int ret = 0, ind = 0;

	for_each_cmsghdr(cmsg, msg) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		/* As a side effect, RDMA_DEST and RDMA_MAP will set
		 * rm->rdma.m_rdma_cookie and rm->rdma.m_rdma_mr.
		 */
		switch (cmsg->cmsg_type) {
		case RDS_CMSG_RDMA_ARGS:
			if (ind >= vct->indx)
				return -ENOMEM;
			ret = rds_cmsg_rdma_args(rs, rm, cmsg, &vct->vec[ind]);
			ind++;
			break;

		case RDS_CMSG_RDMA_DEST:
			ret = rds_cmsg_rdma_dest(rs, rm, cmsg);
			break;

		case RDS_CMSG_RDMA_MAP:
			ret = rds_cmsg_rdma_map(rs, rm, cmsg);
			if (!ret)
				*allocated_mr = 1;
			else if (ret == -ENODEV)
				/* Accommodate the get_mr() case which can fail
				 * if connection isn't established yet.
				 */
				ret = -EAGAIN;
			break;
		case RDS_CMSG_ATOMIC_CSWP:
		case RDS_CMSG_ATOMIC_FADD:
		case RDS_CMSG_MASKED_ATOMIC_CSWP:
		case RDS_CMSG_MASKED_ATOMIC_FADD:
			ret = rds_cmsg_atomic(rs, rm, cmsg);
			break;

		case RDS_CMSG_ZCOPY_COOKIE:
			ret = rds_cmsg_zcopy(rs, rm, cmsg);
			break;

		default:
			return -EINVAL;
		}

		if (ret)
			break;
	}

	return ret;
}
static int rds_send_mprds_hash(struct rds_sock *rs,
			       struct rds_connection *conn, int nonblock)
{
	int hash;

	if (conn->c_npaths == 0)
		hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS);
	else
		hash = RDS_MPATH_HASH(rs, conn->c_npaths);
	if (conn->c_npaths == 0 && hash != 0) {
		rds_send_ping(conn, 0);

		/* The underlying connection is not up yet.  Need to wait
		 * until it is up to be sure that the non-zero c_path can be
		 * used.  But if we are interrupted, we have to use the zero
		 * c_path in case the connection ends up being non-MP capable.
		 */
		if (conn->c_npaths == 0) {
			/* Cannot wait for the connection to be made, so just
			 * use the base c_path.
			 */
			if (nonblock)
				return 0;
			if (wait_event_interruptible(conn->c_hs_waitq,
						     conn->c_npaths != 0))
				hash = 0;
		}
		if (conn->c_npaths == 1)
			hash = 0;
	}
	return hash;
}

static int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes)
{
	struct rds_rdma_args *args;
	struct cmsghdr *cmsg;

	for_each_cmsghdr(cmsg, msg) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		if (cmsg->cmsg_type == RDS_CMSG_RDMA_ARGS) {
			if (cmsg->cmsg_len <
			    CMSG_LEN(sizeof(struct rds_rdma_args)))
				return -EINVAL;
			args = CMSG_DATA(cmsg);
			*rdma_bytes += args->remote_vec.bytes;
		}
	}
	return 0;
}

int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
{
	struct sock *sk = sock->sk;
	struct rds_sock *rs = rds_sk_to_rs(sk);
	DECLARE_SOCKADDR(struct sockaddr_in6 *, sin6, msg->msg_name);
	DECLARE_SOCKADDR(struct sockaddr_in *, usin, msg->msg_name);
	__be16 dport;
	struct rds_message *rm = NULL;
	struct rds_connection *conn;
	int ret = 0;
	int queued = 0, allocated_mr = 0;
	int nonblock = msg->msg_flags & MSG_DONTWAIT;
	long timeo = sock_sndtimeo(sk, nonblock);
	struct rds_conn_path *cpath;
	struct in6_addr daddr;
	__u32 scope_id = 0;
	size_t rdma_payload_len = 0;
	bool zcopy = ((msg->msg_flags & MSG_ZEROCOPY) &&
		      sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY));
	int num_sgs = DIV_ROUND_UP(payload_len, PAGE_SIZE);
	int namelen;
	struct rds_iov_vector_arr vct;
	int ind;

	memset(&vct, 0, sizeof(vct));

	/* expect 1 RDMA CMSG per rds_sendmsg. Can still grow if more needed. */
	vct.incr = 1;

	/* Mirror the Linux UDP handling of BSD error message compatibility */
	/* XXX: Perhaps MSG_MORE someday */
	if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT | MSG_ZEROCOPY)) {
		ret = -EOPNOTSUPP;
		goto out;
	}

	namelen = msg->msg_namelen;
	if (namelen != 0) {
		if (namelen < sizeof(*usin)) {
			ret = -EINVAL;
			goto out;
		}
		switch (usin->sin_family) {
		case AF_INET:
			if (usin->sin_addr.s_addr == htonl(INADDR_ANY) ||
			    usin->sin_addr.s_addr == htonl(INADDR_BROADCAST) ||
			    ipv4_is_multicast(usin->sin_addr.s_addr)) {
				ret = -EINVAL;
				goto out;
			}
			ipv6_addr_set_v4mapped(usin->sin_addr.s_addr, &daddr);
			dport = usin->sin_port;
			break;

#if IS_ENABLED(CONFIG_IPV6)
		case AF_INET6: {
			int addr_type;

			if (namelen < sizeof(*sin6)) {
				ret = -EINVAL;
				goto out;
			}
			addr_type = ipv6_addr_type(&sin6->sin6_addr);
			if (!(addr_type & IPV6_ADDR_UNICAST)) {
				__be32 addr4;

				if (!(addr_type & IPV6_ADDR_MAPPED)) {
					ret = -EINVAL;
					goto out;
				}

				/* It is a mapped address.  Need to do some
				 * sanity checks.
				 */
				addr4 = sin6->sin6_addr.s6_addr32[3];
				if (addr4 == htonl(INADDR_ANY) ||
				    addr4 == htonl(INADDR_BROADCAST) ||
				    ipv4_is_multicast(addr4)) {
					ret = -EINVAL;
					goto out;
				}
			}
			if (addr_type & IPV6_ADDR_LINKLOCAL) {
				if (sin6->sin6_scope_id == 0) {
					ret = -EINVAL;
					goto out;
				}
				scope_id = sin6->sin6_scope_id;
			}

			daddr = sin6->sin6_addr;
			dport = sin6->sin6_port;
			break;
		}
#endif

		default:
			ret = -EINVAL;
			goto out;
		}
	} else {
		/* We only care about consistency with ->connect() */
		lock_sock(sk);
		daddr = rs->rs_conn_addr;
		dport = rs->rs_conn_port;
		scope_id = rs->rs_bound_scope_id;
		release_sock(sk);
	}

	lock_sock(sk);
	if (ipv6_addr_any(&rs->rs_bound_addr) || ipv6_addr_any(&daddr)) {
		release_sock(sk);
		ret = -ENOTCONN;
		goto out;
	} else if (namelen != 0) {
		/* Cannot send to an IPv4 address using an IPv6 source
		 * address and cannot send to an IPv6 address using an
		 * IPv4 source address.
		 */
		if (ipv6_addr_v4mapped(&daddr) ^
		    ipv6_addr_v4mapped(&rs->rs_bound_addr)) {
			release_sock(sk);
			ret = -EOPNOTSUPP;
			goto out;
		}
		/* If the socket is already bound to a link local address,
		 * it can only send to peers on the same link.  But allow
		 * communicating between link local and non-link local
		 * addresses.
		 */
		if (scope_id != rs->rs_bound_scope_id) {
			if (!scope_id) {
				scope_id = rs->rs_bound_scope_id;
			} else if (rs->rs_bound_scope_id) {
				release_sock(sk);
				ret = -EINVAL;
				goto out;
			}
		}
	}
	release_sock(sk);

	ret = rds_rdma_bytes(msg, &rdma_payload_len);
	if (ret)
		goto out;

	if (max_t(size_t, payload_len, rdma_payload_len) > RDS_MAX_MSG_SIZE) {
		ret = -EMSGSIZE;
		goto out;
	}

	if (payload_len > rds_sk_sndbuf(rs)) {
		ret = -EMSGSIZE;
		goto out;
	}

	if (zcopy) {
		if (rs->rs_transport->t_type != RDS_TRANS_TCP) {
			ret = -EOPNOTSUPP;
			goto out;
		}
		num_sgs = iov_iter_npages(&msg->msg_iter, INT_MAX);
	}
	/* size of rm including all sgs */
	ret = rds_rm_size(msg, num_sgs, &vct);
	if (ret < 0)
		goto out;

	rm = rds_message_alloc(ret, GFP_KERNEL);
	if (!rm) {
		ret = -ENOMEM;
		goto out;
	}

	/* Attach data to the rm */
	if (payload_len) {
		rm->data.op_sg = rds_message_alloc_sgs(rm, num_sgs);
		if (IS_ERR(rm->data.op_sg)) {
			ret = PTR_ERR(rm->data.op_sg);
			goto out;
		}
		ret = rds_message_copy_from_user(rm, &msg->msg_iter, zcopy);
		if (ret)
			goto out;
	}
	rm->data.op_active = 1;

	rm->m_daddr = daddr;

	/* rds_conn_create has a spinlock that runs with IRQ off.
	 * Caching the conn in the socket helps a lot. */
	if (rs->rs_conn && ipv6_addr_equal(&rs->rs_conn->c_faddr, &daddr) &&
	    rs->rs_tos == rs->rs_conn->c_tos) {
		conn = rs->rs_conn;
	} else {
		conn = rds_conn_create_outgoing(sock_net(sock->sk),
						&rs->rs_bound_addr, &daddr,
						rs->rs_transport, rs->rs_tos,
						sock->sk->sk_allocation,
						scope_id);
		if (IS_ERR(conn)) {
			ret = PTR_ERR(conn);
			goto out;
		}
		rs->rs_conn = conn;
	}

	if (conn->c_trans->t_mp_capable)
		cpath = &conn->c_path[rds_send_mprds_hash(rs, conn, nonblock)];
	else
		cpath = &conn->c_path[0];

	rm->m_conn_path = cpath;

	/* Parse any control messages the user may have included. */
	ret = rds_cmsg_send(rs, rm, msg, &allocated_mr, &vct);
	if (ret)
		goto out;

	if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
		printk_ratelimited(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
				   &rm->rdma, conn->c_trans->xmit_rdma);
		ret = -EOPNOTSUPP;
		goto out;
	}

	if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) {
		printk_ratelimited(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n",
				   &rm->atomic, conn->c_trans->xmit_atomic);
		ret = -EOPNOTSUPP;
		goto out;
	}

	if (rds_destroy_pending(conn)) {
		ret = -EAGAIN;
		goto out;
	}

	if (rds_conn_path_down(cpath))
		rds_check_all_paths(conn);

	ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
	if (ret) {
		rs->rs_seen_congestion = 1;
		goto out;
	}
	while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port,
				  dport, &queued)) {
		rds_stats_inc(s_send_queue_full);

		if (nonblock) {
			ret = -EAGAIN;
			goto out;
		}

		timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
					rds_send_queue_rm(rs, conn, cpath, rm,
							  rs->rs_bound_port,
							  dport,
							  &queued),
					timeo);
		rdsdebug("sendmsg woke queued %d timeo %ld\n", queued, timeo);
		if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
			continue;

		ret = timeo;
		if (ret == 0)
			ret = -ETIMEDOUT;
		goto out;
	}

	/*
	 * By now we've committed to the send.  We reuse rds_send_worker()
	 * to retry sends in the rds thread if the transport asks us to.
	 */
	rds_stats_inc(s_send_queued);

	ret = rds_send_xmit(cpath);
	if (ret == -ENOMEM || ret == -EAGAIN) {
		ret = 0;
		rcu_read_lock();
		if (rds_destroy_pending(cpath->cp_conn))
			ret = -ENETUNREACH;
		else
			queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
		rcu_read_unlock();
	}
	if (ret)
		goto out;
	rds_message_put(rm);

	for (ind = 0; ind < vct.indx; ind++)
		kfree(vct.vec[ind].iov);
	kfree(vct.vec);

	return payload_len;

out:
	for (ind = 0; ind < vct.indx; ind++)
		kfree(vct.vec[ind].iov);
	kfree(vct.vec);

	/* If the user included a RDMA_MAP cmsg, we allocated an MR on the fly.
	 * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN
	 * or in any other way, we need to destroy the MR again */
	if (allocated_mr)
		rds_rdma_unuse(rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1);

	if (rm)
		rds_message_put(rm);
	return ret;
}
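/* Illustrative userspace sketch (not part of this file): the simplest
 * sender binds an AF_RDS socket and names the peer on each datagram;
 * laddr/raddr/iov below are placeholders:
 *
 *	int fd = socket(AF_RDS, SOCK_SEQPACKET, 0);
 *	bind(fd, (struct sockaddr *)&laddr, sizeof(laddr));
 *	struct msghdr msg = { .msg_name = &raddr,
 *			      .msg_namelen = sizeof(raddr),
 *			      .msg_iov = &iov, .msg_iovlen = 1 };
 *	sendmsg(fd, &msg, 0);
 *
 * SOL_RDS control messages (RDS_CMSG_RDMA_ARGS and friends) ride in
 * msg.msg_control and are parsed by rds_cmsg_send() above.
 */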
/*
 * send out a probe. Can be shared by rds_send_ping,
 * rds_send_pong, rds_send_hb.
 * rds_send_hb should use h_flags
 *   RDS_FLAG_HB_PING|RDS_FLAG_ACK_REQUIRED
 * or
 *   RDS_FLAG_HB_PONG|RDS_FLAG_ACK_REQUIRED
 */
static int
rds_send_probe(struct rds_conn_path *cp, __be16 sport,
	       __be16 dport, u8 h_flags)
{
	struct rds_message *rm;
	unsigned long flags;
	int ret = 0;

	rm = rds_message_alloc(0, GFP_ATOMIC);
	if (!rm) {
		ret = -ENOMEM;
		goto out;
	}

	rm->m_daddr = cp->cp_conn->c_faddr;
	rm->data.op_active = 1;

	rds_conn_path_connect_if_down(cp);

	ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL);
	if (ret)
		goto out;

	spin_lock_irqsave(&cp->cp_lock, flags);
	list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
	set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
	rds_message_addref(rm);
	rm->m_inc.i_conn = cp->cp_conn;
	rm->m_inc.i_conn_path = cp;

	rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport,
				    cp->cp_next_tx_seq);
	rm->m_inc.i_hdr.h_flags |= h_flags;
	cp->cp_next_tx_seq++;

	if (RDS_HS_PROBE(be16_to_cpu(sport), be16_to_cpu(dport)) &&
	    cp->cp_conn->c_trans->t_mp_capable) {
		u16 npaths = cpu_to_be16(RDS_MPATH_WORKERS);
		u32 my_gen_num = cpu_to_be32(cp->cp_conn->c_my_gen_num);

		rds_message_add_extension(&rm->m_inc.i_hdr,
					  RDS_EXTHDR_NPATHS, &npaths,
					  sizeof(npaths));
		rds_message_add_extension(&rm->m_inc.i_hdr,
					  RDS_EXTHDR_GEN_NUM,
					  &my_gen_num,
					  sizeof(u32));
	}
	spin_unlock_irqrestore(&cp->cp_lock, flags);

	rds_stats_inc(s_send_queued);
	rds_stats_inc(s_send_pong);

	/* schedule the send work on rds_wq */
	rcu_read_lock();
	if (!rds_destroy_pending(cp->cp_conn))
		queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
	rcu_read_unlock();

	rds_message_put(rm);
	return 0;

out:
	if (rm)
		rds_message_put(rm);
	return ret;
}

int
rds_send_pong(struct rds_conn_path *cp, __be16 dport)
{
	return rds_send_probe(cp, 0, dport, 0);
}

void
rds_send_ping(struct rds_connection *conn, int cp_index)
{
	unsigned long flags;
	struct rds_conn_path *cp = &conn->c_path[cp_index];

	spin_lock_irqsave(&cp->cp_lock, flags);
	if (conn->c_ping_triggered) {
		spin_unlock_irqrestore(&cp->cp_lock, flags);
		return;
	}
	conn->c_ping_triggered = 1;
	spin_unlock_irqrestore(&cp->cp_lock, flags);
	rds_send_probe(cp, cpu_to_be16(RDS_FLAG_PROBE_PORT), 0, 0);
}
EXPORT_SYMBOL_GPL(rds_send_ping);
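/* Illustrative note (the receive side lives in recv.c, not here): a ping
 * arrives with a zero destination port, and the transport-independent
 * receive path answers it by sending a pong back to the ping's source
 * port, roughly:
 *
 *	rds_send_pong(cp, inc->i_hdr.h_sport);
 */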