// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2016-2018 Oracle. All rights reserved.
 *
 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
 */

#include <rdma/rw.h>

#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);

/* Each R/W context contains state for one chain of RDMA Read or
 * Write Work Requests.
 *
 * Each WR chain handles a single contiguous server-side buffer,
 * because scatterlist entries after the first have to start on
 * page alignment. xdr_buf iovecs cannot guarantee alignment.
 *
 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
 * from a client may contain a unique R_key, so each WR chain moves
 * up to one segment at a time.
 *
 * The scatterlist makes this data structure over 4KB in size. To
 * make it less likely to fail, and to handle the allocation for
 * smaller I/O requests without disabling bottom-halves, these
 * contexts are created on demand, but cached and reused until the
 * controlling svcxprt_rdma is destroyed.
 */
struct svc_rdma_rw_ctxt {
	struct llist_node	rw_node;
	struct list_head	rw_list;
	struct rdma_rw_ctx	rw_ctx;
	unsigned int		rw_nents;
	struct sg_table		rw_sg_table;
	struct scatterlist	rw_first_sgl[];
};

static inline struct svc_rdma_rw_ctxt *
svc_rdma_next_ctxt(struct list_head *list)
{
	return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
					rw_list);
}

static struct svc_rdma_rw_ctxt *
svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
{
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

	spin_lock(&rdma->sc_rw_ctxt_lock);
	node = llist_del_first(&rdma->sc_rw_ctxts);
	spin_unlock(&rdma->sc_rw_ctxt_lock);
	if (node) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
	} else {
		ctxt = kmalloc_node(struct_size(ctxt, rw_first_sgl, SG_CHUNK_SIZE),
				    GFP_KERNEL, ibdev_to_node(rdma->sc_cm_id->device));
		if (!ctxt)
			goto out_noctx;

		INIT_LIST_HEAD(&ctxt->rw_list);
	}

	ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
	if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
				   ctxt->rw_sg_table.sgl,
				   SG_CHUNK_SIZE))
		goto out_free;
	return ctxt;

out_free:
	kfree(ctxt);
out_noctx:
	trace_svcrdma_no_rwctx_err(rdma, sges);
	return NULL;
}

static void __svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
				   struct svc_rdma_rw_ctxt *ctxt,
				   struct llist_head *list)
{
	sg_free_table_chained(&ctxt->rw_sg_table, SG_CHUNK_SIZE);
	llist_add(&ctxt->rw_node, list);
}

static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
				 struct svc_rdma_rw_ctxt *ctxt)
{
	__svc_rdma_put_rw_ctxt(rdma, ctxt, &rdma->sc_rw_ctxts);
}

/**
 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
 * @rdma: transport about to be destroyed
 *
 */
void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

	while ((node = llist_del_first(&rdma->sc_rw_ctxts)) != NULL) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
		kfree(ctxt);
	}
}
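
/* Typical round trip through this cache (sketch of the callers below):
 *
 *	ctxt = svc_rdma_get_rw_ctxt(rdma, sges);
 *	< fill ctxt->rw_sg_table.sgl and set ctxt->rw_nents >
 *	ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, handle, dir);
 *	list_add(&ctxt->rw_list, &cc->cc_rwctxts);
 *	...
 *	svc_rdma_cc_release(cc, dir);
 *
 * svc_rdma_rw_ctx_init() returns the ctxt to the cache itself if DMA
 * mapping fails; svc_rdma_cc_release() recycles the rest once the WR
 * chain has been retired.
 */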

/**
 * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O
 * @rdma: controlling transport instance
 * @ctxt: R/W context to prepare
 * @offset: RDMA offset
 * @handle: RDMA tag/handle
 * @direction: I/O direction
 *
 * Returns on success, the number of WQEs that will be needed
 * on the Send Queue, or a negative errno.
 */
static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
				struct svc_rdma_rw_ctxt *ctxt,
				u64 offset, u32 handle,
				enum dma_data_direction direction)
{
	int ret;

	ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp, rdma->sc_port_num,
			       ctxt->rw_sg_table.sgl, ctxt->rw_nents,
			       0, offset, handle, direction);
	if (unlikely(ret < 0)) {
		svc_rdma_put_rw_ctxt(rdma, ctxt);
		trace_svcrdma_dma_map_rw_err(rdma, ctxt->rw_nents, ret);
	}
	return ret;
}

/* A chunk context tracks all I/O for moving one Read or Write
 * chunk. This is a set of rdma_rw's that handle data movement
 * for all segments of one chunk.
 *
 * These are small, acquired with a single allocator call, and
 * no more than one is needed per chunk. They are allocated on
 * demand, and not cached.
 */
struct svc_rdma_chunk_ctxt {
	struct rpc_rdma_cid	cc_cid;
	struct ib_cqe		cc_cqe;
	struct svcxprt_rdma	*cc_rdma;
	struct list_head	cc_rwctxts;
	ktime_t			cc_posttime;
	int			cc_sqecount;
	enum ib_wc_status	cc_status;
	struct completion	cc_done;
};

static void svc_rdma_cc_cid_init(struct svcxprt_rdma *rdma,
				 struct rpc_rdma_cid *cid)
{
	cid->ci_queue_id = rdma->sc_sq_cq->res.id;
	cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids);
}

static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
			     struct svc_rdma_chunk_ctxt *cc)
{
	svc_rdma_cc_cid_init(rdma, &cc->cc_cid);
	cc->cc_rdma = rdma;

	INIT_LIST_HEAD(&cc->cc_rwctxts);
	cc->cc_sqecount = 0;
}

/*
 * The consumed rw_ctx's are cleaned and placed on a local llist so
 * that only one atomic llist operation is needed to put them all
 * back on the free list.
 */
static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
				enum dma_data_direction dir)
{
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct llist_node *first, *last;
	struct svc_rdma_rw_ctxt *ctxt;
	LLIST_HEAD(free);

	trace_svcrdma_cc_release(&cc->cc_cid, cc->cc_sqecount);

	first = last = NULL;
	while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
		list_del(&ctxt->rw_list);

		rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
				    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
				    ctxt->rw_nents, dir);
		__svc_rdma_put_rw_ctxt(rdma, ctxt, &free);

		ctxt->rw_node.next = first;
		first = &ctxt->rw_node;
		if (!last)
			last = first;
	}
	if (first)
		llist_add_batch(first, last, &rdma->sc_rw_ctxts);
}
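
/* Rough lifecycle of a chunk context (cc), as used by the Write and
 * Read paths below:
 *
 *	svc_rdma_cc_init()         - assign a completion ID, reset counts
 *	svc_rdma_get_rw_ctxt()     - once per segment; each ctxt is added
 *				     to cc_rwctxts and its WQE count is
 *				     accumulated in cc_sqecount
 *	svc_rdma_post_chunk_ctxt() - chain and post all the WRs; a single
 *				     completion is expected for the chain
 *	svc_rdma_cc_release()      - unmap and recycle the rw_ctxts
 */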

/* State for sending a Write or Reply chunk.
 *  - Tracks progress of writing one chunk over all its segments
 *  - Stores arguments for the SGL constructor functions
 */
struct svc_rdma_write_info {
	const struct svc_rdma_chunk	*wi_chunk;

	/* write state of this chunk */
	unsigned int		wi_seg_off;
	unsigned int		wi_seg_no;

	/* SGL constructor arguments */
	const struct xdr_buf	*wi_xdr;
	unsigned char		*wi_base;
	unsigned int		wi_next_off;

	struct svc_rdma_chunk_ctxt	wi_cc;
};

static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
			  const struct svc_rdma_chunk *chunk)
{
	struct svc_rdma_write_info *info;

	info = kmalloc_node(sizeof(*info), GFP_KERNEL,
			    ibdev_to_node(rdma->sc_cm_id->device));
	if (!info)
		return info;

	info->wi_chunk = chunk;
	info->wi_seg_off = 0;
	info->wi_seg_no = 0;
	svc_rdma_cc_init(rdma, &info->wi_cc);
	info->wi_cc.cc_cqe.done = svc_rdma_write_done;
	return info;
}

static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
{
	svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE);
	kfree(info);
}

/**
 * svc_rdma_write_done - Write chunk completion
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 * Pages under I/O are freed by a subsequent Send completion.
 */
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct svc_rdma_write_info *info =
			container_of(cc, struct svc_rdma_write_info, wi_cc);

	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_write(wc, &cc->cc_cid);
		break;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_write_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_write_err(wc, &cc->cc_cid);
	}

	svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);

	if (unlikely(wc->status != IB_WC_SUCCESS))
		svc_xprt_deferred_close(&rdma->sc_xprt);

	svc_rdma_write_info_free(info);
}
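
/* Ownership note: once a Write chain has been posted, the
 * svc_rdma_write_info is freed by svc_rdma_write_done() at completion
 * time. svc_rdma_send_write_chunk() and svc_rdma_send_reply_chunk()
 * free it directly only on error paths taken before the chain has
 * been posted.
 */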

/* State for pulling a Read chunk.
 */
struct svc_rdma_read_info {
	struct svc_rqst			*ri_rqst;
	struct svc_rdma_recv_ctxt	*ri_readctxt;
	unsigned int			ri_pageno;
	unsigned int			ri_pageoff;
	unsigned int			ri_totalbytes;

	struct svc_rdma_chunk_ctxt	ri_cc;
};

static struct svc_rdma_read_info *
svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_read_info *info;

	info = kmalloc_node(sizeof(*info), GFP_KERNEL,
			    ibdev_to_node(rdma->sc_cm_id->device));
	if (!info)
		return info;

	svc_rdma_cc_init(rdma, &info->ri_cc);
	info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done;
	return info;
}

static void svc_rdma_read_info_free(struct svc_rdma_read_info *info)
{
	svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE);
	kfree(info);
}

/**
 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 */
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svc_rdma_read_info *info;

	switch (wc->status) {
	case IB_WC_SUCCESS:
		info = container_of(cc, struct svc_rdma_read_info, ri_cc);
		trace_svcrdma_wc_read(wc, &cc->cc_cid, info->ri_totalbytes,
				      cc->cc_posttime);
		break;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_read_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_read_err(wc, &cc->cc_cid);
	}

	svc_rdma_wake_send_waiters(cc->cc_rdma, cc->cc_sqecount);
	cc->cc_status = wc->status;
	complete(&cc->cc_done);
	return;
}
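
/* Unlike the Write chunk path, Read completions are consumed
 * synchronously: svc_rdma_process_read_list() sleeps on cc_done and
 * inspects cc_status after svc_rdma_wc_read_done() fires, and the
 * svc_rdma_read_info is freed by that caller rather than by the
 * completion handler.
 */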

/*
 * Assumptions:
 * - If ib_post_send() succeeds, only one completion is expected,
 *   even if one or more WRs are flushed. This is true when posting
 *   an rdma_rw_ctx or when posting a single signaled WR.
 */
static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
{
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct ib_send_wr *first_wr;
	const struct ib_send_wr *bad_wr;
	struct list_head *tmp;
	struct ib_cqe *cqe;
	int ret;

	might_sleep();

	if (cc->cc_sqecount > rdma->sc_sq_depth)
		return -EINVAL;

	first_wr = NULL;
	cqe = &cc->cc_cqe;
	list_for_each(tmp, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *ctxt;

		ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;
	}

	do {
		if (atomic_sub_return(cc->cc_sqecount,
				      &rdma->sc_sq_avail) > 0) {
			cc->cc_posttime = ktime_get();
			ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
			if (ret)
				break;
			return 0;
		}

		percpu_counter_inc(&svcrdma_stat_sq_starve);
		trace_svcrdma_sq_full(rdma);
		atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
		wait_event(rdma->sc_send_wait,
			   atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
		trace_svcrdma_sq_retry(rdma);
	} while (1);

	trace_svcrdma_sq_post_err(rdma, ret);
	svc_xprt_deferred_close(&rdma->sc_xprt);

	/* If even one was posted, there will be a completion. */
	if (bad_wr != first_wr)
		return 0;

	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
	wake_up(&rdma->sc_send_wait);
	return -ENOTCONN;
}

/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
 */
static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
			       unsigned int len,
			       struct svc_rdma_rw_ctxt *ctxt)
{
	struct scatterlist *sg = ctxt->rw_sg_table.sgl;

	sg_set_buf(&sg[0], info->wi_base, len);
	info->wi_base += len;

	ctxt->rw_nents = 1;
}

/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
 */
static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
				    unsigned int remaining,
				    struct svc_rdma_rw_ctxt *ctxt)
{
	unsigned int sge_no, sge_bytes, page_off, page_no;
	const struct xdr_buf *xdr = info->wi_xdr;
	struct scatterlist *sg;
	struct page **page;

	page_off = info->wi_next_off + xdr->page_base;
	page_no = page_off >> PAGE_SHIFT;
	page_off = offset_in_page(page_off);
	page = xdr->pages + page_no;
	info->wi_next_off += remaining;
	sg = ctxt->rw_sg_table.sgl;
	sge_no = 0;
	do {
		sge_bytes = min_t(unsigned int, remaining,
				  PAGE_SIZE - page_off);
		sg_set_page(sg, *page, sge_bytes, page_off);

		remaining -= sge_bytes;
		sg = sg_next(sg);
		page_off = 0;
		sge_no++;
		page++;
	} while (remaining);

	ctxt->rw_nents = sge_no;
}

/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 * an RPC Reply.
 */
static int
svc_rdma_build_writes(struct svc_rdma_write_info *info,
		      void (*constructor)(struct svc_rdma_write_info *info,
					  unsigned int len,
					  struct svc_rdma_rw_ctxt *ctxt),
		      unsigned int remaining)
{
	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	const struct svc_rdma_segment *seg;
	struct svc_rdma_rw_ctxt *ctxt;
	int ret;

	do {
		unsigned int write_len;
		u64 offset;

		if (info->wi_seg_no >= info->wi_chunk->ch_segcount)
			goto out_overflow;

		seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
		write_len = min(remaining, seg->rs_length - info->wi_seg_off);
		if (!write_len)
			goto out_overflow;
		ctxt = svc_rdma_get_rw_ctxt(rdma,
					    (write_len >> PAGE_SHIFT) + 2);
		if (!ctxt)
			return -ENOMEM;

		constructor(info, write_len, ctxt);
		offset = seg->rs_offset + info->wi_seg_off;
		ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle,
					   DMA_TO_DEVICE);
		if (ret < 0)
			return -EIO;
		percpu_counter_inc(&svcrdma_stat_write);

		list_add(&ctxt->rw_list, &cc->cc_rwctxts);
		cc->cc_sqecount += ret;
		if (write_len == seg->rs_length - info->wi_seg_off) {
			info->wi_seg_no++;
			info->wi_seg_off = 0;
		} else {
			info->wi_seg_off += write_len;
		}
		remaining -= write_len;
	} while (remaining);

	return 0;

out_overflow:
	trace_svcrdma_small_wrch_err(rdma, remaining, info->wi_seg_no,
				     info->wi_chunk->ch_segcount);
	return -E2BIG;
}
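
/* Notes on svc_rdma_build_writes():
 *
 * - The "(write_len >> PAGE_SHIFT) + 2" passed to
 *   svc_rdma_get_rw_ctxt() is an upper bound on the number of pages
 *   an arbitrarily aligned write_len-byte range can touch. With 4KB
 *   pages, for example, 6000 bytes starting 4000 bytes into a page
 *   straddle three pages: (6000 >> PAGE_SHIFT) + 2 = 3.
 * - wi_seg_no and wi_seg_off record how much of the current Write
 *   chunk segment has been consumed, so one xdr_buf range can span
 *   several segments and a later range can resume mid-segment.
 */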

/**
 * svc_rdma_iov_write - Construct RDMA Writes from an iov
 * @info: pointer to write arguments
 * @iov: kvec to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_iov_write(struct svc_rdma_write_info *info,
			      const struct kvec *iov)
{
	info->wi_base = iov->iov_base;
	return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
				     iov->iov_len);
}

/**
 * svc_rdma_pages_write - Construct RDMA Writes from pages
 * @info: pointer to write arguments
 * @xdr: xdr_buf with pages to write
 * @offset: offset into the content of @xdr
 * @length: number of bytes to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
				const struct xdr_buf *xdr,
				unsigned int offset,
				unsigned long length)
{
	info->wi_xdr = xdr;
	info->wi_next_off = offset - xdr->head[0].iov_len;
	return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
				     length);
}

/**
 * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
 * @xdr: xdr_buf to write
 * @data: pointer to write arguments
 *
 * Returns:
 *   On success, returns the number of bytes consumed (xdr->len)
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
{
	struct svc_rdma_write_info *info = data;
	int ret;

	if (xdr->head[0].iov_len) {
		ret = svc_rdma_iov_write(info, &xdr->head[0]);
		if (ret < 0)
			return ret;
	}

	if (xdr->page_len) {
		ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
					   xdr->page_len);
		if (ret < 0)
			return ret;
	}

	if (xdr->tail[0].iov_len) {
		ret = svc_rdma_iov_write(info, &xdr->tail[0]);
		if (ret < 0)
			return ret;
	}

	return xdr->len;
}
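
/* Note that svc_rdma_xb_write() reports the number of bytes consumed
 * rather than zero on success: svc_rdma_send_write_chunk() below
 * compares that value against xdr->len to confirm that the whole
 * payload was covered by the constructed Write WRs.
 */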

/**
 * svc_rdma_send_write_chunk - Write all segments in a Write chunk
 * @rdma: controlling RDMA transport
 * @chunk: Write chunk provided by the client
 * @xdr: xdr_buf containing the data payload
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Write chunk,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
			      const struct svc_rdma_chunk *chunk,
			      const struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info;
	struct svc_rdma_chunk_ctxt *cc;
	int ret;

	info = svc_rdma_write_info_alloc(rdma, chunk);
	if (!info)
		return -ENOMEM;
	cc = &info->wi_cc;

	ret = svc_rdma_xb_write(xdr, info);
	if (ret != xdr->len)
		goto out_err;

	trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
	ret = svc_rdma_post_chunk_ctxt(cc);
	if (ret < 0)
		goto out_err;
	return xdr->len;

out_err:
	svc_rdma_write_info_free(info);
	return ret;
}

/**
 * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
 * @rdma: controlling RDMA transport
 * @rctxt: Write and Reply chunks from client
 * @xdr: xdr_buf containing an RPC Reply
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Reply chunk,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
			      const struct svc_rdma_recv_ctxt *rctxt,
			      const struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info;
	struct svc_rdma_chunk_ctxt *cc;
	struct svc_rdma_chunk *chunk;
	int ret;

	if (pcl_is_empty(&rctxt->rc_reply_pcl))
		return 0;

	chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
	info = svc_rdma_write_info_alloc(rdma, chunk);
	if (!info)
		return -ENOMEM;
	cc = &info->wi_cc;

	ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
				      svc_rdma_xb_write, info);
	if (ret < 0)
		goto out_err;

	trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
	ret = svc_rdma_post_chunk_ctxt(cc);
	if (ret < 0)
		goto out_err;

	return xdr->len;

out_err:
	svc_rdma_write_info_free(info);
	return ret;
}
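
/* svc_rdma_send_reply_chunk() walks the Reply's xdr_buf with
 * pcl_process_nonpayloads() over rc_write_pcl, so byte ranges that
 * were already moved as Write chunk payloads are skipped and only the
 * remaining XDR content is written into the Reply chunk.
 */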

/**
 * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
 * @info: context for ongoing I/O
 * @segment: co-ordinates of remote memory to be read
 *
 * Returns:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough rq_pages to finish
 *   %-ENOMEM: allocating local resources failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
				       const struct svc_rdma_segment *segment)
{
	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
	struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
	struct svc_rqst *rqstp = info->ri_rqst;
	unsigned int sge_no, seg_len, len;
	struct svc_rdma_rw_ctxt *ctxt;
	struct scatterlist *sg;
	int ret;

	len = segment->rs_length;
	sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
	ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
	if (!ctxt)
		return -ENOMEM;
	ctxt->rw_nents = sge_no;

	sg = ctxt->rw_sg_table.sgl;
	for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
		seg_len = min_t(unsigned int, len,
				PAGE_SIZE - info->ri_pageoff);

		if (!info->ri_pageoff)
			head->rc_page_count++;

		sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
			    seg_len, info->ri_pageoff);
		sg = sg_next(sg);

		info->ri_pageoff += seg_len;
		if (info->ri_pageoff == PAGE_SIZE) {
			info->ri_pageno++;
			info->ri_pageoff = 0;
		}
		len -= seg_len;

		/* Safety check */
		if (len &&
		    &rqstp->rq_pages[info->ri_pageno + 1] > rqstp->rq_page_end)
			goto out_overrun;
	}

	ret = svc_rdma_rw_ctx_init(cc->cc_rdma, ctxt, segment->rs_offset,
				   segment->rs_handle, DMA_FROM_DEVICE);
	if (ret < 0)
		return -EIO;
	percpu_counter_inc(&svcrdma_stat_read);

	list_add(&ctxt->rw_list, &cc->cc_rwctxts);
	cc->cc_sqecount += ret;
	return 0;

out_overrun:
	trace_svcrdma_page_overrun_err(cc->cc_rdma, rqstp, info->ri_pageno);
	return -EINVAL;
}

/**
 * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
 * @info: context for ongoing I/O
 * @chunk: Read chunk to pull
 *
 * Return values:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: allocating local resources failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info,
				     const struct svc_rdma_chunk *chunk)
{
	const struct svc_rdma_segment *segment;
	int ret;

	ret = -EINVAL;
	pcl_for_each_segment(segment, chunk) {
		ret = svc_rdma_build_read_segment(info, segment);
		if (ret < 0)
			break;
		info->ri_totalbytes += segment->rs_length;
	}
	return ret;
}

/**
 * svc_rdma_copy_inline_range - Copy part of the inline content into pages
 * @info: context for RDMA Reads
 * @offset: offset into the Receive buffer of region to copy
 * @remaining: length of region to copy
 *
 * Take a page at a time from rqstp->rq_pages and copy the inline
 * content from the Receive buffer into that page. Update
 * info->ri_pageno and info->ri_pageoff so that the next RDMA Read
 * result will land contiguously with the copied content.
 *
 * Return values:
 *   %0: Inline content was successfully copied
 *   %-EINVAL: offset or length was incorrect
 */
static int svc_rdma_copy_inline_range(struct svc_rdma_read_info *info,
				      unsigned int offset,
				      unsigned int remaining)
{
	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
	unsigned char *dst, *src = head->rc_recv_buf;
	struct svc_rqst *rqstp = info->ri_rqst;
	unsigned int page_no, numpages;

	numpages = PAGE_ALIGN(info->ri_pageoff + remaining) >> PAGE_SHIFT;
	for (page_no = 0; page_no < numpages; page_no++) {
		unsigned int page_len;

		page_len = min_t(unsigned int, remaining,
				 PAGE_SIZE - info->ri_pageoff);

		if (!info->ri_pageoff)
			head->rc_page_count++;

		dst = page_address(rqstp->rq_pages[info->ri_pageno]);
		memcpy(dst + info->ri_pageoff, src + offset, page_len);

		info->ri_totalbytes += page_len;
		info->ri_pageoff += page_len;
		if (info->ri_pageoff == PAGE_SIZE) {
			info->ri_pageno++;
			info->ri_pageoff = 0;
		}
		remaining -= page_len;
		offset += page_len;
	}

	return 0;
}
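
/* When an RPC arrives with more than one Read chunk, rq_arg is
 * reassembled by alternating svc_rdma_copy_inline_range() calls (for
 * the XDR bytes that were sent inline, between chunk positions) with
 * svc_rdma_build_read_chunk() calls (for the chunk payloads), so the
 * resulting byte stream preserves the original XDR order.
 */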

/**
 * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
 * @info: context for RDMA Reads
 *
 * The chunk data lands in rqstp->rq_arg as a series of contiguous pages,
 * like an incoming TCP call.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static noinline int svc_rdma_read_multiple_chunks(struct svc_rdma_read_info *info)
{
	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
	struct xdr_buf *buf = &info->ri_rqst->rq_arg;
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start, length;
	int ret;

	start = 0;
	chunk = pcl_first_chunk(pcl);
	length = chunk->ch_position;
	ret = svc_rdma_copy_inline_range(info, start, length);
	if (ret < 0)
		return ret;

	pcl_for_each_chunk(chunk, pcl) {
		ret = svc_rdma_build_read_chunk(info, chunk);
		if (ret < 0)
			return ret;

		next = pcl_next_chunk(pcl, chunk);
		if (!next)
			break;

		start += length;
		length = next->ch_position - info->ri_totalbytes;
		ret = svc_rdma_copy_inline_range(info, start, length);
		if (ret < 0)
			return ret;
	}

	start += length;
	length = head->rc_byte_len - start;
	ret = svc_rdma_copy_inline_range(info, start, length);
	if (ret < 0)
		return ret;

	buf->len += info->ri_totalbytes;
	buf->buflen += info->ri_totalbytes;

	buf->head[0].iov_base = page_address(info->ri_rqst->rq_pages[0]);
	buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes);
	buf->pages = &info->ri_rqst->rq_pages[1];
	buf->page_len = info->ri_totalbytes - buf->head[0].iov_len;
	return 0;
}

/**
 * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
 * @info: context for RDMA Reads
 *
 * The chunk data lands in the page list of rqstp->rq_arg.pages.
 *
 * Currently NFSD does not look at the rqstp->rq_arg.tail[0] kvec.
 * Therefore, XDR round-up of the Read chunk and trailing
 * inline content must both be added at the end of the pagelist.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_data_item(struct svc_rdma_read_info *info)
{
	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
	struct xdr_buf *buf = &info->ri_rqst->rq_arg;
	struct svc_rdma_chunk *chunk;
	unsigned int length;
	int ret;

	chunk = pcl_first_chunk(&head->rc_read_pcl);
	ret = svc_rdma_build_read_chunk(info, chunk);
	if (ret < 0)
		goto out;

	/* Split the Receive buffer between the head and tail
	 * buffers at Read chunk's position. XDR roundup of the
	 * chunk is not included in either the pagelist or in
	 * the tail.
	 */
	buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position;
	buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position;
	buf->head[0].iov_len = chunk->ch_position;

	/* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
	 *
	 * If the client already rounded up the chunk length, the
	 * length does not change. Otherwise, the length of the page
	 * list is increased to include XDR round-up.
	 *
	 * Currently these chunks always start at page offset 0,
	 * thus the rounded-up length never crosses a page boundary.
	 */
	buf->pages = &info->ri_rqst->rq_pages[0];
	length = xdr_align_size(chunk->ch_length);
	buf->page_len = length;
	buf->len += length;
	buf->buflen += length;

out:
	return ret;
}

/**
 * svc_rdma_read_chunk_range - Build RDMA Read WQEs for portion of a chunk
 * @info: context for RDMA Reads
 * @chunk: parsed Call chunk to pull
 * @offset: offset of region to pull
 * @length: length of region to pull
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_chunk_range(struct svc_rdma_read_info *info,
				     const struct svc_rdma_chunk *chunk,
				     unsigned int offset, unsigned int length)
{
	const struct svc_rdma_segment *segment;
	int ret;

	ret = -EINVAL;
	pcl_for_each_segment(segment, chunk) {
		struct svc_rdma_segment dummy;

		if (offset > segment->rs_length) {
			offset -= segment->rs_length;
			continue;
		}

		dummy.rs_handle = segment->rs_handle;
		dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
		dummy.rs_offset = segment->rs_offset + offset;

		ret = svc_rdma_build_read_segment(info, &dummy);
		if (ret < 0)
			break;

		info->ri_totalbytes += dummy.rs_length;
		length -= dummy.rs_length;
		offset = 0;
	}
	return ret;
}
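
/* svc_rdma_read_chunk_range() walks a chunk's segments, skipping whole
 * segments until @offset falls within one, then builds Reads for
 * trimmed copies of the remaining segments until @length bytes have
 * been pulled. The Long Message path below uses it to read only the
 * call-chunk bytes that lie between parsed Read chunks.
 */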

/**
 * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
 * @info: context for RDMA Reads
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info)
{
	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
	const struct svc_rdma_chunk *call_chunk =
			pcl_first_chunk(&head->rc_call_pcl);
	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start, length;
	int ret;

	if (pcl_is_empty(pcl))
		return svc_rdma_build_read_chunk(info, call_chunk);

	start = 0;
	chunk = pcl_first_chunk(pcl);
	length = chunk->ch_position;
	ret = svc_rdma_read_chunk_range(info, call_chunk, start, length);
	if (ret < 0)
		return ret;

	pcl_for_each_chunk(chunk, pcl) {
		ret = svc_rdma_build_read_chunk(info, chunk);
		if (ret < 0)
			return ret;

		next = pcl_next_chunk(pcl, chunk);
		if (!next)
			break;

		start += length;
		length = next->ch_position - info->ri_totalbytes;
		ret = svc_rdma_read_chunk_range(info, call_chunk,
						start, length);
		if (ret < 0)
			return ret;
	}

	start += length;
	length = call_chunk->ch_length - start;
	return svc_rdma_read_chunk_range(info, call_chunk, start, length);
}

/**
 * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
 * @info: context for RDMA Reads
 *
 * The start of the data lands in the first page just after the
 * Transport header, and the rest lands in rqstp->rq_arg.pages.
 *
 * Assumptions:
 *	- A PZRC is never sent in an RDMA_MSG message, though it's
 *	  allowed by spec.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static noinline int svc_rdma_read_special(struct svc_rdma_read_info *info)
{
	struct xdr_buf *buf = &info->ri_rqst->rq_arg;
	int ret;

	ret = svc_rdma_read_call_chunk(info);
	if (ret < 0)
		goto out;

	buf->len += info->ri_totalbytes;
	buf->buflen += info->ri_totalbytes;

	buf->head[0].iov_base = page_address(info->ri_rqst->rq_pages[0]);
	buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes);
	buf->pages = &info->ri_rqst->rq_pages[1];
	buf->page_len = info->ri_totalbytes - buf->head[0].iov_len;

out:
	return ret;
}
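
/* svc_rdma_process_read_list() below picks a strategy based on the
 * parsed chunk lists: a non-empty rc_call_pcl means the entire RPC
 * Call was conveyed as a Long Message and is pulled by
 * svc_rdma_read_special(); a single data-item Read chunk takes the
 * svc_rdma_read_data_item() path; multiple data-item chunks are
 * handled by svc_rdma_read_multiple_chunks().
 */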

/**
 * svc_rdma_process_read_list - Pull list of Read chunks from the client
 * @rdma: controlling RDMA transport
 * @rqstp: set of pages to use as Read sink buffers
 * @head: pages under I/O collect here
 *
 * The RPC/RDMA protocol assumes that the upper layer's XDR decoders
 * pull each Read chunk as they decode an incoming RPC message.
 *
 * On Linux, however, the server needs to have a fully-constructed RPC
 * message in rqstp->rq_arg when there is a positive return code from
 * ->xpo_recvfrom. So the Read list is safety-checked immediately when
 * it is received, then here the whole Read list is pulled all at once.
 * The ingress RPC message is fully reconstructed once all associated
 * RDMA Reads have completed.
 *
 * Return values:
 *   %1: all needed RDMA Reads were posted successfully,
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
			       struct svc_rqst *rqstp,
			       struct svc_rdma_recv_ctxt *head)
{
	struct svc_rdma_read_info *info;
	struct svc_rdma_chunk_ctxt *cc;
	int ret;

	info = svc_rdma_read_info_alloc(rdma);
	if (!info)
		return -ENOMEM;
	cc = &info->ri_cc;
	info->ri_rqst = rqstp;
	info->ri_readctxt = head;
	info->ri_pageno = 0;
	info->ri_pageoff = 0;
	info->ri_totalbytes = 0;

	if (pcl_is_empty(&head->rc_call_pcl)) {
		if (head->rc_read_pcl.cl_count == 1)
			ret = svc_rdma_read_data_item(info);
		else
			ret = svc_rdma_read_multiple_chunks(info);
	} else
		ret = svc_rdma_read_special(info);
	if (ret < 0)
		goto out_err;

	trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
	init_completion(&cc->cc_done);
	ret = svc_rdma_post_chunk_ctxt(cc);
	if (ret < 0)
		goto out_err;

	ret = 1;
	wait_for_completion(&cc->cc_done);
	if (cc->cc_status != IB_WC_SUCCESS)
		ret = -EIO;

	/* rq_respages starts after the last arg page */
	rqstp->rq_respages = &rqstp->rq_pages[head->rc_page_count];
	rqstp->rq_next_page = rqstp->rq_respages + 1;

	/* Ensure svc_rdma_recv_ctxt_put() does not try to release pages */
	head->rc_page_count = 0;

out_err:
	svc_rdma_read_info_free(info);
	return ret;
}