xref: /openbmc/linux/net/smc/smc_wr.c (revision f7d84fa7)
1 /*
2  * Shared Memory Communications over RDMA (SMC-R) and RoCE
3  *
4  * Work Requests exploiting Infiniband API
5  *
6  * Work requests (WR) of type ib_post_send or ib_post_recv respectively
7  * are submitted to either RC SQ or RC RQ respectively
8  * (reliably connected send/receive queue)
9  * and become work queue entries (WQEs).
10  * While an SQ WR/WQE is pending, we track it until transmission completion.
11  * Through a send or receive completion queue (CQ) respectively,
12  * we get completion queue entries (CQEs) [aka work completions (WCs)].
13  * Since the CQ callback is called from IRQ context, we split work by using
14  * bottom halves implemented by tasklets.
15  *
16  * SMC uses this to exchange LLC (link layer control)
17  * and CDC (connection data control) messages.
18  *
19  * Copyright IBM Corp. 2016
20  *
21  * Author(s):  Steffen Maier <maier@linux.vnet.ibm.com>
22  */
23 
24 #include <linux/atomic.h>
25 #include <linux/hashtable.h>
26 #include <linux/wait.h>
27 #include <rdma/ib_verbs.h>
28 #include <asm/div64.h>
29 
30 #include "smc.h"
31 #include "smc_wr.h"
32 
33 #define SMC_WR_MAX_POLL_CQE 10	/* max. # of compl. queue elements in 1 poll */
34 
35 #define SMC_WR_RX_HASH_BITS 4
36 static DEFINE_HASHTABLE(smc_wr_rx_hash, SMC_WR_RX_HASH_BITS);
37 static DEFINE_SPINLOCK(smc_wr_rx_hash_lock);
38 
39 struct smc_wr_tx_pend {	/* control data for a pending send request */
40 	u64			wr_id;		/* work request id sent */
41 	smc_wr_tx_handler	handler;
42 	enum ib_wc_status	wc_status;	/* CQE status */
43 	struct smc_link		*link;
44 	u32			idx;
45 	struct smc_wr_tx_pend_priv priv;
46 };
47 
48 /******************************** send queue *********************************/
49 
50 /*------------------------------- completion --------------------------------*/
51 
52 static inline int smc_wr_tx_find_pending_index(struct smc_link *link, u64 wr_id)
53 {
54 	u32 i;
55 
56 	for (i = 0; i < link->wr_tx_cnt; i++) {
57 		if (link->wr_tx_pends[i].wr_id == wr_id)
58 			return i;
59 	}
60 	return link->wr_tx_cnt;
61 }
62 
63 static inline void smc_wr_tx_process_cqe(struct ib_wc *wc)
64 {
65 	struct smc_wr_tx_pend pnd_snd;
66 	struct smc_link *link;
67 	u32 pnd_snd_idx;
68 	int i;
69 
70 	link = wc->qp->qp_context;
71 	pnd_snd_idx = smc_wr_tx_find_pending_index(link, wc->wr_id);
72 	if (pnd_snd_idx == link->wr_tx_cnt)
73 		return;
74 	link->wr_tx_pends[pnd_snd_idx].wc_status = wc->status;
75 	memcpy(&pnd_snd, &link->wr_tx_pends[pnd_snd_idx], sizeof(pnd_snd));
76 	/* clear the full struct smc_wr_tx_pend including .priv */
77 	memset(&link->wr_tx_pends[pnd_snd_idx], 0,
78 	       sizeof(link->wr_tx_pends[pnd_snd_idx]));
79 	memset(&link->wr_tx_bufs[pnd_snd_idx], 0,
80 	       sizeof(link->wr_tx_bufs[pnd_snd_idx]));
81 	if (!test_and_clear_bit(pnd_snd_idx, link->wr_tx_mask))
82 		return;
83 	if (wc->status) {
84 		struct smc_link_group *lgr;
85 
86 		for_each_set_bit(i, link->wr_tx_mask, link->wr_tx_cnt) {
87 			/* clear full struct smc_wr_tx_pend including .priv */
88 			memset(&link->wr_tx_pends[i], 0,
89 			       sizeof(link->wr_tx_pends[i]));
90 			memset(&link->wr_tx_bufs[i], 0,
91 			       sizeof(link->wr_tx_bufs[i]));
92 			clear_bit(i, link->wr_tx_mask);
93 		}
94 		/* terminate connections of this link group abnormally */
95 		lgr = container_of(link, struct smc_link_group,
96 				   lnk[SMC_SINGLE_LINK]);
97 		smc_lgr_terminate(lgr);
98 	}
99 	if (pnd_snd.handler)
100 		pnd_snd.handler(&pnd_snd.priv, link, wc->status);
101 	wake_up(&link->wr_tx_wait);
102 }
103 
104 static void smc_wr_tx_tasklet_fn(unsigned long data)
105 {
106 	struct smc_ib_device *dev = (struct smc_ib_device *)data;
107 	struct ib_wc wc[SMC_WR_MAX_POLL_CQE];
108 	int i = 0, rc;
109 	int polled = 0;
110 
111 again:
112 	polled++;
113 	do {
114 		rc = ib_poll_cq(dev->roce_cq_send, SMC_WR_MAX_POLL_CQE, wc);
115 		if (polled == 1) {
116 			ib_req_notify_cq(dev->roce_cq_send,
117 					 IB_CQ_NEXT_COMP |
118 					 IB_CQ_REPORT_MISSED_EVENTS);
119 		}
120 		if (!rc)
121 			break;
122 		for (i = 0; i < rc; i++)
123 			smc_wr_tx_process_cqe(&wc[i]);
124 	} while (rc > 0);
125 	if (polled == 1)
126 		goto again;
127 }
128 
129 void smc_wr_tx_cq_handler(struct ib_cq *ib_cq, void *cq_context)
130 {
131 	struct smc_ib_device *dev = (struct smc_ib_device *)cq_context;
132 
133 	tasklet_schedule(&dev->send_tasklet);
134 }
135 
136 /*---------------------------- request submission ---------------------------*/
137 
138 static inline int smc_wr_tx_get_free_slot_index(struct smc_link *link, u32 *idx)
139 {
140 	*idx = link->wr_tx_cnt;
141 	for_each_clear_bit(*idx, link->wr_tx_mask, link->wr_tx_cnt) {
142 		if (!test_and_set_bit(*idx, link->wr_tx_mask))
143 			return 0;
144 	}
145 	*idx = link->wr_tx_cnt;
146 	return -EBUSY;
147 }
148 
149 /**
150  * smc_wr_tx_get_free_slot() - returns buffer for message assembly,
151  *			and sets info for pending transmit tracking
152  * @link:		Pointer to smc_link used to later send the message.
153  * @handler:		Send completion handler function pointer.
154  * @wr_buf:		Out value returns pointer to message buffer.
155  * @wr_pend_priv:	Out value returns pointer serving as handler context.
156  *
157  * Return: 0 on success, or -errno on error.
158  */
159 int smc_wr_tx_get_free_slot(struct smc_link *link,
160 			    smc_wr_tx_handler handler,
161 			    struct smc_wr_buf **wr_buf,
162 			    struct smc_wr_tx_pend_priv **wr_pend_priv)
163 {
164 	struct smc_wr_tx_pend *wr_pend;
165 	struct ib_send_wr *wr_ib;
166 	u64 wr_id;
167 	u32 idx;
168 	int rc;
169 
170 	*wr_buf = NULL;
171 	*wr_pend_priv = NULL;
172 	if (in_softirq()) {
173 		rc = smc_wr_tx_get_free_slot_index(link, &idx);
174 		if (rc)
175 			return rc;
176 	} else {
177 		rc = wait_event_interruptible_timeout(
178 			link->wr_tx_wait,
179 			(smc_wr_tx_get_free_slot_index(link, &idx) != -EBUSY),
180 			SMC_WR_TX_WAIT_FREE_SLOT_TIME);
181 		if (!rc) {
182 			/* timeout - terminate connections */
183 			struct smc_link_group *lgr;
184 
185 			lgr = container_of(link, struct smc_link_group,
186 					   lnk[SMC_SINGLE_LINK]);
187 			smc_lgr_terminate(lgr);
188 			return -EPIPE;
189 		}
190 		if (rc == -ERESTARTSYS)
191 			return -EINTR;
192 		if (idx == link->wr_tx_cnt)
193 			return -EPIPE;
194 	}
195 	wr_id = smc_wr_tx_get_next_wr_id(link);
196 	wr_pend = &link->wr_tx_pends[idx];
197 	wr_pend->wr_id = wr_id;
198 	wr_pend->handler = handler;
199 	wr_pend->link = link;
200 	wr_pend->idx = idx;
201 	wr_ib = &link->wr_tx_ibs[idx];
202 	wr_ib->wr_id = wr_id;
203 	*wr_buf = &link->wr_tx_bufs[idx];
204 	*wr_pend_priv = &wr_pend->priv;
205 	return 0;
206 }
207 
208 int smc_wr_tx_put_slot(struct smc_link *link,
209 		       struct smc_wr_tx_pend_priv *wr_pend_priv)
210 {
211 	struct smc_wr_tx_pend *pend;
212 
213 	pend = container_of(wr_pend_priv, struct smc_wr_tx_pend, priv);
214 	if (pend->idx < link->wr_tx_cnt) {
215 		/* clear the full struct smc_wr_tx_pend including .priv */
216 		memset(&link->wr_tx_pends[pend->idx], 0,
217 		       sizeof(link->wr_tx_pends[pend->idx]));
218 		memset(&link->wr_tx_bufs[pend->idx], 0,
219 		       sizeof(link->wr_tx_bufs[pend->idx]));
220 		test_and_clear_bit(pend->idx, link->wr_tx_mask);
221 		return 1;
222 	}
223 
224 	return 0;
225 }
226 
227 /* Send prepared WR slot via ib_post_send.
228  * @priv: pointer to smc_wr_tx_pend_priv identifying prepared message buffer
229  */
230 int smc_wr_tx_send(struct smc_link *link, struct smc_wr_tx_pend_priv *priv)
231 {
232 	struct ib_send_wr *failed_wr = NULL;
233 	struct smc_wr_tx_pend *pend;
234 	int rc;
235 
236 	ib_req_notify_cq(link->smcibdev->roce_cq_send,
237 			 IB_CQ_SOLICITED_MASK | IB_CQ_REPORT_MISSED_EVENTS);
238 	pend = container_of(priv, struct smc_wr_tx_pend, priv);
239 	rc = ib_post_send(link->roce_qp, &link->wr_tx_ibs[pend->idx],
240 			  &failed_wr);
241 	if (rc)
242 		smc_wr_tx_put_slot(link, priv);
243 	return rc;
244 }
245 
246 void smc_wr_tx_dismiss_slots(struct smc_link *link, u8 wr_rx_hdr_type,
247 			     smc_wr_tx_filter filter,
248 			     smc_wr_tx_dismisser dismisser,
249 			     unsigned long data)
250 {
251 	struct smc_wr_tx_pend_priv *tx_pend;
252 	struct smc_wr_rx_hdr *wr_rx;
253 	int i;
254 
255 	for_each_set_bit(i, link->wr_tx_mask, link->wr_tx_cnt) {
256 		wr_rx = (struct smc_wr_rx_hdr *)&link->wr_rx_bufs[i];
257 		if (wr_rx->type != wr_rx_hdr_type)
258 			continue;
259 		tx_pend = &link->wr_tx_pends[i].priv;
260 		if (filter(tx_pend, data))
261 			dismisser(tx_pend);
262 	}
263 }
264 
265 bool smc_wr_tx_has_pending(struct smc_link *link, u8 wr_rx_hdr_type,
266 			   smc_wr_tx_filter filter, unsigned long data)
267 {
268 	struct smc_wr_tx_pend_priv *tx_pend;
269 	struct smc_wr_rx_hdr *wr_rx;
270 	int i;
271 
272 	for_each_set_bit(i, link->wr_tx_mask, link->wr_tx_cnt) {
273 		wr_rx = (struct smc_wr_rx_hdr *)&link->wr_rx_bufs[i];
274 		if (wr_rx->type != wr_rx_hdr_type)
275 			continue;
276 		tx_pend = &link->wr_tx_pends[i].priv;
277 		if (filter(tx_pend, data))
278 			return true;
279 	}
280 	return false;
281 }
282 
283 /****************************** receive queue ********************************/
284 
285 int smc_wr_rx_register_handler(struct smc_wr_rx_handler *handler)
286 {
287 	struct smc_wr_rx_handler *h_iter;
288 	int rc = 0;
289 
290 	spin_lock(&smc_wr_rx_hash_lock);
291 	hash_for_each_possible(smc_wr_rx_hash, h_iter, list, handler->type) {
292 		if (h_iter->type == handler->type) {
293 			rc = -EEXIST;
294 			goto out_unlock;
295 		}
296 	}
297 	hash_add(smc_wr_rx_hash, &handler->list, handler->type);
298 out_unlock:
299 	spin_unlock(&smc_wr_rx_hash_lock);
300 	return rc;
301 }
302 
303 /* Demultiplex a received work request based on the message type to its handler.
304  * Relies on smc_wr_rx_hash having been completely filled before any IB WRs,
305  * and not being modified any more afterwards so we don't need to lock it.
306  */
307 static inline void smc_wr_rx_demultiplex(struct ib_wc *wc)
308 {
309 	struct smc_link *link = (struct smc_link *)wc->qp->qp_context;
310 	struct smc_wr_rx_handler *handler;
311 	struct smc_wr_rx_hdr *wr_rx;
312 	u64 temp_wr_id;
313 	u32 index;
314 
315 	if (wc->byte_len < sizeof(*wr_rx))
316 		return; /* short message */
317 	temp_wr_id = wc->wr_id;
318 	index = do_div(temp_wr_id, link->wr_rx_cnt);
319 	wr_rx = (struct smc_wr_rx_hdr *)&link->wr_rx_bufs[index];
320 	hash_for_each_possible(smc_wr_rx_hash, handler, list, wr_rx->type) {
321 		if (handler->type == wr_rx->type)
322 			handler->handler(wc, wr_rx);
323 	}
324 }
325 
326 static inline void smc_wr_rx_process_cqes(struct ib_wc wc[], int num)
327 {
328 	struct smc_link *link;
329 	int i;
330 
331 	for (i = 0; i < num; i++) {
332 		link = wc[i].qp->qp_context;
333 		if (wc[i].status == IB_WC_SUCCESS) {
334 			smc_wr_rx_demultiplex(&wc[i]);
335 			smc_wr_rx_post(link); /* refill WR RX */
336 		} else {
337 			struct smc_link_group *lgr;
338 
339 			/* handle status errors */
340 			switch (wc[i].status) {
341 			case IB_WC_RETRY_EXC_ERR:
342 			case IB_WC_RNR_RETRY_EXC_ERR:
343 			case IB_WC_WR_FLUSH_ERR:
344 				/* terminate connections of this link group
345 				 * abnormally
346 				 */
347 				lgr = container_of(link, struct smc_link_group,
348 						   lnk[SMC_SINGLE_LINK]);
349 				smc_lgr_terminate(lgr);
350 				break;
351 			default:
352 				smc_wr_rx_post(link); /* refill WR RX */
353 				break;
354 			}
355 		}
356 	}
357 }
358 
359 static void smc_wr_rx_tasklet_fn(unsigned long data)
360 {
361 	struct smc_ib_device *dev = (struct smc_ib_device *)data;
362 	struct ib_wc wc[SMC_WR_MAX_POLL_CQE];
363 	int polled = 0;
364 	int rc;
365 
366 again:
367 	polled++;
368 	do {
369 		memset(&wc, 0, sizeof(wc));
370 		rc = ib_poll_cq(dev->roce_cq_recv, SMC_WR_MAX_POLL_CQE, wc);
371 		if (polled == 1) {
372 			ib_req_notify_cq(dev->roce_cq_recv,
373 					 IB_CQ_SOLICITED_MASK
374 					 | IB_CQ_REPORT_MISSED_EVENTS);
375 		}
376 		if (!rc)
377 			break;
378 		smc_wr_rx_process_cqes(&wc[0], rc);
379 	} while (rc > 0);
380 	if (polled == 1)
381 		goto again;
382 }
383 
384 void smc_wr_rx_cq_handler(struct ib_cq *ib_cq, void *cq_context)
385 {
386 	struct smc_ib_device *dev = (struct smc_ib_device *)cq_context;
387 
388 	tasklet_schedule(&dev->recv_tasklet);
389 }
390 
391 int smc_wr_rx_post_init(struct smc_link *link)
392 {
393 	u32 i;
394 	int rc = 0;
395 
396 	for (i = 0; i < link->wr_rx_cnt; i++)
397 		rc = smc_wr_rx_post(link);
398 	return rc;
399 }
400 
401 /***************************** init, exit, misc ******************************/
402 
403 void smc_wr_remember_qp_attr(struct smc_link *lnk)
404 {
405 	struct ib_qp_attr *attr = &lnk->qp_attr;
406 	struct ib_qp_init_attr init_attr;
407 
408 	memset(attr, 0, sizeof(*attr));
409 	memset(&init_attr, 0, sizeof(init_attr));
410 	ib_query_qp(lnk->roce_qp, attr,
411 		    IB_QP_STATE |
412 		    IB_QP_CUR_STATE |
413 		    IB_QP_PKEY_INDEX |
414 		    IB_QP_PORT |
415 		    IB_QP_QKEY |
416 		    IB_QP_AV |
417 		    IB_QP_PATH_MTU |
418 		    IB_QP_TIMEOUT |
419 		    IB_QP_RETRY_CNT |
420 		    IB_QP_RNR_RETRY |
421 		    IB_QP_RQ_PSN |
422 		    IB_QP_ALT_PATH |
423 		    IB_QP_MIN_RNR_TIMER |
424 		    IB_QP_SQ_PSN |
425 		    IB_QP_PATH_MIG_STATE |
426 		    IB_QP_CAP |
427 		    IB_QP_DEST_QPN,
428 		    &init_attr);
429 
430 	lnk->wr_tx_cnt = min_t(size_t, SMC_WR_BUF_CNT,
431 			       lnk->qp_attr.cap.max_send_wr);
432 	lnk->wr_rx_cnt = min_t(size_t, SMC_WR_BUF_CNT * 3,
433 			       lnk->qp_attr.cap.max_recv_wr);
434 }
435 
436 static void smc_wr_init_sge(struct smc_link *lnk)
437 {
438 	u32 i;
439 
440 	for (i = 0; i < lnk->wr_tx_cnt; i++) {
441 		lnk->wr_tx_sges[i].addr =
442 			lnk->wr_tx_dma_addr + i * SMC_WR_BUF_SIZE;
443 		lnk->wr_tx_sges[i].length = SMC_WR_TX_SIZE;
444 		lnk->wr_tx_sges[i].lkey = lnk->roce_pd->local_dma_lkey;
445 		lnk->wr_tx_ibs[i].next = NULL;
446 		lnk->wr_tx_ibs[i].sg_list = &lnk->wr_tx_sges[i];
447 		lnk->wr_tx_ibs[i].num_sge = 1;
448 		lnk->wr_tx_ibs[i].opcode = IB_WR_SEND;
449 		lnk->wr_tx_ibs[i].send_flags =
450 			IB_SEND_SIGNALED | IB_SEND_SOLICITED;
451 	}
452 	for (i = 0; i < lnk->wr_rx_cnt; i++) {
453 		lnk->wr_rx_sges[i].addr =
454 			lnk->wr_rx_dma_addr + i * SMC_WR_BUF_SIZE;
455 		lnk->wr_rx_sges[i].length = SMC_WR_BUF_SIZE;
456 		lnk->wr_rx_sges[i].lkey = lnk->roce_pd->local_dma_lkey;
457 		lnk->wr_rx_ibs[i].next = NULL;
458 		lnk->wr_rx_ibs[i].sg_list = &lnk->wr_rx_sges[i];
459 		lnk->wr_rx_ibs[i].num_sge = 1;
460 	}
461 }
462 
463 void smc_wr_free_link(struct smc_link *lnk)
464 {
465 	struct ib_device *ibdev;
466 
467 	memset(lnk->wr_tx_mask, 0,
468 	       BITS_TO_LONGS(SMC_WR_BUF_CNT) * sizeof(*lnk->wr_tx_mask));
469 
470 	if (!lnk->smcibdev)
471 		return;
472 	ibdev = lnk->smcibdev->ibdev;
473 
474 	if (lnk->wr_rx_dma_addr) {
475 		ib_dma_unmap_single(ibdev, lnk->wr_rx_dma_addr,
476 				    SMC_WR_BUF_SIZE * lnk->wr_rx_cnt,
477 				    DMA_FROM_DEVICE);
478 		lnk->wr_rx_dma_addr = 0;
479 	}
480 	if (lnk->wr_tx_dma_addr) {
481 		ib_dma_unmap_single(ibdev, lnk->wr_tx_dma_addr,
482 				    SMC_WR_BUF_SIZE * lnk->wr_tx_cnt,
483 				    DMA_TO_DEVICE);
484 		lnk->wr_tx_dma_addr = 0;
485 	}
486 }
487 
488 void smc_wr_free_link_mem(struct smc_link *lnk)
489 {
490 	kfree(lnk->wr_tx_pends);
491 	lnk->wr_tx_pends = NULL;
492 	kfree(lnk->wr_tx_mask);
493 	lnk->wr_tx_mask = NULL;
494 	kfree(lnk->wr_tx_sges);
495 	lnk->wr_tx_sges = NULL;
496 	kfree(lnk->wr_rx_sges);
497 	lnk->wr_rx_sges = NULL;
498 	kfree(lnk->wr_rx_ibs);
499 	lnk->wr_rx_ibs = NULL;
500 	kfree(lnk->wr_tx_ibs);
501 	lnk->wr_tx_ibs = NULL;
502 	kfree(lnk->wr_tx_bufs);
503 	lnk->wr_tx_bufs = NULL;
504 	kfree(lnk->wr_rx_bufs);
505 	lnk->wr_rx_bufs = NULL;
506 }
507 
508 int smc_wr_alloc_link_mem(struct smc_link *link)
509 {
510 	/* allocate link related memory */
511 	link->wr_tx_bufs = kcalloc(SMC_WR_BUF_CNT, SMC_WR_BUF_SIZE, GFP_KERNEL);
512 	if (!link->wr_tx_bufs)
513 		goto no_mem;
514 	link->wr_rx_bufs = kcalloc(SMC_WR_BUF_CNT * 3, SMC_WR_BUF_SIZE,
515 				   GFP_KERNEL);
516 	if (!link->wr_rx_bufs)
517 		goto no_mem_wr_tx_bufs;
518 	link->wr_tx_ibs = kcalloc(SMC_WR_BUF_CNT, sizeof(link->wr_tx_ibs[0]),
519 				  GFP_KERNEL);
520 	if (!link->wr_tx_ibs)
521 		goto no_mem_wr_rx_bufs;
522 	link->wr_rx_ibs = kcalloc(SMC_WR_BUF_CNT * 3,
523 				  sizeof(link->wr_rx_ibs[0]),
524 				  GFP_KERNEL);
525 	if (!link->wr_rx_ibs)
526 		goto no_mem_wr_tx_ibs;
527 	link->wr_tx_sges = kcalloc(SMC_WR_BUF_CNT, sizeof(link->wr_tx_sges[0]),
528 				   GFP_KERNEL);
529 	if (!link->wr_tx_sges)
530 		goto no_mem_wr_rx_ibs;
531 	link->wr_rx_sges = kcalloc(SMC_WR_BUF_CNT * 3,
532 				   sizeof(link->wr_rx_sges[0]),
533 				   GFP_KERNEL);
534 	if (!link->wr_rx_sges)
535 		goto no_mem_wr_tx_sges;
536 	link->wr_tx_mask = kzalloc(
537 		BITS_TO_LONGS(SMC_WR_BUF_CNT) * sizeof(*link->wr_tx_mask),
538 		GFP_KERNEL);
539 	if (!link->wr_tx_mask)
540 		goto no_mem_wr_rx_sges;
541 	link->wr_tx_pends = kcalloc(SMC_WR_BUF_CNT,
542 				    sizeof(link->wr_tx_pends[0]),
543 				    GFP_KERNEL);
544 	if (!link->wr_tx_pends)
545 		goto no_mem_wr_tx_mask;
546 	return 0;
547 
548 no_mem_wr_tx_mask:
549 	kfree(link->wr_tx_mask);
550 no_mem_wr_rx_sges:
551 	kfree(link->wr_rx_sges);
552 no_mem_wr_tx_sges:
553 	kfree(link->wr_tx_sges);
554 no_mem_wr_rx_ibs:
555 	kfree(link->wr_rx_ibs);
556 no_mem_wr_tx_ibs:
557 	kfree(link->wr_tx_ibs);
558 no_mem_wr_rx_bufs:
559 	kfree(link->wr_rx_bufs);
560 no_mem_wr_tx_bufs:
561 	kfree(link->wr_tx_bufs);
562 no_mem:
563 	return -ENOMEM;
564 }
565 
566 void smc_wr_remove_dev(struct smc_ib_device *smcibdev)
567 {
568 	tasklet_kill(&smcibdev->recv_tasklet);
569 	tasklet_kill(&smcibdev->send_tasklet);
570 }
571 
572 void smc_wr_add_dev(struct smc_ib_device *smcibdev)
573 {
574 	tasklet_init(&smcibdev->recv_tasklet, smc_wr_rx_tasklet_fn,
575 		     (unsigned long)smcibdev);
576 	tasklet_init(&smcibdev->send_tasklet, smc_wr_tx_tasklet_fn,
577 		     (unsigned long)smcibdev);
578 }
579 
580 int smc_wr_create_link(struct smc_link *lnk)
581 {
582 	struct ib_device *ibdev = lnk->smcibdev->ibdev;
583 	int rc = 0;
584 
585 	smc_wr_tx_set_wr_id(&lnk->wr_tx_id, 0);
586 	lnk->wr_rx_id = 0;
587 	lnk->wr_rx_dma_addr = ib_dma_map_single(
588 		ibdev, lnk->wr_rx_bufs,	SMC_WR_BUF_SIZE * lnk->wr_rx_cnt,
589 		DMA_FROM_DEVICE);
590 	if (ib_dma_mapping_error(ibdev, lnk->wr_rx_dma_addr)) {
591 		lnk->wr_rx_dma_addr = 0;
592 		rc = -EIO;
593 		goto out;
594 	}
595 	lnk->wr_tx_dma_addr = ib_dma_map_single(
596 		ibdev, lnk->wr_tx_bufs,	SMC_WR_BUF_SIZE * lnk->wr_tx_cnt,
597 		DMA_TO_DEVICE);
598 	if (ib_dma_mapping_error(ibdev, lnk->wr_tx_dma_addr)) {
599 		rc = -EIO;
600 		goto dma_unmap;
601 	}
602 	smc_wr_init_sge(lnk);
603 	memset(lnk->wr_tx_mask, 0,
604 	       BITS_TO_LONGS(SMC_WR_BUF_CNT) * sizeof(*lnk->wr_tx_mask));
605 	return rc;
606 
607 dma_unmap:
608 	ib_dma_unmap_single(ibdev, lnk->wr_rx_dma_addr,
609 			    SMC_WR_BUF_SIZE * lnk->wr_rx_cnt,
610 			    DMA_FROM_DEVICE);
611 	lnk->wr_rx_dma_addr = 0;
612 out:
613 	return rc;
614 }
615