// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * RDMA Transport Layer
 *
 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
 */

#undef pr_fmt
#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt

#include <linux/module.h>
#include <linux/rculist.h>
#include <linux/random.h>

#include "rtrs-clt.h"
#include "rtrs-log.h"

#define RTRS_CONNECT_TIMEOUT_MS 30000
/*
 * Wait a bit before trying to reconnect after a failure
 * in order to give the server time to finish its cleanup,
 * which otherwise leads to "false positive" failed reconnect attempts.
 */
#define RTRS_RECONNECT_BACKOFF 1000
/*
 * Wait for an additional random time between 0 and 8 seconds
 * before starting to reconnect to avoid clients reconnecting
 * all at once in case of a major network outage.
 */
#define RTRS_RECONNECT_SEED 8

MODULE_DESCRIPTION("RDMA Transport Client");
MODULE_LICENSE("GPL");

static const struct rtrs_rdma_dev_pd_ops dev_pd_ops;
static struct rtrs_rdma_dev_pd dev_pd = {
	.ops = &dev_pd_ops
};

static struct workqueue_struct *rtrs_wq;
static struct class *rtrs_clt_dev_class;

static inline bool rtrs_clt_is_connected(const struct rtrs_clt *clt)
{
	struct rtrs_clt_sess *sess;
	bool connected = false;

	rcu_read_lock();
	list_for_each_entry_rcu(sess, &clt->paths_list, s.entry)
		connected |= READ_ONCE(sess->state) == RTRS_CLT_CONNECTED;
	rcu_read_unlock();

	return connected;
}

static struct rtrs_permit *
__rtrs_get_permit(struct rtrs_clt *clt, enum rtrs_clt_con_type con_type)
{
	size_t max_depth = clt->queue_depth;
	struct rtrs_permit *permit;
	int bit;

	/*
	 * Adapted from null_blk get_tag(). Callers from different cpus may
	 * grab the same bit, since find_first_zero_bit is not atomic.
	 * But then the test_and_set_bit_lock will fail for all the
	 * callers but one, so that they will loop again.
	 * This way an explicit spinlock is not required.
	 */
	do {
		bit = find_first_zero_bit(clt->permits_map, max_depth);
		if (unlikely(bit >= max_depth))
			return NULL;
	} while (unlikely(test_and_set_bit_lock(bit, clt->permits_map)));

	permit = get_permit(clt, bit);
	WARN_ON(permit->mem_id != bit);
	permit->cpu_id = raw_smp_processor_id();
	permit->con_type = con_type;

	return permit;
}

static inline void __rtrs_put_permit(struct rtrs_clt *clt,
				     struct rtrs_permit *permit)
{
	clear_bit_unlock(permit->mem_id, clt->permits_map);
}

/**
 * rtrs_clt_get_permit() - allocates permit for future RDMA operation
 * @clt: Current session
 * @con_type: Type of connection to use with the permit
 * @can_wait: Wait type
 *
 * Description:
 *    Allocates permit for the following RDMA operation. Permit is used
 *    to preallocate all resources and to propagate memory pressure
 *    up earlier.
 *
 * Context:
 *    Can sleep if @wait == RTRS_TAG_WAIT
 */
struct rtrs_permit *rtrs_clt_get_permit(struct rtrs_clt *clt,
					enum rtrs_clt_con_type con_type,
					int can_wait)
{
	struct rtrs_permit *permit;
	DEFINE_WAIT(wait);

	permit = __rtrs_get_permit(clt, con_type);
	if (likely(permit) || !can_wait)
		return permit;

	do {
		prepare_to_wait(&clt->permits_wait, &wait,
				TASK_UNINTERRUPTIBLE);
		permit = __rtrs_get_permit(clt, con_type);
		if (likely(permit))
			break;

		io_schedule();
	} while (1);

	finish_wait(&clt->permits_wait, &wait);

	return permit;
}
EXPORT_SYMBOL(rtrs_clt_get_permit);

/**
 * rtrs_clt_put_permit() - puts allocated permit
 * @clt: Current session
 * @permit: Permit to be freed
 *
 * Context:
 *    Does not matter
 */
void rtrs_clt_put_permit(struct rtrs_clt *clt, struct rtrs_permit *permit)
{
	if (WARN_ON(!test_bit(permit->mem_id, clt->permits_map)))
		return;

	__rtrs_put_permit(clt, permit);

	/*
	 * rtrs_clt_get_permit() adds itself to the &clt->permits_wait list
	 * before calling schedule(). So if rtrs_clt_get_permit() is sleeping
	 * it must have added itself to &clt->permits_wait before
	 * __rtrs_put_permit() finished.
	 * Hence it is safe to guard wake_up() with a waitqueue_active() test.
	 */
	if (waitqueue_active(&clt->permits_wait))
		wake_up(&clt->permits_wait);
}
EXPORT_SYMBOL(rtrs_clt_put_permit);

/**
 * rtrs_permit_to_clt_con() - returns RDMA connection pointer by the permit
 * @sess: client session pointer
 * @permit: permit for the allocation of the RDMA buffer
 * Note:
 *     IO connection starts from 1.
 *     0 connection is for user messages.
 */
static
struct rtrs_clt_con *rtrs_permit_to_clt_con(struct rtrs_clt_sess *sess,
					    struct rtrs_permit *permit)
{
	int id = 0;

	if (likely(permit->con_type == RTRS_IO_CON))
		id = (permit->cpu_id % (sess->s.con_num - 1)) + 1;

	return to_clt_con(sess->s.con[id]);
}

/**
 * __rtrs_clt_change_state() - change the session state through session state
 * machine.
 *
 * @sess: client session to change the state of.
 * @new_state: state to change to.
 *
 * returns true if successful, false if the requested state can not be set.
 *
 * Locks:
 *    state_wq lock must be held.
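 *
 *    For example, CONNECTED is only reachable from CONNECTING, and DEAD only
 *    from CLOSED; any transition not listed in the switch below is rejected.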
 */
static bool __rtrs_clt_change_state(struct rtrs_clt_sess *sess,
				    enum rtrs_clt_state new_state)
{
	enum rtrs_clt_state old_state;
	bool changed = false;

	lockdep_assert_held(&sess->state_wq.lock);

	old_state = sess->state;
	switch (new_state) {
	case RTRS_CLT_CONNECTING:
		switch (old_state) {
		case RTRS_CLT_RECONNECTING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_RECONNECTING:
		switch (old_state) {
		case RTRS_CLT_CONNECTED:
		case RTRS_CLT_CONNECTING_ERR:
		case RTRS_CLT_CLOSED:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_CONNECTED:
		switch (old_state) {
		case RTRS_CLT_CONNECTING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_CONNECTING_ERR:
		switch (old_state) {
		case RTRS_CLT_CONNECTING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_CLOSING:
		switch (old_state) {
		case RTRS_CLT_CONNECTING:
		case RTRS_CLT_CONNECTING_ERR:
		case RTRS_CLT_RECONNECTING:
		case RTRS_CLT_CONNECTED:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_CLOSED:
		switch (old_state) {
		case RTRS_CLT_CLOSING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_DEAD:
		switch (old_state) {
		case RTRS_CLT_CLOSED:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	default:
		break;
	}
	if (changed) {
		sess->state = new_state;
		wake_up_locked(&sess->state_wq);
	}

	return changed;
}

static bool rtrs_clt_change_state_from_to(struct rtrs_clt_sess *sess,
					  enum rtrs_clt_state old_state,
					  enum rtrs_clt_state new_state)
{
	bool changed = false;

	spin_lock_irq(&sess->state_wq.lock);
	if (sess->state == old_state)
		changed = __rtrs_clt_change_state(sess, new_state);
	spin_unlock_irq(&sess->state_wq.lock);

	return changed;
}

static void rtrs_rdma_error_recovery(struct rtrs_clt_con *con)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	if (rtrs_clt_change_state_from_to(sess,
					  RTRS_CLT_CONNECTED,
					  RTRS_CLT_RECONNECTING)) {
		struct rtrs_clt *clt = sess->clt;
		unsigned int delay_ms;

		/*
		 * Normal scenario, reconnect if we were successfully connected
		 */
		delay_ms = clt->reconnect_delay_sec * 1000;
		queue_delayed_work(rtrs_wq, &sess->reconnect_dwork,
				   msecs_to_jiffies(delay_ms +
						    prandom_u32() % RTRS_RECONNECT_SEED));
	} else {
		/*
		 * Error can happen just on establishing new connection,
		 * so notify waiter with error state, waiter is responsible
		 * for cleaning the rest and reconnect if needed.
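		 * (The waiter is create_cm(), which sleeps on state_wq until
		 * cm_err is set or the session leaves RTRS_CLT_CONNECTING.)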
		 */
		rtrs_clt_change_state_from_to(sess,
					      RTRS_CLT_CONNECTING,
					      RTRS_CLT_CONNECTING_ERR);
	}
}

static void rtrs_clt_fast_reg_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_con *con = cq->cq_context;

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(con->c.sess, "Failed IB_WR_REG_MR: %s\n",
			 ib_wc_status_msg(wc->status));
		rtrs_rdma_error_recovery(con);
	}
}

static struct ib_cqe fast_reg_cqe = {
	.done = rtrs_clt_fast_reg_done
};

static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
			      bool notify, bool can_wait);

static void rtrs_clt_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_io_req *req =
		container_of(wc->wr_cqe, typeof(*req), inv_cqe);
	struct rtrs_clt_con *con = cq->cq_context;

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(con->c.sess, "Failed IB_WR_LOCAL_INV: %s\n",
			 ib_wc_status_msg(wc->status));
		rtrs_rdma_error_recovery(con);
	}
	req->need_inv = false;
	if (likely(req->need_inv_comp))
		complete(&req->inv_comp);
	else
		/* Complete request from INV callback */
		complete_rdma_req(req, req->inv_errno, true, false);
}

static int rtrs_inv_rkey(struct rtrs_clt_io_req *req)
{
	struct rtrs_clt_con *con = req->con;
	struct ib_send_wr wr = {
		.opcode = IB_WR_LOCAL_INV,
		.wr_cqe = &req->inv_cqe,
		.send_flags = IB_SEND_SIGNALED,
		.ex.invalidate_rkey = req->mr->rkey,
	};
	req->inv_cqe.done = rtrs_clt_inv_rkey_done;

	return ib_post_send(con->c.qp, &wr, NULL);
}

static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
			      bool notify, bool can_wait)
{
	struct rtrs_clt_con *con = req->con;
	struct rtrs_clt_sess *sess;
	int err;

	if (WARN_ON(!req->in_use))
		return;
	if (WARN_ON(!req->con))
		return;
	sess = to_clt_sess(con->c.sess);

	if (req->sg_cnt) {
		if (unlikely(req->dir == DMA_FROM_DEVICE && req->need_inv)) {
			/*
			 * We are here to invalidate read requests
			 * ourselves. In normal scenario server should
			 * send INV for all read requests, but
			 * we are here, thus two things could happen:
			 *
			 *      1. this is failover, when errno != 0
			 *         and can_wait == 1,
			 *
			 *      2. something totally bad happened and
			 *         server forgot to send INV, so we
			 *         should do that ourselves.
			 */

			if (likely(can_wait)) {
				req->need_inv_comp = true;
			} else {
				/* This should be IO path, so always notify */
				WARN_ON(!notify);
				/* Save errno for INV callback */
				req->inv_errno = errno;
			}

			err = rtrs_inv_rkey(req);
			if (unlikely(err)) {
				rtrs_err(con->c.sess, "Send INV WR key=%#x: %d\n",
					 req->mr->rkey, err);
			} else if (likely(can_wait)) {
				wait_for_completion(&req->inv_comp);
			} else {
				/*
				 * Something went wrong, so request will be
				 * completed from INV callback.
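				 * (rtrs_clt_inv_rkey_done() will then call
				 * complete_rdma_req() with the saved inv_errno.)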
				 */
				WARN_ON_ONCE(1);

				return;
			}
		}
		ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
				req->sg_cnt, req->dir);
	}
	if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
		atomic_dec(&sess->stats->inflight);

	req->in_use = false;
	req->con = NULL;

	if (notify)
		req->conf(req->priv, errno);
}

static int rtrs_post_send_rdma(struct rtrs_clt_con *con,
			       struct rtrs_clt_io_req *req,
			       struct rtrs_rbuf *rbuf, u32 off,
			       u32 imm, struct ib_send_wr *wr)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	enum ib_send_flags flags;
	struct ib_sge sge;

	if (unlikely(!req->sg_size)) {
		rtrs_wrn(con->c.sess,
			 "Doing RDMA Write failed, no data supplied\n");
		return -EINVAL;
	}

	/* user data and user message in the first list element */
	sge.addr = req->iu->dma_addr;
	sge.length = req->sg_size;
	sge.lkey = sess->s.dev->ib_pd->local_dma_lkey;

	/*
	 * From time to time we have to post signalled sends,
	 * or send queue will fill up and only QP reset can help.
	 */
	flags = atomic_inc_return(&con->io_cnt) % sess->queue_depth ?
			0 : IB_SEND_SIGNALED;

	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr,
				      req->sg_size, DMA_TO_DEVICE);

	return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, &sge, 1,
					   rbuf->rkey, rbuf->addr + off,
					   imm, flags, wr);
}

static void process_io_rsp(struct rtrs_clt_sess *sess, u32 msg_id,
			   s16 errno, bool w_inval)
{
	struct rtrs_clt_io_req *req;

	if (WARN_ON(msg_id >= sess->queue_depth))
		return;

	req = &sess->reqs[msg_id];
	/* Drop need_inv if server responded with send with invalidation */
	req->need_inv &= !w_inval;
	complete_rdma_req(req, errno, true, false);
}

static void rtrs_clt_recv_done(struct rtrs_clt_con *con, struct ib_wc *wc)
{
	struct rtrs_iu *iu;
	int err;
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	WARN_ON(sess->flags != RTRS_MSG_NEW_RKEY_F);
	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
	err = rtrs_iu_post_recv(&con->c, iu);
	if (unlikely(err)) {
		rtrs_err(con->c.sess, "post iu failed %d\n", err);
		rtrs_rdma_error_recovery(con);
	}
}

static void rtrs_clt_rkey_rsp_done(struct rtrs_clt_con *con, struct ib_wc *wc)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	struct rtrs_msg_rkey_rsp *msg;
	u32 imm_type, imm_payload;
	bool w_inval = false;
	struct rtrs_iu *iu;
	u32 buf_id;
	int err;

	WARN_ON(sess->flags != RTRS_MSG_NEW_RKEY_F);

	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);

	if (unlikely(wc->byte_len < sizeof(*msg))) {
		rtrs_err(con->c.sess, "rkey response is malformed: size %d\n",
			 wc->byte_len);
		goto out;
	}
	ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
				   iu->size, DMA_FROM_DEVICE);
	msg = iu->buf;
	if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_RKEY_RSP)) {
		rtrs_err(sess->clt, "rkey response is malformed: type %d\n",
			 le16_to_cpu(msg->type));
		goto out;
	}
	buf_id = le16_to_cpu(msg->buf_id);
	if (WARN_ON(buf_id >= sess->queue_depth))
		goto out;

	rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), &imm_type, &imm_payload);
	if (likely(imm_type == RTRS_IO_RSP_IMM ||
		   imm_type == RTRS_IO_RSP_W_INV_IMM)) {
		u32 msg_id;

		w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM);
		rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err);
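		/*
		 * Sanity check: the msg_id decoded from the immediate data is
		 * expected to match the buf_id carried in the rkey response.
		 */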

		if (WARN_ON(buf_id != msg_id))
			goto out;
		sess->rbufs[buf_id].rkey = le32_to_cpu(msg->rkey);
		process_io_rsp(sess, msg_id, err, w_inval);
	}
	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, iu->dma_addr,
				      iu->size, DMA_FROM_DEVICE);
	return rtrs_clt_recv_done(con, wc);
out:
	rtrs_rdma_error_recovery(con);
}

static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc);

static struct ib_cqe io_comp_cqe = {
	.done = rtrs_clt_rdma_done
};

/*
 * Post x2 empty WRs: first is for this RDMA with IMM,
 * second is for RECV with INV, which happened earlier.
 */
static int rtrs_post_recv_empty_x2(struct rtrs_con *con, struct ib_cqe *cqe)
{
	struct ib_recv_wr wr_arr[2], *wr;
	int i;

	memset(wr_arr, 0, sizeof(wr_arr));
	for (i = 0; i < ARRAY_SIZE(wr_arr); i++) {
		wr = &wr_arr[i];
		wr->wr_cqe = cqe;
		if (i)
			/* Chain backwards */
			wr->next = &wr_arr[i - 1];
	}

	return ib_post_recv(con->qp, wr, NULL);
}

static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_con *con = cq->cq_context;
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	u32 imm_type, imm_payload;
	bool w_inval = false;
	int err;

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		if (wc->status != IB_WC_WR_FLUSH_ERR) {
			rtrs_err(sess->clt, "RDMA failed: %s\n",
				 ib_wc_status_msg(wc->status));
			rtrs_rdma_error_recovery(con);
		}
		return;
	}
	rtrs_clt_update_wc_stats(con);

	switch (wc->opcode) {
	case IB_WC_RECV_RDMA_WITH_IMM:
		/*
		 * post_recv() RDMA write completions of IO reqs (read/write)
		 * and hb
		 */
		if (WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done))
			return;
		rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
			      &imm_type, &imm_payload);
		if (likely(imm_type == RTRS_IO_RSP_IMM ||
			   imm_type == RTRS_IO_RSP_W_INV_IMM)) {
			u32 msg_id;

			w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM);
			rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err);

			process_io_rsp(sess, msg_id, err, w_inval);
		} else if (imm_type == RTRS_HB_MSG_IMM) {
			WARN_ON(con->c.cid);
			rtrs_send_hb_ack(&sess->s);
			if (sess->flags == RTRS_MSG_NEW_RKEY_F)
				return rtrs_clt_recv_done(con, wc);
		} else if (imm_type == RTRS_HB_ACK_IMM) {
			WARN_ON(con->c.cid);
			sess->s.hb_missed_cnt = 0;
			if (sess->flags == RTRS_MSG_NEW_RKEY_F)
				return rtrs_clt_recv_done(con, wc);
		} else {
			rtrs_wrn(con->c.sess, "Unknown IMM type %u\n",
				 imm_type);
		}
		if (w_inval)
			/*
			 * Post x2 empty WRs: first is for this RDMA with IMM,
			 * second is for RECV with INV, which happened earlier.
			 */
			err = rtrs_post_recv_empty_x2(&con->c, &io_comp_cqe);
		else
			err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
		if (unlikely(err)) {
			rtrs_err(con->c.sess, "rtrs_post_recv_empty(): %d\n",
				 err);
			rtrs_rdma_error_recovery(con);
			break;
		}
		break;
	case IB_WC_RECV:
		/*
		 * Key invalidations from server side
		 */
		WARN_ON(!(wc->wc_flags & IB_WC_WITH_INVALIDATE ||
			  wc->wc_flags & IB_WC_WITH_IMM));
		WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done);
		if (sess->flags == RTRS_MSG_NEW_RKEY_F) {
			if (wc->wc_flags & IB_WC_WITH_INVALIDATE)
				return rtrs_clt_recv_done(con, wc);

			return rtrs_clt_rkey_rsp_done(con, wc);
		}
		break;
	case IB_WC_RDMA_WRITE:
		/*
		 * post_send() RDMA write completions of IO reqs (read/write)
		 * and hb
		 */
		break;

	default:
		rtrs_wrn(sess->clt, "Unexpected WC type: %d\n", wc->opcode);
		return;
	}
}

static int post_recv_io(struct rtrs_clt_con *con, size_t q_size)
{
	int err, i;
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	for (i = 0; i < q_size; i++) {
		if (sess->flags == RTRS_MSG_NEW_RKEY_F) {
			struct rtrs_iu *iu = &con->rsp_ius[i];

			err = rtrs_iu_post_recv(&con->c, iu);
		} else {
			err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
		}
		if (unlikely(err))
			return err;
	}

	return 0;
}

static int post_recv_sess(struct rtrs_clt_sess *sess)
{
	size_t q_size = 0;
	int err, cid;

	for (cid = 0; cid < sess->s.con_num; cid++) {
		if (cid == 0)
			q_size = SERVICE_CON_QUEUE_DEPTH;
		else
			q_size = sess->queue_depth;

		/*
		 * x2 for RDMA read responses + FR key invalidations,
		 * RDMA writes do not require any FR registrations.
		 */
		q_size *= 2;

		err = post_recv_io(to_clt_con(sess->s.con[cid]), q_size);
		if (unlikely(err)) {
			rtrs_err(sess->clt, "post_recv_io(), err: %d\n", err);
			return err;
		}
	}

	return 0;
}

struct path_it {
	int i;
	struct list_head skip_list;
	struct rtrs_clt *clt;
	struct rtrs_clt_sess *(*next_path)(struct path_it *it);
};

/**
 * list_next_or_null_rr_rcu - get next list element in round-robin fashion.
 * @head: the head for the list.
 * @ptr: the list head to take the next element from.
 * @type: the type of the struct this is embedded in.
 * @memb: the name of the list_head within the struct.
 *
 * Next element returned in round-robin fashion, i.e. head will be skipped,
 * but if list is observed as empty, NULL will be returned.
 *
 * This primitive may safely run concurrently with the _rcu list-mutation
 * primitives such as list_add_rcu() as long as it's guarded by rcu_read_lock().
 */
#define list_next_or_null_rr_rcu(head, ptr, type, memb) \
({ \
	list_next_or_null_rcu(head, ptr, type, memb) ?: \
		list_next_or_null_rcu(head, READ_ONCE((ptr)->next), \
				      type, memb); \
})

/**
 * get_next_path_rr() - Returns path in round-robin fashion.
 * @it: the path pointer
 *
 * Related to @MP_POLICY_RR
 *
 * Locks:
 *    rcu_read_lock() must be held.
 */
static struct rtrs_clt_sess *get_next_path_rr(struct path_it *it)
{
	struct rtrs_clt_sess __rcu **ppcpu_path;
	struct rtrs_clt_sess *path;
	struct rtrs_clt *clt;

	clt = it->clt;

	/*
	 * Here we use two RCU objects: @paths_list and @pcpu_path
	 * pointer. See rtrs_clt_remove_path_from_arr() for details
	 * how that is handled.
	 */

	ppcpu_path = this_cpu_ptr(clt->pcpu_path);
	path = rcu_dereference(*ppcpu_path);
	if (unlikely(!path))
		path = list_first_or_null_rcu(&clt->paths_list,
					      typeof(*path), s.entry);
	else
		path = list_next_or_null_rr_rcu(&clt->paths_list,
						&path->s.entry,
						typeof(*path),
						s.entry);
	rcu_assign_pointer(*ppcpu_path, path);

	return path;
}

/**
 * get_next_path_min_inflight() - Returns path with minimal inflight count.
 * @it: the path pointer
 *
 * Related to @MP_POLICY_MIN_INFLIGHT
 *
 * Locks:
 *    rcu_read_lock() must be held.
 */
static struct rtrs_clt_sess *get_next_path_min_inflight(struct path_it *it)
{
	struct rtrs_clt_sess *min_path = NULL;
	struct rtrs_clt *clt = it->clt;
	struct rtrs_clt_sess *sess;
	int min_inflight = INT_MAX;
	int inflight;

	list_for_each_entry_rcu(sess, &clt->paths_list, s.entry) {
		if (unlikely(!list_empty(raw_cpu_ptr(sess->mp_skip_entry))))
			continue;

		inflight = atomic_read(&sess->stats->inflight);

		if (inflight < min_inflight) {
			min_inflight = inflight;
			min_path = sess;
		}
	}

	/*
	 * add the path to the skip list, so that next time we can get
	 * a different one
	 */
	if (min_path)
		list_add(raw_cpu_ptr(min_path->mp_skip_entry), &it->skip_list);

	return min_path;
}

static inline void path_it_init(struct path_it *it, struct rtrs_clt *clt)
{
	INIT_LIST_HEAD(&it->skip_list);
	it->clt = clt;
	it->i = 0;

	if (clt->mp_policy == MP_POLICY_RR)
		it->next_path = get_next_path_rr;
	else
		it->next_path = get_next_path_min_inflight;
}

static inline void path_it_deinit(struct path_it *it)
{
	struct list_head *skip, *tmp;
	/*
	 * The skip_list is used only for the MIN_INFLIGHT policy.
	 * We need to remove paths from it, so that next IO can insert
	 * paths (->mp_skip_entry) into a skip_list again.
	 */
	list_for_each_safe(skip, tmp, &it->skip_list)
		list_del_init(skip);
}

/**
 * rtrs_clt_init_req() - Initialize an rtrs_clt_io_req holding information
 * about an inflight IO.
 * The user buffer holding user control message (not data) is copied into
 * the corresponding buffer of rtrs_iu (req->iu->buf), which later on will
 * also hold the control message of rtrs.
 * @req: an io request holding information about IO.
 * @sess: client session
 * @conf: confirmation callback function to notify upper layer.
 * @permit: permit for allocation of RDMA remote buffer
 * @priv: private pointer
 * @vec: kernel vector containing control message
 * @usr_len: length of the user message
 * @sg: scatter list for IO data
 * @sg_cnt: number of scatter list entries
 * @data_len: length of the IO data
 * @dir: direction of the IO.
 */
static void rtrs_clt_init_req(struct rtrs_clt_io_req *req,
			      struct rtrs_clt_sess *sess,
			      void (*conf)(void *priv, int errno),
			      struct rtrs_permit *permit, void *priv,
			      const struct kvec *vec, size_t usr_len,
			      struct scatterlist *sg, size_t sg_cnt,
			      size_t data_len, int dir)
{
	struct iov_iter iter;
	size_t len;

	req->permit = permit;
	req->in_use = true;
	req->usr_len = usr_len;
	req->data_len = data_len;
	req->sglist = sg;
	req->sg_cnt = sg_cnt;
	req->priv = priv;
	req->dir = dir;
	req->con = rtrs_permit_to_clt_con(sess, permit);
	req->conf = conf;
	req->need_inv = false;
	req->need_inv_comp = false;
	req->inv_errno = 0;

	iov_iter_kvec(&iter, READ, vec, 1, usr_len);
	len = _copy_from_iter(req->iu->buf, usr_len, &iter);
	WARN_ON(len != usr_len);

	reinit_completion(&req->inv_comp);
}

static struct rtrs_clt_io_req *
rtrs_clt_get_req(struct rtrs_clt_sess *sess,
		 void (*conf)(void *priv, int errno),
		 struct rtrs_permit *permit, void *priv,
		 const struct kvec *vec, size_t usr_len,
		 struct scatterlist *sg, size_t sg_cnt,
		 size_t data_len, int dir)
{
	struct rtrs_clt_io_req *req;

	req = &sess->reqs[permit->mem_id];
	rtrs_clt_init_req(req, sess, conf, permit, priv, vec, usr_len,
			  sg, sg_cnt, data_len, dir);
	return req;
}

static struct rtrs_clt_io_req *
rtrs_clt_get_copy_req(struct rtrs_clt_sess *alive_sess,
		      struct rtrs_clt_io_req *fail_req)
{
	struct rtrs_clt_io_req *req;
	struct kvec vec = {
		.iov_base = fail_req->iu->buf,
		.iov_len = fail_req->usr_len
	};

	req = &alive_sess->reqs[fail_req->permit->mem_id];
	rtrs_clt_init_req(req, alive_sess, fail_req->conf, fail_req->permit,
			  fail_req->priv, &vec, fail_req->usr_len,
			  fail_req->sglist, fail_req->sg_cnt,
			  fail_req->data_len, fail_req->dir);
	return req;
}

static int rtrs_post_rdma_write_sg(struct rtrs_clt_con *con,
				   struct rtrs_clt_io_req *req,
				   struct rtrs_rbuf *rbuf,
				   u32 size, u32 imm)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	struct ib_sge *sge = req->sge;
	enum ib_send_flags flags;
	struct scatterlist *sg;
	size_t num_sge;
	int i;

	for_each_sg(req->sglist, sg, req->sg_cnt, i) {
		sge[i].addr = sg_dma_address(sg);
		sge[i].length = sg_dma_len(sg);
		sge[i].lkey = sess->s.dev->ib_pd->local_dma_lkey;
	}
	sge[i].addr = req->iu->dma_addr;
	sge[i].length = size;
	sge[i].lkey = sess->s.dev->ib_pd->local_dma_lkey;

	num_sge = 1 + req->sg_cnt;

	/*
	 * From time to time we have to post signalled sends,
	 * or send queue will fill up and only QP reset can help.
	 */
	flags = atomic_inc_return(&con->io_cnt) % sess->queue_depth ?
			0 : IB_SEND_SIGNALED;

	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr,
				      size, DMA_TO_DEVICE);

	return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, sge, num_sge,
					   rbuf->rkey, rbuf->addr, imm,
					   flags, NULL);
}

static int rtrs_clt_write_req(struct rtrs_clt_io_req *req)
{
	struct rtrs_clt_con *con = req->con;
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_clt_sess *sess = to_clt_sess(s);
	struct rtrs_msg_rdma_write *msg;

	struct rtrs_rbuf *rbuf;
	int ret, count = 0;
	u32 imm, buf_id;

	const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len;

	if (unlikely(tsize > sess->chunk_size)) {
		rtrs_wrn(s, "Write request failed, size too big %zu > %d\n",
			 tsize, sess->chunk_size);
		return -EMSGSIZE;
	}
	if (req->sg_cnt) {
		count = ib_dma_map_sg(sess->s.dev->ib_dev, req->sglist,
				      req->sg_cnt, req->dir);
		if (unlikely(!count)) {
			rtrs_wrn(s, "Write request failed, map failed\n");
			return -EINVAL;
		}
	}
	/* put rtrs msg after sg and user message */
	msg = req->iu->buf + req->usr_len;
	msg->type = cpu_to_le16(RTRS_MSG_WRITE);
	msg->usr_len = cpu_to_le16(req->usr_len);

	/* rtrs message on server side will be after user data and message */
	imm = req->permit->mem_off + req->data_len + req->usr_len;
	imm = rtrs_to_io_req_imm(imm);
	buf_id = req->permit->mem_id;
	req->sg_size = tsize;
	rbuf = &sess->rbufs[buf_id];

	/*
	 * Update stats now; after the request is successfully sent it is
	 * no longer safe to touch it.
	 */
	rtrs_clt_update_all_stats(req, WRITE);

	ret = rtrs_post_rdma_write_sg(req->con, req, rbuf,
				      req->usr_len + sizeof(*msg),
				      imm);
	if (unlikely(ret)) {
		rtrs_err(s, "Write request failed: %d\n", ret);
		if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
			atomic_dec(&sess->stats->inflight);
		if (req->sg_cnt)
			ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
					req->sg_cnt, req->dir);
	}

	return ret;
}

static int rtrs_map_sg_fr(struct rtrs_clt_io_req *req, size_t count)
{
	int nr;

	/* Align the MR to a 4K page size to match the block virt boundary */
	nr = ib_map_mr_sg(req->mr, req->sglist, count, NULL, SZ_4K);
	if (nr < 0)
		return nr;
	if (unlikely(nr < req->sg_cnt))
		return -EINVAL;
	ib_update_fast_reg_key(req->mr, ib_inc_rkey(req->mr->rkey));

	return nr;
}

static int rtrs_clt_read_req(struct rtrs_clt_io_req *req)
{
	struct rtrs_clt_con *con = req->con;
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_clt_sess *sess = to_clt_sess(s);
	struct rtrs_msg_rdma_read *msg;
	struct rtrs_ib_dev *dev;

	struct ib_reg_wr rwr;
	struct ib_send_wr *wr = NULL;

	int ret, count = 0;
	u32 imm, buf_id;

	const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len;

	s = &sess->s;
	dev = sess->s.dev;

	if (unlikely(tsize > sess->chunk_size)) {
		rtrs_wrn(s,
			 "Read request failed, message size is %zu, bigger than CHUNK_SIZE %d\n",
			 tsize, sess->chunk_size);
		return -EMSGSIZE;
	}

	if (req->sg_cnt) {
		count = ib_dma_map_sg(dev->ib_dev, req->sglist, req->sg_cnt,
				      req->dir);
		if (unlikely(!count)) {
			rtrs_wrn(s,
				 "Read request failed, dma map failed\n");
			return -EINVAL;
		}
	}
	/* put our message into req->buf after user message */
	msg = req->iu->buf + req->usr_len;
	msg->type = cpu_to_le16(RTRS_MSG_READ);
	msg->usr_len = cpu_to_le16(req->usr_len);

	if (count) {
		ret = rtrs_map_sg_fr(req, count);
		if (ret < 0) {
			rtrs_err_rl(s,
				    "Read request failed, failed to map fast reg. data, err: %d\n",
				    ret);
			ib_dma_unmap_sg(dev->ib_dev, req->sglist, req->sg_cnt,
					req->dir);
			return ret;
		}
		rwr = (struct ib_reg_wr) {
			.wr.opcode = IB_WR_REG_MR,
			.wr.wr_cqe = &fast_reg_cqe,
			.mr = req->mr,
			.key = req->mr->rkey,
			.access = (IB_ACCESS_LOCAL_WRITE |
				   IB_ACCESS_REMOTE_WRITE),
		};
		wr = &rwr.wr;

		msg->sg_cnt = cpu_to_le16(1);
		msg->flags = cpu_to_le16(RTRS_MSG_NEED_INVAL_F);

		msg->desc[0].addr = cpu_to_le64(req->mr->iova);
		msg->desc[0].key = cpu_to_le32(req->mr->rkey);
		msg->desc[0].len = cpu_to_le32(req->mr->length);

		/* Further invalidation is required */
		req->need_inv = !!RTRS_MSG_NEED_INVAL_F;

	} else {
		msg->sg_cnt = 0;
		msg->flags = 0;
	}
	/*
	 * rtrs message will be after the space reserved for disk data and
	 * user message
	 */
	imm = req->permit->mem_off + req->data_len + req->usr_len;
	imm = rtrs_to_io_req_imm(imm);
	buf_id = req->permit->mem_id;

	req->sg_size = sizeof(*msg);
	req->sg_size += le16_to_cpu(msg->sg_cnt) * sizeof(struct rtrs_sg_desc);
	req->sg_size += req->usr_len;

	/*
	 * Update stats now; after the request is successfully sent it is
	 * no longer safe to touch it.
	 */
	rtrs_clt_update_all_stats(req, READ);

	ret = rtrs_post_send_rdma(req->con, req, &sess->rbufs[buf_id],
				  req->data_len, imm, wr);
	if (unlikely(ret)) {
		rtrs_err(s, "Read request failed: %d\n", ret);
		if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
			atomic_dec(&sess->stats->inflight);
		req->need_inv = false;
		if (req->sg_cnt)
			ib_dma_unmap_sg(dev->ib_dev, req->sglist,
					req->sg_cnt, req->dir);
	}

	return ret;
}

/**
 * rtrs_clt_failover_req() - Try to find an active path for a failed request
 * @clt: clt context
 * @fail_req: a failed io request.
 */
static int rtrs_clt_failover_req(struct rtrs_clt *clt,
				 struct rtrs_clt_io_req *fail_req)
{
	struct rtrs_clt_sess *alive_sess;
	struct rtrs_clt_io_req *req;
	int err = -ECONNABORTED;
	struct path_it it;

	rcu_read_lock();
	for (path_it_init(&it, clt);
	     (alive_sess = it.next_path(&it)) && it.i < it.clt->paths_num;
	     it.i++) {
		if (unlikely(READ_ONCE(alive_sess->state) !=
			     RTRS_CLT_CONNECTED))
			continue;
		req = rtrs_clt_get_copy_req(alive_sess, fail_req);
		if (req->dir == DMA_TO_DEVICE)
			err = rtrs_clt_write_req(req);
		else
			err = rtrs_clt_read_req(req);
		if (unlikely(err)) {
			req->in_use = false;
			continue;
		}
		/* Success path */
		rtrs_clt_inc_failover_cnt(alive_sess->stats);
		break;
	}
	path_it_deinit(&it);
	rcu_read_unlock();

	return err;
}

static void fail_all_outstanding_reqs(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt *clt = sess->clt;
	struct rtrs_clt_io_req *req;
	int i, err;

	if (!sess->reqs)
		return;
	for (i = 0; i < sess->queue_depth; ++i) {
		req = &sess->reqs[i];
		if (!req->in_use)
			continue;

		/*
		 * Safely (without notification) complete failed request.
		 * After completion this request is still usable and can
		 * be failed over to another path.
		 */
		complete_rdma_req(req, -ECONNABORTED, false, true);

		err = rtrs_clt_failover_req(clt, req);
		if (unlikely(err))
			/* Failover failed, notify anyway */
			req->conf(req->priv, err);
	}
}

static void free_sess_reqs(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt_io_req *req;
	int i;

	if (!sess->reqs)
		return;
	for (i = 0; i < sess->queue_depth; ++i) {
		req = &sess->reqs[i];
		if (req->mr)
			ib_dereg_mr(req->mr);
		kfree(req->sge);
		rtrs_iu_free(req->iu, sess->s.dev->ib_dev, 1);
	}
	kfree(sess->reqs);
	sess->reqs = NULL;
}

static int alloc_sess_reqs(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt_io_req *req;
	struct rtrs_clt *clt = sess->clt;
	int i, err = -ENOMEM;

	sess->reqs = kcalloc(sess->queue_depth, sizeof(*sess->reqs),
			     GFP_KERNEL);
	if (!sess->reqs)
		return -ENOMEM;

	for (i = 0; i < sess->queue_depth; ++i) {
		req = &sess->reqs[i];
		req->iu = rtrs_iu_alloc(1, sess->max_hdr_size, GFP_KERNEL,
					sess->s.dev->ib_dev,
					DMA_TO_DEVICE,
					rtrs_clt_rdma_done);
		if (!req->iu)
			goto out;

		req->sge = kmalloc_array(clt->max_segments + 1,
					 sizeof(*req->sge), GFP_KERNEL);
		if (!req->sge)
			goto out;

		req->mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG,
				      sess->max_pages_per_mr);
		if (IS_ERR(req->mr)) {
			err = PTR_ERR(req->mr);
			req->mr = NULL;
			pr_err("Failed to alloc sess->max_pages_per_mr %d\n",
			       sess->max_pages_per_mr);
			goto out;
		}

		init_completion(&req->inv_comp);
	}

	return 0;

out:
	free_sess_reqs(sess);

	return err;
}

static int alloc_permits(struct rtrs_clt *clt)
{
	unsigned int chunk_bits;
	int err, i;

	clt->permits_map = kcalloc(BITS_TO_LONGS(clt->queue_depth),
				   sizeof(long), GFP_KERNEL);
	if (!clt->permits_map) {
		err = -ENOMEM;
		goto out_err;
	}
	clt->permits = kcalloc(clt->queue_depth, permit_size(clt), GFP_KERNEL);
	if (!clt->permits) {
		err = -ENOMEM;
		goto err_map;
	}
	chunk_bits = ilog2(clt->queue_depth - 1) + 1;
	for (i = 0; i < clt->queue_depth; i++) {
		struct rtrs_permit *permit;

		permit = get_permit(clt, i);
		permit->mem_id = i;
		permit->mem_off = i << (MAX_IMM_PAYL_BITS - chunk_bits);
	}

	return 0;

err_map:
	kfree(clt->permits_map);
	clt->permits_map = NULL;
out_err:
	return err;
}

static void free_permits(struct rtrs_clt *clt)
{
	kfree(clt->permits_map);
	clt->permits_map = NULL;
	kfree(clt->permits);
	clt->permits = NULL;
}

static void query_fast_reg_mode(struct rtrs_clt_sess *sess)
{
	struct ib_device *ib_dev;
	u64 max_pages_per_mr;
	int mr_page_shift;

	ib_dev = sess->s.dev->ib_dev;

	/*
	 * Use the smallest page size supported by the HCA, down to a
	 * minimum of 4096 bytes. We're unlikely to build large sglists
	 * out of smaller entries.
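	 *
	 * For example, a device reporting max_mr_size of 2 GiB with 4 KiB
	 * MR pages yields up to 524288 pages per MR, which is then clamped
	 * by max_fast_reg_page_list_len below.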
	 */
	mr_page_shift = max(12, ffs(ib_dev->attrs.page_size_cap) - 1);
	max_pages_per_mr = ib_dev->attrs.max_mr_size;
	do_div(max_pages_per_mr, (1ull << mr_page_shift));
	sess->max_pages_per_mr =
		min3(sess->max_pages_per_mr, (u32)max_pages_per_mr,
		     ib_dev->attrs.max_fast_reg_page_list_len);
	sess->max_send_sge = ib_dev->attrs.max_send_sge;
}

static bool rtrs_clt_change_state_get_old(struct rtrs_clt_sess *sess,
					  enum rtrs_clt_state new_state,
					  enum rtrs_clt_state *old_state)
{
	bool changed;

	spin_lock_irq(&sess->state_wq.lock);
	*old_state = sess->state;
	changed = __rtrs_clt_change_state(sess, new_state);
	spin_unlock_irq(&sess->state_wq.lock);

	return changed;
}

static bool rtrs_clt_change_state(struct rtrs_clt_sess *sess,
				  enum rtrs_clt_state new_state)
{
	enum rtrs_clt_state old_state;

	return rtrs_clt_change_state_get_old(sess, new_state, &old_state);
}

static void rtrs_clt_hb_err_handler(struct rtrs_con *c)
{
	struct rtrs_clt_con *con = container_of(c, typeof(*con), c);

	rtrs_rdma_error_recovery(con);
}

static void rtrs_clt_init_hb(struct rtrs_clt_sess *sess)
{
	rtrs_init_hb(&sess->s, &io_comp_cqe,
		     RTRS_HB_INTERVAL_MS,
		     RTRS_HB_MISSED_MAX,
		     rtrs_clt_hb_err_handler,
		     rtrs_wq);
}

static void rtrs_clt_start_hb(struct rtrs_clt_sess *sess)
{
	rtrs_start_hb(&sess->s);
}

static void rtrs_clt_stop_hb(struct rtrs_clt_sess *sess)
{
	rtrs_stop_hb(&sess->s);
}

static void rtrs_clt_reconnect_work(struct work_struct *work);
static void rtrs_clt_close_work(struct work_struct *work);

static struct rtrs_clt_sess *alloc_sess(struct rtrs_clt *clt,
					const struct rtrs_addr *path,
					size_t con_num, u16 max_segments,
					size_t max_segment_size)
{
	struct rtrs_clt_sess *sess;
	int err = -ENOMEM;
	int cpu;

	sess = kzalloc(sizeof(*sess), GFP_KERNEL);
	if (!sess)
		goto err;

	/* Extra connection for user messages */
	con_num += 1;

	sess->s.con = kcalloc(con_num, sizeof(*sess->s.con), GFP_KERNEL);
	if (!sess->s.con)
		goto err_free_sess;

	sess->stats = kzalloc(sizeof(*sess->stats), GFP_KERNEL);
	if (!sess->stats)
		goto err_free_con;

	mutex_init(&sess->init_mutex);
	uuid_gen(&sess->s.uuid);
	memcpy(&sess->s.dst_addr, path->dst,
	       rdma_addr_size((struct sockaddr *)path->dst));

	/*
	 * rdma_resolve_addr() passes src_addr to cma_bind_addr, which
	 * checks the sa_family to be non-zero. If user passed src_addr=NULL
	 * the sess->src_addr will contain only zeros, which is then fine.
	 */
	if (path->src)
		memcpy(&sess->s.src_addr, path->src,
		       rdma_addr_size((struct sockaddr *)path->src));
	strlcpy(sess->s.sessname, clt->sessname, sizeof(sess->s.sessname));
	sess->s.con_num = con_num;
	sess->clt = clt;
	sess->max_pages_per_mr = max_segments * max_segment_size >> 12;
	init_waitqueue_head(&sess->state_wq);
	sess->state = RTRS_CLT_CONNECTING;
	atomic_set(&sess->connected_cnt, 0);
	INIT_WORK(&sess->close_work, rtrs_clt_close_work);
	INIT_DELAYED_WORK(&sess->reconnect_dwork, rtrs_clt_reconnect_work);
	rtrs_clt_init_hb(sess);

	sess->mp_skip_entry = alloc_percpu(typeof(*sess->mp_skip_entry));
	if (!sess->mp_skip_entry)
		goto err_free_stats;

	for_each_possible_cpu(cpu)
		INIT_LIST_HEAD(per_cpu_ptr(sess->mp_skip_entry, cpu));

	err = rtrs_clt_init_stats(sess->stats);
	if (err)
		goto err_free_percpu;

	return sess;

err_free_percpu:
	free_percpu(sess->mp_skip_entry);
err_free_stats:
	kfree(sess->stats);
err_free_con:
	kfree(sess->s.con);
err_free_sess:
	kfree(sess);
err:
	return ERR_PTR(err);
}

void free_sess(struct rtrs_clt_sess *sess)
{
	free_percpu(sess->mp_skip_entry);
	mutex_destroy(&sess->init_mutex);
	kfree(sess->s.con);
	kfree(sess->rbufs);
	kfree(sess);
}

static int create_con(struct rtrs_clt_sess *sess, unsigned int cid)
{
	struct rtrs_clt_con *con;

	con = kzalloc(sizeof(*con), GFP_KERNEL);
	if (!con)
		return -ENOMEM;

	/* Map first two connections to the first CPU */
	con->cpu = (cid ? cid - 1 : 0) % nr_cpu_ids;
	con->c.cid = cid;
	con->c.sess = &sess->s;
	atomic_set(&con->io_cnt, 0);
	mutex_init(&con->con_mutex);

	sess->s.con[cid] = &con->c;

	return 0;
}

static void destroy_con(struct rtrs_clt_con *con)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	sess->s.con[con->c.cid] = NULL;
	mutex_destroy(&con->con_mutex);
	kfree(con);
}

static int create_con_cq_qp(struct rtrs_clt_con *con)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	u16 wr_queue_size;
	int err, cq_vector;
	struct rtrs_msg_rkey_rsp *rsp;

	lockdep_assert_held(&con->con_mutex);
	if (con->c.cid == 0) {
		/*
		 * One completion for each receive and two for each send
		 * (send request + registration)
		 * + 2 for drain and heartbeat
		 * in case qp gets into error state
		 */
		wr_queue_size = SERVICE_CON_QUEUE_DEPTH * 3 + 2;
		/* We must be the first here */
		if (WARN_ON(sess->s.dev))
			return -EINVAL;

		/*
		 * The whole session uses device from user connection.
		 * Be careful not to close user connection before ib dev
		 * is gracefully put.
		 */
		sess->s.dev = rtrs_ib_dev_find_or_add(con->c.cm_id->device,
						      &dev_pd);
		if (!sess->s.dev) {
			rtrs_wrn(sess->clt,
				 "rtrs_ib_dev_find_get_or_add(): no memory\n");
			return -ENOMEM;
		}
		sess->s.dev_ref = 1;
		query_fast_reg_mode(sess);
	} else {
		/*
		 * Here we assume that session members are correctly set.
		 * This is always true if user connection (cid == 0) is
		 * established first.
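		 *
		 * e.g. sess->s.dev and sess->queue_depth checked below are
		 * both populated while the cid == 0 connection is set up.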
		 */
		if (WARN_ON(!sess->s.dev))
			return -EINVAL;
		if (WARN_ON(!sess->queue_depth))
			return -EINVAL;

		/* Shared between connections */
		sess->s.dev_ref++;
		wr_queue_size =
			min_t(int, sess->s.dev->ib_dev->attrs.max_qp_wr,
			      /* QD * (REQ + RSP + FR REGS or INVS) + drain */
			      sess->queue_depth * 3 + 1);
	}
	/* alloc iu to recv new rkey reply when server reports flags set */
	if (sess->flags == RTRS_MSG_NEW_RKEY_F || con->c.cid == 0) {
		con->rsp_ius = rtrs_iu_alloc(wr_queue_size, sizeof(*rsp),
					     GFP_KERNEL, sess->s.dev->ib_dev,
					     DMA_FROM_DEVICE,
					     rtrs_clt_rdma_done);
		if (!con->rsp_ius)
			return -ENOMEM;
		con->queue_size = wr_queue_size;
	}
	cq_vector = con->cpu % sess->s.dev->ib_dev->num_comp_vectors;
	err = rtrs_cq_qp_create(&sess->s, &con->c, sess->max_send_sge,
				cq_vector, wr_queue_size, wr_queue_size,
				IB_POLL_SOFTIRQ);
	/*
	 * In case of error we do not bother to clean previous allocations,
	 * since destroy_con_cq_qp() must be called.
	 */
	return err;
}

static void destroy_con_cq_qp(struct rtrs_clt_con *con)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	/*
	 * Be careful here: destroy_con_cq_qp() can be called even
	 * create_con_cq_qp() failed, see comments there.
	 */
	lockdep_assert_held(&con->con_mutex);
	rtrs_cq_qp_destroy(&con->c);
	if (con->rsp_ius) {
		rtrs_iu_free(con->rsp_ius, sess->s.dev->ib_dev, con->queue_size);
		con->rsp_ius = NULL;
		con->queue_size = 0;
	}
	if (sess->s.dev_ref && !--sess->s.dev_ref) {
		rtrs_ib_dev_put(sess->s.dev);
		sess->s.dev = NULL;
	}
}

static void stop_cm(struct rtrs_clt_con *con)
{
	rdma_disconnect(con->c.cm_id);
	if (con->c.qp)
		ib_drain_qp(con->c.qp);
}

static void destroy_cm(struct rtrs_clt_con *con)
{
	rdma_destroy_id(con->c.cm_id);
	con->c.cm_id = NULL;
}

static int rtrs_rdma_addr_resolved(struct rtrs_clt_con *con)
{
	struct rtrs_sess *s = con->c.sess;
	int err;

	mutex_lock(&con->con_mutex);
	err = create_con_cq_qp(con);
	mutex_unlock(&con->con_mutex);
	if (err) {
		rtrs_err(s, "create_con_cq_qp(), err: %d\n", err);
		return err;
	}
	err = rdma_resolve_route(con->c.cm_id, RTRS_CONNECT_TIMEOUT_MS);
	if (err)
		rtrs_err(s, "Resolving route failed, err: %d\n", err);

	return err;
}

static int rtrs_rdma_route_resolved(struct rtrs_clt_con *con)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	struct rtrs_clt *clt = sess->clt;
	struct rtrs_msg_conn_req msg;
	struct rdma_conn_param param;

	int err;

	param = (struct rdma_conn_param) {
		.retry_count = 7,
		.rnr_retry_count = 7,
		.private_data = &msg,
		.private_data_len = sizeof(msg),
	};

	msg = (struct rtrs_msg_conn_req) {
		.magic = cpu_to_le16(RTRS_MAGIC),
		.version = cpu_to_le16(RTRS_PROTO_VER),
		.cid = cpu_to_le16(con->c.cid),
		.cid_num = cpu_to_le16(sess->s.con_num),
		.recon_cnt = cpu_to_le16(sess->s.recon_cnt),
	};
	uuid_copy(&msg.sess_uuid, &sess->s.uuid);
	uuid_copy(&msg.paths_uuid, &clt->paths_uuid);

	err = rdma_connect_locked(con->c.cm_id, &param);
	if (err)
		rtrs_err(clt, "rdma_connect_locked(): %d\n", err);

	return err;
}

static int rtrs_rdma_conn_established(struct rtrs_clt_con *con,
				      struct rdma_cm_event *ev)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	struct rtrs_clt *clt = sess->clt;
	const struct rtrs_msg_conn_rsp *msg;
	u16 version, queue_depth;
	int errno;
	u8 len;

	msg = ev->param.conn.private_data;
	len = ev->param.conn.private_data_len;
	if (len < sizeof(*msg)) {
		rtrs_err(clt, "Invalid RTRS connection response\n");
		return -ECONNRESET;
	}
	if (le16_to_cpu(msg->magic) != RTRS_MAGIC) {
		rtrs_err(clt, "Invalid RTRS magic\n");
		return -ECONNRESET;
	}
	version = le16_to_cpu(msg->version);
	if (version >> 8 != RTRS_PROTO_VER_MAJOR) {
		rtrs_err(clt, "Unsupported major RTRS version: %d, expected %d\n",
			 version >> 8, RTRS_PROTO_VER_MAJOR);
		return -ECONNRESET;
	}
	errno = le16_to_cpu(msg->errno);
	if (errno) {
		rtrs_err(clt, "Invalid RTRS message: errno %d\n",
			 errno);
		return -ECONNRESET;
	}
	if (con->c.cid == 0) {
		queue_depth = le16_to_cpu(msg->queue_depth);

		if (queue_depth > MAX_SESS_QUEUE_DEPTH) {
			rtrs_err(clt, "Invalid RTRS message: queue=%d\n",
				 queue_depth);
			return -ECONNRESET;
		}
		if (!sess->rbufs || sess->queue_depth < queue_depth) {
			kfree(sess->rbufs);
			sess->rbufs = kcalloc(queue_depth, sizeof(*sess->rbufs),
					      GFP_KERNEL);
			if (!sess->rbufs)
				return -ENOMEM;
		}
		sess->queue_depth = queue_depth;
		sess->max_hdr_size = le32_to_cpu(msg->max_hdr_size);
		sess->max_io_size = le32_to_cpu(msg->max_io_size);
		sess->flags = le32_to_cpu(msg->flags);
		sess->chunk_size = sess->max_io_size + sess->max_hdr_size;

		/*
		 * Global queue depth and IO size is always a minimum.
		 * If during a reconnection the server sends us a value a bit
		 * higher, the client does not care and uses the cached minimum.
		 *
		 * Since we can have several sessions (paths) re-establishing
		 * connections in parallel, use lock.
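		 *
		 * For example, if one path negotiated queue_depth 512 and
		 * another reports 256, min_not_zero() below keeps 256 for
		 * the whole clt.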
		 */
		mutex_lock(&clt->paths_mutex);
		clt->queue_depth = min_not_zero(sess->queue_depth,
						clt->queue_depth);
		clt->max_io_size = min_not_zero(sess->max_io_size,
						clt->max_io_size);
		mutex_unlock(&clt->paths_mutex);

		/*
		 * Cache the hca_port and hca_name for sysfs
		 */
		sess->hca_port = con->c.cm_id->port_num;
		scnprintf(sess->hca_name, sizeof(sess->hca_name),
			  sess->s.dev->ib_dev->name);
		sess->s.src_addr = con->c.cm_id->route.addr.src_addr;
	}

	return 0;
}

static inline void flag_success_on_conn(struct rtrs_clt_con *con)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	atomic_inc(&sess->connected_cnt);
	con->cm_err = 1;
}

static int rtrs_rdma_conn_rejected(struct rtrs_clt_con *con,
				   struct rdma_cm_event *ev)
{
	struct rtrs_sess *s = con->c.sess;
	const struct rtrs_msg_conn_rsp *msg;
	const char *rej_msg;
	int status, errno;
	u8 data_len;

	status = ev->status;
	rej_msg = rdma_reject_msg(con->c.cm_id, status);
	msg = rdma_consumer_reject_data(con->c.cm_id, ev, &data_len);

	if (msg && data_len >= sizeof(*msg)) {
		errno = (int16_t)le16_to_cpu(msg->errno);
		if (errno == -EBUSY)
			rtrs_err(s,
				 "Previous session still exists on the server, please reconnect later\n");
		else
			rtrs_err(s,
				 "Connect rejected: status %d (%s), rtrs errno %d\n",
				 status, rej_msg, errno);
	} else {
		rtrs_err(s,
			 "Connect rejected but with malformed message: status %d (%s)\n",
			 status, rej_msg);
	}

	return -ECONNRESET;
}

static void rtrs_clt_close_conns(struct rtrs_clt_sess *sess, bool wait)
{
	if (rtrs_clt_change_state(sess, RTRS_CLT_CLOSING))
		queue_work(rtrs_wq, &sess->close_work);
	if (wait)
		flush_work(&sess->close_work);
}

static inline void flag_error_on_conn(struct rtrs_clt_con *con, int cm_err)
{
	if (con->cm_err == 1) {
		struct rtrs_clt_sess *sess;

		sess = to_clt_sess(con->c.sess);
		if (atomic_dec_and_test(&sess->connected_cnt))
			wake_up(&sess->state_wq);
	}
	con->cm_err = cm_err;
}

static int rtrs_clt_rdma_cm_handler(struct rdma_cm_id *cm_id,
				    struct rdma_cm_event *ev)
{
	struct rtrs_clt_con *con = cm_id->context;
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_clt_sess *sess = to_clt_sess(s);
	int cm_err = 0;

	switch (ev->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
		cm_err = rtrs_rdma_addr_resolved(con);
		break;
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		cm_err = rtrs_rdma_route_resolved(con);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		cm_err = rtrs_rdma_conn_established(con, ev);
		if (likely(!cm_err)) {
			/*
			 * Report success and wake up. Here we abuse state_wq,
			 * i.e. wake up without state change, but we set cm_err.
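			 * (cm_err == 1 is the success marker set by
			 * flag_success_on_conn(); create_cm() treats only
			 * negative values as errors.)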
			 */
			flag_success_on_conn(con);
			wake_up(&sess->state_wq);
			return 0;
		}
		break;
	case RDMA_CM_EVENT_REJECTED:
		cm_err = rtrs_rdma_conn_rejected(con, ev);
		break;
	case RDMA_CM_EVENT_DISCONNECTED:
		/* No message for disconnecting */
		cm_err = -ECONNRESET;
		break;
	case RDMA_CM_EVENT_CONNECT_ERROR:
	case RDMA_CM_EVENT_UNREACHABLE:
	case RDMA_CM_EVENT_ADDR_CHANGE:
	case RDMA_CM_EVENT_TIMEWAIT_EXIT:
		rtrs_wrn(s, "CM error event %d\n", ev->event);
		cm_err = -ECONNRESET;
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
	case RDMA_CM_EVENT_ROUTE_ERROR:
		rtrs_wrn(s, "CM error event %d\n", ev->event);
		cm_err = -EHOSTUNREACH;
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		/*
		 * Device removal is a special case. Queue close and return 0.
		 */
		rtrs_clt_close_conns(sess, false);
		return 0;
	default:
		rtrs_err(s, "Unexpected RDMA CM event (%d)\n", ev->event);
		cm_err = -ECONNRESET;
		break;
	}

	if (cm_err) {
		/*
		 * cm error makes sense only on connection establishing,
		 * in other cases we rely on normal procedure of reconnecting.
		 */
		flag_error_on_conn(con, cm_err);
		rtrs_rdma_error_recovery(con);
	}

	return 0;
}

static int create_cm(struct rtrs_clt_con *con)
{
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_clt_sess *sess = to_clt_sess(s);
	struct rdma_cm_id *cm_id;
	int err;

	cm_id = rdma_create_id(&init_net, rtrs_clt_rdma_cm_handler, con,
			       sess->s.dst_addr.ss_family == AF_IB ?
			       RDMA_PS_IB : RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(cm_id)) {
		err = PTR_ERR(cm_id);
		rtrs_err(s, "Failed to create CM ID, err: %d\n", err);

		return err;
	}
	con->c.cm_id = cm_id;
	con->cm_err = 0;
	/* allow the port to be reused */
	err = rdma_set_reuseaddr(cm_id, 1);
	if (err != 0) {
		rtrs_err(s, "Set address reuse failed, err: %d\n", err);
		goto destroy_cm;
	}
	err = rdma_resolve_addr(cm_id, (struct sockaddr *)&sess->s.src_addr,
				(struct sockaddr *)&sess->s.dst_addr,
				RTRS_CONNECT_TIMEOUT_MS);
	if (err) {
		rtrs_err(s, "Failed to resolve address, err: %d\n", err);
		goto destroy_cm;
	}
	/*
	 * Combine connection status and session events. This is needed
	 * to wait for two possible cases: cm_err has something meaningful
	 * or session state was really changed to error by device removal.
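	 * Both wake up sess->state_wq: the CM handler wakes it directly on
	 * success, and __rtrs_clt_change_state() wakes it on any state change.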
1915 */ 1916 err = wait_event_interruptible_timeout( 1917 sess->state_wq, 1918 con->cm_err || sess->state != RTRS_CLT_CONNECTING, 1919 msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS)); 1920 if (err == 0 || err == -ERESTARTSYS) { 1921 if (err == 0) 1922 err = -ETIMEDOUT; 1923 /* Timedout or interrupted */ 1924 goto errr; 1925 } 1926 if (con->cm_err < 0) { 1927 err = con->cm_err; 1928 goto errr; 1929 } 1930 if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTING) { 1931 /* Device removal */ 1932 err = -ECONNABORTED; 1933 goto errr; 1934 } 1935 1936 return 0; 1937 1938 errr: 1939 stop_cm(con); 1940 mutex_lock(&con->con_mutex); 1941 destroy_con_cq_qp(con); 1942 mutex_unlock(&con->con_mutex); 1943 destroy_cm: 1944 destroy_cm(con); 1945 1946 return err; 1947 } 1948 1949 static void rtrs_clt_sess_up(struct rtrs_clt_sess *sess) 1950 { 1951 struct rtrs_clt *clt = sess->clt; 1952 int up; 1953 1954 /* 1955 * We can fire RECONNECTED event only when all paths were 1956 * connected on rtrs_clt_open(), then each was disconnected 1957 * and the first one connected again. That's why this nasty 1958 * game with counter value. 1959 */ 1960 1961 mutex_lock(&clt->paths_ev_mutex); 1962 up = ++clt->paths_up; 1963 /* 1964 * Here it is safe to access paths num directly since up counter 1965 * is greater than MAX_PATHS_NUM only while rtrs_clt_open() is 1966 * in progress, thus paths removals are impossible. 1967 */ 1968 if (up > MAX_PATHS_NUM && up == MAX_PATHS_NUM + clt->paths_num) 1969 clt->paths_up = clt->paths_num; 1970 else if (up == 1) 1971 clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_RECONNECTED); 1972 mutex_unlock(&clt->paths_ev_mutex); 1973 1974 /* Mark session as established */ 1975 sess->established = true; 1976 sess->reconnect_attempts = 0; 1977 sess->stats->reconnects.successful_cnt++; 1978 } 1979 1980 static void rtrs_clt_sess_down(struct rtrs_clt_sess *sess) 1981 { 1982 struct rtrs_clt *clt = sess->clt; 1983 1984 if (!sess->established) 1985 return; 1986 1987 sess->established = false; 1988 mutex_lock(&clt->paths_ev_mutex); 1989 WARN_ON(!clt->paths_up); 1990 if (--clt->paths_up == 0) 1991 clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_DISCONNECTED); 1992 mutex_unlock(&clt->paths_ev_mutex); 1993 } 1994 1995 static void rtrs_clt_stop_and_destroy_conns(struct rtrs_clt_sess *sess) 1996 { 1997 struct rtrs_clt_con *con; 1998 unsigned int cid; 1999 2000 WARN_ON(READ_ONCE(sess->state) == RTRS_CLT_CONNECTED); 2001 2002 /* 2003 * Possible race with rtrs_clt_open(), when DEVICE_REMOVAL comes 2004 * exactly in between. Start destroying after it finishes. 2005 */ 2006 mutex_lock(&sess->init_mutex); 2007 mutex_unlock(&sess->init_mutex); 2008 2009 /* 2010 * All IO paths must observe !CONNECTED state before we 2011 * free everything. 2012 */ 2013 synchronize_rcu(); 2014 2015 rtrs_clt_stop_hb(sess); 2016 2017 /* 2018 * The order it utterly crucial: firstly disconnect and complete all 2019 * rdma requests with error (thus set in_use=false for requests), 2020 * then fail outstanding requests checking in_use for each, and 2021 * eventually notify upper layer about session disconnection. 2022 */ 2023 2024 for (cid = 0; cid < sess->s.con_num; cid++) { 2025 if (!sess->s.con[cid]) 2026 break; 2027 con = to_clt_con(sess->s.con[cid]); 2028 stop_cm(con); 2029 } 2030 fail_all_outstanding_reqs(sess); 2031 free_sess_reqs(sess); 2032 rtrs_clt_sess_down(sess); 2033 2034 /* 2035 * Wait for graceful shutdown, namely when peer side invokes 2036 * rdma_disconnect(). 
static void rtrs_clt_stop_and_destroy_conns(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt_con *con;
	unsigned int cid;

	WARN_ON(READ_ONCE(sess->state) == RTRS_CLT_CONNECTED);

	/*
	 * Possible race with rtrs_clt_open(), when DEVICE_REMOVAL comes
	 * exactly in between. Start destroying after it finishes.
	 */
	mutex_lock(&sess->init_mutex);
	mutex_unlock(&sess->init_mutex);

	/*
	 * All IO paths must observe !CONNECTED state before we
	 * free everything.
	 */
	synchronize_rcu();

	rtrs_clt_stop_hb(sess);

	/*
	 * The order is utterly crucial: firstly disconnect and complete all
	 * rdma requests with error (thus set in_use=false for requests),
	 * then fail outstanding requests checking in_use for each, and
	 * eventually notify upper layer about session disconnection.
	 */

	for (cid = 0; cid < sess->s.con_num; cid++) {
		if (!sess->s.con[cid])
			break;
		con = to_clt_con(sess->s.con[cid]);
		stop_cm(con);
	}
	fail_all_outstanding_reqs(sess);
	free_sess_reqs(sess);
	rtrs_clt_sess_down(sess);

	/*
	 * Wait for graceful shutdown, namely when peer side invokes
	 * rdma_disconnect(). 'connected_cnt' is decremented only on
	 * CM events, thus if the other side has crashed and hb has
	 * detected that something is wrong, we will be stuck here for
	 * exactly the timeout ms, since CM does not fire anything.
	 * That is fine, we are in no hurry.
	 */
	wait_event_timeout(sess->state_wq, !atomic_read(&sess->connected_cnt),
			   msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS));

	for (cid = 0; cid < sess->s.con_num; cid++) {
		if (!sess->s.con[cid])
			break;
		con = to_clt_con(sess->s.con[cid]);
		mutex_lock(&con->con_mutex);
		destroy_con_cq_qp(con);
		mutex_unlock(&con->con_mutex);
		destroy_cm(con);
		destroy_con(con);
	}
}
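
/*
 * xchg_sessions() - atomically replace a cached per-CPU path pointer
 *
 * Swaps *rcu_ppcpu_path from @sess to @next only if it still points to
 * @sess; returns true on success, i.e. when the IO path has not changed
 * the pointer underneath us in the meantime.
 */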
static inline bool xchg_sessions(struct rtrs_clt_sess __rcu **rcu_ppcpu_path,
				 struct rtrs_clt_sess *sess,
				 struct rtrs_clt_sess *next)
{
	struct rtrs_clt_sess **ppcpu_path;

	/* Call cmpxchg() without sparse warnings */
	ppcpu_path = (typeof(ppcpu_path))rcu_ppcpu_path;
	return sess == cmpxchg(ppcpu_path, sess, next);
}

static void rtrs_clt_remove_path_from_arr(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt *clt = sess->clt;
	struct rtrs_clt_sess *next;
	bool wait_for_grace = false;
	int cpu;

	mutex_lock(&clt->paths_mutex);
	list_del_rcu(&sess->s.entry);

	/* Make sure everybody observes path removal. */
	synchronize_rcu();

	/*
	 * At this point nobody sees @sess in the list, but still we have
	 * dangling pointer @pcpu_path which _can_ point to @sess.  Since
	 * nobody can observe @sess in the list, we guarantee that IO path
	 * will not assign @sess to @pcpu_path, i.e. @pcpu_path can be equal
	 * to @sess, but can never again become @sess.
	 */

	/*
	 * Decrement paths number only after grace period, because
	 * caller of do_each_path() must first observe the list without
	 * the path and only then the decremented paths number.
	 *
	 * Otherwise there can be the following situation:
	 *    o Two paths exist and IO is coming.
	 *    o One path is removed:
	 *      CPU#0                          CPU#1
	 *      do_each_path():                rtrs_clt_remove_path_from_arr():
	 *          path = get_next_path()
	 *          ^^^                            list_del_rcu(path)
	 *          [!CONNECTED path]              clt->paths_num--
	 *                                              ^^^^^^^^^
	 *          load clt->paths_num            from 2 to 1
	 *                    ^^^^^^^^^
	 *                    sees 1
	 *
	 *      path is observed as !CONNECTED, but do_each_path() loop
	 *      ends, because expression i < clt->paths_num is false.
	 */
	clt->paths_num--;

	/*
	 * Get @next connection from current @sess which is going to be
	 * removed.  If @sess is the last element, then @next is NULL.
	 */
	rcu_read_lock();
	next = list_next_or_null_rr_rcu(&clt->paths_list, &sess->s.entry,
					typeof(*next), s.entry);
	rcu_read_unlock();

	/*
	 * @pcpu paths can still point to the path which is going to be
	 * removed, so change the pointer manually.
	 */
	for_each_possible_cpu(cpu) {
		struct rtrs_clt_sess __rcu **ppcpu_path;

		ppcpu_path = per_cpu_ptr(clt->pcpu_path, cpu);
		if (rcu_dereference_protected(*ppcpu_path,
			lockdep_is_held(&clt->paths_mutex)) != sess)
			/*
			 * synchronize_rcu() was called just after deleting
			 * entry from the list, thus IO code path cannot
			 * change pointer back to the pointer which is going
			 * to be removed, we are safe here.
			 */
			continue;

		/*
		 * We race with IO code path, which also changes pointer,
		 * thus we have to be careful not to overwrite it.
		 */
		if (xchg_sessions(ppcpu_path, sess, next))
			/*
			 * @ppcpu_path was successfully replaced with @next,
			 * that means that someone could also pick up the
			 * @sess and dereference it right now, so waiting for
			 * a grace period is required.
			 */
			wait_for_grace = true;
	}
	if (wait_for_grace)
		synchronize_rcu();

	mutex_unlock(&clt->paths_mutex);
}

static void rtrs_clt_add_path_to_arr(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt *clt = sess->clt;

	mutex_lock(&clt->paths_mutex);
	clt->paths_num++;

	list_add_tail_rcu(&sess->s.entry, &clt->paths_list);
	mutex_unlock(&clt->paths_mutex);
}
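
/*
 * rtrs_clt_close_work() - async close of a single path
 *
 * Cancels any pending reconnect work, stops and destroys all connections
 * of the path and finally moves the path to the CLOSED state.
 */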
static void rtrs_clt_close_work(struct work_struct *work)
{
	struct rtrs_clt_sess *sess;

	sess = container_of(work, struct rtrs_clt_sess, close_work);

	cancel_delayed_work_sync(&sess->reconnect_dwork);
	rtrs_clt_stop_and_destroy_conns(sess);
	rtrs_clt_change_state(sess, RTRS_CLT_CLOSED);
}

static int init_conns(struct rtrs_clt_sess *sess)
{
	unsigned int cid;
	int err;

	/*
	 * On every new session increase the reconnect counter to avoid
	 * clashes with previous sessions that are not yet closed on the
	 * server side.
	 */
	sess->s.recon_cnt++;

	/* Establish all RDMA connections */
	for (cid = 0; cid < sess->s.con_num; cid++) {
		err = create_con(sess, cid);
		if (err)
			goto destroy;

		err = create_cm(to_clt_con(sess->s.con[cid]));
		if (err) {
			destroy_con(to_clt_con(sess->s.con[cid]));
			goto destroy;
		}
	}
	err = alloc_sess_reqs(sess);
	if (err)
		goto destroy;

	rtrs_clt_start_hb(sess);

	return 0;

destroy:
	while (cid--) {
		struct rtrs_clt_con *con = to_clt_con(sess->s.con[cid]);

		stop_cm(con);

		mutex_lock(&con->con_mutex);
		destroy_con_cq_qp(con);
		mutex_unlock(&con->con_mutex);
		destroy_cm(con);
		destroy_con(con);
	}
	/*
	 * If we've never taken the async path and got an error, say,
	 * doing rdma_resolve_addr(), switch to CONNECTING_ERR state
	 * manually to keep reconnecting.
	 */
	rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING_ERR);

	return err;
}
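
/*
 * rtrs_clt_info_req_done() - completion of the RTRS_MSG_INFO_REQ send
 *
 * Frees the tx IU and, if the send failed, moves the path to the
 * CONNECTING_ERR state so that rtrs_send_sess_info() gives up waiting.
 */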
static void rtrs_clt_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_con *con = cq->cq_context;
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	struct rtrs_iu *iu;

	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
	rtrs_iu_free(iu, sess->s.dev->ib_dev, 1);

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(sess->clt, "Sess info request send failed: %s\n",
			 ib_wc_status_msg(wc->status));
		rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING_ERR);
		return;
	}

	rtrs_clt_update_wc_stats(con);
}

static int process_info_rsp(struct rtrs_clt_sess *sess,
			    const struct rtrs_msg_info_rsp *msg)
{
	unsigned int sg_cnt, total_len;
	int i, sgi;

	sg_cnt = le16_to_cpu(msg->sg_cnt);
	if (unlikely(!sg_cnt || (sess->queue_depth % sg_cnt))) {
		rtrs_err(sess->clt, "Incorrect sg_cnt %d, is not multiple\n",
			 sg_cnt);
		return -EINVAL;
	}

	/*
	 * Check if IB immediate data size is enough to hold the mem_id and
	 * the offset inside the memory chunk.
	 */
	if (unlikely((ilog2(sg_cnt - 1) + 1) +
		     (ilog2(sess->chunk_size - 1) + 1) >
		     MAX_IMM_PAYL_BITS)) {
		rtrs_err(sess->clt,
			 "RDMA immediate size (%db) not enough to encode %d buffers of size %dB\n",
			 MAX_IMM_PAYL_BITS, sg_cnt, sess->chunk_size);
		return -EINVAL;
	}
	total_len = 0;
	for (sgi = 0, i = 0; sgi < sg_cnt && i < sess->queue_depth; sgi++) {
		const struct rtrs_sg_desc *desc = &msg->desc[sgi];
		u32 len, rkey;
		u64 addr;

		addr = le64_to_cpu(desc->addr);
		rkey = le32_to_cpu(desc->key);
		len = le32_to_cpu(desc->len);

		total_len += len;

		if (unlikely(!len || (len % sess->chunk_size))) {
			rtrs_err(sess->clt, "Incorrect [%d].len %d\n", sgi,
				 len);
			return -EINVAL;
		}
		for ( ; len && i < sess->queue_depth; i++) {
			sess->rbufs[i].addr = addr;
			sess->rbufs[i].rkey = rkey;

			len -= sess->chunk_size;
			addr += sess->chunk_size;
		}
	}
	/* Sanity check */
	if (unlikely(sgi != sg_cnt || i != sess->queue_depth)) {
		rtrs_err(sess->clt, "Incorrect sg vector, not fully mapped\n");
		return -EINVAL;
	}
	if (unlikely(total_len != sess->chunk_size * sess->queue_depth)) {
		rtrs_err(sess->clt, "Incorrect total_len %d\n", total_len);
		return -EINVAL;
	}

	return 0;
}
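
/*
 * rtrs_clt_info_rsp_done() - completion of the RTRS_MSG_INFO_RSP receive
 *
 * Validates the server's info response, fills the remote buffer table via
 * process_info_rsp(), posts the receive buffers for the session and, on
 * success, moves the path to the CONNECTED state.  Any error results in
 * the CONNECTING_ERR state instead.
 */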
static void rtrs_clt_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_con *con = cq->cq_context;
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	struct rtrs_msg_info_rsp *msg;
	enum rtrs_clt_state state;
	struct rtrs_iu *iu;
	size_t rx_sz;
	int err;

	state = RTRS_CLT_CONNECTING_ERR;

	WARN_ON(con->c.cid);
	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(sess->clt, "Sess info response recv failed: %s\n",
			 ib_wc_status_msg(wc->status));
		goto out;
	}
	WARN_ON(wc->opcode != IB_WC_RECV);

	if (unlikely(wc->byte_len < sizeof(*msg))) {
		rtrs_err(sess->clt, "Sess info response is malformed: size %d\n",
			 wc->byte_len);
		goto out;
	}
	ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
				   iu->size, DMA_FROM_DEVICE);
	msg = iu->buf;
	if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_INFO_RSP)) {
		rtrs_err(sess->clt, "Sess info response is malformed: type %d\n",
			 le16_to_cpu(msg->type));
		goto out;
	}
	rx_sz = sizeof(*msg);
	rx_sz += sizeof(msg->desc[0]) * le16_to_cpu(msg->sg_cnt);
	if (unlikely(wc->byte_len < rx_sz)) {
		rtrs_err(sess->clt, "Sess info response is malformed: size %d\n",
			 wc->byte_len);
		goto out;
	}
	err = process_info_rsp(sess, msg);
	if (unlikely(err))
		goto out;

	err = post_recv_sess(sess);
	if (unlikely(err))
		goto out;

	state = RTRS_CLT_CONNECTED;

out:
	rtrs_clt_update_wc_stats(con);
	rtrs_iu_free(iu, sess->s.dev->ib_dev, 1);
	rtrs_clt_change_state(sess, state);
}
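
/*
 * rtrs_send_sess_info() - perform the info handshake on the user connection
 *
 * Posts a receive for the RTRS_MSG_INFO_RSP, sends an RTRS_MSG_INFO_REQ
 * carrying the session name and then waits (bounded by
 * RTRS_CONNECT_TIMEOUT_MS) until the completion handlers move the path
 * out of the CONNECTING state.
 */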
static int rtrs_send_sess_info(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt_con *usr_con = to_clt_con(sess->s.con[0]);
	struct rtrs_msg_info_req *msg;
	struct rtrs_iu *tx_iu, *rx_iu;
	size_t rx_sz;
	int err;

	rx_sz = sizeof(struct rtrs_msg_info_rsp);
	rx_sz += sizeof(u64) * MAX_SESS_QUEUE_DEPTH;

	tx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), GFP_KERNEL,
			      sess->s.dev->ib_dev, DMA_TO_DEVICE,
			      rtrs_clt_info_req_done);
	rx_iu = rtrs_iu_alloc(1, rx_sz, GFP_KERNEL, sess->s.dev->ib_dev,
			      DMA_FROM_DEVICE, rtrs_clt_info_rsp_done);
	if (unlikely(!tx_iu || !rx_iu)) {
		err = -ENOMEM;
		goto out;
	}
	/* Prepare for getting info response */
	err = rtrs_iu_post_recv(&usr_con->c, rx_iu);
	if (unlikely(err)) {
		rtrs_err(sess->clt, "rtrs_iu_post_recv(), err: %d\n", err);
		goto out;
	}
	rx_iu = NULL;

	msg = tx_iu->buf;
	msg->type = cpu_to_le16(RTRS_MSG_INFO_REQ);
	memcpy(msg->sessname, sess->s.sessname, sizeof(msg->sessname));

	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr,
				      tx_iu->size, DMA_TO_DEVICE);

	/* Send info request */
	err = rtrs_iu_post_send(&usr_con->c, tx_iu, sizeof(*msg), NULL);
	if (unlikely(err)) {
		rtrs_err(sess->clt, "rtrs_iu_post_send(), err: %d\n", err);
		goto out;
	}
	tx_iu = NULL;

	/* Wait for state change */
	wait_event_interruptible_timeout(sess->state_wq,
					 sess->state != RTRS_CLT_CONNECTING,
					 msecs_to_jiffies(
						 RTRS_CONNECT_TIMEOUT_MS));
	if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED)) {
		if (READ_ONCE(sess->state) == RTRS_CLT_CONNECTING_ERR)
			err = -ECONNRESET;
		else
			err = -ETIMEDOUT;
		goto out;
	}

out:
	if (tx_iu)
		rtrs_iu_free(tx_iu, sess->s.dev->ib_dev, 1);
	if (rx_iu)
		rtrs_iu_free(rx_iu, sess->s.dev->ib_dev, 1);
	if (unlikely(err))
		/* If we've never taken the async path because of malloc problems */
		rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING_ERR);

	return err;
}

/**
 * init_sess() - establishes all session connections and does handshake
 * @sess: client session.
 * In case of error full close or reconnect procedure should be taken,
 * because reconnect or close async work can be started.
 */
static int init_sess(struct rtrs_clt_sess *sess)
{
	int err;

	mutex_lock(&sess->init_mutex);
	err = init_conns(sess);
	if (err) {
		rtrs_err(sess->clt, "init_conns(), err: %d\n", err);
		goto out;
	}
	err = rtrs_send_sess_info(sess);
	if (err) {
		rtrs_err(sess->clt, "rtrs_send_sess_info(), err: %d\n", err);
		goto out;
	}
	rtrs_clt_sess_up(sess);
out:
	mutex_unlock(&sess->init_mutex);

	return err;
}

static void rtrs_clt_reconnect_work(struct work_struct *work)
{
	struct rtrs_clt_sess *sess;
	struct rtrs_clt *clt;
	unsigned int delay_ms;
	int err;

	sess = container_of(to_delayed_work(work), struct rtrs_clt_sess,
			    reconnect_dwork);
	clt = sess->clt;

	if (READ_ONCE(sess->state) != RTRS_CLT_RECONNECTING)
		return;

	if (sess->reconnect_attempts >= clt->max_reconnect_attempts) {
		/* Close a session completely if max attempts is reached */
		rtrs_clt_close_conns(sess, false);
		return;
	}
	sess->reconnect_attempts++;

	/* Stop everything */
	rtrs_clt_stop_and_destroy_conns(sess);
	msleep(RTRS_RECONNECT_BACKOFF);
	if (rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING)) {
		err = init_sess(sess);
		if (err)
			goto reconnect_again;
	}

	return;

reconnect_again:
	if (rtrs_clt_change_state(sess, RTRS_CLT_RECONNECTING)) {
		sess->stats->reconnects.fail_cnt++;
		delay_ms = clt->reconnect_delay_sec * 1000;
		queue_delayed_work(rtrs_wq, &sess->reconnect_dwork,
				   msecs_to_jiffies(delay_ms +
						    prandom_u32() %
						    RTRS_RECONNECT_SEED));
	}
}

static void rtrs_clt_dev_release(struct device *dev)
{
	struct rtrs_clt *clt = container_of(dev, struct rtrs_clt, dev);

	kfree(clt);
}
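
/*
 * alloc_clt() - allocate and register the clt object for a set of paths
 *
 * Allocates the rtrs_clt structure together with the per-CPU cached path
 * pointer, registers the device and creates the sysfs root entries.  The
 * structure itself is freed from rtrs_clt_dev_release() on the final
 * device put.
 */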
static struct rtrs_clt *alloc_clt(const char *sessname, size_t paths_num,
				  u16 port, size_t pdu_sz, void *priv,
				  void (*link_ev)(void *priv,
						  enum rtrs_clt_link_ev ev),
				  unsigned int max_segments,
				  size_t max_segment_size,
				  unsigned int reconnect_delay_sec,
				  unsigned int max_reconnect_attempts)
{
	struct rtrs_clt *clt;
	int err;

	if (!paths_num || paths_num > MAX_PATHS_NUM)
		return ERR_PTR(-EINVAL);

	if (strlen(sessname) >= sizeof(clt->sessname))
		return ERR_PTR(-EINVAL);

	clt = kzalloc(sizeof(*clt), GFP_KERNEL);
	if (!clt)
		return ERR_PTR(-ENOMEM);

	clt->pcpu_path = alloc_percpu(typeof(*clt->pcpu_path));
	if (!clt->pcpu_path) {
		kfree(clt);
		return ERR_PTR(-ENOMEM);
	}

	uuid_gen(&clt->paths_uuid);
	INIT_LIST_HEAD_RCU(&clt->paths_list);
	clt->paths_num = paths_num;
	clt->paths_up = MAX_PATHS_NUM;
	clt->port = port;
	clt->pdu_sz = pdu_sz;
	clt->max_segments = max_segments;
	clt->max_segment_size = max_segment_size;
	clt->reconnect_delay_sec = reconnect_delay_sec;
	clt->max_reconnect_attempts = max_reconnect_attempts;
	clt->priv = priv;
	clt->link_ev = link_ev;
	clt->mp_policy = MP_POLICY_MIN_INFLIGHT;
	strlcpy(clt->sessname, sessname, sizeof(clt->sessname));
	init_waitqueue_head(&clt->permits_wait);
	mutex_init(&clt->paths_ev_mutex);
	mutex_init(&clt->paths_mutex);

	clt->dev.class = rtrs_clt_dev_class;
	clt->dev.release = rtrs_clt_dev_release;
	err = dev_set_name(&clt->dev, "%s", sessname);
	if (err) {
		free_percpu(clt->pcpu_path);
		kfree(clt);
		return ERR_PTR(err);
	}
	/*
	 * Suppress user space notification until
	 * sysfs files are created
	 */
	dev_set_uevent_suppress(&clt->dev, true);
	err = device_register(&clt->dev);
	if (err) {
		free_percpu(clt->pcpu_path);
		put_device(&clt->dev);
		return ERR_PTR(err);
	}

	clt->kobj_paths = kobject_create_and_add("paths", &clt->dev.kobj);
	if (!clt->kobj_paths) {
		free_percpu(clt->pcpu_path);
		device_unregister(&clt->dev);
		return ERR_PTR(-ENOMEM);
	}
	err = rtrs_clt_create_sysfs_root_files(clt);
	if (err) {
		free_percpu(clt->pcpu_path);
		kobject_del(clt->kobj_paths);
		kobject_put(clt->kobj_paths);
		device_unregister(&clt->dev);
		return ERR_PTR(err);
	}
	dev_set_uevent_suppress(&clt->dev, false);
	kobject_uevent(&clt->dev.kobj, KOBJ_ADD);

	return clt;
}

static void wait_for_inflight_permits(struct rtrs_clt *clt)
{
	if (clt->permits_map) {
		size_t sz = clt->queue_depth;

		wait_event(clt->permits_wait,
			   find_first_bit(clt->permits_map, sz) >= sz);
	}
}

static void free_clt(struct rtrs_clt *clt)
{
	wait_for_inflight_permits(clt);
	free_permits(clt);
	free_percpu(clt->pcpu_path);
	mutex_destroy(&clt->paths_ev_mutex);
	mutex_destroy(&clt->paths_mutex);
	/* release callback will free clt in last put */
	device_unregister(&clt->dev);
}

/**
 * rtrs_clt_open() - Open a session to an RTRS server
 * @ops: holds the link event callback and the private pointer.
 * @sessname: name of the session
 * @paths: Paths to be established defined by their src and dst addresses
 * @paths_num: Number of elements in the @paths array
 * @port: port to be used by the RTRS session
 * @pdu_sz: Size of extra payload which can be accessed after permit allocation.
 * @reconnect_delay_sec: time between reconnect tries
 * @max_segments: Max. number of segments per IO request
 * @max_segment_size: Max. size of one segment
 * @max_reconnect_attempts: Number of times to reconnect on error before giving
 *                          up, 0 for disabled, -1 for forever
 *
 * Starts session establishment with the rtrs_server. The function can block
 * up to ~2000ms before it returns.
 *
 * Return: a valid pointer on success, otherwise PTR_ERR.
 */
struct rtrs_clt *rtrs_clt_open(struct rtrs_clt_ops *ops,
			       const char *sessname,
			       const struct rtrs_addr *paths,
			       size_t paths_num, u16 port,
			       size_t pdu_sz, u8 reconnect_delay_sec,
			       u16 max_segments,
			       size_t max_segment_size,
			       s16 max_reconnect_attempts)
{
	struct rtrs_clt_sess *sess, *tmp;
	struct rtrs_clt *clt;
	int err, i;

	clt = alloc_clt(sessname, paths_num, port, pdu_sz, ops->priv,
			ops->link_ev,
			max_segments, max_segment_size, reconnect_delay_sec,
			max_reconnect_attempts);
	if (IS_ERR(clt)) {
		err = PTR_ERR(clt);
		goto out;
	}
	for (i = 0; i < paths_num; i++) {
		struct rtrs_clt_sess *sess;

		sess = alloc_sess(clt, &paths[i], nr_cpu_ids,
				  max_segments, max_segment_size);
		if (IS_ERR(sess)) {
			err = PTR_ERR(sess);
			goto close_all_sess;
		}
		list_add_tail_rcu(&sess->s.entry, &clt->paths_list);

		err = init_sess(sess);
		if (err) {
			list_del_rcu(&sess->s.entry);
			rtrs_clt_close_conns(sess, true);
			free_sess(sess);
			goto close_all_sess;
		}

		err = rtrs_clt_create_sess_files(sess);
		if (err) {
			list_del_rcu(&sess->s.entry);
			rtrs_clt_close_conns(sess, true);
			free_sess(sess);
			goto close_all_sess;
		}
	}
	err = alloc_permits(clt);
	if (err)
		goto close_all_sess;

	return clt;

close_all_sess:
	list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) {
		rtrs_clt_destroy_sess_files(sess, NULL);
		rtrs_clt_close_conns(sess, true);
		kobject_put(&sess->kobj);
	}
	rtrs_clt_destroy_sysfs_root_files(clt);
	rtrs_clt_destroy_sysfs_root_folders(clt);
	free_clt(clt);

out:
	return ERR_PTR(err);
}
EXPORT_SYMBOL(rtrs_clt_open);
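
/*
 * Illustrative sketch (not part of the driver): how an upper layer might
 * drive the exported API.  The callback, the numeric values and the 'paths'
 * array below are hypothetical; a real user resolves the rtrs_addr entries
 * and picks its own sizes.
 *
 *	static void my_link_ev(void *priv, enum rtrs_clt_link_ev ev)
 *	{
 *		// react to RTRS_CLT_LINK_EV_RECONNECTED / _DISCONNECTED
 *	}
 *	...
 *	struct rtrs_clt_ops ops = { .priv = NULL, .link_ev = my_link_ev };
 *	struct rtrs_clt *clt;
 *
 *	clt = rtrs_clt_open(&ops, "my_session", paths, paths_num, 1234,
 *			    pdu_sz, 30, 31, SZ_4K, -1);
 *	if (IS_ERR(clt))
 *		return PTR_ERR(clt);
 *	// ... issue IO with rtrs_clt_request() ...
 *	rtrs_clt_close(clt);
 */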

/**
 * rtrs_clt_close() - Close a session
 * @clt: Session handle. Session is freed upon return.
 */
void rtrs_clt_close(struct rtrs_clt *clt)
{
	struct rtrs_clt_sess *sess, *tmp;

	/* Firstly forbid sysfs access */
	rtrs_clt_destroy_sysfs_root_files(clt);
	rtrs_clt_destroy_sysfs_root_folders(clt);

	/* Now it is safe to iterate over all paths without locks */
	list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) {
		rtrs_clt_destroy_sess_files(sess, NULL);
		rtrs_clt_close_conns(sess, true);
		kobject_put(&sess->kobj);
	}
	free_clt(clt);
}
EXPORT_SYMBOL(rtrs_clt_close);

int rtrs_clt_reconnect_from_sysfs(struct rtrs_clt_sess *sess)
{
	enum rtrs_clt_state old_state;
	int err = -EBUSY;
	bool changed;

	changed = rtrs_clt_change_state_get_old(sess, RTRS_CLT_RECONNECTING,
						&old_state);
	if (changed) {
		sess->reconnect_attempts = 0;
		queue_delayed_work(rtrs_wq, &sess->reconnect_dwork, 0);
	}
	if (changed || old_state == RTRS_CLT_RECONNECTING) {
		/*
		 * flush_delayed_work() queues pending work for immediate
		 * execution, so do the flush if we have queued something
		 * right now or work is pending.
		 */
		flush_delayed_work(&sess->reconnect_dwork);
		err = (READ_ONCE(sess->state) ==
		       RTRS_CLT_CONNECTED ? 0 : -ENOTCONN);
	}

	return err;
}

int rtrs_clt_disconnect_from_sysfs(struct rtrs_clt_sess *sess)
{
	rtrs_clt_close_conns(sess, true);

	return 0;
}

int rtrs_clt_remove_path_from_sysfs(struct rtrs_clt_sess *sess,
				    const struct attribute *sysfs_self)
{
	enum rtrs_clt_state old_state;
	bool changed;

	/*
	 * Continue stopping path till state was changed to DEAD or
	 * state was observed as DEAD:
	 * 1. State was changed to DEAD - we were fast and nobody
	 *    invoked rtrs_clt_reconnect(), which can again start
	 *    reconnecting.
	 * 2. State was observed as DEAD - we have someone in parallel
	 *    removing the path.
	 */
	do {
		rtrs_clt_close_conns(sess, true);
		changed = rtrs_clt_change_state_get_old(sess,
							RTRS_CLT_DEAD,
							&old_state);
	} while (!changed && old_state != RTRS_CLT_DEAD);

	if (likely(changed)) {
		rtrs_clt_destroy_sess_files(sess, sysfs_self);
		rtrs_clt_remove_path_from_arr(sess);
		kobject_put(&sess->kobj);
	}

	return 0;
}

void rtrs_clt_set_max_reconnect_attempts(struct rtrs_clt *clt, int value)
{
	clt->max_reconnect_attempts = (unsigned int)value;
}

int rtrs_clt_get_max_reconnect_attempts(const struct rtrs_clt *clt)
{
	return (int)clt->max_reconnect_attempts;
}

/**
 * rtrs_clt_request() - Request data transfer to/from server via RDMA.
 *
 * @dir: READ/WRITE
 * @ops: callback function to be called as confirmation, and the pointer.
 * @clt: Session
 * @permit: Preallocated permit
 * @vec: Message that is sent to server together with the request.
 *       Sum of len of all @vec elements limited to <= IO_MSG_SIZE.
 *       Since the msg is copied internally it can be allocated on stack.
 * @nr: Number of elements in @vec.
 * @data_len: length of data sent to/from server
 * @sg: Pages to be sent/received to/from server.
 * @sg_cnt: Number of elements in the @sg
 *
 * Return:
 * 0:   Success
 * <0:  Error
 *
 * On dir=READ rtrs client will request a data transfer from Server to client.
 * The data that the server will respond with will be stored in @sg when
 * the user receives an %RTRS_CLT_RDMA_EV_RDMA_REQUEST_WRITE_COMPL event.
 * On dir=WRITE rtrs client will rdma write data in sg to server side.
 */
int rtrs_clt_request(int dir, struct rtrs_clt_req_ops *ops,
		     struct rtrs_clt *clt, struct rtrs_permit *permit,
		     const struct kvec *vec, size_t nr, size_t data_len,
		     struct scatterlist *sg, unsigned int sg_cnt)
{
	struct rtrs_clt_io_req *req;
	struct rtrs_clt_sess *sess;

	enum dma_data_direction dma_dir;
	int err = -ECONNABORTED, i;
	size_t usr_len, hdr_len;
	struct path_it it;

	/* Get kvec length */
	for (i = 0, usr_len = 0; i < nr; i++)
		usr_len += vec[i].iov_len;

	if (dir == READ) {
		hdr_len = sizeof(struct rtrs_msg_rdma_read) +
			  sg_cnt * sizeof(struct rtrs_sg_desc);
		dma_dir = DMA_FROM_DEVICE;
	} else {
		hdr_len = sizeof(struct rtrs_msg_rdma_write);
		dma_dir = DMA_TO_DEVICE;
	}

	rcu_read_lock();
	for (path_it_init(&it, clt);
	     (sess = it.next_path(&it)) && it.i < it.clt->paths_num; it.i++) {
		if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED))
			continue;

		if (unlikely(usr_len + hdr_len > sess->max_hdr_size)) {
			rtrs_wrn_rl(sess->clt,
				    "%s request failed, user message size is %zu and header length %zu, but max size is %u\n",
				    dir == READ ? "Read" : "Write",
				    usr_len, hdr_len, sess->max_hdr_size);
			err = -EMSGSIZE;
			break;
		}
		req = rtrs_clt_get_req(sess, ops->conf_fn, permit, ops->priv,
				       vec, usr_len, sg, sg_cnt, data_len,
				       dma_dir);
		if (dir == READ)
			err = rtrs_clt_read_req(req);
		else
			err = rtrs_clt_write_req(req);
		if (unlikely(err)) {
			req->in_use = false;
			continue;
		}
		/* Success path */
		break;
	}
	path_it_deinit(&it);
	rcu_read_unlock();

	return err;
}
EXPORT_SYMBOL(rtrs_clt_request);

/**
 * rtrs_clt_query() - queries RTRS session attributes
 * @clt: session pointer
 * @attr: query results for session attributes.
 *
 * Return:
 * 0 on success
 * -ECOMM, no connection to the server
 */
int rtrs_clt_query(struct rtrs_clt *clt, struct rtrs_attrs *attr)
{
	if (!rtrs_clt_is_connected(clt))
		return -ECOMM;

	attr->queue_depth = clt->queue_depth;
	attr->max_io_size = clt->max_io_size;
	attr->sess_kobj = &clt->dev.kobj;
	strlcpy(attr->sessname, clt->sessname, sizeof(attr->sessname));

	return 0;
}
EXPORT_SYMBOL(rtrs_clt_query);

int rtrs_clt_create_path_from_sysfs(struct rtrs_clt *clt,
				    struct rtrs_addr *addr)
{
	struct rtrs_clt_sess *sess;
	int err;

	sess = alloc_sess(clt, addr, nr_cpu_ids, clt->max_segments,
			  clt->max_segment_size);
	if (IS_ERR(sess))
		return PTR_ERR(sess);

	/*
	 * It is totally safe to add path in CONNECTING state: coming
	 * IO will never grab it.  Also it is very important to add
	 * path before init, since init fires LINK_CONNECTED event.
	 */
	rtrs_clt_add_path_to_arr(sess);

	err = init_sess(sess);
	if (err)
		goto close_sess;

	err = rtrs_clt_create_sess_files(sess);
	if (err)
		goto close_sess;

	return 0;

close_sess:
	rtrs_clt_remove_path_from_arr(sess);
	rtrs_clt_close_conns(sess, true);
	free_sess(sess);

	return err;
}

static int rtrs_clt_ib_dev_init(struct rtrs_ib_dev *dev)
{
	if (!(dev->ib_dev->attrs.device_cap_flags &
	      IB_DEVICE_MEM_MGT_EXTENSIONS)) {
		pr_err("Memory registrations not supported.\n");
		return -ENOTSUPP;
	}

	return 0;
}

static const struct rtrs_rdma_dev_pd_ops dev_pd_ops = {
	.init = rtrs_clt_ib_dev_init
};

static int __init rtrs_client_init(void)
{
	rtrs_rdma_dev_pd_init(0, &dev_pd);

	rtrs_clt_dev_class = class_create(THIS_MODULE, "rtrs-client");
	if (IS_ERR(rtrs_clt_dev_class)) {
		pr_err("Failed to create rtrs-client dev class\n");
		return PTR_ERR(rtrs_clt_dev_class);
	}
	rtrs_wq = alloc_workqueue("rtrs_client_wq", 0, 0);
	if (!rtrs_wq) {
		class_destroy(rtrs_clt_dev_class);
		return -ENOMEM;
	}

	return 0;
}

static void __exit rtrs_client_exit(void)
{
	destroy_workqueue(rtrs_wq);
	class_destroy(rtrs_clt_dev_class);
	rtrs_rdma_dev_pd_deinit(&dev_pd);
}

module_init(rtrs_client_init);
module_exit(rtrs_client_exit);