1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* 3 * RDMA Transport Layer 4 * 5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved. 6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved. 7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved. 8 */ 9 10 #undef pr_fmt 11 #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt 12 13 #include <linux/module.h> 14 #include <linux/rculist.h> 15 #include <linux/random.h> 16 17 #include "rtrs-clt.h" 18 #include "rtrs-log.h" 19 20 #define RTRS_CONNECT_TIMEOUT_MS 30000 21 /* 22 * Wait a bit before trying to reconnect after a failure 23 * in order to give server time to finish clean up which 24 * leads to "false positives" failed reconnect attempts 25 */ 26 #define RTRS_RECONNECT_BACKOFF 1000 27 /* 28 * Wait for additional random time between 0 and 8 seconds 29 * before starting to reconnect to avoid clients reconnecting 30 * all at once in case of a major network outage 31 */ 32 #define RTRS_RECONNECT_SEED 8 33 34 MODULE_DESCRIPTION("RDMA Transport Client"); 35 MODULE_LICENSE("GPL"); 36 37 static const struct rtrs_rdma_dev_pd_ops dev_pd_ops; 38 static struct rtrs_rdma_dev_pd dev_pd = { 39 .ops = &dev_pd_ops 40 }; 41 42 static struct workqueue_struct *rtrs_wq; 43 static struct class *rtrs_clt_dev_class; 44 45 static inline bool rtrs_clt_is_connected(const struct rtrs_clt *clt) 46 { 47 struct rtrs_clt_sess *sess; 48 bool connected = false; 49 50 rcu_read_lock(); 51 list_for_each_entry_rcu(sess, &clt->paths_list, s.entry) 52 connected |= READ_ONCE(sess->state) == RTRS_CLT_CONNECTED; 53 rcu_read_unlock(); 54 55 return connected; 56 } 57 58 static struct rtrs_permit * 59 __rtrs_get_permit(struct rtrs_clt *clt, enum rtrs_clt_con_type con_type) 60 { 61 size_t max_depth = clt->queue_depth; 62 struct rtrs_permit *permit; 63 int bit; 64 65 /* 66 * Adapted from null_blk get_tag(). Callers from different cpus may 67 * grab the same bit, since find_first_zero_bit is not atomic. 
68 * But then the test_and_set_bit_lock will fail for all the 69 * callers but one, so that they will loop again. 70 * This way an explicit spinlock is not required. 71 */ 72 do { 73 bit = find_first_zero_bit(clt->permits_map, max_depth); 74 if (unlikely(bit >= max_depth)) 75 return NULL; 76 } while (unlikely(test_and_set_bit_lock(bit, clt->permits_map))); 77 78 permit = get_permit(clt, bit); 79 WARN_ON(permit->mem_id != bit); 80 permit->cpu_id = raw_smp_processor_id(); 81 permit->con_type = con_type; 82 83 return permit; 84 } 85 86 static inline void __rtrs_put_permit(struct rtrs_clt *clt, 87 struct rtrs_permit *permit) 88 { 89 clear_bit_unlock(permit->mem_id, clt->permits_map); 90 } 91 92 /** 93 * rtrs_clt_get_permit() - allocates permit for future RDMA operation 94 * @clt: Current session 95 * @con_type: Type of connection to use with the permit 96 * @can_wait: Wait type 97 * 98 * Description: 99 * Allocates permit for the following RDMA operation. Permit is used 100 * to preallocate all resources and to propagate memory pressure 101 * up earlier. 
102 * 103 * Context: 104 * Can sleep if @wait == RTRS_TAG_WAIT 105 */ 106 struct rtrs_permit *rtrs_clt_get_permit(struct rtrs_clt *clt, 107 enum rtrs_clt_con_type con_type, 108 int can_wait) 109 { 110 struct rtrs_permit *permit; 111 DEFINE_WAIT(wait); 112 113 permit = __rtrs_get_permit(clt, con_type); 114 if (likely(permit) || !can_wait) 115 return permit; 116 117 do { 118 prepare_to_wait(&clt->permits_wait, &wait, 119 TASK_UNINTERRUPTIBLE); 120 permit = __rtrs_get_permit(clt, con_type); 121 if (likely(permit)) 122 break; 123 124 io_schedule(); 125 } while (1); 126 127 finish_wait(&clt->permits_wait, &wait); 128 129 return permit; 130 } 131 EXPORT_SYMBOL(rtrs_clt_get_permit); 132 133 /** 134 * rtrs_clt_put_permit() - puts allocated permit 135 * @clt: Current session 136 * @permit: Permit to be freed 137 * 138 * Context: 139 * Does not matter 140 */ 141 void rtrs_clt_put_permit(struct rtrs_clt *clt, struct rtrs_permit *permit) 142 { 143 if (WARN_ON(!test_bit(permit->mem_id, clt->permits_map))) 144 return; 145 146 __rtrs_put_permit(clt, permit); 147 148 /* 149 * rtrs_clt_get_permit() adds itself to the &clt->permits_wait list 150 * before calling schedule(). So if rtrs_clt_get_permit() is sleeping 151 * it must have added itself to &clt->permits_wait before 152 * __rtrs_put_permit() finished. 153 * Hence it is safe to guard wake_up() with a waitqueue_active() test. 154 */ 155 if (waitqueue_active(&clt->permits_wait)) 156 wake_up(&clt->permits_wait); 157 } 158 EXPORT_SYMBOL(rtrs_clt_put_permit); 159 160 void *rtrs_permit_to_pdu(struct rtrs_permit *permit) 161 { 162 return permit + 1; 163 } 164 EXPORT_SYMBOL(rtrs_permit_to_pdu); 165 166 /** 167 * rtrs_permit_to_clt_con() - returns RDMA connection pointer by the permit 168 * @sess: client session pointer 169 * @permit: permit for the allocation of the RDMA buffer 170 * Note: 171 * IO connection starts from 1. 172 * 0 connection is for user messages. 
 */
static
struct rtrs_clt_con *rtrs_permit_to_clt_con(struct rtrs_clt_sess *sess,
					    struct rtrs_permit *permit)
{
	int id = 0;

	/*
	 * IO permits are spread over connections 1..con_num-1 using the
	 * CPU id stored in the permit; everything else uses connection 0.
	 */
	if (likely(permit->con_type == RTRS_IO_CON))
		id = (permit->cpu_id % (sess->s.con_num - 1)) + 1;

	return to_clt_con(sess->s.con[id]);
}

/**
 * __rtrs_clt_change_state() - change the session state through session state
 * machine.
 *
 * @sess: client session to change the state of.
 * @new_state: state to change to.
 *
 * Allowed transitions (new state <- old states):
 *   CONNECTING     <- RECONNECTING
 *   RECONNECTING   <- CONNECTED, CONNECTING_ERR, CLOSED
 *   CONNECTED      <- CONNECTING
 *   CONNECTING_ERR <- CONNECTING
 *   CLOSING        <- CONNECTING, CONNECTING_ERR, RECONNECTING, CONNECTED
 *   CLOSED         <- CLOSING
 *   DEAD           <- CLOSED
 *
 * On a successful change waiters on sess->state_wq are woken up.
 *
 * returns true if successful, false if the requested state can not be set.
 *
 * Locks:
 * state_wq lock must be held.
 */
static bool __rtrs_clt_change_state(struct rtrs_clt_sess *sess,
				     enum rtrs_clt_state new_state)
{
	enum rtrs_clt_state old_state;
	bool changed = false;

	lockdep_assert_held(&sess->state_wq.lock);

	old_state = sess->state;
	switch (new_state) {
	case RTRS_CLT_CONNECTING:
		switch (old_state) {
		case RTRS_CLT_RECONNECTING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_RECONNECTING:
		switch (old_state) {
		case RTRS_CLT_CONNECTED:
		case RTRS_CLT_CONNECTING_ERR:
		case RTRS_CLT_CLOSED:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_CONNECTED:
		switch (old_state) {
		case RTRS_CLT_CONNECTING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_CONNECTING_ERR:
		switch (old_state) {
		case RTRS_CLT_CONNECTING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_CLOSING:
		switch (old_state) {
		case RTRS_CLT_CONNECTING:
		case RTRS_CLT_CONNECTING_ERR:
		case RTRS_CLT_RECONNECTING:
		case RTRS_CLT_CONNECTED:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_CLOSED:
		switch (old_state) {
		case RTRS_CLT_CLOSING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_DEAD:
		switch (old_state) {
		case RTRS_CLT_CLOSED:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	default:
		break;
	}
	if (changed) {
		sess->state = new_state;
		wake_up_locked(&sess->state_wq);
	}

	return changed;
}

/*
 * Change the state of @sess to @new_state, but only if the current state
 * equals @old_state and the state machine permits the transition.
 * Takes sess->state_wq.lock itself. Returns true if the state was changed.
 */
static bool rtrs_clt_change_state_from_to(struct rtrs_clt_sess *sess,
					   enum rtrs_clt_state old_state,
					   enum rtrs_clt_state new_state)
{
	bool changed = false;

	spin_lock_irq(&sess->state_wq.lock);
	if (sess->state == old_state)
		changed = __rtrs_clt_change_state(sess, new_state);
	spin_unlock_irq(&sess->state_wq.lock);

	return changed;
}

/*
 * React to an RDMA error on @con: a CONNECTED session is moved to
 * RECONNECTING and its reconnect work is queued; otherwise a CONNECTING
 * session is moved to CONNECTING_ERR so that the waiter can clean up.
 */
static void rtrs_rdma_error_recovery(struct rtrs_clt_con *con)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	if (rtrs_clt_change_state_from_to(sess,
					   RTRS_CLT_CONNECTED,
					   RTRS_CLT_RECONNECTING)) {
		struct rtrs_clt *clt = sess->clt;
		unsigned int delay_ms;

		/*
		 * Normal scenario, reconnect if we were successfully connected
		 */
		delay_ms = clt->reconnect_delay_sec * 1000;
		/*
		 * NOTE(review): the random part is added to a millisecond
		 * value, so with RTRS_RECONNECT_SEED == 8 the jitter is
		 * 0..7 ms, while the comment at the macro definition talks
		 * about seconds - confirm which one is intended.
		 */
		queue_delayed_work(rtrs_wq, &sess->reconnect_dwork,
				   msecs_to_jiffies(delay_ms +
						    prandom_u32() % RTRS_RECONNECT_SEED));
	} else {
		/*
		 * Error can happen just on establishing new connection,
		 * so notify waiter with error state, waiter is responsible
		 * for cleaning the rest and reconnect if needed.
		 */
		rtrs_clt_change_state_from_to(sess,
					       RTRS_CLT_CONNECTING,
					       RTRS_CLT_CONNECTING_ERR);
	}
}

/*
 * Completion handler of IB_WR_REG_MR work requests: only failures are
 * handled, by logging and starting error recovery.
 */
static void rtrs_clt_fast_reg_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_con *con = cq->cq_context;

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(con->c.sess, "Failed IB_WR_REG_MR: %s\n",
			  ib_wc_status_msg(wc->status));
		rtrs_rdma_error_recovery(con);
	}
}

/* Shared CQE used for the IB_WR_REG_MR work requests of read requests */
static struct ib_cqe fast_reg_cqe = {
	.done = rtrs_clt_fast_reg_done
};

static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
			      bool notify, bool can_wait);

/*
 * Completion handler of the IB_WR_LOCAL_INV posted by rtrs_inv_rkey().
 * Clears req->need_inv and then either wakes the waiter sleeping on
 * req->inv_comp (need_inv_comp set) or completes the request itself with
 * the errno saved in req->inv_errno.
 */
static void rtrs_clt_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_io_req *req =
		container_of(wc->wr_cqe, typeof(*req), inv_cqe);
	struct rtrs_clt_con *con = cq->cq_context;

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(con->c.sess, "Failed IB_WR_LOCAL_INV: %s\n",
			  ib_wc_status_msg(wc->status));
		rtrs_rdma_error_recovery(con);
	}
	req->need_inv = false;
	if (likely(req->need_inv_comp))
		complete(&req->inv_comp);
	else
		/* Complete request from INV callback */
		complete_rdma_req(req, req->inv_errno, true, false);
}

/*
 * Post a signalled IB_WR_LOCAL_INV for the rkey of req->mr on the QP of
 * req->con. Returns the result of ib_post_send().
 */
static int rtrs_inv_rkey(struct rtrs_clt_io_req *req)
{
	struct rtrs_clt_con *con = req->con;
	struct ib_send_wr wr = {
		.opcode		    = IB_WR_LOCAL_INV,
		.wr_cqe		    = &req->inv_cqe,
		.send_flags	    = IB_SEND_SIGNALED,
		.ex.invalidate_rkey = req->mr->rkey,
	};
	req->inv_cqe.done = rtrs_clt_inv_rkey_done;

	return ib_post_send(con->c.qp, &wr, NULL);
}

/*
 * complete_rdma_req() - finish an IO request on its path.
 * @req:      request to complete, must be in use and bound to a connection
 * @errno:    result handed to the confirmation callback
 * @notify:   call req->conf(req->priv, errno) at the end
 * @can_wait: if a local invalidate has to be posted, wait for it here
 *            (wait_for_completion() on req->inv_comp); otherwise the request
 *            is completed later from rtrs_clt_inv_rkey_done()
 *
 * For read requests that still need invalidation the MR key is invalidated
 * first. Then the scatterlist is unmapped, the inflight counter is dropped
 * for the MIN_INFLIGHT policy and the request is marked as unused.
 */
static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
			      bool notify, bool can_wait)
{
	struct rtrs_clt_con *con = req->con;
	struct rtrs_clt_sess *sess;
	int err;

	if (WARN_ON(!req->in_use))
		return;
	if (WARN_ON(!req->con))
		return;
	sess = to_clt_sess(con->c.sess);

	if (req->sg_cnt) {
		if (unlikely(req->dir == DMA_FROM_DEVICE && req->need_inv)) {
			/*
			 * We are here to invalidate read requests
			 * ourselves.  In normal scenario server should
			 * send INV for all read requests, but
			 * we are here, thus two things could happen:
			 *
			 *    1.  this is failover, when errno != 0
			 *        and can_wait == 1,
			 *
			 *    2.  something totally bad happened and
			 *        server forgot to send INV, so we
			 *        should do that ourselves.
			 */

			if (likely(can_wait)) {
				req->need_inv_comp = true;
			} else {
				/* This should be IO path, so always notify */
				WARN_ON(!notify);
				/* Save errno for INV callback */
				req->inv_errno = errno;
			}

			err = rtrs_inv_rkey(req);
			if (unlikely(err)) {
				/* Posting failed: complete right here */
				rtrs_err(con->c.sess, "Send INV WR key=%#x: %d\n",
					  req->mr->rkey, err);
			} else if (likely(can_wait)) {
				wait_for_completion(&req->inv_comp);
			} else {
				/*
				 * Something went wrong, so request will be
				 * completed from INV callback.
				 */
				WARN_ON_ONCE(1);

				return;
			}
		}
		ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
				req->sg_cnt, req->dir);
	}
	if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
		atomic_dec(&sess->stats->inflight);

	req->in_use = false;
	req->con = NULL;

	if (notify)
		req->conf(req->priv, errno);
}

