1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* 3 * RDMA Network Block Driver 4 * 5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved. 6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved. 7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved. 8 */ 9 10 #undef pr_fmt 11 #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt 12 13 #include <linux/module.h> 14 #include <linux/blkdev.h> 15 #include <linux/hdreg.h> 16 #include <linux/scatterlist.h> 17 #include <linux/idr.h> 18 19 #include "rnbd-clt.h" 20 21 MODULE_DESCRIPTION("RDMA Network Block Device Client"); 22 MODULE_LICENSE("GPL"); 23 24 static int rnbd_client_major; 25 static DEFINE_IDA(index_ida); 26 static DEFINE_MUTEX(ida_lock); 27 static DEFINE_MUTEX(sess_lock); 28 static LIST_HEAD(sess_list); 29 30 /* 31 * Maximum number of partitions an instance can have. 32 * 6 bits = 64 minors = 63 partitions (one minor is used for the device itself) 33 */ 34 #define RNBD_PART_BITS 6 35 36 static inline bool rnbd_clt_get_sess(struct rnbd_clt_session *sess) 37 { 38 return refcount_inc_not_zero(&sess->refcount); 39 } 40 41 static void free_sess(struct rnbd_clt_session *sess); 42 43 static void rnbd_clt_put_sess(struct rnbd_clt_session *sess) 44 { 45 might_sleep(); 46 47 if (refcount_dec_and_test(&sess->refcount)) 48 free_sess(sess); 49 } 50 51 static void rnbd_clt_put_dev(struct rnbd_clt_dev *dev) 52 { 53 might_sleep(); 54 55 if (!refcount_dec_and_test(&dev->refcount)) 56 return; 57 58 mutex_lock(&ida_lock); 59 ida_simple_remove(&index_ida, dev->clt_device_id); 60 mutex_unlock(&ida_lock); 61 kfree(dev->hw_queues); 62 rnbd_clt_put_sess(dev->sess); 63 mutex_destroy(&dev->lock); 64 kfree(dev); 65 } 66 67 static inline bool rnbd_clt_get_dev(struct rnbd_clt_dev *dev) 68 { 69 return refcount_inc_not_zero(&dev->refcount); 70 } 71 72 static int rnbd_clt_set_dev_attr(struct rnbd_clt_dev *dev, 73 const struct rnbd_msg_open_rsp *rsp) 74 { 75 struct rnbd_clt_session *sess = dev->sess; 76 77 if (!rsp->logical_block_size) 78 return -EINVAL; 79 80 dev->device_id = le32_to_cpu(rsp->device_id); 81 dev->nsectors = le64_to_cpu(rsp->nsectors); 82 dev->logical_block_size = le16_to_cpu(rsp->logical_block_size); 83 dev->physical_block_size = le16_to_cpu(rsp->physical_block_size); 84 dev->max_write_same_sectors = le32_to_cpu(rsp->max_write_same_sectors); 85 dev->max_discard_sectors = le32_to_cpu(rsp->max_discard_sectors); 86 dev->discard_granularity = le32_to_cpu(rsp->discard_granularity); 87 dev->discard_alignment = le32_to_cpu(rsp->discard_alignment); 88 dev->secure_discard = le16_to_cpu(rsp->secure_discard); 89 dev->rotational = rsp->rotational; 90 91 dev->max_hw_sectors = sess->max_io_size / SECTOR_SIZE; 92 dev->max_segments = BMAX_SEGMENTS; 93 94 return 0; 95 } 96 97 static int rnbd_clt_change_capacity(struct rnbd_clt_dev *dev, 98 size_t new_nsectors) 99 { 100 rnbd_clt_info(dev, "Device size changed from %zu to %zu sectors\n", 101 dev->nsectors, new_nsectors); 102 dev->nsectors = new_nsectors; 103 set_capacity(dev->gd, dev->nsectors); 104 revalidate_disk_size(dev->gd, true); 105 return 0; 106 } 107 108 static int process_msg_open_rsp(struct rnbd_clt_dev *dev, 109 struct rnbd_msg_open_rsp *rsp) 110 { 111 int err = 0; 112 113 mutex_lock(&dev->lock); 114 if (dev->dev_state == DEV_STATE_UNMAPPED) { 115 rnbd_clt_info(dev, 116 "Ignoring Open-Response message from server for unmapped device\n"); 117 err = -ENOENT; 118 goto out; 119 } 120 if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) { 121 u64 nsectors = le64_to_cpu(rsp->nsectors); 122 123 /* 124 * If the device was remapped and the size changed in the 125 * meantime we need to revalidate it 126 */ 127 if (dev->nsectors != nsectors) 128 rnbd_clt_change_capacity(dev, nsectors); 129 rnbd_clt_info(dev, "Device online, device remapped successfully\n"); 130 } 131 err = rnbd_clt_set_dev_attr(dev, rsp); 132 if (err) 133 goto out; 134 dev->dev_state = DEV_STATE_MAPPED; 135 136 out: 137 mutex_unlock(&dev->lock); 138 139 return err; 140 } 141 142 int rnbd_clt_resize_disk(struct rnbd_clt_dev *dev, size_t newsize) 143 { 144 int ret = 0; 145 146 mutex_lock(&dev->lock); 147 if (dev->dev_state != DEV_STATE_MAPPED) { 148 pr_err("Failed to set new size of the device, device is not opened\n"); 149 ret = -ENOENT; 150 goto out; 151 } 152 ret = rnbd_clt_change_capacity(dev, newsize); 153 154 out: 155 mutex_unlock(&dev->lock); 156 157 return ret; 158 } 159 160 static inline void rnbd_clt_dev_requeue(struct rnbd_queue *q) 161 { 162 if (WARN_ON(!q->hctx)) 163 return; 164 165 /* We can come here from interrupt, thus async=true */ 166 blk_mq_run_hw_queue(q->hctx, true); 167 } 168 169 enum { 170 RNBD_DELAY_IFBUSY = -1, 171 }; 172 173 /** 174 * rnbd_get_cpu_qlist() - finds a list with HW queues to be rerun 175 * @sess: Session to find a queue for 176 * @cpu: Cpu to start the search from 177 * 178 * Description: 179 * Each CPU has a list of HW queues, which needs to be rerun. If a list 180 * is not empty - it is marked with a bit. This function finds first 181 * set bit in a bitmap and returns corresponding CPU list. 182 */ 183 static struct rnbd_cpu_qlist * 184 rnbd_get_cpu_qlist(struct rnbd_clt_session *sess, int cpu) 185 { 186 int bit; 187 188 /* Search from cpu to nr_cpu_ids */ 189 bit = find_next_bit(sess->cpu_queues_bm, nr_cpu_ids, cpu); 190 if (bit < nr_cpu_ids) { 191 return per_cpu_ptr(sess->cpu_queues, bit); 192 } else if (cpu != 0) { 193 /* Search from 0 to cpu */ 194 bit = find_next_bit(sess->cpu_queues_bm, cpu, 0); 195 if (bit < cpu) 196 return per_cpu_ptr(sess->cpu_queues, bit); 197 } 198 199 return NULL; 200 } 201 202 static inline int nxt_cpu(int cpu) 203 { 204 return (cpu + 1) % nr_cpu_ids; 205 } 206 207 /** 208 * rnbd_rerun_if_needed() - rerun next queue marked as stopped 209 * @sess: Session to rerun a queue on 210 * 211 * Description: 212 * Each CPU has it's own list of HW queues, which should be rerun. 213 * Function finds such list with HW queues, takes a list lock, picks up 214 * the first HW queue out of the list and requeues it. 215 * 216 * Return: 217 * True if the queue was requeued, false otherwise. 218 * 219 * Context: 220 * Does not matter. 221 */ 222 static bool rnbd_rerun_if_needed(struct rnbd_clt_session *sess) 223 { 224 struct rnbd_queue *q = NULL; 225 struct rnbd_cpu_qlist *cpu_q; 226 unsigned long flags; 227 int *cpup; 228 229 /* 230 * To keep fairness and not to let other queues starve we always 231 * try to wake up someone else in round-robin manner. That of course 232 * increases latency but queues always have a chance to be executed. 233 */ 234 cpup = get_cpu_ptr(sess->cpu_rr); 235 for (cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(*cpup)); cpu_q; 236 cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(cpu_q->cpu))) { 237 if (!spin_trylock_irqsave(&cpu_q->requeue_lock, flags)) 238 continue; 239 if (unlikely(!test_bit(cpu_q->cpu, sess->cpu_queues_bm))) 240 goto unlock; 241 q = list_first_entry_or_null(&cpu_q->requeue_list, 242 typeof(*q), requeue_list); 243 if (WARN_ON(!q)) 244 goto clear_bit; 245 list_del_init(&q->requeue_list); 246 clear_bit_unlock(0, &q->in_list); 247 248 if (list_empty(&cpu_q->requeue_list)) { 249 /* Clear bit if nothing is left */ 250 clear_bit: 251 clear_bit(cpu_q->cpu, sess->cpu_queues_bm); 252 } 253 unlock: 254 spin_unlock_irqrestore(&cpu_q->requeue_lock, flags); 255 256 if (q) 257 break; 258 } 259 260 /** 261 * Saves the CPU that is going to be requeued on the per-cpu var. Just 262 * incrementing it doesn't work because rnbd_get_cpu_qlist() will 263 * always return the first CPU with something on the queue list when the 264 * value stored on the var is greater than the last CPU with something 265 * on the list. 266 */ 267 if (cpu_q) 268 *cpup = cpu_q->cpu; 269 put_cpu_var(sess->cpu_rr); 270 271 if (q) 272 rnbd_clt_dev_requeue(q); 273 274 return q; 275 } 276 277 /** 278 * rnbd_rerun_all_if_idle() - rerun all queues left in the list if 279 * session is idling (there are no requests 280 * in-flight). 281 * @sess: Session to rerun the queues on 282 * 283 * Description: 284 * This function tries to rerun all stopped queues if there are no 285 * requests in-flight anymore. This function tries to solve an obvious 286 * problem, when number of tags < than number of queues (hctx), which 287 * are stopped and put to sleep. If last permit, which has been just put, 288 * does not wake up all left queues (hctxs), IO requests hang forever. 289 * 290 * That can happen when all number of permits, say N, have been exhausted 291 * from one CPU, and we have many block devices per session, say M. 292 * Each block device has it's own queue (hctx) for each CPU, so eventually 293 * we can put that number of queues (hctxs) to sleep: M x nr_cpu_ids. 294 * If number of permits N < M x nr_cpu_ids finally we will get an IO hang. 295 * 296 * To avoid this hang last caller of rnbd_put_permit() (last caller is the 297 * one who observes sess->busy == 0) must wake up all remaining queues. 298 * 299 * Context: 300 * Does not matter. 301 */ 302 static void rnbd_rerun_all_if_idle(struct rnbd_clt_session *sess) 303 { 304 bool requeued; 305 306 do { 307 requeued = rnbd_rerun_if_needed(sess); 308 } while (atomic_read(&sess->busy) == 0 && requeued); 309 } 310 311 static struct rtrs_permit *rnbd_get_permit(struct rnbd_clt_session *sess, 312 enum rtrs_clt_con_type con_type, 313 int wait) 314 { 315 struct rtrs_permit *permit; 316 317 permit = rtrs_clt_get_permit(sess->rtrs, con_type, 318 wait ? RTRS_PERMIT_WAIT : 319 RTRS_PERMIT_NOWAIT); 320 if (likely(permit)) 321 /* We have a subtle rare case here, when all permits can be 322 * consumed before busy counter increased. This is safe, 323 * because loser will get NULL as a permit, observe 0 busy 324 * counter and immediately restart the queue himself. 325 */ 326 atomic_inc(&sess->busy); 327 328 return permit; 329 } 330 331 static void rnbd_put_permit(struct rnbd_clt_session *sess, 332 struct rtrs_permit *permit) 333 { 334 rtrs_clt_put_permit(sess->rtrs, permit); 335 atomic_dec(&sess->busy); 336 /* Paired with rnbd_clt_dev_add_to_requeue(). Decrement first 337 * and then check queue bits. 338 */ 339 smp_mb__after_atomic(); 340 rnbd_rerun_all_if_idle(sess); 341 } 342 343 static struct rnbd_iu *rnbd_get_iu(struct rnbd_clt_session *sess, 344 enum rtrs_clt_con_type con_type, 345 int wait) 346 { 347 struct rnbd_iu *iu; 348 struct rtrs_permit *permit; 349 350 permit = rnbd_get_permit(sess, con_type, 351 wait ? RTRS_PERMIT_WAIT : 352 RTRS_PERMIT_NOWAIT); 353 if (unlikely(!permit)) 354 return NULL; 355 iu = rtrs_permit_to_pdu(permit); 356 iu->permit = permit; 357 /* 358 * 1st reference is dropped after finishing sending a "user" message, 359 * 2nd reference is dropped after confirmation with the response is 360 * returned. 361 * 1st and 2nd can happen in any order, so the rnbd_iu should be 362 * released (rtrs_permit returned to ibbtrs) only leased after both 363 * are finished. 364 */ 365 atomic_set(&iu->refcount, 2); 366 init_waitqueue_head(&iu->comp.wait); 367 iu->comp.errno = INT_MAX; 368 369 return iu; 370 } 371 372 static void rnbd_put_iu(struct rnbd_clt_session *sess, struct rnbd_iu *iu) 373 { 374 if (atomic_dec_and_test(&iu->refcount)) 375 rnbd_put_permit(sess, iu->permit); 376 } 377 378 static void rnbd_softirq_done_fn(struct request *rq) 379 { 380 struct rnbd_clt_dev *dev = rq->rq_disk->private_data; 381 struct rnbd_clt_session *sess = dev->sess; 382 struct rnbd_iu *iu; 383 384 iu = blk_mq_rq_to_pdu(rq); 385 rnbd_put_permit(sess, iu->permit); 386 blk_mq_end_request(rq, errno_to_blk_status(iu->errno)); 387 } 388 389 static void msg_io_conf(void *priv, int errno) 390 { 391 struct rnbd_iu *iu = priv; 392 struct rnbd_clt_dev *dev = iu->dev; 393 struct request *rq = iu->rq; 394 int rw = rq_data_dir(rq); 395 396 iu->errno = errno; 397 398 blk_mq_complete_request(rq); 399 400 if (errno) 401 rnbd_clt_info_rl(dev, "%s I/O failed with err: %d\n", 402 rw == READ ? "read" : "write", errno); 403 } 404 405 static void wake_up_iu_comp(struct rnbd_iu *iu, int errno) 406 { 407 iu->comp.errno = errno; 408 wake_up(&iu->comp.wait); 409 } 410 411 static void msg_conf(void *priv, int errno) 412 { 413 struct rnbd_iu *iu = priv; 414 415 iu->errno = errno; 416 schedule_work(&iu->work); 417 } 418 419 enum wait_type { 420 NO_WAIT = 0, 421 WAIT = 1 422 }; 423 424 static int send_usr_msg(struct rtrs_clt *rtrs, int dir, 425 struct rnbd_iu *iu, struct kvec *vec, 426 size_t len, struct scatterlist *sg, unsigned int sg_len, 427 void (*conf)(struct work_struct *work), 428 int *errno, enum wait_type wait) 429 { 430 int err; 431 struct rtrs_clt_req_ops req_ops; 432 433 INIT_WORK(&iu->work, conf); 434 req_ops = (struct rtrs_clt_req_ops) { 435 .priv = iu, 436 .conf_fn = msg_conf, 437 }; 438 err = rtrs_clt_request(dir, &req_ops, rtrs, iu->permit, 439 vec, 1, len, sg, sg_len); 440 if (!err && wait) { 441 wait_event(iu->comp.wait, iu->comp.errno != INT_MAX); 442 *errno = iu->comp.errno; 443 } else { 444 *errno = 0; 445 } 446 447 return err; 448 } 449 450 static void msg_close_conf(struct work_struct *work) 451 { 452 struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work); 453 struct rnbd_clt_dev *dev = iu->dev; 454 455 wake_up_iu_comp(iu, iu->errno); 456 rnbd_put_iu(dev->sess, iu); 457 rnbd_clt_put_dev(dev); 458 } 459 460 static int send_msg_close(struct rnbd_clt_dev *dev, u32 device_id, bool wait) 461 { 462 struct rnbd_clt_session *sess = dev->sess; 463 struct rnbd_msg_close msg; 464 struct rnbd_iu *iu; 465 struct kvec vec = { 466 .iov_base = &msg, 467 .iov_len = sizeof(msg) 468 }; 469 int err, errno; 470 471 iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT); 472 if (!iu) 473 return -ENOMEM; 474 475 iu->buf = NULL; 476 iu->dev = dev; 477 478 sg_mark_end(&iu->sglist[0]); 479 480 msg.hdr.type = cpu_to_le16(RNBD_MSG_CLOSE); 481 msg.device_id = cpu_to_le32(device_id); 482 483 WARN_ON(!rnbd_clt_get_dev(dev)); 484 err = send_usr_msg(sess->rtrs, WRITE, iu, &vec, 0, NULL, 0, 485 msg_close_conf, &errno, wait); 486 if (err) { 487 rnbd_clt_put_dev(dev); 488 rnbd_put_iu(sess, iu); 489 } else { 490 err = errno; 491 } 492 493 rnbd_put_iu(sess, iu); 494 return err; 495 } 496 497 static void msg_open_conf(struct work_struct *work) 498 { 499 struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work); 500 struct rnbd_msg_open_rsp *rsp = iu->buf; 501 struct rnbd_clt_dev *dev = iu->dev; 502 int errno = iu->errno; 503 504 if (errno) { 505 rnbd_clt_err(dev, 506 "Opening failed, server responded: %d\n", 507 errno); 508 } else { 509 errno = process_msg_open_rsp(dev, rsp); 510 if (errno) { 511 u32 device_id = le32_to_cpu(rsp->device_id); 512 /* 513 * If server thinks its fine, but we fail to process 514 * then be nice and send a close to server. 515 */ 516 (void)send_msg_close(dev, device_id, NO_WAIT); 517 } 518 } 519 kfree(rsp); 520 wake_up_iu_comp(iu, errno); 521 rnbd_put_iu(dev->sess, iu); 522 rnbd_clt_put_dev(dev); 523 } 524 525 static void msg_sess_info_conf(struct work_struct *work) 526 { 527 struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work); 528 struct rnbd_msg_sess_info_rsp *rsp = iu->buf; 529 struct rnbd_clt_session *sess = iu->sess; 530 531 if (!iu->errno) 532 sess->ver = min_t(u8, rsp->ver, RNBD_PROTO_VER_MAJOR); 533 534 kfree(rsp); 535 wake_up_iu_comp(iu, iu->errno); 536 rnbd_put_iu(sess, iu); 537 rnbd_clt_put_sess(sess); 538 } 539 540 static int send_msg_open(struct rnbd_clt_dev *dev, bool wait) 541 { 542 struct rnbd_clt_session *sess = dev->sess; 543 struct rnbd_msg_open_rsp *rsp; 544 struct rnbd_msg_open msg; 545 struct rnbd_iu *iu; 546 struct kvec vec = { 547 .iov_base = &msg, 548 .iov_len = sizeof(msg) 549 }; 550 int err, errno; 551 552 rsp = kzalloc(sizeof(*rsp), GFP_KERNEL); 553 if (!rsp) 554 return -ENOMEM; 555 556 iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT); 557 if (!iu) { 558 kfree(rsp); 559 return -ENOMEM; 560 } 561 562 iu->buf = rsp; 563 iu->dev = dev; 564 565 sg_init_one(iu->sglist, rsp, sizeof(*rsp)); 566 567 msg.hdr.type = cpu_to_le16(RNBD_MSG_OPEN); 568 msg.access_mode = dev->access_mode; 569 strlcpy(msg.dev_name, dev->pathname, sizeof(msg.dev_name)); 570 571 WARN_ON(!rnbd_clt_get_dev(dev)); 572 err = send_usr_msg(sess->rtrs, READ, iu, 573 &vec, sizeof(*rsp), iu->sglist, 1, 574 msg_open_conf, &errno, wait); 575 if (err) { 576 rnbd_clt_put_dev(dev); 577 rnbd_put_iu(sess, iu); 578 kfree(rsp); 579 } else { 580 err = errno; 581 } 582 583 rnbd_put_iu(sess, iu); 584 return err; 585 } 586 587 static int send_msg_sess_info(struct rnbd_clt_session *sess, bool wait) 588 { 589 struct rnbd_msg_sess_info_rsp *rsp; 590 struct rnbd_msg_sess_info msg; 591 struct rnbd_iu *iu; 592 struct kvec vec = { 593 .iov_base = &msg, 594 .iov_len = sizeof(msg) 595 }; 596 int err, errno; 597 598 rsp = kzalloc(sizeof(*rsp), GFP_KERNEL); 599 if (!rsp) 600 return -ENOMEM; 601 602 iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT); 603 if (!iu) { 604 kfree(rsp); 605 return -ENOMEM; 606 } 607 608 iu->buf = rsp; 609 iu->sess = sess; 610 611 sg_init_one(iu->sglist, rsp, sizeof(*rsp)); 612 613 msg.hdr.type = cpu_to_le16(RNBD_MSG_SESS_INFO); 614 msg.ver = RNBD_PROTO_VER_MAJOR; 615 616 if (!rnbd_clt_get_sess(sess)) { 617 /* 618 * That can happen only in one case, when RTRS has restablished 619 * the connection and link_ev() is called, but session is almost 620 * dead, last reference on session is put and caller is waiting 621 * for RTRS to close everything. 622 */ 623 err = -ENODEV; 624 goto put_iu; 625 } 626 err = send_usr_msg(sess->rtrs, READ, iu, 627 &vec, sizeof(*rsp), iu->sglist, 1, 628 msg_sess_info_conf, &errno, wait); 629 if (err) { 630 rnbd_clt_put_sess(sess); 631 put_iu: 632 rnbd_put_iu(sess, iu); 633 kfree(rsp); 634 } else { 635 err = errno; 636 } 637 638 rnbd_put_iu(sess, iu); 639 return err; 640 } 641 642 static void set_dev_states_to_disconnected(struct rnbd_clt_session *sess) 643 { 644 struct rnbd_clt_dev *dev; 645 646 mutex_lock(&sess->lock); 647 list_for_each_entry(dev, &sess->devs_list, list) { 648 rnbd_clt_err(dev, "Device disconnected.\n"); 649 650 mutex_lock(&dev->lock); 651 if (dev->dev_state == DEV_STATE_MAPPED) 652 dev->dev_state = DEV_STATE_MAPPED_DISCONNECTED; 653 mutex_unlock(&dev->lock); 654 } 655 mutex_unlock(&sess->lock); 656 } 657 658 static void remap_devs(struct rnbd_clt_session *sess) 659 { 660 struct rnbd_clt_dev *dev; 661 struct rtrs_attrs attrs; 662 int err; 663 664 /* 665 * Careful here: we are called from RTRS link event directly, 666 * thus we can't send any RTRS request and wait for response 667 * or RTRS will not be able to complete request with failure 668 * if something goes wrong (failing of outstanding requests 669 * happens exactly from the context where we are blocking now). 670 * 671 * So to avoid deadlocks each usr message sent from here must 672 * be asynchronous. 673 */ 674 675 err = send_msg_sess_info(sess, NO_WAIT); 676 if (err) { 677 pr_err("send_msg_sess_info(\"%s\"): %d\n", sess->sessname, err); 678 return; 679 } 680 681 rtrs_clt_query(sess->rtrs, &attrs); 682 mutex_lock(&sess->lock); 683 sess->max_io_size = attrs.max_io_size; 684 685 list_for_each_entry(dev, &sess->devs_list, list) { 686 bool skip; 687 688 mutex_lock(&dev->lock); 689 skip = (dev->dev_state == DEV_STATE_INIT); 690 mutex_unlock(&dev->lock); 691 if (skip) 692 /* 693 * When device is establishing connection for the first 694 * time - do not remap, it will be closed soon. 695 */ 696 continue; 697 698 rnbd_clt_info(dev, "session reconnected, remapping device\n"); 699 err = send_msg_open(dev, NO_WAIT); 700 if (err) { 701 rnbd_clt_err(dev, "send_msg_open(): %d\n", err); 702 break; 703 } 704 } 705 mutex_unlock(&sess->lock); 706 } 707 708 static void rnbd_clt_link_ev(void *priv, enum rtrs_clt_link_ev ev) 709 { 710 struct rnbd_clt_session *sess = priv; 711 712 switch (ev) { 713 case RTRS_CLT_LINK_EV_DISCONNECTED: 714 set_dev_states_to_disconnected(sess); 715 break; 716 case RTRS_CLT_LINK_EV_RECONNECTED: 717 remap_devs(sess); 718 break; 719 default: 720 pr_err("Unknown session event received (%d), session: %s\n", 721 ev, sess->sessname); 722 } 723 } 724 725 static void rnbd_init_cpu_qlists(struct rnbd_cpu_qlist __percpu *cpu_queues) 726 { 727 unsigned int cpu; 728 struct rnbd_cpu_qlist *cpu_q; 729 730 for_each_possible_cpu(cpu) { 731 cpu_q = per_cpu_ptr(cpu_queues, cpu); 732 733 cpu_q->cpu = cpu; 734 INIT_LIST_HEAD(&cpu_q->requeue_list); 735 spin_lock_init(&cpu_q->requeue_lock); 736 } 737 } 738 739 static void destroy_mq_tags(struct rnbd_clt_session *sess) 740 { 741 if (sess->tag_set.tags) 742 blk_mq_free_tag_set(&sess->tag_set); 743 } 744 745 static inline void wake_up_rtrs_waiters(struct rnbd_clt_session *sess) 746 { 747 sess->rtrs_ready = true; 748 wake_up_all(&sess->rtrs_waitq); 749 } 750 751 static void close_rtrs(struct rnbd_clt_session *sess) 752 { 753 might_sleep(); 754 755 if (!IS_ERR_OR_NULL(sess->rtrs)) { 756 rtrs_clt_close(sess->rtrs); 757 sess->rtrs = NULL; 758 wake_up_rtrs_waiters(sess); 759 } 760 } 761 762 static void free_sess(struct rnbd_clt_session *sess) 763 { 764 WARN_ON(!list_empty(&sess->devs_list)); 765 766 might_sleep(); 767 768 close_rtrs(sess); 769 destroy_mq_tags(sess); 770 if (!list_empty(&sess->list)) { 771 mutex_lock(&sess_lock); 772 list_del(&sess->list); 773 mutex_unlock(&sess_lock); 774 } 775 free_percpu(sess->cpu_queues); 776 free_percpu(sess->cpu_rr); 777 mutex_destroy(&sess->lock); 778 kfree(sess); 779 } 780 781 static struct rnbd_clt_session *alloc_sess(const char *sessname) 782 { 783 struct rnbd_clt_session *sess; 784 int err, cpu; 785 786 sess = kzalloc_node(sizeof(*sess), GFP_KERNEL, NUMA_NO_NODE); 787 if (!sess) 788 return ERR_PTR(-ENOMEM); 789 strlcpy(sess->sessname, sessname, sizeof(sess->sessname)); 790 atomic_set(&sess->busy, 0); 791 mutex_init(&sess->lock); 792 INIT_LIST_HEAD(&sess->devs_list); 793 INIT_LIST_HEAD(&sess->list); 794 bitmap_zero(sess->cpu_queues_bm, NR_CPUS); 795 init_waitqueue_head(&sess->rtrs_waitq); 796 refcount_set(&sess->refcount, 1); 797 798 sess->cpu_queues = alloc_percpu(struct rnbd_cpu_qlist); 799 if (!sess->cpu_queues) { 800 err = -ENOMEM; 801 goto err; 802 } 803 rnbd_init_cpu_qlists(sess->cpu_queues); 804 805 /* 806 * That is simple percpu variable which stores cpu indeces, which are 807 * incremented on each access. We need that for the sake of fairness 808 * to wake up queues in a round-robin manner. 809 */ 810 sess->cpu_rr = alloc_percpu(int); 811 if (!sess->cpu_rr) { 812 err = -ENOMEM; 813 goto err; 814 } 815 for_each_possible_cpu(cpu) 816 * per_cpu_ptr(sess->cpu_rr, cpu) = cpu; 817 818 return sess; 819 820 err: 821 free_sess(sess); 822 823 return ERR_PTR(err); 824 } 825 826 static int wait_for_rtrs_connection(struct rnbd_clt_session *sess) 827 { 828 wait_event(sess->rtrs_waitq, sess->rtrs_ready); 829 if (IS_ERR_OR_NULL(sess->rtrs)) 830 return -ECONNRESET; 831 832 return 0; 833 } 834 835 static void wait_for_rtrs_disconnection(struct rnbd_clt_session *sess) 836 __releases(&sess_lock) 837 __acquires(&sess_lock) 838 { 839 DEFINE_WAIT(wait); 840 841 prepare_to_wait(&sess->rtrs_waitq, &wait, TASK_UNINTERRUPTIBLE); 842 if (IS_ERR_OR_NULL(sess->rtrs)) { 843 finish_wait(&sess->rtrs_waitq, &wait); 844 return; 845 } 846 mutex_unlock(&sess_lock); 847 /* loop in caller, see __find_and_get_sess(). 848 * You can't leave mutex locked and call schedule(), you will catch a 849 * deadlock with a caller of free_sess(), which has just put the last 850 * reference and is about to take the sess_lock in order to delete 851 * the session from the list. 852 */ 853 schedule(); 854 mutex_lock(&sess_lock); 855 } 856 857 static struct rnbd_clt_session *__find_and_get_sess(const char *sessname) 858 __releases(&sess_lock) 859 __acquires(&sess_lock) 860 { 861 struct rnbd_clt_session *sess, *sn; 862 int err; 863 864 again: 865 list_for_each_entry_safe(sess, sn, &sess_list, list) { 866 if (strcmp(sessname, sess->sessname)) 867 continue; 868 869 if (sess->rtrs_ready && IS_ERR_OR_NULL(sess->rtrs)) 870 /* 871 * No RTRS connection, session is dying. 872 */ 873 continue; 874 875 if (rnbd_clt_get_sess(sess)) { 876 /* 877 * Alive session is found, wait for RTRS connection. 878 */ 879 mutex_unlock(&sess_lock); 880 err = wait_for_rtrs_connection(sess); 881 if (err) 882 rnbd_clt_put_sess(sess); 883 mutex_lock(&sess_lock); 884 885 if (err) 886 /* Session is dying, repeat the loop */ 887 goto again; 888 889 return sess; 890 } 891 /* 892 * Ref is 0, session is dying, wait for RTRS disconnect 893 * in order to avoid session names clashes. 894 */ 895 wait_for_rtrs_disconnection(sess); 896 /* 897 * RTRS is disconnected and soon session will be freed, 898 * so repeat a loop. 899 */ 900 goto again; 901 } 902 903 return NULL; 904 } 905 906 static struct 907 rnbd_clt_session *find_or_create_sess(const char *sessname, bool *first) 908 { 909 struct rnbd_clt_session *sess = NULL; 910 911 mutex_lock(&sess_lock); 912 sess = __find_and_get_sess(sessname); 913 if (!sess) { 914 sess = alloc_sess(sessname); 915 if (IS_ERR(sess)) { 916 mutex_unlock(&sess_lock); 917 return sess; 918 } 919 list_add(&sess->list, &sess_list); 920 *first = true; 921 } else 922 *first = false; 923 mutex_unlock(&sess_lock); 924 925 return sess; 926 } 927 928 static int rnbd_client_open(struct block_device *block_device, fmode_t mode) 929 { 930 struct rnbd_clt_dev *dev = block_device->bd_disk->private_data; 931 932 if (dev->read_only && (mode & FMODE_WRITE)) 933 return -EPERM; 934 935 if (dev->dev_state == DEV_STATE_UNMAPPED || 936 !rnbd_clt_get_dev(dev)) 937 return -EIO; 938 939 return 0; 940 } 941 942 static void rnbd_client_release(struct gendisk *gen, fmode_t mode) 943 { 944 struct rnbd_clt_dev *dev = gen->private_data; 945 946 rnbd_clt_put_dev(dev); 947 } 948 949 static int rnbd_client_getgeo(struct block_device *block_device, 950 struct hd_geometry *geo) 951 { 952 u64 size; 953 struct rnbd_clt_dev *dev; 954 955 dev = block_device->bd_disk->private_data; 956 size = dev->size * (dev->logical_block_size / SECTOR_SIZE); 957 geo->cylinders = size >> 6; /* size/64 */ 958 geo->heads = 4; 959 geo->sectors = 16; 960 geo->start = 0; 961 962 return 0; 963 } 964 965 static const struct block_device_operations rnbd_client_ops = { 966 .owner = THIS_MODULE, 967 .open = rnbd_client_open, 968 .release = rnbd_client_release, 969 .getgeo = rnbd_client_getgeo 970 }; 971 972 /* The amount of data that belongs to an I/O and the amount of data that 973 * should be read or written to the disk (bi_size) can differ. 974 * 975 * E.g. When WRITE_SAME is used, only a small amount of data is 976 * transferred that is then written repeatedly over a lot of sectors. 977 * 978 * Get the size of data to be transferred via RTRS by summing up the size 979 * of the scather-gather list entries. 980 */ 981 static size_t rnbd_clt_get_sg_size(struct scatterlist *sglist, u32 len) 982 { 983 struct scatterlist *sg; 984 size_t tsize = 0; 985 int i; 986 987 for_each_sg(sglist, sg, len, i) 988 tsize += sg->length; 989 return tsize; 990 } 991 992 static int rnbd_client_xfer_request(struct rnbd_clt_dev *dev, 993 struct request *rq, 994 struct rnbd_iu *iu) 995 { 996 struct rtrs_clt *rtrs = dev->sess->rtrs; 997 struct rtrs_permit *permit = iu->permit; 998 struct rnbd_msg_io msg; 999 struct rtrs_clt_req_ops req_ops; 1000 unsigned int sg_cnt = 0; 1001 struct kvec vec; 1002 size_t size; 1003 int err; 1004 1005 iu->rq = rq; 1006 iu->dev = dev; 1007 msg.sector = cpu_to_le64(blk_rq_pos(rq)); 1008 msg.bi_size = cpu_to_le32(blk_rq_bytes(rq)); 1009 msg.rw = cpu_to_le32(rq_to_rnbd_flags(rq)); 1010 msg.prio = cpu_to_le16(req_get_ioprio(rq)); 1011 1012 /* 1013 * We only support discards with single segment for now. 1014 * See queue limits. 1015 */ 1016 if (req_op(rq) != REQ_OP_DISCARD) 1017 sg_cnt = blk_rq_map_sg(dev->queue, rq, iu->sglist); 1018 1019 if (sg_cnt == 0) 1020 /* Do not forget to mark the end */ 1021 sg_mark_end(&iu->sglist[0]); 1022 1023 msg.hdr.type = cpu_to_le16(RNBD_MSG_IO); 1024 msg.device_id = cpu_to_le32(dev->device_id); 1025 1026 vec = (struct kvec) { 1027 .iov_base = &msg, 1028 .iov_len = sizeof(msg) 1029 }; 1030 size = rnbd_clt_get_sg_size(iu->sglist, sg_cnt); 1031 req_ops = (struct rtrs_clt_req_ops) { 1032 .priv = iu, 1033 .conf_fn = msg_io_conf, 1034 }; 1035 err = rtrs_clt_request(rq_data_dir(rq), &req_ops, rtrs, permit, 1036 &vec, 1, size, iu->sglist, sg_cnt); 1037 if (unlikely(err)) { 1038 rnbd_clt_err_rl(dev, "RTRS failed to transfer IO, err: %d\n", 1039 err); 1040 return err; 1041 } 1042 1043 return 0; 1044 } 1045 1046 /** 1047 * rnbd_clt_dev_add_to_requeue() - add device to requeue if session is busy 1048 * @dev: Device to be checked 1049 * @q: Queue to be added to the requeue list if required 1050 * 1051 * Description: 1052 * If session is busy, that means someone will requeue us when resources 1053 * are freed. If session is not doing anything - device is not added to 1054 * the list and @false is returned. 1055 */ 1056 static bool rnbd_clt_dev_add_to_requeue(struct rnbd_clt_dev *dev, 1057 struct rnbd_queue *q) 1058 { 1059 struct rnbd_clt_session *sess = dev->sess; 1060 struct rnbd_cpu_qlist *cpu_q; 1061 unsigned long flags; 1062 bool added = true; 1063 bool need_set; 1064 1065 cpu_q = get_cpu_ptr(sess->cpu_queues); 1066 spin_lock_irqsave(&cpu_q->requeue_lock, flags); 1067 1068 if (likely(!test_and_set_bit_lock(0, &q->in_list))) { 1069 if (WARN_ON(!list_empty(&q->requeue_list))) 1070 goto unlock; 1071 1072 need_set = !test_bit(cpu_q->cpu, sess->cpu_queues_bm); 1073 if (need_set) { 1074 set_bit(cpu_q->cpu, sess->cpu_queues_bm); 1075 /* Paired with rnbd_put_permit(). Set a bit first 1076 * and then observe the busy counter. 1077 */ 1078 smp_mb__before_atomic(); 1079 } 1080 if (likely(atomic_read(&sess->busy))) { 1081 list_add_tail(&q->requeue_list, &cpu_q->requeue_list); 1082 } else { 1083 /* Very unlikely, but possible: busy counter was 1084 * observed as zero. Drop all bits and return 1085 * false to restart the queue by ourselves. 1086 */ 1087 if (need_set) 1088 clear_bit(cpu_q->cpu, sess->cpu_queues_bm); 1089 clear_bit_unlock(0, &q->in_list); 1090 added = false; 1091 } 1092 } 1093 unlock: 1094 spin_unlock_irqrestore(&cpu_q->requeue_lock, flags); 1095 put_cpu_ptr(sess->cpu_queues); 1096 1097 return added; 1098 } 1099 1100 static void rnbd_clt_dev_kick_mq_queue(struct rnbd_clt_dev *dev, 1101 struct blk_mq_hw_ctx *hctx, 1102 int delay) 1103 { 1104 struct rnbd_queue *q = hctx->driver_data; 1105 1106 if (delay != RNBD_DELAY_IFBUSY) 1107 blk_mq_delay_run_hw_queue(hctx, delay); 1108 else if (unlikely(!rnbd_clt_dev_add_to_requeue(dev, q))) 1109 /* 1110 * If session is not busy we have to restart 1111 * the queue ourselves. 1112 */ 1113 blk_mq_delay_run_hw_queue(hctx, 10/*ms*/); 1114 } 1115 1116 static blk_status_t rnbd_queue_rq(struct blk_mq_hw_ctx *hctx, 1117 const struct blk_mq_queue_data *bd) 1118 { 1119 struct request *rq = bd->rq; 1120 struct rnbd_clt_dev *dev = rq->rq_disk->private_data; 1121 struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq); 1122 int err; 1123 1124 if (unlikely(dev->dev_state != DEV_STATE_MAPPED)) 1125 return BLK_STS_IOERR; 1126 1127 iu->permit = rnbd_get_permit(dev->sess, RTRS_IO_CON, 1128 RTRS_PERMIT_NOWAIT); 1129 if (unlikely(!iu->permit)) { 1130 rnbd_clt_dev_kick_mq_queue(dev, hctx, RNBD_DELAY_IFBUSY); 1131 return BLK_STS_RESOURCE; 1132 } 1133 1134 blk_mq_start_request(rq); 1135 err = rnbd_client_xfer_request(dev, rq, iu); 1136 if (likely(err == 0)) 1137 return BLK_STS_OK; 1138 if (unlikely(err == -EAGAIN || err == -ENOMEM)) { 1139 rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/); 1140 rnbd_put_permit(dev->sess, iu->permit); 1141 return BLK_STS_RESOURCE; 1142 } 1143 1144 rnbd_put_permit(dev->sess, iu->permit); 1145 return BLK_STS_IOERR; 1146 } 1147 1148 static int rnbd_init_request(struct blk_mq_tag_set *set, struct request *rq, 1149 unsigned int hctx_idx, unsigned int numa_node) 1150 { 1151 struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq); 1152 1153 sg_init_table(iu->sglist, BMAX_SEGMENTS); 1154 return 0; 1155 } 1156 1157 static struct blk_mq_ops rnbd_mq_ops = { 1158 .queue_rq = rnbd_queue_rq, 1159 .init_request = rnbd_init_request, 1160 .complete = rnbd_softirq_done_fn, 1161 }; 1162 1163 static int setup_mq_tags(struct rnbd_clt_session *sess) 1164 { 1165 struct blk_mq_tag_set *tag_set = &sess->tag_set; 1166 1167 memset(tag_set, 0, sizeof(*tag_set)); 1168 tag_set->ops = &rnbd_mq_ops; 1169 tag_set->queue_depth = sess->queue_depth; 1170 tag_set->numa_node = NUMA_NO_NODE; 1171 tag_set->flags = BLK_MQ_F_SHOULD_MERGE | 1172 BLK_MQ_F_TAG_QUEUE_SHARED; 1173 tag_set->cmd_size = sizeof(struct rnbd_iu); 1174 tag_set->nr_hw_queues = num_online_cpus(); 1175 1176 return blk_mq_alloc_tag_set(tag_set); 1177 } 1178 1179 static struct rnbd_clt_session * 1180 find_and_get_or_create_sess(const char *sessname, 1181 const struct rtrs_addr *paths, 1182 size_t path_cnt, u16 port_nr) 1183 { 1184 struct rnbd_clt_session *sess; 1185 struct rtrs_attrs attrs; 1186 int err; 1187 bool first; 1188 struct rtrs_clt_ops rtrs_ops; 1189 1190 sess = find_or_create_sess(sessname, &first); 1191 if (sess == ERR_PTR(-ENOMEM)) 1192 return ERR_PTR(-ENOMEM); 1193 else if (!first) 1194 return sess; 1195 1196 rtrs_ops = (struct rtrs_clt_ops) { 1197 .priv = sess, 1198 .link_ev = rnbd_clt_link_ev, 1199 }; 1200 /* 1201 * Nothing was found, establish rtrs connection and proceed further. 1202 */ 1203 sess->rtrs = rtrs_clt_open(&rtrs_ops, sessname, 1204 paths, path_cnt, port_nr, 1205 sizeof(struct rnbd_iu), 1206 RECONNECT_DELAY, BMAX_SEGMENTS, 1207 BLK_MAX_SEGMENT_SIZE, 1208 MAX_RECONNECTS); 1209 if (IS_ERR(sess->rtrs)) { 1210 err = PTR_ERR(sess->rtrs); 1211 goto wake_up_and_put; 1212 } 1213 rtrs_clt_query(sess->rtrs, &attrs); 1214 sess->max_io_size = attrs.max_io_size; 1215 sess->queue_depth = attrs.queue_depth; 1216 1217 err = setup_mq_tags(sess); 1218 if (err) 1219 goto close_rtrs; 1220 1221 err = send_msg_sess_info(sess, WAIT); 1222 if (err) 1223 goto close_rtrs; 1224 1225 wake_up_rtrs_waiters(sess); 1226 1227 return sess; 1228 1229 close_rtrs: 1230 close_rtrs(sess); 1231 put_sess: 1232 rnbd_clt_put_sess(sess); 1233 1234 return ERR_PTR(err); 1235 1236 wake_up_and_put: 1237 wake_up_rtrs_waiters(sess); 1238 goto put_sess; 1239 } 1240 1241 static inline void rnbd_init_hw_queue(struct rnbd_clt_dev *dev, 1242 struct rnbd_queue *q, 1243 struct blk_mq_hw_ctx *hctx) 1244 { 1245 INIT_LIST_HEAD(&q->requeue_list); 1246 q->dev = dev; 1247 q->hctx = hctx; 1248 } 1249 1250 static void rnbd_init_mq_hw_queues(struct rnbd_clt_dev *dev) 1251 { 1252 int i; 1253 struct blk_mq_hw_ctx *hctx; 1254 struct rnbd_queue *q; 1255 1256 queue_for_each_hw_ctx(dev->queue, hctx, i) { 1257 q = &dev->hw_queues[i]; 1258 rnbd_init_hw_queue(dev, q, hctx); 1259 hctx->driver_data = q; 1260 } 1261 } 1262 1263 static int setup_mq_dev(struct rnbd_clt_dev *dev) 1264 { 1265 dev->queue = blk_mq_init_queue(&dev->sess->tag_set); 1266 if (IS_ERR(dev->queue)) { 1267 rnbd_clt_err(dev, "Initializing multiqueue queue failed, err: %ld\n", 1268 PTR_ERR(dev->queue)); 1269 return PTR_ERR(dev->queue); 1270 } 1271 rnbd_init_mq_hw_queues(dev); 1272 return 0; 1273 } 1274 1275 static void setup_request_queue(struct rnbd_clt_dev *dev) 1276 { 1277 blk_queue_logical_block_size(dev->queue, dev->logical_block_size); 1278 blk_queue_physical_block_size(dev->queue, dev->physical_block_size); 1279 blk_queue_max_hw_sectors(dev->queue, dev->max_hw_sectors); 1280 blk_queue_max_write_same_sectors(dev->queue, 1281 dev->max_write_same_sectors); 1282 1283 /* 1284 * we don't support discards to "discontiguous" segments 1285 * in on request 1286 */ 1287 blk_queue_max_discard_segments(dev->queue, 1); 1288 1289 blk_queue_max_discard_sectors(dev->queue, dev->max_discard_sectors); 1290 dev->queue->limits.discard_granularity = dev->discard_granularity; 1291 dev->queue->limits.discard_alignment = dev->discard_alignment; 1292 if (dev->max_discard_sectors) 1293 blk_queue_flag_set(QUEUE_FLAG_DISCARD, dev->queue); 1294 if (dev->secure_discard) 1295 blk_queue_flag_set(QUEUE_FLAG_SECERASE, dev->queue); 1296 1297 blk_queue_flag_set(QUEUE_FLAG_SAME_COMP, dev->queue); 1298 blk_queue_flag_set(QUEUE_FLAG_SAME_FORCE, dev->queue); 1299 blk_queue_max_segments(dev->queue, dev->max_segments); 1300 blk_queue_io_opt(dev->queue, dev->sess->max_io_size); 1301 blk_queue_virt_boundary(dev->queue, SZ_4K - 1); 1302 blk_queue_write_cache(dev->queue, true, true); 1303 dev->queue->queuedata = dev; 1304 } 1305 1306 static void rnbd_clt_setup_gen_disk(struct rnbd_clt_dev *dev, int idx) 1307 { 1308 dev->gd->major = rnbd_client_major; 1309 dev->gd->first_minor = idx << RNBD_PART_BITS; 1310 dev->gd->fops = &rnbd_client_ops; 1311 dev->gd->queue = dev->queue; 1312 dev->gd->private_data = dev; 1313 snprintf(dev->gd->disk_name, sizeof(dev->gd->disk_name), "rnbd%d", 1314 idx); 1315 pr_debug("disk_name=%s, capacity=%zu\n", 1316 dev->gd->disk_name, 1317 dev->nsectors * (dev->logical_block_size / SECTOR_SIZE) 1318 ); 1319 1320 set_capacity(dev->gd, dev->nsectors); 1321 1322 if (dev->access_mode == RNBD_ACCESS_RO) { 1323 dev->read_only = true; 1324 set_disk_ro(dev->gd, true); 1325 } else { 1326 dev->read_only = false; 1327 } 1328 1329 if (!dev->rotational) 1330 blk_queue_flag_set(QUEUE_FLAG_NONROT, dev->queue); 1331 } 1332 1333 static int rnbd_client_setup_device(struct rnbd_clt_session *sess, 1334 struct rnbd_clt_dev *dev, int idx) 1335 { 1336 int err; 1337 1338 dev->size = dev->nsectors * dev->logical_block_size; 1339 1340 err = setup_mq_dev(dev); 1341 if (err) 1342 return err; 1343 1344 setup_request_queue(dev); 1345 1346 dev->gd = alloc_disk_node(1 << RNBD_PART_BITS, NUMA_NO_NODE); 1347 if (!dev->gd) { 1348 blk_cleanup_queue(dev->queue); 1349 return -ENOMEM; 1350 } 1351 1352 rnbd_clt_setup_gen_disk(dev, idx); 1353 1354 return 0; 1355 } 1356 1357 static struct rnbd_clt_dev *init_dev(struct rnbd_clt_session *sess, 1358 enum rnbd_access_mode access_mode, 1359 const char *pathname) 1360 { 1361 struct rnbd_clt_dev *dev; 1362 int ret; 1363 1364 dev = kzalloc_node(sizeof(*dev), GFP_KERNEL, NUMA_NO_NODE); 1365 if (!dev) 1366 return ERR_PTR(-ENOMEM); 1367 1368 dev->hw_queues = kcalloc(nr_cpu_ids, sizeof(*dev->hw_queues), 1369 GFP_KERNEL); 1370 if (!dev->hw_queues) { 1371 ret = -ENOMEM; 1372 goto out_alloc; 1373 } 1374 1375 mutex_lock(&ida_lock); 1376 ret = ida_simple_get(&index_ida, 0, 1 << (MINORBITS - RNBD_PART_BITS), 1377 GFP_KERNEL); 1378 mutex_unlock(&ida_lock); 1379 if (ret < 0) { 1380 pr_err("Failed to initialize device '%s' from session %s, allocating idr failed, err: %d\n", 1381 pathname, sess->sessname, ret); 1382 goto out_queues; 1383 } 1384 dev->clt_device_id = ret; 1385 dev->sess = sess; 1386 dev->access_mode = access_mode; 1387 strlcpy(dev->pathname, pathname, sizeof(dev->pathname)); 1388 mutex_init(&dev->lock); 1389 refcount_set(&dev->refcount, 1); 1390 dev->dev_state = DEV_STATE_INIT; 1391 1392 /* 1393 * Here we called from sysfs entry, thus clt-sysfs is 1394 * responsible that session will not disappear. 1395 */ 1396 WARN_ON(!rnbd_clt_get_sess(sess)); 1397 1398 return dev; 1399 1400 out_queues: 1401 kfree(dev->hw_queues); 1402 out_alloc: 1403 kfree(dev); 1404 return ERR_PTR(ret); 1405 } 1406 1407 static bool __exists_dev(const char *pathname) 1408 { 1409 struct rnbd_clt_session *sess; 1410 struct rnbd_clt_dev *dev; 1411 bool found = false; 1412 1413 list_for_each_entry(sess, &sess_list, list) { 1414 mutex_lock(&sess->lock); 1415 list_for_each_entry(dev, &sess->devs_list, list) { 1416 if (!strncmp(dev->pathname, pathname, 1417 sizeof(dev->pathname))) { 1418 found = true; 1419 break; 1420 } 1421 } 1422 mutex_unlock(&sess->lock); 1423 if (found) 1424 break; 1425 } 1426 1427 return found; 1428 } 1429 1430 static bool exists_devpath(const char *pathname) 1431 { 1432 bool found; 1433 1434 mutex_lock(&sess_lock); 1435 found = __exists_dev(pathname); 1436 mutex_unlock(&sess_lock); 1437 1438 return found; 1439 } 1440 1441 static bool insert_dev_if_not_exists_devpath(const char *pathname, 1442 struct rnbd_clt_session *sess, 1443 struct rnbd_clt_dev *dev) 1444 { 1445 bool found; 1446 1447 mutex_lock(&sess_lock); 1448 found = __exists_dev(pathname); 1449 if (!found) { 1450 mutex_lock(&sess->lock); 1451 list_add_tail(&dev->list, &sess->devs_list); 1452 mutex_unlock(&sess->lock); 1453 } 1454 mutex_unlock(&sess_lock); 1455 1456 return found; 1457 } 1458 1459 static void delete_dev(struct rnbd_clt_dev *dev) 1460 { 1461 struct rnbd_clt_session *sess = dev->sess; 1462 1463 mutex_lock(&sess->lock); 1464 list_del(&dev->list); 1465 mutex_unlock(&sess->lock); 1466 } 1467 1468 struct rnbd_clt_dev *rnbd_clt_map_device(const char *sessname, 1469 struct rtrs_addr *paths, 1470 size_t path_cnt, u16 port_nr, 1471 const char *pathname, 1472 enum rnbd_access_mode access_mode) 1473 { 1474 struct rnbd_clt_session *sess; 1475 struct rnbd_clt_dev *dev; 1476 int ret; 1477 1478 if (exists_devpath(pathname)) 1479 return ERR_PTR(-EEXIST); 1480 1481 sess = find_and_get_or_create_sess(sessname, paths, path_cnt, port_nr); 1482 if (IS_ERR(sess)) 1483 return ERR_CAST(sess); 1484 1485 dev = init_dev(sess, access_mode, pathname); 1486 if (IS_ERR(dev)) { 1487 pr_err("map_device: failed to map device '%s' from session %s, can't initialize device, err: %ld\n", 1488 pathname, sess->sessname, PTR_ERR(dev)); 1489 ret = PTR_ERR(dev); 1490 goto put_sess; 1491 } 1492 if (insert_dev_if_not_exists_devpath(pathname, sess, dev)) { 1493 ret = -EEXIST; 1494 goto put_dev; 1495 } 1496 ret = send_msg_open(dev, WAIT); 1497 if (ret) { 1498 rnbd_clt_err(dev, 1499 "map_device: failed, can't open remote device, err: %d\n", 1500 ret); 1501 goto del_dev; 1502 } 1503 mutex_lock(&dev->lock); 1504 pr_debug("Opened remote device: session=%s, path='%s'\n", 1505 sess->sessname, pathname); 1506 ret = rnbd_client_setup_device(sess, dev, dev->clt_device_id); 1507 if (ret) { 1508 rnbd_clt_err(dev, 1509 "map_device: Failed to configure device, err: %d\n", 1510 ret); 1511 mutex_unlock(&dev->lock); 1512 goto send_close; 1513 } 1514 1515 rnbd_clt_info(dev, 1516 "map_device: Device mapped as %s (nsectors: %zu, logical_block_size: %d, physical_block_size: %d, max_write_same_sectors: %d, max_discard_sectors: %d, discard_granularity: %d, discard_alignment: %d, secure_discard: %d, max_segments: %d, max_hw_sectors: %d, rotational: %d)\n", 1517 dev->gd->disk_name, dev->nsectors, 1518 dev->logical_block_size, dev->physical_block_size, 1519 dev->max_write_same_sectors, dev->max_discard_sectors, 1520 dev->discard_granularity, dev->discard_alignment, 1521 dev->secure_discard, dev->max_segments, 1522 dev->max_hw_sectors, dev->rotational); 1523 1524 mutex_unlock(&dev->lock); 1525 1526 add_disk(dev->gd); 1527 rnbd_clt_put_sess(sess); 1528 1529 return dev; 1530 1531 send_close: 1532 send_msg_close(dev, dev->device_id, WAIT); 1533 del_dev: 1534 delete_dev(dev); 1535 put_dev: 1536 rnbd_clt_put_dev(dev); 1537 put_sess: 1538 rnbd_clt_put_sess(sess); 1539 1540 return ERR_PTR(ret); 1541 } 1542 1543 static void destroy_gen_disk(struct rnbd_clt_dev *dev) 1544 { 1545 del_gendisk(dev->gd); 1546 blk_cleanup_queue(dev->queue); 1547 put_disk(dev->gd); 1548 } 1549 1550 static void destroy_sysfs(struct rnbd_clt_dev *dev, 1551 const struct attribute *sysfs_self) 1552 { 1553 rnbd_clt_remove_dev_symlink(dev); 1554 if (dev->kobj.state_initialized) { 1555 if (sysfs_self) 1556 /* To avoid deadlock firstly remove itself */ 1557 sysfs_remove_file_self(&dev->kobj, sysfs_self); 1558 kobject_del(&dev->kobj); 1559 kobject_put(&dev->kobj); 1560 } 1561 } 1562 1563 int rnbd_clt_unmap_device(struct rnbd_clt_dev *dev, bool force, 1564 const struct attribute *sysfs_self) 1565 { 1566 struct rnbd_clt_session *sess = dev->sess; 1567 int refcount, ret = 0; 1568 bool was_mapped; 1569 1570 mutex_lock(&dev->lock); 1571 if (dev->dev_state == DEV_STATE_UNMAPPED) { 1572 rnbd_clt_info(dev, "Device is already being unmapped\n"); 1573 ret = -EALREADY; 1574 goto err; 1575 } 1576 refcount = refcount_read(&dev->refcount); 1577 if (!force && refcount > 1) { 1578 rnbd_clt_err(dev, 1579 "Closing device failed, device is in use, (%d device users)\n", 1580 refcount - 1); 1581 ret = -EBUSY; 1582 goto err; 1583 } 1584 was_mapped = (dev->dev_state == DEV_STATE_MAPPED); 1585 dev->dev_state = DEV_STATE_UNMAPPED; 1586 mutex_unlock(&dev->lock); 1587 1588 delete_dev(dev); 1589 destroy_sysfs(dev, sysfs_self); 1590 destroy_gen_disk(dev); 1591 if (was_mapped && sess->rtrs) 1592 send_msg_close(dev, dev->device_id, WAIT); 1593 1594 rnbd_clt_info(dev, "Device is unmapped\n"); 1595 1596 /* Likely last reference put */ 1597 rnbd_clt_put_dev(dev); 1598 1599 /* 1600 * Here device and session can be vanished! 1601 */ 1602 1603 return 0; 1604 err: 1605 mutex_unlock(&dev->lock); 1606 1607 return ret; 1608 } 1609 1610 int rnbd_clt_remap_device(struct rnbd_clt_dev *dev) 1611 { 1612 int err; 1613 1614 mutex_lock(&dev->lock); 1615 if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) 1616 err = 0; 1617 else if (dev->dev_state == DEV_STATE_UNMAPPED) 1618 err = -ENODEV; 1619 else if (dev->dev_state == DEV_STATE_MAPPED) 1620 err = -EALREADY; 1621 else 1622 err = -EBUSY; 1623 mutex_unlock(&dev->lock); 1624 if (!err) { 1625 rnbd_clt_info(dev, "Remapping device.\n"); 1626 err = send_msg_open(dev, WAIT); 1627 if (err) 1628 rnbd_clt_err(dev, "remap_device: %d\n", err); 1629 } 1630 1631 return err; 1632 } 1633 1634 static void unmap_device_work(struct work_struct *work) 1635 { 1636 struct rnbd_clt_dev *dev; 1637 1638 dev = container_of(work, typeof(*dev), unmap_on_rmmod_work); 1639 rnbd_clt_unmap_device(dev, true, NULL); 1640 } 1641 1642 static void rnbd_destroy_sessions(void) 1643 { 1644 struct rnbd_clt_session *sess, *sn; 1645 struct rnbd_clt_dev *dev, *tn; 1646 1647 /* Firstly forbid access through sysfs interface */ 1648 rnbd_clt_destroy_default_group(); 1649 rnbd_clt_destroy_sysfs_files(); 1650 1651 /* 1652 * Here at this point there is no any concurrent access to sessions 1653 * list and devices list: 1654 * 1. New session or device can'be be created - session sysfs files 1655 * are removed. 1656 * 2. Device or session can't be removed - module reference is taken 1657 * into account in unmap device sysfs callback. 1658 * 3. No IO requests inflight - each file open of block_dev increases 1659 * module reference in get_disk(). 1660 * 1661 * But still there can be user requests inflights, which are sent by 1662 * asynchronous send_msg_*() functions, thus before unmapping devices 1663 * RTRS session must be explicitly closed. 1664 */ 1665 1666 list_for_each_entry_safe(sess, sn, &sess_list, list) { 1667 WARN_ON(!rnbd_clt_get_sess(sess)); 1668 close_rtrs(sess); 1669 list_for_each_entry_safe(dev, tn, &sess->devs_list, list) { 1670 /* 1671 * Here unmap happens in parallel for only one reason: 1672 * blk_cleanup_queue() takes around half a second, so 1673 * on huge amount of devices the whole module unload 1674 * procedure takes minutes. 1675 */ 1676 INIT_WORK(&dev->unmap_on_rmmod_work, unmap_device_work); 1677 queue_work(system_long_wq, &dev->unmap_on_rmmod_work); 1678 } 1679 rnbd_clt_put_sess(sess); 1680 } 1681 /* Wait for all scheduled unmap works */ 1682 flush_workqueue(system_long_wq); 1683 WARN_ON(!list_empty(&sess_list)); 1684 } 1685 1686 static int __init rnbd_client_init(void) 1687 { 1688 int err = 0; 1689 1690 BUILD_BUG_ON(sizeof(struct rnbd_msg_hdr) != 4); 1691 BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info) != 36); 1692 BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info_rsp) != 36); 1693 BUILD_BUG_ON(sizeof(struct rnbd_msg_open) != 264); 1694 BUILD_BUG_ON(sizeof(struct rnbd_msg_close) != 8); 1695 BUILD_BUG_ON(sizeof(struct rnbd_msg_open_rsp) != 56); 1696 rnbd_client_major = register_blkdev(rnbd_client_major, "rnbd"); 1697 if (rnbd_client_major <= 0) { 1698 pr_err("Failed to load module, block device registration failed\n"); 1699 return -EBUSY; 1700 } 1701 1702 err = rnbd_clt_create_sysfs_files(); 1703 if (err) { 1704 pr_err("Failed to load module, creating sysfs device files failed, err: %d\n", 1705 err); 1706 unregister_blkdev(rnbd_client_major, "rnbd"); 1707 } 1708 1709 return err; 1710 } 1711 1712 static void __exit rnbd_client_exit(void) 1713 { 1714 rnbd_destroy_sessions(); 1715 unregister_blkdev(rnbd_client_major, "rnbd"); 1716 ida_destroy(&index_ida); 1717 } 1718 1719 module_init(rnbd_client_init); 1720 module_exit(rnbd_client_exit); 1721