/*
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
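
/*
 * Receive path, in outline: svc_rdma_recvfrom() dequeues a receive
 * context either from sc_read_complete_q (a previously deferred request
 * whose RDMA_READs have finished) or from sc_rq_dto_q (a freshly
 * received RPC call). rdma_build_arg_xdr() maps the received pages into
 * rq_arg. If the RPC/RDMA header carries a read chunk list,
 * rdma_read_xdr() posts RDMA_READ work requests to pull the chunk data
 * from the client and the request is deferred; rdma_read_complete()
 * finishes it on a later pass through svc_rdma_recvfrom().
 */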

/*
 * Replace the pages in the rq_argpages array with the pages from the SGE in
 * the RDMA_RECV completion. The SGL should contain full pages up until the
 * last one.
 */
static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
			       struct svc_rdma_op_ctxt *ctxt,
			       u32 byte_count)
{
	struct page *page;
	u32 bc;
	int sge_no;

	/* Swap the page in the SGE with the page in argpages */
	page = ctxt->pages[0];
	put_page(rqstp->rq_pages[0]);
	rqstp->rq_pages[0] = page;

	/* Set up the XDR head */
	rqstp->rq_arg.head[0].iov_base = page_address(page);
	rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length);
	rqstp->rq_arg.len = byte_count;
	rqstp->rq_arg.buflen = byte_count;

	/* Compute bytes past head in the SGL */
	bc = byte_count - rqstp->rq_arg.head[0].iov_len;

	/* If data remains, store it in the pagelist */
	rqstp->rq_arg.page_len = bc;
	rqstp->rq_arg.page_base = 0;
	rqstp->rq_arg.pages = &rqstp->rq_pages[1];
	sge_no = 1;
	while (bc && sge_no < ctxt->count) {
		page = ctxt->pages[sge_no];
		put_page(rqstp->rq_pages[sge_no]);
		rqstp->rq_pages[sge_no] = page;
		bc -= min(bc, ctxt->sge[sge_no].length);
		rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
		sge_no++;
	}
	rqstp->rq_respages = &rqstp->rq_pages[sge_no];

	/* We should never run out of SGE because the limit is defined to
	 * support the max allowed RPC data length
	 */
	BUG_ON(bc && (sge_no == ctxt->count));
	BUG_ON((rqstp->rq_arg.head[0].iov_len + rqstp->rq_arg.page_len)
	       != byte_count);
	BUG_ON(rqstp->rq_arg.len != byte_count);

	/* If not all pages were used from the SGL, free the remaining ones */
	bc = sge_no;
	while (sge_no < ctxt->count) {
		page = ctxt->pages[sge_no++];
		put_page(page);
	}
	ctxt->count = bc;

	/* Set up tail */
	rqstp->rq_arg.tail[0].iov_base = NULL;
	rqstp->rq_arg.tail[0].iov_len = 0;
}

struct chunk_sge {
	int start;		/* sge no for this chunk */
	int count;		/* sge count for this chunk */
};
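
/*
 * Illustrative example (values are hypothetical): a read list with two
 * chunks that rdma_rcl_to_sge() maps onto five SGEs would yield
 *
 *	ch_sge_ary[0] = { .start = 0, .count = 3 }	-- chunk 0 -> sge[0..2]
 *	ch_sge_ary[1] = { .start = 3, .count = 2 }	-- chunk 1 -> sge[3..4]
 *
 * rdma_read_xdr() then posts the RDMA_READ for chunk ch_no with its
 * sg_list starting at sge[ch_sge_ary[ch_no].start].
 */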

/* Encode a read-chunk-list as an array of IB SGE
 *
 * Assumptions:
 * - chunk[0]->position points to pages[0] at an offset of 0
 * - pages[] is not physically or virtually contiguous and consists of
 *   PAGE_SIZE elements.
 *
 * Output:
 * - sge array pointing into pages[] array.
 * - chunk_sge array specifying sge index and count for each
 *   chunk in the read list
 *
 */
static int rdma_rcl_to_sge(struct svcxprt_rdma *xprt,
			   struct svc_rqst *rqstp,
			   struct svc_rdma_op_ctxt *head,
			   struct rpcrdma_msg *rmsgp,
			   struct ib_sge *sge,
			   struct chunk_sge *ch_sge_ary,
			   int ch_count,
			   int byte_count)
{
	int sge_no;
	int sge_bytes;
	int page_off;
	int page_no;
	int ch_bytes;
	int ch_no;
	struct rpcrdma_read_chunk *ch;

	sge_no = 0;
	page_no = 0;
	page_off = 0;
	ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
	ch_no = 0;
	ch_bytes = ch->rc_target.rs_length;
	head->arg.head[0] = rqstp->rq_arg.head[0];
	head->arg.tail[0] = rqstp->rq_arg.tail[0];
	head->arg.pages = &head->pages[head->count];
	head->sge[0].length = head->count; /* save count of hdr pages */
	head->arg.page_base = 0;
	head->arg.page_len = ch_bytes;
	head->arg.len = rqstp->rq_arg.len + ch_bytes;
	head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
	head->count++;
	ch_sge_ary[0].start = 0;
	while (byte_count) {
		sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes);
		sge[sge_no].addr =
			ib_dma_map_page(xprt->sc_cm_id->device,
					rqstp->rq_arg.pages[page_no],
					page_off, sge_bytes,
					DMA_FROM_DEVICE);
		sge[sge_no].length = sge_bytes;
		sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
		/*
		 * Don't bump head->count here because the same page
		 * may be used by multiple SGE.
		 */
		head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
		rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];

		byte_count -= sge_bytes;
		ch_bytes -= sge_bytes;
		sge_no++;
		/*
		 * If all bytes for this chunk have been mapped to an
		 * SGE, move to the next SGE
		 */
		if (ch_bytes == 0) {
			ch_sge_ary[ch_no].count =
				sge_no - ch_sge_ary[ch_no].start;
			ch_no++;
			ch++;
			ch_sge_ary[ch_no].start = sge_no;
			ch_bytes = ch->rc_target.rs_length;
			/* If bytes remain, account for the next chunk */
			if (byte_count) {
				head->arg.page_len += ch_bytes;
				head->arg.len += ch_bytes;
				head->arg.buflen += ch_bytes;
			}
		}
		/*
		 * If this SGE consumed all of the page, move to the
		 * next page
		 */
		if ((sge_bytes + page_off) == PAGE_SIZE) {
			page_no++;
			page_off = 0;
			/*
			 * If there are still bytes left to map, bump
			 * the page count
			 */
			if (byte_count)
				head->count++;
		} else
			page_off += sge_bytes;
	}
	BUG_ON(byte_count != 0);
	return sge_no;
}

static void rdma_set_ctxt_sge(struct svc_rdma_op_ctxt *ctxt,
			      struct ib_sge *sge,
			      u64 *sgl_offset,
			      int count)
{
	int i;

	ctxt->count = count;
	for (i = 0; i < count; i++) {
		ctxt->sge[i].addr = sge[i].addr;
		ctxt->sge[i].length = sge[i].length;
		*sgl_offset = *sgl_offset + sge[i].length;
	}
}

static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
{
#ifdef RDMA_TRANSPORT_IWARP
	if ((RDMA_TRANSPORT_IWARP ==
	     rdma_node_get_transport(xprt->sc_cm_id->
				     device->node_type))
	    && sge_count > 1)
		return 1;
	else
#endif
		return min_t(int, sge_count, xprt->sc_max_sge);
}
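
/*
 * When rdma_read_max_sge() clamps the SGE count below what a chunk needs
 * (a single SGE per RDMA_READ on iWARP devices), rdma_read_xdr() issues
 * several READ work requests for that chunk: its next_sge loop bumps
 * ch_sge_ary[ch_no].start past the SGEs already posted, while sgl_offset
 * (accumulated in rdma_set_ctxt_sge()) advances the remote address by
 * the bytes already read.
 */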

/*
 * Use RDMA_READ to read data from the advertised client buffer into the
 * XDR stream starting at rq_arg.head[0].iov_base. Each chunk in the array
 * contains the following fields:
 * discrim      - '1'; this isn't used for data placement
 * position     - The xdr stream offset (the same for every chunk)
 * handle       - RMR for client memory region
 * length       - data transfer length
 * offset       - 64 bit tagged offset in remote memory region
 *
 * On our side, we need to read into a pagelist. The first page immediately
 * follows the RPC header.
 *
 * This function returns 1 to indicate success. The data is not yet in
 * the pagelist and therefore the RPC request must be deferred. The
 * I/O completion will enqueue the transport again and
 * svc_rdma_recvfrom will complete the request.
 *
 * NOTE: The ctxt must not be touched after the last WR has been posted
 * because the I/O completion processing may occur on another
 * processor and free / modify the context. Ne touche pas!
 */
static int rdma_read_xdr(struct svcxprt_rdma *xprt,
			 struct rpcrdma_msg *rmsgp,
			 struct svc_rqst *rqstp,
			 struct svc_rdma_op_ctxt *hdr_ctxt)
{
	struct ib_send_wr read_wr;
	int err = 0;
	int ch_no;
	struct ib_sge *sge;
	int ch_count;
	int byte_count;
	int sge_count;
	u64 sgl_offset;
	struct rpcrdma_read_chunk *ch;
	struct svc_rdma_op_ctxt *ctxt = NULL;
	struct svc_rdma_op_ctxt *head;
	struct svc_rdma_op_ctxt *tmp_sge_ctxt;
	struct svc_rdma_op_ctxt *tmp_ch_ctxt;
	struct chunk_sge *ch_sge_ary;

	/* If no read list is present, return 0 */
	ch = svc_rdma_get_read_chunk(rmsgp);
	if (!ch)
		return 0;

	/* Allocate temporary contexts to keep SGE */
	BUG_ON(sizeof(struct ib_sge) < sizeof(struct chunk_sge));
	tmp_sge_ctxt = svc_rdma_get_context(xprt);
	sge = tmp_sge_ctxt->sge;
	tmp_ch_ctxt = svc_rdma_get_context(xprt);
	ch_sge_ary = (struct chunk_sge *)tmp_ch_ctxt->sge;

	svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
	sge_count = rdma_rcl_to_sge(xprt, rqstp, hdr_ctxt, rmsgp,
				    sge, ch_sge_ary,
				    ch_count, byte_count);
	head = svc_rdma_get_context(xprt);
	sgl_offset = 0;
	ch_no = 0;

	for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
	     ch->rc_discrim != 0; ch++, ch_no++) {
 next_sge:
		if (!ctxt)
			ctxt = head;
		else {
			ctxt->next = svc_rdma_get_context(xprt);
			ctxt = ctxt->next;
		}
		ctxt->next = NULL;
		ctxt->direction = DMA_FROM_DEVICE;
		clear_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
		if ((ch+1)->rc_discrim == 0) {
			/*
			 * Checked in sq_cq_reap to see if we need to
			 * be enqueued
			 */
			set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
			ctxt->next = hdr_ctxt;
			hdr_ctxt->next = head;
		}

		/* Prepare READ WR */
		memset(&read_wr, 0, sizeof read_wr);
		ctxt->wr_op = IB_WR_RDMA_READ;
		read_wr.wr_id = (unsigned long)ctxt;
		read_wr.opcode = IB_WR_RDMA_READ;
		read_wr.send_flags = IB_SEND_SIGNALED;
		read_wr.wr.rdma.rkey = ch->rc_target.rs_handle;
		read_wr.wr.rdma.remote_addr =
			get_unaligned(&(ch->rc_target.rs_offset)) +
			sgl_offset;
		read_wr.sg_list = &sge[ch_sge_ary[ch_no].start];
		read_wr.num_sge =
			rdma_read_max_sge(xprt, ch_sge_ary[ch_no].count);
		rdma_set_ctxt_sge(ctxt, &sge[ch_sge_ary[ch_no].start],
				  &sgl_offset,
				  read_wr.num_sge);

		/* Post the read */
		err = svc_rdma_send(xprt, &read_wr);
		if (err) {
			printk(KERN_ERR "svcrdma: Error posting send = %d\n",
			       err);
			/*
			 * Break the circular list so free knows when
			 * to stop if the error happened to occur on
			 * the last read
			 */
			ctxt->next = NULL;
			goto out;
		}
		atomic_inc(&rdma_stat_read);

		if (read_wr.num_sge < ch_sge_ary[ch_no].count) {
			ch_sge_ary[ch_no].count -= read_wr.num_sge;
			ch_sge_ary[ch_no].start += read_wr.num_sge;
			goto next_sge;
		}
		sgl_offset = 0;
		err = 0;
	}

 out:
	svc_rdma_put_context(tmp_sge_ctxt, 0);
	svc_rdma_put_context(tmp_ch_ctxt, 0);

	/* Detach arg pages. svc_recv will replenish them */
	for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++)
		rqstp->rq_pages[ch_no] = NULL;

	/*
	 * Detach res pages. svc_release must see an rq_resused count of
	 * zero or it will attempt to put them.
	 */
	while (rqstp->rq_resused)
		rqstp->rq_respages[--rqstp->rq_resused] = NULL;

	if (err) {
		printk(KERN_ERR "svcrdma: RDMA_READ error = %d\n", err);
		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		/* Free the linked list of read contexts */
		while (head != NULL) {
			ctxt = head->next;
			svc_rdma_put_context(head, 1);
			head = ctxt;
		}
		return 0;
	}

	return 1;
}
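
/*
 * rdma_read_xdr() chains its contexts through ctxt->next: the read
 * contexts are linked in posting order, the last one points at hdr_ctxt,
 * and hdr_ctxt points back at the first read context (head). Once the
 * final READ completes, the completion handler queues the chain on
 * sc_read_complete_q (see RDMACTXT_F_LAST_CTXT above); svc_rdma_recvfrom()
 * dequeues it there and rdma_read_complete() below unwinds the chain.
 */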

static int rdma_read_complete(struct svc_rqst *rqstp,
			      struct svc_rdma_op_ctxt *data)
{
	struct svc_rdma_op_ctxt *head = data->next;
	int page_no;
	int ret;

	BUG_ON(!head);

	/* Copy RPC pages */
	for (page_no = 0; page_no < head->count; page_no++) {
		put_page(rqstp->rq_pages[page_no]);
		rqstp->rq_pages[page_no] = head->pages[page_no];
	}
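	/*
	 * Note: rdma_rcl_to_sge() stashed the number of RPC header pages
	 * in head->sge[0].length ("save count of hdr pages"), so it is a
	 * page count here, not a byte length.
	 */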
	/* Point rq_arg.pages past header */
	rqstp->rq_arg.pages = &rqstp->rq_pages[head->sge[0].length];
	rqstp->rq_arg.page_len = head->arg.page_len;
	rqstp->rq_arg.page_base = head->arg.page_base;

	/* rq_respages starts after the last arg page */
	rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
	rqstp->rq_resused = 0;

	/* Rebuild rq_arg head and tail. */
	rqstp->rq_arg.head[0] = head->arg.head[0];
	rqstp->rq_arg.tail[0] = head->arg.tail[0];
	rqstp->rq_arg.len = head->arg.len;
	rqstp->rq_arg.buflen = head->arg.buflen;

	/* XXX: What should this be? */
	rqstp->rq_prot = IPPROTO_MAX;

	/*
	 * Free the contexts we used to build the RDMA_READ. We have
	 * to be careful here because the context list uses the same
	 * next pointer used to chain the contexts associated with the
	 * RDMA_READ
	 */
	data->next = NULL;	/* terminate circular list */
	do {
		data = head->next;
		svc_rdma_put_context(head, 0);
		head = data;
	} while (head != NULL);

	ret = rqstp->rq_arg.head[0].iov_len
		+ rqstp->rq_arg.page_len
		+ rqstp->rq_arg.tail[0].iov_len;
	dprintk("svcrdma: deferred read ret=%d, rq_arg.len =%d, "
		"rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
		ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base,
		rqstp->rq_arg.head[0].iov_len);

	/* Indicate that we've consumed an RQ credit */
	rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
	svc_xprt_received(rqstp->rq_xprt);
	return ret;
}

/*
 * Set up the rqstp thread context to point to the RQ buffer. If
 * necessary, pull additional data from the client with an RDMA_READ
 * request.
 */
int svc_rdma_recvfrom(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt = rqstp->rq_xprt;
	struct svcxprt_rdma *rdma_xprt =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	struct svc_rdma_op_ctxt *ctxt = NULL;
	struct rpcrdma_msg *rmsgp;
	int ret = 0;
	int len;

	dprintk("svcrdma: rqstp=%p\n", rqstp);

	/*
	 * The rq_xprt_ctxt indicates if we've consumed an RQ credit
	 * or not. It is used in the rdma xpo_release_rqst function to
	 * determine whether or not to return an RQ WQE to the RQ.
	 */
	rqstp->rq_xprt_ctxt = NULL;

	spin_lock_bh(&rdma_xprt->sc_read_complete_lock);
	if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
		ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
				  struct svc_rdma_op_ctxt,
				  dto_q);
		list_del_init(&ctxt->dto_q);
	}
	spin_unlock_bh(&rdma_xprt->sc_read_complete_lock);
	if (ctxt)
		return rdma_read_complete(rqstp, ctxt);

	spin_lock_bh(&rdma_xprt->sc_rq_dto_lock);
	if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
		ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
				  struct svc_rdma_op_ctxt,
				  dto_q);
		list_del_init(&ctxt->dto_q);
	} else {
		atomic_inc(&rdma_stat_rq_starve);
		clear_bit(XPT_DATA, &xprt->xpt_flags);
		ctxt = NULL;
	}
	spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
	if (!ctxt) {
		/* This is the EAGAIN path. The svc_recv routine will
		 * return -EAGAIN, the nfsd thread will call into
		 * svc_recv again and we shouldn't be on the active
		 * transport list.
		 */
		if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
			goto close_out;

		BUG_ON(ret);
		goto out;
	}
	dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
		ctxt, rdma_xprt, rqstp, ctxt->wc_status);
	BUG_ON(ctxt->wc_status != IB_WC_SUCCESS);
	atomic_inc(&rdma_stat_recv);

	/* Build up the XDR from the receive buffers. */
	rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);

	/* Decode the RDMA header. */
	len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
	rqstp->rq_xprt_hlen = len;

	/* If the request is invalid, reply with an error */
	if (len < 0) {
		if (len == -ENOSYS)
			(void)svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
		goto close_out;
	}

	/* Read read-list data. If we would need to wait, defer
	 * it. Note that in this case, we don't return the RQ credit
	 * until after the read completes.
	 */
	if (rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt)) {
		svc_xprt_received(xprt);
		return 0;
	}

	/* Indicate we've consumed an RQ credit */
	rqstp->rq_xprt_ctxt = rqstp->rq_xprt;

	ret = rqstp->rq_arg.head[0].iov_len
		+ rqstp->rq_arg.page_len
		+ rqstp->rq_arg.tail[0].iov_len;
	svc_rdma_put_context(ctxt, 0);
 out:
	dprintk("svcrdma: ret = %d, rq_arg.len =%d, "
		"rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
		ret, rqstp->rq_arg.len,
		rqstp->rq_arg.head[0].iov_base,
		rqstp->rq_arg.head[0].iov_len);
	rqstp->rq_prot = IPPROTO_MAX;
	svc_xprt_copy_addrs(rqstp, xprt);
	svc_xprt_received(xprt);
	return ret;

 close_out:
	if (ctxt) {
		svc_rdma_put_context(ctxt, 1);
		/* Indicate we've consumed an RQ credit */
		rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
	}
	dprintk("svcrdma: transport %p is closing\n", xprt);
	/*
	 * Set the close bit and enqueue it. svc_recv will see the
	 * close bit and call svc_xprt_delete
	 */
	set_bit(XPT_CLOSE, &xprt->xpt_flags);
	svc_xprt_received(xprt);
	return 0;
}