xref: /openbmc/linux/net/sunrpc/xprtrdma/verbs.c (revision 23c2b932)
1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39 
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49 
50 #include <linux/interrupt.h>
51 #include <linux/slab.h>
52 #include <linux/prefetch.h>
53 #include <linux/sunrpc/addr.h>
54 #include <linux/bitops.h>
55 #include <linux/module.h> /* try_module_get()/module_put() */
56 
57 #include "xprt_rdma.h"
58 
59 /*
60  * Globals/Macros
61  */
62 
63 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
64 # define RPCDBG_FACILITY	RPCDBG_TRANS
65 #endif
66 
67 /*
68  * internal functions
69  */
70 
71 static struct workqueue_struct *rpcrdma_receive_wq;
72 
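/* Allocate the workqueue on which Receive completions are processed
 * (incoming RPC/RDMA replies are handled in process context).
 * Returns 0 on success or -ENOMEM.
 */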
73 int
74 rpcrdma_alloc_wq(void)
75 {
76 	struct workqueue_struct *recv_wq;
77 
78 	recv_wq = alloc_workqueue("xprtrdma_receive",
79 				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
80 				  0);
81 	if (!recv_wq)
82 		return -ENOMEM;
83 
84 	rpcrdma_receive_wq = recv_wq;
85 	return 0;
86 }
87 
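/* Tear down the Receive completion workqueue, if it was allocated.
 */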
88 void
89 rpcrdma_destroy_wq(void)
90 {
91 	struct workqueue_struct *wq;
92 
93 	if (rpcrdma_receive_wq) {
94 		wq = rpcrdma_receive_wq;
95 		rpcrdma_receive_wq = NULL;
96 		destroy_workqueue(wq);
97 	}
98 }
99 
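/* QP asynchronous event handler. Log the event; if the endpoint is
 * currently connected, mark it failed and wake up any waiters.
 */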
100 static void
101 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
102 {
103 	struct rpcrdma_ep *ep = context;
104 
105 	pr_err("RPC:       %s: %s on device %s ep %p\n",
106 	       __func__, ib_event_msg(event->event),
107 	       event->device->name, context);
108 	if (ep->rep_connected == 1) {
109 		ep->rep_connected = -EIO;
110 		rpcrdma_conn_func(ep);
111 		wake_up_all(&ep->rep_connect_wait);
112 	}
113 }
114 
115 /**
116  * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
117  * @cq:	completion queue (ignored)
118  * @wc:	completed WR
119  *
120  */
121 static void
122 rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
123 {
124 	/* WARNING: Only wr_cqe and status are reliable at this point */
125 	if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
126 		pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
127 		       ib_wc_status_msg(wc->status),
128 		       wc->status, wc->vendor_err);
129 }
130 
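/* Work item that hands a received RPC/RDMA reply to the reply handler
 * in process context.
 */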
131 static void
132 rpcrdma_receive_worker(struct work_struct *work)
133 {
134 	struct rpcrdma_rep *rep =
135 			container_of(work, struct rpcrdma_rep, rr_work);
136 
137 	rpcrdma_reply_handler(rep);
138 }
139 
140 /* Perform basic sanity checking to avoid using garbage
141  * to update the credit grant value.
142  */
143 static void
144 rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
145 {
146 	struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
147 	struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
148 	u32 credits;
149 
150 	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
151 		return;
152 
153 	credits = be32_to_cpu(rmsgp->rm_credit);
154 	if (credits == 0)
155 		credits = 1;	/* don't deadlock */
156 	else if (credits > buffer->rb_max_requests)
157 		credits = buffer->rb_max_requests;
158 
159 	atomic_set(&buffer->rb_credits, credits);
160 }
161 
162 /**
163  * rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
164  * @cq:	completion queue (ignored)
165  * @wc:	completed WR
166  *
167  */
168 static void
169 rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
170 {
171 	struct ib_cqe *cqe = wc->wr_cqe;
172 	struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
173 					       rr_cqe);
174 
175 	/* WARNING: Only wr_cqe and status are reliable at this point */
176 	if (wc->status != IB_WC_SUCCESS)
177 		goto out_fail;
178 
179 	/* status == SUCCESS means all fields in wc are trustworthy */
180 	if (wc->opcode != IB_WC_RECV)
181 		return;
182 
183 	dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
184 		__func__, rep, wc->byte_len);
185 
186 	rep->rr_len = wc->byte_len;
187 	ib_dma_sync_single_for_cpu(rep->rr_device,
188 				   rdmab_addr(rep->rr_rdmabuf),
189 				   rep->rr_len, DMA_FROM_DEVICE);
190 
191 	rpcrdma_update_granted_credits(rep);
192 
193 out_schedule:
194 	queue_work(rpcrdma_receive_wq, &rep->rr_work);
195 	return;
196 
197 out_fail:
198 	if (wc->status != IB_WC_WR_FLUSH_ERR)
199 		pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
200 		       ib_wc_status_msg(wc->status),
201 		       wc->status, wc->vendor_err);
202 	rep->rr_len = RPCRDMA_BAD_LEN;
203 	goto out_schedule;
204 }
205 
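/* Connection manager event handler. Records the results of address
 * and route resolution, tracks connection state changes, and wakes
 * any threads waiting on the endpoint's connection state.
 */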
206 static int
207 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
208 {
209 	struct rpcrdma_xprt *xprt = id->context;
210 	struct rpcrdma_ia *ia = &xprt->rx_ia;
211 	struct rpcrdma_ep *ep = &xprt->rx_ep;
212 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
213 	struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
214 #endif
215 	struct ib_qp_attr *attr = &ia->ri_qp_attr;
216 	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
217 	int connstate = 0;
218 
219 	switch (event->event) {
220 	case RDMA_CM_EVENT_ADDR_RESOLVED:
221 	case RDMA_CM_EVENT_ROUTE_RESOLVED:
222 		ia->ri_async_rc = 0;
223 		complete(&ia->ri_done);
224 		break;
225 	case RDMA_CM_EVENT_ADDR_ERROR:
226 		ia->ri_async_rc = -EHOSTUNREACH;
227 		dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
228 			__func__, ep);
229 		complete(&ia->ri_done);
230 		break;
231 	case RDMA_CM_EVENT_ROUTE_ERROR:
232 		ia->ri_async_rc = -ENETUNREACH;
233 		dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
234 			__func__, ep);
235 		complete(&ia->ri_done);
236 		break;
237 	case RDMA_CM_EVENT_ESTABLISHED:
238 		connstate = 1;
239 		ib_query_qp(ia->ri_id->qp, attr,
240 			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
241 			    iattr);
242 		dprintk("RPC:       %s: %d responder resources"
243 			" (%d initiator)\n",
244 			__func__, attr->max_dest_rd_atomic,
245 			attr->max_rd_atomic);
246 		goto connected;
247 	case RDMA_CM_EVENT_CONNECT_ERROR:
248 		connstate = -ENOTCONN;
249 		goto connected;
250 	case RDMA_CM_EVENT_UNREACHABLE:
251 		connstate = -ENETDOWN;
252 		goto connected;
253 	case RDMA_CM_EVENT_REJECTED:
254 		connstate = -ECONNREFUSED;
255 		goto connected;
256 	case RDMA_CM_EVENT_DISCONNECTED:
257 		connstate = -ECONNABORTED;
258 		goto connected;
259 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
260 		connstate = -ENODEV;
261 connected:
262 		dprintk("RPC:       %s: %sconnected\n",
263 					__func__, connstate > 0 ? "" : "dis");
264 		atomic_set(&xprt->rx_buf.rb_credits, 1);
265 		ep->rep_connected = connstate;
266 		rpcrdma_conn_func(ep);
267 		wake_up_all(&ep->rep_connect_wait);
268 		/*FALLTHROUGH*/
269 	default:
270 		dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
271 			__func__, sap, rpc_get_port(sap), ep,
272 			rdma_event_msg(event->event));
273 		break;
274 	}
275 
276 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
277 	if (connstate == 1) {
278 		int ird = attr->max_dest_rd_atomic;
279 		int tird = ep->rep_remote_cma.responder_resources;
280 
281 		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
282 			sap, rpc_get_port(sap),
283 			ia->ri_device->name,
284 			ia->ri_ops->ro_displayname,
285 			xprt->rx_buf.rb_max_requests,
286 			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
287 	} else if (connstate < 0) {
288 		pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
289 			sap, rpc_get_port(sap), connstate);
290 	}
291 #endif
292 
293 	return 0;
294 }
295 
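/* Drop the device module reference taken in rpcrdma_create_id(), then
 * destroy the connection manager ID.
 */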
296 static void rpcrdma_destroy_id(struct rdma_cm_id *id)
297 {
298 	if (id) {
299 		module_put(id->device->owner);
300 		rdma_destroy_id(id);
301 	}
302 }
303 
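/* Create a connection manager ID and synchronously resolve the server's
 * address and route. On success, the underlying device module is pinned
 * until rpcrdma_destroy_id(). Returns the new ID or an ERR_PTR.
 */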
304 static struct rdma_cm_id *
305 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
306 			struct rpcrdma_ia *ia, struct sockaddr *addr)
307 {
308 	struct rdma_cm_id *id;
309 	int rc;
310 
311 	init_completion(&ia->ri_done);
312 
313 	id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
314 			    IB_QPT_RC);
315 	if (IS_ERR(id)) {
316 		rc = PTR_ERR(id);
317 		dprintk("RPC:       %s: rdma_create_id() failed %i\n",
318 			__func__, rc);
319 		return id;
320 	}
321 
322 	ia->ri_async_rc = -ETIMEDOUT;
323 	rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
324 	if (rc) {
325 		dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
326 			__func__, rc);
327 		goto out;
328 	}
329 	wait_for_completion_interruptible_timeout(&ia->ri_done,
330 				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
331 
332 	/* FIXME:
333 	 * Until xprtrdma supports DEVICE_REMOVAL, the provider must
334 	 * be pinned while there are active NFS/RDMA mounts to prevent
335 	 * hangs and crashes at umount time.
336 	 */
337 	if (!ia->ri_async_rc && !try_module_get(id->device->owner)) {
338 		dprintk("RPC:       %s: Failed to get device module\n",
339 			__func__);
340 		ia->ri_async_rc = -ENODEV;
341 	}
342 	rc = ia->ri_async_rc;
343 	if (rc)
344 		goto out;
345 
346 	ia->ri_async_rc = -ETIMEDOUT;
347 	rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
348 	if (rc) {
349 		dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
350 			__func__, rc);
351 		goto put;
352 	}
353 	wait_for_completion_interruptible_timeout(&ia->ri_done,
354 				msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
355 	rc = ia->ri_async_rc;
356 	if (rc)
357 		goto put;
358 
359 	return id;
360 put:
361 	module_put(id->device->owner);
362 out:
363 	rdma_destroy_id(id);
364 	return ERR_PTR(rc);
365 }
366 
367 /*
368  * Exported functions.
369  */
370 
371 /*
372  * Open and initialize an Interface Adapter.
373  *  o initializes fields of struct rpcrdma_ia, including
374  *    interface and provider attributes and protection domain.
375  */
376 int
377 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
378 {
379 	struct rpcrdma_ia *ia = &xprt->rx_ia;
380 	int rc;
381 
382 	ia->ri_dma_mr = NULL;
383 
384 	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
385 	if (IS_ERR(ia->ri_id)) {
386 		rc = PTR_ERR(ia->ri_id);
387 		goto out1;
388 	}
389 	ia->ri_device = ia->ri_id->device;
390 
391 	ia->ri_pd = ib_alloc_pd(ia->ri_device);
392 	if (IS_ERR(ia->ri_pd)) {
393 		rc = PTR_ERR(ia->ri_pd);
394 		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
395 			__func__, rc);
396 		goto out2;
397 	}
398 
399 	if (memreg == RPCRDMA_FRMR) {
400 		if (!(ia->ri_device->attrs.device_cap_flags &
401 				IB_DEVICE_MEM_MGT_EXTENSIONS) ||
402 		    (ia->ri_device->attrs.max_fast_reg_page_list_len == 0)) {
403 			dprintk("RPC:       %s: FRMR registration "
404 				"not supported by HCA\n", __func__);
405 			memreg = RPCRDMA_MTHCAFMR;
406 		}
407 	}
408 	if (memreg == RPCRDMA_MTHCAFMR) {
409 		if (!ia->ri_device->alloc_fmr) {
410 			dprintk("RPC:       %s: MTHCAFMR registration "
411 				"not supported by HCA\n", __func__);
412 			rc = -EINVAL;
413 			goto out3;
414 		}
415 	}
416 
417 	switch (memreg) {
418 	case RPCRDMA_FRMR:
419 		ia->ri_ops = &rpcrdma_frwr_memreg_ops;
420 		break;
421 	case RPCRDMA_ALLPHYSICAL:
422 		ia->ri_ops = &rpcrdma_physical_memreg_ops;
423 		break;
424 	case RPCRDMA_MTHCAFMR:
425 		ia->ri_ops = &rpcrdma_fmr_memreg_ops;
426 		break;
427 	default:
428 		printk(KERN_ERR "RPC: Unsupported memory "
429 				"registration mode: %d\n", memreg);
430 		rc = -ENOMEM;
431 		goto out3;
432 	}
433 	dprintk("RPC:       %s: memory registration strategy is '%s'\n",
434 		__func__, ia->ri_ops->ro_displayname);
435 
436 	return 0;
437 
438 out3:
439 	ib_dealloc_pd(ia->ri_pd);
440 	ia->ri_pd = NULL;
441 out2:
442 	rpcrdma_destroy_id(ia->ri_id);
443 	ia->ri_id = NULL;
444 out1:
445 	return rc;
446 }
447 
448 /*
449  * Clean up/close an IA.
450  *   o if the connection ID and PD have been initialized, free them.
451  *   o close the IA
452  */
453 void
454 rpcrdma_ia_close(struct rpcrdma_ia *ia)
455 {
456 	dprintk("RPC:       %s: entering\n", __func__);
457 	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
458 		if (ia->ri_id->qp)
459 			rdma_destroy_qp(ia->ri_id);
460 		rpcrdma_destroy_id(ia->ri_id);
461 		ia->ri_id = NULL;
462 	}
463 
464 	/* If the pd is still busy, xprtrdma missed freeing a resource */
465 	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
466 		ib_dealloc_pd(ia->ri_pd);
467 }
468 
469 /*
470  * Create unconnected endpoint.
471  */
472 int
473 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
474 				struct rpcrdma_create_data_internal *cdata)
475 {
476 	struct ib_cq *sendcq, *recvcq;
477 	unsigned int max_qp_wr;
478 	int rc;
479 
480 	if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
481 		dprintk("RPC:       %s: insufficient SGEs available\n",
482 			__func__);
483 		return -ENOMEM;
484 	}
485 
486 	if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
487 		dprintk("RPC:       %s: insufficient WQEs available\n",
488 			__func__);
489 		return -ENOMEM;
490 	}
491 	max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;
492 
493 	/* check provider's send/recv wr limits */
494 	if (cdata->max_requests > max_qp_wr)
495 		cdata->max_requests = max_qp_wr;
496 
497 	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
498 	ep->rep_attr.qp_context = ep;
499 	ep->rep_attr.srq = NULL;
500 	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
501 	ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
502 	ep->rep_attr.cap.max_send_wr += 1;	/* drain cqe */
503 	rc = ia->ri_ops->ro_open(ia, ep, cdata);
504 	if (rc)
505 		return rc;
506 	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
507 	ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
508 	ep->rep_attr.cap.max_recv_wr += 1;	/* drain cqe */
509 	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
510 	ep->rep_attr.cap.max_recv_sge = 1;
511 	ep->rep_attr.cap.max_inline_data = 0;
512 	ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
513 	ep->rep_attr.qp_type = IB_QPT_RC;
514 	ep->rep_attr.port_num = ~0;
515 
516 	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
517 		"iovs: send %d recv %d\n",
518 		__func__,
519 		ep->rep_attr.cap.max_send_wr,
520 		ep->rep_attr.cap.max_recv_wr,
521 		ep->rep_attr.cap.max_send_sge,
522 		ep->rep_attr.cap.max_recv_sge);
523 
524 	/* set trigger for requesting send completion */
525 	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
526 	if (ep->rep_cqinit <= 2)
527 		ep->rep_cqinit = 0;	/* always signal? */
528 	INIT_CQCOUNT(ep);
529 	init_waitqueue_head(&ep->rep_connect_wait);
530 	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
531 
532 	sendcq = ib_alloc_cq(ia->ri_device, NULL,
533 			     ep->rep_attr.cap.max_send_wr + 1,
534 			     0, IB_POLL_SOFTIRQ);
535 	if (IS_ERR(sendcq)) {
536 		rc = PTR_ERR(sendcq);
537 		dprintk("RPC:       %s: failed to create send CQ: %i\n",
538 			__func__, rc);
539 		goto out1;
540 	}
541 
542 	recvcq = ib_alloc_cq(ia->ri_device, NULL,
543 			     ep->rep_attr.cap.max_recv_wr + 1,
544 			     0, IB_POLL_SOFTIRQ);
545 	if (IS_ERR(recvcq)) {
546 		rc = PTR_ERR(recvcq);
547 		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
548 			__func__, rc);
549 		goto out2;
550 	}
551 
552 	ep->rep_attr.send_cq = sendcq;
553 	ep->rep_attr.recv_cq = recvcq;
554 
555 	/* Initialize cma parameters */
556 	memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));
557 
558 	/* RPC/RDMA does not use private data */
559 	ep->rep_remote_cma.private_data = NULL;
560 	ep->rep_remote_cma.private_data_len = 0;
561 
562 	/* Client offers RDMA Read but does not initiate */
563 	ep->rep_remote_cma.initiator_depth = 0;
564 	if (ia->ri_device->attrs.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
565 		ep->rep_remote_cma.responder_resources = 32;
566 	else
567 		ep->rep_remote_cma.responder_resources =
568 						ia->ri_device->attrs.max_qp_rd_atom;
569 
570 	/* Limit transport retries so client can detect server
571 	 * GID changes quickly. RPC layer handles re-establishing
572 	 * transport connection and retransmission.
573 	 */
574 	ep->rep_remote_cma.retry_count = 6;
575 
576 	/* RPC-over-RDMA handles its own flow control. In addition,
577 	 * make all RNR NAKs visible so we know that RPC-over-RDMA
578 	 * flow control is working correctly (no NAKs should be seen).
579 	 */
580 	ep->rep_remote_cma.flow_control = 0;
581 	ep->rep_remote_cma.rnr_retry_count = 0;
582 
583 	return 0;
584 
585 out2:
586 	ib_free_cq(sendcq);
587 out1:
588 	if (ia->ri_dma_mr)
589 		ib_dereg_mr(ia->ri_dma_mr);
590 	return rc;
591 }
592 
593 /*
594  * rpcrdma_ep_destroy
595  *
596  * Disconnect and destroy endpoint. After this, the only
597  * valid operations on the ep are to free it (if dynamically
598  * allocated) or re-create it.
599  */
600 void
601 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
602 {
603 	int rc;
604 
605 	dprintk("RPC:       %s: entering, connected is %d\n",
606 		__func__, ep->rep_connected);
607 
608 	cancel_delayed_work_sync(&ep->rep_connect_worker);
609 
610 	if (ia->ri_id->qp) {
611 		rpcrdma_ep_disconnect(ep, ia);
612 		rdma_destroy_qp(ia->ri_id);
613 		ia->ri_id->qp = NULL;
614 	}
615 
616 	ib_free_cq(ep->rep_attr.recv_cq);
617 	ib_free_cq(ep->rep_attr.send_cq);
618 
619 	if (ia->ri_dma_mr) {
620 		rc = ib_dereg_mr(ia->ri_dma_mr);
621 		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
622 			__func__, rc);
623 	}
624 }
625 
626 /*
627  * Connect unconnected endpoint.
628  */
629 int
630 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
631 {
632 	struct rdma_cm_id *id, *old;
633 	int rc = 0;
634 	int retry_count = 0;
635 
636 	if (ep->rep_connected != 0) {
637 		struct rpcrdma_xprt *xprt;
638 retry:
639 		dprintk("RPC:       %s: reconnecting...\n", __func__);
640 
641 		rpcrdma_ep_disconnect(ep, ia);
642 
643 		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
644 		id = rpcrdma_create_id(xprt, ia,
645 				(struct sockaddr *)&xprt->rx_data.addr);
646 		if (IS_ERR(id)) {
647 			rc = -EHOSTUNREACH;
648 			goto out;
649 		}
650 		/* TEMP TEMP TEMP - fail if new device:
651 		 * Deregister/remarshal *all* requests!
652 		 * Close and recreate adapter, pd, etc!
653 		 * Re-determine all attributes still sane!
654 		 * More stuff I haven't thought of!
655 		 * Rrrgh!
656 		 */
657 		if (ia->ri_device != id->device) {
658 			printk(KERN_ERR "RPC:       %s: can't reconnect on "
659 				"different device!\n", __func__);
660 			rpcrdma_destroy_id(id);
661 			rc = -ENETUNREACH;
662 			goto out;
663 		}
664 		/* END TEMP */
665 		rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
666 		if (rc) {
667 			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
668 				__func__, rc);
669 			rpcrdma_destroy_id(id);
670 			rc = -ENETUNREACH;
671 			goto out;
672 		}
673 
674 		old = ia->ri_id;
675 		ia->ri_id = id;
676 
677 		rdma_destroy_qp(old);
678 		rpcrdma_destroy_id(old);
679 	} else {
680 		dprintk("RPC:       %s: connecting...\n", __func__);
681 		rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
682 		if (rc) {
683 			dprintk("RPC:       %s: rdma_create_qp failed %i\n",
684 				__func__, rc);
685 			/* do not update ep->rep_connected */
686 			return -ENETUNREACH;
687 		}
688 	}
689 
690 	ep->rep_connected = 0;
691 
692 	rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
693 	if (rc) {
694 		dprintk("RPC:       %s: rdma_connect() failed with %i\n",
695 				__func__, rc);
696 		goto out;
697 	}
698 
699 	wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
700 
701 	/*
702 	 * Check state. A non-peer reject indicates no listener
703 	 * (ECONNREFUSED), which may be a transient state. All
704 	 * other errors indicate a condition for which best-effort
705 	 * recovery has already been attempted.
706 	 */
707 	if (ep->rep_connected == -ECONNREFUSED &&
708 	    ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
709 		dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
710 		goto retry;
711 	}
712 	if (ep->rep_connected <= 0) {
713 		/* Sometimes, the only way to reliably connect to remote
714 		 * CMs is to use the same nonzero values for ORD and IRD. */
715 		if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
716 		    (ep->rep_remote_cma.responder_resources == 0 ||
717 		     ep->rep_remote_cma.initiator_depth !=
718 				ep->rep_remote_cma.responder_resources)) {
719 			if (ep->rep_remote_cma.responder_resources == 0)
720 				ep->rep_remote_cma.responder_resources = 1;
721 			ep->rep_remote_cma.initiator_depth =
722 				ep->rep_remote_cma.responder_resources;
723 			goto retry;
724 		}
725 		rc = ep->rep_connected;
726 	} else {
727 		struct rpcrdma_xprt *r_xprt;
728 		unsigned int extras;
729 
730 		dprintk("RPC:       %s: connected\n", __func__);
731 
732 		r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
733 		extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
734 
735 		if (extras) {
736 			rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
737 			if (rc) {
738 				pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
739 					__func__, rc);
740 				rc = 0;
741 			}
742 		}
743 	}
744 
745 out:
746 	if (rc)
747 		ep->rep_connected = rc;
748 	return rc;
749 }
750 
751 /*
752  * rpcrdma_ep_disconnect
753  *
754  * This is separate from destroy to facilitate the ability
755  * to reconnect without recreating the endpoint.
756  *
757  * This call is not reentrant, and must not be made in parallel
758  * on the same endpoint.
759  */
760 void
761 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
762 {
763 	int rc;
764 
765 	rc = rdma_disconnect(ia->ri_id);
766 	if (!rc) {
767 		/* returns without wait if not connected */
768 		wait_event_interruptible(ep->rep_connect_wait,
769 							ep->rep_connected != 1);
770 		dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
771 			(ep->rep_connected == 1) ? "still " : "dis");
772 	} else {
773 		dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
774 		ep->rep_connected = rc;
775 	}
776 
777 	ib_drain_qp(ia->ri_id->qp);
778 }
779 
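/* Allocate a request buffer and add it to the set of all requests
 * tracked in the transport's buffer pool.
 */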
780 struct rpcrdma_req *
781 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
782 {
783 	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
784 	struct rpcrdma_req *req;
785 
786 	req = kzalloc(sizeof(*req), GFP_KERNEL);
787 	if (req == NULL)
788 		return ERR_PTR(-ENOMEM);
789 
790 	INIT_LIST_HEAD(&req->rl_free);
791 	spin_lock(&buffer->rb_reqslock);
792 	list_add(&req->rl_all, &buffer->rb_allreqs);
793 	spin_unlock(&buffer->rb_reqslock);
794 	req->rl_cqe.done = rpcrdma_wc_send;
795 	req->rl_buffer = &r_xprt->rx_buf;
796 	return req;
797 }
798 
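/* Allocate a reply buffer along with the DMA-mapped memory that
 * receives the inline portion of incoming RPC/RDMA replies.
 */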
799 struct rpcrdma_rep *
800 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
801 {
802 	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
803 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
804 	struct rpcrdma_rep *rep;
805 	int rc;
806 
807 	rc = -ENOMEM;
808 	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
809 	if (rep == NULL)
810 		goto out;
811 
812 	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
813 					       GFP_KERNEL);
814 	if (IS_ERR(rep->rr_rdmabuf)) {
815 		rc = PTR_ERR(rep->rr_rdmabuf);
816 		goto out_free;
817 	}
818 
819 	rep->rr_device = ia->ri_device;
820 	rep->rr_cqe.done = rpcrdma_receive_wc;
821 	rep->rr_rxprt = r_xprt;
822 	INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
823 	return rep;
824 
825 out_free:
826 	kfree(rep);
827 out:
828 	return ERR_PTR(rc);
829 }
830 
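/* Allocate the transport's pools of request and reply buffers, and
 * initialize memory registration resources and credit state.
 */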
831 int
832 rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
833 {
834 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
835 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
836 	int i, rc;
837 
838 	buf->rb_max_requests = r_xprt->rx_data.max_requests;
839 	buf->rb_bc_srv_max_requests = 0;
840 	spin_lock_init(&buf->rb_lock);
841 	atomic_set(&buf->rb_credits, 1);
842 
843 	rc = ia->ri_ops->ro_init(r_xprt);
844 	if (rc)
845 		goto out;
846 
847 	INIT_LIST_HEAD(&buf->rb_send_bufs);
848 	INIT_LIST_HEAD(&buf->rb_allreqs);
849 	spin_lock_init(&buf->rb_reqslock);
850 	for (i = 0; i < buf->rb_max_requests; i++) {
851 		struct rpcrdma_req *req;
852 
853 		req = rpcrdma_create_req(r_xprt);
854 		if (IS_ERR(req)) {
855 			dprintk("RPC:       %s: request buffer %d alloc"
856 				" failed\n", __func__, i);
857 			rc = PTR_ERR(req);
858 			goto out;
859 		}
860 		req->rl_backchannel = false;
861 		list_add(&req->rl_free, &buf->rb_send_bufs);
862 	}
863 
864 	INIT_LIST_HEAD(&buf->rb_recv_bufs);
865 	for (i = 0; i < buf->rb_max_requests + 2; i++) {
866 		struct rpcrdma_rep *rep;
867 
868 		rep = rpcrdma_create_rep(r_xprt);
869 		if (IS_ERR(rep)) {
870 			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
871 				__func__, i);
872 			rc = PTR_ERR(rep);
873 			goto out;
874 		}
875 		list_add(&rep->rr_list, &buf->rb_recv_bufs);
876 	}
877 
878 	return 0;
879 out:
880 	rpcrdma_buffer_destroy(buf);
881 	return rc;
882 }
883 
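/* Remove and return the first free request buffer. The caller
 * serializes access to the pool, normally by holding rb_lock.
 */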
884 static struct rpcrdma_req *
885 rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
886 {
887 	struct rpcrdma_req *req;
888 
889 	req = list_first_entry(&buf->rb_send_bufs,
890 			       struct rpcrdma_req, rl_free);
891 	list_del(&req->rl_free);
892 	return req;
893 }
894 
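/* Remove and return the first free reply buffer. The caller
 * serializes access to the pool, normally by holding rb_lock.
 */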
895 static struct rpcrdma_rep *
896 rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
897 {
898 	struct rpcrdma_rep *rep;
899 
900 	rep = list_first_entry(&buf->rb_recv_bufs,
901 			       struct rpcrdma_rep, rr_list);
902 	list_del(&rep->rr_list);
903 	return rep;
904 }
905 
906 static void
907 rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
908 {
909 	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
910 	kfree(rep);
911 }
912 
913 void
914 rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
915 {
916 	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
917 	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
918 	kfree(req);
919 }
920 
921 void
922 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
923 {
924 	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
925 
926 	while (!list_empty(&buf->rb_recv_bufs)) {
927 		struct rpcrdma_rep *rep;
928 
929 		rep = rpcrdma_buffer_get_rep_locked(buf);
930 		rpcrdma_destroy_rep(ia, rep);
931 	}
932 
933 	spin_lock(&buf->rb_reqslock);
934 	while (!list_empty(&buf->rb_allreqs)) {
935 		struct rpcrdma_req *req;
936 
937 		req = list_first_entry(&buf->rb_allreqs,
938 				       struct rpcrdma_req, rl_all);
939 		list_del(&req->rl_all);
940 
941 		spin_unlock(&buf->rb_reqslock);
942 		rpcrdma_destroy_req(ia, req);
943 		spin_lock(&buf->rb_reqslock);
944 	}
945 	spin_unlock(&buf->rb_reqslock);
946 
947 	ia->ri_ops->ro_destroy(buf);
948 }
949 
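/* Take an MW off the free list, or return NULL (and complain) if
 * none are available.
 */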
950 struct rpcrdma_mw *
951 rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
952 {
953 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
954 	struct rpcrdma_mw *mw = NULL;
955 
956 	spin_lock(&buf->rb_mwlock);
957 	if (!list_empty(&buf->rb_mws)) {
958 		mw = list_first_entry(&buf->rb_mws,
959 				      struct rpcrdma_mw, mw_list);
960 		list_del_init(&mw->mw_list);
961 	}
962 	spin_unlock(&buf->rb_mwlock);
963 
964 	if (!mw)
965 		pr_err("RPC:       %s: no MWs available\n", __func__);
966 	return mw;
967 }
968 
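/* Return an MW to the free list.
 */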
969 void
970 rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
971 {
972 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
973 
974 	spin_lock(&buf->rb_mwlock);
975 	list_add_tail(&mw->mw_list, &buf->rb_mws);
976 	spin_unlock(&buf->rb_mwlock);
977 }
978 
979 /*
980  * Get a set of request/reply buffers.
981  *
982  * Reply buffer (if available) is attached to send buffer upon return.
983  */
984 struct rpcrdma_req *
985 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
986 {
987 	struct rpcrdma_req *req;
988 
989 	spin_lock(&buffers->rb_lock);
990 	if (list_empty(&buffers->rb_send_bufs))
991 		goto out_reqbuf;
992 	req = rpcrdma_buffer_get_req_locked(buffers);
993 	if (list_empty(&buffers->rb_recv_bufs))
994 		goto out_repbuf;
995 	req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
996 	spin_unlock(&buffers->rb_lock);
997 	return req;
998 
999 out_reqbuf:
1000 	spin_unlock(&buffers->rb_lock);
1001 	pr_warn("RPC:       %s: out of request buffers\n", __func__);
1002 	return NULL;
1003 out_repbuf:
1004 	spin_unlock(&buffers->rb_lock);
1005 	pr_warn("RPC:       %s: out of reply buffers\n", __func__);
1006 	req->rl_reply = NULL;
1007 	return req;
1008 }
1009 
1010 /*
1011  * Put request/reply buffers back into pool.
1012  * Pre-decrement counter/array index.
1013  */
1014 void
1015 rpcrdma_buffer_put(struct rpcrdma_req *req)
1016 {
1017 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1018 	struct rpcrdma_rep *rep = req->rl_reply;
1019 
1020 	req->rl_niovs = 0;
1021 	req->rl_reply = NULL;
1022 
1023 	spin_lock(&buffers->rb_lock);
1024 	list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
1025 	if (rep)
1026 		list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
1027 	spin_unlock(&buffers->rb_lock);
1028 }
1029 
1030 /*
1031  * Recover reply buffers from pool.
1032  * This happens when recovering from disconnect.
1033  */
1034 void
1035 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1036 {
1037 	struct rpcrdma_buffer *buffers = req->rl_buffer;
1038 
1039 	spin_lock(&buffers->rb_lock);
1040 	if (!list_empty(&buffers->rb_recv_bufs))
1041 		req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
1042 	spin_unlock(&buffers->rb_lock);
1043 }
1044 
1045 /*
1046  * Put reply buffers back into pool when not attached to
1047  * request. This happens in error conditions.
1048  */
1049 void
1050 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1051 {
1052 	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
1053 
1054 	spin_lock(&buffers->rb_lock);
1055 	list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
1056 	spin_unlock(&buffers->rb_lock);
1057 }
1058 
1059 /*
1060  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
1061  */
1062 
1063 void
1064 rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
1065 {
1066 	dprintk("RPC:       map_one: offset %p iova %llx len %zu\n",
1067 		seg->mr_offset,
1068 		(unsigned long long)seg->mr_dma, seg->mr_dmalen);
1069 }
1070 
1071 /**
1072  * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
1073  * @ia: controlling rpcrdma_ia
1074  * @size: size of buffer to be allocated, in bytes
1075  * @flags: GFP flags
1076  *
1077  * Returns pointer to private header of an area of internally
1078  * registered memory, or an ERR_PTR. The registered buffer follows
1079  * the end of the private header.
1080  *
1081  * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1082  * receiving the payload of RDMA RECV operations. regbufs are not
1083  * used for RDMA READ/WRITE operations, thus are registered only for
1084  * LOCAL access.
1085  */
1086 struct rpcrdma_regbuf *
1087 rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
1088 {
1089 	struct rpcrdma_regbuf *rb;
1090 	struct ib_sge *iov;
1091 
1092 	rb = kmalloc(sizeof(*rb) + size, flags);
1093 	if (rb == NULL)
1094 		goto out;
1095 
1096 	iov = &rb->rg_iov;
1097 	iov->addr = ib_dma_map_single(ia->ri_device,
1098 				      (void *)rb->rg_base, size,
1099 				      DMA_BIDIRECTIONAL);
1100 	if (ib_dma_mapping_error(ia->ri_device, iov->addr))
1101 		goto out_free;
1102 
1103 	iov->length = size;
1104 	iov->lkey = ia->ri_pd->local_dma_lkey;
1105 	rb->rg_size = size;
1106 	rb->rg_owner = NULL;
1107 	return rb;
1108 
1109 out_free:
1110 	kfree(rb);
1111 out:
1112 	return ERR_PTR(-ENOMEM);
1113 }
1114 
1115 /**
1116  * rpcrdma_free_regbuf - deregister and free registered buffer
1117  * @ia: controlling rpcrdma_ia
1118  * @rb: regbuf to be deregistered and freed
1119  */
1120 void
1121 rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
1122 {
1123 	struct ib_sge *iov;
1124 
1125 	if (!rb)
1126 		return;
1127 
1128 	iov = &rb->rg_iov;
1129 	ib_dma_unmap_single(ia->ri_device,
1130 			    iov->addr, iov->length, DMA_BIDIRECTIONAL);
1131 	kfree(rb);
1132 }
1133 
1134 /*
1135  * Prepost any receive buffer, then post send.
1136  *
1137  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1138  */
1139 int
1140 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1141 		struct rpcrdma_ep *ep,
1142 		struct rpcrdma_req *req)
1143 {
1144 	struct ib_device *device = ia->ri_device;
1145 	struct ib_send_wr send_wr, *send_wr_fail;
1146 	struct rpcrdma_rep *rep = req->rl_reply;
1147 	struct ib_sge *iov = req->rl_send_iov;
1148 	int i, rc;
1149 
1150 	if (rep) {
1151 		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1152 		if (rc)
1153 			goto out;
1154 		req->rl_reply = NULL;
1155 	}
1156 
1157 	send_wr.next = NULL;
1158 	send_wr.wr_cqe = &req->rl_cqe;
1159 	send_wr.sg_list = iov;
1160 	send_wr.num_sge = req->rl_niovs;
1161 	send_wr.opcode = IB_WR_SEND;
1162 
1163 	for (i = 0; i < send_wr.num_sge; i++)
1164 		ib_dma_sync_single_for_device(device, iov[i].addr,
1165 					      iov[i].length, DMA_TO_DEVICE);
1166 	dprintk("RPC:       %s: posting %d s/g entries\n",
1167 		__func__, send_wr.num_sge);
1168 
1169 	if (DECR_CQCOUNT(ep) > 0)
1170 		send_wr.send_flags = 0;
1171 	else { /* Provider must take a send completion every now and then */
1172 		INIT_CQCOUNT(ep);
1173 		send_wr.send_flags = IB_SEND_SIGNALED;
1174 	}
1175 
1176 	rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
1177 	if (rc)
1178 		dprintk("RPC:       %s: ib_post_send returned %i\n", __func__,
1179 			rc);
1180 out:
1181 	return rc;
1182 }
1183 
1184 /*
1185  * (Re)post a receive buffer.
1186  */
1187 int
1188 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1189 		     struct rpcrdma_ep *ep,
1190 		     struct rpcrdma_rep *rep)
1191 {
1192 	struct ib_recv_wr recv_wr, *recv_wr_fail;
1193 	int rc;
1194 
1195 	recv_wr.next = NULL;
1196 	recv_wr.wr_cqe = &rep->rr_cqe;
1197 	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
1198 	recv_wr.num_sge = 1;
1199 
1200 	ib_dma_sync_single_for_cpu(ia->ri_device,
1201 				   rdmab_addr(rep->rr_rdmabuf),
1202 				   rdmab_length(rep->rr_rdmabuf),
1203 				   DMA_BIDIRECTIONAL);
1204 
1205 	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
1206 
1207 	if (rc)
1208 		dprintk("RPC:       %s: ib_post_recv returned %i\n", __func__,
1209 			rc);
1210 	return rc;
1211 }
1212 
1213 /**
1214  * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
1215  * @r_xprt: transport associated with these backchannel resources
1216  * @count: minimum number of incoming requests expected
1217  *
1218  * Returns zero if all requested buffers were posted, or a negative errno.
1219  */
1220 int
1221 rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
1222 {
1223 	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
1224 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1225 	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
1226 	struct rpcrdma_rep *rep;
1227 	int rc;
1228 
1229 	while (count--) {
1230 		spin_lock(&buffers->rb_lock);
1231 		if (list_empty(&buffers->rb_recv_bufs))
1232 			goto out_reqbuf;
1233 		rep = rpcrdma_buffer_get_rep_locked(buffers);
1234 		spin_unlock(&buffers->rb_lock);
1235 
1236 		rc = rpcrdma_ep_post_recv(ia, ep, rep);
1237 		if (rc)
1238 			goto out_rc;
1239 	}
1240 
1241 	return 0;
1242 
1243 out_reqbuf:
1244 	spin_unlock(&buffers->rb_lock);
1245 	pr_warn("%s: no extra receive buffers\n", __func__);
1246 	return -ENOMEM;
1247 
1248 out_rc:
1249 	rpcrdma_recv_buffer_put(rep);
1250 	return rc;
1251 }
1252