/*
 * Copyright (c) 2006 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/moduleparam.h>
#include <linux/gfp.h>
#include <net/sock.h>
#include <linux/in.h>
#include <linux/list.h>
#include <linux/ratelimit.h>
#include <linux/export.h>
#include <linux/sizes.h>

#include "rds.h"

/* When transmitting messages in rds_send_xmit, we need to emerge from
 * time to time and briefly release the CPU. Otherwise the softlock watchdog
 * will kick our shin.
 * Also, it seems fairer to not let one busy connection stall all the
 * others.
 *
 * send_batch_count is the number of times we'll loop in send_xmit. Setting
 * it to 0 will restore the old behavior (where we looped until we had
 * drained the queue).
 */
static int send_batch_count = SZ_1K;
module_param(send_batch_count, int, 0444);
MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");

static void rds_send_remove_from_sock(struct list_head *messages, int status);

/*
 * Reset the send state.  Callers must ensure that this doesn't race with
 * rds_send_xmit().
 */
void rds_send_path_reset(struct rds_conn_path *cp)
{
	struct rds_message *rm, *tmp;
	unsigned long flags;

	if (cp->cp_xmit_rm) {
		rm = cp->cp_xmit_rm;
		cp->cp_xmit_rm = NULL;
		/* Tell the user the RDMA op is no longer mapped by the
		 * transport.
		 * This isn't entirely true (it's flushed out
		 * independently) but as the connection is down, there's
		 * no ongoing RDMA to/from that memory.
		 */
		rds_message_unmapped(rm);
		rds_message_put(rm);
	}

	cp->cp_xmit_sg = 0;
	cp->cp_xmit_hdr_off = 0;
	cp->cp_xmit_data_off = 0;
	cp->cp_xmit_atomic_sent = 0;
	cp->cp_xmit_rdma_sent = 0;
	cp->cp_xmit_data_sent = 0;

	cp->cp_conn->c_map_queued = 0;

	cp->cp_unacked_packets = rds_sysctl_max_unacked_packets;
	cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes;

	/* Mark messages as retransmissions, and move them to the send q */
	spin_lock_irqsave(&cp->cp_lock, flags);
	list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
		set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
		set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
	}
	list_splice_init(&cp->cp_retrans, &cp->cp_send_queue);
	spin_unlock_irqrestore(&cp->cp_lock, flags);
}
EXPORT_SYMBOL_GPL(rds_send_path_reset);

static int acquire_in_xmit(struct rds_conn_path *cp)
{
	return test_and_set_bit(RDS_IN_XMIT, &cp->cp_flags) == 0;
}

static void release_in_xmit(struct rds_conn_path *cp)
{
	clear_bit(RDS_IN_XMIT, &cp->cp_flags);
	smp_mb__after_atomic();
	/*
	 * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
	 * hot path and finding waiters is very rare.  We don't want to walk
	 * the system-wide hashed waitqueue buckets in the fast path only to
	 * almost never find waiters.
	 */
	if (waitqueue_active(&cp->cp_waitq))
		wake_up_all(&cp->cp_waitq);
}

/*
 * We're making the conscious trade-off here to only send one message
 * down the connection at a time.
 * Pro:
 *  - tx queueing is a simple fifo list
 *  - reassembly is optional and easily done by transports per conn
 *  - no per flow rx lookup at all, straight to the socket
 *  - less per-frag memory and wire overhead
 * Con:
 *  - queued acks can be delayed behind large messages
 * Depends:
 *  - small message latency is higher behind queued large messages
 *  - large message latency isn't starved by intervening small sends
 */
