xref: /openbmc/linux/net/sunrpc/xprtrdma/transport.c (revision ecba1060)
/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * transport.c
 *
 * This file contains the top-level implementation of an RPC RDMA
 * transport.
 *
 * Naming convention: functions beginning with xprt_ are part of the
 * transport switch. All others are RPC RDMA internal.
 */

#include <linux/module.h>
#include <linux/init.h>
#include <linux/seq_file.h>

#include "xprt_rdma.h"

#ifdef RPC_DEBUG
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

MODULE_LICENSE("Dual BSD/GPL");

MODULE_DESCRIPTION("RPC/RDMA Transport for Linux kernel NFS");
MODULE_AUTHOR("Network Appliance, Inc.");

/*
 * tunables
 */

static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
static unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
static unsigned int xprt_rdma_inline_write_padding;
static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR;
		int xprt_rdma_pad_optimize = 0;

#ifdef RPC_DEBUG

static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
static unsigned int zero;
static unsigned int max_padding = PAGE_SIZE;
static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
static unsigned int max_memreg = RPCRDMA_LAST - 1;

static struct ctl_table_header *sunrpc_table_header;

static ctl_table xr_tunables_table[] = {
	{
		.ctl_name       = CTL_UNNUMBERED,
		.procname	= "rdma_slot_table_entries",
		.data		= &xprt_rdma_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.ctl_name       = CTL_UNNUMBERED,
		.procname	= "rdma_max_inline_read",
		.data		= &xprt_rdma_max_inline_read,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec,
		.strategy	= &sysctl_intvec,
	},
	{
		.ctl_name       = CTL_UNNUMBERED,
		.procname	= "rdma_max_inline_write",
		.data		= &xprt_rdma_max_inline_write,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec,
		.strategy	= &sysctl_intvec,
	},
	{
		.ctl_name       = CTL_UNNUMBERED,
		.procname	= "rdma_inline_write_padding",
		.data		= &xprt_rdma_inline_write_padding,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &zero,
		.extra2		= &max_padding,
	},
	{
		.ctl_name       = CTL_UNNUMBERED,
		.procname	= "rdma_memreg_strategy",
		.data		= &xprt_rdma_memreg_strategy,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_memreg,
		.extra2		= &max_memreg,
	},
	{
		.ctl_name       = CTL_UNNUMBERED,
		.procname	= "rdma_pad_optimize",
		.data		= &xprt_rdma_pad_optimize,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec,
	},
	{
		.ctl_name = 0,
	},
};

static ctl_table sunrpc_table[] = {
	{
		.ctl_name	= CTL_SUNRPC,
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xr_tunables_table
	},
	{
		.ctl_name = 0,
	},
};

#endif
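
/*
 * With RPC_DEBUG enabled, the tables above register the tunables under
 * the top-level "sunrpc" sysctl directory, so they should appear as
 * /proc/sys/sunrpc/rdma_*. A hypothetical shell session (values are
 * illustrative only, not defaults guaranteed by this file):
 *
 *	# sysctl sunrpc.rdma_slot_table_entries
 *	sunrpc.rdma_slot_table_entries = 32
 *	# sysctl -w sunrpc.rdma_memreg_strategy=5
 *	sunrpc.rdma_memreg_strategy = 5
 *
 * proc_dointvec_minmax clamps writes via extra1/extra2: slot table
 * entries stay within [RPCRDMA_MIN_SLOT_TABLE, RPCRDMA_MAX_SLOT_TABLE]
 * and the memreg strategy within [RPCRDMA_BOUNCEBUFFERS, RPCRDMA_LAST - 1].
 */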

static struct rpc_xprt_ops xprt_rdma_procs;	/* forward reference */

static void
xprt_rdma_format_addresses(struct rpc_xprt *xprt)
{
	struct sockaddr_in *addr = (struct sockaddr_in *)
					&rpcx_to_rdmad(xprt).addr;
	char *buf;

	buf = kzalloc(20, GFP_KERNEL);
	if (buf)
		snprintf(buf, 20, "%pI4", &addr->sin_addr.s_addr);
	xprt->address_strings[RPC_DISPLAY_ADDR] = buf;

	buf = kzalloc(8, GFP_KERNEL);
	if (buf)
		snprintf(buf, 8, "%u", ntohs(addr->sin_port));
	xprt->address_strings[RPC_DISPLAY_PORT] = buf;

	xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";

	buf = kzalloc(48, GFP_KERNEL);
	if (buf)
		snprintf(buf, 48, "addr=%pI4 port=%u proto=%s",
			&addr->sin_addr.s_addr,
			ntohs(addr->sin_port), "rdma");
	xprt->address_strings[RPC_DISPLAY_ALL] = buf;

	buf = kzalloc(10, GFP_KERNEL);
	if (buf)
		snprintf(buf, 10, "%02x%02x%02x%02x",
			NIPQUAD(addr->sin_addr.s_addr));
	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;

	buf = kzalloc(8, GFP_KERNEL);
	if (buf)
		snprintf(buf, 8, "%4hx", ntohs(addr->sin_port));
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;

	buf = kzalloc(30, GFP_KERNEL);
	if (buf)
		snprintf(buf, 30, "%pI4.%u.%u",
			&addr->sin_addr.s_addr,
			ntohs(addr->sin_port) >> 8,
			ntohs(addr->sin_port) & 0xff);
	xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;

	/* netid */
	xprt->address_strings[RPC_DISPLAY_NETID] = "rdma";
}
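
/*
 * For illustration, given a hypothetical server at 192.168.1.10 port
 * 2049, the strings built above would come out roughly as:
 *
 *	RPC_DISPLAY_ADDR	"192.168.1.10"
 *	RPC_DISPLAY_PORT	"2049"
 *	RPC_DISPLAY_ALL		"addr=192.168.1.10 port=2049 proto=rdma"
 *	RPC_DISPLAY_HEX_ADDR	"c0a8010a"
 *	RPC_DISPLAY_HEX_PORT	" 801"	(%4hx of 2049 = 0x801)
 *	RPC_DISPLAY_UNIVERSAL_ADDR "192.168.1.10.8.1"
 *
 * The universal address appends the port as two decimal octets
 * (2049 = 8 * 256 + 1), per the usual RPC uaddr convention.
 */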

static void
xprt_rdma_free_addresses(struct rpc_xprt *xprt)
{
	unsigned int i;

	for (i = 0; i < RPC_DISPLAY_MAX; i++)
		switch (i) {
		case RPC_DISPLAY_PROTO:
		case RPC_DISPLAY_NETID:
			continue;
		default:
			kfree(xprt->address_strings[i]);
		}
}

static void
xprt_rdma_connect_worker(struct work_struct *work)
{
	struct rpcrdma_xprt *r_xprt =
		container_of(work, struct rpcrdma_xprt, rdma_connect.work);
	struct rpc_xprt *xprt = &r_xprt->xprt;
	int rc = 0;

	if (!xprt->shutdown) {
		xprt_clear_connected(xprt);

		dprintk("RPC:       %s: %sconnect\n", __func__,
				r_xprt->rx_ep.rep_connected != 0 ? "re" : "");
		rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
		if (rc)
			goto out;
	}
	goto out_clear;

out:
	xprt_wake_pending_tasks(xprt, rc);

out_clear:
	dprintk("RPC:       %s: exit\n", __func__);
	xprt_clear_connecting(xprt);
}

/*
 * xprt_rdma_destroy
 *
 * Destroy the xprt.
 * Free all memory associated with the object, including its own.
 * NOTE: none of the *destroy methods free memory for their top-level
 * objects, even though they may have allocated it (they do free
 * private memory). It's up to the caller to handle it. In this
 * case (RDMA transport), all structure memory is inlined with the
 * struct rpcrdma_xprt.
 */
static void
xprt_rdma_destroy(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	int rc;

	dprintk("RPC:       %s: called\n", __func__);

	cancel_delayed_work(&r_xprt->rdma_connect);
	flush_scheduled_work();

	xprt_clear_connected(xprt);

	rpcrdma_buffer_destroy(&r_xprt->rx_buf);
	rc = rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
	if (rc)
		dprintk("RPC:       %s: rpcrdma_ep_destroy returned %i\n",
			__func__, rc);
	rpcrdma_ia_close(&r_xprt->rx_ia);

	xprt_rdma_free_addresses(xprt);

	kfree(xprt->slot);
	xprt->slot = NULL;
	kfree(xprt);

	dprintk("RPC:       %s: returning\n", __func__);

	module_put(THIS_MODULE);
}

static const struct rpc_timeout xprt_rdma_default_timeout = {
	.to_initval = 60 * HZ,
	.to_maxval = 60 * HZ,
};

/**
 * xprt_setup_rdma - Set up transport to use RDMA
 *
 * @args: rpc transport arguments
 */
static struct rpc_xprt *
xprt_setup_rdma(struct xprt_create *args)
{
	struct rpcrdma_create_data_internal cdata;
	struct rpc_xprt *xprt;
	struct rpcrdma_xprt *new_xprt;
	struct rpcrdma_ep *new_ep;
	struct sockaddr_in *sin;
	int rc;

	if (args->addrlen > sizeof(xprt->addr)) {
		dprintk("RPC:       %s: address too large\n", __func__);
		return ERR_PTR(-EBADF);
	}

	xprt = kzalloc(sizeof(struct rpcrdma_xprt), GFP_KERNEL);
	if (xprt == NULL) {
		dprintk("RPC:       %s: couldn't allocate rpcrdma_xprt\n",
			__func__);
		return ERR_PTR(-ENOMEM);
	}

	xprt->max_reqs = xprt_rdma_slot_table_entries;
	xprt->slot = kcalloc(xprt->max_reqs,
				sizeof(struct rpc_rqst), GFP_KERNEL);
	if (xprt->slot == NULL) {
		dprintk("RPC:       %s: couldn't allocate %d slots\n",
			__func__, xprt->max_reqs);
		kfree(xprt);
		return ERR_PTR(-ENOMEM);
	}

	/* 60 second timeout, no retries */
	xprt->timeout = &xprt_rdma_default_timeout;
	xprt->bind_timeout = (60U * HZ);
	xprt->connect_timeout = (60U * HZ);
	xprt->reestablish_timeout = (5U * HZ);
	xprt->idle_timeout = (5U * 60 * HZ);

	xprt->resvport = 0;		/* privileged port not needed */
	xprt->tsh_size = 0;		/* RPC-RDMA handles framing */
	xprt->max_payload = RPCRDMA_MAX_DATA_SEGS * PAGE_SIZE;
	xprt->ops = &xprt_rdma_procs;

	/*
	 * Set up RDMA-specific connect data.
	 */

	/* Put server RDMA address in local cdata */
	memcpy(&cdata.addr, args->dstaddr, args->addrlen);

	/* Ensure xprt->addr holds valid server TCP (not RDMA)
	 * address, for any side protocols which peek at it */
	xprt->prot = IPPROTO_TCP;
	xprt->addrlen = args->addrlen;
	memcpy(&xprt->addr, &cdata.addr, xprt->addrlen);

	sin = (struct sockaddr_in *)&cdata.addr;
	if (ntohs(sin->sin_port) != 0)
		xprt_set_bound(xprt);

	dprintk("RPC:       %s: %pI4:%u\n",
		__func__, &sin->sin_addr.s_addr, ntohs(sin->sin_port));

	/* Set max requests */
	cdata.max_requests = xprt->max_reqs;

	/* Set some length limits */
	cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
	cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */

	cdata.inline_wsize = xprt_rdma_max_inline_write;
	if (cdata.inline_wsize > cdata.wsize)
		cdata.inline_wsize = cdata.wsize;

	cdata.inline_rsize = xprt_rdma_max_inline_read;
	if (cdata.inline_rsize > cdata.rsize)
		cdata.inline_rsize = cdata.rsize;

	cdata.padding = xprt_rdma_inline_write_padding;

	/*
	 * Create new transport instance, which includes initialized
	 *  o ia
	 *  o endpoint
	 *  o buffers
	 */

	new_xprt = rpcx_to_rdmax(xprt);

	rc = rpcrdma_ia_open(new_xprt, (struct sockaddr *) &cdata.addr,
				xprt_rdma_memreg_strategy);
	if (rc)
		goto out1;

	/*
	 * initialize and create ep
	 */
	new_xprt->rx_data = cdata;
	new_ep = &new_xprt->rx_ep;
	new_ep->rep_remote_addr = cdata.addr;

	rc = rpcrdma_ep_create(&new_xprt->rx_ep,
				&new_xprt->rx_ia, &new_xprt->rx_data);
	if (rc)
		goto out2;

	/*
	 * Allocate pre-registered send and receive buffers for headers and
	 * any inline data. Also specify any padding which will be provided
	 * from a preregistered zero buffer.
	 */
	rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia,
				&new_xprt->rx_data);
	if (rc)
		goto out3;

	/*
	 * Register a callback for connection events. This is necessary because
	 * connection loss notification is async. We also catch connection loss
	 * when reaping receives.
	 */
	INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker);
	new_ep->rep_func = rpcrdma_conn_func;
	new_ep->rep_xprt = xprt;

	xprt_rdma_format_addresses(xprt);

	if (!try_module_get(THIS_MODULE))
		goto out4;

	return xprt;

out4:
	xprt_rdma_free_addresses(xprt);
	rc = -EINVAL;
out3:
	(void) rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
out2:
	rpcrdma_ia_close(&new_xprt->rx_ia);
out1:
	kfree(xprt->slot);
	kfree(xprt);
	return ERR_PTR(rc);
}

/*
 * Close a connection, during shutdown or timeout/reconnect
 */
static void
xprt_rdma_close(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	dprintk("RPC:       %s: closing\n", __func__);
	if (r_xprt->rx_ep.rep_connected > 0)
		xprt->reestablish_timeout = 0;
	xprt_disconnect_done(xprt);
	(void) rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia);
}

static void
xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
{
	struct sockaddr_in *sap;

	sap = (struct sockaddr_in *)&xprt->addr;
	sap->sin_port = htons(port);
	sap = (struct sockaddr_in *)&rpcx_to_rdmad(xprt).addr;
	sap->sin_port = htons(port);
	dprintk("RPC:       %s: %u\n", __func__, port);
}

static void
xprt_rdma_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = (struct rpc_xprt *)task->tk_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	if (!xprt_test_and_set_connecting(xprt)) {
		if (r_xprt->rx_ep.rep_connected != 0) {
			/* Reconnect */
			schedule_delayed_work(&r_xprt->rdma_connect,
				xprt->reestablish_timeout);
			xprt->reestablish_timeout <<= 1;
			if (xprt->reestablish_timeout > (30 * HZ))
				xprt->reestablish_timeout = (30 * HZ);
			else if (xprt->reestablish_timeout < (5 * HZ))
				xprt->reestablish_timeout = (5 * HZ);
		} else {
			schedule_delayed_work(&r_xprt->rdma_connect, 0);
			if (!RPC_IS_ASYNC(task))
				flush_scheduled_work();
		}
	}
}
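
/*
 * A worked example of the reconnect backoff above: reestablish_timeout
 * doubles after each attempt and is clamped to [5 * HZ, 30 * HZ].
 * Starting from the 5 second value set in xprt_setup_rdma(), the
 * delays between reconnect attempts run approximately 5s, 10s, 20s,
 * 30s, 30s, ... A first-time connect (rep_connected == 0) is instead
 * scheduled immediately, and synchronous tasks wait for it to finish.
 */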

static int
xprt_rdma_reserve_xprt(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	int credits = atomic_read(&r_xprt->rx_buf.rb_credits);

	/* == RPC_CWNDSCALE @ init, but *after* setup */
	if (r_xprt->rx_buf.rb_cwndscale == 0UL) {
		r_xprt->rx_buf.rb_cwndscale = xprt->cwnd;
		dprintk("RPC:       %s: cwndscale %lu\n", __func__,
			r_xprt->rx_buf.rb_cwndscale);
		BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0);
	}
	xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale;
	return xprt_reserve_xprt_cong(task);
}
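
/*
 * A sketch of the congestion window math above: on the first reserve,
 * rb_cwndscale latches the initial xprt->cwnd (RPC_CWNDSCALE). If
 * rb_credits reflects a server credit grant of, say, 16, then the
 * window becomes 16 * RPC_CWNDSCALE, so xprt_reserve_xprt_cong()
 * admits at most 16 concurrent requests; each in-flight request
 * consumes one RPC_CWNDSCALE unit of congestion. The credit count is
 * refreshed from the credits field carried in each RPC/RDMA reply.
 */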

/*
 * The RDMA allocate/free functions need the task structure as a place
 * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
 * sequence. For this reason, the recv buffers are attached to send
 * buffers for portions of the RPC. Note that the RPC layer allocates
 * both send and receive buffers in the same call. We may register
 * the receive buffer portion when using reply chunks.
 */
static void *
xprt_rdma_allocate(struct rpc_task *task, size_t size)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpcrdma_req *req, *nreq;

	req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
	BUG_ON(NULL == req);

	if (size > req->rl_size) {
		dprintk("RPC:       %s: size %zd too large for buffer[%zd]: "
			"prog %d vers %d proc %d\n",
			__func__, size, req->rl_size,
			task->tk_client->cl_prog, task->tk_client->cl_vers,
			task->tk_msg.rpc_proc->p_proc);
		/*
		 * Outgoing length shortage. Our inline write max must have
		 * been configured to perform direct i/o.
		 *
		 * This is therefore a large metadata operation, and the
		 * allocate call was made on the maximum possible message,
		 * e.g. containing long filename(s) or symlink data. In
		 * fact, while these metadata operations *might* carry
		 * large outgoing payloads, they rarely *do*. However, we
		 * have to commit to the request here, so reallocate and
		 * register it now. The data path will never require this
		 * reallocation.
		 *
		 * If the allocation or registration fails, the RPC framework
		 * will (doggedly) retry.
		 */
		if (rpcx_to_rdmax(xprt)->rx_ia.ri_memreg_strategy ==
				RPCRDMA_BOUNCEBUFFERS) {
			/* forced to "pure inline" */
			dprintk("RPC:       %s: too much data (%zd) for inline "
					"(r/w max %d/%d)\n", __func__, size,
					rpcx_to_rdmad(xprt).inline_rsize,
					rpcx_to_rdmad(xprt).inline_wsize);
			size = req->rl_size;
			rpc_exit(task, -EIO);		/* fail the operation */
			rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
			goto out;
		}
		if (task->tk_flags & RPC_TASK_SWAPPER)
			nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
		else
			nreq = kmalloc(sizeof *req + size, GFP_NOFS);
		if (nreq == NULL)
			goto outfail;

		if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
				nreq->rl_base, size + sizeof(struct rpcrdma_req)
				- offsetof(struct rpcrdma_req, rl_base),
				&nreq->rl_handle, &nreq->rl_iov)) {
			kfree(nreq);
			goto outfail;
		}
		rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
		nreq->rl_size = size;
		nreq->rl_niovs = 0;
		nreq->rl_nchunks = 0;
		nreq->rl_buffer = (struct rpcrdma_buffer *)req;
		nreq->rl_reply = req->rl_reply;
		memcpy(nreq->rl_segments,
			req->rl_segments, sizeof nreq->rl_segments);
		/* flag the swap with an unused field */
		nreq->rl_iov.length = 0;
		req->rl_reply = NULL;
		req = nreq;
	}
	dprintk("RPC:       %s: size %zd, request 0x%p\n", __func__, size, req);
out:
	req->rl_connect_cookie = 0;	/* our reserved value */
	return req->rl_xdr_buf;

outfail:
	rpcrdma_buffer_put(req);
	rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
	return NULL;
}
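
/*
 * Note on the "hardway" path above: the oversized replacement request
 * is marked by rl_iov.length == 0 and chains back to the original
 * rpcrdma_req through its rl_buffer field. xprt_rdma_free() below keys
 * on the same zero-length flag to deregister and free the replacement,
 * then returns the original request to the buffer pool.
 */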

/*
 * This function returns all RDMA resources to the pool.
 */
static void
xprt_rdma_free(void *buffer)
{
	struct rpcrdma_req *req;
	struct rpcrdma_xprt *r_xprt;
	struct rpcrdma_rep *rep;
	int i;

	if (buffer == NULL)
		return;

	req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
	if (req->rl_iov.length == 0) {	/* see allocate above */
		r_xprt = container_of(((struct rpcrdma_req *) req->rl_buffer)->rl_buffer,
				      struct rpcrdma_xprt, rx_buf);
	} else
		r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
	rep = req->rl_reply;

	dprintk("RPC:       %s: called on 0x%p%s\n",
		__func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");

	/*
	 * Finish the deregistration. When using mw bind, this was
	 * begun in rpcrdma_reply_handler(). In all other modes, we
	 * do it here, in thread context. The process is considered
	 * complete when the rr_func vector becomes NULL - this
	 * was put in place during rpcrdma_reply_handler() - the wait
	 * call below will not block if the dereg is "done". If
	 * interrupted, our framework will clean up.
	 */
	for (i = 0; req->rl_nchunks;) {
		--req->rl_nchunks;
		i += rpcrdma_deregister_external(
			&req->rl_segments[i], r_xprt, NULL);
	}

	if (rep && wait_event_interruptible(rep->rr_unbind, !rep->rr_func)) {
		rep->rr_func = NULL;	/* abandon the callback */
		req->rl_reply = NULL;
	}

	if (req->rl_iov.length == 0) {	/* see allocate above */
		struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
		oreq->rl_reply = req->rl_reply;
		(void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
						   req->rl_handle,
						   &req->rl_iov);
		kfree(req);
		req = oreq;
	}

	/* Put back request+reply buffers */
	rpcrdma_buffer_put(req);
}

/*
 * send_request invokes the meat of RPC RDMA. It must do the following:
 *  1.  Marshal the RPC request into an RPC RDMA request, which means
 *	putting a header in front of data, and creating IOVs for RDMA
 *	from those in the request.
 *  2.  In marshaling, detect opportunities for RDMA, and use them.
 *  3.  Post a recv message to set up async completion, then send
 *	the request (rpcrdma_ep_post).
 *  4.  No partial sends are possible in the RPC-RDMA protocol (as in UDP).
 */

static int
xprt_rdma_send_request(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	/* marshal the send itself */
	if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) {
		r_xprt->rx_stats.failed_marshal_count++;
		dprintk("RPC:       %s: rpcrdma_marshal_req failed\n",
			__func__);
		return -EIO;
	}

	if (req->rl_reply == NULL) 		/* e.g. reconnection */
		rpcrdma_recv_buffer_get(req);

	if (req->rl_reply) {
		req->rl_reply->rr_func = rpcrdma_reply_handler;
		/* this need only be done once, but... */
		req->rl_reply->rr_xprt = xprt;
	}

	/* Must suppress retransmit to maintain credits */
	if (req->rl_connect_cookie == xprt->connect_cookie)
		goto drop_connection;
	req->rl_connect_cookie = xprt->connect_cookie;

	if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
		goto drop_connection;

	task->tk_bytes_sent += rqst->rq_snd_buf.len;
	rqst->rq_bytes_sent = 0;
	return 0;

drop_connection:
	xprt_disconnect_done(xprt);
	return -ENOTCONN;	/* implies disconnect */
}

static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	long idle_time = 0;

	if (xprt_connected(xprt))
		idle_time = (long)(jiffies - xprt->last_used) / HZ;

	seq_printf(seq,
	  "\txprt:\trdma %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu "
	  "%lu %lu %lu %Lu %Lu %Lu %Lu %lu %lu %lu\n",

	   0,	/* need a local port? */
	   xprt->stat.bind_count,
	   xprt->stat.connect_count,
	   xprt->stat.connect_time,
	   idle_time,
	   xprt->stat.sends,
	   xprt->stat.recvs,
	   xprt->stat.bad_xids,
	   xprt->stat.req_u,
	   xprt->stat.bklog_u,

	   r_xprt->rx_stats.read_chunk_count,
	   r_xprt->rx_stats.write_chunk_count,
	   r_xprt->rx_stats.reply_chunk_count,
	   r_xprt->rx_stats.total_rdma_request,
	   r_xprt->rx_stats.total_rdma_reply,
	   r_xprt->rx_stats.pullup_copy_count,
	   r_xprt->rx_stats.fixup_copy_count,
	   r_xprt->rx_stats.hardway_register_count,
	   r_xprt->rx_stats.failed_marshal_count,
	   r_xprt->rx_stats.bad_reply_count);
}
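
/*
 * This output appears as the "xprt:" line of an NFS mount's entry in
 * /proc/self/mountstats. A hypothetical sample (field values invented
 * purely for illustration):
 *
 *	xprt:	rdma 0 2 1 0 137 10021 10019 0 14520 3 ...
 *
 * The first ten fields are the generic rpc_xprt counters that other
 * transports also print; the remaining ten are RDMA-specific chunk,
 * copy, and registration statistics from struct rpcrdma_xprt.
 */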

/*
 * Plumbing for rpc transport switch and kernel module
 */

static struct rpc_xprt_ops xprt_rdma_procs = {
	.reserve_xprt		= xprt_rdma_reserve_xprt,
	.release_xprt		= xprt_release_xprt_cong, /* sunrpc/xprt.c */
	.release_request	= xprt_release_rqst_cong,       /* ditto */
	.set_retrans_timeout	= xprt_set_retrans_timeout_def, /* ditto */
	.rpcbind		= rpcb_getport_async,	/* sunrpc/rpcb_clnt.c */
	.set_port		= xprt_rdma_set_port,
	.connect		= xprt_rdma_connect,
	.buf_alloc		= xprt_rdma_allocate,
	.buf_free		= xprt_rdma_free,
	.send_request		= xprt_rdma_send_request,
	.close			= xprt_rdma_close,
	.destroy		= xprt_rdma_destroy,
	.print_stats		= xprt_rdma_print_stats
};

static struct xprt_class xprt_rdma = {
	.list			= LIST_HEAD_INIT(xprt_rdma.list),
	.name			= "rdma",
	.owner			= THIS_MODULE,
	.ident			= XPRT_TRANSPORT_RDMA,
	.setup			= xprt_setup_rdma,
};
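
/*
 * The transport is selected at mount time by its XPRT_TRANSPORT_RDMA
 * ident, which xprt_create_transport() matches against registered
 * xprt_class entries before calling xprt_setup_rdma() above. A
 * hypothetical invocation via the NFS "proto=rdma" mount option:
 *
 *	mount -t nfs -o proto=rdma,port=20049 server:/export /mnt
 */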

static void __exit xprt_rdma_cleanup(void)
{
	int rc;

	dprintk(KERN_INFO "RPCRDMA Module Removed, deregister RPC RDMA transport\n");
#ifdef RPC_DEBUG
	if (sunrpc_table_header) {
		unregister_sysctl_table(sunrpc_table_header);
		sunrpc_table_header = NULL;
	}
#endif
	rc = xprt_unregister_transport(&xprt_rdma);
	if (rc)
		dprintk("RPC:       %s: xprt_unregister returned %i\n",
			__func__, rc);
}

static int __init xprt_rdma_init(void)
{
	int rc;

	rc = xprt_register_transport(&xprt_rdma);

	if (rc)
		return rc;

	dprintk(KERN_INFO "RPCRDMA Module Init, register RPC RDMA transport\n");

	dprintk(KERN_INFO "Defaults:\n");
	dprintk(KERN_INFO "\tSlots %d\n"
		"\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",
		xprt_rdma_slot_table_entries,
		xprt_rdma_max_inline_read, xprt_rdma_max_inline_write);
	dprintk(KERN_INFO "\tPadding %d\n\tMemreg %d\n",
		xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy);

#ifdef RPC_DEBUG
	if (!sunrpc_table_header)
		sunrpc_table_header = register_sysctl_table(sunrpc_table);
#endif
	return 0;
}

module_init(xprt_rdma_init);
module_exit(xprt_rdma_cleanup);