// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * RDMA Transport Layer
 *
 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
 */

#undef pr_fmt
#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt

#include <linux/module.h>
#include <linux/mempool.h>

#include "rtrs-srv.h"
#include "rtrs-log.h"
#include <rdma/ib_cm.h>
#include <rdma/ib_verbs.h>

MODULE_DESCRIPTION("RDMA Transport Server");
MODULE_LICENSE("GPL");

/* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
#define DEFAULT_SESS_QUEUE_DEPTH 512
#define MAX_HDR_SIZE PAGE_SIZE

/* We guarantee to serve 10 paths at least */
#define CHUNK_POOL_SZ 10

static struct rtrs_rdma_dev_pd dev_pd;
/* Shared pool of IO chunk pages, sized to serve CHUNK_POOL_SZ paths */
static mempool_t *chunk_pool;
struct class *rtrs_dev_class;
static struct rtrs_srv_ib_ctx ib_ctx;

static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;

static bool always_invalidate = true;
module_param(always_invalidate, bool, 0444);
MODULE_PARM_DESC(always_invalidate,
		 "Invalidate memory registration for contiguous memory regions before accessing.");

module_param_named(max_chunk_size, max_chunk_size, int, 0444);
MODULE_PARM_DESC(max_chunk_size,
		 "Max size for each IO request, when change the unit is in byte (default: "
		 __stringify(DEFAULT_MAX_CHUNK_SIZE) "KB)");

module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
MODULE_PARM_DESC(sess_queue_depth,
		 "Number of buffers for pending I/O requests to allocate per session. Maximum: "
		 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
		 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");

/* CPUs eligible for CQ completion vector assignment (all by default) */
static cpumask_t cq_affinity_mask = { CPU_BITS_ALL };

/* Workqueue used for session close and heartbeat work */
static struct workqueue_struct *rtrs_wq;

/* Upcast a generic connection to the server-side connection wrapper. */
static inline struct rtrs_srv_con *to_srv_con(struct rtrs_con *c)
{
	return container_of(c, struct rtrs_srv_con, c);
}

/* Upcast a generic session to the server-side session wrapper. */
static inline struct rtrs_srv_sess *to_srv_sess(struct rtrs_sess *s)
{
	return container_of(s, struct rtrs_srv_sess, s);
}

/*
 * State machine transition check; the only legal transitions are
 * CONNECTING -> CONNECTED, CONNECTING/CONNECTED -> CLOSING and
 * CLOSING -> CLOSED.  Returns true and commits the new state iff
 * the transition is allowed.  Caller must hold sess->state_lock.
 */
static bool __rtrs_srv_change_state(struct rtrs_srv_sess *sess,
				    enum rtrs_srv_state new_state)
{
	enum rtrs_srv_state old_state;
	bool changed = false;

	lockdep_assert_held(&sess->state_lock);
	old_state = sess->state;
	switch (new_state) {
	case RTRS_SRV_CONNECTED:
		switch (old_state) {
		case RTRS_SRV_CONNECTING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_SRV_CLOSING:
		switch (old_state) {
		case RTRS_SRV_CONNECTING:
		case RTRS_SRV_CONNECTED:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_SRV_CLOSED:
		switch (old_state) {
		case RTRS_SRV_CLOSING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	default:
		break;
	}
	if (changed)
		sess->state = new_state;

	return changed;
}

/* Locked wrapper around __rtrs_srv_change_state(). */
static bool rtrs_srv_change_state(struct rtrs_srv_sess *sess,
				  enum rtrs_srv_state new_state)
{
	bool changed;

	spin_lock_irq(&sess->state_lock);
	changed = __rtrs_srv_change_state(sess, new_state);
	spin_unlock_irq(&sess->state_lock);

	return changed;
}

/* Free one per-request operation id (NULL-safe). */
static void free_id(struct rtrs_srv_op *id)
{
	if (!id)
		return;
	kfree(id);
}

/*
 * Release the ops_ids array allocated by rtrs_srv_alloc_ops_ids().
 * Must only run once all inflight IO has completed (WARN otherwise).
 */
static void rtrs_srv_free_ops_ids(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	int i;

	WARN_ON(atomic_read(&sess->ids_inflight));
	if (sess->ops_ids) {
		for (i = 0; i < srv->queue_depth; i++)
			free_id(sess->ops_ids[i]);
		kfree(sess->ops_ids);
		sess->ops_ids = NULL;
	}
}

static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc);

/* Shared CQE for all IO-path completions (RDMA writes, empty recvs, hb) */
static struct ib_cqe io_comp_cqe = {
	.done = rtrs_srv_rdma_done
};

/*
 * Allocate one rtrs_srv_op per queue slot; these identify inflight IO
 * requests between the RDMA completion path and the user module callback.
 * Returns 0 or -ENOMEM (partial allocations are cleaned up).
 */
static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_srv_op *id;
	int i;

	sess->ops_ids = kcalloc(srv->queue_depth, sizeof(*sess->ops_ids),
				GFP_KERNEL);
	if (!sess->ops_ids)
		goto err;

	for (i = 0; i < srv->queue_depth; ++i) {
		id = kzalloc(sizeof(*id), GFP_KERNEL);
		if (!id)
			goto err;

		sess->ops_ids[i] = id;
	}
	init_waitqueue_head(&sess->ids_waitq);
	atomic_set(&sess->ids_inflight, 0);

	return 0;

err:
	rtrs_srv_free_ops_ids(sess);
	return -ENOMEM;
}

/* Account one more inflight IO on the session. */
static inline void rtrs_srv_get_ops_ids(struct rtrs_srv_sess *sess)
{
	atomic_inc(&sess->ids_inflight);
}

/* Drop one inflight IO; wake waiters when the count reaches zero. */
static inline void rtrs_srv_put_ops_ids(struct rtrs_srv_sess *sess)
{
	if (atomic_dec_and_test(&sess->ids_inflight))
		wake_up(&sess->ids_waitq);
}

/* Block until every inflight IO has been put (used on session teardown). */
static void rtrs_srv_wait_ops_ids(struct rtrs_srv_sess *sess)
{
	wait_event(sess->ids_waitq, !atomic_read(&sess->ids_inflight));
}


/* Completion handler for IB_WR_REG_MR work requests; failure closes sess. */
static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(s, "REG MR failed: %s\n",
			 ib_wc_status_msg(wc->status));
		close_sess(sess);
		return;
	}
}

static struct ib_cqe local_reg_cqe = {
	.done = rtrs_srv_reg_mr_done
};

/*
 * Reply to a client READ request by RDMA-writing the result data back
 * into the client buffer described by id->rd_msg, then signalling
 * completion with an IMM (optionally chained with MR re-registration
 * and/or SEND_WITH_INV, depending on need_inval/always_invalidate).
 * Only single-descriptor (sg_cnt == 1) requests are supported here.
 * Returns 0 or a negative errno.
 */
static int rdma_write_sg(struct rtrs_srv_op *id)
{
	struct rtrs_sess *s = id->con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	dma_addr_t dma_addr = sess->dma_addr[id->msg_id];
	struct rtrs_srv_mr *srv_mr;
	struct rtrs_srv *srv = sess->srv;
	struct ib_send_wr inv_wr;
	struct ib_rdma_wr imm_wr;
	struct ib_rdma_wr *wr = NULL;
	enum ib_send_flags flags;
	size_t sg_cnt;
	int err, offset;
	bool need_inval;
	u32 rkey = 0;
	struct ib_reg_wr rwr;
	struct ib_sge *plist;
	struct ib_sge list;

	sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt);
	need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F;
	if (unlikely(sg_cnt != 1))
		return -EINVAL;

	offset = 0;

	wr = &id->tx_wr;
	plist = &id->tx_sg;
	plist->addr = dma_addr + offset;
	plist->length = le32_to_cpu(id->rd_msg->desc[0].len);

	/* WR will fail with length error
	 * if this is 0
	 */
	if (unlikely(plist->length == 0)) {
		rtrs_err(s, "Invalid RDMA-Write sg list length 0\n");
		return -EINVAL;
	}

	plist->lkey = sess->s.dev->ib_pd->local_dma_lkey;
	offset += plist->length;

	wr->wr.sg_list = plist;
	wr->wr.num_sge = 1;
	wr->remote_addr = le64_to_cpu(id->rd_msg->desc[0].addr);
	wr->rkey = le32_to_cpu(id->rd_msg->desc[0].key);
	if (rkey == 0)
		rkey = wr->rkey;
	else
		/* Only one key is actually used */
		WARN_ON_ONCE(rkey != wr->rkey);

	wr->wr.opcode = IB_WR_RDMA_WRITE;
	wr->wr.wr_cqe = &io_comp_cqe;
	wr->wr.ex.imm_data = 0;
	wr->wr.send_flags = 0;

	/*
	 * Build the WR chain: RDMA_WRITE [-> REG_MR] [-> SEND_WITH_INV]
	 * -> final IMM send.  inv_wr/rwr/imm_wr live on the stack, which
	 * is fine since ib_post_send() copies WRs before returning.
	 */
	if (need_inval && always_invalidate) {
		wr->wr.next = &rwr.wr;
		rwr.wr.next = &inv_wr;
		inv_wr.next = &imm_wr.wr;
	} else if (always_invalidate) {
		wr->wr.next = &rwr.wr;
		rwr.wr.next = &imm_wr.wr;
	} else if (need_inval) {
		wr->wr.next = &inv_wr;
		inv_wr.next = &imm_wr.wr;
	} else {
		wr->wr.next = &imm_wr.wr;
	}
	/*
	 * From time to time we have to post signaled sends,
	 * or send queue will fill up and only QP reset can help.
	 */
	flags = (atomic_inc_return(&id->con->wr_cnt) % srv->queue_depth) ?
		0 : IB_SEND_SIGNALED;

	if (need_inval) {
		inv_wr.sg_list = NULL;
		inv_wr.num_sge = 0;
		inv_wr.opcode = IB_WR_SEND_WITH_INV;
		inv_wr.wr_cqe = &io_comp_cqe;
		inv_wr.send_flags = 0;
		inv_wr.ex.invalidate_rkey = rkey;
	}

	imm_wr.wr.next = NULL;
	if (always_invalidate) {
		struct rtrs_msg_rkey_rsp *msg;

		/* Re-register the chunk MR and piggy-back the fresh rkey */
		srv_mr = &sess->mrs[id->msg_id];
		rwr.wr.opcode = IB_WR_REG_MR;
		rwr.wr.wr_cqe = &local_reg_cqe;
		rwr.wr.num_sge = 0;
		rwr.mr = srv_mr->mr;
		rwr.wr.send_flags = 0;
		rwr.key = srv_mr->mr->rkey;
		rwr.access = (IB_ACCESS_LOCAL_WRITE |
			      IB_ACCESS_REMOTE_WRITE);
		msg = srv_mr->iu->buf;
		msg->buf_id = cpu_to_le16(id->msg_id);
		msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
		msg->rkey = cpu_to_le32(srv_mr->mr->rkey);

		list.addr = srv_mr->iu->dma_addr;
		list.length = sizeof(*msg);
		list.lkey = sess->s.dev->ib_pd->local_dma_lkey;
		imm_wr.wr.sg_list = &list;
		imm_wr.wr.num_sge = 1;
		imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM;
		ib_dma_sync_single_for_device(sess->s.dev->ib_dev,
					      srv_mr->iu->dma_addr,
					      srv_mr->iu->size, DMA_TO_DEVICE);
	} else {
		imm_wr.wr.sg_list = NULL;
		imm_wr.wr.num_sge = 0;
		imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
	}
	imm_wr.wr.send_flags = flags;
	imm_wr.wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id,
							       0, need_inval));

	imm_wr.wr.wr_cqe = &io_comp_cqe;
	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, dma_addr,
				      offset, DMA_BIDIRECTIONAL);

	err = ib_post_send(id->con->c.qp, &id->tx_wr.wr, NULL);
	if (unlikely(err))
		rtrs_err(s,
			 "Posting RDMA-Write-Request to QP failed, err: %d\n",
			 err);

	return err;
}

