xref: /openbmc/linux/net/sunrpc/xprtrdma/verbs.c (revision 615c36f5)
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39 
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49 
50 #include <linux/interrupt.h>
51 #include <linux/pci.h>	/* for Tavor hack below */
52 #include <linux/slab.h>
53 
54 #include "xprt_rdma.h"
55 
56 /*
57  * Globals/Macros
58  */
59 
60 #ifdef RPC_DEBUG
61 # define RPCDBG_FACILITY	RPCDBG_TRANS
62 #endif
63 
64 /*
65  * internal functions
66  */
67 
68 /*
69  * Handle replies in tasklet context, using a single, global list.
70  * The rdma tasklet function just turns around and calls the reply
71  * handler (rr_func) for each reply on the list.
72  */
73 
74 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
75 static LIST_HEAD(rpcrdma_tasklets_g);
76 
77 static void
78 rpcrdma_run_tasklet(unsigned long data)
79 {
80 	struct rpcrdma_rep *rep;
81 	void (*func)(struct rpcrdma_rep *);
82 	unsigned long flags;
83 
84 	data = data;	/* the tasklet data argument is unused */
85 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
86 	while (!list_empty(&rpcrdma_tasklets_g)) {
87 		rep = list_entry(rpcrdma_tasklets_g.next,
88 				 struct rpcrdma_rep, rr_list);
89 		list_del(&rep->rr_list);
90 		func = rep->rr_func;
91 		rep->rr_func = NULL;
92 		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
93 
94 		if (func)
95 			func(rep);
96 		else
97 			rpcrdma_recv_buffer_put(rep);
98 
99 		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
100 	}
101 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
102 }
103 
104 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
105 
106 static inline void
107 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
108 {
109 	unsigned long flags;
110 
111 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
112 	list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
113 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
114 	tasklet_schedule(&rpcrdma_tasklet_g);
115 }
116 
117 static void
118 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
119 {
120 	struct rpcrdma_ep *ep = context;
121 
122 	dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
123 		__func__, event->event, event->device->name, context);
124 	if (ep->rep_connected == 1) {
125 		ep->rep_connected = -EIO;
126 		ep->rep_func(ep);
127 		wake_up_all(&ep->rep_connect_wait);
128 	}
129 }
130 
131 static void
132 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
133 {
134 	struct rpcrdma_ep *ep = context;
135 
136 	dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
137 		__func__, event->event, event->device->name, context);
138 	if (ep->rep_connected == 1) {
139 		ep->rep_connected = -EIO;
140 		ep->rep_func(ep);
141 		wake_up_all(&ep->rep_connect_wait);
142 	}
143 }
144 
145 static inline
146 void rpcrdma_event_process(struct ib_wc *wc)
147 {
148 	struct rpcrdma_mw *frmr;
149 	struct rpcrdma_rep *rep =
150 			(struct rpcrdma_rep *)(unsigned long) wc->wr_id;
151 
152 	dprintk("RPC:       %s: event rep %p status %X opcode %X length %u\n",
153 		__func__, rep, wc->status, wc->opcode, wc->byte_len);
154 
155 	if (!rep) /* send or bind completion that we don't care about */
156 		return;
157 
158 	if (wc->status != IB_WC_SUCCESS) {
159 		dprintk("RPC:       %s: WC opcode %d status %X, connection lost\n",
160 			__func__, wc->opcode, wc->status);
161 		rep->rr_len = ~0U;
162 		if (wc->opcode != IB_WC_FAST_REG_MR && wc->opcode != IB_WC_LOCAL_INV)
163 			rpcrdma_schedule_tasklet(rep);
164 		return;
165 	}
166 
167 	switch (wc->opcode) {
168 	case IB_WC_FAST_REG_MR:
169 		frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
170 		frmr->r.frmr.state = FRMR_IS_VALID;
171 		break;
172 	case IB_WC_LOCAL_INV:
173 		frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
174 		frmr->r.frmr.state = FRMR_IS_INVALID;
175 		break;
176 	case IB_WC_RECV:
177 		rep->rr_len = wc->byte_len;
178 		ib_dma_sync_single_for_cpu(
179 			rdmab_to_ia(rep->rr_buffer)->ri_id->device,
180 			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
181 		/* Keep (only) the most recent credits, after checking validity */
182 		if (rep->rr_len >= 16) {
183 			struct rpcrdma_msg *p =
184 					(struct rpcrdma_msg *) rep->rr_base;
185 			unsigned int credits = ntohl(p->rm_credit);
186 			if (credits == 0) {
187 				dprintk("RPC:       %s: server"
188 					" dropped credits to 0!\n", __func__);
189 				/* don't deadlock */
190 				credits = 1;
191 			} else if (credits > rep->rr_buffer->rb_max_requests) {
192 				dprintk("RPC:       %s: server"
193 					" over-crediting: %d (%d)\n",
194 					__func__, credits,
195 					rep->rr_buffer->rb_max_requests);
196 				credits = rep->rr_buffer->rb_max_requests;
197 			}
198 			atomic_set(&rep->rr_buffer->rb_credits, credits);
199 		}
200 		/* fall through */
201 	case IB_WC_BIND_MW:
202 		rpcrdma_schedule_tasklet(rep);
203 		break;
204 	default:
205 		dprintk("RPC:       %s: unexpected WC event %X\n",
206 			__func__, wc->opcode);
207 		break;
208 	}
209 }
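
/*
 * Note on flow control: each RPC/RDMA reply carries an rm_credit field
 * advertising how many requests the server is currently willing to
 * accept. The IB_WC_RECV case above clamps that value to the range
 * [1, rb_max_requests] before storing it in rb_credits, so a bogus
 * advertisement can neither deadlock the transport nor overcommit the
 * receive buffers.
 */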
210 
211 static inline int
212 rpcrdma_cq_poll(struct ib_cq *cq)
213 {
214 	struct ib_wc wc;
215 	int rc;
216 
217 	for (;;) {
218 		rc = ib_poll_cq(cq, 1, &wc);
219 		if (rc < 0) {
220 			dprintk("RPC:       %s: ib_poll_cq failed %i\n",
221 				__func__, rc);
222 			return rc;
223 		}
224 		if (rc == 0)
225 			break;
226 
227 		rpcrdma_event_process(&wc);
228 	}
229 
230 	return 0;
231 }
232 
233 /*
234  * rpcrdma_cq_event_upcall
235  *
236  * This upcall handles recv, send, bind and unbind events.
237  * It is reentrant, but processes events one at a time in order to
238  * preserve the ordering of receives, on which server credit accounting depends.
239  *
240  * It is the responsibility of the scheduled tasklet to return
241  * recv buffers to the pool. NOTE: this affects synchronization of
242  * connection shutdown. That is, the structures required for
243  * the completion of the reply handler must remain intact until
244  * all memory has been reclaimed.
245  *
246  * Note that send events are suppressed and do not result in an upcall.
247  */
248 static void
249 rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
250 {
251 	int rc;
252 
253 	rc = rpcrdma_cq_poll(cq);
254 	if (rc)
255 		return;
256 
257 	rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
258 	if (rc) {
259 		dprintk("RPC:       %s: ib_req_notify_cq failed %i\n",
260 			__func__, rc);
261 		return;
262 	}
263 
264 	rpcrdma_cq_poll(cq);
265 }
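
/*
 * The poll / re-arm / poll-again sequence above is the usual way to
 * avoid losing completions: an event that arrives after the first
 * rpcrdma_cq_poll() drains the CQ but before ib_req_notify_cq()
 * re-arms it would otherwise never trigger another upcall, so the CQ
 * is polled once more after re-arming.
 */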
266 
267 #ifdef RPC_DEBUG
268 static const char * const conn[] = {
269 	"address resolved",
270 	"address error",
271 	"route resolved",
272 	"route error",
273 	"connect request",
274 	"connect response",
275 	"connect error",
276 	"unreachable",
277 	"rejected",
278 	"established",
279 	"disconnected",
280 	"device removal"
281 };
282 #endif
283 
284 static int
285 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
286 {
287 	struct rpcrdma_xprt *xprt = id->context;
288 	struct rpcrdma_ia *ia = &xprt->rx_ia;
289 	struct rpcrdma_ep *ep = &xprt->rx_ep;
290 #ifdef RPC_DEBUG
291 	struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
292 #endif
293 	struct ib_qp_attr attr;
294 	struct ib_qp_init_attr iattr;
295 	int connstate = 0;
296 
297 	switch (event->event) {
298 	case RDMA_CM_EVENT_ADDR_RESOLVED:
299 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
300 		ia->ri_async_rc = 0;
301 		complete(&ia->ri_done);
302 		break;
303 	case RDMA_CM_EVENT_ADDR_ERROR:
304 		ia->ri_async_rc = -EHOSTUNREACH;
305 		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
306 			__func__, ep);
307 		complete(&ia->ri_done);
308 		break;
309 	case RDMA_CM_EVENT_ROUTE_ERROR:
310 		ia->ri_async_rc = -ENETUNREACH;
311 		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
312 			__func__, ep);
313 		complete(&ia->ri_done);
314 		break;
315 	case RDMA_CM_EVENT_ESTABLISHED:
316 		connstate = 1;
317 		ib_query_qp(ia->ri_id->qp, &attr,
318 			IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
319 			&iattr);
320 		dprintk("RPC:       %s: %d responder resources"
321 			" (%d initiator)\n",
322 			__func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
323 		goto connected;
324 	case RDMA_CM_EVENT_CONNECT_ERROR:
325 		connstate = -ENOTCONN;
326 		goto connected;
327 	case RDMA_CM_EVENT_UNREACHABLE:
328 		connstate = -ENETDOWN;
329 		goto connected;
330 	case RDMA_CM_EVENT_REJECTED:
331 		connstate = -ECONNREFUSED;
332 		goto connected;
333 	case RDMA_CM_EVENT_DISCONNECTED:
334 		connstate = -ECONNABORTED;
335 		goto connected;
336 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
337 		connstate = -ENODEV;
338 connected:
339 		dprintk("RPC:       %s: %s: %pI4:%u (ep 0x%p event 0x%x)\n",
340 			__func__,
341 			(event->event <= 11) ? conn[event->event] :
342 						"unknown connection error",
343 			&addr->sin_addr.s_addr,
344 			ntohs(addr->sin_port),
345 			ep, event->event);
346 		atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
347 		dprintk("RPC:       %s: %sconnected\n",
348 					__func__, connstate > 0 ? "" : "dis");
349 		ep->rep_connected = connstate;
350 		ep->rep_func(ep);
351 		wake_up_all(&ep->rep_connect_wait);
352 		break;
353 	default:
354 		dprintk("RPC:       %s: unexpected CM event %d\n",
355 			__func__, event->event);
356 		break;
357 	}
358 
359 #ifdef RPC_DEBUG
360 	if (connstate == 1) {
361 		int ird = attr.max_dest_rd_atomic;
362 		int tird = ep->rep_remote_cma.responder_resources;
363 		printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
364 			"on %s, memreg %d slots %d ird %d%s\n",
365 			&addr->sin_addr.s_addr,
366 			ntohs(addr->sin_port),
367 			ia->ri_id->device->name,
368 			ia->ri_memreg_strategy,
369 			xprt->rx_buf.rb_max_requests,
370 			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
371 	} else if (connstate < 0) {
372 		printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
373 			&addr->sin_addr.s_addr,
374 			ntohs(addr->sin_port),
375 			connstate);
376 	}
377 #endif
378 
379 	return 0;
380 }
381 
382 static struct rdma_cm_id *
383 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
384 			struct rpcrdma_ia *ia, struct sockaddr *addr)
385 {
386 	struct rdma_cm_id *id;
387 	int rc;
388 
389 	init_completion(&ia->ri_done);
390 
391 	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
392 	if (IS_ERR(id)) {
393 		rc = PTR_ERR(id);
394 		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
395 			__func__, rc);
396 		return id;
397 	}
398 
399 	ia->ri_async_rc = -ETIMEDOUT;
400 	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
401 	if (rc) {
402 		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
403 			__func__, rc);
404 		goto out;
405 	}
406 	wait_for_completion_interruptible_timeout(&ia->ri_done,
407 				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
408 	rc = ia->ri_async_rc;
409 	if (rc)
410 		goto out;
411 
412 	ia->ri_async_rc = -ETIMEDOUT;
413 	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
414 	if (rc) {
415 		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
416 			__func__, rc);
417 		goto out;
418 	}
419 	wait_for_completion_interruptible_timeout(&ia->ri_done,
420 				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
421 	rc = ia->ri_async_rc;
422 	if (rc)
423 		goto out;
424 
425 	return id;
426 
427 out:
428 	rdma_destroy_id(id);
429 	return ERR_PTR(rc);
430 }
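
/*
 * Both rdma_resolve_addr() and rdma_resolve_route() complete
 * asynchronously: rpcrdma_conn_upcall() records the outcome in
 * ia->ri_async_rc and signals ia->ri_done, which is waited on here
 * with a timeout. ri_async_rc is pre-set to -ETIMEDOUT so that an
 * expired wait is reported as a timeout.
 */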
431 
432 /*
433  * Drain any CQ prior to teardown.
434  */
435 static void
436 rpcrdma_clean_cq(struct ib_cq *cq)
437 {
438 	struct ib_wc wc;
439 	int count = 0;
440 
441 	while (1 == ib_poll_cq(cq, 1, &wc))
442 		++count;
443 
444 	if (count)
445 		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
446 			__func__, count, wc.opcode);
447 }
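
/*
 * Note that this drain simply discards completions without running
 * rpcrdma_event_process(), so it is only used when the endpoint is
 * being torn down or reconnected and pending events no longer matter.
 */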
448 
449 /*
450  * Exported functions.
451  */
452 
453 /*
454  * Open and initialize an Interface Adapter.
455  *  o initializes fields of struct rpcrdma_ia, including
456  *    interface and provider attributes and protection domain.
457  */
458 int
459 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
460 {
461 	int rc, mem_priv;
462 	struct ib_device_attr devattr;
463 	struct rpcrdma_ia *ia = &xprt->rx_ia;
464 
465 	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
466 	if (IS_ERR(ia->ri_id)) {
467 		rc = PTR_ERR(ia->ri_id);
468 		goto out1;
469 	}
470 
471 	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
472 	if (IS_ERR(ia->ri_pd)) {
473 		rc = PTR_ERR(ia->ri_pd);
474 		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
475 			__func__, rc);
476 		goto out2;
477 	}
478 
479 	/*
480 	 * Query the device to determine if the requested memory
481 	 * registration strategy is supported. If it isn't, set the
482 	 * strategy to a globally supported model.
483 	 */
484 	rc = ib_query_device(ia->ri_id->device, &devattr);
485 	if (rc) {
486 		dprintk("RPC:       %s: ib_query_device failed %d\n",
487 			__func__, rc);
488 		goto out2;
489 	}
490 
491 	if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
492 		ia->ri_have_dma_lkey = 1;
493 		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
494 	}
495 
496 	switch (memreg) {
497 	case RPCRDMA_MEMWINDOWS:
498 	case RPCRDMA_MEMWINDOWS_ASYNC:
499 		if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) {
500 			dprintk("RPC:       %s: MEMWINDOWS registration "
501 				"specified but not supported by adapter, "
502 				"using slower RPCRDMA_REGISTER\n",
503 				__func__);
504 			memreg = RPCRDMA_REGISTER;
505 		}
506 		break;
507 	case RPCRDMA_MTHCAFMR:
508 		if (!ia->ri_id->device->alloc_fmr) {
509 #if RPCRDMA_PERSISTENT_REGISTRATION
510 			dprintk("RPC:       %s: MTHCAFMR registration "
511 				"specified but not supported by adapter, "
512 				"using riskier RPCRDMA_ALLPHYSICAL\n",
513 				__func__);
514 			memreg = RPCRDMA_ALLPHYSICAL;
515 #else
516 			dprintk("RPC:       %s: MTHCAFMR registration "
517 				"specified but not supported by adapter, "
518 				"using slower RPCRDMA_REGISTER\n",
519 				__func__);
520 			memreg = RPCRDMA_REGISTER;
521 #endif
522 		}
523 		break;
524 	case RPCRDMA_FRMR:
525 		/* Requires both frmr reg and local dma lkey */
526 		if ((devattr.device_cap_flags &
527 		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
528 		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
529 #if RPCRDMA_PERSISTENT_REGISTRATION
530 			dprintk("RPC:       %s: FRMR registration "
531 				"specified but not supported by adapter, "
532 				"using riskier RPCRDMA_ALLPHYSICAL\n",
533 				__func__);
534 			memreg = RPCRDMA_ALLPHYSICAL;
535 #else
536 			dprintk("RPC:       %s: FRMR registration "
537 				"specified but not supported by adapter, "
538 				"using slower RPCRDMA_REGISTER\n",
539 				__func__);
540 			memreg = RPCRDMA_REGISTER;
541 #endif
542 		}
543 		break;
544 	}
545 
546 	/*
547 	 * Optionally obtain an underlying physical identity mapping in
548 	 * order to do a memory window-based bind. This base registration
549 	 * is protected from remote access - that is enabled only by binding
550 	 * for the specific bytes targeted during each RPC operation, and
551 	 * revoked after the corresponding completion similar to a storage
552 	 * adapter.
553 	 */
554 	switch (memreg) {
555 	case RPCRDMA_BOUNCEBUFFERS:
556 	case RPCRDMA_REGISTER:
557 	case RPCRDMA_FRMR:
558 		break;
559 #if RPCRDMA_PERSISTENT_REGISTRATION
560 	case RPCRDMA_ALLPHYSICAL:
561 		mem_priv = IB_ACCESS_LOCAL_WRITE |
562 				IB_ACCESS_REMOTE_WRITE |
563 				IB_ACCESS_REMOTE_READ;
564 		goto register_setup;
565 #endif
566 	case RPCRDMA_MEMWINDOWS_ASYNC:
567 	case RPCRDMA_MEMWINDOWS:
568 		mem_priv = IB_ACCESS_LOCAL_WRITE |
569 				IB_ACCESS_MW_BIND;
570 		goto register_setup;
571 	case RPCRDMA_MTHCAFMR:
572 		if (ia->ri_have_dma_lkey)
573 			break;
574 		mem_priv = IB_ACCESS_LOCAL_WRITE;
575 	register_setup:
576 		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
577 		if (IS_ERR(ia->ri_bind_mem)) {
578 			printk(KERN_ALERT "%s: ib_get_dma_mr for "
579 				"phys register failed with %lX\n\t"
580 				"Will continue with degraded performance\n",
581 				__func__, PTR_ERR(ia->ri_bind_mem));
582 			memreg = RPCRDMA_REGISTER;
583 			ia->ri_bind_mem = NULL;
584 		}
585 		break;
586 	default:
587 		printk(KERN_ERR "%s: invalid memory registration mode %d\n",
588 				__func__, memreg);
589 		rc = -EINVAL;
590 		goto out2;
591 	}
592 	dprintk("RPC:       %s: memory registration strategy is %d\n",
593 		__func__, memreg);
594 
595 	/* Else will do memory reg/dereg for each chunk */
596 	ia->ri_memreg_strategy = memreg;
597 
598 	return 0;
599 out2:
600 	rdma_destroy_id(ia->ri_id);
601 	ia->ri_id = NULL;
602 out1:
603 	return rc;
604 }
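
/*
 * At this point ia->ri_memreg_strategy holds the mode that will
 * actually be used: the requested mode is downgraded above (to
 * RPCRDMA_REGISTER, or to RPCRDMA_ALLPHYSICAL when persistent
 * registration is compiled in) whenever the device lacks the required
 * capability or the base DMA MR cannot be allocated.
 */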
605 
606 /*
607  * Clean up/close an IA.
608  *   o if event handles and PD have been initialized, free them.
609  *   o close the IA
610  */
611 void
612 rpcrdma_ia_close(struct rpcrdma_ia *ia)
613 {
614 	int rc;
615 
616 	dprintk("RPC:       %s: entering\n", __func__);
617 	if (ia->ri_bind_mem != NULL) {
618 		rc = ib_dereg_mr(ia->ri_bind_mem);
619 		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
620 			__func__, rc);
621 	}
622 	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
623 		if (ia->ri_id->qp)
624 			rdma_destroy_qp(ia->ri_id);
625 		rdma_destroy_id(ia->ri_id);
626 		ia->ri_id = NULL;
627 	}
628 	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
629 		rc = ib_dealloc_pd(ia->ri_pd);
630 		dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
631 			__func__, rc);
632 	}
633 }
634 
635 /*
636  * Create unconnected endpoint.
637  */
638 int
639 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
640 				struct rpcrdma_create_data_internal *cdata)
641 {
642 	struct ib_device_attr devattr;
643 	int rc, err;
644 
645 	rc = ib_query_device(ia->ri_id->device, &devattr);
646 	if (rc) {
647 		dprintk("RPC:       %s: ib_query_device failed %d\n",
648 			__func__, rc);
649 		return rc;
650 	}
651 
652 	/* check provider's send/recv wr limits */
653 	if (cdata->max_requests > devattr.max_qp_wr)
654 		cdata->max_requests = devattr.max_qp_wr;
655 
656 	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
657 	ep->rep_attr.qp_context = ep;
658 	/* send_cq and recv_cq initialized below */
659 	ep->rep_attr.srq = NULL;
660 	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
661 	switch (ia->ri_memreg_strategy) {
662 	case RPCRDMA_FRMR:
663 		/* Add room for frmr register and invalidate WRs.
664 		 * 1. FRMR reg WR for head
665 		 * 2. FRMR invalidate WR for head
666 		 * 3. FRMR reg WR for pagelist
667 		 * 4. FRMR invalidate WR for pagelist
668 		 * 5. FRMR reg WR for tail
669 		 * 6. FRMR invalidate WR for tail
670 		 * 7. The RDMA_SEND WR
671 		 */
672 		ep->rep_attr.cap.max_send_wr *= 7;
673 		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
674 			cdata->max_requests = devattr.max_qp_wr / 7;
675 			if (!cdata->max_requests)
676 				return -EINVAL;
677 			ep->rep_attr.cap.max_send_wr = cdata->max_requests * 7;
678 		}
679 		break;
680 	case RPCRDMA_MEMWINDOWS_ASYNC:
681 	case RPCRDMA_MEMWINDOWS:
682 		/* Add room for mw_binds+unbinds - overkill! */
683 		ep->rep_attr.cap.max_send_wr++;
684 		ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
685 		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
686 			return -EINVAL;
687 		break;
688 	default:
689 		break;
690 	}
691 	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
692 	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
693 	ep->rep_attr.cap.max_recv_sge = 1;
694 	ep->rep_attr.cap.max_inline_data = 0;
695 	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
696 	ep->rep_attr.qp_type = IB_QPT_RC;
697 	ep->rep_attr.port_num = ~0;
698 
699 	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
700 		"iovs: send %d recv %d\n",
701 		__func__,
702 		ep->rep_attr.cap.max_send_wr,
703 		ep->rep_attr.cap.max_recv_wr,
704 		ep->rep_attr.cap.max_send_sge,
705 		ep->rep_attr.cap.max_recv_sge);
706 
707 	/* set trigger for requesting send completion */
708 	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/;
709 	switch (ia->ri_memreg_strategy) {
710 	case RPCRDMA_MEMWINDOWS_ASYNC:
711 	case RPCRDMA_MEMWINDOWS:
712 		ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
713 		break;
714 	default:
715 		break;
716 	}
717 	if (ep->rep_cqinit <= 2)
718 		ep->rep_cqinit = 0;
719 	INIT_CQCOUNT(ep);
720 	ep->rep_ia = ia;
721 	init_waitqueue_head(&ep->rep_connect_wait);
722 
723 	/*
724 	 * Create a single cq for receive dto and mw_bind (only ever
725 	 * care about unbind, really). Send completions are suppressed.
726 	 * Use single threaded tasklet upcalls to maintain ordering.
727 	 */
728 	ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
729 				  rpcrdma_cq_async_error_upcall, NULL,
730 				  ep->rep_attr.cap.max_recv_wr +
731 				  ep->rep_attr.cap.max_send_wr + 1, 0);
732 	if (IS_ERR(ep->rep_cq)) {
733 		rc = PTR_ERR(ep->rep_cq);
734 		dprintk("RPC:       %s: ib_create_cq failed: %i\n",
735 			__func__, rc);
736 		goto out1;
737 	}
738 
739 	rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
740 	if (rc) {
741 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
742 			__func__, rc);
743 		goto out2;
744 	}
745 
746 	ep->rep_attr.send_cq = ep->rep_cq;
747 	ep->rep_attr.recv_cq = ep->rep_cq;
748 
749 	/* Initialize cma parameters */
750 
751 	/* RPC/RDMA does not use private data */
752 	ep->rep_remote_cma.private_data = NULL;
753 	ep->rep_remote_cma.private_data_len = 0;
754 
755 	/* Client offers RDMA Read but does not initiate */
756 	ep->rep_remote_cma.initiator_depth = 0;
757 	if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS)
758 		ep->rep_remote_cma.responder_resources = 0;
759 	else if (devattr.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
760 		ep->rep_remote_cma.responder_resources = 32;
761 	else
762 		ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
763 
764 	ep->rep_remote_cma.retry_count = 7;
765 	ep->rep_remote_cma.flow_control = 0;
766 	ep->rep_remote_cma.rnr_retry_count = 0;
767 
768 	return 0;
769 
770 out2:
771 	err = ib_destroy_cq(ep->rep_cq);
772 	if (err)
773 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
774 			__func__, err);
775 out1:
776 	return rc;
777 }
778 
779 /*
780  * rpcrdma_ep_destroy
781  *
782  * Disconnect and destroy endpoint. After this, the only
783  * valid operations on the ep are to free it (if dynamically
784  * allocated) or re-create it.
785  *
786  * The caller's error handling must be sure to not leak the endpoint
787  * if this function fails.
788  */
789 int
790 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
791 {
792 	int rc;
793 
794 	dprintk("RPC:       %s: entering, connected is %d\n",
795 		__func__, ep->rep_connected);
796 
797 	if (ia->ri_id->qp) {
798 		rc = rpcrdma_ep_disconnect(ep, ia);
799 		if (rc)
800 			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
801 				" returned %i\n", __func__, rc);
802 		rdma_destroy_qp(ia->ri_id);
803 		ia->ri_id->qp = NULL;
804 	}
805 
806 	/* padding - could be done in rpcrdma_buffer_destroy... */
807 	if (ep->rep_pad_mr) {
808 		rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
809 		ep->rep_pad_mr = NULL;
810 	}
811 
812 	rpcrdma_clean_cq(ep->rep_cq);
813 	rc = ib_destroy_cq(ep->rep_cq);
814 	if (rc)
815 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
816 			__func__, rc);
817 
818 	return rc;
819 }
820 
821 /*
822  * Connect unconnected endpoint.
823  */
824 int
825 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
826 {
827 	struct rdma_cm_id *id;
828 	int rc = 0;
829 	int retry_count = 0;
830 
831 	if (ep->rep_connected != 0) {
832 		struct rpcrdma_xprt *xprt;
833 retry:
834 		rc = rpcrdma_ep_disconnect(ep, ia);
835 		if (rc && rc != -ENOTCONN)
836 			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
837 				" status %i\n", __func__, rc);
838 		rpcrdma_clean_cq(ep->rep_cq);
839 
840 		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
841 		id = rpcrdma_create_id(xprt, ia,
842 				(struct sockaddr *)&xprt->rx_data.addr);
843 		if (IS_ERR(id)) {
844 			rc = PTR_ERR(id);
845 			goto out;
846 		}
847 		/* TEMP TEMP TEMP - fail if new device:
848 		 * Deregister/remarshal *all* requests!
849 		 * Close and recreate adapter, pd, etc!
850 		 * Re-determine all attributes still sane!
851 		 * More stuff I haven't thought of!
852 		 * Rrrgh!
853 		 */
854 		if (ia->ri_id->device != id->device) {
855 			printk("RPC:       %s: can't reconnect on "
856 				"different device!\n", __func__);
857 			rdma_destroy_id(id);
858 			rc = -ENETDOWN;
859 			goto out;
860 		}
861 		/* END TEMP */
862 		rdma_destroy_qp(ia->ri_id);
863 		rdma_destroy_id(ia->ri_id);
864 		ia->ri_id = id;
865 	}
866 
867 	rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
868 	if (rc) {
869 		dprintk("RPC:       %s: rdma_create_qp failed %i\n",
870 			__func__, rc);
871 		goto out;
872 	}
873 
874 /* XXX Tavor device performs badly with 2K MTU! */
875 if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
876 	struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
877 	if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
878 	    (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
879 	     pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
880 		struct ib_qp_attr attr = {
881 			.path_mtu = IB_MTU_1024
882 		};
883 		rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
884 	}
885 }
886 
887 	ep->rep_connected = 0;
888 
889 	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
890 	if (rc) {
891 		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
892 				__func__, rc);
893 		goto out;
894 	}
895 
896 	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
897 
898 	/*
899 	 * Check state. A non-peer reject indicates no listener
900 	 * (ECONNREFUSED), which may be a transient state. All
901 	 * others indicate a transport condition for which a
902 	 * best-effort connection attempt has already been made.
903 	 */
904 	if (ep->rep_connected == -ECONNREFUSED &&
905 	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
906 		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
907 		goto retry;
908 	}
909 	if (ep->rep_connected <= 0) {
910 		/* Sometimes, the only way to reliably connect to remote
911 		 * CMs is to use the same nonzero value for ORD and IRD. */
912 		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
913 		    (ep->rep_remote_cma.responder_resources == 0 ||
914 		     ep->rep_remote_cma.initiator_depth !=
915 				ep->rep_remote_cma.responder_resources)) {
916 			if (ep->rep_remote_cma.responder_resources == 0)
917 				ep->rep_remote_cma.responder_resources = 1;
918 			ep->rep_remote_cma.initiator_depth =
919 				ep->rep_remote_cma.responder_resources;
920 			goto retry;
921 		}
922 		rc = ep->rep_connected;
923 	} else {
924 		dprintk("RPC:       %s: connected\n", __func__);
925 	}
926 
927 out:
928 	if (rc)
929 		ep->rep_connected = rc;
930 	return rc;
931 }
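
/*
 * A rejected connect (-ECONNREFUSED, typically no listener yet) is
 * retried up to RDMA_CONNECT_RETRY_MAX times. Any other failure is
 * effectively retried once more with initiator_depth forced equal to
 * a nonzero responder_resources, since (per the comment above) some
 * remote CMs only connect reliably when ORD and IRD match.
 */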
932 
933 /*
934  * rpcrdma_ep_disconnect
935  *
936  * This is separate from destroy to facilitate the ability
937  * to reconnect without recreating the endpoint.
938  *
939  * This call is not reentrant, and must not be made in parallel
940  * on the same endpoint.
941  */
942 int
943 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
944 {
945 	int rc;
946 
947 	rpcrdma_clean_cq(ep->rep_cq);
948 	rc = rdma_disconnect(ia->ri_id);
949 	if (!rc) {
950 		/* returns without wait if not connected */
951 		wait_event_interruptible(ep->rep_connect_wait,
952 							ep->rep_connected != 1);
953 		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
954 			(ep->rep_connected == 1) ? "still " : "dis");
955 	} else {
956 		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
957 		ep->rep_connected = rc;
958 	}
959 	return rc;
960 }
961 
962 /*
963  * Initialize buffer memory
964  */
965 int
966 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
967 	struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
968 {
969 	char *p;
970 	size_t len;
971 	int i, rc;
972 	struct rpcrdma_mw *r;
973 
974 	buf->rb_max_requests = cdata->max_requests;
975 	spin_lock_init(&buf->rb_lock);
976 	atomic_set(&buf->rb_credits, 1);
977 
978 	/* Need to allocate:
979 	 *   1.  arrays for send and recv pointers
980 	 *   2.  arrays of struct rpcrdma_req to fill in pointers
981 	 *   3.  array of struct rpcrdma_rep for replies
982 	 *   4.  padding, if any
983 	 *   5.  mw's, fmr's or frmr's, if any
984 	 * Send/recv buffers in req/rep need to be registered
985 	 */
986 
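	/*
	 * The single pool allocated below is carved up in this order:
	 * send pointer array, recv pointer array, pad buffer (if any),
	 * then the mw/fmr/frmr array sized here. The req and rep
	 * buffers themselves are separate kmalloc'd regions, each
	 * registered via rpcrdma_register_internal().
	 */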
987 	len = buf->rb_max_requests *
988 		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
989 	len += cdata->padding;
990 	switch (ia->ri_memreg_strategy) {
991 	case RPCRDMA_FRMR:
992 		len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
993 				sizeof(struct rpcrdma_mw);
994 		break;
995 	case RPCRDMA_MTHCAFMR:
996 		/* TBD we are perhaps overallocating here */
997 		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
998 				sizeof(struct rpcrdma_mw);
999 		break;
1000 	case RPCRDMA_MEMWINDOWS_ASYNC:
1001 	case RPCRDMA_MEMWINDOWS:
1002 		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
1003 				sizeof(struct rpcrdma_mw);
1004 		break;
1005 	default:
1006 		break;
1007 	}
1008 
1009 	/* allocate 1, 4 and 5 in one shot */
1010 	p = kzalloc(len, GFP_KERNEL);
1011 	if (p == NULL) {
1012 		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1013 			__func__, len);
1014 		rc = -ENOMEM;
1015 		goto out;
1016 	}
1017 	buf->rb_pool = p;	/* for freeing it later */
1018 
1019 	buf->rb_send_bufs = (struct rpcrdma_req **) p;
1020 	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1021 	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1022 	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1023 
1024 	/*
1025 	 * Register the zeroed pad buffer, if any.
1026 	 */
1027 	if (cdata->padding) {
1028 		rc = rpcrdma_register_internal(ia, p, cdata->padding,
1029 					    &ep->rep_pad_mr, &ep->rep_pad);
1030 		if (rc)
1031 			goto out;
1032 	}
1033 	p += cdata->padding;
1034 
1035 	/*
1036 	 * Allocate the frmr's, fmr's or mw's used for chunk registration.
1037 	 * We "cycle" the mw's in order to minimize rkey reuse,
1038 	 * and also reduce unbind-to-bind collision.
1039 	 */
1040 	INIT_LIST_HEAD(&buf->rb_mws);
1041 	r = (struct rpcrdma_mw *)p;
1042 	switch (ia->ri_memreg_strategy) {
1043 	case RPCRDMA_FRMR:
1044 		for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1045 			r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1046 							 RPCRDMA_MAX_SEGS);
1047 			if (IS_ERR(r->r.frmr.fr_mr)) {
1048 				rc = PTR_ERR(r->r.frmr.fr_mr);
1049 				dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1050 					" failed %i\n", __func__, rc);
1051 				goto out;
1052 			}
1053 			r->r.frmr.fr_pgl =
1054 				ib_alloc_fast_reg_page_list(ia->ri_id->device,
1055 							    RPCRDMA_MAX_SEGS);
1056 			if (IS_ERR(r->r.frmr.fr_pgl)) {
1057 				rc = PTR_ERR(r->r.frmr.fr_pgl);
1058 				dprintk("RPC:       %s: "
1059 					"ib_alloc_fast_reg_page_list "
1060 					"failed %i\n", __func__, rc);
1061 				goto out;
1062 			}
1063 			list_add(&r->mw_list, &buf->rb_mws);
1064 			++r;
1065 		}
1066 		break;
1067 	case RPCRDMA_MTHCAFMR:
1068 		/* TBD we are perhaps overallocating here */
1069 		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1070 			static struct ib_fmr_attr fa =
1071 				{ RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1072 			r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1073 				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1074 				&fa);
1075 			if (IS_ERR(r->r.fmr)) {
1076 				rc = PTR_ERR(r->r.fmr);
1077 				dprintk("RPC:       %s: ib_alloc_fmr"
1078 					" failed %i\n", __func__, rc);
1079 				goto out;
1080 			}
1081 			list_add(&r->mw_list, &buf->rb_mws);
1082 			++r;
1083 		}
1084 		break;
1085 	case RPCRDMA_MEMWINDOWS_ASYNC:
1086 	case RPCRDMA_MEMWINDOWS:
1087 		/* Allocate one extra request's worth, for full cycling */
1088 		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1089 			r->r.mw = ib_alloc_mw(ia->ri_pd);
1090 			if (IS_ERR(r->r.mw)) {
1091 				rc = PTR_ERR(r->r.mw);
1092 				dprintk("RPC:       %s: ib_alloc_mw"
1093 					" failed %i\n", __func__, rc);
1094 				goto out;
1095 			}
1096 			list_add(&r->mw_list, &buf->rb_mws);
1097 			++r;
1098 		}
1099 		break;
1100 	default:
1101 		break;
1102 	}
1103 
1104 	/*
1105 	 * Allocate/init the request/reply buffers. Doing this
1106 	 * using kmalloc for now -- one for each buf.
1107 	 */
1108 	for (i = 0; i < buf->rb_max_requests; i++) {
1109 		struct rpcrdma_req *req;
1110 		struct rpcrdma_rep *rep;
1111 
1112 		len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
1113 		/* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
1114 		/* Typical ~2400b, so rounding up saves work later */
1115 		if (len < 4096)
1116 			len = 4096;
1117 		req = kmalloc(len, GFP_KERNEL);
1118 		if (req == NULL) {
1119 			dprintk("RPC:       %s: request buffer %d alloc"
1120 				" failed\n", __func__, i);
1121 			rc = -ENOMEM;
1122 			goto out;
1123 		}
1124 		memset(req, 0, sizeof(struct rpcrdma_req));
1125 		buf->rb_send_bufs[i] = req;
1126 		buf->rb_send_bufs[i]->rl_buffer = buf;
1127 
1128 		rc = rpcrdma_register_internal(ia, req->rl_base,
1129 				len - offsetof(struct rpcrdma_req, rl_base),
1130 				&buf->rb_send_bufs[i]->rl_handle,
1131 				&buf->rb_send_bufs[i]->rl_iov);
1132 		if (rc)
1133 			goto out;
1134 
1135 		buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1136 
1137 		len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1138 		rep = kmalloc(len, GFP_KERNEL);
1139 		if (rep == NULL) {
1140 			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1141 				__func__, i);
1142 			rc = -ENOMEM;
1143 			goto out;
1144 		}
1145 		memset(rep, 0, sizeof(struct rpcrdma_rep));
1146 		buf->rb_recv_bufs[i] = rep;
1147 		buf->rb_recv_bufs[i]->rr_buffer = buf;
1148 		init_waitqueue_head(&rep->rr_unbind);
1149 
1150 		rc = rpcrdma_register_internal(ia, rep->rr_base,
1151 				len - offsetof(struct rpcrdma_rep, rr_base),
1152 				&buf->rb_recv_bufs[i]->rr_handle,
1153 				&buf->rb_recv_bufs[i]->rr_iov);
1154 		if (rc)
1155 			goto out;
1156 
1157 	}
1158 	dprintk("RPC:       %s: max_requests %d\n",
1159 		__func__, buf->rb_max_requests);
1160 	/* done */
1161 	return 0;
1162 out:
1163 	rpcrdma_buffer_destroy(buf);
1164 	return rc;
1165 }
1166 
1167 /*
1168  * Unregister and destroy buffer memory. Need to deal with
1169  * partial initialization, so it's callable from failed create.
1170  * Must be called before destroying endpoint, as registrations
1171  * reference it.
1172  */
1173 void
1174 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1175 {
1176 	int rc, i;
1177 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1178 	struct rpcrdma_mw *r;
1179 
1180 	/* clean up in reverse order from create
1181 	 *   1.  recv mr memory (mr free, then kfree)
1182 	 *   1a. bind mw memory
1183 	 *   2.  send mr memory (mr free, then kfree)
1184 	 *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1185 	 *   4.  arrays
1186 	 */
1187 	dprintk("RPC:       %s: entering\n", __func__);
1188 
1189 	for (i = 0; i < buf->rb_max_requests; i++) {
1190 		if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1191 			rpcrdma_deregister_internal(ia,
1192 					buf->rb_recv_bufs[i]->rr_handle,
1193 					&buf->rb_recv_bufs[i]->rr_iov);
1194 			kfree(buf->rb_recv_bufs[i]);
1195 		}
1196 		if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1197 			while (!list_empty(&buf->rb_mws)) {
1198 				r = list_entry(buf->rb_mws.next,
1199 					struct rpcrdma_mw, mw_list);
1200 				list_del(&r->mw_list);
1201 				switch (ia->ri_memreg_strategy) {
1202 				case RPCRDMA_FRMR:
1203 					rc = ib_dereg_mr(r->r.frmr.fr_mr);
1204 					if (rc)
1205 						dprintk("RPC:       %s:"
1206 							" ib_dereg_mr"
1207 							" failed %i\n",
1208 							__func__, rc);
1209 					ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1210 					break;
1211 				case RPCRDMA_MTHCAFMR:
1212 					rc = ib_dealloc_fmr(r->r.fmr);
1213 					if (rc)
1214 						dprintk("RPC:       %s:"
1215 							" ib_dealloc_fmr"
1216 							" failed %i\n",
1217 							__func__, rc);
1218 					break;
1219 				case RPCRDMA_MEMWINDOWS_ASYNC:
1220 				case RPCRDMA_MEMWINDOWS:
1221 					rc = ib_dealloc_mw(r->r.mw);
1222 					if (rc)
1223 						dprintk("RPC:       %s:"
1224 							" ib_dealloc_mw"
1225 							" failed %i\n",
1226 							__func__, rc);
1227 					break;
1228 				default:
1229 					break;
1230 				}
1231 			}
1232 			rpcrdma_deregister_internal(ia,
1233 					buf->rb_send_bufs[i]->rl_handle,
1234 					&buf->rb_send_bufs[i]->rl_iov);
1235 			kfree(buf->rb_send_bufs[i]);
1236 		}
1237 	}
1238 
1239 	kfree(buf->rb_pool);
1240 }
1241 
1242 /*
1243  * Get a set of request/reply buffers.
1244  *
1245  * Reply buffer (if needed) is attached to send buffer upon return.
1246  * Rule:
1247  *    rb_send_index and rb_recv_index MUST always be pointing to the
1248  *    *next* available buffer (non-NULL). They are incremented after
1249  *    removing buffers, and decremented *before* returning them.
1250  */
1251 struct rpcrdma_req *
1252 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1253 {
1254 	struct rpcrdma_req *req;
1255 	unsigned long flags;
1256 	int i;
1257 	struct rpcrdma_mw *r;
1258 
1259 	spin_lock_irqsave(&buffers->rb_lock, flags);
1260 	if (buffers->rb_send_index == buffers->rb_max_requests) {
1261 		spin_unlock_irqrestore(&buffers->rb_lock, flags);
1262 		dprintk("RPC:       %s: out of request buffers\n", __func__);
1263 		return NULL;
1264 	}
1265 
1266 	req = buffers->rb_send_bufs[buffers->rb_send_index];
1267 	if (buffers->rb_send_index < buffers->rb_recv_index) {
1268 		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1269 			__func__,
1270 			buffers->rb_recv_index - buffers->rb_send_index);
1271 		req->rl_reply = NULL;
1272 	} else {
1273 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1274 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1275 	}
1276 	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1277 	if (!list_empty(&buffers->rb_mws)) {
1278 		i = RPCRDMA_MAX_SEGS - 1;
1279 		do {
1280 			r = list_entry(buffers->rb_mws.next,
1281 					struct rpcrdma_mw, mw_list);
1282 			list_del(&r->mw_list);
1283 			req->rl_segments[i].mr_chunk.rl_mw = r;
1284 		} while (--i >= 0);
1285 	}
1286 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1287 	return req;
1288 }
1289 
1290 /*
1291  * Put request/reply buffers back into pool.
1292  * Pre-decrement counter/array index.
1293  */
1294 void
1295 rpcrdma_buffer_put(struct rpcrdma_req *req)
1296 {
1297 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1298 	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1299 	int i;
1300 	unsigned long flags;
1301 
1302 	BUG_ON(req->rl_nchunks != 0);
1303 	spin_lock_irqsave(&buffers->rb_lock, flags);
1304 	buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1305 	req->rl_niovs = 0;
1306 	if (req->rl_reply) {
1307 		buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1308 		init_waitqueue_head(&req->rl_reply->rr_unbind);
1309 		req->rl_reply->rr_func = NULL;
1310 		req->rl_reply = NULL;
1311 	}
1312 	switch (ia->ri_memreg_strategy) {
1313 	case RPCRDMA_FRMR:
1314 	case RPCRDMA_MTHCAFMR:
1315 	case RPCRDMA_MEMWINDOWS_ASYNC:
1316 	case RPCRDMA_MEMWINDOWS:
1317 		/*
1318 		 * Cycle mw's back in reverse order, and "spin" them.
1319 		 * This delays and scrambles reuse as much as possible.
1320 		 */
1321 		i = 1;
1322 		do {
1323 			struct rpcrdma_mw **mw;
1324 			mw = &req->rl_segments[i].mr_chunk.rl_mw;
1325 			list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1326 			*mw = NULL;
1327 		} while (++i < RPCRDMA_MAX_SEGS);
1328 		list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1329 					&buffers->rb_mws);
1330 		req->rl_segments[0].mr_chunk.rl_mw = NULL;
1331 		break;
1332 	default:
1333 		break;
1334 	}
1335 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1336 }
1337 
1338 /*
1339  * Recover reply buffers from pool.
1340  * This happens when recovering from error conditions.
1341  * Post-increment counter/array index.
1342  */
1343 void
1344 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1345 {
1346 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1347 	unsigned long flags;
1348 
1349 	if (req->rl_iov.length == 0)	/* special case xprt_rdma_allocate() */
1350 		buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1351 	spin_lock_irqsave(&buffers->rb_lock, flags);
1352 	if (buffers->rb_recv_index < buffers->rb_max_requests) {
1353 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1354 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1355 	}
1356 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1357 }
1358 
1359 /*
1360  * Put reply buffers back into pool when not attached to
1361  * request. This happens in error conditions, and when
1362  * aborting unbinds. Pre-decrement counter/array index.
1363  */
1364 void
1365 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1366 {
1367 	struct rpcrdma_buffer *buffers = rep->rr_buffer;
1368 	unsigned long flags;
1369 
1370 	rep->rr_func = NULL;
1371 	spin_lock_irqsave(&buffers->rb_lock, flags);
1372 	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1373 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1374 }
1375 
1376 /*
1377  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1378  */
1379 
1380 int
1381 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1382 				struct ib_mr **mrp, struct ib_sge *iov)
1383 {
1384 	struct ib_phys_buf ipb;
1385 	struct ib_mr *mr;
1386 	int rc;
1387 
1388 	/*
1389 	 * All memory passed here was kmalloc'ed, therefore physically contiguous.
1390 	 */
1391 	iov->addr = ib_dma_map_single(ia->ri_id->device,
1392 			va, len, DMA_BIDIRECTIONAL);
1393 	iov->length = len;
1394 
1395 	if (ia->ri_have_dma_lkey) {
1396 		*mrp = NULL;
1397 		iov->lkey = ia->ri_dma_lkey;
1398 		return 0;
1399 	} else if (ia->ri_bind_mem != NULL) {
1400 		*mrp = NULL;
1401 		iov->lkey = ia->ri_bind_mem->lkey;
1402 		return 0;
1403 	}
1404 
1405 	ipb.addr = iov->addr;
1406 	ipb.size = iov->length;
1407 	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1408 			IB_ACCESS_LOCAL_WRITE, &iov->addr);
1409 
1410 	dprintk("RPC:       %s: phys convert: 0x%llx "
1411 			"registered 0x%llx length %d\n",
1412 			__func__, (unsigned long long)ipb.addr,
1413 			(unsigned long long)iov->addr, len);
1414 
1415 	if (IS_ERR(mr)) {
1416 		*mrp = NULL;
1417 		rc = PTR_ERR(mr);
1418 		dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1419 	} else {
1420 		*mrp = mr;
1421 		iov->lkey = mr->lkey;
1422 		rc = 0;
1423 	}
1424 
1425 	return rc;
1426 }
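
/*
 * Three lkey sources are tried above, cheapest first: the device's
 * global DMA lkey (no registration at all), the pre-allocated
 * ri_bind_mem DMA MR set up in rpcrdma_ia_open(), and finally a
 * per-buffer ib_reg_phys_mr() as a last resort.
 */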
1427 
1428 int
1429 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1430 				struct ib_mr *mr, struct ib_sge *iov)
1431 {
1432 	int rc;
1433 
1434 	ib_dma_unmap_single(ia->ri_id->device,
1435 			iov->addr, iov->length, DMA_BIDIRECTIONAL);
1436 
1437 	if (mr == NULL)
1438 		return 0;
1439 
1440 	rc = ib_dereg_mr(mr);
1441 	if (rc)
1442 		dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1443 	return rc;
1444 }
1445 
1446 /*
1447  * Wrappers for chunk registration, shared by read/write chunk code.
1448  */
1449 
1450 static void
1451 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1452 {
1453 	seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1454 	seg->mr_dmalen = seg->mr_len;
1455 	if (seg->mr_page)
1456 		seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1457 				seg->mr_page, offset_in_page(seg->mr_offset),
1458 				seg->mr_dmalen, seg->mr_dir);
1459 	else
1460 		seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1461 				seg->mr_offset,
1462 				seg->mr_dmalen, seg->mr_dir);
1463 	if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
1464 		dprintk("RPC:       %s: mr_dma %llx mr_offset %p mr_dma_len %zu\n",
1465 			__func__,
1466 			(unsigned long long)seg->mr_dma,
1467 			seg->mr_offset, seg->mr_dmalen);
1468 	}
1469 }
1470 
1471 static void
1472 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1473 {
1474 	if (seg->mr_page)
1475 		ib_dma_unmap_page(ia->ri_id->device,
1476 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1477 	else
1478 		ib_dma_unmap_single(ia->ri_id->device,
1479 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1480 }
1481 
1482 static int
1483 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1484 			int *nsegs, int writing, struct rpcrdma_ia *ia,
1485 			struct rpcrdma_xprt *r_xprt)
1486 {
1487 	struct rpcrdma_mr_seg *seg1 = seg;
1488 	struct ib_send_wr invalidate_wr, frmr_wr, *bad_wr, *post_wr;
1489 
1490 	u8 key;
1491 	int len, pageoff;
1492 	int i, rc;
1493 
1494 	pageoff = offset_in_page(seg1->mr_offset);
1495 	seg1->mr_offset -= pageoff;	/* start of page */
1496 	seg1->mr_len += pageoff;
1497 	len = -pageoff;
1498 	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1499 		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1500 	for (i = 0; i < *nsegs;) {
1501 		rpcrdma_map_one(ia, seg, writing);
1502 		seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->page_list[i] = seg->mr_dma;
1503 		len += seg->mr_len;
1504 		BUG_ON(seg->mr_len > PAGE_SIZE);
1505 		++seg;
1506 		++i;
1507 		/* Check for holes */
1508 		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1509 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1510 			break;
1511 	}
1512 	dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1513 		__func__, seg1->mr_chunk.rl_mw, i);
1514 
1515 	if (unlikely(seg1->mr_chunk.rl_mw->r.frmr.state == FRMR_IS_VALID)) {
1516 		dprintk("RPC:       %s: frmr %x left valid, posting invalidate.\n",
1517 			__func__,
1518 			seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey);
1519 		/* Invalidate before using. */
1520 		memset(&invalidate_wr, 0, sizeof invalidate_wr);
1521 		invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1522 		invalidate_wr.next = &frmr_wr;
1523 		invalidate_wr.opcode = IB_WR_LOCAL_INV;
1524 		invalidate_wr.send_flags = IB_SEND_SIGNALED;
1525 		invalidate_wr.ex.invalidate_rkey =
1526 			seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1527 		DECR_CQCOUNT(&r_xprt->rx_ep);
1528 		post_wr = &invalidate_wr;
1529 	} else
1530 		post_wr = &frmr_wr;
1531 
1532 	/* Bump the key */
1533 	key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1534 	ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
1535 
1536 	/* Prepare FRMR WR */
1537 	memset(&frmr_wr, 0, sizeof frmr_wr);
1538 	frmr_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1539 	frmr_wr.opcode = IB_WR_FAST_REG_MR;
1540 	frmr_wr.send_flags = IB_SEND_SIGNALED;
1541 	frmr_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1542 	frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1543 	frmr_wr.wr.fast_reg.page_list_len = i;
1544 	frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1545 	frmr_wr.wr.fast_reg.length = i << PAGE_SHIFT;
1546 	BUG_ON(frmr_wr.wr.fast_reg.length < len);
1547 	frmr_wr.wr.fast_reg.access_flags = (writing ?
1548 				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1549 				IB_ACCESS_REMOTE_READ);
1550 	frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1551 	DECR_CQCOUNT(&r_xprt->rx_ep);
1552 
1553 	rc = ib_post_send(ia->ri_id->qp, post_wr, &bad_wr);
1554 
1555 	if (rc) {
1556 		dprintk("RPC:       %s: failed ib_post_send for register,"
1557 			" status %i\n", __func__, rc);
1558 		while (i--)
1559 			rpcrdma_unmap_one(ia, --seg);
1560 	} else {
1561 		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1562 		seg1->mr_base = seg1->mr_dma + pageoff;
1563 		seg1->mr_nsegs = i;
1564 		seg1->mr_len = len;
1565 	}
1566 	*nsegs = i;
1567 	return rc;
1568 }
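
/*
 * Summary of the FRMR path above: the DMA-mapped segments are loaded
 * into the fast_reg page list, the MR's rkey is bumped so a stale key
 * cannot be reused, and a FAST_REG_MR work request is posted
 * (preceded by a LOCAL_INV if the previous registration was never
 * invalidated). Completion is reported asynchronously through
 * rpcrdma_event_process(), which updates the FRMR state.
 */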
1569 
1570 static int
1571 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1572 			struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1573 {
1574 	struct rpcrdma_mr_seg *seg1 = seg;
1575 	struct ib_send_wr invalidate_wr, *bad_wr;
1576 	int rc;
1577 
1578 	while (seg1->mr_nsegs--)
1579 		rpcrdma_unmap_one(ia, seg++);
1580 
1581 	memset(&invalidate_wr, 0, sizeof invalidate_wr);
1582 	invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1583 	invalidate_wr.opcode = IB_WR_LOCAL_INV;
1584 	invalidate_wr.send_flags = IB_SEND_SIGNALED;
1585 	invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1586 	DECR_CQCOUNT(&r_xprt->rx_ep);
1587 
1588 	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1589 	if (rc)
1590 		dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1591 			" status %i\n", __func__, rc);
1592 	return rc;
1593 }
1594 
1595 static int
1596 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1597 			int *nsegs, int writing, struct rpcrdma_ia *ia)
1598 {
1599 	struct rpcrdma_mr_seg *seg1 = seg;
1600 	u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1601 	int len, pageoff, i, rc;
1602 
1603 	pageoff = offset_in_page(seg1->mr_offset);
1604 	seg1->mr_offset -= pageoff;	/* start of page */
1605 	seg1->mr_len += pageoff;
1606 	len = -pageoff;
1607 	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1608 		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1609 	for (i = 0; i < *nsegs;) {
1610 		rpcrdma_map_one(ia, seg, writing);
1611 		physaddrs[i] = seg->mr_dma;
1612 		len += seg->mr_len;
1613 		++seg;
1614 		++i;
1615 		/* Check for holes */
1616 		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1617 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1618 			break;
1619 	}
1620 	rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1621 				physaddrs, i, seg1->mr_dma);
1622 	if (rc) {
1623 		dprintk("RPC:       %s: failed ib_map_phys_fmr "
1624 			"%u@0x%llx+%i (%d)... status %i\n", __func__,
1625 			len, (unsigned long long)seg1->mr_dma,
1626 			pageoff, i, rc);
1627 		while (i--)
1628 			rpcrdma_unmap_one(ia, --seg);
1629 	} else {
1630 		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1631 		seg1->mr_base = seg1->mr_dma + pageoff;
1632 		seg1->mr_nsegs = i;
1633 		seg1->mr_len = len;
1634 	}
1635 	*nsegs = i;
1636 	return rc;
1637 }
1638 
1639 static int
1640 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1641 			struct rpcrdma_ia *ia)
1642 {
1643 	struct rpcrdma_mr_seg *seg1 = seg;
1644 	LIST_HEAD(l);
1645 	int rc;
1646 
1647 	list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1648 	rc = ib_unmap_fmr(&l);
1649 	while (seg1->mr_nsegs--)
1650 		rpcrdma_unmap_one(ia, seg++);
1651 	if (rc)
1652 		dprintk("RPC:       %s: failed ib_unmap_fmr,"
1653 			" status %i\n", __func__, rc);
1654 	return rc;
1655 }
1656 
1657 static int
1658 rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg,
1659 			int *nsegs, int writing, struct rpcrdma_ia *ia,
1660 			struct rpcrdma_xprt *r_xprt)
1661 {
1662 	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1663 				  IB_ACCESS_REMOTE_READ);
1664 	struct ib_mw_bind param;
1665 	int rc;
1666 
1667 	*nsegs = 1;
1668 	rpcrdma_map_one(ia, seg, writing);
1669 	param.mr = ia->ri_bind_mem;
1670 	param.wr_id = 0ULL;	/* no send cookie */
1671 	param.addr = seg->mr_dma;
1672 	param.length = seg->mr_len;
1673 	param.send_flags = 0;
1674 	param.mw_access_flags = mem_priv;
1675 
1676 	DECR_CQCOUNT(&r_xprt->rx_ep);
1677 	rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1678 	if (rc) {
1679 		dprintk("RPC:       %s: failed ib_bind_mw "
1680 			"%u@0x%llx status %i\n",
1681 			__func__, seg->mr_len,
1682 			(unsigned long long)seg->mr_dma, rc);
1683 		rpcrdma_unmap_one(ia, seg);
1684 	} else {
1685 		seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1686 		seg->mr_base = param.addr;
1687 		seg->mr_nsegs = 1;
1688 	}
1689 	return rc;
1690 }
1691 
1692 static int
1693 rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg,
1694 			struct rpcrdma_ia *ia,
1695 			struct rpcrdma_xprt *r_xprt, void **r)
1696 {
1697 	struct ib_mw_bind param;
1698 	LIST_HEAD(l);
1699 	int rc;
1700 
1701 	BUG_ON(seg->mr_nsegs != 1);
1702 	param.mr = ia->ri_bind_mem;
1703 	param.addr = 0ULL;	/* unbind */
1704 	param.length = 0;
1705 	param.mw_access_flags = 0;
1706 	if (*r) {
1707 		param.wr_id = (u64) (unsigned long) *r;
1708 		param.send_flags = IB_SEND_SIGNALED;
1709 		INIT_CQCOUNT(&r_xprt->rx_ep);
1710 	} else {
1711 		param.wr_id = 0ULL;
1712 		param.send_flags = 0;
1713 		DECR_CQCOUNT(&r_xprt->rx_ep);
1714 	}
1715 	rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1716 	rpcrdma_unmap_one(ia, seg);
1717 	if (rc)
1718 		dprintk("RPC:       %s: failed ib_(un)bind_mw,"
1719 			" status %i\n", __func__, rc);
1720 	else
1721 		*r = NULL;	/* will upcall on completion */
1722 	return rc;
1723 }
1724 
1725 static int
1726 rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg,
1727 			int *nsegs, int writing, struct rpcrdma_ia *ia)
1728 {
1729 	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1730 				  IB_ACCESS_REMOTE_READ);
1731 	struct rpcrdma_mr_seg *seg1 = seg;
1732 	struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1733 	int len, i, rc = 0;
1734 
1735 	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1736 		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1737 	for (len = 0, i = 0; i < *nsegs;) {
1738 		rpcrdma_map_one(ia, seg, writing);
1739 		ipb[i].addr = seg->mr_dma;
1740 		ipb[i].size = seg->mr_len;
1741 		len += seg->mr_len;
1742 		++seg;
1743 		++i;
1744 		/* Check for holes */
1745 		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1746 		    offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1747 			break;
1748 	}
1749 	seg1->mr_base = seg1->mr_dma;
1750 	seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1751 				ipb, i, mem_priv, &seg1->mr_base);
1752 	if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1753 		rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1754 		dprintk("RPC:       %s: failed ib_reg_phys_mr "
1755 			"%u@0x%llx (%d)... status %i\n",
1756 			__func__, len,
1757 			(unsigned long long)seg1->mr_dma, i, rc);
1758 		while (i--)
1759 			rpcrdma_unmap_one(ia, --seg);
1760 	} else {
1761 		seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1762 		seg1->mr_nsegs = i;
1763 		seg1->mr_len = len;
1764 	}
1765 	*nsegs = i;
1766 	return rc;
1767 }
1768 
1769 static int
1770 rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg,
1771 			struct rpcrdma_ia *ia)
1772 {
1773 	struct rpcrdma_mr_seg *seg1 = seg;
1774 	int rc;
1775 
1776 	rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1777 	seg1->mr_chunk.rl_mr = NULL;
1778 	while (seg1->mr_nsegs--)
1779 		rpcrdma_unmap_one(ia, seg++);
1780 	if (rc)
1781 		dprintk("RPC:       %s: failed ib_dereg_mr,"
1782 			" status %i\n", __func__, rc);
1783 	return rc;
1784 }
1785 
1786 int
1787 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1788 			int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1789 {
1790 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1791 	int rc = 0;
1792 
1793 	switch (ia->ri_memreg_strategy) {
1794 
1795 #if RPCRDMA_PERSISTENT_REGISTRATION
1796 	case RPCRDMA_ALLPHYSICAL:
1797 		rpcrdma_map_one(ia, seg, writing);
1798 		seg->mr_rkey = ia->ri_bind_mem->rkey;
1799 		seg->mr_base = seg->mr_dma;
1800 		seg->mr_nsegs = 1;
1801 		nsegs = 1;
1802 		break;
1803 #endif
1804 
1805 	/* Registration using frmr registration */
1806 	case RPCRDMA_FRMR:
1807 		rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1808 		break;
1809 
1810 	/* Registration using fmr memory registration */
1811 	case RPCRDMA_MTHCAFMR:
1812 		rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1813 		break;
1814 
1815 	/* Registration using memory windows */
1816 	case RPCRDMA_MEMWINDOWS_ASYNC:
1817 	case RPCRDMA_MEMWINDOWS:
1818 		rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt);
1819 		break;
1820 
1821 	/* Default registration each time */
1822 	default:
1823 		rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia);
1824 		break;
1825 	}
1826 	if (rc)
1827 		return -1;
1828 
1829 	return nsegs;
1830 }
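
/*
 * rpcrdma_register_external() returns the number of segments actually
 * covered by the new registration, or -1 on failure; the chunk-building
 * caller is expected to advance by that count and call again for any
 * remaining segments.
 */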
1831 
1832 int
1833 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1834 		struct rpcrdma_xprt *r_xprt, void *r)
1835 {
1836 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1837 	int nsegs = seg->mr_nsegs, rc;
1838 
1839 	switch (ia->ri_memreg_strategy) {
1840 
1841 #if RPCRDMA_PERSISTENT_REGISTRATION
1842 	case RPCRDMA_ALLPHYSICAL:
1843 		BUG_ON(nsegs != 1);
1844 		rpcrdma_unmap_one(ia, seg);
1845 		rc = 0;
1846 		break;
1847 #endif
1848 
1849 	case RPCRDMA_FRMR:
1850 		rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1851 		break;
1852 
1853 	case RPCRDMA_MTHCAFMR:
1854 		rc = rpcrdma_deregister_fmr_external(seg, ia);
1855 		break;
1856 
1857 	case RPCRDMA_MEMWINDOWS_ASYNC:
1858 	case RPCRDMA_MEMWINDOWS:
1859 		rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r);
1860 		break;
1861 
1862 	default:
1863 		rc = rpcrdma_deregister_default_external(seg, ia);
1864 		break;
1865 	}
1866 	if (r) {
1867 		struct rpcrdma_rep *rep = r;
1868 		void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1869 		rep->rr_func = NULL;
1870 		func(rep);	/* dereg done, callback now */
1871 	}
1872 	return nsegs;
1873 }
1874 
1875 /*
1876  * Prepost any receive buffer, then post send.
1877  *
1878  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1879  */
1880 int
1881 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1882 		struct rpcrdma_ep *ep,
1883 		struct rpcrdma_req *req)
1884 {
1885 	struct ib_send_wr send_wr, *send_wr_fail;
1886 	struct rpcrdma_rep *rep = req->rl_reply;
1887 	int rc;
1888 
1889 	if (rep) {
1890 		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1891 		if (rc)
1892 			goto out;
1893 		req->rl_reply = NULL;
1894 	}
1895 
1896 	send_wr.next = NULL;
1897 	send_wr.wr_id = 0ULL;	/* no send cookie */
1898 	send_wr.sg_list = req->rl_send_iov;
1899 	send_wr.num_sge = req->rl_niovs;
1900 	send_wr.opcode = IB_WR_SEND;
1901 	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
1902 		ib_dma_sync_single_for_device(ia->ri_id->device,
1903 			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1904 			DMA_TO_DEVICE);
1905 	ib_dma_sync_single_for_device(ia->ri_id->device,
1906 		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1907 		DMA_TO_DEVICE);
1908 	ib_dma_sync_single_for_device(ia->ri_id->device,
1909 		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1910 		DMA_TO_DEVICE);
1911 
1912 	if (DECR_CQCOUNT(ep) > 0)
1913 		send_wr.send_flags = 0;
1914 	else { /* Provider must take a send completion every now and then */
1915 		INIT_CQCOUNT(ep);
1916 		send_wr.send_flags = IB_SEND_SIGNALED;
1917 	}
1918 
1919 	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1920 	if (rc)
1921 		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1922 			rc);
1923 out:
1924 	return rc;
1925 }
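
/*
 * Send completions are normally unsignaled; DECR_CQCOUNT() counts down
 * from rep_cqinit (set in rpcrdma_ep_create) and only when it reaches
 * zero is a send posted with IB_SEND_SIGNALED, giving the provider a
 * periodic chance to reap send queue entries.
 */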
1926 
1927 /*
1928  * (Re)post a receive buffer.
1929  */
1930 int
1931 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1932 		     struct rpcrdma_ep *ep,
1933 		     struct rpcrdma_rep *rep)
1934 {
1935 	struct ib_recv_wr recv_wr, *recv_wr_fail;
1936 	int rc;
1937 
1938 	recv_wr.next = NULL;
1939 	recv_wr.wr_id = (u64) (unsigned long) rep;
1940 	recv_wr.sg_list = &rep->rr_iov;
1941 	recv_wr.num_sge = 1;
1942 
1943 	ib_dma_sync_single_for_cpu(ia->ri_id->device,
1944 		rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1945 
1946 	DECR_CQCOUNT(ep);
1947 	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1948 
1949 	if (rc)
1950 		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1951 			rc);
1952 	return rc;
1953 }
1954