1 // SPDX-License-Identifier: GPL-2.0 2 /* 3 * Copyright (c) 2016-2018 Oracle. All rights reserved. 4 * 5 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks. 6 */ 7 8 #include <rdma/rw.h> 9 10 #include <linux/sunrpc/xdr.h> 11 #include <linux/sunrpc/rpc_rdma.h> 12 #include <linux/sunrpc/svc_rdma.h> 13 14 #include "xprt_rdma.h" 15 #include <trace/events/rpcrdma.h> 16 17 static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc); 18 static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc); 19 20 /* Each R/W context contains state for one chain of RDMA Read or 21 * Write Work Requests. 22 * 23 * Each WR chain handles a single contiguous server-side buffer, 24 * because scatterlist entries after the first have to start on 25 * page alignment. xdr_buf iovecs cannot guarantee alignment. 26 * 27 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment 28 * from a client may contain a unique R_key, so each WR chain moves 29 * up to one segment at a time. 30 * 31 * The scatterlist makes this data structure over 4KB in size. To 32 * make it less likely to fail, and to handle the allocation for 33 * smaller I/O requests without disabling bottom-halves, these 34 * contexts are created on demand, but cached and reused until the 35 * controlling svcxprt_rdma is destroyed. 36 */ 37 struct svc_rdma_rw_ctxt { 38 struct list_head rw_list; 39 struct rdma_rw_ctx rw_ctx; 40 unsigned int rw_nents; 41 struct sg_table rw_sg_table; 42 struct scatterlist rw_first_sgl[]; 43 }; 44 45 static inline struct svc_rdma_rw_ctxt * 46 svc_rdma_next_ctxt(struct list_head *list) 47 { 48 return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt, 49 rw_list); 50 } 51 52 static struct svc_rdma_rw_ctxt * 53 svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges) 54 { 55 struct svc_rdma_rw_ctxt *ctxt; 56 57 spin_lock(&rdma->sc_rw_ctxt_lock); 58 59 ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts); 60 if (ctxt) { 61 list_del(&ctxt->rw_list); 62 spin_unlock(&rdma->sc_rw_ctxt_lock); 63 } else { 64 spin_unlock(&rdma->sc_rw_ctxt_lock); 65 ctxt = kmalloc(struct_size(ctxt, rw_first_sgl, SG_CHUNK_SIZE), 66 GFP_KERNEL); 67 if (!ctxt) 68 goto out_noctx; 69 INIT_LIST_HEAD(&ctxt->rw_list); 70 } 71 72 ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl; 73 if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges, 74 ctxt->rw_sg_table.sgl, 75 SG_CHUNK_SIZE)) 76 goto out_free; 77 return ctxt; 78 79 out_free: 80 kfree(ctxt); 81 out_noctx: 82 trace_svcrdma_no_rwctx_err(rdma, sges); 83 return NULL; 84 } 85 86 static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma, 87 struct svc_rdma_rw_ctxt *ctxt) 88 { 89 sg_free_table_chained(&ctxt->rw_sg_table, SG_CHUNK_SIZE); 90 91 spin_lock(&rdma->sc_rw_ctxt_lock); 92 list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts); 93 spin_unlock(&rdma->sc_rw_ctxt_lock); 94 } 95 96 /** 97 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts 98 * @rdma: transport about to be destroyed 99 * 100 */ 101 void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma) 102 { 103 struct svc_rdma_rw_ctxt *ctxt; 104 105 while ((ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts)) != NULL) { 106 list_del(&ctxt->rw_list); 107 kfree(ctxt); 108 } 109 } 110 111 /** 112 * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O 113 * @rdma: controlling transport instance 114 * @ctxt: R/W context to prepare 115 * @offset: RDMA offset 116 * @handle: RDMA tag/handle 117 * @direction: I/O direction 118 * 119 * Returns on success, the number of WQEs that will be needed 120 * on the workqueue, or a negative errno. 121 */ 122 static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma, 123 struct svc_rdma_rw_ctxt *ctxt, 124 u64 offset, u32 handle, 125 enum dma_data_direction direction) 126 { 127 int ret; 128 129 ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp, rdma->sc_port_num, 130 ctxt->rw_sg_table.sgl, ctxt->rw_nents, 131 0, offset, handle, direction); 132 if (unlikely(ret < 0)) { 133 svc_rdma_put_rw_ctxt(rdma, ctxt); 134 trace_svcrdma_dma_map_rw_err(rdma, ctxt->rw_nents, ret); 135 } 136 return ret; 137 } 138 139 /* A chunk context tracks all I/O for moving one Read or Write 140 * chunk. This is a set of rdma_rw's that handle data movement 141 * for all segments of one chunk. 142 * 143 * These are small, acquired with a single allocator call, and 144 * no more than one is needed per chunk. They are allocated on 145 * demand, and not cached. 146 */ 147 struct svc_rdma_chunk_ctxt { 148 struct rpc_rdma_cid cc_cid; 149 struct ib_cqe cc_cqe; 150 struct svcxprt_rdma *cc_rdma; 151 struct list_head cc_rwctxts; 152 int cc_sqecount; 153 enum ib_wc_status cc_status; 154 struct completion cc_done; 155 }; 156 157 static void svc_rdma_cc_cid_init(struct svcxprt_rdma *rdma, 158 struct rpc_rdma_cid *cid) 159 { 160 cid->ci_queue_id = rdma->sc_sq_cq->res.id; 161 cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids); 162 } 163 164 static void svc_rdma_cc_init(struct svcxprt_rdma *rdma, 165 struct svc_rdma_chunk_ctxt *cc) 166 { 167 svc_rdma_cc_cid_init(rdma, &cc->cc_cid); 168 cc->cc_rdma = rdma; 169 170 INIT_LIST_HEAD(&cc->cc_rwctxts); 171 cc->cc_sqecount = 0; 172 } 173 174 static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc, 175 enum dma_data_direction dir) 176 { 177 struct svcxprt_rdma *rdma = cc->cc_rdma; 178 struct svc_rdma_rw_ctxt *ctxt; 179 180 while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) { 181 list_del(&ctxt->rw_list); 182 183 rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp, 184 rdma->sc_port_num, ctxt->rw_sg_table.sgl, 185 ctxt->rw_nents, dir); 186 svc_rdma_put_rw_ctxt(rdma, ctxt); 187 } 188 } 189 190 /* State for sending a Write or Reply chunk. 191 * - Tracks progress of writing one chunk over all its segments 192 * - Stores arguments for the SGL constructor functions 193 */ 194 struct svc_rdma_write_info { 195 const struct svc_rdma_chunk *wi_chunk; 196 197 /* write state of this chunk */ 198 unsigned int wi_seg_off; 199 unsigned int wi_seg_no; 200 201 /* SGL constructor arguments */ 202 const struct xdr_buf *wi_xdr; 203 unsigned char *wi_base; 204 unsigned int wi_next_off; 205 206 struct svc_rdma_chunk_ctxt wi_cc; 207 }; 208 209 static struct svc_rdma_write_info * 210 svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, 211 const struct svc_rdma_chunk *chunk) 212 { 213 struct svc_rdma_write_info *info; 214 215 info = kmalloc(sizeof(*info), GFP_KERNEL); 216 if (!info) 217 return info; 218 219 info->wi_chunk = chunk; 220 info->wi_seg_off = 0; 221 info->wi_seg_no = 0; 222 svc_rdma_cc_init(rdma, &info->wi_cc); 223 info->wi_cc.cc_cqe.done = svc_rdma_write_done; 224 return info; 225 } 226 227 static void svc_rdma_write_info_free(struct svc_rdma_write_info *info) 228 { 229 svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE); 230 kfree(info); 231 } 232 233 /** 234 * svc_rdma_write_done - Write chunk completion 235 * @cq: controlling Completion Queue 236 * @wc: Work Completion 237 * 238 * Pages under I/O are freed by a subsequent Send completion. 239 */ 240 static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc) 241 { 242 struct ib_cqe *cqe = wc->wr_cqe; 243 struct svc_rdma_chunk_ctxt *cc = 244 container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe); 245 struct svcxprt_rdma *rdma = cc->cc_rdma; 246 struct svc_rdma_write_info *info = 247 container_of(cc, struct svc_rdma_write_info, wi_cc); 248 249 trace_svcrdma_wc_write(wc, &cc->cc_cid); 250 251 atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail); 252 wake_up(&rdma->sc_send_wait); 253 254 if (unlikely(wc->status != IB_WC_SUCCESS)) 255 svc_xprt_deferred_close(&rdma->sc_xprt); 256 257 svc_rdma_write_info_free(info); 258 } 259 260 /* State for pulling a Read chunk. 261 */ 262 struct svc_rdma_read_info { 263 struct svc_rqst *ri_rqst; 264 struct svc_rdma_recv_ctxt *ri_readctxt; 265 unsigned int ri_pageno; 266 unsigned int ri_pageoff; 267 unsigned int ri_totalbytes; 268 269 struct svc_rdma_chunk_ctxt ri_cc; 270 }; 271 272 static struct svc_rdma_read_info * 273 svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma) 274 { 275 struct svc_rdma_read_info *info; 276 277 info = kmalloc(sizeof(*info), GFP_KERNEL); 278 if (!info) 279 return info; 280 281 svc_rdma_cc_init(rdma, &info->ri_cc); 282 info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done; 283 return info; 284 } 285 286 static void svc_rdma_read_info_free(struct svc_rdma_read_info *info) 287 { 288 svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE); 289 kfree(info); 290 } 291 292 /** 293 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx 294 * @cq: controlling Completion Queue 295 * @wc: Work Completion 296 * 297 */ 298 static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc) 299 { 300 struct ib_cqe *cqe = wc->wr_cqe; 301 struct svc_rdma_chunk_ctxt *cc = 302 container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe); 303 struct svcxprt_rdma *rdma = cc->cc_rdma; 304 305 trace_svcrdma_wc_read(wc, &cc->cc_cid); 306 307 atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail); 308 wake_up(&rdma->sc_send_wait); 309 310 cc->cc_status = wc->status; 311 complete(&cc->cc_done); 312 return; 313 } 314 315 /* This function sleeps when the transport's Send Queue is congested. 316 * 317 * Assumptions: 318 * - If ib_post_send() succeeds, only one completion is expected, 319 * even if one or more WRs are flushed. This is true when posting 320 * an rdma_rw_ctx or when posting a single signaled WR. 321 */ 322 static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc) 323 { 324 struct svcxprt_rdma *rdma = cc->cc_rdma; 325 struct ib_send_wr *first_wr; 326 const struct ib_send_wr *bad_wr; 327 struct list_head *tmp; 328 struct ib_cqe *cqe; 329 int ret; 330 331 if (cc->cc_sqecount > rdma->sc_sq_depth) 332 return -EINVAL; 333 334 first_wr = NULL; 335 cqe = &cc->cc_cqe; 336 list_for_each(tmp, &cc->cc_rwctxts) { 337 struct svc_rdma_rw_ctxt *ctxt; 338 339 ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list); 340 first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp, 341 rdma->sc_port_num, cqe, first_wr); 342 cqe = NULL; 343 } 344 345 do { 346 if (atomic_sub_return(cc->cc_sqecount, 347 &rdma->sc_sq_avail) > 0) { 348 ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr); 349 if (ret) 350 break; 351 return 0; 352 } 353 354 percpu_counter_inc(&svcrdma_stat_sq_starve); 355 trace_svcrdma_sq_full(rdma); 356 atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail); 357 wait_event(rdma->sc_send_wait, 358 atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount); 359 trace_svcrdma_sq_retry(rdma); 360 } while (1); 361 362 trace_svcrdma_sq_post_err(rdma, ret); 363 svc_xprt_deferred_close(&rdma->sc_xprt); 364 365 /* If even one was posted, there will be a completion. */ 366 if (bad_wr != first_wr) 367 return 0; 368 369 atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail); 370 wake_up(&rdma->sc_send_wait); 371 return -ENOTCONN; 372 } 373 374 /* Build and DMA-map an SGL that covers one kvec in an xdr_buf 375 */ 376 static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info, 377 unsigned int len, 378 struct svc_rdma_rw_ctxt *ctxt) 379 { 380 struct scatterlist *sg = ctxt->rw_sg_table.sgl; 381 382 sg_set_buf(&sg[0], info->wi_base, len); 383 info->wi_base += len; 384 385 ctxt->rw_nents = 1; 386 } 387 388 /* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist. 389 */ 390 static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info, 391 unsigned int remaining, 392 struct svc_rdma_rw_ctxt *ctxt) 393 { 394 unsigned int sge_no, sge_bytes, page_off, page_no; 395 const struct xdr_buf *xdr = info->wi_xdr; 396 struct scatterlist *sg; 397 struct page **page; 398 399 page_off = info->wi_next_off + xdr->page_base; 400 page_no = page_off >> PAGE_SHIFT; 401 page_off = offset_in_page(page_off); 402 page = xdr->pages + page_no; 403 info->wi_next_off += remaining; 404 sg = ctxt->rw_sg_table.sgl; 405 sge_no = 0; 406 do { 407 sge_bytes = min_t(unsigned int, remaining, 408 PAGE_SIZE - page_off); 409 sg_set_page(sg, *page, sge_bytes, page_off); 410 411 remaining -= sge_bytes; 412 sg = sg_next(sg); 413 page_off = 0; 414 sge_no++; 415 page++; 416 } while (remaining); 417 418 ctxt->rw_nents = sge_no; 419 } 420 421 /* Construct RDMA Write WRs to send a portion of an xdr_buf containing 422 * an RPC Reply. 423 */ 424 static int 425 svc_rdma_build_writes(struct svc_rdma_write_info *info, 426 void (*constructor)(struct svc_rdma_write_info *info, 427 unsigned int len, 428 struct svc_rdma_rw_ctxt *ctxt), 429 unsigned int remaining) 430 { 431 struct svc_rdma_chunk_ctxt *cc = &info->wi_cc; 432 struct svcxprt_rdma *rdma = cc->cc_rdma; 433 const struct svc_rdma_segment *seg; 434 struct svc_rdma_rw_ctxt *ctxt; 435 int ret; 436 437 do { 438 unsigned int write_len; 439 u64 offset; 440 441 seg = &info->wi_chunk->ch_segments[info->wi_seg_no]; 442 if (!seg) 443 goto out_overflow; 444 445 write_len = min(remaining, seg->rs_length - info->wi_seg_off); 446 if (!write_len) 447 goto out_overflow; 448 ctxt = svc_rdma_get_rw_ctxt(rdma, 449 (write_len >> PAGE_SHIFT) + 2); 450 if (!ctxt) 451 return -ENOMEM; 452 453 constructor(info, write_len, ctxt); 454 offset = seg->rs_offset + info->wi_seg_off; 455 ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle, 456 DMA_TO_DEVICE); 457 if (ret < 0) 458 return -EIO; 459 percpu_counter_inc(&svcrdma_stat_write); 460 461 list_add(&ctxt->rw_list, &cc->cc_rwctxts); 462 cc->cc_sqecount += ret; 463 if (write_len == seg->rs_length - info->wi_seg_off) { 464 info->wi_seg_no++; 465 info->wi_seg_off = 0; 466 } else { 467 info->wi_seg_off += write_len; 468 } 469 remaining -= write_len; 470 } while (remaining); 471 472 return 0; 473 474 out_overflow: 475 trace_svcrdma_small_wrch_err(rdma, remaining, info->wi_seg_no, 476 info->wi_chunk->ch_segcount); 477 return -E2BIG; 478 } 479 480 /** 481 * svc_rdma_iov_write - Construct RDMA Writes from an iov 482 * @info: pointer to write arguments 483 * @iov: kvec to write 484 * 485 * Returns: 486 * On success, returns zero 487 * %-E2BIG if the client-provided Write chunk is too small 488 * %-ENOMEM if a resource has been exhausted 489 * %-EIO if an rdma-rw error occurred 490 */ 491 static int svc_rdma_iov_write(struct svc_rdma_write_info *info, 492 const struct kvec *iov) 493 { 494 info->wi_base = iov->iov_base; 495 return svc_rdma_build_writes(info, svc_rdma_vec_to_sg, 496 iov->iov_len); 497 } 498 499 /** 500 * svc_rdma_pages_write - Construct RDMA Writes from pages 501 * @info: pointer to write arguments 502 * @xdr: xdr_buf with pages to write 503 * @offset: offset into the content of @xdr 504 * @length: number of bytes to write 505 * 506 * Returns: 507 * On success, returns zero 508 * %-E2BIG if the client-provided Write chunk is too small 509 * %-ENOMEM if a resource has been exhausted 510 * %-EIO if an rdma-rw error occurred 511 */ 512 static int svc_rdma_pages_write(struct svc_rdma_write_info *info, 513 const struct xdr_buf *xdr, 514 unsigned int offset, 515 unsigned long length) 516 { 517 info->wi_xdr = xdr; 518 info->wi_next_off = offset - xdr->head[0].iov_len; 519 return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg, 520 length); 521 } 522 523 /** 524 * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf 525 * @xdr: xdr_buf to write 526 * @data: pointer to write arguments 527 * 528 * Returns: 529 * On success, returns zero 530 * %-E2BIG if the client-provided Write chunk is too small 531 * %-ENOMEM if a resource has been exhausted 532 * %-EIO if an rdma-rw error occurred 533 */ 534 static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data) 535 { 536 struct svc_rdma_write_info *info = data; 537 int ret; 538 539 if (xdr->head[0].iov_len) { 540 ret = svc_rdma_iov_write(info, &xdr->head[0]); 541 if (ret < 0) 542 return ret; 543 } 544 545 if (xdr->page_len) { 546 ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len, 547 xdr->page_len); 548 if (ret < 0) 549 return ret; 550 } 551 552 if (xdr->tail[0].iov_len) { 553 ret = svc_rdma_iov_write(info, &xdr->tail[0]); 554 if (ret < 0) 555 return ret; 556 } 557 558 return xdr->len; 559 } 560 561 /** 562 * svc_rdma_send_write_chunk - Write all segments in a Write chunk 563 * @rdma: controlling RDMA transport 564 * @chunk: Write chunk provided by the client 565 * @xdr: xdr_buf containing the data payload 566 * 567 * Returns a non-negative number of bytes the chunk consumed, or 568 * %-E2BIG if the payload was larger than the Write chunk, 569 * %-EINVAL if client provided too many segments, 570 * %-ENOMEM if rdma_rw context pool was exhausted, 571 * %-ENOTCONN if posting failed (connection is lost), 572 * %-EIO if rdma_rw initialization failed (DMA mapping, etc). 573 */ 574 int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, 575 const struct svc_rdma_chunk *chunk, 576 const struct xdr_buf *xdr) 577 { 578 struct svc_rdma_write_info *info; 579 struct svc_rdma_chunk_ctxt *cc; 580 int ret; 581 582 info = svc_rdma_write_info_alloc(rdma, chunk); 583 if (!info) 584 return -ENOMEM; 585 cc = &info->wi_cc; 586 587 ret = svc_rdma_xb_write(xdr, info); 588 if (ret != xdr->len) 589 goto out_err; 590 591 trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount); 592 ret = svc_rdma_post_chunk_ctxt(cc); 593 if (ret < 0) 594 goto out_err; 595 return xdr->len; 596 597 out_err: 598 svc_rdma_write_info_free(info); 599 return ret; 600 } 601 602 /** 603 * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk 604 * @rdma: controlling RDMA transport 605 * @rctxt: Write and Reply chunks from client 606 * @xdr: xdr_buf containing an RPC Reply 607 * 608 * Returns a non-negative number of bytes the chunk consumed, or 609 * %-E2BIG if the payload was larger than the Reply chunk, 610 * %-EINVAL if client provided too many segments, 611 * %-ENOMEM if rdma_rw context pool was exhausted, 612 * %-ENOTCONN if posting failed (connection is lost), 613 * %-EIO if rdma_rw initialization failed (DMA mapping, etc). 614 */ 615 int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, 616 const struct svc_rdma_recv_ctxt *rctxt, 617 const struct xdr_buf *xdr) 618 { 619 struct svc_rdma_write_info *info; 620 struct svc_rdma_chunk_ctxt *cc; 621 struct svc_rdma_chunk *chunk; 622 int ret; 623 624 if (pcl_is_empty(&rctxt->rc_reply_pcl)) 625 return 0; 626 627 chunk = pcl_first_chunk(&rctxt->rc_reply_pcl); 628 info = svc_rdma_write_info_alloc(rdma, chunk); 629 if (!info) 630 return -ENOMEM; 631 cc = &info->wi_cc; 632 633 ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr, 634 svc_rdma_xb_write, info); 635 if (ret < 0) 636 goto out_err; 637 638 trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount); 639 ret = svc_rdma_post_chunk_ctxt(cc); 640 if (ret < 0) 641 goto out_err; 642 643 return xdr->len; 644 645 out_err: 646 svc_rdma_write_info_free(info); 647 return ret; 648 } 649 650 /** 651 * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment 652 * @info: context for ongoing I/O 653 * @segment: co-ordinates of remote memory to be read 654 * 655 * Returns: 656 * %0: the Read WR chain was constructed successfully 657 * %-EINVAL: there were not enough rq_pages to finish 658 * %-ENOMEM: allocating a local resources failed 659 * %-EIO: a DMA mapping error occurred 660 */ 661 static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info, 662 const struct svc_rdma_segment *segment) 663 { 664 struct svc_rdma_recv_ctxt *head = info->ri_readctxt; 665 struct svc_rdma_chunk_ctxt *cc = &info->ri_cc; 666 struct svc_rqst *rqstp = info->ri_rqst; 667 unsigned int sge_no, seg_len, len; 668 struct svc_rdma_rw_ctxt *ctxt; 669 struct scatterlist *sg; 670 int ret; 671 672 len = segment->rs_length; 673 sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT; 674 ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no); 675 if (!ctxt) 676 return -ENOMEM; 677 ctxt->rw_nents = sge_no; 678 679 sg = ctxt->rw_sg_table.sgl; 680 for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) { 681 seg_len = min_t(unsigned int, len, 682 PAGE_SIZE - info->ri_pageoff); 683 684 if (!info->ri_pageoff) 685 head->rc_page_count++; 686 687 sg_set_page(sg, rqstp->rq_pages[info->ri_pageno], 688 seg_len, info->ri_pageoff); 689 sg = sg_next(sg); 690 691 info->ri_pageoff += seg_len; 692 if (info->ri_pageoff == PAGE_SIZE) { 693 info->ri_pageno++; 694 info->ri_pageoff = 0; 695 } 696 len -= seg_len; 697 698 /* Safety check */ 699 if (len && 700 &rqstp->rq_pages[info->ri_pageno + 1] > rqstp->rq_page_end) 701 goto out_overrun; 702 } 703 704 ret = svc_rdma_rw_ctx_init(cc->cc_rdma, ctxt, segment->rs_offset, 705 segment->rs_handle, DMA_FROM_DEVICE); 706 if (ret < 0) 707 return -EIO; 708 percpu_counter_inc(&svcrdma_stat_read); 709 710 list_add(&ctxt->rw_list, &cc->cc_rwctxts); 711 cc->cc_sqecount += ret; 712 return 0; 713 714 out_overrun: 715 trace_svcrdma_page_overrun_err(cc->cc_rdma, rqstp, info->ri_pageno); 716 return -EINVAL; 717 } 718 719 /** 720 * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk 721 * @info: context for ongoing I/O 722 * @chunk: Read chunk to pull 723 * 724 * Return values: 725 * %0: the Read WR chain was constructed successfully 726 * %-EINVAL: there were not enough resources to finish 727 * %-ENOMEM: allocating a local resources failed 728 * %-EIO: a DMA mapping error occurred 729 */ 730 static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info, 731 const struct svc_rdma_chunk *chunk) 732 { 733 const struct svc_rdma_segment *segment; 734 int ret; 735 736 ret = -EINVAL; 737 pcl_for_each_segment(segment, chunk) { 738 ret = svc_rdma_build_read_segment(info, segment); 739 if (ret < 0) 740 break; 741 info->ri_totalbytes += segment->rs_length; 742 } 743 return ret; 744 } 745 746 /** 747 * svc_rdma_copy_inline_range - Copy part of the inline content into pages 748 * @info: context for RDMA Reads 749 * @offset: offset into the Receive buffer of region to copy 750 * @remaining: length of region to copy 751 * 752 * Take a page at a time from rqstp->rq_pages and copy the inline 753 * content from the Receive buffer into that page. Update 754 * info->ri_pageno and info->ri_pageoff so that the next RDMA Read 755 * result will land contiguously with the copied content. 756 * 757 * Return values: 758 * %0: Inline content was successfully copied 759 * %-EINVAL: offset or length was incorrect 760 */ 761 static int svc_rdma_copy_inline_range(struct svc_rdma_read_info *info, 762 unsigned int offset, 763 unsigned int remaining) 764 { 765 struct svc_rdma_recv_ctxt *head = info->ri_readctxt; 766 unsigned char *dst, *src = head->rc_recv_buf; 767 struct svc_rqst *rqstp = info->ri_rqst; 768 unsigned int page_no, numpages; 769 770 numpages = PAGE_ALIGN(info->ri_pageoff + remaining) >> PAGE_SHIFT; 771 for (page_no = 0; page_no < numpages; page_no++) { 772 unsigned int page_len; 773 774 page_len = min_t(unsigned int, remaining, 775 PAGE_SIZE - info->ri_pageoff); 776 777 if (!info->ri_pageoff) 778 head->rc_page_count++; 779 780 dst = page_address(rqstp->rq_pages[info->ri_pageno]); 781 memcpy(dst + info->ri_pageno, src + offset, page_len); 782 783 info->ri_totalbytes += page_len; 784 info->ri_pageoff += page_len; 785 if (info->ri_pageoff == PAGE_SIZE) { 786 info->ri_pageno++; 787 info->ri_pageoff = 0; 788 } 789 remaining -= page_len; 790 offset += page_len; 791 } 792 793 return -EINVAL; 794 } 795 796 /** 797 * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks 798 * @info: context for RDMA Reads 799 * 800 * The chunk data lands in rqstp->rq_arg as a series of contiguous pages, 801 * like an incoming TCP call. 802 * 803 * Return values: 804 * %0: RDMA Read WQEs were successfully built 805 * %-EINVAL: client provided too many chunks or segments, 806 * %-ENOMEM: rdma_rw context pool was exhausted, 807 * %-ENOTCONN: posting failed (connection is lost), 808 * %-EIO: rdma_rw initialization failed (DMA mapping, etc). 809 */ 810 static noinline int svc_rdma_read_multiple_chunks(struct svc_rdma_read_info *info) 811 { 812 struct svc_rdma_recv_ctxt *head = info->ri_readctxt; 813 const struct svc_rdma_pcl *pcl = &head->rc_read_pcl; 814 struct xdr_buf *buf = &info->ri_rqst->rq_arg; 815 struct svc_rdma_chunk *chunk, *next; 816 unsigned int start, length; 817 int ret; 818 819 start = 0; 820 chunk = pcl_first_chunk(pcl); 821 length = chunk->ch_position; 822 ret = svc_rdma_copy_inline_range(info, start, length); 823 if (ret < 0) 824 return ret; 825 826 pcl_for_each_chunk(chunk, pcl) { 827 ret = svc_rdma_build_read_chunk(info, chunk); 828 if (ret < 0) 829 return ret; 830 831 next = pcl_next_chunk(pcl, chunk); 832 if (!next) 833 break; 834 835 start += length; 836 length = next->ch_position - info->ri_totalbytes; 837 ret = svc_rdma_copy_inline_range(info, start, length); 838 if (ret < 0) 839 return ret; 840 } 841 842 start += length; 843 length = head->rc_byte_len - start; 844 ret = svc_rdma_copy_inline_range(info, start, length); 845 if (ret < 0) 846 return ret; 847 848 buf->len += info->ri_totalbytes; 849 buf->buflen += info->ri_totalbytes; 850 851 buf->head[0].iov_base = page_address(info->ri_rqst->rq_pages[0]); 852 buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes); 853 buf->pages = &info->ri_rqst->rq_pages[1]; 854 buf->page_len = info->ri_totalbytes - buf->head[0].iov_len; 855 return 0; 856 } 857 858 /** 859 * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks 860 * @info: context for RDMA Reads 861 * 862 * The chunk data lands in the page list of rqstp->rq_arg.pages. 863 * 864 * Currently NFSD does not look at the rqstp->rq_arg.tail[0] kvec. 865 * Therefore, XDR round-up of the Read chunk and trailing 866 * inline content must both be added at the end of the pagelist. 867 * 868 * Return values: 869 * %0: RDMA Read WQEs were successfully built 870 * %-EINVAL: client provided too many chunks or segments, 871 * %-ENOMEM: rdma_rw context pool was exhausted, 872 * %-ENOTCONN: posting failed (connection is lost), 873 * %-EIO: rdma_rw initialization failed (DMA mapping, etc). 874 */ 875 static int svc_rdma_read_data_item(struct svc_rdma_read_info *info) 876 { 877 struct svc_rdma_recv_ctxt *head = info->ri_readctxt; 878 struct xdr_buf *buf = &info->ri_rqst->rq_arg; 879 struct svc_rdma_chunk *chunk; 880 unsigned int length; 881 int ret; 882 883 chunk = pcl_first_chunk(&head->rc_read_pcl); 884 ret = svc_rdma_build_read_chunk(info, chunk); 885 if (ret < 0) 886 goto out; 887 888 /* Split the Receive buffer between the head and tail 889 * buffers at Read chunk's position. XDR roundup of the 890 * chunk is not included in either the pagelist or in 891 * the tail. 892 */ 893 buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position; 894 buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position; 895 buf->head[0].iov_len = chunk->ch_position; 896 897 /* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2). 898 * 899 * If the client already rounded up the chunk length, the 900 * length does not change. Otherwise, the length of the page 901 * list is increased to include XDR round-up. 902 * 903 * Currently these chunks always start at page offset 0, 904 * thus the rounded-up length never crosses a page boundary. 905 */ 906 buf->pages = &info->ri_rqst->rq_pages[0]; 907 length = xdr_align_size(chunk->ch_length); 908 buf->page_len = length; 909 buf->len += length; 910 buf->buflen += length; 911 912 out: 913 return ret; 914 } 915 916 /** 917 * svc_rdma_read_chunk_range - Build RDMA Read WQEs for portion of a chunk 918 * @info: context for RDMA Reads 919 * @chunk: parsed Call chunk to pull 920 * @offset: offset of region to pull 921 * @length: length of region to pull 922 * 923 * Return values: 924 * %0: RDMA Read WQEs were successfully built 925 * %-EINVAL: there were not enough resources to finish 926 * %-ENOMEM: rdma_rw context pool was exhausted, 927 * %-ENOTCONN: posting failed (connection is lost), 928 * %-EIO: rdma_rw initialization failed (DMA mapping, etc). 929 */ 930 static int svc_rdma_read_chunk_range(struct svc_rdma_read_info *info, 931 const struct svc_rdma_chunk *chunk, 932 unsigned int offset, unsigned int length) 933 { 934 const struct svc_rdma_segment *segment; 935 int ret; 936 937 ret = -EINVAL; 938 pcl_for_each_segment(segment, chunk) { 939 struct svc_rdma_segment dummy; 940 941 if (offset > segment->rs_length) { 942 offset -= segment->rs_length; 943 continue; 944 } 945 946 dummy.rs_handle = segment->rs_handle; 947 dummy.rs_length = min_t(u32, length, segment->rs_length) - offset; 948 dummy.rs_offset = segment->rs_offset + offset; 949 950 ret = svc_rdma_build_read_segment(info, &dummy); 951 if (ret < 0) 952 break; 953 954 info->ri_totalbytes += dummy.rs_length; 955 length -= dummy.rs_length; 956 offset = 0; 957 } 958 return ret; 959 } 960 961 /** 962 * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message 963 * @info: context for RDMA Reads 964 * 965 * Return values: 966 * %0: RDMA Read WQEs were successfully built 967 * %-EINVAL: there were not enough resources to finish 968 * %-ENOMEM: rdma_rw context pool was exhausted, 969 * %-ENOTCONN: posting failed (connection is lost), 970 * %-EIO: rdma_rw initialization failed (DMA mapping, etc). 971 */ 972 static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info) 973 { 974 struct svc_rdma_recv_ctxt *head = info->ri_readctxt; 975 const struct svc_rdma_chunk *call_chunk = 976 pcl_first_chunk(&head->rc_call_pcl); 977 const struct svc_rdma_pcl *pcl = &head->rc_read_pcl; 978 struct svc_rdma_chunk *chunk, *next; 979 unsigned int start, length; 980 int ret; 981 982 if (pcl_is_empty(pcl)) 983 return svc_rdma_build_read_chunk(info, call_chunk); 984 985 start = 0; 986 chunk = pcl_first_chunk(pcl); 987 length = chunk->ch_position; 988 ret = svc_rdma_read_chunk_range(info, call_chunk, start, length); 989 if (ret < 0) 990 return ret; 991 992 pcl_for_each_chunk(chunk, pcl) { 993 ret = svc_rdma_build_read_chunk(info, chunk); 994 if (ret < 0) 995 return ret; 996 997 next = pcl_next_chunk(pcl, chunk); 998 if (!next) 999 break; 1000 1001 start += length; 1002 length = next->ch_position - info->ri_totalbytes; 1003 ret = svc_rdma_read_chunk_range(info, call_chunk, 1004 start, length); 1005 if (ret < 0) 1006 return ret; 1007 } 1008 1009 start += length; 1010 length = call_chunk->ch_length - start; 1011 return svc_rdma_read_chunk_range(info, call_chunk, start, length); 1012 } 1013 1014 /** 1015 * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message 1016 * @info: context for RDMA Reads 1017 * 1018 * The start of the data lands in the first page just after the 1019 * Transport header, and the rest lands in rqstp->rq_arg.pages. 1020 * 1021 * Assumptions: 1022 * - A PZRC is never sent in an RDMA_MSG message, though it's 1023 * allowed by spec. 1024 * 1025 * Return values: 1026 * %0: RDMA Read WQEs were successfully built 1027 * %-EINVAL: client provided too many chunks or segments, 1028 * %-ENOMEM: rdma_rw context pool was exhausted, 1029 * %-ENOTCONN: posting failed (connection is lost), 1030 * %-EIO: rdma_rw initialization failed (DMA mapping, etc). 1031 */ 1032 static noinline int svc_rdma_read_special(struct svc_rdma_read_info *info) 1033 { 1034 struct xdr_buf *buf = &info->ri_rqst->rq_arg; 1035 int ret; 1036 1037 ret = svc_rdma_read_call_chunk(info); 1038 if (ret < 0) 1039 goto out; 1040 1041 buf->len += info->ri_totalbytes; 1042 buf->buflen += info->ri_totalbytes; 1043 1044 buf->head[0].iov_base = page_address(info->ri_rqst->rq_pages[0]); 1045 buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes); 1046 buf->pages = &info->ri_rqst->rq_pages[1]; 1047 buf->page_len = info->ri_totalbytes - buf->head[0].iov_len; 1048 1049 out: 1050 return ret; 1051 } 1052 1053 /** 1054 * svc_rdma_process_read_list - Pull list of Read chunks from the client 1055 * @rdma: controlling RDMA transport 1056 * @rqstp: set of pages to use as Read sink buffers 1057 * @head: pages under I/O collect here 1058 * 1059 * The RPC/RDMA protocol assumes that the upper layer's XDR decoders 1060 * pull each Read chunk as they decode an incoming RPC message. 1061 * 1062 * On Linux, however, the server needs to have a fully-constructed RPC 1063 * message in rqstp->rq_arg when there is a positive return code from 1064 * ->xpo_recvfrom. So the Read list is safety-checked immediately when 1065 * it is received, then here the whole Read list is pulled all at once. 1066 * The ingress RPC message is fully reconstructed once all associated 1067 * RDMA Reads have completed. 1068 * 1069 * Return values: 1070 * %1: all needed RDMA Reads were posted successfully, 1071 * %-EINVAL: client provided too many chunks or segments, 1072 * %-ENOMEM: rdma_rw context pool was exhausted, 1073 * %-ENOTCONN: posting failed (connection is lost), 1074 * %-EIO: rdma_rw initialization failed (DMA mapping, etc). 1075 */ 1076 int svc_rdma_process_read_list(struct svcxprt_rdma *rdma, 1077 struct svc_rqst *rqstp, 1078 struct svc_rdma_recv_ctxt *head) 1079 { 1080 struct svc_rdma_read_info *info; 1081 struct svc_rdma_chunk_ctxt *cc; 1082 int ret; 1083 1084 info = svc_rdma_read_info_alloc(rdma); 1085 if (!info) 1086 return -ENOMEM; 1087 cc = &info->ri_cc; 1088 info->ri_rqst = rqstp; 1089 info->ri_readctxt = head; 1090 info->ri_pageno = 0; 1091 info->ri_pageoff = 0; 1092 info->ri_totalbytes = 0; 1093 1094 if (pcl_is_empty(&head->rc_call_pcl)) { 1095 if (head->rc_read_pcl.cl_count == 1) 1096 ret = svc_rdma_read_data_item(info); 1097 else 1098 ret = svc_rdma_read_multiple_chunks(info); 1099 } else 1100 ret = svc_rdma_read_special(info); 1101 if (ret < 0) 1102 goto out_err; 1103 1104 trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount); 1105 init_completion(&cc->cc_done); 1106 ret = svc_rdma_post_chunk_ctxt(cc); 1107 if (ret < 0) 1108 goto out_err; 1109 1110 ret = 1; 1111 wait_for_completion(&cc->cc_done); 1112 if (cc->cc_status != IB_WC_SUCCESS) 1113 ret = -EIO; 1114 1115 /* rq_respages starts after the last arg page */ 1116 rqstp->rq_respages = &rqstp->rq_pages[head->rc_page_count]; 1117 rqstp->rq_next_page = rqstp->rq_respages + 1; 1118 1119 /* Ensure svc_rdma_recv_ctxt_put() does not try to release pages */ 1120 head->rc_page_count = 0; 1121 1122 out_err: 1123 svc_rdma_read_info_free(info); 1124 return ret; 1125 } 1126