/**
 * send_io_resp_imm() - respond to client with empty IMM on failed READ/WRITE
 *                      requests or on successful WRITE request.
 * @con:	the connection to send back result
 * @id:		the id associated with the IO
 * @errno:	the error number of the IO.
 *
 * Return 0 on success, errno otherwise.
 */
static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id,
			    int errno)
{
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct ib_send_wr inv_wr, *wr = NULL;
	struct ib_rdma_wr imm_wr;
	struct ib_reg_wr rwr;
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_srv_mr *srv_mr;
	bool need_inval = false;
	enum ib_send_flags flags;
	u32 imm;
	int err;

	if (id->dir == READ) {
		struct rtrs_msg_rdma_read *rd_msg = id->rd_msg;
		size_t sg_cnt;

		need_inval = le16_to_cpu(rd_msg->flags) &
				RTRS_MSG_NEED_INVAL_F;
		sg_cnt = le16_to_cpu(rd_msg->sg_cnt);

		if (need_inval) {
			if (likely(sg_cnt)) {
				inv_wr.wr_cqe = &io_comp_cqe;
				inv_wr.sg_list = NULL;
				inv_wr.num_sge = 0;
				inv_wr.opcode = IB_WR_SEND_WITH_INV;
				inv_wr.send_flags = 0;
				/* Only one key is actually used */
				inv_wr.ex.invalidate_rkey =
					le32_to_cpu(rd_msg->desc[0].key);
			} else {
				/* Client asked for inval without any sg */
				WARN_ON_ONCE(1);
				need_inval = false;
			}
		}
	}

	/* Chain WRs as in rdma_write_sg(): [INV] [-> REG_MR] -> IMM send */
	if (need_inval && always_invalidate) {
		wr = &inv_wr;
		inv_wr.next = &rwr.wr;
		rwr.wr.next = &imm_wr.wr;
	} else if (always_invalidate) {
		wr = &rwr.wr;
		rwr.wr.next = &imm_wr.wr;
	} else if (need_inval) {
		wr = &inv_wr;
		inv_wr.next = &imm_wr.wr;
	} else {
		wr = &imm_wr.wr;
	}
	/*
	 * From time to time we have to post signalled sends,
	 * or send queue will fill up and only QP reset can help.
	 */
	flags = (atomic_inc_return(&con->wr_cnt) % srv->queue_depth) ?
		0 : IB_SEND_SIGNALED;
	imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval);
	imm_wr.wr.next = NULL;
	if (always_invalidate) {
		struct ib_sge list;
		struct rtrs_msg_rkey_rsp *msg;

		srv_mr = &sess->mrs[id->msg_id];
		rwr.wr.next = &imm_wr.wr;
		rwr.wr.opcode = IB_WR_REG_MR;
		rwr.wr.wr_cqe = &local_reg_cqe;
		rwr.wr.num_sge = 0;
		rwr.wr.send_flags = 0;
		rwr.mr = srv_mr->mr;
		rwr.key = srv_mr->mr->rkey;
		rwr.access = (IB_ACCESS_LOCAL_WRITE |
			      IB_ACCESS_REMOTE_WRITE);
		msg = srv_mr->iu->buf;
		msg->buf_id = cpu_to_le16(id->msg_id);
		msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
		msg->rkey = cpu_to_le32(srv_mr->mr->rkey);

		list.addr = srv_mr->iu->dma_addr;
		list.length = sizeof(*msg);
		list.lkey = sess->s.dev->ib_pd->local_dma_lkey;
		imm_wr.wr.sg_list = &list;
		imm_wr.wr.num_sge = 1;
		imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM;
		ib_dma_sync_single_for_device(sess->s.dev->ib_dev,
					      srv_mr->iu->dma_addr,
					      srv_mr->iu->size, DMA_TO_DEVICE);
	} else {
		imm_wr.wr.sg_list = NULL;
		imm_wr.wr.num_sge = 0;
		imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
	}
	imm_wr.wr.send_flags = flags;
	imm_wr.wr.wr_cqe = &io_comp_cqe;

	imm_wr.wr.ex.imm_data = cpu_to_be32(imm);

	err = ib_post_send(id->con->c.qp, wr, NULL);
	if (unlikely(err))
		rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %d\n",
			    err);

	return err;
}

/*
 * Kick off asynchronous session teardown.  Only the first caller wins
 * the CLOSING transition and queues close_work; later callers are no-ops.
 * NOTE(review): the WARN_ON can fire legitimately if the session already
 * reached RTRS_SRV_CLOSED — presumably teardown races are rare; confirm.
 */
void close_sess(struct rtrs_srv_sess *sess)
{
	if (rtrs_srv_change_state(sess, RTRS_SRV_CLOSING))
		queue_work(rtrs_wq, &sess->close_work);
	WARN_ON(sess->state != RTRS_SRV_CLOSING);
}

/* Human-readable name of a session state, for log messages. */
static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state)
{
	switch (state) {
	case RTRS_SRV_CONNECTING:
		return "RTRS_SRV_CONNECTING";
	case RTRS_SRV_CONNECTED:
		return "RTRS_SRV_CONNECTED";
	case RTRS_SRV_CLOSING:
		return "RTRS_SRV_CLOSING";
	case RTRS_SRV_CLOSED:
		return "RTRS_SRV_CLOSED";
	default:
		return "UNKNOWN";
	}
}

/**
 * rtrs_srv_resp_rdma() - Finish an RDMA request
 *
 * @id:		Internal RTRS operation identifier
 * @status:	Response Code sent to the other side for this operation.
 *		0 = success, <=0 error
 * Context: any
 *
 * Finish a RDMA operation. A message is sent to the client and the
 * corresponding memory areas will be released.
 *
 * Returns false when the send queue is currently full and the response
 * was parked on the per-connection wait list for a later retry; true
 * otherwise (including error paths, which close the session).
 */
bool rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status)
{
	struct rtrs_srv_sess *sess;
	struct rtrs_srv_con *con;
	struct rtrs_sess *s;
	int err;

	if (WARN_ON(!id))
		return true;

	con = id->con;
	s = con->c.sess;
	sess = to_srv_sess(s);

	id->status = status;

	if (unlikely(sess->state != RTRS_SRV_CONNECTED)) {
		rtrs_err_rl(s,
			    "Sending I/O response failed,  session %s is disconnected, sess state %s\n",
			    kobject_name(&sess->kobj),
			    rtrs_srv_state_str(sess->state));
		goto out;
	}
	if (always_invalidate) {
		struct rtrs_srv_mr *mr = &sess->mrs[id->msg_id];

		/* Bump the rkey so a stale client key cannot be reused */
		ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey));
	}
	if (unlikely(atomic_sub_return(1,
				       &con->sq_wr_avail) < 0)) {
		/* No send-queue room: park the response and retry from the
		 * completion handler; inflight accounting is kept (no put).
		 */
		rtrs_err(s, "IB send queue full: sess=%s cid=%d\n",
			 kobject_name(&sess->kobj),
			 con->c.cid);
		atomic_add(1, &con->sq_wr_avail);
		spin_lock(&con->rsp_wr_wait_lock);
		list_add_tail(&id->wait_list, &con->rsp_wr_wait_list);
		spin_unlock(&con->rsp_wr_wait_lock);
		return false;
	}

	if (status || id->dir == WRITE || !id->rd_msg->sg_cnt)
		err = send_io_resp_imm(con, id, status);
	else
		err = rdma_write_sg(id);

	if (unlikely(err)) {
		rtrs_err_rl(s, "IO response failed: %d: sess=%s\n", err,
			    kobject_name(&sess->kobj));
		close_sess(sess);
	}
out:
	rtrs_srv_put_ops_ids(sess);
	return true;
}
EXPORT_SYMBOL(rtrs_srv_resp_rdma);

/**
 * rtrs_srv_set_sess_priv() - Set private pointer in rtrs_srv.
 * @srv:	Session pointer
 * @priv:	The private pointer that is associated with the session.
 */
