1 /* 2 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved. 3 * 4 * This software is available to you under a choice of one of two 5 * licenses. You may choose to be licensed under the terms of the GNU 6 * General Public License (GPL) Version 2, available from the file 7 * COPYING in the main directory of this source tree, or the BSD-type 8 * license below: 9 * 10 * Redistribution and use in source and binary forms, with or without 11 * modification, are permitted provided that the following conditions 12 * are met: 13 * 14 * Redistributions of source code must retain the above copyright 15 * notice, this list of conditions and the following disclaimer. 16 * 17 * Redistributions in binary form must reproduce the above 18 * copyright notice, this list of conditions and the following 19 * disclaimer in the documentation and/or other materials provided 20 * with the distribution. 21 * 22 * Neither the name of the Network Appliance, Inc. nor the names of 23 * its contributors may be used to endorse or promote products 24 * derived from this software without specific prior written 25 * permission. 26 * 27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 38 * 39 * Author: Tom Tucker <tom@opengridcomputing.com> 40 */ 41 42 #include <linux/sunrpc/debug.h> 43 #include <linux/sunrpc/rpc_rdma.h> 44 #include <linux/spinlock.h> 45 #include <asm/unaligned.h> 46 #include <rdma/ib_verbs.h> 47 #include <rdma/rdma_cm.h> 48 #include <linux/sunrpc/svc_rdma.h> 49 50 #define RPCDBG_FACILITY RPCDBG_SVCXPRT 51 52 /* Encode an XDR as an array of IB SGE 53 * 54 * Assumptions: 55 * - head[0] is physically contiguous. 56 * - tail[0] is physically contiguous. 57 * - pages[] is not physically or virtually contigous and consists of 58 * PAGE_SIZE elements. 59 * 60 * Output: 61 * SGE[0] reserved for RCPRDMA header 62 * SGE[1] data from xdr->head[] 63 * SGE[2..sge_count-2] data from xdr->pages[] 64 * SGE[sge_count-1] data from xdr->tail. 65 * 66 * The max SGE we need is the length of the XDR / pagesize + one for 67 * head + one for tail + one for RPCRDMA header. Since RPCSVC_MAXPAGES 68 * reserves a page for both the request and the reply header, and this 69 * array is only concerned with the reply we are assured that we have 70 * on extra page for the RPCRMDA header. 71 */ 72 static void xdr_to_sge(struct svcxprt_rdma *xprt, 73 struct xdr_buf *xdr, 74 struct svc_rdma_req_map *vec) 75 { 76 int sge_max = (xdr->len+PAGE_SIZE-1) / PAGE_SIZE + 3; 77 int sge_no; 78 u32 sge_bytes; 79 u32 page_bytes; 80 u32 page_off; 81 int page_no; 82 83 BUG_ON(xdr->len != 84 (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len)); 85 86 /* Skip the first sge, this is for the RPCRDMA header */ 87 sge_no = 1; 88 89 /* Head SGE */ 90 vec->sge[sge_no].iov_base = xdr->head[0].iov_base; 91 vec->sge[sge_no].iov_len = xdr->head[0].iov_len; 92 sge_no++; 93 94 /* pages SGE */ 95 page_no = 0; 96 page_bytes = xdr->page_len; 97 page_off = xdr->page_base; 98 while (page_bytes) { 99 vec->sge[sge_no].iov_base = 100 page_address(xdr->pages[page_no]) + page_off; 101 sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off)); 102 page_bytes -= sge_bytes; 103 vec->sge[sge_no].iov_len = sge_bytes; 104 105 sge_no++; 106 page_no++; 107 page_off = 0; /* reset for next time through loop */ 108 } 109 110 /* Tail SGE */ 111 if (xdr->tail[0].iov_len) { 112 vec->sge[sge_no].iov_base = xdr->tail[0].iov_base; 113 vec->sge[sge_no].iov_len = xdr->tail[0].iov_len; 114 sge_no++; 115 } 116 117 BUG_ON(sge_no > sge_max); 118 vec->count = sge_no; 119 } 120 121 /* Assumptions: 122 * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE 123 */ 124 static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp, 125 u32 rmr, u64 to, 126 u32 xdr_off, int write_len, 127 struct svc_rdma_req_map *vec) 128 { 129 struct ib_send_wr write_wr; 130 struct ib_sge *sge; 131 int xdr_sge_no; 132 int sge_no; 133 int sge_bytes; 134 int sge_off; 135 int bc; 136 struct svc_rdma_op_ctxt *ctxt; 137 138 BUG_ON(vec->count > RPCSVC_MAXPAGES); 139 dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, " 140 "write_len=%d, vec->sge=%p, vec->count=%lu\n", 141 rmr, (unsigned long long)to, xdr_off, 142 write_len, vec->sge, vec->count); 143 144 ctxt = svc_rdma_get_context(xprt); 145 ctxt->direction = DMA_TO_DEVICE; 146 sge = ctxt->sge; 147 148 /* Find the SGE associated with xdr_off */ 149 for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < vec->count; 150 xdr_sge_no++) { 151 if (vec->sge[xdr_sge_no].iov_len > bc) 152 break; 153 bc -= vec->sge[xdr_sge_no].iov_len; 154 } 155 156 sge_off = bc; 157 bc = write_len; 158 sge_no = 0; 159 160 /* Copy the remaining SGE */ 161 while (bc != 0 && xdr_sge_no < vec->count) { 162 sge[sge_no].lkey = xprt->sc_phys_mr->lkey; 163 sge_bytes = min((size_t)bc, 164 (size_t)(vec->sge[xdr_sge_no].iov_len-sge_off)); 165 sge[sge_no].length = sge_bytes; 166 atomic_inc(&xprt->sc_dma_used); 167 sge[sge_no].addr = 168 ib_dma_map_single(xprt->sc_cm_id->device, 169 (void *) 170 vec->sge[xdr_sge_no].iov_base + sge_off, 171 sge_bytes, DMA_TO_DEVICE); 172 if (dma_mapping_error(xprt->sc_cm_id->device->dma_device, 173 sge[sge_no].addr)) 174 goto err; 175 sge_off = 0; 176 sge_no++; 177 ctxt->count++; 178 xdr_sge_no++; 179 bc -= sge_bytes; 180 } 181 182 BUG_ON(bc != 0); 183 BUG_ON(xdr_sge_no > vec->count); 184 185 /* Prepare WRITE WR */ 186 memset(&write_wr, 0, sizeof write_wr); 187 ctxt->wr_op = IB_WR_RDMA_WRITE; 188 write_wr.wr_id = (unsigned long)ctxt; 189 write_wr.sg_list = &sge[0]; 190 write_wr.num_sge = sge_no; 191 write_wr.opcode = IB_WR_RDMA_WRITE; 192 write_wr.send_flags = IB_SEND_SIGNALED; 193 write_wr.wr.rdma.rkey = rmr; 194 write_wr.wr.rdma.remote_addr = to; 195 196 /* Post It */ 197 atomic_inc(&rdma_stat_write); 198 if (svc_rdma_send(xprt, &write_wr)) 199 goto err; 200 return 0; 201 err: 202 svc_rdma_put_context(ctxt, 0); 203 /* Fatal error, close transport */ 204 return -EIO; 205 } 206 207 static int send_write_chunks(struct svcxprt_rdma *xprt, 208 struct rpcrdma_msg *rdma_argp, 209 struct rpcrdma_msg *rdma_resp, 210 struct svc_rqst *rqstp, 211 struct svc_rdma_req_map *vec) 212 { 213 u32 xfer_len = rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len; 214 int write_len; 215 int max_write; 216 u32 xdr_off; 217 int chunk_off; 218 int chunk_no; 219 struct rpcrdma_write_array *arg_ary; 220 struct rpcrdma_write_array *res_ary; 221 int ret; 222 223 arg_ary = svc_rdma_get_write_array(rdma_argp); 224 if (!arg_ary) 225 return 0; 226 res_ary = (struct rpcrdma_write_array *) 227 &rdma_resp->rm_body.rm_chunks[1]; 228 229 max_write = xprt->sc_max_sge * PAGE_SIZE; 230 231 /* Write chunks start at the pagelist */ 232 for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0; 233 xfer_len && chunk_no < arg_ary->wc_nchunks; 234 chunk_no++) { 235 struct rpcrdma_segment *arg_ch; 236 u64 rs_offset; 237 238 arg_ch = &arg_ary->wc_array[chunk_no].wc_target; 239 write_len = min(xfer_len, arg_ch->rs_length); 240 241 /* Prepare the response chunk given the length actually 242 * written */ 243 rs_offset = get_unaligned(&(arg_ch->rs_offset)); 244 svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no, 245 arg_ch->rs_handle, 246 rs_offset, 247 write_len); 248 chunk_off = 0; 249 while (write_len) { 250 int this_write; 251 this_write = min(write_len, max_write); 252 ret = send_write(xprt, rqstp, 253 arg_ch->rs_handle, 254 rs_offset + chunk_off, 255 xdr_off, 256 this_write, 257 vec); 258 if (ret) { 259 dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n", 260 ret); 261 return -EIO; 262 } 263 chunk_off += this_write; 264 xdr_off += this_write; 265 xfer_len -= this_write; 266 write_len -= this_write; 267 } 268 } 269 /* Update the req with the number of chunks actually used */ 270 svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no); 271 272 return rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len; 273 } 274 275 static int send_reply_chunks(struct svcxprt_rdma *xprt, 276 struct rpcrdma_msg *rdma_argp, 277 struct rpcrdma_msg *rdma_resp, 278 struct svc_rqst *rqstp, 279 struct svc_rdma_req_map *vec) 280 { 281 u32 xfer_len = rqstp->rq_res.len; 282 int write_len; 283 int max_write; 284 u32 xdr_off; 285 int chunk_no; 286 int chunk_off; 287 struct rpcrdma_segment *ch; 288 struct rpcrdma_write_array *arg_ary; 289 struct rpcrdma_write_array *res_ary; 290 int ret; 291 292 arg_ary = svc_rdma_get_reply_array(rdma_argp); 293 if (!arg_ary) 294 return 0; 295 /* XXX: need to fix when reply lists occur with read-list and or 296 * write-list */ 297 res_ary = (struct rpcrdma_write_array *) 298 &rdma_resp->rm_body.rm_chunks[2]; 299 300 max_write = xprt->sc_max_sge * PAGE_SIZE; 301 302 /* xdr offset starts at RPC message */ 303 for (xdr_off = 0, chunk_no = 0; 304 xfer_len && chunk_no < arg_ary->wc_nchunks; 305 chunk_no++) { 306 u64 rs_offset; 307 ch = &arg_ary->wc_array[chunk_no].wc_target; 308 write_len = min(xfer_len, ch->rs_length); 309 310 311 /* Prepare the reply chunk given the length actually 312 * written */ 313 rs_offset = get_unaligned(&(ch->rs_offset)); 314 svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no, 315 ch->rs_handle, rs_offset, 316 write_len); 317 chunk_off = 0; 318 while (write_len) { 319 int this_write; 320 321 this_write = min(write_len, max_write); 322 ret = send_write(xprt, rqstp, 323 ch->rs_handle, 324 rs_offset + chunk_off, 325 xdr_off, 326 this_write, 327 vec); 328 if (ret) { 329 dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n", 330 ret); 331 return -EIO; 332 } 333 chunk_off += this_write; 334 xdr_off += this_write; 335 xfer_len -= this_write; 336 write_len -= this_write; 337 } 338 } 339 /* Update the req with the number of chunks actually used */ 340 svc_rdma_xdr_encode_reply_array(res_ary, chunk_no); 341 342 return rqstp->rq_res.len; 343 } 344 345 /* This function prepares the portion of the RPCRDMA message to be 346 * sent in the RDMA_SEND. This function is called after data sent via 347 * RDMA has already been transmitted. There are three cases: 348 * - The RPCRDMA header, RPC header, and payload are all sent in a 349 * single RDMA_SEND. This is the "inline" case. 350 * - The RPCRDMA header and some portion of the RPC header and data 351 * are sent via this RDMA_SEND and another portion of the data is 352 * sent via RDMA. 353 * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC 354 * header and data are all transmitted via RDMA. 355 * In all three cases, this function prepares the RPCRDMA header in 356 * sge[0], the 'type' parameter indicates the type to place in the 357 * RPCRDMA header, and the 'byte_count' field indicates how much of 358 * the XDR to include in this RDMA_SEND. 359 */ 360 static int send_reply(struct svcxprt_rdma *rdma, 361 struct svc_rqst *rqstp, 362 struct page *page, 363 struct rpcrdma_msg *rdma_resp, 364 struct svc_rdma_op_ctxt *ctxt, 365 struct svc_rdma_req_map *vec, 366 int byte_count) 367 { 368 struct ib_send_wr send_wr; 369 int sge_no; 370 int sge_bytes; 371 int page_no; 372 int ret; 373 374 /* Post a recv buffer to handle another request. */ 375 ret = svc_rdma_post_recv(rdma); 376 if (ret) { 377 printk(KERN_INFO 378 "svcrdma: could not post a receive buffer, err=%d." 379 "Closing transport %p.\n", ret, rdma); 380 set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); 381 svc_rdma_put_context(ctxt, 0); 382 return -ENOTCONN; 383 } 384 385 /* Prepare the context */ 386 ctxt->pages[0] = page; 387 ctxt->count = 1; 388 389 /* Prepare the SGE for the RPCRDMA Header */ 390 atomic_inc(&rdma->sc_dma_used); 391 ctxt->sge[0].addr = 392 ib_dma_map_page(rdma->sc_cm_id->device, 393 page, 0, PAGE_SIZE, DMA_TO_DEVICE); 394 ctxt->direction = DMA_TO_DEVICE; 395 ctxt->sge[0].length = svc_rdma_xdr_get_reply_hdr_len(rdma_resp); 396 ctxt->sge[0].lkey = rdma->sc_phys_mr->lkey; 397 398 /* Determine how many of our SGE are to be transmitted */ 399 for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) { 400 sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count); 401 byte_count -= sge_bytes; 402 atomic_inc(&rdma->sc_dma_used); 403 ctxt->sge[sge_no].addr = 404 ib_dma_map_single(rdma->sc_cm_id->device, 405 vec->sge[sge_no].iov_base, 406 sge_bytes, DMA_TO_DEVICE); 407 ctxt->sge[sge_no].length = sge_bytes; 408 ctxt->sge[sge_no].lkey = rdma->sc_phys_mr->lkey; 409 } 410 BUG_ON(byte_count != 0); 411 412 /* Save all respages in the ctxt and remove them from the 413 * respages array. They are our pages until the I/O 414 * completes. 415 */ 416 for (page_no = 0; page_no < rqstp->rq_resused; page_no++) { 417 ctxt->pages[page_no+1] = rqstp->rq_respages[page_no]; 418 ctxt->count++; 419 rqstp->rq_respages[page_no] = NULL; 420 /* If there are more pages than SGE, terminate SGE list */ 421 if (page_no+1 >= sge_no) 422 ctxt->sge[page_no+1].length = 0; 423 } 424 BUG_ON(sge_no > rdma->sc_max_sge); 425 memset(&send_wr, 0, sizeof send_wr); 426 ctxt->wr_op = IB_WR_SEND; 427 send_wr.wr_id = (unsigned long)ctxt; 428 send_wr.sg_list = ctxt->sge; 429 send_wr.num_sge = sge_no; 430 send_wr.opcode = IB_WR_SEND; 431 send_wr.send_flags = IB_SEND_SIGNALED; 432 433 ret = svc_rdma_send(rdma, &send_wr); 434 if (ret) 435 svc_rdma_put_context(ctxt, 1); 436 437 return ret; 438 } 439 440 void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp) 441 { 442 } 443 444 /* 445 * Return the start of an xdr buffer. 446 */ 447 static void *xdr_start(struct xdr_buf *xdr) 448 { 449 return xdr->head[0].iov_base - 450 (xdr->len - 451 xdr->page_len - 452 xdr->tail[0].iov_len - 453 xdr->head[0].iov_len); 454 } 455 456 int svc_rdma_sendto(struct svc_rqst *rqstp) 457 { 458 struct svc_xprt *xprt = rqstp->rq_xprt; 459 struct svcxprt_rdma *rdma = 460 container_of(xprt, struct svcxprt_rdma, sc_xprt); 461 struct rpcrdma_msg *rdma_argp; 462 struct rpcrdma_msg *rdma_resp; 463 struct rpcrdma_write_array *reply_ary; 464 enum rpcrdma_proc reply_type; 465 int ret; 466 int inline_bytes; 467 struct page *res_page; 468 struct svc_rdma_op_ctxt *ctxt; 469 struct svc_rdma_req_map *vec; 470 471 dprintk("svcrdma: sending response for rqstp=%p\n", rqstp); 472 473 /* Get the RDMA request header. */ 474 rdma_argp = xdr_start(&rqstp->rq_arg); 475 476 /* Build an req vec for the XDR */ 477 ctxt = svc_rdma_get_context(rdma); 478 ctxt->direction = DMA_TO_DEVICE; 479 vec = svc_rdma_get_req_map(); 480 xdr_to_sge(rdma, &rqstp->rq_res, vec); 481 482 inline_bytes = rqstp->rq_res.len; 483 484 /* Create the RDMA response header */ 485 res_page = svc_rdma_get_page(); 486 rdma_resp = page_address(res_page); 487 reply_ary = svc_rdma_get_reply_array(rdma_argp); 488 if (reply_ary) 489 reply_type = RDMA_NOMSG; 490 else 491 reply_type = RDMA_MSG; 492 svc_rdma_xdr_encode_reply_header(rdma, rdma_argp, 493 rdma_resp, reply_type); 494 495 /* Send any write-chunk data and build resp write-list */ 496 ret = send_write_chunks(rdma, rdma_argp, rdma_resp, 497 rqstp, vec); 498 if (ret < 0) { 499 printk(KERN_ERR "svcrdma: failed to send write chunks, rc=%d\n", 500 ret); 501 goto error; 502 } 503 inline_bytes -= ret; 504 505 /* Send any reply-list data and update resp reply-list */ 506 ret = send_reply_chunks(rdma, rdma_argp, rdma_resp, 507 rqstp, vec); 508 if (ret < 0) { 509 printk(KERN_ERR "svcrdma: failed to send reply chunks, rc=%d\n", 510 ret); 511 goto error; 512 } 513 inline_bytes -= ret; 514 515 ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, vec, 516 inline_bytes); 517 svc_rdma_put_req_map(vec); 518 dprintk("svcrdma: send_reply returns %d\n", ret); 519 return ret; 520 error: 521 svc_rdma_put_req_map(vec); 522 svc_rdma_put_context(ctxt, 0); 523 put_page(res_page); 524 return ret; 525 } 526