// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * RDMA Transport Layer
 *
 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
 */

#undef pr_fmt
#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt

#include <linux/module.h>
#include <linux/mempool.h>

#include "rtrs-srv.h"
#include "rtrs-log.h"
#include <rdma/ib_cm.h>
#include <rdma/ib_verbs.h>

MODULE_DESCRIPTION("RDMA Transport Server");
MODULE_LICENSE("GPL");

/* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
#define DEFAULT_SESS_QUEUE_DEPTH 512
#define MAX_HDR_SIZE PAGE_SIZE

/* We guarantee to serve at least 10 paths */
#define CHUNK_POOL_SZ 10

static struct rtrs_rdma_dev_pd dev_pd;
static mempool_t *chunk_pool;
struct class *rtrs_dev_class;
static struct rtrs_srv_ib_ctx ib_ctx;

static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;

static bool always_invalidate = true;
module_param(always_invalidate, bool, 0444);
MODULE_PARM_DESC(always_invalidate,
		 "Invalidate memory registration for contiguous memory regions before accessing.");

module_param_named(max_chunk_size, max_chunk_size, int, 0444);
MODULE_PARM_DESC(max_chunk_size,
		 "Max size for each IO request, given in bytes (default: "
		 __stringify(DEFAULT_MAX_CHUNK_SIZE) " bytes, i.e. 128KB)");

module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
MODULE_PARM_DESC(sess_queue_depth,
		 "Number of buffers for pending I/O requests to allocate per session. 
Maximum: " 53 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: " 54 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")"); 55 56 static cpumask_t cq_affinity_mask = { CPU_BITS_ALL }; 57 58 static struct workqueue_struct *rtrs_wq; 59 60 static inline struct rtrs_srv_con *to_srv_con(struct rtrs_con *c) 61 { 62 return container_of(c, struct rtrs_srv_con, c); 63 } 64 65 static inline struct rtrs_srv_sess *to_srv_sess(struct rtrs_sess *s) 66 { 67 return container_of(s, struct rtrs_srv_sess, s); 68 } 69 70 static bool rtrs_srv_change_state(struct rtrs_srv_sess *sess, 71 enum rtrs_srv_state new_state) 72 { 73 enum rtrs_srv_state old_state; 74 bool changed = false; 75 76 spin_lock_irq(&sess->state_lock); 77 old_state = sess->state; 78 switch (new_state) { 79 case RTRS_SRV_CONNECTED: 80 if (old_state == RTRS_SRV_CONNECTING) 81 changed = true; 82 break; 83 case RTRS_SRV_CLOSING: 84 if (old_state == RTRS_SRV_CONNECTING || 85 old_state == RTRS_SRV_CONNECTED) 86 changed = true; 87 break; 88 case RTRS_SRV_CLOSED: 89 if (old_state == RTRS_SRV_CLOSING) 90 changed = true; 91 break; 92 default: 93 break; 94 } 95 if (changed) 96 sess->state = new_state; 97 spin_unlock_irq(&sess->state_lock); 98 99 return changed; 100 } 101 102 static void free_id(struct rtrs_srv_op *id) 103 { 104 if (!id) 105 return; 106 kfree(id); 107 } 108 109 static void rtrs_srv_free_ops_ids(struct rtrs_srv_sess *sess) 110 { 111 struct rtrs_srv *srv = sess->srv; 112 int i; 113 114 if (sess->ops_ids) { 115 for (i = 0; i < srv->queue_depth; i++) 116 free_id(sess->ops_ids[i]); 117 kfree(sess->ops_ids); 118 sess->ops_ids = NULL; 119 } 120 } 121 122 static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc); 123 124 static struct ib_cqe io_comp_cqe = { 125 .done = rtrs_srv_rdma_done 126 }; 127 128 static inline void rtrs_srv_inflight_ref_release(struct percpu_ref *ref) 129 { 130 struct rtrs_srv_sess *sess = container_of(ref, struct rtrs_srv_sess, ids_inflight_ref); 131 132 percpu_ref_exit(&sess->ids_inflight_ref); 133 complete(&sess->complete_done); 134 } 135 136 static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_sess *sess) 137 { 138 struct rtrs_srv *srv = sess->srv; 139 struct rtrs_srv_op *id; 140 int i, ret; 141 142 sess->ops_ids = kcalloc(srv->queue_depth, sizeof(*sess->ops_ids), 143 GFP_KERNEL); 144 if (!sess->ops_ids) 145 goto err; 146 147 for (i = 0; i < srv->queue_depth; ++i) { 148 id = kzalloc(sizeof(*id), GFP_KERNEL); 149 if (!id) 150 goto err; 151 152 sess->ops_ids[i] = id; 153 } 154 155 ret = percpu_ref_init(&sess->ids_inflight_ref, 156 rtrs_srv_inflight_ref_release, 0, GFP_KERNEL); 157 if (ret) { 158 pr_err("Percpu reference init failed\n"); 159 goto err; 160 } 161 init_completion(&sess->complete_done); 162 163 return 0; 164 165 err: 166 rtrs_srv_free_ops_ids(sess); 167 return -ENOMEM; 168 } 169 170 static inline void rtrs_srv_get_ops_ids(struct rtrs_srv_sess *sess) 171 { 172 percpu_ref_get(&sess->ids_inflight_ref); 173 } 174 175 static inline void rtrs_srv_put_ops_ids(struct rtrs_srv_sess *sess) 176 { 177 percpu_ref_put(&sess->ids_inflight_ref); 178 } 179 180 static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc) 181 { 182 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 183 struct rtrs_sess *s = con->c.sess; 184 struct rtrs_srv_sess *sess = to_srv_sess(s); 185 186 if (wc->status != IB_WC_SUCCESS) { 187 rtrs_err(s, "REG MR failed: %s\n", 188 ib_wc_status_msg(wc->status)); 189 close_sess(sess); 190 return; 191 } 192 } 193 194 static struct ib_cqe local_reg_cqe = { 195 .done = rtrs_srv_reg_mr_done 196 }; 197 198 
static int rdma_write_sg(struct rtrs_srv_op *id) 199 { 200 struct rtrs_sess *s = id->con->c.sess; 201 struct rtrs_srv_sess *sess = to_srv_sess(s); 202 dma_addr_t dma_addr = sess->dma_addr[id->msg_id]; 203 struct rtrs_srv_mr *srv_mr; 204 struct ib_send_wr inv_wr; 205 struct ib_rdma_wr imm_wr; 206 struct ib_rdma_wr *wr = NULL; 207 enum ib_send_flags flags; 208 size_t sg_cnt; 209 int err, offset; 210 bool need_inval; 211 u32 rkey = 0; 212 struct ib_reg_wr rwr; 213 struct ib_sge *plist; 214 struct ib_sge list; 215 216 sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt); 217 need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F; 218 if (sg_cnt != 1) 219 return -EINVAL; 220 221 offset = 0; 222 223 wr = &id->tx_wr; 224 plist = &id->tx_sg; 225 plist->addr = dma_addr + offset; 226 plist->length = le32_to_cpu(id->rd_msg->desc[0].len); 227 228 /* WR will fail with length error 229 * if this is 0 230 */ 231 if (plist->length == 0) { 232 rtrs_err(s, "Invalid RDMA-Write sg list length 0\n"); 233 return -EINVAL; 234 } 235 236 plist->lkey = sess->s.dev->ib_pd->local_dma_lkey; 237 offset += plist->length; 238 239 wr->wr.sg_list = plist; 240 wr->wr.num_sge = 1; 241 wr->remote_addr = le64_to_cpu(id->rd_msg->desc[0].addr); 242 wr->rkey = le32_to_cpu(id->rd_msg->desc[0].key); 243 if (rkey == 0) 244 rkey = wr->rkey; 245 else 246 /* Only one key is actually used */ 247 WARN_ON_ONCE(rkey != wr->rkey); 248 249 wr->wr.opcode = IB_WR_RDMA_WRITE; 250 wr->wr.wr_cqe = &io_comp_cqe; 251 wr->wr.ex.imm_data = 0; 252 wr->wr.send_flags = 0; 253 254 if (need_inval && always_invalidate) { 255 wr->wr.next = &rwr.wr; 256 rwr.wr.next = &inv_wr; 257 inv_wr.next = &imm_wr.wr; 258 } else if (always_invalidate) { 259 wr->wr.next = &rwr.wr; 260 rwr.wr.next = &imm_wr.wr; 261 } else if (need_inval) { 262 wr->wr.next = &inv_wr; 263 inv_wr.next = &imm_wr.wr; 264 } else { 265 wr->wr.next = &imm_wr.wr; 266 } 267 /* 268 * From time to time we have to post signaled sends, 269 * or send queue will fill up and only QP reset can help. 270 */ 271 flags = (atomic_inc_return(&id->con->c.wr_cnt) % s->signal_interval) ? 
272 0 : IB_SEND_SIGNALED; 273 274 if (need_inval) { 275 inv_wr.sg_list = NULL; 276 inv_wr.num_sge = 0; 277 inv_wr.opcode = IB_WR_SEND_WITH_INV; 278 inv_wr.wr_cqe = &io_comp_cqe; 279 inv_wr.send_flags = 0; 280 inv_wr.ex.invalidate_rkey = rkey; 281 } 282 283 imm_wr.wr.next = NULL; 284 if (always_invalidate) { 285 struct rtrs_msg_rkey_rsp *msg; 286 287 srv_mr = &sess->mrs[id->msg_id]; 288 rwr.wr.opcode = IB_WR_REG_MR; 289 rwr.wr.wr_cqe = &local_reg_cqe; 290 rwr.wr.num_sge = 0; 291 rwr.mr = srv_mr->mr; 292 rwr.wr.send_flags = 0; 293 rwr.key = srv_mr->mr->rkey; 294 rwr.access = (IB_ACCESS_LOCAL_WRITE | 295 IB_ACCESS_REMOTE_WRITE); 296 msg = srv_mr->iu->buf; 297 msg->buf_id = cpu_to_le16(id->msg_id); 298 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP); 299 msg->rkey = cpu_to_le32(srv_mr->mr->rkey); 300 301 list.addr = srv_mr->iu->dma_addr; 302 list.length = sizeof(*msg); 303 list.lkey = sess->s.dev->ib_pd->local_dma_lkey; 304 imm_wr.wr.sg_list = &list; 305 imm_wr.wr.num_sge = 1; 306 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM; 307 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, 308 srv_mr->iu->dma_addr, 309 srv_mr->iu->size, DMA_TO_DEVICE); 310 } else { 311 imm_wr.wr.sg_list = NULL; 312 imm_wr.wr.num_sge = 0; 313 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM; 314 } 315 imm_wr.wr.send_flags = flags; 316 imm_wr.wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id, 317 0, need_inval)); 318 319 imm_wr.wr.wr_cqe = &io_comp_cqe; 320 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, dma_addr, 321 offset, DMA_BIDIRECTIONAL); 322 323 err = ib_post_send(id->con->c.qp, &id->tx_wr.wr, NULL); 324 if (err) 325 rtrs_err(s, 326 "Posting RDMA-Write-Request to QP failed, err: %d\n", 327 err); 328 329 return err; 330 } 331 332 /** 333 * send_io_resp_imm() - respond to client with empty IMM on failed READ/WRITE 334 * requests or on successful WRITE request. 335 * @con: the connection to send back result 336 * @id: the id associated with the IO 337 * @errno: the error number of the IO. 338 * 339 * Return 0 on success, errno otherwise. 340 */ 341 static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id, 342 int errno) 343 { 344 struct rtrs_sess *s = con->c.sess; 345 struct rtrs_srv_sess *sess = to_srv_sess(s); 346 struct ib_send_wr inv_wr, *wr = NULL; 347 struct ib_rdma_wr imm_wr; 348 struct ib_reg_wr rwr; 349 struct rtrs_srv_mr *srv_mr; 350 bool need_inval = false; 351 enum ib_send_flags flags; 352 u32 imm; 353 int err; 354 355 if (id->dir == READ) { 356 struct rtrs_msg_rdma_read *rd_msg = id->rd_msg; 357 size_t sg_cnt; 358 359 need_inval = le16_to_cpu(rd_msg->flags) & 360 RTRS_MSG_NEED_INVAL_F; 361 sg_cnt = le16_to_cpu(rd_msg->sg_cnt); 362 363 if (need_inval) { 364 if (sg_cnt) { 365 inv_wr.wr_cqe = &io_comp_cqe; 366 inv_wr.sg_list = NULL; 367 inv_wr.num_sge = 0; 368 inv_wr.opcode = IB_WR_SEND_WITH_INV; 369 inv_wr.send_flags = 0; 370 /* Only one key is actually used */ 371 inv_wr.ex.invalidate_rkey = 372 le32_to_cpu(rd_msg->desc[0].key); 373 } else { 374 WARN_ON_ONCE(1); 375 need_inval = false; 376 } 377 } 378 } 379 380 if (need_inval && always_invalidate) { 381 wr = &inv_wr; 382 inv_wr.next = &rwr.wr; 383 rwr.wr.next = &imm_wr.wr; 384 } else if (always_invalidate) { 385 wr = &rwr.wr; 386 rwr.wr.next = &imm_wr.wr; 387 } else if (need_inval) { 388 wr = &inv_wr; 389 inv_wr.next = &imm_wr.wr; 390 } else { 391 wr = &imm_wr.wr; 392 } 393 /* 394 * From time to time we have to post signalled sends, 395 * or send queue will fill up and only QP reset can help. 
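	 * Posting a signalled send every s->signal_interval-th WR lets the
	 * completion handler replenish sq_wr_avail (see the IB_WC_RDMA_WRITE /
	 * IB_WC_SEND case in rtrs_srv_rdma_done()).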
 */
	flags = (atomic_inc_return(&con->c.wr_cnt) % s->signal_interval) ?
		0 : IB_SEND_SIGNALED;
	imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval);
	imm_wr.wr.next = NULL;
	if (always_invalidate) {
		struct ib_sge list;
		struct rtrs_msg_rkey_rsp *msg;

		srv_mr = &sess->mrs[id->msg_id];
		rwr.wr.next = &imm_wr.wr;
		rwr.wr.opcode = IB_WR_REG_MR;
		rwr.wr.wr_cqe = &local_reg_cqe;
		rwr.wr.num_sge = 0;
		rwr.wr.send_flags = 0;
		rwr.mr = srv_mr->mr;
		rwr.key = srv_mr->mr->rkey;
		rwr.access = (IB_ACCESS_LOCAL_WRITE |
			      IB_ACCESS_REMOTE_WRITE);
		msg = srv_mr->iu->buf;
		msg->buf_id = cpu_to_le16(id->msg_id);
		msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
		msg->rkey = cpu_to_le32(srv_mr->mr->rkey);

		list.addr = srv_mr->iu->dma_addr;
		list.length = sizeof(*msg);
		list.lkey = sess->s.dev->ib_pd->local_dma_lkey;
		imm_wr.wr.sg_list = &list;
		imm_wr.wr.num_sge = 1;
		imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM;
		ib_dma_sync_single_for_device(sess->s.dev->ib_dev,
					      srv_mr->iu->dma_addr,
					      srv_mr->iu->size, DMA_TO_DEVICE);
	} else {
		imm_wr.wr.sg_list = NULL;
		imm_wr.wr.num_sge = 0;
		imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
	}
	imm_wr.wr.send_flags = flags;
	imm_wr.wr.wr_cqe = &io_comp_cqe;

	imm_wr.wr.ex.imm_data = cpu_to_be32(imm);

	err = ib_post_send(id->con->c.qp, wr, NULL);
	if (err)
		rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %d\n",
			    err);

	return err;
}

void close_sess(struct rtrs_srv_sess *sess)
{
	if (rtrs_srv_change_state(sess, RTRS_SRV_CLOSING))
		queue_work(rtrs_wq, &sess->close_work);
	WARN_ON(sess->state != RTRS_SRV_CLOSING);
}

static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state)
{
	switch (state) {
	case RTRS_SRV_CONNECTING:
		return "RTRS_SRV_CONNECTING";
	case RTRS_SRV_CONNECTED:
		return "RTRS_SRV_CONNECTED";
	case RTRS_SRV_CLOSING:
		return "RTRS_SRV_CLOSING";
	case RTRS_SRV_CLOSED:
		return "RTRS_SRV_CLOSED";
	default:
		return "UNKNOWN";
	}
}

/**
 * rtrs_srv_resp_rdma() - Finish an RDMA request
 *
 * @id:		Internal RTRS operation identifier
 * @status:	Response code sent to the other side for this operation.
 *		0 = success, < 0 error
 * Context: any
 *
 * Finish an RDMA operation. A message is sent to the client and the
 * corresponding memory areas will be released.
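 *
 * Return: false if the IB send queue is full and the response has been
 *	   queued on the connection's wait list to be retried later,
 *	   true otherwise.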
 */
bool rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status)
{
	struct rtrs_srv_sess *sess;
	struct rtrs_srv_con *con;
	struct rtrs_sess *s;
	int err;

	if (WARN_ON(!id))
		return true;

	con = id->con;
	s = con->c.sess;
	sess = to_srv_sess(s);

	id->status = status;

	if (sess->state != RTRS_SRV_CONNECTED) {
		rtrs_err_rl(s,
			    "Sending I/O response failed, session %s is disconnected, sess state %s\n",
			    kobject_name(&sess->kobj),
			    rtrs_srv_state_str(sess->state));
		goto out;
	}
	if (always_invalidate) {
		struct rtrs_srv_mr *mr = &sess->mrs[id->msg_id];

		ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey));
	}
	if (atomic_sub_return(1, &con->c.sq_wr_avail) < 0) {
		rtrs_err(s, "IB send queue full: sess=%s cid=%d\n",
			 kobject_name(&sess->kobj),
			 con->c.cid);
		atomic_add(1, &con->c.sq_wr_avail);
		spin_lock(&con->rsp_wr_wait_lock);
		list_add_tail(&id->wait_list, &con->rsp_wr_wait_list);
		spin_unlock(&con->rsp_wr_wait_lock);
		return false;
	}

	if (status || id->dir == WRITE || !id->rd_msg->sg_cnt)
		err = send_io_resp_imm(con, id, status);
	else
		err = rdma_write_sg(id);

	if (err) {
		rtrs_err_rl(s, "IO response failed: %d: sess=%s\n", err,
			    kobject_name(&sess->kobj));
		close_sess(sess);
	}
out:
	rtrs_srv_put_ops_ids(sess);
	return true;
}
EXPORT_SYMBOL(rtrs_srv_resp_rdma);

/**
 * rtrs_srv_set_sess_priv() - Set private pointer in rtrs_srv.
 * @srv:	Session pointer
 * @priv:	The private pointer that is associated with the session.
 */
void rtrs_srv_set_sess_priv(struct rtrs_srv *srv, void *priv)
{
	srv->priv = priv;
}
EXPORT_SYMBOL(rtrs_srv_set_sess_priv);

static void unmap_cont_bufs(struct rtrs_srv_sess *sess)
{
	int i;

	for (i = 0; i < sess->mrs_num; i++) {
		struct rtrs_srv_mr *srv_mr;

		srv_mr = &sess->mrs[i];
		rtrs_iu_free(srv_mr->iu, sess->s.dev->ib_dev, 1);
		ib_dereg_mr(srv_mr->mr);
		ib_dma_unmap_sg(sess->s.dev->ib_dev, srv_mr->sgt.sgl,
				srv_mr->sgt.nents, DMA_BIDIRECTIONAL);
		sg_free_table(&srv_mr->sgt);
	}
	kfree(sess->mrs);
}

static int map_cont_bufs(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_sess *ss = &sess->s;
	int i, mri, err, mrs_num;
	unsigned int chunk_bits;
	int chunks_per_mr = 1;

	/*
	 * Here we map queue_depth chunks to MRs. First we have to
	 * figure out how many chunks we can map per MR.
	 */
	if (always_invalidate) {
		/*
		 * In order to invalidate each chunk of memory, we need
		 * more memory regions.
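		 * Each chunk then gets a dedicated MR whose rkey is
		 * refreshed when the response is sent (see rtrs_srv_resp_rdma()).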
580 */ 581 mrs_num = srv->queue_depth; 582 } else { 583 chunks_per_mr = 584 sess->s.dev->ib_dev->attrs.max_fast_reg_page_list_len; 585 mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr); 586 chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num); 587 } 588 589 sess->mrs = kcalloc(mrs_num, sizeof(*sess->mrs), GFP_KERNEL); 590 if (!sess->mrs) 591 return -ENOMEM; 592 593 sess->mrs_num = mrs_num; 594 595 for (mri = 0; mri < mrs_num; mri++) { 596 struct rtrs_srv_mr *srv_mr = &sess->mrs[mri]; 597 struct sg_table *sgt = &srv_mr->sgt; 598 struct scatterlist *s; 599 struct ib_mr *mr; 600 int nr, chunks; 601 602 chunks = chunks_per_mr * mri; 603 if (!always_invalidate) 604 chunks_per_mr = min_t(int, chunks_per_mr, 605 srv->queue_depth - chunks); 606 607 err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL); 608 if (err) 609 goto err; 610 611 for_each_sg(sgt->sgl, s, chunks_per_mr, i) 612 sg_set_page(s, srv->chunks[chunks + i], 613 max_chunk_size, 0); 614 615 nr = ib_dma_map_sg(sess->s.dev->ib_dev, sgt->sgl, 616 sgt->nents, DMA_BIDIRECTIONAL); 617 if (nr < sgt->nents) { 618 err = nr < 0 ? nr : -EINVAL; 619 goto free_sg; 620 } 621 mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG, 622 sgt->nents); 623 if (IS_ERR(mr)) { 624 err = PTR_ERR(mr); 625 goto unmap_sg; 626 } 627 nr = ib_map_mr_sg(mr, sgt->sgl, sgt->nents, 628 NULL, max_chunk_size); 629 if (nr < 0 || nr < sgt->nents) { 630 err = nr < 0 ? nr : -EINVAL; 631 goto dereg_mr; 632 } 633 634 if (always_invalidate) { 635 srv_mr->iu = rtrs_iu_alloc(1, 636 sizeof(struct rtrs_msg_rkey_rsp), 637 GFP_KERNEL, sess->s.dev->ib_dev, 638 DMA_TO_DEVICE, rtrs_srv_rdma_done); 639 if (!srv_mr->iu) { 640 err = -ENOMEM; 641 rtrs_err(ss, "rtrs_iu_alloc(), err: %d\n", err); 642 goto dereg_mr; 643 } 644 } 645 /* Eventually dma addr for each chunk can be cached */ 646 for_each_sg(sgt->sgl, s, sgt->orig_nents, i) 647 sess->dma_addr[chunks + i] = sg_dma_address(s); 648 649 ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey)); 650 srv_mr->mr = mr; 651 652 continue; 653 err: 654 while (mri--) { 655 srv_mr = &sess->mrs[mri]; 656 sgt = &srv_mr->sgt; 657 mr = srv_mr->mr; 658 rtrs_iu_free(srv_mr->iu, sess->s.dev->ib_dev, 1); 659 dereg_mr: 660 ib_dereg_mr(mr); 661 unmap_sg: 662 ib_dma_unmap_sg(sess->s.dev->ib_dev, sgt->sgl, 663 sgt->nents, DMA_BIDIRECTIONAL); 664 free_sg: 665 sg_free_table(sgt); 666 } 667 kfree(sess->mrs); 668 669 return err; 670 } 671 672 chunk_bits = ilog2(srv->queue_depth - 1) + 1; 673 sess->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits); 674 675 return 0; 676 } 677 678 static void rtrs_srv_hb_err_handler(struct rtrs_con *c) 679 { 680 close_sess(to_srv_sess(c->sess)); 681 } 682 683 static void rtrs_srv_init_hb(struct rtrs_srv_sess *sess) 684 { 685 rtrs_init_hb(&sess->s, &io_comp_cqe, 686 RTRS_HB_INTERVAL_MS, 687 RTRS_HB_MISSED_MAX, 688 rtrs_srv_hb_err_handler, 689 rtrs_wq); 690 } 691 692 static void rtrs_srv_start_hb(struct rtrs_srv_sess *sess) 693 { 694 rtrs_start_hb(&sess->s); 695 } 696 697 static void rtrs_srv_stop_hb(struct rtrs_srv_sess *sess) 698 { 699 rtrs_stop_hb(&sess->s); 700 } 701 702 static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc) 703 { 704 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 705 struct rtrs_sess *s = con->c.sess; 706 struct rtrs_srv_sess *sess = to_srv_sess(s); 707 struct rtrs_iu *iu; 708 709 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 710 rtrs_iu_free(iu, sess->s.dev->ib_dev, 1); 711 712 if (wc->status != IB_WC_SUCCESS) { 713 rtrs_err(s, "Sess info response send failed: %s\n", 714 
ib_wc_status_msg(wc->status)); 715 close_sess(sess); 716 return; 717 } 718 WARN_ON(wc->opcode != IB_WC_SEND); 719 } 720 721 static void rtrs_srv_sess_up(struct rtrs_srv_sess *sess) 722 { 723 struct rtrs_srv *srv = sess->srv; 724 struct rtrs_srv_ctx *ctx = srv->ctx; 725 int up; 726 727 mutex_lock(&srv->paths_ev_mutex); 728 up = ++srv->paths_up; 729 if (up == 1) 730 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL); 731 mutex_unlock(&srv->paths_ev_mutex); 732 733 /* Mark session as established */ 734 sess->established = true; 735 } 736 737 static void rtrs_srv_sess_down(struct rtrs_srv_sess *sess) 738 { 739 struct rtrs_srv *srv = sess->srv; 740 struct rtrs_srv_ctx *ctx = srv->ctx; 741 742 if (!sess->established) 743 return; 744 745 sess->established = false; 746 mutex_lock(&srv->paths_ev_mutex); 747 WARN_ON(!srv->paths_up); 748 if (--srv->paths_up == 0) 749 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv); 750 mutex_unlock(&srv->paths_ev_mutex); 751 } 752 753 static bool exist_sessname(struct rtrs_srv_ctx *ctx, 754 const char *sessname, const uuid_t *path_uuid) 755 { 756 struct rtrs_srv *srv; 757 struct rtrs_srv_sess *sess; 758 bool found = false; 759 760 mutex_lock(&ctx->srv_mutex); 761 list_for_each_entry(srv, &ctx->srv_list, ctx_list) { 762 mutex_lock(&srv->paths_mutex); 763 764 /* when a client with same uuid and same sessname tried to add a path */ 765 if (uuid_equal(&srv->paths_uuid, path_uuid)) { 766 mutex_unlock(&srv->paths_mutex); 767 continue; 768 } 769 770 list_for_each_entry(sess, &srv->paths_list, s.entry) { 771 if (strlen(sess->s.sessname) == strlen(sessname) && 772 !strcmp(sess->s.sessname, sessname)) { 773 found = true; 774 break; 775 } 776 } 777 mutex_unlock(&srv->paths_mutex); 778 if (found) 779 break; 780 } 781 mutex_unlock(&ctx->srv_mutex); 782 return found; 783 } 784 785 static int post_recv_sess(struct rtrs_srv_sess *sess); 786 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno); 787 788 static int process_info_req(struct rtrs_srv_con *con, 789 struct rtrs_msg_info_req *msg) 790 { 791 struct rtrs_sess *s = con->c.sess; 792 struct rtrs_srv_sess *sess = to_srv_sess(s); 793 struct ib_send_wr *reg_wr = NULL; 794 struct rtrs_msg_info_rsp *rsp; 795 struct rtrs_iu *tx_iu; 796 struct ib_reg_wr *rwr; 797 int mri, err; 798 size_t tx_sz; 799 800 err = post_recv_sess(sess); 801 if (err) { 802 rtrs_err(s, "post_recv_sess(), err: %d\n", err); 803 return err; 804 } 805 806 if (strchr(msg->sessname, '/') || strchr(msg->sessname, '.')) { 807 rtrs_err(s, "sessname cannot contain / and .\n"); 808 return -EINVAL; 809 } 810 811 if (exist_sessname(sess->srv->ctx, 812 msg->sessname, &sess->srv->paths_uuid)) { 813 rtrs_err(s, "sessname is duplicated: %s\n", msg->sessname); 814 return -EPERM; 815 } 816 strscpy(sess->s.sessname, msg->sessname, sizeof(sess->s.sessname)); 817 818 rwr = kcalloc(sess->mrs_num, sizeof(*rwr), GFP_KERNEL); 819 if (!rwr) 820 return -ENOMEM; 821 822 tx_sz = sizeof(*rsp); 823 tx_sz += sizeof(rsp->desc[0]) * sess->mrs_num; 824 tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, sess->s.dev->ib_dev, 825 DMA_TO_DEVICE, rtrs_srv_info_rsp_done); 826 if (!tx_iu) { 827 err = -ENOMEM; 828 goto rwr_free; 829 } 830 831 rsp = tx_iu->buf; 832 rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP); 833 rsp->sg_cnt = cpu_to_le16(sess->mrs_num); 834 835 for (mri = 0; mri < sess->mrs_num; mri++) { 836 struct ib_mr *mr = sess->mrs[mri].mr; 837 838 rsp->desc[mri].addr = cpu_to_le64(mr->iova); 839 rsp->desc[mri].key = cpu_to_le32(mr->rkey); 840 rsp->desc[mri].len = 
cpu_to_le32(mr->length); 841 842 /* 843 * Fill in reg MR request and chain them *backwards* 844 */ 845 rwr[mri].wr.next = mri ? &rwr[mri - 1].wr : NULL; 846 rwr[mri].wr.opcode = IB_WR_REG_MR; 847 rwr[mri].wr.wr_cqe = &local_reg_cqe; 848 rwr[mri].wr.num_sge = 0; 849 rwr[mri].wr.send_flags = 0; 850 rwr[mri].mr = mr; 851 rwr[mri].key = mr->rkey; 852 rwr[mri].access = (IB_ACCESS_LOCAL_WRITE | 853 IB_ACCESS_REMOTE_WRITE); 854 reg_wr = &rwr[mri].wr; 855 } 856 857 err = rtrs_srv_create_sess_files(sess); 858 if (err) 859 goto iu_free; 860 kobject_get(&sess->kobj); 861 get_device(&sess->srv->dev); 862 rtrs_srv_change_state(sess, RTRS_SRV_CONNECTED); 863 rtrs_srv_start_hb(sess); 864 865 /* 866 * We do not account number of established connections at the current 867 * moment, we rely on the client, which should send info request when 868 * all connections are successfully established. Thus, simply notify 869 * listener with a proper event if we are the first path. 870 */ 871 rtrs_srv_sess_up(sess); 872 873 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr, 874 tx_iu->size, DMA_TO_DEVICE); 875 876 /* Send info response */ 877 err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr); 878 if (err) { 879 rtrs_err(s, "rtrs_iu_post_send(), err: %d\n", err); 880 iu_free: 881 rtrs_iu_free(tx_iu, sess->s.dev->ib_dev, 1); 882 } 883 rwr_free: 884 kfree(rwr); 885 886 return err; 887 } 888 889 static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc) 890 { 891 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 892 struct rtrs_sess *s = con->c.sess; 893 struct rtrs_srv_sess *sess = to_srv_sess(s); 894 struct rtrs_msg_info_req *msg; 895 struct rtrs_iu *iu; 896 int err; 897 898 WARN_ON(con->c.cid); 899 900 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 901 if (wc->status != IB_WC_SUCCESS) { 902 rtrs_err(s, "Sess info request receive failed: %s\n", 903 ib_wc_status_msg(wc->status)); 904 goto close; 905 } 906 WARN_ON(wc->opcode != IB_WC_RECV); 907 908 if (wc->byte_len < sizeof(*msg)) { 909 rtrs_err(s, "Sess info request is malformed: size %d\n", 910 wc->byte_len); 911 goto close; 912 } 913 ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr, 914 iu->size, DMA_FROM_DEVICE); 915 msg = iu->buf; 916 if (le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ) { 917 rtrs_err(s, "Sess info request is malformed: type %d\n", 918 le16_to_cpu(msg->type)); 919 goto close; 920 } 921 err = process_info_req(con, msg); 922 if (err) 923 goto close; 924 925 out: 926 rtrs_iu_free(iu, sess->s.dev->ib_dev, 1); 927 return; 928 close: 929 close_sess(sess); 930 goto out; 931 } 932 933 static int post_recv_info_req(struct rtrs_srv_con *con) 934 { 935 struct rtrs_sess *s = con->c.sess; 936 struct rtrs_srv_sess *sess = to_srv_sess(s); 937 struct rtrs_iu *rx_iu; 938 int err; 939 940 rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), 941 GFP_KERNEL, sess->s.dev->ib_dev, 942 DMA_FROM_DEVICE, rtrs_srv_info_req_done); 943 if (!rx_iu) 944 return -ENOMEM; 945 /* Prepare for getting info response */ 946 err = rtrs_iu_post_recv(&con->c, rx_iu); 947 if (err) { 948 rtrs_err(s, "rtrs_iu_post_recv(), err: %d\n", err); 949 rtrs_iu_free(rx_iu, sess->s.dev->ib_dev, 1); 950 return err; 951 } 952 953 return 0; 954 } 955 956 static int post_recv_io(struct rtrs_srv_con *con, size_t q_size) 957 { 958 int i, err; 959 960 for (i = 0; i < q_size; i++) { 961 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 962 if (err) 963 return err; 964 } 965 966 return 0; 967 } 968 969 static int post_recv_sess(struct rtrs_srv_sess 
*sess) 970 { 971 struct rtrs_srv *srv = sess->srv; 972 struct rtrs_sess *s = &sess->s; 973 size_t q_size; 974 int err, cid; 975 976 for (cid = 0; cid < sess->s.con_num; cid++) { 977 if (cid == 0) 978 q_size = SERVICE_CON_QUEUE_DEPTH; 979 else 980 q_size = srv->queue_depth; 981 982 err = post_recv_io(to_srv_con(sess->s.con[cid]), q_size); 983 if (err) { 984 rtrs_err(s, "post_recv_io(), err: %d\n", err); 985 return err; 986 } 987 } 988 989 return 0; 990 } 991 992 static void process_read(struct rtrs_srv_con *con, 993 struct rtrs_msg_rdma_read *msg, 994 u32 buf_id, u32 off) 995 { 996 struct rtrs_sess *s = con->c.sess; 997 struct rtrs_srv_sess *sess = to_srv_sess(s); 998 struct rtrs_srv *srv = sess->srv; 999 struct rtrs_srv_ctx *ctx = srv->ctx; 1000 struct rtrs_srv_op *id; 1001 1002 size_t usr_len, data_len; 1003 void *data; 1004 int ret; 1005 1006 if (sess->state != RTRS_SRV_CONNECTED) { 1007 rtrs_err_rl(s, 1008 "Processing read request failed, session is disconnected, sess state %s\n", 1009 rtrs_srv_state_str(sess->state)); 1010 return; 1011 } 1012 if (msg->sg_cnt != 1 && msg->sg_cnt != 0) { 1013 rtrs_err_rl(s, 1014 "Processing read request failed, invalid message\n"); 1015 return; 1016 } 1017 rtrs_srv_get_ops_ids(sess); 1018 rtrs_srv_update_rdma_stats(sess->stats, off, READ); 1019 id = sess->ops_ids[buf_id]; 1020 id->con = con; 1021 id->dir = READ; 1022 id->msg_id = buf_id; 1023 id->rd_msg = msg; 1024 usr_len = le16_to_cpu(msg->usr_len); 1025 data_len = off - usr_len; 1026 data = page_address(srv->chunks[buf_id]); 1027 ret = ctx->ops.rdma_ev(srv->priv, id, READ, data, data_len, 1028 data + data_len, usr_len); 1029 1030 if (ret) { 1031 rtrs_err_rl(s, 1032 "Processing read request failed, user module cb reported for msg_id %d, err: %d\n", 1033 buf_id, ret); 1034 goto send_err_msg; 1035 } 1036 1037 return; 1038 1039 send_err_msg: 1040 ret = send_io_resp_imm(con, id, ret); 1041 if (ret < 0) { 1042 rtrs_err_rl(s, 1043 "Sending err msg for failed RDMA-Write-Req failed, msg_id %d, err: %d\n", 1044 buf_id, ret); 1045 close_sess(sess); 1046 } 1047 rtrs_srv_put_ops_ids(sess); 1048 } 1049 1050 static void process_write(struct rtrs_srv_con *con, 1051 struct rtrs_msg_rdma_write *req, 1052 u32 buf_id, u32 off) 1053 { 1054 struct rtrs_sess *s = con->c.sess; 1055 struct rtrs_srv_sess *sess = to_srv_sess(s); 1056 struct rtrs_srv *srv = sess->srv; 1057 struct rtrs_srv_ctx *ctx = srv->ctx; 1058 struct rtrs_srv_op *id; 1059 1060 size_t data_len, usr_len; 1061 void *data; 1062 int ret; 1063 1064 if (sess->state != RTRS_SRV_CONNECTED) { 1065 rtrs_err_rl(s, 1066 "Processing write request failed, session is disconnected, sess state %s\n", 1067 rtrs_srv_state_str(sess->state)); 1068 return; 1069 } 1070 rtrs_srv_get_ops_ids(sess); 1071 rtrs_srv_update_rdma_stats(sess->stats, off, WRITE); 1072 id = sess->ops_ids[buf_id]; 1073 id->con = con; 1074 id->dir = WRITE; 1075 id->msg_id = buf_id; 1076 1077 usr_len = le16_to_cpu(req->usr_len); 1078 data_len = off - usr_len; 1079 data = page_address(srv->chunks[buf_id]); 1080 ret = ctx->ops.rdma_ev(srv->priv, id, WRITE, data, data_len, 1081 data + data_len, usr_len); 1082 if (ret) { 1083 rtrs_err_rl(s, 1084 "Processing write request failed, user module callback reports err: %d\n", 1085 ret); 1086 goto send_err_msg; 1087 } 1088 1089 return; 1090 1091 send_err_msg: 1092 ret = send_io_resp_imm(con, id, ret); 1093 if (ret < 0) { 1094 rtrs_err_rl(s, 1095 "Processing write request failed, sending I/O response failed, msg_id %d, err: %d\n", 1096 buf_id, ret); 1097 close_sess(sess); 
1098 } 1099 rtrs_srv_put_ops_ids(sess); 1100 } 1101 1102 static void process_io_req(struct rtrs_srv_con *con, void *msg, 1103 u32 id, u32 off) 1104 { 1105 struct rtrs_sess *s = con->c.sess; 1106 struct rtrs_srv_sess *sess = to_srv_sess(s); 1107 struct rtrs_msg_rdma_hdr *hdr; 1108 unsigned int type; 1109 1110 ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, sess->dma_addr[id], 1111 max_chunk_size, DMA_BIDIRECTIONAL); 1112 hdr = msg; 1113 type = le16_to_cpu(hdr->type); 1114 1115 switch (type) { 1116 case RTRS_MSG_WRITE: 1117 process_write(con, msg, id, off); 1118 break; 1119 case RTRS_MSG_READ: 1120 process_read(con, msg, id, off); 1121 break; 1122 default: 1123 rtrs_err(s, 1124 "Processing I/O request failed, unknown message type received: 0x%02x\n", 1125 type); 1126 goto err; 1127 } 1128 1129 return; 1130 1131 err: 1132 close_sess(sess); 1133 } 1134 1135 static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc) 1136 { 1137 struct rtrs_srv_mr *mr = 1138 container_of(wc->wr_cqe, typeof(*mr), inv_cqe); 1139 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 1140 struct rtrs_sess *s = con->c.sess; 1141 struct rtrs_srv_sess *sess = to_srv_sess(s); 1142 struct rtrs_srv *srv = sess->srv; 1143 u32 msg_id, off; 1144 void *data; 1145 1146 if (wc->status != IB_WC_SUCCESS) { 1147 rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n", 1148 ib_wc_status_msg(wc->status)); 1149 close_sess(sess); 1150 } 1151 msg_id = mr->msg_id; 1152 off = mr->msg_off; 1153 data = page_address(srv->chunks[msg_id]) + off; 1154 process_io_req(con, data, msg_id, off); 1155 } 1156 1157 static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con, 1158 struct rtrs_srv_mr *mr) 1159 { 1160 struct ib_send_wr wr = { 1161 .opcode = IB_WR_LOCAL_INV, 1162 .wr_cqe = &mr->inv_cqe, 1163 .send_flags = IB_SEND_SIGNALED, 1164 .ex.invalidate_rkey = mr->mr->rkey, 1165 }; 1166 mr->inv_cqe.done = rtrs_srv_inv_rkey_done; 1167 1168 return ib_post_send(con->c.qp, &wr, NULL); 1169 } 1170 1171 static void rtrs_rdma_process_wr_wait_list(struct rtrs_srv_con *con) 1172 { 1173 spin_lock(&con->rsp_wr_wait_lock); 1174 while (!list_empty(&con->rsp_wr_wait_list)) { 1175 struct rtrs_srv_op *id; 1176 int ret; 1177 1178 id = list_entry(con->rsp_wr_wait_list.next, 1179 struct rtrs_srv_op, wait_list); 1180 list_del(&id->wait_list); 1181 1182 spin_unlock(&con->rsp_wr_wait_lock); 1183 ret = rtrs_srv_resp_rdma(id, id->status); 1184 spin_lock(&con->rsp_wr_wait_lock); 1185 1186 if (!ret) { 1187 list_add(&id->wait_list, &con->rsp_wr_wait_list); 1188 break; 1189 } 1190 } 1191 spin_unlock(&con->rsp_wr_wait_lock); 1192 } 1193 1194 static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc) 1195 { 1196 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 1197 struct rtrs_sess *s = con->c.sess; 1198 struct rtrs_srv_sess *sess = to_srv_sess(s); 1199 struct rtrs_srv *srv = sess->srv; 1200 u32 imm_type, imm_payload; 1201 int err; 1202 1203 if (wc->status != IB_WC_SUCCESS) { 1204 if (wc->status != IB_WC_WR_FLUSH_ERR) { 1205 rtrs_err(s, 1206 "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n", 1207 ib_wc_status_msg(wc->status), wc->wr_cqe, 1208 wc->opcode, wc->vendor_err, wc->byte_len); 1209 close_sess(sess); 1210 } 1211 return; 1212 } 1213 1214 switch (wc->opcode) { 1215 case IB_WC_RECV_RDMA_WITH_IMM: 1216 /* 1217 * post_recv() RDMA write completions of IO reqs (read/write) 1218 * and hb 1219 */ 1220 if (WARN_ON(wc->wr_cqe != &io_comp_cqe)) 1221 return; 1222 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 1223 if (err) { 1224 rtrs_err(s, 
"rtrs_post_recv(), err: %d\n", err); 1225 close_sess(sess); 1226 break; 1227 } 1228 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), 1229 &imm_type, &imm_payload); 1230 if (imm_type == RTRS_IO_REQ_IMM) { 1231 u32 msg_id, off; 1232 void *data; 1233 1234 msg_id = imm_payload >> sess->mem_bits; 1235 off = imm_payload & ((1 << sess->mem_bits) - 1); 1236 if (msg_id >= srv->queue_depth || off >= max_chunk_size) { 1237 rtrs_err(s, "Wrong msg_id %u, off %u\n", 1238 msg_id, off); 1239 close_sess(sess); 1240 return; 1241 } 1242 if (always_invalidate) { 1243 struct rtrs_srv_mr *mr = &sess->mrs[msg_id]; 1244 1245 mr->msg_off = off; 1246 mr->msg_id = msg_id; 1247 err = rtrs_srv_inv_rkey(con, mr); 1248 if (err) { 1249 rtrs_err(s, "rtrs_post_recv(), err: %d\n", 1250 err); 1251 close_sess(sess); 1252 break; 1253 } 1254 } else { 1255 data = page_address(srv->chunks[msg_id]) + off; 1256 process_io_req(con, data, msg_id, off); 1257 } 1258 } else if (imm_type == RTRS_HB_MSG_IMM) { 1259 WARN_ON(con->c.cid); 1260 rtrs_send_hb_ack(&sess->s); 1261 } else if (imm_type == RTRS_HB_ACK_IMM) { 1262 WARN_ON(con->c.cid); 1263 sess->s.hb_missed_cnt = 0; 1264 } else { 1265 rtrs_wrn(s, "Unknown IMM type %u\n", imm_type); 1266 } 1267 break; 1268 case IB_WC_RDMA_WRITE: 1269 case IB_WC_SEND: 1270 /* 1271 * post_send() RDMA write completions of IO reqs (read/write) 1272 * and hb. 1273 */ 1274 atomic_add(s->signal_interval, &con->c.sq_wr_avail); 1275 1276 if (!list_empty_careful(&con->rsp_wr_wait_list)) 1277 rtrs_rdma_process_wr_wait_list(con); 1278 1279 break; 1280 default: 1281 rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode); 1282 return; 1283 } 1284 } 1285 1286 /** 1287 * rtrs_srv_get_sess_name() - Get rtrs_srv peer hostname. 1288 * @srv: Session 1289 * @sessname: Sessname buffer 1290 * @len: Length of sessname buffer 1291 */ 1292 int rtrs_srv_get_sess_name(struct rtrs_srv *srv, char *sessname, size_t len) 1293 { 1294 struct rtrs_srv_sess *sess; 1295 int err = -ENOTCONN; 1296 1297 mutex_lock(&srv->paths_mutex); 1298 list_for_each_entry(sess, &srv->paths_list, s.entry) { 1299 if (sess->state != RTRS_SRV_CONNECTED) 1300 continue; 1301 strscpy(sessname, sess->s.sessname, 1302 min_t(size_t, sizeof(sess->s.sessname), len)); 1303 err = 0; 1304 break; 1305 } 1306 mutex_unlock(&srv->paths_mutex); 1307 1308 return err; 1309 } 1310 EXPORT_SYMBOL(rtrs_srv_get_sess_name); 1311 1312 /** 1313 * rtrs_srv_get_queue_depth() - Get rtrs_srv qdepth. 
1314 * @srv: Session 1315 */ 1316 int rtrs_srv_get_queue_depth(struct rtrs_srv *srv) 1317 { 1318 return srv->queue_depth; 1319 } 1320 EXPORT_SYMBOL(rtrs_srv_get_queue_depth); 1321 1322 static int find_next_bit_ring(struct rtrs_srv_sess *sess) 1323 { 1324 struct ib_device *ib_dev = sess->s.dev->ib_dev; 1325 int v; 1326 1327 v = cpumask_next(sess->cur_cq_vector, &cq_affinity_mask); 1328 if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors) 1329 v = cpumask_first(&cq_affinity_mask); 1330 return v; 1331 } 1332 1333 static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_sess *sess) 1334 { 1335 sess->cur_cq_vector = find_next_bit_ring(sess); 1336 1337 return sess->cur_cq_vector; 1338 } 1339 1340 static void rtrs_srv_dev_release(struct device *dev) 1341 { 1342 struct rtrs_srv *srv = container_of(dev, struct rtrs_srv, dev); 1343 1344 kfree(srv); 1345 } 1346 1347 static void free_srv(struct rtrs_srv *srv) 1348 { 1349 int i; 1350 1351 WARN_ON(refcount_read(&srv->refcount)); 1352 for (i = 0; i < srv->queue_depth; i++) 1353 mempool_free(srv->chunks[i], chunk_pool); 1354 kfree(srv->chunks); 1355 mutex_destroy(&srv->paths_mutex); 1356 mutex_destroy(&srv->paths_ev_mutex); 1357 /* last put to release the srv structure */ 1358 put_device(&srv->dev); 1359 } 1360 1361 static struct rtrs_srv *get_or_create_srv(struct rtrs_srv_ctx *ctx, 1362 const uuid_t *paths_uuid, 1363 bool first_conn) 1364 { 1365 struct rtrs_srv *srv; 1366 int i; 1367 1368 mutex_lock(&ctx->srv_mutex); 1369 list_for_each_entry(srv, &ctx->srv_list, ctx_list) { 1370 if (uuid_equal(&srv->paths_uuid, paths_uuid) && 1371 refcount_inc_not_zero(&srv->refcount)) { 1372 mutex_unlock(&ctx->srv_mutex); 1373 return srv; 1374 } 1375 } 1376 mutex_unlock(&ctx->srv_mutex); 1377 /* 1378 * If this request is not the first connection request from the 1379 * client for this session then fail and return error. 
1380 */ 1381 if (!first_conn) { 1382 pr_err_ratelimited("Error: Not the first connection request for this session\n"); 1383 return ERR_PTR(-ENXIO); 1384 } 1385 1386 /* need to allocate a new srv */ 1387 srv = kzalloc(sizeof(*srv), GFP_KERNEL); 1388 if (!srv) 1389 return ERR_PTR(-ENOMEM); 1390 1391 INIT_LIST_HEAD(&srv->paths_list); 1392 mutex_init(&srv->paths_mutex); 1393 mutex_init(&srv->paths_ev_mutex); 1394 uuid_copy(&srv->paths_uuid, paths_uuid); 1395 srv->queue_depth = sess_queue_depth; 1396 srv->ctx = ctx; 1397 device_initialize(&srv->dev); 1398 srv->dev.release = rtrs_srv_dev_release; 1399 1400 srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks), 1401 GFP_KERNEL); 1402 if (!srv->chunks) 1403 goto err_free_srv; 1404 1405 for (i = 0; i < srv->queue_depth; i++) { 1406 srv->chunks[i] = mempool_alloc(chunk_pool, GFP_KERNEL); 1407 if (!srv->chunks[i]) 1408 goto err_free_chunks; 1409 } 1410 refcount_set(&srv->refcount, 1); 1411 mutex_lock(&ctx->srv_mutex); 1412 list_add(&srv->ctx_list, &ctx->srv_list); 1413 mutex_unlock(&ctx->srv_mutex); 1414 1415 return srv; 1416 1417 err_free_chunks: 1418 while (i--) 1419 mempool_free(srv->chunks[i], chunk_pool); 1420 kfree(srv->chunks); 1421 1422 err_free_srv: 1423 kfree(srv); 1424 return ERR_PTR(-ENOMEM); 1425 } 1426 1427 static void put_srv(struct rtrs_srv *srv) 1428 { 1429 if (refcount_dec_and_test(&srv->refcount)) { 1430 struct rtrs_srv_ctx *ctx = srv->ctx; 1431 1432 WARN_ON(srv->dev.kobj.state_in_sysfs); 1433 1434 mutex_lock(&ctx->srv_mutex); 1435 list_del(&srv->ctx_list); 1436 mutex_unlock(&ctx->srv_mutex); 1437 free_srv(srv); 1438 } 1439 } 1440 1441 static void __add_path_to_srv(struct rtrs_srv *srv, 1442 struct rtrs_srv_sess *sess) 1443 { 1444 list_add_tail(&sess->s.entry, &srv->paths_list); 1445 srv->paths_num++; 1446 WARN_ON(srv->paths_num >= MAX_PATHS_NUM); 1447 } 1448 1449 static void del_path_from_srv(struct rtrs_srv_sess *sess) 1450 { 1451 struct rtrs_srv *srv = sess->srv; 1452 1453 if (WARN_ON(!srv)) 1454 return; 1455 1456 mutex_lock(&srv->paths_mutex); 1457 list_del(&sess->s.entry); 1458 WARN_ON(!srv->paths_num); 1459 srv->paths_num--; 1460 mutex_unlock(&srv->paths_mutex); 1461 } 1462 1463 /* return true if addresses are the same, error other wise */ 1464 static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b) 1465 { 1466 switch (a->sa_family) { 1467 case AF_IB: 1468 return memcmp(&((struct sockaddr_ib *)a)->sib_addr, 1469 &((struct sockaddr_ib *)b)->sib_addr, 1470 sizeof(struct ib_addr)) && 1471 (b->sa_family == AF_IB); 1472 case AF_INET: 1473 return memcmp(&((struct sockaddr_in *)a)->sin_addr, 1474 &((struct sockaddr_in *)b)->sin_addr, 1475 sizeof(struct in_addr)) && 1476 (b->sa_family == AF_INET); 1477 case AF_INET6: 1478 return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr, 1479 &((struct sockaddr_in6 *)b)->sin6_addr, 1480 sizeof(struct in6_addr)) && 1481 (b->sa_family == AF_INET6); 1482 default: 1483 return -ENOENT; 1484 } 1485 } 1486 1487 static bool __is_path_w_addr_exists(struct rtrs_srv *srv, 1488 struct rdma_addr *addr) 1489 { 1490 struct rtrs_srv_sess *sess; 1491 1492 list_for_each_entry(sess, &srv->paths_list, s.entry) 1493 if (!sockaddr_cmp((struct sockaddr *)&sess->s.dst_addr, 1494 (struct sockaddr *)&addr->dst_addr) && 1495 !sockaddr_cmp((struct sockaddr *)&sess->s.src_addr, 1496 (struct sockaddr *)&addr->src_addr)) 1497 return true; 1498 1499 return false; 1500 } 1501 1502 static void free_sess(struct rtrs_srv_sess *sess) 1503 { 1504 if (sess->kobj.state_in_sysfs) { 1505 
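		/* freeing is handled by the kobject release callback */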
kobject_del(&sess->kobj); 1506 kobject_put(&sess->kobj); 1507 } else { 1508 kfree(sess->stats); 1509 kfree(sess); 1510 } 1511 } 1512 1513 static void rtrs_srv_close_work(struct work_struct *work) 1514 { 1515 struct rtrs_srv_sess *sess; 1516 struct rtrs_srv_con *con; 1517 int i; 1518 1519 sess = container_of(work, typeof(*sess), close_work); 1520 1521 rtrs_srv_destroy_sess_files(sess); 1522 rtrs_srv_stop_hb(sess); 1523 1524 for (i = 0; i < sess->s.con_num; i++) { 1525 if (!sess->s.con[i]) 1526 continue; 1527 con = to_srv_con(sess->s.con[i]); 1528 rdma_disconnect(con->c.cm_id); 1529 ib_drain_qp(con->c.qp); 1530 } 1531 1532 /* 1533 * Degrade ref count to the usual model with a single shared 1534 * atomic_t counter 1535 */ 1536 percpu_ref_kill(&sess->ids_inflight_ref); 1537 1538 /* Wait for all completion */ 1539 wait_for_completion(&sess->complete_done); 1540 1541 /* Notify upper layer if we are the last path */ 1542 rtrs_srv_sess_down(sess); 1543 1544 unmap_cont_bufs(sess); 1545 rtrs_srv_free_ops_ids(sess); 1546 1547 for (i = 0; i < sess->s.con_num; i++) { 1548 if (!sess->s.con[i]) 1549 continue; 1550 con = to_srv_con(sess->s.con[i]); 1551 rtrs_cq_qp_destroy(&con->c); 1552 rdma_destroy_id(con->c.cm_id); 1553 kfree(con); 1554 } 1555 rtrs_ib_dev_put(sess->s.dev); 1556 1557 del_path_from_srv(sess); 1558 put_srv(sess->srv); 1559 sess->srv = NULL; 1560 rtrs_srv_change_state(sess, RTRS_SRV_CLOSED); 1561 1562 kfree(sess->dma_addr); 1563 kfree(sess->s.con); 1564 free_sess(sess); 1565 } 1566 1567 static int rtrs_rdma_do_accept(struct rtrs_srv_sess *sess, 1568 struct rdma_cm_id *cm_id) 1569 { 1570 struct rtrs_srv *srv = sess->srv; 1571 struct rtrs_msg_conn_rsp msg; 1572 struct rdma_conn_param param; 1573 int err; 1574 1575 param = (struct rdma_conn_param) { 1576 .rnr_retry_count = 7, 1577 .private_data = &msg, 1578 .private_data_len = sizeof(msg), 1579 }; 1580 1581 msg = (struct rtrs_msg_conn_rsp) { 1582 .magic = cpu_to_le16(RTRS_MAGIC), 1583 .version = cpu_to_le16(RTRS_PROTO_VER), 1584 .queue_depth = cpu_to_le16(srv->queue_depth), 1585 .max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE), 1586 .max_hdr_size = cpu_to_le32(MAX_HDR_SIZE), 1587 }; 1588 1589 if (always_invalidate) 1590 msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F); 1591 1592 err = rdma_accept(cm_id, ¶m); 1593 if (err) 1594 pr_err("rdma_accept(), err: %d\n", err); 1595 1596 return err; 1597 } 1598 1599 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno) 1600 { 1601 struct rtrs_msg_conn_rsp msg; 1602 int err; 1603 1604 msg = (struct rtrs_msg_conn_rsp) { 1605 .magic = cpu_to_le16(RTRS_MAGIC), 1606 .version = cpu_to_le16(RTRS_PROTO_VER), 1607 .errno = cpu_to_le16(errno), 1608 }; 1609 1610 err = rdma_reject(cm_id, &msg, sizeof(msg), IB_CM_REJ_CONSUMER_DEFINED); 1611 if (err) 1612 pr_err("rdma_reject(), err: %d\n", err); 1613 1614 /* Bounce errno back */ 1615 return errno; 1616 } 1617 1618 static struct rtrs_srv_sess * 1619 __find_sess(struct rtrs_srv *srv, const uuid_t *sess_uuid) 1620 { 1621 struct rtrs_srv_sess *sess; 1622 1623 list_for_each_entry(sess, &srv->paths_list, s.entry) { 1624 if (uuid_equal(&sess->s.uuid, sess_uuid)) 1625 return sess; 1626 } 1627 1628 return NULL; 1629 } 1630 1631 static int create_con(struct rtrs_srv_sess *sess, 1632 struct rdma_cm_id *cm_id, 1633 unsigned int cid) 1634 { 1635 struct rtrs_srv *srv = sess->srv; 1636 struct rtrs_sess *s = &sess->s; 1637 struct rtrs_srv_con *con; 1638 1639 u32 cq_num, max_send_wr, max_recv_wr, wr_limit; 1640 int err, cq_vector; 1641 1642 con = 
kzalloc(sizeof(*con), GFP_KERNEL);
	if (!con) {
		err = -ENOMEM;
		goto err;
	}

	spin_lock_init(&con->rsp_wr_wait_lock);
	INIT_LIST_HEAD(&con->rsp_wr_wait_list);
	con->c.cm_id = cm_id;
	con->c.sess = &sess->s;
	con->c.cid = cid;
	atomic_set(&con->c.wr_cnt, 1);
	wr_limit = sess->s.dev->ib_dev->attrs.max_qp_wr;

	if (con->c.cid == 0) {
		/*
		 * All receive and all send (each requiring invalidate)
		 * + 2 for drain and heartbeat
		 */
		max_send_wr = min_t(int, wr_limit,
				    SERVICE_CON_QUEUE_DEPTH * 2 + 2);
		max_recv_wr = max_send_wr;
		s->signal_interval = min_not_zero(srv->queue_depth,
						  (size_t)SERVICE_CON_QUEUE_DEPTH);
	} else {
		/* when always_invalidate is enabled, we need linv+rinv+mr+imm */
		if (always_invalidate)
			max_send_wr =
				min_t(int, wr_limit,
				      srv->queue_depth * (1 + 4) + 1);
		else
			max_send_wr =
				min_t(int, wr_limit,
				      srv->queue_depth * (1 + 2) + 1);

		max_recv_wr = srv->queue_depth + 1;
		/*
		 * Reserve headroom for the worst case: all receive requests
		 * posted, all write responses posted, and each read request
		 * additionally requiring an invalidate request plus a drain;
		 * otherwise the qp gets into the error state.
		 */
	}
	cq_num = max_send_wr + max_recv_wr;
	atomic_set(&con->c.sq_wr_avail, max_send_wr);
	cq_vector = rtrs_srv_get_next_cq_vector(sess);

	/* TODO: SOFTIRQ can be faster, but be careful with softirq context */
	err = rtrs_cq_qp_create(&sess->s, &con->c, 1, cq_vector, cq_num,
				max_send_wr, max_recv_wr,
				IB_POLL_WORKQUEUE);
	if (err) {
		rtrs_err(s, "rtrs_cq_qp_create(), err: %d\n", err);
		goto free_con;
	}
	if (con->c.cid == 0) {
		err = post_recv_info_req(con);
		if (err)
			goto free_cqqp;
	}
	WARN_ON(sess->s.con[cid]);
	sess->s.con[cid] = &con->c;

	/*
	 * Change context from server to current connection. The other
	 * way is to use cm_id->qp->qp_context, which does not work on OFED.
1708 */ 1709 cm_id->context = &con->c; 1710 1711 return 0; 1712 1713 free_cqqp: 1714 rtrs_cq_qp_destroy(&con->c); 1715 free_con: 1716 kfree(con); 1717 1718 err: 1719 return err; 1720 } 1721 1722 static struct rtrs_srv_sess *__alloc_sess(struct rtrs_srv *srv, 1723 struct rdma_cm_id *cm_id, 1724 unsigned int con_num, 1725 unsigned int recon_cnt, 1726 const uuid_t *uuid) 1727 { 1728 struct rtrs_srv_sess *sess; 1729 int err = -ENOMEM; 1730 char str[NAME_MAX]; 1731 struct rtrs_addr path; 1732 1733 if (srv->paths_num >= MAX_PATHS_NUM) { 1734 err = -ECONNRESET; 1735 goto err; 1736 } 1737 if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) { 1738 err = -EEXIST; 1739 pr_err("Path with same addr exists\n"); 1740 goto err; 1741 } 1742 sess = kzalloc(sizeof(*sess), GFP_KERNEL); 1743 if (!sess) 1744 goto err; 1745 1746 sess->stats = kzalloc(sizeof(*sess->stats), GFP_KERNEL); 1747 if (!sess->stats) 1748 goto err_free_sess; 1749 1750 sess->stats->sess = sess; 1751 1752 sess->dma_addr = kcalloc(srv->queue_depth, sizeof(*sess->dma_addr), 1753 GFP_KERNEL); 1754 if (!sess->dma_addr) 1755 goto err_free_stats; 1756 1757 sess->s.con = kcalloc(con_num, sizeof(*sess->s.con), GFP_KERNEL); 1758 if (!sess->s.con) 1759 goto err_free_dma_addr; 1760 1761 sess->state = RTRS_SRV_CONNECTING; 1762 sess->srv = srv; 1763 sess->cur_cq_vector = -1; 1764 sess->s.dst_addr = cm_id->route.addr.dst_addr; 1765 sess->s.src_addr = cm_id->route.addr.src_addr; 1766 1767 /* temporary until receiving session-name from client */ 1768 path.src = &sess->s.src_addr; 1769 path.dst = &sess->s.dst_addr; 1770 rtrs_addr_to_str(&path, str, sizeof(str)); 1771 strscpy(sess->s.sessname, str, sizeof(sess->s.sessname)); 1772 1773 sess->s.con_num = con_num; 1774 sess->s.irq_con_num = con_num; 1775 sess->s.recon_cnt = recon_cnt; 1776 uuid_copy(&sess->s.uuid, uuid); 1777 spin_lock_init(&sess->state_lock); 1778 INIT_WORK(&sess->close_work, rtrs_srv_close_work); 1779 rtrs_srv_init_hb(sess); 1780 1781 sess->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd); 1782 if (!sess->s.dev) { 1783 err = -ENOMEM; 1784 goto err_free_con; 1785 } 1786 err = map_cont_bufs(sess); 1787 if (err) 1788 goto err_put_dev; 1789 1790 err = rtrs_srv_alloc_ops_ids(sess); 1791 if (err) 1792 goto err_unmap_bufs; 1793 1794 __add_path_to_srv(srv, sess); 1795 1796 return sess; 1797 1798 err_unmap_bufs: 1799 unmap_cont_bufs(sess); 1800 err_put_dev: 1801 rtrs_ib_dev_put(sess->s.dev); 1802 err_free_con: 1803 kfree(sess->s.con); 1804 err_free_dma_addr: 1805 kfree(sess->dma_addr); 1806 err_free_stats: 1807 kfree(sess->stats); 1808 err_free_sess: 1809 kfree(sess); 1810 err: 1811 return ERR_PTR(err); 1812 } 1813 1814 static int rtrs_rdma_connect(struct rdma_cm_id *cm_id, 1815 const struct rtrs_msg_conn_req *msg, 1816 size_t len) 1817 { 1818 struct rtrs_srv_ctx *ctx = cm_id->context; 1819 struct rtrs_srv_sess *sess; 1820 struct rtrs_srv *srv; 1821 1822 u16 version, con_num, cid; 1823 u16 recon_cnt; 1824 int err = -ECONNRESET; 1825 1826 if (len < sizeof(*msg)) { 1827 pr_err("Invalid RTRS connection request\n"); 1828 goto reject_w_err; 1829 } 1830 if (le16_to_cpu(msg->magic) != RTRS_MAGIC) { 1831 pr_err("Invalid RTRS magic\n"); 1832 goto reject_w_err; 1833 } 1834 version = le16_to_cpu(msg->version); 1835 if (version >> 8 != RTRS_PROTO_VER_MAJOR) { 1836 pr_err("Unsupported major RTRS version: %d, expected %d\n", 1837 version >> 8, RTRS_PROTO_VER_MAJOR); 1838 goto reject_w_err; 1839 } 1840 con_num = le16_to_cpu(msg->cid_num); 1841 if (con_num > 4096) { 1842 /* Sanity check */ 1843 pr_err("Too 
many connections requested: %d\n", con_num); 1844 goto reject_w_err; 1845 } 1846 cid = le16_to_cpu(msg->cid); 1847 if (cid >= con_num) { 1848 /* Sanity check */ 1849 pr_err("Incorrect cid: %d >= %d\n", cid, con_num); 1850 goto reject_w_err; 1851 } 1852 recon_cnt = le16_to_cpu(msg->recon_cnt); 1853 srv = get_or_create_srv(ctx, &msg->paths_uuid, msg->first_conn); 1854 if (IS_ERR(srv)) { 1855 err = PTR_ERR(srv); 1856 pr_err("get_or_create_srv(), error %d\n", err); 1857 goto reject_w_err; 1858 } 1859 mutex_lock(&srv->paths_mutex); 1860 sess = __find_sess(srv, &msg->sess_uuid); 1861 if (sess) { 1862 struct rtrs_sess *s = &sess->s; 1863 1864 /* Session already holds a reference */ 1865 put_srv(srv); 1866 1867 if (sess->state != RTRS_SRV_CONNECTING) { 1868 rtrs_err(s, "Session in wrong state: %s\n", 1869 rtrs_srv_state_str(sess->state)); 1870 mutex_unlock(&srv->paths_mutex); 1871 goto reject_w_err; 1872 } 1873 /* 1874 * Sanity checks 1875 */ 1876 if (con_num != s->con_num || cid >= s->con_num) { 1877 rtrs_err(s, "Incorrect request: %d, %d\n", 1878 cid, con_num); 1879 mutex_unlock(&srv->paths_mutex); 1880 goto reject_w_err; 1881 } 1882 if (s->con[cid]) { 1883 rtrs_err(s, "Connection already exists: %d\n", 1884 cid); 1885 mutex_unlock(&srv->paths_mutex); 1886 goto reject_w_err; 1887 } 1888 } else { 1889 sess = __alloc_sess(srv, cm_id, con_num, recon_cnt, 1890 &msg->sess_uuid); 1891 if (IS_ERR(sess)) { 1892 mutex_unlock(&srv->paths_mutex); 1893 put_srv(srv); 1894 err = PTR_ERR(sess); 1895 pr_err("RTRS server session allocation failed: %d\n", err); 1896 goto reject_w_err; 1897 } 1898 } 1899 err = create_con(sess, cm_id, cid); 1900 if (err) { 1901 rtrs_err((&sess->s), "create_con(), error %d\n", err); 1902 rtrs_rdma_do_reject(cm_id, err); 1903 /* 1904 * Since session has other connections we follow normal way 1905 * through workqueue, but still return an error to tell cma.c 1906 * to call rdma_destroy_id() for current connection. 1907 */ 1908 goto close_and_return_err; 1909 } 1910 err = rtrs_rdma_do_accept(sess, cm_id); 1911 if (err) { 1912 rtrs_err((&sess->s), "rtrs_rdma_do_accept(), error %d\n", err); 1913 rtrs_rdma_do_reject(cm_id, err); 1914 /* 1915 * Since current connection was successfully added to the 1916 * session we follow normal way through workqueue to close the 1917 * session, thus return 0 to tell cma.c we call 1918 * rdma_destroy_id() ourselves. 
1919 */ 1920 err = 0; 1921 goto close_and_return_err; 1922 } 1923 mutex_unlock(&srv->paths_mutex); 1924 1925 return 0; 1926 1927 reject_w_err: 1928 return rtrs_rdma_do_reject(cm_id, err); 1929 1930 close_and_return_err: 1931 mutex_unlock(&srv->paths_mutex); 1932 close_sess(sess); 1933 1934 return err; 1935 } 1936 1937 static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id, 1938 struct rdma_cm_event *ev) 1939 { 1940 struct rtrs_srv_sess *sess = NULL; 1941 struct rtrs_sess *s = NULL; 1942 1943 if (ev->event != RDMA_CM_EVENT_CONNECT_REQUEST) { 1944 struct rtrs_con *c = cm_id->context; 1945 1946 s = c->sess; 1947 sess = to_srv_sess(s); 1948 } 1949 1950 switch (ev->event) { 1951 case RDMA_CM_EVENT_CONNECT_REQUEST: 1952 /* 1953 * In case of error cma.c will destroy cm_id, 1954 * see cma_process_remove() 1955 */ 1956 return rtrs_rdma_connect(cm_id, ev->param.conn.private_data, 1957 ev->param.conn.private_data_len); 1958 case RDMA_CM_EVENT_ESTABLISHED: 1959 /* Nothing here */ 1960 break; 1961 case RDMA_CM_EVENT_REJECTED: 1962 case RDMA_CM_EVENT_CONNECT_ERROR: 1963 case RDMA_CM_EVENT_UNREACHABLE: 1964 rtrs_err(s, "CM error (CM event: %s, err: %d)\n", 1965 rdma_event_msg(ev->event), ev->status); 1966 fallthrough; 1967 case RDMA_CM_EVENT_DISCONNECTED: 1968 case RDMA_CM_EVENT_ADDR_CHANGE: 1969 case RDMA_CM_EVENT_TIMEWAIT_EXIT: 1970 case RDMA_CM_EVENT_DEVICE_REMOVAL: 1971 close_sess(sess); 1972 break; 1973 default: 1974 pr_err("Ignoring unexpected CM event %s, err %d\n", 1975 rdma_event_msg(ev->event), ev->status); 1976 break; 1977 } 1978 1979 return 0; 1980 } 1981 1982 static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx, 1983 struct sockaddr *addr, 1984 enum rdma_ucm_port_space ps) 1985 { 1986 struct rdma_cm_id *cm_id; 1987 int ret; 1988 1989 cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler, 1990 ctx, ps, IB_QPT_RC); 1991 if (IS_ERR(cm_id)) { 1992 ret = PTR_ERR(cm_id); 1993 pr_err("Creating id for RDMA connection failed, err: %d\n", 1994 ret); 1995 goto err_out; 1996 } 1997 ret = rdma_bind_addr(cm_id, addr); 1998 if (ret) { 1999 pr_err("Binding RDMA address failed, err: %d\n", ret); 2000 goto err_cm; 2001 } 2002 ret = rdma_listen(cm_id, 64); 2003 if (ret) { 2004 pr_err("Listening on RDMA connection failed, err: %d\n", 2005 ret); 2006 goto err_cm; 2007 } 2008 2009 return cm_id; 2010 2011 err_cm: 2012 rdma_destroy_id(cm_id); 2013 err_out: 2014 2015 return ERR_PTR(ret); 2016 } 2017 2018 static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, u16 port) 2019 { 2020 struct sockaddr_in6 sin = { 2021 .sin6_family = AF_INET6, 2022 .sin6_addr = IN6ADDR_ANY_INIT, 2023 .sin6_port = htons(port), 2024 }; 2025 struct sockaddr_ib sib = { 2026 .sib_family = AF_IB, 2027 .sib_sid = cpu_to_be64(RDMA_IB_IP_PS_IB | port), 2028 .sib_sid_mask = cpu_to_be64(0xffffffffffffffffULL), 2029 .sib_pkey = cpu_to_be16(0xffff), 2030 }; 2031 struct rdma_cm_id *cm_ip, *cm_ib; 2032 int ret; 2033 2034 /* 2035 * We accept both IPoIB and IB connections, so we need to keep 2036 * two cm id's, one for each socket type and port space. 2037 * If the cm initialization of one of the id's fails, we abort 2038 * everything. 
2039 */ 2040 cm_ip = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sin, RDMA_PS_TCP); 2041 if (IS_ERR(cm_ip)) 2042 return PTR_ERR(cm_ip); 2043 2044 cm_ib = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sib, RDMA_PS_IB); 2045 if (IS_ERR(cm_ib)) { 2046 ret = PTR_ERR(cm_ib); 2047 goto free_cm_ip; 2048 } 2049 2050 ctx->cm_id_ip = cm_ip; 2051 ctx->cm_id_ib = cm_ib; 2052 2053 return 0; 2054 2055 free_cm_ip: 2056 rdma_destroy_id(cm_ip); 2057 2058 return ret; 2059 } 2060 2061 static struct rtrs_srv_ctx *alloc_srv_ctx(struct rtrs_srv_ops *ops) 2062 { 2063 struct rtrs_srv_ctx *ctx; 2064 2065 ctx = kzalloc(sizeof(*ctx), GFP_KERNEL); 2066 if (!ctx) 2067 return NULL; 2068 2069 ctx->ops = *ops; 2070 mutex_init(&ctx->srv_mutex); 2071 INIT_LIST_HEAD(&ctx->srv_list); 2072 2073 return ctx; 2074 } 2075 2076 static void free_srv_ctx(struct rtrs_srv_ctx *ctx) 2077 { 2078 WARN_ON(!list_empty(&ctx->srv_list)); 2079 mutex_destroy(&ctx->srv_mutex); 2080 kfree(ctx); 2081 } 2082 2083 static int rtrs_srv_add_one(struct ib_device *device) 2084 { 2085 struct rtrs_srv_ctx *ctx; 2086 int ret = 0; 2087 2088 mutex_lock(&ib_ctx.ib_dev_mutex); 2089 if (ib_ctx.ib_dev_count) 2090 goto out; 2091 2092 /* 2093 * Since our CM IDs are NOT bound to any ib device we will create them 2094 * only once 2095 */ 2096 ctx = ib_ctx.srv_ctx; 2097 ret = rtrs_srv_rdma_init(ctx, ib_ctx.port); 2098 if (ret) { 2099 /* 2100 * We errored out here. 2101 * According to the ib code, if we encounter an error here then the 2102 * error code is ignored, and no more calls to our ops are made. 2103 */ 2104 pr_err("Failed to initialize RDMA connection"); 2105 goto err_out; 2106 } 2107 2108 out: 2109 /* 2110 * Keep a track on the number of ib devices added 2111 */ 2112 ib_ctx.ib_dev_count++; 2113 2114 err_out: 2115 mutex_unlock(&ib_ctx.ib_dev_mutex); 2116 return ret; 2117 } 2118 2119 static void rtrs_srv_remove_one(struct ib_device *device, void *client_data) 2120 { 2121 struct rtrs_srv_ctx *ctx; 2122 2123 mutex_lock(&ib_ctx.ib_dev_mutex); 2124 ib_ctx.ib_dev_count--; 2125 2126 if (ib_ctx.ib_dev_count) 2127 goto out; 2128 2129 /* 2130 * Since our CM IDs are NOT bound to any ib device we will remove them 2131 * only once, when the last device is removed 2132 */ 2133 ctx = ib_ctx.srv_ctx; 2134 rdma_destroy_id(ctx->cm_id_ip); 2135 rdma_destroy_id(ctx->cm_id_ib); 2136 2137 out: 2138 mutex_unlock(&ib_ctx.ib_dev_mutex); 2139 } 2140 2141 static struct ib_client rtrs_srv_client = { 2142 .name = "rtrs_server", 2143 .add = rtrs_srv_add_one, 2144 .remove = rtrs_srv_remove_one 2145 }; 2146 2147 /** 2148 * rtrs_srv_open() - open RTRS server context 2149 * @ops: callback functions 2150 * @port: port to listen on 2151 * 2152 * Creates server context with specified callbacks. 2153 * 2154 * Return a valid pointer on success otherwise PTR_ERR. 
2155 */ 2156 struct rtrs_srv_ctx *rtrs_srv_open(struct rtrs_srv_ops *ops, u16 port) 2157 { 2158 struct rtrs_srv_ctx *ctx; 2159 int err; 2160 2161 ctx = alloc_srv_ctx(ops); 2162 if (!ctx) 2163 return ERR_PTR(-ENOMEM); 2164 2165 mutex_init(&ib_ctx.ib_dev_mutex); 2166 ib_ctx.srv_ctx = ctx; 2167 ib_ctx.port = port; 2168 2169 err = ib_register_client(&rtrs_srv_client); 2170 if (err) { 2171 free_srv_ctx(ctx); 2172 return ERR_PTR(err); 2173 } 2174 2175 return ctx; 2176 } 2177 EXPORT_SYMBOL(rtrs_srv_open); 2178 2179 static void close_sessions(struct rtrs_srv *srv) 2180 { 2181 struct rtrs_srv_sess *sess; 2182 2183 mutex_lock(&srv->paths_mutex); 2184 list_for_each_entry(sess, &srv->paths_list, s.entry) 2185 close_sess(sess); 2186 mutex_unlock(&srv->paths_mutex); 2187 } 2188 2189 static void close_ctx(struct rtrs_srv_ctx *ctx) 2190 { 2191 struct rtrs_srv *srv; 2192 2193 mutex_lock(&ctx->srv_mutex); 2194 list_for_each_entry(srv, &ctx->srv_list, ctx_list) 2195 close_sessions(srv); 2196 mutex_unlock(&ctx->srv_mutex); 2197 flush_workqueue(rtrs_wq); 2198 } 2199 2200 /** 2201 * rtrs_srv_close() - close RTRS server context 2202 * @ctx: pointer to server context 2203 * 2204 * Closes RTRS server context with all client sessions. 2205 */ 2206 void rtrs_srv_close(struct rtrs_srv_ctx *ctx) 2207 { 2208 ib_unregister_client(&rtrs_srv_client); 2209 mutex_destroy(&ib_ctx.ib_dev_mutex); 2210 close_ctx(ctx); 2211 free_srv_ctx(ctx); 2212 } 2213 EXPORT_SYMBOL(rtrs_srv_close); 2214 2215 static int check_module_params(void) 2216 { 2217 if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) { 2218 pr_err("Invalid sess_queue_depth value %d, has to be >= %d, <= %d.\n", 2219 sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH); 2220 return -EINVAL; 2221 } 2222 if (max_chunk_size < MIN_CHUNK_SIZE || !is_power_of_2(max_chunk_size)) { 2223 pr_err("Invalid max_chunk_size value %d, has to be >= %d and should be power of two.\n", 2224 max_chunk_size, MIN_CHUNK_SIZE); 2225 return -EINVAL; 2226 } 2227 2228 /* 2229 * Check if IB immediate data size is enough to hold the mem_id and the 2230 * offset inside the memory chunk 2231 */ 2232 if ((ilog2(sess_queue_depth - 1) + 1) + 2233 (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) { 2234 pr_err("RDMA immediate size (%db) not enough to encode %d buffers of size %dB. 
Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n", 2235 MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size); 2236 return -EINVAL; 2237 } 2238 2239 return 0; 2240 } 2241 2242 static int __init rtrs_server_init(void) 2243 { 2244 int err; 2245 2246 pr_info("Loading module %s, proto %s: (max_chunk_size: %d (pure IO %ld, headers %ld) , sess_queue_depth: %d, always_invalidate: %d)\n", 2247 KBUILD_MODNAME, RTRS_PROTO_VER_STRING, 2248 max_chunk_size, max_chunk_size - MAX_HDR_SIZE, MAX_HDR_SIZE, 2249 sess_queue_depth, always_invalidate); 2250 2251 rtrs_rdma_dev_pd_init(0, &dev_pd); 2252 2253 err = check_module_params(); 2254 if (err) { 2255 pr_err("Failed to load module, invalid module parameters, err: %d\n", 2256 err); 2257 return err; 2258 } 2259 chunk_pool = mempool_create_page_pool(sess_queue_depth * CHUNK_POOL_SZ, 2260 get_order(max_chunk_size)); 2261 if (!chunk_pool) 2262 return -ENOMEM; 2263 rtrs_dev_class = class_create(THIS_MODULE, "rtrs-server"); 2264 if (IS_ERR(rtrs_dev_class)) { 2265 err = PTR_ERR(rtrs_dev_class); 2266 goto out_chunk_pool; 2267 } 2268 rtrs_wq = alloc_workqueue("rtrs_server_wq", 0, 0); 2269 if (!rtrs_wq) { 2270 err = -ENOMEM; 2271 goto out_dev_class; 2272 } 2273 2274 return 0; 2275 2276 out_dev_class: 2277 class_destroy(rtrs_dev_class); 2278 out_chunk_pool: 2279 mempool_destroy(chunk_pool); 2280 2281 return err; 2282 } 2283 2284 static void __exit rtrs_server_exit(void) 2285 { 2286 destroy_workqueue(rtrs_wq); 2287 class_destroy(rtrs_dev_class); 2288 mempool_destroy(chunk_pool); 2289 rtrs_rdma_dev_pd_deinit(&dev_pd); 2290 } 2291 2292 module_init(rtrs_server_init); 2293 module_exit(rtrs_server_exit); 2294