// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2016-2018 Oracle. All rights reserved.
 *
 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
 */

#include <rdma/rw.h>

#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);

/* Each R/W context contains state for one chain of RDMA Read or
 * Write Work Requests.
 *
 * Each WR chain handles a single contiguous server-side buffer,
 * because scatterlist entries after the first have to start on
 * page alignment. xdr_buf iovecs cannot guarantee alignment.
 *
 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
 * from a client may contain a unique R_key, so each WR chain moves
 * up to one segment at a time.
 *
 * The scatterlist makes this data structure over 4KB in size. To
 * make it less likely to fail, and to handle the allocation for
 * smaller I/O requests without disabling bottom-halves, these
 * contexts are created on demand, but cached and reused until the
 * controlling svcxprt_rdma is destroyed.
 */
struct svc_rdma_rw_ctxt {
	struct llist_node	rw_node;
	struct list_head	rw_list;
	struct rdma_rw_ctx	rw_ctx;
	unsigned int		rw_nents;
	struct sg_table		rw_sg_table;
	struct scatterlist	rw_first_sgl[];
};

static inline struct svc_rdma_rw_ctxt *
svc_rdma_next_ctxt(struct list_head *list)
{
	return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
					rw_list);
}

static struct svc_rdma_rw_ctxt *
svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
{
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

	spin_lock(&rdma->sc_rw_ctxt_lock);
	node = llist_del_first(&rdma->sc_rw_ctxts);
	spin_unlock(&rdma->sc_rw_ctxt_lock);
	if (node) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
	} else {
		ctxt = kmalloc_node(struct_size(ctxt, rw_first_sgl, SG_CHUNK_SIZE),
				    GFP_KERNEL, ibdev_to_node(rdma->sc_cm_id->device));
		if (!ctxt)
			goto out_noctx;

		INIT_LIST_HEAD(&ctxt->rw_list);
	}

	ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
	if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
				   ctxt->rw_sg_table.sgl,
				   SG_CHUNK_SIZE))
		goto out_free;
	return ctxt;

out_free:
	kfree(ctxt);
out_noctx:
	trace_svcrdma_no_rwctx_err(rdma, sges);
	return NULL;
}

static void __svc_rdma_put_rw_ctxt(struct svc_rdma_rw_ctxt *ctxt,
				   struct llist_head *list)
{
	sg_free_table_chained(&ctxt->rw_sg_table, SG_CHUNK_SIZE);
	llist_add(&ctxt->rw_node, list);
}

static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
				 struct svc_rdma_rw_ctxt *ctxt)
{
	__svc_rdma_put_rw_ctxt(ctxt, &rdma->sc_rw_ctxts);
}

/**
 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
 * @rdma: transport about to be destroyed
 *
 */
void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

	while ((node = llist_del_first(&rdma->sc_rw_ctxts)) != NULL) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
		kfree(ctxt);
	}
}

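/* A rough sketch of how one svc_rdma_rw_ctxt is used, following the
 * call patterns in svc_rdma_build_writes() and
 * svc_rdma_build_read_segment() below:
 *
 *	ctxt = svc_rdma_get_rw_ctxt(rdma, sges);
 *	(caller fills ctxt->rw_sg_table.sgl and sets ctxt->rw_nents)
 *	ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, handle, dir);
 *	list_add(&ctxt->rw_list, &cc->cc_rwctxts);
 *	cc->cc_sqecount += ret;
 *
 * where ret is the number of Send Queue WQEs the context will consume.
 * After the chunk I/O completes, svc_rdma_cc_release() destroys the
 * rdma_rw context and returns the svc_rdma_rw_ctxt to rdma->sc_rw_ctxts
 * for reuse.
 */
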
/**
 * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O
 * @rdma: controlling transport instance
 * @ctxt: R/W context to prepare
 * @offset: RDMA offset
 * @handle: RDMA tag/handle
 * @direction: I/O direction
 *
 * Returns the number of Send Queue WQEs needed to post this R/W
 * context on success, or a negative errno.
 */
static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
				struct svc_rdma_rw_ctxt *ctxt,
				u64 offset, u32 handle,
				enum dma_data_direction direction)
{
	int ret;

	ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp, rdma->sc_port_num,
			       ctxt->rw_sg_table.sgl, ctxt->rw_nents,
			       0, offset, handle, direction);
	if (unlikely(ret < 0)) {
		svc_rdma_put_rw_ctxt(rdma, ctxt);
		trace_svcrdma_dma_map_rw_err(rdma, ctxt->rw_nents, ret);
	}
	return ret;
}

/* A chunk context tracks all I/O for moving one Read or Write
 * chunk. This is a set of rdma_rw's that handle data movement
 * for all segments of one chunk.
 *
 * These are small, acquired with a single allocator call, and
 * no more than one is needed per chunk. They are allocated on
 * demand, and not cached.
 */
struct svc_rdma_chunk_ctxt {
	struct rpc_rdma_cid	cc_cid;
	struct ib_cqe		cc_cqe;
	struct svcxprt_rdma	*cc_rdma;
	struct list_head	cc_rwctxts;
	ktime_t			cc_posttime;
	int			cc_sqecount;
	enum ib_wc_status	cc_status;
	struct completion	cc_done;
};

static void svc_rdma_cc_cid_init(struct svcxprt_rdma *rdma,
				 struct rpc_rdma_cid *cid)
{
	cid->ci_queue_id = rdma->sc_sq_cq->res.id;
	cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids);
}

static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
			     struct svc_rdma_chunk_ctxt *cc)
{
	svc_rdma_cc_cid_init(rdma, &cc->cc_cid);
	cc->cc_rdma = rdma;

	INIT_LIST_HEAD(&cc->cc_rwctxts);
	cc->cc_sqecount = 0;
}

/*
 * The consumed rw_ctx's are cleaned and placed on a local llist so
 * that only one atomic llist operation is needed to put them all
 * back on the free list.
 */
static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc,
				enum dma_data_direction dir)
{
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct llist_node *first, *last;
	struct svc_rdma_rw_ctxt *ctxt;
	LLIST_HEAD(free);

	trace_svcrdma_cc_release(&cc->cc_cid, cc->cc_sqecount);

	first = last = NULL;
	while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
		list_del(&ctxt->rw_list);

		rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
				    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
				    ctxt->rw_nents, dir);
		__svc_rdma_put_rw_ctxt(ctxt, &free);

		ctxt->rw_node.next = first;
		first = &ctxt->rw_node;
		if (!last)
			last = first;
	}
	if (first)
		llist_add_batch(first, last, &rdma->sc_rw_ctxts);
}

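/* Send Queue accounting, in brief: cc_sqecount accumulates the WQE
 * counts that svc_rdma_rw_ctx_init() returns for each rw_ctxt on
 * cc_rwctxts. svc_rdma_post_chunk_ctxt() debits that many entries
 * from rdma->sc_sq_avail before posting, and the Write and Read
 * completion handlers hand them back via svc_rdma_wake_send_waiters().
 * For example, a Write chunk with three segments produces three
 * rw_ctxts; if each one maps to a single RDMA Write WR, cc_sqecount
 * is 3.
 */
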
/* State for sending a Write or Reply chunk.
 * - Tracks progress of writing one chunk over all its segments
 * - Stores arguments for the SGL constructor functions
 */
struct svc_rdma_write_info {
	const struct svc_rdma_chunk	*wi_chunk;

	/* write state of this chunk */
	unsigned int		wi_seg_off;
	unsigned int		wi_seg_no;

	/* SGL constructor arguments */
	const struct xdr_buf	*wi_xdr;
	unsigned char		*wi_base;
	unsigned int		wi_next_off;

	struct svc_rdma_chunk_ctxt	wi_cc;
};

static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
			  const struct svc_rdma_chunk *chunk)
{
	struct svc_rdma_write_info *info;

	info = kmalloc_node(sizeof(*info), GFP_KERNEL,
			    ibdev_to_node(rdma->sc_cm_id->device));
	if (!info)
		return info;

	info->wi_chunk = chunk;
	info->wi_seg_off = 0;
	info->wi_seg_no = 0;
	svc_rdma_cc_init(rdma, &info->wi_cc);
	info->wi_cc.cc_cqe.done = svc_rdma_write_done;
	return info;
}

static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
{
	svc_rdma_cc_release(&info->wi_cc, DMA_TO_DEVICE);
	kfree(info);
}

/**
 * svc_rdma_write_done - Write chunk completion
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 * Pages under I/O are freed by a subsequent Send completion.
 */
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct svc_rdma_write_info *info =
			container_of(cc, struct svc_rdma_write_info, wi_cc);

	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_write(wc, &cc->cc_cid);
		break;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_write_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_write_err(wc, &cc->cc_cid);
	}

	svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);

	if (unlikely(wc->status != IB_WC_SUCCESS))
		svc_xprt_deferred_close(&rdma->sc_xprt);

	svc_rdma_write_info_free(info);
}

/* State for pulling a Read chunk.
 */
struct svc_rdma_read_info {
	struct svc_rqst			*ri_rqst;
	struct svc_rdma_recv_ctxt	*ri_readctxt;
	unsigned int			ri_pageno;
	unsigned int			ri_pageoff;
	unsigned int			ri_totalbytes;

	struct svc_rdma_chunk_ctxt	ri_cc;
};

static struct svc_rdma_read_info *
svc_rdma_read_info_alloc(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_read_info *info;

	info = kmalloc_node(sizeof(*info), GFP_KERNEL,
			    ibdev_to_node(rdma->sc_cm_id->device));
	if (!info)
		return info;

	svc_rdma_cc_init(rdma, &info->ri_cc);
	info->ri_cc.cc_cqe.done = svc_rdma_wc_read_done;
	return info;
}

static void svc_rdma_read_info_free(struct svc_rdma_read_info *info)
{
	svc_rdma_cc_release(&info->ri_cc, DMA_FROM_DEVICE);
	kfree(info);
}

/**
 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 */
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svc_rdma_read_info *info;

	switch (wc->status) {
	case IB_WC_SUCCESS:
		info = container_of(cc, struct svc_rdma_read_info, ri_cc);
		trace_svcrdma_wc_read(wc, &cc->cc_cid, info->ri_totalbytes,
				      cc->cc_posttime);
		break;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_read_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_read_err(wc, &cc->cc_cid);
	}

	svc_rdma_wake_send_waiters(cc->cc_rdma, cc->cc_sqecount);
	cc->cc_status = wc->status;
	complete(&cc->cc_done);
}

/*
 * Assumptions:
 * - If ib_post_send() succeeds, only one completion is expected,
 *   even if one or more WRs are flushed. This is true when posting
 *   an rdma_rw_ctx or when posting a single signaled WR.
 */
static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
{
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct ib_send_wr *first_wr;
	const struct ib_send_wr *bad_wr;
	struct list_head *tmp;
	struct ib_cqe *cqe;
	int ret;

	might_sleep();

	if (cc->cc_sqecount > rdma->sc_sq_depth)
		return -EINVAL;

	first_wr = NULL;
	cqe = &cc->cc_cqe;
	list_for_each(tmp, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *ctxt;

		ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;
	}

	do {
		if (atomic_sub_return(cc->cc_sqecount,
				      &rdma->sc_sq_avail) > 0) {
			cc->cc_posttime = ktime_get();
			ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
			if (ret)
				break;
			return 0;
		}

		percpu_counter_inc(&svcrdma_stat_sq_starve);
		trace_svcrdma_sq_full(rdma);
		atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
		wait_event(rdma->sc_send_wait,
			   atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
		trace_svcrdma_sq_retry(rdma);
	} while (1);

	trace_svcrdma_sq_post_err(rdma, ret);
	svc_xprt_deferred_close(&rdma->sc_xprt);

	/* If even one was posted, there will be a completion. */
	if (bad_wr != first_wr)
		return 0;

	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
	wake_up(&rdma->sc_send_wait);
	return -ENOTCONN;
}

/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
 */
static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
			       unsigned int len,
			       struct svc_rdma_rw_ctxt *ctxt)
{
	struct scatterlist *sg = ctxt->rw_sg_table.sgl;

	sg_set_buf(&sg[0], info->wi_base, len);
	info->wi_base += len;

	ctxt->rw_nents = 1;
}

/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
 */
static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
				    unsigned int remaining,
				    struct svc_rdma_rw_ctxt *ctxt)
{
	unsigned int sge_no, sge_bytes, page_off, page_no;
	const struct xdr_buf *xdr = info->wi_xdr;
	struct scatterlist *sg;
	struct page **page;

	page_off = info->wi_next_off + xdr->page_base;
	page_no = page_off >> PAGE_SHIFT;
	page_off = offset_in_page(page_off);
	page = xdr->pages + page_no;
	info->wi_next_off += remaining;
	sg = ctxt->rw_sg_table.sgl;
	sge_no = 0;
	do {
		sge_bytes = min_t(unsigned int, remaining,
				  PAGE_SIZE - page_off);
		sg_set_page(sg, *page, sge_bytes, page_off);

		remaining -= sge_bytes;
		sg = sg_next(sg);
		page_off = 0;
		sge_no++;
		page++;
	} while (remaining);

	ctxt->rw_nents = sge_no;
}

/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 * an RPC Reply.
 */
static int
svc_rdma_build_writes(struct svc_rdma_write_info *info,
		      void (*constructor)(struct svc_rdma_write_info *info,
					  unsigned int len,
					  struct svc_rdma_rw_ctxt *ctxt),
		      unsigned int remaining)
{
	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	const struct svc_rdma_segment *seg;
	struct svc_rdma_rw_ctxt *ctxt;
	int ret;

	do {
		unsigned int write_len;
		u64 offset;

		if (info->wi_seg_no >= info->wi_chunk->ch_segcount)
			goto out_overflow;

		seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
		write_len = min(remaining, seg->rs_length - info->wi_seg_off);
		if (!write_len)
			goto out_overflow;
		ctxt = svc_rdma_get_rw_ctxt(rdma,
					    (write_len >> PAGE_SHIFT) + 2);
		if (!ctxt)
			return -ENOMEM;

		constructor(info, write_len, ctxt);
		offset = seg->rs_offset + info->wi_seg_off;
		ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle,
					   DMA_TO_DEVICE);
		if (ret < 0)
			return -EIO;
		percpu_counter_inc(&svcrdma_stat_write);

		list_add(&ctxt->rw_list, &cc->cc_rwctxts);
		cc->cc_sqecount += ret;
		if (write_len == seg->rs_length - info->wi_seg_off) {
			info->wi_seg_no++;
			info->wi_seg_off = 0;
		} else {
			info->wi_seg_off += write_len;
		}
		remaining -= write_len;
	} while (remaining);

	return 0;

out_overflow:
	trace_svcrdma_small_wrch_err(rdma, remaining, info->wi_seg_no,
				     info->wi_chunk->ch_segcount);
	return -E2BIG;
}

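/* A note on the SGE estimate used above: a write_len byte range that
 * starts at an arbitrary offset within a page touches at most
 * (write_len >> PAGE_SHIFT) + 2 pages, and so needs at most that many
 * scatterlist entries. For example, with 4KB pages, a 10000-byte
 * payload that begins near the end of a page can cover pieces of
 * four pages: (10000 >> 12) + 2 == 4.
 */
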
/**
 * svc_rdma_iov_write - Construct RDMA Writes from an iov
 * @info: pointer to write arguments
 * @iov: kvec to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_iov_write(struct svc_rdma_write_info *info,
			      const struct kvec *iov)
{
	info->wi_base = iov->iov_base;
	return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
				     iov->iov_len);
}

/**
 * svc_rdma_pages_write - Construct RDMA Writes from pages
 * @info: pointer to write arguments
 * @xdr: xdr_buf with pages to write
 * @offset: offset into the content of @xdr
 * @length: number of bytes to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
				const struct xdr_buf *xdr,
				unsigned int offset,
				unsigned long length)
{
	info->wi_xdr = xdr;
	info->wi_next_off = offset - xdr->head[0].iov_len;
	return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
				     length);
}

/**
 * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
 * @xdr: xdr_buf to write
 * @data: pointer to write arguments
 *
 * Returns:
 *   On success, returns the total number of bytes written (xdr->len)
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
{
	struct svc_rdma_write_info *info = data;
	int ret;

	if (xdr->head[0].iov_len) {
		ret = svc_rdma_iov_write(info, &xdr->head[0]);
		if (ret < 0)
			return ret;
	}

	if (xdr->page_len) {
		ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
					   xdr->page_len);
		if (ret < 0)
			return ret;
	}

	if (xdr->tail[0].iov_len) {
		ret = svc_rdma_iov_write(info, &xdr->tail[0]);
		if (ret < 0)
			return ret;
	}

	return xdr->len;
}

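/* svc_rdma_xb_write() is used two ways: svc_rdma_send_write_chunk()
 * below applies it to an entire payload xdr_buf, while
 * svc_rdma_send_reply_chunk() passes it to pcl_process_nonpayloads()
 * so that only the non-payload ranges of the Reply are written into
 * the Reply chunk.
 */
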
/**
 * svc_rdma_send_write_chunk - Write all segments in a Write chunk
 * @rdma: controlling RDMA transport
 * @chunk: Write chunk provided by the client
 * @xdr: xdr_buf containing the data payload
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Write chunk,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
			      const struct svc_rdma_chunk *chunk,
			      const struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info;
	struct svc_rdma_chunk_ctxt *cc;
	int ret;

	info = svc_rdma_write_info_alloc(rdma, chunk);
	if (!info)
		return -ENOMEM;
	cc = &info->wi_cc;

	ret = svc_rdma_xb_write(xdr, info);
	if (ret != xdr->len)
		goto out_err;

	trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
	ret = svc_rdma_post_chunk_ctxt(cc);
	if (ret < 0)
		goto out_err;
	return xdr->len;

out_err:
	svc_rdma_write_info_free(info);
	return ret;
}

/**
 * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
 * @rdma: controlling RDMA transport
 * @rctxt: Write and Reply chunks from client
 * @xdr: xdr_buf containing an RPC Reply
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Reply chunk,
 *	%-EINVAL if client provided too many segments,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
			      const struct svc_rdma_recv_ctxt *rctxt,
			      const struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info;
	struct svc_rdma_chunk_ctxt *cc;
	struct svc_rdma_chunk *chunk;
	int ret;

	if (pcl_is_empty(&rctxt->rc_reply_pcl))
		return 0;

	chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
	info = svc_rdma_write_info_alloc(rdma, chunk);
	if (!info)
		return -ENOMEM;
	cc = &info->wi_cc;

	ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
				      svc_rdma_xb_write, info);
	if (ret < 0)
		goto out_err;

	trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
	ret = svc_rdma_post_chunk_ctxt(cc);
	if (ret < 0)
		goto out_err;

	return xdr->len;

out_err:
	svc_rdma_write_info_free(info);
	return ret;
}

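/* Page math used by svc_rdma_build_read_segment() and
 * svc_rdma_copy_inline_range() below: rs_length bytes landing at the
 * current ri_pageoff touch PAGE_ALIGN(ri_pageoff + rs_length) >>
 * PAGE_SHIFT sink pages, which is also the number of scatterlist
 * entries needed. For example, with 4KB pages, an 8192-byte segment
 * landing at ri_pageoff 1024 occupies bytes 1024..9215 and therefore
 * touches three pages: PAGE_ALIGN(9216) >> 12 == 3.
 */
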
/**
 * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
 * @info: context for ongoing I/O
 * @segment: co-ordinates of remote memory to be read
 *
 * Returns:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough rq_pages to finish
 *   %-ENOMEM: allocating local resources failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
				       const struct svc_rdma_segment *segment)
{
	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
	struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
	struct svc_rqst *rqstp = info->ri_rqst;
	unsigned int sge_no, seg_len, len;
	struct svc_rdma_rw_ctxt *ctxt;
	struct scatterlist *sg;
	int ret;

	len = segment->rs_length;
	sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
	ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
	if (!ctxt)
		return -ENOMEM;
	ctxt->rw_nents = sge_no;

	sg = ctxt->rw_sg_table.sgl;
	for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
		seg_len = min_t(unsigned int, len,
				PAGE_SIZE - info->ri_pageoff);

		if (!info->ri_pageoff)
			head->rc_page_count++;

		sg_set_page(sg, rqstp->rq_pages[info->ri_pageno],
			    seg_len, info->ri_pageoff);
		sg = sg_next(sg);

		info->ri_pageoff += seg_len;
		if (info->ri_pageoff == PAGE_SIZE) {
			info->ri_pageno++;
			info->ri_pageoff = 0;
		}
		len -= seg_len;

		/* Safety check */
		if (len &&
		    &rqstp->rq_pages[info->ri_pageno + 1] > rqstp->rq_page_end)
			goto out_overrun;
	}

	ret = svc_rdma_rw_ctx_init(cc->cc_rdma, ctxt, segment->rs_offset,
				   segment->rs_handle, DMA_FROM_DEVICE);
	if (ret < 0)
		return -EIO;
	percpu_counter_inc(&svcrdma_stat_read);

	list_add(&ctxt->rw_list, &cc->cc_rwctxts);
	cc->cc_sqecount += ret;
	return 0;

out_overrun:
	trace_svcrdma_page_overrun_err(cc->cc_rdma, rqstp, info->ri_pageno);
	return -EINVAL;
}

/**
 * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
 * @info: context for ongoing I/O
 * @chunk: Read chunk to pull
 *
 * Return values:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: allocating local resources failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info,
				     const struct svc_rdma_chunk *chunk)
{
	const struct svc_rdma_segment *segment;
	int ret;

	ret = -EINVAL;
	pcl_for_each_segment(segment, chunk) {
		ret = svc_rdma_build_read_segment(info, segment);
		if (ret < 0)
			break;
		info->ri_totalbytes += segment->rs_length;
	}
	return ret;
}

/**
 * svc_rdma_copy_inline_range - Copy part of the inline content into pages
 * @info: context for RDMA Reads
 * @offset: offset into the Receive buffer of region to copy
 * @remaining: length of region to copy
 *
 * Take a page at a time from rqstp->rq_pages and copy the inline
 * content from the Receive buffer into that page. Update
 * info->ri_pageno and info->ri_pageoff so that the next RDMA Read
 * result will land contiguously with the copied content.
 *
 * Return values:
 *   %0: Inline content was successfully copied
 *   %-EINVAL: offset or length was incorrect
 */
static int svc_rdma_copy_inline_range(struct svc_rdma_read_info *info,
				      unsigned int offset,
				      unsigned int remaining)
{
	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
	unsigned char *dst, *src = head->rc_recv_buf;
	struct svc_rqst *rqstp = info->ri_rqst;
	unsigned int page_no, numpages;

	numpages = PAGE_ALIGN(info->ri_pageoff + remaining) >> PAGE_SHIFT;
	for (page_no = 0; page_no < numpages; page_no++) {
		unsigned int page_len;

		page_len = min_t(unsigned int, remaining,
				 PAGE_SIZE - info->ri_pageoff);

		if (!info->ri_pageoff)
			head->rc_page_count++;

		dst = page_address(rqstp->rq_pages[info->ri_pageno]);
		memcpy(dst + info->ri_pageoff, src + offset, page_len);

		info->ri_totalbytes += page_len;
		info->ri_pageoff += page_len;
		if (info->ri_pageoff == PAGE_SIZE) {
			info->ri_pageno++;
			info->ri_pageoff = 0;
		}
		remaining -= page_len;
		offset += page_len;
	}

	return 0;
}

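/* Page accounting: both svc_rdma_build_read_segment() and
 * svc_rdma_copy_inline_range() bump head->rc_page_count the first
 * time they touch a page (ri_pageoff == 0), so rc_page_count ends up
 * as the number of rq_pages consumed by the reconstructed message.
 * svc_rdma_process_read_list() uses that count to place rq_respages
 * just past the last argument page, then zeroes it so that
 * svc_rdma_recv_ctxt_put() does not release those pages.
 */
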
/**
 * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
 * @info: context for RDMA Reads
 *
 * The chunk data lands in rqstp->rq_arg as a series of contiguous pages,
 * like an incoming TCP call.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static noinline int svc_rdma_read_multiple_chunks(struct svc_rdma_read_info *info)
{
	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
	struct xdr_buf *buf = &info->ri_rqst->rq_arg;
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start, length;
	int ret;

	start = 0;
	chunk = pcl_first_chunk(pcl);
	length = chunk->ch_position;
	ret = svc_rdma_copy_inline_range(info, start, length);
	if (ret < 0)
		return ret;

	pcl_for_each_chunk(chunk, pcl) {
		ret = svc_rdma_build_read_chunk(info, chunk);
		if (ret < 0)
			return ret;

		next = pcl_next_chunk(pcl, chunk);
		if (!next)
			break;

		start += length;
		length = next->ch_position - info->ri_totalbytes;
		ret = svc_rdma_copy_inline_range(info, start, length);
		if (ret < 0)
			return ret;
	}

	start += length;
	length = head->rc_byte_len - start;
	ret = svc_rdma_copy_inline_range(info, start, length);
	if (ret < 0)
		return ret;

	buf->len += info->ri_totalbytes;
	buf->buflen += info->ri_totalbytes;

	buf->head[0].iov_base = page_address(info->ri_rqst->rq_pages[0]);
	buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes);
	buf->pages = &info->ri_rqst->rq_pages[1];
	buf->page_len = info->ri_totalbytes - buf->head[0].iov_len;
	return 0;
}

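/* A hypothetical example of the interleaving performed by
 * svc_rdma_read_multiple_chunks() above: suppose the Receive buffer
 * carries rc_byte_len = 160 bytes of inline content, and the client
 * sent two 1024-byte Read chunks at XDR positions 128 and 1160. The
 * reconstruction is:
 *
 *	copy inline bytes 0..127     (length = first ch_position)
 *	RDMA Read chunk 1, ri_totalbytes = 1152
 *	copy inline bytes 128..135   (length = 1160 - 1152)
 *	RDMA Read chunk 2, ri_totalbytes = 2184
 *	copy inline bytes 136..159   (length = rc_byte_len - 136)
 *
 * so the chunk data and the remaining inline content land
 * contiguously in rq_pages.
 */
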
/**
 * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
 * @info: context for RDMA Reads
 *
 * The chunk data lands in the page list of rqstp->rq_arg.pages.
 *
 * Currently NFSD does not look at the rqstp->rq_arg.tail[0] kvec.
 * Therefore, XDR round-up of the Read chunk and trailing
 * inline content must both be added at the end of the pagelist.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_data_item(struct svc_rdma_read_info *info)
{
	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
	struct xdr_buf *buf = &info->ri_rqst->rq_arg;
	struct svc_rdma_chunk *chunk;
	unsigned int length;
	int ret;

	chunk = pcl_first_chunk(&head->rc_read_pcl);
	ret = svc_rdma_build_read_chunk(info, chunk);
	if (ret < 0)
		goto out;

	/* Split the Receive buffer between the head and tail
	 * buffers at Read chunk's position. XDR roundup of the
	 * chunk is not included in either the pagelist or in
	 * the tail.
	 */
	buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position;
	buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position;
	buf->head[0].iov_len = chunk->ch_position;

	/* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
	 *
	 * If the client already rounded up the chunk length, the
	 * length does not change. Otherwise, the length of the page
	 * list is increased to include XDR round-up.
	 *
	 * Currently these chunks always start at page offset 0,
	 * thus the rounded-up length never crosses a page boundary.
	 */
	buf->pages = &info->ri_rqst->rq_pages[0];
	length = xdr_align_size(chunk->ch_length);
	buf->page_len = length;
	buf->len += length;
	buf->buflen += length;

out:
	return ret;
}

/**
 * svc_rdma_read_chunk_range - Build RDMA Read WQEs for portion of a chunk
 * @info: context for RDMA Reads
 * @chunk: parsed Call chunk to pull
 * @offset: offset of region to pull
 * @length: length of region to pull
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_chunk_range(struct svc_rdma_read_info *info,
				     const struct svc_rdma_chunk *chunk,
				     unsigned int offset, unsigned int length)
{
	const struct svc_rdma_segment *segment;
	int ret;

	ret = -EINVAL;
	pcl_for_each_segment(segment, chunk) {
		struct svc_rdma_segment dummy;

		if (offset > segment->rs_length) {
			offset -= segment->rs_length;
			continue;
		}

		dummy.rs_handle = segment->rs_handle;
		dummy.rs_length = min_t(u32, length, segment->rs_length - offset);
		dummy.rs_offset = segment->rs_offset + offset;

		ret = svc_rdma_build_read_segment(info, &dummy);
		if (ret < 0)
			break;

		info->ri_totalbytes += dummy.rs_length;
		length -= dummy.rs_length;
		offset = 0;
	}
	return ret;
}

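/* A windowing example for svc_rdma_read_chunk_range() above, assuming
 * a hypothetical call chunk built from two 4096-byte segments: pulling
 * offset 4100, length 1000 skips the first segment (leaving a residual
 * offset of 4), then builds a single 1000-byte Read of the second
 * segment starting at rs_offset + 4.
 */
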
/**
 * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
 * @info: context for RDMA Reads
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info)
{
	struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
	const struct svc_rdma_chunk *call_chunk =
			pcl_first_chunk(&head->rc_call_pcl);
	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start, length;
	int ret;

	if (pcl_is_empty(pcl))
		return svc_rdma_build_read_chunk(info, call_chunk);

	start = 0;
	chunk = pcl_first_chunk(pcl);
	length = chunk->ch_position;
	ret = svc_rdma_read_chunk_range(info, call_chunk, start, length);
	if (ret < 0)
		return ret;

	pcl_for_each_chunk(chunk, pcl) {
		ret = svc_rdma_build_read_chunk(info, chunk);
		if (ret < 0)
			return ret;

		next = pcl_next_chunk(pcl, chunk);
		if (!next)
			break;

		start += length;
		length = next->ch_position - info->ri_totalbytes;
		ret = svc_rdma_read_chunk_range(info, call_chunk,
						start, length);
		if (ret < 0)
			return ret;
	}

	start += length;
	length = call_chunk->ch_length - start;
	return svc_rdma_read_chunk_range(info, call_chunk, start, length);
}

/**
 * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
 * @info: context for RDMA Reads
 *
 * The start of the data lands in the first page just after the
 * Transport header, and the rest lands in rqstp->rq_arg.pages.
 *
 * Assumptions:
 *	- A PZRC is never sent in an RDMA_MSG message, though it's
 *	  allowed by spec.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static noinline int svc_rdma_read_special(struct svc_rdma_read_info *info)
{
	struct xdr_buf *buf = &info->ri_rqst->rq_arg;
	int ret;

	ret = svc_rdma_read_call_chunk(info);
	if (ret < 0)
		goto out;

	buf->len += info->ri_totalbytes;
	buf->buflen += info->ri_totalbytes;

	buf->head[0].iov_base = page_address(info->ri_rqst->rq_pages[0]);
	buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes);
	buf->pages = &info->ri_rqst->rq_pages[1];
	buf->page_len = info->ri_totalbytes - buf->head[0].iov_len;

out:
	return ret;
}

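/* The dispatch in svc_rdma_process_read_list() below, in brief:
 *
 *	rc_call_pcl empty, a single Read chunk      -> svc_rdma_read_data_item()
 *	rc_call_pcl empty, multiple Read chunks     -> svc_rdma_read_multiple_chunks()
 *	rc_call_pcl populated (a PZRC/Long Message) -> svc_rdma_read_special()
 */
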
/**
 * svc_rdma_process_read_list - Pull list of Read chunks from the client
 * @rdma: controlling RDMA transport
 * @rqstp: set of pages to use as Read sink buffers
 * @head: pages under I/O collect here
 *
 * The RPC/RDMA protocol assumes that the upper layer's XDR decoders
 * pull each Read chunk as they decode an incoming RPC message.
 *
 * On Linux, however, the server needs to have a fully-constructed RPC
 * message in rqstp->rq_arg when there is a positive return code from
 * ->xpo_recvfrom. So the Read list is safety-checked immediately when
 * it is received, then here the whole Read list is pulled all at once.
 * The ingress RPC message is fully reconstructed once all associated
 * RDMA Reads have completed.
 *
 * Return values:
 *   %1: all needed RDMA Reads were posted successfully,
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
			       struct svc_rqst *rqstp,
			       struct svc_rdma_recv_ctxt *head)
{
	struct svc_rdma_read_info *info;
	struct svc_rdma_chunk_ctxt *cc;
	int ret;

	info = svc_rdma_read_info_alloc(rdma);
	if (!info)
		return -ENOMEM;
	cc = &info->ri_cc;
	info->ri_rqst = rqstp;
	info->ri_readctxt = head;
	info->ri_pageno = 0;
	info->ri_pageoff = 0;
	info->ri_totalbytes = 0;

	if (pcl_is_empty(&head->rc_call_pcl)) {
		if (head->rc_read_pcl.cl_count == 1)
			ret = svc_rdma_read_data_item(info);
		else
			ret = svc_rdma_read_multiple_chunks(info);
	} else
		ret = svc_rdma_read_special(info);
	if (ret < 0)
		goto out_err;

	trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
	init_completion(&cc->cc_done);
	ret = svc_rdma_post_chunk_ctxt(cc);
	if (ret < 0)
		goto out_err;

	ret = 1;
	wait_for_completion(&cc->cc_done);
	if (cc->cc_status != IB_WC_SUCCESS)
		ret = -EIO;

	/* rq_respages starts after the last arg page */
	rqstp->rq_respages = &rqstp->rq_pages[head->rc_page_count];
	rqstp->rq_next_page = rqstp->rq_respages + 1;

	/* Ensure svc_rdma_recv_ctxt_put() does not try to release pages */
	head->rc_page_count = 0;

out_err:
	svc_rdma_read_info_free(info);
	return ret;
}