int rds_send_xmit(struct rds_conn_path *cp)
{
	struct rds_connection *conn = cp->cp_conn;
	struct rds_message *rm;
	unsigned long flags;
	unsigned int tmp;
	struct scatterlist *sg;
	int ret = 0;
	LIST_HEAD(to_be_dropped);
	int batch_count;
	unsigned long send_gen = 0;

restart:
	batch_count = 0;

	/*
	 * sendmsg calls here after having queued its message on the send
	 * queue.  We only have one task feeding the connection at a time.  If
	 * another thread is already feeding the queue then we back off.  This
	 * avoids blocking the caller and trading per-connection data between
	 * caches per message.
	 */
	if (!acquire_in_xmit(cp)) {
		rds_stats_inc(s_send_lock_contention);
		ret = -ENOMEM;
		goto out;
	}

	/*
	 * We record the send generation after doing the xmit acquire.
	 * If someone else manages to jump in and do some work, we'll use
	 * this to avoid a goto restart farther down.
	 *
	 * The acquire_in_xmit() check above ensures that only one
	 * caller can increment c_send_gen at any time.
	 */
	cp->cp_send_gen++;
	send_gen = cp->cp_send_gen;

	/*
	 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
	 * we do the opposite to avoid races.
	 */
	if (!rds_conn_path_up(cp)) {
		release_in_xmit(cp);
		ret = 0;
		goto out;
	}

	if (conn->c_trans->xmit_path_prepare)
		conn->c_trans->xmit_path_prepare(cp);

	/*
	 * spin trying to push headers and data down the connection until
	 * the connection doesn't make forward progress.
	 */
	while (1) {

		rm = cp->cp_xmit_rm;

		/*
		 * If between sending messages, we can send a pending congestion
		 * map update.
		 */
		if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
			rm = rds_cong_update_alloc(conn);
			if (IS_ERR(rm)) {
				ret = PTR_ERR(rm);
				break;
			}
			rm->data.op_active = 1;
			rm->m_inc.i_conn_path = cp;
			rm->m_inc.i_conn = cp->cp_conn;

			cp->cp_xmit_rm = rm;
		}

		/*
		 * If not already working on one, grab the next message.
		 *
		 * cp_xmit_rm holds a ref while we're sending this message down
		 * the connection.  We can use this ref while holding the
		 * send_sem; rds_send_path_reset() is serialized with it.
		 */
		if (!rm) {
			unsigned int len;

			batch_count++;

			/* we want to process as big a batch as we can, but
			 * we also want to avoid softlockups.  If we've been
			 * through a lot of messages, let's back off and see
			 * if anyone else jumps in
			 */
			if (batch_count >= send_batch_count)
				goto over_batch;

			spin_lock_irqsave(&cp->cp_lock, flags);

			if (!list_empty(&cp->cp_send_queue)) {
				rm = list_entry(cp->cp_send_queue.next,
						struct rds_message,
						m_conn_item);
				rds_message_addref(rm);

				/*
				 * Move the message from the send queue to the retransmit
				 * list right away.
				 */
				list_move_tail(&rm->m_conn_item,
					       &cp->cp_retrans);
			}

			spin_unlock_irqrestore(&cp->cp_lock, flags);

			if (!rm)
				break;

			/* Unfortunately, the way Infiniband deals with
			 * RDMA to a bad MR key is by moving the entire
			 * queue pair to error state. We could possibly
			 * recover from that, but right now we drop the
			 * connection.
			 * Therefore, we never retransmit messages with RDMA ops.
			 */
			if (test_bit(RDS_MSG_FLUSH, &rm->m_flags) ||
			    (rm->rdma.op_active &&
			     test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))) {
				spin_lock_irqsave(&cp->cp_lock, flags);
				if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
					list_move(&rm->m_conn_item, &to_be_dropped);
				spin_unlock_irqrestore(&cp->cp_lock, flags);
				continue;
			}

			/* Require an ACK every once in a while */
			len = ntohl(rm->m_inc.i_hdr.h_len);
			if (cp->cp_unacked_packets == 0 ||
			    cp->cp_unacked_bytes < len) {
				__set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);

				cp->cp_unacked_packets =
					rds_sysctl_max_unacked_packets;
				cp->cp_unacked_bytes =
					rds_sysctl_max_unacked_bytes;
				rds_stats_inc(s_send_ack_required);
			} else {
				cp->cp_unacked_bytes -= len;
				cp->cp_unacked_packets--;
			}

			cp->cp_xmit_rm = rm;
		}

		/* The transport either sends the whole rdma or none of it */
		if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) {
			rm->m_final_op = &rm->rdma;
			/* The transport owns the mapped memory for now.
			 * You can't unmap it while it's on the send queue.
			 */
			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
			ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
			if (ret) {
				clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
				wake_up_interruptible(&rm->m_flush_wait);
				break;
			}
			cp->cp_xmit_rdma_sent = 1;

		}

		if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) {
			rm->m_final_op = &rm->atomic;
			/* The transport owns the mapped memory for now.
			 * You can't unmap it while it's on the send queue.
			 */
			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
			ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
			if (ret) {
				clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
				wake_up_interruptible(&rm->m_flush_wait);
				break;
			}
			cp->cp_xmit_atomic_sent = 1;

		}

		/*
		 * A number of cases require an RDS header to be sent
		 * even if there is no data.
		 * We permit 0-byte sends; rds-ping depends on this.
		 * However, if there are exclusively attached silent ops,
		 * we skip the hdr/data send, to enable silent operation.
		 */
		if (rm->data.op_nents == 0) {
			int ops_present;
			int all_ops_are_silent = 1;

			ops_present = (rm->atomic.op_active || rm->rdma.op_active);
			if (rm->atomic.op_active && !rm->atomic.op_silent)
				all_ops_are_silent = 0;
			if (rm->rdma.op_active && !rm->rdma.op_silent)
				all_ops_are_silent = 0;

			if (ops_present && all_ops_are_silent
			    && !rm->m_rdma_cookie)
				rm->data.op_active = 0;
		}

		if (rm->data.op_active && !cp->cp_xmit_data_sent) {
			rm->m_final_op = &rm->data;

			ret = conn->c_trans->xmit(conn, rm,
						  cp->cp_xmit_hdr_off,
						  cp->cp_xmit_sg,
						  cp->cp_xmit_data_off);
			if (ret <= 0)
				break;

			if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) {
				tmp = min_t(int, ret,
					    sizeof(struct rds_header) -
					    cp->cp_xmit_hdr_off);
				cp->cp_xmit_hdr_off += tmp;
				ret -= tmp;
			}

			sg = &rm->data.op_sg[cp->cp_xmit_sg];
			while (ret) {
				tmp = min_t(int, ret, sg->length -
						      cp->cp_xmit_data_off);
				cp->cp_xmit_data_off += tmp;
				ret -= tmp;
				if (cp->cp_xmit_data_off == sg->length) {
					cp->cp_xmit_data_off = 0;
					sg++;
					cp->cp_xmit_sg++;
					BUG_ON(ret != 0 && cp->cp_xmit_sg ==
					       rm->data.op_nents);
				}
			}

			if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) &&
			    (cp->cp_xmit_sg == rm->data.op_nents))
				cp->cp_xmit_data_sent = 1;
		}

		/*
		 * An rm will only take multiple passes through this loop
		 * if there is a data op.  Thus, if the data is sent (or there was
		 * none), then we're done with the rm.
		 */
		if (!rm->data.op_active || cp->cp_xmit_data_sent) {
			cp->cp_xmit_rm = NULL;
			cp->cp_xmit_sg = 0;
			cp->cp_xmit_hdr_off = 0;
			cp->cp_xmit_data_off = 0;
			cp->cp_xmit_rdma_sent = 0;
			cp->cp_xmit_atomic_sent = 0;
			cp->cp_xmit_data_sent = 0;

			rds_message_put(rm);
		}
	}

