/*
 * Copyright (c) 2016 Oracle.  All rights reserved.
 *
 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
 */

#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>
#include <linux/sunrpc/debug.h>

#include <rdma/rw.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);

/* Each R/W context contains state for one chain of RDMA Read or
 * Write Work Requests.
 *
 * Each WR chain handles a single contiguous server-side buffer,
 * because scatterlist entries after the first have to start on
 * page alignment. xdr_buf iovecs cannot guarantee alignment.
 *
 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
 * from a client may contain a unique R_key, so each WR chain moves
 * up to one segment at a time.
 *
 * The scatterlist makes this data structure over 4KB in size. To
 * make it less likely to fail, and to handle the allocation for
 * smaller I/O requests without disabling bottom-halves, these
 * contexts are created on demand, but cached and reused until the
 * controlling svcxprt_rdma is destroyed.
 */
struct svc_rdma_rw_ctxt {
	struct list_head	rw_list;
	struct rdma_rw_ctx	rw_ctx;
	int			rw_nents;
	struct sg_table		rw_sg_table;
	struct scatterlist	rw_first_sgl[0];
};

static inline struct svc_rdma_rw_ctxt *
svc_rdma_next_ctxt(struct list_head *list)
{
	return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
					rw_list);
}

static struct svc_rdma_rw_ctxt *
svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
{
	struct svc_rdma_rw_ctxt *ctxt;

	spin_lock(&rdma->sc_rw_ctxt_lock);

	ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts);
	if (ctxt) {
		list_del(&ctxt->rw_list);
		spin_unlock(&rdma->sc_rw_ctxt_lock);
	} else {
		spin_unlock(&rdma->sc_rw_ctxt_lock);
		ctxt = kmalloc(sizeof(*ctxt) +
			       SG_CHUNK_SIZE * sizeof(struct scatterlist),
			       GFP_KERNEL);
		if (!ctxt)
			goto out;
		INIT_LIST_HEAD(&ctxt->rw_list);
	}

	ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
	if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
				   ctxt->rw_sg_table.sgl)) {
		kfree(ctxt);
		ctxt = NULL;
	}
out:
	return ctxt;
}

static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
				 struct svc_rdma_rw_ctxt *ctxt)
{
	sg_free_table_chained(&ctxt->rw_sg_table, true);

	spin_lock(&rdma->sc_rw_ctxt_lock);
	list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts);
	spin_unlock(&rdma->sc_rw_ctxt_lock);
}

/**
 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
 * @rdma: transport about to be destroyed
 *
 */
void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_rw_ctxt *ctxt;

	while ((ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts)) != NULL) {
		list_del(&ctxt->rw_list);
		kfree(ctxt);
	}
}

/* A chunk context tracks all I/O for moving one Read or Write
 * chunk. This is a set of rdma_rw's that handle data movement
 * for all segments of one chunk.
 *
 * These are small, acquired with a single allocator call, and
 * no more than one is needed per chunk. They are allocated on
 * demand, and not cached.
 */
struct svc_rdma_chunk_ctxt {
	struct ib_cqe		cc_cqe;
	struct svcxprt_rdma	*cc_rdma;
	struct list_head	cc_rwctxts;
	int			cc_sqecount;
};

static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
			     struct svc_rdma_chunk_ctxt *cc)
{
	cc->cc_rdma = rdma;
	svc_xprt_get(&rdma->sc_xprt);

	INIT_LIST_HEAD(&cc->cc_rwctxts);
	cc->cc_sqecount = 0;
}

static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
				enum dma_data_direction dir)
{
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct svc_rdma_rw_ctxt *ctxt;

	while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
		list_del(&ctxt->rw_list);

		rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
				    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
				    ctxt->rw_nents, dir);
		svc_rdma_put_rw_ctxt(rdma, ctxt);
	}
	svc_xprt_put(&rdma->sc_xprt);
}

/* State for sending a Write or Reply chunk.
 *  - Tracks progress of writing one chunk over all its segments
 *  - Stores arguments for the SGL constructor functions
 */
