xref: /openbmc/linux/net/sunrpc/xprtrdma/verbs.c (revision d5e7cafd)
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39 
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49 
50 #include <linux/interrupt.h>
51 #include <linux/slab.h>
52 #include <linux/prefetch.h>
53 #include <asm/bitops.h>
54 
55 #include "xprt_rdma.h"
56 
57 /*
58  * Globals/Macros
59  */
60 
61 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
62 # define RPCDBG_FACILITY	RPCDBG_TRANS
63 #endif
64 
65 static void rpcrdma_reset_frmrs(struct rpcrdma_ia *);
66 static void rpcrdma_reset_fmrs(struct rpcrdma_ia *);
67 
68 /*
69  * internal functions
70  */
71 
72 /*
73  * Handle replies in tasklet context, using a single, global list.
74  * The rdma tasklet function simply turns around and calls rr_func
75  * for each reply on the list.
76  */
77 
78 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
79 static LIST_HEAD(rpcrdma_tasklets_g);
80 
81 static void
82 rpcrdma_run_tasklet(unsigned long data)
83 {
84 	struct rpcrdma_rep *rep;
85 	void (*func)(struct rpcrdma_rep *);
86 	unsigned long flags;
87 
88 	data = data;
89 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
90 	while (!list_empty(&rpcrdma_tasklets_g)) {
91 		rep = list_entry(rpcrdma_tasklets_g.next,
92 				 struct rpcrdma_rep, rr_list);
93 		list_del(&rep->rr_list);
94 		func = rep->rr_func;
95 		rep->rr_func = NULL;
96 		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
97 
98 		if (func)
99 			func(rep);
100 		else
101 			rpcrdma_recv_buffer_put(rep);
102 
103 		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
104 	}
105 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
106 }
107 
108 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
109 
110 static const char * const async_event[] = {
111 	"CQ error",
112 	"QP fatal error",
113 	"QP request error",
114 	"QP access error",
115 	"communication established",
116 	"send queue drained",
117 	"path migration successful",
118 	"path mig error",
119 	"device fatal error",
120 	"port active",
121 	"port error",
122 	"LID change",
123 	"P_key change",
124 	"SM change",
125 	"SRQ error",
126 	"SRQ limit reached",
127 	"last WQE reached",
128 	"client reregister",
129 	"GID change",
130 };
131 
132 #define ASYNC_MSG(status)					\
133 	((status) < ARRAY_SIZE(async_event) ?			\
134 		async_event[(status)] : "unknown async error")
135 
136 static void
137 rpcrdma_schedule_tasklet(struct list_head *sched_list)
138 {
139 	unsigned long flags;
140 
141 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
142 	list_splice_tail(sched_list, &rpcrdma_tasklets_g);
143 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
144 	tasklet_schedule(&rpcrdma_tasklet_g);
145 }
146 
147 static void
148 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
149 {
150 	struct rpcrdma_ep *ep = context;
151 
152 	pr_err("RPC:       %s: %s on device %s ep %p\n",
153 	       __func__, ASYNC_MSG(event->event),
154 		event->device->name, context);
155 	if (ep->rep_connected == 1) {
156 		ep->rep_connected = -EIO;
157 		rpcrdma_conn_func(ep);
158 		wake_up_all(&ep->rep_connect_wait);
159 	}
160 }
161 
162 static void
163 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
164 {
165 	struct rpcrdma_ep *ep = context;
166 
167 	pr_err("RPC:       %s: %s on device %s ep %p\n",
168 	       __func__, ASYNC_MSG(event->event),
169 		event->device->name, context);
170 	if (ep->rep_connected == 1) {
171 		ep->rep_connected = -EIO;
172 		rpcrdma_conn_func(ep);
173 		wake_up_all(&ep->rep_connect_wait);
174 	}
175 }
176 
177 static const char * const wc_status[] = {
178 	"success",
179 	"local length error",
180 	"local QP operation error",
181 	"local EE context operation error",
182 	"local protection error",
183 	"WR flushed",
184 	"memory management operation error",
185 	"bad response error",
186 	"local access error",
187 	"remote invalid request error",
188 	"remote access error",
189 	"remote operation error",
190 	"transport retry counter exceeded",
191 	"RNR retry counter exceeded",
192 	"local RDD violation error",
193 	"remote invalid RD request",
194 	"operation aborted",
195 	"invalid EE context number",
196 	"invalid EE context state",
197 	"fatal error",
198 	"response timeout error",
199 	"general error",
200 };
201 
202 #define COMPLETION_MSG(status)					\
203 	((status) < ARRAY_SIZE(wc_status) ?			\
204 		wc_status[(status)] : "unexpected completion error")
205 
206 static void
207 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
208 {
209 	if (likely(wc->status == IB_WC_SUCCESS))
210 		return;
211 
212 	/* WARNING: Only wr_id and status are reliable at this point */
213 	if (wc->wr_id == 0ULL) {
214 		if (wc->status != IB_WC_WR_FLUSH_ERR)
215 			pr_err("RPC:       %s: SEND: %s\n",
216 			       __func__, COMPLETION_MSG(wc->status));
217 	} else {
218 		struct rpcrdma_mw *r;
219 
220 		r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
221 		r->r.frmr.fr_state = FRMR_IS_STALE;
222 		pr_err("RPC:       %s: frmr %p (stale): %s\n",
223 		       __func__, r, COMPLETION_MSG(wc->status));
224 	}
225 }
226 
227 static int
228 rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
229 {
230 	struct ib_wc *wcs;
231 	int budget, count, rc;
232 
233 	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
234 	do {
235 		wcs = ep->rep_send_wcs;
236 
237 		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
238 		if (rc <= 0)
239 			return rc;
240 
241 		count = rc;
242 		while (count-- > 0)
243 			rpcrdma_sendcq_process_wc(wcs++);
244 	} while (rc == RPCRDMA_POLLSIZE && --budget);
245 	return 0;
246 }
247 
248 /*
249  * Handle send, fast_reg_mr, and local_inv completions.
250  *
251  * Send events are typically suppressed and thus do not result
252  * in an upcall. Occasionally one is signaled, however. This
253  * prevents the provider's completion queue from wrapping and
254  * losing a completion.
255  */
256 static void
257 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
258 {
259 	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
260 	int rc;
261 
262 	rc = rpcrdma_sendcq_poll(cq, ep);
263 	if (rc) {
264 		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
265 			__func__, rc);
266 		return;
267 	}
268 
269 	rc = ib_req_notify_cq(cq,
270 			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
271 	if (rc == 0)
272 		return;
273 	if (rc < 0) {
274 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
275 			__func__, rc);
276 		return;
277 	}
278 
279 	rpcrdma_sendcq_poll(cq, ep);
280 }
281 
282 static void
283 rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
284 {
285 	struct rpcrdma_rep *rep =
286 			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
287 
288 	/* WARNING: Only wr_id and status are reliable at this point */
289 	if (wc->status != IB_WC_SUCCESS)
290 		goto out_fail;
291 
292 	/* status == SUCCESS means all fields in wc are trustworthy */
293 	if (wc->opcode != IB_WC_RECV)
294 		return;
295 
296 	dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
297 		__func__, rep, wc->byte_len);
298 
299 	rep->rr_len = wc->byte_len;
300 	ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
301 				   rdmab_addr(rep->rr_rdmabuf),
302 				   rep->rr_len, DMA_FROM_DEVICE);
303 	prefetch(rdmab_to_msg(rep->rr_rdmabuf));
304 
305 out_schedule:
306 	list_add_tail(&rep->rr_list, sched_list);
307 	return;
308 out_fail:
309 	if (wc->status != IB_WC_WR_FLUSH_ERR)
310 		pr_err("RPC:       %s: rep %p: %s\n",
311 		       __func__, rep, COMPLETION_MSG(wc->status));
312 	rep->rr_len = ~0U;
313 	goto out_schedule;
314 }
315 
316 static int
317 rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
318 {
319 	struct list_head sched_list;
320 	struct ib_wc *wcs;
321 	int budget, count, rc;
322 
323 	INIT_LIST_HEAD(&sched_list);
324 	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
325 	do {
326 		wcs = ep->rep_recv_wcs;
327 
328 		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
329 		if (rc <= 0)
330 			goto out_schedule;
331 
332 		count = rc;
333 		while (count-- > 0)
334 			rpcrdma_recvcq_process_wc(wcs++, &sched_list);
335 	} while (rc == RPCRDMA_POLLSIZE && --budget);
336 	rc = 0;
337 
338 out_schedule:
339 	rpcrdma_schedule_tasklet(&sched_list);
340 	return rc;
341 }
342 
343 /*
344  * Handle receive completions.
345  *
346  * It is reentrant, but processes events one at a time in order to
347  * maintain the ordering of receives on which server credits depend.
348  *
349  * It is the responsibility of the scheduled tasklet to return
350  * recv buffers to the pool. NOTE: this affects synchronization of
351  * connection shutdown. That is, the structures required for
352  * the completion of the reply handler must remain intact until
353  * all memory has been reclaimed.
354  */
355 static void
356 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
357 {
358 	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
359 	int rc;
360 
361 	rc = rpcrdma_recvcq_poll(cq, ep);
362 	if (rc) {
363 		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
364 			__func__, rc);
365 		return;
366 	}
367 
368 	rc = ib_req_notify_cq(cq,
369 			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
370 	if (rc == 0)
371 		return;
372 	if (rc < 0) {
373 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
374 			__func__, rc);
375 		return;
376 	}
377 
378 	rpcrdma_recvcq_poll(cq, ep);
379 }
380 
381 static void
382 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
383 {
384 	struct ib_wc wc;
385 	LIST_HEAD(sched_list);
386 
387 	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
388 		rpcrdma_recvcq_process_wc(&wc, &sched_list);
389 	if (!list_empty(&sched_list))
390 		rpcrdma_schedule_tasklet(&sched_list);
391 	while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
392 		rpcrdma_sendcq_process_wc(&wc);
393 }
394 
395 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
396 static const char * const conn[] = {
397 	"address resolved",
398 	"address error",
399 	"route resolved",
400 	"route error",
401 	"connect request",
402 	"connect response",
403 	"connect error",
404 	"unreachable",
405 	"rejected",
406 	"established",
407 	"disconnected",
408 	"device removal",
409 	"multicast join",
410 	"multicast error",
411 	"address change",
412 	"timewait exit",
413 };
414 
415 #define CONNECTION_MSG(status)						\
416 	((status) < ARRAY_SIZE(conn) ?					\
417 		conn[(status)] : "unrecognized connection error")
418 #endif
419 
420 static int
421 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
422 {
423 	struct rpcrdma_xprt *xprt = id->context;
424 	struct rpcrdma_ia *ia = &xprt->rx_ia;
425 	struct rpcrdma_ep *ep = &xprt->rx_ep;
426 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
427 	struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
428 #endif
429 	struct ib_qp_attr *attr = &ia->ri_qp_attr;
430 	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
431 	int connstate = 0;
432 
433 	switch (event->event) {
434 	case RDMA_CM_EVENT_ADDR_RESOLVED:
435 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
436 		ia->ri_async_rc = 0;
437 		complete(&ia->ri_done);
438 		break;
439 	case RDMA_CM_EVENT_ADDR_ERROR:
440 		ia->ri_async_rc = -EHOSTUNREACH;
441 		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
442 			__func__, ep);
443 		complete(&ia->ri_done);
444 		break;
445 	case RDMA_CM_EVENT_ROUTE_ERROR:
446 		ia->ri_async_rc = -ENETUNREACH;
447 		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
448 			__func__, ep);
449 		complete(&ia->ri_done);
450 		break;
451 	case RDMA_CM_EVENT_ESTABLISHED:
452 		connstate = 1;
453 		ib_query_qp(ia->ri_id->qp, attr,
454 			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
455 			    iattr);
456 		dprintk("RPC:       %s: %d responder resources"
457 			" (%d initiator)\n",
458 			__func__, attr->max_dest_rd_atomic,
459 			attr->max_rd_atomic);
460 		goto connected;
461 	case RDMA_CM_EVENT_CONNECT_ERROR:
462 		connstate = -ENOTCONN;
463 		goto connected;
464 	case RDMA_CM_EVENT_UNREACHABLE:
465 		connstate = -ENETDOWN;
466 		goto connected;
467 	case RDMA_CM_EVENT_REJECTED:
468 		connstate = -ECONNREFUSED;
469 		goto connected;
470 	case RDMA_CM_EVENT_DISCONNECTED:
471 		connstate = -ECONNABORTED;
472 		goto connected;
473 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
474 		connstate = -ENODEV;
475 connected:
476 		dprintk("RPC:       %s: %sconnected\n",
477 					__func__, connstate > 0 ? "" : "dis");
478 		ep->rep_connected = connstate;
479 		rpcrdma_conn_func(ep);
480 		wake_up_all(&ep->rep_connect_wait);
481 		/*FALLTHROUGH*/
482 	default:
483 		dprintk("RPC:       %s: %pI4:%u (ep 0x%p): %s\n",
484 			__func__, &addr->sin_addr.s_addr,
485 			ntohs(addr->sin_port), ep,
486 			CONNECTION_MSG(event->event));
487 		break;
488 	}
489 
490 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
491 	if (connstate == 1) {
492 		int ird = attr->max_dest_rd_atomic;
493 		int tird = ep->rep_remote_cma.responder_resources;
494 		printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
495 			"on %s, memreg %d slots %d ird %d%s\n",
496 			&addr->sin_addr.s_addr,
497 			ntohs(addr->sin_port),
498 			ia->ri_id->device->name,
499 			ia->ri_memreg_strategy,
500 			xprt->rx_buf.rb_max_requests,
501 			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
502 	} else if (connstate < 0) {
503 		printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
504 			&addr->sin_addr.s_addr,
505 			ntohs(addr->sin_port),
506 			connstate);
507 	}
508 #endif
509 
510 	return 0;
511 }
512 
513 static struct rdma_cm_id *
514 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
515 			struct rpcrdma_ia *ia, struct sockaddr *addr)
516 {
517 	struct rdma_cm_id *id;
518 	int rc;
519 
520 	init_completion(&ia->ri_done);
521 
522 	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
523 	if (IS_ERR(id)) {
524 		rc = PTR_ERR(id);
525 		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
526 			__func__, rc);
527 		return id;
528 	}
529 
530 	ia->ri_async_rc = -ETIMEDOUT;
531 	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
532 	if (rc) {
533 		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
534 			__func__, rc);
535 		goto out;
536 	}
537 	wait_for_completion_interruptible_timeout(&ia->ri_done,
538 				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
539 	rc = ia->ri_async_rc;
540 	if (rc)
541 		goto out;
542 
543 	ia->ri_async_rc = -ETIMEDOUT;
544 	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
545 	if (rc) {
546 		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
547 			__func__, rc);
548 		goto out;
549 	}
550 	wait_for_completion_interruptible_timeout(&ia->ri_done,
551 				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
552 	rc = ia->ri_async_rc;
553 	if (rc)
554 		goto out;
555 
556 	return id;
557 
558 out:
559 	rdma_destroy_id(id);
560 	return ERR_PTR(rc);
561 }
562 
563 /*
564  * Drain any cq, prior to teardown.
565  */
566 static void
567 rpcrdma_clean_cq(struct ib_cq *cq)
568 {
569 	struct ib_wc wc;
570 	int count = 0;
571 
572 	while (1 == ib_poll_cq(cq, 1, &wc))
573 		++count;
574 
575 	if (count)
576 		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
577 			__func__, count, wc.opcode);
578 }
579 
580 /*
581  * Exported functions.
582  */
583 
584 /*
585  * Open and initialize an Interface Adapter.
586  *  o initializes fields of struct rpcrdma_ia, including
587  *    interface and provider attributes and protection zone.
588  */
589 int
590 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
591 {
592 	int rc, mem_priv;
593 	struct rpcrdma_ia *ia = &xprt->rx_ia;
594 	struct ib_device_attr *devattr = &ia->ri_devattr;
595 
596 	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
597 	if (IS_ERR(ia->ri_id)) {
598 		rc = PTR_ERR(ia->ri_id);
599 		goto out1;
600 	}
601 
602 	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
603 	if (IS_ERR(ia->ri_pd)) {
604 		rc = PTR_ERR(ia->ri_pd);
605 		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
606 			__func__, rc);
607 		goto out2;
608 	}
609 
610 	rc = ib_query_device(ia->ri_id->device, devattr);
611 	if (rc) {
612 		dprintk("RPC:       %s: ib_query_device failed %d\n",
613 			__func__, rc);
614 		goto out3;
615 	}
616 
617 	if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
618 		ia->ri_have_dma_lkey = 1;
619 		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
620 	}
621 
622 	if (memreg == RPCRDMA_FRMR) {
623 		/* Requires both frmr reg and local dma lkey */
624 		if ((devattr->device_cap_flags &
625 		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
626 		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
627 			dprintk("RPC:       %s: FRMR registration "
628 				"not supported by HCA\n", __func__);
629 			memreg = RPCRDMA_MTHCAFMR;
630 		} else {
631 			/* Mind the ia limit on FRMR page list depth */
632 			ia->ri_max_frmr_depth = min_t(unsigned int,
633 				RPCRDMA_MAX_DATA_SEGS,
634 				devattr->max_fast_reg_page_list_len);
635 		}
636 	}
637 	if (memreg == RPCRDMA_MTHCAFMR) {
638 		if (!ia->ri_id->device->alloc_fmr) {
639 			dprintk("RPC:       %s: MTHCAFMR registration "
640 				"not supported by HCA\n", __func__);
641 			memreg = RPCRDMA_ALLPHYSICAL;
642 		}
643 	}
644 
645 	/*
646 	 * Optionally obtain an underlying physical identity mapping in
647 	 * order to do a memory window-based bind. This base registration
648 	 * is protected from remote access - that is enabled only by binding
649 	 * for the specific bytes targeted during each RPC operation, and
650 	 * revoked after the corresponding completion similar to a storage
651 	 * adapter.
652 	 */
653 	switch (memreg) {
654 	case RPCRDMA_FRMR:
655 		break;
656 	case RPCRDMA_ALLPHYSICAL:
657 		mem_priv = IB_ACCESS_LOCAL_WRITE |
658 				IB_ACCESS_REMOTE_WRITE |
659 				IB_ACCESS_REMOTE_READ;
660 		goto register_setup;
661 	case RPCRDMA_MTHCAFMR:
662 		if (ia->ri_have_dma_lkey)
663 			break;
664 		mem_priv = IB_ACCESS_LOCAL_WRITE;
665 	register_setup:
666 		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
667 		if (IS_ERR(ia->ri_bind_mem)) {
668 			printk(KERN_ALERT "%s: ib_get_dma_mr for "
669 				"phys register failed with %lX\n",
670 				__func__, PTR_ERR(ia->ri_bind_mem));
671 			rc = -ENOMEM;
672 			goto out3;
673 		}
674 		break;
675 	default:
676 		printk(KERN_ERR "RPC: Unsupported memory "
677 				"registration mode: %d\n", memreg);
678 		rc = -ENOMEM;
679 		goto out3;
680 	}
681 	dprintk("RPC:       %s: memory registration strategy is %d\n",
682 		__func__, memreg);
683 
684 	/* Else will do memory reg/dereg for each chunk */
685 	ia->ri_memreg_strategy = memreg;
686 
687 	rwlock_init(&ia->ri_qplock);
688 	return 0;
689 
690 out3:
691 	ib_dealloc_pd(ia->ri_pd);
692 	ia->ri_pd = NULL;
693 out2:
694 	rdma_destroy_id(ia->ri_id);
695 	ia->ri_id = NULL;
696 out1:
697 	return rc;
698 }
699 
700 /*
701  * Clean up/close an IA.
702  *   o if event handles and PD have been initialized, free them.
703  *   o close the IA
704  */
705 void
706 rpcrdma_ia_close(struct rpcrdma_ia *ia)
707 {
708 	int rc;
709 
710 	dprintk("RPC:       %s: entering\n", __func__);
711 	if (ia->ri_bind_mem != NULL) {
712 		rc = ib_dereg_mr(ia->ri_bind_mem);
713 		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
714 			__func__, rc);
715 	}
716 	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
717 		if (ia->ri_id->qp)
718 			rdma_destroy_qp(ia->ri_id);
719 		rdma_destroy_id(ia->ri_id);
720 		ia->ri_id = NULL;
721 	}
722 	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
723 		rc = ib_dealloc_pd(ia->ri_pd);
724 		dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
725 			__func__, rc);
726 	}
727 }
728 
729 /*
730  * Create unconnected endpoint.
731  */
732 int
733 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
734 				struct rpcrdma_create_data_internal *cdata)
735 {
736 	struct ib_device_attr *devattr = &ia->ri_devattr;
737 	struct ib_cq *sendcq, *recvcq;
738 	int rc, err;
739 
740 	/* check provider's send/recv wr limits */
741 	if (cdata->max_requests > devattr->max_qp_wr)
742 		cdata->max_requests = devattr->max_qp_wr;
743 
744 	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
745 	ep->rep_attr.qp_context = ep;
746 	/* send_cq and recv_cq initialized below */
747 	ep->rep_attr.srq = NULL;
748 	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
749 	switch (ia->ri_memreg_strategy) {
750 	case RPCRDMA_FRMR: {
751 		int depth = 7;
752 
753 		/* Add room for frmr register and invalidate WRs.
754 		 * 1. FRMR reg WR for head
755 		 * 2. FRMR invalidate WR for head
756 		 * 3. N FRMR reg WRs for pagelist
757 		 * 4. N FRMR invalidate WRs for pagelist
758 		 * 5. FRMR reg WR for tail
759 		 * 6. FRMR invalidate WR for tail
760 		 * 7. The RDMA_SEND WR
761 		 */
762 
763 		/* Calculate N if the device max FRMR depth is smaller than
764 		 * RPCRDMA_MAX_DATA_SEGS.
765 		 */
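		/* For illustration only (the real values come from xprt_rdma.h
		 * and the device attributes): if RPCRDMA_MAX_DATA_SEGS were 64
		 * and ri_max_frmr_depth were 16, delta would start at 48 and
		 * the loop below would run three times:
		 *
		 *	depth:  7 -> 9 -> 11 -> 13    delta: 48 -> 32 -> 16 -> 0
		 *
		 * so each RPC would consume up to 13 send queue entries, and
		 * max_send_wr is scaled by that factor just below.
		 */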
766 		if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
767 			int delta = RPCRDMA_MAX_DATA_SEGS -
768 				    ia->ri_max_frmr_depth;
769 
770 			do {
771 				depth += 2; /* FRMR reg + invalidate */
772 				delta -= ia->ri_max_frmr_depth;
773 			} while (delta > 0);
774 
775 		}
776 		ep->rep_attr.cap.max_send_wr *= depth;
777 		if (ep->rep_attr.cap.max_send_wr > devattr->max_qp_wr) {
778 			cdata->max_requests = devattr->max_qp_wr / depth;
779 			if (!cdata->max_requests)
780 				return -EINVAL;
781 			ep->rep_attr.cap.max_send_wr = cdata->max_requests *
782 						       depth;
783 		}
784 		break;
785 	}
786 	default:
787 		break;
788 	}
789 	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
790 	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
791 	ep->rep_attr.cap.max_recv_sge = 1;
792 	ep->rep_attr.cap.max_inline_data = 0;
793 	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
794 	ep->rep_attr.qp_type = IB_QPT_RC;
795 	ep->rep_attr.port_num = ~0;
796 
797 	if (cdata->padding) {
798 		ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
799 						      GFP_KERNEL);
800 		if (IS_ERR(ep->rep_padbuf))
801 			return PTR_ERR(ep->rep_padbuf);
802 	} else
803 		ep->rep_padbuf = NULL;
804 
805 	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
806 		"iovs: send %d recv %d\n",
807 		__func__,
808 		ep->rep_attr.cap.max_send_wr,
809 		ep->rep_attr.cap.max_recv_wr,
810 		ep->rep_attr.cap.max_send_sge,
811 		ep->rep_attr.cap.max_recv_sge);
812 
813 	/* set trigger for requesting send completion */
814 	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
815 	if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
816 		ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
817 	else if (ep->rep_cqinit <= 2)
818 		ep->rep_cqinit = 0;
819 	INIT_CQCOUNT(ep);
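	/* A worked example with illustrative numbers: if max_send_wr were
	 * 512, the computed threshold is 255, which is then clamped to
	 * RPCRDMA_MAX_UNSIGNALED_SENDS; with max_send_wr below 8 it collapses
	 * to zero and every send is signaled. The INIT_CQCOUNT/DECR_CQCOUNT
	 * macros (defined in xprt_rdma.h, and used below when posting
	 * FAST_REG_MR and LOCAL_INV work requests) count posted sends down
	 * from this threshold so that only an occasional send is signaled,
	 * keeping the send CQ from overflowing as described above
	 * rpcrdma_sendcq_upcall().
	 */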
820 	init_waitqueue_head(&ep->rep_connect_wait);
821 	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
822 
823 	sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
824 				  rpcrdma_cq_async_error_upcall, ep,
825 				  ep->rep_attr.cap.max_send_wr + 1, 0);
826 	if (IS_ERR(sendcq)) {
827 		rc = PTR_ERR(sendcq);
828 		dprintk("RPC:       %s: failed to create send CQ: %i\n",
829 			__func__, rc);
830 		goto out1;
831 	}
832 
833 	rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
834 	if (rc) {
835 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
836 			__func__, rc);
837 		goto out2;
838 	}
839 
840 	recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
841 				  rpcrdma_cq_async_error_upcall, ep,
842 				  ep->rep_attr.cap.max_recv_wr + 1, 0);
843 	if (IS_ERR(recvcq)) {
844 		rc = PTR_ERR(recvcq);
845 		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
846 			__func__, rc);
847 		goto out2;
848 	}
849 
850 	rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
851 	if (rc) {
852 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
853 			__func__, rc);
854 		ib_destroy_cq(recvcq);
855 		goto out2;
856 	}
857 
858 	ep->rep_attr.send_cq = sendcq;
859 	ep->rep_attr.recv_cq = recvcq;
860 
861 	/* Initialize cma parameters */
862 
863 	/* RPC/RDMA does not use private data */
864 	ep->rep_remote_cma.private_data = NULL;
865 	ep->rep_remote_cma.private_data_len = 0;
866 
867 	/* Client offers RDMA Read but does not initiate */
868 	ep->rep_remote_cma.initiator_depth = 0;
869 	if (devattr->max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
870 		ep->rep_remote_cma.responder_resources = 32;
871 	else
872 		ep->rep_remote_cma.responder_resources =
873 						devattr->max_qp_rd_atom;
874 
875 	ep->rep_remote_cma.retry_count = 7;
876 	ep->rep_remote_cma.flow_control = 0;
877 	ep->rep_remote_cma.rnr_retry_count = 0;
878 
879 	return 0;
880 
881 out2:
882 	err = ib_destroy_cq(sendcq);
883 	if (err)
884 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
885 			__func__, err);
886 out1:
887 	rpcrdma_free_regbuf(ia, ep->rep_padbuf);
888 	return rc;
889 }
890 
891 /*
892  * rpcrdma_ep_destroy
893  *
894  * Disconnect and destroy endpoint. After this, the only
895  * valid operations on the ep are to free it (if dynamically
896  * allocated) or re-create it.
897  */
898 void
899 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
900 {
901 	int rc;
902 
903 	dprintk("RPC:       %s: entering, connected is %d\n",
904 		__func__, ep->rep_connected);
905 
906 	cancel_delayed_work_sync(&ep->rep_connect_worker);
907 
908 	if (ia->ri_id->qp) {
909 		rpcrdma_ep_disconnect(ep, ia);
910 		rdma_destroy_qp(ia->ri_id);
911 		ia->ri_id->qp = NULL;
912 	}
913 
914 	rpcrdma_free_regbuf(ia, ep->rep_padbuf);
915 
916 	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
917 	rc = ib_destroy_cq(ep->rep_attr.recv_cq);
918 	if (rc)
919 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
920 			__func__, rc);
921 
922 	rpcrdma_clean_cq(ep->rep_attr.send_cq);
923 	rc = ib_destroy_cq(ep->rep_attr.send_cq);
924 	if (rc)
925 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
926 			__func__, rc);
927 }
928 
929 /*
930  * Connect unconnected endpoint.
931  */
932 int
933 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
934 {
935 	struct rdma_cm_id *id, *old;
936 	int rc = 0;
937 	int retry_count = 0;
938 
939 	if (ep->rep_connected != 0) {
940 		struct rpcrdma_xprt *xprt;
941 retry:
942 		dprintk("RPC:       %s: reconnecting...\n", __func__);
943 
944 		rpcrdma_ep_disconnect(ep, ia);
945 		rpcrdma_flush_cqs(ep);
946 
947 		switch (ia->ri_memreg_strategy) {
948 		case RPCRDMA_FRMR:
949 			rpcrdma_reset_frmrs(ia);
950 			break;
951 		case RPCRDMA_MTHCAFMR:
952 			rpcrdma_reset_fmrs(ia);
953 			break;
954 		case RPCRDMA_ALLPHYSICAL:
955 			break;
956 		default:
957 			rc = -EIO;
958 			goto out;
959 		}
960 
961 		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
962 		id = rpcrdma_create_id(xprt, ia,
963 				(struct sockaddr *)&xprt->rx_data.addr);
964 		if (IS_ERR(id)) {
965 			rc = -EHOSTUNREACH;
966 			goto out;
967 		}
968 		/* TEMP TEMP TEMP - fail if new device:
969 		 * Deregister/remarshal *all* requests!
970 		 * Close and recreate adapter, pd, etc!
971 		 * Re-determine all attributes still sane!
972 		 * More stuff I haven't thought of!
973 		 * Rrrgh!
974 		 */
975 		if (ia->ri_id->device != id->device) {
976 			printk("RPC:       %s: can't reconnect on "
977 				"different device!\n", __func__);
978 			rdma_destroy_id(id);
979 			rc = -ENETUNREACH;
980 			goto out;
981 		}
982 		/* END TEMP */
983 		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
984 		if (rc) {
985 			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
986 				__func__, rc);
987 			rdma_destroy_id(id);
988 			rc = -ENETUNREACH;
989 			goto out;
990 		}
991 
992 		write_lock(&ia->ri_qplock);
993 		old = ia->ri_id;
994 		ia->ri_id = id;
995 		write_unlock(&ia->ri_qplock);
996 
997 		rdma_destroy_qp(old);
998 		rdma_destroy_id(old);
999 	} else {
1000 		dprintk("RPC:       %s: connecting...\n", __func__);
1001 		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
1002 		if (rc) {
1003 			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
1004 				__func__, rc);
1005 			/* do not update ep->rep_connected */
1006 			return -ENETUNREACH;
1007 		}
1008 	}
1009 
1010 	ep->rep_connected = 0;
1011 
1012 	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
1013 	if (rc) {
1014 		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
1015 				__func__, rc);
1016 		goto out;
1017 	}
1018 
1019 	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
1020 
1021 	/*
1022 	 * Check state. A non-peer reject indicates no listener
1023 	 * (ECONNREFUSED), which may be a transient state. All
1024 	 * others indicate a transport condition for which a best-effort
1025 	 * recovery attempt has already been made.
1026 	 */
1027 	if (ep->rep_connected == -ECONNREFUSED &&
1028 	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
1029 		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
1030 		goto retry;
1031 	}
1032 	if (ep->rep_connected <= 0) {
1033 		/* Sometimes, the only way to reliably connect to remote
1034 		 * CMs is to use the same nonzero values for ORD and IRD. */
1035 		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
1036 		    (ep->rep_remote_cma.responder_resources == 0 ||
1037 		     ep->rep_remote_cma.initiator_depth !=
1038 				ep->rep_remote_cma.responder_resources)) {
1039 			if (ep->rep_remote_cma.responder_resources == 0)
1040 				ep->rep_remote_cma.responder_resources = 1;
1041 			ep->rep_remote_cma.initiator_depth =
1042 				ep->rep_remote_cma.responder_resources;
1043 			goto retry;
1044 		}
1045 		rc = ep->rep_connected;
1046 	} else {
1047 		dprintk("RPC:       %s: connected\n", __func__);
1048 	}
1049 
1050 out:
1051 	if (rc)
1052 		ep->rep_connected = rc;
1053 	return rc;
1054 }
1055 
1056 /*
1057  * rpcrdma_ep_disconnect
1058  *
1059  * This is separate from destroy to facilitate the ability
1060  * to reconnect without recreating the endpoint.
1061  *
1062  * This call is not reentrant, and must not be made in parallel
1063  * on the same endpoint.
1064  */
1065 void
1066 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
1067 {
1068 	int rc;
1069 
1070 	rpcrdma_flush_cqs(ep);
1071 	rc = rdma_disconnect(ia->ri_id);
1072 	if (!rc) {
1073 		/* returns without wait if not connected */
1074 		wait_event_interruptible(ep->rep_connect_wait,
1075 							ep->rep_connected != 1);
1076 		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
1077 			(ep->rep_connected == 1) ? "still " : "dis");
1078 	} else {
1079 		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
1080 		ep->rep_connected = rc;
1081 	}
1082 }
1083 
1084 static struct rpcrdma_req *
1085 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
1086 {
1087 	struct rpcrdma_req *req;
1088 
1089 	req = kzalloc(sizeof(*req), GFP_KERNEL);
1090 	if (req == NULL)
1091 		return ERR_PTR(-ENOMEM);
1092 
1093 	req->rl_buffer = &r_xprt->rx_buf;
1094 	return req;
1095 }
1096 
1097 static struct rpcrdma_rep *
1098 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
1099 {
1100 	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1101 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1102 	struct rpcrdma_rep *rep;
1103 	int rc;
1104 
1105 	rc = -ENOMEM;
1106 	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
1107 	if (rep == NULL)
1108 		goto out;
1109 
1110 	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
1111 					       GFP_KERNEL);
1112 	if (IS_ERR(rep->rr_rdmabuf)) {
1113 		rc = PTR_ERR(rep->rr_rdmabuf);
1114 		goto out_free;
1115 	}
1116 
1117 	rep->rr_buffer = &r_xprt->rx_buf;
1118 	return rep;
1119 
1120 out_free:
1121 	kfree(rep);
1122 out:
1123 	return ERR_PTR(rc);
1124 }
1125 
1126 static int
1127 rpcrdma_init_fmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf)
1128 {
1129 	int mr_access_flags = IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ;
1130 	struct ib_fmr_attr fmr_attr = {
1131 		.max_pages	= RPCRDMA_MAX_DATA_SEGS,
1132 		.max_maps	= 1,
1133 		.page_shift	= PAGE_SHIFT
1134 	};
1135 	struct rpcrdma_mw *r;
1136 	int i, rc;
1137 
1138 	i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
1139 	dprintk("RPC:       %s: initializing %d FMRs\n", __func__, i);
1140 
1141 	while (i--) {
1142 		r = kzalloc(sizeof(*r), GFP_KERNEL);
1143 		if (r == NULL)
1144 			return -ENOMEM;
1145 
1146 		r->r.fmr = ib_alloc_fmr(ia->ri_pd, mr_access_flags, &fmr_attr);
1147 		if (IS_ERR(r->r.fmr)) {
1148 			rc = PTR_ERR(r->r.fmr);
1149 			dprintk("RPC:       %s: ib_alloc_fmr failed %i\n",
1150 				__func__, rc);
1151 			goto out_free;
1152 		}
1153 
1154 		list_add(&r->mw_list, &buf->rb_mws);
1155 		list_add(&r->mw_all, &buf->rb_all);
1156 	}
1157 	return 0;
1158 
1159 out_free:
1160 	kfree(r);
1161 	return rc;
1162 }
1163 
1164 static int
1165 rpcrdma_init_frmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf)
1166 {
1167 	struct rpcrdma_frmr *f;
1168 	struct rpcrdma_mw *r;
1169 	int i, rc;
1170 
1171 	i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
1172 	dprintk("RPC:       %s: initializing %d FRMRs\n", __func__, i);
1173 
1174 	while (i--) {
1175 		r = kzalloc(sizeof(*r), GFP_KERNEL);
1176 		if (r == NULL)
1177 			return -ENOMEM;
1178 		f = &r->r.frmr;
1179 
1180 		f->fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1181 						ia->ri_max_frmr_depth);
1182 		if (IS_ERR(f->fr_mr)) {
1183 			rc = PTR_ERR(f->fr_mr);
1184 			dprintk("RPC:       %s: ib_alloc_fast_reg_mr "
1185 				"failed %i\n", __func__, rc);
1186 			goto out_free;
1187 		}
1188 
1189 		f->fr_pgl = ib_alloc_fast_reg_page_list(ia->ri_id->device,
1190 							ia->ri_max_frmr_depth);
1191 		if (IS_ERR(f->fr_pgl)) {
1192 			rc = PTR_ERR(f->fr_pgl);
1193 			dprintk("RPC:       %s: ib_alloc_fast_reg_page_list "
1194 				"failed %i\n", __func__, rc);
1195 
1196 			ib_dereg_mr(f->fr_mr);
1197 			goto out_free;
1198 		}
1199 
1200 		list_add(&r->mw_list, &buf->rb_mws);
1201 		list_add(&r->mw_all, &buf->rb_all);
1202 	}
1203 
1204 	return 0;
1205 
1206 out_free:
1207 	kfree(r);
1208 	return rc;
1209 }
1210 
1211 int
1212 rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
1213 {
1214 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1215 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1216 	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
1217 	char *p;
1218 	size_t len;
1219 	int i, rc;
1220 
1221 	buf->rb_max_requests = cdata->max_requests;
1222 	spin_lock_init(&buf->rb_lock);
1223 
1224 	/* Need to allocate:
1225 	 *   1.  arrays for send and recv pointers
1226 	 *   2.  arrays of struct rpcrdma_req to fill in pointers
1227 	 *   3.  array of struct rpcrdma_rep for replies
1228 	 * Send/recv buffers in req/rep need to be registered
1229 	 */
1230 	len = buf->rb_max_requests *
1231 		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1232 
1233 	p = kzalloc(len, GFP_KERNEL);
1234 	if (p == NULL) {
1235 		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1236 			__func__, len);
1237 		rc = -ENOMEM;
1238 		goto out;
1239 	}
1240 	buf->rb_pool = p;	/* for freeing it later */
1241 
1242 	buf->rb_send_bufs = (struct rpcrdma_req **) p;
1243 	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1244 	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1245 	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
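	/* The single rb_pool allocation is carved into two pointer arrays,
	 * laid out back to back (a sketch, one slot per request):
	 *
	 *	rb_pool: [ rb_send_bufs[0..max-1] | rb_recv_bufs[0..max-1] ]
	 *
	 * Only rb_pool itself is freed in rpcrdma_buffer_destroy(); the two
	 * array pointers are just cursors into it.
	 */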
1246 
1247 	INIT_LIST_HEAD(&buf->rb_mws);
1248 	INIT_LIST_HEAD(&buf->rb_all);
1249 	switch (ia->ri_memreg_strategy) {
1250 	case RPCRDMA_FRMR:
1251 		rc = rpcrdma_init_frmrs(ia, buf);
1252 		if (rc)
1253 			goto out;
1254 		break;
1255 	case RPCRDMA_MTHCAFMR:
1256 		rc = rpcrdma_init_fmrs(ia, buf);
1257 		if (rc)
1258 			goto out;
1259 		break;
1260 	default:
1261 		break;
1262 	}
1263 
1264 	for (i = 0; i < buf->rb_max_requests; i++) {
1265 		struct rpcrdma_req *req;
1266 		struct rpcrdma_rep *rep;
1267 
1268 		req = rpcrdma_create_req(r_xprt);
1269 		if (IS_ERR(req)) {
1270 			dprintk("RPC:       %s: request buffer %d alloc"
1271 				" failed\n", __func__, i);
1272 			rc = PTR_ERR(req);
1273 			goto out;
1274 		}
1275 		buf->rb_send_bufs[i] = req;
1276 
1277 		rep = rpcrdma_create_rep(r_xprt);
1278 		if (IS_ERR(rep)) {
1279 			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1280 				__func__, i);
1281 			rc = PTR_ERR(rep);
1282 			goto out;
1283 		}
1284 		buf->rb_recv_bufs[i] = rep;
1285 	}
1286 
1287 	return 0;
1288 out:
1289 	rpcrdma_buffer_destroy(buf);
1290 	return rc;
1291 }
1292 
1293 static void
1294 rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
1295 {
1296 	if (!rep)
1297 		return;
1298 
1299 	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
1300 	kfree(rep);
1301 }
1302 
1303 static void
1304 rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
1305 {
1306 	if (!req)
1307 		return;
1308 
1309 	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
1310 	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
1311 	kfree(req);
1312 }
1313 
1314 static void
1315 rpcrdma_destroy_fmrs(struct rpcrdma_buffer *buf)
1316 {
1317 	struct rpcrdma_mw *r;
1318 	int rc;
1319 
1320 	while (!list_empty(&buf->rb_all)) {
1321 		r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
1322 		list_del(&r->mw_all);
1323 		list_del(&r->mw_list);
1324 
1325 		rc = ib_dealloc_fmr(r->r.fmr);
1326 		if (rc)
1327 			dprintk("RPC:       %s: ib_dealloc_fmr failed %i\n",
1328 				__func__, rc);
1329 
1330 		kfree(r);
1331 	}
1332 }
1333 
1334 static void
1335 rpcrdma_destroy_frmrs(struct rpcrdma_buffer *buf)
1336 {
1337 	struct rpcrdma_mw *r;
1338 	int rc;
1339 
1340 	while (!list_empty(&buf->rb_all)) {
1341 		r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
1342 		list_del(&r->mw_all);
1343 		list_del(&r->mw_list);
1344 
1345 		rc = ib_dereg_mr(r->r.frmr.fr_mr);
1346 		if (rc)
1347 			dprintk("RPC:       %s: ib_dereg_mr failed %i\n",
1348 				__func__, rc);
1349 		ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1350 
1351 		kfree(r);
1352 	}
1353 }
1354 
1355 void
1356 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1357 {
1358 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1359 	int i;
1360 
1361 	/* clean up in reverse order from create
1362 	 *   1.  recv mr memory (mr free, then kfree)
1363 	 *   2.  send mr memory (mr free, then kfree)
1364 	 *   3.  MWs
1365 	 */
1366 	dprintk("RPC:       %s: entering\n", __func__);
1367 
1368 	for (i = 0; i < buf->rb_max_requests; i++) {
1369 		if (buf->rb_recv_bufs)
1370 			rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
1371 		if (buf->rb_send_bufs)
1372 			rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
1373 	}
1374 
1375 	switch (ia->ri_memreg_strategy) {
1376 	case RPCRDMA_FRMR:
1377 		rpcrdma_destroy_frmrs(buf);
1378 		break;
1379 	case RPCRDMA_MTHCAFMR:
1380 		rpcrdma_destroy_fmrs(buf);
1381 		break;
1382 	default:
1383 		break;
1384 	}
1385 
1386 	kfree(buf->rb_pool);
1387 }
1388 
1389 /* After a disconnect, unmap all FMRs.
1390  *
1391  * This is invoked only in the transport connect worker in order
1392  * to serialize with rpcrdma_register_fmr_external().
1393  */
1394 static void
1395 rpcrdma_reset_fmrs(struct rpcrdma_ia *ia)
1396 {
1397 	struct rpcrdma_xprt *r_xprt =
1398 				container_of(ia, struct rpcrdma_xprt, rx_ia);
1399 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1400 	struct list_head *pos;
1401 	struct rpcrdma_mw *r;
1402 	LIST_HEAD(l);
1403 	int rc;
1404 
1405 	list_for_each(pos, &buf->rb_all) {
1406 		r = list_entry(pos, struct rpcrdma_mw, mw_all);
1407 
1408 		INIT_LIST_HEAD(&l);
1409 		list_add(&r->r.fmr->list, &l);
1410 		rc = ib_unmap_fmr(&l);
1411 		if (rc)
1412 			dprintk("RPC:       %s: ib_unmap_fmr failed %i\n",
1413 				__func__, rc);
1414 	}
1415 }
1416 
1417 /* After a disconnect, a flushed FAST_REG_MR can leave an FRMR in
1418  * an unusable state. Find FRMRs in this state and dereg / reg
1419  * each.  FRMRs that are VALID and attached to an rpcrdma_req are
1420  * also torn down.
1421  *
1422  * This gives all in-use FRMRs a fresh rkey and leaves them INVALID.
1423  *
1424  * This is invoked only in the transport connect worker in order
1425  * to serialize with rpcrdma_register_frmr_external().
1426  */
1427 static void
1428 rpcrdma_reset_frmrs(struct rpcrdma_ia *ia)
1429 {
1430 	struct rpcrdma_xprt *r_xprt =
1431 				container_of(ia, struct rpcrdma_xprt, rx_ia);
1432 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1433 	struct list_head *pos;
1434 	struct rpcrdma_mw *r;
1435 	int rc;
1436 
1437 	list_for_each(pos, &buf->rb_all) {
1438 		r = list_entry(pos, struct rpcrdma_mw, mw_all);
1439 
1440 		if (r->r.frmr.fr_state == FRMR_IS_INVALID)
1441 			continue;
1442 
1443 		rc = ib_dereg_mr(r->r.frmr.fr_mr);
1444 		if (rc)
1445 			dprintk("RPC:       %s: ib_dereg_mr failed %i\n",
1446 				__func__, rc);
1447 		ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1448 
1449 		r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1450 					ia->ri_max_frmr_depth);
1451 		if (IS_ERR(r->r.frmr.fr_mr)) {
1452 			rc = PTR_ERR(r->r.frmr.fr_mr);
1453 			dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1454 				" failed %i\n", __func__, rc);
1455 			continue;
1456 		}
1457 		r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
1458 					ia->ri_id->device,
1459 					ia->ri_max_frmr_depth);
1460 		if (IS_ERR(r->r.frmr.fr_pgl)) {
1461 			rc = PTR_ERR(r->r.frmr.fr_pgl);
1462 			dprintk("RPC:       %s: "
1463 				"ib_alloc_fast_reg_page_list "
1464 				"failed %i\n", __func__, rc);
1465 
1466 			ib_dereg_mr(r->r.frmr.fr_mr);
1467 			continue;
1468 		}
1469 		r->r.frmr.fr_state = FRMR_IS_INVALID;
1470 	}
1471 }
1472 
1473 /* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
1474  * some req segments uninitialized.
1475  */
1476 static void
1477 rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
1478 {
1479 	if (*mw) {
1480 		list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
1481 		*mw = NULL;
1482 	}
1483 }
1484 
1485 /* Cycle mw's back in reverse order, and "spin" them.
1486  * This delays and scrambles reuse as much as possible.
1487  */
1488 static void
1489 rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1490 {
1491 	struct rpcrdma_mr_seg *seg = req->rl_segments;
1492 	struct rpcrdma_mr_seg *seg1 = seg;
1493 	int i;
1494 
1495 	for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
1496 		rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
1497 	rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);
1498 }
1499 
1500 static void
1501 rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1502 {
1503 	buf->rb_send_bufs[--buf->rb_send_index] = req;
1504 	req->rl_niovs = 0;
1505 	if (req->rl_reply) {
1506 		buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
1507 		req->rl_reply->rr_func = NULL;
1508 		req->rl_reply = NULL;
1509 	}
1510 }
1511 
1512 /* rpcrdma_unmap_one() was already done by rpcrdma_deregister_frmr_external().
1513  * Redo only the ib_post_send().
1514  */
1515 static void
1516 rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
1517 {
1518 	struct rpcrdma_xprt *r_xprt =
1519 				container_of(ia, struct rpcrdma_xprt, rx_ia);
1520 	struct ib_send_wr invalidate_wr, *bad_wr;
1521 	int rc;
1522 
1523 	dprintk("RPC:       %s: FRMR %p is stale\n", __func__, r);
1524 
1525 	/* When this FRMR is re-inserted into rb_mws, it is no longer stale */
1526 	r->r.frmr.fr_state = FRMR_IS_INVALID;
1527 
1528 	memset(&invalidate_wr, 0, sizeof(invalidate_wr));
1529 	invalidate_wr.wr_id = (unsigned long)(void *)r;
1530 	invalidate_wr.opcode = IB_WR_LOCAL_INV;
1531 	invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
1532 	DECR_CQCOUNT(&r_xprt->rx_ep);
1533 
1534 	dprintk("RPC:       %s: frmr %p invalidating rkey %08x\n",
1535 		__func__, r, r->r.frmr.fr_mr->rkey);
1536 
1537 	read_lock(&ia->ri_qplock);
1538 	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1539 	read_unlock(&ia->ri_qplock);
1540 	if (rc) {
1541 		/* Force rpcrdma_buffer_get() to retry */
1542 		r->r.frmr.fr_state = FRMR_IS_STALE;
1543 		dprintk("RPC:       %s: ib_post_send failed, %i\n",
1544 			__func__, rc);
1545 	}
1546 }
1547 
1548 static void
1549 rpcrdma_retry_flushed_linv(struct list_head *stale,
1550 			   struct rpcrdma_buffer *buf)
1551 {
1552 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1553 	struct list_head *pos;
1554 	struct rpcrdma_mw *r;
1555 	unsigned long flags;
1556 
1557 	list_for_each(pos, stale) {
1558 		r = list_entry(pos, struct rpcrdma_mw, mw_list);
1559 		rpcrdma_retry_local_inv(r, ia);
1560 	}
1561 
1562 	spin_lock_irqsave(&buf->rb_lock, flags);
1563 	list_splice_tail(stale, &buf->rb_mws);
1564 	spin_unlock_irqrestore(&buf->rb_lock, flags);
1565 }
1566 
1567 static struct rpcrdma_req *
1568 rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
1569 			 struct list_head *stale)
1570 {
1571 	struct rpcrdma_mw *r;
1572 	int i;
1573 
1574 	i = RPCRDMA_MAX_SEGS - 1;
1575 	while (!list_empty(&buf->rb_mws)) {
1576 		r = list_entry(buf->rb_mws.next,
1577 			       struct rpcrdma_mw, mw_list);
1578 		list_del(&r->mw_list);
1579 		if (r->r.frmr.fr_state == FRMR_IS_STALE) {
1580 			list_add(&r->mw_list, stale);
1581 			continue;
1582 		}
1583 		req->rl_segments[i].rl_mw = r;
1584 		if (unlikely(i-- == 0))
1585 			return req;	/* Success */
1586 	}
1587 
1588 	/* Not enough entries on rb_mws for this req */
1589 	rpcrdma_buffer_put_sendbuf(req, buf);
1590 	rpcrdma_buffer_put_mrs(req, buf);
1591 	return NULL;
1592 }
1593 
1594 static struct rpcrdma_req *
1595 rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1596 {
1597 	struct rpcrdma_mw *r;
1598 	int i;
1599 
1600 	i = RPCRDMA_MAX_SEGS - 1;
1601 	while (!list_empty(&buf->rb_mws)) {
1602 		r = list_entry(buf->rb_mws.next,
1603 			       struct rpcrdma_mw, mw_list);
1604 		list_del(&r->mw_list);
1605 		req->rl_segments[i].rl_mw = r;
1606 		if (unlikely(i-- == 0))
1607 			return req;	/* Success */
1608 	}
1609 
1610 	/* Not enough entries on rb_mws for this req */
1611 	rpcrdma_buffer_put_sendbuf(req, buf);
1612 	rpcrdma_buffer_put_mrs(req, buf);
1613 	return NULL;
1614 }
1615 
1616 /*
1617  * Get a set of request/reply buffers.
1618  *
1619  * Reply buffer (if needed) is attached to send buffer upon return.
1620  * Rule:
1621  *    rb_send_index and rb_recv_index MUST always be pointing to the
1622  *    *next* available buffer (non-NULL). They are incremented after
1623  *    removing buffers, and decremented *before* returning them.
1624  */
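/* A short illustration of that rule (send side only, two requests):
 *
 *	initial:             rb_send_index = 0, rb_send_bufs = [req0, req1]
 *	rpcrdma_buffer_get:  returns req0,      rb_send_bufs = [NULL, req1],
 *	                     rb_send_index = 1
 *	rpcrdma_buffer_put:  rb_send_index pre-decremented to 0,
 *	                     rb_send_bufs[0] = req0 again
 *
 * A slot is NULLed while its buffer is on loan, so the index always
 * names the next available (non-NULL) entry.
 */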
1625 struct rpcrdma_req *
1626 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1627 {
1628 	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1629 	struct list_head stale;
1630 	struct rpcrdma_req *req;
1631 	unsigned long flags;
1632 
1633 	spin_lock_irqsave(&buffers->rb_lock, flags);
1634 	if (buffers->rb_send_index == buffers->rb_max_requests) {
1635 		spin_unlock_irqrestore(&buffers->rb_lock, flags);
1636 		dprintk("RPC:       %s: out of request buffers\n", __func__);
1637 		return ((struct rpcrdma_req *)NULL);
1638 	}
1639 
1640 	req = buffers->rb_send_bufs[buffers->rb_send_index];
1641 	if (buffers->rb_send_index < buffers->rb_recv_index) {
1642 		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1643 			__func__,
1644 			buffers->rb_recv_index - buffers->rb_send_index);
1645 		req->rl_reply = NULL;
1646 	} else {
1647 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1648 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1649 	}
1650 	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1651 
1652 	INIT_LIST_HEAD(&stale);
1653 	switch (ia->ri_memreg_strategy) {
1654 	case RPCRDMA_FRMR:
1655 		req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
1656 		break;
1657 	case RPCRDMA_MTHCAFMR:
1658 		req = rpcrdma_buffer_get_fmrs(req, buffers);
1659 		break;
1660 	default:
1661 		break;
1662 	}
1663 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1664 	if (!list_empty(&stale))
1665 		rpcrdma_retry_flushed_linv(&stale, buffers);
1666 	return req;
1667 }
1668 
1669 /*
1670  * Put request/reply buffers back into pool.
1671  * Pre-decrement counter/array index.
1672  */
1673 void
1674 rpcrdma_buffer_put(struct rpcrdma_req *req)
1675 {
1676 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1677 	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1678 	unsigned long flags;
1679 
1680 	spin_lock_irqsave(&buffers->rb_lock, flags);
1681 	rpcrdma_buffer_put_sendbuf(req, buffers);
1682 	switch (ia->ri_memreg_strategy) {
1683 	case RPCRDMA_FRMR:
1684 	case RPCRDMA_MTHCAFMR:
1685 		rpcrdma_buffer_put_mrs(req, buffers);
1686 		break;
1687 	default:
1688 		break;
1689 	}
1690 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1691 }
1692 
1693 /*
1694  * Recover reply buffers from pool.
1695  * This happens when recovering from error conditions.
1696  * Post-increment counter/array index.
1697  */
1698 void
1699 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1700 {
1701 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1702 	unsigned long flags;
1703 
1704 	spin_lock_irqsave(&buffers->rb_lock, flags);
1705 	if (buffers->rb_recv_index < buffers->rb_max_requests) {
1706 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1707 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1708 	}
1709 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1710 }
1711 
1712 /*
1713  * Put reply buffers back into pool when not attached to
1714  * request. This happens in error conditions.
1715  */
1716 void
1717 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1718 {
1719 	struct rpcrdma_buffer *buffers = rep->rr_buffer;
1720 	unsigned long flags;
1721 
1722 	rep->rr_func = NULL;
1723 	spin_lock_irqsave(&buffers->rb_lock, flags);
1724 	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1725 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1726 }
1727 
1728 /*
1729  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1730  */
1731 
1732 static int
1733 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1734 				struct ib_mr **mrp, struct ib_sge *iov)
1735 {
1736 	struct ib_phys_buf ipb;
1737 	struct ib_mr *mr;
1738 	int rc;
1739 
1740 	/*
1741 	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1742 	 */
1743 	iov->addr = ib_dma_map_single(ia->ri_id->device,
1744 			va, len, DMA_BIDIRECTIONAL);
1745 	if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
1746 		return -ENOMEM;
1747 
1748 	iov->length = len;
1749 
1750 	if (ia->ri_have_dma_lkey) {
1751 		*mrp = NULL;
1752 		iov->lkey = ia->ri_dma_lkey;
1753 		return 0;
1754 	} else if (ia->ri_bind_mem != NULL) {
1755 		*mrp = NULL;
1756 		iov->lkey = ia->ri_bind_mem->lkey;
1757 		return 0;
1758 	}
1759 
1760 	ipb.addr = iov->addr;
1761 	ipb.size = iov->length;
1762 	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1763 			IB_ACCESS_LOCAL_WRITE, &iov->addr);
1764 
1765 	dprintk("RPC:       %s: phys convert: 0x%llx "
1766 			"registered 0x%llx length %d\n",
1767 			__func__, (unsigned long long)ipb.addr,
1768 			(unsigned long long)iov->addr, len);
1769 
1770 	if (IS_ERR(mr)) {
1771 		*mrp = NULL;
1772 		rc = PTR_ERR(mr);
1773 		dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1774 	} else {
1775 		*mrp = mr;
1776 		iov->lkey = mr->lkey;
1777 		rc = 0;
1778 	}
1779 
1780 	return rc;
1781 }
1782 
1783 static int
1784 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1785 				struct ib_mr *mr, struct ib_sge *iov)
1786 {
1787 	int rc;
1788 
1789 	ib_dma_unmap_single(ia->ri_id->device,
1790 			iov->addr, iov->length, DMA_BIDIRECTIONAL);
1791 
1792 	if (NULL == mr)
1793 		return 0;
1794 
1795 	rc = ib_dereg_mr(mr);
1796 	if (rc)
1797 		dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1798 	return rc;
1799 }
1800 
1801 /**
1802  * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
1803  * @ia: controlling rpcrdma_ia
1804  * @size: size of buffer to be allocated, in bytes
1805  * @flags: GFP flags
1806  *
1807  * Returns pointer to private header of an area of internally
1808  * registered memory, or an ERR_PTR. The registered buffer follows
1809  * the end of the private header.
1810  *
1811  * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1812  * receiving the payload of RDMA RECV operations. regbufs are not
1813  * used for RDMA READ/WRITE operations, thus are registered only for
1814  * LOCAL access.
1815  */
1816 struct rpcrdma_regbuf *
1817 rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
1818 {
1819 	struct rpcrdma_regbuf *rb;
1820 	int rc;
1821 
1822 	rc = -ENOMEM;
1823 	rb = kmalloc(sizeof(*rb) + size, flags);
1824 	if (rb == NULL)
1825 		goto out;
1826 
1827 	rb->rg_size = size;
1828 	rb->rg_owner = NULL;
1829 	rc = rpcrdma_register_internal(ia, rb->rg_base, size,
1830 				       &rb->rg_mr, &rb->rg_iov);
1831 	if (rc)
1832 		goto out_free;
1833 
1834 	return rb;
1835 
1836 out_free:
1837 	kfree(rb);
1838 out:
1839 	return ERR_PTR(rc);
1840 }
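/* A sketch of typical use (not lifted verbatim from any one caller):
 * the returned regbuf carries a ready-to-post ib_sge in rg_iov, with
 * addr, length and lkey already filled in by rpcrdma_register_internal():
 *
 *	rb = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize, GFP_KERNEL);
 *	if (IS_ERR(rb))
 *		return PTR_ERR(rb);
 *	recv_wr.sg_list = &rb->rg_iov;
 *	recv_wr.num_sge = 1;
 *
 * rpcrdma_create_rep() and rpcrdma_ep_create() above allocate their
 * reply and padding buffers this way.
 */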
1841 
1842 /**
1843  * rpcrdma_free_regbuf - deregister and free registered buffer
1844  * @ia: controlling rpcrdma_ia
1845  * @rb: regbuf to be deregistered and freed
1846  */
1847 void
1848 rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
1849 {
1850 	if (rb) {
1851 		rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
1852 		kfree(rb);
1853 	}
1854 }
1855 
1856 /*
1857  * Wrappers for chunk registration, shared by read/write chunk code.
1858  */
1859 
1860 static void
1861 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1862 {
1863 	seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1864 	seg->mr_dmalen = seg->mr_len;
1865 	if (seg->mr_page)
1866 		seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1867 				seg->mr_page, offset_in_page(seg->mr_offset),
1868 				seg->mr_dmalen, seg->mr_dir);
1869 	else
1870 		seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1871 				seg->mr_offset,
1872 				seg->mr_dmalen, seg->mr_dir);
1873 	if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
1874 		dprintk("RPC:       %s: mr_dma %llx mr_offset %p mr_dma_len %zu\n",
1875 			__func__,
1876 			(unsigned long long)seg->mr_dma,
1877 			seg->mr_offset, seg->mr_dmalen);
1878 	}
1879 }
1880 
1881 static void
1882 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1883 {
1884 	if (seg->mr_page)
1885 		ib_dma_unmap_page(ia->ri_id->device,
1886 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1887 	else
1888 		ib_dma_unmap_single(ia->ri_id->device,
1889 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1890 }
1891 
1892 static int
1893 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1894 			int *nsegs, int writing, struct rpcrdma_ia *ia,
1895 			struct rpcrdma_xprt *r_xprt)
1896 {
1897 	struct rpcrdma_mr_seg *seg1 = seg;
1898 	struct rpcrdma_mw *mw = seg1->rl_mw;
1899 	struct rpcrdma_frmr *frmr = &mw->r.frmr;
1900 	struct ib_mr *mr = frmr->fr_mr;
1901 	struct ib_send_wr fastreg_wr, *bad_wr;
1902 	u8 key;
1903 	int len, pageoff;
1904 	int i, rc;
1905 	int seg_len;
1906 	u64 pa;
1907 	int page_no;
1908 
1909 	pageoff = offset_in_page(seg1->mr_offset);
1910 	seg1->mr_offset -= pageoff;	/* start of page */
1911 	seg1->mr_len += pageoff;
1912 	len = -pageoff;
1913 	if (*nsegs > ia->ri_max_frmr_depth)
1914 		*nsegs = ia->ri_max_frmr_depth;
1915 	for (page_no = i = 0; i < *nsegs;) {
1916 		rpcrdma_map_one(ia, seg, writing);
1917 		pa = seg->mr_dma;
1918 		for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
1919 			frmr->fr_pgl->page_list[page_no++] = pa;
1920 			pa += PAGE_SIZE;
1921 		}
1922 		len += seg->mr_len;
1923 		++seg;
1924 		++i;
1925 		/* Check for holes */
1926 		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1927 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1928 			break;
1929 	}
1930 	dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1931 		__func__, mw, i);
1932 
1933 	frmr->fr_state = FRMR_IS_VALID;
1934 
1935 	memset(&fastreg_wr, 0, sizeof(fastreg_wr));
1936 	fastreg_wr.wr_id = (unsigned long)(void *)mw;
1937 	fastreg_wr.opcode = IB_WR_FAST_REG_MR;
1938 	fastreg_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1939 	fastreg_wr.wr.fast_reg.page_list = frmr->fr_pgl;
1940 	fastreg_wr.wr.fast_reg.page_list_len = page_no;
1941 	fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1942 	fastreg_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
1943 	if (fastreg_wr.wr.fast_reg.length < len) {
1944 		rc = -EIO;
1945 		goto out_err;
1946 	}
1947 
1948 	/* Bump the key */
1949 	key = (u8)(mr->rkey & 0x000000FF);
1950 	ib_update_fast_reg_key(mr, ++key);
1951 
1952 	fastreg_wr.wr.fast_reg.access_flags = (writing ?
1953 				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1954 				IB_ACCESS_REMOTE_READ);
1955 	fastreg_wr.wr.fast_reg.rkey = mr->rkey;
1956 	DECR_CQCOUNT(&r_xprt->rx_ep);
1957 
1958 	rc = ib_post_send(ia->ri_id->qp, &fastreg_wr, &bad_wr);
1959 	if (rc) {
1960 		dprintk("RPC:       %s: failed ib_post_send for register,"
1961 			" status %i\n", __func__, rc);
1962 		ib_update_fast_reg_key(mr, --key);
1963 		goto out_err;
1964 	} else {
1965 		seg1->mr_rkey = mr->rkey;
1966 		seg1->mr_base = seg1->mr_dma + pageoff;
1967 		seg1->mr_nsegs = i;
1968 		seg1->mr_len = len;
1969 	}
1970 	*nsegs = i;
1971 	return 0;
1972 out_err:
1973 	frmr->fr_state = FRMR_IS_INVALID;
1974 	while (i--)
1975 		rpcrdma_unmap_one(ia, --seg);
1976 	return rc;
1977 }
1978 
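/*
 * rpcrdma_deregister_frmr_external - invalidate an FRMR-registered chunk
 *
 * DMA-unmaps each segment and posts an IB_WR_LOCAL_INV work request to
 * invalidate the chunk's rkey. If the post fails, the FRMR is marked
 * stale to force rpcrdma_buffer_get() to retry.
 */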
1979 static int
1980 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1981 			struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1982 {
1983 	struct rpcrdma_mr_seg *seg1 = seg;
1984 	struct ib_send_wr invalidate_wr, *bad_wr;
1985 	int rc;
1986 
1987 	seg1->rl_mw->r.frmr.fr_state = FRMR_IS_INVALID;
1988 
1989 	memset(&invalidate_wr, 0, sizeof(invalidate_wr));
1990 	invalidate_wr.wr_id = (unsigned long)(void *)seg1->rl_mw;
1991 	invalidate_wr.opcode = IB_WR_LOCAL_INV;
1992 	invalidate_wr.ex.invalidate_rkey = seg1->rl_mw->r.frmr.fr_mr->rkey;
1993 	DECR_CQCOUNT(&r_xprt->rx_ep);
1994 
1995 	read_lock(&ia->ri_qplock);
1996 	while (seg1->mr_nsegs--)
1997 		rpcrdma_unmap_one(ia, seg++);
1998 	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1999 	read_unlock(&ia->ri_qplock);
2000 	if (rc) {
2001 		/* Force rpcrdma_buffer_get() to retry */
2002 		seg1->rl_mw->r.frmr.fr_state = FRMR_IS_STALE;
2003 		dprintk("RPC:       %s: failed ib_post_send for invalidate,"
2004 			" status %i\n", __func__, rc);
2005 	}
2006 	return rc;
2007 }
2008 
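/*
 * rpcrdma_register_fmr_external - register a chunk of segments with an FMR
 *
 * DMA-maps up to RPCRDMA_MAX_DATA_SEGS contiguous segments (stopping
 * early at the first page-alignment "hole") and maps their physical
 * addresses under a single rkey with ib_map_phys_fmr(). On failure,
 * the mapped segments are unmapped again.
 */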
2009 static int
2010 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
2011 			int *nsegs, int writing, struct rpcrdma_ia *ia)
2012 {
2013 	struct rpcrdma_mr_seg *seg1 = seg;
2014 	u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
2015 	int len, pageoff, i, rc;
2016 
2017 	pageoff = offset_in_page(seg1->mr_offset);
2018 	seg1->mr_offset -= pageoff;	/* start of page */
2019 	seg1->mr_len += pageoff;
2020 	len = -pageoff;
2021 	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
2022 		*nsegs = RPCRDMA_MAX_DATA_SEGS;
2023 	for (i = 0; i < *nsegs;) {
2024 		rpcrdma_map_one(ia, seg, writing);
2025 		physaddrs[i] = seg->mr_dma;
2026 		len += seg->mr_len;
2027 		++seg;
2028 		++i;
2029 		/* Check for holes */
2030 		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
2031 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
2032 			break;
2033 	}
2034 	rc = ib_map_phys_fmr(seg1->rl_mw->r.fmr, physaddrs, i, seg1->mr_dma);
2035 	if (rc) {
2036 		dprintk("RPC:       %s: failed ib_map_phys_fmr "
2037 			"%u@0x%llx+%i (%d)... status %i\n", __func__,
2038 			len, (unsigned long long)seg1->mr_dma,
2039 			pageoff, i, rc);
2040 		while (i--)
2041 			rpcrdma_unmap_one(ia, --seg);
2042 	} else {
2043 		seg1->mr_rkey = seg1->rl_mw->r.fmr->rkey;
2044 		seg1->mr_base = seg1->mr_dma + pageoff;
2045 		seg1->mr_nsegs = i;
2046 		seg1->mr_len = len;
2047 	}
2048 	*nsegs = i;
2049 	return rc;
2050 }
2051 
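/*
 * rpcrdma_deregister_fmr_external - unmap an FMR-registered chunk
 *
 * Releases the FMR mapping with ib_unmap_fmr(), then DMA-unmaps each
 * segment.
 */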
2052 static int
2053 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
2054 			struct rpcrdma_ia *ia)
2055 {
2056 	struct rpcrdma_mr_seg *seg1 = seg;
2057 	LIST_HEAD(l);
2058 	int rc;
2059 
2060 	list_add(&seg1->rl_mw->r.fmr->list, &l);
2061 	rc = ib_unmap_fmr(&l);
2062 	read_lock(&ia->ri_qplock);
2063 	while (seg1->mr_nsegs--)
2064 		rpcrdma_unmap_one(ia, seg++);
2065 	read_unlock(&ia->ri_qplock);
2066 	if (rc)
2067 		dprintk("RPC:       %s: failed ib_unmap_fmr,"
2068 			" status %i\n", __func__, rc);
2069 	return rc;
2070 }
2071 
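/**
 * rpcrdma_register_external - register a chunk using the configured strategy
 * @seg: first scatter/gather segment of the chunk
 * @nsegs: number of segments available for this chunk
 * @writing: non-zero if the memory will be the target of RDMA WRITE
 * @r_xprt: controlling transport instance
 *
 * Returns the number of segments actually registered, or a negative
 * errno if registration fails.
 */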
2072 int
2073 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
2074 			int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
2075 {
2076 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
2077 	int rc = 0;
2078 
2079 	switch (ia->ri_memreg_strategy) {
2080 
2081 	case RPCRDMA_ALLPHYSICAL:
2082 		rpcrdma_map_one(ia, seg, writing);
2083 		seg->mr_rkey = ia->ri_bind_mem->rkey;
2084 		seg->mr_base = seg->mr_dma;
2085 		seg->mr_nsegs = 1;
2086 		nsegs = 1;
2087 		break;
2088 
2089 	/* Registration using fast registration work requests (FRMR) */
2090 	case RPCRDMA_FRMR:
2091 		rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
2092 		break;
2093 
2094 	/* Registration using fast memory regions (FMR) */
2095 	case RPCRDMA_MTHCAFMR:
2096 		rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
2097 		break;
2098 
2099 	default:
2100 		return -EIO;
2101 	}
2102 	if (rc)
2103 		return rc;
2104 
2105 	return nsegs;
2106 }
2107 
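/**
 * rpcrdma_deregister_external - release a chunk's memory registration
 * @seg: first segment of the chunk to be released
 * @r_xprt: controlling transport instance
 *
 * Returns the number of segments that were deregistered. Errors from
 * the underlying deregistration helpers are logged, not returned.
 */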
2108 int
2109 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
2110 		struct rpcrdma_xprt *r_xprt)
2111 {
2112 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
2113 	int nsegs = seg->mr_nsegs;
2114 
2115 	switch (ia->ri_memreg_strategy) {
2116 
2117 	case RPCRDMA_ALLPHYSICAL:
2118 		read_lock(&ia->ri_qplock);
2119 		rpcrdma_unmap_one(ia, seg);
2120 		read_unlock(&ia->ri_qplock);
2121 		break;
2122 
2123 	case RPCRDMA_FRMR:
2124 		rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
2125 		break;
2126 
2127 	case RPCRDMA_MTHCAFMR:
2128 		rpcrdma_deregister_fmr_external(seg, ia);
2129 		break;
2130 
2131 	default:
2132 		break;
2133 	}
2134 	return nsegs;
2135 }
2136 
2137 /*
2138  * Prepost any receive buffer, then post send.
2139  *
2140  * Receive buffer is donated to hardware, reclaimed upon recv completion.
2141  */
2142 int
2143 rpcrdma_ep_post(struct rpcrdma_ia *ia,
2144 		struct rpcrdma_ep *ep,
2145 		struct rpcrdma_req *req)
2146 {
2147 	struct ib_send_wr send_wr, *send_wr_fail;
2148 	struct rpcrdma_rep *rep = req->rl_reply;
2149 	int rc;
2150 
2151 	if (rep) {
2152 		rc = rpcrdma_ep_post_recv(ia, ep, rep);
2153 		if (rc)
2154 			goto out;
2155 		req->rl_reply = NULL;
2156 	}
2157 
2158 	send_wr.next = NULL;
2159 	send_wr.wr_id = 0ULL;	/* no send cookie */
2160 	send_wr.sg_list = req->rl_send_iov;
2161 	send_wr.num_sge = req->rl_niovs;
2162 	send_wr.opcode = IB_WR_SEND;
2163 	if (send_wr.num_sge == 4)	/* sync iov[3] too; the pad needs no sync (constant) */
2164 		ib_dma_sync_single_for_device(ia->ri_id->device,
2165 			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
2166 			DMA_TO_DEVICE);
2167 	ib_dma_sync_single_for_device(ia->ri_id->device,
2168 		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
2169 		DMA_TO_DEVICE);
2170 	ib_dma_sync_single_for_device(ia->ri_id->device,
2171 		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
2172 		DMA_TO_DEVICE);
2173 
2174 	if (DECR_CQCOUNT(ep) > 0)
2175 		send_wr.send_flags = 0;
2176 	else { /* Provider must take a send completion every now and then */
2177 		INIT_CQCOUNT(ep);
2178 		send_wr.send_flags = IB_SEND_SIGNALED;
2179 	}
2180 
2181 	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
2182 	if (rc)
2183 		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
2184 			rc);
2185 out:
2186 	return rc;
2187 }
2188 
2189 /*
2190  * (Re)post a receive buffer.
2191  */
2192 int
2193 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
2194 		     struct rpcrdma_ep *ep,
2195 		     struct rpcrdma_rep *rep)
2196 {
2197 	struct ib_recv_wr recv_wr, *recv_wr_fail;
2198 	int rc;
2199 
2200 	recv_wr.next = NULL;
2201 	recv_wr.wr_id = (u64) (unsigned long) rep;
2202 	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
2203 	recv_wr.num_sge = 1;
2204 
2205 	ib_dma_sync_single_for_cpu(ia->ri_id->device,
2206 				   rdmab_addr(rep->rr_rdmabuf),
2207 				   rdmab_length(rep->rr_rdmabuf),
2208 				   DMA_BIDIRECTIONAL);
2209 
2210 	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
2211 
2212 	if (rc)
2213 		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
2214 			rc);
2215 	return rc;
2216 }
2217 
2218 /* Physical mapping means one Read/Write list entry per page.
2219  * All list entries must fit within an inline buffer.
2220  *
2221  * NB: The server must return a Write list for NFS READ,
2222  *     which has the same constraint. Factor in the inline
2223  *     rsize as well.
2224  */
2225 static size_t
2226 rpcrdma_physical_max_payload(struct rpcrdma_xprt *r_xprt)
2227 {
2228 	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
2229 	unsigned int inline_size, pages;
2230 
2231 	inline_size = min_t(unsigned int,
2232 			    cdata->inline_wsize, cdata->inline_rsize);
2233 	inline_size -= RPCRDMA_HDRLEN_MIN;
2234 	pages = inline_size / sizeof(struct rpcrdma_segment);
2235 	return pages << PAGE_SHIFT;
2236 }
2237 
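/* MR-based registration (FRMR, MTHCAFMR): the maximum payload is
 * limited by the number of data segments, each conveying one page.
 */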
2238 static size_t
2239 rpcrdma_mr_max_payload(struct rpcrdma_xprt *r_xprt)
2240 {
2241 	return RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT;
2242 }
2243 
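/**
 * rpcrdma_max_payload - largest RPC payload the transport can convey
 * @r_xprt: controlling transport instance
 *
 * Returns the maximum payload size, in bytes, allowed by the current
 * memory registration strategy.
 */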
2244 size_t
2245 rpcrdma_max_payload(struct rpcrdma_xprt *r_xprt)
2246 {
2247 	size_t result;
2248 
2249 	switch (r_xprt->rx_ia.ri_memreg_strategy) {
2250 	case RPCRDMA_ALLPHYSICAL:
2251 		result = rpcrdma_physical_max_payload(r_xprt);
2252 		break;
2253 	default:
2254 		result = rpcrdma_mr_max_payload(r_xprt);
2255 	}
2256 	return result;
2257 }
2258