over_batch:
	if (conn->c_trans->xmit_path_complete)
		conn->c_trans->xmit_path_complete(cp);
	release_in_xmit(cp);

	/* Nuke any messages we decided not to retransmit. */
	if (!list_empty(&to_be_dropped)) {
		/* irqs on here, so we can put(), unlike above */
		list_for_each_entry(rm, &to_be_dropped, m_conn_item)
			rds_message_put(rm);
		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
	}

	/*
	 * Other senders can queue a message after we last test the send queue
	 * but before we clear RDS_IN_XMIT.
	 * In that case they'd back off and
	 * not try and send their newly queued message.  We need to check the
	 * send queue after having cleared RDS_IN_XMIT so that their message
	 * doesn't get stuck on the send queue.
	 *
	 * If the transport cannot continue (i.e. ret != 0), then it must
	 * call us when more room is available, such as from the tx
	 * completion handler.
	 *
	 * We have an extra generation check here so that if someone manages
	 * to jump in after our release_in_xmit, we'll see that they have done
	 * some work and we will skip our goto restart.
	 */
	if (ret == 0) {
		smp_mb();
		if ((test_bit(0, &conn->c_map_queued) ||
		     !list_empty(&cp->cp_send_queue)) &&
		     send_gen == cp->cp_send_gen) {
			rds_stats_inc(s_send_lock_queue_raced);
			if (batch_count < send_batch_count)
				goto restart;
			queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
		}
	}
out:
	return ret;
}
EXPORT_SYMBOL_GPL(rds_send_xmit);

static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
{
	u32 len = be32_to_cpu(rm->m_inc.i_hdr.h_len);

	assert_spin_locked(&rs->rs_lock);

	BUG_ON(rs->rs_snd_bytes < len);
	rs->rs_snd_bytes -= len;

	if (rs->rs_snd_bytes == 0)
		rds_stats_inc(s_send_queue_empty);
}

static inline int rds_send_is_acked(struct rds_message *rm, u64 ack,
				    is_acked_func is_acked)
{
	if (is_acked)
		return is_acked(rm, ack);
	return be64_to_cpu(rm->m_inc.i_hdr.h_sequence) <= ack;
}

