xref: /openbmc/linux/net/sunrpc/xprtrdma/verbs.c (revision f42b3800)
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39 
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49 
50 #include <linux/pci.h>	/* for Tavor hack below */
51 
52 #include "xprt_rdma.h"
53 
54 /*
55  * Globals/Macros
56  */
57 
58 #ifdef RPC_DEBUG
59 # define RPCDBG_FACILITY	RPCDBG_TRANS
60 #endif
61 
62 /*
63  * internal functions
64  */
65 
66 /*
67  * Handle replies in tasklet context, using a single, global list.
68  * The rdma tasklet function simply turns around and calls the reply
69  * handler for each reply on the list.
70  */
71 
72 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
73 static LIST_HEAD(rpcrdma_tasklets_g);
74 
75 static void
76 rpcrdma_run_tasklet(unsigned long data)
77 {
78 	struct rpcrdma_rep *rep;
79 	void (*func)(struct rpcrdma_rep *);
80 	unsigned long flags;
81 
82 	data = data;	/* tasklet data argument is unused */
83 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
84 	while (!list_empty(&rpcrdma_tasklets_g)) {
85 		rep = list_entry(rpcrdma_tasklets_g.next,
86 				 struct rpcrdma_rep, rr_list);
87 		list_del(&rep->rr_list);
88 		func = rep->rr_func;
89 		rep->rr_func = NULL;
90 		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
91 
92 		if (func)
93 			func(rep);
94 		else
95 			rpcrdma_recv_buffer_put(rep);
96 
97 		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
98 	}
99 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
100 }
101 
102 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
103 
104 static inline void
105 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
106 {
107 	unsigned long flags;
108 
109 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
110 	list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
111 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
112 	tasklet_schedule(&rpcrdma_tasklet_g);
113 }
114 
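/*
 * QP async event handler, installed as ep->rep_attr.event_handler with
 * the endpoint as qp_context. Any asynchronous QP error on a live
 * connection is treated as fatal: mark the endpoint disconnected
 * (-EIO), notify the transport via rep_func, and wake rep_connect_wait.
 */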
115 static void
116 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
117 {
118 	struct rpcrdma_ep *ep = context;
119 
120 	dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
121 		__func__, event->event, event->device->name, context);
122 	if (ep->rep_connected == 1) {
123 		ep->rep_connected = -EIO;
124 		ep->rep_func(ep);
125 		wake_up_all(&ep->rep_connect_wait);
126 	}
127 }
128 
129 static void
130 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
131 {
132 	struct rpcrdma_ep *ep = context;
133 
134 	dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
135 		__func__, event->event, event->device->name, context);
136 	if (ep->rep_connected == 1) {
137 		ep->rep_connected = -EIO;
138 		ep->rep_func(ep);
139 		wake_up_all(&ep->rep_connect_wait);
140 	}
141 }
142 
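/*
 * Process a single work completion. Completions with a NULL wr_id
 * (send or bind completions nobody is waiting on) are ignored. A failed
 * completion marks the reply with rr_len = ~0U and hands it to the
 * tasklet. A successful receive syncs the buffer for the CPU, refreshes
 * the credit count from the rm_credit field of the RPC/RDMA header, and
 * then schedules the reply handler via the tasklet; bind completions
 * are scheduled as-is.
 */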
143 static inline
144 void rpcrdma_event_process(struct ib_wc *wc)
145 {
146 	struct rpcrdma_rep *rep =
147 			(struct rpcrdma_rep *)(unsigned long) wc->wr_id;
148 
149 	dprintk("RPC:       %s: event rep %p status %X opcode %X length %u\n",
150 		__func__, rep, wc->status, wc->opcode, wc->byte_len);
151 
152 	if (!rep) /* send or bind completion that we don't care about */
153 		return;
154 
155 	if (wc->status != IB_WC_SUCCESS) {
156 		dprintk("RPC:       %s: %s WC status %X, connection lost\n",
157 			__func__, (wc->opcode & IB_WC_RECV) ? "recv" : "send",
158 			 wc->status);
159 		rep->rr_len = ~0U;
160 		rpcrdma_schedule_tasklet(rep);
161 		return;
162 	}
163 
164 	switch (wc->opcode) {
165 	case IB_WC_RECV:
166 		rep->rr_len = wc->byte_len;
167 		ib_dma_sync_single_for_cpu(
168 			rdmab_to_ia(rep->rr_buffer)->ri_id->device,
169 			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
170 		/* Keep (only) the most recent credits, after checking validity */
171 		if (rep->rr_len >= 16) {
172 			struct rpcrdma_msg *p =
173 					(struct rpcrdma_msg *) rep->rr_base;
174 			unsigned int credits = ntohl(p->rm_credit);
175 			if (credits == 0) {
176 				dprintk("RPC:       %s: server"
177 					" dropped credits to 0!\n", __func__);
178 				/* don't deadlock */
179 				credits = 1;
180 			} else if (credits > rep->rr_buffer->rb_max_requests) {
181 				dprintk("RPC:       %s: server"
182 					" over-crediting: %d (%d)\n",
183 					__func__, credits,
184 					rep->rr_buffer->rb_max_requests);
185 				credits = rep->rr_buffer->rb_max_requests;
186 			}
187 			atomic_set(&rep->rr_buffer->rb_credits, credits);
188 		}
189 		/* fall through */
190 	case IB_WC_BIND_MW:
191 		rpcrdma_schedule_tasklet(rep);
192 		break;
193 	default:
194 		dprintk("RPC:       %s: unexpected WC event %X\n",
195 			__func__, wc->opcode);
196 		break;
197 	}
198 }
199 
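/*
 * Drain the CQ, one completion at a time, dispatching each through
 * rpcrdma_event_process(). Returns 0, or the error from ib_poll_cq().
 */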
200 static inline int
201 rpcrdma_cq_poll(struct ib_cq *cq)
202 {
203 	struct ib_wc wc;
204 	int rc;
205 
206 	for (;;) {
207 		rc = ib_poll_cq(cq, 1, &wc);
208 		if (rc < 0) {
209 			dprintk("RPC:       %s: ib_poll_cq failed %i\n",
210 				__func__, rc);
211 			return rc;
212 		}
213 		if (rc == 0)
214 			break;
215 
216 		rpcrdma_event_process(&wc);
217 	}
218 
219 	return 0;
220 }
221 
222 /*
223  * rpcrdma_cq_event_upcall
224  *
225  * This upcall handles recv, send, bind and unbind events.
226  * It is reentrant, but processes one event at a time to preserve
227  * the ordering of receives, on which server credit accounting depends.
228  *
229  * It is the responsibility of the scheduled tasklet to return
230  * recv buffers to the pool. NOTE: this affects synchronization of
231  * connection shutdown. That is, the structures required for
232  * the completion of the reply handler must remain intact until
233  * all memory has been reclaimed.
234  *
235  * Note that send events are suppressed and do not result in an upcall.
236  */
237 static void
238 rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
239 {
240 	int rc;
241 
242 	rc = rpcrdma_cq_poll(cq);
243 	if (rc)
244 		return;
245 
246 	rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
247 	if (rc) {
248 		dprintk("RPC:       %s: ib_req_notify_cq failed %i\n",
249 			__func__, rc);
250 		return;
251 	}
252 
253 	rpcrdma_cq_poll(cq);
254 }
255 
256 #ifdef RPC_DEBUG
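/*
 * Human-readable names for CM events, indexed by the RDMA_CM_EVENT_*
 * value; used only in the debug message in rpcrdma_conn_upcall().
 */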
257 static const char * const conn[] = {
258 	"address resolved",
259 	"address error",
260 	"route resolved",
261 	"route error",
262 	"connect request",
263 	"connect response",
264 	"connect error",
265 	"unreachable",
266 	"rejected",
267 	"established",
268 	"disconnected",
269 	"device removal"
270 };
271 #endif
272 
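/*
 * Connection manager event handler, registered via rdma_create_id().
 * Address/route resolution results are reported back to the waiter in
 * rpcrdma_create_id() through ia->ri_async_rc and the ri_done
 * completion. Connection state transitions are recorded in
 * ep->rep_connected, the transport is notified through ep->rep_func,
 * and rep_connect_wait waiters are woken.
 */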
273 static int
274 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
275 {
276 	struct rpcrdma_xprt *xprt = id->context;
277 	struct rpcrdma_ia *ia = &xprt->rx_ia;
278 	struct rpcrdma_ep *ep = &xprt->rx_ep;
279 	struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
280 	struct ib_qp_attr attr;
281 	struct ib_qp_init_attr iattr;
282 	int connstate = 0;
283 
284 	switch (event->event) {
285 	case RDMA_CM_EVENT_ADDR_RESOLVED:
286 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
287 		complete(&ia->ri_done);
288 		break;
289 	case RDMA_CM_EVENT_ADDR_ERROR:
290 		ia->ri_async_rc = -EHOSTUNREACH;
291 		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
292 			__func__, ep);
293 		complete(&ia->ri_done);
294 		break;
295 	case RDMA_CM_EVENT_ROUTE_ERROR:
296 		ia->ri_async_rc = -ENETUNREACH;
297 		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
298 			__func__, ep);
299 		complete(&ia->ri_done);
300 		break;
301 	case RDMA_CM_EVENT_ESTABLISHED:
302 		connstate = 1;
303 		ib_query_qp(ia->ri_id->qp, &attr,
304 			IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
305 			&iattr);
306 		dprintk("RPC:       %s: %d responder resources"
307 			" (%d initiator)\n",
308 			__func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
309 		goto connected;
310 	case RDMA_CM_EVENT_CONNECT_ERROR:
311 		connstate = -ENOTCONN;
312 		goto connected;
313 	case RDMA_CM_EVENT_UNREACHABLE:
314 		connstate = -ENETDOWN;
315 		goto connected;
316 	case RDMA_CM_EVENT_REJECTED:
317 		connstate = -ECONNREFUSED;
318 		goto connected;
319 	case RDMA_CM_EVENT_DISCONNECTED:
320 		connstate = -ECONNABORTED;
321 		goto connected;
322 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
323 		connstate = -ENODEV;
324 connected:
325 		dprintk("RPC:       %s: %s: %u.%u.%u.%u:%u"
326 			" (ep 0x%p event 0x%x)\n",
327 			__func__,
328 			(event->event <= 11) ? conn[event->event] :
329 						"unknown connection error",
330 			NIPQUAD(addr->sin_addr.s_addr),
331 			ntohs(addr->sin_port),
332 			ep, event->event);
333 		atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
334 		dprintk("RPC:       %s: %sconnected\n",
335 					__func__, connstate > 0 ? "" : "dis");
336 		ep->rep_connected = connstate;
337 		ep->rep_func(ep);
338 		wake_up_all(&ep->rep_connect_wait);
339 		break;
340 	default:
341 		ia->ri_async_rc = -EINVAL;
342 		dprintk("RPC:       %s: unexpected CM event %X\n",
343 			__func__, event->event);
344 		complete(&ia->ri_done);
345 		break;
346 	}
347 
348 	return 0;
349 }
350 
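/*
 * Create an rdma_cm_id and synchronously resolve the remote address
 * and route. The CM upcall above signals ia->ri_done as each step
 * finishes, leaving any failure code in ia->ri_async_rc.
 */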
351 static struct rdma_cm_id *
352 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
353 			struct rpcrdma_ia *ia, struct sockaddr *addr)
354 {
355 	struct rdma_cm_id *id;
356 	int rc;
357 
358 	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP);
359 	if (IS_ERR(id)) {
360 		rc = PTR_ERR(id);
361 		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
362 			__func__, rc);
363 		return id;
364 	}
365 
366 	ia->ri_async_rc = 0;
367 	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
368 	if (rc) {
369 		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
370 			__func__, rc);
371 		goto out;
372 	}
373 	wait_for_completion(&ia->ri_done);
374 	rc = ia->ri_async_rc;
375 	if (rc)
376 		goto out;
377 
378 	ia->ri_async_rc = 0;
379 	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
380 	if (rc) {
381 		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
382 			__func__, rc);
383 		goto out;
384 	}
385 	wait_for_completion(&ia->ri_done);
386 	rc = ia->ri_async_rc;
387 	if (rc)
388 		goto out;
389 
390 	return id;
391 
392 out:
393 	rdma_destroy_id(id);
394 	return ERR_PTR(rc);
395 }
396 
397 /*
398  * Drain any cq, prior to teardown.
399  */
400 static void
401 rpcrdma_clean_cq(struct ib_cq *cq)
402 {
403 	struct ib_wc wc;
404 	int count = 0;
405 
406 	while (ib_poll_cq(cq, 1, &wc) == 1)
407 		++count;
408 
409 	if (count)
410 		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
411 			__func__, count, wc.opcode);
412 }
413 
414 /*
415  * Exported functions.
416  */
417 
418 /*
419  * Open and initialize an Interface Adapter.
420  *  o initializes fields of struct rpcrdma_ia, including
421  *    interface and provider attributes and protection zone.
422  */
423 int
424 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
425 {
426 	int rc;
427 	struct rpcrdma_ia *ia = &xprt->rx_ia;
428 
429 	init_completion(&ia->ri_done);
430 
431 	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
432 	if (IS_ERR(ia->ri_id)) {
433 		rc = PTR_ERR(ia->ri_id);
434 		goto out1;
435 	}
436 
437 	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
438 	if (IS_ERR(ia->ri_pd)) {
439 		rc = PTR_ERR(ia->ri_pd);
440 		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
441 			__func__, rc);
442 		goto out2;
443 	}
444 
445 	/*
446 	 * Optionally obtain an underlying physical identity mapping in
447 	 * order to do a memory window-based bind. This base registration
448 	 * is protected from remote access - that is enabled only by binding
449 	 * for the specific bytes targeted during each RPC operation, and
450  * revoked after the corresponding completion, much as a storage
451  * adapter would do.
452 	 */
453 	if (memreg > RPCRDMA_REGISTER) {
454 		int mem_priv = IB_ACCESS_LOCAL_WRITE;
455 		switch (memreg) {
456 #if RPCRDMA_PERSISTENT_REGISTRATION
457 		case RPCRDMA_ALLPHYSICAL:
458 			mem_priv |= IB_ACCESS_REMOTE_WRITE;
459 			mem_priv |= IB_ACCESS_REMOTE_READ;
460 			break;
461 #endif
462 		case RPCRDMA_MEMWINDOWS_ASYNC:
463 		case RPCRDMA_MEMWINDOWS:
464 			mem_priv |= IB_ACCESS_MW_BIND;
465 			break;
466 		default:
467 			break;
468 		}
469 		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
470 		if (IS_ERR(ia->ri_bind_mem)) {
471 			printk(KERN_ALERT "%s: ib_get_dma_mr for "
472 				"phys register failed with %lX\n\t"
473 				"Will continue with degraded performance\n",
474 				__func__, PTR_ERR(ia->ri_bind_mem));
475 			memreg = RPCRDMA_REGISTER;
476 			ia->ri_bind_mem = NULL;
477 		}
478 	}
479 
480 	/* Else will do memory reg/dereg for each chunk */
481 	ia->ri_memreg_strategy = memreg;
482 
483 	return 0;
484 out2:
485 	rdma_destroy_id(ia->ri_id);
486 out1:
487 	return rc;
488 }
489 
490 /*
491  * Clean up/close an IA.
492  *   o if event handles and PD have been initialized, free them.
493  *   o close the IA
494  */
495 void
496 rpcrdma_ia_close(struct rpcrdma_ia *ia)
497 {
498 	int rc;
499 
500 	dprintk("RPC:       %s: entering\n", __func__);
501 	if (ia->ri_bind_mem != NULL) {
502 		rc = ib_dereg_mr(ia->ri_bind_mem);
503 		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
504 			__func__, rc);
505 	}
506 	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id) && ia->ri_id->qp)
507 		rdma_destroy_qp(ia->ri_id);
508 	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
509 		rc = ib_dealloc_pd(ia->ri_pd);
510 		dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
511 			__func__, rc);
512 	}
513 	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id))
514 		rdma_destroy_id(ia->ri_id);
515 }
516 
517 /*
518  * Create unconnected endpoint.
519  */
520 int
521 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
522 				struct rpcrdma_create_data_internal *cdata)
523 {
524 	struct ib_device_attr devattr;
525 	int rc, err;
526 
527 	rc = ib_query_device(ia->ri_id->device, &devattr);
528 	if (rc) {
529 		dprintk("RPC:       %s: ib_query_device failed %d\n",
530 			__func__, rc);
531 		return rc;
532 	}
533 
534 	/* check provider's send/recv wr limits */
535 	if (cdata->max_requests > devattr.max_qp_wr)
536 		cdata->max_requests = devattr.max_qp_wr;
537 
538 	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
539 	ep->rep_attr.qp_context = ep;
540 	/* send_cq and recv_cq initialized below */
541 	ep->rep_attr.srq = NULL;
542 	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
543 	switch (ia->ri_memreg_strategy) {
544 	case RPCRDMA_MEMWINDOWS_ASYNC:
545 	case RPCRDMA_MEMWINDOWS:
546 		/* Add room for mw_binds+unbinds - overkill! */
547 		ep->rep_attr.cap.max_send_wr++;
548 		ep->rep_attr.cap.max_send_wr *= (2 * RPCRDMA_MAX_SEGS);
549 		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr)
550 			return -EINVAL;
551 		break;
552 	default:
553 		break;
554 	}
555 	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
556 	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
557 	ep->rep_attr.cap.max_recv_sge = 1;
558 	ep->rep_attr.cap.max_inline_data = 0;
559 	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
560 	ep->rep_attr.qp_type = IB_QPT_RC;
561 	ep->rep_attr.port_num = ~0;
562 
563 	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
564 		"iovs: send %d recv %d\n",
565 		__func__,
566 		ep->rep_attr.cap.max_send_wr,
567 		ep->rep_attr.cap.max_recv_wr,
568 		ep->rep_attr.cap.max_send_sge,
569 		ep->rep_attr.cap.max_recv_sge);
570 
571 	/* set trigger for requesting send completion */
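	/*
	 * rep_cqinit is the reload value for the endpoint's unsignaled-send
	 * counter: rpcrdma_ep_post() decrements it for each send
	 * (DECR_CQCOUNT) and requests a signaled completion only when it
	 * reaches zero (INIT_CQCOUNT), i.e. roughly once per max_send_wr/2
	 * sends.
	 */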
572 	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/;
573 	switch (ia->ri_memreg_strategy) {
574 	case RPCRDMA_MEMWINDOWS_ASYNC:
575 	case RPCRDMA_MEMWINDOWS:
576 		ep->rep_cqinit -= RPCRDMA_MAX_SEGS;
577 		break;
578 	default:
579 		break;
580 	}
581 	if (ep->rep_cqinit <= 2)
582 		ep->rep_cqinit = 0;
583 	INIT_CQCOUNT(ep);
584 	ep->rep_ia = ia;
585 	init_waitqueue_head(&ep->rep_connect_wait);
586 
587 	/*
588 	 * Create a single cq for receive dto and mw_bind (only ever
589 	 * care about unbind, really). Send completions are suppressed.
590 	 * Use single threaded tasklet upcalls to maintain ordering.
591 	 */
592 	ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
593 				  rpcrdma_cq_async_error_upcall, NULL,
594 				  ep->rep_attr.cap.max_recv_wr +
595 				  ep->rep_attr.cap.max_send_wr + 1, 0);
596 	if (IS_ERR(ep->rep_cq)) {
597 		rc = PTR_ERR(ep->rep_cq);
598 		dprintk("RPC:       %s: ib_create_cq failed: %i\n",
599 			__func__, rc);
600 		goto out1;
601 	}
602 
603 	rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
604 	if (rc) {
605 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
606 			__func__, rc);
607 		goto out2;
608 	}
609 
610 	ep->rep_attr.send_cq = ep->rep_cq;
611 	ep->rep_attr.recv_cq = ep->rep_cq;
612 
613 	/* Initialize cma parameters */
614 
615 	/* RPC/RDMA does not use private data */
616 	ep->rep_remote_cma.private_data = NULL;
617 	ep->rep_remote_cma.private_data_len = 0;
618 
619 	/* Client offers RDMA Read but does not initiate */
620 	switch (ia->ri_memreg_strategy) {
621 	case RPCRDMA_BOUNCEBUFFERS:
622 		ep->rep_remote_cma.responder_resources = 0;
623 		break;
624 	case RPCRDMA_MTHCAFMR:
625 	case RPCRDMA_REGISTER:
626 		ep->rep_remote_cma.responder_resources = cdata->max_requests *
627 				(RPCRDMA_MAX_DATA_SEGS / 8);
628 		break;
629 	case RPCRDMA_MEMWINDOWS:
630 	case RPCRDMA_MEMWINDOWS_ASYNC:
631 #if RPCRDMA_PERSISTENT_REGISTRATION
632 	case RPCRDMA_ALLPHYSICAL:
633 #endif
634 		ep->rep_remote_cma.responder_resources = cdata->max_requests *
635 				(RPCRDMA_MAX_DATA_SEGS / 2);
636 		break;
637 	default:
638 		break;
639 	}
640 	if (ep->rep_remote_cma.responder_resources > devattr.max_qp_rd_atom)
641 		ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
642 	ep->rep_remote_cma.initiator_depth = 0;
643 
644 	ep->rep_remote_cma.retry_count = 7;
645 	ep->rep_remote_cma.flow_control = 0;
646 	ep->rep_remote_cma.rnr_retry_count = 0;
647 
648 	return 0;
649 
650 out2:
651 	err = ib_destroy_cq(ep->rep_cq);
652 	if (err)
653 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
654 			__func__, err);
655 out1:
656 	return rc;
657 }
658 
659 /*
660  * rpcrdma_ep_destroy
661  *
662  * Disconnect and destroy endpoint. After this, the only
663  * valid operations on the ep are to free it (if dynamically
664  * allocated) or re-create it.
665  *
666  * The caller's error handling must be sure to not leak the endpoint
667  * if this function fails.
668  */
669 int
670 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
671 {
672 	int rc;
673 
674 	dprintk("RPC:       %s: entering, connected is %d\n",
675 		__func__, ep->rep_connected);
676 
677 	if (ia->ri_id->qp) {
678 		rc = rpcrdma_ep_disconnect(ep, ia);
679 		if (rc)
680 			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
681 				" returned %i\n", __func__, rc);
682 	}
683 
684 	ep->rep_func = NULL;
685 
686 	/* padding - could be done in rpcrdma_buffer_destroy... */
687 	if (ep->rep_pad_mr) {
688 		rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
689 		ep->rep_pad_mr = NULL;
690 	}
691 
692 	if (ia->ri_id->qp) {
693 		rdma_destroy_qp(ia->ri_id);
694 		ia->ri_id->qp = NULL;
695 	}
696 
697 	rpcrdma_clean_cq(ep->rep_cq);
698 	rc = ib_destroy_cq(ep->rep_cq);
699 	if (rc)
700 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
701 			__func__, rc);
702 
703 	return rc;
704 }
705 
706 /*
707  * Connect unconnected endpoint.
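 * Also handles reconnect: when ep->rep_connected is non-zero, the old
 * connection is torn down, a fresh cm_id is resolved (and must land on
 * the same device), and a new QP is created before connecting again.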
708  */
709 int
710 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
711 {
712 	struct rdma_cm_id *id;
713 	int rc = 0;
714 	int retry_count = 0;
715 	int reconnect = (ep->rep_connected != 0);
716 
717 	if (reconnect) {
718 		struct rpcrdma_xprt *xprt;
719 retry:
720 		rc = rpcrdma_ep_disconnect(ep, ia);
721 		if (rc && rc != -ENOTCONN)
722 			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
723 				" status %i\n", __func__, rc);
724 		rpcrdma_clean_cq(ep->rep_cq);
725 
726 		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
727 		id = rpcrdma_create_id(xprt, ia,
728 				(struct sockaddr *)&xprt->rx_data.addr);
729 		if (IS_ERR(id)) {
730 			rc = PTR_ERR(id);
731 			goto out;
732 		}
733 		/* TEMP TEMP TEMP - fail if new device:
734 		 * Deregister/remarshal *all* requests!
735 		 * Close and recreate adapter, pd, etc!
736 		 * Re-determine all attributes still sane!
737 		 * More stuff I haven't thought of!
738 		 * Rrrgh!
739 		 */
740 		if (ia->ri_id->device != id->device) {
741 			printk("RPC:       %s: can't reconnect on "
742 				"different device!\n", __func__);
743 			rdma_destroy_id(id);
744 			rc = -ENETDOWN;
745 			goto out;
746 		}
747 		/* END TEMP */
748 		rdma_destroy_id(ia->ri_id);
749 		ia->ri_id = id;
750 	}
751 
752 	rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
753 	if (rc) {
754 		dprintk("RPC:       %s: rdma_create_qp failed %i\n",
755 			__func__, rc);
756 		goto out;
757 	}
758 
759 /* XXX Tavor device performs badly with 2K MTU! */
760 if (strnicmp(ia->ri_id->device->dma_device->bus->name, "pci", 3) == 0) {
761 	struct pci_dev *pcid = to_pci_dev(ia->ri_id->device->dma_device);
762 	if (pcid->device == PCI_DEVICE_ID_MELLANOX_TAVOR &&
763 	    (pcid->vendor == PCI_VENDOR_ID_MELLANOX ||
764 	     pcid->vendor == PCI_VENDOR_ID_TOPSPIN)) {
765 		struct ib_qp_attr attr = {
766 			.path_mtu = IB_MTU_1024
767 		};
768 		rc = ib_modify_qp(ia->ri_id->qp, &attr, IB_QP_PATH_MTU);
769 	}
770 }
771 
772 	/* Theoretically a client initiator_depth > 0 is not needed,
773 	 * but many peers fail to complete the connection unless
774 	 * initiator_depth == responder_resources! */
775 	if (ep->rep_remote_cma.initiator_depth !=
776 				ep->rep_remote_cma.responder_resources)
777 		ep->rep_remote_cma.initiator_depth =
778 			ep->rep_remote_cma.responder_resources;
779 
780 	ep->rep_connected = 0;
781 
782 	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
783 	if (rc) {
784 		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
785 				__func__, rc);
786 		goto out;
787 	}
788 
789 	if (reconnect)
790 		return 0;
791 
792 	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
793 
794 	/*
795 	 * Check state. A non-peer reject indicates no listener
796 	 * (ECONNREFUSED), which may be a transient state. All
797 	 * others indicate a transport condition on which a
798 	 * best-effort recovery has already been attempted.
799 	 */
800 	if (ep->rep_connected == -ECONNREFUSED
801 	    && ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
802 		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
803 		goto retry;
804 	}
805 	if (ep->rep_connected <= 0) {
806 		/* Sometimes, the only way to reliably connect to remote
807 		 * CMs is to use the same nonzero values for ORD and IRD. */
808 		ep->rep_remote_cma.initiator_depth =
809 					ep->rep_remote_cma.responder_resources;
810 		if (ep->rep_remote_cma.initiator_depth == 0)
811 			++ep->rep_remote_cma.initiator_depth;
812 		if (ep->rep_remote_cma.responder_resources == 0)
813 			++ep->rep_remote_cma.responder_resources;
814 		if (retry_count++ == 0)
815 			goto retry;
816 		rc = ep->rep_connected;
817 	} else {
818 		dprintk("RPC:       %s: connected\n", __func__);
819 	}
820 
821 out:
822 	if (rc)
823 		ep->rep_connected = rc;
824 	return rc;
825 }
826 
827 /*
828  * rpcrdma_ep_disconnect
829  *
830  * This is separate from destroy to facilitate the ability
831  * to reconnect without recreating the endpoint.
832  *
833  * This call is not reentrant, and must not be made in parallel
834  * on the same endpoint.
835  */
836 int
837 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
838 {
839 	int rc;
840 
841 	rpcrdma_clean_cq(ep->rep_cq);
842 	rc = rdma_disconnect(ia->ri_id);
843 	if (!rc) {
844 		/* returns without wait if not connected */
845 		wait_event_interruptible(ep->rep_connect_wait,
846 							ep->rep_connected != 1);
847 		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
848 			(ep->rep_connected == 1) ? "still " : "dis");
849 	} else {
850 		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
851 		ep->rep_connected = rc;
852 	}
853 	return rc;
854 }
855 
856 /*
857  * Initialize buffer memory
858  */
859 int
860 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
861 	struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
862 {
863 	char *p;
864 	size_t len;
865 	int i, rc;
866 
867 	buf->rb_max_requests = cdata->max_requests;
868 	spin_lock_init(&buf->rb_lock);
869 	atomic_set(&buf->rb_credits, 1);
870 
871 	/* Need to allocate:
872 	 *   1.  arrays for send and recv pointers
873 	 *   2.  arrays of struct rpcrdma_req to fill in pointers
874 	 *   3.  array of struct rpcrdma_rep for replies
875 	 *   4.  padding, if any
876 	 *   5.  mw's, if any
877 	 * Send/recv buffers in req/rep need to be registered
878 	 */
879 
880 	len = buf->rb_max_requests *
881 		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
882 	len += cdata->padding;
883 	switch (ia->ri_memreg_strategy) {
884 	case RPCRDMA_MTHCAFMR:
885 		/* TBD we are perhaps overallocating here */
886 		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
887 				sizeof(struct rpcrdma_mw);
888 		break;
889 	case RPCRDMA_MEMWINDOWS_ASYNC:
890 	case RPCRDMA_MEMWINDOWS:
891 		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
892 				sizeof(struct rpcrdma_mw);
893 		break;
894 	default:
895 		break;
896 	}
897 
898 	/* allocate 1, 4 and 5 in one shot */
899 	p = kzalloc(len, GFP_KERNEL);
900 	if (p == NULL) {
901 		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
902 			__func__, len);
903 		rc = -ENOMEM;
904 		goto out;
905 	}
906 	buf->rb_pool = p;	/* for freeing it later */
907 
908 	buf->rb_send_bufs = (struct rpcrdma_req **) p;
909 	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
910 	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
911 	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
912 
913 	/*
914 	 * Register the zeroed pad buffer, if any.
915 	 */
916 	if (cdata->padding) {
917 		rc = rpcrdma_register_internal(ia, p, cdata->padding,
918 					    &ep->rep_pad_mr, &ep->rep_pad);
919 		if (rc)
920 			goto out;
921 	}
922 	p += cdata->padding;
923 
924 	/*
925 	 * Allocate the fmr's, or mw's for mw_bind chunk registration.
926 	 * We "cycle" the mw's in order to minimize rkey reuse,
927 	 * and also reduce unbind-to-bind collision.
928 	 */
929 	INIT_LIST_HEAD(&buf->rb_mws);
930 	switch (ia->ri_memreg_strategy) {
931 	case RPCRDMA_MTHCAFMR:
932 		{
933 		struct rpcrdma_mw *r = (struct rpcrdma_mw *)p;
934 		struct ib_fmr_attr fa = {
935 			RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT
936 		};
937 		/* TBD we are perhaps overallocating here */
938 		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
939 			r->r.fmr = ib_alloc_fmr(ia->ri_pd,
940 				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
941 				&fa);
942 			if (IS_ERR(r->r.fmr)) {
943 				rc = PTR_ERR(r->r.fmr);
944 				dprintk("RPC:       %s: ib_alloc_fmr"
945 					" failed %i\n", __func__, rc);
946 				goto out;
947 			}
948 			list_add(&r->mw_list, &buf->rb_mws);
949 			++r;
950 		}
951 		}
952 		break;
953 	case RPCRDMA_MEMWINDOWS_ASYNC:
954 	case RPCRDMA_MEMWINDOWS:
955 		{
956 		struct rpcrdma_mw *r = (struct rpcrdma_mw *)p;
957 		/* Allocate one extra request's worth, for full cycling */
958 		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
959 			r->r.mw = ib_alloc_mw(ia->ri_pd);
960 			if (IS_ERR(r->r.mw)) {
961 				rc = PTR_ERR(r->r.mw);
962 				dprintk("RPC:       %s: ib_alloc_mw"
963 					" failed %i\n", __func__, rc);
964 				goto out;
965 			}
966 			list_add(&r->mw_list, &buf->rb_mws);
967 			++r;
968 		}
969 		}
970 		break;
971 	default:
972 		break;
973 	}
974 
975 	/*
976 	 * Allocate/init the request/reply buffers. Doing this
977 	 * using kmalloc for now -- one for each buf.
978 	 */
979 	for (i = 0; i < buf->rb_max_requests; i++) {
980 		struct rpcrdma_req *req;
981 		struct rpcrdma_rep *rep;
982 
983 		len = cdata->inline_wsize + sizeof(struct rpcrdma_req);
984 		/* RPC layer requests *double* size + 1K RPC_SLACK_SPACE! */
985 		/* Typical ~2400b, so rounding up saves work later */
986 		if (len < 4096)
987 			len = 4096;
988 		req = kmalloc(len, GFP_KERNEL);
989 		if (req == NULL) {
990 			dprintk("RPC:       %s: request buffer %d alloc"
991 				" failed\n", __func__, i);
992 			rc = -ENOMEM;
993 			goto out;
994 		}
995 		memset(req, 0, sizeof(struct rpcrdma_req));
996 		buf->rb_send_bufs[i] = req;
997 		buf->rb_send_bufs[i]->rl_buffer = buf;
998 
999 		rc = rpcrdma_register_internal(ia, req->rl_base,
1000 				len - offsetof(struct rpcrdma_req, rl_base),
1001 				&buf->rb_send_bufs[i]->rl_handle,
1002 				&buf->rb_send_bufs[i]->rl_iov);
1003 		if (rc)
1004 			goto out;
1005 
1006 		buf->rb_send_bufs[i]->rl_size = len-sizeof(struct rpcrdma_req);
1007 
1008 		len = cdata->inline_rsize + sizeof(struct rpcrdma_rep);
1009 		rep = kmalloc(len, GFP_KERNEL);
1010 		if (rep == NULL) {
1011 			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1012 				__func__, i);
1013 			rc = -ENOMEM;
1014 			goto out;
1015 		}
1016 		memset(rep, 0, sizeof(struct rpcrdma_rep));
1017 		buf->rb_recv_bufs[i] = rep;
1018 		buf->rb_recv_bufs[i]->rr_buffer = buf;
1019 		init_waitqueue_head(&rep->rr_unbind);
1020 
1021 		rc = rpcrdma_register_internal(ia, rep->rr_base,
1022 				len - offsetof(struct rpcrdma_rep, rr_base),
1023 				&buf->rb_recv_bufs[i]->rr_handle,
1024 				&buf->rb_recv_bufs[i]->rr_iov);
1025 		if (rc)
1026 			goto out;
1027 
1028 	}
1029 	dprintk("RPC:       %s: max_requests %d\n",
1030 		__func__, buf->rb_max_requests);
1031 	/* done */
1032 	return 0;
1033 out:
1034 	rpcrdma_buffer_destroy(buf);
1035 	return rc;
1036 }
1037 
1038 /*
1039  * Unregister and destroy buffer memory. Need to deal with
1040  * partial initialization, so it's callable from failed create.
1041  * Must be called before destroying endpoint, as registrations
1042  * reference it.
1043  */
1044 void
1045 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1046 {
1047 	int rc, i;
1048 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1049 
1050 	/* clean up in reverse order from create
1051 	 *   1.  recv mr memory (mr free, then kfree)
1052 	 *   1a. bind mw memory
1053 	 *   2.  send mr memory (mr free, then kfree)
1054 	 *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1055 	 *   4.  arrays
1056 	 */
1057 	dprintk("RPC:       %s: entering\n", __func__);
1058 
1059 	for (i = 0; i < buf->rb_max_requests; i++) {
1060 		if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1061 			rpcrdma_deregister_internal(ia,
1062 					buf->rb_recv_bufs[i]->rr_handle,
1063 					&buf->rb_recv_bufs[i]->rr_iov);
1064 			kfree(buf->rb_recv_bufs[i]);
1065 		}
1066 		if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1067 			while (!list_empty(&buf->rb_mws)) {
1068 				struct rpcrdma_mw *r;
1069 				r = list_entry(buf->rb_mws.next,
1070 					struct rpcrdma_mw, mw_list);
1071 				list_del(&r->mw_list);
1072 				switch (ia->ri_memreg_strategy) {
1073 				case RPCRDMA_MTHCAFMR:
1074 					rc = ib_dealloc_fmr(r->r.fmr);
1075 					if (rc)
1076 						dprintk("RPC:       %s:"
1077 							" ib_dealloc_fmr"
1078 							" failed %i\n",
1079 							__func__, rc);
1080 					break;
1081 				case RPCRDMA_MEMWINDOWS_ASYNC:
1082 				case RPCRDMA_MEMWINDOWS:
1083 					rc = ib_dealloc_mw(r->r.mw);
1084 					if (rc)
1085 						dprintk("RPC:       %s:"
1086 							" ib_dealloc_mw"
1087 							" failed %i\n",
1088 							__func__, rc);
1089 					break;
1090 				default:
1091 					break;
1092 				}
1093 			}
1094 			rpcrdma_deregister_internal(ia,
1095 					buf->rb_send_bufs[i]->rl_handle,
1096 					&buf->rb_send_bufs[i]->rl_iov);
1097 			kfree(buf->rb_send_bufs[i]);
1098 		}
1099 	}
1100 
1101 	kfree(buf->rb_pool);
1102 }
1103 
1104 /*
1105  * Get a set of request/reply buffers.
1106  *
1107  * Reply buffer (if needed) is attached to send buffer upon return.
1108  * Rule:
1109  *    rb_send_index and rb_recv_index MUST always be pointing to the
1110  *    *next* available buffer (non-NULL). They are incremented after
1111  *    removing buffers, and decremented *before* returning them.
1112  */
1113 struct rpcrdma_req *
1114 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1115 {
1116 	struct rpcrdma_req *req;
1117 	unsigned long flags;
1118 
1119 	spin_lock_irqsave(&buffers->rb_lock, flags);
1120 	if (buffers->rb_send_index == buffers->rb_max_requests) {
1121 		spin_unlock_irqrestore(&buffers->rb_lock, flags);
1122 		dprintk("RPC:       %s: out of request buffers\n", __func__);
1123 		return NULL;
1124 	}
1125 
1126 	req = buffers->rb_send_bufs[buffers->rb_send_index];
1127 	if (buffers->rb_send_index < buffers->rb_recv_index) {
1128 		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1129 			__func__,
1130 			buffers->rb_recv_index - buffers->rb_send_index);
1131 		req->rl_reply = NULL;
1132 	} else {
1133 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1134 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1135 	}
1136 	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
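	/*
	 * Attach a full set of mw's to the request for later chunk
	 * registration. rb_mws is over-provisioned by one request's worth
	 * in rpcrdma_buffer_create(), so a non-empty list is expected to
	 * hold at least RPCRDMA_MAX_SEGS entries here.
	 */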
1137 	if (!list_empty(&buffers->rb_mws)) {
1138 		int i = RPCRDMA_MAX_SEGS - 1;
1139 		do {
1140 			struct rpcrdma_mw *r;
1141 			r = list_entry(buffers->rb_mws.next,
1142 					struct rpcrdma_mw, mw_list);
1143 			list_del(&r->mw_list);
1144 			req->rl_segments[i].mr_chunk.rl_mw = r;
1145 		} while (--i >= 0);
1146 	}
1147 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1148 	return req;
1149 }
1150 
1151 /*
1152  * Put request/reply buffers back into pool.
1153  * Pre-decrement counter/array index.
1154  */
1155 void
1156 rpcrdma_buffer_put(struct rpcrdma_req *req)
1157 {
1158 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1159 	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1160 	int i;
1161 	unsigned long flags;
1162 
1163 	BUG_ON(req->rl_nchunks != 0);
1164 	spin_lock_irqsave(&buffers->rb_lock, flags);
1165 	buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1166 	req->rl_niovs = 0;
1167 	if (req->rl_reply) {
1168 		buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1169 		init_waitqueue_head(&req->rl_reply->rr_unbind);
1170 		req->rl_reply->rr_func = NULL;
1171 		req->rl_reply = NULL;
1172 	}
1173 	switch (ia->ri_memreg_strategy) {
1174 	case RPCRDMA_MTHCAFMR:
1175 	case RPCRDMA_MEMWINDOWS_ASYNC:
1176 	case RPCRDMA_MEMWINDOWS:
1177 		/*
1178 		 * Cycle mw's back in reverse order, and "spin" them.
1179 		 * This delays and scrambles reuse as much as possible.
1180 		 */
1181 		i = 1;
1182 		do {
1183 			struct rpcrdma_mw **mw;
1184 			mw = &req->rl_segments[i].mr_chunk.rl_mw;
1185 			list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1186 			*mw = NULL;
1187 		} while (++i < RPCRDMA_MAX_SEGS);
1188 		list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1189 					&buffers->rb_mws);
1190 		req->rl_segments[0].mr_chunk.rl_mw = NULL;
1191 		break;
1192 	default:
1193 		break;
1194 	}
1195 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1196 }
1197 
1198 /*
1199  * Recover reply buffers from pool.
1200  * This happens when recovering from error conditions.
1201  * Post-increment counter/array index.
1202  */
1203 void
1204 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1205 {
1206 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1207 	unsigned long flags;
1208 
1209 	if (req->rl_iov.length == 0)	/* special case xprt_rdma_allocate() */
1210 		buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1211 	spin_lock_irqsave(&buffers->rb_lock, flags);
1212 	if (buffers->rb_recv_index < buffers->rb_max_requests) {
1213 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1214 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1215 	}
1216 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1217 }
1218 
1219 /*
1220  * Put reply buffers back into pool when not attached to
1221  * request. This happens in error conditions, and when
1222  * aborting unbinds. Pre-decrement counter/array index.
1223  */
1224 void
1225 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1226 {
1227 	struct rpcrdma_buffer *buffers = rep->rr_buffer;
1228 	unsigned long flags;
1229 
1230 	rep->rr_func = NULL;
1231 	spin_lock_irqsave(&buffers->rb_lock, flags);
1232 	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1233 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1234 }
1235 
1236 /*
1237  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1238  */
1239 
1240 int
1241 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1242 				struct ib_mr **mrp, struct ib_sge *iov)
1243 {
1244 	struct ib_phys_buf ipb;
1245 	struct ib_mr *mr;
1246 	int rc;
1247 
1248 	/*
1249 	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1250 	 */
1251 	iov->addr = ib_dma_map_single(ia->ri_id->device,
1252 			va, len, DMA_BIDIRECTIONAL);
1253 	iov->length = len;
1254 
1255 	if (ia->ri_bind_mem != NULL) {
1256 		*mrp = NULL;
1257 		iov->lkey = ia->ri_bind_mem->lkey;
1258 		return 0;
1259 	}
1260 
1261 	ipb.addr = iov->addr;
1262 	ipb.size = iov->length;
1263 	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1264 			IB_ACCESS_LOCAL_WRITE, &iov->addr);
1265 
1266 	dprintk("RPC:       %s: phys convert: 0x%llx "
1267 			"registered 0x%llx length %d\n",
1268 			__func__, (unsigned long long)ipb.addr,
1269 			(unsigned long long)iov->addr, len);
1270 
1271 	if (IS_ERR(mr)) {
1272 		*mrp = NULL;
1273 		rc = PTR_ERR(mr);
1274 		dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1275 	} else {
1276 		*mrp = mr;
1277 		iov->lkey = mr->lkey;
1278 		rc = 0;
1279 	}
1280 
1281 	return rc;
1282 }
1283 
1284 int
1285 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1286 				struct ib_mr *mr, struct ib_sge *iov)
1287 {
1288 	int rc;
1289 
1290 	ib_dma_unmap_single(ia->ri_id->device,
1291 			iov->addr, iov->length, DMA_BIDIRECTIONAL);
1292 
1293 	if (mr == NULL)
1294 		return 0;
1295 
1296 	rc = ib_dereg_mr(mr);
1297 	if (rc)
1298 		dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1299 	return rc;
1300 }
1301 
1302 /*
1303  * Wrappers for chunk registration, shared by read/write chunk code.
1304  */
1305 
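/*
 * DMA-map (or unmap) a single chunk segment, as a page mapping when
 * seg->mr_page is set and as a single mapping of the kva otherwise.
 * The direction follows the chunk type: DMA_FROM_DEVICE for segments
 * the server will write, DMA_TO_DEVICE for segments it will read.
 */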
1306 static void
1307 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1308 {
1309 	seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1310 	seg->mr_dmalen = seg->mr_len;
1311 	if (seg->mr_page)
1312 		seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1313 				seg->mr_page, offset_in_page(seg->mr_offset),
1314 				seg->mr_dmalen, seg->mr_dir);
1315 	else
1316 		seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1317 				seg->mr_offset,
1318 				seg->mr_dmalen, seg->mr_dir);
1319 }
1320 
1321 static void
1322 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1323 {
1324 	if (seg->mr_page)
1325 		ib_dma_unmap_page(ia->ri_id->device,
1326 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1327 	else
1328 		ib_dma_unmap_single(ia->ri_id->device,
1329 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1330 }
1331 
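/*
 * Register a run of memory segments for remote access, using the
 * memory registration strategy selected at IA open time. On success
 * the lead segment carries the resulting rkey/base/length and the
 * number of segments covered is returned; on failure, -1.
 */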
1332 int
1333 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1334 			int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1335 {
1336 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1337 	int mem_priv = (writing ? IB_ACCESS_REMOTE_WRITE :
1338 				  IB_ACCESS_REMOTE_READ);
1339 	struct rpcrdma_mr_seg *seg1 = seg;
1340 	int i;
1341 	int rc = 0;
1342 
1343 	switch (ia->ri_memreg_strategy) {
1344 
1345 #if RPCRDMA_PERSISTENT_REGISTRATION
1346 	case RPCRDMA_ALLPHYSICAL:
1347 		rpcrdma_map_one(ia, seg, writing);
1348 		seg->mr_rkey = ia->ri_bind_mem->rkey;
1349 		seg->mr_base = seg->mr_dma;
1350 		seg->mr_nsegs = 1;
1351 		nsegs = 1;
1352 		break;
1353 #endif
1354 
1355 	/* Registration using fast memory registration */
1356 	case RPCRDMA_MTHCAFMR:
1357 		{
1358 		u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1359 		int len, pageoff = offset_in_page(seg->mr_offset);
1360 		seg1->mr_offset -= pageoff;	/* start of page */
1361 		seg1->mr_len += pageoff;
1362 		len = -pageoff;
1363 		if (nsegs > RPCRDMA_MAX_DATA_SEGS)
1364 			nsegs = RPCRDMA_MAX_DATA_SEGS;
1365 		for (i = 0; i < nsegs;) {
1366 			rpcrdma_map_one(ia, seg, writing);
1367 			physaddrs[i] = seg->mr_dma;
1368 			len += seg->mr_len;
1369 			++seg;
1370 			++i;
1371 			/* Check for holes */
1372 			if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
1373 			    offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1374 				break;
1375 		}
1376 		nsegs = i;
1377 		rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1378 					physaddrs, nsegs, seg1->mr_dma);
1379 		if (rc) {
1380 			dprintk("RPC:       %s: failed ib_map_phys_fmr "
1381 				"%u@0x%llx+%i (%d)... status %i\n", __func__,
1382 				len, (unsigned long long)seg1->mr_dma,
1383 				pageoff, nsegs, rc);
1384 			while (nsegs--)
1385 				rpcrdma_unmap_one(ia, --seg);
1386 		} else {
1387 			seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1388 			seg1->mr_base = seg1->mr_dma + pageoff;
1389 			seg1->mr_nsegs = nsegs;
1390 			seg1->mr_len = len;
1391 		}
1392 		}
1393 		break;
1394 
1395 	/* Registration using memory windows */
1396 	case RPCRDMA_MEMWINDOWS_ASYNC:
1397 	case RPCRDMA_MEMWINDOWS:
1398 		{
1399 		struct ib_mw_bind param;
1400 		rpcrdma_map_one(ia, seg, writing);
1401 		param.mr = ia->ri_bind_mem;
1402 		param.wr_id = 0ULL;	/* no send cookie */
1403 		param.addr = seg->mr_dma;
1404 		param.length = seg->mr_len;
1405 		param.send_flags = 0;
1406 		param.mw_access_flags = mem_priv;
1407 
1408 		DECR_CQCOUNT(&r_xprt->rx_ep);
1409 		rc = ib_bind_mw(ia->ri_id->qp,
1410 					seg->mr_chunk.rl_mw->r.mw, &param);
1411 		if (rc) {
1412 			dprintk("RPC:       %s: failed ib_bind_mw "
1413 				"%u@0x%llx status %i\n",
1414 				__func__, seg->mr_len,
1415 				(unsigned long long)seg->mr_dma, rc);
1416 			rpcrdma_unmap_one(ia, seg);
1417 		} else {
1418 			seg->mr_rkey = seg->mr_chunk.rl_mw->r.mw->rkey;
1419 			seg->mr_base = param.addr;
1420 			seg->mr_nsegs = 1;
1421 			nsegs = 1;
1422 		}
1423 		}
1424 		break;
1425 
1426 	/* Default registration each time */
1427 	default:
1428 		{
1429 		struct ib_phys_buf ipb[RPCRDMA_MAX_DATA_SEGS];
1430 		int len = 0;
1431 		if (nsegs > RPCRDMA_MAX_DATA_SEGS)
1432 			nsegs = RPCRDMA_MAX_DATA_SEGS;
1433 		for (i = 0; i < nsegs;) {
1434 			rpcrdma_map_one(ia, seg, writing);
1435 			ipb[i].addr = seg->mr_dma;
1436 			ipb[i].size = seg->mr_len;
1437 			len += seg->mr_len;
1438 			++seg;
1439 			++i;
1440 			/* Check for holes */
1441 			if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
1442 			    offset_in_page((seg-1)->mr_offset+(seg-1)->mr_len))
1443 				break;
1444 		}
1445 		nsegs = i;
1446 		seg1->mr_base = seg1->mr_dma;
1447 		seg1->mr_chunk.rl_mr = ib_reg_phys_mr(ia->ri_pd,
1448 					ipb, nsegs, mem_priv, &seg1->mr_base);
1449 		if (IS_ERR(seg1->mr_chunk.rl_mr)) {
1450 			rc = PTR_ERR(seg1->mr_chunk.rl_mr);
1451 			dprintk("RPC:       %s: failed ib_reg_phys_mr "
1452 				"%u@0x%llx (%d)... status %i\n",
1453 				__func__, len,
1454 				(unsigned long long)seg1->mr_dma, nsegs, rc);
1455 			while (nsegs--)
1456 				rpcrdma_unmap_one(ia, --seg);
1457 		} else {
1458 			seg1->mr_rkey = seg1->mr_chunk.rl_mr->rkey;
1459 			seg1->mr_nsegs = nsegs;
1460 			seg1->mr_len = len;
1461 		}
1462 		}
1463 		break;
1464 	}
1465 	if (rc)
1466 		return -1;
1467 
1468 	return nsegs;
1469 }
1470 
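/*
 * Undo a registration made by rpcrdma_register_external() and unmap
 * the underlying DMA mappings. For the memory-window strategies an
 * unbind is posted; if a reply context "r" is supplied, its handler
 * runs from the unbind completion instead of being called here.
 */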
1471 int
1472 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1473 		struct rpcrdma_xprt *r_xprt, void *r)
1474 {
1475 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1476 	struct rpcrdma_mr_seg *seg1 = seg;
1477 	int nsegs = seg->mr_nsegs, rc;
1478 
1479 	switch (ia->ri_memreg_strategy) {
1480 
1481 #if RPCRDMA_PERSISTENT_REGISTRATION
1482 	case RPCRDMA_ALLPHYSICAL:
1483 		BUG_ON(nsegs != 1);
1484 		rpcrdma_unmap_one(ia, seg);
1485 		rc = 0;
1486 		break;
1487 #endif
1488 
1489 	case RPCRDMA_MTHCAFMR:
1490 		{
1491 		LIST_HEAD(l);
1492 		list_add(&seg->mr_chunk.rl_mw->r.fmr->list, &l);
1493 		rc = ib_unmap_fmr(&l);
1494 		while (seg1->mr_nsegs--)
1495 			rpcrdma_unmap_one(ia, seg++);
1496 		}
1497 		if (rc)
1498 			dprintk("RPC:       %s: failed ib_unmap_fmr,"
1499 				" status %i\n", __func__, rc);
1500 		break;
1501 
1502 	case RPCRDMA_MEMWINDOWS_ASYNC:
1503 	case RPCRDMA_MEMWINDOWS:
1504 		{
1505 		struct ib_mw_bind param;
1506 		BUG_ON(nsegs != 1);
1507 		param.mr = ia->ri_bind_mem;
1508 		param.addr = 0ULL;	/* unbind */
1509 		param.length = 0;
1510 		param.mw_access_flags = 0;
1511 		if (r) {
1512 			param.wr_id = (u64) (unsigned long) r;
1513 			param.send_flags = IB_SEND_SIGNALED;
1514 			INIT_CQCOUNT(&r_xprt->rx_ep);
1515 		} else {
1516 			param.wr_id = 0ULL;
1517 			param.send_flags = 0;
1518 			DECR_CQCOUNT(&r_xprt->rx_ep);
1519 		}
1520 		rc = ib_bind_mw(ia->ri_id->qp,
1521 				seg->mr_chunk.rl_mw->r.mw, &param);
1522 		rpcrdma_unmap_one(ia, seg);
1523 		}
1524 		if (rc)
1525 			dprintk("RPC:       %s: failed ib_(un)bind_mw,"
1526 				" status %i\n", __func__, rc);
1527 		else
1528 			r = NULL;	/* will upcall on completion */
1529 		break;
1530 
1531 	default:
1532 		rc = ib_dereg_mr(seg1->mr_chunk.rl_mr);
1533 		seg1->mr_chunk.rl_mr = NULL;
1534 		while (seg1->mr_nsegs--)
1535 			rpcrdma_unmap_one(ia, seg++);
1536 		if (rc)
1537 			dprintk("RPC:       %s: failed ib_dereg_mr,"
1538 				" status %i\n", __func__, rc);
1539 		break;
1540 	}
1541 	if (r) {
1542 		struct rpcrdma_rep *rep = r;
1543 		void (*func)(struct rpcrdma_rep *) = rep->rr_func;
1544 		rep->rr_func = NULL;
1545 		func(rep);	/* dereg done, callback now */
1546 	}
1547 	return nsegs;
1548 }
1549 
1550 /*
1551  * Prepost any receive buffer, then post send.
1552  *
1553  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1554  */
1555 int
1556 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1557 		struct rpcrdma_ep *ep,
1558 		struct rpcrdma_req *req)
1559 {
1560 	struct ib_send_wr send_wr, *send_wr_fail;
1561 	struct rpcrdma_rep *rep = req->rl_reply;
1562 	int rc;
1563 
1564 	if (rep) {
1565 		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1566 		if (rc)
1567 			goto out;
1568 		req->rl_reply = NULL;
1569 	}
1570 
1571 	send_wr.next = NULL;
1572 	send_wr.wr_id = 0ULL;	/* no send cookie */
1573 	send_wr.sg_list = req->rl_send_iov;
1574 	send_wr.num_sge = req->rl_niovs;
1575 	send_wr.opcode = IB_WR_SEND;
1576 	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
1577 		ib_dma_sync_single_for_device(ia->ri_id->device,
1578 			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1579 			DMA_TO_DEVICE);
1580 	ib_dma_sync_single_for_device(ia->ri_id->device,
1581 		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1582 		DMA_TO_DEVICE);
1583 	ib_dma_sync_single_for_device(ia->ri_id->device,
1584 		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1585 		DMA_TO_DEVICE);
1586 
1587 	if (DECR_CQCOUNT(ep) > 0)
1588 		send_wr.send_flags = 0;
1589 	else { /* Provider must take a send completion every now and then */
1590 		INIT_CQCOUNT(ep);
1591 		send_wr.send_flags = IB_SEND_SIGNALED;
1592 	}
1593 
1594 	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1595 	if (rc)
1596 		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1597 			rc);
1598 out:
1599 	return rc;
1600 }
1601 
1602 /*
1603  * (Re)post a receive buffer.
1604  */
1605 int
1606 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1607 		     struct rpcrdma_ep *ep,
1608 		     struct rpcrdma_rep *rep)
1609 {
1610 	struct ib_recv_wr recv_wr, *recv_wr_fail;
1611 	int rc;
1612 
1613 	recv_wr.next = NULL;
1614 	recv_wr.wr_id = (u64) (unsigned long) rep;
1615 	recv_wr.sg_list = &rep->rr_iov;
1616 	recv_wr.num_sge = 1;
1617 
1618 	ib_dma_sync_single_for_cpu(ia->ri_id->device,
1619 		rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1620 
1621 	DECR_CQCOUNT(ep);
1622 	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1623 
1624 	if (rc)
1625 		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1626 			rc);
1627 	return rc;
1628 }
1629