1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* 3 * RDMA Transport Layer 4 * 5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved. 6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved. 7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved. 8 */ 9 10 #undef pr_fmt 11 #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt 12 13 #include <linux/module.h> 14 #include <linux/rculist.h> 15 #include <linux/random.h> 16 17 #include "rtrs-clt.h" 18 #include "rtrs-log.h" 19 20 #define RTRS_CONNECT_TIMEOUT_MS 30000 21 /* 22 * Wait a bit before trying to reconnect after a failure 23 * in order to give server time to finish clean up which 24 * leads to "false positives" failed reconnect attempts 25 */ 26 #define RTRS_RECONNECT_BACKOFF 1000 27 /* 28 * Wait for additional random time between 0 and 8 seconds 29 * before starting to reconnect to avoid clients reconnecting 30 * all at once in case of a major network outage 31 */ 32 #define RTRS_RECONNECT_SEED 8 33 34 MODULE_DESCRIPTION("RDMA Transport Client"); 35 MODULE_LICENSE("GPL"); 36 37 static const struct rtrs_rdma_dev_pd_ops dev_pd_ops; 38 static struct rtrs_rdma_dev_pd dev_pd = { 39 .ops = &dev_pd_ops 40 }; 41 42 static struct workqueue_struct *rtrs_wq; 43 static struct class *rtrs_clt_dev_class; 44 45 static inline bool rtrs_clt_is_connected(const struct rtrs_clt *clt) 46 { 47 struct rtrs_clt_sess *sess; 48 bool connected = false; 49 50 rcu_read_lock(); 51 list_for_each_entry_rcu(sess, &clt->paths_list, s.entry) 52 connected |= READ_ONCE(sess->state) == RTRS_CLT_CONNECTED; 53 rcu_read_unlock(); 54 55 return connected; 56 } 57 58 static struct rtrs_permit * 59 __rtrs_get_permit(struct rtrs_clt *clt, enum rtrs_clt_con_type con_type) 60 { 61 size_t max_depth = clt->queue_depth; 62 struct rtrs_permit *permit; 63 int bit; 64 65 /* 66 * Adapted from null_blk get_tag(). Callers from different cpus may 67 * grab the same bit, since find_first_zero_bit is not atomic. 68 * But then the test_and_set_bit_lock will fail for all the 69 * callers but one, so that they will loop again. 70 * This way an explicit spinlock is not required. 71 */ 72 do { 73 bit = find_first_zero_bit(clt->permits_map, max_depth); 74 if (unlikely(bit >= max_depth)) 75 return NULL; 76 } while (unlikely(test_and_set_bit_lock(bit, clt->permits_map))); 77 78 permit = get_permit(clt, bit); 79 WARN_ON(permit->mem_id != bit); 80 permit->cpu_id = raw_smp_processor_id(); 81 permit->con_type = con_type; 82 83 return permit; 84 } 85 86 static inline void __rtrs_put_permit(struct rtrs_clt *clt, 87 struct rtrs_permit *permit) 88 { 89 clear_bit_unlock(permit->mem_id, clt->permits_map); 90 } 91 92 /** 93 * rtrs_clt_get_permit() - allocates permit for future RDMA operation 94 * @clt: Current session 95 * @con_type: Type of connection to use with the permit 96 * @can_wait: Wait type 97 * 98 * Description: 99 * Allocates permit for the following RDMA operation. Permit is used 100 * to preallocate all resources and to propagate memory pressure 101 * up earlier. 
102 * 103 * Context: 104 * Can sleep if @wait == RTRS_TAG_WAIT 105 */ 106 struct rtrs_permit *rtrs_clt_get_permit(struct rtrs_clt *clt, 107 enum rtrs_clt_con_type con_type, 108 int can_wait) 109 { 110 struct rtrs_permit *permit; 111 DEFINE_WAIT(wait); 112 113 permit = __rtrs_get_permit(clt, con_type); 114 if (likely(permit) || !can_wait) 115 return permit; 116 117 do { 118 prepare_to_wait(&clt->permits_wait, &wait, 119 TASK_UNINTERRUPTIBLE); 120 permit = __rtrs_get_permit(clt, con_type); 121 if (likely(permit)) 122 break; 123 124 io_schedule(); 125 } while (1); 126 127 finish_wait(&clt->permits_wait, &wait); 128 129 return permit; 130 } 131 EXPORT_SYMBOL(rtrs_clt_get_permit); 132 133 /** 134 * rtrs_clt_put_permit() - puts allocated permit 135 * @clt: Current session 136 * @permit: Permit to be freed 137 * 138 * Context: 139 * Does not matter 140 */ 141 void rtrs_clt_put_permit(struct rtrs_clt *clt, struct rtrs_permit *permit) 142 { 143 if (WARN_ON(!test_bit(permit->mem_id, clt->permits_map))) 144 return; 145 146 __rtrs_put_permit(clt, permit); 147 148 /* 149 * rtrs_clt_get_permit() adds itself to the &clt->permits_wait list 150 * before calling schedule(). So if rtrs_clt_get_permit() is sleeping 151 * it must have added itself to &clt->permits_wait before 152 * __rtrs_put_permit() finished. 153 * Hence it is safe to guard wake_up() with a waitqueue_active() test. 154 */ 155 if (waitqueue_active(&clt->permits_wait)) 156 wake_up(&clt->permits_wait); 157 } 158 EXPORT_SYMBOL(rtrs_clt_put_permit); 159 160 void *rtrs_permit_to_pdu(struct rtrs_permit *permit) 161 { 162 return permit + 1; 163 } 164 EXPORT_SYMBOL(rtrs_permit_to_pdu); 165 166 /** 167 * rtrs_permit_to_clt_con() - returns RDMA connection pointer by the permit 168 * @sess: client session pointer 169 * @permit: permit for the allocation of the RDMA buffer 170 * Note: 171 * IO connection starts from 1. 172 * 0 connection is for user messages. 173 */ 174 static 175 struct rtrs_clt_con *rtrs_permit_to_clt_con(struct rtrs_clt_sess *sess, 176 struct rtrs_permit *permit) 177 { 178 int id = 0; 179 180 if (likely(permit->con_type == RTRS_IO_CON)) 181 id = (permit->cpu_id % (sess->s.con_num - 1)) + 1; 182 183 return to_clt_con(sess->s.con[id]); 184 } 185 186 /** 187 * __rtrs_clt_change_state() - change the session state through session state 188 * machine. 189 * 190 * @sess: client session to change the state of. 191 * @new_state: state to change to. 192 * 193 * returns true if successful, false if the requested state can not be set. 194 * 195 * Locks: 196 * state_wq lock must be hold. 
197 */ 198 static bool __rtrs_clt_change_state(struct rtrs_clt_sess *sess, 199 enum rtrs_clt_state new_state) 200 { 201 enum rtrs_clt_state old_state; 202 bool changed = false; 203 204 lockdep_assert_held(&sess->state_wq.lock); 205 206 old_state = sess->state; 207 switch (new_state) { 208 case RTRS_CLT_CONNECTING: 209 switch (old_state) { 210 case RTRS_CLT_RECONNECTING: 211 changed = true; 212 fallthrough; 213 default: 214 break; 215 } 216 break; 217 case RTRS_CLT_RECONNECTING: 218 switch (old_state) { 219 case RTRS_CLT_CONNECTED: 220 case RTRS_CLT_CONNECTING_ERR: 221 case RTRS_CLT_CLOSED: 222 changed = true; 223 fallthrough; 224 default: 225 break; 226 } 227 break; 228 case RTRS_CLT_CONNECTED: 229 switch (old_state) { 230 case RTRS_CLT_CONNECTING: 231 changed = true; 232 fallthrough; 233 default: 234 break; 235 } 236 break; 237 case RTRS_CLT_CONNECTING_ERR: 238 switch (old_state) { 239 case RTRS_CLT_CONNECTING: 240 changed = true; 241 fallthrough; 242 default: 243 break; 244 } 245 break; 246 case RTRS_CLT_CLOSING: 247 switch (old_state) { 248 case RTRS_CLT_CONNECTING: 249 case RTRS_CLT_CONNECTING_ERR: 250 case RTRS_CLT_RECONNECTING: 251 case RTRS_CLT_CONNECTED: 252 changed = true; 253 fallthrough; 254 default: 255 break; 256 } 257 break; 258 case RTRS_CLT_CLOSED: 259 switch (old_state) { 260 case RTRS_CLT_CLOSING: 261 changed = true; 262 fallthrough; 263 default: 264 break; 265 } 266 break; 267 case RTRS_CLT_DEAD: 268 switch (old_state) { 269 case RTRS_CLT_CLOSED: 270 changed = true; 271 fallthrough; 272 default: 273 break; 274 } 275 break; 276 default: 277 break; 278 } 279 if (changed) { 280 sess->state = new_state; 281 wake_up_locked(&sess->state_wq); 282 } 283 284 return changed; 285 } 286 287 static bool rtrs_clt_change_state_from_to(struct rtrs_clt_sess *sess, 288 enum rtrs_clt_state old_state, 289 enum rtrs_clt_state new_state) 290 { 291 bool changed = false; 292 293 spin_lock_irq(&sess->state_wq.lock); 294 if (sess->state == old_state) 295 changed = __rtrs_clt_change_state(sess, new_state); 296 spin_unlock_irq(&sess->state_wq.lock); 297 298 return changed; 299 } 300 301 static void rtrs_rdma_error_recovery(struct rtrs_clt_con *con) 302 { 303 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 304 305 if (rtrs_clt_change_state_from_to(sess, 306 RTRS_CLT_CONNECTED, 307 RTRS_CLT_RECONNECTING)) { 308 struct rtrs_clt *clt = sess->clt; 309 unsigned int delay_ms; 310 311 /* 312 * Normal scenario, reconnect if we were successfully connected 313 */ 314 delay_ms = clt->reconnect_delay_sec * 1000; 315 queue_delayed_work(rtrs_wq, &sess->reconnect_dwork, 316 msecs_to_jiffies(delay_ms + 317 prandom_u32() % RTRS_RECONNECT_SEED)); 318 } else { 319 /* 320 * Error can happen just on establishing new connection, 321 * so notify waiter with error state, waiter is responsible 322 * for cleaning the rest and reconnect if needed. 
323 */ 324 rtrs_clt_change_state_from_to(sess, 325 RTRS_CLT_CONNECTING, 326 RTRS_CLT_CONNECTING_ERR); 327 } 328 } 329 330 static void rtrs_clt_fast_reg_done(struct ib_cq *cq, struct ib_wc *wc) 331 { 332 struct rtrs_clt_con *con = cq->cq_context; 333 334 if (unlikely(wc->status != IB_WC_SUCCESS)) { 335 rtrs_err(con->c.sess, "Failed IB_WR_REG_MR: %s\n", 336 ib_wc_status_msg(wc->status)); 337 rtrs_rdma_error_recovery(con); 338 } 339 } 340 341 static struct ib_cqe fast_reg_cqe = { 342 .done = rtrs_clt_fast_reg_done 343 }; 344 345 static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno, 346 bool notify, bool can_wait); 347 348 static void rtrs_clt_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc) 349 { 350 struct rtrs_clt_io_req *req = 351 container_of(wc->wr_cqe, typeof(*req), inv_cqe); 352 struct rtrs_clt_con *con = cq->cq_context; 353 354 if (unlikely(wc->status != IB_WC_SUCCESS)) { 355 rtrs_err(con->c.sess, "Failed IB_WR_LOCAL_INV: %s\n", 356 ib_wc_status_msg(wc->status)); 357 rtrs_rdma_error_recovery(con); 358 } 359 req->need_inv = false; 360 if (likely(req->need_inv_comp)) 361 complete(&req->inv_comp); 362 else 363 /* Complete request from INV callback */ 364 complete_rdma_req(req, req->inv_errno, true, false); 365 } 366 367 static int rtrs_inv_rkey(struct rtrs_clt_io_req *req) 368 { 369 struct rtrs_clt_con *con = req->con; 370 struct ib_send_wr wr = { 371 .opcode = IB_WR_LOCAL_INV, 372 .wr_cqe = &req->inv_cqe, 373 .send_flags = IB_SEND_SIGNALED, 374 .ex.invalidate_rkey = req->mr->rkey, 375 }; 376 req->inv_cqe.done = rtrs_clt_inv_rkey_done; 377 378 return ib_post_send(con->c.qp, &wr, NULL); 379 } 380 381 static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno, 382 bool notify, bool can_wait) 383 { 384 struct rtrs_clt_con *con = req->con; 385 struct rtrs_clt_sess *sess; 386 int err; 387 388 if (WARN_ON(!req->in_use)) 389 return; 390 if (WARN_ON(!req->con)) 391 return; 392 sess = to_clt_sess(con->c.sess); 393 394 if (req->sg_cnt) { 395 if (unlikely(req->dir == DMA_FROM_DEVICE && req->need_inv)) { 396 /* 397 * We are here to invalidate read requests 398 * ourselves. In normal scenario server should 399 * send INV for all read requests, but 400 * we are here, thus two things could happen: 401 * 402 * 1. this is failover, when errno != 0 403 * and can_wait == 1, 404 * 405 * 2. something totally bad happened and 406 * server forgot to send INV, so we 407 * should do that ourselves. 408 */ 409 410 if (likely(can_wait)) { 411 req->need_inv_comp = true; 412 } else { 413 /* This should be IO path, so always notify */ 414 WARN_ON(!notify); 415 /* Save errno for INV callback */ 416 req->inv_errno = errno; 417 } 418 419 err = rtrs_inv_rkey(req); 420 if (unlikely(err)) { 421 rtrs_err(con->c.sess, "Send INV WR key=%#x: %d\n", 422 req->mr->rkey, err); 423 } else if (likely(can_wait)) { 424 wait_for_completion(&req->inv_comp); 425 } else { 426 /* 427 * Something went wrong, so request will be 428 * completed from INV callback. 
429 */ 430 WARN_ON_ONCE(1); 431 432 return; 433 } 434 } 435 ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist, 436 req->sg_cnt, req->dir); 437 } 438 if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT) 439 atomic_dec(&sess->stats->inflight); 440 441 req->in_use = false; 442 req->con = NULL; 443 444 if (notify) 445 req->conf(req->priv, errno); 446 } 447 448 static int rtrs_post_send_rdma(struct rtrs_clt_con *con, 449 struct rtrs_clt_io_req *req, 450 struct rtrs_rbuf *rbuf, u32 off, 451 u32 imm, struct ib_send_wr *wr) 452 { 453 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 454 enum ib_send_flags flags; 455 struct ib_sge sge; 456 457 if (unlikely(!req->sg_size)) { 458 rtrs_wrn(con->c.sess, 459 "Doing RDMA Write failed, no data supplied\n"); 460 return -EINVAL; 461 } 462 463 /* user data and user message in the first list element */ 464 sge.addr = req->iu->dma_addr; 465 sge.length = req->sg_size; 466 sge.lkey = sess->s.dev->ib_pd->local_dma_lkey; 467 468 /* 469 * From time to time we have to post signalled sends, 470 * or send queue will fill up and only QP reset can help. 471 */ 472 flags = atomic_inc_return(&con->io_cnt) % sess->queue_depth ? 473 0 : IB_SEND_SIGNALED; 474 475 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr, 476 req->sg_size, DMA_TO_DEVICE); 477 478 return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, &sge, 1, 479 rbuf->rkey, rbuf->addr + off, 480 imm, flags, wr); 481 } 482 483 static void process_io_rsp(struct rtrs_clt_sess *sess, u32 msg_id, 484 s16 errno, bool w_inval) 485 { 486 struct rtrs_clt_io_req *req; 487 488 if (WARN_ON(msg_id >= sess->queue_depth)) 489 return; 490 491 req = &sess->reqs[msg_id]; 492 /* Drop need_inv if server responded with send with invalidation */ 493 req->need_inv &= !w_inval; 494 complete_rdma_req(req, errno, true, false); 495 } 496 497 static void rtrs_clt_recv_done(struct rtrs_clt_con *con, struct ib_wc *wc) 498 { 499 struct rtrs_iu *iu; 500 int err; 501 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 502 503 WARN_ON(sess->flags != RTRS_MSG_NEW_RKEY_F); 504 iu = container_of(wc->wr_cqe, struct rtrs_iu, 505 cqe); 506 err = rtrs_iu_post_recv(&con->c, iu); 507 if (unlikely(err)) { 508 rtrs_err(con->c.sess, "post iu failed %d\n", err); 509 rtrs_rdma_error_recovery(con); 510 } 511 } 512 513 static void rtrs_clt_rkey_rsp_done(struct rtrs_clt_con *con, struct ib_wc *wc) 514 { 515 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 516 struct rtrs_msg_rkey_rsp *msg; 517 u32 imm_type, imm_payload; 518 bool w_inval = false; 519 struct rtrs_iu *iu; 520 u32 buf_id; 521 int err; 522 523 WARN_ON(sess->flags != RTRS_MSG_NEW_RKEY_F); 524 525 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 526 527 if (unlikely(wc->byte_len < sizeof(*msg))) { 528 rtrs_err(con->c.sess, "rkey response is malformed: size %d\n", 529 wc->byte_len); 530 goto out; 531 } 532 ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr, 533 iu->size, DMA_FROM_DEVICE); 534 msg = iu->buf; 535 if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_RKEY_RSP)) { 536 rtrs_err(sess->clt, "rkey response is malformed: type %d\n", 537 le16_to_cpu(msg->type)); 538 goto out; 539 } 540 buf_id = le16_to_cpu(msg->buf_id); 541 if (WARN_ON(buf_id >= sess->queue_depth)) 542 goto out; 543 544 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), &imm_type, &imm_payload); 545 if (likely(imm_type == RTRS_IO_RSP_IMM || 546 imm_type == RTRS_IO_RSP_W_INV_IMM)) { 547 u32 msg_id; 548 549 w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM); 550 rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err); 
551 552 if (WARN_ON(buf_id != msg_id)) 553 goto out; 554 sess->rbufs[buf_id].rkey = le32_to_cpu(msg->rkey); 555 process_io_rsp(sess, msg_id, err, w_inval); 556 } 557 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, iu->dma_addr, 558 iu->size, DMA_FROM_DEVICE); 559 return rtrs_clt_recv_done(con, wc); 560 out: 561 rtrs_rdma_error_recovery(con); 562 } 563 564 static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc); 565 566 static struct ib_cqe io_comp_cqe = { 567 .done = rtrs_clt_rdma_done 568 }; 569 570 /* 571 * Post x2 empty WRs: first is for this RDMA with IMM, 572 * second is for RECV with INV, which happened earlier. 573 */ 574 static int rtrs_post_recv_empty_x2(struct rtrs_con *con, struct ib_cqe *cqe) 575 { 576 struct ib_recv_wr wr_arr[2], *wr; 577 int i; 578 579 memset(wr_arr, 0, sizeof(wr_arr)); 580 for (i = 0; i < ARRAY_SIZE(wr_arr); i++) { 581 wr = &wr_arr[i]; 582 wr->wr_cqe = cqe; 583 if (i) 584 /* Chain backwards */ 585 wr->next = &wr_arr[i - 1]; 586 } 587 588 return ib_post_recv(con->qp, wr, NULL); 589 } 590 591 static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc) 592 { 593 struct rtrs_clt_con *con = cq->cq_context; 594 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 595 u32 imm_type, imm_payload; 596 bool w_inval = false; 597 int err; 598 599 if (unlikely(wc->status != IB_WC_SUCCESS)) { 600 if (wc->status != IB_WC_WR_FLUSH_ERR) { 601 rtrs_err(sess->clt, "RDMA failed: %s\n", 602 ib_wc_status_msg(wc->status)); 603 rtrs_rdma_error_recovery(con); 604 } 605 return; 606 } 607 rtrs_clt_update_wc_stats(con); 608 609 switch (wc->opcode) { 610 case IB_WC_RECV_RDMA_WITH_IMM: 611 /* 612 * post_recv() RDMA write completions of IO reqs (read/write) 613 * and hb 614 */ 615 if (WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done)) 616 return; 617 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), 618 &imm_type, &imm_payload); 619 if (likely(imm_type == RTRS_IO_RSP_IMM || 620 imm_type == RTRS_IO_RSP_W_INV_IMM)) { 621 u32 msg_id; 622 623 w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM); 624 rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err); 625 626 process_io_rsp(sess, msg_id, err, w_inval); 627 } else if (imm_type == RTRS_HB_MSG_IMM) { 628 WARN_ON(con->c.cid); 629 rtrs_send_hb_ack(&sess->s); 630 if (sess->flags == RTRS_MSG_NEW_RKEY_F) 631 return rtrs_clt_recv_done(con, wc); 632 } else if (imm_type == RTRS_HB_ACK_IMM) { 633 WARN_ON(con->c.cid); 634 sess->s.hb_missed_cnt = 0; 635 if (sess->flags == RTRS_MSG_NEW_RKEY_F) 636 return rtrs_clt_recv_done(con, wc); 637 } else { 638 rtrs_wrn(con->c.sess, "Unknown IMM type %u\n", 639 imm_type); 640 } 641 if (w_inval) 642 /* 643 * Post x2 empty WRs: first is for this RDMA with IMM, 644 * second is for RECV with INV, which happened earlier. 
645 */ 646 err = rtrs_post_recv_empty_x2(&con->c, &io_comp_cqe); 647 else 648 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 649 if (unlikely(err)) { 650 rtrs_err(con->c.sess, "rtrs_post_recv_empty(): %d\n", 651 err); 652 rtrs_rdma_error_recovery(con); 653 break; 654 } 655 break; 656 case IB_WC_RECV: 657 /* 658 * Key invalidations from server side 659 */ 660 WARN_ON(!(wc->wc_flags & IB_WC_WITH_INVALIDATE || 661 wc->wc_flags & IB_WC_WITH_IMM)); 662 WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done); 663 if (sess->flags == RTRS_MSG_NEW_RKEY_F) { 664 if (wc->wc_flags & IB_WC_WITH_INVALIDATE) 665 return rtrs_clt_recv_done(con, wc); 666 667 return rtrs_clt_rkey_rsp_done(con, wc); 668 } 669 break; 670 case IB_WC_RDMA_WRITE: 671 /* 672 * post_send() RDMA write completions of IO reqs (read/write) 673 * and hb 674 */ 675 break; 676 677 default: 678 rtrs_wrn(sess->clt, "Unexpected WC type: %d\n", wc->opcode); 679 return; 680 } 681 } 682 683 static int post_recv_io(struct rtrs_clt_con *con, size_t q_size) 684 { 685 int err, i; 686 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 687 688 for (i = 0; i < q_size; i++) { 689 if (sess->flags == RTRS_MSG_NEW_RKEY_F) { 690 struct rtrs_iu *iu = &con->rsp_ius[i]; 691 692 err = rtrs_iu_post_recv(&con->c, iu); 693 } else { 694 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 695 } 696 if (unlikely(err)) 697 return err; 698 } 699 700 return 0; 701 } 702 703 static int post_recv_sess(struct rtrs_clt_sess *sess) 704 { 705 size_t q_size = 0; 706 int err, cid; 707 708 for (cid = 0; cid < sess->s.con_num; cid++) { 709 if (cid == 0) 710 q_size = SERVICE_CON_QUEUE_DEPTH; 711 else 712 q_size = sess->queue_depth; 713 714 /* 715 * x2 for RDMA read responses + FR key invalidations, 716 * RDMA writes do not require any FR registrations. 717 */ 718 q_size *= 2; 719 720 err = post_recv_io(to_clt_con(sess->s.con[cid]), q_size); 721 if (unlikely(err)) { 722 rtrs_err(sess->clt, "post_recv_io(), err: %d\n", err); 723 return err; 724 } 725 } 726 727 return 0; 728 } 729 730 struct path_it { 731 int i; 732 struct list_head skip_list; 733 struct rtrs_clt *clt; 734 struct rtrs_clt_sess *(*next_path)(struct path_it *it); 735 }; 736 737 /** 738 * list_next_or_null_rr_rcu - get next list element in round-robin fashion. 739 * @head: the head for the list. 740 * @ptr: the list head to take the next element from. 741 * @type: the type of the struct this is embedded in. 742 * @memb: the name of the list_head within the struct. 743 * 744 * Next element returned in round-robin fashion, i.e. head will be skipped, 745 * but if list is observed as empty, NULL will be returned. 746 * 747 * This primitive may safely run concurrently with the _rcu list-mutation 748 * primitives such as list_add_rcu() as long as it's guarded by rcu_read_lock(). 749 */ 750 #define list_next_or_null_rr_rcu(head, ptr, type, memb) \ 751 ({ \ 752 list_next_or_null_rcu(head, ptr, type, memb) ?: \ 753 list_next_or_null_rcu(head, READ_ONCE((ptr)->next), \ 754 type, memb); \ 755 }) 756 757 /** 758 * get_next_path_rr() - Returns path in round-robin fashion. 759 * @it: the path pointer 760 * 761 * Related to @MP_POLICY_RR 762 * 763 * Locks: 764 * rcu_read_lock() must be hold. 765 */ 766 static struct rtrs_clt_sess *get_next_path_rr(struct path_it *it) 767 { 768 struct rtrs_clt_sess __rcu **ppcpu_path; 769 struct rtrs_clt_sess *path; 770 struct rtrs_clt *clt; 771 772 clt = it->clt; 773 774 /* 775 * Here we use two RCU objects: @paths_list and @pcpu_path 776 * pointer. 
See rtrs_clt_remove_path_from_arr() for details 777 * how that is handled. 778 */ 779 780 ppcpu_path = this_cpu_ptr(clt->pcpu_path); 781 path = rcu_dereference(*ppcpu_path); 782 if (unlikely(!path)) 783 path = list_first_or_null_rcu(&clt->paths_list, 784 typeof(*path), s.entry); 785 else 786 path = list_next_or_null_rr_rcu(&clt->paths_list, 787 &path->s.entry, 788 typeof(*path), 789 s.entry); 790 rcu_assign_pointer(*ppcpu_path, path); 791 792 return path; 793 } 794 795 /** 796 * get_next_path_min_inflight() - Returns path with minimal inflight count. 797 * @it: the path pointer 798 * 799 * Related to @MP_POLICY_MIN_INFLIGHT 800 * 801 * Locks: 802 * rcu_read_lock() must be hold. 803 */ 804 static struct rtrs_clt_sess *get_next_path_min_inflight(struct path_it *it) 805 { 806 struct rtrs_clt_sess *min_path = NULL; 807 struct rtrs_clt *clt = it->clt; 808 struct rtrs_clt_sess *sess; 809 int min_inflight = INT_MAX; 810 int inflight; 811 812 list_for_each_entry_rcu(sess, &clt->paths_list, s.entry) { 813 if (unlikely(!list_empty(raw_cpu_ptr(sess->mp_skip_entry)))) 814 continue; 815 816 inflight = atomic_read(&sess->stats->inflight); 817 818 if (inflight < min_inflight) { 819 min_inflight = inflight; 820 min_path = sess; 821 } 822 } 823 824 /* 825 * add the path to the skip list, so that next time we can get 826 * a different one 827 */ 828 if (min_path) 829 list_add(raw_cpu_ptr(min_path->mp_skip_entry), &it->skip_list); 830 831 return min_path; 832 } 833 834 static inline void path_it_init(struct path_it *it, struct rtrs_clt *clt) 835 { 836 INIT_LIST_HEAD(&it->skip_list); 837 it->clt = clt; 838 it->i = 0; 839 840 if (clt->mp_policy == MP_POLICY_RR) 841 it->next_path = get_next_path_rr; 842 else 843 it->next_path = get_next_path_min_inflight; 844 } 845 846 static inline void path_it_deinit(struct path_it *it) 847 { 848 struct list_head *skip, *tmp; 849 /* 850 * The skip_list is used only for the MIN_INFLIGHT policy. 851 * We need to remove paths from it, so that next IO can insert 852 * paths (->mp_skip_entry) into a skip_list again. 853 */ 854 list_for_each_safe(skip, tmp, &it->skip_list) 855 list_del_init(skip); 856 } 857 858 /** 859 * rtrs_clt_init_req() Initialize an rtrs_clt_io_req holding information 860 * about an inflight IO. 861 * The user buffer holding user control message (not data) is copied into 862 * the corresponding buffer of rtrs_iu (req->iu->buf), which later on will 863 * also hold the control message of rtrs. 864 * @req: an io request holding information about IO. 865 * @sess: client session 866 * @conf: conformation callback function to notify upper layer. 867 * @permit: permit for allocation of RDMA remote buffer 868 * @priv: private pointer 869 * @vec: kernel vector containing control message 870 * @usr_len: length of the user message 871 * @sg: scater list for IO data 872 * @sg_cnt: number of scater list entries 873 * @data_len: length of the IO data 874 * @dir: direction of the IO. 
875 */ 876 static void rtrs_clt_init_req(struct rtrs_clt_io_req *req, 877 struct rtrs_clt_sess *sess, 878 void (*conf)(void *priv, int errno), 879 struct rtrs_permit *permit, void *priv, 880 const struct kvec *vec, size_t usr_len, 881 struct scatterlist *sg, size_t sg_cnt, 882 size_t data_len, int dir) 883 { 884 struct iov_iter iter; 885 size_t len; 886 887 req->permit = permit; 888 req->in_use = true; 889 req->usr_len = usr_len; 890 req->data_len = data_len; 891 req->sglist = sg; 892 req->sg_cnt = sg_cnt; 893 req->priv = priv; 894 req->dir = dir; 895 req->con = rtrs_permit_to_clt_con(sess, permit); 896 req->conf = conf; 897 req->need_inv = false; 898 req->need_inv_comp = false; 899 req->inv_errno = 0; 900 901 iov_iter_kvec(&iter, READ, vec, 1, usr_len); 902 len = _copy_from_iter(req->iu->buf, usr_len, &iter); 903 WARN_ON(len != usr_len); 904 905 reinit_completion(&req->inv_comp); 906 } 907 908 static struct rtrs_clt_io_req * 909 rtrs_clt_get_req(struct rtrs_clt_sess *sess, 910 void (*conf)(void *priv, int errno), 911 struct rtrs_permit *permit, void *priv, 912 const struct kvec *vec, size_t usr_len, 913 struct scatterlist *sg, size_t sg_cnt, 914 size_t data_len, int dir) 915 { 916 struct rtrs_clt_io_req *req; 917 918 req = &sess->reqs[permit->mem_id]; 919 rtrs_clt_init_req(req, sess, conf, permit, priv, vec, usr_len, 920 sg, sg_cnt, data_len, dir); 921 return req; 922 } 923 924 static struct rtrs_clt_io_req * 925 rtrs_clt_get_copy_req(struct rtrs_clt_sess *alive_sess, 926 struct rtrs_clt_io_req *fail_req) 927 { 928 struct rtrs_clt_io_req *req; 929 struct kvec vec = { 930 .iov_base = fail_req->iu->buf, 931 .iov_len = fail_req->usr_len 932 }; 933 934 req = &alive_sess->reqs[fail_req->permit->mem_id]; 935 rtrs_clt_init_req(req, alive_sess, fail_req->conf, fail_req->permit, 936 fail_req->priv, &vec, fail_req->usr_len, 937 fail_req->sglist, fail_req->sg_cnt, 938 fail_req->data_len, fail_req->dir); 939 return req; 940 } 941 942 static int rtrs_post_rdma_write_sg(struct rtrs_clt_con *con, 943 struct rtrs_clt_io_req *req, 944 struct rtrs_rbuf *rbuf, 945 u32 size, u32 imm) 946 { 947 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 948 struct ib_sge *sge = req->sge; 949 enum ib_send_flags flags; 950 struct scatterlist *sg; 951 size_t num_sge; 952 int i; 953 954 for_each_sg(req->sglist, sg, req->sg_cnt, i) { 955 sge[i].addr = sg_dma_address(sg); 956 sge[i].length = sg_dma_len(sg); 957 sge[i].lkey = sess->s.dev->ib_pd->local_dma_lkey; 958 } 959 sge[i].addr = req->iu->dma_addr; 960 sge[i].length = size; 961 sge[i].lkey = sess->s.dev->ib_pd->local_dma_lkey; 962 963 num_sge = 1 + req->sg_cnt; 964 965 /* 966 * From time to time we have to post signalled sends, 967 * or send queue will fill up and only QP reset can help. 968 */ 969 flags = atomic_inc_return(&con->io_cnt) % sess->queue_depth ? 
970 0 : IB_SEND_SIGNALED; 971 972 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr, 973 size, DMA_TO_DEVICE); 974 975 return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, sge, num_sge, 976 rbuf->rkey, rbuf->addr, imm, 977 flags, NULL); 978 } 979 980 static int rtrs_clt_write_req(struct rtrs_clt_io_req *req) 981 { 982 struct rtrs_clt_con *con = req->con; 983 struct rtrs_sess *s = con->c.sess; 984 struct rtrs_clt_sess *sess = to_clt_sess(s); 985 struct rtrs_msg_rdma_write *msg; 986 987 struct rtrs_rbuf *rbuf; 988 int ret, count = 0; 989 u32 imm, buf_id; 990 991 const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len; 992 993 if (unlikely(tsize > sess->chunk_size)) { 994 rtrs_wrn(s, "Write request failed, size too big %zu > %d\n", 995 tsize, sess->chunk_size); 996 return -EMSGSIZE; 997 } 998 if (req->sg_cnt) { 999 count = ib_dma_map_sg(sess->s.dev->ib_dev, req->sglist, 1000 req->sg_cnt, req->dir); 1001 if (unlikely(!count)) { 1002 rtrs_wrn(s, "Write request failed, map failed\n"); 1003 return -EINVAL; 1004 } 1005 } 1006 /* put rtrs msg after sg and user message */ 1007 msg = req->iu->buf + req->usr_len; 1008 msg->type = cpu_to_le16(RTRS_MSG_WRITE); 1009 msg->usr_len = cpu_to_le16(req->usr_len); 1010 1011 /* rtrs message on server side will be after user data and message */ 1012 imm = req->permit->mem_off + req->data_len + req->usr_len; 1013 imm = rtrs_to_io_req_imm(imm); 1014 buf_id = req->permit->mem_id; 1015 req->sg_size = tsize; 1016 rbuf = &sess->rbufs[buf_id]; 1017 1018 /* 1019 * Update stats now, after request is successfully sent it is not 1020 * safe anymore to touch it. 1021 */ 1022 rtrs_clt_update_all_stats(req, WRITE); 1023 1024 ret = rtrs_post_rdma_write_sg(req->con, req, rbuf, 1025 req->usr_len + sizeof(*msg), 1026 imm); 1027 if (unlikely(ret)) { 1028 rtrs_err(s, "Write request failed: %d\n", ret); 1029 if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT) 1030 atomic_dec(&sess->stats->inflight); 1031 if (req->sg_cnt) 1032 ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist, 1033 req->sg_cnt, req->dir); 1034 } 1035 1036 return ret; 1037 } 1038 1039 static int rtrs_map_sg_fr(struct rtrs_clt_io_req *req, size_t count) 1040 { 1041 int nr; 1042 1043 /* Align the MR to a 4K page size to match the block virt boundary */ 1044 nr = ib_map_mr_sg(req->mr, req->sglist, count, NULL, SZ_4K); 1045 if (nr < 0) 1046 return nr; 1047 if (unlikely(nr < req->sg_cnt)) 1048 return -EINVAL; 1049 ib_update_fast_reg_key(req->mr, ib_inc_rkey(req->mr->rkey)); 1050 1051 return nr; 1052 } 1053 1054 static int rtrs_clt_read_req(struct rtrs_clt_io_req *req) 1055 { 1056 struct rtrs_clt_con *con = req->con; 1057 struct rtrs_sess *s = con->c.sess; 1058 struct rtrs_clt_sess *sess = to_clt_sess(s); 1059 struct rtrs_msg_rdma_read *msg; 1060 struct rtrs_ib_dev *dev; 1061 1062 struct ib_reg_wr rwr; 1063 struct ib_send_wr *wr = NULL; 1064 1065 int ret, count = 0; 1066 u32 imm, buf_id; 1067 1068 const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len; 1069 1070 s = &sess->s; 1071 dev = sess->s.dev; 1072 1073 if (unlikely(tsize > sess->chunk_size)) { 1074 rtrs_wrn(s, 1075 "Read request failed, message size is %zu, bigger than CHUNK_SIZE %d\n", 1076 tsize, sess->chunk_size); 1077 return -EMSGSIZE; 1078 } 1079 1080 if (req->sg_cnt) { 1081 count = ib_dma_map_sg(dev->ib_dev, req->sglist, req->sg_cnt, 1082 req->dir); 1083 if (unlikely(!count)) { 1084 rtrs_wrn(s, 1085 "Read request failed, dma map failed\n"); 1086 return -EINVAL; 1087 } 1088 } 1089 /* put our message into req->buf after user 
message*/ 1090 msg = req->iu->buf + req->usr_len; 1091 msg->type = cpu_to_le16(RTRS_MSG_READ); 1092 msg->usr_len = cpu_to_le16(req->usr_len); 1093 1094 if (count) { 1095 ret = rtrs_map_sg_fr(req, count); 1096 if (ret < 0) { 1097 rtrs_err_rl(s, 1098 "Read request failed, failed to map fast reg. data, err: %d\n", 1099 ret); 1100 ib_dma_unmap_sg(dev->ib_dev, req->sglist, req->sg_cnt, 1101 req->dir); 1102 return ret; 1103 } 1104 rwr = (struct ib_reg_wr) { 1105 .wr.opcode = IB_WR_REG_MR, 1106 .wr.wr_cqe = &fast_reg_cqe, 1107 .mr = req->mr, 1108 .key = req->mr->rkey, 1109 .access = (IB_ACCESS_LOCAL_WRITE | 1110 IB_ACCESS_REMOTE_WRITE), 1111 }; 1112 wr = &rwr.wr; 1113 1114 msg->sg_cnt = cpu_to_le16(1); 1115 msg->flags = cpu_to_le16(RTRS_MSG_NEED_INVAL_F); 1116 1117 msg->desc[0].addr = cpu_to_le64(req->mr->iova); 1118 msg->desc[0].key = cpu_to_le32(req->mr->rkey); 1119 msg->desc[0].len = cpu_to_le32(req->mr->length); 1120 1121 /* Further invalidation is required */ 1122 req->need_inv = !!RTRS_MSG_NEED_INVAL_F; 1123 1124 } else { 1125 msg->sg_cnt = 0; 1126 msg->flags = 0; 1127 } 1128 /* 1129 * rtrs message will be after the space reserved for disk data and 1130 * user message 1131 */ 1132 imm = req->permit->mem_off + req->data_len + req->usr_len; 1133 imm = rtrs_to_io_req_imm(imm); 1134 buf_id = req->permit->mem_id; 1135 1136 req->sg_size = sizeof(*msg); 1137 req->sg_size += le16_to_cpu(msg->sg_cnt) * sizeof(struct rtrs_sg_desc); 1138 req->sg_size += req->usr_len; 1139 1140 /* 1141 * Update stats now, after request is successfully sent it is not 1142 * safe anymore to touch it. 1143 */ 1144 rtrs_clt_update_all_stats(req, READ); 1145 1146 ret = rtrs_post_send_rdma(req->con, req, &sess->rbufs[buf_id], 1147 req->data_len, imm, wr); 1148 if (unlikely(ret)) { 1149 rtrs_err(s, "Read request failed: %d\n", ret); 1150 if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT) 1151 atomic_dec(&sess->stats->inflight); 1152 req->need_inv = false; 1153 if (req->sg_cnt) 1154 ib_dma_unmap_sg(dev->ib_dev, req->sglist, 1155 req->sg_cnt, req->dir); 1156 } 1157 1158 return ret; 1159 } 1160 1161 /** 1162 * rtrs_clt_failover_req() Try to find an active path for a failed request 1163 * @clt: clt context 1164 * @fail_req: a failed io request. 1165 */ 1166 static int rtrs_clt_failover_req(struct rtrs_clt *clt, 1167 struct rtrs_clt_io_req *fail_req) 1168 { 1169 struct rtrs_clt_sess *alive_sess; 1170 struct rtrs_clt_io_req *req; 1171 int err = -ECONNABORTED; 1172 struct path_it it; 1173 1174 rcu_read_lock(); 1175 for (path_it_init(&it, clt); 1176 (alive_sess = it.next_path(&it)) && it.i < it.clt->paths_num; 1177 it.i++) { 1178 if (unlikely(READ_ONCE(alive_sess->state) != 1179 RTRS_CLT_CONNECTED)) 1180 continue; 1181 req = rtrs_clt_get_copy_req(alive_sess, fail_req); 1182 if (req->dir == DMA_TO_DEVICE) 1183 err = rtrs_clt_write_req(req); 1184 else 1185 err = rtrs_clt_read_req(req); 1186 if (unlikely(err)) { 1187 req->in_use = false; 1188 continue; 1189 } 1190 /* Success path */ 1191 rtrs_clt_inc_failover_cnt(alive_sess->stats); 1192 break; 1193 } 1194 path_it_deinit(&it); 1195 rcu_read_unlock(); 1196 1197 return err; 1198 } 1199 1200 static void fail_all_outstanding_reqs(struct rtrs_clt_sess *sess) 1201 { 1202 struct rtrs_clt *clt = sess->clt; 1203 struct rtrs_clt_io_req *req; 1204 int i, err; 1205 1206 if (!sess->reqs) 1207 return; 1208 for (i = 0; i < sess->queue_depth; ++i) { 1209 req = &sess->reqs[i]; 1210 if (!req->in_use) 1211 continue; 1212 1213 /* 1214 * Safely (without notification) complete failed request. 
1215 * After completion this request is still useble and can 1216 * be failovered to another path. 1217 */ 1218 complete_rdma_req(req, -ECONNABORTED, false, true); 1219 1220 err = rtrs_clt_failover_req(clt, req); 1221 if (unlikely(err)) 1222 /* Failover failed, notify anyway */ 1223 req->conf(req->priv, err); 1224 } 1225 } 1226 1227 static void free_sess_reqs(struct rtrs_clt_sess *sess) 1228 { 1229 struct rtrs_clt_io_req *req; 1230 int i; 1231 1232 if (!sess->reqs) 1233 return; 1234 for (i = 0; i < sess->queue_depth; ++i) { 1235 req = &sess->reqs[i]; 1236 if (req->mr) 1237 ib_dereg_mr(req->mr); 1238 kfree(req->sge); 1239 rtrs_iu_free(req->iu, sess->s.dev->ib_dev, 1); 1240 } 1241 kfree(sess->reqs); 1242 sess->reqs = NULL; 1243 } 1244 1245 static int alloc_sess_reqs(struct rtrs_clt_sess *sess) 1246 { 1247 struct rtrs_clt_io_req *req; 1248 struct rtrs_clt *clt = sess->clt; 1249 int i, err = -ENOMEM; 1250 1251 sess->reqs = kcalloc(sess->queue_depth, sizeof(*sess->reqs), 1252 GFP_KERNEL); 1253 if (!sess->reqs) 1254 return -ENOMEM; 1255 1256 for (i = 0; i < sess->queue_depth; ++i) { 1257 req = &sess->reqs[i]; 1258 req->iu = rtrs_iu_alloc(1, sess->max_hdr_size, GFP_KERNEL, 1259 sess->s.dev->ib_dev, 1260 DMA_TO_DEVICE, 1261 rtrs_clt_rdma_done); 1262 if (!req->iu) 1263 goto out; 1264 1265 req->sge = kmalloc_array(clt->max_segments + 1, 1266 sizeof(*req->sge), GFP_KERNEL); 1267 if (!req->sge) 1268 goto out; 1269 1270 req->mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG, 1271 sess->max_pages_per_mr); 1272 if (IS_ERR(req->mr)) { 1273 err = PTR_ERR(req->mr); 1274 req->mr = NULL; 1275 pr_err("Failed to alloc sess->max_pages_per_mr %d\n", 1276 sess->max_pages_per_mr); 1277 goto out; 1278 } 1279 1280 init_completion(&req->inv_comp); 1281 } 1282 1283 return 0; 1284 1285 out: 1286 free_sess_reqs(sess); 1287 1288 return err; 1289 } 1290 1291 static int alloc_permits(struct rtrs_clt *clt) 1292 { 1293 unsigned int chunk_bits; 1294 int err, i; 1295 1296 clt->permits_map = kcalloc(BITS_TO_LONGS(clt->queue_depth), 1297 sizeof(long), GFP_KERNEL); 1298 if (!clt->permits_map) { 1299 err = -ENOMEM; 1300 goto out_err; 1301 } 1302 clt->permits = kcalloc(clt->queue_depth, permit_size(clt), GFP_KERNEL); 1303 if (!clt->permits) { 1304 err = -ENOMEM; 1305 goto err_map; 1306 } 1307 chunk_bits = ilog2(clt->queue_depth - 1) + 1; 1308 for (i = 0; i < clt->queue_depth; i++) { 1309 struct rtrs_permit *permit; 1310 1311 permit = get_permit(clt, i); 1312 permit->mem_id = i; 1313 permit->mem_off = i << (MAX_IMM_PAYL_BITS - chunk_bits); 1314 } 1315 1316 return 0; 1317 1318 err_map: 1319 kfree(clt->permits_map); 1320 clt->permits_map = NULL; 1321 out_err: 1322 return err; 1323 } 1324 1325 static void free_permits(struct rtrs_clt *clt) 1326 { 1327 kfree(clt->permits_map); 1328 clt->permits_map = NULL; 1329 kfree(clt->permits); 1330 clt->permits = NULL; 1331 } 1332 1333 static void query_fast_reg_mode(struct rtrs_clt_sess *sess) 1334 { 1335 struct ib_device *ib_dev; 1336 u64 max_pages_per_mr; 1337 int mr_page_shift; 1338 1339 ib_dev = sess->s.dev->ib_dev; 1340 1341 /* 1342 * Use the smallest page size supported by the HCA, down to a 1343 * minimum of 4096 bytes. We're unlikely to build large sglists 1344 * out of smaller entries. 
1345 */ 1346 mr_page_shift = max(12, ffs(ib_dev->attrs.page_size_cap) - 1); 1347 max_pages_per_mr = ib_dev->attrs.max_mr_size; 1348 do_div(max_pages_per_mr, (1ull << mr_page_shift)); 1349 sess->max_pages_per_mr = 1350 min3(sess->max_pages_per_mr, (u32)max_pages_per_mr, 1351 ib_dev->attrs.max_fast_reg_page_list_len); 1352 sess->max_send_sge = ib_dev->attrs.max_send_sge; 1353 } 1354 1355 static bool rtrs_clt_change_state_get_old(struct rtrs_clt_sess *sess, 1356 enum rtrs_clt_state new_state, 1357 enum rtrs_clt_state *old_state) 1358 { 1359 bool changed; 1360 1361 spin_lock_irq(&sess->state_wq.lock); 1362 *old_state = sess->state; 1363 changed = __rtrs_clt_change_state(sess, new_state); 1364 spin_unlock_irq(&sess->state_wq.lock); 1365 1366 return changed; 1367 } 1368 1369 static bool rtrs_clt_change_state(struct rtrs_clt_sess *sess, 1370 enum rtrs_clt_state new_state) 1371 { 1372 enum rtrs_clt_state old_state; 1373 1374 return rtrs_clt_change_state_get_old(sess, new_state, &old_state); 1375 } 1376 1377 static void rtrs_clt_hb_err_handler(struct rtrs_con *c) 1378 { 1379 struct rtrs_clt_con *con = container_of(c, typeof(*con), c); 1380 1381 rtrs_rdma_error_recovery(con); 1382 } 1383 1384 static void rtrs_clt_init_hb(struct rtrs_clt_sess *sess) 1385 { 1386 rtrs_init_hb(&sess->s, &io_comp_cqe, 1387 RTRS_HB_INTERVAL_MS, 1388 RTRS_HB_MISSED_MAX, 1389 rtrs_clt_hb_err_handler, 1390 rtrs_wq); 1391 } 1392 1393 static void rtrs_clt_start_hb(struct rtrs_clt_sess *sess) 1394 { 1395 rtrs_start_hb(&sess->s); 1396 } 1397 1398 static void rtrs_clt_stop_hb(struct rtrs_clt_sess *sess) 1399 { 1400 rtrs_stop_hb(&sess->s); 1401 } 1402 1403 static void rtrs_clt_reconnect_work(struct work_struct *work); 1404 static void rtrs_clt_close_work(struct work_struct *work); 1405 1406 static struct rtrs_clt_sess *alloc_sess(struct rtrs_clt *clt, 1407 const struct rtrs_addr *path, 1408 size_t con_num, u16 max_segments, 1409 size_t max_segment_size) 1410 { 1411 struct rtrs_clt_sess *sess; 1412 int err = -ENOMEM; 1413 int cpu; 1414 1415 sess = kzalloc(sizeof(*sess), GFP_KERNEL); 1416 if (!sess) 1417 goto err; 1418 1419 /* Extra connection for user messages */ 1420 con_num += 1; 1421 1422 sess->s.con = kcalloc(con_num, sizeof(*sess->s.con), GFP_KERNEL); 1423 if (!sess->s.con) 1424 goto err_free_sess; 1425 1426 sess->stats = kzalloc(sizeof(*sess->stats), GFP_KERNEL); 1427 if (!sess->stats) 1428 goto err_free_con; 1429 1430 mutex_init(&sess->init_mutex); 1431 uuid_gen(&sess->s.uuid); 1432 memcpy(&sess->s.dst_addr, path->dst, 1433 rdma_addr_size((struct sockaddr *)path->dst)); 1434 1435 /* 1436 * rdma_resolve_addr() passes src_addr to cma_bind_addr, which 1437 * checks the sa_family to be non-zero. If user passed src_addr=NULL 1438 * the sess->src_addr will contain only zeros, which is then fine. 
1439 */ 1440 if (path->src) 1441 memcpy(&sess->s.src_addr, path->src, 1442 rdma_addr_size((struct sockaddr *)path->src)); 1443 strlcpy(sess->s.sessname, clt->sessname, sizeof(sess->s.sessname)); 1444 sess->s.con_num = con_num; 1445 sess->clt = clt; 1446 sess->max_pages_per_mr = max_segments * max_segment_size >> 12; 1447 init_waitqueue_head(&sess->state_wq); 1448 sess->state = RTRS_CLT_CONNECTING; 1449 atomic_set(&sess->connected_cnt, 0); 1450 INIT_WORK(&sess->close_work, rtrs_clt_close_work); 1451 INIT_DELAYED_WORK(&sess->reconnect_dwork, rtrs_clt_reconnect_work); 1452 rtrs_clt_init_hb(sess); 1453 1454 sess->mp_skip_entry = alloc_percpu(typeof(*sess->mp_skip_entry)); 1455 if (!sess->mp_skip_entry) 1456 goto err_free_stats; 1457 1458 for_each_possible_cpu(cpu) 1459 INIT_LIST_HEAD(per_cpu_ptr(sess->mp_skip_entry, cpu)); 1460 1461 err = rtrs_clt_init_stats(sess->stats); 1462 if (err) 1463 goto err_free_percpu; 1464 1465 return sess; 1466 1467 err_free_percpu: 1468 free_percpu(sess->mp_skip_entry); 1469 err_free_stats: 1470 kfree(sess->stats); 1471 err_free_con: 1472 kfree(sess->s.con); 1473 err_free_sess: 1474 kfree(sess); 1475 err: 1476 return ERR_PTR(err); 1477 } 1478 1479 void free_sess(struct rtrs_clt_sess *sess) 1480 { 1481 free_percpu(sess->mp_skip_entry); 1482 mutex_destroy(&sess->init_mutex); 1483 kfree(sess->s.con); 1484 kfree(sess->rbufs); 1485 kfree(sess); 1486 } 1487 1488 static int create_con(struct rtrs_clt_sess *sess, unsigned int cid) 1489 { 1490 struct rtrs_clt_con *con; 1491 1492 con = kzalloc(sizeof(*con), GFP_KERNEL); 1493 if (!con) 1494 return -ENOMEM; 1495 1496 /* Map first two connections to the first CPU */ 1497 con->cpu = (cid ? cid - 1 : 0) % nr_cpu_ids; 1498 con->c.cid = cid; 1499 con->c.sess = &sess->s; 1500 atomic_set(&con->io_cnt, 0); 1501 mutex_init(&con->con_mutex); 1502 1503 sess->s.con[cid] = &con->c; 1504 1505 return 0; 1506 } 1507 1508 static void destroy_con(struct rtrs_clt_con *con) 1509 { 1510 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 1511 1512 sess->s.con[con->c.cid] = NULL; 1513 mutex_destroy(&con->con_mutex); 1514 kfree(con); 1515 } 1516 1517 static int create_con_cq_qp(struct rtrs_clt_con *con) 1518 { 1519 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 1520 u16 wr_queue_size; 1521 int err, cq_vector; 1522 struct rtrs_msg_rkey_rsp *rsp; 1523 1524 lockdep_assert_held(&con->con_mutex); 1525 if (con->c.cid == 0) { 1526 /* 1527 * One completion for each receive and two for each send 1528 * (send request + registration) 1529 * + 2 for drain and heartbeat 1530 * in case qp gets into error state 1531 */ 1532 wr_queue_size = SERVICE_CON_QUEUE_DEPTH * 3 + 2; 1533 /* We must be the first here */ 1534 if (WARN_ON(sess->s.dev)) 1535 return -EINVAL; 1536 1537 /* 1538 * The whole session uses device from user connection. 1539 * Be careful not to close user connection before ib dev 1540 * is gracefully put. 1541 */ 1542 sess->s.dev = rtrs_ib_dev_find_or_add(con->c.cm_id->device, 1543 &dev_pd); 1544 if (!sess->s.dev) { 1545 rtrs_wrn(sess->clt, 1546 "rtrs_ib_dev_find_get_or_add(): no memory\n"); 1547 return -ENOMEM; 1548 } 1549 sess->s.dev_ref = 1; 1550 query_fast_reg_mode(sess); 1551 } else { 1552 /* 1553 * Here we assume that session members are correctly set. 1554 * This is always true if user connection (cid == 0) is 1555 * established first. 
1556 */ 1557 if (WARN_ON(!sess->s.dev)) 1558 return -EINVAL; 1559 if (WARN_ON(!sess->queue_depth)) 1560 return -EINVAL; 1561 1562 /* Shared between connections */ 1563 sess->s.dev_ref++; 1564 wr_queue_size = 1565 min_t(int, sess->s.dev->ib_dev->attrs.max_qp_wr, 1566 /* QD * (REQ + RSP + FR REGS or INVS) + drain */ 1567 sess->queue_depth * 3 + 1); 1568 } 1569 /* alloc iu to recv new rkey reply when server reports flags set */ 1570 if (sess->flags == RTRS_MSG_NEW_RKEY_F || con->c.cid == 0) { 1571 con->rsp_ius = rtrs_iu_alloc(wr_queue_size, sizeof(*rsp), 1572 GFP_KERNEL, sess->s.dev->ib_dev, 1573 DMA_FROM_DEVICE, 1574 rtrs_clt_rdma_done); 1575 if (!con->rsp_ius) 1576 return -ENOMEM; 1577 con->queue_size = wr_queue_size; 1578 } 1579 cq_vector = con->cpu % sess->s.dev->ib_dev->num_comp_vectors; 1580 err = rtrs_cq_qp_create(&sess->s, &con->c, sess->max_send_sge, 1581 cq_vector, wr_queue_size, wr_queue_size, 1582 IB_POLL_SOFTIRQ); 1583 /* 1584 * In case of error we do not bother to clean previous allocations, 1585 * since destroy_con_cq_qp() must be called. 1586 */ 1587 return err; 1588 } 1589 1590 static void destroy_con_cq_qp(struct rtrs_clt_con *con) 1591 { 1592 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 1593 1594 /* 1595 * Be careful here: destroy_con_cq_qp() can be called even 1596 * create_con_cq_qp() failed, see comments there. 1597 */ 1598 lockdep_assert_held(&con->con_mutex); 1599 rtrs_cq_qp_destroy(&con->c); 1600 if (con->rsp_ius) { 1601 rtrs_iu_free(con->rsp_ius, sess->s.dev->ib_dev, con->queue_size); 1602 con->rsp_ius = NULL; 1603 con->queue_size = 0; 1604 } 1605 if (sess->s.dev_ref && !--sess->s.dev_ref) { 1606 rtrs_ib_dev_put(sess->s.dev); 1607 sess->s.dev = NULL; 1608 } 1609 } 1610 1611 static void stop_cm(struct rtrs_clt_con *con) 1612 { 1613 rdma_disconnect(con->c.cm_id); 1614 if (con->c.qp) 1615 ib_drain_qp(con->c.qp); 1616 } 1617 1618 static void destroy_cm(struct rtrs_clt_con *con) 1619 { 1620 rdma_destroy_id(con->c.cm_id); 1621 con->c.cm_id = NULL; 1622 } 1623 1624 static int rtrs_rdma_addr_resolved(struct rtrs_clt_con *con) 1625 { 1626 struct rtrs_sess *s = con->c.sess; 1627 int err; 1628 1629 mutex_lock(&con->con_mutex); 1630 err = create_con_cq_qp(con); 1631 mutex_unlock(&con->con_mutex); 1632 if (err) { 1633 rtrs_err(s, "create_con_cq_qp(), err: %d\n", err); 1634 return err; 1635 } 1636 err = rdma_resolve_route(con->c.cm_id, RTRS_CONNECT_TIMEOUT_MS); 1637 if (err) 1638 rtrs_err(s, "Resolving route failed, err: %d\n", err); 1639 1640 return err; 1641 } 1642 1643 static int rtrs_rdma_route_resolved(struct rtrs_clt_con *con) 1644 { 1645 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 1646 struct rtrs_clt *clt = sess->clt; 1647 struct rtrs_msg_conn_req msg; 1648 struct rdma_conn_param param; 1649 1650 int err; 1651 1652 param = (struct rdma_conn_param) { 1653 .retry_count = 7, 1654 .rnr_retry_count = 7, 1655 .private_data = &msg, 1656 .private_data_len = sizeof(msg), 1657 }; 1658 1659 msg = (struct rtrs_msg_conn_req) { 1660 .magic = cpu_to_le16(RTRS_MAGIC), 1661 .version = cpu_to_le16(RTRS_PROTO_VER), 1662 .cid = cpu_to_le16(con->c.cid), 1663 .cid_num = cpu_to_le16(sess->s.con_num), 1664 .recon_cnt = cpu_to_le16(sess->s.recon_cnt), 1665 }; 1666 uuid_copy(&msg.sess_uuid, &sess->s.uuid); 1667 uuid_copy(&msg.paths_uuid, &clt->paths_uuid); 1668 1669 err = rdma_connect_locked(con->c.cm_id, ¶m); 1670 if (err) 1671 rtrs_err(clt, "rdma_connect_locked(): %d\n", err); 1672 1673 return err; 1674 } 1675 1676 static int rtrs_rdma_conn_established(struct rtrs_clt_con 
*con, 1677 struct rdma_cm_event *ev) 1678 { 1679 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 1680 struct rtrs_clt *clt = sess->clt; 1681 const struct rtrs_msg_conn_rsp *msg; 1682 u16 version, queue_depth; 1683 int errno; 1684 u8 len; 1685 1686 msg = ev->param.conn.private_data; 1687 len = ev->param.conn.private_data_len; 1688 if (len < sizeof(*msg)) { 1689 rtrs_err(clt, "Invalid RTRS connection response\n"); 1690 return -ECONNRESET; 1691 } 1692 if (le16_to_cpu(msg->magic) != RTRS_MAGIC) { 1693 rtrs_err(clt, "Invalid RTRS magic\n"); 1694 return -ECONNRESET; 1695 } 1696 version = le16_to_cpu(msg->version); 1697 if (version >> 8 != RTRS_PROTO_VER_MAJOR) { 1698 rtrs_err(clt, "Unsupported major RTRS version: %d, expected %d\n", 1699 version >> 8, RTRS_PROTO_VER_MAJOR); 1700 return -ECONNRESET; 1701 } 1702 errno = le16_to_cpu(msg->errno); 1703 if (errno) { 1704 rtrs_err(clt, "Invalid RTRS message: errno %d\n", 1705 errno); 1706 return -ECONNRESET; 1707 } 1708 if (con->c.cid == 0) { 1709 queue_depth = le16_to_cpu(msg->queue_depth); 1710 1711 if (queue_depth > MAX_SESS_QUEUE_DEPTH) { 1712 rtrs_err(clt, "Invalid RTRS message: queue=%d\n", 1713 queue_depth); 1714 return -ECONNRESET; 1715 } 1716 if (!sess->rbufs || sess->queue_depth < queue_depth) { 1717 kfree(sess->rbufs); 1718 sess->rbufs = kcalloc(queue_depth, sizeof(*sess->rbufs), 1719 GFP_KERNEL); 1720 if (!sess->rbufs) 1721 return -ENOMEM; 1722 } 1723 sess->queue_depth = queue_depth; 1724 sess->max_hdr_size = le32_to_cpu(msg->max_hdr_size); 1725 sess->max_io_size = le32_to_cpu(msg->max_io_size); 1726 sess->flags = le32_to_cpu(msg->flags); 1727 sess->chunk_size = sess->max_io_size + sess->max_hdr_size; 1728 1729 /* 1730 * Global queue depth and IO size is always a minimum. 1731 * If while a reconnection server sends us a value a bit 1732 * higher - client does not care and uses cached minimum. 1733 * 1734 * Since we can have several sessions (paths) restablishing 1735 * connections in parallel, use lock. 
1736 */ 1737 mutex_lock(&clt->paths_mutex); 1738 clt->queue_depth = min_not_zero(sess->queue_depth, 1739 clt->queue_depth); 1740 clt->max_io_size = min_not_zero(sess->max_io_size, 1741 clt->max_io_size); 1742 mutex_unlock(&clt->paths_mutex); 1743 1744 /* 1745 * Cache the hca_port and hca_name for sysfs 1746 */ 1747 sess->hca_port = con->c.cm_id->port_num; 1748 scnprintf(sess->hca_name, sizeof(sess->hca_name), 1749 sess->s.dev->ib_dev->name); 1750 sess->s.src_addr = con->c.cm_id->route.addr.src_addr; 1751 } 1752 1753 return 0; 1754 } 1755 1756 static inline void flag_success_on_conn(struct rtrs_clt_con *con) 1757 { 1758 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 1759 1760 atomic_inc(&sess->connected_cnt); 1761 con->cm_err = 1; 1762 } 1763 1764 static int rtrs_rdma_conn_rejected(struct rtrs_clt_con *con, 1765 struct rdma_cm_event *ev) 1766 { 1767 struct rtrs_sess *s = con->c.sess; 1768 const struct rtrs_msg_conn_rsp *msg; 1769 const char *rej_msg; 1770 int status, errno; 1771 u8 data_len; 1772 1773 status = ev->status; 1774 rej_msg = rdma_reject_msg(con->c.cm_id, status); 1775 msg = rdma_consumer_reject_data(con->c.cm_id, ev, &data_len); 1776 1777 if (msg && data_len >= sizeof(*msg)) { 1778 errno = (int16_t)le16_to_cpu(msg->errno); 1779 if (errno == -EBUSY) 1780 rtrs_err(s, 1781 "Previous session is still exists on the server, please reconnect later\n"); 1782 else 1783 rtrs_err(s, 1784 "Connect rejected: status %d (%s), rtrs errno %d\n", 1785 status, rej_msg, errno); 1786 } else { 1787 rtrs_err(s, 1788 "Connect rejected but with malformed message: status %d (%s)\n", 1789 status, rej_msg); 1790 } 1791 1792 return -ECONNRESET; 1793 } 1794 1795 static void rtrs_clt_close_conns(struct rtrs_clt_sess *sess, bool wait) 1796 { 1797 if (rtrs_clt_change_state(sess, RTRS_CLT_CLOSING)) 1798 queue_work(rtrs_wq, &sess->close_work); 1799 if (wait) 1800 flush_work(&sess->close_work); 1801 } 1802 1803 static inline void flag_error_on_conn(struct rtrs_clt_con *con, int cm_err) 1804 { 1805 if (con->cm_err == 1) { 1806 struct rtrs_clt_sess *sess; 1807 1808 sess = to_clt_sess(con->c.sess); 1809 if (atomic_dec_and_test(&sess->connected_cnt)) 1810 1811 wake_up(&sess->state_wq); 1812 } 1813 con->cm_err = cm_err; 1814 } 1815 1816 static int rtrs_clt_rdma_cm_handler(struct rdma_cm_id *cm_id, 1817 struct rdma_cm_event *ev) 1818 { 1819 struct rtrs_clt_con *con = cm_id->context; 1820 struct rtrs_sess *s = con->c.sess; 1821 struct rtrs_clt_sess *sess = to_clt_sess(s); 1822 int cm_err = 0; 1823 1824 switch (ev->event) { 1825 case RDMA_CM_EVENT_ADDR_RESOLVED: 1826 cm_err = rtrs_rdma_addr_resolved(con); 1827 break; 1828 case RDMA_CM_EVENT_ROUTE_RESOLVED: 1829 cm_err = rtrs_rdma_route_resolved(con); 1830 break; 1831 case RDMA_CM_EVENT_ESTABLISHED: 1832 cm_err = rtrs_rdma_conn_established(con, ev); 1833 if (likely(!cm_err)) { 1834 /* 1835 * Report success and wake up. Here we abuse state_wq, 1836 * i.e. wake up without state change, but we set cm_err. 
1837 */ 1838 flag_success_on_conn(con); 1839 wake_up(&sess->state_wq); 1840 return 0; 1841 } 1842 break; 1843 case RDMA_CM_EVENT_REJECTED: 1844 cm_err = rtrs_rdma_conn_rejected(con, ev); 1845 break; 1846 case RDMA_CM_EVENT_DISCONNECTED: 1847 /* No message for disconnecting */ 1848 cm_err = -ECONNRESET; 1849 break; 1850 case RDMA_CM_EVENT_CONNECT_ERROR: 1851 case RDMA_CM_EVENT_UNREACHABLE: 1852 case RDMA_CM_EVENT_ADDR_CHANGE: 1853 case RDMA_CM_EVENT_TIMEWAIT_EXIT: 1854 rtrs_wrn(s, "CM error event %d\n", ev->event); 1855 cm_err = -ECONNRESET; 1856 break; 1857 case RDMA_CM_EVENT_ADDR_ERROR: 1858 case RDMA_CM_EVENT_ROUTE_ERROR: 1859 rtrs_wrn(s, "CM error event %d\n", ev->event); 1860 cm_err = -EHOSTUNREACH; 1861 break; 1862 case RDMA_CM_EVENT_DEVICE_REMOVAL: 1863 /* 1864 * Device removal is a special case. Queue close and return 0. 1865 */ 1866 rtrs_clt_close_conns(sess, false); 1867 return 0; 1868 default: 1869 rtrs_err(s, "Unexpected RDMA CM event (%d)\n", ev->event); 1870 cm_err = -ECONNRESET; 1871 break; 1872 } 1873 1874 if (cm_err) { 1875 /* 1876 * cm error makes sense only on connection establishing, 1877 * in other cases we rely on normal procedure of reconnecting. 1878 */ 1879 flag_error_on_conn(con, cm_err); 1880 rtrs_rdma_error_recovery(con); 1881 } 1882 1883 return 0; 1884 } 1885 1886 static int create_cm(struct rtrs_clt_con *con) 1887 { 1888 struct rtrs_sess *s = con->c.sess; 1889 struct rtrs_clt_sess *sess = to_clt_sess(s); 1890 struct rdma_cm_id *cm_id; 1891 int err; 1892 1893 cm_id = rdma_create_id(&init_net, rtrs_clt_rdma_cm_handler, con, 1894 sess->s.dst_addr.ss_family == AF_IB ? 1895 RDMA_PS_IB : RDMA_PS_TCP, IB_QPT_RC); 1896 if (IS_ERR(cm_id)) { 1897 err = PTR_ERR(cm_id); 1898 rtrs_err(s, "Failed to create CM ID, err: %d\n", err); 1899 1900 return err; 1901 } 1902 con->c.cm_id = cm_id; 1903 con->cm_err = 0; 1904 /* allow the port to be reused */ 1905 err = rdma_set_reuseaddr(cm_id, 1); 1906 if (err != 0) { 1907 rtrs_err(s, "Set address reuse failed, err: %d\n", err); 1908 goto destroy_cm; 1909 } 1910 err = rdma_resolve_addr(cm_id, (struct sockaddr *)&sess->s.src_addr, 1911 (struct sockaddr *)&sess->s.dst_addr, 1912 RTRS_CONNECT_TIMEOUT_MS); 1913 if (err) { 1914 rtrs_err(s, "Failed to resolve address, err: %d\n", err); 1915 goto destroy_cm; 1916 } 1917 /* 1918 * Combine connection status and session events. This is needed 1919 * for waiting two possible cases: cm_err has something meaningful 1920 * or session state was really changed to error by device removal. 
1921 */ 1922 err = wait_event_interruptible_timeout( 1923 sess->state_wq, 1924 con->cm_err || sess->state != RTRS_CLT_CONNECTING, 1925 msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS)); 1926 if (err == 0 || err == -ERESTARTSYS) { 1927 if (err == 0) 1928 err = -ETIMEDOUT; 1929 /* Timedout or interrupted */ 1930 goto errr; 1931 } 1932 if (con->cm_err < 0) { 1933 err = con->cm_err; 1934 goto errr; 1935 } 1936 if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTING) { 1937 /* Device removal */ 1938 err = -ECONNABORTED; 1939 goto errr; 1940 } 1941 1942 return 0; 1943 1944 errr: 1945 stop_cm(con); 1946 mutex_lock(&con->con_mutex); 1947 destroy_con_cq_qp(con); 1948 mutex_unlock(&con->con_mutex); 1949 destroy_cm: 1950 destroy_cm(con); 1951 1952 return err; 1953 } 1954 1955 static void rtrs_clt_sess_up(struct rtrs_clt_sess *sess) 1956 { 1957 struct rtrs_clt *clt = sess->clt; 1958 int up; 1959 1960 /* 1961 * We can fire RECONNECTED event only when all paths were 1962 * connected on rtrs_clt_open(), then each was disconnected 1963 * and the first one connected again. That's why this nasty 1964 * game with counter value. 1965 */ 1966 1967 mutex_lock(&clt->paths_ev_mutex); 1968 up = ++clt->paths_up; 1969 /* 1970 * Here it is safe to access paths num directly since up counter 1971 * is greater than MAX_PATHS_NUM only while rtrs_clt_open() is 1972 * in progress, thus paths removals are impossible. 1973 */ 1974 if (up > MAX_PATHS_NUM && up == MAX_PATHS_NUM + clt->paths_num) 1975 clt->paths_up = clt->paths_num; 1976 else if (up == 1) 1977 clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_RECONNECTED); 1978 mutex_unlock(&clt->paths_ev_mutex); 1979 1980 /* Mark session as established */ 1981 sess->established = true; 1982 sess->reconnect_attempts = 0; 1983 sess->stats->reconnects.successful_cnt++; 1984 } 1985 1986 static void rtrs_clt_sess_down(struct rtrs_clt_sess *sess) 1987 { 1988 struct rtrs_clt *clt = sess->clt; 1989 1990 if (!sess->established) 1991 return; 1992 1993 sess->established = false; 1994 mutex_lock(&clt->paths_ev_mutex); 1995 WARN_ON(!clt->paths_up); 1996 if (--clt->paths_up == 0) 1997 clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_DISCONNECTED); 1998 mutex_unlock(&clt->paths_ev_mutex); 1999 } 2000 2001 static void rtrs_clt_stop_and_destroy_conns(struct rtrs_clt_sess *sess) 2002 { 2003 struct rtrs_clt_con *con; 2004 unsigned int cid; 2005 2006 WARN_ON(READ_ONCE(sess->state) == RTRS_CLT_CONNECTED); 2007 2008 /* 2009 * Possible race with rtrs_clt_open(), when DEVICE_REMOVAL comes 2010 * exactly in between. Start destroying after it finishes. 2011 */ 2012 mutex_lock(&sess->init_mutex); 2013 mutex_unlock(&sess->init_mutex); 2014 2015 /* 2016 * All IO paths must observe !CONNECTED state before we 2017 * free everything. 2018 */ 2019 synchronize_rcu(); 2020 2021 rtrs_clt_stop_hb(sess); 2022 2023 /* 2024 * The order it utterly crucial: firstly disconnect and complete all 2025 * rdma requests with error (thus set in_use=false for requests), 2026 * then fail outstanding requests checking in_use for each, and 2027 * eventually notify upper layer about session disconnection. 2028 */ 2029 2030 for (cid = 0; cid < sess->s.con_num; cid++) { 2031 if (!sess->s.con[cid]) 2032 break; 2033 con = to_clt_con(sess->s.con[cid]); 2034 stop_cm(con); 2035 } 2036 fail_all_outstanding_reqs(sess); 2037 free_sess_reqs(sess); 2038 rtrs_clt_sess_down(sess); 2039 2040 /* 2041 * Wait for graceful shutdown, namely when peer side invokes 2042 * rdma_disconnect(). 
'connected_cnt' is decremented only on 2043 * CM events, thus if the other side has crashed and hb has detected 2044 * that something is wrong, we will be stuck here for exactly the timeout 2045 * in ms, since CM does not fire anything. That is fine, we are not in 2046 * a hurry. 2047 */ 2048 wait_event_timeout(sess->state_wq, !atomic_read(&sess->connected_cnt), 2049 msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS)); 2050 2051 for (cid = 0; cid < sess->s.con_num; cid++) { 2052 if (!sess->s.con[cid]) 2053 break; 2054 con = to_clt_con(sess->s.con[cid]); 2055 mutex_lock(&con->con_mutex); 2056 destroy_con_cq_qp(con); 2057 mutex_unlock(&con->con_mutex); 2058 destroy_cm(con); 2059 destroy_con(con); 2060 } 2061 } 2062 2063 static inline bool xchg_sessions(struct rtrs_clt_sess __rcu **rcu_ppcpu_path, 2064 struct rtrs_clt_sess *sess, 2065 struct rtrs_clt_sess *next) 2066 { 2067 struct rtrs_clt_sess **ppcpu_path; 2068 2069 /* Call cmpxchg() without sparse warnings */ 2070 ppcpu_path = (typeof(ppcpu_path))rcu_ppcpu_path; 2071 return sess == cmpxchg(ppcpu_path, sess, next); 2072 } 2073 2074 static void rtrs_clt_remove_path_from_arr(struct rtrs_clt_sess *sess) 2075 { 2076 struct rtrs_clt *clt = sess->clt; 2077 struct rtrs_clt_sess *next; 2078 bool wait_for_grace = false; 2079 int cpu; 2080 2081 mutex_lock(&clt->paths_mutex); 2082 list_del_rcu(&sess->s.entry); 2083 2084 /* Make sure everybody observes path removal. */ 2085 synchronize_rcu(); 2086 2087 /* 2088 * At this point nobody sees @sess in the list, but still we have 2089 * a dangling pointer @pcpu_path which _can_ point to @sess. Since 2090 * nobody can observe @sess in the list, we guarantee that the IO path 2091 * will not assign @sess to @pcpu_path, i.e. @pcpu_path can be equal 2092 * to @sess, but can never again become @sess. 2093 */ 2094 2095 /* 2096 * Decrement the paths number only after the grace period, because 2097 * a caller of do_each_path() must first observe the list without 2098 * the path and only then the decremented paths number. 2099 * 2100 * Otherwise there can be the following situation: 2101 * o Two paths exist and IO is coming. 2102 * o One path is removed: 2103 * CPU#0 CPU#1 2104 * do_each_path(): rtrs_clt_remove_path_from_arr(): 2105 * path = get_next_path() 2106 * ^^^ list_del_rcu(path) 2107 * [!CONNECTED path] clt->paths_num-- 2108 * ^^^^^^^^^ 2109 * load clt->paths_num from 2 to 1 2110 * ^^^^^^^^^ 2111 * sees 1 2112 * 2113 * path is observed as !CONNECTED, but the do_each_path() loop 2114 * ends, because the expression i < clt->paths_num is false. 2115 */ 2116 clt->paths_num--; 2117 2118 /* 2119 * Get the @next path from the current @sess which is going to be 2120 * removed. If @sess is the last element, then @next is NULL. 2121 */ 2122 rcu_read_lock(); 2123 next = list_next_or_null_rr_rcu(&clt->paths_list, &sess->s.entry, 2124 typeof(*next), s.entry); 2125 rcu_read_unlock(); 2126 2127 /* 2128 * @pcpu paths can still point to the path which is going to be 2129 * removed, so change the pointer manually. 2130 */ 2131 for_each_possible_cpu(cpu) { 2132 struct rtrs_clt_sess __rcu **ppcpu_path; 2133 2134 ppcpu_path = per_cpu_ptr(clt->pcpu_path, cpu); 2135 if (rcu_dereference_protected(*ppcpu_path, 2136 lockdep_is_held(&clt->paths_mutex)) != sess) 2137 /* 2138 * synchronize_rcu() was called just after deleting 2139 * the entry from the list, thus the IO code path cannot 2140 * change the pointer back to the one which is going 2141 * to be removed, we are safe here.
2142 */ 2143 continue; 2144 2145 /* 2146 * We race with the IO code path, which also changes the pointer, 2147 * thus we have to be careful not to overwrite it. 2148 */ 2149 if (xchg_sessions(ppcpu_path, sess, next)) 2150 /* 2151 * @ppcpu_path was successfully replaced with @next, 2152 * which means that someone could also have picked up 2153 * @sess and be dereferencing it right now, so waiting 2154 * for a grace period is required. 2155 */ 2156 wait_for_grace = true; 2157 } 2158 if (wait_for_grace) 2159 synchronize_rcu(); 2160 2161 mutex_unlock(&clt->paths_mutex); 2162 } 2163 2164 static void rtrs_clt_add_path_to_arr(struct rtrs_clt_sess *sess) 2165 { 2166 struct rtrs_clt *clt = sess->clt; 2167 2168 mutex_lock(&clt->paths_mutex); 2169 clt->paths_num++; 2170 2171 list_add_tail_rcu(&sess->s.entry, &clt->paths_list); 2172 mutex_unlock(&clt->paths_mutex); 2173 } 2174 2175 static void rtrs_clt_close_work(struct work_struct *work) 2176 { 2177 struct rtrs_clt_sess *sess; 2178 2179 sess = container_of(work, struct rtrs_clt_sess, close_work); 2180 2181 cancel_delayed_work_sync(&sess->reconnect_dwork); 2182 rtrs_clt_stop_and_destroy_conns(sess); 2183 rtrs_clt_change_state(sess, RTRS_CLT_CLOSED); 2184 } 2185 2186 static int init_conns(struct rtrs_clt_sess *sess) 2187 { 2188 unsigned int cid; 2189 int err; 2190 2191 /* 2192 * On every new set of session connections increase the reconnect 2193 * counter in order to avoid clashes with previous, not yet closed, 2194 * sessions on the server side. 2195 */ 2196 sess->s.recon_cnt++; 2197 2198 /* Establish all RDMA connections */ 2199 for (cid = 0; cid < sess->s.con_num; cid++) { 2200 err = create_con(sess, cid); 2201 if (err) 2202 goto destroy; 2203 2204 err = create_cm(to_clt_con(sess->s.con[cid])); 2205 if (err) { 2206 destroy_con(to_clt_con(sess->s.con[cid])); 2207 goto destroy; 2208 } 2209 } 2210 err = alloc_sess_reqs(sess); 2211 if (err) 2212 goto destroy; 2213 2214 rtrs_clt_start_hb(sess); 2215 2216 return 0; 2217 2218 destroy: 2219 while (cid--) { 2220 struct rtrs_clt_con *con = to_clt_con(sess->s.con[cid]); 2221 2222 stop_cm(con); 2223 2224 mutex_lock(&con->con_mutex); 2225 destroy_con_cq_qp(con); 2226 mutex_unlock(&con->con_mutex); 2227 destroy_cm(con); 2228 destroy_con(con); 2229 } 2230 /* 2231 * If we have never taken the async path and got an error, say, 2232 * from rdma_resolve_addr(), switch to the CONNECTING_ERR state 2233 * manually to keep reconnecting.
2234 */ 2235 rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING_ERR); 2236 2237 return err; 2238 } 2239 2240 static void rtrs_clt_info_req_done(struct ib_cq *cq, struct ib_wc *wc) 2241 { 2242 struct rtrs_clt_con *con = cq->cq_context; 2243 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 2244 struct rtrs_iu *iu; 2245 2246 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 2247 rtrs_iu_free(iu, sess->s.dev->ib_dev, 1); 2248 2249 if (unlikely(wc->status != IB_WC_SUCCESS)) { 2250 rtrs_err(sess->clt, "Sess info request send failed: %s\n", 2251 ib_wc_status_msg(wc->status)); 2252 rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING_ERR); 2253 return; 2254 } 2255 2256 rtrs_clt_update_wc_stats(con); 2257 } 2258 2259 static int process_info_rsp(struct rtrs_clt_sess *sess, 2260 const struct rtrs_msg_info_rsp *msg) 2261 { 2262 unsigned int sg_cnt, total_len; 2263 int i, sgi; 2264 2265 sg_cnt = le16_to_cpu(msg->sg_cnt); 2266 if (unlikely(!sg_cnt || (sess->queue_depth % sg_cnt))) { 2267 rtrs_err(sess->clt, "Incorrect sg_cnt %d, is not multiple\n", 2268 sg_cnt); 2269 return -EINVAL; 2270 } 2271 2272 /* 2273 * Check if IB immediate data size is enough to hold the mem_id and 2274 * the offset inside the memory chunk. 2275 */ 2276 if (unlikely((ilog2(sg_cnt - 1) + 1) + 2277 (ilog2(sess->chunk_size - 1) + 1) > 2278 MAX_IMM_PAYL_BITS)) { 2279 rtrs_err(sess->clt, 2280 "RDMA immediate size (%db) not enough to encode %d buffers of size %dB\n", 2281 MAX_IMM_PAYL_BITS, sg_cnt, sess->chunk_size); 2282 return -EINVAL; 2283 } 2284 total_len = 0; 2285 for (sgi = 0, i = 0; sgi < sg_cnt && i < sess->queue_depth; sgi++) { 2286 const struct rtrs_sg_desc *desc = &msg->desc[sgi]; 2287 u32 len, rkey; 2288 u64 addr; 2289 2290 addr = le64_to_cpu(desc->addr); 2291 rkey = le32_to_cpu(desc->key); 2292 len = le32_to_cpu(desc->len); 2293 2294 total_len += len; 2295 2296 if (unlikely(!len || (len % sess->chunk_size))) { 2297 rtrs_err(sess->clt, "Incorrect [%d].len %d\n", sgi, 2298 len); 2299 return -EINVAL; 2300 } 2301 for ( ; len && i < sess->queue_depth; i++) { 2302 sess->rbufs[i].addr = addr; 2303 sess->rbufs[i].rkey = rkey; 2304 2305 len -= sess->chunk_size; 2306 addr += sess->chunk_size; 2307 } 2308 } 2309 /* Sanity check */ 2310 if (unlikely(sgi != sg_cnt || i != sess->queue_depth)) { 2311 rtrs_err(sess->clt, "Incorrect sg vector, not fully mapped\n"); 2312 return -EINVAL; 2313 } 2314 if (unlikely(total_len != sess->chunk_size * sess->queue_depth)) { 2315 rtrs_err(sess->clt, "Incorrect total_len %d\n", total_len); 2316 return -EINVAL; 2317 } 2318 2319 return 0; 2320 } 2321 2322 static void rtrs_clt_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc) 2323 { 2324 struct rtrs_clt_con *con = cq->cq_context; 2325 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess); 2326 struct rtrs_msg_info_rsp *msg; 2327 enum rtrs_clt_state state; 2328 struct rtrs_iu *iu; 2329 size_t rx_sz; 2330 int err; 2331 2332 state = RTRS_CLT_CONNECTING_ERR; 2333 2334 WARN_ON(con->c.cid); 2335 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 2336 if (unlikely(wc->status != IB_WC_SUCCESS)) { 2337 rtrs_err(sess->clt, "Sess info response recv failed: %s\n", 2338 ib_wc_status_msg(wc->status)); 2339 goto out; 2340 } 2341 WARN_ON(wc->opcode != IB_WC_RECV); 2342 2343 if (unlikely(wc->byte_len < sizeof(*msg))) { 2344 rtrs_err(sess->clt, "Sess info response is malformed: size %d\n", 2345 wc->byte_len); 2346 goto out; 2347 } 2348 ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr, 2349 iu->size, DMA_FROM_DEVICE); 2350 msg = iu->buf; 2351 if 
(unlikely(le16_to_cpu(msg->type) != RTRS_MSG_INFO_RSP)) { 2352 rtrs_err(sess->clt, "Sess info response is malformed: type %d\n", 2353 le16_to_cpu(msg->type)); 2354 goto out; 2355 } 2356 rx_sz = sizeof(*msg); 2357 rx_sz += sizeof(msg->desc[0]) * le16_to_cpu(msg->sg_cnt); 2358 if (unlikely(wc->byte_len < rx_sz)) { 2359 rtrs_err(sess->clt, "Sess info response is malformed: size %d\n", 2360 wc->byte_len); 2361 goto out; 2362 } 2363 err = process_info_rsp(sess, msg); 2364 if (unlikely(err)) 2365 goto out; 2366 2367 err = post_recv_sess(sess); 2368 if (unlikely(err)) 2369 goto out; 2370 2371 state = RTRS_CLT_CONNECTED; 2372 2373 out: 2374 rtrs_clt_update_wc_stats(con); 2375 rtrs_iu_free(iu, sess->s.dev->ib_dev, 1); 2376 rtrs_clt_change_state(sess, state); 2377 } 2378 2379 static int rtrs_send_sess_info(struct rtrs_clt_sess *sess) 2380 { 2381 struct rtrs_clt_con *usr_con = to_clt_con(sess->s.con[0]); 2382 struct rtrs_msg_info_req *msg; 2383 struct rtrs_iu *tx_iu, *rx_iu; 2384 size_t rx_sz; 2385 int err; 2386 2387 rx_sz = sizeof(struct rtrs_msg_info_rsp); 2388 rx_sz += sizeof(u64) * MAX_SESS_QUEUE_DEPTH; 2389 2390 tx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), GFP_KERNEL, 2391 sess->s.dev->ib_dev, DMA_TO_DEVICE, 2392 rtrs_clt_info_req_done); 2393 rx_iu = rtrs_iu_alloc(1, rx_sz, GFP_KERNEL, sess->s.dev->ib_dev, 2394 DMA_FROM_DEVICE, rtrs_clt_info_rsp_done); 2395 if (unlikely(!tx_iu || !rx_iu)) { 2396 err = -ENOMEM; 2397 goto out; 2398 } 2399 /* Prepare for getting info response */ 2400 err = rtrs_iu_post_recv(&usr_con->c, rx_iu); 2401 if (unlikely(err)) { 2402 rtrs_err(sess->clt, "rtrs_iu_post_recv(), err: %d\n", err); 2403 goto out; 2404 } 2405 rx_iu = NULL; 2406 2407 msg = tx_iu->buf; 2408 msg->type = cpu_to_le16(RTRS_MSG_INFO_REQ); 2409 memcpy(msg->sessname, sess->s.sessname, sizeof(msg->sessname)); 2410 2411 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr, 2412 tx_iu->size, DMA_TO_DEVICE); 2413 2414 /* Send info request */ 2415 err = rtrs_iu_post_send(&usr_con->c, tx_iu, sizeof(*msg), NULL); 2416 if (unlikely(err)) { 2417 rtrs_err(sess->clt, "rtrs_iu_post_send(), err: %d\n", err); 2418 goto out; 2419 } 2420 tx_iu = NULL; 2421 2422 /* Wait for state change */ 2423 wait_event_interruptible_timeout(sess->state_wq, 2424 sess->state != RTRS_CLT_CONNECTING, 2425 msecs_to_jiffies( 2426 RTRS_CONNECT_TIMEOUT_MS)); 2427 if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED)) { 2428 if (READ_ONCE(sess->state) == RTRS_CLT_CONNECTING_ERR) 2429 err = -ECONNRESET; 2430 else 2431 err = -ETIMEDOUT; 2432 goto out; 2433 } 2434 2435 out: 2436 if (tx_iu) 2437 rtrs_iu_free(tx_iu, sess->s.dev->ib_dev, 1); 2438 if (rx_iu) 2439 rtrs_iu_free(rx_iu, sess->s.dev->ib_dev, 1); 2440 if (unlikely(err)) 2441 /* If we've never taken async path because of malloc problems */ 2442 rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING_ERR); 2443 2444 return err; 2445 } 2446 2447 /** 2448 * init_sess() - establishes all session connections and does handshake 2449 * @sess: client session. 2450 * In case of error full close or reconnect procedure should be taken, 2451 * because reconnect or close async works can be started. 
2452 */ 2453 static int init_sess(struct rtrs_clt_sess *sess) 2454 { 2455 int err; 2456 2457 mutex_lock(&sess->init_mutex); 2458 err = init_conns(sess); 2459 if (err) { 2460 rtrs_err(sess->clt, "init_conns(), err: %d\n", err); 2461 goto out; 2462 } 2463 err = rtrs_send_sess_info(sess); 2464 if (err) { 2465 rtrs_err(sess->clt, "rtrs_send_sess_info(), err: %d\n", err); 2466 goto out; 2467 } 2468 rtrs_clt_sess_up(sess); 2469 out: 2470 mutex_unlock(&sess->init_mutex); 2471 2472 return err; 2473 } 2474 2475 static void rtrs_clt_reconnect_work(struct work_struct *work) 2476 { 2477 struct rtrs_clt_sess *sess; 2478 struct rtrs_clt *clt; 2479 unsigned int delay_ms; 2480 int err; 2481 2482 sess = container_of(to_delayed_work(work), struct rtrs_clt_sess, 2483 reconnect_dwork); 2484 clt = sess->clt; 2485 2486 if (READ_ONCE(sess->state) != RTRS_CLT_RECONNECTING) 2487 return; 2488 2489 if (sess->reconnect_attempts >= clt->max_reconnect_attempts) { 2490 /* Close a session completely if max attempts is reached */ 2491 rtrs_clt_close_conns(sess, false); 2492 return; 2493 } 2494 sess->reconnect_attempts++; 2495 2496 /* Stop everything */ 2497 rtrs_clt_stop_and_destroy_conns(sess); 2498 msleep(RTRS_RECONNECT_BACKOFF); 2499 if (rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING)) { 2500 err = init_sess(sess); 2501 if (err) 2502 goto reconnect_again; 2503 } 2504 2505 return; 2506 2507 reconnect_again: 2508 if (rtrs_clt_change_state(sess, RTRS_CLT_RECONNECTING)) { 2509 sess->stats->reconnects.fail_cnt++; 2510 delay_ms = clt->reconnect_delay_sec * 1000; 2511 queue_delayed_work(rtrs_wq, &sess->reconnect_dwork, 2512 msecs_to_jiffies(delay_ms + 2513 prandom_u32() % 2514 RTRS_RECONNECT_SEED)); 2515 } 2516 } 2517 2518 static void rtrs_clt_dev_release(struct device *dev) 2519 { 2520 struct rtrs_clt *clt = container_of(dev, struct rtrs_clt, dev); 2521 2522 kfree(clt); 2523 } 2524 2525 static struct rtrs_clt *alloc_clt(const char *sessname, size_t paths_num, 2526 u16 port, size_t pdu_sz, void *priv, 2527 void (*link_ev)(void *priv, 2528 enum rtrs_clt_link_ev ev), 2529 unsigned int max_segments, 2530 size_t max_segment_size, 2531 unsigned int reconnect_delay_sec, 2532 unsigned int max_reconnect_attempts) 2533 { 2534 struct rtrs_clt *clt; 2535 int err; 2536 2537 if (!paths_num || paths_num > MAX_PATHS_NUM) 2538 return ERR_PTR(-EINVAL); 2539 2540 if (strlen(sessname) >= sizeof(clt->sessname)) 2541 return ERR_PTR(-EINVAL); 2542 2543 clt = kzalloc(sizeof(*clt), GFP_KERNEL); 2544 if (!clt) 2545 return ERR_PTR(-ENOMEM); 2546 2547 clt->pcpu_path = alloc_percpu(typeof(*clt->pcpu_path)); 2548 if (!clt->pcpu_path) { 2549 kfree(clt); 2550 return ERR_PTR(-ENOMEM); 2551 } 2552 2553 uuid_gen(&clt->paths_uuid); 2554 INIT_LIST_HEAD_RCU(&clt->paths_list); 2555 clt->paths_num = paths_num; 2556 clt->paths_up = MAX_PATHS_NUM; 2557 clt->port = port; 2558 clt->pdu_sz = pdu_sz; 2559 clt->max_segments = max_segments; 2560 clt->max_segment_size = max_segment_size; 2561 clt->reconnect_delay_sec = reconnect_delay_sec; 2562 clt->max_reconnect_attempts = max_reconnect_attempts; 2563 clt->priv = priv; 2564 clt->link_ev = link_ev; 2565 clt->mp_policy = MP_POLICY_MIN_INFLIGHT; 2566 strlcpy(clt->sessname, sessname, sizeof(clt->sessname)); 2567 init_waitqueue_head(&clt->permits_wait); 2568 mutex_init(&clt->paths_ev_mutex); 2569 mutex_init(&clt->paths_mutex); 2570 2571 clt->dev.class = rtrs_clt_dev_class; 2572 clt->dev.release = rtrs_clt_dev_release; 2573 err = dev_set_name(&clt->dev, "%s", sessname); 2574 if (err) { 2575 free_percpu(clt->pcpu_path); 2576 
kfree(clt); 2577 return ERR_PTR(err); 2578 } 2579 /* 2580 * Suppress user space notification until 2581 * sysfs files are created 2582 */ 2583 dev_set_uevent_suppress(&clt->dev, true); 2584 err = device_register(&clt->dev); 2585 if (err) { 2586 free_percpu(clt->pcpu_path); 2587 put_device(&clt->dev); 2588 return ERR_PTR(err); 2589 } 2590 2591 clt->kobj_paths = kobject_create_and_add("paths", &clt->dev.kobj); 2592 if (!clt->kobj_paths) { 2593 free_percpu(clt->pcpu_path); 2594 device_unregister(&clt->dev); 2595 return ERR_PTR(-ENOMEM); 2596 } 2597 err = rtrs_clt_create_sysfs_root_files(clt); 2598 if (err) { 2599 free_percpu(clt->pcpu_path); 2600 kobject_del(clt->kobj_paths); 2601 kobject_put(clt->kobj_paths); 2602 device_unregister(&clt->dev); 2603 return ERR_PTR(err); 2604 } 2605 dev_set_uevent_suppress(&clt->dev, false); 2606 kobject_uevent(&clt->dev.kobj, KOBJ_ADD); 2607 2608 return clt; 2609 } 2610 2611 static void wait_for_inflight_permits(struct rtrs_clt *clt) 2612 { 2613 if (clt->permits_map) { 2614 size_t sz = clt->queue_depth; 2615 2616 wait_event(clt->permits_wait, 2617 find_first_bit(clt->permits_map, sz) >= sz); 2618 } 2619 } 2620 2621 static void free_clt(struct rtrs_clt *clt) 2622 { 2623 wait_for_inflight_permits(clt); 2624 free_permits(clt); 2625 free_percpu(clt->pcpu_path); 2626 mutex_destroy(&clt->paths_ev_mutex); 2627 mutex_destroy(&clt->paths_mutex); 2628 /* release callback will free clt on the last put */ 2629 device_unregister(&clt->dev); 2630 } 2631 2632 /** 2633 * rtrs_clt_open() - Open a session to an RTRS server 2634 * @ops: holds the link event callback and the private pointer. 2635 * @sessname: name of the session 2636 * @paths: Paths to be established, defined by their src and dst addresses 2637 * @paths_num: Number of elements in the @paths array 2638 * @port: port to be used by the RTRS session 2639 * @pdu_sz: Size of extra payload which can be accessed after permit allocation. 2640 * @reconnect_delay_sec: time between reconnect tries 2641 * @max_segments: Max. number of segments per IO request 2642 * @max_segment_size: Max. size of one segment 2643 * @max_reconnect_attempts: Number of times to reconnect on error before giving 2644 * up, 0 for disabled, -1 for forever 2645 * 2646 * Starts session establishment with the rtrs_server. The function can block 2647 * up to ~2000ms before it returns. 2648 * 2649 * Return a valid pointer on success, otherwise an ERR_PTR.
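 *
 * Example (an illustrative sketch only, not taken from an in-tree user: the
 * callback, the destination sockaddr and the numeric sizes below are
 * hypothetical, error handling is omitted, and it assumes struct rtrs_addr
 * carries src/dst sockaddr_storage pointers):
 *
 *	static void my_link_ev(void *priv, enum rtrs_clt_link_ev ev)
 *	{
 *		// react to RTRS_CLT_LINK_EV_RECONNECTED / _DISCONNECTED
 *	}
 *
 *	struct rtrs_clt_ops ops = { .priv = my_priv, .link_ev = my_link_ev };
 *	struct rtrs_addr path = { .src = NULL, .dst = &my_dst_ss };
 *	struct rtrs_clt *clt;
 *
 *	clt = rtrs_clt_open(&ops, "my_sess", &path, 1, 1234, 64, 5, 32,
 *			    SZ_4K, -1);
 *	if (IS_ERR(clt))
 *		return PTR_ERR(clt);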
2650 */ 2651 struct rtrs_clt *rtrs_clt_open(struct rtrs_clt_ops *ops, 2652 const char *sessname, 2653 const struct rtrs_addr *paths, 2654 size_t paths_num, u16 port, 2655 size_t pdu_sz, u8 reconnect_delay_sec, 2656 u16 max_segments, 2657 size_t max_segment_size, 2658 s16 max_reconnect_attempts) 2659 { 2660 struct rtrs_clt_sess *sess, *tmp; 2661 struct rtrs_clt *clt; 2662 int err, i; 2663 2664 clt = alloc_clt(sessname, paths_num, port, pdu_sz, ops->priv, 2665 ops->link_ev, 2666 max_segments, max_segment_size, reconnect_delay_sec, 2667 max_reconnect_attempts); 2668 if (IS_ERR(clt)) { 2669 err = PTR_ERR(clt); 2670 goto out; 2671 } 2672 for (i = 0; i < paths_num; i++) { 2673 struct rtrs_clt_sess *sess; 2674 2675 sess = alloc_sess(clt, &paths[i], nr_cpu_ids, 2676 max_segments, max_segment_size); 2677 if (IS_ERR(sess)) { 2678 err = PTR_ERR(sess); 2679 goto close_all_sess; 2680 } 2681 list_add_tail_rcu(&sess->s.entry, &clt->paths_list); 2682 2683 err = init_sess(sess); 2684 if (err) { 2685 list_del_rcu(&sess->s.entry); 2686 rtrs_clt_close_conns(sess, true); 2687 free_sess(sess); 2688 goto close_all_sess; 2689 } 2690 2691 err = rtrs_clt_create_sess_files(sess); 2692 if (err) { 2693 list_del_rcu(&sess->s.entry); 2694 rtrs_clt_close_conns(sess, true); 2695 free_sess(sess); 2696 goto close_all_sess; 2697 } 2698 } 2699 err = alloc_permits(clt); 2700 if (err) 2701 goto close_all_sess; 2702 2703 return clt; 2704 2705 close_all_sess: 2706 list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) { 2707 rtrs_clt_destroy_sess_files(sess, NULL); 2708 rtrs_clt_close_conns(sess, true); 2709 kobject_put(&sess->kobj); 2710 } 2711 rtrs_clt_destroy_sysfs_root_files(clt); 2712 rtrs_clt_destroy_sysfs_root_folders(clt); 2713 free_clt(clt); 2714 2715 out: 2716 return ERR_PTR(err); 2717 } 2718 EXPORT_SYMBOL(rtrs_clt_open); 2719 2720 /** 2721 * rtrs_clt_close() - Close a session 2722 * @clt: Session handle. Session is freed upon return. 2723 */ 2724 void rtrs_clt_close(struct rtrs_clt *clt) 2725 { 2726 struct rtrs_clt_sess *sess, *tmp; 2727 2728 /* Firstly forbid sysfs access */ 2729 rtrs_clt_destroy_sysfs_root_files(clt); 2730 rtrs_clt_destroy_sysfs_root_folders(clt); 2731 2732 /* Now it is safe to iterate over all paths without locks */ 2733 list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) { 2734 rtrs_clt_destroy_sess_files(sess, NULL); 2735 rtrs_clt_close_conns(sess, true); 2736 kobject_put(&sess->kobj); 2737 } 2738 free_clt(clt); 2739 } 2740 EXPORT_SYMBOL(rtrs_clt_close); 2741 2742 int rtrs_clt_reconnect_from_sysfs(struct rtrs_clt_sess *sess) 2743 { 2744 enum rtrs_clt_state old_state; 2745 int err = -EBUSY; 2746 bool changed; 2747 2748 changed = rtrs_clt_change_state_get_old(sess, RTRS_CLT_RECONNECTING, 2749 &old_state); 2750 if (changed) { 2751 sess->reconnect_attempts = 0; 2752 queue_delayed_work(rtrs_wq, &sess->reconnect_dwork, 0); 2753 } 2754 if (changed || old_state == RTRS_CLT_RECONNECTING) { 2755 /* 2756 * flush_delayed_work() queues pending work for immediate 2757 * execution, so do the flush if we have queued something 2758 * right now or work is pending. 2759 */ 2760 flush_delayed_work(&sess->reconnect_dwork); 2761 err = (READ_ONCE(sess->state) == 2762 RTRS_CLT_CONNECTED ? 
0 : -ENOTCONN); 2763 } 2764 2765 return err; 2766 } 2767 2768 int rtrs_clt_disconnect_from_sysfs(struct rtrs_clt_sess *sess) 2769 { 2770 rtrs_clt_close_conns(sess, true); 2771 2772 return 0; 2773 } 2774 2775 int rtrs_clt_remove_path_from_sysfs(struct rtrs_clt_sess *sess, 2776 const struct attribute *sysfs_self) 2777 { 2778 enum rtrs_clt_state old_state; 2779 bool changed; 2780 2781 /* 2782 * Continue stopping the path until its state is changed to DEAD or 2783 * is observed as DEAD: 2784 * 1. State was changed to DEAD - we were fast and nobody 2785 * invoked rtrs_clt_reconnect(), which can again start 2786 * reconnecting. 2787 * 2. State was observed as DEAD - someone else is removing the 2788 * path in parallel. 2789 */ 2790 do { 2791 rtrs_clt_close_conns(sess, true); 2792 changed = rtrs_clt_change_state_get_old(sess, 2793 RTRS_CLT_DEAD, 2794 &old_state); 2795 } while (!changed && old_state != RTRS_CLT_DEAD); 2796 2797 if (likely(changed)) { 2798 rtrs_clt_destroy_sess_files(sess, sysfs_self); 2799 rtrs_clt_remove_path_from_arr(sess); 2800 kobject_put(&sess->kobj); 2801 } 2802 2803 return 0; 2804 } 2805 2806 void rtrs_clt_set_max_reconnect_attempts(struct rtrs_clt *clt, int value) 2807 { 2808 clt->max_reconnect_attempts = (unsigned int)value; 2809 } 2810 2811 int rtrs_clt_get_max_reconnect_attempts(const struct rtrs_clt *clt) 2812 { 2813 return (int)clt->max_reconnect_attempts; 2814 } 2815 2816 /** 2817 * rtrs_clt_request() - Request data transfer to/from server via RDMA. 2818 * 2819 * @dir: READ/WRITE 2820 * @ops: callback function to be called as confirmation and the private pointer. 2821 * @clt: Session 2822 * @permit: Preallocated permit 2823 * @vec: Message that is sent to the server together with the request. 2824 * The sum of len of all @vec elements is limited to <= IO_MSG_SIZE. 2825 * Since the msg is copied internally it can be allocated on the stack. 2826 * @nr: Number of elements in @vec. 2827 * @data_len: length of data sent to/from the server 2828 * @sg: Pages to be sent/received to/from the server. 2829 * @sg_cnt: Number of elements in @sg 2830 * 2831 * Return: 2832 * 0: Success 2833 * <0: Error 2834 * 2835 * On dir=READ the rtrs client will request a data transfer from the server to 2836 * the client. The data that the server responds with will be stored in @sg when 2837 * the user receives an %RTRS_CLT_RDMA_EV_RDMA_REQUEST_WRITE_COMPL event. 2838 * On dir=WRITE the rtrs client will RDMA-write the data in @sg to the server side.
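 *
 * Example (an illustrative sketch, not a canonical user: 'my_conf_fn', 'io'
 * and its fields are hypothetical, the permit is assumed to have been
 * allocated beforehand, e.g. via rtrs_clt_get_permit(), the confirmation
 * callback is assumed to take (void *priv, int errno), and error handling
 * is omitted):
 *
 *	static void my_conf_fn(void *priv, int errno)
 *	{
 *		// IO confirmation callback: complete the request for 'priv'
 *	}
 *
 *	struct rtrs_clt_req_ops ops = { .priv = io, .conf_fn = my_conf_fn };
 *	struct kvec vec = { .iov_base = &io->msg, .iov_len = sizeof(io->msg) };
 *
 *	err = rtrs_clt_request(WRITE, &ops, clt, permit, &vec, 1,
 *			       io->data_len, io->sgt.sgl, io->sg_cnt);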
2839 */ 2840 int rtrs_clt_request(int dir, struct rtrs_clt_req_ops *ops, 2841 struct rtrs_clt *clt, struct rtrs_permit *permit, 2842 const struct kvec *vec, size_t nr, size_t data_len, 2843 struct scatterlist *sg, unsigned int sg_cnt) 2844 { 2845 struct rtrs_clt_io_req *req; 2846 struct rtrs_clt_sess *sess; 2847 2848 enum dma_data_direction dma_dir; 2849 int err = -ECONNABORTED, i; 2850 size_t usr_len, hdr_len; 2851 struct path_it it; 2852 2853 /* Get kvec length */ 2854 for (i = 0, usr_len = 0; i < nr; i++) 2855 usr_len += vec[i].iov_len; 2856 2857 if (dir == READ) { 2858 hdr_len = sizeof(struct rtrs_msg_rdma_read) + 2859 sg_cnt * sizeof(struct rtrs_sg_desc); 2860 dma_dir = DMA_FROM_DEVICE; 2861 } else { 2862 hdr_len = sizeof(struct rtrs_msg_rdma_write); 2863 dma_dir = DMA_TO_DEVICE; 2864 } 2865 2866 rcu_read_lock(); 2867 for (path_it_init(&it, clt); 2868 (sess = it.next_path(&it)) && it.i < it.clt->paths_num; it.i++) { 2869 if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED)) 2870 continue; 2871 2872 if (unlikely(usr_len + hdr_len > sess->max_hdr_size)) { 2873 rtrs_wrn_rl(sess->clt, 2874 "%s request failed, user message size is %zu and header length %zu, but max size is %u\n", 2875 dir == READ ? "Read" : "Write", 2876 usr_len, hdr_len, sess->max_hdr_size); 2877 err = -EMSGSIZE; 2878 break; 2879 } 2880 req = rtrs_clt_get_req(sess, ops->conf_fn, permit, ops->priv, 2881 vec, usr_len, sg, sg_cnt, data_len, 2882 dma_dir); 2883 if (dir == READ) 2884 err = rtrs_clt_read_req(req); 2885 else 2886 err = rtrs_clt_write_req(req); 2887 if (unlikely(err)) { 2888 req->in_use = false; 2889 continue; 2890 } 2891 /* Success path */ 2892 break; 2893 } 2894 path_it_deinit(&it); 2895 rcu_read_unlock(); 2896 2897 return err; 2898 } 2899 EXPORT_SYMBOL(rtrs_clt_request); 2900 2901 /** 2902 * rtrs_clt_query() - queries RTRS session attributes 2903 *@clt: session pointer 2904 *@attr: query results for session attributes. 2905 * Returns: 2906 * 0 on success 2907 * -ECOMM no connection to the server 2908 */ 2909 int rtrs_clt_query(struct rtrs_clt *clt, struct rtrs_attrs *attr) 2910 { 2911 if (!rtrs_clt_is_connected(clt)) 2912 return -ECOMM; 2913 2914 attr->queue_depth = clt->queue_depth; 2915 attr->max_io_size = clt->max_io_size; 2916 attr->sess_kobj = &clt->dev.kobj; 2917 strlcpy(attr->sessname, clt->sessname, sizeof(attr->sessname)); 2918 2919 return 0; 2920 } 2921 EXPORT_SYMBOL(rtrs_clt_query); 2922 2923 int rtrs_clt_create_path_from_sysfs(struct rtrs_clt *clt, 2924 struct rtrs_addr *addr) 2925 { 2926 struct rtrs_clt_sess *sess; 2927 int err; 2928 2929 sess = alloc_sess(clt, addr, nr_cpu_ids, clt->max_segments, 2930 clt->max_segment_size); 2931 if (IS_ERR(sess)) 2932 return PTR_ERR(sess); 2933 2934 /* 2935 * It is totally safe to add path in CONNECTING state: coming 2936 * IO will never grab it. Also it is very important to add 2937 * path before init, since init fires LINK_CONNECTED event. 
2938 */ 2939 rtrs_clt_add_path_to_arr(sess); 2940 2941 err = init_sess(sess); 2942 if (err) 2943 goto close_sess; 2944 2945 err = rtrs_clt_create_sess_files(sess); 2946 if (err) 2947 goto close_sess; 2948 2949 return 0; 2950 2951 close_sess: 2952 rtrs_clt_remove_path_from_arr(sess); 2953 rtrs_clt_close_conns(sess, true); 2954 free_sess(sess); 2955 2956 return err; 2957 } 2958 2959 static int rtrs_clt_ib_dev_init(struct rtrs_ib_dev *dev) 2960 { 2961 if (!(dev->ib_dev->attrs.device_cap_flags & 2962 IB_DEVICE_MEM_MGT_EXTENSIONS)) { 2963 pr_err("Memory registrations not supported.\n"); 2964 return -ENOTSUPP; 2965 } 2966 2967 return 0; 2968 } 2969 2970 static const struct rtrs_rdma_dev_pd_ops dev_pd_ops = { 2971 .init = rtrs_clt_ib_dev_init 2972 }; 2973 2974 static int __init rtrs_client_init(void) 2975 { 2976 rtrs_rdma_dev_pd_init(0, &dev_pd); 2977 2978 rtrs_clt_dev_class = class_create(THIS_MODULE, "rtrs-client"); 2979 if (IS_ERR(rtrs_clt_dev_class)) { 2980 pr_err("Failed to create rtrs-client dev class\n"); 2981 return PTR_ERR(rtrs_clt_dev_class); 2982 } 2983 rtrs_wq = alloc_workqueue("rtrs_client_wq", 0, 0); 2984 if (!rtrs_wq) { 2985 class_destroy(rtrs_clt_dev_class); 2986 return -ENOMEM; 2987 } 2988 2989 return 0; 2990 } 2991 2992 static void __exit rtrs_client_exit(void) 2993 { 2994 destroy_workqueue(rtrs_wq); 2995 class_destroy(rtrs_clt_dev_class); 2996 rtrs_rdma_dev_pd_deinit(&dev_pd); 2997 } 2998 2999 module_init(rtrs_client_init); 3000 module_exit(rtrs_client_exit); 3001
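
/*
 * Rough life cycle of this client API as seen by a user (an illustrative
 * sketch only; 'clt', 'attrs' and the surrounding setup are assumed to
 * exist and error handling is omitted):
 *
 *	struct rtrs_attrs attrs;
 *
 *	clt = rtrs_clt_open(...);	open all paths, see the kernel-doc above
 *	rtrs_clt_query(clt, &attrs);	read queue_depth / max_io_size limits
 *	per IO: obtain a permit, call rtrs_clt_request(), release the permit
 *	from the confirmation callback
 *	rtrs_clt_close(clt);		tear down all paths and free the session
 */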