struct svc_rdma_write_info {
	/* write state of this chunk */
	unsigned int	wi_seg_off;
	unsigned int	wi_seg_no;
	unsigned int	wi_nsegs;
	__be32		*wi_segs;

	/* SGL constructor arguments */
	struct xdr_buf	*wi_xdr;
	unsigned char	*wi_base;
	unsigned int	wi_next_off;

	struct svc_rdma_chunk_ctxt	wi_cc;
};

static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk)
{
	struct svc_rdma_write_info *info;

	info = kmalloc(sizeof(*info), GFP_KERNEL);
	if (!info)
		return info;

	info->wi_seg_off = 0;
	info->wi_seg_no = 0;
	info->wi_nsegs = be32_to_cpup(++chunk);
	info->wi_segs = ++chunk;
	svc_rdma_cc_init(rdma, &info->wi_cc);
	info->wi_cc.cc_cqe.done = svc_rdma_write_done;
	return info;
}

static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
{
	svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE);
	kfree(info);
}

/**
 * svc_rdma_write_done - Write chunk completion
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 * Pages under I/O are freed by a subsequent Send completion.
 */
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct svc_rdma_write_info *info =
			container_of(cc, struct svc_rdma_write_info, wi_cc);

	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
	wake_up(&rdma->sc_send_wait);

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
		if (wc->status != IB_WC_WR_FLUSH_ERR)
			pr_err("svcrdma: write ctx: %s (%u/0x%x)\n",
			       ib_wc_status_msg(wc->status),
			       wc->status, wc->vendor_err);
	}

	svc_rdma_write_info_free(info);
}

/* State for pulling a Read chunk.
 */
struct svc_rdma_read_info {
	struct svc_rdma_op_ctxt		*ri_readctxt;
	unsigned int			ri_position;
	unsigned int			ri_pageno;
	unsigned int			ri_pageoff;
	unsigned int			ri_chunklen;

	struct svc_rdma_chunk_ctxt	ri_cc;
};

static struct svc_rdma_read_info *
svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_read_info *info;

	info = kmalloc(sizeof(*info), GFP_KERNEL);
	if (!info)
		return info;

	svc_rdma_cc_init(rdma, &info->ri_cc);
	info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done;
	return info;
}

static void svc_rdma_read_info_free(struct svc_rdma_read_info *info)
{
	svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE);
	kfree(info);
}

/**
 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 */
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct svc_rdma_read_info *info =
			container_of(cc, struct svc_rdma_read_info, ri_cc);

	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
	wake_up(&rdma->sc_send_wait);

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
		if (wc->status != IB_WC_WR_FLUSH_ERR)
			pr_err("svcrdma: read ctx: %s (%u/0x%x)\n",
			       ib_wc_status_msg(wc->status),
			       wc->status, wc->vendor_err);
		svc_rdma_put_context(info->ri_readctxt, 1);
	} else {
		spin_lock(&rdma->sc_rq_dto_lock);
		list_add_tail(&info->ri_readctxt->list,
			      &rdma->sc_read_complete_q);
		spin_unlock(&rdma->sc_rq_dto_lock);

		set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
		svc_xprt_enqueue(&rdma->sc_xprt);
	}

	svc_rdma_read_info_free(info);
}

/* This function sleeps when the transport's Send Queue is congested.
 *
 * Assumptions:
 * - If ib_post_send() succeeds, only one completion is expected,
 *   even if one or more WRs are flushed. This is true when posting
 *   an rdma_rw_ctx or when posting a single signaled WR.
 */