void rtrs_srv_set_sess_priv(struct rtrs_srv *srv, void *priv)
{
	srv->priv = priv;
}
EXPORT_SYMBOL(rtrs_srv_set_sess_priv);

/* Undo map_cont_bufs(): free IUs, dereg MRs, unmap and free sg tables. */
static void unmap_cont_bufs(struct rtrs_srv_sess *sess)
{
	int i;

	for (i = 0; i < sess->mrs_num; i++) {
		struct rtrs_srv_mr *srv_mr;

		srv_mr = &sess->mrs[i];
		rtrs_iu_free(srv_mr->iu, sess->s.dev->ib_dev, 1);
		ib_dereg_mr(srv_mr->mr);
		ib_dma_unmap_sg(sess->s.dev->ib_dev, srv_mr->sgt.sgl,
				srv_mr->sgt.nents, DMA_BIDIRECTIONAL);
		sg_free_table(&srv_mr->sgt);
	}
	kfree(sess->mrs);
}

/*
 * Map the session's queue_depth IO chunks into memory regions.  With
 * always_invalidate each chunk gets its own MR (plus an IU for the rkey
 * response); otherwise chunks are packed into as few MRs as the device's
 * max_fast_reg_page_list_len allows.  On failure the interleaved labels
 * unwind the partially-initialized MR and all fully-built earlier ones.
 * Returns 0 or a negative errno.
 */
static int map_cont_bufs(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_sess *ss = &sess->s;
	int i, mri, err, mrs_num;
	unsigned int chunk_bits;
	int chunks_per_mr = 1;

	/*
	 * Here we map queue_depth chunks to MR.  Firstly we have to
	 * figure out how many chunks can we map per MR.
	 */
	if (always_invalidate) {
		/*
		 * in order to do invalidate for each chunks of memory, we needs
		 * more memory regions.
		 */
		mrs_num = srv->queue_depth;
	} else {
		chunks_per_mr =
			sess->s.dev->ib_dev->attrs.max_fast_reg_page_list_len;
		mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr);
		chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num);
	}

	sess->mrs = kcalloc(mrs_num, sizeof(*sess->mrs), GFP_KERNEL);
	if (!sess->mrs)
		return -ENOMEM;

	sess->mrs_num = mrs_num;

	for (mri = 0; mri < mrs_num; mri++) {
		struct rtrs_srv_mr *srv_mr = &sess->mrs[mri];
		struct sg_table *sgt = &srv_mr->sgt;
		struct scatterlist *s;
		struct ib_mr *mr;
		int nr, chunks;

		chunks = chunks_per_mr * mri;
		if (!always_invalidate)
			/* The last MR may cover fewer chunks */
			chunks_per_mr = min_t(int, chunks_per_mr,
					      srv->queue_depth - chunks);

		err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL);
		if (err)
			goto err;

		for_each_sg(sgt->sgl, s, chunks_per_mr, i)
			sg_set_page(s, srv->chunks[chunks + i],
				    max_chunk_size, 0);

		nr = ib_dma_map_sg(sess->s.dev->ib_dev, sgt->sgl,
				   sgt->nents, DMA_BIDIRECTIONAL);
		if (nr < sgt->nents) {
			err = nr < 0 ? nr : -EINVAL;
			goto free_sg;
		}
		mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG,
				 sgt->nents);
		if (IS_ERR(mr)) {
			err = PTR_ERR(mr);
			goto unmap_sg;
		}
		nr = ib_map_mr_sg(mr, sgt->sgl, sgt->nents,
				  NULL, max_chunk_size);
		if (nr < 0 || nr < sgt->nents) {
			err = nr < 0 ? nr : -EINVAL;
			goto dereg_mr;
		}

		if (always_invalidate) {
			srv_mr->iu = rtrs_iu_alloc(1,
					sizeof(struct rtrs_msg_rkey_rsp),
					GFP_KERNEL, sess->s.dev->ib_dev,
					DMA_TO_DEVICE, rtrs_srv_rdma_done);
			if (!srv_mr->iu) {
				err = -ENOMEM;
				rtrs_err(ss, "rtrs_iu_alloc(), err: %d\n", err);
				goto dereg_mr;
			}
		}
		/* Eventually dma addr for each chunk can be cached */
		for_each_sg(sgt->sgl, s, sgt->orig_nents, i)
			sess->dma_addr[chunks + i] = sg_dma_address(s);

		ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey));
		srv_mr->mr = mr;

		continue;
err:
		/* Unwind fully-built MRs; fall-through labels above clean
		 * up whatever the current iteration managed to allocate.
		 */
		while (mri--) {
			srv_mr = &sess->mrs[mri];
			sgt = &srv_mr->sgt;
			mr = srv_mr->mr;
			rtrs_iu_free(srv_mr->iu, sess->s.dev->ib_dev, 1);
dereg_mr:
			ib_dereg_mr(mr);
unmap_sg:
			ib_dma_unmap_sg(sess->s.dev->ib_dev, sgt->sgl,
					sgt->nents, DMA_BIDIRECTIONAL);
free_sg:
			sg_free_table(sgt);
		}
		kfree(sess->mrs);

		return err;
	}

	/* Split the IMM payload into msg_id (chunk index) and offset bits */
	chunk_bits = ilog2(srv->queue_depth - 1) + 1;
	sess->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits);

	return 0;
}

/* Heartbeat failure => tear the session down. */
static void rtrs_srv_hb_err_handler(struct rtrs_con *c)
{
	close_sess(to_srv_sess(c->sess));
}

/* Initialize heartbeat machinery for the session. */
static void rtrs_srv_init_hb(struct rtrs_srv_sess *sess)
{
	rtrs_init_hb(&sess->s, &io_comp_cqe,
		     RTRS_HB_INTERVAL_MS,
		     RTRS_HB_MISSED_MAX,
		     rtrs_srv_hb_err_handler,
		     rtrs_wq);
}

/* Start periodic heartbeats. */
static void rtrs_srv_start_hb(struct rtrs_srv_sess *sess)
{
	rtrs_start_hb(&sess->s);
}

/* Stop periodic heartbeats. */
static void rtrs_srv_stop_hb(struct rtrs_srv_sess *sess)
{
	rtrs_stop_hb(&sess->s);
}

/* Completion of the info-response send; frees the tx IU. */
static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_iu *iu;

	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
	rtrs_iu_free(iu, sess->s.dev->ib_dev, 1);

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(s, "Sess info response send failed: %s\n",
			 ib_wc_status_msg(wc->status));
		close_sess(sess);
		return;
	}
	WARN_ON(wc->opcode != IB_WC_SEND);
}

/*
 * Mark path as up; the first established path of a srv notifies the
 * upper-layer listener with RTRS_SRV_LINK_EV_CONNECTED.
 */
static void rtrs_srv_sess_up(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_srv_ctx *ctx = srv->ctx;
	int up;

	mutex_lock(&srv->paths_ev_mutex);
	up = ++srv->paths_up;
	if (up == 1)
		ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL);
	mutex_unlock(&srv->paths_ev_mutex);

	/* Mark session as established */
	sess->established = true;
}

/*
 * Mark path as down; the last path going away notifies the listener
 * with RTRS_SRV_LINK_EV_DISCONNECTED.  No-op if never established.
 */
static void rtrs_srv_sess_down(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_srv_ctx *ctx = srv->ctx;

	if (!sess->established)
		return;

	sess->established = false;
	mutex_lock(&srv->paths_ev_mutex);
	WARN_ON(!srv->paths_up);
	if (--srv->paths_up == 0)
		ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv);
	mutex_unlock(&srv->paths_ev_mutex);
}

static int post_recv_sess(struct rtrs_srv_sess *sess);

/*
 * Handle the client's info request: post receive buffers, build the
 * info response carrying one (addr, rkey, len) descriptor per MR with
 * a chained REG_MR WR per MR, create sysfs files, flip the session to
 * CONNECTED and send the response.  Returns 0 or a negative errno.
 */
static int process_info_req(struct rtrs_srv_con *con,
			    struct rtrs_msg_info_req *msg)
{
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct ib_send_wr *reg_wr = NULL;
	struct rtrs_msg_info_rsp *rsp;
	struct rtrs_iu *tx_iu;
	struct ib_reg_wr *rwr;
	int mri, err;
	size_t tx_sz;

	err = post_recv_sess(sess);
	if (unlikely(err)) {
		rtrs_err(s, "post_recv_sess(), err: %d\n", err);
		return err;
	}
	rwr = kcalloc(sess->mrs_num, sizeof(*rwr), GFP_KERNEL);
	if (unlikely(!rwr))
		return -ENOMEM;
	strlcpy(sess->s.sessname, msg->sessname, sizeof(sess->s.sessname));

	tx_sz  = sizeof(*rsp);
	tx_sz += sizeof(rsp->desc[0]) * sess->mrs_num;
	tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, sess->s.dev->ib_dev,
			      DMA_TO_DEVICE, rtrs_srv_info_rsp_done);
	if (unlikely(!tx_iu)) {
		err = -ENOMEM;
		goto rwr_free;
	}

	rsp = tx_iu->buf;
	rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP);
	rsp->sg_cnt = cpu_to_le16(sess->mrs_num);

	for (mri = 0; mri < sess->mrs_num; mri++) {
		struct ib_mr *mr = sess->mrs[mri].mr;

		rsp->desc[mri].addr = cpu_to_le64(mr->iova);
		rsp->desc[mri].key  = cpu_to_le32(mr->rkey);
		rsp->desc[mri].len  = cpu_to_le32(mr->length);

		/*
		 * Fill in reg MR request and chain them *backwards*
		 */
		rwr[mri].wr.next = mri ? &rwr[mri - 1].wr : NULL;
		rwr[mri].wr.opcode = IB_WR_REG_MR;
		rwr[mri].wr.wr_cqe = &local_reg_cqe;
		rwr[mri].wr.num_sge = 0;
		rwr[mri].wr.send_flags = 0;
		rwr[mri].mr = mr;
		rwr[mri].key = mr->rkey;
		rwr[mri].access = (IB_ACCESS_LOCAL_WRITE |
				   IB_ACCESS_REMOTE_WRITE);
		reg_wr = &rwr[mri].wr;
	}

	err = rtrs_srv_create_sess_files(sess);
	if (unlikely(err))
		goto iu_free;
	kobject_get(&sess->kobj);
	get_device(&sess->srv->dev);
	rtrs_srv_change_state(sess, RTRS_SRV_CONNECTED);
	rtrs_srv_start_hb(sess);

	/*
	 * We do not account number of established connections at the current
	 * moment, we rely on the client, which should send info request when
	 * all connections are successfully established.  Thus, simply notify
	 * listener with a proper event if we are the first path.
	 */
	rtrs_srv_sess_up(sess);

	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr,
				      tx_iu->size, DMA_TO_DEVICE);

	/* Send info response */
	err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr);
	if (unlikely(err)) {
		rtrs_err(s, "rtrs_iu_post_send(), err: %d\n", err);
iu_free:
		rtrs_iu_free(tx_iu, sess->s.dev->ib_dev, 1);
	}
