1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* 3 * RDMA Network Block Driver 4 * 5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved. 6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved. 7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved. 8 */ 9 10 #undef pr_fmt 11 #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt 12 13 #include <linux/module.h> 14 #include <linux/blkdev.h> 15 #include <linux/hdreg.h> 16 #include <linux/scatterlist.h> 17 #include <linux/idr.h> 18 19 #include "rnbd-clt.h" 20 21 MODULE_DESCRIPTION("RDMA Network Block Device Client"); 22 MODULE_LICENSE("GPL"); 23 24 static int rnbd_client_major; 25 static DEFINE_IDA(index_ida); 26 static DEFINE_MUTEX(ida_lock); 27 static DEFINE_MUTEX(sess_lock); 28 static LIST_HEAD(sess_list); 29 30 /* 31 * Maximum number of partitions an instance can have. 32 * 6 bits = 64 minors = 63 partitions (one minor is used for the device itself) 33 */ 34 #define RNBD_PART_BITS 6 35 36 static inline bool rnbd_clt_get_sess(struct rnbd_clt_session *sess) 37 { 38 return refcount_inc_not_zero(&sess->refcount); 39 } 40 41 static void free_sess(struct rnbd_clt_session *sess); 42 43 static void rnbd_clt_put_sess(struct rnbd_clt_session *sess) 44 { 45 might_sleep(); 46 47 if (refcount_dec_and_test(&sess->refcount)) 48 free_sess(sess); 49 } 50 51 static void rnbd_clt_put_dev(struct rnbd_clt_dev *dev) 52 { 53 might_sleep(); 54 55 if (!refcount_dec_and_test(&dev->refcount)) 56 return; 57 58 mutex_lock(&ida_lock); 59 ida_simple_remove(&index_ida, dev->clt_device_id); 60 mutex_unlock(&ida_lock); 61 kfree(dev->hw_queues); 62 kfree(dev->pathname); 63 rnbd_clt_put_sess(dev->sess); 64 mutex_destroy(&dev->lock); 65 kfree(dev); 66 } 67 68 static inline bool rnbd_clt_get_dev(struct rnbd_clt_dev *dev) 69 { 70 return refcount_inc_not_zero(&dev->refcount); 71 } 72 73 static int rnbd_clt_set_dev_attr(struct rnbd_clt_dev *dev, 74 const struct rnbd_msg_open_rsp *rsp) 75 { 76 struct rnbd_clt_session *sess = dev->sess; 77 78 if (!rsp->logical_block_size) 79 return -EINVAL; 80 81 dev->device_id = le32_to_cpu(rsp->device_id); 82 dev->nsectors = le64_to_cpu(rsp->nsectors); 83 dev->logical_block_size = le16_to_cpu(rsp->logical_block_size); 84 dev->physical_block_size = le16_to_cpu(rsp->physical_block_size); 85 dev->max_write_same_sectors = le32_to_cpu(rsp->max_write_same_sectors); 86 dev->max_discard_sectors = le32_to_cpu(rsp->max_discard_sectors); 87 dev->discard_granularity = le32_to_cpu(rsp->discard_granularity); 88 dev->discard_alignment = le32_to_cpu(rsp->discard_alignment); 89 dev->secure_discard = le16_to_cpu(rsp->secure_discard); 90 dev->rotational = rsp->rotational; 91 dev->wc = !!(rsp->cache_policy & RNBD_WRITEBACK); 92 dev->fua = !!(rsp->cache_policy & RNBD_FUA); 93 94 dev->max_hw_sectors = sess->max_io_size / SECTOR_SIZE; 95 dev->max_segments = BMAX_SEGMENTS; 96 97 return 0; 98 } 99 100 static int rnbd_clt_change_capacity(struct rnbd_clt_dev *dev, 101 size_t new_nsectors) 102 { 103 rnbd_clt_info(dev, "Device size changed from %zu to %zu sectors\n", 104 dev->nsectors, new_nsectors); 105 dev->nsectors = new_nsectors; 106 set_capacity_and_notify(dev->gd, dev->nsectors); 107 return 0; 108 } 109 110 static int process_msg_open_rsp(struct rnbd_clt_dev *dev, 111 struct rnbd_msg_open_rsp *rsp) 112 { 113 int err = 0; 114 115 mutex_lock(&dev->lock); 116 if (dev->dev_state == DEV_STATE_UNMAPPED) { 117 rnbd_clt_info(dev, 118 "Ignoring Open-Response message from server for unmapped device\n"); 119 err = -ENOENT; 120 goto out; 121 } 122 if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) { 123 u64 nsectors = le64_to_cpu(rsp->nsectors); 124 125 /* 126 * If the device was remapped and the size changed in the 127 * meantime we need to revalidate it 128 */ 129 if (dev->nsectors != nsectors) 130 rnbd_clt_change_capacity(dev, nsectors); 131 rnbd_clt_info(dev, "Device online, device remapped successfully\n"); 132 } 133 err = rnbd_clt_set_dev_attr(dev, rsp); 134 if (err) 135 goto out; 136 dev->dev_state = DEV_STATE_MAPPED; 137 138 out: 139 mutex_unlock(&dev->lock); 140 141 return err; 142 } 143 144 int rnbd_clt_resize_disk(struct rnbd_clt_dev *dev, size_t newsize) 145 { 146 int ret = 0; 147 148 mutex_lock(&dev->lock); 149 if (dev->dev_state != DEV_STATE_MAPPED) { 150 pr_err("Failed to set new size of the device, device is not opened\n"); 151 ret = -ENOENT; 152 goto out; 153 } 154 ret = rnbd_clt_change_capacity(dev, newsize); 155 156 out: 157 mutex_unlock(&dev->lock); 158 159 return ret; 160 } 161 162 static inline void rnbd_clt_dev_requeue(struct rnbd_queue *q) 163 { 164 if (WARN_ON(!q->hctx)) 165 return; 166 167 /* We can come here from interrupt, thus async=true */ 168 blk_mq_run_hw_queue(q->hctx, true); 169 } 170 171 enum { 172 RNBD_DELAY_IFBUSY = -1, 173 }; 174 175 /** 176 * rnbd_get_cpu_qlist() - finds a list with HW queues to be rerun 177 * @sess: Session to find a queue for 178 * @cpu: Cpu to start the search from 179 * 180 * Description: 181 * Each CPU has a list of HW queues, which needs to be rerun. If a list 182 * is not empty - it is marked with a bit. This function finds first 183 * set bit in a bitmap and returns corresponding CPU list. 184 */ 185 static struct rnbd_cpu_qlist * 186 rnbd_get_cpu_qlist(struct rnbd_clt_session *sess, int cpu) 187 { 188 int bit; 189 190 /* Search from cpu to nr_cpu_ids */ 191 bit = find_next_bit(sess->cpu_queues_bm, nr_cpu_ids, cpu); 192 if (bit < nr_cpu_ids) { 193 return per_cpu_ptr(sess->cpu_queues, bit); 194 } else if (cpu != 0) { 195 /* Search from 0 to cpu */ 196 bit = find_next_bit(sess->cpu_queues_bm, cpu, 0); 197 if (bit < cpu) 198 return per_cpu_ptr(sess->cpu_queues, bit); 199 } 200 201 return NULL; 202 } 203 204 static inline int nxt_cpu(int cpu) 205 { 206 return (cpu + 1) % nr_cpu_ids; 207 } 208 209 /** 210 * rnbd_rerun_if_needed() - rerun next queue marked as stopped 211 * @sess: Session to rerun a queue on 212 * 213 * Description: 214 * Each CPU has it's own list of HW queues, which should be rerun. 215 * Function finds such list with HW queues, takes a list lock, picks up 216 * the first HW queue out of the list and requeues it. 217 * 218 * Return: 219 * True if the queue was requeued, false otherwise. 220 * 221 * Context: 222 * Does not matter. 223 */ 224 static bool rnbd_rerun_if_needed(struct rnbd_clt_session *sess) 225 { 226 struct rnbd_queue *q = NULL; 227 struct rnbd_cpu_qlist *cpu_q; 228 unsigned long flags; 229 int *cpup; 230 231 /* 232 * To keep fairness and not to let other queues starve we always 233 * try to wake up someone else in round-robin manner. That of course 234 * increases latency but queues always have a chance to be executed. 235 */ 236 cpup = get_cpu_ptr(sess->cpu_rr); 237 for (cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(*cpup)); cpu_q; 238 cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(cpu_q->cpu))) { 239 if (!spin_trylock_irqsave(&cpu_q->requeue_lock, flags)) 240 continue; 241 if (unlikely(!test_bit(cpu_q->cpu, sess->cpu_queues_bm))) 242 goto unlock; 243 q = list_first_entry_or_null(&cpu_q->requeue_list, 244 typeof(*q), requeue_list); 245 if (WARN_ON(!q)) 246 goto clear_bit; 247 list_del_init(&q->requeue_list); 248 clear_bit_unlock(0, &q->in_list); 249 250 if (list_empty(&cpu_q->requeue_list)) { 251 /* Clear bit if nothing is left */ 252 clear_bit: 253 clear_bit(cpu_q->cpu, sess->cpu_queues_bm); 254 } 255 unlock: 256 spin_unlock_irqrestore(&cpu_q->requeue_lock, flags); 257 258 if (q) 259 break; 260 } 261 262 /** 263 * Saves the CPU that is going to be requeued on the per-cpu var. Just 264 * incrementing it doesn't work because rnbd_get_cpu_qlist() will 265 * always return the first CPU with something on the queue list when the 266 * value stored on the var is greater than the last CPU with something 267 * on the list. 268 */ 269 if (cpu_q) 270 *cpup = cpu_q->cpu; 271 put_cpu_var(sess->cpu_rr); 272 273 if (q) 274 rnbd_clt_dev_requeue(q); 275 276 return q; 277 } 278 279 /** 280 * rnbd_rerun_all_if_idle() - rerun all queues left in the list if 281 * session is idling (there are no requests 282 * in-flight). 283 * @sess: Session to rerun the queues on 284 * 285 * Description: 286 * This function tries to rerun all stopped queues if there are no 287 * requests in-flight anymore. This function tries to solve an obvious 288 * problem, when number of tags < than number of queues (hctx), which 289 * are stopped and put to sleep. If last permit, which has been just put, 290 * does not wake up all left queues (hctxs), IO requests hang forever. 291 * 292 * That can happen when all number of permits, say N, have been exhausted 293 * from one CPU, and we have many block devices per session, say M. 294 * Each block device has it's own queue (hctx) for each CPU, so eventually 295 * we can put that number of queues (hctxs) to sleep: M x nr_cpu_ids. 296 * If number of permits N < M x nr_cpu_ids finally we will get an IO hang. 297 * 298 * To avoid this hang last caller of rnbd_put_permit() (last caller is the 299 * one who observes sess->busy == 0) must wake up all remaining queues. 300 * 301 * Context: 302 * Does not matter. 303 */ 304 static void rnbd_rerun_all_if_idle(struct rnbd_clt_session *sess) 305 { 306 bool requeued; 307 308 do { 309 requeued = rnbd_rerun_if_needed(sess); 310 } while (atomic_read(&sess->busy) == 0 && requeued); 311 } 312 313 static struct rtrs_permit *rnbd_get_permit(struct rnbd_clt_session *sess, 314 enum rtrs_clt_con_type con_type, 315 int wait) 316 { 317 struct rtrs_permit *permit; 318 319 permit = rtrs_clt_get_permit(sess->rtrs, con_type, 320 wait ? RTRS_PERMIT_WAIT : 321 RTRS_PERMIT_NOWAIT); 322 if (likely(permit)) 323 /* We have a subtle rare case here, when all permits can be 324 * consumed before busy counter increased. This is safe, 325 * because loser will get NULL as a permit, observe 0 busy 326 * counter and immediately restart the queue himself. 327 */ 328 atomic_inc(&sess->busy); 329 330 return permit; 331 } 332 333 static void rnbd_put_permit(struct rnbd_clt_session *sess, 334 struct rtrs_permit *permit) 335 { 336 rtrs_clt_put_permit(sess->rtrs, permit); 337 atomic_dec(&sess->busy); 338 /* Paired with rnbd_clt_dev_add_to_requeue(). Decrement first 339 * and then check queue bits. 340 */ 341 smp_mb__after_atomic(); 342 rnbd_rerun_all_if_idle(sess); 343 } 344 345 static struct rnbd_iu *rnbd_get_iu(struct rnbd_clt_session *sess, 346 enum rtrs_clt_con_type con_type, 347 int wait) 348 { 349 struct rnbd_iu *iu; 350 struct rtrs_permit *permit; 351 352 iu = kzalloc(sizeof(*iu), GFP_KERNEL); 353 if (!iu) { 354 return NULL; 355 } 356 357 permit = rnbd_get_permit(sess, con_type, 358 wait ? RTRS_PERMIT_WAIT : 359 RTRS_PERMIT_NOWAIT); 360 if (unlikely(!permit)) { 361 kfree(iu); 362 return NULL; 363 } 364 365 iu->permit = permit; 366 /* 367 * 1st reference is dropped after finishing sending a "user" message, 368 * 2nd reference is dropped after confirmation with the response is 369 * returned. 370 * 1st and 2nd can happen in any order, so the rnbd_iu should be 371 * released (rtrs_permit returned to rtrs) only after both 372 * are finished. 373 */ 374 atomic_set(&iu->refcount, 2); 375 init_waitqueue_head(&iu->comp.wait); 376 iu->comp.errno = INT_MAX; 377 378 return iu; 379 } 380 381 static void rnbd_put_iu(struct rnbd_clt_session *sess, struct rnbd_iu *iu) 382 { 383 if (atomic_dec_and_test(&iu->refcount)) { 384 rnbd_put_permit(sess, iu->permit); 385 kfree(iu); 386 } 387 } 388 389 static void rnbd_softirq_done_fn(struct request *rq) 390 { 391 struct rnbd_clt_dev *dev = rq->rq_disk->private_data; 392 struct rnbd_clt_session *sess = dev->sess; 393 struct rnbd_iu *iu; 394 395 iu = blk_mq_rq_to_pdu(rq); 396 sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT); 397 rnbd_put_permit(sess, iu->permit); 398 blk_mq_end_request(rq, errno_to_blk_status(iu->errno)); 399 } 400 401 static void msg_io_conf(void *priv, int errno) 402 { 403 struct rnbd_iu *iu = priv; 404 struct rnbd_clt_dev *dev = iu->dev; 405 struct request *rq = iu->rq; 406 int rw = rq_data_dir(rq); 407 408 iu->errno = errno; 409 410 blk_mq_complete_request(rq); 411 412 if (errno) 413 rnbd_clt_info_rl(dev, "%s I/O failed with err: %d\n", 414 rw == READ ? "read" : "write", errno); 415 } 416 417 static void wake_up_iu_comp(struct rnbd_iu *iu, int errno) 418 { 419 iu->comp.errno = errno; 420 wake_up(&iu->comp.wait); 421 } 422 423 static void msg_conf(void *priv, int errno) 424 { 425 struct rnbd_iu *iu = priv; 426 427 iu->errno = errno; 428 schedule_work(&iu->work); 429 } 430 431 enum wait_type { 432 NO_WAIT = 0, 433 WAIT = 1 434 }; 435 436 static int send_usr_msg(struct rtrs_clt *rtrs, int dir, 437 struct rnbd_iu *iu, struct kvec *vec, 438 size_t len, struct scatterlist *sg, unsigned int sg_len, 439 void (*conf)(struct work_struct *work), 440 int *errno, enum wait_type wait) 441 { 442 int err; 443 struct rtrs_clt_req_ops req_ops; 444 445 INIT_WORK(&iu->work, conf); 446 req_ops = (struct rtrs_clt_req_ops) { 447 .priv = iu, 448 .conf_fn = msg_conf, 449 }; 450 err = rtrs_clt_request(dir, &req_ops, rtrs, iu->permit, 451 vec, 1, len, sg, sg_len); 452 if (!err && wait) { 453 wait_event(iu->comp.wait, iu->comp.errno != INT_MAX); 454 *errno = iu->comp.errno; 455 } else { 456 *errno = 0; 457 } 458 459 return err; 460 } 461 462 static void msg_close_conf(struct work_struct *work) 463 { 464 struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work); 465 struct rnbd_clt_dev *dev = iu->dev; 466 467 wake_up_iu_comp(iu, iu->errno); 468 rnbd_put_iu(dev->sess, iu); 469 rnbd_clt_put_dev(dev); 470 } 471 472 static int send_msg_close(struct rnbd_clt_dev *dev, u32 device_id, bool wait) 473 { 474 struct rnbd_clt_session *sess = dev->sess; 475 struct rnbd_msg_close msg; 476 struct rnbd_iu *iu; 477 struct kvec vec = { 478 .iov_base = &msg, 479 .iov_len = sizeof(msg) 480 }; 481 int err, errno; 482 483 iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT); 484 if (!iu) 485 return -ENOMEM; 486 487 iu->buf = NULL; 488 iu->dev = dev; 489 490 sg_alloc_table(&iu->sgt, 1, GFP_KERNEL); 491 492 msg.hdr.type = cpu_to_le16(RNBD_MSG_CLOSE); 493 msg.device_id = cpu_to_le32(device_id); 494 495 WARN_ON(!rnbd_clt_get_dev(dev)); 496 err = send_usr_msg(sess->rtrs, WRITE, iu, &vec, 0, NULL, 0, 497 msg_close_conf, &errno, wait); 498 if (err) { 499 rnbd_clt_put_dev(dev); 500 rnbd_put_iu(sess, iu); 501 } else { 502 err = errno; 503 } 504 505 sg_free_table(&iu->sgt); 506 rnbd_put_iu(sess, iu); 507 return err; 508 } 509 510 static void msg_open_conf(struct work_struct *work) 511 { 512 struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work); 513 struct rnbd_msg_open_rsp *rsp = iu->buf; 514 struct rnbd_clt_dev *dev = iu->dev; 515 int errno = iu->errno; 516 517 if (errno) { 518 rnbd_clt_err(dev, 519 "Opening failed, server responded: %d\n", 520 errno); 521 } else { 522 errno = process_msg_open_rsp(dev, rsp); 523 if (errno) { 524 u32 device_id = le32_to_cpu(rsp->device_id); 525 /* 526 * If server thinks its fine, but we fail to process 527 * then be nice and send a close to server. 528 */ 529 (void)send_msg_close(dev, device_id, NO_WAIT); 530 } 531 } 532 kfree(rsp); 533 wake_up_iu_comp(iu, errno); 534 rnbd_put_iu(dev->sess, iu); 535 rnbd_clt_put_dev(dev); 536 } 537 538 static void msg_sess_info_conf(struct work_struct *work) 539 { 540 struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work); 541 struct rnbd_msg_sess_info_rsp *rsp = iu->buf; 542 struct rnbd_clt_session *sess = iu->sess; 543 544 if (!iu->errno) 545 sess->ver = min_t(u8, rsp->ver, RNBD_PROTO_VER_MAJOR); 546 547 kfree(rsp); 548 wake_up_iu_comp(iu, iu->errno); 549 rnbd_put_iu(sess, iu); 550 rnbd_clt_put_sess(sess); 551 } 552 553 static int send_msg_open(struct rnbd_clt_dev *dev, bool wait) 554 { 555 struct rnbd_clt_session *sess = dev->sess; 556 struct rnbd_msg_open_rsp *rsp; 557 struct rnbd_msg_open msg; 558 struct rnbd_iu *iu; 559 struct kvec vec = { 560 .iov_base = &msg, 561 .iov_len = sizeof(msg) 562 }; 563 int err, errno; 564 565 rsp = kzalloc(sizeof(*rsp), GFP_KERNEL); 566 if (!rsp) 567 return -ENOMEM; 568 569 iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT); 570 if (!iu) { 571 kfree(rsp); 572 return -ENOMEM; 573 } 574 575 iu->buf = rsp; 576 iu->dev = dev; 577 578 sg_alloc_table(&iu->sgt, 1, GFP_KERNEL); 579 sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp)); 580 581 msg.hdr.type = cpu_to_le16(RNBD_MSG_OPEN); 582 msg.access_mode = dev->access_mode; 583 strlcpy(msg.dev_name, dev->pathname, sizeof(msg.dev_name)); 584 585 WARN_ON(!rnbd_clt_get_dev(dev)); 586 err = send_usr_msg(sess->rtrs, READ, iu, 587 &vec, sizeof(*rsp), iu->sgt.sgl, 1, 588 msg_open_conf, &errno, wait); 589 if (err) { 590 rnbd_clt_put_dev(dev); 591 rnbd_put_iu(sess, iu); 592 kfree(rsp); 593 } else { 594 err = errno; 595 } 596 597 sg_free_table(&iu->sgt); 598 rnbd_put_iu(sess, iu); 599 return err; 600 } 601 602 static int send_msg_sess_info(struct rnbd_clt_session *sess, bool wait) 603 { 604 struct rnbd_msg_sess_info_rsp *rsp; 605 struct rnbd_msg_sess_info msg; 606 struct rnbd_iu *iu; 607 struct kvec vec = { 608 .iov_base = &msg, 609 .iov_len = sizeof(msg) 610 }; 611 int err, errno; 612 613 rsp = kzalloc(sizeof(*rsp), GFP_KERNEL); 614 if (!rsp) 615 return -ENOMEM; 616 617 iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT); 618 if (!iu) { 619 kfree(rsp); 620 return -ENOMEM; 621 } 622 623 iu->buf = rsp; 624 iu->sess = sess; 625 626 sg_alloc_table(&iu->sgt, 1, GFP_KERNEL); 627 sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp)); 628 629 msg.hdr.type = cpu_to_le16(RNBD_MSG_SESS_INFO); 630 msg.ver = RNBD_PROTO_VER_MAJOR; 631 632 if (!rnbd_clt_get_sess(sess)) { 633 /* 634 * That can happen only in one case, when RTRS has restablished 635 * the connection and link_ev() is called, but session is almost 636 * dead, last reference on session is put and caller is waiting 637 * for RTRS to close everything. 638 */ 639 err = -ENODEV; 640 goto put_iu; 641 } 642 err = send_usr_msg(sess->rtrs, READ, iu, 643 &vec, sizeof(*rsp), iu->sgt.sgl, 1, 644 msg_sess_info_conf, &errno, wait); 645 if (err) { 646 rnbd_clt_put_sess(sess); 647 put_iu: 648 rnbd_put_iu(sess, iu); 649 kfree(rsp); 650 } else { 651 err = errno; 652 } 653 sg_free_table(&iu->sgt); 654 rnbd_put_iu(sess, iu); 655 return err; 656 } 657 658 static void set_dev_states_to_disconnected(struct rnbd_clt_session *sess) 659 { 660 struct rnbd_clt_dev *dev; 661 662 mutex_lock(&sess->lock); 663 list_for_each_entry(dev, &sess->devs_list, list) { 664 rnbd_clt_err(dev, "Device disconnected.\n"); 665 666 mutex_lock(&dev->lock); 667 if (dev->dev_state == DEV_STATE_MAPPED) 668 dev->dev_state = DEV_STATE_MAPPED_DISCONNECTED; 669 mutex_unlock(&dev->lock); 670 } 671 mutex_unlock(&sess->lock); 672 } 673 674 static void remap_devs(struct rnbd_clt_session *sess) 675 { 676 struct rnbd_clt_dev *dev; 677 struct rtrs_attrs attrs; 678 int err; 679 680 /* 681 * Careful here: we are called from RTRS link event directly, 682 * thus we can't send any RTRS request and wait for response 683 * or RTRS will not be able to complete request with failure 684 * if something goes wrong (failing of outstanding requests 685 * happens exactly from the context where we are blocking now). 686 * 687 * So to avoid deadlocks each usr message sent from here must 688 * be asynchronous. 689 */ 690 691 err = send_msg_sess_info(sess, NO_WAIT); 692 if (err) { 693 pr_err("send_msg_sess_info(\"%s\"): %d\n", sess->sessname, err); 694 return; 695 } 696 697 rtrs_clt_query(sess->rtrs, &attrs); 698 mutex_lock(&sess->lock); 699 sess->max_io_size = attrs.max_io_size; 700 701 list_for_each_entry(dev, &sess->devs_list, list) { 702 bool skip; 703 704 mutex_lock(&dev->lock); 705 skip = (dev->dev_state == DEV_STATE_INIT); 706 mutex_unlock(&dev->lock); 707 if (skip) 708 /* 709 * When device is establishing connection for the first 710 * time - do not remap, it will be closed soon. 711 */ 712 continue; 713 714 rnbd_clt_info(dev, "session reconnected, remapping device\n"); 715 err = send_msg_open(dev, NO_WAIT); 716 if (err) { 717 rnbd_clt_err(dev, "send_msg_open(): %d\n", err); 718 break; 719 } 720 } 721 mutex_unlock(&sess->lock); 722 } 723 724 static void rnbd_clt_link_ev(void *priv, enum rtrs_clt_link_ev ev) 725 { 726 struct rnbd_clt_session *sess = priv; 727 728 switch (ev) { 729 case RTRS_CLT_LINK_EV_DISCONNECTED: 730 set_dev_states_to_disconnected(sess); 731 break; 732 case RTRS_CLT_LINK_EV_RECONNECTED: 733 remap_devs(sess); 734 break; 735 default: 736 pr_err("Unknown session event received (%d), session: %s\n", 737 ev, sess->sessname); 738 } 739 } 740 741 static void rnbd_init_cpu_qlists(struct rnbd_cpu_qlist __percpu *cpu_queues) 742 { 743 unsigned int cpu; 744 struct rnbd_cpu_qlist *cpu_q; 745 746 for_each_possible_cpu(cpu) { 747 cpu_q = per_cpu_ptr(cpu_queues, cpu); 748 749 cpu_q->cpu = cpu; 750 INIT_LIST_HEAD(&cpu_q->requeue_list); 751 spin_lock_init(&cpu_q->requeue_lock); 752 } 753 } 754 755 static void destroy_mq_tags(struct rnbd_clt_session *sess) 756 { 757 if (sess->tag_set.tags) 758 blk_mq_free_tag_set(&sess->tag_set); 759 } 760 761 static inline void wake_up_rtrs_waiters(struct rnbd_clt_session *sess) 762 { 763 sess->rtrs_ready = true; 764 wake_up_all(&sess->rtrs_waitq); 765 } 766 767 static void close_rtrs(struct rnbd_clt_session *sess) 768 { 769 might_sleep(); 770 771 if (!IS_ERR_OR_NULL(sess->rtrs)) { 772 rtrs_clt_close(sess->rtrs); 773 sess->rtrs = NULL; 774 wake_up_rtrs_waiters(sess); 775 } 776 } 777 778 static void free_sess(struct rnbd_clt_session *sess) 779 { 780 WARN_ON(!list_empty(&sess->devs_list)); 781 782 might_sleep(); 783 784 close_rtrs(sess); 785 destroy_mq_tags(sess); 786 if (!list_empty(&sess->list)) { 787 mutex_lock(&sess_lock); 788 list_del(&sess->list); 789 mutex_unlock(&sess_lock); 790 } 791 free_percpu(sess->cpu_queues); 792 free_percpu(sess->cpu_rr); 793 mutex_destroy(&sess->lock); 794 kfree(sess); 795 } 796 797 static struct rnbd_clt_session *alloc_sess(const char *sessname) 798 { 799 struct rnbd_clt_session *sess; 800 int err, cpu; 801 802 sess = kzalloc_node(sizeof(*sess), GFP_KERNEL, NUMA_NO_NODE); 803 if (!sess) 804 return ERR_PTR(-ENOMEM); 805 strlcpy(sess->sessname, sessname, sizeof(sess->sessname)); 806 atomic_set(&sess->busy, 0); 807 mutex_init(&sess->lock); 808 INIT_LIST_HEAD(&sess->devs_list); 809 INIT_LIST_HEAD(&sess->list); 810 bitmap_zero(sess->cpu_queues_bm, NR_CPUS); 811 init_waitqueue_head(&sess->rtrs_waitq); 812 refcount_set(&sess->refcount, 1); 813 814 sess->cpu_queues = alloc_percpu(struct rnbd_cpu_qlist); 815 if (!sess->cpu_queues) { 816 err = -ENOMEM; 817 goto err; 818 } 819 rnbd_init_cpu_qlists(sess->cpu_queues); 820 821 /* 822 * That is simple percpu variable which stores cpu indices, which are 823 * incremented on each access. We need that for the sake of fairness 824 * to wake up queues in a round-robin manner. 825 */ 826 sess->cpu_rr = alloc_percpu(int); 827 if (!sess->cpu_rr) { 828 err = -ENOMEM; 829 goto err; 830 } 831 for_each_possible_cpu(cpu) 832 * per_cpu_ptr(sess->cpu_rr, cpu) = cpu; 833 834 return sess; 835 836 err: 837 free_sess(sess); 838 839 return ERR_PTR(err); 840 } 841 842 static int wait_for_rtrs_connection(struct rnbd_clt_session *sess) 843 { 844 wait_event(sess->rtrs_waitq, sess->rtrs_ready); 845 if (IS_ERR_OR_NULL(sess->rtrs)) 846 return -ECONNRESET; 847 848 return 0; 849 } 850 851 static void wait_for_rtrs_disconnection(struct rnbd_clt_session *sess) 852 __releases(&sess_lock) 853 __acquires(&sess_lock) 854 { 855 DEFINE_WAIT(wait); 856 857 prepare_to_wait(&sess->rtrs_waitq, &wait, TASK_UNINTERRUPTIBLE); 858 if (IS_ERR_OR_NULL(sess->rtrs)) { 859 finish_wait(&sess->rtrs_waitq, &wait); 860 return; 861 } 862 mutex_unlock(&sess_lock); 863 /* loop in caller, see __find_and_get_sess(). 864 * You can't leave mutex locked and call schedule(), you will catch a 865 * deadlock with a caller of free_sess(), which has just put the last 866 * reference and is about to take the sess_lock in order to delete 867 * the session from the list. 868 */ 869 schedule(); 870 mutex_lock(&sess_lock); 871 } 872 873 static struct rnbd_clt_session *__find_and_get_sess(const char *sessname) 874 __releases(&sess_lock) 875 __acquires(&sess_lock) 876 { 877 struct rnbd_clt_session *sess, *sn; 878 int err; 879 880 again: 881 list_for_each_entry_safe(sess, sn, &sess_list, list) { 882 if (strcmp(sessname, sess->sessname)) 883 continue; 884 885 if (sess->rtrs_ready && IS_ERR_OR_NULL(sess->rtrs)) 886 /* 887 * No RTRS connection, session is dying. 888 */ 889 continue; 890 891 if (rnbd_clt_get_sess(sess)) { 892 /* 893 * Alive session is found, wait for RTRS connection. 894 */ 895 mutex_unlock(&sess_lock); 896 err = wait_for_rtrs_connection(sess); 897 if (err) 898 rnbd_clt_put_sess(sess); 899 mutex_lock(&sess_lock); 900 901 if (err) 902 /* Session is dying, repeat the loop */ 903 goto again; 904 905 return sess; 906 } 907 /* 908 * Ref is 0, session is dying, wait for RTRS disconnect 909 * in order to avoid session names clashes. 910 */ 911 wait_for_rtrs_disconnection(sess); 912 /* 913 * RTRS is disconnected and soon session will be freed, 914 * so repeat a loop. 915 */ 916 goto again; 917 } 918 919 return NULL; 920 } 921 922 static struct 923 rnbd_clt_session *find_or_create_sess(const char *sessname, bool *first) 924 { 925 struct rnbd_clt_session *sess = NULL; 926 927 mutex_lock(&sess_lock); 928 sess = __find_and_get_sess(sessname); 929 if (!sess) { 930 sess = alloc_sess(sessname); 931 if (IS_ERR(sess)) { 932 mutex_unlock(&sess_lock); 933 return sess; 934 } 935 list_add(&sess->list, &sess_list); 936 *first = true; 937 } else 938 *first = false; 939 mutex_unlock(&sess_lock); 940 941 return sess; 942 } 943 944 static int rnbd_client_open(struct block_device *block_device, fmode_t mode) 945 { 946 struct rnbd_clt_dev *dev = block_device->bd_disk->private_data; 947 948 if (dev->read_only && (mode & FMODE_WRITE)) 949 return -EPERM; 950 951 if (dev->dev_state == DEV_STATE_UNMAPPED || 952 !rnbd_clt_get_dev(dev)) 953 return -EIO; 954 955 return 0; 956 } 957 958 static void rnbd_client_release(struct gendisk *gen, fmode_t mode) 959 { 960 struct rnbd_clt_dev *dev = gen->private_data; 961 962 rnbd_clt_put_dev(dev); 963 } 964 965 static int rnbd_client_getgeo(struct block_device *block_device, 966 struct hd_geometry *geo) 967 { 968 u64 size; 969 struct rnbd_clt_dev *dev; 970 971 dev = block_device->bd_disk->private_data; 972 size = dev->size * (dev->logical_block_size / SECTOR_SIZE); 973 geo->cylinders = size >> 6; /* size/64 */ 974 geo->heads = 4; 975 geo->sectors = 16; 976 geo->start = 0; 977 978 return 0; 979 } 980 981 static const struct block_device_operations rnbd_client_ops = { 982 .owner = THIS_MODULE, 983 .open = rnbd_client_open, 984 .release = rnbd_client_release, 985 .getgeo = rnbd_client_getgeo 986 }; 987 988 /* The amount of data that belongs to an I/O and the amount of data that 989 * should be read or written to the disk (bi_size) can differ. 990 * 991 * E.g. When WRITE_SAME is used, only a small amount of data is 992 * transferred that is then written repeatedly over a lot of sectors. 993 * 994 * Get the size of data to be transferred via RTRS by summing up the size 995 * of the scather-gather list entries. 996 */ 997 static size_t rnbd_clt_get_sg_size(struct scatterlist *sglist, u32 len) 998 { 999 struct scatterlist *sg; 1000 size_t tsize = 0; 1001 int i; 1002 1003 for_each_sg(sglist, sg, len, i) 1004 tsize += sg->length; 1005 return tsize; 1006 } 1007 1008 static int rnbd_client_xfer_request(struct rnbd_clt_dev *dev, 1009 struct request *rq, 1010 struct rnbd_iu *iu) 1011 { 1012 struct rtrs_clt *rtrs = dev->sess->rtrs; 1013 struct rtrs_permit *permit = iu->permit; 1014 struct rnbd_msg_io msg; 1015 struct rtrs_clt_req_ops req_ops; 1016 unsigned int sg_cnt = 0; 1017 struct kvec vec; 1018 size_t size; 1019 int err; 1020 1021 iu->rq = rq; 1022 iu->dev = dev; 1023 msg.sector = cpu_to_le64(blk_rq_pos(rq)); 1024 msg.bi_size = cpu_to_le32(blk_rq_bytes(rq)); 1025 msg.rw = cpu_to_le32(rq_to_rnbd_flags(rq)); 1026 msg.prio = cpu_to_le16(req_get_ioprio(rq)); 1027 1028 /* 1029 * We only support discards with single segment for now. 1030 * See queue limits. 1031 */ 1032 if (req_op(rq) != REQ_OP_DISCARD) 1033 sg_cnt = blk_rq_map_sg(dev->queue, rq, iu->sgt.sgl); 1034 1035 if (sg_cnt == 0) 1036 sg_mark_end(&iu->sgt.sgl[0]); 1037 1038 msg.hdr.type = cpu_to_le16(RNBD_MSG_IO); 1039 msg.device_id = cpu_to_le32(dev->device_id); 1040 1041 vec = (struct kvec) { 1042 .iov_base = &msg, 1043 .iov_len = sizeof(msg) 1044 }; 1045 size = rnbd_clt_get_sg_size(iu->sgt.sgl, sg_cnt); 1046 req_ops = (struct rtrs_clt_req_ops) { 1047 .priv = iu, 1048 .conf_fn = msg_io_conf, 1049 }; 1050 err = rtrs_clt_request(rq_data_dir(rq), &req_ops, rtrs, permit, 1051 &vec, 1, size, iu->sgt.sgl, sg_cnt); 1052 if (unlikely(err)) { 1053 rnbd_clt_err_rl(dev, "RTRS failed to transfer IO, err: %d\n", 1054 err); 1055 return err; 1056 } 1057 1058 return 0; 1059 } 1060 1061 /** 1062 * rnbd_clt_dev_add_to_requeue() - add device to requeue if session is busy 1063 * @dev: Device to be checked 1064 * @q: Queue to be added to the requeue list if required 1065 * 1066 * Description: 1067 * If session is busy, that means someone will requeue us when resources 1068 * are freed. If session is not doing anything - device is not added to 1069 * the list and @false is returned. 1070 */ 1071 static bool rnbd_clt_dev_add_to_requeue(struct rnbd_clt_dev *dev, 1072 struct rnbd_queue *q) 1073 { 1074 struct rnbd_clt_session *sess = dev->sess; 1075 struct rnbd_cpu_qlist *cpu_q; 1076 unsigned long flags; 1077 bool added = true; 1078 bool need_set; 1079 1080 cpu_q = get_cpu_ptr(sess->cpu_queues); 1081 spin_lock_irqsave(&cpu_q->requeue_lock, flags); 1082 1083 if (likely(!test_and_set_bit_lock(0, &q->in_list))) { 1084 if (WARN_ON(!list_empty(&q->requeue_list))) 1085 goto unlock; 1086 1087 need_set = !test_bit(cpu_q->cpu, sess->cpu_queues_bm); 1088 if (need_set) { 1089 set_bit(cpu_q->cpu, sess->cpu_queues_bm); 1090 /* Paired with rnbd_put_permit(). Set a bit first 1091 * and then observe the busy counter. 1092 */ 1093 smp_mb__before_atomic(); 1094 } 1095 if (likely(atomic_read(&sess->busy))) { 1096 list_add_tail(&q->requeue_list, &cpu_q->requeue_list); 1097 } else { 1098 /* Very unlikely, but possible: busy counter was 1099 * observed as zero. Drop all bits and return 1100 * false to restart the queue by ourselves. 1101 */ 1102 if (need_set) 1103 clear_bit(cpu_q->cpu, sess->cpu_queues_bm); 1104 clear_bit_unlock(0, &q->in_list); 1105 added = false; 1106 } 1107 } 1108 unlock: 1109 spin_unlock_irqrestore(&cpu_q->requeue_lock, flags); 1110 put_cpu_ptr(sess->cpu_queues); 1111 1112 return added; 1113 } 1114 1115 static void rnbd_clt_dev_kick_mq_queue(struct rnbd_clt_dev *dev, 1116 struct blk_mq_hw_ctx *hctx, 1117 int delay) 1118 { 1119 struct rnbd_queue *q = hctx->driver_data; 1120 1121 if (delay != RNBD_DELAY_IFBUSY) 1122 blk_mq_delay_run_hw_queue(hctx, delay); 1123 else if (unlikely(!rnbd_clt_dev_add_to_requeue(dev, q))) 1124 /* 1125 * If session is not busy we have to restart 1126 * the queue ourselves. 1127 */ 1128 blk_mq_delay_run_hw_queue(hctx, 10/*ms*/); 1129 } 1130 1131 static blk_status_t rnbd_queue_rq(struct blk_mq_hw_ctx *hctx, 1132 const struct blk_mq_queue_data *bd) 1133 { 1134 struct request *rq = bd->rq; 1135 struct rnbd_clt_dev *dev = rq->rq_disk->private_data; 1136 struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq); 1137 int err; 1138 blk_status_t ret = BLK_STS_IOERR; 1139 1140 if (unlikely(dev->dev_state != DEV_STATE_MAPPED)) 1141 return BLK_STS_IOERR; 1142 1143 iu->permit = rnbd_get_permit(dev->sess, RTRS_IO_CON, 1144 RTRS_PERMIT_NOWAIT); 1145 if (unlikely(!iu->permit)) { 1146 rnbd_clt_dev_kick_mq_queue(dev, hctx, RNBD_DELAY_IFBUSY); 1147 return BLK_STS_RESOURCE; 1148 } 1149 1150 iu->sgt.sgl = iu->first_sgl; 1151 err = sg_alloc_table_chained(&iu->sgt, 1152 /* Even-if the request has no segment, 1153 * sglist must have one entry at least */ 1154 blk_rq_nr_phys_segments(rq) ? : 1, 1155 iu->sgt.sgl, 1156 RNBD_INLINE_SG_CNT); 1157 if (err) { 1158 rnbd_clt_err_rl(dev, "sg_alloc_table_chained ret=%d\n", err); 1159 rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/); 1160 rnbd_put_permit(dev->sess, iu->permit); 1161 return BLK_STS_RESOURCE; 1162 } 1163 1164 blk_mq_start_request(rq); 1165 err = rnbd_client_xfer_request(dev, rq, iu); 1166 if (likely(err == 0)) 1167 return BLK_STS_OK; 1168 if (unlikely(err == -EAGAIN || err == -ENOMEM)) { 1169 rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/); 1170 ret = BLK_STS_RESOURCE; 1171 } 1172 sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT); 1173 rnbd_put_permit(dev->sess, iu->permit); 1174 return ret; 1175 } 1176 1177 static struct blk_mq_ops rnbd_mq_ops = { 1178 .queue_rq = rnbd_queue_rq, 1179 .complete = rnbd_softirq_done_fn, 1180 }; 1181 1182 static int setup_mq_tags(struct rnbd_clt_session *sess) 1183 { 1184 struct blk_mq_tag_set *tag_set = &sess->tag_set; 1185 1186 memset(tag_set, 0, sizeof(*tag_set)); 1187 tag_set->ops = &rnbd_mq_ops; 1188 tag_set->queue_depth = sess->queue_depth; 1189 tag_set->numa_node = NUMA_NO_NODE; 1190 tag_set->flags = BLK_MQ_F_SHOULD_MERGE | 1191 BLK_MQ_F_TAG_QUEUE_SHARED; 1192 tag_set->cmd_size = sizeof(struct rnbd_iu) + RNBD_RDMA_SGL_SIZE; 1193 tag_set->nr_hw_queues = num_online_cpus(); 1194 1195 return blk_mq_alloc_tag_set(tag_set); 1196 } 1197 1198 static struct rnbd_clt_session * 1199 find_and_get_or_create_sess(const char *sessname, 1200 const struct rtrs_addr *paths, 1201 size_t path_cnt, u16 port_nr) 1202 { 1203 struct rnbd_clt_session *sess; 1204 struct rtrs_attrs attrs; 1205 int err; 1206 bool first; 1207 struct rtrs_clt_ops rtrs_ops; 1208 1209 sess = find_or_create_sess(sessname, &first); 1210 if (sess == ERR_PTR(-ENOMEM)) 1211 return ERR_PTR(-ENOMEM); 1212 else if (!first) 1213 return sess; 1214 1215 if (!path_cnt) { 1216 pr_err("Session %s not found, and path parameter not given", sessname); 1217 err = -ENXIO; 1218 goto put_sess; 1219 } 1220 1221 rtrs_ops = (struct rtrs_clt_ops) { 1222 .priv = sess, 1223 .link_ev = rnbd_clt_link_ev, 1224 }; 1225 /* 1226 * Nothing was found, establish rtrs connection and proceed further. 1227 */ 1228 sess->rtrs = rtrs_clt_open(&rtrs_ops, sessname, 1229 paths, path_cnt, port_nr, 1230 0, /* Do not use pdu of rtrs */ 1231 RECONNECT_DELAY, BMAX_SEGMENTS, 1232 BLK_MAX_SEGMENT_SIZE, 1233 MAX_RECONNECTS); 1234 if (IS_ERR(sess->rtrs)) { 1235 err = PTR_ERR(sess->rtrs); 1236 goto wake_up_and_put; 1237 } 1238 rtrs_clt_query(sess->rtrs, &attrs); 1239 sess->max_io_size = attrs.max_io_size; 1240 sess->queue_depth = attrs.queue_depth; 1241 1242 err = setup_mq_tags(sess); 1243 if (err) 1244 goto close_rtrs; 1245 1246 err = send_msg_sess_info(sess, WAIT); 1247 if (err) 1248 goto close_rtrs; 1249 1250 wake_up_rtrs_waiters(sess); 1251 1252 return sess; 1253 1254 close_rtrs: 1255 close_rtrs(sess); 1256 put_sess: 1257 rnbd_clt_put_sess(sess); 1258 1259 return ERR_PTR(err); 1260 1261 wake_up_and_put: 1262 wake_up_rtrs_waiters(sess); 1263 goto put_sess; 1264 } 1265 1266 static inline void rnbd_init_hw_queue(struct rnbd_clt_dev *dev, 1267 struct rnbd_queue *q, 1268 struct blk_mq_hw_ctx *hctx) 1269 { 1270 INIT_LIST_HEAD(&q->requeue_list); 1271 q->dev = dev; 1272 q->hctx = hctx; 1273 } 1274 1275 static void rnbd_init_mq_hw_queues(struct rnbd_clt_dev *dev) 1276 { 1277 int i; 1278 struct blk_mq_hw_ctx *hctx; 1279 struct rnbd_queue *q; 1280 1281 queue_for_each_hw_ctx(dev->queue, hctx, i) { 1282 q = &dev->hw_queues[i]; 1283 rnbd_init_hw_queue(dev, q, hctx); 1284 hctx->driver_data = q; 1285 } 1286 } 1287 1288 static int setup_mq_dev(struct rnbd_clt_dev *dev) 1289 { 1290 dev->queue = blk_mq_init_queue(&dev->sess->tag_set); 1291 if (IS_ERR(dev->queue)) { 1292 rnbd_clt_err(dev, "Initializing multiqueue queue failed, err: %ld\n", 1293 PTR_ERR(dev->queue)); 1294 return PTR_ERR(dev->queue); 1295 } 1296 rnbd_init_mq_hw_queues(dev); 1297 return 0; 1298 } 1299 1300 static void setup_request_queue(struct rnbd_clt_dev *dev) 1301 { 1302 blk_queue_logical_block_size(dev->queue, dev->logical_block_size); 1303 blk_queue_physical_block_size(dev->queue, dev->physical_block_size); 1304 blk_queue_max_hw_sectors(dev->queue, dev->max_hw_sectors); 1305 blk_queue_max_write_same_sectors(dev->queue, 1306 dev->max_write_same_sectors); 1307 1308 /* 1309 * we don't support discards to "discontiguous" segments 1310 * in on request 1311 */ 1312 blk_queue_max_discard_segments(dev->queue, 1); 1313 1314 blk_queue_max_discard_sectors(dev->queue, dev->max_discard_sectors); 1315 dev->queue->limits.discard_granularity = dev->discard_granularity; 1316 dev->queue->limits.discard_alignment = dev->discard_alignment; 1317 if (dev->max_discard_sectors) 1318 blk_queue_flag_set(QUEUE_FLAG_DISCARD, dev->queue); 1319 if (dev->secure_discard) 1320 blk_queue_flag_set(QUEUE_FLAG_SECERASE, dev->queue); 1321 1322 blk_queue_flag_set(QUEUE_FLAG_SAME_COMP, dev->queue); 1323 blk_queue_flag_set(QUEUE_FLAG_SAME_FORCE, dev->queue); 1324 blk_queue_max_segments(dev->queue, dev->max_segments); 1325 blk_queue_io_opt(dev->queue, dev->sess->max_io_size); 1326 blk_queue_virt_boundary(dev->queue, SZ_4K - 1); 1327 blk_queue_write_cache(dev->queue, dev->wc, dev->fua); 1328 dev->queue->queuedata = dev; 1329 } 1330 1331 static void rnbd_clt_setup_gen_disk(struct rnbd_clt_dev *dev, int idx) 1332 { 1333 dev->gd->major = rnbd_client_major; 1334 dev->gd->first_minor = idx << RNBD_PART_BITS; 1335 dev->gd->fops = &rnbd_client_ops; 1336 dev->gd->queue = dev->queue; 1337 dev->gd->private_data = dev; 1338 snprintf(dev->gd->disk_name, sizeof(dev->gd->disk_name), "rnbd%d", 1339 idx); 1340 pr_debug("disk_name=%s, capacity=%zu\n", 1341 dev->gd->disk_name, 1342 dev->nsectors * (dev->logical_block_size / SECTOR_SIZE) 1343 ); 1344 1345 set_capacity(dev->gd, dev->nsectors); 1346 1347 if (dev->access_mode == RNBD_ACCESS_RO) { 1348 dev->read_only = true; 1349 set_disk_ro(dev->gd, true); 1350 } else { 1351 dev->read_only = false; 1352 } 1353 1354 if (!dev->rotational) 1355 blk_queue_flag_set(QUEUE_FLAG_NONROT, dev->queue); 1356 } 1357 1358 static int rnbd_client_setup_device(struct rnbd_clt_session *sess, 1359 struct rnbd_clt_dev *dev, int idx) 1360 { 1361 int err; 1362 1363 dev->size = dev->nsectors * dev->logical_block_size; 1364 1365 err = setup_mq_dev(dev); 1366 if (err) 1367 return err; 1368 1369 setup_request_queue(dev); 1370 1371 dev->gd = alloc_disk_node(1 << RNBD_PART_BITS, NUMA_NO_NODE); 1372 if (!dev->gd) { 1373 blk_cleanup_queue(dev->queue); 1374 return -ENOMEM; 1375 } 1376 1377 rnbd_clt_setup_gen_disk(dev, idx); 1378 1379 return 0; 1380 } 1381 1382 static struct rnbd_clt_dev *init_dev(struct rnbd_clt_session *sess, 1383 enum rnbd_access_mode access_mode, 1384 const char *pathname) 1385 { 1386 struct rnbd_clt_dev *dev; 1387 int ret; 1388 1389 dev = kzalloc_node(sizeof(*dev), GFP_KERNEL, NUMA_NO_NODE); 1390 if (!dev) 1391 return ERR_PTR(-ENOMEM); 1392 1393 dev->hw_queues = kcalloc(nr_cpu_ids, sizeof(*dev->hw_queues), 1394 GFP_KERNEL); 1395 if (!dev->hw_queues) { 1396 ret = -ENOMEM; 1397 goto out_alloc; 1398 } 1399 1400 mutex_lock(&ida_lock); 1401 ret = ida_simple_get(&index_ida, 0, 1 << (MINORBITS - RNBD_PART_BITS), 1402 GFP_KERNEL); 1403 mutex_unlock(&ida_lock); 1404 if (ret < 0) { 1405 pr_err("Failed to initialize device '%s' from session %s, allocating idr failed, err: %d\n", 1406 pathname, sess->sessname, ret); 1407 goto out_queues; 1408 } 1409 1410 dev->pathname = kstrdup(pathname, GFP_KERNEL); 1411 if (!dev->pathname) { 1412 ret = -ENOMEM; 1413 goto out_queues; 1414 } 1415 1416 dev->clt_device_id = ret; 1417 dev->sess = sess; 1418 dev->access_mode = access_mode; 1419 mutex_init(&dev->lock); 1420 refcount_set(&dev->refcount, 1); 1421 dev->dev_state = DEV_STATE_INIT; 1422 1423 /* 1424 * Here we called from sysfs entry, thus clt-sysfs is 1425 * responsible that session will not disappear. 1426 */ 1427 WARN_ON(!rnbd_clt_get_sess(sess)); 1428 1429 return dev; 1430 1431 out_queues: 1432 kfree(dev->hw_queues); 1433 out_alloc: 1434 kfree(dev); 1435 return ERR_PTR(ret); 1436 } 1437 1438 static bool __exists_dev(const char *pathname, const char *sessname) 1439 { 1440 struct rnbd_clt_session *sess; 1441 struct rnbd_clt_dev *dev; 1442 bool found = false; 1443 1444 list_for_each_entry(sess, &sess_list, list) { 1445 if (sessname && strncmp(sess->sessname, sessname, 1446 sizeof(sess->sessname))) 1447 continue; 1448 mutex_lock(&sess->lock); 1449 list_for_each_entry(dev, &sess->devs_list, list) { 1450 if (strlen(dev->pathname) == strlen(pathname) && 1451 !strcmp(dev->pathname, pathname)) { 1452 found = true; 1453 break; 1454 } 1455 } 1456 mutex_unlock(&sess->lock); 1457 if (found) 1458 break; 1459 } 1460 1461 return found; 1462 } 1463 1464 static bool exists_devpath(const char *pathname, const char *sessname) 1465 { 1466 bool found; 1467 1468 mutex_lock(&sess_lock); 1469 found = __exists_dev(pathname, sessname); 1470 mutex_unlock(&sess_lock); 1471 1472 return found; 1473 } 1474 1475 static bool insert_dev_if_not_exists_devpath(const char *pathname, 1476 struct rnbd_clt_session *sess, 1477 struct rnbd_clt_dev *dev) 1478 { 1479 bool found; 1480 1481 mutex_lock(&sess_lock); 1482 found = __exists_dev(pathname, sess->sessname); 1483 if (!found) { 1484 mutex_lock(&sess->lock); 1485 list_add_tail(&dev->list, &sess->devs_list); 1486 mutex_unlock(&sess->lock); 1487 } 1488 mutex_unlock(&sess_lock); 1489 1490 return found; 1491 } 1492 1493 static void delete_dev(struct rnbd_clt_dev *dev) 1494 { 1495 struct rnbd_clt_session *sess = dev->sess; 1496 1497 mutex_lock(&sess->lock); 1498 list_del(&dev->list); 1499 mutex_unlock(&sess->lock); 1500 } 1501 1502 struct rnbd_clt_dev *rnbd_clt_map_device(const char *sessname, 1503 struct rtrs_addr *paths, 1504 size_t path_cnt, u16 port_nr, 1505 const char *pathname, 1506 enum rnbd_access_mode access_mode) 1507 { 1508 struct rnbd_clt_session *sess; 1509 struct rnbd_clt_dev *dev; 1510 int ret; 1511 1512 if (unlikely(exists_devpath(pathname, sessname))) 1513 return ERR_PTR(-EEXIST); 1514 1515 sess = find_and_get_or_create_sess(sessname, paths, path_cnt, port_nr); 1516 if (IS_ERR(sess)) 1517 return ERR_CAST(sess); 1518 1519 dev = init_dev(sess, access_mode, pathname); 1520 if (IS_ERR(dev)) { 1521 pr_err("map_device: failed to map device '%s' from session %s, can't initialize device, err: %ld\n", 1522 pathname, sess->sessname, PTR_ERR(dev)); 1523 ret = PTR_ERR(dev); 1524 goto put_sess; 1525 } 1526 if (insert_dev_if_not_exists_devpath(pathname, sess, dev)) { 1527 ret = -EEXIST; 1528 goto put_dev; 1529 } 1530 ret = send_msg_open(dev, WAIT); 1531 if (ret) { 1532 rnbd_clt_err(dev, 1533 "map_device: failed, can't open remote device, err: %d\n", 1534 ret); 1535 goto del_dev; 1536 } 1537 mutex_lock(&dev->lock); 1538 pr_debug("Opened remote device: session=%s, path='%s'\n", 1539 sess->sessname, pathname); 1540 ret = rnbd_client_setup_device(sess, dev, dev->clt_device_id); 1541 if (ret) { 1542 rnbd_clt_err(dev, 1543 "map_device: Failed to configure device, err: %d\n", 1544 ret); 1545 mutex_unlock(&dev->lock); 1546 goto send_close; 1547 } 1548 1549 rnbd_clt_info(dev, 1550 "map_device: Device mapped as %s (nsectors: %zu, logical_block_size: %d, physical_block_size: %d, max_write_same_sectors: %d, max_discard_sectors: %d, discard_granularity: %d, discard_alignment: %d, secure_discard: %d, max_segments: %d, max_hw_sectors: %d, rotational: %d, wc: %d, fua: %d)\n", 1551 dev->gd->disk_name, dev->nsectors, 1552 dev->logical_block_size, dev->physical_block_size, 1553 dev->max_write_same_sectors, dev->max_discard_sectors, 1554 dev->discard_granularity, dev->discard_alignment, 1555 dev->secure_discard, dev->max_segments, 1556 dev->max_hw_sectors, dev->rotational, dev->wc, dev->fua); 1557 1558 mutex_unlock(&dev->lock); 1559 1560 add_disk(dev->gd); 1561 rnbd_clt_put_sess(sess); 1562 1563 return dev; 1564 1565 send_close: 1566 send_msg_close(dev, dev->device_id, WAIT); 1567 del_dev: 1568 delete_dev(dev); 1569 put_dev: 1570 rnbd_clt_put_dev(dev); 1571 put_sess: 1572 rnbd_clt_put_sess(sess); 1573 1574 return ERR_PTR(ret); 1575 } 1576 1577 static void destroy_gen_disk(struct rnbd_clt_dev *dev) 1578 { 1579 del_gendisk(dev->gd); 1580 blk_cleanup_queue(dev->queue); 1581 put_disk(dev->gd); 1582 } 1583 1584 static void destroy_sysfs(struct rnbd_clt_dev *dev, 1585 const struct attribute *sysfs_self) 1586 { 1587 rnbd_clt_remove_dev_symlink(dev); 1588 if (dev->kobj.state_initialized) { 1589 if (sysfs_self) 1590 /* To avoid deadlock firstly remove itself */ 1591 sysfs_remove_file_self(&dev->kobj, sysfs_self); 1592 kobject_del(&dev->kobj); 1593 kobject_put(&dev->kobj); 1594 } 1595 } 1596 1597 int rnbd_clt_unmap_device(struct rnbd_clt_dev *dev, bool force, 1598 const struct attribute *sysfs_self) 1599 { 1600 struct rnbd_clt_session *sess = dev->sess; 1601 int refcount, ret = 0; 1602 bool was_mapped; 1603 1604 mutex_lock(&dev->lock); 1605 if (dev->dev_state == DEV_STATE_UNMAPPED) { 1606 rnbd_clt_info(dev, "Device is already being unmapped\n"); 1607 ret = -EALREADY; 1608 goto err; 1609 } 1610 refcount = refcount_read(&dev->refcount); 1611 if (!force && refcount > 1) { 1612 rnbd_clt_err(dev, 1613 "Closing device failed, device is in use, (%d device users)\n", 1614 refcount - 1); 1615 ret = -EBUSY; 1616 goto err; 1617 } 1618 was_mapped = (dev->dev_state == DEV_STATE_MAPPED); 1619 dev->dev_state = DEV_STATE_UNMAPPED; 1620 mutex_unlock(&dev->lock); 1621 1622 delete_dev(dev); 1623 destroy_sysfs(dev, sysfs_self); 1624 destroy_gen_disk(dev); 1625 if (was_mapped && sess->rtrs) 1626 send_msg_close(dev, dev->device_id, WAIT); 1627 1628 rnbd_clt_info(dev, "Device is unmapped\n"); 1629 1630 /* Likely last reference put */ 1631 rnbd_clt_put_dev(dev); 1632 1633 /* 1634 * Here device and session can be vanished! 1635 */ 1636 1637 return 0; 1638 err: 1639 mutex_unlock(&dev->lock); 1640 1641 return ret; 1642 } 1643 1644 int rnbd_clt_remap_device(struct rnbd_clt_dev *dev) 1645 { 1646 int err; 1647 1648 mutex_lock(&dev->lock); 1649 if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) 1650 err = 0; 1651 else if (dev->dev_state == DEV_STATE_UNMAPPED) 1652 err = -ENODEV; 1653 else if (dev->dev_state == DEV_STATE_MAPPED) 1654 err = -EALREADY; 1655 else 1656 err = -EBUSY; 1657 mutex_unlock(&dev->lock); 1658 if (!err) { 1659 rnbd_clt_info(dev, "Remapping device.\n"); 1660 err = send_msg_open(dev, WAIT); 1661 if (err) 1662 rnbd_clt_err(dev, "remap_device: %d\n", err); 1663 } 1664 1665 return err; 1666 } 1667 1668 static void unmap_device_work(struct work_struct *work) 1669 { 1670 struct rnbd_clt_dev *dev; 1671 1672 dev = container_of(work, typeof(*dev), unmap_on_rmmod_work); 1673 rnbd_clt_unmap_device(dev, true, NULL); 1674 } 1675 1676 static void rnbd_destroy_sessions(void) 1677 { 1678 struct rnbd_clt_session *sess, *sn; 1679 struct rnbd_clt_dev *dev, *tn; 1680 1681 /* Firstly forbid access through sysfs interface */ 1682 rnbd_clt_destroy_default_group(); 1683 rnbd_clt_destroy_sysfs_files(); 1684 1685 /* 1686 * Here at this point there is no any concurrent access to sessions 1687 * list and devices list: 1688 * 1. New session or device can't be created - session sysfs files 1689 * are removed. 1690 * 2. Device or session can't be removed - module reference is taken 1691 * into account in unmap device sysfs callback. 1692 * 3. No IO requests inflight - each file open of block_dev increases 1693 * module reference in get_disk(). 1694 * 1695 * But still there can be user requests inflights, which are sent by 1696 * asynchronous send_msg_*() functions, thus before unmapping devices 1697 * RTRS session must be explicitly closed. 1698 */ 1699 1700 list_for_each_entry_safe(sess, sn, &sess_list, list) { 1701 WARN_ON(!rnbd_clt_get_sess(sess)); 1702 close_rtrs(sess); 1703 list_for_each_entry_safe(dev, tn, &sess->devs_list, list) { 1704 /* 1705 * Here unmap happens in parallel for only one reason: 1706 * blk_cleanup_queue() takes around half a second, so 1707 * on huge amount of devices the whole module unload 1708 * procedure takes minutes. 1709 */ 1710 INIT_WORK(&dev->unmap_on_rmmod_work, unmap_device_work); 1711 queue_work(system_long_wq, &dev->unmap_on_rmmod_work); 1712 } 1713 rnbd_clt_put_sess(sess); 1714 } 1715 /* Wait for all scheduled unmap works */ 1716 flush_workqueue(system_long_wq); 1717 WARN_ON(!list_empty(&sess_list)); 1718 } 1719 1720 static int __init rnbd_client_init(void) 1721 { 1722 int err = 0; 1723 1724 BUILD_BUG_ON(sizeof(struct rnbd_msg_hdr) != 4); 1725 BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info) != 36); 1726 BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info_rsp) != 36); 1727 BUILD_BUG_ON(sizeof(struct rnbd_msg_open) != 264); 1728 BUILD_BUG_ON(sizeof(struct rnbd_msg_close) != 8); 1729 BUILD_BUG_ON(sizeof(struct rnbd_msg_open_rsp) != 56); 1730 rnbd_client_major = register_blkdev(rnbd_client_major, "rnbd"); 1731 if (rnbd_client_major <= 0) { 1732 pr_err("Failed to load module, block device registration failed\n"); 1733 return -EBUSY; 1734 } 1735 1736 err = rnbd_clt_create_sysfs_files(); 1737 if (err) { 1738 pr_err("Failed to load module, creating sysfs device files failed, err: %d\n", 1739 err); 1740 unregister_blkdev(rnbd_client_major, "rnbd"); 1741 } 1742 1743 return err; 1744 } 1745 1746 static void __exit rnbd_client_exit(void) 1747 { 1748 rnbd_destroy_sessions(); 1749 unregister_blkdev(rnbd_client_major, "rnbd"); 1750 ida_destroy(&index_ida); 1751 } 1752 1753 module_init(rnbd_client_init); 1754 module_exit(rnbd_client_exit); 1755