/*
 * This is pretty similar to what happens below in the ACK
 * handling code - except that we call here as soon as we get
 * the IB send completion on the RDMA op and the accompanying
 * message.
 */
void rds_rdma_send_complete(struct rds_message *rm, int status)
{
	struct rds_sock *rs = NULL;
	struct rm_rdma_op *ro;
	struct rds_notifier *notifier;
	unsigned long flags;

	spin_lock_irqsave(&rm->m_rs_lock, flags);

	ro = &rm->rdma;
	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
	    ro->op_active && ro->op_notify && ro->op_notifier) {
		notifier = ro->op_notifier;
		rs = rm->m_rs;
		sock_hold(rds_rs_to_sk(rs));

		notifier->n_status = status;
		spin_lock(&rs->rs_lock);
		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
		spin_unlock(&rs->rs_lock);

		ro->op_notifier = NULL;
	}

	spin_unlock_irqrestore(&rm->m_rs_lock, flags);

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}
EXPORT_SYMBOL_GPL(rds_rdma_send_complete);

/*
 * Just like above, except looks at atomic op
 */
void rds_atomic_send_complete(struct rds_message *rm, int status)
{
	struct rds_sock *rs = NULL;
	struct rm_atomic_op *ao;
	struct rds_notifier *notifier;
	unsigned long flags;

	spin_lock_irqsave(&rm->m_rs_lock, flags);

	ao = &rm->atomic;
	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
	    && ao->op_active && ao->op_notify && ao->op_notifier) {
		notifier = ao->op_notifier;
		rs = rm->m_rs;
		sock_hold(rds_rs_to_sk(rs));

		notifier->n_status = status;
		spin_lock(&rs->rs_lock);
		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
		spin_unlock(&rs->rs_lock);

		ao->op_notifier = NULL;
	}

	spin_unlock_irqrestore(&rm->m_rs_lock, flags);

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}
EXPORT_SYMBOL_GPL(rds_atomic_send_complete);

/*
 * This is the same as
 * rds_rdma_send_complete, except we
 * don't do any locking - we have all the ingredients (message,
 * socket, socket lock) and can just move the notifier.
 */
static inline void
__rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
{
	struct rm_rdma_op *ro;
	struct rm_atomic_op *ao;

	ro = &rm->rdma;
	if (ro->op_active && ro->op_notify && ro->op_notifier) {
		ro->op_notifier->n_status = status;
		list_add_tail(&ro->op_notifier->n_list, &rs->rs_notify_queue);
		ro->op_notifier = NULL;
	}

	ao = &rm->atomic;
	if (ao->op_active && ao->op_notify && ao->op_notifier) {
		ao->op_notifier->n_status = status;
		list_add_tail(&ao->op_notifier->n_list, &rs->rs_notify_queue);
		ao->op_notifier = NULL;
	}

	/* No need to wake the app - caller does this */
}

/*
 * This removes messages from the socket's list if they're on it.  The list
 * argument must be private to the caller, we must be able to modify it
 * without locks.  The messages must have a reference held for their
 * position on the list.  This function will drop that reference after
 * removing the messages from the 'messages' list regardless of whether it
 * found the messages on the socket list or not.
 */
static void rds_send_remove_from_sock(struct list_head *messages, int status)
{
	unsigned long flags;
	struct rds_sock *rs = NULL;
	struct rds_message *rm;

	while (!list_empty(messages)) {
		int was_on_sock = 0;

		rm = list_entry(messages->next, struct rds_message,
				m_conn_item);
		list_del_init(&rm->m_conn_item);

		/*
		 * If we see this flag cleared then we're *sure* that someone
		 * else beat us to removing it from the sock.  If we race
		 * with their flag update we'll get the lock and then really
		 * see that the flag has been cleared.
		 *
		 * The message spinlock makes sure nobody clears rm->m_rs
		 * while we're messing with it.  It does not prevent the
		 * message from being removed from the socket, though.
		 */
		spin_lock_irqsave(&rm->m_rs_lock, flags);
		if (!test_bit(RDS_MSG_ON_SOCK, &rm->m_flags))
			goto unlock_and_drop;

		if (rs != rm->m_rs) {
			if (rs) {
				rds_wake_sk_sleep(rs);
				sock_put(rds_rs_to_sk(rs));
			}
			rs = rm->m_rs;
			if (rs)
				sock_hold(rds_rs_to_sk(rs));
		}
		if (!rs)
			goto unlock_and_drop;
		spin_lock(&rs->rs_lock);

		if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
			struct rm_rdma_op *ro = &rm->rdma;
			struct rds_notifier *notifier;

			list_del_init(&rm->m_sock_item);
			rds_send_sndbuf_remove(rs, rm);

			if (ro->op_active && ro->op_notifier &&
			    (ro->op_notify || (ro->op_recverr && status))) {
				notifier = ro->op_notifier;
				list_add_tail(&notifier->n_list,
					      &rs->rs_notify_queue);
				if (!notifier->n_status)
					notifier->n_status = status;
				rm->rdma.op_notifier = NULL;
			}
			was_on_sock = 1;
			rm->m_rs = NULL;
		}
		spin_unlock(&rs->rs_lock);

unlock_and_drop:
		spin_unlock_irqrestore(&rm->m_rs_lock, flags);
		rds_message_put(rm);
		if (was_on_sock)
			rds_message_put(rm);
	}

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}

