/*
 * Copyright (c) 2016 Oracle. All rights reserved.
 *
 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
 */

#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>
#include <linux/sunrpc/debug.h>

#include <rdma/rw.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

/* Each R/W context contains state for one chain of RDMA Read or
 * Write Work Requests.
 *
 * Each WR chain handles a single contiguous server-side buffer,
 * because scatterlist entries after the first have to start on
 * page alignment. xdr_buf iovecs cannot guarantee alignment.
 *
 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
 * from a client may contain a unique R_key, so each WR chain moves
 * up to one segment at a time.
 *
 * The scatterlist makes this data structure over 4KB in size. To
 * make it less likely to fail, and to handle the allocation for
 * smaller I/O requests without disabling bottom-halves, these
 * contexts are created on demand, but cached and reused until the
 * controlling svcxprt_rdma is destroyed.
 */
struct svc_rdma_rw_ctxt {
	struct list_head	rw_list;
	struct rdma_rw_ctx	rw_ctx;
	int			rw_nents;
	struct sg_table		rw_sg_table;
	struct scatterlist	rw_first_sgl[0];
};

static inline struct svc_rdma_rw_ctxt *
svc_rdma_next_ctxt(struct list_head *list)
{
	return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
					rw_list);
}

static struct svc_rdma_rw_ctxt *
svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
{
	struct svc_rdma_rw_ctxt *ctxt;

	spin_lock(&rdma->sc_rw_ctxt_lock);

	ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts);
	if (ctxt) {
		list_del(&ctxt->rw_list);
		spin_unlock(&rdma->sc_rw_ctxt_lock);
	} else {
		spin_unlock(&rdma->sc_rw_ctxt_lock);
		ctxt = kmalloc(sizeof(*ctxt) +
			       SG_CHUNK_SIZE * sizeof(struct scatterlist),
			       GFP_KERNEL);
		if (!ctxt)
			goto out;
		INIT_LIST_HEAD(&ctxt->rw_list);
	}

	ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
	if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
				   ctxt->rw_sg_table.sgl)) {
		kfree(ctxt);
		ctxt = NULL;
	}
out:
	return ctxt;
}

static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
				 struct svc_rdma_rw_ctxt *ctxt)
{
	sg_free_table_chained(&ctxt->rw_sg_table, true);

	spin_lock(&rdma->sc_rw_ctxt_lock);
	list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts);
	spin_unlock(&rdma->sc_rw_ctxt_lock);
}

/**
 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
 * @rdma: transport about to be destroyed
 *
 */
void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_rw_ctxt *ctxt;

	while ((ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts)) != NULL) {
		list_del(&ctxt->rw_list);
		kfree(ctxt);
	}
}
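/* Example (a sketch, not taken from a caller in this file): a
 * consumer that needs to move "len" bytes of page-list data would
 * size the context by page count and return it if mapping fails,
 * roughly:
 *
 *	ctxt = svc_rdma_get_rw_ctxt(rdma, (len >> PAGE_SHIFT) + 2);
 *	if (!ctxt)
 *		return -ENOMEM;
 *	(fill ctxt->rw_sg_table.sgl and set ctxt->rw_nents)
 *	if (rdma_rw_ctx_init(...) < 0)
 *		svc_rdma_put_rw_ctxt(rdma, ctxt);
 *
 * The "+ 2" leaves room for partial pages at either end of the
 * buffer; svc_rdma_build_writes() below sizes its contexts the
 * same way.
 */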
/* A chunk context tracks all I/O for moving one Read or Write
 * chunk. This is a set of rdma_rw's that handle data movement
 * for all segments of one chunk.
 *
 * These are small, acquired with a single allocator call, and
 * no more than one is needed per chunk. They are allocated on
 * demand, and not cached.
 */
struct svc_rdma_chunk_ctxt {
	struct ib_cqe		cc_cqe;
	struct svcxprt_rdma	*cc_rdma;
	struct list_head	cc_rwctxts;
	int			cc_sqecount;
	enum dma_data_direction cc_dir;
};

static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
			     struct svc_rdma_chunk_ctxt *cc,
			     enum dma_data_direction dir)
{
	cc->cc_rdma = rdma;
	svc_xprt_get(&rdma->sc_xprt);

	INIT_LIST_HEAD(&cc->cc_rwctxts);
	cc->cc_sqecount = 0;
	cc->cc_dir = dir;
}

static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc)
{
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct svc_rdma_rw_ctxt *ctxt;

	while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
		list_del(&ctxt->rw_list);

		rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
				    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
				    ctxt->rw_nents, cc->cc_dir);
		svc_rdma_put_rw_ctxt(rdma, ctxt);
	}
	svc_xprt_put(&rdma->sc_xprt);
}

/* State for sending a Write or Reply chunk.
 * - Tracks progress of writing one chunk over all its segments
 * - Stores arguments for the SGL constructor functions
 */
struct svc_rdma_write_info {
	/* write state of this chunk */
	unsigned int		wi_seg_off;
	unsigned int		wi_seg_no;
	unsigned int		wi_nsegs;
	__be32			*wi_segs;

	/* SGL constructor arguments */
	struct xdr_buf		*wi_xdr;
	unsigned char		*wi_base;
	unsigned int		wi_next_off;

	struct svc_rdma_chunk_ctxt	wi_cc;
};

static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk)
{
	struct svc_rdma_write_info *info;

	info = kmalloc(sizeof(*info), GFP_KERNEL);
	if (!info)
		return info;

	info->wi_seg_off = 0;
	info->wi_seg_no = 0;
	info->wi_nsegs = be32_to_cpup(++chunk);
	info->wi_segs = ++chunk;
	svc_rdma_cc_init(rdma, &info->wi_cc, DMA_TO_DEVICE);
	return info;
}

static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
{
	svc_rdma_cc_release(&info->wi_cc);
	kfree(info);
}

/**
 * svc_rdma_write_done - Write chunk completion
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 * Pages under I/O are freed by a subsequent Send completion.
 */
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
			container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct svc_rdma_write_info *info =
			container_of(cc, struct svc_rdma_write_info, wi_cc);

	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
	wake_up(&rdma->sc_send_wait);

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
		if (wc->status != IB_WC_WR_FLUSH_ERR)
			pr_err("svcrdma: write ctx: %s (%u/0x%x)\n",
			       ib_wc_status_msg(wc->status),
			       wc->status, wc->vendor_err);
	}

	svc_rdma_write_info_free(info);
}
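/* Send Queue accounting, in brief: svc_rdma_post_chunk_ctxt() below
 * reserves cc_sqecount SQEs before posting, and the completion
 * handler above gives them back:
 *
 *	post:		atomic_sub_return(cc->cc_sqecount, &rdma->sc_sq_avail)
 *	completion:	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
 *			wake_up(&rdma->sc_send_wait);
 *
 * One completion covers the whole WR chain: cc_cqe is handed to
 * rdma_rw_ctx_wrs() exactly once, so at most one WR in the chain
 * is signaled.
 */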
/* This function sleeps when the transport's Send Queue is congested.
 *
 * Assumptions:
 * - If ib_post_send() succeeds, only one completion is expected,
 *   even if one or more WRs are flushed. This is true when posting
 *   an rdma_rw_ctx or when posting a single signaled WR.
 */
static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
{
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct svc_xprt *xprt = &rdma->sc_xprt;
	struct ib_send_wr *first_wr, *bad_wr;
	struct list_head *tmp;
	struct ib_cqe *cqe;
	int ret;

	first_wr = NULL;
	cqe = &cc->cc_cqe;
	list_for_each(tmp, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *ctxt;

		ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;
	}

	do {
		if (atomic_sub_return(cc->cc_sqecount,
				      &rdma->sc_sq_avail) > 0) {
			ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
			if (ret)
				break;
			return 0;
		}

		atomic_inc(&rdma_stat_sq_starve);
		atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
		wait_event(rdma->sc_send_wait,
			   atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
	} while (1);

	pr_err("svcrdma: ib_post_send failed (%d)\n", ret);
	set_bit(XPT_CLOSE, &xprt->xpt_flags);

	/* If even one was posted, there will be a completion. */
	if (bad_wr != first_wr)
		return 0;

	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
	wake_up(&rdma->sc_send_wait);
	return -ENOTCONN;
}

/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
 */
static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
			       unsigned int len,
			       struct svc_rdma_rw_ctxt *ctxt)
{
	struct scatterlist *sg = ctxt->rw_sg_table.sgl;

	sg_set_buf(&sg[0], info->wi_base, len);
	info->wi_base += len;

	ctxt->rw_nents = 1;
}

/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
 */
static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
				    unsigned int remaining,
				    struct svc_rdma_rw_ctxt *ctxt)
{
	unsigned int sge_no, sge_bytes, page_off, page_no;
	struct xdr_buf *xdr = info->wi_xdr;
	struct scatterlist *sg;
	struct page **page;

	page_off = (info->wi_next_off + xdr->page_base) & ~PAGE_MASK;
	page_no = (info->wi_next_off + xdr->page_base) >> PAGE_SHIFT;
	page = xdr->pages + page_no;
	info->wi_next_off += remaining;
	sg = ctxt->rw_sg_table.sgl;
	sge_no = 0;
	do {
		sge_bytes = min_t(unsigned int, remaining,
				  PAGE_SIZE - page_off);
		sg_set_page(sg, *page, sge_bytes, page_off);

		remaining -= sge_bytes;
		sg = sg_next(sg);
		page_off = 0;
		sge_no++;
		page++;
	} while (remaining);

	ctxt->rw_nents = sge_no;
}
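/* Layout note: each segment decoded by svc_rdma_build_writes() below
 * is rpcrdma_segment_maxsz (4) XDR words on the wire:
 *
 *	seg[0]		R_key (handle) provided by the client
 *	seg[1]		segment length, in bytes
 *	seg[2..3]	segment offset, decoded with xdr_decode_hyper()
 *
 * For example (hypothetical sizes), a 100KB payload written into a
 * chunk whose first segment is 64KB long consumes all of segment 0,
 * then continues at offset 0 of segment 1; wi_seg_no and wi_seg_off
 * track that progress across calls.
 */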
/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 * an RPC Reply.
 */
static int
svc_rdma_build_writes(struct svc_rdma_write_info *info,
		      void (*constructor)(struct svc_rdma_write_info *info,
					  unsigned int len,
					  struct svc_rdma_rw_ctxt *ctxt),
		      unsigned int remaining)
{
	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
	struct svcxprt_rdma *rdma = cc->cc_rdma;
	struct svc_rdma_rw_ctxt *ctxt;
	__be32 *seg;
	int ret;

	cc->cc_cqe.done = svc_rdma_write_done;
	seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
	do {
		unsigned int write_len;
		u32 seg_length, seg_handle;
		u64 seg_offset;

		if (info->wi_seg_no >= info->wi_nsegs)
			goto out_overflow;

		seg_handle = be32_to_cpup(seg);
		seg_length = be32_to_cpup(seg + 1);
		xdr_decode_hyper(seg + 2, &seg_offset);
		seg_offset += info->wi_seg_off;

		write_len = min(remaining, seg_length - info->wi_seg_off);
		ctxt = svc_rdma_get_rw_ctxt(rdma,
					    (write_len >> PAGE_SHIFT) + 2);
		if (!ctxt)
			goto out_noctx;

		constructor(info, write_len, ctxt);
		ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp,
				       rdma->sc_port_num, ctxt->rw_sg_table.sgl,
				       ctxt->rw_nents, 0, seg_offset,
				       seg_handle, DMA_TO_DEVICE);
		if (ret < 0)
			goto out_initerr;

		list_add(&ctxt->rw_list, &cc->cc_rwctxts);
		cc->cc_sqecount += ret;
		if (write_len == seg_length - info->wi_seg_off) {
			seg += 4;
			info->wi_seg_no++;
			info->wi_seg_off = 0;
		} else {
			info->wi_seg_off += write_len;
		}
		remaining -= write_len;
	} while (remaining);

	return 0;

out_overflow:
	dprintk("svcrdma: inadequate space in Write chunk (%u)\n",
		info->wi_nsegs);
	return -E2BIG;

out_noctx:
	dprintk("svcrdma: no R/W ctxs available\n");
	return -ENOMEM;

out_initerr:
	svc_rdma_put_rw_ctxt(rdma, ctxt);
	pr_err("svcrdma: failed to map pagelist (%d)\n", ret);
	return -EIO;
}

/* Send one of an xdr_buf's kvecs by itself. To send a Reply
 * chunk, the whole RPC Reply is written back to the client.
 * This function writes either the head or tail of the xdr_buf
 * containing the Reply.
 */
static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info,
				  struct kvec *vec)
{
	info->wi_base = vec->iov_base;
	return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
				     vec->iov_len);
}

/* Send an xdr_buf's page list by itself. A Write chunk is
 * just the page list. A Reply chunk is the head, page list,
 * and tail. This function is shared between the two types
 * of chunk.
 */
static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
				      struct xdr_buf *xdr)
{
	info->wi_xdr = xdr;
	info->wi_next_off = 0;
	return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
				     xdr->page_len);
}
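/* Usage sketch: the call sites live in the svcrdma send path, not in
 * this file, but the entry points below are expected to be driven
 * roughly like this ("wr_lst" and "rp_ch" are illustrative names for
 * the decoded Write list and Reply chunk):
 *
 *	if (wr_lst) {
 *		ret = svc_rdma_send_write_chunk(rdma, wr_lst, xdr);
 *		if (ret < 0)
 *			goto err;
 *	}
 *	if (rp_ch) {
 *		ret = svc_rdma_send_reply_chunk(rdma, rp_ch,
 *						wr_lst != NULL, xdr);
 *		if (ret < 0)
 *			goto err;
 *	}
 *
 * On success each call returns the number of payload bytes it
 * consumed, as documented below.
 */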
/**
 * svc_rdma_send_write_chunk - Write all segments in a Write chunk
 * @rdma: controlling RDMA transport
 * @wr_ch: Write chunk provided by client
 * @xdr: xdr_buf containing the data payload
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Write chunk,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
			      struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info;
	int ret;

	if (!xdr->page_len)
		return 0;

	info = svc_rdma_write_info_alloc(rdma, wr_ch);
	if (!info)
		return -ENOMEM;

	ret = svc_rdma_send_xdr_pagelist(info, xdr);
	if (ret < 0)
		goto out_err;

	ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
	if (ret < 0)
		goto out_err;
	return xdr->page_len;

out_err:
	svc_rdma_write_info_free(info);
	return ret;
}

/**
 * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
 * @rdma: controlling RDMA transport
 * @rp_ch: Reply chunk provided by client
 * @writelist: true if client provided a Write list
 * @xdr: xdr_buf containing an RPC Reply
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *	%-E2BIG if the payload was larger than the Reply chunk,
 *	%-ENOMEM if rdma_rw context pool was exhausted,
 *	%-ENOTCONN if posting failed (connection is lost),
 *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
			      bool writelist, struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info;
	int consumed, ret;

	info = svc_rdma_write_info_alloc(rdma, rp_ch);
	if (!info)
		return -ENOMEM;

	ret = svc_rdma_send_xdr_kvec(info, &xdr->head[0]);
	if (ret < 0)
		goto out_err;
	consumed = xdr->head[0].iov_len;

	/* Send the page list in the Reply chunk only if the
	 * client did not provide Write chunks.
	 */
	if (!writelist && xdr->page_len) {
		ret = svc_rdma_send_xdr_pagelist(info, xdr);
		if (ret < 0)
			goto out_err;
		consumed += xdr->page_len;
	}

	if (xdr->tail[0].iov_len) {
		ret = svc_rdma_send_xdr_kvec(info, &xdr->tail[0]);
		if (ret < 0)
			goto out_err;
		consumed += xdr->tail[0].iov_len;
	}

	ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
	if (ret < 0)
		goto out_err;
	return consumed;

out_err:
	svc_rdma_write_info_free(info);
	return ret;
}
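/* Lifetime summary for the two entry points above: each
 * svc_rdma_write_info is freed in exactly one place.
 *
 *	- svc_rdma_write_info_alloc() fails:	nothing to free
 *	- any later step returns a negative
 *	  errno:				freed at out_err
 *	- svc_rdma_post_chunk_ctxt() returns 0:	freed by svc_rdma_write_done()
 *						when the Write completion fires
 *
 * Pages under I/O are not released on this path; as noted above
 * svc_rdma_write_done(), they are freed by a subsequent Send
 * completion.
 */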