1 /* 2 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. 3 * 4 * This software is available to you under a choice of one of two 5 * licenses. You may choose to be licensed under the terms of the GNU 6 * General Public License (GPL) Version 2, available from the file 7 * COPYING in the main directory of this source tree, or the BSD-type 8 * license below: 9 * 10 * Redistribution and use in source and binary forms, with or without 11 * modification, are permitted provided that the following conditions 12 * are met: 13 * 14 * Redistributions of source code must retain the above copyright 15 * notice, this list of conditions and the following disclaimer. 16 * 17 * Redistributions in binary form must reproduce the above 18 * copyright notice, this list of conditions and the following 19 * disclaimer in the documentation and/or other materials provided 20 * with the distribution. 21 * 22 * Neither the name of the Network Appliance, Inc. nor the names of 23 * its contributors may be used to endorse or promote products 24 * derived from this software without specific prior written 25 * permission. 26 * 27 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 28 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 29 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 30 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 31 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 32 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 33 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 34 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 35 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 36 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 37 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 38 */ 39 40 /* 41 * rpc_rdma.c 42 * 43 * This file contains the guts of the RPC RDMA protocol, and 44 * does marshaling/unmarshaling, etc. It is also where interfacing 45 * to the Linux RPC framework lives. 46 */ 47 48 #include "xprt_rdma.h" 49 50 #include <linux/highmem.h> 51 52 #ifdef RPC_DEBUG 53 # define RPCDBG_FACILITY RPCDBG_TRANS 54 #endif 55 56 enum rpcrdma_chunktype { 57 rpcrdma_noch = 0, 58 rpcrdma_readch, 59 rpcrdma_areadch, 60 rpcrdma_writech, 61 rpcrdma_replych 62 }; 63 64 #ifdef RPC_DEBUG 65 static const char transfertypes[][12] = { 66 "pure inline", /* no chunks */ 67 " read chunk", /* some argument via rdma read */ 68 "*read chunk", /* entire request via rdma read */ 69 "write chunk", /* some result via rdma write */ 70 "reply chunk" /* entire reply via rdma write */ 71 }; 72 #endif 73 74 /* 75 * Chunk assembly from upper layer xdr_buf. 76 * 77 * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk 78 * elements. Segments are then coalesced when registered, if possible 79 * within the selected memreg mode. 80 * 81 * Note, this routine is never called if the connection's memory 82 * registration strategy is 0 (bounce buffers). 83 */ 84 85 static int 86 rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, 87 enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs) 88 { 89 int len, n = 0, p; 90 91 if (pos == 0 && xdrbuf->head[0].iov_len) { 92 seg[n].mr_page = NULL; 93 seg[n].mr_offset = xdrbuf->head[0].iov_base; 94 seg[n].mr_len = xdrbuf->head[0].iov_len; 95 ++n; 96 } 97 98 if (xdrbuf->page_len && (xdrbuf->pages[0] != NULL)) { 99 if (n == nsegs) 100 return 0; 101 seg[n].mr_page = xdrbuf->pages[0]; 102 seg[n].mr_offset = (void *)(unsigned long) xdrbuf->page_base; 103 seg[n].mr_len = min_t(u32, 104 PAGE_SIZE - xdrbuf->page_base, xdrbuf->page_len); 105 len = xdrbuf->page_len - seg[n].mr_len; 106 ++n; 107 p = 1; 108 while (len > 0) { 109 if (n == nsegs) 110 return 0; 111 seg[n].mr_page = xdrbuf->pages[p]; 112 seg[n].mr_offset = NULL; 113 seg[n].mr_len = min_t(u32, PAGE_SIZE, len); 114 len -= seg[n].mr_len; 115 ++n; 116 ++p; 117 } 118 } 119 120 if (xdrbuf->tail[0].iov_len) { 121 if (n == nsegs) 122 return 0; 123 seg[n].mr_page = NULL; 124 seg[n].mr_offset = xdrbuf->tail[0].iov_base; 125 seg[n].mr_len = xdrbuf->tail[0].iov_len; 126 ++n; 127 } 128 129 return n; 130 } 131 132 /* 133 * Create read/write chunk lists, and reply chunks, for RDMA 134 * 135 * Assume check against THRESHOLD has been done, and chunks are required. 136 * Assume only encoding one list entry for read|write chunks. The NFSv3 137 * protocol is simple enough to allow this as it only has a single "bulk 138 * result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The 139 * RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.) 140 * 141 * When used for a single reply chunk (which is a special write 142 * chunk used for the entire reply, rather than just the data), it 143 * is used primarily for READDIR and READLINK which would otherwise 144 * be severely size-limited by a small rdma inline read max. The server 145 * response will come back as an RDMA Write, followed by a message 146 * of type RDMA_NOMSG carrying the xid and length. As a result, reply 147 * chunks do not provide data alignment, however they do not require 148 * "fixup" (moving the response to the upper layer buffer) either. 149 * 150 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): 151 * 152 * Read chunklist (a linked list): 153 * N elements, position P (same P for all chunks of same arg!): 154 * 1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0 155 * 156 * Write chunklist (a list of (one) counted array): 157 * N elements: 158 * 1 - N - HLOO - HLOO - ... - HLOO - 0 159 * 160 * Reply chunk (a counted array): 161 * N elements: 162 * 1 - N - HLOO - HLOO - ... - HLOO 163 */ 164 165 static unsigned int 166 rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, 167 struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type) 168 { 169 struct rpcrdma_req *req = rpcr_to_rdmar(rqst); 170 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_task->tk_xprt); 171 int nsegs, nchunks = 0; 172 unsigned int pos; 173 struct rpcrdma_mr_seg *seg = req->rl_segments; 174 struct rpcrdma_read_chunk *cur_rchunk = NULL; 175 struct rpcrdma_write_array *warray = NULL; 176 struct rpcrdma_write_chunk *cur_wchunk = NULL; 177 __be32 *iptr = headerp->rm_body.rm_chunks; 178 179 if (type == rpcrdma_readch || type == rpcrdma_areadch) { 180 /* a read chunk - server will RDMA Read our memory */ 181 cur_rchunk = (struct rpcrdma_read_chunk *) iptr; 182 } else { 183 /* a write or reply chunk - server will RDMA Write our memory */ 184 *iptr++ = xdr_zero; /* encode a NULL read chunk list */ 185 if (type == rpcrdma_replych) 186 *iptr++ = xdr_zero; /* a NULL write chunk list */ 187 warray = (struct rpcrdma_write_array *) iptr; 188 cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1); 189 } 190 191 if (type == rpcrdma_replych || type == rpcrdma_areadch) 192 pos = 0; 193 else 194 pos = target->head[0].iov_len; 195 196 nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS); 197 if (nsegs == 0) 198 return 0; 199 200 do { 201 /* bind/register the memory, then build chunk from result. */ 202 int n = rpcrdma_register_external(seg, nsegs, 203 cur_wchunk != NULL, r_xprt); 204 if (n <= 0) 205 goto out; 206 if (cur_rchunk) { /* read */ 207 cur_rchunk->rc_discrim = xdr_one; 208 /* all read chunks have the same "position" */ 209 cur_rchunk->rc_position = htonl(pos); 210 cur_rchunk->rc_target.rs_handle = htonl(seg->mr_rkey); 211 cur_rchunk->rc_target.rs_length = htonl(seg->mr_len); 212 xdr_encode_hyper( 213 (__be32 *)&cur_rchunk->rc_target.rs_offset, 214 seg->mr_base); 215 dprintk("RPC: %s: read chunk " 216 "elem %d@0x%llx:0x%x pos %u (%s)\n", __func__, 217 seg->mr_len, (unsigned long long)seg->mr_base, 218 seg->mr_rkey, pos, n < nsegs ? "more" : "last"); 219 cur_rchunk++; 220 r_xprt->rx_stats.read_chunk_count++; 221 } else { /* write/reply */ 222 cur_wchunk->wc_target.rs_handle = htonl(seg->mr_rkey); 223 cur_wchunk->wc_target.rs_length = htonl(seg->mr_len); 224 xdr_encode_hyper( 225 (__be32 *)&cur_wchunk->wc_target.rs_offset, 226 seg->mr_base); 227 dprintk("RPC: %s: %s chunk " 228 "elem %d@0x%llx:0x%x (%s)\n", __func__, 229 (type == rpcrdma_replych) ? "reply" : "write", 230 seg->mr_len, (unsigned long long)seg->mr_base, 231 seg->mr_rkey, n < nsegs ? "more" : "last"); 232 cur_wchunk++; 233 if (type == rpcrdma_replych) 234 r_xprt->rx_stats.reply_chunk_count++; 235 else 236 r_xprt->rx_stats.write_chunk_count++; 237 r_xprt->rx_stats.total_rdma_request += seg->mr_len; 238 } 239 nchunks++; 240 seg += n; 241 nsegs -= n; 242 } while (nsegs); 243 244 /* success. all failures return above */ 245 req->rl_nchunks = nchunks; 246 247 BUG_ON(nchunks == 0); 248 249 /* 250 * finish off header. If write, marshal discrim and nchunks. 251 */ 252 if (cur_rchunk) { 253 iptr = (__be32 *) cur_rchunk; 254 *iptr++ = xdr_zero; /* finish the read chunk list */ 255 *iptr++ = xdr_zero; /* encode a NULL write chunk list */ 256 *iptr++ = xdr_zero; /* encode a NULL reply chunk */ 257 } else { 258 warray->wc_discrim = xdr_one; 259 warray->wc_nchunks = htonl(nchunks); 260 iptr = (__be32 *) cur_wchunk; 261 if (type == rpcrdma_writech) { 262 *iptr++ = xdr_zero; /* finish the write chunk list */ 263 *iptr++ = xdr_zero; /* encode a NULL reply chunk */ 264 } 265 } 266 267 /* 268 * Return header size. 269 */ 270 return (unsigned char *)iptr - (unsigned char *)headerp; 271 272 out: 273 for (pos = 0; nchunks--;) 274 pos += rpcrdma_deregister_external( 275 &req->rl_segments[pos], r_xprt, NULL); 276 return 0; 277 } 278 279 /* 280 * Copy write data inline. 281 * This function is used for "small" requests. Data which is passed 282 * to RPC via iovecs (or page list) is copied directly into the 283 * pre-registered memory buffer for this request. For small amounts 284 * of data, this is efficient. The cutoff value is tunable. 285 */ 286 static int 287 rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad) 288 { 289 int i, npages, curlen; 290 int copy_len; 291 unsigned char *srcp, *destp; 292 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); 293 294 destp = rqst->rq_svec[0].iov_base; 295 curlen = rqst->rq_svec[0].iov_len; 296 destp += curlen; 297 /* 298 * Do optional padding where it makes sense. Alignment of write 299 * payload can help the server, if our setting is accurate. 300 */ 301 pad -= (curlen + 36/*sizeof(struct rpcrdma_msg_padded)*/); 302 if (pad < 0 || rqst->rq_slen - curlen < RPCRDMA_INLINE_PAD_THRESH) 303 pad = 0; /* don't pad this request */ 304 305 dprintk("RPC: %s: pad %d destp 0x%p len %d hdrlen %d\n", 306 __func__, pad, destp, rqst->rq_slen, curlen); 307 308 copy_len = rqst->rq_snd_buf.page_len; 309 r_xprt->rx_stats.pullup_copy_count += copy_len; 310 npages = PAGE_ALIGN(rqst->rq_snd_buf.page_base+copy_len) >> PAGE_SHIFT; 311 for (i = 0; copy_len && i < npages; i++) { 312 if (i == 0) 313 curlen = PAGE_SIZE - rqst->rq_snd_buf.page_base; 314 else 315 curlen = PAGE_SIZE; 316 if (curlen > copy_len) 317 curlen = copy_len; 318 dprintk("RPC: %s: page %d destp 0x%p len %d curlen %d\n", 319 __func__, i, destp, copy_len, curlen); 320 srcp = kmap_atomic(rqst->rq_snd_buf.pages[i], 321 KM_SKB_SUNRPC_DATA); 322 if (i == 0) 323 memcpy(destp, srcp+rqst->rq_snd_buf.page_base, curlen); 324 else 325 memcpy(destp, srcp, curlen); 326 kunmap_atomic(srcp, KM_SKB_SUNRPC_DATA); 327 rqst->rq_svec[0].iov_len += curlen; 328 destp += curlen; 329 copy_len -= curlen; 330 } 331 if (rqst->rq_snd_buf.tail[0].iov_len) { 332 curlen = rqst->rq_snd_buf.tail[0].iov_len; 333 if (destp != rqst->rq_snd_buf.tail[0].iov_base) { 334 memcpy(destp, 335 rqst->rq_snd_buf.tail[0].iov_base, curlen); 336 r_xprt->rx_stats.pullup_copy_count += curlen; 337 } 338 dprintk("RPC: %s: tail destp 0x%p len %d curlen %d\n", 339 __func__, destp, copy_len, curlen); 340 rqst->rq_svec[0].iov_len += curlen; 341 } 342 /* header now contains entire send message */ 343 return pad; 344 } 345 346 /* 347 * Marshal a request: the primary job of this routine is to choose 348 * the transfer modes. See comments below. 349 * 350 * Uses multiple RDMA IOVs for a request: 351 * [0] -- RPC RDMA header, which uses memory from the *start* of the 352 * preregistered buffer that already holds the RPC data in 353 * its middle. 354 * [1] -- the RPC header/data, marshaled by RPC and the NFS protocol. 355 * [2] -- optional padding. 356 * [3] -- if padded, header only in [1] and data here. 357 */ 358 359 int 360 rpcrdma_marshal_req(struct rpc_rqst *rqst) 361 { 362 struct rpc_xprt *xprt = rqst->rq_task->tk_xprt; 363 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); 364 struct rpcrdma_req *req = rpcr_to_rdmar(rqst); 365 char *base; 366 size_t hdrlen, rpclen, padlen; 367 enum rpcrdma_chunktype rtype, wtype; 368 struct rpcrdma_msg *headerp; 369 370 /* 371 * rpclen gets amount of data in first buffer, which is the 372 * pre-registered buffer. 373 */ 374 base = rqst->rq_svec[0].iov_base; 375 rpclen = rqst->rq_svec[0].iov_len; 376 377 /* build RDMA header in private area at front */ 378 headerp = (struct rpcrdma_msg *) req->rl_base; 379 /* don't htonl XID, it's already done in request */ 380 headerp->rm_xid = rqst->rq_xid; 381 headerp->rm_vers = xdr_one; 382 headerp->rm_credit = htonl(r_xprt->rx_buf.rb_max_requests); 383 headerp->rm_type = htonl(RDMA_MSG); 384 385 /* 386 * Chunks needed for results? 387 * 388 * o If the expected result is under the inline threshold, all ops 389 * return as inline (but see later). 390 * o Large non-read ops return as a single reply chunk. 391 * o Large read ops return data as write chunk(s), header as inline. 392 * 393 * Note: the NFS code sending down multiple result segments implies 394 * the op is one of read, readdir[plus], readlink or NFSv4 getacl. 395 */ 396 397 /* 398 * This code can handle read chunks, write chunks OR reply 399 * chunks -- only one type. If the request is too big to fit 400 * inline, then we will choose read chunks. If the request is 401 * a READ, then use write chunks to separate the file data 402 * into pages; otherwise use reply chunks. 403 */ 404 if (rqst->rq_rcv_buf.buflen <= RPCRDMA_INLINE_READ_THRESHOLD(rqst)) 405 wtype = rpcrdma_noch; 406 else if (rqst->rq_rcv_buf.page_len == 0) 407 wtype = rpcrdma_replych; 408 else if (rqst->rq_rcv_buf.flags & XDRBUF_READ) 409 wtype = rpcrdma_writech; 410 else 411 wtype = rpcrdma_replych; 412 413 /* 414 * Chunks needed for arguments? 415 * 416 * o If the total request is under the inline threshold, all ops 417 * are sent as inline. 418 * o Large non-write ops are sent with the entire message as a 419 * single read chunk (protocol 0-position special case). 420 * o Large write ops transmit data as read chunk(s), header as 421 * inline. 422 * 423 * Note: the NFS code sending down multiple argument segments 424 * implies the op is a write. 425 * TBD check NFSv4 setacl 426 */ 427 if (rqst->rq_snd_buf.len <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst)) 428 rtype = rpcrdma_noch; 429 else if (rqst->rq_snd_buf.page_len == 0) 430 rtype = rpcrdma_areadch; 431 else 432 rtype = rpcrdma_readch; 433 434 /* The following simplification is not true forever */ 435 if (rtype != rpcrdma_noch && wtype == rpcrdma_replych) 436 wtype = rpcrdma_noch; 437 BUG_ON(rtype != rpcrdma_noch && wtype != rpcrdma_noch); 438 439 if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS && 440 (rtype != rpcrdma_noch || wtype != rpcrdma_noch)) { 441 /* forced to "pure inline"? */ 442 dprintk("RPC: %s: too much data (%d/%d) for inline\n", 443 __func__, rqst->rq_rcv_buf.len, rqst->rq_snd_buf.len); 444 return -1; 445 } 446 447 hdrlen = 28; /*sizeof *headerp;*/ 448 padlen = 0; 449 450 /* 451 * Pull up any extra send data into the preregistered buffer. 452 * When padding is in use and applies to the transfer, insert 453 * it and change the message type. 454 */ 455 if (rtype == rpcrdma_noch) { 456 457 padlen = rpcrdma_inline_pullup(rqst, 458 RPCRDMA_INLINE_PAD_VALUE(rqst)); 459 460 if (padlen) { 461 headerp->rm_type = htonl(RDMA_MSGP); 462 headerp->rm_body.rm_padded.rm_align = 463 htonl(RPCRDMA_INLINE_PAD_VALUE(rqst)); 464 headerp->rm_body.rm_padded.rm_thresh = 465 htonl(RPCRDMA_INLINE_PAD_THRESH); 466 headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero; 467 headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero; 468 headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero; 469 hdrlen += 2 * sizeof(u32); /* extra words in padhdr */ 470 BUG_ON(wtype != rpcrdma_noch); 471 472 } else { 473 headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero; 474 headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero; 475 headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero; 476 /* new length after pullup */ 477 rpclen = rqst->rq_svec[0].iov_len; 478 /* 479 * Currently we try to not actually use read inline. 480 * Reply chunks have the desirable property that 481 * they land, packed, directly in the target buffers 482 * without headers, so they require no fixup. The 483 * additional RDMA Write op sends the same amount 484 * of data, streams on-the-wire and adds no overhead 485 * on receive. Therefore, we request a reply chunk 486 * for non-writes wherever feasible and efficient. 487 */ 488 if (wtype == rpcrdma_noch && 489 r_xprt->rx_ia.ri_memreg_strategy > RPCRDMA_REGISTER) 490 wtype = rpcrdma_replych; 491 } 492 } 493 494 /* 495 * Marshal chunks. This routine will return the header length 496 * consumed by marshaling. 497 */ 498 if (rtype != rpcrdma_noch) { 499 hdrlen = rpcrdma_create_chunks(rqst, 500 &rqst->rq_snd_buf, headerp, rtype); 501 wtype = rtype; /* simplify dprintk */ 502 503 } else if (wtype != rpcrdma_noch) { 504 hdrlen = rpcrdma_create_chunks(rqst, 505 &rqst->rq_rcv_buf, headerp, wtype); 506 } 507 508 if (hdrlen == 0) 509 return -1; 510 511 dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd padlen %zd\n" 512 " headerp 0x%p base 0x%p lkey 0x%x\n", 513 __func__, transfertypes[wtype], hdrlen, rpclen, padlen, 514 headerp, base, req->rl_iov.lkey); 515 516 /* 517 * initialize send_iov's - normally only two: rdma chunk header and 518 * single preregistered RPC header buffer, but if padding is present, 519 * then use a preregistered (and zeroed) pad buffer between the RPC 520 * header and any write data. In all non-rdma cases, any following 521 * data has been copied into the RPC header buffer. 522 */ 523 req->rl_send_iov[0].addr = req->rl_iov.addr; 524 req->rl_send_iov[0].length = hdrlen; 525 req->rl_send_iov[0].lkey = req->rl_iov.lkey; 526 527 req->rl_send_iov[1].addr = req->rl_iov.addr + (base - req->rl_base); 528 req->rl_send_iov[1].length = rpclen; 529 req->rl_send_iov[1].lkey = req->rl_iov.lkey; 530 531 req->rl_niovs = 2; 532 533 if (padlen) { 534 struct rpcrdma_ep *ep = &r_xprt->rx_ep; 535 536 req->rl_send_iov[2].addr = ep->rep_pad.addr; 537 req->rl_send_iov[2].length = padlen; 538 req->rl_send_iov[2].lkey = ep->rep_pad.lkey; 539 540 req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen; 541 req->rl_send_iov[3].length = rqst->rq_slen - rpclen; 542 req->rl_send_iov[3].lkey = req->rl_iov.lkey; 543 544 req->rl_niovs = 4; 545 } 546 547 return 0; 548 } 549 550 /* 551 * Chase down a received write or reply chunklist to get length 552 * RDMA'd by server. See map at rpcrdma_create_chunks()! :-) 553 */ 554 static int 555 rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __be32 **iptrp) 556 { 557 unsigned int i, total_len; 558 struct rpcrdma_write_chunk *cur_wchunk; 559 560 i = ntohl(**iptrp); /* get array count */ 561 if (i > max) 562 return -1; 563 cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1); 564 total_len = 0; 565 while (i--) { 566 struct rpcrdma_segment *seg = &cur_wchunk->wc_target; 567 ifdebug(FACILITY) { 568 u64 off; 569 xdr_decode_hyper((__be32 *)&seg->rs_offset, &off); 570 dprintk("RPC: %s: chunk %d@0x%llx:0x%x\n", 571 __func__, 572 ntohl(seg->rs_length), 573 (unsigned long long)off, 574 ntohl(seg->rs_handle)); 575 } 576 total_len += ntohl(seg->rs_length); 577 ++cur_wchunk; 578 } 579 /* check and adjust for properly terminated write chunk */ 580 if (wrchunk) { 581 __be32 *w = (__be32 *) cur_wchunk; 582 if (*w++ != xdr_zero) 583 return -1; 584 cur_wchunk = (struct rpcrdma_write_chunk *) w; 585 } 586 if ((char *) cur_wchunk > rep->rr_base + rep->rr_len) 587 return -1; 588 589 *iptrp = (__be32 *) cur_wchunk; 590 return total_len; 591 } 592 593 /* 594 * Scatter inline received data back into provided iov's. 595 */ 596 static void 597 rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len) 598 { 599 int i, npages, curlen, olen; 600 char *destp; 601 602 curlen = rqst->rq_rcv_buf.head[0].iov_len; 603 if (curlen > copy_len) { /* write chunk header fixup */ 604 curlen = copy_len; 605 rqst->rq_rcv_buf.head[0].iov_len = curlen; 606 } 607 608 dprintk("RPC: %s: srcp 0x%p len %d hdrlen %d\n", 609 __func__, srcp, copy_len, curlen); 610 611 /* Shift pointer for first receive segment only */ 612 rqst->rq_rcv_buf.head[0].iov_base = srcp; 613 srcp += curlen; 614 copy_len -= curlen; 615 616 olen = copy_len; 617 i = 0; 618 rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen; 619 if (copy_len && rqst->rq_rcv_buf.page_len) { 620 npages = PAGE_ALIGN(rqst->rq_rcv_buf.page_base + 621 rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT; 622 for (; i < npages; i++) { 623 if (i == 0) 624 curlen = PAGE_SIZE - rqst->rq_rcv_buf.page_base; 625 else 626 curlen = PAGE_SIZE; 627 if (curlen > copy_len) 628 curlen = copy_len; 629 dprintk("RPC: %s: page %d" 630 " srcp 0x%p len %d curlen %d\n", 631 __func__, i, srcp, copy_len, curlen); 632 destp = kmap_atomic(rqst->rq_rcv_buf.pages[i], 633 KM_SKB_SUNRPC_DATA); 634 if (i == 0) 635 memcpy(destp + rqst->rq_rcv_buf.page_base, 636 srcp, curlen); 637 else 638 memcpy(destp, srcp, curlen); 639 flush_dcache_page(rqst->rq_rcv_buf.pages[i]); 640 kunmap_atomic(destp, KM_SKB_SUNRPC_DATA); 641 srcp += curlen; 642 copy_len -= curlen; 643 if (copy_len == 0) 644 break; 645 } 646 rqst->rq_rcv_buf.page_len = olen - copy_len; 647 } else 648 rqst->rq_rcv_buf.page_len = 0; 649 650 if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) { 651 curlen = copy_len; 652 if (curlen > rqst->rq_rcv_buf.tail[0].iov_len) 653 curlen = rqst->rq_rcv_buf.tail[0].iov_len; 654 if (rqst->rq_rcv_buf.tail[0].iov_base != srcp) 655 memcpy(rqst->rq_rcv_buf.tail[0].iov_base, srcp, curlen); 656 dprintk("RPC: %s: tail srcp 0x%p len %d curlen %d\n", 657 __func__, srcp, copy_len, curlen); 658 rqst->rq_rcv_buf.tail[0].iov_len = curlen; 659 copy_len -= curlen; ++i; 660 } else 661 rqst->rq_rcv_buf.tail[0].iov_len = 0; 662 663 if (copy_len) 664 dprintk("RPC: %s: %d bytes in" 665 " %d extra segments (%d lost)\n", 666 __func__, olen, i, copy_len); 667 668 /* TBD avoid a warning from call_decode() */ 669 rqst->rq_private_buf = rqst->rq_rcv_buf; 670 } 671 672 /* 673 * This function is called when an async event is posted to 674 * the connection which changes the connection state. All it 675 * does at this point is mark the connection up/down, the rpc 676 * timers do the rest. 677 */ 678 void 679 rpcrdma_conn_func(struct rpcrdma_ep *ep) 680 { 681 struct rpc_xprt *xprt = ep->rep_xprt; 682 683 spin_lock_bh(&xprt->transport_lock); 684 if (ep->rep_connected > 0) { 685 if (!xprt_test_and_set_connected(xprt)) 686 xprt_wake_pending_tasks(xprt, 0); 687 } else { 688 if (xprt_test_and_clear_connected(xprt)) 689 xprt_wake_pending_tasks(xprt, ep->rep_connected); 690 } 691 spin_unlock_bh(&xprt->transport_lock); 692 } 693 694 /* 695 * This function is called when memory window unbind which we are waiting 696 * for completes. Just use rr_func (zeroed by upcall) to signal completion. 697 */ 698 static void 699 rpcrdma_unbind_func(struct rpcrdma_rep *rep) 700 { 701 wake_up(&rep->rr_unbind); 702 } 703 704 /* 705 * Called as a tasklet to do req/reply match and complete a request 706 * Errors must result in the RPC task either being awakened, or 707 * allowed to timeout, to discover the errors at that time. 708 */ 709 void 710 rpcrdma_reply_handler(struct rpcrdma_rep *rep) 711 { 712 struct rpcrdma_msg *headerp; 713 struct rpcrdma_req *req; 714 struct rpc_rqst *rqst; 715 struct rpc_xprt *xprt = rep->rr_xprt; 716 struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); 717 __be32 *iptr; 718 int i, rdmalen, status; 719 720 /* Check status. If bad, signal disconnect and return rep to pool */ 721 if (rep->rr_len == ~0U) { 722 rpcrdma_recv_buffer_put(rep); 723 if (r_xprt->rx_ep.rep_connected == 1) { 724 r_xprt->rx_ep.rep_connected = -EIO; 725 rpcrdma_conn_func(&r_xprt->rx_ep); 726 } 727 return; 728 } 729 if (rep->rr_len < 28) { 730 dprintk("RPC: %s: short/invalid reply\n", __func__); 731 goto repost; 732 } 733 headerp = (struct rpcrdma_msg *) rep->rr_base; 734 if (headerp->rm_vers != xdr_one) { 735 dprintk("RPC: %s: invalid version %d\n", 736 __func__, ntohl(headerp->rm_vers)); 737 goto repost; 738 } 739 740 /* Get XID and try for a match. */ 741 spin_lock(&xprt->transport_lock); 742 rqst = xprt_lookup_rqst(xprt, headerp->rm_xid); 743 if (rqst == NULL) { 744 spin_unlock(&xprt->transport_lock); 745 dprintk("RPC: %s: reply 0x%p failed " 746 "to match any request xid 0x%08x len %d\n", 747 __func__, rep, headerp->rm_xid, rep->rr_len); 748 repost: 749 r_xprt->rx_stats.bad_reply_count++; 750 rep->rr_func = rpcrdma_reply_handler; 751 if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep)) 752 rpcrdma_recv_buffer_put(rep); 753 754 return; 755 } 756 757 /* get request object */ 758 req = rpcr_to_rdmar(rqst); 759 760 dprintk("RPC: %s: reply 0x%p completes request 0x%p\n" 761 " RPC request 0x%p xid 0x%08x\n", 762 __func__, rep, req, rqst, headerp->rm_xid); 763 764 BUG_ON(!req || req->rl_reply); 765 766 /* from here on, the reply is no longer an orphan */ 767 req->rl_reply = rep; 768 769 /* check for expected message types */ 770 /* The order of some of these tests is important. */ 771 switch (headerp->rm_type) { 772 case __constant_htonl(RDMA_MSG): 773 /* never expect read chunks */ 774 /* never expect reply chunks (two ways to check) */ 775 /* never expect write chunks without having offered RDMA */ 776 if (headerp->rm_body.rm_chunks[0] != xdr_zero || 777 (headerp->rm_body.rm_chunks[1] == xdr_zero && 778 headerp->rm_body.rm_chunks[2] != xdr_zero) || 779 (headerp->rm_body.rm_chunks[1] != xdr_zero && 780 req->rl_nchunks == 0)) 781 goto badheader; 782 if (headerp->rm_body.rm_chunks[1] != xdr_zero) { 783 /* count any expected write chunks in read reply */ 784 /* start at write chunk array count */ 785 iptr = &headerp->rm_body.rm_chunks[2]; 786 rdmalen = rpcrdma_count_chunks(rep, 787 req->rl_nchunks, 1, &iptr); 788 /* check for validity, and no reply chunk after */ 789 if (rdmalen < 0 || *iptr++ != xdr_zero) 790 goto badheader; 791 rep->rr_len -= 792 ((unsigned char *)iptr - (unsigned char *)headerp); 793 status = rep->rr_len + rdmalen; 794 r_xprt->rx_stats.total_rdma_reply += rdmalen; 795 } else { 796 /* else ordinary inline */ 797 iptr = (__be32 *)((unsigned char *)headerp + 28); 798 rep->rr_len -= 28; /*sizeof *headerp;*/ 799 status = rep->rr_len; 800 } 801 /* Fix up the rpc results for upper layer */ 802 rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len); 803 break; 804 805 case __constant_htonl(RDMA_NOMSG): 806 /* never expect read or write chunks, always reply chunks */ 807 if (headerp->rm_body.rm_chunks[0] != xdr_zero || 808 headerp->rm_body.rm_chunks[1] != xdr_zero || 809 headerp->rm_body.rm_chunks[2] != xdr_one || 810 req->rl_nchunks == 0) 811 goto badheader; 812 iptr = (__be32 *)((unsigned char *)headerp + 28); 813 rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr); 814 if (rdmalen < 0) 815 goto badheader; 816 r_xprt->rx_stats.total_rdma_reply += rdmalen; 817 /* Reply chunk buffer already is the reply vector - no fixup. */ 818 status = rdmalen; 819 break; 820 821 badheader: 822 default: 823 dprintk("%s: invalid rpcrdma reply header (type %d):" 824 " chunks[012] == %d %d %d" 825 " expected chunks <= %d\n", 826 __func__, ntohl(headerp->rm_type), 827 headerp->rm_body.rm_chunks[0], 828 headerp->rm_body.rm_chunks[1], 829 headerp->rm_body.rm_chunks[2], 830 req->rl_nchunks); 831 status = -EIO; 832 r_xprt->rx_stats.bad_reply_count++; 833 break; 834 } 835 836 /* If using mw bind, start the deregister process now. */ 837 /* (Note: if mr_free(), cannot perform it here, in tasklet context) */ 838 if (req->rl_nchunks) switch (r_xprt->rx_ia.ri_memreg_strategy) { 839 case RPCRDMA_MEMWINDOWS: 840 for (i = 0; req->rl_nchunks-- > 1;) 841 i += rpcrdma_deregister_external( 842 &req->rl_segments[i], r_xprt, NULL); 843 /* Optionally wait (not here) for unbinds to complete */ 844 rep->rr_func = rpcrdma_unbind_func; 845 (void) rpcrdma_deregister_external(&req->rl_segments[i], 846 r_xprt, rep); 847 break; 848 case RPCRDMA_MEMWINDOWS_ASYNC: 849 for (i = 0; req->rl_nchunks--;) 850 i += rpcrdma_deregister_external(&req->rl_segments[i], 851 r_xprt, NULL); 852 break; 853 default: 854 break; 855 } 856 857 dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n", 858 __func__, xprt, rqst, status); 859 xprt_complete_rqst(rqst->rq_task, status); 860 spin_unlock(&xprt->transport_lock); 861 } 862