xref: /openbmc/linux/net/sunrpc/xprtrdma/transport.c (revision a1e58bbd)
/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * transport.c
 *
 * This file contains the top-level implementation of an RPC RDMA
 * transport.
 *
 * Naming convention: functions beginning with xprt_ are part of the
 * transport switch. All others are RPC RDMA internal.
 */

#include <linux/module.h>
#include <linux/init.h>
#include <linux/seq_file.h>

#include "xprt_rdma.h"

#ifdef RPC_DEBUG
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

MODULE_LICENSE("Dual BSD/GPL");

MODULE_DESCRIPTION("RPC/RDMA Transport for Linux kernel NFS");
MODULE_AUTHOR("Network Appliance, Inc.");

/*
 * tunables
 */

static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
static unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
static unsigned int xprt_rdma_inline_write_padding;
#if !RPCRDMA_PERSISTENT_REGISTRATION
static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_REGISTER; /* FMR? */
#else
static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_ALLPHYSICAL;
#endif
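
/*
 * These module-scope defaults are sampled once per transport, in
 * xprt_setup_rdma() below, where they seed the rpcrdma_create_data_internal
 * handed to the IA, endpoint and buffer setup code. Changing a tunable
 * afterwards therefore affects only transports created after the change.
 */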

#ifdef RPC_DEBUG

static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
static unsigned int zero;
static unsigned int max_padding = PAGE_SIZE;
static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
static unsigned int max_memreg = RPCRDMA_LAST - 1;

static struct ctl_table_header *sunrpc_table_header;

static ctl_table xr_tunables_table[] = {
	{
		.ctl_name       = CTL_UNNUMBERED,
		.procname	= "rdma_slot_table_entries",
		.data		= &xprt_rdma_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.ctl_name       = CTL_UNNUMBERED,
		.procname	= "rdma_max_inline_read",
		.data		= &xprt_rdma_max_inline_read,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec,
		.strategy	= &sysctl_intvec,
	},
	{
		.ctl_name       = CTL_UNNUMBERED,
		.procname	= "rdma_max_inline_write",
		.data		= &xprt_rdma_max_inline_write,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec,
		.strategy	= &sysctl_intvec,
	},
	{
		.ctl_name       = CTL_UNNUMBERED,
		.procname	= "rdma_inline_write_padding",
		.data		= &xprt_rdma_inline_write_padding,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &zero,
		.extra2		= &max_padding,
	},
	{
		.ctl_name       = CTL_UNNUMBERED,
		.procname	= "rdma_memreg_strategy",
		.data		= &xprt_rdma_memreg_strategy,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_memreg,
		.extra2		= &max_memreg,
	},
	{
		.ctl_name = 0,
	},
};

static ctl_table sunrpc_table[] = {
	{
		.ctl_name	= CTL_SUNRPC,
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xr_tunables_table
	},
	{
		.ctl_name = 0,
	},
};

#endif
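
/*
 * When RPC_DEBUG is enabled, registering sunrpc_table exposes the tunables
 * above under the existing "sunrpc" sysctl directory, e.g. (illustrative
 * paths, assuming sysctl is reachable through procfs):
 *
 *	/proc/sys/sunrpc/rdma_slot_table_entries
 *	/proc/sys/sunrpc/rdma_memreg_strategy
 *
 * so they can be adjusted at runtime, for example with
 * "sysctl -w sunrpc.rdma_slot_table_entries=<n>".
 */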

static struct rpc_xprt_ops xprt_rdma_procs;	/* forward reference */

static void
xprt_rdma_format_addresses(struct rpc_xprt *xprt)
{
	struct sockaddr_in *addr = (struct sockaddr_in *)
					&rpcx_to_rdmad(xprt).addr;
	char *buf;

	buf = kzalloc(20, GFP_KERNEL);
	if (buf)
		snprintf(buf, 20, NIPQUAD_FMT, NIPQUAD(addr->sin_addr.s_addr));
	xprt->address_strings[RPC_DISPLAY_ADDR] = buf;

	buf = kzalloc(8, GFP_KERNEL);
	if (buf)
		snprintf(buf, 8, "%u", ntohs(addr->sin_port));
	xprt->address_strings[RPC_DISPLAY_PORT] = buf;

	xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";

	buf = kzalloc(48, GFP_KERNEL);
	if (buf)
		snprintf(buf, 48, "addr="NIPQUAD_FMT" port=%u proto=%s",
			NIPQUAD(addr->sin_addr.s_addr),
			ntohs(addr->sin_port), "rdma");
	xprt->address_strings[RPC_DISPLAY_ALL] = buf;

	buf = kzalloc(10, GFP_KERNEL);
	if (buf)
		snprintf(buf, 10, "%02x%02x%02x%02x",
			NIPQUAD(addr->sin_addr.s_addr));
	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;

	buf = kzalloc(8, GFP_KERNEL);
	if (buf)
		snprintf(buf, 8, "%4hx", ntohs(addr->sin_port));
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;

	buf = kzalloc(30, GFP_KERNEL);
	if (buf)
		snprintf(buf, 30, NIPQUAD_FMT".%u.%u",
			NIPQUAD(addr->sin_addr.s_addr),
			ntohs(addr->sin_port) >> 8,
			ntohs(addr->sin_port) & 0xff);
	xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;

	/* netid */
	xprt->address_strings[RPC_DISPLAY_NETID] = "rdma";
}

static void
xprt_rdma_free_addresses(struct rpc_xprt *xprt)
{
	unsigned int i;

	for (i = 0; i < RPC_DISPLAY_MAX; i++)
		switch (i) {
		case RPC_DISPLAY_PROTO:
		case RPC_DISPLAY_NETID:
			continue;
		default:
			kfree(xprt->address_strings[i]);
		}
}
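
/*
 * RPC_DISPLAY_PROTO and RPC_DISPLAY_NETID are skipped above because
 * xprt_rdma_format_addresses() points them at the string literal "rdma"
 * rather than at kmalloc'd memory; every other slot was kzalloc'd (or is
 * NULL if that allocation failed, which kfree() tolerates). Note that only
 * IPv4 (sockaddr_in) presentation addresses are generated here.
 */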

static void
xprt_rdma_connect_worker(struct work_struct *work)
{
	struct rpcrdma_xprt *r_xprt =
		container_of(work, struct rpcrdma_xprt, rdma_connect.work);
	struct rpc_xprt *xprt = &r_xprt->xprt;
	int rc = 0;

	if (!xprt->shutdown) {
		xprt_clear_connected(xprt);

		dprintk("RPC:       %s: %sconnect\n", __func__,
				r_xprt->rx_ep.rep_connected != 0 ? "re" : "");
		rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
		if (rc)
			goto out;
	}
	goto out_clear;

out:
	xprt_wake_pending_tasks(xprt, rc);

out_clear:
	dprintk("RPC:       %s: exit\n", __func__);
	xprt_clear_connecting(xprt);
}
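
/*
 * Worker exit paths: a successful (or skipped) connect jumps straight to
 * out_clear; a failed rpcrdma_ep_connect() first wakes every pending task
 * with the error and then falls through. Either way xprt_clear_connecting()
 * runs, so the transport never stays stuck in the "connecting" state.
 */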

/*
 * xprt_rdma_destroy
 *
 * Destroy the xprt.
 * Free all memory associated with the object, including its own.
 * NOTE: none of the *destroy methods free memory for their top-level
 * objects, even though they may have allocated it (they do free
 * private memory). It's up to the caller to handle it. In this
 * case (RDMA transport), all structure memory is inlined with the
 * struct rpcrdma_xprt.
 */
static void
xprt_rdma_destroy(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	int rc;

	dprintk("RPC:       %s: called\n", __func__);

	cancel_delayed_work(&r_xprt->rdma_connect);
	flush_scheduled_work();

	xprt_clear_connected(xprt);

	rpcrdma_buffer_destroy(&r_xprt->rx_buf);
	rc = rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
	if (rc)
		dprintk("RPC:       %s: rpcrdma_ep_destroy returned %i\n",
			__func__, rc);
	rpcrdma_ia_close(&r_xprt->rx_ia);

	xprt_rdma_free_addresses(xprt);

	kfree(xprt->slot);
	xprt->slot = NULL;
	kfree(xprt);

	dprintk("RPC:       %s: returning\n", __func__);

	module_put(THIS_MODULE);
}

static const struct rpc_timeout xprt_rdma_default_timeout = {
	.to_initval = 60 * HZ,
	.to_maxval = 60 * HZ,
};

/**
 * xprt_setup_rdma - Set up transport to use RDMA
 *
 * @args: rpc transport arguments
 */
static struct rpc_xprt *
xprt_setup_rdma(struct xprt_create *args)
{
	struct rpcrdma_create_data_internal cdata;
	struct rpc_xprt *xprt;
	struct rpcrdma_xprt *new_xprt;
	struct rpcrdma_ep *new_ep;
	struct sockaddr_in *sin;
	int rc;

	if (args->addrlen > sizeof(xprt->addr)) {
		dprintk("RPC:       %s: address too large\n", __func__);
		return ERR_PTR(-EBADF);
	}

	xprt = kzalloc(sizeof(struct rpcrdma_xprt), GFP_KERNEL);
	if (xprt == NULL) {
		dprintk("RPC:       %s: couldn't allocate rpcrdma_xprt\n",
			__func__);
		return ERR_PTR(-ENOMEM);
	}

	xprt->max_reqs = xprt_rdma_slot_table_entries;
	xprt->slot = kcalloc(xprt->max_reqs,
				sizeof(struct rpc_rqst), GFP_KERNEL);
	if (xprt->slot == NULL) {
		dprintk("RPC:       %s: couldn't allocate %d slots\n",
			__func__, xprt->max_reqs);
		kfree(xprt);
		return ERR_PTR(-ENOMEM);
	}

	/* 60 second timeout, no retries */
	xprt->timeout = &xprt_rdma_default_timeout;
	xprt->bind_timeout = (60U * HZ);
	xprt->connect_timeout = (60U * HZ);
	xprt->reestablish_timeout = (5U * HZ);
	xprt->idle_timeout = (5U * 60 * HZ);

	xprt->resvport = 0;		/* privileged port not needed */
	xprt->tsh_size = 0;		/* RPC-RDMA handles framing */
	xprt->max_payload = RPCRDMA_MAX_DATA_SEGS * PAGE_SIZE;
	xprt->ops = &xprt_rdma_procs;

	/*
	 * Set up RDMA-specific connect data.
	 */

	/* Put server RDMA address in local cdata */
	memcpy(&cdata.addr, args->dstaddr, args->addrlen);

	/* Ensure xprt->addr holds valid server TCP (not RDMA)
	 * address, for any side protocols which peek at it */
	xprt->prot = IPPROTO_TCP;
	xprt->addrlen = args->addrlen;
	memcpy(&xprt->addr, &cdata.addr, xprt->addrlen);

	sin = (struct sockaddr_in *)&cdata.addr;
	if (ntohs(sin->sin_port) != 0)
		xprt_set_bound(xprt);

	dprintk("RPC:       %s: %u.%u.%u.%u:%u\n", __func__,
			NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port));

	/* Set max requests */
	cdata.max_requests = xprt->max_reqs;

	/* Set some length limits */
	cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
	cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */

	cdata.inline_wsize = xprt_rdma_max_inline_write;
	if (cdata.inline_wsize > cdata.wsize)
		cdata.inline_wsize = cdata.wsize;

	cdata.inline_rsize = xprt_rdma_max_inline_read;
	if (cdata.inline_rsize > cdata.rsize)
		cdata.inline_rsize = cdata.rsize;

	cdata.padding = xprt_rdma_inline_write_padding;

	/*
	 * Create new transport instance, which includes initialized
	 *  o ia
	 *  o endpoint
	 *  o buffers
	 */

	new_xprt = rpcx_to_rdmax(xprt);

	rc = rpcrdma_ia_open(new_xprt, (struct sockaddr *) &cdata.addr,
				xprt_rdma_memreg_strategy);
	if (rc)
		goto out1;

	/*
	 * initialize and create ep
	 */
	new_xprt->rx_data = cdata;
	new_ep = &new_xprt->rx_ep;
	new_ep->rep_remote_addr = cdata.addr;

	rc = rpcrdma_ep_create(&new_xprt->rx_ep,
				&new_xprt->rx_ia, &new_xprt->rx_data);
	if (rc)
		goto out2;

	/*
	 * Allocate pre-registered send and receive buffers for headers and
	 * any inline data. Also specify any padding which will be provided
	 * from a preregistered zero buffer.
	 */
	rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia,
				&new_xprt->rx_data);
	if (rc)
		goto out3;

	/*
	 * Register a callback for connection events. This is necessary because
	 * connection loss notification is async. We also catch connection loss
	 * when reaping receives.
	 */
	INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker);
	new_ep->rep_func = rpcrdma_conn_func;
	new_ep->rep_xprt = xprt;

	xprt_rdma_format_addresses(xprt);

	if (!try_module_get(THIS_MODULE))
		goto out4;

	return xprt;

out4:
	xprt_rdma_free_addresses(xprt);
	rc = -EINVAL;
out3:
	(void) rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
out2:
	rpcrdma_ia_close(&new_xprt->rx_ia);
out1:
	kfree(xprt->slot);
	kfree(xprt);
	return ERR_PTR(rc);
}
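
/*
 * Setup order above is the IA (the RDMA device association and memory
 * registration strategy), then the endpoint, then the pre-registered buffer
 * pool; the out4..out1 labels unwind in reverse order. Callers reach this
 * routine through the xprt_class registered at the bottom of this file
 * (XPRT_TRANSPORT_RDMA), typically from an NFS mount using proto=rdma.
 */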

/*
 * Close a connection, during shutdown or timeout/reconnect
 */
static void
xprt_rdma_close(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	dprintk("RPC:       %s: closing\n", __func__);
	xprt_disconnect_done(xprt);
	(void) rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia);
}

static void
xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
{
	struct sockaddr_in *sap;

	sap = (struct sockaddr_in *)&xprt->addr;
	sap->sin_port = htons(port);
	sap = (struct sockaddr_in *)&rpcx_to_rdmad(xprt).addr;
	sap->sin_port = htons(port);
	dprintk("RPC:       %s: %u\n", __func__, port);
}

static void
xprt_rdma_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = (struct rpc_xprt *)task->tk_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	if (!xprt_test_and_set_connecting(xprt)) {
		if (r_xprt->rx_ep.rep_connected != 0) {
			/* Reconnect */
			schedule_delayed_work(&r_xprt->rdma_connect,
				xprt->reestablish_timeout);
		} else {
			schedule_delayed_work(&r_xprt->rdma_connect, 0);
			if (!RPC_IS_ASYNC(task))
				flush_scheduled_work();
		}
	}
}
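
/*
 * A transport that has been connected before (rep_connected != 0) delays
 * the reconnect attempt by reestablish_timeout jiffies to avoid hammering a
 * restarting server; a first-time connect is scheduled immediately, and a
 * synchronous task waits for the worker by flushing the shared workqueue.
 */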

static int
xprt_rdma_reserve_xprt(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	int credits = atomic_read(&r_xprt->rx_buf.rb_credits);

	/* == RPC_CWNDSCALE @ init, but *after* setup */
	if (r_xprt->rx_buf.rb_cwndscale == 0UL) {
		r_xprt->rx_buf.rb_cwndscale = xprt->cwnd;
		dprintk("RPC:       %s: cwndscale %lu\n", __func__,
			r_xprt->rx_buf.rb_cwndscale);
		BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0);
	}
	xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale;
	return xprt_reserve_xprt_cong(task);
}
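
/*
 * In effect the congestion window tracks the server's advertised RPC/RDMA
 * credit count: rb_cwndscale captures the per-request congestion unit the
 * first time through (the initial xprt->cwnd), and cwnd is then recomputed
 * as credits * unit, so xprt_reserve_xprt_cong() admits roughly one
 * outstanding request per credit.
 */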

/*
 * The RDMA allocate/free functions need the task structure as a place
 * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
 * sequence. For this reason, the recv buffers are attached to send
 * buffers for portions of the RPC. Note that the RPC layer allocates
 * both send and receive buffers in the same call. We may register
 * the receive buffer portion when using reply chunks.
 */
static void *
xprt_rdma_allocate(struct rpc_task *task, size_t size)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpcrdma_req *req, *nreq;

	req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
	BUG_ON(NULL == req);

	if (size > req->rl_size) {
		dprintk("RPC:       %s: size %zd too large for buffer[%zd]: "
			"prog %d vers %d proc %d\n",
			__func__, size, req->rl_size,
			task->tk_client->cl_prog, task->tk_client->cl_vers,
			task->tk_msg.rpc_proc->p_proc);
		/*
		 * Outgoing length shortage. Our inline write max must have
		 * been configured to perform direct i/o.
		 *
		 * This is therefore a large metadata operation, and the
		 * allocate call was made on the maximum possible message,
		 * e.g. containing long filename(s) or symlink data. In
		 * fact, while these metadata operations *might* carry
		 * large outgoing payloads, they rarely *do*. However, we
		 * have to commit to the request here, so reallocate and
		 * register it now. The data path will never require this
		 * reallocation.
		 *
		 * If the allocation or registration fails, the RPC framework
		 * will (doggedly) retry.
		 */
		if (rpcx_to_rdmax(xprt)->rx_ia.ri_memreg_strategy ==
				RPCRDMA_BOUNCEBUFFERS) {
			/* forced to "pure inline" */
			dprintk("RPC:       %s: too much data (%zd) for inline "
					"(r/w max %d/%d)\n", __func__, size,
					rpcx_to_rdmad(xprt).inline_rsize,
					rpcx_to_rdmad(xprt).inline_wsize);
			size = req->rl_size;
			rpc_exit(task, -EIO);		/* fail the operation */
			rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
			goto out;
		}
		if (task->tk_flags & RPC_TASK_SWAPPER)
			nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
		else
			nreq = kmalloc(sizeof *req + size, GFP_NOFS);
		if (nreq == NULL)
			goto outfail;

		if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
				nreq->rl_base, size + sizeof(struct rpcrdma_req)
				- offsetof(struct rpcrdma_req, rl_base),
				&nreq->rl_handle, &nreq->rl_iov)) {
			kfree(nreq);
			goto outfail;
		}
		rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
		nreq->rl_size = size;
		nreq->rl_niovs = 0;
		nreq->rl_nchunks = 0;
		nreq->rl_buffer = (struct rpcrdma_buffer *)req;
		nreq->rl_reply = req->rl_reply;
		memcpy(nreq->rl_segments,
			req->rl_segments, sizeof nreq->rl_segments);
		/* flag the swap with an unused field */
		nreq->rl_iov.length = 0;
		req->rl_reply = NULL;
		req = nreq;
	}
	dprintk("RPC:       %s: size %zd, request 0x%p\n", __func__, size, req);
out:
	return req->rl_xdr_buf;

outfail:
	rpcrdma_buffer_put(req);
	rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
	return NULL;
}
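
/*
 * The pointer handed back above is req->rl_xdr_buf, so xprt_rdma_free() can
 * recover the owning request with
 *
 *	req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
 *
 * For the oversized ("hardway") case, the original pool req is stashed in
 * nreq->rl_buffer and the swap is flagged by zeroing nreq->rl_iov.length;
 * xprt_rdma_free() checks that field to decide whether it must deregister
 * and kfree() the substitute before returning the pool req.
 */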

/*
 * This function returns all RDMA resources to the pool.
 */
static void
xprt_rdma_free(void *buffer)
{
	struct rpcrdma_req *req;
	struct rpcrdma_xprt *r_xprt;
	struct rpcrdma_rep *rep;
	int i;

	if (buffer == NULL)
		return;

	req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
	if (req->rl_iov.length == 0) {	/* see allocate above */
		r_xprt = container_of(((struct rpcrdma_req *) req->rl_buffer)->rl_buffer,
				      struct rpcrdma_xprt, rx_buf);
	} else
		r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
	rep = req->rl_reply;

	dprintk("RPC:       %s: called on 0x%p%s\n",
		__func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");

	/*
	 * Finish the deregistration. When using mw bind, this was
	 * begun in rpcrdma_reply_handler(). In all other modes, we
	 * do it here, in thread context. The process is considered
	 * complete when the rr_func vector becomes NULL - this
	 * was put in place during rpcrdma_reply_handler() - the wait
	 * call below will not block if the dereg is "done". If
	 * interrupted, our framework will clean up.
	 */
	for (i = 0; req->rl_nchunks;) {
		--req->rl_nchunks;
		i += rpcrdma_deregister_external(
			&req->rl_segments[i], r_xprt, NULL);
	}

	if (rep && wait_event_interruptible(rep->rr_unbind, !rep->rr_func)) {
		rep->rr_func = NULL;	/* abandon the callback */
		req->rl_reply = NULL;
	}

	if (req->rl_iov.length == 0) {	/* see allocate above */
		struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
		oreq->rl_reply = req->rl_reply;
		(void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
						   req->rl_handle,
						   &req->rl_iov);
		kfree(req);
		req = oreq;
	}

	/* Put back request+reply buffers */
	rpcrdma_buffer_put(req);
}

/*
 * send_request invokes the meat of RPC RDMA. It must do the following:
 *  1.  Marshal the RPC request into an RPC RDMA request, which means
 *	putting a header in front of data, and creating IOVs for RDMA
 *	from those in the request.
 *  2.  In marshaling, detect opportunities for RDMA, and use them.
 *  3.  Post a recv message to set up async completion, then send
 *	the request (rpcrdma_ep_post).
 *  4.  No partial sends are possible in the RPC-RDMA protocol (as in UDP).
 */

static int
xprt_rdma_send_request(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	/* marshal the send itself */
	if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) {
		r_xprt->rx_stats.failed_marshal_count++;
		dprintk("RPC:       %s: rpcrdma_marshal_req failed\n",
			__func__);
		return -EIO;
	}

	if (req->rl_reply == NULL)		/* e.g. reconnection */
		rpcrdma_recv_buffer_get(req);

	if (req->rl_reply) {
		req->rl_reply->rr_func = rpcrdma_reply_handler;
		/* this need only be done once, but... */
		req->rl_reply->rr_xprt = xprt;
	}

	if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) {
		xprt_disconnect_done(xprt);
		return -ENOTCONN;	/* implies disconnect */
	}

	rqst->rq_bytes_sent = 0;
	return 0;
}
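
/*
 * rq_bytes_sent is reset to zero on success because RPC/RDMA, like UDP,
 * never transmits a request partially (item 4 in the comment above). On a
 * failed post the transport is marked disconnected and -ENOTCONN tells the
 * generic RPC code that the connection must be re-established before the
 * request can be retried.
 */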

static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	long idle_time = 0;

	if (xprt_connected(xprt))
		idle_time = (long)(jiffies - xprt->last_used) / HZ;

	seq_printf(seq,
	  "\txprt:\trdma %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu "
	  "%lu %lu %lu %Lu %Lu %Lu %Lu %lu %lu %lu\n",

	   0,	/* need a local port? */
	   xprt->stat.bind_count,
	   xprt->stat.connect_count,
	   xprt->stat.connect_time,
	   idle_time,
	   xprt->stat.sends,
	   xprt->stat.recvs,
	   xprt->stat.bad_xids,
	   xprt->stat.req_u,
	   xprt->stat.bklog_u,

	   r_xprt->rx_stats.read_chunk_count,
	   r_xprt->rx_stats.write_chunk_count,
	   r_xprt->rx_stats.reply_chunk_count,
	   r_xprt->rx_stats.total_rdma_request,
	   r_xprt->rx_stats.total_rdma_reply,
	   r_xprt->rx_stats.pullup_copy_count,
	   r_xprt->rx_stats.fixup_copy_count,
	   r_xprt->rx_stats.hardway_register_count,
	   r_xprt->rx_stats.failed_marshal_count,
	   r_xprt->rx_stats.bad_reply_count);
}
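
/*
 * The "xprt: rdma" line begins with a placeholder 0 for the local port and
 * the generic transport counters (binds, connects, connect time, idle time,
 * sends, receives, bad XIDs, request and backlog utilization), followed by
 * the ten RDMA-specific counters from rx_stats. It is emitted through the
 * RPC iostats machinery, so it typically appears in per-mount statistics
 * such as /proc/self/mountstats for NFS.
 */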

/*
 * Plumbing for rpc transport switch and kernel module
 */

static struct rpc_xprt_ops xprt_rdma_procs = {
	.reserve_xprt		= xprt_rdma_reserve_xprt,
	.release_xprt		= xprt_release_xprt_cong, /* sunrpc/xprt.c */
	.release_request	= xprt_release_rqst_cong,       /* ditto */
	.set_retrans_timeout	= xprt_set_retrans_timeout_def, /* ditto */
	.rpcbind		= rpcb_getport_async,	/* sunrpc/rpcb_clnt.c */
	.set_port		= xprt_rdma_set_port,
	.connect		= xprt_rdma_connect,
	.buf_alloc		= xprt_rdma_allocate,
	.buf_free		= xprt_rdma_free,
	.send_request		= xprt_rdma_send_request,
	.close			= xprt_rdma_close,
	.destroy		= xprt_rdma_destroy,
	.print_stats		= xprt_rdma_print_stats
};
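
/*
 * Only the RDMA-specific entry points are implemented here; release_xprt,
 * release_request and set_retrans_timeout reuse the congestion-controlled
 * defaults from net/sunrpc/xprt.c, and rpcbind uses the shared
 * rpcb_getport_async. The transport is therefore throttled the way UDP is,
 * via the credit-driven congestion window set up in xprt_rdma_reserve_xprt(),
 * rather than stream-style like TCP.
 */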

static struct xprt_class xprt_rdma = {
	.list			= LIST_HEAD_INIT(xprt_rdma.list),
	.name			= "rdma",
	.owner			= THIS_MODULE,
	.ident			= XPRT_TRANSPORT_RDMA,
	.setup			= xprt_setup_rdma,
};

static void __exit xprt_rdma_cleanup(void)
{
	int rc;

	dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");
#ifdef RPC_DEBUG
	if (sunrpc_table_header) {
		unregister_sysctl_table(sunrpc_table_header);
		sunrpc_table_header = NULL;
	}
#endif
	rc = xprt_unregister_transport(&xprt_rdma);
	if (rc)
		dprintk("RPC:       %s: xprt_unregister returned %i\n",
			__func__, rc);
}

static int __init xprt_rdma_init(void)
{
	int rc;

	rc = xprt_register_transport(&xprt_rdma);

	if (rc)
		return rc;

	dprintk(KERN_INFO "RPCRDMA Module Init, register RPC RDMA transport\n");

	dprintk(KERN_INFO "Defaults:\n");
	dprintk(KERN_INFO "\tSlots %d\n"
		"\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",
		xprt_rdma_slot_table_entries,
		xprt_rdma_max_inline_read, xprt_rdma_max_inline_write);
	dprintk(KERN_INFO "\tPadding %d\n\tMemreg %d\n",
		xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy);

#ifdef RPC_DEBUG
	if (!sunrpc_table_header)
		sunrpc_table_header = register_sysctl_table(sunrpc_table);
#endif
	return 0;
}

module_init(xprt_rdma_init);
module_exit(xprt_rdma_cleanup);