xref: /openbmc/linux/net/sunrpc/xprtrdma/verbs.c (revision 6774def6)
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39 
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49 
50 #include <linux/interrupt.h>
51 #include <linux/slab.h>
52 #include <asm/bitops.h>
53 
54 #include "xprt_rdma.h"
55 
56 /*
57  * Globals/Macros
58  */
59 
60 #ifdef RPC_DEBUG
61 # define RPCDBG_FACILITY	RPCDBG_TRANS
62 #endif
63 
64 static void rpcrdma_reset_frmrs(struct rpcrdma_ia *);
65 
66 /*
67  * internal functions
68  */
69 
70 /*
71  * handle replies in tasklet context, using a single, global list
72  * rdma tasklet function -- just turn around and call the func
73  * for all replies on the list
74  */
75 
76 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
77 static LIST_HEAD(rpcrdma_tasklets_g);
78 
79 static void
80 rpcrdma_run_tasklet(unsigned long data)
81 {
82 	struct rpcrdma_rep *rep;
83 	void (*func)(struct rpcrdma_rep *);
84 	unsigned long flags;
85 
86 	data = data;
87 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
88 	while (!list_empty(&rpcrdma_tasklets_g)) {
89 		rep = list_entry(rpcrdma_tasklets_g.next,
90 				 struct rpcrdma_rep, rr_list);
91 		list_del(&rep->rr_list);
92 		func = rep->rr_func;
93 		rep->rr_func = NULL;
94 		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
95 
96 		if (func)
97 			func(rep);
98 		else
99 			rpcrdma_recv_buffer_put(rep);
100 
101 		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
102 	}
103 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
104 }
105 
106 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
107 
108 static void
109 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
110 {
111 	struct rpcrdma_ep *ep = context;
112 
113 	dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
114 		__func__, event->event, event->device->name, context);
115 	if (ep->rep_connected == 1) {
116 		ep->rep_connected = -EIO;
117 		ep->rep_func(ep);
118 		wake_up_all(&ep->rep_connect_wait);
119 	}
120 }
121 
122 static void
123 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
124 {
125 	struct rpcrdma_ep *ep = context;
126 
127 	dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
128 		__func__, event->event, event->device->name, context);
129 	if (ep->rep_connected == 1) {
130 		ep->rep_connected = -EIO;
131 		ep->rep_func(ep);
132 		wake_up_all(&ep->rep_connect_wait);
133 	}
134 }
135 
136 static void
137 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
138 {
139 	struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
140 
141 	dprintk("RPC:       %s: frmr %p status %X opcode %d\n",
142 		__func__, frmr, wc->status, wc->opcode);
143 
144 	if (wc->wr_id == 0ULL)
145 		return;
146 	if (wc->status != IB_WC_SUCCESS)
147 		frmr->r.frmr.fr_state = FRMR_IS_STALE;
148 }
149 
150 static int
151 rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
152 {
153 	struct ib_wc *wcs;
154 	int budget, count, rc;
155 
156 	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
157 	do {
158 		wcs = ep->rep_send_wcs;
159 
160 		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
161 		if (rc <= 0)
162 			return rc;
163 
164 		count = rc;
165 		while (count-- > 0)
166 			rpcrdma_sendcq_process_wc(wcs++);
167 	} while (rc == RPCRDMA_POLLSIZE && --budget);
168 	return 0;
169 }
170 
171 /*
172  * Handle send, fast_reg_mr, and local_inv completions.
173  *
174  * Send events are typically suppressed and thus do not result
175  * in an upcall. Occasionally one is signaled, however. This
176  * prevents the provider's completion queue from wrapping and
177  * losing a completion.
178  */
179 static void
180 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
181 {
182 	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
183 	int rc;
184 
185 	rc = rpcrdma_sendcq_poll(cq, ep);
186 	if (rc) {
187 		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
188 			__func__, rc);
189 		return;
190 	}
191 
192 	rc = ib_req_notify_cq(cq,
193 			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
194 	if (rc == 0)
195 		return;
196 	if (rc < 0) {
197 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
198 			__func__, rc);
199 		return;
200 	}
201 
202 	rpcrdma_sendcq_poll(cq, ep);
203 }
204 
205 static void
206 rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
207 {
208 	struct rpcrdma_rep *rep =
209 			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
210 
211 	dprintk("RPC:       %s: rep %p status %X opcode %X length %u\n",
212 		__func__, rep, wc->status, wc->opcode, wc->byte_len);
213 
214 	if (wc->status != IB_WC_SUCCESS) {
215 		rep->rr_len = ~0U;
216 		goto out_schedule;
217 	}
218 	if (wc->opcode != IB_WC_RECV)
219 		return;
220 
221 	rep->rr_len = wc->byte_len;
222 	ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
223 			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
224 
225 	if (rep->rr_len >= 16) {
226 		struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base;
227 		unsigned int credits = ntohl(p->rm_credit);
228 
229 		if (credits == 0)
230 			credits = 1;	/* don't deadlock */
231 		else if (credits > rep->rr_buffer->rb_max_requests)
232 			credits = rep->rr_buffer->rb_max_requests;
233 		atomic_set(&rep->rr_buffer->rb_credits, credits);
234 	}
235 
236 out_schedule:
237 	list_add_tail(&rep->rr_list, sched_list);
238 }
239 
240 static int
241 rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
242 {
243 	struct list_head sched_list;
244 	struct ib_wc *wcs;
245 	int budget, count, rc;
246 	unsigned long flags;
247 
248 	INIT_LIST_HEAD(&sched_list);
249 	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
250 	do {
251 		wcs = ep->rep_recv_wcs;
252 
253 		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
254 		if (rc <= 0)
255 			goto out_schedule;
256 
257 		count = rc;
258 		while (count-- > 0)
259 			rpcrdma_recvcq_process_wc(wcs++, &sched_list);
260 	} while (rc == RPCRDMA_POLLSIZE && --budget);
261 	rc = 0;
262 
263 out_schedule:
264 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
265 	list_splice_tail(&sched_list, &rpcrdma_tasklets_g);
266 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
267 	tasklet_schedule(&rpcrdma_tasklet_g);
268 	return rc;
269 }
270 
271 /*
272  * Handle receive completions.
273  *
274  * It is reentrant but processes single events in order to maintain
275  * ordering of receives to keep server credits.
276  *
277  * It is the responsibility of the scheduled tasklet to return
278  * recv buffers to the pool. NOTE: this affects synchronization of
279  * connection shutdown. That is, the structures required for
280  * the completion of the reply handler must remain intact until
281  * all memory has been reclaimed.
282  */
283 static void
284 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
285 {
286 	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
287 	int rc;
288 
289 	rc = rpcrdma_recvcq_poll(cq, ep);
290 	if (rc) {
291 		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
292 			__func__, rc);
293 		return;
294 	}
295 
296 	rc = ib_req_notify_cq(cq,
297 			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
298 	if (rc == 0)
299 		return;
300 	if (rc < 0) {
301 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
302 			__func__, rc);
303 		return;
304 	}
305 
306 	rpcrdma_recvcq_poll(cq, ep);
307 }
308 
309 static void
310 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
311 {
312 	rpcrdma_recvcq_upcall(ep->rep_attr.recv_cq, ep);
313 	rpcrdma_sendcq_upcall(ep->rep_attr.send_cq, ep);
314 }
315 
316 #ifdef RPC_DEBUG
317 static const char * const conn[] = {
318 	"address resolved",
319 	"address error",
320 	"route resolved",
321 	"route error",
322 	"connect request",
323 	"connect response",
324 	"connect error",
325 	"unreachable",
326 	"rejected",
327 	"established",
328 	"disconnected",
329 	"device removal",
330 	"multicast join",
331 	"multicast error",
332 	"address change",
333 	"timewait exit",
334 };
335 
336 #define CONNECTION_MSG(status)						\
337 	((status) < ARRAY_SIZE(conn) ?					\
338 		conn[(status)] : "unrecognized connection error")
339 #endif
340 
341 static int
342 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
343 {
344 	struct rpcrdma_xprt *xprt = id->context;
345 	struct rpcrdma_ia *ia = &xprt->rx_ia;
346 	struct rpcrdma_ep *ep = &xprt->rx_ep;
347 #ifdef RPC_DEBUG
348 	struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
349 #endif
350 	struct ib_qp_attr attr;
351 	struct ib_qp_init_attr iattr;
352 	int connstate = 0;
353 
354 	switch (event->event) {
355 	case RDMA_CM_EVENT_ADDR_RESOLVED:
356 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
357 		ia->ri_async_rc = 0;
358 		complete(&ia->ri_done);
359 		break;
360 	case RDMA_CM_EVENT_ADDR_ERROR:
361 		ia->ri_async_rc = -EHOSTUNREACH;
362 		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
363 			__func__, ep);
364 		complete(&ia->ri_done);
365 		break;
366 	case RDMA_CM_EVENT_ROUTE_ERROR:
367 		ia->ri_async_rc = -ENETUNREACH;
368 		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
369 			__func__, ep);
370 		complete(&ia->ri_done);
371 		break;
372 	case RDMA_CM_EVENT_ESTABLISHED:
373 		connstate = 1;
374 		ib_query_qp(ia->ri_id->qp, &attr,
375 			IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
376 			&iattr);
377 		dprintk("RPC:       %s: %d responder resources"
378 			" (%d initiator)\n",
379 			__func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
380 		goto connected;
381 	case RDMA_CM_EVENT_CONNECT_ERROR:
382 		connstate = -ENOTCONN;
383 		goto connected;
384 	case RDMA_CM_EVENT_UNREACHABLE:
385 		connstate = -ENETDOWN;
386 		goto connected;
387 	case RDMA_CM_EVENT_REJECTED:
388 		connstate = -ECONNREFUSED;
389 		goto connected;
390 	case RDMA_CM_EVENT_DISCONNECTED:
391 		connstate = -ECONNABORTED;
392 		goto connected;
393 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
394 		connstate = -ENODEV;
395 connected:
396 		atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
397 		dprintk("RPC:       %s: %sconnected\n",
398 					__func__, connstate > 0 ? "" : "dis");
399 		ep->rep_connected = connstate;
400 		ep->rep_func(ep);
401 		wake_up_all(&ep->rep_connect_wait);
402 		/*FALLTHROUGH*/
403 	default:
404 		dprintk("RPC:       %s: %pI4:%u (ep 0x%p): %s\n",
405 			__func__, &addr->sin_addr.s_addr,
406 			ntohs(addr->sin_port), ep,
407 			CONNECTION_MSG(event->event));
408 		break;
409 	}
410 
411 #ifdef RPC_DEBUG
412 	if (connstate == 1) {
413 		int ird = attr.max_dest_rd_atomic;
414 		int tird = ep->rep_remote_cma.responder_resources;
415 		printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
416 			"on %s, memreg %d slots %d ird %d%s\n",
417 			&addr->sin_addr.s_addr,
418 			ntohs(addr->sin_port),
419 			ia->ri_id->device->name,
420 			ia->ri_memreg_strategy,
421 			xprt->rx_buf.rb_max_requests,
422 			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
423 	} else if (connstate < 0) {
424 		printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
425 			&addr->sin_addr.s_addr,
426 			ntohs(addr->sin_port),
427 			connstate);
428 	}
429 #endif
430 
431 	return 0;
432 }
433 
434 static struct rdma_cm_id *
435 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
436 			struct rpcrdma_ia *ia, struct sockaddr *addr)
437 {
438 	struct rdma_cm_id *id;
439 	int rc;
440 
441 	init_completion(&ia->ri_done);
442 
443 	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
444 	if (IS_ERR(id)) {
445 		rc = PTR_ERR(id);
446 		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
447 			__func__, rc);
448 		return id;
449 	}
450 
451 	ia->ri_async_rc = -ETIMEDOUT;
452 	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
453 	if (rc) {
454 		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
455 			__func__, rc);
456 		goto out;
457 	}
458 	wait_for_completion_interruptible_timeout(&ia->ri_done,
459 				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
460 	rc = ia->ri_async_rc;
461 	if (rc)
462 		goto out;
463 
464 	ia->ri_async_rc = -ETIMEDOUT;
465 	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
466 	if (rc) {
467 		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
468 			__func__, rc);
469 		goto out;
470 	}
471 	wait_for_completion_interruptible_timeout(&ia->ri_done,
472 				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
473 	rc = ia->ri_async_rc;
474 	if (rc)
475 		goto out;
476 
477 	return id;
478 
479 out:
480 	rdma_destroy_id(id);
481 	return ERR_PTR(rc);
482 }
483 
484 /*
485  * Drain any cq, prior to teardown.
486  */
487 static void
488 rpcrdma_clean_cq(struct ib_cq *cq)
489 {
490 	struct ib_wc wc;
491 	int count = 0;
492 
493 	while (1 == ib_poll_cq(cq, 1, &wc))
494 		++count;
495 
496 	if (count)
497 		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
498 			__func__, count, wc.opcode);
499 }
500 
501 /*
502  * Exported functions.
503  */
504 
505 /*
506  * Open and initialize an Interface Adapter.
507  *  o initializes fields of struct rpcrdma_ia, including
508  *    interface and provider attributes and protection zone.
509  */
510 int
511 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
512 {
513 	int rc, mem_priv;
514 	struct ib_device_attr devattr;
515 	struct rpcrdma_ia *ia = &xprt->rx_ia;
516 
517 	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
518 	if (IS_ERR(ia->ri_id)) {
519 		rc = PTR_ERR(ia->ri_id);
520 		goto out1;
521 	}
522 
523 	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
524 	if (IS_ERR(ia->ri_pd)) {
525 		rc = PTR_ERR(ia->ri_pd);
526 		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
527 			__func__, rc);
528 		goto out2;
529 	}
530 
531 	/*
532 	 * Query the device to determine if the requested memory
533 	 * registration strategy is supported. If it isn't, set the
534 	 * strategy to a globally supported model.
535 	 */
536 	rc = ib_query_device(ia->ri_id->device, &devattr);
537 	if (rc) {
538 		dprintk("RPC:       %s: ib_query_device failed %d\n",
539 			__func__, rc);
540 		goto out2;
541 	}
542 
543 	if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
544 		ia->ri_have_dma_lkey = 1;
545 		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
546 	}
547 
548 	if (memreg == RPCRDMA_FRMR) {
549 		/* Requires both frmr reg and local dma lkey */
550 		if ((devattr.device_cap_flags &
551 		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
552 		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
553 			dprintk("RPC:       %s: FRMR registration "
554 				"not supported by HCA\n", __func__);
555 			memreg = RPCRDMA_MTHCAFMR;
556 		} else {
557 			/* Mind the ia limit on FRMR page list depth */
558 			ia->ri_max_frmr_depth = min_t(unsigned int,
559 				RPCRDMA_MAX_DATA_SEGS,
560 				devattr.max_fast_reg_page_list_len);
561 		}
562 	}
563 	if (memreg == RPCRDMA_MTHCAFMR) {
564 		if (!ia->ri_id->device->alloc_fmr) {
565 			dprintk("RPC:       %s: MTHCAFMR registration "
566 				"not supported by HCA\n", __func__);
567 			memreg = RPCRDMA_ALLPHYSICAL;
568 		}
569 	}
570 
571 	/*
572 	 * Optionally obtain an underlying physical identity mapping in
573 	 * order to do a memory window-based bind. This base registration
574 	 * is protected from remote access - that is enabled only by binding
575 	 * for the specific bytes targeted during each RPC operation, and
576 	 * revoked after the corresponding completion similar to a storage
577 	 * adapter.
578 	 */
579 	switch (memreg) {
580 	case RPCRDMA_FRMR:
581 		break;
582 	case RPCRDMA_ALLPHYSICAL:
583 		mem_priv = IB_ACCESS_LOCAL_WRITE |
584 				IB_ACCESS_REMOTE_WRITE |
585 				IB_ACCESS_REMOTE_READ;
586 		goto register_setup;
587 	case RPCRDMA_MTHCAFMR:
588 		if (ia->ri_have_dma_lkey)
589 			break;
590 		mem_priv = IB_ACCESS_LOCAL_WRITE;
591 	register_setup:
592 		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
593 		if (IS_ERR(ia->ri_bind_mem)) {
594 			printk(KERN_ALERT "%s: ib_get_dma_mr for "
595 				"phys register failed with %lX\n",
596 				__func__, PTR_ERR(ia->ri_bind_mem));
597 			rc = -ENOMEM;
598 			goto out2;
599 		}
600 		break;
601 	default:
602 		printk(KERN_ERR "RPC: Unsupported memory "
603 				"registration mode: %d\n", memreg);
604 		rc = -ENOMEM;
605 		goto out2;
606 	}
607 	dprintk("RPC:       %s: memory registration strategy is %d\n",
608 		__func__, memreg);
609 
610 	/* Else will do memory reg/dereg for each chunk */
611 	ia->ri_memreg_strategy = memreg;
612 
613 	rwlock_init(&ia->ri_qplock);
614 	return 0;
615 out2:
616 	rdma_destroy_id(ia->ri_id);
617 	ia->ri_id = NULL;
618 out1:
619 	return rc;
620 }
621 
622 /*
623  * Clean up/close an IA.
624  *   o if event handles and PD have been initialized, free them.
625  *   o close the IA
626  */
627 void
628 rpcrdma_ia_close(struct rpcrdma_ia *ia)
629 {
630 	int rc;
631 
632 	dprintk("RPC:       %s: entering\n", __func__);
633 	if (ia->ri_bind_mem != NULL) {
634 		rc = ib_dereg_mr(ia->ri_bind_mem);
635 		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
636 			__func__, rc);
637 	}
638 	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
639 		if (ia->ri_id->qp)
640 			rdma_destroy_qp(ia->ri_id);
641 		rdma_destroy_id(ia->ri_id);
642 		ia->ri_id = NULL;
643 	}
644 	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
645 		rc = ib_dealloc_pd(ia->ri_pd);
646 		dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
647 			__func__, rc);
648 	}
649 }
650 
651 /*
652  * Create unconnected endpoint.
653  */
654 int
655 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
656 				struct rpcrdma_create_data_internal *cdata)
657 {
658 	struct ib_device_attr devattr;
659 	struct ib_cq *sendcq, *recvcq;
660 	int rc, err;
661 
662 	rc = ib_query_device(ia->ri_id->device, &devattr);
663 	if (rc) {
664 		dprintk("RPC:       %s: ib_query_device failed %d\n",
665 			__func__, rc);
666 		return rc;
667 	}
668 
669 	/* check provider's send/recv wr limits */
670 	if (cdata->max_requests > devattr.max_qp_wr)
671 		cdata->max_requests = devattr.max_qp_wr;
672 
673 	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
674 	ep->rep_attr.qp_context = ep;
675 	/* send_cq and recv_cq initialized below */
676 	ep->rep_attr.srq = NULL;
677 	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
678 	switch (ia->ri_memreg_strategy) {
679 	case RPCRDMA_FRMR: {
680 		int depth = 7;
681 
682 		/* Add room for frmr register and invalidate WRs.
683 		 * 1. FRMR reg WR for head
684 		 * 2. FRMR invalidate WR for head
685 		 * 3. N FRMR reg WRs for pagelist
686 		 * 4. N FRMR invalidate WRs for pagelist
687 		 * 5. FRMR reg WR for tail
688 		 * 6. FRMR invalidate WR for tail
689 		 * 7. The RDMA_SEND WR
690 		 */
691 
692 		/* Calculate N if the device max FRMR depth is smaller than
693 		 * RPCRDMA_MAX_DATA_SEGS.
694 		 */
695 		if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
696 			int delta = RPCRDMA_MAX_DATA_SEGS -
697 				    ia->ri_max_frmr_depth;
698 
699 			do {
700 				depth += 2; /* FRMR reg + invalidate */
701 				delta -= ia->ri_max_frmr_depth;
702 			} while (delta > 0);
703 
704 		}
705 		ep->rep_attr.cap.max_send_wr *= depth;
706 		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
707 			cdata->max_requests = devattr.max_qp_wr / depth;
708 			if (!cdata->max_requests)
709 				return -EINVAL;
710 			ep->rep_attr.cap.max_send_wr = cdata->max_requests *
711 						       depth;
712 		}
713 		break;
714 	}
715 	default:
716 		break;
717 	}
718 	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
719 	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
720 	ep->rep_attr.cap.max_recv_sge = 1;
721 	ep->rep_attr.cap.max_inline_data = 0;
722 	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
723 	ep->rep_attr.qp_type = IB_QPT_RC;
724 	ep->rep_attr.port_num = ~0;
725 
726 	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
727 		"iovs: send %d recv %d\n",
728 		__func__,
729 		ep->rep_attr.cap.max_send_wr,
730 		ep->rep_attr.cap.max_recv_wr,
731 		ep->rep_attr.cap.max_send_sge,
732 		ep->rep_attr.cap.max_recv_sge);
733 
734 	/* set trigger for requesting send completion */
735 	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
736 	if (ep->rep_cqinit <= 2)
737 		ep->rep_cqinit = 0;
738 	INIT_CQCOUNT(ep);
739 	ep->rep_ia = ia;
740 	init_waitqueue_head(&ep->rep_connect_wait);
741 	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
742 
743 	sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
744 				  rpcrdma_cq_async_error_upcall, ep,
745 				  ep->rep_attr.cap.max_send_wr + 1, 0);
746 	if (IS_ERR(sendcq)) {
747 		rc = PTR_ERR(sendcq);
748 		dprintk("RPC:       %s: failed to create send CQ: %i\n",
749 			__func__, rc);
750 		goto out1;
751 	}
752 
753 	rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
754 	if (rc) {
755 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
756 			__func__, rc);
757 		goto out2;
758 	}
759 
760 	recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
761 				  rpcrdma_cq_async_error_upcall, ep,
762 				  ep->rep_attr.cap.max_recv_wr + 1, 0);
763 	if (IS_ERR(recvcq)) {
764 		rc = PTR_ERR(recvcq);
765 		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
766 			__func__, rc);
767 		goto out2;
768 	}
769 
770 	rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
771 	if (rc) {
772 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
773 			__func__, rc);
774 		ib_destroy_cq(recvcq);
775 		goto out2;
776 	}
777 
778 	ep->rep_attr.send_cq = sendcq;
779 	ep->rep_attr.recv_cq = recvcq;
780 
781 	/* Initialize cma parameters */
782 
783 	/* RPC/RDMA does not use private data */
784 	ep->rep_remote_cma.private_data = NULL;
785 	ep->rep_remote_cma.private_data_len = 0;
786 
787 	/* Client offers RDMA Read but does not initiate */
788 	ep->rep_remote_cma.initiator_depth = 0;
789 	if (devattr.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
790 		ep->rep_remote_cma.responder_resources = 32;
791 	else
792 		ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
793 
794 	ep->rep_remote_cma.retry_count = 7;
795 	ep->rep_remote_cma.flow_control = 0;
796 	ep->rep_remote_cma.rnr_retry_count = 0;
797 
798 	return 0;
799 
800 out2:
801 	err = ib_destroy_cq(sendcq);
802 	if (err)
803 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
804 			__func__, err);
805 out1:
806 	return rc;
807 }
808 
809 /*
810  * rpcrdma_ep_destroy
811  *
812  * Disconnect and destroy endpoint. After this, the only
813  * valid operations on the ep are to free it (if dynamically
814  * allocated) or re-create it.
815  */
816 void
817 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
818 {
819 	int rc;
820 
821 	dprintk("RPC:       %s: entering, connected is %d\n",
822 		__func__, ep->rep_connected);
823 
824 	cancel_delayed_work_sync(&ep->rep_connect_worker);
825 
826 	if (ia->ri_id->qp) {
827 		rpcrdma_ep_disconnect(ep, ia);
828 		rdma_destroy_qp(ia->ri_id);
829 		ia->ri_id->qp = NULL;
830 	}
831 
832 	/* padding - could be done in rpcrdma_buffer_destroy... */
833 	if (ep->rep_pad_mr) {
834 		rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
835 		ep->rep_pad_mr = NULL;
836 	}
837 
838 	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
839 	rc = ib_destroy_cq(ep->rep_attr.recv_cq);
840 	if (rc)
841 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
842 			__func__, rc);
843 
844 	rpcrdma_clean_cq(ep->rep_attr.send_cq);
845 	rc = ib_destroy_cq(ep->rep_attr.send_cq);
846 	if (rc)
847 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
848 			__func__, rc);
849 }
850 
851 /*
852  * Connect unconnected endpoint.
853  */
854 int
855 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
856 {
857 	struct rdma_cm_id *id, *old;
858 	int rc = 0;
859 	int retry_count = 0;
860 
861 	if (ep->rep_connected != 0) {
862 		struct rpcrdma_xprt *xprt;
863 retry:
864 		dprintk("RPC:       %s: reconnecting...\n", __func__);
865 
866 		rpcrdma_ep_disconnect(ep, ia);
867 		rpcrdma_flush_cqs(ep);
868 
869 		if (ia->ri_memreg_strategy == RPCRDMA_FRMR)
870 			rpcrdma_reset_frmrs(ia);
871 
872 		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
873 		id = rpcrdma_create_id(xprt, ia,
874 				(struct sockaddr *)&xprt->rx_data.addr);
875 		if (IS_ERR(id)) {
876 			rc = -EHOSTUNREACH;
877 			goto out;
878 		}
879 		/* TEMP TEMP TEMP - fail if new device:
880 		 * Deregister/remarshal *all* requests!
881 		 * Close and recreate adapter, pd, etc!
882 		 * Re-determine all attributes still sane!
883 		 * More stuff I haven't thought of!
884 		 * Rrrgh!
885 		 */
886 		if (ia->ri_id->device != id->device) {
887 			printk("RPC:       %s: can't reconnect on "
888 				"different device!\n", __func__);
889 			rdma_destroy_id(id);
890 			rc = -ENETUNREACH;
891 			goto out;
892 		}
893 		/* END TEMP */
894 		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
895 		if (rc) {
896 			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
897 				__func__, rc);
898 			rdma_destroy_id(id);
899 			rc = -ENETUNREACH;
900 			goto out;
901 		}
902 
903 		write_lock(&ia->ri_qplock);
904 		old = ia->ri_id;
905 		ia->ri_id = id;
906 		write_unlock(&ia->ri_qplock);
907 
908 		rdma_destroy_qp(old);
909 		rdma_destroy_id(old);
910 	} else {
911 		dprintk("RPC:       %s: connecting...\n", __func__);
912 		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
913 		if (rc) {
914 			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
915 				__func__, rc);
916 			/* do not update ep->rep_connected */
917 			return -ENETUNREACH;
918 		}
919 	}
920 
921 	ep->rep_connected = 0;
922 
923 	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
924 	if (rc) {
925 		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
926 				__func__, rc);
927 		goto out;
928 	}
929 
930 	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
931 
932 	/*
933 	 * Check state. A non-peer reject indicates no listener
934 	 * (ECONNREFUSED), which may be a transient state. All
935 	 * others indicate a transport condition which has already
936 	 * undergone a best-effort.
937 	 */
938 	if (ep->rep_connected == -ECONNREFUSED &&
939 	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
940 		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
941 		goto retry;
942 	}
943 	if (ep->rep_connected <= 0) {
944 		/* Sometimes, the only way to reliably connect to remote
945 		 * CMs is to use same nonzero values for ORD and IRD. */
946 		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
947 		    (ep->rep_remote_cma.responder_resources == 0 ||
948 		     ep->rep_remote_cma.initiator_depth !=
949 				ep->rep_remote_cma.responder_resources)) {
950 			if (ep->rep_remote_cma.responder_resources == 0)
951 				ep->rep_remote_cma.responder_resources = 1;
952 			ep->rep_remote_cma.initiator_depth =
953 				ep->rep_remote_cma.responder_resources;
954 			goto retry;
955 		}
956 		rc = ep->rep_connected;
957 	} else {
958 		dprintk("RPC:       %s: connected\n", __func__);
959 	}
960 
961 out:
962 	if (rc)
963 		ep->rep_connected = rc;
964 	return rc;
965 }
966 
967 /*
968  * rpcrdma_ep_disconnect
969  *
970  * This is separate from destroy to facilitate the ability
971  * to reconnect without recreating the endpoint.
972  *
973  * This call is not reentrant, and must not be made in parallel
974  * on the same endpoint.
975  */
976 void
977 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
978 {
979 	int rc;
980 
981 	rpcrdma_flush_cqs(ep);
982 	rc = rdma_disconnect(ia->ri_id);
983 	if (!rc) {
984 		/* returns without wait if not connected */
985 		wait_event_interruptible(ep->rep_connect_wait,
986 							ep->rep_connected != 1);
987 		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
988 			(ep->rep_connected == 1) ? "still " : "dis");
989 	} else {
990 		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
991 		ep->rep_connected = rc;
992 	}
993 }
994 
995 static int
996 rpcrdma_init_fmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf)
997 {
998 	int mr_access_flags = IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ;
999 	struct ib_fmr_attr fmr_attr = {
1000 		.max_pages	= RPCRDMA_MAX_DATA_SEGS,
1001 		.max_maps	= 1,
1002 		.page_shift	= PAGE_SHIFT
1003 	};
1004 	struct rpcrdma_mw *r;
1005 	int i, rc;
1006 
1007 	i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
1008 	dprintk("RPC:       %s: initalizing %d FMRs\n", __func__, i);
1009 
1010 	while (i--) {
1011 		r = kzalloc(sizeof(*r), GFP_KERNEL);
1012 		if (r == NULL)
1013 			return -ENOMEM;
1014 
1015 		r->r.fmr = ib_alloc_fmr(ia->ri_pd, mr_access_flags, &fmr_attr);
1016 		if (IS_ERR(r->r.fmr)) {
1017 			rc = PTR_ERR(r->r.fmr);
1018 			dprintk("RPC:       %s: ib_alloc_fmr failed %i\n",
1019 				__func__, rc);
1020 			goto out_free;
1021 		}
1022 
1023 		list_add(&r->mw_list, &buf->rb_mws);
1024 		list_add(&r->mw_all, &buf->rb_all);
1025 	}
1026 	return 0;
1027 
1028 out_free:
1029 	kfree(r);
1030 	return rc;
1031 }
1032 
1033 static int
1034 rpcrdma_init_frmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf)
1035 {
1036 	struct rpcrdma_frmr *f;
1037 	struct rpcrdma_mw *r;
1038 	int i, rc;
1039 
1040 	i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
1041 	dprintk("RPC:       %s: initalizing %d FRMRs\n", __func__, i);
1042 
1043 	while (i--) {
1044 		r = kzalloc(sizeof(*r), GFP_KERNEL);
1045 		if (r == NULL)
1046 			return -ENOMEM;
1047 		f = &r->r.frmr;
1048 
1049 		f->fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1050 						ia->ri_max_frmr_depth);
1051 		if (IS_ERR(f->fr_mr)) {
1052 			rc = PTR_ERR(f->fr_mr);
1053 			dprintk("RPC:       %s: ib_alloc_fast_reg_mr "
1054 				"failed %i\n", __func__, rc);
1055 			goto out_free;
1056 		}
1057 
1058 		f->fr_pgl = ib_alloc_fast_reg_page_list(ia->ri_id->device,
1059 							ia->ri_max_frmr_depth);
1060 		if (IS_ERR(f->fr_pgl)) {
1061 			rc = PTR_ERR(f->fr_pgl);
1062 			dprintk("RPC:       %s: ib_alloc_fast_reg_page_list "
1063 				"failed %i\n", __func__, rc);
1064 
1065 			ib_dereg_mr(f->fr_mr);
1066 			goto out_free;
1067 		}
1068 
1069 		list_add(&r->mw_list, &buf->rb_mws);
1070 		list_add(&r->mw_all, &buf->rb_all);
1071 	}
1072 
1073 	return 0;
1074 
1075 out_free:
1076 	kfree(r);
1077 	return rc;
1078 }
1079 
1080 int
1081 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
1082 	struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
1083 {
1084 	char *p;
1085 	size_t len, rlen, wlen;
1086 	int i, rc;
1087 
1088 	buf->rb_max_requests = cdata->max_requests;
1089 	spin_lock_init(&buf->rb_lock);
1090 	atomic_set(&buf->rb_credits, 1);
1091 
1092 	/* Need to allocate:
1093 	 *   1.  arrays for send and recv pointers
1094 	 *   2.  arrays of struct rpcrdma_req to fill in pointers
1095 	 *   3.  array of struct rpcrdma_rep for replies
1096 	 *   4.  padding, if any
1097 	 * Send/recv buffers in req/rep need to be registered
1098 	 */
1099 	len = buf->rb_max_requests *
1100 		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1101 	len += cdata->padding;
1102 
1103 	p = kzalloc(len, GFP_KERNEL);
1104 	if (p == NULL) {
1105 		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1106 			__func__, len);
1107 		rc = -ENOMEM;
1108 		goto out;
1109 	}
1110 	buf->rb_pool = p;	/* for freeing it later */
1111 
1112 	buf->rb_send_bufs = (struct rpcrdma_req **) p;
1113 	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1114 	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1115 	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1116 
1117 	/*
1118 	 * Register the zeroed pad buffer, if any.
1119 	 */
1120 	if (cdata->padding) {
1121 		rc = rpcrdma_register_internal(ia, p, cdata->padding,
1122 					    &ep->rep_pad_mr, &ep->rep_pad);
1123 		if (rc)
1124 			goto out;
1125 	}
1126 	p += cdata->padding;
1127 
1128 	INIT_LIST_HEAD(&buf->rb_mws);
1129 	INIT_LIST_HEAD(&buf->rb_all);
1130 	switch (ia->ri_memreg_strategy) {
1131 	case RPCRDMA_FRMR:
1132 		rc = rpcrdma_init_frmrs(ia, buf);
1133 		if (rc)
1134 			goto out;
1135 		break;
1136 	case RPCRDMA_MTHCAFMR:
1137 		rc = rpcrdma_init_fmrs(ia, buf);
1138 		if (rc)
1139 			goto out;
1140 		break;
1141 	default:
1142 		break;
1143 	}
1144 
1145 	/*
1146 	 * Allocate/init the request/reply buffers. Doing this
1147 	 * using kmalloc for now -- one for each buf.
1148 	 */
1149 	wlen = 1 << fls(cdata->inline_wsize + sizeof(struct rpcrdma_req));
1150 	rlen = 1 << fls(cdata->inline_rsize + sizeof(struct rpcrdma_rep));
1151 	dprintk("RPC:       %s: wlen = %zu, rlen = %zu\n",
1152 		__func__, wlen, rlen);
1153 
1154 	for (i = 0; i < buf->rb_max_requests; i++) {
1155 		struct rpcrdma_req *req;
1156 		struct rpcrdma_rep *rep;
1157 
1158 		req = kmalloc(wlen, GFP_KERNEL);
1159 		if (req == NULL) {
1160 			dprintk("RPC:       %s: request buffer %d alloc"
1161 				" failed\n", __func__, i);
1162 			rc = -ENOMEM;
1163 			goto out;
1164 		}
1165 		memset(req, 0, sizeof(struct rpcrdma_req));
1166 		buf->rb_send_bufs[i] = req;
1167 		buf->rb_send_bufs[i]->rl_buffer = buf;
1168 
1169 		rc = rpcrdma_register_internal(ia, req->rl_base,
1170 				wlen - offsetof(struct rpcrdma_req, rl_base),
1171 				&buf->rb_send_bufs[i]->rl_handle,
1172 				&buf->rb_send_bufs[i]->rl_iov);
1173 		if (rc)
1174 			goto out;
1175 
1176 		buf->rb_send_bufs[i]->rl_size = wlen -
1177 						sizeof(struct rpcrdma_req);
1178 
1179 		rep = kmalloc(rlen, GFP_KERNEL);
1180 		if (rep == NULL) {
1181 			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1182 				__func__, i);
1183 			rc = -ENOMEM;
1184 			goto out;
1185 		}
1186 		memset(rep, 0, sizeof(struct rpcrdma_rep));
1187 		buf->rb_recv_bufs[i] = rep;
1188 		buf->rb_recv_bufs[i]->rr_buffer = buf;
1189 
1190 		rc = rpcrdma_register_internal(ia, rep->rr_base,
1191 				rlen - offsetof(struct rpcrdma_rep, rr_base),
1192 				&buf->rb_recv_bufs[i]->rr_handle,
1193 				&buf->rb_recv_bufs[i]->rr_iov);
1194 		if (rc)
1195 			goto out;
1196 
1197 	}
1198 	dprintk("RPC:       %s: max_requests %d\n",
1199 		__func__, buf->rb_max_requests);
1200 	/* done */
1201 	return 0;
1202 out:
1203 	rpcrdma_buffer_destroy(buf);
1204 	return rc;
1205 }
1206 
1207 static void
1208 rpcrdma_destroy_fmrs(struct rpcrdma_buffer *buf)
1209 {
1210 	struct rpcrdma_mw *r;
1211 	int rc;
1212 
1213 	while (!list_empty(&buf->rb_all)) {
1214 		r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
1215 		list_del(&r->mw_all);
1216 		list_del(&r->mw_list);
1217 
1218 		rc = ib_dealloc_fmr(r->r.fmr);
1219 		if (rc)
1220 			dprintk("RPC:       %s: ib_dealloc_fmr failed %i\n",
1221 				__func__, rc);
1222 
1223 		kfree(r);
1224 	}
1225 }
1226 
1227 static void
1228 rpcrdma_destroy_frmrs(struct rpcrdma_buffer *buf)
1229 {
1230 	struct rpcrdma_mw *r;
1231 	int rc;
1232 
1233 	while (!list_empty(&buf->rb_all)) {
1234 		r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
1235 		list_del(&r->mw_all);
1236 		list_del(&r->mw_list);
1237 
1238 		rc = ib_dereg_mr(r->r.frmr.fr_mr);
1239 		if (rc)
1240 			dprintk("RPC:       %s: ib_dereg_mr failed %i\n",
1241 				__func__, rc);
1242 		ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1243 
1244 		kfree(r);
1245 	}
1246 }
1247 
1248 void
1249 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1250 {
1251 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1252 	int i;
1253 
1254 	/* clean up in reverse order from create
1255 	 *   1.  recv mr memory (mr free, then kfree)
1256 	 *   2.  send mr memory (mr free, then kfree)
1257 	 *   3.  MWs
1258 	 */
1259 	dprintk("RPC:       %s: entering\n", __func__);
1260 
1261 	for (i = 0; i < buf->rb_max_requests; i++) {
1262 		if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1263 			rpcrdma_deregister_internal(ia,
1264 					buf->rb_recv_bufs[i]->rr_handle,
1265 					&buf->rb_recv_bufs[i]->rr_iov);
1266 			kfree(buf->rb_recv_bufs[i]);
1267 		}
1268 		if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1269 			rpcrdma_deregister_internal(ia,
1270 					buf->rb_send_bufs[i]->rl_handle,
1271 					&buf->rb_send_bufs[i]->rl_iov);
1272 			kfree(buf->rb_send_bufs[i]);
1273 		}
1274 	}
1275 
1276 	switch (ia->ri_memreg_strategy) {
1277 	case RPCRDMA_FRMR:
1278 		rpcrdma_destroy_frmrs(buf);
1279 		break;
1280 	case RPCRDMA_MTHCAFMR:
1281 		rpcrdma_destroy_fmrs(buf);
1282 		break;
1283 	default:
1284 		break;
1285 	}
1286 
1287 	kfree(buf->rb_pool);
1288 }
1289 
1290 /* After a disconnect, a flushed FAST_REG_MR can leave an FRMR in
1291  * an unusable state. Find FRMRs in this state and dereg / reg
1292  * each.  FRMRs that are VALID and attached to an rpcrdma_req are
1293  * also torn down.
1294  *
1295  * This gives all in-use FRMRs a fresh rkey and leaves them INVALID.
1296  *
1297  * This is invoked only in the transport connect worker in order
1298  * to serialize with rpcrdma_register_frmr_external().
1299  */
1300 static void
1301 rpcrdma_reset_frmrs(struct rpcrdma_ia *ia)
1302 {
1303 	struct rpcrdma_xprt *r_xprt =
1304 				container_of(ia, struct rpcrdma_xprt, rx_ia);
1305 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1306 	struct list_head *pos;
1307 	struct rpcrdma_mw *r;
1308 	int rc;
1309 
1310 	list_for_each(pos, &buf->rb_all) {
1311 		r = list_entry(pos, struct rpcrdma_mw, mw_all);
1312 
1313 		if (r->r.frmr.fr_state == FRMR_IS_INVALID)
1314 			continue;
1315 
1316 		rc = ib_dereg_mr(r->r.frmr.fr_mr);
1317 		if (rc)
1318 			dprintk("RPC:       %s: ib_dereg_mr failed %i\n",
1319 				__func__, rc);
1320 		ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1321 
1322 		r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1323 					ia->ri_max_frmr_depth);
1324 		if (IS_ERR(r->r.frmr.fr_mr)) {
1325 			rc = PTR_ERR(r->r.frmr.fr_mr);
1326 			dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1327 				" failed %i\n", __func__, rc);
1328 			continue;
1329 		}
1330 		r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
1331 					ia->ri_id->device,
1332 					ia->ri_max_frmr_depth);
1333 		if (IS_ERR(r->r.frmr.fr_pgl)) {
1334 			rc = PTR_ERR(r->r.frmr.fr_pgl);
1335 			dprintk("RPC:       %s: "
1336 				"ib_alloc_fast_reg_page_list "
1337 				"failed %i\n", __func__, rc);
1338 
1339 			ib_dereg_mr(r->r.frmr.fr_mr);
1340 			continue;
1341 		}
1342 		r->r.frmr.fr_state = FRMR_IS_INVALID;
1343 	}
1344 }
1345 
1346 /* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
1347  * some req segments uninitialized.
1348  */
1349 static void
1350 rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
1351 {
1352 	if (*mw) {
1353 		list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
1354 		*mw = NULL;
1355 	}
1356 }
1357 
1358 /* Cycle mw's back in reverse order, and "spin" them.
1359  * This delays and scrambles reuse as much as possible.
1360  */
1361 static void
1362 rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1363 {
1364 	struct rpcrdma_mr_seg *seg = req->rl_segments;
1365 	struct rpcrdma_mr_seg *seg1 = seg;
1366 	int i;
1367 
1368 	for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
1369 		rpcrdma_buffer_put_mr(&seg->mr_chunk.rl_mw, buf);
1370 	rpcrdma_buffer_put_mr(&seg1->mr_chunk.rl_mw, buf);
1371 }
1372 
1373 static void
1374 rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1375 {
1376 	buf->rb_send_bufs[--buf->rb_send_index] = req;
1377 	req->rl_niovs = 0;
1378 	if (req->rl_reply) {
1379 		buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
1380 		req->rl_reply->rr_func = NULL;
1381 		req->rl_reply = NULL;
1382 	}
1383 }
1384 
1385 /* rpcrdma_unmap_one() was already done by rpcrdma_deregister_frmr_external().
1386  * Redo only the ib_post_send().
1387  */
1388 static void
1389 rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
1390 {
1391 	struct rpcrdma_xprt *r_xprt =
1392 				container_of(ia, struct rpcrdma_xprt, rx_ia);
1393 	struct ib_send_wr invalidate_wr, *bad_wr;
1394 	int rc;
1395 
1396 	dprintk("RPC:       %s: FRMR %p is stale\n", __func__, r);
1397 
1398 	/* When this FRMR is re-inserted into rb_mws, it is no longer stale */
1399 	r->r.frmr.fr_state = FRMR_IS_INVALID;
1400 
1401 	memset(&invalidate_wr, 0, sizeof(invalidate_wr));
1402 	invalidate_wr.wr_id = (unsigned long)(void *)r;
1403 	invalidate_wr.opcode = IB_WR_LOCAL_INV;
1404 	invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
1405 	DECR_CQCOUNT(&r_xprt->rx_ep);
1406 
1407 	dprintk("RPC:       %s: frmr %p invalidating rkey %08x\n",
1408 		__func__, r, r->r.frmr.fr_mr->rkey);
1409 
1410 	read_lock(&ia->ri_qplock);
1411 	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1412 	read_unlock(&ia->ri_qplock);
1413 	if (rc) {
1414 		/* Force rpcrdma_buffer_get() to retry */
1415 		r->r.frmr.fr_state = FRMR_IS_STALE;
1416 		dprintk("RPC:       %s: ib_post_send failed, %i\n",
1417 			__func__, rc);
1418 	}
1419 }
1420 
1421 static void
1422 rpcrdma_retry_flushed_linv(struct list_head *stale,
1423 			   struct rpcrdma_buffer *buf)
1424 {
1425 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1426 	struct list_head *pos;
1427 	struct rpcrdma_mw *r;
1428 	unsigned long flags;
1429 
1430 	list_for_each(pos, stale) {
1431 		r = list_entry(pos, struct rpcrdma_mw, mw_list);
1432 		rpcrdma_retry_local_inv(r, ia);
1433 	}
1434 
1435 	spin_lock_irqsave(&buf->rb_lock, flags);
1436 	list_splice_tail(stale, &buf->rb_mws);
1437 	spin_unlock_irqrestore(&buf->rb_lock, flags);
1438 }
1439 
1440 static struct rpcrdma_req *
1441 rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
1442 			 struct list_head *stale)
1443 {
1444 	struct rpcrdma_mw *r;
1445 	int i;
1446 
1447 	i = RPCRDMA_MAX_SEGS - 1;
1448 	while (!list_empty(&buf->rb_mws)) {
1449 		r = list_entry(buf->rb_mws.next,
1450 			       struct rpcrdma_mw, mw_list);
1451 		list_del(&r->mw_list);
1452 		if (r->r.frmr.fr_state == FRMR_IS_STALE) {
1453 			list_add(&r->mw_list, stale);
1454 			continue;
1455 		}
1456 		req->rl_segments[i].mr_chunk.rl_mw = r;
1457 		if (unlikely(i-- == 0))
1458 			return req;	/* Success */
1459 	}
1460 
1461 	/* Not enough entries on rb_mws for this req */
1462 	rpcrdma_buffer_put_sendbuf(req, buf);
1463 	rpcrdma_buffer_put_mrs(req, buf);
1464 	return NULL;
1465 }
1466 
1467 static struct rpcrdma_req *
1468 rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
1469 {
1470 	struct rpcrdma_mw *r;
1471 	int i;
1472 
1473 	i = RPCRDMA_MAX_SEGS - 1;
1474 	while (!list_empty(&buf->rb_mws)) {
1475 		r = list_entry(buf->rb_mws.next,
1476 			       struct rpcrdma_mw, mw_list);
1477 		list_del(&r->mw_list);
1478 		req->rl_segments[i].mr_chunk.rl_mw = r;
1479 		if (unlikely(i-- == 0))
1480 			return req;	/* Success */
1481 	}
1482 
1483 	/* Not enough entries on rb_mws for this req */
1484 	rpcrdma_buffer_put_sendbuf(req, buf);
1485 	rpcrdma_buffer_put_mrs(req, buf);
1486 	return NULL;
1487 }
1488 
1489 /*
1490  * Get a set of request/reply buffers.
1491  *
1492  * Reply buffer (if needed) is attached to send buffer upon return.
1493  * Rule:
1494  *    rb_send_index and rb_recv_index MUST always be pointing to the
1495  *    *next* available buffer (non-NULL). They are incremented after
1496  *    removing buffers, and decremented *before* returning them.
1497  */
1498 struct rpcrdma_req *
1499 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1500 {
1501 	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1502 	struct list_head stale;
1503 	struct rpcrdma_req *req;
1504 	unsigned long flags;
1505 
1506 	spin_lock_irqsave(&buffers->rb_lock, flags);
1507 	if (buffers->rb_send_index == buffers->rb_max_requests) {
1508 		spin_unlock_irqrestore(&buffers->rb_lock, flags);
1509 		dprintk("RPC:       %s: out of request buffers\n", __func__);
1510 		return ((struct rpcrdma_req *)NULL);
1511 	}
1512 
1513 	req = buffers->rb_send_bufs[buffers->rb_send_index];
1514 	if (buffers->rb_send_index < buffers->rb_recv_index) {
1515 		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1516 			__func__,
1517 			buffers->rb_recv_index - buffers->rb_send_index);
1518 		req->rl_reply = NULL;
1519 	} else {
1520 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1521 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1522 	}
1523 	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1524 
1525 	INIT_LIST_HEAD(&stale);
1526 	switch (ia->ri_memreg_strategy) {
1527 	case RPCRDMA_FRMR:
1528 		req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
1529 		break;
1530 	case RPCRDMA_MTHCAFMR:
1531 		req = rpcrdma_buffer_get_fmrs(req, buffers);
1532 		break;
1533 	default:
1534 		break;
1535 	}
1536 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1537 	if (!list_empty(&stale))
1538 		rpcrdma_retry_flushed_linv(&stale, buffers);
1539 	return req;
1540 }
1541 
1542 /*
1543  * Put request/reply buffers back into pool.
1544  * Pre-decrement counter/array index.
1545  */
1546 void
1547 rpcrdma_buffer_put(struct rpcrdma_req *req)
1548 {
1549 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1550 	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1551 	unsigned long flags;
1552 
1553 	spin_lock_irqsave(&buffers->rb_lock, flags);
1554 	rpcrdma_buffer_put_sendbuf(req, buffers);
1555 	switch (ia->ri_memreg_strategy) {
1556 	case RPCRDMA_FRMR:
1557 	case RPCRDMA_MTHCAFMR:
1558 		rpcrdma_buffer_put_mrs(req, buffers);
1559 		break;
1560 	default:
1561 		break;
1562 	}
1563 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1564 }
1565 
1566 /*
1567  * Recover reply buffers from pool.
1568  * This happens when recovering from error conditions.
1569  * Post-increment counter/array index.
1570  */
1571 void
1572 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1573 {
1574 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1575 	unsigned long flags;
1576 
1577 	if (req->rl_iov.length == 0)	/* special case xprt_rdma_allocate() */
1578 		buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1579 	spin_lock_irqsave(&buffers->rb_lock, flags);
1580 	if (buffers->rb_recv_index < buffers->rb_max_requests) {
1581 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1582 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1583 	}
1584 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1585 }
1586 
1587 /*
1588  * Put reply buffers back into pool when not attached to
1589  * request. This happens in error conditions.
1590  */
1591 void
1592 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1593 {
1594 	struct rpcrdma_buffer *buffers = rep->rr_buffer;
1595 	unsigned long flags;
1596 
1597 	rep->rr_func = NULL;
1598 	spin_lock_irqsave(&buffers->rb_lock, flags);
1599 	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1600 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1601 }
1602 
1603 /*
1604  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1605  */
1606 
1607 int
1608 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1609 				struct ib_mr **mrp, struct ib_sge *iov)
1610 {
1611 	struct ib_phys_buf ipb;
1612 	struct ib_mr *mr;
1613 	int rc;
1614 
1615 	/*
1616 	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1617 	 */
1618 	iov->addr = ib_dma_map_single(ia->ri_id->device,
1619 			va, len, DMA_BIDIRECTIONAL);
1620 	if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
1621 		return -ENOMEM;
1622 
1623 	iov->length = len;
1624 
1625 	if (ia->ri_have_dma_lkey) {
1626 		*mrp = NULL;
1627 		iov->lkey = ia->ri_dma_lkey;
1628 		return 0;
1629 	} else if (ia->ri_bind_mem != NULL) {
1630 		*mrp = NULL;
1631 		iov->lkey = ia->ri_bind_mem->lkey;
1632 		return 0;
1633 	}
1634 
1635 	ipb.addr = iov->addr;
1636 	ipb.size = iov->length;
1637 	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1638 			IB_ACCESS_LOCAL_WRITE, &iov->addr);
1639 
1640 	dprintk("RPC:       %s: phys convert: 0x%llx "
1641 			"registered 0x%llx length %d\n",
1642 			__func__, (unsigned long long)ipb.addr,
1643 			(unsigned long long)iov->addr, len);
1644 
1645 	if (IS_ERR(mr)) {
1646 		*mrp = NULL;
1647 		rc = PTR_ERR(mr);
1648 		dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1649 	} else {
1650 		*mrp = mr;
1651 		iov->lkey = mr->lkey;
1652 		rc = 0;
1653 	}
1654 
1655 	return rc;
1656 }
1657 
1658 int
1659 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1660 				struct ib_mr *mr, struct ib_sge *iov)
1661 {
1662 	int rc;
1663 
1664 	ib_dma_unmap_single(ia->ri_id->device,
1665 			iov->addr, iov->length, DMA_BIDIRECTIONAL);
1666 
1667 	if (NULL == mr)
1668 		return 0;
1669 
1670 	rc = ib_dereg_mr(mr);
1671 	if (rc)
1672 		dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1673 	return rc;
1674 }
1675 
1676 /*
1677  * Wrappers for chunk registration, shared by read/write chunk code.
1678  */
1679 
1680 static void
1681 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1682 {
1683 	seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1684 	seg->mr_dmalen = seg->mr_len;
1685 	if (seg->mr_page)
1686 		seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1687 				seg->mr_page, offset_in_page(seg->mr_offset),
1688 				seg->mr_dmalen, seg->mr_dir);
1689 	else
1690 		seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1691 				seg->mr_offset,
1692 				seg->mr_dmalen, seg->mr_dir);
1693 	if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
1694 		dprintk("RPC:       %s: mr_dma %llx mr_offset %p mr_dma_len %zu\n",
1695 			__func__,
1696 			(unsigned long long)seg->mr_dma,
1697 			seg->mr_offset, seg->mr_dmalen);
1698 	}
1699 }
1700 
1701 static void
1702 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1703 {
1704 	if (seg->mr_page)
1705 		ib_dma_unmap_page(ia->ri_id->device,
1706 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1707 	else
1708 		ib_dma_unmap_single(ia->ri_id->device,
1709 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1710 }
1711 
1712 static int
1713 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1714 			int *nsegs, int writing, struct rpcrdma_ia *ia,
1715 			struct rpcrdma_xprt *r_xprt)
1716 {
1717 	struct rpcrdma_mr_seg *seg1 = seg;
1718 	struct rpcrdma_mw *mw = seg1->mr_chunk.rl_mw;
1719 	struct rpcrdma_frmr *frmr = &mw->r.frmr;
1720 	struct ib_mr *mr = frmr->fr_mr;
1721 	struct ib_send_wr fastreg_wr, *bad_wr;
1722 	u8 key;
1723 	int len, pageoff;
1724 	int i, rc;
1725 	int seg_len;
1726 	u64 pa;
1727 	int page_no;
1728 
1729 	pageoff = offset_in_page(seg1->mr_offset);
1730 	seg1->mr_offset -= pageoff;	/* start of page */
1731 	seg1->mr_len += pageoff;
1732 	len = -pageoff;
1733 	if (*nsegs > ia->ri_max_frmr_depth)
1734 		*nsegs = ia->ri_max_frmr_depth;
1735 	for (page_no = i = 0; i < *nsegs;) {
1736 		rpcrdma_map_one(ia, seg, writing);
1737 		pa = seg->mr_dma;
1738 		for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
1739 			frmr->fr_pgl->page_list[page_no++] = pa;
1740 			pa += PAGE_SIZE;
1741 		}
1742 		len += seg->mr_len;
1743 		++seg;
1744 		++i;
1745 		/* Check for holes */
1746 		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1747 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1748 			break;
1749 	}
1750 	dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1751 		__func__, mw, i);
1752 
1753 	frmr->fr_state = FRMR_IS_VALID;
1754 
1755 	memset(&fastreg_wr, 0, sizeof(fastreg_wr));
1756 	fastreg_wr.wr_id = (unsigned long)(void *)mw;
1757 	fastreg_wr.opcode = IB_WR_FAST_REG_MR;
1758 	fastreg_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1759 	fastreg_wr.wr.fast_reg.page_list = frmr->fr_pgl;
1760 	fastreg_wr.wr.fast_reg.page_list_len = page_no;
1761 	fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1762 	fastreg_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
1763 	if (fastreg_wr.wr.fast_reg.length < len) {
1764 		rc = -EIO;
1765 		goto out_err;
1766 	}
1767 
1768 	/* Bump the key */
1769 	key = (u8)(mr->rkey & 0x000000FF);
1770 	ib_update_fast_reg_key(mr, ++key);
1771 
1772 	fastreg_wr.wr.fast_reg.access_flags = (writing ?
1773 				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1774 				IB_ACCESS_REMOTE_READ);
1775 	fastreg_wr.wr.fast_reg.rkey = mr->rkey;
1776 	DECR_CQCOUNT(&r_xprt->rx_ep);
1777 
1778 	rc = ib_post_send(ia->ri_id->qp, &fastreg_wr, &bad_wr);
1779 	if (rc) {
1780 		dprintk("RPC:       %s: failed ib_post_send for register,"
1781 			" status %i\n", __func__, rc);
1782 		ib_update_fast_reg_key(mr, --key);
1783 		goto out_err;
1784 	} else {
1785 		seg1->mr_rkey = mr->rkey;
1786 		seg1->mr_base = seg1->mr_dma + pageoff;
1787 		seg1->mr_nsegs = i;
1788 		seg1->mr_len = len;
1789 	}
1790 	*nsegs = i;
1791 	return 0;
1792 out_err:
1793 	frmr->fr_state = FRMR_IS_INVALID;
1794 	while (i--)
1795 		rpcrdma_unmap_one(ia, --seg);
1796 	return rc;
1797 }
1798 
1799 static int
1800 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1801 			struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1802 {
1803 	struct rpcrdma_mr_seg *seg1 = seg;
1804 	struct ib_send_wr invalidate_wr, *bad_wr;
1805 	int rc;
1806 
1807 	seg1->mr_chunk.rl_mw->r.frmr.fr_state = FRMR_IS_INVALID;
1808 
1809 	memset(&invalidate_wr, 0, sizeof invalidate_wr);
1810 	invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1811 	invalidate_wr.opcode = IB_WR_LOCAL_INV;
1812 	invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1813 	DECR_CQCOUNT(&r_xprt->rx_ep);
1814 
1815 	read_lock(&ia->ri_qplock);
1816 	while (seg1->mr_nsegs--)
1817 		rpcrdma_unmap_one(ia, seg++);
1818 	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1819 	read_unlock(&ia->ri_qplock);
1820 	if (rc) {
1821 		/* Force rpcrdma_buffer_get() to retry */
1822 		seg1->mr_chunk.rl_mw->r.frmr.fr_state = FRMR_IS_STALE;
1823 		dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1824 			" status %i\n", __func__, rc);
1825 	}
1826 	return rc;
1827 }
1828 
1829 static int
1830 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1831 			int *nsegs, int writing, struct rpcrdma_ia *ia)
1832 {
1833 	struct rpcrdma_mr_seg *seg1 = seg;
1834 	u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1835 	int len, pageoff, i, rc;
1836 
1837 	pageoff = offset_in_page(seg1->mr_offset);
1838 	seg1->mr_offset -= pageoff;	/* start of page */
1839 	seg1->mr_len += pageoff;
1840 	len = -pageoff;
1841 	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1842 		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1843 	for (i = 0; i < *nsegs;) {
1844 		rpcrdma_map_one(ia, seg, writing);
1845 		physaddrs[i] = seg->mr_dma;
1846 		len += seg->mr_len;
1847 		++seg;
1848 		++i;
1849 		/* Check for holes */
1850 		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1851 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1852 			break;
1853 	}
1854 	rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1855 				physaddrs, i, seg1->mr_dma);
1856 	if (rc) {
1857 		dprintk("RPC:       %s: failed ib_map_phys_fmr "
1858 			"%u@0x%llx+%i (%d)... status %i\n", __func__,
1859 			len, (unsigned long long)seg1->mr_dma,
1860 			pageoff, i, rc);
1861 		while (i--)
1862 			rpcrdma_unmap_one(ia, --seg);
1863 	} else {
1864 		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1865 		seg1->mr_base = seg1->mr_dma + pageoff;
1866 		seg1->mr_nsegs = i;
1867 		seg1->mr_len = len;
1868 	}
1869 	*nsegs = i;
1870 	return rc;
1871 }
1872 
1873 static int
1874 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1875 			struct rpcrdma_ia *ia)
1876 {
1877 	struct rpcrdma_mr_seg *seg1 = seg;
1878 	LIST_HEAD(l);
1879 	int rc;
1880 
1881 	list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1882 	rc = ib_unmap_fmr(&l);
1883 	read_lock(&ia->ri_qplock);
1884 	while (seg1->mr_nsegs--)
1885 		rpcrdma_unmap_one(ia, seg++);
1886 	read_unlock(&ia->ri_qplock);
1887 	if (rc)
1888 		dprintk("RPC:       %s: failed ib_unmap_fmr,"
1889 			" status %i\n", __func__, rc);
1890 	return rc;
1891 }
1892 
1893 int
1894 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1895 			int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1896 {
1897 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1898 	int rc = 0;
1899 
1900 	switch (ia->ri_memreg_strategy) {
1901 
1902 	case RPCRDMA_ALLPHYSICAL:
1903 		rpcrdma_map_one(ia, seg, writing);
1904 		seg->mr_rkey = ia->ri_bind_mem->rkey;
1905 		seg->mr_base = seg->mr_dma;
1906 		seg->mr_nsegs = 1;
1907 		nsegs = 1;
1908 		break;
1909 
1910 	/* Registration using frmr registration */
1911 	case RPCRDMA_FRMR:
1912 		rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1913 		break;
1914 
1915 	/* Registration using fmr memory registration */
1916 	case RPCRDMA_MTHCAFMR:
1917 		rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1918 		break;
1919 
1920 	default:
1921 		return -1;
1922 	}
1923 	if (rc)
1924 		return -1;
1925 
1926 	return nsegs;
1927 }
1928 
1929 int
1930 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1931 		struct rpcrdma_xprt *r_xprt)
1932 {
1933 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1934 	int nsegs = seg->mr_nsegs, rc;
1935 
1936 	switch (ia->ri_memreg_strategy) {
1937 
1938 	case RPCRDMA_ALLPHYSICAL:
1939 		read_lock(&ia->ri_qplock);
1940 		rpcrdma_unmap_one(ia, seg);
1941 		read_unlock(&ia->ri_qplock);
1942 		break;
1943 
1944 	case RPCRDMA_FRMR:
1945 		rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1946 		break;
1947 
1948 	case RPCRDMA_MTHCAFMR:
1949 		rc = rpcrdma_deregister_fmr_external(seg, ia);
1950 		break;
1951 
1952 	default:
1953 		break;
1954 	}
1955 	return nsegs;
1956 }
1957 
1958 /*
1959  * Prepost any receive buffer, then post send.
1960  *
1961  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1962  */
1963 int
1964 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1965 		struct rpcrdma_ep *ep,
1966 		struct rpcrdma_req *req)
1967 {
1968 	struct ib_send_wr send_wr, *send_wr_fail;
1969 	struct rpcrdma_rep *rep = req->rl_reply;
1970 	int rc;
1971 
1972 	if (rep) {
1973 		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1974 		if (rc)
1975 			goto out;
1976 		req->rl_reply = NULL;
1977 	}
1978 
1979 	send_wr.next = NULL;
1980 	send_wr.wr_id = 0ULL;	/* no send cookie */
1981 	send_wr.sg_list = req->rl_send_iov;
1982 	send_wr.num_sge = req->rl_niovs;
1983 	send_wr.opcode = IB_WR_SEND;
1984 	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
1985 		ib_dma_sync_single_for_device(ia->ri_id->device,
1986 			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1987 			DMA_TO_DEVICE);
1988 	ib_dma_sync_single_for_device(ia->ri_id->device,
1989 		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1990 		DMA_TO_DEVICE);
1991 	ib_dma_sync_single_for_device(ia->ri_id->device,
1992 		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1993 		DMA_TO_DEVICE);
1994 
1995 	if (DECR_CQCOUNT(ep) > 0)
1996 		send_wr.send_flags = 0;
1997 	else { /* Provider must take a send completion every now and then */
1998 		INIT_CQCOUNT(ep);
1999 		send_wr.send_flags = IB_SEND_SIGNALED;
2000 	}
2001 
2002 	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
2003 	if (rc)
2004 		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
2005 			rc);
2006 out:
2007 	return rc;
2008 }
2009 
2010 /*
2011  * (Re)post a receive buffer.
2012  */
2013 int
2014 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
2015 		     struct rpcrdma_ep *ep,
2016 		     struct rpcrdma_rep *rep)
2017 {
2018 	struct ib_recv_wr recv_wr, *recv_wr_fail;
2019 	int rc;
2020 
2021 	recv_wr.next = NULL;
2022 	recv_wr.wr_id = (u64) (unsigned long) rep;
2023 	recv_wr.sg_list = &rep->rr_iov;
2024 	recv_wr.num_sge = 1;
2025 
2026 	ib_dma_sync_single_for_cpu(ia->ri_id->device,
2027 		rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
2028 
2029 	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
2030 
2031 	if (rc)
2032 		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
2033 			rc);
2034 	return rc;
2035 }
2036 
2037 /* Physical mapping means one Read/Write list entry per-page.
2038  * All list entries must fit within an inline buffer
2039  *
2040  * NB: The server must return a Write list for NFS READ,
2041  *     which has the same constraint. Factor in the inline
2042  *     rsize as well.
2043  */
2044 static size_t
2045 rpcrdma_physical_max_payload(struct rpcrdma_xprt *r_xprt)
2046 {
2047 	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
2048 	unsigned int inline_size, pages;
2049 
2050 	inline_size = min_t(unsigned int,
2051 			    cdata->inline_wsize, cdata->inline_rsize);
2052 	inline_size -= RPCRDMA_HDRLEN_MIN;
2053 	pages = inline_size / sizeof(struct rpcrdma_segment);
2054 	return pages << PAGE_SHIFT;
2055 }
2056 
2057 static size_t
2058 rpcrdma_mr_max_payload(struct rpcrdma_xprt *r_xprt)
2059 {
2060 	return RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT;
2061 }
2062 
2063 size_t
2064 rpcrdma_max_payload(struct rpcrdma_xprt *r_xprt)
2065 {
2066 	size_t result;
2067 
2068 	switch (r_xprt->rx_ia.ri_memreg_strategy) {
2069 	case RPCRDMA_ALLPHYSICAL:
2070 		result = rpcrdma_physical_max_payload(r_xprt);
2071 		break;
2072 	default:
2073 		result = rpcrdma_mr_max_payload(r_xprt);
2074 	}
2075 	return result;
2076 }
2077