xref: /openbmc/linux/net/sunrpc/xprtrdma/verbs.c (revision d2999e1b)
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39 
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49 
50 #include <linux/interrupt.h>
51 #include <linux/slab.h>
52 #include <asm/bitops.h>
53 
54 #include "xprt_rdma.h"
55 
56 /*
57  * Globals/Macros
58  */
59 
60 #ifdef RPC_DEBUG
61 # define RPCDBG_FACILITY	RPCDBG_TRANS
62 #endif
63 
64 /*
65  * internal functions
66  */
67 
68 /*
69  * Handle replies in tasklet context, using a single, global list.
70  * The rdma tasklet function simply walks the list and, for each
71  * reply, invokes its handler (or returns the buffer if none is set).
72  */
73 
74 static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
75 static LIST_HEAD(rpcrdma_tasklets_g);
76 
77 static void
78 rpcrdma_run_tasklet(unsigned long data)
79 {
80 	struct rpcrdma_rep *rep;
81 	void (*func)(struct rpcrdma_rep *);
82 	unsigned long flags;
83 
84 	data = data;	/* tasklet argument is unused; the list is global */
85 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
86 	while (!list_empty(&rpcrdma_tasklets_g)) {
87 		rep = list_entry(rpcrdma_tasklets_g.next,
88 				 struct rpcrdma_rep, rr_list);
89 		list_del(&rep->rr_list);
90 		func = rep->rr_func;
91 		rep->rr_func = NULL;
92 		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
93 
94 		if (func)
95 			func(rep);
96 		else
97 			rpcrdma_recv_buffer_put(rep);
98 
99 		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
100 	}
101 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
102 }
103 
104 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
105 
106 static inline void
107 rpcrdma_schedule_tasklet(struct rpcrdma_rep *rep)
108 {
109 	unsigned long flags;
110 
111 	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
112 	list_add_tail(&rep->rr_list, &rpcrdma_tasklets_g);
113 	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
114 	tasklet_schedule(&rpcrdma_tasklet_g);
115 }
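/*
 * Note on the hand-off above: receive completions are handled in the CQ
 * upcall (interrupt context), which queues each rpcrdma_rep on the global
 * list and schedules the tasklet.  rpcrdma_run_tasklet() then runs in
 * softirq context and invokes rr_func for each reply (or returns the
 * buffer when no handler is set), dropping the list lock around every
 * callback so that new replies can be queued concurrently.
 */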
116 
117 static void
118 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
119 {
120 	struct rpcrdma_ep *ep = context;
121 
122 	dprintk("RPC:       %s: QP error %X on device %s ep %p\n",
123 		__func__, event->event, event->device->name, context);
124 	if (ep->rep_connected == 1) {
125 		ep->rep_connected = -EIO;
126 		ep->rep_func(ep);
127 		wake_up_all(&ep->rep_connect_wait);
128 	}
129 }
130 
131 static void
132 rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
133 {
134 	struct rpcrdma_ep *ep = context;
135 
136 	dprintk("RPC:       %s: CQ error %X on device %s ep %p\n",
137 		__func__, event->event, event->device->name, context);
138 	if (ep->rep_connected == 1) {
139 		ep->rep_connected = -EIO;
140 		ep->rep_func(ep);
141 		wake_up_all(&ep->rep_connect_wait);
142 	}
143 }
144 
145 static void
146 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
147 {
148 	struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
149 
150 	dprintk("RPC:       %s: frmr %p status %X opcode %d\n",
151 		__func__, frmr, wc->status, wc->opcode);
152 
153 	if (wc->wr_id == 0ULL)
154 		return;
155 	if (wc->status != IB_WC_SUCCESS)
156 		return;
157 
158 	if (wc->opcode == IB_WC_FAST_REG_MR)
159 		frmr->r.frmr.state = FRMR_IS_VALID;
160 	else if (wc->opcode == IB_WC_LOCAL_INV)
161 		frmr->r.frmr.state = FRMR_IS_INVALID;
162 }
163 
164 static int
165 rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
166 {
167 	struct ib_wc *wcs;
168 	int budget, count, rc;
169 
170 	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
171 	do {
172 		wcs = ep->rep_send_wcs;
173 
174 		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
175 		if (rc <= 0)
176 			return rc;
177 
178 		count = rc;
179 		while (count-- > 0)
180 			rpcrdma_sendcq_process_wc(wcs++);
181 	} while (rc == RPCRDMA_POLLSIZE && --budget);
182 	return 0;
183 }
184 
185 /*
186  * Handle send, fast_reg_mr, and local_inv completions.
187  *
188  * Send events are typically suppressed and thus do not result
189  * in an upcall. Occasionally one is signaled, however. This
190  * prevents the provider's completion queue from wrapping and
191  * losing a completion.
192  */
193 static void
194 rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
195 {
196 	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
197 	int rc;
198 
199 	rc = rpcrdma_sendcq_poll(cq, ep);
200 	if (rc) {
201 		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
202 			__func__, rc);
203 		return;
204 	}
205 
206 	rc = ib_req_notify_cq(cq,
207 			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
208 	if (rc == 0)
209 		return;
210 	if (rc < 0) {
211 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
212 			__func__, rc);
213 		return;
214 	}
215 
216 	rpcrdma_sendcq_poll(cq, ep);
217 }
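/*
 * Re-arm note: ib_req_notify_cq() with IB_CQ_REPORT_MISSED_EVENTS
 * returns a positive value when completions may have arrived between the
 * last poll and the re-arm request.  The extra poll above closes that
 * race so a completion is never stranded until the next interrupt;
 * rpcrdma_recvcq_upcall() below relies on the same pattern.
 */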
218 
219 static void
220 rpcrdma_recvcq_process_wc(struct ib_wc *wc)
221 {
222 	struct rpcrdma_rep *rep =
223 			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
224 
225 	dprintk("RPC:       %s: rep %p status %X opcode %X length %u\n",
226 		__func__, rep, wc->status, wc->opcode, wc->byte_len);
227 
228 	if (wc->status != IB_WC_SUCCESS) {
229 		rep->rr_len = ~0U;
230 		goto out_schedule;
231 	}
232 	if (wc->opcode != IB_WC_RECV)
233 		return;
234 
235 	rep->rr_len = wc->byte_len;
236 	ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
237 			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
238 
239 	if (rep->rr_len >= 16) {
240 		struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base;
241 		unsigned int credits = ntohl(p->rm_credit);
242 
243 		if (credits == 0)
244 			credits = 1;	/* don't deadlock */
245 		else if (credits > rep->rr_buffer->rb_max_requests)
246 			credits = rep->rr_buffer->rb_max_requests;
247 		atomic_set(&rep->rr_buffer->rb_credits, credits);
248 	}
249 
250 out_schedule:
251 	rpcrdma_schedule_tasklet(rep);
252 }
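/*
 * Note: the 16-byte check above ensures the fixed portion of the
 * RPC/RDMA header (xid, vers, credits, type -- four 32-bit words) has
 * arrived before the credit field is trusted.  A credit value of zero is
 * bumped to one so the client can always send at least one request, and
 * the advertised value is capped at rb_max_requests.
 */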
253 
254 static int
255 rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
256 {
257 	struct ib_wc *wcs;
258 	int budget, count, rc;
259 
260 	budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
261 	do {
262 		wcs = ep->rep_recv_wcs;
263 
264 		rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
265 		if (rc <= 0)
266 			return rc;
267 
268 		count = rc;
269 		while (count-- > 0)
270 			rpcrdma_recvcq_process_wc(wcs++);
271 	} while (rc == RPCRDMA_POLLSIZE && --budget);
272 	return 0;
273 }
274 
275 /*
276  * Handle receive completions.
277  *
278  * It is reentrant, but processes one event at a time in order to
279  * preserve receive ordering, which server credit accounting requires.
280  *
281  * It is the responsibility of the scheduled tasklet to return
282  * recv buffers to the pool. NOTE: this affects synchronization of
283  * connection shutdown. That is, the structures required for
284  * the completion of the reply handler must remain intact until
285  * all memory has been reclaimed.
286  */
287 static void
288 rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
289 {
290 	struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
291 	int rc;
292 
293 	rc = rpcrdma_recvcq_poll(cq, ep);
294 	if (rc) {
295 		dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
296 			__func__, rc);
297 		return;
298 	}
299 
300 	rc = ib_req_notify_cq(cq,
301 			IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
302 	if (rc == 0)
303 		return;
304 	if (rc < 0) {
305 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
306 			__func__, rc);
307 		return;
308 	}
309 
310 	rpcrdma_recvcq_poll(cq, ep);
311 }
312 
313 #ifdef RPC_DEBUG
314 static const char * const conn[] = {
315 	"address resolved",
316 	"address error",
317 	"route resolved",
318 	"route error",
319 	"connect request",
320 	"connect response",
321 	"connect error",
322 	"unreachable",
323 	"rejected",
324 	"established",
325 	"disconnected",
326 	"device removal"
327 };
328 #endif
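/*
 * The strings above are indexed by the RDMA_CM_EVENT_* values (0 through
 * 11) delivered in rdma_cm_event->event; rpcrdma_conn_upcall() reports
 * anything larger as "unknown connection error".
 */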
329 
330 static int
331 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
332 {
333 	struct rpcrdma_xprt *xprt = id->context;
334 	struct rpcrdma_ia *ia = &xprt->rx_ia;
335 	struct rpcrdma_ep *ep = &xprt->rx_ep;
336 #ifdef RPC_DEBUG
337 	struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
338 #endif
339 	struct ib_qp_attr attr;
340 	struct ib_qp_init_attr iattr;
341 	int connstate = 0;
342 
343 	switch (event->event) {
344 	case RDMA_CM_EVENT_ADDR_RESOLVED:
345 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
346 		ia->ri_async_rc = 0;
347 		complete(&ia->ri_done);
348 		break;
349 	case RDMA_CM_EVENT_ADDR_ERROR:
350 		ia->ri_async_rc = -EHOSTUNREACH;
351 		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
352 			__func__, ep);
353 		complete(&ia->ri_done);
354 		break;
355 	case RDMA_CM_EVENT_ROUTE_ERROR:
356 		ia->ri_async_rc = -ENETUNREACH;
357 		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
358 			__func__, ep);
359 		complete(&ia->ri_done);
360 		break;
361 	case RDMA_CM_EVENT_ESTABLISHED:
362 		connstate = 1;
363 		ib_query_qp(ia->ri_id->qp, &attr,
364 			IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
365 			&iattr);
366 		dprintk("RPC:       %s: %d responder resources"
367 			" (%d initiator)\n",
368 			__func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
369 		goto connected;
370 	case RDMA_CM_EVENT_CONNECT_ERROR:
371 		connstate = -ENOTCONN;
372 		goto connected;
373 	case RDMA_CM_EVENT_UNREACHABLE:
374 		connstate = -ENETDOWN;
375 		goto connected;
376 	case RDMA_CM_EVENT_REJECTED:
377 		connstate = -ECONNREFUSED;
378 		goto connected;
379 	case RDMA_CM_EVENT_DISCONNECTED:
380 		connstate = -ECONNABORTED;
381 		goto connected;
382 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
383 		connstate = -ENODEV;
384 connected:
385 		dprintk("RPC:       %s: %s: %pI4:%u (ep 0x%p event 0x%x)\n",
386 			__func__,
387 			(event->event <= 11) ? conn[event->event] :
388 						"unknown connection error",
389 			&addr->sin_addr.s_addr,
390 			ntohs(addr->sin_port),
391 			ep, event->event);
392 		atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
393 		dprintk("RPC:       %s: %sconnected\n",
394 					__func__, connstate > 0 ? "" : "dis");
395 		ep->rep_connected = connstate;
396 		ep->rep_func(ep);
397 		wake_up_all(&ep->rep_connect_wait);
398 		break;
399 	default:
400 		dprintk("RPC:       %s: unexpected CM event %d\n",
401 			__func__, event->event);
402 		break;
403 	}
404 
405 #ifdef RPC_DEBUG
406 	if (connstate == 1) {
407 		int ird = attr.max_dest_rd_atomic;
408 		int tird = ep->rep_remote_cma.responder_resources;
409 		printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
410 			"on %s, memreg %d slots %d ird %d%s\n",
411 			&addr->sin_addr.s_addr,
412 			ntohs(addr->sin_port),
413 			ia->ri_id->device->name,
414 			ia->ri_memreg_strategy,
415 			xprt->rx_buf.rb_max_requests,
416 			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
417 	} else if (connstate < 0) {
418 		printk(KERN_INFO "rpcrdma: connection to %pI4:%u closed (%d)\n",
419 			&addr->sin_addr.s_addr,
420 			ntohs(addr->sin_port),
421 			connstate);
422 	}
423 #endif
424 
425 	return 0;
426 }
427 
428 static struct rdma_cm_id *
429 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
430 			struct rpcrdma_ia *ia, struct sockaddr *addr)
431 {
432 	struct rdma_cm_id *id;
433 	int rc;
434 
435 	init_completion(&ia->ri_done);
436 
437 	id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
438 	if (IS_ERR(id)) {
439 		rc = PTR_ERR(id);
440 		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
441 			__func__, rc);
442 		return id;
443 	}
444 
445 	ia->ri_async_rc = -ETIMEDOUT;
446 	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
447 	if (rc) {
448 		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
449 			__func__, rc);
450 		goto out;
451 	}
452 	wait_for_completion_interruptible_timeout(&ia->ri_done,
453 				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
454 	rc = ia->ri_async_rc;
455 	if (rc)
456 		goto out;
457 
458 	ia->ri_async_rc = -ETIMEDOUT;
459 	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
460 	if (rc) {
461 		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
462 			__func__, rc);
463 		goto out;
464 	}
465 	wait_for_completion_interruptible_timeout(&ia->ri_done,
466 				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
467 	rc = ia->ri_async_rc;
468 	if (rc)
469 		goto out;
470 
471 	return id;
472 
473 out:
474 	rdma_destroy_id(id);
475 	return ERR_PTR(rc);
476 }
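/*
 * Note: both resolution steps above complete asynchronously.
 * rpcrdma_conn_upcall() records the outcome in ia->ri_async_rc and
 * completes ia->ri_done; because ri_async_rc is preset to -ETIMEDOUT
 * before each step, a wait that expires without an upcall is reported
 * as a timeout.
 */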
477 
478 /*
479  * Drain any cq, prior to teardown.
480  */
481 static void
482 rpcrdma_clean_cq(struct ib_cq *cq)
483 {
484 	struct ib_wc wc;
485 	int count = 0;
486 
487 	while (1 == ib_poll_cq(cq, 1, &wc))
488 		++count;
489 
490 	if (count)
491 		dprintk("RPC:       %s: flushed %d events (last 0x%x)\n",
492 			__func__, count, wc.opcode);
493 }
494 
495 /*
496  * Exported functions.
497  */
498 
499 /*
500  * Open and initialize an Interface Adapter.
501  *  o initializes fields of struct rpcrdma_ia, including
502  *    interface and provider attributes and protection zone.
503  */
504 int
505 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
506 {
507 	int rc, mem_priv;
508 	struct ib_device_attr devattr;
509 	struct rpcrdma_ia *ia = &xprt->rx_ia;
510 
511 	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
512 	if (IS_ERR(ia->ri_id)) {
513 		rc = PTR_ERR(ia->ri_id);
514 		goto out1;
515 	}
516 
517 	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
518 	if (IS_ERR(ia->ri_pd)) {
519 		rc = PTR_ERR(ia->ri_pd);
520 		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
521 			__func__, rc);
522 		goto out2;
523 	}
524 
525 	/*
526 	 * Query the device to determine if the requested memory
527 	 * registration strategy is supported. If it isn't, set the
528 	 * strategy to a globally supported model.
529 	 */
530 	rc = ib_query_device(ia->ri_id->device, &devattr);
531 	if (rc) {
532 		dprintk("RPC:       %s: ib_query_device failed %d\n",
533 			__func__, rc);
534 		goto out2;
535 	}
536 
537 	if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
538 		ia->ri_have_dma_lkey = 1;
539 		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
540 	}
541 
542 	if (memreg == RPCRDMA_FRMR) {
543 		/* Requires both frmr reg and local dma lkey */
544 		if ((devattr.device_cap_flags &
545 		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
546 		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
547 			dprintk("RPC:       %s: FRMR registration "
548 				"not supported by HCA\n", __func__);
549 			memreg = RPCRDMA_MTHCAFMR;
550 		} else {
551 			/* Mind the ia limit on FRMR page list depth */
552 			ia->ri_max_frmr_depth = min_t(unsigned int,
553 				RPCRDMA_MAX_DATA_SEGS,
554 				devattr.max_fast_reg_page_list_len);
555 		}
556 	}
557 	if (memreg == RPCRDMA_MTHCAFMR) {
558 		if (!ia->ri_id->device->alloc_fmr) {
559 			dprintk("RPC:       %s: MTHCAFMR registration "
560 				"not supported by HCA\n", __func__);
561 #if RPCRDMA_PERSISTENT_REGISTRATION
562 			memreg = RPCRDMA_ALLPHYSICAL;
563 #else
564 			rc = -ENOMEM;
565 			goto out2;
566 #endif
567 		}
568 	}
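	/*
	 * Summary of the fallback ladder above: FRMR is preferred when the
	 * device offers fast registration plus a local DMA lkey; otherwise
	 * fall back to FMR; if the device has no alloc_fmr method, fall
	 * back to persistent all-physical registration when that is
	 * compiled in, or fail the open.
	 */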
569 
570 	/*
571 	 * Optionally obtain an underlying physical identity mapping in
572 	 * order to do a memory window-based bind. This base registration
573 	 * is protected from remote access - that is enabled only by binding
574 	 * for the specific bytes targeted during each RPC operation, and
575 	 * revoked after the corresponding completion, similar to a storage
576 	 * adapter.
577 	 */
578 	switch (memreg) {
579 	case RPCRDMA_FRMR:
580 		break;
581 #if RPCRDMA_PERSISTENT_REGISTRATION
582 	case RPCRDMA_ALLPHYSICAL:
583 		mem_priv = IB_ACCESS_LOCAL_WRITE |
584 				IB_ACCESS_REMOTE_WRITE |
585 				IB_ACCESS_REMOTE_READ;
586 		goto register_setup;
587 #endif
588 	case RPCRDMA_MTHCAFMR:
589 		if (ia->ri_have_dma_lkey)
590 			break;
591 		mem_priv = IB_ACCESS_LOCAL_WRITE;
592 #if RPCRDMA_PERSISTENT_REGISTRATION
593 	register_setup:
594 #endif
595 		ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
596 		if (IS_ERR(ia->ri_bind_mem)) {
597 			printk(KERN_ALERT "%s: ib_get_dma_mr for "
598 				"phys register failed with %lX\n",
599 				__func__, PTR_ERR(ia->ri_bind_mem));
600 			rc = -ENOMEM;
601 			goto out2;
602 		}
603 		break;
604 	default:
605 		printk(KERN_ERR "RPC: Unsupported memory "
606 				"registration mode: %d\n", memreg);
607 		rc = -ENOMEM;
608 		goto out2;
609 	}
610 	dprintk("RPC:       %s: memory registration strategy is %d\n",
611 		__func__, memreg);
612 
613 	/* Else will do memory reg/dereg for each chunk */
614 	ia->ri_memreg_strategy = memreg;
615 
616 	return 0;
617 out2:
618 	rdma_destroy_id(ia->ri_id);
619 	ia->ri_id = NULL;
620 out1:
621 	return rc;
622 }
623 
624 /*
625  * Clean up/close an IA.
626  *   o if event handles and PD have been initialized, free them.
627  *   o close the IA
628  */
629 void
630 rpcrdma_ia_close(struct rpcrdma_ia *ia)
631 {
632 	int rc;
633 
634 	dprintk("RPC:       %s: entering\n", __func__);
635 	if (ia->ri_bind_mem != NULL) {
636 		rc = ib_dereg_mr(ia->ri_bind_mem);
637 		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
638 			__func__, rc);
639 	}
640 	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
641 		if (ia->ri_id->qp)
642 			rdma_destroy_qp(ia->ri_id);
643 		rdma_destroy_id(ia->ri_id);
644 		ia->ri_id = NULL;
645 	}
646 	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
647 		rc = ib_dealloc_pd(ia->ri_pd);
648 		dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
649 			__func__, rc);
650 	}
651 }
652 
653 /*
654  * Create unconnected endpoint.
655  */
656 int
657 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
658 				struct rpcrdma_create_data_internal *cdata)
659 {
660 	struct ib_device_attr devattr;
661 	struct ib_cq *sendcq, *recvcq;
662 	int rc, err;
663 
664 	rc = ib_query_device(ia->ri_id->device, &devattr);
665 	if (rc) {
666 		dprintk("RPC:       %s: ib_query_device failed %d\n",
667 			__func__, rc);
668 		return rc;
669 	}
670 
671 	/* check provider's send/recv wr limits */
672 	if (cdata->max_requests > devattr.max_qp_wr)
673 		cdata->max_requests = devattr.max_qp_wr;
674 
675 	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
676 	ep->rep_attr.qp_context = ep;
677 	/* send_cq and recv_cq initialized below */
678 	ep->rep_attr.srq = NULL;
679 	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
680 	switch (ia->ri_memreg_strategy) {
681 	case RPCRDMA_FRMR: {
682 		int depth = 7;
683 
684 		/* Add room for frmr register and invalidate WRs.
685 		 * 1. FRMR reg WR for head
686 		 * 2. FRMR invalidate WR for head
687 		 * 3. N FRMR reg WRs for pagelist
688 		 * 4. N FRMR invalidate WRs for pagelist
689 		 * 5. FRMR reg WR for tail
690 		 * 6. FRMR invalidate WR for tail
691 		 * 7. The RDMA_SEND WR
692 		 */
693 
694 		/* Calculate N if the device max FRMR depth is smaller than
695 		 * RPCRDMA_MAX_DATA_SEGS.
696 		 */
697 		if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
698 			int delta = RPCRDMA_MAX_DATA_SEGS -
699 				    ia->ri_max_frmr_depth;
700 
701 			do {
702 				depth += 2; /* FRMR reg + invalidate */
703 				delta -= ia->ri_max_frmr_depth;
704 			} while (delta > 0);
705 
706 		}
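		/*
		 * Worked example with illustrative values: if
		 * RPCRDMA_MAX_DATA_SEGS were 64 and the device's maximum
		 * FRMR depth 16, delta would start at 48 and the loop above
		 * would run three times, giving depth = 7 + 3 * 2 = 13 send
		 * WRs budgeted per RPC below.
		 */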
707 		ep->rep_attr.cap.max_send_wr *= depth;
708 		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
709 			cdata->max_requests = devattr.max_qp_wr / depth;
710 			if (!cdata->max_requests)
711 				return -EINVAL;
712 			ep->rep_attr.cap.max_send_wr = cdata->max_requests *
713 						       depth;
714 		}
715 		break;
716 	}
717 	default:
718 		break;
719 	}
720 	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
721 	ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
722 	ep->rep_attr.cap.max_recv_sge = 1;
723 	ep->rep_attr.cap.max_inline_data = 0;
724 	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
725 	ep->rep_attr.qp_type = IB_QPT_RC;
726 	ep->rep_attr.port_num = ~0;
727 
728 	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
729 		"iovs: send %d recv %d\n",
730 		__func__,
731 		ep->rep_attr.cap.max_send_wr,
732 		ep->rep_attr.cap.max_recv_wr,
733 		ep->rep_attr.cap.max_send_sge,
734 		ep->rep_attr.cap.max_recv_sge);
735 
736 	/* set trigger for requesting send completion */
737 	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
738 	if (ep->rep_cqinit <= 2)
739 		ep->rep_cqinit = 0;
740 	INIT_CQCOUNT(ep);
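	/*
	 * Illustrative arithmetic: with a max_send_wr of 128 (actual values
	 * depend on mount parameters and device limits), rep_cqinit is 63,
	 * so rpcrdma_ep_post() requests a signaled send only about once in
	 * every 64 posts -- enough, per the comment at
	 * rpcrdma_sendcq_upcall(), to keep the provider's completion queue
	 * from wrapping.
	 */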
741 	ep->rep_ia = ia;
742 	init_waitqueue_head(&ep->rep_connect_wait);
743 	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
744 
745 	sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
746 				  rpcrdma_cq_async_error_upcall, ep,
747 				  ep->rep_attr.cap.max_send_wr + 1, 0);
748 	if (IS_ERR(sendcq)) {
749 		rc = PTR_ERR(sendcq);
750 		dprintk("RPC:       %s: failed to create send CQ: %i\n",
751 			__func__, rc);
752 		goto out1;
753 	}
754 
755 	rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
756 	if (rc) {
757 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
758 			__func__, rc);
759 		goto out2;
760 	}
761 
762 	recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
763 				  rpcrdma_cq_async_error_upcall, ep,
764 				  ep->rep_attr.cap.max_recv_wr + 1, 0);
765 	if (IS_ERR(recvcq)) {
766 		rc = PTR_ERR(recvcq);
767 		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
768 			__func__, rc);
769 		goto out2;
770 	}
771 
772 	rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
773 	if (rc) {
774 		dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
775 			__func__, rc);
776 		ib_destroy_cq(recvcq);
777 		goto out2;
778 	}
779 
780 	ep->rep_attr.send_cq = sendcq;
781 	ep->rep_attr.recv_cq = recvcq;
782 
783 	/* Initialize cma parameters */
784 
785 	/* RPC/RDMA does not use private data */
786 	ep->rep_remote_cma.private_data = NULL;
787 	ep->rep_remote_cma.private_data_len = 0;
788 
789 	/* Client offers RDMA Read but does not initiate */
790 	ep->rep_remote_cma.initiator_depth = 0;
791 	if (devattr.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
792 		ep->rep_remote_cma.responder_resources = 32;
793 	else
794 		ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
795 
796 	ep->rep_remote_cma.retry_count = 7;
797 	ep->rep_remote_cma.flow_control = 0;
798 	ep->rep_remote_cma.rnr_retry_count = 0;
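	/*
	 * Note: initiator_depth == 0 advertises that this client never
	 * initiates RDMA Read requests, while responder_resources is the
	 * number of incoming (server-initiated) RDMA Reads the client will
	 * service concurrently -- bounded by the device's max_qp_rd_atom
	 * and capped at 32 as noted above.
	 */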
799 
800 	return 0;
801 
802 out2:
803 	err = ib_destroy_cq(sendcq);
804 	if (err)
805 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
806 			__func__, err);
807 out1:
808 	return rc;
809 }
810 
811 /*
812  * rpcrdma_ep_destroy
813  *
814  * Disconnect and destroy endpoint. After this, the only
815  * valid operations on the ep are to free it (if dynamically
816  * allocated) or re-create it.
817  */
818 void
819 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
820 {
821 	int rc;
822 
823 	dprintk("RPC:       %s: entering, connected is %d\n",
824 		__func__, ep->rep_connected);
825 
826 	cancel_delayed_work_sync(&ep->rep_connect_worker);
827 
828 	if (ia->ri_id->qp) {
829 		rc = rpcrdma_ep_disconnect(ep, ia);
830 		if (rc)
831 			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
832 				" returned %i\n", __func__, rc);
833 		rdma_destroy_qp(ia->ri_id);
834 		ia->ri_id->qp = NULL;
835 	}
836 
837 	/* padding - could be done in rpcrdma_buffer_destroy... */
838 	if (ep->rep_pad_mr) {
839 		rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
840 		ep->rep_pad_mr = NULL;
841 	}
842 
843 	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
844 	rc = ib_destroy_cq(ep->rep_attr.recv_cq);
845 	if (rc)
846 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
847 			__func__, rc);
848 
849 	rpcrdma_clean_cq(ep->rep_attr.send_cq);
850 	rc = ib_destroy_cq(ep->rep_attr.send_cq);
851 	if (rc)
852 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
853 			__func__, rc);
854 }
855 
856 /*
857  * Connect unconnected endpoint.
858  */
859 int
860 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
861 {
862 	struct rdma_cm_id *id;
863 	int rc = 0;
864 	int retry_count = 0;
865 
866 	if (ep->rep_connected != 0) {
867 		struct rpcrdma_xprt *xprt;
868 retry:
869 		dprintk("RPC:       %s: reconnecting...\n", __func__);
870 		rc = rpcrdma_ep_disconnect(ep, ia);
871 		if (rc && rc != -ENOTCONN)
872 			dprintk("RPC:       %s: rpcrdma_ep_disconnect"
873 				" status %i\n", __func__, rc);
874 
875 		rpcrdma_clean_cq(ep->rep_attr.recv_cq);
876 		rpcrdma_clean_cq(ep->rep_attr.send_cq);
877 
878 		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
879 		id = rpcrdma_create_id(xprt, ia,
880 				(struct sockaddr *)&xprt->rx_data.addr);
881 		if (IS_ERR(id)) {
882 			rc = -EHOSTUNREACH;
883 			goto out;
884 		}
885 		/* TEMP TEMP TEMP - fail if new device:
886 		 * Deregister/remarshal *all* requests!
887 		 * Close and recreate adapter, pd, etc!
888 		 * Re-determine all attributes still sane!
889 		 * More stuff I haven't thought of!
890 		 * Rrrgh!
891 		 */
892 		if (ia->ri_id->device != id->device) {
893 			printk("RPC:       %s: can't reconnect on "
894 				"different device!\n", __func__);
895 			rdma_destroy_id(id);
896 			rc = -ENETUNREACH;
897 			goto out;
898 		}
899 		/* END TEMP */
900 		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
901 		if (rc) {
902 			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
903 				__func__, rc);
904 			rdma_destroy_id(id);
905 			rc = -ENETUNREACH;
906 			goto out;
907 		}
908 		rdma_destroy_qp(ia->ri_id);
909 		rdma_destroy_id(ia->ri_id);
910 		ia->ri_id = id;
911 	} else {
912 		dprintk("RPC:       %s: connecting...\n", __func__);
913 		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
914 		if (rc) {
915 			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
916 				__func__, rc);
917 			/* do not update ep->rep_connected */
918 			return -ENETUNREACH;
919 		}
920 	}
921 
922 	ep->rep_connected = 0;
923 
924 	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
925 	if (rc) {
926 		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
927 				__func__, rc);
928 		goto out;
929 	}
930 
931 	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
932 
933 	/*
934 	 * Check state. A non-peer reject indicates no listener
935 	 * (ECONNREFUSED), which may be a transient state. All
936 	 * others indicate a transport condition for which a best-effort
937 	 * attempt has already been made.
938 	 */
939 	if (ep->rep_connected == -ECONNREFUSED &&
940 	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
941 		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
942 		goto retry;
943 	}
944 	if (ep->rep_connected <= 0) {
945 		/* Sometimes, the only way to reliably connect to remote
946 		 * CMs is to use same nonzero values for ORD and IRD. */
947 		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
948 		    (ep->rep_remote_cma.responder_resources == 0 ||
949 		     ep->rep_remote_cma.initiator_depth !=
950 				ep->rep_remote_cma.responder_resources)) {
951 			if (ep->rep_remote_cma.responder_resources == 0)
952 				ep->rep_remote_cma.responder_resources = 1;
953 			ep->rep_remote_cma.initiator_depth =
954 				ep->rep_remote_cma.responder_resources;
955 			goto retry;
956 		}
957 		rc = ep->rep_connected;
958 	} else {
959 		dprintk("RPC:       %s: connected\n", __func__);
960 	}
961 
962 out:
963 	if (rc)
964 		ep->rep_connected = rc;
965 	return rc;
966 }
967 
968 /*
969  * rpcrdma_ep_disconnect
970  *
971  * This is separate from destroy to facilitate the ability
972  * to reconnect without recreating the endpoint.
973  *
974  * This call is not reentrant, and must not be made in parallel
975  * on the same endpoint.
976  */
977 int
978 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
979 {
980 	int rc;
981 
982 	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
983 	rpcrdma_clean_cq(ep->rep_attr.send_cq);
984 	rc = rdma_disconnect(ia->ri_id);
985 	if (!rc) {
986 		/* returns without wait if not connected */
987 		wait_event_interruptible(ep->rep_connect_wait,
988 							ep->rep_connected != 1);
989 		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
990 			(ep->rep_connected == 1) ? "still " : "dis");
991 	} else {
992 		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
993 		ep->rep_connected = rc;
994 	}
995 	return rc;
996 }
997 
998 /*
999  * Initialize buffer memory
1000  */
1001 int
1002 rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
1003 	struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
1004 {
1005 	char *p;
1006 	size_t len, rlen, wlen;
1007 	int i, rc;
1008 	struct rpcrdma_mw *r;
1009 
1010 	buf->rb_max_requests = cdata->max_requests;
1011 	spin_lock_init(&buf->rb_lock);
1012 	atomic_set(&buf->rb_credits, 1);
1013 
1014 	/* Need to allocate:
1015 	 *   1.  arrays for send and recv pointers
1016 	 *   2.  arrays of struct rpcrdma_req to fill in pointers
1017 	 *   3.  array of struct rpcrdma_rep for replies
1018 	 *   4.  padding, if any
1019 	 *   5.  mw's, fmr's or frmr's, if any
1020 	 * Send/recv buffers in req/rep need to be registered
1021 	 */
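	/*
	 * A sketch of how the single rb_pool allocation is carved up below
	 * (the mw array size depends on the registration strategy):
	 *
	 *   [ req ptr array | rep ptr array | pad buffer | mw array ]
	 */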
1022 
1023 	len = buf->rb_max_requests *
1024 		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
1025 	len += cdata->padding;
1026 	switch (ia->ri_memreg_strategy) {
1027 	case RPCRDMA_FRMR:
1028 		len += buf->rb_max_requests * RPCRDMA_MAX_SEGS *
1029 				sizeof(struct rpcrdma_mw);
1030 		break;
1031 	case RPCRDMA_MTHCAFMR:
1032 		/* TBD we are perhaps overallocating here */
1033 		len += (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS *
1034 				sizeof(struct rpcrdma_mw);
1035 		break;
1036 	default:
1037 		break;
1038 	}
1039 
1040 	/* allocate 1, 4 and 5 in one shot */
1041 	p = kzalloc(len, GFP_KERNEL);
1042 	if (p == NULL) {
1043 		dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
1044 			__func__, len);
1045 		rc = -ENOMEM;
1046 		goto out;
1047 	}
1048 	buf->rb_pool = p;	/* for freeing it later */
1049 
1050 	buf->rb_send_bufs = (struct rpcrdma_req **) p;
1051 	p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
1052 	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
1053 	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
1054 
1055 	/*
1056 	 * Register the zeroed pad buffer, if any.
1057 	 */
1058 	if (cdata->padding) {
1059 		rc = rpcrdma_register_internal(ia, p, cdata->padding,
1060 					    &ep->rep_pad_mr, &ep->rep_pad);
1061 		if (rc)
1062 			goto out;
1063 	}
1064 	p += cdata->padding;
1065 
1066 	INIT_LIST_HEAD(&buf->rb_mws);
1067 	r = (struct rpcrdma_mw *)p;
1068 	switch (ia->ri_memreg_strategy) {
1069 	case RPCRDMA_FRMR:
1070 		for (i = buf->rb_max_requests * RPCRDMA_MAX_SEGS; i; i--) {
1071 			r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(ia->ri_pd,
1072 						ia->ri_max_frmr_depth);
1073 			if (IS_ERR(r->r.frmr.fr_mr)) {
1074 				rc = PTR_ERR(r->r.frmr.fr_mr);
1075 				dprintk("RPC:       %s: ib_alloc_fast_reg_mr"
1076 					" failed %i\n", __func__, rc);
1077 				goto out;
1078 			}
1079 			r->r.frmr.fr_pgl = ib_alloc_fast_reg_page_list(
1080 						ia->ri_id->device,
1081 						ia->ri_max_frmr_depth);
1082 			if (IS_ERR(r->r.frmr.fr_pgl)) {
1083 				rc = PTR_ERR(r->r.frmr.fr_pgl);
1084 				dprintk("RPC:       %s: "
1085 					"ib_alloc_fast_reg_page_list "
1086 					"failed %i\n", __func__, rc);
1087 
1088 				ib_dereg_mr(r->r.frmr.fr_mr);
1089 				goto out;
1090 			}
1091 			list_add(&r->mw_list, &buf->rb_mws);
1092 			++r;
1093 		}
1094 		break;
1095 	case RPCRDMA_MTHCAFMR:
1096 		/* TBD we are perhaps overallocating here */
1097 		for (i = (buf->rb_max_requests+1) * RPCRDMA_MAX_SEGS; i; i--) {
1098 			static struct ib_fmr_attr fa =
1099 				{ RPCRDMA_MAX_DATA_SEGS, 1, PAGE_SHIFT };
1100 			r->r.fmr = ib_alloc_fmr(ia->ri_pd,
1101 				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ,
1102 				&fa);
1103 			if (IS_ERR(r->r.fmr)) {
1104 				rc = PTR_ERR(r->r.fmr);
1105 				dprintk("RPC:       %s: ib_alloc_fmr"
1106 					" failed %i\n", __func__, rc);
1107 				goto out;
1108 			}
1109 			list_add(&r->mw_list, &buf->rb_mws);
1110 			++r;
1111 		}
1112 		break;
1113 	default:
1114 		break;
1115 	}
1116 
1117 	/*
1118 	 * Allocate/init the request/reply buffers. Doing this
1119 	 * using kmalloc for now -- one for each buf.
1120 	 */
1121 	wlen = 1 << fls(cdata->inline_wsize + sizeof(struct rpcrdma_req));
1122 	rlen = 1 << fls(cdata->inline_rsize + sizeof(struct rpcrdma_rep));
1123 	dprintk("RPC:       %s: wlen = %zu, rlen = %zu\n",
1124 		__func__, wlen, rlen);
1125 
1126 	for (i = 0; i < buf->rb_max_requests; i++) {
1127 		struct rpcrdma_req *req;
1128 		struct rpcrdma_rep *rep;
1129 
1130 		req = kmalloc(wlen, GFP_KERNEL);
1131 		if (req == NULL) {
1132 			dprintk("RPC:       %s: request buffer %d alloc"
1133 				" failed\n", __func__, i);
1134 			rc = -ENOMEM;
1135 			goto out;
1136 		}
1137 		memset(req, 0, sizeof(struct rpcrdma_req));
1138 		buf->rb_send_bufs[i] = req;
1139 		buf->rb_send_bufs[i]->rl_buffer = buf;
1140 
1141 		rc = rpcrdma_register_internal(ia, req->rl_base,
1142 				wlen - offsetof(struct rpcrdma_req, rl_base),
1143 				&buf->rb_send_bufs[i]->rl_handle,
1144 				&buf->rb_send_bufs[i]->rl_iov);
1145 		if (rc)
1146 			goto out;
1147 
1148 		buf->rb_send_bufs[i]->rl_size = wlen -
1149 						sizeof(struct rpcrdma_req);
1150 
1151 		rep = kmalloc(rlen, GFP_KERNEL);
1152 		if (rep == NULL) {
1153 			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
1154 				__func__, i);
1155 			rc = -ENOMEM;
1156 			goto out;
1157 		}
1158 		memset(rep, 0, sizeof(struct rpcrdma_rep));
1159 		buf->rb_recv_bufs[i] = rep;
1160 		buf->rb_recv_bufs[i]->rr_buffer = buf;
1161 
1162 		rc = rpcrdma_register_internal(ia, rep->rr_base,
1163 				rlen - offsetof(struct rpcrdma_rep, rr_base),
1164 				&buf->rb_recv_bufs[i]->rr_handle,
1165 				&buf->rb_recv_bufs[i]->rr_iov);
1166 		if (rc)
1167 			goto out;
1168 
1169 	}
1170 	dprintk("RPC:       %s: max_requests %d\n",
1171 		__func__, buf->rb_max_requests);
1172 	/* done */
1173 	return 0;
1174 out:
1175 	rpcrdma_buffer_destroy(buf);
1176 	return rc;
1177 }
1178 
1179 /*
1180  * Unregister and destroy buffer memory. Need to deal with
1181  * partial initialization, so it's callable from failed create.
1182  * Must be called before destroying endpoint, as registrations
1183  * reference it.
1184  */
1185 void
1186 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1187 {
1188 	int rc, i;
1189 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1190 	struct rpcrdma_mw *r;
1191 
1192 	/* clean up in reverse order from create
1193 	 *   1.  recv mr memory (mr free, then kfree)
1194 	 *   2.  send mr memory (mr free, then kfree)
1195 	 *   3.  padding (if any) [moved to rpcrdma_ep_destroy]
1196 	 *   4.  arrays
1197 	 */
1198 	dprintk("RPC:       %s: entering\n", __func__);
1199 
1200 	for (i = 0; i < buf->rb_max_requests; i++) {
1201 		if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
1202 			rpcrdma_deregister_internal(ia,
1203 					buf->rb_recv_bufs[i]->rr_handle,
1204 					&buf->rb_recv_bufs[i]->rr_iov);
1205 			kfree(buf->rb_recv_bufs[i]);
1206 		}
1207 		if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
1208 			rpcrdma_deregister_internal(ia,
1209 					buf->rb_send_bufs[i]->rl_handle,
1210 					&buf->rb_send_bufs[i]->rl_iov);
1211 			kfree(buf->rb_send_bufs[i]);
1212 		}
1213 	}
1214 
1215 	while (!list_empty(&buf->rb_mws)) {
1216 		r = list_entry(buf->rb_mws.next,
1217 			struct rpcrdma_mw, mw_list);
1218 		list_del(&r->mw_list);
1219 		switch (ia->ri_memreg_strategy) {
1220 		case RPCRDMA_FRMR:
1221 			rc = ib_dereg_mr(r->r.frmr.fr_mr);
1222 			if (rc)
1223 				dprintk("RPC:       %s:"
1224 					" ib_dereg_mr"
1225 					" failed %i\n",
1226 					__func__, rc);
1227 			ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
1228 			break;
1229 		case RPCRDMA_MTHCAFMR:
1230 			rc = ib_dealloc_fmr(r->r.fmr);
1231 			if (rc)
1232 				dprintk("RPC:       %s:"
1233 					" ib_dealloc_fmr"
1234 					" failed %i\n",
1235 					__func__, rc);
1236 			break;
1237 		default:
1238 			break;
1239 		}
1240 	}
1241 
1242 	kfree(buf->rb_pool);
1243 }
1244 
1245 /*
1246  * Get a set of request/reply buffers.
1247  *
1248  * Reply buffer (if needed) is attached to send buffer upon return.
1249  * Rule:
1250  *    rb_send_index and rb_recv_index MUST always point to the
1251  *    *next* available buffer (non-NULL). They are incremented after
1252  *    removing buffers, and decremented *before* returning them.
1253  */
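/*
 * Example of the rule above: with rb_max_requests == 32, the indexes run
 * 0..31 while buffers remain; rb_send_index == 32 means every request
 * buffer is out, and rpcrdma_buffer_get() returns NULL.
 */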
1254 struct rpcrdma_req *
1255 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1256 {
1257 	struct rpcrdma_req *req;
1258 	unsigned long flags;
1259 	int i;
1260 	struct rpcrdma_mw *r;
1261 
1262 	spin_lock_irqsave(&buffers->rb_lock, flags);
1263 	if (buffers->rb_send_index == buffers->rb_max_requests) {
1264 		spin_unlock_irqrestore(&buffers->rb_lock, flags);
1265 		dprintk("RPC:       %s: out of request buffers\n", __func__);
1266 		return ((struct rpcrdma_req *)NULL);
1267 	}
1268 
1269 	req = buffers->rb_send_bufs[buffers->rb_send_index];
1270 	if (buffers->rb_send_index < buffers->rb_recv_index) {
1271 		dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
1272 			__func__,
1273 			buffers->rb_recv_index - buffers->rb_send_index);
1274 		req->rl_reply = NULL;
1275 	} else {
1276 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1277 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1278 	}
1279 	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
1280 	if (!list_empty(&buffers->rb_mws)) {
1281 		i = RPCRDMA_MAX_SEGS - 1;
1282 		do {
1283 			r = list_entry(buffers->rb_mws.next,
1284 					struct rpcrdma_mw, mw_list);
1285 			list_del(&r->mw_list);
1286 			req->rl_segments[i].mr_chunk.rl_mw = r;
1287 		} while (--i >= 0);
1288 	}
1289 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1290 	return req;
1291 }
1292 
1293 /*
1294  * Put request/reply buffers back into pool.
1295  * Pre-decrement counter/array index.
1296  */
1297 void
1298 rpcrdma_buffer_put(struct rpcrdma_req *req)
1299 {
1300 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1301 	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
1302 	int i;
1303 	unsigned long flags;
1304 
1305 	spin_lock_irqsave(&buffers->rb_lock, flags);
1306 	buffers->rb_send_bufs[--buffers->rb_send_index] = req;
1307 	req->rl_niovs = 0;
1308 	if (req->rl_reply) {
1309 		buffers->rb_recv_bufs[--buffers->rb_recv_index] = req->rl_reply;
1310 		req->rl_reply->rr_func = NULL;
1311 		req->rl_reply = NULL;
1312 	}
1313 	switch (ia->ri_memreg_strategy) {
1314 	case RPCRDMA_FRMR:
1315 	case RPCRDMA_MTHCAFMR:
1316 		/*
1317 		 * Cycle mw's back in reverse order, and "spin" them.
1318 		 * This delays and scrambles reuse as much as possible.
1319 		 */
1320 		i = 1;
1321 		do {
1322 			struct rpcrdma_mw **mw;
1323 			mw = &req->rl_segments[i].mr_chunk.rl_mw;
1324 			list_add_tail(&(*mw)->mw_list, &buffers->rb_mws);
1325 			*mw = NULL;
1326 		} while (++i < RPCRDMA_MAX_SEGS);
1327 		list_add_tail(&req->rl_segments[0].mr_chunk.rl_mw->mw_list,
1328 					&buffers->rb_mws);
1329 		req->rl_segments[0].mr_chunk.rl_mw = NULL;
1330 		break;
1331 	default:
1332 		break;
1333 	}
1334 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1335 }
1336 
1337 /*
1338  * Recover reply buffers from pool.
1339  * This happens when recovering from error conditions.
1340  * Post-increment counter/array index.
1341  */
1342 void
1343 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1344 {
1345 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1346 	unsigned long flags;
1347 
1348 	if (req->rl_iov.length == 0)	/* special case xprt_rdma_allocate() */
1349 		buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
1350 	spin_lock_irqsave(&buffers->rb_lock, flags);
1351 	if (buffers->rb_recv_index < buffers->rb_max_requests) {
1352 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
1353 		buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
1354 	}
1355 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1356 }
1357 
1358 /*
1359  * Put reply buffers back into pool when not attached to
1360  * request. This happens in error conditions.
1361  */
1362 void
1363 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1364 {
1365 	struct rpcrdma_buffer *buffers = rep->rr_buffer;
1366 	unsigned long flags;
1367 
1368 	rep->rr_func = NULL;
1369 	spin_lock_irqsave(&buffers->rb_lock, flags);
1370 	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
1371 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
1372 }
1373 
1374 /*
1375  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1376  */
1377 
1378 int
1379 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
1380 				struct ib_mr **mrp, struct ib_sge *iov)
1381 {
1382 	struct ib_phys_buf ipb;
1383 	struct ib_mr *mr;
1384 	int rc;
1385 
1386 	/*
1387 	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
1388 	 */
1389 	iov->addr = ib_dma_map_single(ia->ri_id->device,
1390 			va, len, DMA_BIDIRECTIONAL);
1391 	iov->length = len;
1392 
1393 	if (ia->ri_have_dma_lkey) {
1394 		*mrp = NULL;
1395 		iov->lkey = ia->ri_dma_lkey;
1396 		return 0;
1397 	} else if (ia->ri_bind_mem != NULL) {
1398 		*mrp = NULL;
1399 		iov->lkey = ia->ri_bind_mem->lkey;
1400 		return 0;
1401 	}
1402 
1403 	ipb.addr = iov->addr;
1404 	ipb.size = iov->length;
1405 	mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
1406 			IB_ACCESS_LOCAL_WRITE, &iov->addr);
1407 
1408 	dprintk("RPC:       %s: phys convert: 0x%llx "
1409 			"registered 0x%llx length %d\n",
1410 			__func__, (unsigned long long)ipb.addr,
1411 			(unsigned long long)iov->addr, len);
1412 
1413 	if (IS_ERR(mr)) {
1414 		*mrp = NULL;
1415 		rc = PTR_ERR(mr);
1416 		dprintk("RPC:       %s: failed with %i\n", __func__, rc);
1417 	} else {
1418 		*mrp = mr;
1419 		iov->lkey = mr->lkey;
1420 		rc = 0;
1421 	}
1422 
1423 	return rc;
1424 }
1425 
1426 int
1427 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
1428 				struct ib_mr *mr, struct ib_sge *iov)
1429 {
1430 	int rc;
1431 
1432 	ib_dma_unmap_single(ia->ri_id->device,
1433 			iov->addr, iov->length, DMA_BIDIRECTIONAL);
1434 
1435 	if (NULL == mr)
1436 		return 0;
1437 
1438 	rc = ib_dereg_mr(mr);
1439 	if (rc)
1440 		dprintk("RPC:       %s: ib_dereg_mr failed %i\n", __func__, rc);
1441 	return rc;
1442 }
1443 
1444 /*
1445  * Wrappers for chunk registration, shared by read/write chunk code.
1446  */
1447 
1448 static void
1449 rpcrdma_map_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg, int writing)
1450 {
1451 	seg->mr_dir = writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
1452 	seg->mr_dmalen = seg->mr_len;
1453 	if (seg->mr_page)
1454 		seg->mr_dma = ib_dma_map_page(ia->ri_id->device,
1455 				seg->mr_page, offset_in_page(seg->mr_offset),
1456 				seg->mr_dmalen, seg->mr_dir);
1457 	else
1458 		seg->mr_dma = ib_dma_map_single(ia->ri_id->device,
1459 				seg->mr_offset,
1460 				seg->mr_dmalen, seg->mr_dir);
1461 	if (ib_dma_mapping_error(ia->ri_id->device, seg->mr_dma)) {
1462 		dprintk("RPC:       %s: mr_dma %llx mr_offset %p mr_dma_len %zu\n",
1463 			__func__,
1464 			(unsigned long long)seg->mr_dma,
1465 			seg->mr_offset, seg->mr_dmalen);
1466 	}
1467 }
1468 
1469 static void
1470 rpcrdma_unmap_one(struct rpcrdma_ia *ia, struct rpcrdma_mr_seg *seg)
1471 {
1472 	if (seg->mr_page)
1473 		ib_dma_unmap_page(ia->ri_id->device,
1474 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1475 	else
1476 		ib_dma_unmap_single(ia->ri_id->device,
1477 				seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
1478 }
1479 
1480 static int
1481 rpcrdma_register_frmr_external(struct rpcrdma_mr_seg *seg,
1482 			int *nsegs, int writing, struct rpcrdma_ia *ia,
1483 			struct rpcrdma_xprt *r_xprt)
1484 {
1485 	struct rpcrdma_mr_seg *seg1 = seg;
1486 	struct ib_send_wr invalidate_wr, frmr_wr, *bad_wr, *post_wr;
1487 
1488 	u8 key;
1489 	int len, pageoff;
1490 	int i, rc;
1491 	int seg_len;
1492 	u64 pa;
1493 	int page_no;
1494 
1495 	pageoff = offset_in_page(seg1->mr_offset);
1496 	seg1->mr_offset -= pageoff;	/* start of page */
1497 	seg1->mr_len += pageoff;
1498 	len = -pageoff;
1499 	if (*nsegs > ia->ri_max_frmr_depth)
1500 		*nsegs = ia->ri_max_frmr_depth;
1501 	for (page_no = i = 0; i < *nsegs;) {
1502 		rpcrdma_map_one(ia, seg, writing);
1503 		pa = seg->mr_dma;
1504 		for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
1505 			seg1->mr_chunk.rl_mw->r.frmr.fr_pgl->
1506 				page_list[page_no++] = pa;
1507 			pa += PAGE_SIZE;
1508 		}
1509 		len += seg->mr_len;
1510 		++seg;
1511 		++i;
1512 		/* Check for holes */
1513 		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1514 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1515 			break;
1516 	}
1517 	dprintk("RPC:       %s: Using frmr %p to map %d segments\n",
1518 		__func__, seg1->mr_chunk.rl_mw, i);
1519 
1520 	if (unlikely(seg1->mr_chunk.rl_mw->r.frmr.state == FRMR_IS_VALID)) {
1521 		dprintk("RPC:       %s: frmr %x left valid, posting invalidate.\n",
1522 			__func__,
1523 			seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey);
1524 		/* Invalidate before using. */
1525 		memset(&invalidate_wr, 0, sizeof invalidate_wr);
1526 		invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1527 		invalidate_wr.next = &frmr_wr;
1528 		invalidate_wr.opcode = IB_WR_LOCAL_INV;
1529 		invalidate_wr.send_flags = IB_SEND_SIGNALED;
1530 		invalidate_wr.ex.invalidate_rkey =
1531 			seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1532 		DECR_CQCOUNT(&r_xprt->rx_ep);
1533 		post_wr = &invalidate_wr;
1534 	} else
1535 		post_wr = &frmr_wr;
1536 
1537 	/* Prepare FRMR WR */
1538 	memset(&frmr_wr, 0, sizeof frmr_wr);
1539 	frmr_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1540 	frmr_wr.opcode = IB_WR_FAST_REG_MR;
1541 	frmr_wr.send_flags = IB_SEND_SIGNALED;
1542 	frmr_wr.wr.fast_reg.iova_start = seg1->mr_dma;
1543 	frmr_wr.wr.fast_reg.page_list = seg1->mr_chunk.rl_mw->r.frmr.fr_pgl;
1544 	frmr_wr.wr.fast_reg.page_list_len = page_no;
1545 	frmr_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
1546 	frmr_wr.wr.fast_reg.length = page_no << PAGE_SHIFT;
1547 	if (frmr_wr.wr.fast_reg.length < len) {
1548 		while (seg1->mr_nsegs--)
1549 			rpcrdma_unmap_one(ia, seg++);
1550 		return -EIO;
1551 	}
1552 
1553 	/* Bump the key */
1554 	key = (u8)(seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey & 0x000000FF);
1555 	ib_update_fast_reg_key(seg1->mr_chunk.rl_mw->r.frmr.fr_mr, ++key);
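	/*
	 * The low-order byte of the rkey acts as a generation count:
	 * bumping it on every re-registration means a peer still holding
	 * the rkey from an earlier registration of this MR gets a
	 * protection error instead of silent access to the new mapping.
	 */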
1556 
1557 	frmr_wr.wr.fast_reg.access_flags = (writing ?
1558 				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
1559 				IB_ACCESS_REMOTE_READ);
1560 	frmr_wr.wr.fast_reg.rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1561 	DECR_CQCOUNT(&r_xprt->rx_ep);
1562 
1563 	rc = ib_post_send(ia->ri_id->qp, post_wr, &bad_wr);
1564 
1565 	if (rc) {
1566 		dprintk("RPC:       %s: failed ib_post_send for register,"
1567 			" status %i\n", __func__, rc);
1568 		while (i--)
1569 			rpcrdma_unmap_one(ia, --seg);
1570 	} else {
1571 		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1572 		seg1->mr_base = seg1->mr_dma + pageoff;
1573 		seg1->mr_nsegs = i;
1574 		seg1->mr_len = len;
1575 	}
1576 	*nsegs = i;
1577 	return rc;
1578 }
1579 
1580 static int
1581 rpcrdma_deregister_frmr_external(struct rpcrdma_mr_seg *seg,
1582 			struct rpcrdma_ia *ia, struct rpcrdma_xprt *r_xprt)
1583 {
1584 	struct rpcrdma_mr_seg *seg1 = seg;
1585 	struct ib_send_wr invalidate_wr, *bad_wr;
1586 	int rc;
1587 
1588 	while (seg1->mr_nsegs--)
1589 		rpcrdma_unmap_one(ia, seg++);
1590 
1591 	memset(&invalidate_wr, 0, sizeof invalidate_wr);
1592 	invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
1593 	invalidate_wr.opcode = IB_WR_LOCAL_INV;
1594 	invalidate_wr.send_flags = IB_SEND_SIGNALED;
1595 	invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
1596 	DECR_CQCOUNT(&r_xprt->rx_ep);
1597 
1598 	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
1599 	if (rc)
1600 		dprintk("RPC:       %s: failed ib_post_send for invalidate,"
1601 			" status %i\n", __func__, rc);
1602 	return rc;
1603 }
1604 
1605 static int
1606 rpcrdma_register_fmr_external(struct rpcrdma_mr_seg *seg,
1607 			int *nsegs, int writing, struct rpcrdma_ia *ia)
1608 {
1609 	struct rpcrdma_mr_seg *seg1 = seg;
1610 	u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
1611 	int len, pageoff, i, rc;
1612 
1613 	pageoff = offset_in_page(seg1->mr_offset);
1614 	seg1->mr_offset -= pageoff;	/* start of page */
1615 	seg1->mr_len += pageoff;
1616 	len = -pageoff;
1617 	if (*nsegs > RPCRDMA_MAX_DATA_SEGS)
1618 		*nsegs = RPCRDMA_MAX_DATA_SEGS;
1619 	for (i = 0; i < *nsegs;) {
1620 		rpcrdma_map_one(ia, seg, writing);
1621 		physaddrs[i] = seg->mr_dma;
1622 		len += seg->mr_len;
1623 		++seg;
1624 		++i;
1625 		/* Check for holes */
1626 		if ((i < *nsegs && offset_in_page(seg->mr_offset)) ||
1627 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
1628 			break;
1629 	}
1630 	rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
1631 				physaddrs, i, seg1->mr_dma);
1632 	if (rc) {
1633 		dprintk("RPC:       %s: failed ib_map_phys_fmr "
1634 			"%u@0x%llx+%i (%d)... status %i\n", __func__,
1635 			len, (unsigned long long)seg1->mr_dma,
1636 			pageoff, i, rc);
1637 		while (i--)
1638 			rpcrdma_unmap_one(ia, --seg);
1639 	} else {
1640 		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
1641 		seg1->mr_base = seg1->mr_dma + pageoff;
1642 		seg1->mr_nsegs = i;
1643 		seg1->mr_len = len;
1644 	}
1645 	*nsegs = i;
1646 	return rc;
1647 }
1648 
1649 static int
1650 rpcrdma_deregister_fmr_external(struct rpcrdma_mr_seg *seg,
1651 			struct rpcrdma_ia *ia)
1652 {
1653 	struct rpcrdma_mr_seg *seg1 = seg;
1654 	LIST_HEAD(l);
1655 	int rc;
1656 
1657 	list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
1658 	rc = ib_unmap_fmr(&l);
1659 	while (seg1->mr_nsegs--)
1660 		rpcrdma_unmap_one(ia, seg++);
1661 	if (rc)
1662 		dprintk("RPC:       %s: failed ib_unmap_fmr,"
1663 			" status %i\n", __func__, rc);
1664 	return rc;
1665 }
1666 
1667 int
1668 rpcrdma_register_external(struct rpcrdma_mr_seg *seg,
1669 			int nsegs, int writing, struct rpcrdma_xprt *r_xprt)
1670 {
1671 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1672 	int rc = 0;
1673 
1674 	switch (ia->ri_memreg_strategy) {
1675 
1676 #if RPCRDMA_PERSISTENT_REGISTRATION
1677 	case RPCRDMA_ALLPHYSICAL:
1678 		rpcrdma_map_one(ia, seg, writing);
1679 		seg->mr_rkey = ia->ri_bind_mem->rkey;
1680 		seg->mr_base = seg->mr_dma;
1681 		seg->mr_nsegs = 1;
1682 		nsegs = 1;
1683 		break;
1684 #endif
1685 
1686 	/* Registration using frmr registration */
1687 	case RPCRDMA_FRMR:
1688 		rc = rpcrdma_register_frmr_external(seg, &nsegs, writing, ia, r_xprt);
1689 		break;
1690 
1691 	/* Registration using fmr memory registration */
1692 	case RPCRDMA_MTHCAFMR:
1693 		rc = rpcrdma_register_fmr_external(seg, &nsegs, writing, ia);
1694 		break;
1695 
1696 	default:
1697 		return -1;
1698 	}
1699 	if (rc)
1700 		return -1;
1701 
1702 	return nsegs;
1703 }
1704 
1705 int
1706 rpcrdma_deregister_external(struct rpcrdma_mr_seg *seg,
1707 		struct rpcrdma_xprt *r_xprt)
1708 {
1709 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1710 	int nsegs = seg->mr_nsegs, rc;
1711 
1712 	switch (ia->ri_memreg_strategy) {
1713 
1714 #if RPCRDMA_PERSISTENT_REGISTRATION
1715 	case RPCRDMA_ALLPHYSICAL:
1716 		rpcrdma_unmap_one(ia, seg);
1717 		break;
1718 #endif
1719 
1720 	case RPCRDMA_FRMR:
1721 		rc = rpcrdma_deregister_frmr_external(seg, ia, r_xprt);
1722 		break;
1723 
1724 	case RPCRDMA_MTHCAFMR:
1725 		rc = rpcrdma_deregister_fmr_external(seg, ia);
1726 		break;
1727 
1728 	default:
1729 		break;
1730 	}
1731 	return nsegs;
1732 }
1733 
1734 /*
1735  * Prepost any receive buffer, then post send.
1736  *
1737  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1738  */
1739 int
1740 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1741 		struct rpcrdma_ep *ep,
1742 		struct rpcrdma_req *req)
1743 {
1744 	struct ib_send_wr send_wr, *send_wr_fail;
1745 	struct rpcrdma_rep *rep = req->rl_reply;
1746 	int rc;
1747 
1748 	if (rep) {
1749 		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1750 		if (rc)
1751 			goto out;
1752 		req->rl_reply = NULL;
1753 	}
1754 
1755 	send_wr.next = NULL;
1756 	send_wr.wr_id = 0ULL;	/* no send cookie */
1757 	send_wr.sg_list = req->rl_send_iov;
1758 	send_wr.num_sge = req->rl_niovs;
1759 	send_wr.opcode = IB_WR_SEND;
1760 	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
1761 		ib_dma_sync_single_for_device(ia->ri_id->device,
1762 			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
1763 			DMA_TO_DEVICE);
1764 	ib_dma_sync_single_for_device(ia->ri_id->device,
1765 		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
1766 		DMA_TO_DEVICE);
1767 	ib_dma_sync_single_for_device(ia->ri_id->device,
1768 		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
1769 		DMA_TO_DEVICE);
1770 
1771 	if (DECR_CQCOUNT(ep) > 0)
1772 		send_wr.send_flags = 0;
1773 	else { /* Provider must take a send completion every now and then */
1774 		INIT_CQCOUNT(ep);
1775 		send_wr.send_flags = IB_SEND_SIGNALED;
1776 	}
1777 
1778 	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1779 	if (rc)
1780 		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1781 			rc);
1782 out:
1783 	return rc;
1784 }
1785 
1786 /*
1787  * (Re)post a receive buffer.
1788  */
1789 int
1790 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1791 		     struct rpcrdma_ep *ep,
1792 		     struct rpcrdma_rep *rep)
1793 {
1794 	struct ib_recv_wr recv_wr, *recv_wr_fail;
1795 	int rc;
1796 
1797 	recv_wr.next = NULL;
1798 	recv_wr.wr_id = (u64) (unsigned long) rep;
1799 	recv_wr.sg_list = &rep->rr_iov;
1800 	recv_wr.num_sge = 1;
1801 
1802 	ib_dma_sync_single_for_cpu(ia->ri_id->device,
1803 		rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
1804 
1805 	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1806 
1807 	if (rc)
1808 		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1809 			rc);
1810 	return rc;
1811 }
1812