xref: /openbmc/linux/net/sunrpc/xprtrdma/verbs.c (revision b8bb76713ec50df2f11efee386e16f93d51e1076)
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39 
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49 
50 #include <linux/pci.h>	/* for Tavor hack below */
51 
52 #include "xprt_rdma.h"
53 
54 /*
55  * Globals/Macros
56  */
57 
58 #ifdef RPC_DEBUG
59 # define RPCDBG_FACILITY	RPCDBG_TRANS
60 #endif
61 
62 /*
63  * internal functions
64  */
65 
66 /*
67  * handle replies in tasklet context, using a single, global list
68  * rdma tasklet function -- just turn around and call the func
69  * for all replies on the list
70  */
71 
72 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
73 static LIST_HEAD(rpcrdma_tasklets_g);
74 
75 static void
76 rpcrdma_run_tasklet(unsigned long data)
77 {
78 	struct rpcrdma_rep *rep;
79 	void (*func)(struct rpcrdma_rep *);
80 	unsigned long flags;
81 
82 	data = data;
83 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
84 	while (!list_empty(&rpcrdma_tasklets_g)) {
85 		rep = list_entry(rpcrdma_tasklets_g.next,
86 				 struct rpcrdma_rep, rr_list);
87 		list_del(&rep->rr_list);
88 		func = rep->rr_func;
89 		rep->rr_func = NULL;
90 		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
91 
92 		if (func)
93 			func(rep);
94 		else
95 			rpcrdma_recv_buffer_put(rep);
96 
97 		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
98 	}
99 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
100 }
101 
102 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
103 
104 static inline void
105 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
106 {
107 	unsigned long flags;
108 
109 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
110 	list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
111 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
112 	tasklet_schedule(&rpcrdma_tasklet_g);
113 }
114 
115 static void
116 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
117 {
118 	struct rpcrdma_ep *ep = context;
119 
120 	dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
121 		__func__, event->event, event->device->name, context);
122 	if (ep->rep_connected == 1) {
123 		ep->rep_connected = -EIO;
124 		ep->rep_func(ep);
125 		wake_up_all(&ep->rep_connect_wait);
126 	}
127 }
128 
129 static void
130 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
131 {
132 	struct rpcrdma_ep *ep = context;
133 
134 	dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
135 		__func__, event->event, event->device->name, context);
136 	if (ep->rep_connected == 1) {
137 		ep->rep_connected = -EIO;
138 		ep->rep_func(ep);
139 		wake_up_all(&ep->rep_connect_wait);
140 	}
141 }
142 
143 static inline
144 void rpcrdma_event_process(struct ib_wc *wc)
145 {
146 	struct rpcrdma_rep *rep =
147 			(struct rpcrdma_rep *)(unsigned long) wc->wr_id;
148 
149 	dprintk("RPC:       %s: event rep %p status %X opcode %X length %u\n",
150 		__func__, rep, wc->status, wc->opcode, wc->byte_len);
151 
152 	if (!rep) /* send or bind completion that we don't care about */
153 		return;
154 
155 	if (IB_WC_SUCCESS != wc->status) {
156 		dprintk("RPC:       %s: %s WC status %X, connection lost\n",
157 			__func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
158 			 wc->status);
159 		rep->rr_len = ~0U;
160 		rpcrdma_schedule_tasklet(rep);
161 		return;
162 	}
163 
164 	switch (wc->opcode) {
165 	case IB_WC_RECV:
166 		rep->rr_len = wc->byte_len;
167 		ib_dma_sync_single_for_cpu(
168 			rdmab_to_ia(rep->rr_buffer)->ri_id->device,
169 			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
170 		/* Keep (only) the most recent credits, after check validity */
171 		if (rep->rr_len >= 16) {
172 			struct rpcrdma_msg *p =
173 					(struct rpcrdma_msg *) rep->rr_base;
174 			unsigned int credits = ntohl(p->rm_credit);
175 			if (credits == 0) {
176 				dprintk("RPC:       %s: server"
177 					" dropped credits to 0!\n", __func__);
178 				/* don't deadlock */
179 				credits = 1;
180 			} else if (credits > rep->rr_buffer->rb_max_requests) {
181 				dprintk("RPC:       %s: server"
182 					" over-crediting: %d (%d)\n",
183 					__func__, credits,
184 					rep->rr_buffer->rb_max_requests);
185 				credits = rep->rr_buffer->rb_max_requests;
186 			}
187 			atomic_set(&rep->rr_buffer->rb_credits, credits);
188 		}
189 		/* fall through */
190 	case IB_WC_BIND_MW:
191 		rpcrdma_schedule_tasklet(rep);
192 		break;
193 	default:
194 		dprintk("RPC:       %s: unexpected WC event %X\n",
195 			__func__, wc->opcode);
196 		break;
197 	}
198 }
199 
200 static inline int
201 rpcrdma_cq_poll(struct ib_cq *cq)
202 {
203 	struct ib_wc wc;
204 	int rc;
205 
206 	for (;;) {
207 		rc = ib_poll_cq(cq, 1, &wc);
208 		if (rc < 0) {
209 			dprintk("RPC:       %s: ib_poll_cq failed %i\n",
210 				__func__, rc);
211 			return rc;
212 		}
213 		if (rc == 0)
214 			break;
215 
216 		rpcrdma_event_process(&wc);
217 	}
218 
219 	return 0;
220 }
221 
222 /*
223  * rpcrdma_cq_event_upcall
224  *
225  * This upcall handles recv, send, bind and unbind events.
226  * It is reentrant but processes single events in order to maintain
227  * ordering of receives to keep server credits.
228  *
229  * It is the responsibility of the scheduled tasklet to return
230  * recv buffers to the pool. NOTE: this affects synchronization of
231  * connection shutdown. That is, the structures required for
232  * the completion of the reply handler must remain intact until
233  * all memory has been reclaimed.
234  *
235  * Note that send events are suppressed and do not result in an upcall.
236  */
237 static void
238 rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
239 {
240 	int rc;
241 
242 	rc = rpcrdma_cq_poll(cq);
243 	if (rc)
244 		return;
245 
246 	rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
247 	if (rc) {
248 		dprintk("RPC:       %s: ib_req_notify_cq failed %i\n",
249 			__func__, rc);
250 		return;
251 	}
252 
253 	rpcrdma_cq_poll(cq);
254 }
255 
#ifdef RPC_DEBUG
/*
 * Debug names for connection manager events, indexed directly by
 * event->event. The table has 12 entries (indices 0..11); the lookup
 * in rpcrdma_conn_upcall() bounds-checks against 11 accordingly.
 */
static const char * const conn[] = {
	"address resolved",
	"address error",
	"route resolved",
	"route error",
	"connect request",
	"connect response",
	"connect error",
	"unreachable",
	"rejected",
	"established",
	"disconnected",
	"device removal"
};
#endif

/*
 * rpcrdma_conn_upcall
 *
 * Connection manager event handler; id->context is the rpcrdma_xprt
 * that owns the cm_id.
 *
 *  - Address/route resolution events store their result in
 *    ia->ri_async_rc and complete ia->ri_done for the waiter.
 *  - Connection state events translate the event into a connstate
 *    value (1 for established, a negative errno otherwise), reset the
 *    credit count to 1, publish the state in ep->rep_connected, run
 *    the endpoint's rep_func callback and wake rep_connect_wait.
 *  - Any other event is only logged.
 *
 * Always returns 0.
 */
static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
	struct rpcrdma_xprt *xprt = id->context;
	struct rpcrdma_ia *ia = &xprt->rx_ia;
	struct rpcrdma_ep *ep = &xprt->rx_ep;
#ifdef RPC_DEBUG
	struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
#endif
	struct ib_qp_attr attr;
	struct ib_qp_init_attr iattr;
	int connstate = 0;

	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		ia->ri_async_rc = 0;
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
		ia->ri_async_rc = -EHOSTUNREACH;
		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ROUTE_ERROR:
		ia->ri_async_rc = -ENETUNREACH;
		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
			__func__, ep);
		complete(&ia->ri_done);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		connstate = 1;
		if (ib_query_qp(ia->ri_id->qp, &attr,
				IB_QP_MAX_QP_RD_ATOMIC |
				IB_QP_MAX_DEST_RD_ATOMIC,
				&iattr)) {
			/*
			 * The query failed, so attr may not have been
			 * filled in. Zero the fields read below rather
			 * than reporting uninitialized stack contents.
			 */
			attr.max_dest_rd_atomic = 0;
			attr.max_rd_atomic = 0;
		}
		dprintk("RPC:       %s: %d responder resources"
			" (%d initiator)\n",
			__func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
		goto connected;
	case RDMA_CM_EVENT_CONNECT_ERROR:
		connstate = -ENOTCONN;
		goto connected;
	case RDMA_CM_EVENT_UNREACHABLE:
		connstate = -ENETDOWN;
		goto connected;
	case RDMA_CM_EVENT_REJECTED:
		connstate = -ECONNREFUSED;
		goto connected;
	case RDMA_CM_EVENT_DISCONNECTED:
		connstate = -ECONNABORTED;
		goto connected;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		connstate = -ENODEV;
connected:
		/* 11 is the last valid index of conn[] */
		dprintk("RPC:       %s: %s: %pI4:%u (ep 0x%p event 0x%x)\n",
			__func__,
			(event->event <= 11) ? conn[event->event] :
						"unknown connection error",
			&addr->sin_addr.s_addr,
			ntohs(addr->sin_port),
			ep, event->event);
		/* every connection state change restarts with one credit */
		atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
		dprintk("RPC:       %s: %sconnected\n",
					__func__, connstate > 0 ? "" : "dis");
		ep->rep_connected = connstate;
		ep->rep_func(ep);
		wake_up_all(&ep->rep_connect_wait);
		break;
	default:
		dprintk("RPC:       %s: unexpected CM event %d\n",
			__func__, event->event);
		break;
	}