/*
 * Post an RDMA write with immediate data that carries the first
 * req->sg_size bytes of req->iu to @rbuf at offset @off.
 * @wr is handed through to rtrs_iu_post_rdma_write_imm().
 * Returns -EINVAL if req->sg_size is 0, otherwise the result of the post.
 */
static int rtrs_post_send_rdma(struct rtrs_clt_con *con,
				struct rtrs_clt_io_req *req,
				struct rtrs_rbuf *rbuf, u32 off,
				u32 imm, struct ib_send_wr *wr)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	enum ib_send_flags flags;
	struct ib_sge sge;

	if (unlikely(!req->sg_size)) {
		rtrs_wrn(con->c.sess,
			 "Doing RDMA Write failed, no data supplied\n");
		return -EINVAL;
	}

	/* user data and user message in the first list element */
	sge.addr   = req->iu->dma_addr;
	sge.length = req->sg_size;
	sge.lkey   = sess->s.dev->ib_pd->local_dma_lkey;

	/*
	 * From time to time we have to post signalled sends,
	 * or send queue will fill up and only QP reset can help.
	 */
	flags = atomic_inc_return(&con->io_cnt) % sess->queue_depth ?
			0 : IB_SEND_SIGNALED;

	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr,
				      req->sg_size, DMA_TO_DEVICE);

	return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, &sge, 1,
					    rbuf->rkey, rbuf->addr + off,
					    imm, flags, wr);
}

/*
 * Handle an IO response for request @msg_id of @sess: the request is
 * completed with @errno and the upper layer is notified. @w_inval tells
 * that the server already invalidated the key.
 */
static void process_io_rsp(struct rtrs_clt_sess *sess, u32 msg_id,
			   s16 errno, bool w_inval)
{
	struct rtrs_clt_io_req *req;

	if (WARN_ON(msg_id >= sess->queue_depth))
		return;

	req = &sess->reqs[msg_id];
	/* Drop need_inv if server responded with send with invalidation */
	req->need_inv &= !w_inval;
	complete_rdma_req(req, errno, true, false);
}

/*
 * Re-post the receive IU the completion @wc belongs to. Only expected when
 * sess->flags equals RTRS_MSG_NEW_RKEY_F. A post failure starts error
 * recovery.
 */
static void rtrs_clt_recv_done(struct rtrs_clt_con *con, struct ib_wc *wc)
{
	struct rtrs_iu *iu;
	int err;
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	WARN_ON(sess->flags != RTRS_MSG_NEW_RKEY_F);
	iu = container_of(wc->wr_cqe, struct rtrs_iu,
			  cqe);
	err = rtrs_iu_post_recv(&con->c, iu);
	if (unlikely(err)) {
		rtrs_err(con->c.sess, "post iu failed %d\n", err);
		rtrs_rdma_error_recovery(con);
	}
}

/*
 * Handle an rkey response message: validate size, type and buffer id,
 * store the new rkey for the remote buffer, process the IO response encoded
 * in the immediate data and re-post the receive IU. Any validation failure
 * starts error recovery.
 */
static void rtrs_clt_rkey_rsp_done(struct rtrs_clt_con *con, struct ib_wc *wc)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	struct rtrs_msg_rkey_rsp *msg;
	u32 imm_type, imm_payload;
	bool w_inval = false;
	struct rtrs_iu *iu;
	u32 buf_id;
	int err;

	WARN_ON(sess->flags != RTRS_MSG_NEW_RKEY_F);

	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);

	if (unlikely(wc->byte_len < sizeof(*msg))) {
		rtrs_err(con->c.sess, "rkey response is malformed: size %d\n",
			  wc->byte_len);
		goto out;
	}
	ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
				   iu->size, DMA_FROM_DEVICE);
	msg = iu->buf;
	if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_RKEY_RSP)) {
		rtrs_err(sess->clt, "rkey response is malformed: type %d\n",
			  le16_to_cpu(msg->type));
		goto out;
	}
	buf_id = le16_to_cpu(msg->buf_id);
	if (WARN_ON(buf_id >= sess->queue_depth))
		goto out;

	rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), &imm_type, &imm_payload);
	if (likely(imm_type == RTRS_IO_RSP_IMM ||
		   imm_type == RTRS_IO_RSP_W_INV_IMM)) {
		u32 msg_id;

		w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM);
		rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err);

		/* The message and the immediate data must name the same buffer */
		if (WARN_ON(buf_id != msg_id))
			goto out;
		sess->rbufs[buf_id].rkey = le32_to_cpu(msg->rkey);
		process_io_rsp(sess, msg_id, err, w_inval);
	}
	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, iu->dma_addr,
				      iu->size, DMA_FROM_DEVICE);
	return rtrs_clt_recv_done(con, wc);
out:
	rtrs_rdma_error_recovery(con);
}

static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc);

/* Shared CQE used for the empty receive work requests of the IO path */
static struct ib_cqe io_comp_cqe = {
	.done = rtrs_clt_rdma_done
};