rwr_free:
	kfree(rwr);

	return err;
}

/*
 * Receive completion for the client's info request: validate size and
 * type, then hand off to process_info_req().  Any error closes the
 * session; the rx IU is always freed.
 */
static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_msg_info_req *msg;
	struct rtrs_iu *iu;
	int err;

	WARN_ON(con->c.cid);

	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(s, "Sess info request receive failed: %s\n",
			 ib_wc_status_msg(wc->status));
		goto close;
	}
	WARN_ON(wc->opcode != IB_WC_RECV);

	if (unlikely(wc->byte_len < sizeof(*msg))) {
		rtrs_err(s, "Sess info request is malformed: size %d\n",
			 wc->byte_len);
		goto close;
	}
	ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
				   iu->size, DMA_FROM_DEVICE);
	msg = iu->buf;
	if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ)) {
		rtrs_err(s, "Sess info request is malformed: type %d\n",
			 le16_to_cpu(msg->type));
		goto close;
	}
	err = process_info_req(con, msg);
	if (unlikely(err))
		goto close;

out:
	rtrs_iu_free(iu, sess->s.dev->ib_dev, 1);
	return;
close:
	close_sess(sess);
	goto out;
}

/* Post a single receive for the client's info request on the service con. */
static int post_recv_info_req(struct rtrs_srv_con *con)
{
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_iu *rx_iu;
	int err;

	rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req),
			      GFP_KERNEL, sess->s.dev->ib_dev,
			      DMA_FROM_DEVICE, rtrs_srv_info_req_done);
	if (unlikely(!rx_iu))
		return -ENOMEM;
	/* Prepare for getting info response */
	err = rtrs_iu_post_recv(&con->c, rx_iu);
	if (unlikely(err)) {
		rtrs_err(s, "rtrs_iu_post_recv(), err: %d\n", err);
		rtrs_iu_free(rx_iu, sess->s.dev->ib_dev, 1);
		return err;
	}

	return 0;
}

/* Post q_size empty receives (payload arrives via RDMA, not the recv). */
static int post_recv_io(struct rtrs_srv_con *con, size_t q_size)
{
	int i, err;

	for (i = 0; i < q_size; i++) {
		err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
		if (unlikely(err))
			return err;
	}

	return 0;
}

/*
 * Post receive buffers on every connection of the session: the service
 * connection (cid 0) gets SERVICE_CON_QUEUE_DEPTH, IO connections get
 * queue_depth each.
 */
static int post_recv_sess(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_sess *s = &sess->s;
	size_t q_size;
	int err, cid;

	for (cid = 0; cid < sess->s.con_num; cid++) {
		if (cid == 0)
			q_size = SERVICE_CON_QUEUE_DEPTH;
		else
			q_size = srv->queue_depth;

		err = post_recv_io(to_srv_con(sess->s.con[cid]), q_size);
		if (unlikely(err)) {
			rtrs_err(s, "post_recv_io(), err: %d\n", err);
			return err;
		}
	}

	return 0;
}

/*
 * Dispatch a client READ request (chunk @buf_id, user header at offset
 * @off) to the upper-layer rdma_ev callback.  On callback failure an
 * error IMM response is sent and the session is closed if that fails too.
 */
static void process_read(struct rtrs_srv_con *con,
			 struct rtrs_msg_rdma_read *msg,
			 u32 buf_id, u32 off)
{
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_srv_ctx *ctx = srv->ctx;
	struct rtrs_srv_op *id;

	size_t usr_len, data_len;
	void *data;
	int ret;

	if (unlikely(sess->state != RTRS_SRV_CONNECTED)) {
		rtrs_err_rl(s,
			    "Processing read request failed,  session is disconnected, sess state %s\n",
			    rtrs_srv_state_str(sess->state));
		return;
	}
	if (unlikely(msg->sg_cnt != 1 && msg->sg_cnt != 0)) {
		rtrs_err_rl(s,
			    "Processing read request failed, invalid message\n");
		return;
	}
	rtrs_srv_get_ops_ids(sess);
	rtrs_srv_update_rdma_stats(sess->stats, off, READ);
	id = sess->ops_ids[buf_id];
	id->con		= con;
	id->dir		= READ;
	id->msg_id	= buf_id;
	id->rd_msg	= msg;
	usr_len = le16_to_cpu(msg->usr_len);
	data_len = off - usr_len;
	data = page_address(srv->chunks[buf_id]);
	ret = ctx->ops.rdma_ev(srv->priv, id, READ, data, data_len,
			   data + data_len, usr_len);

	if (unlikely(ret)) {
		rtrs_err_rl(s,
			    "Processing read request failed, user module cb reported for msg_id %d, err: %d\n",
			    buf_id, ret);
		goto send_err_msg;
	}

	return;

send_err_msg:
	ret = send_io_resp_imm(con, id, ret);
	if (ret < 0) {
		rtrs_err_rl(s,
			    "Sending err msg for failed RDMA-Write-Req failed, msg_id %d, err: %d\n",
			    buf_id, ret);
		close_sess(sess);
	}
	rtrs_srv_put_ops_ids(sess);
}

/*
 * Dispatch a client WRITE request to the upper-layer rdma_ev callback;
 * mirror image of process_read() for the write direction.
 */
static void process_write(struct rtrs_srv_con *con,
			  struct rtrs_msg_rdma_write *req,
			  u32 buf_id, u32 off)
{
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_srv_ctx *ctx = srv->ctx;
	struct rtrs_srv_op *id;

	size_t data_len, usr_len;
	void *data;
	int ret;

	if (unlikely(sess->state != RTRS_SRV_CONNECTED)) {
		rtrs_err_rl(s,
			    "Processing write request failed,  session is disconnected, sess state %s\n",
			    rtrs_srv_state_str(sess->state));
		return;
	}
	rtrs_srv_get_ops_ids(sess);
	rtrs_srv_update_rdma_stats(sess->stats, off, WRITE);
	id = sess->ops_ids[buf_id];
	id->con    = con;
	id->dir    = WRITE;
	id->msg_id = buf_id;

	usr_len = le16_to_cpu(req->usr_len);
	data_len = off - usr_len;
	data = page_address(srv->chunks[buf_id]);
	ret = ctx->ops.rdma_ev(srv->priv, id, WRITE, data, data_len,
			   data + data_len, usr_len);
	if (unlikely(ret)) {
		rtrs_err_rl(s,
			    "Processing write request failed, user module callback reports err: %d\n",
			    ret);
		goto send_err_msg;
	}

	return;

send_err_msg:
	ret = send_io_resp_imm(con, id, ret);
	if (ret < 0) {
		rtrs_err_rl(s,
			    "Processing write request failed, sending I/O response failed, msg_id %d, err: %d\n",
			    buf_id, ret);
		close_sess(sess);
	}
	rtrs_srv_put_ops_ids(sess);
}

/*
 * Demultiplex an incoming IO request header (already RDMA-written into
 * chunk @id at offset @off) to process_read()/process_write().  Unknown
 * message types close the session.
 */
static void process_io_req(struct rtrs_srv_con *con, void *msg,
			   u32 id, u32 off)
{
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_msg_rdma_hdr *hdr;
	unsigned int type;

	ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, sess->dma_addr[id],
				   max_chunk_size, DMA_BIDIRECTIONAL);
	hdr = msg;
	type = le16_to_cpu(hdr->type);

	switch (type) {
	case RTRS_MSG_WRITE:
		process_write(con, msg, id, off);
		break;
	case RTRS_MSG_READ:
		process_read(con, msg, id, off);
		break;
	default:
		rtrs_err(s,
			 "Processing I/O request failed, unknown message type received: 0x%02x\n",
			 type);
		goto err;
	}

	return;

err:
	close_sess(sess);
}

/*
 * Completion of the local-invalidate posted by rtrs_srv_inv_rkey();
 * once the rkey is invalidated, process the IO request stashed in the
 * MR (msg_id/msg_off).  Failure closes the session (but processing
 * still proceeds — state check in process_* will bail out).
 */
static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_srv_mr *mr =
		container_of(wc->wr_cqe, typeof(*mr), inv_cqe);
	struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_srv *srv = sess->srv;
	u32 msg_id, off;
	void *data;

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n",
			 ib_wc_status_msg(wc->status));
		close_sess(sess);
	}
	msg_id = mr->msg_id;
	off = mr->msg_off;
	data = page_address(srv->chunks[msg_id]) + off;
	process_io_req(con, data, msg_id, off);
}

/* Post a signalled local-invalidate of the chunk MR's rkey. */
static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con,
			     struct rtrs_srv_mr *mr)
{
	struct ib_send_wr wr = {
		.opcode		    = IB_WR_LOCAL_INV,
		.wr_cqe		    = &mr->inv_cqe,
		.send_flags	    = IB_SEND_SIGNALED,
		.ex.invalidate_rkey = mr->mr->rkey,
	};
	mr->inv_cqe.done = rtrs_srv_inv_rkey_done;

	return ib_post_send(con->c.qp, &wr, NULL);
}

/*
 * Retry responses parked on the connection's wait list when the send
 * queue was full.  Stops (re-parking the head) as soon as a retry
 * reports the queue is still full.
 */