static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
{
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct svc_xprt *xprt = &rdma->sc_xprt;
	struct ib_send_wr *first_wr, *bad_wr;
	struct list_head *tmp;
	struct ib_cqe *cqe;
	int ret;

	if (cc->cc_sqecount > rdma->sc_sq_depth)
		return -EINVAL;

	first_wr = NULL;
	cqe = &cc->cc_cqe;
	list_for_each(tmp, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *ctxt;

		ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;
	}

	do {
		if (atomic_sub_return(cc->cc_sqecount,
				      &rdma->sc_sq_avail) > 0) {
			ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
			if (ret)
				break;
			return 0;
		}

		atomic_inc(&rdma_stat_sq_starve);
		atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
		wait_event(rdma->sc_send_wait,
			   atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
	} while (1);

	pr_err("svcrdma: ib_post_send failed (%d)\n", ret);
	set_bit(XPT_CLOSE, &xprt->xpt_flags);

	/* If even one was posted, there will be a completion.
	 */
	if (bad_wr != first_wr)
		return 0;

	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
	wake_up(&rdma->sc_send_wait);
	return -ENOTCONN;
}

/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
 */
static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
			       unsigned int len,
			       struct svc_rdma_rw_ctxt *ctxt)
{
	struct scatterlist *sg = ctxt->rw_sg_table.sgl;

	sg_set_buf(&sg[0], info->wi_base, len);
	info->wi_base += len;

	ctxt->rw_nents = 1;
}

/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
 */
static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
				    unsigned int remaining,
				    struct svc_rdma_rw_ctxt *ctxt)
{
	unsigned int sge_no, sge_bytes, page_off, page_no;
	struct xdr_buf *xdr = info->wi_xdr;
	struct scatterlist *sg;
	struct page **page;

	page_off = info->wi_next_off + xdr->page_base;
	page_no = page_off >> PAGE_SHIFT;
	page_off = offset_in_page(page_off);
	page = xdr->pages + page_no;
	info->wi_next_off += remaining;
	sg = ctxt->rw_sg_table.sgl;
	sge_no = 0;
	do {
		sge_bytes = min_t(unsigned int, remaining,
				  PAGE_SIZE - page_off);
		sg_set_page(sg, *page, sge_bytes, page_off);

		remaining -= sge_bytes;
		sg = sg_next(sg);
		page_off = 0;
		sge_no++;
		page++;
	} while (remaining);

	ctxt->rw_nents = sge_no;
}

/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 * an RPC Reply.
 */
static int
svc_rdma_build_writes(struct svc_rdma_write_info *info,
		      void (*constructor)(struct svc_rdma_write_info *info,
					  unsigned int len,
					  struct svc_rdma_rw_ctxt *ctxt),
		      unsigned int remaining)
{
	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct svc_rdma_rw_ctxt *ctxt;
	__be32 *seg;
	int ret;

	seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
	do {
		unsigned int write_len;
		u32 seg_length, seg_handle;
		u64 seg_offset;

		if (info->wi_seg_no >= info->wi_nsegs)
			goto out_overflow;

		seg_handle = be32_to_cpup(seg);
		seg_length = be32_to_cpup(seg + 1);
		xdr_decode_hyper(seg + 2, &seg_offset);
		seg_offset += info->wi_seg_off;

		write_len = min(remaining, seg_length - info->wi_seg_off);
		ctxt = svc_rdma_get_rw_ctxt(rdma,
					    (write_len >> PAGE_SHIFT) + 2);
		if (!ctxt)
			goto out_noctx;

		constructor(info, write_len, ctxt);
		ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp,
				       rdma->sc_port_num, ctxt->rw_sg_table.sgl,
				       ctxt->rw_nents, 0, seg_offset,
				       seg_handle, DMA_TO_DEVICE);
		if (ret < 0)
			goto out_initerr;

		list_add(&ctxt->rw_list, &cc->cc_rwctxts);
		cc->cc_sqecount += ret;
		if (write_len == seg_length - info->wi_seg_off) {
			seg += 4;
			info->wi_seg_no++;
			info->wi_seg_off = 0;
		} else {
			info->wi_seg_off += write_len;
		}
		remaining -= write_len;
	} while (remaining);

	return 0;

out_overflow:
	dprintk("svcrdma: inadequate space in Write chunk (%u)\n",
		info->wi_nsegs);
	return -E2BIG;

out_noctx:
	dprintk("svcrdma: no R/W ctxs available\n");
	return -ENOMEM;

out_initerr:
	svc_rdma_put_rw_ctxt(rdma, ctxt);
	pr_err("svcrdma: failed to map pagelist (%d)\n", ret);
	return -EIO;
}

