/*
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

/*
 * Replace the pages in the rq_pages array with the pages from the SGE in
 * the RDMA_RECV completion. The SGL should contain full pages up until the
 * last one.
 */
static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
			       struct svc_rdma_op_ctxt *ctxt,
			       u32 byte_count)
{
	struct page *page;
	u32 bc;
	int sge_no;

	/* Swap the page in the SGE with the page in argpages */
	page = ctxt->pages[0];
	put_page(rqstp->rq_pages[0]);
	rqstp->rq_pages[0] = page;

	/* Set up the XDR head */
	rqstp->rq_arg.head[0].iov_base = page_address(page);
	rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length);
	rqstp->rq_arg.len = byte_count;
	rqstp->rq_arg.buflen = byte_count;

	/* Compute bytes past head in the SGL */
	bc = byte_count - rqstp->rq_arg.head[0].iov_len;

	/* If data remains, store it in the pagelist */
	rqstp->rq_arg.page_len = bc;
	rqstp->rq_arg.page_base = 0;
	rqstp->rq_arg.pages = &rqstp->rq_pages[1];
	sge_no = 1;
	while (bc && sge_no < ctxt->count) {
		page = ctxt->pages[sge_no];
		put_page(rqstp->rq_pages[sge_no]);
		rqstp->rq_pages[sge_no] = page;
		bc -= min(bc, ctxt->sge[sge_no].length);
		rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
		sge_no++;
	}
	rqstp->rq_respages = &rqstp->rq_pages[sge_no];

	/* We should never run out of SGE because the limit is defined to
	 * support the max allowed RPC data length
	 */
	BUG_ON(bc && (sge_no == ctxt->count));
	BUG_ON((rqstp->rq_arg.head[0].iov_len + rqstp->rq_arg.page_len)
	       != byte_count);
	BUG_ON(rqstp->rq_arg.len != byte_count);

	/* If not all pages were used from the SGL, free the remaining ones */
	bc = sge_no;
	while (sge_no < ctxt->count) {
		page = ctxt->pages[sge_no++];
		put_page(page);
	}
	ctxt->count = bc;

	/* Set up tail */
	rqstp->rq_arg.tail[0].iov_base = NULL;
	rqstp->rq_arg.tail[0].iov_len = 0;
}

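/*
 * Maps each chunk in the client's read list to the span of entries it
 * occupies in the temporary SGE array built by rdma_rcl_to_sge().
 */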
struct chunk_sge {
	int start;		/* sge no for this chunk */
	int count;		/* sge count for this chunk */
};

/* Encode a read-chunk-list as an array of IB SGE
 *
 * Assumptions:
 * - chunk[0]->position points to pages[0] at an offset of 0
 * - pages[] is not physically or virtually contiguous and consists of
 *   PAGE_SIZE elements.
 *
 * Output:
 * - sge array pointing into pages[] array.
 * - chunk_sge array specifying sge index and count for each
 *   chunk in the read list
 *
 */
static int rdma_rcl_to_sge(struct svcxprt_rdma *xprt,
			   struct svc_rqst *rqstp,
			   struct svc_rdma_op_ctxt *head,
			   struct rpcrdma_msg *rmsgp,
			   struct ib_sge *sge,
			   struct chunk_sge *ch_sge_ary,
			   int ch_count,
			   int byte_count)
{
	int sge_no;
	int sge_bytes;
	int page_off;
	int page_no;
	int ch_bytes;
	int ch_no;
	struct rpcrdma_read_chunk *ch;

	sge_no = 0;
	page_no = 0;
	page_off = 0;
	ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
	ch_no = 0;
	ch_bytes = ch->rc_target.rs_length;
	head->arg.head[0] = rqstp->rq_arg.head[0];
	head->arg.tail[0] = rqstp->rq_arg.tail[0];
	head->arg.pages = &head->pages[head->count];
	head->sge[0].length = head->count; /* save count of hdr pages */
	head->arg.page_base = 0;
	head->arg.page_len = ch_bytes;
	head->arg.len = rqstp->rq_arg.len + ch_bytes;
	head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
	head->count++;
	ch_sge_ary[0].start = 0;
	while (byte_count) {
		sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes);
		sge[sge_no].addr =
			ib_dma_map_page(xprt->sc_cm_id->device,
					rqstp->rq_arg.pages[page_no],
					page_off, sge_bytes,
					DMA_FROM_DEVICE);
		sge[sge_no].length = sge_bytes;
		sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
		/*
		 * Don't bump head->count here because the same page
		 * may be used by multiple SGE.
		 */
		head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
		rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];

		byte_count -= sge_bytes;
		ch_bytes -= sge_bytes;
		sge_no++;
		/*
		 * If all bytes for this chunk have been mapped to an
		 * SGE, move to the next SGE
		 */
		if (ch_bytes == 0) {
			ch_sge_ary[ch_no].count =
				sge_no - ch_sge_ary[ch_no].start;
			ch_no++;
			ch++;
			ch_sge_ary[ch_no].start = sge_no;
			ch_bytes = ch->rc_target.rs_length;
			/* If bytes remaining account for next chunk */
			if (byte_count) {
				head->arg.page_len += ch_bytes;
				head->arg.len += ch_bytes;
				head->arg.buflen += ch_bytes;
			}
		}
		/*
		 * If this SGE consumed all of the page, move to the
		 * next page
		 */
		if ((sge_bytes + page_off) == PAGE_SIZE) {
			page_no++;
			page_off = 0;
			/*
			 * If there are still bytes left to map, bump
			 * the page count
			 */
			if (byte_count)
				head->count++;
		} else
			page_off += sge_bytes;
	}
	BUG_ON(byte_count != 0);
	return sge_no;
}

/*
 * Copy the SGE mapped for one RDMA_READ into the context passed as the
 * WR id, and advance the running offset into the client's memory region
 * by the number of bytes covered.
 */
static void rdma_set_ctxt_sge(struct svc_rdma_op_ctxt *ctxt,
			      struct ib_sge *sge,
			      u64 *sgl_offset,
			      int count)
{
	int i;

	ctxt->count = count;
	for (i = 0; i < count; i++) {
		ctxt->sge[i].addr = sge[i].addr;
		ctxt->sge[i].length = sge[i].length;
		*sgl_offset = *sgl_offset + sge[i].length;
	}
}

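/*
 * Cap the number of SGE used by a single RDMA_READ work request. iWARP
 * transports are limited to one SGE per READ here; everything else is
 * bounded by the chunk's SGE count and the transport's sc_max_sge.
 */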
static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
{
	if ((RDMA_TRANSPORT_IWARP ==
	     rdma_node_get_transport(xprt->sc_cm_id->
				     device->node_type))
	    && sge_count > 1)
		return 1;
	else
		return min_t(int, sge_count, xprt->sc_max_sge);
}

/*
 * Use RDMA_READ to read data from the advertised client buffer into the
 * XDR stream starting at rq_arg.head[0].iov_base.
 * Each chunk in the array contains the following fields:
 * discrim      - '1'; not used for data placement
 * position     - The xdr stream offset (the same for every chunk)
 * handle       - RMR for client memory region
 * length       - data transfer length
 * offset       - 64 bit tagged offset in remote memory region
 *
 * On our side, we need to read into a pagelist. The first page immediately
 * follows the RPC header.
 *
 * This function returns 1 to indicate success. The data is not yet in
 * the pagelist and therefore the RPC request must be deferred. The
 * I/O completion will enqueue the transport again and
 * svc_rdma_recvfrom will complete the request.
 *
 * NOTE: The ctxt must not be touched after the last WR has been posted
 * because the I/O completion processing may occur on another
 * processor and free / modify the context. Ne touche pas!
 */
static int rdma_read_xdr(struct svcxprt_rdma *xprt,
			 struct rpcrdma_msg *rmsgp,
			 struct svc_rqst *rqstp,
			 struct svc_rdma_op_ctxt *hdr_ctxt)
{
	struct ib_send_wr read_wr;
	int err = 0;
	int ch_no;
	struct ib_sge *sge;
	int ch_count;
	int byte_count;
	int sge_count;
	u64 sgl_offset;
	struct rpcrdma_read_chunk *ch;
	struct svc_rdma_op_ctxt *ctxt = NULL;
	struct svc_rdma_op_ctxt *head;
	struct svc_rdma_op_ctxt *tmp_sge_ctxt;
	struct svc_rdma_op_ctxt *tmp_ch_ctxt;
	struct chunk_sge *ch_sge_ary;

	/* If no read list is present, return 0 */
	ch = svc_rdma_get_read_chunk(rmsgp);
	if (!ch)
		return 0;

	/* Allocate temporary contexts to keep SGE */
	BUG_ON(sizeof(struct ib_sge) < sizeof(struct chunk_sge));
	tmp_sge_ctxt = svc_rdma_get_context(xprt);
	sge = tmp_sge_ctxt->sge;
	tmp_ch_ctxt = svc_rdma_get_context(xprt);
	ch_sge_ary = (struct chunk_sge *)tmp_ch_ctxt->sge;

	svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
	sge_count = rdma_rcl_to_sge(xprt, rqstp, hdr_ctxt, rmsgp,
				    sge, ch_sge_ary,
				    ch_count, byte_count);
	head = svc_rdma_get_context(xprt);
	sgl_offset = 0;
	ch_no = 0;

	for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
	     ch->rc_discrim != 0; ch++, ch_no++) {
next_sge:
		if (!ctxt)
			ctxt = head;
		else {
			ctxt->next = svc_rdma_get_context(xprt);
			ctxt = ctxt->next;
		}
		ctxt->next = NULL;
		ctxt->direction = DMA_FROM_DEVICE;
		clear_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);

		/* Prepare READ WR */
		memset(&read_wr, 0, sizeof read_wr);
		ctxt->wr_op = IB_WR_RDMA_READ;
		read_wr.wr_id = (unsigned long)ctxt;
		read_wr.opcode = IB_WR_RDMA_READ;
		read_wr.send_flags = IB_SEND_SIGNALED;
		read_wr.wr.rdma.rkey = ch->rc_target.rs_handle;
		read_wr.wr.rdma.remote_addr =
			get_unaligned(&(ch->rc_target.rs_offset)) +
			sgl_offset;
		read_wr.sg_list = &sge[ch_sge_ary[ch_no].start];
		read_wr.num_sge =
			rdma_read_max_sge(xprt, ch_sge_ary[ch_no].count);
		rdma_set_ctxt_sge(ctxt, &sge[ch_sge_ary[ch_no].start],
				  &sgl_offset,
				  read_wr.num_sge);
		if (((ch+1)->rc_discrim == 0) &&
		    (read_wr.num_sge == ch_sge_ary[ch_no].count)) {
			/*
			 * Mark the last RDMA_READ with a bit to
			 * indicate all RPC data has been fetched from
			 * the client and the RPC needs to be enqueued.
			 */
			set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
			ctxt->next = hdr_ctxt;
			hdr_ctxt->next = head;
		}
		/* Post the read */
		err = svc_rdma_send(xprt, &read_wr);
		if (err) {
			printk(KERN_ERR "svcrdma: Error posting send = %d\n",
			       err);
			/*
			 * Break the circular list so free knows when
			 * to stop if the error happened to occur on
			 * the last read
			 */
			ctxt->next = NULL;
			goto out;
		}
		atomic_inc(&rdma_stat_read);

		if (read_wr.num_sge < ch_sge_ary[ch_no].count) {
			ch_sge_ary[ch_no].count -= read_wr.num_sge;
			ch_sge_ary[ch_no].start += read_wr.num_sge;
			goto next_sge;
		}
		sgl_offset = 0;
		err = 0;
	}

out:
	svc_rdma_put_context(tmp_sge_ctxt, 0);
	svc_rdma_put_context(tmp_ch_ctxt, 0);

	/* Detach arg pages. svc_recv will replenish them */
	for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++)
		rqstp->rq_pages[ch_no] = NULL;

	/*
	 * Detach res pages. svc_release must see an rq_resused count of
	 * zero or it will attempt to put them.
	 */
	while (rqstp->rq_resused)
		rqstp->rq_respages[--rqstp->rq_resused] = NULL;

	if (err) {
		printk(KERN_ERR "svcrdma: RDMA_READ error = %d\n", err);
		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
		/* Free the linked list of read contexts */
		while (head != NULL) {
			ctxt = head->next;
			svc_rdma_put_context(head, 1);
			head = ctxt;
		}
		return 0;
	}

	return 1;
}

static int rdma_read_complete(struct svc_rqst *rqstp,
			      struct svc_rdma_op_ctxt *data)
{
	struct svc_rdma_op_ctxt *head = data->next;
	int page_no;
	int ret;

	BUG_ON(!head);

	/* Copy RPC pages */
	for (page_no = 0; page_no < head->count; page_no++) {
		put_page(rqstp->rq_pages[page_no]);
		rqstp->rq_pages[page_no] = head->pages[page_no];
	}
	/* Point rq_arg.pages past header */
	rqstp->rq_arg.pages = &rqstp->rq_pages[head->sge[0].length];
	rqstp->rq_arg.page_len = head->arg.page_len;
	rqstp->rq_arg.page_base = head->arg.page_base;

	/* rq_respages starts after the last arg page */
	rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
	rqstp->rq_resused = 0;

	/* Rebuild rq_arg head and tail. */
	rqstp->rq_arg.head[0] = head->arg.head[0];
	rqstp->rq_arg.tail[0] = head->arg.tail[0];
	rqstp->rq_arg.len = head->arg.len;
	rqstp->rq_arg.buflen = head->arg.buflen;

	/* XXX: What should this be? */
	rqstp->rq_prot = IPPROTO_MAX;

	/*
	 * Free the contexts we used to build the RDMA_READ. We have
	 * to be careful here because the context list uses the same
	 * next pointer used to chain the contexts associated with the
	 * RDMA_READ
	 */
	data->next = NULL;	/* terminate circular list */
	do {
		data = head->next;
		svc_rdma_put_context(head, 0);
		head = data;
	} while (head != NULL);

	ret = rqstp->rq_arg.head[0].iov_len
		+ rqstp->rq_arg.page_len
		+ rqstp->rq_arg.tail[0].iov_len;
	dprintk("svcrdma: deferred read ret=%d, rq_arg.len=%d, "
		"rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zd\n",
		ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base,
		rqstp->rq_arg.head[0].iov_len);

	/* Indicate that we've consumed an RQ credit */
	rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
	svc_xprt_received(rqstp->rq_xprt);
	return ret;
}

/*
 * Set up the rqstp thread context to point to the RQ buffer. If
 * necessary, pull additional data from the client with an RDMA_READ
 * request.
 */
int svc_rdma_recvfrom(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt = rqstp->rq_xprt;
	struct svcxprt_rdma *rdma_xprt =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	struct svc_rdma_op_ctxt *ctxt = NULL;
	struct rpcrdma_msg *rmsgp;
	int ret = 0;
	int len;

	dprintk("svcrdma: rqstp=%p\n", rqstp);

	/*
	 * The rq_xprt_ctxt indicates if we've consumed an RQ credit
	 * or not. It is used in the rdma xpo_release_rqst function to
	 * determine whether or not to return an RQ WQE to the RQ.
	 */
	rqstp->rq_xprt_ctxt = NULL;

	spin_lock_bh(&rdma_xprt->sc_read_complete_lock);
	if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
		ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
				  struct svc_rdma_op_ctxt,
				  dto_q);
		list_del_init(&ctxt->dto_q);
	}
	spin_unlock_bh(&rdma_xprt->sc_read_complete_lock);
	if (ctxt)
		return rdma_read_complete(rqstp, ctxt);

	spin_lock_bh(&rdma_xprt->sc_rq_dto_lock);
	if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
		ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
				  struct svc_rdma_op_ctxt,
				  dto_q);
		list_del_init(&ctxt->dto_q);
	} else {
		atomic_inc(&rdma_stat_rq_starve);
		clear_bit(XPT_DATA, &xprt->xpt_flags);
		ctxt = NULL;
	}
	spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
	if (!ctxt) {
		/* This is the EAGAIN path. The svc_recv routine will
		 * return -EAGAIN, the nfsd thread will call into
		 * svc_recv again, and we shouldn't be on the active
		 * transport list
		 */
		if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
			goto close_out;

		BUG_ON(ret);
		goto out;
	}
	dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
		ctxt, rdma_xprt, rqstp, ctxt->wc_status);
	BUG_ON(ctxt->wc_status != IB_WC_SUCCESS);
	atomic_inc(&rdma_stat_recv);

	/* Build up the XDR from the receive buffers. */
	rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);

	/* Decode the RDMA header. */
	len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
	rqstp->rq_xprt_hlen = len;

	/* If the request is invalid, reply with an error */
	if (len < 0) {
		if (len == -ENOSYS)
			(void)svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
		goto close_out;
	}

	/* Read read-list data. If we would need to wait, defer
	 * it. Note that in this case, we don't return the RQ credit
	 * until after the read completes.
	 */
	if (rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt)) {
		svc_xprt_received(xprt);
		return 0;
	}

	/* Indicate we've consumed an RQ credit */
	rqstp->rq_xprt_ctxt = rqstp->rq_xprt;

	ret = rqstp->rq_arg.head[0].iov_len
		+ rqstp->rq_arg.page_len
		+ rqstp->rq_arg.tail[0].iov_len;
	svc_rdma_put_context(ctxt, 0);
out:
	dprintk("svcrdma: ret=%d, rq_arg.len=%d, "
		"rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zd\n",
		ret, rqstp->rq_arg.len,
		rqstp->rq_arg.head[0].iov_base,
		rqstp->rq_arg.head[0].iov_len);
	rqstp->rq_prot = IPPROTO_MAX;
	svc_xprt_copy_addrs(rqstp, xprt);
	svc_xprt_received(xprt);
	return ret;

close_out:
	if (ctxt) {
		svc_rdma_put_context(ctxt, 1);
		/* Indicate we've consumed an RQ credit */
		rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
	}
	dprintk("svcrdma: transport %p is closing\n", xprt);
	/*
	 * Set the close bit and enqueue it. svc_recv will see the
	 * close bit and call svc_xprt_delete
	 */
	set_bit(XPT_CLOSE, &xprt->xpt_flags);
	svc_xprt_received(xprt);
	return 0;
}