xref: /openbmc/linux/net/sunrpc/xprtrdma/verbs.c (revision df2634f43f5106947f3735a0b61a6527a4b278cd)
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39 
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49 
50 #include <linux/pci.h>	/* for Tavor hack below */
51 #include <linux/slab.h>
52 
53 #include "xprt_rdma.h"
54 
55 /*
56  * Globals/Macros
57  */
58 
59 #ifdef RPC_DEBUG
60 # define RPCDBG_FACILITY	RPCDBG_TRANS
61 #endif
62 
63 /*
64  * internal functions
65  */
66 
67 /*
68  * Handle replies in tasklet context, using a single, global list.
69  * The rdma tasklet function simply walks the list and invokes the
70  * reply handler (or returns the buffer) for each queued reply.
71  */
72 
73 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
74 static LIST_HEAD(rpcrdma_tasklets_g);
75 
76 static void
77 rpcrdma_run_tasklet(unsigned long data)
78 {
79 	struct rpcrdma_rep *rep;
80 	void (*func)(struct rpcrdma_rep *);
81 	unsigned long flags;
82 
83 	data = data;	/* tasklet argument is unused */
84 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
85 	while (!list_empty(&rpcrdma_tasklets_g)) {
86 		rep = list_entry(rpcrdma_tasklets_g.next,
87 				 struct rpcrdma_rep, rr_list);
88 		list_del(&rep->rr_list);
89 		func = rep->rr_func;
90 		rep->rr_func = NULL;
91 		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
92 
93 		if (func)
94 			func(rep);
95 		else
96 			rpcrdma_recv_buffer_put(rep);
97 
98 		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
99 	}
100 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
101 }
102 
103 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
104 
105 static inline void
106 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
107 {
108 	unsigned long flags;
109 
110 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
111 	list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
112 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
113 	tasklet_schedule(&rpcrdma_tasklet_g);
114 }
115 
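/*
 * QP async error upcall. Log the event and, if the endpoint is
 * currently connected, mark it failed and wake any connect waiters.
 */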
116 static void
117 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
118 {
119 	struct rpcrdma_ep *ep = context;
120 
121 	dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
122 		__func__, event->event, event->device->name, context);
123 	if (ep->rep_connected == 1) {
124 		ep->rep_connected = -EIO;
125 		ep->rep_func(ep);
126 		wake_up_all(&ep->rep_connect_wait);
127 	}
128 }
129 
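/*
 * CQ async error upcall. Same handling as the QP error case above.
 */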
130 static void
131 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
132 {
133 	struct rpcrdma_ep *ep = context;
134 
135 	dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
136 		__func__, event->event, event->device->name, context);
137 	if (ep->rep_connected == 1) {
138 		ep->rep_connected = -EIO;
139 		ep->rep_func(ep);
140 		wake_up_all(&ep->rep_connect_wait);
141 	}
142 }
143 
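/*
 * Process a single work completion. The wr_id carries the rpcrdma_rep
 * for receives (it is zero for unsignaled sends and binds). Failed
 * completions mark the reply invalid (rr_len = ~0U); successful
 * receives capture the byte count and the server's credit
 * advertisement before handing the reply to the tasklet.
 */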
144 static inline
145 void rpcrdma_event_process(struct ib_wc *wc)
146 {
147 	struct rpcrdma_rep *rep =
148 			(struct rpcrdma_rep *)(unsigned long) wc->wr_id;
149 
150 	dprintk("RPC:       %s: event rep %p status %X opcode %X length %u\n",
151 		__func__, rep, wc->status, wc->opcode, wc->byte_len);
152 
153 	if (!rep) /* send or bind completion that we don't care about */
154 		return;
155 
156 	if (wc->status != IB_WC_SUCCESS) {
157 		dprintk("RPC:       %s: %s WC status %X, connection lost\n",
158 			__func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
159 			 wc->status);
160 		rep->rr_len = ~0U;
161 		rpcrdma_schedule_tasklet(rep);
162 		return;
163 	}
164 
165 	switch (wc->opcode) {
166 	case IB_WC_RECV:
167 		rep->rr_len = wc->byte_len;
168 		ib_dma_sync_single_for_cpu(
169 			rdmab_to_ia(rep->rr_buffer)->ri_id->device,
170 			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
171 		/* Keep (only) the most recent credits, after checking validity */
172 		if (rep->rr_len >= 16) {
173 			struct rpcrdma_msg *p =
174 					(struct rpcrdma_msg *) rep->rr_base;
175 			unsigned int credits = ntohl(p->rm_credit);
176 			if (credits == 0) {
177 				dprintk("RPC:       %s: server"
178 					" dropped credits to 0!\n", __func__);
179 				/* don't deadlock */
180 				credits = 1;
181 			} else if (credits > rep->rr_buffer->rb_max_requests) {
182 				dprintk("RPC:       %s: server"
183 					" over-crediting: %d (%d)\n",
184 					__func__, credits,
185 					rep->rr_buffer->rb_max_requests);
186 				credits = rep->rr_buffer->rb_max_requests;
187 			}
188 			atomic_set(&rep->rr_buffer->rb_credits, credits);
189 		}
190 		/* fall through */
191 	case IB_WC_BIND_MW:
192 		rpcrdma_schedule_tasklet(rep);
193 		break;
194 	default:
195 		dprintk("RPC:       %s: unexpected WC event %X\n",
196 			__func__, wc->opcode);
197 		break;
198 	}
199 }
200 
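/*
 * Poll the CQ, one completion at a time, until it is empty.
 * Returns zero on success or the ib_poll_cq() error.
 */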
201 static inline int
202 rpcrdma_cq_poll(struct ib_cq *cq)
203 {
204 	struct ib_wc wc;
205 	int rc;
206 
207 	for (;;) {
208 		rc = ib_poll_cq(cq, 1, &wc);
209 		if (rc < 0) {
210 			dprintk("RPC:       %s: ib_poll_cq failed %i\n",
211 				__func__, rc);
212 			return rc;
213 		}
214 		if (rc == 0)
215 			break;
216 
217 		rpcrdma_event_process(&wc);
218 	}
219 
220 	return 0;
221 }
222 
223 /*
224  * rpcrdma_cq_event_upcall
225  *
226  * This upcall handles recv, send, bind and unbind events.
227  * It is reentrant but processes single events in order to preserve
228  * the ordering of receives, which is needed to track server credits.
229  *
230  * It is the responsibility of the scheduled tasklet to return
231  * recv buffers to the pool. NOTE: this affects synchronization of
232  * connection shutdown. That is, the structures required for
233  * the completion of the reply handler must remain intact until
234  * all memory has been reclaimed.
235  *
236  * Note that send events are suppressed and do not result in an upcall.
237  */
238 static void
239 rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
240 {
241 	int rc;
242 
243 	rc = rpcrdma_cq_poll(cq);
244 	if (rc)
245 		return;
246 
247 	rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
248 	if (rc) {
249 		dprintk("RPC:       %s: ib_req_notify_cq failed %i\n",
250 			__func__, rc);
251 		return;
252 	}
253 
254 	rpcrdma_cq_poll(cq);
255 }
256 
257 #ifdef RPC_DEBUG
258 static const char * const conn[] = {
259 	"address resolved",
260 	"address error",
261 	"route resolved",
262 	"route error",
263 	"connect request",
264 	"connect response",
265 	"connect error",
266 	"unreachable",
267 	"rejected",
268 	"established",
269 	"disconnected",
270 	"device removal"
271 };
272 #endif
273 
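/*
 * RDMA CM event handler. Completes address and route resolution
 * waits, translates connection events into ep->rep_connected state,
 * resets the credit count, and wakes waiters on rep_connect_wait.
 */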
274 static int
275 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
276 {
277 	struct rpcrdma_xprt *xprt = id->context;
278 	struct rpcrdma_ia *ia = &xprt->rx_ia;
279 	struct rpcrdma_ep *ep = &xprt->rx_ep;
280 #ifdef RPC_DEBUG
281 	struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
282 #endif
283 	struct ib_qp_attr attr;
284 	struct ib_qp_init_attr iattr;
285 	int connstate = 0;
286 
287 	switch (event->event) {
288 	case RDMA_CM_EVENT_ADDR_RESOLVED:
289 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
290 		ia->ri_async_rc = 0;
291 		complete(&ia->ri_done);
292 		break;
293 	case RDMA_CM_EVENT_ADDR_ERROR:
294 		ia->ri_async_rc = -EHOSTUNREACH;
295 		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
296 			__func__, ep);
297 		complete(&ia->ri_done);
298 		break;
299 	case RDMA_CM_EVENT_ROUTE_ERROR:
300 		ia->ri_async_rc = -ENETUNREACH;
301 		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
302 			__func__, ep);
303 		complete(&ia->ri_done);
304 		break;
305 	case RDMA_CM_EVENT_ESTABLISHED:
306 		connstate = 1;
307 		ib_query_qp(ia->ri_id->qp, &attr,
308 			IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
309 			&iattr);
310 		dprintk("RPC:       %s: %d responder resources"
311 			" (%d initiator)\n",
312 			__func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
313 		goto connected;
314 	case RDMA_CM_EVENT_CONNECT_ERROR:
315 		connstate = -ENOTCONN;
316 		goto connected;
317 	case RDMA_CM_EVENT_UNREACHABLE:
318 		connstate = -ENETDOWN;
319 		goto connected;
320 	case RDMA_CM_EVENT_REJECTED:
321 		connstate = -ECONNREFUSED;
322 		goto connected;
323 	case RDMA_CM_EVENT_DISCONNECTED:
324 		connstate = -ECONNABORTED;
325 		goto connected;
326 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
327 		connstate = -ENODEV;
328 connected:
329 		dprintk("RPC:       %s: %s: %pI4:%u (ep 0x%p event 0x%x)\n",
330 			__func__,
331 			(event->event < ARRAY_SIZE(conn)) ? conn[event->event] :
332 						"unknown connection error",
333 			&addr->sin_addr.s_addr,
334 			ntohs(addr->sin_port),
335 			ep, event->event);
336 		atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
337 		dprintk("RPC:       %s: %sconnected\n",
338 					__func__, connstate > 0 ? "" : "dis");
339 		ep->rep_connected = connstate;
340 		ep->rep_func(ep);
341 		wake_up_all(&ep->rep_connect_wait);
342 		break;
343 	default:
344 		dprintk("RPC:       %s: unexpected CM event %d\n",
345 			__func__, event->event);
346 		break;
347 	}
348 
349 #ifdef RPC_DEBUG
350 	if (connstate == 1) {
351 		int ird = attr.max_dest_rd_atomic;
352 		int tird = ep->rep_remote_cma.responder_resources;
353 		printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
354 			"on %s, memreg %d slots %d ird %d%s\n",
355 			&addr->sin_addr.s_addr,
356 			ntohs(addr->sin_port),
357 			ia->ri_id->device->name,
358 			ia->ri_memreg_strategy,
359 			xprt->rx_buf.rb_max_requests,
360 			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
361 	} else if (connstate < 0) {
362 		printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
363 			&addr->sin_addr.s_addr,
364 			ntohs(addr->sin_port),
365 			connstate);
366 	}
367 #endif
368 
369 	return 0;
370 }
371 
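/*
 * Create an RDMA CM ID and resolve the server's address and route.
 * Each resolution step completes asynchronously via rpcrdma_conn_upcall,
 * which signals ri_done; both waits are bounded by RDMA_RESOLVE_TIMEOUT.
 */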
372 static struct rdma_cm_id *
373 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
374 			struct rpcrdma_ia *ia, struct sockaddr *addr)
375 {
376 	struct rdma_cm_id *id;
377 	int rc;
378 
379 	init_completion(&ia->ri_done);
380 
381 	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
382 	if (IS_ERR(id)) {
383 		rc = PTR_ERR(id);
384 		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
385 			__func__, rc);
386 		return id;
387 	}
388 
389 	ia->ri_async_rc = -ETIMEDOUT;
390 	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
391 	if (rc) {
392 		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
393 			__func__, rc);
394 		goto out;
395 	}
396 	wait_for_completion_interruptible_timeout(&ia->ri_done,
397 				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
398 	rc = ia->ri_async_rc;
399 	if (rc)
400 		goto out;
401 
402 	ia->ri_async_rc = -ETIMEDOUT;
403 	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
404 	if (rc) {
405 		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
406 			__func__, rc);
407 		goto out;
408 	}
409 	wait_for_completion_interruptible_timeout(&ia->ri_done,
410 				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
411 	rc = ia->ri_async_rc;
412 	if (rc)
413 		goto out;
414 
415 	return id;
416 
417 out:
418 	rdma_destroy_id(id);
419 	return ERR_PTR(rc);
420 }
421 
422 /*
423  * Drain any cq, prior to teardown.
424  */
425 static void
426 rpcrdma_clean_cq(struct ib_cq *cq)
427 {
428 	struct ib_wc wc;
429 	int count = 0;
430 
431 	while (1 == ib_poll_cq(cq, 1, &wc))
432 		++count;
433 
434 	if (count)
435 		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
436 			__func__, count, wc.opcode);
437 }
438 
439 /*
440  * Exported functions.
441  */
442 
443 /*
444  * Open and initialize an Interface Adapter.
445  *  o initializes fields of struct rpcrdma_ia, including
446  *    interface and provider attributes and protection zone.
447  */
448 int
449 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
450 {
451 	int rc, mem_priv;
452 	struct ib_device_attr devattr;
453 	struct rpcrdma_ia *ia = &xprt->rx_ia;
454 
455 	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
456 	if (IS_ERR(ia->ri_id)) {
457 		rc = PTR_ERR(ia->ri_id);
458 		goto out1;
459 	}
460 
461 	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
462 	if (IS_ERR(ia->ri_pd)) {
463 		rc = PTR_ERR(ia->ri_pd);
464 		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
465 			__func__, rc);
466 		goto out2;
467 	}
468 
469 	/*
470 	 * Query the device to determine if the requested memory
471 	 * registration strategy is supported. If it isn't, set the
472 	 * strategy to a globally supported model.
473 	 */
474 	rc = ib_query_device(ia->ri_id->device, &devattr);
475 	if (rc) {
476 		dprintk("RPC:       %s: ib_query_device failed %d\n",
477 			__func__, rc);
478 		goto out2;
479 	}
480 
481 	if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
482 		ia->ri_have_dma_lkey = 1;
483 		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
484 	}
485 
486 	switch (memreg) {
487 	case RPCRDMA_MEMWINDOWS:
488 	case RPCRDMA_MEMWINDOWS_ASYNC:
489 		if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) {
490 			dprintk("RPC:       %s: MEMWINDOWS registration "
491 				"specified but not supported by adapter, "
492 				"using slower RPCRDMA_REGISTER\n",
493 				__func__);
494 			memreg = RPCRDMA_REGISTER;
495 		}
496 		break;
497 	case RPCRDMA_MTHCAFMR:
498 		if (!ia->ri_id->device->alloc_fmr) {
499 #if RPCRDMA_PERSISTENT_REGISTRATION
500 			dprintk("RPC:       %s: MTHCAFMR registration "
501 				"specified but not supported by adapter, "
502 				"using riskier RPCRDMA_ALLPHYSICAL\n",
503 				__func__);
504 			memreg = RPCRDMA_ALLPHYSICAL;
505 #else
506 			dprintk("RPC:       %s: MTHCAFMR registration "
507 				"specified but not supported by adapter, "
508 				"using slower RPCRDMA_REGISTER\n",
509 				__func__);
510 			memreg = RPCRDMA_REGISTER;
511 #endif
512 		}
513 		break;
514 	case RPCRDMA_FRMR:
515 		/* Requires both frmr reg and local dma lkey */
516 		if ((devattr.device_cap_flags &
517 		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
518 		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
519 #if RPCRDMA_PERSISTENT_REGISTRATION
520 			dprintk("RPC:       %s: FRMR registration "
521 				"specified but not supported by adapter, "
522 				"using riskier RPCRDMA_ALLPHYSICAL\n",
523 				__func__);
524 			memreg = RPCRDMA_ALLPHYSICAL;
525 #else
526 			dprintk("RPC:       %s: FRMR registration "
527 				"specified but not supported by adapter, "
528 				"using slower RPCRDMA_REGISTER\n",
529 				__func__);
530 			memreg = RPCRDMA_REGISTER;
531 #endif
532 		}
533 		break;
534 	}
535 
536 	/*
537 	 * Optionally obtain an underlying physical identity mapping in
538 	 * order to do a memory window-based bind. This base registration
539 	 * is protected from remote access - that is enabled only by binding
540 	 * for the specific bytes targeted during each RPC operation, and
541 	 * revoked after the corresponding completion similar to a storage
542 	 * adapter.
543 	 */
544 	switch (memreg) {
545 	case RPCRDMA_BOUNCEBUFFERS:
546 	case RPCRDMA_REGISTER:
547 	case RPCRDMA_FRMR:
548 		break;
549 #if RPCRDMA_PERSISTENT_REGISTRATION
550 	case RPCRDMA_ALLPHYSICAL:
551 		mem_priv = IB_ACCESS_LOCAL_WRITE |
552 				IB_ACCESS_REMOTE_WRITE |
553 				IB_ACCESS_REMOTE_READ;
554 		goto register_setup;
555 #endif
556 	case RPCRDMA_MEMWINDOWS_ASYNC:
557 	case RPCRDMA_MEMWINDOWS:
558 		mem_priv = IB_ACCESS_LOCAL_WRITE |
559 				IB_ACCESS_MW_BIND;
560 		goto register_setup;
561 	case RPCRDMA_MTHCAFMR:
562 		if (ia->ri_have_dma_lkey)
563 			break;
564 		mem_priv = IB_ACCESS_LOCAL_WRITE;
565 	register_setup:
566 		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
567 		if (IS_ERR(ia->ri_bind_mem)) {
568 			printk(KERN_ALERT "%s: ib_get_dma_mr for "
569 				"phys register failed with %lX\n\t"
570 				"Will continue with degraded performance\n",
571 				__func__, PTR_ERR(ia->ri_bind_mem));
572 			memreg = RPCRDMA_REGISTER;
573 			ia->ri_bind_mem = NULL;
574 		}
575 		break;
576 	default:
577 		printk(KERN_ERR "%s: invalid memory registration mode %d\n",
578 				__func__, memreg);
579 		rc = -EINVAL;
580 		goto out2;
581 	}
582 	dprintk("RPC:       %s: memory registration strategy is %d\n",
583 		__func__, memreg);
584 
585 	/* Else memory will be registered/deregistered for each chunk */
586 	ia->ri_memreg_strategy = memreg;
587 
588 	return 0;
589 out2:
590 	rdma_destroy_id(ia->ri_id);
591 	ia->ri_id = NULL;
592 out1:
593 	return rc;
594 }
595 
596 /*
597  * Clean up/close an IA.
598  *   o if event handles and PD have been initialized, free them.
599  *   o close the IA
600  */
601 void
602 rpcrdma_ia_close(struct rpcrdma_ia *ia)
603 {
604 	int rc;
605 
606 	dprintk("RPC:       %s: entering\n", __func__);
607 	if (ia->ri_bind_mem != NULL) {
608 		rc = ib_dereg_mr(ia->ri_bind_mem);
609 		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
610 			__func__, rc);
611 	}
612 	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
613 		if (ia->ri_id->qp)
614 			rdma_destroy_qp(ia->ri_id);
615 		rdma_destroy_id(ia->ri_id);
616 		ia->ri_id = NULL;
617 	}
618 	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
619 		rc = ib_dealloc_pd(ia->ri_pd);
620 		dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
621 			__func__, rc);
622 	}
623 }
624 
625 /*
626  * Create unconnected endpoint.
627  */
628 int
629 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
630 				struct rpcrdma_create_data_internal *cdata)
631 {
632 	struct ib_device_attr devattr;
633 	int rc, err;
634 
635 	rc = ib_query_device(ia->ri_id->device, &devattr);
636 	if (rc) {
637 		dprintk("RPC:       %s: ib_query_device failed %d\n",
638 			__func__, rc);
639 		return rc;
640 	}
641 
642 	/* check provider's send/recv wr limits */
643 	if (cdata->max_requests > devattr.max_qp_wr)
644 		cdata->max_requests = devattr.max_qp_wr;
645 
646 	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
647 	ep->rep_attr.qp_context = ep;
648 	/* send_cq and recv_cq initialized below */
649 	ep->rep_attr.srq = NULL;
650 	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
651 	switch (ia->ri_memreg_strategy) {
652 	case RPCRDMA_FRMR:
653 		/* Add room for frmr register and invalidate WRs.
654 		 * 1. FRMR reg WR for head
655 		 * 2. FRMR invalidate WR for head
656 		 * 3. FRMR reg WR for pagelist
657 		 * 4. FRMR invalidate WR for pagelist
658 		 * 5. FRMR reg WR for tail
659 		 * 6. FRMR invalidate WR for tail
660 		 * 7. The RDMA_SEND WR
661 		 */
662 		ep->rep_attr.cap.max_send_wr *= 7;
663 		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
664 			cdata->max_requests = devattr.max_qp_wr / 7;
665 			if (!cdata->max_requests)
666 				return -EINVAL;
667 			ep->rep_attr.cap.max_send_wr = cdata->max_requests * 7;
668 		}
669 		break;
670 	case RPCRDMA_MEMWINDOWS_ASYNC:
671 	case RPCRDMA_MEMWINDOWS:
672 		/* Add room for mw_binds+unbinds - overkill! */
673 		ep->rep_attr.cap.max_send_wr++;
674 		ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
675 		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
676 			return -EINVAL;
677 		break;
678 	default:
679 		break;
680 	}
681 	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
682 	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
683 	ep->rep_attr.cap.max_recv_sge = 1;
684 	ep->rep_attr.cap.max_inline_data = 0;
685 	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
686 	ep->rep_attr.qp_type = IB_QPT_RC;
687 	ep->rep_attr.port_num = ~0;
688 
689 	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
690 		"iovs: send %d recv %d\n",
691 		__func__,
692 		ep->rep_attr.cap.max_send_wr,
693 		ep->rep_attr.cap.max_recv_wr,
694 		ep->rep_attr.cap.max_send_sge,
695 		ep->rep_attr.cap.max_recv_sge);
696 
697 	/* set trigger for requesting send completion */
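	/* rep_cqinit is the number of sends that may be posted unsignaled
	 * before one is marked IB_SEND_SIGNALED (see rpcrdma_ep_post), which
	 * bounds the number of send completions the provider must absorb.
	 * Memory windows reserve headroom for their unsignaled binds.
	 */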
698 	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/;
699 	switch (ia->ri_memreg_strategy) {
700 	case RPCRDMA_MEMWINDOWS_ASYNC:
701 	case RPCRDMA_MEMWINDOWS:
702 		ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
703 		break;
704 	default:
705 		break;
706 	}
707 	if (ep->rep_cqinit <= 2)
708 		ep->rep_cqinit = 0;
709 	INIT_CQCOUNT(ep);
710 	ep->rep_ia = ia;
711 	init_waitqueue_head(&ep->rep_connect_wait);
712 
713 	/*
714 	 * Create a single cq for receive dto and mw_bind (only ever
715 	 * care about unbind, really). Send completions are suppressed.
716 	 * Use single threaded tasklet upcalls to maintain ordering.
717 	 */
718 	ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
719 				  rpcrdma_cq_async_error_upcall, NULL,
720 				  ep->rep_attr.cap.max_recv_wr +
721 				  ep->rep_attr.cap.max_send_wr + 1, 0);
722 	if (IS_ERR(ep->rep_cq)) {
723 		rc = PTR_ERR(ep->rep_cq);
724 		dprintk("RPC:       %s: ib_create_cq failed: %i\n",
725 			__func__, rc);
726 		goto out1;
727 	}
728 
729 	rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
730 	if (rc) {
731 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
732 			__func__, rc);
733 		goto out2;
734 	}
735 
736 	ep->rep_attr.send_cq = ep->rep_cq;
737 	ep->rep_attr.recv_cq = ep->rep_cq;
738 
739 	/* Initialize cma parameters */
740 
741 	/* RPC/RDMA does not use private data */
742 	ep->rep_remote_cma.private_data = NULL;
743 	ep->rep_remote_cma.private_data_len = 0;
744 
745 	/* Client offers RDMA Read but does not initiate */
746 	ep->rep_remote_cma.initiator_depth = 0;
747 	if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS)
748 		ep->rep_remote_cma.responder_resources = 0;
749 	else if (devattr.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
750 		ep->rep_remote_cma.responder_resources = 32;
751 	else
752 		ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
753 
754 	ep->rep_remote_cma.retry_count = 7;
755 	ep->rep_remote_cma.flow_control = 0;
756 	ep->rep_remote_cma.rnr_retry_count = 0;
757 
758 	return 0;
759 
760 out2:
761 	err = ib_destroy_cq(ep->rep_cq);
762 	if (err)
763 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
764 			__func__, err);
765 out1:
766 	return rc;
767 }
768 
769 /*
770  * rpcrdma_ep_destroy
771  *
772  * Disconnect and destroy endpoint. After this, the only
773  * valid operations on the ep are to free it (if dynamically
774  * allocated) or re-create it.
775  *
776  * The caller's error handling must be sure to not leak the endpoint
777  * if this function fails.
778  */
779 int
780 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
781 {
782 	int rc;
783 
784 	dprintk("RPC:       %s: entering, connected is %d\n",
785 		__func__, ep->rep_connected);
786 
787 	if (ia->ri_id->qp) {
788 		rc = rpcrdma_ep_disconnect(ep, ia);
789 		if (rc)
790 			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
791 				" returned %i\n", __func__, rc);
792 		rdma_destroy_qp(ia->ri_id);
793 		ia->ri_id->qp = NULL;
794 	}
795 
796 	/* padding - could be done in rpcrdma_buffer_destroy... */
797 	if (ep->rep_pad_mr) {
798 		rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
799 		ep->rep_pad_mr = NULL;
800 	}
801 
802 	rpcrdma_clean_cq(ep->rep_cq);
803 	rc = ib_destroy_cq(ep->rep_cq);
804 	if (rc)
805 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
806 			__func__, rc);
807 
808 	return rc;
809 }
810 
811 /*
812  * Connect unconnected endpoint.
813  */
814 int
815 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
816 {
817 	struct rdma_cm_id *id;
818 	int rc = 0;
819 	int retry_count = 0;
820 
821 	if (ep->rep_connected != 0) {
822 		struct rpcrdma_xprt *xprt;
823 retry:
824 		rc = rpcrdma_ep_disconnect(ep, ia);
825 		if (rc && rc != -ENOTCONN)
826 			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
827 				" status %i\n", __func__, rc);
828 		rpcrdma_clean_cq(ep->rep_cq);
829 
830 		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
831 		id = rpcrdma_create_id(xprt, ia,
832 				(struct sockaddr *)&xprt->rx_data.addr);
833 		if (IS_ERR(id)) {
834 			rc = PTR_ERR(id);
835 			goto out;
836 		}
837 		/* TEMP TEMP TEMP - fail if new device:
838 		 * Deregister/remarshal *all* requests!
839 		 * Close and recreate adapter, pd, etc!
840 		 * Re-determine all attributes still sane!
841 		 * More stuff I haven't thought of!
842 		 * Rrrgh!
843 		 */
844 		if (ia->ri_id->device != id->device) {
845 			printk("RPC:       %s: can't reconnect on "
846 				"different device!\n", __func__);
847 			rdma_destroy_id(id);
848 			rc = -ENETDOWN;
849 			goto out;
850 		}
851 		/* END TEMP */
852 		rdma_destroy_qp(ia->ri_id);
853 		rdma_destroy_id(ia->ri_id);
854 		ia->ri_id = id;
855 	}
856 
857 	rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
858 	if (rc) {
859 		dprintk("RPC:       %s: rdma_create_qp failed %i\n",
860 			__func__, rc);
861 		goto out;
862 	}
863 
864 /* XXX Tavor device performs badly with 2K MTU! */
865 if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
866 	struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
867 	if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
868 	    (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
869 	     pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
870 		struct ib_qp_attr attr = {
871 			.path_mtu = IB_MTU_1024
872 		};
873 		rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
874 	}
875 }
876 
877 	ep->rep_connected = 0;
878 
879 	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
880 	if (rc) {
881 		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
882 				__func__, rc);
883 		goto out;
884 	}
885 
886 	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
887 
888 	/*
889 	 * Check state. A non-peer reject indicates no listener
890 	 * (ECONNREFUSED), which may be a transient state. All
891 	 * others indicate a transport condition for which best-effort
892 	 * recovery has already been attempted.
893 	 */
894 	if (ep->rep_connected == -ECONNREFUSED &&
895 	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
896 		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
897 		goto retry;
898 	}
899 	if (ep->rep_connected <= 0) {
900 		/* Sometimes, the only way to reliably connect to remote
901 		 * CMs is to use the same nonzero values for ORD and IRD. */
902 		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
903 		    (ep->rep_remote_cma.responder_resources == 0 ||
904 		     ep->rep_remote_cma.initiator_depth !=
905 				ep->rep_remote_cma.responder_resources)) {
906 			if (ep->rep_remote_cma.responder_resources == 0)
907 				ep->rep_remote_cma.responder_resources = 1;
908 			ep->rep_remote_cma.initiator_depth =
909 				ep->rep_remote_cma.responder_resources;
910 			goto retry;
911 		}
912 		rc = ep->rep_connected;
913 	} else {
914 		dprintk("RPC:       %s: connected\n", __func__);
915 	}
916 
917 out:
918 	if (rc)
919 		ep->rep_connected = rc;
920 	return rc;
921 }
922 
923 /*
924  * rpcrdma_ep_disconnect
925  *
926  * This is separate from destroy to facilitate the ability
927  * to reconnect without recreating the endpoint.
928  *
929  * This call is not reentrant, and must not be made in parallel
930  * on the same endpoint.
931  */
932 int
933 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
934 {
935 	int rc;
936 
937 	rpcrdma_clean_cq(ep->rep_cq);
938 	rc = rdma_disconnect(ia->ri_id);
939 	if (!rc) {
940 		/* returns without wait if not connected */
941 		wait_event_interruptible(ep->rep_connect_wait,
942 							ep->rep_connected != 1);
943 		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
944 			(ep->rep_connected == 1) ? "still " : "dis");
945 	} else {
946 		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
947 		ep->rep_connected = rc;
948 	}
949 	return rc;
950 }
951 
952 /*
953  * Initialize buffer memory
954  */
955 int
956 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
957 	struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
958 {
959 	char *p;
960 	size_t len;
961 	int i, rc;
962 	struct rpcrdma_mw *r;
963 
964 	buf->rb_max_requests = cdata->max_requests;
965 	spin_lock_init(&buf->rb_lock);
966 	atomic_set(&buf->rb_credits, 1);
967 
968 	/* Need to allocate:
969 	 *   1.  arrays for send and recv pointers
970 	 *   2.  arrays of struct rpcrdma_req to fill in pointers
971 	 *   3.  array of struct rpcrdma_rep for replies
972 	 *   4.  padding, if any
973 	 *   5.  mw's, fmr's or frmr's, if any
974 	 * Send/recv buffers in req/rep need to be registered
975 	 */
976 
977 	len = buf->rb_max_requests *
978 		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
979 	len += cdata->padding;
980 	switch (ia->ri_memreg_strategy) {
981 	case RPCRDMA_FRMR:
982 		len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
983 				sizeof(struct rpcrdma_mw);
984 		break;
985 	case RPCRDMA_MTHCAFMR:
986 		/* TBD we are perhaps overallocating here */
987 		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
988 				sizeof(struct rpcrdma_mw);
989 		break;
990 	case RPCRDMA_MEMWINDOWS_ASYNC:
991 	case RPCRDMA_MEMWINDOWS:
992 		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
993 				sizeof(struct rpcrdma_mw);
994 		break;
995 	default:
996 		break;
997 	}
998 
999 	/* allocate 1, 4 and 5 in one shot */
1000 	p = kzalloc(len, GFP_KERNEL);
1001 	if (p == NULL) {
1002 		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1003 			__func__, len);
1004 		rc = -ENOMEM;
1005 		goto out;
1006 	}
1007 	buf->rb_pool = p;	/* for freeing it later */
1008 
1009 	buf->rb_send_bufs = (struct rpcrdma_req **) p;
1010 	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1011 	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1012 	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1013 
1014 	/*
1015 	 * Register the zeroed pad buffer, if any.
1016 	 */
1017 	if (cdata->padding) {
1018 		rc = rpcrdma_register_internal(ia, p, cdata->padding,
1019 					    &ep->rep_pad_mr, &ep->rep_pad);
1020 		if (rc)
1021 			goto out;
1022 	}
1023 	p += cdata->padding;
1024 
1025 	/*
1026 	 * Allocate the fmr's, or mw's for mw_bind chunk registration.
1027 	 * We "cycle" the mw's in order to minimize rkey reuse,
1028 	 * and also reduce unbind-to-bind collision.
1029 	 */
1030 	INIT_LIST_HEAD(&buf->rb_mws);
1031 	r = (struct rpcrdma_mw *)p;
1032 	switch (ia->ri_memreg_strategy) {
1033 	case RPCRDMA_FRMR:
1034 		for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1035 			r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1036 							 RPCRDMA_MAX_SEGS);
1037 			if (IS_ERR(r->r.frmr.fr_mr)) {
1038 				rc = PTR_ERR(r->r.frmr.fr_mr);
1039 				dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1040 					" failed %i\n", __func__, rc);
1041 				goto out;
1042 			}
1043 			r->r.frmr.fr_pgl =
1044 				ib_alloc_fast_reg_page_list(ia->ri_id->device,
1045 							    RPCRDMA_MAX_SEGS);
1046 			if (IS_ERR(r->r.frmr.fr_pgl)) {
1047 				rc = PTR_ERR(r->r.frmr.fr_pgl);
1048 				dprintk("RPC:       %s: "
1049 					"ib_alloc_fast_reg_page_list "
1050 					"failed %i\n", __func__, rc);
1051 				goto out;
1052 			}
1053 			list_add(&r->mw_list, &buf->rb_mws);
1054 			++r;
1055 		}
1056 		break;
1057 	case RPCRDMA_MTHCAFMR:
1058 		/* TBD we are perhaps overallocating here */
1059 		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1060 			static struct ib_fmr_attr fa =
1061 				{ RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1062 			r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1063 				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1064 				&fa);
1065 			if (IS_ERR(r->r.fmr)) {
1066 				rc = PTR_ERR(r->r.fmr);
1067 				dprintk("RPC:       %s: ib_alloc_fmr"
1068 					" failed %i\n", __func__, rc);
1069 				goto out;
1070 			}
1071 			list_add(&r->mw_list, &buf->rb_mws);
1072 			++r;
1073 		}
1074 		break;
1075 	case RPCRDMA_MEMWINDOWS_ASYNC:
1076 	case RPCRDMA_MEMWINDOWS:
1077 		/* Allocate one extra request's worth, for full cycling */
1078 		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1079 			r->r.mw = ib_alloc_mw(ia->ri_pd);
1080 			if (IS_ERR(r->r.mw)) {
1081 				rc = PTR_ERR(r->r.mw);
1082 				dprintk("RPC:       %s: ib_alloc_mw"
1083 					" failed %i\n", __func__, rc);
1084 				goto out;
1085 			}
1086 			list_add(&r->mw_list, &buf->rb_mws);
1087 			++r;
1088 		}
1089 		break;
1090 	default:
1091 		break;
1092 	}
1093 
1094 	/*
1095 	 * Allocate/init the request/reply buffers. Doing this
1096 	 * using kmalloc for now -- one for each buf.
1097 	 */
1098 	for (i = 0; i < buf->rb_max_requests; i++) {
1099 		struct rpcrdma_req *req;
1100 		struct rpcrdma_rep *rep;
1101 
1102 		len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
1103 		/* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
1104 		/* Typical ~2400b, so rounding up saves work later */
1105 		if (len < 4096)
1106 			len = 4096;
1107 		req = kmalloc(len, GFP_KERNEL);
1108 		if (req == NULL) {
1109 			dprintk("RPC:       %s: request buffer %d alloc"
1110 				" failed\n", __func__, i);
1111 			rc = -ENOMEM;
1112 			goto out;
1113 		}
1114 		memset(req, 0, sizeof(struct rpcrdma_req));
1115 		buf->rb_send_bufs[i] = req;
1116 		buf->rb_send_bufs[i]->rl_buffer = buf;
1117 
1118 		rc = rpcrdma_register_internal(ia, req->rl_base,
1119 				len - offsetof(struct rpcrdma_req, rl_base),
1120 				&buf->rb_send_bufs[i]->rl_handle,
1121 				&buf->rb_send_bufs[i]->rl_iov);
1122 		if (rc)
1123 			goto out;
1124 
1125 		buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1126 
1127 		len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1128 		rep = kmalloc(len, GFP_KERNEL);
1129 		if (rep == NULL) {
1130 			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1131 				__func__, i);
1132 			rc = -ENOMEM;
1133 			goto out;
1134 		}
1135 		memset(rep, 0, sizeof(struct rpcrdma_rep));
1136 		buf->rb_recv_bufs[i] = rep;
1137 		buf->rb_recv_bufs[i]->rr_buffer = buf;
1138 		init_waitqueue_head(&rep->rr_unbind);
1139 
1140 		rc = rpcrdma_register_internal(ia, rep->rr_base,
1141 				len - offsetof(struct rpcrdma_rep, rr_base),
1142 				&buf->rb_recv_bufs[i]->rr_handle,
1143 				&buf->rb_recv_bufs[i]->rr_iov);
1144 		if (rc)
1145 			goto out;
1146 
1147 	}
1148 	dprintk("RPC:       %s: max_requests %d\n",
1149 		__func__, buf->rb_max_requests);
1150 	/* done */
1151 	return 0;
1152 out:
1153 	rpcrdma_buffer_destroy(buf);
1154 	return rc;
1155 }
1156 
1157 /*
1158  * Unregister and destroy buffer memory. Need to deal with
1159  * partial initialization, so it's callable from failed create.
1160  * Must be called before destroying endpoint, as registrations
1161  * reference it.
1162  */
1163 void
1164 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1165 {
1166 	int rc, i;
1167 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1168 	struct rpcrdma_mw *r;
1169 
1170 	/* clean up in reverse order from create
1171 	 *   1.  recv mr memory (mr free, then kfree)
1172 	 *   1a. bind mw memory
1173 	 *   2.  send mr memory (mr free, then kfree)
1174 	 *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1175 	 *   4.  arrays
1176 	 */
1177 	dprintk("RPC:       %s: entering\n", __func__);
1178 
1179 	for (i = 0; i < buf->rb_max_requests; i++) {
1180 		if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1181 			rpcrdma_deregister_internal(ia,
1182 					buf->rb_recv_bufs[i]->rr_handle,
1183 					&buf->rb_recv_bufs[i]->rr_iov);
1184 			kfree(buf->rb_recv_bufs[i]);
1185 		}
1186 		if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1187 			while (!list_empty(&buf->rb_mws)) {
1188 				r = list_entry(buf->rb_mws.next,
1189 					struct rpcrdma_mw, mw_list);
1190 				list_del(&r->mw_list);
1191 				switch (ia->ri_memreg_strategy) {
1192 				case RPCRDMA_FRMR:
1193 					rc = ib_dereg_mr(r->r.frmr.fr_mr);
1194 					if (rc)
1195 						dprintk("RPC:       %s:"
1196 							" ib_dereg_mr"
1197 							" failed %i\n",
1198 							__func__, rc);
1199 					ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1200 					break;
1201 				case RPCRDMA_MTHCAFMR:
1202 					rc = ib_dealloc_fmr(r->r.fmr);
1203 					if (rc)
1204 						dprintk("RPC:       %s:"
1205 							" ib_dealloc_fmr"
1206 							" failed %i\n",
1207 							__func__, rc);
1208 					break;
1209 				case RPCRDMA_MEMWINDOWS_ASYNC:
1210 				case RPCRDMA_MEMWINDOWS:
1211 					rc = ib_dealloc_mw(r->r.mw);
1212 					if (rc)
1213 						dprintk("RPC:       %s:"
1214 							" ib_dealloc_mw"
1215 							" failed %i\n",
1216 							__func__, rc);
1217 					break;
1218 				default:
1219 					break;
1220 				}
1221 			}
1222 			rpcrdma_deregister_internal(ia,
1223 					buf->rb_send_bufs[i]->rl_handle,
1224 					&buf->rb_send_bufs[i]->rl_iov);
1225 			kfree(buf->rb_send_bufs[i]);
1226 		}
1227 	}
1228 
1229 	kfree(buf->rb_pool);
1230 }
1231 
1232 /*
1233  * Get a set of request/reply buffers.
1234  *
1235  * Reply buffer (if needed) is attached to send buffer upon return.
1236  * Rule:
1237  *    rb_send_index and rb_recv_index MUST always be pointing to the
1238  *    *next* available buffer (non-NULL). They are incremented after
1239  *    removing buffers, and decremented *before* returning them.
1240  */
1241 struct rpcrdma_req *
1242 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1243 {
1244 	struct rpcrdma_req *req;
1245 	unsigned long flags;
1246 	int i;
1247 	struct rpcrdma_mw *r;
1248 
1249 	spin_lock_irqsave(&buffers->rb_lock, flags);
1250 	if (buffers->rb_send_index == buffers->rb_max_requests) {
1251 		spin_unlock_irqrestore(&buffers->rb_lock, flags);
1252 		dprintk("RPC:       %s: out of request buffers\n", __func__);
1253 		return NULL;
1254 	}
1255 
1256 	req = buffers->rb_send_bufs[buffers->rb_send_index];
1257 	if (buffers->rb_send_index < buffers->rb_recv_index) {
1258 		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1259 			__func__,
1260 			buffers->rb_recv_index - buffers->rb_send_index);
1261 		req->rl_reply = NULL;
1262 	} else {
1263 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1264 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1265 	}
1266 	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
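	/* Attach one MW (frmr, fmr or memory window) to each segment of
	 * this request, taken from the shared rb_mws pool; they are
	 * recycled by rpcrdma_buffer_put().
	 */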
1267 	if (!list_empty(&buffers->rb_mws)) {
1268 		i = RPCRDMA_MAX_SEGS - 1;
1269 		do {
1270 			r = list_entry(buffers->rb_mws.next,
1271 					struct rpcrdma_mw, mw_list);
1272 			list_del(&r->mw_list);
1273 			req->rl_segments[i].mr_chunk.rl_mw = r;
1274 		} while (--i >= 0);
1275 	}
1276 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1277 	return req;
1278 }
1279 
1280 /*
1281  * Put request/reply buffers back into pool.
1282  * Pre-decrement counter/array index.
1283  */
1284 void
1285 rpcrdma_buffer_put(struct rpcrdma_req *req)
1286 {
1287 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1288 	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1289 	int i;
1290 	unsigned long flags;
1291 
1292 	BUG_ON(req->rl_nchunks != 0);
1293 	spin_lock_irqsave(&buffers->rb_lock, flags);
1294 	buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1295 	req->rl_niovs = 0;
1296 	if (req->rl_reply) {
1297 		buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1298 		init_waitqueue_head(&req->rl_reply->rr_unbind);
1299 		req->rl_reply->rr_func = NULL;
1300 		req->rl_reply = NULL;
1301 	}
1302 	switch (ia->ri_memreg_strategy) {
1303 	case RPCRDMA_FRMR:
1304 	case RPCRDMA_MTHCAFMR:
1305 	case RPCRDMA_MEMWINDOWS_ASYNC:
1306 	case RPCRDMA_MEMWINDOWS:
1307 		/*
1308 		 * Cycle mw's back in reverse order, and "spin" them.
1309 		 * This delays and scrambles reuse as much as possible.
1310 		 */
1311 		i = 1;
1312 		do {
1313 			struct rpcrdma_mw **mw;
1314 			mw = &req->rl_segments[i].mr_chunk.rl_mw;
1315 			list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1316 			*mw = NULL;
1317 		} while (++i < RPCRDMA_MAX_SEGS);
1318 		list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1319 					&buffers->rb_mws);
1320 		req->rl_segments[0].mr_chunk.rl_mw = NULL;
1321 		break;
1322 	default:
1323 		break;
1324 	}
1325 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1326 }
1327 
1328 /*
1329  * Recover reply buffers from pool.
1330  * This happens when recovering from error conditions.
1331  * Post-increment counter/array index.
1332  */
1333 void
1334 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1335 {
1336 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1337 	unsigned long flags;
1338 
1339 	if (req->rl_iov.length == 0)	/* special case xprt_rdma_allocate() */
1340 		buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1341 	spin_lock_irqsave(&buffers->rb_lock, flags);
1342 	if (buffers->rb_recv_index < buffers->rb_max_requests) {
1343 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1344 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1345 	}
1346 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1347 }
1348 
1349 /*
1350  * Put reply buffers back into pool when not attached to
1351  * request. This happens in error conditions, and when
1352  * aborting unbinds. Pre-decrement counter/array index.
1353  */
1354 void
1355 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1356 {
1357 	struct rpcrdma_buffer *buffers = rep->rr_buffer;
1358 	unsigned long flags;
1359 
1360 	rep->rr_func = NULL;
1361 	spin_lock_irqsave(&buffers->rb_lock, flags);
1362 	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1363 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1364 }
1365 
1366 /*
1367  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1368  */
1369 
1370 int
1371 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1372 				struct ib_mr **mrp, struct ib_sge *iov)
1373 {
1374 	struct ib_phys_buf ipb;
1375 	struct ib_mr *mr;
1376 	int rc;
1377 
1378 	/*
1379 	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1380 	 */
1381 	iov->addr = ib_dma_map_single(ia->ri_id->device,
1382 			va, len, DMA_BIDIRECTIONAL);
1383 	iov->length = len;
1384 
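	/* Prefer the device's global DMA lkey, or the preregistered
	 * "bind" MR, over registering each internal buffer separately.
	 */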
1385 	if (ia->ri_have_dma_lkey) {
1386 		*mrp = NULL;
1387 		iov->lkey = ia->ri_dma_lkey;
1388 		return 0;
1389 	} else if (ia->ri_bind_mem != NULL) {
1390 		*mrp = NULL;
1391 		iov->lkey = ia->ri_bind_mem->lkey;
1392 		return 0;
1393 	}
1394 
1395 	ipb.addr = iov->addr;
1396 	ipb.size = iov->length;
1397 	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1398 			IB_ACCESS_LOCAL_WRITE, &iov->addr);
1399 
1400 	dprintk("RPC:       %s: phys convert: 0x%llx "
1401 			"registered 0x%llx length %d\n",
1402 			__func__, (unsigned long long)ipb.addr,
1403 			(unsigned long long)iov->addr, len);
1404 
1405 	if (IS_ERR(mr)) {
1406 		*mrp = NULL;
1407 		rc = PTR_ERR(mr);
1408 		dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1409 	} else {
1410 		*mrp = mr;
1411 		iov->lkey = mr->lkey;
1412 		rc = 0;
1413 	}
1414 
1415 	return rc;
1416 }
1417 
1418 int
1419 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1420 				struct ib_mr *mr, struct ib_sge *iov)
1421 {
1422 	int rc;
1423 
1424 	ib_dma_unmap_single(ia->ri_id->device,
1425 			iov->addr, iov->length, DMA_BIDIRECTIONAL);
1426 
1427 	if (NULL == mr)
1428 		return 0;
1429 
1430 	rc = ib_dereg_mr(mr);
1431 	if (rc)
1432 		dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1433 	return rc;
1434 }
1435 
1436 /*
1437  * Wrappers for chunk registration, shared by read/write chunk code.
1438  */
1439 
1440 static void
1441 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1442 {
1443 	seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1444 	seg->mr_dmalen = seg->mr_len;
1445 	if (seg->mr_page)
1446 		seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1447 				seg->mr_page, offset_in_page(seg->mr_offset),
1448 				seg->mr_dmalen, seg->mr_dir);
1449 	else
1450 		seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1451 				seg->mr_offset,
1452 				seg->mr_dmalen, seg->mr_dir);
1453 }
1454 
1455 static void
1456 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1457 {
1458 	if (seg->mr_page)
1459 		ib_dma_unmap_page(ia->ri_id->device,
1460 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1461 	else
1462 		ib_dma_unmap_single(ia->ri_id->device,
1463 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1464 }
1465 
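/*
 * Register a run of segments with a single FRMR. The segments are
 * DMA-mapped and their addresses loaded into the fast_reg page list;
 * the registration itself is posted as an unsignaled FAST_REG_MR WR
 * on the send queue. *nsegs returns the number of segments covered.
 */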
1466 static int
1467 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1468 			int *nsegs, int writing, struct rpcrdma_ia *ia,
1469 			struct rpcrdma_xprt *r_xprt)
1470 {
1471 	struct rpcrdma_mr_seg *seg1 = seg;
1472 	struct ib_send_wr frmr_wr, *bad_wr;
1473 	u8 key;
1474 	int len, pageoff;
1475 	int i, rc;
1476 
1477 	pageoff = offset_in_page(seg1->mr_offset);
1478 	seg1->mr_offset -= pageoff;	/* start of page */
1479 	seg1->mr_len += pageoff;
1480 	len = -pageoff;
1481 	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1482 		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1483 	for (i = 0; i < *nsegs;) {
1484 		rpcrdma_map_one(ia, seg, writing);
1485 		seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->page_list[i] = seg->mr_dma;
1486 		len += seg->mr_len;
1487 		++seg;
1488 		++i;
1489 		/* Check for holes */
1490 		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1491 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1492 			break;
1493 	}
1494 	dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1495 		__func__, seg1->mr_chunk.rl_mw, i);
1496 
1497 	/* Bump the key */
1498 	key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1499 	ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
1500 
1501 	/* Prepare FRMR WR */
1502 	memset(&frmr_wr, 0, sizeof frmr_wr);
1503 	frmr_wr.opcode = IB_WR_FAST_REG_MR;
1504 	frmr_wr.send_flags = 0;			/* unsignaled */
1505 	frmr_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1506 	frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1507 	frmr_wr.wr.fast_reg.page_list_len = i;
1508 	frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1509 	frmr_wr.wr.fast_reg.length = i << PAGE_SHIFT;
1510 	frmr_wr.wr.fast_reg.access_flags = (writing ?
1511 				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1512 				IB_ACCESS_REMOTE_READ);
1513 	frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1514 	DECR_CQCOUNT(&r_xprt->rx_ep);
1515 
1516 	rc = ib_post_send(ia->ri_id->qp, &frmr_wr, &bad_wr);
1517 
1518 	if (rc) {
1519 		dprintk("RPC:       %s: failed ib_post_send for register,"
1520 			" status %i\n", __func__, rc);
1521 		while (i--)
1522 			rpcrdma_unmap_one(ia, --seg);
1523 	} else {
1524 		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1525 		seg1->mr_base = seg1->mr_dma + pageoff;
1526 		seg1->mr_nsegs = i;
1527 		seg1->mr_len = len;
1528 	}
1529 	*nsegs = i;
1530 	return rc;
1531 }
1532 
1533 static int
1534 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1535 			struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1536 {
1537 	struct rpcrdma_mr_seg *seg1 = seg;
1538 	struct ib_send_wr invalidate_wr, *bad_wr;
1539 	int rc;
1540 
1541 	while (seg1->mr_nsegs--)
1542 		rpcrdma_unmap_one(ia, seg++);
1543 
1544 	memset(&invalidate_wr, 0, sizeof invalidate_wr);
1545 	invalidate_wr.opcode = IB_WR_LOCAL_INV;
1546 	invalidate_wr.send_flags = 0;			/* unsignaled */
1547 	invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1548 	DECR_CQCOUNT(&r_xprt->rx_ep);
1549 
1550 	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1551 	if (rc)
1552 		dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1553 			" status %i\n", __func__, rc);
1554 	return rc;
1555 }
1556 
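/*
 * Register a run of segments with a preallocated FMR. The same hole
 * check as the FRMR case stops the run at the first intra-page gap;
 * the physical addresses are then mapped with ib_map_phys_fmr().
 */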
1557 static int
1558 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1559 			int *nsegs, int writing, struct rpcrdma_ia *ia)
1560 {
1561 	struct rpcrdma_mr_seg *seg1 = seg;
1562 	u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1563 	int len, pageoff, i, rc;
1564 
1565 	pageoff = offset_in_page(seg1->mr_offset);
1566 	seg1->mr_offset -= pageoff;	/* start of page */
1567 	seg1->mr_len += pageoff;
1568 	len = -pageoff;
1569 	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1570 		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1571 	for (i = 0; i < *nsegs;) {
1572 		rpcrdma_map_one(ia, seg, writing);
1573 		physaddrs[i] = seg->mr_dma;
1574 		len += seg->mr_len;
1575 		++seg;
1576 		++i;
1577 		/* Check for holes */
1578 		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1579 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1580 			break;
1581 	}
1582 	rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1583 				physaddrs, i, seg1->mr_dma);
1584 	if (rc) {
1585 		dprintk("RPC:       %s: failed ib_map_phys_fmr "
1586 			"%u@0x%llx+%i (%d)... status %i\n", __func__,
1587 			len, (unsigned long long)seg1->mr_dma,
1588 			pageoff, i, rc);
1589 		while (i--)
1590 			rpcrdma_unmap_one(ia, --seg);
1591 	} else {
1592 		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1593 		seg1->mr_base = seg1->mr_dma + pageoff;
1594 		seg1->mr_nsegs = i;
1595 		seg1->mr_len = len;
1596 	}
1597 	*nsegs = i;
1598 	return rc;
1599 }
1600 
1601 static int
1602 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1603 			struct rpcrdma_ia *ia)
1604 {
1605 	struct rpcrdma_mr_seg *seg1 = seg;
1606 	LIST_HEAD(l);
1607 	int rc;
1608 
1609 	list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1610 	rc = ib_unmap_fmr(&l);
1611 	while (seg1->mr_nsegs--)
1612 		rpcrdma_unmap_one(ia, seg++);
1613 	if (rc)
1614 		dprintk("RPC:       %s: failed ib_unmap_fmr,"
1615 			" status %i\n", __func__, rc);
1616 	return rc;
1617 }
1618 
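/*
 * Register a single segment by binding a memory window over the
 * preregistered DMA MR. The bind is posted unsignaled on the send
 * queue; only one segment per chunk is supported in this mode.
 */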
1619 static int
1620 rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg,
1621 			int *nsegs, int writing, struct rpcrdma_ia *ia,
1622 			struct rpcrdma_xprt *r_xprt)
1623 {
1624 	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1625 				  IB_ACCESS_REMOTE_READ);
1626 	struct ib_mw_bind param;
1627 	int rc;
1628 
1629 	*nsegs = 1;
1630 	rpcrdma_map_one(ia, seg, writing);
1631 	param.mr = ia->ri_bind_mem;
1632 	param.wr_id = 0ULL;	/* no send cookie */
1633 	param.addr = seg->mr_dma;
1634 	param.length = seg->mr_len;
1635 	param.send_flags = 0;
1636 	param.mw_access_flags = mem_priv;
1637 
1638 	DECR_CQCOUNT(&r_xprt->rx_ep);
1639 	rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1640 	if (rc) {
1641 		dprintk("RPC:       %s: failed ib_bind_mw "
1642 			"%u@0x%llx status %i\n",
1643 			__func__, seg->mr_len,
1644 			(unsigned long long)seg->mr_dma, rc);
1645 		rpcrdma_unmap_one(ia, seg);
1646 	} else {
1647 		seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1648 		seg->mr_base = param.addr;
1649 		seg->mr_nsegs = 1;
1650 	}
1651 	return rc;
1652 }
1653 
1654 static int
1655 rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg,
1656 			struct rpcrdma_ia *ia,
1657 			struct rpcrdma_xprt *r_xprt, void **r)
1658 {
1659 	struct ib_mw_bind param;
1660 	LIST_HEAD(l);
1661 	int rc;
1662 
1663 	BUG_ON(seg->mr_nsegs != 1);
1664 	param.mr = ia->ri_bind_mem;
1665 	param.addr = 0ULL;	/* unbind */
1666 	param.length = 0;
1667 	param.mw_access_flags = 0;
1668 	if (*r) {
1669 		param.wr_id = (u64) (unsigned long) *r;
1670 		param.send_flags = IB_SEND_SIGNALED;
1671 		INIT_CQCOUNT(&r_xprt->rx_ep);
1672 	} else {
1673 		param.wr_id = 0ULL;
1674 		param.send_flags = 0;
1675 		DECR_CQCOUNT(&r_xprt->rx_ep);
1676 	}
1677 	rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1678 	rpcrdma_unmap_one(ia, seg);
1679 	if (rc)
1680 		dprintk("RPC:       %s: failed ib_(un)bind_mw,"
1681 			" status %i\n", __func__, rc);
1682 	else
1683 		*r = NULL;	/* will upcall on completion */
1684 	return rc;
1685 }
1686 
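/*
 * Default (slow-path) registration: build an ib_phys_buf list for
 * the run of segments and register it with ib_reg_phys_mr(); the MR
 * is released again in rpcrdma_deregister_default_external().
 */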
1687 static int
1688 rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg,
1689 			int *nsegs, int writing, struct rpcrdma_ia *ia)
1690 {
1691 	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1692 				  IB_ACCESS_REMOTE_READ);
1693 	struct rpcrdma_mr_seg *seg1 = seg;
1694 	struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1695 	int len, i, rc = 0;
1696 
1697 	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1698 		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1699 	for (len = 0, i = 0; i < *nsegs;) {
1700 		rpcrdma_map_one(ia, seg, writing);
1701 		ipb[i].addr = seg->mr_dma;
1702 		ipb[i].size = seg->mr_len;
1703 		len += seg->mr_len;
1704 		++seg;
1705 		++i;
1706 		/* Check for holes */
1707 		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1708 		    offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1709 			break;
1710 	}
1711 	seg1->mr_base = seg1->mr_dma;
1712 	seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1713 				ipb, i, mem_priv, &seg1->mr_base);
1714 	if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1715 		rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1716 		dprintk("RPC:       %s: failed ib_reg_phys_mr "
1717 			"%u@0x%llx (%d)... status %i\n",
1718 			__func__, len,
1719 			(unsigned long long)seg1->mr_dma, i, rc);
1720 		while (i--)
1721 			rpcrdma_unmap_one(ia, --seg);
1722 	} else {
1723 		seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1724 		seg1->mr_nsegs = i;
1725 		seg1->mr_len = len;
1726 	}
1727 	*nsegs = i;
1728 	return rc;
1729 }
1730 
1731 static int
1732 rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg,
1733 			struct rpcrdma_ia *ia)
1734 {
1735 	struct rpcrdma_mr_seg *seg1 = seg;
1736 	int rc;
1737 
1738 	rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1739 	seg1->mr_chunk.rl_mr = NULL;
1740 	while (seg1->mr_nsegs--)
1741 		rpcrdma_unmap_one(ia, seg++);
1742 	if (rc)
1743 		dprintk("RPC:       %s: failed ib_dereg_mr,"
1744 			" status %i\n", __func__, rc);
1745 	return rc;
1746 }
1747 
1748 int
1749 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1750 			int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1751 {
1752 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1753 	int rc = 0;
1754 
1755 	switch (ia->ri_memreg_strategy) {
1756 
1757 #if RPCRDMA_PERSISTENT_REGISTRATION
1758 	case RPCRDMA_ALLPHYSICAL:
1759 		rpcrdma_map_one(ia, seg, writing);
1760 		seg->mr_rkey = ia->ri_bind_mem->rkey;
1761 		seg->mr_base = seg->mr_dma;
1762 		seg->mr_nsegs = 1;
1763 		nsegs = 1;
1764 		break;
1765 #endif
1766 
1767 	/* Registration using frmr registration */
1768 	case RPCRDMA_FRMR:
1769 		rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1770 		break;
1771 
1772 	/* Registration using fmr memory registration */
1773 	case RPCRDMA_MTHCAFMR:
1774 		rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1775 		break;
1776 
1777 	/* Registration using memory windows */
1778 	case RPCRDMA_MEMWINDOWS_ASYNC:
1779 	case RPCRDMA_MEMWINDOWS:
1780 		rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt);
1781 		break;
1782 
1783 	/* Default registration each time */
1784 	default:
1785 		rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia);
1786 		break;
1787 	}
1788 	if (rc)
1789 		return -1;
1790 
1791 	return nsegs;
1792 }
1793 
1794 int
1795 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1796 		struct rpcrdma_xprt *r_xprt, void *r)
1797 {
1798 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1799 	int nsegs = seg->mr_nsegs, rc;
1800 
1801 	switch (ia->ri_memreg_strategy) {
1802 
1803 #if RPCRDMA_PERSISTENT_REGISTRATION
1804 	case RPCRDMA_ALLPHYSICAL:
1805 		BUG_ON(nsegs != 1);
1806 		rpcrdma_unmap_one(ia, seg);
1807 		rc = 0;
1808 		break;
1809 #endif
1810 
1811 	case RPCRDMA_FRMR:
1812 		rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1813 		break;
1814 
1815 	case RPCRDMA_MTHCAFMR:
1816 		rc = rpcrdma_deregister_fmr_external(seg, ia);
1817 		break;
1818 
1819 	case RPCRDMA_MEMWINDOWS_ASYNC:
1820 	case RPCRDMA_MEMWINDOWS:
1821 		rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r);
1822 		break;
1823 
1824 	default:
1825 		rc = rpcrdma_deregister_default_external(seg, ia);
1826 		break;
1827 	}
1828 	if (r) {
1829 		struct rpcrdma_rep *rep = r;
1830 		void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1831 		rep->rr_func = NULL;
1832 		func(rep);	/* dereg done, callback now */
1833 	}
1834 	return nsegs;
1835 }
1836 
1837 /*
1838  * Prepost any receive buffer, then post send.
1839  *
1840  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1841  */
1842 int
1843 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1844 		struct rpcrdma_ep *ep,
1845 		struct rpcrdma_req *req)
1846 {
1847 	struct ib_send_wr send_wr, *send_wr_fail;
1848 	struct rpcrdma_rep *rep = req->rl_reply;
1849 	int rc;
1850 
1851 	if (rep) {
1852 		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1853 		if (rc)
1854 			goto out;
1855 		req->rl_reply = NULL;
1856 	}
1857 
1858 	send_wr.next = NULL;
1859 	send_wr.wr_id = 0ULL;	/* no send cookie */
1860 	send_wr.sg_list = req->rl_send_iov;
1861 	send_wr.num_sge = req->rl_niovs;
1862 	send_wr.opcode = IB_WR_SEND;
1863 	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
1864 		ib_dma_sync_single_for_device(ia->ri_id->device,
1865 			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1866 			DMA_TO_DEVICE);
1867 	ib_dma_sync_single_for_device(ia->ri_id->device,
1868 		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1869 		DMA_TO_DEVICE);
1870 	ib_dma_sync_single_for_device(ia->ri_id->device,
1871 		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1872 		DMA_TO_DEVICE);
1873 
1874 	if (DECR_CQCOUNT(ep) > 0)
1875 		send_wr.send_flags = 0;
1876 	else { /* Provider must take a send completion every now and then */
1877 		INIT_CQCOUNT(ep);
1878 		send_wr.send_flags = IB_SEND_SIGNALED;
1879 	}
1880 
1881 	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1882 	if (rc)
1883 		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1884 			rc);
1885 out:
1886 	return rc;
1887 }
1888 
1889 /*
1890  * (Re)post a receive buffer.
1891  */
1892 int
1893 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1894 		     struct rpcrdma_ep *ep,
1895 		     struct rpcrdma_rep *rep)
1896 {
1897 	struct ib_recv_wr recv_wr, *recv_wr_fail;
1898 	int rc;
1899 
1900 	recv_wr.next = NULL;
1901 	recv_wr.wr_id = (u64) (unsigned long) rep;
1902 	recv_wr.sg_list = &rep->rr_iov;
1903 	recv_wr.num_sge = 1;
1904 
1905 	ib_dma_sync_single_for_cpu(ia->ri_id->device,
1906 		rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1907 
1908 	DECR_CQCOUNT(ep);
1909 	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1910 
1911 	if (rc)
1912 		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1913 			rc);
1914 	return rc;
1915 }
1916