static void rtrs_rdma_process_wr_wait_list(struct rtrs_srv_con *con)
{
	spin_lock(&con->rsp_wr_wait_lock);
	while (!list_empty(&con->rsp_wr_wait_list)) {
		struct rtrs_srv_op *id;
		int ret;

		id = list_entry(con->rsp_wr_wait_list.next,
				struct rtrs_srv_op, wait_list);
		list_del(&id->wait_list);

		spin_unlock(&con->rsp_wr_wait_lock);
		ret = rtrs_srv_resp_rdma(id, id->status);
		spin_lock(&con->rsp_wr_wait_lock);

		if (!ret) {
			list_add(&id->wait_list, &con->rsp_wr_wait_list);
			break;
		}
	}
	spin_unlock(&con->rsp_wr_wait_lock);
}

/*
 * Main IO-path completion handler: handles incoming IMM notifications
 * (new IO requests, heartbeat msg/ack) and send-side completions that
 * replenish sq_wr_avail and kick the parked-response wait list.
 */
static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);
	struct rtrs_srv *srv = sess->srv;
	u32 imm_type, imm_payload;
	int err;

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		if (wc->status != IB_WC_WR_FLUSH_ERR) {
			rtrs_err(s,
				 "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n",
				 ib_wc_status_msg(wc->status), wc->wr_cqe,
				 wc->opcode, wc->vendor_err, wc->byte_len);
			close_sess(sess);
		}
		return;
	}

	switch (wc->opcode) {
	case IB_WC_RECV_RDMA_WITH_IMM:
		/*
		 * post_recv() RDMA write completions of IO reqs (read/write)
		 * and hb
		 */
		if (WARN_ON(wc->wr_cqe != &io_comp_cqe))
			return;
		/* Replenish the empty receive we just consumed */
		err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
		if (unlikely(err)) {
			rtrs_err(s, "rtrs_post_recv(), err: %d\n", err);
			close_sess(sess);
			break;
		}
		rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
			      &imm_type, &imm_payload);
		if (likely(imm_type == RTRS_IO_REQ_IMM)) {
			u32 msg_id, off;
			void *data;

			/* IMM payload = chunk index + header offset */
			msg_id = imm_payload >> sess->mem_bits;
			off = imm_payload & ((1 << sess->mem_bits) - 1);
			if (unlikely(msg_id >= srv->queue_depth ||
				     off >= max_chunk_size)) {
				rtrs_err(s, "Wrong msg_id %u, off %u\n",
					 msg_id, off);
				close_sess(sess);
				return;
			}
			if (always_invalidate) {
				struct rtrs_srv_mr *mr = &sess->mrs[msg_id];

				/* Invalidate rkey first; processing resumes
				 * in rtrs_srv_inv_rkey_done().
				 */
				mr->msg_off = off;
				mr->msg_id = msg_id;
				err = rtrs_srv_inv_rkey(con, mr);
				if (unlikely(err)) {
					rtrs_err(s, "rtrs_post_recv(), err: %d\n",
						 err);
					close_sess(sess);
					break;
				}
			} else {
				data = page_address(srv->chunks[msg_id]) + off;
				process_io_req(con, data, msg_id, off);
			}
		} else if (imm_type == RTRS_HB_MSG_IMM) {
			WARN_ON(con->c.cid);
			rtrs_send_hb_ack(&sess->s);
		} else if (imm_type == RTRS_HB_ACK_IMM) {
			WARN_ON(con->c.cid);
			sess->s.hb_missed_cnt = 0;
		} else {
			rtrs_wrn(s, "Unknown IMM type %u\n", imm_type);
		}
		break;
	case IB_WC_RDMA_WRITE:
	case IB_WC_SEND:
		/*
		 * post_send() RDMA write completions of IO reqs (read/write)
		 */
		atomic_add(srv->queue_depth, &con->sq_wr_avail);

		if (unlikely(!list_empty_careful(&con->rsp_wr_wait_list)))
			rtrs_rdma_process_wr_wait_list(con);

		break;
	default:
		rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode);
		return;
	}
}

/**
 * rtrs_srv_get_sess_name() - Get rtrs_srv peer hostname.
 * @srv:	Session
 * @sessname:	Sessname buffer
 * @len:	Length of sessname buffer
 *
 * Copies the name of the first CONNECTED path.  Returns 0 on success,
 * -ENOTCONN when no path is connected.
 */
int rtrs_srv_get_sess_name(struct rtrs_srv *srv, char *sessname, size_t len)
{
	struct rtrs_srv_sess *sess;
	int err = -ENOTCONN;

	mutex_lock(&srv->paths_mutex);
	list_for_each_entry(sess, &srv->paths_list, s.entry) {
		if (sess->state != RTRS_SRV_CONNECTED)
			continue;
		strlcpy(sessname, sess->s.sessname,
		       min_t(size_t, sizeof(sess->s.sessname), len));
		err = 0;
		break;
	}
	mutex_unlock(&srv->paths_mutex);

	return err;
}
EXPORT_SYMBOL(rtrs_srv_get_sess_name);

/**
 * rtrs_srv_get_queue_depth() - Get rtrs_srv qdepth.
 * @srv:	Session
 */
int rtrs_srv_get_queue_depth(struct rtrs_srv *srv)
{
	return srv->queue_depth;
}
EXPORT_SYMBOL(rtrs_srv_get_queue_depth);

/*
 * Walk cq_affinity_mask round-robin, clamped to the device's number of
 * completion vectors; wraps back to the first eligible CPU.
 */
static int find_next_bit_ring(struct rtrs_srv_sess *sess)
{
	struct ib_device *ib_dev = sess->s.dev->ib_dev;
	int v;

	v = cpumask_next(sess->cur_cq_vector, &cq_affinity_mask);
	if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors)
		v = cpumask_first(&cq_affinity_mask);
	return v;
}

/* Advance and return the session's next CQ completion vector. */
static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_sess *sess)
{
	sess->cur_cq_vector = find_next_bit_ring(sess);

	return sess->cur_cq_vector;
}

/* device release callback: frees the embedded rtrs_srv on last put */
static void rtrs_srv_dev_release(struct device *dev)
{
	struct rtrs_srv *srv = container_of(dev, struct rtrs_srv, dev);

	kfree(srv);
}

/*
 * Release all srv resources: return chunks to the mempool, destroy the
 * mutexes, then drop the final device reference (which frees srv via
 * rtrs_srv_dev_release()).  Requires refcount already at zero.
 */
static void free_srv(struct rtrs_srv *srv)
{
	int i;

	WARN_ON(refcount_read(&srv->refcount));
	for (i = 0; i < srv->queue_depth; i++)
		mempool_free(srv->chunks[i], chunk_pool);
	kfree(srv->chunks);
	mutex_destroy(&srv->paths_mutex);
	mutex_destroy(&srv->paths_ev_mutex);
	/* last put to release the srv structure */
	put_device(&srv->dev);
}

static
struct rtrs_srv *get_or_create_srv(struct rtrs_srv_ctx *ctx, 1340 const uuid_t *paths_uuid, 1341 bool first_conn) 1342 { 1343 struct rtrs_srv *srv; 1344 int i; 1345 1346 mutex_lock(&ctx->srv_mutex); 1347 list_for_each_entry(srv, &ctx->srv_list, ctx_list) { 1348 if (uuid_equal(&srv->paths_uuid, paths_uuid) && 1349 refcount_inc_not_zero(&srv->refcount)) { 1350 mutex_unlock(&ctx->srv_mutex); 1351 return srv; 1352 } 1353 } 1354 mutex_unlock(&ctx->srv_mutex); 1355 /* 1356 * If this request is not the first connection request from the 1357 * client for this session then fail and return error. 1358 */ 1359 if (!first_conn) { 1360 pr_err_ratelimited("Error: Not the first connection request for this session\n"); 1361 return ERR_PTR(-ENXIO); 1362 } 1363 1364 /* need to allocate a new srv */ 1365 srv = kzalloc(sizeof(*srv), GFP_KERNEL); 1366 if (!srv) 1367 return ERR_PTR(-ENOMEM); 1368 1369 INIT_LIST_HEAD(&srv->paths_list); 1370 mutex_init(&srv->paths_mutex); 1371 mutex_init(&srv->paths_ev_mutex); 1372 uuid_copy(&srv->paths_uuid, paths_uuid); 1373 srv->queue_depth = sess_queue_depth; 1374 srv->ctx = ctx; 1375 device_initialize(&srv->dev); 1376 srv->dev.release = rtrs_srv_dev_release; 1377 1378 srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks), 1379 GFP_KERNEL); 1380 if (!srv->chunks) 1381 goto err_free_srv; 1382 1383 for (i = 0; i < srv->queue_depth; i++) { 1384 srv->chunks[i] = mempool_alloc(chunk_pool, GFP_KERNEL); 1385 if (!srv->chunks[i]) 1386 goto err_free_chunks; 1387 } 1388 refcount_set(&srv->refcount, 1); 1389 mutex_lock(&ctx->srv_mutex); 1390 list_add(&srv->ctx_list, &ctx->srv_list); 1391 mutex_unlock(&ctx->srv_mutex); 1392 1393 return srv; 1394 1395 err_free_chunks: 1396 while (i--) 1397 mempool_free(srv->chunks[i], chunk_pool); 1398 kfree(srv->chunks); 1399 1400 err_free_srv: 1401 kfree(srv); 1402 return ERR_PTR(-ENOMEM); 1403 } 1404 1405 static void put_srv(struct rtrs_srv *srv) 1406 { 1407 if (refcount_dec_and_test(&srv->refcount)) { 1408 struct 
rtrs_srv_ctx *ctx = srv->ctx; 1409 1410 WARN_ON(srv->dev.kobj.state_in_sysfs); 1411 1412 mutex_lock(&ctx->srv_mutex); 1413 list_del(&srv->ctx_list); 1414 mutex_unlock(&ctx->srv_mutex); 1415 free_srv(srv); 1416 } 1417 } 1418 1419 static void __add_path_to_srv(struct rtrs_srv *srv, 1420 struct rtrs_srv_sess *sess) 1421 { 1422 list_add_tail(&sess->s.entry, &srv->paths_list); 1423 srv->paths_num++; 1424 WARN_ON(srv->paths_num >= MAX_PATHS_NUM); 1425 } 1426 1427 static void del_path_from_srv(struct rtrs_srv_sess *sess) 1428 { 1429 struct rtrs_srv *srv = sess->srv; 1430 1431 if (WARN_ON(!srv)) 1432 return; 1433 1434 mutex_lock(&srv->paths_mutex); 1435 list_del(&sess->s.entry); 1436 WARN_ON(!srv->paths_num); 1437 srv->paths_num--; 1438 mutex_unlock(&srv->paths_mutex); 1439 } 1440 1441 /* return true if addresses are the same, error other wise */ 1442 static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b) 1443 { 1444 switch (a->sa_family) { 1445 case AF_IB: 1446 return memcmp(&((struct sockaddr_ib *)a)->sib_addr, 1447 &((struct sockaddr_ib *)b)->sib_addr, 1448 sizeof(struct ib_addr)) && 1449 (b->sa_family == AF_IB); 1450 case AF_INET: 1451 return memcmp(&((struct sockaddr_in *)a)->sin_addr, 1452 &((struct sockaddr_in *)b)->sin_addr, 1453 sizeof(struct in_addr)) && 1454 (b->sa_family == AF_INET); 1455 case AF_INET6: 1456 return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr, 1457 &((struct sockaddr_in6 *)b)->sin6_addr, 1458 sizeof(struct in6_addr)) && 1459 (b->sa_family == AF_INET6); 1460 default: 1461 return -ENOENT; 1462 } 1463 } 1464 1465 static bool __is_path_w_addr_exists(struct rtrs_srv *srv, 1466 struct rdma_addr *addr) 1467 { 1468 struct rtrs_srv_sess *sess; 1469 1470 list_for_each_entry(sess, &srv->paths_list, s.entry) 1471 if (!sockaddr_cmp((struct sockaddr *)&sess->s.dst_addr, 1472 (struct sockaddr *)&addr->dst_addr) && 1473 !sockaddr_cmp((struct sockaddr *)&sess->s.src_addr, 1474 (struct sockaddr *)&addr->src_addr)) 1475 return true; 1476 
	return false;
}

