1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39 
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49 
50 #include <linux/pci.h>	/* for Tavor hack below */
51 
52 #include "xprt_rdma.h"
53 
54 /*
55  * Globals/Macros
56  */
57 
58 #ifdef RPC_DEBUG
59 # define RPCDBG_FACILITY	RPCDBG_TRANS
60 #endif
61 
62 /*
63  * internal functions
64  */
65 
66 /*
67  * handle replies in tasklet context, using a single global list;
68  * the rdma tasklet function below just walks the list and invokes
69  * each reply's handler, or returns the buffer if no handler is set
70  */
71 
72 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
73 static LIST_HEAD(rpcrdma_tasklets_g);
74 
75 static void
76 rpcrdma_run_tasklet(unsigned long data)
77 {
78 	struct rpcrdma_rep *rep;
79 	void (*func)(struct rpcrdma_rep *);
80 	unsigned long flags;
81 
82 	data = data;	/* parameter is unused; self-assignment presumably quiets compiler warnings */
83 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
84 	while (!list_empty(&rpcrdma_tasklets_g)) {
85 		rep = list_entry(rpcrdma_tasklets_g.next,
86 				 struct rpcrdma_rep, rr_list);
87 		list_del(&rep->rr_list);
88 		func = rep->rr_func;
89 		rep->rr_func = NULL;
90 		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
91 
92 		if (func)
93 			func(rep);
94 		else
95 			rpcrdma_recv_buffer_put(rep);
96 
97 		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
98 	}
99 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
100 }
101 
102 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
103 
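/*
 * Queue a reply for deferred processing and kick the global tasklet.
 */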
104 static inline void
105 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
106 {
107 	unsigned long flags;
108 
109 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
110 	list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
111 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
112 	tasklet_schedule(&rpcrdma_tasklet_g);
113 }
114 
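/*
 * QP async event handler (installed below as ep->rep_attr.event_handler).
 * A fatal QP event on a connected endpoint marks it broken (-EIO),
 * notifies the transport via rep_func, and wakes any connect waiters.
 */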
115 static void
116 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
117 {
118 	struct rpcrdma_ep *ep = context;
119 
120 	dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
121 		__func__, event->event, event->device->name, context);
122 	if (ep->rep_connected == 1) {
123 		ep->rep_connected = -EIO;
124 		ep->rep_func(ep);
125 		wake_up_all(&ep->rep_connect_wait);
126 	}
127 }
128 
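/*
 * CQ async event handler (passed to ib_create_cq() in rpcrdma_ep_create()).
 * Treated like a QP error: the connection is considered lost.
 */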
129 static void
130 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
131 {
132 	struct rpcrdma_ep *ep = context;
133 
134 	dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
135 		__func__, event->event, event->device->name, context);
136 	if (ep->rep_connected == 1) {
137 		ep->rep_connected = -EIO;
138 		ep->rep_func(ep);
139 		wake_up_all(&ep->rep_connect_wait);
140 	}
141 }
142 
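/*
 * Process one work completion. Failed completions flag the reply
 * (rr_len = ~0U) and hand it to the tasklet. Successful receives record
 * the length, sync the DMA buffer for the CPU, and refresh the credit
 * limit advertised by the server in the RPC/RDMA header.
 */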
143 static inline
144 void rpcrdma_event_process(struct ib_wc *wc)
145 {
146 	struct rpcrdma_rep *rep =
147 			(struct rpcrdma_rep *)(unsigned long) wc->wr_id;
148 
149 	dprintk("RPC:       %s: event rep %p status %X opcode %X length %u\n",
150 		__func__, rep, wc->status, wc->opcode, wc->byte_len);
151 
152 	if (!rep) /* send or bind completion that we don't care about */
153 		return;
154 
155 	if (wc->status != IB_WC_SUCCESS) {
156 		dprintk("RPC:       %s: %s WC status %X, connection lost\n",
157 			__func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
158 			 wc->status);
159 		rep->rr_len = ~0U;
160 		rpcrdma_schedule_tasklet(rep);
161 		return;
162 	}
163 
164 	switch (wc->opcode) {
165 	case IB_WC_RECV:
166 		rep->rr_len = wc->byte_len;
167 		ib_dma_sync_single_for_cpu(
168 			rdmab_to_ia(rep->rr_buffer)->ri_id->device,
169 			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
170 		/* Keep (only) the most recent credits, after checking validity */
171 		if (rep->rr_len >= 16) {
172 			struct rpcrdma_msg *p =
173 					(struct rpcrdma_msg *) rep->rr_base;
174 			unsigned int credits = ntohl(p->rm_credit);
175 			if (credits == 0) {
176 				dprintk("RPC:       %s: server"
177 					" dropped credits to 0!\n", __func__);
178 				/* don't deadlock */
179 				credits = 1;
180 			} else if (credits > rep->rr_buffer->rb_max_requests) {
181 				dprintk("RPC:       %s: server"
182 					" over-crediting: %d (%d)\n",
183 					__func__, credits,
184 					rep->rr_buffer->rb_max_requests);
185 				credits = rep->rr_buffer->rb_max_requests;
186 			}
187 			atomic_set(&rep->rr_buffer->rb_credits, credits);
188 		}
189 		/* fall through */
190 	case IB_WC_BIND_MW:
191 		rpcrdma_schedule_tasklet(rep);
192 		break;
193 	default:
194 		dprintk("RPC:       %s: unexpected WC event %X\n",
195 			__func__, wc->opcode);
196 		break;
197 	}
198 }
199 
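/*
 * Poll the CQ one completion at a time until it is empty. Single-entry
 * polling keeps receive processing in arrival order (see the comment
 * above rpcrdma_cq_event_upcall below).
 */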
200 static inline int
201 rpcrdma_cq_poll(struct ib_cq *cq)
202 {
203 	struct ib_wc wc;
204 	int rc;
205 
206 	for (;;) {
207 		rc = ib_poll_cq(cq, 1, &wc);
208 		if (rc < 0) {
209 			dprintk("RPC:       %s: ib_poll_cq failed %i\n",
210 				__func__, rc);
211 			return rc;
212 		}
213 		if (rc == 0)
214 			break;
215 
216 		rpcrdma_event_process(&wc);
217 	}
218 
219 	return 0;
220 }
221 
222 /*
223  * rpcrdma_cq_event_upcall
224  *
225  * This upcall handles recv, send, bind and unbind events.
226  * It is reentrant, but processes completions one at a time in order
227  * to maintain the receive ordering on which server credit accounting depends.
228  *
229  * It is the responsibility of the scheduled tasklet to return
230  * recv buffers to the pool. NOTE: this affects synchronization of
231  * connection shutdown. That is, the structures required for
232  * the completion of the reply handler must remain intact until
233  * all memory has been reclaimed.
234  *
235  * Note that send events are suppressed and do not result in an upcall.
236  */
237 static void
238 rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
239 {
240 	int rc;
241 
242 	rc = rpcrdma_cq_poll(cq);
243 	if (rc)
244 		return;
245 
246 	rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
247 	if (rc) {
248 		dprintk("RPC:       %s: ib_req_notify_cq failed %i\n",
249 			__func__, rc);
250 		return;
251 	}
252 
253 	rpcrdma_cq_poll(cq);
254 }
255 
256 #ifdef RPC_DEBUG
257 static const char * const conn[] = {
258 	"address resolved",
259 	"address error",
260 	"route resolved",
261 	"route error",
262 	"connect request",
263 	"connect response",
264 	"connect error",
265 	"unreachable",
266 	"rejected",
267 	"established",
268 	"disconnected",
269 	"device removal"
270 };
271 #endif
272 
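/*
 * Connection manager event handler, registered via rdma_create_id().
 * Address/route resolution results are returned through ia->ri_async_rc
 * and ia->ri_done; connection state changes are recorded in
 * ep->rep_connected and wake anyone sleeping on rep_connect_wait.
 */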
273 static int
274 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
275 {
276 	struct rpcrdma_xprt *xprt = id->context;
277 	struct rpcrdma_ia *ia = &xprt->rx_ia;
278 	struct rpcrdma_ep *ep = &xprt->rx_ep;
279 	struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
280 	struct ib_qp_attr attr;
281 	struct ib_qp_init_attr iattr;
282 	int connstate = 0;
283 
284 	switch (event->event) {
285 	case RDMA_CM_EVENT_ADDR_RESOLVED:
286 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
287 		complete(&ia->ri_done);
288 		break;
289 	case RDMA_CM_EVENT_ADDR_ERROR:
290 		ia->ri_async_rc = -EHOSTUNREACH;
291 		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
292 			__func__, ep);
293 		complete(&ia->ri_done);
294 		break;
295 	case RDMA_CM_EVENT_ROUTE_ERROR:
296 		ia->ri_async_rc = -ENETUNREACH;
297 		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
298 			__func__, ep);
299 		complete(&ia->ri_done);
300 		break;
301 	case RDMA_CM_EVENT_ESTABLISHED:
302 		connstate = 1;
303 		ib_query_qp(ia->ri_id->qp, &attr,
304 			IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
305 			&iattr);
306 		dprintk("RPC:       %s: %d responder resources"
307 			" (%d initiator)\n",
308 			__func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
309 		goto connected;
310 	case RDMA_CM_EVENT_CONNECT_ERROR:
311 		connstate = -ENOTCONN;
312 		goto connected;
313 	case RDMA_CM_EVENT_UNREACHABLE:
314 		connstate = -ENETDOWN;
315 		goto connected;
316 	case RDMA_CM_EVENT_REJECTED:
317 		connstate = -ECONNREFUSED;
318 		goto connected;
319 	case RDMA_CM_EVENT_DISCONNECTED:
320 		connstate = -ECONNABORTED;
321 		goto connected;
322 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
323 		connstate = -ENODEV;
324 connected:
325 		dprintk("RPC:       %s: %s: %u.%u.%u.%u:%u"
326 			" (ep 0x%p event 0x%x)\n",
327 			__func__,
328 			(event->event <= 11) ? conn[event->event] :
329 						"unknown connection error",
330 			NIPQUAD(addr->sin_addr.s_addr),
331 			ntohs(addr->sin_port),
332 			ep, event->event);
333 		atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
334 		dprintk("RPC:       %s: %sconnected\n",
335 					__func__, connstate > 0 ? "" : "dis");
336 		ep->rep_connected = connstate;
337 		ep->rep_func(ep);
338 		wake_up_all(&ep->rep_connect_wait);
339 		break;
340 	default:
341 		ia->ri_async_rc = -EINVAL;
342 		dprintk("RPC:       %s: unexpected CM event %X\n",
343 			__func__, event->event);
344 		complete(&ia->ri_done);
345 		break;
346 	}
347 
348 	return 0;
349 }
350 
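/*
 * Create a CM ID and synchronously resolve the server's address and
 * route. The CM upcall above signals ia->ri_done; any asynchronous
 * failure is reported back through ia->ri_async_rc.
 */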
351 static struct rdma_cm_id *
352 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
353 			struct rpcrdma_ia *ia, struct sockaddr *addr)
354 {
355 	struct rdma_cm_id *id;
356 	int rc;
357 
358 	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
359 	if (IS_ERR(id)) {
360 		rc = PTR_ERR(id);
361 		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
362 			__func__, rc);
363 		return id;
364 	}
365 
366 	ia->ri_async_rc = 0;
367 	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
368 	if (rc) {
369 		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
370 			__func__, rc);
371 		goto out;
372 	}
373 	wait_for_completion(&ia->ri_done);
374 	rc = ia->ri_async_rc;
375 	if (rc)
376 		goto out;
377 
378 	ia->ri_async_rc = 0;
379 	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
380 	if (rc) {
381 		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
382 			__func__, rc);
383 		goto out;
384 	}
385 	wait_for_completion(&ia->ri_done);
386 	rc = ia->ri_async_rc;
387 	if (rc)
388 		goto out;
389 
390 	return id;
391 
392 out:
393 	rdma_destroy_id(id);
394 	return ERR_PTR(rc);
395 }
396 
397 /*
398  * Drain any cq, prior to teardown.
399  */
400 static void
401 rpcrdma_clean_cq(struct ib_cq *cq)
402 {
403 	struct ib_wc wc;
404 	int count = 0;
405 
406 	while (1 == ib_poll_cq(cq, 1, &wc))
407 		++count;
408 
409 	if (count)
410 		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
411 			__func__, count, wc.opcode);
412 }
413 
414 /*
415  * Exported functions.
416  */
417 
418 /*
419  * Open and initialize an Interface Adapter.
420  *  o initializes fields of struct rpcrdma_ia, including
421  *    interface and provider attributes and protection zone.
422  */
423 int
424 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
425 {
426 	int rc;
427 	struct rpcrdma_ia *ia = &xprt->rx_ia;
428 
429 	init_completion(&ia->ri_done);
430 
431 	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
432 	if (IS_ERR(ia->ri_id)) {
433 		rc = PTR_ERR(ia->ri_id);
434 		goto out1;
435 	}
436 
437 	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
438 	if (IS_ERR(ia->ri_pd)) {
439 		rc = PTR_ERR(ia->ri_pd);
440 		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
441 			__func__, rc);
442 		goto out2;
443 	}
444 
445 	/*
446 	 * Optionally obtain an underlying physical identity mapping in
447 	 * order to do a memory window-based bind. This base registration
448 	 * is protected from remote access - that is enabled only by binding
449 	 * for the specific bytes targeted during each RPC operation, and
450 	 * revoked after the corresponding completion similar to a storage
451 	 * adapter.
452 	 */
453 	if (memreg > RPCRDMA_REGISTER) {
454 		int mem_priv = IB_ACCESS_LOCAL_WRITE;
455 		switch (memreg) {
456 #if RPCRDMA_PERSISTENT_REGISTRATION
457 		case RPCRDMA_ALLPHYSICAL:
458 			mem_priv |= IB_ACCESS_REMOTE_WRITE;
459 			mem_priv |= IB_ACCESS_REMOTE_READ;
460 			break;
461 #endif
462 		case RPCRDMA_MEMWINDOWS_ASYNC:
463 		case RPCRDMA_MEMWINDOWS:
464 			mem_priv |= IB_ACCESS_MW_BIND;
465 			break;
466 		default:
467 			break;
468 		}
469 		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
470 		if (IS_ERR(ia->ri_bind_mem)) {
471 			printk(KERN_ALERT "%s: ib_get_dma_mr for "
472 				"phys register failed with %lX\n\t"
473 				"Will continue with degraded performance\n",
474 				__func__, PTR_ERR(ia->ri_bind_mem));
475 			memreg = RPCRDMA_REGISTER;
476 			ia->ri_bind_mem = NULL;
477 		}
478 	}
479 
480 	/* Else will do memory reg/dereg for each chunk */
481 	ia->ri_memreg_strategy = memreg;
482 
483 	return 0;
484 out2:
485 	rdma_destroy_id(ia->ri_id);
486 out1:
487 	return rc;
488 }
489 
490 /*
491  * Clean up/close an IA.
492  *   o if event handles and PD have been initialized, free them.
493  *   o close the IA
494  */
495 void
496 rpcrdma_ia_close(struct rpcrdma_ia *ia)
497 {
498 	int rc;
499 
500 	dprintk("RPC:       %s: entering\n", __func__);
501 	if (ia->ri_bind_mem != NULL) {
502 		rc = ib_dereg_mr(ia->ri_bind_mem);
503 		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
504 			__func__, rc);
505 	}
506 	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id) && ia->ri_id->qp)
507 		rdma_destroy_qp(ia->ri_id);
508 	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
509 		rc = ib_dealloc_pd(ia->ri_pd);
510 		dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
511 			__func__, rc);
512 	}
513 	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id))
514 		rdma_destroy_id(ia->ri_id);
515 }
516 
517 /*
518  * Create unconnected endpoint.
519  */
520 int
521 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
522 				struct rpcrdma_create_data_internal *cdata)
523 {
524 	struct ib_device_attr devattr;
525 	int rc;
526 
527 	rc = ib_query_device(ia->ri_id->device, &devattr);
528 	if (rc) {
529 		dprintk("RPC:       %s: ib_query_device failed %d\n",
530 			__func__, rc);
531 		return rc;
532 	}
533 
534 	/* check provider's send/recv wr limits */
535 	if (cdata->max_requests > devattr.max_qp_wr)
536 		cdata->max_requests = devattr.max_qp_wr;
537 
538 	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
539 	ep->rep_attr.qp_context = ep;
540 	/* send_cq and recv_cq initialized below */
541 	ep->rep_attr.srq = NULL;
542 	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
543 	switch (ia->ri_memreg_strategy) {
544 	case RPCRDMA_MEMWINDOWS_ASYNC:
545 	case RPCRDMA_MEMWINDOWS:
546 		/* Add room for mw_binds+unbinds - overkill! */
547 		ep->rep_attr.cap.max_send_wr++;
548 		ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
549 		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
550 			return -EINVAL;
551 		break;
552 	default:
553 		break;
554 	}
555 	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
556 	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
557 	ep->rep_attr.cap.max_recv_sge = 1;
558 	ep->rep_attr.cap.max_inline_data = 0;
559 	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
560 	ep->rep_attr.qp_type = IB_QPT_RC;
561 	ep->rep_attr.port_num = ~0;
562 
563 	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
564 		"iovs: send %d recv %d\n",
565 		__func__,
566 		ep->rep_attr.cap.max_send_wr,
567 		ep->rep_attr.cap.max_recv_wr,
568 		ep->rep_attr.cap.max_send_sge,
569 		ep->rep_attr.cap.max_recv_sge);
570 
571 	/* set trigger for requesting send completion: signal ~every rep_cqinit-th send (see rpcrdma_ep_post) */
572 	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/;
573 	switch (ia->ri_memreg_strategy) {
574 	case RPCRDMA_MEMWINDOWS_ASYNC:
575 	case RPCRDMA_MEMWINDOWS:
576 		ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
577 		break;
578 	default:
579 		break;
580 	}
581 	if (ep->rep_cqinit <= 2)
582 		ep->rep_cqinit = 0;
583 	INIT_CQCOUNT(ep);
584 	ep->rep_ia = ia;
585 	init_waitqueue_head(&ep->rep_connect_wait);
586 
587 	/*
588 	 * Create a single cq for receive dto and mw_bind (only ever
589 	 * care about unbind, really). Send completions are suppressed.
590 	 * Use single threaded tasklet upcalls to maintain ordering.
591 	 */
592 	ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
593 				  rpcrdma_cq_async_error_upcall, NULL,
594 				  ep->rep_attr.cap.max_recv_wr +
595 				  ep->rep_attr.cap.max_send_wr + 1, 0);
596 	if (IS_ERR(ep->rep_cq)) {
597 		rc = PTR_ERR(ep->rep_cq);
598 		dprintk("RPC:       %s: ib_create_cq failed: %i\n",
599 			__func__, rc);
600 		goto out1;
601 	}
602 
603 	rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
604 	if (rc) {
605 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
606 			__func__, rc);
607 		goto out2;
608 	}
609 
610 	ep->rep_attr.send_cq = ep->rep_cq;
611 	ep->rep_attr.recv_cq = ep->rep_cq;
612 
613 	/* Initialize cma parameters */
614 
615 	/* RPC/RDMA does not use private data */
616 	ep->rep_remote_cma.private_data = NULL;
617 	ep->rep_remote_cma.private_data_len = 0;
618 
619 	/* Client offers RDMA Read but does not initiate */
620 	switch (ia->ri_memreg_strategy) {
621 	case RPCRDMA_BOUNCEBUFFERS:
622 		ep->rep_remote_cma.responder_resources = 0;
623 		break;
624 	case RPCRDMA_MTHCAFMR:
625 	case RPCRDMA_REGISTER:
626 		ep->rep_remote_cma.responder_resources = cdata->max_requests *
627 				(RPCRDMA_MAX_DATA_SEGS / 8);
628 		break;
629 	case RPCRDMA_MEMWINDOWS:
630 	case RPCRDMA_MEMWINDOWS_ASYNC:
631 #if RPCRDMA_PERSISTENT_REGISTRATION
632 	case RPCRDMA_ALLPHYSICAL:
633 #endif
634 		ep->rep_remote_cma.responder_resources = cdata->max_requests *
635 				(RPCRDMA_MAX_DATA_SEGS / 2);
636 		break;
637 	default:
638 		break;
639 	}
640 	if (ep->rep_remote_cma.responder_resources > devattr.max_qp_rd_atom)
641 		ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
642 	ep->rep_remote_cma.initiator_depth = 0;
643 
644 	ep->rep_remote_cma.retry_count = 7;
645 	ep->rep_remote_cma.flow_control = 0;
646 	ep->rep_remote_cma.rnr_retry_count = 0;
647 
648 	return 0;
649 
650 out2:
651 	if (ib_destroy_cq(ep->rep_cq))
652 		;	/* best-effort cleanup; ignore any error here */
653 out1:
654 	return rc;
655 }
656 
657 /*
658  * rpcrdma_ep_destroy
659  *
660  * Disconnect and destroy endpoint. After this, the only
661  * valid operations on the ep are to free it (if dynamically
662  * allocated) or re-create it.
663  *
664  * The caller's error handling must be sure to not leak the endpoint
665  * if this function fails.
666  */
667 int
668 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
669 {
670 	int rc;
671 
672 	dprintk("RPC:       %s: entering, connected is %d\n",
673 		__func__, ep->rep_connected);
674 
675 	if (ia->ri_id->qp) {
676 		rc = rpcrdma_ep_disconnect(ep, ia);
677 		if (rc)
678 			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
679 				" returned %i\n", __func__, rc);
680 	}
681 
682 	ep->rep_func = NULL;
683 
684 	/* padding - could be done in rpcrdma_buffer_destroy... */
685 	if (ep->rep_pad_mr) {
686 		rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
687 		ep->rep_pad_mr = NULL;
688 	}
689 
690 	if (ia->ri_id->qp) {
691 		rdma_destroy_qp(ia->ri_id);
692 		ia->ri_id->qp = NULL;
693 	}
694 
695 	rpcrdma_clean_cq(ep->rep_cq);
696 	rc = ib_destroy_cq(ep->rep_cq);
697 	if (rc)
698 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
699 			__func__, rc);
700 
701 	return rc;
702 }
703 
704 /*
705  * Connect unconnected endpoint.
706  */
707 int
708 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
709 {
710 	struct rdma_cm_id *id;
711 	int rc = 0;
712 	int retry_count = 0;
713 	int reconnect = (ep->rep_connected != 0);
714 
715 	if (reconnect) {
716 		struct rpcrdma_xprt *xprt;
717 retry:
718 		rc = rpcrdma_ep_disconnect(ep, ia);
719 		if (rc && rc != -ENOTCONN)
720 			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
721 				" status %i\n", __func__, rc);
722 		rpcrdma_clean_cq(ep->rep_cq);
723 
724 		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
725 		id = rpcrdma_create_id(xprt, ia,
726 				(struct sockaddr *)&xprt->rx_data.addr);
727 		if (IS_ERR(id)) {
728 			rc = PTR_ERR(id);
729 			goto out;
730 		}
731 		/* TEMP TEMP TEMP - fail if new device:
732 		 * Deregister/remarshal *all* requests!
733 		 * Close and recreate adapter, pd, etc!
734 		 * Re-determine all attributes still sane!
735 		 * More stuff I haven't thought of!
736 		 * Rrrgh!
737 		 */
738 		if (ia->ri_id->device != id->device) {
739 			printk("RPC:       %s: can't reconnect on "
740 				"different device!\n", __func__);
741 			rdma_destroy_id(id);
742 			rc = -ENETDOWN;
743 			goto out;
744 		}
745 		/* END TEMP */
746 		rdma_destroy_id(ia->ri_id);
747 		ia->ri_id = id;
748 	}
749 
750 	rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
751 	if (rc) {
752 		dprintk("RPC:       %s: rdma_create_qp failed %i\n",
753 			__func__, rc);
754 		goto out;
755 	}
756 
757 /* XXX Tavor device performs badly with 2K MTU! */
758 if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
759 	struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
760 	if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
761 	    (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
762 	     pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
763 		struct ib_qp_attr attr = {
764 			.path_mtu = IB_MTU_1024
765 		};
766 		rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
767 	}
768 }
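/* The block above clamps the path MTU to 1024 on Mellanox/Topspin Tavor
 * HCAs, which reportedly perform badly at 2K MTU; hence the <linux/pci.h>
 * include at the top of this file. */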
769 
770 	/* Theoretically a client initiator_depth > 0 is not needed,
771 	 * but many peers fail to complete the connection unless
772 	 * initiator_depth == responder_resources! */
773 	if (ep->rep_remote_cma.initiator_depth !=
774 				ep->rep_remote_cma.responder_resources)
775 		ep->rep_remote_cma.initiator_depth =
776 			ep->rep_remote_cma.responder_resources;
777 
778 	ep->rep_connected = 0;
779 
780 	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
781 	if (rc) {
782 		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
783 				__func__, rc);
784 		goto out;
785 	}
786 
787 	if (reconnect)
788 		return 0;
789 
790 	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
791 
792 	/*
793 	 * Check state. A non-peer reject indicates no listener
794 	 * (ECONNREFUSED), which may be a transient state. All
795 	 * others indicate a transport condition for which recovery
796 	 * has already been attempted on a best-effort basis.
797 	 */
798 	if (ep->rep_connected == -ECONNREFUSED
799 	    && ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
800 		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
801 		goto retry;
802 	}
803 	if (ep->rep_connected <= 0) {
804 		/* Sometimes, the only way to reliably connect to remote
805 		 * CMs is to use the same nonzero values for ORD and IRD. */
806 		ep->rep_remote_cma.initiator_depth =
807 					ep->rep_remote_cma.responder_resources;
808 		if (ep->rep_remote_cma.initiator_depth == 0)
809 			++ep->rep_remote_cma.initiator_depth;
810 		if (ep->rep_remote_cma.responder_resources == 0)
811 			++ep->rep_remote_cma.responder_resources;
812 		if (retry_count++ == 0)
813 			goto retry;
814 		rc = ep->rep_connected;
815 	} else {
816 		dprintk("RPC:       %s: connected\n", __func__);
817 	}
818 
819 out:
820 	if (rc)
821 		ep->rep_connected = rc;
822 	return rc;
823 }
824 
825 /*
826  * rpcrdma_ep_disconnect
827  *
828  * This is separate from destroy to facilitate the ability
829  * to reconnect without recreating the endpoint.
830  *
831  * This call is not reentrant, and must not be made in parallel
832  * on the same endpoint.
833  */
834 int
835 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
836 {
837 	int rc;
838 
839 	rpcrdma_clean_cq(ep->rep_cq);
840 	rc = rdma_disconnect(ia->ri_id);
841 	if (!rc) {
842 		/* returns without wait if not connected */
843 		wait_event_interruptible(ep->rep_connect_wait,
844 							ep->rep_connected != 1);
845 		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
846 			(ep->rep_connected == 1) ? "still " : "dis");
847 	} else {
848 		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
849 		ep->rep_connected = rc;
850 	}
851 	return rc;
852 }
853 
854 /*
855  * Initialize buffer memory
856  */
857 int
858 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
859 	struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
860 {
861 	char *p;
862 	size_t len;
863 	int i, rc;
864 
865 	buf->rb_max_requests = cdata->max_requests;
866 	spin_lock_init(&buf->rb_lock);
867 	atomic_set(&buf->rb_credits, 1);
868 
869 	/* Need to allocate:
870 	 *   1.  arrays for send and recv pointers
871 	 *   2.  arrays of struct rpcrdma_req to fill in pointers
872 	 *   3.  array of struct rpcrdma_rep for replies
873 	 *   4.  padding, if any
874 	 *   5.  mw's, if any
875 	 * Send/recv buffers in req/rep need to be registered
876 	 */
877 
878 	len = buf->rb_max_requests *
879 		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
880 	len += cdata->padding;
881 	switch (ia->ri_memreg_strategy) {
882 	case RPCRDMA_MTHCAFMR:
883 		/* TBD we are perhaps overallocating here */
884 		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
885 				sizeof(struct rpcrdma_mw);
886 		break;
887 	case RPCRDMA_MEMWINDOWS_ASYNC:
888 	case RPCRDMA_MEMWINDOWS:
889 		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
890 				sizeof(struct rpcrdma_mw);
891 		break;
892 	default:
893 		break;
894 	}
895 
896 	/* allocate 1, 4 and 5 in one shot */
897 	p = kzalloc(len, GFP_KERNEL);
898 	if (p == NULL) {
899 		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
900 			__func__, len);
901 		rc = -ENOMEM;
902 		goto out;
903 	}
904 	buf->rb_pool = p;	/* for freeing it later */
905 
906 	buf->rb_send_bufs = (struct rpcrdma_req **) p;
907 	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
908 	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
909 	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
910 
911 	/*
912 	 * Register the zeroed pad buffer, if any.
913 	 */
914 	if (cdata->padding) {
915 		rc = rpcrdma_register_internal(ia, p, cdata->padding,
916 					    &ep->rep_pad_mr, &ep->rep_pad);
917 		if (rc)
918 			goto out;
919 	}
920 	p += cdata->padding;
921 
922 	/*
923 	 * Allocate the fmr's, or mw's for mw_bind chunk registration.
924 	 * We "cycle" the mw's in order to minimize rkey reuse,
925 	 * and also reduce unbind-to-bind collision.
926 	 */
927 	INIT_LIST_HEAD(&buf->rb_mws);
928 	switch (ia->ri_memreg_strategy) {
929 	case RPCRDMA_MTHCAFMR:
930 		{
931 		struct rpcrdma_mw *r = (struct rpcrdma_mw *)p;
932 		struct ib_fmr_attr fa = {
933 			RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT
934 		};
935 		/* TBD we are perhaps overallocating here */
936 		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
937 			r->r.fmr = ib_alloc_fmr(ia->ri_pd,
938 				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
939 				&fa);
940 			if (IS_ERR(r->r.fmr)) {
941 				rc = PTR_ERR(r->r.fmr);
942 				dprintk("RPC:       %s: ib_alloc_fmr"
943 					" failed %i\n", __func__, rc);
944 				goto out;
945 			}
946 			list_add(&r->mw_list, &buf->rb_mws);
947 			++r;
948 		}
949 		}
950 		break;
951 	case RPCRDMA_MEMWINDOWS_ASYNC:
952 	case RPCRDMA_MEMWINDOWS:
953 		{
954 		struct rpcrdma_mw *r = (struct rpcrdma_mw *)p;
955 		/* Allocate one extra request's worth, for full cycling */
956 		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
957 			r->r.mw = ib_alloc_mw(ia->ri_pd);
958 			if (IS_ERR(r->r.mw)) {
959 				rc = PTR_ERR(r->r.mw);
960 				dprintk("RPC:       %s: ib_alloc_mw"
961 					" failed %i\n", __func__, rc);
962 				goto out;
963 			}
964 			list_add(&r->mw_list, &buf->rb_mws);
965 			++r;
966 		}
967 		}
968 		break;
969 	default:
970 		break;
971 	}
972 
973 	/*
974 	 * Allocate/init the request/reply buffers. Doing this
975 	 * using kmalloc for now -- one for each buf.
976 	 */
977 	for (i = 0; i < buf->rb_max_requests; i++) {
978 		struct rpcrdma_req *req;
979 		struct rpcrdma_rep *rep;
980 
981 		len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
982 		/* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
983 		/* Typical ~2400b, so rounding up saves work later */
984 		if (len < 4096)
985 			len = 4096;
986 		req = kmalloc(len, GFP_KERNEL);
987 		if (req == NULL) {
988 			dprintk("RPC:       %s: request buffer %d alloc"
989 				" failed\n", __func__, i);
990 			rc = -ENOMEM;
991 			goto out;
992 		}
993 		memset(req, 0, sizeof(struct rpcrdma_req));
994 		buf->rb_send_bufs[i] = req;
995 		buf->rb_send_bufs[i]->rl_buffer = buf;
996 
997 		rc = rpcrdma_register_internal(ia, req->rl_base,
998 				len - offsetof(struct rpcrdma_req, rl_base),
999 				&buf->rb_send_bufs[i]->rl_handle,
1000 				&buf->rb_send_bufs[i]->rl_iov);
1001 		if (rc)
1002 			goto out;
1003 
1004 		buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1005 
1006 		len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1007 		rep = kmalloc(len, GFP_KERNEL);
1008 		if (rep == NULL) {
1009 			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1010 				__func__, i);
1011 			rc = -ENOMEM;
1012 			goto out;
1013 		}
1014 		memset(rep, 0, sizeof(struct rpcrdma_rep));
1015 		buf->rb_recv_bufs[i] = rep;
1016 		buf->rb_recv_bufs[i]->rr_buffer = buf;
1017 		init_waitqueue_head(&rep->rr_unbind);
1018 
1019 		rc = rpcrdma_register_internal(ia, rep->rr_base,
1020 				len - offsetof(struct rpcrdma_rep, rr_base),
1021 				&buf->rb_recv_bufs[i]->rr_handle,
1022 				&buf->rb_recv_bufs[i]->rr_iov);
1023 		if (rc)
1024 			goto out;
1025 
1026 	}
1027 	dprintk("RPC:       %s: max_requests %d\n",
1028 		__func__, buf->rb_max_requests);
1029 	/* done */
1030 	return 0;
1031 out:
1032 	rpcrdma_buffer_destroy(buf);
1033 	return rc;
1034 }
1035 
1036 /*
1037  * Unregister and destroy buffer memory. Need to deal with
1038  * partial initialization, so it's callable from failed create.
1039  * Must be called before destroying endpoint, as registrations
1040  * reference it.
1041  */
1042 void
1043 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1044 {
1045 	int rc, i;
1046 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1047 
1048 	/* clean up in reverse order from create
1049 	 *   1.  recv mr memory (mr free, then kfree)
1050 	 *   1a. bind mw memory
1051 	 *   2.  send mr memory (mr free, then kfree)
1052 	 *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1053 	 *   4.  arrays
1054 	 */
1055 	dprintk("RPC:       %s: entering\n", __func__);
1056 
1057 	for (i = 0; i < buf->rb_max_requests; i++) {
1058 		if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1059 			rpcrdma_deregister_internal(ia,
1060 					buf->rb_recv_bufs[i]->rr_handle,
1061 					&buf->rb_recv_bufs[i]->rr_iov);
1062 			kfree(buf->rb_recv_bufs[i]);
1063 		}
1064 		if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1065 			while (!list_empty(&buf->rb_mws)) {
1066 				struct rpcrdma_mw *r;
1067 				r = list_entry(buf->rb_mws.next,
1068 					struct rpcrdma_mw, mw_list);
1069 				list_del(&r->mw_list);
1070 				switch (ia->ri_memreg_strategy) {
1071 				case RPCRDMA_MTHCAFMR:
1072 					rc = ib_dealloc_fmr(r->r.fmr);
1073 					if (rc)
1074 						dprintk("RPC:       %s:"
1075 							" ib_dealloc_fmr"
1076 							" failed %i\n",
1077 							__func__, rc);
1078 					break;
1079 				case RPCRDMA_MEMWINDOWS_ASYNC:
1080 				case RPCRDMA_MEMWINDOWS:
1081 					rc = ib_dealloc_mw(r->r.mw);
1082 					if (rc)
1083 						dprintk("RPC:       %s:"
1084 							" ib_dealloc_mw"
1085 							" failed %i\n",
1086 							__func__, rc);
1087 					break;
1088 				default:
1089 					break;
1090 				}
1091 			}
1092 			rpcrdma_deregister_internal(ia,
1093 					buf->rb_send_bufs[i]->rl_handle,
1094 					&buf->rb_send_bufs[i]->rl_iov);
1095 			kfree(buf->rb_send_bufs[i]);
1096 		}
1097 	}
1098 
1099 	kfree(buf->rb_pool);
1100 }
1101 
1102 /*
1103  * Get a set of request/reply buffers.
1104  *
1105  * Reply buffer (if needed) is attached to send buffer upon return.
1106  * Rule:
1107  *    rb_send_index and rb_recv_index MUST always be pointing to the
1108  *    *next* available buffer (non-NULL). They are incremented after
1109  *    removing buffers, and decremented *before* returning them.
1110  */
1111 struct rpcrdma_req *
1112 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1113 {
1114 	struct rpcrdma_req *req;
1115 	unsigned long flags;
1116 
1117 	spin_lock_irqsave(&buffers->rb_lock, flags);
1118 	if (buffers->rb_send_index == buffers->rb_max_requests) {
1119 		spin_unlock_irqrestore(&buffers->rb_lock, flags);
1120 		dprintk("RPC:       %s: out of request buffers\n", __func__);
1121 		return NULL;
1122 	}
1123 
1124 	req = buffers->rb_send_bufs[buffers->rb_send_index];
1125 	if (buffers->rb_send_index < buffers->rb_recv_index) {
1126 		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1127 			__func__,
1128 			buffers->rb_recv_index - buffers->rb_send_index);
1129 		req->rl_reply = NULL;
1130 	} else {
1131 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1132 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1133 	}
1134 	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1135 	if (!list_empty(&buffers->rb_mws)) {
1136 		int i = RPCRDMA_MAX_SEGS - 1;
1137 		do {
1138 			struct rpcrdma_mw *r;
1139 			r = list_entry(buffers->rb_mws.next,
1140 					struct rpcrdma_mw, mw_list);
1141 			list_del(&r->mw_list);
1142 			req->rl_segments[i].mr_chunk.rl_mw = r;
1143 		} while (--i >= 0);
1144 	}
1145 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1146 	return req;
1147 }
1148 
1149 /*
1150  * Put request/reply buffers back into pool.
1151  * Pre-decrement counter/array index.
1152  */
1153 void
1154 rpcrdma_buffer_put(struct rpcrdma_req *req)
1155 {
1156 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1157 	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1158 	int i;
1159 	unsigned long flags;
1160 
1161 	BUG_ON(req->rl_nchunks != 0);
1162 	spin_lock_irqsave(&buffers->rb_lock, flags);
1163 	buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1164 	req->rl_niovs = 0;
1165 	if (req->rl_reply) {
1166 		buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1167 		init_waitqueue_head(&req->rl_reply->rr_unbind);
1168 		req->rl_reply->rr_func = NULL;
1169 		req->rl_reply = NULL;
1170 	}
1171 	switch (ia->ri_memreg_strategy) {
1172 	case RPCRDMA_MTHCAFMR:
1173 	case RPCRDMA_MEMWINDOWS_ASYNC:
1174 	case RPCRDMA_MEMWINDOWS:
1175 		/*
1176 		 * Cycle mw's back in reverse order, and "spin" them.
1177 		 * This delays and scrambles reuse as much as possible.
1178 		 */
1179 		i = 1;
1180 		do {
1181 			struct rpcrdma_mw **mw;
1182 			mw = &req->rl_segments[i].mr_chunk.rl_mw;
1183 			list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1184 			*mw = NULL;
1185 		} while (++i < RPCRDMA_MAX_SEGS);
1186 		list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1187 					&buffers->rb_mws);
1188 		req->rl_segments[0].mr_chunk.rl_mw = NULL;
1189 		break;
1190 	default:
1191 		break;
1192 	}
1193 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1194 }
1195 
1196 /*
1197  * Recover reply buffers from pool.
1198  * This happens when recovering from error conditions.
1199  * Post-increment counter/array index.
1200  */
1201 void
1202 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1203 {
1204 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1205 	unsigned long flags;
1206 
1207 	if (req->rl_iov.length == 0)	/* special case xprt_rdma_allocate() */
1208 		buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1209 	spin_lock_irqsave(&buffers->rb_lock, flags);
1210 	if (buffers->rb_recv_index < buffers->rb_max_requests) {
1211 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1212 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1213 	}
1214 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1215 }
1216 
1217 /*
1218  * Put reply buffers back into pool when not attached to
1219  * request. This happens in error conditions, and when
1220  * aborting unbinds. Pre-decrement counter/array index.
1221  */
1222 void
1223 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1224 {
1225 	struct rpcrdma_buffer *buffers = rep->rr_buffer;
1226 	unsigned long flags;
1227 
1228 	rep->rr_func = NULL;
1229 	spin_lock_irqsave(&buffers->rb_lock, flags);
1230 	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1231 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1232 }
1233 
1234 /*
1235  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1236  */
1237 
1238 int
1239 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1240 				struct ib_mr **mrp, struct ib_sge *iov)
1241 {
1242 	struct ib_phys_buf ipb;
1243 	struct ib_mr *mr;
1244 	int rc;
1245 
1246 	/*
1247 	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1248 	 */
1249 	iov->addr = ib_dma_map_single(ia->ri_id->device,
1250 			va, len, DMA_BIDIRECTIONAL);
1251 	iov->length = len;
1252 
1253 	if (ia->ri_bind_mem != NULL) {
1254 		*mrp = NULL;
1255 		iov->lkey = ia->ri_bind_mem->lkey;
1256 		return 0;
1257 	}
1258 
1259 	ipb.addr = iov->addr;
1260 	ipb.size = iov->length;
1261 	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1262 			IB_ACCESS_LOCAL_WRITE, &iov->addr);
1263 
1264 	dprintk("RPC:       %s: phys convert: 0x%llx "
1265 			"registered 0x%llx length %d\n",
1266 			__func__, (unsigned long long)ipb.addr,
1267 			(unsigned long long)iov->addr, len);
1268 
1269 	if (IS_ERR(mr)) {
1270 		*mrp = NULL;
1271 		rc = PTR_ERR(mr);
1272 		dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1273 	} else {
1274 		*mrp = mr;
1275 		iov->lkey = mr->lkey;
1276 		rc = 0;
1277 	}
1278 
1279 	return rc;
1280 }
1281 
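/*
 * Undo rpcrdma_register_internal(): unmap the buffer and, if a private
 * MR was created for it, deregister that MR. A NULL mr means the buffer
 * was covered by the persistent ri_bind_mem registration.
 */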
1282 int
1283 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1284 				struct ib_mr *mr, struct ib_sge *iov)
1285 {
1286 	int rc;
1287 
1288 	ib_dma_unmap_single(ia->ri_id->device,
1289 			iov->addr, iov->length, DMA_BIDIRECTIONAL);
1290 
1291 	if (mr == NULL)
1292 		return 0;
1293 
1294 	rc = ib_dereg_mr(mr);
1295 	if (rc)
1296 		dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1297 	return rc;
1298 }
1299 
1300 /*
1301  * Wrappers for chunk registration, shared by read/write chunk code.
1302  */
1303 
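/*
 * DMA-map a single segment, by page or by virtual address, in the
 * direction implied by "writing" (server writes -> DMA_FROM_DEVICE).
 */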
1304 static void
1305 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1306 {
1307 	seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1308 	seg->mr_dmalen = seg->mr_len;
1309 	if (seg->mr_page)
1310 		seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1311 				seg->mr_page, offset_in_page(seg->mr_offset),
1312 				seg->mr_dmalen, seg->mr_dir);
1313 	else
1314 		seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1315 				seg->mr_offset,
1316 				seg->mr_dmalen, seg->mr_dir);
1317 }
1318 
1319 static void
1320 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1321 {
1322 	if (seg->mr_page)
1323 		ib_dma_unmap_page(ia->ri_id->device,
1324 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1325 	else
1326 		ib_dma_unmap_single(ia->ri_id->device,
1327 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1328 }
1329 
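/*
 * Register a run of segments for RDMA according to the IA's memory
 * registration strategy. Returns the number of segments actually
 * covered (callers may need to issue further chunks), or -1 after
 * unmapping everything on failure.
 */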
1330 int
1331 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1332 			int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1333 {
1334 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1335 	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1336 				  IB_ACCESS_REMOTE_READ);
1337 	struct rpcrdma_mr_seg *seg1 = seg;
1338 	int i;
1339 	int rc = 0;
1340 
1341 	switch (ia->ri_memreg_strategy) {
1342 
1343 #if RPCRDMA_PERSISTENT_REGISTRATION
1344 	case RPCRDMA_ALLPHYSICAL:
1345 		rpcrdma_map_one(ia, seg, writing);
1346 		seg->mr_rkey = ia->ri_bind_mem->rkey;
1347 		seg->mr_base = seg->mr_dma;
1348 		seg->mr_nsegs = 1;
1349 		nsegs = 1;
1350 		break;
1351 #endif
1352 
1353 	/* Registration using fast memory registration */
1354 	case RPCRDMA_MTHCAFMR:
1355 		{
1356 		u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1357 		int len, pageoff = offset_in_page(seg->mr_offset);
1358 		seg1->mr_offset -= pageoff;	/* start of page */
1359 		seg1->mr_len += pageoff;
1360 		len = -pageoff;
1361 		if (nsegs > RPCRDMA_MAX_DATA_SEGS)
1362 			nsegs = RPCRDMA_MAX_DATA_SEGS;
1363 		for (i = 0; i < nsegs;) {
1364 			rpcrdma_map_one(ia, seg, writing);
1365 			physaddrs[i] = seg->mr_dma;
1366 			len += seg->mr_len;
1367 			++seg;
1368 			++i;
1369 			/* Check for holes */
1370 			if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
1371 			    offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1372 				break;
1373 		}
1374 		nsegs = i;
1375 		rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1376 					physaddrs, nsegs, seg1->mr_dma);
1377 		if (rc) {
1378 			dprintk("RPC:       %s: failed ib_map_phys_fmr "
1379 				"%u@0x%llx+%i (%d)... status %i\n", __func__,
1380 				len, (unsigned long long)seg1->mr_dma,
1381 				pageoff, nsegs, rc);
1382 			while (nsegs--)
1383 				rpcrdma_unmap_one(ia, --seg);
1384 		} else {
1385 			seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1386 			seg1->mr_base = seg1->mr_dma + pageoff;
1387 			seg1->mr_nsegs = nsegs;
1388 			seg1->mr_len = len;
1389 		}
1390 		}
1391 		break;
1392 
1393 	/* Registration using memory windows */
1394 	case RPCRDMA_MEMWINDOWS_ASYNC:
1395 	case RPCRDMA_MEMWINDOWS:
1396 		{
1397 		struct ib_mw_bind param;
1398 		rpcrdma_map_one(ia, seg, writing);
1399 		param.mr = ia->ri_bind_mem;
1400 		param.wr_id = 0ULL;	/* no send cookie */
1401 		param.addr = seg->mr_dma;
1402 		param.length = seg->mr_len;
1403 		param.send_flags = 0;
1404 		param.mw_access_flags = mem_priv;
1405 
1406 		DECR_CQCOUNT(&r_xprt->rx_ep);
1407 		rc = ib_bind_mw(ia->ri_id->qp,
1408 					seg->mr_chunk.rl_mw->r.mw, &param);
1409 		if (rc) {
1410 			dprintk("RPC:       %s: failed ib_bind_mw "
1411 				"%u@0x%llx status %i\n",
1412 				__func__, seg->mr_len,
1413 				(unsigned long long)seg->mr_dma, rc);
1414 			rpcrdma_unmap_one(ia, seg);
1415 		} else {
1416 			seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1417 			seg->mr_base = param.addr;
1418 			seg->mr_nsegs = 1;
1419 			nsegs = 1;
1420 		}
1421 		}
1422 		break;
1423 
1424 	/* Default registration each time */
1425 	default:
1426 		{
1427 		struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1428 		int len = 0;
1429 		if (nsegs > RPCRDMA_MAX_DATA_SEGS)
1430 			nsegs = RPCRDMA_MAX_DATA_SEGS;
1431 		for (i = 0; i < nsegs;) {
1432 			rpcrdma_map_one(ia, seg, writing);
1433 			ipb[i].addr = seg->mr_dma;
1434 			ipb[i].size = seg->mr_len;
1435 			len += seg->mr_len;
1436 			++seg;
1437 			++i;
1438 			/* Check for holes */
1439 			if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
1440 			    offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1441 				break;
1442 		}
1443 		nsegs = i;
1444 		seg1->mr_base = seg1->mr_dma;
1445 		seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1446 					ipb, nsegs, mem_priv, &seg1->mr_base);
1447 		if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1448 			rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1449 			dprintk("RPC:       %s: failed ib_reg_phys_mr "
1450 				"%u@0x%llx (%d)... status %i\n",
1451 				__func__, len,
1452 				(unsigned long long)seg1->mr_dma, nsegs, rc);
1453 			while (nsegs--)
1454 				rpcrdma_unmap_one(ia, --seg);
1455 		} else {
1456 			seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1457 			seg1->mr_nsegs = nsegs;
1458 			seg1->mr_len = len;
1459 		}
1460 		}
1461 		break;
1462 	}
1463 	if (rc)
1464 		return -1;
1465 
1466 	return nsegs;
1467 }
1468 
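/*
 * Undo rpcrdma_register_external(). For memory windows, "r" may carry a
 * reply whose callback is deferred until the signaled unbind completes;
 * otherwise, when "r" is passed, its callback is invoked here once
 * deregistration is done.
 */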
1469 int
1470 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1471 		struct rpcrdma_xprt *r_xprt, void *r)
1472 {
1473 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1474 	struct rpcrdma_mr_seg *seg1 = seg;
1475 	int nsegs = seg->mr_nsegs, rc;
1476 
1477 	switch (ia->ri_memreg_strategy) {
1478 
1479 #if RPCRDMA_PERSISTENT_REGISTRATION
1480 	case RPCRDMA_ALLPHYSICAL:
1481 		BUG_ON(nsegs != 1);
1482 		rpcrdma_unmap_one(ia, seg);
1483 		rc = 0;
1484 		break;
1485 #endif
1486 
1487 	case RPCRDMA_MTHCAFMR:
1488 		{
1489 		LIST_HEAD(l);
1490 		list_add(&seg->mr_chunk.rl_mw->r.fmr->list, &l);
1491 		rc = ib_unmap_fmr(&l);
1492 		while (seg1->mr_nsegs--)
1493 			rpcrdma_unmap_one(ia, seg++);
1494 		}
1495 		if (rc)
1496 			dprintk("RPC:       %s: failed ib_unmap_fmr,"
1497 				" status %i\n", __func__, rc);
1498 		break;
1499 
1500 	case RPCRDMA_MEMWINDOWS_ASYNC:
1501 	case RPCRDMA_MEMWINDOWS:
1502 		{
1503 		struct ib_mw_bind param;
1504 		BUG_ON(nsegs != 1);
1505 		param.mr = ia->ri_bind_mem;
1506 		param.addr = 0ULL;	/* unbind */
1507 		param.length = 0;
1508 		param.mw_access_flags = 0;
1509 		if (r) {
1510 			param.wr_id = (u64) (unsigned long) r;
1511 			param.send_flags = IB_SEND_SIGNALED;
1512 			INIT_CQCOUNT(&r_xprt->rx_ep);
1513 		} else {
1514 			param.wr_id = 0ULL;
1515 			param.send_flags = 0;
1516 			DECR_CQCOUNT(&r_xprt->rx_ep);
1517 		}
1518 		rc = ib_bind_mw(ia->ri_id->qp,
1519 				seg->mr_chunk.rl_mw->r.mw, &param);
1520 		rpcrdma_unmap_one(ia, seg);
1521 		}
1522 		if (rc)
1523 			dprintk("RPC:       %s: failed ib_(un)bind_mw,"
1524 				" status %i\n", __func__, rc);
1525 		else
1526 			r = NULL;	/* will upcall on completion */
1527 		break;
1528 
1529 	default:
1530 		rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1531 		seg1->mr_chunk.rl_mr = NULL;
1532 		while (seg1->mr_nsegs--)
1533 			rpcrdma_unmap_one(ia, seg++);
1534 		if (rc)
1535 			dprintk("RPC:       %s: failed ib_dereg_mr,"
1536 				" status %i\n", __func__, rc);
1537 		break;
1538 	}
1539 	if (r) {
1540 		struct rpcrdma_rep *rep = r;
1541 		void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1542 		rep->rr_func = NULL;
1543 		func(rep);	/* dereg done, callback now */
1544 	}
1545 	return nsegs;
1546 }
1547 
1548 /*
1549  * Prepost any receive buffer, then post send.
1550  *
1551  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1552  */
1553 int
1554 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1555 		struct rpcrdma_ep *ep,
1556 		struct rpcrdma_req *req)
1557 {
1558 	struct ib_send_wr send_wr, *send_wr_fail;
1559 	struct rpcrdma_rep *rep = req->rl_reply;
1560 	int rc;
1561 
1562 	if (rep) {
1563 		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1564 		if (rc)
1565 			goto out;
1566 		req->rl_reply = NULL;
1567 	}
1568 
1569 	send_wr.next = NULL;
1570 	send_wr.wr_id = 0ULL;	/* no send cookie */
1571 	send_wr.sg_list = req->rl_send_iov;
1572 	send_wr.num_sge = req->rl_niovs;
1573 	send_wr.opcode = IB_WR_SEND;
1574 	send_wr.imm_data = 0;
1575 	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
1576 		ib_dma_sync_single_for_device(ia->ri_id->device,
1577 			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1578 			DMA_TO_DEVICE);
1579 	ib_dma_sync_single_for_device(ia->ri_id->device,
1580 		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1581 		DMA_TO_DEVICE);
1582 	ib_dma_sync_single_for_device(ia->ri_id->device,
1583 		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1584 		DMA_TO_DEVICE);
1585 
1586 	if (DECR_CQCOUNT(ep) > 0)
1587 		send_wr.send_flags = 0;
1588 	else { /* Provider must take a send completion every now and then */
1589 		INIT_CQCOUNT(ep);
1590 		send_wr.send_flags = IB_SEND_SIGNALED;
1591 	}
1592 
1593 	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1594 	if (rc)
1595 		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1596 			rc);
1597 out:
1598 	return rc;
1599 }
1600 
1601 /*
1602  * (Re)post a receive buffer.
1603  */
1604 int
1605 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1606 		     struct rpcrdma_ep *ep,
1607 		     struct rpcrdma_rep *rep)
1608 {
1609 	struct ib_recv_wr recv_wr, *recv_wr_fail;
1610 	int rc;
1611 
1612 	recv_wr.next = NULL;
1613 	recv_wr.wr_id = (u64) (unsigned long) rep;
1614 	recv_wr.sg_list = &rep->rr_iov;
1615 	recv_wr.num_sge = 1;
1616 
1617 	ib_dma_sync_single_for_cpu(ia->ri_id->device,
1618 		rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1619 
1620 	DECR_CQCOUNT(ep);
1621 	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1622 
1623 	if (rc)
1624 		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1625 			rc);
1626 	return rc;
1627 }
1628