/* Send one of an xdr_buf's kvecs by itself.
 * To send a Reply chunk, the whole RPC Reply is written back to the
 * client. This function writes either the head or tail of the xdr_buf
 * containing the Reply.
 */
static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info,
				  struct kvec *vec)
{
	info->wi_base = vec->iov_base;
	return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
				     vec->iov_len);
}

/* Send an xdr_buf's page list by itself. A Write chunk is
 * just the page list. A Reply chunk is the head, page list,
 * and tail. This function is shared between the two types
 * of chunk.
 */
static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
				      struct xdr_buf *xdr)
{
	info->wi_xdr = xdr;
	info->wi_next_off = 0;
	return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
				     xdr->page_len);
}

/**
 * svc_rdma_send_write_chunk - Write all segments in a Write chunk
 * @rdma: controlling RDMA transport
 * @wr_ch: Write chunk provided by client
 * @xdr: xdr_buf containing the data payload
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Write chunk,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
			      struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info;
	int ret;

	if (!xdr->page_len)
		return 0;

	info = svc_rdma_write_info_alloc(rdma, wr_ch);
	if (!info)
		return -ENOMEM;

	ret = svc_rdma_send_xdr_pagelist(info, xdr);
	if (ret < 0)
		goto out_err;

	ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
	if (ret < 0)
		goto out_err;
	return xdr->page_len;

out_err:
	svc_rdma_write_info_free(info);
	return ret;
}

/**
 * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
 * @rdma: controlling RDMA transport
 * @rp_ch: Reply chunk provided by client
 * @writelist: true if client provided a Write list
 * @xdr: xdr_buf containing an RPC Reply
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Reply chunk,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
			      bool writelist, struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info;
	int consumed, ret;

	info = svc_rdma_write_info_alloc(rdma, rp_ch);
	if (!info)
		return -ENOMEM;

	ret = svc_rdma_send_xdr_kvec(info, &xdr->head[0]);
	if (ret < 0)
		goto out_err;
	consumed = xdr->head[0].iov_len;

	/* Send the page list in the Reply chunk only if the
	 * client did not provide Write chunks.
	 */
	if (!writelist && xdr->page_len) {
		ret = svc_rdma_send_xdr_pagelist(info, xdr);
		if (ret < 0)
			goto out_err;
		consumed += xdr->page_len;
	}

	if (xdr->tail[0].iov_len) {
		ret = svc_rdma_send_xdr_kvec(info, &xdr->tail[0]);
		if (ret < 0)
			goto out_err;
		consumed += xdr->tail[0].iov_len;
	}

	ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
	if (ret < 0)
		goto out_err;
	return consumed;

out_err:
	svc_rdma_write_info_free(info);
	return ret;
}

static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
				       struct svc_rqst *rqstp,
				       u32 rkey, u32 len, u64 offset)
{
	struct svc_rdma_op_ctxt *head = info->ri_readctxt;
	struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
	struct svc_rdma_rw_ctxt *ctxt;
	unsigned int sge_no, seg_len;
	struct scatterlist *sg;
	int ret;

	sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
	ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
	if (!ctxt)
		goto out_noctx;
	ctxt->rw_nents = sge_no;

	dprintk("svcrdma: reading segment %u@0x%016llx:0x%08x (%u sges)\n",
		len, offset, rkey, sge_no);

	sg = ctxt->rw_sg_table.sgl;
	for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
		seg_len = min_t(unsigned int, len,
				PAGE_SIZE - info->ri_pageoff);

		head->arg.pages[info->ri_pageno] =
			rqstp->rq_pages[info->ri_pageno];
		if (!info->ri_pageoff)
			head->count++;

		sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
			    seg_len, info->ri_pageoff);
		sg = sg_next(sg);

		info->ri_pageoff += seg_len;
		if (info->ri_pageoff == PAGE_SIZE) {
			info->ri_pageno++;
			info->ri_pageoff = 0;
		}
		len -= seg_len;

		/* Safety check */
		if (len &&
		    &rqstp->rq_pages[info->ri_pageno + 1] > rqstp->rq_page_end)
			goto out_overrun;
	}

	ret = rdma_rw_ctx_init(&ctxt->rw_ctx, cc->cc_rdma->sc_qp,
			       cc->cc_rdma->sc_port_num,
			       ctxt->rw_sg_table.sgl, ctxt->rw_nents,
			       0, offset, rkey, DMA_FROM_DEVICE);
	if (ret < 0)
		goto out_initerr;

	list_add(&ctxt->rw_list, &cc->cc_rwctxts);
	cc->cc_sqecount += ret;
	return 0;