/*
 * Final free of a session: once it has been exposed in sysfs the
 * kobject owns the lifetime (its release callback presumably frees
 * sess — confirm in rtrs-srv-sysfs.c), otherwise plain kfree().
 */
static void free_sess(struct rtrs_srv_sess *sess)
{
	if (sess->kobj.state_in_sysfs) {
		kobject_del(&sess->kobj);
		kobject_put(&sess->kobj);
	} else {
		kfree(sess);
	}
}

/*
 * Session teardown, run from rtrs_wq.  The order matters: first stop
 * sysfs and heartbeats, disconnect and drain every QP so no new
 * completions arrive, wait for inflight ops, notify the upper layer,
 * then free resources and finally the session itself.
 */
static void rtrs_srv_close_work(struct work_struct *work)
{
	struct rtrs_srv_sess *sess;
	struct rtrs_srv_con *con;
	int i;

	sess = container_of(work, typeof(*sess), close_work);

	rtrs_srv_destroy_sess_files(sess);
	rtrs_srv_stop_hb(sess);

	for (i = 0; i < sess->s.con_num; i++) {
		if (!sess->s.con[i])
			continue;
		con = to_srv_con(sess->s.con[i]);
		rdma_disconnect(con->c.cm_id);
		ib_drain_qp(con->c.qp);
	}
	/* Wait for all inflights */
	rtrs_srv_wait_ops_ids(sess);

	/* Notify upper layer if we are the last path */
	rtrs_srv_sess_down(sess);

	unmap_cont_bufs(sess);
	rtrs_srv_free_ops_ids(sess);

	for (i = 0; i < sess->s.con_num; i++) {
		if (!sess->s.con[i])
			continue;
		con = to_srv_con(sess->s.con[i]);
		rtrs_cq_qp_destroy(&con->c);
		rdma_destroy_id(con->c.cm_id);
		kfree(con);
	}
	rtrs_ib_dev_put(sess->s.dev);

	del_path_from_srv(sess);
	put_srv(sess->srv);
	sess->srv = NULL;
	rtrs_srv_change_state(sess, RTRS_SRV_CLOSED);

	kfree(sess->dma_addr);
	kfree(sess->s.con);
	free_sess(sess);
}

/*
 * Accept an incoming CM connection, advertising our queue depth, max
 * I/O size and (optionally) the new-rkey flag in the private data.
 */
static int rtrs_rdma_do_accept(struct rtrs_srv_sess *sess,
			       struct rdma_cm_id *cm_id)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_msg_conn_rsp msg;
	struct rdma_conn_param param;
	int err;

	param = (struct rdma_conn_param) {
		.rnr_retry_count = 7,
		.private_data = &msg,
		.private_data_len = sizeof(msg),
	};

	msg = (struct rtrs_msg_conn_rsp) {
		.magic = cpu_to_le16(RTRS_MAGIC),
		.version = cpu_to_le16(RTRS_PROTO_VER),
		.queue_depth = cpu_to_le16(srv->queue_depth),
		.max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE),
		.max_hdr_size = cpu_to_le32(MAX_HDR_SIZE),
	};

	if (always_invalidate)
		msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F);

	err = rdma_accept(cm_id, &param);
	if (err)
		pr_err("rdma_accept(), err: %d\n", err);

	return err;
}

/*
 * Reject the CM request, carrying @errno to the client in the private
 * data; returns @errno so callers can "return rtrs_rdma_do_reject()".
 */
static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno)
{
	struct rtrs_msg_conn_rsp msg;
	int err;

	msg = (struct rtrs_msg_conn_rsp) {
		.magic = cpu_to_le16(RTRS_MAGIC),
		.version = cpu_to_le16(RTRS_PROTO_VER),
		.errno = cpu_to_le16(errno),
	};

	err = rdma_reject(cm_id, &msg, sizeof(msg), IB_CM_REJ_CONSUMER_DEFINED);
	if (err)
		pr_err("rdma_reject(), err: %d\n", err);

	/* Bounce errno back */
	return errno;
}

/* Caller holds srv->paths_mutex (see rtrs_rdma_connect()). */
static struct rtrs_srv_sess *
__find_sess(struct rtrs_srv *srv, const uuid_t *sess_uuid)
{
	struct rtrs_srv_sess *sess;

	list_for_each_entry(sess, &srv->paths_list, s.entry) {
		if (uuid_equal(&sess->s.uuid, sess_uuid))
			return sess;
	}

	return NULL;
}

/*
 * Create connection @cid of the session: allocate the con, size its
 * CQ/SQ (the service con cid 0 is small, data cons are sized from the
 * queue depth and device limits), create CQ+QP and, for cid 0, post
 * the initial info-request receive.
 */
static int create_con(struct rtrs_srv_sess *sess,
		      struct rdma_cm_id *cm_id,
		      unsigned int cid)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_sess *s = &sess->s;
	struct rtrs_srv_con *con;

	u32 cq_size, wr_queue_size;
	int err, cq_vector;

	con = kzalloc(sizeof(*con), GFP_KERNEL);
	if (!con) {
		err = -ENOMEM;
		goto err;
	}

	spin_lock_init(&con->rsp_wr_wait_lock);
	INIT_LIST_HEAD(&con->rsp_wr_wait_list);
	con->c.cm_id = cm_id;
	con->c.sess = &sess->s;
	con->c.cid = cid;
	atomic_set(&con->wr_cnt, 1);

	if (con->c.cid == 0) {
		/*
		 * All receive and all send (each requiring invalidate)
		 * + 2 for drain and heartbeat
		 */
		wr_queue_size = SERVICE_CON_QUEUE_DEPTH * 3 + 2;
		cq_size = wr_queue_size;
	} else {
		/*
		 * If we have all receive requests posted and
		 * all write requests posted and each read request
		 * requires an invalidate request + drain
		 * and qp gets into error state.
		 */
		cq_size = srv->queue_depth * 3 + 1;
		/*
		 * In theory we might have queue_depth * 32
		 * outstanding requests if an unsafe global key is used
		 * and we have queue_depth read requests each consisting
		 * of 32 different addresses. div 3 for mlx5.
		 */
		wr_queue_size = sess->s.dev->ib_dev->attrs.max_qp_wr / 3;
	}
	atomic_set(&con->sq_wr_avail, wr_queue_size);
	cq_vector = rtrs_srv_get_next_cq_vector(sess);

	/* TODO: SOFTIRQ can be faster, but be careful with softirq context */
	err = rtrs_cq_qp_create(&sess->s, &con->c, 1, cq_vector, cq_size,
				wr_queue_size, wr_queue_size,
				IB_POLL_WORKQUEUE);
	if (err) {
		rtrs_err(s, "rtrs_cq_qp_create(), err: %d\n", err);
		goto free_con;
	}
	if (con->c.cid == 0) {
		err = post_recv_info_req(con);
		if (err)
			goto free_cqqp;
	}
	WARN_ON(sess->s.con[cid]);
	sess->s.con[cid] = &con->c;

	/*
	 * Change context from server to current connection.  The other
	 * way is to use cm_id->qp->qp_context, which does not work on OFED.
	 */
	cm_id->context = &con->c;

	return 0;

free_cqqp:
	rtrs_cq_qp_destroy(&con->c);
free_con:
	kfree(con);

err:
	return err;
}

/*
 * Allocate and initialize a new session (path) for @srv, rejecting it
 * when the path limit is reached or an identical address pair already
 * exists.  On success the path is linked into srv->paths_list.
 * Caller holds srv->paths_mutex.
 */
