1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* 3 * RDMA Network Block Driver 4 * 5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved. 6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved. 7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved. 8 */ 9 10 #undef pr_fmt 11 #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt 12 13 #include <linux/module.h> 14 #include <linux/blkdev.h> 15 #include <linux/hdreg.h> 16 #include <linux/scatterlist.h> 17 #include <linux/idr.h> 18 19 #include "rnbd-clt.h" 20 21 MODULE_DESCRIPTION("RDMA Network Block Device Client"); 22 MODULE_LICENSE("GPL"); 23 24 static int rnbd_client_major; 25 static DEFINE_IDA(index_ida); 26 static DEFINE_MUTEX(ida_lock); 27 static DEFINE_MUTEX(sess_lock); 28 static LIST_HEAD(sess_list); 29 30 /* 31 * Maximum number of partitions an instance can have. 32 * 6 bits = 64 minors = 63 partitions (one minor is used for the device itself) 33 */ 34 #define RNBD_PART_BITS 6 35 36 static inline bool rnbd_clt_get_sess(struct rnbd_clt_session *sess) 37 { 38 return refcount_inc_not_zero(&sess->refcount); 39 } 40 41 static void free_sess(struct rnbd_clt_session *sess); 42 43 static void rnbd_clt_put_sess(struct rnbd_clt_session *sess) 44 { 45 might_sleep(); 46 47 if (refcount_dec_and_test(&sess->refcount)) 48 free_sess(sess); 49 } 50 51 static void rnbd_clt_put_dev(struct rnbd_clt_dev *dev) 52 { 53 might_sleep(); 54 55 if (!refcount_dec_and_test(&dev->refcount)) 56 return; 57 58 mutex_lock(&ida_lock); 59 ida_simple_remove(&index_ida, dev->clt_device_id); 60 mutex_unlock(&ida_lock); 61 kfree(dev->hw_queues); 62 kfree(dev->pathname); 63 rnbd_clt_put_sess(dev->sess); 64 mutex_destroy(&dev->lock); 65 kfree(dev); 66 } 67 68 static inline bool rnbd_clt_get_dev(struct rnbd_clt_dev *dev) 69 { 70 return refcount_inc_not_zero(&dev->refcount); 71 } 72 73 static int rnbd_clt_set_dev_attr(struct rnbd_clt_dev *dev, 74 const struct rnbd_msg_open_rsp *rsp) 75 { 76 struct rnbd_clt_session *sess = dev->sess; 77 78 if (!rsp->logical_block_size) 79 return -EINVAL; 80 81 dev->device_id = le32_to_cpu(rsp->device_id); 82 dev->nsectors = le64_to_cpu(rsp->nsectors); 83 dev->logical_block_size = le16_to_cpu(rsp->logical_block_size); 84 dev->physical_block_size = le16_to_cpu(rsp->physical_block_size); 85 dev->max_write_same_sectors = le32_to_cpu(rsp->max_write_same_sectors); 86 dev->max_discard_sectors = le32_to_cpu(rsp->max_discard_sectors); 87 dev->discard_granularity = le32_to_cpu(rsp->discard_granularity); 88 dev->discard_alignment = le32_to_cpu(rsp->discard_alignment); 89 dev->secure_discard = le16_to_cpu(rsp->secure_discard); 90 dev->rotational = rsp->rotational; 91 dev->wc = !!(rsp->cache_policy & RNBD_WRITEBACK); 92 dev->fua = !!(rsp->cache_policy & RNBD_FUA); 93 94 dev->max_hw_sectors = sess->max_io_size / SECTOR_SIZE; 95 dev->max_segments = BMAX_SEGMENTS; 96 97 return 0; 98 } 99 100 static int rnbd_clt_change_capacity(struct rnbd_clt_dev *dev, 101 size_t new_nsectors) 102 { 103 rnbd_clt_info(dev, "Device size changed from %zu to %zu sectors\n", 104 dev->nsectors, new_nsectors); 105 dev->nsectors = new_nsectors; 106 set_capacity_and_notify(dev->gd, dev->nsectors); 107 return 0; 108 } 109 110 static int process_msg_open_rsp(struct rnbd_clt_dev *dev, 111 struct rnbd_msg_open_rsp *rsp) 112 { 113 int err = 0; 114 115 mutex_lock(&dev->lock); 116 if (dev->dev_state == DEV_STATE_UNMAPPED) { 117 rnbd_clt_info(dev, 118 "Ignoring Open-Response message from server for unmapped device\n"); 119 err = -ENOENT; 120 goto out; 121 } 122 if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) { 123 u64 nsectors = le64_to_cpu(rsp->nsectors); 124 125 /* 126 * If the device was remapped and the size changed in the 127 * meantime we need to revalidate it 128 */ 129 if (dev->nsectors != nsectors) 130 rnbd_clt_change_capacity(dev, nsectors); 131 rnbd_clt_info(dev, "Device online, device remapped successfully\n"); 132 } 133 err = rnbd_clt_set_dev_attr(dev, rsp); 134 if (err) 135 goto out; 136 dev->dev_state = DEV_STATE_MAPPED; 137 138 out: 139 mutex_unlock(&dev->lock); 140 141 return err; 142 } 143 144 int rnbd_clt_resize_disk(struct rnbd_clt_dev *dev, size_t newsize) 145 { 146 int ret = 0; 147 148 mutex_lock(&dev->lock); 149 if (dev->dev_state != DEV_STATE_MAPPED) { 150 pr_err("Failed to set new size of the device, device is not opened\n"); 151 ret = -ENOENT; 152 goto out; 153 } 154 ret = rnbd_clt_change_capacity(dev, newsize); 155 156 out: 157 mutex_unlock(&dev->lock); 158 159 return ret; 160 } 161 162 static inline void rnbd_clt_dev_requeue(struct rnbd_queue *q) 163 { 164 if (WARN_ON(!q->hctx)) 165 return; 166 167 /* We can come here from interrupt, thus async=true */ 168 blk_mq_run_hw_queue(q->hctx, true); 169 } 170 171 enum { 172 RNBD_DELAY_IFBUSY = -1, 173 }; 174 175 /** 176 * rnbd_get_cpu_qlist() - finds a list with HW queues to be rerun 177 * @sess: Session to find a queue for 178 * @cpu: Cpu to start the search from 179 * 180 * Description: 181 * Each CPU has a list of HW queues, which needs to be rerun. If a list 182 * is not empty - it is marked with a bit. This function finds first 183 * set bit in a bitmap and returns corresponding CPU list. 184 */ 185 static struct rnbd_cpu_qlist * 186 rnbd_get_cpu_qlist(struct rnbd_clt_session *sess, int cpu) 187 { 188 int bit; 189 190 /* Search from cpu to nr_cpu_ids */ 191 bit = find_next_bit(sess->cpu_queues_bm, nr_cpu_ids, cpu); 192 if (bit < nr_cpu_ids) { 193 return per_cpu_ptr(sess->cpu_queues, bit); 194 } else if (cpu != 0) { 195 /* Search from 0 to cpu */ 196 bit = find_next_bit(sess->cpu_queues_bm, cpu, 0); 197 if (bit < cpu) 198 return per_cpu_ptr(sess->cpu_queues, bit); 199 } 200 201 return NULL; 202 } 203 204 static inline int nxt_cpu(int cpu) 205 { 206 return (cpu + 1) % nr_cpu_ids; 207 } 208 209 /** 210 * rnbd_rerun_if_needed() - rerun next queue marked as stopped 211 * @sess: Session to rerun a queue on 212 * 213 * Description: 214 * Each CPU has it's own list of HW queues, which should be rerun. 215 * Function finds such list with HW queues, takes a list lock, picks up 216 * the first HW queue out of the list and requeues it. 217 * 218 * Return: 219 * True if the queue was requeued, false otherwise. 220 * 221 * Context: 222 * Does not matter. 223 */ 224 static bool rnbd_rerun_if_needed(struct rnbd_clt_session *sess) 225 { 226 struct rnbd_queue *q = NULL; 227 struct rnbd_cpu_qlist *cpu_q; 228 unsigned long flags; 229 int *cpup; 230 231 /* 232 * To keep fairness and not to let other queues starve we always 233 * try to wake up someone else in round-robin manner. That of course 234 * increases latency but queues always have a chance to be executed. 235 */ 236 cpup = get_cpu_ptr(sess->cpu_rr); 237 for (cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(*cpup)); cpu_q; 238 cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(cpu_q->cpu))) { 239 if (!spin_trylock_irqsave(&cpu_q->requeue_lock, flags)) 240 continue; 241 if (unlikely(!test_bit(cpu_q->cpu, sess->cpu_queues_bm))) 242 goto unlock; 243 q = list_first_entry_or_null(&cpu_q->requeue_list, 244 typeof(*q), requeue_list); 245 if (WARN_ON(!q)) 246 goto clear_bit; 247 list_del_init(&q->requeue_list); 248 clear_bit_unlock(0, &q->in_list); 249 250 if (list_empty(&cpu_q->requeue_list)) { 251 /* Clear bit if nothing is left */ 252 clear_bit: 253 clear_bit(cpu_q->cpu, sess->cpu_queues_bm); 254 } 255 unlock: 256 spin_unlock_irqrestore(&cpu_q->requeue_lock, flags); 257 258 if (q) 259 break; 260 } 261 262 /** 263 * Saves the CPU that is going to be requeued on the per-cpu var. Just 264 * incrementing it doesn't work because rnbd_get_cpu_qlist() will 265 * always return the first CPU with something on the queue list when the 266 * value stored on the var is greater than the last CPU with something 267 * on the list. 268 */ 269 if (cpu_q) 270 *cpup = cpu_q->cpu; 271 put_cpu_var(sess->cpu_rr); 272 273 if (q) 274 rnbd_clt_dev_requeue(q); 275 276 return q; 277 } 278 279 /** 280 * rnbd_rerun_all_if_idle() - rerun all queues left in the list if 281 * session is idling (there are no requests 282 * in-flight). 283 * @sess: Session to rerun the queues on 284 * 285 * Description: 286 * This function tries to rerun all stopped queues if there are no 287 * requests in-flight anymore. This function tries to solve an obvious 288 * problem, when number of tags < than number of queues (hctx), which 289 * are stopped and put to sleep. If last permit, which has been just put, 290 * does not wake up all left queues (hctxs), IO requests hang forever. 291 * 292 * That can happen when all number of permits, say N, have been exhausted 293 * from one CPU, and we have many block devices per session, say M. 294 * Each block device has it's own queue (hctx) for each CPU, so eventually 295 * we can put that number of queues (hctxs) to sleep: M x nr_cpu_ids. 296 * If number of permits N < M x nr_cpu_ids finally we will get an IO hang. 297 * 298 * To avoid this hang last caller of rnbd_put_permit() (last caller is the 299 * one who observes sess->busy == 0) must wake up all remaining queues. 300 * 301 * Context: 302 * Does not matter. 303 */ 304 static void rnbd_rerun_all_if_idle(struct rnbd_clt_session *sess) 305 { 306 bool requeued; 307 308 do { 309 requeued = rnbd_rerun_if_needed(sess); 310 } while (atomic_read(&sess->busy) == 0 && requeued); 311 } 312 313 static struct rtrs_permit *rnbd_get_permit(struct rnbd_clt_session *sess, 314 enum rtrs_clt_con_type con_type, 315 int wait) 316 { 317 struct rtrs_permit *permit; 318 319 permit = rtrs_clt_get_permit(sess->rtrs, con_type, 320 wait ? RTRS_PERMIT_WAIT : 321 RTRS_PERMIT_NOWAIT); 322 if (likely(permit)) 323 /* We have a subtle rare case here, when all permits can be 324 * consumed before busy counter increased. This is safe, 325 * because loser will get NULL as a permit, observe 0 busy 326 * counter and immediately restart the queue himself. 327 */ 328 atomic_inc(&sess->busy); 329 330 return permit; 331 } 332 333 static void rnbd_put_permit(struct rnbd_clt_session *sess, 334 struct rtrs_permit *permit) 335 { 336 rtrs_clt_put_permit(sess->rtrs, permit); 337 atomic_dec(&sess->busy); 338 /* Paired with rnbd_clt_dev_add_to_requeue(). Decrement first 339 * and then check queue bits. 340 */ 341 smp_mb__after_atomic(); 342 rnbd_rerun_all_if_idle(sess); 343 } 344 345 static struct rnbd_iu *rnbd_get_iu(struct rnbd_clt_session *sess, 346 enum rtrs_clt_con_type con_type, 347 int wait) 348 { 349 struct rnbd_iu *iu; 350 struct rtrs_permit *permit; 351 352 iu = kzalloc(sizeof(*iu), GFP_KERNEL); 353 if (!iu) { 354 return NULL; 355 } 356 357 permit = rnbd_get_permit(sess, con_type, 358 wait ? RTRS_PERMIT_WAIT : 359 RTRS_PERMIT_NOWAIT); 360 if (unlikely(!permit)) { 361 kfree(iu); 362 return NULL; 363 } 364 365 iu->permit = permit; 366 /* 367 * 1st reference is dropped after finishing sending a "user" message, 368 * 2nd reference is dropped after confirmation with the response is 369 * returned. 370 * 1st and 2nd can happen in any order, so the rnbd_iu should be 371 * released (rtrs_permit returned to rtrs) only after both 372 * are finished. 373 */ 374 atomic_set(&iu->refcount, 2); 375 init_waitqueue_head(&iu->comp.wait); 376 iu->comp.errno = INT_MAX; 377 378 if (sg_alloc_table(&iu->sgt, 1, GFP_KERNEL)) { 379 rnbd_put_permit(sess, permit); 380 kfree(iu); 381 return NULL; 382 } 383 384 return iu; 385 } 386 387 static void rnbd_put_iu(struct rnbd_clt_session *sess, struct rnbd_iu *iu) 388 { 389 if (atomic_dec_and_test(&iu->refcount)) { 390 sg_free_table(&iu->sgt); 391 rnbd_put_permit(sess, iu->permit); 392 kfree(iu); 393 } 394 } 395 396 static void rnbd_softirq_done_fn(struct request *rq) 397 { 398 struct rnbd_clt_dev *dev = rq->rq_disk->private_data; 399 struct rnbd_clt_session *sess = dev->sess; 400 struct rnbd_iu *iu; 401 402 iu = blk_mq_rq_to_pdu(rq); 403 sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT); 404 rnbd_put_permit(sess, iu->permit); 405 blk_mq_end_request(rq, errno_to_blk_status(iu->errno)); 406 } 407 408 static void msg_io_conf(void *priv, int errno) 409 { 410 struct rnbd_iu *iu = priv; 411 struct rnbd_clt_dev *dev = iu->dev; 412 struct request *rq = iu->rq; 413 int rw = rq_data_dir(rq); 414 415 iu->errno = errno; 416 417 blk_mq_complete_request(rq); 418 419 if (errno) 420 rnbd_clt_info_rl(dev, "%s I/O failed with err: %d\n", 421 rw == READ ? "read" : "write", errno); 422 } 423 424 static void wake_up_iu_comp(struct rnbd_iu *iu, int errno) 425 { 426 iu->comp.errno = errno; 427 wake_up(&iu->comp.wait); 428 } 429 430 static void msg_conf(void *priv, int errno) 431 { 432 struct rnbd_iu *iu = priv; 433 434 iu->errno = errno; 435 schedule_work(&iu->work); 436 } 437 438 enum wait_type { 439 NO_WAIT = 0, 440 WAIT = 1 441 }; 442 443 static int send_usr_msg(struct rtrs_clt *rtrs, int dir, 444 struct rnbd_iu *iu, struct kvec *vec, 445 size_t len, struct scatterlist *sg, unsigned int sg_len, 446 void (*conf)(struct work_struct *work), 447 int *errno, enum wait_type wait) 448 { 449 int err; 450 struct rtrs_clt_req_ops req_ops; 451 452 INIT_WORK(&iu->work, conf); 453 req_ops = (struct rtrs_clt_req_ops) { 454 .priv = iu, 455 .conf_fn = msg_conf, 456 }; 457 err = rtrs_clt_request(dir, &req_ops, rtrs, iu->permit, 458 vec, 1, len, sg, sg_len); 459 if (!err && wait) { 460 wait_event(iu->comp.wait, iu->comp.errno != INT_MAX); 461 *errno = iu->comp.errno; 462 } else { 463 *errno = 0; 464 } 465 466 return err; 467 } 468 469 static void msg_close_conf(struct work_struct *work) 470 { 471 struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work); 472 struct rnbd_clt_dev *dev = iu->dev; 473 474 wake_up_iu_comp(iu, iu->errno); 475 rnbd_put_iu(dev->sess, iu); 476 rnbd_clt_put_dev(dev); 477 } 478 479 static int send_msg_close(struct rnbd_clt_dev *dev, u32 device_id, bool wait) 480 { 481 struct rnbd_clt_session *sess = dev->sess; 482 struct rnbd_msg_close msg; 483 struct rnbd_iu *iu; 484 struct kvec vec = { 485 .iov_base = &msg, 486 .iov_len = sizeof(msg) 487 }; 488 int err, errno; 489 490 iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT); 491 if (!iu) 492 return -ENOMEM; 493 494 iu->buf = NULL; 495 iu->dev = dev; 496 497 msg.hdr.type = cpu_to_le16(RNBD_MSG_CLOSE); 498 msg.device_id = cpu_to_le32(device_id); 499 500 WARN_ON(!rnbd_clt_get_dev(dev)); 501 err = send_usr_msg(sess->rtrs, WRITE, iu, &vec, 0, NULL, 0, 502 msg_close_conf, &errno, wait); 503 if (err) { 504 rnbd_clt_put_dev(dev); 505 rnbd_put_iu(sess, iu); 506 } else { 507 err = errno; 508 } 509 510 rnbd_put_iu(sess, iu); 511 return err; 512 } 513 514 static void msg_open_conf(struct work_struct *work) 515 { 516 struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work); 517 struct rnbd_msg_open_rsp *rsp = iu->buf; 518 struct rnbd_clt_dev *dev = iu->dev; 519 int errno = iu->errno; 520 521 if (errno) { 522 rnbd_clt_err(dev, 523 "Opening failed, server responded: %d\n", 524 errno); 525 } else { 526 errno = process_msg_open_rsp(dev, rsp); 527 if (errno) { 528 u32 device_id = le32_to_cpu(rsp->device_id); 529 /* 530 * If server thinks its fine, but we fail to process 531 * then be nice and send a close to server. 532 */ 533 (void)send_msg_close(dev, device_id, NO_WAIT); 534 } 535 } 536 kfree(rsp); 537 wake_up_iu_comp(iu, errno); 538 rnbd_put_iu(dev->sess, iu); 539 rnbd_clt_put_dev(dev); 540 } 541 542 static void msg_sess_info_conf(struct work_struct *work) 543 { 544 struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work); 545 struct rnbd_msg_sess_info_rsp *rsp = iu->buf; 546 struct rnbd_clt_session *sess = iu->sess; 547 548 if (!iu->errno) 549 sess->ver = min_t(u8, rsp->ver, RNBD_PROTO_VER_MAJOR); 550 551 kfree(rsp); 552 wake_up_iu_comp(iu, iu->errno); 553 rnbd_put_iu(sess, iu); 554 rnbd_clt_put_sess(sess); 555 } 556 557 static int send_msg_open(struct rnbd_clt_dev *dev, bool wait) 558 { 559 struct rnbd_clt_session *sess = dev->sess; 560 struct rnbd_msg_open_rsp *rsp; 561 struct rnbd_msg_open msg; 562 struct rnbd_iu *iu; 563 struct kvec vec = { 564 .iov_base = &msg, 565 .iov_len = sizeof(msg) 566 }; 567 int err, errno; 568 569 rsp = kzalloc(sizeof(*rsp), GFP_KERNEL); 570 if (!rsp) 571 return -ENOMEM; 572 573 iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT); 574 if (!iu) { 575 kfree(rsp); 576 return -ENOMEM; 577 } 578 579 iu->buf = rsp; 580 iu->dev = dev; 581 582 sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp)); 583 584 msg.hdr.type = cpu_to_le16(RNBD_MSG_OPEN); 585 msg.access_mode = dev->access_mode; 586 strlcpy(msg.dev_name, dev->pathname, sizeof(msg.dev_name)); 587 588 WARN_ON(!rnbd_clt_get_dev(dev)); 589 err = send_usr_msg(sess->rtrs, READ, iu, 590 &vec, sizeof(*rsp), iu->sgt.sgl, 1, 591 msg_open_conf, &errno, wait); 592 if (err) { 593 rnbd_clt_put_dev(dev); 594 rnbd_put_iu(sess, iu); 595 kfree(rsp); 596 } else { 597 err = errno; 598 } 599 600 rnbd_put_iu(sess, iu); 601 return err; 602 } 603 604 static int send_msg_sess_info(struct rnbd_clt_session *sess, bool wait) 605 { 606 struct rnbd_msg_sess_info_rsp *rsp; 607 struct rnbd_msg_sess_info msg; 608 struct rnbd_iu *iu; 609 struct kvec vec = { 610 .iov_base = &msg, 611 .iov_len = sizeof(msg) 612 }; 613 int err, errno; 614 615 rsp = kzalloc(sizeof(*rsp), GFP_KERNEL); 616 if (!rsp) 617 return -ENOMEM; 618 619 iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT); 620 if (!iu) { 621 kfree(rsp); 622 return -ENOMEM; 623 } 624 625 iu->buf = rsp; 626 iu->sess = sess; 627 sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp)); 628 629 msg.hdr.type = cpu_to_le16(RNBD_MSG_SESS_INFO); 630 msg.ver = RNBD_PROTO_VER_MAJOR; 631 632 if (!rnbd_clt_get_sess(sess)) { 633 /* 634 * That can happen only in one case, when RTRS has restablished 635 * the connection and link_ev() is called, but session is almost 636 * dead, last reference on session is put and caller is waiting 637 * for RTRS to close everything. 638 */ 639 err = -ENODEV; 640 goto put_iu; 641 } 642 err = send_usr_msg(sess->rtrs, READ, iu, 643 &vec, sizeof(*rsp), iu->sgt.sgl, 1, 644 msg_sess_info_conf, &errno, wait); 645 if (err) { 646 rnbd_clt_put_sess(sess); 647 put_iu: 648 rnbd_put_iu(sess, iu); 649 kfree(rsp); 650 } else { 651 err = errno; 652 } 653 rnbd_put_iu(sess, iu); 654 return err; 655 } 656 657 static void set_dev_states_to_disconnected(struct rnbd_clt_session *sess) 658 { 659 struct rnbd_clt_dev *dev; 660 661 mutex_lock(&sess->lock); 662 list_for_each_entry(dev, &sess->devs_list, list) { 663 rnbd_clt_err(dev, "Device disconnected.\n"); 664 665 mutex_lock(&dev->lock); 666 if (dev->dev_state == DEV_STATE_MAPPED) 667 dev->dev_state = DEV_STATE_MAPPED_DISCONNECTED; 668 mutex_unlock(&dev->lock); 669 } 670 mutex_unlock(&sess->lock); 671 } 672 673 static void remap_devs(struct rnbd_clt_session *sess) 674 { 675 struct rnbd_clt_dev *dev; 676 struct rtrs_attrs attrs; 677 int err; 678 679 /* 680 * Careful here: we are called from RTRS link event directly, 681 * thus we can't send any RTRS request and wait for response 682 * or RTRS will not be able to complete request with failure 683 * if something goes wrong (failing of outstanding requests 684 * happens exactly from the context where we are blocking now). 685 * 686 * So to avoid deadlocks each usr message sent from here must 687 * be asynchronous. 688 */ 689 690 err = send_msg_sess_info(sess, NO_WAIT); 691 if (err) { 692 pr_err("send_msg_sess_info(\"%s\"): %d\n", sess->sessname, err); 693 return; 694 } 695 696 rtrs_clt_query(sess->rtrs, &attrs); 697 mutex_lock(&sess->lock); 698 sess->max_io_size = attrs.max_io_size; 699 700 list_for_each_entry(dev, &sess->devs_list, list) { 701 bool skip; 702 703 mutex_lock(&dev->lock); 704 skip = (dev->dev_state == DEV_STATE_INIT); 705 mutex_unlock(&dev->lock); 706 if (skip) 707 /* 708 * When device is establishing connection for the first 709 * time - do not remap, it will be closed soon. 710 */ 711 continue; 712 713 rnbd_clt_info(dev, "session reconnected, remapping device\n"); 714 err = send_msg_open(dev, NO_WAIT); 715 if (err) { 716 rnbd_clt_err(dev, "send_msg_open(): %d\n", err); 717 break; 718 } 719 } 720 mutex_unlock(&sess->lock); 721 } 722 723 static void rnbd_clt_link_ev(void *priv, enum rtrs_clt_link_ev ev) 724 { 725 struct rnbd_clt_session *sess = priv; 726 727 switch (ev) { 728 case RTRS_CLT_LINK_EV_DISCONNECTED: 729 set_dev_states_to_disconnected(sess); 730 break; 731 case RTRS_CLT_LINK_EV_RECONNECTED: 732 remap_devs(sess); 733 break; 734 default: 735 pr_err("Unknown session event received (%d), session: %s\n", 736 ev, sess->sessname); 737 } 738 } 739 740 static void rnbd_init_cpu_qlists(struct rnbd_cpu_qlist __percpu *cpu_queues) 741 { 742 unsigned int cpu; 743 struct rnbd_cpu_qlist *cpu_q; 744 745 for_each_possible_cpu(cpu) { 746 cpu_q = per_cpu_ptr(cpu_queues, cpu); 747 748 cpu_q->cpu = cpu; 749 INIT_LIST_HEAD(&cpu_q->requeue_list); 750 spin_lock_init(&cpu_q->requeue_lock); 751 } 752 } 753 754 static void destroy_mq_tags(struct rnbd_clt_session *sess) 755 { 756 if (sess->tag_set.tags) 757 blk_mq_free_tag_set(&sess->tag_set); 758 } 759 760 static inline void wake_up_rtrs_waiters(struct rnbd_clt_session *sess) 761 { 762 sess->rtrs_ready = true; 763 wake_up_all(&sess->rtrs_waitq); 764 } 765 766 static void close_rtrs(struct rnbd_clt_session *sess) 767 { 768 might_sleep(); 769 770 if (!IS_ERR_OR_NULL(sess->rtrs)) { 771 rtrs_clt_close(sess->rtrs); 772 sess->rtrs = NULL; 773 wake_up_rtrs_waiters(sess); 774 } 775 } 776 777 static void free_sess(struct rnbd_clt_session *sess) 778 { 779 WARN_ON(!list_empty(&sess->devs_list)); 780 781 might_sleep(); 782 783 close_rtrs(sess); 784 destroy_mq_tags(sess); 785 if (!list_empty(&sess->list)) { 786 mutex_lock(&sess_lock); 787 list_del(&sess->list); 788 mutex_unlock(&sess_lock); 789 } 790 free_percpu(sess->cpu_queues); 791 free_percpu(sess->cpu_rr); 792 mutex_destroy(&sess->lock); 793 kfree(sess); 794 } 795 796 static struct rnbd_clt_session *alloc_sess(const char *sessname) 797 { 798 struct rnbd_clt_session *sess; 799 int err, cpu; 800 801 sess = kzalloc_node(sizeof(*sess), GFP_KERNEL, NUMA_NO_NODE); 802 if (!sess) 803 return ERR_PTR(-ENOMEM); 804 strlcpy(sess->sessname, sessname, sizeof(sess->sessname)); 805 atomic_set(&sess->busy, 0); 806 mutex_init(&sess->lock); 807 INIT_LIST_HEAD(&sess->devs_list); 808 INIT_LIST_HEAD(&sess->list); 809 bitmap_zero(sess->cpu_queues_bm, NR_CPUS); 810 init_waitqueue_head(&sess->rtrs_waitq); 811 refcount_set(&sess->refcount, 1); 812 813 sess->cpu_queues = alloc_percpu(struct rnbd_cpu_qlist); 814 if (!sess->cpu_queues) { 815 err = -ENOMEM; 816 goto err; 817 } 818 rnbd_init_cpu_qlists(sess->cpu_queues); 819 820 /* 821 * That is simple percpu variable which stores cpu indices, which are 822 * incremented on each access. We need that for the sake of fairness 823 * to wake up queues in a round-robin manner. 824 */ 825 sess->cpu_rr = alloc_percpu(int); 826 if (!sess->cpu_rr) { 827 err = -ENOMEM; 828 goto err; 829 } 830 for_each_possible_cpu(cpu) 831 * per_cpu_ptr(sess->cpu_rr, cpu) = cpu; 832 833 return sess; 834 835 err: 836 free_sess(sess); 837 838 return ERR_PTR(err); 839 } 840 841 static int wait_for_rtrs_connection(struct rnbd_clt_session *sess) 842 { 843 wait_event(sess->rtrs_waitq, sess->rtrs_ready); 844 if (IS_ERR_OR_NULL(sess->rtrs)) 845 return -ECONNRESET; 846 847 return 0; 848 } 849 850 static void wait_for_rtrs_disconnection(struct rnbd_clt_session *sess) 851 __releases(&sess_lock) 852 __acquires(&sess_lock) 853 { 854 DEFINE_WAIT(wait); 855 856 prepare_to_wait(&sess->rtrs_waitq, &wait, TASK_UNINTERRUPTIBLE); 857 if (IS_ERR_OR_NULL(sess->rtrs)) { 858 finish_wait(&sess->rtrs_waitq, &wait); 859 return; 860 } 861 mutex_unlock(&sess_lock); 862 /* loop in caller, see __find_and_get_sess(). 863 * You can't leave mutex locked and call schedule(), you will catch a 864 * deadlock with a caller of free_sess(), which has just put the last 865 * reference and is about to take the sess_lock in order to delete 866 * the session from the list. 867 */ 868 schedule(); 869 mutex_lock(&sess_lock); 870 } 871 872 static struct rnbd_clt_session *__find_and_get_sess(const char *sessname) 873 __releases(&sess_lock) 874 __acquires(&sess_lock) 875 { 876 struct rnbd_clt_session *sess, *sn; 877 int err; 878 879 again: 880 list_for_each_entry_safe(sess, sn, &sess_list, list) { 881 if (strcmp(sessname, sess->sessname)) 882 continue; 883 884 if (sess->rtrs_ready && IS_ERR_OR_NULL(sess->rtrs)) 885 /* 886 * No RTRS connection, session is dying. 887 */ 888 continue; 889 890 if (rnbd_clt_get_sess(sess)) { 891 /* 892 * Alive session is found, wait for RTRS connection. 893 */ 894 mutex_unlock(&sess_lock); 895 err = wait_for_rtrs_connection(sess); 896 if (err) 897 rnbd_clt_put_sess(sess); 898 mutex_lock(&sess_lock); 899 900 if (err) 901 /* Session is dying, repeat the loop */ 902 goto again; 903 904 return sess; 905 } 906 /* 907 * Ref is 0, session is dying, wait for RTRS disconnect 908 * in order to avoid session names clashes. 909 */ 910 wait_for_rtrs_disconnection(sess); 911 /* 912 * RTRS is disconnected and soon session will be freed, 913 * so repeat a loop. 914 */ 915 goto again; 916 } 917 918 return NULL; 919 } 920 921 static struct 922 rnbd_clt_session *find_or_create_sess(const char *sessname, bool *first) 923 { 924 struct rnbd_clt_session *sess = NULL; 925 926 mutex_lock(&sess_lock); 927 sess = __find_and_get_sess(sessname); 928 if (!sess) { 929 sess = alloc_sess(sessname); 930 if (IS_ERR(sess)) { 931 mutex_unlock(&sess_lock); 932 return sess; 933 } 934 list_add(&sess->list, &sess_list); 935 *first = true; 936 } else 937 *first = false; 938 mutex_unlock(&sess_lock); 939 940 return sess; 941 } 942 943 static int rnbd_client_open(struct block_device *block_device, fmode_t mode) 944 { 945 struct rnbd_clt_dev *dev = block_device->bd_disk->private_data; 946 947 if (dev->read_only && (mode & FMODE_WRITE)) 948 return -EPERM; 949 950 if (dev->dev_state == DEV_STATE_UNMAPPED || 951 !rnbd_clt_get_dev(dev)) 952 return -EIO; 953 954 return 0; 955 } 956 957 static void rnbd_client_release(struct gendisk *gen, fmode_t mode) 958 { 959 struct rnbd_clt_dev *dev = gen->private_data; 960 961 rnbd_clt_put_dev(dev); 962 } 963 964 static int rnbd_client_getgeo(struct block_device *block_device, 965 struct hd_geometry *geo) 966 { 967 u64 size; 968 struct rnbd_clt_dev *dev; 969 970 dev = block_device->bd_disk->private_data; 971 size = dev->size * (dev->logical_block_size / SECTOR_SIZE); 972 geo->cylinders = size >> 6; /* size/64 */ 973 geo->heads = 4; 974 geo->sectors = 16; 975 geo->start = 0; 976 977 return 0; 978 } 979 980 static const struct block_device_operations rnbd_client_ops = { 981 .owner = THIS_MODULE, 982 .open = rnbd_client_open, 983 .release = rnbd_client_release, 984 .getgeo = rnbd_client_getgeo 985 }; 986 987 /* The amount of data that belongs to an I/O and the amount of data that 988 * should be read or written to the disk (bi_size) can differ. 989 * 990 * E.g. When WRITE_SAME is used, only a small amount of data is 991 * transferred that is then written repeatedly over a lot of sectors. 992 * 993 * Get the size of data to be transferred via RTRS by summing up the size 994 * of the scather-gather list entries. 995 */ 996 static size_t rnbd_clt_get_sg_size(struct scatterlist *sglist, u32 len) 997 { 998 struct scatterlist *sg; 999 size_t tsize = 0; 1000 int i; 1001 1002 for_each_sg(sglist, sg, len, i) 1003 tsize += sg->length; 1004 return tsize; 1005 } 1006 1007 static int rnbd_client_xfer_request(struct rnbd_clt_dev *dev, 1008 struct request *rq, 1009 struct rnbd_iu *iu) 1010 { 1011 struct rtrs_clt *rtrs = dev->sess->rtrs; 1012 struct rtrs_permit *permit = iu->permit; 1013 struct rnbd_msg_io msg; 1014 struct rtrs_clt_req_ops req_ops; 1015 unsigned int sg_cnt = 0; 1016 struct kvec vec; 1017 size_t size; 1018 int err; 1019 1020 iu->rq = rq; 1021 iu->dev = dev; 1022 msg.sector = cpu_to_le64(blk_rq_pos(rq)); 1023 msg.bi_size = cpu_to_le32(blk_rq_bytes(rq)); 1024 msg.rw = cpu_to_le32(rq_to_rnbd_flags(rq)); 1025 msg.prio = cpu_to_le16(req_get_ioprio(rq)); 1026 1027 /* 1028 * We only support discards with single segment for now. 1029 * See queue limits. 1030 */ 1031 if (req_op(rq) != REQ_OP_DISCARD) 1032 sg_cnt = blk_rq_map_sg(dev->queue, rq, iu->sgt.sgl); 1033 1034 if (sg_cnt == 0) 1035 sg_mark_end(&iu->sgt.sgl[0]); 1036 1037 msg.hdr.type = cpu_to_le16(RNBD_MSG_IO); 1038 msg.device_id = cpu_to_le32(dev->device_id); 1039 1040 vec = (struct kvec) { 1041 .iov_base = &msg, 1042 .iov_len = sizeof(msg) 1043 }; 1044 size = rnbd_clt_get_sg_size(iu->sgt.sgl, sg_cnt); 1045 req_ops = (struct rtrs_clt_req_ops) { 1046 .priv = iu, 1047 .conf_fn = msg_io_conf, 1048 }; 1049 err = rtrs_clt_request(rq_data_dir(rq), &req_ops, rtrs, permit, 1050 &vec, 1, size, iu->sgt.sgl, sg_cnt); 1051 if (unlikely(err)) { 1052 rnbd_clt_err_rl(dev, "RTRS failed to transfer IO, err: %d\n", 1053 err); 1054 return err; 1055 } 1056 1057 return 0; 1058 } 1059 1060 /** 1061 * rnbd_clt_dev_add_to_requeue() - add device to requeue if session is busy 1062 * @dev: Device to be checked 1063 * @q: Queue to be added to the requeue list if required 1064 * 1065 * Description: 1066 * If session is busy, that means someone will requeue us when resources 1067 * are freed. If session is not doing anything - device is not added to 1068 * the list and @false is returned. 1069 */ 1070 static bool rnbd_clt_dev_add_to_requeue(struct rnbd_clt_dev *dev, 1071 struct rnbd_queue *q) 1072 { 1073 struct rnbd_clt_session *sess = dev->sess; 1074 struct rnbd_cpu_qlist *cpu_q; 1075 unsigned long flags; 1076 bool added = true; 1077 bool need_set; 1078 1079 cpu_q = get_cpu_ptr(sess->cpu_queues); 1080 spin_lock_irqsave(&cpu_q->requeue_lock, flags); 1081 1082 if (likely(!test_and_set_bit_lock(0, &q->in_list))) { 1083 if (WARN_ON(!list_empty(&q->requeue_list))) 1084 goto unlock; 1085 1086 need_set = !test_bit(cpu_q->cpu, sess->cpu_queues_bm); 1087 if (need_set) { 1088 set_bit(cpu_q->cpu, sess->cpu_queues_bm); 1089 /* Paired with rnbd_put_permit(). Set a bit first 1090 * and then observe the busy counter. 1091 */ 1092 smp_mb__before_atomic(); 1093 } 1094 if (likely(atomic_read(&sess->busy))) { 1095 list_add_tail(&q->requeue_list, &cpu_q->requeue_list); 1096 } else { 1097 /* Very unlikely, but possible: busy counter was 1098 * observed as zero. Drop all bits and return 1099 * false to restart the queue by ourselves. 1100 */ 1101 if (need_set) 1102 clear_bit(cpu_q->cpu, sess->cpu_queues_bm); 1103 clear_bit_unlock(0, &q->in_list); 1104 added = false; 1105 } 1106 } 1107 unlock: 1108 spin_unlock_irqrestore(&cpu_q->requeue_lock, flags); 1109 put_cpu_ptr(sess->cpu_queues); 1110 1111 return added; 1112 } 1113 1114 static void rnbd_clt_dev_kick_mq_queue(struct rnbd_clt_dev *dev, 1115 struct blk_mq_hw_ctx *hctx, 1116 int delay) 1117 { 1118 struct rnbd_queue *q = hctx->driver_data; 1119 1120 if (delay != RNBD_DELAY_IFBUSY) 1121 blk_mq_delay_run_hw_queue(hctx, delay); 1122 else if (unlikely(!rnbd_clt_dev_add_to_requeue(dev, q))) 1123 /* 1124 * If session is not busy we have to restart 1125 * the queue ourselves. 1126 */ 1127 blk_mq_delay_run_hw_queue(hctx, 10/*ms*/); 1128 } 1129 1130 static blk_status_t rnbd_queue_rq(struct blk_mq_hw_ctx *hctx, 1131 const struct blk_mq_queue_data *bd) 1132 { 1133 struct request *rq = bd->rq; 1134 struct rnbd_clt_dev *dev = rq->rq_disk->private_data; 1135 struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq); 1136 int err; 1137 blk_status_t ret = BLK_STS_IOERR; 1138 1139 if (unlikely(dev->dev_state != DEV_STATE_MAPPED)) 1140 return BLK_STS_IOERR; 1141 1142 iu->permit = rnbd_get_permit(dev->sess, RTRS_IO_CON, 1143 RTRS_PERMIT_NOWAIT); 1144 if (unlikely(!iu->permit)) { 1145 rnbd_clt_dev_kick_mq_queue(dev, hctx, RNBD_DELAY_IFBUSY); 1146 return BLK_STS_RESOURCE; 1147 } 1148 1149 iu->sgt.sgl = iu->first_sgl; 1150 err = sg_alloc_table_chained(&iu->sgt, 1151 /* Even-if the request has no segment, 1152 * sglist must have one entry at least */ 1153 blk_rq_nr_phys_segments(rq) ? : 1, 1154 iu->sgt.sgl, 1155 RNBD_INLINE_SG_CNT); 1156 if (err) { 1157 rnbd_clt_err_rl(dev, "sg_alloc_table_chained ret=%d\n", err); 1158 rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/); 1159 rnbd_put_permit(dev->sess, iu->permit); 1160 return BLK_STS_RESOURCE; 1161 } 1162 1163 blk_mq_start_request(rq); 1164 err = rnbd_client_xfer_request(dev, rq, iu); 1165 if (likely(err == 0)) 1166 return BLK_STS_OK; 1167 if (unlikely(err == -EAGAIN || err == -ENOMEM)) { 1168 rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/); 1169 ret = BLK_STS_RESOURCE; 1170 } 1171 sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT); 1172 rnbd_put_permit(dev->sess, iu->permit); 1173 return ret; 1174 } 1175 1176 static struct blk_mq_ops rnbd_mq_ops = { 1177 .queue_rq = rnbd_queue_rq, 1178 .complete = rnbd_softirq_done_fn, 1179 }; 1180 1181 static int setup_mq_tags(struct rnbd_clt_session *sess) 1182 { 1183 struct blk_mq_tag_set *tag_set = &sess->tag_set; 1184 1185 memset(tag_set, 0, sizeof(*tag_set)); 1186 tag_set->ops = &rnbd_mq_ops; 1187 tag_set->queue_depth = sess->queue_depth; 1188 tag_set->numa_node = NUMA_NO_NODE; 1189 tag_set->flags = BLK_MQ_F_SHOULD_MERGE | 1190 BLK_MQ_F_TAG_QUEUE_SHARED; 1191 tag_set->cmd_size = sizeof(struct rnbd_iu) + RNBD_RDMA_SGL_SIZE; 1192 tag_set->nr_hw_queues = num_online_cpus(); 1193 1194 return blk_mq_alloc_tag_set(tag_set); 1195 } 1196 1197 static struct rnbd_clt_session * 1198 find_and_get_or_create_sess(const char *sessname, 1199 const struct rtrs_addr *paths, 1200 size_t path_cnt, u16 port_nr) 1201 { 1202 struct rnbd_clt_session *sess; 1203 struct rtrs_attrs attrs; 1204 int err; 1205 bool first; 1206 struct rtrs_clt_ops rtrs_ops; 1207 1208 sess = find_or_create_sess(sessname, &first); 1209 if (sess == ERR_PTR(-ENOMEM)) 1210 return ERR_PTR(-ENOMEM); 1211 else if (!first) 1212 return sess; 1213 1214 if (!path_cnt) { 1215 pr_err("Session %s not found, and path parameter not given", sessname); 1216 err = -ENXIO; 1217 goto put_sess; 1218 } 1219 1220 rtrs_ops = (struct rtrs_clt_ops) { 1221 .priv = sess, 1222 .link_ev = rnbd_clt_link_ev, 1223 }; 1224 /* 1225 * Nothing was found, establish rtrs connection and proceed further. 1226 */ 1227 sess->rtrs = rtrs_clt_open(&rtrs_ops, sessname, 1228 paths, path_cnt, port_nr, 1229 0, /* Do not use pdu of rtrs */ 1230 RECONNECT_DELAY, BMAX_SEGMENTS, 1231 BLK_MAX_SEGMENT_SIZE, 1232 MAX_RECONNECTS); 1233 if (IS_ERR(sess->rtrs)) { 1234 err = PTR_ERR(sess->rtrs); 1235 goto wake_up_and_put; 1236 } 1237 rtrs_clt_query(sess->rtrs, &attrs); 1238 sess->max_io_size = attrs.max_io_size; 1239 sess->queue_depth = attrs.queue_depth; 1240 1241 err = setup_mq_tags(sess); 1242 if (err) 1243 goto close_rtrs; 1244 1245 err = send_msg_sess_info(sess, WAIT); 1246 if (err) 1247 goto close_rtrs; 1248 1249 wake_up_rtrs_waiters(sess); 1250 1251 return sess; 1252 1253 close_rtrs: 1254 close_rtrs(sess); 1255 put_sess: 1256 rnbd_clt_put_sess(sess); 1257 1258 return ERR_PTR(err); 1259 1260 wake_up_and_put: 1261 wake_up_rtrs_waiters(sess); 1262 goto put_sess; 1263 } 1264 1265 static inline void rnbd_init_hw_queue(struct rnbd_clt_dev *dev, 1266 struct rnbd_queue *q, 1267 struct blk_mq_hw_ctx *hctx) 1268 { 1269 INIT_LIST_HEAD(&q->requeue_list); 1270 q->dev = dev; 1271 q->hctx = hctx; 1272 } 1273 1274 static void rnbd_init_mq_hw_queues(struct rnbd_clt_dev *dev) 1275 { 1276 int i; 1277 struct blk_mq_hw_ctx *hctx; 1278 struct rnbd_queue *q; 1279 1280 queue_for_each_hw_ctx(dev->queue, hctx, i) { 1281 q = &dev->hw_queues[i]; 1282 rnbd_init_hw_queue(dev, q, hctx); 1283 hctx->driver_data = q; 1284 } 1285 } 1286 1287 static int setup_mq_dev(struct rnbd_clt_dev *dev) 1288 { 1289 dev->queue = blk_mq_init_queue(&dev->sess->tag_set); 1290 if (IS_ERR(dev->queue)) { 1291 rnbd_clt_err(dev, "Initializing multiqueue queue failed, err: %ld\n", 1292 PTR_ERR(dev->queue)); 1293 return PTR_ERR(dev->queue); 1294 } 1295 rnbd_init_mq_hw_queues(dev); 1296 return 0; 1297 } 1298 1299 static void setup_request_queue(struct rnbd_clt_dev *dev) 1300 { 1301 blk_queue_logical_block_size(dev->queue, dev->logical_block_size); 1302 blk_queue_physical_block_size(dev->queue, dev->physical_block_size); 1303 blk_queue_max_hw_sectors(dev->queue, dev->max_hw_sectors); 1304 blk_queue_max_write_same_sectors(dev->queue, 1305 dev->max_write_same_sectors); 1306 1307 /* 1308 * we don't support discards to "discontiguous" segments 1309 * in on request 1310 */ 1311 blk_queue_max_discard_segments(dev->queue, 1); 1312 1313 blk_queue_max_discard_sectors(dev->queue, dev->max_discard_sectors); 1314 dev->queue->limits.discard_granularity = dev->discard_granularity; 1315 dev->queue->limits.discard_alignment = dev->discard_alignment; 1316 if (dev->max_discard_sectors) 1317 blk_queue_flag_set(QUEUE_FLAG_DISCARD, dev->queue); 1318 if (dev->secure_discard) 1319 blk_queue_flag_set(QUEUE_FLAG_SECERASE, dev->queue); 1320 1321 blk_queue_flag_set(QUEUE_FLAG_SAME_COMP, dev->queue); 1322 blk_queue_flag_set(QUEUE_FLAG_SAME_FORCE, dev->queue); 1323 blk_queue_max_segments(dev->queue, dev->max_segments); 1324 blk_queue_io_opt(dev->queue, dev->sess->max_io_size); 1325 blk_queue_virt_boundary(dev->queue, SZ_4K - 1); 1326 blk_queue_write_cache(dev->queue, dev->wc, dev->fua); 1327 dev->queue->queuedata = dev; 1328 } 1329 1330 static void rnbd_clt_setup_gen_disk(struct rnbd_clt_dev *dev, int idx) 1331 { 1332 dev->gd->major = rnbd_client_major; 1333 dev->gd->first_minor = idx << RNBD_PART_BITS; 1334 dev->gd->fops = &rnbd_client_ops; 1335 dev->gd->queue = dev->queue; 1336 dev->gd->private_data = dev; 1337 snprintf(dev->gd->disk_name, sizeof(dev->gd->disk_name), "rnbd%d", 1338 idx); 1339 pr_debug("disk_name=%s, capacity=%zu\n", 1340 dev->gd->disk_name, 1341 dev->nsectors * (dev->logical_block_size / SECTOR_SIZE) 1342 ); 1343 1344 set_capacity(dev->gd, dev->nsectors); 1345 1346 if (dev->access_mode == RNBD_ACCESS_RO) { 1347 dev->read_only = true; 1348 set_disk_ro(dev->gd, true); 1349 } else { 1350 dev->read_only = false; 1351 } 1352 1353 if (!dev->rotational) 1354 blk_queue_flag_set(QUEUE_FLAG_NONROT, dev->queue); 1355 } 1356 1357 static int rnbd_client_setup_device(struct rnbd_clt_session *sess, 1358 struct rnbd_clt_dev *dev, int idx) 1359 { 1360 int err; 1361 1362 dev->size = dev->nsectors * dev->logical_block_size; 1363 1364 err = setup_mq_dev(dev); 1365 if (err) 1366 return err; 1367 1368 setup_request_queue(dev); 1369 1370 dev->gd = alloc_disk_node(1 << RNBD_PART_BITS, NUMA_NO_NODE); 1371 if (!dev->gd) { 1372 blk_cleanup_queue(dev->queue); 1373 return -ENOMEM; 1374 } 1375 1376 rnbd_clt_setup_gen_disk(dev, idx); 1377 1378 return 0; 1379 } 1380 1381 static struct rnbd_clt_dev *init_dev(struct rnbd_clt_session *sess, 1382 enum rnbd_access_mode access_mode, 1383 const char *pathname) 1384 { 1385 struct rnbd_clt_dev *dev; 1386 int ret; 1387 1388 dev = kzalloc_node(sizeof(*dev), GFP_KERNEL, NUMA_NO_NODE); 1389 if (!dev) 1390 return ERR_PTR(-ENOMEM); 1391 1392 dev->hw_queues = kcalloc(nr_cpu_ids, sizeof(*dev->hw_queues), 1393 GFP_KERNEL); 1394 if (!dev->hw_queues) { 1395 ret = -ENOMEM; 1396 goto out_alloc; 1397 } 1398 1399 mutex_lock(&ida_lock); 1400 ret = ida_simple_get(&index_ida, 0, 1 << (MINORBITS - RNBD_PART_BITS), 1401 GFP_KERNEL); 1402 mutex_unlock(&ida_lock); 1403 if (ret < 0) { 1404 pr_err("Failed to initialize device '%s' from session %s, allocating idr failed, err: %d\n", 1405 pathname, sess->sessname, ret); 1406 goto out_queues; 1407 } 1408 1409 dev->pathname = kstrdup(pathname, GFP_KERNEL); 1410 if (!dev->pathname) { 1411 ret = -ENOMEM; 1412 goto out_queues; 1413 } 1414 1415 dev->clt_device_id = ret; 1416 dev->sess = sess; 1417 dev->access_mode = access_mode; 1418 mutex_init(&dev->lock); 1419 refcount_set(&dev->refcount, 1); 1420 dev->dev_state = DEV_STATE_INIT; 1421 1422 /* 1423 * Here we called from sysfs entry, thus clt-sysfs is 1424 * responsible that session will not disappear. 1425 */ 1426 WARN_ON(!rnbd_clt_get_sess(sess)); 1427 1428 return dev; 1429 1430 out_queues: 1431 kfree(dev->hw_queues); 1432 out_alloc: 1433 kfree(dev); 1434 return ERR_PTR(ret); 1435 } 1436 1437 static bool __exists_dev(const char *pathname, const char *sessname) 1438 { 1439 struct rnbd_clt_session *sess; 1440 struct rnbd_clt_dev *dev; 1441 bool found = false; 1442 1443 list_for_each_entry(sess, &sess_list, list) { 1444 if (sessname && strncmp(sess->sessname, sessname, 1445 sizeof(sess->sessname))) 1446 continue; 1447 mutex_lock(&sess->lock); 1448 list_for_each_entry(dev, &sess->devs_list, list) { 1449 if (strlen(dev->pathname) == strlen(pathname) && 1450 !strcmp(dev->pathname, pathname)) { 1451 found = true; 1452 break; 1453 } 1454 } 1455 mutex_unlock(&sess->lock); 1456 if (found) 1457 break; 1458 } 1459 1460 return found; 1461 } 1462 1463 static bool exists_devpath(const char *pathname, const char *sessname) 1464 { 1465 bool found; 1466 1467 mutex_lock(&sess_lock); 1468 found = __exists_dev(pathname, sessname); 1469 mutex_unlock(&sess_lock); 1470 1471 return found; 1472 } 1473 1474 static bool insert_dev_if_not_exists_devpath(const char *pathname, 1475 struct rnbd_clt_session *sess, 1476 struct rnbd_clt_dev *dev) 1477 { 1478 bool found; 1479 1480 mutex_lock(&sess_lock); 1481 found = __exists_dev(pathname, sess->sessname); 1482 if (!found) { 1483 mutex_lock(&sess->lock); 1484 list_add_tail(&dev->list, &sess->devs_list); 1485 mutex_unlock(&sess->lock); 1486 } 1487 mutex_unlock(&sess_lock); 1488 1489 return found; 1490 } 1491 1492 static void delete_dev(struct rnbd_clt_dev *dev) 1493 { 1494 struct rnbd_clt_session *sess = dev->sess; 1495 1496 mutex_lock(&sess->lock); 1497 list_del(&dev->list); 1498 mutex_unlock(&sess->lock); 1499 } 1500 1501 struct rnbd_clt_dev *rnbd_clt_map_device(const char *sessname, 1502 struct rtrs_addr *paths, 1503 size_t path_cnt, u16 port_nr, 1504 const char *pathname, 1505 enum rnbd_access_mode access_mode) 1506 { 1507 struct rnbd_clt_session *sess; 1508 struct rnbd_clt_dev *dev; 1509 int ret; 1510 1511 if (unlikely(exists_devpath(pathname, sessname))) 1512 return ERR_PTR(-EEXIST); 1513 1514 sess = find_and_get_or_create_sess(sessname, paths, path_cnt, port_nr); 1515 if (IS_ERR(sess)) 1516 return ERR_CAST(sess); 1517 1518 dev = init_dev(sess, access_mode, pathname); 1519 if (IS_ERR(dev)) { 1520 pr_err("map_device: failed to map device '%s' from session %s, can't initialize device, err: %ld\n", 1521 pathname, sess->sessname, PTR_ERR(dev)); 1522 ret = PTR_ERR(dev); 1523 goto put_sess; 1524 } 1525 if (insert_dev_if_not_exists_devpath(pathname, sess, dev)) { 1526 ret = -EEXIST; 1527 goto put_dev; 1528 } 1529 ret = send_msg_open(dev, WAIT); 1530 if (ret) { 1531 rnbd_clt_err(dev, 1532 "map_device: failed, can't open remote device, err: %d\n", 1533 ret); 1534 goto del_dev; 1535 } 1536 mutex_lock(&dev->lock); 1537 pr_debug("Opened remote device: session=%s, path='%s'\n", 1538 sess->sessname, pathname); 1539 ret = rnbd_client_setup_device(sess, dev, dev->clt_device_id); 1540 if (ret) { 1541 rnbd_clt_err(dev, 1542 "map_device: Failed to configure device, err: %d\n", 1543 ret); 1544 mutex_unlock(&dev->lock); 1545 goto send_close; 1546 } 1547 1548 rnbd_clt_info(dev, 1549 "map_device: Device mapped as %s (nsectors: %zu, logical_block_size: %d, physical_block_size: %d, max_write_same_sectors: %d, max_discard_sectors: %d, discard_granularity: %d, discard_alignment: %d, secure_discard: %d, max_segments: %d, max_hw_sectors: %d, rotational: %d, wc: %d, fua: %d)\n", 1550 dev->gd->disk_name, dev->nsectors, 1551 dev->logical_block_size, dev->physical_block_size, 1552 dev->max_write_same_sectors, dev->max_discard_sectors, 1553 dev->discard_granularity, dev->discard_alignment, 1554 dev->secure_discard, dev->max_segments, 1555 dev->max_hw_sectors, dev->rotational, dev->wc, dev->fua); 1556 1557 mutex_unlock(&dev->lock); 1558 1559 add_disk(dev->gd); 1560 rnbd_clt_put_sess(sess); 1561 1562 return dev; 1563 1564 send_close: 1565 send_msg_close(dev, dev->device_id, WAIT); 1566 del_dev: 1567 delete_dev(dev); 1568 put_dev: 1569 rnbd_clt_put_dev(dev); 1570 put_sess: 1571 rnbd_clt_put_sess(sess); 1572 1573 return ERR_PTR(ret); 1574 } 1575 1576 static void destroy_gen_disk(struct rnbd_clt_dev *dev) 1577 { 1578 del_gendisk(dev->gd); 1579 blk_cleanup_queue(dev->queue); 1580 put_disk(dev->gd); 1581 } 1582 1583 static void destroy_sysfs(struct rnbd_clt_dev *dev, 1584 const struct attribute *sysfs_self) 1585 { 1586 rnbd_clt_remove_dev_symlink(dev); 1587 if (dev->kobj.state_initialized) { 1588 if (sysfs_self) 1589 /* To avoid deadlock firstly remove itself */ 1590 sysfs_remove_file_self(&dev->kobj, sysfs_self); 1591 kobject_del(&dev->kobj); 1592 kobject_put(&dev->kobj); 1593 } 1594 } 1595 1596 int rnbd_clt_unmap_device(struct rnbd_clt_dev *dev, bool force, 1597 const struct attribute *sysfs_self) 1598 { 1599 struct rnbd_clt_session *sess = dev->sess; 1600 int refcount, ret = 0; 1601 bool was_mapped; 1602 1603 mutex_lock(&dev->lock); 1604 if (dev->dev_state == DEV_STATE_UNMAPPED) { 1605 rnbd_clt_info(dev, "Device is already being unmapped\n"); 1606 ret = -EALREADY; 1607 goto err; 1608 } 1609 refcount = refcount_read(&dev->refcount); 1610 if (!force && refcount > 1) { 1611 rnbd_clt_err(dev, 1612 "Closing device failed, device is in use, (%d device users)\n", 1613 refcount - 1); 1614 ret = -EBUSY; 1615 goto err; 1616 } 1617 was_mapped = (dev->dev_state == DEV_STATE_MAPPED); 1618 dev->dev_state = DEV_STATE_UNMAPPED; 1619 mutex_unlock(&dev->lock); 1620 1621 delete_dev(dev); 1622 destroy_sysfs(dev, sysfs_self); 1623 destroy_gen_disk(dev); 1624 if (was_mapped && sess->rtrs) 1625 send_msg_close(dev, dev->device_id, WAIT); 1626 1627 rnbd_clt_info(dev, "Device is unmapped\n"); 1628 1629 /* Likely last reference put */ 1630 rnbd_clt_put_dev(dev); 1631 1632 /* 1633 * Here device and session can be vanished! 1634 */ 1635 1636 return 0; 1637 err: 1638 mutex_unlock(&dev->lock); 1639 1640 return ret; 1641 } 1642 1643 int rnbd_clt_remap_device(struct rnbd_clt_dev *dev) 1644 { 1645 int err; 1646 1647 mutex_lock(&dev->lock); 1648 if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) 1649 err = 0; 1650 else if (dev->dev_state == DEV_STATE_UNMAPPED) 1651 err = -ENODEV; 1652 else if (dev->dev_state == DEV_STATE_MAPPED) 1653 err = -EALREADY; 1654 else 1655 err = -EBUSY; 1656 mutex_unlock(&dev->lock); 1657 if (!err) { 1658 rnbd_clt_info(dev, "Remapping device.\n"); 1659 err = send_msg_open(dev, WAIT); 1660 if (err) 1661 rnbd_clt_err(dev, "remap_device: %d\n", err); 1662 } 1663 1664 return err; 1665 } 1666 1667 static void unmap_device_work(struct work_struct *work) 1668 { 1669 struct rnbd_clt_dev *dev; 1670 1671 dev = container_of(work, typeof(*dev), unmap_on_rmmod_work); 1672 rnbd_clt_unmap_device(dev, true, NULL); 1673 } 1674 1675 static void rnbd_destroy_sessions(void) 1676 { 1677 struct rnbd_clt_session *sess, *sn; 1678 struct rnbd_clt_dev *dev, *tn; 1679 1680 /* Firstly forbid access through sysfs interface */ 1681 rnbd_clt_destroy_default_group(); 1682 rnbd_clt_destroy_sysfs_files(); 1683 1684 /* 1685 * Here at this point there is no any concurrent access to sessions 1686 * list and devices list: 1687 * 1. New session or device can't be created - session sysfs files 1688 * are removed. 1689 * 2. Device or session can't be removed - module reference is taken 1690 * into account in unmap device sysfs callback. 1691 * 3. No IO requests inflight - each file open of block_dev increases 1692 * module reference in get_disk(). 1693 * 1694 * But still there can be user requests inflights, which are sent by 1695 * asynchronous send_msg_*() functions, thus before unmapping devices 1696 * RTRS session must be explicitly closed. 1697 */ 1698 1699 list_for_each_entry_safe(sess, sn, &sess_list, list) { 1700 if (!rnbd_clt_get_sess(sess)) 1701 continue; 1702 close_rtrs(sess); 1703 list_for_each_entry_safe(dev, tn, &sess->devs_list, list) { 1704 /* 1705 * Here unmap happens in parallel for only one reason: 1706 * blk_cleanup_queue() takes around half a second, so 1707 * on huge amount of devices the whole module unload 1708 * procedure takes minutes. 1709 */ 1710 INIT_WORK(&dev->unmap_on_rmmod_work, unmap_device_work); 1711 queue_work(system_long_wq, &dev->unmap_on_rmmod_work); 1712 } 1713 rnbd_clt_put_sess(sess); 1714 } 1715 /* Wait for all scheduled unmap works */ 1716 flush_workqueue(system_long_wq); 1717 WARN_ON(!list_empty(&sess_list)); 1718 } 1719 1720 static int __init rnbd_client_init(void) 1721 { 1722 int err = 0; 1723 1724 BUILD_BUG_ON(sizeof(struct rnbd_msg_hdr) != 4); 1725 BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info) != 36); 1726 BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info_rsp) != 36); 1727 BUILD_BUG_ON(sizeof(struct rnbd_msg_open) != 264); 1728 BUILD_BUG_ON(sizeof(struct rnbd_msg_close) != 8); 1729 BUILD_BUG_ON(sizeof(struct rnbd_msg_open_rsp) != 56); 1730 rnbd_client_major = register_blkdev(rnbd_client_major, "rnbd"); 1731 if (rnbd_client_major <= 0) { 1732 pr_err("Failed to load module, block device registration failed\n"); 1733 return -EBUSY; 1734 } 1735 1736 err = rnbd_clt_create_sysfs_files(); 1737 if (err) { 1738 pr_err("Failed to load module, creating sysfs device files failed, err: %d\n", 1739 err); 1740 unregister_blkdev(rnbd_client_major, "rnbd"); 1741 } 1742 1743 return err; 1744 } 1745 1746 static void __exit rnbd_client_exit(void) 1747 { 1748 rnbd_destroy_sessions(); 1749 unregister_blkdev(rnbd_client_major, "rnbd"); 1750 ida_destroy(&index_ida); 1751 } 1752 1753 module_init(rnbd_client_init); 1754 module_exit(rnbd_client_exit); 1755