/*
 * Transports call here when they've determined that the receiver queued
 * messages up to, and including, the given sequence number.  Messages are
 * moved to the retrans queue when rds_send_xmit picks them off the send
 * queue.
 * This means that in the TCP case, the message may not have been
 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
 * checks the RDS_MSG_HAS_ACK_SEQ bit.
 */
void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack,
			      is_acked_func is_acked)
{
	struct rds_message *rm, *tmp;
	unsigned long flags;
	LIST_HEAD(list);

	spin_lock_irqsave(&cp->cp_lock, flags);

	list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
		if (!rds_send_is_acked(rm, ack, is_acked))
			break;

		list_move(&rm->m_conn_item, &list);
		clear_bit(RDS_MSG_ON_CONN, &rm->m_flags);
	}

	/* order flag updates with spin locks */
	if (!list_empty(&list))
		smp_mb__after_atomic();

	spin_unlock_irqrestore(&cp->cp_lock, flags);

	/* now remove the messages from the sock list as needed */
	rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
}
EXPORT_SYMBOL_GPL(rds_send_path_drop_acked);

void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
			 is_acked_func is_acked)
{
	WARN_ON(conn->c_trans->t_mp_capable);
	rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked);
}
EXPORT_SYMBOL_GPL(rds_send_drop_acked);

void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
{
	struct rds_message *rm, *tmp;
	struct rds_connection *conn;
	struct rds_conn_path *cp;
	unsigned long flags;
	LIST_HEAD(list);

	/* get all the messages we're dropping under the rs lock */
	spin_lock_irqsave(&rs->rs_lock, flags);

	list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) {
		if (dest && (dest->sin_addr.s_addr != rm->m_daddr ||
			     dest->sin_port != rm->m_inc.i_hdr.h_dport))
			continue;

		list_move(&rm->m_sock_item, &list);
		rds_send_sndbuf_remove(rs, rm);
		clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
	}

	/* order flag updates with the rs lock */
	smp_mb__after_atomic();

	spin_unlock_irqrestore(&rs->rs_lock, flags);

	if (list_empty(&list))
		return;

	/* Remove the messages from the conn */
	list_for_each_entry(rm, &list, m_sock_item) {

		conn = rm->m_inc.i_conn;
		if (conn->c_trans->t_mp_capable)
			cp = rm->m_inc.i_conn_path;
		else
			cp = &conn->c_path[0];

		spin_lock_irqsave(&cp->cp_lock, flags);
		/*
		 * Maybe someone else beat us to removing rm from the conn.
		 * If we race with their flag update we'll get the lock and
		 * then really see that the flag has been cleared.
		 */
		if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
			spin_unlock_irqrestore(&cp->cp_lock, flags);
			spin_lock_irqsave(&rm->m_rs_lock, flags);
			rm->m_rs = NULL;
			spin_unlock_irqrestore(&rm->m_rs_lock, flags);
			continue;
		}
		list_del_init(&rm->m_conn_item);
		spin_unlock_irqrestore(&cp->cp_lock, flags);

		/*
		 * Couldn't grab m_rs_lock in top loop (lock ordering),
		 * but we can now.
		 */
		spin_lock_irqsave(&rm->m_rs_lock, flags);

		spin_lock(&rs->rs_lock);
		__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
		spin_unlock(&rs->rs_lock);

		rm->m_rs = NULL;
		spin_unlock_irqrestore(&rm->m_rs_lock, flags);

		rds_message_put(rm);
	}

	rds_wake_sk_sleep(rs);

	while (!list_empty(&list)) {
		rm = list_entry(list.next, struct rds_message, m_sock_item);
		list_del_init(&rm->m_sock_item);
		rds_message_wait(rm);

		/* just in case the code above skipped this message
		 * because RDS_MSG_ON_CONN wasn't set, run it again here.
		 * Taking m_rs_lock is the only thing that keeps us
		 * from racing with ack processing.
		 */
		spin_lock_irqsave(&rm->m_rs_lock, flags);

		spin_lock(&rs->rs_lock);
		__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
		spin_unlock(&rs->rs_lock);

		rm->m_rs = NULL;
		spin_unlock_irqrestore(&rm->m_rs_lock, flags);

		rds_message_put(rm);
	}
}