static struct rtrs_srv_sess *__alloc_sess(struct rtrs_srv *srv,
					  struct rdma_cm_id *cm_id,
					  unsigned int con_num,
					  unsigned int recon_cnt,
					  const uuid_t *uuid)
{
	struct rtrs_srv_sess *sess;
	int err = -ENOMEM;
	char str[NAME_MAX];
	struct rtrs_addr path;

	if (srv->paths_num >= MAX_PATHS_NUM) {
		err = -ECONNRESET;
		goto err;
	}
	if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) {
		err = -EEXIST;
		pr_err("Path with same addr exists\n");
		goto err;
	}
	sess = kzalloc(sizeof(*sess), GFP_KERNEL);
	if (!sess)
		goto err;

	sess->stats = kzalloc(sizeof(*sess->stats), GFP_KERNEL);
	if (!sess->stats)
		goto err_free_sess;

	sess->stats->sess = sess;

	sess->dma_addr = kcalloc(srv->queue_depth, sizeof(*sess->dma_addr),
				 GFP_KERNEL);
	if (!sess->dma_addr)
		goto err_free_stats;

	sess->s.con = kcalloc(con_num, sizeof(*sess->s.con), GFP_KERNEL);
	if (!sess->s.con)
		goto err_free_dma_addr;

	sess->state = RTRS_SRV_CONNECTING;
	sess->srv = srv;
	sess->cur_cq_vector = -1;
	sess->s.dst_addr = cm_id->route.addr.dst_addr;
	sess->s.src_addr = cm_id->route.addr.src_addr;

	/* temporary until receiving session-name from client */
	path.src = &sess->s.src_addr;
	path.dst = &sess->s.dst_addr;
	rtrs_addr_to_str(&path, str, sizeof(str));
	strlcpy(sess->s.sessname, str, sizeof(sess->s.sessname));

	sess->s.con_num = con_num;
	sess->s.recon_cnt = recon_cnt;
	uuid_copy(&sess->s.uuid, uuid);
	spin_lock_init(&sess->state_lock);
	INIT_WORK(&sess->close_work, rtrs_srv_close_work);
	rtrs_srv_init_hb(sess);

	sess->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd);
	if (!sess->s.dev) {
		err = -ENOMEM;
		goto err_free_con;
	}
	err = map_cont_bufs(sess);
	if (err)
		goto err_put_dev;

	err = rtrs_srv_alloc_ops_ids(sess);
	if (err)
		goto err_unmap_bufs;

	__add_path_to_srv(srv, sess);

	return sess;

err_unmap_bufs:
	unmap_cont_bufs(sess);
err_put_dev:
	rtrs_ib_dev_put(sess->s.dev);
err_free_con:
	kfree(sess->s.con);
err_free_dma_addr:
	kfree(sess->dma_addr);
err_free_stats:
	kfree(sess->stats);
err_free_sess:
	kfree(sess);
err:
	return ERR_PTR(err);
}

/*
 * Handle an RDMA_CM_EVENT_CONNECT_REQUEST: validate the client's
 * private data, find or create the srv and session, create the
 * requested connection and accept or reject it.  Returns 0 when cma.c
 * must not destroy the cm_id, non-zero when it must.
 */
static int rtrs_rdma_connect(struct rdma_cm_id *cm_id,
			     const struct rtrs_msg_conn_req *msg,
			     size_t len)
{
	struct rtrs_srv_ctx *ctx = cm_id->context;
	struct rtrs_srv_sess *sess;
	struct rtrs_srv *srv;

	u16 version, con_num, cid;
	u16 recon_cnt;
	int err = -ECONNRESET;

	if (len < sizeof(*msg)) {
		pr_err("Invalid RTRS connection request\n");
		goto reject_w_err;
	}
	if (le16_to_cpu(msg->magic) != RTRS_MAGIC) {
		pr_err("Invalid RTRS magic\n");
		goto reject_w_err;
	}
	version = le16_to_cpu(msg->version);
	if (version >> 8 != RTRS_PROTO_VER_MAJOR) {
		pr_err("Unsupported major RTRS version: %d, expected %d\n",
		       version >> 8, RTRS_PROTO_VER_MAJOR);
		goto reject_w_err;
	}
	con_num = le16_to_cpu(msg->cid_num);
	if (con_num > 4096) {
		/* Sanity check */
		pr_err("Too many connections requested: %d\n", con_num);
		goto reject_w_err;
	}
	cid = le16_to_cpu(msg->cid);
	if (cid >= con_num) {
		/* Sanity check */
		pr_err("Incorrect cid: %d >= %d\n", cid, con_num);
		goto reject_w_err;
	}
	recon_cnt = le16_to_cpu(msg->recon_cnt);
	srv = get_or_create_srv(ctx, &msg->paths_uuid, msg->first_conn);
	if (IS_ERR(srv)) {
		err = PTR_ERR(srv);
		pr_err("get_or_create_srv(), error %d\n", err);
		goto reject_w_err;
	}
	mutex_lock(&srv->paths_mutex);
	sess = __find_sess(srv, &msg->sess_uuid);
	if (sess) {
		struct rtrs_sess *s = &sess->s;

		/* Session already holds a reference */
		put_srv(srv);

		if (sess->state != RTRS_SRV_CONNECTING) {
			rtrs_err(s, "Session in wrong state: %s\n",
				 rtrs_srv_state_str(sess->state));
			mutex_unlock(&srv->paths_mutex);
			goto reject_w_err;
		}
		/*
		 * Sanity checks
		 */
		if (con_num != s->con_num || cid >= s->con_num) {
			rtrs_err(s, "Incorrect request: %d, %d\n",
				 cid, con_num);
			mutex_unlock(&srv->paths_mutex);
			goto reject_w_err;
		}
		if (s->con[cid]) {
			rtrs_err(s, "Connection already exists: %d\n",
				 cid);
			mutex_unlock(&srv->paths_mutex);
			goto reject_w_err;
		}
	} else {
		sess = __alloc_sess(srv, cm_id, con_num, recon_cnt,
				    &msg->sess_uuid);
		if (IS_ERR(sess)) {
			mutex_unlock(&srv->paths_mutex);
			put_srv(srv);
			err = PTR_ERR(sess);
			pr_err("RTRS server session allocation failed: %d\n", err);
			goto reject_w_err;
		}
	}
	err = create_con(sess, cm_id, cid);
	if (err) {
		rtrs_err((&sess->s), "create_con(), error %d\n", err);
		(void)rtrs_rdma_do_reject(cm_id, err);
		/*
		 * Since session has other connections we follow normal way
		 * through workqueue, but still return an error to tell cma.c
		 * to call rdma_destroy_id() for current connection.
		 */
		goto close_and_return_err;
	}
	err = rtrs_rdma_do_accept(sess, cm_id);
	if (err) {
		rtrs_err((&sess->s), "rtrs_rdma_do_accept(), error %d\n", err);
		(void)rtrs_rdma_do_reject(cm_id, err);
		/*
		 * Since current connection was successfully added to the
		 * session we follow normal way through workqueue to close the
		 * session, thus return 0 to tell cma.c we call
		 * rdma_destroy_id() ourselves.
		 */
		err = 0;
		goto close_and_return_err;
	}
	mutex_unlock(&srv->paths_mutex);

	return 0;

reject_w_err:
	return rtrs_rdma_do_reject(cm_id, err);

close_and_return_err:
	mutex_unlock(&srv->paths_mutex);
	close_sess(sess);

	return err;
}

/*
 * RDMA CM event handler.  For CONNECT_REQUEST events cm_id->context is
 * still the listening context; for every later event it has been
 * repointed to the connection by create_con().
 */
static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id,
				    struct rdma_cm_event *ev)
{
	struct rtrs_srv_sess *sess = NULL;
	struct rtrs_sess *s = NULL;

	if (ev->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
		struct rtrs_con *c = cm_id->context;

		s = c->sess;
		sess = to_srv_sess(s);
	}

	switch (ev->event) {
	case RDMA_CM_EVENT_CONNECT_REQUEST:
		/*
		 * In case of error cma.c will destroy cm_id,
		 * see cma_process_remove()
		 */
		return rtrs_rdma_connect(cm_id, ev->param.conn.private_data,
					 ev->param.conn.private_data_len);
	case RDMA_CM_EVENT_ESTABLISHED:
		/* Nothing here */
		break;
	case RDMA_CM_EVENT_REJECTED:
	case RDMA_CM_EVENT_CONNECT_ERROR:
	case RDMA_CM_EVENT_UNREACHABLE:
		rtrs_err(s, "CM error (CM event: %s, err: %d)\n",
			 rdma_event_msg(ev->event), ev->status);
		fallthrough;
	case RDMA_CM_EVENT_DISCONNECTED:
	case RDMA_CM_EVENT_ADDR_CHANGE:
	case RDMA_CM_EVENT_TIMEWAIT_EXIT:
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		close_sess(sess);
		break;
	default:
		pr_err("Ignoring unexpected CM event %s, err %d\n",
		       rdma_event_msg(ev->event), ev->status);
		break;
	}

	return 0;
}

/* Create a listening CM id for @addr in port space @ps. */
static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx,
					   struct sockaddr *addr,
					   enum rdma_ucm_port_space ps)
{
	struct rdma_cm_id *cm_id;
	int ret;

	cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler,
			       ctx, ps, IB_QPT_RC);
	if (IS_ERR(cm_id)) {
		ret = PTR_ERR(cm_id);
		pr_err("Creating id for RDMA connection failed, err: %d\n",
		       ret);
		goto err_out;
	}
	ret = rdma_bind_addr(cm_id, addr);
	if (ret) {
		pr_err("Binding RDMA address failed, err: %d\n", ret);
		goto err_cm;
	}
	/* Backlog of 64 pending connect requests. */
	ret = rdma_listen(cm_id, 64);
	if (ret) {
		pr_err("Listening on RDMA connection failed, err: %d\n",
		       ret);
		goto err_cm;
	}

	return cm_id;

err_cm:
	rdma_destroy_id(cm_id);
err_out:

	return ERR_PTR(ret);
}

/*
 * Create both listeners for @port: an IPv6 any-address TCP port-space
 * id (also accepts IPv4 via mapped addresses — presumably, as only the
 * AF_INET6 wildcard is bound) and a native AF_IB id.
 */
