// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * RDMA Transport Layer
 *
 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
 */

#undef pr_fmt
#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt

#include <linux/module.h>
#include <linux/mempool.h>

#include "rtrs-srv.h"
#include "rtrs-log.h"
#include <rdma/ib_cm.h>
#include <rdma/ib_verbs.h>

MODULE_DESCRIPTION("RDMA Transport Server");
MODULE_LICENSE("GPL");

/* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
#define DEFAULT_SESS_QUEUE_DEPTH 512
#define MAX_HDR_SIZE PAGE_SIZE

/* We guarantee to serve at least 10 paths */
#define CHUNK_POOL_SZ 10

static struct rtrs_rdma_dev_pd dev_pd;
static mempool_t *chunk_pool;
struct class *rtrs_dev_class;
static struct rtrs_srv_ib_ctx ib_ctx;

static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;

static bool always_invalidate = true;
module_param(always_invalidate, bool, 0444);
MODULE_PARM_DESC(always_invalidate,
		 "Invalidate memory registration for contiguous memory regions before accessing.");

module_param_named(max_chunk_size, max_chunk_size, int, 0444);
MODULE_PARM_DESC(max_chunk_size,
		 "Max size for each IO request, in bytes (default: "
		 __stringify(DEFAULT_MAX_CHUNK_SIZE) ")");

module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
MODULE_PARM_DESC(sess_queue_depth,
		 "Number of buffers for pending I/O requests to allocate per session.
Maximum: " 53 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: " 54 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")"); 55 56 static cpumask_t cq_affinity_mask = { CPU_BITS_ALL }; 57 58 static struct workqueue_struct *rtrs_wq; 59 60 static inline struct rtrs_srv_con *to_srv_con(struct rtrs_con *c) 61 { 62 return container_of(c, struct rtrs_srv_con, c); 63 } 64 65 static inline struct rtrs_srv_sess *to_srv_sess(struct rtrs_sess *s) 66 { 67 return container_of(s, struct rtrs_srv_sess, s); 68 } 69 70 static bool __rtrs_srv_change_state(struct rtrs_srv_sess *sess, 71 enum rtrs_srv_state new_state) 72 { 73 enum rtrs_srv_state old_state; 74 bool changed = false; 75 76 lockdep_assert_held(&sess->state_lock); 77 old_state = sess->state; 78 switch (new_state) { 79 case RTRS_SRV_CONNECTED: 80 switch (old_state) { 81 case RTRS_SRV_CONNECTING: 82 changed = true; 83 fallthrough; 84 default: 85 break; 86 } 87 break; 88 case RTRS_SRV_CLOSING: 89 switch (old_state) { 90 case RTRS_SRV_CONNECTING: 91 case RTRS_SRV_CONNECTED: 92 changed = true; 93 fallthrough; 94 default: 95 break; 96 } 97 break; 98 case RTRS_SRV_CLOSED: 99 switch (old_state) { 100 case RTRS_SRV_CLOSING: 101 changed = true; 102 fallthrough; 103 default: 104 break; 105 } 106 break; 107 default: 108 break; 109 } 110 if (changed) 111 sess->state = new_state; 112 113 return changed; 114 } 115 116 static bool rtrs_srv_change_state(struct rtrs_srv_sess *sess, 117 enum rtrs_srv_state new_state) 118 { 119 bool changed; 120 121 spin_lock_irq(&sess->state_lock); 122 changed = __rtrs_srv_change_state(sess, new_state); 123 spin_unlock_irq(&sess->state_lock); 124 125 return changed; 126 } 127 128 static void free_id(struct rtrs_srv_op *id) 129 { 130 if (!id) 131 return; 132 kfree(id); 133 } 134 135 static void rtrs_srv_free_ops_ids(struct rtrs_srv_sess *sess) 136 { 137 struct rtrs_srv *srv = sess->srv; 138 int i; 139 140 WARN_ON(atomic_read(&sess->ids_inflight)); 141 if (sess->ops_ids) { 142 for (i = 0; i < srv->queue_depth; i++) 143 free_id(sess->ops_ids[i]); 144 kfree(sess->ops_ids); 145 sess->ops_ids = NULL; 146 } 147 } 148 149 static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc); 150 151 static struct ib_cqe io_comp_cqe = { 152 .done = rtrs_srv_rdma_done 153 }; 154 155 static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_sess *sess) 156 { 157 struct rtrs_srv *srv = sess->srv; 158 struct rtrs_srv_op *id; 159 int i; 160 161 sess->ops_ids = kcalloc(srv->queue_depth, sizeof(*sess->ops_ids), 162 GFP_KERNEL); 163 if (!sess->ops_ids) 164 goto err; 165 166 for (i = 0; i < srv->queue_depth; ++i) { 167 id = kzalloc(sizeof(*id), GFP_KERNEL); 168 if (!id) 169 goto err; 170 171 sess->ops_ids[i] = id; 172 } 173 init_waitqueue_head(&sess->ids_waitq); 174 atomic_set(&sess->ids_inflight, 0); 175 176 return 0; 177 178 err: 179 rtrs_srv_free_ops_ids(sess); 180 return -ENOMEM; 181 } 182 183 static inline void rtrs_srv_get_ops_ids(struct rtrs_srv_sess *sess) 184 { 185 atomic_inc(&sess->ids_inflight); 186 } 187 188 static inline void rtrs_srv_put_ops_ids(struct rtrs_srv_sess *sess) 189 { 190 if (atomic_dec_and_test(&sess->ids_inflight)) 191 wake_up(&sess->ids_waitq); 192 } 193 194 static void rtrs_srv_wait_ops_ids(struct rtrs_srv_sess *sess) 195 { 196 wait_event(sess->ids_waitq, !atomic_read(&sess->ids_inflight)); 197 } 198 199 200 static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc) 201 { 202 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 203 struct rtrs_sess *s = con->c.sess; 204 struct rtrs_srv_sess *sess = to_srv_sess(s); 205 206 if 
(unlikely(wc->status != IB_WC_SUCCESS)) { 207 rtrs_err(s, "REG MR failed: %s\n", 208 ib_wc_status_msg(wc->status)); 209 close_sess(sess); 210 return; 211 } 212 } 213 214 static struct ib_cqe local_reg_cqe = { 215 .done = rtrs_srv_reg_mr_done 216 }; 217 218 static int rdma_write_sg(struct rtrs_srv_op *id) 219 { 220 struct rtrs_sess *s = id->con->c.sess; 221 struct rtrs_srv_sess *sess = to_srv_sess(s); 222 dma_addr_t dma_addr = sess->dma_addr[id->msg_id]; 223 struct rtrs_srv_mr *srv_mr; 224 struct rtrs_srv *srv = sess->srv; 225 struct ib_send_wr inv_wr; 226 struct ib_rdma_wr imm_wr; 227 struct ib_rdma_wr *wr = NULL; 228 enum ib_send_flags flags; 229 size_t sg_cnt; 230 int err, offset; 231 bool need_inval; 232 u32 rkey = 0; 233 struct ib_reg_wr rwr; 234 struct ib_sge *plist; 235 struct ib_sge list; 236 237 sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt); 238 need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F; 239 if (unlikely(sg_cnt != 1)) 240 return -EINVAL; 241 242 offset = 0; 243 244 wr = &id->tx_wr; 245 plist = &id->tx_sg; 246 plist->addr = dma_addr + offset; 247 plist->length = le32_to_cpu(id->rd_msg->desc[0].len); 248 249 /* WR will fail with length error 250 * if this is 0 251 */ 252 if (unlikely(plist->length == 0)) { 253 rtrs_err(s, "Invalid RDMA-Write sg list length 0\n"); 254 return -EINVAL; 255 } 256 257 plist->lkey = sess->s.dev->ib_pd->local_dma_lkey; 258 offset += plist->length; 259 260 wr->wr.sg_list = plist; 261 wr->wr.num_sge = 1; 262 wr->remote_addr = le64_to_cpu(id->rd_msg->desc[0].addr); 263 wr->rkey = le32_to_cpu(id->rd_msg->desc[0].key); 264 if (rkey == 0) 265 rkey = wr->rkey; 266 else 267 /* Only one key is actually used */ 268 WARN_ON_ONCE(rkey != wr->rkey); 269 270 wr->wr.opcode = IB_WR_RDMA_WRITE; 271 wr->wr.wr_cqe = &io_comp_cqe; 272 wr->wr.ex.imm_data = 0; 273 wr->wr.send_flags = 0; 274 275 if (need_inval && always_invalidate) { 276 wr->wr.next = &rwr.wr; 277 rwr.wr.next = &inv_wr; 278 inv_wr.next = &imm_wr.wr; 279 } else if (always_invalidate) { 280 wr->wr.next = &rwr.wr; 281 rwr.wr.next = &imm_wr.wr; 282 } else if (need_inval) { 283 wr->wr.next = &inv_wr; 284 inv_wr.next = &imm_wr.wr; 285 } else { 286 wr->wr.next = &imm_wr.wr; 287 } 288 /* 289 * From time to time we have to post signaled sends, 290 * or send queue will fill up and only QP reset can help. 291 */ 292 flags = (atomic_inc_return(&id->con->wr_cnt) % srv->queue_depth) ? 
293 0 : IB_SEND_SIGNALED; 294 295 if (need_inval) { 296 inv_wr.sg_list = NULL; 297 inv_wr.num_sge = 0; 298 inv_wr.opcode = IB_WR_SEND_WITH_INV; 299 inv_wr.wr_cqe = &io_comp_cqe; 300 inv_wr.send_flags = 0; 301 inv_wr.ex.invalidate_rkey = rkey; 302 } 303 304 imm_wr.wr.next = NULL; 305 if (always_invalidate) { 306 struct rtrs_msg_rkey_rsp *msg; 307 308 srv_mr = &sess->mrs[id->msg_id]; 309 rwr.wr.opcode = IB_WR_REG_MR; 310 rwr.wr.wr_cqe = &local_reg_cqe; 311 rwr.wr.num_sge = 0; 312 rwr.mr = srv_mr->mr; 313 rwr.wr.send_flags = 0; 314 rwr.key = srv_mr->mr->rkey; 315 rwr.access = (IB_ACCESS_LOCAL_WRITE | 316 IB_ACCESS_REMOTE_WRITE); 317 msg = srv_mr->iu->buf; 318 msg->buf_id = cpu_to_le16(id->msg_id); 319 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP); 320 msg->rkey = cpu_to_le32(srv_mr->mr->rkey); 321 322 list.addr = srv_mr->iu->dma_addr; 323 list.length = sizeof(*msg); 324 list.lkey = sess->s.dev->ib_pd->local_dma_lkey; 325 imm_wr.wr.sg_list = &list; 326 imm_wr.wr.num_sge = 1; 327 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM; 328 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, 329 srv_mr->iu->dma_addr, 330 srv_mr->iu->size, DMA_TO_DEVICE); 331 } else { 332 imm_wr.wr.sg_list = NULL; 333 imm_wr.wr.num_sge = 0; 334 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM; 335 } 336 imm_wr.wr.send_flags = flags; 337 imm_wr.wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id, 338 0, need_inval)); 339 340 imm_wr.wr.wr_cqe = &io_comp_cqe; 341 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, dma_addr, 342 offset, DMA_BIDIRECTIONAL); 343 344 err = ib_post_send(id->con->c.qp, &id->tx_wr.wr, NULL); 345 if (unlikely(err)) 346 rtrs_err(s, 347 "Posting RDMA-Write-Request to QP failed, err: %d\n", 348 err); 349 350 return err; 351 } 352 353 /** 354 * send_io_resp_imm() - respond to client with empty IMM on failed READ/WRITE 355 * requests or on successful WRITE request. 356 * @con: the connection to send back result 357 * @id: the id associated with the IO 358 * @errno: the error number of the IO. 359 * 360 * Return 0 on success, errno otherwise. 
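 *
 * The reply itself is a zero-length RDMA-write-with-immediate (or a
 * SEND_WITH_IMM carrying an rtrs_msg_rkey_rsp when always_invalidate is
 * set), optionally chained behind a SEND_WITH_INV and/or an IB_WR_REG_MR,
 * so the whole response is posted with a single ib_post_send() call.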
361 */ 362 static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id, 363 int errno) 364 { 365 struct rtrs_sess *s = con->c.sess; 366 struct rtrs_srv_sess *sess = to_srv_sess(s); 367 struct ib_send_wr inv_wr, *wr = NULL; 368 struct ib_rdma_wr imm_wr; 369 struct ib_reg_wr rwr; 370 struct rtrs_srv *srv = sess->srv; 371 struct rtrs_srv_mr *srv_mr; 372 bool need_inval = false; 373 enum ib_send_flags flags; 374 u32 imm; 375 int err; 376 377 if (id->dir == READ) { 378 struct rtrs_msg_rdma_read *rd_msg = id->rd_msg; 379 size_t sg_cnt; 380 381 need_inval = le16_to_cpu(rd_msg->flags) & 382 RTRS_MSG_NEED_INVAL_F; 383 sg_cnt = le16_to_cpu(rd_msg->sg_cnt); 384 385 if (need_inval) { 386 if (likely(sg_cnt)) { 387 inv_wr.wr_cqe = &io_comp_cqe; 388 inv_wr.sg_list = NULL; 389 inv_wr.num_sge = 0; 390 inv_wr.opcode = IB_WR_SEND_WITH_INV; 391 inv_wr.send_flags = 0; 392 /* Only one key is actually used */ 393 inv_wr.ex.invalidate_rkey = 394 le32_to_cpu(rd_msg->desc[0].key); 395 } else { 396 WARN_ON_ONCE(1); 397 need_inval = false; 398 } 399 } 400 } 401 402 if (need_inval && always_invalidate) { 403 wr = &inv_wr; 404 inv_wr.next = &rwr.wr; 405 rwr.wr.next = &imm_wr.wr; 406 } else if (always_invalidate) { 407 wr = &rwr.wr; 408 rwr.wr.next = &imm_wr.wr; 409 } else if (need_inval) { 410 wr = &inv_wr; 411 inv_wr.next = &imm_wr.wr; 412 } else { 413 wr = &imm_wr.wr; 414 } 415 /* 416 * From time to time we have to post signalled sends, 417 * or send queue will fill up and only QP reset can help. 418 */ 419 flags = (atomic_inc_return(&con->wr_cnt) % srv->queue_depth) ? 420 0 : IB_SEND_SIGNALED; 421 imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval); 422 imm_wr.wr.next = NULL; 423 if (always_invalidate) { 424 struct ib_sge list; 425 struct rtrs_msg_rkey_rsp *msg; 426 427 srv_mr = &sess->mrs[id->msg_id]; 428 rwr.wr.next = &imm_wr.wr; 429 rwr.wr.opcode = IB_WR_REG_MR; 430 rwr.wr.wr_cqe = &local_reg_cqe; 431 rwr.wr.num_sge = 0; 432 rwr.wr.send_flags = 0; 433 rwr.mr = srv_mr->mr; 434 rwr.key = srv_mr->mr->rkey; 435 rwr.access = (IB_ACCESS_LOCAL_WRITE | 436 IB_ACCESS_REMOTE_WRITE); 437 msg = srv_mr->iu->buf; 438 msg->buf_id = cpu_to_le16(id->msg_id); 439 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP); 440 msg->rkey = cpu_to_le32(srv_mr->mr->rkey); 441 442 list.addr = srv_mr->iu->dma_addr; 443 list.length = sizeof(*msg); 444 list.lkey = sess->s.dev->ib_pd->local_dma_lkey; 445 imm_wr.wr.sg_list = &list; 446 imm_wr.wr.num_sge = 1; 447 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM; 448 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, 449 srv_mr->iu->dma_addr, 450 srv_mr->iu->size, DMA_TO_DEVICE); 451 } else { 452 imm_wr.wr.sg_list = NULL; 453 imm_wr.wr.num_sge = 0; 454 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM; 455 } 456 imm_wr.wr.send_flags = flags; 457 imm_wr.wr.wr_cqe = &io_comp_cqe; 458 459 imm_wr.wr.ex.imm_data = cpu_to_be32(imm); 460 461 err = ib_post_send(id->con->c.qp, wr, NULL); 462 if (unlikely(err)) 463 rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %d\n", 464 err); 465 466 return err; 467 } 468 469 void close_sess(struct rtrs_srv_sess *sess) 470 { 471 if (rtrs_srv_change_state(sess, RTRS_SRV_CLOSING)) 472 queue_work(rtrs_wq, &sess->close_work); 473 WARN_ON(sess->state != RTRS_SRV_CLOSING); 474 } 475 476 static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state) 477 { 478 switch (state) { 479 case RTRS_SRV_CONNECTING: 480 return "RTRS_SRV_CONNECTING"; 481 case RTRS_SRV_CONNECTED: 482 return "RTRS_SRV_CONNECTED"; 483 case RTRS_SRV_CLOSING: 484 return "RTRS_SRV_CLOSING"; 485 case 
RTRS_SRV_CLOSED:
		return "RTRS_SRV_CLOSED";
	default:
		return "UNKNOWN";
	}
}

/**
 * rtrs_srv_resp_rdma() - Finish an RDMA request
 *
 * @id:		Internal RTRS operation identifier
 * @status:	Response Code sent to the other side for this operation.
 *		0 = success, <0 error
 * Context: any
 *
 * Finish an RDMA operation. A message is sent to the client and the
 * corresponding memory areas will be released.
 */
bool rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status)
{
	struct rtrs_srv_sess *sess;
	struct rtrs_srv_con *con;
	struct rtrs_sess *s;
	int err;

	if (WARN_ON(!id))
		return true;

	con = id->con;
	s = con->c.sess;
	sess = to_srv_sess(s);

	id->status = status;

	if (unlikely(sess->state != RTRS_SRV_CONNECTED)) {
		rtrs_err_rl(s,
			    "Sending I/O response failed, session %s is disconnected, sess state %s\n",
			    kobject_name(&sess->kobj),
			    rtrs_srv_state_str(sess->state));
		goto out;
	}
	if (always_invalidate) {
		struct rtrs_srv_mr *mr = &sess->mrs[id->msg_id];

		ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey));
	}
	if (unlikely(atomic_sub_return(1,
				       &con->sq_wr_avail) < 0)) {
		rtrs_err(s, "IB send queue full: sess=%s cid=%d\n",
			 kobject_name(&sess->kobj),
			 con->c.cid);
		atomic_add(1, &con->sq_wr_avail);
		spin_lock(&con->rsp_wr_wait_lock);
		list_add_tail(&id->wait_list, &con->rsp_wr_wait_list);
		spin_unlock(&con->rsp_wr_wait_lock);
		return false;
	}

	if (status || id->dir == WRITE || !id->rd_msg->sg_cnt)
		err = send_io_resp_imm(con, id, status);
	else
		err = rdma_write_sg(id);

	if (unlikely(err)) {
		rtrs_err_rl(s, "IO response failed: %d: sess=%s\n", err,
			    kobject_name(&sess->kobj));
		close_sess(sess);
	}
out:
	rtrs_srv_put_ops_ids(sess);
	return true;
}
EXPORT_SYMBOL(rtrs_srv_resp_rdma);

/**
 * rtrs_srv_set_sess_priv() - Set private pointer in rtrs_srv.
 * @srv:	Session pointer
 * @priv:	The private pointer that is associated with the session.
 */
void rtrs_srv_set_sess_priv(struct rtrs_srv *srv, void *priv)
{
	srv->priv = priv;
}
EXPORT_SYMBOL(rtrs_srv_set_sess_priv);

static void unmap_cont_bufs(struct rtrs_srv_sess *sess)
{
	int i;

	for (i = 0; i < sess->mrs_num; i++) {
		struct rtrs_srv_mr *srv_mr;

		srv_mr = &sess->mrs[i];
		rtrs_iu_free(srv_mr->iu, sess->s.dev->ib_dev, 1);
		ib_dereg_mr(srv_mr->mr);
		ib_dma_unmap_sg(sess->s.dev->ib_dev, srv_mr->sgt.sgl,
				srv_mr->sgt.nents, DMA_BIDIRECTIONAL);
		sg_free_table(&srv_mr->sgt);
	}
	kfree(sess->mrs);
}

static int map_cont_bufs(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_sess *ss = &sess->s;
	int i, mri, err, mrs_num;
	unsigned int chunk_bits;
	int chunks_per_mr = 1;

	/*
	 * Here we map queue_depth chunks to MRs. First we have to figure
	 * out how many chunks we can map per MR.
	 */
	if (always_invalidate) {
		/*
		 * In order to invalidate each chunk of memory separately,
		 * we need one memory region per chunk.
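		 * (Without always_invalidate several chunks share one MR; as an
		 * illustration with made-up but plausible numbers, queue_depth == 512
		 * and max_fast_reg_page_list_len == 256 would give
		 * mrs_num = DIV_ROUND_UP(512, 256) = 2 and
		 * chunks_per_mr = DIV_ROUND_UP(512, 2) = 256.)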
603 */ 604 mrs_num = srv->queue_depth; 605 } else { 606 chunks_per_mr = 607 sess->s.dev->ib_dev->attrs.max_fast_reg_page_list_len; 608 mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr); 609 chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num); 610 } 611 612 sess->mrs = kcalloc(mrs_num, sizeof(*sess->mrs), GFP_KERNEL); 613 if (!sess->mrs) 614 return -ENOMEM; 615 616 sess->mrs_num = mrs_num; 617 618 for (mri = 0; mri < mrs_num; mri++) { 619 struct rtrs_srv_mr *srv_mr = &sess->mrs[mri]; 620 struct sg_table *sgt = &srv_mr->sgt; 621 struct scatterlist *s; 622 struct ib_mr *mr; 623 int nr, chunks; 624 625 chunks = chunks_per_mr * mri; 626 if (!always_invalidate) 627 chunks_per_mr = min_t(int, chunks_per_mr, 628 srv->queue_depth - chunks); 629 630 err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL); 631 if (err) 632 goto err; 633 634 for_each_sg(sgt->sgl, s, chunks_per_mr, i) 635 sg_set_page(s, srv->chunks[chunks + i], 636 max_chunk_size, 0); 637 638 nr = ib_dma_map_sg(sess->s.dev->ib_dev, sgt->sgl, 639 sgt->nents, DMA_BIDIRECTIONAL); 640 if (nr < sgt->nents) { 641 err = nr < 0 ? nr : -EINVAL; 642 goto free_sg; 643 } 644 mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG, 645 sgt->nents); 646 if (IS_ERR(mr)) { 647 err = PTR_ERR(mr); 648 goto unmap_sg; 649 } 650 nr = ib_map_mr_sg(mr, sgt->sgl, sgt->nents, 651 NULL, max_chunk_size); 652 if (nr < 0 || nr < sgt->nents) { 653 err = nr < 0 ? nr : -EINVAL; 654 goto dereg_mr; 655 } 656 657 if (always_invalidate) { 658 srv_mr->iu = rtrs_iu_alloc(1, 659 sizeof(struct rtrs_msg_rkey_rsp), 660 GFP_KERNEL, sess->s.dev->ib_dev, 661 DMA_TO_DEVICE, rtrs_srv_rdma_done); 662 if (!srv_mr->iu) { 663 err = -ENOMEM; 664 rtrs_err(ss, "rtrs_iu_alloc(), err: %d\n", err); 665 goto dereg_mr; 666 } 667 } 668 /* Eventually dma addr for each chunk can be cached */ 669 for_each_sg(sgt->sgl, s, sgt->orig_nents, i) 670 sess->dma_addr[chunks + i] = sg_dma_address(s); 671 672 ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey)); 673 srv_mr->mr = mr; 674 675 continue; 676 err: 677 while (mri--) { 678 srv_mr = &sess->mrs[mri]; 679 sgt = &srv_mr->sgt; 680 mr = srv_mr->mr; 681 rtrs_iu_free(srv_mr->iu, sess->s.dev->ib_dev, 1); 682 dereg_mr: 683 ib_dereg_mr(mr); 684 unmap_sg: 685 ib_dma_unmap_sg(sess->s.dev->ib_dev, sgt->sgl, 686 sgt->nents, DMA_BIDIRECTIONAL); 687 free_sg: 688 sg_free_table(sgt); 689 } 690 kfree(sess->mrs); 691 692 return err; 693 } 694 695 chunk_bits = ilog2(srv->queue_depth - 1) + 1; 696 sess->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits); 697 698 return 0; 699 } 700 701 static void rtrs_srv_hb_err_handler(struct rtrs_con *c) 702 { 703 close_sess(to_srv_sess(c->sess)); 704 } 705 706 static void rtrs_srv_init_hb(struct rtrs_srv_sess *sess) 707 { 708 rtrs_init_hb(&sess->s, &io_comp_cqe, 709 RTRS_HB_INTERVAL_MS, 710 RTRS_HB_MISSED_MAX, 711 rtrs_srv_hb_err_handler, 712 rtrs_wq); 713 } 714 715 static void rtrs_srv_start_hb(struct rtrs_srv_sess *sess) 716 { 717 rtrs_start_hb(&sess->s); 718 } 719 720 static void rtrs_srv_stop_hb(struct rtrs_srv_sess *sess) 721 { 722 rtrs_stop_hb(&sess->s); 723 } 724 725 static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc) 726 { 727 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 728 struct rtrs_sess *s = con->c.sess; 729 struct rtrs_srv_sess *sess = to_srv_sess(s); 730 struct rtrs_iu *iu; 731 732 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 733 rtrs_iu_free(iu, sess->s.dev->ib_dev, 1); 734 735 if (unlikely(wc->status != IB_WC_SUCCESS)) { 736 rtrs_err(s, "Sess info response send failed: %s\n", 737 
ib_wc_status_msg(wc->status)); 738 close_sess(sess); 739 return; 740 } 741 WARN_ON(wc->opcode != IB_WC_SEND); 742 } 743 744 static void rtrs_srv_sess_up(struct rtrs_srv_sess *sess) 745 { 746 struct rtrs_srv *srv = sess->srv; 747 struct rtrs_srv_ctx *ctx = srv->ctx; 748 int up; 749 750 mutex_lock(&srv->paths_ev_mutex); 751 up = ++srv->paths_up; 752 if (up == 1) 753 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL); 754 mutex_unlock(&srv->paths_ev_mutex); 755 756 /* Mark session as established */ 757 sess->established = true; 758 } 759 760 static void rtrs_srv_sess_down(struct rtrs_srv_sess *sess) 761 { 762 struct rtrs_srv *srv = sess->srv; 763 struct rtrs_srv_ctx *ctx = srv->ctx; 764 765 if (!sess->established) 766 return; 767 768 sess->established = false; 769 mutex_lock(&srv->paths_ev_mutex); 770 WARN_ON(!srv->paths_up); 771 if (--srv->paths_up == 0) 772 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv); 773 mutex_unlock(&srv->paths_ev_mutex); 774 } 775 776 static int post_recv_sess(struct rtrs_srv_sess *sess); 777 778 static int process_info_req(struct rtrs_srv_con *con, 779 struct rtrs_msg_info_req *msg) 780 { 781 struct rtrs_sess *s = con->c.sess; 782 struct rtrs_srv_sess *sess = to_srv_sess(s); 783 struct ib_send_wr *reg_wr = NULL; 784 struct rtrs_msg_info_rsp *rsp; 785 struct rtrs_iu *tx_iu; 786 struct ib_reg_wr *rwr; 787 int mri, err; 788 size_t tx_sz; 789 790 err = post_recv_sess(sess); 791 if (unlikely(err)) { 792 rtrs_err(s, "post_recv_sess(), err: %d\n", err); 793 return err; 794 } 795 rwr = kcalloc(sess->mrs_num, sizeof(*rwr), GFP_KERNEL); 796 if (unlikely(!rwr)) 797 return -ENOMEM; 798 strlcpy(sess->s.sessname, msg->sessname, sizeof(sess->s.sessname)); 799 800 tx_sz = sizeof(*rsp); 801 tx_sz += sizeof(rsp->desc[0]) * sess->mrs_num; 802 tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, sess->s.dev->ib_dev, 803 DMA_TO_DEVICE, rtrs_srv_info_rsp_done); 804 if (unlikely(!tx_iu)) { 805 err = -ENOMEM; 806 goto rwr_free; 807 } 808 809 rsp = tx_iu->buf; 810 rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP); 811 rsp->sg_cnt = cpu_to_le16(sess->mrs_num); 812 813 for (mri = 0; mri < sess->mrs_num; mri++) { 814 struct ib_mr *mr = sess->mrs[mri].mr; 815 816 rsp->desc[mri].addr = cpu_to_le64(mr->iova); 817 rsp->desc[mri].key = cpu_to_le32(mr->rkey); 818 rsp->desc[mri].len = cpu_to_le32(mr->length); 819 820 /* 821 * Fill in reg MR request and chain them *backwards* 822 */ 823 rwr[mri].wr.next = mri ? &rwr[mri - 1].wr : NULL; 824 rwr[mri].wr.opcode = IB_WR_REG_MR; 825 rwr[mri].wr.wr_cqe = &local_reg_cqe; 826 rwr[mri].wr.num_sge = 0; 827 rwr[mri].wr.send_flags = 0; 828 rwr[mri].mr = mr; 829 rwr[mri].key = mr->rkey; 830 rwr[mri].access = (IB_ACCESS_LOCAL_WRITE | 831 IB_ACCESS_REMOTE_WRITE); 832 reg_wr = &rwr[mri].wr; 833 } 834 835 err = rtrs_srv_create_sess_files(sess); 836 if (unlikely(err)) 837 goto iu_free; 838 kobject_get(&sess->kobj); 839 get_device(&sess->srv->dev); 840 rtrs_srv_change_state(sess, RTRS_SRV_CONNECTED); 841 rtrs_srv_start_hb(sess); 842 843 /* 844 * We do not account number of established connections at the current 845 * moment, we rely on the client, which should send info request when 846 * all connections are successfully established. Thus, simply notify 847 * listener with a proper event if we are the first path. 
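	 * rtrs_srv_sess_up() below increments srv->paths_up under
	 * paths_ev_mutex and emits RTRS_SRV_LINK_EV_CONNECTED only when that
	 * counter goes from 0 to 1.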
848 */ 849 rtrs_srv_sess_up(sess); 850 851 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr, 852 tx_iu->size, DMA_TO_DEVICE); 853 854 /* Send info response */ 855 err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr); 856 if (unlikely(err)) { 857 rtrs_err(s, "rtrs_iu_post_send(), err: %d\n", err); 858 iu_free: 859 rtrs_iu_free(tx_iu, sess->s.dev->ib_dev, 1); 860 } 861 rwr_free: 862 kfree(rwr); 863 864 return err; 865 } 866 867 static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc) 868 { 869 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 870 struct rtrs_sess *s = con->c.sess; 871 struct rtrs_srv_sess *sess = to_srv_sess(s); 872 struct rtrs_msg_info_req *msg; 873 struct rtrs_iu *iu; 874 int err; 875 876 WARN_ON(con->c.cid); 877 878 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 879 if (unlikely(wc->status != IB_WC_SUCCESS)) { 880 rtrs_err(s, "Sess info request receive failed: %s\n", 881 ib_wc_status_msg(wc->status)); 882 goto close; 883 } 884 WARN_ON(wc->opcode != IB_WC_RECV); 885 886 if (unlikely(wc->byte_len < sizeof(*msg))) { 887 rtrs_err(s, "Sess info request is malformed: size %d\n", 888 wc->byte_len); 889 goto close; 890 } 891 ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr, 892 iu->size, DMA_FROM_DEVICE); 893 msg = iu->buf; 894 if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ)) { 895 rtrs_err(s, "Sess info request is malformed: type %d\n", 896 le16_to_cpu(msg->type)); 897 goto close; 898 } 899 err = process_info_req(con, msg); 900 if (unlikely(err)) 901 goto close; 902 903 out: 904 rtrs_iu_free(iu, sess->s.dev->ib_dev, 1); 905 return; 906 close: 907 close_sess(sess); 908 goto out; 909 } 910 911 static int post_recv_info_req(struct rtrs_srv_con *con) 912 { 913 struct rtrs_sess *s = con->c.sess; 914 struct rtrs_srv_sess *sess = to_srv_sess(s); 915 struct rtrs_iu *rx_iu; 916 int err; 917 918 rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), 919 GFP_KERNEL, sess->s.dev->ib_dev, 920 DMA_FROM_DEVICE, rtrs_srv_info_req_done); 921 if (unlikely(!rx_iu)) 922 return -ENOMEM; 923 /* Prepare for getting info response */ 924 err = rtrs_iu_post_recv(&con->c, rx_iu); 925 if (unlikely(err)) { 926 rtrs_err(s, "rtrs_iu_post_recv(), err: %d\n", err); 927 rtrs_iu_free(rx_iu, sess->s.dev->ib_dev, 1); 928 return err; 929 } 930 931 return 0; 932 } 933 934 static int post_recv_io(struct rtrs_srv_con *con, size_t q_size) 935 { 936 int i, err; 937 938 for (i = 0; i < q_size; i++) { 939 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 940 if (unlikely(err)) 941 return err; 942 } 943 944 return 0; 945 } 946 947 static int post_recv_sess(struct rtrs_srv_sess *sess) 948 { 949 struct rtrs_srv *srv = sess->srv; 950 struct rtrs_sess *s = &sess->s; 951 size_t q_size; 952 int err, cid; 953 954 for (cid = 0; cid < sess->s.con_num; cid++) { 955 if (cid == 0) 956 q_size = SERVICE_CON_QUEUE_DEPTH; 957 else 958 q_size = srv->queue_depth; 959 960 err = post_recv_io(to_srv_con(sess->s.con[cid]), q_size); 961 if (unlikely(err)) { 962 rtrs_err(s, "post_recv_io(), err: %d\n", err); 963 return err; 964 } 965 } 966 967 return 0; 968 } 969 970 static void process_read(struct rtrs_srv_con *con, 971 struct rtrs_msg_rdma_read *msg, 972 u32 buf_id, u32 off) 973 { 974 struct rtrs_sess *s = con->c.sess; 975 struct rtrs_srv_sess *sess = to_srv_sess(s); 976 struct rtrs_srv *srv = sess->srv; 977 struct rtrs_srv_ctx *ctx = srv->ctx; 978 struct rtrs_srv_op *id; 979 980 size_t usr_len, data_len; 981 void *data; 982 int ret; 983 984 if (unlikely(sess->state != 
RTRS_SRV_CONNECTED)) { 985 rtrs_err_rl(s, 986 "Processing read request failed, session is disconnected, sess state %s\n", 987 rtrs_srv_state_str(sess->state)); 988 return; 989 } 990 if (unlikely(msg->sg_cnt != 1 && msg->sg_cnt != 0)) { 991 rtrs_err_rl(s, 992 "Processing read request failed, invalid message\n"); 993 return; 994 } 995 rtrs_srv_get_ops_ids(sess); 996 rtrs_srv_update_rdma_stats(sess->stats, off, READ); 997 id = sess->ops_ids[buf_id]; 998 id->con = con; 999 id->dir = READ; 1000 id->msg_id = buf_id; 1001 id->rd_msg = msg; 1002 usr_len = le16_to_cpu(msg->usr_len); 1003 data_len = off - usr_len; 1004 data = page_address(srv->chunks[buf_id]); 1005 ret = ctx->ops.rdma_ev(srv->priv, id, READ, data, data_len, 1006 data + data_len, usr_len); 1007 1008 if (unlikely(ret)) { 1009 rtrs_err_rl(s, 1010 "Processing read request failed, user module cb reported for msg_id %d, err: %d\n", 1011 buf_id, ret); 1012 goto send_err_msg; 1013 } 1014 1015 return; 1016 1017 send_err_msg: 1018 ret = send_io_resp_imm(con, id, ret); 1019 if (ret < 0) { 1020 rtrs_err_rl(s, 1021 "Sending err msg for failed RDMA-Write-Req failed, msg_id %d, err: %d\n", 1022 buf_id, ret); 1023 close_sess(sess); 1024 } 1025 rtrs_srv_put_ops_ids(sess); 1026 } 1027 1028 static void process_write(struct rtrs_srv_con *con, 1029 struct rtrs_msg_rdma_write *req, 1030 u32 buf_id, u32 off) 1031 { 1032 struct rtrs_sess *s = con->c.sess; 1033 struct rtrs_srv_sess *sess = to_srv_sess(s); 1034 struct rtrs_srv *srv = sess->srv; 1035 struct rtrs_srv_ctx *ctx = srv->ctx; 1036 struct rtrs_srv_op *id; 1037 1038 size_t data_len, usr_len; 1039 void *data; 1040 int ret; 1041 1042 if (unlikely(sess->state != RTRS_SRV_CONNECTED)) { 1043 rtrs_err_rl(s, 1044 "Processing write request failed, session is disconnected, sess state %s\n", 1045 rtrs_srv_state_str(sess->state)); 1046 return; 1047 } 1048 rtrs_srv_get_ops_ids(sess); 1049 rtrs_srv_update_rdma_stats(sess->stats, off, WRITE); 1050 id = sess->ops_ids[buf_id]; 1051 id->con = con; 1052 id->dir = WRITE; 1053 id->msg_id = buf_id; 1054 1055 usr_len = le16_to_cpu(req->usr_len); 1056 data_len = off - usr_len; 1057 data = page_address(srv->chunks[buf_id]); 1058 ret = ctx->ops.rdma_ev(srv->priv, id, WRITE, data, data_len, 1059 data + data_len, usr_len); 1060 if (unlikely(ret)) { 1061 rtrs_err_rl(s, 1062 "Processing write request failed, user module callback reports err: %d\n", 1063 ret); 1064 goto send_err_msg; 1065 } 1066 1067 return; 1068 1069 send_err_msg: 1070 ret = send_io_resp_imm(con, id, ret); 1071 if (ret < 0) { 1072 rtrs_err_rl(s, 1073 "Processing write request failed, sending I/O response failed, msg_id %d, err: %d\n", 1074 buf_id, ret); 1075 close_sess(sess); 1076 } 1077 rtrs_srv_put_ops_ids(sess); 1078 } 1079 1080 static void process_io_req(struct rtrs_srv_con *con, void *msg, 1081 u32 id, u32 off) 1082 { 1083 struct rtrs_sess *s = con->c.sess; 1084 struct rtrs_srv_sess *sess = to_srv_sess(s); 1085 struct rtrs_msg_rdma_hdr *hdr; 1086 unsigned int type; 1087 1088 ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, sess->dma_addr[id], 1089 max_chunk_size, DMA_BIDIRECTIONAL); 1090 hdr = msg; 1091 type = le16_to_cpu(hdr->type); 1092 1093 switch (type) { 1094 case RTRS_MSG_WRITE: 1095 process_write(con, msg, id, off); 1096 break; 1097 case RTRS_MSG_READ: 1098 process_read(con, msg, id, off); 1099 break; 1100 default: 1101 rtrs_err(s, 1102 "Processing I/O request failed, unknown message type received: 0x%02x\n", 1103 type); 1104 goto err; 1105 } 1106 1107 return; 1108 1109 err: 1110 close_sess(sess); 
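	/*
	 * close_sess() only moves the session to RTRS_SRV_CLOSING and queues
	 * close_work; the actual teardown happens in rtrs_srv_close_work().
	 */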
1111 } 1112 1113 static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc) 1114 { 1115 struct rtrs_srv_mr *mr = 1116 container_of(wc->wr_cqe, typeof(*mr), inv_cqe); 1117 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 1118 struct rtrs_sess *s = con->c.sess; 1119 struct rtrs_srv_sess *sess = to_srv_sess(s); 1120 struct rtrs_srv *srv = sess->srv; 1121 u32 msg_id, off; 1122 void *data; 1123 1124 if (unlikely(wc->status != IB_WC_SUCCESS)) { 1125 rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n", 1126 ib_wc_status_msg(wc->status)); 1127 close_sess(sess); 1128 } 1129 msg_id = mr->msg_id; 1130 off = mr->msg_off; 1131 data = page_address(srv->chunks[msg_id]) + off; 1132 process_io_req(con, data, msg_id, off); 1133 } 1134 1135 static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con, 1136 struct rtrs_srv_mr *mr) 1137 { 1138 struct ib_send_wr wr = { 1139 .opcode = IB_WR_LOCAL_INV, 1140 .wr_cqe = &mr->inv_cqe, 1141 .send_flags = IB_SEND_SIGNALED, 1142 .ex.invalidate_rkey = mr->mr->rkey, 1143 }; 1144 mr->inv_cqe.done = rtrs_srv_inv_rkey_done; 1145 1146 return ib_post_send(con->c.qp, &wr, NULL); 1147 } 1148 1149 static void rtrs_rdma_process_wr_wait_list(struct rtrs_srv_con *con) 1150 { 1151 spin_lock(&con->rsp_wr_wait_lock); 1152 while (!list_empty(&con->rsp_wr_wait_list)) { 1153 struct rtrs_srv_op *id; 1154 int ret; 1155 1156 id = list_entry(con->rsp_wr_wait_list.next, 1157 struct rtrs_srv_op, wait_list); 1158 list_del(&id->wait_list); 1159 1160 spin_unlock(&con->rsp_wr_wait_lock); 1161 ret = rtrs_srv_resp_rdma(id, id->status); 1162 spin_lock(&con->rsp_wr_wait_lock); 1163 1164 if (!ret) { 1165 list_add(&id->wait_list, &con->rsp_wr_wait_list); 1166 break; 1167 } 1168 } 1169 spin_unlock(&con->rsp_wr_wait_lock); 1170 } 1171 1172 static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc) 1173 { 1174 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 1175 struct rtrs_sess *s = con->c.sess; 1176 struct rtrs_srv_sess *sess = to_srv_sess(s); 1177 struct rtrs_srv *srv = sess->srv; 1178 u32 imm_type, imm_payload; 1179 int err; 1180 1181 if (unlikely(wc->status != IB_WC_SUCCESS)) { 1182 if (wc->status != IB_WC_WR_FLUSH_ERR) { 1183 rtrs_err(s, 1184 "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n", 1185 ib_wc_status_msg(wc->status), wc->wr_cqe, 1186 wc->opcode, wc->vendor_err, wc->byte_len); 1187 close_sess(sess); 1188 } 1189 return; 1190 } 1191 1192 switch (wc->opcode) { 1193 case IB_WC_RECV_RDMA_WITH_IMM: 1194 /* 1195 * post_recv() RDMA write completions of IO reqs (read/write) 1196 * and hb 1197 */ 1198 if (WARN_ON(wc->wr_cqe != &io_comp_cqe)) 1199 return; 1200 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 1201 if (unlikely(err)) { 1202 rtrs_err(s, "rtrs_post_recv(), err: %d\n", err); 1203 close_sess(sess); 1204 break; 1205 } 1206 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), 1207 &imm_type, &imm_payload); 1208 if (likely(imm_type == RTRS_IO_REQ_IMM)) { 1209 u32 msg_id, off; 1210 void *data; 1211 1212 msg_id = imm_payload >> sess->mem_bits; 1213 off = imm_payload & ((1 << sess->mem_bits) - 1); 1214 if (unlikely(msg_id >= srv->queue_depth || 1215 off >= max_chunk_size)) { 1216 rtrs_err(s, "Wrong msg_id %u, off %u\n", 1217 msg_id, off); 1218 close_sess(sess); 1219 return; 1220 } 1221 if (always_invalidate) { 1222 struct rtrs_srv_mr *mr = &sess->mrs[msg_id]; 1223 1224 mr->msg_off = off; 1225 mr->msg_id = msg_id; 1226 err = rtrs_srv_inv_rkey(con, mr); 1227 if (unlikely(err)) { 1228 rtrs_err(s, "rtrs_post_recv(), err: %d\n", 1229 err); 1230 close_sess(sess); 1231 break; 1232 
				}
			} else {
				data = page_address(srv->chunks[msg_id]) + off;
				process_io_req(con, data, msg_id, off);
			}
		} else if (imm_type == RTRS_HB_MSG_IMM) {
			WARN_ON(con->c.cid);
			rtrs_send_hb_ack(&sess->s);
		} else if (imm_type == RTRS_HB_ACK_IMM) {
			WARN_ON(con->c.cid);
			sess->s.hb_missed_cnt = 0;
		} else {
			rtrs_wrn(s, "Unknown IMM type %u\n", imm_type);
		}
		break;
	case IB_WC_RDMA_WRITE:
	case IB_WC_SEND:
		/*
		 * post_send() RDMA write completions of IO reqs (read/write)
		 */
		atomic_add(srv->queue_depth, &con->sq_wr_avail);

		if (unlikely(!list_empty_careful(&con->rsp_wr_wait_list)))
			rtrs_rdma_process_wr_wait_list(con);

		break;
	default:
		rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode);
		return;
	}
}

/**
 * rtrs_srv_get_sess_name() - Get rtrs_srv peer hostname.
 * @srv:	Session
 * @sessname:	Sessname buffer
 * @len:	Length of sessname buffer
 */
int rtrs_srv_get_sess_name(struct rtrs_srv *srv, char *sessname, size_t len)
{
	struct rtrs_srv_sess *sess;
	int err = -ENOTCONN;

	mutex_lock(&srv->paths_mutex);
	list_for_each_entry(sess, &srv->paths_list, s.entry) {
		if (sess->state != RTRS_SRV_CONNECTED)
			continue;
		strlcpy(sessname, sess->s.sessname,
			min_t(size_t, sizeof(sess->s.sessname), len));
		err = 0;
		break;
	}
	mutex_unlock(&srv->paths_mutex);

	return err;
}
EXPORT_SYMBOL(rtrs_srv_get_sess_name);

/**
 * rtrs_srv_get_queue_depth() - Get rtrs_srv queue depth.
 * @srv:	Session
 */
int rtrs_srv_get_queue_depth(struct rtrs_srv *srv)
{
	return srv->queue_depth;
}
EXPORT_SYMBOL(rtrs_srv_get_queue_depth);

static int find_next_bit_ring(struct rtrs_srv_sess *sess)
{
	struct ib_device *ib_dev = sess->s.dev->ib_dev;
	int v;

	v = cpumask_next(sess->cur_cq_vector, &cq_affinity_mask);
	if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors)
		v = cpumask_first(&cq_affinity_mask);
	return v;
}

static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_sess *sess)
{
	sess->cur_cq_vector = find_next_bit_ring(sess);

	return sess->cur_cq_vector;
}

static void rtrs_srv_dev_release(struct device *dev)
{
	struct rtrs_srv *srv = container_of(dev, struct rtrs_srv, dev);

	kfree(srv);
}

static void free_srv(struct rtrs_srv *srv)
{
	int i;

	WARN_ON(refcount_read(&srv->refcount));
	for (i = 0; i < srv->queue_depth; i++)
		mempool_free(srv->chunks[i], chunk_pool);
	kfree(srv->chunks);
	mutex_destroy(&srv->paths_mutex);
	mutex_destroy(&srv->paths_ev_mutex);
	/* last put to release the srv structure */
	put_device(&srv->dev);
}

static struct rtrs_srv *get_or_create_srv(struct rtrs_srv_ctx *ctx,
					  const uuid_t *paths_uuid,
					  bool first_conn)
{
	struct rtrs_srv *srv;
	int i;

	mutex_lock(&ctx->srv_mutex);
	list_for_each_entry(srv, &ctx->srv_list, ctx_list) {
		if (uuid_equal(&srv->paths_uuid, paths_uuid) &&
		    refcount_inc_not_zero(&srv->refcount)) {
			mutex_unlock(&ctx->srv_mutex);
			return srv;
		}
	}
	mutex_unlock(&ctx->srv_mutex);
	/*
	 * If this request is not the first connection request from the
	 * client for this session then fail and return error.
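	 * (The lookup above only matches an existing srv with the same
	 * paths_uuid and a live refcount, so a non-first connection that
	 * finds nothing is answered with -ENXIO.)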
1358 */ 1359 if (!first_conn) 1360 return ERR_PTR(-ENXIO); 1361 1362 /* need to allocate a new srv */ 1363 srv = kzalloc(sizeof(*srv), GFP_KERNEL); 1364 if (!srv) 1365 return ERR_PTR(-ENOMEM); 1366 1367 INIT_LIST_HEAD(&srv->paths_list); 1368 mutex_init(&srv->paths_mutex); 1369 mutex_init(&srv->paths_ev_mutex); 1370 uuid_copy(&srv->paths_uuid, paths_uuid); 1371 srv->queue_depth = sess_queue_depth; 1372 srv->ctx = ctx; 1373 device_initialize(&srv->dev); 1374 srv->dev.release = rtrs_srv_dev_release; 1375 1376 srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks), 1377 GFP_KERNEL); 1378 if (!srv->chunks) 1379 goto err_free_srv; 1380 1381 for (i = 0; i < srv->queue_depth; i++) { 1382 srv->chunks[i] = mempool_alloc(chunk_pool, GFP_KERNEL); 1383 if (!srv->chunks[i]) 1384 goto err_free_chunks; 1385 } 1386 refcount_set(&srv->refcount, 1); 1387 mutex_lock(&ctx->srv_mutex); 1388 list_add(&srv->ctx_list, &ctx->srv_list); 1389 mutex_unlock(&ctx->srv_mutex); 1390 1391 return srv; 1392 1393 err_free_chunks: 1394 while (i--) 1395 mempool_free(srv->chunks[i], chunk_pool); 1396 kfree(srv->chunks); 1397 1398 err_free_srv: 1399 kfree(srv); 1400 return ERR_PTR(-ENOMEM); 1401 } 1402 1403 static void put_srv(struct rtrs_srv *srv) 1404 { 1405 if (refcount_dec_and_test(&srv->refcount)) { 1406 struct rtrs_srv_ctx *ctx = srv->ctx; 1407 1408 WARN_ON(srv->dev.kobj.state_in_sysfs); 1409 1410 mutex_lock(&ctx->srv_mutex); 1411 list_del(&srv->ctx_list); 1412 mutex_unlock(&ctx->srv_mutex); 1413 free_srv(srv); 1414 } 1415 } 1416 1417 static void __add_path_to_srv(struct rtrs_srv *srv, 1418 struct rtrs_srv_sess *sess) 1419 { 1420 list_add_tail(&sess->s.entry, &srv->paths_list); 1421 srv->paths_num++; 1422 WARN_ON(srv->paths_num >= MAX_PATHS_NUM); 1423 } 1424 1425 static void del_path_from_srv(struct rtrs_srv_sess *sess) 1426 { 1427 struct rtrs_srv *srv = sess->srv; 1428 1429 if (WARN_ON(!srv)) 1430 return; 1431 1432 mutex_lock(&srv->paths_mutex); 1433 list_del(&sess->s.entry); 1434 WARN_ON(!srv->paths_num); 1435 srv->paths_num--; 1436 mutex_unlock(&srv->paths_mutex); 1437 } 1438 1439 /* return true if addresses are the same, error other wise */ 1440 static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b) 1441 { 1442 switch (a->sa_family) { 1443 case AF_IB: 1444 return memcmp(&((struct sockaddr_ib *)a)->sib_addr, 1445 &((struct sockaddr_ib *)b)->sib_addr, 1446 sizeof(struct ib_addr)) && 1447 (b->sa_family == AF_IB); 1448 case AF_INET: 1449 return memcmp(&((struct sockaddr_in *)a)->sin_addr, 1450 &((struct sockaddr_in *)b)->sin_addr, 1451 sizeof(struct in_addr)) && 1452 (b->sa_family == AF_INET); 1453 case AF_INET6: 1454 return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr, 1455 &((struct sockaddr_in6 *)b)->sin6_addr, 1456 sizeof(struct in6_addr)) && 1457 (b->sa_family == AF_INET6); 1458 default: 1459 return -ENOENT; 1460 } 1461 } 1462 1463 static bool __is_path_w_addr_exists(struct rtrs_srv *srv, 1464 struct rdma_addr *addr) 1465 { 1466 struct rtrs_srv_sess *sess; 1467 1468 list_for_each_entry(sess, &srv->paths_list, s.entry) 1469 if (!sockaddr_cmp((struct sockaddr *)&sess->s.dst_addr, 1470 (struct sockaddr *)&addr->dst_addr) && 1471 !sockaddr_cmp((struct sockaddr *)&sess->s.src_addr, 1472 (struct sockaddr *)&addr->src_addr)) 1473 return true; 1474 1475 return false; 1476 } 1477 1478 static void free_sess(struct rtrs_srv_sess *sess) 1479 { 1480 if (sess->kobj.state_in_sysfs) { 1481 kobject_del(&sess->kobj); 1482 kobject_put(&sess->kobj); 1483 } else { 1484 kfree(sess); 1485 } 1486 } 1487 1488 static 
void rtrs_srv_close_work(struct work_struct *work) 1489 { 1490 struct rtrs_srv_sess *sess; 1491 struct rtrs_srv_con *con; 1492 int i; 1493 1494 sess = container_of(work, typeof(*sess), close_work); 1495 1496 rtrs_srv_destroy_sess_files(sess); 1497 rtrs_srv_stop_hb(sess); 1498 1499 for (i = 0; i < sess->s.con_num; i++) { 1500 if (!sess->s.con[i]) 1501 continue; 1502 con = to_srv_con(sess->s.con[i]); 1503 rdma_disconnect(con->c.cm_id); 1504 ib_drain_qp(con->c.qp); 1505 } 1506 /* Wait for all inflights */ 1507 rtrs_srv_wait_ops_ids(sess); 1508 1509 /* Notify upper layer if we are the last path */ 1510 rtrs_srv_sess_down(sess); 1511 1512 unmap_cont_bufs(sess); 1513 rtrs_srv_free_ops_ids(sess); 1514 1515 for (i = 0; i < sess->s.con_num; i++) { 1516 if (!sess->s.con[i]) 1517 continue; 1518 con = to_srv_con(sess->s.con[i]); 1519 rtrs_cq_qp_destroy(&con->c); 1520 rdma_destroy_id(con->c.cm_id); 1521 kfree(con); 1522 } 1523 rtrs_ib_dev_put(sess->s.dev); 1524 1525 del_path_from_srv(sess); 1526 put_srv(sess->srv); 1527 sess->srv = NULL; 1528 rtrs_srv_change_state(sess, RTRS_SRV_CLOSED); 1529 1530 kfree(sess->dma_addr); 1531 kfree(sess->s.con); 1532 free_sess(sess); 1533 } 1534 1535 static int rtrs_rdma_do_accept(struct rtrs_srv_sess *sess, 1536 struct rdma_cm_id *cm_id) 1537 { 1538 struct rtrs_srv *srv = sess->srv; 1539 struct rtrs_msg_conn_rsp msg; 1540 struct rdma_conn_param param; 1541 int err; 1542 1543 param = (struct rdma_conn_param) { 1544 .rnr_retry_count = 7, 1545 .private_data = &msg, 1546 .private_data_len = sizeof(msg), 1547 }; 1548 1549 msg = (struct rtrs_msg_conn_rsp) { 1550 .magic = cpu_to_le16(RTRS_MAGIC), 1551 .version = cpu_to_le16(RTRS_PROTO_VER), 1552 .queue_depth = cpu_to_le16(srv->queue_depth), 1553 .max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE), 1554 .max_hdr_size = cpu_to_le32(MAX_HDR_SIZE), 1555 }; 1556 1557 if (always_invalidate) 1558 msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F); 1559 1560 err = rdma_accept(cm_id, ¶m); 1561 if (err) 1562 pr_err("rdma_accept(), err: %d\n", err); 1563 1564 return err; 1565 } 1566 1567 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno) 1568 { 1569 struct rtrs_msg_conn_rsp msg; 1570 int err; 1571 1572 msg = (struct rtrs_msg_conn_rsp) { 1573 .magic = cpu_to_le16(RTRS_MAGIC), 1574 .version = cpu_to_le16(RTRS_PROTO_VER), 1575 .errno = cpu_to_le16(errno), 1576 }; 1577 1578 err = rdma_reject(cm_id, &msg, sizeof(msg), IB_CM_REJ_CONSUMER_DEFINED); 1579 if (err) 1580 pr_err("rdma_reject(), err: %d\n", err); 1581 1582 /* Bounce errno back */ 1583 return errno; 1584 } 1585 1586 static struct rtrs_srv_sess * 1587 __find_sess(struct rtrs_srv *srv, const uuid_t *sess_uuid) 1588 { 1589 struct rtrs_srv_sess *sess; 1590 1591 list_for_each_entry(sess, &srv->paths_list, s.entry) { 1592 if (uuid_equal(&sess->s.uuid, sess_uuid)) 1593 return sess; 1594 } 1595 1596 return NULL; 1597 } 1598 1599 static int create_con(struct rtrs_srv_sess *sess, 1600 struct rdma_cm_id *cm_id, 1601 unsigned int cid) 1602 { 1603 struct rtrs_srv *srv = sess->srv; 1604 struct rtrs_sess *s = &sess->s; 1605 struct rtrs_srv_con *con; 1606 1607 u32 cq_size, wr_queue_size; 1608 int err, cq_vector; 1609 1610 con = kzalloc(sizeof(*con), GFP_KERNEL); 1611 if (!con) { 1612 err = -ENOMEM; 1613 goto err; 1614 } 1615 1616 spin_lock_init(&con->rsp_wr_wait_lock); 1617 INIT_LIST_HEAD(&con->rsp_wr_wait_list); 1618 con->c.cm_id = cm_id; 1619 con->c.sess = &sess->s; 1620 con->c.cid = cid; 1621 atomic_set(&con->wr_cnt, 1); 1622 1623 if (con->c.cid == 0) { 1624 /* 1625 * All receive 
and all send (each requiring invalidate) 1626 * + 2 for drain and heartbeat 1627 */ 1628 wr_queue_size = SERVICE_CON_QUEUE_DEPTH * 3 + 2; 1629 cq_size = wr_queue_size; 1630 } else { 1631 /* 1632 * If we have all receive requests posted and 1633 * all write requests posted and each read request 1634 * requires an invalidate request + drain 1635 * and qp gets into error state. 1636 */ 1637 cq_size = srv->queue_depth * 3 + 1; 1638 /* 1639 * In theory we might have queue_depth * 32 1640 * outstanding requests if an unsafe global key is used 1641 * and we have queue_depth read requests each consisting 1642 * of 32 different addresses. div 3 for mlx5. 1643 */ 1644 wr_queue_size = sess->s.dev->ib_dev->attrs.max_qp_wr / 3; 1645 } 1646 atomic_set(&con->sq_wr_avail, wr_queue_size); 1647 cq_vector = rtrs_srv_get_next_cq_vector(sess); 1648 1649 /* TODO: SOFTIRQ can be faster, but be careful with softirq context */ 1650 err = rtrs_cq_qp_create(&sess->s, &con->c, 1, cq_vector, cq_size, 1651 wr_queue_size, wr_queue_size, 1652 IB_POLL_WORKQUEUE); 1653 if (err) { 1654 rtrs_err(s, "rtrs_cq_qp_create(), err: %d\n", err); 1655 goto free_con; 1656 } 1657 if (con->c.cid == 0) { 1658 err = post_recv_info_req(con); 1659 if (err) 1660 goto free_cqqp; 1661 } 1662 WARN_ON(sess->s.con[cid]); 1663 sess->s.con[cid] = &con->c; 1664 1665 /* 1666 * Change context from server to current connection. The other 1667 * way is to use cm_id->qp->qp_context, which does not work on OFED. 1668 */ 1669 cm_id->context = &con->c; 1670 1671 return 0; 1672 1673 free_cqqp: 1674 rtrs_cq_qp_destroy(&con->c); 1675 free_con: 1676 kfree(con); 1677 1678 err: 1679 return err; 1680 } 1681 1682 static struct rtrs_srv_sess *__alloc_sess(struct rtrs_srv *srv, 1683 struct rdma_cm_id *cm_id, 1684 unsigned int con_num, 1685 unsigned int recon_cnt, 1686 const uuid_t *uuid) 1687 { 1688 struct rtrs_srv_sess *sess; 1689 int err = -ENOMEM; 1690 char str[NAME_MAX]; 1691 struct rtrs_addr path; 1692 1693 if (srv->paths_num >= MAX_PATHS_NUM) { 1694 err = -ECONNRESET; 1695 goto err; 1696 } 1697 if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) { 1698 err = -EEXIST; 1699 pr_err("Path with same addr exists\n"); 1700 goto err; 1701 } 1702 sess = kzalloc(sizeof(*sess), GFP_KERNEL); 1703 if (!sess) 1704 goto err; 1705 1706 sess->stats = kzalloc(sizeof(*sess->stats), GFP_KERNEL); 1707 if (!sess->stats) 1708 goto err_free_sess; 1709 1710 sess->stats->sess = sess; 1711 1712 sess->dma_addr = kcalloc(srv->queue_depth, sizeof(*sess->dma_addr), 1713 GFP_KERNEL); 1714 if (!sess->dma_addr) 1715 goto err_free_stats; 1716 1717 sess->s.con = kcalloc(con_num, sizeof(*sess->s.con), GFP_KERNEL); 1718 if (!sess->s.con) 1719 goto err_free_dma_addr; 1720 1721 sess->state = RTRS_SRV_CONNECTING; 1722 sess->srv = srv; 1723 sess->cur_cq_vector = -1; 1724 sess->s.dst_addr = cm_id->route.addr.dst_addr; 1725 sess->s.src_addr = cm_id->route.addr.src_addr; 1726 1727 /* temporary until receiving session-name from client */ 1728 path.src = &sess->s.src_addr; 1729 path.dst = &sess->s.dst_addr; 1730 rtrs_addr_to_str(&path, str, sizeof(str)); 1731 strlcpy(sess->s.sessname, str, sizeof(sess->s.sessname)); 1732 1733 sess->s.con_num = con_num; 1734 sess->s.recon_cnt = recon_cnt; 1735 uuid_copy(&sess->s.uuid, uuid); 1736 spin_lock_init(&sess->state_lock); 1737 INIT_WORK(&sess->close_work, rtrs_srv_close_work); 1738 rtrs_srv_init_hb(sess); 1739 1740 sess->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd); 1741 if (!sess->s.dev) { 1742 err = -ENOMEM; 1743 goto err_free_con; 1744 } 1745 err = 
map_cont_bufs(sess); 1746 if (err) 1747 goto err_put_dev; 1748 1749 err = rtrs_srv_alloc_ops_ids(sess); 1750 if (err) 1751 goto err_unmap_bufs; 1752 1753 __add_path_to_srv(srv, sess); 1754 1755 return sess; 1756 1757 err_unmap_bufs: 1758 unmap_cont_bufs(sess); 1759 err_put_dev: 1760 rtrs_ib_dev_put(sess->s.dev); 1761 err_free_con: 1762 kfree(sess->s.con); 1763 err_free_dma_addr: 1764 kfree(sess->dma_addr); 1765 err_free_stats: 1766 kfree(sess->stats); 1767 err_free_sess: 1768 kfree(sess); 1769 err: 1770 return ERR_PTR(err); 1771 } 1772 1773 static int rtrs_rdma_connect(struct rdma_cm_id *cm_id, 1774 const struct rtrs_msg_conn_req *msg, 1775 size_t len) 1776 { 1777 struct rtrs_srv_ctx *ctx = cm_id->context; 1778 struct rtrs_srv_sess *sess; 1779 struct rtrs_srv *srv; 1780 1781 u16 version, con_num, cid; 1782 u16 recon_cnt; 1783 int err; 1784 1785 if (len < sizeof(*msg)) { 1786 pr_err("Invalid RTRS connection request\n"); 1787 goto reject_w_econnreset; 1788 } 1789 if (le16_to_cpu(msg->magic) != RTRS_MAGIC) { 1790 pr_err("Invalid RTRS magic\n"); 1791 goto reject_w_econnreset; 1792 } 1793 version = le16_to_cpu(msg->version); 1794 if (version >> 8 != RTRS_PROTO_VER_MAJOR) { 1795 pr_err("Unsupported major RTRS version: %d, expected %d\n", 1796 version >> 8, RTRS_PROTO_VER_MAJOR); 1797 goto reject_w_econnreset; 1798 } 1799 con_num = le16_to_cpu(msg->cid_num); 1800 if (con_num > 4096) { 1801 /* Sanity check */ 1802 pr_err("Too many connections requested: %d\n", con_num); 1803 goto reject_w_econnreset; 1804 } 1805 cid = le16_to_cpu(msg->cid); 1806 if (cid >= con_num) { 1807 /* Sanity check */ 1808 pr_err("Incorrect cid: %d >= %d\n", cid, con_num); 1809 goto reject_w_econnreset; 1810 } 1811 recon_cnt = le16_to_cpu(msg->recon_cnt); 1812 srv = get_or_create_srv(ctx, &msg->paths_uuid, msg->first_conn); 1813 if (IS_ERR(srv)) { 1814 err = PTR_ERR(srv); 1815 goto reject_w_err; 1816 } 1817 mutex_lock(&srv->paths_mutex); 1818 sess = __find_sess(srv, &msg->sess_uuid); 1819 if (sess) { 1820 struct rtrs_sess *s = &sess->s; 1821 1822 /* Session already holds a reference */ 1823 put_srv(srv); 1824 1825 if (sess->state != RTRS_SRV_CONNECTING) { 1826 rtrs_err(s, "Session in wrong state: %s\n", 1827 rtrs_srv_state_str(sess->state)); 1828 mutex_unlock(&srv->paths_mutex); 1829 goto reject_w_econnreset; 1830 } 1831 /* 1832 * Sanity checks 1833 */ 1834 if (con_num != s->con_num || cid >= s->con_num) { 1835 rtrs_err(s, "Incorrect request: %d, %d\n", 1836 cid, con_num); 1837 mutex_unlock(&srv->paths_mutex); 1838 goto reject_w_econnreset; 1839 } 1840 if (s->con[cid]) { 1841 rtrs_err(s, "Connection already exists: %d\n", 1842 cid); 1843 mutex_unlock(&srv->paths_mutex); 1844 goto reject_w_econnreset; 1845 } 1846 } else { 1847 sess = __alloc_sess(srv, cm_id, con_num, recon_cnt, 1848 &msg->sess_uuid); 1849 if (IS_ERR(sess)) { 1850 mutex_unlock(&srv->paths_mutex); 1851 put_srv(srv); 1852 err = PTR_ERR(sess); 1853 goto reject_w_err; 1854 } 1855 } 1856 err = create_con(sess, cm_id, cid); 1857 if (err) { 1858 (void)rtrs_rdma_do_reject(cm_id, err); 1859 /* 1860 * Since session has other connections we follow normal way 1861 * through workqueue, but still return an error to tell cma.c 1862 * to call rdma_destroy_id() for current connection. 
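		 * Contrast this with the rdma_accept() failure below, where the
		 * connection has already been added to the session: there we return 0
		 * and take care of rdma_destroy_id() ourselves on the close path.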
1863 */ 1864 goto close_and_return_err; 1865 } 1866 err = rtrs_rdma_do_accept(sess, cm_id); 1867 if (err) { 1868 (void)rtrs_rdma_do_reject(cm_id, err); 1869 /* 1870 * Since current connection was successfully added to the 1871 * session we follow normal way through workqueue to close the 1872 * session, thus return 0 to tell cma.c we call 1873 * rdma_destroy_id() ourselves. 1874 */ 1875 err = 0; 1876 goto close_and_return_err; 1877 } 1878 mutex_unlock(&srv->paths_mutex); 1879 1880 return 0; 1881 1882 reject_w_err: 1883 return rtrs_rdma_do_reject(cm_id, err); 1884 1885 reject_w_econnreset: 1886 return rtrs_rdma_do_reject(cm_id, -ECONNRESET); 1887 1888 close_and_return_err: 1889 mutex_unlock(&srv->paths_mutex); 1890 close_sess(sess); 1891 1892 return err; 1893 } 1894 1895 static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id, 1896 struct rdma_cm_event *ev) 1897 { 1898 struct rtrs_srv_sess *sess = NULL; 1899 struct rtrs_sess *s = NULL; 1900 1901 if (ev->event != RDMA_CM_EVENT_CONNECT_REQUEST) { 1902 struct rtrs_con *c = cm_id->context; 1903 1904 s = c->sess; 1905 sess = to_srv_sess(s); 1906 } 1907 1908 switch (ev->event) { 1909 case RDMA_CM_EVENT_CONNECT_REQUEST: 1910 /* 1911 * In case of error cma.c will destroy cm_id, 1912 * see cma_process_remove() 1913 */ 1914 return rtrs_rdma_connect(cm_id, ev->param.conn.private_data, 1915 ev->param.conn.private_data_len); 1916 case RDMA_CM_EVENT_ESTABLISHED: 1917 /* Nothing here */ 1918 break; 1919 case RDMA_CM_EVENT_REJECTED: 1920 case RDMA_CM_EVENT_CONNECT_ERROR: 1921 case RDMA_CM_EVENT_UNREACHABLE: 1922 rtrs_err(s, "CM error (CM event: %s, err: %d)\n", 1923 rdma_event_msg(ev->event), ev->status); 1924 fallthrough; 1925 case RDMA_CM_EVENT_DISCONNECTED: 1926 case RDMA_CM_EVENT_ADDR_CHANGE: 1927 case RDMA_CM_EVENT_TIMEWAIT_EXIT: 1928 case RDMA_CM_EVENT_DEVICE_REMOVAL: 1929 close_sess(sess); 1930 break; 1931 default: 1932 pr_err("Ignoring unexpected CM event %s, err %d\n", 1933 rdma_event_msg(ev->event), ev->status); 1934 break; 1935 } 1936 1937 return 0; 1938 } 1939 1940 static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx, 1941 struct sockaddr *addr, 1942 enum rdma_ucm_port_space ps) 1943 { 1944 struct rdma_cm_id *cm_id; 1945 int ret; 1946 1947 cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler, 1948 ctx, ps, IB_QPT_RC); 1949 if (IS_ERR(cm_id)) { 1950 ret = PTR_ERR(cm_id); 1951 pr_err("Creating id for RDMA connection failed, err: %d\n", 1952 ret); 1953 goto err_out; 1954 } 1955 ret = rdma_bind_addr(cm_id, addr); 1956 if (ret) { 1957 pr_err("Binding RDMA address failed, err: %d\n", ret); 1958 goto err_cm; 1959 } 1960 ret = rdma_listen(cm_id, 64); 1961 if (ret) { 1962 pr_err("Listening on RDMA connection failed, err: %d\n", 1963 ret); 1964 goto err_cm; 1965 } 1966 1967 return cm_id; 1968 1969 err_cm: 1970 rdma_destroy_id(cm_id); 1971 err_out: 1972 1973 return ERR_PTR(ret); 1974 } 1975 1976 static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, u16 port) 1977 { 1978 struct sockaddr_in6 sin = { 1979 .sin6_family = AF_INET6, 1980 .sin6_addr = IN6ADDR_ANY_INIT, 1981 .sin6_port = htons(port), 1982 }; 1983 struct sockaddr_ib sib = { 1984 .sib_family = AF_IB, 1985 .sib_sid = cpu_to_be64(RDMA_IB_IP_PS_IB | port), 1986 .sib_sid_mask = cpu_to_be64(0xffffffffffffffffULL), 1987 .sib_pkey = cpu_to_be16(0xffff), 1988 }; 1989 struct rdma_cm_id *cm_ip, *cm_ib; 1990 int ret; 1991 1992 /* 1993 * We accept both IPoIB and IB connections, so we need to keep 1994 * two cm id's, one for each socket type and port space. 
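	 * cm_id_ip binds the IPv6 any-address under RDMA_PS_TCP, while
	 * cm_id_ib binds an AF_IB address under RDMA_PS_IB (see the two
	 * rtrs_srv_cm_init() calls below).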
1995 * If the cm initialization of one of the id's fails, we abort 1996 * everything. 1997 */ 1998 cm_ip = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sin, RDMA_PS_TCP); 1999 if (IS_ERR(cm_ip)) 2000 return PTR_ERR(cm_ip); 2001 2002 cm_ib = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sib, RDMA_PS_IB); 2003 if (IS_ERR(cm_ib)) { 2004 ret = PTR_ERR(cm_ib); 2005 goto free_cm_ip; 2006 } 2007 2008 ctx->cm_id_ip = cm_ip; 2009 ctx->cm_id_ib = cm_ib; 2010 2011 return 0; 2012 2013 free_cm_ip: 2014 rdma_destroy_id(cm_ip); 2015 2016 return ret; 2017 } 2018 2019 static struct rtrs_srv_ctx *alloc_srv_ctx(struct rtrs_srv_ops *ops) 2020 { 2021 struct rtrs_srv_ctx *ctx; 2022 2023 ctx = kzalloc(sizeof(*ctx), GFP_KERNEL); 2024 if (!ctx) 2025 return NULL; 2026 2027 ctx->ops = *ops; 2028 mutex_init(&ctx->srv_mutex); 2029 INIT_LIST_HEAD(&ctx->srv_list); 2030 2031 return ctx; 2032 } 2033 2034 static void free_srv_ctx(struct rtrs_srv_ctx *ctx) 2035 { 2036 WARN_ON(!list_empty(&ctx->srv_list)); 2037 mutex_destroy(&ctx->srv_mutex); 2038 kfree(ctx); 2039 } 2040 2041 static int rtrs_srv_add_one(struct ib_device *device) 2042 { 2043 struct rtrs_srv_ctx *ctx; 2044 int ret = 0; 2045 2046 mutex_lock(&ib_ctx.ib_dev_mutex); 2047 if (ib_ctx.ib_dev_count) 2048 goto out; 2049 2050 /* 2051 * Since our CM IDs are NOT bound to any ib device we will create them 2052 * only once 2053 */ 2054 ctx = ib_ctx.srv_ctx; 2055 ret = rtrs_srv_rdma_init(ctx, ib_ctx.port); 2056 if (ret) { 2057 /* 2058 * We errored out here. 2059 * According to the ib code, if we encounter an error here then the 2060 * error code is ignored, and no more calls to our ops are made. 2061 */ 2062 pr_err("Failed to initialize RDMA connection"); 2063 goto err_out; 2064 } 2065 2066 out: 2067 /* 2068 * Keep a track on the number of ib devices added 2069 */ 2070 ib_ctx.ib_dev_count++; 2071 2072 err_out: 2073 mutex_unlock(&ib_ctx.ib_dev_mutex); 2074 return ret; 2075 } 2076 2077 static void rtrs_srv_remove_one(struct ib_device *device, void *client_data) 2078 { 2079 struct rtrs_srv_ctx *ctx; 2080 2081 mutex_lock(&ib_ctx.ib_dev_mutex); 2082 ib_ctx.ib_dev_count--; 2083 2084 if (ib_ctx.ib_dev_count) 2085 goto out; 2086 2087 /* 2088 * Since our CM IDs are NOT bound to any ib device we will remove them 2089 * only once, when the last device is removed 2090 */ 2091 ctx = ib_ctx.srv_ctx; 2092 rdma_destroy_id(ctx->cm_id_ip); 2093 rdma_destroy_id(ctx->cm_id_ib); 2094 2095 out: 2096 mutex_unlock(&ib_ctx.ib_dev_mutex); 2097 } 2098 2099 static struct ib_client rtrs_srv_client = { 2100 .name = "rtrs_server", 2101 .add = rtrs_srv_add_one, 2102 .remove = rtrs_srv_remove_one 2103 }; 2104 2105 /** 2106 * rtrs_srv_open() - open RTRS server context 2107 * @ops: callback functions 2108 * @port: port to listen on 2109 * 2110 * Creates server context with specified callbacks. 2111 * 2112 * Return a valid pointer on success otherwise PTR_ERR. 
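 * The returned context is released with rtrs_srv_close().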
2113 */ 2114 struct rtrs_srv_ctx *rtrs_srv_open(struct rtrs_srv_ops *ops, u16 port) 2115 { 2116 struct rtrs_srv_ctx *ctx; 2117 int err; 2118 2119 ctx = alloc_srv_ctx(ops); 2120 if (!ctx) 2121 return ERR_PTR(-ENOMEM); 2122 2123 mutex_init(&ib_ctx.ib_dev_mutex); 2124 ib_ctx.srv_ctx = ctx; 2125 ib_ctx.port = port; 2126 2127 err = ib_register_client(&rtrs_srv_client); 2128 if (err) { 2129 free_srv_ctx(ctx); 2130 return ERR_PTR(err); 2131 } 2132 2133 return ctx; 2134 } 2135 EXPORT_SYMBOL(rtrs_srv_open); 2136 2137 static void close_sessions(struct rtrs_srv *srv) 2138 { 2139 struct rtrs_srv_sess *sess; 2140 2141 mutex_lock(&srv->paths_mutex); 2142 list_for_each_entry(sess, &srv->paths_list, s.entry) 2143 close_sess(sess); 2144 mutex_unlock(&srv->paths_mutex); 2145 } 2146 2147 static void close_ctx(struct rtrs_srv_ctx *ctx) 2148 { 2149 struct rtrs_srv *srv; 2150 2151 mutex_lock(&ctx->srv_mutex); 2152 list_for_each_entry(srv, &ctx->srv_list, ctx_list) 2153 close_sessions(srv); 2154 mutex_unlock(&ctx->srv_mutex); 2155 flush_workqueue(rtrs_wq); 2156 } 2157 2158 /** 2159 * rtrs_srv_close() - close RTRS server context 2160 * @ctx: pointer to server context 2161 * 2162 * Closes RTRS server context with all client sessions. 2163 */ 2164 void rtrs_srv_close(struct rtrs_srv_ctx *ctx) 2165 { 2166 ib_unregister_client(&rtrs_srv_client); 2167 mutex_destroy(&ib_ctx.ib_dev_mutex); 2168 close_ctx(ctx); 2169 free_srv_ctx(ctx); 2170 } 2171 EXPORT_SYMBOL(rtrs_srv_close); 2172 2173 static int check_module_params(void) 2174 { 2175 if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) { 2176 pr_err("Invalid sess_queue_depth value %d, has to be >= %d, <= %d.\n", 2177 sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH); 2178 return -EINVAL; 2179 } 2180 if (max_chunk_size < 4096 || !is_power_of_2(max_chunk_size)) { 2181 pr_err("Invalid max_chunk_size value %d, has to be >= %d and should be power of two.\n", 2182 max_chunk_size, 4096); 2183 return -EINVAL; 2184 } 2185 2186 /* 2187 * Check if IB immediate data size is enough to hold the mem_id and the 2188 * offset inside the memory chunk 2189 */ 2190 if ((ilog2(sess_queue_depth - 1) + 1) + 2191 (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) { 2192 pr_err("RDMA immediate size (%db) not enough to encode %d buffers of size %dB. 
Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n", 2193 MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size); 2194 return -EINVAL; 2195 } 2196 2197 return 0; 2198 } 2199 2200 static int __init rtrs_server_init(void) 2201 { 2202 int err; 2203 2204 pr_info("Loading module %s, proto %s: (max_chunk_size: %d (pure IO %ld, headers %ld) , sess_queue_depth: %d, always_invalidate: %d)\n", 2205 KBUILD_MODNAME, RTRS_PROTO_VER_STRING, 2206 max_chunk_size, max_chunk_size - MAX_HDR_SIZE, MAX_HDR_SIZE, 2207 sess_queue_depth, always_invalidate); 2208 2209 rtrs_rdma_dev_pd_init(0, &dev_pd); 2210 2211 err = check_module_params(); 2212 if (err) { 2213 pr_err("Failed to load module, invalid module parameters, err: %d\n", 2214 err); 2215 return err; 2216 } 2217 chunk_pool = mempool_create_page_pool(sess_queue_depth * CHUNK_POOL_SZ, 2218 get_order(max_chunk_size)); 2219 if (!chunk_pool) 2220 return -ENOMEM; 2221 rtrs_dev_class = class_create(THIS_MODULE, "rtrs-server"); 2222 if (IS_ERR(rtrs_dev_class)) { 2223 err = PTR_ERR(rtrs_dev_class); 2224 goto out_chunk_pool; 2225 } 2226 rtrs_wq = alloc_workqueue("rtrs_server_wq", 0, 0); 2227 if (!rtrs_wq) { 2228 err = -ENOMEM; 2229 goto out_dev_class; 2230 } 2231 2232 return 0; 2233 2234 out_dev_class: 2235 class_destroy(rtrs_dev_class); 2236 out_chunk_pool: 2237 mempool_destroy(chunk_pool); 2238 2239 return err; 2240 } 2241 2242 static void __exit rtrs_server_exit(void) 2243 { 2244 destroy_workqueue(rtrs_wq); 2245 class_destroy(rtrs_dev_class); 2246 mempool_destroy(chunk_pool); 2247 rtrs_rdma_dev_pd_deinit(&dev_pd); 2248 } 2249 2250 module_init(rtrs_server_init); 2251 module_exit(rtrs_server_exit); 2252