// SPDX-License-Identifier: GPL-2.0-or-later /* * RDMA Network Block Driver * * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved. * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved. * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved. */ #undef pr_fmt #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt #include <linux/module.h> #include <linux/blkdev.h> #include <linux/hdreg.h> #include <linux/scatterlist.h> #include <linux/idr.h> #include "rnbd-clt.h" MODULE_DESCRIPTION("RDMA Network Block Device Client"); MODULE_LICENSE("GPL"); static int rnbd_client_major; static DEFINE_IDA(index_ida); static DEFINE_MUTEX(ida_lock); static DEFINE_MUTEX(sess_lock); static LIST_HEAD(sess_list); /* * Maximum number of partitions an instance can have. * 6 bits = 64 minors = 63 partitions (one minor is used for the device itself) */ #define RNBD_PART_BITS 6 static inline bool rnbd_clt_get_sess(struct rnbd_clt_session *sess) { return refcount_inc_not_zero(&sess->refcount); } static void free_sess(struct rnbd_clt_session *sess); static void rnbd_clt_put_sess(struct rnbd_clt_session *sess) { might_sleep(); if (refcount_dec_and_test(&sess->refcount)) free_sess(sess); } static void rnbd_clt_put_dev(struct rnbd_clt_dev *dev) { might_sleep(); if (!refcount_dec_and_test(&dev->refcount)) return; mutex_lock(&ida_lock); ida_simple_remove(&index_ida, dev->clt_device_id); mutex_unlock(&ida_lock); kfree(dev->hw_queues); kfree(dev->pathname); rnbd_clt_put_sess(dev->sess); mutex_destroy(&dev->lock); kfree(dev); } static inline bool rnbd_clt_get_dev(struct rnbd_clt_dev *dev) { return refcount_inc_not_zero(&dev->refcount); } static int rnbd_clt_set_dev_attr(struct rnbd_clt_dev *dev, const struct rnbd_msg_open_rsp *rsp) { struct rnbd_clt_session *sess = dev->sess; if (!rsp->logical_block_size) return -EINVAL; dev->device_id = le32_to_cpu(rsp->device_id); dev->nsectors = le64_to_cpu(rsp->nsectors); dev->logical_block_size = le16_to_cpu(rsp->logical_block_size); dev->physical_block_size = le16_to_cpu(rsp->physical_block_size); dev->max_write_same_sectors = le32_to_cpu(rsp->max_write_same_sectors); dev->max_discard_sectors = le32_to_cpu(rsp->max_discard_sectors); dev->discard_granularity = le32_to_cpu(rsp->discard_granularity); dev->discard_alignment = le32_to_cpu(rsp->discard_alignment); dev->secure_discard = le16_to_cpu(rsp->secure_discard); dev->rotational = rsp->rotational; dev->wc = !!(rsp->cache_policy & RNBD_WRITEBACK); dev->fua = !!(rsp->cache_policy & RNBD_FUA); dev->max_hw_sectors = sess->max_io_size / SECTOR_SIZE; dev->max_segments = sess->max_segments; return 0; } static int rnbd_clt_change_capacity(struct rnbd_clt_dev *dev, size_t new_nsectors) { rnbd_clt_info(dev, "Device size changed from %zu to %zu sectors\n", dev->nsectors, new_nsectors); dev->nsectors = new_nsectors; set_capacity_and_notify(dev->gd, dev->nsectors); return 0; } static int process_msg_open_rsp(struct rnbd_clt_dev *dev, struct rnbd_msg_open_rsp *rsp) { struct kobject *gd_kobj; int err = 0; mutex_lock(&dev->lock); if (dev->dev_state == DEV_STATE_UNMAPPED) { rnbd_clt_info(dev, "Ignoring Open-Response message from server for unmapped device\n"); err = -ENOENT; goto out; } if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) { u64 nsectors = le64_to_cpu(rsp->nsectors); /* * If the device was remapped and the size changed in the * meantime we need to revalidate it */ if (dev->nsectors != nsectors) rnbd_clt_change_capacity(dev, nsectors); gd_kobj = &disk_to_dev(dev->gd)->kobj; kobject_uevent(gd_kobj, KOBJ_ONLINE); rnbd_clt_info(dev, "Device online, device remapped successfully\n"); } err = rnbd_clt_set_dev_attr(dev, rsp); if (err) goto out; dev->dev_state = DEV_STATE_MAPPED; out: mutex_unlock(&dev->lock); return err; } int rnbd_clt_resize_disk(struct rnbd_clt_dev *dev, size_t newsize) { int ret = 0; mutex_lock(&dev->lock); if (dev->dev_state != DEV_STATE_MAPPED) { pr_err("Failed to set new size of the device, device is not opened\n"); ret = -ENOENT; goto out; } ret = rnbd_clt_change_capacity(dev, newsize); out: mutex_unlock(&dev->lock); return ret; } static inline void rnbd_clt_dev_requeue(struct rnbd_queue *q) { if (WARN_ON(!q->hctx)) return; /* We can come here from interrupt, thus async=true */ blk_mq_run_hw_queue(q->hctx, true); } enum { RNBD_DELAY_IFBUSY = -1, }; /** * rnbd_get_cpu_qlist() - finds a list with HW queues to be rerun * @sess: Session to find a queue for * @cpu: Cpu to start the search from * * Description: * Each CPU has a list of HW queues, which needs to be rerun. If a list * is not empty - it is marked with a bit. This function finds first * set bit in a bitmap and returns corresponding CPU list. */ static struct rnbd_cpu_qlist * rnbd_get_cpu_qlist(struct rnbd_clt_session *sess, int cpu) { int bit; /* Search from cpu to nr_cpu_ids */ bit = find_next_bit(sess->cpu_queues_bm, nr_cpu_ids, cpu); if (bit < nr_cpu_ids) { return per_cpu_ptr(sess->cpu_queues, bit); } else if (cpu != 0) { /* Search from 0 to cpu */ bit = find_next_bit(sess->cpu_queues_bm, cpu, 0); if (bit < cpu) return per_cpu_ptr(sess->cpu_queues, bit); } return NULL; } static inline int nxt_cpu(int cpu) { return (cpu + 1) % nr_cpu_ids; } /** * rnbd_rerun_if_needed() - rerun next queue marked as stopped * @sess: Session to rerun a queue on * * Description: * Each CPU has it's own list of HW queues, which should be rerun. * Function finds such list with HW queues, takes a list lock, picks up * the first HW queue out of the list and requeues it. * * Return: * True if the queue was requeued, false otherwise. * * Context: * Does not matter. */ static bool rnbd_rerun_if_needed(struct rnbd_clt_session *sess) { struct rnbd_queue *q = NULL; struct rnbd_cpu_qlist *cpu_q; unsigned long flags; int *cpup; /* * To keep fairness and not to let other queues starve we always * try to wake up someone else in round-robin manner. That of course * increases latency but queues always have a chance to be executed. */ cpup = get_cpu_ptr(sess->cpu_rr); for (cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(*cpup)); cpu_q; cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(cpu_q->cpu))) { if (!spin_trylock_irqsave(&cpu_q->requeue_lock, flags)) continue; if (!test_bit(cpu_q->cpu, sess->cpu_queues_bm)) goto unlock; q = list_first_entry_or_null(&cpu_q->requeue_list, typeof(*q), requeue_list); if (WARN_ON(!q)) goto clear_bit; list_del_init(&q->requeue_list); clear_bit_unlock(0, &q->in_list); if (list_empty(&cpu_q->requeue_list)) { /* Clear bit if nothing is left */ clear_bit: clear_bit(cpu_q->cpu, sess->cpu_queues_bm); } unlock: spin_unlock_irqrestore(&cpu_q->requeue_lock, flags); if (q) break; } /** * Saves the CPU that is going to be requeued on the per-cpu var. Just * incrementing it doesn't work because rnbd_get_cpu_qlist() will * always return the first CPU with something on the queue list when the * value stored on the var is greater than the last CPU with something * on the list. */ if (cpu_q) *cpup = cpu_q->cpu; put_cpu_ptr(sess->cpu_rr); if (q) rnbd_clt_dev_requeue(q); return q; } /** * rnbd_rerun_all_if_idle() - rerun all queues left in the list if * session is idling (there are no requests * in-flight). * @sess: Session to rerun the queues on * * Description: * This function tries to rerun all stopped queues if there are no * requests in-flight anymore. This function tries to solve an obvious * problem, when number of tags < than number of queues (hctx), which * are stopped and put to sleep. If last permit, which has been just put, * does not wake up all left queues (hctxs), IO requests hang forever. * * That can happen when all number of permits, say N, have been exhausted * from one CPU, and we have many block devices per session, say M. * Each block device has it's own queue (hctx) for each CPU, so eventually * we can put that number of queues (hctxs) to sleep: M x nr_cpu_ids. * If number of permits N < M x nr_cpu_ids finally we will get an IO hang. * * To avoid this hang last caller of rnbd_put_permit() (last caller is the * one who observes sess->busy == 0) must wake up all remaining queues. * * Context: * Does not matter. */ static void rnbd_rerun_all_if_idle(struct rnbd_clt_session *sess) { bool requeued; do { requeued = rnbd_rerun_if_needed(sess); } while (atomic_read(&sess->busy) == 0 && requeued); } static struct rtrs_permit *rnbd_get_permit(struct rnbd_clt_session *sess, enum rtrs_clt_con_type con_type, enum wait_type wait) { struct rtrs_permit *permit; permit = rtrs_clt_get_permit(sess->rtrs, con_type, wait); if (permit) /* We have a subtle rare case here, when all permits can be * consumed before busy counter increased. This is safe, * because loser will get NULL as a permit, observe 0 busy * counter and immediately restart the queue himself. */ atomic_inc(&sess->busy); return permit; } static void rnbd_put_permit(struct rnbd_clt_session *sess, struct rtrs_permit *permit) { rtrs_clt_put_permit(sess->rtrs, permit); atomic_dec(&sess->busy); /* Paired with rnbd_clt_dev_add_to_requeue(). Decrement first * and then check queue bits. */ smp_mb__after_atomic(); rnbd_rerun_all_if_idle(sess); } static struct rnbd_iu *rnbd_get_iu(struct rnbd_clt_session *sess, enum rtrs_clt_con_type con_type, enum wait_type wait) { struct rnbd_iu *iu; struct rtrs_permit *permit; iu = kzalloc(sizeof(*iu), GFP_KERNEL); if (!iu) return NULL; permit = rnbd_get_permit(sess, con_type, wait); if (!permit) { kfree(iu); return NULL; } iu->permit = permit; /* * 1st reference is dropped after finishing sending a "user" message, * 2nd reference is dropped after confirmation with the response is * returned. * 1st and 2nd can happen in any order, so the rnbd_iu should be * released (rtrs_permit returned to rtrs) only after both * are finished. */ atomic_set(&iu->refcount, 2); init_waitqueue_head(&iu->comp.wait); iu->comp.errno = INT_MAX; if (sg_alloc_table(&iu->sgt, 1, GFP_KERNEL)) { rnbd_put_permit(sess, permit); kfree(iu); return NULL; } return iu; } static void rnbd_put_iu(struct rnbd_clt_session *sess, struct rnbd_iu *iu) { if (atomic_dec_and_test(&iu->refcount)) { sg_free_table(&iu->sgt); rnbd_put_permit(sess, iu->permit); kfree(iu); } } static void rnbd_softirq_done_fn(struct request *rq) { struct rnbd_clt_dev *dev = rq->rq_disk->private_data; struct rnbd_clt_session *sess = dev->sess; struct rnbd_iu *iu; iu = blk_mq_rq_to_pdu(rq); sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT); rnbd_put_permit(sess, iu->permit); blk_mq_end_request(rq, errno_to_blk_status(iu->errno)); } static void msg_io_conf(void *priv, int errno) { struct rnbd_iu *iu = priv; struct rnbd_clt_dev *dev = iu->dev; struct request *rq = iu->rq; int rw = rq_data_dir(rq); iu->errno = errno; blk_mq_complete_request(rq); if (errno) rnbd_clt_info_rl(dev, "%s I/O failed with err: %d\n", rw == READ ? "read" : "write", errno); } static void wake_up_iu_comp(struct rnbd_iu *iu, int errno) { iu->comp.errno = errno; wake_up(&iu->comp.wait); } static void msg_conf(void *priv, int errno) { struct rnbd_iu *iu = priv; iu->errno = errno; schedule_work(&iu->work); } static int send_usr_msg(struct rtrs_clt *rtrs, int dir, struct rnbd_iu *iu, struct kvec *vec, size_t len, struct scatterlist *sg, unsigned int sg_len, void (*conf)(struct work_struct *work), int *errno, int wait) { int err; struct rtrs_clt_req_ops req_ops; INIT_WORK(&iu->work, conf); req_ops = (struct rtrs_clt_req_ops) { .priv = iu, .conf_fn = msg_conf, }; err = rtrs_clt_request(dir, &req_ops, rtrs, iu->permit, vec, 1, len, sg, sg_len); if (!err && wait) { wait_event(iu->comp.wait, iu->comp.errno != INT_MAX); *errno = iu->comp.errno; } else { *errno = 0; } return err; } static void msg_close_conf(struct work_struct *work) { struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work); struct rnbd_clt_dev *dev = iu->dev; wake_up_iu_comp(iu, iu->errno); rnbd_put_iu(dev->sess, iu); rnbd_clt_put_dev(dev); } static int send_msg_close(struct rnbd_clt_dev *dev, u32 device_id, enum wait_type wait) { struct rnbd_clt_session *sess = dev->sess; struct rnbd_msg_close msg; struct rnbd_iu *iu; struct kvec vec = { .iov_base = &msg, .iov_len = sizeof(msg) }; int err, errno; iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT); if (!iu) return -ENOMEM; iu->buf = NULL; iu->dev = dev; msg.hdr.type = cpu_to_le16(RNBD_MSG_CLOSE); msg.device_id = cpu_to_le32(device_id); WARN_ON(!rnbd_clt_get_dev(dev)); err = send_usr_msg(sess->rtrs, WRITE, iu, &vec, 0, NULL, 0, msg_close_conf, &errno, wait); if (err) { rnbd_clt_put_dev(dev); rnbd_put_iu(sess, iu); } else { err = errno; } rnbd_put_iu(sess, iu); return err; } static void msg_open_conf(struct work_struct *work) { struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work); struct rnbd_msg_open_rsp *rsp = iu->buf; struct rnbd_clt_dev *dev = iu->dev; int errno = iu->errno; if (errno) { rnbd_clt_err(dev, "Opening failed, server responded: %d\n", errno); } else { errno = process_msg_open_rsp(dev, rsp); if (errno) { u32 device_id = le32_to_cpu(rsp->device_id); /* * If server thinks its fine, but we fail to process * then be nice and send a close to server. */ send_msg_close(dev, device_id, RTRS_PERMIT_NOWAIT); } } kfree(rsp); wake_up_iu_comp(iu, errno); rnbd_put_iu(dev->sess, iu); rnbd_clt_put_dev(dev); } static void msg_sess_info_conf(struct work_struct *work) { struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work); struct rnbd_msg_sess_info_rsp *rsp = iu->buf; struct rnbd_clt_session *sess = iu->sess; if (!iu->errno) sess->ver = min_t(u8, rsp->ver, RNBD_PROTO_VER_MAJOR); kfree(rsp); wake_up_iu_comp(iu, iu->errno); rnbd_put_iu(sess, iu); rnbd_clt_put_sess(sess); } static int send_msg_open(struct rnbd_clt_dev *dev, enum wait_type wait) { struct rnbd_clt_session *sess = dev->sess; struct rnbd_msg_open_rsp *rsp; struct rnbd_msg_open msg; struct rnbd_iu *iu; struct kvec vec = { .iov_base = &msg, .iov_len = sizeof(msg) }; int err, errno; rsp = kzalloc(sizeof(*rsp), GFP_KERNEL); if (!rsp) return -ENOMEM; iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT); if (!iu) { kfree(rsp); return -ENOMEM; } iu->buf = rsp; iu->dev = dev; sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp)); msg.hdr.type = cpu_to_le16(RNBD_MSG_OPEN); msg.access_mode = dev->access_mode; strscpy(msg.dev_name, dev->pathname, sizeof(msg.dev_name)); WARN_ON(!rnbd_clt_get_dev(dev)); err = send_usr_msg(sess->rtrs, READ, iu, &vec, sizeof(*rsp), iu->sgt.sgl, 1, msg_open_conf, &errno, wait); if (err) { rnbd_clt_put_dev(dev); rnbd_put_iu(sess, iu); kfree(rsp); } else { err = errno; } rnbd_put_iu(sess, iu); return err; } static int send_msg_sess_info(struct rnbd_clt_session *sess, enum wait_type wait) { struct rnbd_msg_sess_info_rsp *rsp; struct rnbd_msg_sess_info msg; struct rnbd_iu *iu; struct kvec vec = { .iov_base = &msg, .iov_len = sizeof(msg) }; int err, errno; rsp = kzalloc(sizeof(*rsp), GFP_KERNEL); if (!rsp) return -ENOMEM; iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT); if (!iu) { kfree(rsp); return -ENOMEM; } iu->buf = rsp; iu->sess = sess; sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp)); msg.hdr.type = cpu_to_le16(RNBD_MSG_SESS_INFO); msg.ver = RNBD_PROTO_VER_MAJOR; if (!rnbd_clt_get_sess(sess)) { /* * That can happen only in one case, when RTRS has restablished * the connection and link_ev() is called, but session is almost * dead, last reference on session is put and caller is waiting * for RTRS to close everything. */ err = -ENODEV; goto put_iu; } err = send_usr_msg(sess->rtrs, READ, iu, &vec, sizeof(*rsp), iu->sgt.sgl, 1, msg_sess_info_conf, &errno, wait); if (err) { rnbd_clt_put_sess(sess); put_iu: rnbd_put_iu(sess, iu); kfree(rsp); } else { err = errno; } rnbd_put_iu(sess, iu); return err; } static void set_dev_states_to_disconnected(struct rnbd_clt_session *sess) { struct rnbd_clt_dev *dev; struct kobject *gd_kobj; mutex_lock(&sess->lock); list_for_each_entry(dev, &sess->devs_list, list) { rnbd_clt_err(dev, "Device disconnected.\n"); mutex_lock(&dev->lock); if (dev->dev_state == DEV_STATE_MAPPED) { dev->dev_state = DEV_STATE_MAPPED_DISCONNECTED; gd_kobj = &disk_to_dev(dev->gd)->kobj; kobject_uevent(gd_kobj, KOBJ_OFFLINE); } mutex_unlock(&dev->lock); } mutex_unlock(&sess->lock); } static void remap_devs(struct rnbd_clt_session *sess) { struct rnbd_clt_dev *dev; struct rtrs_attrs attrs; int err; /* * Careful here: we are called from RTRS link event directly, * thus we can't send any RTRS request and wait for response * or RTRS will not be able to complete request with failure * if something goes wrong (failing of outstanding requests * happens exactly from the context where we are blocking now). * * So to avoid deadlocks each usr message sent from here must * be asynchronous. */ err = send_msg_sess_info(sess, RTRS_PERMIT_NOWAIT); if (err) { pr_err("send_msg_sess_info(\"%s\"): %d\n", sess->sessname, err); return; } err = rtrs_clt_query(sess->rtrs, &attrs); if (err) { pr_err("rtrs_clt_query(\"%s\"): %d\n", sess->sessname, err); return; } mutex_lock(&sess->lock); sess->max_io_size = attrs.max_io_size; list_for_each_entry(dev, &sess->devs_list, list) { bool skip; mutex_lock(&dev->lock); skip = (dev->dev_state == DEV_STATE_INIT); mutex_unlock(&dev->lock); if (skip) /* * When device is establishing connection for the first * time - do not remap, it will be closed soon. */ continue; rnbd_clt_info(dev, "session reconnected, remapping device\n"); err = send_msg_open(dev, RTRS_PERMIT_NOWAIT); if (err) { rnbd_clt_err(dev, "send_msg_open(): %d\n", err); break; } } mutex_unlock(&sess->lock); } static void rnbd_clt_link_ev(void *priv, enum rtrs_clt_link_ev ev) { struct rnbd_clt_session *sess = priv; switch (ev) { case RTRS_CLT_LINK_EV_DISCONNECTED: set_dev_states_to_disconnected(sess); break; case RTRS_CLT_LINK_EV_RECONNECTED: remap_devs(sess); break; default: pr_err("Unknown session event received (%d), session: %s\n", ev, sess->sessname); } } static void rnbd_init_cpu_qlists(struct rnbd_cpu_qlist __percpu *cpu_queues) { unsigned int cpu; struct rnbd_cpu_qlist *cpu_q; for_each_possible_cpu(cpu) { cpu_q = per_cpu_ptr(cpu_queues, cpu); cpu_q->cpu = cpu; INIT_LIST_HEAD(&cpu_q->requeue_list); spin_lock_init(&cpu_q->requeue_lock); } } static void destroy_mq_tags(struct rnbd_clt_session *sess) { if (sess->tag_set.tags) blk_mq_free_tag_set(&sess->tag_set); } static inline void wake_up_rtrs_waiters(struct rnbd_clt_session *sess) { sess->rtrs_ready = true; wake_up_all(&sess->rtrs_waitq); } static void close_rtrs(struct rnbd_clt_session *sess) { might_sleep(); if (!IS_ERR_OR_NULL(sess->rtrs)) { rtrs_clt_close(sess->rtrs); sess->rtrs = NULL; wake_up_rtrs_waiters(sess); } } static void free_sess(struct rnbd_clt_session *sess) { WARN_ON(!list_empty(&sess->devs_list)); might_sleep(); close_rtrs(sess); destroy_mq_tags(sess); if (!list_empty(&sess->list)) { mutex_lock(&sess_lock); list_del(&sess->list); mutex_unlock(&sess_lock); } free_percpu(sess->cpu_queues); free_percpu(sess->cpu_rr); mutex_destroy(&sess->lock); kfree(sess); } static struct rnbd_clt_session *alloc_sess(const char *sessname) { struct rnbd_clt_session *sess; int err, cpu; sess = kzalloc_node(sizeof(*sess), GFP_KERNEL, NUMA_NO_NODE); if (!sess) return ERR_PTR(-ENOMEM); strscpy(sess->sessname, sessname, sizeof(sess->sessname)); atomic_set(&sess->busy, 0); mutex_init(&sess->lock); INIT_LIST_HEAD(&sess->devs_list); INIT_LIST_HEAD(&sess->list); bitmap_zero(sess->cpu_queues_bm, num_possible_cpus()); init_waitqueue_head(&sess->rtrs_waitq); refcount_set(&sess->refcount, 1); sess->cpu_queues = alloc_percpu(struct rnbd_cpu_qlist); if (!sess->cpu_queues) { err = -ENOMEM; goto err; } rnbd_init_cpu_qlists(sess->cpu_queues); /* * That is simple percpu variable which stores cpu indices, which are * incremented on each access. We need that for the sake of fairness * to wake up queues in a round-robin manner. */ sess->cpu_rr = alloc_percpu(int); if (!sess->cpu_rr) { err = -ENOMEM; goto err; } for_each_possible_cpu(cpu) * per_cpu_ptr(sess->cpu_rr, cpu) = cpu; return sess; err: free_sess(sess); return ERR_PTR(err); } static int wait_for_rtrs_connection(struct rnbd_clt_session *sess) { wait_event(sess->rtrs_waitq, sess->rtrs_ready); if (IS_ERR_OR_NULL(sess->rtrs)) return -ECONNRESET; return 0; } static void wait_for_rtrs_disconnection(struct rnbd_clt_session *sess) __releases(&sess_lock) __acquires(&sess_lock) { DEFINE_WAIT(wait); prepare_to_wait(&sess->rtrs_waitq, &wait, TASK_UNINTERRUPTIBLE); if (IS_ERR_OR_NULL(sess->rtrs)) { finish_wait(&sess->rtrs_waitq, &wait); return; } mutex_unlock(&sess_lock); /* loop in caller, see __find_and_get_sess(). * You can't leave mutex locked and call schedule(), you will catch a * deadlock with a caller of free_sess(), which has just put the last * reference and is about to take the sess_lock in order to delete * the session from the list. */ schedule(); mutex_lock(&sess_lock); } static struct rnbd_clt_session *__find_and_get_sess(const char *sessname) __releases(&sess_lock) __acquires(&sess_lock) { struct rnbd_clt_session *sess, *sn; int err; again: list_for_each_entry_safe(sess, sn, &sess_list, list) { if (strcmp(sessname, sess->sessname)) continue; if (sess->rtrs_ready && IS_ERR_OR_NULL(sess->rtrs)) /* * No RTRS connection, session is dying. */ continue; if (rnbd_clt_get_sess(sess)) { /* * Alive session is found, wait for RTRS connection. */ mutex_unlock(&sess_lock); err = wait_for_rtrs_connection(sess); if (err) rnbd_clt_put_sess(sess); mutex_lock(&sess_lock); if (err) /* Session is dying, repeat the loop */ goto again; return sess; } /* * Ref is 0, session is dying, wait for RTRS disconnect * in order to avoid session names clashes. */ wait_for_rtrs_disconnection(sess); /* * RTRS is disconnected and soon session will be freed, * so repeat a loop. */ goto again; } return NULL; } /* caller is responsible for initializing 'first' to false */ static struct rnbd_clt_session *find_or_create_sess(const char *sessname, bool *first) { struct rnbd_clt_session *sess = NULL; mutex_lock(&sess_lock); sess = __find_and_get_sess(sessname); if (!sess) { sess = alloc_sess(sessname); if (IS_ERR(sess)) { mutex_unlock(&sess_lock); return sess; } list_add(&sess->list, &sess_list); *first = true; } mutex_unlock(&sess_lock); return sess; } static int rnbd_client_open(struct block_device *block_device, fmode_t mode) { struct rnbd_clt_dev *dev = block_device->bd_disk->private_data; if (dev->read_only && (mode & FMODE_WRITE)) return -EPERM; if (dev->dev_state == DEV_STATE_UNMAPPED || !rnbd_clt_get_dev(dev)) return -EIO; return 0; } static void rnbd_client_release(struct gendisk *gen, fmode_t mode) { struct rnbd_clt_dev *dev = gen->private_data; rnbd_clt_put_dev(dev); } static int rnbd_client_getgeo(struct block_device *block_device, struct hd_geometry *geo) { u64 size; struct rnbd_clt_dev *dev; dev = block_device->bd_disk->private_data; size = dev->size * (dev->logical_block_size / SECTOR_SIZE); geo->cylinders = size >> 6; /* size/64 */ geo->heads = 4; geo->sectors = 16; geo->start = 0; return 0; } static const struct block_device_operations rnbd_client_ops = { .owner = THIS_MODULE, .open = rnbd_client_open, .release = rnbd_client_release, .getgeo = rnbd_client_getgeo }; /* The amount of data that belongs to an I/O and the amount of data that * should be read or written to the disk (bi_size) can differ. * * E.g. When WRITE_SAME is used, only a small amount of data is * transferred that is then written repeatedly over a lot of sectors. * * Get the size of data to be transferred via RTRS by summing up the size * of the scather-gather list entries. */ static size_t rnbd_clt_get_sg_size(struct scatterlist *sglist, u32 len) { struct scatterlist *sg; size_t tsize = 0; int i; for_each_sg(sglist, sg, len, i) tsize += sg->length; return tsize; } static int rnbd_client_xfer_request(struct rnbd_clt_dev *dev, struct request *rq, struct rnbd_iu *iu) { struct rtrs_clt *rtrs = dev->sess->rtrs; struct rtrs_permit *permit = iu->permit; struct rnbd_msg_io msg; struct rtrs_clt_req_ops req_ops; unsigned int sg_cnt = 0; struct kvec vec; size_t size; int err; iu->rq = rq; iu->dev = dev; msg.sector = cpu_to_le64(blk_rq_pos(rq)); msg.bi_size = cpu_to_le32(blk_rq_bytes(rq)); msg.rw = cpu_to_le32(rq_to_rnbd_flags(rq)); msg.prio = cpu_to_le16(req_get_ioprio(rq)); /* * We only support discards with single segment for now. * See queue limits. */ if (req_op(rq) != REQ_OP_DISCARD) sg_cnt = blk_rq_map_sg(dev->queue, rq, iu->sgt.sgl); if (sg_cnt == 0) sg_mark_end(&iu->sgt.sgl[0]); msg.hdr.type = cpu_to_le16(RNBD_MSG_IO); msg.device_id = cpu_to_le32(dev->device_id); vec = (struct kvec) { .iov_base = &msg, .iov_len = sizeof(msg) }; size = rnbd_clt_get_sg_size(iu->sgt.sgl, sg_cnt); req_ops = (struct rtrs_clt_req_ops) { .priv = iu, .conf_fn = msg_io_conf, }; err = rtrs_clt_request(rq_data_dir(rq), &req_ops, rtrs, permit, &vec, 1, size, iu->sgt.sgl, sg_cnt); if (err) { rnbd_clt_err_rl(dev, "RTRS failed to transfer IO, err: %d\n", err); return err; } return 0; } /** * rnbd_clt_dev_add_to_requeue() - add device to requeue if session is busy * @dev: Device to be checked * @q: Queue to be added to the requeue list if required * * Description: * If session is busy, that means someone will requeue us when resources * are freed. If session is not doing anything - device is not added to * the list and @false is returned. */ static bool rnbd_clt_dev_add_to_requeue(struct rnbd_clt_dev *dev, struct rnbd_queue *q) { struct rnbd_clt_session *sess = dev->sess; struct rnbd_cpu_qlist *cpu_q; unsigned long flags; bool added = true; bool need_set; cpu_q = get_cpu_ptr(sess->cpu_queues); spin_lock_irqsave(&cpu_q->requeue_lock, flags); if (!test_and_set_bit_lock(0, &q->in_list)) { if (WARN_ON(!list_empty(&q->requeue_list))) goto unlock; need_set = !test_bit(cpu_q->cpu, sess->cpu_queues_bm); if (need_set) { set_bit(cpu_q->cpu, sess->cpu_queues_bm); /* Paired with rnbd_put_permit(). Set a bit first * and then observe the busy counter. */ smp_mb__before_atomic(); } if (atomic_read(&sess->busy)) { list_add_tail(&q->requeue_list, &cpu_q->requeue_list); } else { /* Very unlikely, but possible: busy counter was * observed as zero. Drop all bits and return * false to restart the queue by ourselves. */ if (need_set) clear_bit(cpu_q->cpu, sess->cpu_queues_bm); clear_bit_unlock(0, &q->in_list); added = false; } } unlock: spin_unlock_irqrestore(&cpu_q->requeue_lock, flags); put_cpu_ptr(sess->cpu_queues); return added; } static void rnbd_clt_dev_kick_mq_queue(struct rnbd_clt_dev *dev, struct blk_mq_hw_ctx *hctx, int delay) { struct rnbd_queue *q = hctx->driver_data; if (delay != RNBD_DELAY_IFBUSY) blk_mq_delay_run_hw_queue(hctx, delay); else if (!rnbd_clt_dev_add_to_requeue(dev, q)) /* * If session is not busy we have to restart * the queue ourselves. */ blk_mq_delay_run_hw_queue(hctx, 10/*ms*/); } static blk_status_t rnbd_queue_rq(struct blk_mq_hw_ctx *hctx, const struct blk_mq_queue_data *bd) { struct request *rq = bd->rq; struct rnbd_clt_dev *dev = rq->rq_disk->private_data; struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq); int err; blk_status_t ret = BLK_STS_IOERR; if (dev->dev_state != DEV_STATE_MAPPED) return BLK_STS_IOERR; iu->permit = rnbd_get_permit(dev->sess, RTRS_IO_CON, RTRS_PERMIT_NOWAIT); if (!iu->permit) { rnbd_clt_dev_kick_mq_queue(dev, hctx, RNBD_DELAY_IFBUSY); return BLK_STS_RESOURCE; } iu->sgt.sgl = iu->first_sgl; err = sg_alloc_table_chained(&iu->sgt, /* Even-if the request has no segment, * sglist must have one entry at least. */ blk_rq_nr_phys_segments(rq) ? : 1, iu->sgt.sgl, RNBD_INLINE_SG_CNT); if (err) { rnbd_clt_err_rl(dev, "sg_alloc_table_chained ret=%d\n", err); rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/); rnbd_put_permit(dev->sess, iu->permit); return BLK_STS_RESOURCE; } blk_mq_start_request(rq); err = rnbd_client_xfer_request(dev, rq, iu); if (err == 0) return BLK_STS_OK; if (err == -EAGAIN || err == -ENOMEM) { rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/); ret = BLK_STS_RESOURCE; } sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT); rnbd_put_permit(dev->sess, iu->permit); return ret; } static int rnbd_rdma_poll(struct blk_mq_hw_ctx *hctx, struct io_comp_batch *iob) { struct rnbd_queue *q = hctx->driver_data; struct rnbd_clt_dev *dev = q->dev; int cnt; cnt = rtrs_clt_rdma_cq_direct(dev->sess->rtrs, hctx->queue_num); return cnt; } static int rnbd_rdma_map_queues(struct blk_mq_tag_set *set) { struct rnbd_clt_session *sess = set->driver_data; /* shared read/write queues */ set->map[HCTX_TYPE_DEFAULT].nr_queues = num_online_cpus(); set->map[HCTX_TYPE_DEFAULT].queue_offset = 0; set->map[HCTX_TYPE_READ].nr_queues = num_online_cpus(); set->map[HCTX_TYPE_READ].queue_offset = 0; blk_mq_map_queues(&set->map[HCTX_TYPE_DEFAULT]); blk_mq_map_queues(&set->map[HCTX_TYPE_READ]); if (sess->nr_poll_queues) { /* dedicated queue for poll */ set->map[HCTX_TYPE_POLL].nr_queues = sess->nr_poll_queues; set->map[HCTX_TYPE_POLL].queue_offset = set->map[HCTX_TYPE_READ].queue_offset + set->map[HCTX_TYPE_READ].nr_queues; blk_mq_map_queues(&set->map[HCTX_TYPE_POLL]); pr_info("[session=%s] mapped %d/%d/%d default/read/poll queues.\n", sess->sessname, set->map[HCTX_TYPE_DEFAULT].nr_queues, set->map[HCTX_TYPE_READ].nr_queues, set->map[HCTX_TYPE_POLL].nr_queues); } else { pr_info("[session=%s] mapped %d/%d default/read queues.\n", sess->sessname, set->map[HCTX_TYPE_DEFAULT].nr_queues, set->map[HCTX_TYPE_READ].nr_queues); } return 0; } static struct blk_mq_ops rnbd_mq_ops = { .queue_rq = rnbd_queue_rq, .complete = rnbd_softirq_done_fn, .map_queues = rnbd_rdma_map_queues, .poll = rnbd_rdma_poll, }; static int setup_mq_tags(struct rnbd_clt_session *sess) { struct blk_mq_tag_set *tag_set = &sess->tag_set; memset(tag_set, 0, sizeof(*tag_set)); tag_set->ops = &rnbd_mq_ops; tag_set->queue_depth = sess->queue_depth; tag_set->numa_node = NUMA_NO_NODE; tag_set->flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_TAG_QUEUE_SHARED; tag_set->cmd_size = sizeof(struct rnbd_iu) + RNBD_RDMA_SGL_SIZE; /* for HCTX_TYPE_DEFAULT, HCTX_TYPE_READ, HCTX_TYPE_POLL */ tag_set->nr_maps = sess->nr_poll_queues ? HCTX_MAX_TYPES : 2; /* * HCTX_TYPE_DEFAULT and HCTX_TYPE_READ share one set of queues * others are for HCTX_TYPE_POLL */ tag_set->nr_hw_queues = num_online_cpus() + sess->nr_poll_queues; tag_set->driver_data = sess; return blk_mq_alloc_tag_set(tag_set); } static struct rnbd_clt_session * find_and_get_or_create_sess(const char *sessname, const struct rtrs_addr *paths, size_t path_cnt, u16 port_nr, u32 nr_poll_queues) { struct rnbd_clt_session *sess; struct rtrs_attrs attrs; int err; bool first = false; struct rtrs_clt_ops rtrs_ops; sess = find_or_create_sess(sessname, &first); if (sess == ERR_PTR(-ENOMEM)) return ERR_PTR(-ENOMEM); else if ((nr_poll_queues && !first) || (!nr_poll_queues && sess->nr_poll_queues)) { /* * A device MUST have its own session to use the polling-mode. * It must fail to map new device with the same session. */ err = -EINVAL; goto put_sess; } if (!first) return sess; if (!path_cnt) { pr_err("Session %s not found, and path parameter not given", sessname); err = -ENXIO; goto put_sess; } rtrs_ops = (struct rtrs_clt_ops) { .priv = sess, .link_ev = rnbd_clt_link_ev, }; /* * Nothing was found, establish rtrs connection and proceed further. */ sess->rtrs = rtrs_clt_open(&rtrs_ops, sessname, paths, path_cnt, port_nr, 0, /* Do not use pdu of rtrs */ RECONNECT_DELAY, MAX_RECONNECTS, nr_poll_queues); if (IS_ERR(sess->rtrs)) { err = PTR_ERR(sess->rtrs); goto wake_up_and_put; } err = rtrs_clt_query(sess->rtrs, &attrs); if (err) goto close_rtrs; sess->max_io_size = attrs.max_io_size; sess->queue_depth = attrs.queue_depth; sess->nr_poll_queues = nr_poll_queues; sess->max_segments = attrs.max_segments; err = setup_mq_tags(sess); if (err) goto close_rtrs; err = send_msg_sess_info(sess, RTRS_PERMIT_WAIT); if (err) goto close_rtrs; wake_up_rtrs_waiters(sess); return sess; close_rtrs: close_rtrs(sess); put_sess: rnbd_clt_put_sess(sess); return ERR_PTR(err); wake_up_and_put: wake_up_rtrs_waiters(sess); goto put_sess; } static inline void rnbd_init_hw_queue(struct rnbd_clt_dev *dev, struct rnbd_queue *q, struct blk_mq_hw_ctx *hctx) { INIT_LIST_HEAD(&q->requeue_list); q->dev = dev; q->hctx = hctx; } static void rnbd_init_mq_hw_queues(struct rnbd_clt_dev *dev) { int i; struct blk_mq_hw_ctx *hctx; struct rnbd_queue *q; queue_for_each_hw_ctx(dev->queue, hctx, i) { q = &dev->hw_queues[i]; rnbd_init_hw_queue(dev, q, hctx); hctx->driver_data = q; } } static void setup_request_queue(struct rnbd_clt_dev *dev) { blk_queue_logical_block_size(dev->queue, dev->logical_block_size); blk_queue_physical_block_size(dev->queue, dev->physical_block_size); blk_queue_max_hw_sectors(dev->queue, dev->max_hw_sectors); blk_queue_max_write_same_sectors(dev->queue, dev->max_write_same_sectors); /* * we don't support discards to "discontiguous" segments * in on request */ blk_queue_max_discard_segments(dev->queue, 1); blk_queue_max_discard_sectors(dev->queue, dev->max_discard_sectors); dev->queue->limits.discard_granularity = dev->discard_granularity; dev->queue->limits.discard_alignment = dev->discard_alignment; if (dev->max_discard_sectors) blk_queue_flag_set(QUEUE_FLAG_DISCARD, dev->queue); if (dev->secure_discard) blk_queue_flag_set(QUEUE_FLAG_SECERASE, dev->queue); blk_queue_flag_set(QUEUE_FLAG_SAME_COMP, dev->queue); blk_queue_flag_set(QUEUE_FLAG_SAME_FORCE, dev->queue); blk_queue_max_segments(dev->queue, dev->max_segments); blk_queue_io_opt(dev->queue, dev->sess->max_io_size); blk_queue_virt_boundary(dev->queue, SZ_4K - 1); blk_queue_write_cache(dev->queue, dev->wc, dev->fua); } static int rnbd_clt_setup_gen_disk(struct rnbd_clt_dev *dev, int idx) { int err; dev->gd->major = rnbd_client_major; dev->gd->first_minor = idx << RNBD_PART_BITS; dev->gd->minors = 1 << RNBD_PART_BITS; dev->gd->fops = &rnbd_client_ops; dev->gd->queue = dev->queue; dev->gd->private_data = dev; snprintf(dev->gd->disk_name, sizeof(dev->gd->disk_name), "rnbd%d", idx); pr_debug("disk_name=%s, capacity=%zu\n", dev->gd->disk_name, dev->nsectors * (dev->logical_block_size / SECTOR_SIZE) ); set_capacity(dev->gd, dev->nsectors); if (dev->access_mode == RNBD_ACCESS_RO) { dev->read_only = true; set_disk_ro(dev->gd, true); } else { dev->read_only = false; } if (!dev->rotational) blk_queue_flag_set(QUEUE_FLAG_NONROT, dev->queue); err = add_disk(dev->gd); if (err) blk_cleanup_disk(dev->gd); return err; } static int rnbd_client_setup_device(struct rnbd_clt_dev *dev) { int idx = dev->clt_device_id; dev->size = dev->nsectors * dev->logical_block_size; dev->gd = blk_mq_alloc_disk(&dev->sess->tag_set, dev); if (IS_ERR(dev->gd)) return PTR_ERR(dev->gd); dev->queue = dev->gd->queue; rnbd_init_mq_hw_queues(dev); setup_request_queue(dev); return rnbd_clt_setup_gen_disk(dev, idx); } static struct rnbd_clt_dev *init_dev(struct rnbd_clt_session *sess, enum rnbd_access_mode access_mode, const char *pathname, u32 nr_poll_queues) { struct rnbd_clt_dev *dev; int ret; dev = kzalloc_node(sizeof(*dev), GFP_KERNEL, NUMA_NO_NODE); if (!dev) return ERR_PTR(-ENOMEM); /* * nr_cpu_ids: the number of softirq queues * nr_poll_queues: the number of polling queues */ dev->hw_queues = kcalloc(nr_cpu_ids + nr_poll_queues, sizeof(*dev->hw_queues), GFP_KERNEL); if (!dev->hw_queues) { ret = -ENOMEM; goto out_alloc; } mutex_lock(&ida_lock); ret = ida_simple_get(&index_ida, 0, 1 << (MINORBITS - RNBD_PART_BITS), GFP_KERNEL); mutex_unlock(&ida_lock); if (ret < 0) { pr_err("Failed to initialize device '%s' from session %s, allocating idr failed, err: %d\n", pathname, sess->sessname, ret); goto out_queues; } dev->pathname = kstrdup(pathname, GFP_KERNEL); if (!dev->pathname) { ret = -ENOMEM; goto out_queues; } dev->clt_device_id = ret; dev->sess = sess; dev->access_mode = access_mode; dev->nr_poll_queues = nr_poll_queues; mutex_init(&dev->lock); refcount_set(&dev->refcount, 1); dev->dev_state = DEV_STATE_INIT; /* * Here we called from sysfs entry, thus clt-sysfs is * responsible that session will not disappear. */ WARN_ON(!rnbd_clt_get_sess(sess)); return dev; out_queues: kfree(dev->hw_queues); out_alloc: kfree(dev); return ERR_PTR(ret); } static bool __exists_dev(const char *pathname, const char *sessname) { struct rnbd_clt_session *sess; struct rnbd_clt_dev *dev; bool found = false; list_for_each_entry(sess, &sess_list, list) { if (sessname && strncmp(sess->sessname, sessname, sizeof(sess->sessname))) continue; mutex_lock(&sess->lock); list_for_each_entry(dev, &sess->devs_list, list) { if (strlen(dev->pathname) == strlen(pathname) && !strcmp(dev->pathname, pathname)) { found = true; break; } } mutex_unlock(&sess->lock); if (found) break; } return found; } static bool exists_devpath(const char *pathname, const char *sessname) { bool found; mutex_lock(&sess_lock); found = __exists_dev(pathname, sessname); mutex_unlock(&sess_lock); return found; } static bool insert_dev_if_not_exists_devpath(struct rnbd_clt_dev *dev) { bool found; struct rnbd_clt_session *sess = dev->sess; mutex_lock(&sess_lock); found = __exists_dev(dev->pathname, sess->sessname); if (!found) { mutex_lock(&sess->lock); list_add_tail(&dev->list, &sess->devs_list); mutex_unlock(&sess->lock); } mutex_unlock(&sess_lock); return found; } static void delete_dev(struct rnbd_clt_dev *dev) { struct rnbd_clt_session *sess = dev->sess; mutex_lock(&sess->lock); list_del(&dev->list); mutex_unlock(&sess->lock); } struct rnbd_clt_dev *rnbd_clt_map_device(const char *sessname, struct rtrs_addr *paths, size_t path_cnt, u16 port_nr, const char *pathname, enum rnbd_access_mode access_mode, u32 nr_poll_queues) { struct rnbd_clt_session *sess; struct rnbd_clt_dev *dev; int ret; if (exists_devpath(pathname, sessname)) return ERR_PTR(-EEXIST); sess = find_and_get_or_create_sess(sessname, paths, path_cnt, port_nr, nr_poll_queues); if (IS_ERR(sess)) return ERR_CAST(sess); dev = init_dev(sess, access_mode, pathname, nr_poll_queues); if (IS_ERR(dev)) { pr_err("map_device: failed to map device '%s' from session %s, can't initialize device, err: %ld\n", pathname, sess->sessname, PTR_ERR(dev)); ret = PTR_ERR(dev); goto put_sess; } if (insert_dev_if_not_exists_devpath(dev)) { ret = -EEXIST; goto put_dev; } ret = send_msg_open(dev, RTRS_PERMIT_WAIT); if (ret) { rnbd_clt_err(dev, "map_device: failed, can't open remote device, err: %d\n", ret); goto del_dev; } mutex_lock(&dev->lock); pr_debug("Opened remote device: session=%s, path='%s'\n", sess->sessname, pathname); ret = rnbd_client_setup_device(dev); if (ret) { rnbd_clt_err(dev, "map_device: Failed to configure device, err: %d\n", ret); mutex_unlock(&dev->lock); goto send_close; } rnbd_clt_info(dev, "map_device: Device mapped as %s (nsectors: %zu, logical_block_size: %d, physical_block_size: %d, max_write_same_sectors: %d, max_discard_sectors: %d, discard_granularity: %d, discard_alignment: %d, secure_discard: %d, max_segments: %d, max_hw_sectors: %d, rotational: %d, wc: %d, fua: %d)\n", dev->gd->disk_name, dev->nsectors, dev->logical_block_size, dev->physical_block_size, dev->max_write_same_sectors, dev->max_discard_sectors, dev->discard_granularity, dev->discard_alignment, dev->secure_discard, dev->max_segments, dev->max_hw_sectors, dev->rotational, dev->wc, dev->fua); mutex_unlock(&dev->lock); rnbd_clt_put_sess(sess); return dev; send_close: send_msg_close(dev, dev->device_id, RTRS_PERMIT_WAIT); del_dev: delete_dev(dev); put_dev: rnbd_clt_put_dev(dev); put_sess: rnbd_clt_put_sess(sess); return ERR_PTR(ret); } static void destroy_gen_disk(struct rnbd_clt_dev *dev) { del_gendisk(dev->gd); blk_cleanup_disk(dev->gd); } static void destroy_sysfs(struct rnbd_clt_dev *dev, const struct attribute *sysfs_self) { rnbd_clt_remove_dev_symlink(dev); if (dev->kobj.state_initialized) { if (sysfs_self) /* To avoid deadlock firstly remove itself */ sysfs_remove_file_self(&dev->kobj, sysfs_self); kobject_del(&dev->kobj); kobject_put(&dev->kobj); } } int rnbd_clt_unmap_device(struct rnbd_clt_dev *dev, bool force, const struct attribute *sysfs_self) { struct rnbd_clt_session *sess = dev->sess; int refcount, ret = 0; bool was_mapped; mutex_lock(&dev->lock); if (dev->dev_state == DEV_STATE_UNMAPPED) { rnbd_clt_info(dev, "Device is already being unmapped\n"); ret = -EALREADY; goto err; } refcount = refcount_read(&dev->refcount); if (!force && refcount > 1) { rnbd_clt_err(dev, "Closing device failed, device is in use, (%d device users)\n", refcount - 1); ret = -EBUSY; goto err; } was_mapped = (dev->dev_state == DEV_STATE_MAPPED); dev->dev_state = DEV_STATE_UNMAPPED; mutex_unlock(&dev->lock); delete_dev(dev); destroy_sysfs(dev, sysfs_self); destroy_gen_disk(dev); if (was_mapped && sess->rtrs) send_msg_close(dev, dev->device_id, RTRS_PERMIT_WAIT); rnbd_clt_info(dev, "Device is unmapped\n"); /* Likely last reference put */ rnbd_clt_put_dev(dev); /* * Here device and session can be vanished! */ return 0; err: mutex_unlock(&dev->lock); return ret; } int rnbd_clt_remap_device(struct rnbd_clt_dev *dev) { int err; mutex_lock(&dev->lock); if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) err = 0; else if (dev->dev_state == DEV_STATE_UNMAPPED) err = -ENODEV; else if (dev->dev_state == DEV_STATE_MAPPED) err = -EALREADY; else err = -EBUSY; mutex_unlock(&dev->lock); if (!err) { rnbd_clt_info(dev, "Remapping device.\n"); err = send_msg_open(dev, RTRS_PERMIT_WAIT); if (err) rnbd_clt_err(dev, "remap_device: %d\n", err); } return err; } static void unmap_device_work(struct work_struct *work) { struct rnbd_clt_dev *dev; dev = container_of(work, typeof(*dev), unmap_on_rmmod_work); rnbd_clt_unmap_device(dev, true, NULL); } static void rnbd_destroy_sessions(void) { struct rnbd_clt_session *sess, *sn; struct rnbd_clt_dev *dev, *tn; /* Firstly forbid access through sysfs interface */ rnbd_clt_destroy_sysfs_files(); /* * Here at this point there is no any concurrent access to sessions * list and devices list: * 1. New session or device can't be created - session sysfs files * are removed. * 2. Device or session can't be removed - module reference is taken * into account in unmap device sysfs callback. * 3. No IO requests inflight - each file open of block_dev increases * module reference in get_disk(). * * But still there can be user requests inflights, which are sent by * asynchronous send_msg_*() functions, thus before unmapping devices * RTRS session must be explicitly closed. */ list_for_each_entry_safe(sess, sn, &sess_list, list) { if (!rnbd_clt_get_sess(sess)) continue; close_rtrs(sess); list_for_each_entry_safe(dev, tn, &sess->devs_list, list) { /* * Here unmap happens in parallel for only one reason: * blk_cleanup_queue() takes around half a second, so * on huge amount of devices the whole module unload * procedure takes minutes. */ INIT_WORK(&dev->unmap_on_rmmod_work, unmap_device_work); queue_work(system_long_wq, &dev->unmap_on_rmmod_work); } rnbd_clt_put_sess(sess); } /* Wait for all scheduled unmap works */ flush_workqueue(system_long_wq); WARN_ON(!list_empty(&sess_list)); } static int __init rnbd_client_init(void) { int err = 0; BUILD_BUG_ON(sizeof(struct rnbd_msg_hdr) != 4); BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info) != 36); BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info_rsp) != 36); BUILD_BUG_ON(sizeof(struct rnbd_msg_open) != 264); BUILD_BUG_ON(sizeof(struct rnbd_msg_close) != 8); BUILD_BUG_ON(sizeof(struct rnbd_msg_open_rsp) != 56); rnbd_client_major = register_blkdev(rnbd_client_major, "rnbd"); if (rnbd_client_major <= 0) { pr_err("Failed to load module, block device registration failed\n"); return -EBUSY; } err = rnbd_clt_create_sysfs_files(); if (err) { pr_err("Failed to load module, creating sysfs device files failed, err: %d\n", err); unregister_blkdev(rnbd_client_major, "rnbd"); } return err; } static void __exit rnbd_client_exit(void) { rnbd_destroy_sessions(); unregister_blkdev(rnbd_client_major, "rnbd"); ida_destroy(&index_ida); } module_init(rnbd_client_init); module_exit(rnbd_client_exit);