/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * rpc_rdma.c
 *
 * This file contains the guts of the RPC RDMA protocol, and
 * does marshaling/unmarshaling, etc. It is also where interfacing
 * to the Linux RPC framework lives.
 */

#include "xprt_rdma.h"

#include <linux/highmem.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
static const char transfertypes[][12] = {
	"pure inline",	/* no chunks */
	" read chunk",	/* some argument via rdma read */
	"*read chunk",	/* entire request via rdma read */
	"write chunk",	/* some result via rdma write */
	"reply chunk"	/* entire reply via rdma write */
};
#endif

/*
 * Chunk assembly from upper layer xdr_buf.
 *
 * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
 * elements. Segments are then coalesced when registered, if possible
 * within the selected memreg mode.
 *
 * Returns positive number of segments converted, or a negative errno.
 */
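
/*
 * Illustrative example (assuming 4KB pages): an xdr_buf converted at
 * position 0 with a 120-byte head, 6000 bytes of page data starting at
 * byte 100 of the first page, and a 3-byte XDR pad in the tail yields a
 * head segment, a 3996-byte and a 2004-byte page segment (split at the
 * page boundary), and a fourth segment for the tail only when
 * xprt_rdma_pad_optimize is disabled.
 */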

static int
rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
	enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs)
{
	int len, n = 0, p;
	int page_base;
	struct page **ppages;

	if (pos == 0 && xdrbuf->head[0].iov_len) {
		seg[n].mr_page = NULL;
		seg[n].mr_offset = xdrbuf->head[0].iov_base;
		seg[n].mr_len = xdrbuf->head[0].iov_len;
		++n;
	}

	len = xdrbuf->page_len;
	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
	page_base = xdrbuf->page_base & ~PAGE_MASK;
	p = 0;
	while (len && n < nsegs) {
		if (!ppages[p]) {
			/* alloc the pagelist for receiving buffer */
			ppages[p] = alloc_page(GFP_ATOMIC);
			if (!ppages[p])
				return -ENOMEM;
		}
		seg[n].mr_page = ppages[p];
		seg[n].mr_offset = (void *)(unsigned long) page_base;
		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
		if (seg[n].mr_len > PAGE_SIZE)
			return -EIO;
		len -= seg[n].mr_len;
		++n;
		++p;
		page_base = 0;	/* page offset only applies to first page */
	}

	/* Message overflows the seg array */
	if (len && n == nsegs)
		return -EIO;

	if (xdrbuf->tail[0].iov_len) {
		/* the rpcrdma protocol allows us to omit any trailing
		 * xdr pad bytes, saving the server an RDMA operation. */
		if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
			return n;
		if (n == nsegs)
			/* Tail remains, but we're out of segments */
			return -EIO;
		seg[n].mr_page = NULL;
		seg[n].mr_offset = xdrbuf->tail[0].iov_base;
		seg[n].mr_len = xdrbuf->tail[0].iov_len;
		++n;
	}

	return n;
}

/*
 * Create read/write chunk lists, and reply chunks, for RDMA
 *
 * Assume check against THRESHOLD has been done, and chunks are required.
 * Assume only encoding one list entry for read|write chunks. The NFSv3
 * protocol is simple enough to allow this as it only has a single "bulk
 * result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The
 * RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.)
 *
 * When used for a single reply chunk (which is a special write
 * chunk used for the entire reply, rather than just the data), it
 * is used primarily for READDIR and READLINK which would otherwise
 * be severely size-limited by a small rdma inline read max. The server
 * response will come back as an RDMA Write, followed by a message
 * of type RDMA_NOMSG carrying the xid and length. As a result, reply
 * chunks do not provide data alignment, however they do not require
 * "fixup" (moving the response to the upper layer buffer) either.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Read chunklist (a linked list):
 *   N elements, position P (same P for all chunks of same arg!):
 *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 *
 *  Write chunklist (a list of (one) counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO - 0
 *
 *  Reply chunk (a counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO
 *
 * Returns positive RPC/RDMA header size, or negative errno.
 */
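
/*
 * Worked example: a request whose argument pages register as two read
 * chunk segments at position P is encoded, one XDR word per item
 * (offsets as two words), as
 *
 *    1 P H1 L1 O1 O1  1 P H2 L2 O2 O2  0 0 0
 *
 * where the trailing zero words terminate the read list and stand in
 * for the NULL write chunk list and NULL reply chunk.
 */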

static ssize_t
rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
		struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)
{
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	int n, nsegs, nchunks = 0;
	unsigned int pos;
	struct rpcrdma_mr_seg *seg = req->rl_segments;
	struct rpcrdma_read_chunk *cur_rchunk = NULL;
	struct rpcrdma_write_array *warray = NULL;
	struct rpcrdma_write_chunk *cur_wchunk = NULL;
	__be32 *iptr = headerp->rm_body.rm_chunks;

	if (type == rpcrdma_readch || type == rpcrdma_areadch) {
		/* a read chunk - server will RDMA Read our memory */
		cur_rchunk = (struct rpcrdma_read_chunk *) iptr;
	} else {
		/* a write or reply chunk - server will RDMA Write our memory */
		*iptr++ = xdr_zero;	/* encode a NULL read chunk list */
		if (type == rpcrdma_replych)
			*iptr++ = xdr_zero;	/* a NULL write chunk list */
		warray = (struct rpcrdma_write_array *) iptr;
		cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1);
	}

	if (type == rpcrdma_replych || type == rpcrdma_areadch)
		pos = 0;
	else
		pos = target->head[0].iov_len;

	nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS);
	if (nsegs < 0)
		return nsegs;

	do {
		n = rpcrdma_register_external(seg, nsegs,
					      cur_wchunk != NULL, r_xprt);
		if (n <= 0)
			goto out;
		if (cur_rchunk) {	/* read */
			cur_rchunk->rc_discrim = xdr_one;
			/* all read chunks have the same "position" */
			cur_rchunk->rc_position = cpu_to_be32(pos);
			cur_rchunk->rc_target.rs_handle =
				cpu_to_be32(seg->mr_rkey);
			cur_rchunk->rc_target.rs_length =
				cpu_to_be32(seg->mr_len);
			xdr_encode_hyper(
				(__be32 *)&cur_rchunk->rc_target.rs_offset,
				seg->mr_base);
			dprintk("RPC: %s: read chunk "
				"elem %d@0x%llx:0x%x pos %u (%s)\n", __func__,
				seg->mr_len, (unsigned long long)seg->mr_base,
				seg->mr_rkey, pos, n < nsegs ? "more" : "last");
			cur_rchunk++;
			r_xprt->rx_stats.read_chunk_count++;
		} else {		/* write/reply */
			cur_wchunk->wc_target.rs_handle =
				cpu_to_be32(seg->mr_rkey);
			cur_wchunk->wc_target.rs_length =
				cpu_to_be32(seg->mr_len);
			xdr_encode_hyper(
				(__be32 *)&cur_wchunk->wc_target.rs_offset,
				seg->mr_base);
			dprintk("RPC: %s: %s chunk "
				"elem %d@0x%llx:0x%x (%s)\n", __func__,
				(type == rpcrdma_replych) ? "reply" : "write",
				seg->mr_len, (unsigned long long)seg->mr_base,
				seg->mr_rkey, n < nsegs ? "more" : "last");
			cur_wchunk++;
			if (type == rpcrdma_replych)
				r_xprt->rx_stats.reply_chunk_count++;
			else
				r_xprt->rx_stats.write_chunk_count++;
			r_xprt->rx_stats.total_rdma_request += seg->mr_len;
		}
		nchunks++;
		seg += n;
		nsegs -= n;
	} while (nsegs);

	/* success. all failures return above */
	req->rl_nchunks = nchunks;

	/*
	 * finish off header. If write, marshal discrim and nchunks.
	 */
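	/* A read chunk list is followed by three zero words: the list
	 * terminator, a NULL write chunk list and a NULL reply chunk.
	 * For write and reply chunks the discriminator and element
	 * count land in warray, ahead of the entries; only a write
	 * chunk list also gets a terminator and a NULL reply chunk.
	 */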
	if (cur_rchunk) {
		iptr = (__be32 *) cur_rchunk;
		*iptr++ = xdr_zero;	/* finish the read chunk list */
		*iptr++ = xdr_zero;	/* encode a NULL write chunk list */
		*iptr++ = xdr_zero;	/* encode a NULL reply chunk */
	} else {
		warray->wc_discrim = xdr_one;
		warray->wc_nchunks = cpu_to_be32(nchunks);
		iptr = (__be32 *) cur_wchunk;
		if (type == rpcrdma_writech) {
			*iptr++ = xdr_zero;	/* finish the write chunk list */
			*iptr++ = xdr_zero;	/* encode a NULL reply chunk */
		}
	}

	/*
	 * Return header size.
	 */
	return (unsigned char *)iptr - (unsigned char *)headerp;

out:
	if (r_xprt->rx_ia.ri_memreg_strategy != RPCRDMA_FRMR) {
		for (pos = 0; nchunks--;)
			pos += rpcrdma_deregister_external(
					&req->rl_segments[pos], r_xprt);
	}
	return n;
}

/*
 * Marshal chunks. This routine returns the header length
 * consumed by marshaling.
 *
 * Returns positive RPC/RDMA header size, or negative errno.
 */

ssize_t
rpcrdma_marshal_chunks(struct rpc_rqst *rqst, ssize_t result)
{
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct rpcrdma_msg *headerp = rdmab_to_msg(req->rl_rdmabuf);

	if (req->rl_rtype != rpcrdma_noch)
		result = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
					       headerp, req->rl_rtype);
	else if (req->rl_wtype != rpcrdma_noch)
		result = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf,
					       headerp, req->rl_wtype);
	return result;
}

/*
 * Copy write data inline.
 * This function is used for "small" requests. Data which is passed
 * to RPC via iovecs (or page list) is copied directly into the
 * pre-registered memory buffer for this request. For small amounts
 * of data, this is efficient. The cutoff value is tunable.
 */
static int
rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
{
	int i, npages, curlen;
	int copy_len;
	unsigned char *srcp, *destp;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	int page_base;
	struct page **ppages;

	destp = rqst->rq_svec[0].iov_base;
	curlen = rqst->rq_svec[0].iov_len;
	destp += curlen;
	/*
	 * Do optional padding where it makes sense. Alignment of write
	 * payload can help the server, if our setting is accurate.
	 */
	pad -= (curlen + 36/*sizeof(struct rpcrdma_msg_padded)*/);
	if (pad < 0 || rqst->rq_slen - curlen < RPCRDMA_INLINE_PAD_THRESH)
		pad = 0;	/* don't pad this request */

	dprintk("RPC: %s: pad %d destp 0x%p len %d hdrlen %d\n",
		__func__, pad, destp, rqst->rq_slen, curlen);

	copy_len = rqst->rq_snd_buf.page_len;

	if (rqst->rq_snd_buf.tail[0].iov_len) {
		curlen = rqst->rq_snd_buf.tail[0].iov_len;
		if (destp + copy_len != rqst->rq_snd_buf.tail[0].iov_base) {
			memmove(destp + copy_len,
				rqst->rq_snd_buf.tail[0].iov_base, curlen);
			r_xprt->rx_stats.pullup_copy_count += curlen;
		}
		dprintk("RPC: %s: tail destp 0x%p len %d\n",
			__func__, destp + copy_len, curlen);
		rqst->rq_svec[0].iov_len += curlen;
	}
	r_xprt->rx_stats.pullup_copy_count += copy_len;

	page_base = rqst->rq_snd_buf.page_base;
	ppages = rqst->rq_snd_buf.pages + (page_base >> PAGE_SHIFT);
	page_base &= ~PAGE_MASK;
	npages = PAGE_ALIGN(page_base+copy_len) >> PAGE_SHIFT;
	for (i = 0; copy_len && i < npages; i++) {
		curlen = PAGE_SIZE - page_base;
		if (curlen > copy_len)
			curlen = copy_len;
		dprintk("RPC: %s: page %d destp 0x%p len %d curlen %d\n",
			__func__, i, destp, copy_len, curlen);
		srcp = kmap_atomic(ppages[i]);
		memcpy(destp, srcp+page_base, curlen);
		kunmap_atomic(srcp);
		rqst->rq_svec[0].iov_len += curlen;
		destp += curlen;
		copy_len -= curlen;
		page_base = 0;
	}
	/* header now contains entire send message */
	return pad;
}

/*
 * Marshal a request: the primary job of this routine is to choose
 * the transfer modes. See comments below.
 *
 * Uses multiple RDMA IOVs for a request:
 *  [0] -- RPC RDMA header, which uses memory from the *start* of the
 *         preregistered buffer that already holds the RPC data in
 *         its middle.
 *  [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
 *  [2] -- optional padding.
 *  [3] -- if padded, header only in [1] and data here.
 *
 * Returns zero on success, otherwise a negative errno.
 */

int
rpcrdma_marshal_req(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	char *base;
	size_t rpclen, padlen;
	ssize_t hdrlen;
	struct rpcrdma_msg *headerp;

	/*
	 * rpclen gets amount of data in first buffer, which is the
	 * pre-registered buffer.
	 */
	base = rqst->rq_svec[0].iov_base;
	rpclen = rqst->rq_svec[0].iov_len;

	headerp = rdmab_to_msg(req->rl_rdmabuf);
	/* don't byte-swap XID, it's already done in request */
	headerp->rm_xid = rqst->rq_xid;
	headerp->rm_vers = rpcrdma_version;
	headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
	headerp->rm_type = rdma_msg;

	/*
	 * Chunks needed for results?
	 *
	 * o If the expected result is under the inline threshold, all ops
	 *   return as inline (but see later).
	 * o Large non-read ops return as a single reply chunk.
	 * o Large read ops return data as write chunk(s), header as inline.
	 *
	 * Note: the NFS code sending down multiple result segments implies
	 * the op is one of read, readdir[plus], readlink or NFSv4 getacl.
	 */

	/*
	 * This code can handle read chunks, write chunks OR reply
	 * chunks -- only one type. If the request is too big to fit
	 * inline, then we will choose read chunks.
	 * If the request is a READ, then use write chunks to separate
	 * the file data into pages; otherwise use reply chunks.
	 */
	if (rqst->rq_rcv_buf.buflen <= RPCRDMA_INLINE_READ_THRESHOLD(rqst))
		req->rl_wtype = rpcrdma_noch;
	else if (rqst->rq_rcv_buf.page_len == 0)
		req->rl_wtype = rpcrdma_replych;
	else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
		req->rl_wtype = rpcrdma_writech;
	else
		req->rl_wtype = rpcrdma_replych;

	/*
	 * Chunks needed for arguments?
	 *
	 * o If the total request is under the inline threshold, all ops
	 *   are sent as inline.
	 * o Large non-write ops are sent with the entire message as a
	 *   single read chunk (protocol 0-position special case).
	 * o Large write ops transmit data as read chunk(s), header as
	 *   inline.
	 *
	 * Note: the NFS code sending down multiple argument segments
	 * implies the op is a write.
	 * TBD check NFSv4 setacl
	 */
	if (rqst->rq_snd_buf.len <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
		req->rl_rtype = rpcrdma_noch;
	else if (rqst->rq_snd_buf.page_len == 0)
		req->rl_rtype = rpcrdma_areadch;
	else
		req->rl_rtype = rpcrdma_readch;

	/* The following simplification is not true forever */
	if (req->rl_rtype != rpcrdma_noch && req->rl_wtype == rpcrdma_replych)
		req->rl_wtype = rpcrdma_noch;
	if (req->rl_rtype != rpcrdma_noch && req->rl_wtype != rpcrdma_noch) {
		dprintk("RPC: %s: cannot marshal multiple chunk lists\n",
			__func__);
		return -EIO;
	}

	hdrlen = RPCRDMA_HDRLEN_MIN;
	padlen = 0;

	/*
	 * Pull up any extra send data into the preregistered buffer.
	 * When padding is in use and applies to the transfer, insert
	 * it and change the message type.
	 */
	if (req->rl_rtype == rpcrdma_noch) {

		padlen = rpcrdma_inline_pullup(rqst,
					       RPCRDMA_INLINE_PAD_VALUE(rqst));

		if (padlen) {
			headerp->rm_type = rdma_msgp;
			headerp->rm_body.rm_padded.rm_align =
				cpu_to_be32(RPCRDMA_INLINE_PAD_VALUE(rqst));
			headerp->rm_body.rm_padded.rm_thresh =
				cpu_to_be32(RPCRDMA_INLINE_PAD_THRESH);
			headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero;
			headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
			headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
			hdrlen += 2 * sizeof(u32);	/* extra words in padhdr */
			if (req->rl_wtype != rpcrdma_noch) {
				dprintk("RPC: %s: invalid chunk list\n",
					__func__);
				return -EIO;
			}
		} else {
			headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
			headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
			headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
			/* new length after pullup */
			rpclen = rqst->rq_svec[0].iov_len;
			/*
			 * Currently we try to not actually use read inline.
			 * Reply chunks have the desirable property that
			 * they land, packed, directly in the target buffers
			 * without headers, so they require no fixup. The
			 * additional RDMA Write op sends the same amount
			 * of data, streams on-the-wire and adds no overhead
			 * on receive. Therefore, we request a reply chunk
			 * for non-writes wherever feasible and efficient.
			 */
			if (req->rl_wtype == rpcrdma_noch)
				req->rl_wtype = rpcrdma_replych;
		}
	}

	hdrlen = rpcrdma_marshal_chunks(rqst, hdrlen);
	if (hdrlen < 0)
		return hdrlen;

	dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd padlen %zd"
		" headerp 0x%p base 0x%p lkey 0x%x\n",
		__func__, transfertypes[req->rl_wtype], hdrlen, rpclen, padlen,
		headerp, base, rdmab_lkey(req->rl_rdmabuf));

	/*
	 * initialize send_iov's - normally only two: rdma chunk header and
	 * single preregistered RPC header buffer, but if padding is present,
	 * then use a preregistered (and zeroed) pad buffer between the RPC
	 * header and any write data. In all non-rdma cases, any following
	 * data has been copied into the RPC header buffer.
	 */
	req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
	req->rl_send_iov[0].length = hdrlen;
	req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);

	req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
	req->rl_send_iov[1].length = rpclen;
	req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);

	req->rl_niovs = 2;

	if (padlen) {
		struct rpcrdma_ep *ep = &r_xprt->rx_ep;

		req->rl_send_iov[2].addr = rdmab_addr(ep->rep_padbuf);
		req->rl_send_iov[2].length = padlen;
		req->rl_send_iov[2].lkey = rdmab_lkey(ep->rep_padbuf);

		req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
		req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
		req->rl_send_iov[3].lkey = rdmab_lkey(req->rl_sendbuf);

		req->rl_niovs = 4;
	}

	return 0;
}

/*
 * Chase down a received write or reply chunklist to get length
 * RDMA'd by server. See map at rpcrdma_create_chunks()! :-)
 */
static int
rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __be32 **iptrp)
{
	unsigned int i, total_len;
	struct rpcrdma_write_chunk *cur_wchunk;
	char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf);

	i = be32_to_cpu(**iptrp);
	if (i > max)
		return -1;
	cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
	total_len = 0;
	while (i--) {
		struct rpcrdma_segment *seg = &cur_wchunk->wc_target;
		ifdebug(FACILITY) {
			u64 off;
			xdr_decode_hyper((__be32 *)&seg->rs_offset, &off);
			dprintk("RPC: %s: chunk %d@0x%llx:0x%x\n",
				__func__,
				be32_to_cpu(seg->rs_length),
				(unsigned long long)off,
				be32_to_cpu(seg->rs_handle));
		}
		total_len += be32_to_cpu(seg->rs_length);
		++cur_wchunk;
	}
	/* check and adjust for properly terminated write chunk */
	if (wrchunk) {
		__be32 *w = (__be32 *) cur_wchunk;
		if (*w++ != xdr_zero)
			return -1;
		cur_wchunk = (struct rpcrdma_write_chunk *) w;
	}
	if ((char *)cur_wchunk > base + rep->rr_len)
		return -1;

	*iptrp = (__be32 *) cur_wchunk;
	return total_len;
}

/*
 * Scatter inline received data back into provided iov's.
 */
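
/*
 * Illustrative example: for a 400-byte inline reply whose xdr_buf
 * expects a 100-byte head and up to 300 bytes of page data, the head
 * iov is pointed directly at the start of the received data (no copy)
 * and the remaining 300 bytes are copied into the page list one page
 * at a time; anything left over beyond head, pages and tail is
 * counted as lost.
 */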
static void
rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
{
	int i, npages, curlen, olen;
	char *destp;
	struct page **ppages;
	int page_base;

	curlen = rqst->rq_rcv_buf.head[0].iov_len;
	if (curlen > copy_len) {	/* write chunk header fixup */
		curlen = copy_len;
		rqst->rq_rcv_buf.head[0].iov_len = curlen;
	}

	dprintk("RPC: %s: srcp 0x%p len %d hdrlen %d\n",
		__func__, srcp, copy_len, curlen);

	/* Shift pointer for first receive segment only */
	rqst->rq_rcv_buf.head[0].iov_base = srcp;
	srcp += curlen;
	copy_len -= curlen;

	olen = copy_len;
	i = 0;
	rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen;
	page_base = rqst->rq_rcv_buf.page_base;
	ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT);
	page_base &= ~PAGE_MASK;

	if (copy_len && rqst->rq_rcv_buf.page_len) {
		npages = PAGE_ALIGN(page_base +
				rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT;
		for (; i < npages; i++) {
			curlen = PAGE_SIZE - page_base;
			if (curlen > copy_len)
				curlen = copy_len;
			dprintk("RPC: %s: page %d"
				" srcp 0x%p len %d curlen %d\n",
				__func__, i, srcp, copy_len, curlen);
			destp = kmap_atomic(ppages[i]);
			memcpy(destp + page_base, srcp, curlen);
			flush_dcache_page(ppages[i]);
			kunmap_atomic(destp);
			srcp += curlen;
			copy_len -= curlen;
			if (copy_len == 0)
				break;
			page_base = 0;
		}
	}

	if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) {
		curlen = copy_len;
		if (curlen > rqst->rq_rcv_buf.tail[0].iov_len)
			curlen = rqst->rq_rcv_buf.tail[0].iov_len;
		if (rqst->rq_rcv_buf.tail[0].iov_base != srcp)
			memmove(rqst->rq_rcv_buf.tail[0].iov_base, srcp, curlen);
		dprintk("RPC: %s: tail srcp 0x%p len %d curlen %d\n",
			__func__, srcp, copy_len, curlen);
		rqst->rq_rcv_buf.tail[0].iov_len = curlen;
		copy_len -= curlen; ++i;
	} else
		rqst->rq_rcv_buf.tail[0].iov_len = 0;

	if (pad) {
		/* implicit padding on terminal chunk */
		unsigned char *p = rqst->rq_rcv_buf.tail[0].iov_base;
		while (pad--)
			p[rqst->rq_rcv_buf.tail[0].iov_len++] = 0;
	}

	if (copy_len)
		dprintk("RPC: %s: %d bytes in"
			" %d extra segments (%d lost)\n",
			__func__, olen, i, copy_len);

	/* TBD avoid a warning from call_decode() */
	rqst->rq_private_buf = rqst->rq_rcv_buf;
}

void
rpcrdma_connect_worker(struct work_struct *work)
{
	struct rpcrdma_ep *ep =
		container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
	struct rpcrdma_xprt *r_xprt =
		container_of(ep, struct rpcrdma_xprt, rx_ep);
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;

	spin_lock_bh(&xprt->transport_lock);
	if (++xprt->connect_cookie == 0)	/* maintain a reserved value */
		++xprt->connect_cookie;
	if (ep->rep_connected > 0) {
		if (!xprt_test_and_set_connected(xprt))
			xprt_wake_pending_tasks(xprt, 0);
	} else {
		if (xprt_test_and_clear_connected(xprt))
			xprt_wake_pending_tasks(xprt, -ENOTCONN);
	}
	spin_unlock_bh(&xprt->transport_lock);
}

/*
 * This function is called when an async event is posted to
 * the connection which changes the connection state. All it
 * does at this point is mark the connection up/down, the rpc
 * timers do the rest.
 */
void
rpcrdma_conn_func(struct rpcrdma_ep *ep)
{
	schedule_delayed_work(&ep->rep_connect_worker, 0);
}

/*
 * Called as a tasklet to do req/reply match and complete a request
 * Errors must result in the RPC task either being awakened, or
 * allowed to timeout, to discover the errors at that time.
 */
void
rpcrdma_reply_handler(struct rpcrdma_rep *rep)
{
	struct rpcrdma_msg *headerp;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	struct rpc_xprt *xprt = rep->rr_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	__be32 *iptr;
	int credits, rdmalen, status;
	unsigned long cwnd;

	/* Check status. If bad, signal disconnect and return rep to pool */
	if (rep->rr_len == ~0U) {
		rpcrdma_recv_buffer_put(rep);
		if (r_xprt->rx_ep.rep_connected == 1) {
			r_xprt->rx_ep.rep_connected = -EIO;
			rpcrdma_conn_func(&r_xprt->rx_ep);
		}
		return;
	}
	if (rep->rr_len < RPCRDMA_HDRLEN_MIN) {
		dprintk("RPC: %s: short/invalid reply\n", __func__);
		goto repost;
	}
	headerp = rdmab_to_msg(rep->rr_rdmabuf);
	if (headerp->rm_vers != rpcrdma_version) {
		dprintk("RPC: %s: invalid version %d\n",
			__func__, be32_to_cpu(headerp->rm_vers));
		goto repost;
	}

	/* Get XID and try for a match. */
	spin_lock(&xprt->transport_lock);
	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
	if (rqst == NULL) {
		spin_unlock(&xprt->transport_lock);
		dprintk("RPC: %s: reply 0x%p failed "
			"to match any request xid 0x%08x len %d\n",
			__func__, rep, be32_to_cpu(headerp->rm_xid),
			rep->rr_len);
repost:
		r_xprt->rx_stats.bad_reply_count++;
		rep->rr_func = rpcrdma_reply_handler;
		if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
			rpcrdma_recv_buffer_put(rep);

		return;
	}

	/* get request object */
	req = rpcr_to_rdmar(rqst);
	if (req->rl_reply) {
		spin_unlock(&xprt->transport_lock);
		dprintk("RPC: %s: duplicate reply 0x%p to RPC "
			"request 0x%p: xid 0x%08x\n", __func__, rep, req,
			be32_to_cpu(headerp->rm_xid));
		goto repost;
	}

	dprintk("RPC: %s: reply 0x%p completes request 0x%p\n"
		" RPC request 0x%p xid 0x%08x\n",
		__func__, rep, req, rqst,
		be32_to_cpu(headerp->rm_xid));

	/* from here on, the reply is no longer an orphan */
	req->rl_reply = rep;
	xprt->reestablish_timeout = 0;

	/* check for expected message types */
	/* The order of some of these tests is important. */
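	/* The chunk list discriminators are sanity-checked before the
	 * rest of the header is trusted: for example, a purely inline
	 * rdma_msg reply carries three zero words (no read, write or
	 * reply chunks), while a read reply returned via write chunks
	 * carries a zero read list followed by a non-zero write chunk
	 * array.
	 */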
	switch (headerp->rm_type) {
	case rdma_msg:
		/* never expect read chunks */
		/* never expect reply chunks (two ways to check) */
		/* never expect write chunks without having offered RDMA */
		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
		    (headerp->rm_body.rm_chunks[1] == xdr_zero &&
		     headerp->rm_body.rm_chunks[2] != xdr_zero) ||
		    (headerp->rm_body.rm_chunks[1] != xdr_zero &&
		     req->rl_nchunks == 0))
			goto badheader;
		if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
			/* count any expected write chunks in read reply */
			/* start at write chunk array count */
			iptr = &headerp->rm_body.rm_chunks[2];
			rdmalen = rpcrdma_count_chunks(rep,
						req->rl_nchunks, 1, &iptr);
			/* check for validity, and no reply chunk after */
			if (rdmalen < 0 || *iptr++ != xdr_zero)
				goto badheader;
			rep->rr_len -=
			    ((unsigned char *)iptr - (unsigned char *)headerp);
			status = rep->rr_len + rdmalen;
			r_xprt->rx_stats.total_rdma_reply += rdmalen;
			/* special case - last chunk may omit padding */
			if (rdmalen &= 3) {
				rdmalen = 4 - rdmalen;
				status += rdmalen;
			}
		} else {
			/* else ordinary inline */
			rdmalen = 0;
			iptr = (__be32 *)((unsigned char *)headerp +
							RPCRDMA_HDRLEN_MIN);
			rep->rr_len -= RPCRDMA_HDRLEN_MIN;
			status = rep->rr_len;
		}
		/* Fix up the rpc results for upper layer */
		rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen);
		break;

	case rdma_nomsg:
		/* never expect read or write chunks, always reply chunks */
		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
		    headerp->rm_body.rm_chunks[1] != xdr_zero ||
		    headerp->rm_body.rm_chunks[2] != xdr_one ||
		    req->rl_nchunks == 0)
			goto badheader;
		iptr = (__be32 *)((unsigned char *)headerp +
							RPCRDMA_HDRLEN_MIN);
		rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr);
		if (rdmalen < 0)
			goto badheader;
		r_xprt->rx_stats.total_rdma_reply += rdmalen;
		/* Reply chunk buffer already is the reply vector - no fixup. */
		status = rdmalen;
		break;

badheader:
	default:
		dprintk("%s: invalid rpcrdma reply header (type %d):"
			" chunks[012] == %d %d %d"
			" expected chunks <= %d\n",
			__func__, be32_to_cpu(headerp->rm_type),
			headerp->rm_body.rm_chunks[0],
			headerp->rm_body.rm_chunks[1],
			headerp->rm_body.rm_chunks[2],
			req->rl_nchunks);
		status = -EIO;
		r_xprt->rx_stats.bad_reply_count++;
		break;
	}

	credits = be32_to_cpu(headerp->rm_credit);
	if (credits == 0)
		credits = 1;	/* don't deadlock */
	else if (credits > r_xprt->rx_buf.rb_max_requests)
		credits = r_xprt->rx_buf.rb_max_requests;

	cwnd = xprt->cwnd;
	xprt->cwnd = credits << RPC_CWNDSHIFT;
	if (xprt->cwnd > cwnd)
		xprt_release_rqst_cong(rqst->rq_task);

	dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
		__func__, xprt, rqst, status);
	xprt_complete_rqst(rqst->rq_task, status);
	spin_unlock(&xprt->transport_lock);
}