/*
 * we only want this to fire once so we use the caller's 'queued'.  It's
 * possible that another thread can race with us and remove the
 * message from the flow with RDS_CANCEL_SENT_TO.
 */
static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
			     struct rds_conn_path *cp,
			     struct rds_message *rm, __be16 sport,
			     __be16 dport, int *queued)
{
	unsigned long flags;
	u32 len;

	if (*queued)
		goto out;

	len = be32_to_cpu(rm->m_inc.i_hdr.h_len);

	/* this is the only place which holds both the socket's rs_lock
	 * and the connection's c_lock */
	spin_lock_irqsave(&rs->rs_lock, flags);

	/*
	 * If there is a little space in sndbuf, we don't queue anything,
	 * and userspace gets -EAGAIN. But poll() indicates there's send
	 * room. This can lead to bad behavior (spinning) if snd_bytes isn't
	 * freed up by incoming acks. So we check the *old* value of
	 * rs_snd_bytes here to allow the last msg to exceed the buffer,
	 * and poll() now knows no more data can be sent.
	 */
	if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) {
		rs->rs_snd_bytes += len;

		/* let recv side know we are close to send space exhaustion.
		 * This is probably not the optimal way to do it, as this
		 * means we set the flag on *all* messages as soon as our
		 * throughput hits a certain threshold.
		 */
		if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2)
			__set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);

		list_add_tail(&rm->m_sock_item, &rs->rs_send_queue);
		set_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
		rds_message_addref(rm);
		rm->m_rs = rs;

		/* The code ordering is a little weird, but we're
		 * trying to minimize the time we hold c_lock */
		rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0);
		rm->m_inc.i_conn = conn;
		rm->m_inc.i_conn_path = cp;
		rds_message_addref(rm);

		spin_lock(&cp->cp_lock);
		rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++);
		list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
		set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
		spin_unlock(&cp->cp_lock);

		rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
			 rm, len, rs, rs->rs_snd_bytes,
			 (unsigned long long)be64_to_cpu(rm->m_inc.i_hdr.h_sequence));

		*queued = 1;
	}

	spin_unlock_irqrestore(&rs->rs_lock, flags);
out:
	return *queued;
}

/*
 * rds_message is getting to be quite complicated, and we'd like to allocate
 * it all in one go.
 * This figures out how big it needs to be up front.
 */
static int rds_rm_size(struct msghdr *msg, int data_len)
{
	struct cmsghdr *cmsg;
	int size = 0;
	int cmsg_groups = 0;
	int retval;

	for_each_cmsghdr(cmsg, msg) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		switch (cmsg->cmsg_type) {
		case RDS_CMSG_RDMA_ARGS:
			cmsg_groups |= 1;
			retval = rds_rdma_extra_size(CMSG_DATA(cmsg));
			if (retval < 0)
				return retval;
			size += retval;

			break;

		case RDS_CMSG_RDMA_DEST:
		case RDS_CMSG_RDMA_MAP:
			cmsg_groups |= 2;
			/* these are valid but do not add any size */
			break;

		case RDS_CMSG_ATOMIC_CSWP:
		case RDS_CMSG_ATOMIC_FADD:
		case RDS_CMSG_MASKED_ATOMIC_CSWP:
		case RDS_CMSG_MASKED_ATOMIC_FADD:
			cmsg_groups |= 1;
			size += sizeof(struct scatterlist);
			break;

		default:
			return -EINVAL;
		}

	}

	size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist);

	/* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
	if (cmsg_groups == 3)
		return -EINVAL;

	return size;
}

static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
			 struct msghdr *msg, int *allocated_mr)
{
	struct cmsghdr *cmsg;
	int ret = 0;

	for_each_cmsghdr(cmsg, msg) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		/* As a side effect, RDMA_DEST and RDMA_MAP will set
		 * rm->rdma.m_rdma_cookie and rm->rdma.m_rdma_mr.
		 */
		switch (cmsg->cmsg_type) {
		case RDS_CMSG_RDMA_ARGS:
			ret = rds_cmsg_rdma_args(rs, rm, cmsg);
			break;

		case RDS_CMSG_RDMA_DEST:
			ret = rds_cmsg_rdma_dest(rs, rm, cmsg);
			break;

		case RDS_CMSG_RDMA_MAP:
			ret = rds_cmsg_rdma_map(rs, rm, cmsg);
			if (!ret)
				*allocated_mr = 1;
			break;
		case RDS_CMSG_ATOMIC_CSWP:
		case RDS_CMSG_ATOMIC_FADD:
		case RDS_CMSG_MASKED_ATOMIC_CSWP:
		case RDS_CMSG_MASKED_ATOMIC_FADD:
			ret = rds_cmsg_atomic(rs, rm, cmsg);
			break;

		default:
			return -EINVAL;
		}

		if (ret)
			break;
	}

	return ret;
}

