xref: /openbmc/linux/drivers/infiniband/ulp/rtrs/rtrs-clt.c (revision 34d6f206a88c2651d216bd3487ac956a40b2ba8e)
1  // SPDX-License-Identifier: GPL-2.0-or-later
2  /*
3   * RDMA Transport Layer
4   *
5   * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
6   * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
7   * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
8   */
9  
10  #undef pr_fmt
11  #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
12  
13  #include <linux/module.h>
14  #include <linux/rculist.h>
15  #include <linux/random.h>
16  
17  #include "rtrs-clt.h"
18  #include "rtrs-log.h"
19  #include "rtrs-clt-trace.h"
20  
21  #define RTRS_CONNECT_TIMEOUT_MS 30000
22  /*
23   * Wait a bit before trying to reconnect after a failure
24   * in order to give the server time to finish cleaning up,
25   * which otherwise leads to "false positive" failed reconnect attempts
26   */
27  #define RTRS_RECONNECT_BACKOFF 1000
28  /*
29   * Wait for additional random time between 0 and 8 seconds
30   * before starting to reconnect to avoid clients reconnecting
31   * all at once in case of a major network outage
32   */
33  #define RTRS_RECONNECT_SEED 8
34  
35  #define FIRST_CONN 0x01
36  /* limit to 128 * 4k = 512k max IO */
37  #define RTRS_MAX_SEGMENTS          128
38  
39  MODULE_DESCRIPTION("RDMA Transport Client");
40  MODULE_LICENSE("GPL");
41  
42  static const struct rtrs_rdma_dev_pd_ops dev_pd_ops;
43  static struct rtrs_rdma_dev_pd dev_pd = {
44  	.ops = &dev_pd_ops
45  };
46  
47  static struct workqueue_struct *rtrs_wq;
48  static const struct class rtrs_clt_dev_class = {
49  	.name = "rtrs-client",
50  };
51  
52  static inline bool rtrs_clt_is_connected(const struct rtrs_clt_sess *clt)
53  {
54  	struct rtrs_clt_path *clt_path;
55  	bool connected = false;
56  
57  	rcu_read_lock();
58  	list_for_each_entry_rcu(clt_path, &clt->paths_list, s.entry)
59  		if (READ_ONCE(clt_path->state) == RTRS_CLT_CONNECTED) {
60  			connected = true;
61  			break;
62  		}
63  	rcu_read_unlock();
64  
65  	return connected;
66  }
67  
68  static struct rtrs_permit *
69  __rtrs_get_permit(struct rtrs_clt_sess *clt, enum rtrs_clt_con_type con_type)
70  {
71  	size_t max_depth = clt->queue_depth;
72  	struct rtrs_permit *permit;
73  	int bit;
74  
75  	/*
76  	 * Adapted from null_blk get_tag(). Callers from different cpus may
77  	 * grab the same bit, since find_first_zero_bit is not atomic.
78  	 * But then the test_and_set_bit_lock will fail for all the
79  	 * callers but one, so that they will loop again.
80  	 * This way an explicit spinlock is not required.
81  	 */
82  	do {
83  		bit = find_first_zero_bit(clt->permits_map, max_depth);
84  		if (bit >= max_depth)
85  			return NULL;
86  	} while (test_and_set_bit_lock(bit, clt->permits_map));
87  
88  	permit = get_permit(clt, bit);
89  	WARN_ON(permit->mem_id != bit);
90  	permit->cpu_id = raw_smp_processor_id();
91  	permit->con_type = con_type;
92  
93  	return permit;
94  }
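/*
 * Illustrative sketch (not part of the driver): the lockless
 * "find a free bit, then try to lock it" pattern used above, shown in
 * isolation.  The names example_map and EXAMPLE_DEPTH are hypothetical.
 *
 *	do {
 *		bit = find_first_zero_bit(example_map, EXAMPLE_DEPTH);
 *		if (bit >= EXAMPLE_DEPTH)
 *			return -EBUSY;	(no free slot was observed)
 *	} while (test_and_set_bit_lock(bit, example_map));
 *	(several CPUs may pick the same bit, but exactly one wins
 *	 test_and_set_bit_lock(); the losers simply loop again)
 */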
95  
96  static inline void __rtrs_put_permit(struct rtrs_clt_sess *clt,
97  				      struct rtrs_permit *permit)
98  {
99  	clear_bit_unlock(permit->mem_id, clt->permits_map);
100  }
101  
102  /**
103   * rtrs_clt_get_permit() - allocates permit for future RDMA operation
104   * @clt:	Current session
105   * @con_type:	Type of connection to use with the permit
106   * @can_wait:	Wait type
107   *
108   * Description:
109   *    Allocates permit for the following RDMA operation.  Permit is used
110   *    to preallocate all resources and to propagate memory pressure
111   *    up earlier.
112   *
113   * Context:
114   *    Can sleep if @wait == RTRS_PERMIT_WAIT
115   */
116  struct rtrs_permit *rtrs_clt_get_permit(struct rtrs_clt_sess *clt,
117  					  enum rtrs_clt_con_type con_type,
118  					  enum wait_type can_wait)
119  {
120  	struct rtrs_permit *permit;
121  	DEFINE_WAIT(wait);
122  
123  	permit = __rtrs_get_permit(clt, con_type);
124  	if (permit || !can_wait)
125  		return permit;
126  
127  	do {
128  		prepare_to_wait(&clt->permits_wait, &wait,
129  				TASK_UNINTERRUPTIBLE);
130  		permit = __rtrs_get_permit(clt, con_type);
131  		if (permit)
132  			break;
133  
134  		io_schedule();
135  	} while (1);
136  
137  	finish_wait(&clt->permits_wait, &wait);
138  
139  	return permit;
140  }
141  EXPORT_SYMBOL(rtrs_clt_get_permit);
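/*
 * Usage sketch (an assumption about the caller, not taken from this
 * file): a ULP pairs the two exported calls roughly like this.  The
 * helper example_issue_io() and the non-blocking wait type name
 * RTRS_PERMIT_NOWAIT are assumptions.
 *
 *	permit = rtrs_clt_get_permit(clt, RTRS_IO_CON, RTRS_PERMIT_NOWAIT);
 *	if (!permit)
 *		return -EBUSY;	(NULL is only possible when not waiting)
 *	err = example_issue_io(clt, permit);
 *	if (err)
 *		rtrs_clt_put_permit(clt, permit);
 *	(on success the permit is put when the IO completes)
 */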
142  
143  /**
144   * rtrs_clt_put_permit() - puts allocated permit
145   * @clt:	Current session
146   * @permit:	Permit to be freed
147   *
148   * Context:
149   *    Does not matter
150   */
151  void rtrs_clt_put_permit(struct rtrs_clt_sess *clt,
152  			 struct rtrs_permit *permit)
153  {
154  	if (WARN_ON(!test_bit(permit->mem_id, clt->permits_map)))
155  		return;
156  
157  	__rtrs_put_permit(clt, permit);
158  
159  	/*
160  	 * rtrs_clt_get_permit() adds itself to the &clt->permits_wait list
161  	 * before calling schedule(). So if rtrs_clt_get_permit() is sleeping
162  	 * it must have added itself to &clt->permits_wait before
163  	 * __rtrs_put_permit() finished.
164  	 * Hence it is safe to guard wake_up() with a waitqueue_active() test.
165  	 */
166  	if (waitqueue_active(&clt->permits_wait))
167  		wake_up(&clt->permits_wait);
168  }
169  EXPORT_SYMBOL(rtrs_clt_put_permit);
170  
171  /**
172   * rtrs_permit_to_clt_con() - returns RDMA connection pointer by the permit
173   * @clt_path: client path pointer
174   * @permit: permit for the allocation of the RDMA buffer
175   * Note:
176   *     IO connections start from 1.
177   *     Connection 0 is for user messages.
178   */
179  static
180  struct rtrs_clt_con *rtrs_permit_to_clt_con(struct rtrs_clt_path *clt_path,
181  					    struct rtrs_permit *permit)
182  {
183  	int id = 0;
184  
185  	if (permit->con_type == RTRS_IO_CON)
186  		id = (permit->cpu_id % (clt_path->s.irq_con_num - 1)) + 1;
187  
188  	return to_clt_con(clt_path->s.con[id]);
189  }
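/*
 * Worked example (illustrative): with s.irq_con_num == 5 there is one
 * user connection (id 0) plus four IO connections (ids 1..4).  A permit
 * taken on cpu_id 6 maps to id (6 % (5 - 1)) + 1 == 3, so IO traffic is
 * spread over connections 1..4 while connection 0 stays reserved for
 * user messages.
 */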
190  
191  /**
192   * rtrs_clt_change_state() - change the session state through session state
193   * machine.
194   *
195   * @clt_path: client path to change the state of.
196   * @new_state: state to change to.
197   *
198   * Returns true if the session's state is changed to the new state, otherwise returns false.
199   *
200   * Locks:
201   * state_wq lock must be held.
202   */
203  static bool rtrs_clt_change_state(struct rtrs_clt_path *clt_path,
204  				     enum rtrs_clt_state new_state)
205  {
206  	enum rtrs_clt_state old_state;
207  	bool changed = false;
208  
209  	lockdep_assert_held(&clt_path->state_wq.lock);
210  
211  	old_state = clt_path->state;
212  	switch (new_state) {
213  	case RTRS_CLT_CONNECTING:
214  		switch (old_state) {
215  		case RTRS_CLT_RECONNECTING:
216  			changed = true;
217  			fallthrough;
218  		default:
219  			break;
220  		}
221  		break;
222  	case RTRS_CLT_RECONNECTING:
223  		switch (old_state) {
224  		case RTRS_CLT_CONNECTED:
225  		case RTRS_CLT_CONNECTING_ERR:
226  		case RTRS_CLT_CLOSED:
227  			changed = true;
228  			fallthrough;
229  		default:
230  			break;
231  		}
232  		break;
233  	case RTRS_CLT_CONNECTED:
234  		switch (old_state) {
235  		case RTRS_CLT_CONNECTING:
236  			changed = true;
237  			fallthrough;
238  		default:
239  			break;
240  		}
241  		break;
242  	case RTRS_CLT_CONNECTING_ERR:
243  		switch (old_state) {
244  		case RTRS_CLT_CONNECTING:
245  			changed = true;
246  			fallthrough;
247  		default:
248  			break;
249  		}
250  		break;
251  	case RTRS_CLT_CLOSING:
252  		switch (old_state) {
253  		case RTRS_CLT_CONNECTING:
254  		case RTRS_CLT_CONNECTING_ERR:
255  		case RTRS_CLT_RECONNECTING:
256  		case RTRS_CLT_CONNECTED:
257  			changed = true;
258  			fallthrough;
259  		default:
260  			break;
261  		}
262  		break;
263  	case RTRS_CLT_CLOSED:
264  		switch (old_state) {
265  		case RTRS_CLT_CLOSING:
266  			changed = true;
267  			fallthrough;
268  		default:
269  			break;
270  		}
271  		break;
272  	case RTRS_CLT_DEAD:
273  		switch (old_state) {
274  		case RTRS_CLT_CLOSED:
275  			changed = true;
276  			fallthrough;
277  		default:
278  			break;
279  		}
280  		break;
281  	default:
282  		break;
283  	}
284  	if (changed) {
285  		clt_path->state = new_state;
286  		wake_up_locked(&clt_path->state_wq);
287  	}
288  
289  	return changed;
290  }
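/*
 * Summary of the transitions accepted above (RTRS_CLT_ prefixes dropped
 * for brevity; every other combination is rejected and the state is left
 * untouched):
 *
 *	RECONNECTING                            -> CONNECTING
 *	CONNECTED, CONNECTING_ERR, CLOSED       -> RECONNECTING
 *	CONNECTING                              -> CONNECTED
 *	CONNECTING                              -> CONNECTING_ERR
 *	CONNECTING, CONNECTING_ERR,
 *	RECONNECTING, CONNECTED                 -> CLOSING
 *	CLOSING                                 -> CLOSED
 *	CLOSED                                  -> DEAD
 */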
291  
292  static bool rtrs_clt_change_state_from_to(struct rtrs_clt_path *clt_path,
293  					   enum rtrs_clt_state old_state,
294  					   enum rtrs_clt_state new_state)
295  {
296  	bool changed = false;
297  
298  	spin_lock_irq(&clt_path->state_wq.lock);
299  	if (clt_path->state == old_state)
300  		changed = rtrs_clt_change_state(clt_path, new_state);
301  	spin_unlock_irq(&clt_path->state_wq.lock);
302  
303  	return changed;
304  }
305  
306  static void rtrs_clt_stop_and_destroy_conns(struct rtrs_clt_path *clt_path);
307  static void rtrs_rdma_error_recovery(struct rtrs_clt_con *con)
308  {
309  	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);
310  
311  	trace_rtrs_rdma_error_recovery(clt_path);
312  
313  	if (rtrs_clt_change_state_from_to(clt_path,
314  					   RTRS_CLT_CONNECTED,
315  					   RTRS_CLT_RECONNECTING)) {
316  		queue_work(rtrs_wq, &clt_path->err_recovery_work);
317  	} else {
318  		/*
319  		 * Error can happen just on establishing new connection,
320  		 * so notify waiter with error state, waiter is responsible
321  		 * for cleaning the rest and reconnect if needed.
322  		 */
323  		rtrs_clt_change_state_from_to(clt_path,
324  					       RTRS_CLT_CONNECTING,
325  					       RTRS_CLT_CONNECTING_ERR);
326  	}
327  }
328  
329  static void rtrs_clt_fast_reg_done(struct ib_cq *cq, struct ib_wc *wc)
330  {
331  	struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
332  
333  	if (wc->status != IB_WC_SUCCESS) {
334  		rtrs_err(con->c.path, "Failed IB_WR_REG_MR: %s\n",
335  			  ib_wc_status_msg(wc->status));
336  		rtrs_rdma_error_recovery(con);
337  	}
338  }
339  
340  static struct ib_cqe fast_reg_cqe = {
341  	.done = rtrs_clt_fast_reg_done
342  };
343  
344  static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
345  			      bool notify, bool can_wait);
346  
347  static void rtrs_clt_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
348  {
349  	struct rtrs_clt_io_req *req =
350  		container_of(wc->wr_cqe, typeof(*req), inv_cqe);
351  	struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
352  
353  	if (wc->status != IB_WC_SUCCESS) {
354  		rtrs_err(con->c.path, "Failed IB_WR_LOCAL_INV: %s\n",
355  			  ib_wc_status_msg(wc->status));
356  		rtrs_rdma_error_recovery(con);
357  	}
358  	req->need_inv = false;
359  	if (req->need_inv_comp)
360  		complete(&req->inv_comp);
361  	else
362  		/* Complete request from INV callback */
363  		complete_rdma_req(req, req->inv_errno, true, false);
364  }
365  
366  static int rtrs_inv_rkey(struct rtrs_clt_io_req *req)
367  {
368  	struct rtrs_clt_con *con = req->con;
369  	struct ib_send_wr wr = {
370  		.opcode		    = IB_WR_LOCAL_INV,
371  		.wr_cqe		    = &req->inv_cqe,
372  		.send_flags	    = IB_SEND_SIGNALED,
373  		.ex.invalidate_rkey = req->mr->rkey,
374  	};
375  	req->inv_cqe.done = rtrs_clt_inv_rkey_done;
376  
377  	return ib_post_send(con->c.qp, &wr, NULL);
378  }
379  
380  static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
381  			      bool notify, bool can_wait)
382  {
383  	struct rtrs_clt_con *con = req->con;
384  	struct rtrs_clt_path *clt_path;
385  	int err;
386  
387  	if (!req->in_use)
388  		return;
389  	if (WARN_ON(!req->con))
390  		return;
391  	clt_path = to_clt_path(con->c.path);
392  
393  	if (req->sg_cnt) {
394  		if (req->dir == DMA_FROM_DEVICE && req->need_inv) {
395  			/*
396  			 * We are here to invalidate read requests
397  			 * ourselves.  In the normal scenario the server should
398  			 * send INV for all read requests, but since
399  			 * we are here, one of two things has happened:
400  			 *
401  			 *    1.  this is failover, when errno != 0
402  			 *        and can_wait == 1,
403  			 *
404  			 *    2.  something totally bad happened and
405  			 *        server forgot to send INV, so we
406  			 *        should do that ourselves.
407  			 */
408  
409  			if (can_wait) {
410  				req->need_inv_comp = true;
411  			} else {
412  				/* This should be IO path, so always notify */
413  				WARN_ON(!notify);
414  				/* Save errno for INV callback */
415  				req->inv_errno = errno;
416  			}
417  
418  			refcount_inc(&req->ref);
419  			err = rtrs_inv_rkey(req);
420  			if (err) {
421  				rtrs_err(con->c.path, "Send INV WR key=%#x: %d\n",
422  					  req->mr->rkey, err);
423  			} else if (can_wait) {
424  				wait_for_completion(&req->inv_comp);
425  			} else {
426  				/*
427  				 * Something went wrong, so request will be
428  				 * completed from INV callback.
429  				 */
430  				WARN_ON_ONCE(1);
431  
432  				return;
433  			}
434  			if (!refcount_dec_and_test(&req->ref))
435  				return;
436  		}
437  		ib_dma_unmap_sg(clt_path->s.dev->ib_dev, req->sglist,
438  				req->sg_cnt, req->dir);
439  	}
440  	if (!refcount_dec_and_test(&req->ref))
441  		return;
442  	if (req->mp_policy == MP_POLICY_MIN_INFLIGHT)
443  		atomic_dec(&clt_path->stats->inflight);
444  
445  	req->in_use = false;
446  	req->con = NULL;
447  
448  	if (errno) {
449  		rtrs_err_rl(con->c.path, "IO request failed: error=%d path=%s [%s:%u] notify=%d\n",
450  			    errno, kobject_name(&clt_path->kobj), clt_path->hca_name,
451  			    clt_path->hca_port, notify);
452  	}
453  
454  	if (notify)
455  		req->conf(req->priv, errno);
456  }
457  
458  static int rtrs_post_send_rdma(struct rtrs_clt_con *con,
459  				struct rtrs_clt_io_req *req,
460  				struct rtrs_rbuf *rbuf, u32 off,
461  				u32 imm, struct ib_send_wr *wr)
462  {
463  	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);
464  	enum ib_send_flags flags;
465  	struct ib_sge sge;
466  
467  	if (!req->sg_size) {
468  		rtrs_wrn(con->c.path,
469  			 "Doing RDMA Write failed, no data supplied\n");
470  		return -EINVAL;
471  	}
472  
473  	/* user data and user message in the first list element */
474  	sge.addr   = req->iu->dma_addr;
475  	sge.length = req->sg_size;
476  	sge.lkey   = clt_path->s.dev->ib_pd->local_dma_lkey;
477  
478  	/*
479  	 * From time to time we have to post signalled sends,
480  	 * or send queue will fill up and only QP reset can help.
481  	 */
482  	flags = atomic_inc_return(&con->c.wr_cnt) % clt_path->s.signal_interval ?
483  			0 : IB_SEND_SIGNALED;
484  
485  	ib_dma_sync_single_for_device(clt_path->s.dev->ib_dev,
486  				      req->iu->dma_addr,
487  				      req->sg_size, DMA_TO_DEVICE);
488  
489  	return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, &sge, 1,
490  					    rbuf->rkey, rbuf->addr + off,
491  					    imm, flags, wr, NULL);
492  }
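/*
 * Illustrative sketch: with s.signal_interval == 32 (hypothetical value)
 * the flags expression above yields IB_SEND_SIGNALED for every 32nd work
 * request (wr_cnt 32, 64, 96, ...) and 0 otherwise, so completions are
 * generated often enough for the provider to reclaim send-queue slots.
 */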
493  
494  static void process_io_rsp(struct rtrs_clt_path *clt_path, u32 msg_id,
495  			   s16 errno, bool w_inval)
496  {
497  	struct rtrs_clt_io_req *req;
498  
499  	if (WARN_ON(msg_id >= clt_path->queue_depth))
500  		return;
501  
502  	req = &clt_path->reqs[msg_id];
503  	/* Drop need_inv if server responded with send with invalidation */
504  	req->need_inv &= !w_inval;
505  	complete_rdma_req(req, errno, true, false);
506  }
507  
508  static void rtrs_clt_recv_done(struct rtrs_clt_con *con, struct ib_wc *wc)
509  {
510  	struct rtrs_iu *iu;
511  	int err;
512  	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);
513  
514  	WARN_ON((clt_path->flags & RTRS_MSG_NEW_RKEY_F) == 0);
515  	iu = container_of(wc->wr_cqe, struct rtrs_iu,
516  			  cqe);
517  	err = rtrs_iu_post_recv(&con->c, iu);
518  	if (err) {
519  		rtrs_err(con->c.path, "post iu failed %d\n", err);
520  		rtrs_rdma_error_recovery(con);
521  	}
522  }
523  
524  static void rtrs_clt_rkey_rsp_done(struct rtrs_clt_con *con, struct ib_wc *wc)
525  {
526  	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);
527  	struct rtrs_msg_rkey_rsp *msg;
528  	u32 imm_type, imm_payload;
529  	bool w_inval = false;
530  	struct rtrs_iu *iu;
531  	u32 buf_id;
532  	int err;
533  
534  	WARN_ON((clt_path->flags & RTRS_MSG_NEW_RKEY_F) == 0);
535  
536  	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
537  
538  	if (wc->byte_len < sizeof(*msg)) {
539  		rtrs_err(con->c.path, "rkey response is malformed: size %d\n",
540  			  wc->byte_len);
541  		goto out;
542  	}
543  	ib_dma_sync_single_for_cpu(clt_path->s.dev->ib_dev, iu->dma_addr,
544  				   iu->size, DMA_FROM_DEVICE);
545  	msg = iu->buf;
546  	if (le16_to_cpu(msg->type) != RTRS_MSG_RKEY_RSP) {
547  		rtrs_err(clt_path->clt,
548  			  "rkey response is malformed: type %d\n",
549  			  le16_to_cpu(msg->type));
550  		goto out;
551  	}
552  	buf_id = le16_to_cpu(msg->buf_id);
553  	if (WARN_ON(buf_id >= clt_path->queue_depth))
554  		goto out;
555  
556  	rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), &imm_type, &imm_payload);
557  	if (imm_type == RTRS_IO_RSP_IMM ||
558  	    imm_type == RTRS_IO_RSP_W_INV_IMM) {
559  		u32 msg_id;
560  
561  		w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM);
562  		rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err);
563  
564  		if (WARN_ON(buf_id != msg_id))
565  			goto out;
566  		clt_path->rbufs[buf_id].rkey = le32_to_cpu(msg->rkey);
567  		process_io_rsp(clt_path, msg_id, err, w_inval);
568  	}
569  	ib_dma_sync_single_for_device(clt_path->s.dev->ib_dev, iu->dma_addr,
570  				      iu->size, DMA_FROM_DEVICE);
571  	return rtrs_clt_recv_done(con, wc);
572  out:
573  	rtrs_rdma_error_recovery(con);
574  }
575  
576  static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc);
577  
578  static struct ib_cqe io_comp_cqe = {
579  	.done = rtrs_clt_rdma_done
580  };
581  
582  /*
583   * Post x2 empty WRs: first is for this RDMA with IMM,
584   * second is for RECV with INV, which happened earlier.
585   */
586  static int rtrs_post_recv_empty_x2(struct rtrs_con *con, struct ib_cqe *cqe)
587  {
588  	struct ib_recv_wr wr_arr[2], *wr;
589  	int i;
590  
591  	memset(wr_arr, 0, sizeof(wr_arr));
592  	for (i = 0; i < ARRAY_SIZE(wr_arr); i++) {
593  		wr = &wr_arr[i];
594  		wr->wr_cqe  = cqe;
595  		if (i)
596  			/* Chain backwards */
597  			wr->next = &wr_arr[i - 1];
598  	}
599  
600  	return ib_post_recv(con->qp, wr, NULL);
601  }
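/*
 * Note on the chaining in rtrs_post_recv_empty_x2() above: after the
 * loop, wr points at wr_arr[1], whose ->next is wr_arr[0], so the single
 * ib_post_recv() call posts both empty WRs at once.
 */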
602  
603  static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
604  {
605  	struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
606  	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);
607  	u32 imm_type, imm_payload;
608  	bool w_inval = false;
609  	int err;
610  
611  	if (wc->status != IB_WC_SUCCESS) {
612  		if (wc->status != IB_WC_WR_FLUSH_ERR) {
613  			rtrs_err(clt_path->clt, "RDMA failed: %s\n",
614  				  ib_wc_status_msg(wc->status));
615  			rtrs_rdma_error_recovery(con);
616  		}
617  		return;
618  	}
619  	rtrs_clt_update_wc_stats(con);
620  
621  	switch (wc->opcode) {
622  	case IB_WC_RECV_RDMA_WITH_IMM:
623  		/*
624  		 * post_recv() RDMA write completions of IO reqs (read/write)
625  		 * and hb
626  		 */
627  		if (WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done))
628  			return;
629  		clt_path->s.hb_missed_cnt = 0;
630  		rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
631  			       &imm_type, &imm_payload);
632  		if (imm_type == RTRS_IO_RSP_IMM ||
633  		    imm_type == RTRS_IO_RSP_W_INV_IMM) {
634  			u32 msg_id;
635  
636  			w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM);
637  			rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err);
638  
639  			process_io_rsp(clt_path, msg_id, err, w_inval);
640  		} else if (imm_type == RTRS_HB_MSG_IMM) {
641  			WARN_ON(con->c.cid);
642  			rtrs_send_hb_ack(&clt_path->s);
643  			if (clt_path->flags & RTRS_MSG_NEW_RKEY_F)
644  				return  rtrs_clt_recv_done(con, wc);
645  		} else if (imm_type == RTRS_HB_ACK_IMM) {
646  			WARN_ON(con->c.cid);
647  			clt_path->s.hb_cur_latency =
648  				ktime_sub(ktime_get(), clt_path->s.hb_last_sent);
649  			if (clt_path->flags & RTRS_MSG_NEW_RKEY_F)
650  				return  rtrs_clt_recv_done(con, wc);
651  		} else {
652  			rtrs_wrn(con->c.path, "Unknown IMM type %u\n",
653  				  imm_type);
654  		}
655  		if (w_inval)
656  			/*
657  			 * Post x2 empty WRs: first is for this RDMA with IMM,
658  			 * second is for RECV with INV, which happened earlier.
659  			 */
660  			err = rtrs_post_recv_empty_x2(&con->c, &io_comp_cqe);
661  		else
662  			err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
663  		if (err) {
664  			rtrs_err(con->c.path, "rtrs_post_recv_empty(): %d\n",
665  				  err);
666  			rtrs_rdma_error_recovery(con);
667  		}
668  		break;
669  	case IB_WC_RECV:
670  		/*
671  		 * Key invalidations from server side
672  		 */
673  		clt_path->s.hb_missed_cnt = 0;
674  		WARN_ON(!(wc->wc_flags & IB_WC_WITH_INVALIDATE ||
675  			  wc->wc_flags & IB_WC_WITH_IMM));
676  		WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done);
677  		if (clt_path->flags & RTRS_MSG_NEW_RKEY_F) {
678  			if (wc->wc_flags & IB_WC_WITH_INVALIDATE)
679  				return  rtrs_clt_recv_done(con, wc);
680  
681  			return  rtrs_clt_rkey_rsp_done(con, wc);
682  		}
683  		break;
684  	case IB_WC_RDMA_WRITE:
685  		/*
686  		 * post_send() RDMA write completions of IO reqs (read/write)
687  		 * and hb.
688  		 */
689  		break;
690  
691  	default:
692  		rtrs_wrn(clt_path->clt, "Unexpected WC type: %d\n", wc->opcode);
693  		return;
694  	}
695  }
696  
697  static int post_recv_io(struct rtrs_clt_con *con, size_t q_size)
698  {
699  	int err, i;
700  	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);
701  
702  	for (i = 0; i < q_size; i++) {
703  		if (clt_path->flags & RTRS_MSG_NEW_RKEY_F) {
704  			struct rtrs_iu *iu = &con->rsp_ius[i];
705  
706  			err = rtrs_iu_post_recv(&con->c, iu);
707  		} else {
708  			err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
709  		}
710  		if (err)
711  			return err;
712  	}
713  
714  	return 0;
715  }
716  
717  static int post_recv_path(struct rtrs_clt_path *clt_path)
718  {
719  	size_t q_size = 0;
720  	int err, cid;
721  
722  	for (cid = 0; cid < clt_path->s.con_num; cid++) {
723  		if (cid == 0)
724  			q_size = SERVICE_CON_QUEUE_DEPTH;
725  		else
726  			q_size = clt_path->queue_depth;
727  
728  		/*
729  		 * x2 for RDMA read responses + FR key invalidations,
730  		 * RDMA writes do not require any FR registrations.
731  		 */
732  		q_size *= 2;
733  
734  		err = post_recv_io(to_clt_con(clt_path->s.con[cid]), q_size);
735  		if (err) {
736  			rtrs_err(clt_path->clt, "post_recv_io(), err: %d\n",
737  				 err);
738  			return err;
739  		}
740  	}
741  
742  	return 0;
743  }
744  
745  struct path_it {
746  	int i;
747  	struct list_head skip_list;
748  	struct rtrs_clt_sess *clt;
749  	struct rtrs_clt_path *(*next_path)(struct path_it *it);
750  };
751  
752  /*
753   * rtrs_clt_get_next_path_or_null - get clt path from the list or return NULL
754   * @head:	the head for the list.
755   * @clt_path:	The element to take the next clt_path from.
756   *
757   * The next clt path is returned in round-robin fashion, i.e. the head will be skipped,
758   * but if the list is observed as empty, NULL will be returned.
759   *
760   * This function may safely run concurrently with the _rcu list-mutation
761   * primitives such as list_add_rcu() as long as it's guarded by rcu_read_lock().
762   */
763  static inline struct rtrs_clt_path *
764  rtrs_clt_get_next_path_or_null(struct list_head *head, struct rtrs_clt_path *clt_path)
765  {
766  	return list_next_or_null_rcu(head, &clt_path->s.entry, typeof(*clt_path), s.entry) ?:
767  				     list_next_or_null_rcu(head,
768  							   READ_ONCE((&clt_path->s.entry)->next),
769  							   typeof(*clt_path), s.entry);
770  }
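/*
 * Illustrative example: for a paths_list A -> B -> C with clt_path == C,
 * the first list_next_or_null_rcu() yields NULL (C is the last entry),
 * so the ?: fallback re-reads C's ->next (now the list head) and
 * continues from there, returning A; the iteration wraps around while
 * still skipping the list head itself.
 */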
771  
772  /**
773   * get_next_path_rr() - Returns path in round-robin fashion.
774   * @it:	the path pointer
775   *
776   * Related to @MP_POLICY_RR
777   *
778   * Locks:
779   *    rcu_read_lock() must be held.
780   */
781  static struct rtrs_clt_path *get_next_path_rr(struct path_it *it)
782  {
783  	struct rtrs_clt_path __rcu **ppcpu_path;
784  	struct rtrs_clt_path *path;
785  	struct rtrs_clt_sess *clt;
786  
787  	clt = it->clt;
788  
789  	/*
790  	 * Here we use two RCU objects: @paths_list and @pcpu_path
791  	 * pointer.  See rtrs_clt_remove_path_from_arr() for details
792  	 * how that is handled.
793  	 */
794  
795  	ppcpu_path = this_cpu_ptr(clt->pcpu_path);
796  	path = rcu_dereference(*ppcpu_path);
797  	if (!path)
798  		path = list_first_or_null_rcu(&clt->paths_list,
799  					      typeof(*path), s.entry);
800  	else
801  		path = rtrs_clt_get_next_path_or_null(&clt->paths_list, path);
802  
803  	rcu_assign_pointer(*ppcpu_path, path);
804  
805  	return path;
806  }
807  
808  /**
809   * get_next_path_min_inflight() - Returns path with minimal inflight count.
810   * @it:	the path pointer
811   *
812   * Related to @MP_POLICY_MIN_INFLIGHT
813   *
814   * Locks:
815   *    rcu_read_lock() must be held.
816   */
817  static struct rtrs_clt_path *get_next_path_min_inflight(struct path_it *it)
818  {
819  	struct rtrs_clt_path *min_path = NULL;
820  	struct rtrs_clt_sess *clt = it->clt;
821  	struct rtrs_clt_path *clt_path;
822  	int min_inflight = INT_MAX;
823  	int inflight;
824  
825  	list_for_each_entry_rcu(clt_path, &clt->paths_list, s.entry) {
826  		if (READ_ONCE(clt_path->state) != RTRS_CLT_CONNECTED)
827  			continue;
828  
829  		if (!list_empty(raw_cpu_ptr(clt_path->mp_skip_entry)))
830  			continue;
831  
832  		inflight = atomic_read(&clt_path->stats->inflight);
833  
834  		if (inflight < min_inflight) {
835  			min_inflight = inflight;
836  			min_path = clt_path;
837  		}
838  	}
839  
840  	/*
841  	 * add the path to the skip list, so that next time we can get
842  	 * a different one
843  	 */
844  	if (min_path)
845  		list_add(raw_cpu_ptr(min_path->mp_skip_entry), &it->skip_list);
846  
847  	return min_path;
848  }
849  
850  /**
851   * get_next_path_min_latency() - Returns path with minimal latency.
852   * @it:	the path pointer
853   *
854   * Return: a path with the lowest latency or NULL if all paths are tried
855   *
856   * Locks:
857   *    rcu_read_lock() must be held.
858   *
859   * Related to @MP_POLICY_MIN_LATENCY
860   *
861   * This DOES skip an already-tried path.
862   * There is a skip-list to skip a path if the path has been tried but failed.
863   * It will try the minimum latency path and then the second minimum latency
864   * path and so on. Finally it will return NULL if all paths have been tried.
865   * Therefore the caller MUST check whether the returned
866   * path is NULL and trigger the IO error.
867   */
868  static struct rtrs_clt_path *get_next_path_min_latency(struct path_it *it)
869  {
870  	struct rtrs_clt_path *min_path = NULL;
871  	struct rtrs_clt_sess *clt = it->clt;
872  	struct rtrs_clt_path *clt_path;
873  	ktime_t min_latency = KTIME_MAX;
874  	ktime_t latency;
875  
876  	list_for_each_entry_rcu(clt_path, &clt->paths_list, s.entry) {
877  		if (READ_ONCE(clt_path->state) != RTRS_CLT_CONNECTED)
878  			continue;
879  
880  		if (!list_empty(raw_cpu_ptr(clt_path->mp_skip_entry)))
881  			continue;
882  
883  		latency = clt_path->s.hb_cur_latency;
884  
885  		if (latency < min_latency) {
886  			min_latency = latency;
887  			min_path = clt_path;
888  		}
889  	}
890  
891  	/*
892  	 * add the path to the skip list, so that next time we can get
893  	 * a different one
894  	 */
895  	if (min_path)
896  		list_add(raw_cpu_ptr(min_path->mp_skip_entry), &it->skip_list);
897  
898  	return min_path;
899  }
900  
901  static inline void path_it_init(struct path_it *it, struct rtrs_clt_sess *clt)
902  {
903  	INIT_LIST_HEAD(&it->skip_list);
904  	it->clt = clt;
905  	it->i = 0;
906  
907  	if (clt->mp_policy == MP_POLICY_RR)
908  		it->next_path = get_next_path_rr;
909  	else if (clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
910  		it->next_path = get_next_path_min_inflight;
911  	else
912  		it->next_path = get_next_path_min_latency;
913  }
914  
915  static inline void path_it_deinit(struct path_it *it)
916  {
917  	struct list_head *skip, *tmp;
918  	/*
919  	 * The skip_list is used only for the MIN_INFLIGHT and MIN_LATENCY policies.
920  	 * We need to remove paths from it, so that next IO can insert
921  	 * paths (->mp_skip_entry) into a skip_list again.
922  	 */
923  	list_for_each_safe(skip, tmp, &it->skip_list)
924  		list_del_init(skip);
925  }
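/*
 * Usage sketch (mirrors rtrs_clt_failover_req() below): the iterator is
 * always driven under rcu_read_lock() and bounded by clt->paths_num so
 * that a full pass tries each path at most once.
 *
 *	rcu_read_lock();
 *	for (path_it_init(&it, clt);
 *	     (path = it.next_path(&it)) && it.i < it.clt->paths_num;
 *	     it.i++) {
 *		(try the IO on path, break on success)
 *	}
 *	path_it_deinit(&it);
 *	rcu_read_unlock();
 */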
926  
927  /**
928   * rtrs_clt_init_req() - Initialize an rtrs_clt_io_req holding information
929   * about an inflight IO.
930   * The user buffer holding the user control message (not data) is copied into
931   * the corresponding buffer of rtrs_iu (req->iu->buf), which later on will
932   * also hold the control message of rtrs.
933   * @req: an io request holding information about IO.
934   * @clt_path: client path
935   * @conf: confirmation callback function to notify the upper layer.
936   * @permit: permit for allocation of the RDMA remote buffer
937   * @priv: private pointer
938   * @vec: kernel vector containing the control message
939   * @usr_len: length of the user message
940   * @sg: scatter list for IO data
941   * @sg_cnt: number of scatter list entries
942   * @data_len: length of the IO data
943   * @dir: direction of the IO.
944   */
945  static void rtrs_clt_init_req(struct rtrs_clt_io_req *req,
946  			      struct rtrs_clt_path *clt_path,
947  			      void (*conf)(void *priv, int errno),
948  			      struct rtrs_permit *permit, void *priv,
949  			      const struct kvec *vec, size_t usr_len,
950  			      struct scatterlist *sg, size_t sg_cnt,
951  			      size_t data_len, int dir)
952  {
953  	struct iov_iter iter;
954  	size_t len;
955  
956  	req->permit = permit;
957  	req->in_use = true;
958  	req->usr_len = usr_len;
959  	req->data_len = data_len;
960  	req->sglist = sg;
961  	req->sg_cnt = sg_cnt;
962  	req->priv = priv;
963  	req->dir = dir;
964  	req->con = rtrs_permit_to_clt_con(clt_path, permit);
965  	req->conf = conf;
966  	req->need_inv = false;
967  	req->need_inv_comp = false;
968  	req->inv_errno = 0;
969  	refcount_set(&req->ref, 1);
970  	req->mp_policy = clt_path->clt->mp_policy;
971  
972  	iov_iter_kvec(&iter, ITER_SOURCE, vec, 1, usr_len);
973  	len = _copy_from_iter(req->iu->buf, usr_len, &iter);
974  	WARN_ON(len != usr_len);
975  
976  	reinit_completion(&req->inv_comp);
977  }
978  
979  static struct rtrs_clt_io_req *
980  rtrs_clt_get_req(struct rtrs_clt_path *clt_path,
981  		 void (*conf)(void *priv, int errno),
982  		 struct rtrs_permit *permit, void *priv,
983  		 const struct kvec *vec, size_t usr_len,
984  		 struct scatterlist *sg, size_t sg_cnt,
985  		 size_t data_len, int dir)
986  {
987  	struct rtrs_clt_io_req *req;
988  
989  	req = &clt_path->reqs[permit->mem_id];
990  	rtrs_clt_init_req(req, clt_path, conf, permit, priv, vec, usr_len,
991  			   sg, sg_cnt, data_len, dir);
992  	return req;
993  }
994  
995  static struct rtrs_clt_io_req *
996  rtrs_clt_get_copy_req(struct rtrs_clt_path *alive_path,
997  		       struct rtrs_clt_io_req *fail_req)
998  {
999  	struct rtrs_clt_io_req *req;
1000  	struct kvec vec = {
1001  		.iov_base = fail_req->iu->buf,
1002  		.iov_len  = fail_req->usr_len
1003  	};
1004  
1005  	req = &alive_path->reqs[fail_req->permit->mem_id];
1006  	rtrs_clt_init_req(req, alive_path, fail_req->conf, fail_req->permit,
1007  			   fail_req->priv, &vec, fail_req->usr_len,
1008  			   fail_req->sglist, fail_req->sg_cnt,
1009  			   fail_req->data_len, fail_req->dir);
1010  	return req;
1011  }
1012  
1013  static int rtrs_post_rdma_write_sg(struct rtrs_clt_con *con,
1014  				   struct rtrs_clt_io_req *req,
1015  				   struct rtrs_rbuf *rbuf, bool fr_en,
1016  				   u32 count, u32 size, u32 imm,
1017  				   struct ib_send_wr *wr,
1018  				   struct ib_send_wr *tail)
1019  {
1020  	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);
1021  	struct ib_sge *sge = req->sge;
1022  	enum ib_send_flags flags;
1023  	struct scatterlist *sg;
1024  	size_t num_sge;
1025  	int i;
1026  	struct ib_send_wr *ptail = NULL;
1027  
1028  	if (fr_en) {
1029  		i = 0;
1030  		sge[i].addr   = req->mr->iova;
1031  		sge[i].length = req->mr->length;
1032  		sge[i].lkey   = req->mr->lkey;
1033  		i++;
1034  		num_sge = 2;
1035  		ptail = tail;
1036  	} else {
1037  		for_each_sg(req->sglist, sg, count, i) {
1038  			sge[i].addr   = sg_dma_address(sg);
1039  			sge[i].length = sg_dma_len(sg);
1040  			sge[i].lkey   = clt_path->s.dev->ib_pd->local_dma_lkey;
1041  		}
1042  		num_sge = 1 + count;
1043  	}
1044  	sge[i].addr   = req->iu->dma_addr;
1045  	sge[i].length = size;
1046  	sge[i].lkey   = clt_path->s.dev->ib_pd->local_dma_lkey;
1047  
1048  	/*
1049  	 * From time to time we have to post signalled sends,
1050  	 * or send queue will fill up and only QP reset can help.
1051  	 */
1052  	flags = atomic_inc_return(&con->c.wr_cnt) % clt_path->s.signal_interval ?
1053  			0 : IB_SEND_SIGNALED;
1054  
1055  	ib_dma_sync_single_for_device(clt_path->s.dev->ib_dev,
1056  				      req->iu->dma_addr,
1057  				      size, DMA_TO_DEVICE);
1058  
1059  	return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, sge, num_sge,
1060  					    rbuf->rkey, rbuf->addr, imm,
1061  					    flags, wr, ptail);
1062  }
1063  
1064  static int rtrs_map_sg_fr(struct rtrs_clt_io_req *req, size_t count)
1065  {
1066  	int nr;
1067  
1068  	/* Align the MR to a 4K page size to match the block virt boundary */
1069  	nr = ib_map_mr_sg(req->mr, req->sglist, count, NULL, SZ_4K);
1070  	if (nr != count)
1071  		return nr < 0 ? nr : -EINVAL;
1072  	ib_update_fast_reg_key(req->mr, ib_inc_rkey(req->mr->rkey));
1073  
1074  	return nr;
1075  }
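/*
 * Note on rtrs_map_sg_fr() above: ib_map_mr_sg() must cover all @count
 * entries for the mapping to be usable, hence the nr != count check, and
 * ib_inc_rkey() bumps the key so that an old rkey no longer matches the
 * freshly registered MR.
 */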
1076  
1077  static int rtrs_clt_write_req(struct rtrs_clt_io_req *req)
1078  {
1079  	struct rtrs_clt_con *con = req->con;
1080  	struct rtrs_path *s = con->c.path;
1081  	struct rtrs_clt_path *clt_path = to_clt_path(s);
1082  	struct rtrs_msg_rdma_write *msg;
1083  
1084  	struct rtrs_rbuf *rbuf;
1085  	int ret, count = 0;
1086  	u32 imm, buf_id;
1087  	struct ib_reg_wr rwr;
1088  	struct ib_send_wr inv_wr;
1089  	struct ib_send_wr *wr = NULL;
1090  	bool fr_en = false;
1091  
1092  	const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len;
1093  
1094  	if (tsize > clt_path->chunk_size) {
1095  		rtrs_wrn(s, "Write request failed, size too big %zu > %d\n",
1096  			  tsize, clt_path->chunk_size);
1097  		return -EMSGSIZE;
1098  	}
1099  	if (req->sg_cnt) {
1100  		count = ib_dma_map_sg(clt_path->s.dev->ib_dev, req->sglist,
1101  				      req->sg_cnt, req->dir);
1102  		if (!count) {
1103  			rtrs_wrn(s, "Write request failed, map failed\n");
1104  			return -EINVAL;
1105  		}
1106  	}
1107  	/* put rtrs msg after sg and user message */
1108  	msg = req->iu->buf + req->usr_len;
1109  	msg->type = cpu_to_le16(RTRS_MSG_WRITE);
1110  	msg->usr_len = cpu_to_le16(req->usr_len);
1111  
1112  	/* rtrs message on server side will be after user data and message */
1113  	imm = req->permit->mem_off + req->data_len + req->usr_len;
1114  	imm = rtrs_to_io_req_imm(imm);
1115  	buf_id = req->permit->mem_id;
1116  	req->sg_size = tsize;
1117  	rbuf = &clt_path->rbufs[buf_id];
1118  
1119  	if (count) {
1120  		ret = rtrs_map_sg_fr(req, count);
1121  		if (ret < 0) {
1122  			rtrs_err_rl(s,
1123  				    "Write request failed, failed to map fast reg. data, err: %d\n",
1124  				    ret);
1125  			ib_dma_unmap_sg(clt_path->s.dev->ib_dev, req->sglist,
1126  					req->sg_cnt, req->dir);
1127  			return ret;
1128  		}
1129  		inv_wr = (struct ib_send_wr) {
1130  			.opcode		    = IB_WR_LOCAL_INV,
1131  			.wr_cqe		    = &req->inv_cqe,
1132  			.send_flags	    = IB_SEND_SIGNALED,
1133  			.ex.invalidate_rkey = req->mr->rkey,
1134  		};
1135  		req->inv_cqe.done = rtrs_clt_inv_rkey_done;
1136  		rwr = (struct ib_reg_wr) {
1137  			.wr.opcode = IB_WR_REG_MR,
1138  			.wr.wr_cqe = &fast_reg_cqe,
1139  			.mr = req->mr,
1140  			.key = req->mr->rkey,
1141  			.access = (IB_ACCESS_LOCAL_WRITE),
1142  		};
1143  		wr = &rwr.wr;
1144  		fr_en = true;
1145  		refcount_inc(&req->ref);
1146  	}
1147  	/*
1148  	 * Update stats now; after the request is successfully sent it is no
1149  	 * longer safe to touch it.
1150  	 */
1151  	rtrs_clt_update_all_stats(req, WRITE);
1152  
1153  	ret = rtrs_post_rdma_write_sg(req->con, req, rbuf, fr_en, count,
1154  				      req->usr_len + sizeof(*msg),
1155  				      imm, wr, &inv_wr);
1156  	if (ret) {
1157  		rtrs_err_rl(s,
1158  			    "Write request failed: error=%d path=%s [%s:%u]\n",
1159  			    ret, kobject_name(&clt_path->kobj), clt_path->hca_name,
1160  			    clt_path->hca_port);
1161  		if (req->mp_policy == MP_POLICY_MIN_INFLIGHT)
1162  			atomic_dec(&clt_path->stats->inflight);
1163  		if (req->sg_cnt)
1164  			ib_dma_unmap_sg(clt_path->s.dev->ib_dev, req->sglist,
1165  					req->sg_cnt, req->dir);
1166  	}
1167  
1168  	return ret;
1169  }
1170  
1171  static int rtrs_clt_read_req(struct rtrs_clt_io_req *req)
1172  {
1173  	struct rtrs_clt_con *con = req->con;
1174  	struct rtrs_path *s = con->c.path;
1175  	struct rtrs_clt_path *clt_path = to_clt_path(s);
1176  	struct rtrs_msg_rdma_read *msg;
1177  	struct rtrs_ib_dev *dev = clt_path->s.dev;
1178  
1179  	struct ib_reg_wr rwr;
1180  	struct ib_send_wr *wr = NULL;
1181  
1182  	int ret, count = 0;
1183  	u32 imm, buf_id;
1184  
1185  	const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len;
1186  
1187  	if (tsize > clt_path->chunk_size) {
1188  		rtrs_wrn(s,
1189  			  "Read request failed, message size is %zu, bigger than CHUNK_SIZE %d\n",
1190  			  tsize, clt_path->chunk_size);
1191  		return -EMSGSIZE;
1192  	}
1193  
1194  	if (req->sg_cnt) {
1195  		count = ib_dma_map_sg(dev->ib_dev, req->sglist, req->sg_cnt,
1196  				      req->dir);
1197  		if (!count) {
1198  			rtrs_wrn(s,
1199  				  "Read request failed, dma map failed\n");
1200  			return -EINVAL;
1201  		}
1202  	}
1203  	/* put our message into req->buf after user message*/
1204  	msg = req->iu->buf + req->usr_len;
1205  	msg->type = cpu_to_le16(RTRS_MSG_READ);
1206  	msg->usr_len = cpu_to_le16(req->usr_len);
1207  
1208  	if (count) {
1209  		ret = rtrs_map_sg_fr(req, count);
1210  		if (ret < 0) {
1211  			rtrs_err_rl(s,
1212  				     "Read request failed, failed to map  fast reg. data, err: %d\n",
1213  				     ret);
1214  			ib_dma_unmap_sg(dev->ib_dev, req->sglist, req->sg_cnt,
1215  					req->dir);
1216  			return ret;
1217  		}
1218  		rwr = (struct ib_reg_wr) {
1219  			.wr.opcode = IB_WR_REG_MR,
1220  			.wr.wr_cqe = &fast_reg_cqe,
1221  			.mr = req->mr,
1222  			.key = req->mr->rkey,
1223  			.access = (IB_ACCESS_LOCAL_WRITE |
1224  				   IB_ACCESS_REMOTE_WRITE),
1225  		};
1226  		wr = &rwr.wr;
1227  
1228  		msg->sg_cnt = cpu_to_le16(1);
1229  		msg->flags = cpu_to_le16(RTRS_MSG_NEED_INVAL_F);
1230  
1231  		msg->desc[0].addr = cpu_to_le64(req->mr->iova);
1232  		msg->desc[0].key = cpu_to_le32(req->mr->rkey);
1233  		msg->desc[0].len = cpu_to_le32(req->mr->length);
1234  
1235  		/* Further invalidation is required */
1236  		req->need_inv = !!RTRS_MSG_NEED_INVAL_F;
1237  
1238  	} else {
1239  		msg->sg_cnt = 0;
1240  		msg->flags = 0;
1241  	}
1242  	/*
1243  	 * rtrs message will be after the space reserved for disk data and
1244  	 * user message
1245  	 */
1246  	imm = req->permit->mem_off + req->data_len + req->usr_len;
1247  	imm = rtrs_to_io_req_imm(imm);
1248  	buf_id = req->permit->mem_id;
1249  
1250  	req->sg_size  = sizeof(*msg);
1251  	req->sg_size += le16_to_cpu(msg->sg_cnt) * sizeof(struct rtrs_sg_desc);
1252  	req->sg_size += req->usr_len;
1253  
1254  	/*
1255  	 * Update stats now; after the request is successfully sent it is no
1256  	 * longer safe to touch it.
1257  	 */
1258  	rtrs_clt_update_all_stats(req, READ);
1259  
1260  	ret = rtrs_post_send_rdma(req->con, req, &clt_path->rbufs[buf_id],
1261  				   req->data_len, imm, wr);
1262  	if (ret) {
1263  		rtrs_err_rl(s,
1264  			    "Read request failed: error=%d path=%s [%s:%u]\n",
1265  			    ret, kobject_name(&clt_path->kobj), clt_path->hca_name,
1266  			    clt_path->hca_port);
1267  		if (req->mp_policy == MP_POLICY_MIN_INFLIGHT)
1268  			atomic_dec(&clt_path->stats->inflight);
1269  		req->need_inv = false;
1270  		if (req->sg_cnt)
1271  			ib_dma_unmap_sg(dev->ib_dev, req->sglist,
1272  					req->sg_cnt, req->dir);
1273  	}
1274  
1275  	return ret;
1276  }
1277  
1278  /**
1279   * rtrs_clt_failover_req() - Try to find an active path for a failed request
1280   * @clt: clt context
1281   * @fail_req: a failed io request.
1282   */
1283  static int rtrs_clt_failover_req(struct rtrs_clt_sess *clt,
1284  				 struct rtrs_clt_io_req *fail_req)
1285  {
1286  	struct rtrs_clt_path *alive_path;
1287  	struct rtrs_clt_io_req *req;
1288  	int err = -ECONNABORTED;
1289  	struct path_it it;
1290  
1291  	rcu_read_lock();
1292  	for (path_it_init(&it, clt);
1293  	     (alive_path = it.next_path(&it)) && it.i < it.clt->paths_num;
1294  	     it.i++) {
1295  		if (READ_ONCE(alive_path->state) != RTRS_CLT_CONNECTED)
1296  			continue;
1297  		req = rtrs_clt_get_copy_req(alive_path, fail_req);
1298  		if (req->dir == DMA_TO_DEVICE)
1299  			err = rtrs_clt_write_req(req);
1300  		else
1301  			err = rtrs_clt_read_req(req);
1302  		if (err) {
1303  			req->in_use = false;
1304  			continue;
1305  		}
1306  		/* Success path */
1307  		rtrs_clt_inc_failover_cnt(alive_path->stats);
1308  		break;
1309  	}
1310  	path_it_deinit(&it);
1311  	rcu_read_unlock();
1312  
1313  	return err;
1314  }
1315  
1316  static void fail_all_outstanding_reqs(struct rtrs_clt_path *clt_path)
1317  {
1318  	struct rtrs_clt_sess *clt = clt_path->clt;
1319  	struct rtrs_clt_io_req *req;
1320  	int i, err;
1321  
1322  	if (!clt_path->reqs)
1323  		return;
1324  	for (i = 0; i < clt_path->queue_depth; ++i) {
1325  		req = &clt_path->reqs[i];
1326  		if (!req->in_use)
1327  			continue;
1328  
1329  		/*
1330  		 * Safely (without notification) complete failed request.
1331  		 * After completion this request is still usable and can
1332  		 * be failed over to another path.
1333  		 */
1334  		complete_rdma_req(req, -ECONNABORTED, false, true);
1335  
1336  		err = rtrs_clt_failover_req(clt, req);
1337  		if (err)
1338  			/* Failover failed, notify anyway */
1339  			req->conf(req->priv, err);
1340  	}
1341  }
1342  
1343  static void free_path_reqs(struct rtrs_clt_path *clt_path)
1344  {
1345  	struct rtrs_clt_io_req *req;
1346  	int i;
1347  
1348  	if (!clt_path->reqs)
1349  		return;
1350  	for (i = 0; i < clt_path->queue_depth; ++i) {
1351  		req = &clt_path->reqs[i];
1352  		if (req->mr)
1353  			ib_dereg_mr(req->mr);
1354  		kfree(req->sge);
1355  		rtrs_iu_free(req->iu, clt_path->s.dev->ib_dev, 1);
1356  	}
1357  	kfree(clt_path->reqs);
1358  	clt_path->reqs = NULL;
1359  }
1360  
1361  static int alloc_path_reqs(struct rtrs_clt_path *clt_path)
1362  {
1363  	struct rtrs_clt_io_req *req;
1364  	int i, err = -ENOMEM;
1365  
1366  	clt_path->reqs = kcalloc(clt_path->queue_depth,
1367  				 sizeof(*clt_path->reqs),
1368  				 GFP_KERNEL);
1369  	if (!clt_path->reqs)
1370  		return -ENOMEM;
1371  
1372  	for (i = 0; i < clt_path->queue_depth; ++i) {
1373  		req = &clt_path->reqs[i];
1374  		req->iu = rtrs_iu_alloc(1, clt_path->max_hdr_size, GFP_KERNEL,
1375  					 clt_path->s.dev->ib_dev,
1376  					 DMA_TO_DEVICE,
1377  					 rtrs_clt_rdma_done);
1378  		if (!req->iu)
1379  			goto out;
1380  
1381  		req->sge = kcalloc(2, sizeof(*req->sge), GFP_KERNEL);
1382  		if (!req->sge)
1383  			goto out;
1384  
1385  		req->mr = ib_alloc_mr(clt_path->s.dev->ib_pd,
1386  				      IB_MR_TYPE_MEM_REG,
1387  				      clt_path->max_pages_per_mr);
1388  		if (IS_ERR(req->mr)) {
1389  			err = PTR_ERR(req->mr);
1390  			req->mr = NULL;
1391  			pr_err("Failed to alloc clt_path->max_pages_per_mr %d\n",
1392  			       clt_path->max_pages_per_mr);
1393  			goto out;
1394  		}
1395  
1396  		init_completion(&req->inv_comp);
1397  	}
1398  
1399  	return 0;
1400  
1401  out:
1402  	free_path_reqs(clt_path);
1403  
1404  	return err;
1405  }
1406  
1407  static int alloc_permits(struct rtrs_clt_sess *clt)
1408  {
1409  	unsigned int chunk_bits;
1410  	int err, i;
1411  
1412  	clt->permits_map = bitmap_zalloc(clt->queue_depth, GFP_KERNEL);
1413  	if (!clt->permits_map) {
1414  		err = -ENOMEM;
1415  		goto out_err;
1416  	}
1417  	clt->permits = kcalloc(clt->queue_depth, permit_size(clt), GFP_KERNEL);
1418  	if (!clt->permits) {
1419  		err = -ENOMEM;
1420  		goto err_map;
1421  	}
1422  	chunk_bits = ilog2(clt->queue_depth - 1) + 1;
1423  	for (i = 0; i < clt->queue_depth; i++) {
1424  		struct rtrs_permit *permit;
1425  
1426  		permit = get_permit(clt, i);
1427  		permit->mem_id = i;
1428  		permit->mem_off = i << (MAX_IMM_PAYL_BITS - chunk_bits);
1429  	}
1430  
1431  	return 0;
1432  
1433  err_map:
1434  	bitmap_free(clt->permits_map);
1435  	clt->permits_map = NULL;
1436  out_err:
1437  	return err;
1438  }
1439  
1440  static void free_permits(struct rtrs_clt_sess *clt)
1441  {
1442  	if (clt->permits_map)
1443  		wait_event(clt->permits_wait,
1444  			   bitmap_empty(clt->permits_map, clt->queue_depth));
1445  
1446  	bitmap_free(clt->permits_map);
1447  	clt->permits_map = NULL;
1448  	kfree(clt->permits);
1449  	clt->permits = NULL;
1450  }
1451  
1452  static void query_fast_reg_mode(struct rtrs_clt_path *clt_path)
1453  {
1454  	struct ib_device *ib_dev;
1455  	u64 max_pages_per_mr;
1456  	int mr_page_shift;
1457  
1458  	ib_dev = clt_path->s.dev->ib_dev;
1459  
1460  	/*
1461  	 * Use the smallest page size supported by the HCA, down to a
1462  	 * minimum of 4096 bytes. We're unlikely to build large sglists
1463  	 * out of smaller entries.
1464  	 */
1465  	mr_page_shift      = max(12, ffs(ib_dev->attrs.page_size_cap) - 1);
1466  	max_pages_per_mr   = ib_dev->attrs.max_mr_size;
1467  	do_div(max_pages_per_mr, (1ull << mr_page_shift));
1468  	clt_path->max_pages_per_mr =
1469  		min3(clt_path->max_pages_per_mr, (u32)max_pages_per_mr,
1470  		     ib_dev->attrs.max_fast_reg_page_list_len);
1471  	clt_path->clt->max_segments =
1472  		min(clt_path->max_pages_per_mr, clt_path->clt->max_segments);
1473  }
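/*
 * Worked example (hypothetical device attributes): with page_size_cap
 * allowing 4K pages, max_mr_size == 256 MB and
 * max_fast_reg_page_list_len == 256, mr_page_shift is 12 and
 * max_mr_size >> 12 == 65536, so max_pages_per_mr (initialised to
 * RTRS_MAX_SEGMENTS == 128 in alloc_path()) becomes
 * min3(128, 65536, 256) == 128.
 */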
1474  
1475  static bool rtrs_clt_change_state_get_old(struct rtrs_clt_path *clt_path,
1476  					   enum rtrs_clt_state new_state,
1477  					   enum rtrs_clt_state *old_state)
1478  {
1479  	bool changed;
1480  
1481  	spin_lock_irq(&clt_path->state_wq.lock);
1482  	if (old_state)
1483  		*old_state = clt_path->state;
1484  	changed = rtrs_clt_change_state(clt_path, new_state);
1485  	spin_unlock_irq(&clt_path->state_wq.lock);
1486  
1487  	return changed;
1488  }
1489  
1490  static void rtrs_clt_hb_err_handler(struct rtrs_con *c)
1491  {
1492  	struct rtrs_clt_con *con = container_of(c, typeof(*con), c);
1493  
1494  	rtrs_rdma_error_recovery(con);
1495  }
1496  
1497  static void rtrs_clt_init_hb(struct rtrs_clt_path *clt_path)
1498  {
1499  	rtrs_init_hb(&clt_path->s, &io_comp_cqe,
1500  		      RTRS_HB_INTERVAL_MS,
1501  		      RTRS_HB_MISSED_MAX,
1502  		      rtrs_clt_hb_err_handler,
1503  		      rtrs_wq);
1504  }
1505  
1506  static void rtrs_clt_reconnect_work(struct work_struct *work);
1507  static void rtrs_clt_close_work(struct work_struct *work);
1508  
1509  static void rtrs_clt_err_recovery_work(struct work_struct *work)
1510  {
1511  	struct rtrs_clt_path *clt_path;
1512  	struct rtrs_clt_sess *clt;
1513  	int delay_ms;
1514  
1515  	clt_path = container_of(work, struct rtrs_clt_path, err_recovery_work);
1516  	clt = clt_path->clt;
1517  	delay_ms = clt->reconnect_delay_sec * 1000;
1518  	rtrs_clt_stop_and_destroy_conns(clt_path);
1519  	queue_delayed_work(rtrs_wq, &clt_path->reconnect_dwork,
1520  			   msecs_to_jiffies(delay_ms +
1521  					    get_random_u32_below(RTRS_RECONNECT_SEED)));
1522  }
1523  
1524  static struct rtrs_clt_path *alloc_path(struct rtrs_clt_sess *clt,
1525  					const struct rtrs_addr *path,
1526  					size_t con_num, u32 nr_poll_queues)
1527  {
1528  	struct rtrs_clt_path *clt_path;
1529  	int err = -ENOMEM;
1530  	int cpu;
1531  	size_t total_con;
1532  
1533  	clt_path = kzalloc(sizeof(*clt_path), GFP_KERNEL);
1534  	if (!clt_path)
1535  		goto err;
1536  
1537  	/*
1538  	 * irqmode and poll
1539  	 * +1: Extra connection for user messages
1540  	 */
1541  	total_con = con_num + nr_poll_queues + 1;
1542  	clt_path->s.con = kcalloc(total_con, sizeof(*clt_path->s.con),
1543  				  GFP_KERNEL);
1544  	if (!clt_path->s.con)
1545  		goto err_free_path;
1546  
1547  	clt_path->s.con_num = total_con;
1548  	clt_path->s.irq_con_num = con_num + 1;
1549  
1550  	clt_path->stats = kzalloc(sizeof(*clt_path->stats), GFP_KERNEL);
1551  	if (!clt_path->stats)
1552  		goto err_free_con;
1553  
1554  	mutex_init(&clt_path->init_mutex);
1555  	uuid_gen(&clt_path->s.uuid);
1556  	memcpy(&clt_path->s.dst_addr, path->dst,
1557  	       rdma_addr_size((struct sockaddr *)path->dst));
1558  
1559  	/*
1560  	 * rdma_resolve_addr() passes src_addr to cma_bind_addr, which
1561  	 * checks that the sa_family is non-zero. If the user passed src_addr=NULL
1562  	 * the sess->src_addr will contain only zeros, which is then fine.
1563  	 */
1564  	if (path->src)
1565  		memcpy(&clt_path->s.src_addr, path->src,
1566  		       rdma_addr_size((struct sockaddr *)path->src));
1567  	strscpy(clt_path->s.sessname, clt->sessname,
1568  		sizeof(clt_path->s.sessname));
1569  	clt_path->clt = clt;
1570  	clt_path->max_pages_per_mr = RTRS_MAX_SEGMENTS;
1571  	init_waitqueue_head(&clt_path->state_wq);
1572  	clt_path->state = RTRS_CLT_CONNECTING;
1573  	atomic_set(&clt_path->connected_cnt, 0);
1574  	INIT_WORK(&clt_path->close_work, rtrs_clt_close_work);
1575  	INIT_WORK(&clt_path->err_recovery_work, rtrs_clt_err_recovery_work);
1576  	INIT_DELAYED_WORK(&clt_path->reconnect_dwork, rtrs_clt_reconnect_work);
1577  	rtrs_clt_init_hb(clt_path);
1578  
1579  	clt_path->mp_skip_entry = alloc_percpu(typeof(*clt_path->mp_skip_entry));
1580  	if (!clt_path->mp_skip_entry)
1581  		goto err_free_stats;
1582  
1583  	for_each_possible_cpu(cpu)
1584  		INIT_LIST_HEAD(per_cpu_ptr(clt_path->mp_skip_entry, cpu));
1585  
1586  	err = rtrs_clt_init_stats(clt_path->stats);
1587  	if (err)
1588  		goto err_free_percpu;
1589  
1590  	return clt_path;
1591  
1592  err_free_percpu:
1593  	free_percpu(clt_path->mp_skip_entry);
1594  err_free_stats:
1595  	kfree(clt_path->stats);
1596  err_free_con:
1597  	kfree(clt_path->s.con);
1598  err_free_path:
1599  	kfree(clt_path);
1600  err:
1601  	return ERR_PTR(err);
1602  }
1603  
1604  void free_path(struct rtrs_clt_path *clt_path)
1605  {
1606  	free_percpu(clt_path->mp_skip_entry);
1607  	mutex_destroy(&clt_path->init_mutex);
1608  	kfree(clt_path->s.con);
1609  	kfree(clt_path->rbufs);
1610  	kfree(clt_path);
1611  }
1612  
1613  static int create_con(struct rtrs_clt_path *clt_path, unsigned int cid)
1614  {
1615  	struct rtrs_clt_con *con;
1616  
1617  	con = kzalloc(sizeof(*con), GFP_KERNEL);
1618  	if (!con)
1619  		return -ENOMEM;
1620  
1621  	/* Map first two connections to the first CPU */
1622  	con->cpu  = (cid ? cid - 1 : 0) % nr_cpu_ids;
1623  	con->c.cid = cid;
1624  	con->c.path = &clt_path->s;
1625  	/* Align with srv, init as 1 */
1626  	atomic_set(&con->c.wr_cnt, 1);
1627  	mutex_init(&con->con_mutex);
1628  
1629  	clt_path->s.con[cid] = &con->c;
1630  
1631  	return 0;
1632  }
1633  
1634  static void destroy_con(struct rtrs_clt_con *con)
1635  {
1636  	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);
1637  
1638  	clt_path->s.con[con->c.cid] = NULL;
1639  	mutex_destroy(&con->con_mutex);
1640  	kfree(con);
1641  }
1642  
1643  static int create_con_cq_qp(struct rtrs_clt_con *con)
1644  {
1645  	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);
1646  	u32 max_send_wr, max_recv_wr, cq_num, max_send_sge, wr_limit;
1647  	int err, cq_vector;
1648  	struct rtrs_msg_rkey_rsp *rsp;
1649  
1650  	lockdep_assert_held(&con->con_mutex);
1651  	if (con->c.cid == 0) {
1652  		max_send_sge = 1;
1653  		/* We must be the first here */
1654  		if (WARN_ON(clt_path->s.dev))
1655  			return -EINVAL;
1656  
1657  		/*
1658  		 * The whole session uses device from user connection.
1659  		 * Be careful not to close user connection before ib dev
1660  		 * is gracefully put.
1661  		 */
1662  		clt_path->s.dev = rtrs_ib_dev_find_or_add(con->c.cm_id->device,
1663  						       &dev_pd);
1664  		if (!clt_path->s.dev) {
1665  			rtrs_wrn(clt_path->clt,
1666  				  "rtrs_ib_dev_find_get_or_add(): no memory\n");
1667  			return -ENOMEM;
1668  		}
1669  		clt_path->s.dev_ref = 1;
1670  		query_fast_reg_mode(clt_path);
1671  		wr_limit = clt_path->s.dev->ib_dev->attrs.max_qp_wr;
1672  		/*
1673  		 * Two (request + registration) completion for send
1674  		 * Two (request + registration) completions for send
1675  		 * or one for recv.
1676  		 * + 2 for drain and heartbeat
1677  		 * in case qp gets into error state.
1678  		 */
1679  		max_send_wr =
1680  			min_t(int, wr_limit, SERVICE_CON_QUEUE_DEPTH * 2 + 2);
1681  		max_recv_wr = max_send_wr;
1682  	} else {
1683  		/*
1684  		 * Here we assume that session members are correctly set.
1685  		 * This is always true if user connection (cid == 0) is
1686  		 * established first.
1687  		 */
1688  		if (WARN_ON(!clt_path->s.dev))
1689  			return -EINVAL;
1690  		if (WARN_ON(!clt_path->queue_depth))
1691  			return -EINVAL;
1692  
1693  		wr_limit = clt_path->s.dev->ib_dev->attrs.max_qp_wr;
1694  		/* Shared between connections */
1695  		clt_path->s.dev_ref++;
1696  		max_send_wr = min_t(int, wr_limit,
1697  			      /* QD * (REQ + RSP + FR REGS or INVS) + drain */
1698  			      clt_path->queue_depth * 4 + 1);
1699  		max_recv_wr = min_t(int, wr_limit,
1700  			      clt_path->queue_depth * 3 + 1);
1701  		max_send_sge = 2;
1702  	}
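	/*
	 * Illustrative budget (example values, not from the original source):
	 * for an IO connection with queue_depth 128 and a large enough
	 * wr_limit, max_send_wr = 4 * 128 + 1 = 513 and
	 * max_recv_wr = 3 * 128 + 1 = 385, so the CQ below is sized for
	 * 898 completions.  sq_wr_avail tracks the remaining send-queue
	 * budget at runtime.
	 */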
1703  	atomic_set(&con->c.sq_wr_avail, max_send_wr);
1704  	cq_num = max_send_wr + max_recv_wr;
1705  	/* alloc iu to recv new rkey reply when server reports flags set */
1706  	if (clt_path->flags & RTRS_MSG_NEW_RKEY_F || con->c.cid == 0) {
1707  		con->rsp_ius = rtrs_iu_alloc(cq_num, sizeof(*rsp),
1708  					      GFP_KERNEL,
1709  					      clt_path->s.dev->ib_dev,
1710  					      DMA_FROM_DEVICE,
1711  					      rtrs_clt_rdma_done);
1712  		if (!con->rsp_ius)
1713  			return -ENOMEM;
1714  		con->queue_num = cq_num;
1715  	}
1716  	cq_vector = con->cpu % clt_path->s.dev->ib_dev->num_comp_vectors;
1717  	if (con->c.cid >= clt_path->s.irq_con_num)
1718  		err = rtrs_cq_qp_create(&clt_path->s, &con->c, max_send_sge,
1719  					cq_vector, cq_num, max_send_wr,
1720  					max_recv_wr, IB_POLL_DIRECT);
1721  	else
1722  		err = rtrs_cq_qp_create(&clt_path->s, &con->c, max_send_sge,
1723  					cq_vector, cq_num, max_send_wr,
1724  					max_recv_wr, IB_POLL_SOFTIRQ);
1725  	/*
1726  	 * In case of error we do not bother to clean previous allocations,
1727  	 * since destroy_con_cq_qp() must be called.
1728  	 */
1729  	return err;
1730  }
1731  
destroy_con_cq_qp(struct rtrs_clt_con * con)1732  static void destroy_con_cq_qp(struct rtrs_clt_con *con)
1733  {
1734  	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);
1735  
1736  	/*
1737  	 * Be careful here: destroy_con_cq_qp() can be called even if
1738  	 * create_con_cq_qp() failed, see the comments there.
1739  	 */
1740  	lockdep_assert_held(&con->con_mutex);
1741  	rtrs_cq_qp_destroy(&con->c);
1742  	if (con->rsp_ius) {
1743  		rtrs_iu_free(con->rsp_ius, clt_path->s.dev->ib_dev,
1744  			     con->queue_num);
1745  		con->rsp_ius = NULL;
1746  		con->queue_num = 0;
1747  	}
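	/*
	 * Drop the shared device reference; the last connection torn down
	 * releases the ib dev that was taken on the user connection (cid 0).
	 */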
1748  	if (clt_path->s.dev_ref && !--clt_path->s.dev_ref) {
1749  		rtrs_ib_dev_put(clt_path->s.dev);
1750  		clt_path->s.dev = NULL;
1751  	}
1752  }
1753  
stop_cm(struct rtrs_clt_con * con)1754  static void stop_cm(struct rtrs_clt_con *con)
1755  {
1756  	rdma_disconnect(con->c.cm_id);
1757  	if (con->c.qp)
1758  		ib_drain_qp(con->c.qp);
1759  }
1760  
destroy_cm(struct rtrs_clt_con * con)1761  static void destroy_cm(struct rtrs_clt_con *con)
1762  {
1763  	rdma_destroy_id(con->c.cm_id);
1764  	con->c.cm_id = NULL;
1765  }
1766  
rtrs_rdma_addr_resolved(struct rtrs_clt_con * con)1767  static int rtrs_rdma_addr_resolved(struct rtrs_clt_con *con)
1768  {
1769  	struct rtrs_path *s = con->c.path;
1770  	int err;
1771  
1772  	mutex_lock(&con->con_mutex);
1773  	err = create_con_cq_qp(con);
1774  	mutex_unlock(&con->con_mutex);
1775  	if (err) {
1776  		rtrs_err(s, "create_con_cq_qp(), err: %d\n", err);
1777  		return err;
1778  	}
1779  	err = rdma_resolve_route(con->c.cm_id, RTRS_CONNECT_TIMEOUT_MS);
1780  	if (err)
1781  		rtrs_err(s, "Resolving route failed, err: %d\n", err);
1782  
1783  	return err;
1784  }
1785  
rtrs_rdma_route_resolved(struct rtrs_clt_con * con)1786  static int rtrs_rdma_route_resolved(struct rtrs_clt_con *con)
1787  {
1788  	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);
1789  	struct rtrs_clt_sess *clt = clt_path->clt;
1790  	struct rtrs_msg_conn_req msg;
1791  	struct rdma_conn_param param;
1792  
1793  	int err;
1794  
1795  	param = (struct rdma_conn_param) {
1796  		.retry_count = 7,
1797  		.rnr_retry_count = 7,
1798  		.private_data = &msg,
1799  		.private_data_len = sizeof(msg),
1800  	};
1801  
1802  	msg = (struct rtrs_msg_conn_req) {
1803  		.magic = cpu_to_le16(RTRS_MAGIC),
1804  		.version = cpu_to_le16(RTRS_PROTO_VER),
1805  		.cid = cpu_to_le16(con->c.cid),
1806  		.cid_num = cpu_to_le16(clt_path->s.con_num),
1807  		.recon_cnt = cpu_to_le16(clt_path->s.recon_cnt),
1808  	};
1809  	msg.first_conn = clt_path->for_new_clt ? FIRST_CONN : 0;
1810  	uuid_copy(&msg.sess_uuid, &clt_path->s.uuid);
1811  	uuid_copy(&msg.paths_uuid, &clt->paths_uuid);
1812  
1813  	err = rdma_connect_locked(con->c.cm_id, &param);
1814  	if (err)
1815  		rtrs_err(clt, "rdma_connect_locked(): %d\n", err);
1816  
1817  	return err;
1818  }
1819  
rtrs_rdma_conn_established(struct rtrs_clt_con * con,struct rdma_cm_event * ev)1820  static int rtrs_rdma_conn_established(struct rtrs_clt_con *con,
1821  				       struct rdma_cm_event *ev)
1822  {
1823  	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);
1824  	struct rtrs_clt_sess *clt = clt_path->clt;
1825  	const struct rtrs_msg_conn_rsp *msg;
1826  	u16 version, queue_depth;
1827  	int errno;
1828  	u8 len;
1829  
1830  	msg = ev->param.conn.private_data;
1831  	len = ev->param.conn.private_data_len;
1832  	if (len < sizeof(*msg)) {
1833  		rtrs_err(clt, "Invalid RTRS connection response\n");
1834  		return -ECONNRESET;
1835  	}
1836  	if (le16_to_cpu(msg->magic) != RTRS_MAGIC) {
1837  		rtrs_err(clt, "Invalid RTRS magic\n");
1838  		return -ECONNRESET;
1839  	}
1840  	version = le16_to_cpu(msg->version);
1841  	if (version >> 8 != RTRS_PROTO_VER_MAJOR) {
1842  		rtrs_err(clt, "Unsupported major RTRS version: %d, expected %d\n",
1843  			  version >> 8, RTRS_PROTO_VER_MAJOR);
1844  		return -ECONNRESET;
1845  	}
1846  	errno = le16_to_cpu(msg->errno);
1847  	if (errno) {
1848  		rtrs_err(clt, "Invalid RTRS message: errno %d\n",
1849  			  errno);
1850  		return -ECONNRESET;
1851  	}
1852  	if (con->c.cid == 0) {
1853  		queue_depth = le16_to_cpu(msg->queue_depth);
1854  
1855  		if (clt_path->queue_depth > 0 && queue_depth != clt_path->queue_depth) {
1856  			rtrs_err(clt, "Error: queue depth changed\n");
1857  
1858  			/*
1859  			 * Stop any more reconnection attempts
1860  			 */
1861  			clt_path->reconnect_attempts = -1;
1862  			rtrs_err(clt,
1863  				"Disabling auto-reconnect. Trigger a manual reconnect after issue is resolved\n");
1864  			return -ECONNRESET;
1865  		}
1866  
1867  		if (!clt_path->rbufs) {
1868  			clt_path->rbufs = kcalloc(queue_depth,
1869  						  sizeof(*clt_path->rbufs),
1870  						  GFP_KERNEL);
1871  			if (!clt_path->rbufs)
1872  				return -ENOMEM;
1873  		}
1874  		clt_path->queue_depth = queue_depth;
1875  		clt_path->s.signal_interval = min_not_zero(queue_depth,
1876  						(unsigned short) SERVICE_CON_QUEUE_DEPTH);
1877  		clt_path->max_hdr_size = le32_to_cpu(msg->max_hdr_size);
1878  		clt_path->max_io_size = le32_to_cpu(msg->max_io_size);
1879  		clt_path->flags = le32_to_cpu(msg->flags);
1880  		clt_path->chunk_size = clt_path->max_io_size + clt_path->max_hdr_size;
1881  
1882  		/*
1883  		 * Global IO size is always a minimum.
1884  		 * If during a reconnection the server sends us a slightly
1885  		 * higher value, the client does not care and uses the cached minimum.
1886  		 *
1887  		 * Since we can have several sessions (paths) re-establishing
1888  		 * connections in parallel, take the lock.
1889  		 */
1890  		mutex_lock(&clt->paths_mutex);
1891  		clt->queue_depth = clt_path->queue_depth;
1892  		clt->max_io_size = min_not_zero(clt_path->max_io_size,
1893  						clt->max_io_size);
1894  		mutex_unlock(&clt->paths_mutex);
1895  
1896  		/*
1897  		 * Cache the hca_port and hca_name for sysfs
1898  		 */
1899  		clt_path->hca_port = con->c.cm_id->port_num;
1900  		scnprintf(clt_path->hca_name, sizeof(clt_path->hca_name),
1901  			  "%s", clt_path->s.dev->ib_dev->name);
1902  		clt_path->s.src_addr = con->c.cm_id->route.addr.src_addr;
1903  		/* set for_new_clt, to allow future reconnect on any path */
1904  		clt_path->for_new_clt = 1;
1905  	}
1906  
1907  	return 0;
1908  }
1909  
flag_success_on_conn(struct rtrs_clt_con * con)1910  static inline void flag_success_on_conn(struct rtrs_clt_con *con)
1911  {
1912  	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);
1913  
1914  	atomic_inc(&clt_path->connected_cnt);
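	/*
	 * cm_err == 1 is used as a "successfully established" marker here;
	 * flag_error_on_conn() only decrements connected_cnt when it sees
	 * this marker, and create_cm() treats only negative values as errors.
	 */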
1915  	con->cm_err = 1;
1916  }
1917  
rtrs_rdma_conn_rejected(struct rtrs_clt_con * con,struct rdma_cm_event * ev)1918  static int rtrs_rdma_conn_rejected(struct rtrs_clt_con *con,
1919  				    struct rdma_cm_event *ev)
1920  {
1921  	struct rtrs_path *s = con->c.path;
1922  	const struct rtrs_msg_conn_rsp *msg;
1923  	const char *rej_msg;
1924  	int status, errno;
1925  	u8 data_len;
1926  
1927  	status = ev->status;
1928  	rej_msg = rdma_reject_msg(con->c.cm_id, status);
1929  	msg = rdma_consumer_reject_data(con->c.cm_id, ev, &data_len);
1930  
1931  	if (msg && data_len >= sizeof(*msg)) {
1932  		errno = (int16_t)le16_to_cpu(msg->errno);
1933  		if (errno == -EBUSY)
1934  			rtrs_err(s,
1935  				  "Previous session still exists on the server, please reconnect later\n");
1936  		else
1937  			rtrs_err(s,
1938  				  "Connect rejected: status %d (%s), rtrs errno %d\n",
1939  				  status, rej_msg, errno);
1940  	} else {
1941  		rtrs_err(s,
1942  			  "Connect rejected but with malformed message: status %d (%s)\n",
1943  			  status, rej_msg);
1944  	}
1945  
1946  	return -ECONNRESET;
1947  }
1948  
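/*
 * Move the path towards CLOSING and, if @wait is set, wait for the
 * queued close work to finish.
 */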
rtrs_clt_close_conns(struct rtrs_clt_path * clt_path,bool wait)1949  void rtrs_clt_close_conns(struct rtrs_clt_path *clt_path, bool wait)
1950  {
1951  	trace_rtrs_clt_close_conns(clt_path);
1952  
1953  	if (rtrs_clt_change_state_get_old(clt_path, RTRS_CLT_CLOSING, NULL))
1954  		queue_work(rtrs_wq, &clt_path->close_work);
1955  	if (wait)
1956  		flush_work(&clt_path->close_work);
1957  }
1958  
flag_error_on_conn(struct rtrs_clt_con * con,int cm_err)1959  static inline void flag_error_on_conn(struct rtrs_clt_con *con, int cm_err)
1960  {
1961  	if (con->cm_err == 1) {
1962  		struct rtrs_clt_path *clt_path;
1963  
1964  		clt_path = to_clt_path(con->c.path);
1965  		if (atomic_dec_and_test(&clt_path->connected_cnt))
1967  			wake_up(&clt_path->state_wq);
1968  	}
1969  	con->cm_err = cm_err;
1970  }
1971  
rtrs_clt_rdma_cm_handler(struct rdma_cm_id * cm_id,struct rdma_cm_event * ev)1972  static int rtrs_clt_rdma_cm_handler(struct rdma_cm_id *cm_id,
1973  				     struct rdma_cm_event *ev)
1974  {
1975  	struct rtrs_clt_con *con = cm_id->context;
1976  	struct rtrs_path *s = con->c.path;
1977  	struct rtrs_clt_path *clt_path = to_clt_path(s);
1978  	int cm_err = 0;
1979  
1980  	switch (ev->event) {
1981  	case RDMA_CM_EVENT_ADDR_RESOLVED:
1982  		cm_err = rtrs_rdma_addr_resolved(con);
1983  		break;
1984  	case RDMA_CM_EVENT_ROUTE_RESOLVED:
1985  		cm_err = rtrs_rdma_route_resolved(con);
1986  		break;
1987  	case RDMA_CM_EVENT_ESTABLISHED:
1988  		cm_err = rtrs_rdma_conn_established(con, ev);
1989  		if (!cm_err) {
1990  			/*
1991  			 * Report success and wake up. Here we abuse state_wq,
1992  			 * i.e. wake up without state change, but we set cm_err.
1993  			 */
1994  			flag_success_on_conn(con);
1995  			wake_up(&clt_path->state_wq);
1996  			return 0;
1997  		}
1998  		break;
1999  	case RDMA_CM_EVENT_REJECTED:
2000  		cm_err = rtrs_rdma_conn_rejected(con, ev);
2001  		break;
2002  	case RDMA_CM_EVENT_DISCONNECTED:
2003  		/* No message for disconnecting */
2004  		cm_err = -ECONNRESET;
2005  		break;
2006  	case RDMA_CM_EVENT_CONNECT_ERROR:
2007  	case RDMA_CM_EVENT_UNREACHABLE:
2008  	case RDMA_CM_EVENT_ADDR_CHANGE:
2009  	case RDMA_CM_EVENT_TIMEWAIT_EXIT:
2010  		rtrs_wrn(s, "CM error (CM event: %s, err: %d)\n",
2011  			 rdma_event_msg(ev->event), ev->status);
2012  		cm_err = -ECONNRESET;
2013  		break;
2014  	case RDMA_CM_EVENT_ADDR_ERROR:
2015  	case RDMA_CM_EVENT_ROUTE_ERROR:
2016  		rtrs_wrn(s, "CM error (CM event: %s, err: %d)\n",
2017  			 rdma_event_msg(ev->event), ev->status);
2018  		cm_err = -EHOSTUNREACH;
2019  		break;
2020  	case RDMA_CM_EVENT_DEVICE_REMOVAL:
2021  		/*
2022  		 * Device removal is a special case.  Queue close and return 0.
2023  		 */
2024  		rtrs_clt_close_conns(clt_path, false);
2025  		return 0;
2026  	default:
2027  		rtrs_err(s, "Unexpected RDMA CM error (CM event: %s, err: %d)\n",
2028  			 rdma_event_msg(ev->event), ev->status);
2029  		cm_err = -ECONNRESET;
2030  		break;
2031  	}
2032  
2033  	if (cm_err) {
2034  		/*
2035  		 * A CM error makes sense only while the connection is being established;
2036  		 * in other cases we rely on the normal reconnect procedure.
2037  		 */
2038  		flag_error_on_conn(con, cm_err);
2039  		rtrs_rdma_error_recovery(con);
2040  	}
2041  
2042  	return 0;
2043  }
2044  
2045  /* The caller should do the cleanup in case of error */
create_cm(struct rtrs_clt_con * con)2046  static int create_cm(struct rtrs_clt_con *con)
2047  {
2048  	struct rtrs_path *s = con->c.path;
2049  	struct rtrs_clt_path *clt_path = to_clt_path(s);
2050  	struct rdma_cm_id *cm_id;
2051  	int err;
2052  
2053  	cm_id = rdma_create_id(&init_net, rtrs_clt_rdma_cm_handler, con,
2054  			       clt_path->s.dst_addr.ss_family == AF_IB ?
2055  			       RDMA_PS_IB : RDMA_PS_TCP, IB_QPT_RC);
2056  	if (IS_ERR(cm_id)) {
2057  		err = PTR_ERR(cm_id);
2058  		rtrs_err(s, "Failed to create CM ID, err: %d\n", err);
2059  
2060  		return err;
2061  	}
2062  	con->c.cm_id = cm_id;
2063  	con->cm_err = 0;
2064  	/* allow the port to be reused */
2065  	err = rdma_set_reuseaddr(cm_id, 1);
2066  	if (err != 0) {
2067  		rtrs_err(s, "Set address reuse failed, err: %d\n", err);
2068  		return err;
2069  	}
2070  	err = rdma_resolve_addr(cm_id, (struct sockaddr *)&clt_path->s.src_addr,
2071  				(struct sockaddr *)&clt_path->s.dst_addr,
2072  				RTRS_CONNECT_TIMEOUT_MS);
2073  	if (err) {
2074  		rtrs_err(s, "Failed to resolve address, err: %d\n", err);
2075  		return err;
2076  	}
2077  	/*
2078  	 * Combine connection status and session events. This is needed
2079  	 * for waiting two possible cases: cm_err has something meaningful
2080  	 * or session state was really changed to error by device removal.
2081  	 */
2082  	err = wait_event_interruptible_timeout(
2083  			clt_path->state_wq,
2084  			con->cm_err || clt_path->state != RTRS_CLT_CONNECTING,
2085  			msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS));
2086  	if (err == 0 || err == -ERESTARTSYS) {
2087  		if (err == 0)
2088  			err = -ETIMEDOUT;
2089  		/* Timed out or interrupted */
2090  		return err;
2091  	}
2092  	if (con->cm_err < 0)
2093  		return con->cm_err;
2094  	if (READ_ONCE(clt_path->state) != RTRS_CLT_CONNECTING)
2095  		/* Device removal */
2096  		return -ECONNABORTED;
2097  
2098  	return 0;
2099  }
2100  
rtrs_clt_path_up(struct rtrs_clt_path * clt_path)2101  static void rtrs_clt_path_up(struct rtrs_clt_path *clt_path)
2102  {
2103  	struct rtrs_clt_sess *clt = clt_path->clt;
2104  	int up;
2105  
2106  	/*
2107  	 * We can fire RECONNECTED event only when all paths were
2108  	 * connected on rtrs_clt_open(), then each was disconnected
2109  	 * and the first one connected again.  That's why this nasty
2110  	 * game with counter value.
2111  	 */
2112  
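	/*
	 * Worked example (illustrative): paths_up starts at MAX_PATHS_NUM in
	 * alloc_clt().  With paths_num == 2, the two initial connects push the
	 * counter to MAX_PATHS_NUM + 2, at which point it is normalized to 2.
	 * Only after both paths have gone down (counter hits 0) does the next
	 * path-up take the "up == 1" branch and fire RTRS_CLT_LINK_EV_RECONNECTED.
	 */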
2113  	mutex_lock(&clt->paths_ev_mutex);
2114  	up = ++clt->paths_up;
2115  	/*
2116  	 * Here it is safe to access paths num directly since up counter
2117  	 * is greater than MAX_PATHS_NUM only while rtrs_clt_open() is
2118  	 * in progress, thus paths removals are impossible.
2119  	 * in progress, thus path removals are impossible.
2120  	if (up > MAX_PATHS_NUM && up == MAX_PATHS_NUM + clt->paths_num)
2121  		clt->paths_up = clt->paths_num;
2122  	else if (up == 1)
2123  		clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_RECONNECTED);
2124  	mutex_unlock(&clt->paths_ev_mutex);
2125  
2126  	/* Mark session as established */
2127  	clt_path->established = true;
2128  	clt_path->reconnect_attempts = 0;
2129  	clt_path->stats->reconnects.successful_cnt++;
2130  }
2131  
rtrs_clt_path_down(struct rtrs_clt_path * clt_path)2132  static void rtrs_clt_path_down(struct rtrs_clt_path *clt_path)
2133  {
2134  	struct rtrs_clt_sess *clt = clt_path->clt;
2135  
2136  	if (!clt_path->established)
2137  		return;
2138  
2139  	clt_path->established = false;
2140  	mutex_lock(&clt->paths_ev_mutex);
2141  	WARN_ON(!clt->paths_up);
2142  	if (--clt->paths_up == 0)
2143  		clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_DISCONNECTED);
2144  	mutex_unlock(&clt->paths_ev_mutex);
2145  }
2146  
rtrs_clt_stop_and_destroy_conns(struct rtrs_clt_path * clt_path)2147  static void rtrs_clt_stop_and_destroy_conns(struct rtrs_clt_path *clt_path)
2148  {
2149  	struct rtrs_clt_con *con;
2150  	unsigned int cid;
2151  
2152  	WARN_ON(READ_ONCE(clt_path->state) == RTRS_CLT_CONNECTED);
2153  
2154  	/*
2155  	 * Possible race with rtrs_clt_open(), when DEVICE_REMOVAL comes
2156  	 * exactly in between.  Start destroying after it finishes.
2157  	 */
2158  	mutex_lock(&clt_path->init_mutex);
2159  	mutex_unlock(&clt_path->init_mutex);
2160  
2161  	/*
2162  	 * All IO paths must observe !CONNECTED state before we
2163  	 * free everything.
2164  	 */
2165  	synchronize_rcu();
2166  
2167  	rtrs_stop_hb(&clt_path->s);
2168  
2169  	/*
2170  	 * The order is utterly crucial: first disconnect and complete all
2171  	 * rdma requests with an error (thus setting in_use=false for those requests),
2172  	 * then fail outstanding requests, checking in_use for each, and
2173  	 * finally notify the upper layer about the session disconnection.
2174  	 */
2175  
2176  	for (cid = 0; cid < clt_path->s.con_num; cid++) {
2177  		if (!clt_path->s.con[cid])
2178  			break;
2179  		con = to_clt_con(clt_path->s.con[cid]);
2180  		stop_cm(con);
2181  	}
2182  	fail_all_outstanding_reqs(clt_path);
2183  	free_path_reqs(clt_path);
2184  	rtrs_clt_path_down(clt_path);
2185  
2186  	/*
2187  	 * Wait for a graceful shutdown, namely when the peer side invokes
2188  	 * rdma_disconnect(). 'connected_cnt' is decremented only on
2189  	 * CM events, thus if the other side has crashed and the hb has detected
2190  	 * that something is wrong, here we will be stuck for exactly the timeout,
2191  	 * since CM does not fire anything.  That is fine, we are not in
2192  	 * a hurry.
2193  	 */
2194  	wait_event_timeout(clt_path->state_wq,
2195  			   !atomic_read(&clt_path->connected_cnt),
2196  			   msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS));
2197  
2198  	for (cid = 0; cid < clt_path->s.con_num; cid++) {
2199  		if (!clt_path->s.con[cid])
2200  			break;
2201  		con = to_clt_con(clt_path->s.con[cid]);
2202  		mutex_lock(&con->con_mutex);
2203  		destroy_con_cq_qp(con);
2204  		mutex_unlock(&con->con_mutex);
2205  		destroy_cm(con);
2206  		destroy_con(con);
2207  	}
2208  }
2209  
rtrs_clt_remove_path_from_arr(struct rtrs_clt_path * clt_path)2210  static void rtrs_clt_remove_path_from_arr(struct rtrs_clt_path *clt_path)
2211  {
2212  	struct rtrs_clt_sess *clt = clt_path->clt;
2213  	struct rtrs_clt_path *next;
2214  	bool wait_for_grace = false;
2215  	int cpu;
2216  
2217  	mutex_lock(&clt->paths_mutex);
2218  	list_del_rcu(&clt_path->s.entry);
2219  
2220  	/* Make sure everybody observes path removal. */
2221  	synchronize_rcu();
2222  
2223  	/*
2224  	 * At this point nobody sees @sess in the list, but still we have
2225  	 * dangling pointer @pcpu_path which _can_ point to @sess.  Since
2226  	 * nobody can observe @sess in the list, we guarantee that IO path
2227  	 * will not assign @sess to @pcpu_path, i.e. @pcpu_path can be equal
2228  	 * to @sess, but can never again become @sess.
2229  	 */
2230  
2231  	/*
2232  	 * Decrement the paths number only after a grace period, because
2233  	 * a caller of do_each_path() must first observe the list without
2234  	 * the path and only then the decremented paths number.
2235  	 *
2236  	 * Otherwise there can be the following situation:
2237  	 *    o Two paths exist and IO is coming.
2238  	 *    o One path is removed:
2239  	 *      CPU#0                          CPU#1
2240  	 *      do_each_path():                rtrs_clt_remove_path_from_arr():
2241  	 *          path = get_next_path()
2242  	 *          ^^^                            list_del_rcu(path)
2243  	 *          [!CONNECTED path]              clt->paths_num--
2244  	 *                                              ^^^^^^^^^
2245  	 *          load clt->paths_num                 from 2 to 1
2246  	 *                    ^^^^^^^^^
2247  	 *                    sees 1
2248  	 *
2249  	 *      path is observed as !CONNECTED, but do_each_path() loop
2250  	 *      ends, because expression i < clt->paths_num is false.
2251  	 */
2252  	clt->paths_num--;
2253  
2254  	/*
2255  	 * Get the @next path after the current @sess which is going to be
2256  	 * removed.  If @sess is the last element, then @next is NULL.
2257  	 */
2258  	rcu_read_lock();
2259  	next = rtrs_clt_get_next_path_or_null(&clt->paths_list, clt_path);
2260  	rcu_read_unlock();
2261  
2262  	/*
2263  	 * @pcpu paths can still point to the path which is going to be
2264  	 * removed, so change the pointer manually.
2265  	 */
2266  	for_each_possible_cpu(cpu) {
2267  		struct rtrs_clt_path __rcu **ppcpu_path;
2268  
2269  		ppcpu_path = per_cpu_ptr(clt->pcpu_path, cpu);
2270  		if (rcu_dereference_protected(*ppcpu_path,
2271  			lockdep_is_held(&clt->paths_mutex)) != clt_path)
2272  			/*
2273  			 * synchronize_rcu() was called just after deleting the
2274  			 * entry from the list, thus the IO code path cannot
2275  			 * change the pointer back to the one which is going
2276  			 * to be removed; we are safe here.
2277  			 */
2278  			continue;
2279  
2280  		/*
2281  		 * We race with IO code path, which also changes pointer,
2282  		 * thus we have to be careful not to overwrite it.
2283  		 */
2284  		if (try_cmpxchg((struct rtrs_clt_path **)ppcpu_path, &clt_path,
2285  				next))
2286  			/*
2287  			 * @ppcpu_path was successfully replaced with @next,
2288  			 * which means that someone could also have picked up
2289  			 * @sess and be dereferencing it right now, so waiting for
2290  			 * a grace period is required.
2291  			 */
2292  			wait_for_grace = true;
2293  	}
2294  	if (wait_for_grace)
2295  		synchronize_rcu();
2296  
2297  	mutex_unlock(&clt->paths_mutex);
2298  }
2299  
rtrs_clt_add_path_to_arr(struct rtrs_clt_path * clt_path)2300  static void rtrs_clt_add_path_to_arr(struct rtrs_clt_path *clt_path)
2301  {
2302  	struct rtrs_clt_sess *clt = clt_path->clt;
2303  
2304  	mutex_lock(&clt->paths_mutex);
2305  	clt->paths_num++;
2306  
2307  	list_add_tail_rcu(&clt_path->s.entry, &clt->paths_list);
2308  	mutex_unlock(&clt->paths_mutex);
2309  }
2310  
rtrs_clt_close_work(struct work_struct * work)2311  static void rtrs_clt_close_work(struct work_struct *work)
2312  {
2313  	struct rtrs_clt_path *clt_path;
2314  
2315  	clt_path = container_of(work, struct rtrs_clt_path, close_work);
2316  
2317  	cancel_work_sync(&clt_path->err_recovery_work);
2318  	cancel_delayed_work_sync(&clt_path->reconnect_dwork);
2319  	rtrs_clt_stop_and_destroy_conns(clt_path);
2320  	rtrs_clt_change_state_get_old(clt_path, RTRS_CLT_CLOSED, NULL);
2321  }
2322  
init_conns(struct rtrs_clt_path * clt_path)2323  static int init_conns(struct rtrs_clt_path *clt_path)
2324  {
2325  	unsigned int cid;
2326  	int err, i;
2327  
2328  	/*
2329  	 * On every new set of session connections increase the reconnect counter
2330  	 * to avoid clashes with previous sessions that are not yet closed
2331  	 * on the server side.
2332  	 */
2333  	clt_path->s.recon_cnt++;
2334  
2335  	/* Establish all RDMA connections  */
2336  	for (cid = 0; cid < clt_path->s.con_num; cid++) {
2337  		err = create_con(clt_path, cid);
2338  		if (err)
2339  			goto destroy;
2340  
2341  		err = create_cm(to_clt_con(clt_path->s.con[cid]));
2342  		if (err)
2343  			goto destroy;
2344  	}
2345  
2346  	/*
2347  	 * Set the cid to con_num - 1, since if we fail later, we want to stay in bounds.
2348  	 */
2349  	cid = clt_path->s.con_num - 1;
2350  
2351  	err = alloc_path_reqs(clt_path);
2352  	if (err)
2353  		goto destroy;
2354  
2355  	return 0;
2356  
2357  destroy:
2358  	/* Make sure we do the cleanup in the order the connections were created */
2359  	for (i = 0; i <= cid; i++) {
2360  		struct rtrs_clt_con *con;
2361  
2362  		if (!clt_path->s.con[i])
2363  			break;
2364  
2365  		con = to_clt_con(clt_path->s.con[i]);
2366  		if (con->c.cm_id) {
2367  			stop_cm(con);
2368  			mutex_lock(&con->con_mutex);
2369  			destroy_con_cq_qp(con);
2370  			mutex_unlock(&con->con_mutex);
2371  			destroy_cm(con);
2372  		}
2373  		destroy_con(con);
2374  	}
2375  	/*
2376  	 * If we've never taken the async path and got an error, say,
2377  	 * doing rdma_resolve_addr(), switch to the CONNECTING_ERR state
2378  	 * manually to keep reconnecting.
2379  	 */
2380  	rtrs_clt_change_state_get_old(clt_path, RTRS_CLT_CONNECTING_ERR, NULL);
2381  
2382  	return err;
2383  }
2384  
rtrs_clt_info_req_done(struct ib_cq * cq,struct ib_wc * wc)2385  static void rtrs_clt_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
2386  {
2387  	struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
2388  	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);
2389  	struct rtrs_iu *iu;
2390  
2391  	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
2392  	rtrs_iu_free(iu, clt_path->s.dev->ib_dev, 1);
2393  
2394  	if (wc->status != IB_WC_SUCCESS) {
2395  		rtrs_err(clt_path->clt, "Path info request send failed: %s\n",
2396  			  ib_wc_status_msg(wc->status));
2397  		rtrs_clt_change_state_get_old(clt_path, RTRS_CLT_CONNECTING_ERR, NULL);
2398  		return;
2399  	}
2400  
2401  	rtrs_clt_update_wc_stats(con);
2402  }
2403  
process_info_rsp(struct rtrs_clt_path * clt_path,const struct rtrs_msg_info_rsp * msg)2404  static int process_info_rsp(struct rtrs_clt_path *clt_path,
2405  			    const struct rtrs_msg_info_rsp *msg)
2406  {
2407  	unsigned int sg_cnt, total_len;
2408  	int i, sgi;
2409  
2410  	sg_cnt = le16_to_cpu(msg->sg_cnt);
2411  	if (!sg_cnt || (clt_path->queue_depth % sg_cnt)) {
2412  		rtrs_err(clt_path->clt,
2413  			  "Incorrect sg_cnt %d, queue depth is not a multiple of it\n",
2414  			  sg_cnt);
2415  		return -EINVAL;
2416  	}
2417  
2418  	/*
2419  	 * Check if IB immediate data size is enough to hold the mem_id and
2420  	 * the offset inside the memory chunk.
2421  	 */
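	/*
	 * Worked example (illustrative, values not from this file): with
	 * sg_cnt = 512 and chunk_size = 64 KiB, the buffer id needs
	 * ilog2(511) + 1 = 9 bits and the offset needs ilog2(65535) + 1 = 16
	 * bits, 25 bits in total, which must fit into MAX_IMM_PAYL_BITS.
	 */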
2422  	if ((ilog2(sg_cnt - 1) + 1) + (ilog2(clt_path->chunk_size - 1) + 1) >
2423  	    MAX_IMM_PAYL_BITS) {
2424  		rtrs_err(clt_path->clt,
2425  			  "RDMA immediate size (%db) not enough to encode %d buffers of size %dB\n",
2426  			  MAX_IMM_PAYL_BITS, sg_cnt, clt_path->chunk_size);
2427  		return -EINVAL;
2428  	}
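	/*
	 * Each descriptor advertises one contiguous server-side region
	 * (addr/rkey/len); carve it into chunk_size pieces and record one
	 * rbuf per chunk until queue_depth rbufs are filled in.
	 */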
2429  	total_len = 0;
2430  	for (sgi = 0, i = 0; sgi < sg_cnt && i < clt_path->queue_depth; sgi++) {
2431  		const struct rtrs_sg_desc *desc = &msg->desc[sgi];
2432  		u32 len, rkey;
2433  		u64 addr;
2434  
2435  		addr = le64_to_cpu(desc->addr);
2436  		rkey = le32_to_cpu(desc->key);
2437  		len  = le32_to_cpu(desc->len);
2438  
2439  		total_len += len;
2440  
2441  		if (!len || (len % clt_path->chunk_size)) {
2442  			rtrs_err(clt_path->clt, "Incorrect [%d].len %d\n",
2443  				  sgi,
2444  				  len);
2445  			return -EINVAL;
2446  		}
2447  		for ( ; len && i < clt_path->queue_depth; i++) {
2448  			clt_path->rbufs[i].addr = addr;
2449  			clt_path->rbufs[i].rkey = rkey;
2450  
2451  			len  -= clt_path->chunk_size;
2452  			addr += clt_path->chunk_size;
2453  		}
2454  	}
2455  	/* Sanity check */
2456  	if (sgi != sg_cnt || i != clt_path->queue_depth) {
2457  		rtrs_err(clt_path->clt,
2458  			 "Incorrect sg vector, not fully mapped\n");
2459  		return -EINVAL;
2460  	}
2461  	if (total_len != clt_path->chunk_size * clt_path->queue_depth) {
2462  		rtrs_err(clt_path->clt, "Incorrect total_len %d\n", total_len);
2463  		return -EINVAL;
2464  	}
2465  
2466  	return 0;
2467  }
2468  
rtrs_clt_info_rsp_done(struct ib_cq * cq,struct ib_wc * wc)2469  static void rtrs_clt_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
2470  {
2471  	struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
2472  	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);
2473  	struct rtrs_msg_info_rsp *msg;
2474  	enum rtrs_clt_state state;
2475  	struct rtrs_iu *iu;
2476  	size_t rx_sz;
2477  	int err;
2478  
2479  	state = RTRS_CLT_CONNECTING_ERR;
2480  
2481  	WARN_ON(con->c.cid);
2482  	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
2483  	if (wc->status != IB_WC_SUCCESS) {
2484  		rtrs_err(clt_path->clt, "Path info response recv failed: %s\n",
2485  			  ib_wc_status_msg(wc->status));
2486  		goto out;
2487  	}
2488  	WARN_ON(wc->opcode != IB_WC_RECV);
2489  
2490  	if (wc->byte_len < sizeof(*msg)) {
2491  		rtrs_err(clt_path->clt, "Path info response is malformed: size %d\n",
2492  			  wc->byte_len);
2493  		goto out;
2494  	}
2495  	ib_dma_sync_single_for_cpu(clt_path->s.dev->ib_dev, iu->dma_addr,
2496  				   iu->size, DMA_FROM_DEVICE);
2497  	msg = iu->buf;
2498  	if (le16_to_cpu(msg->type) != RTRS_MSG_INFO_RSP) {
2499  		rtrs_err(clt_path->clt, "Path info response is malformed: type %d\n",
2500  			  le16_to_cpu(msg->type));
2501  		goto out;
2502  	}
2503  	rx_sz  = sizeof(*msg);
2504  	rx_sz += sizeof(msg->desc[0]) * le16_to_cpu(msg->sg_cnt);
2505  	if (wc->byte_len < rx_sz) {
2506  		rtrs_err(clt_path->clt, "Path info response is malformed: size %d\n",
2507  			  wc->byte_len);
2508  		goto out;
2509  	}
2510  	err = process_info_rsp(clt_path, msg);
2511  	if (err)
2512  		goto out;
2513  
2514  	err = post_recv_path(clt_path);
2515  	if (err)
2516  		goto out;
2517  
2518  	state = RTRS_CLT_CONNECTED;
2519  
2520  out:
2521  	rtrs_clt_update_wc_stats(con);
2522  	rtrs_iu_free(iu, clt_path->s.dev->ib_dev, 1);
2523  	rtrs_clt_change_state_get_old(clt_path, state, NULL);
2524  }
2525  
rtrs_send_path_info(struct rtrs_clt_path * clt_path)2526  static int rtrs_send_path_info(struct rtrs_clt_path *clt_path)
2527  {
2528  	struct rtrs_clt_con *usr_con = to_clt_con(clt_path->s.con[0]);
2529  	struct rtrs_msg_info_req *msg;
2530  	struct rtrs_iu *tx_iu, *rx_iu;
2531  	size_t rx_sz;
2532  	int err;
2533  
2534  	rx_sz  = sizeof(struct rtrs_msg_info_rsp);
2535  	rx_sz += sizeof(struct rtrs_sg_desc) * clt_path->queue_depth;
2536  
2537  	tx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), GFP_KERNEL,
2538  			       clt_path->s.dev->ib_dev, DMA_TO_DEVICE,
2539  			       rtrs_clt_info_req_done);
2540  	rx_iu = rtrs_iu_alloc(1, rx_sz, GFP_KERNEL, clt_path->s.dev->ib_dev,
2541  			       DMA_FROM_DEVICE, rtrs_clt_info_rsp_done);
2542  	if (!tx_iu || !rx_iu) {
2543  		err = -ENOMEM;
2544  		goto out;
2545  	}
2546  	/* Prepare for getting info response */
2547  	err = rtrs_iu_post_recv(&usr_con->c, rx_iu);
2548  	if (err) {
2549  		rtrs_err(clt_path->clt, "rtrs_iu_post_recv(), err: %d\n", err);
2550  		goto out;
2551  	}
2552  	rx_iu = NULL;
2553  
2554  	msg = tx_iu->buf;
2555  	msg->type = cpu_to_le16(RTRS_MSG_INFO_REQ);
2556  	memcpy(msg->pathname, clt_path->s.sessname, sizeof(msg->pathname));
2557  
2558  	ib_dma_sync_single_for_device(clt_path->s.dev->ib_dev,
2559  				      tx_iu->dma_addr,
2560  				      tx_iu->size, DMA_TO_DEVICE);
2561  
2562  	/* Send info request */
2563  	err = rtrs_iu_post_send(&usr_con->c, tx_iu, sizeof(*msg), NULL);
2564  	if (err) {
2565  		rtrs_err(clt_path->clt, "rtrs_iu_post_send(), err: %d\n", err);
2566  		goto out;
2567  	}
2568  	tx_iu = NULL;
2569  
2570  	/* Wait for state change */
2571  	wait_event_interruptible_timeout(clt_path->state_wq,
2572  					 clt_path->state != RTRS_CLT_CONNECTING,
2573  					 msecs_to_jiffies(
2574  						 RTRS_CONNECT_TIMEOUT_MS));
2575  	if (READ_ONCE(clt_path->state) != RTRS_CLT_CONNECTED) {
2576  		if (READ_ONCE(clt_path->state) == RTRS_CLT_CONNECTING_ERR)
2577  			err = -ECONNRESET;
2578  		else
2579  			err = -ETIMEDOUT;
2580  	}
2581  
2582  out:
2583  	if (tx_iu)
2584  		rtrs_iu_free(tx_iu, clt_path->s.dev->ib_dev, 1);
2585  	if (rx_iu)
2586  		rtrs_iu_free(rx_iu, clt_path->s.dev->ib_dev, 1);
2587  	if (err)
2588  		/* If we've never taken the async path because of allocation failures */
2589  		rtrs_clt_change_state_get_old(clt_path,
2590  					      RTRS_CLT_CONNECTING_ERR, NULL);
2591  
2592  	return err;
2593  }
2594  
2595  /**
2596   * init_path() - establishes all path connections and does handshake
2597   * @clt_path: client path.
2598   * In case of error a full close or reconnect procedure should be taken,
2599   * because the reconnect or close async works may already have been started.
2600   */
init_path(struct rtrs_clt_path * clt_path)2601  static int init_path(struct rtrs_clt_path *clt_path)
2602  {
2603  	int err;
2604  	char str[NAME_MAX];
2605  	struct rtrs_addr path = {
2606  		.src = &clt_path->s.src_addr,
2607  		.dst = &clt_path->s.dst_addr,
2608  	};
2609  
2610  	rtrs_addr_to_str(&path, str, sizeof(str));
2611  
2612  	mutex_lock(&clt_path->init_mutex);
2613  	err = init_conns(clt_path);
2614  	if (err) {
2615  		rtrs_err(clt_path->clt,
2616  			 "init_conns() failed: err=%d path=%s [%s:%u]\n", err,
2617  			 str, clt_path->hca_name, clt_path->hca_port);
2618  		goto out;
2619  	}
2620  	err = rtrs_send_path_info(clt_path);
2621  	if (err) {
2622  		rtrs_err(clt_path->clt,
2623  			 "rtrs_send_path_info() failed: err=%d path=%s [%s:%u]\n",
2624  			 err, str, clt_path->hca_name, clt_path->hca_port);
2625  		goto out;
2626  	}
2627  	rtrs_clt_path_up(clt_path);
2628  	rtrs_start_hb(&clt_path->s);
2629  out:
2630  	mutex_unlock(&clt_path->init_mutex);
2631  
2632  	return err;
2633  }
2634  
rtrs_clt_reconnect_work(struct work_struct * work)2635  static void rtrs_clt_reconnect_work(struct work_struct *work)
2636  {
2637  	struct rtrs_clt_path *clt_path;
2638  	struct rtrs_clt_sess *clt;
2639  	int err;
2640  
2641  	clt_path = container_of(to_delayed_work(work), struct rtrs_clt_path,
2642  				reconnect_dwork);
2643  	clt = clt_path->clt;
2644  
2645  	trace_rtrs_clt_reconnect_work(clt_path);
2646  
2647  	if (READ_ONCE(clt_path->state) != RTRS_CLT_RECONNECTING)
2648  		return;
2649  
2650  	if (clt_path->reconnect_attempts >= clt->max_reconnect_attempts) {
2651  		/* Close a path completely if max attempts is reached */
2652  		rtrs_clt_close_conns(clt_path, false);
2653  		return;
2654  	}
2655  	clt_path->reconnect_attempts++;
2656  
2657  	msleep(RTRS_RECONNECT_BACKOFF);
2658  	if (rtrs_clt_change_state_get_old(clt_path, RTRS_CLT_CONNECTING, NULL)) {
2659  		err = init_path(clt_path);
2660  		if (err)
2661  			goto reconnect_again;
2662  	}
2663  
2664  	return;
2665  
2666  reconnect_again:
2667  	if (rtrs_clt_change_state_get_old(clt_path, RTRS_CLT_RECONNECTING, NULL)) {
2668  		clt_path->stats->reconnects.fail_cnt++;
2669  		queue_work(rtrs_wq, &clt_path->err_recovery_work);
2670  	}
2671  }
2672  
rtrs_clt_dev_release(struct device * dev)2673  static void rtrs_clt_dev_release(struct device *dev)
2674  {
2675  	struct rtrs_clt_sess *clt = container_of(dev, struct rtrs_clt_sess,
2676  						 dev);
2677  
2678  	mutex_destroy(&clt->paths_ev_mutex);
2679  	mutex_destroy(&clt->paths_mutex);
2680  	kfree(clt);
2681  }
2682  
alloc_clt(const char * sessname,size_t paths_num,u16 port,size_t pdu_sz,void * priv,void (* link_ev)(void * priv,enum rtrs_clt_link_ev ev),unsigned int reconnect_delay_sec,unsigned int max_reconnect_attempts)2683  static struct rtrs_clt_sess *alloc_clt(const char *sessname, size_t paths_num,
2684  				  u16 port, size_t pdu_sz, void *priv,
2685  				  void	(*link_ev)(void *priv,
2686  						   enum rtrs_clt_link_ev ev),
2687  				  unsigned int reconnect_delay_sec,
2688  				  unsigned int max_reconnect_attempts)
2689  {
2690  	struct rtrs_clt_sess *clt;
2691  	int err;
2692  
2693  	if (!paths_num || paths_num > MAX_PATHS_NUM)
2694  		return ERR_PTR(-EINVAL);
2695  
2696  	if (strlen(sessname) >= sizeof(clt->sessname))
2697  		return ERR_PTR(-EINVAL);
2698  
2699  	clt = kzalloc(sizeof(*clt), GFP_KERNEL);
2700  	if (!clt)
2701  		return ERR_PTR(-ENOMEM);
2702  
2703  	clt->pcpu_path = alloc_percpu(typeof(*clt->pcpu_path));
2704  	if (!clt->pcpu_path) {
2705  		kfree(clt);
2706  		return ERR_PTR(-ENOMEM);
2707  	}
2708  
2709  	clt->dev.class = &rtrs_clt_dev_class;
2710  	clt->dev.release = rtrs_clt_dev_release;
2711  	uuid_gen(&clt->paths_uuid);
2712  	INIT_LIST_HEAD_RCU(&clt->paths_list);
2713  	clt->paths_num = paths_num;
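	/*
	 * paths_up deliberately starts above any real path count so that
	 * rtrs_clt_path_up() can tell the initial connect phase apart from
	 * later reconnects (see the counter game described there).
	 */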
2714  	clt->paths_up = MAX_PATHS_NUM;
2715  	clt->port = port;
2716  	clt->pdu_sz = pdu_sz;
2717  	clt->max_segments = RTRS_MAX_SEGMENTS;
2718  	clt->reconnect_delay_sec = reconnect_delay_sec;
2719  	clt->max_reconnect_attempts = max_reconnect_attempts;
2720  	clt->priv = priv;
2721  	clt->link_ev = link_ev;
2722  	clt->mp_policy = MP_POLICY_MIN_INFLIGHT;
2723  	strscpy(clt->sessname, sessname, sizeof(clt->sessname));
2724  	init_waitqueue_head(&clt->permits_wait);
2725  	mutex_init(&clt->paths_ev_mutex);
2726  	mutex_init(&clt->paths_mutex);
2727  	device_initialize(&clt->dev);
2728  
2729  	err = dev_set_name(&clt->dev, "%s", sessname);
2730  	if (err)
2731  		goto err_put;
2732  
2733  	/*
2734  	 * Suppress user space notification until
2735  	 * sysfs files are created
2736  	 */
2737  	dev_set_uevent_suppress(&clt->dev, true);
2738  	err = device_add(&clt->dev);
2739  	if (err)
2740  		goto err_put;
2741  
2742  	clt->kobj_paths = kobject_create_and_add("paths", &clt->dev.kobj);
2743  	if (!clt->kobj_paths) {
2744  		err = -ENOMEM;
2745  		goto err_del;
2746  	}
2747  	err = rtrs_clt_create_sysfs_root_files(clt);
2748  	if (err) {
2749  		kobject_del(clt->kobj_paths);
2750  		kobject_put(clt->kobj_paths);
2751  		goto err_del;
2752  	}
2753  	dev_set_uevent_suppress(&clt->dev, false);
2754  	kobject_uevent(&clt->dev.kobj, KOBJ_ADD);
2755  
2756  	return clt;
2757  err_del:
2758  	device_del(&clt->dev);
2759  err_put:
2760  	free_percpu(clt->pcpu_path);
2761  	put_device(&clt->dev);
2762  	return ERR_PTR(err);
2763  }
2764  
free_clt(struct rtrs_clt_sess * clt)2765  static void free_clt(struct rtrs_clt_sess *clt)
2766  {
2767  	free_percpu(clt->pcpu_path);
2768  
2769  	/*
2770  	 * The release callback will free clt and destroy the mutexes on the last put
2771  	 */
2772  	device_unregister(&clt->dev);
2773  }
2774  
2775  /**
2776   * rtrs_clt_open() - Open a session to an RTRS server
2777   * @ops: holds the link event callback and the private pointer.
2778   * @pathname: name of the path to an RTRS server
2779   * @paths: Paths to be established defined by their src and dst addresses
2780   * @paths_num: Number of elements in the @paths array
2781   * @port: port to be used by the RTRS session
2782   * @pdu_sz: Size of extra payload which can be accessed after permit allocation.
2783   * @reconnect_delay_sec: time between reconnect tries
2784   * @max_reconnect_attempts: Number of times to reconnect on error before giving
2785   *			    up, 0 for disabled, -1 for forever
2786   * @nr_poll_queues: number of polling mode connections using the IB_POLL_DIRECT flag
2787   *
2788   * Starts session establishment with the rtrs_server. The function can block
2789   * up to ~2000ms before it returns.
2790   *
2791   * Return: a valid pointer on success, otherwise an ERR_PTR.
2792   */
rtrs_clt_open(struct rtrs_clt_ops * ops,const char * pathname,const struct rtrs_addr * paths,size_t paths_num,u16 port,size_t pdu_sz,u8 reconnect_delay_sec,s16 max_reconnect_attempts,u32 nr_poll_queues)2793  struct rtrs_clt_sess *rtrs_clt_open(struct rtrs_clt_ops *ops,
2794  				 const char *pathname,
2795  				 const struct rtrs_addr *paths,
2796  				 size_t paths_num, u16 port,
2797  				 size_t pdu_sz, u8 reconnect_delay_sec,
2798  				 s16 max_reconnect_attempts, u32 nr_poll_queues)
2799  {
2800  	struct rtrs_clt_path *clt_path, *tmp;
2801  	struct rtrs_clt_sess *clt;
2802  	int err, i;
2803  
2804  	if (strchr(pathname, '/') || strchr(pathname, '.')) {
2805  		pr_err("pathname cannot contain '/' or '.'\n");
2806  		err = -EINVAL;
2807  		goto out;
2808  	}
2809  
2810  	clt = alloc_clt(pathname, paths_num, port, pdu_sz, ops->priv,
2811  			ops->link_ev,
2812  			reconnect_delay_sec,
2813  			max_reconnect_attempts);
2814  	if (IS_ERR(clt)) {
2815  		err = PTR_ERR(clt);
2816  		goto out;
2817  	}
2818  	for (i = 0; i < paths_num; i++) {
2819  		struct rtrs_clt_path *clt_path;
2820  
2821  		clt_path = alloc_path(clt, &paths[i], nr_cpu_ids,
2822  				  nr_poll_queues);
2823  		if (IS_ERR(clt_path)) {
2824  			err = PTR_ERR(clt_path);
2825  			goto close_all_path;
2826  		}
2827  		if (!i)
2828  			clt_path->for_new_clt = 1;
2829  		list_add_tail_rcu(&clt_path->s.entry, &clt->paths_list);
2830  
2831  		err = init_path(clt_path);
2832  		if (err) {
2833  			list_del_rcu(&clt_path->s.entry);
2834  			rtrs_clt_close_conns(clt_path, true);
2835  			free_percpu(clt_path->stats->pcpu_stats);
2836  			kfree(clt_path->stats);
2837  			free_path(clt_path);
2838  			goto close_all_path;
2839  		}
2840  
2841  		err = rtrs_clt_create_path_files(clt_path);
2842  		if (err) {
2843  			list_del_rcu(&clt_path->s.entry);
2844  			rtrs_clt_close_conns(clt_path, true);
2845  			free_percpu(clt_path->stats->pcpu_stats);
2846  			kfree(clt_path->stats);
2847  			free_path(clt_path);
2848  			goto close_all_path;
2849  		}
2850  	}
2851  	err = alloc_permits(clt);
2852  	if (err)
2853  		goto close_all_path;
2854  
2855  	return clt;
2856  
2857  close_all_path:
2858  	list_for_each_entry_safe(clt_path, tmp, &clt->paths_list, s.entry) {
2859  		rtrs_clt_destroy_path_files(clt_path, NULL);
2860  		rtrs_clt_close_conns(clt_path, true);
2861  		kobject_put(&clt_path->kobj);
2862  	}
2863  	rtrs_clt_destroy_sysfs_root(clt);
2864  	free_clt(clt);
2865  
2866  out:
2867  	return ERR_PTR(err);
2868  }
2869  EXPORT_SYMBOL(rtrs_clt_open);
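/*
 * Hedged usage sketch (illustrative only, not part of the original file):
 * a ULP would typically open a session roughly like this.  The names
 * my_link_ev, my_priv, paths, paths_num, port and pdu_sz are caller-side
 * placeholders, not symbols defined in this file.
 *
 *	static void my_link_ev(void *priv, enum rtrs_clt_link_ev ev)
 *	{
 *		(react to RTRS_CLT_LINK_EV_RECONNECTED / _DISCONNECTED here)
 *	}
 *
 *	struct rtrs_clt_ops ops = { .priv = my_priv, .link_ev = my_link_ev };
 *	struct rtrs_clt_sess *sess;
 *
 *	sess = rtrs_clt_open(&ops, "my_session", paths, paths_num, port,
 *			     pdu_sz, 30, -1, 0);
 *	if (IS_ERR(sess))
 *		return PTR_ERR(sess);
 *
 * The session is torn down again with rtrs_clt_close(sess).
 */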
2870  
2871  /**
2872   * rtrs_clt_close() - Close a session
2873   * @clt: Session handle. Session is freed upon return.
2874   */
rtrs_clt_close(struct rtrs_clt_sess * clt)2875  void rtrs_clt_close(struct rtrs_clt_sess *clt)
2876  {
2877  	struct rtrs_clt_path *clt_path, *tmp;
2878  
2879  	/* Firstly forbid sysfs access */
2880  	rtrs_clt_destroy_sysfs_root(clt);
2881  
2882  	/* Now it is safe to iterate over all paths without locks */
2883  	list_for_each_entry_safe(clt_path, tmp, &clt->paths_list, s.entry) {
2884  		rtrs_clt_close_conns(clt_path, true);
2885  		rtrs_clt_destroy_path_files(clt_path, NULL);
2886  		kobject_put(&clt_path->kobj);
2887  	}
2888  	free_permits(clt);
2889  	free_clt(clt);
2890  }
2891  EXPORT_SYMBOL(rtrs_clt_close);
2892  
rtrs_clt_reconnect_from_sysfs(struct rtrs_clt_path * clt_path)2893  int rtrs_clt_reconnect_from_sysfs(struct rtrs_clt_path *clt_path)
2894  {
2895  	enum rtrs_clt_state old_state;
2896  	int err = -EBUSY;
2897  	bool changed;
2898  
2899  	changed = rtrs_clt_change_state_get_old(clt_path,
2900  						 RTRS_CLT_RECONNECTING,
2901  						 &old_state);
2902  	if (changed) {
2903  		clt_path->reconnect_attempts = 0;
2904  		rtrs_clt_stop_and_destroy_conns(clt_path);
2905  		queue_delayed_work(rtrs_wq, &clt_path->reconnect_dwork, 0);
2906  	}
2907  	if (changed || old_state == RTRS_CLT_RECONNECTING) {
2908  		/*
2909  		 * flush_delayed_work() queues pending work for immediate
2910  		 * execution, so do the flush if we have queued something
2911  		 * right now or work is pending.
2912  		 */
2913  		flush_delayed_work(&clt_path->reconnect_dwork);
2914  		err = (READ_ONCE(clt_path->state) ==
2915  		       RTRS_CLT_CONNECTED ? 0 : -ENOTCONN);
2916  	}
2917  
2918  	return err;
2919  }
2920  
rtrs_clt_remove_path_from_sysfs(struct rtrs_clt_path * clt_path,const struct attribute * sysfs_self)2921  int rtrs_clt_remove_path_from_sysfs(struct rtrs_clt_path *clt_path,
2922  				     const struct attribute *sysfs_self)
2923  {
2924  	enum rtrs_clt_state old_state;
2925  	bool changed;
2926  
2927  	/*
2928  	 * Continue stopping the path until its state has been changed to DEAD or
2929  	 * has been observed as DEAD:
2930  	 * 1. State was changed to DEAD - we were fast and nobody
2931  	 *    invoked rtrs_clt_reconnect(), which can again start
2932  	 *    reconnecting.
2933  	 * 2. State was observed as DEAD - we have someone in parallel
2934  	 *    removing the path.
2935  	 */
2936  	do {
2937  		rtrs_clt_close_conns(clt_path, true);
2938  		changed = rtrs_clt_change_state_get_old(clt_path,
2939  							RTRS_CLT_DEAD,
2940  							&old_state);
2941  	} while (!changed && old_state != RTRS_CLT_DEAD);
2942  
2943  	if (changed) {
2944  		rtrs_clt_remove_path_from_arr(clt_path);
2945  		rtrs_clt_destroy_path_files(clt_path, sysfs_self);
2946  		kobject_put(&clt_path->kobj);
2947  	}
2948  
2949  	return 0;
2950  }
2951  
rtrs_clt_set_max_reconnect_attempts(struct rtrs_clt_sess * clt,int value)2952  void rtrs_clt_set_max_reconnect_attempts(struct rtrs_clt_sess *clt, int value)
2953  {
2954  	clt->max_reconnect_attempts = (unsigned int)value;
2955  }
2956  
rtrs_clt_get_max_reconnect_attempts(const struct rtrs_clt_sess * clt)2957  int rtrs_clt_get_max_reconnect_attempts(const struct rtrs_clt_sess *clt)
2958  {
2959  	return (int)clt->max_reconnect_attempts;
2960  }
2961  
2962  /**
2963   * rtrs_clt_request() - Request data transfer to/from server via RDMA.
2964   *
2965   * @dir:	READ/WRITE
2966   * @ops:	holds the confirmation callback and the private pointer.
2967   * @clt:	Session
2968   * @permit:	Preallocated permit
2969   * @vec:	Message that is sent to server together with the request.
2970   *		The sum of the lengths of all @vec elements is limited to IO_MSG_SIZE.
2971   *		Since the msg is copied internally it can be allocated on stack.
2972   * @nr:		Number of elements in @vec.
2973   * @data_len:	length of data sent to/from server
2974   * @sg:		Pages to be sent/received to/from server.
2975   * @sg_cnt:	Number of elements in the @sg
2976   *
2977   * Return:
2978   * 0:		Success
2979   * <0:		Error
2980   *
2981   * On dir=READ rtrs client will request a data transfer from Server to client.
2982   * The data that the server will respond with will be stored in @sg when
2983   * the user receives an %RTRS_CLT_RDMA_EV_RDMA_REQUEST_WRITE_COMPL event.
2984   * On dir=WRITE the rtrs client will RDMA-write the data in @sg to the server side.
2985   */
rtrs_clt_request(int dir,struct rtrs_clt_req_ops * ops,struct rtrs_clt_sess * clt,struct rtrs_permit * permit,const struct kvec * vec,size_t nr,size_t data_len,struct scatterlist * sg,unsigned int sg_cnt)2986  int rtrs_clt_request(int dir, struct rtrs_clt_req_ops *ops,
2987  		     struct rtrs_clt_sess *clt, struct rtrs_permit *permit,
2988  		     const struct kvec *vec, size_t nr, size_t data_len,
2989  		     struct scatterlist *sg, unsigned int sg_cnt)
2990  {
2991  	struct rtrs_clt_io_req *req;
2992  	struct rtrs_clt_path *clt_path;
2993  
2994  	enum dma_data_direction dma_dir;
2995  	int err = -ECONNABORTED, i;
2996  	size_t usr_len, hdr_len;
2997  	struct path_it it;
2998  
2999  	/* Get kvec length */
3000  	for (i = 0, usr_len = 0; i < nr; i++)
3001  		usr_len += vec[i].iov_len;
3002  
3003  	if (dir == READ) {
3004  		hdr_len = sizeof(struct rtrs_msg_rdma_read) +
3005  			  sg_cnt * sizeof(struct rtrs_sg_desc);
3006  		dma_dir = DMA_FROM_DEVICE;
3007  	} else {
3008  		hdr_len = sizeof(struct rtrs_msg_rdma_write);
3009  		dma_dir = DMA_TO_DEVICE;
3010  	}
3011  
3012  	rcu_read_lock();
3013  	for (path_it_init(&it, clt);
3014  	     (clt_path = it.next_path(&it)) && it.i < it.clt->paths_num; it.i++) {
3015  		if (READ_ONCE(clt_path->state) != RTRS_CLT_CONNECTED)
3016  			continue;
3017  
3018  		if (usr_len + hdr_len > clt_path->max_hdr_size) {
3019  			rtrs_wrn_rl(clt_path->clt,
3020  				     "%s request failed, user message size is %zu and header length %zu, but max size is %u\n",
3021  				     dir == READ ? "Read" : "Write",
3022  				     usr_len, hdr_len, clt_path->max_hdr_size);
3023  			err = -EMSGSIZE;
3024  			break;
3025  		}
3026  		req = rtrs_clt_get_req(clt_path, ops->conf_fn, permit, ops->priv,
3027  				       vec, usr_len, sg, sg_cnt, data_len,
3028  				       dma_dir);
3029  		if (dir == READ)
3030  			err = rtrs_clt_read_req(req);
3031  		else
3032  			err = rtrs_clt_write_req(req);
3033  		if (err) {
3034  			req->in_use = false;
3035  			continue;
3036  		}
3037  		/* Success path */
3038  		break;
3039  	}
3040  	path_it_deinit(&it);
3041  	rcu_read_unlock();
3042  
3043  	return err;
3044  }
3045  EXPORT_SYMBOL(rtrs_clt_request);
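/*
 * Hedged usage sketch (illustrative only, not part of the original file):
 * issuing a WRITE request on an open session.  The permit is assumed to
 * have been obtained earlier via rtrs_clt_get_permit() (arguments omitted);
 * sess, permit, conf_fn, priv, vec, data_len, sg and sg_cnt are caller-side
 * placeholders.
 *
 *	struct rtrs_clt_req_ops req_ops = { .priv = priv, .conf_fn = conf_fn };
 *	int err;
 *
 *	err = rtrs_clt_request(WRITE, &req_ops, sess, permit,
 *			       vec, 1, data_len, sg, sg_cnt);
 *	if (err)
 *		return err;
 *
 * A negative value (e.g. -ECONNABORTED when no path is CONNECTED) is
 * returned on failure; completion is reported later via conf_fn.
 */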
3046  
rtrs_clt_rdma_cq_direct(struct rtrs_clt_sess * clt,unsigned int index)3047  int rtrs_clt_rdma_cq_direct(struct rtrs_clt_sess *clt, unsigned int index)
3048  {
3049  	/* If there is no path, return -1 so the block layer does not try again */
3050  	int cnt = -1;
3051  	struct rtrs_con *con;
3052  	struct rtrs_clt_path *clt_path;
3053  	struct path_it it;
3054  
3055  	rcu_read_lock();
3056  	for (path_it_init(&it, clt);
3057  	     (clt_path = it.next_path(&it)) && it.i < it.clt->paths_num; it.i++) {
3058  		if (READ_ONCE(clt_path->state) != RTRS_CLT_CONNECTED)
3059  			continue;
3060  
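		/* con[0] is the service (user) connection; IO connections start at index 1 */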
3061  		con = clt_path->s.con[index + 1];
3062  		cnt = ib_process_cq_direct(con->cq, -1);
3063  		if (cnt)
3064  			break;
3065  	}
3066  	path_it_deinit(&it);
3067  	rcu_read_unlock();
3068  
3069  	return cnt;
3070  }
3071  EXPORT_SYMBOL(rtrs_clt_rdma_cq_direct);
3072  
3073  /**
3074   * rtrs_clt_query() - queries RTRS session attributes
3075   * @clt: session pointer
3076   * @attr: query results for session attributes.
3077   * Returns:
3078   *    0 on success
3079   *    -ECOMM		no connection to the server
3080   */
rtrs_clt_query(struct rtrs_clt_sess * clt,struct rtrs_attrs * attr)3081  int rtrs_clt_query(struct rtrs_clt_sess *clt, struct rtrs_attrs *attr)
3082  {
3083  	if (!rtrs_clt_is_connected(clt))
3084  		return -ECOMM;
3085  
3086  	attr->queue_depth      = clt->queue_depth;
3087  	attr->max_segments     = clt->max_segments;
3088  	/* Cap max_io_size to min of remote buffer size and the fr pages */
3089  	attr->max_io_size = min_t(int, clt->max_io_size,
3090  				  clt->max_segments * SZ_4K);
3091  
3092  	return 0;
3093  }
3094  EXPORT_SYMBOL(rtrs_clt_query);
3095  
rtrs_clt_create_path_from_sysfs(struct rtrs_clt_sess * clt,struct rtrs_addr * addr)3096  int rtrs_clt_create_path_from_sysfs(struct rtrs_clt_sess *clt,
3097  				     struct rtrs_addr *addr)
3098  {
3099  	struct rtrs_clt_path *clt_path;
3100  	int err;
3101  
3102  	clt_path = alloc_path(clt, addr, nr_cpu_ids, 0);
3103  	if (IS_ERR(clt_path))
3104  		return PTR_ERR(clt_path);
3105  
3106  	mutex_lock(&clt->paths_mutex);
3107  	if (clt->paths_num == 0) {
3108  		/*
3109  		 * When all the paths are removed for a session,
3110  		 * the addition of the first path is like a new session for
3111  		 * the storage server
3112  		 */
3113  		clt_path->for_new_clt = 1;
3114  	}
3115  
3116  	mutex_unlock(&clt->paths_mutex);
3117  
3118  	/*
3119  	 * It is totally safe to add a path in the CONNECTING state: incoming
3120  	 * IO will never grab it.  Also it is very important to add the
3121  	 * path before init, since init fires the LINK_CONNECTED event.
3122  	 */
3123  	rtrs_clt_add_path_to_arr(clt_path);
3124  
3125  	err = init_path(clt_path);
3126  	if (err)
3127  		goto close_path;
3128  
3129  	err = rtrs_clt_create_path_files(clt_path);
3130  	if (err)
3131  		goto close_path;
3132  
3133  	return 0;
3134  
3135  close_path:
3136  	rtrs_clt_remove_path_from_arr(clt_path);
3137  	rtrs_clt_close_conns(clt_path, true);
3138  	free_percpu(clt_path->stats->pcpu_stats);
3139  	kfree(clt_path->stats);
3140  	free_path(clt_path);
3141  
3142  	return err;
3143  }
3144  
rtrs_clt_ib_dev_init(struct rtrs_ib_dev * dev)3145  static int rtrs_clt_ib_dev_init(struct rtrs_ib_dev *dev)
3146  {
3147  	if (!(dev->ib_dev->attrs.device_cap_flags &
3148  	      IB_DEVICE_MEM_MGT_EXTENSIONS)) {
3149  		pr_err("Memory registrations not supported.\n");
3150  		return -ENOTSUPP;
3151  	}
3152  
3153  	return 0;
3154  }
3155  
3156  static const struct rtrs_rdma_dev_pd_ops dev_pd_ops = {
3157  	.init = rtrs_clt_ib_dev_init
3158  };
3159  
rtrs_client_init(void)3160  static int __init rtrs_client_init(void)
3161  {
3162  	int ret = 0;
3163  
3164  	rtrs_rdma_dev_pd_init(0, &dev_pd);
3165  	ret = class_register(&rtrs_clt_dev_class);
3166  	if (ret) {
3167  		pr_err("Failed to create rtrs-client dev class\n");
3168  		return ret;
3169  	}
3170  	rtrs_wq = alloc_workqueue("rtrs_client_wq", 0, 0);
3171  	if (!rtrs_wq) {
3172  		class_unregister(&rtrs_clt_dev_class);
3173  		return -ENOMEM;
3174  	}
3175  
3176  	return 0;
3177  }
3178  
rtrs_client_exit(void)3179  static void __exit rtrs_client_exit(void)
3180  {
3181  	destroy_workqueue(rtrs_wq);
3182  	class_unregister(&rtrs_clt_dev_class);
3183  	rtrs_rdma_dev_pd_deinit(&dev_pd);
3184  }
3185  
3186  module_init(rtrs_client_init);
3187  module_exit(rtrs_client_exit);
3188