1 /* 2 * Copyright (c) 2006 Oracle. All rights reserved. 3 * 4 * This software is available to you under a choice of one of two 5 * licenses. You may choose to be licensed under the terms of the GNU 6 * General Public License (GPL) Version 2, available from the file 7 * COPYING in the main directory of this source tree, or the 8 * OpenIB.org BSD license below: 9 * 10 * Redistribution and use in source and binary forms, with or 11 * without modification, are permitted provided that the following 12 * conditions are met: 13 * 14 * - Redistributions of source code must retain the above 15 * copyright notice, this list of conditions and the following 16 * disclaimer. 17 * 18 * - Redistributions in binary form must reproduce the above 19 * copyright notice, this list of conditions and the following 20 * disclaimer in the documentation and/or other materials 21 * provided with the distribution. 22 * 23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS 27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN 28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 30 * SOFTWARE. 31 * 32 */ 33 #include <linux/kernel.h> 34 #include <linux/in.h> 35 #include <linux/device.h> 36 #include <linux/dmapool.h> 37 #include <linux/ratelimit.h> 38 39 #include "rds_single_path.h" 40 #include "rds.h" 41 #include "ib.h" 42 43 /* 44 * Convert IB-specific error message to RDS error message and call core 45 * completion handler. 46 */ 47 static void rds_ib_send_complete(struct rds_message *rm, 48 int wc_status, 49 void (*complete)(struct rds_message *rm, int status)) 50 { 51 int notify_status; 52 53 switch (wc_status) { 54 case IB_WC_WR_FLUSH_ERR: 55 return; 56 57 case IB_WC_SUCCESS: 58 notify_status = RDS_RDMA_SUCCESS; 59 break; 60 61 case IB_WC_REM_ACCESS_ERR: 62 notify_status = RDS_RDMA_REMOTE_ERROR; 63 break; 64 65 default: 66 notify_status = RDS_RDMA_OTHER_ERROR; 67 break; 68 } 69 complete(rm, notify_status); 70 } 71 72 static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic, 73 struct rm_rdma_op *op, 74 int wc_status) 75 { 76 if (op->op_mapped) { 77 ib_dma_unmap_sg(ic->i_cm_id->device, 78 op->op_sg, op->op_nents, 79 op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE); 80 op->op_mapped = 0; 81 } 82 83 /* If the user asked for a completion notification on this 84 * message, we can implement three different semantics: 85 * 1. Notify when we received the ACK on the RDS message 86 * that was queued with the RDMA. This provides reliable 87 * notification of RDMA status at the expense of a one-way 88 * packet delay. 89 * 2. Notify when the IB stack gives us the completion event for 90 * the RDMA operation. 91 * 3. Notify when the IB stack gives us the completion event for 92 * the accompanying RDS messages. 93 * Here, we implement approach #3. To implement approach #2, 94 * we would need to take an event for the rdma WR. To implement #1, 95 * don't call rds_rdma_send_complete at all, and fall back to the notify 96 * handling in the ACK processing code. 97 * 98 * Note: There's no need to explicitly sync any RDMA buffers using 99 * ib_dma_sync_sg_for_cpu - the completion for the RDMA 100 * operation itself unmapped the RDMA buffers, which takes care 101 * of synching. 102 */ 103 rds_ib_send_complete(container_of(op, struct rds_message, rdma), 104 wc_status, rds_rdma_send_complete); 105 106 if (op->op_write) 107 rds_stats_add(s_send_rdma_bytes, op->op_bytes); 108 else 109 rds_stats_add(s_recv_rdma_bytes, op->op_bytes); 110 } 111 112 static void rds_ib_send_unmap_atomic(struct rds_ib_connection *ic, 113 struct rm_atomic_op *op, 114 int wc_status) 115 { 116 /* unmap atomic recvbuf */ 117 if (op->op_mapped) { 118 ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1, 119 DMA_FROM_DEVICE); 120 op->op_mapped = 0; 121 } 122 123 rds_ib_send_complete(container_of(op, struct rds_message, atomic), 124 wc_status, rds_atomic_send_complete); 125 126 if (op->op_type == RDS_ATOMIC_TYPE_CSWP) 127 rds_ib_stats_inc(s_ib_atomic_cswp); 128 else 129 rds_ib_stats_inc(s_ib_atomic_fadd); 130 } 131 132 static void rds_ib_send_unmap_data(struct rds_ib_connection *ic, 133 struct rm_data_op *op, 134 int wc_status) 135 { 136 struct rds_message *rm = container_of(op, struct rds_message, data); 137 138 if (op->op_nents) 139 ib_dma_unmap_sg(ic->i_cm_id->device, 140 op->op_sg, op->op_nents, 141 DMA_TO_DEVICE); 142 143 if (rm->rdma.op_active && rm->data.op_notify) 144 rds_ib_send_unmap_rdma(ic, &rm->rdma, wc_status); 145 } 146 147 /* 148 * Unmap the resources associated with a struct send_work. 149 * 150 * Returns the rm for no good reason other than it is unobtainable 151 * other than by switching on wr.opcode, currently, and the caller, 152 * the event handler, needs it. 153 */ 154 static struct rds_message *rds_ib_send_unmap_op(struct rds_ib_connection *ic, 155 struct rds_ib_send_work *send, 156 int wc_status) 157 { 158 struct rds_message *rm = NULL; 159 160 /* In the error case, wc.opcode sometimes contains garbage */ 161 switch (send->s_wr.opcode) { 162 case IB_WR_SEND: 163 if (send->s_op) { 164 rm = container_of(send->s_op, struct rds_message, data); 165 rds_ib_send_unmap_data(ic, send->s_op, wc_status); 166 } 167 break; 168 case IB_WR_RDMA_WRITE: 169 case IB_WR_RDMA_READ: 170 if (send->s_op) { 171 rm = container_of(send->s_op, struct rds_message, rdma); 172 rds_ib_send_unmap_rdma(ic, send->s_op, wc_status); 173 } 174 break; 175 case IB_WR_ATOMIC_FETCH_AND_ADD: 176 case IB_WR_ATOMIC_CMP_AND_SWP: 177 if (send->s_op) { 178 rm = container_of(send->s_op, struct rds_message, atomic); 179 rds_ib_send_unmap_atomic(ic, send->s_op, wc_status); 180 } 181 break; 182 default: 183 printk_ratelimited(KERN_NOTICE 184 "RDS/IB: %s: unexpected opcode 0x%x in WR!\n", 185 __func__, send->s_wr.opcode); 186 break; 187 } 188 189 send->s_wr.opcode = 0xdead; 190 191 return rm; 192 } 193 194 void rds_ib_send_init_ring(struct rds_ib_connection *ic) 195 { 196 struct rds_ib_send_work *send; 197 u32 i; 198 199 for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) { 200 struct ib_sge *sge; 201 202 send->s_op = NULL; 203 204 send->s_wr.wr_id = i; 205 send->s_wr.sg_list = send->s_sge; 206 send->s_wr.ex.imm_data = 0; 207 208 sge = &send->s_sge[0]; 209 sge->addr = ic->i_send_hdrs_dma + (i * sizeof(struct rds_header)); 210 sge->length = sizeof(struct rds_header); 211 sge->lkey = ic->i_pd->local_dma_lkey; 212 213 send->s_sge[1].lkey = ic->i_pd->local_dma_lkey; 214 } 215 } 216 217 void rds_ib_send_clear_ring(struct rds_ib_connection *ic) 218 { 219 struct rds_ib_send_work *send; 220 u32 i; 221 222 for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) { 223 if (send->s_op && send->s_wr.opcode != 0xdead) 224 rds_ib_send_unmap_op(ic, send, IB_WC_WR_FLUSH_ERR); 225 } 226 } 227 228 /* 229 * The only fast path caller always has a non-zero nr, so we don't 230 * bother testing nr before performing the atomic sub. 231 */ 232 static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr) 233 { 234 if ((atomic_sub_return(nr, &ic->i_signaled_sends) == 0) && 235 waitqueue_active(&rds_ib_ring_empty_wait)) 236 wake_up(&rds_ib_ring_empty_wait); 237 BUG_ON(atomic_read(&ic->i_signaled_sends) < 0); 238 } 239 240 /* 241 * The _oldest/_free ring operations here race cleanly with the alloc/unalloc 242 * operations performed in the send path. As the sender allocs and potentially 243 * unallocs the next free entry in the ring it doesn't alter which is 244 * the next to be freed, which is what this is concerned with. 245 */ 246 void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc) 247 { 248 struct rds_message *rm = NULL; 249 struct rds_connection *conn = ic->conn; 250 struct rds_ib_send_work *send; 251 u32 completed; 252 u32 oldest; 253 u32 i = 0; 254 int nr_sig = 0; 255 256 257 rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n", 258 (unsigned long long)wc->wr_id, wc->status, 259 ib_wc_status_msg(wc->status), wc->byte_len, 260 be32_to_cpu(wc->ex.imm_data)); 261 rds_ib_stats_inc(s_ib_tx_cq_event); 262 263 if (wc->wr_id == RDS_IB_ACK_WR_ID) { 264 if (time_after(jiffies, ic->i_ack_queued + HZ / 2)) 265 rds_ib_stats_inc(s_ib_tx_stalled); 266 rds_ib_ack_send_complete(ic); 267 return; 268 } 269 270 oldest = rds_ib_ring_oldest(&ic->i_send_ring); 271 272 completed = rds_ib_ring_completed(&ic->i_send_ring, wc->wr_id, oldest); 273 274 for (i = 0; i < completed; i++) { 275 send = &ic->i_sends[oldest]; 276 if (send->s_wr.send_flags & IB_SEND_SIGNALED) 277 nr_sig++; 278 279 rm = rds_ib_send_unmap_op(ic, send, wc->status); 280 281 if (time_after(jiffies, send->s_queued + HZ / 2)) 282 rds_ib_stats_inc(s_ib_tx_stalled); 283 284 if (send->s_op) { 285 if (send->s_op == rm->m_final_op) { 286 /* If anyone waited for this message to get 287 * flushed out, wake them up now 288 */ 289 rds_message_unmapped(rm); 290 } 291 rds_message_put(rm); 292 send->s_op = NULL; 293 } 294 295 oldest = (oldest + 1) % ic->i_send_ring.w_nr; 296 } 297 298 rds_ib_ring_free(&ic->i_send_ring, completed); 299 rds_ib_sub_signaled(ic, nr_sig); 300 nr_sig = 0; 301 302 if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) || 303 test_bit(0, &conn->c_map_queued)) 304 queue_delayed_work(rds_wq, &conn->c_send_w, 0); 305 306 /* We expect errors as the qp is drained during shutdown */ 307 if (wc->status != IB_WC_SUCCESS && rds_conn_up(conn)) { 308 rds_ib_conn_error(conn, "send completion on <%pI4,%pI4> had status %u (%s), disconnecting and reconnecting\n", 309 &conn->c_laddr, &conn->c_faddr, wc->status, 310 ib_wc_status_msg(wc->status)); 311 } 312 } 313 314 /* 315 * This is the main function for allocating credits when sending 316 * messages. 317 * 318 * Conceptually, we have two counters: 319 * - send credits: this tells us how many WRs we're allowed 320 * to submit without overruning the receiver's queue. For 321 * each SEND WR we post, we decrement this by one. 322 * 323 * - posted credits: this tells us how many WRs we recently 324 * posted to the receive queue. This value is transferred 325 * to the peer as a "credit update" in a RDS header field. 326 * Every time we transmit credits to the peer, we subtract 327 * the amount of transferred credits from this counter. 328 * 329 * It is essential that we avoid situations where both sides have 330 * exhausted their send credits, and are unable to send new credits 331 * to the peer. We achieve this by requiring that we send at least 332 * one credit update to the peer before exhausting our credits. 333 * When new credits arrive, we subtract one credit that is withheld 334 * until we've posted new buffers and are ready to transmit these 335 * credits (see rds_ib_send_add_credits below). 336 * 337 * The RDS send code is essentially single-threaded; rds_send_xmit 338 * sets RDS_IN_XMIT to ensure exclusive access to the send ring. 339 * However, the ACK sending code is independent and can race with 340 * message SENDs. 341 * 342 * In the send path, we need to update the counters for send credits 343 * and the counter of posted buffers atomically - when we use the 344 * last available credit, we cannot allow another thread to race us 345 * and grab the posted credits counter. Hence, we have to use a 346 * spinlock to protect the credit counter, or use atomics. 347 * 348 * Spinlocks shared between the send and the receive path are bad, 349 * because they create unnecessary delays. An early implementation 350 * using a spinlock showed a 5% degradation in throughput at some 351 * loads. 352 * 353 * This implementation avoids spinlocks completely, putting both 354 * counters into a single atomic, and updating that atomic using 355 * atomic_add (in the receive path, when receiving fresh credits), 356 * and using atomic_cmpxchg when updating the two counters. 357 */ 358 int rds_ib_send_grab_credits(struct rds_ib_connection *ic, 359 u32 wanted, u32 *adv_credits, int need_posted, int max_posted) 360 { 361 unsigned int avail, posted, got = 0, advertise; 362 long oldval, newval; 363 364 *adv_credits = 0; 365 if (!ic->i_flowctl) 366 return wanted; 367 368 try_again: 369 advertise = 0; 370 oldval = newval = atomic_read(&ic->i_credits); 371 posted = IB_GET_POST_CREDITS(oldval); 372 avail = IB_GET_SEND_CREDITS(oldval); 373 374 rdsdebug("wanted=%u credits=%u posted=%u\n", 375 wanted, avail, posted); 376 377 /* The last credit must be used to send a credit update. */ 378 if (avail && !posted) 379 avail--; 380 381 if (avail < wanted) { 382 struct rds_connection *conn = ic->i_cm_id->context; 383 384 /* Oops, there aren't that many credits left! */ 385 set_bit(RDS_LL_SEND_FULL, &conn->c_flags); 386 got = avail; 387 } else { 388 /* Sometimes you get what you want, lalala. */ 389 got = wanted; 390 } 391 newval -= IB_SET_SEND_CREDITS(got); 392 393 /* 394 * If need_posted is non-zero, then the caller wants 395 * the posted regardless of whether any send credits are 396 * available. 397 */ 398 if (posted && (got || need_posted)) { 399 advertise = min_t(unsigned int, posted, max_posted); 400 newval -= IB_SET_POST_CREDITS(advertise); 401 } 402 403 /* Finally bill everything */ 404 if (atomic_cmpxchg(&ic->i_credits, oldval, newval) != oldval) 405 goto try_again; 406 407 *adv_credits = advertise; 408 return got; 409 } 410 411 void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits) 412 { 413 struct rds_ib_connection *ic = conn->c_transport_data; 414 415 if (credits == 0) 416 return; 417 418 rdsdebug("credits=%u current=%u%s\n", 419 credits, 420 IB_GET_SEND_CREDITS(atomic_read(&ic->i_credits)), 421 test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ? ", ll_send_full" : ""); 422 423 atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits); 424 if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags)) 425 queue_delayed_work(rds_wq, &conn->c_send_w, 0); 426 427 WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384); 428 429 rds_ib_stats_inc(s_ib_rx_credit_updates); 430 } 431 432 void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted) 433 { 434 struct rds_ib_connection *ic = conn->c_transport_data; 435 436 if (posted == 0) 437 return; 438 439 atomic_add(IB_SET_POST_CREDITS(posted), &ic->i_credits); 440 441 /* Decide whether to send an update to the peer now. 442 * If we would send a credit update for every single buffer we 443 * post, we would end up with an ACK storm (ACK arrives, 444 * consumes buffer, we refill the ring, send ACK to remote 445 * advertising the newly posted buffer... ad inf) 446 * 447 * Performance pretty much depends on how often we send 448 * credit updates - too frequent updates mean lots of ACKs. 449 * Too infrequent updates, and the peer will run out of 450 * credits and has to throttle. 451 * For the time being, 16 seems to be a good compromise. 452 */ 453 if (IB_GET_POST_CREDITS(atomic_read(&ic->i_credits)) >= 16) 454 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags); 455 } 456 457 static inline int rds_ib_set_wr_signal_state(struct rds_ib_connection *ic, 458 struct rds_ib_send_work *send, 459 bool notify) 460 { 461 /* 462 * We want to delay signaling completions just enough to get 463 * the batching benefits but not so much that we create dead time 464 * on the wire. 465 */ 466 if (ic->i_unsignaled_wrs-- == 0 || notify) { 467 ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs; 468 send->s_wr.send_flags |= IB_SEND_SIGNALED; 469 return 1; 470 } 471 return 0; 472 } 473 474 /* 475 * This can be called multiple times for a given message. The first time 476 * we see a message we map its scatterlist into the IB device so that 477 * we can provide that mapped address to the IB scatter gather entries 478 * in the IB work requests. We translate the scatterlist into a series 479 * of work requests that fragment the message. These work requests complete 480 * in order so we pass ownership of the message to the completion handler 481 * once we send the final fragment. 482 * 483 * The RDS core uses the c_send_lock to only enter this function once 484 * per connection. This makes sure that the tx ring alloc/unalloc pairs 485 * don't get out of sync and confuse the ring. 486 */ 487 int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, 488 unsigned int hdr_off, unsigned int sg, unsigned int off) 489 { 490 struct rds_ib_connection *ic = conn->c_transport_data; 491 struct ib_device *dev = ic->i_cm_id->device; 492 struct rds_ib_send_work *send = NULL; 493 struct rds_ib_send_work *first; 494 struct rds_ib_send_work *prev; 495 struct ib_send_wr *failed_wr; 496 struct scatterlist *scat; 497 u32 pos; 498 u32 i; 499 u32 work_alloc; 500 u32 credit_alloc = 0; 501 u32 posted; 502 u32 adv_credits = 0; 503 int send_flags = 0; 504 int bytes_sent = 0; 505 int ret; 506 int flow_controlled = 0; 507 int nr_sig = 0; 508 509 BUG_ON(off % RDS_FRAG_SIZE); 510 BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header)); 511 512 /* Do not send cong updates to IB loopback */ 513 if (conn->c_loopback 514 && rm->m_inc.i_hdr.h_flags & RDS_FLAG_CONG_BITMAP) { 515 rds_cong_map_updated(conn->c_fcong, ~(u64) 0); 516 scat = &rm->data.op_sg[sg]; 517 ret = max_t(int, RDS_CONG_MAP_BYTES, scat->length); 518 return sizeof(struct rds_header) + ret; 519 } 520 521 /* FIXME we may overallocate here */ 522 if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0) 523 i = 1; 524 else 525 i = ceil(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE); 526 527 work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos); 528 if (work_alloc == 0) { 529 set_bit(RDS_LL_SEND_FULL, &conn->c_flags); 530 rds_ib_stats_inc(s_ib_tx_ring_full); 531 ret = -ENOMEM; 532 goto out; 533 } 534 535 if (ic->i_flowctl) { 536 credit_alloc = rds_ib_send_grab_credits(ic, work_alloc, &posted, 0, RDS_MAX_ADV_CREDIT); 537 adv_credits += posted; 538 if (credit_alloc < work_alloc) { 539 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - credit_alloc); 540 work_alloc = credit_alloc; 541 flow_controlled = 1; 542 } 543 if (work_alloc == 0) { 544 set_bit(RDS_LL_SEND_FULL, &conn->c_flags); 545 rds_ib_stats_inc(s_ib_tx_throttle); 546 ret = -ENOMEM; 547 goto out; 548 } 549 } 550 551 /* map the message the first time we see it */ 552 if (!ic->i_data_op) { 553 if (rm->data.op_nents) { 554 rm->data.op_count = ib_dma_map_sg(dev, 555 rm->data.op_sg, 556 rm->data.op_nents, 557 DMA_TO_DEVICE); 558 rdsdebug("ic %p mapping rm %p: %d\n", ic, rm, rm->data.op_count); 559 if (rm->data.op_count == 0) { 560 rds_ib_stats_inc(s_ib_tx_sg_mapping_failure); 561 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); 562 ret = -ENOMEM; /* XXX ? */ 563 goto out; 564 } 565 } else { 566 rm->data.op_count = 0; 567 } 568 569 rds_message_addref(rm); 570 rm->data.op_dmasg = 0; 571 rm->data.op_dmaoff = 0; 572 ic->i_data_op = &rm->data; 573 574 /* Finalize the header */ 575 if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags)) 576 rm->m_inc.i_hdr.h_flags |= RDS_FLAG_ACK_REQUIRED; 577 if (test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) 578 rm->m_inc.i_hdr.h_flags |= RDS_FLAG_RETRANSMITTED; 579 580 /* If it has a RDMA op, tell the peer we did it. This is 581 * used by the peer to release use-once RDMA MRs. */ 582 if (rm->rdma.op_active) { 583 struct rds_ext_header_rdma ext_hdr; 584 585 ext_hdr.h_rdma_rkey = cpu_to_be32(rm->rdma.op_rkey); 586 rds_message_add_extension(&rm->m_inc.i_hdr, 587 RDS_EXTHDR_RDMA, &ext_hdr, sizeof(ext_hdr)); 588 } 589 if (rm->m_rdma_cookie) { 590 rds_message_add_rdma_dest_extension(&rm->m_inc.i_hdr, 591 rds_rdma_cookie_key(rm->m_rdma_cookie), 592 rds_rdma_cookie_offset(rm->m_rdma_cookie)); 593 } 594 595 /* Note - rds_ib_piggyb_ack clears the ACK_REQUIRED bit, so 596 * we should not do this unless we have a chance of at least 597 * sticking the header into the send ring. Which is why we 598 * should call rds_ib_ring_alloc first. */ 599 rm->m_inc.i_hdr.h_ack = cpu_to_be64(rds_ib_piggyb_ack(ic)); 600 rds_message_make_checksum(&rm->m_inc.i_hdr); 601 602 /* 603 * Update adv_credits since we reset the ACK_REQUIRED bit. 604 */ 605 if (ic->i_flowctl) { 606 rds_ib_send_grab_credits(ic, 0, &posted, 1, RDS_MAX_ADV_CREDIT - adv_credits); 607 adv_credits += posted; 608 BUG_ON(adv_credits > 255); 609 } 610 } 611 612 /* Sometimes you want to put a fence between an RDMA 613 * READ and the following SEND. 614 * We could either do this all the time 615 * or when requested by the user. Right now, we let 616 * the application choose. 617 */ 618 if (rm->rdma.op_active && rm->rdma.op_fence) 619 send_flags = IB_SEND_FENCE; 620 621 /* Each frag gets a header. Msgs may be 0 bytes */ 622 send = &ic->i_sends[pos]; 623 first = send; 624 prev = NULL; 625 scat = &ic->i_data_op->op_sg[rm->data.op_dmasg]; 626 i = 0; 627 do { 628 unsigned int len = 0; 629 630 /* Set up the header */ 631 send->s_wr.send_flags = send_flags; 632 send->s_wr.opcode = IB_WR_SEND; 633 send->s_wr.num_sge = 1; 634 send->s_wr.next = NULL; 635 send->s_queued = jiffies; 636 send->s_op = NULL; 637 638 send->s_sge[0].addr = ic->i_send_hdrs_dma 639 + (pos * sizeof(struct rds_header)); 640 send->s_sge[0].length = sizeof(struct rds_header); 641 642 memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr, sizeof(struct rds_header)); 643 644 /* Set up the data, if present */ 645 if (i < work_alloc 646 && scat != &rm->data.op_sg[rm->data.op_count]) { 647 len = min(RDS_FRAG_SIZE, 648 ib_sg_dma_len(dev, scat) - rm->data.op_dmaoff); 649 send->s_wr.num_sge = 2; 650 651 send->s_sge[1].addr = ib_sg_dma_address(dev, scat); 652 send->s_sge[1].addr += rm->data.op_dmaoff; 653 send->s_sge[1].length = len; 654 655 bytes_sent += len; 656 rm->data.op_dmaoff += len; 657 if (rm->data.op_dmaoff == ib_sg_dma_len(dev, scat)) { 658 scat++; 659 rm->data.op_dmasg++; 660 rm->data.op_dmaoff = 0; 661 } 662 } 663 664 rds_ib_set_wr_signal_state(ic, send, false); 665 666 /* 667 * Always signal the last one if we're stopping due to flow control. 668 */ 669 if (ic->i_flowctl && flow_controlled && i == (work_alloc - 1)) { 670 rds_ib_set_wr_signal_state(ic, send, true); 671 send->s_wr.send_flags |= IB_SEND_SOLICITED; 672 } 673 674 if (send->s_wr.send_flags & IB_SEND_SIGNALED) 675 nr_sig++; 676 677 rdsdebug("send %p wr %p num_sge %u next %p\n", send, 678 &send->s_wr, send->s_wr.num_sge, send->s_wr.next); 679 680 if (ic->i_flowctl && adv_credits) { 681 struct rds_header *hdr = &ic->i_send_hdrs[pos]; 682 683 /* add credit and redo the header checksum */ 684 hdr->h_credit = adv_credits; 685 rds_message_make_checksum(hdr); 686 adv_credits = 0; 687 rds_ib_stats_inc(s_ib_tx_credit_updates); 688 } 689 690 if (prev) 691 prev->s_wr.next = &send->s_wr; 692 prev = send; 693 694 pos = (pos + 1) % ic->i_send_ring.w_nr; 695 send = &ic->i_sends[pos]; 696 i++; 697 698 } while (i < work_alloc 699 && scat != &rm->data.op_sg[rm->data.op_count]); 700 701 /* Account the RDS header in the number of bytes we sent, but just once. 702 * The caller has no concept of fragmentation. */ 703 if (hdr_off == 0) 704 bytes_sent += sizeof(struct rds_header); 705 706 /* if we finished the message then send completion owns it */ 707 if (scat == &rm->data.op_sg[rm->data.op_count]) { 708 prev->s_op = ic->i_data_op; 709 prev->s_wr.send_flags |= IB_SEND_SOLICITED; 710 if (!(prev->s_wr.send_flags & IB_SEND_SIGNALED)) 711 nr_sig += rds_ib_set_wr_signal_state(ic, prev, true); 712 ic->i_data_op = NULL; 713 } 714 715 /* Put back wrs & credits we didn't use */ 716 if (i < work_alloc) { 717 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i); 718 work_alloc = i; 719 } 720 if (ic->i_flowctl && i < credit_alloc) 721 rds_ib_send_add_credits(conn, credit_alloc - i); 722 723 if (nr_sig) 724 atomic_add(nr_sig, &ic->i_signaled_sends); 725 726 /* XXX need to worry about failed_wr and partial sends. */ 727 failed_wr = &first->s_wr; 728 ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr); 729 rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic, 730 first, &first->s_wr, ret, failed_wr); 731 BUG_ON(failed_wr != &first->s_wr); 732 if (ret) { 733 printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 " 734 "returned %d\n", &conn->c_faddr, ret); 735 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); 736 rds_ib_sub_signaled(ic, nr_sig); 737 if (prev->s_op) { 738 ic->i_data_op = prev->s_op; 739 prev->s_op = NULL; 740 } 741 742 rds_ib_conn_error(ic->conn, "ib_post_send failed\n"); 743 goto out; 744 } 745 746 ret = bytes_sent; 747 out: 748 BUG_ON(adv_credits); 749 return ret; 750 } 751 752 /* 753 * Issue atomic operation. 754 * A simplified version of the rdma case, we always map 1 SG, and 755 * only 8 bytes, for the return value from the atomic operation. 756 */ 757 int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op) 758 { 759 struct rds_ib_connection *ic = conn->c_transport_data; 760 struct rds_ib_send_work *send = NULL; 761 struct ib_send_wr *failed_wr; 762 struct rds_ib_device *rds_ibdev; 763 u32 pos; 764 u32 work_alloc; 765 int ret; 766 int nr_sig = 0; 767 768 rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client); 769 770 work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 1, &pos); 771 if (work_alloc != 1) { 772 rds_ib_stats_inc(s_ib_tx_ring_full); 773 ret = -ENOMEM; 774 goto out; 775 } 776 777 /* address of send request in ring */ 778 send = &ic->i_sends[pos]; 779 send->s_queued = jiffies; 780 781 if (op->op_type == RDS_ATOMIC_TYPE_CSWP) { 782 send->s_atomic_wr.wr.opcode = IB_WR_MASKED_ATOMIC_CMP_AND_SWP; 783 send->s_atomic_wr.compare_add = op->op_m_cswp.compare; 784 send->s_atomic_wr.swap = op->op_m_cswp.swap; 785 send->s_atomic_wr.compare_add_mask = op->op_m_cswp.compare_mask; 786 send->s_atomic_wr.swap_mask = op->op_m_cswp.swap_mask; 787 } else { /* FADD */ 788 send->s_atomic_wr.wr.opcode = IB_WR_MASKED_ATOMIC_FETCH_AND_ADD; 789 send->s_atomic_wr.compare_add = op->op_m_fadd.add; 790 send->s_atomic_wr.swap = 0; 791 send->s_atomic_wr.compare_add_mask = op->op_m_fadd.nocarry_mask; 792 send->s_atomic_wr.swap_mask = 0; 793 } 794 send->s_wr.send_flags = 0; 795 nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify); 796 send->s_atomic_wr.wr.num_sge = 1; 797 send->s_atomic_wr.wr.next = NULL; 798 send->s_atomic_wr.remote_addr = op->op_remote_addr; 799 send->s_atomic_wr.rkey = op->op_rkey; 800 send->s_op = op; 801 rds_message_addref(container_of(send->s_op, struct rds_message, atomic)); 802 803 /* map 8 byte retval buffer to the device */ 804 ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE); 805 rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret); 806 if (ret != 1) { 807 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); 808 rds_ib_stats_inc(s_ib_tx_sg_mapping_failure); 809 ret = -ENOMEM; /* XXX ? */ 810 goto out; 811 } 812 813 /* Convert our struct scatterlist to struct ib_sge */ 814 send->s_sge[0].addr = ib_sg_dma_address(ic->i_cm_id->device, op->op_sg); 815 send->s_sge[0].length = ib_sg_dma_len(ic->i_cm_id->device, op->op_sg); 816 send->s_sge[0].lkey = ic->i_pd->local_dma_lkey; 817 818 rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr, 819 send->s_sge[0].addr, send->s_sge[0].length); 820 821 if (nr_sig) 822 atomic_add(nr_sig, &ic->i_signaled_sends); 823 824 failed_wr = &send->s_atomic_wr.wr; 825 ret = ib_post_send(ic->i_cm_id->qp, &send->s_atomic_wr.wr, &failed_wr); 826 rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic, 827 send, &send->s_atomic_wr, ret, failed_wr); 828 BUG_ON(failed_wr != &send->s_atomic_wr.wr); 829 if (ret) { 830 printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 " 831 "returned %d\n", &conn->c_faddr, ret); 832 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); 833 rds_ib_sub_signaled(ic, nr_sig); 834 goto out; 835 } 836 837 if (unlikely(failed_wr != &send->s_atomic_wr.wr)) { 838 printk(KERN_WARNING "RDS/IB: atomic ib_post_send() rc=%d, but failed_wqe updated!\n", ret); 839 BUG_ON(failed_wr != &send->s_atomic_wr.wr); 840 } 841 842 out: 843 return ret; 844 } 845 846 int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op) 847 { 848 struct rds_ib_connection *ic = conn->c_transport_data; 849 struct rds_ib_send_work *send = NULL; 850 struct rds_ib_send_work *first; 851 struct rds_ib_send_work *prev; 852 struct ib_send_wr *failed_wr; 853 struct scatterlist *scat; 854 unsigned long len; 855 u64 remote_addr = op->op_remote_addr; 856 u32 max_sge = ic->rds_ibdev->max_sge; 857 u32 pos; 858 u32 work_alloc; 859 u32 i; 860 u32 j; 861 int sent; 862 int ret; 863 int num_sge; 864 int nr_sig = 0; 865 866 /* map the op the first time we see it */ 867 if (!op->op_mapped) { 868 op->op_count = ib_dma_map_sg(ic->i_cm_id->device, 869 op->op_sg, op->op_nents, (op->op_write) ? 870 DMA_TO_DEVICE : DMA_FROM_DEVICE); 871 rdsdebug("ic %p mapping op %p: %d\n", ic, op, op->op_count); 872 if (op->op_count == 0) { 873 rds_ib_stats_inc(s_ib_tx_sg_mapping_failure); 874 ret = -ENOMEM; /* XXX ? */ 875 goto out; 876 } 877 878 op->op_mapped = 1; 879 } 880 881 /* 882 * Instead of knowing how to return a partial rdma read/write we insist that there 883 * be enough work requests to send the entire message. 884 */ 885 i = ceil(op->op_count, max_sge); 886 887 work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, i, &pos); 888 if (work_alloc != i) { 889 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); 890 rds_ib_stats_inc(s_ib_tx_ring_full); 891 ret = -ENOMEM; 892 goto out; 893 } 894 895 send = &ic->i_sends[pos]; 896 first = send; 897 prev = NULL; 898 scat = &op->op_sg[0]; 899 sent = 0; 900 num_sge = op->op_count; 901 902 for (i = 0; i < work_alloc && scat != &op->op_sg[op->op_count]; i++) { 903 send->s_wr.send_flags = 0; 904 send->s_queued = jiffies; 905 send->s_op = NULL; 906 907 nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify); 908 909 send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ; 910 send->s_rdma_wr.remote_addr = remote_addr; 911 send->s_rdma_wr.rkey = op->op_rkey; 912 913 if (num_sge > max_sge) { 914 send->s_rdma_wr.wr.num_sge = max_sge; 915 num_sge -= max_sge; 916 } else { 917 send->s_rdma_wr.wr.num_sge = num_sge; 918 } 919 920 send->s_rdma_wr.wr.next = NULL; 921 922 if (prev) 923 prev->s_rdma_wr.wr.next = &send->s_rdma_wr.wr; 924 925 for (j = 0; j < send->s_rdma_wr.wr.num_sge && 926 scat != &op->op_sg[op->op_count]; j++) { 927 len = ib_sg_dma_len(ic->i_cm_id->device, scat); 928 send->s_sge[j].addr = 929 ib_sg_dma_address(ic->i_cm_id->device, scat); 930 send->s_sge[j].length = len; 931 send->s_sge[j].lkey = ic->i_pd->local_dma_lkey; 932 933 sent += len; 934 rdsdebug("ic %p sent %d remote_addr %llu\n", ic, sent, remote_addr); 935 936 remote_addr += len; 937 scat++; 938 } 939 940 rdsdebug("send %p wr %p num_sge %u next %p\n", send, 941 &send->s_rdma_wr.wr, 942 send->s_rdma_wr.wr.num_sge, 943 send->s_rdma_wr.wr.next); 944 945 prev = send; 946 if (++send == &ic->i_sends[ic->i_send_ring.w_nr]) 947 send = ic->i_sends; 948 } 949 950 /* give a reference to the last op */ 951 if (scat == &op->op_sg[op->op_count]) { 952 prev->s_op = op; 953 rds_message_addref(container_of(op, struct rds_message, rdma)); 954 } 955 956 if (i < work_alloc) { 957 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i); 958 work_alloc = i; 959 } 960 961 if (nr_sig) 962 atomic_add(nr_sig, &ic->i_signaled_sends); 963 964 failed_wr = &first->s_rdma_wr.wr; 965 ret = ib_post_send(ic->i_cm_id->qp, &first->s_rdma_wr.wr, &failed_wr); 966 rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic, 967 first, &first->s_rdma_wr.wr, ret, failed_wr); 968 BUG_ON(failed_wr != &first->s_rdma_wr.wr); 969 if (ret) { 970 printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 " 971 "returned %d\n", &conn->c_faddr, ret); 972 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); 973 rds_ib_sub_signaled(ic, nr_sig); 974 goto out; 975 } 976 977 if (unlikely(failed_wr != &first->s_rdma_wr.wr)) { 978 printk(KERN_WARNING "RDS/IB: ib_post_send() rc=%d, but failed_wqe updated!\n", ret); 979 BUG_ON(failed_wr != &first->s_rdma_wr.wr); 980 } 981 982 983 out: 984 return ret; 985 } 986 987 void rds_ib_xmit_path_complete(struct rds_conn_path *cp) 988 { 989 struct rds_connection *conn = cp->cp_conn; 990 struct rds_ib_connection *ic = conn->c_transport_data; 991 992 /* We may have a pending ACK or window update we were unable 993 * to send previously (due to flow control). Try again. */ 994 rds_ib_attempt_ack(ic); 995 } 996