static void rds_send_ping(struct rds_connection *conn);

static int rds_send_mprds_hash(struct rds_sock *rs, struct rds_connection *conn)
{
	int hash;

	if (conn->c_npaths == 0)
		hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS);
	else
		hash = RDS_MPATH_HASH(rs, conn->c_npaths);
	if (conn->c_npaths == 0 && hash != 0) {
		rds_send_ping(conn);

		if (conn->c_npaths == 0) {
			wait_event_interruptible(conn->c_hs_waitq,
						 (conn->c_npaths != 0));
		}
		if (conn->c_npaths == 1)
			hash = 0;
	}
	return hash;
}

int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
{
	struct sock *sk = sock->sk;
	struct rds_sock *rs = rds_sk_to_rs(sk);
	DECLARE_SOCKADDR(struct sockaddr_in *, usin, msg->msg_name);
	__be32 daddr;
	__be16 dport;
	struct rds_message *rm = NULL;
	struct rds_connection *conn;
	int ret = 0;
	int queued = 0, allocated_mr = 0;
	int nonblock = msg->msg_flags & MSG_DONTWAIT;
	long timeo = sock_sndtimeo(sk, nonblock);
	struct rds_conn_path *cpath;

	/* Mirror Linux UDP's handling of BSD error message compatibility */
	/* XXX: Perhaps MSG_MORE someday */
	if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT)) {
		ret = -EOPNOTSUPP;
		goto out;
	}

	if (msg->msg_namelen) {
		/* XXX fail non-unicast destination IPs? */
		if (msg->msg_namelen < sizeof(*usin) || usin->sin_family != AF_INET) {
			ret = -EINVAL;
			goto out;
		}
		daddr = usin->sin_addr.s_addr;
		dport = usin->sin_port;
	} else {
		/* We only care about consistency with ->connect() */
		lock_sock(sk);
		daddr = rs->rs_conn_addr;
		dport = rs->rs_conn_port;
		release_sock(sk);
	}

	lock_sock(sk);
	if (daddr == 0 || rs->rs_bound_addr == 0) {
		release_sock(sk);
		ret = -ENOTCONN; /* XXX not a great errno */
		goto out;
	}
	release_sock(sk);

	if (payload_len > rds_sk_sndbuf(rs)) {
		ret = -EMSGSIZE;
		goto out;
	}

	/* size of rm including all sgs */
	ret = rds_rm_size(msg, payload_len);
	if (ret < 0)
		goto out;

	rm = rds_message_alloc(ret, GFP_KERNEL);
	if (!rm) {
		ret = -ENOMEM;
		goto out;
	}

	/* Attach data to the rm */
	if (payload_len) {
		rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE));
		if (!rm->data.op_sg) {
			ret = -ENOMEM;
			goto out;
		}
		ret = rds_message_copy_from_user(rm, &msg->msg_iter);
		if (ret)
			goto out;
	}
	rm->data.op_active = 1;

	rm->m_daddr = daddr;

	/* rds_conn_create has a spinlock that runs with IRQ off.
	 * Caching the conn in the socket helps a lot. */
	if (rs->rs_conn && rs->rs_conn->c_faddr == daddr)
		conn = rs->rs_conn;
	else {
		conn = rds_conn_create_outgoing(sock_net(sock->sk),
						rs->rs_bound_addr, daddr,
						rs->rs_transport,
						sock->sk->sk_allocation);
		if (IS_ERR(conn)) {
			ret = PTR_ERR(conn);
			goto out;
		}
		rs->rs_conn = conn;
	}

	/* Parse any control messages the user may have included. */
	ret = rds_cmsg_send(rs, rm, msg, &allocated_mr);
	if (ret)
		goto out;

	if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
		printk_ratelimited(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
				   &rm->rdma, conn->c_trans->xmit_rdma);
		ret = -EOPNOTSUPP;
		goto out;
	}

	if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) {
		printk_ratelimited(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n",
				   &rm->atomic, conn->c_trans->xmit_atomic);
		ret = -EOPNOTSUPP;
		goto out;
	}

	if (conn->c_trans->t_mp_capable)
		cpath = &conn->c_path[rds_send_mprds_hash(rs, conn)];
	else
		cpath = &conn->c_path[0];

	rds_conn_path_connect_if_down(cpath);

	ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
	if (ret) {
		rs->rs_seen_congestion = 1;
		goto out;
	}
	while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port,
				  dport, &queued)) {
		rds_stats_inc(s_send_queue_full);

		if (nonblock) {
			ret = -EAGAIN;
			goto out;
		}

		timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
					rds_send_queue_rm(rs, conn, cpath, rm,
							  rs->rs_bound_port,
							  dport,
							  &queued),
					timeo);
		rdsdebug("sendmsg woke queued %d timeo %ld\n", queued, timeo);
		if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
			continue;

		ret = timeo;
		if (ret == 0)
			ret = -ETIMEDOUT;
		goto out;
	}

	/*
	 * By now we've committed to the send.  We reuse rds_send_worker()
	 * to retry sends in the rds thread if the transport asks us to.
	 */
	rds_stats_inc(s_send_queued);

	ret = rds_send_xmit(cpath);
	if (ret == -ENOMEM || ret == -EAGAIN)
		queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);

	rds_message_put(rm);
	return payload_len;