/*
 * Post x2 empty WRs: first is for this RDMA with IMM,
 * second is for RECV with INV, which happened earlier.
573 */ 574 static int rtrs_post_recv_empty_x2(struct rtrs_con *con, struct ib_cqe *cqe) 575 { 576 struct ib_recv_wr wr_arr[2], *wr; 577 int i; 578 579 memset(wr_arr, 0, sizeof(wr_arr)); 580 for (i = 0; i < ARRAY_SIZE(wr_arr); i++) { 581 wr = &wr_arr[i]; 582 wr->wr_cqe = cqe; 583 if (i) 584 /* Chain backwards */ 585 wr->next = &wr_arr[i - 1]; 586 } 587 588 return ib_post_recv(con->qp, wr, NULL); 589 } 590 591 static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc) 592 { 593 struct rtrs_clt_con *con = cq->cq_context; 594 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 595 u32 imm_type, imm_payload; 596 bool w_inval = false; 597 int err; 598 599 if (unlikely(wc->status != IB_WC_SUCCESS)) { 600 if (wc->status != IB_WC_WR_FLUSH_ERR) { 601 rtrs_err(sess->clt, "RDMA failed: %s\n", 602 ib_wc_status_msg(wc->status)); 603 rtrs_rdma_error_recovery(con); 604 } 605 return; 606 } 607 rtrs_clt_update_wc_stats(con); 608 609 switch (wc->opcode) { 610 case IB_WC_RECV_RDMA_WITH_IMM: 611 /* 612 * post_recv() RDMA write completions of IO reqs (read/write) 613 * and hb 614 */ 615 if (WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done)) 616 return; 617 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), 618 &imm_type, &imm_payload); 619 if (likely(imm_type == RTRS_IO_RSP_IMM || 620 imm_type == RTRS_IO_RSP_W_INV_IMM)) { 621 u32 msg_id; 622 623 w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM); 624 rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err); 625 626 process_io_rsp(sess, msg_id, err, w_inval); 627 } else if (imm_type == RTRS_HB_MSG_IMM) { 628 WARN_ON(con->c.cid); 629 rtrs_send_hb_ack(&sess->s); 630 if (sess->flags == RTRS_MSG_NEW_RKEY_F) 631 return rtrs_clt_recv_done(con, wc); 632 } else if (imm_type == RTRS_HB_ACK_IMM) { 633 WARN_ON(con->c.cid); 634 sess->s.hb_missed_cnt = 0; 635 if (sess->flags == RTRS_MSG_NEW_RKEY_F) 636 return rtrs_clt_recv_done(con, wc); 637 } else { 638 rtrs_wrn(con->c.sess, "Unknown IMM type %u\n", 639 imm_type); 640 } 641 if (w_inval) 642 /* 643 * Post x2 
empty WRs: first is for this RDMA with IMM, 644 * second is for RECV with INV, which happened earlier. 645 */ 646 err = rtrs_post_recv_empty_x2(&con->c, &io_comp_cqe); 647 else 648 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 649 if (unlikely(err)) { 650 rtrs_err(con->c.sess, "rtrs_post_recv_empty(): %d\n", 651 err); 652 rtrs_rdma_error_recovery(con); 653 break; 654 } 655 break; 656 case IB_WC_RECV: 657 /* 658 * Key invalidations from server side 659 */ 660 WARN_ON(!(wc->wc_flags & IB_WC_WITH_INVALIDATE || 661 wc->wc_flags & IB_WC_WITH_IMM)); 662 WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done); 663 if (sess->flags == RTRS_MSG_NEW_RKEY_F) { 664 if (wc->wc_flags & IB_WC_WITH_INVALIDATE) 665 return rtrs_clt_recv_done(con, wc); 666 667 return rtrs_clt_rkey_rsp_done(con, wc); 668 } 669 break; 670 case IB_WC_RDMA_WRITE: 671 /* 672 * post_send() RDMA write completions of IO reqs (read/write) 673 * and hb 674 */ 675 break; 676 677 default: 678 rtrs_wrn(sess->clt, "Unexpected WC type: %d\n", wc->opcode); 679 return; 680 } 681 } 682 683 static int post_recv_io(struct rtrs_clt_con *con, size_t q_size) 684 { 685 int err, i; 686 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 687 688 for (i = 0; i < q_size; i++) { 689 if (sess->flags == RTRS_MSG_NEW_RKEY_F) { 690 struct rtrs_iu *iu = &con->rsp_ius[i]; 691 692 err = rtrs_iu_post_recv(&con->c, iu); 693 } else { 694 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 695 } 696 if (unlikely(err)) 697 return err; 698 } 699 700 return 0; 701 } 702 703 static int post_recv_sess(struct rtrs_clt_sess *sess) 704 { 705 size_t q_size = 0; 706 int err, cid; 707 708 for (cid = 0; cid < sess->s.con_num; cid++) { 709 if (cid == 0) 710 q_size = SERVICE_CON_QUEUE_DEPTH; 711 else 712 q_size = sess->queue_depth; 713 714 /* 715 * x2 for RDMA read responses + FR key invalidations, 716 * RDMA writes do not require any FR registrations. 
717 */ 718 q_size *= 2; 719 720 err = post_recv_io(to_clt_con(sess->s.con[cid]), q_size); 721 if (unlikely(err)) { 722 rtrs_err(sess->clt, "post_recv_io(), err: %d\n", err); 723 return err; 724 } 725 } 726 727 return 0; 728 } 729 730 struct path_it { 731 int i; 732 struct list_head skip_list; 733 struct rtrs_clt *clt; 734 struct rtrs_clt_sess *(*next_path)(struct path_it *it); 735 }; 736 737 /** 738 * list_next_or_null_rr_rcu - get next list element in round-robin fashion. 739 * @head: the head for the list. 740 * @ptr: the list head to take the next element from. 741 * @type: the type of the struct this is embedded in. 742 * @memb: the name of the list_head within the struct. 743 * 744 * Next element returned in round-robin fashion, i.e. head will be skipped, 745 * but if list is observed as empty, NULL will be returned. 746 * 747 * This primitive may safely run concurrently with the _rcu list-mutation 748 * primitives such as list_add_rcu() as long as it's guarded by rcu_read_lock(). 749 */ 750 #define list_next_or_null_rr_rcu(head, ptr, type, memb) \ 751 ({ \ 752 list_next_or_null_rcu(head, ptr, type, memb) ?: \ 753 list_next_or_null_rcu(head, READ_ONCE((ptr)->next), \ 754 type, memb); \ 755 }) 756 757 /** 758 * get_next_path_rr() - Returns path in round-robin fashion. 759 * @it: the path pointer 760 * 761 * Related to @MP_POLICY_RR 762 * 763 * Locks: 764 * rcu_read_lock() must be hold. 765 */ 766 static struct rtrs_clt_sess *get_next_path_rr(struct path_it *it) 767 { 768 struct rtrs_clt_sess __rcu **ppcpu_path; 769 struct rtrs_clt_sess *path; 770 struct rtrs_clt *clt; 771 772 clt = it->clt; 773 774 /* 775 * Here we use two RCU objects: @paths_list and @pcpu_path 776 * pointer. See rtrs_clt_remove_path_from_arr() for details 777 * how that is handled. 
778 */ 779 780 ppcpu_path = this_cpu_ptr(clt->pcpu_path); 781 path = rcu_dereference(*ppcpu_path); 782 if (unlikely(!path)) 783 path = list_first_or_null_rcu(&clt->paths_list, 784 typeof(*path), s.entry); 785 else 786 path = list_next_or_null_rr_rcu(&clt->paths_list, 787 &path->s.entry, 788 typeof(*path), 789 s.entry); 790 rcu_assign_pointer(*ppcpu_path, path); 791 792 return path; 793 } 794 795 /** 796 * get_next_path_min_inflight() - Returns path with minimal inflight count. 797 * @it: the path pointer 798 * 799 * Related to @MP_POLICY_MIN_INFLIGHT 800 * 801 * Locks: 802 * rcu_read_lock() must be hold. 803 */ 804 static struct rtrs_clt_sess *get_next_path_min_inflight(struct path_it *it) 805 { 806 struct rtrs_clt_sess *min_path = NULL; 807 struct rtrs_clt *clt = it->clt; 808 struct rtrs_clt_sess *sess; 809 int min_inflight = INT_MAX; 810 int inflight; 811 812 list_for_each_entry_rcu(sess, &clt->paths_list, s.entry) { 813 if (unlikely(!list_empty(raw_cpu_ptr(sess->mp_skip_entry)))) 814 continue; 815 816 inflight = atomic_read(&sess->stats->inflight); 817 818 if (inflight < min_inflight) { 819 min_inflight = inflight; 820 min_path = sess; 821 } 822 } 823 824 /* 825 * add the path to the skip list, so that next time we can get 826 * a different one 827 */ 828 if (min_path) 829 list_add(raw_cpu_ptr(min_path->mp_skip_entry), &it->skip_list); 830 831 return min_path; 832 } 833 834 static inline void path_it_init(struct path_it *it, struct rtrs_clt *clt) 835 { 836 INIT_LIST_HEAD(&it->skip_list); 837 it->clt = clt; 838 it->i = 0; 839 840 if (clt->mp_policy == MP_POLICY_RR) 841 it->next_path = get_next_path_rr; 842 else 843 it->next_path = get_next_path_min_inflight; 844 } 845 846 static inline void path_it_deinit(struct path_it *it) 847 { 848 struct list_head *skip, *tmp; 849 /* 850 * The skip_list is used only for the MIN_INFLIGHT policy. 851 * We need to remove paths from it, so that next IO can insert 852 * paths (->mp_skip_entry) into a skip_list again. 
853 */ 854 list_for_each_safe(skip, tmp, &it->skip_list) 855 list_del_init(skip); 856 } 857 858 /** 859 * rtrs_clt_init_req() Initialize an rtrs_clt_io_req holding information 860 * about an inflight IO. 861 * The user buffer holding user control message (not data) is copied into 862 * the corresponding buffer of rtrs_iu (req->iu->buf), which later on will 863 * also hold the control message of rtrs. 864 * @req: an io request holding information about IO. 865 * @sess: client session 866 * @conf: conformation callback function to notify upper layer. 867 * @permit: permit for allocation of RDMA remote buffer 868 * @priv: private pointer 869 * @vec: kernel vector containing control message 870 * @usr_len: length of the user message 871 * @sg: scater list for IO data 872 * @sg_cnt: number of scater list entries 873 * @data_len: length of the IO data 874 * @dir: direction of the IO. 875 */ 876 static void rtrs_clt_init_req(struct rtrs_clt_io_req *req, 877 struct rtrs_clt_sess *sess, 878 void (*conf)(void *priv, int errno), 879 struct rtrs_permit *permit, void *priv, 880 const struct kvec *vec, size_t usr_len, 881 struct scatterlist *sg, size_t sg_cnt, 882 size_t data_len, int dir) 883 { 884 struct iov_iter iter; 885 size_t len; 886 887 req->permit = permit; 888 req->in_use = true; 889 req->usr_len = usr_len; 890 req->data_len = data_len; 891 req->sglist = sg; 892 req->sg_cnt = sg_cnt; 893 req->priv = priv; 894 req->dir = dir; 895 req->con = rtrs_permit_to_clt_con(sess, permit); 896 req->conf = conf; 897 req->need_inv = false; 898 req->need_inv_comp = false; 899 req->inv_errno = 0; 900 901 iov_iter_kvec(&iter, READ, vec, 1, usr_len); 902 len = _copy_from_iter(req->iu->buf, usr_len, &iter); 903 WARN_ON(len != usr_len); 904 905 reinit_completion(&req->inv_comp); 906 } 907 908 static struct rtrs_clt_io_req * 909 rtrs_clt_get_req(struct rtrs_clt_sess *sess, 910 void (*conf)(void *priv, int errno), 911 struct rtrs_permit *permit, void *priv, 912 const struct kvec *vec, size_t 
usr_len, 913 struct scatterlist *sg, size_t sg_cnt, 914 size_t data_len, int dir) 915 { 916 struct rtrs_clt_io_req *req; 917 918 req = &sess->reqs[permit->mem_id]; 919 rtrs_clt_init_req(req, sess, conf, permit, priv, vec, usr_len, 920 sg, sg_cnt, data_len, dir); 921 return req; 922 } 923 924 static struct rtrs_clt_io_req * 925 rtrs_clt_get_copy_req(struct rtrs_clt_sess *alive_sess, 926 struct rtrs_clt_io_req *fail_req) 927 { 928 struct rtrs_clt_io_req *req; 929 struct kvec vec = { 930 .iov_base = fail_req->iu->buf, 931 .iov_len = fail_req->usr_len 932 }; 933 934 req = &alive_sess->reqs[fail_req->permit->mem_id]; 935 rtrs_clt_init_req(req, alive_sess, fail_req->conf, fail_req->permit, 936 fail_req->priv, &vec, fail_req->usr_len, 937 fail_req->sglist, fail_req->sg_cnt, 938 fail_req->data_len, fail_req->dir); 939 return req; 940 } 941 942 static int rtrs_post_rdma_write_sg(struct rtrs_clt_con *con, 943 struct rtrs_clt_io_req *req, 944 struct rtrs_rbuf *rbuf, 945 u32 size, u32 imm) 946 { 947 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 948 struct ib_sge *sge = req->sge; 949 enum ib_send_flags flags; 950 struct scatterlist *sg; 951 size_t num_sge; 952 int i; 953 954 for_each_sg(req->sglist, sg, req->sg_cnt, i) { 955 sge[i].addr = sg_dma_address(sg); 956 sge[i].length = sg_dma_len(sg); 957 sge[i].lkey = sess->s.dev->ib_pd->local_dma_lkey; 958 } 959 sge[i].addr = req->iu->dma_addr; 960 sge[i].length = size; 961 sge[i].lkey = sess->s.dev->ib_pd->local_dma_lkey; 962 963 num_sge = 1 + req->sg_cnt; 964 965 /* 966 * From time to time we have to post signalled sends, 967 * or send queue will fill up and only QP reset can help. 968 */ 969 flags = atomic_inc_return(&con->io_cnt) % sess->queue_depth ? 
970 0 : IB_SEND_SIGNALED; 971 972 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr, 973 size, DMA_TO_DEVICE); 974 975 return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, sge, num_sge, 976 rbuf->rkey, rbuf->addr, imm, 977 flags, NULL); 978 } 979 980 static int rtrs_clt_write_req(struct rtrs_clt_io_req *req) 981 { 982 struct rtrs_clt_con *con = req->con; 983 struct rtrs_sess *s = con->c.sess; 984 struct rtrs_clt_sess *sess = to_clt_sess(s); 985 struct rtrs_msg_rdma_write *msg; 986 987 struct rtrs_rbuf *rbuf; 988 int ret, count = 0; 989 u32 imm, buf_id; 990 991 const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len; 992 993 if (unlikely(tsize > sess->chunk_size)) { 994 rtrs_wrn(s, "Write request failed, size too big %zu > %d\n", 995 tsize, sess->chunk_size); 996 return -EMSGSIZE; 997 } 998 if (req->sg_cnt) { 999 count = ib_dma_map_sg(sess->s.dev->ib_dev, req->sglist, 1000 req->sg_cnt, req->dir); 1001 if (unlikely(!count)) { 1002 rtrs_wrn(s, "Write request failed, map failed\n"); 1003 return -EINVAL; 1004 } 1005 } 1006 /* put rtrs msg after sg and user message */ 1007 msg = req->iu->buf + req->usr_len; 1008 msg->type = cpu_to_le16(RTRS_MSG_WRITE); 1009 msg->usr_len = cpu_to_le16(req->usr_len); 1010 1011 /* rtrs message on server side will be after user data and message */ 1012 imm = req->permit->mem_off + req->data_len + req->usr_len; 1013 imm = rtrs_to_io_req_imm(imm); 1014 buf_id = req->permit->mem_id; 1015 req->sg_size = tsize; 1016 rbuf = &sess->rbufs[buf_id]; 1017 1018 /* 1019 * Update stats now, after request is successfully sent it is not 1020 * safe anymore to touch it. 
1021 */ 1022 rtrs_clt_update_all_stats(req, WRITE); 1023 1024 ret = rtrs_post_rdma_write_sg(req->con, req, rbuf, 1025 req->usr_len + sizeof(*msg), 1026 imm); 1027 if (unlikely(ret)) { 1028 rtrs_err(s, "Write request failed: %d\n", ret); 1029 if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT) 1030 atomic_dec(&sess->stats->inflight); 1031 if (req->sg_cnt) 1032 ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist, 1033 req->sg_cnt, req->dir); 1034 } 1035 1036 return ret; 1037 } 1038 1039 static int rtrs_map_sg_fr(struct rtrs_clt_io_req *req, size_t count) 1040 { 1041 int nr; 1042 1043 /* Align the MR to a 4K page size to match the block virt boundary */ 1044 nr = ib_map_mr_sg(req->mr, req->sglist, count, NULL, SZ_4K); 1045 if (nr < 0) 1046 return nr; 1047 if (unlikely(nr < req->sg_cnt)) 1048 return -EINVAL; 1049 ib_update_fast_reg_key(req->mr, ib_inc_rkey(req->mr->rkey)); 1050 1051 return nr; 1052 } 1053 1054 static int rtrs_clt_read_req(struct rtrs_clt_io_req *req) 1055 { 1056 struct rtrs_clt_con *con = req->con; 1057 struct rtrs_sess *s = con->c.sess; 1058 struct rtrs_clt_sess *sess = to_clt_sess(s); 1059 struct rtrs_msg_rdma_read *msg; 1060 struct rtrs_ib_dev *dev; 1061 1062 struct ib_reg_wr rwr; 1063 struct ib_send_wr *wr = NULL; 1064 1065 int ret, count = 0; 1066 u32 imm, buf_id; 1067 1068 const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len; 1069 1070 s = &sess->s; 1071 dev = sess->s.dev; 1072 1073 if (unlikely(tsize > sess->chunk_size)) { 1074 rtrs_wrn(s, 1075 "Read request failed, message size is %zu, bigger than CHUNK_SIZE %d\n", 1076 tsize, sess->chunk_size); 1077 return -EMSGSIZE; 1078 } 1079 1080 if (req->sg_cnt) { 1081 count = ib_dma_map_sg(dev->ib_dev, req->sglist, req->sg_cnt, 1082 req->dir); 1083 if (unlikely(!count)) { 1084 rtrs_wrn(s, 1085 "Read request failed, dma map failed\n"); 1086 return -EINVAL; 1087 } 1088 } 1089 /* put our message into req->buf after user message*/ 1090 msg = req->iu->buf + req->usr_len; 1091 msg->type = 
cpu_to_le16(RTRS_MSG_READ); 1092 msg->usr_len = cpu_to_le16(req->usr_len); 1093 1094 if (count) { 1095 ret = rtrs_map_sg_fr(req, count); 1096 if (ret < 0) { 1097 rtrs_err_rl(s, 1098 "Read request failed, failed to map fast reg. data, err: %d\n", 1099 ret); 1100 ib_dma_unmap_sg(dev->ib_dev, req->sglist, req->sg_cnt, 1101 req->dir); 1102 return ret; 1103 } 1104 rwr = (struct ib_reg_wr) { 1105 .wr.opcode = IB_WR_REG_MR, 1106 .wr.wr_cqe = &fast_reg_cqe, 1107 .mr = req->mr, 1108 .key = req->mr->rkey, 1109 .access = (IB_ACCESS_LOCAL_WRITE | 1110 IB_ACCESS_REMOTE_WRITE), 1111 }; 1112 wr = &rwr.wr; 1113 1114 msg->sg_cnt = cpu_to_le16(1); 1115 msg->flags = cpu_to_le16(RTRS_MSG_NEED_INVAL_F); 1116 1117 msg->desc[0].addr = cpu_to_le64(req->mr->iova); 1118 msg->desc[0].key = cpu_to_le32(req->mr->rkey); 1119 msg->desc[0].len = cpu_to_le32(req->mr->length); 1120 1121 /* Further invalidation is required */ 1122 req->need_inv = !!RTRS_MSG_NEED_INVAL_F; 1123 1124 } else { 1125 msg->sg_cnt = 0; 1126 msg->flags = 0; 1127 } 1128 /* 1129 * rtrs message will be after the space reserved for disk data and 1130 * user message 1131 */ 1132 imm = req->permit->mem_off + req->data_len + req->usr_len; 1133 imm = rtrs_to_io_req_imm(imm); 1134 buf_id = req->permit->mem_id; 1135 1136 req->sg_size = sizeof(*msg); 1137 req->sg_size += le16_to_cpu(msg->sg_cnt) * sizeof(struct rtrs_sg_desc); 1138 req->sg_size += req->usr_len; 1139 1140 /* 1141 * Update stats now, after request is successfully sent it is not 1142 * safe anymore to touch it. 
1143 */ 1144 rtrs_clt_update_all_stats(req, READ); 1145 1146 ret = rtrs_post_send_rdma(req->con, req, &sess->rbufs[buf_id], 1147 req->data_len, imm, wr); 1148 if (unlikely(ret)) { 1149 rtrs_err(s, "Read request failed: %d\n", ret); 1150 if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT) 1151 atomic_dec(&sess->stats->inflight); 1152 req->need_inv = false; 1153 if (req->sg_cnt) 1154 ib_dma_unmap_sg(dev->ib_dev, req->sglist, 1155 req->sg_cnt, req->dir); 1156 } 1157 1158 return ret; 1159 } 1160 1161 /** 1162 * rtrs_clt_failover_req() Try to find an active path for a failed request 1163 * @clt: clt context 1164 * @fail_req: a failed io request. 1165 */ 1166 static int rtrs_clt_failover_req(struct rtrs_clt *clt, 1167 struct rtrs_clt_io_req *fail_req) 1168 { 1169 struct rtrs_clt_sess *alive_sess; 1170 struct rtrs_clt_io_req *req; 1171 int err = -ECONNABORTED; 1172 struct path_it it; 1173 1174 rcu_read_lock(); 1175 for (path_it_init(&it, clt); 1176 (alive_sess = it.next_path(&it)) && it.i < it.clt->paths_num; 1177 it.i++) { 1178 if (unlikely(READ_ONCE(alive_sess->state) != 1179 RTRS_CLT_CONNECTED)) 1180 continue; 1181 req = rtrs_clt_get_copy_req(alive_sess, fail_req); 1182 if (req->dir == DMA_TO_DEVICE) 1183 err = rtrs_clt_write_req(req); 1184 else 1185 err = rtrs_clt_read_req(req); 1186 if (unlikely(err)) { 1187 req->in_use = false; 1188 continue; 1189 } 1190 /* Success path */ 1191 rtrs_clt_inc_failover_cnt(alive_sess->stats); 1192 break; 1193 } 1194 path_it_deinit(&it); 1195 rcu_read_unlock(); 1196 1197 return err; 1198 } 1199 1200 static void fail_all_outstanding_reqs(struct rtrs_clt_sess *sess) 1201 { 1202 struct rtrs_clt *clt = sess->clt; 1203 struct rtrs_clt_io_req *req; 1204 int i, err; 1205 1206 if (!sess->reqs) 1207 return; 1208 for (i = 0; i < sess->queue_depth; ++i) { 1209 req = &sess->reqs[i]; 1210 if (!req->in_use) 1211 continue; 1212 1213 /* 1214 * Safely (without notification) complete failed request. 
1215 * After completion this request is still useble and can 1216 * be failovered to another path. 1217 */ 1218 complete_rdma_req(req, -ECONNABORTED, false, true); 1219 1220 err = rtrs_clt_failover_req(clt, req); 1221 if (unlikely(err)) 1222 /* Failover failed, notify anyway */ 1223 req->conf(req->priv, err); 1224 } 1225 } 1226 1227 static void free_sess_reqs(struct rtrs_clt_sess *sess) 1228 { 1229 struct rtrs_clt_io_req *req; 1230 int i; 1231 1232 if (!sess->reqs) 1233 return; 1234 for (i = 0; i < sess->queue_depth; ++i) { 1235 req = &sess->reqs[i]; 1236 if (req->mr) 1237 ib_dereg_mr(req->mr); 1238 kfree(req->sge); 1239 rtrs_iu_free(req->iu, DMA_TO_DEVICE, 1240 sess->s.dev->ib_dev, 1); 1241 } 1242 kfree(sess->reqs); 1243 sess->reqs = NULL; 1244 } 1245 1246 static int alloc_sess_reqs(struct rtrs_clt_sess *sess) 1247 { 1248 struct rtrs_clt_io_req *req; 1249 struct rtrs_clt *clt = sess->clt; 1250 int i, err = -ENOMEM; 1251 1252 sess->reqs = kcalloc(sess->queue_depth, sizeof(*sess->reqs), 1253 GFP_KERNEL); 1254 if (!sess->reqs) 1255 return -ENOMEM; 1256 1257 for (i = 0; i < sess->queue_depth; ++i) { 1258 req = &sess->reqs[i]; 1259 req->iu = rtrs_iu_alloc(1, sess->max_hdr_size, GFP_KERNEL, 1260 sess->s.dev->ib_dev, 1261 DMA_TO_DEVICE, 1262 rtrs_clt_rdma_done); 1263 if (!req->iu) 1264 goto out; 1265 1266 req->sge = kmalloc_array(clt->max_segments + 1, 1267 sizeof(*req->sge), GFP_KERNEL); 1268 if (!req->sge) 1269 goto out; 1270 1271 req->mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG, 1272 sess->max_pages_per_mr); 1273 if (IS_ERR(req->mr)) { 1274 err = PTR_ERR(req->mr); 1275 req->mr = NULL; 1276 pr_err("Failed to alloc sess->max_pages_per_mr %d\n", 1277 sess->max_pages_per_mr); 1278 goto out; 1279 } 1280 1281 init_completion(&req->inv_comp); 1282 } 1283 1284 return 0; 1285 1286 out: 1287 free_sess_reqs(sess); 1288 1289 return err; 1290 } 1291 1292 static int alloc_permits(struct rtrs_clt *clt) 1293 { 1294 unsigned int chunk_bits; 1295 int err, i; 1296 1297 
clt->permits_map = kcalloc(BITS_TO_LONGS(clt->queue_depth), 1298 sizeof(long), GFP_KERNEL); 1299 if (!clt->permits_map) { 1300 err = -ENOMEM; 1301 goto out_err; 1302 } 1303 clt->permits = kcalloc(clt->queue_depth, permit_size(clt), GFP_KERNEL); 1304 if (!clt->permits) { 1305 err = -ENOMEM; 1306 goto err_map; 1307 } 1308 chunk_bits = ilog2(clt->queue_depth - 1) + 1; 1309 for (i = 0; i < clt->queue_depth; i++) { 1310 struct rtrs_permit *permit; 1311 1312 permit = get_permit(clt, i); 1313 permit->mem_id = i; 1314 permit->mem_off = i << (MAX_IMM_PAYL_BITS - chunk_bits); 1315 } 1316 1317 return 0; 1318 1319 err_map: 1320 kfree(clt->permits_map); 1321 clt->permits_map = NULL; 1322 out_err: 1323 return err; 1324 } 1325 1326 static void free_permits(struct rtrs_clt *clt) 1327 { 1328 kfree(clt->permits_map); 1329 clt->permits_map = NULL; 1330 kfree(clt->permits); 1331 clt->permits = NULL; 1332 } 1333 1334 static void query_fast_reg_mode(struct rtrs_clt_sess *sess) 1335 { 1336 struct ib_device *ib_dev; 1337 u64 max_pages_per_mr; 1338 int mr_page_shift; 1339 1340 ib_dev = sess->s.dev->ib_dev; 1341 1342 /* 1343 * Use the smallest page size supported by the HCA, down to a 1344 * minimum of 4096 bytes. We're unlikely to build large sglists 1345 * out of smaller entries. 
1346 */ 1347 mr_page_shift = max(12, ffs(ib_dev->attrs.page_size_cap) - 1); 1348 max_pages_per_mr = ib_dev->attrs.max_mr_size; 1349 do_div(max_pages_per_mr, (1ull << mr_page_shift)); 1350 sess->max_pages_per_mr = 1351 min3(sess->max_pages_per_mr, (u32)max_pages_per_mr, 1352 ib_dev->attrs.max_fast_reg_page_list_len); 1353 sess->max_send_sge = ib_dev->attrs.max_send_sge; 1354 } 1355 1356 static bool rtrs_clt_change_state_get_old(struct rtrs_clt_sess *sess, 1357 enum rtrs_clt_state new_state, 1358 enum rtrs_clt_state *old_state) 1359 { 1360 bool changed; 1361 1362 spin_lock_irq(&sess->state_wq.lock); 1363 *old_state = sess->state; 1364 changed = __rtrs_clt_change_state(sess, new_state); 1365 spin_unlock_irq(&sess->state_wq.lock); 1366 1367 return changed; 1368 } 1369 1370 static bool rtrs_clt_change_state(struct rtrs_clt_sess *sess, 1371 enum rtrs_clt_state new_state) 1372 { 1373 enum rtrs_clt_state old_state; 1374 1375 return rtrs_clt_change_state_get_old(sess, new_state, &old_state); 1376 } 1377 1378 static void rtrs_clt_hb_err_handler(struct rtrs_con *c) 1379 { 1380 struct rtrs_clt_con *con = container_of(c, typeof(*con), c); 1381 1382 rtrs_rdma_error_recovery(con); 1383 } 1384 1385 static void rtrs_clt_init_hb(struct rtrs_clt_sess *sess) 1386 { 1387 rtrs_init_hb(&sess->s, &io_comp_cqe, 1388 RTRS_HB_INTERVAL_MS, 1389 RTRS_HB_MISSED_MAX, 1390 rtrs_clt_hb_err_handler, 1391 rtrs_wq); 1392 } 1393 1394 static void rtrs_clt_start_hb(struct rtrs_clt_sess *sess) 1395 { 1396 rtrs_start_hb(&sess->s); 1397 } 1398 1399 static void rtrs_clt_stop_hb(struct rtrs_clt_sess *sess) 1400 { 1401 rtrs_stop_hb(&sess->s); 1402 } 1403 1404 static void rtrs_clt_reconnect_work(struct work_struct *work); 1405 static void rtrs_clt_close_work(struct work_struct *work); 1406 1407 static struct rtrs_clt_sess *alloc_sess(struct rtrs_clt *clt, 1408 const struct rtrs_addr *path, 1409 size_t con_num, u16 max_segments, 1410 size_t max_segment_size) 1411 { 1412 struct rtrs_clt_sess *sess; 1413 int err 
= -ENOMEM; 1414 int cpu; 1415 1416 sess = kzalloc(sizeof(*sess), GFP_KERNEL); 1417 if (!sess) 1418 goto err; 1419 1420 /* Extra connection for user messages */ 1421 con_num += 1; 1422 1423 sess->s.con = kcalloc(con_num, sizeof(*sess->s.con), GFP_KERNEL); 1424 if (!sess->s.con) 1425 goto err_free_sess; 1426 1427 sess->stats = kzalloc(sizeof(*sess->stats), GFP_KERNEL); 1428 if (!sess->stats) 1429 goto err_free_con; 1430 1431 mutex_init(&sess->init_mutex); 1432 uuid_gen(&sess->s.uuid); 1433 memcpy(&sess->s.dst_addr, path->dst, 1434 rdma_addr_size((struct sockaddr *)path->dst)); 1435 1436 /* 1437 * rdma_resolve_addr() passes src_addr to cma_bind_addr, which 1438 * checks the sa_family to be non-zero. If user passed src_addr=NULL 1439 * the sess->src_addr will contain only zeros, which is then fine. 1440 */ 1441 if (path->src) 1442 memcpy(&sess->s.src_addr, path->src, 1443 rdma_addr_size((struct sockaddr *)path->src)); 1444 strlcpy(sess->s.sessname, clt->sessname, sizeof(sess->s.sessname)); 1445 sess->s.con_num = con_num; 1446 sess->clt = clt; 1447 sess->max_pages_per_mr = max_segments * max_segment_size >> 12; 1448 init_waitqueue_head(&sess->state_wq); 1449 sess->state = RTRS_CLT_CONNECTING; 1450 atomic_set(&sess->connected_cnt, 0); 1451 INIT_WORK(&sess->close_work, rtrs_clt_close_work); 1452 INIT_DELAYED_WORK(&sess->reconnect_dwork, rtrs_clt_reconnect_work); 1453 rtrs_clt_init_hb(sess); 1454 1455 sess->mp_skip_entry = alloc_percpu(typeof(*sess->mp_skip_entry)); 1456 if (!sess->mp_skip_entry) 1457 goto err_free_stats; 1458 1459 for_each_possible_cpu(cpu) 1460 INIT_LIST_HEAD(per_cpu_ptr(sess->mp_skip_entry, cpu)); 1461 1462 err = rtrs_clt_init_stats(sess->stats); 1463 if (err) 1464 goto err_free_percpu; 1465 1466 return sess; 1467 1468 err_free_percpu: 1469 free_percpu(sess->mp_skip_entry); 1470 err_free_stats: 1471 kfree(sess->stats); 1472 err_free_con: 1473 kfree(sess->s.con); 1474 err_free_sess: 1475 kfree(sess); 1476 err: 1477 return ERR_PTR(err); 1478 } 1479 1480 
void free_sess(struct rtrs_clt_sess *sess) 1481 { 1482 free_percpu(sess->mp_skip_entry); 1483 mutex_destroy(&sess->init_mutex); 1484 kfree(sess->s.con); 1485 kfree(sess->rbufs); 1486 kfree(sess); 1487 } 1488 1489 static int create_con(struct rtrs_clt_sess *sess, unsigned int cid) 1490 { 1491 struct rtrs_clt_con *con; 1492 1493 con = kzalloc(sizeof(*con), GFP_KERNEL); 1494 if (!con) 1495 return -ENOMEM; 1496 1497 /* Map first two connections to the first CPU */ 1498 con->cpu = (cid ? cid - 1 : 0) % nr_cpu_ids; 1499 con->c.cid = cid; 1500 con->c.sess = &sess->s; 1501 atomic_set(&con->io_cnt, 0); 1502 1503 sess->s.con[cid] = &con->c; 1504 1505 return 0; 1506 } 1507 1508 static void destroy_con(struct rtrs_clt_con *con) 1509 { 1510 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 1511 1512 sess->s.con[con->c.cid] = NULL; 1513 kfree(con); 1514 } 1515 1516 static int create_con_cq_qp(struct rtrs_clt_con *con) 1517 { 1518 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 1519 u16 wr_queue_size; 1520 int err, cq_vector; 1521 struct rtrs_msg_rkey_rsp *rsp; 1522 1523 /* 1524 * This function can fail, but still destroy_con_cq_qp() should 1525 * be called, this is because create_con_cq_qp() is called on cm 1526 * event path, thus caller/waiter never knows: have we failed before 1527 * create_con_cq_qp() or after. To solve this dilemma without 1528 * creating any additional flags just allow destroy_con_cq_qp() be 1529 * called many times. 1530 */ 1531 1532 if (con->c.cid == 0) { 1533 /* 1534 * One completion for each receive and two for each send 1535 * (send request + registration) 1536 * + 2 for drain and heartbeat 1537 * in case qp gets into error state 1538 */ 1539 wr_queue_size = SERVICE_CON_QUEUE_DEPTH * 3 + 2; 1540 /* We must be the first here */ 1541 if (WARN_ON(sess->s.dev)) 1542 return -EINVAL; 1543 1544 /* 1545 * The whole session uses device from user connection. 1546 * Be careful not to close user connection before ib dev 1547 * is gracefully put. 
1548 */ 1549 sess->s.dev = rtrs_ib_dev_find_or_add(con->c.cm_id->device, 1550 &dev_pd); 1551 if (!sess->s.dev) { 1552 rtrs_wrn(sess->clt, 1553 "rtrs_ib_dev_find_get_or_add(): no memory\n"); 1554 return -ENOMEM; 1555 } 1556 sess->s.dev_ref = 1; 1557 query_fast_reg_mode(sess); 1558 } else { 1559 /* 1560 * Here we assume that session members are correctly set. 1561 * This is always true if user connection (cid == 0) is 1562 * established first. 1563 */ 1564 if (WARN_ON(!sess->s.dev)) 1565 return -EINVAL; 1566 if (WARN_ON(!sess->queue_depth)) 1567 return -EINVAL; 1568 1569 /* Shared between connections */ 1570 sess->s.dev_ref++; 1571 wr_queue_size = 1572 min_t(int, sess->s.dev->ib_dev->attrs.max_qp_wr, 1573 /* QD * (REQ + RSP + FR REGS or INVS) + drain */ 1574 sess->queue_depth * 3 + 1); 1575 } 1576 /* alloc iu to recv new rkey reply when server reports flags set */ 1577 if (sess->flags == RTRS_MSG_NEW_RKEY_F || con->c.cid == 0) { 1578 con->rsp_ius = rtrs_iu_alloc(wr_queue_size, sizeof(*rsp), 1579 GFP_KERNEL, sess->s.dev->ib_dev, 1580 DMA_FROM_DEVICE, 1581 rtrs_clt_rdma_done); 1582 if (!con->rsp_ius) 1583 return -ENOMEM; 1584 con->queue_size = wr_queue_size; 1585 } 1586 cq_vector = con->cpu % sess->s.dev->ib_dev->num_comp_vectors; 1587 err = rtrs_cq_qp_create(&sess->s, &con->c, sess->max_send_sge, 1588 cq_vector, wr_queue_size, wr_queue_size, 1589 IB_POLL_SOFTIRQ); 1590 /* 1591 * In case of error we do not bother to clean previous allocations, 1592 * since destroy_con_cq_qp() must be called. 1593 */ 1594 return err; 1595 } 1596 1597 static void destroy_con_cq_qp(struct rtrs_clt_con *con) 1598 { 1599 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 1600 1601 /* 1602 * Be careful here: destroy_con_cq_qp() can be called even 1603 * create_con_cq_qp() failed, see comments there. 
1604 */ 1605 1606 rtrs_cq_qp_destroy(&con->c); 1607 if (con->rsp_ius) { 1608 rtrs_iu_free(con->rsp_ius, DMA_FROM_DEVICE, 1609 sess->s.dev->ib_dev, con->queue_size); 1610 con->rsp_ius = NULL; 1611 con->queue_size = 0; 1612 } 1613 if (sess->s.dev_ref && !--sess->s.dev_ref) { 1614 rtrs_ib_dev_put(sess->s.dev); 1615 sess->s.dev = NULL; 1616 } 1617 } 1618 1619 static void stop_cm(struct rtrs_clt_con *con) 1620 { 1621 rdma_disconnect(con->c.cm_id); 1622 if (con->c.qp) 1623 ib_drain_qp(con->c.qp); 1624 } 1625 1626 static void destroy_cm(struct rtrs_clt_con *con) 1627 { 1628 rdma_destroy_id(con->c.cm_id); 1629 con->c.cm_id = NULL; 1630 } 1631 1632 static int rtrs_rdma_addr_resolved(struct rtrs_clt_con *con) 1633 { 1634 struct rtrs_sess *s = con->c.sess; 1635 int err; 1636 1637 err = create_con_cq_qp(con); 1638 if (err) { 1639 rtrs_err(s, "create_con_cq_qp(), err: %d\n", err); 1640 return err; 1641 } 1642 err = rdma_resolve_route(con->c.cm_id, RTRS_CONNECT_TIMEOUT_MS); 1643 if (err) { 1644 rtrs_err(s, "Resolving route failed, err: %d\n", err); 1645 destroy_con_cq_qp(con); 1646 } 1647 1648 return err; 1649 } 1650 1651 static int rtrs_rdma_route_resolved(struct rtrs_clt_con *con) 1652 { 1653 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 1654 struct rtrs_clt *clt = sess->clt; 1655 struct rtrs_msg_conn_req msg; 1656 struct rdma_conn_param param; 1657 1658 int err; 1659 1660 param = (struct rdma_conn_param) { 1661 .retry_count = 7, 1662 .rnr_retry_count = 7, 1663 .private_data = &msg, 1664 .private_data_len = sizeof(msg), 1665 }; 1666 1667 msg = (struct rtrs_msg_conn_req) { 1668 .magic = cpu_to_le16(RTRS_MAGIC), 1669 .version = cpu_to_le16(RTRS_PROTO_VER), 1670 .cid = cpu_to_le16(con->c.cid), 1671 .cid_num = cpu_to_le16(sess->s.con_num), 1672 .recon_cnt = cpu_to_le16(sess->s.recon_cnt), 1673 }; 1674 uuid_copy(&msg.sess_uuid, &sess->s.uuid); 1675 uuid_copy(&msg.paths_uuid, &clt->paths_uuid); 1676 1677 err = rdma_connect(con->c.cm_id, ¶m); 1678 if (err) 1679 rtrs_err(clt, 
"rdma_connect(): %d\n", err); 1680 1681 return err; 1682 } 1683 1684 static int rtrs_rdma_conn_established(struct rtrs_clt_con *con, 1685 struct rdma_cm_event *ev) 1686 { 1687 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 1688 struct rtrs_clt *clt = sess->clt; 1689 const struct rtrs_msg_conn_rsp *msg; 1690 u16 version, queue_depth; 1691 int errno; 1692 u8 len; 1693 1694 msg = ev->param.conn.private_data; 1695 len = ev->param.conn.private_data_len; 1696 if (len < sizeof(*msg)) { 1697 rtrs_err(clt, "Invalid RTRS connection response\n"); 1698 return -ECONNRESET; 1699 } 1700 if (le16_to_cpu(msg->magic) != RTRS_MAGIC) { 1701 rtrs_err(clt, "Invalid RTRS magic\n"); 1702 return -ECONNRESET; 1703 } 1704 version = le16_to_cpu(msg->version); 1705 if (version >> 8 != RTRS_PROTO_VER_MAJOR) { 1706 rtrs_err(clt, "Unsupported major RTRS version: %d, expected %d\n", 1707 version >> 8, RTRS_PROTO_VER_MAJOR); 1708 return -ECONNRESET; 1709 } 1710 errno = le16_to_cpu(msg->errno); 1711 if (errno) { 1712 rtrs_err(clt, "Invalid RTRS message: errno %d\n", 1713 errno); 1714 return -ECONNRESET; 1715 } 1716 if (con->c.cid == 0) { 1717 queue_depth = le16_to_cpu(msg->queue_depth); 1718 1719 if (queue_depth > MAX_SESS_QUEUE_DEPTH) { 1720 rtrs_err(clt, "Invalid RTRS message: queue=%d\n", 1721 queue_depth); 1722 return -ECONNRESET; 1723 } 1724 if (!sess->rbufs || sess->queue_depth < queue_depth) { 1725 kfree(sess->rbufs); 1726 sess->rbufs = kcalloc(queue_depth, sizeof(*sess->rbufs), 1727 GFP_KERNEL); 1728 if (!sess->rbufs) 1729 return -ENOMEM; 1730 } 1731 sess->queue_depth = queue_depth; 1732 sess->max_hdr_size = le32_to_cpu(msg->max_hdr_size); 1733 sess->max_io_size = le32_to_cpu(msg->max_io_size); 1734 sess->flags = le32_to_cpu(msg->flags); 1735 sess->chunk_size = sess->max_io_size + sess->max_hdr_size; 1736 1737 /* 1738 * Global queue depth and IO size is always a minimum. 
1739 * If while a reconnection server sends us a value a bit 1740 * higher - client does not care and uses cached minimum. 1741 * 1742 * Since we can have several sessions (paths) restablishing 1743 * connections in parallel, use lock. 1744 */ 1745 mutex_lock(&clt->paths_mutex); 1746 clt->queue_depth = min_not_zero(sess->queue_depth, 1747 clt->queue_depth); 1748 clt->max_io_size = min_not_zero(sess->max_io_size, 1749 clt->max_io_size); 1750 mutex_unlock(&clt->paths_mutex); 1751 1752 /* 1753 * Cache the hca_port and hca_name for sysfs 1754 */ 1755 sess->hca_port = con->c.cm_id->port_num; 1756 scnprintf(sess->hca_name, sizeof(sess->hca_name), 1757 sess->s.dev->ib_dev->name); 1758 sess->s.src_addr = con->c.cm_id->route.addr.src_addr; 1759 } 1760 1761 return 0; 1762 } 1763 1764 static inline void flag_success_on_conn(struct rtrs_clt_con *con) 1765 { 1766 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 1767 1768 atomic_inc(&sess->connected_cnt); 1769 con->cm_err = 1; 1770 } 1771 1772 static int rtrs_rdma_conn_rejected(struct rtrs_clt_con *con, 1773 struct rdma_cm_event *ev) 1774 { 1775 struct rtrs_sess *s = con->c.sess; 1776 const struct rtrs_msg_conn_rsp *msg; 1777 const char *rej_msg; 1778 int status, errno; 1779 u8 data_len; 1780 1781 status = ev->status; 1782 rej_msg = rdma_reject_msg(con->c.cm_id, status); 1783 msg = rdma_consumer_reject_data(con->c.cm_id, ev, &data_len); 1784 1785 if (msg && data_len >= sizeof(*msg)) { 1786 errno = (int16_t)le16_to_cpu(msg->errno); 1787 if (errno == -EBUSY) 1788 rtrs_err(s, 1789 "Previous session is still exists on the server, please reconnect later\n"); 1790 else 1791 rtrs_err(s, 1792 "Connect rejected: status %d (%s), rtrs errno %d\n", 1793 status, rej_msg, errno); 1794 } else { 1795 rtrs_err(s, 1796 "Connect rejected but with malformed message: status %d (%s)\n", 1797 status, rej_msg); 1798 } 1799 1800 return -ECONNRESET; 1801 } 1802 1803 static void rtrs_clt_close_conns(struct rtrs_clt_sess *sess, bool wait) 1804 { 1805 
if (rtrs_clt_change_state(sess, RTRS_CLT_CLOSING)) 1806 queue_work(rtrs_wq, &sess->close_work); 1807 if (wait) 1808 flush_work(&sess->close_work); 1809 } 1810 1811 static inline void flag_error_on_conn(struct rtrs_clt_con *con, int cm_err) 1812 { 1813 if (con->cm_err == 1) { 1814 struct rtrs_clt_sess *sess; 1815 1816 sess = to_clt_sess(con->c.sess); 1817 if (atomic_dec_and_test(&sess->connected_cnt)) 1818 1819 wake_up(&sess->state_wq); 1820 } 1821 con->cm_err = cm_err; 1822 } 1823 1824 static int rtrs_clt_rdma_cm_handler(struct rdma_cm_id *cm_id, 1825 struct rdma_cm_event *ev) 1826 { 1827 struct rtrs_clt_con *con = cm_id->context; 1828 struct rtrs_sess *s = con->c.sess; 1829 struct rtrs_clt_sess *sess = to_clt_sess(s); 1830 int cm_err = 0; 1831 1832 switch (ev->event) { 1833 case RDMA_CM_EVENT_ADDR_RESOLVED: 1834 cm_err = rtrs_rdma_addr_resolved(con); 1835 break; 1836 case RDMA_CM_EVENT_ROUTE_RESOLVED: 1837 cm_err = rtrs_rdma_route_resolved(con); 1838 break; 1839 case RDMA_CM_EVENT_ESTABLISHED: 1840 con->cm_err = rtrs_rdma_conn_established(con, ev); 1841 if (likely(!con->cm_err)) { 1842 /* 1843 * Report success and wake up. Here we abuse state_wq, 1844 * i.e. wake up without state change, but we set cm_err. 1845 */ 1846 flag_success_on_conn(con); 1847 wake_up(&sess->state_wq); 1848 return 0; 1849 } 1850 break; 1851 case RDMA_CM_EVENT_REJECTED: 1852 cm_err = rtrs_rdma_conn_rejected(con, ev); 1853 break; 1854 case RDMA_CM_EVENT_CONNECT_ERROR: 1855 case RDMA_CM_EVENT_UNREACHABLE: 1856 rtrs_wrn(s, "CM error event %d\n", ev->event); 1857 cm_err = -ECONNRESET; 1858 break; 1859 case RDMA_CM_EVENT_ADDR_ERROR: 1860 case RDMA_CM_EVENT_ROUTE_ERROR: 1861 cm_err = -EHOSTUNREACH; 1862 break; 1863 case RDMA_CM_EVENT_DISCONNECTED: 1864 case RDMA_CM_EVENT_ADDR_CHANGE: 1865 case RDMA_CM_EVENT_TIMEWAIT_EXIT: 1866 cm_err = -ECONNRESET; 1867 break; 1868 case RDMA_CM_EVENT_DEVICE_REMOVAL: 1869 /* 1870 * Device removal is a special case. Queue close and return 0. 
1871 */ 1872 rtrs_clt_close_conns(sess, false); 1873 return 0; 1874 default: 1875 rtrs_err(s, "Unexpected RDMA CM event (%d)\n", ev->event); 1876 cm_err = -ECONNRESET; 1877 break; 1878 } 1879 1880 if (cm_err) { 1881 /* 1882 * cm error makes sense only on connection establishing, 1883 * in other cases we rely on normal procedure of reconnecting. 1884 */ 1885 flag_error_on_conn(con, cm_err); 1886 rtrs_rdma_error_recovery(con); 1887 } 1888 1889 return 0; 1890 } 1891 1892 static int create_cm(struct rtrs_clt_con *con) 1893 { 1894 struct rtrs_sess *s = con->c.sess; 1895 struct rtrs_clt_sess *sess = to_clt_sess(s); 1896 struct rdma_cm_id *cm_id; 1897 int err; 1898 1899 cm_id = rdma_create_id(&init_net, rtrs_clt_rdma_cm_handler, con, 1900 sess->s.dst_addr.ss_family == AF_IB ? 1901 RDMA_PS_IB : RDMA_PS_TCP, IB_QPT_RC); 1902 if (IS_ERR(cm_id)) { 1903 err = PTR_ERR(cm_id); 1904 rtrs_err(s, "Failed to create CM ID, err: %d\n", err); 1905 1906 return err; 1907 } 1908 con->c.cm_id = cm_id; 1909 con->cm_err = 0; 1910 /* allow the port to be reused */ 1911 err = rdma_set_reuseaddr(cm_id, 1); 1912 if (err != 0) { 1913 rtrs_err(s, "Set address reuse failed, err: %d\n", err); 1914 goto destroy_cm; 1915 } 1916 err = rdma_resolve_addr(cm_id, (struct sockaddr *)&sess->s.src_addr, 1917 (struct sockaddr *)&sess->s.dst_addr, 1918 RTRS_CONNECT_TIMEOUT_MS); 1919 if (err) { 1920 rtrs_err(s, "Failed to resolve address, err: %d\n", err); 1921 goto destroy_cm; 1922 } 1923 /* 1924 * Combine connection status and session events. This is needed 1925 * for waiting two possible cases: cm_err has something meaningful 1926 * or session state was really changed to error by device removal. 
1927 */ 1928 err = wait_event_interruptible_timeout( 1929 sess->state_wq, 1930 con->cm_err || sess->state != RTRS_CLT_CONNECTING, 1931 msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS)); 1932 if (err == 0 || err == -ERESTARTSYS) { 1933 if (err == 0) 1934 err = -ETIMEDOUT; 1935 /* Timedout or interrupted */ 1936 goto errr; 1937 } 1938 if (con->cm_err < 0) { 1939 err = con->cm_err; 1940 goto errr; 1941 } 1942 if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTING) { 1943 /* Device removal */ 1944 err = -ECONNABORTED; 1945 goto errr; 1946 } 1947 1948 return 0; 1949 1950 errr: 1951 stop_cm(con); 1952 /* Is safe to call destroy if cq_qp is not inited */ 1953 destroy_con_cq_qp(con); 1954 destroy_cm: 1955 destroy_cm(con); 1956 1957 return err; 1958 } 1959 1960 static void rtrs_clt_sess_up(struct rtrs_clt_sess *sess) 1961 { 1962 struct rtrs_clt *clt = sess->clt; 1963 int up; 1964 1965 /* 1966 * We can fire RECONNECTED event only when all paths were 1967 * connected on rtrs_clt_open(), then each was disconnected 1968 * and the first one connected again. That's why this nasty 1969 * game with counter value. 1970 */ 1971 1972 mutex_lock(&clt->paths_ev_mutex); 1973 up = ++clt->paths_up; 1974 /* 1975 * Here it is safe to access paths num directly since up counter 1976 * is greater than MAX_PATHS_NUM only while rtrs_clt_open() is 1977 * in progress, thus paths removals are impossible. 
1978 */ 1979 if (up > MAX_PATHS_NUM && up == MAX_PATHS_NUM + clt->paths_num) 1980 clt->paths_up = clt->paths_num; 1981 else if (up == 1) 1982 clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_RECONNECTED); 1983 mutex_unlock(&clt->paths_ev_mutex); 1984 1985 /* Mark session as established */ 1986 sess->established = true; 1987 sess->reconnect_attempts = 0; 1988 sess->stats->reconnects.successful_cnt++; 1989 } 1990 1991 static void rtrs_clt_sess_down(struct rtrs_clt_sess *sess) 1992 { 1993 struct rtrs_clt *clt = sess->clt; 1994 1995 if (!sess->established) 1996 return; 1997 1998 sess->established = false; 1999 mutex_lock(&clt->paths_ev_mutex); 2000 WARN_ON(!clt->paths_up); 2001 if (--clt->paths_up == 0) 2002 clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_DISCONNECTED); 2003 mutex_unlock(&clt->paths_ev_mutex); 2004 } 2005 2006 static void rtrs_clt_stop_and_destroy_conns(struct rtrs_clt_sess *sess) 2007 { 2008 struct rtrs_clt_con *con; 2009 unsigned int cid; 2010 2011 WARN_ON(READ_ONCE(sess->state) == RTRS_CLT_CONNECTED); 2012 2013 /* 2014 * Possible race with rtrs_clt_open(), when DEVICE_REMOVAL comes 2015 * exactly in between. Start destroying after it finishes. 2016 */ 2017 mutex_lock(&sess->init_mutex); 2018 mutex_unlock(&sess->init_mutex); 2019 2020 /* 2021 * All IO paths must observe !CONNECTED state before we 2022 * free everything. 2023 */ 2024 synchronize_rcu(); 2025 2026 rtrs_clt_stop_hb(sess); 2027 2028 /* 2029 * The order it utterly crucial: firstly disconnect and complete all 2030 * rdma requests with error (thus set in_use=false for requests), 2031 * then fail outstanding requests checking in_use for each, and 2032 * eventually notify upper layer about session disconnection. 
2033 */ 2034 2035 for (cid = 0; cid < sess->s.con_num; cid++) { 2036 if (!sess->s.con[cid]) 2037 break; 2038 con = to_clt_con(sess->s.con[cid]); 2039 stop_cm(con); 2040 } 2041 fail_all_outstanding_reqs(sess); 2042 free_sess_reqs(sess); 2043 rtrs_clt_sess_down(sess); 2044 2045 /* 2046 * Wait for graceful shutdown, namely when peer side invokes 2047 * rdma_disconnect(). 'connected_cnt' is decremented only on 2048 * CM events, thus if other side had crashed and hb has detected 2049 * something is wrong, here we will stuck for exactly timeout ms, 2050 * since CM does not fire anything. That is fine, we are not in 2051 * hurry. 2052 */ 2053 wait_event_timeout(sess->state_wq, !atomic_read(&sess->connected_cnt), 2054 msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS)); 2055 2056 for (cid = 0; cid < sess->s.con_num; cid++) { 2057 if (!sess->s.con[cid]) 2058 break; 2059 con = to_clt_con(sess->s.con[cid]); 2060 destroy_con_cq_qp(con); 2061 destroy_cm(con); 2062 destroy_con(con); 2063 } 2064 } 2065 2066 static inline bool xchg_sessions(struct rtrs_clt_sess __rcu **rcu_ppcpu_path, 2067 struct rtrs_clt_sess *sess, 2068 struct rtrs_clt_sess *next) 2069 { 2070 struct rtrs_clt_sess **ppcpu_path; 2071 2072 /* Call cmpxchg() without sparse warnings */ 2073 ppcpu_path = (typeof(ppcpu_path))rcu_ppcpu_path; 2074 return sess == cmpxchg(ppcpu_path, sess, next); 2075 } 2076 2077 static void rtrs_clt_remove_path_from_arr(struct rtrs_clt_sess *sess) 2078 { 2079 struct rtrs_clt *clt = sess->clt; 2080 struct rtrs_clt_sess *next; 2081 bool wait_for_grace = false; 2082 int cpu; 2083 2084 mutex_lock(&clt->paths_mutex); 2085 list_del_rcu(&sess->s.entry); 2086 2087 /* Make sure everybody observes path removal. */ 2088 synchronize_rcu(); 2089 2090 /* 2091 * At this point nobody sees @sess in the list, but still we have 2092 * dangling pointer @pcpu_path which _can_ point to @sess. Since 2093 * nobody can observe @sess in the list, we guarantee that IO path 2094 * will not assign @sess to @pcpu_path, i.e. 
@pcpu_path can be equal 2095 * to @sess, but can never again become @sess. 2096 */ 2097 2098 /* 2099 * Decrement paths number only after grace period, because 2100 * caller of do_each_path() must firstly observe list without 2101 * path and only then decremented paths number. 2102 * 2103 * Otherwise there can be the following situation: 2104 * o Two paths exist and IO is coming. 2105 * o One path is removed: 2106 * CPU#0 CPU#1 2107 * do_each_path(): rtrs_clt_remove_path_from_arr(): 2108 * path = get_next_path() 2109 * ^^^ list_del_rcu(path) 2110 * [!CONNECTED path] clt->paths_num-- 2111 * ^^^^^^^^^ 2112 * load clt->paths_num from 2 to 1 2113 * ^^^^^^^^^ 2114 * sees 1 2115 * 2116 * path is observed as !CONNECTED, but do_each_path() loop 2117 * ends, because expression i < clt->paths_num is false. 2118 */ 2119 clt->paths_num--; 2120 2121 /* 2122 * Get @next connection from current @sess which is going to be 2123 * removed. If @sess is the last element, then @next is NULL. 2124 */ 2125 rcu_read_lock(); 2126 next = list_next_or_null_rr_rcu(&clt->paths_list, &sess->s.entry, 2127 typeof(*next), s.entry); 2128 rcu_read_unlock(); 2129 2130 /* 2131 * @pcpu paths can still point to the path which is going to be 2132 * removed, so change the pointer manually. 2133 */ 2134 for_each_possible_cpu(cpu) { 2135 struct rtrs_clt_sess __rcu **ppcpu_path; 2136 2137 ppcpu_path = per_cpu_ptr(clt->pcpu_path, cpu); 2138 if (rcu_dereference_protected(*ppcpu_path, 2139 lockdep_is_held(&clt->paths_mutex)) != sess) 2140 /* 2141 * synchronize_rcu() was called just after deleting 2142 * entry from the list, thus IO code path cannot 2143 * change pointer back to the pointer which is going 2144 * to be removed, we are safe here. 2145 */ 2146 continue; 2147 2148 /* 2149 * We race with IO code path, which also changes pointer, 2150 * thus we have to be careful not to overwrite it. 
2151 */ 2152 if (xchg_sessions(ppcpu_path, sess, next)) 2153 /* 2154 * @ppcpu_path was successfully replaced with @next, 2155 * that means that someone could also pick up the 2156 * @sess and dereferencing it right now, so wait for 2157 * a grace period is required. 2158 */ 2159 wait_for_grace = true; 2160 } 2161 if (wait_for_grace) 2162 synchronize_rcu(); 2163 2164 mutex_unlock(&clt->paths_mutex); 2165 } 2166 2167 static void rtrs_clt_add_path_to_arr(struct rtrs_clt_sess *sess, 2168 struct rtrs_addr *addr) 2169 { 2170 struct rtrs_clt *clt = sess->clt; 2171 2172 mutex_lock(&clt->paths_mutex); 2173 clt->paths_num++; 2174 2175 list_add_tail_rcu(&sess->s.entry, &clt->paths_list); 2176 mutex_unlock(&clt->paths_mutex); 2177 } 2178 2179 static void rtrs_clt_close_work(struct work_struct *work) 2180 { 2181 struct rtrs_clt_sess *sess; 2182 2183 sess = container_of(work, struct rtrs_clt_sess, close_work); 2184 2185 cancel_delayed_work_sync(&sess->reconnect_dwork); 2186 rtrs_clt_stop_and_destroy_conns(sess); 2187 rtrs_clt_change_state(sess, RTRS_CLT_CLOSED); 2188 } 2189 2190 static int init_conns(struct rtrs_clt_sess *sess) 2191 { 2192 unsigned int cid; 2193 int err; 2194 2195 /* 2196 * On every new session connections increase reconnect counter 2197 * to avoid clashes with previous sessions not yet closed 2198 * sessions on a server side. 
2199 */ 2200 sess->s.recon_cnt++; 2201 2202 /* Establish all RDMA connections */ 2203 for (cid = 0; cid < sess->s.con_num; cid++) { 2204 err = create_con(sess, cid); 2205 if (err) 2206 goto destroy; 2207 2208 err = create_cm(to_clt_con(sess->s.con[cid])); 2209 if (err) { 2210 destroy_con(to_clt_con(sess->s.con[cid])); 2211 goto destroy; 2212 } 2213 } 2214 err = alloc_sess_reqs(sess); 2215 if (err) 2216 goto destroy; 2217 2218 rtrs_clt_start_hb(sess); 2219 2220 return 0; 2221 2222 destroy: 2223 while (cid--) { 2224 struct rtrs_clt_con *con = to_clt_con(sess->s.con[cid]); 2225 2226 stop_cm(con); 2227 destroy_con_cq_qp(con); 2228 destroy_cm(con); 2229 destroy_con(con); 2230 } 2231 /* 2232 * If we've never taken async path and got an error, say, 2233 * doing rdma_resolve_addr(), switch to CONNECTION_ERR state 2234 * manually to keep reconnecting. 2235 */ 2236 rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING_ERR); 2237 2238 return err; 2239 } 2240 2241 static void rtrs_clt_info_req_done(struct ib_cq *cq, struct ib_wc *wc) 2242 { 2243 struct rtrs_clt_con *con = cq->cq_context; 2244 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 2245 struct rtrs_iu *iu; 2246 2247 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 2248 rtrs_iu_free(iu, DMA_TO_DEVICE, sess->s.dev->ib_dev, 1); 2249 2250 if (unlikely(wc->status != IB_WC_SUCCESS)) { 2251 rtrs_err(sess->clt, "Sess info request send failed: %s\n", 2252 ib_wc_status_msg(wc->status)); 2253 rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING_ERR); 2254 return; 2255 } 2256 2257 rtrs_clt_update_wc_stats(con); 2258 } 2259 2260 static int process_info_rsp(struct rtrs_clt_sess *sess, 2261 const struct rtrs_msg_info_rsp *msg) 2262 { 2263 unsigned int sg_cnt, total_len; 2264 int i, sgi; 2265 2266 sg_cnt = le16_to_cpu(msg->sg_cnt); 2267 if (unlikely(!sg_cnt)) 2268 return -EINVAL; 2269 /* 2270 * Check if IB immediate data size is enough to hold the mem_id and 2271 * the offset inside the memory chunk. 
2272 */ 2273 if (unlikely((ilog2(sg_cnt - 1) + 1) + 2274 (ilog2(sess->chunk_size - 1) + 1) > 2275 MAX_IMM_PAYL_BITS)) { 2276 rtrs_err(sess->clt, 2277 "RDMA immediate size (%db) not enough to encode %d buffers of size %dB\n", 2278 MAX_IMM_PAYL_BITS, sg_cnt, sess->chunk_size); 2279 return -EINVAL; 2280 } 2281 if (unlikely(!sg_cnt || (sess->queue_depth % sg_cnt))) { 2282 rtrs_err(sess->clt, "Incorrect sg_cnt %d, is not multiple\n", 2283 sg_cnt); 2284 return -EINVAL; 2285 } 2286 total_len = 0; 2287 for (sgi = 0, i = 0; sgi < sg_cnt && i < sess->queue_depth; sgi++) { 2288 const struct rtrs_sg_desc *desc = &msg->desc[sgi]; 2289 u32 len, rkey; 2290 u64 addr; 2291 2292 addr = le64_to_cpu(desc->addr); 2293 rkey = le32_to_cpu(desc->key); 2294 len = le32_to_cpu(desc->len); 2295 2296 total_len += len; 2297 2298 if (unlikely(!len || (len % sess->chunk_size))) { 2299 rtrs_err(sess->clt, "Incorrect [%d].len %d\n", sgi, 2300 len); 2301 return -EINVAL; 2302 } 2303 for ( ; len && i < sess->queue_depth; i++) { 2304 sess->rbufs[i].addr = addr; 2305 sess->rbufs[i].rkey = rkey; 2306 2307 len -= sess->chunk_size; 2308 addr += sess->chunk_size; 2309 } 2310 } 2311 /* Sanity check */ 2312 if (unlikely(sgi != sg_cnt || i != sess->queue_depth)) { 2313 rtrs_err(sess->clt, "Incorrect sg vector, not fully mapped\n"); 2314 return -EINVAL; 2315 } 2316 if (unlikely(total_len != sess->chunk_size * sess->queue_depth)) { 2317 rtrs_err(sess->clt, "Incorrect total_len %d\n", total_len); 2318 return -EINVAL; 2319 } 2320 2321 return 0; 2322 } 2323 2324 static void rtrs_clt_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc) 2325 { 2326 struct rtrs_clt_con *con = cq->cq_context; 2327 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 2328 struct rtrs_msg_info_rsp *msg; 2329 enum rtrs_clt_state state; 2330 struct rtrs_iu *iu; 2331 size_t rx_sz; 2332 int err; 2333 2334 state = RTRS_CLT_CONNECTING_ERR; 2335 2336 WARN_ON(con->c.cid); 2337 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 2338 if 
(unlikely(wc->status != IB_WC_SUCCESS)) { 2339 rtrs_err(sess->clt, "Sess info response recv failed: %s\n", 2340 ib_wc_status_msg(wc->status)); 2341 goto out; 2342 } 2343 WARN_ON(wc->opcode != IB_WC_RECV); 2344 2345 if (unlikely(wc->byte_len < sizeof(*msg))) { 2346 rtrs_err(sess->clt, "Sess info response is malformed: size %d\n", 2347 wc->byte_len); 2348 goto out; 2349 } 2350 ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr, 2351 iu->size, DMA_FROM_DEVICE); 2352 msg = iu->buf; 2353 if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_INFO_RSP)) { 2354 rtrs_err(sess->clt, "Sess info response is malformed: type %d\n", 2355 le16_to_cpu(msg->type)); 2356 goto out; 2357 } 2358 rx_sz = sizeof(*msg); 2359 rx_sz += sizeof(msg->desc[0]) * le16_to_cpu(msg->sg_cnt); 2360 if (unlikely(wc->byte_len < rx_sz)) { 2361 rtrs_err(sess->clt, "Sess info response is malformed: size %d\n", 2362 wc->byte_len); 2363 goto out; 2364 } 2365 err = process_info_rsp(sess, msg); 2366 if (unlikely(err)) 2367 goto out; 2368 2369 err = post_recv_sess(sess); 2370 if (unlikely(err)) 2371 goto out; 2372 2373 state = RTRS_CLT_CONNECTED; 2374 2375 out: 2376 rtrs_clt_update_wc_stats(con); 2377 rtrs_iu_free(iu, DMA_FROM_DEVICE, sess->s.dev->ib_dev, 1); 2378 rtrs_clt_change_state(sess, state); 2379 } 2380 2381 static int rtrs_send_sess_info(struct rtrs_clt_sess *sess) 2382 { 2383 struct rtrs_clt_con *usr_con = to_clt_con(sess->s.con[0]); 2384 struct rtrs_msg_info_req *msg; 2385 struct rtrs_iu *tx_iu, *rx_iu; 2386 size_t rx_sz; 2387 int err; 2388 2389 rx_sz = sizeof(struct rtrs_msg_info_rsp); 2390 rx_sz += sizeof(u64) * MAX_SESS_QUEUE_DEPTH; 2391 2392 tx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), GFP_KERNEL, 2393 sess->s.dev->ib_dev, DMA_TO_DEVICE, 2394 rtrs_clt_info_req_done); 2395 rx_iu = rtrs_iu_alloc(1, rx_sz, GFP_KERNEL, sess->s.dev->ib_dev, 2396 DMA_FROM_DEVICE, rtrs_clt_info_rsp_done); 2397 if (unlikely(!tx_iu || !rx_iu)) { 2398 err = -ENOMEM; 2399 goto out; 2400 } 2401 /* Prepare 
for getting info response */ 2402 err = rtrs_iu_post_recv(&usr_con->c, rx_iu); 2403 if (unlikely(err)) { 2404 rtrs_err(sess->clt, "rtrs_iu_post_recv(), err: %d\n", err); 2405 goto out; 2406 } 2407 rx_iu = NULL; 2408 2409 msg = tx_iu->buf; 2410 msg->type = cpu_to_le16(RTRS_MSG_INFO_REQ); 2411 memcpy(msg->sessname, sess->s.sessname, sizeof(msg->sessname)); 2412 2413 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr, 2414 tx_iu->size, DMA_TO_DEVICE); 2415 2416 /* Send info request */ 2417 err = rtrs_iu_post_send(&usr_con->c, tx_iu, sizeof(*msg), NULL); 2418 if (unlikely(err)) { 2419 rtrs_err(sess->clt, "rtrs_iu_post_send(), err: %d\n", err); 2420 goto out; 2421 } 2422 tx_iu = NULL; 2423 2424 /* Wait for state change */ 2425 wait_event_interruptible_timeout(sess->state_wq, 2426 sess->state != RTRS_CLT_CONNECTING, 2427 msecs_to_jiffies( 2428 RTRS_CONNECT_TIMEOUT_MS)); 2429 if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED)) { 2430 if (READ_ONCE(sess->state) == RTRS_CLT_CONNECTING_ERR) 2431 err = -ECONNRESET; 2432 else 2433 err = -ETIMEDOUT; 2434 goto out; 2435 } 2436 2437 out: 2438 if (tx_iu) 2439 rtrs_iu_free(tx_iu, DMA_TO_DEVICE, sess->s.dev->ib_dev, 1); 2440 if (rx_iu) 2441 rtrs_iu_free(rx_iu, DMA_FROM_DEVICE, sess->s.dev->ib_dev, 1); 2442 if (unlikely(err)) 2443 /* If we've never taken async path because of malloc problems */ 2444 rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING_ERR); 2445 2446 return err; 2447 } 2448 2449 /** 2450 * init_sess() - establishes all session connections and does handshake 2451 * @sess: client session. 2452 * In case of error full close or reconnect procedure should be taken, 2453 * because reconnect or close async works can be started. 
2454 */ 2455 static int init_sess(struct rtrs_clt_sess *sess) 2456 { 2457 int err; 2458 2459 mutex_lock(&sess->init_mutex); 2460 err = init_conns(sess); 2461 if (err) { 2462 rtrs_err(sess->clt, "init_conns(), err: %d\n", err); 2463 goto out; 2464 } 2465 err = rtrs_send_sess_info(sess); 2466 if (err) { 2467 rtrs_err(sess->clt, "rtrs_send_sess_info(), err: %d\n", err); 2468 goto out; 2469 } 2470 rtrs_clt_sess_up(sess); 2471 out: 2472 mutex_unlock(&sess->init_mutex); 2473 2474 return err; 2475 } 2476 2477 static void rtrs_clt_reconnect_work(struct work_struct *work) 2478 { 2479 struct rtrs_clt_sess *sess; 2480 struct rtrs_clt *clt; 2481 unsigned int delay_ms; 2482 int err; 2483 2484 sess = container_of(to_delayed_work(work), struct rtrs_clt_sess, 2485 reconnect_dwork); 2486 clt = sess->clt; 2487 2488 if (READ_ONCE(sess->state) != RTRS_CLT_RECONNECTING) 2489 return; 2490 2491 if (sess->reconnect_attempts >= clt->max_reconnect_attempts) { 2492 /* Close a session completely if max attempts is reached */ 2493 rtrs_clt_close_conns(sess, false); 2494 return; 2495 } 2496 sess->reconnect_attempts++; 2497 2498 /* Stop everything */ 2499 rtrs_clt_stop_and_destroy_conns(sess); 2500 msleep(RTRS_RECONNECT_BACKOFF); 2501 if (rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING)) { 2502 err = init_sess(sess); 2503 if (err) 2504 goto reconnect_again; 2505 } 2506 2507 return; 2508 2509 reconnect_again: 2510 if (rtrs_clt_change_state(sess, RTRS_CLT_RECONNECTING)) { 2511 sess->stats->reconnects.fail_cnt++; 2512 delay_ms = clt->reconnect_delay_sec * 1000; 2513 queue_delayed_work(rtrs_wq, &sess->reconnect_dwork, 2514 msecs_to_jiffies(delay_ms + 2515 prandom_u32() % 2516 RTRS_RECONNECT_SEED)); 2517 } 2518 } 2519 2520 static void rtrs_clt_dev_release(struct device *dev) 2521 { 2522 struct rtrs_clt *clt = container_of(dev, struct rtrs_clt, dev); 2523 2524 kfree(clt); 2525 } 2526 2527 static struct rtrs_clt *alloc_clt(const char *sessname, size_t paths_num, 2528 u16 port, size_t pdu_sz, void 
*priv, 2529 void (*link_ev)(void *priv, 2530 enum rtrs_clt_link_ev ev), 2531 unsigned int max_segments, 2532 size_t max_segment_size, 2533 unsigned int reconnect_delay_sec, 2534 unsigned int max_reconnect_attempts) 2535 { 2536 struct rtrs_clt *clt; 2537 int err; 2538 2539 if (!paths_num || paths_num > MAX_PATHS_NUM) 2540 return ERR_PTR(-EINVAL); 2541 2542 if (strlen(sessname) >= sizeof(clt->sessname)) 2543 return ERR_PTR(-EINVAL); 2544 2545 clt = kzalloc(sizeof(*clt), GFP_KERNEL); 2546 if (!clt) 2547 return ERR_PTR(-ENOMEM); 2548 2549 clt->pcpu_path = alloc_percpu(typeof(*clt->pcpu_path)); 2550 if (!clt->pcpu_path) { 2551 kfree(clt); 2552 return ERR_PTR(-ENOMEM); 2553 } 2554 2555 uuid_gen(&clt->paths_uuid); 2556 INIT_LIST_HEAD_RCU(&clt->paths_list); 2557 clt->paths_num = paths_num; 2558 clt->paths_up = MAX_PATHS_NUM; 2559 clt->port = port; 2560 clt->pdu_sz = pdu_sz; 2561 clt->max_segments = max_segments; 2562 clt->max_segment_size = max_segment_size; 2563 clt->reconnect_delay_sec = reconnect_delay_sec; 2564 clt->max_reconnect_attempts = max_reconnect_attempts; 2565 clt->priv = priv; 2566 clt->link_ev = link_ev; 2567 clt->mp_policy = MP_POLICY_MIN_INFLIGHT; 2568 strlcpy(clt->sessname, sessname, sizeof(clt->sessname)); 2569 init_waitqueue_head(&clt->permits_wait); 2570 mutex_init(&clt->paths_ev_mutex); 2571 mutex_init(&clt->paths_mutex); 2572 2573 clt->dev.class = rtrs_clt_dev_class; 2574 clt->dev.release = rtrs_clt_dev_release; 2575 err = dev_set_name(&clt->dev, "%s", sessname); 2576 if (err) { 2577 free_percpu(clt->pcpu_path); 2578 kfree(clt); 2579 return ERR_PTR(err); 2580 } 2581 /* 2582 * Suppress user space notification until 2583 * sysfs files are created 2584 */ 2585 dev_set_uevent_suppress(&clt->dev, true); 2586 err = device_register(&clt->dev); 2587 if (err) { 2588 free_percpu(clt->pcpu_path); 2589 put_device(&clt->dev); 2590 return ERR_PTR(err); 2591 } 2592 2593 clt->kobj_paths = kobject_create_and_add("paths", &clt->dev.kobj); 2594 if (!clt->kobj_paths) { 
2595 free_percpu(clt->pcpu_path); 2596 device_unregister(&clt->dev); 2597 return NULL; 2598 } 2599 err = rtrs_clt_create_sysfs_root_files(clt); 2600 if (err) { 2601 free_percpu(clt->pcpu_path); 2602 kobject_del(clt->kobj_paths); 2603 kobject_put(clt->kobj_paths); 2604 device_unregister(&clt->dev); 2605 return ERR_PTR(err); 2606 } 2607 dev_set_uevent_suppress(&clt->dev, false); 2608 kobject_uevent(&clt->dev.kobj, KOBJ_ADD); 2609 2610 return clt; 2611 } 2612 2613 static void wait_for_inflight_permits(struct rtrs_clt *clt) 2614 { 2615 if (clt->permits_map) { 2616 size_t sz = clt->queue_depth; 2617 2618 wait_event(clt->permits_wait, 2619 find_first_bit(clt->permits_map, sz) >= sz); 2620 } 2621 } 2622 2623 static void free_clt(struct rtrs_clt *clt) 2624 { 2625 wait_for_inflight_permits(clt); 2626 free_permits(clt); 2627 free_percpu(clt->pcpu_path); 2628 mutex_destroy(&clt->paths_ev_mutex); 2629 mutex_destroy(&clt->paths_mutex); 2630 /* release callback will free clt in last put */ 2631 device_unregister(&clt->dev); 2632 } 2633 2634 /** 2635 * rtrs_clt_open() - Open a session to an RTRS server 2636 * @ops: holds the link event callback and the private pointer. 2637 * @sessname: name of the session 2638 * @paths: Paths to be established defined by their src and dst addresses 2639 * @paths_num: Number of elements in the @paths array 2640 * @port: port to be used by the RTRS session 2641 * @pdu_sz: Size of extra payload which can be accessed after permit allocation. 2642 * @reconnect_delay_sec: time between reconnect tries 2643 * @max_segments: Max. number of segments per IO request 2644 * @max_segment_size: Max. size of one segment 2645 * @max_reconnect_attempts: Number of times to reconnect on error before giving 2646 * up, 0 for * disabled, -1 for forever 2647 * 2648 * Starts session establishment with the rtrs_server. The function can block 2649 * up to ~2000ms before it returns. 2650 * 2651 * Return a valid pointer on success otherwise PTR_ERR. 
2652 */ 2653 struct rtrs_clt *rtrs_clt_open(struct rtrs_clt_ops *ops, 2654 const char *sessname, 2655 const struct rtrs_addr *paths, 2656 size_t paths_num, u16 port, 2657 size_t pdu_sz, u8 reconnect_delay_sec, 2658 u16 max_segments, 2659 size_t max_segment_size, 2660 s16 max_reconnect_attempts) 2661 { 2662 struct rtrs_clt_sess *sess, *tmp; 2663 struct rtrs_clt *clt; 2664 int err, i; 2665 2666 clt = alloc_clt(sessname, paths_num, port, pdu_sz, ops->priv, 2667 ops->link_ev, 2668 max_segments, max_segment_size, reconnect_delay_sec, 2669 max_reconnect_attempts); 2670 if (IS_ERR(clt)) { 2671 err = PTR_ERR(clt); 2672 goto out; 2673 } 2674 for (i = 0; i < paths_num; i++) { 2675 struct rtrs_clt_sess *sess; 2676 2677 sess = alloc_sess(clt, &paths[i], nr_cpu_ids, 2678 max_segments, max_segment_size); 2679 if (IS_ERR(sess)) { 2680 err = PTR_ERR(sess); 2681 goto close_all_sess; 2682 } 2683 list_add_tail_rcu(&sess->s.entry, &clt->paths_list); 2684 2685 err = init_sess(sess); 2686 if (err) { 2687 list_del_rcu(&sess->s.entry); 2688 rtrs_clt_close_conns(sess, true); 2689 free_sess(sess); 2690 goto close_all_sess; 2691 } 2692 2693 err = rtrs_clt_create_sess_files(sess); 2694 if (err) { 2695 list_del_rcu(&sess->s.entry); 2696 rtrs_clt_close_conns(sess, true); 2697 free_sess(sess); 2698 goto close_all_sess; 2699 } 2700 } 2701 err = alloc_permits(clt); 2702 if (err) 2703 goto close_all_sess; 2704 2705 return clt; 2706 2707 close_all_sess: 2708 list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) { 2709 rtrs_clt_destroy_sess_files(sess, NULL); 2710 rtrs_clt_close_conns(sess, true); 2711 kobject_put(&sess->kobj); 2712 } 2713 rtrs_clt_destroy_sysfs_root_files(clt); 2714 rtrs_clt_destroy_sysfs_root_folders(clt); 2715 free_clt(clt); 2716 2717 out: 2718 return ERR_PTR(err); 2719 } 2720 EXPORT_SYMBOL(rtrs_clt_open); 2721 2722 /** 2723 * rtrs_clt_close() - Close a session 2724 * @clt: Session handle. Session is freed upon return. 
2725 */ 2726 void rtrs_clt_close(struct rtrs_clt *clt) 2727 { 2728 struct rtrs_clt_sess *sess, *tmp; 2729 2730 /* Firstly forbid sysfs access */ 2731 rtrs_clt_destroy_sysfs_root_files(clt); 2732 rtrs_clt_destroy_sysfs_root_folders(clt); 2733 2734 /* Now it is safe to iterate over all paths without locks */ 2735 list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) { 2736 rtrs_clt_destroy_sess_files(sess, NULL); 2737 rtrs_clt_close_conns(sess, true); 2738 kobject_put(&sess->kobj); 2739 } 2740 free_clt(clt); 2741 } 2742 EXPORT_SYMBOL(rtrs_clt_close); 2743 2744 int rtrs_clt_reconnect_from_sysfs(struct rtrs_clt_sess *sess) 2745 { 2746 enum rtrs_clt_state old_state; 2747 int err = -EBUSY; 2748 bool changed; 2749 2750 changed = rtrs_clt_change_state_get_old(sess, RTRS_CLT_RECONNECTING, 2751 &old_state); 2752 if (changed) { 2753 sess->reconnect_attempts = 0; 2754 queue_delayed_work(rtrs_wq, &sess->reconnect_dwork, 0); 2755 } 2756 if (changed || old_state == RTRS_CLT_RECONNECTING) { 2757 /* 2758 * flush_delayed_work() queues pending work for immediate 2759 * execution, so do the flush if we have queued something 2760 * right now or work is pending. 2761 */ 2762 flush_delayed_work(&sess->reconnect_dwork); 2763 err = (READ_ONCE(sess->state) == 2764 RTRS_CLT_CONNECTED ? 0 : -ENOTCONN); 2765 } 2766 2767 return err; 2768 } 2769 2770 int rtrs_clt_disconnect_from_sysfs(struct rtrs_clt_sess *sess) 2771 { 2772 rtrs_clt_close_conns(sess, true); 2773 2774 return 0; 2775 } 2776 2777 int rtrs_clt_remove_path_from_sysfs(struct rtrs_clt_sess *sess, 2778 const struct attribute *sysfs_self) 2779 { 2780 enum rtrs_clt_state old_state; 2781 bool changed; 2782 2783 /* 2784 * Continue stopping path till state was changed to DEAD or 2785 * state was observed as DEAD: 2786 * 1. State was changed to DEAD - we were fast and nobody 2787 * invoked rtrs_clt_reconnect(), which can again start 2788 * reconnecting. 2789 * 2. 
State was observed as DEAD - we have someone in parallel 2790 * removing the path. 2791 */ 2792 do { 2793 rtrs_clt_close_conns(sess, true); 2794 changed = rtrs_clt_change_state_get_old(sess, 2795 RTRS_CLT_DEAD, 2796 &old_state); 2797 } while (!changed && old_state != RTRS_CLT_DEAD); 2798 2799 if (likely(changed)) { 2800 rtrs_clt_destroy_sess_files(sess, sysfs_self); 2801 rtrs_clt_remove_path_from_arr(sess); 2802 kobject_put(&sess->kobj); 2803 } 2804 2805 return 0; 2806 } 2807 2808 void rtrs_clt_set_max_reconnect_attempts(struct rtrs_clt *clt, int value) 2809 { 2810 clt->max_reconnect_attempts = (unsigned int)value; 2811 } 2812 2813 int rtrs_clt_get_max_reconnect_attempts(const struct rtrs_clt *clt) 2814 { 2815 return (int)clt->max_reconnect_attempts; 2816 } 2817 2818 /** 2819 * rtrs_clt_request() - Request data transfer to/from server via RDMA. 2820 * 2821 * @dir: READ/WRITE 2822 * @ops: callback function to be called as confirmation, and the pointer. 2823 * @clt: Session 2824 * @permit: Preallocated permit 2825 * @vec: Message that is sent to server together with the request. 2826 * Sum of len of all @vec elements limited to <= IO_MSG_SIZE. 2827 * Since the msg is copied internally it can be allocated on stack. 2828 * @nr: Number of elements in @vec. 2829 * @data_len: length of data sent to/from server 2830 * @sg: Pages to be sent/received to/from server. 2831 * @sg_cnt: Number of elements in the @sg 2832 * 2833 * Return: 2834 * 0: Success 2835 * <0: Error 2836 * 2837 * On dir=READ rtrs client will request a data transfer from Server to client. 2838 * The data that the server will respond with will be stored in @sg when 2839 * the user receives an %RTRS_CLT_RDMA_EV_RDMA_REQUEST_WRITE_COMPL event. 2840 * On dir=WRITE rtrs client will rdma write data in sg to server side. 
2841 */ 2842 int rtrs_clt_request(int dir, struct rtrs_clt_req_ops *ops, 2843 struct rtrs_clt *clt, struct rtrs_permit *permit, 2844 const struct kvec *vec, size_t nr, size_t data_len, 2845 struct scatterlist *sg, unsigned int sg_cnt) 2846 { 2847 struct rtrs_clt_io_req *req; 2848 struct rtrs_clt_sess *sess; 2849 2850 enum dma_data_direction dma_dir; 2851 int err = -ECONNABORTED, i; 2852 size_t usr_len, hdr_len; 2853 struct path_it it; 2854 2855 /* Get kvec length */ 2856 for (i = 0, usr_len = 0; i < nr; i++) 2857 usr_len += vec[i].iov_len; 2858 2859 if (dir == READ) { 2860 hdr_len = sizeof(struct rtrs_msg_rdma_read) + 2861 sg_cnt * sizeof(struct rtrs_sg_desc); 2862 dma_dir = DMA_FROM_DEVICE; 2863 } else { 2864 hdr_len = sizeof(struct rtrs_msg_rdma_write); 2865 dma_dir = DMA_TO_DEVICE; 2866 } 2867 2868 rcu_read_lock(); 2869 for (path_it_init(&it, clt); 2870 (sess = it.next_path(&it)) && it.i < it.clt->paths_num; it.i++) { 2871 if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED)) 2872 continue; 2873 2874 if (unlikely(usr_len + hdr_len > sess->max_hdr_size)) { 2875 rtrs_wrn_rl(sess->clt, 2876 "%s request failed, user message size is %zu and header length %zu, but max size is %u\n", 2877 dir == READ ? "Read" : "Write", 2878 usr_len, hdr_len, sess->max_hdr_size); 2879 err = -EMSGSIZE; 2880 break; 2881 } 2882 req = rtrs_clt_get_req(sess, ops->conf_fn, permit, ops->priv, 2883 vec, usr_len, sg, sg_cnt, data_len, 2884 dma_dir); 2885 if (dir == READ) 2886 err = rtrs_clt_read_req(req); 2887 else 2888 err = rtrs_clt_write_req(req); 2889 if (unlikely(err)) { 2890 req->in_use = false; 2891 continue; 2892 } 2893 /* Success path */ 2894 break; 2895 } 2896 path_it_deinit(&it); 2897 rcu_read_unlock(); 2898 2899 return err; 2900 } 2901 EXPORT_SYMBOL(rtrs_clt_request); 2902 2903 /** 2904 * rtrs_clt_query() - queries RTRS session attributes 2905 *@clt: session pointer 2906 *@attr: query results for session attributes. 
2907 * Returns: 2908 * 0 on success 2909 * -ECOMM no connection to the server 2910 */ 2911 int rtrs_clt_query(struct rtrs_clt *clt, struct rtrs_attrs *attr) 2912 { 2913 if (!rtrs_clt_is_connected(clt)) 2914 return -ECOMM; 2915 2916 attr->queue_depth = clt->queue_depth; 2917 attr->max_io_size = clt->max_io_size; 2918 attr->sess_kobj = &clt->dev.kobj; 2919 strlcpy(attr->sessname, clt->sessname, sizeof(attr->sessname)); 2920 2921 return 0; 2922 } 2923 EXPORT_SYMBOL(rtrs_clt_query); 2924 2925 int rtrs_clt_create_path_from_sysfs(struct rtrs_clt *clt, 2926 struct rtrs_addr *addr) 2927 { 2928 struct rtrs_clt_sess *sess; 2929 int err; 2930 2931 sess = alloc_sess(clt, addr, nr_cpu_ids, clt->max_segments, 2932 clt->max_segment_size); 2933 if (IS_ERR(sess)) 2934 return PTR_ERR(sess); 2935 2936 /* 2937 * It is totally safe to add path in CONNECTING state: coming 2938 * IO will never grab it. Also it is very important to add 2939 * path before init, since init fires LINK_CONNECTED event. 2940 */ 2941 rtrs_clt_add_path_to_arr(sess, addr); 2942 2943 err = init_sess(sess); 2944 if (err) 2945 goto close_sess; 2946 2947 err = rtrs_clt_create_sess_files(sess); 2948 if (err) 2949 goto close_sess; 2950 2951 return 0; 2952 2953 close_sess: 2954 rtrs_clt_remove_path_from_arr(sess); 2955 rtrs_clt_close_conns(sess, true); 2956 free_sess(sess); 2957 2958 return err; 2959 } 2960 2961 static int rtrs_clt_ib_dev_init(struct rtrs_ib_dev *dev) 2962 { 2963 if (!(dev->ib_dev->attrs.device_cap_flags & 2964 IB_DEVICE_MEM_MGT_EXTENSIONS)) { 2965 pr_err("Memory registrations not supported.\n"); 2966 return -ENOTSUPP; 2967 } 2968 2969 return 0; 2970 } 2971 2972 static const struct rtrs_rdma_dev_pd_ops dev_pd_ops = { 2973 .init = rtrs_clt_ib_dev_init 2974 }; 2975 2976 static int __init rtrs_client_init(void) 2977 { 2978 rtrs_rdma_dev_pd_init(0, &dev_pd); 2979 2980 rtrs_clt_dev_class = class_create(THIS_MODULE, "rtrs-client"); 2981 if (IS_ERR(rtrs_clt_dev_class)) { 2982 pr_err("Failed to create 
rtrs-client dev class\n"); 2983 return PTR_ERR(rtrs_clt_dev_class); 2984 } 2985 rtrs_wq = alloc_workqueue("rtrs_client_wq", 0, 0); 2986 if (!rtrs_wq) { 2987 class_destroy(rtrs_clt_dev_class); 2988 return -ENOMEM; 2989 } 2990 2991 return 0; 2992 } 2993 2994 static void __exit rtrs_client_exit(void) 2995 { 2996 destroy_workqueue(rtrs_wq); 2997 class_destroy(rtrs_clt_dev_class); 2998 rtrs_rdma_dev_pd_deinit(&dev_pd); 2999 } 3000 3001 module_init(rtrs_client_init); 3002 module_exit(rtrs_client_exit); 3003