out_noctx:
	dprintk("svcrdma: no R/W ctxs available\n");
	return -ENOMEM;

out_overrun:
	dprintk("svcrdma: request overruns rq_pages\n");
	return -EINVAL;

out_initerr:
	svc_rdma_put_rw_ctxt(cc->cc_rdma, ctxt);
	pr_err("svcrdma: failed to map pagelist (%d)\n", ret);
	return -EIO;
}

/* Walk the segments in the Read chunk starting at @p and construct
 * RDMA Read operations to pull the chunk to the server.
 */
static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
				     struct svc_rdma_read_info *info,
				     __be32 *p)
{
	int ret;

	ret = -EINVAL;
	info->ri_chunklen = 0;
	while (*p++ != xdr_zero && be32_to_cpup(p++) == info->ri_position) {
		u32 rs_handle, rs_length;
		u64 rs_offset;

		rs_handle = be32_to_cpup(p++);
		rs_length = be32_to_cpup(p++);
		p = xdr_decode_hyper(p, &rs_offset);

		ret = svc_rdma_build_read_segment(info, rqstp,
						  rs_handle, rs_length,
						  rs_offset);
		if (ret < 0)
			break;

		info->ri_chunklen += rs_length;
	}

	return ret;
}

/* Construct RDMA Reads to pull over a normal Read chunk. The chunk
 * data lands in the page list of head->arg.pages.
 *
 * Currently NFSD does not look at the head->arg.tail[0] iovec.
698 * Therefore, XDR round-up of the Read chunk and trailing 699 * inline content must both be added at the end of the pagelist. 700 */ 701 static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp, 702 struct svc_rdma_read_info *info, 703 __be32 *p) 704 { 705 struct svc_rdma_op_ctxt *head = info->ri_readctxt; 706 int ret; 707 708 dprintk("svcrdma: Reading Read chunk at position %u\n", 709 info->ri_position); 710 711 info->ri_pageno = head->hdr_count; 712 info->ri_pageoff = 0; 713 714 ret = svc_rdma_build_read_chunk(rqstp, info, p); 715 if (ret < 0) 716 goto out; 717 718 /* Split the Receive buffer between the head and tail 719 * buffers at Read chunk's position. XDR roundup of the 720 * chunk is not included in either the pagelist or in 721 * the tail. 722 */ 723 head->arg.tail[0].iov_base = 724 head->arg.head[0].iov_base + info->ri_position; 725 head->arg.tail[0].iov_len = 726 head->arg.head[0].iov_len - info->ri_position; 727 head->arg.head[0].iov_len = info->ri_position; 728 729 /* Read chunk may need XDR roundup (see RFC 5666, s. 3.7). 730 * 731 * NFSv2/3 write decoders need the length of the tail to 732 * contain the size of the roundup padding. 733 */ 734 head->arg.tail[0].iov_len += 4 - (info->ri_chunklen & 3); 735 736 head->arg.page_len = info->ri_chunklen; 737 head->arg.len += info->ri_chunklen; 738 head->arg.buflen += info->ri_chunklen; 739 740 out: 741 return ret; 742 } 743 744 /* Construct RDMA Reads to pull over a Position Zero Read chunk. 745 * The start of the data lands in the first page just after 746 * the Transport header, and the rest lands in the page list of 747 * head->arg.pages. 748 * 749 * Assumptions: 750 * - A PZRC has an XDR-aligned length (no implicit round-up). 751 * - There can be no trailing inline content (IOW, we assume 752 * a PZRC is never sent in an RDMA_MSG message, though it's 753 * allowed by spec). 754 */ 755 static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp, 756 struct svc_rdma_read_info *info, 757 __be32 *p) 758 { 759 struct svc_rdma_op_ctxt *head = info->ri_readctxt; 760 int ret; 761 762 dprintk("svcrdma: Reading Position Zero Read chunk\n"); 763 764 info->ri_pageno = head->hdr_count - 1; 765 info->ri_pageoff = offset_in_page(head->byte_len); 766 767 ret = svc_rdma_build_read_chunk(rqstp, info, p); 768 if (ret < 0) 769 goto out; 770 771 head->arg.len += info->ri_chunklen; 772 head->arg.buflen += info->ri_chunklen; 773 774 if (head->arg.buflen <= head->sge[0].length) { 775 /* Transport header and RPC message fit entirely 776 * in page where head iovec resides. 777 */ 778 head->arg.head[0].iov_len = info->ri_chunklen; 779 } else { 780 /* Transport header and part of RPC message reside 781 * in the head iovec's page. 782 */ 783 head->arg.head[0].iov_len = 784 head->sge[0].length - head->byte_len; 785 head->arg.page_len = 786 info->ri_chunklen - head->arg.head[0].iov_len; 787 } 788 789 out: 790 return ret; 791 } 792 793 /** 794 * svc_rdma_recv_read_chunk - Pull a Read chunk from the client 795 * @rdma: controlling RDMA transport 796 * @rqstp: set of pages to use as Read sink buffers 797 * @head: pages under I/O collect here 798 * @p: pointer to start of Read chunk 799 * 800 * Returns: 801 * %0 if all needed RDMA Reads were posted successfully, 802 * %-EINVAL if client provided too many segments, 803 * %-ENOMEM if rdma_rw context pool was exhausted, 804 * %-ENOTCONN if posting failed (connection is lost), 805 * %-EIO if rdma_rw initialization failed (DMA mapping, etc). 
806 * 807 * Assumptions: 808 * - All Read segments in @p have the same Position value. 809 */ 810 int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp, 811 struct svc_rdma_op_ctxt *head, __be32 *p) 812 { 813 struct svc_rdma_read_info *info; 814 struct page **page; 815 int ret; 816 817 /* The request (with page list) is constructed in 818 * head->arg. Pages involved with RDMA Read I/O are 819 * transferred there. 820 */ 821 head->hdr_count = head->count; 822 head->arg.head[0] = rqstp->rq_arg.head[0]; 823 head->arg.tail[0] = rqstp->rq_arg.tail[0]; 824 head->arg.pages = head->pages; 825 head->arg.page_base = 0; 826 head->arg.page_len = 0; 827 head->arg.len = rqstp->rq_arg.len; 828 head->arg.buflen = rqstp->rq_arg.buflen; 829 830 info = svc_rdma_read_info_alloc(rdma); 831 if (!info) 832 return -ENOMEM; 833 info->ri_readctxt = head; 834 835 info->ri_position = be32_to_cpup(p + 1); 836 if (info->ri_position) 837 ret = svc_rdma_build_normal_read_chunk(rqstp, info, p); 838 else 839 ret = svc_rdma_build_pz_read_chunk(rqstp, info, p); 840 841 /* Mark the start of the pages that can be used for the reply */ 842 if (info->ri_pageoff > 0) 843 info->ri_pageno++; 844 rqstp->rq_respages = &rqstp->rq_pages[info->ri_pageno]; 845 rqstp->rq_next_page = rqstp->rq_respages + 1; 846 847 if (ret < 0) 848 goto out; 849 850 ret = svc_rdma_post_chunk_ctxt(&info->ri_cc); 851 852 out: 853 /* Read sink pages have been moved from rqstp->rq_pages to 854 * head->arg.pages. Force svc_recv to refill those slots 855 * in rq_pages. 856 */ 857 for (page = rqstp->rq_pages; page < rqstp->rq_respages; page++) 858 *page = NULL; 859 860 if (ret < 0) 861 svc_rdma_read_info_free(info); 862 return ret; 863 } 864