// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * RDMA Network Block Driver
 *
 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
 */

#undef pr_fmt
#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt

#include <linux/module.h>
#include <linux/blkdev.h>
#include <linux/hdreg.h>
#include <linux/scatterlist.h>
#include <linux/idr.h>

#include "rnbd-clt.h"

MODULE_DESCRIPTION("RDMA Network Block Device Client");
MODULE_LICENSE("GPL");

static int rnbd_client_major;
static DEFINE_IDA(index_ida);
static DEFINE_MUTEX(ida_lock);
static DEFINE_MUTEX(sess_lock);
static LIST_HEAD(sess_list);

/*
 * Maximum number of partitions an instance can have.
 * 6 bits = 64 minors = 63 partitions (one minor is used for the device itself)
 */
#define RNBD_PART_BITS		6

static inline bool rnbd_clt_get_sess(struct rnbd_clt_session *sess)
{
	return refcount_inc_not_zero(&sess->refcount);
}

static void free_sess(struct rnbd_clt_session *sess);

static void rnbd_clt_put_sess(struct rnbd_clt_session *sess)
{
	might_sleep();

	if (refcount_dec_and_test(&sess->refcount))
		free_sess(sess);
}

static void rnbd_clt_put_dev(struct rnbd_clt_dev *dev)
{
	might_sleep();

	if (!refcount_dec_and_test(&dev->refcount))
		return;

	mutex_lock(&ida_lock);
	ida_simple_remove(&index_ida, dev->clt_device_id);
	mutex_unlock(&ida_lock);
	kfree(dev->hw_queues);
	rnbd_clt_put_sess(dev->sess);
	mutex_destroy(&dev->lock);
	kfree(dev);
}

static inline bool rnbd_clt_get_dev(struct rnbd_clt_dev *dev)
{
	return refcount_inc_not_zero(&dev->refcount);
}

static int rnbd_clt_set_dev_attr(struct rnbd_clt_dev *dev,
				 const struct rnbd_msg_open_rsp *rsp)
{
	struct rnbd_clt_session *sess = dev->sess;

	if (!rsp->logical_block_size)
		return -EINVAL;

	dev->device_id = le32_to_cpu(rsp->device_id);
	dev->nsectors = le64_to_cpu(rsp->nsectors);
	dev->logical_block_size = le16_to_cpu(rsp->logical_block_size);
	dev->physical_block_size = le16_to_cpu(rsp->physical_block_size);
	dev->max_write_same_sectors = le32_to_cpu(rsp->max_write_same_sectors);
	dev->max_discard_sectors = le32_to_cpu(rsp->max_discard_sectors);
	dev->discard_granularity = le32_to_cpu(rsp->discard_granularity);
	dev->discard_alignment = le32_to_cpu(rsp->discard_alignment);
	dev->secure_discard = le16_to_cpu(rsp->secure_discard);
	dev->rotational = rsp->rotational;

	dev->max_hw_sectors = sess->max_io_size / SECTOR_SIZE;
	dev->max_segments = BMAX_SEGMENTS;

	dev->max_hw_sectors = min_t(u32, dev->max_hw_sectors,
				    le32_to_cpu(rsp->max_hw_sectors));
	dev->max_segments = min_t(u16, dev->max_segments,
				  le16_to_cpu(rsp->max_segments));

	return 0;
}

static int rnbd_clt_change_capacity(struct rnbd_clt_dev *dev,
				    size_t new_nsectors)
{
	rnbd_clt_info(dev, "Device size changed from %zu to %zu sectors\n",
		      dev->nsectors, new_nsectors);
	dev->nsectors = new_nsectors;
	set_capacity(dev->gd, dev->nsectors);
	revalidate_disk_size(dev->gd, true);
	return 0;
}

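/*
 * Applies a server open-response to the client device.  Called from the
 * RNBD_MSG_OPEN response path (see msg_open_conf()); takes dev->lock itself.
 * If the device was remapped after a reconnect and its size changed in the
 * meantime, the capacity is updated before the attributes are (re)applied
 * and the device is moved to DEV_STATE_MAPPED.
 */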
static int process_msg_open_rsp(struct rnbd_clt_dev *dev,
				struct rnbd_msg_open_rsp *rsp)
{
	int err = 0;

	mutex_lock(&dev->lock);
	if (dev->dev_state == DEV_STATE_UNMAPPED) {
		rnbd_clt_info(dev,
			      "Ignoring Open-Response message from server for unmapped device\n");
		err = -ENOENT;
		goto out;
	}
	if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) {
		u64 nsectors = le64_to_cpu(rsp->nsectors);

		/*
		 * If the device was remapped and the size changed in the
		 * meantime we need to revalidate it
		 */
		if (dev->nsectors != nsectors)
			rnbd_clt_change_capacity(dev, nsectors);
		rnbd_clt_info(dev, "Device online, device remapped successfully\n");
	}
	err = rnbd_clt_set_dev_attr(dev, rsp);
	if (err)
		goto out;
	dev->dev_state = DEV_STATE_MAPPED;

out:
	mutex_unlock(&dev->lock);

	return err;
}

int rnbd_clt_resize_disk(struct rnbd_clt_dev *dev, size_t newsize)
{
	int ret = 0;

	mutex_lock(&dev->lock);
	if (dev->dev_state != DEV_STATE_MAPPED) {
		pr_err("Failed to set new size of the device, device is not opened\n");
		ret = -ENOENT;
		goto out;
	}
	ret = rnbd_clt_change_capacity(dev, newsize);

out:
	mutex_unlock(&dev->lock);

	return ret;
}

static inline void rnbd_clt_dev_requeue(struct rnbd_queue *q)
{
	if (WARN_ON(!q->hctx))
		return;

	/* We can come here from interrupt, thus async=true */
	blk_mq_run_hw_queue(q->hctx, true);
}

enum {
	RNBD_DELAY_IFBUSY = -1,
};

/**
 * rnbd_get_cpu_qlist() - finds a list with HW queues to be rerun
 * @sess: Session to find a queue for
 * @cpu: Cpu to start the search from
 *
 * Description:
 * Each CPU has a list of HW queues, which need to be rerun.  If a list
 * is not empty - it is marked with a bit.  This function finds the first
 * set bit in the bitmap and returns the corresponding CPU list.
 */
static struct rnbd_cpu_qlist *
rnbd_get_cpu_qlist(struct rnbd_clt_session *sess, int cpu)
{
	int bit;

	/* Search from cpu to nr_cpu_ids */
	bit = find_next_bit(sess->cpu_queues_bm, nr_cpu_ids, cpu);
	if (bit < nr_cpu_ids) {
		return per_cpu_ptr(sess->cpu_queues, bit);
	} else if (cpu != 0) {
		/* Search from 0 to cpu */
		bit = find_next_bit(sess->cpu_queues_bm, cpu, 0);
		if (bit < cpu)
			return per_cpu_ptr(sess->cpu_queues, bit);
	}

	return NULL;
}

static inline int nxt_cpu(int cpu)
{
	return (cpu + 1) % nr_cpu_ids;
}

/**
 * rnbd_rerun_if_needed() - rerun next queue marked as stopped
 * @sess: Session to rerun a queue on
 *
 * Description:
 * Each CPU has its own list of HW queues, which should be rerun.
 * Function finds such list with HW queues, takes a list lock, picks up
 * the first HW queue out of the list and requeues it.
 *
 * Return:
 * True if the queue was requeued, false otherwise.
 *
 * Context:
 * Does not matter.
 */
static bool rnbd_rerun_if_needed(struct rnbd_clt_session *sess)
{
	struct rnbd_queue *q = NULL;
	struct rnbd_cpu_qlist *cpu_q;
	unsigned long flags;
	int *cpup;

	/*
	 * To keep fairness and not to let other queues starve we always
	 * try to wake up someone else in round-robin manner.  That of course
	 * increases latency but queues always have a chance to be executed.
	 */
	cpup = get_cpu_ptr(sess->cpu_rr);
	for (cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(*cpup)); cpu_q;
	     cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(cpu_q->cpu))) {
		if (!spin_trylock_irqsave(&cpu_q->requeue_lock, flags))
			continue;
		if (unlikely(!test_bit(cpu_q->cpu, sess->cpu_queues_bm)))
			goto unlock;
		q = list_first_entry_or_null(&cpu_q->requeue_list,
					     typeof(*q), requeue_list);
		if (WARN_ON(!q))
			goto clear_bit;
		list_del_init(&q->requeue_list);
		clear_bit_unlock(0, &q->in_list);

		if (list_empty(&cpu_q->requeue_list)) {
			/* Clear bit if nothing is left */
clear_bit:
			clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
		}
unlock:
		spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);

		if (q)
			break;
	}

	/*
	 * Saves the CPU that is going to be requeued on the per-cpu var.  Just
	 * incrementing it doesn't work because rnbd_get_cpu_qlist() will
	 * always return the first CPU with something on the queue list when the
	 * value stored on the var is greater than the last CPU with something
	 * on the list.
	 */
	if (cpu_q)
		*cpup = cpu_q->cpu;
	put_cpu_var(sess->cpu_rr);

	if (q)
		rnbd_clt_dev_requeue(q);

	return q;
}

/**
 * rnbd_rerun_all_if_idle() - rerun all queues left in the list if
 *			      session is idling (there are no requests
 *			      in-flight).
 * @sess: Session to rerun the queues on
 *
 * Description:
 * This function tries to rerun all stopped queues if there are no
 * requests in-flight anymore.  It tries to solve an obvious problem,
 * when the number of tags is less than the number of queues (hctx)
 * that are stopped and put to sleep.  If the last permit, which has
 * just been put, does not wake up all remaining queues (hctxs),
 * IO requests hang forever.
 *
 * That can happen when all permits, say N of them, have been exhausted
 * from one CPU, and we have many block devices per session, say M.
 * Each block device has its own queue (hctx) for each CPU, so eventually
 * we can put that number of queues (hctxs) to sleep: M x nr_cpu_ids.
 * If the number of permits N < M x nr_cpu_ids, we will eventually get
 * an IO hang.
 *
 * To avoid this hang the last caller of rnbd_put_permit() (the last
 * caller is the one who observes sess->busy == 0) must wake up all
 * remaining queues.
 *
 * Context:
 * Does not matter.
 */
static void rnbd_rerun_all_if_idle(struct rnbd_clt_session *sess)
{
	bool requeued;

	do {
		requeued = rnbd_rerun_if_needed(sess);
	} while (atomic_read(&sess->busy) == 0 && requeued);
}

static struct rtrs_permit *rnbd_get_permit(struct rnbd_clt_session *sess,
					   enum rtrs_clt_con_type con_type,
					   int wait)
{
	struct rtrs_permit *permit;

	permit = rtrs_clt_get_permit(sess->rtrs, con_type,
				     wait ? RTRS_PERMIT_WAIT :
				     RTRS_PERMIT_NOWAIT);
	if (likely(permit))
		/* We have a subtle rare case here, when all permits can be
		 * consumed before busy counter increased.  This is safe,
		 * because loser will get NULL as a permit, observe 0 busy
		 * counter and immediately restart the queue himself.
		 */
		atomic_inc(&sess->busy);

	return permit;
}

static void rnbd_put_permit(struct rnbd_clt_session *sess,
			    struct rtrs_permit *permit)
{
	rtrs_clt_put_permit(sess->rtrs, permit);
	atomic_dec(&sess->busy);
	/* Paired with rnbd_clt_dev_add_to_requeue().  Decrement first
	 * and then check queue bits.
	 */
	smp_mb__after_atomic();
	rnbd_rerun_all_if_idle(sess);
}

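/*
 * IUs allocated here back the "user" messages (open/close/sess-info) sent
 * over the admin connection.  IO requests do not take this path: their IUs
 * live in the blk-mq request PDU and only grab a permit via
 * rnbd_get_permit() in rnbd_queue_rq().
 */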
static struct rnbd_iu *rnbd_get_iu(struct rnbd_clt_session *sess,
				   enum rtrs_clt_con_type con_type,
				   int wait)
{
	struct rnbd_iu *iu;
	struct rtrs_permit *permit;

	permit = rnbd_get_permit(sess, con_type,
				 wait ? RTRS_PERMIT_WAIT :
				 RTRS_PERMIT_NOWAIT);
	if (unlikely(!permit))
		return NULL;
	iu = rtrs_permit_to_pdu(permit);
	iu->permit = permit;
	/*
	 * 1st reference is dropped after finishing sending a "user" message,
	 * 2nd reference is dropped after confirmation with the response is
	 * returned.
	 * 1st and 2nd can happen in any order, so the rnbd_iu should be
	 * released (rtrs_permit returned to rtrs) only after both are
	 * finished.
	 */
	atomic_set(&iu->refcount, 2);
	init_waitqueue_head(&iu->comp.wait);
	iu->comp.errno = INT_MAX;

	return iu;
}

static void rnbd_put_iu(struct rnbd_clt_session *sess, struct rnbd_iu *iu)
{
	if (atomic_dec_and_test(&iu->refcount))
		rnbd_put_permit(sess, iu->permit);
}

static void rnbd_softirq_done_fn(struct request *rq)
{
	struct rnbd_clt_dev *dev = rq->rq_disk->private_data;
	struct rnbd_clt_session *sess = dev->sess;
	struct rnbd_iu *iu;

	iu = blk_mq_rq_to_pdu(rq);
	rnbd_put_permit(sess, iu->permit);
	blk_mq_end_request(rq, errno_to_blk_status(iu->errno));
}

static void msg_io_conf(void *priv, int errno)
{
	struct rnbd_iu *iu = priv;
	struct rnbd_clt_dev *dev = iu->dev;
	struct request *rq = iu->rq;
	int rw = rq_data_dir(rq);

	iu->errno = errno;

	blk_mq_complete_request(rq);

	if (errno)
		rnbd_clt_info_rl(dev, "%s I/O failed with err: %d\n",
				 rw == READ ? "read" : "write", errno);
}

static void wake_up_iu_comp(struct rnbd_iu *iu, int errno)
{
	iu->comp.errno = errno;
	wake_up(&iu->comp.wait);
}

static void msg_conf(void *priv, int errno)
{
	struct rnbd_iu *iu = priv;

	iu->errno = errno;
	schedule_work(&iu->work);
}

enum wait_type {
	NO_WAIT = 0,
	WAIT = 1
};

static int send_usr_msg(struct rtrs_clt *rtrs, int dir,
			struct rnbd_iu *iu, struct kvec *vec, size_t nr,
			size_t len, struct scatterlist *sg, unsigned int sg_len,
			void (*conf)(struct work_struct *work),
			int *errno, enum wait_type wait)
{
	int err;
	struct rtrs_clt_req_ops req_ops;

	INIT_WORK(&iu->work, conf);
	req_ops = (struct rtrs_clt_req_ops) {
		.priv = iu,
		.conf_fn = msg_conf,
	};
	err = rtrs_clt_request(dir, &req_ops, rtrs, iu->permit,
			       vec, nr, len, sg, sg_len);
	if (!err && wait) {
		wait_event(iu->comp.wait, iu->comp.errno != INT_MAX);
		*errno = iu->comp.errno;
	} else {
		*errno = 0;
	}

	return err;
}

static void msg_close_conf(struct work_struct *work)
{
	struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
	struct rnbd_clt_dev *dev = iu->dev;

	wake_up_iu_comp(iu, iu->errno);
	rnbd_put_iu(dev->sess, iu);
	rnbd_clt_put_dev(dev);
}

static int send_msg_close(struct rnbd_clt_dev *dev, u32 device_id, bool wait)
{
	struct rnbd_clt_session *sess = dev->sess;
	struct rnbd_msg_close msg;
	struct rnbd_iu *iu;
	struct kvec vec = {
		.iov_base = &msg,
		.iov_len  = sizeof(msg)
	};
	int err, errno;

	iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
	if (!iu)
		return -ENOMEM;

	iu->buf = NULL;
	iu->dev = dev;

	sg_mark_end(&iu->sglist[0]);

	msg.hdr.type = cpu_to_le16(RNBD_MSG_CLOSE);
	msg.device_id = cpu_to_le32(device_id);

	WARN_ON(!rnbd_clt_get_dev(dev));
	err = send_usr_msg(sess->rtrs, WRITE, iu, &vec, 1, 0, NULL, 0,
			   msg_close_conf, &errno, wait);
	if (err) {
		rnbd_clt_put_dev(dev);
		rnbd_put_iu(sess, iu);
	} else {
		err = errno;
	}

	rnbd_put_iu(sess, iu);
	return err;
}

static void msg_open_conf(struct work_struct *work)
{
	struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
	struct rnbd_msg_open_rsp *rsp = iu->buf;
	struct rnbd_clt_dev *dev = iu->dev;
	int errno = iu->errno;

	if (errno) {
		rnbd_clt_err(dev,
			     "Opening failed, server responded: %d\n",
			     errno);
	} else {
		errno = process_msg_open_rsp(dev, rsp);
		if (errno) {
			u32 device_id = le32_to_cpu(rsp->device_id);
			/*
			 * If server thinks it's fine, but we fail to process
			 * then be nice and send a close to server.
			 */
			(void)send_msg_close(dev, device_id, NO_WAIT);
		}
	}
	kfree(rsp);
	wake_up_iu_comp(iu, errno);
	rnbd_put_iu(dev->sess, iu);
	rnbd_clt_put_dev(dev);
}

static void msg_sess_info_conf(struct work_struct *work)
{
	struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
	struct rnbd_msg_sess_info_rsp *rsp = iu->buf;
	struct rnbd_clt_session *sess = iu->sess;

	if (!iu->errno)
		sess->ver = min_t(u8, rsp->ver, RNBD_PROTO_VER_MAJOR);

	kfree(rsp);
	wake_up_iu_comp(iu, iu->errno);
	rnbd_put_iu(sess, iu);
	rnbd_clt_put_sess(sess);
}

static int send_msg_open(struct rnbd_clt_dev *dev, bool wait)
{
	struct rnbd_clt_session *sess = dev->sess;
	struct rnbd_msg_open_rsp *rsp;
	struct rnbd_msg_open msg;
	struct rnbd_iu *iu;
	struct kvec vec = {
		.iov_base = &msg,
		.iov_len  = sizeof(msg)
	};
	int err, errno;

	rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
	if (!rsp)
		return -ENOMEM;

	iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
	if (!iu) {
		kfree(rsp);
		return -ENOMEM;
	}

	iu->buf = rsp;
	iu->dev = dev;

	sg_init_one(iu->sglist, rsp, sizeof(*rsp));

	msg.hdr.type = cpu_to_le16(RNBD_MSG_OPEN);
	msg.access_mode = dev->access_mode;
	strlcpy(msg.dev_name, dev->pathname, sizeof(msg.dev_name));

	WARN_ON(!rnbd_clt_get_dev(dev));
	err = send_usr_msg(sess->rtrs, READ, iu,
			   &vec, 1, sizeof(*rsp), iu->sglist, 1,
			   msg_open_conf, &errno, wait);
	if (err) {
		rnbd_clt_put_dev(dev);
		rnbd_put_iu(sess, iu);
		kfree(rsp);
	} else {
		err = errno;
	}

	rnbd_put_iu(sess, iu);
	return err;
}

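/*
 * Exchanges RNBD_MSG_SESS_INFO with the server to negotiate the protocol
 * version (see msg_sess_info_conf()).  Called with wait == WAIT when a
 * session is first established and with NO_WAIT from remap_devs(), where
 * blocking on an RTRS response is not allowed.
 */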
static int send_msg_sess_info(struct rnbd_clt_session *sess, bool wait)
{
	struct rnbd_msg_sess_info_rsp *rsp;
	struct rnbd_msg_sess_info msg;
	struct rnbd_iu *iu;
	struct kvec vec = {
		.iov_base = &msg,
		.iov_len  = sizeof(msg)
	};
	int err, errno;

	rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
	if (!rsp)
		return -ENOMEM;

	iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
	if (!iu) {
		kfree(rsp);
		return -ENOMEM;
	}

	iu->buf = rsp;
	iu->sess = sess;

	sg_init_one(iu->sglist, rsp, sizeof(*rsp));

	msg.hdr.type = cpu_to_le16(RNBD_MSG_SESS_INFO);
	msg.ver = RNBD_PROTO_VER_MAJOR;

	if (!rnbd_clt_get_sess(sess)) {
		/*
		 * That can happen only in one case, when RTRS has re-established
		 * the connection and link_ev() is called, but session is almost
		 * dead, last reference on session is put and caller is waiting
		 * for RTRS to close everything.
		 */
		err = -ENODEV;
		goto put_iu;
	}
	err = send_usr_msg(sess->rtrs, READ, iu,
			   &vec, 1, sizeof(*rsp), iu->sglist, 1,
			   msg_sess_info_conf, &errno, wait);
	if (err) {
		rnbd_clt_put_sess(sess);
put_iu:
		rnbd_put_iu(sess, iu);
		kfree(rsp);
	} else {
		err = errno;
	}

	rnbd_put_iu(sess, iu);
	return err;
}

static void set_dev_states_to_disconnected(struct rnbd_clt_session *sess)
{
	struct rnbd_clt_dev *dev;

	mutex_lock(&sess->lock);
	list_for_each_entry(dev, &sess->devs_list, list) {
		rnbd_clt_err(dev, "Device disconnected.\n");

		mutex_lock(&dev->lock);
		if (dev->dev_state == DEV_STATE_MAPPED)
			dev->dev_state = DEV_STATE_MAPPED_DISCONNECTED;
		mutex_unlock(&dev->lock);
	}
	mutex_unlock(&sess->lock);
}

static void remap_devs(struct rnbd_clt_session *sess)
{
	struct rnbd_clt_dev *dev;
	struct rtrs_attrs attrs;
	int err;

	/*
	 * Careful here: we are called from RTRS link event directly,
	 * thus we can't send any RTRS request and wait for response
	 * or RTRS will not be able to complete request with failure
	 * if something goes wrong (failing of outstanding requests
	 * happens exactly from the context where we are blocking now).
	 *
	 * So to avoid deadlocks each usr message sent from here must
	 * be asynchronous.
	 */

	err = send_msg_sess_info(sess, NO_WAIT);
	if (err) {
		pr_err("send_msg_sess_info(\"%s\"): %d\n", sess->sessname, err);
		return;
	}

	rtrs_clt_query(sess->rtrs, &attrs);
	mutex_lock(&sess->lock);
	sess->max_io_size = attrs.max_io_size;

	list_for_each_entry(dev, &sess->devs_list, list) {
		bool skip;

		mutex_lock(&dev->lock);
		skip = (dev->dev_state == DEV_STATE_INIT);
		mutex_unlock(&dev->lock);
		if (skip)
			/*
			 * When device is establishing connection for the first
			 * time - do not remap, it will be closed soon.
			 */
			continue;

		rnbd_clt_info(dev, "session reconnected, remapping device\n");
		err = send_msg_open(dev, NO_WAIT);
		if (err) {
			rnbd_clt_err(dev, "send_msg_open(): %d\n", err);
			break;
		}
	}
	mutex_unlock(&sess->lock);
}

static void rnbd_clt_link_ev(void *priv, enum rtrs_clt_link_ev ev)
{
	struct rnbd_clt_session *sess = priv;

	switch (ev) {
	case RTRS_CLT_LINK_EV_DISCONNECTED:
		set_dev_states_to_disconnected(sess);
		break;
	case RTRS_CLT_LINK_EV_RECONNECTED:
		remap_devs(sess);
		break;
	default:
		pr_err("Unknown session event received (%d), session: %s\n",
		       ev, sess->sessname);
	}
}

static void rnbd_init_cpu_qlists(struct rnbd_cpu_qlist __percpu *cpu_queues)
{
	unsigned int cpu;
	struct rnbd_cpu_qlist *cpu_q;

	for_each_possible_cpu(cpu) {
		cpu_q = per_cpu_ptr(cpu_queues, cpu);

		cpu_q->cpu = cpu;
		INIT_LIST_HEAD(&cpu_q->requeue_list);
		spin_lock_init(&cpu_q->requeue_lock);
	}
}

static void destroy_mq_tags(struct rnbd_clt_session *sess)
{
	if (sess->tag_set.tags)
		blk_mq_free_tag_set(&sess->tag_set);
}

static inline void wake_up_rtrs_waiters(struct rnbd_clt_session *sess)
{
	sess->rtrs_ready = true;
	wake_up_all(&sess->rtrs_waitq);
}

static void close_rtrs(struct rnbd_clt_session *sess)
{
	might_sleep();

	if (!IS_ERR_OR_NULL(sess->rtrs)) {
		rtrs_clt_close(sess->rtrs);
		sess->rtrs = NULL;
		wake_up_rtrs_waiters(sess);
	}
}

static void free_sess(struct rnbd_clt_session *sess)
{
	WARN_ON(!list_empty(&sess->devs_list));

	might_sleep();

	close_rtrs(sess);
	destroy_mq_tags(sess);
	if (!list_empty(&sess->list)) {
		mutex_lock(&sess_lock);
		list_del(&sess->list);
		mutex_unlock(&sess_lock);
	}
	free_percpu(sess->cpu_queues);
	free_percpu(sess->cpu_rr);
	mutex_destroy(&sess->lock);
	kfree(sess);
}

static struct rnbd_clt_session *alloc_sess(const char *sessname)
{
	struct rnbd_clt_session *sess;
	int err, cpu;

	sess = kzalloc_node(sizeof(*sess), GFP_KERNEL, NUMA_NO_NODE);
	if (!sess)
		return ERR_PTR(-ENOMEM);
	strlcpy(sess->sessname, sessname, sizeof(sess->sessname));
	atomic_set(&sess->busy, 0);
	mutex_init(&sess->lock);
	INIT_LIST_HEAD(&sess->devs_list);
	INIT_LIST_HEAD(&sess->list);
	bitmap_zero(sess->cpu_queues_bm, NR_CPUS);
	init_waitqueue_head(&sess->rtrs_waitq);
	refcount_set(&sess->refcount, 1);

	sess->cpu_queues = alloc_percpu(struct rnbd_cpu_qlist);
	if (!sess->cpu_queues) {
		err = -ENOMEM;
		goto err;
	}
	rnbd_init_cpu_qlists(sess->cpu_queues);

	/*
	 * That is simple percpu variable which stores cpu indices, which are
	 * incremented on each access.  We need that for the sake of fairness
	 * to wake up queues in a round-robin manner.
	 */
	sess->cpu_rr = alloc_percpu(int);
	if (!sess->cpu_rr) {
		err = -ENOMEM;
		goto err;
	}
	for_each_possible_cpu(cpu)
		*per_cpu_ptr(sess->cpu_rr, cpu) = cpu;

	return sess;

err:
	free_sess(sess);

	return ERR_PTR(err);
}

static int wait_for_rtrs_connection(struct rnbd_clt_session *sess)
{
	wait_event(sess->rtrs_waitq, sess->rtrs_ready);
	if (IS_ERR_OR_NULL(sess->rtrs))
		return -ECONNRESET;

	return 0;
}

static void wait_for_rtrs_disconnection(struct rnbd_clt_session *sess)
__releases(&sess_lock)
__acquires(&sess_lock)
{
	DEFINE_WAIT(wait);

	prepare_to_wait(&sess->rtrs_waitq, &wait, TASK_UNINTERRUPTIBLE);
	if (IS_ERR_OR_NULL(sess->rtrs)) {
		finish_wait(&sess->rtrs_waitq, &wait);
		return;
	}
	mutex_unlock(&sess_lock);
	/* loop in caller, see __find_and_get_sess().
	 * You can't leave mutex locked and call schedule(), you will catch a
	 * deadlock with a caller of free_sess(), which has just put the last
	 * reference and is about to take the sess_lock in order to delete
	 * the session from the list.
	 */
	schedule();
	mutex_lock(&sess_lock);
}

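/*
 * Looks up an existing session by name.  Called with sess_lock held; the
 * lock may be dropped while waiting for RTRS to connect or disconnect, so
 * the search restarts from the beginning afterwards (the "again" label).
 */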
static struct rnbd_clt_session *__find_and_get_sess(const char *sessname)
__releases(&sess_lock)
__acquires(&sess_lock)
{
	struct rnbd_clt_session *sess, *sn;
	int err;

again:
	list_for_each_entry_safe(sess, sn, &sess_list, list) {
		if (strcmp(sessname, sess->sessname))
			continue;

		if (sess->rtrs_ready && IS_ERR_OR_NULL(sess->rtrs))
			/*
			 * No RTRS connection, session is dying.
			 */
			continue;

		if (rnbd_clt_get_sess(sess)) {
			/*
			 * Alive session is found, wait for RTRS connection.
			 */
			mutex_unlock(&sess_lock);
			err = wait_for_rtrs_connection(sess);
			if (err)
				rnbd_clt_put_sess(sess);
			mutex_lock(&sess_lock);

			if (err)
				/* Session is dying, repeat the loop */
				goto again;

			return sess;
		}
		/*
		 * Ref is 0, session is dying, wait for RTRS disconnect
		 * in order to avoid session names clashes.
		 */
		wait_for_rtrs_disconnection(sess);
		/*
		 * RTRS is disconnected and soon session will be freed,
		 * so repeat a loop.
		 */
		goto again;
	}

	return NULL;
}

static struct
rnbd_clt_session *find_or_create_sess(const char *sessname, bool *first)
{
	struct rnbd_clt_session *sess = NULL;

	mutex_lock(&sess_lock);
	sess = __find_and_get_sess(sessname);
	if (!sess) {
		sess = alloc_sess(sessname);
		if (IS_ERR(sess)) {
			mutex_unlock(&sess_lock);
			return sess;
		}
		list_add(&sess->list, &sess_list);
		*first = true;
	} else
		*first = false;
	mutex_unlock(&sess_lock);

	return sess;
}

static int rnbd_client_open(struct block_device *block_device, fmode_t mode)
{
	struct rnbd_clt_dev *dev = block_device->bd_disk->private_data;

	if (dev->read_only && (mode & FMODE_WRITE))
		return -EPERM;

	if (dev->dev_state == DEV_STATE_UNMAPPED ||
	    !rnbd_clt_get_dev(dev))
		return -EIO;

	return 0;
}

static void rnbd_client_release(struct gendisk *gen, fmode_t mode)
{
	struct rnbd_clt_dev *dev = gen->private_data;

	rnbd_clt_put_dev(dev);
}

static int rnbd_client_getgeo(struct block_device *block_device,
			      struct hd_geometry *geo)
{
	u64 size;
	struct rnbd_clt_dev *dev;

	dev = block_device->bd_disk->private_data;
	size = dev->size * (dev->logical_block_size / SECTOR_SIZE);
	geo->cylinders = size >> 6;	/* size/64 */
	geo->heads = 4;
	geo->sectors = 16;
	geo->start = 0;

	return 0;
}

static const struct block_device_operations rnbd_client_ops = {
	.owner		= THIS_MODULE,
	.open		= rnbd_client_open,
	.release	= rnbd_client_release,
	.getgeo		= rnbd_client_getgeo
};

/* The amount of data that belongs to an I/O and the amount of data that
 * should be read or written to the disk (bi_size) can differ.
 *
 * E.g. When WRITE_SAME is used, only a small amount of data is
 * transferred that is then written repeatedly over a lot of sectors.
 *
 * Get the size of data to be transferred via RTRS by summing up the size
 * of the scatter-gather list entries.
 */
static size_t rnbd_clt_get_sg_size(struct scatterlist *sglist, u32 len)
{
	struct scatterlist *sg;
	size_t tsize = 0;
	int i;

	for_each_sg(sglist, sg, len, i)
		tsize += sg->length;
	return tsize;
}

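/*
 * Builds an RNBD_MSG_IO header for the block request, maps its data into
 * iu->sglist and hands both to rtrs_clt_request().  Completion is reported
 * asynchronously through msg_io_conf().
 */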
static int rnbd_client_xfer_request(struct rnbd_clt_dev *dev,
				    struct request *rq,
				    struct rnbd_iu *iu)
{
	struct rtrs_clt *rtrs = dev->sess->rtrs;
	struct rtrs_permit *permit = iu->permit;
	struct rnbd_msg_io msg;
	struct rtrs_clt_req_ops req_ops;
	unsigned int sg_cnt = 0;
	struct kvec vec;
	size_t size;
	int err;

	iu->rq = rq;
	iu->dev = dev;
	msg.sector = cpu_to_le64(blk_rq_pos(rq));
	msg.bi_size = cpu_to_le32(blk_rq_bytes(rq));
	msg.rw = cpu_to_le32(rq_to_rnbd_flags(rq));
	msg.prio = cpu_to_le16(req_get_ioprio(rq));

	/*
	 * We only support discards with single segment for now.
	 * See queue limits.
	 */
	if (req_op(rq) != REQ_OP_DISCARD)
		sg_cnt = blk_rq_map_sg(dev->queue, rq, iu->sglist);

	if (sg_cnt == 0)
		/* Do not forget to mark the end */
		sg_mark_end(&iu->sglist[0]);

	msg.hdr.type = cpu_to_le16(RNBD_MSG_IO);
	msg.device_id = cpu_to_le32(dev->device_id);

	vec = (struct kvec) {
		.iov_base = &msg,
		.iov_len  = sizeof(msg)
	};
	size = rnbd_clt_get_sg_size(iu->sglist, sg_cnt);
	req_ops = (struct rtrs_clt_req_ops) {
		.priv = iu,
		.conf_fn = msg_io_conf,
	};
	err = rtrs_clt_request(rq_data_dir(rq), &req_ops, rtrs, permit,
			       &vec, 1, size, iu->sglist, sg_cnt);
	if (unlikely(err)) {
		rnbd_clt_err_rl(dev, "RTRS failed to transfer IO, err: %d\n",
				err);
		return err;
	}

	return 0;
}

/**
 * rnbd_clt_dev_add_to_requeue() - add device to requeue if session is busy
 * @dev: Device to be checked
 * @q: Queue to be added to the requeue list if required
 *
 * Description:
 * If session is busy, that means someone will requeue us when resources
 * are freed.  If session is not doing anything - device is not added to
 * the list and @false is returned.
 */
static bool rnbd_clt_dev_add_to_requeue(struct rnbd_clt_dev *dev,
					struct rnbd_queue *q)
{
	struct rnbd_clt_session *sess = dev->sess;
	struct rnbd_cpu_qlist *cpu_q;
	unsigned long flags;
	bool added = true;
	bool need_set;

	cpu_q = get_cpu_ptr(sess->cpu_queues);
	spin_lock_irqsave(&cpu_q->requeue_lock, flags);

	if (likely(!test_and_set_bit_lock(0, &q->in_list))) {
		if (WARN_ON(!list_empty(&q->requeue_list)))
			goto unlock;

		need_set = !test_bit(cpu_q->cpu, sess->cpu_queues_bm);
		if (need_set) {
			set_bit(cpu_q->cpu, sess->cpu_queues_bm);
			/* Paired with rnbd_put_permit().  Set a bit first
			 * and then observe the busy counter.
			 */
			smp_mb__before_atomic();
		}
		if (likely(atomic_read(&sess->busy))) {
			list_add_tail(&q->requeue_list, &cpu_q->requeue_list);
		} else {
			/* Very unlikely, but possible: busy counter was
			 * observed as zero.  Drop all bits and return
			 * false to restart the queue by ourselves.
			 */
			if (need_set)
				clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
			clear_bit_unlock(0, &q->in_list);
			added = false;
		}
	}
unlock:
	spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
	put_cpu_ptr(sess->cpu_queues);

	return added;
}

static void rnbd_clt_dev_kick_mq_queue(struct rnbd_clt_dev *dev,
				       struct blk_mq_hw_ctx *hctx,
				       int delay)
{
	struct rnbd_queue *q = hctx->driver_data;

	if (delay != RNBD_DELAY_IFBUSY)
		blk_mq_delay_run_hw_queue(hctx, delay);
	else if (unlikely(!rnbd_clt_dev_add_to_requeue(dev, q)))
		/*
		 * If session is not busy we have to restart
		 * the queue ourselves.
		 */
		blk_mq_delay_run_hw_queue(hctx, 10/*ms*/);
}

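/*
 * blk-mq .queue_rq handler.  A permit is taken without waiting; if none is
 * available, the request is returned with BLK_STS_RESOURCE and the hardware
 * queue is either parked on the session requeue list (RNBD_DELAY_IFBUSY)
 * or simply re-run after a short delay.
 */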
static blk_status_t rnbd_queue_rq(struct blk_mq_hw_ctx *hctx,
				  const struct blk_mq_queue_data *bd)
{
	struct request *rq = bd->rq;
	struct rnbd_clt_dev *dev = rq->rq_disk->private_data;
	struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq);
	int err;

	if (unlikely(dev->dev_state != DEV_STATE_MAPPED))
		return BLK_STS_IOERR;

	iu->permit = rnbd_get_permit(dev->sess, RTRS_IO_CON,
				     RTRS_PERMIT_NOWAIT);
	if (unlikely(!iu->permit)) {
		rnbd_clt_dev_kick_mq_queue(dev, hctx, RNBD_DELAY_IFBUSY);
		return BLK_STS_RESOURCE;
	}

	blk_mq_start_request(rq);
	err = rnbd_client_xfer_request(dev, rq, iu);
	if (likely(err == 0))
		return BLK_STS_OK;
	if (unlikely(err == -EAGAIN || err == -ENOMEM)) {
		rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/);
		rnbd_put_permit(dev->sess, iu->permit);
		return BLK_STS_RESOURCE;
	}

	rnbd_put_permit(dev->sess, iu->permit);
	return BLK_STS_IOERR;
}

static int rnbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
			     unsigned int hctx_idx, unsigned int numa_node)
{
	struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq);

	sg_init_table(iu->sglist, BMAX_SEGMENTS);
	return 0;
}

static struct blk_mq_ops rnbd_mq_ops = {
	.queue_rq	= rnbd_queue_rq,
	.init_request	= rnbd_init_request,
	.complete	= rnbd_softirq_done_fn,
};

static int setup_mq_tags(struct rnbd_clt_session *sess)
{
	struct blk_mq_tag_set *tag_set = &sess->tag_set;

	memset(tag_set, 0, sizeof(*tag_set));
	tag_set->ops = &rnbd_mq_ops;
	tag_set->queue_depth = sess->queue_depth;
	tag_set->numa_node = NUMA_NO_NODE;
	tag_set->flags = BLK_MQ_F_SHOULD_MERGE |
			 BLK_MQ_F_TAG_QUEUE_SHARED;
	tag_set->cmd_size = sizeof(struct rnbd_iu);
	tag_set->nr_hw_queues = num_online_cpus();

	return blk_mq_alloc_tag_set(tag_set);
}

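/*
 * Returns an existing session with an established RTRS connection or, for
 * the first caller of a given session name, opens the RTRS connection,
 * allocates the shared blk-mq tag set and negotiates the protocol version
 * before waking up any concurrent waiters.
 */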
static struct rnbd_clt_session *
find_and_get_or_create_sess(const char *sessname,
			    const struct rtrs_addr *paths,
			    size_t path_cnt, u16 port_nr)
{
	struct rnbd_clt_session *sess;
	struct rtrs_attrs attrs;
	int err;
	bool first;
	struct rtrs_clt_ops rtrs_ops;

	sess = find_or_create_sess(sessname, &first);
	if (sess == ERR_PTR(-ENOMEM))
		return ERR_PTR(-ENOMEM);
	else if (!first)
		return sess;

	rtrs_ops = (struct rtrs_clt_ops) {
		.priv = sess,
		.link_ev = rnbd_clt_link_ev,
	};
	/*
	 * Nothing was found, establish rtrs connection and proceed further.
	 */
	sess->rtrs = rtrs_clt_open(&rtrs_ops, sessname,
				   paths, path_cnt, port_nr,
				   sizeof(struct rnbd_iu),
				   RECONNECT_DELAY, BMAX_SEGMENTS,
				   BLK_MAX_SEGMENT_SIZE,
				   MAX_RECONNECTS);
	if (IS_ERR(sess->rtrs)) {
		err = PTR_ERR(sess->rtrs);
		goto wake_up_and_put;
	}
	rtrs_clt_query(sess->rtrs, &attrs);
	sess->max_io_size = attrs.max_io_size;
	sess->queue_depth = attrs.queue_depth;

	err = setup_mq_tags(sess);
	if (err)
		goto close_rtrs;

	err = send_msg_sess_info(sess, WAIT);
	if (err)
		goto close_rtrs;

	wake_up_rtrs_waiters(sess);

	return sess;

close_rtrs:
	close_rtrs(sess);
put_sess:
	rnbd_clt_put_sess(sess);

	return ERR_PTR(err);

wake_up_and_put:
	wake_up_rtrs_waiters(sess);
	goto put_sess;
}

static inline void rnbd_init_hw_queue(struct rnbd_clt_dev *dev,
				      struct rnbd_queue *q,
				      struct blk_mq_hw_ctx *hctx)
{
	INIT_LIST_HEAD(&q->requeue_list);
	q->dev = dev;
	q->hctx = hctx;
}

static void rnbd_init_mq_hw_queues(struct rnbd_clt_dev *dev)
{
	int i;
	struct blk_mq_hw_ctx *hctx;
	struct rnbd_queue *q;

	queue_for_each_hw_ctx(dev->queue, hctx, i) {
		q = &dev->hw_queues[i];
		rnbd_init_hw_queue(dev, q, hctx);
		hctx->driver_data = q;
	}
}

static int setup_mq_dev(struct rnbd_clt_dev *dev)
{
	dev->queue = blk_mq_init_queue(&dev->sess->tag_set);
	if (IS_ERR(dev->queue)) {
		rnbd_clt_err(dev, "Initializing multiqueue queue failed, err: %ld\n",
			     PTR_ERR(dev->queue));
		return PTR_ERR(dev->queue);
	}
	rnbd_init_mq_hw_queues(dev);
	return 0;
}

static void setup_request_queue(struct rnbd_clt_dev *dev)
{
	blk_queue_logical_block_size(dev->queue, dev->logical_block_size);
	blk_queue_physical_block_size(dev->queue, dev->physical_block_size);
	blk_queue_max_hw_sectors(dev->queue, dev->max_hw_sectors);
	blk_queue_max_write_same_sectors(dev->queue,
					 dev->max_write_same_sectors);

	/*
	 * we don't support discards to "discontiguous" segments
	 * in one request
	 */
	blk_queue_max_discard_segments(dev->queue, 1);

	blk_queue_max_discard_sectors(dev->queue, dev->max_discard_sectors);
	dev->queue->limits.discard_granularity = dev->discard_granularity;
	dev->queue->limits.discard_alignment = dev->discard_alignment;
	if (dev->max_discard_sectors)
		blk_queue_flag_set(QUEUE_FLAG_DISCARD, dev->queue);
	if (dev->secure_discard)
		blk_queue_flag_set(QUEUE_FLAG_SECERASE, dev->queue);

	blk_queue_flag_set(QUEUE_FLAG_SAME_COMP, dev->queue);
	blk_queue_flag_set(QUEUE_FLAG_SAME_FORCE, dev->queue);
	blk_queue_max_segments(dev->queue, dev->max_segments);
	blk_queue_io_opt(dev->queue, dev->sess->max_io_size);
	blk_queue_virt_boundary(dev->queue, SZ_4K - 1);
	blk_queue_write_cache(dev->queue, true, true);
	dev->queue->queuedata = dev;
}

static void rnbd_clt_setup_gen_disk(struct rnbd_clt_dev *dev, int idx)
{
	dev->gd->major = rnbd_client_major;
	dev->gd->first_minor = idx << RNBD_PART_BITS;
	dev->gd->fops = &rnbd_client_ops;
	dev->gd->queue = dev->queue;
	dev->gd->private_data = dev;
	snprintf(dev->gd->disk_name, sizeof(dev->gd->disk_name), "rnbd%d",
		 idx);
	pr_debug("disk_name=%s, capacity=%zu\n",
		 dev->gd->disk_name,
		 dev->nsectors * (dev->logical_block_size / SECTOR_SIZE)
		 );

	set_capacity(dev->gd, dev->nsectors);

	if (dev->access_mode == RNBD_ACCESS_RO) {
		dev->read_only = true;
		set_disk_ro(dev->gd, true);
	} else {
		dev->read_only = false;
	}

	if (!dev->rotational)
		blk_queue_flag_set(QUEUE_FLAG_NONROT, dev->queue);
}

static int rnbd_client_setup_device(struct rnbd_clt_session *sess,
				    struct rnbd_clt_dev *dev, int idx)
{
	int err;

	dev->size = dev->nsectors * dev->logical_block_size;

	err = setup_mq_dev(dev);
	if (err)
		return err;

	setup_request_queue(dev);

	dev->gd = alloc_disk_node(1 << RNBD_PART_BITS, NUMA_NO_NODE);
	if (!dev->gd) {
		blk_cleanup_queue(dev->queue);
		return -ENOMEM;
	}

	rnbd_clt_setup_gen_disk(dev, idx);

	return 0;
}

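/*
 * Allocates the client-side device state: the per-CPU hardware queue array,
 * a unique id from index_ida (later used as the disk index when the gendisk
 * is created) and a reference on the owning session.  The device starts in
 * DEV_STATE_INIT.
 */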
static struct rnbd_clt_dev *init_dev(struct rnbd_clt_session *sess,
				     enum rnbd_access_mode access_mode,
				     const char *pathname)
{
	struct rnbd_clt_dev *dev;
	int ret;

	dev = kzalloc_node(sizeof(*dev), GFP_KERNEL, NUMA_NO_NODE);
	if (!dev)
		return ERR_PTR(-ENOMEM);

	dev->hw_queues = kcalloc(nr_cpu_ids, sizeof(*dev->hw_queues),
				 GFP_KERNEL);
	if (!dev->hw_queues) {
		ret = -ENOMEM;
		goto out_alloc;
	}

	mutex_lock(&ida_lock);
	ret = ida_simple_get(&index_ida, 0, 1 << (MINORBITS - RNBD_PART_BITS),
			     GFP_KERNEL);
	mutex_unlock(&ida_lock);
	if (ret < 0) {
		pr_err("Failed to initialize device '%s' from session %s, allocating idr failed, err: %d\n",
		       pathname, sess->sessname, ret);
		goto out_queues;
	}
	dev->clt_device_id = ret;
	dev->sess = sess;
	dev->access_mode = access_mode;
	strlcpy(dev->pathname, pathname, sizeof(dev->pathname));
	mutex_init(&dev->lock);
	refcount_set(&dev->refcount, 1);
	dev->dev_state = DEV_STATE_INIT;

	/*
	 * Here we are called from a sysfs entry, thus clt-sysfs is
	 * responsible that the session will not disappear.
	 */
	WARN_ON(!rnbd_clt_get_sess(sess));

	return dev;

out_queues:
	kfree(dev->hw_queues);
out_alloc:
	kfree(dev);
	return ERR_PTR(ret);
}

static bool __exists_dev(const char *pathname)
{
	struct rnbd_clt_session *sess;
	struct rnbd_clt_dev *dev;
	bool found = false;

	list_for_each_entry(sess, &sess_list, list) {
		mutex_lock(&sess->lock);
		list_for_each_entry(dev, &sess->devs_list, list) {
			if (!strncmp(dev->pathname, pathname,
				     sizeof(dev->pathname))) {
				found = true;
				break;
			}
		}
		mutex_unlock(&sess->lock);
		if (found)
			break;
	}

	return found;
}

static bool exists_devpath(const char *pathname)
{
	bool found;

	mutex_lock(&sess_lock);
	found = __exists_dev(pathname);
	mutex_unlock(&sess_lock);

	return found;
}

static bool insert_dev_if_not_exists_devpath(const char *pathname,
					     struct rnbd_clt_session *sess,
					     struct rnbd_clt_dev *dev)
{
	bool found;

	mutex_lock(&sess_lock);
	found = __exists_dev(pathname);
	if (!found) {
		mutex_lock(&sess->lock);
		list_add_tail(&dev->list, &sess->devs_list);
		mutex_unlock(&sess->lock);
	}
	mutex_unlock(&sess_lock);

	return found;
}

static void delete_dev(struct rnbd_clt_dev *dev)
{
	struct rnbd_clt_session *sess = dev->sess;

	mutex_lock(&sess->lock);
	list_del(&dev->list);
	mutex_unlock(&sess->lock);
}

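/*
 * Maps a remote device: finds or creates the session, allocates and
 * registers the client device, sends RNBD_MSG_OPEN and waits for the
 * response, then sets up the request queue and gendisk.  On any failure
 * the device is unwound in reverse order via the labels at the end.
 */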
struct rnbd_clt_dev *rnbd_clt_map_device(const char *sessname,
					 struct rtrs_addr *paths,
					 size_t path_cnt, u16 port_nr,
					 const char *pathname,
					 enum rnbd_access_mode access_mode)
{
	struct rnbd_clt_session *sess;
	struct rnbd_clt_dev *dev;
	int ret;

	if (exists_devpath(pathname))
		return ERR_PTR(-EEXIST);

	sess = find_and_get_or_create_sess(sessname, paths, path_cnt, port_nr);
	if (IS_ERR(sess))
		return ERR_CAST(sess);

	dev = init_dev(sess, access_mode, pathname);
	if (IS_ERR(dev)) {
		pr_err("map_device: failed to map device '%s' from session %s, can't initialize device, err: %ld\n",
		       pathname, sess->sessname, PTR_ERR(dev));
		ret = PTR_ERR(dev);
		goto put_sess;
	}
	if (insert_dev_if_not_exists_devpath(pathname, sess, dev)) {
		ret = -EEXIST;
		goto put_dev;
	}
	ret = send_msg_open(dev, WAIT);
	if (ret) {
		rnbd_clt_err(dev,
			     "map_device: failed, can't open remote device, err: %d\n",
			     ret);
		goto del_dev;
	}
	mutex_lock(&dev->lock);
	pr_debug("Opened remote device: session=%s, path='%s'\n",
		 sess->sessname, pathname);
	ret = rnbd_client_setup_device(sess, dev, dev->clt_device_id);
	if (ret) {
		rnbd_clt_err(dev,
			     "map_device: Failed to configure device, err: %d\n",
			     ret);
		mutex_unlock(&dev->lock);
		goto del_dev;
	}

	rnbd_clt_info(dev,
		      "map_device: Device mapped as %s (nsectors: %zu, logical_block_size: %d, physical_block_size: %d, max_write_same_sectors: %d, max_discard_sectors: %d, discard_granularity: %d, discard_alignment: %d, secure_discard: %d, max_segments: %d, max_hw_sectors: %d, rotational: %d)\n",
		      dev->gd->disk_name, dev->nsectors,
		      dev->logical_block_size, dev->physical_block_size,
		      dev->max_write_same_sectors, dev->max_discard_sectors,
		      dev->discard_granularity, dev->discard_alignment,
		      dev->secure_discard, dev->max_segments,
		      dev->max_hw_sectors, dev->rotational);

	mutex_unlock(&dev->lock);

	add_disk(dev->gd);
	rnbd_clt_put_sess(sess);

	return dev;

del_dev:
	delete_dev(dev);
put_dev:
	rnbd_clt_put_dev(dev);
put_sess:
	rnbd_clt_put_sess(sess);

	return ERR_PTR(ret);
}

static void destroy_gen_disk(struct rnbd_clt_dev *dev)
{
	del_gendisk(dev->gd);
	blk_cleanup_queue(dev->queue);
	put_disk(dev->gd);
}

static void destroy_sysfs(struct rnbd_clt_dev *dev,
			  const struct attribute *sysfs_self)
{
	rnbd_clt_remove_dev_symlink(dev);
	if (dev->kobj.state_initialized) {
		if (sysfs_self)
			/* To avoid deadlock firstly remove itself */
			sysfs_remove_file_self(&dev->kobj, sysfs_self);
		kobject_del(&dev->kobj);
		kobject_put(&dev->kobj);
	}
}

int rnbd_clt_unmap_device(struct rnbd_clt_dev *dev, bool force,
			  const struct attribute *sysfs_self)
{
	struct rnbd_clt_session *sess = dev->sess;
	int refcount, ret = 0;
	bool was_mapped;

	mutex_lock(&dev->lock);
	if (dev->dev_state == DEV_STATE_UNMAPPED) {
		rnbd_clt_info(dev, "Device is already being unmapped\n");
		ret = -EALREADY;
		goto err;
	}
	refcount = refcount_read(&dev->refcount);
	if (!force && refcount > 1) {
		rnbd_clt_err(dev,
			     "Closing device failed, device is in use, (%d device users)\n",
			     refcount - 1);
		ret = -EBUSY;
		goto err;
	}
	was_mapped = (dev->dev_state == DEV_STATE_MAPPED);
	dev->dev_state = DEV_STATE_UNMAPPED;
	mutex_unlock(&dev->lock);

	delete_dev(dev);
	destroy_sysfs(dev, sysfs_self);
	destroy_gen_disk(dev);
	if (was_mapped && sess->rtrs)
		send_msg_close(dev, dev->device_id, WAIT);

	rnbd_clt_info(dev, "Device is unmapped\n");

	/* Likely last reference put */
	rnbd_clt_put_dev(dev);

	/*
	 * Here device and session can vanish!
	 */

	return 0;
err:
	mutex_unlock(&dev->lock);

	return ret;
}

int rnbd_clt_remap_device(struct rnbd_clt_dev *dev)
{
	int err;

	mutex_lock(&dev->lock);
	if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED)
		err = 0;
	else if (dev->dev_state == DEV_STATE_UNMAPPED)
		err = -ENODEV;
	else if (dev->dev_state == DEV_STATE_MAPPED)
		err = -EALREADY;
	else
		err = -EBUSY;
	mutex_unlock(&dev->lock);
	if (!err) {
		rnbd_clt_info(dev, "Remapping device.\n");
		err = send_msg_open(dev, WAIT);
		if (err)
			rnbd_clt_err(dev, "remap_device: %d\n", err);
	}

	return err;
}

static void unmap_device_work(struct work_struct *work)
{
	struct rnbd_clt_dev *dev;

	dev = container_of(work, typeof(*dev), unmap_on_rmmod_work);
	rnbd_clt_unmap_device(dev, true, NULL);
}

static void rnbd_destroy_sessions(void)
{
	struct rnbd_clt_session *sess, *sn;
	struct rnbd_clt_dev *dev, *tn;

	/* Firstly forbid access through sysfs interface */
	rnbd_clt_destroy_default_group();
	rnbd_clt_destroy_sysfs_files();

	/*
	 * At this point there is no concurrent access to the sessions list
	 * and devices list:
	 * 1. New session or device can't be created - session sysfs files
	 *    are removed.
	 * 2. Device or session can't be removed - module reference is taken
	 *    into account in unmap device sysfs callback.
	 * 3. No IO requests inflight - each file open of block_dev increases
	 *    module reference in get_disk().
	 *
	 * But still there can be user requests in flight, which are sent by
	 * asynchronous send_msg_*() functions, thus before unmapping devices
	 * RTRS session must be explicitly closed.
	 */

	list_for_each_entry_safe(sess, sn, &sess_list, list) {
		WARN_ON(!rnbd_clt_get_sess(sess));
		close_rtrs(sess);
		list_for_each_entry_safe(dev, tn, &sess->devs_list, list) {
			/*
			 * Here unmap happens in parallel for only one reason:
			 * blk_cleanup_queue() takes around half a second, so
			 * on huge amount of devices the whole module unload
			 * procedure takes minutes.
			 */
			INIT_WORK(&dev->unmap_on_rmmod_work, unmap_device_work);
			queue_work(system_long_wq, &dev->unmap_on_rmmod_work);
		}
		rnbd_clt_put_sess(sess);
	}
	/* Wait for all scheduled unmap works */
	flush_workqueue(system_long_wq);
	WARN_ON(!list_empty(&sess_list));
}

static int __init rnbd_client_init(void)
{
	int err = 0;

	BUILD_BUG_ON(sizeof(struct rnbd_msg_hdr) != 4);
	BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info) != 36);
	BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info_rsp) != 36);
	BUILD_BUG_ON(sizeof(struct rnbd_msg_open) != 264);
	BUILD_BUG_ON(sizeof(struct rnbd_msg_close) != 8);
	BUILD_BUG_ON(sizeof(struct rnbd_msg_open_rsp) != 56);
	rnbd_client_major = register_blkdev(rnbd_client_major, "rnbd");
	if (rnbd_client_major <= 0) {
		pr_err("Failed to load module, block device registration failed\n");
		return -EBUSY;
	}

	err = rnbd_clt_create_sysfs_files();
	if (err) {
		pr_err("Failed to load module, creating sysfs device files failed, err: %d\n",
		       err);
		unregister_blkdev(rnbd_client_major, "rnbd");
	}

	return err;
}

static void __exit rnbd_client_exit(void)
{
	rnbd_destroy_sessions();
	unregister_blkdev(rnbd_client_major, "rnbd");
	ida_destroy(&index_ida);
}

module_init(rnbd_client_init);
module_exit(rnbd_client_exit);