// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * RDMA Transport Layer
 *
 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
 */

#undef pr_fmt
#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt

#include <linux/module.h>
#include <linux/mempool.h>

#include "rtrs-srv.h"
#include "rtrs-log.h"
#include <rdma/ib_cm.h>
#include <rdma/ib_verbs.h>

MODULE_DESCRIPTION("RDMA Transport Server");
MODULE_LICENSE("GPL");

/* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
#define DEFAULT_SESS_QUEUE_DEPTH 512
#define MAX_HDR_SIZE PAGE_SIZE

/* We guarantee to serve at least 10 paths */
#define CHUNK_POOL_SZ 10

static struct rtrs_rdma_dev_pd dev_pd;
static mempool_t *chunk_pool;
struct class *rtrs_dev_class;
static struct rtrs_srv_ib_ctx ib_ctx;

static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;

static bool always_invalidate = true;
module_param(always_invalidate, bool, 0444);
MODULE_PARM_DESC(always_invalidate,
		 "Invalidate memory registration for contiguous memory regions before accessing.");

module_param_named(max_chunk_size, max_chunk_size, int, 0444);
MODULE_PARM_DESC(max_chunk_size,
		 "Max size for each IO request, in bytes (default: "
		 __stringify(DEFAULT_MAX_CHUNK_SIZE) " bytes)");

module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
MODULE_PARM_DESC(sess_queue_depth,
		 "Number of buffers for pending I/O requests to allocate per session. Maximum: "
		 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
		 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");

static cpumask_t cq_affinity_mask = { CPU_BITS_ALL };

static struct workqueue_struct *rtrs_wq;

static inline struct rtrs_srv_con *to_srv_con(struct rtrs_con *c)
{
	return container_of(c, struct rtrs_srv_con, c);
}

static inline struct rtrs_srv_sess *to_srv_sess(struct rtrs_sess *s)
{
	return container_of(s, struct rtrs_srv_sess, s);
}

static bool rtrs_srv_change_state(struct rtrs_srv_sess *sess,
				  enum rtrs_srv_state new_state)
{
	enum rtrs_srv_state old_state;
	bool changed = false;

	spin_lock_irq(&sess->state_lock);
	old_state = sess->state;
	switch (new_state) {
	case RTRS_SRV_CONNECTED:
		if (old_state == RTRS_SRV_CONNECTING)
			changed = true;
		break;
	case RTRS_SRV_CLOSING:
		if (old_state == RTRS_SRV_CONNECTING ||
		    old_state == RTRS_SRV_CONNECTED)
			changed = true;
		break;
	case RTRS_SRV_CLOSED:
		if (old_state == RTRS_SRV_CLOSING)
			changed = true;
		break;
	default:
		break;
	}
	if (changed)
		sess->state = new_state;
	spin_unlock_irq(&sess->state_lock);

	return changed;
}

static void free_id(struct rtrs_srv_op *id)
{
	if (!id)
		return;
	kfree(id);
}

static void rtrs_srv_free_ops_ids(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	int i;

	if (sess->ops_ids) {
		for (i = 0; i < srv->queue_depth; i++)
			free_id(sess->ops_ids[i]);
		kfree(sess->ops_ids);
		sess->ops_ids = NULL;
	}
}

static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc);

static struct ib_cqe io_comp_cqe = {
	.done = rtrs_srv_rdma_done
};

static inline void rtrs_srv_inflight_ref_release(struct percpu_ref *ref)
{
	struct rtrs_srv_sess *sess = container_of(ref, struct rtrs_srv_sess,
						  ids_inflight_ref);

	percpu_ref_exit(&sess->ids_inflight_ref);
	complete(&sess->complete_done);
}

static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_srv_op *id;
	int i, ret;

	sess->ops_ids = kcalloc(srv->queue_depth, sizeof(*sess->ops_ids),
				GFP_KERNEL);
	if (!sess->ops_ids)
		goto err;

	for (i = 0; i < srv->queue_depth; ++i) {
		id = kzalloc(sizeof(*id), GFP_KERNEL);
		if (!id)
			goto err;

		sess->ops_ids[i] = id;
	}

	ret = percpu_ref_init(&sess->ids_inflight_ref,
			      rtrs_srv_inflight_ref_release, 0, GFP_KERNEL);
	if (ret) {
		pr_err("Percpu reference init failed\n");
		goto err;
	}
	init_completion(&sess->complete_done);

	return 0;

err:
	rtrs_srv_free_ops_ids(sess);
	return -ENOMEM;
}

static inline void rtrs_srv_get_ops_ids(struct rtrs_srv_sess *sess)
{
	percpu_ref_get(&sess->ids_inflight_ref);
}

static inline void rtrs_srv_put_ops_ids(struct rtrs_srv_sess *sess)
{
	percpu_ref_put(&sess->ids_inflight_ref);
}

static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_srv_sess *sess = to_srv_sess(s);

	if (wc->status != IB_WC_SUCCESS) {
		rtrs_err(s, "REG MR failed: %s\n",
			 ib_wc_status_msg(wc->status));
		close_sess(sess);
		return;
	}
}

static struct ib_cqe local_reg_cqe = {
	.done = rtrs_srv_reg_mr_done
};

static int rdma_write_sg(struct rtrs_srv_op *id) 199 { 200 struct rtrs_sess *s = id->con->c.sess; 201 struct rtrs_srv_sess *sess = to_srv_sess(s); 202 dma_addr_t dma_addr = sess->dma_addr[id->msg_id]; 203 struct rtrs_srv_mr *srv_mr; 204 struct ib_send_wr inv_wr; 205 struct ib_rdma_wr imm_wr; 206 struct ib_rdma_wr *wr = NULL; 207 enum ib_send_flags flags; 208 size_t sg_cnt; 209 int err, offset; 210 bool need_inval; 211 u32 rkey = 0; 212 struct ib_reg_wr rwr; 213 struct ib_sge *plist; 214 struct ib_sge list; 215 216 sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt); 217 need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F; 218 if (sg_cnt != 1) 219 return -EINVAL; 220 221 offset = 0; 222 223 wr = &id->tx_wr; 224 plist = &id->tx_sg; 225 plist->addr = dma_addr + offset; 226 plist->length = le32_to_cpu(id->rd_msg->desc[0].len); 227 228 /* WR will fail with length error 229 * if this is 0 230 */ 231 if (plist->length == 0) { 232 rtrs_err(s, "Invalid RDMA-Write sg list length 0\n"); 233 return -EINVAL; 234 } 235 236 plist->lkey = sess->s.dev->ib_pd->local_dma_lkey; 237 offset += plist->length; 238 239 wr->wr.sg_list = plist; 240 wr->wr.num_sge = 1; 241 wr->remote_addr = le64_to_cpu(id->rd_msg->desc[0].addr); 242 wr->rkey = le32_to_cpu(id->rd_msg->desc[0].key); 243 if (rkey == 0) 244 rkey = wr->rkey; 245 else 246 /* Only one key is actually used */ 247 WARN_ON_ONCE(rkey != wr->rkey); 248 249 wr->wr.opcode = IB_WR_RDMA_WRITE; 250 wr->wr.wr_cqe = &io_comp_cqe; 251 wr->wr.ex.imm_data = 0; 252 wr->wr.send_flags = 0; 253 254 if (need_inval && always_invalidate) { 255 wr->wr.next = &rwr.wr; 256 rwr.wr.next = &inv_wr; 257 inv_wr.next = &imm_wr.wr; 258 } else if (always_invalidate) { 259 wr->wr.next = &rwr.wr; 260 rwr.wr.next = &imm_wr.wr; 261 } else if (need_inval) { 262 wr->wr.next = &inv_wr; 263 inv_wr.next = &imm_wr.wr; 264 } else { 265 wr->wr.next = &imm_wr.wr; 266 } 267 /* 268 * From time to time we have to post signaled sends, 269 * or send queue will fill up and only QP reset can help. 270 */ 271 flags = (atomic_inc_return(&id->con->c.wr_cnt) % s->signal_interval) ? 
272 0 : IB_SEND_SIGNALED; 273 274 if (need_inval) { 275 inv_wr.sg_list = NULL; 276 inv_wr.num_sge = 0; 277 inv_wr.opcode = IB_WR_SEND_WITH_INV; 278 inv_wr.wr_cqe = &io_comp_cqe; 279 inv_wr.send_flags = 0; 280 inv_wr.ex.invalidate_rkey = rkey; 281 } 282 283 imm_wr.wr.next = NULL; 284 if (always_invalidate) { 285 struct rtrs_msg_rkey_rsp *msg; 286 287 srv_mr = &sess->mrs[id->msg_id]; 288 rwr.wr.opcode = IB_WR_REG_MR; 289 rwr.wr.wr_cqe = &local_reg_cqe; 290 rwr.wr.num_sge = 0; 291 rwr.mr = srv_mr->mr; 292 rwr.wr.send_flags = 0; 293 rwr.key = srv_mr->mr->rkey; 294 rwr.access = (IB_ACCESS_LOCAL_WRITE | 295 IB_ACCESS_REMOTE_WRITE); 296 msg = srv_mr->iu->buf; 297 msg->buf_id = cpu_to_le16(id->msg_id); 298 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP); 299 msg->rkey = cpu_to_le32(srv_mr->mr->rkey); 300 301 list.addr = srv_mr->iu->dma_addr; 302 list.length = sizeof(*msg); 303 list.lkey = sess->s.dev->ib_pd->local_dma_lkey; 304 imm_wr.wr.sg_list = &list; 305 imm_wr.wr.num_sge = 1; 306 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM; 307 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, 308 srv_mr->iu->dma_addr, 309 srv_mr->iu->size, DMA_TO_DEVICE); 310 } else { 311 imm_wr.wr.sg_list = NULL; 312 imm_wr.wr.num_sge = 0; 313 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM; 314 } 315 imm_wr.wr.send_flags = flags; 316 imm_wr.wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id, 317 0, need_inval)); 318 319 imm_wr.wr.wr_cqe = &io_comp_cqe; 320 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, dma_addr, 321 offset, DMA_BIDIRECTIONAL); 322 323 err = ib_post_send(id->con->c.qp, &id->tx_wr.wr, NULL); 324 if (err) 325 rtrs_err(s, 326 "Posting RDMA-Write-Request to QP failed, err: %d\n", 327 err); 328 329 return err; 330 } 331 332 /** 333 * send_io_resp_imm() - respond to client with empty IMM on failed READ/WRITE 334 * requests or on successful WRITE request. 335 * @con: the connection to send back result 336 * @id: the id associated with the IO 337 * @errno: the error number of the IO. 338 * 339 * Return 0 on success, errno otherwise. 340 */ 341 static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id, 342 int errno) 343 { 344 struct rtrs_sess *s = con->c.sess; 345 struct rtrs_srv_sess *sess = to_srv_sess(s); 346 struct ib_send_wr inv_wr, *wr = NULL; 347 struct ib_rdma_wr imm_wr; 348 struct ib_reg_wr rwr; 349 struct rtrs_srv_mr *srv_mr; 350 bool need_inval = false; 351 enum ib_send_flags flags; 352 u32 imm; 353 int err; 354 355 if (id->dir == READ) { 356 struct rtrs_msg_rdma_read *rd_msg = id->rd_msg; 357 size_t sg_cnt; 358 359 need_inval = le16_to_cpu(rd_msg->flags) & 360 RTRS_MSG_NEED_INVAL_F; 361 sg_cnt = le16_to_cpu(rd_msg->sg_cnt); 362 363 if (need_inval) { 364 if (sg_cnt) { 365 inv_wr.wr_cqe = &io_comp_cqe; 366 inv_wr.sg_list = NULL; 367 inv_wr.num_sge = 0; 368 inv_wr.opcode = IB_WR_SEND_WITH_INV; 369 inv_wr.send_flags = 0; 370 /* Only one key is actually used */ 371 inv_wr.ex.invalidate_rkey = 372 le32_to_cpu(rd_msg->desc[0].key); 373 } else { 374 WARN_ON_ONCE(1); 375 need_inval = false; 376 } 377 } 378 } 379 380 if (need_inval && always_invalidate) { 381 wr = &inv_wr; 382 inv_wr.next = &rwr.wr; 383 rwr.wr.next = &imm_wr.wr; 384 } else if (always_invalidate) { 385 wr = &rwr.wr; 386 rwr.wr.next = &imm_wr.wr; 387 } else if (need_inval) { 388 wr = &inv_wr; 389 inv_wr.next = &imm_wr.wr; 390 } else { 391 wr = &imm_wr.wr; 392 } 393 /* 394 * From time to time we have to post signalled sends, 395 * or send queue will fill up and only QP reset can help. 
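	 * A signalled completion is also what replenishes sq_wr_avail in
	 * rtrs_srv_rdma_done().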
396 */ 397 flags = (atomic_inc_return(&con->c.wr_cnt) % s->signal_interval) ? 398 0 : IB_SEND_SIGNALED; 399 imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval); 400 imm_wr.wr.next = NULL; 401 if (always_invalidate) { 402 struct ib_sge list; 403 struct rtrs_msg_rkey_rsp *msg; 404 405 srv_mr = &sess->mrs[id->msg_id]; 406 rwr.wr.next = &imm_wr.wr; 407 rwr.wr.opcode = IB_WR_REG_MR; 408 rwr.wr.wr_cqe = &local_reg_cqe; 409 rwr.wr.num_sge = 0; 410 rwr.wr.send_flags = 0; 411 rwr.mr = srv_mr->mr; 412 rwr.key = srv_mr->mr->rkey; 413 rwr.access = (IB_ACCESS_LOCAL_WRITE | 414 IB_ACCESS_REMOTE_WRITE); 415 msg = srv_mr->iu->buf; 416 msg->buf_id = cpu_to_le16(id->msg_id); 417 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP); 418 msg->rkey = cpu_to_le32(srv_mr->mr->rkey); 419 420 list.addr = srv_mr->iu->dma_addr; 421 list.length = sizeof(*msg); 422 list.lkey = sess->s.dev->ib_pd->local_dma_lkey; 423 imm_wr.wr.sg_list = &list; 424 imm_wr.wr.num_sge = 1; 425 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM; 426 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, 427 srv_mr->iu->dma_addr, 428 srv_mr->iu->size, DMA_TO_DEVICE); 429 } else { 430 imm_wr.wr.sg_list = NULL; 431 imm_wr.wr.num_sge = 0; 432 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM; 433 } 434 imm_wr.wr.send_flags = flags; 435 imm_wr.wr.wr_cqe = &io_comp_cqe; 436 437 imm_wr.wr.ex.imm_data = cpu_to_be32(imm); 438 439 err = ib_post_send(id->con->c.qp, wr, NULL); 440 if (err) 441 rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %d\n", 442 err); 443 444 return err; 445 } 446 447 void close_sess(struct rtrs_srv_sess *sess) 448 { 449 if (rtrs_srv_change_state(sess, RTRS_SRV_CLOSING)) 450 queue_work(rtrs_wq, &sess->close_work); 451 WARN_ON(sess->state != RTRS_SRV_CLOSING); 452 } 453 454 static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state) 455 { 456 switch (state) { 457 case RTRS_SRV_CONNECTING: 458 return "RTRS_SRV_CONNECTING"; 459 case RTRS_SRV_CONNECTED: 460 return "RTRS_SRV_CONNECTED"; 461 case RTRS_SRV_CLOSING: 462 return "RTRS_SRV_CLOSING"; 463 case RTRS_SRV_CLOSED: 464 return "RTRS_SRV_CLOSED"; 465 default: 466 return "UNKNOWN"; 467 } 468 } 469 470 /** 471 * rtrs_srv_resp_rdma() - Finish an RDMA request 472 * 473 * @id: Internal RTRS operation identifier 474 * @status: Response Code sent to the other side for this operation. 475 * 0 = success, <=0 error 476 * Context: any 477 * 478 * Finish a RDMA operation. A message is sent to the client and the 479 * corresponding memory areas will be released. 
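 *
 * Return: true if the response was handled, false if the send queue was full
 * and the response has been queued on the connection's rsp_wr_wait_list to be
 * retried later.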
 */
bool rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status)
{
	struct rtrs_srv_sess *sess;
	struct rtrs_srv_con *con;
	struct rtrs_sess *s;
	int err;

	if (WARN_ON(!id))
		return true;

	con = id->con;
	s = con->c.sess;
	sess = to_srv_sess(s);

	id->status = status;

	if (sess->state != RTRS_SRV_CONNECTED) {
		rtrs_err_rl(s,
			    "Sending I/O response failed, session %s is disconnected, sess state %s\n",
			    kobject_name(&sess->kobj),
			    rtrs_srv_state_str(sess->state));
		goto out;
	}
	if (always_invalidate) {
		struct rtrs_srv_mr *mr = &sess->mrs[id->msg_id];

		ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey));
	}
	if (atomic_sub_return(1, &con->c.sq_wr_avail) < 0) {
		rtrs_err(s, "IB send queue full: sess=%s cid=%d\n",
			 kobject_name(&sess->kobj),
			 con->c.cid);
		atomic_add(1, &con->c.sq_wr_avail);
		spin_lock(&con->rsp_wr_wait_lock);
		list_add_tail(&id->wait_list, &con->rsp_wr_wait_list);
		spin_unlock(&con->rsp_wr_wait_lock);
		return false;
	}

	if (status || id->dir == WRITE || !id->rd_msg->sg_cnt)
		err = send_io_resp_imm(con, id, status);
	else
		err = rdma_write_sg(id);

	if (err) {
		rtrs_err_rl(s, "IO response failed: %d: sess=%s\n", err,
			    kobject_name(&sess->kobj));
		close_sess(sess);
	}
out:
	rtrs_srv_put_ops_ids(sess);
	return true;
}
EXPORT_SYMBOL(rtrs_srv_resp_rdma);

/**
 * rtrs_srv_set_sess_priv() - Set private pointer in rtrs_srv.
 * @srv: Session pointer
 * @priv: The private pointer that is associated with the session.
 */
void rtrs_srv_set_sess_priv(struct rtrs_srv *srv, void *priv)
{
	srv->priv = priv;
}
EXPORT_SYMBOL(rtrs_srv_set_sess_priv);

static void unmap_cont_bufs(struct rtrs_srv_sess *sess)
{
	int i;

	for (i = 0; i < sess->mrs_num; i++) {
		struct rtrs_srv_mr *srv_mr;

		srv_mr = &sess->mrs[i];
		rtrs_iu_free(srv_mr->iu, sess->s.dev->ib_dev, 1);
		ib_dereg_mr(srv_mr->mr);
		ib_dma_unmap_sg(sess->s.dev->ib_dev, srv_mr->sgt.sgl,
				srv_mr->sgt.nents, DMA_BIDIRECTIONAL);
		sg_free_table(&srv_mr->sgt);
	}
	kfree(sess->mrs);
}

static int map_cont_bufs(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;
	struct rtrs_sess *ss = &sess->s;
	int i, mri, err, mrs_num;
	unsigned int chunk_bits;
	int chunks_per_mr = 1;

	/*
	 * Here we map queue_depth chunks to MRs. First we have to figure
	 * out how many chunks we can map per MR.
	 */
	if (always_invalidate) {
		/*
		 * In order to invalidate each chunk of memory we need
		 * one memory region per chunk.
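		 * Without always_invalidate a single MR can cover up to
		 * max_fast_reg_page_list_len chunks, see the else branch
		 * below.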
580 */ 581 mrs_num = srv->queue_depth; 582 } else { 583 chunks_per_mr = 584 sess->s.dev->ib_dev->attrs.max_fast_reg_page_list_len; 585 mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr); 586 chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num); 587 } 588 589 sess->mrs = kcalloc(mrs_num, sizeof(*sess->mrs), GFP_KERNEL); 590 if (!sess->mrs) 591 return -ENOMEM; 592 593 sess->mrs_num = mrs_num; 594 595 for (mri = 0; mri < mrs_num; mri++) { 596 struct rtrs_srv_mr *srv_mr = &sess->mrs[mri]; 597 struct sg_table *sgt = &srv_mr->sgt; 598 struct scatterlist *s; 599 struct ib_mr *mr; 600 int nr, chunks; 601 602 chunks = chunks_per_mr * mri; 603 if (!always_invalidate) 604 chunks_per_mr = min_t(int, chunks_per_mr, 605 srv->queue_depth - chunks); 606 607 err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL); 608 if (err) 609 goto err; 610 611 for_each_sg(sgt->sgl, s, chunks_per_mr, i) 612 sg_set_page(s, srv->chunks[chunks + i], 613 max_chunk_size, 0); 614 615 nr = ib_dma_map_sg(sess->s.dev->ib_dev, sgt->sgl, 616 sgt->nents, DMA_BIDIRECTIONAL); 617 if (nr < sgt->nents) { 618 err = nr < 0 ? nr : -EINVAL; 619 goto free_sg; 620 } 621 mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG, 622 sgt->nents); 623 if (IS_ERR(mr)) { 624 err = PTR_ERR(mr); 625 goto unmap_sg; 626 } 627 nr = ib_map_mr_sg(mr, sgt->sgl, sgt->nents, 628 NULL, max_chunk_size); 629 if (nr < 0 || nr < sgt->nents) { 630 err = nr < 0 ? nr : -EINVAL; 631 goto dereg_mr; 632 } 633 634 if (always_invalidate) { 635 srv_mr->iu = rtrs_iu_alloc(1, 636 sizeof(struct rtrs_msg_rkey_rsp), 637 GFP_KERNEL, sess->s.dev->ib_dev, 638 DMA_TO_DEVICE, rtrs_srv_rdma_done); 639 if (!srv_mr->iu) { 640 err = -ENOMEM; 641 rtrs_err(ss, "rtrs_iu_alloc(), err: %d\n", err); 642 goto dereg_mr; 643 } 644 } 645 /* Eventually dma addr for each chunk can be cached */ 646 for_each_sg(sgt->sgl, s, sgt->orig_nents, i) 647 sess->dma_addr[chunks + i] = sg_dma_address(s); 648 649 ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey)); 650 srv_mr->mr = mr; 651 652 continue; 653 err: 654 while (mri--) { 655 srv_mr = &sess->mrs[mri]; 656 sgt = &srv_mr->sgt; 657 mr = srv_mr->mr; 658 rtrs_iu_free(srv_mr->iu, sess->s.dev->ib_dev, 1); 659 dereg_mr: 660 ib_dereg_mr(mr); 661 unmap_sg: 662 ib_dma_unmap_sg(sess->s.dev->ib_dev, sgt->sgl, 663 sgt->nents, DMA_BIDIRECTIONAL); 664 free_sg: 665 sg_free_table(sgt); 666 } 667 kfree(sess->mrs); 668 669 return err; 670 } 671 672 chunk_bits = ilog2(srv->queue_depth - 1) + 1; 673 sess->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits); 674 675 return 0; 676 } 677 678 static void rtrs_srv_hb_err_handler(struct rtrs_con *c) 679 { 680 close_sess(to_srv_sess(c->sess)); 681 } 682 683 static void rtrs_srv_init_hb(struct rtrs_srv_sess *sess) 684 { 685 rtrs_init_hb(&sess->s, &io_comp_cqe, 686 RTRS_HB_INTERVAL_MS, 687 RTRS_HB_MISSED_MAX, 688 rtrs_srv_hb_err_handler, 689 rtrs_wq); 690 } 691 692 static void rtrs_srv_start_hb(struct rtrs_srv_sess *sess) 693 { 694 rtrs_start_hb(&sess->s); 695 } 696 697 static void rtrs_srv_stop_hb(struct rtrs_srv_sess *sess) 698 { 699 rtrs_stop_hb(&sess->s); 700 } 701 702 static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc) 703 { 704 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 705 struct rtrs_sess *s = con->c.sess; 706 struct rtrs_srv_sess *sess = to_srv_sess(s); 707 struct rtrs_iu *iu; 708 709 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 710 rtrs_iu_free(iu, sess->s.dev->ib_dev, 1); 711 712 if (wc->status != IB_WC_SUCCESS) { 713 rtrs_err(s, "Sess info response send failed: %s\n", 714 
ib_wc_status_msg(wc->status)); 715 close_sess(sess); 716 return; 717 } 718 WARN_ON(wc->opcode != IB_WC_SEND); 719 } 720 721 static void rtrs_srv_sess_up(struct rtrs_srv_sess *sess) 722 { 723 struct rtrs_srv *srv = sess->srv; 724 struct rtrs_srv_ctx *ctx = srv->ctx; 725 int up; 726 727 mutex_lock(&srv->paths_ev_mutex); 728 up = ++srv->paths_up; 729 if (up == 1) 730 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL); 731 mutex_unlock(&srv->paths_ev_mutex); 732 733 /* Mark session as established */ 734 sess->established = true; 735 } 736 737 static void rtrs_srv_sess_down(struct rtrs_srv_sess *sess) 738 { 739 struct rtrs_srv *srv = sess->srv; 740 struct rtrs_srv_ctx *ctx = srv->ctx; 741 742 if (!sess->established) 743 return; 744 745 sess->established = false; 746 mutex_lock(&srv->paths_ev_mutex); 747 WARN_ON(!srv->paths_up); 748 if (--srv->paths_up == 0) 749 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv); 750 mutex_unlock(&srv->paths_ev_mutex); 751 } 752 753 static bool exist_sessname(struct rtrs_srv_ctx *ctx, 754 const char *sessname, const uuid_t *path_uuid) 755 { 756 struct rtrs_srv *srv; 757 struct rtrs_srv_sess *sess; 758 bool found = false; 759 760 mutex_lock(&ctx->srv_mutex); 761 list_for_each_entry(srv, &ctx->srv_list, ctx_list) { 762 mutex_lock(&srv->paths_mutex); 763 764 /* when a client with same uuid and same sessname tried to add a path */ 765 if (uuid_equal(&srv->paths_uuid, path_uuid)) { 766 mutex_unlock(&srv->paths_mutex); 767 continue; 768 } 769 770 list_for_each_entry(sess, &srv->paths_list, s.entry) { 771 if (strlen(sess->s.sessname) == strlen(sessname) && 772 !strcmp(sess->s.sessname, sessname)) { 773 found = true; 774 break; 775 } 776 } 777 mutex_unlock(&srv->paths_mutex); 778 if (found) 779 break; 780 } 781 mutex_unlock(&ctx->srv_mutex); 782 return found; 783 } 784 785 static int post_recv_sess(struct rtrs_srv_sess *sess); 786 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno); 787 788 static int process_info_req(struct rtrs_srv_con *con, 789 struct rtrs_msg_info_req *msg) 790 { 791 struct rtrs_sess *s = con->c.sess; 792 struct rtrs_srv_sess *sess = to_srv_sess(s); 793 struct ib_send_wr *reg_wr = NULL; 794 struct rtrs_msg_info_rsp *rsp; 795 struct rtrs_iu *tx_iu; 796 struct ib_reg_wr *rwr; 797 int mri, err; 798 size_t tx_sz; 799 800 err = post_recv_sess(sess); 801 if (err) { 802 rtrs_err(s, "post_recv_sess(), err: %d\n", err); 803 return err; 804 } 805 806 if (exist_sessname(sess->srv->ctx, 807 msg->sessname, &sess->srv->paths_uuid)) { 808 rtrs_err(s, "sessname is duplicated: %s\n", msg->sessname); 809 return -EPERM; 810 } 811 strscpy(sess->s.sessname, msg->sessname, sizeof(sess->s.sessname)); 812 813 rwr = kcalloc(sess->mrs_num, sizeof(*rwr), GFP_KERNEL); 814 if (!rwr) 815 return -ENOMEM; 816 817 tx_sz = sizeof(*rsp); 818 tx_sz += sizeof(rsp->desc[0]) * sess->mrs_num; 819 tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, sess->s.dev->ib_dev, 820 DMA_TO_DEVICE, rtrs_srv_info_rsp_done); 821 if (!tx_iu) { 822 err = -ENOMEM; 823 goto rwr_free; 824 } 825 826 rsp = tx_iu->buf; 827 rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP); 828 rsp->sg_cnt = cpu_to_le16(sess->mrs_num); 829 830 for (mri = 0; mri < sess->mrs_num; mri++) { 831 struct ib_mr *mr = sess->mrs[mri].mr; 832 833 rsp->desc[mri].addr = cpu_to_le64(mr->iova); 834 rsp->desc[mri].key = cpu_to_le32(mr->rkey); 835 rsp->desc[mri].len = cpu_to_le32(mr->length); 836 837 /* 838 * Fill in reg MR request and chain them *backwards* 839 */ 840 rwr[mri].wr.next = mri ? 
&rwr[mri - 1].wr : NULL; 841 rwr[mri].wr.opcode = IB_WR_REG_MR; 842 rwr[mri].wr.wr_cqe = &local_reg_cqe; 843 rwr[mri].wr.num_sge = 0; 844 rwr[mri].wr.send_flags = 0; 845 rwr[mri].mr = mr; 846 rwr[mri].key = mr->rkey; 847 rwr[mri].access = (IB_ACCESS_LOCAL_WRITE | 848 IB_ACCESS_REMOTE_WRITE); 849 reg_wr = &rwr[mri].wr; 850 } 851 852 err = rtrs_srv_create_sess_files(sess); 853 if (err) 854 goto iu_free; 855 kobject_get(&sess->kobj); 856 get_device(&sess->srv->dev); 857 rtrs_srv_change_state(sess, RTRS_SRV_CONNECTED); 858 rtrs_srv_start_hb(sess); 859 860 /* 861 * We do not account number of established connections at the current 862 * moment, we rely on the client, which should send info request when 863 * all connections are successfully established. Thus, simply notify 864 * listener with a proper event if we are the first path. 865 */ 866 rtrs_srv_sess_up(sess); 867 868 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr, 869 tx_iu->size, DMA_TO_DEVICE); 870 871 /* Send info response */ 872 err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr); 873 if (err) { 874 rtrs_err(s, "rtrs_iu_post_send(), err: %d\n", err); 875 iu_free: 876 rtrs_iu_free(tx_iu, sess->s.dev->ib_dev, 1); 877 } 878 rwr_free: 879 kfree(rwr); 880 881 return err; 882 } 883 884 static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc) 885 { 886 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 887 struct rtrs_sess *s = con->c.sess; 888 struct rtrs_srv_sess *sess = to_srv_sess(s); 889 struct rtrs_msg_info_req *msg; 890 struct rtrs_iu *iu; 891 int err; 892 893 WARN_ON(con->c.cid); 894 895 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 896 if (wc->status != IB_WC_SUCCESS) { 897 rtrs_err(s, "Sess info request receive failed: %s\n", 898 ib_wc_status_msg(wc->status)); 899 goto close; 900 } 901 WARN_ON(wc->opcode != IB_WC_RECV); 902 903 if (wc->byte_len < sizeof(*msg)) { 904 rtrs_err(s, "Sess info request is malformed: size %d\n", 905 wc->byte_len); 906 goto close; 907 } 908 ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr, 909 iu->size, DMA_FROM_DEVICE); 910 msg = iu->buf; 911 if (le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ) { 912 rtrs_err(s, "Sess info request is malformed: type %d\n", 913 le16_to_cpu(msg->type)); 914 goto close; 915 } 916 err = process_info_req(con, msg); 917 if (err) 918 goto close; 919 920 out: 921 rtrs_iu_free(iu, sess->s.dev->ib_dev, 1); 922 return; 923 close: 924 close_sess(sess); 925 goto out; 926 } 927 928 static int post_recv_info_req(struct rtrs_srv_con *con) 929 { 930 struct rtrs_sess *s = con->c.sess; 931 struct rtrs_srv_sess *sess = to_srv_sess(s); 932 struct rtrs_iu *rx_iu; 933 int err; 934 935 rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), 936 GFP_KERNEL, sess->s.dev->ib_dev, 937 DMA_FROM_DEVICE, rtrs_srv_info_req_done); 938 if (!rx_iu) 939 return -ENOMEM; 940 /* Prepare for getting info response */ 941 err = rtrs_iu_post_recv(&con->c, rx_iu); 942 if (err) { 943 rtrs_err(s, "rtrs_iu_post_recv(), err: %d\n", err); 944 rtrs_iu_free(rx_iu, sess->s.dev->ib_dev, 1); 945 return err; 946 } 947 948 return 0; 949 } 950 951 static int post_recv_io(struct rtrs_srv_con *con, size_t q_size) 952 { 953 int i, err; 954 955 for (i = 0; i < q_size; i++) { 956 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 957 if (err) 958 return err; 959 } 960 961 return 0; 962 } 963 964 static int post_recv_sess(struct rtrs_srv_sess *sess) 965 { 966 struct rtrs_srv *srv = sess->srv; 967 struct rtrs_sess *s = &sess->s; 968 size_t q_size; 969 int err, cid; 970 
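	/* cid 0 is the service connection and uses its own, smaller queue depth */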
971 for (cid = 0; cid < sess->s.con_num; cid++) { 972 if (cid == 0) 973 q_size = SERVICE_CON_QUEUE_DEPTH; 974 else 975 q_size = srv->queue_depth; 976 977 err = post_recv_io(to_srv_con(sess->s.con[cid]), q_size); 978 if (err) { 979 rtrs_err(s, "post_recv_io(), err: %d\n", err); 980 return err; 981 } 982 } 983 984 return 0; 985 } 986 987 static void process_read(struct rtrs_srv_con *con, 988 struct rtrs_msg_rdma_read *msg, 989 u32 buf_id, u32 off) 990 { 991 struct rtrs_sess *s = con->c.sess; 992 struct rtrs_srv_sess *sess = to_srv_sess(s); 993 struct rtrs_srv *srv = sess->srv; 994 struct rtrs_srv_ctx *ctx = srv->ctx; 995 struct rtrs_srv_op *id; 996 997 size_t usr_len, data_len; 998 void *data; 999 int ret; 1000 1001 if (sess->state != RTRS_SRV_CONNECTED) { 1002 rtrs_err_rl(s, 1003 "Processing read request failed, session is disconnected, sess state %s\n", 1004 rtrs_srv_state_str(sess->state)); 1005 return; 1006 } 1007 if (msg->sg_cnt != 1 && msg->sg_cnt != 0) { 1008 rtrs_err_rl(s, 1009 "Processing read request failed, invalid message\n"); 1010 return; 1011 } 1012 rtrs_srv_get_ops_ids(sess); 1013 rtrs_srv_update_rdma_stats(sess->stats, off, READ); 1014 id = sess->ops_ids[buf_id]; 1015 id->con = con; 1016 id->dir = READ; 1017 id->msg_id = buf_id; 1018 id->rd_msg = msg; 1019 usr_len = le16_to_cpu(msg->usr_len); 1020 data_len = off - usr_len; 1021 data = page_address(srv->chunks[buf_id]); 1022 ret = ctx->ops.rdma_ev(srv->priv, id, READ, data, data_len, 1023 data + data_len, usr_len); 1024 1025 if (ret) { 1026 rtrs_err_rl(s, 1027 "Processing read request failed, user module cb reported for msg_id %d, err: %d\n", 1028 buf_id, ret); 1029 goto send_err_msg; 1030 } 1031 1032 return; 1033 1034 send_err_msg: 1035 ret = send_io_resp_imm(con, id, ret); 1036 if (ret < 0) { 1037 rtrs_err_rl(s, 1038 "Sending err msg for failed RDMA-Write-Req failed, msg_id %d, err: %d\n", 1039 buf_id, ret); 1040 close_sess(sess); 1041 } 1042 rtrs_srv_put_ops_ids(sess); 1043 } 1044 1045 static void process_write(struct rtrs_srv_con *con, 1046 struct rtrs_msg_rdma_write *req, 1047 u32 buf_id, u32 off) 1048 { 1049 struct rtrs_sess *s = con->c.sess; 1050 struct rtrs_srv_sess *sess = to_srv_sess(s); 1051 struct rtrs_srv *srv = sess->srv; 1052 struct rtrs_srv_ctx *ctx = srv->ctx; 1053 struct rtrs_srv_op *id; 1054 1055 size_t data_len, usr_len; 1056 void *data; 1057 int ret; 1058 1059 if (sess->state != RTRS_SRV_CONNECTED) { 1060 rtrs_err_rl(s, 1061 "Processing write request failed, session is disconnected, sess state %s\n", 1062 rtrs_srv_state_str(sess->state)); 1063 return; 1064 } 1065 rtrs_srv_get_ops_ids(sess); 1066 rtrs_srv_update_rdma_stats(sess->stats, off, WRITE); 1067 id = sess->ops_ids[buf_id]; 1068 id->con = con; 1069 id->dir = WRITE; 1070 id->msg_id = buf_id; 1071 1072 usr_len = le16_to_cpu(req->usr_len); 1073 data_len = off - usr_len; 1074 data = page_address(srv->chunks[buf_id]); 1075 ret = ctx->ops.rdma_ev(srv->priv, id, WRITE, data, data_len, 1076 data + data_len, usr_len); 1077 if (ret) { 1078 rtrs_err_rl(s, 1079 "Processing write request failed, user module callback reports err: %d\n", 1080 ret); 1081 goto send_err_msg; 1082 } 1083 1084 return; 1085 1086 send_err_msg: 1087 ret = send_io_resp_imm(con, id, ret); 1088 if (ret < 0) { 1089 rtrs_err_rl(s, 1090 "Processing write request failed, sending I/O response failed, msg_id %d, err: %d\n", 1091 buf_id, ret); 1092 close_sess(sess); 1093 } 1094 rtrs_srv_put_ops_ids(sess); 1095 } 1096 1097 static void process_io_req(struct rtrs_srv_con *con, void *msg, 1098 u32 id, 
u32 off) 1099 { 1100 struct rtrs_sess *s = con->c.sess; 1101 struct rtrs_srv_sess *sess = to_srv_sess(s); 1102 struct rtrs_msg_rdma_hdr *hdr; 1103 unsigned int type; 1104 1105 ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, sess->dma_addr[id], 1106 max_chunk_size, DMA_BIDIRECTIONAL); 1107 hdr = msg; 1108 type = le16_to_cpu(hdr->type); 1109 1110 switch (type) { 1111 case RTRS_MSG_WRITE: 1112 process_write(con, msg, id, off); 1113 break; 1114 case RTRS_MSG_READ: 1115 process_read(con, msg, id, off); 1116 break; 1117 default: 1118 rtrs_err(s, 1119 "Processing I/O request failed, unknown message type received: 0x%02x\n", 1120 type); 1121 goto err; 1122 } 1123 1124 return; 1125 1126 err: 1127 close_sess(sess); 1128 } 1129 1130 static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc) 1131 { 1132 struct rtrs_srv_mr *mr = 1133 container_of(wc->wr_cqe, typeof(*mr), inv_cqe); 1134 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 1135 struct rtrs_sess *s = con->c.sess; 1136 struct rtrs_srv_sess *sess = to_srv_sess(s); 1137 struct rtrs_srv *srv = sess->srv; 1138 u32 msg_id, off; 1139 void *data; 1140 1141 if (wc->status != IB_WC_SUCCESS) { 1142 rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n", 1143 ib_wc_status_msg(wc->status)); 1144 close_sess(sess); 1145 } 1146 msg_id = mr->msg_id; 1147 off = mr->msg_off; 1148 data = page_address(srv->chunks[msg_id]) + off; 1149 process_io_req(con, data, msg_id, off); 1150 } 1151 1152 static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con, 1153 struct rtrs_srv_mr *mr) 1154 { 1155 struct ib_send_wr wr = { 1156 .opcode = IB_WR_LOCAL_INV, 1157 .wr_cqe = &mr->inv_cqe, 1158 .send_flags = IB_SEND_SIGNALED, 1159 .ex.invalidate_rkey = mr->mr->rkey, 1160 }; 1161 mr->inv_cqe.done = rtrs_srv_inv_rkey_done; 1162 1163 return ib_post_send(con->c.qp, &wr, NULL); 1164 } 1165 1166 static void rtrs_rdma_process_wr_wait_list(struct rtrs_srv_con *con) 1167 { 1168 spin_lock(&con->rsp_wr_wait_lock); 1169 while (!list_empty(&con->rsp_wr_wait_list)) { 1170 struct rtrs_srv_op *id; 1171 int ret; 1172 1173 id = list_entry(con->rsp_wr_wait_list.next, 1174 struct rtrs_srv_op, wait_list); 1175 list_del(&id->wait_list); 1176 1177 spin_unlock(&con->rsp_wr_wait_lock); 1178 ret = rtrs_srv_resp_rdma(id, id->status); 1179 spin_lock(&con->rsp_wr_wait_lock); 1180 1181 if (!ret) { 1182 list_add(&id->wait_list, &con->rsp_wr_wait_list); 1183 break; 1184 } 1185 } 1186 spin_unlock(&con->rsp_wr_wait_lock); 1187 } 1188 1189 static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc) 1190 { 1191 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 1192 struct rtrs_sess *s = con->c.sess; 1193 struct rtrs_srv_sess *sess = to_srv_sess(s); 1194 struct rtrs_srv *srv = sess->srv; 1195 u32 imm_type, imm_payload; 1196 int err; 1197 1198 if (wc->status != IB_WC_SUCCESS) { 1199 if (wc->status != IB_WC_WR_FLUSH_ERR) { 1200 rtrs_err(s, 1201 "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n", 1202 ib_wc_status_msg(wc->status), wc->wr_cqe, 1203 wc->opcode, wc->vendor_err, wc->byte_len); 1204 close_sess(sess); 1205 } 1206 return; 1207 } 1208 1209 switch (wc->opcode) { 1210 case IB_WC_RECV_RDMA_WITH_IMM: 1211 /* 1212 * post_recv() RDMA write completions of IO reqs (read/write) 1213 * and hb 1214 */ 1215 if (WARN_ON(wc->wr_cqe != &io_comp_cqe)) 1216 return; 1217 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 1218 if (err) { 1219 rtrs_err(s, "rtrs_post_recv(), err: %d\n", err); 1220 close_sess(sess); 1221 break; 1222 } 1223 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), 1224 &imm_type, 
&imm_payload); 1225 if (imm_type == RTRS_IO_REQ_IMM) { 1226 u32 msg_id, off; 1227 void *data; 1228 1229 msg_id = imm_payload >> sess->mem_bits; 1230 off = imm_payload & ((1 << sess->mem_bits) - 1); 1231 if (msg_id >= srv->queue_depth || off >= max_chunk_size) { 1232 rtrs_err(s, "Wrong msg_id %u, off %u\n", 1233 msg_id, off); 1234 close_sess(sess); 1235 return; 1236 } 1237 if (always_invalidate) { 1238 struct rtrs_srv_mr *mr = &sess->mrs[msg_id]; 1239 1240 mr->msg_off = off; 1241 mr->msg_id = msg_id; 1242 err = rtrs_srv_inv_rkey(con, mr); 1243 if (err) { 1244 rtrs_err(s, "rtrs_post_recv(), err: %d\n", 1245 err); 1246 close_sess(sess); 1247 break; 1248 } 1249 } else { 1250 data = page_address(srv->chunks[msg_id]) + off; 1251 process_io_req(con, data, msg_id, off); 1252 } 1253 } else if (imm_type == RTRS_HB_MSG_IMM) { 1254 WARN_ON(con->c.cid); 1255 rtrs_send_hb_ack(&sess->s); 1256 } else if (imm_type == RTRS_HB_ACK_IMM) { 1257 WARN_ON(con->c.cid); 1258 sess->s.hb_missed_cnt = 0; 1259 } else { 1260 rtrs_wrn(s, "Unknown IMM type %u\n", imm_type); 1261 } 1262 break; 1263 case IB_WC_RDMA_WRITE: 1264 case IB_WC_SEND: 1265 /* 1266 * post_send() RDMA write completions of IO reqs (read/write) 1267 * and hb. 1268 */ 1269 atomic_add(s->signal_interval, &con->c.sq_wr_avail); 1270 1271 if (!list_empty_careful(&con->rsp_wr_wait_list)) 1272 rtrs_rdma_process_wr_wait_list(con); 1273 1274 break; 1275 default: 1276 rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode); 1277 return; 1278 } 1279 } 1280 1281 /** 1282 * rtrs_srv_get_sess_name() - Get rtrs_srv peer hostname. 1283 * @srv: Session 1284 * @sessname: Sessname buffer 1285 * @len: Length of sessname buffer 1286 */ 1287 int rtrs_srv_get_sess_name(struct rtrs_srv *srv, char *sessname, size_t len) 1288 { 1289 struct rtrs_srv_sess *sess; 1290 int err = -ENOTCONN; 1291 1292 mutex_lock(&srv->paths_mutex); 1293 list_for_each_entry(sess, &srv->paths_list, s.entry) { 1294 if (sess->state != RTRS_SRV_CONNECTED) 1295 continue; 1296 strscpy(sessname, sess->s.sessname, 1297 min_t(size_t, sizeof(sess->s.sessname), len)); 1298 err = 0; 1299 break; 1300 } 1301 mutex_unlock(&srv->paths_mutex); 1302 1303 return err; 1304 } 1305 EXPORT_SYMBOL(rtrs_srv_get_sess_name); 1306 1307 /** 1308 * rtrs_srv_get_queue_depth() - Get rtrs_srv qdepth. 
1309 * @srv: Session 1310 */ 1311 int rtrs_srv_get_queue_depth(struct rtrs_srv *srv) 1312 { 1313 return srv->queue_depth; 1314 } 1315 EXPORT_SYMBOL(rtrs_srv_get_queue_depth); 1316 1317 static int find_next_bit_ring(struct rtrs_srv_sess *sess) 1318 { 1319 struct ib_device *ib_dev = sess->s.dev->ib_dev; 1320 int v; 1321 1322 v = cpumask_next(sess->cur_cq_vector, &cq_affinity_mask); 1323 if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors) 1324 v = cpumask_first(&cq_affinity_mask); 1325 return v; 1326 } 1327 1328 static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_sess *sess) 1329 { 1330 sess->cur_cq_vector = find_next_bit_ring(sess); 1331 1332 return sess->cur_cq_vector; 1333 } 1334 1335 static void rtrs_srv_dev_release(struct device *dev) 1336 { 1337 struct rtrs_srv *srv = container_of(dev, struct rtrs_srv, dev); 1338 1339 kfree(srv); 1340 } 1341 1342 static void free_srv(struct rtrs_srv *srv) 1343 { 1344 int i; 1345 1346 WARN_ON(refcount_read(&srv->refcount)); 1347 for (i = 0; i < srv->queue_depth; i++) 1348 mempool_free(srv->chunks[i], chunk_pool); 1349 kfree(srv->chunks); 1350 mutex_destroy(&srv->paths_mutex); 1351 mutex_destroy(&srv->paths_ev_mutex); 1352 /* last put to release the srv structure */ 1353 put_device(&srv->dev); 1354 } 1355 1356 static struct rtrs_srv *get_or_create_srv(struct rtrs_srv_ctx *ctx, 1357 const uuid_t *paths_uuid, 1358 bool first_conn) 1359 { 1360 struct rtrs_srv *srv; 1361 int i; 1362 1363 mutex_lock(&ctx->srv_mutex); 1364 list_for_each_entry(srv, &ctx->srv_list, ctx_list) { 1365 if (uuid_equal(&srv->paths_uuid, paths_uuid) && 1366 refcount_inc_not_zero(&srv->refcount)) { 1367 mutex_unlock(&ctx->srv_mutex); 1368 return srv; 1369 } 1370 } 1371 mutex_unlock(&ctx->srv_mutex); 1372 /* 1373 * If this request is not the first connection request from the 1374 * client for this session then fail and return error. 
	 */
	if (!first_conn) {
		pr_err_ratelimited("Error: Not the first connection request for this session\n");
		return ERR_PTR(-ENXIO);
	}

	/* need to allocate a new srv */
	srv = kzalloc(sizeof(*srv), GFP_KERNEL);
	if (!srv)
		return ERR_PTR(-ENOMEM);

	INIT_LIST_HEAD(&srv->paths_list);
	mutex_init(&srv->paths_mutex);
	mutex_init(&srv->paths_ev_mutex);
	uuid_copy(&srv->paths_uuid, paths_uuid);
	srv->queue_depth = sess_queue_depth;
	srv->ctx = ctx;
	device_initialize(&srv->dev);
	srv->dev.release = rtrs_srv_dev_release;

	srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks),
			      GFP_KERNEL);
	if (!srv->chunks)
		goto err_free_srv;

	for (i = 0; i < srv->queue_depth; i++) {
		srv->chunks[i] = mempool_alloc(chunk_pool, GFP_KERNEL);
		if (!srv->chunks[i])
			goto err_free_chunks;
	}
	refcount_set(&srv->refcount, 1);
	mutex_lock(&ctx->srv_mutex);
	list_add(&srv->ctx_list, &ctx->srv_list);
	mutex_unlock(&ctx->srv_mutex);

	return srv;

err_free_chunks:
	while (i--)
		mempool_free(srv->chunks[i], chunk_pool);
	kfree(srv->chunks);

err_free_srv:
	kfree(srv);
	return ERR_PTR(-ENOMEM);
}

static void put_srv(struct rtrs_srv *srv)
{
	if (refcount_dec_and_test(&srv->refcount)) {
		struct rtrs_srv_ctx *ctx = srv->ctx;

		WARN_ON(srv->dev.kobj.state_in_sysfs);

		mutex_lock(&ctx->srv_mutex);
		list_del(&srv->ctx_list);
		mutex_unlock(&ctx->srv_mutex);
		free_srv(srv);
	}
}

static void __add_path_to_srv(struct rtrs_srv *srv,
			      struct rtrs_srv_sess *sess)
{
	list_add_tail(&sess->s.entry, &srv->paths_list);
	srv->paths_num++;
	WARN_ON(srv->paths_num >= MAX_PATHS_NUM);
}

static void del_path_from_srv(struct rtrs_srv_sess *sess)
{
	struct rtrs_srv *srv = sess->srv;

	if (WARN_ON(!srv))
		return;

	mutex_lock(&srv->paths_mutex);
	list_del(&sess->s.entry);
	WARN_ON(!srv->paths_num);
	srv->paths_num--;
	mutex_unlock(&srv->paths_mutex);
}

/* Like memcmp: returns 0 if the addresses are the same, non-zero otherwise */
static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b)
{
	switch (a->sa_family) {
	case AF_IB:
		return memcmp(&((struct sockaddr_ib *)a)->sib_addr,
			      &((struct sockaddr_ib *)b)->sib_addr,
			      sizeof(struct ib_addr)) &&
			(b->sa_family == AF_IB);
	case AF_INET:
		return memcmp(&((struct sockaddr_in *)a)->sin_addr,
			      &((struct sockaddr_in *)b)->sin_addr,
			      sizeof(struct in_addr)) &&
			(b->sa_family == AF_INET);
	case AF_INET6:
		return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr,
			      &((struct sockaddr_in6 *)b)->sin6_addr,
			      sizeof(struct in6_addr)) &&
			(b->sa_family == AF_INET6);
	default:
		return -ENOENT;
	}
}

static bool __is_path_w_addr_exists(struct rtrs_srv *srv,
				    struct rdma_addr *addr)
{
	struct rtrs_srv_sess *sess;

	list_for_each_entry(sess, &srv->paths_list, s.entry)
		if (!sockaddr_cmp((struct sockaddr *)&sess->s.dst_addr,
				  (struct sockaddr *)&addr->dst_addr) &&
		    !sockaddr_cmp((struct sockaddr *)&sess->s.src_addr,
				  (struct sockaddr *)&addr->src_addr))
			return true;

	return false;
}

static void free_sess(struct rtrs_srv_sess *sess)
{
	if (sess->kobj.state_in_sysfs) {
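		/* sess and sess->stats are freed by the kobject release path */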
kobject_del(&sess->kobj); 1501 kobject_put(&sess->kobj); 1502 } else { 1503 kfree(sess->stats); 1504 kfree(sess); 1505 } 1506 } 1507 1508 static void rtrs_srv_close_work(struct work_struct *work) 1509 { 1510 struct rtrs_srv_sess *sess; 1511 struct rtrs_srv_con *con; 1512 int i; 1513 1514 sess = container_of(work, typeof(*sess), close_work); 1515 1516 rtrs_srv_destroy_sess_files(sess); 1517 rtrs_srv_stop_hb(sess); 1518 1519 for (i = 0; i < sess->s.con_num; i++) { 1520 if (!sess->s.con[i]) 1521 continue; 1522 con = to_srv_con(sess->s.con[i]); 1523 rdma_disconnect(con->c.cm_id); 1524 ib_drain_qp(con->c.qp); 1525 } 1526 1527 /* 1528 * Degrade ref count to the usual model with a single shared 1529 * atomic_t counter 1530 */ 1531 percpu_ref_kill(&sess->ids_inflight_ref); 1532 1533 /* Wait for all completion */ 1534 wait_for_completion(&sess->complete_done); 1535 1536 /* Notify upper layer if we are the last path */ 1537 rtrs_srv_sess_down(sess); 1538 1539 unmap_cont_bufs(sess); 1540 rtrs_srv_free_ops_ids(sess); 1541 1542 for (i = 0; i < sess->s.con_num; i++) { 1543 if (!sess->s.con[i]) 1544 continue; 1545 con = to_srv_con(sess->s.con[i]); 1546 rtrs_cq_qp_destroy(&con->c); 1547 rdma_destroy_id(con->c.cm_id); 1548 kfree(con); 1549 } 1550 rtrs_ib_dev_put(sess->s.dev); 1551 1552 del_path_from_srv(sess); 1553 put_srv(sess->srv); 1554 sess->srv = NULL; 1555 rtrs_srv_change_state(sess, RTRS_SRV_CLOSED); 1556 1557 kfree(sess->dma_addr); 1558 kfree(sess->s.con); 1559 free_sess(sess); 1560 } 1561 1562 static int rtrs_rdma_do_accept(struct rtrs_srv_sess *sess, 1563 struct rdma_cm_id *cm_id) 1564 { 1565 struct rtrs_srv *srv = sess->srv; 1566 struct rtrs_msg_conn_rsp msg; 1567 struct rdma_conn_param param; 1568 int err; 1569 1570 param = (struct rdma_conn_param) { 1571 .rnr_retry_count = 7, 1572 .private_data = &msg, 1573 .private_data_len = sizeof(msg), 1574 }; 1575 1576 msg = (struct rtrs_msg_conn_rsp) { 1577 .magic = cpu_to_le16(RTRS_MAGIC), 1578 .version = cpu_to_le16(RTRS_PROTO_VER), 1579 .queue_depth = cpu_to_le16(srv->queue_depth), 1580 .max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE), 1581 .max_hdr_size = cpu_to_le32(MAX_HDR_SIZE), 1582 }; 1583 1584 if (always_invalidate) 1585 msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F); 1586 1587 err = rdma_accept(cm_id, ¶m); 1588 if (err) 1589 pr_err("rdma_accept(), err: %d\n", err); 1590 1591 return err; 1592 } 1593 1594 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno) 1595 { 1596 struct rtrs_msg_conn_rsp msg; 1597 int err; 1598 1599 msg = (struct rtrs_msg_conn_rsp) { 1600 .magic = cpu_to_le16(RTRS_MAGIC), 1601 .version = cpu_to_le16(RTRS_PROTO_VER), 1602 .errno = cpu_to_le16(errno), 1603 }; 1604 1605 err = rdma_reject(cm_id, &msg, sizeof(msg), IB_CM_REJ_CONSUMER_DEFINED); 1606 if (err) 1607 pr_err("rdma_reject(), err: %d\n", err); 1608 1609 /* Bounce errno back */ 1610 return errno; 1611 } 1612 1613 static struct rtrs_srv_sess * 1614 __find_sess(struct rtrs_srv *srv, const uuid_t *sess_uuid) 1615 { 1616 struct rtrs_srv_sess *sess; 1617 1618 list_for_each_entry(sess, &srv->paths_list, s.entry) { 1619 if (uuid_equal(&sess->s.uuid, sess_uuid)) 1620 return sess; 1621 } 1622 1623 return NULL; 1624 } 1625 1626 static int create_con(struct rtrs_srv_sess *sess, 1627 struct rdma_cm_id *cm_id, 1628 unsigned int cid) 1629 { 1630 struct rtrs_srv *srv = sess->srv; 1631 struct rtrs_sess *s = &sess->s; 1632 struct rtrs_srv_con *con; 1633 1634 u32 cq_num, max_send_wr, max_recv_wr, wr_limit; 1635 int err, cq_vector; 1636 1637 con = 
kzalloc(sizeof(*con), GFP_KERNEL);
	if (!con) {
		err = -ENOMEM;
		goto err;
	}

	spin_lock_init(&con->rsp_wr_wait_lock);
	INIT_LIST_HEAD(&con->rsp_wr_wait_list);
	con->c.cm_id = cm_id;
	con->c.sess = &sess->s;
	con->c.cid = cid;
	atomic_set(&con->c.wr_cnt, 1);
	wr_limit = sess->s.dev->ib_dev->attrs.max_qp_wr;

	if (con->c.cid == 0) {
		/*
		 * All receive and all send (each requiring invalidate)
		 * + 2 for drain and heartbeat
		 */
		max_send_wr = min_t(int, wr_limit,
				    SERVICE_CON_QUEUE_DEPTH * 2 + 2);
		max_recv_wr = max_send_wr;
		s->signal_interval = min_not_zero(srv->queue_depth,
						  (size_t)SERVICE_CON_QUEUE_DEPTH);
	} else {
		/* when always_invalidate is enabled we need linv+rinv+mr+imm */
		if (always_invalidate)
			max_send_wr =
				min_t(int, wr_limit,
				      srv->queue_depth * (1 + 4) + 1);
		else
			max_send_wr =
				min_t(int, wr_limit,
				      srv->queue_depth * (1 + 2) + 1);

		max_recv_wr = srv->queue_depth + 1;
		/*
		 * If all receive requests and all write requests are
		 * posted, each read request additionally requires an
		 * invalidate request + drain, or the qp gets into the
		 * error state.
		 */
	}
	cq_num = max_send_wr + max_recv_wr;
	atomic_set(&con->c.sq_wr_avail, max_send_wr);
	cq_vector = rtrs_srv_get_next_cq_vector(sess);

	/* TODO: SOFTIRQ can be faster, but be careful with softirq context */
	err = rtrs_cq_qp_create(&sess->s, &con->c, 1, cq_vector, cq_num,
				max_send_wr, max_recv_wr,
				IB_POLL_WORKQUEUE);
	if (err) {
		rtrs_err(s, "rtrs_cq_qp_create(), err: %d\n", err);
		goto free_con;
	}
	if (con->c.cid == 0) {
		err = post_recv_info_req(con);
		if (err)
			goto free_cqqp;
	}
	WARN_ON(sess->s.con[cid]);
	sess->s.con[cid] = &con->c;

	/*
	 * Change context from server to current connection. The other
	 * way is to use cm_id->qp->qp_context, which does not work on OFED.
1703 */ 1704 cm_id->context = &con->c; 1705 1706 return 0; 1707 1708 free_cqqp: 1709 rtrs_cq_qp_destroy(&con->c); 1710 free_con: 1711 kfree(con); 1712 1713 err: 1714 return err; 1715 } 1716 1717 static struct rtrs_srv_sess *__alloc_sess(struct rtrs_srv *srv, 1718 struct rdma_cm_id *cm_id, 1719 unsigned int con_num, 1720 unsigned int recon_cnt, 1721 const uuid_t *uuid) 1722 { 1723 struct rtrs_srv_sess *sess; 1724 int err = -ENOMEM; 1725 char str[NAME_MAX]; 1726 struct rtrs_addr path; 1727 1728 if (srv->paths_num >= MAX_PATHS_NUM) { 1729 err = -ECONNRESET; 1730 goto err; 1731 } 1732 if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) { 1733 err = -EEXIST; 1734 pr_err("Path with same addr exists\n"); 1735 goto err; 1736 } 1737 sess = kzalloc(sizeof(*sess), GFP_KERNEL); 1738 if (!sess) 1739 goto err; 1740 1741 sess->stats = kzalloc(sizeof(*sess->stats), GFP_KERNEL); 1742 if (!sess->stats) 1743 goto err_free_sess; 1744 1745 sess->stats->sess = sess; 1746 1747 sess->dma_addr = kcalloc(srv->queue_depth, sizeof(*sess->dma_addr), 1748 GFP_KERNEL); 1749 if (!sess->dma_addr) 1750 goto err_free_stats; 1751 1752 sess->s.con = kcalloc(con_num, sizeof(*sess->s.con), GFP_KERNEL); 1753 if (!sess->s.con) 1754 goto err_free_dma_addr; 1755 1756 sess->state = RTRS_SRV_CONNECTING; 1757 sess->srv = srv; 1758 sess->cur_cq_vector = -1; 1759 sess->s.dst_addr = cm_id->route.addr.dst_addr; 1760 sess->s.src_addr = cm_id->route.addr.src_addr; 1761 1762 /* temporary until receiving session-name from client */ 1763 path.src = &sess->s.src_addr; 1764 path.dst = &sess->s.dst_addr; 1765 rtrs_addr_to_str(&path, str, sizeof(str)); 1766 strscpy(sess->s.sessname, str, sizeof(sess->s.sessname)); 1767 1768 sess->s.con_num = con_num; 1769 sess->s.recon_cnt = recon_cnt; 1770 uuid_copy(&sess->s.uuid, uuid); 1771 spin_lock_init(&sess->state_lock); 1772 INIT_WORK(&sess->close_work, rtrs_srv_close_work); 1773 rtrs_srv_init_hb(sess); 1774 1775 sess->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd); 1776 if (!sess->s.dev) { 1777 err = -ENOMEM; 1778 goto err_free_con; 1779 } 1780 err = map_cont_bufs(sess); 1781 if (err) 1782 goto err_put_dev; 1783 1784 err = rtrs_srv_alloc_ops_ids(sess); 1785 if (err) 1786 goto err_unmap_bufs; 1787 1788 __add_path_to_srv(srv, sess); 1789 1790 return sess; 1791 1792 err_unmap_bufs: 1793 unmap_cont_bufs(sess); 1794 err_put_dev: 1795 rtrs_ib_dev_put(sess->s.dev); 1796 err_free_con: 1797 kfree(sess->s.con); 1798 err_free_dma_addr: 1799 kfree(sess->dma_addr); 1800 err_free_stats: 1801 kfree(sess->stats); 1802 err_free_sess: 1803 kfree(sess); 1804 err: 1805 return ERR_PTR(err); 1806 } 1807 1808 static int rtrs_rdma_connect(struct rdma_cm_id *cm_id, 1809 const struct rtrs_msg_conn_req *msg, 1810 size_t len) 1811 { 1812 struct rtrs_srv_ctx *ctx = cm_id->context; 1813 struct rtrs_srv_sess *sess; 1814 struct rtrs_srv *srv; 1815 1816 u16 version, con_num, cid; 1817 u16 recon_cnt; 1818 int err = -ECONNRESET; 1819 1820 if (len < sizeof(*msg)) { 1821 pr_err("Invalid RTRS connection request\n"); 1822 goto reject_w_err; 1823 } 1824 if (le16_to_cpu(msg->magic) != RTRS_MAGIC) { 1825 pr_err("Invalid RTRS magic\n"); 1826 goto reject_w_err; 1827 } 1828 version = le16_to_cpu(msg->version); 1829 if (version >> 8 != RTRS_PROTO_VER_MAJOR) { 1830 pr_err("Unsupported major RTRS version: %d, expected %d\n", 1831 version >> 8, RTRS_PROTO_VER_MAJOR); 1832 goto reject_w_err; 1833 } 1834 con_num = le16_to_cpu(msg->cid_num); 1835 if (con_num > 4096) { 1836 /* Sanity check */ 1837 pr_err("Too many connections requested: %d\n", 
con_num); 1838 goto reject_w_err; 1839 } 1840 cid = le16_to_cpu(msg->cid); 1841 if (cid >= con_num) { 1842 /* Sanity check */ 1843 pr_err("Incorrect cid: %d >= %d\n", cid, con_num); 1844 goto reject_w_err; 1845 } 1846 recon_cnt = le16_to_cpu(msg->recon_cnt); 1847 srv = get_or_create_srv(ctx, &msg->paths_uuid, msg->first_conn); 1848 if (IS_ERR(srv)) { 1849 err = PTR_ERR(srv); 1850 pr_err("get_or_create_srv(), error %d\n", err); 1851 goto reject_w_err; 1852 } 1853 mutex_lock(&srv->paths_mutex); 1854 sess = __find_sess(srv, &msg->sess_uuid); 1855 if (sess) { 1856 struct rtrs_sess *s = &sess->s; 1857 1858 /* Session already holds a reference */ 1859 put_srv(srv); 1860 1861 if (sess->state != RTRS_SRV_CONNECTING) { 1862 rtrs_err(s, "Session in wrong state: %s\n", 1863 rtrs_srv_state_str(sess->state)); 1864 mutex_unlock(&srv->paths_mutex); 1865 goto reject_w_err; 1866 } 1867 /* 1868 * Sanity checks 1869 */ 1870 if (con_num != s->con_num || cid >= s->con_num) { 1871 rtrs_err(s, "Incorrect request: %d, %d\n", 1872 cid, con_num); 1873 mutex_unlock(&srv->paths_mutex); 1874 goto reject_w_err; 1875 } 1876 if (s->con[cid]) { 1877 rtrs_err(s, "Connection already exists: %d\n", 1878 cid); 1879 mutex_unlock(&srv->paths_mutex); 1880 goto reject_w_err; 1881 } 1882 } else { 1883 sess = __alloc_sess(srv, cm_id, con_num, recon_cnt, 1884 &msg->sess_uuid); 1885 if (IS_ERR(sess)) { 1886 mutex_unlock(&srv->paths_mutex); 1887 put_srv(srv); 1888 err = PTR_ERR(sess); 1889 pr_err("RTRS server session allocation failed: %d\n", err); 1890 goto reject_w_err; 1891 } 1892 } 1893 err = create_con(sess, cm_id, cid); 1894 if (err) { 1895 rtrs_err((&sess->s), "create_con(), error %d\n", err); 1896 rtrs_rdma_do_reject(cm_id, err); 1897 /* 1898 * Since session has other connections we follow normal way 1899 * through workqueue, but still return an error to tell cma.c 1900 * to call rdma_destroy_id() for current connection. 1901 */ 1902 goto close_and_return_err; 1903 } 1904 err = rtrs_rdma_do_accept(sess, cm_id); 1905 if (err) { 1906 rtrs_err((&sess->s), "rtrs_rdma_do_accept(), error %d\n", err); 1907 rtrs_rdma_do_reject(cm_id, err); 1908 /* 1909 * Since current connection was successfully added to the 1910 * session we follow normal way through workqueue to close the 1911 * session, thus return 0 to tell cma.c we call 1912 * rdma_destroy_id() ourselves. 
1913 */ 1914 err = 0; 1915 goto close_and_return_err; 1916 } 1917 mutex_unlock(&srv->paths_mutex); 1918 1919 return 0; 1920 1921 reject_w_err: 1922 return rtrs_rdma_do_reject(cm_id, err); 1923 1924 close_and_return_err: 1925 mutex_unlock(&srv->paths_mutex); 1926 close_sess(sess); 1927 1928 return err; 1929 } 1930 1931 static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id, 1932 struct rdma_cm_event *ev) 1933 { 1934 struct rtrs_srv_sess *sess = NULL; 1935 struct rtrs_sess *s = NULL; 1936 1937 if (ev->event != RDMA_CM_EVENT_CONNECT_REQUEST) { 1938 struct rtrs_con *c = cm_id->context; 1939 1940 s = c->sess; 1941 sess = to_srv_sess(s); 1942 } 1943 1944 switch (ev->event) { 1945 case RDMA_CM_EVENT_CONNECT_REQUEST: 1946 /* 1947 * In case of error cma.c will destroy cm_id, 1948 * see cma_process_remove() 1949 */ 1950 return rtrs_rdma_connect(cm_id, ev->param.conn.private_data, 1951 ev->param.conn.private_data_len); 1952 case RDMA_CM_EVENT_ESTABLISHED: 1953 /* Nothing here */ 1954 break; 1955 case RDMA_CM_EVENT_REJECTED: 1956 case RDMA_CM_EVENT_CONNECT_ERROR: 1957 case RDMA_CM_EVENT_UNREACHABLE: 1958 rtrs_err(s, "CM error (CM event: %s, err: %d)\n", 1959 rdma_event_msg(ev->event), ev->status); 1960 fallthrough; 1961 case RDMA_CM_EVENT_DISCONNECTED: 1962 case RDMA_CM_EVENT_ADDR_CHANGE: 1963 case RDMA_CM_EVENT_TIMEWAIT_EXIT: 1964 case RDMA_CM_EVENT_DEVICE_REMOVAL: 1965 close_sess(sess); 1966 break; 1967 default: 1968 pr_err("Ignoring unexpected CM event %s, err %d\n", 1969 rdma_event_msg(ev->event), ev->status); 1970 break; 1971 } 1972 1973 return 0; 1974 } 1975 1976 static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx, 1977 struct sockaddr *addr, 1978 enum rdma_ucm_port_space ps) 1979 { 1980 struct rdma_cm_id *cm_id; 1981 int ret; 1982 1983 cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler, 1984 ctx, ps, IB_QPT_RC); 1985 if (IS_ERR(cm_id)) { 1986 ret = PTR_ERR(cm_id); 1987 pr_err("Creating id for RDMA connection failed, err: %d\n", 1988 ret); 1989 goto err_out; 1990 } 1991 ret = rdma_bind_addr(cm_id, addr); 1992 if (ret) { 1993 pr_err("Binding RDMA address failed, err: %d\n", ret); 1994 goto err_cm; 1995 } 1996 ret = rdma_listen(cm_id, 64); 1997 if (ret) { 1998 pr_err("Listening on RDMA connection failed, err: %d\n", 1999 ret); 2000 goto err_cm; 2001 } 2002 2003 return cm_id; 2004 2005 err_cm: 2006 rdma_destroy_id(cm_id); 2007 err_out: 2008 2009 return ERR_PTR(ret); 2010 } 2011 2012 static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, u16 port) 2013 { 2014 struct sockaddr_in6 sin = { 2015 .sin6_family = AF_INET6, 2016 .sin6_addr = IN6ADDR_ANY_INIT, 2017 .sin6_port = htons(port), 2018 }; 2019 struct sockaddr_ib sib = { 2020 .sib_family = AF_IB, 2021 .sib_sid = cpu_to_be64(RDMA_IB_IP_PS_IB | port), 2022 .sib_sid_mask = cpu_to_be64(0xffffffffffffffffULL), 2023 .sib_pkey = cpu_to_be16(0xffff), 2024 }; 2025 struct rdma_cm_id *cm_ip, *cm_ib; 2026 int ret; 2027 2028 /* 2029 * We accept both IPoIB and IB connections, so we need to keep 2030 * two cm id's, one for each socket type and port space. 2031 * If the cm initialization of one of the id's fails, we abort 2032 * everything. 
	 */
	cm_ip = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sin, RDMA_PS_TCP);
	if (IS_ERR(cm_ip))
		return PTR_ERR(cm_ip);

	cm_ib = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sib, RDMA_PS_IB);
	if (IS_ERR(cm_ib)) {
		ret = PTR_ERR(cm_ib);
		goto free_cm_ip;
	}

	ctx->cm_id_ip = cm_ip;
	ctx->cm_id_ib = cm_ib;

	return 0;

free_cm_ip:
	rdma_destroy_id(cm_ip);

	return ret;
}

static struct rtrs_srv_ctx *alloc_srv_ctx(struct rtrs_srv_ops *ops)
{
	struct rtrs_srv_ctx *ctx;

	ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
	if (!ctx)
		return NULL;

	ctx->ops = *ops;
	mutex_init(&ctx->srv_mutex);
	INIT_LIST_HEAD(&ctx->srv_list);

	return ctx;
}

static void free_srv_ctx(struct rtrs_srv_ctx *ctx)
{
	WARN_ON(!list_empty(&ctx->srv_list));
	mutex_destroy(&ctx->srv_mutex);
	kfree(ctx);
}

static int rtrs_srv_add_one(struct ib_device *device)
{
	struct rtrs_srv_ctx *ctx;
	int ret = 0;

	mutex_lock(&ib_ctx.ib_dev_mutex);
	if (ib_ctx.ib_dev_count)
		goto out;

	/*
	 * Since our CM IDs are NOT bound to any ib device we will create them
	 * only once
	 */
	ctx = ib_ctx.srv_ctx;
	ret = rtrs_srv_rdma_init(ctx, ib_ctx.port);
	if (ret) {
		/*
		 * If we return an error here, the ib core ignores the error
		 * code and makes no further calls to our ops.
		 */
		pr_err("Failed to initialize RDMA connection\n");
		goto err_out;
	}

out:
	/*
	 * Keep track of the number of ib devices added
	 */
	ib_ctx.ib_dev_count++;

err_out:
	mutex_unlock(&ib_ctx.ib_dev_mutex);
	return ret;
}

static void rtrs_srv_remove_one(struct ib_device *device, void *client_data)
{
	struct rtrs_srv_ctx *ctx;

	mutex_lock(&ib_ctx.ib_dev_mutex);
	ib_ctx.ib_dev_count--;

	if (ib_ctx.ib_dev_count)
		goto out;

	/*
	 * Since our CM IDs are NOT bound to any ib device we will remove them
	 * only once, when the last device is removed
	 */
	ctx = ib_ctx.srv_ctx;
	rdma_destroy_id(ctx->cm_id_ip);
	rdma_destroy_id(ctx->cm_id_ib);

out:
	mutex_unlock(&ib_ctx.ib_dev_mutex);
}

static struct ib_client rtrs_srv_client = {
	.name	= "rtrs_server",
	.add	= rtrs_srv_add_one,
	.remove	= rtrs_srv_remove_one
};

/**
 * rtrs_srv_open() - open RTRS server context
 * @ops: callback functions
 * @port: port to listen on
 *
 * Creates server context with specified callbacks.
 *
 * Return: a valid pointer on success, otherwise PTR_ERR.
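 * The returned context must be released with rtrs_srv_close().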
2149 */ 2150 struct rtrs_srv_ctx *rtrs_srv_open(struct rtrs_srv_ops *ops, u16 port) 2151 { 2152 struct rtrs_srv_ctx *ctx; 2153 int err; 2154 2155 ctx = alloc_srv_ctx(ops); 2156 if (!ctx) 2157 return ERR_PTR(-ENOMEM); 2158 2159 mutex_init(&ib_ctx.ib_dev_mutex); 2160 ib_ctx.srv_ctx = ctx; 2161 ib_ctx.port = port; 2162 2163 err = ib_register_client(&rtrs_srv_client); 2164 if (err) { 2165 free_srv_ctx(ctx); 2166 return ERR_PTR(err); 2167 } 2168 2169 return ctx; 2170 } 2171 EXPORT_SYMBOL(rtrs_srv_open); 2172 2173 static void close_sessions(struct rtrs_srv *srv) 2174 { 2175 struct rtrs_srv_sess *sess; 2176 2177 mutex_lock(&srv->paths_mutex); 2178 list_for_each_entry(sess, &srv->paths_list, s.entry) 2179 close_sess(sess); 2180 mutex_unlock(&srv->paths_mutex); 2181 } 2182 2183 static void close_ctx(struct rtrs_srv_ctx *ctx) 2184 { 2185 struct rtrs_srv *srv; 2186 2187 mutex_lock(&ctx->srv_mutex); 2188 list_for_each_entry(srv, &ctx->srv_list, ctx_list) 2189 close_sessions(srv); 2190 mutex_unlock(&ctx->srv_mutex); 2191 flush_workqueue(rtrs_wq); 2192 } 2193 2194 /** 2195 * rtrs_srv_close() - close RTRS server context 2196 * @ctx: pointer to server context 2197 * 2198 * Closes RTRS server context with all client sessions. 2199 */ 2200 void rtrs_srv_close(struct rtrs_srv_ctx *ctx) 2201 { 2202 ib_unregister_client(&rtrs_srv_client); 2203 mutex_destroy(&ib_ctx.ib_dev_mutex); 2204 close_ctx(ctx); 2205 free_srv_ctx(ctx); 2206 } 2207 EXPORT_SYMBOL(rtrs_srv_close); 2208 2209 static int check_module_params(void) 2210 { 2211 if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) { 2212 pr_err("Invalid sess_queue_depth value %d, has to be >= %d, <= %d.\n", 2213 sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH); 2214 return -EINVAL; 2215 } 2216 if (max_chunk_size < MIN_CHUNK_SIZE || !is_power_of_2(max_chunk_size)) { 2217 pr_err("Invalid max_chunk_size value %d, has to be >= %d and should be power of two.\n", 2218 max_chunk_size, MIN_CHUNK_SIZE); 2219 return -EINVAL; 2220 } 2221 2222 /* 2223 * Check if IB immediate data size is enough to hold the mem_id and the 2224 * offset inside the memory chunk 2225 */ 2226 if ((ilog2(sess_queue_depth - 1) + 1) + 2227 (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) { 2228 pr_err("RDMA immediate size (%db) not enough to encode %d buffers of size %dB. 
Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n", 2229 MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size); 2230 return -EINVAL; 2231 } 2232 2233 return 0; 2234 } 2235 2236 static int __init rtrs_server_init(void) 2237 { 2238 int err; 2239 2240 pr_info("Loading module %s, proto %s: (max_chunk_size: %d (pure IO %ld, headers %ld) , sess_queue_depth: %d, always_invalidate: %d)\n", 2241 KBUILD_MODNAME, RTRS_PROTO_VER_STRING, 2242 max_chunk_size, max_chunk_size - MAX_HDR_SIZE, MAX_HDR_SIZE, 2243 sess_queue_depth, always_invalidate); 2244 2245 rtrs_rdma_dev_pd_init(0, &dev_pd); 2246 2247 err = check_module_params(); 2248 if (err) { 2249 pr_err("Failed to load module, invalid module parameters, err: %d\n", 2250 err); 2251 return err; 2252 } 2253 chunk_pool = mempool_create_page_pool(sess_queue_depth * CHUNK_POOL_SZ, 2254 get_order(max_chunk_size)); 2255 if (!chunk_pool) 2256 return -ENOMEM; 2257 rtrs_dev_class = class_create(THIS_MODULE, "rtrs-server"); 2258 if (IS_ERR(rtrs_dev_class)) { 2259 err = PTR_ERR(rtrs_dev_class); 2260 goto out_chunk_pool; 2261 } 2262 rtrs_wq = alloc_workqueue("rtrs_server_wq", 0, 0); 2263 if (!rtrs_wq) { 2264 err = -ENOMEM; 2265 goto out_dev_class; 2266 } 2267 2268 return 0; 2269 2270 out_dev_class: 2271 class_destroy(rtrs_dev_class); 2272 out_chunk_pool: 2273 mempool_destroy(chunk_pool); 2274 2275 return err; 2276 } 2277 2278 static void __exit rtrs_server_exit(void) 2279 { 2280 destroy_workqueue(rtrs_wq); 2281 class_destroy(rtrs_dev_class); 2282 mempool_destroy(chunk_pool); 2283 rtrs_rdma_dev_pd_deinit(&dev_pd); 2284 } 2285 2286 module_init(rtrs_server_init); 2287 module_exit(rtrs_server_exit); 2288