1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* 3 * RDMA Transport Layer 4 * 5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved. 6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved. 7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved. 8 */ 9 10 #undef pr_fmt 11 #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt 12 13 #include <linux/module.h> 14 #include <linux/mempool.h> 15 16 #include "rtrs-srv.h" 17 #include "rtrs-log.h" 18 #include <rdma/ib_cm.h> 19 #include <rdma/ib_verbs.h> 20 21 MODULE_DESCRIPTION("RDMA Transport Server"); 22 MODULE_LICENSE("GPL"); 23 24 /* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */ 25 #define DEFAULT_MAX_CHUNK_SIZE (128 << 10) 26 #define DEFAULT_SESS_QUEUE_DEPTH 512 27 #define MAX_HDR_SIZE PAGE_SIZE 28 29 /* We guarantee to serve 10 paths at least */ 30 #define CHUNK_POOL_SZ 10 31 32 static struct rtrs_rdma_dev_pd dev_pd; 33 static mempool_t *chunk_pool; 34 struct class *rtrs_dev_class; 35 static struct rtrs_srv_ib_ctx ib_ctx; 36 37 static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE; 38 static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH; 39 40 static bool always_invalidate = true; 41 module_param(always_invalidate, bool, 0444); 42 MODULE_PARM_DESC(always_invalidate, 43 "Invalidate memory registration for contiguous memory regions before accessing."); 44 45 module_param_named(max_chunk_size, max_chunk_size, int, 0444); 46 MODULE_PARM_DESC(max_chunk_size, 47 "Max size for each IO request, when change the unit is in byte (default: " 48 __stringify(DEFAULT_MAX_CHUNK_SIZE) "KB)"); 49 50 module_param_named(sess_queue_depth, sess_queue_depth, int, 0444); 51 MODULE_PARM_DESC(sess_queue_depth, 52 "Number of buffers for pending I/O requests to allocate per session. Maximum: " 53 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: " 54 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")"); 55 56 static cpumask_t cq_affinity_mask = { CPU_BITS_ALL }; 57 58 static struct workqueue_struct *rtrs_wq; 59 60 static inline struct rtrs_srv_con *to_srv_con(struct rtrs_con *c) 61 { 62 return container_of(c, struct rtrs_srv_con, c); 63 } 64 65 static inline struct rtrs_srv_sess *to_srv_sess(struct rtrs_sess *s) 66 { 67 return container_of(s, struct rtrs_srv_sess, s); 68 } 69 70 static bool __rtrs_srv_change_state(struct rtrs_srv_sess *sess, 71 enum rtrs_srv_state new_state) 72 { 73 enum rtrs_srv_state old_state; 74 bool changed = false; 75 76 lockdep_assert_held(&sess->state_lock); 77 old_state = sess->state; 78 switch (new_state) { 79 case RTRS_SRV_CONNECTED: 80 switch (old_state) { 81 case RTRS_SRV_CONNECTING: 82 changed = true; 83 fallthrough; 84 default: 85 break; 86 } 87 break; 88 case RTRS_SRV_CLOSING: 89 switch (old_state) { 90 case RTRS_SRV_CONNECTING: 91 case RTRS_SRV_CONNECTED: 92 changed = true; 93 fallthrough; 94 default: 95 break; 96 } 97 break; 98 case RTRS_SRV_CLOSED: 99 switch (old_state) { 100 case RTRS_SRV_CLOSING: 101 changed = true; 102 fallthrough; 103 default: 104 break; 105 } 106 break; 107 default: 108 break; 109 } 110 if (changed) 111 sess->state = new_state; 112 113 return changed; 114 } 115 116 static bool rtrs_srv_change_state(struct rtrs_srv_sess *sess, 117 enum rtrs_srv_state new_state) 118 { 119 bool changed; 120 121 spin_lock_irq(&sess->state_lock); 122 changed = __rtrs_srv_change_state(sess, new_state); 123 spin_unlock_irq(&sess->state_lock); 124 125 return changed; 126 } 127 128 static void free_id(struct rtrs_srv_op *id) 129 { 130 if (!id) 131 return; 132 kfree(id); 133 } 134 135 static void rtrs_srv_free_ops_ids(struct rtrs_srv_sess *sess) 136 { 137 struct rtrs_srv *srv = sess->srv; 138 int i; 139 140 WARN_ON(atomic_read(&sess->ids_inflight)); 141 if (sess->ops_ids) { 142 for (i = 0; i < srv->queue_depth; i++) 143 free_id(sess->ops_ids[i]); 144 kfree(sess->ops_ids); 145 sess->ops_ids = NULL; 146 } 147 } 148 149 static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc); 150 151 static struct ib_cqe io_comp_cqe = { 152 .done = rtrs_srv_rdma_done 153 }; 154 155 static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_sess *sess) 156 { 157 struct rtrs_srv *srv = sess->srv; 158 struct rtrs_srv_op *id; 159 int i; 160 161 sess->ops_ids = kcalloc(srv->queue_depth, sizeof(*sess->ops_ids), 162 GFP_KERNEL); 163 if (!sess->ops_ids) 164 goto err; 165 166 for (i = 0; i < srv->queue_depth; ++i) { 167 id = kzalloc(sizeof(*id), GFP_KERNEL); 168 if (!id) 169 goto err; 170 171 sess->ops_ids[i] = id; 172 } 173 init_waitqueue_head(&sess->ids_waitq); 174 atomic_set(&sess->ids_inflight, 0); 175 176 return 0; 177 178 err: 179 rtrs_srv_free_ops_ids(sess); 180 return -ENOMEM; 181 } 182 183 static inline void rtrs_srv_get_ops_ids(struct rtrs_srv_sess *sess) 184 { 185 atomic_inc(&sess->ids_inflight); 186 } 187 188 static inline void rtrs_srv_put_ops_ids(struct rtrs_srv_sess *sess) 189 { 190 if (atomic_dec_and_test(&sess->ids_inflight)) 191 wake_up(&sess->ids_waitq); 192 } 193 194 static void rtrs_srv_wait_ops_ids(struct rtrs_srv_sess *sess) 195 { 196 wait_event(sess->ids_waitq, !atomic_read(&sess->ids_inflight)); 197 } 198 199 200 static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc) 201 { 202 struct rtrs_srv_con *con = cq->cq_context; 203 struct rtrs_sess *s = con->c.sess; 204 struct rtrs_srv_sess *sess = to_srv_sess(s); 205 206 if (unlikely(wc->status != IB_WC_SUCCESS)) { 207 rtrs_err(s, "REG MR failed: %s\n", 208 ib_wc_status_msg(wc->status)); 209 close_sess(sess); 210 return; 211 } 212 } 213 214 static struct ib_cqe local_reg_cqe = { 215 .done = rtrs_srv_reg_mr_done 216 }; 217 218 static int rdma_write_sg(struct rtrs_srv_op *id) 219 { 220 struct rtrs_sess *s = id->con->c.sess; 221 struct rtrs_srv_sess *sess = to_srv_sess(s); 222 dma_addr_t dma_addr = sess->dma_addr[id->msg_id]; 223 struct rtrs_srv_mr *srv_mr; 224 struct rtrs_srv *srv = sess->srv; 225 struct ib_send_wr inv_wr; 226 struct ib_rdma_wr imm_wr; 227 struct ib_rdma_wr *wr = NULL; 228 enum ib_send_flags flags; 229 size_t sg_cnt; 230 int err, offset; 231 bool need_inval; 232 u32 rkey = 0; 233 struct ib_reg_wr rwr; 234 struct ib_sge *plist; 235 struct ib_sge list; 236 237 sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt); 238 need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F; 239 if (unlikely(sg_cnt != 1)) 240 return -EINVAL; 241 242 offset = 0; 243 244 wr = &id->tx_wr; 245 plist = &id->tx_sg; 246 plist->addr = dma_addr + offset; 247 plist->length = le32_to_cpu(id->rd_msg->desc[0].len); 248 249 /* WR will fail with length error 250 * if this is 0 251 */ 252 if (unlikely(plist->length == 0)) { 253 rtrs_err(s, "Invalid RDMA-Write sg list length 0\n"); 254 return -EINVAL; 255 } 256 257 plist->lkey = sess->s.dev->ib_pd->local_dma_lkey; 258 offset += plist->length; 259 260 wr->wr.sg_list = plist; 261 wr->wr.num_sge = 1; 262 wr->remote_addr = le64_to_cpu(id->rd_msg->desc[0].addr); 263 wr->rkey = le32_to_cpu(id->rd_msg->desc[0].key); 264 if (rkey == 0) 265 rkey = wr->rkey; 266 else 267 /* Only one key is actually used */ 268 WARN_ON_ONCE(rkey != wr->rkey); 269 270 wr->wr.opcode = IB_WR_RDMA_WRITE; 271 wr->wr.wr_cqe = &io_comp_cqe; 272 wr->wr.ex.imm_data = 0; 273 wr->wr.send_flags = 0; 274 275 if (need_inval && always_invalidate) { 276 wr->wr.next = &rwr.wr; 277 rwr.wr.next = &inv_wr; 278 inv_wr.next = &imm_wr.wr; 279 } else if (always_invalidate) { 280 wr->wr.next = &rwr.wr; 281 rwr.wr.next = &imm_wr.wr; 282 } else if (need_inval) { 283 wr->wr.next = &inv_wr; 284 inv_wr.next = &imm_wr.wr; 285 } else { 286 wr->wr.next = &imm_wr.wr; 287 } 288 /* 289 * From time to time we have to post signaled sends, 290 * or send queue will fill up and only QP reset can help. 291 */ 292 flags = (atomic_inc_return(&id->con->wr_cnt) % srv->queue_depth) ? 293 0 : IB_SEND_SIGNALED; 294 295 if (need_inval) { 296 inv_wr.sg_list = NULL; 297 inv_wr.num_sge = 0; 298 inv_wr.opcode = IB_WR_SEND_WITH_INV; 299 inv_wr.wr_cqe = &io_comp_cqe; 300 inv_wr.send_flags = 0; 301 inv_wr.ex.invalidate_rkey = rkey; 302 } 303 304 imm_wr.wr.next = NULL; 305 if (always_invalidate) { 306 struct rtrs_msg_rkey_rsp *msg; 307 308 srv_mr = &sess->mrs[id->msg_id]; 309 rwr.wr.opcode = IB_WR_REG_MR; 310 rwr.wr.wr_cqe = &local_reg_cqe; 311 rwr.wr.num_sge = 0; 312 rwr.mr = srv_mr->mr; 313 rwr.wr.send_flags = 0; 314 rwr.key = srv_mr->mr->rkey; 315 rwr.access = (IB_ACCESS_LOCAL_WRITE | 316 IB_ACCESS_REMOTE_WRITE); 317 msg = srv_mr->iu->buf; 318 msg->buf_id = cpu_to_le16(id->msg_id); 319 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP); 320 msg->rkey = cpu_to_le32(srv_mr->mr->rkey); 321 322 list.addr = srv_mr->iu->dma_addr; 323 list.length = sizeof(*msg); 324 list.lkey = sess->s.dev->ib_pd->local_dma_lkey; 325 imm_wr.wr.sg_list = &list; 326 imm_wr.wr.num_sge = 1; 327 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM; 328 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, 329 srv_mr->iu->dma_addr, 330 srv_mr->iu->size, DMA_TO_DEVICE); 331 } else { 332 imm_wr.wr.sg_list = NULL; 333 imm_wr.wr.num_sge = 0; 334 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM; 335 } 336 imm_wr.wr.send_flags = flags; 337 imm_wr.wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id, 338 0, need_inval)); 339 340 imm_wr.wr.wr_cqe = &io_comp_cqe; 341 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, dma_addr, 342 offset, DMA_BIDIRECTIONAL); 343 344 err = ib_post_send(id->con->c.qp, &id->tx_wr.wr, NULL); 345 if (unlikely(err)) 346 rtrs_err(s, 347 "Posting RDMA-Write-Request to QP failed, err: %d\n", 348 err); 349 350 return err; 351 } 352 353 /** 354 * send_io_resp_imm() - respond to client with empty IMM on failed READ/WRITE 355 * requests or on successful WRITE request. 356 * @con: the connection to send back result 357 * @id: the id associated with the IO 358 * @errno: the error number of the IO. 359 * 360 * Return 0 on success, errno otherwise. 361 */ 362 static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id, 363 int errno) 364 { 365 struct rtrs_sess *s = con->c.sess; 366 struct rtrs_srv_sess *sess = to_srv_sess(s); 367 struct ib_send_wr inv_wr, *wr = NULL; 368 struct ib_rdma_wr imm_wr; 369 struct ib_reg_wr rwr; 370 struct rtrs_srv *srv = sess->srv; 371 struct rtrs_srv_mr *srv_mr; 372 bool need_inval = false; 373 enum ib_send_flags flags; 374 u32 imm; 375 int err; 376 377 if (id->dir == READ) { 378 struct rtrs_msg_rdma_read *rd_msg = id->rd_msg; 379 size_t sg_cnt; 380 381 need_inval = le16_to_cpu(rd_msg->flags) & 382 RTRS_MSG_NEED_INVAL_F; 383 sg_cnt = le16_to_cpu(rd_msg->sg_cnt); 384 385 if (need_inval) { 386 if (likely(sg_cnt)) { 387 inv_wr.wr_cqe = &io_comp_cqe; 388 inv_wr.sg_list = NULL; 389 inv_wr.num_sge = 0; 390 inv_wr.opcode = IB_WR_SEND_WITH_INV; 391 inv_wr.send_flags = 0; 392 /* Only one key is actually used */ 393 inv_wr.ex.invalidate_rkey = 394 le32_to_cpu(rd_msg->desc[0].key); 395 } else { 396 WARN_ON_ONCE(1); 397 need_inval = false; 398 } 399 } 400 } 401 402 if (need_inval && always_invalidate) { 403 wr = &inv_wr; 404 inv_wr.next = &rwr.wr; 405 rwr.wr.next = &imm_wr.wr; 406 } else if (always_invalidate) { 407 wr = &rwr.wr; 408 rwr.wr.next = &imm_wr.wr; 409 } else if (need_inval) { 410 wr = &inv_wr; 411 inv_wr.next = &imm_wr.wr; 412 } else { 413 wr = &imm_wr.wr; 414 } 415 /* 416 * From time to time we have to post signalled sends, 417 * or send queue will fill up and only QP reset can help. 418 */ 419 flags = (atomic_inc_return(&con->wr_cnt) % srv->queue_depth) ? 420 0 : IB_SEND_SIGNALED; 421 imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval); 422 imm_wr.wr.next = NULL; 423 if (always_invalidate) { 424 struct ib_sge list; 425 struct rtrs_msg_rkey_rsp *msg; 426 427 srv_mr = &sess->mrs[id->msg_id]; 428 rwr.wr.next = &imm_wr.wr; 429 rwr.wr.opcode = IB_WR_REG_MR; 430 rwr.wr.wr_cqe = &local_reg_cqe; 431 rwr.wr.num_sge = 0; 432 rwr.wr.send_flags = 0; 433 rwr.mr = srv_mr->mr; 434 rwr.key = srv_mr->mr->rkey; 435 rwr.access = (IB_ACCESS_LOCAL_WRITE | 436 IB_ACCESS_REMOTE_WRITE); 437 msg = srv_mr->iu->buf; 438 msg->buf_id = cpu_to_le16(id->msg_id); 439 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP); 440 msg->rkey = cpu_to_le32(srv_mr->mr->rkey); 441 442 list.addr = srv_mr->iu->dma_addr; 443 list.length = sizeof(*msg); 444 list.lkey = sess->s.dev->ib_pd->local_dma_lkey; 445 imm_wr.wr.sg_list = &list; 446 imm_wr.wr.num_sge = 1; 447 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM; 448 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, 449 srv_mr->iu->dma_addr, 450 srv_mr->iu->size, DMA_TO_DEVICE); 451 } else { 452 imm_wr.wr.sg_list = NULL; 453 imm_wr.wr.num_sge = 0; 454 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM; 455 } 456 imm_wr.wr.send_flags = flags; 457 imm_wr.wr.wr_cqe = &io_comp_cqe; 458 459 imm_wr.wr.ex.imm_data = cpu_to_be32(imm); 460 461 err = ib_post_send(id->con->c.qp, wr, NULL); 462 if (unlikely(err)) 463 rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %d\n", 464 err); 465 466 return err; 467 } 468 469 void close_sess(struct rtrs_srv_sess *sess) 470 { 471 if (rtrs_srv_change_state(sess, RTRS_SRV_CLOSING)) 472 queue_work(rtrs_wq, &sess->close_work); 473 WARN_ON(sess->state != RTRS_SRV_CLOSING); 474 } 475 476 static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state) 477 { 478 switch (state) { 479 case RTRS_SRV_CONNECTING: 480 return "RTRS_SRV_CONNECTING"; 481 case RTRS_SRV_CONNECTED: 482 return "RTRS_SRV_CONNECTED"; 483 case RTRS_SRV_CLOSING: 484 return "RTRS_SRV_CLOSING"; 485 case RTRS_SRV_CLOSED: 486 return "RTRS_SRV_CLOSED"; 487 default: 488 return "UNKNOWN"; 489 } 490 } 491 492 /** 493 * rtrs_srv_resp_rdma() - Finish an RDMA request 494 * 495 * @id: Internal RTRS operation identifier 496 * @status: Response Code sent to the other side for this operation. 497 * 0 = success, <=0 error 498 * Context: any 499 * 500 * Finish a RDMA operation. A message is sent to the client and the 501 * corresponding memory areas will be released. 502 */ 503 bool rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status) 504 { 505 struct rtrs_srv_sess *sess; 506 struct rtrs_srv_con *con; 507 struct rtrs_sess *s; 508 int err; 509 510 if (WARN_ON(!id)) 511 return true; 512 513 con = id->con; 514 s = con->c.sess; 515 sess = to_srv_sess(s); 516 517 id->status = status; 518 519 if (unlikely(sess->state != RTRS_SRV_CONNECTED)) { 520 rtrs_err_rl(s, 521 "Sending I/O response failed, session is disconnected, sess state %s\n", 522 rtrs_srv_state_str(sess->state)); 523 goto out; 524 } 525 if (always_invalidate) { 526 struct rtrs_srv_mr *mr = &sess->mrs[id->msg_id]; 527 528 ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey)); 529 } 530 if (unlikely(atomic_sub_return(1, 531 &con->sq_wr_avail) < 0)) { 532 pr_err("IB send queue full\n"); 533 atomic_add(1, &con->sq_wr_avail); 534 spin_lock(&con->rsp_wr_wait_lock); 535 list_add_tail(&id->wait_list, &con->rsp_wr_wait_list); 536 spin_unlock(&con->rsp_wr_wait_lock); 537 return false; 538 } 539 540 if (status || id->dir == WRITE || !id->rd_msg->sg_cnt) 541 err = send_io_resp_imm(con, id, status); 542 else 543 err = rdma_write_sg(id); 544 545 if (unlikely(err)) { 546 rtrs_err_rl(s, "IO response failed: %d\n", err); 547 close_sess(sess); 548 } 549 out: 550 rtrs_srv_put_ops_ids(sess); 551 return true; 552 } 553 EXPORT_SYMBOL(rtrs_srv_resp_rdma); 554 555 /** 556 * rtrs_srv_set_sess_priv() - Set private pointer in rtrs_srv. 557 * @srv: Session pointer 558 * @priv: The private pointer that is associated with the session. 559 */ 560 void rtrs_srv_set_sess_priv(struct rtrs_srv *srv, void *priv) 561 { 562 srv->priv = priv; 563 } 564 EXPORT_SYMBOL(rtrs_srv_set_sess_priv); 565 566 static void unmap_cont_bufs(struct rtrs_srv_sess *sess) 567 { 568 int i; 569 570 for (i = 0; i < sess->mrs_num; i++) { 571 struct rtrs_srv_mr *srv_mr; 572 573 srv_mr = &sess->mrs[i]; 574 rtrs_iu_free(srv_mr->iu, sess->s.dev->ib_dev, 1); 575 ib_dereg_mr(srv_mr->mr); 576 ib_dma_unmap_sg(sess->s.dev->ib_dev, srv_mr->sgt.sgl, 577 srv_mr->sgt.nents, DMA_BIDIRECTIONAL); 578 sg_free_table(&srv_mr->sgt); 579 } 580 kfree(sess->mrs); 581 } 582 583 static int map_cont_bufs(struct rtrs_srv_sess *sess) 584 { 585 struct rtrs_srv *srv = sess->srv; 586 struct rtrs_sess *ss = &sess->s; 587 int i, mri, err, mrs_num; 588 unsigned int chunk_bits; 589 int chunks_per_mr = 1; 590 591 /* 592 * Here we map queue_depth chunks to MR. Firstly we have to 593 * figure out how many chunks can we map per MR. 594 */ 595 if (always_invalidate) { 596 /* 597 * in order to do invalidate for each chunks of memory, we needs 598 * more memory regions. 599 */ 600 mrs_num = srv->queue_depth; 601 } else { 602 chunks_per_mr = 603 sess->s.dev->ib_dev->attrs.max_fast_reg_page_list_len; 604 mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr); 605 chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num); 606 } 607 608 sess->mrs = kcalloc(mrs_num, sizeof(*sess->mrs), GFP_KERNEL); 609 if (!sess->mrs) 610 return -ENOMEM; 611 612 sess->mrs_num = mrs_num; 613 614 for (mri = 0; mri < mrs_num; mri++) { 615 struct rtrs_srv_mr *srv_mr = &sess->mrs[mri]; 616 struct sg_table *sgt = &srv_mr->sgt; 617 struct scatterlist *s; 618 struct ib_mr *mr; 619 int nr, chunks; 620 621 chunks = chunks_per_mr * mri; 622 if (!always_invalidate) 623 chunks_per_mr = min_t(int, chunks_per_mr, 624 srv->queue_depth - chunks); 625 626 err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL); 627 if (err) 628 goto err; 629 630 for_each_sg(sgt->sgl, s, chunks_per_mr, i) 631 sg_set_page(s, srv->chunks[chunks + i], 632 max_chunk_size, 0); 633 634 nr = ib_dma_map_sg(sess->s.dev->ib_dev, sgt->sgl, 635 sgt->nents, DMA_BIDIRECTIONAL); 636 if (nr < sgt->nents) { 637 err = nr < 0 ? nr : -EINVAL; 638 goto free_sg; 639 } 640 mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG, 641 sgt->nents); 642 if (IS_ERR(mr)) { 643 err = PTR_ERR(mr); 644 goto unmap_sg; 645 } 646 nr = ib_map_mr_sg(mr, sgt->sgl, sgt->nents, 647 NULL, max_chunk_size); 648 if (nr < 0 || nr < sgt->nents) { 649 err = nr < 0 ? nr : -EINVAL; 650 goto dereg_mr; 651 } 652 653 if (always_invalidate) { 654 srv_mr->iu = rtrs_iu_alloc(1, 655 sizeof(struct rtrs_msg_rkey_rsp), 656 GFP_KERNEL, sess->s.dev->ib_dev, 657 DMA_TO_DEVICE, rtrs_srv_rdma_done); 658 if (!srv_mr->iu) { 659 err = -ENOMEM; 660 rtrs_err(ss, "rtrs_iu_alloc(), err: %d\n", err); 661 goto dereg_mr; 662 } 663 } 664 /* Eventually dma addr for each chunk can be cached */ 665 for_each_sg(sgt->sgl, s, sgt->orig_nents, i) 666 sess->dma_addr[chunks + i] = sg_dma_address(s); 667 668 ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey)); 669 srv_mr->mr = mr; 670 671 continue; 672 err: 673 while (mri--) { 674 srv_mr = &sess->mrs[mri]; 675 sgt = &srv_mr->sgt; 676 mr = srv_mr->mr; 677 rtrs_iu_free(srv_mr->iu, sess->s.dev->ib_dev, 1); 678 dereg_mr: 679 ib_dereg_mr(mr); 680 unmap_sg: 681 ib_dma_unmap_sg(sess->s.dev->ib_dev, sgt->sgl, 682 sgt->nents, DMA_BIDIRECTIONAL); 683 free_sg: 684 sg_free_table(sgt); 685 } 686 kfree(sess->mrs); 687 688 return err; 689 } 690 691 chunk_bits = ilog2(srv->queue_depth - 1) + 1; 692 sess->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits); 693 694 return 0; 695 } 696 697 static void rtrs_srv_hb_err_handler(struct rtrs_con *c) 698 { 699 close_sess(to_srv_sess(c->sess)); 700 } 701 702 static void rtrs_srv_init_hb(struct rtrs_srv_sess *sess) 703 { 704 rtrs_init_hb(&sess->s, &io_comp_cqe, 705 RTRS_HB_INTERVAL_MS, 706 RTRS_HB_MISSED_MAX, 707 rtrs_srv_hb_err_handler, 708 rtrs_wq); 709 } 710 711 static void rtrs_srv_start_hb(struct rtrs_srv_sess *sess) 712 { 713 rtrs_start_hb(&sess->s); 714 } 715 716 static void rtrs_srv_stop_hb(struct rtrs_srv_sess *sess) 717 { 718 rtrs_stop_hb(&sess->s); 719 } 720 721 static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc) 722 { 723 struct rtrs_srv_con *con = cq->cq_context; 724 struct rtrs_sess *s = con->c.sess; 725 struct rtrs_srv_sess *sess = to_srv_sess(s); 726 struct rtrs_iu *iu; 727 728 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 729 rtrs_iu_free(iu, sess->s.dev->ib_dev, 1); 730 731 if (unlikely(wc->status != IB_WC_SUCCESS)) { 732 rtrs_err(s, "Sess info response send failed: %s\n", 733 ib_wc_status_msg(wc->status)); 734 close_sess(sess); 735 return; 736 } 737 WARN_ON(wc->opcode != IB_WC_SEND); 738 } 739 740 static void rtrs_srv_sess_up(struct rtrs_srv_sess *sess) 741 { 742 struct rtrs_srv *srv = sess->srv; 743 struct rtrs_srv_ctx *ctx = srv->ctx; 744 int up; 745 746 mutex_lock(&srv->paths_ev_mutex); 747 up = ++srv->paths_up; 748 if (up == 1) 749 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL); 750 mutex_unlock(&srv->paths_ev_mutex); 751 752 /* Mark session as established */ 753 sess->established = true; 754 } 755 756 static void rtrs_srv_sess_down(struct rtrs_srv_sess *sess) 757 { 758 struct rtrs_srv *srv = sess->srv; 759 struct rtrs_srv_ctx *ctx = srv->ctx; 760 761 if (!sess->established) 762 return; 763 764 sess->established = false; 765 mutex_lock(&srv->paths_ev_mutex); 766 WARN_ON(!srv->paths_up); 767 if (--srv->paths_up == 0) 768 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv); 769 mutex_unlock(&srv->paths_ev_mutex); 770 } 771 772 static int post_recv_sess(struct rtrs_srv_sess *sess); 773 774 static int process_info_req(struct rtrs_srv_con *con, 775 struct rtrs_msg_info_req *msg) 776 { 777 struct rtrs_sess *s = con->c.sess; 778 struct rtrs_srv_sess *sess = to_srv_sess(s); 779 struct ib_send_wr *reg_wr = NULL; 780 struct rtrs_msg_info_rsp *rsp; 781 struct rtrs_iu *tx_iu; 782 struct ib_reg_wr *rwr; 783 int mri, err; 784 size_t tx_sz; 785 786 err = post_recv_sess(sess); 787 if (unlikely(err)) { 788 rtrs_err(s, "post_recv_sess(), err: %d\n", err); 789 return err; 790 } 791 rwr = kcalloc(sess->mrs_num, sizeof(*rwr), GFP_KERNEL); 792 if (unlikely(!rwr)) 793 return -ENOMEM; 794 strlcpy(sess->s.sessname, msg->sessname, sizeof(sess->s.sessname)); 795 796 tx_sz = sizeof(*rsp); 797 tx_sz += sizeof(rsp->desc[0]) * sess->mrs_num; 798 tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, sess->s.dev->ib_dev, 799 DMA_TO_DEVICE, rtrs_srv_info_rsp_done); 800 if (unlikely(!tx_iu)) { 801 err = -ENOMEM; 802 goto rwr_free; 803 } 804 805 rsp = tx_iu->buf; 806 rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP); 807 rsp->sg_cnt = cpu_to_le16(sess->mrs_num); 808 809 for (mri = 0; mri < sess->mrs_num; mri++) { 810 struct ib_mr *mr = sess->mrs[mri].mr; 811 812 rsp->desc[mri].addr = cpu_to_le64(mr->iova); 813 rsp->desc[mri].key = cpu_to_le32(mr->rkey); 814 rsp->desc[mri].len = cpu_to_le32(mr->length); 815 816 /* 817 * Fill in reg MR request and chain them *backwards* 818 */ 819 rwr[mri].wr.next = mri ? &rwr[mri - 1].wr : NULL; 820 rwr[mri].wr.opcode = IB_WR_REG_MR; 821 rwr[mri].wr.wr_cqe = &local_reg_cqe; 822 rwr[mri].wr.num_sge = 0; 823 rwr[mri].wr.send_flags = 0; 824 rwr[mri].mr = mr; 825 rwr[mri].key = mr->rkey; 826 rwr[mri].access = (IB_ACCESS_LOCAL_WRITE | 827 IB_ACCESS_REMOTE_WRITE); 828 reg_wr = &rwr[mri].wr; 829 } 830 831 err = rtrs_srv_create_sess_files(sess); 832 if (unlikely(err)) 833 goto iu_free; 834 kobject_get(&sess->kobj); 835 get_device(&sess->srv->dev); 836 rtrs_srv_change_state(sess, RTRS_SRV_CONNECTED); 837 rtrs_srv_start_hb(sess); 838 839 /* 840 * We do not account number of established connections at the current 841 * moment, we rely on the client, which should send info request when 842 * all connections are successfully established. Thus, simply notify 843 * listener with a proper event if we are the first path. 844 */ 845 rtrs_srv_sess_up(sess); 846 847 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr, 848 tx_iu->size, DMA_TO_DEVICE); 849 850 /* Send info response */ 851 err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr); 852 if (unlikely(err)) { 853 rtrs_err(s, "rtrs_iu_post_send(), err: %d\n", err); 854 iu_free: 855 rtrs_iu_free(tx_iu, sess->s.dev->ib_dev, 1); 856 } 857 rwr_free: 858 kfree(rwr); 859 860 return err; 861 } 862 863 static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc) 864 { 865 struct rtrs_srv_con *con = cq->cq_context; 866 struct rtrs_sess *s = con->c.sess; 867 struct rtrs_srv_sess *sess = to_srv_sess(s); 868 struct rtrs_msg_info_req *msg; 869 struct rtrs_iu *iu; 870 int err; 871 872 WARN_ON(con->c.cid); 873 874 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 875 if (unlikely(wc->status != IB_WC_SUCCESS)) { 876 rtrs_err(s, "Sess info request receive failed: %s\n", 877 ib_wc_status_msg(wc->status)); 878 goto close; 879 } 880 WARN_ON(wc->opcode != IB_WC_RECV); 881 882 if (unlikely(wc->byte_len < sizeof(*msg))) { 883 rtrs_err(s, "Sess info request is malformed: size %d\n", 884 wc->byte_len); 885 goto close; 886 } 887 ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr, 888 iu->size, DMA_FROM_DEVICE); 889 msg = iu->buf; 890 if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ)) { 891 rtrs_err(s, "Sess info request is malformed: type %d\n", 892 le16_to_cpu(msg->type)); 893 goto close; 894 } 895 err = process_info_req(con, msg); 896 if (unlikely(err)) 897 goto close; 898 899 out: 900 rtrs_iu_free(iu, sess->s.dev->ib_dev, 1); 901 return; 902 close: 903 close_sess(sess); 904 goto out; 905 } 906 907 static int post_recv_info_req(struct rtrs_srv_con *con) 908 { 909 struct rtrs_sess *s = con->c.sess; 910 struct rtrs_srv_sess *sess = to_srv_sess(s); 911 struct rtrs_iu *rx_iu; 912 int err; 913 914 rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), 915 GFP_KERNEL, sess->s.dev->ib_dev, 916 DMA_FROM_DEVICE, rtrs_srv_info_req_done); 917 if (unlikely(!rx_iu)) 918 return -ENOMEM; 919 /* Prepare for getting info response */ 920 err = rtrs_iu_post_recv(&con->c, rx_iu); 921 if (unlikely(err)) { 922 rtrs_err(s, "rtrs_iu_post_recv(), err: %d\n", err); 923 rtrs_iu_free(rx_iu, sess->s.dev->ib_dev, 1); 924 return err; 925 } 926 927 return 0; 928 } 929 930 static int post_recv_io(struct rtrs_srv_con *con, size_t q_size) 931 { 932 int i, err; 933 934 for (i = 0; i < q_size; i++) { 935 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 936 if (unlikely(err)) 937 return err; 938 } 939 940 return 0; 941 } 942 943 static int post_recv_sess(struct rtrs_srv_sess *sess) 944 { 945 struct rtrs_srv *srv = sess->srv; 946 struct rtrs_sess *s = &sess->s; 947 size_t q_size; 948 int err, cid; 949 950 for (cid = 0; cid < sess->s.con_num; cid++) { 951 if (cid == 0) 952 q_size = SERVICE_CON_QUEUE_DEPTH; 953 else 954 q_size = srv->queue_depth; 955 956 err = post_recv_io(to_srv_con(sess->s.con[cid]), q_size); 957 if (unlikely(err)) { 958 rtrs_err(s, "post_recv_io(), err: %d\n", err); 959 return err; 960 } 961 } 962 963 return 0; 964 } 965 966 static void process_read(struct rtrs_srv_con *con, 967 struct rtrs_msg_rdma_read *msg, 968 u32 buf_id, u32 off) 969 { 970 struct rtrs_sess *s = con->c.sess; 971 struct rtrs_srv_sess *sess = to_srv_sess(s); 972 struct rtrs_srv *srv = sess->srv; 973 struct rtrs_srv_ctx *ctx = srv->ctx; 974 struct rtrs_srv_op *id; 975 976 size_t usr_len, data_len; 977 void *data; 978 int ret; 979 980 if (unlikely(sess->state != RTRS_SRV_CONNECTED)) { 981 rtrs_err_rl(s, 982 "Processing read request failed, session is disconnected, sess state %s\n", 983 rtrs_srv_state_str(sess->state)); 984 return; 985 } 986 if (unlikely(msg->sg_cnt != 1 && msg->sg_cnt != 0)) { 987 rtrs_err_rl(s, 988 "Processing read request failed, invalid message\n"); 989 return; 990 } 991 rtrs_srv_get_ops_ids(sess); 992 rtrs_srv_update_rdma_stats(sess->stats, off, READ); 993 id = sess->ops_ids[buf_id]; 994 id->con = con; 995 id->dir = READ; 996 id->msg_id = buf_id; 997 id->rd_msg = msg; 998 usr_len = le16_to_cpu(msg->usr_len); 999 data_len = off - usr_len; 1000 data = page_address(srv->chunks[buf_id]); 1001 ret = ctx->ops.rdma_ev(srv, srv->priv, id, READ, data, data_len, 1002 data + data_len, usr_len); 1003 1004 if (unlikely(ret)) { 1005 rtrs_err_rl(s, 1006 "Processing read request failed, user module cb reported for msg_id %d, err: %d\n", 1007 buf_id, ret); 1008 goto send_err_msg; 1009 } 1010 1011 return; 1012 1013 send_err_msg: 1014 ret = send_io_resp_imm(con, id, ret); 1015 if (ret < 0) { 1016 rtrs_err_rl(s, 1017 "Sending err msg for failed RDMA-Write-Req failed, msg_id %d, err: %d\n", 1018 buf_id, ret); 1019 close_sess(sess); 1020 } 1021 rtrs_srv_put_ops_ids(sess); 1022 } 1023 1024 static void process_write(struct rtrs_srv_con *con, 1025 struct rtrs_msg_rdma_write *req, 1026 u32 buf_id, u32 off) 1027 { 1028 struct rtrs_sess *s = con->c.sess; 1029 struct rtrs_srv_sess *sess = to_srv_sess(s); 1030 struct rtrs_srv *srv = sess->srv; 1031 struct rtrs_srv_ctx *ctx = srv->ctx; 1032 struct rtrs_srv_op *id; 1033 1034 size_t data_len, usr_len; 1035 void *data; 1036 int ret; 1037 1038 if (unlikely(sess->state != RTRS_SRV_CONNECTED)) { 1039 rtrs_err_rl(s, 1040 "Processing write request failed, session is disconnected, sess state %s\n", 1041 rtrs_srv_state_str(sess->state)); 1042 return; 1043 } 1044 rtrs_srv_get_ops_ids(sess); 1045 rtrs_srv_update_rdma_stats(sess->stats, off, WRITE); 1046 id = sess->ops_ids[buf_id]; 1047 id->con = con; 1048 id->dir = WRITE; 1049 id->msg_id = buf_id; 1050 1051 usr_len = le16_to_cpu(req->usr_len); 1052 data_len = off - usr_len; 1053 data = page_address(srv->chunks[buf_id]); 1054 ret = ctx->ops.rdma_ev(srv, srv->priv, id, WRITE, data, data_len, 1055 data + data_len, usr_len); 1056 if (unlikely(ret)) { 1057 rtrs_err_rl(s, 1058 "Processing write request failed, user module callback reports err: %d\n", 1059 ret); 1060 goto send_err_msg; 1061 } 1062 1063 return; 1064 1065 send_err_msg: 1066 ret = send_io_resp_imm(con, id, ret); 1067 if (ret < 0) { 1068 rtrs_err_rl(s, 1069 "Processing write request failed, sending I/O response failed, msg_id %d, err: %d\n", 1070 buf_id, ret); 1071 close_sess(sess); 1072 } 1073 rtrs_srv_put_ops_ids(sess); 1074 } 1075 1076 static void process_io_req(struct rtrs_srv_con *con, void *msg, 1077 u32 id, u32 off) 1078 { 1079 struct rtrs_sess *s = con->c.sess; 1080 struct rtrs_srv_sess *sess = to_srv_sess(s); 1081 struct rtrs_msg_rdma_hdr *hdr; 1082 unsigned int type; 1083 1084 ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, sess->dma_addr[id], 1085 max_chunk_size, DMA_BIDIRECTIONAL); 1086 hdr = msg; 1087 type = le16_to_cpu(hdr->type); 1088 1089 switch (type) { 1090 case RTRS_MSG_WRITE: 1091 process_write(con, msg, id, off); 1092 break; 1093 case RTRS_MSG_READ: 1094 process_read(con, msg, id, off); 1095 break; 1096 default: 1097 rtrs_err(s, 1098 "Processing I/O request failed, unknown message type received: 0x%02x\n", 1099 type); 1100 goto err; 1101 } 1102 1103 return; 1104 1105 err: 1106 close_sess(sess); 1107 } 1108 1109 static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc) 1110 { 1111 struct rtrs_srv_mr *mr = 1112 container_of(wc->wr_cqe, typeof(*mr), inv_cqe); 1113 struct rtrs_srv_con *con = cq->cq_context; 1114 struct rtrs_sess *s = con->c.sess; 1115 struct rtrs_srv_sess *sess = to_srv_sess(s); 1116 struct rtrs_srv *srv = sess->srv; 1117 u32 msg_id, off; 1118 void *data; 1119 1120 if (unlikely(wc->status != IB_WC_SUCCESS)) { 1121 rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n", 1122 ib_wc_status_msg(wc->status)); 1123 close_sess(sess); 1124 } 1125 msg_id = mr->msg_id; 1126 off = mr->msg_off; 1127 data = page_address(srv->chunks[msg_id]) + off; 1128 process_io_req(con, data, msg_id, off); 1129 } 1130 1131 static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con, 1132 struct rtrs_srv_mr *mr) 1133 { 1134 struct ib_send_wr wr = { 1135 .opcode = IB_WR_LOCAL_INV, 1136 .wr_cqe = &mr->inv_cqe, 1137 .send_flags = IB_SEND_SIGNALED, 1138 .ex.invalidate_rkey = mr->mr->rkey, 1139 }; 1140 mr->inv_cqe.done = rtrs_srv_inv_rkey_done; 1141 1142 return ib_post_send(con->c.qp, &wr, NULL); 1143 } 1144 1145 static void rtrs_rdma_process_wr_wait_list(struct rtrs_srv_con *con) 1146 { 1147 spin_lock(&con->rsp_wr_wait_lock); 1148 while (!list_empty(&con->rsp_wr_wait_list)) { 1149 struct rtrs_srv_op *id; 1150 int ret; 1151 1152 id = list_entry(con->rsp_wr_wait_list.next, 1153 struct rtrs_srv_op, wait_list); 1154 list_del(&id->wait_list); 1155 1156 spin_unlock(&con->rsp_wr_wait_lock); 1157 ret = rtrs_srv_resp_rdma(id, id->status); 1158 spin_lock(&con->rsp_wr_wait_lock); 1159 1160 if (!ret) { 1161 list_add(&id->wait_list, &con->rsp_wr_wait_list); 1162 break; 1163 } 1164 } 1165 spin_unlock(&con->rsp_wr_wait_lock); 1166 } 1167 1168 static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc) 1169 { 1170 struct rtrs_srv_con *con = cq->cq_context; 1171 struct rtrs_sess *s = con->c.sess; 1172 struct rtrs_srv_sess *sess = to_srv_sess(s); 1173 struct rtrs_srv *srv = sess->srv; 1174 u32 imm_type, imm_payload; 1175 int err; 1176 1177 if (unlikely(wc->status != IB_WC_SUCCESS)) { 1178 if (wc->status != IB_WC_WR_FLUSH_ERR) { 1179 rtrs_err(s, 1180 "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n", 1181 ib_wc_status_msg(wc->status), wc->wr_cqe, 1182 wc->opcode, wc->vendor_err, wc->byte_len); 1183 close_sess(sess); 1184 } 1185 return; 1186 } 1187 1188 switch (wc->opcode) { 1189 case IB_WC_RECV_RDMA_WITH_IMM: 1190 /* 1191 * post_recv() RDMA write completions of IO reqs (read/write) 1192 * and hb 1193 */ 1194 if (WARN_ON(wc->wr_cqe != &io_comp_cqe)) 1195 return; 1196 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 1197 if (unlikely(err)) { 1198 rtrs_err(s, "rtrs_post_recv(), err: %d\n", err); 1199 close_sess(sess); 1200 break; 1201 } 1202 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), 1203 &imm_type, &imm_payload); 1204 if (likely(imm_type == RTRS_IO_REQ_IMM)) { 1205 u32 msg_id, off; 1206 void *data; 1207 1208 msg_id = imm_payload >> sess->mem_bits; 1209 off = imm_payload & ((1 << sess->mem_bits) - 1); 1210 if (unlikely(msg_id >= srv->queue_depth || 1211 off >= max_chunk_size)) { 1212 rtrs_err(s, "Wrong msg_id %u, off %u\n", 1213 msg_id, off); 1214 close_sess(sess); 1215 return; 1216 } 1217 if (always_invalidate) { 1218 struct rtrs_srv_mr *mr = &sess->mrs[msg_id]; 1219 1220 mr->msg_off = off; 1221 mr->msg_id = msg_id; 1222 err = rtrs_srv_inv_rkey(con, mr); 1223 if (unlikely(err)) { 1224 rtrs_err(s, "rtrs_post_recv(), err: %d\n", 1225 err); 1226 close_sess(sess); 1227 break; 1228 } 1229 } else { 1230 data = page_address(srv->chunks[msg_id]) + off; 1231 process_io_req(con, data, msg_id, off); 1232 } 1233 } else if (imm_type == RTRS_HB_MSG_IMM) { 1234 WARN_ON(con->c.cid); 1235 rtrs_send_hb_ack(&sess->s); 1236 } else if (imm_type == RTRS_HB_ACK_IMM) { 1237 WARN_ON(con->c.cid); 1238 sess->s.hb_missed_cnt = 0; 1239 } else { 1240 rtrs_wrn(s, "Unknown IMM type %u\n", imm_type); 1241 } 1242 break; 1243 case IB_WC_RDMA_WRITE: 1244 case IB_WC_SEND: 1245 /* 1246 * post_send() RDMA write completions of IO reqs (read/write) 1247 */ 1248 atomic_add(srv->queue_depth, &con->sq_wr_avail); 1249 1250 if (unlikely(!list_empty_careful(&con->rsp_wr_wait_list))) 1251 rtrs_rdma_process_wr_wait_list(con); 1252 1253 break; 1254 default: 1255 rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode); 1256 return; 1257 } 1258 } 1259 1260 /** 1261 * rtrs_srv_get_sess_name() - Get rtrs_srv peer hostname. 1262 * @srv: Session 1263 * @sessname: Sessname buffer 1264 * @len: Length of sessname buffer 1265 */ 1266 int rtrs_srv_get_sess_name(struct rtrs_srv *srv, char *sessname, size_t len) 1267 { 1268 struct rtrs_srv_sess *sess; 1269 int err = -ENOTCONN; 1270 1271 mutex_lock(&srv->paths_mutex); 1272 list_for_each_entry(sess, &srv->paths_list, s.entry) { 1273 if (sess->state != RTRS_SRV_CONNECTED) 1274 continue; 1275 strlcpy(sessname, sess->s.sessname, 1276 min_t(size_t, sizeof(sess->s.sessname), len)); 1277 err = 0; 1278 break; 1279 } 1280 mutex_unlock(&srv->paths_mutex); 1281 1282 return err; 1283 } 1284 EXPORT_SYMBOL(rtrs_srv_get_sess_name); 1285 1286 /** 1287 * rtrs_srv_get_sess_qdepth() - Get rtrs_srv qdepth. 1288 * @srv: Session 1289 */ 1290 int rtrs_srv_get_queue_depth(struct rtrs_srv *srv) 1291 { 1292 return srv->queue_depth; 1293 } 1294 EXPORT_SYMBOL(rtrs_srv_get_queue_depth); 1295 1296 static int find_next_bit_ring(struct rtrs_srv_sess *sess) 1297 { 1298 struct ib_device *ib_dev = sess->s.dev->ib_dev; 1299 int v; 1300 1301 v = cpumask_next(sess->cur_cq_vector, &cq_affinity_mask); 1302 if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors) 1303 v = cpumask_first(&cq_affinity_mask); 1304 return v; 1305 } 1306 1307 static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_sess *sess) 1308 { 1309 sess->cur_cq_vector = find_next_bit_ring(sess); 1310 1311 return sess->cur_cq_vector; 1312 } 1313 1314 static void rtrs_srv_dev_release(struct device *dev) 1315 { 1316 struct rtrs_srv *srv = container_of(dev, struct rtrs_srv, dev); 1317 1318 kfree(srv); 1319 } 1320 1321 static void free_srv(struct rtrs_srv *srv) 1322 { 1323 int i; 1324 1325 WARN_ON(refcount_read(&srv->refcount)); 1326 for (i = 0; i < srv->queue_depth; i++) 1327 mempool_free(srv->chunks[i], chunk_pool); 1328 kfree(srv->chunks); 1329 mutex_destroy(&srv->paths_mutex); 1330 mutex_destroy(&srv->paths_ev_mutex); 1331 /* last put to release the srv structure */ 1332 put_device(&srv->dev); 1333 } 1334 1335 static struct rtrs_srv *get_or_create_srv(struct rtrs_srv_ctx *ctx, 1336 const uuid_t *paths_uuid, 1337 bool first_conn) 1338 { 1339 struct rtrs_srv *srv; 1340 int i; 1341 1342 mutex_lock(&ctx->srv_mutex); 1343 list_for_each_entry(srv, &ctx->srv_list, ctx_list) { 1344 if (uuid_equal(&srv->paths_uuid, paths_uuid) && 1345 refcount_inc_not_zero(&srv->refcount)) { 1346 mutex_unlock(&ctx->srv_mutex); 1347 return srv; 1348 } 1349 } 1350 mutex_unlock(&ctx->srv_mutex); 1351 /* 1352 * If this request is not the first connection request from the 1353 * client for this session then fail and return error. 1354 */ 1355 if (!first_conn) 1356 return ERR_PTR(-ENXIO); 1357 1358 /* need to allocate a new srv */ 1359 srv = kzalloc(sizeof(*srv), GFP_KERNEL); 1360 if (!srv) 1361 return ERR_PTR(-ENOMEM); 1362 1363 INIT_LIST_HEAD(&srv->paths_list); 1364 mutex_init(&srv->paths_mutex); 1365 mutex_init(&srv->paths_ev_mutex); 1366 uuid_copy(&srv->paths_uuid, paths_uuid); 1367 srv->queue_depth = sess_queue_depth; 1368 srv->ctx = ctx; 1369 device_initialize(&srv->dev); 1370 srv->dev.release = rtrs_srv_dev_release; 1371 1372 srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks), 1373 GFP_KERNEL); 1374 if (!srv->chunks) 1375 goto err_free_srv; 1376 1377 for (i = 0; i < srv->queue_depth; i++) { 1378 srv->chunks[i] = mempool_alloc(chunk_pool, GFP_KERNEL); 1379 if (!srv->chunks[i]) 1380 goto err_free_chunks; 1381 } 1382 refcount_set(&srv->refcount, 1); 1383 mutex_lock(&ctx->srv_mutex); 1384 list_add(&srv->ctx_list, &ctx->srv_list); 1385 mutex_unlock(&ctx->srv_mutex); 1386 1387 return srv; 1388 1389 err_free_chunks: 1390 while (i--) 1391 mempool_free(srv->chunks[i], chunk_pool); 1392 kfree(srv->chunks); 1393 1394 err_free_srv: 1395 kfree(srv); 1396 return ERR_PTR(-ENOMEM); 1397 } 1398 1399 static void put_srv(struct rtrs_srv *srv) 1400 { 1401 if (refcount_dec_and_test(&srv->refcount)) { 1402 struct rtrs_srv_ctx *ctx = srv->ctx; 1403 1404 WARN_ON(srv->dev.kobj.state_in_sysfs); 1405 1406 mutex_lock(&ctx->srv_mutex); 1407 list_del(&srv->ctx_list); 1408 mutex_unlock(&ctx->srv_mutex); 1409 free_srv(srv); 1410 } 1411 } 1412 1413 static void __add_path_to_srv(struct rtrs_srv *srv, 1414 struct rtrs_srv_sess *sess) 1415 { 1416 list_add_tail(&sess->s.entry, &srv->paths_list); 1417 srv->paths_num++; 1418 WARN_ON(srv->paths_num >= MAX_PATHS_NUM); 1419 } 1420 1421 static void del_path_from_srv(struct rtrs_srv_sess *sess) 1422 { 1423 struct rtrs_srv *srv = sess->srv; 1424 1425 if (WARN_ON(!srv)) 1426 return; 1427 1428 mutex_lock(&srv->paths_mutex); 1429 list_del(&sess->s.entry); 1430 WARN_ON(!srv->paths_num); 1431 srv->paths_num--; 1432 mutex_unlock(&srv->paths_mutex); 1433 } 1434 1435 /* return true if addresses are the same, error other wise */ 1436 static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b) 1437 { 1438 switch (a->sa_family) { 1439 case AF_IB: 1440 return memcmp(&((struct sockaddr_ib *)a)->sib_addr, 1441 &((struct sockaddr_ib *)b)->sib_addr, 1442 sizeof(struct ib_addr)) && 1443 (b->sa_family == AF_IB); 1444 case AF_INET: 1445 return memcmp(&((struct sockaddr_in *)a)->sin_addr, 1446 &((struct sockaddr_in *)b)->sin_addr, 1447 sizeof(struct in_addr)) && 1448 (b->sa_family == AF_INET); 1449 case AF_INET6: 1450 return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr, 1451 &((struct sockaddr_in6 *)b)->sin6_addr, 1452 sizeof(struct in6_addr)) && 1453 (b->sa_family == AF_INET6); 1454 default: 1455 return -ENOENT; 1456 } 1457 } 1458 1459 static bool __is_path_w_addr_exists(struct rtrs_srv *srv, 1460 struct rdma_addr *addr) 1461 { 1462 struct rtrs_srv_sess *sess; 1463 1464 list_for_each_entry(sess, &srv->paths_list, s.entry) 1465 if (!sockaddr_cmp((struct sockaddr *)&sess->s.dst_addr, 1466 (struct sockaddr *)&addr->dst_addr) && 1467 !sockaddr_cmp((struct sockaddr *)&sess->s.src_addr, 1468 (struct sockaddr *)&addr->src_addr)) 1469 return true; 1470 1471 return false; 1472 } 1473 1474 static void free_sess(struct rtrs_srv_sess *sess) 1475 { 1476 if (sess->kobj.state_in_sysfs) { 1477 kobject_del(&sess->kobj); 1478 kobject_put(&sess->kobj); 1479 } else { 1480 kfree(sess); 1481 } 1482 } 1483 1484 static void rtrs_srv_close_work(struct work_struct *work) 1485 { 1486 struct rtrs_srv_sess *sess; 1487 struct rtrs_srv_con *con; 1488 int i; 1489 1490 sess = container_of(work, typeof(*sess), close_work); 1491 1492 rtrs_srv_destroy_sess_files(sess); 1493 rtrs_srv_stop_hb(sess); 1494 1495 for (i = 0; i < sess->s.con_num; i++) { 1496 if (!sess->s.con[i]) 1497 continue; 1498 con = to_srv_con(sess->s.con[i]); 1499 rdma_disconnect(con->c.cm_id); 1500 ib_drain_qp(con->c.qp); 1501 } 1502 /* Wait for all inflights */ 1503 rtrs_srv_wait_ops_ids(sess); 1504 1505 /* Notify upper layer if we are the last path */ 1506 rtrs_srv_sess_down(sess); 1507 1508 unmap_cont_bufs(sess); 1509 rtrs_srv_free_ops_ids(sess); 1510 1511 for (i = 0; i < sess->s.con_num; i++) { 1512 if (!sess->s.con[i]) 1513 continue; 1514 con = to_srv_con(sess->s.con[i]); 1515 rtrs_cq_qp_destroy(&con->c); 1516 rdma_destroy_id(con->c.cm_id); 1517 kfree(con); 1518 } 1519 rtrs_ib_dev_put(sess->s.dev); 1520 1521 del_path_from_srv(sess); 1522 put_srv(sess->srv); 1523 sess->srv = NULL; 1524 rtrs_srv_change_state(sess, RTRS_SRV_CLOSED); 1525 1526 kfree(sess->dma_addr); 1527 kfree(sess->s.con); 1528 free_sess(sess); 1529 } 1530 1531 static int rtrs_rdma_do_accept(struct rtrs_srv_sess *sess, 1532 struct rdma_cm_id *cm_id) 1533 { 1534 struct rtrs_srv *srv = sess->srv; 1535 struct rtrs_msg_conn_rsp msg; 1536 struct rdma_conn_param param; 1537 int err; 1538 1539 param = (struct rdma_conn_param) { 1540 .rnr_retry_count = 7, 1541 .private_data = &msg, 1542 .private_data_len = sizeof(msg), 1543 }; 1544 1545 msg = (struct rtrs_msg_conn_rsp) { 1546 .magic = cpu_to_le16(RTRS_MAGIC), 1547 .version = cpu_to_le16(RTRS_PROTO_VER), 1548 .queue_depth = cpu_to_le16(srv->queue_depth), 1549 .max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE), 1550 .max_hdr_size = cpu_to_le32(MAX_HDR_SIZE), 1551 }; 1552 1553 if (always_invalidate) 1554 msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F); 1555 1556 err = rdma_accept(cm_id, ¶m); 1557 if (err) 1558 pr_err("rdma_accept(), err: %d\n", err); 1559 1560 return err; 1561 } 1562 1563 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno) 1564 { 1565 struct rtrs_msg_conn_rsp msg; 1566 int err; 1567 1568 msg = (struct rtrs_msg_conn_rsp) { 1569 .magic = cpu_to_le16(RTRS_MAGIC), 1570 .version = cpu_to_le16(RTRS_PROTO_VER), 1571 .errno = cpu_to_le16(errno), 1572 }; 1573 1574 err = rdma_reject(cm_id, &msg, sizeof(msg), IB_CM_REJ_CONSUMER_DEFINED); 1575 if (err) 1576 pr_err("rdma_reject(), err: %d\n", err); 1577 1578 /* Bounce errno back */ 1579 return errno; 1580 } 1581 1582 static struct rtrs_srv_sess * 1583 __find_sess(struct rtrs_srv *srv, const uuid_t *sess_uuid) 1584 { 1585 struct rtrs_srv_sess *sess; 1586 1587 list_for_each_entry(sess, &srv->paths_list, s.entry) { 1588 if (uuid_equal(&sess->s.uuid, sess_uuid)) 1589 return sess; 1590 } 1591 1592 return NULL; 1593 } 1594 1595 static int create_con(struct rtrs_srv_sess *sess, 1596 struct rdma_cm_id *cm_id, 1597 unsigned int cid) 1598 { 1599 struct rtrs_srv *srv = sess->srv; 1600 struct rtrs_sess *s = &sess->s; 1601 struct rtrs_srv_con *con; 1602 1603 u32 cq_size, wr_queue_size; 1604 int err, cq_vector; 1605 1606 con = kzalloc(sizeof(*con), GFP_KERNEL); 1607 if (!con) { 1608 err = -ENOMEM; 1609 goto err; 1610 } 1611 1612 spin_lock_init(&con->rsp_wr_wait_lock); 1613 INIT_LIST_HEAD(&con->rsp_wr_wait_list); 1614 con->c.cm_id = cm_id; 1615 con->c.sess = &sess->s; 1616 con->c.cid = cid; 1617 atomic_set(&con->wr_cnt, 1); 1618 1619 if (con->c.cid == 0) { 1620 /* 1621 * All receive and all send (each requiring invalidate) 1622 * + 2 for drain and heartbeat 1623 */ 1624 wr_queue_size = SERVICE_CON_QUEUE_DEPTH * 3 + 2; 1625 cq_size = wr_queue_size; 1626 } else { 1627 /* 1628 * If we have all receive requests posted and 1629 * all write requests posted and each read request 1630 * requires an invalidate request + drain 1631 * and qp gets into error state. 1632 */ 1633 cq_size = srv->queue_depth * 3 + 1; 1634 /* 1635 * In theory we might have queue_depth * 32 1636 * outstanding requests if an unsafe global key is used 1637 * and we have queue_depth read requests each consisting 1638 * of 32 different addresses. div 3 for mlx5. 1639 */ 1640 wr_queue_size = sess->s.dev->ib_dev->attrs.max_qp_wr / 3; 1641 } 1642 atomic_set(&con->sq_wr_avail, wr_queue_size); 1643 cq_vector = rtrs_srv_get_next_cq_vector(sess); 1644 1645 /* TODO: SOFTIRQ can be faster, but be careful with softirq context */ 1646 err = rtrs_cq_qp_create(&sess->s, &con->c, 1, cq_vector, cq_size, 1647 wr_queue_size, wr_queue_size, 1648 IB_POLL_WORKQUEUE); 1649 if (err) { 1650 rtrs_err(s, "rtrs_cq_qp_create(), err: %d\n", err); 1651 goto free_con; 1652 } 1653 if (con->c.cid == 0) { 1654 err = post_recv_info_req(con); 1655 if (err) 1656 goto free_cqqp; 1657 } 1658 WARN_ON(sess->s.con[cid]); 1659 sess->s.con[cid] = &con->c; 1660 1661 /* 1662 * Change context from server to current connection. The other 1663 * way is to use cm_id->qp->qp_context, which does not work on OFED. 1664 */ 1665 cm_id->context = &con->c; 1666 1667 return 0; 1668 1669 free_cqqp: 1670 rtrs_cq_qp_destroy(&con->c); 1671 free_con: 1672 kfree(con); 1673 1674 err: 1675 return err; 1676 } 1677 1678 static struct rtrs_srv_sess *__alloc_sess(struct rtrs_srv *srv, 1679 struct rdma_cm_id *cm_id, 1680 unsigned int con_num, 1681 unsigned int recon_cnt, 1682 const uuid_t *uuid) 1683 { 1684 struct rtrs_srv_sess *sess; 1685 int err = -ENOMEM; 1686 1687 if (srv->paths_num >= MAX_PATHS_NUM) { 1688 err = -ECONNRESET; 1689 goto err; 1690 } 1691 if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) { 1692 err = -EEXIST; 1693 pr_err("Path with same addr exists\n"); 1694 goto err; 1695 } 1696 sess = kzalloc(sizeof(*sess), GFP_KERNEL); 1697 if (!sess) 1698 goto err; 1699 1700 sess->stats = kzalloc(sizeof(*sess->stats), GFP_KERNEL); 1701 if (!sess->stats) 1702 goto err_free_sess; 1703 1704 sess->stats->sess = sess; 1705 1706 sess->dma_addr = kcalloc(srv->queue_depth, sizeof(*sess->dma_addr), 1707 GFP_KERNEL); 1708 if (!sess->dma_addr) 1709 goto err_free_stats; 1710 1711 sess->s.con = kcalloc(con_num, sizeof(*sess->s.con), GFP_KERNEL); 1712 if (!sess->s.con) 1713 goto err_free_dma_addr; 1714 1715 sess->state = RTRS_SRV_CONNECTING; 1716 sess->srv = srv; 1717 sess->cur_cq_vector = -1; 1718 sess->s.dst_addr = cm_id->route.addr.dst_addr; 1719 sess->s.src_addr = cm_id->route.addr.src_addr; 1720 sess->s.con_num = con_num; 1721 sess->s.recon_cnt = recon_cnt; 1722 uuid_copy(&sess->s.uuid, uuid); 1723 spin_lock_init(&sess->state_lock); 1724 INIT_WORK(&sess->close_work, rtrs_srv_close_work); 1725 rtrs_srv_init_hb(sess); 1726 1727 sess->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd); 1728 if (!sess->s.dev) { 1729 err = -ENOMEM; 1730 goto err_free_con; 1731 } 1732 err = map_cont_bufs(sess); 1733 if (err) 1734 goto err_put_dev; 1735 1736 err = rtrs_srv_alloc_ops_ids(sess); 1737 if (err) 1738 goto err_unmap_bufs; 1739 1740 __add_path_to_srv(srv, sess); 1741 1742 return sess; 1743 1744 err_unmap_bufs: 1745 unmap_cont_bufs(sess); 1746 err_put_dev: 1747 rtrs_ib_dev_put(sess->s.dev); 1748 err_free_con: 1749 kfree(sess->s.con); 1750 err_free_dma_addr: 1751 kfree(sess->dma_addr); 1752 err_free_stats: 1753 kfree(sess->stats); 1754 err_free_sess: 1755 kfree(sess); 1756 err: 1757 return ERR_PTR(err); 1758 } 1759 1760 static int rtrs_rdma_connect(struct rdma_cm_id *cm_id, 1761 const struct rtrs_msg_conn_req *msg, 1762 size_t len) 1763 { 1764 struct rtrs_srv_ctx *ctx = cm_id->context; 1765 struct rtrs_srv_sess *sess; 1766 struct rtrs_srv *srv; 1767 1768 u16 version, con_num, cid; 1769 u16 recon_cnt; 1770 int err; 1771 1772 if (len < sizeof(*msg)) { 1773 pr_err("Invalid RTRS connection request\n"); 1774 goto reject_w_econnreset; 1775 } 1776 if (le16_to_cpu(msg->magic) != RTRS_MAGIC) { 1777 pr_err("Invalid RTRS magic\n"); 1778 goto reject_w_econnreset; 1779 } 1780 version = le16_to_cpu(msg->version); 1781 if (version >> 8 != RTRS_PROTO_VER_MAJOR) { 1782 pr_err("Unsupported major RTRS version: %d, expected %d\n", 1783 version >> 8, RTRS_PROTO_VER_MAJOR); 1784 goto reject_w_econnreset; 1785 } 1786 con_num = le16_to_cpu(msg->cid_num); 1787 if (con_num > 4096) { 1788 /* Sanity check */ 1789 pr_err("Too many connections requested: %d\n", con_num); 1790 goto reject_w_econnreset; 1791 } 1792 cid = le16_to_cpu(msg->cid); 1793 if (cid >= con_num) { 1794 /* Sanity check */ 1795 pr_err("Incorrect cid: %d >= %d\n", cid, con_num); 1796 goto reject_w_econnreset; 1797 } 1798 recon_cnt = le16_to_cpu(msg->recon_cnt); 1799 srv = get_or_create_srv(ctx, &msg->paths_uuid, msg->first_conn); 1800 if (IS_ERR(srv)) { 1801 err = PTR_ERR(srv); 1802 goto reject_w_err; 1803 } 1804 mutex_lock(&srv->paths_mutex); 1805 sess = __find_sess(srv, &msg->sess_uuid); 1806 if (sess) { 1807 struct rtrs_sess *s = &sess->s; 1808 1809 /* Session already holds a reference */ 1810 put_srv(srv); 1811 1812 if (sess->state != RTRS_SRV_CONNECTING) { 1813 rtrs_err(s, "Session in wrong state: %s\n", 1814 rtrs_srv_state_str(sess->state)); 1815 mutex_unlock(&srv->paths_mutex); 1816 goto reject_w_econnreset; 1817 } 1818 /* 1819 * Sanity checks 1820 */ 1821 if (con_num != s->con_num || cid >= s->con_num) { 1822 rtrs_err(s, "Incorrect request: %d, %d\n", 1823 cid, con_num); 1824 mutex_unlock(&srv->paths_mutex); 1825 goto reject_w_econnreset; 1826 } 1827 if (s->con[cid]) { 1828 rtrs_err(s, "Connection already exists: %d\n", 1829 cid); 1830 mutex_unlock(&srv->paths_mutex); 1831 goto reject_w_econnreset; 1832 } 1833 } else { 1834 sess = __alloc_sess(srv, cm_id, con_num, recon_cnt, 1835 &msg->sess_uuid); 1836 if (IS_ERR(sess)) { 1837 mutex_unlock(&srv->paths_mutex); 1838 put_srv(srv); 1839 err = PTR_ERR(sess); 1840 goto reject_w_err; 1841 } 1842 } 1843 err = create_con(sess, cm_id, cid); 1844 if (err) { 1845 (void)rtrs_rdma_do_reject(cm_id, err); 1846 /* 1847 * Since session has other connections we follow normal way 1848 * through workqueue, but still return an error to tell cma.c 1849 * to call rdma_destroy_id() for current connection. 1850 */ 1851 goto close_and_return_err; 1852 } 1853 err = rtrs_rdma_do_accept(sess, cm_id); 1854 if (err) { 1855 (void)rtrs_rdma_do_reject(cm_id, err); 1856 /* 1857 * Since current connection was successfully added to the 1858 * session we follow normal way through workqueue to close the 1859 * session, thus return 0 to tell cma.c we call 1860 * rdma_destroy_id() ourselves. 1861 */ 1862 err = 0; 1863 goto close_and_return_err; 1864 } 1865 mutex_unlock(&srv->paths_mutex); 1866 1867 return 0; 1868 1869 reject_w_err: 1870 return rtrs_rdma_do_reject(cm_id, err); 1871 1872 reject_w_econnreset: 1873 return rtrs_rdma_do_reject(cm_id, -ECONNRESET); 1874 1875 close_and_return_err: 1876 mutex_unlock(&srv->paths_mutex); 1877 close_sess(sess); 1878 1879 return err; 1880 } 1881 1882 static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id, 1883 struct rdma_cm_event *ev) 1884 { 1885 struct rtrs_srv_sess *sess = NULL; 1886 struct rtrs_sess *s = NULL; 1887 1888 if (ev->event != RDMA_CM_EVENT_CONNECT_REQUEST) { 1889 struct rtrs_con *c = cm_id->context; 1890 1891 s = c->sess; 1892 sess = to_srv_sess(s); 1893 } 1894 1895 switch (ev->event) { 1896 case RDMA_CM_EVENT_CONNECT_REQUEST: 1897 /* 1898 * In case of error cma.c will destroy cm_id, 1899 * see cma_process_remove() 1900 */ 1901 return rtrs_rdma_connect(cm_id, ev->param.conn.private_data, 1902 ev->param.conn.private_data_len); 1903 case RDMA_CM_EVENT_ESTABLISHED: 1904 /* Nothing here */ 1905 break; 1906 case RDMA_CM_EVENT_REJECTED: 1907 case RDMA_CM_EVENT_CONNECT_ERROR: 1908 case RDMA_CM_EVENT_UNREACHABLE: 1909 rtrs_err(s, "CM error (CM event: %s, err: %d)\n", 1910 rdma_event_msg(ev->event), ev->status); 1911 close_sess(sess); 1912 break; 1913 case RDMA_CM_EVENT_DISCONNECTED: 1914 case RDMA_CM_EVENT_ADDR_CHANGE: 1915 case RDMA_CM_EVENT_TIMEWAIT_EXIT: 1916 close_sess(sess); 1917 break; 1918 case RDMA_CM_EVENT_DEVICE_REMOVAL: 1919 close_sess(sess); 1920 break; 1921 default: 1922 pr_err("Ignoring unexpected CM event %s, err %d\n", 1923 rdma_event_msg(ev->event), ev->status); 1924 break; 1925 } 1926 1927 return 0; 1928 } 1929 1930 static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx, 1931 struct sockaddr *addr, 1932 enum rdma_ucm_port_space ps) 1933 { 1934 struct rdma_cm_id *cm_id; 1935 int ret; 1936 1937 cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler, 1938 ctx, ps, IB_QPT_RC); 1939 if (IS_ERR(cm_id)) { 1940 ret = PTR_ERR(cm_id); 1941 pr_err("Creating id for RDMA connection failed, err: %d\n", 1942 ret); 1943 goto err_out; 1944 } 1945 ret = rdma_bind_addr(cm_id, addr); 1946 if (ret) { 1947 pr_err("Binding RDMA address failed, err: %d\n", ret); 1948 goto err_cm; 1949 } 1950 ret = rdma_listen(cm_id, 64); 1951 if (ret) { 1952 pr_err("Listening on RDMA connection failed, err: %d\n", 1953 ret); 1954 goto err_cm; 1955 } 1956 1957 return cm_id; 1958 1959 err_cm: 1960 rdma_destroy_id(cm_id); 1961 err_out: 1962 1963 return ERR_PTR(ret); 1964 } 1965 1966 static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, u16 port) 1967 { 1968 struct sockaddr_in6 sin = { 1969 .sin6_family = AF_INET6, 1970 .sin6_addr = IN6ADDR_ANY_INIT, 1971 .sin6_port = htons(port), 1972 }; 1973 struct sockaddr_ib sib = { 1974 .sib_family = AF_IB, 1975 .sib_sid = cpu_to_be64(RDMA_IB_IP_PS_IB | port), 1976 .sib_sid_mask = cpu_to_be64(0xffffffffffffffffULL), 1977 .sib_pkey = cpu_to_be16(0xffff), 1978 }; 1979 struct rdma_cm_id *cm_ip, *cm_ib; 1980 int ret; 1981 1982 /* 1983 * We accept both IPoIB and IB connections, so we need to keep 1984 * two cm id's, one for each socket type and port space. 1985 * If the cm initialization of one of the id's fails, we abort 1986 * everything. 1987 */ 1988 cm_ip = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sin, RDMA_PS_TCP); 1989 if (IS_ERR(cm_ip)) 1990 return PTR_ERR(cm_ip); 1991 1992 cm_ib = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sib, RDMA_PS_IB); 1993 if (IS_ERR(cm_ib)) { 1994 ret = PTR_ERR(cm_ib); 1995 goto free_cm_ip; 1996 } 1997 1998 ctx->cm_id_ip = cm_ip; 1999 ctx->cm_id_ib = cm_ib; 2000 2001 return 0; 2002 2003 free_cm_ip: 2004 rdma_destroy_id(cm_ip); 2005 2006 return ret; 2007 } 2008 2009 static struct rtrs_srv_ctx *alloc_srv_ctx(struct rtrs_srv_ops *ops) 2010 { 2011 struct rtrs_srv_ctx *ctx; 2012 2013 ctx = kzalloc(sizeof(*ctx), GFP_KERNEL); 2014 if (!ctx) 2015 return NULL; 2016 2017 ctx->ops = *ops; 2018 mutex_init(&ctx->srv_mutex); 2019 INIT_LIST_HEAD(&ctx->srv_list); 2020 2021 return ctx; 2022 } 2023 2024 static void free_srv_ctx(struct rtrs_srv_ctx *ctx) 2025 { 2026 WARN_ON(!list_empty(&ctx->srv_list)); 2027 mutex_destroy(&ctx->srv_mutex); 2028 kfree(ctx); 2029 } 2030 2031 static int rtrs_srv_add_one(struct ib_device *device) 2032 { 2033 struct rtrs_srv_ctx *ctx; 2034 int ret = 0; 2035 2036 mutex_lock(&ib_ctx.ib_dev_mutex); 2037 if (ib_ctx.ib_dev_count) 2038 goto out; 2039 2040 /* 2041 * Since our CM IDs are NOT bound to any ib device we will create them 2042 * only once 2043 */ 2044 ctx = ib_ctx.srv_ctx; 2045 ret = rtrs_srv_rdma_init(ctx, ib_ctx.port); 2046 if (ret) { 2047 /* 2048 * We errored out here. 2049 * According to the ib code, if we encounter an error here then the 2050 * error code is ignored, and no more calls to our ops are made. 2051 */ 2052 pr_err("Failed to initialize RDMA connection"); 2053 goto err_out; 2054 } 2055 2056 out: 2057 /* 2058 * Keep a track on the number of ib devices added 2059 */ 2060 ib_ctx.ib_dev_count++; 2061 2062 err_out: 2063 mutex_unlock(&ib_ctx.ib_dev_mutex); 2064 return ret; 2065 } 2066 2067 static void rtrs_srv_remove_one(struct ib_device *device, void *client_data) 2068 { 2069 struct rtrs_srv_ctx *ctx; 2070 2071 mutex_lock(&ib_ctx.ib_dev_mutex); 2072 ib_ctx.ib_dev_count--; 2073 2074 if (ib_ctx.ib_dev_count) 2075 goto out; 2076 2077 /* 2078 * Since our CM IDs are NOT bound to any ib device we will remove them 2079 * only once, when the last device is removed 2080 */ 2081 ctx = ib_ctx.srv_ctx; 2082 rdma_destroy_id(ctx->cm_id_ip); 2083 rdma_destroy_id(ctx->cm_id_ib); 2084 2085 out: 2086 mutex_unlock(&ib_ctx.ib_dev_mutex); 2087 } 2088 2089 static struct ib_client rtrs_srv_client = { 2090 .name = "rtrs_server", 2091 .add = rtrs_srv_add_one, 2092 .remove = rtrs_srv_remove_one 2093 }; 2094 2095 /** 2096 * rtrs_srv_open() - open RTRS server context 2097 * @ops: callback functions 2098 * @port: port to listen on 2099 * 2100 * Creates server context with specified callbacks. 2101 * 2102 * Return a valid pointer on success otherwise PTR_ERR. 2103 */ 2104 struct rtrs_srv_ctx *rtrs_srv_open(struct rtrs_srv_ops *ops, u16 port) 2105 { 2106 struct rtrs_srv_ctx *ctx; 2107 int err; 2108 2109 ctx = alloc_srv_ctx(ops); 2110 if (!ctx) 2111 return ERR_PTR(-ENOMEM); 2112 2113 mutex_init(&ib_ctx.ib_dev_mutex); 2114 ib_ctx.srv_ctx = ctx; 2115 ib_ctx.port = port; 2116 2117 err = ib_register_client(&rtrs_srv_client); 2118 if (err) { 2119 free_srv_ctx(ctx); 2120 return ERR_PTR(err); 2121 } 2122 2123 return ctx; 2124 } 2125 EXPORT_SYMBOL(rtrs_srv_open); 2126 2127 static void close_sessions(struct rtrs_srv *srv) 2128 { 2129 struct rtrs_srv_sess *sess; 2130 2131 mutex_lock(&srv->paths_mutex); 2132 list_for_each_entry(sess, &srv->paths_list, s.entry) 2133 close_sess(sess); 2134 mutex_unlock(&srv->paths_mutex); 2135 } 2136 2137 static void close_ctx(struct rtrs_srv_ctx *ctx) 2138 { 2139 struct rtrs_srv *srv; 2140 2141 mutex_lock(&ctx->srv_mutex); 2142 list_for_each_entry(srv, &ctx->srv_list, ctx_list) 2143 close_sessions(srv); 2144 mutex_unlock(&ctx->srv_mutex); 2145 flush_workqueue(rtrs_wq); 2146 } 2147 2148 /** 2149 * rtrs_srv_close() - close RTRS server context 2150 * @ctx: pointer to server context 2151 * 2152 * Closes RTRS server context with all client sessions. 2153 */ 2154 void rtrs_srv_close(struct rtrs_srv_ctx *ctx) 2155 { 2156 ib_unregister_client(&rtrs_srv_client); 2157 mutex_destroy(&ib_ctx.ib_dev_mutex); 2158 close_ctx(ctx); 2159 free_srv_ctx(ctx); 2160 } 2161 EXPORT_SYMBOL(rtrs_srv_close); 2162 2163 static int check_module_params(void) 2164 { 2165 if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) { 2166 pr_err("Invalid sess_queue_depth value %d, has to be >= %d, <= %d.\n", 2167 sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH); 2168 return -EINVAL; 2169 } 2170 if (max_chunk_size < 4096 || !is_power_of_2(max_chunk_size)) { 2171 pr_err("Invalid max_chunk_size value %d, has to be >= %d and should be power of two.\n", 2172 max_chunk_size, 4096); 2173 return -EINVAL; 2174 } 2175 2176 /* 2177 * Check if IB immediate data size is enough to hold the mem_id and the 2178 * offset inside the memory chunk 2179 */ 2180 if ((ilog2(sess_queue_depth - 1) + 1) + 2181 (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) { 2182 pr_err("RDMA immediate size (%db) not enough to encode %d buffers of size %dB. Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n", 2183 MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size); 2184 return -EINVAL; 2185 } 2186 2187 return 0; 2188 } 2189 2190 static int __init rtrs_server_init(void) 2191 { 2192 int err; 2193 2194 pr_info("Loading module %s, proto %s: (max_chunk_size: %d (pure IO %ld, headers %ld) , sess_queue_depth: %d, always_invalidate: %d)\n", 2195 KBUILD_MODNAME, RTRS_PROTO_VER_STRING, 2196 max_chunk_size, max_chunk_size - MAX_HDR_SIZE, MAX_HDR_SIZE, 2197 sess_queue_depth, always_invalidate); 2198 2199 rtrs_rdma_dev_pd_init(0, &dev_pd); 2200 2201 err = check_module_params(); 2202 if (err) { 2203 pr_err("Failed to load module, invalid module parameters, err: %d\n", 2204 err); 2205 return err; 2206 } 2207 chunk_pool = mempool_create_page_pool(sess_queue_depth * CHUNK_POOL_SZ, 2208 get_order(max_chunk_size)); 2209 if (!chunk_pool) 2210 return -ENOMEM; 2211 rtrs_dev_class = class_create(THIS_MODULE, "rtrs-server"); 2212 if (IS_ERR(rtrs_dev_class)) { 2213 err = PTR_ERR(rtrs_dev_class); 2214 goto out_chunk_pool; 2215 } 2216 rtrs_wq = alloc_workqueue("rtrs_server_wq", 0, 0); 2217 if (!rtrs_wq) { 2218 err = -ENOMEM; 2219 goto out_dev_class; 2220 } 2221 2222 return 0; 2223 2224 out_dev_class: 2225 class_destroy(rtrs_dev_class); 2226 out_chunk_pool: 2227 mempool_destroy(chunk_pool); 2228 2229 return err; 2230 } 2231 2232 static void __exit rtrs_server_exit(void) 2233 { 2234 destroy_workqueue(rtrs_wq); 2235 class_destroy(rtrs_dev_class); 2236 mempool_destroy(chunk_pool); 2237 rtrs_rdma_dev_pd_deinit(&dev_pd); 2238 } 2239 2240 module_init(rtrs_server_init); 2241 module_exit(rtrs_server_exit); 2242