/*
 * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/spinlock.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <linux/sunrpc/svc_rdma.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

/*
 * Replace the pages in the rq_argpages array with the pages from the SGE in
 * the RDMA_RECV completion. The SGL should contain full pages up until the
 * last one.
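 *
 * Note: page ownership moves from the receive context to the svc_rqst;
 * the rq_pages entries being displaced are released with put_page(),
 * and ctxt->count is trimmed to the number of pages actually consumed
 * so that any remaining receive pages can be freed.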
 */
static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
			       struct svc_rdma_op_ctxt *ctxt,
			       u32 byte_count)
{
	struct page *page;
	u32 bc;
	int sge_no;

	/* Swap the page in the SGE with the page in argpages */
	page = ctxt->pages[0];
	put_page(rqstp->rq_pages[0]);
	rqstp->rq_pages[0] = page;

	/* Set up the XDR head */
	rqstp->rq_arg.head[0].iov_base = page_address(page);
	rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length);
	rqstp->rq_arg.len = byte_count;
	rqstp->rq_arg.buflen = byte_count;

	/* Compute bytes past head in the SGL */
	bc = byte_count - rqstp->rq_arg.head[0].iov_len;

	/* If data remains, store it in the pagelist */
	rqstp->rq_arg.page_len = bc;
	rqstp->rq_arg.page_base = 0;
	rqstp->rq_arg.pages = &rqstp->rq_pages[1];
	sge_no = 1;
	while (bc && sge_no < ctxt->count) {
		page = ctxt->pages[sge_no];
		put_page(rqstp->rq_pages[sge_no]);
		rqstp->rq_pages[sge_no] = page;
		bc -= min(bc, ctxt->sge[sge_no].length);
		rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
		sge_no++;
	}
	rqstp->rq_respages = &rqstp->rq_pages[sge_no];
	rqstp->rq_next_page = rqstp->rq_respages + 1;

	/* We should never run out of SGE because the limit is defined to
	 * support the max allowed RPC data length
	 */
	BUG_ON(bc && (sge_no == ctxt->count));
	BUG_ON((rqstp->rq_arg.head[0].iov_len + rqstp->rq_arg.page_len)
	       != byte_count);
	BUG_ON(rqstp->rq_arg.len != byte_count);

	/* If not all pages were used from the SGL, free the remaining ones */
	bc = sge_no;
	while (sge_no < ctxt->count) {
		page = ctxt->pages[sge_no++];
		put_page(page);
	}
	ctxt->count = bc;

	/* Set up tail */
	rqstp->rq_arg.tail[0].iov_base = NULL;
	rqstp->rq_arg.tail[0].iov_len = 0;
}

/* Encode a read-chunk-list as an array of IB SGE
 *
 * Assumptions:
 * - chunk[0]->position points to pages[0] at an offset of 0
 * - pages[] is not physically or virtually contiguous and consists of
 *   PAGE_SIZE elements.
 *
 * Output:
 * - sge array pointing into pages[] array.
 * - chunk_sge array specifying sge index and count for each
 *   chunk in the read list
 */
static int map_read_chunks(struct svcxprt_rdma *xprt,
			   struct svc_rqst *rqstp,
			   struct svc_rdma_op_ctxt *head,
			   struct rpcrdma_msg *rmsgp,
			   struct svc_rdma_req_map *rpl_map,
			   struct svc_rdma_req_map *chl_map,
			   int ch_count,
			   int byte_count)
{
	int sge_no;
	int sge_bytes;
	int page_off;
	int page_no;
	int ch_bytes;
	int ch_no;
	struct rpcrdma_read_chunk *ch;

	sge_no = 0;
	page_no = 0;
	page_off = 0;
	ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
	ch_no = 0;
	ch_bytes = ntohl(ch->rc_target.rs_length);
	head->arg.head[0] = rqstp->rq_arg.head[0];
	head->arg.tail[0] = rqstp->rq_arg.tail[0];
	head->arg.pages = &head->pages[head->count];
	head->hdr_count = head->count; /* save count of hdr pages */
	head->arg.page_base = 0;
	head->arg.page_len = ch_bytes;
	head->arg.len = rqstp->rq_arg.len + ch_bytes;
	head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
	head->count++;
	chl_map->ch[0].start = 0;
	while (byte_count) {
		rpl_map->sge[sge_no].iov_base =
			page_address(rqstp->rq_arg.pages[page_no]) + page_off;
		sge_bytes = min_t(int, PAGE_SIZE - page_off, ch_bytes);
		rpl_map->sge[sge_no].iov_len = sge_bytes;
		/*
		 * Don't bump head->count here because the same page
		 * may be used by multiple SGE.
		 */
		head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
		rqstp->rq_respages = &rqstp->rq_arg.pages[page_no + 1];
		rqstp->rq_next_page = rqstp->rq_respages + 1;

		byte_count -= sge_bytes;
		ch_bytes -= sge_bytes;
		sge_no++;
		/*
		 * If all bytes for this chunk have been mapped to an
		 * SGE, move to the next SGE
		 */
		if (ch_bytes == 0) {
			chl_map->ch[ch_no].count =
				sge_no - chl_map->ch[ch_no].start;
			ch_no++;
			ch++;
			chl_map->ch[ch_no].start = sge_no;
			ch_bytes = ntohl(ch->rc_target.rs_length);
			/* If bytes remaining account for next chunk */
			if (byte_count) {
				head->arg.page_len += ch_bytes;
				head->arg.len += ch_bytes;
				head->arg.buflen += ch_bytes;
			}
		}
		/*
		 * If this SGE consumed all of the page, move to the
		 * next page
		 */
		if ((sge_bytes + page_off) == PAGE_SIZE) {
			page_no++;
			page_off = 0;
			/*
			 * If there are still bytes left to map, bump
			 * the page count
			 */
			if (byte_count)
				head->count++;
		} else
			page_off += sge_bytes;
	}
	BUG_ON(byte_count != 0);
	return sge_no;
}

/* Map a read-chunk-list to an XDR and fast register the page-list.
 *
 * Assumptions:
 * - chunk[0] position points to pages[0] at an offset of 0
 * - pages[] will be made physically contiguous by creating a one-off memory
 *   region using the fastreg verb.
 * - byte_count is # of bytes in read-chunk-list
 * - ch_count is # of chunks in read-chunk-list
 *
 * Output:
 * - sge array pointing into pages[] array.
 * - chunk_sge array specifying sge index and count for each
 *   chunk in the read list
 */
static int fast_reg_read_chunks(struct svcxprt_rdma *xprt,
				struct svc_rqst *rqstp,
				struct svc_rdma_op_ctxt *head,
				struct rpcrdma_msg *rmsgp,
				struct svc_rdma_req_map *rpl_map,
				struct svc_rdma_req_map *chl_map,
				int ch_count,
				int byte_count)
{
	int page_no;
	int ch_no;
	u32 offset;
	struct rpcrdma_read_chunk *ch;
	struct svc_rdma_fastreg_mr *frmr;
	int ret = 0;

	frmr = svc_rdma_get_frmr(xprt);
	if (IS_ERR(frmr))
		return -ENOMEM;

	head->frmr = frmr;
	head->arg.head[0] = rqstp->rq_arg.head[0];
	head->arg.tail[0] = rqstp->rq_arg.tail[0];
	head->arg.pages = &head->pages[head->count];
	head->hdr_count = head->count; /* save count of hdr pages */
	head->arg.page_base = 0;
	head->arg.page_len = byte_count;
	head->arg.len = rqstp->rq_arg.len + byte_count;
	head->arg.buflen = rqstp->rq_arg.buflen + byte_count;

	/* Fast register the page list */
	frmr->kva = page_address(rqstp->rq_arg.pages[0]);
	frmr->direction = DMA_FROM_DEVICE;
	frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
	frmr->map_len = byte_count;
	frmr->page_list_len = PAGE_ALIGN(byte_count) >> PAGE_SHIFT;
	for (page_no = 0; page_no < frmr->page_list_len; page_no++) {
		frmr->page_list->page_list[page_no] =
			ib_dma_map_page(xprt->sc_cm_id->device,
					rqstp->rq_arg.pages[page_no], 0,
					PAGE_SIZE, DMA_FROM_DEVICE);
		if (ib_dma_mapping_error(xprt->sc_cm_id->device,
					 frmr->page_list->page_list[page_no]))
			goto fatal_err;
		atomic_inc(&xprt->sc_dma_used);
		head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
	}
	head->count += page_no;

	/* rq_respages points one past arg pages */
	rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
	rqstp->rq_next_page = rqstp->rq_respages + 1;

	/* Create the reply and chunk maps */
	offset = 0;
	ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
	for (ch_no = 0; ch_no < ch_count; ch_no++) {
		int len = ntohl(ch->rc_target.rs_length);

		rpl_map->sge[ch_no].iov_base = frmr->kva + offset;
		rpl_map->sge[ch_no].iov_len = len;
		chl_map->ch[ch_no].count = 1;
		chl_map->ch[ch_no].start = ch_no;
		offset += len;
		ch++;
	}

	ret = svc_rdma_fastreg(xprt, frmr);
	if (ret)
		goto fatal_err;

	return ch_no;

 fatal_err:
	printk(KERN_ERR "svcrdma: error fast registering xdr for xprt %p\n",
	       xprt);
	svc_rdma_put_frmr(xprt, frmr);
	return -EIO;
}

static int rdma_set_ctxt_sge(struct svcxprt_rdma *xprt,
			     struct svc_rdma_op_ctxt *ctxt,
			     struct svc_rdma_fastreg_mr *frmr,
			     struct kvec *vec,
			     u64 *sgl_offset,
			     int count)
{
	int i;
	unsigned long off;

	ctxt->count = count;
	ctxt->direction = DMA_FROM_DEVICE;
	for (i = 0; i < count; i++) {
		ctxt->sge[i].length = 0; /* in case map fails */
		if (!frmr) {
			BUG_ON(!virt_to_page(vec[i].iov_base));
			off = (unsigned long)vec[i].iov_base & ~PAGE_MASK;
			ctxt->sge[i].addr =
				ib_dma_map_page(xprt->sc_cm_id->device,
						virt_to_page(vec[i].iov_base),
						off,
						vec[i].iov_len,
						DMA_FROM_DEVICE);
			if (ib_dma_mapping_error(xprt->sc_cm_id->device,
						 ctxt->sge[i].addr))
				return -EINVAL;
			ctxt->sge[i].lkey = xprt->sc_dma_lkey;
			atomic_inc(&xprt->sc_dma_used);
		} else {
			ctxt->sge[i].addr = (unsigned long)vec[i].iov_base;
			ctxt->sge[i].lkey = frmr->mr->lkey;
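			/*
			 * With a fast-register MR the sge addresses are
			 * kva-based offsets into the region mapped by
			 * fast_reg_read_chunks(), so no per-sge DMA
			 * mapping is needed here.
			 */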
		}
		ctxt->sge[i].length = vec[i].iov_len;
		*sgl_offset = *sgl_offset + vec[i].iov_len;
	}
	return 0;
}

static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
{
	if ((rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
	     RDMA_TRANSPORT_IWARP) &&
	    sge_count > 1)
		return 1;
	else
		return min_t(int, sge_count, xprt->sc_max_sge);
}

/*
 * Use RDMA_READ to read data from the advertised client buffer into the
 * XDR stream starting at rq_arg.head[0].iov_base.
 * Each chunk in the array contains the following fields:
 * discrim      - '1', this isn't used for data placement
 * position     - the xdr stream offset (the same for every chunk)
 * handle       - RMR for client memory region
 * length       - data transfer length
 * offset       - 64 bit tagged offset in remote memory region
 *
 * On our side, we need to read into a pagelist. The first page immediately
 * follows the RPC header.
 *
 * This function returns:
 * 0 - No error and no read-list found.
 *
 * 1 - Successful read-list processing. The data is not yet in
 * the pagelist and therefore the RPC request must be deferred. The
 * I/O completion will enqueue the transport again and
 * svc_rdma_recvfrom will complete the request.
 *
 * <0 - Error processing/posting read-list.
 *
 * NOTE: The ctxt must not be touched after the last WR has been posted
 * because the I/O completion processing may occur on another
 * processor and free / modify the context. Ne touche pas!
 */
static int rdma_read_xdr(struct svcxprt_rdma *xprt,
			 struct rpcrdma_msg *rmsgp,
			 struct svc_rqst *rqstp,
			 struct svc_rdma_op_ctxt *hdr_ctxt)
{
	struct ib_send_wr read_wr;
	struct ib_send_wr inv_wr;
	int err = 0;
	int ch_no;
	int ch_count;
	int byte_count;
	int sge_count;
	u64 sgl_offset;
	struct rpcrdma_read_chunk *ch;
	struct svc_rdma_op_ctxt *ctxt = NULL;
	struct svc_rdma_req_map *rpl_map;
	struct svc_rdma_req_map *chl_map;

	/* If no read list is present, return 0 */
	ch = svc_rdma_get_read_chunk(rmsgp);
	if (!ch)
		return 0;

	svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
	if (ch_count > RPCSVC_MAXPAGES)
		return -EINVAL;

	/* Allocate temporary reply and chunk maps */
	rpl_map = svc_rdma_get_req_map();
	chl_map = svc_rdma_get_req_map();

	if (!xprt->sc_frmr_pg_list_len)
		sge_count = map_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
					    rpl_map, chl_map, ch_count,
					    byte_count);
	else
		sge_count = fast_reg_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
						 rpl_map, chl_map, ch_count,
						 byte_count);
	if (sge_count < 0) {
		err = -EIO;
		goto out;
	}

	sgl_offset = 0;
	ch_no = 0;

	for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
	     ch->rc_discrim != 0; ch++, ch_no++) {
		u64 rs_offset;
 next_sge:
		ctxt = svc_rdma_get_context(xprt);
		ctxt->direction = DMA_FROM_DEVICE;
		ctxt->frmr = hdr_ctxt->frmr;
		ctxt->read_hdr = NULL;
		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
		clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);

		/* Prepare READ WR */
		memset(&read_wr, 0, sizeof read_wr);
		read_wr.wr_id = (unsigned long)ctxt;
		read_wr.opcode = IB_WR_RDMA_READ;
		ctxt->wr_op = read_wr.opcode;
		read_wr.send_flags = IB_SEND_SIGNALED;
		read_wr.wr.rdma.rkey = ntohl(ch->rc_target.rs_handle);
		xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
				 &rs_offset);
		read_wr.wr.rdma.remote_addr = rs_offset + sgl_offset;
		read_wr.sg_list = ctxt->sge;
		read_wr.num_sge =
			rdma_read_max_sge(xprt, chl_map->ch[ch_no].count);
		err = rdma_set_ctxt_sge(xprt, ctxt, hdr_ctxt->frmr,
					&rpl_map->sge[chl_map->ch[ch_no].start],
					&sgl_offset,
					read_wr.num_sge);
		if (err) {
			svc_rdma_unmap_dma(ctxt);
			svc_rdma_put_context(ctxt, 0);
			goto out;
		}
		if (((ch+1)->rc_discrim == 0) &&
		    (read_wr.num_sge == chl_map->ch[ch_no].count)) {
			/*
			 * Mark the last RDMA_READ with a bit to
			 * indicate all RPC data has been fetched from
			 * the client and the RPC needs to be enqueued.
			 */
			set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
			if (hdr_ctxt->frmr) {
				set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
				/*
				 * Invalidate the local MR used to map the data
				 * sink.
				 */
				if (xprt->sc_dev_caps &
				    SVCRDMA_DEVCAP_READ_W_INV) {
					read_wr.opcode =
						IB_WR_RDMA_READ_WITH_INV;
					ctxt->wr_op = read_wr.opcode;
					read_wr.ex.invalidate_rkey =
						ctxt->frmr->mr->lkey;
				} else {
					/* Prepare INVALIDATE WR */
					memset(&inv_wr, 0, sizeof inv_wr);
					inv_wr.opcode = IB_WR_LOCAL_INV;
					inv_wr.send_flags = IB_SEND_SIGNALED;
					inv_wr.ex.invalidate_rkey =
						hdr_ctxt->frmr->mr->lkey;
					read_wr.next = &inv_wr;
				}
			}
			ctxt->read_hdr = hdr_ctxt;
		}
		/* Post the read */
		err = svc_rdma_send(xprt, &read_wr);
		if (err) {
			printk(KERN_ERR "svcrdma: Error %d posting RDMA_READ\n",
			       err);
			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
			svc_rdma_unmap_dma(ctxt);
			svc_rdma_put_context(ctxt, 0);
			goto out;
		}
		atomic_inc(&rdma_stat_read);

		if (read_wr.num_sge < chl_map->ch[ch_no].count) {
			chl_map->ch[ch_no].count -= read_wr.num_sge;
			chl_map->ch[ch_no].start += read_wr.num_sge;
			goto next_sge;
		}
		sgl_offset = 0;
		err = 1;
	}

 out:
	svc_rdma_put_req_map(rpl_map);
	svc_rdma_put_req_map(chl_map);

	/* Detach arg pages. svc_recv will replenish them */
	for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++)
		rqstp->rq_pages[ch_no] = NULL;

	return err;
}

static int rdma_read_complete(struct svc_rqst *rqstp,
			      struct svc_rdma_op_ctxt *head)
{
	int page_no;
	int ret;

	BUG_ON(!head);

	/* Copy RPC pages */
	for (page_no = 0; page_no < head->count; page_no++) {
		put_page(rqstp->rq_pages[page_no]);
		rqstp->rq_pages[page_no] = head->pages[page_no];
	}
	/* Point rq_arg.pages past header */
	rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];
	rqstp->rq_arg.page_len = head->arg.page_len;
	rqstp->rq_arg.page_base = head->arg.page_base;

	/* rq_respages starts after the last arg page */
	rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
	rqstp->rq_next_page = rqstp->rq_respages + 1;

	/* Rebuild rq_arg head and tail. */
	rqstp->rq_arg.head[0] = head->arg.head[0];
	rqstp->rq_arg.tail[0] = head->arg.tail[0];
	rqstp->rq_arg.len = head->arg.len;
	rqstp->rq_arg.buflen = head->arg.buflen;

	/* Free the context */
	svc_rdma_put_context(head, 0);

	/* XXX: What should this be?
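	 * There is no IP protocol number that corresponds to RPC/RDMA,
	 * so IPPROTO_MAX is used as a placeholder here.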
	 */
	rqstp->rq_prot = IPPROTO_MAX;
	svc_xprt_copy_addrs(rqstp, rqstp->rq_xprt);

	ret = rqstp->rq_arg.head[0].iov_len
		+ rqstp->rq_arg.page_len
		+ rqstp->rq_arg.tail[0].iov_len;
	dprintk("svcrdma: deferred read ret=%d, rq_arg.len=%d, "
		"rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zd\n",
		ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base,
		rqstp->rq_arg.head[0].iov_len);

	return ret;
}

/*
 * Set up the rqstp thread context to point to the RQ buffer. If
 * necessary, pull additional data from the client with an RDMA_READ
 * request.
 */
int svc_rdma_recvfrom(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt = rqstp->rq_xprt;
	struct svcxprt_rdma *rdma_xprt =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);
	struct svc_rdma_op_ctxt *ctxt = NULL;
	struct rpcrdma_msg *rmsgp;
	int ret = 0;
	int len;

	dprintk("svcrdma: rqstp=%p\n", rqstp);

	spin_lock_bh(&rdma_xprt->sc_rq_dto_lock);
	if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
		ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
				  struct svc_rdma_op_ctxt,
				  dto_q);
		list_del_init(&ctxt->dto_q);
	}
	if (ctxt) {
		spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
		return rdma_read_complete(rqstp, ctxt);
	}

	if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
		ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
				  struct svc_rdma_op_ctxt,
				  dto_q);
		list_del_init(&ctxt->dto_q);
	} else {
		atomic_inc(&rdma_stat_rq_starve);
		clear_bit(XPT_DATA, &xprt->xpt_flags);
		ctxt = NULL;
	}
	spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
	if (!ctxt) {
		/* This is the EAGAIN path. The svc_recv routine will
		 * return -EAGAIN, the nfsd thread will call into
		 * svc_recv again, and we shouldn't be on the active
		 * transport list.
		 */
		if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
			goto close_out;

		BUG_ON(ret);
		goto out;
	}
	dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
		ctxt, rdma_xprt, rqstp, ctxt->wc_status);
	BUG_ON(ctxt->wc_status != IB_WC_SUCCESS);
	atomic_inc(&rdma_stat_recv);

	/* Build up the XDR from the receive buffers. */
	rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);

	/* Decode the RDMA header. */
	len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
	rqstp->rq_xprt_hlen = len;

	/* If the request is invalid, reply with an error */
	if (len < 0) {
		if (len == -ENOSYS)
			svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
		goto close_out;
	}

	/* Read read-list data. */
	ret = rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt);
	if (ret > 0) {
		/* read-list posted, defer until data received from client. */
		goto defer;
	}
	if (ret < 0) {
		/* Post of read-list failed, free context. */
		svc_rdma_put_context(ctxt, 1);
		return 0;
	}

	ret = rqstp->rq_arg.head[0].iov_len
		+ rqstp->rq_arg.page_len
		+ rqstp->rq_arg.tail[0].iov_len;
	svc_rdma_put_context(ctxt, 0);
 out:
	dprintk("svcrdma: ret=%d, rq_arg.len=%d, "
		"rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zd\n",
		ret, rqstp->rq_arg.len,
		rqstp->rq_arg.head[0].iov_base,
		rqstp->rq_arg.head[0].iov_len);
	rqstp->rq_prot = IPPROTO_MAX;
	svc_xprt_copy_addrs(rqstp, xprt);
	return ret;

 close_out:
	if (ctxt)
		svc_rdma_put_context(ctxt, 1);
	dprintk("svcrdma: transport %p is closing\n", xprt);
	/*
	 * Set the close bit and enqueue it.
	 * svc_recv will see the close bit and call svc_xprt_delete.
	 */
	set_bit(XPT_CLOSE, &xprt->xpt_flags);
 defer:
	return 0;
}