static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, u16 port)
{
	struct sockaddr_in6 sin = {
		.sin6_family	= AF_INET6,
		.sin6_addr	= IN6ADDR_ANY_INIT,
		.sin6_port	= htons(port),
	};
	struct sockaddr_ib sib = {
		.sib_family			= AF_IB,
		.sib_sid	= cpu_to_be64(RDMA_IB_IP_PS_IB | port),
		.sib_sid_mask	= cpu_to_be64(0xffffffffffffffffULL),
		.sib_pkey	= cpu_to_be16(0xffff),
	};
	struct rdma_cm_id *cm_ip, *cm_ib;
	int ret;

	/*
	 * We accept both IPoIB and IB connections, so we need to keep
	 * two cm id's, one for each socket type and port space.
	 * If the cm initialization of one of the id's fails, we abort
	 * everything.
	 */
	cm_ip = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sin, RDMA_PS_TCP);
	if (IS_ERR(cm_ip))
		return PTR_ERR(cm_ip);

	cm_ib = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sib, RDMA_PS_IB);
	if (IS_ERR(cm_ib)) {
		ret = PTR_ERR(cm_ib);
		goto free_cm_ip;
	}

	ctx->cm_id_ip = cm_ip;
	ctx->cm_id_ib = cm_ib;

	return 0;

free_cm_ip:
	rdma_destroy_id(cm_ip);

	return ret;
}

/* Allocate the server context; @ops is copied by value. */
static struct rtrs_srv_ctx *alloc_srv_ctx(struct rtrs_srv_ops *ops)
{
	struct rtrs_srv_ctx *ctx;

	ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
	if (!ctx)
		return NULL;

	ctx->ops = *ops;
	mutex_init(&ctx->srv_mutex);
	INIT_LIST_HEAD(&ctx->srv_list);

	return ctx;
}

static void free_srv_ctx(struct rtrs_srv_ctx *ctx)
{
	WARN_ON(!list_empty(&ctx->srv_list));
	mutex_destroy(&ctx->srv_mutex);
	kfree(ctx);
}

/*
 * ib_client "add" callback: the CM listeners are global (not bound to
 * a device), so they are created only when the first device appears;
 * later devices just bump the count.
 */
static int rtrs_srv_add_one(struct ib_device *device)
{
	struct rtrs_srv_ctx *ctx;
	int ret = 0;

	mutex_lock(&ib_ctx.ib_dev_mutex);
	if (ib_ctx.ib_dev_count)
		goto out;

	/*
	 * Since our CM IDs are NOT bound to any ib device we will create them
	 * only once
	 */
	ctx = ib_ctx.srv_ctx;
	ret = rtrs_srv_rdma_init(ctx, ib_ctx.port);
	if (ret) {
		/*
		 * We errored out here.
		 * According to the ib code, if we encounter an error here then the
		 * error code is ignored, and no more calls to our ops are made.
		 */
		pr_err("Failed to initialize RDMA connection");
		goto err_out;
	}

out:
	/*
	 * Keep a track on the number of ib devices added
	 */
	ib_ctx.ib_dev_count++;

	/* On success we fall through: err_out is the common unlock path. */
err_out:
	mutex_unlock(&ib_ctx.ib_dev_mutex);
	return ret;
}

/*
 * ib_client "remove" callback: mirror of rtrs_srv_add_one() — the
 * global CM listeners are destroyed only when the last device goes.
 */
static void rtrs_srv_remove_one(struct ib_device *device, void *client_data)
{
	struct rtrs_srv_ctx *ctx;

	mutex_lock(&ib_ctx.ib_dev_mutex);
	ib_ctx.ib_dev_count--;

	if (ib_ctx.ib_dev_count)
		goto out;

	/*
	 * Since our CM IDs are NOT bound to any ib device we will remove them
	 * only once, when the last device is removed
	 */
	ctx = ib_ctx.srv_ctx;
	rdma_destroy_id(ctx->cm_id_ip);
	rdma_destroy_id(ctx->cm_id_ib);

out:
	mutex_unlock(&ib_ctx.ib_dev_mutex);
}

static struct ib_client rtrs_srv_client = {
	.name	= "rtrs_server",
	.add	= rtrs_srv_add_one,
	.remove	= rtrs_srv_remove_one
};

/**
 * rtrs_srv_open() - open RTRS server context
 * @ops:		callback functions
 * @port:		port to listen on
 *
 * Creates server context with specified callbacks.
 *
 * Return a valid pointer on success otherwise PTR_ERR.
 */
struct rtrs_srv_ctx *rtrs_srv_open(struct rtrs_srv_ops *ops, u16 port)
{
	struct rtrs_srv_ctx *ctx;
	int err;

	ctx = alloc_srv_ctx(ops);
	if (!ctx)
		return ERR_PTR(-ENOMEM);

	mutex_init(&ib_ctx.ib_dev_mutex);
	ib_ctx.srv_ctx = ctx;
	ib_ctx.port = port;

	err = ib_register_client(&rtrs_srv_client);
	if (err) {
		/*
		 * NOTE(review): ib_dev_mutex stays initialized on this error
		 * path (no mutex_destroy) — confirm this asymmetry with
		 * rtrs_srv_close() is acceptable.
		 */
		free_srv_ctx(ctx);
		return ERR_PTR(err);
	}

	return ctx;
}
EXPORT_SYMBOL(rtrs_srv_open);

/* Queue close work for every path of @srv. */
static void close_sessions(struct rtrs_srv *srv)
{
	struct rtrs_srv_sess *sess;

	mutex_lock(&srv->paths_mutex);
	list_for_each_entry(sess, &srv->paths_list, s.entry)
		close_sess(sess);
	mutex_unlock(&srv->paths_mutex);
}

/* Close every session of every srv, then wait for the close works. */
static void close_ctx(struct rtrs_srv_ctx *ctx)
{
	struct rtrs_srv *srv;

	mutex_lock(&ctx->srv_mutex);
	list_for_each_entry(srv, &ctx->srv_list, ctx_list)
		close_sessions(srv);
	mutex_unlock(&ctx->srv_mutex);
	flush_workqueue(rtrs_wq);
}

/**
 * rtrs_srv_close() - close RTRS server context
 * @ctx: pointer to server context
 *
 * Closes RTRS server context with all client sessions.
 */
void rtrs_srv_close(struct rtrs_srv_ctx *ctx)
{
	/*
	 * Unregister first so no new devices (and thus no new listeners)
	 * appear; remove_one has finished by the time this returns, so
	 * the mutex is no longer contended when destroyed.
	 */
	ib_unregister_client(&rtrs_srv_client);
	mutex_destroy(&ib_ctx.ib_dev_mutex);
	close_ctx(ctx);
	free_srv_ctx(ctx);
}
EXPORT_SYMBOL(rtrs_srv_close);

/*
 * Validate the module parameters: queue depth in [1, MAX], chunk size
 * a power of two >= 4K, and both small enough that chunk id + offset
 * fit into the RDMA immediate payload.
 */
static int check_module_params(void)
{
	if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) {
		pr_err("Invalid sess_queue_depth value %d, has to be >= %d, <= %d.\n",
		       sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH);
		return -EINVAL;
	}
	if (max_chunk_size < 4096 || !is_power_of_2(max_chunk_size)) {
		pr_err("Invalid max_chunk_size value %d, has to be >= %d and should be power of two.\n",
		       max_chunk_size, 4096);
		return -EINVAL;
	}

	/*
	 * Check if IB immediate data size is enough to hold the mem_id and the
	 * offset inside the memory chunk
	 */
	if ((ilog2(sess_queue_depth - 1) + 1) +
	    (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) {
		pr_err("RDMA immediate size (%db) not enough to encode %d buffers of size %dB. Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n",
		       MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size);
		return -EINVAL;
	}

	return 0;
}

/*
 * Module init: validate parameters, then set up the shared chunk
 * mempool (pre-sized for CHUNK_POOL_SZ sessions), the sysfs class and
 * the close-work workqueue.
 */
static int __init rtrs_server_init(void)
{
	int err;

	pr_info("Loading module %s, proto %s: (max_chunk_size: %d (pure IO %ld, headers %ld) , sess_queue_depth: %d, always_invalidate: %d)\n",
		KBUILD_MODNAME, RTRS_PROTO_VER_STRING,
		max_chunk_size, max_chunk_size - MAX_HDR_SIZE, MAX_HDR_SIZE,
		sess_queue_depth, always_invalidate);

	rtrs_rdma_dev_pd_init(0, &dev_pd);

	err = check_module_params();
	if (err) {
		/*
		 * NOTE(review): dev_pd was already initialized above and is
		 * not deinitialized on this early return — confirm
		 * rtrs_rdma_dev_pd_init() acquires nothing that needs undoing.
		 */
		pr_err("Failed to load module, invalid module parameters, err: %d\n",
		       err);
		return err;
	}
	chunk_pool = mempool_create_page_pool(sess_queue_depth * CHUNK_POOL_SZ,
					      get_order(max_chunk_size));
	if (!chunk_pool)
		return -ENOMEM;
	rtrs_dev_class = class_create(THIS_MODULE, "rtrs-server");
	if (IS_ERR(rtrs_dev_class)) {
		err = PTR_ERR(rtrs_dev_class);
		goto out_chunk_pool;
	}
	rtrs_wq = alloc_workqueue("rtrs_server_wq", 0, 0);
	if (!rtrs_wq) {
		err = -ENOMEM;
		goto out_dev_class;
	}

	return 0;

out_dev_class:
	class_destroy(rtrs_dev_class);
out_chunk_pool:
	mempool_destroy(chunk_pool);

	return err;
}

/* Module exit: tear down in reverse order of rtrs_server_init(). */
static void __exit rtrs_server_exit(void)
{
	destroy_workqueue(rtrs_wq);
	class_destroy(rtrs_dev_class);
	mempool_destroy(chunk_pool);
	rtrs_rdma_dev_pd_deinit(&dev_pd);
}

module_init(rtrs_server_init);
module_exit(rtrs_server_exit);