#ifdef RPC_DEBUG
	if (connstate == 1) {
		/* attr is valid here: the ESTABLISHED case above set it */
		int ird = attr.max_dest_rd_atomic;
		int tird = ep->rep_remote_cma.responder_resources;
		printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
			"on %s, memreg %d slots %d ird %d%s\n",
			&addr->sin_addr.s_addr,
			ntohs(addr->sin_port),
			ia->ri_id->device->name,
			ia->ri_memreg_strategy,
			xprt->rx_buf.rb_max_requests,
			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
	} else if (connstate < 0) {
		printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
			&addr->sin_addr.s_addr,
			ntohs(addr->sin_port),
			connstate);
	}
#endif

	return 0;
}
370 
371 static struct rdma_cm_id *
372 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
373 			struct rpcrdma_ia *ia, struct sockaddr *addr)
374 {
375 	struct rdma_cm_id *id;
376 	int rc;
377 
378 	init_completion(&ia->ri_done);
379 
380 	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
381 	if (IS_ERR(id)) {
382 		rc = PTR_ERR(id);
383 		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
384 			__func__, rc);
385 		return id;
386 	}
387 
388 	ia->ri_async_rc = -ETIMEDOUT;
389 	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
390 	if (rc) {
391 		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
392 			__func__, rc);
393 		goto out;
394 	}
395 	wait_for_completion_interruptible_timeout(&ia->ri_done,
396 				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
397 	rc = ia->ri_async_rc;
398 	if (rc)
399 		goto out;
400 
401 	ia->ri_async_rc = -ETIMEDOUT;
402 	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
403 	if (rc) {
404 		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
405 			__func__, rc);
406 		goto out;
407 	}
408 	wait_for_completion_interruptible_timeout(&ia->ri_done,
409 				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
410 	rc = ia->ri_async_rc;
411 	if (rc)
412 		goto out;
413 
414 	return id;
415 
416 out:
417 	rdma_destroy_id(id);
418 	return ERR_PTR(rc);
419 }
420 
421 /*
422  * Drain any cq, prior to teardown.
423  */
424 static void
425 rpcrdma_clean_cq(struct ib_cq *cq)
426 {
427 	struct ib_wc wc;
428 	int count = 0;
429 
430 	while (1 == ib_poll_cq(cq, 1, &wc))
431 		++count;
432 
433 	if (count)
434 		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
435 			__func__, count, wc.opcode);
436 }
437 
438 /*
439  * Exported functions.
440  */
441 
442 /*
443  * Open and initialize an Interface Adapter.
444  *  o initializes fields of struct rpcrdma_ia, including
445  *    interface and provider attributes and protection zone.
446  */
447 int
448 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
449 {
450 	int rc, mem_priv;
451 	struct ib_device_attr devattr;
452 	struct rpcrdma_ia *ia = &xprt->rx_ia;
453 
454 	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
455 	if (IS_ERR(ia->ri_id)) {
456 		rc = PTR_ERR(ia->ri_id);
457 		goto out1;
458 	}
459 
460 	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
461 	if (IS_ERR(ia->ri_pd)) {
462 		rc = PTR_ERR(ia->ri_pd);
463 		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
464 			__func__, rc);
465 		goto out2;
466 	}
467 
468 	/*
469 	 * Query the device to determine if the requested memory
470 	 * registration strategy is supported. If it isn't, set the
471 	 * strategy to a globally supported model.
472 	 */
473 	rc = ib_query_device(ia->ri_id->device, &devattr);
474 	if (rc) {
475 		dprintk("RPC:       %s: ib_query_device failed %d\n",
476 			__func__, rc);
477 		goto out2;
478 	}
479 
480 	if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
481 		ia->ri_have_dma_lkey = 1;
482 		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
483 	}
484 
485 	switch (memreg) {
486 	case RPCRDMA_MEMWINDOWS:
487 	case RPCRDMA_MEMWINDOWS_ASYNC:
488 		if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) {
489 			dprintk("RPC:       %s: MEMWINDOWS registration "
490 				"specified but not supported by adapter, "
491 				"using slower RPCRDMA_REGISTER\n",
492 				__func__);
493 			memreg = RPCRDMA_REGISTER;
494 		}
495 		break;
496 	case RPCRDMA_MTHCAFMR:
497 		if (!ia->ri_id->device->alloc_fmr) {
498 #if RPCRDMA_PERSISTENT_REGISTRATION
499 			dprintk("RPC:       %s: MTHCAFMR registration "
500 				"specified but not supported by adapter, "
501 				"using riskier RPCRDMA_ALLPHYSICAL\n",
502 				__func__);
503 			memreg = RPCRDMA_ALLPHYSICAL;
504 #else
505 			dprintk("RPC:       %s: MTHCAFMR registration "
506 				"specified but not supported by adapter, "
507 				"using slower RPCRDMA_REGISTER\n",
508 				__func__);
509 			memreg = RPCRDMA_REGISTER;
510 #endif
511 		}
512 		break;
513 	case RPCRDMA_FRMR:
514 		/* Requires both frmr reg and local dma lkey */
515 		if ((devattr.device_cap_flags &
516 		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
517 		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
518 #if RPCRDMA_PERSISTENT_REGISTRATION
519 			dprintk("RPC:       %s: FRMR registration "
520 				"specified but not supported by adapter, "
521 				"using riskier RPCRDMA_ALLPHYSICAL\n",
522 				__func__);
523 			memreg = RPCRDMA_ALLPHYSICAL;
524 #else
525 			dprintk("RPC:       %s: FRMR registration "
526 				"specified but not supported by adapter, "
527 				"using slower RPCRDMA_REGISTER\n",
528 				__func__);
529 			memreg = RPCRDMA_REGISTER;
530 #endif
531 		}
532 		break;
533 	}
534 
535 	/*
536 	 * Optionally obtain an underlying physical identity mapping in
537 	 * order to do a memory window-based bind. This base registration
538 	 * is protected from remote access - that is enabled only by binding
539 	 * for the specific bytes targeted during each RPC operation, and
540 	 * revoked after the corresponding completion similar to a storage
541 	 * adapter.
542 	 */
543 	switch (memreg) {
544 	case RPCRDMA_BOUNCEBUFFERS:
545 	case RPCRDMA_REGISTER:
546 	case RPCRDMA_FRMR:
547 		break;
548 #if RPCRDMA_PERSISTENT_REGISTRATION
549 	case RPCRDMA_ALLPHYSICAL:
550 		mem_priv = IB_ACCESS_LOCAL_WRITE |
551 				IB_ACCESS_REMOTE_WRITE |
552 				IB_ACCESS_REMOTE_READ;
553 		goto register_setup;
554 #endif
555 	case RPCRDMA_MEMWINDOWS_ASYNC:
556 	case RPCRDMA_MEMWINDOWS:
557 		mem_priv = IB_ACCESS_LOCAL_WRITE |
558 				IB_ACCESS_MW_BIND;
559 		goto register_setup;
560 	case RPCRDMA_MTHCAFMR:
561 		if (ia->ri_have_dma_lkey)
562 			break;
563 		mem_priv = IB_ACCESS_LOCAL_WRITE;
564 	register_setup:
565 		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
566 		if (IS_ERR(ia->ri_bind_mem)) {
567 			printk(KERN_ALERT "%s: ib_get_dma_mr for "
568 				"phys register failed with %lX\n\t"
569 				"Will continue with degraded performance\n",
570 				__func__, PTR_ERR(ia->ri_bind_mem));
571 			memreg = RPCRDMA_REGISTER;
572 			ia->ri_bind_mem = NULL;
573 		}
574 		break;
575 	default:
576 		printk(KERN_ERR "%s: invalid memory registration mode %d\n",
577 				__func__, memreg);
578 		rc = -EINVAL;
579 		goto out2;
580 	}
581 	dprintk("RPC:       %s: memory registration strategy is %d\n",
582 		__func__, memreg);
583 
584 	/* Else will do memory reg/dereg for each chunk */
585 	ia->ri_memreg_strategy = memreg;
586 
587 	return 0;
588 out2:
589 	rdma_destroy_id(ia->ri_id);
590 	ia->ri_id = NULL;
591 out1:
592 	return rc;
593 }
594 
595 /*
596  * Clean up/close an IA.
597  *   o if event handles and PD have been initialized, free them.
598  *   o close the IA
599  */
600 void
601 rpcrdma_ia_close(struct rpcrdma_ia *ia)
602 {
603 	int rc;
604 
605 	dprintk("RPC:       %s: entering\n", __func__);
606 	if (ia->ri_bind_mem != NULL) {
607 		rc = ib_dereg_mr(ia->ri_bind_mem);
608 		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
609 			__func__, rc);
610 	}
611 	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
612 		if (ia->ri_id->qp)
613 			rdma_destroy_qp(ia->ri_id);
614 		rdma_destroy_id(ia->ri_id);
615 		ia->ri_id = NULL;
616 	}
617 	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
618 		rc = ib_dealloc_pd(ia->ri_pd);
619 		dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
620 			__func__, rc);
621 	}
622 }
623 
624 /*
625  * Create unconnected endpoint.
626  */
627 int
628 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
629 				struct rpcrdma_create_data_internal *cdata)
630 {
631 	struct ib_device_attr devattr;
632 	int rc, err;
633 
634 	rc = ib_query_device(ia->ri_id->device, &devattr);
635 	if (rc) {
636 		dprintk("RPC:       %s: ib_query_device failed %d\n",
637 			__func__, rc);
638 		return rc;
639 	}
640 
641 	/* check provider's send/recv wr limits */
642 	if (cdata->max_requests > devattr.max_qp_wr)
643 		cdata->max_requests = devattr.max_qp_wr;
644 
645 	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
646 	ep->rep_attr.qp_context = ep;
647 	/* send_cq and recv_cq initialized below */
648 	ep->rep_attr.srq = NULL;
649 	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
650 	switch (ia->ri_memreg_strategy) {
651 	case RPCRDMA_FRMR:
652 		/* Add room for frmr register and invalidate WRs */
653 		ep->rep_attr.cap.max_send_wr *= 3;
654 		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
655 			return -EINVAL;
656 		break;
657 	case RPCRDMA_MEMWINDOWS_ASYNC:
658 	case RPCRDMA_MEMWINDOWS:
659 		/* Add room for mw_binds+unbinds - overkill! */
660 		ep->rep_attr.cap.max_send_wr++;
661 		ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
662 		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
663 			return -EINVAL;
664 		break;
665 	default:
666 		break;
667 	}
668 	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
669 	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
670 	ep->rep_attr.cap.max_recv_sge = 1;
671 	ep->rep_attr.cap.max_inline_data = 0;
672 	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
673 	ep->rep_attr.qp_type = IB_QPT_RC;
674 	ep->rep_attr.port_num = ~0;
675 
676 	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
677 		"iovs: send %d recv %d\n",
678 		__func__,
679 		ep->rep_attr.cap.max_send_wr,
680 		ep->rep_attr.cap.max_recv_wr,
681 		ep->rep_attr.cap.max_send_sge,
682 		ep->rep_attr.cap.max_recv_sge);
683 
684 	/* set trigger for requesting send completion */
685 	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/;
686 	switch (ia->ri_memreg_strategy) {
687 	case RPCRDMA_MEMWINDOWS_ASYNC:
688 	case RPCRDMA_MEMWINDOWS:
689 		ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
690 		break;
691 	default:
692 		break;
693 	}
694 	if (ep->rep_cqinit <= 2)
695 		ep->rep_cqinit = 0;
696 	INIT_CQCOUNT(ep);
697 	ep->rep_ia = ia;
698 	init_waitqueue_head(&ep->rep_connect_wait);
699 
700 	/*
701 	 * Create a single cq for receive dto and mw_bind (only ever
702 	 * care about unbind, really). Send completions are suppressed.
703 	 * Use single threaded tasklet upcalls to maintain ordering.
704 	 */
705 	ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
706 				  rpcrdma_cq_async_error_upcall, NULL,
707 				  ep->rep_attr.cap.max_recv_wr +
708 				  ep->rep_attr.cap.max_send_wr + 1, 0);
709 	if (IS_ERR(ep->rep_cq)) {
710 		rc = PTR_ERR(ep->rep_cq);
711 		dprintk("RPC:       %s: ib_create_cq failed: %i\n",
712 			__func__, rc);
713 		goto out1;
714 	}
715 
716 	rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
717 	if (rc) {
718 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
719 			__func__, rc);
720 		goto out2;
721 	}
722 
723 	ep->rep_attr.send_cq = ep->rep_cq;
724 	ep->rep_attr.recv_cq = ep->rep_cq;
725 
726 	/* Initialize cma parameters */
727 
728 	/* RPC/RDMA does not use private data */
729 	ep->rep_remote_cma.private_data = NULL;
730 	ep->rep_remote_cma.private_data_len = 0;
731 
732 	/* Client offers RDMA Read but does not initiate */
733 	ep->rep_remote_cma.initiator_depth = 0;
734 	if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS)
735 		ep->rep_remote_cma.responder_resources = 0;
736 	else if (devattr.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
737 		ep->rep_remote_cma.responder_resources = 32;
738 	else
739 		ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
740 
741 	ep->rep_remote_cma.retry_count = 7;
742 	ep->rep_remote_cma.flow_control = 0;
743 	ep->rep_remote_cma.rnr_retry_count = 0;
744 
745 	return 0;
746 
747 out2:
748 	err = ib_destroy_cq(ep->rep_cq);
749 	if (err)
750 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
751 			__func__, err);
752 out1:
753 	return rc;
754 }
755 
756 /*
757  * rpcrdma_ep_destroy
758  *
759  * Disconnect and destroy endpoint. After this, the only
760  * valid operations on the ep are to free it (if dynamically
761  * allocated) or re-create it.
762  *
763  * The caller's error handling must be sure to not leak the endpoint
764  * if this function fails.
765  */
766 int
767 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
768 {
769 	int rc;
770 
771 	dprintk("RPC:       %s: entering, connected is %d\n",
772 		__func__, ep->rep_connected);
773 
774 	if (ia->ri_id->qp) {
775 		rc = rpcrdma_ep_disconnect(ep, ia);
776 		if (rc)
777 			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
778 				" returned %i\n", __func__, rc);
779 		rdma_destroy_qp(ia->ri_id);
780 		ia->ri_id->qp = NULL;
781 	}
782 
783 	/* padding - could be done in rpcrdma_buffer_destroy... */
784 	if (ep->rep_pad_mr) {
785 		rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
786 		ep->rep_pad_mr = NULL;
787 	}
788 
789 	rpcrdma_clean_cq(ep->rep_cq);
790 	rc = ib_destroy_cq(ep->rep_cq);
791 	if (rc)
792 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
793 			__func__, rc);
794 
795 	return rc;
796 }
797 
798 /*
799  * Connect unconnected endpoint.
800  */
801 int
802 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
803 {
804 	struct rdma_cm_id *id;
805 	int rc = 0;
806 	int retry_count = 0;
807 
808 	if (ep->rep_connected != 0) {
809 		struct rpcrdma_xprt *xprt;
810 retry:
811 		rc = rpcrdma_ep_disconnect(ep, ia);
812 		if (rc && rc != -ENOTCONN)
813 			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
814 				" status %i\n", __func__, rc);
815 		rpcrdma_clean_cq(ep->rep_cq);
816 
817 		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
818 		id = rpcrdma_create_id(xprt, ia,
819 				(struct sockaddr *)&xprt->rx_data.addr);
820 		if (IS_ERR(id)) {
821 			rc = PTR_ERR(id);
822 			goto out;
823 		}
824 		/* TEMP TEMP TEMP - fail if new device:
825 		 * Deregister/remarshal *all* requests!
826 		 * Close and recreate adapter, pd, etc!
827 		 * Re-determine all attributes still sane!
828 		 * More stuff I haven't thought of!
829 		 * Rrrgh!
830 		 */
831 		if (ia->ri_id->device != id->device) {
832 			printk("RPC:       %s: can't reconnect on "
833 				"different device!\n", __func__);
834 			rdma_destroy_id(id);
835 			rc = -ENETDOWN;
836 			goto out;
837 		}
838 		/* END TEMP */
839 		rdma_destroy_qp(ia->ri_id);
840 		rdma_destroy_id(ia->ri_id);
841 		ia->ri_id = id;
842 	}
843 
844 	rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
845 	if (rc) {
846 		dprintk("RPC:       %s: rdma_create_qp failed %i\n",
847 			__func__, rc);
848 		goto out;
849 	}
850 
851 /* XXX Tavor device performs badly with 2K MTU! */
852 if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
853 	struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
854 	if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
855 	    (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
856 	     pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
857 		struct ib_qp_attr attr = {
858 			.path_mtu = IB_MTU_1024
859 		};
860 		rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
861 	}
862 }
863 
864 	ep->rep_connected = 0;
865 
866 	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
867 	if (rc) {
868 		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
869 				__func__, rc);
870 		goto out;
871 	}
872 
873 	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
874 
875 	/*
876 	 * Check state. A non-peer reject indicates no listener
877 	 * (ECONNREFUSED), which may be a transient state. All
878 	 * others indicate a transport condition which has already
879 	 * undergone a best-effort.
880 	 */
881 	if (ep->rep_connected == -ECONNREFUSED
882 	    && ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
883 		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
884 		goto retry;
885 	}
886 	if (ep->rep_connected <= 0) {
887 		/* Sometimes, the only way to reliably connect to remote
888 		 * CMs is to use same nonzero values for ORD and IRD. */
889 		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
890 		    (ep->rep_remote_cma.responder_resources == 0 ||
891 		     ep->rep_remote_cma.initiator_depth !=
892 				ep->rep_remote_cma.responder_resources)) {
893 			if (ep->rep_remote_cma.responder_resources == 0)
894 				ep->rep_remote_cma.responder_resources = 1;
895 			ep->rep_remote_cma.initiator_depth =
896 				ep->rep_remote_cma.responder_resources;
897 			goto retry;
898 		}
899 		rc = ep->rep_connected;
900 	} else {
901 		dprintk("RPC:       %s: connected\n", __func__);
902 	}
903 
904 out:
905 	if (rc)
906 		ep->rep_connected = rc;
907 	return rc;
908 }
909 
910 /*
911  * rpcrdma_ep_disconnect
912  *
913  * This is separate from destroy to facilitate the ability
914  * to reconnect without recreating the endpoint.
915  *
916  * This call is not reentrant, and must not be made in parallel
917  * on the same endpoint.
918  */
919 int
920 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
921 {
922 	int rc;
923 
924 	rpcrdma_clean_cq(ep->rep_cq);
925 	rc = rdma_disconnect(ia->ri_id);
926 	if (!rc) {
927 		/* returns without wait if not connected */
928 		wait_event_interruptible(ep->rep_connect_wait,
929 							ep->rep_connected != 1);
930 		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
931 			(ep->rep_connected == 1) ? "still " : "dis");
932 	} else {
933 		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
934 		ep->rep_connected = rc;
935 	}
936 	return rc;
937 }
938 
939 /*
940  * Initialize buffer memory
941  */
942 int
943 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
944 	struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
945 {
946 	char *p;
947 	size_t len;
948 	int i, rc;
949 	struct rpcrdma_mw *r;
950 
951 	buf->rb_max_requests = cdata->max_requests;
952 	spin_lock_init(&buf->rb_lock);
953 	atomic_set(&buf->rb_credits, 1);
954 
955 	/* Need to allocate:
956 	 *   1.  arrays for send and recv pointers
957 	 *   2.  arrays of struct rpcrdma_req to fill in pointers
958 	 *   3.  array of struct rpcrdma_rep for replies
959 	 *   4.  padding, if any
960 	 *   5.  mw's, fmr's or frmr's, if any
961 	 * Send/recv buffers in req/rep need to be registered
962 	 */
963 
964 	len = buf->rb_max_requests *
965 		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
966 	len += cdata->padding;
967 	switch (ia->ri_memreg_strategy) {
968 	case RPCRDMA_FRMR:
969 		len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
970 				sizeof(struct rpcrdma_mw);
971 		break;
972 	case RPCRDMA_MTHCAFMR:
973 		/* TBD we are perhaps overallocating here */
974 		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
975 				sizeof(struct rpcrdma_mw);
976 		break;
977 	case RPCRDMA_MEMWINDOWS_ASYNC:
978 	case RPCRDMA_MEMWINDOWS:
979 		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
980 				sizeof(struct rpcrdma_mw);
981 		break;
982 	default:
983 		break;
984 	}
985 
986 	/* allocate 1, 4 and 5 in one shot */
987 	p = kzalloc(len, GFP_KERNEL);
988 	if (p == NULL) {
989 		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
990 			__func__, len);
991 		rc = -ENOMEM;
992 		goto out;
993 	}
994 	buf->rb_pool = p;	/* for freeing it later */
995 
996 	buf->rb_send_bufs = (struct rpcrdma_req **) p;
997 	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
998 	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
999 	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1000 
1001 	/*
1002 	 * Register the zeroed pad buffer, if any.
1003 	 */
1004 	if (cdata->padding) {
1005 		rc = rpcrdma_register_internal(ia, p, cdata->padding,
1006 					    &ep->rep_pad_mr, &ep->rep_pad);
1007 		if (rc)
1008 			goto out;
1009 	}
1010 	p += cdata->padding;
1011 
1012 	/*
1013 	 * Allocate the fmr's, or mw's for mw_bind chunk registration.
1014 	 * We "cycle" the mw's in order to minimize rkey reuse,
1015 	 * and also reduce unbind-to-bind collision.
1016 	 */
1017 	INIT_LIST_HEAD(&buf->rb_mws);
1018 	r = (struct rpcrdma_mw *)p;
1019 	switch (ia->ri_memreg_strategy) {
1020 	case RPCRDMA_FRMR:
1021 		for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1022 			r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1023 							 RPCRDMA_MAX_SEGS);
1024 			if (IS_ERR(r->r.frmr.fr_mr)) {
1025 				rc = PTR_ERR(r->r.frmr.fr_mr);
1026 				dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1027 					" failed %i\n", __func__, rc);
1028 				goto out;
1029 			}
1030 			r->r.frmr.fr_pgl =
1031 				ib_alloc_fast_reg_page_list(ia->ri_id->device,
1032 							    RPCRDMA_MAX_SEGS);
1033 			if (IS_ERR(r->r.frmr.fr_pgl)) {
1034 				rc = PTR_ERR(r->r.frmr.fr_pgl);
1035 				dprintk("RPC:       %s: "
1036 					"ib_alloc_fast_reg_page_list "
1037 					"failed %i\n", __func__, rc);
1038 				goto out;
1039 			}
1040 			list_add(&r->mw_list, &buf->rb_mws);
1041 			++r;
1042 		}
1043 		break;
1044 	case RPCRDMA_MTHCAFMR:
1045 		/* TBD we are perhaps overallocating here */
1046 		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1047 			static struct ib_fmr_attr fa =
1048 				{ RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1049 			r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1050 				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1051 				&fa);
1052 			if (IS_ERR(r->r.fmr)) {
1053 				rc = PTR_ERR(r->r.fmr);
1054 				dprintk("RPC:       %s: ib_alloc_fmr"
1055 					" failed %i\n", __func__, rc);
1056 				goto out;
1057 			}
1058 			list_add(&r->mw_list, &buf->rb_mws);
1059 			++r;
1060 		}
1061 		break;
1062 	case RPCRDMA_MEMWINDOWS_ASYNC:
1063 	case RPCRDMA_MEMWINDOWS:
1064 		/* Allocate one extra request's worth, for full cycling */
1065 		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1066 			r->r.mw = ib_alloc_mw(ia->ri_pd);
1067 			if (IS_ERR(r->r.mw)) {
1068 				rc = PTR_ERR(r->r.mw);
1069 				dprintk("RPC:       %s: ib_alloc_mw"
1070 					" failed %i\n", __func__, rc);
1071 				goto out;
1072 			}
1073 			list_add(&r->mw_list, &buf->rb_mws);
1074 			++r;
1075 		}
1076 		break;
1077 	default:
1078 		break;
1079 	}
1080 
1081 	/*
1082 	 * Allocate/init the request/reply buffers. Doing this
1083 	 * using kmalloc for now -- one for each buf.
1084 	 */
1085 	for (i = 0; i < buf->rb_max_requests; i++) {
1086 		struct rpcrdma_req *req;
1087 		struct rpcrdma_rep *rep;
1088 
1089 		len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
1090 		/* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
1091 		/* Typical ~2400b, so rounding up saves work later */
1092 		if (len < 4096)
1093 			len = 4096;
1094 		req = kmalloc(len, GFP_KERNEL);
1095 		if (req == NULL) {
1096 			dprintk("RPC:       %s: request buffer %d alloc"
1097 				" failed\n", __func__, i);
1098 			rc = -ENOMEM;
1099 			goto out;
1100 		}
1101 		memset(req, 0, sizeof(struct rpcrdma_req));
1102 		buf->rb_send_bufs[i] = req;
1103 		buf->rb_send_bufs[i]->rl_buffer = buf;
1104 
1105 		rc = rpcrdma_register_internal(ia, req->rl_base,
1106 				len - offsetof(struct rpcrdma_req, rl_base),
1107 				&buf->rb_send_bufs[i]->rl_handle,
1108 				&buf->rb_send_bufs[i]->rl_iov);
1109 		if (rc)
1110 			goto out;
1111 
1112 		buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1113 
1114 		len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1115 		rep = kmalloc(len, GFP_KERNEL);
1116 		if (rep == NULL) {
1117 			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1118 				__func__, i);
1119 			rc = -ENOMEM;
1120 			goto out;
1121 		}
1122 		memset(rep, 0, sizeof(struct rpcrdma_rep));
1123 		buf->rb_recv_bufs[i] = rep;
1124 		buf->rb_recv_bufs[i]->rr_buffer = buf;
1125 		init_waitqueue_head(&rep->rr_unbind);
1126 
1127 		rc = rpcrdma_register_internal(ia, rep->rr_base,
1128 				len - offsetof(struct rpcrdma_rep, rr_base),
1129 				&buf->rb_recv_bufs[i]->rr_handle,
1130 				&buf->rb_recv_bufs[i]->rr_iov);
1131 		if (rc)
1132 			goto out;
1133 
1134 	}
1135 	dprintk("RPC:       %s: max_requests %d\n",
1136 		__func__, buf->rb_max_requests);
1137 	/* done */
1138 	return 0;
1139 out:
1140 	rpcrdma_buffer_destroy(buf);
1141 	return rc;
1142 }
1143 
1144 /*
1145  * Unregister and destroy buffer memory. Need to deal with
1146  * partial initialization, so it's callable from failed create.
1147  * Must be called before destroying endpoint, as registrations
1148  * reference it.
1149  */
1150 void
1151 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1152 {
1153 	int rc, i;
1154 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1155 	struct rpcrdma_mw *r;
1156 
1157 	/* clean up in reverse order from create
1158 	 *   1.  recv mr memory (mr free, then kfree)
1159 	 *   1a. bind mw memory
1160 	 *   2.  send mr memory (mr free, then kfree)
1161 	 *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1162 	 *   4.  arrays
1163 	 */
1164 	dprintk("RPC:       %s: entering\n", __func__);
1165 
1166 	for (i = 0; i < buf->rb_max_requests; i++) {
1167 		if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1168 			rpcrdma_deregister_internal(ia,
1169 					buf->rb_recv_bufs[i]->rr_handle,
1170 					&buf->rb_recv_bufs[i]->rr_iov);
1171 			kfree(buf->rb_recv_bufs[i]);
1172 		}
1173 		if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1174 			while (!list_empty(&buf->rb_mws)) {
1175 				r = list_entry(buf->rb_mws.next,
1176 					struct rpcrdma_mw, mw_list);
1177 				list_del(&r->mw_list);
1178 				switch (ia->ri_memreg_strategy) {
1179 				case RPCRDMA_FRMR:
1180 					rc = ib_dereg_mr(r->r.frmr.fr_mr);
1181 					if (rc)
1182 						dprintk("RPC:       %s:"
1183 							" ib_dereg_mr"
1184 							" failed %i\n",
1185 							__func__, rc);
1186 					ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1187 					break;
1188 				case RPCRDMA_MTHCAFMR:
1189 					rc = ib_dealloc_fmr(r->r.fmr);
1190 					if (rc)
1191 						dprintk("RPC:       %s:"
1192 							" ib_dealloc_fmr"
1193 							" failed %i\n",
1194 							__func__, rc);
1195 					break;
1196 				case RPCRDMA_MEMWINDOWS_ASYNC:
1197 				case RPCRDMA_MEMWINDOWS:
1198 					rc = ib_dealloc_mw(r->r.mw);
1199 					if (rc)
1200 						dprintk("RPC:       %s:"
1201 							" ib_dealloc_mw"
1202 							" failed %i\n",
1203 							__func__, rc);
1204 					break;
1205 				default:
1206 					break;
1207 				}
1208 			}
1209 			rpcrdma_deregister_internal(ia,
1210 					buf->rb_send_bufs[i]->rl_handle,
1211 					&buf->rb_send_bufs[i]->rl_iov);
1212 			kfree(buf->rb_send_bufs[i]);
1213 		}
1214 	}
1215 
1216 	kfree(buf->rb_pool);
1217 }
1218 
1219 /*
1220  * Get a set of request/reply buffers.
1221  *
1222  * Reply buffer (if needed) is attached to send buffer upon return.
1223  * Rule:
1224  *    rb_send_index and rb_recv_index MUST always be pointing to the
1225  *    *next* available buffer (non-NULL). They are incremented after
1226  *    removing buffers, and decremented *before* returning them.
1227  */
1228 struct rpcrdma_req *
1229 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1230 {
1231 	struct rpcrdma_req *req;
1232 	unsigned long flags;
1233 	int i;
1234 	struct rpcrdma_mw *r;
1235 
1236 	spin_lock_irqsave(&buffers->rb_lock, flags);
1237 	if (buffers->rb_send_index == buffers->rb_max_requests) {
1238 		spin_unlock_irqrestore(&buffers->rb_lock, flags);
1239 		dprintk("RPC:       %s: out of request buffers\n", __func__);
1240 		return ((struct rpcrdma_req *)NULL);
1241 	}
1242 
1243 	req = buffers->rb_send_bufs[buffers->rb_send_index];
1244 	if (buffers->rb_send_index < buffers->rb_recv_index) {
1245 		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1246 			__func__,
1247 			buffers->rb_recv_index - buffers->rb_send_index);
1248 		req->rl_reply = NULL;
1249 	} else {
1250 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1251 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1252 	}
1253 	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1254 	if (!list_empty(&buffers->rb_mws)) {
1255 		i = RPCRDMA_MAX_SEGS - 1;
1256 		do {
1257 			r = list_entry(buffers->rb_mws.next,
1258 					struct rpcrdma_mw, mw_list);
1259 			list_del(&r->mw_list);
1260 			req->rl_segments[i].mr_chunk.rl_mw = r;
1261 		} while (--i >= 0);
1262 	}
1263 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1264 	return req;
1265 }
1266 
1267 /*
1268  * Put request/reply buffers back into pool.
1269  * Pre-decrement counter/array index.
1270  */
1271 void
1272 rpcrdma_buffer_put(struct rpcrdma_req *req)
1273 {
1274 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1275 	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1276 	int i;
1277 	unsigned long flags;
1278 
1279 	BUG_ON(req->rl_nchunks != 0);
1280 	spin_lock_irqsave(&buffers->rb_lock, flags);
1281 	buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1282 	req->rl_niovs = 0;
1283 	if (req->rl_reply) {
1284 		buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1285 		init_waitqueue_head(&req->rl_reply->rr_unbind);
1286 		req->rl_reply->rr_func = NULL;
1287 		req->rl_reply = NULL;
1288 	}
1289 	switch (ia->ri_memreg_strategy) {
1290 	case RPCRDMA_FRMR:
1291 	case RPCRDMA_MTHCAFMR:
1292 	case RPCRDMA_MEMWINDOWS_ASYNC:
1293 	case RPCRDMA_MEMWINDOWS:
1294 		/*
1295 		 * Cycle mw's back in reverse order, and "spin" them.
1296 		 * This delays and scrambles reuse as much as possible.
1297 		 */
1298 		i = 1;
1299 		do {
1300 			struct rpcrdma_mw **mw;
1301 			mw = &req->rl_segments[i].mr_chunk.rl_mw;
1302 			list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1303 			*mw = NULL;
1304 		} while (++i < RPCRDMA_MAX_SEGS);
1305 		list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1306 					&buffers->rb_mws);
1307 		req->rl_segments[0].mr_chunk.rl_mw = NULL;
1308 		break;
1309 	default:
1310 		break;
1311 	}
1312 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1313 }
1314 
1315 /*
1316  * Recover reply buffers from pool.
1317  * This happens when recovering from error conditions.
1318  * Post-increment counter/array index.
1319  */
1320 void
1321 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1322 {
1323 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1324 	unsigned long flags;
1325 
1326 	if (req->rl_iov.length == 0)	/* special case xprt_rdma_allocate() */
1327 		buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1328 	spin_lock_irqsave(&buffers->rb_lock, flags);
1329 	if (buffers->rb_recv_index < buffers->rb_max_requests) {
1330 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1331 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1332 	}
1333 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1334 }
1335 
1336 /*
1337  * Put reply buffers back into pool when not attached to
1338  * request. This happens in error conditions, and when
1339  * aborting unbinds. Pre-decrement counter/array index.
1340  */
1341 void
1342 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1343 {
1344 	struct rpcrdma_buffer *buffers = rep->rr_buffer;
1345 	unsigned long flags;
1346 
1347 	rep->rr_func = NULL;
1348 	spin_lock_irqsave(&buffers->rb_lock, flags);
1349 	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1350 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1351 }
1352 
1353 /*
1354  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1355  */
1356 
1357 int
1358 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1359 				struct ib_mr **mrp, struct ib_sge *iov)
1360 {
1361 	struct ib_phys_buf ipb;
1362 	struct ib_mr *mr;
1363 	int rc;
1364 
1365 	/*
1366 	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1367 	 */
1368 	iov->addr = ib_dma_map_single(ia->ri_id->device,
1369 			va, len, DMA_BIDIRECTIONAL);
1370 	iov->length = len;
1371 
1372 	if (ia->ri_have_dma_lkey) {
1373 		*mrp = NULL;
1374 		iov->lkey = ia->ri_dma_lkey;
1375 		return 0;
1376 	} else if (ia->ri_bind_mem != NULL) {
1377 		*mrp = NULL;
1378 		iov->lkey = ia->ri_bind_mem->lkey;
1379 		return 0;
1380 	}
1381 
1382 	ipb.addr = iov->addr;
1383 	ipb.size = iov->length;
1384 	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1385 			IB_ACCESS_LOCAL_WRITE, &iov->addr);
1386 
1387 	dprintk("RPC:       %s: phys convert: 0x%llx "
1388 			"registered 0x%llx length %d\n",
1389 			__func__, (unsigned long long)ipb.addr,
1390 			(unsigned long long)iov->addr, len);
1391 
1392 	if (IS_ERR(mr)) {
1393 		*mrp = NULL;
1394 		rc = PTR_ERR(mr);
1395 		dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1396 	} else {
1397 		*mrp = mr;
1398 		iov->lkey = mr->lkey;
1399 		rc = 0;
1400 	}
1401 
1402 	return rc;
1403 }
1404 
1405 int
1406 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1407 				struct ib_mr *mr, struct ib_sge *iov)
1408 {
1409 	int rc;
1410 
1411 	ib_dma_unmap_single(ia->ri_id->device,
1412 			iov->addr, iov->length, DMA_BIDIRECTIONAL);
1413 
1414 	if (NULL == mr)
1415 		return 0;
1416 
1417 	rc = ib_dereg_mr(mr);
1418 	if (rc)
1419 		dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1420 	return rc;
1421 }
1422 
1423 /*
1424  * Wrappers for chunk registration, shared by read/write chunk code.
1425  */
1426 
1427 static void
1428 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1429 {
1430 	seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1431 	seg->mr_dmalen = seg->mr_len;
1432 	if (seg->mr_page)
1433 		seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1434 				seg->mr_page, offset_in_page(seg->mr_offset),
1435 				seg->mr_dmalen, seg->mr_dir);
1436 	else
1437 		seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1438 				seg->mr_offset,
1439 				seg->mr_dmalen, seg->mr_dir);
1440 }
1441 
1442 static void
1443 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1444 {
1445 	if (seg->mr_page)
1446 		ib_dma_unmap_page(ia->ri_id->device,
1447 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1448 	else
1449 		ib_dma_unmap_single(ia->ri_id->device,
1450 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1451 }
1452 
1453 static int
1454 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1455 			int *nsegs, int writing, struct rpcrdma_ia *ia,
1456 			struct rpcrdma_xprt *r_xprt)
1457 {
1458 	struct rpcrdma_mr_seg *seg1 = seg;
1459 	struct ib_send_wr frmr_wr, *bad_wr;
1460 	u8 key;
1461 	int len, pageoff;
1462 	int i, rc;
1463 
1464 	pageoff = offset_in_page(seg1->mr_offset);
1465 	seg1->mr_offset -= pageoff;	/* start of page */
1466 	seg1->mr_len += pageoff;
1467 	len = -pageoff;
1468 	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1469 		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1470 	for (i = 0; i < *nsegs;) {
1471 		rpcrdma_map_one(ia, seg, writing);
1472 		seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->page_list[i] = seg->mr_dma;
1473 		len += seg->mr_len;
1474 		++seg;
1475 		++i;
1476 		/* Check for holes */
1477 		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1478 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1479 			break;
1480 	}
1481 	dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1482 		__func__, seg1->mr_chunk.rl_mw, i);
1483 
1484 	/* Bump the key */
1485 	key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1486 	ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
1487 
1488 	/* Prepare FRMR WR */
1489 	memset(&frmr_wr, 0, sizeof frmr_wr);
1490 	frmr_wr.opcode = IB_WR_FAST_REG_MR;
1491 	frmr_wr.send_flags = 0;			/* unsignaled */
1492 	frmr_wr.wr.fast_reg.iova_start = (unsigned long)seg1->mr_dma;
1493 	frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1494 	frmr_wr.wr.fast_reg.page_list_len = i;
1495 	frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1496 	frmr_wr.wr.fast_reg.length = i << PAGE_SHIFT;
1497 	frmr_wr.wr.fast_reg.access_flags = (writing ?
1498 				IB_ACCESS_REMOTE_WRITE : IB_ACCESS_REMOTE_READ);
1499 	frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1500 	DECR_CQCOUNT(&r_xprt->rx_ep);
1501 
1502 	rc = ib_post_send(ia->ri_id->qp, &frmr_wr, &bad_wr);
1503 
1504 	if (rc) {
1505 		dprintk("RPC:       %s: failed ib_post_send for register,"
1506 			" status %i\n", __func__, rc);
1507 		while (i--)
1508 			rpcrdma_unmap_one(ia, --seg);
1509 	} else {
1510 		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1511 		seg1->mr_base = seg1->mr_dma + pageoff;
1512 		seg1->mr_nsegs = i;
1513 		seg1->mr_len = len;
1514 	}
1515 	*nsegs = i;
1516 	return rc;
1517 }
1518 
1519 static int
1520 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1521 			struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1522 {
1523 	struct rpcrdma_mr_seg *seg1 = seg;
1524 	struct ib_send_wr invalidate_wr, *bad_wr;
1525 	int rc;
1526 
1527 	while (seg1->mr_nsegs--)
1528 		rpcrdma_unmap_one(ia, seg++);
1529 
1530 	memset(&invalidate_wr, 0, sizeof invalidate_wr);
1531 	invalidate_wr.opcode = IB_WR_LOCAL_INV;
1532 	invalidate_wr.send_flags = 0;			/* unsignaled */
1533 	invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1534 	DECR_CQCOUNT(&r_xprt->rx_ep);
1535 
1536 	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1537 	if (rc)
1538 		dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1539 			" status %i\n", __func__, rc);
1540 	return rc;
1541 }
1542 
1543 static int
1544 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1545 			int *nsegs, int writing, struct rpcrdma_ia *ia)
1546 {
1547 	struct rpcrdma_mr_seg *seg1 = seg;
1548 	u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1549 	int len, pageoff, i, rc;
1550 
1551 	pageoff = offset_in_page(seg1->mr_offset);
1552 	seg1->mr_offset -= pageoff;	/* start of page */
1553 	seg1->mr_len += pageoff;
1554 	len = -pageoff;
1555 	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1556 		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1557 	for (i = 0; i < *nsegs;) {
1558 		rpcrdma_map_one(ia, seg, writing);
1559 		physaddrs[i] = seg->mr_dma;
1560 		len += seg->mr_len;
1561 		++seg;
1562 		++i;
1563 		/* Check for holes */
1564 		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1565 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1566 			break;
1567 	}
1568 	rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1569 				physaddrs, i, seg1->mr_dma);
1570 	if (rc) {
1571 		dprintk("RPC:       %s: failed ib_map_phys_fmr "
1572 			"%u@0x%llx+%i (%d)... status %i\n", __func__,
1573 			len, (unsigned long long)seg1->mr_dma,
1574 			pageoff, i, rc);
1575 		while (i--)
1576 			rpcrdma_unmap_one(ia, --seg);
1577 	} else {
1578 		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1579 		seg1->mr_base = seg1->mr_dma + pageoff;
1580 		seg1->mr_nsegs = i;
1581 		seg1->mr_len = len;
1582 	}
1583 	*nsegs = i;
1584 	return rc;
1585 }
1586 
1587 static int
1588 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1589 			struct rpcrdma_ia *ia)
1590 {
1591 	struct rpcrdma_mr_seg *seg1 = seg;
1592 	LIST_HEAD(l);
1593 	int rc;
1594 
1595 	list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1596 	rc = ib_unmap_fmr(&l);
1597 	while (seg1->mr_nsegs--)
1598 		rpcrdma_unmap_one(ia, seg++);
1599 	if (rc)
1600 		dprintk("RPC:       %s: failed ib_unmap_fmr,"
1601 			" status %i\n", __func__, rc);
1602 	return rc;
1603 }
1604 
1605 static int
1606 rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg,
1607 			int *nsegs, int writing, struct rpcrdma_ia *ia,
1608 			struct rpcrdma_xprt *r_xprt)
1609 {
1610 	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1611 				  IB_ACCESS_REMOTE_READ);
1612 	struct ib_mw_bind param;
1613 	int rc;
1614 
1615 	*nsegs = 1;
1616 	rpcrdma_map_one(ia, seg, writing);
1617 	param.mr = ia->ri_bind_mem;
1618 	param.wr_id = 0ULL;	/* no send cookie */
1619 	param.addr = seg->mr_dma;
1620 	param.length = seg->mr_len;
1621 	param.send_flags = 0;
1622 	param.mw_access_flags = mem_priv;
1623 
1624 	DECR_CQCOUNT(&r_xprt->rx_ep);
1625 	rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1626 	if (rc) {
1627 		dprintk("RPC:       %s: failed ib_bind_mw "
1628 			"%u@0x%llx status %i\n",
1629 			__func__, seg->mr_len,
1630 			(unsigned long long)seg->mr_dma, rc);
1631 		rpcrdma_unmap_one(ia, seg);
1632 	} else {
1633 		seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1634 		seg->mr_base = param.addr;
1635 		seg->mr_nsegs = 1;
1636 	}
1637 	return rc;
1638 }
1639 
1640 static int
1641 rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg,
1642 			struct rpcrdma_ia *ia,
1643 			struct rpcrdma_xprt *r_xprt, void **r)
1644 {
1645 	struct ib_mw_bind param;
1646 	LIST_HEAD(l);
1647 	int rc;
1648 
1649 	BUG_ON(seg->mr_nsegs != 1);
1650 	param.mr = ia->ri_bind_mem;
1651 	param.addr = 0ULL;	/* unbind */
1652 	param.length = 0;
1653 	param.mw_access_flags = 0;
1654 	if (*r) {
1655 		param.wr_id = (u64) (unsigned long) *r;
1656 		param.send_flags = IB_SEND_SIGNALED;
1657 		INIT_CQCOUNT(&r_xprt->rx_ep);
1658 	} else {
1659 		param.wr_id = 0ULL;
1660 		param.send_flags = 0;
1661 		DECR_CQCOUNT(&r_xprt->rx_ep);
1662 	}
1663 	rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1664 	rpcrdma_unmap_one(ia, seg);
1665 	if (rc)
1666 		dprintk("RPC:       %s: failed ib_(un)bind_mw,"
1667 			" status %i\n", __func__, rc);
1668 	else
1669 		*r = NULL;	/* will upcall on completion */
1670 	return rc;
1671 }
1672 
1673 static int
1674 rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg,
1675 			int *nsegs, int writing, struct rpcrdma_ia *ia)
1676 {
1677 	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1678 				  IB_ACCESS_REMOTE_READ);
1679 	struct rpcrdma_mr_seg *seg1 = seg;
1680 	struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1681 	int len, i, rc = 0;
1682 
1683 	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1684 		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1685 	for (len = 0, i = 0; i < *nsegs;) {
1686 		rpcrdma_map_one(ia, seg, writing);
1687 		ipb[i].addr = seg->mr_dma;
1688 		ipb[i].size = seg->mr_len;
1689 		len += seg->mr_len;
1690 		++seg;
1691 		++i;
1692 		/* Check for holes */
1693 		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1694 		    offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1695 			break;
1696 	}
1697 	seg1->mr_base = seg1->mr_dma;
1698 	seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1699 				ipb, i, mem_priv, &seg1->mr_base);
1700 	if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1701 		rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1702 		dprintk("RPC:       %s: failed ib_reg_phys_mr "
1703 			"%u@0x%llx (%d)... status %i\n",
1704 			__func__, len,
1705 			(unsigned long long)seg1->mr_dma, i, rc);
1706 		while (i--)
1707 			rpcrdma_unmap_one(ia, --seg);
1708 	} else {
1709 		seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1710 		seg1->mr_nsegs = i;
1711 		seg1->mr_len = len;
1712 	}
1713 	*nsegs = i;
1714 	return rc;
1715 }
1716 
1717 static int
1718 rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg,
1719 			struct rpcrdma_ia *ia)
1720 {
1721 	struct rpcrdma_mr_seg *seg1 = seg;
1722 	int rc;
1723 
1724 	rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1725 	seg1->mr_chunk.rl_mr = NULL;
1726 	while (seg1->mr_nsegs--)
1727 		rpcrdma_unmap_one(ia, seg++);
1728 	if (rc)
1729 		dprintk("RPC:       %s: failed ib_dereg_mr,"
1730 			" status %i\n", __func__, rc);
1731 	return rc;
1732 }
1733 
1734 int
1735 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1736 			int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1737 {
1738 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1739 	int rc = 0;
1740 
1741 	switch (ia->ri_memreg_strategy) {
1742 
1743 #if RPCRDMA_PERSISTENT_REGISTRATION
1744 	case RPCRDMA_ALLPHYSICAL:
1745 		rpcrdma_map_one(ia, seg, writing);
1746 		seg->mr_rkey = ia->ri_bind_mem->rkey;
1747 		seg->mr_base = seg->mr_dma;
1748 		seg->mr_nsegs = 1;
1749 		nsegs = 1;
1750 		break;
1751 #endif
1752 
1753 	/* Registration using frmr registration */
1754 	case RPCRDMA_FRMR:
1755 		rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1756 		break;
1757 
1758 	/* Registration using fmr memory registration */
1759 	case RPCRDMA_MTHCAFMR:
1760 		rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1761 		break;
1762 
1763 	/* Registration using memory windows */
1764 	case RPCRDMA_MEMWINDOWS_ASYNC:
1765 	case RPCRDMA_MEMWINDOWS:
1766 		rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt);
1767 		break;
1768 
1769 	/* Default registration each time */
1770 	default:
1771 		rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia);
1772 		break;
1773 	}
1774 	if (rc)
1775 		return -1;
1776 
1777 	return nsegs;
1778 }
1779 
1780 int
1781 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1782 		struct rpcrdma_xprt *r_xprt, void *r)
1783 {
1784 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1785 	int nsegs = seg->mr_nsegs, rc;
1786 
1787 	switch (ia->ri_memreg_strategy) {
1788 
1789 #if RPCRDMA_PERSISTENT_REGISTRATION
1790 	case RPCRDMA_ALLPHYSICAL:
1791 		BUG_ON(nsegs != 1);
1792 		rpcrdma_unmap_one(ia, seg);
1793 		rc = 0;
1794 		break;
1795 #endif
1796 
1797 	case RPCRDMA_FRMR:
1798 		rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1799 		break;
1800 
1801 	case RPCRDMA_MTHCAFMR:
1802 		rc = rpcrdma_deregister_fmr_external(seg, ia);
1803 		break;
1804 
1805 	case RPCRDMA_MEMWINDOWS_ASYNC:
1806 	case RPCRDMA_MEMWINDOWS:
1807 		rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r);
1808 		break;
1809 
1810 	default:
1811 		rc = rpcrdma_deregister_default_external(seg, ia);
1812 		break;
1813 	}
1814 	if (r) {
1815 		struct rpcrdma_rep *rep = r;
1816 		void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1817 		rep->rr_func = NULL;
1818 		func(rep);	/* dereg done, callback now */
1819 	}
1820 	return nsegs;
1821 }
1822 
1823 /*
1824  * Prepost any receive buffer, then post send.
1825  *
1826  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1827  */
1828 int
1829 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1830 		struct rpcrdma_ep *ep,
1831 		struct rpcrdma_req *req)
1832 {
1833 	struct ib_send_wr send_wr, *send_wr_fail;
1834 	struct rpcrdma_rep *rep = req->rl_reply;
1835 	int rc;
1836 
1837 	if (rep) {
1838 		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1839 		if (rc)
1840 			goto out;
1841 		req->rl_reply = NULL;
1842 	}
1843 
1844 	send_wr.next = NULL;
1845 	send_wr.wr_id = 0ULL;	/* no send cookie */
1846 	send_wr.sg_list = req->rl_send_iov;
1847 	send_wr.num_sge = req->rl_niovs;
1848 	send_wr.opcode = IB_WR_SEND;
1849 	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
1850 		ib_dma_sync_single_for_device(ia->ri_id->device,
1851 			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1852 			DMA_TO_DEVICE);
1853 	ib_dma_sync_single_for_device(ia->ri_id->device,
1854 		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1855 		DMA_TO_DEVICE);
1856 	ib_dma_sync_single_for_device(ia->ri_id->device,
1857 		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1858 		DMA_TO_DEVICE);
1859 
1860 	if (DECR_CQCOUNT(ep) > 0)
1861 		send_wr.send_flags = 0;
1862 	else { /* Provider must take a send completion every now and then */
1863 		INIT_CQCOUNT(ep);
1864 		send_wr.send_flags = IB_SEND_SIGNALED;
1865 	}
1866 
1867 	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1868 	if (rc)
1869 		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1870 			rc);
1871 out:
1872 	return rc;
1873 }
1874 
1875 /*
1876  * (Re)post a receive buffer.
1877  */
1878 int
1879 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1880 		     struct rpcrdma_ep *ep,
1881 		     struct rpcrdma_rep *rep)
1882 {
1883 	struct ib_recv_wr recv_wr, *recv_wr_fail;
1884 	int rc;
1885 
1886 	recv_wr.next = NULL;
1887 	recv_wr.wr_id = (u64) (unsigned long) rep;
1888 	recv_wr.sg_list = &rep->rr_iov;
1889 	recv_wr.num_sge = 1;
1890 
1891 	ib_dma_sync_single_for_cpu(ia->ri_id->device,
1892 		rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1893 
1894 	DECR_CQCOUNT(ep);
1895 	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1896 
1897 	if (rc)
1898 		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1899 			rc);
1900 	return rc;
1901 }
1902