out:
	/* If the user included a RDMA_MAP cmsg, we allocated a MR on the fly.
	 * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN
	 * or in any other way, we need to destroy the MR again */
	if (allocated_mr)
		rds_rdma_unuse(rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1);

	if (rm)
		rds_message_put(rm);
	return ret;
}

/*
 * send out a probe.  Can be shared by rds_send_ping,
 * rds_send_pong, rds_send_hb.
 * rds_send_hb should use h_flags
 *   RDS_FLAG_HB_PING|RDS_FLAG_ACK_REQUIRED
 * or
 *   RDS_FLAG_HB_PONG|RDS_FLAG_ACK_REQUIRED
 */
int
rds_send_probe(struct rds_conn_path *cp, __be16 sport,
	       __be16 dport, u8 h_flags)
{
	struct rds_message *rm;
	unsigned long flags;
	int ret = 0;

	rm = rds_message_alloc(0, GFP_ATOMIC);
	if (!rm) {
		ret = -ENOMEM;
		goto out;
	}

	rm->m_daddr = cp->cp_conn->c_faddr;
	rm->data.op_active = 1;

	rds_conn_path_connect_if_down(cp);

	ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL);
	if (ret)
		goto out;

	spin_lock_irqsave(&cp->cp_lock, flags);
	list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
	set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
	rds_message_addref(rm);
	rm->m_inc.i_conn = cp->cp_conn;
	rm->m_inc.i_conn_path = cp;

	rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport,
				    cp->cp_next_tx_seq);
	rm->m_inc.i_hdr.h_flags |= h_flags;
	cp->cp_next_tx_seq++;

	if (RDS_HS_PROBE(sport, dport) && cp->cp_conn->c_trans->t_mp_capable) {
		u16 npaths = RDS_MPATH_WORKERS;

		rds_message_add_extension(&rm->m_inc.i_hdr,
					  RDS_EXTHDR_NPATHS, &npaths,
					  sizeof(npaths));
		rds_message_add_extension(&rm->m_inc.i_hdr,
					  RDS_EXTHDR_GEN_NUM,
					  &cp->cp_conn->c_my_gen_num,
					  sizeof(u32));
	}
	spin_unlock_irqrestore(&cp->cp_lock, flags);

	rds_stats_inc(s_send_queued);
	rds_stats_inc(s_send_pong);

	/* schedule the send work on rds_wq */
	queue_delayed_work(rds_wq, &cp->cp_send_w, 1);

	rds_message_put(rm);
	return 0;

out:
	if (rm)
		rds_message_put(rm);
	return ret;
}

int
rds_send_pong(struct rds_conn_path *cp, __be16 dport)
{
	return rds_send_probe(cp, 0, dport, 0);
}

void
rds_send_ping(struct rds_connection *conn)
{
	unsigned long flags;
	struct rds_conn_path *cp = &conn->c_path[0];

	spin_lock_irqsave(&cp->cp_lock, flags);
	if (conn->c_ping_triggered) {
		spin_unlock_irqrestore(&cp->cp_lock, flags);
		return;
	}
	conn->c_ping_triggered = 1;
	spin_unlock_irqrestore(&cp->cp_lock, flags);
	rds_send_probe(&conn->c_path[0], RDS_FLAG_PROBE_PORT, 0, 0);
}