// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * RDMA Transport Layer
 *
 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
 */

#undef pr_fmt
#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt

#include <linux/module.h>
#include <linux/mempool.h>

#include "rtrs-srv.h"
#include "rtrs-log.h"
#include <rdma/ib_cm.h>
#include <rdma/ib_verbs.h>

MODULE_DESCRIPTION("RDMA Transport Server");
MODULE_LICENSE("GPL");

/* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
#define DEFAULT_SESS_QUEUE_DEPTH 512
#define MAX_HDR_SIZE PAGE_SIZE

/* We guarantee to serve at least 10 paths */
#define CHUNK_POOL_SZ 10

static struct rtrs_rdma_dev_pd dev_pd;
static mempool_t *chunk_pool;
struct class *rtrs_dev_class;
static struct rtrs_srv_ib_ctx ib_ctx;

static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;

static bool always_invalidate = true;
module_param(always_invalidate, bool, 0444);
MODULE_PARM_DESC(always_invalidate,
		 "Invalidate memory registration for contiguous memory regions before accessing.");

module_param_named(max_chunk_size, max_chunk_size, int, 0444);
MODULE_PARM_DESC(max_chunk_size,
		 "Max size for each IO request, in bytes (default: "
		 __stringify(DEFAULT_MAX_CHUNK_SIZE) " bytes)");

module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
MODULE_PARM_DESC(sess_queue_depth,
		 "Number of buffers for pending I/O requests to allocate per session. Maximum: "
		 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
		 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");

static cpumask_t cq_affinity_mask = { CPU_BITS_ALL };

static struct workqueue_struct *rtrs_wq;

static inline struct rtrs_srv_con *to_srv_con(struct rtrs_con *c)
{
	return container_of(c, struct rtrs_srv_con, c);
}

static inline struct rtrs_srv_sess *to_srv_sess(struct rtrs_sess *s)
{
	return container_of(s, struct rtrs_srv_sess, s);
}

static bool rtrs_srv_change_state(struct rtrs_srv_sess *sess,
				  enum rtrs_srv_state new_state)
{
	enum rtrs_srv_state old_state;
	bool changed = false;

	spin_lock_irq(&sess->state_lock);
	old_state = sess->state;
	switch (new_state) {
	case RTRS_SRV_CONNECTED:
		if (old_state == RTRS_SRV_CONNECTING)
			changed = true;
		break;
	case RTRS_SRV_CLOSING:
		if (old_state == RTRS_SRV_CONNECTING ||
		    old_state == RTRS_SRV_CONNECTED)
			changed = true;
		break;
	case RTRS_SRV_CLOSED:
		if (old_state == RTRS_SRV_CLOSING)
			changed = true;
		break;
	default:
		break;
	}
	if (changed)
		sess->state = new_state;
	spin_unlock_irq(&sess->state_lock);

	return changed;
}

static void free_id(struct rtrs_srv_op *id)
{
	if (!id)
		return;
	kfree(id);
}

static void rtrs_srv_free_ops_ids(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	int i;

	if (sess->ops_ids) {
		for (i = 0; i < srv->queue_depth; i++)
			free_id(sess->ops_ids[i]);
		kfree(sess->ops_ids);
		sess->ops_ids = NULL;
	}
}

static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc);

static struct ib_cqe io_comp_cqe = {
	.done = rtrs_srv_rdma_done
};

static inline void rtrs_srv_inflight_ref_release(struct percpu_ref *ref)
{
	struct rtrs_srv_sess *sess = container_of(ref, struct rtrs_srv_sess,
						  ids_inflight_ref);

	percpu_ref_exit(&sess->ids_inflight_ref);
	complete(&sess->complete_done);
}

static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_srv_op *id;
	int i, ret;

	sess->ops_ids = kcalloc(srv->queue_depth, sizeof(*sess->ops_ids),
				GFP_KERNEL);
	if (!sess->ops_ids)
		goto err;

	for (i = 0; i < srv->queue_depth; ++i) {
		id = kzalloc(sizeof(*id), GFP_KERNEL);
		if (!id)
			goto err;

		sess->ops_ids[i] = id;
	}

	ret = percpu_ref_init(&sess->ids_inflight_ref,
			      rtrs_srv_inflight_ref_release, 0, GFP_KERNEL);
	if (ret) {
		pr_err("Percpu reference init failed\n");
		goto err;
	}
	init_completion(&sess->complete_done);

	return 0;

err:
	rtrs_srv_free_ops_ids(sess);
	return -ENOMEM;
}

static inline void rtrs_srv_get_ops_ids(struct rtrs_srv_sess *sess)
{
	percpu_ref_get(&sess->ids_inflight_ref);
}

static inline void rtrs_srv_put_ops_ids(struct rtrs_srv_sess *sess)
{
	percpu_ref_put(&sess->ids_inflight_ref);
}

static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(s, "REG MR failed: %s\n",
			 ib_wc_status_msg(wc->status));
		close_sess(sess);
		return;
	}
}

static struct ib_cqe local_reg_cqe = {
	.done = rtrs_srv_reg_mr_done
};
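
/*
 * Note added for clarity: summary of the response path implemented by
 * rdma_write_sg() and send_io_resp_imm() below.  An I/O response is a
 * short chain of work requests posted on the connection QP.  Depending
 * on the 'always_invalidate' module parameter and on whether the client
 * set RTRS_MSG_NEED_INVAL_F, the chain consists of:
 *
 *   - IB_WR_RDMA_WRITE carrying the READ payload (rdma_write_sg() only),
 *   - optionally IB_WR_REG_MR re-registering the chunk MR,
 *   - optionally IB_WR_SEND_WITH_INV invalidating the client's rkey,
 *   - a final IB_WR_SEND_WITH_IMM or IB_WR_RDMA_WRITE_WITH_IMM whose
 *     immediate data packs msg_id, errno and the need-invalidate flag
 *     via rtrs_to_io_rsp_imm().
 *
 * Only every queue_depth-th send is posted IB_SEND_SIGNALED, which is
 * enough to keep the send queue from filling up.
 */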
197 198 static int rdma_write_sg(struct rtrs_srv_op *id) 199 { 200 struct rtrs_sess *s = id->con->c.sess; 201 struct rtrs_srv_sess *sess = to_srv_sess(s); 202 dma_addr_t dma_addr = sess->dma_addr[id->msg_id]; 203 struct rtrs_srv_mr *srv_mr; 204 struct rtrs_srv *srv = sess->srv; 205 struct ib_send_wr inv_wr; 206 struct ib_rdma_wr imm_wr; 207 struct ib_rdma_wr *wr = NULL; 208 enum ib_send_flags flags; 209 size_t sg_cnt; 210 int err, offset; 211 bool need_inval; 212 u32 rkey = 0; 213 struct ib_reg_wr rwr; 214 struct ib_sge *plist; 215 struct ib_sge list; 216 217 sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt); 218 need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F; 219 if (unlikely(sg_cnt != 1)) 220 return -EINVAL; 221 222 offset = 0; 223 224 wr = &id->tx_wr; 225 plist = &id->tx_sg; 226 plist->addr = dma_addr + offset; 227 plist->length = le32_to_cpu(id->rd_msg->desc[0].len); 228 229 /* WR will fail with length error 230 * if this is 0 231 */ 232 if (unlikely(plist->length == 0)) { 233 rtrs_err(s, "Invalid RDMA-Write sg list length 0\n"); 234 return -EINVAL; 235 } 236 237 plist->lkey = sess->s.dev->ib_pd->local_dma_lkey; 238 offset += plist->length; 239 240 wr->wr.sg_list = plist; 241 wr->wr.num_sge = 1; 242 wr->remote_addr = le64_to_cpu(id->rd_msg->desc[0].addr); 243 wr->rkey = le32_to_cpu(id->rd_msg->desc[0].key); 244 if (rkey == 0) 245 rkey = wr->rkey; 246 else 247 /* Only one key is actually used */ 248 WARN_ON_ONCE(rkey != wr->rkey); 249 250 wr->wr.opcode = IB_WR_RDMA_WRITE; 251 wr->wr.wr_cqe = &io_comp_cqe; 252 wr->wr.ex.imm_data = 0; 253 wr->wr.send_flags = 0; 254 255 if (need_inval && always_invalidate) { 256 wr->wr.next = &rwr.wr; 257 rwr.wr.next = &inv_wr; 258 inv_wr.next = &imm_wr.wr; 259 } else if (always_invalidate) { 260 wr->wr.next = &rwr.wr; 261 rwr.wr.next = &imm_wr.wr; 262 } else if (need_inval) { 263 wr->wr.next = &inv_wr; 264 inv_wr.next = &imm_wr.wr; 265 } else { 266 wr->wr.next = &imm_wr.wr; 267 } 268 /* 269 * From time to time we have to post signaled sends, 270 * or send queue will fill up and only QP reset can help. 271 */ 272 flags = (atomic_inc_return(&id->con->wr_cnt) % srv->queue_depth) ? 
273 0 : IB_SEND_SIGNALED; 274 275 if (need_inval) { 276 inv_wr.sg_list = NULL; 277 inv_wr.num_sge = 0; 278 inv_wr.opcode = IB_WR_SEND_WITH_INV; 279 inv_wr.wr_cqe = &io_comp_cqe; 280 inv_wr.send_flags = 0; 281 inv_wr.ex.invalidate_rkey = rkey; 282 } 283 284 imm_wr.wr.next = NULL; 285 if (always_invalidate) { 286 struct rtrs_msg_rkey_rsp *msg; 287 288 srv_mr = &sess->mrs[id->msg_id]; 289 rwr.wr.opcode = IB_WR_REG_MR; 290 rwr.wr.wr_cqe = &local_reg_cqe; 291 rwr.wr.num_sge = 0; 292 rwr.mr = srv_mr->mr; 293 rwr.wr.send_flags = 0; 294 rwr.key = srv_mr->mr->rkey; 295 rwr.access = (IB_ACCESS_LOCAL_WRITE | 296 IB_ACCESS_REMOTE_WRITE); 297 msg = srv_mr->iu->buf; 298 msg->buf_id = cpu_to_le16(id->msg_id); 299 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP); 300 msg->rkey = cpu_to_le32(srv_mr->mr->rkey); 301 302 list.addr = srv_mr->iu->dma_addr; 303 list.length = sizeof(*msg); 304 list.lkey = sess->s.dev->ib_pd->local_dma_lkey; 305 imm_wr.wr.sg_list = &list; 306 imm_wr.wr.num_sge = 1; 307 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM; 308 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, 309 srv_mr->iu->dma_addr, 310 srv_mr->iu->size, DMA_TO_DEVICE); 311 } else { 312 imm_wr.wr.sg_list = NULL; 313 imm_wr.wr.num_sge = 0; 314 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM; 315 } 316 imm_wr.wr.send_flags = flags; 317 imm_wr.wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id, 318 0, need_inval)); 319 320 imm_wr.wr.wr_cqe = &io_comp_cqe; 321 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, dma_addr, 322 offset, DMA_BIDIRECTIONAL); 323 324 err = ib_post_send(id->con->c.qp, &id->tx_wr.wr, NULL); 325 if (unlikely(err)) 326 rtrs_err(s, 327 "Posting RDMA-Write-Request to QP failed, err: %d\n", 328 err); 329 330 return err; 331 } 332 333 /** 334 * send_io_resp_imm() - respond to client with empty IMM on failed READ/WRITE 335 * requests or on successful WRITE request. 336 * @con: the connection to send back result 337 * @id: the id associated with the IO 338 * @errno: the error number of the IO. 339 * 340 * Return 0 on success, errno otherwise. 
341 */ 342 static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id, 343 int errno) 344 { 345 struct rtrs_sess *s = con->c.sess; 346 struct rtrs_srv_sess *sess = to_srv_sess(s); 347 struct ib_send_wr inv_wr, *wr = NULL; 348 struct ib_rdma_wr imm_wr; 349 struct ib_reg_wr rwr; 350 struct rtrs_srv *srv = sess->srv; 351 struct rtrs_srv_mr *srv_mr; 352 bool need_inval = false; 353 enum ib_send_flags flags; 354 u32 imm; 355 int err; 356 357 if (id->dir == READ) { 358 struct rtrs_msg_rdma_read *rd_msg = id->rd_msg; 359 size_t sg_cnt; 360 361 need_inval = le16_to_cpu(rd_msg->flags) & 362 RTRS_MSG_NEED_INVAL_F; 363 sg_cnt = le16_to_cpu(rd_msg->sg_cnt); 364 365 if (need_inval) { 366 if (likely(sg_cnt)) { 367 inv_wr.wr_cqe = &io_comp_cqe; 368 inv_wr.sg_list = NULL; 369 inv_wr.num_sge = 0; 370 inv_wr.opcode = IB_WR_SEND_WITH_INV; 371 inv_wr.send_flags = 0; 372 /* Only one key is actually used */ 373 inv_wr.ex.invalidate_rkey = 374 le32_to_cpu(rd_msg->desc[0].key); 375 } else { 376 WARN_ON_ONCE(1); 377 need_inval = false; 378 } 379 } 380 } 381 382 if (need_inval && always_invalidate) { 383 wr = &inv_wr; 384 inv_wr.next = &rwr.wr; 385 rwr.wr.next = &imm_wr.wr; 386 } else if (always_invalidate) { 387 wr = &rwr.wr; 388 rwr.wr.next = &imm_wr.wr; 389 } else if (need_inval) { 390 wr = &inv_wr; 391 inv_wr.next = &imm_wr.wr; 392 } else { 393 wr = &imm_wr.wr; 394 } 395 /* 396 * From time to time we have to post signalled sends, 397 * or send queue will fill up and only QP reset can help. 398 */ 399 flags = (atomic_inc_return(&con->wr_cnt) % srv->queue_depth) ? 400 0 : IB_SEND_SIGNALED; 401 imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval); 402 imm_wr.wr.next = NULL; 403 if (always_invalidate) { 404 struct ib_sge list; 405 struct rtrs_msg_rkey_rsp *msg; 406 407 srv_mr = &sess->mrs[id->msg_id]; 408 rwr.wr.next = &imm_wr.wr; 409 rwr.wr.opcode = IB_WR_REG_MR; 410 rwr.wr.wr_cqe = &local_reg_cqe; 411 rwr.wr.num_sge = 0; 412 rwr.wr.send_flags = 0; 413 rwr.mr = srv_mr->mr; 414 rwr.key = srv_mr->mr->rkey; 415 rwr.access = (IB_ACCESS_LOCAL_WRITE | 416 IB_ACCESS_REMOTE_WRITE); 417 msg = srv_mr->iu->buf; 418 msg->buf_id = cpu_to_le16(id->msg_id); 419 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP); 420 msg->rkey = cpu_to_le32(srv_mr->mr->rkey); 421 422 list.addr = srv_mr->iu->dma_addr; 423 list.length = sizeof(*msg); 424 list.lkey = sess->s.dev->ib_pd->local_dma_lkey; 425 imm_wr.wr.sg_list = &list; 426 imm_wr.wr.num_sge = 1; 427 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM; 428 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, 429 srv_mr->iu->dma_addr, 430 srv_mr->iu->size, DMA_TO_DEVICE); 431 } else { 432 imm_wr.wr.sg_list = NULL; 433 imm_wr.wr.num_sge = 0; 434 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM; 435 } 436 imm_wr.wr.send_flags = flags; 437 imm_wr.wr.wr_cqe = &io_comp_cqe; 438 439 imm_wr.wr.ex.imm_data = cpu_to_be32(imm); 440 441 err = ib_post_send(id->con->c.qp, wr, NULL); 442 if (unlikely(err)) 443 rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %d\n", 444 err); 445 446 return err; 447 } 448 449 void close_sess(struct rtrs_srv_sess *sess) 450 { 451 if (rtrs_srv_change_state(sess, RTRS_SRV_CLOSING)) 452 queue_work(rtrs_wq, &sess->close_work); 453 WARN_ON(sess->state != RTRS_SRV_CLOSING); 454 } 455 456 static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state) 457 { 458 switch (state) { 459 case RTRS_SRV_CONNECTING: 460 return "RTRS_SRV_CONNECTING"; 461 case RTRS_SRV_CONNECTED: 462 return "RTRS_SRV_CONNECTED"; 463 case RTRS_SRV_CLOSING: 464 return "RTRS_SRV_CLOSING"; 465 case 
RTRS_SRV_CLOSED: 466 return "RTRS_SRV_CLOSED"; 467 default: 468 return "UNKNOWN"; 469 } 470 } 471 472 /** 473 * rtrs_srv_resp_rdma() - Finish an RDMA request 474 * 475 * @id: Internal RTRS operation identifier 476 * @status: Response Code sent to the other side for this operation. 477 * 0 = success, <=0 error 478 * Context: any 479 * 480 * Finish a RDMA operation. A message is sent to the client and the 481 * corresponding memory areas will be released. 482 */ 483 bool rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status) 484 { 485 struct rtrs_srv_sess *sess; 486 struct rtrs_srv_con *con; 487 struct rtrs_sess *s; 488 int err; 489 490 if (WARN_ON(!id)) 491 return true; 492 493 con = id->con; 494 s = con->c.sess; 495 sess = to_srv_sess(s); 496 497 id->status = status; 498 499 if (unlikely(sess->state != RTRS_SRV_CONNECTED)) { 500 rtrs_err_rl(s, 501 "Sending I/O response failed, session %s is disconnected, sess state %s\n", 502 kobject_name(&sess->kobj), 503 rtrs_srv_state_str(sess->state)); 504 goto out; 505 } 506 if (always_invalidate) { 507 struct rtrs_srv_mr *mr = &sess->mrs[id->msg_id]; 508 509 ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey)); 510 } 511 if (unlikely(atomic_sub_return(1, 512 &con->sq_wr_avail) < 0)) { 513 rtrs_err(s, "IB send queue full: sess=%s cid=%d\n", 514 kobject_name(&sess->kobj), 515 con->c.cid); 516 atomic_add(1, &con->sq_wr_avail); 517 spin_lock(&con->rsp_wr_wait_lock); 518 list_add_tail(&id->wait_list, &con->rsp_wr_wait_list); 519 spin_unlock(&con->rsp_wr_wait_lock); 520 return false; 521 } 522 523 if (status || id->dir == WRITE || !id->rd_msg->sg_cnt) 524 err = send_io_resp_imm(con, id, status); 525 else 526 err = rdma_write_sg(id); 527 528 if (unlikely(err)) { 529 rtrs_err_rl(s, "IO response failed: %d: sess=%s\n", err, 530 kobject_name(&sess->kobj)); 531 close_sess(sess); 532 } 533 out: 534 rtrs_srv_put_ops_ids(sess); 535 return true; 536 } 537 EXPORT_SYMBOL(rtrs_srv_resp_rdma); 538 539 /** 540 * rtrs_srv_set_sess_priv() - Set private pointer in rtrs_srv. 541 * @srv: Session pointer 542 * @priv: The private pointer that is associated with the session. 543 */ 544 void rtrs_srv_set_sess_priv(struct rtrs_srv *srv, void *priv) 545 { 546 srv->priv = priv; 547 } 548 EXPORT_SYMBOL(rtrs_srv_set_sess_priv); 549 550 static void unmap_cont_bufs(struct rtrs_srv_sess *sess) 551 { 552 int i; 553 554 for (i = 0; i < sess->mrs_num; i++) { 555 struct rtrs_srv_mr *srv_mr; 556 557 srv_mr = &sess->mrs[i]; 558 rtrs_iu_free(srv_mr->iu, sess->s.dev->ib_dev, 1); 559 ib_dereg_mr(srv_mr->mr); 560 ib_dma_unmap_sg(sess->s.dev->ib_dev, srv_mr->sgt.sgl, 561 srv_mr->sgt.nents, DMA_BIDIRECTIONAL); 562 sg_free_table(&srv_mr->sgt); 563 } 564 kfree(sess->mrs); 565 } 566 567 static int map_cont_bufs(struct rtrs_srv_sess *sess) 568 { 569 struct rtrs_srv *srv = sess->srv; 570 struct rtrs_sess *ss = &sess->s; 571 int i, mri, err, mrs_num; 572 unsigned int chunk_bits; 573 int chunks_per_mr = 1; 574 575 /* 576 * Here we map queue_depth chunks to MR. Firstly we have to 577 * figure out how many chunks can we map per MR. 578 */ 579 if (always_invalidate) { 580 /* 581 * in order to do invalidate for each chunks of memory, we needs 582 * more memory regions. 
583 */ 584 mrs_num = srv->queue_depth; 585 } else { 586 chunks_per_mr = 587 sess->s.dev->ib_dev->attrs.max_fast_reg_page_list_len; 588 mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr); 589 chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num); 590 } 591 592 sess->mrs = kcalloc(mrs_num, sizeof(*sess->mrs), GFP_KERNEL); 593 if (!sess->mrs) 594 return -ENOMEM; 595 596 sess->mrs_num = mrs_num; 597 598 for (mri = 0; mri < mrs_num; mri++) { 599 struct rtrs_srv_mr *srv_mr = &sess->mrs[mri]; 600 struct sg_table *sgt = &srv_mr->sgt; 601 struct scatterlist *s; 602 struct ib_mr *mr; 603 int nr, chunks; 604 605 chunks = chunks_per_mr * mri; 606 if (!always_invalidate) 607 chunks_per_mr = min_t(int, chunks_per_mr, 608 srv->queue_depth - chunks); 609 610 err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL); 611 if (err) 612 goto err; 613 614 for_each_sg(sgt->sgl, s, chunks_per_mr, i) 615 sg_set_page(s, srv->chunks[chunks + i], 616 max_chunk_size, 0); 617 618 nr = ib_dma_map_sg(sess->s.dev->ib_dev, sgt->sgl, 619 sgt->nents, DMA_BIDIRECTIONAL); 620 if (nr < sgt->nents) { 621 err = nr < 0 ? nr : -EINVAL; 622 goto free_sg; 623 } 624 mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG, 625 sgt->nents); 626 if (IS_ERR(mr)) { 627 err = PTR_ERR(mr); 628 goto unmap_sg; 629 } 630 nr = ib_map_mr_sg(mr, sgt->sgl, sgt->nents, 631 NULL, max_chunk_size); 632 if (nr < 0 || nr < sgt->nents) { 633 err = nr < 0 ? nr : -EINVAL; 634 goto dereg_mr; 635 } 636 637 if (always_invalidate) { 638 srv_mr->iu = rtrs_iu_alloc(1, 639 sizeof(struct rtrs_msg_rkey_rsp), 640 GFP_KERNEL, sess->s.dev->ib_dev, 641 DMA_TO_DEVICE, rtrs_srv_rdma_done); 642 if (!srv_mr->iu) { 643 err = -ENOMEM; 644 rtrs_err(ss, "rtrs_iu_alloc(), err: %d\n", err); 645 goto dereg_mr; 646 } 647 } 648 /* Eventually dma addr for each chunk can be cached */ 649 for_each_sg(sgt->sgl, s, sgt->orig_nents, i) 650 sess->dma_addr[chunks + i] = sg_dma_address(s); 651 652 ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey)); 653 srv_mr->mr = mr; 654 655 continue; 656 err: 657 while (mri--) { 658 srv_mr = &sess->mrs[mri]; 659 sgt = &srv_mr->sgt; 660 mr = srv_mr->mr; 661 rtrs_iu_free(srv_mr->iu, sess->s.dev->ib_dev, 1); 662 dereg_mr: 663 ib_dereg_mr(mr); 664 unmap_sg: 665 ib_dma_unmap_sg(sess->s.dev->ib_dev, sgt->sgl, 666 sgt->nents, DMA_BIDIRECTIONAL); 667 free_sg: 668 sg_free_table(sgt); 669 } 670 kfree(sess->mrs); 671 672 return err; 673 } 674 675 chunk_bits = ilog2(srv->queue_depth - 1) + 1; 676 sess->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits); 677 678 return 0; 679 } 680 681 static void rtrs_srv_hb_err_handler(struct rtrs_con *c) 682 { 683 close_sess(to_srv_sess(c->sess)); 684 } 685 686 static void rtrs_srv_init_hb(struct rtrs_srv_sess *sess) 687 { 688 rtrs_init_hb(&sess->s, &io_comp_cqe, 689 RTRS_HB_INTERVAL_MS, 690 RTRS_HB_MISSED_MAX, 691 rtrs_srv_hb_err_handler, 692 rtrs_wq); 693 } 694 695 static void rtrs_srv_start_hb(struct rtrs_srv_sess *sess) 696 { 697 rtrs_start_hb(&sess->s); 698 } 699 700 static void rtrs_srv_stop_hb(struct rtrs_srv_sess *sess) 701 { 702 rtrs_stop_hb(&sess->s); 703 } 704 705 static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc) 706 { 707 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 708 struct rtrs_sess *s = con->c.sess; 709 struct rtrs_srv_sess *sess = to_srv_sess(s); 710 struct rtrs_iu *iu; 711 712 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 713 rtrs_iu_free(iu, sess->s.dev->ib_dev, 1); 714 715 if (unlikely(wc->status != IB_WC_SUCCESS)) { 716 rtrs_err(s, "Sess info response send failed: %s\n", 717 
ib_wc_status_msg(wc->status)); 718 close_sess(sess); 719 return; 720 } 721 WARN_ON(wc->opcode != IB_WC_SEND); 722 } 723 724 static void rtrs_srv_sess_up(struct rtrs_srv_sess *sess) 725 { 726 struct rtrs_srv *srv = sess->srv; 727 struct rtrs_srv_ctx *ctx = srv->ctx; 728 int up; 729 730 mutex_lock(&srv->paths_ev_mutex); 731 up = ++srv->paths_up; 732 if (up == 1) 733 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL); 734 mutex_unlock(&srv->paths_ev_mutex); 735 736 /* Mark session as established */ 737 sess->established = true; 738 } 739 740 static void rtrs_srv_sess_down(struct rtrs_srv_sess *sess) 741 { 742 struct rtrs_srv *srv = sess->srv; 743 struct rtrs_srv_ctx *ctx = srv->ctx; 744 745 if (!sess->established) 746 return; 747 748 sess->established = false; 749 mutex_lock(&srv->paths_ev_mutex); 750 WARN_ON(!srv->paths_up); 751 if (--srv->paths_up == 0) 752 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv); 753 mutex_unlock(&srv->paths_ev_mutex); 754 } 755 756 static bool exist_sessname(struct rtrs_srv_ctx *ctx, 757 const char *sessname, const uuid_t *path_uuid) 758 { 759 struct rtrs_srv *srv; 760 struct rtrs_srv_sess *sess; 761 bool found = false; 762 763 mutex_lock(&ctx->srv_mutex); 764 list_for_each_entry(srv, &ctx->srv_list, ctx_list) { 765 mutex_lock(&srv->paths_mutex); 766 767 /* when a client with same uuid and same sessname tried to add a path */ 768 if (uuid_equal(&srv->paths_uuid, path_uuid)) { 769 mutex_unlock(&srv->paths_mutex); 770 continue; 771 } 772 773 list_for_each_entry(sess, &srv->paths_list, s.entry) { 774 if (strlen(sess->s.sessname) == strlen(sessname) && 775 !strcmp(sess->s.sessname, sessname)) { 776 found = true; 777 break; 778 } 779 } 780 mutex_unlock(&srv->paths_mutex); 781 if (found) 782 break; 783 } 784 mutex_unlock(&ctx->srv_mutex); 785 return found; 786 } 787 788 static int post_recv_sess(struct rtrs_srv_sess *sess); 789 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno); 790 791 static int process_info_req(struct rtrs_srv_con *con, 792 struct rtrs_msg_info_req *msg) 793 { 794 struct rtrs_sess *s = con->c.sess; 795 struct rtrs_srv_sess *sess = to_srv_sess(s); 796 struct ib_send_wr *reg_wr = NULL; 797 struct rtrs_msg_info_rsp *rsp; 798 struct rtrs_iu *tx_iu; 799 struct ib_reg_wr *rwr; 800 int mri, err; 801 size_t tx_sz; 802 803 err = post_recv_sess(sess); 804 if (unlikely(err)) { 805 rtrs_err(s, "post_recv_sess(), err: %d\n", err); 806 return err; 807 } 808 809 if (exist_sessname(sess->srv->ctx, 810 msg->sessname, &sess->srv->paths_uuid)) { 811 rtrs_err(s, "sessname is duplicated: %s\n", msg->sessname); 812 return -EPERM; 813 } 814 strscpy(sess->s.sessname, msg->sessname, sizeof(sess->s.sessname)); 815 816 rwr = kcalloc(sess->mrs_num, sizeof(*rwr), GFP_KERNEL); 817 if (unlikely(!rwr)) 818 return -ENOMEM; 819 820 tx_sz = sizeof(*rsp); 821 tx_sz += sizeof(rsp->desc[0]) * sess->mrs_num; 822 tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, sess->s.dev->ib_dev, 823 DMA_TO_DEVICE, rtrs_srv_info_rsp_done); 824 if (unlikely(!tx_iu)) { 825 err = -ENOMEM; 826 goto rwr_free; 827 } 828 829 rsp = tx_iu->buf; 830 rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP); 831 rsp->sg_cnt = cpu_to_le16(sess->mrs_num); 832 833 for (mri = 0; mri < sess->mrs_num; mri++) { 834 struct ib_mr *mr = sess->mrs[mri].mr; 835 836 rsp->desc[mri].addr = cpu_to_le64(mr->iova); 837 rsp->desc[mri].key = cpu_to_le32(mr->rkey); 838 rsp->desc[mri].len = cpu_to_le32(mr->length); 839 840 /* 841 * Fill in reg MR request and chain them *backwards* 842 */ 843 rwr[mri].wr.next = mri ? 
&rwr[mri - 1].wr : NULL; 844 rwr[mri].wr.opcode = IB_WR_REG_MR; 845 rwr[mri].wr.wr_cqe = &local_reg_cqe; 846 rwr[mri].wr.num_sge = 0; 847 rwr[mri].wr.send_flags = 0; 848 rwr[mri].mr = mr; 849 rwr[mri].key = mr->rkey; 850 rwr[mri].access = (IB_ACCESS_LOCAL_WRITE | 851 IB_ACCESS_REMOTE_WRITE); 852 reg_wr = &rwr[mri].wr; 853 } 854 855 err = rtrs_srv_create_sess_files(sess); 856 if (unlikely(err)) 857 goto iu_free; 858 kobject_get(&sess->kobj); 859 get_device(&sess->srv->dev); 860 rtrs_srv_change_state(sess, RTRS_SRV_CONNECTED); 861 rtrs_srv_start_hb(sess); 862 863 /* 864 * We do not account number of established connections at the current 865 * moment, we rely on the client, which should send info request when 866 * all connections are successfully established. Thus, simply notify 867 * listener with a proper event if we are the first path. 868 */ 869 rtrs_srv_sess_up(sess); 870 871 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr, 872 tx_iu->size, DMA_TO_DEVICE); 873 874 /* Send info response */ 875 err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr); 876 if (unlikely(err)) { 877 rtrs_err(s, "rtrs_iu_post_send(), err: %d\n", err); 878 iu_free: 879 rtrs_iu_free(tx_iu, sess->s.dev->ib_dev, 1); 880 } 881 rwr_free: 882 kfree(rwr); 883 884 return err; 885 } 886 887 static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc) 888 { 889 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 890 struct rtrs_sess *s = con->c.sess; 891 struct rtrs_srv_sess *sess = to_srv_sess(s); 892 struct rtrs_msg_info_req *msg; 893 struct rtrs_iu *iu; 894 int err; 895 896 WARN_ON(con->c.cid); 897 898 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 899 if (unlikely(wc->status != IB_WC_SUCCESS)) { 900 rtrs_err(s, "Sess info request receive failed: %s\n", 901 ib_wc_status_msg(wc->status)); 902 goto close; 903 } 904 WARN_ON(wc->opcode != IB_WC_RECV); 905 906 if (unlikely(wc->byte_len < sizeof(*msg))) { 907 rtrs_err(s, "Sess info request is malformed: size %d\n", 908 wc->byte_len); 909 goto close; 910 } 911 ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr, 912 iu->size, DMA_FROM_DEVICE); 913 msg = iu->buf; 914 if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ)) { 915 rtrs_err(s, "Sess info request is malformed: type %d\n", 916 le16_to_cpu(msg->type)); 917 goto close; 918 } 919 err = process_info_req(con, msg); 920 if (unlikely(err)) 921 goto close; 922 923 out: 924 rtrs_iu_free(iu, sess->s.dev->ib_dev, 1); 925 return; 926 close: 927 close_sess(sess); 928 goto out; 929 } 930 931 static int post_recv_info_req(struct rtrs_srv_con *con) 932 { 933 struct rtrs_sess *s = con->c.sess; 934 struct rtrs_srv_sess *sess = to_srv_sess(s); 935 struct rtrs_iu *rx_iu; 936 int err; 937 938 rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), 939 GFP_KERNEL, sess->s.dev->ib_dev, 940 DMA_FROM_DEVICE, rtrs_srv_info_req_done); 941 if (unlikely(!rx_iu)) 942 return -ENOMEM; 943 /* Prepare for getting info response */ 944 err = rtrs_iu_post_recv(&con->c, rx_iu); 945 if (unlikely(err)) { 946 rtrs_err(s, "rtrs_iu_post_recv(), err: %d\n", err); 947 rtrs_iu_free(rx_iu, sess->s.dev->ib_dev, 1); 948 return err; 949 } 950 951 return 0; 952 } 953 954 static int post_recv_io(struct rtrs_srv_con *con, size_t q_size) 955 { 956 int i, err; 957 958 for (i = 0; i < q_size; i++) { 959 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 960 if (unlikely(err)) 961 return err; 962 } 963 964 return 0; 965 } 966 967 static int post_recv_sess(struct rtrs_srv_sess *sess) 968 { 969 struct rtrs_srv *srv = 
sess->srv; 970 struct rtrs_sess *s = &sess->s; 971 size_t q_size; 972 int err, cid; 973 974 for (cid = 0; cid < sess->s.con_num; cid++) { 975 if (cid == 0) 976 q_size = SERVICE_CON_QUEUE_DEPTH; 977 else 978 q_size = srv->queue_depth; 979 980 err = post_recv_io(to_srv_con(sess->s.con[cid]), q_size); 981 if (unlikely(err)) { 982 rtrs_err(s, "post_recv_io(), err: %d\n", err); 983 return err; 984 } 985 } 986 987 return 0; 988 } 989 990 static void process_read(struct rtrs_srv_con *con, 991 struct rtrs_msg_rdma_read *msg, 992 u32 buf_id, u32 off) 993 { 994 struct rtrs_sess *s = con->c.sess; 995 struct rtrs_srv_sess *sess = to_srv_sess(s); 996 struct rtrs_srv *srv = sess->srv; 997 struct rtrs_srv_ctx *ctx = srv->ctx; 998 struct rtrs_srv_op *id; 999 1000 size_t usr_len, data_len; 1001 void *data; 1002 int ret; 1003 1004 if (unlikely(sess->state != RTRS_SRV_CONNECTED)) { 1005 rtrs_err_rl(s, 1006 "Processing read request failed, session is disconnected, sess state %s\n", 1007 rtrs_srv_state_str(sess->state)); 1008 return; 1009 } 1010 if (unlikely(msg->sg_cnt != 1 && msg->sg_cnt != 0)) { 1011 rtrs_err_rl(s, 1012 "Processing read request failed, invalid message\n"); 1013 return; 1014 } 1015 rtrs_srv_get_ops_ids(sess); 1016 rtrs_srv_update_rdma_stats(sess->stats, off, READ); 1017 id = sess->ops_ids[buf_id]; 1018 id->con = con; 1019 id->dir = READ; 1020 id->msg_id = buf_id; 1021 id->rd_msg = msg; 1022 usr_len = le16_to_cpu(msg->usr_len); 1023 data_len = off - usr_len; 1024 data = page_address(srv->chunks[buf_id]); 1025 ret = ctx->ops.rdma_ev(srv->priv, id, READ, data, data_len, 1026 data + data_len, usr_len); 1027 1028 if (unlikely(ret)) { 1029 rtrs_err_rl(s, 1030 "Processing read request failed, user module cb reported for msg_id %d, err: %d\n", 1031 buf_id, ret); 1032 goto send_err_msg; 1033 } 1034 1035 return; 1036 1037 send_err_msg: 1038 ret = send_io_resp_imm(con, id, ret); 1039 if (ret < 0) { 1040 rtrs_err_rl(s, 1041 "Sending err msg for failed RDMA-Write-Req failed, msg_id %d, err: %d\n", 1042 buf_id, ret); 1043 close_sess(sess); 1044 } 1045 rtrs_srv_put_ops_ids(sess); 1046 } 1047 1048 static void process_write(struct rtrs_srv_con *con, 1049 struct rtrs_msg_rdma_write *req, 1050 u32 buf_id, u32 off) 1051 { 1052 struct rtrs_sess *s = con->c.sess; 1053 struct rtrs_srv_sess *sess = to_srv_sess(s); 1054 struct rtrs_srv *srv = sess->srv; 1055 struct rtrs_srv_ctx *ctx = srv->ctx; 1056 struct rtrs_srv_op *id; 1057 1058 size_t data_len, usr_len; 1059 void *data; 1060 int ret; 1061 1062 if (unlikely(sess->state != RTRS_SRV_CONNECTED)) { 1063 rtrs_err_rl(s, 1064 "Processing write request failed, session is disconnected, sess state %s\n", 1065 rtrs_srv_state_str(sess->state)); 1066 return; 1067 } 1068 rtrs_srv_get_ops_ids(sess); 1069 rtrs_srv_update_rdma_stats(sess->stats, off, WRITE); 1070 id = sess->ops_ids[buf_id]; 1071 id->con = con; 1072 id->dir = WRITE; 1073 id->msg_id = buf_id; 1074 1075 usr_len = le16_to_cpu(req->usr_len); 1076 data_len = off - usr_len; 1077 data = page_address(srv->chunks[buf_id]); 1078 ret = ctx->ops.rdma_ev(srv->priv, id, WRITE, data, data_len, 1079 data + data_len, usr_len); 1080 if (unlikely(ret)) { 1081 rtrs_err_rl(s, 1082 "Processing write request failed, user module callback reports err: %d\n", 1083 ret); 1084 goto send_err_msg; 1085 } 1086 1087 return; 1088 1089 send_err_msg: 1090 ret = send_io_resp_imm(con, id, ret); 1091 if (ret < 0) { 1092 rtrs_err_rl(s, 1093 "Processing write request failed, sending I/O response failed, msg_id %d, err: %d\n", 1094 buf_id, ret); 1095 
close_sess(sess); 1096 } 1097 rtrs_srv_put_ops_ids(sess); 1098 } 1099 1100 static void process_io_req(struct rtrs_srv_con *con, void *msg, 1101 u32 id, u32 off) 1102 { 1103 struct rtrs_sess *s = con->c.sess; 1104 struct rtrs_srv_sess *sess = to_srv_sess(s); 1105 struct rtrs_msg_rdma_hdr *hdr; 1106 unsigned int type; 1107 1108 ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, sess->dma_addr[id], 1109 max_chunk_size, DMA_BIDIRECTIONAL); 1110 hdr = msg; 1111 type = le16_to_cpu(hdr->type); 1112 1113 switch (type) { 1114 case RTRS_MSG_WRITE: 1115 process_write(con, msg, id, off); 1116 break; 1117 case RTRS_MSG_READ: 1118 process_read(con, msg, id, off); 1119 break; 1120 default: 1121 rtrs_err(s, 1122 "Processing I/O request failed, unknown message type received: 0x%02x\n", 1123 type); 1124 goto err; 1125 } 1126 1127 return; 1128 1129 err: 1130 close_sess(sess); 1131 } 1132 1133 static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc) 1134 { 1135 struct rtrs_srv_mr *mr = 1136 container_of(wc->wr_cqe, typeof(*mr), inv_cqe); 1137 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 1138 struct rtrs_sess *s = con->c.sess; 1139 struct rtrs_srv_sess *sess = to_srv_sess(s); 1140 struct rtrs_srv *srv = sess->srv; 1141 u32 msg_id, off; 1142 void *data; 1143 1144 if (unlikely(wc->status != IB_WC_SUCCESS)) { 1145 rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n", 1146 ib_wc_status_msg(wc->status)); 1147 close_sess(sess); 1148 } 1149 msg_id = mr->msg_id; 1150 off = mr->msg_off; 1151 data = page_address(srv->chunks[msg_id]) + off; 1152 process_io_req(con, data, msg_id, off); 1153 } 1154 1155 static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con, 1156 struct rtrs_srv_mr *mr) 1157 { 1158 struct ib_send_wr wr = { 1159 .opcode = IB_WR_LOCAL_INV, 1160 .wr_cqe = &mr->inv_cqe, 1161 .send_flags = IB_SEND_SIGNALED, 1162 .ex.invalidate_rkey = mr->mr->rkey, 1163 }; 1164 mr->inv_cqe.done = rtrs_srv_inv_rkey_done; 1165 1166 return ib_post_send(con->c.qp, &wr, NULL); 1167 } 1168 1169 static void rtrs_rdma_process_wr_wait_list(struct rtrs_srv_con *con) 1170 { 1171 spin_lock(&con->rsp_wr_wait_lock); 1172 while (!list_empty(&con->rsp_wr_wait_list)) { 1173 struct rtrs_srv_op *id; 1174 int ret; 1175 1176 id = list_entry(con->rsp_wr_wait_list.next, 1177 struct rtrs_srv_op, wait_list); 1178 list_del(&id->wait_list); 1179 1180 spin_unlock(&con->rsp_wr_wait_lock); 1181 ret = rtrs_srv_resp_rdma(id, id->status); 1182 spin_lock(&con->rsp_wr_wait_lock); 1183 1184 if (!ret) { 1185 list_add(&id->wait_list, &con->rsp_wr_wait_list); 1186 break; 1187 } 1188 } 1189 spin_unlock(&con->rsp_wr_wait_lock); 1190 } 1191 1192 static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc) 1193 { 1194 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 1195 struct rtrs_sess *s = con->c.sess; 1196 struct rtrs_srv_sess *sess = to_srv_sess(s); 1197 struct rtrs_srv *srv = sess->srv; 1198 u32 imm_type, imm_payload; 1199 int err; 1200 1201 if (unlikely(wc->status != IB_WC_SUCCESS)) { 1202 if (wc->status != IB_WC_WR_FLUSH_ERR) { 1203 rtrs_err(s, 1204 "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n", 1205 ib_wc_status_msg(wc->status), wc->wr_cqe, 1206 wc->opcode, wc->vendor_err, wc->byte_len); 1207 close_sess(sess); 1208 } 1209 return; 1210 } 1211 1212 switch (wc->opcode) { 1213 case IB_WC_RECV_RDMA_WITH_IMM: 1214 /* 1215 * post_recv() RDMA write completions of IO reqs (read/write) 1216 * and hb 1217 */ 1218 if (WARN_ON(wc->wr_cqe != &io_comp_cqe)) 1219 return; 1220 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 1221 if 
(unlikely(err)) { 1222 rtrs_err(s, "rtrs_post_recv(), err: %d\n", err); 1223 close_sess(sess); 1224 break; 1225 } 1226 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), 1227 &imm_type, &imm_payload); 1228 if (likely(imm_type == RTRS_IO_REQ_IMM)) { 1229 u32 msg_id, off; 1230 void *data; 1231 1232 msg_id = imm_payload >> sess->mem_bits; 1233 off = imm_payload & ((1 << sess->mem_bits) - 1); 1234 if (unlikely(msg_id >= srv->queue_depth || 1235 off >= max_chunk_size)) { 1236 rtrs_err(s, "Wrong msg_id %u, off %u\n", 1237 msg_id, off); 1238 close_sess(sess); 1239 return; 1240 } 1241 if (always_invalidate) { 1242 struct rtrs_srv_mr *mr = &sess->mrs[msg_id]; 1243 1244 mr->msg_off = off; 1245 mr->msg_id = msg_id; 1246 err = rtrs_srv_inv_rkey(con, mr); 1247 if (unlikely(err)) { 1248 rtrs_err(s, "rtrs_post_recv(), err: %d\n", 1249 err); 1250 close_sess(sess); 1251 break; 1252 } 1253 } else { 1254 data = page_address(srv->chunks[msg_id]) + off; 1255 process_io_req(con, data, msg_id, off); 1256 } 1257 } else if (imm_type == RTRS_HB_MSG_IMM) { 1258 WARN_ON(con->c.cid); 1259 rtrs_send_hb_ack(&sess->s); 1260 } else if (imm_type == RTRS_HB_ACK_IMM) { 1261 WARN_ON(con->c.cid); 1262 sess->s.hb_missed_cnt = 0; 1263 } else { 1264 rtrs_wrn(s, "Unknown IMM type %u\n", imm_type); 1265 } 1266 break; 1267 case IB_WC_RDMA_WRITE: 1268 case IB_WC_SEND: 1269 /* 1270 * post_send() RDMA write completions of IO reqs (read/write) 1271 */ 1272 atomic_add(srv->queue_depth, &con->sq_wr_avail); 1273 1274 if (unlikely(!list_empty_careful(&con->rsp_wr_wait_list))) 1275 rtrs_rdma_process_wr_wait_list(con); 1276 1277 break; 1278 default: 1279 rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode); 1280 return; 1281 } 1282 } 1283 1284 /** 1285 * rtrs_srv_get_sess_name() - Get rtrs_srv peer hostname. 1286 * @srv: Session 1287 * @sessname: Sessname buffer 1288 * @len: Length of sessname buffer 1289 */ 1290 int rtrs_srv_get_sess_name(struct rtrs_srv *srv, char *sessname, size_t len) 1291 { 1292 struct rtrs_srv_sess *sess; 1293 int err = -ENOTCONN; 1294 1295 mutex_lock(&srv->paths_mutex); 1296 list_for_each_entry(sess, &srv->paths_list, s.entry) { 1297 if (sess->state != RTRS_SRV_CONNECTED) 1298 continue; 1299 strscpy(sessname, sess->s.sessname, 1300 min_t(size_t, sizeof(sess->s.sessname), len)); 1301 err = 0; 1302 break; 1303 } 1304 mutex_unlock(&srv->paths_mutex); 1305 1306 return err; 1307 } 1308 EXPORT_SYMBOL(rtrs_srv_get_sess_name); 1309 1310 /** 1311 * rtrs_srv_get_queue_depth() - Get rtrs_srv qdepth. 
1312 * @srv: Session 1313 */ 1314 int rtrs_srv_get_queue_depth(struct rtrs_srv *srv) 1315 { 1316 return srv->queue_depth; 1317 } 1318 EXPORT_SYMBOL(rtrs_srv_get_queue_depth); 1319 1320 static int find_next_bit_ring(struct rtrs_srv_sess *sess) 1321 { 1322 struct ib_device *ib_dev = sess->s.dev->ib_dev; 1323 int v; 1324 1325 v = cpumask_next(sess->cur_cq_vector, &cq_affinity_mask); 1326 if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors) 1327 v = cpumask_first(&cq_affinity_mask); 1328 return v; 1329 } 1330 1331 static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_sess *sess) 1332 { 1333 sess->cur_cq_vector = find_next_bit_ring(sess); 1334 1335 return sess->cur_cq_vector; 1336 } 1337 1338 static void rtrs_srv_dev_release(struct device *dev) 1339 { 1340 struct rtrs_srv *srv = container_of(dev, struct rtrs_srv, dev); 1341 1342 kfree(srv); 1343 } 1344 1345 static void free_srv(struct rtrs_srv *srv) 1346 { 1347 int i; 1348 1349 WARN_ON(refcount_read(&srv->refcount)); 1350 for (i = 0; i < srv->queue_depth; i++) 1351 mempool_free(srv->chunks[i], chunk_pool); 1352 kfree(srv->chunks); 1353 mutex_destroy(&srv->paths_mutex); 1354 mutex_destroy(&srv->paths_ev_mutex); 1355 /* last put to release the srv structure */ 1356 put_device(&srv->dev); 1357 } 1358 1359 static struct rtrs_srv *get_or_create_srv(struct rtrs_srv_ctx *ctx, 1360 const uuid_t *paths_uuid, 1361 bool first_conn) 1362 { 1363 struct rtrs_srv *srv; 1364 int i; 1365 1366 mutex_lock(&ctx->srv_mutex); 1367 list_for_each_entry(srv, &ctx->srv_list, ctx_list) { 1368 if (uuid_equal(&srv->paths_uuid, paths_uuid) && 1369 refcount_inc_not_zero(&srv->refcount)) { 1370 mutex_unlock(&ctx->srv_mutex); 1371 return srv; 1372 } 1373 } 1374 mutex_unlock(&ctx->srv_mutex); 1375 /* 1376 * If this request is not the first connection request from the 1377 * client for this session then fail and return error. 
1378 */ 1379 if (!first_conn) { 1380 pr_err_ratelimited("Error: Not the first connection request for this session\n"); 1381 return ERR_PTR(-ENXIO); 1382 } 1383 1384 /* need to allocate a new srv */ 1385 srv = kzalloc(sizeof(*srv), GFP_KERNEL); 1386 if (!srv) 1387 return ERR_PTR(-ENOMEM); 1388 1389 INIT_LIST_HEAD(&srv->paths_list); 1390 mutex_init(&srv->paths_mutex); 1391 mutex_init(&srv->paths_ev_mutex); 1392 uuid_copy(&srv->paths_uuid, paths_uuid); 1393 srv->queue_depth = sess_queue_depth; 1394 srv->ctx = ctx; 1395 device_initialize(&srv->dev); 1396 srv->dev.release = rtrs_srv_dev_release; 1397 1398 srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks), 1399 GFP_KERNEL); 1400 if (!srv->chunks) 1401 goto err_free_srv; 1402 1403 for (i = 0; i < srv->queue_depth; i++) { 1404 srv->chunks[i] = mempool_alloc(chunk_pool, GFP_KERNEL); 1405 if (!srv->chunks[i]) 1406 goto err_free_chunks; 1407 } 1408 refcount_set(&srv->refcount, 1); 1409 mutex_lock(&ctx->srv_mutex); 1410 list_add(&srv->ctx_list, &ctx->srv_list); 1411 mutex_unlock(&ctx->srv_mutex); 1412 1413 return srv; 1414 1415 err_free_chunks: 1416 while (i--) 1417 mempool_free(srv->chunks[i], chunk_pool); 1418 kfree(srv->chunks); 1419 1420 err_free_srv: 1421 kfree(srv); 1422 return ERR_PTR(-ENOMEM); 1423 } 1424 1425 static void put_srv(struct rtrs_srv *srv) 1426 { 1427 if (refcount_dec_and_test(&srv->refcount)) { 1428 struct rtrs_srv_ctx *ctx = srv->ctx; 1429 1430 WARN_ON(srv->dev.kobj.state_in_sysfs); 1431 1432 mutex_lock(&ctx->srv_mutex); 1433 list_del(&srv->ctx_list); 1434 mutex_unlock(&ctx->srv_mutex); 1435 free_srv(srv); 1436 } 1437 } 1438 1439 static void __add_path_to_srv(struct rtrs_srv *srv, 1440 struct rtrs_srv_sess *sess) 1441 { 1442 list_add_tail(&sess->s.entry, &srv->paths_list); 1443 srv->paths_num++; 1444 WARN_ON(srv->paths_num >= MAX_PATHS_NUM); 1445 } 1446 1447 static void del_path_from_srv(struct rtrs_srv_sess *sess) 1448 { 1449 struct rtrs_srv *srv = sess->srv; 1450 1451 if (WARN_ON(!srv)) 1452 return; 1453 1454 mutex_lock(&srv->paths_mutex); 1455 list_del(&sess->s.entry); 1456 WARN_ON(!srv->paths_num); 1457 srv->paths_num--; 1458 mutex_unlock(&srv->paths_mutex); 1459 } 1460 1461 /* return true if addresses are the same, error other wise */ 1462 static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b) 1463 { 1464 switch (a->sa_family) { 1465 case AF_IB: 1466 return memcmp(&((struct sockaddr_ib *)a)->sib_addr, 1467 &((struct sockaddr_ib *)b)->sib_addr, 1468 sizeof(struct ib_addr)) && 1469 (b->sa_family == AF_IB); 1470 case AF_INET: 1471 return memcmp(&((struct sockaddr_in *)a)->sin_addr, 1472 &((struct sockaddr_in *)b)->sin_addr, 1473 sizeof(struct in_addr)) && 1474 (b->sa_family == AF_INET); 1475 case AF_INET6: 1476 return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr, 1477 &((struct sockaddr_in6 *)b)->sin6_addr, 1478 sizeof(struct in6_addr)) && 1479 (b->sa_family == AF_INET6); 1480 default: 1481 return -ENOENT; 1482 } 1483 } 1484 1485 static bool __is_path_w_addr_exists(struct rtrs_srv *srv, 1486 struct rdma_addr *addr) 1487 { 1488 struct rtrs_srv_sess *sess; 1489 1490 list_for_each_entry(sess, &srv->paths_list, s.entry) 1491 if (!sockaddr_cmp((struct sockaddr *)&sess->s.dst_addr, 1492 (struct sockaddr *)&addr->dst_addr) && 1493 !sockaddr_cmp((struct sockaddr *)&sess->s.src_addr, 1494 (struct sockaddr *)&addr->src_addr)) 1495 return true; 1496 1497 return false; 1498 } 1499 1500 static void free_sess(struct rtrs_srv_sess *sess) 1501 { 1502 if (sess->kobj.state_in_sysfs) { 1503 
kobject_del(&sess->kobj); 1504 kobject_put(&sess->kobj); 1505 } else { 1506 kfree(sess->stats); 1507 kfree(sess); 1508 } 1509 } 1510 1511 static void rtrs_srv_close_work(struct work_struct *work) 1512 { 1513 struct rtrs_srv_sess *sess; 1514 struct rtrs_srv_con *con; 1515 int i; 1516 1517 sess = container_of(work, typeof(*sess), close_work); 1518 1519 rtrs_srv_destroy_sess_files(sess); 1520 rtrs_srv_stop_hb(sess); 1521 1522 for (i = 0; i < sess->s.con_num; i++) { 1523 if (!sess->s.con[i]) 1524 continue; 1525 con = to_srv_con(sess->s.con[i]); 1526 rdma_disconnect(con->c.cm_id); 1527 ib_drain_qp(con->c.qp); 1528 } 1529 1530 /* 1531 * Degrade ref count to the usual model with a single shared 1532 * atomic_t counter 1533 */ 1534 percpu_ref_kill(&sess->ids_inflight_ref); 1535 1536 /* Wait for all completion */ 1537 wait_for_completion(&sess->complete_done); 1538 1539 /* Notify upper layer if we are the last path */ 1540 rtrs_srv_sess_down(sess); 1541 1542 unmap_cont_bufs(sess); 1543 rtrs_srv_free_ops_ids(sess); 1544 1545 for (i = 0; i < sess->s.con_num; i++) { 1546 if (!sess->s.con[i]) 1547 continue; 1548 con = to_srv_con(sess->s.con[i]); 1549 rtrs_cq_qp_destroy(&con->c); 1550 rdma_destroy_id(con->c.cm_id); 1551 kfree(con); 1552 } 1553 rtrs_ib_dev_put(sess->s.dev); 1554 1555 del_path_from_srv(sess); 1556 put_srv(sess->srv); 1557 sess->srv = NULL; 1558 rtrs_srv_change_state(sess, RTRS_SRV_CLOSED); 1559 1560 kfree(sess->dma_addr); 1561 kfree(sess->s.con); 1562 free_sess(sess); 1563 } 1564 1565 static int rtrs_rdma_do_accept(struct rtrs_srv_sess *sess, 1566 struct rdma_cm_id *cm_id) 1567 { 1568 struct rtrs_srv *srv = sess->srv; 1569 struct rtrs_msg_conn_rsp msg; 1570 struct rdma_conn_param param; 1571 int err; 1572 1573 param = (struct rdma_conn_param) { 1574 .rnr_retry_count = 7, 1575 .private_data = &msg, 1576 .private_data_len = sizeof(msg), 1577 }; 1578 1579 msg = (struct rtrs_msg_conn_rsp) { 1580 .magic = cpu_to_le16(RTRS_MAGIC), 1581 .version = cpu_to_le16(RTRS_PROTO_VER), 1582 .queue_depth = cpu_to_le16(srv->queue_depth), 1583 .max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE), 1584 .max_hdr_size = cpu_to_le32(MAX_HDR_SIZE), 1585 }; 1586 1587 if (always_invalidate) 1588 msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F); 1589 1590 err = rdma_accept(cm_id, ¶m); 1591 if (err) 1592 pr_err("rdma_accept(), err: %d\n", err); 1593 1594 return err; 1595 } 1596 1597 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno) 1598 { 1599 struct rtrs_msg_conn_rsp msg; 1600 int err; 1601 1602 msg = (struct rtrs_msg_conn_rsp) { 1603 .magic = cpu_to_le16(RTRS_MAGIC), 1604 .version = cpu_to_le16(RTRS_PROTO_VER), 1605 .errno = cpu_to_le16(errno), 1606 }; 1607 1608 err = rdma_reject(cm_id, &msg, sizeof(msg), IB_CM_REJ_CONSUMER_DEFINED); 1609 if (err) 1610 pr_err("rdma_reject(), err: %d\n", err); 1611 1612 /* Bounce errno back */ 1613 return errno; 1614 } 1615 1616 static struct rtrs_srv_sess * 1617 __find_sess(struct rtrs_srv *srv, const uuid_t *sess_uuid) 1618 { 1619 struct rtrs_srv_sess *sess; 1620 1621 list_for_each_entry(sess, &srv->paths_list, s.entry) { 1622 if (uuid_equal(&sess->s.uuid, sess_uuid)) 1623 return sess; 1624 } 1625 1626 return NULL; 1627 } 1628 1629 static int create_con(struct rtrs_srv_sess *sess, 1630 struct rdma_cm_id *cm_id, 1631 unsigned int cid) 1632 { 1633 struct rtrs_srv *srv = sess->srv; 1634 struct rtrs_sess *s = &sess->s; 1635 struct rtrs_srv_con *con; 1636 1637 u32 cq_num, max_send_wr, max_recv_wr, wr_limit; 1638 int err, cq_vector; 1639 1640 con = 
kzalloc(sizeof(*con), GFP_KERNEL);
	if (!con) {
		err = -ENOMEM;
		goto err;
	}

	spin_lock_init(&con->rsp_wr_wait_lock);
	INIT_LIST_HEAD(&con->rsp_wr_wait_list);
	con->c.cm_id = cm_id;
	con->c.sess = &sess->s;
	con->c.cid = cid;
	atomic_set(&con->wr_cnt, 1);
	wr_limit = sess->s.dev->ib_dev->attrs.max_qp_wr;

	if (con->c.cid == 0) {
		/*
		 * All receive and all send (each requiring invalidate)
		 * + 2 for drain and heartbeat
		 */
		max_send_wr = min_t(int, wr_limit,
				    SERVICE_CON_QUEUE_DEPTH * 2 + 2);
		max_recv_wr = max_send_wr;
	} else {
		/* when always_invalidate is enabled we need linv+rinv+mr+imm */
		if (always_invalidate)
			max_send_wr =
				min_t(int, wr_limit,
				      srv->queue_depth * (1 + 4) + 1);
		else
			max_send_wr =
				min_t(int, wr_limit,
				      srv->queue_depth * (1 + 2) + 1);

		max_recv_wr = srv->queue_depth + 1;
		/*
		 * If all receive and all write requests are posted, each
		 * read request additionally requires an invalidate request,
		 * plus one extra WR for drain; otherwise the QP gets into
		 * the error state.
		 */
	}
	cq_num = max_send_wr + max_recv_wr;
	atomic_set(&con->sq_wr_avail, max_send_wr);
	cq_vector = rtrs_srv_get_next_cq_vector(sess);

	/* TODO: SOFTIRQ can be faster, but be careful with softirq context */
	err = rtrs_cq_qp_create(&sess->s, &con->c, 1, cq_vector, cq_num,
				max_send_wr, max_recv_wr,
				IB_POLL_WORKQUEUE);
	if (err) {
		rtrs_err(s, "rtrs_cq_qp_create(), err: %d\n", err);
		goto free_con;
	}
	if (con->c.cid == 0) {
		err = post_recv_info_req(con);
		if (err)
			goto free_cqqp;
	}
	WARN_ON(sess->s.con[cid]);
	sess->s.con[cid] = &con->c;

	/*
	 * Change context from server to current connection.  The other
	 * way is to use cm_id->qp->qp_context, which does not work on OFED.
1704 */ 1705 cm_id->context = &con->c; 1706 1707 return 0; 1708 1709 free_cqqp: 1710 rtrs_cq_qp_destroy(&con->c); 1711 free_con: 1712 kfree(con); 1713 1714 err: 1715 return err; 1716 } 1717 1718 static struct rtrs_srv_sess *__alloc_sess(struct rtrs_srv *srv, 1719 struct rdma_cm_id *cm_id, 1720 unsigned int con_num, 1721 unsigned int recon_cnt, 1722 const uuid_t *uuid) 1723 { 1724 struct rtrs_srv_sess *sess; 1725 int err = -ENOMEM; 1726 char str[NAME_MAX]; 1727 struct rtrs_addr path; 1728 1729 if (srv->paths_num >= MAX_PATHS_NUM) { 1730 err = -ECONNRESET; 1731 goto err; 1732 } 1733 if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) { 1734 err = -EEXIST; 1735 pr_err("Path with same addr exists\n"); 1736 goto err; 1737 } 1738 sess = kzalloc(sizeof(*sess), GFP_KERNEL); 1739 if (!sess) 1740 goto err; 1741 1742 sess->stats = kzalloc(sizeof(*sess->stats), GFP_KERNEL); 1743 if (!sess->stats) 1744 goto err_free_sess; 1745 1746 sess->stats->sess = sess; 1747 1748 sess->dma_addr = kcalloc(srv->queue_depth, sizeof(*sess->dma_addr), 1749 GFP_KERNEL); 1750 if (!sess->dma_addr) 1751 goto err_free_stats; 1752 1753 sess->s.con = kcalloc(con_num, sizeof(*sess->s.con), GFP_KERNEL); 1754 if (!sess->s.con) 1755 goto err_free_dma_addr; 1756 1757 sess->state = RTRS_SRV_CONNECTING; 1758 sess->srv = srv; 1759 sess->cur_cq_vector = -1; 1760 sess->s.dst_addr = cm_id->route.addr.dst_addr; 1761 sess->s.src_addr = cm_id->route.addr.src_addr; 1762 1763 /* temporary until receiving session-name from client */ 1764 path.src = &sess->s.src_addr; 1765 path.dst = &sess->s.dst_addr; 1766 rtrs_addr_to_str(&path, str, sizeof(str)); 1767 strscpy(sess->s.sessname, str, sizeof(sess->s.sessname)); 1768 1769 sess->s.con_num = con_num; 1770 sess->s.recon_cnt = recon_cnt; 1771 uuid_copy(&sess->s.uuid, uuid); 1772 spin_lock_init(&sess->state_lock); 1773 INIT_WORK(&sess->close_work, rtrs_srv_close_work); 1774 rtrs_srv_init_hb(sess); 1775 1776 sess->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd); 1777 if (!sess->s.dev) { 1778 err = -ENOMEM; 1779 goto err_free_con; 1780 } 1781 err = map_cont_bufs(sess); 1782 if (err) 1783 goto err_put_dev; 1784 1785 err = rtrs_srv_alloc_ops_ids(sess); 1786 if (err) 1787 goto err_unmap_bufs; 1788 1789 __add_path_to_srv(srv, sess); 1790 1791 return sess; 1792 1793 err_unmap_bufs: 1794 unmap_cont_bufs(sess); 1795 err_put_dev: 1796 rtrs_ib_dev_put(sess->s.dev); 1797 err_free_con: 1798 kfree(sess->s.con); 1799 err_free_dma_addr: 1800 kfree(sess->dma_addr); 1801 err_free_stats: 1802 kfree(sess->stats); 1803 err_free_sess: 1804 kfree(sess); 1805 err: 1806 return ERR_PTR(err); 1807 } 1808 1809 static int rtrs_rdma_connect(struct rdma_cm_id *cm_id, 1810 const struct rtrs_msg_conn_req *msg, 1811 size_t len) 1812 { 1813 struct rtrs_srv_ctx *ctx = cm_id->context; 1814 struct rtrs_srv_sess *sess; 1815 struct rtrs_srv *srv; 1816 1817 u16 version, con_num, cid; 1818 u16 recon_cnt; 1819 int err = -ECONNRESET; 1820 1821 if (len < sizeof(*msg)) { 1822 pr_err("Invalid RTRS connection request\n"); 1823 goto reject_w_err; 1824 } 1825 if (le16_to_cpu(msg->magic) != RTRS_MAGIC) { 1826 pr_err("Invalid RTRS magic\n"); 1827 goto reject_w_err; 1828 } 1829 version = le16_to_cpu(msg->version); 1830 if (version >> 8 != RTRS_PROTO_VER_MAJOR) { 1831 pr_err("Unsupported major RTRS version: %d, expected %d\n", 1832 version >> 8, RTRS_PROTO_VER_MAJOR); 1833 goto reject_w_err; 1834 } 1835 con_num = le16_to_cpu(msg->cid_num); 1836 if (con_num > 4096) { 1837 /* Sanity check */ 1838 pr_err("Too many connections requested: %d\n", 
con_num); 1839 goto reject_w_err; 1840 } 1841 cid = le16_to_cpu(msg->cid); 1842 if (cid >= con_num) { 1843 /* Sanity check */ 1844 pr_err("Incorrect cid: %d >= %d\n", cid, con_num); 1845 goto reject_w_err; 1846 } 1847 recon_cnt = le16_to_cpu(msg->recon_cnt); 1848 srv = get_or_create_srv(ctx, &msg->paths_uuid, msg->first_conn); 1849 if (IS_ERR(srv)) { 1850 err = PTR_ERR(srv); 1851 pr_err("get_or_create_srv(), error %d\n", err); 1852 goto reject_w_err; 1853 } 1854 mutex_lock(&srv->paths_mutex); 1855 sess = __find_sess(srv, &msg->sess_uuid); 1856 if (sess) { 1857 struct rtrs_sess *s = &sess->s; 1858 1859 /* Session already holds a reference */ 1860 put_srv(srv); 1861 1862 if (sess->state != RTRS_SRV_CONNECTING) { 1863 rtrs_err(s, "Session in wrong state: %s\n", 1864 rtrs_srv_state_str(sess->state)); 1865 mutex_unlock(&srv->paths_mutex); 1866 goto reject_w_err; 1867 } 1868 /* 1869 * Sanity checks 1870 */ 1871 if (con_num != s->con_num || cid >= s->con_num) { 1872 rtrs_err(s, "Incorrect request: %d, %d\n", 1873 cid, con_num); 1874 mutex_unlock(&srv->paths_mutex); 1875 goto reject_w_err; 1876 } 1877 if (s->con[cid]) { 1878 rtrs_err(s, "Connection already exists: %d\n", 1879 cid); 1880 mutex_unlock(&srv->paths_mutex); 1881 goto reject_w_err; 1882 } 1883 } else { 1884 sess = __alloc_sess(srv, cm_id, con_num, recon_cnt, 1885 &msg->sess_uuid); 1886 if (IS_ERR(sess)) { 1887 mutex_unlock(&srv->paths_mutex); 1888 put_srv(srv); 1889 err = PTR_ERR(sess); 1890 pr_err("RTRS server session allocation failed: %d\n", err); 1891 goto reject_w_err; 1892 } 1893 } 1894 err = create_con(sess, cm_id, cid); 1895 if (err) { 1896 rtrs_err((&sess->s), "create_con(), error %d\n", err); 1897 (void)rtrs_rdma_do_reject(cm_id, err); 1898 /* 1899 * Since session has other connections we follow normal way 1900 * through workqueue, but still return an error to tell cma.c 1901 * to call rdma_destroy_id() for current connection. 1902 */ 1903 goto close_and_return_err; 1904 } 1905 err = rtrs_rdma_do_accept(sess, cm_id); 1906 if (err) { 1907 rtrs_err((&sess->s), "rtrs_rdma_do_accept(), error %d\n", err); 1908 (void)rtrs_rdma_do_reject(cm_id, err); 1909 /* 1910 * Since current connection was successfully added to the 1911 * session we follow normal way through workqueue to close the 1912 * session, thus return 0 to tell cma.c we call 1913 * rdma_destroy_id() ourselves. 
1914 */ 1915 err = 0; 1916 goto close_and_return_err; 1917 } 1918 mutex_unlock(&srv->paths_mutex); 1919 1920 return 0; 1921 1922 reject_w_err: 1923 return rtrs_rdma_do_reject(cm_id, err); 1924 1925 close_and_return_err: 1926 mutex_unlock(&srv->paths_mutex); 1927 close_sess(sess); 1928 1929 return err; 1930 } 1931 1932 static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id, 1933 struct rdma_cm_event *ev) 1934 { 1935 struct rtrs_srv_sess *sess = NULL; 1936 struct rtrs_sess *s = NULL; 1937 1938 if (ev->event != RDMA_CM_EVENT_CONNECT_REQUEST) { 1939 struct rtrs_con *c = cm_id->context; 1940 1941 s = c->sess; 1942 sess = to_srv_sess(s); 1943 } 1944 1945 switch (ev->event) { 1946 case RDMA_CM_EVENT_CONNECT_REQUEST: 1947 /* 1948 * In case of error cma.c will destroy cm_id, 1949 * see cma_process_remove() 1950 */ 1951 return rtrs_rdma_connect(cm_id, ev->param.conn.private_data, 1952 ev->param.conn.private_data_len); 1953 case RDMA_CM_EVENT_ESTABLISHED: 1954 /* Nothing here */ 1955 break; 1956 case RDMA_CM_EVENT_REJECTED: 1957 case RDMA_CM_EVENT_CONNECT_ERROR: 1958 case RDMA_CM_EVENT_UNREACHABLE: 1959 rtrs_err(s, "CM error (CM event: %s, err: %d)\n", 1960 rdma_event_msg(ev->event), ev->status); 1961 fallthrough; 1962 case RDMA_CM_EVENT_DISCONNECTED: 1963 case RDMA_CM_EVENT_ADDR_CHANGE: 1964 case RDMA_CM_EVENT_TIMEWAIT_EXIT: 1965 case RDMA_CM_EVENT_DEVICE_REMOVAL: 1966 close_sess(sess); 1967 break; 1968 default: 1969 pr_err("Ignoring unexpected CM event %s, err %d\n", 1970 rdma_event_msg(ev->event), ev->status); 1971 break; 1972 } 1973 1974 return 0; 1975 } 1976 1977 static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx, 1978 struct sockaddr *addr, 1979 enum rdma_ucm_port_space ps) 1980 { 1981 struct rdma_cm_id *cm_id; 1982 int ret; 1983 1984 cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler, 1985 ctx, ps, IB_QPT_RC); 1986 if (IS_ERR(cm_id)) { 1987 ret = PTR_ERR(cm_id); 1988 pr_err("Creating id for RDMA connection failed, err: %d\n", 1989 ret); 1990 goto err_out; 1991 } 1992 ret = rdma_bind_addr(cm_id, addr); 1993 if (ret) { 1994 pr_err("Binding RDMA address failed, err: %d\n", ret); 1995 goto err_cm; 1996 } 1997 ret = rdma_listen(cm_id, 64); 1998 if (ret) { 1999 pr_err("Listening on RDMA connection failed, err: %d\n", 2000 ret); 2001 goto err_cm; 2002 } 2003 2004 return cm_id; 2005 2006 err_cm: 2007 rdma_destroy_id(cm_id); 2008 err_out: 2009 2010 return ERR_PTR(ret); 2011 } 2012 2013 static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, u16 port) 2014 { 2015 struct sockaddr_in6 sin = { 2016 .sin6_family = AF_INET6, 2017 .sin6_addr = IN6ADDR_ANY_INIT, 2018 .sin6_port = htons(port), 2019 }; 2020 struct sockaddr_ib sib = { 2021 .sib_family = AF_IB, 2022 .sib_sid = cpu_to_be64(RDMA_IB_IP_PS_IB | port), 2023 .sib_sid_mask = cpu_to_be64(0xffffffffffffffffULL), 2024 .sib_pkey = cpu_to_be16(0xffff), 2025 }; 2026 struct rdma_cm_id *cm_ip, *cm_ib; 2027 int ret; 2028 2029 /* 2030 * We accept both IPoIB and IB connections, so we need to keep 2031 * two cm id's, one for each socket type and port space. 2032 * If the cm initialization of one of the id's fails, we abort 2033 * everything. 
2034 */ 2035 cm_ip = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sin, RDMA_PS_TCP); 2036 if (IS_ERR(cm_ip)) 2037 return PTR_ERR(cm_ip); 2038 2039 cm_ib = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sib, RDMA_PS_IB); 2040 if (IS_ERR(cm_ib)) { 2041 ret = PTR_ERR(cm_ib); 2042 goto free_cm_ip; 2043 } 2044 2045 ctx->cm_id_ip = cm_ip; 2046 ctx->cm_id_ib = cm_ib; 2047 2048 return 0; 2049 2050 free_cm_ip: 2051 rdma_destroy_id(cm_ip); 2052 2053 return ret; 2054 } 2055 2056 static struct rtrs_srv_ctx *alloc_srv_ctx(struct rtrs_srv_ops *ops) 2057 { 2058 struct rtrs_srv_ctx *ctx; 2059 2060 ctx = kzalloc(sizeof(*ctx), GFP_KERNEL); 2061 if (!ctx) 2062 return NULL; 2063 2064 ctx->ops = *ops; 2065 mutex_init(&ctx->srv_mutex); 2066 INIT_LIST_HEAD(&ctx->srv_list); 2067 2068 return ctx; 2069 } 2070 2071 static void free_srv_ctx(struct rtrs_srv_ctx *ctx) 2072 { 2073 WARN_ON(!list_empty(&ctx->srv_list)); 2074 mutex_destroy(&ctx->srv_mutex); 2075 kfree(ctx); 2076 } 2077 2078 static int rtrs_srv_add_one(struct ib_device *device) 2079 { 2080 struct rtrs_srv_ctx *ctx; 2081 int ret = 0; 2082 2083 mutex_lock(&ib_ctx.ib_dev_mutex); 2084 if (ib_ctx.ib_dev_count) 2085 goto out; 2086 2087 /* 2088 * Since our CM IDs are NOT bound to any ib device we will create them 2089 * only once 2090 */ 2091 ctx = ib_ctx.srv_ctx; 2092 ret = rtrs_srv_rdma_init(ctx, ib_ctx.port); 2093 if (ret) { 2094 /* 2095 * We errored out here. 2096 * According to the ib code, if we encounter an error here then the 2097 * error code is ignored, and no more calls to our ops are made. 2098 */ 2099 pr_err("Failed to initialize RDMA connection"); 2100 goto err_out; 2101 } 2102 2103 out: 2104 /* 2105 * Keep a track on the number of ib devices added 2106 */ 2107 ib_ctx.ib_dev_count++; 2108 2109 err_out: 2110 mutex_unlock(&ib_ctx.ib_dev_mutex); 2111 return ret; 2112 } 2113 2114 static void rtrs_srv_remove_one(struct ib_device *device, void *client_data) 2115 { 2116 struct rtrs_srv_ctx *ctx; 2117 2118 mutex_lock(&ib_ctx.ib_dev_mutex); 2119 ib_ctx.ib_dev_count--; 2120 2121 if (ib_ctx.ib_dev_count) 2122 goto out; 2123 2124 /* 2125 * Since our CM IDs are NOT bound to any ib device we will remove them 2126 * only once, when the last device is removed 2127 */ 2128 ctx = ib_ctx.srv_ctx; 2129 rdma_destroy_id(ctx->cm_id_ip); 2130 rdma_destroy_id(ctx->cm_id_ib); 2131 2132 out: 2133 mutex_unlock(&ib_ctx.ib_dev_mutex); 2134 } 2135 2136 static struct ib_client rtrs_srv_client = { 2137 .name = "rtrs_server", 2138 .add = rtrs_srv_add_one, 2139 .remove = rtrs_srv_remove_one 2140 }; 2141 2142 /** 2143 * rtrs_srv_open() - open RTRS server context 2144 * @ops: callback functions 2145 * @port: port to listen on 2146 * 2147 * Creates server context with specified callbacks. 2148 * 2149 * Return a valid pointer on success otherwise PTR_ERR. 
2150 */ 2151 struct rtrs_srv_ctx *rtrs_srv_open(struct rtrs_srv_ops *ops, u16 port) 2152 { 2153 struct rtrs_srv_ctx *ctx; 2154 int err; 2155 2156 ctx = alloc_srv_ctx(ops); 2157 if (!ctx) 2158 return ERR_PTR(-ENOMEM); 2159 2160 mutex_init(&ib_ctx.ib_dev_mutex); 2161 ib_ctx.srv_ctx = ctx; 2162 ib_ctx.port = port; 2163 2164 err = ib_register_client(&rtrs_srv_client); 2165 if (err) { 2166 free_srv_ctx(ctx); 2167 return ERR_PTR(err); 2168 } 2169 2170 return ctx; 2171 } 2172 EXPORT_SYMBOL(rtrs_srv_open); 2173 2174 static void close_sessions(struct rtrs_srv *srv) 2175 { 2176 struct rtrs_srv_sess *sess; 2177 2178 mutex_lock(&srv->paths_mutex); 2179 list_for_each_entry(sess, &srv->paths_list, s.entry) 2180 close_sess(sess); 2181 mutex_unlock(&srv->paths_mutex); 2182 } 2183 2184 static void close_ctx(struct rtrs_srv_ctx *ctx) 2185 { 2186 struct rtrs_srv *srv; 2187 2188 mutex_lock(&ctx->srv_mutex); 2189 list_for_each_entry(srv, &ctx->srv_list, ctx_list) 2190 close_sessions(srv); 2191 mutex_unlock(&ctx->srv_mutex); 2192 flush_workqueue(rtrs_wq); 2193 } 2194 2195 /** 2196 * rtrs_srv_close() - close RTRS server context 2197 * @ctx: pointer to server context 2198 * 2199 * Closes RTRS server context with all client sessions. 2200 */ 2201 void rtrs_srv_close(struct rtrs_srv_ctx *ctx) 2202 { 2203 ib_unregister_client(&rtrs_srv_client); 2204 mutex_destroy(&ib_ctx.ib_dev_mutex); 2205 close_ctx(ctx); 2206 free_srv_ctx(ctx); 2207 } 2208 EXPORT_SYMBOL(rtrs_srv_close); 2209 2210 static int check_module_params(void) 2211 { 2212 if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) { 2213 pr_err("Invalid sess_queue_depth value %d, has to be >= %d, <= %d.\n", 2214 sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH); 2215 return -EINVAL; 2216 } 2217 if (max_chunk_size < MIN_CHUNK_SIZE || !is_power_of_2(max_chunk_size)) { 2218 pr_err("Invalid max_chunk_size value %d, has to be >= %d and should be power of two.\n", 2219 max_chunk_size, MIN_CHUNK_SIZE); 2220 return -EINVAL; 2221 } 2222 2223 /* 2224 * Check if IB immediate data size is enough to hold the mem_id and the 2225 * offset inside the memory chunk 2226 */ 2227 if ((ilog2(sess_queue_depth - 1) + 1) + 2228 (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) { 2229 pr_err("RDMA immediate size (%db) not enough to encode %d buffers of size %dB. 
Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n",
		       MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size);
		return -EINVAL;
	}

	return 0;
}

static int __init rtrs_server_init(void)
{
	int err;

	pr_info("Loading module %s, proto %s: (max_chunk_size: %d (pure IO %ld, headers %ld) , sess_queue_depth: %d, always_invalidate: %d)\n",
		KBUILD_MODNAME, RTRS_PROTO_VER_STRING,
		max_chunk_size, max_chunk_size - MAX_HDR_SIZE, MAX_HDR_SIZE,
		sess_queue_depth, always_invalidate);

	rtrs_rdma_dev_pd_init(0, &dev_pd);

	err = check_module_params();
	if (err) {
		pr_err("Failed to load module, invalid module parameters, err: %d\n",
		       err);
		return err;
	}
	chunk_pool = mempool_create_page_pool(sess_queue_depth * CHUNK_POOL_SZ,
					      get_order(max_chunk_size));
	if (!chunk_pool)
		return -ENOMEM;
	rtrs_dev_class = class_create(THIS_MODULE, "rtrs-server");
	if (IS_ERR(rtrs_dev_class)) {
		err = PTR_ERR(rtrs_dev_class);
		goto out_chunk_pool;
	}
	rtrs_wq = alloc_workqueue("rtrs_server_wq", 0, 0);
	if (!rtrs_wq) {
		err = -ENOMEM;
		goto out_dev_class;
	}

	return 0;

out_dev_class:
	class_destroy(rtrs_dev_class);
out_chunk_pool:
	mempool_destroy(chunk_pool);

	return err;
}

static void __exit rtrs_server_exit(void)
{
	destroy_workqueue(rtrs_wq);
	class_destroy(rtrs_dev_class);
	mempool_destroy(chunk_pool);
	rtrs_rdma_dev_pd_deinit(&dev_pd);
}

module_init(rtrs_server_init);
module_exit(rtrs_server_exit);
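
/*
 * Usage sketch (added for illustration, not part of the driver): a ULP
 * opens a single server context and tears it down on module exit.  The
 * callback prototypes below are inferred from the ctx->ops.rdma_ev() and
 * ctx->ops.link_ev() call sites above; struct member layout is
 * authoritative only in rtrs.h, and the 'my_*' identifiers are
 * hypothetical.
 *
 *	static int my_rdma_ev(void *priv, struct rtrs_srv_op *id, int dir,
 *			      void *data, size_t datalen, const void *usr,
 *			      size_t usrlen)
 *	{
 *		// handle the READ/WRITE, then later answer with
 *		// rtrs_srv_resp_rdma(id, err);
 *		return 0;
 *	}
 *
 *	static int my_link_ev(struct rtrs_srv *srv, enum rtrs_srv_link_ev ev,
 *			      void *priv)
 *	{
 *		return 0;	// accept the new session
 *	}
 *
 *	static struct rtrs_srv_ops my_ops = {
 *		.rdma_ev = my_rdma_ev,
 *		.link_ev = my_link_ev,
 *	};
 *
 *	ctx = rtrs_srv_open(&my_ops, port);
 *	if (IS_ERR(ctx))
 *		return PTR_ERR(ctx);
 *	...
 *	rtrs_srv_close(ctx);
 */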