/*
 * Copyright (c) 2016 Oracle. All rights reserved.
 *
 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
 */

#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>
#include <linux/sunrpc/debug.h>

#include <rdma/rw.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);

/* Each R/W context contains state for one chain of RDMA Read or
 * Write Work Requests.
 *
 * Each WR chain handles a single contiguous server-side buffer,
 * because scatterlist entries after the first have to start on
 * page alignment. xdr_buf iovecs cannot guarantee alignment.
 *
 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
 * from a client may contain a unique R_key, so each WR chain moves
 * up to one segment at a time.
 *
 * The scatterlist makes this data structure over 4KB in size. To
 * make it less likely to fail, and to handle the allocation for
 * smaller I/O requests without disabling bottom-halves, these
 * contexts are created on demand, but cached and reused until the
 * controlling svcxprt_rdma is destroyed.
 */
struct svc_rdma_rw_ctxt {
        struct list_head        rw_list;
        struct rdma_rw_ctx      rw_ctx;
        int                     rw_nents;
        struct sg_table         rw_sg_table;
        struct scatterlist      rw_first_sgl[0];
};

static inline struct svc_rdma_rw_ctxt *
svc_rdma_next_ctxt(struct list_head *list)
{
        return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
                                        rw_list);
}

static struct svc_rdma_rw_ctxt *
svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
{
        struct svc_rdma_rw_ctxt *ctxt;

        spin_lock(&rdma->sc_rw_ctxt_lock);

        ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts);
        if (ctxt) {
                list_del(&ctxt->rw_list);
                spin_unlock(&rdma->sc_rw_ctxt_lock);
        } else {
                spin_unlock(&rdma->sc_rw_ctxt_lock);
                ctxt = kmalloc(sizeof(*ctxt) +
                               SG_CHUNK_SIZE * sizeof(struct scatterlist),
                               GFP_KERNEL);
                if (!ctxt)
                        goto out;
                INIT_LIST_HEAD(&ctxt->rw_list);
        }

        ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
        if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
                                   ctxt->rw_sg_table.sgl)) {
                kfree(ctxt);
                ctxt = NULL;
        }
out:
        return ctxt;
}

static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
                                 struct svc_rdma_rw_ctxt *ctxt)
{
        sg_free_table_chained(&ctxt->rw_sg_table, true);

        spin_lock(&rdma->sc_rw_ctxt_lock);
        list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts);
        spin_unlock(&rdma->sc_rw_ctxt_lock);
}

/**
 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
 * @rdma: transport about to be destroyed
 *
 */
void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
{
        struct svc_rdma_rw_ctxt *ctxt;

        while ((ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts)) != NULL) {
                list_del(&ctxt->rw_list);
                kfree(ctxt);
        }
}

/* A chunk context tracks all I/O for moving one Read or Write
 * chunk. This is a set of rdma_rw's that handle data movement
 * for all segments of one chunk.
 *
 * These are small, acquired with a single allocator call, and
 * no more than one is needed per chunk. They are allocated on
 * demand, and not cached.
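 *
 * cc_sqecount counts the Send Queue entries that the WRs chained on
 * cc_rwctxts will consume: svc_rdma_post_chunk_ctxt() reserves that
 * many entries from sc_sq_avail before posting, and the completion
 * handlers return the same count when the chain completes.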
 */
struct svc_rdma_chunk_ctxt {
        struct ib_cqe           cc_cqe;
        struct svcxprt_rdma     *cc_rdma;
        struct list_head        cc_rwctxts;
        int                     cc_sqecount;
};

static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
                             struct svc_rdma_chunk_ctxt *cc)
{
        cc->cc_rdma = rdma;
        svc_xprt_get(&rdma->sc_xprt);

        INIT_LIST_HEAD(&cc->cc_rwctxts);
        cc->cc_sqecount = 0;
}

static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
                                enum dma_data_direction dir)
{
        struct svcxprt_rdma *rdma = cc->cc_rdma;
        struct svc_rdma_rw_ctxt *ctxt;

        while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
                list_del(&ctxt->rw_list);

                rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
                                    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
                                    ctxt->rw_nents, dir);
                svc_rdma_put_rw_ctxt(rdma, ctxt);
        }
        svc_xprt_put(&rdma->sc_xprt);
}

/* State for sending a Write or Reply chunk.
 * - Tracks progress of writing one chunk over all its segments
 * - Stores arguments for the SGL constructor functions
 */
struct svc_rdma_write_info {
        /* write state of this chunk */
        unsigned int    wi_seg_off;
        unsigned int    wi_seg_no;
        unsigned int    wi_nsegs;
        __be32          *wi_segs;

        /* SGL constructor arguments */
        struct xdr_buf  *wi_xdr;
        unsigned char   *wi_base;
        unsigned int    wi_next_off;

        struct svc_rdma_chunk_ctxt      wi_cc;
};

static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk)
{
        struct svc_rdma_write_info *info;

        info = kmalloc(sizeof(*info), GFP_KERNEL);
        if (!info)
                return info;

        info->wi_seg_off = 0;
        info->wi_seg_no = 0;
        info->wi_nsegs = be32_to_cpup(++chunk);
        info->wi_segs = ++chunk;
        svc_rdma_cc_init(rdma, &info->wi_cc);
        info->wi_cc.cc_cqe.done = svc_rdma_write_done;
        return info;
}

static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
{
        svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE);
        kfree(info);
}

/**
 * svc_rdma_write_done - Write chunk completion
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 * Pages under I/O are freed by a subsequent Send completion.
 */
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
        struct ib_cqe *cqe = wc->wr_cqe;
        struct svc_rdma_chunk_ctxt *cc =
                        container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
        struct svcxprt_rdma *rdma = cc->cc_rdma;
        struct svc_rdma_write_info *info =
                        container_of(cc, struct svc_rdma_write_info, wi_cc);

        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
        wake_up(&rdma->sc_send_wait);

        if (unlikely(wc->status != IB_WC_SUCCESS)) {
                set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
                if (wc->status != IB_WC_WR_FLUSH_ERR)
                        pr_err("svcrdma: write ctx: %s (%u/0x%x)\n",
                               ib_wc_status_msg(wc->status),
                               wc->status, wc->vendor_err);
        }

        svc_rdma_write_info_free(info);
}

/* State for pulling a Read chunk.
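 *
 * - ri_position: Position value shared by all segments of the chunk
 *   being pulled (zero for a Position Zero Read chunk)
 * - ri_pageno / ri_pageoff: where the next segment's sink bytes land
 *   in rqstp->rq_pages
 * - ri_chunklen: total number of chunk bytes pulled from the client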
 */
struct svc_rdma_read_info {
        struct svc_rdma_op_ctxt         *ri_readctxt;
        unsigned int                    ri_position;
        unsigned int                    ri_pageno;
        unsigned int                    ri_pageoff;
        unsigned int                    ri_chunklen;

        struct svc_rdma_chunk_ctxt      ri_cc;
};

static struct svc_rdma_read_info *
svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma)
{
        struct svc_rdma_read_info *info;

        info = kmalloc(sizeof(*info), GFP_KERNEL);
        if (!info)
                return info;

        svc_rdma_cc_init(rdma, &info->ri_cc);
        info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done;
        return info;
}

static void svc_rdma_read_info_free(struct svc_rdma_read_info *info)
{
        svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE);
        kfree(info);
}

/**
 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 */
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
{
        struct ib_cqe *cqe = wc->wr_cqe;
        struct svc_rdma_chunk_ctxt *cc =
                        container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
        struct svcxprt_rdma *rdma = cc->cc_rdma;
        struct svc_rdma_read_info *info =
                        container_of(cc, struct svc_rdma_read_info, ri_cc);

        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
        wake_up(&rdma->sc_send_wait);

        if (unlikely(wc->status != IB_WC_SUCCESS)) {
                set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
                if (wc->status != IB_WC_WR_FLUSH_ERR)
                        pr_err("svcrdma: read ctx: %s (%u/0x%x)\n",
                               ib_wc_status_msg(wc->status),
                               wc->status, wc->vendor_err);
                svc_rdma_put_context(info->ri_readctxt, 1);
        } else {
                spin_lock(&rdma->sc_rq_dto_lock);
                list_add_tail(&info->ri_readctxt->list,
                              &rdma->sc_read_complete_q);
                spin_unlock(&rdma->sc_rq_dto_lock);

                set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
                svc_xprt_enqueue(&rdma->sc_xprt);
        }

        svc_rdma_read_info_free(info);
}

/* This function sleeps when the transport's Send Queue is congested.
 *
 * Assumptions:
 * - If ib_post_send() succeeds, only one completion is expected,
 *   even if one or more WRs are flushed. This is true when posting
 *   an rdma_rw_ctx or when posting a single signaled WR.
 */
static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
{
        struct svcxprt_rdma *rdma = cc->cc_rdma;
        struct svc_xprt *xprt = &rdma->sc_xprt;
        struct ib_send_wr *first_wr, *bad_wr;
        struct list_head *tmp;
        struct ib_cqe *cqe;
        int ret;

        if (cc->cc_sqecount > rdma->sc_sq_depth)
                return -EINVAL;

        first_wr = NULL;
        cqe = &cc->cc_cqe;
        list_for_each(tmp, &cc->cc_rwctxts) {
                struct svc_rdma_rw_ctxt *ctxt;

                ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
                first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
                                           rdma->sc_port_num, cqe, first_wr);
                cqe = NULL;
        }

        do {
                if (atomic_sub_return(cc->cc_sqecount,
                                      &rdma->sc_sq_avail) > 0) {
                        ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
                        if (ret)
                                break;
                        return 0;
                }

                atomic_inc(&rdma_stat_sq_starve);
                atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
                wait_event(rdma->sc_send_wait,
                           atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
        } while (1);

        pr_err("svcrdma: ib_post_send failed (%d)\n", ret);
        set_bit(XPT_CLOSE, &xprt->xpt_flags);

        /* If even one was posted, there will be a completion.
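         * ib_post_send() points bad_wr at the first WR it could not
         * post. If bad_wr differs from first_wr, at least one WR
         * reached the Send Queue, and its (possibly flushed)
         * completion will release the SQ entries and free this
         * chunk context.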
         */
        if (bad_wr != first_wr)
                return 0;

        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
        wake_up(&rdma->sc_send_wait);
        return -ENOTCONN;
}

/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
 */
static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
                               unsigned int len,
                               struct svc_rdma_rw_ctxt *ctxt)
{
        struct scatterlist *sg = ctxt->rw_sg_table.sgl;

        sg_set_buf(&sg[0], info->wi_base, len);
        info->wi_base += len;

        ctxt->rw_nents = 1;
}

/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
 */
static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
                                    unsigned int remaining,
                                    struct svc_rdma_rw_ctxt *ctxt)
{
        unsigned int sge_no, sge_bytes, page_off, page_no;
        struct xdr_buf *xdr = info->wi_xdr;
        struct scatterlist *sg;
        struct page **page;

        page_off = info->wi_next_off + xdr->page_base;
        page_no = page_off >> PAGE_SHIFT;
        page_off = offset_in_page(page_off);
        page = xdr->pages + page_no;
        info->wi_next_off += remaining;
        sg = ctxt->rw_sg_table.sgl;
        sge_no = 0;
        do {
                sge_bytes = min_t(unsigned int, remaining,
                                  PAGE_SIZE - page_off);
                sg_set_page(sg, *page, sge_bytes, page_off);

                remaining -= sge_bytes;
                sg = sg_next(sg);
                page_off = 0;
                sge_no++;
                page++;
        } while (remaining);

        ctxt->rw_nents = sge_no;
}

/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 * an RPC Reply.
 */
static int
svc_rdma_build_writes(struct svc_rdma_write_info *info,
                      void (*constructor)(struct svc_rdma_write_info *info,
                                          unsigned int len,
                                          struct svc_rdma_rw_ctxt *ctxt),
                      unsigned int remaining)
{
        struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
        struct svcxprt_rdma *rdma = cc->cc_rdma;
        struct svc_rdma_rw_ctxt *ctxt;
        __be32 *seg;
        int ret;

        seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
        do {
                unsigned int write_len;
                u32 seg_length, seg_handle;
                u64 seg_offset;

                if (info->wi_seg_no >= info->wi_nsegs)
                        goto out_overflow;

                seg_handle = be32_to_cpup(seg);
                seg_length = be32_to_cpup(seg + 1);
                xdr_decode_hyper(seg + 2, &seg_offset);
                seg_offset += info->wi_seg_off;

                write_len = min(remaining, seg_length - info->wi_seg_off);
                ctxt = svc_rdma_get_rw_ctxt(rdma,
                                            (write_len >> PAGE_SHIFT) + 2);
                if (!ctxt)
                        goto out_noctx;

                constructor(info, write_len, ctxt);
                ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp,
                                       rdma->sc_port_num, ctxt->rw_sg_table.sgl,
                                       ctxt->rw_nents, 0, seg_offset,
                                       seg_handle, DMA_TO_DEVICE);
                if (ret < 0)
                        goto out_initerr;

                list_add(&ctxt->rw_list, &cc->cc_rwctxts);
                cc->cc_sqecount += ret;
                if (write_len == seg_length - info->wi_seg_off) {
                        seg += 4;
                        info->wi_seg_no++;
                        info->wi_seg_off = 0;
                } else {
                        info->wi_seg_off += write_len;
                }
                remaining -= write_len;
        } while (remaining);

        return 0;

out_overflow:
        dprintk("svcrdma: inadequate space in Write chunk (%u)\n",
                info->wi_nsegs);
        return -E2BIG;

out_noctx:
        dprintk("svcrdma: no R/W ctxs available\n");
        return -ENOMEM;

out_initerr:
        svc_rdma_put_rw_ctxt(rdma, ctxt);
        pr_err("svcrdma: failed to map pagelist (%d)\n", ret);
        return -EIO;
}

/* Send one of an xdr_buf's kvecs by itself. To send a Reply
 * chunk, the whole RPC Reply is written back to the client.
 * This function writes either the head or tail of the xdr_buf
 * containing the Reply.
 */
static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info,
                                  struct kvec *vec)
{
        info->wi_base = vec->iov_base;
        return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
                                     vec->iov_len);
}

/* Send an xdr_buf's page list by itself. A Write chunk is
 * just the page list. A Reply chunk is the head, page list,
 * and tail. This function is shared between the two types
 * of chunk.
 */
static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
                                      struct xdr_buf *xdr)
{
        info->wi_xdr = xdr;
        info->wi_next_off = 0;
        return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
                                     xdr->page_len);
}

/**
 * svc_rdma_send_write_chunk - Write all segments in a Write chunk
 * @rdma: controlling RDMA transport
 * @wr_ch: Write chunk provided by client
 * @xdr: xdr_buf containing the data payload
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 * %-E2BIG if the payload was larger than the Write chunk,
 * %-EINVAL if client provided too many segments,
 * %-ENOMEM if rdma_rw context pool was exhausted,
 * %-ENOTCONN if posting failed (connection is lost),
 * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
                              struct xdr_buf *xdr)
{
        struct svc_rdma_write_info *info;
        int ret;

        if (!xdr->page_len)
                return 0;

        info = svc_rdma_write_info_alloc(rdma, wr_ch);
        if (!info)
                return -ENOMEM;

        ret = svc_rdma_send_xdr_pagelist(info, xdr);
        if (ret < 0)
                goto out_err;

        ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
        if (ret < 0)
                goto out_err;
        return xdr->page_len;

out_err:
        svc_rdma_write_info_free(info);
        return ret;
}

/**
 * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
 * @rdma: controlling RDMA transport
 * @rp_ch: Reply chunk provided by client
 * @writelist: true if client provided a Write list
 * @xdr: xdr_buf containing an RPC Reply
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 * %-E2BIG if the payload was larger than the Reply chunk,
 * %-EINVAL if client provided too many segments,
 * %-ENOMEM if rdma_rw context pool was exhausted,
 * %-ENOTCONN if posting failed (connection is lost),
 * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
                              bool writelist, struct xdr_buf *xdr)
{
        struct svc_rdma_write_info *info;
        int consumed, ret;

        info = svc_rdma_write_info_alloc(rdma, rp_ch);
        if (!info)
                return -ENOMEM;

        ret = svc_rdma_send_xdr_kvec(info, &xdr->head[0]);
        if (ret < 0)
                goto out_err;
        consumed = xdr->head[0].iov_len;

        /* Send the page list in the Reply chunk only if the
         * client did not provide Write chunks.
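         * When a Write list was provided, the pagelist payload is
         * expected to have been sent already via
         * svc_rdma_send_write_chunk(), so only the head and tail
         * kvecs are written here.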
         */
        if (!writelist && xdr->page_len) {
                ret = svc_rdma_send_xdr_pagelist(info, xdr);
                if (ret < 0)
                        goto out_err;
                consumed += xdr->page_len;
        }

        if (xdr->tail[0].iov_len) {
                ret = svc_rdma_send_xdr_kvec(info, &xdr->tail[0]);
                if (ret < 0)
                        goto out_err;
                consumed += xdr->tail[0].iov_len;
        }

        ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
        if (ret < 0)
                goto out_err;
        return consumed;

out_err:
        svc_rdma_write_info_free(info);
        return ret;
}

static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
                                       struct svc_rqst *rqstp,
                                       u32 rkey, u32 len, u64 offset)
{
        struct svc_rdma_op_ctxt *head = info->ri_readctxt;
        struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
        struct svc_rdma_rw_ctxt *ctxt;
        unsigned int sge_no, seg_len;
        struct scatterlist *sg;
        int ret;

        sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
        ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
        if (!ctxt)
                goto out_noctx;
        ctxt->rw_nents = sge_no;

        dprintk("svcrdma: reading segment %u@0x%016llx:0x%08x (%u sges)\n",
                len, offset, rkey, sge_no);

        sg = ctxt->rw_sg_table.sgl;
        for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
                seg_len = min_t(unsigned int, len,
                                PAGE_SIZE - info->ri_pageoff);

                head->arg.pages[info->ri_pageno] =
                        rqstp->rq_pages[info->ri_pageno];
                if (!info->ri_pageoff)
                        head->count++;

                sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
                            seg_len, info->ri_pageoff);
                sg = sg_next(sg);

                info->ri_pageoff += seg_len;
                if (info->ri_pageoff == PAGE_SIZE) {
                        info->ri_pageno++;
                        info->ri_pageoff = 0;
                }
                len -= seg_len;

                /* Safety check */
                if (len &&
                    &rqstp->rq_pages[info->ri_pageno + 1] > rqstp->rq_page_end)
                        goto out_overrun;
        }

        ret = rdma_rw_ctx_init(&ctxt->rw_ctx, cc->cc_rdma->sc_qp,
                               cc->cc_rdma->sc_port_num,
                               ctxt->rw_sg_table.sgl, ctxt->rw_nents,
                               0, offset, rkey, DMA_FROM_DEVICE);
        if (ret < 0)
                goto out_initerr;

        list_add(&ctxt->rw_list, &cc->cc_rwctxts);
        cc->cc_sqecount += ret;
        return 0;

out_noctx:
        dprintk("svcrdma: no R/W ctxs available\n");
        return -ENOMEM;

out_overrun:
        dprintk("svcrdma: request overruns rq_pages\n");
        return -EINVAL;

out_initerr:
        svc_rdma_put_rw_ctxt(cc->cc_rdma, ctxt);
        pr_err("svcrdma: failed to map pagelist (%d)\n", ret);
        return -EIO;
}

static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
                                     struct svc_rdma_read_info *info,
                                     __be32 *p)
{
        int ret;

        info->ri_chunklen = 0;
        while (*p++ != xdr_zero) {
                u32 rs_handle, rs_length;
                u64 rs_offset;

                if (be32_to_cpup(p++) != info->ri_position)
                        break;
                rs_handle = be32_to_cpup(p++);
                rs_length = be32_to_cpup(p++);
                p = xdr_decode_hyper(p, &rs_offset);

                ret = svc_rdma_build_read_segment(info, rqstp,
                                                  rs_handle, rs_length,
                                                  rs_offset);
                if (ret < 0)
                        break;

                info->ri_chunklen += rs_length;
        }

        return ret;
}

/* If there is inline content following the Read chunk, append it to
 * the page list immediately following the data payload. This has to
 * be done after the reader function has determined how many pages
 * were consumed for RDMA Read.
 *
 * On entry, ri_pageno and ri_pageoff point directly to the end of the
 * page list. On exit, both have been updated to the new "next byte".
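 *
 * The trailing bytes arrived as inline content in the Receive buffer
 * (head->arg.head), so they are copied with memcpy rather than pulled
 * over with RDMA Read.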
 *
 * Assumptions:
 * - Inline content fits entirely in rq_pages[0]
 * - Trailing content is only a handful of bytes
 */
static int svc_rdma_copy_tail(struct svc_rqst *rqstp,
                              struct svc_rdma_read_info *info)
{
        struct svc_rdma_op_ctxt *head = info->ri_readctxt;
        unsigned int tail_length, remaining;
        u8 *srcp, *destp;

        /* Assert that all inline content fits in page 0. This is an
         * implementation limit, not a protocol limit.
         */
        if (head->arg.head[0].iov_len > PAGE_SIZE) {
                pr_warn_once("svcrdma: too much trailing inline content\n");
                return -EINVAL;
        }

        srcp = head->arg.head[0].iov_base;
        srcp += info->ri_position;
        tail_length = head->arg.head[0].iov_len - info->ri_position;
        remaining = tail_length;

        /* If there is room on the last page in the page list, try to
         * fit the trailing content there.
         */
        if (info->ri_pageoff > 0) {
                unsigned int len;

                len = min_t(unsigned int, remaining,
                            PAGE_SIZE - info->ri_pageoff);
                destp = page_address(rqstp->rq_pages[info->ri_pageno]);
                destp += info->ri_pageoff;

                memcpy(destp, srcp, len);
                srcp += len;
                destp += len;
                info->ri_pageoff += len;
                remaining -= len;

                if (info->ri_pageoff == PAGE_SIZE) {
                        info->ri_pageno++;
                        info->ri_pageoff = 0;
                }
        }

        /* Otherwise, a fresh page is needed. */
        if (remaining) {
                head->arg.pages[info->ri_pageno] =
                                rqstp->rq_pages[info->ri_pageno];
                head->count++;

                destp = page_address(rqstp->rq_pages[info->ri_pageno]);
                memcpy(destp, srcp, remaining);
                info->ri_pageoff += remaining;
        }

        head->arg.page_len += tail_length;
        head->arg.len += tail_length;
        head->arg.buflen += tail_length;
        return 0;
}

/* Construct RDMA Reads to pull over a normal Read chunk. The chunk
 * data lands in the page list of head->arg.pages.
 *
 * Currently NFSD does not look at the head->arg.tail[0] iovec.
 * Therefore, XDR round-up of the Read chunk and trailing
 * inline content must both be added at the end of the pagelist.
 */
static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
                                            struct svc_rdma_read_info *info,
                                            __be32 *p)
{
        struct svc_rdma_op_ctxt *head = info->ri_readctxt;
        int ret;

        dprintk("svcrdma: Reading Read chunk at position %u\n",
                info->ri_position);

        info->ri_pageno = head->hdr_count;
        info->ri_pageoff = 0;

        ret = svc_rdma_build_read_chunk(rqstp, info, p);
        if (ret < 0)
                goto out;

        /* Read chunk may need XDR round-up (see RFC 5666, s. 3.7).
         */
        if (info->ri_chunklen & 3) {
                u32 padlen = 4 - (info->ri_chunklen & 3);

                info->ri_chunklen += padlen;

                /* NB: data payload always starts on XDR alignment,
                 * thus the pad can never contain a page boundary.
                 */
                info->ri_pageoff += padlen;
                if (info->ri_pageoff == PAGE_SIZE) {
                        info->ri_pageno++;
                        info->ri_pageoff = 0;
                }
        }

        head->arg.page_len = info->ri_chunklen;
        head->arg.len += info->ri_chunklen;
        head->arg.buflen += info->ri_chunklen;

        if (info->ri_position < head->arg.head[0].iov_len) {
                ret = svc_rdma_copy_tail(rqstp, info);
                if (ret < 0)
                        goto out;
        }
        head->arg.head[0].iov_len = info->ri_position;

out:
        return ret;
}

/* Construct RDMA Reads to pull over a Position Zero Read chunk.
 * The start of the data lands in the first page just after
 * the Transport header, and the rest lands in the page list of
 * head->arg.pages.
 *
 * Assumptions:
 * - A PZRC has an XDR-aligned length (no implicit round-up).
 * - There can be no trailing inline content (IOW, we assume
 *   a PZRC is never sent in an RDMA_MSG message, though it's
 *   allowed by spec).
 */
static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
                                        struct svc_rdma_read_info *info,
                                        __be32 *p)
{
        struct svc_rdma_op_ctxt *head = info->ri_readctxt;
        int ret;

        dprintk("svcrdma: Reading Position Zero Read chunk\n");

        info->ri_pageno = head->hdr_count - 1;
        info->ri_pageoff = offset_in_page(head->byte_len);

        ret = svc_rdma_build_read_chunk(rqstp, info, p);
        if (ret < 0)
                goto out;

        head->arg.len += info->ri_chunklen;
        head->arg.buflen += info->ri_chunklen;

        if (head->arg.buflen <= head->sge[0].length) {
                /* Transport header and RPC message fit entirely
                 * in page where head iovec resides.
                 */
                head->arg.head[0].iov_len = info->ri_chunklen;
        } else {
                /* Transport header and part of RPC message reside
                 * in the head iovec's page.
                 */
                head->arg.head[0].iov_len =
                                head->sge[0].length - head->byte_len;
                head->arg.page_len =
                                info->ri_chunklen - head->arg.head[0].iov_len;
        }

out:
        return ret;
}

/**
 * svc_rdma_recv_read_chunk - Pull a Read chunk from the client
 * @rdma: controlling RDMA transport
 * @rqstp: set of pages to use as Read sink buffers
 * @head: pages under I/O collect here
 * @p: pointer to start of Read chunk
 *
 * Returns:
 * %0 if all needed RDMA Reads were posted successfully,
 * %-EINVAL if client provided too many segments,
 * %-ENOMEM if rdma_rw context pool was exhausted,
 * %-ENOTCONN if posting failed (connection is lost),
 * %-EIO if rdma_rw initialization failed (DMA mapping, etc).
 *
 * Assumptions:
 * - All Read segments in @p have the same Position value.
 */
int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
                             struct svc_rdma_op_ctxt *head, __be32 *p)
{
        struct svc_rdma_read_info *info;
        struct page **page;
        int ret;

        /* The request (with page list) is constructed in
         * head->arg. Pages involved with RDMA Read I/O are
         * transferred there.
         */
        head->hdr_count = head->count;
        head->arg.head[0] = rqstp->rq_arg.head[0];
        head->arg.tail[0] = rqstp->rq_arg.tail[0];
        head->arg.pages = head->pages;
        head->arg.page_base = 0;
        head->arg.page_len = 0;
        head->arg.len = rqstp->rq_arg.len;
        head->arg.buflen = rqstp->rq_arg.buflen;

        info = svc_rdma_read_info_alloc(rdma);
        if (!info)
                return -ENOMEM;
        info->ri_readctxt = head;

        info->ri_position = be32_to_cpup(p + 1);
        if (info->ri_position)
                ret = svc_rdma_build_normal_read_chunk(rqstp, info, p);
        else
                ret = svc_rdma_build_pz_read_chunk(rqstp, info, p);

        /* Mark the start of the pages that can be used for the reply */
        if (info->ri_pageoff > 0)
                info->ri_pageno++;
        rqstp->rq_respages = &rqstp->rq_pages[info->ri_pageno];
        rqstp->rq_next_page = rqstp->rq_respages + 1;

        if (ret < 0)
                goto out;

        ret = svc_rdma_post_chunk_ctxt(&info->ri_cc);

out:
        /* Read sink pages have been moved from rqstp->rq_pages to
         * head->arg.pages. Force svc_recv to refill those slots
         * in rq_pages.
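         * Clearing these slots also hands ownership of the sink
         * pages to head->arg until the completed request has been
         * processed.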
         */
        for (page = rqstp->rq_pages; page < rqstp->rq_respages; page++)
                *page = NULL;

        if (ret < 0)
                svc_rdma_read_info_free(info);
        return ret;
}