// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * RDMA Transport Layer
 *
 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
 */

#undef pr_fmt
#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt

#include <linux/module.h>

#include "rtrs-srv.h"
#include "rtrs-log.h"
#include <rdma/ib_cm.h>
#include <rdma/ib_verbs.h>
#include "rtrs-srv-trace.h"

MODULE_DESCRIPTION("RDMA Transport Server");
MODULE_LICENSE("GPL");

/* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
#define DEFAULT_SESS_QUEUE_DEPTH 512
#define MAX_HDR_SIZE PAGE_SIZE

static struct rtrs_rdma_dev_pd dev_pd;
struct class *rtrs_dev_class;
static struct rtrs_srv_ib_ctx ib_ctx;

static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;

static bool always_invalidate = true;
module_param(always_invalidate, bool, 0444);
MODULE_PARM_DESC(always_invalidate,
		 "Invalidate memory registration for contiguous memory regions before accessing.");

module_param_named(max_chunk_size, max_chunk_size, int, 0444);
MODULE_PARM_DESC(max_chunk_size,
		 "Max size of each IO request in bytes (default: "
		 __stringify(DEFAULT_MAX_CHUNK_SIZE) ")");

module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
MODULE_PARM_DESC(sess_queue_depth,
		 "Number of buffers for pending I/O requests to allocate per session. Maximum: "
		 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
		 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");

static cpumask_t cq_affinity_mask = { CPU_BITS_ALL };

static struct workqueue_struct *rtrs_wq;

static inline struct rtrs_srv_con *to_srv_con(struct rtrs_con *c)
{
	return container_of(c, struct rtrs_srv_con, c);
}

static bool rtrs_srv_change_state(struct rtrs_srv_path *srv_path,
				  enum rtrs_srv_state new_state)
{
	enum rtrs_srv_state old_state;
	bool changed = false;

	spin_lock_irq(&srv_path->state_lock);
	old_state = srv_path->state;
	switch (new_state) {
	case RTRS_SRV_CONNECTED:
		if (old_state == RTRS_SRV_CONNECTING)
			changed = true;
		break;
	case RTRS_SRV_CLOSING:
		if (old_state == RTRS_SRV_CONNECTING ||
		    old_state == RTRS_SRV_CONNECTED)
			changed = true;
		break;
	case RTRS_SRV_CLOSED:
		if (old_state == RTRS_SRV_CLOSING)
			changed = true;
		break;
	default:
		break;
	}
	if (changed)
		srv_path->state = new_state;
	spin_unlock_irq(&srv_path->state_lock);

	return changed;
}

static void free_id(struct rtrs_srv_op *id)
{
	if (!id)
		return;
	kfree(id);
}

static void rtrs_srv_free_ops_ids(struct rtrs_srv_path *srv_path)
{
	struct rtrs_srv_sess *srv = srv_path->srv;
	int i;

	if (srv_path->ops_ids) {
		for (i = 0; i < srv->queue_depth; i++)
			free_id(srv_path->ops_ids[i]);
		kfree(srv_path->ops_ids);
		srv_path->ops_ids = NULL;
	}
}

static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc);

static struct ib_cqe io_comp_cqe = {
	.done = rtrs_srv_rdma_done
};

static inline void rtrs_srv_inflight_ref_release(struct percpu_ref *ref)
{
	struct rtrs_srv_path *srv_path = container_of(ref,
						      struct
rtrs_srv_path, 123 ids_inflight_ref); 124 125 percpu_ref_exit(&srv_path->ids_inflight_ref); 126 complete(&srv_path->complete_done); 127 } 128 129 static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_path *srv_path) 130 { 131 struct rtrs_srv_sess *srv = srv_path->srv; 132 struct rtrs_srv_op *id; 133 int i, ret; 134 135 srv_path->ops_ids = kcalloc(srv->queue_depth, 136 sizeof(*srv_path->ops_ids), 137 GFP_KERNEL); 138 if (!srv_path->ops_ids) 139 goto err; 140 141 for (i = 0; i < srv->queue_depth; ++i) { 142 id = kzalloc(sizeof(*id), GFP_KERNEL); 143 if (!id) 144 goto err; 145 146 srv_path->ops_ids[i] = id; 147 } 148 149 ret = percpu_ref_init(&srv_path->ids_inflight_ref, 150 rtrs_srv_inflight_ref_release, 0, GFP_KERNEL); 151 if (ret) { 152 pr_err("Percpu reference init failed\n"); 153 goto err; 154 } 155 init_completion(&srv_path->complete_done); 156 157 return 0; 158 159 err: 160 rtrs_srv_free_ops_ids(srv_path); 161 return -ENOMEM; 162 } 163 164 static inline void rtrs_srv_get_ops_ids(struct rtrs_srv_path *srv_path) 165 { 166 percpu_ref_get(&srv_path->ids_inflight_ref); 167 } 168 169 static inline void rtrs_srv_put_ops_ids(struct rtrs_srv_path *srv_path) 170 { 171 percpu_ref_put(&srv_path->ids_inflight_ref); 172 } 173 174 static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc) 175 { 176 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 177 struct rtrs_path *s = con->c.path; 178 struct rtrs_srv_path *srv_path = to_srv_path(s); 179 180 if (wc->status != IB_WC_SUCCESS) { 181 rtrs_err(s, "REG MR failed: %s\n", 182 ib_wc_status_msg(wc->status)); 183 close_path(srv_path); 184 return; 185 } 186 } 187 188 static struct ib_cqe local_reg_cqe = { 189 .done = rtrs_srv_reg_mr_done 190 }; 191 192 static int rdma_write_sg(struct rtrs_srv_op *id) 193 { 194 struct rtrs_path *s = id->con->c.path; 195 struct rtrs_srv_path *srv_path = to_srv_path(s); 196 dma_addr_t dma_addr = srv_path->dma_addr[id->msg_id]; 197 struct rtrs_srv_mr *srv_mr; 198 struct ib_send_wr inv_wr; 199 struct ib_rdma_wr imm_wr; 200 struct ib_rdma_wr *wr = NULL; 201 enum ib_send_flags flags; 202 size_t sg_cnt; 203 int err, offset; 204 bool need_inval; 205 u32 rkey = 0; 206 struct ib_reg_wr rwr; 207 struct ib_sge *plist; 208 struct ib_sge list; 209 210 sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt); 211 need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F; 212 if (sg_cnt != 1) 213 return -EINVAL; 214 215 offset = 0; 216 217 wr = &id->tx_wr; 218 plist = &id->tx_sg; 219 plist->addr = dma_addr + offset; 220 plist->length = le32_to_cpu(id->rd_msg->desc[0].len); 221 222 /* WR will fail with length error 223 * if this is 0 224 */ 225 if (plist->length == 0) { 226 rtrs_err(s, "Invalid RDMA-Write sg list length 0\n"); 227 return -EINVAL; 228 } 229 230 plist->lkey = srv_path->s.dev->ib_pd->local_dma_lkey; 231 offset += plist->length; 232 233 wr->wr.sg_list = plist; 234 wr->wr.num_sge = 1; 235 wr->remote_addr = le64_to_cpu(id->rd_msg->desc[0].addr); 236 wr->rkey = le32_to_cpu(id->rd_msg->desc[0].key); 237 if (rkey == 0) 238 rkey = wr->rkey; 239 else 240 /* Only one key is actually used */ 241 WARN_ON_ONCE(rkey != wr->rkey); 242 243 wr->wr.opcode = IB_WR_RDMA_WRITE; 244 wr->wr.wr_cqe = &io_comp_cqe; 245 wr->wr.ex.imm_data = 0; 246 wr->wr.send_flags = 0; 247 248 if (need_inval && always_invalidate) { 249 wr->wr.next = &rwr.wr; 250 rwr.wr.next = &inv_wr; 251 inv_wr.next = &imm_wr.wr; 252 } else if (always_invalidate) { 253 wr->wr.next = &rwr.wr; 254 rwr.wr.next = &imm_wr.wr; 255 } else if (need_inval) { 256 wr->wr.next = &inv_wr; 
257 inv_wr.next = &imm_wr.wr; 258 } else { 259 wr->wr.next = &imm_wr.wr; 260 } 261 /* 262 * From time to time we have to post signaled sends, 263 * or send queue will fill up and only QP reset can help. 264 */ 265 flags = (atomic_inc_return(&id->con->c.wr_cnt) % s->signal_interval) ? 266 0 : IB_SEND_SIGNALED; 267 268 if (need_inval) { 269 inv_wr.sg_list = NULL; 270 inv_wr.num_sge = 0; 271 inv_wr.opcode = IB_WR_SEND_WITH_INV; 272 inv_wr.wr_cqe = &io_comp_cqe; 273 inv_wr.send_flags = 0; 274 inv_wr.ex.invalidate_rkey = rkey; 275 } 276 277 imm_wr.wr.next = NULL; 278 if (always_invalidate) { 279 struct rtrs_msg_rkey_rsp *msg; 280 281 srv_mr = &srv_path->mrs[id->msg_id]; 282 rwr.wr.opcode = IB_WR_REG_MR; 283 rwr.wr.wr_cqe = &local_reg_cqe; 284 rwr.wr.num_sge = 0; 285 rwr.mr = srv_mr->mr; 286 rwr.wr.send_flags = 0; 287 rwr.key = srv_mr->mr->rkey; 288 rwr.access = (IB_ACCESS_LOCAL_WRITE | 289 IB_ACCESS_REMOTE_WRITE); 290 msg = srv_mr->iu->buf; 291 msg->buf_id = cpu_to_le16(id->msg_id); 292 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP); 293 msg->rkey = cpu_to_le32(srv_mr->mr->rkey); 294 295 list.addr = srv_mr->iu->dma_addr; 296 list.length = sizeof(*msg); 297 list.lkey = srv_path->s.dev->ib_pd->local_dma_lkey; 298 imm_wr.wr.sg_list = &list; 299 imm_wr.wr.num_sge = 1; 300 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM; 301 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, 302 srv_mr->iu->dma_addr, 303 srv_mr->iu->size, DMA_TO_DEVICE); 304 } else { 305 imm_wr.wr.sg_list = NULL; 306 imm_wr.wr.num_sge = 0; 307 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM; 308 } 309 imm_wr.wr.send_flags = flags; 310 imm_wr.wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id, 311 0, need_inval)); 312 313 imm_wr.wr.wr_cqe = &io_comp_cqe; 314 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, dma_addr, 315 offset, DMA_BIDIRECTIONAL); 316 317 err = ib_post_send(id->con->c.qp, &id->tx_wr.wr, NULL); 318 if (err) 319 rtrs_err(s, 320 "Posting RDMA-Write-Request to QP failed, err: %d\n", 321 err); 322 323 return err; 324 } 325 326 /** 327 * send_io_resp_imm() - respond to client with empty IMM on failed READ/WRITE 328 * requests or on successful WRITE request. 329 * @con: the connection to send back result 330 * @id: the id associated with the IO 331 * @errno: the error number of the IO. 332 * 333 * Return 0 on success, errno otherwise. 
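 *
 * The result is carried in the 32 bits of immediate data on the last WR
 * of the chain: rtrs_to_io_rsp_imm() packs the msg_id, the errno and the
 * need-invalidate flag (see the imm_data assignment below).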
334 */ 335 static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id, 336 int errno) 337 { 338 struct rtrs_path *s = con->c.path; 339 struct rtrs_srv_path *srv_path = to_srv_path(s); 340 struct ib_send_wr inv_wr, *wr = NULL; 341 struct ib_rdma_wr imm_wr; 342 struct ib_reg_wr rwr; 343 struct rtrs_srv_mr *srv_mr; 344 bool need_inval = false; 345 enum ib_send_flags flags; 346 u32 imm; 347 int err; 348 349 if (id->dir == READ) { 350 struct rtrs_msg_rdma_read *rd_msg = id->rd_msg; 351 size_t sg_cnt; 352 353 need_inval = le16_to_cpu(rd_msg->flags) & 354 RTRS_MSG_NEED_INVAL_F; 355 sg_cnt = le16_to_cpu(rd_msg->sg_cnt); 356 357 if (need_inval) { 358 if (sg_cnt) { 359 inv_wr.wr_cqe = &io_comp_cqe; 360 inv_wr.sg_list = NULL; 361 inv_wr.num_sge = 0; 362 inv_wr.opcode = IB_WR_SEND_WITH_INV; 363 inv_wr.send_flags = 0; 364 /* Only one key is actually used */ 365 inv_wr.ex.invalidate_rkey = 366 le32_to_cpu(rd_msg->desc[0].key); 367 } else { 368 WARN_ON_ONCE(1); 369 need_inval = false; 370 } 371 } 372 } 373 374 trace_send_io_resp_imm(id, need_inval, always_invalidate, errno); 375 376 if (need_inval && always_invalidate) { 377 wr = &inv_wr; 378 inv_wr.next = &rwr.wr; 379 rwr.wr.next = &imm_wr.wr; 380 } else if (always_invalidate) { 381 wr = &rwr.wr; 382 rwr.wr.next = &imm_wr.wr; 383 } else if (need_inval) { 384 wr = &inv_wr; 385 inv_wr.next = &imm_wr.wr; 386 } else { 387 wr = &imm_wr.wr; 388 } 389 /* 390 * From time to time we have to post signalled sends, 391 * or send queue will fill up and only QP reset can help. 392 */ 393 flags = (atomic_inc_return(&con->c.wr_cnt) % s->signal_interval) ? 394 0 : IB_SEND_SIGNALED; 395 imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval); 396 imm_wr.wr.next = NULL; 397 if (always_invalidate) { 398 struct ib_sge list; 399 struct rtrs_msg_rkey_rsp *msg; 400 401 srv_mr = &srv_path->mrs[id->msg_id]; 402 rwr.wr.next = &imm_wr.wr; 403 rwr.wr.opcode = IB_WR_REG_MR; 404 rwr.wr.wr_cqe = &local_reg_cqe; 405 rwr.wr.num_sge = 0; 406 rwr.wr.send_flags = 0; 407 rwr.mr = srv_mr->mr; 408 rwr.key = srv_mr->mr->rkey; 409 rwr.access = (IB_ACCESS_LOCAL_WRITE | 410 IB_ACCESS_REMOTE_WRITE); 411 msg = srv_mr->iu->buf; 412 msg->buf_id = cpu_to_le16(id->msg_id); 413 msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP); 414 msg->rkey = cpu_to_le32(srv_mr->mr->rkey); 415 416 list.addr = srv_mr->iu->dma_addr; 417 list.length = sizeof(*msg); 418 list.lkey = srv_path->s.dev->ib_pd->local_dma_lkey; 419 imm_wr.wr.sg_list = &list; 420 imm_wr.wr.num_sge = 1; 421 imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM; 422 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, 423 srv_mr->iu->dma_addr, 424 srv_mr->iu->size, DMA_TO_DEVICE); 425 } else { 426 imm_wr.wr.sg_list = NULL; 427 imm_wr.wr.num_sge = 0; 428 imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM; 429 } 430 imm_wr.wr.send_flags = flags; 431 imm_wr.wr.wr_cqe = &io_comp_cqe; 432 433 imm_wr.wr.ex.imm_data = cpu_to_be32(imm); 434 435 err = ib_post_send(id->con->c.qp, wr, NULL); 436 if (err) 437 rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %d\n", 438 err); 439 440 return err; 441 } 442 443 void close_path(struct rtrs_srv_path *srv_path) 444 { 445 if (rtrs_srv_change_state(srv_path, RTRS_SRV_CLOSING)) 446 queue_work(rtrs_wq, &srv_path->close_work); 447 WARN_ON(srv_path->state != RTRS_SRV_CLOSING); 448 } 449 450 static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state) 451 { 452 switch (state) { 453 case RTRS_SRV_CONNECTING: 454 return "RTRS_SRV_CONNECTING"; 455 case RTRS_SRV_CONNECTED: 456 return "RTRS_SRV_CONNECTED"; 457 case 
RTRS_SRV_CLOSING:
		return "RTRS_SRV_CLOSING";
	case RTRS_SRV_CLOSED:
		return "RTRS_SRV_CLOSED";
	default:
		return "UNKNOWN";
	}
}

/**
 * rtrs_srv_resp_rdma() - Finish an RDMA request
 *
 * @id: Internal RTRS operation identifier
 * @status: Response Code sent to the other side for this operation.
 *	    0 = success, < 0 error
 * Context: any
 *
 * Finish an RDMA operation. A message is sent to the client and the
 * corresponding memory areas will be released.
 */
bool rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status)
{
	struct rtrs_srv_path *srv_path;
	struct rtrs_srv_con *con;
	struct rtrs_path *s;
	int err;

	if (WARN_ON(!id))
		return true;

	con = id->con;
	s = con->c.path;
	srv_path = to_srv_path(s);

	id->status = status;

	if (srv_path->state != RTRS_SRV_CONNECTED) {
		rtrs_err_rl(s,
			    "Sending I/O response failed, server path %s is disconnected, path state %s\n",
			    kobject_name(&srv_path->kobj),
			    rtrs_srv_state_str(srv_path->state));
		goto out;
	}
	if (always_invalidate) {
		struct rtrs_srv_mr *mr = &srv_path->mrs[id->msg_id];

		ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey));
	}
	if (atomic_sub_return(1, &con->c.sq_wr_avail) < 0) {
		rtrs_err(s, "IB send queue full: srv_path=%s cid=%d\n",
			 kobject_name(&srv_path->kobj),
			 con->c.cid);
		atomic_add(1, &con->c.sq_wr_avail);
		spin_lock(&con->rsp_wr_wait_lock);
		list_add_tail(&id->wait_list, &con->rsp_wr_wait_list);
		spin_unlock(&con->rsp_wr_wait_lock);
		return false;
	}

	if (status || id->dir == WRITE || !id->rd_msg->sg_cnt)
		err = send_io_resp_imm(con, id, status);
	else
		err = rdma_write_sg(id);

	if (err) {
		rtrs_err_rl(s, "IO response failed: %d: srv_path=%s\n", err,
			    kobject_name(&srv_path->kobj));
		close_path(srv_path);
	}
out:
	rtrs_srv_put_ops_ids(srv_path);
	return true;
}
EXPORT_SYMBOL(rtrs_srv_resp_rdma);

/**
 * rtrs_srv_set_sess_priv() - Set private pointer in rtrs_srv.
 * @srv: Session pointer
 * @priv: The private pointer that is associated with the session.
 */
void rtrs_srv_set_sess_priv(struct rtrs_srv_sess *srv, void *priv)
{
	srv->priv = priv;
}
EXPORT_SYMBOL(rtrs_srv_set_sess_priv);

static void unmap_cont_bufs(struct rtrs_srv_path *srv_path)
{
	int i;

	for (i = 0; i < srv_path->mrs_num; i++) {
		struct rtrs_srv_mr *srv_mr;

		srv_mr = &srv_path->mrs[i];
		rtrs_iu_free(srv_mr->iu, srv_path->s.dev->ib_dev, 1);
		ib_dereg_mr(srv_mr->mr);
		ib_dma_unmap_sg(srv_path->s.dev->ib_dev, srv_mr->sgt.sgl,
				srv_mr->sgt.nents, DMA_BIDIRECTIONAL);
		sg_free_table(&srv_mr->sgt);
	}
	kfree(srv_path->mrs);
}

static int map_cont_bufs(struct rtrs_srv_path *srv_path)
{
	struct rtrs_srv_sess *srv = srv_path->srv;
	struct rtrs_path *ss = &srv_path->s;
	int i, mri, err, mrs_num;
	unsigned int chunk_bits;
	int chunks_per_mr = 1;

	/*
	 * Here we map queue_depth chunks to MRs. First we have to
	 * figure out how many chunks we can map per MR.
	 */
	if (always_invalidate) {
		/*
		 * In order to invalidate each chunk of memory separately,
		 * we need more memory regions.
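		 * That is one MR per chunk, i.e. mrs_num == queue_depth.
		 * Without always_invalidate several chunks share one MR:
		 * e.g. with a queue depth of 512 and a (hypothetical)
		 * max_fast_reg_page_list_len of 256 the calculation below
		 * works out to two MRs of 256 chunks each.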
576 */ 577 mrs_num = srv->queue_depth; 578 } else { 579 chunks_per_mr = 580 srv_path->s.dev->ib_dev->attrs.max_fast_reg_page_list_len; 581 mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr); 582 chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num); 583 } 584 585 srv_path->mrs = kcalloc(mrs_num, sizeof(*srv_path->mrs), GFP_KERNEL); 586 if (!srv_path->mrs) 587 return -ENOMEM; 588 589 srv_path->mrs_num = mrs_num; 590 591 for (mri = 0; mri < mrs_num; mri++) { 592 struct rtrs_srv_mr *srv_mr = &srv_path->mrs[mri]; 593 struct sg_table *sgt = &srv_mr->sgt; 594 struct scatterlist *s; 595 struct ib_mr *mr; 596 int nr, nr_sgt, chunks; 597 598 chunks = chunks_per_mr * mri; 599 if (!always_invalidate) 600 chunks_per_mr = min_t(int, chunks_per_mr, 601 srv->queue_depth - chunks); 602 603 err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL); 604 if (err) 605 goto err; 606 607 for_each_sg(sgt->sgl, s, chunks_per_mr, i) 608 sg_set_page(s, srv->chunks[chunks + i], 609 max_chunk_size, 0); 610 611 nr_sgt = ib_dma_map_sg(srv_path->s.dev->ib_dev, sgt->sgl, 612 sgt->nents, DMA_BIDIRECTIONAL); 613 if (!nr_sgt) { 614 err = -EINVAL; 615 goto free_sg; 616 } 617 mr = ib_alloc_mr(srv_path->s.dev->ib_pd, IB_MR_TYPE_MEM_REG, 618 nr_sgt); 619 if (IS_ERR(mr)) { 620 err = PTR_ERR(mr); 621 goto unmap_sg; 622 } 623 nr = ib_map_mr_sg(mr, sgt->sgl, nr_sgt, 624 NULL, max_chunk_size); 625 if (nr < 0 || nr < sgt->nents) { 626 err = nr < 0 ? nr : -EINVAL; 627 goto dereg_mr; 628 } 629 630 if (always_invalidate) { 631 srv_mr->iu = rtrs_iu_alloc(1, 632 sizeof(struct rtrs_msg_rkey_rsp), 633 GFP_KERNEL, srv_path->s.dev->ib_dev, 634 DMA_TO_DEVICE, rtrs_srv_rdma_done); 635 if (!srv_mr->iu) { 636 err = -ENOMEM; 637 rtrs_err(ss, "rtrs_iu_alloc(), err: %d\n", err); 638 goto dereg_mr; 639 } 640 } 641 /* Eventually dma addr for each chunk can be cached */ 642 for_each_sg(sgt->sgl, s, nr_sgt, i) 643 srv_path->dma_addr[chunks + i] = sg_dma_address(s); 644 645 ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey)); 646 srv_mr->mr = mr; 647 648 continue; 649 err: 650 while (mri--) { 651 srv_mr = &srv_path->mrs[mri]; 652 sgt = &srv_mr->sgt; 653 mr = srv_mr->mr; 654 rtrs_iu_free(srv_mr->iu, srv_path->s.dev->ib_dev, 1); 655 dereg_mr: 656 ib_dereg_mr(mr); 657 unmap_sg: 658 ib_dma_unmap_sg(srv_path->s.dev->ib_dev, sgt->sgl, 659 sgt->nents, DMA_BIDIRECTIONAL); 660 free_sg: 661 sg_free_table(sgt); 662 } 663 kfree(srv_path->mrs); 664 665 return err; 666 } 667 668 chunk_bits = ilog2(srv->queue_depth - 1) + 1; 669 srv_path->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits); 670 671 return 0; 672 } 673 674 static void rtrs_srv_hb_err_handler(struct rtrs_con *c) 675 { 676 close_path(to_srv_path(c->path)); 677 } 678 679 static void rtrs_srv_init_hb(struct rtrs_srv_path *srv_path) 680 { 681 rtrs_init_hb(&srv_path->s, &io_comp_cqe, 682 RTRS_HB_INTERVAL_MS, 683 RTRS_HB_MISSED_MAX, 684 rtrs_srv_hb_err_handler, 685 rtrs_wq); 686 } 687 688 static void rtrs_srv_start_hb(struct rtrs_srv_path *srv_path) 689 { 690 rtrs_start_hb(&srv_path->s); 691 } 692 693 static void rtrs_srv_stop_hb(struct rtrs_srv_path *srv_path) 694 { 695 rtrs_stop_hb(&srv_path->s); 696 } 697 698 static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc) 699 { 700 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 701 struct rtrs_path *s = con->c.path; 702 struct rtrs_srv_path *srv_path = to_srv_path(s); 703 struct rtrs_iu *iu; 704 705 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 706 rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1); 707 708 if (wc->status != IB_WC_SUCCESS) { 709 
rtrs_err(s, "Sess info response send failed: %s\n", 710 ib_wc_status_msg(wc->status)); 711 close_path(srv_path); 712 return; 713 } 714 WARN_ON(wc->opcode != IB_WC_SEND); 715 } 716 717 static void rtrs_srv_path_up(struct rtrs_srv_path *srv_path) 718 { 719 struct rtrs_srv_sess *srv = srv_path->srv; 720 struct rtrs_srv_ctx *ctx = srv->ctx; 721 int up; 722 723 mutex_lock(&srv->paths_ev_mutex); 724 up = ++srv->paths_up; 725 if (up == 1) 726 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL); 727 mutex_unlock(&srv->paths_ev_mutex); 728 729 /* Mark session as established */ 730 srv_path->established = true; 731 } 732 733 static void rtrs_srv_path_down(struct rtrs_srv_path *srv_path) 734 { 735 struct rtrs_srv_sess *srv = srv_path->srv; 736 struct rtrs_srv_ctx *ctx = srv->ctx; 737 738 if (!srv_path->established) 739 return; 740 741 srv_path->established = false; 742 mutex_lock(&srv->paths_ev_mutex); 743 WARN_ON(!srv->paths_up); 744 if (--srv->paths_up == 0) 745 ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv); 746 mutex_unlock(&srv->paths_ev_mutex); 747 } 748 749 static bool exist_pathname(struct rtrs_srv_ctx *ctx, 750 const char *pathname, const uuid_t *path_uuid) 751 { 752 struct rtrs_srv_sess *srv; 753 struct rtrs_srv_path *srv_path; 754 bool found = false; 755 756 mutex_lock(&ctx->srv_mutex); 757 list_for_each_entry(srv, &ctx->srv_list, ctx_list) { 758 mutex_lock(&srv->paths_mutex); 759 760 /* when a client with same uuid and same sessname tried to add a path */ 761 if (uuid_equal(&srv->paths_uuid, path_uuid)) { 762 mutex_unlock(&srv->paths_mutex); 763 continue; 764 } 765 766 list_for_each_entry(srv_path, &srv->paths_list, s.entry) { 767 if (strlen(srv_path->s.sessname) == strlen(pathname) && 768 !strcmp(srv_path->s.sessname, pathname)) { 769 found = true; 770 break; 771 } 772 } 773 mutex_unlock(&srv->paths_mutex); 774 if (found) 775 break; 776 } 777 mutex_unlock(&ctx->srv_mutex); 778 return found; 779 } 780 781 static int post_recv_path(struct rtrs_srv_path *srv_path); 782 static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno); 783 784 static int process_info_req(struct rtrs_srv_con *con, 785 struct rtrs_msg_info_req *msg) 786 { 787 struct rtrs_path *s = con->c.path; 788 struct rtrs_srv_path *srv_path = to_srv_path(s); 789 struct ib_send_wr *reg_wr = NULL; 790 struct rtrs_msg_info_rsp *rsp; 791 struct rtrs_iu *tx_iu; 792 struct ib_reg_wr *rwr; 793 int mri, err; 794 size_t tx_sz; 795 796 err = post_recv_path(srv_path); 797 if (err) { 798 rtrs_err(s, "post_recv_path(), err: %d\n", err); 799 return err; 800 } 801 802 if (strchr(msg->pathname, '/') || strchr(msg->pathname, '.')) { 803 rtrs_err(s, "pathname cannot contain / and .\n"); 804 return -EINVAL; 805 } 806 807 if (exist_pathname(srv_path->srv->ctx, 808 msg->pathname, &srv_path->srv->paths_uuid)) { 809 rtrs_err(s, "pathname is duplicated: %s\n", msg->pathname); 810 return -EPERM; 811 } 812 strscpy(srv_path->s.sessname, msg->pathname, 813 sizeof(srv_path->s.sessname)); 814 815 rwr = kcalloc(srv_path->mrs_num, sizeof(*rwr), GFP_KERNEL); 816 if (!rwr) 817 return -ENOMEM; 818 819 tx_sz = sizeof(*rsp); 820 tx_sz += sizeof(rsp->desc[0]) * srv_path->mrs_num; 821 tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, srv_path->s.dev->ib_dev, 822 DMA_TO_DEVICE, rtrs_srv_info_rsp_done); 823 if (!tx_iu) { 824 err = -ENOMEM; 825 goto rwr_free; 826 } 827 828 rsp = tx_iu->buf; 829 rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP); 830 rsp->sg_cnt = cpu_to_le16(srv_path->mrs_num); 831 832 for (mri = 0; mri < srv_path->mrs_num; mri++) { 833 
struct ib_mr *mr = srv_path->mrs[mri].mr; 834 835 rsp->desc[mri].addr = cpu_to_le64(mr->iova); 836 rsp->desc[mri].key = cpu_to_le32(mr->rkey); 837 rsp->desc[mri].len = cpu_to_le32(mr->length); 838 839 /* 840 * Fill in reg MR request and chain them *backwards* 841 */ 842 rwr[mri].wr.next = mri ? &rwr[mri - 1].wr : NULL; 843 rwr[mri].wr.opcode = IB_WR_REG_MR; 844 rwr[mri].wr.wr_cqe = &local_reg_cqe; 845 rwr[mri].wr.num_sge = 0; 846 rwr[mri].wr.send_flags = 0; 847 rwr[mri].mr = mr; 848 rwr[mri].key = mr->rkey; 849 rwr[mri].access = (IB_ACCESS_LOCAL_WRITE | 850 IB_ACCESS_REMOTE_WRITE); 851 reg_wr = &rwr[mri].wr; 852 } 853 854 err = rtrs_srv_create_path_files(srv_path); 855 if (err) 856 goto iu_free; 857 kobject_get(&srv_path->kobj); 858 get_device(&srv_path->srv->dev); 859 rtrs_srv_change_state(srv_path, RTRS_SRV_CONNECTED); 860 rtrs_srv_start_hb(srv_path); 861 862 /* 863 * We do not account number of established connections at the current 864 * moment, we rely on the client, which should send info request when 865 * all connections are successfully established. Thus, simply notify 866 * listener with a proper event if we are the first path. 867 */ 868 rtrs_srv_path_up(srv_path); 869 870 ib_dma_sync_single_for_device(srv_path->s.dev->ib_dev, 871 tx_iu->dma_addr, 872 tx_iu->size, DMA_TO_DEVICE); 873 874 /* Send info response */ 875 err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr); 876 if (err) { 877 rtrs_err(s, "rtrs_iu_post_send(), err: %d\n", err); 878 iu_free: 879 rtrs_iu_free(tx_iu, srv_path->s.dev->ib_dev, 1); 880 } 881 rwr_free: 882 kfree(rwr); 883 884 return err; 885 } 886 887 static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc) 888 { 889 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 890 struct rtrs_path *s = con->c.path; 891 struct rtrs_srv_path *srv_path = to_srv_path(s); 892 struct rtrs_msg_info_req *msg; 893 struct rtrs_iu *iu; 894 int err; 895 896 WARN_ON(con->c.cid); 897 898 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe); 899 if (wc->status != IB_WC_SUCCESS) { 900 rtrs_err(s, "Sess info request receive failed: %s\n", 901 ib_wc_status_msg(wc->status)); 902 goto close; 903 } 904 WARN_ON(wc->opcode != IB_WC_RECV); 905 906 if (wc->byte_len < sizeof(*msg)) { 907 rtrs_err(s, "Sess info request is malformed: size %d\n", 908 wc->byte_len); 909 goto close; 910 } 911 ib_dma_sync_single_for_cpu(srv_path->s.dev->ib_dev, iu->dma_addr, 912 iu->size, DMA_FROM_DEVICE); 913 msg = iu->buf; 914 if (le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ) { 915 rtrs_err(s, "Sess info request is malformed: type %d\n", 916 le16_to_cpu(msg->type)); 917 goto close; 918 } 919 err = process_info_req(con, msg); 920 if (err) 921 goto close; 922 923 out: 924 rtrs_iu_free(iu, srv_path->s.dev->ib_dev, 1); 925 return; 926 close: 927 close_path(srv_path); 928 goto out; 929 } 930 931 static int post_recv_info_req(struct rtrs_srv_con *con) 932 { 933 struct rtrs_path *s = con->c.path; 934 struct rtrs_srv_path *srv_path = to_srv_path(s); 935 struct rtrs_iu *rx_iu; 936 int err; 937 938 rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), 939 GFP_KERNEL, srv_path->s.dev->ib_dev, 940 DMA_FROM_DEVICE, rtrs_srv_info_req_done); 941 if (!rx_iu) 942 return -ENOMEM; 943 /* Prepare for getting info response */ 944 err = rtrs_iu_post_recv(&con->c, rx_iu); 945 if (err) { 946 rtrs_err(s, "rtrs_iu_post_recv(), err: %d\n", err); 947 rtrs_iu_free(rx_iu, srv_path->s.dev->ib_dev, 1); 948 return err; 949 } 950 951 return 0; 952 } 953 954 static int post_recv_io(struct rtrs_srv_con *con, size_t q_size) 
955 { 956 int i, err; 957 958 for (i = 0; i < q_size; i++) { 959 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 960 if (err) 961 return err; 962 } 963 964 return 0; 965 } 966 967 static int post_recv_path(struct rtrs_srv_path *srv_path) 968 { 969 struct rtrs_srv_sess *srv = srv_path->srv; 970 struct rtrs_path *s = &srv_path->s; 971 size_t q_size; 972 int err, cid; 973 974 for (cid = 0; cid < srv_path->s.con_num; cid++) { 975 if (cid == 0) 976 q_size = SERVICE_CON_QUEUE_DEPTH; 977 else 978 q_size = srv->queue_depth; 979 980 err = post_recv_io(to_srv_con(srv_path->s.con[cid]), q_size); 981 if (err) { 982 rtrs_err(s, "post_recv_io(), err: %d\n", err); 983 return err; 984 } 985 } 986 987 return 0; 988 } 989 990 static void process_read(struct rtrs_srv_con *con, 991 struct rtrs_msg_rdma_read *msg, 992 u32 buf_id, u32 off) 993 { 994 struct rtrs_path *s = con->c.path; 995 struct rtrs_srv_path *srv_path = to_srv_path(s); 996 struct rtrs_srv_sess *srv = srv_path->srv; 997 struct rtrs_srv_ctx *ctx = srv->ctx; 998 struct rtrs_srv_op *id; 999 1000 size_t usr_len, data_len; 1001 void *data; 1002 int ret; 1003 1004 if (srv_path->state != RTRS_SRV_CONNECTED) { 1005 rtrs_err_rl(s, 1006 "Processing read request failed, session is disconnected, sess state %s\n", 1007 rtrs_srv_state_str(srv_path->state)); 1008 return; 1009 } 1010 if (msg->sg_cnt != 1 && msg->sg_cnt != 0) { 1011 rtrs_err_rl(s, 1012 "Processing read request failed, invalid message\n"); 1013 return; 1014 } 1015 rtrs_srv_get_ops_ids(srv_path); 1016 rtrs_srv_update_rdma_stats(srv_path->stats, off, READ); 1017 id = srv_path->ops_ids[buf_id]; 1018 id->con = con; 1019 id->dir = READ; 1020 id->msg_id = buf_id; 1021 id->rd_msg = msg; 1022 usr_len = le16_to_cpu(msg->usr_len); 1023 data_len = off - usr_len; 1024 data = page_address(srv->chunks[buf_id]); 1025 ret = ctx->ops.rdma_ev(srv->priv, id, data, data_len, 1026 data + data_len, usr_len); 1027 1028 if (ret) { 1029 rtrs_err_rl(s, 1030 "Processing read request failed, user module cb reported for msg_id %d, err: %d\n", 1031 buf_id, ret); 1032 goto send_err_msg; 1033 } 1034 1035 return; 1036 1037 send_err_msg: 1038 ret = send_io_resp_imm(con, id, ret); 1039 if (ret < 0) { 1040 rtrs_err_rl(s, 1041 "Sending err msg for failed RDMA-Write-Req failed, msg_id %d, err: %d\n", 1042 buf_id, ret); 1043 close_path(srv_path); 1044 } 1045 rtrs_srv_put_ops_ids(srv_path); 1046 } 1047 1048 static void process_write(struct rtrs_srv_con *con, 1049 struct rtrs_msg_rdma_write *req, 1050 u32 buf_id, u32 off) 1051 { 1052 struct rtrs_path *s = con->c.path; 1053 struct rtrs_srv_path *srv_path = to_srv_path(s); 1054 struct rtrs_srv_sess *srv = srv_path->srv; 1055 struct rtrs_srv_ctx *ctx = srv->ctx; 1056 struct rtrs_srv_op *id; 1057 1058 size_t data_len, usr_len; 1059 void *data; 1060 int ret; 1061 1062 if (srv_path->state != RTRS_SRV_CONNECTED) { 1063 rtrs_err_rl(s, 1064 "Processing write request failed, session is disconnected, sess state %s\n", 1065 rtrs_srv_state_str(srv_path->state)); 1066 return; 1067 } 1068 rtrs_srv_get_ops_ids(srv_path); 1069 rtrs_srv_update_rdma_stats(srv_path->stats, off, WRITE); 1070 id = srv_path->ops_ids[buf_id]; 1071 id->con = con; 1072 id->dir = WRITE; 1073 id->msg_id = buf_id; 1074 1075 usr_len = le16_to_cpu(req->usr_len); 1076 data_len = off - usr_len; 1077 data = page_address(srv->chunks[buf_id]); 1078 ret = ctx->ops.rdma_ev(srv->priv, id, data, data_len, 1079 data + data_len, usr_len); 1080 if (ret) { 1081 rtrs_err_rl(s, 1082 "Processing write request failed, user module callback reports 
err: %d\n", 1083 ret); 1084 goto send_err_msg; 1085 } 1086 1087 return; 1088 1089 send_err_msg: 1090 ret = send_io_resp_imm(con, id, ret); 1091 if (ret < 0) { 1092 rtrs_err_rl(s, 1093 "Processing write request failed, sending I/O response failed, msg_id %d, err: %d\n", 1094 buf_id, ret); 1095 close_path(srv_path); 1096 } 1097 rtrs_srv_put_ops_ids(srv_path); 1098 } 1099 1100 static void process_io_req(struct rtrs_srv_con *con, void *msg, 1101 u32 id, u32 off) 1102 { 1103 struct rtrs_path *s = con->c.path; 1104 struct rtrs_srv_path *srv_path = to_srv_path(s); 1105 struct rtrs_msg_rdma_hdr *hdr; 1106 unsigned int type; 1107 1108 ib_dma_sync_single_for_cpu(srv_path->s.dev->ib_dev, 1109 srv_path->dma_addr[id], 1110 max_chunk_size, DMA_BIDIRECTIONAL); 1111 hdr = msg; 1112 type = le16_to_cpu(hdr->type); 1113 1114 switch (type) { 1115 case RTRS_MSG_WRITE: 1116 process_write(con, msg, id, off); 1117 break; 1118 case RTRS_MSG_READ: 1119 process_read(con, msg, id, off); 1120 break; 1121 default: 1122 rtrs_err(s, 1123 "Processing I/O request failed, unknown message type received: 0x%02x\n", 1124 type); 1125 goto err; 1126 } 1127 1128 return; 1129 1130 err: 1131 close_path(srv_path); 1132 } 1133 1134 static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc) 1135 { 1136 struct rtrs_srv_mr *mr = 1137 container_of(wc->wr_cqe, typeof(*mr), inv_cqe); 1138 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 1139 struct rtrs_path *s = con->c.path; 1140 struct rtrs_srv_path *srv_path = to_srv_path(s); 1141 struct rtrs_srv_sess *srv = srv_path->srv; 1142 u32 msg_id, off; 1143 void *data; 1144 1145 if (wc->status != IB_WC_SUCCESS) { 1146 rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n", 1147 ib_wc_status_msg(wc->status)); 1148 close_path(srv_path); 1149 } 1150 msg_id = mr->msg_id; 1151 off = mr->msg_off; 1152 data = page_address(srv->chunks[msg_id]) + off; 1153 process_io_req(con, data, msg_id, off); 1154 } 1155 1156 static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con, 1157 struct rtrs_srv_mr *mr) 1158 { 1159 struct ib_send_wr wr = { 1160 .opcode = IB_WR_LOCAL_INV, 1161 .wr_cqe = &mr->inv_cqe, 1162 .send_flags = IB_SEND_SIGNALED, 1163 .ex.invalidate_rkey = mr->mr->rkey, 1164 }; 1165 mr->inv_cqe.done = rtrs_srv_inv_rkey_done; 1166 1167 return ib_post_send(con->c.qp, &wr, NULL); 1168 } 1169 1170 static void rtrs_rdma_process_wr_wait_list(struct rtrs_srv_con *con) 1171 { 1172 spin_lock(&con->rsp_wr_wait_lock); 1173 while (!list_empty(&con->rsp_wr_wait_list)) { 1174 struct rtrs_srv_op *id; 1175 int ret; 1176 1177 id = list_entry(con->rsp_wr_wait_list.next, 1178 struct rtrs_srv_op, wait_list); 1179 list_del(&id->wait_list); 1180 1181 spin_unlock(&con->rsp_wr_wait_lock); 1182 ret = rtrs_srv_resp_rdma(id, id->status); 1183 spin_lock(&con->rsp_wr_wait_lock); 1184 1185 if (!ret) { 1186 list_add(&id->wait_list, &con->rsp_wr_wait_list); 1187 break; 1188 } 1189 } 1190 spin_unlock(&con->rsp_wr_wait_lock); 1191 } 1192 1193 static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc) 1194 { 1195 struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context); 1196 struct rtrs_path *s = con->c.path; 1197 struct rtrs_srv_path *srv_path = to_srv_path(s); 1198 struct rtrs_srv_sess *srv = srv_path->srv; 1199 u32 imm_type, imm_payload; 1200 int err; 1201 1202 if (wc->status != IB_WC_SUCCESS) { 1203 if (wc->status != IB_WC_WR_FLUSH_ERR) { 1204 rtrs_err(s, 1205 "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n", 1206 ib_wc_status_msg(wc->status), wc->wr_cqe, 1207 wc->opcode, wc->vendor_err, wc->byte_len); 1208 
close_path(srv_path); 1209 } 1210 return; 1211 } 1212 1213 switch (wc->opcode) { 1214 case IB_WC_RECV_RDMA_WITH_IMM: 1215 /* 1216 * post_recv() RDMA write completions of IO reqs (read/write) 1217 * and hb 1218 */ 1219 if (WARN_ON(wc->wr_cqe != &io_comp_cqe)) 1220 return; 1221 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe); 1222 if (err) { 1223 rtrs_err(s, "rtrs_post_recv(), err: %d\n", err); 1224 close_path(srv_path); 1225 break; 1226 } 1227 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), 1228 &imm_type, &imm_payload); 1229 if (imm_type == RTRS_IO_REQ_IMM) { 1230 u32 msg_id, off; 1231 void *data; 1232 1233 msg_id = imm_payload >> srv_path->mem_bits; 1234 off = imm_payload & ((1 << srv_path->mem_bits) - 1); 1235 if (msg_id >= srv->queue_depth || off >= max_chunk_size) { 1236 rtrs_err(s, "Wrong msg_id %u, off %u\n", 1237 msg_id, off); 1238 close_path(srv_path); 1239 return; 1240 } 1241 if (always_invalidate) { 1242 struct rtrs_srv_mr *mr = &srv_path->mrs[msg_id]; 1243 1244 mr->msg_off = off; 1245 mr->msg_id = msg_id; 1246 err = rtrs_srv_inv_rkey(con, mr); 1247 if (err) { 1248 rtrs_err(s, "rtrs_post_recv(), err: %d\n", 1249 err); 1250 close_path(srv_path); 1251 break; 1252 } 1253 } else { 1254 data = page_address(srv->chunks[msg_id]) + off; 1255 process_io_req(con, data, msg_id, off); 1256 } 1257 } else if (imm_type == RTRS_HB_MSG_IMM) { 1258 WARN_ON(con->c.cid); 1259 rtrs_send_hb_ack(&srv_path->s); 1260 } else if (imm_type == RTRS_HB_ACK_IMM) { 1261 WARN_ON(con->c.cid); 1262 srv_path->s.hb_missed_cnt = 0; 1263 } else { 1264 rtrs_wrn(s, "Unknown IMM type %u\n", imm_type); 1265 } 1266 break; 1267 case IB_WC_RDMA_WRITE: 1268 case IB_WC_SEND: 1269 /* 1270 * post_send() RDMA write completions of IO reqs (read/write) 1271 * and hb. 1272 */ 1273 atomic_add(s->signal_interval, &con->c.sq_wr_avail); 1274 1275 if (!list_empty_careful(&con->rsp_wr_wait_list)) 1276 rtrs_rdma_process_wr_wait_list(con); 1277 1278 break; 1279 default: 1280 rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode); 1281 return; 1282 } 1283 } 1284 1285 /** 1286 * rtrs_srv_get_path_name() - Get rtrs_srv peer hostname. 1287 * @srv: Session 1288 * @pathname: Pathname buffer 1289 * @len: Length of sessname buffer 1290 */ 1291 int rtrs_srv_get_path_name(struct rtrs_srv_sess *srv, char *pathname, 1292 size_t len) 1293 { 1294 struct rtrs_srv_path *srv_path; 1295 int err = -ENOTCONN; 1296 1297 mutex_lock(&srv->paths_mutex); 1298 list_for_each_entry(srv_path, &srv->paths_list, s.entry) { 1299 if (srv_path->state != RTRS_SRV_CONNECTED) 1300 continue; 1301 strscpy(pathname, srv_path->s.sessname, 1302 min_t(size_t, sizeof(srv_path->s.sessname), len)); 1303 err = 0; 1304 break; 1305 } 1306 mutex_unlock(&srv->paths_mutex); 1307 1308 return err; 1309 } 1310 EXPORT_SYMBOL(rtrs_srv_get_path_name); 1311 1312 /** 1313 * rtrs_srv_get_queue_depth() - Get rtrs_srv qdepth. 
1314 * @srv: Session 1315 */ 1316 int rtrs_srv_get_queue_depth(struct rtrs_srv_sess *srv) 1317 { 1318 return srv->queue_depth; 1319 } 1320 EXPORT_SYMBOL(rtrs_srv_get_queue_depth); 1321 1322 static int find_next_bit_ring(struct rtrs_srv_path *srv_path) 1323 { 1324 struct ib_device *ib_dev = srv_path->s.dev->ib_dev; 1325 int v; 1326 1327 v = cpumask_next(srv_path->cur_cq_vector, &cq_affinity_mask); 1328 if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors) 1329 v = cpumask_first(&cq_affinity_mask); 1330 return v; 1331 } 1332 1333 static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_path *srv_path) 1334 { 1335 srv_path->cur_cq_vector = find_next_bit_ring(srv_path); 1336 1337 return srv_path->cur_cq_vector; 1338 } 1339 1340 static void rtrs_srv_dev_release(struct device *dev) 1341 { 1342 struct rtrs_srv_sess *srv = container_of(dev, struct rtrs_srv_sess, 1343 dev); 1344 1345 kfree(srv); 1346 } 1347 1348 static void free_srv(struct rtrs_srv_sess *srv) 1349 { 1350 int i; 1351 1352 WARN_ON(refcount_read(&srv->refcount)); 1353 for (i = 0; i < srv->queue_depth; i++) 1354 __free_pages(srv->chunks[i], get_order(max_chunk_size)); 1355 kfree(srv->chunks); 1356 mutex_destroy(&srv->paths_mutex); 1357 mutex_destroy(&srv->paths_ev_mutex); 1358 /* last put to release the srv structure */ 1359 put_device(&srv->dev); 1360 } 1361 1362 static struct rtrs_srv_sess *get_or_create_srv(struct rtrs_srv_ctx *ctx, 1363 const uuid_t *paths_uuid, 1364 bool first_conn) 1365 { 1366 struct rtrs_srv_sess *srv; 1367 int i; 1368 1369 mutex_lock(&ctx->srv_mutex); 1370 list_for_each_entry(srv, &ctx->srv_list, ctx_list) { 1371 if (uuid_equal(&srv->paths_uuid, paths_uuid) && 1372 refcount_inc_not_zero(&srv->refcount)) { 1373 mutex_unlock(&ctx->srv_mutex); 1374 return srv; 1375 } 1376 } 1377 mutex_unlock(&ctx->srv_mutex); 1378 /* 1379 * If this request is not the first connection request from the 1380 * client for this session then fail and return error. 
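	 * All paths of one client share a single rtrs_srv_sess keyed by
	 * paths_uuid (looked up above), so a non-first connection must
	 * always find an existing entry.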
	 */
	if (!first_conn) {
		pr_err_ratelimited("Error: Not the first connection request for this session\n");
		return ERR_PTR(-ENXIO);
	}

	/* need to allocate a new srv */
	srv = kzalloc(sizeof(*srv), GFP_KERNEL);
	if (!srv)
		return ERR_PTR(-ENOMEM);

	INIT_LIST_HEAD(&srv->paths_list);
	mutex_init(&srv->paths_mutex);
	mutex_init(&srv->paths_ev_mutex);
	uuid_copy(&srv->paths_uuid, paths_uuid);
	srv->queue_depth = sess_queue_depth;
	srv->ctx = ctx;
	device_initialize(&srv->dev);
	srv->dev.release = rtrs_srv_dev_release;

	srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks),
			      GFP_KERNEL);
	if (!srv->chunks)
		goto err_free_srv;

	for (i = 0; i < srv->queue_depth; i++) {
		srv->chunks[i] = alloc_pages(GFP_KERNEL,
					     get_order(max_chunk_size));
		if (!srv->chunks[i])
			goto err_free_chunks;
	}
	refcount_set(&srv->refcount, 1);
	mutex_lock(&ctx->srv_mutex);
	list_add(&srv->ctx_list, &ctx->srv_list);
	mutex_unlock(&ctx->srv_mutex);

	return srv;

err_free_chunks:
	while (i--)
		__free_pages(srv->chunks[i], get_order(max_chunk_size));
	kfree(srv->chunks);

err_free_srv:
	kfree(srv);
	return ERR_PTR(-ENOMEM);
}

static void put_srv(struct rtrs_srv_sess *srv)
{
	if (refcount_dec_and_test(&srv->refcount)) {
		struct rtrs_srv_ctx *ctx = srv->ctx;

		WARN_ON(srv->dev.kobj.state_in_sysfs);

		mutex_lock(&ctx->srv_mutex);
		list_del(&srv->ctx_list);
		mutex_unlock(&ctx->srv_mutex);
		free_srv(srv);
	}
}

static void __add_path_to_srv(struct rtrs_srv_sess *srv,
			      struct rtrs_srv_path *srv_path)
{
	list_add_tail(&srv_path->s.entry, &srv->paths_list);
	srv->paths_num++;
	WARN_ON(srv->paths_num >= MAX_PATHS_NUM);
}

static void del_path_from_srv(struct rtrs_srv_path *srv_path)
{
	struct rtrs_srv_sess *srv = srv_path->srv;

	if (WARN_ON(!srv))
		return;

	mutex_lock(&srv->paths_mutex);
	list_del(&srv_path->s.entry);
	WARN_ON(!srv->paths_num);
	srv->paths_num--;
	mutex_unlock(&srv->paths_mutex);
}

/* return 0 if the addresses are the same, non-zero or an error otherwise */
static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b)
{
	switch (a->sa_family) {
	case AF_IB:
		return memcmp(&((struct sockaddr_ib *)a)->sib_addr,
			      &((struct sockaddr_ib *)b)->sib_addr,
			      sizeof(struct ib_addr)) &&
			(b->sa_family == AF_IB);
	case AF_INET:
		return memcmp(&((struct sockaddr_in *)a)->sin_addr,
			      &((struct sockaddr_in *)b)->sin_addr,
			      sizeof(struct in_addr)) &&
			(b->sa_family == AF_INET);
	case AF_INET6:
		return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr,
			      &((struct sockaddr_in6 *)b)->sin6_addr,
			      sizeof(struct in6_addr)) &&
			(b->sa_family == AF_INET6);
	default:
		return -ENOENT;
	}
}

static bool __is_path_w_addr_exists(struct rtrs_srv_sess *srv,
				    struct rdma_addr *addr)
{
	struct rtrs_srv_path *srv_path;

	list_for_each_entry(srv_path, &srv->paths_list, s.entry)
		if (!sockaddr_cmp((struct sockaddr *)&srv_path->s.dst_addr,
				  (struct sockaddr *)&addr->dst_addr) &&
		    !sockaddr_cmp((struct sockaddr *)&srv_path->s.src_addr,
				  (struct sockaddr *)&addr->src_addr))
			return true;

	return false;
}

static void free_path(struct rtrs_srv_path *srv_path)
{
	if (srv_path->kobj.state_in_sysfs) {
		kobject_del(&srv_path->kobj);
		kobject_put(&srv_path->kobj);
	} else {
		free_percpu(srv_path->stats->rdma_stats);
		kfree(srv_path->stats);
		kfree(srv_path);
	}
}

static void rtrs_srv_close_work(struct work_struct *work)
{
	struct rtrs_srv_path *srv_path;
	struct rtrs_srv_con *con;
	int i;

	srv_path = container_of(work, typeof(*srv_path), close_work);

	rtrs_srv_destroy_path_files(srv_path);
	rtrs_srv_stop_hb(srv_path);

	for (i = 0; i < srv_path->s.con_num; i++) {
		if (!srv_path->s.con[i])
			continue;
		con = to_srv_con(srv_path->s.con[i]);
		rdma_disconnect(con->c.cm_id);
		ib_drain_qp(con->c.qp);
	}

	/*
	 * Degrade ref count to the usual model with a single shared
	 * atomic_t counter
	 */
	percpu_ref_kill(&srv_path->ids_inflight_ref);

	/* Wait for all completions */
	wait_for_completion(&srv_path->complete_done);

	/* Notify upper layer if we are the last path */
	rtrs_srv_path_down(srv_path);

	unmap_cont_bufs(srv_path);
	rtrs_srv_free_ops_ids(srv_path);

	for (i = 0; i < srv_path->s.con_num; i++) {
		if (!srv_path->s.con[i])
			continue;
		con = to_srv_con(srv_path->s.con[i]);
		rtrs_cq_qp_destroy(&con->c);
		rdma_destroy_id(con->c.cm_id);
		kfree(con);
	}
	rtrs_ib_dev_put(srv_path->s.dev);

	del_path_from_srv(srv_path);
	put_srv(srv_path->srv);
	srv_path->srv = NULL;
	rtrs_srv_change_state(srv_path, RTRS_SRV_CLOSED);

	kfree(srv_path->dma_addr);
	kfree(srv_path->s.con);
	free_path(srv_path);
}

static int rtrs_rdma_do_accept(struct rtrs_srv_path *srv_path,
			       struct rdma_cm_id *cm_id)
{
	struct rtrs_srv_sess *srv = srv_path->srv;
	struct rtrs_msg_conn_rsp msg;
	struct rdma_conn_param param;
	int err;

	param = (struct rdma_conn_param) {
		.rnr_retry_count = 7,
		.private_data = &msg,
		.private_data_len = sizeof(msg),
	};

	msg = (struct rtrs_msg_conn_rsp) {
		.magic = cpu_to_le16(RTRS_MAGIC),
		.version = cpu_to_le16(RTRS_PROTO_VER),
		.queue_depth = cpu_to_le16(srv->queue_depth),
		.max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE),
		.max_hdr_size = cpu_to_le32(MAX_HDR_SIZE),
	};

	if (always_invalidate)
		msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F);

	err = rdma_accept(cm_id, &param);
	if (err)
		pr_err("rdma_accept(), err: %d\n", err);

	return err;
}

static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno)
{
	struct rtrs_msg_conn_rsp msg;
	int err;

	msg = (struct rtrs_msg_conn_rsp) {
		.magic = cpu_to_le16(RTRS_MAGIC),
		.version = cpu_to_le16(RTRS_PROTO_VER),
		.errno = cpu_to_le16(errno),
	};

	err = rdma_reject(cm_id, &msg, sizeof(msg), IB_CM_REJ_CONSUMER_DEFINED);
	if (err)
		pr_err("rdma_reject(), err: %d\n", err);

	/* Bounce errno back */
	return errno;
}

static struct rtrs_srv_path *
__find_path(struct rtrs_srv_sess *srv, const uuid_t *sess_uuid)
{
	struct rtrs_srv_path *srv_path;

	list_for_each_entry(srv_path, &srv->paths_list, s.entry) {
		if (uuid_equal(&srv_path->s.uuid, sess_uuid))
			return srv_path;
	}

	return NULL;
}

static int create_con(struct rtrs_srv_path *srv_path,
		      struct rdma_cm_id *cm_id,
		      unsigned int cid)
{
	struct rtrs_srv_sess *srv = srv_path->srv;
	struct rtrs_path *s = &srv_path->s;
	struct rtrs_srv_con *con;

	u32 cq_num, max_send_wr, max_recv_wr, wr_limit;
	int err, cq_vector;

	con = kzalloc(sizeof(*con), GFP_KERNEL);
	if (!con) {
		err = -ENOMEM;
		goto err;
	}

	spin_lock_init(&con->rsp_wr_wait_lock);
	INIT_LIST_HEAD(&con->rsp_wr_wait_list);
	con->c.cm_id = cm_id;
	con->c.path = &srv_path->s;
	con->c.cid = cid;
	atomic_set(&con->c.wr_cnt, 1);
	wr_limit = srv_path->s.dev->ib_dev->attrs.max_qp_wr;

	if (con->c.cid == 0) {
		/*
		 * All receive and all send (each requiring invalidate)
		 * + 2 for drain and heartbeat
		 */
		max_send_wr = min_t(int, wr_limit,
				    SERVICE_CON_QUEUE_DEPTH * 2 + 2);
		max_recv_wr = max_send_wr;
		s->signal_interval = min_not_zero(srv->queue_depth,
						  (size_t)SERVICE_CON_QUEUE_DEPTH);
	} else {
		/* when always_invalidate is enabled, we need linv + rinv + mr + imm */
		if (always_invalidate)
			max_send_wr =
				min_t(int, wr_limit,
				      srv->queue_depth * (1 + 4) + 1);
		else
			max_send_wr =
				min_t(int, wr_limit,
				      srv->queue_depth * (1 + 2) + 1);

		max_recv_wr = srv->queue_depth + 1;
		/*
		 * We must have enough WRs so that all receive and all write
		 * requests can be posted and each read request can post its
		 * invalidate, plus one for drain; otherwise the QP gets into
		 * an error state.
		 */
	}
	cq_num = max_send_wr + max_recv_wr;
	atomic_set(&con->c.sq_wr_avail, max_send_wr);
	cq_vector = rtrs_srv_get_next_cq_vector(srv_path);

	/* TODO: SOFTIRQ can be faster, but be careful with softirq context */
	err = rtrs_cq_qp_create(&srv_path->s, &con->c, 1, cq_vector, cq_num,
				max_send_wr, max_recv_wr,
				IB_POLL_WORKQUEUE);
	if (err) {
		rtrs_err(s, "rtrs_cq_qp_create(), err: %d\n", err);
		goto free_con;
	}
	if (con->c.cid == 0) {
		err = post_recv_info_req(con);
		if (err)
			goto free_cqqp;
	}
	WARN_ON(srv_path->s.con[cid]);
	srv_path->s.con[cid] = &con->c;

	/*
	 * Change context from server to current connection.  The other
	 * way is to use cm_id->qp->qp_context, which does not work on OFED.
1711 */ 1712 cm_id->context = &con->c; 1713 1714 return 0; 1715 1716 free_cqqp: 1717 rtrs_cq_qp_destroy(&con->c); 1718 free_con: 1719 kfree(con); 1720 1721 err: 1722 return err; 1723 } 1724 1725 static struct rtrs_srv_path *__alloc_path(struct rtrs_srv_sess *srv, 1726 struct rdma_cm_id *cm_id, 1727 unsigned int con_num, 1728 unsigned int recon_cnt, 1729 const uuid_t *uuid) 1730 { 1731 struct rtrs_srv_path *srv_path; 1732 int err = -ENOMEM; 1733 char str[NAME_MAX]; 1734 struct rtrs_addr path; 1735 1736 if (srv->paths_num >= MAX_PATHS_NUM) { 1737 err = -ECONNRESET; 1738 goto err; 1739 } 1740 if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) { 1741 err = -EEXIST; 1742 pr_err("Path with same addr exists\n"); 1743 goto err; 1744 } 1745 srv_path = kzalloc(sizeof(*srv_path), GFP_KERNEL); 1746 if (!srv_path) 1747 goto err; 1748 1749 srv_path->stats = kzalloc(sizeof(*srv_path->stats), GFP_KERNEL); 1750 if (!srv_path->stats) 1751 goto err_free_sess; 1752 1753 srv_path->stats->rdma_stats = alloc_percpu(struct rtrs_srv_stats_rdma_stats); 1754 if (!srv_path->stats->rdma_stats) 1755 goto err_free_stats; 1756 1757 srv_path->stats->srv_path = srv_path; 1758 1759 srv_path->dma_addr = kcalloc(srv->queue_depth, 1760 sizeof(*srv_path->dma_addr), 1761 GFP_KERNEL); 1762 if (!srv_path->dma_addr) 1763 goto err_free_percpu; 1764 1765 srv_path->s.con = kcalloc(con_num, sizeof(*srv_path->s.con), 1766 GFP_KERNEL); 1767 if (!srv_path->s.con) 1768 goto err_free_dma_addr; 1769 1770 srv_path->state = RTRS_SRV_CONNECTING; 1771 srv_path->srv = srv; 1772 srv_path->cur_cq_vector = -1; 1773 srv_path->s.dst_addr = cm_id->route.addr.dst_addr; 1774 srv_path->s.src_addr = cm_id->route.addr.src_addr; 1775 1776 /* temporary until receiving session-name from client */ 1777 path.src = &srv_path->s.src_addr; 1778 path.dst = &srv_path->s.dst_addr; 1779 rtrs_addr_to_str(&path, str, sizeof(str)); 1780 strscpy(srv_path->s.sessname, str, sizeof(srv_path->s.sessname)); 1781 1782 srv_path->s.con_num = con_num; 1783 srv_path->s.irq_con_num = con_num; 1784 srv_path->s.recon_cnt = recon_cnt; 1785 uuid_copy(&srv_path->s.uuid, uuid); 1786 spin_lock_init(&srv_path->state_lock); 1787 INIT_WORK(&srv_path->close_work, rtrs_srv_close_work); 1788 rtrs_srv_init_hb(srv_path); 1789 1790 srv_path->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd); 1791 if (!srv_path->s.dev) { 1792 err = -ENOMEM; 1793 goto err_free_con; 1794 } 1795 err = map_cont_bufs(srv_path); 1796 if (err) 1797 goto err_put_dev; 1798 1799 err = rtrs_srv_alloc_ops_ids(srv_path); 1800 if (err) 1801 goto err_unmap_bufs; 1802 1803 __add_path_to_srv(srv, srv_path); 1804 1805 return srv_path; 1806 1807 err_unmap_bufs: 1808 unmap_cont_bufs(srv_path); 1809 err_put_dev: 1810 rtrs_ib_dev_put(srv_path->s.dev); 1811 err_free_con: 1812 kfree(srv_path->s.con); 1813 err_free_dma_addr: 1814 kfree(srv_path->dma_addr); 1815 err_free_percpu: 1816 free_percpu(srv_path->stats->rdma_stats); 1817 err_free_stats: 1818 kfree(srv_path->stats); 1819 err_free_sess: 1820 kfree(srv_path); 1821 err: 1822 return ERR_PTR(err); 1823 } 1824 1825 static int rtrs_rdma_connect(struct rdma_cm_id *cm_id, 1826 const struct rtrs_msg_conn_req *msg, 1827 size_t len) 1828 { 1829 struct rtrs_srv_ctx *ctx = cm_id->context; 1830 struct rtrs_srv_path *srv_path; 1831 struct rtrs_srv_sess *srv; 1832 1833 u16 version, con_num, cid; 1834 u16 recon_cnt; 1835 int err = -ECONNRESET; 1836 1837 if (len < sizeof(*msg)) { 1838 pr_err("Invalid RTRS connection request\n"); 1839 goto reject_w_err; 1840 } 1841 if (le16_to_cpu(msg->magic) != 
RTRS_MAGIC) { 1842 pr_err("Invalid RTRS magic\n"); 1843 goto reject_w_err; 1844 } 1845 version = le16_to_cpu(msg->version); 1846 if (version >> 8 != RTRS_PROTO_VER_MAJOR) { 1847 pr_err("Unsupported major RTRS version: %d, expected %d\n", 1848 version >> 8, RTRS_PROTO_VER_MAJOR); 1849 goto reject_w_err; 1850 } 1851 con_num = le16_to_cpu(msg->cid_num); 1852 if (con_num > 4096) { 1853 /* Sanity check */ 1854 pr_err("Too many connections requested: %d\n", con_num); 1855 goto reject_w_err; 1856 } 1857 cid = le16_to_cpu(msg->cid); 1858 if (cid >= con_num) { 1859 /* Sanity check */ 1860 pr_err("Incorrect cid: %d >= %d\n", cid, con_num); 1861 goto reject_w_err; 1862 } 1863 recon_cnt = le16_to_cpu(msg->recon_cnt); 1864 srv = get_or_create_srv(ctx, &msg->paths_uuid, msg->first_conn); 1865 if (IS_ERR(srv)) { 1866 err = PTR_ERR(srv); 1867 pr_err("get_or_create_srv(), error %d\n", err); 1868 goto reject_w_err; 1869 } 1870 mutex_lock(&srv->paths_mutex); 1871 srv_path = __find_path(srv, &msg->sess_uuid); 1872 if (srv_path) { 1873 struct rtrs_path *s = &srv_path->s; 1874 1875 /* Session already holds a reference */ 1876 put_srv(srv); 1877 1878 if (srv_path->state != RTRS_SRV_CONNECTING) { 1879 rtrs_err(s, "Session in wrong state: %s\n", 1880 rtrs_srv_state_str(srv_path->state)); 1881 mutex_unlock(&srv->paths_mutex); 1882 goto reject_w_err; 1883 } 1884 /* 1885 * Sanity checks 1886 */ 1887 if (con_num != s->con_num || cid >= s->con_num) { 1888 rtrs_err(s, "Incorrect request: %d, %d\n", 1889 cid, con_num); 1890 mutex_unlock(&srv->paths_mutex); 1891 goto reject_w_err; 1892 } 1893 if (s->con[cid]) { 1894 rtrs_err(s, "Connection already exists: %d\n", 1895 cid); 1896 mutex_unlock(&srv->paths_mutex); 1897 goto reject_w_err; 1898 } 1899 } else { 1900 srv_path = __alloc_path(srv, cm_id, con_num, recon_cnt, 1901 &msg->sess_uuid); 1902 if (IS_ERR(srv_path)) { 1903 mutex_unlock(&srv->paths_mutex); 1904 put_srv(srv); 1905 err = PTR_ERR(srv_path); 1906 pr_err("RTRS server session allocation failed: %d\n", err); 1907 goto reject_w_err; 1908 } 1909 } 1910 err = create_con(srv_path, cm_id, cid); 1911 if (err) { 1912 rtrs_err((&srv_path->s), "create_con(), error %d\n", err); 1913 rtrs_rdma_do_reject(cm_id, err); 1914 /* 1915 * Since session has other connections we follow normal way 1916 * through workqueue, but still return an error to tell cma.c 1917 * to call rdma_destroy_id() for current connection. 1918 */ 1919 goto close_and_return_err; 1920 } 1921 err = rtrs_rdma_do_accept(srv_path, cm_id); 1922 if (err) { 1923 rtrs_err((&srv_path->s), "rtrs_rdma_do_accept(), error %d\n", err); 1924 rtrs_rdma_do_reject(cm_id, err); 1925 /* 1926 * Since current connection was successfully added to the 1927 * session we follow normal way through workqueue to close the 1928 * session, thus return 0 to tell cma.c we call 1929 * rdma_destroy_id() ourselves. 
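		 * (Contrast with the create_con() failure above, where a
		 * non-zero return value asks cma.c to destroy the cm_id
		 * for us.)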
1930 */ 1931 err = 0; 1932 goto close_and_return_err; 1933 } 1934 mutex_unlock(&srv->paths_mutex); 1935 1936 return 0; 1937 1938 reject_w_err: 1939 return rtrs_rdma_do_reject(cm_id, err); 1940 1941 close_and_return_err: 1942 mutex_unlock(&srv->paths_mutex); 1943 close_path(srv_path); 1944 1945 return err; 1946 } 1947 1948 static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id, 1949 struct rdma_cm_event *ev) 1950 { 1951 struct rtrs_srv_path *srv_path = NULL; 1952 struct rtrs_path *s = NULL; 1953 1954 if (ev->event != RDMA_CM_EVENT_CONNECT_REQUEST) { 1955 struct rtrs_con *c = cm_id->context; 1956 1957 s = c->path; 1958 srv_path = to_srv_path(s); 1959 } 1960 1961 switch (ev->event) { 1962 case RDMA_CM_EVENT_CONNECT_REQUEST: 1963 /* 1964 * In case of error cma.c will destroy cm_id, 1965 * see cma_process_remove() 1966 */ 1967 return rtrs_rdma_connect(cm_id, ev->param.conn.private_data, 1968 ev->param.conn.private_data_len); 1969 case RDMA_CM_EVENT_ESTABLISHED: 1970 /* Nothing here */ 1971 break; 1972 case RDMA_CM_EVENT_REJECTED: 1973 case RDMA_CM_EVENT_CONNECT_ERROR: 1974 case RDMA_CM_EVENT_UNREACHABLE: 1975 rtrs_err(s, "CM error (CM event: %s, err: %d)\n", 1976 rdma_event_msg(ev->event), ev->status); 1977 fallthrough; 1978 case RDMA_CM_EVENT_DISCONNECTED: 1979 case RDMA_CM_EVENT_ADDR_CHANGE: 1980 case RDMA_CM_EVENT_TIMEWAIT_EXIT: 1981 case RDMA_CM_EVENT_DEVICE_REMOVAL: 1982 close_path(srv_path); 1983 break; 1984 default: 1985 pr_err("Ignoring unexpected CM event %s, err %d\n", 1986 rdma_event_msg(ev->event), ev->status); 1987 break; 1988 } 1989 1990 return 0; 1991 } 1992 1993 static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx, 1994 struct sockaddr *addr, 1995 enum rdma_ucm_port_space ps) 1996 { 1997 struct rdma_cm_id *cm_id; 1998 int ret; 1999 2000 cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler, 2001 ctx, ps, IB_QPT_RC); 2002 if (IS_ERR(cm_id)) { 2003 ret = PTR_ERR(cm_id); 2004 pr_err("Creating id for RDMA connection failed, err: %d\n", 2005 ret); 2006 goto err_out; 2007 } 2008 ret = rdma_bind_addr(cm_id, addr); 2009 if (ret) { 2010 pr_err("Binding RDMA address failed, err: %d\n", ret); 2011 goto err_cm; 2012 } 2013 ret = rdma_listen(cm_id, 64); 2014 if (ret) { 2015 pr_err("Listening on RDMA connection failed, err: %d\n", 2016 ret); 2017 goto err_cm; 2018 } 2019 2020 return cm_id; 2021 2022 err_cm: 2023 rdma_destroy_id(cm_id); 2024 err_out: 2025 2026 return ERR_PTR(ret); 2027 } 2028 2029 static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, u16 port) 2030 { 2031 struct sockaddr_in6 sin = { 2032 .sin6_family = AF_INET6, 2033 .sin6_addr = IN6ADDR_ANY_INIT, 2034 .sin6_port = htons(port), 2035 }; 2036 struct sockaddr_ib sib = { 2037 .sib_family = AF_IB, 2038 .sib_sid = cpu_to_be64(RDMA_IB_IP_PS_IB | port), 2039 .sib_sid_mask = cpu_to_be64(0xffffffffffffffffULL), 2040 .sib_pkey = cpu_to_be16(0xffff), 2041 }; 2042 struct rdma_cm_id *cm_ip, *cm_ib; 2043 int ret; 2044 2045 /* 2046 * We accept both IPoIB and IB connections, so we need to keep 2047 * two cm id's, one for each socket type and port space. 2048 * If the cm initialization of one of the id's fails, we abort 2049 * everything. 
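	 * The IP listener binds the IPv6 wildcard address in the RDMA_PS_TCP
	 * port space; the IB listener encodes the same port number into
	 * sib_sid in the RDMA_PS_IB port space (see the two
	 * rtrs_srv_cm_init() calls below).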
         */
        cm_ip = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sin, RDMA_PS_TCP);
        if (IS_ERR(cm_ip))
                return PTR_ERR(cm_ip);

        cm_ib = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sib, RDMA_PS_IB);
        if (IS_ERR(cm_ib)) {
                ret = PTR_ERR(cm_ib);
                goto free_cm_ip;
        }

        ctx->cm_id_ip = cm_ip;
        ctx->cm_id_ib = cm_ib;

        return 0;

free_cm_ip:
        rdma_destroy_id(cm_ip);

        return ret;
}

static struct rtrs_srv_ctx *alloc_srv_ctx(struct rtrs_srv_ops *ops)
{
        struct rtrs_srv_ctx *ctx;

        ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
        if (!ctx)
                return NULL;

        ctx->ops = *ops;
        mutex_init(&ctx->srv_mutex);
        INIT_LIST_HEAD(&ctx->srv_list);

        return ctx;
}

static void free_srv_ctx(struct rtrs_srv_ctx *ctx)
{
        WARN_ON(!list_empty(&ctx->srv_list));
        mutex_destroy(&ctx->srv_mutex);
        kfree(ctx);
}

static int rtrs_srv_add_one(struct ib_device *device)
{
        struct rtrs_srv_ctx *ctx;
        int ret = 0;

        mutex_lock(&ib_ctx.ib_dev_mutex);
        if (ib_ctx.ib_dev_count)
                goto out;

        /*
         * Since our CM IDs are NOT bound to any ib device we will create them
         * only once
         */
        ctx = ib_ctx.srv_ctx;
        ret = rtrs_srv_rdma_init(ctx, ib_ctx.port);
        if (ret) {
                /*
                 * We errored out here.
                 * According to the ib code, if we encounter an error here then the
                 * error code is ignored, and no more calls to our ops are made.
                 */
                pr_err("Failed to initialize RDMA connection\n");
                goto err_out;
        }

out:
        /*
         * Keep track of the number of ib devices added
         */
        ib_ctx.ib_dev_count++;

err_out:
        mutex_unlock(&ib_ctx.ib_dev_mutex);
        return ret;
}

static void rtrs_srv_remove_one(struct ib_device *device, void *client_data)
{
        struct rtrs_srv_ctx *ctx;

        mutex_lock(&ib_ctx.ib_dev_mutex);
        ib_ctx.ib_dev_count--;

        if (ib_ctx.ib_dev_count)
                goto out;

        /*
         * Since our CM IDs are NOT bound to any ib device we will remove them
         * only once, when the last device is removed
         */
        ctx = ib_ctx.srv_ctx;
        rdma_destroy_id(ctx->cm_id_ip);
        rdma_destroy_id(ctx->cm_id_ib);

out:
        mutex_unlock(&ib_ctx.ib_dev_mutex);
}

static struct ib_client rtrs_srv_client = {
        .name = "rtrs_server",
        .add = rtrs_srv_add_one,
        .remove = rtrs_srv_remove_one
};
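
/*
 * Usage sketch (illustrative only, not part of this file's API contract):
 * a ULP such as rnbd-srv typically opens a single server context at module
 * load and closes it on unload, roughly:
 *
 *      ctx = rtrs_srv_open(&ops, port);
 *      if (IS_ERR(ctx))
 *              return PTR_ERR(ctx);
 *      ...
 *      rtrs_srv_close(ctx);
 *
 * where 'ops' supplies the RDMA and link event callbacks declared in rtrs.h
 * and 'port' is the listening port handed down to rtrs_srv_rdma_init().
 */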

/**
 * rtrs_srv_open() - open RTRS server context
 * @ops: callback functions
 * @port: port to listen on
 *
 * Creates server context with specified callbacks.
 *
 * Return: a valid pointer on success, otherwise an ERR_PTR().
 */
struct rtrs_srv_ctx *rtrs_srv_open(struct rtrs_srv_ops *ops, u16 port)
{
        struct rtrs_srv_ctx *ctx;
        int err;

        ctx = alloc_srv_ctx(ops);
        if (!ctx)
                return ERR_PTR(-ENOMEM);

        mutex_init(&ib_ctx.ib_dev_mutex);
        ib_ctx.srv_ctx = ctx;
        ib_ctx.port = port;

        err = ib_register_client(&rtrs_srv_client);
        if (err) {
                free_srv_ctx(ctx);
                return ERR_PTR(err);
        }

        return ctx;
}
EXPORT_SYMBOL(rtrs_srv_open);

static void close_paths(struct rtrs_srv_sess *srv)
{
        struct rtrs_srv_path *srv_path;

        mutex_lock(&srv->paths_mutex);
        list_for_each_entry(srv_path, &srv->paths_list, s.entry)
                close_path(srv_path);
        mutex_unlock(&srv->paths_mutex);
}

static void close_ctx(struct rtrs_srv_ctx *ctx)
{
        struct rtrs_srv_sess *srv;

        mutex_lock(&ctx->srv_mutex);
        list_for_each_entry(srv, &ctx->srv_list, ctx_list)
                close_paths(srv);
        mutex_unlock(&ctx->srv_mutex);
        flush_workqueue(rtrs_wq);
}

/**
 * rtrs_srv_close() - close RTRS server context
 * @ctx: pointer to server context
 *
 * Closes RTRS server context with all client sessions.
 */
void rtrs_srv_close(struct rtrs_srv_ctx *ctx)
{
        ib_unregister_client(&rtrs_srv_client);
        mutex_destroy(&ib_ctx.ib_dev_mutex);
        close_ctx(ctx);
        free_srv_ctx(ctx);
}
EXPORT_SYMBOL(rtrs_srv_close);

static int check_module_params(void)
{
        if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) {
                pr_err("Invalid sess_queue_depth value %d, has to be >= %d, <= %d.\n",
                       sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH);
                return -EINVAL;
        }
        if (max_chunk_size < MIN_CHUNK_SIZE || !is_power_of_2(max_chunk_size)) {
                pr_err("Invalid max_chunk_size value %d, has to be >= %d and a power of two.\n",
                       max_chunk_size, MIN_CHUNK_SIZE);
                return -EINVAL;
        }

        /*
         * Check if IB immediate data size is enough to hold the mem_id and the
         * offset inside the memory chunk.
         */
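        /*
         * Worked example (illustrative, using the module defaults): with
         * sess_queue_depth = 512 the buffer id needs ilog2(511) + 1 = 9 bits,
         * and with max_chunk_size = 128 KiB the in-chunk offset needs
         * ilog2(131071) + 1 = 17 bits, i.e. 26 bits of immediate payload in
         * total, which must not exceed MAX_IMM_PAYL_BITS.
         */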
        if ((ilog2(sess_queue_depth - 1) + 1) +
            (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) {
                pr_err("RDMA immediate size (%db) not enough to encode %d buffers of size %dB. Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n",
                       MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size);
                return -EINVAL;
        }

        return 0;
}

static int __init rtrs_server_init(void)
{
        int err;

        pr_info("Loading module %s, proto %s: (max_chunk_size: %d (pure IO %ld, headers %ld), sess_queue_depth: %d, always_invalidate: %d)\n",
                KBUILD_MODNAME, RTRS_PROTO_VER_STRING,
                max_chunk_size, max_chunk_size - MAX_HDR_SIZE, MAX_HDR_SIZE,
                sess_queue_depth, always_invalidate);

        rtrs_rdma_dev_pd_init(0, &dev_pd);

        err = check_module_params();
        if (err) {
                pr_err("Failed to load module, invalid module parameters, err: %d\n",
                       err);
                return err;
        }
        rtrs_dev_class = class_create(THIS_MODULE, "rtrs-server");
        if (IS_ERR(rtrs_dev_class)) {
                err = PTR_ERR(rtrs_dev_class);
                goto out_err;
        }
        rtrs_wq = alloc_workqueue("rtrs_server_wq", 0, 0);
        if (!rtrs_wq) {
                err = -ENOMEM;
                goto out_dev_class;
        }

        return 0;

out_dev_class:
        class_destroy(rtrs_dev_class);
out_err:
        return err;
}

static void __exit rtrs_server_exit(void)
{
        destroy_workqueue(rtrs_wq);
        class_destroy(rtrs_dev_class);
        rtrs_rdma_dev_pd_deinit(&dev_pd);
}

module_init(rtrs_server_init);
module_exit(rtrs_server_exit);