xref: /openbmc/linux/net/sunrpc/xprtrdma/verbs.c (revision c819e2cf)
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39 
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49 
50 #include <linux/interrupt.h>
51 #include <linux/slab.h>
52 #include <asm/bitops.h>
53 
54 #include "xprt_rdma.h"
55 
56 /*
57  * Globals/Macros
58  */
59 
60 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
61 # define RPCDBG_FACILITY	RPCDBG_TRANS
62 #endif
63 
64 static void rpcrdma_reset_frmrs(struct rpcrdma_ia *);
65 static void rpcrdma_reset_fmrs(struct rpcrdma_ia *);
66 
67 /*
68  * internal functions
69  */
70 
71 /*
72  * handle replies in tasklet context, using a single, global list
73  * rdma tasklet function -- just turn around and call the func
74  * for all replies on the list
75  */
76 
77 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
78 static LIST_HEAD(rpcrdma_tasklets_g);
79 
80 static void
81 rpcrdma_run_tasklet(unsigned long data)
82 {
83 	struct rpcrdma_rep *rep;
84 	void (*func)(struct rpcrdma_rep *);
85 	unsigned long flags;
86 
87 	data = data;
88 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
89 	while (!list_empty(&rpcrdma_tasklets_g)) {
90 		rep = list_entry(rpcrdma_tasklets_g.next,
91 				 struct rpcrdma_rep, rr_list);
92 		list_del(&rep->rr_list);
93 		func = rep->rr_func;
94 		rep->rr_func = NULL;
95 		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
96 
97 		if (func)
98 			func(rep);
99 		else
100 			rpcrdma_recv_buffer_put(rep);
101 
102 		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
103 	}
104 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
105 }
106 
107 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
108 
109 static const char * const async_event[] = {
110 	"CQ error",
111 	"QP fatal error",
112 	"QP request error",
113 	"QP access error",
114 	"communication established",
115 	"send queue drained",
116 	"path migration successful",
117 	"path mig error",
118 	"device fatal error",
119 	"port active",
120 	"port error",
121 	"LID change",
122 	"P_key change",
123 	"SM change",
124 	"SRQ error",
125 	"SRQ limit reached",
126 	"last WQE reached",
127 	"client reregister",
128 	"GID change",
129 };
130 
131 #define ASYNC_MSG(status)					\
132 	((status) < ARRAY_SIZE(async_event) ?			\
133 		async_event[(status)] : "unknown async error")
134 
135 static void
136 rpcrdma_schedule_tasklet(struct list_head *sched_list)
137 {
138 	unsigned long flags;
139 
140 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
141 	list_splice_tail(sched_list, &rpcrdma_tasklets_g);
142 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
143 	tasklet_schedule(&rpcrdma_tasklet_g);
144 }
145 
146 static void
147 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
148 {
149 	struct rpcrdma_ep *ep = context;
150 
151 	pr_err("RPC:       %s: %s on device %s ep %p\n",
152 	       __func__, ASYNC_MSG(event->event),
153 		event->device->name, context);
154 	if (ep->rep_connected == 1) {
155 		ep->rep_connected = -EIO;
156 		ep->rep_func(ep);
157 		wake_up_all(&ep->rep_connect_wait);
158 	}
159 }
160 
161 static void
162 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
163 {
164 	struct rpcrdma_ep *ep = context;
165 
166 	pr_err("RPC:       %s: %s on device %s ep %p\n",
167 	       __func__, ASYNC_MSG(event->event),
168 		event->device->name, context);
169 	if (ep->rep_connected == 1) {
170 		ep->rep_connected = -EIO;
171 		ep->rep_func(ep);
172 		wake_up_all(&ep->rep_connect_wait);
173 	}
174 }
175 
176 static void
177 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
178 {
179 	struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
180 
181 	dprintk("RPC:       %s: frmr %p status %X opcode %d\n",
182 		__func__, frmr, wc->status, wc->opcode);
183 
184 	if (wc->wr_id == 0ULL)
185 		return;
186 	if (wc->status != IB_WC_SUCCESS)
187 		frmr->r.frmr.fr_state = FRMR_IS_STALE;
188 }
189 
190 static int
191 rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
192 {
193 	struct ib_wc *wcs;
194 	int budget, count, rc;
195 
196 	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
197 	do {
198 		wcs = ep->rep_send_wcs;
199 
200 		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
201 		if (rc <= 0)
202 			return rc;
203 
204 		count = rc;
205 		while (count-- > 0)
206 			rpcrdma_sendcq_process_wc(wcs++);
207 	} while (rc == RPCRDMA_POLLSIZE && --budget);
208 	return 0;
209 }
210 
211 /*
212  * Handle send, fast_reg_mr, and local_inv completions.
213  *
214  * Send events are typically suppressed and thus do not result
215  * in an upcall. Occasionally one is signaled, however. This
216  * prevents the provider's completion queue from wrapping and
217  * losing a completion.
218  */
219 static void
220 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
221 {
222 	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
223 	int rc;
224 
225 	rc = rpcrdma_sendcq_poll(cq, ep);
226 	if (rc) {
227 		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
228 			__func__, rc);
229 		return;
230 	}
231 
232 	rc = ib_req_notify_cq(cq,
233 			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
234 	if (rc == 0)
235 		return;
236 	if (rc < 0) {
237 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
238 			__func__, rc);
239 		return;
240 	}
241 
242 	rpcrdma_sendcq_poll(cq, ep);
243 }
244 
245 static void
246 rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
247 {
248 	struct rpcrdma_rep *rep =
249 			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
250 
251 	dprintk("RPC:       %s: rep %p status %X opcode %X length %u\n",
252 		__func__, rep, wc->status, wc->opcode, wc->byte_len);
253 
254 	if (wc->status != IB_WC_SUCCESS) {
255 		rep->rr_len = ~0U;
256 		goto out_schedule;
257 	}
258 	if (wc->opcode != IB_WC_RECV)
259 		return;
260 
261 	rep->rr_len = wc->byte_len;
262 	ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
263 			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
264 
265 	if (rep->rr_len >= 16) {
266 		struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base;
267 		unsigned int credits = ntohl(p->rm_credit);
268 
269 		if (credits == 0)
270 			credits = 1;	/* don't deadlock */
271 		else if (credits > rep->rr_buffer->rb_max_requests)
272 			credits = rep->rr_buffer->rb_max_requests;
273 		atomic_set(&rep->rr_buffer->rb_credits, credits);
274 	}
275 
276 out_schedule:
277 	list_add_tail(&rep->rr_list, sched_list);
278 }
279 
280 static int
281 rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
282 {
283 	struct list_head sched_list;
284 	struct ib_wc *wcs;
285 	int budget, count, rc;
286 
287 	INIT_LIST_HEAD(&sched_list);
288 	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
289 	do {
290 		wcs = ep->rep_recv_wcs;
291 
292 		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
293 		if (rc <= 0)
294 			goto out_schedule;
295 
296 		count = rc;
297 		while (count-- > 0)
298 			rpcrdma_recvcq_process_wc(wcs++, &sched_list);
299 	} while (rc == RPCRDMA_POLLSIZE && --budget);
300 	rc = 0;
301 
302 out_schedule:
303 	rpcrdma_schedule_tasklet(&sched_list);
304 	return rc;
305 }
306 
307 /*
308  * Handle receive completions.
309  *
310  * It is reentrant but processes single events in order to maintain
311  * ordering of receives to keep server credits.
312  *
313  * It is the responsibility of the scheduled tasklet to return
314  * recv buffers to the pool. NOTE: this affects synchronization of
315  * connection shutdown. That is, the structures required for
316  * the completion of the reply handler must remain intact until
317  * all memory has been reclaimed.
318  */
319 static void
320 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
321 {
322 	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
323 	int rc;
324 
325 	rc = rpcrdma_recvcq_poll(cq, ep);
326 	if (rc) {
327 		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
328 			__func__, rc);
329 		return;
330 	}
331 
332 	rc = ib_req_notify_cq(cq,
333 			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
334 	if (rc == 0)
335 		return;
336 	if (rc < 0) {
337 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
338 			__func__, rc);
339 		return;
340 	}
341 
342 	rpcrdma_recvcq_poll(cq, ep);
343 }
344 
345 static void
346 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
347 {
348 	struct ib_wc wc;
349 	LIST_HEAD(sched_list);
350 
351 	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
352 		rpcrdma_recvcq_process_wc(&wc, &sched_list);
353 	if (!list_empty(&sched_list))
354 		rpcrdma_schedule_tasklet(&sched_list);
355 	while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
356 		rpcrdma_sendcq_process_wc(&wc);
357 }
358 
359 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
360 static const char * const conn[] = {
361 	"address resolved",
362 	"address error",
363 	"route resolved",
364 	"route error",
365 	"connect request",
366 	"connect response",
367 	"connect error",
368 	"unreachable",
369 	"rejected",
370 	"established",
371 	"disconnected",
372 	"device removal",
373 	"multicast join",
374 	"multicast error",
375 	"address change",
376 	"timewait exit",
377 };
378 
379 #define CONNECTION_MSG(status)						\
380 	((status) < ARRAY_SIZE(conn) ?					\
381 		conn[(status)] : "unrecognized connection error")
382 #endif
383 
384 static int
385 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
386 {
387 	struct rpcrdma_xprt *xprt = id->context;
388 	struct rpcrdma_ia *ia = &xprt->rx_ia;
389 	struct rpcrdma_ep *ep = &xprt->rx_ep;
390 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
391 	struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
392 #endif
393 	struct ib_qp_attr attr;
394 	struct ib_qp_init_attr iattr;
395 	int connstate = 0;
396 
397 	switch (event->event) {
398 	case RDMA_CM_EVENT_ADDR_RESOLVED:
399 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
400 		ia->ri_async_rc = 0;
401 		complete(&ia->ri_done);
402 		break;
403 	case RDMA_CM_EVENT_ADDR_ERROR:
404 		ia->ri_async_rc = -EHOSTUNREACH;
405 		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
406 			__func__, ep);
407 		complete(&ia->ri_done);
408 		break;
409 	case RDMA_CM_EVENT_ROUTE_ERROR:
410 		ia->ri_async_rc = -ENETUNREACH;
411 		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
412 			__func__, ep);
413 		complete(&ia->ri_done);
414 		break;
415 	case RDMA_CM_EVENT_ESTABLISHED:
416 		connstate = 1;
417 		ib_query_qp(ia->ri_id->qp, &attr,
418 			IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
419 			&iattr);
420 		dprintk("RPC:       %s: %d responder resources"
421 			" (%d initiator)\n",
422 			__func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
423 		goto connected;
424 	case RDMA_CM_EVENT_CONNECT_ERROR:
425 		connstate = -ENOTCONN;
426 		goto connected;
427 	case RDMA_CM_EVENT_UNREACHABLE:
428 		connstate = -ENETDOWN;
429 		goto connected;
430 	case RDMA_CM_EVENT_REJECTED:
431 		connstate = -ECONNREFUSED;
432 		goto connected;
433 	case RDMA_CM_EVENT_DISCONNECTED:
434 		connstate = -ECONNABORTED;
435 		goto connected;
436 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
437 		connstate = -ENODEV;
438 connected:
439 		atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
440 		dprintk("RPC:       %s: %sconnected\n",
441 					__func__, connstate > 0 ? "" : "dis");
442 		ep->rep_connected = connstate;
443 		ep->rep_func(ep);
444 		wake_up_all(&ep->rep_connect_wait);
445 		/*FALLTHROUGH*/
446 	default:
447 		dprintk("RPC:       %s: %pI4:%u (ep 0x%p): %s\n",
448 			__func__, &addr->sin_addr.s_addr,
449 			ntohs(addr->sin_port), ep,
450 			CONNECTION_MSG(event->event));
451 		break;
452 	}
453 
454 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
455 	if (connstate == 1) {
456 		int ird = attr.max_dest_rd_atomic;
457 		int tird = ep->rep_remote_cma.responder_resources;
458 		printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
459 			"on %s, memreg %d slots %d ird %d%s\n",
460 			&addr->sin_addr.s_addr,
461 			ntohs(addr->sin_port),
462 			ia->ri_id->device->name,
463 			ia->ri_memreg_strategy,
464 			xprt->rx_buf.rb_max_requests,
465 			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
466 	} else if (connstate < 0) {
467 		printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
468 			&addr->sin_addr.s_addr,
469 			ntohs(addr->sin_port),
470 			connstate);
471 	}
472 #endif
473 
474 	return 0;
475 }
476 
477 static struct rdma_cm_id *
478 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
479 			struct rpcrdma_ia *ia, struct sockaddr *addr)
480 {
481 	struct rdma_cm_id *id;
482 	int rc;
483 
484 	init_completion(&ia->ri_done);
485 
486 	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
487 	if (IS_ERR(id)) {
488 		rc = PTR_ERR(id);
489 		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
490 			__func__, rc);
491 		return id;
492 	}
493 
494 	ia->ri_async_rc = -ETIMEDOUT;
495 	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
496 	if (rc) {
497 		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
498 			__func__, rc);
499 		goto out;
500 	}
501 	wait_for_completion_interruptible_timeout(&ia->ri_done,
502 				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
503 	rc = ia->ri_async_rc;
504 	if (rc)
505 		goto out;
506 
507 	ia->ri_async_rc = -ETIMEDOUT;
508 	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
509 	if (rc) {
510 		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
511 			__func__, rc);
512 		goto out;
513 	}
514 	wait_for_completion_interruptible_timeout(&ia->ri_done,
515 				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
516 	rc = ia->ri_async_rc;
517 	if (rc)
518 		goto out;
519 
520 	return id;
521 
522 out:
523 	rdma_destroy_id(id);
524 	return ERR_PTR(rc);
525 }
526 
527 /*
528  * Drain any cq, prior to teardown.
529  */
530 static void
531 rpcrdma_clean_cq(struct ib_cq *cq)
532 {
533 	struct ib_wc wc;
534 	int count = 0;
535 
536 	while (1 == ib_poll_cq(cq, 1, &wc))
537 		++count;
538 
539 	if (count)
540 		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
541 			__func__, count, wc.opcode);
542 }
543 
544 /*
545  * Exported functions.
546  */
547 
548 /*
549  * Open and initialize an Interface Adapter.
550  *  o initializes fields of struct rpcrdma_ia, including
551  *    interface and provider attributes and protection zone.
552  */
553 int
554 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
555 {
556 	int rc, mem_priv;
557 	struct ib_device_attr devattr;
558 	struct rpcrdma_ia *ia = &xprt->rx_ia;
559 
560 	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
561 	if (IS_ERR(ia->ri_id)) {
562 		rc = PTR_ERR(ia->ri_id);
563 		goto out1;
564 	}
565 
566 	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
567 	if (IS_ERR(ia->ri_pd)) {
568 		rc = PTR_ERR(ia->ri_pd);
569 		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
570 			__func__, rc);
571 		goto out2;
572 	}
573 
574 	/*
575 	 * Query the device to determine if the requested memory
576 	 * registration strategy is supported. If it isn't, set the
577 	 * strategy to a globally supported model.
578 	 */
579 	rc = ib_query_device(ia->ri_id->device, &devattr);
580 	if (rc) {
581 		dprintk("RPC:       %s: ib_query_device failed %d\n",
582 			__func__, rc);
583 		goto out2;
584 	}
585 
586 	if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
587 		ia->ri_have_dma_lkey = 1;
588 		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
589 	}
590 
591 	if (memreg == RPCRDMA_FRMR) {
592 		/* Requires both frmr reg and local dma lkey */
593 		if ((devattr.device_cap_flags &
594 		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
595 		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
596 			dprintk("RPC:       %s: FRMR registration "
597 				"not supported by HCA\n", __func__);
598 			memreg = RPCRDMA_MTHCAFMR;
599 		} else {
600 			/* Mind the ia limit on FRMR page list depth */
601 			ia->ri_max_frmr_depth = min_t(unsigned int,
602 				RPCRDMA_MAX_DATA_SEGS,
603 				devattr.max_fast_reg_page_list_len);
604 		}
605 	}
606 	if (memreg == RPCRDMA_MTHCAFMR) {
607 		if (!ia->ri_id->device->alloc_fmr) {
608 			dprintk("RPC:       %s: MTHCAFMR registration "
609 				"not supported by HCA\n", __func__);
610 			memreg = RPCRDMA_ALLPHYSICAL;
611 		}
612 	}
613 
614 	/*
615 	 * Optionally obtain an underlying physical identity mapping in
616 	 * order to do a memory window-based bind. This base registration
617 	 * is protected from remote access - that is enabled only by binding
618 	 * for the specific bytes targeted during each RPC operation, and
619 	 * revoked after the corresponding completion similar to a storage
620 	 * adapter.
621 	 */
622 	switch (memreg) {
623 	case RPCRDMA_FRMR:
624 		break;
625 	case RPCRDMA_ALLPHYSICAL:
626 		mem_priv = IB_ACCESS_LOCAL_WRITE |
627 				IB_ACCESS_REMOTE_WRITE |
628 				IB_ACCESS_REMOTE_READ;
629 		goto register_setup;
630 	case RPCRDMA_MTHCAFMR:
631 		if (ia->ri_have_dma_lkey)
632 			break;
633 		mem_priv = IB_ACCESS_LOCAL_WRITE;
634 	register_setup:
635 		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
636 		if (IS_ERR(ia->ri_bind_mem)) {
637 			printk(KERN_ALERT "%s: ib_get_dma_mr for "
638 				"phys register failed with %lX\n",
639 				__func__, PTR_ERR(ia->ri_bind_mem));
640 			rc = -ENOMEM;
641 			goto out2;
642 		}
643 		break;
644 	default:
645 		printk(KERN_ERR "RPC: Unsupported memory "
646 				"registration mode: %d\n", memreg);
647 		rc = -ENOMEM;
648 		goto out2;
649 	}
650 	dprintk("RPC:       %s: memory registration strategy is %d\n",
651 		__func__, memreg);
652 
653 	/* Else will do memory reg/dereg for each chunk */
654 	ia->ri_memreg_strategy = memreg;
655 
656 	rwlock_init(&ia->ri_qplock);
657 	return 0;
658 out2:
659 	rdma_destroy_id(ia->ri_id);
660 	ia->ri_id = NULL;
661 out1:
662 	return rc;
663 }
664 
665 /*
666  * Clean up/close an IA.
667  *   o if event handles and PD have been initialized, free them.
668  *   o close the IA
669  */
670 void
671 rpcrdma_ia_close(struct rpcrdma_ia *ia)
672 {
673 	int rc;
674 
675 	dprintk("RPC:       %s: entering\n", __func__);
676 	if (ia->ri_bind_mem != NULL) {
677 		rc = ib_dereg_mr(ia->ri_bind_mem);
678 		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
679 			__func__, rc);
680 	}
681 	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
682 		if (ia->ri_id->qp)
683 			rdma_destroy_qp(ia->ri_id);
684 		rdma_destroy_id(ia->ri_id);
685 		ia->ri_id = NULL;
686 	}
687 	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
688 		rc = ib_dealloc_pd(ia->ri_pd);
689 		dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
690 			__func__, rc);
691 	}
692 }
693 
694 /*
695  * Create unconnected endpoint.
696  */
697 int
698 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
699 				struct rpcrdma_create_data_internal *cdata)
700 {
701 	struct ib_device_attr devattr;
702 	struct ib_cq *sendcq, *recvcq;
703 	int rc, err;
704 
705 	rc = ib_query_device(ia->ri_id->device, &devattr);
706 	if (rc) {
707 		dprintk("RPC:       %s: ib_query_device failed %d\n",
708 			__func__, rc);
709 		return rc;
710 	}
711 
712 	/* check provider's send/recv wr limits */
713 	if (cdata->max_requests > devattr.max_qp_wr)
714 		cdata->max_requests = devattr.max_qp_wr;
715 
716 	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
717 	ep->rep_attr.qp_context = ep;
718 	/* send_cq and recv_cq initialized below */
719 	ep->rep_attr.srq = NULL;
720 	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
721 	switch (ia->ri_memreg_strategy) {
722 	case RPCRDMA_FRMR: {
723 		int depth = 7;
724 
725 		/* Add room for frmr register and invalidate WRs.
726 		 * 1. FRMR reg WR for head
727 		 * 2. FRMR invalidate WR for head
728 		 * 3. N FRMR reg WRs for pagelist
729 		 * 4. N FRMR invalidate WRs for pagelist
730 		 * 5. FRMR reg WR for tail
731 		 * 6. FRMR invalidate WR for tail
732 		 * 7. The RDMA_SEND WR
733 		 */
734 
735 		/* Calculate N if the device max FRMR depth is smaller than
736 		 * RPCRDMA_MAX_DATA_SEGS.
737 		 */
738 		if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
739 			int delta = RPCRDMA_MAX_DATA_SEGS -
740 				    ia->ri_max_frmr_depth;
741 
742 			do {
743 				depth += 2; /* FRMR reg + invalidate */
744 				delta -= ia->ri_max_frmr_depth;
745 			} while (delta > 0);
746 
747 		}
748 		ep->rep_attr.cap.max_send_wr *= depth;
749 		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
750 			cdata->max_requests = devattr.max_qp_wr / depth;
751 			if (!cdata->max_requests)
752 				return -EINVAL;
753 			ep->rep_attr.cap.max_send_wr = cdata->max_requests *
754 						       depth;
755 		}
756 		break;
757 	}
758 	default:
759 		break;
760 	}
761 	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
762 	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
763 	ep->rep_attr.cap.max_recv_sge = 1;
764 	ep->rep_attr.cap.max_inline_data = 0;
765 	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
766 	ep->rep_attr.qp_type = IB_QPT_RC;
767 	ep->rep_attr.port_num = ~0;
768 
769 	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
770 		"iovs: send %d recv %d\n",
771 		__func__,
772 		ep->rep_attr.cap.max_send_wr,
773 		ep->rep_attr.cap.max_recv_wr,
774 		ep->rep_attr.cap.max_send_sge,
775 		ep->rep_attr.cap.max_recv_sge);
776 
777 	/* set trigger for requesting send completion */
778 	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
779 	if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
780 		ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
781 	else if (ep->rep_cqinit <= 2)
782 		ep->rep_cqinit = 0;
783 	INIT_CQCOUNT(ep);
784 	ep->rep_ia = ia;
785 	init_waitqueue_head(&ep->rep_connect_wait);
786 	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
787 
788 	sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
789 				  rpcrdma_cq_async_error_upcall, ep,
790 				  ep->rep_attr.cap.max_send_wr + 1, 0);
791 	if (IS_ERR(sendcq)) {
792 		rc = PTR_ERR(sendcq);
793 		dprintk("RPC:       %s: failed to create send CQ: %i\n",
794 			__func__, rc);
795 		goto out1;
796 	}
797 
798 	rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
799 	if (rc) {
800 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
801 			__func__, rc);
802 		goto out2;
803 	}
804 
805 	recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
806 				  rpcrdma_cq_async_error_upcall, ep,
807 				  ep->rep_attr.cap.max_recv_wr + 1, 0);
808 	if (IS_ERR(recvcq)) {
809 		rc = PTR_ERR(recvcq);
810 		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
811 			__func__, rc);
812 		goto out2;
813 	}
814 
815 	rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
816 	if (rc) {
817 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
818 			__func__, rc);
819 		ib_destroy_cq(recvcq);
820 		goto out2;
821 	}
822 
823 	ep->rep_attr.send_cq = sendcq;
824 	ep->rep_attr.recv_cq = recvcq;
825 
826 	/* Initialize cma parameters */
827 
828 	/* RPC/RDMA does not use private data */
829 	ep->rep_remote_cma.private_data = NULL;
830 	ep->rep_remote_cma.private_data_len = 0;
831 
832 	/* Client offers RDMA Read but does not initiate */
833 	ep->rep_remote_cma.initiator_depth = 0;
834 	if (devattr.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
835 		ep->rep_remote_cma.responder_resources = 32;
836 	else
837 		ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
838 
839 	ep->rep_remote_cma.retry_count = 7;
840 	ep->rep_remote_cma.flow_control = 0;
841 	ep->rep_remote_cma.rnr_retry_count = 0;
842 
843 	return 0;
844 
845 out2:
846 	err = ib_destroy_cq(sendcq);
847 	if (err)
848 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
849 			__func__, err);
850 out1:
851 	return rc;
852 }
853 
854 /*
855  * rpcrdma_ep_destroy
856  *
857  * Disconnect and destroy endpoint. After this, the only
858  * valid operations on the ep are to free it (if dynamically
859  * allocated) or re-create it.
860  */
861 void
862 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
863 {
864 	int rc;
865 
866 	dprintk("RPC:       %s: entering, connected is %d\n",
867 		__func__, ep->rep_connected);
868 
869 	cancel_delayed_work_sync(&ep->rep_connect_worker);
870 
871 	if (ia->ri_id->qp) {
872 		rpcrdma_ep_disconnect(ep, ia);
873 		rdma_destroy_qp(ia->ri_id);
874 		ia->ri_id->qp = NULL;
875 	}
876 
877 	/* padding - could be done in rpcrdma_buffer_destroy... */
878 	if (ep->rep_pad_mr) {
879 		rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
880 		ep->rep_pad_mr = NULL;
881 	}
882 
883 	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
884 	rc = ib_destroy_cq(ep->rep_attr.recv_cq);
885 	if (rc)
886 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
887 			__func__, rc);
888 
889 	rpcrdma_clean_cq(ep->rep_attr.send_cq);
890 	rc = ib_destroy_cq(ep->rep_attr.send_cq);
891 	if (rc)
892 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
893 			__func__, rc);
894 }
895 
896 /*
897  * Connect unconnected endpoint.
898  */
899 int
900 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
901 {
902 	struct rdma_cm_id *id, *old;
903 	int rc = 0;
904 	int retry_count = 0;
905 
906 	if (ep->rep_connected != 0) {
907 		struct rpcrdma_xprt *xprt;
908 retry:
909 		dprintk("RPC:       %s: reconnecting...\n", __func__);
910 
911 		rpcrdma_ep_disconnect(ep, ia);
912 		rpcrdma_flush_cqs(ep);
913 
914 		switch (ia->ri_memreg_strategy) {
915 		case RPCRDMA_FRMR:
916 			rpcrdma_reset_frmrs(ia);
917 			break;
918 		case RPCRDMA_MTHCAFMR:
919 			rpcrdma_reset_fmrs(ia);
920 			break;
921 		case RPCRDMA_ALLPHYSICAL:
922 			break;
923 		default:
924 			rc = -EIO;
925 			goto out;
926 		}
927 
928 		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
929 		id = rpcrdma_create_id(xprt, ia,
930 				(struct sockaddr *)&xprt->rx_data.addr);
931 		if (IS_ERR(id)) {
932 			rc = -EHOSTUNREACH;
933 			goto out;
934 		}
935 		/* TEMP TEMP TEMP - fail if new device:
936 		 * Deregister/remarshal *all* requests!
937 		 * Close and recreate adapter, pd, etc!
938 		 * Re-determine all attributes still sane!
939 		 * More stuff I haven't thought of!
940 		 * Rrrgh!
941 		 */
942 		if (ia->ri_id->device != id->device) {
943 			printk("RPC:       %s: can't reconnect on "
944 				"different device!\n", __func__);
945 			rdma_destroy_id(id);
946 			rc = -ENETUNREACH;
947 			goto out;
948 		}
949 		/* END TEMP */
950 		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
951 		if (rc) {
952 			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
953 				__func__, rc);
954 			rdma_destroy_id(id);
955 			rc = -ENETUNREACH;
956 			goto out;
957 		}
958 
959 		write_lock(&ia->ri_qplock);
960 		old = ia->ri_id;
961 		ia->ri_id = id;
962 		write_unlock(&ia->ri_qplock);
963 
964 		rdma_destroy_qp(old);
965 		rdma_destroy_id(old);
966 	} else {
967 		dprintk("RPC:       %s: connecting...\n", __func__);
968 		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
969 		if (rc) {
970 			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
971 				__func__, rc);
972 			/* do not update ep->rep_connected */
973 			return -ENETUNREACH;
974 		}
975 	}
976 
977 	ep->rep_connected = 0;
978 
979 	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
980 	if (rc) {
981 		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
982 				__func__, rc);
983 		goto out;
984 	}
985 
986 	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
987 
988 	/*
989 	 * Check state. A non-peer reject indicates no listener
990 	 * (ECONNREFUSED), which may be a transient state. All
991 	 * others indicate a transport condition which has already
992 	 * undergone a best-effort.
993 	 */
994 	if (ep->rep_connected == -ECONNREFUSED &&
995 	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
996 		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
997 		goto retry;
998 	}
999 	if (ep->rep_connected <= 0) {
1000 		/* Sometimes, the only way to reliably connect to remote
1001 		 * CMs is to use same nonzero values for ORD and IRD. */
1002 		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
1003 		    (ep->rep_remote_cma.responder_resources == 0 ||
1004 		     ep->rep_remote_cma.initiator_depth !=
1005 				ep->rep_remote_cma.responder_resources)) {
1006 			if (ep->rep_remote_cma.responder_resources == 0)
1007 				ep->rep_remote_cma.responder_resources = 1;
1008 			ep->rep_remote_cma.initiator_depth =
1009 				ep->rep_remote_cma.responder_resources;
1010 			goto retry;
1011 		}
1012 		rc = ep->rep_connected;
1013 	} else {
1014 		dprintk("RPC:       %s: connected\n", __func__);
1015 	}
1016 
1017 out:
1018 	if (rc)
1019 		ep->rep_connected = rc;
1020 	return rc;
1021 }
1022 
1023 /*
1024  * rpcrdma_ep_disconnect
1025  *
1026  * This is separate from destroy to facilitate the ability
1027  * to reconnect without recreating the endpoint.
1028  *
1029  * This call is not reentrant, and must not be made in parallel
1030  * on the same endpoint.
1031  */
1032 void
1033 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
1034 {
1035 	int rc;
1036 
1037 	rpcrdma_flush_cqs(ep);
1038 	rc = rdma_disconnect(ia->ri_id);
1039 	if (!rc) {
1040 		/* returns without wait if not connected */
1041 		wait_event_interruptible(ep->rep_connect_wait,
1042 							ep->rep_connected != 1);
1043 		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
1044 			(ep->rep_connected == 1) ? "still " : "dis");
1045 	} else {
1046 		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
1047 		ep->rep_connected = rc;
1048 	}
1049 }
1050 
1051 static int
1052 rpcrdma_init_fmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf)
1053 {
1054 	int mr_access_flags = IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ;
1055 	struct ib_fmr_attr fmr_attr = {
1056 		.max_pages	= RPCRDMA_MAX_DATA_SEGS,
1057 		.max_maps	= 1,
1058 		.page_shift	= PAGE_SHIFT
1059 	};
1060 	struct rpcrdma_mw *r;
1061 	int i, rc;
1062 
1063 	i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
1064 	dprintk("RPC:       %s: initalizing %d FMRs\n", __func__, i);
1065 
1066 	while (i--) {
1067 		r = kzalloc(sizeof(*r), GFP_KERNEL);
1068 		if (r == NULL)
1069 			return -ENOMEM;
1070 
1071 		r->r.fmr = ib_alloc_fmr(ia->ri_pd, mr_access_flags, &fmr_attr);
1072 		if (IS_ERR(r->r.fmr)) {
1073 			rc = PTR_ERR(r->r.fmr);
1074 			dprintk("RPC:       %s: ib_alloc_fmr failed %i\n",
1075 				__func__, rc);
1076 			goto out_free;
1077 		}
1078 
1079 		list_add(&r->mw_list, &buf->rb_mws);
1080 		list_add(&r->mw_all, &buf->rb_all);
1081 	}
1082 	return 0;
1083 
1084 out_free:
1085 	kfree(r);
1086 	return rc;
1087 }
1088 
1089 static int
1090 rpcrdma_init_frmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf)
1091 {
1092 	struct rpcrdma_frmr *f;
1093 	struct rpcrdma_mw *r;
1094 	int i, rc;
1095 
1096 	i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
1097 	dprintk("RPC:       %s: initalizing %d FRMRs\n", __func__, i);
1098 
1099 	while (i--) {
1100 		r = kzalloc(sizeof(*r), GFP_KERNEL);
1101 		if (r == NULL)
1102 			return -ENOMEM;
1103 		f = &r->r.frmr;
1104 
1105 		f->fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1106 						ia->ri_max_frmr_depth);
1107 		if (IS_ERR(f->fr_mr)) {
1108 			rc = PTR_ERR(f->fr_mr);
1109 			dprintk("RPC:       %s: ib_alloc_fast_reg_mr "
1110 				"failed %i\n", __func__, rc);
1111 			goto out_free;
1112 		}
1113 
1114 		f->fr_pgl = ib_alloc_fast_reg_page_list(ia->ri_id->device,
1115 							ia->ri_max_frmr_depth);
1116 		if (IS_ERR(f->fr_pgl)) {
1117 			rc = PTR_ERR(f->fr_pgl);
1118 			dprintk("RPC:       %s: ib_alloc_fast_reg_page_list "
1119 				"failed %i\n", __func__, rc);
1120 
1121 			ib_dereg_mr(f->fr_mr);
1122 			goto out_free;
1123 		}
1124 
1125 		list_add(&r->mw_list, &buf->rb_mws);
1126 		list_add(&r->mw_all, &buf->rb_all);
1127 	}
1128 
1129 	return 0;
1130 
1131 out_free:
1132 	kfree(r);
1133 	return rc;
1134 }
1135 
1136 int
1137 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
1138 	struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
1139 {
1140 	char *p;
1141 	size_t len, rlen, wlen;
1142 	int i, rc;
1143 
1144 	buf->rb_max_requests = cdata->max_requests;
1145 	spin_lock_init(&buf->rb_lock);
1146 	atomic_set(&buf->rb_credits, 1);
1147 
1148 	/* Need to allocate:
1149 	 *   1.  arrays for send and recv pointers
1150 	 *   2.  arrays of struct rpcrdma_req to fill in pointers
1151 	 *   3.  array of struct rpcrdma_rep for replies
1152 	 *   4.  padding, if any
1153 	 * Send/recv buffers in req/rep need to be registered
1154 	 */
1155 	len = buf->rb_max_requests *
1156 		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1157 	len += cdata->padding;
1158 
1159 	p = kzalloc(len, GFP_KERNEL);
1160 	if (p == NULL) {
1161 		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1162 			__func__, len);
1163 		rc = -ENOMEM;
1164 		goto out;
1165 	}
1166 	buf->rb_pool = p;	/* for freeing it later */
1167 
1168 	buf->rb_send_bufs = (struct rpcrdma_req **) p;
1169 	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1170 	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1171 	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1172 
1173 	/*
1174 	 * Register the zeroed pad buffer, if any.
1175 	 */
1176 	if (cdata->padding) {
1177 		rc = rpcrdma_register_internal(ia, p, cdata->padding,
1178 					    &ep->rep_pad_mr, &ep->rep_pad);
1179 		if (rc)
1180 			goto out;
1181 	}
1182 	p += cdata->padding;
1183 
1184 	INIT_LIST_HEAD(&buf->rb_mws);
1185 	INIT_LIST_HEAD(&buf->rb_all);
1186 	switch (ia->ri_memreg_strategy) {
1187 	case RPCRDMA_FRMR:
1188 		rc = rpcrdma_init_frmrs(ia, buf);
1189 		if (rc)
1190 			goto out;
1191 		break;
1192 	case RPCRDMA_MTHCAFMR:
1193 		rc = rpcrdma_init_fmrs(ia, buf);
1194 		if (rc)
1195 			goto out;
1196 		break;
1197 	default:
1198 		break;
1199 	}
1200 
1201 	/*
1202 	 * Allocate/init the request/reply buffers. Doing this
1203 	 * using kmalloc for now -- one for each buf.
1204 	 */
1205 	wlen = 1 << fls(cdata->inline_wsize + sizeof(struct rpcrdma_req));
1206 	rlen = 1 << fls(cdata->inline_rsize + sizeof(struct rpcrdma_rep));
1207 	dprintk("RPC:       %s: wlen = %zu, rlen = %zu\n",
1208 		__func__, wlen, rlen);
1209 
1210 	for (i = 0; i < buf->rb_max_requests; i++) {
1211 		struct rpcrdma_req *req;
1212 		struct rpcrdma_rep *rep;
1213 
1214 		req = kmalloc(wlen, GFP_KERNEL);
1215 		if (req == NULL) {
1216 			dprintk("RPC:       %s: request buffer %d alloc"
1217 				" failed\n", __func__, i);
1218 			rc = -ENOMEM;
1219 			goto out;
1220 		}
1221 		memset(req, 0, sizeof(struct rpcrdma_req));
1222 		buf->rb_send_bufs[i] = req;
1223 		buf->rb_send_bufs[i]->rl_buffer = buf;
1224 
1225 		rc = rpcrdma_register_internal(ia, req->rl_base,
1226 				wlen - offsetof(struct rpcrdma_req, rl_base),
1227 				&buf->rb_send_bufs[i]->rl_handle,
1228 				&buf->rb_send_bufs[i]->rl_iov);
1229 		if (rc)
1230 			goto out;
1231 
1232 		buf->rb_send_bufs[i]->rl_size = wlen -
1233 						sizeof(struct rpcrdma_req);
1234 
1235 		rep = kmalloc(rlen, GFP_KERNEL);
1236 		if (rep == NULL) {
1237 			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1238 				__func__, i);
1239 			rc = -ENOMEM;
1240 			goto out;
1241 		}
1242 		memset(rep, 0, sizeof(struct rpcrdma_rep));
1243 		buf->rb_recv_bufs[i] = rep;
1244 		buf->rb_recv_bufs[i]->rr_buffer = buf;
1245 
1246 		rc = rpcrdma_register_internal(ia, rep->rr_base,
1247 				rlen - offsetof(struct rpcrdma_rep, rr_base),
1248 				&buf->rb_recv_bufs[i]->rr_handle,
1249 				&buf->rb_recv_bufs[i]->rr_iov);
1250 		if (rc)
1251 			goto out;
1252 
1253 	}
1254 	dprintk("RPC:       %s: max_requests %d\n",
1255 		__func__, buf->rb_max_requests);
1256 	/* done */
1257 	return 0;
1258 out:
1259 	rpcrdma_buffer_destroy(buf);
1260 	return rc;
1261 }
1262 
1263 static void
1264 rpcrdma_destroy_fmrs(struct rpcrdma_buffer *buf)
1265 {
1266 	struct rpcrdma_mw *r;
1267 	int rc;
1268 
1269 	while (!list_empty(&buf->rb_all)) {
1270 		r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
1271 		list_del(&r->mw_all);
1272 		list_del(&r->mw_list);
1273 
1274 		rc = ib_dealloc_fmr(r->r.fmr);
1275 		if (rc)
1276 			dprintk("RPC:       %s: ib_dealloc_fmr failed %i\n",
1277 				__func__, rc);
1278 
1279 		kfree(r);
1280 	}
1281 }
1282 
1283 static void
1284 rpcrdma_destroy_frmrs(struct rpcrdma_buffer *buf)
1285 {
1286 	struct rpcrdma_mw *r;
1287 	int rc;
1288 
1289 	while (!list_empty(&buf->rb_all)) {
1290 		r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
1291 		list_del(&r->mw_all);
1292 		list_del(&r->mw_list);
1293 
1294 		rc = ib_dereg_mr(r->r.frmr.fr_mr);
1295 		if (rc)
1296 			dprintk("RPC:       %s: ib_dereg_mr failed %i\n",
1297 				__func__, rc);
1298 		ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1299 
1300 		kfree(r);
1301 	}
1302 }
1303 
1304 void
1305 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1306 {
1307 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1308 	int i;
1309 
1310 	/* clean up in reverse order from create
1311 	 *   1.  recv mr memory (mr free, then kfree)
1312 	 *   2.  send mr memory (mr free, then kfree)
1313 	 *   3.  MWs
1314 	 */
1315 	dprintk("RPC:       %s: entering\n", __func__);
1316 
1317 	for (i = 0; i < buf->rb_max_requests; i++) {
1318 		if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1319 			rpcrdma_deregister_internal(ia,
1320 					buf->rb_recv_bufs[i]->rr_handle,
1321 					&buf->rb_recv_bufs[i]->rr_iov);
1322 			kfree(buf->rb_recv_bufs[i]);
1323 		}
1324 		if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1325 			rpcrdma_deregister_internal(ia,
1326 					buf->rb_send_bufs[i]->rl_handle,
1327 					&buf->rb_send_bufs[i]->rl_iov);
1328 			kfree(buf->rb_send_bufs[i]);
1329 		}
1330 	}
1331 
1332 	switch (ia->ri_memreg_strategy) {
1333 	case RPCRDMA_FRMR:
1334 		rpcrdma_destroy_frmrs(buf);
1335 		break;
1336 	case RPCRDMA_MTHCAFMR:
1337 		rpcrdma_destroy_fmrs(buf);
1338 		break;
1339 	default:
1340 		break;
1341 	}
1342 
1343 	kfree(buf->rb_pool);
1344 }
1345 
1346 /* After a disconnect, unmap all FMRs.
1347  *
1348  * This is invoked only in the transport connect worker in order
1349  * to serialize with rpcrdma_register_fmr_external().
1350  */
1351 static void
1352 rpcrdma_reset_fmrs(struct rpcrdma_ia *ia)
1353 {
1354 	struct rpcrdma_xprt *r_xprt =
1355 				container_of(ia, struct rpcrdma_xprt, rx_ia);
1356 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1357 	struct list_head *pos;
1358 	struct rpcrdma_mw *r;
1359 	LIST_HEAD(l);
1360 	int rc;
1361 
1362 	list_for_each(pos, &buf->rb_all) {
1363 		r = list_entry(pos, struct rpcrdma_mw, mw_all);
1364 
1365 		INIT_LIST_HEAD(&l);
1366 		list_add(&r->r.fmr->list, &l);
1367 		rc = ib_unmap_fmr(&l);
1368 		if (rc)
1369 			dprintk("RPC:       %s: ib_unmap_fmr failed %i\n",
1370 				__func__, rc);
1371 	}
1372 }
1373 
1374 /* After a disconnect, a flushed FAST_REG_MR can leave an FRMR in
1375  * an unusable state. Find FRMRs in this state and dereg / reg
1376  * each.  FRMRs that are VALID and attached to an rpcrdma_req are
1377  * also torn down.
1378  *
1379  * This gives all in-use FRMRs a fresh rkey and leaves them INVALID.
1380  *
1381  * This is invoked only in the transport connect worker in order
1382  * to serialize with rpcrdma_register_frmr_external().
1383  */
1384 static void
1385 rpcrdma_reset_frmrs(struct rpcrdma_ia *ia)
1386 {
1387 	struct rpcrdma_xprt *r_xprt =
1388 				container_of(ia, struct rpcrdma_xprt, rx_ia);
1389 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1390 	struct list_head *pos;
1391 	struct rpcrdma_mw *r;
1392 	int rc;
1393 
1394 	list_for_each(pos, &buf->rb_all) {
1395 		r = list_entry(pos, struct rpcrdma_mw, mw_all);
1396 
1397 		if (r->r.frmr.fr_state == FRMR_IS_INVALID)
1398 			continue;
1399 
1400 		rc = ib_dereg_mr(r->r.frmr.fr_mr);
1401 		if (rc)
1402 			dprintk("RPC:       %s: ib_dereg_mr failed %i\n",
1403 				__func__, rc);
1404 		ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1405 
1406 		r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1407 					ia->ri_max_frmr_depth);
1408 		if (IS_ERR(r->r.frmr.fr_mr)) {
1409 			rc = PTR_ERR(r->r.frmr.fr_mr);
1410 			dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1411 				" failed %i\n", __func__, rc);
1412 			continue;
1413 		}
1414 		r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
1415 					ia->ri_id->device,
1416 					ia->ri_max_frmr_depth);
1417 		if (IS_ERR(r->r.frmr.fr_pgl)) {
1418 			rc = PTR_ERR(r->r.frmr.fr_pgl);
1419 			dprintk("RPC:       %s: "
1420 				"ib_alloc_fast_reg_page_list "
1421 				"failed %i\n", __func__, rc);
1422 
1423 			ib_dereg_mr(r->r.frmr.fr_mr);
1424 			continue;
1425 		}
1426 		r->r.frmr.fr_state = FRMR_IS_INVALID;
1427 	}
1428 }
1429 
1430 /* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
1431  * some req segments uninitialized.
1432  */
1433 static void
1434 rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
1435 {
1436 	if (*mw) {
1437 		list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
1438 		*mw = NULL;
1439 	}
1440 }
1441 
1442 /* Cycle mw's back in reverse order, and "spin" them.
1443  * This delays and scrambles reuse as much as possible.
1444  */
1445 static void
1446 rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1447 {
1448 	struct rpcrdma_mr_seg *seg = req->rl_segments;
1449 	struct rpcrdma_mr_seg *seg1 = seg;
1450 	int i;
1451 
1452 	for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
1453 		rpcrdma_buffer_put_mr(&seg->mr_chunk.rl_mw, buf);
1454 	rpcrdma_buffer_put_mr(&seg1->mr_chunk.rl_mw, buf);
1455 }
1456 
1457 static void
1458 rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1459 {
1460 	buf->rb_send_bufs[--buf->rb_send_index] = req;
1461 	req->rl_niovs = 0;
1462 	if (req->rl_reply) {
1463 		buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
1464 		req->rl_reply->rr_func = NULL;
1465 		req->rl_reply = NULL;
1466 	}
1467 }
1468 
1469 /* rpcrdma_unmap_one() was already done by rpcrdma_deregister_frmr_external().
1470  * Redo only the ib_post_send().
1471  */
1472 static void
1473 rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
1474 {
1475 	struct rpcrdma_xprt *r_xprt =
1476 				container_of(ia, struct rpcrdma_xprt, rx_ia);
1477 	struct ib_send_wr invalidate_wr, *bad_wr;
1478 	int rc;
1479 
1480 	dprintk("RPC:       %s: FRMR %p is stale\n", __func__, r);
1481 
1482 	/* When this FRMR is re-inserted into rb_mws, it is no longer stale */
1483 	r->r.frmr.fr_state = FRMR_IS_INVALID;
1484 
1485 	memset(&invalidate_wr, 0, sizeof(invalidate_wr));
1486 	invalidate_wr.wr_id = (unsigned long)(void *)r;
1487 	invalidate_wr.opcode = IB_WR_LOCAL_INV;
1488 	invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
1489 	DECR_CQCOUNT(&r_xprt->rx_ep);
1490 
1491 	dprintk("RPC:       %s: frmr %p invalidating rkey %08x\n",
1492 		__func__, r, r->r.frmr.fr_mr->rkey);
1493 
1494 	read_lock(&ia->ri_qplock);
1495 	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1496 	read_unlock(&ia->ri_qplock);
1497 	if (rc) {
1498 		/* Force rpcrdma_buffer_get() to retry */
1499 		r->r.frmr.fr_state = FRMR_IS_STALE;
1500 		dprintk("RPC:       %s: ib_post_send failed, %i\n",
1501 			__func__, rc);
1502 	}
1503 }
1504 
1505 static void
1506 rpcrdma_retry_flushed_linv(struct list_head *stale,
1507 			   struct rpcrdma_buffer *buf)
1508 {
1509 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1510 	struct list_head *pos;
1511 	struct rpcrdma_mw *r;
1512 	unsigned long flags;
1513 
1514 	list_for_each(pos, stale) {
1515 		r = list_entry(pos, struct rpcrdma_mw, mw_list);
1516 		rpcrdma_retry_local_inv(r, ia);
1517 	}
1518 
1519 	spin_lock_irqsave(&buf->rb_lock, flags);
1520 	list_splice_tail(stale, &buf->rb_mws);
1521 	spin_unlock_irqrestore(&buf->rb_lock, flags);
1522 }
1523 
1524 static struct rpcrdma_req *
1525 rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
1526 			 struct list_head *stale)
1527 {
1528 	struct rpcrdma_mw *r;
1529 	int i;
1530 
1531 	i = RPCRDMA_MAX_SEGS - 1;
1532 	while (!list_empty(&buf->rb_mws)) {
1533 		r = list_entry(buf->rb_mws.next,
1534 			       struct rpcrdma_mw, mw_list);
1535 		list_del(&r->mw_list);
1536 		if (r->r.frmr.fr_state == FRMR_IS_STALE) {
1537 			list_add(&r->mw_list, stale);
1538 			continue;
1539 		}
1540 		req->rl_segments[i].mr_chunk.rl_mw = r;
1541 		if (unlikely(i-- == 0))
1542 			return req;	/* Success */
1543 	}
1544 
1545 	/* Not enough entries on rb_mws for this req */
1546 	rpcrdma_buffer_put_sendbuf(req, buf);
1547 	rpcrdma_buffer_put_mrs(req, buf);
1548 	return NULL;
1549 }
1550 
1551 static struct rpcrdma_req *
1552 rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1553 {
1554 	struct rpcrdma_mw *r;
1555 	int i;
1556 
1557 	i = RPCRDMA_MAX_SEGS - 1;
1558 	while (!list_empty(&buf->rb_mws)) {
1559 		r = list_entry(buf->rb_mws.next,
1560 			       struct rpcrdma_mw, mw_list);
1561 		list_del(&r->mw_list);
1562 		req->rl_segments[i].mr_chunk.rl_mw = r;
1563 		if (unlikely(i-- == 0))
1564 			return req;	/* Success */
1565 	}
1566 
1567 	/* Not enough entries on rb_mws for this req */
1568 	rpcrdma_buffer_put_sendbuf(req, buf);
1569 	rpcrdma_buffer_put_mrs(req, buf);
1570 	return NULL;
1571 }
1572 
1573 /*
1574  * Get a set of request/reply buffers.
1575  *
1576  * Reply buffer (if needed) is attached to send buffer upon return.
1577  * Rule:
1578  *    rb_send_index and rb_recv_index MUST always be pointing to the
1579  *    *next* available buffer (non-NULL). They are incremented after
1580  *    removing buffers, and decremented *before* returning them.
1581  */
1582 struct rpcrdma_req *
1583 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1584 {
1585 	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1586 	struct list_head stale;
1587 	struct rpcrdma_req *req;
1588 	unsigned long flags;
1589 
1590 	spin_lock_irqsave(&buffers->rb_lock, flags);
1591 	if (buffers->rb_send_index == buffers->rb_max_requests) {
1592 		spin_unlock_irqrestore(&buffers->rb_lock, flags);
1593 		dprintk("RPC:       %s: out of request buffers\n", __func__);
1594 		return ((struct rpcrdma_req *)NULL);
1595 	}
1596 
1597 	req = buffers->rb_send_bufs[buffers->rb_send_index];
1598 	if (buffers->rb_send_index < buffers->rb_recv_index) {
1599 		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1600 			__func__,
1601 			buffers->rb_recv_index - buffers->rb_send_index);
1602 		req->rl_reply = NULL;
1603 	} else {
1604 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1605 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1606 	}
1607 	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1608 
1609 	INIT_LIST_HEAD(&stale);
1610 	switch (ia->ri_memreg_strategy) {
1611 	case RPCRDMA_FRMR:
1612 		req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
1613 		break;
1614 	case RPCRDMA_MTHCAFMR:
1615 		req = rpcrdma_buffer_get_fmrs(req, buffers);
1616 		break;
1617 	default:
1618 		break;
1619 	}
1620 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1621 	if (!list_empty(&stale))
1622 		rpcrdma_retry_flushed_linv(&stale, buffers);
1623 	return req;
1624 }
1625 
1626 /*
1627  * Put request/reply buffers back into pool.
1628  * Pre-decrement counter/array index.
1629  */
1630 void
1631 rpcrdma_buffer_put(struct rpcrdma_req *req)
1632 {
1633 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1634 	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1635 	unsigned long flags;
1636 
1637 	spin_lock_irqsave(&buffers->rb_lock, flags);
1638 	rpcrdma_buffer_put_sendbuf(req, buffers);
1639 	switch (ia->ri_memreg_strategy) {
1640 	case RPCRDMA_FRMR:
1641 	case RPCRDMA_MTHCAFMR:
1642 		rpcrdma_buffer_put_mrs(req, buffers);
1643 		break;
1644 	default:
1645 		break;
1646 	}
1647 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1648 }
1649 
1650 /*
1651  * Recover reply buffers from pool.
1652  * This happens when recovering from error conditions.
1653  * Post-increment counter/array index.
1654  */
1655 void
1656 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1657 {
1658 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1659 	unsigned long flags;
1660 
1661 	if (req->rl_iov.length == 0)	/* special case xprt_rdma_allocate() */
1662 		buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1663 	spin_lock_irqsave(&buffers->rb_lock, flags);
1664 	if (buffers->rb_recv_index < buffers->rb_max_requests) {
1665 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1666 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1667 	}
1668 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1669 }
1670 
1671 /*
1672  * Put reply buffers back into pool when not attached to
1673  * request. This happens in error conditions.
1674  */
1675 void
1676 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1677 {
1678 	struct rpcrdma_buffer *buffers = rep->rr_buffer;
1679 	unsigned long flags;
1680 
1681 	rep->rr_func = NULL;
1682 	spin_lock_irqsave(&buffers->rb_lock, flags);
1683 	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1684 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1685 }
1686 
1687 /*
1688  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1689  */
1690 
1691 int
1692 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1693 				struct ib_mr **mrp, struct ib_sge *iov)
1694 {
1695 	struct ib_phys_buf ipb;
1696 	struct ib_mr *mr;
1697 	int rc;
1698 
1699 	/*
1700 	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1701 	 */
1702 	iov->addr = ib_dma_map_single(ia->ri_id->device,
1703 			va, len, DMA_BIDIRECTIONAL);
1704 	if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
1705 		return -ENOMEM;
1706 
1707 	iov->length = len;
1708 
1709 	if (ia->ri_have_dma_lkey) {
1710 		*mrp = NULL;
1711 		iov->lkey = ia->ri_dma_lkey;
1712 		return 0;
1713 	} else if (ia->ri_bind_mem != NULL) {
1714 		*mrp = NULL;
1715 		iov->lkey = ia->ri_bind_mem->lkey;
1716 		return 0;
1717 	}
1718 
1719 	ipb.addr = iov->addr;
1720 	ipb.size = iov->length;
1721 	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1722 			IB_ACCESS_LOCAL_WRITE, &iov->addr);
1723 
1724 	dprintk("RPC:       %s: phys convert: 0x%llx "
1725 			"registered 0x%llx length %d\n",
1726 			__func__, (unsigned long long)ipb.addr,
1727 			(unsigned long long)iov->addr, len);
1728 
1729 	if (IS_ERR(mr)) {
1730 		*mrp = NULL;
1731 		rc = PTR_ERR(mr);
1732 		dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1733 	} else {
1734 		*mrp = mr;
1735 		iov->lkey = mr->lkey;
1736 		rc = 0;
1737 	}
1738 
1739 	return rc;
1740 }
1741 
1742 int
1743 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1744 				struct ib_mr *mr, struct ib_sge *iov)
1745 {
1746 	int rc;
1747 
1748 	ib_dma_unmap_single(ia->ri_id->device,
1749 			iov->addr, iov->length, DMA_BIDIRECTIONAL);
1750 
1751 	if (NULL == mr)
1752 		return 0;
1753 
1754 	rc = ib_dereg_mr(mr);
1755 	if (rc)
1756 		dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1757 	return rc;
1758 }
1759 
1760 /*
1761  * Wrappers for chunk registration, shared by read/write chunk code.
1762  */
1763 
1764 static void
1765 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1766 {
1767 	seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1768 	seg->mr_dmalen = seg->mr_len;
1769 	if (seg->mr_page)
1770 		seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1771 				seg->mr_page, offset_in_page(seg->mr_offset),
1772 				seg->mr_dmalen, seg->mr_dir);
1773 	else
1774 		seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1775 				seg->mr_offset,
1776 				seg->mr_dmalen, seg->mr_dir);
1777 	if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
1778 		dprintk("RPC:       %s: mr_dma %llx mr_offset %p mr_dma_len %zu\n",
1779 			__func__,
1780 			(unsigned long long)seg->mr_dma,
1781 			seg->mr_offset, seg->mr_dmalen);
1782 	}
1783 }
1784 
1785 static void
1786 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1787 {
1788 	if (seg->mr_page)
1789 		ib_dma_unmap_page(ia->ri_id->device,
1790 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1791 	else
1792 		ib_dma_unmap_single(ia->ri_id->device,
1793 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1794 }
1795 
1796 static int
1797 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1798 			int *nsegs, int writing, struct rpcrdma_ia *ia,
1799 			struct rpcrdma_xprt *r_xprt)
1800 {
1801 	struct rpcrdma_mr_seg *seg1 = seg;
1802 	struct rpcrdma_mw *mw = seg1->mr_chunk.rl_mw;
1803 	struct rpcrdma_frmr *frmr = &mw->r.frmr;
1804 	struct ib_mr *mr = frmr->fr_mr;
1805 	struct ib_send_wr fastreg_wr, *bad_wr;
1806 	u8 key;
1807 	int len, pageoff;
1808 	int i, rc;
1809 	int seg_len;
1810 	u64 pa;
1811 	int page_no;
1812 
1813 	pageoff = offset_in_page(seg1->mr_offset);
1814 	seg1->mr_offset -= pageoff;	/* start of page */
1815 	seg1->mr_len += pageoff;
1816 	len = -pageoff;
1817 	if (*nsegs > ia->ri_max_frmr_depth)
1818 		*nsegs = ia->ri_max_frmr_depth;
1819 	for (page_no = i = 0; i < *nsegs;) {
1820 		rpcrdma_map_one(ia, seg, writing);
1821 		pa = seg->mr_dma;
1822 		for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
1823 			frmr->fr_pgl->page_list[page_no++] = pa;
1824 			pa += PAGE_SIZE;
1825 		}
1826 		len += seg->mr_len;
1827 		++seg;
1828 		++i;
1829 		/* Check for holes */
1830 		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1831 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1832 			break;
1833 	}
1834 	dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1835 		__func__, mw, i);
1836 
1837 	frmr->fr_state = FRMR_IS_VALID;
1838 
1839 	memset(&fastreg_wr, 0, sizeof(fastreg_wr));
1840 	fastreg_wr.wr_id = (unsigned long)(void *)mw;
1841 	fastreg_wr.opcode = IB_WR_FAST_REG_MR;
1842 	fastreg_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1843 	fastreg_wr.wr.fast_reg.page_list = frmr->fr_pgl;
1844 	fastreg_wr.wr.fast_reg.page_list_len = page_no;
1845 	fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1846 	fastreg_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
1847 	if (fastreg_wr.wr.fast_reg.length < len) {
1848 		rc = -EIO;
1849 		goto out_err;
1850 	}
1851 
1852 	/* Bump the key */
1853 	key = (u8)(mr->rkey & 0x000000FF);
1854 	ib_update_fast_reg_key(mr, ++key);
1855 
1856 	fastreg_wr.wr.fast_reg.access_flags = (writing ?
1857 				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1858 				IB_ACCESS_REMOTE_READ);
1859 	fastreg_wr.wr.fast_reg.rkey = mr->rkey;
1860 	DECR_CQCOUNT(&r_xprt->rx_ep);
1861 
1862 	rc = ib_post_send(ia->ri_id->qp, &fastreg_wr, &bad_wr);
1863 	if (rc) {
1864 		dprintk("RPC:       %s: failed ib_post_send for register,"
1865 			" status %i\n", __func__, rc);
1866 		ib_update_fast_reg_key(mr, --key);
1867 		goto out_err;
1868 	} else {
1869 		seg1->mr_rkey = mr->rkey;
1870 		seg1->mr_base = seg1->mr_dma + pageoff;
1871 		seg1->mr_nsegs = i;
1872 		seg1->mr_len = len;
1873 	}
1874 	*nsegs = i;
1875 	return 0;
1876 out_err:
1877 	frmr->fr_state = FRMR_IS_INVALID;
1878 	while (i--)
1879 		rpcrdma_unmap_one(ia, --seg);
1880 	return rc;
1881 }
1882 
1883 static int
1884 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1885 			struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1886 {
1887 	struct rpcrdma_mr_seg *seg1 = seg;
1888 	struct ib_send_wr invalidate_wr, *bad_wr;
1889 	int rc;
1890 
1891 	seg1->mr_chunk.rl_mw->r.frmr.fr_state = FRMR_IS_INVALID;
1892 
1893 	memset(&invalidate_wr, 0, sizeof invalidate_wr);
1894 	invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1895 	invalidate_wr.opcode = IB_WR_LOCAL_INV;
1896 	invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1897 	DECR_CQCOUNT(&r_xprt->rx_ep);
1898 
1899 	read_lock(&ia->ri_qplock);
1900 	while (seg1->mr_nsegs--)
1901 		rpcrdma_unmap_one(ia, seg++);
1902 	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1903 	read_unlock(&ia->ri_qplock);
1904 	if (rc) {
1905 		/* Force rpcrdma_buffer_get() to retry */
1906 		seg1->mr_chunk.rl_mw->r.frmr.fr_state = FRMR_IS_STALE;
1907 		dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1908 			" status %i\n", __func__, rc);
1909 	}
1910 	return rc;
1911 }
1912 
1913 static int
1914 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1915 			int *nsegs, int writing, struct rpcrdma_ia *ia)
1916 {
1917 	struct rpcrdma_mr_seg *seg1 = seg;
1918 	u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1919 	int len, pageoff, i, rc;
1920 
1921 	pageoff = offset_in_page(seg1->mr_offset);
1922 	seg1->mr_offset -= pageoff;	/* start of page */
1923 	seg1->mr_len += pageoff;
1924 	len = -pageoff;
1925 	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1926 		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1927 	for (i = 0; i < *nsegs;) {
1928 		rpcrdma_map_one(ia, seg, writing);
1929 		physaddrs[i] = seg->mr_dma;
1930 		len += seg->mr_len;
1931 		++seg;
1932 		++i;
1933 		/* Check for holes */
1934 		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1935 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1936 			break;
1937 	}
1938 	rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1939 				physaddrs, i, seg1->mr_dma);
1940 	if (rc) {
1941 		dprintk("RPC:       %s: failed ib_map_phys_fmr "
1942 			"%u@0x%llx+%i (%d)... status %i\n", __func__,
1943 			len, (unsigned long long)seg1->mr_dma,
1944 			pageoff, i, rc);
1945 		while (i--)
1946 			rpcrdma_unmap_one(ia, --seg);
1947 	} else {
1948 		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1949 		seg1->mr_base = seg1->mr_dma + pageoff;
1950 		seg1->mr_nsegs = i;
1951 		seg1->mr_len = len;
1952 	}
1953 	*nsegs = i;
1954 	return rc;
1955 }
1956 
1957 static int
1958 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1959 			struct rpcrdma_ia *ia)
1960 {
1961 	struct rpcrdma_mr_seg *seg1 = seg;
1962 	LIST_HEAD(l);
1963 	int rc;
1964 
1965 	list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1966 	rc = ib_unmap_fmr(&l);
1967 	read_lock(&ia->ri_qplock);
1968 	while (seg1->mr_nsegs--)
1969 		rpcrdma_unmap_one(ia, seg++);
1970 	read_unlock(&ia->ri_qplock);
1971 	if (rc)
1972 		dprintk("RPC:       %s: failed ib_unmap_fmr,"
1973 			" status %i\n", __func__, rc);
1974 	return rc;
1975 }
1976 
1977 int
1978 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1979 			int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1980 {
1981 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1982 	int rc = 0;
1983 
1984 	switch (ia->ri_memreg_strategy) {
1985 
1986 	case RPCRDMA_ALLPHYSICAL:
1987 		rpcrdma_map_one(ia, seg, writing);
1988 		seg->mr_rkey = ia->ri_bind_mem->rkey;
1989 		seg->mr_base = seg->mr_dma;
1990 		seg->mr_nsegs = 1;
1991 		nsegs = 1;
1992 		break;
1993 
1994 	/* Registration using frmr registration */
1995 	case RPCRDMA_FRMR:
1996 		rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1997 		break;
1998 
1999 	/* Registration using fmr memory registration */
2000 	case RPCRDMA_MTHCAFMR:
2001 		rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
2002 		break;
2003 
2004 	default:
2005 		return -EIO;
2006 	}
2007 	if (rc)
2008 		return rc;
2009 
2010 	return nsegs;
2011 }
2012 
2013 int
2014 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
2015 		struct rpcrdma_xprt *r_xprt)
2016 {
2017 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
2018 	int nsegs = seg->mr_nsegs, rc;
2019 
2020 	switch (ia->ri_memreg_strategy) {
2021 
2022 	case RPCRDMA_ALLPHYSICAL:
2023 		read_lock(&ia->ri_qplock);
2024 		rpcrdma_unmap_one(ia, seg);
2025 		read_unlock(&ia->ri_qplock);
2026 		break;
2027 
2028 	case RPCRDMA_FRMR:
2029 		rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
2030 		break;
2031 
2032 	case RPCRDMA_MTHCAFMR:
2033 		rc = rpcrdma_deregister_fmr_external(seg, ia);
2034 		break;
2035 
2036 	default:
2037 		break;
2038 	}
2039 	return nsegs;
2040 }
2041 
2042 /*
2043  * Prepost any receive buffer, then post send.
2044  *
2045  * Receive buffer is donated to hardware, reclaimed upon recv completion.
2046  */
2047 int
2048 rpcrdma_ep_post(struct rpcrdma_ia *ia,
2049 		struct rpcrdma_ep *ep,
2050 		struct rpcrdma_req *req)
2051 {
2052 	struct ib_send_wr send_wr, *send_wr_fail;
2053 	struct rpcrdma_rep *rep = req->rl_reply;
2054 	int rc;
2055 
2056 	if (rep) {
2057 		rc = rpcrdma_ep_post_recv(ia, ep, rep);
2058 		if (rc)
2059 			goto out;
2060 		req->rl_reply = NULL;
2061 	}
2062 
2063 	send_wr.next = NULL;
2064 	send_wr.wr_id = 0ULL;	/* no send cookie */
2065 	send_wr.sg_list = req->rl_send_iov;
2066 	send_wr.num_sge = req->rl_niovs;
2067 	send_wr.opcode = IB_WR_SEND;
2068 	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
2069 		ib_dma_sync_single_for_device(ia->ri_id->device,
2070 			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
2071 			DMA_TO_DEVICE);
2072 	ib_dma_sync_single_for_device(ia->ri_id->device,
2073 		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
2074 		DMA_TO_DEVICE);
2075 	ib_dma_sync_single_for_device(ia->ri_id->device,
2076 		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
2077 		DMA_TO_DEVICE);
2078 
2079 	if (DECR_CQCOUNT(ep) > 0)
2080 		send_wr.send_flags = 0;
2081 	else { /* Provider must take a send completion every now and then */
2082 		INIT_CQCOUNT(ep);
2083 		send_wr.send_flags = IB_SEND_SIGNALED;
2084 	}
2085 
2086 	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
2087 	if (rc)
2088 		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
2089 			rc);
2090 out:
2091 	return rc;
2092 }
2093 
2094 /*
2095  * (Re)post a receive buffer.
2096  */
2097 int
2098 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
2099 		     struct rpcrdma_ep *ep,
2100 		     struct rpcrdma_rep *rep)
2101 {
2102 	struct ib_recv_wr recv_wr, *recv_wr_fail;
2103 	int rc;
2104 
2105 	recv_wr.next = NULL;
2106 	recv_wr.wr_id = (u64) (unsigned long) rep;
2107 	recv_wr.sg_list = &rep->rr_iov;
2108 	recv_wr.num_sge = 1;
2109 
2110 	ib_dma_sync_single_for_cpu(ia->ri_id->device,
2111 		rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
2112 
2113 	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
2114 
2115 	if (rc)
2116 		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
2117 			rc);
2118 	return rc;
2119 }
2120 
2121 /* Physical mapping means one Read/Write list entry per-page.
2122  * All list entries must fit within an inline buffer
2123  *
2124  * NB: The server must return a Write list for NFS READ,
2125  *     which has the same constraint. Factor in the inline
2126  *     rsize as well.
2127  */
2128 static size_t
2129 rpcrdma_physical_max_payload(struct rpcrdma_xprt *r_xprt)
2130 {
2131 	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
2132 	unsigned int inline_size, pages;
2133 
2134 	inline_size = min_t(unsigned int,
2135 			    cdata->inline_wsize, cdata->inline_rsize);
2136 	inline_size -= RPCRDMA_HDRLEN_MIN;
2137 	pages = inline_size / sizeof(struct rpcrdma_segment);
2138 	return pages << PAGE_SHIFT;
2139 }
2140 
2141 static size_t
2142 rpcrdma_mr_max_payload(struct rpcrdma_xprt *r_xprt)
2143 {
2144 	return RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT;
2145 }
2146 
2147 size_t
2148 rpcrdma_max_payload(struct rpcrdma_xprt *r_xprt)
2149 {
2150 	size_t result;
2151 
2152 	switch (r_xprt->rx_ia.ri_memreg_strategy) {
2153 	case RPCRDMA_ALLPHYSICAL:
2154 		result = rpcrdma_physical_max_payload(r_xprt);
2155 		break;
2156 	default:
2157 		result = rpcrdma_mr_max_payload(r_xprt);
2158 	}
2159 	return result;
2160 }
2161