xref: /openbmc/linux/net/sunrpc/xprtrdma/verbs.c (revision d0b73b48)
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39 
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49 
50 #include <linux/interrupt.h>
51 #include <linux/pci.h>	/* for Tavor hack below */
52 #include <linux/slab.h>
53 
54 #include "xprt_rdma.h"
55 
56 /*
57  * Globals/Macros
58  */
59 
60 #ifdef RPC_DEBUG
61 # define RPCDBG_FACILITY	RPCDBG_TRANS
62 #endif
63 
64 /*
65  * internal functions
66  */
67 
68 /*
69  * Handle replies in tasklet (softirq) context, using a single, global list.
70  * The tasklet function simply walks the list and invokes each reply's
71  * rr_func callback, or returns the buffer to the pool if none is set.
72  */
73 
74 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
75 static LIST_HEAD(rpcrdma_tasklets_g);
76 
77 static void
78 rpcrdma_run_tasklet(unsigned long data)
79 {
80 	struct rpcrdma_rep *rep;
81 	void (*func)(struct rpcrdma_rep *);
82 	unsigned long flags;
83 
84 	data = data;	/* tasklet data argument is unused */
85 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
86 	while (!list_empty(&rpcrdma_tasklets_g)) {
87 		rep = list_entry(rpcrdma_tasklets_g.next,
88 				 struct rpcrdma_rep, rr_list);
89 		list_del(&rep->rr_list);
90 		func = rep->rr_func;
91 		rep->rr_func = NULL;
92 		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
93 
94 		if (func)
95 			func(rep);
96 		else
97 			rpcrdma_recv_buffer_put(rep);
98 
99 		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
100 	}
101 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
102 }
103 
104 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
105 
106 static inline void
107 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
108 {
109 	unsigned long flags;
110 
111 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
112 	list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
113 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
114 	tasklet_schedule(&rpcrdma_tasklet_g);
115 }
116 
117 static void
118 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
119 {
120 	struct rpcrdma_ep *ep = context;
121 
122 	dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
123 		__func__, event->event, event->device->name, context);
124 	if (ep->rep_connected == 1) {
125 		ep->rep_connected = -EIO;
126 		ep->rep_func(ep);
127 		wake_up_all(&ep->rep_connect_wait);
128 	}
129 }
130 
131 static void
132 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
133 {
134 	struct rpcrdma_ep *ep = context;
135 
136 	dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
137 		__func__, event->event, event->device->name, context);
138 	if (ep->rep_connected == 1) {
139 		ep->rep_connected = -EIO;
140 		ep->rep_func(ep);
141 		wake_up_all(&ep->rep_connect_wait);
142 	}
143 }
144 
145 static inline
146 void rpcrdma_event_process(struct ib_wc *wc)
147 {
148 	struct rpcrdma_mw *frmr;
149 	struct rpcrdma_rep *rep =
150 			(struct rpcrdma_rep *)(unsigned long) wc->wr_id;
151 
152 	dprintk("RPC:       %s: event rep %p status %X opcode %X length %u\n",
153 		__func__, rep, wc->status, wc->opcode, wc->byte_len);
154 
155 	if (!rep) /* send or bind completion that we don't care about */
156 		return;
157 
158 	if (IB_WC_SUCCESS != wc->status) {
159 		dprintk("RPC:       %s: WC opcode %d status %X, connection lost\n",
160 			__func__, wc->opcode, wc->status);
161 		rep->rr_len = ~0U;
162 		if (wc->opcode != IB_WC_FAST_REG_MR && wc->opcode != IB_WC_LOCAL_INV)
163 			rpcrdma_schedule_tasklet(rep);
164 		return;
165 	}
166 
167 	switch (wc->opcode) {
168 	case IB_WC_FAST_REG_MR:
169 		frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
170 		frmr->r.frmr.state = FRMR_IS_VALID;
171 		break;
172 	case IB_WC_LOCAL_INV:
173 		frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
174 		frmr->r.frmr.state = FRMR_IS_INVALID;
175 		break;
176 	case IB_WC_RECV:
177 		rep->rr_len = wc->byte_len;
178 		ib_dma_sync_single_for_cpu(
179 			rdmab_to_ia(rep->rr_buffer)->ri_id->device,
180 			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
181 		/* Keep (only) the most recent credits, after checking validity */
182 		if (rep->rr_len >= 16) {
183 			struct rpcrdma_msg *p =
184 					(struct rpcrdma_msg *) rep->rr_base;
185 			unsigned int credits = ntohl(p->rm_credit);
186 			if (credits == 0) {
187 				dprintk("RPC:       %s: server"
188 					" dropped credits to 0!\n", __func__);
189 				/* don't deadlock */
190 				credits = 1;
191 			} else if (credits > rep->rr_buffer->rb_max_requests) {
192 				dprintk("RPC:       %s: server"
193 					" over-crediting: %d (%d)\n",
194 					__func__, credits,
195 					rep->rr_buffer->rb_max_requests);
196 				credits = rep->rr_buffer->rb_max_requests;
197 			}
198 			atomic_set(&rep->rr_buffer->rb_credits, credits);
199 		}
200 		/* fall through */
201 	case IB_WC_BIND_MW:
202 		rpcrdma_schedule_tasklet(rep);
203 		break;
204 	default:
205 		dprintk("RPC:       %s: unexpected WC event %X\n",
206 			__func__, wc->opcode);
207 		break;
208 	}
209 }
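/*
 * For reference, a minimal stand-alone sketch of the credit extraction
 * performed above. The helper name and parameters are illustrative only
 * (not part of this file); it assumes the RPC/RDMA header begins with
 * four 32-bit big-endian words: xid, vers, credit, type.
 *
 *	#include <arpa/inet.h>
 *	#include <stdint.h>
 *
 *	static unsigned int decode_credits(const uint32_t *hdr,
 *					   unsigned int max_requests)
 *	{
 *		unsigned int credits = ntohl(hdr[2]);	// rm_credit
 *
 *		if (credits == 0)
 *			credits = 1;		// never zero: avoids deadlock
 *		else if (credits > max_requests)
 *			credits = max_requests;	// cap at our request slots
 *		return credits;
 *	}
 */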
210 
211 static inline int
212 rpcrdma_cq_poll(struct ib_cq *cq)
213 {
214 	struct ib_wc wc;
215 	int rc;
216 
217 	for (;;) {
218 		rc = ib_poll_cq(cq, 1, &wc);
219 		if (rc < 0) {
220 			dprintk("RPC:       %s: ib_poll_cq failed %i\n",
221 				__func__, rc);
222 			return rc;
223 		}
224 		if (rc == 0)
225 			break;
226 
227 		rpcrdma_event_process(&wc);
228 	}
229 
230 	return 0;
231 }
232 
233 /*
234  * rpcrdma_cq_event_upcall
235  *
236  * This upcall handles recv, send, bind and unbind events.
237  * It is reentrant but processes single events in order to maintain
238  * ordering of receives to keep server credits.
239  *
240  * It is the responsibility of the scheduled tasklet to return
241  * recv buffers to the pool. NOTE: this affects synchronization of
242  * connection shutdown. That is, the structures required for
243  * the completion of the reply handler must remain intact until
244  * all memory has been reclaimed.
245  *
246  * Note that send events are suppressed and do not result in an upcall.
247  */
248 static void
249 rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
250 {
251 	int rc;
252 
253 	rc = rpcrdma_cq_poll(cq);
254 	if (rc)
255 		return;
256 
257 	rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
258 	if (rc) {
259 		dprintk("RPC:       %s: ib_req_notify_cq failed %i\n",
260 			__func__, rc);
261 		return;
262 	}
263 
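	/* Poll once more: completions that arrived between the first drain
	 * and the re-arm above would otherwise sit unnoticed until the
	 * next interrupt. */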
264 	rpcrdma_cq_poll(cq);
265 }
266 
267 #ifdef RPC_DEBUG
268 static const char * const conn[] = {
269 	"address resolved",
270 	"address error",
271 	"route resolved",
272 	"route error",
273 	"connect request",
274 	"connect response",
275 	"connect error",
276 	"unreachable",
277 	"rejected",
278 	"established",
279 	"disconnected",
280 	"device removal"
281 };
282 #endif
283 
284 static int
285 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
286 {
287 	struct rpcrdma_xprt *xprt = id->context;
288 	struct rpcrdma_ia *ia = &xprt->rx_ia;
289 	struct rpcrdma_ep *ep = &xprt->rx_ep;
290 #ifdef RPC_DEBUG
291 	struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
292 #endif
293 	struct ib_qp_attr attr;
294 	struct ib_qp_init_attr iattr;
295 	int connstate = 0;
296 
297 	switch (event->event) {
298 	case RDMA_CM_EVENT_ADDR_RESOLVED:
299 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
300 		ia->ri_async_rc = 0;
301 		complete(&ia->ri_done);
302 		break;
303 	case RDMA_CM_EVENT_ADDR_ERROR:
304 		ia->ri_async_rc = -EHOSTUNREACH;
305 		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
306 			__func__, ep);
307 		complete(&ia->ri_done);
308 		break;
309 	case RDMA_CM_EVENT_ROUTE_ERROR:
310 		ia->ri_async_rc = -ENETUNREACH;
311 		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
312 			__func__, ep);
313 		complete(&ia->ri_done);
314 		break;
315 	case RDMA_CM_EVENT_ESTABLISHED:
316 		connstate = 1;
317 		ib_query_qp(ia->ri_id->qp, &attr,
318 			IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
319 			&iattr);
320 		dprintk("RPC:       %s: %d responder resources"
321 			" (%d initiator)\n",
322 			__func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
323 		goto connected;
324 	case RDMA_CM_EVENT_CONNECT_ERROR:
325 		connstate = -ENOTCONN;
326 		goto connected;
327 	case RDMA_CM_EVENT_UNREACHABLE:
328 		connstate = -ENETDOWN;
329 		goto connected;
330 	case RDMA_CM_EVENT_REJECTED:
331 		connstate = -ECONNREFUSED;
332 		goto connected;
333 	case RDMA_CM_EVENT_DISCONNECTED:
334 		connstate = -ECONNABORTED;
335 		goto connected;
336 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
337 		connstate = -ENODEV;
338 connected:
339 		dprintk("RPC:       %s: %s: %pI4:%u (ep 0x%p event 0x%x)\n",
340 			__func__,
341 			(event->event <= 11) ? conn[event->event] :
342 						"unknown connection error",
343 			&addr->sin_addr.s_addr,
344 			ntohs(addr->sin_port),
345 			ep, event->event);
346 		atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
347 		dprintk("RPC:       %s: %sconnected\n",
348 					__func__, connstate > 0 ? "" : "dis");
349 		ep->rep_connected = connstate;
350 		ep->rep_func(ep);
351 		wake_up_all(&ep->rep_connect_wait);
352 		break;
353 	default:
354 		dprintk("RPC:       %s: unexpected CM event %d\n",
355 			__func__, event->event);
356 		break;
357 	}
358 
359 #ifdef RPC_DEBUG
360 	if (connstate == 1) {
361 		int ird = attr.max_dest_rd_atomic;
362 		int tird = ep->rep_remote_cma.responder_resources;
363 		printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
364 			"on %s, memreg %d slots %d ird %d%s\n",
365 			&addr->sin_addr.s_addr,
366 			ntohs(addr->sin_port),
367 			ia->ri_id->device->name,
368 			ia->ri_memreg_strategy,
369 			xprt->rx_buf.rb_max_requests,
370 			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
371 	} else if (connstate < 0) {
372 		printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
373 			&addr->sin_addr.s_addr,
374 			ntohs(addr->sin_port),
375 			connstate);
376 	}
377 #endif
378 
379 	return 0;
380 }
381 
382 static struct rdma_cm_id *
383 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
384 			struct rpcrdma_ia *ia, struct sockaddr *addr)
385 {
386 	struct rdma_cm_id *id;
387 	int rc;
388 
389 	init_completion(&ia->ri_done);
390 
391 	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
392 	if (IS_ERR(id)) {
393 		rc = PTR_ERR(id);
394 		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
395 			__func__, rc);
396 		return id;
397 	}
398 
399 	ia->ri_async_rc = -ETIMEDOUT;
400 	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
401 	if (rc) {
402 		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
403 			__func__, rc);
404 		goto out;
405 	}
406 	wait_for_completion_interruptible_timeout(&ia->ri_done,
407 				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
408 	rc = ia->ri_async_rc;
409 	if (rc)
410 		goto out;
411 
412 	ia->ri_async_rc = -ETIMEDOUT;
413 	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
414 	if (rc) {
415 		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
416 			__func__, rc);
417 		goto out;
418 	}
419 	wait_for_completion_interruptible_timeout(&ia->ri_done,
420 				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
421 	rc = ia->ri_async_rc;
422 	if (rc)
423 		goto out;
424 
425 	return id;
426 
427 out:
428 	rdma_destroy_id(id);
429 	return ERR_PTR(rc);
430 }
431 
432 /*
433  * Drain any cq, prior to teardown.
434  */
435 static void
436 rpcrdma_clean_cq(struct ib_cq *cq)
437 {
438 	struct ib_wc wc;
439 	int count = 0;
440 
441 	while (1 == ib_poll_cq(cq, 1, &wc))
442 		++count;
443 
444 	if (count)
445 		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
446 			__func__, count, wc.opcode);
447 }
448 
449 /*
450  * Exported functions.
451  */
452 
453 /*
454  * Open and initialize an Interface Adapter.
455  *  o initializes fields of struct rpcrdma_ia, including
456  *    interface and provider attributes and protection domain.
457  */
458 int
459 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
460 {
461 	int rc, mem_priv;
462 	struct ib_device_attr devattr;
463 	struct rpcrdma_ia *ia = &xprt->rx_ia;
464 
465 	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
466 	if (IS_ERR(ia->ri_id)) {
467 		rc = PTR_ERR(ia->ri_id);
468 		goto out1;
469 	}
470 
471 	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
472 	if (IS_ERR(ia->ri_pd)) {
473 		rc = PTR_ERR(ia->ri_pd);
474 		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
475 			__func__, rc);
476 		goto out2;
477 	}
478 
479 	/*
480 	 * Query the device to determine if the requested memory
481 	 * registration strategy is supported. If it isn't, set the
482 	 * strategy to a globally supported model.
483 	 */
484 	rc = ib_query_device(ia->ri_id->device, &devattr);
485 	if (rc) {
486 		dprintk("RPC:       %s: ib_query_device failed %d\n",
487 			__func__, rc);
488 		goto out2;
489 	}
490 
491 	if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
492 		ia->ri_have_dma_lkey = 1;
493 		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
494 	}
495 
496 	switch (memreg) {
497 	case RPCRDMA_MEMWINDOWS:
498 	case RPCRDMA_MEMWINDOWS_ASYNC:
499 		if (!(devattr.device_cap_flags & IB_DEVICE_MEM_WINDOW)) {
500 			dprintk("RPC:       %s: MEMWINDOWS registration "
501 				"specified but not supported by adapter, "
502 				"using slower RPCRDMA_REGISTER\n",
503 				__func__);
504 			memreg = RPCRDMA_REGISTER;
505 		}
506 		break;
507 	case RPCRDMA_MTHCAFMR:
508 		if (!ia->ri_id->device->alloc_fmr) {
509 #if RPCRDMA_PERSISTENT_REGISTRATION
510 			dprintk("RPC:       %s: MTHCAFMR registration "
511 				"specified but not supported by adapter, "
512 				"using riskier RPCRDMA_ALLPHYSICAL\n",
513 				__func__);
514 			memreg = RPCRDMA_ALLPHYSICAL;
515 #else
516 			dprintk("RPC:       %s: MTHCAFMR registration "
517 				"specified but not supported by adapter, "
518 				"using slower RPCRDMA_REGISTER\n",
519 				__func__);
520 			memreg = RPCRDMA_REGISTER;
521 #endif
522 		}
523 		break;
524 	case RPCRDMA_FRMR:
525 		/* Requires both frmr reg and local dma lkey */
526 		if ((devattr.device_cap_flags &
527 		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
528 		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
529 #if RPCRDMA_PERSISTENT_REGISTRATION
530 			dprintk("RPC:       %s: FRMR registration "
531 				"specified but not supported by adapter, "
532 				"using riskier RPCRDMA_ALLPHYSICAL\n",
533 				__func__);
534 			memreg = RPCRDMA_ALLPHYSICAL;
535 #else
536 			dprintk("RPC:       %s: FRMR registration "
537 				"specified but not supported by adapter, "
538 				"using slower RPCRDMA_REGISTER\n",
539 				__func__);
540 			memreg = RPCRDMA_REGISTER;
541 #endif
542 		}
543 		break;
544 	}
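	/*
	 * Fallbacks applied above, in summary (ALLPHYSICAL is chosen only
	 * when RPCRDMA_PERSISTENT_REGISTRATION is compiled in):
	 *
	 *	requested		missing capability	falls back to
	 *	MEMWINDOWS[_ASYNC]	IB_DEVICE_MEM_WINDOW	REGISTER
	 *	MTHCAFMR		alloc_fmr verb		ALLPHYSICAL/REGISTER
	 *	FRMR			MEM_MGT_EXTENSIONS and	ALLPHYSICAL/REGISTER
	 *				LOCAL_DMA_LKEY
	 */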
545 
546 	/*
547 	 * Optionally obtain an underlying physical identity mapping in
548 	 * order to do a memory window-based bind. This base registration
549 	 * is protected from remote access - that is enabled only by binding
550 	 * for the specific bytes targeted during each RPC operation, and
551 	 * revoked after the corresponding completion similar to a storage
552 	 * adapter.
553 	 */
554 	switch (memreg) {
555 	case RPCRDMA_BOUNCEBUFFERS:
556 	case RPCRDMA_REGISTER:
557 	case RPCRDMA_FRMR:
558 		break;
559 #if RPCRDMA_PERSISTENT_REGISTRATION
560 	case RPCRDMA_ALLPHYSICAL:
561 		mem_priv = IB_ACCESS_LOCAL_WRITE |
562 				IB_ACCESS_REMOTE_WRITE |
563 				IB_ACCESS_REMOTE_READ;
564 		goto register_setup;
565 #endif
566 	case RPCRDMA_MEMWINDOWS_ASYNC:
567 	case RPCRDMA_MEMWINDOWS:
568 		mem_priv = IB_ACCESS_LOCAL_WRITE |
569 				IB_ACCESS_MW_BIND;
570 		goto register_setup;
571 	case RPCRDMA_MTHCAFMR:
572 		if (ia->ri_have_dma_lkey)
573 			break;
574 		mem_priv = IB_ACCESS_LOCAL_WRITE;
575 	register_setup:
576 		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
577 		if (IS_ERR(ia->ri_bind_mem)) {
578 			printk(KERN_ALERT "%s: ib_get_dma_mr for "
579 				"phys register failed with %lX\n\t"
580 				"Will continue with degraded performance\n",
581 				__func__, PTR_ERR(ia->ri_bind_mem));
582 			memreg = RPCRDMA_REGISTER;
583 			ia->ri_bind_mem = NULL;
584 		}
585 		break;
586 	default:
587 		printk(KERN_ERR "%s: invalid memory registration mode %d\n",
588 				__func__, memreg);
589 		rc = -EINVAL;
590 		goto out2;
591 	}
592 	dprintk("RPC:       %s: memory registration strategy is %d\n",
593 		__func__, memreg);
594 
595 	/* Else will do memory reg/dereg for each chunk */
596 	ia->ri_memreg_strategy = memreg;
597 
598 	return 0;
599 out2:
600 	rdma_destroy_id(ia->ri_id);
601 	ia->ri_id = NULL;
602 out1:
603 	return rc;
604 }
605 
606 /*
607  * Clean up/close an IA.
608  *   o if event handles and PD have been initialized, free them.
609  *   o close the IA
610  */
611 void
612 rpcrdma_ia_close(struct rpcrdma_ia *ia)
613 {
614 	int rc;
615 
616 	dprintk("RPC:       %s: entering\n", __func__);
617 	if (ia->ri_bind_mem != NULL) {
618 		rc = ib_dereg_mr(ia->ri_bind_mem);
619 		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
620 			__func__, rc);
621 	}
622 	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
623 		if (ia->ri_id->qp)
624 			rdma_destroy_qp(ia->ri_id);
625 		rdma_destroy_id(ia->ri_id);
626 		ia->ri_id = NULL;
627 	}
628 	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
629 		rc = ib_dealloc_pd(ia->ri_pd);
630 		dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
631 			__func__, rc);
632 	}
633 }
634 
635 /*
636  * Create unconnected endpoint.
637  */
638 int
639 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
640 				struct rpcrdma_create_data_internal *cdata)
641 {
642 	struct ib_device_attr devattr;
643 	int rc, err;
644 
645 	rc = ib_query_device(ia->ri_id->device, &devattr);
646 	if (rc) {
647 		dprintk("RPC:       %s: ib_query_device failed %d\n",
648 			__func__, rc);
649 		return rc;
650 	}
651 
652 	/* check provider's send/recv wr limits */
653 	if (cdata->max_requests > devattr.max_qp_wr)
654 		cdata->max_requests = devattr.max_qp_wr;
655 
656 	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
657 	ep->rep_attr.qp_context = ep;
658 	/* send_cq and recv_cq initialized below */
659 	ep->rep_attr.srq = NULL;
660 	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
661 	switch (ia->ri_memreg_strategy) {
662 	case RPCRDMA_FRMR:
663 		/* Add room for frmr register and invalidate WRs.
664 		 * 1. FRMR reg WR for head
665 		 * 2. FRMR invalidate WR for head
666 		 * 3. FRMR reg WR for pagelist
667 		 * 4. FRMR invalidate WR for pagelist
668 		 * 5. FRMR reg WR for tail
669 		 * 6. FRMR invalidate WR for tail
670 		 * 7. The RDMA_SEND WR
671 		 */
672 		ep->rep_attr.cap.max_send_wr *= 7;
673 		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
674 			cdata->max_requests = devattr.max_qp_wr / 7;
675 			if (!cdata->max_requests)
676 				return -EINVAL;
677 			ep->rep_attr.cap.max_send_wr = cdata->max_requests * 7;
678 		}
679 		break;
680 	case RPCRDMA_MEMWINDOWS_ASYNC:
681 	case RPCRDMA_MEMWINDOWS:
682 		/* Add room for mw_binds+unbinds - overkill! */
683 		ep->rep_attr.cap.max_send_wr++;
684 		ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
685 		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
686 			return -EINVAL;
687 		break;
688 	default:
689 		break;
690 	}
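	/*
	 * Worked example with illustrative numbers: under RPCRDMA_FRMR,
	 * max_requests = 64 asks for 64 * 7 = 448 send WRs; if the device
	 * advertised max_qp_wr = 256, max_requests would be scaled back to
	 * 256 / 7 = 36 and max_send_wr to 36 * 7 = 252.
	 */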
691 	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
692 	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
693 	ep->rep_attr.cap.max_recv_sge = 1;
694 	ep->rep_attr.cap.max_inline_data = 0;
695 	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
696 	ep->rep_attr.qp_type = IB_QPT_RC;
697 	ep->rep_attr.port_num = ~0;
698 
699 	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
700 		"iovs: send %d recv %d\n",
701 		__func__,
702 		ep->rep_attr.cap.max_send_wr,
703 		ep->rep_attr.cap.max_recv_wr,
704 		ep->rep_attr.cap.max_send_sge,
705 		ep->rep_attr.cap.max_recv_sge);
706 
707 	/* set trigger for requesting send completion */
708 	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/;
709 	switch (ia->ri_memreg_strategy) {
710 	case RPCRDMA_MEMWINDOWS_ASYNC:
711 	case RPCRDMA_MEMWINDOWS:
712 		ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
713 		break;
714 	default:
715 		break;
716 	}
717 	if (ep->rep_cqinit <= 2)
718 		ep->rep_cqinit = 0;
719 	INIT_CQCOUNT(ep);
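	/*
	 * rep_cqinit is the send-completion budget consumed by
	 * rpcrdma_ep_post() below: sends are left unsignaled while
	 * DECR_CQCOUNT() stays positive, then a single signaled send is
	 * requested and the counter is reset with INIT_CQCOUNT(). The
	 * macros are assumed to wrap an atomic counter in xprt_rdma.h.
	 */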
720 	ep->rep_ia = ia;
721 	init_waitqueue_head(&ep->rep_connect_wait);
722 
723 	/*
724 	 * Create a single cq for receive dto and mw_bind (only ever
725 	 * care about unbind, really). Send completions are suppressed.
726 	 * Use single threaded tasklet upcalls to maintain ordering.
727 	 */
728 	ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
729 				  rpcrdma_cq_async_error_upcall, NULL,
730 				  ep->rep_attr.cap.max_recv_wr +
731 				  ep->rep_attr.cap.max_send_wr + 1, 0);
732 	if (IS_ERR(ep->rep_cq)) {
733 		rc = PTR_ERR(ep->rep_cq);
734 		dprintk("RPC:       %s: ib_create_cq failed: %i\n",
735 			__func__, rc);
736 		goto out1;
737 	}
738 
739 	rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
740 	if (rc) {
741 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
742 			__func__, rc);
743 		goto out2;
744 	}
745 
746 	ep->rep_attr.send_cq = ep->rep_cq;
747 	ep->rep_attr.recv_cq = ep->rep_cq;
748 
749 	/* Initialize cma parameters */
750 
751 	/* RPC/RDMA does not use private data */
752 	ep->rep_remote_cma.private_data = NULL;
753 	ep->rep_remote_cma.private_data_len = 0;
754 
755 	/* Client offers RDMA Read but does not initiate */
756 	ep->rep_remote_cma.initiator_depth = 0;
757 	if (ia->ri_memreg_strategy == RPCRDMA_BOUNCEBUFFERS)
758 		ep->rep_remote_cma.responder_resources = 0;
759 	else if (devattr.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
760 		ep->rep_remote_cma.responder_resources = 32;
761 	else
762 		ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
763 
764 	ep->rep_remote_cma.retry_count = 7;
765 	ep->rep_remote_cma.flow_control = 0;
766 	ep->rep_remote_cma.rnr_retry_count = 0;
767 
768 	return 0;
769 
770 out2:
771 	err = ib_destroy_cq(ep->rep_cq);
772 	if (err)
773 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
774 			__func__, err);
775 out1:
776 	return rc;
777 }
778 
779 /*
780  * rpcrdma_ep_destroy
781  *
782  * Disconnect and destroy endpoint. After this, the only
783  * valid operations on the ep are to free it (if dynamically
784  * allocated) or re-create it.
785  *
786  * The caller's error handling must be sure to not leak the endpoint
787  * if this function fails.
788  */
789 int
790 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
791 {
792 	int rc;
793 
794 	dprintk("RPC:       %s: entering, connected is %d\n",
795 		__func__, ep->rep_connected);
796 
797 	if (ia->ri_id->qp) {
798 		rc = rpcrdma_ep_disconnect(ep, ia);
799 		if (rc)
800 			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
801 				" returned %i\n", __func__, rc);
802 		rdma_destroy_qp(ia->ri_id);
803 		ia->ri_id->qp = NULL;
804 	}
805 
806 	/* padding - could be done in rpcrdma_buffer_destroy... */
807 	if (ep->rep_pad_mr) {
808 		rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
809 		ep->rep_pad_mr = NULL;
810 	}
811 
812 	rpcrdma_clean_cq(ep->rep_cq);
813 	rc = ib_destroy_cq(ep->rep_cq);
814 	if (rc)
815 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
816 			__func__, rc);
817 
818 	return rc;
819 }
820 
821 /*
822  * Connect unconnected endpoint.
823  */
824 int
825 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
826 {
827 	struct rdma_cm_id *id;
828 	int rc = 0;
829 	int retry_count = 0;
830 
831 	if (ep->rep_connected != 0) {
832 		struct rpcrdma_xprt *xprt;
833 retry:
834 		rc = rpcrdma_ep_disconnect(ep, ia);
835 		if (rc && rc != -ENOTCONN)
836 			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
837 				" status %i\n", __func__, rc);
838 		rpcrdma_clean_cq(ep->rep_cq);
839 
840 		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
841 		id = rpcrdma_create_id(xprt, ia,
842 				(struct sockaddr *)&xprt->rx_data.addr);
843 		if (IS_ERR(id)) {
844 			rc = PTR_ERR(id);
845 			goto out;
846 		}
847 		/* TEMP TEMP TEMP - fail if new device:
848 		 * Deregister/remarshal *all* requests!
849 		 * Close and recreate adapter, pd, etc!
850 		 * Re-determine all attributes still sane!
851 		 * More stuff I haven't thought of!
852 		 * Rrrgh!
853 		 */
854 		if (ia->ri_id->device != id->device) {
855 			printk("RPC:       %s: can't reconnect on "
856 				"different device!\n", __func__);
857 			rdma_destroy_id(id);
858 			rc = -ENETDOWN;
859 			goto out;
860 		}
861 		/* END TEMP */
862 		rdma_destroy_qp(ia->ri_id);
863 		rdma_destroy_id(ia->ri_id);
864 		ia->ri_id = id;
865 	}
866 
867 	rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
868 	if (rc) {
869 		dprintk("RPC:       %s: rdma_create_qp failed %i\n",
870 			__func__, rc);
871 		goto out;
872 	}
873 
874 /* XXX Tavor device performs badly with 2K MTU! */
875 if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
876 	struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
877 	if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
878 	    (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
879 	     pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
880 		struct ib_qp_attr attr = {
881 			.path_mtu = IB_MTU_1024
882 		};
883 		rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
884 	}
885 }
886 
887 	ep->rep_connected = 0;
888 
889 	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
890 	if (rc) {
891 		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
892 				__func__, rc);
893 		goto out;
894 	}
895 
896 	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
897 
898 	/*
899 	 * Check state. A non-peer reject indicates no listener
900 	 * (ECONNREFUSED), which may be a transient state. All
901 	 * others indicate a transport condition for which best-effort
902 	 * recovery has already been attempted.
903 	 */
904 	if (ep->rep_connected == -ECONNREFUSED &&
905 	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
906 		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
907 		goto retry;
908 	}
909 	if (ep->rep_connected <= 0) {
910 		/* Sometimes, the only way to reliably connect to remote
911 		 * CMs is to use the same nonzero values for ORD and IRD. */
912 		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
913 		    (ep->rep_remote_cma.responder_resources == 0 ||
914 		     ep->rep_remote_cma.initiator_depth !=
915 				ep->rep_remote_cma.responder_resources)) {
916 			if (ep->rep_remote_cma.responder_resources == 0)
917 				ep->rep_remote_cma.responder_resources = 1;
918 			ep->rep_remote_cma.initiator_depth =
919 				ep->rep_remote_cma.responder_resources;
920 			goto retry;
921 		}
922 		rc = ep->rep_connected;
923 	} else {
924 		dprintk("RPC:       %s: connected\n", __func__);
925 	}
926 
927 out:
928 	if (rc)
929 		ep->rep_connected = rc;
930 	return rc;
931 }
932 
933 /*
934  * rpcrdma_ep_disconnect
935  *
936  * This is separate from destroy to facilitate the ability
937  * to reconnect without recreating the endpoint.
938  *
939  * This call is not reentrant, and must not be made in parallel
940  * on the same endpoint.
941  */
942 int
943 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
944 {
945 	int rc;
946 
947 	rpcrdma_clean_cq(ep->rep_cq);
948 	rc = rdma_disconnect(ia->ri_id);
949 	if (!rc) {
950 		/* returns without wait if not connected */
951 		wait_event_interruptible(ep->rep_connect_wait,
952 							ep->rep_connected != 1);
953 		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
954 			(ep->rep_connected == 1) ? "still " : "dis");
955 	} else {
956 		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
957 		ep->rep_connected = rc;
958 	}
959 	return rc;
960 }
961 
962 /*
963  * Initialize buffer memory
964  */
965 int
966 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
967 	struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
968 {
969 	char *p;
970 	size_t len;
971 	int i, rc;
972 	struct rpcrdma_mw *r;
973 
974 	buf->rb_max_requests = cdata->max_requests;
975 	spin_lock_init(&buf->rb_lock);
976 	atomic_set(&buf->rb_credits, 1);
977 
978 	/* Need to allocate:
979 	 *   1.  arrays for send and recv pointers
980 	 *   2.  arrays of struct rpcrdma_req to fill in pointers
981 	 *   3.  array of struct rpcrdma_rep for replies
982 	 *   4.  padding, if any
983 	 *   5.  mw's, fmr's or frmr's, if any
984 	 * Send/recv buffers in req/rep need to be registered
985 	 */
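	/* The single rb_pool allocation is carved up below in this order:
	 *
	 *	+----------------+----------------+---------+---------------+
	 *	| rb_send_bufs[] | rb_recv_bufs[] | padding | mw/fmr/frmr[] |
	 *	+----------------+----------------+---------+---------------+
	 */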
986 
987 	len = buf->rb_max_requests *
988 		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
989 	len += cdata->padding;
990 	switch (ia->ri_memreg_strategy) {
991 	case RPCRDMA_FRMR:
992 		len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
993 				sizeof(struct rpcrdma_mw);
994 		break;
995 	case RPCRDMA_MTHCAFMR:
996 		/* TBD we are perhaps overallocating here */
997 		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
998 				sizeof(struct rpcrdma_mw);
999 		break;
1000 	case RPCRDMA_MEMWINDOWS_ASYNC:
1001 	case RPCRDMA_MEMWINDOWS:
1002 		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
1003 				sizeof(struct rpcrdma_mw);
1004 		break;
1005 	default:
1006 		break;
1007 	}
1008 
1009 	/* allocate 1, 4 and 5 in one shot */
1010 	p = kzalloc(len, GFP_KERNEL);
1011 	if (p == NULL) {
1012 		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1013 			__func__, len);
1014 		rc = -ENOMEM;
1015 		goto out;
1016 	}
1017 	buf->rb_pool = p;	/* for freeing it later */
1018 
1019 	buf->rb_send_bufs = (struct rpcrdma_req **) p;
1020 	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1021 	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1022 	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1023 
1024 	/*
1025 	 * Register the zeroed pad buffer, if any.
1026 	 */
1027 	if (cdata->padding) {
1028 		rc = rpcrdma_register_internal(ia, p, cdata->padding,
1029 					    &ep->rep_pad_mr, &ep->rep_pad);
1030 		if (rc)
1031 			goto out;
1032 	}
1033 	p += cdata->padding;
1034 
1035 	/*
1036 	 * Allocate the fmr's, or mw's for mw_bind chunk registration.
1037 	 * We "cycle" the mw's in order to minimize rkey reuse,
1038 	 * and also reduce unbind-to-bind collision.
1039 	 */
1040 	INIT_LIST_HEAD(&buf->rb_mws);
1041 	r = (struct rpcrdma_mw *)p;
1042 	switch (ia->ri_memreg_strategy) {
1043 	case RPCRDMA_FRMR:
1044 		for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1045 			r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1046 							 RPCRDMA_MAX_SEGS);
1047 			if (IS_ERR(r->r.frmr.fr_mr)) {
1048 				rc = PTR_ERR(r->r.frmr.fr_mr);
1049 				dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1050 					" failed %i\n", __func__, rc);
1051 				goto out;
1052 			}
1053 			r->r.frmr.fr_pgl =
1054 				ib_alloc_fast_reg_page_list(ia->ri_id->device,
1055 							    RPCRDMA_MAX_SEGS);
1056 			if (IS_ERR(r->r.frmr.fr_pgl)) {
1057 				rc = PTR_ERR(r->r.frmr.fr_pgl);
1058 				dprintk("RPC:       %s: "
1059 					"ib_alloc_fast_reg_page_list "
1060 					"failed %i\n", __func__, rc);
1061 				goto out;
1062 			}
1063 			list_add(&r->mw_list, &buf->rb_mws);
1064 			++r;
1065 		}
1066 		break;
1067 	case RPCRDMA_MTHCAFMR:
1068 		/* TBD we are perhaps overallocating here */
1069 		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1070 			static struct ib_fmr_attr fa =
1071 				{ RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1072 			r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1073 				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1074 				&fa);
1075 			if (IS_ERR(r->r.fmr)) {
1076 				rc = PTR_ERR(r->r.fmr);
1077 				dprintk("RPC:       %s: ib_alloc_fmr"
1078 					" failed %i\n", __func__, rc);
1079 				goto out;
1080 			}
1081 			list_add(&r->mw_list, &buf->rb_mws);
1082 			++r;
1083 		}
1084 		break;
1085 	case RPCRDMA_MEMWINDOWS_ASYNC:
1086 	case RPCRDMA_MEMWINDOWS:
1087 		/* Allocate one extra request's worth, for full cycling */
1088 		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1089 			r->r.mw = ib_alloc_mw(ia->ri_pd);
1090 			if (IS_ERR(r->r.mw)) {
1091 				rc = PTR_ERR(r->r.mw);
1092 				dprintk("RPC:       %s: ib_alloc_mw"
1093 					" failed %i\n", __func__, rc);
1094 				goto out;
1095 			}
1096 			list_add(&r->mw_list, &buf->rb_mws);
1097 			++r;
1098 		}
1099 		break;
1100 	default:
1101 		break;
1102 	}
1103 
1104 	/*
1105 	 * Allocate/init the request/reply buffers. Doing this
1106 	 * using kmalloc for now -- one for each buf.
1107 	 */
1108 	for (i = 0; i < buf->rb_max_requests; i++) {
1109 		struct rpcrdma_req *req;
1110 		struct rpcrdma_rep *rep;
1111 
1112 		len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
1113 		/* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
1114 		/* Typical ~2400b, so rounding up saves work later */
1115 		if (len < 4096)
1116 			len = 4096;
1117 		req = kmalloc(len, GFP_KERNEL);
1118 		if (req == NULL) {
1119 			dprintk("RPC:       %s: request buffer %d alloc"
1120 				" failed\n", __func__, i);
1121 			rc = -ENOMEM;
1122 			goto out;
1123 		}
1124 		memset(req, 0, sizeof(struct rpcrdma_req));
1125 		buf->rb_send_bufs[i] = req;
1126 		buf->rb_send_bufs[i]->rl_buffer = buf;
1127 
1128 		rc = rpcrdma_register_internal(ia, req->rl_base,
1129 				len - offsetof(struct rpcrdma_req, rl_base),
1130 				&buf->rb_send_bufs[i]->rl_handle,
1131 				&buf->rb_send_bufs[i]->rl_iov);
1132 		if (rc)
1133 			goto out;
1134 
1135 		buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1136 
1137 		len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1138 		rep = kmalloc(len, GFP_KERNEL);
1139 		if (rep == NULL) {
1140 			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1141 				__func__, i);
1142 			rc = -ENOMEM;
1143 			goto out;
1144 		}
1145 		memset(rep, 0, sizeof(struct rpcrdma_rep));
1146 		buf->rb_recv_bufs[i] = rep;
1147 		buf->rb_recv_bufs[i]->rr_buffer = buf;
1148 		init_waitqueue_head(&rep->rr_unbind);
1149 
1150 		rc = rpcrdma_register_internal(ia, rep->rr_base,
1151 				len - offsetof(struct rpcrdma_rep, rr_base),
1152 				&buf->rb_recv_bufs[i]->rr_handle,
1153 				&buf->rb_recv_bufs[i]->rr_iov);
1154 		if (rc)
1155 			goto out;
1156 
1157 	}
1158 	dprintk("RPC:       %s: max_requests %d\n",
1159 		__func__, buf->rb_max_requests);
1160 	/* done */
1161 	return 0;
1162 out:
1163 	rpcrdma_buffer_destroy(buf);
1164 	return rc;
1165 }
1166 
1167 /*
1168  * Unregister and destroy buffer memory. Need to deal with
1169  * partial initialization, so it's callable from failed create.
1170  * Must be called before destroying endpoint, as registrations
1171  * reference it.
1172  */
1173 void
1174 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1175 {
1176 	int rc, i;
1177 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1178 	struct rpcrdma_mw *r;
1179 
1180 	/* clean up in reverse order from create
1181 	 *   1.  recv mr memory (mr free, then kfree)
1182 	 *   1a. bind mw memory
1183 	 *   2.  send mr memory (mr free, then kfree)
1184 	 *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1185 	 *   4.  arrays
1186 	 */
1187 	dprintk("RPC:       %s: entering\n", __func__);
1188 
1189 	for (i = 0; i < buf->rb_max_requests; i++) {
1190 		if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1191 			rpcrdma_deregister_internal(ia,
1192 					buf->rb_recv_bufs[i]->rr_handle,
1193 					&buf->rb_recv_bufs[i]->rr_iov);
1194 			kfree(buf->rb_recv_bufs[i]);
1195 		}
1196 		if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1197 			while (!list_empty(&buf->rb_mws)) {
1198 				r = list_entry(buf->rb_mws.next,
1199 					struct rpcrdma_mw, mw_list);
1200 				list_del(&r->mw_list);
1201 				switch (ia->ri_memreg_strategy) {
1202 				case RPCRDMA_FRMR:
1203 					rc = ib_dereg_mr(r->r.frmr.fr_mr);
1204 					if (rc)
1205 						dprintk("RPC:       %s:"
1206 							" ib_dereg_mr"
1207 							" failed %i\n",
1208 							__func__, rc);
1209 					ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1210 					break;
1211 				case RPCRDMA_MTHCAFMR:
1212 					rc = ib_dealloc_fmr(r->r.fmr);
1213 					if (rc)
1214 						dprintk("RPC:       %s:"
1215 							" ib_dealloc_fmr"
1216 							" failed %i\n",
1217 							__func__, rc);
1218 					break;
1219 				case RPCRDMA_MEMWINDOWS_ASYNC:
1220 				case RPCRDMA_MEMWINDOWS:
1221 					rc = ib_dealloc_mw(r->r.mw);
1222 					if (rc)
1223 						dprintk("RPC:       %s:"
1224 							" ib_dealloc_mw"
1225 							" failed %i\n",
1226 							__func__, rc);
1227 					break;
1228 				default:
1229 					break;
1230 				}
1231 			}
1232 			rpcrdma_deregister_internal(ia,
1233 					buf->rb_send_bufs[i]->rl_handle,
1234 					&buf->rb_send_bufs[i]->rl_iov);
1235 			kfree(buf->rb_send_bufs[i]);
1236 		}
1237 	}
1238 
1239 	kfree(buf->rb_pool);
1240 }
1241 
1242 /*
1243  * Get a set of request/reply buffers.
1244  *
1245  * Reply buffer (if needed) is attached to send buffer upon return.
1246  * Rule:
1247  *    rb_send_index and rb_recv_index MUST always be pointing to the
1248  *    *next* available buffer (non-NULL). They are incremented after
1249  *    removing buffers, and decremented *before* returning them.
1250  */
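/*
 * Example with rb_max_requests = 2: initially rb_send_index ==
 * rb_recv_index == 0. One rpcrdma_buffer_get() hands out send/recv pair 0,
 * NULLs both slots and advances both indices to 1; the matching
 * rpcrdma_buffer_put() pre-decrements back to 0 and restores the pointers.
 */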
1251 struct rpcrdma_req *
1252 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1253 {
1254 	struct rpcrdma_req *req;
1255 	unsigned long flags;
1256 	int i;
1257 	struct rpcrdma_mw *r;
1258 
1259 	spin_lock_irqsave(&buffers->rb_lock, flags);
1260 	if (buffers->rb_send_index == buffers->rb_max_requests) {
1261 		spin_unlock_irqrestore(&buffers->rb_lock, flags);
1262 		dprintk("RPC:       %s: out of request buffers\n", __func__);
1263 		return ((struct rpcrdma_req *)NULL);
1264 	}
1265 
1266 	req = buffers->rb_send_bufs[buffers->rb_send_index];
1267 	if (buffers->rb_send_index < buffers->rb_recv_index) {
1268 		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1269 			__func__,
1270 			buffers->rb_recv_index - buffers->rb_send_index);
1271 		req->rl_reply = NULL;
1272 	} else {
1273 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1274 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1275 	}
1276 	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1277 	if (!list_empty(&buffers->rb_mws)) {
1278 		i = RPCRDMA_MAX_SEGS - 1;
1279 		do {
1280 			r = list_entry(buffers->rb_mws.next,
1281 					struct rpcrdma_mw, mw_list);
1282 			list_del(&r->mw_list);
1283 			req->rl_segments[i].mr_chunk.rl_mw = r;
1284 		} while (--i >= 0);
1285 	}
1286 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1287 	return req;
1288 }
1289 
1290 /*
1291  * Put request/reply buffers back into pool.
1292  * Pre-decrement counter/array index.
1293  */
1294 void
1295 rpcrdma_buffer_put(struct rpcrdma_req *req)
1296 {
1297 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1298 	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1299 	int i;
1300 	unsigned long flags;
1301 
1302 	BUG_ON(req->rl_nchunks != 0);
1303 	spin_lock_irqsave(&buffers->rb_lock, flags);
1304 	buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1305 	req->rl_niovs = 0;
1306 	if (req->rl_reply) {
1307 		buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1308 		init_waitqueue_head(&req->rl_reply->rr_unbind);
1309 		req->rl_reply->rr_func = NULL;
1310 		req->rl_reply = NULL;
1311 	}
1312 	switch (ia->ri_memreg_strategy) {
1313 	case RPCRDMA_FRMR:
1314 	case RPCRDMA_MTHCAFMR:
1315 	case RPCRDMA_MEMWINDOWS_ASYNC:
1316 	case RPCRDMA_MEMWINDOWS:
1317 		/*
1318 		 * Cycle mw's back in reverse order, and "spin" them.
1319 		 * This delays and scrambles reuse as much as possible.
1320 		 */
1321 		i = 1;
1322 		do {
1323 			struct rpcrdma_mw **mw;
1324 			mw = &req->rl_segments[i].mr_chunk.rl_mw;
1325 			list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1326 			*mw = NULL;
1327 		} while (++i < RPCRDMA_MAX_SEGS);
1328 		list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1329 					&buffers->rb_mws);
1330 		req->rl_segments[0].mr_chunk.rl_mw = NULL;
1331 		break;
1332 	default:
1333 		break;
1334 	}
1335 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1336 }
1337 
1338 /*
1339  * Recover reply buffers from pool.
1340  * This happens when recovering from error conditions.
1341  * Post-increment counter/array index.
1342  */
1343 void
1344 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1345 {
1346 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1347 	unsigned long flags;
1348 
1349 	if (req->rl_iov.length == 0)	/* special case xprt_rdma_allocate() */
1350 		buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1351 	spin_lock_irqsave(&buffers->rb_lock, flags);
1352 	if (buffers->rb_recv_index < buffers->rb_max_requests) {
1353 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1354 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1355 	}
1356 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1357 }
1358 
1359 /*
1360  * Put reply buffers back into pool when not attached to
1361  * request. This happens in error conditions, and when
1362  * aborting unbinds. Pre-decrement counter/array index.
1363  */
1364 void
1365 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1366 {
1367 	struct rpcrdma_buffer *buffers = rep->rr_buffer;
1368 	unsigned long flags;
1369 
1370 	rep->rr_func = NULL;
1371 	spin_lock_irqsave(&buffers->rb_lock, flags);
1372 	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1373 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1374 }
1375 
1376 /*
1377  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1378  */
1379 
1380 int
1381 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1382 				struct ib_mr **mrp, struct ib_sge *iov)
1383 {
1384 	struct ib_phys_buf ipb;
1385 	struct ib_mr *mr;
1386 	int rc;
1387 
1388 	/*
1389 	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1390 	 */
1391 	iov->addr = ib_dma_map_single(ia->ri_id->device,
1392 			va, len, DMA_BIDIRECTIONAL);
1393 	iov->length = len;
1394 
1395 	if (ia->ri_have_dma_lkey) {
1396 		*mrp = NULL;
1397 		iov->lkey = ia->ri_dma_lkey;
1398 		return 0;
1399 	} else if (ia->ri_bind_mem != NULL) {
1400 		*mrp = NULL;
1401 		iov->lkey = ia->ri_bind_mem->lkey;
1402 		return 0;
1403 	}
1404 
1405 	ipb.addr = iov->addr;
1406 	ipb.size = iov->length;
1407 	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1408 			IB_ACCESS_LOCAL_WRITE, &iov->addr);
1409 
1410 	dprintk("RPC:       %s: phys convert: 0x%llx "
1411 			"registered 0x%llx length %d\n",
1412 			__func__, (unsigned long long)ipb.addr,
1413 			(unsigned long long)iov->addr, len);
1414 
1415 	if (IS_ERR(mr)) {
1416 		*mrp = NULL;
1417 		rc = PTR_ERR(mr);
1418 		dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1419 	} else {
1420 		*mrp = mr;
1421 		iov->lkey = mr->lkey;
1422 		rc = 0;
1423 	}
1424 
1425 	return rc;
1426 }
1427 
1428 int
1429 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1430 				struct ib_mr *mr, struct ib_sge *iov)
1431 {
1432 	int rc;
1433 
1434 	ib_dma_unmap_single(ia->ri_id->device,
1435 			iov->addr, iov->length, DMA_BIDIRECTIONAL);
1436 
1437 	if (NULL == mr)
1438 		return 0;
1439 
1440 	rc = ib_dereg_mr(mr);
1441 	if (rc)
1442 		dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1443 	return rc;
1444 }
1445 
1446 /*
1447  * Wrappers for chunk registration, shared by read/write chunk code.
1448  */
1449 
1450 static void
1451 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1452 {
1453 	seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1454 	seg->mr_dmalen = seg->mr_len;
1455 	if (seg->mr_page)
1456 		seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1457 				seg->mr_page, offset_in_page(seg->mr_offset),
1458 				seg->mr_dmalen, seg->mr_dir);
1459 	else
1460 		seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1461 				seg->mr_offset,
1462 				seg->mr_dmalen, seg->mr_dir);
1463 	if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
1464 		dprintk("RPC:       %s: mr_dma %llx mr_offset %p mr_dma_len %zu\n",
1465 			__func__,
1466 			(unsigned long long)seg->mr_dma,
1467 			seg->mr_offset, seg->mr_dmalen);
1468 	}
1469 }
1470 
1471 static void
1472 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1473 {
1474 	if (seg->mr_page)
1475 		ib_dma_unmap_page(ia->ri_id->device,
1476 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1477 	else
1478 		ib_dma_unmap_single(ia->ri_id->device,
1479 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1480 }
1481 
1482 static int
1483 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1484 			int *nsegs, int writing, struct rpcrdma_ia *ia,
1485 			struct rpcrdma_xprt *r_xprt)
1486 {
1487 	struct rpcrdma_mr_seg *seg1 = seg;
1488 	struct ib_send_wr invalidate_wr, frmr_wr, *bad_wr, *post_wr;
1489 
1490 	u8 key;
1491 	int len, pageoff;
1492 	int i, rc;
1493 	int seg_len;
1494 	u64 pa;
1495 	int page_no;
1496 
1497 	pageoff = offset_in_page(seg1->mr_offset);
1498 	seg1->mr_offset -= pageoff;	/* start of page */
1499 	seg1->mr_len += pageoff;
1500 	len = -pageoff;
1501 	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1502 		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1503 	for (page_no = i = 0; i < *nsegs;) {
1504 		rpcrdma_map_one(ia, seg, writing);
1505 		pa = seg->mr_dma;
1506 		for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
1507 			seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->
1508 				page_list[page_no++] = pa;
1509 			pa += PAGE_SIZE;
1510 		}
1511 		len += seg->mr_len;
1512 		++seg;
1513 		++i;
1514 		/* Check for holes */
1515 		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1516 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1517 			break;
1518 	}
1519 	dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1520 		__func__, seg1->mr_chunk.rl_mw, i);
1521 
1522 	if (unlikely(seg1->mr_chunk.rl_mw->r.frmr.state == FRMR_IS_VALID)) {
1523 		dprintk("RPC:       %s: frmr %x left valid, posting invalidate.\n",
1524 			__func__,
1525 			seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey);
1526 		/* Invalidate before using. */
1527 		memset(&invalidate_wr, 0, sizeof invalidate_wr);
1528 		invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1529 		invalidate_wr.next = &frmr_wr;
1530 		invalidate_wr.opcode = IB_WR_LOCAL_INV;
1531 		invalidate_wr.send_flags = IB_SEND_SIGNALED;
1532 		invalidate_wr.ex.invalidate_rkey =
1533 			seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1534 		DECR_CQCOUNT(&r_xprt->rx_ep);
1535 		post_wr = &invalidate_wr;
1536 	} else
1537 		post_wr = &frmr_wr;
1538 
1539 	/* Bump the key */
1540 	key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1541 	ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
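	/*
	 * The low 8 bits of an FRMR rkey are consumer-settable; bumping
	 * them on every reuse means a stale rkey still held by the peer
	 * from a previous registration (e.g. ...07 versus ...08) no longer
	 * matches and is rejected by the HCA.
	 */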
1542 
1543 	/* Prepare FRMR WR */
1544 	memset(&frmr_wr, 0, sizeof frmr_wr);
1545 	frmr_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1546 	frmr_wr.opcode = IB_WR_FAST_REG_MR;
1547 	frmr_wr.send_flags = IB_SEND_SIGNALED;
1548 	frmr_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1549 	frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1550 	frmr_wr.wr.fast_reg.page_list_len = page_no;
1551 	frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1552 	frmr_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
1553 	BUG_ON(frmr_wr.wr.fast_reg.length < len);
1554 	frmr_wr.wr.fast_reg.access_flags = (writing ?
1555 				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1556 				IB_ACCESS_REMOTE_READ);
1557 	frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1558 	DECR_CQCOUNT(&r_xprt->rx_ep);
1559 
1560 	rc = ib_post_send(ia->ri_id->qp, post_wr, &bad_wr);
1561 
1562 	if (rc) {
1563 		dprintk("RPC:       %s: failed ib_post_send for register,"
1564 			" status %i\n", __func__, rc);
1565 		while (i--)
1566 			rpcrdma_unmap_one(ia, --seg);
1567 	} else {
1568 		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1569 		seg1->mr_base = seg1->mr_dma + pageoff;
1570 		seg1->mr_nsegs = i;
1571 		seg1->mr_len = len;
1572 	}
1573 	*nsegs = i;
1574 	return rc;
1575 }
1576 
1577 static int
1578 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1579 			struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1580 {
1581 	struct rpcrdma_mr_seg *seg1 = seg;
1582 	struct ib_send_wr invalidate_wr, *bad_wr;
1583 	int rc;
1584 
1585 	while (seg1->mr_nsegs--)
1586 		rpcrdma_unmap_one(ia, seg++);
1587 
1588 	memset(&invalidate_wr, 0, sizeof invalidate_wr);
1589 	invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1590 	invalidate_wr.opcode = IB_WR_LOCAL_INV;
1591 	invalidate_wr.send_flags = IB_SEND_SIGNALED;
1592 	invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1593 	DECR_CQCOUNT(&r_xprt->rx_ep);
1594 
1595 	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1596 	if (rc)
1597 		dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1598 			" status %i\n", __func__, rc);
1599 	return rc;
1600 }
1601 
1602 static int
1603 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1604 			int *nsegs, int writing, struct rpcrdma_ia *ia)
1605 {
1606 	struct rpcrdma_mr_seg *seg1 = seg;
1607 	u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1608 	int len, pageoff, i, rc;
1609 
1610 	pageoff = offset_in_page(seg1->mr_offset);
1611 	seg1->mr_offset -= pageoff;	/* start of page */
1612 	seg1->mr_len += pageoff;
1613 	len = -pageoff;
1614 	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1615 		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1616 	for (i = 0; i < *nsegs;) {
1617 		rpcrdma_map_one(ia, seg, writing);
1618 		physaddrs[i] = seg->mr_dma;
1619 		len += seg->mr_len;
1620 		++seg;
1621 		++i;
1622 		/* Check for holes */
1623 		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1624 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1625 			break;
1626 	}
1627 	rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1628 				physaddrs, i, seg1->mr_dma);
1629 	if (rc) {
1630 		dprintk("RPC:       %s: failed ib_map_phys_fmr "
1631 			"%u@0x%llx+%i (%d)... status %i\n", __func__,
1632 			len, (unsigned long long)seg1->mr_dma,
1633 			pageoff, i, rc);
1634 		while (i--)
1635 			rpcrdma_unmap_one(ia, --seg);
1636 	} else {
1637 		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1638 		seg1->mr_base = seg1->mr_dma + pageoff;
1639 		seg1->mr_nsegs = i;
1640 		seg1->mr_len = len;
1641 	}
1642 	*nsegs = i;
1643 	return rc;
1644 }
1645 
1646 static int
1647 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1648 			struct rpcrdma_ia *ia)
1649 {
1650 	struct rpcrdma_mr_seg *seg1 = seg;
1651 	LIST_HEAD(l);
1652 	int rc;
1653 
1654 	list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1655 	rc = ib_unmap_fmr(&l);
1656 	while (seg1->mr_nsegs--)
1657 		rpcrdma_unmap_one(ia, seg++);
1658 	if (rc)
1659 		dprintk("RPC:       %s: failed ib_unmap_fmr,"
1660 			" status %i\n", __func__, rc);
1661 	return rc;
1662 }
1663 
1664 static int
1665 rpcrdma_register_memwin_external(struct rpcrdma_mr_seg *seg,
1666 			int *nsegs, int writing, struct rpcrdma_ia *ia,
1667 			struct rpcrdma_xprt *r_xprt)
1668 {
1669 	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1670 				  IB_ACCESS_REMOTE_READ);
1671 	struct ib_mw_bind param;
1672 	int rc;
1673 
1674 	*nsegs = 1;
1675 	rpcrdma_map_one(ia, seg, writing);
1676 	param.mr = ia->ri_bind_mem;
1677 	param.wr_id = 0ULL;	/* no send cookie */
1678 	param.addr = seg->mr_dma;
1679 	param.length = seg->mr_len;
1680 	param.send_flags = 0;
1681 	param.mw_access_flags = mem_priv;
1682 
1683 	DECR_CQCOUNT(&r_xprt->rx_ep);
1684 	rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1685 	if (rc) {
1686 		dprintk("RPC:       %s: failed ib_bind_mw "
1687 			"%u@0x%llx status %i\n",
1688 			__func__, seg->mr_len,
1689 			(unsigned long long)seg->mr_dma, rc);
1690 		rpcrdma_unmap_one(ia, seg);
1691 	} else {
1692 		seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1693 		seg->mr_base = param.addr;
1694 		seg->mr_nsegs = 1;
1695 	}
1696 	return rc;
1697 }
1698 
1699 static int
1700 rpcrdma_deregister_memwin_external(struct rpcrdma_mr_seg *seg,
1701 			struct rpcrdma_ia *ia,
1702 			struct rpcrdma_xprt *r_xprt, void **r)
1703 {
1704 	struct ib_mw_bind param;
1705 	LIST_HEAD(l);
1706 	int rc;
1707 
1708 	BUG_ON(seg->mr_nsegs != 1);
1709 	param.mr = ia->ri_bind_mem;
1710 	param.addr = 0ULL;	/* unbind */
1711 	param.length = 0;
1712 	param.mw_access_flags = 0;
1713 	if (*r) {
1714 		param.wr_id = (u64) (unsigned long) *r;
1715 		param.send_flags = IB_SEND_SIGNALED;
1716 		INIT_CQCOUNT(&r_xprt->rx_ep);
1717 	} else {
1718 		param.wr_id = 0ULL;
1719 		param.send_flags = 0;
1720 		DECR_CQCOUNT(&r_xprt->rx_ep);
1721 	}
1722 	rc = ib_bind_mw(ia->ri_id->qp, seg->mr_chunk.rl_mw->r.mw, &param);
1723 	rpcrdma_unmap_one(ia, seg);
1724 	if (rc)
1725 		dprintk("RPC:       %s: failed ib_(un)bind_mw,"
1726 			" status %i\n", __func__, rc);
1727 	else
1728 		*r = NULL;	/* will upcall on completion */
1729 	return rc;
1730 }
1731 
1732 static int
1733 rpcrdma_register_default_external(struct rpcrdma_mr_seg *seg,
1734 			int *nsegs, int writing, struct rpcrdma_ia *ia)
1735 {
1736 	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1737 				  IB_ACCESS_REMOTE_READ);
1738 	struct rpcrdma_mr_seg *seg1 = seg;
1739 	struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1740 	int len, i, rc = 0;
1741 
1742 	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1743 		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1744 	for (len = 0, i = 0; i < *nsegs;) {
1745 		rpcrdma_map_one(ia, seg, writing);
1746 		ipb[i].addr = seg->mr_dma;
1747 		ipb[i].size = seg->mr_len;
1748 		len += seg->mr_len;
1749 		++seg;
1750 		++i;
1751 		/* Check for holes */
1752 		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1753 		    offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1754 			break;
1755 	}
1756 	seg1->mr_base = seg1->mr_dma;
1757 	seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1758 				ipb, i, mem_priv, &seg1->mr_base);
1759 	if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1760 		rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1761 		dprintk("RPC:       %s: failed ib_reg_phys_mr "
1762 			"%u@0x%llx (%d)... status %i\n",
1763 			__func__, len,
1764 			(unsigned long long)seg1->mr_dma, i, rc);
1765 		while (i--)
1766 			rpcrdma_unmap_one(ia, --seg);
1767 	} else {
1768 		seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1769 		seg1->mr_nsegs = i;
1770 		seg1->mr_len = len;
1771 	}
1772 	*nsegs = i;
1773 	return rc;
1774 }
1775 
1776 static int
1777 rpcrdma_deregister_default_external(struct rpcrdma_mr_seg *seg,
1778 			struct rpcrdma_ia *ia)
1779 {
1780 	struct rpcrdma_mr_seg *seg1 = seg;
1781 	int rc;
1782 
1783 	rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1784 	seg1->mr_chunk.rl_mr = NULL;
1785 	while (seg1->mr_nsegs--)
1786 		rpcrdma_unmap_one(ia, seg++);
1787 	if (rc)
1788 		dprintk("RPC:       %s: failed ib_dereg_mr,"
1789 			" status %i\n", __func__, rc);
1790 	return rc;
1791 }
1792 
1793 int
1794 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1795 			int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1796 {
1797 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1798 	int rc = 0;
1799 
1800 	switch (ia->ri_memreg_strategy) {
1801 
1802 #if RPCRDMA_PERSISTENT_REGISTRATION
1803 	case RPCRDMA_ALLPHYSICAL:
1804 		rpcrdma_map_one(ia, seg, writing);
1805 		seg->mr_rkey = ia->ri_bind_mem->rkey;
1806 		seg->mr_base = seg->mr_dma;
1807 		seg->mr_nsegs = 1;
1808 		nsegs = 1;
1809 		break;
1810 #endif
1811 
1812 	/* Registration using frmr registration */
1813 	case RPCRDMA_FRMR:
1814 		rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1815 		break;
1816 
1817 	/* Registration using fmr memory registration */
1818 	case RPCRDMA_MTHCAFMR:
1819 		rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1820 		break;
1821 
1822 	/* Registration using memory windows */
1823 	case RPCRDMA_MEMWINDOWS_ASYNC:
1824 	case RPCRDMA_MEMWINDOWS:
1825 		rc = rpcrdma_register_memwin_external(seg, &nsegs, writing, ia, r_xprt);
1826 		break;
1827 
1828 	/* Default registration each time */
1829 	default:
1830 		rc = rpcrdma_register_default_external(seg, &nsegs, writing, ia);
1831 		break;
1832 	}
1833 	if (rc)
1834 		return -1;
1835 
1836 	return nsegs;
1837 }
1838 
1839 int
1840 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1841 		struct rpcrdma_xprt *r_xprt, void *r)
1842 {
1843 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1844 	int nsegs = seg->mr_nsegs, rc;
1845 
1846 	switch (ia->ri_memreg_strategy) {
1847 
1848 #if RPCRDMA_PERSISTENT_REGISTRATION
1849 	case RPCRDMA_ALLPHYSICAL:
1850 		BUG_ON(nsegs != 1);
1851 		rpcrdma_unmap_one(ia, seg);
1852 		rc = 0;
1853 		break;
1854 #endif
1855 
1856 	case RPCRDMA_FRMR:
1857 		rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1858 		break;
1859 
1860 	case RPCRDMA_MTHCAFMR:
1861 		rc = rpcrdma_deregister_fmr_external(seg, ia);
1862 		break;
1863 
1864 	case RPCRDMA_MEMWINDOWS_ASYNC:
1865 	case RPCRDMA_MEMWINDOWS:
1866 		rc = rpcrdma_deregister_memwin_external(seg, ia, r_xprt, &r);
1867 		break;
1868 
1869 	default:
1870 		rc = rpcrdma_deregister_default_external(seg, ia);
1871 		break;
1872 	}
1873 	if (r) {
1874 		struct rpcrdma_rep *rep = r;
1875 		void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1876 		rep->rr_func = NULL;
1877 		func(rep);	/* dereg done, callback now */
1878 	}
1879 	return nsegs;
1880 }
1881 
1882 /*
1883  * Prepost any receive buffer, then post send.
1884  *
1885  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1886  */
1887 int
1888 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1889 		struct rpcrdma_ep *ep,
1890 		struct rpcrdma_req *req)
1891 {
1892 	struct ib_send_wr send_wr, *send_wr_fail;
1893 	struct rpcrdma_rep *rep = req->rl_reply;
1894 	int rc;
1895 
1896 	if (rep) {
1897 		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1898 		if (rc)
1899 			goto out;
1900 		req->rl_reply = NULL;
1901 	}
1902 
1903 	send_wr.next = NULL;
1904 	send_wr.wr_id = 0ULL;	/* no send cookie */
1905 	send_wr.sg_list = req->rl_send_iov;
1906 	send_wr.num_sge = req->rl_niovs;
1907 	send_wr.opcode = IB_WR_SEND;
1908 	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
1909 		ib_dma_sync_single_for_device(ia->ri_id->device,
1910 			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1911 			DMA_TO_DEVICE);
1912 	ib_dma_sync_single_for_device(ia->ri_id->device,
1913 		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1914 		DMA_TO_DEVICE);
1915 	ib_dma_sync_single_for_device(ia->ri_id->device,
1916 		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1917 		DMA_TO_DEVICE);
1918 
1919 	if (DECR_CQCOUNT(ep) > 0)
1920 		send_wr.send_flags = 0;
1921 	else { /* Provider must take a send completion every now and then */
1922 		INIT_CQCOUNT(ep);
1923 		send_wr.send_flags = IB_SEND_SIGNALED;
1924 	}
1925 
1926 	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1927 	if (rc)
1928 		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1929 			rc);
1930 out:
1931 	return rc;
1932 }
1933 
1934 /*
1935  * (Re)post a receive buffer.
1936  */
1937 int
1938 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1939 		     struct rpcrdma_ep *ep,
1940 		     struct rpcrdma_rep *rep)
1941 {
1942 	struct ib_recv_wr recv_wr, *recv_wr_fail;
1943 	int rc;
1944 
1945 	recv_wr.next = NULL;
1946 	recv_wr.wr_id = (u64) (unsigned long) rep;
1947 	recv_wr.sg_list = &rep->rr_iov;
1948 	recv_wr.num_sge = 1;
1949 
1950 	ib_dma_sync_single_for_cpu(ia->ri_id->device,
1951 		rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1952 
1953 	DECR_CQCOUNT(ep);
1954 	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1955 
1956 	if (rc)
1957 		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1958 			rc);
1959 	return rc;
1960 }
1961